rewrite compress methods for new GameState object, proper error handling
This commit is contained in:
parent
b0d0aaae1b
commit
7aa8a78a9d
1 changed files with 175 additions and 143 deletions
318
compress.py
318
compress.py
|
@ -1,15 +1,18 @@
|
||||||
#! /bin/python3
|
#! /bin/python3
|
||||||
import json
|
import json
|
||||||
import sys
|
import sys
|
||||||
from enum import Enum
|
|
||||||
from typing import List, Optional
|
|
||||||
import more_itertools
|
import more_itertools
|
||||||
from variants import variant_id, variant_name
|
|
||||||
|
from enum import Enum
|
||||||
from termcolor import colored
|
from termcolor import colored
|
||||||
|
from typing import List, Optional
|
||||||
|
|
||||||
|
from variants import variant_id, variant_name
|
||||||
|
from hanabi import DeckCard, ActionType, Action, GameState, HanabiInstance
|
||||||
|
|
||||||
|
|
||||||
|
# use same BASE62 as on hanab.live to encode decks
|
||||||
BASE62 = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ";
|
BASE62 = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ";
|
||||||
COLORS = 'rygbpt'
|
|
||||||
|
|
||||||
|
|
||||||
# Helper method, iterate over chunks of length n in a string
|
# Helper method, iterate over chunks of length n in a string
|
||||||
|
@ -18,68 +21,9 @@ def chunks(s: str, n: int):
|
||||||
yield s[i:i+n]
|
yield s[i:i+n]
|
||||||
|
|
||||||
|
|
||||||
class DeckCard():
|
# exception thrown by decompression methods if parsing fails
|
||||||
def __init__(self, suitIndex: int, rank: int, deck_index=None):
|
class InvalidFormatError(ValueError):
|
||||||
self.suitIndex: int = suitIndex
|
pass
|
||||||
self.rank: int = rank
|
|
||||||
self.deck_index = deck_index
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def from_json(deck_card):
|
|
||||||
return DeckCard(**deck_card)
|
|
||||||
|
|
||||||
def __eq__(self, other):
|
|
||||||
return self.suitIndex == other.suitIndex and self.rank == other.rank
|
|
||||||
|
|
||||||
def __repr__(self):
|
|
||||||
return COLORS[self.suitIndex] + str(self.rank)
|
|
||||||
|
|
||||||
def __hash__(self):
|
|
||||||
# should be injective enough, we never use cards with ranks differing by 1000
|
|
||||||
return 1000 * self.suitIndex + self.rank
|
|
||||||
|
|
||||||
def colorize(self):
|
|
||||||
color = ["green", "blue", "magenta", "yellow", "white", "cyan"][self.suitIndex]
|
|
||||||
return colored(str(self), color)
|
|
||||||
|
|
||||||
class ActionType(Enum):
|
|
||||||
Play = 0
|
|
||||||
Discard = 1
|
|
||||||
ColorClue = 2
|
|
||||||
RankClue = 3
|
|
||||||
EndGame = 4
|
|
||||||
VoteTerminate = 5 ## hack: online, this is encoded as a 10
|
|
||||||
|
|
||||||
|
|
||||||
class Action():
|
|
||||||
def __init__(self, type_: ActionType, target: int, value: Optional[int] = None):
|
|
||||||
self.type = type_
|
|
||||||
self.target = target
|
|
||||||
self.value = value
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def from_json(action):
|
|
||||||
return Action(
|
|
||||||
ActionType(action['type']),
|
|
||||||
int(action['target']),
|
|
||||||
action.get('value', None)
|
|
||||||
)
|
|
||||||
|
|
||||||
def __repr__(self):
|
|
||||||
match self.type:
|
|
||||||
case ActionType.Play:
|
|
||||||
return "Play card {}".format(self.target)
|
|
||||||
case ActionType.Discard:
|
|
||||||
return "Discard card {}".format(self.target)
|
|
||||||
case ActionType.ColorClue:
|
|
||||||
return "Clue color {} to player {}".format(self.value, self.target)
|
|
||||||
case ActionType.RankClue:
|
|
||||||
return "Clue rank {} to player {}".format(self.value, self.target)
|
|
||||||
case ActionType.EndGame:
|
|
||||||
return "Player {} ends the game (code {})".format(self.target, self.value)
|
|
||||||
case ActionType.VoteTerminate:
|
|
||||||
return "Players vote to terminate the game (code {})".format(self.value)
|
|
||||||
return "Undefined action"
|
|
||||||
|
|
||||||
|
|
||||||
def compress_actions(actions: List[Action], game_id=None) -> str:
|
def compress_actions(actions: List[Action], game_id=None) -> str:
|
||||||
|
@ -89,43 +33,78 @@ def compress_actions(actions: List[Action], game_id=None) -> str:
|
||||||
minType = min(map(lambda a: a.type.value, actions))
|
minType = min(map(lambda a: a.type.value, actions))
|
||||||
maxType = max(map(lambda a: a.type.value, actions))
|
maxType = max(map(lambda a: a.type.value, actions))
|
||||||
typeRange = maxType - minType + 1
|
typeRange = maxType - minType + 1
|
||||||
|
|
||||||
def compress_action(action):
|
def compress_action(action):
|
||||||
## We encode action values with +1 to differentiate
|
## We encode action values with +1 to differentiate
|
||||||
# null (encoded 0) and 0 (encoded 1)
|
# null (encoded 0) and 0 (encoded 1)
|
||||||
value = 0 if action.value is None else action.value + 1
|
value = 0 if action.value is None else action.value + 1
|
||||||
if action.type == ActionType.VoteTerminate:
|
if action.type == ActionType.VoteTerminate:
|
||||||
|
# This is currently a hack, the actual format has a 10 here
|
||||||
|
# but we cannot encode this
|
||||||
value = 0
|
value = 0
|
||||||
with open('vote_terminate_actions.txt', 'a') as f:
|
try:
|
||||||
f.write('target: {}, value: {}, game_id: {}\n'.format(action.target, action.value, game_id))
|
a = BASE62[typeRange * value + (action.type.value - minType)]
|
||||||
a = BASE62[typeRange * value + (action.type.value - minType)]
|
b = BASE62[action.target]
|
||||||
b = BASE62[action.target]
|
except IndexError as e:
|
||||||
|
raise ValueError("Encoding action failed, value too large, found {}".format(value)) from e
|
||||||
return a + b
|
return a + b
|
||||||
out = str(minType) + str(maxType)
|
|
||||||
out += ''.join(map(compress_action, actions))
|
return "{}{}{}".format(
|
||||||
return out
|
minType,
|
||||||
|
maxType,
|
||||||
|
''.join(map(compress_action, actions))
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def decompress_actions(actions_str: str) -> List[Action]:
|
def decompress_actions(actions_str: str) -> List[Action]:
|
||||||
|
if not len(actions_str >= 2):
|
||||||
|
raise InvalidFormatError("min/max range not specified, found: {}".format(actions_str))
|
||||||
try:
|
try:
|
||||||
minType = int(actions_str[0])
|
minType = int(actions_str[0])
|
||||||
maxType = int(actions_str[1])
|
maxType = int(actions_str[1])
|
||||||
except ValueError:
|
except ValueError as e:
|
||||||
raise ValueError("invalid action string: invalid min/max range")
|
raise InvalidFormatError(
|
||||||
assert(maxType >= minType)
|
"min/max range of actions not specified, expected two integers, found {}".format(actions_str[:2])
|
||||||
if not len(actions_str) % 2 == 0:
|
) from e
|
||||||
raise ValueError("Invalid length of action str")
|
if not minType <= maxType:
|
||||||
|
raise InvalidFormatError("min/max range illegal, found [{},{}]".format(minType, maxType))
|
||||||
typeRange = maxType - minType + 1
|
typeRange = maxType - minType + 1
|
||||||
def decompress_action(action):
|
|
||||||
actionType = ActionType((BASE62.index(action[0]) % typeRange) + minType)
|
if not len(actions_str) % 2 == 0:
|
||||||
value = None
|
raise InvalidFormatError("Invalid action string length: Expected even number of characters")
|
||||||
if actionType not in [actionType.Play, actionType.Discard]:
|
|
||||||
## We encode values with +1 to differentiate null (encoded 0) and 0 (encoded 1)
|
for (index, char) in enumerate(actions_str[2:]):
|
||||||
value = BASE62.index(action[0]) // typeRange - 1
|
if not char in BASE62:
|
||||||
if value == -1:
|
raise InvalidFormatError(
|
||||||
value = None
|
"Invalid character at index {}: Found {}, expected one of {}".format(
|
||||||
|
index, char, BASE62
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
def decompress_action(index, action):
|
||||||
|
try:
|
||||||
|
action_type_value = (BASE62.index(action[0]) % typeRange) + minType
|
||||||
|
action_type = ActionType(action_type_value)
|
||||||
|
except ValueError as e:
|
||||||
|
raise InvalidFormatError(
|
||||||
|
"Invalid action type at action {}: Found {}, expected one of {}".format(
|
||||||
|
index, actionTypeValue,
|
||||||
|
[action_type.value for action_type in ActionType]
|
||||||
|
)
|
||||||
|
) from e
|
||||||
|
## We encode values with +1 to differentiate null (encoded 0) and 0 (encoded 1)
|
||||||
|
value = BASE62.index(action[0]) // typeRange - 1
|
||||||
|
if value == -1:
|
||||||
|
value = None
|
||||||
|
if action_type in [ActionType.Play, ActionType.Discard]:
|
||||||
|
if value is not None:
|
||||||
|
raise InvalidFormatError(
|
||||||
|
"Invalid action value: Action at action index {} is Play/Discard, expected value None, found: {}".format(value)
|
||||||
|
)
|
||||||
target = BASE62.index(action[1])
|
target = BASE62.index(action[1])
|
||||||
return Action(actionType, target, value)
|
return Action(action_type, target, value)
|
||||||
return [decompress_action(a) for a in chunks(actions_str[2:], 2)]
|
|
||||||
|
return [decompress_action(idx, a) for (idx, a) in enumerate(chunks(actions_str[2:], 2))]
|
||||||
|
|
||||||
|
|
||||||
def compress_deck(deck: List[DeckCard]) -> str:
|
def compress_deck(deck: List[DeckCard]) -> str:
|
||||||
|
@ -133,83 +112,136 @@ def compress_deck(deck: List[DeckCard]) -> str:
|
||||||
minRank = min(map(lambda c: c.rank, deck))
|
minRank = min(map(lambda c: c.rank, deck))
|
||||||
maxRank = max(map(lambda c: c.rank, deck))
|
maxRank = max(map(lambda c: c.rank, deck))
|
||||||
rankRange = maxRank - minRank + 1
|
rankRange = maxRank - minRank + 1
|
||||||
|
|
||||||
def compress_card(card):
|
def compress_card(card):
|
||||||
return BASE62[rankRange * card.suitIndex + (card.rank - minRank)]
|
try:
|
||||||
out = str(minRank) + str(maxRank)
|
return BASE62[rankRange * card.suitIndex + (card.rank - minRank)]
|
||||||
out += ''.join(map(compress_card, deck))
|
except IndexError as e:
|
||||||
return out
|
raise InvalidFormatError(
|
||||||
|
"Could not compress card, suit or rank too large. Found: {}".format(card)
|
||||||
|
) from e
|
||||||
|
return "{}{}{}".format(
|
||||||
|
minRank,
|
||||||
|
maxRank,
|
||||||
|
''.join(map(compress_card, deck))
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def decompress_deck(deck_str: str) -> List[DeckCard]:
|
def decompress_deck(deck_str: str) -> List[DeckCard]:
|
||||||
assert(len(deck_str) >= 2)
|
if len(deck_str) < 2:
|
||||||
minRank = int(deck_str[0])
|
raise InvalidFormatError("min/max rank range not specified, found: {}".format(deck_str))
|
||||||
maxRank = int(deck_str[1])
|
try:
|
||||||
assert(maxRank >= minRank)
|
minRank = int(deck_str[0])
|
||||||
|
maxRank = int(deck_str[1])
|
||||||
|
except ValueError as e:
|
||||||
|
raise InvalidFormatError(
|
||||||
|
"min/max rank range not specified, expected two integers, found {}".format(actions_str[:2])
|
||||||
|
) from e
|
||||||
|
if not maxRank >= minRank:
|
||||||
|
raise InvalidFormatError(
|
||||||
|
"Invalid rank range, found [{},{}]".format(minRank, maxRank)
|
||||||
|
)
|
||||||
rankRange = maxRank - minRank + 1
|
rankRange = maxRank - minRank + 1
|
||||||
|
|
||||||
|
for (index, char) in enumerate(deck_str[2:]):
|
||||||
|
if not char in BASE62:
|
||||||
|
raise InvalidFormatError(
|
||||||
|
"Invalid character at index {}: Found {}, expected one of {}".format(
|
||||||
|
index, char, BASE62
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
def decompress_card(card_char):
|
def decompress_card(card_char):
|
||||||
index = BASE62.index(card_char)
|
index = BASE62.index(card_char)
|
||||||
suitIndex = index // rankRange
|
suitIndex = index // rankRange
|
||||||
rank = index % rankRange + minRank
|
rank = index % rankRange + minRank
|
||||||
return DeckCard(suitIndex, rank)
|
return DeckCard(suitIndex, rank)
|
||||||
|
|
||||||
return [decompress_card(c) for c in deck_str[2:]]
|
return [decompress_card(c) for c in deck_str[2:]]
|
||||||
|
|
||||||
|
|
||||||
def compressJSONGame(game_json: dict) -> str:
|
# compresses a standard GameState object into hanab.live format
|
||||||
out = ""
|
# which can be used in json replay links
|
||||||
num_players = len(game_json.get('players', []))
|
# The GameState object has to be standard / fitting hanab.live variants,
|
||||||
num_players = game_json.get('num_players', num_players)
|
# otherwise compression is not possible
|
||||||
if not 2 <= num_players:
|
def compress_game_state(state: GameState) -> str:
|
||||||
raise ValueError("Invalid JSON game: could not parse num players")
|
if not state.instance.is_standard():
|
||||||
out = "{}".format(num_players)
|
raise ValueError("Cannot compress non-standard hanabi instance")
|
||||||
try:
|
out = "{}{},{},{}".format(
|
||||||
deck = game_json['deck']
|
state.instance.num_players,
|
||||||
except:
|
compress_deck(state.instance.deck),
|
||||||
raise KeyError("JSON game contains no deck")
|
compress_actions(state.actions),
|
||||||
assert(len(deck) > 0)
|
state.instance.variant_id # Note that a sane default is chosen if construction did not provide one
|
||||||
if type(deck[0]) != DeckCard:
|
)
|
||||||
try:
|
with_dashes = ''.join(more_itertools.intersperse("-", out, 20))
|
||||||
deck = [DeckCard.from_json(card) for card in deck]
|
return with_dashes
|
||||||
except:
|
|
||||||
raise ValueError("invalid jSON format: could not convert to deck cards")
|
|
||||||
# now, deck is in the correct form
|
|
||||||
out += compress_deck(deck)
|
|
||||||
out += "," # first part finished
|
|
||||||
actions = game_json.get('actions', [])
|
|
||||||
if len(actions) == 0:
|
|
||||||
print("WARNING: action array is empty")
|
|
||||||
else:
|
|
||||||
if type(actions[0]) != Action:
|
|
||||||
try:
|
|
||||||
actions = [Action.from_json(action) for action in actions]
|
|
||||||
except:
|
|
||||||
raise ValueError("invalid JSON format: could not convert to actions")
|
|
||||||
out += compress_actions(actions)
|
|
||||||
out += ","
|
|
||||||
variant = game_json.get("variant", "No Variant")
|
|
||||||
out += str(variant_id(variant))
|
|
||||||
return ''.join(more_itertools.intersperse("-", out, 20))
|
|
||||||
|
|
||||||
|
|
||||||
def decompressJSONGame(game_str: str)->dict:
|
def decompress_game_state(game_str: str) -> GameState:
|
||||||
game = {}
|
|
||||||
game_str = game_str.replace("-", "")
|
game_str = game_str.replace("-", "")
|
||||||
|
parts = game_str.split(",")
|
||||||
|
if not len(parts) == 3:
|
||||||
|
raise InvalidFormatError(
|
||||||
|
"Expected 3 comma-separated parts of game, found {}".format(
|
||||||
|
len(parts)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
[players_deck, actions, variant_id] = parts
|
||||||
|
if len(players_deck) == 0:
|
||||||
|
raise InvalidFormatError("Expected nonempty first part")
|
||||||
try:
|
try:
|
||||||
[players_deck, actions, variant_id] = game_str.split(",")
|
num_players = int(players_deck[0])
|
||||||
except:
|
except ValueError as e:
|
||||||
raise ValueError("Invalid format of compressed string!")
|
raise InvalidFormatError(
|
||||||
game['players'] = ["Alice", "Bob", "Cathy", "Donald", "Emily"][:int(players_deck[0])]
|
"Expected number of players, found: {}".format(players_deck[0])
|
||||||
game['deck'] = decompress_deck(players_deck[1:])
|
) from e
|
||||||
game['actions'] = decompress_actions(actions)
|
|
||||||
game['options'] = {
|
try:
|
||||||
"variant": variant_name(int(variant_id))
|
deck = decompress_deck(players_deck[1:])
|
||||||
}
|
except InvalidFormatError as e:
|
||||||
|
raise InvalidFormatError("Error while parsing deck") from e
|
||||||
|
|
||||||
|
try:
|
||||||
|
actions = decompress_actions(actions)
|
||||||
|
except InvalidFormatError as e:
|
||||||
|
raise InvalidFormatError("Error while parsing actions") from e
|
||||||
|
|
||||||
|
try:
|
||||||
|
variant_id = int(variant_id)
|
||||||
|
except ValueError:
|
||||||
|
raise ValueError("Expected variant id, found: {}".format(variant_id))
|
||||||
|
|
||||||
|
instance = HanabInstance(deck, num_players, variant_id=variant_id)
|
||||||
|
game = GameState(instance)
|
||||||
|
|
||||||
|
# TODO: game is not in consistent state
|
||||||
|
game.actions = actions
|
||||||
return game
|
return game
|
||||||
|
|
||||||
def link(game_json: dict) -> str:
|
|
||||||
compressed = compressJSONGame(game_json)
|
def link(game_state: GameState) -> str:
|
||||||
|
compressed = compress_game_state(game_state)
|
||||||
return "https://hanab.live/replay-json/{}".format(compressed)
|
return "https://hanab.live/replay-json/{}".format(compressed)
|
||||||
|
|
||||||
|
|
||||||
|
# add link method to GameState class
|
||||||
|
GameState.link = link
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
for arg in sys.argv[1:]:
|
for arg in sys.argv[1:]:
|
||||||
deck = decompress_deck(arg)
|
deck = decompress_deck(arg)
|
||||||
|
c = compress_deck(deck)
|
||||||
|
assert(c == arg)
|
||||||
print(deck)
|
print(deck)
|
||||||
|
|
||||||
|
inst = HanabiInstance(deck, 5, variant_id = 32)
|
||||||
|
game = GameState(inst)
|
||||||
|
game.play(1)
|
||||||
|
game.play(5)
|
||||||
|
game.clue()
|
||||||
|
# a = compress_game_state(game)
|
||||||
|
# print(a)
|
||||||
|
print(game.link())
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue