From a93601c997f6023f1f84ce4922393dda0f1e1e31 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maximilian=20Ke=C3=9Fler?= Date: Tue, 4 Jul 2023 20:06:06 +0200 Subject: [PATCH] Refactor imports, remove code in imported files We now only use relative imports for files in the same directory Also, only modules are imported, never classes/functions etc Furthermore, main methods in package files have been removed, since they do not belong there --- hanabi/database/__init__.py | 2 +- hanabi/database/database.py | 2 +- hanabi/database/variant_suits_schema.sql | 6 +- hanabi/{game.py => hanab_game.py} | 4 +- hanabi/hanabi_cli.py | 21 +- hanabi/live/check_game.py | 32 +-- hanabi/live/compress.py | 198 +++++++-------- hanabi/live/download_data.py | 58 ++--- hanabi/live/hanab_live.py | 32 +-- hanabi/live/instance_finder.py | 108 ++++---- hanabi/live/variants.py | 6 +- hanabi/logger_setup.py | 1 - hanabi/solvers/deck_analyzer.py | 114 ++++----- hanabi/solvers/greedy_solver.py | 45 ++-- hanabi/solvers/sat.py | 302 +++++++++++------------ test.py | 21 +- 16 files changed, 441 insertions(+), 511 deletions(-) rename hanabi/{game.py => hanab_game.py} (99%) diff --git a/hanabi/database/__init__.py b/hanabi/database/__init__.py index a5e7fa9..230c331 100644 --- a/hanabi/database/__init__.py +++ b/hanabi/database/__init__.py @@ -1 +1 @@ -from .database import cur, conn \ No newline at end of file +from .database import cur, conn diff --git a/hanabi/database/database.py b/hanabi/database/database.py index 1edbdcb..6392373 100644 --- a/hanabi/database/database.py +++ b/hanabi/database/database.py @@ -12,7 +12,7 @@ cur = conn.cursor() # populate_static_tables() -class Game(): +class Game: def __init__(self, info=None): self.id = -1 self.num_players = -1 diff --git a/hanabi/database/variant_suits_schema.sql b/hanabi/database/variant_suits_schema.sql index e0faea6..3c12299 100644 --- a/hanabi/database/variant_suits_schema.sql +++ b/hanabi/database/variant_suits_schema.sql @@ -1,6 +1,6 @@ /* Database schema for the tables storing information on available hanab.live variants, suits and colors */ -/* Available suits. The associated id is arbitrary upon initial generation, but fixed for referentiability */ +/* Available suits. The associated id is arbitrary upon initial generation, but fixed afterwards for identification */ DROP TABLE IF EXISTS suits CASCADE; CREATE TABLE suits ( id SERIAL PRIMARY KEY, @@ -27,7 +27,7 @@ CREATE TABLE suits ( ); CREATE INDEX suits_name_idx ON suits (name); -/* Available color clues. The indexing is arbitrary upon initial generation, but fixed for referentiability */ +/* Available color clues. The indexing is arbitrary upon initial generation, but fixed afterwards for identification */ DROP TABLE IF EXISTS colors CASCADE; CREATE TABLE colors ( id SERIAL PRIMARY KEY, @@ -99,7 +99,7 @@ CREATE TABLE variants ( */ special_rank_ranks SMALLINT NOT NULL DEFAULT 1, /** - Encodes how cards of the special rank (if present) are touched by colorss, + Encodes how cards of the special rank (if present) are touched by colors, in the same manner how we encoded in @table suits */ special_rank_colors SMALLINT NOT NULL DEFAULT 1, diff --git a/hanabi/game.py b/hanabi/hanab_game.py similarity index 99% rename from hanabi/game.py rename to hanabi/hanab_game.py index 8778fce..92900c7 100644 --- a/hanabi/game.py +++ b/hanabi/hanab_game.py @@ -1,4 +1,4 @@ -from typing import Optional, List +from typing import Optional, List, Generator from enum import Enum from termcolor import colored @@ -30,7 +30,7 @@ class DeckCard: return 1000 * self.suitIndex + self.rank -def pp_deck(deck: List[DeckCard]) -> str: +def pp_deck(deck: Generator[DeckCard, None, None]) -> str: return "[" + ", ".join(card.colorize() for card in deck) + "]" diff --git a/hanabi/hanabi_cli.py b/hanabi/hanabi_cli.py index d9d4dc1..6ba6087 100755 --- a/hanabi/hanabi_cli.py +++ b/hanabi/hanabi_cli.py @@ -4,10 +4,10 @@ import argparse import verboselogs -from hanabi import logger -from hanabi.live.check_game import check_game -from hanabi.live.download_data import detailed_export_game -from hanabi.live.compress import link +from hanabi import logger, logger_manager +from hanabi.live import check_game +from hanabi.live import download_data +from hanabi.live import compress """ init db + populate tables @@ -39,16 +39,16 @@ def add_analyze_subparser(subparsers): def analyze_game(game_id: int, download: bool = False): if download: - detailed_export_game(game_id) + download_data.detailed_export_game(game_id) logger.info('Analyzing game {}'.format(game_id)) - turn, sol = check_game(game_id) + turn, sol = check_game.check_game(game_id) if turn == 0: logger.info('Instance is unfeasible') else: logger.info('Game was first lost after {} turns.'.format(turn)) logger.info( 'A replay achieving perfect score from the previous turn onwards is: {}#{}' - .format(link(sol), turn) + .format(compress.link(sol), turn) ) @@ -66,8 +66,7 @@ def main_parser() -> argparse.ArgumentParser: return parser - -if __name__ == "__main__": +def hanabi_cli(): args = main_parser().parse_args() switcher = { 'analyze': analyze_game @@ -78,3 +77,7 @@ if __name__ == "__main__": method_args.pop('command') method_args.pop('verbose') switcher[args.command](**method_args) + + +if __name__ == "__main__": + hanabi_cli() \ No newline at end of file diff --git a/hanabi/live/check_game.py b/hanabi/live/check_game.py index 37749cd..2efbb4f 100644 --- a/hanabi/live/check_game.py +++ b/hanabi/live/check_game.py @@ -1,12 +1,12 @@ import copy from typing import Tuple -from hanabi.database import conn -from hanabi.live.compress import decompress_deck, decompress_actions, link -from hanabi.game import GameState -from hanabi.live.hanab_live import HanabLiveInstance, HanabLiveGameState -from hanabi.solvers.sat import solve_sat from hanabi import logger +from hanabi import database +from hanabi import hanab_game +from hanabi.live import hanab_live +from hanabi.live import compress +from hanabi.solvers import sat # returns minimal number T of turns (from game) after which instance was infeasible @@ -16,9 +16,9 @@ from hanabi import logger # returns 1 if instance is feasible but first turn is suboptimal # ... # # turns + 1 if the final state is still winning -def check_game(game_id: int) -> Tuple[int, GameState]: +def check_game(game_id: int) -> Tuple[int, hanab_game.GameState]: logger.debug("Analysing game {}".format(game_id)) - with conn.cursor() as cur: + with database.conn.cursor() as cur: cur.execute("SELECT games.num_players, deck, actions, score, games.variant_id FROM games " "INNER JOIN seeds ON seeds.seed = games.seed " "WHERE games.id = (%s)", @@ -28,25 +28,25 @@ def check_game(game_id: int) -> Tuple[int, GameState]: if res is None: raise ValueError("No game associated with id {} in database.".format(game_id)) (num_players, compressed_deck, compressed_actions, score, variant_id) = res - deck = decompress_deck(compressed_deck) - actions = decompress_actions(compressed_actions) + deck = compress.decompress_deck(compressed_deck) + actions = compress.decompress_actions(compressed_actions) - instance = HanabLiveInstance(deck, num_players, variant_id=variant_id) + instance = hanab_live.HanabLiveInstance(deck, num_players, variant_id=variant_id) # check if the instance is already won if instance.max_score == score: - game = HanabLiveGameState(instance) + game = hanab_live.HanabLiveGameState(instance) for action in actions: game.make_action(action) # instance has been won, nothing to compute here return len(actions) + 1, game # first, check if the instance itself is feasible: - game = HanabLiveGameState(instance) - solvable, solution = solve_sat(game) + game = hanab_live.HanabLiveGameState(instance) + solvable, solution = sat.solve_sat(game) if not solvable: return 0, solution - logger.verbose("Instance {} is feasible after 0 turns: {}".format(game_id, link(solution))) + logger.verbose("Instance {} is feasible after 0 turns: {}".format(game_id, compress.link(solution))) # store lower and upper bounds of numbers of turns after which we know the game was feasible / infeasible solvable_turn = 0 @@ -59,13 +59,13 @@ def check_game(game_id: int) -> Tuple[int, GameState]: for a in range(solvable_turn, try_turn): try_game.make_action(actions[a]) logger.debug("Checking if instance {} is feasible after {} turns.".format(game_id, try_turn)) - solvable, potential_sol = solve_sat(try_game) + solvable, potential_sol = sat.solve_sat(try_game) if solvable: solution = potential_sol game = try_game solvable_turn = try_turn logger.verbose("Instance {} is feasible after {} turns: {}#{}" - .format(game_id, solvable_turn, link(solution), solvable_turn + 1)) + .format(game_id, solvable_turn, compress.link(solution), solvable_turn + 1)) else: unsolvable_turn = try_turn logger.verbose("Instance {} is not feasible after {} turns.".format(game_id, unsolvable_turn)) diff --git a/hanabi/live/compress.py b/hanabi/live/compress.py index 2618f87..6e8416a 100644 --- a/hanabi/live/compress.py +++ b/hanabi/live/compress.py @@ -1,191 +1,190 @@ -#! /bin/python3 -import sys -import more_itertools - from typing import List, Union -from hanabi.game import DeckCard, ActionType, Action, GameState, HanabiInstance -from hanab_live import HanabLiveGameState, HanabLiveInstance +import more_itertools +from hanabi import hanab_game +from hanabi.live import hanab_live # use same BASE62 as on hanab.live to encode decks -BASE62 = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"; +BASE62 = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ" # Helper method, iterate over chunks of length n in a string def chunks(s: str, n: int): for i in range(0, len(s), n): - yield s[i:i+n] + yield s[i:i + n] # exception thrown by decompression methods if parsing fails class InvalidFormatError(ValueError): - pass + pass -def compress_actions(actions: List[Action], game_id=None) -> str: - minType = 0 - maxType = 0 +def compress_actions(actions: List[hanab_game.Action]) -> str: + min_type = 0 + max_type = 0 if len(actions) != 0: - minType = min(map(lambda a: a.type.value, actions)) - maxType = max(map(lambda a: a.type.value, actions)) - typeRange = maxType - minType + 1 + min_type = min(map(lambda a: a.type.value, actions)) + max_type = max(map(lambda a: a.type.value, actions)) + type_range = max_type - min_type + 1 def compress_action(action): - ## We encode action values with +1 to differentiate + # We encode action values with +1 to differentiate # null (encoded 0) and 0 (encoded 1) value = 0 if action.value is None else action.value + 1 - if action.type == ActionType.VoteTerminate: - # This is currently a hack, the actual format has a 10 here + if action.type == hanab_game.ActionType.VoteTerminate: + # This is currently a hack, the actual format has a 10 here, # but we cannot encode this value = 0 try: - a = BASE62[typeRange * value + (action.type.value - minType)] + a = BASE62[type_range * value + (action.type.value - min_type)] b = BASE62[action.target] except IndexError as e: raise ValueError("Encoding action failed, value too large, found {}".format(value)) from e return a + b return "{}{}{}".format( - minType, - maxType, - ''.join(map(compress_action, actions)) + min_type, + max_type, + ''.join(map(compress_action, actions)) ) -def decompress_actions(actions_str: str) -> List[Action]: +def decompress_actions(actions_str: str) -> List[hanab_game.Action]: if not len(actions_str) >= 2: raise InvalidFormatError("min/max range not specified, found: {}".format(actions_str)) try: - minType = int(actions_str[0]) - maxType = int(actions_str[1]) + min_type = int(actions_str[0]) + max_type = int(actions_str[1]) except ValueError as e: raise InvalidFormatError( - "min/max range of actions not specified, expected two integers, found {}".format(actions_str[:2]) + "min/max range of actions not specified, expected two integers, found {}".format(actions_str[:2]) ) from e - if not minType <= maxType: - raise InvalidFormatError("min/max range illegal, found [{},{}]".format(minType, maxType)) - typeRange = maxType - minType + 1 + if not min_type <= max_type: + raise InvalidFormatError("min/max range illegal, found [{},{}]".format(min_type, max_type)) + type_range = max_type - min_type + 1 if not len(actions_str) % 2 == 0: raise InvalidFormatError("Invalid action string length: Expected even number of characters") for (index, char) in enumerate(actions_str[2:]): - if not char in BASE62: + if char not in BASE62: raise InvalidFormatError( - "Invalid character at index {}: Found {}, expected one of {}".format( - index, char, BASE62 - ) + "Invalid character at index {}: Found {}, expected one of {}".format( + index, char, BASE62 + ) ) - def decompress_action(index, action): + def decompress_action(action_idx: int, action: str): try: - action_type_value = (BASE62.index(action[0]) % typeRange) + minType - action_type = ActionType(action_type_value) + action_type_value = (BASE62.index(action[0]) % type_range) + min_type + action_type = hanab_game.ActionType(action_type_value) except ValueError as e: raise InvalidFormatError( - "Invalid action type at action {}: Found {}, expected one of {}".format( - index, action_type_value, - [action_type.value for action_type in ActionType] - ) + "Invalid action type at action {}: Found {}, expected one of {}".format( + action_idx, action_type_value, + [action_type.value for action_type in hanab_game.ActionType] + ) ) from e - ## We encode values with +1 to differentiate null (encoded 0) and 0 (encoded 1) - value = BASE62.index(action[0]) // typeRange - 1 + + # We encode values with +1 to differentiate null (encoded 0) and 0 (encoded 1) + value = BASE62.index(action[0]) // type_range - 1 if value == -1: value = None - if action_type in [ActionType.Play, ActionType.Discard]: + if action_type in [hanab_game.ActionType.Play, hanab_game.ActionType.Discard]: if value is not None: raise InvalidFormatError( - "Invalid action value: Action at action index {} is Play/Discard, expected value None, found: {}".format(index, value) + "Invalid action value: Action at action index {} is Play/Discard, expected value None, " + "found: {}".format(action_idx, value) ) target = BASE62.index(action[1]) - return Action(action_type, target, value) + return hanab_game.Action(action_type, target, value) return [decompress_action(idx, a) for (idx, a) in enumerate(chunks(actions_str[2:], 2))] -def compress_deck(deck: List[DeckCard]) -> str: - assert(len(deck) != 0) - minRank = min(map(lambda c: c.rank, deck)) - maxRank = max(map(lambda c: c.rank, deck)) - rankRange = maxRank - minRank + 1 +def compress_deck(deck: List[hanab_game.DeckCard]) -> str: + assert (len(deck) != 0) + min_rank = min(map(lambda card: card.rank, deck)) + max_rank = max(map(lambda card: card.rank, deck)) + rank_range = max_rank - min_rank + 1 def compress_card(card): try: - return BASE62[rankRange * card.suitIndex + (card.rank - minRank)] + return BASE62[rank_range * card.suitIndex + (card.rank - min_rank)] except IndexError as e: raise InvalidFormatError( - "Could not compress card, suit or rank too large. Found: {}".format(card) + "Could not compress card, suit or rank too large. Found: {}".format(card) ) from e + return "{}{}{}".format( - minRank, - maxRank, - ''.join(map(compress_card, deck)) + min_rank, + max_rank, + ''.join(map(compress_card, deck)) ) -def decompress_deck(deck_str: str) -> List[DeckCard]: +def decompress_deck(deck_str: str) -> List[hanab_game.DeckCard]: if len(deck_str) < 2: raise InvalidFormatError("min/max rank range not specified, found: {}".format(deck_str)) try: - minRank = int(deck_str[0]) - maxRank = int(deck_str[1]) + min_rank = int(deck_str[0]) + max_rank = int(deck_str[1]) except ValueError as e: raise InvalidFormatError( - "min/max rank range not specified, expected two integers, found {}".format(deck_str[:2]) + "min/max rank range not specified, expected two integers, found {}".format(deck_str[:2]) ) from e - if not maxRank >= minRank: + if not max_rank >= min_rank: raise InvalidFormatError( - "Invalid rank range, found [{},{}]".format(minRank, maxRank) + "Invalid rank range, found [{},{}]".format(min_rank, max_rank) ) - rankRange = maxRank - minRank + 1 + rank_range = max_rank - min_rank + 1 for (index, char) in enumerate(deck_str[2:]): - if not char in BASE62: + if char not in BASE62: raise InvalidFormatError( - "Invalid character at index {}: Found {}, expected one of {}".format( - index, char, BASE62 - ) + "Invalid character at index {}: Found {}, expected one of {}".format( + index, char, BASE62 + ) ) def decompress_card(card_char): - index = BASE62.index(card_char) - suitIndex = index // rankRange - rank = index % rankRange + minRank - return DeckCard(suitIndex, rank) + encoded = BASE62.index(card_char) + suit_index = encoded // rank_range + rank = encoded % rank_range + min_rank + return hanab_game.DeckCard(suit_index, rank) - return [decompress_card(c) for c in deck_str[2:]] + return [decompress_card(card) for card in deck_str[2:]] # compresses a standard GameState object into hanab.live format # which can be used in json replay links # The GameState object has to be standard / fitting hanab.live variants, # otherwise compression is not possible -def compress_game_state(state: Union[GameState, HanabLiveGameState]) -> str: - var_id = -1 - if isinstance(state, HanabLiveGameState): +def compress_game_state(state: Union[hanab_game.GameState, hanab_live.HanabLiveGameState]) -> str: + if isinstance(state, hanab_live.HanabLiveGameState): var_id = state.instance.variant_id else: - assert isinstance(state, GameState) - var_id = HanabLiveInstance.select_standard_variant_id(state.instance) + assert isinstance(state, hanab_game.GameState) + var_id = hanab_live.HanabLiveInstance.select_standard_variant_id(state.instance) out = "{}{},{},{}".format( - state.instance.num_players, - compress_deck(state.instance.deck), - compress_actions(state.actions), - var_id - ) + state.instance.num_players, + compress_deck(state.instance.deck), + compress_actions(state.actions), + var_id + ) with_dashes = ''.join(more_itertools.intersperse("-", out, 20)) return with_dashes -def decompress_game_state(game_str: str) -> GameState: +def decompress_game_state(game_str: str) -> hanab_game.GameState: game_str = game_str.replace("-", "") parts = game_str.split(",") if not len(parts) == 3: raise InvalidFormatError( - "Expected 3 comma-separated parts of game, found {}".format( - len(parts) - ) + "Expected 3 comma-separated parts of game, found {}".format( + len(parts) + ) ) [players_deck, actions, variant_id] = parts if len(players_deck) == 0: @@ -212,35 +211,14 @@ def decompress_game_state(game_str: str) -> GameState: except ValueError: raise ValueError("Expected variant id, found: {}".format(variant_id)) - instance = HanabiInstance(deck, num_players) - game = GameState(instance) - + instance = hanab_game.HanabiInstance(deck, num_players) + game = hanab_game.GameState(instance) + # TODO: game is not in consistent state game.actions = actions return game -def link(game_state: GameState) -> str: +def link(game_state: hanab_game.GameState) -> str: compressed = compress_game_state(game_state) return "https://hanab.live/shared-replay-json/{}".format(compressed) - - -# add link method to GameState class -GameState.link = link - - - -if __name__ == "__main__": - for arg in sys.argv[1:]: - deck = decompress_deck(arg) - c = compress_deck(deck) - assert(c == arg) - print(deck) - - inst = HanabiInstance(deck, 5, variant_id = 32) - game = GameState(inst) - game.play(1) - game.play(5) - game.clue() - print(game.link()) - diff --git a/hanabi/live/download_data.py b/hanabi/live/download_data.py index 4f366c4..b4cbf66 100644 --- a/hanabi/live/download_data.py +++ b/hanabi/live/download_data.py @@ -3,11 +3,12 @@ from typing import Dict, Optional import psycopg2.errors -from hanabi.live.site_api import get, api -from hanabi.database.database import conn, cur -from hanabi.live.compress import compress_deck, compress_actions, DeckCard, Action, InvalidFormatError -from hanabi.live.variants import variant_id, variant_name -from hanab_live import HanabLiveInstance, HanabLiveGameState +from hanabi import hanab_game +from hanabi.database import database +from hanabi.live import site_api +from hanabi.live import compress +from hanabi.live import variants +from hanabi.live import hanab_live from hanabi import logger @@ -30,29 +31,29 @@ def detailed_export_game(game_id: int, score: Optional[int] = None, var_id: Opti assert_msg = "Invalid response format from hanab.live while exporting game id {}".format(game_id) - game_json = get("export/{}".format(game_id)) + game_json = site_api.get("export/{}".format(game_id)) assert game_json.get('id') == game_id, assert_msg players = game_json.get('players', []) num_players = len(players) seed = game_json.get('seed', None) options = game_json.get('options', {}) - var_id = var_id or variant_id(options.get('variant', 'No Variant')) + var_id = var_id or variants.variant_id(options.get('variant', 'No Variant')) deck_plays = options.get('deckPlays', False) one_extra_card = options.get('oneExtraCard', False) one_less_card = options.get('oneLessCard', False) all_or_nothing = options.get('allOrNothing', False) starting_player = options.get('startingPlayer', 0) - actions = [Action.from_json(action) for action in game_json.get('actions', [])] - deck = [DeckCard.from_json(card) for card in game_json.get('deck', None)] + actions = [hanab_game.Action.from_json(action) for action in game_json.get('actions', [])] + deck = [hanab_game.DeckCard.from_json(card) for card in game_json.get('deck', None)] assert players != [], assert_msg assert seed is not None, assert_msg if score is None: # need to play through the game once to find out its score - game = HanabLiveGameState( - HanabLiveInstance( + game = hanab_live.HanabLiveGameState( + hanab_live.HanabLiveInstance( deck, num_players, var_id, deck_plays=deck_plays, one_less_card=one_less_card, @@ -67,18 +68,18 @@ def detailed_export_game(game_id: int, score: Optional[int] = None, var_id: Opti score = game.score try: - compressed_deck = compress_deck(deck) - except InvalidFormatError: + compressed_deck = compress.compress_deck(deck) + except compress.InvalidFormatError: logger.error("Failed to compress deck while exporting game {}: {}".format(game_id, deck)) raise try: - compressed_actions = compress_actions(actions) - except InvalidFormatError: + compressed_actions = compress.compress_actions(actions) + except compress.InvalidFormatError: logger.error("Failed to compress actions while exporting game {}".format(game_id)) raise if not seed_exists: - cur.execute( + database.cur.execute( "INSERT INTO seeds (seed, num_players, variant_id, deck)" "VALUES (%s, %s, %s, %s)" "ON CONFLICT (seed) DO NOTHING", @@ -86,7 +87,7 @@ def detailed_export_game(game_id: int, score: Optional[int] = None, var_id: Opti ) logger.debug("New seed {} imported.".format(seed)) - cur.execute( + database.cur.execute( "INSERT INTO games (" "id, num_players, starting_player, score, seed, variant_id, deck_plays, one_extra_card, one_less_card," "all_or_nothing, actions" @@ -115,9 +116,9 @@ def process_game_row(game: Dict, var_id): if any(v is None for v in [game_id, seed, num_players, score]): raise ValueError("Unknown response format on hanab.live") - cur.execute("SAVEPOINT seed_insert") + database.cur.execute("SAVEPOINT seed_insert") try: - cur.execute( + database.cur.execute( "INSERT INTO games (id, seed, num_players, score, variant_id)" "VALUES" "(%s, %s ,%s ,%s ,%s)" @@ -125,20 +126,20 @@ def process_game_row(game: Dict, var_id): (game_id, seed, num_players, score, var_id) ) except psycopg2.errors.ForeignKeyViolation: - cur.execute("ROLLBACK TO seed_insert") + database.cur.execute("ROLLBACK TO seed_insert") detailed_export_game(game_id, score, var_id) - cur.execute("RELEASE seed_insert") + database.cur.execute("RELEASE seed_insert") logger.debug("Imported game {}".format(game_id)) def download_games(var_id): - name = variant_name(var_id) + name = variants.variant_name(var_id) page_size = 100 if name is None: raise ValueError("{} is not a known variant_id.".format(var_id)) url = "variants/{}".format(var_id) - r = api(url, refresh=True) + r = site_api.api(url, refresh=True) if not r: raise RuntimeError("Failed to download request from hanab.live") @@ -146,12 +147,12 @@ def download_games(var_id): if num_entries is None: raise ValueError("Unknown response format on hanab.live") - cur.execute( + database.cur.execute( "SELECT COUNT(*) FROM games WHERE variant_id = %s AND id <= " "(SELECT COALESCE (last_game_id, 0) FROM variant_game_downloads WHERE variant_id = %s)", (var_id, var_id) ) - num_already_downloaded_games = cur.fetchone()[0] + num_already_downloaded_games = database.cur.fetchone()[0] assert num_already_downloaded_games <= num_entries, "Database inconsistent, too many games present." next_page = num_already_downloaded_games // page_size last_page = (num_entries - 1) // page_size @@ -171,7 +172,7 @@ def download_games(var_id): enrich_print=False ) as bar: for page in range(next_page, last_page + 1): - r = api(url + "?col[0]=0&page={}".format(page), refresh=page == last_page) + r = site_api.api(url + "?col[0]=0&page={}".format(page), refresh=page == last_page) rows = r.get('rows', []) if page == next_page: rows = rows[num_already_downloaded_games % 100:] @@ -180,11 +181,10 @@ def download_games(var_id): for row in rows: process_game_row(row, var_id) bar() - cur.execute( + database.cur.execute( "INSERT INTO variant_game_downloads (variant_id, last_game_id) VALUES" "(%s, %s)" "ON CONFLICT (variant_id) DO UPDATE SET last_game_id = EXCLUDED.last_game_id", (var_id, r['rows'][-1]['id']) ) - conn.commit() - + database.conn.commit() diff --git a/hanabi/live/hanab_live.py b/hanabi/live/hanab_live.py index 832dc04..1c1a3e5 100644 --- a/hanabi/live/hanab_live.py +++ b/hanabi/live/hanab_live.py @@ -1,14 +1,14 @@ from typing import List -import hanabi +from hanabi import hanab_game from hanabi import constants -from hanabi.live.variants import Variant +from hanabi.live import variants -class HanabLiveInstance(hanabi.HanabiInstance): +class HanabLiveInstance(hanab_game.HanabiInstance): def __init__( self, - deck: List[hanabi.DeckCard], + deck: List[hanab_game.DeckCard], num_players: int, variant_id: int, one_extra_card: bool = False, @@ -24,10 +24,10 @@ class HanabLiveInstance(hanabi.HanabiInstance): super().__init__(deck, num_players, hand_size=hand_size, *args, **kwargs) self.variant_id = variant_id - self.variant = Variant.from_db(self.variant_id) + self.variant = variants.Variant.from_db(self.variant_id) @staticmethod - def select_standard_variant_id(instance: hanabi.HanabiInstance): + def select_standard_variant_id(instance: hanab_game.HanabiInstance): err_msg = "Hanabi instance not supported by hanab.live, cannot convert to HanabLiveInstance: " assert 3 <= instance.num_suits <= 6, \ err_msg + "Illegal number of suits ({}) found, must be in range [3,6]".format(instance.num_suits) @@ -40,40 +40,40 @@ class HanabLiveInstance(hanabi.HanabiInstance): return constants.VARIANT_IDS_STANDARD_DISTRIBUTIONS[instance.num_suits][instance.num_dark_suits] -class HanabLiveGameState(hanabi.GameState): +class HanabLiveGameState(hanab_game.GameState): def __init__(self, instance: HanabLiveInstance, starting_player: int = 0): super().__init__(instance, starting_player) self.instance: HanabLiveInstance = instance def make_action(self, action): match action.type: - case hanabi.ActionType.ColorClue | hanabi.ActionType.RankClue: + case hanab_game.ActionType.ColorClue | hanab_game.ActionType.RankClue: assert(self.clues > 0) self.actions.append(action) self.clues -= self.instance.clue_increment self._make_turn() # TODO: could check that the clue specified is in fact legal - case hanabi.ActionType.Play: + case hanab_game.ActionType.Play: self.play(action.target) - case hanabi.ActionType.Discard: + case hanab_game.ActionType.Discard: self.discard(action.target) - case hanabi.ActionType.EndGame | hanabi.ActionType.VoteTerminate: + case hanab_game.ActionType.EndGame | hanab_game.ActionType.VoteTerminate: self.over = True - def _waste_clue(self) -> hanabi.Action: + def _waste_clue(self) -> hanab_game.Action: for player in range(self.turn + 1, self.turn + self.num_players): for card in self.hands[player % self.num_players]: for rank in self.instance.variant.ranks: if self.instance.variant.rank_touches(card, rank): - return hanabi.Action( - hanabi.ActionType.RankClue, + return hanab_game.Action( + hanab_game.ActionType.RankClue, player % self.num_players, rank ) for color in range(self.instance.variant.num_colors): if self.instance.variant.color_touches(card, color): - return hanabi.Action( - hanabi.ActionType.ColorClue, + return hanab_game.Action( + hanab_game.ActionType.ColorClue, player % self.num_players, color ) diff --git a/hanabi/live/instance_finder.py b/hanabi/live/instance_finder.py index f413f48..e4ecd27 100644 --- a/hanabi/live/instance_finder.py +++ b/hanabi/live/instance_finder.py @@ -3,26 +3,26 @@ import pebble.concurrent import concurrent.futures import traceback +import alive_progress +import threading +import time -from hanabi.solvers.sat import solve_sat -from hanabi.database.database import conn, cur -from hanabi.live.download_data import detailed_export_game -from alive_progress import alive_bar -from hanabi.live.compress import decompress_deck, link -from hanabi.game import HanabiInstance -from threading import Lock -from time import perf_counter -from hanabi.solvers.greedy_solver import GameState, GreedyStrategy from hanabi import logger -from hanabi.solvers.deck_analyzer import analyze, InfeasibilityReason -from hanabi.live.variants import Variant +from hanabi.solvers.sat import solve_sat +from hanabi.database import database +from hanabi.live import download_data +from hanabi.live import compress +from hanabi import hanab_game +from hanabi.solvers import greedy_solver +from hanabi.solvers import deck_analyzer +from hanabi.live import variants MAX_PROCESSES = 6 def update_seeds_db(): - cur2 = conn.cursor() - with conn.cursor() as cur: + cur2 = database.conn.cursor() + with database.conn.cursor() as cur: cur.execute("SELECT num_players, seed, variant_id from games;") for (num_players, seed, variant_id) in cur: cur2.execute("SELECT COUNT(*) from seeds WHERE seed = (%s);", (seed,)) @@ -34,47 +34,47 @@ def update_seeds_db(): "(%s, %s, %s)", (seed, num_players, variant_id) ) - conn.commit() + database.conn.commit() else: print("seed {} already found in DB".format(seed)) def get_decks_of_seeds(): - cur2 = conn.cursor() - cur.execute("SELECT seed, variant_id FROM seeds WHERE deck is NULL") - for (seed, variant_id) in cur: + cur2 = database.conn.cursor() + database.cur.execute("SELECT seed, variant_id FROM seeds WHERE deck is NULL") + for (seed, variant_id) in database.cur: cur2.execute("SELECT id FROM games WHERE seed = (%s) LIMIT 1", (seed,)) (game_id,) = cur2.fetchone() logger.verbose("Exporting game {} for seed {}.".format(game_id, seed)) - detailed_export_game(game_id, var_id=variant_id, seed_exists=True) - conn.commit() + download_data.detailed_export_game(game_id, var_id=variant_id, seed_exists=True) + database.conn.commit() def update_trivially_feasible_games(variant_id): - variant: Variant = Variant.from_db(variant_id) - cur.execute("SELECT seed FROM seeds WHERE variant_id = (%s) AND feasible is null", (variant_id,)) - seeds = cur.fetchall() + variant: variants.Variant = variants.Variant.from_db(variant_id) + database.cur.execute("SELECT seed FROM seeds WHERE variant_id = (%s) AND feasible is null", (variant_id,)) + seeds = database.cur.fetchall() print('Checking variant {} (id {}), found {} seeds to check...'.format(variant.name, variant_id, len(seeds))) - with alive_bar(total=len(seeds), title='{} ({})'.format(variant.name, variant_id)) as bar: + with alive_progress.alive_bar(total=len(seeds), title='{} ({})'.format(variant.name, variant_id)) as bar: for (seed,) in seeds: - cur.execute("SELECT id, deck_plays, one_extra_card, one_less_card, all_or_nothing " + database.cur.execute("SELECT id, deck_plays, one_extra_card, one_less_card, all_or_nothing " "FROM games WHERE score = (%s) AND seed = (%s) ORDER BY id;", (variant.max_score, seed) ) - res = cur.fetchall() + res = database.cur.fetchall() logger.debug("Checking seed {}: {:3} results".format(seed, len(res))) for (game_id, a, b, c, d) in res: if None in [a, b, c, d]: logger.debug(' Game {} not found in database, exporting...'.format(game_id)) - detailed_export_game(game_id, var_id=variant_id) + download_data.detailed_export_game(game_id, var_id=variant_id) else: logger.debug(' Game {} already in database'.format(game_id, valid)) valid = not any([a, b, c, d]) if valid: logger.verbose('Seed {:10} (variant {}) found to be feasible via game {:6}'.format(seed, variant_id, game_id)) - cur.execute("UPDATE seeds SET feasible = (%s) WHERE seed = (%s)", (True, seed)) - conn.commit() + database.cur.execute("UPDATE seeds SET feasible = (%s) WHERE seed = (%s)", (True, seed)) + database.conn.commit() break else: logger.verbose(' Cheaty game found') @@ -82,7 +82,7 @@ def update_trivially_feasible_games(variant_id): def get_decks_for_all_seeds(): - cur = conn.cursor() + cur = database.conn.database.cursor() cur.execute("SELECT id " "FROM games " " INNER JOIN seeds " @@ -96,26 +96,26 @@ def get_decks_for_all_seeds(): ) print("Exporting decks for all seeds") res = cur.fetchall() - with alive_bar(len(res), title="Exporting decks") as bar: + with alive_progress.alive_bar(len(res), title="Exporting decks") as bar: for (game_id,) in res: - detailed_export_game(game_id) + download_data.detailed_export_game(game_id) bar() -mutex = Lock() +mutex = threading.Lock() -def solve_instance(instance: HanabiInstance): +def solve_instance(instance: hanab_game.HanabiInstance): # first, sanity check on running out of pace - result = analyze(instance) + result = deck_analyzer.analyze(instance) if result is not None: - assert type(result) == InfeasibilityReason + assert type(result) == deck_analyzer.InfeasibilityReason logger.debug("found infeasible deck") return False, None, None for num_remaining_cards in [0, 20]: # logger.info("trying with {} remaining cards".format(num_remaining_cards)) - game = GameState(instance) - strat = GreedyStrategy(game) + game = hanab_game.GameState(instance) + strat = greedy_solver.GreedyStrategy(game) # make a number of greedy moves while not game.is_over() and not game.is_known_lost(): @@ -136,10 +136,10 @@ def solve_instance(instance: HanabiInstance): return True, sol, num_remaining_cards logger.debug( "No success with {} remaining cards, reducing number of greedy moves, failed attempt was: {}".format( - num_remaining_cards, link(game))) + num_remaining_cards, compress.link(game))) # print("Aborting trying with greedy strat") logger.debug("Starting full SAT solver") - game = GameState(instance) + game = hanab_game.GameState(instance) a, b = solve_sat(game) return a, b, instance.draw_pile_size @@ -148,21 +148,21 @@ def solve_instance(instance: HanabiInstance): def solve_seed_with_timeout(seed, num_players, deck_compressed, var_name: Optional[str] = None): try: logger.verbose("Starting to solve seed {}".format(seed)) - deck = decompress_deck(deck_compressed) - t0 = perf_counter() - solvable, solution, num_remaining_cards = solve_instance(HanabiInstance(deck, num_players)) - t1 = perf_counter() + deck = compress.decompress_deck(deck_compressed) + t0 = time.perf_counter() + solvable, solution, num_remaining_cards = solve_instance(hanab_game.HanabiInstance(deck, num_players)) + t1 = time.perf_counter() logger.verbose("Solved instance {} in {} seconds: {}".format(seed, round(t1 - t0, 2), solvable)) mutex.acquire() if solvable is not None: - cur.execute("UPDATE seeds SET feasible = (%s) WHERE seed = (%s)", (solvable, seed)) - conn.commit() + database.cur.execute("UPDATE seeds SET feasible = (%s) WHERE seed = (%s)", (solvable, seed)) + database.conn.commit() mutex.release() if solvable == True: logger.verbose("Success with {} cards left in draw by greedy solver on seed {}: {}\n".format( - num_remaining_cards, seed, link(solution)) + num_remaining_cards, seed, compress.link(solution)) ) elif solvable == False: logger.debug("seed {} was not solvable".format(seed)) @@ -187,18 +187,14 @@ def solve_seed(seed, num_players, deck_compressed, var_name: Optional[str] = Non def solve_unknown_seeds(variant_id, variant_name: Optional[str] = None): - cur.execute("SELECT seed, num_players, deck FROM seeds WHERE variant_id = (%s) AND feasible IS NULL", (variant_id,)) - res = cur.fetchall() - - # for r in res: - # solve_seed(r[0], r[1], r[2], variant_name) + database.cur.execute( + "SELECT seed, num_players, deck FROM seeds WHERE variant_id = (%s) AND feasible IS NULL", + (variant_id,) + ) + res = database.cur.fetchall() with concurrent.futures.ProcessPoolExecutor(max_workers=MAX_PROCESSES) as executor: fs = [executor.submit(solve_seed, r[0], r[1], r[2], variant_name) for r in res] - with alive_bar(len(res), title='Seed solving on {}'.format(variant_name)) as bar: + with alive_progress.alive_bar(len(res), title='Seed solving on {}'.format(variant_name)) as bar: for f in concurrent.futures.as_completed(fs): bar() - - -update_trivially_feasible_games(0) -solve_unknown_seeds(0, "No Variant") \ No newline at end of file diff --git a/hanabi/live/variants.py b/hanabi/live/variants.py index 644ffd0..27b3f36 100644 --- a/hanabi/live/variants.py +++ b/hanabi/live/variants.py @@ -1,6 +1,6 @@ import enum from typing import List, Optional -from hanabi.game import DeckCard, ActionType +from hanabi import hanab_game from hanabi.database.database import cur @@ -161,7 +161,7 @@ class Variant: def _synesthesia_ranks(self, color_value: int) -> List[int]: return [rank for rank in self.ranks if (rank - color_value) % len(self.colors) == 0] - def rank_touches(self, card: DeckCard, value: int): + def rank_touches(self, card: hanab_game.DeckCard, value: int): assert 0 <= card.suitIndex < self.num_suits,\ f"Unexpected card {card}, suitIndex {card.suitIndex} out of bounds for {self.num_suits} suits." assert not self.no_rank_clues, "Cluing rank not allowed in this variant." @@ -186,7 +186,7 @@ class Variant: ranks = self._preprocess_rank(value) return any(self.suits[card.suitIndex].rank_touches(card.rank, rank) for rank in ranks) - def color_touches(self, card: DeckCard, value: int): + def color_touches(self, card: hanab_game.DeckCard, value: int): assert 0 <= card.suitIndex < self.num_suits, \ f"Unexpected card {card}, suitIndex {card.suitIndex} out of bounds for {self.num_suits} suits." assert not self.no_color_clues, "Cluing color not allowed in this variant." diff --git a/hanabi/logger_setup.py b/hanabi/logger_setup.py index f643f5e..a3569a6 100644 --- a/hanabi/logger_setup.py +++ b/hanabi/logger_setup.py @@ -24,7 +24,6 @@ class LoggerManager: '%(message)s' ) - self.console_handler = logging.StreamHandler() self.console_handler.setLevel(console_level) self.console_handler.setFormatter(self.nothing_formatter) diff --git a/hanabi/solvers/deck_analyzer.py b/hanabi/solvers/deck_analyzer.py index 6361d91..93bd9ff 100644 --- a/hanabi/solvers/deck_analyzer.py +++ b/hanabi/solvers/deck_analyzer.py @@ -1,14 +1,14 @@ -from hanabi.live.compress import DeckCard +from hanabi.live import compress from enum import Enum -from hanabi.database import conn -from hanabi.game import HanabiInstance, pp_deck -from hanabi.live.compress import decompress_deck +from hanabi.database import database +from hanabi import hanab_game +from hanabi.live import compress class InfeasibilityType(Enum): - OutOfPace = 0 # idx denotes index of last card drawn before being forced to reduce pace, value denotes how bad pace is - OutOfHandSize = 1 # idx denotes index of last card drawn before being forced to discard a crit + OutOfPace = 0 # idx denotes index of last card drawn before being forced to reduce pace, value denotes how bad pace is + OutOfHandSize = 1 # idx denotes index of last card drawn before being forced to discard a crit NotTrivial = 2 CritAtBottom = 3 @@ -32,17 +32,17 @@ class InfeasibilityReason(): def analyze_suit(occurrences): # denotes the indexes of copies we can use wlog picks = { - 1: 0, - **{ r: None for r in range(2, 5) }, - 5: 0 + 1: 0, + **{r: None for r in range(2, 5)}, + 5: 0 } # denotes the intervals when cards will be played wlog play_times = { - 1: [occurrences[1][0]], - **{ r: None for _ in range(instance.num_suits) - for r in range(2,6) - } + 1: [occurrences[1][0]], + **{r: None for _ in range(instance.num_suits) + for r in range(2, 6) + } } print("occurrences are: {}".format(occurrences)) @@ -51,7 +51,7 @@ def analyze_suit(occurrences): # general analysis earliest_play = max(min(play_times[rank - 1]), min(occurrences[rank])) - latest_play = max( *play_times[rank - 1], *occurrences[rank]) + latest_play = max(*play_times[rank - 1], *occurrences[rank]) play_times[rank] = [earliest_play, latest_play] # check a few extra cases regarding the picks when the rank is not 5 @@ -62,30 +62,28 @@ def analyze_suit(occurrences): play_times[rank] = [min(occurrences[rank])] continue - # check if the second copy is not worse than the first when it comes, # because we either have to wait for smaller cards anyway # or the next card is not there anyway - if max(occurrences[rank]) < max(earliest_play, min(occurrences[rank + 1])): + if max(occurrences[rank]) < max(earliest_play, min(occurrences[rank + 1])): picks[rank] = 1 - return picks, play_times - -def analyze_card_usage(instance: HanabiInstance): +def analyze_card_usage(instance: hanab_game.HanabiInstance): storage_size = instance.num_players * instance.hand_size for suit in range(instance.num_suits): print("analysing suit {}: {}".format( suit, - pp_deck((c for c in instance.deck if c.suitIndex == suit)) - ) + hanab_game.pp_deck((c for c in instance.deck if c.suitIndex == suit)) + ) ) occurrences = { - rank: [max(0, i - storage_size + 1) for (i, card) in enumerate(instance.deck) if card == DeckCard(suit, rank)] - for rank in range(1,6) + rank: [max(0, i - storage_size + 1) for (i, card) in enumerate(instance.deck) if + card == hanab_game.DeckCard(suit, rank)] + for rank in range(1, 6) } picks, play_times = analyze_suit(occurrences) @@ -96,9 +94,7 @@ def analyze_card_usage(instance: HanabiInstance): print() - -def analyze(instance: HanabiInstance, find_non_trivial=False) -> InfeasibilityReason | None: - +def analyze(instance: hanab_game.HanabiInstance, find_non_trivial=False) -> InfeasibilityReason | None: if instance.deck[-1].rank != 5 and instance.deck[-1].suitIndex + instance.num_dark_suits >= instance.num_suits: return InfeasibilityReason(InfeasibilityType.CritAtBottom, instance.deck_size - 1) @@ -121,7 +117,7 @@ def analyze(instance: HanabiInstance, find_non_trivial=False) -> InfeasibilityRe stacks[card.suitIndex] += 1 # check for further playables that we stored for check_rank in range(card.rank + 1, 6): - check_card = DeckCard(card.suitIndex, check_rank) + check_card = hanab_game.DeckCard(card.suitIndex, check_rank) if check_card in stored_cards: stacks[card.suitIndex] += 1 stored_cards.remove(check_card) @@ -130,36 +126,36 @@ def analyze(instance: HanabiInstance, find_non_trivial=False) -> InfeasibilityRe else: break elif card.rank <= stacks[card.suitIndex]: - pass # card is trash + pass # card is trash elif card.rank > stacks[card.suitIndex] + 1: # need to store card if card in stored_cards or card.rank == 5: stored_crits.add(card) stored_cards.add(card) - - ## check for out of handsize: + + # check for out of handsize: if len(stored_crits) == instance.num_players * instance.hand_size: return InfeasibilityReason(InfeasibilityType.OutOfHandSize, i) if find_non_trivial and len(stored_cards) == instance.num_players * instance.hand_size: - ret = InfeasibilityReason(InfeasibilityType.NotTrivial, i) + ret = InfeasibilityReason(InfeasibilityType.NotTrivial, i) # the last - 1 is there because we have to discard 'next', causing a further draw max_remaining_plays = (instance.deck_size - i - 1) + instance.num_players - 1 needed_plays = 5 * instance.num_suits - sum(stacks) - missing = max_remaining_plays - needed_plays + missing = max_remaining_plays - needed_plays if missing < min_forced_pace: -# print("update to {}: {}".format(i, missing)) + # print("update to {}: {}".format(i, missing)) min_forced_pace = missing worst_index = i # check that we correctly walked through the deck - assert(len(stored_cards) == 0) - assert(len(stored_crits) == 0) - assert(sum(stacks) == 5 * instance.num_suits) + assert (len(stored_cards) == 0) + assert (len(stored_crits) == 0) + assert (sum(stacks) == 5 * instance.num_suits) - if min_forced_pace < 0: + if min_forced_pace < 0: return InfeasibilityReason(InfeasibilityType.OutOfPace, worst_index, min_forced_pace) elif ret is not None: return ret @@ -168,10 +164,12 @@ def analyze(instance: HanabiInstance, find_non_trivial=False) -> InfeasibilityRe def run_on_database(): - cur = conn.cursor() - cur2 = conn.cursor() + cur = database.conn.cursor() + cur2 = database.conn.cursor() for num_p in range(2, 6): - cur.execute("SELECT seed, num_players, deck from seeds where variant_id = 0 and num_players = (%s) order by seed asc", (num_p,)) + cur.execute( + "SELECT seed, num_players, deck from seeds where variant_id = 0 and num_players = (%s) order by seed asc", + (num_p,)) res = cur.fetchall() hand = 0 pace = 0 @@ -179,11 +177,11 @@ def run_on_database(): d = None print("Checking {} {}-player seeds from database".format(len(res), num_p)) for (seed, num_players, deck) in res: - deck = decompress_deck(deck) - a = analyze(HanabiInstance(deck, num_players), True) + deck = compress.decompress_deck(deck) + a = analyze(hanab_game.HanabiInstance(deck, num_players), True) if type(a) == InfeasibilityReason: if a.type == InfeasibilityType.OutOfHandSize: - # print("Seed {} infeasible: {}\n{}".format(seed, a, deck)) + # print("Seed {} infeasible: {}\n{}".format(seed, a, deck)) hand += 1 elif a.type == InfeasibilityType.OutOfPace: pace += 1 @@ -191,28 +189,12 @@ def run_on_database(): non_trivial += 1 d = seed, deck - print("Found {} seeds running out of hand size, {} running out of pace and {} that are not trivial".format(hand, pace, non_trivial)) + print("Found {} seeds running out of hand size, {} running out of pace and {} that are not trivial".format(hand, + pace, + non_trivial)) if d is not None: - print("example non-trivial deck (seed {}): [{}]" - .format( - d[0], - ", ".join(c.colorize() for c in d[1]) - ) - ) + print("example non-trivial deck (seed {}): [{}]".format( + d[0], + ", ".join(c.colorize() for c in d[1]) + )) print() -# if p < 0: -# print("seed {} ({} players) runs out of pace ({}) after drawing {}: {}:\n{}".format(seed, num_players, p, i, deck[i], deck)) -# cur.execute("UPDATE seeds SET feasible = f WHERE seed = (%s)", seed) - -if __name__ == "__main__": -# print(deck) -# a = analyze(deck, 4) -# print(a) -# run_on_database() - deck_str = "15bcfwnqsdmbnfuvhskrgfixwckklojxgemrhpqppuaaiyadultv" - deck_str = "15misofrmvvuxujkphaqpcflegysdwqaakcilbxtuhwfrbgdnpkn" - deck_str = "15wqpvhdkufjcrewyxulvarhgolkixmfgmndbpstqbupcanfisak" - deck = decompress_deck(deck_str) - print(pp_deck(deck)) - instance = HanabiInstance(deck, 2) - analyze_card_usage(instance) diff --git a/hanabi/solvers/greedy_solver.py b/hanabi/solvers/greedy_solver.py index 3c1a2e7..c78fff8 100755 --- a/hanabi/solvers/greedy_solver.py +++ b/hanabi/solvers/greedy_solver.py @@ -1,13 +1,14 @@ #! /bin/python3 import collections import sys + from enum import Enum -from hanabi import logger from typing import Optional -from hanabi.game import DeckCard, GameState, HanabiInstance -from hanabi.live.compress import link, decompress_deck -from hanabi.database.database import conn +from hanabi import logger +from hanabi import hanab_game +from hanabi.live import compress +from hanabi.database import database class CardType(Enum): @@ -19,8 +20,8 @@ class CardType(Enum): UniqueVisible = 4 -class CardState(): - def __init__(self, card_type: CardType, card: DeckCard, weight=1): +class CardState: + def __init__(self, card_type: CardType, card: hanab_game.DeckCard, weight: Optional[int] = 1): self.card_type = card_type self.card = card self.weight = weight @@ -66,7 +67,7 @@ class WeightedCard: class HandState: - def __init__(self, player: int, game_state: GameState): + def __init__(self, player: int, game_state: hanab_game.GameState): self.trash = [] self.playable = [] self.critical = [] @@ -111,14 +112,14 @@ class HandState: else: assert len(self.critical) > 0, "Programming error." self.best_discard = self.critical[-1] - self.discard_badness = 600 - 100*self.best_discard.card.rank + self.discard_badness = 600 - 100 * self.best_discard.card.rank def num_useful_cards(self): return len(self.dupes) + len(self.uniques) + len(self.playable) + len(self.critical) class CheatingStrategy: - def __init__(self, game_state: GameState): + def __init__(self, game_state: hanab_game.GameState): self.game_state = game_state def make_move(self): @@ -135,10 +136,8 @@ class CheatingStrategy: exit(0) - - class GreedyStrategy(): - def __init__(self, game_state: GameState): + def __init__(self, game_state: hanab_game.GameState): self.game_state = game_state self.earliest_draw_times = [] @@ -146,7 +145,7 @@ class GreedyStrategy(): self.earliest_draw_times.append([]) for r in range(1, 6): self.earliest_draw_times[s].append(max( - game_state.deck.index(DeckCard(s, r)) - game_state.hand_size * game_state.num_players + 1, + game_state.deck.index(hanab_game.DeckCard(s, r)) - game_state.hand_size * game_state.num_players + 1, 0 if r == 1 else self.earliest_draw_times[s][r - 2] )) @@ -188,7 +187,7 @@ class GreedyStrategy(): copy_holders = set(self.game_state.holding_players(state.card)) copy_holders.remove(player) connecting_holders = set( - self.game_state.holding_players(DeckCard(state.card.suitIndex, state.card.rank + 1))) + self.game_state.holding_players(hanab_game.DeckCard(state.card.suitIndex, state.card.rank + 1))) if len(copy_holders) == 0: # card is unique, imortancy is based lexicographically on whether somebody has the conn. card and the rank @@ -244,8 +243,8 @@ class GreedyStrategy(): self.game_state.clue() -def run_deck(instance: HanabiInstance) -> GameState: - gs = GameState(instance) +def run_deck(instance: hanab_game.HanabiInstance) -> hanab_game.GameState: + gs = hanab_game.GameState(instance) strat = CheatingStrategy(gs) while not gs.is_over(): strat.make_move() @@ -256,7 +255,7 @@ def run_samples(num_players, sample_size): logger.info("Running {} test games on {} players using greedy strategy.".format(sample_size, num_players)) won = 0 lost = 0 - cur = conn.cursor() + cur = database.conn.cursor() cur.execute( "SELECT seed, num_players, deck, variant_id " "FROM seeds WHERE variant_id = 0 AND num_players = (%s)" @@ -264,13 +263,13 @@ def run_samples(num_players, sample_size): (num_players, sample_size)) for r in cur: seed, num_players, deck_str, var_id = r - deck = decompress_deck(deck_str) - instance = HanabiInstance(deck, num_players) + deck = compress.decompress_deck(deck_str) + instance = hanab_game.HanabiInstance(deck, num_players) final_game_state = run_deck(instance) if final_game_state.score != instance.max_score: logger.verbose( "Greedy strategy lost {}-player seed {:10} {}:\n{}" - .format(num_players, seed, str(deck), link(final_game_state)) + .format(num_players, seed, str(deck), compress.link(final_game_state)) ) lost += 1 else: @@ -279,9 +278,3 @@ def run_samples(num_players, sample_size): logger.info("Won {} ({}%) and lost {} ({}%) from sample of {} test games using greedy strategy.".format( won, round(100 * won / sample_size, 2), lost, round(100 * lost / sample_size, 2), sample_size )) - - -if __name__ == "__main__": - for p in range(2, 6): - run_samples(p, int(sys.argv[1])) - print() diff --git a/hanabi/solvers/sat.py b/hanabi/solvers/sat.py index 9f2d997..fc6191b 100644 --- a/hanabi/solvers/sat.py +++ b/hanabi/solvers/sat.py @@ -1,13 +1,12 @@ import copy -from pysmt.shortcuts import Symbol, Bool, Not, Implies, Iff, And, Or, AtMostOne, get_model, Equals, GE, NotEquals, Int -from pysmt.typing import INT from typing import Optional, Tuple -from hanabi.game import DeckCard, GameState, HanabiInstance -from hanabi.live.compress import link, decompress_deck -from greedy_solver import GreedyStrategy -from hanabi.constants import COLOR_INITIALS +from pysmt.shortcuts import Symbol, Bool, Not, Implies, Iff, And, Or, AtMostOne, get_model, Equals, GE, NotEquals, Int +from pysmt.typing import INT + from hanabi import logger +from hanabi import constants +from hanabi import hanab_game # literals to model game as sat instance to check for feasibility @@ -15,16 +14,15 @@ from hanabi import logger class Literals(): # num_suits is total number of suits, i.e. also counts the dark suits # default distribution among all suits is assumed - def __init__(self, instance: HanabiInstance): - + def __init__(self, instance: hanab_game.HanabiInstance): # clues[m][i] == "after move m we have i clues", in clue starved, this counts half clues self.clues = { - -1: Int(16 if instance.clue_starved else 8) # we have 8 clues after turn - , **{ - m: Symbol('m{}clues'.format(m), INT) - for m in range(instance.max_winning_moves) - } - } + -1: Int(16 if instance.clue_starved else 8) # we have 8 clues after turn + , **{ + m: Symbol('m{}clues'.format(m), INT) + for m in range(instance.max_winning_moves) + } + } self.pace = { -1: Int(instance.initial_pace) @@ -36,78 +34,83 @@ class Literals(): # strikes[m][i] == "after move m we have at least i strikes" self.strikes = { - -1: {i: Bool(i == 0) for i in range(0, instance.num_strikes + 1)} # no strikes when we start - , **{ - m: { - 0: Bool(True), - **{ s: Symbol('m{}strikes{}'.format(m,s)) for s in range(1, instance.num_strikes) }, - instance.num_strikes: Bool(False) # never so many clues that we lose. Implicitly forbids striking out - } - for m in range(instance.max_winning_moves) - } - } + -1: {i: Bool(i == 0) for i in range(0, instance.num_strikes + 1)} # no strikes when we start + , **{ + m: { + 0: Bool(True), + **{s: Symbol('m{}strikes{}'.format(m, s)) for s in range(1, instance.num_strikes)}, + instance.num_strikes: Bool(False) + # never so many clues that we lose. Implicitly forbids striking out + } + for m in range(instance.max_winning_moves) + } + } # extraturn[m] = "turn m is a move part of the extra round or a dummy turn" self.extraround = { - -1: Bool(False) - , **{ - m: Bool(False) if m < instance.draw_pile_size else Symbol('m{}extra'.format(m)) # it takes at least as many turns as cards in the draw pile to start the extra round - for m in range(0, instance.max_winning_moves) - } - } + -1: Bool(False) + , **{ + m: Bool(False) if m < instance.draw_pile_size else Symbol('m{}extra'.format(m)) + # it takes at least as many turns as cards in the draw pile to start the extra round + for m in range(0, instance.max_winning_moves) + } + } # dummyturn[m] = "turn m is a dummy nurn and not actually part of the game" self.dummyturn = { - -1: Bool(False) - , **{ - m: Bool(False) if m < instance.draw_pile_size + instance.num_players else Symbol('m{}dummy'.format(m)) - for m in range(0, instance.max_winning_moves) - } - } + -1: Bool(False) + , **{ + m: Bool(False) if m < instance.draw_pile_size + instance.num_players else Symbol('m{}dummy'.format(m)) + for m in range(0, instance.max_winning_moves) + } + } # draw[m][i] == "at move m we play/discard deck[i]" self.discard = { - m: {i: Symbol('m{}discard{}'.format(m, i)) for i in range(instance.deck_size)} - for m in range(instance.max_winning_moves) - } + m: {i: Symbol('m{}discard{}'.format(m, i)) for i in range(instance.deck_size)} + for m in range(instance.max_winning_moves) + } # draw[m][i] == "at move m we draw deck card i" self.draw = { - -1: { i: Bool(i == instance.num_dealt_cards - 1) for i in range(instance.num_dealt_cards - 1, instance.deck_size) } - , **{ - m: { - instance.num_dealt_cards - 1: Bool(False), - **{i: Symbol('m{}draw{}'.format(m, i)) for i in range(instance.num_dealt_cards, instance.deck_size)} - } - for m in range(instance.max_winning_moves) - } - } + -1: {i: Bool(i == instance.num_dealt_cards - 1) for i in + range(instance.num_dealt_cards - 1, instance.deck_size)} + , **{ + m: { + instance.num_dealt_cards - 1: Bool(False), + **{i: Symbol('m{}draw{}'.format(m, i)) for i in range(instance.num_dealt_cards, instance.deck_size)} + } + for m in range(instance.max_winning_moves) + } + } # strike[m] = "at move m we get a strike" self.strike = { - -1: Bool(False) - , **{ - m: Symbol('m{}newstrike'.format(m)) - for m in range(instance.max_winning_moves) - } - } + -1: Bool(False) + , **{ + m: Symbol('m{}newstrike'.format(m)) + for m in range(instance.max_winning_moves) + } + } # progress[m][card = (suitIndex, rank)] == "after move m we have played in suitIndex up to rank" self.progress = { - -1: {(s, r): Bool(r == 0) for s in range(0, instance.num_suits) for r in range(0, 6)} # at start, have only played rank zero - , **{ - m: { - **{(s, 0): Bool(True) for s in range(0, instance.num_suits)}, - **{(s, r): Symbol('m{}progress{}{}'.format(m, s, r)) for s in range(0, instance.num_suits) for r in range(1, 6)} - } - for m in range(instance.max_winning_moves) - } - } + -1: {(s, r): Bool(r == 0) for s in range(0, instance.num_suits) for r in range(0, 6)} + # at start, have only played rank zero + , **{ + m: { + **{(s, 0): Bool(True) for s in range(0, instance.num_suits)}, + **{(s, r): Symbol('m{}progress{}{}'.format(m, s, r)) for s in range(0, instance.num_suits) for r in + range(1, 6)} + } + for m in range(instance.max_winning_moves) + } + } ## Utility variables # discard_any[m] == "at move m we play/discard a card" - self.discard_any = { m: Symbol('m{}discard_any'.format(m)) for m in range(instance.max_winning_moves) } + self.discard_any = {m: Symbol('m{}discard_any'.format(m)) for m in range(instance.max_winning_moves)} # draw_any[m] == "at move m we draw a card" self.draw_any = {m: Symbol('m{}draw_any'.format(m)) for m in range(instance.max_winning_moves)} @@ -122,11 +125,12 @@ class Literals(): self.incr_clues = {m: Symbol('m{}c+'.format(m)) for m in range(instance.max_winning_moves)} -def solve_sat(starting_state: GameState | HanabiInstance, min_pace: Optional[int] = 0) -> Tuple[bool, Optional[GameState]]: - if isinstance(starting_state, HanabiInstance): +def solve_sat(starting_state: hanab_game.GameState | hanab_game.HanabiInstance, min_pace: Optional[int] = 0) -> Tuple[ + bool, Optional[hanab_game.GameState]]: + if isinstance(starting_state, hanab_game.HanabiInstance): instance = starting_state - game_state = GameState(instance) - elif isinstance(starting_state, GameState): + game_state = hanab_game.GameState(instance) + elif isinstance(starting_state, hanab_game.GameState): instance = starting_state.instance game_state = starting_state else: @@ -141,7 +145,7 @@ def solve_sat(starting_state: GameState | HanabiInstance, min_pace: Optional[int starting_hands = [[card.deck_index for card in hand] for hand in game_state.hands] first_turn = len(game_state.actions) - if isinstance(starting_state, GameState): + if isinstance(starting_state, hanab_game.GameState): # have to set additional variables # set initial clues @@ -157,7 +161,7 @@ def solve_sat(starting_state: GameState | HanabiInstance, min_pace: Optional[int # check if extraround has started (usually not) ls.extraround[first_turn - 1] = Bool(game_state.remaining_extra_turns < game_state.num_players) - ls.dummyturn[first_turn -1] = Bool(False) + ls.dummyturn[first_turn - 1] = Bool(False) # set recent draws: important to model progress # we just pretend that the last card drawn was in fact drawn last turn, @@ -169,7 +173,6 @@ def solve_sat(starting_state: GameState | HanabiInstance, min_pace: Optional[int for m in range(first_turn, instance.max_winning_moves): ls.draw[m][game_state.progress - 1] = Bool(False) - # model initial progress for s in range(0, game_state.num_suits): for r in range(0, 6): @@ -195,20 +198,24 @@ def solve_sat(starting_state: GameState | HanabiInstance, min_pace: Optional[int Implies(ls.play[m], ls.discard_any[m]), # definition of ls.play5 - Iff(ls.play5[m], And(ls.play[m], Or(ls.discard[m][i] for i in range(instance.deck_size) if instance.deck[i].rank == 5))), + Iff(ls.play5[m], + And(ls.play[m], Or(ls.discard[m][i] for i in range(instance.deck_size) if instance.deck[i].rank == 5))), # definition of ls.incr_clues - Iff(ls.incr_clues[m], And(ls.discard_any[m], NotEquals(ls.clues[m-1], Int(16 if instance.clue_starved else 8)), Implies(ls.play[m], ls.play5[m]))), + Iff(ls.incr_clues[m], + And(ls.discard_any[m], NotEquals(ls.clues[m - 1], Int(16 if instance.clue_starved else 8)), + Implies(ls.play[m], ls.play5[m]))), # change of ls.clues Implies(And(Not(ls.discard_any[m]), Not(ls.dummyturn[m])), - Equals(ls.clues[m], ls.clues[m-1] - (2 if instance.clue_starved else 1))), - Implies(ls.incr_clues[m], Equals(ls.clues[m], ls.clues[m-1] + 1)), - Implies(And(Or(ls.discard_any[m], ls.dummyturn[m]), Not(ls.incr_clues[m])), Equals(ls.clues[m], ls.clues[m-1])), + Equals(ls.clues[m], ls.clues[m - 1] - (2 if instance.clue_starved else 1))), + Implies(ls.incr_clues[m], Equals(ls.clues[m], ls.clues[m - 1] + 1)), + Implies(And(Or(ls.discard_any[m], ls.dummyturn[m]), Not(ls.incr_clues[m])), + Equals(ls.clues[m], ls.clues[m - 1])), # change of pace - Implies(And(ls.discard_any[m], Or(ls.strike[m], Not(ls.play[m]))), Equals(ls.pace[m], ls.pace[m-1] - 1)), - Implies(Or(Not(ls.discard_any[m]), And(Not(ls.strike[m]), ls.play[m])), Equals(ls.pace[m], ls.pace[m-1])), + Implies(And(ls.discard_any[m], Or(ls.strike[m], Not(ls.play[m]))), Equals(ls.pace[m], ls.pace[m - 1] - 1)), + Implies(Or(Not(ls.discard_any[m]), And(Not(ls.strike[m]), ls.play[m])), Equals(ls.pace[m], ls.pace[m - 1])), # pace is nonnegative GE(ls.pace[m], Int(min_pace)), @@ -218,85 +225,95 @@ def solve_sat(starting_state: GameState | HanabiInstance, min_pace: Optional[int # It's easy to see that if there is any solution to the instance, then there is also one where we only strike at 8 clues # (or not at all) -> Just strike later if neccessary # So, we decrease the solution space with this formulation, but do not change whether it's empty or not - Iff(ls.strike[m], And(ls.discard_any[m], Not(ls.play[m]), Equals(ls.clues[m-1], Int(16 if instance.clue_starved else 8)))), + Iff(ls.strike[m], + And(ls.discard_any[m], Not(ls.play[m]), Equals(ls.clues[m - 1], Int(16 if instance.clue_starved else 8)))), # change of strikes - *[Iff(ls.strikes[m][i], Or(ls.strikes[m-1][i], And(ls.strikes[m-1][i-1], ls.strike[m]))) for i in range(1, instance.num_strikes + 1)], + *[Iff(ls.strikes[m][i], Or(ls.strikes[m - 1][i], And(ls.strikes[m - 1][i - 1], ls.strike[m]))) for i in + range(1, instance.num_strikes + 1)], # less than 0 clues not allowed - Implies(Not(ls.discard_any[m]), Or(GE(ls.clues[m-1], Int(1)), ls.dummyturn[m])), + Implies(Not(ls.discard_any[m]), Or(GE(ls.clues[m - 1], Int(1)), ls.dummyturn[m])), # we can only draw card i if the last ls.drawn card was i-1 - *[Implies(ls.draw[m][i], Or(And(ls.draw[m0][i-1], *[Not(ls.draw_any[m1]) for m1 in range(m0+1, m)]) for m0 in range(max(first_turn - 1, m-9), m))) for i in range(game_state.progress, instance.deck_size)], + *[Implies(ls.draw[m][i], Or( + And(ls.draw[m0][i - 1], *[Not(ls.draw_any[m1]) for m1 in range(m0 + 1, m)]) for m0 in + range(max(first_turn - 1, m - 9), m))) for i in range(game_state.progress, instance.deck_size)], # we can only draw at most one card (NOTE: redundant, FIXME: avoid quadratic formula) AtMostOne(ls.draw[m][i] for i in range(game_state.progress, instance.deck_size)), # we can only discard a card if we drew it earlier... - *[Implies(ls.discard[m][i], Or(ls.draw[m0][i] for m0 in range(m-instance.num_players, first_turn - 1, -instance.num_players))) for i in range(game_state.progress, instance.deck_size)], + *[Implies(ls.discard[m][i], + Or(ls.draw[m0][i] for m0 in range(m - instance.num_players, first_turn - 1, -instance.num_players))) + for i in range(game_state.progress, instance.deck_size)], # ...or if it was part of the initial hand - *[Not(ls.discard[m][i]) for i in range(0, game_state.progress) if i not in starting_hands[m % instance.num_players] ], + *[Not(ls.discard[m][i]) for i in range(0, game_state.progress) if + i not in starting_hands[m % instance.num_players]], # we can only discard a card if we did not discard it yet - *[Implies(ls.discard[m][i], And(Not(ls.discard[m0][i]) for m0 in range(m-instance.num_players, first_turn - 1, -instance.num_players))) for i in range(instance.deck_size)], + *[Implies(ls.discard[m][i], And( + Not(ls.discard[m0][i]) for m0 in range(m - instance.num_players, first_turn - 1, -instance.num_players))) + for i in range(instance.deck_size)], # we can only discard at most one card (FIXME: avoid quadratic formula) AtMostOne(ls.discard[m][i] for i in range(instance.deck_size)), # we can only play a card if it matches the progress *[Implies( - And(ls.discard[m][i], ls.play[m]), - And( - Not(ls.progress[m-1][instance.deck[i].suitIndex, instance.deck[i].rank]), - ls.progress[m-1][instance.deck[i].suitIndex, instance.deck[i].rank-1 ] - ) - ) - for i in range(instance.deck_size) - ], + And(ls.discard[m][i], ls.play[m]), + And( + Not(ls.progress[m - 1][instance.deck[i].suitIndex, instance.deck[i].rank]), + ls.progress[m - 1][instance.deck[i].suitIndex, instance.deck[i].rank - 1] + ) + ) + for i in range(instance.deck_size) + ], # change of progress *[ Iff( ls.progress[m][s, r], Or( - ls.progress[m-1][s, r], + ls.progress[m - 1][s, r], And(ls.play[m], Or(ls.discard[m][i] - for i in range(0, instance.deck_size) - if instance.deck[i] == DeckCard(s, r) )) - ) + for i in range(0, instance.deck_size) + if instance.deck[i] == hanab_game.DeckCard(s, r))) ) + ) for s in range(0, instance.num_suits) for r in range(1, 6) - ], + ], # extra round bool - Iff(ls.extraround[m], Or(ls.extraround[m-1], ls.draw[m-1][instance.deck_size-1])), + Iff(ls.extraround[m], Or(ls.extraround[m - 1], ls.draw[m - 1][instance.deck_size - 1])), # dummy turn bool - *[Iff(ls.dummyturn[m], Or(ls.dummyturn[m-1], ls.draw[m-1 - instance.num_players][instance.deck_size-1])) for i in range(0,1) if m >= instance.num_players] + *[Iff(ls.dummyturn[m], Or(ls.dummyturn[m - 1], ls.draw[m - 1 - instance.num_players][instance.deck_size - 1])) + for i in range(0, 1) if m >= instance.num_players] ) win = And( # maximum progress at each color - *[ls.progress[instance.max_winning_moves-1][s, 5] for s in range(0, instance.num_suits)], + *[ls.progress[instance.max_winning_moves - 1][s, 5] for s in range(0, instance.num_suits)], # played every color/value combination (NOTE: redundant, but makes solving faster) *[ Or( - And(ls.discard[m][i], ls.play[m]) - for m in range(first_turn, instance.max_winning_moves) - for i in range(instance.deck_size) - if game_state.deck[i] == DeckCard(s, r) - ) - for s in range(0, instance.num_suits) - for r in range(1, 6) - if r > game_state.stacks[s] - ] + And(ls.discard[m][i], ls.play[m]) + for m in range(first_turn, instance.max_winning_moves) + for i in range(instance.deck_size) + if game_state.deck[i] == hanab_game.DeckCard(s, r) + ) + for s in range(0, instance.num_suits) + for r in range(1, 6) + if r > game_state.stacks[s] + ] ) constraints = And(*[valid_move(m) for m in range(first_turn, instance.max_winning_moves)], win) -# print('Solving instance with {} variables, {} nodes'.format(len(get_atoms(constraints)), get_formula_size(constraints))) + # print('Solving instance with {} variables, {} nodes'.format(len(get_atoms(constraints)), get_formula_size(constraints))) model = get_model(constraints) if model: @@ -304,11 +321,11 @@ def solve_sat(starting_state: GameState | HanabiInstance, min_pace: Optional[int solution = evaluate_model(model, copy.deepcopy(game_state), ls) return True, solution else: - #conj = list(conjunctive_partition(constraints)) - #print('statements: {}'.format(len(conj))) - #ucore = get_unsat_core(conj) - #print('unsat core size: {}'.format(len(ucore))) - #for f in ucore: + # conj = list(conjunctive_partition(constraints)) + # print('statements: {}'.format(len(conj))) + # ucore = get_unsat_core(conj) + # print('unsat core size: {}'.format(len(ucore))) + # for f in ucore: # print(f.serialize()) return False, None @@ -322,24 +339,29 @@ def log_model(model, cur_game_state, ls: Literals): logger.debug('=== move {} ==='.format(m)) logger.debug('clues: {}'.format(model.get_py_value(ls.clues[m]))) logger.debug('strikes: ' + ''.join(str(i) for i in range(1, 3) if model.get_py_value(ls.strikes[m][i]))) - logger.debug('draw: ' + ', '.join('{}: {}'.format(i, deck[i]) for i in range(cur_game_state.progress, cur_game_state.instance.deck_size) if model.get_py_value(ls.draw[m][i]))) - logger.debug('discard: ' + ', '.join('{}: {}'.format(i, deck[i]) for i in range(cur_game_state.instance.deck_size) if model.get_py_value(ls.discard[m][i]))) + logger.debug('draw: ' + ', '.join( + '{}: {}'.format(i, deck[i]) for i in range(cur_game_state.progress, cur_game_state.instance.deck_size) if + model.get_py_value(ls.draw[m][i]))) + logger.debug('discard: ' + ', '.join( + '{}: {}'.format(i, deck[i]) for i in range(cur_game_state.instance.deck_size) if + model.get_py_value(ls.discard[m][i]))) logger.debug('pace: {}'.format(model.get_py_value(ls.pace[m]))) for s in range(0, cur_game_state.instance.num_suits): - logger.debug('progress {}: '.format(COLOR_INITIALS[s]) + ''.join(str(r) for r in range(1, 6) if model.get_py_value(ls.progress[m][s, r]))) + logger.debug('progress {}: '.format(constants.COLOR_INITIALS[s]) + ''.join( + str(r) for r in range(1, 6) if model.get_py_value(ls.progress[m][s, r]))) flags = ['discard_any', 'draw_any', 'play', 'play5', 'incr_clues', 'strike', 'extraround', 'dummyturn'] logger.debug(', '.join(f for f in flags if model.get_py_value(getattr(ls, f)[m]))) - # given the initial game state and the model found by the SAT solver, # evaluates the model to produce a full game history -def evaluate_model(model, cur_game_state: GameState, ls: Literals) -> GameState: +def evaluate_model(model, cur_game_state: hanab_game.GameState, ls: Literals) -> hanab_game.GameState: for m in range(len(cur_game_state.actions), cur_game_state.instance.max_winning_moves): if model.get_py_value(ls.dummyturn[m]) or cur_game_state.is_over(): break if model.get_py_value(ls.discard_any[m]): - card_idx = next(i for i in range(0, cur_game_state.instance.deck_size) if model.get_py_value(ls.discard[m][i])) + card_idx = next( + i for i in range(0, cur_game_state.instance.deck_size) if model.get_py_value(ls.discard[m][i])) if model.get_py_value(ls.play[m]) or model.get_py_value(ls.strike[m]): cur_game_state.play(card_idx) else: @@ -348,39 +370,3 @@ def evaluate_model(model, cur_game_state: GameState, ls: Literals) -> GameState: cur_game_state.clue() return cur_game_state - - - -def run_deck(): - puzzle = True - if puzzle: - deck_str = 'p5 p3 b4 r5 y4 y4 y5 r4 b2 y2 y3 g5 g2 g3 g4 p4 r3 b2 b3 b3 p4 b1 p2 b1 b1 p2 p1 p1 g1 r4 g1 r1 r3 r1 g1 r1 p1 b4 p3 g2 g3 g4 b5 y1 y1 y1 r2 r2 y2 y3' - - deck = [DeckCard(COLOR_INITIALS.index(c[0]), int(c[1])) for c in deck_str.split(" ")] - num_p = 5 - else: - deck_str = "15gfvqluvuwaqnmrkpkaignlaxpjbmsprksfcddeybfixchuhtwo" - deck_str = "15diuknfwhqbplsrlkxjuvfbwyacoaxgtudcerskqfnhpgampmiv" - deck_str = "15jdxlpobvikrnhkslcuwggimtphafquqfvcwadampxkeyfrbnsu" - deck = decompress_deck(deck_str) - num_p = 6 - - print(deck) - - gs = GameState(HanabiInstance(deck, num_p)) - if puzzle: - gs.play(2) - else: - strat = GreedyStrategy(gs) - for _ in range(17): - strat.make_move() - - solvable, sol = solve_sat(gs, 0) - if solvable: - print(sol) - print(link(sol)) - else: - print('unsolvable') - -if __name__ == "__main__": - run_deck() diff --git a/test.py b/test.py index e6c212d..cd0b40d 100644 --- a/test.py +++ b/test.py @@ -1,18 +1,9 @@ -import json - -import alive_progress -import requests - -from variants import Variant -from variants import Suit, variant_name -from site_api import * -from download_data import download_games, detailed_export_game -from check_game import check_game -from compress import link -from database.database import conn, cur - -from database.init_database import init_database_tables, populate_static_tables +from hanabi.live.variants import Variant +from hanabi.live.variants import Suit +from hanabi.live.download_data import download_games, detailed_export_game +from hanabi.database.database import conn, cur +from hanabi.hanabi_cli import hanabi_cli def find_double_dark_games(): cur.execute("SELECT variants.id, variants.name, count(suits.id) from variants " @@ -74,6 +65,8 @@ def export_all_seeds(): if __name__ == "__main__": + hanabi_cli() + exit(0) find_double_dark_games() exit(0) var_id = 964532