diff --git a/compress.py b/compress.py index 8173003..efb9407 100755 --- a/compress.py +++ b/compress.py @@ -1,15 +1,18 @@ #! /bin/python3 import json import sys -from enum import Enum -from typing import List, Optional import more_itertools -from variants import variant_id, variant_name + +from enum import Enum from termcolor import colored +from typing import List, Optional + +from variants import variant_id, variant_name +from hanabi import DeckCard, ActionType, Action, GameState, HanabiInstance +# use same BASE62 as on hanab.live to encode decks BASE62 = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"; -COLORS = 'rygbpt' # Helper method, iterate over chunks of length n in a string @@ -18,68 +21,9 @@ def chunks(s: str, n: int): yield s[i:i+n] -class DeckCard(): - def __init__(self, suitIndex: int, rank: int, deck_index=None): - self.suitIndex: int = suitIndex - self.rank: int = rank - self.deck_index = deck_index - - @staticmethod - def from_json(deck_card): - return DeckCard(**deck_card) - - def __eq__(self, other): - return self.suitIndex == other.suitIndex and self.rank == other.rank - - def __repr__(self): - return COLORS[self.suitIndex] + str(self.rank) - - def __hash__(self): - # should be injective enough, we never use cards with ranks differing by 1000 - return 1000 * self.suitIndex + self.rank - - def colorize(self): - color = ["green", "blue", "magenta", "yellow", "white", "cyan"][self.suitIndex] - return colored(str(self), color) - -class ActionType(Enum): - Play = 0 - Discard = 1 - ColorClue = 2 - RankClue = 3 - EndGame = 4 - VoteTerminate = 5 ## hack: online, this is encoded as a 10 - - -class Action(): - def __init__(self, type_: ActionType, target: int, value: Optional[int] = None): - self.type = type_ - self.target = target - self.value = value - - @staticmethod - def from_json(action): - return Action( - ActionType(action['type']), - int(action['target']), - action.get('value', None) - ) - - def __repr__(self): - match self.type: - case ActionType.Play: - return "Play card {}".format(self.target) - case ActionType.Discard: - return "Discard card {}".format(self.target) - case ActionType.ColorClue: - return "Clue color {} to player {}".format(self.value, self.target) - case ActionType.RankClue: - return "Clue rank {} to player {}".format(self.value, self.target) - case ActionType.EndGame: - return "Player {} ends the game (code {})".format(self.target, self.value) - case ActionType.VoteTerminate: - return "Players vote to terminate the game (code {})".format(self.value) - return "Undefined action" +# exception thrown by decompression methods if parsing fails +class InvalidFormatError(ValueError): + pass def compress_actions(actions: List[Action], game_id=None) -> str: @@ -89,43 +33,78 @@ def compress_actions(actions: List[Action], game_id=None) -> str: minType = min(map(lambda a: a.type.value, actions)) maxType = max(map(lambda a: a.type.value, actions)) typeRange = maxType - minType + 1 + def compress_action(action): ## We encode action values with +1 to differentiate # null (encoded 0) and 0 (encoded 1) value = 0 if action.value is None else action.value + 1 if action.type == ActionType.VoteTerminate: + # This is currently a hack, the actual format has a 10 here + # but we cannot encode this value = 0 - with open('vote_terminate_actions.txt', 'a') as f: - f.write('target: {}, value: {}, game_id: {}\n'.format(action.target, action.value, game_id)) - a = BASE62[typeRange * value + (action.type.value - minType)] - b = BASE62[action.target] + try: + a = BASE62[typeRange * value + (action.type.value - minType)] + b = BASE62[action.target] + except IndexError as e: + raise ValueError("Encoding action failed, value too large, found {}".format(value)) from e return a + b - out = str(minType) + str(maxType) - out += ''.join(map(compress_action, actions)) - return out + + return "{}{}{}".format( + minType, + maxType, + ''.join(map(compress_action, actions)) + ) def decompress_actions(actions_str: str) -> List[Action]: + if not len(actions_str >= 2): + raise InvalidFormatError("min/max range not specified, found: {}".format(actions_str)) try: minType = int(actions_str[0]) maxType = int(actions_str[1]) - except ValueError: - raise ValueError("invalid action string: invalid min/max range") - assert(maxType >= minType) - if not len(actions_str) % 2 == 0: - raise ValueError("Invalid length of action str") + except ValueError as e: + raise InvalidFormatError( + "min/max range of actions not specified, expected two integers, found {}".format(actions_str[:2]) + ) from e + if not minType <= maxType: + raise InvalidFormatError("min/max range illegal, found [{},{}]".format(minType, maxType)) typeRange = maxType - minType + 1 - def decompress_action(action): - actionType = ActionType((BASE62.index(action[0]) % typeRange) + minType) - value = None - if actionType not in [actionType.Play, actionType.Discard]: - ## We encode values with +1 to differentiate null (encoded 0) and 0 (encoded 1) - value = BASE62.index(action[0]) // typeRange - 1 - if value == -1: - value = None + + if not len(actions_str) % 2 == 0: + raise InvalidFormatError("Invalid action string length: Expected even number of characters") + + for (index, char) in enumerate(actions_str[2:]): + if not char in BASE62: + raise InvalidFormatError( + "Invalid character at index {}: Found {}, expected one of {}".format( + index, char, BASE62 + ) + ) + + def decompress_action(index, action): + try: + action_type_value = (BASE62.index(action[0]) % typeRange) + minType + action_type = ActionType(action_type_value) + except ValueError as e: + raise InvalidFormatError( + "Invalid action type at action {}: Found {}, expected one of {}".format( + index, actionTypeValue, + [action_type.value for action_type in ActionType] + ) + ) from e + ## We encode values with +1 to differentiate null (encoded 0) and 0 (encoded 1) + value = BASE62.index(action[0]) // typeRange - 1 + if value == -1: + value = None + if action_type in [ActionType.Play, ActionType.Discard]: + if value is not None: + raise InvalidFormatError( + "Invalid action value: Action at action index {} is Play/Discard, expected value None, found: {}".format(value) + ) target = BASE62.index(action[1]) - return Action(actionType, target, value) - return [decompress_action(a) for a in chunks(actions_str[2:], 2)] + return Action(action_type, target, value) + + return [decompress_action(idx, a) for (idx, a) in enumerate(chunks(actions_str[2:], 2))] def compress_deck(deck: List[DeckCard]) -> str: @@ -133,83 +112,136 @@ def compress_deck(deck: List[DeckCard]) -> str: minRank = min(map(lambda c: c.rank, deck)) maxRank = max(map(lambda c: c.rank, deck)) rankRange = maxRank - minRank + 1 + def compress_card(card): - return BASE62[rankRange * card.suitIndex + (card.rank - minRank)] - out = str(minRank) + str(maxRank) - out += ''.join(map(compress_card, deck)) - return out + try: + return BASE62[rankRange * card.suitIndex + (card.rank - minRank)] + except IndexError as e: + raise InvalidFormatError( + "Could not compress card, suit or rank too large. Found: {}".format(card) + ) from e + return "{}{}{}".format( + minRank, + maxRank, + ''.join(map(compress_card, deck)) + ) def decompress_deck(deck_str: str) -> List[DeckCard]: - assert(len(deck_str) >= 2) - minRank = int(deck_str[0]) - maxRank = int(deck_str[1]) - assert(maxRank >= minRank) + if len(deck_str) < 2: + raise InvalidFormatError("min/max rank range not specified, found: {}".format(deck_str)) + try: + minRank = int(deck_str[0]) + maxRank = int(deck_str[1]) + except ValueError as e: + raise InvalidFormatError( + "min/max rank range not specified, expected two integers, found {}".format(actions_str[:2]) + ) from e + if not maxRank >= minRank: + raise InvalidFormatError( + "Invalid rank range, found [{},{}]".format(minRank, maxRank) + ) rankRange = maxRank - minRank + 1 + + for (index, char) in enumerate(deck_str[2:]): + if not char in BASE62: + raise InvalidFormatError( + "Invalid character at index {}: Found {}, expected one of {}".format( + index, char, BASE62 + ) + ) + def decompress_card(card_char): index = BASE62.index(card_char) suitIndex = index // rankRange rank = index % rankRange + minRank return DeckCard(suitIndex, rank) + return [decompress_card(c) for c in deck_str[2:]] -def compressJSONGame(game_json: dict) -> str: - out = "" - num_players = len(game_json.get('players', [])) - num_players = game_json.get('num_players', num_players) - if not 2 <= num_players: - raise ValueError("Invalid JSON game: could not parse num players") - out = "{}".format(num_players) - try: - deck = game_json['deck'] - except: - raise KeyError("JSON game contains no deck") - assert(len(deck) > 0) - if type(deck[0]) != DeckCard: - try: - deck = [DeckCard.from_json(card) for card in deck] - except: - raise ValueError("invalid jSON format: could not convert to deck cards") - # now, deck is in the correct form - out += compress_deck(deck) - out += "," # first part finished - actions = game_json.get('actions', []) - if len(actions) == 0: - print("WARNING: action array is empty") - else: - if type(actions[0]) != Action: - try: - actions = [Action.from_json(action) for action in actions] - except: - raise ValueError("invalid JSON format: could not convert to actions") - out += compress_actions(actions) - out += "," - variant = game_json.get("variant", "No Variant") - out += str(variant_id(variant)) - return ''.join(more_itertools.intersperse("-", out, 20)) +# compresses a standard GameState object into hanab.live format +# which can be used in json replay links +# The GameState object has to be standard / fitting hanab.live variants, +# otherwise compression is not possible +def compress_game_state(state: GameState) -> str: + if not state.instance.is_standard(): + raise ValueError("Cannot compress non-standard hanabi instance") + out = "{}{},{},{}".format( + state.instance.num_players, + compress_deck(state.instance.deck), + compress_actions(state.actions), + state.instance.variant_id # Note that a sane default is chosen if construction did not provide one + ) + with_dashes = ''.join(more_itertools.intersperse("-", out, 20)) + return with_dashes -def decompressJSONGame(game_str: str)->dict: - game = {} +def decompress_game_state(game_str: str) -> GameState: game_str = game_str.replace("-", "") + parts = game_str.split(",") + if not len(parts) == 3: + raise InvalidFormatError( + "Expected 3 comma-separated parts of game, found {}".format( + len(parts) + ) + ) + [players_deck, actions, variant_id] = parts + if len(players_deck) == 0: + raise InvalidFormatError("Expected nonempty first part") try: - [players_deck, actions, variant_id] = game_str.split(",") - except: - raise ValueError("Invalid format of compressed string!") - game['players'] = ["Alice", "Bob", "Cathy", "Donald", "Emily"][:int(players_deck[0])] - game['deck'] = decompress_deck(players_deck[1:]) - game['actions'] = decompress_actions(actions) - game['options'] = { - "variant": variant_name(int(variant_id)) - } + num_players = int(players_deck[0]) + except ValueError as e: + raise InvalidFormatError( + "Expected number of players, found: {}".format(players_deck[0]) + ) from e + + try: + deck = decompress_deck(players_deck[1:]) + except InvalidFormatError as e: + raise InvalidFormatError("Error while parsing deck") from e + + try: + actions = decompress_actions(actions) + except InvalidFormatError as e: + raise InvalidFormatError("Error while parsing actions") from e + + try: + variant_id = int(variant_id) + except ValueError: + raise ValueError("Expected variant id, found: {}".format(variant_id)) + + instance = HanabInstance(deck, num_players, variant_id=variant_id) + game = GameState(instance) + + # TODO: game is not in consistent state + game.actions = actions return game -def link(game_json: dict) -> str: - compressed = compressJSONGame(game_json) + +def link(game_state: GameState) -> str: + compressed = compress_game_state(game_state) return "https://hanab.live/replay-json/{}".format(compressed) + +# add link method to GameState class +GameState.link = link + + + if __name__ == "__main__": for arg in sys.argv[1:]: deck = decompress_deck(arg) + c = compress_deck(deck) + assert(c == arg) print(deck) + + inst = HanabiInstance(deck, 5, variant_id = 32) + game = GameState(inst) + game.play(1) + game.play(5) + game.clue() +# a = compress_game_state(game) +# print(a) + print(game.link()) +