Refactor imports, remove code in imported files

We now only use relative imports for files in the same directory
Also, only modules are imported, never classes/functions etc
Furthermore, main methods in package files have been removed,
since they do not belong there
This commit is contained in:
Maximilian Keßler 2023-07-04 20:06:06 +02:00
parent 6ae72a4b03
commit a93601c997
Signed by: max
GPG key ID: BCC5A619923C0BA5
16 changed files with 441 additions and 511 deletions

View file

@ -12,7 +12,7 @@ cur = conn.cursor()
# populate_static_tables() # populate_static_tables()
class Game(): class Game:
def __init__(self, info=None): def __init__(self, info=None):
self.id = -1 self.id = -1
self.num_players = -1 self.num_players = -1

View file

@ -1,6 +1,6 @@
/* Database schema for the tables storing information on available hanab.live variants, suits and colors */ /* Database schema for the tables storing information on available hanab.live variants, suits and colors */
/* Available suits. The associated id is arbitrary upon initial generation, but fixed for referentiability */ /* Available suits. The associated id is arbitrary upon initial generation, but fixed afterwards for identification */
DROP TABLE IF EXISTS suits CASCADE; DROP TABLE IF EXISTS suits CASCADE;
CREATE TABLE suits ( CREATE TABLE suits (
id SERIAL PRIMARY KEY, id SERIAL PRIMARY KEY,
@ -27,7 +27,7 @@ CREATE TABLE suits (
); );
CREATE INDEX suits_name_idx ON suits (name); CREATE INDEX suits_name_idx ON suits (name);
/* Available color clues. The indexing is arbitrary upon initial generation, but fixed for referentiability */ /* Available color clues. The indexing is arbitrary upon initial generation, but fixed afterwards for identification */
DROP TABLE IF EXISTS colors CASCADE; DROP TABLE IF EXISTS colors CASCADE;
CREATE TABLE colors ( CREATE TABLE colors (
id SERIAL PRIMARY KEY, id SERIAL PRIMARY KEY,
@ -99,7 +99,7 @@ CREATE TABLE variants (
*/ */
special_rank_ranks SMALLINT NOT NULL DEFAULT 1, special_rank_ranks SMALLINT NOT NULL DEFAULT 1,
/** /**
Encodes how cards of the special rank (if present) are touched by colorss, Encodes how cards of the special rank (if present) are touched by colors,
in the same manner how we encoded in @table suits in the same manner how we encoded in @table suits
*/ */
special_rank_colors SMALLINT NOT NULL DEFAULT 1, special_rank_colors SMALLINT NOT NULL DEFAULT 1,

View file

@ -1,4 +1,4 @@
from typing import Optional, List from typing import Optional, List, Generator
from enum import Enum from enum import Enum
from termcolor import colored from termcolor import colored
@ -30,7 +30,7 @@ class DeckCard:
return 1000 * self.suitIndex + self.rank return 1000 * self.suitIndex + self.rank
def pp_deck(deck: List[DeckCard]) -> str: def pp_deck(deck: Generator[DeckCard, None, None]) -> str:
return "[" + ", ".join(card.colorize() for card in deck) + "]" return "[" + ", ".join(card.colorize() for card in deck) + "]"

View file

@ -4,10 +4,10 @@ import argparse
import verboselogs import verboselogs
from hanabi import logger from hanabi import logger, logger_manager
from hanabi.live.check_game import check_game from hanabi.live import check_game
from hanabi.live.download_data import detailed_export_game from hanabi.live import download_data
from hanabi.live.compress import link from hanabi.live import compress
""" """
init db + populate tables init db + populate tables
@ -39,16 +39,16 @@ def add_analyze_subparser(subparsers):
def analyze_game(game_id: int, download: bool = False): def analyze_game(game_id: int, download: bool = False):
if download: if download:
detailed_export_game(game_id) download_data.detailed_export_game(game_id)
logger.info('Analyzing game {}'.format(game_id)) logger.info('Analyzing game {}'.format(game_id))
turn, sol = check_game(game_id) turn, sol = check_game.check_game(game_id)
if turn == 0: if turn == 0:
logger.info('Instance is unfeasible') logger.info('Instance is unfeasible')
else: else:
logger.info('Game was first lost after {} turns.'.format(turn)) logger.info('Game was first lost after {} turns.'.format(turn))
logger.info( logger.info(
'A replay achieving perfect score from the previous turn onwards is: {}#{}' 'A replay achieving perfect score from the previous turn onwards is: {}#{}'
.format(link(sol), turn) .format(compress.link(sol), turn)
) )
@ -66,8 +66,7 @@ def main_parser() -> argparse.ArgumentParser:
return parser return parser
def hanabi_cli():
if __name__ == "__main__":
args = main_parser().parse_args() args = main_parser().parse_args()
switcher = { switcher = {
'analyze': analyze_game 'analyze': analyze_game
@ -78,3 +77,7 @@ if __name__ == "__main__":
method_args.pop('command') method_args.pop('command')
method_args.pop('verbose') method_args.pop('verbose')
switcher[args.command](**method_args) switcher[args.command](**method_args)
if __name__ == "__main__":
hanabi_cli()

View file

@ -1,12 +1,12 @@
import copy import copy
from typing import Tuple from typing import Tuple
from hanabi.database import conn
from hanabi.live.compress import decompress_deck, decompress_actions, link
from hanabi.game import GameState
from hanabi.live.hanab_live import HanabLiveInstance, HanabLiveGameState
from hanabi.solvers.sat import solve_sat
from hanabi import logger from hanabi import logger
from hanabi import database
from hanabi import hanab_game
from hanabi.live import hanab_live
from hanabi.live import compress
from hanabi.solvers import sat
# returns minimal number T of turns (from game) after which instance was infeasible # returns minimal number T of turns (from game) after which instance was infeasible
@ -16,9 +16,9 @@ from hanabi import logger
# returns 1 if instance is feasible but first turn is suboptimal # returns 1 if instance is feasible but first turn is suboptimal
# ... # ...
# # turns + 1 if the final state is still winning # # turns + 1 if the final state is still winning
def check_game(game_id: int) -> Tuple[int, GameState]: def check_game(game_id: int) -> Tuple[int, hanab_game.GameState]:
logger.debug("Analysing game {}".format(game_id)) logger.debug("Analysing game {}".format(game_id))
with conn.cursor() as cur: with database.conn.cursor() as cur:
cur.execute("SELECT games.num_players, deck, actions, score, games.variant_id FROM games " cur.execute("SELECT games.num_players, deck, actions, score, games.variant_id FROM games "
"INNER JOIN seeds ON seeds.seed = games.seed " "INNER JOIN seeds ON seeds.seed = games.seed "
"WHERE games.id = (%s)", "WHERE games.id = (%s)",
@ -28,25 +28,25 @@ def check_game(game_id: int) -> Tuple[int, GameState]:
if res is None: if res is None:
raise ValueError("No game associated with id {} in database.".format(game_id)) raise ValueError("No game associated with id {} in database.".format(game_id))
(num_players, compressed_deck, compressed_actions, score, variant_id) = res (num_players, compressed_deck, compressed_actions, score, variant_id) = res
deck = decompress_deck(compressed_deck) deck = compress.decompress_deck(compressed_deck)
actions = decompress_actions(compressed_actions) actions = compress.decompress_actions(compressed_actions)
instance = HanabLiveInstance(deck, num_players, variant_id=variant_id) instance = hanab_live.HanabLiveInstance(deck, num_players, variant_id=variant_id)
# check if the instance is already won # check if the instance is already won
if instance.max_score == score: if instance.max_score == score:
game = HanabLiveGameState(instance) game = hanab_live.HanabLiveGameState(instance)
for action in actions: for action in actions:
game.make_action(action) game.make_action(action)
# instance has been won, nothing to compute here # instance has been won, nothing to compute here
return len(actions) + 1, game return len(actions) + 1, game
# first, check if the instance itself is feasible: # first, check if the instance itself is feasible:
game = HanabLiveGameState(instance) game = hanab_live.HanabLiveGameState(instance)
solvable, solution = solve_sat(game) solvable, solution = sat.solve_sat(game)
if not solvable: if not solvable:
return 0, solution return 0, solution
logger.verbose("Instance {} is feasible after 0 turns: {}".format(game_id, link(solution))) logger.verbose("Instance {} is feasible after 0 turns: {}".format(game_id, compress.link(solution)))
# store lower and upper bounds of numbers of turns after which we know the game was feasible / infeasible # store lower and upper bounds of numbers of turns after which we know the game was feasible / infeasible
solvable_turn = 0 solvable_turn = 0
@ -59,13 +59,13 @@ def check_game(game_id: int) -> Tuple[int, GameState]:
for a in range(solvable_turn, try_turn): for a in range(solvable_turn, try_turn):
try_game.make_action(actions[a]) try_game.make_action(actions[a])
logger.debug("Checking if instance {} is feasible after {} turns.".format(game_id, try_turn)) logger.debug("Checking if instance {} is feasible after {} turns.".format(game_id, try_turn))
solvable, potential_sol = solve_sat(try_game) solvable, potential_sol = sat.solve_sat(try_game)
if solvable: if solvable:
solution = potential_sol solution = potential_sol
game = try_game game = try_game
solvable_turn = try_turn solvable_turn = try_turn
logger.verbose("Instance {} is feasible after {} turns: {}#{}" logger.verbose("Instance {} is feasible after {} turns: {}#{}"
.format(game_id, solvable_turn, link(solution), solvable_turn + 1)) .format(game_id, solvable_turn, compress.link(solution), solvable_turn + 1))
else: else:
unsolvable_turn = try_turn unsolvable_turn = try_turn
logger.verbose("Instance {} is not feasible after {} turns.".format(game_id, unsolvable_turn)) logger.verbose("Instance {} is not feasible after {} turns.".format(game_id, unsolvable_turn))

View file

@ -1,191 +1,190 @@
#! /bin/python3
import sys
import more_itertools
from typing import List, Union from typing import List, Union
from hanabi.game import DeckCard, ActionType, Action, GameState, HanabiInstance import more_itertools
from hanab_live import HanabLiveGameState, HanabLiveInstance
from hanabi import hanab_game
from hanabi.live import hanab_live
# use same BASE62 as on hanab.live to encode decks # use same BASE62 as on hanab.live to encode decks
BASE62 = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"; BASE62 = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"
# Helper method, iterate over chunks of length n in a string # Helper method, iterate over chunks of length n in a string
def chunks(s: str, n: int): def chunks(s: str, n: int):
for i in range(0, len(s), n): for i in range(0, len(s), n):
yield s[i:i+n] yield s[i:i + n]
# exception thrown by decompression methods if parsing fails # exception thrown by decompression methods if parsing fails
class InvalidFormatError(ValueError): class InvalidFormatError(ValueError):
pass pass
def compress_actions(actions: List[Action], game_id=None) -> str: def compress_actions(actions: List[hanab_game.Action]) -> str:
minType = 0 min_type = 0
maxType = 0 max_type = 0
if len(actions) != 0: if len(actions) != 0:
minType = min(map(lambda a: a.type.value, actions)) min_type = min(map(lambda a: a.type.value, actions))
maxType = max(map(lambda a: a.type.value, actions)) max_type = max(map(lambda a: a.type.value, actions))
typeRange = maxType - minType + 1 type_range = max_type - min_type + 1
def compress_action(action): def compress_action(action):
## We encode action values with +1 to differentiate # We encode action values with +1 to differentiate
# null (encoded 0) and 0 (encoded 1) # null (encoded 0) and 0 (encoded 1)
value = 0 if action.value is None else action.value + 1 value = 0 if action.value is None else action.value + 1
if action.type == ActionType.VoteTerminate: if action.type == hanab_game.ActionType.VoteTerminate:
# This is currently a hack, the actual format has a 10 here # This is currently a hack, the actual format has a 10 here,
# but we cannot encode this # but we cannot encode this
value = 0 value = 0
try: try:
a = BASE62[typeRange * value + (action.type.value - minType)] a = BASE62[type_range * value + (action.type.value - min_type)]
b = BASE62[action.target] b = BASE62[action.target]
except IndexError as e: except IndexError as e:
raise ValueError("Encoding action failed, value too large, found {}".format(value)) from e raise ValueError("Encoding action failed, value too large, found {}".format(value)) from e
return a + b return a + b
return "{}{}{}".format( return "{}{}{}".format(
minType, min_type,
maxType, max_type,
''.join(map(compress_action, actions)) ''.join(map(compress_action, actions))
) )
def decompress_actions(actions_str: str) -> List[Action]: def decompress_actions(actions_str: str) -> List[hanab_game.Action]:
if not len(actions_str) >= 2: if not len(actions_str) >= 2:
raise InvalidFormatError("min/max range not specified, found: {}".format(actions_str)) raise InvalidFormatError("min/max range not specified, found: {}".format(actions_str))
try: try:
minType = int(actions_str[0]) min_type = int(actions_str[0])
maxType = int(actions_str[1]) max_type = int(actions_str[1])
except ValueError as e: except ValueError as e:
raise InvalidFormatError( raise InvalidFormatError(
"min/max range of actions not specified, expected two integers, found {}".format(actions_str[:2]) "min/max range of actions not specified, expected two integers, found {}".format(actions_str[:2])
) from e ) from e
if not minType <= maxType: if not min_type <= max_type:
raise InvalidFormatError("min/max range illegal, found [{},{}]".format(minType, maxType)) raise InvalidFormatError("min/max range illegal, found [{},{}]".format(min_type, max_type))
typeRange = maxType - minType + 1 type_range = max_type - min_type + 1
if not len(actions_str) % 2 == 0: if not len(actions_str) % 2 == 0:
raise InvalidFormatError("Invalid action string length: Expected even number of characters") raise InvalidFormatError("Invalid action string length: Expected even number of characters")
for (index, char) in enumerate(actions_str[2:]): for (index, char) in enumerate(actions_str[2:]):
if not char in BASE62: if char not in BASE62:
raise InvalidFormatError( raise InvalidFormatError(
"Invalid character at index {}: Found {}, expected one of {}".format( "Invalid character at index {}: Found {}, expected one of {}".format(
index, char, BASE62 index, char, BASE62
) )
) )
def decompress_action(index, action): def decompress_action(action_idx: int, action: str):
try: try:
action_type_value = (BASE62.index(action[0]) % typeRange) + minType action_type_value = (BASE62.index(action[0]) % type_range) + min_type
action_type = ActionType(action_type_value) action_type = hanab_game.ActionType(action_type_value)
except ValueError as e: except ValueError as e:
raise InvalidFormatError( raise InvalidFormatError(
"Invalid action type at action {}: Found {}, expected one of {}".format( "Invalid action type at action {}: Found {}, expected one of {}".format(
index, action_type_value, action_idx, action_type_value,
[action_type.value for action_type in ActionType] [action_type.value for action_type in hanab_game.ActionType]
) )
) from e ) from e
## We encode values with +1 to differentiate null (encoded 0) and 0 (encoded 1)
value = BASE62.index(action[0]) // typeRange - 1 # We encode values with +1 to differentiate null (encoded 0) and 0 (encoded 1)
value = BASE62.index(action[0]) // type_range - 1
if value == -1: if value == -1:
value = None value = None
if action_type in [ActionType.Play, ActionType.Discard]: if action_type in [hanab_game.ActionType.Play, hanab_game.ActionType.Discard]:
if value is not None: if value is not None:
raise InvalidFormatError( raise InvalidFormatError(
"Invalid action value: Action at action index {} is Play/Discard, expected value None, found: {}".format(index, value) "Invalid action value: Action at action index {} is Play/Discard, expected value None, "
"found: {}".format(action_idx, value)
) )
target = BASE62.index(action[1]) target = BASE62.index(action[1])
return Action(action_type, target, value) return hanab_game.Action(action_type, target, value)
return [decompress_action(idx, a) for (idx, a) in enumerate(chunks(actions_str[2:], 2))] return [decompress_action(idx, a) for (idx, a) in enumerate(chunks(actions_str[2:], 2))]
def compress_deck(deck: List[DeckCard]) -> str: def compress_deck(deck: List[hanab_game.DeckCard]) -> str:
assert(len(deck) != 0) assert (len(deck) != 0)
minRank = min(map(lambda c: c.rank, deck)) min_rank = min(map(lambda card: card.rank, deck))
maxRank = max(map(lambda c: c.rank, deck)) max_rank = max(map(lambda card: card.rank, deck))
rankRange = maxRank - minRank + 1 rank_range = max_rank - min_rank + 1
def compress_card(card): def compress_card(card):
try: try:
return BASE62[rankRange * card.suitIndex + (card.rank - minRank)] return BASE62[rank_range * card.suitIndex + (card.rank - min_rank)]
except IndexError as e: except IndexError as e:
raise InvalidFormatError( raise InvalidFormatError(
"Could not compress card, suit or rank too large. Found: {}".format(card) "Could not compress card, suit or rank too large. Found: {}".format(card)
) from e ) from e
return "{}{}{}".format( return "{}{}{}".format(
minRank, min_rank,
maxRank, max_rank,
''.join(map(compress_card, deck)) ''.join(map(compress_card, deck))
) )
def decompress_deck(deck_str: str) -> List[DeckCard]: def decompress_deck(deck_str: str) -> List[hanab_game.DeckCard]:
if len(deck_str) < 2: if len(deck_str) < 2:
raise InvalidFormatError("min/max rank range not specified, found: {}".format(deck_str)) raise InvalidFormatError("min/max rank range not specified, found: {}".format(deck_str))
try: try:
minRank = int(deck_str[0]) min_rank = int(deck_str[0])
maxRank = int(deck_str[1]) max_rank = int(deck_str[1])
except ValueError as e: except ValueError as e:
raise InvalidFormatError( raise InvalidFormatError(
"min/max rank range not specified, expected two integers, found {}".format(deck_str[:2]) "min/max rank range not specified, expected two integers, found {}".format(deck_str[:2])
) from e ) from e
if not maxRank >= minRank: if not max_rank >= min_rank:
raise InvalidFormatError( raise InvalidFormatError(
"Invalid rank range, found [{},{}]".format(minRank, maxRank) "Invalid rank range, found [{},{}]".format(min_rank, max_rank)
) )
rankRange = maxRank - minRank + 1 rank_range = max_rank - min_rank + 1
for (index, char) in enumerate(deck_str[2:]): for (index, char) in enumerate(deck_str[2:]):
if not char in BASE62: if char not in BASE62:
raise InvalidFormatError( raise InvalidFormatError(
"Invalid character at index {}: Found {}, expected one of {}".format( "Invalid character at index {}: Found {}, expected one of {}".format(
index, char, BASE62 index, char, BASE62
) )
) )
def decompress_card(card_char): def decompress_card(card_char):
index = BASE62.index(card_char) encoded = BASE62.index(card_char)
suitIndex = index // rankRange suit_index = encoded // rank_range
rank = index % rankRange + minRank rank = encoded % rank_range + min_rank
return DeckCard(suitIndex, rank) return hanab_game.DeckCard(suit_index, rank)
return [decompress_card(c) for c in deck_str[2:]] return [decompress_card(card) for card in deck_str[2:]]
# compresses a standard GameState object into hanab.live format # compresses a standard GameState object into hanab.live format
# which can be used in json replay links # which can be used in json replay links
# The GameState object has to be standard / fitting hanab.live variants, # The GameState object has to be standard / fitting hanab.live variants,
# otherwise compression is not possible # otherwise compression is not possible
def compress_game_state(state: Union[GameState, HanabLiveGameState]) -> str: def compress_game_state(state: Union[hanab_game.GameState, hanab_live.HanabLiveGameState]) -> str:
var_id = -1 if isinstance(state, hanab_live.HanabLiveGameState):
if isinstance(state, HanabLiveGameState):
var_id = state.instance.variant_id var_id = state.instance.variant_id
else: else:
assert isinstance(state, GameState) assert isinstance(state, hanab_game.GameState)
var_id = HanabLiveInstance.select_standard_variant_id(state.instance) var_id = hanab_live.HanabLiveInstance.select_standard_variant_id(state.instance)
out = "{}{},{},{}".format( out = "{}{},{},{}".format(
state.instance.num_players, state.instance.num_players,
compress_deck(state.instance.deck), compress_deck(state.instance.deck),
compress_actions(state.actions), compress_actions(state.actions),
var_id var_id
) )
with_dashes = ''.join(more_itertools.intersperse("-", out, 20)) with_dashes = ''.join(more_itertools.intersperse("-", out, 20))
return with_dashes return with_dashes
def decompress_game_state(game_str: str) -> GameState: def decompress_game_state(game_str: str) -> hanab_game.GameState:
game_str = game_str.replace("-", "") game_str = game_str.replace("-", "")
parts = game_str.split(",") parts = game_str.split(",")
if not len(parts) == 3: if not len(parts) == 3:
raise InvalidFormatError( raise InvalidFormatError(
"Expected 3 comma-separated parts of game, found {}".format( "Expected 3 comma-separated parts of game, found {}".format(
len(parts) len(parts)
) )
) )
[players_deck, actions, variant_id] = parts [players_deck, actions, variant_id] = parts
if len(players_deck) == 0: if len(players_deck) == 0:
@ -212,35 +211,14 @@ def decompress_game_state(game_str: str) -> GameState:
except ValueError: except ValueError:
raise ValueError("Expected variant id, found: {}".format(variant_id)) raise ValueError("Expected variant id, found: {}".format(variant_id))
instance = HanabiInstance(deck, num_players) instance = hanab_game.HanabiInstance(deck, num_players)
game = GameState(instance) game = hanab_game.GameState(instance)
# TODO: game is not in consistent state # TODO: game is not in consistent state
game.actions = actions game.actions = actions
return game return game
def link(game_state: GameState) -> str: def link(game_state: hanab_game.GameState) -> str:
compressed = compress_game_state(game_state) compressed = compress_game_state(game_state)
return "https://hanab.live/shared-replay-json/{}".format(compressed) return "https://hanab.live/shared-replay-json/{}".format(compressed)
# add link method to GameState class
GameState.link = link
if __name__ == "__main__":
for arg in sys.argv[1:]:
deck = decompress_deck(arg)
c = compress_deck(deck)
assert(c == arg)
print(deck)
inst = HanabiInstance(deck, 5, variant_id = 32)
game = GameState(inst)
game.play(1)
game.play(5)
game.clue()
print(game.link())

View file

@ -3,11 +3,12 @@ from typing import Dict, Optional
import psycopg2.errors import psycopg2.errors
from hanabi.live.site_api import get, api from hanabi import hanab_game
from hanabi.database.database import conn, cur from hanabi.database import database
from hanabi.live.compress import compress_deck, compress_actions, DeckCard, Action, InvalidFormatError from hanabi.live import site_api
from hanabi.live.variants import variant_id, variant_name from hanabi.live import compress
from hanab_live import HanabLiveInstance, HanabLiveGameState from hanabi.live import variants
from hanabi.live import hanab_live
from hanabi import logger from hanabi import logger
@ -30,29 +31,29 @@ def detailed_export_game(game_id: int, score: Optional[int] = None, var_id: Opti
assert_msg = "Invalid response format from hanab.live while exporting game id {}".format(game_id) assert_msg = "Invalid response format from hanab.live while exporting game id {}".format(game_id)
game_json = get("export/{}".format(game_id)) game_json = site_api.get("export/{}".format(game_id))
assert game_json.get('id') == game_id, assert_msg assert game_json.get('id') == game_id, assert_msg
players = game_json.get('players', []) players = game_json.get('players', [])
num_players = len(players) num_players = len(players)
seed = game_json.get('seed', None) seed = game_json.get('seed', None)
options = game_json.get('options', {}) options = game_json.get('options', {})
var_id = var_id or variant_id(options.get('variant', 'No Variant')) var_id = var_id or variants.variant_id(options.get('variant', 'No Variant'))
deck_plays = options.get('deckPlays', False) deck_plays = options.get('deckPlays', False)
one_extra_card = options.get('oneExtraCard', False) one_extra_card = options.get('oneExtraCard', False)
one_less_card = options.get('oneLessCard', False) one_less_card = options.get('oneLessCard', False)
all_or_nothing = options.get('allOrNothing', False) all_or_nothing = options.get('allOrNothing', False)
starting_player = options.get('startingPlayer', 0) starting_player = options.get('startingPlayer', 0)
actions = [Action.from_json(action) for action in game_json.get('actions', [])] actions = [hanab_game.Action.from_json(action) for action in game_json.get('actions', [])]
deck = [DeckCard.from_json(card) for card in game_json.get('deck', None)] deck = [hanab_game.DeckCard.from_json(card) for card in game_json.get('deck', None)]
assert players != [], assert_msg assert players != [], assert_msg
assert seed is not None, assert_msg assert seed is not None, assert_msg
if score is None: if score is None:
# need to play through the game once to find out its score # need to play through the game once to find out its score
game = HanabLiveGameState( game = hanab_live.HanabLiveGameState(
HanabLiveInstance( hanab_live.HanabLiveInstance(
deck, num_players, var_id, deck, num_players, var_id,
deck_plays=deck_plays, deck_plays=deck_plays,
one_less_card=one_less_card, one_less_card=one_less_card,
@ -67,18 +68,18 @@ def detailed_export_game(game_id: int, score: Optional[int] = None, var_id: Opti
score = game.score score = game.score
try: try:
compressed_deck = compress_deck(deck) compressed_deck = compress.compress_deck(deck)
except InvalidFormatError: except compress.InvalidFormatError:
logger.error("Failed to compress deck while exporting game {}: {}".format(game_id, deck)) logger.error("Failed to compress deck while exporting game {}: {}".format(game_id, deck))
raise raise
try: try:
compressed_actions = compress_actions(actions) compressed_actions = compress.compress_actions(actions)
except InvalidFormatError: except compress.InvalidFormatError:
logger.error("Failed to compress actions while exporting game {}".format(game_id)) logger.error("Failed to compress actions while exporting game {}".format(game_id))
raise raise
if not seed_exists: if not seed_exists:
cur.execute( database.cur.execute(
"INSERT INTO seeds (seed, num_players, variant_id, deck)" "INSERT INTO seeds (seed, num_players, variant_id, deck)"
"VALUES (%s, %s, %s, %s)" "VALUES (%s, %s, %s, %s)"
"ON CONFLICT (seed) DO NOTHING", "ON CONFLICT (seed) DO NOTHING",
@ -86,7 +87,7 @@ def detailed_export_game(game_id: int, score: Optional[int] = None, var_id: Opti
) )
logger.debug("New seed {} imported.".format(seed)) logger.debug("New seed {} imported.".format(seed))
cur.execute( database.cur.execute(
"INSERT INTO games (" "INSERT INTO games ("
"id, num_players, starting_player, score, seed, variant_id, deck_plays, one_extra_card, one_less_card," "id, num_players, starting_player, score, seed, variant_id, deck_plays, one_extra_card, one_less_card,"
"all_or_nothing, actions" "all_or_nothing, actions"
@ -115,9 +116,9 @@ def process_game_row(game: Dict, var_id):
if any(v is None for v in [game_id, seed, num_players, score]): if any(v is None for v in [game_id, seed, num_players, score]):
raise ValueError("Unknown response format on hanab.live") raise ValueError("Unknown response format on hanab.live")
cur.execute("SAVEPOINT seed_insert") database.cur.execute("SAVEPOINT seed_insert")
try: try:
cur.execute( database.cur.execute(
"INSERT INTO games (id, seed, num_players, score, variant_id)" "INSERT INTO games (id, seed, num_players, score, variant_id)"
"VALUES" "VALUES"
"(%s, %s ,%s ,%s ,%s)" "(%s, %s ,%s ,%s ,%s)"
@ -125,20 +126,20 @@ def process_game_row(game: Dict, var_id):
(game_id, seed, num_players, score, var_id) (game_id, seed, num_players, score, var_id)
) )
except psycopg2.errors.ForeignKeyViolation: except psycopg2.errors.ForeignKeyViolation:
cur.execute("ROLLBACK TO seed_insert") database.cur.execute("ROLLBACK TO seed_insert")
detailed_export_game(game_id, score, var_id) detailed_export_game(game_id, score, var_id)
cur.execute("RELEASE seed_insert") database.cur.execute("RELEASE seed_insert")
logger.debug("Imported game {}".format(game_id)) logger.debug("Imported game {}".format(game_id))
def download_games(var_id): def download_games(var_id):
name = variant_name(var_id) name = variants.variant_name(var_id)
page_size = 100 page_size = 100
if name is None: if name is None:
raise ValueError("{} is not a known variant_id.".format(var_id)) raise ValueError("{} is not a known variant_id.".format(var_id))
url = "variants/{}".format(var_id) url = "variants/{}".format(var_id)
r = api(url, refresh=True) r = site_api.api(url, refresh=True)
if not r: if not r:
raise RuntimeError("Failed to download request from hanab.live") raise RuntimeError("Failed to download request from hanab.live")
@ -146,12 +147,12 @@ def download_games(var_id):
if num_entries is None: if num_entries is None:
raise ValueError("Unknown response format on hanab.live") raise ValueError("Unknown response format on hanab.live")
cur.execute( database.cur.execute(
"SELECT COUNT(*) FROM games WHERE variant_id = %s AND id <= " "SELECT COUNT(*) FROM games WHERE variant_id = %s AND id <= "
"(SELECT COALESCE (last_game_id, 0) FROM variant_game_downloads WHERE variant_id = %s)", "(SELECT COALESCE (last_game_id, 0) FROM variant_game_downloads WHERE variant_id = %s)",
(var_id, var_id) (var_id, var_id)
) )
num_already_downloaded_games = cur.fetchone()[0] num_already_downloaded_games = database.cur.fetchone()[0]
assert num_already_downloaded_games <= num_entries, "Database inconsistent, too many games present." assert num_already_downloaded_games <= num_entries, "Database inconsistent, too many games present."
next_page = num_already_downloaded_games // page_size next_page = num_already_downloaded_games // page_size
last_page = (num_entries - 1) // page_size last_page = (num_entries - 1) // page_size
@ -171,7 +172,7 @@ def download_games(var_id):
enrich_print=False enrich_print=False
) as bar: ) as bar:
for page in range(next_page, last_page + 1): for page in range(next_page, last_page + 1):
r = api(url + "?col[0]=0&page={}".format(page), refresh=page == last_page) r = site_api.api(url + "?col[0]=0&page={}".format(page), refresh=page == last_page)
rows = r.get('rows', []) rows = r.get('rows', [])
if page == next_page: if page == next_page:
rows = rows[num_already_downloaded_games % 100:] rows = rows[num_already_downloaded_games % 100:]
@ -180,11 +181,10 @@ def download_games(var_id):
for row in rows: for row in rows:
process_game_row(row, var_id) process_game_row(row, var_id)
bar() bar()
cur.execute( database.cur.execute(
"INSERT INTO variant_game_downloads (variant_id, last_game_id) VALUES" "INSERT INTO variant_game_downloads (variant_id, last_game_id) VALUES"
"(%s, %s)" "(%s, %s)"
"ON CONFLICT (variant_id) DO UPDATE SET last_game_id = EXCLUDED.last_game_id", "ON CONFLICT (variant_id) DO UPDATE SET last_game_id = EXCLUDED.last_game_id",
(var_id, r['rows'][-1]['id']) (var_id, r['rows'][-1]['id'])
) )
conn.commit() database.conn.commit()

View file

@ -1,14 +1,14 @@
from typing import List from typing import List
import hanabi from hanabi import hanab_game
from hanabi import constants from hanabi import constants
from hanabi.live.variants import Variant from hanabi.live import variants
class HanabLiveInstance(hanabi.HanabiInstance): class HanabLiveInstance(hanab_game.HanabiInstance):
def __init__( def __init__(
self, self,
deck: List[hanabi.DeckCard], deck: List[hanab_game.DeckCard],
num_players: int, num_players: int,
variant_id: int, variant_id: int,
one_extra_card: bool = False, one_extra_card: bool = False,
@ -24,10 +24,10 @@ class HanabLiveInstance(hanabi.HanabiInstance):
super().__init__(deck, num_players, hand_size=hand_size, *args, **kwargs) super().__init__(deck, num_players, hand_size=hand_size, *args, **kwargs)
self.variant_id = variant_id self.variant_id = variant_id
self.variant = Variant.from_db(self.variant_id) self.variant = variants.Variant.from_db(self.variant_id)
@staticmethod @staticmethod
def select_standard_variant_id(instance: hanabi.HanabiInstance): def select_standard_variant_id(instance: hanab_game.HanabiInstance):
err_msg = "Hanabi instance not supported by hanab.live, cannot convert to HanabLiveInstance: " err_msg = "Hanabi instance not supported by hanab.live, cannot convert to HanabLiveInstance: "
assert 3 <= instance.num_suits <= 6, \ assert 3 <= instance.num_suits <= 6, \
err_msg + "Illegal number of suits ({}) found, must be in range [3,6]".format(instance.num_suits) err_msg + "Illegal number of suits ({}) found, must be in range [3,6]".format(instance.num_suits)
@ -40,40 +40,40 @@ class HanabLiveInstance(hanabi.HanabiInstance):
return constants.VARIANT_IDS_STANDARD_DISTRIBUTIONS[instance.num_suits][instance.num_dark_suits] return constants.VARIANT_IDS_STANDARD_DISTRIBUTIONS[instance.num_suits][instance.num_dark_suits]
class HanabLiveGameState(hanabi.GameState): class HanabLiveGameState(hanab_game.GameState):
def __init__(self, instance: HanabLiveInstance, starting_player: int = 0): def __init__(self, instance: HanabLiveInstance, starting_player: int = 0):
super().__init__(instance, starting_player) super().__init__(instance, starting_player)
self.instance: HanabLiveInstance = instance self.instance: HanabLiveInstance = instance
def make_action(self, action): def make_action(self, action):
match action.type: match action.type:
case hanabi.ActionType.ColorClue | hanabi.ActionType.RankClue: case hanab_game.ActionType.ColorClue | hanab_game.ActionType.RankClue:
assert(self.clues > 0) assert(self.clues > 0)
self.actions.append(action) self.actions.append(action)
self.clues -= self.instance.clue_increment self.clues -= self.instance.clue_increment
self._make_turn() self._make_turn()
# TODO: could check that the clue specified is in fact legal # TODO: could check that the clue specified is in fact legal
case hanabi.ActionType.Play: case hanab_game.ActionType.Play:
self.play(action.target) self.play(action.target)
case hanabi.ActionType.Discard: case hanab_game.ActionType.Discard:
self.discard(action.target) self.discard(action.target)
case hanabi.ActionType.EndGame | hanabi.ActionType.VoteTerminate: case hanab_game.ActionType.EndGame | hanab_game.ActionType.VoteTerminate:
self.over = True self.over = True
def _waste_clue(self) -> hanabi.Action: def _waste_clue(self) -> hanab_game.Action:
for player in range(self.turn + 1, self.turn + self.num_players): for player in range(self.turn + 1, self.turn + self.num_players):
for card in self.hands[player % self.num_players]: for card in self.hands[player % self.num_players]:
for rank in self.instance.variant.ranks: for rank in self.instance.variant.ranks:
if self.instance.variant.rank_touches(card, rank): if self.instance.variant.rank_touches(card, rank):
return hanabi.Action( return hanab_game.Action(
hanabi.ActionType.RankClue, hanab_game.ActionType.RankClue,
player % self.num_players, player % self.num_players,
rank rank
) )
for color in range(self.instance.variant.num_colors): for color in range(self.instance.variant.num_colors):
if self.instance.variant.color_touches(card, color): if self.instance.variant.color_touches(card, color):
return hanabi.Action( return hanab_game.Action(
hanabi.ActionType.ColorClue, hanab_game.ActionType.ColorClue,
player % self.num_players, player % self.num_players,
color color
) )

View file

@ -3,26 +3,26 @@ import pebble.concurrent
import concurrent.futures import concurrent.futures
import traceback import traceback
import alive_progress
import threading
import time
from hanabi.solvers.sat import solve_sat
from hanabi.database.database import conn, cur
from hanabi.live.download_data import detailed_export_game
from alive_progress import alive_bar
from hanabi.live.compress import decompress_deck, link
from hanabi.game import HanabiInstance
from threading import Lock
from time import perf_counter
from hanabi.solvers.greedy_solver import GameState, GreedyStrategy
from hanabi import logger from hanabi import logger
from hanabi.solvers.deck_analyzer import analyze, InfeasibilityReason from hanabi.solvers.sat import solve_sat
from hanabi.live.variants import Variant from hanabi.database import database
from hanabi.live import download_data
from hanabi.live import compress
from hanabi import hanab_game
from hanabi.solvers import greedy_solver
from hanabi.solvers import deck_analyzer
from hanabi.live import variants
MAX_PROCESSES = 6 MAX_PROCESSES = 6
def update_seeds_db(): def update_seeds_db():
cur2 = conn.cursor() cur2 = database.conn.cursor()
with conn.cursor() as cur: with database.conn.cursor() as cur:
cur.execute("SELECT num_players, seed, variant_id from games;") cur.execute("SELECT num_players, seed, variant_id from games;")
for (num_players, seed, variant_id) in cur: for (num_players, seed, variant_id) in cur:
cur2.execute("SELECT COUNT(*) from seeds WHERE seed = (%s);", (seed,)) cur2.execute("SELECT COUNT(*) from seeds WHERE seed = (%s);", (seed,))
@ -34,47 +34,47 @@ def update_seeds_db():
"(%s, %s, %s)", "(%s, %s, %s)",
(seed, num_players, variant_id) (seed, num_players, variant_id)
) )
conn.commit() database.conn.commit()
else: else:
print("seed {} already found in DB".format(seed)) print("seed {} already found in DB".format(seed))
def get_decks_of_seeds(): def get_decks_of_seeds():
cur2 = conn.cursor() cur2 = database.conn.cursor()
cur.execute("SELECT seed, variant_id FROM seeds WHERE deck is NULL") database.cur.execute("SELECT seed, variant_id FROM seeds WHERE deck is NULL")
for (seed, variant_id) in cur: for (seed, variant_id) in database.cur:
cur2.execute("SELECT id FROM games WHERE seed = (%s) LIMIT 1", (seed,)) cur2.execute("SELECT id FROM games WHERE seed = (%s) LIMIT 1", (seed,))
(game_id,) = cur2.fetchone() (game_id,) = cur2.fetchone()
logger.verbose("Exporting game {} for seed {}.".format(game_id, seed)) logger.verbose("Exporting game {} for seed {}.".format(game_id, seed))
detailed_export_game(game_id, var_id=variant_id, seed_exists=True) download_data.detailed_export_game(game_id, var_id=variant_id, seed_exists=True)
conn.commit() database.conn.commit()
def update_trivially_feasible_games(variant_id): def update_trivially_feasible_games(variant_id):
variant: Variant = Variant.from_db(variant_id) variant: variants.Variant = variants.Variant.from_db(variant_id)
cur.execute("SELECT seed FROM seeds WHERE variant_id = (%s) AND feasible is null", (variant_id,)) database.cur.execute("SELECT seed FROM seeds WHERE variant_id = (%s) AND feasible is null", (variant_id,))
seeds = cur.fetchall() seeds = database.cur.fetchall()
print('Checking variant {} (id {}), found {} seeds to check...'.format(variant.name, variant_id, len(seeds))) print('Checking variant {} (id {}), found {} seeds to check...'.format(variant.name, variant_id, len(seeds)))
with alive_bar(total=len(seeds), title='{} ({})'.format(variant.name, variant_id)) as bar: with alive_progress.alive_bar(total=len(seeds), title='{} ({})'.format(variant.name, variant_id)) as bar:
for (seed,) in seeds: for (seed,) in seeds:
cur.execute("SELECT id, deck_plays, one_extra_card, one_less_card, all_or_nothing " database.cur.execute("SELECT id, deck_plays, one_extra_card, one_less_card, all_or_nothing "
"FROM games WHERE score = (%s) AND seed = (%s) ORDER BY id;", "FROM games WHERE score = (%s) AND seed = (%s) ORDER BY id;",
(variant.max_score, seed) (variant.max_score, seed)
) )
res = cur.fetchall() res = database.cur.fetchall()
logger.debug("Checking seed {}: {:3} results".format(seed, len(res))) logger.debug("Checking seed {}: {:3} results".format(seed, len(res)))
for (game_id, a, b, c, d) in res: for (game_id, a, b, c, d) in res:
if None in [a, b, c, d]: if None in [a, b, c, d]:
logger.debug(' Game {} not found in database, exporting...'.format(game_id)) logger.debug(' Game {} not found in database, exporting...'.format(game_id))
detailed_export_game(game_id, var_id=variant_id) download_data.detailed_export_game(game_id, var_id=variant_id)
else: else:
logger.debug(' Game {} already in database'.format(game_id, valid)) logger.debug(' Game {} already in database'.format(game_id, valid))
valid = not any([a, b, c, d]) valid = not any([a, b, c, d])
if valid: if valid:
logger.verbose('Seed {:10} (variant {}) found to be feasible via game {:6}'.format(seed, variant_id, game_id)) logger.verbose('Seed {:10} (variant {}) found to be feasible via game {:6}'.format(seed, variant_id, game_id))
cur.execute("UPDATE seeds SET feasible = (%s) WHERE seed = (%s)", (True, seed)) database.cur.execute("UPDATE seeds SET feasible = (%s) WHERE seed = (%s)", (True, seed))
conn.commit() database.conn.commit()
break break
else: else:
logger.verbose(' Cheaty game found') logger.verbose(' Cheaty game found')
@ -82,7 +82,7 @@ def update_trivially_feasible_games(variant_id):
def get_decks_for_all_seeds(): def get_decks_for_all_seeds():
cur = conn.cursor() cur = database.conn.database.cursor()
cur.execute("SELECT id " cur.execute("SELECT id "
"FROM games " "FROM games "
" INNER JOIN seeds " " INNER JOIN seeds "
@ -96,26 +96,26 @@ def get_decks_for_all_seeds():
) )
print("Exporting decks for all seeds") print("Exporting decks for all seeds")
res = cur.fetchall() res = cur.fetchall()
with alive_bar(len(res), title="Exporting decks") as bar: with alive_progress.alive_bar(len(res), title="Exporting decks") as bar:
for (game_id,) in res: for (game_id,) in res:
detailed_export_game(game_id) download_data.detailed_export_game(game_id)
bar() bar()
mutex = Lock() mutex = threading.Lock()
def solve_instance(instance: HanabiInstance): def solve_instance(instance: hanab_game.HanabiInstance):
# first, sanity check on running out of pace # first, sanity check on running out of pace
result = analyze(instance) result = deck_analyzer.analyze(instance)
if result is not None: if result is not None:
assert type(result) == InfeasibilityReason assert type(result) == deck_analyzer.InfeasibilityReason
logger.debug("found infeasible deck") logger.debug("found infeasible deck")
return False, None, None return False, None, None
for num_remaining_cards in [0, 20]: for num_remaining_cards in [0, 20]:
# logger.info("trying with {} remaining cards".format(num_remaining_cards)) # logger.info("trying with {} remaining cards".format(num_remaining_cards))
game = GameState(instance) game = hanab_game.GameState(instance)
strat = GreedyStrategy(game) strat = greedy_solver.GreedyStrategy(game)
# make a number of greedy moves # make a number of greedy moves
while not game.is_over() and not game.is_known_lost(): while not game.is_over() and not game.is_known_lost():
@ -136,10 +136,10 @@ def solve_instance(instance: HanabiInstance):
return True, sol, num_remaining_cards return True, sol, num_remaining_cards
logger.debug( logger.debug(
"No success with {} remaining cards, reducing number of greedy moves, failed attempt was: {}".format( "No success with {} remaining cards, reducing number of greedy moves, failed attempt was: {}".format(
num_remaining_cards, link(game))) num_remaining_cards, compress.link(game)))
# print("Aborting trying with greedy strat") # print("Aborting trying with greedy strat")
logger.debug("Starting full SAT solver") logger.debug("Starting full SAT solver")
game = GameState(instance) game = hanab_game.GameState(instance)
a, b = solve_sat(game) a, b = solve_sat(game)
return a, b, instance.draw_pile_size return a, b, instance.draw_pile_size
@ -148,21 +148,21 @@ def solve_instance(instance: HanabiInstance):
def solve_seed_with_timeout(seed, num_players, deck_compressed, var_name: Optional[str] = None): def solve_seed_with_timeout(seed, num_players, deck_compressed, var_name: Optional[str] = None):
try: try:
logger.verbose("Starting to solve seed {}".format(seed)) logger.verbose("Starting to solve seed {}".format(seed))
deck = decompress_deck(deck_compressed) deck = compress.decompress_deck(deck_compressed)
t0 = perf_counter() t0 = time.perf_counter()
solvable, solution, num_remaining_cards = solve_instance(HanabiInstance(deck, num_players)) solvable, solution, num_remaining_cards = solve_instance(hanab_game.HanabiInstance(deck, num_players))
t1 = perf_counter() t1 = time.perf_counter()
logger.verbose("Solved instance {} in {} seconds: {}".format(seed, round(t1 - t0, 2), solvable)) logger.verbose("Solved instance {} in {} seconds: {}".format(seed, round(t1 - t0, 2), solvable))
mutex.acquire() mutex.acquire()
if solvable is not None: if solvable is not None:
cur.execute("UPDATE seeds SET feasible = (%s) WHERE seed = (%s)", (solvable, seed)) database.cur.execute("UPDATE seeds SET feasible = (%s) WHERE seed = (%s)", (solvable, seed))
conn.commit() database.conn.commit()
mutex.release() mutex.release()
if solvable == True: if solvable == True:
logger.verbose("Success with {} cards left in draw by greedy solver on seed {}: {}\n".format( logger.verbose("Success with {} cards left in draw by greedy solver on seed {}: {}\n".format(
num_remaining_cards, seed, link(solution)) num_remaining_cards, seed, compress.link(solution))
) )
elif solvable == False: elif solvable == False:
logger.debug("seed {} was not solvable".format(seed)) logger.debug("seed {} was not solvable".format(seed))
@ -187,18 +187,14 @@ def solve_seed(seed, num_players, deck_compressed, var_name: Optional[str] = Non
def solve_unknown_seeds(variant_id, variant_name: Optional[str] = None): def solve_unknown_seeds(variant_id, variant_name: Optional[str] = None):
cur.execute("SELECT seed, num_players, deck FROM seeds WHERE variant_id = (%s) AND feasible IS NULL", (variant_id,)) database.cur.execute(
res = cur.fetchall() "SELECT seed, num_players, deck FROM seeds WHERE variant_id = (%s) AND feasible IS NULL",
(variant_id,)
# for r in res: )
# solve_seed(r[0], r[1], r[2], variant_name) res = database.cur.fetchall()
with concurrent.futures.ProcessPoolExecutor(max_workers=MAX_PROCESSES) as executor: with concurrent.futures.ProcessPoolExecutor(max_workers=MAX_PROCESSES) as executor:
fs = [executor.submit(solve_seed, r[0], r[1], r[2], variant_name) for r in res] fs = [executor.submit(solve_seed, r[0], r[1], r[2], variant_name) for r in res]
with alive_bar(len(res), title='Seed solving on {}'.format(variant_name)) as bar: with alive_progress.alive_bar(len(res), title='Seed solving on {}'.format(variant_name)) as bar:
for f in concurrent.futures.as_completed(fs): for f in concurrent.futures.as_completed(fs):
bar() bar()
update_trivially_feasible_games(0)
solve_unknown_seeds(0, "No Variant")

View file

@ -1,6 +1,6 @@
import enum import enum
from typing import List, Optional from typing import List, Optional
from hanabi.game import DeckCard, ActionType from hanabi import hanab_game
from hanabi.database.database import cur from hanabi.database.database import cur
@ -161,7 +161,7 @@ class Variant:
def _synesthesia_ranks(self, color_value: int) -> List[int]: def _synesthesia_ranks(self, color_value: int) -> List[int]:
return [rank for rank in self.ranks if (rank - color_value) % len(self.colors) == 0] return [rank for rank in self.ranks if (rank - color_value) % len(self.colors) == 0]
def rank_touches(self, card: DeckCard, value: int): def rank_touches(self, card: hanab_game.DeckCard, value: int):
assert 0 <= card.suitIndex < self.num_suits,\ assert 0 <= card.suitIndex < self.num_suits,\
f"Unexpected card {card}, suitIndex {card.suitIndex} out of bounds for {self.num_suits} suits." f"Unexpected card {card}, suitIndex {card.suitIndex} out of bounds for {self.num_suits} suits."
assert not self.no_rank_clues, "Cluing rank not allowed in this variant." assert not self.no_rank_clues, "Cluing rank not allowed in this variant."
@ -186,7 +186,7 @@ class Variant:
ranks = self._preprocess_rank(value) ranks = self._preprocess_rank(value)
return any(self.suits[card.suitIndex].rank_touches(card.rank, rank) for rank in ranks) return any(self.suits[card.suitIndex].rank_touches(card.rank, rank) for rank in ranks)
def color_touches(self, card: DeckCard, value: int): def color_touches(self, card: hanab_game.DeckCard, value: int):
assert 0 <= card.suitIndex < self.num_suits, \ assert 0 <= card.suitIndex < self.num_suits, \
f"Unexpected card {card}, suitIndex {card.suitIndex} out of bounds for {self.num_suits} suits." f"Unexpected card {card}, suitIndex {card.suitIndex} out of bounds for {self.num_suits} suits."
assert not self.no_color_clues, "Cluing color not allowed in this variant." assert not self.no_color_clues, "Cluing color not allowed in this variant."

View file

@ -24,7 +24,6 @@ class LoggerManager:
'%(message)s' '%(message)s'
) )
self.console_handler = logging.StreamHandler() self.console_handler = logging.StreamHandler()
self.console_handler.setLevel(console_level) self.console_handler.setLevel(console_level)
self.console_handler.setFormatter(self.nothing_formatter) self.console_handler.setFormatter(self.nothing_formatter)

View file

@ -1,14 +1,14 @@
from hanabi.live.compress import DeckCard from hanabi.live import compress
from enum import Enum from enum import Enum
from hanabi.database import conn from hanabi.database import database
from hanabi.game import HanabiInstance, pp_deck from hanabi import hanab_game
from hanabi.live.compress import decompress_deck from hanabi.live import compress
class InfeasibilityType(Enum): class InfeasibilityType(Enum):
OutOfPace = 0 # idx denotes index of last card drawn before being forced to reduce pace, value denotes how bad pace is OutOfPace = 0 # idx denotes index of last card drawn before being forced to reduce pace, value denotes how bad pace is
OutOfHandSize = 1 # idx denotes index of last card drawn before being forced to discard a crit OutOfHandSize = 1 # idx denotes index of last card drawn before being forced to discard a crit
NotTrivial = 2 NotTrivial = 2
CritAtBottom = 3 CritAtBottom = 3
@ -32,17 +32,17 @@ class InfeasibilityReason():
def analyze_suit(occurrences): def analyze_suit(occurrences):
# denotes the indexes of copies we can use wlog # denotes the indexes of copies we can use wlog
picks = { picks = {
1: 0, 1: 0,
**{ r: None for r in range(2, 5) }, **{r: None for r in range(2, 5)},
5: 0 5: 0
} }
# denotes the intervals when cards will be played wlog # denotes the intervals when cards will be played wlog
play_times = { play_times = {
1: [occurrences[1][0]], 1: [occurrences[1][0]],
**{ r: None for _ in range(instance.num_suits) **{r: None for _ in range(instance.num_suits)
for r in range(2,6) for r in range(2, 6)
} }
} }
print("occurrences are: {}".format(occurrences)) print("occurrences are: {}".format(occurrences))
@ -51,7 +51,7 @@ def analyze_suit(occurrences):
# general analysis # general analysis
earliest_play = max(min(play_times[rank - 1]), min(occurrences[rank])) earliest_play = max(min(play_times[rank - 1]), min(occurrences[rank]))
latest_play = max( *play_times[rank - 1], *occurrences[rank]) latest_play = max(*play_times[rank - 1], *occurrences[rank])
play_times[rank] = [earliest_play, latest_play] play_times[rank] = [earliest_play, latest_play]
# check a few extra cases regarding the picks when the rank is not 5 # check a few extra cases regarding the picks when the rank is not 5
@ -62,30 +62,28 @@ def analyze_suit(occurrences):
play_times[rank] = [min(occurrences[rank])] play_times[rank] = [min(occurrences[rank])]
continue continue
# check if the second copy is not worse than the first when it comes, # check if the second copy is not worse than the first when it comes,
# because we either have to wait for smaller cards anyway # because we either have to wait for smaller cards anyway
# or the next card is not there anyway # or the next card is not there anyway
if max(occurrences[rank]) < max(earliest_play, min(occurrences[rank + 1])): if max(occurrences[rank]) < max(earliest_play, min(occurrences[rank + 1])):
picks[rank] = 1 picks[rank] = 1
return picks, play_times return picks, play_times
def analyze_card_usage(instance: hanab_game.HanabiInstance):
def analyze_card_usage(instance: HanabiInstance):
storage_size = instance.num_players * instance.hand_size storage_size = instance.num_players * instance.hand_size
for suit in range(instance.num_suits): for suit in range(instance.num_suits):
print("analysing suit {}: {}".format( print("analysing suit {}: {}".format(
suit, suit,
pp_deck((c for c in instance.deck if c.suitIndex == suit)) hanab_game.pp_deck((c for c in instance.deck if c.suitIndex == suit))
) )
) )
occurrences = { occurrences = {
rank: [max(0, i - storage_size + 1) for (i, card) in enumerate(instance.deck) if card == DeckCard(suit, rank)] rank: [max(0, i - storage_size + 1) for (i, card) in enumerate(instance.deck) if
for rank in range(1,6) card == hanab_game.DeckCard(suit, rank)]
for rank in range(1, 6)
} }
picks, play_times = analyze_suit(occurrences) picks, play_times = analyze_suit(occurrences)
@ -96,9 +94,7 @@ def analyze_card_usage(instance: HanabiInstance):
print() print()
def analyze(instance: hanab_game.HanabiInstance, find_non_trivial=False) -> InfeasibilityReason | None:
def analyze(instance: HanabiInstance, find_non_trivial=False) -> InfeasibilityReason | None:
if instance.deck[-1].rank != 5 and instance.deck[-1].suitIndex + instance.num_dark_suits >= instance.num_suits: if instance.deck[-1].rank != 5 and instance.deck[-1].suitIndex + instance.num_dark_suits >= instance.num_suits:
return InfeasibilityReason(InfeasibilityType.CritAtBottom, instance.deck_size - 1) return InfeasibilityReason(InfeasibilityType.CritAtBottom, instance.deck_size - 1)
@ -121,7 +117,7 @@ def analyze(instance: HanabiInstance, find_non_trivial=False) -> InfeasibilityRe
stacks[card.suitIndex] += 1 stacks[card.suitIndex] += 1
# check for further playables that we stored # check for further playables that we stored
for check_rank in range(card.rank + 1, 6): for check_rank in range(card.rank + 1, 6):
check_card = DeckCard(card.suitIndex, check_rank) check_card = hanab_game.DeckCard(card.suitIndex, check_rank)
if check_card in stored_cards: if check_card in stored_cards:
stacks[card.suitIndex] += 1 stacks[card.suitIndex] += 1
stored_cards.remove(check_card) stored_cards.remove(check_card)
@ -130,34 +126,34 @@ def analyze(instance: HanabiInstance, find_non_trivial=False) -> InfeasibilityRe
else: else:
break break
elif card.rank <= stacks[card.suitIndex]: elif card.rank <= stacks[card.suitIndex]:
pass # card is trash pass # card is trash
elif card.rank > stacks[card.suitIndex] + 1: elif card.rank > stacks[card.suitIndex] + 1:
# need to store card # need to store card
if card in stored_cards or card.rank == 5: if card in stored_cards or card.rank == 5:
stored_crits.add(card) stored_crits.add(card)
stored_cards.add(card) stored_cards.add(card)
## check for out of handsize: # check for out of handsize:
if len(stored_crits) == instance.num_players * instance.hand_size: if len(stored_crits) == instance.num_players * instance.hand_size:
return InfeasibilityReason(InfeasibilityType.OutOfHandSize, i) return InfeasibilityReason(InfeasibilityType.OutOfHandSize, i)
if find_non_trivial and len(stored_cards) == instance.num_players * instance.hand_size: if find_non_trivial and len(stored_cards) == instance.num_players * instance.hand_size:
ret = InfeasibilityReason(InfeasibilityType.NotTrivial, i) ret = InfeasibilityReason(InfeasibilityType.NotTrivial, i)
# the last - 1 is there because we have to discard 'next', causing a further draw # the last - 1 is there because we have to discard 'next', causing a further draw
max_remaining_plays = (instance.deck_size - i - 1) + instance.num_players - 1 max_remaining_plays = (instance.deck_size - i - 1) + instance.num_players - 1
needed_plays = 5 * instance.num_suits - sum(stacks) needed_plays = 5 * instance.num_suits - sum(stacks)
missing = max_remaining_plays - needed_plays missing = max_remaining_plays - needed_plays
if missing < min_forced_pace: if missing < min_forced_pace:
# print("update to {}: {}".format(i, missing)) # print("update to {}: {}".format(i, missing))
min_forced_pace = missing min_forced_pace = missing
worst_index = i worst_index = i
# check that we correctly walked through the deck # check that we correctly walked through the deck
assert(len(stored_cards) == 0) assert (len(stored_cards) == 0)
assert(len(stored_crits) == 0) assert (len(stored_crits) == 0)
assert(sum(stacks) == 5 * instance.num_suits) assert (sum(stacks) == 5 * instance.num_suits)
if min_forced_pace < 0: if min_forced_pace < 0:
return InfeasibilityReason(InfeasibilityType.OutOfPace, worst_index, min_forced_pace) return InfeasibilityReason(InfeasibilityType.OutOfPace, worst_index, min_forced_pace)
@ -168,10 +164,12 @@ def analyze(instance: HanabiInstance, find_non_trivial=False) -> InfeasibilityRe
def run_on_database(): def run_on_database():
cur = conn.cursor() cur = database.conn.cursor()
cur2 = conn.cursor() cur2 = database.conn.cursor()
for num_p in range(2, 6): for num_p in range(2, 6):
cur.execute("SELECT seed, num_players, deck from seeds where variant_id = 0 and num_players = (%s) order by seed asc", (num_p,)) cur.execute(
"SELECT seed, num_players, deck from seeds where variant_id = 0 and num_players = (%s) order by seed asc",
(num_p,))
res = cur.fetchall() res = cur.fetchall()
hand = 0 hand = 0
pace = 0 pace = 0
@ -179,11 +177,11 @@ def run_on_database():
d = None d = None
print("Checking {} {}-player seeds from database".format(len(res), num_p)) print("Checking {} {}-player seeds from database".format(len(res), num_p))
for (seed, num_players, deck) in res: for (seed, num_players, deck) in res:
deck = decompress_deck(deck) deck = compress.decompress_deck(deck)
a = analyze(HanabiInstance(deck, num_players), True) a = analyze(hanab_game.HanabiInstance(deck, num_players), True)
if type(a) == InfeasibilityReason: if type(a) == InfeasibilityReason:
if a.type == InfeasibilityType.OutOfHandSize: if a.type == InfeasibilityType.OutOfHandSize:
# print("Seed {} infeasible: {}\n{}".format(seed, a, deck)) # print("Seed {} infeasible: {}\n{}".format(seed, a, deck))
hand += 1 hand += 1
elif a.type == InfeasibilityType.OutOfPace: elif a.type == InfeasibilityType.OutOfPace:
pace += 1 pace += 1
@ -191,28 +189,12 @@ def run_on_database():
non_trivial += 1 non_trivial += 1
d = seed, deck d = seed, deck
print("Found {} seeds running out of hand size, {} running out of pace and {} that are not trivial".format(hand, pace, non_trivial)) print("Found {} seeds running out of hand size, {} running out of pace and {} that are not trivial".format(hand,
pace,
non_trivial))
if d is not None: if d is not None:
print("example non-trivial deck (seed {}): [{}]" print("example non-trivial deck (seed {}): [{}]".format(
.format( d[0],
d[0], ", ".join(c.colorize() for c in d[1])
", ".join(c.colorize() for c in d[1]) ))
)
)
print() print()
# if p < 0:
# print("seed {} ({} players) runs out of pace ({}) after drawing {}: {}:\n{}".format(seed, num_players, p, i, deck[i], deck))
# cur.execute("UPDATE seeds SET feasible = f WHERE seed = (%s)", seed)
if __name__ == "__main__":
# print(deck)
# a = analyze(deck, 4)
# print(a)
# run_on_database()
deck_str = "15bcfwnqsdmbnfuvhskrgfixwckklojxgemrhpqppuaaiyadultv"
deck_str = "15misofrmvvuxujkphaqpcflegysdwqaakcilbxtuhwfrbgdnpkn"
deck_str = "15wqpvhdkufjcrewyxulvarhgolkixmfgmndbpstqbupcanfisak"
deck = decompress_deck(deck_str)
print(pp_deck(deck))
instance = HanabiInstance(deck, 2)
analyze_card_usage(instance)

View file

@ -1,13 +1,14 @@
#! /bin/python3 #! /bin/python3
import collections import collections
import sys import sys
from enum import Enum from enum import Enum
from hanabi import logger
from typing import Optional from typing import Optional
from hanabi.game import DeckCard, GameState, HanabiInstance from hanabi import logger
from hanabi.live.compress import link, decompress_deck from hanabi import hanab_game
from hanabi.database.database import conn from hanabi.live import compress
from hanabi.database import database
class CardType(Enum): class CardType(Enum):
@ -19,8 +20,8 @@ class CardType(Enum):
UniqueVisible = 4 UniqueVisible = 4
class CardState(): class CardState:
def __init__(self, card_type: CardType, card: DeckCard, weight=1): def __init__(self, card_type: CardType, card: hanab_game.DeckCard, weight: Optional[int] = 1):
self.card_type = card_type self.card_type = card_type
self.card = card self.card = card
self.weight = weight self.weight = weight
@ -66,7 +67,7 @@ class WeightedCard:
class HandState: class HandState:
def __init__(self, player: int, game_state: GameState): def __init__(self, player: int, game_state: hanab_game.GameState):
self.trash = [] self.trash = []
self.playable = [] self.playable = []
self.critical = [] self.critical = []
@ -111,14 +112,14 @@ class HandState:
else: else:
assert len(self.critical) > 0, "Programming error." assert len(self.critical) > 0, "Programming error."
self.best_discard = self.critical[-1] self.best_discard = self.critical[-1]
self.discard_badness = 600 - 100*self.best_discard.card.rank self.discard_badness = 600 - 100 * self.best_discard.card.rank
def num_useful_cards(self): def num_useful_cards(self):
return len(self.dupes) + len(self.uniques) + len(self.playable) + len(self.critical) return len(self.dupes) + len(self.uniques) + len(self.playable) + len(self.critical)
class CheatingStrategy: class CheatingStrategy:
def __init__(self, game_state: GameState): def __init__(self, game_state: hanab_game.GameState):
self.game_state = game_state self.game_state = game_state
def make_move(self): def make_move(self):
@ -135,10 +136,8 @@ class CheatingStrategy:
exit(0) exit(0)
class GreedyStrategy(): class GreedyStrategy():
def __init__(self, game_state: GameState): def __init__(self, game_state: hanab_game.GameState):
self.game_state = game_state self.game_state = game_state
self.earliest_draw_times = [] self.earliest_draw_times = []
@ -146,7 +145,7 @@ class GreedyStrategy():
self.earliest_draw_times.append([]) self.earliest_draw_times.append([])
for r in range(1, 6): for r in range(1, 6):
self.earliest_draw_times[s].append(max( self.earliest_draw_times[s].append(max(
game_state.deck.index(DeckCard(s, r)) - game_state.hand_size * game_state.num_players + 1, game_state.deck.index(hanab_game.DeckCard(s, r)) - game_state.hand_size * game_state.num_players + 1,
0 if r == 1 else self.earliest_draw_times[s][r - 2] 0 if r == 1 else self.earliest_draw_times[s][r - 2]
)) ))
@ -188,7 +187,7 @@ class GreedyStrategy():
copy_holders = set(self.game_state.holding_players(state.card)) copy_holders = set(self.game_state.holding_players(state.card))
copy_holders.remove(player) copy_holders.remove(player)
connecting_holders = set( connecting_holders = set(
self.game_state.holding_players(DeckCard(state.card.suitIndex, state.card.rank + 1))) self.game_state.holding_players(hanab_game.DeckCard(state.card.suitIndex, state.card.rank + 1)))
if len(copy_holders) == 0: if len(copy_holders) == 0:
# card is unique, imortancy is based lexicographically on whether somebody has the conn. card and the rank # card is unique, imortancy is based lexicographically on whether somebody has the conn. card and the rank
@ -244,8 +243,8 @@ class GreedyStrategy():
self.game_state.clue() self.game_state.clue()
def run_deck(instance: HanabiInstance) -> GameState: def run_deck(instance: hanab_game.HanabiInstance) -> hanab_game.GameState:
gs = GameState(instance) gs = hanab_game.GameState(instance)
strat = CheatingStrategy(gs) strat = CheatingStrategy(gs)
while not gs.is_over(): while not gs.is_over():
strat.make_move() strat.make_move()
@ -256,7 +255,7 @@ def run_samples(num_players, sample_size):
logger.info("Running {} test games on {} players using greedy strategy.".format(sample_size, num_players)) logger.info("Running {} test games on {} players using greedy strategy.".format(sample_size, num_players))
won = 0 won = 0
lost = 0 lost = 0
cur = conn.cursor() cur = database.conn.cursor()
cur.execute( cur.execute(
"SELECT seed, num_players, deck, variant_id " "SELECT seed, num_players, deck, variant_id "
"FROM seeds WHERE variant_id = 0 AND num_players = (%s)" "FROM seeds WHERE variant_id = 0 AND num_players = (%s)"
@ -264,13 +263,13 @@ def run_samples(num_players, sample_size):
(num_players, sample_size)) (num_players, sample_size))
for r in cur: for r in cur:
seed, num_players, deck_str, var_id = r seed, num_players, deck_str, var_id = r
deck = decompress_deck(deck_str) deck = compress.decompress_deck(deck_str)
instance = HanabiInstance(deck, num_players) instance = hanab_game.HanabiInstance(deck, num_players)
final_game_state = run_deck(instance) final_game_state = run_deck(instance)
if final_game_state.score != instance.max_score: if final_game_state.score != instance.max_score:
logger.verbose( logger.verbose(
"Greedy strategy lost {}-player seed {:10} {}:\n{}" "Greedy strategy lost {}-player seed {:10} {}:\n{}"
.format(num_players, seed, str(deck), link(final_game_state)) .format(num_players, seed, str(deck), compress.link(final_game_state))
) )
lost += 1 lost += 1
else: else:
@ -279,9 +278,3 @@ def run_samples(num_players, sample_size):
logger.info("Won {} ({}%) and lost {} ({}%) from sample of {} test games using greedy strategy.".format( logger.info("Won {} ({}%) and lost {} ({}%) from sample of {} test games using greedy strategy.".format(
won, round(100 * won / sample_size, 2), lost, round(100 * lost / sample_size, 2), sample_size won, round(100 * won / sample_size, 2), lost, round(100 * lost / sample_size, 2), sample_size
)) ))
if __name__ == "__main__":
for p in range(2, 6):
run_samples(p, int(sys.argv[1]))
print()

View file

@ -1,13 +1,12 @@
import copy import copy
from pysmt.shortcuts import Symbol, Bool, Not, Implies, Iff, And, Or, AtMostOne, get_model, Equals, GE, NotEquals, Int
from pysmt.typing import INT
from typing import Optional, Tuple from typing import Optional, Tuple
from hanabi.game import DeckCard, GameState, HanabiInstance from pysmt.shortcuts import Symbol, Bool, Not, Implies, Iff, And, Or, AtMostOne, get_model, Equals, GE, NotEquals, Int
from hanabi.live.compress import link, decompress_deck from pysmt.typing import INT
from greedy_solver import GreedyStrategy
from hanabi.constants import COLOR_INITIALS
from hanabi import logger from hanabi import logger
from hanabi import constants
from hanabi import hanab_game
# literals to model game as sat instance to check for feasibility # literals to model game as sat instance to check for feasibility
@ -15,16 +14,15 @@ from hanabi import logger
class Literals(): class Literals():
# num_suits is total number of suits, i.e. also counts the dark suits # num_suits is total number of suits, i.e. also counts the dark suits
# default distribution among all suits is assumed # default distribution among all suits is assumed
def __init__(self, instance: HanabiInstance): def __init__(self, instance: hanab_game.HanabiInstance):
# clues[m][i] == "after move m we have i clues", in clue starved, this counts half clues # clues[m][i] == "after move m we have i clues", in clue starved, this counts half clues
self.clues = { self.clues = {
-1: Int(16 if instance.clue_starved else 8) # we have 8 clues after turn -1: Int(16 if instance.clue_starved else 8) # we have 8 clues after turn
, **{ , **{
m: Symbol('m{}clues'.format(m), INT) m: Symbol('m{}clues'.format(m), INT)
for m in range(instance.max_winning_moves) for m in range(instance.max_winning_moves)
} }
} }
self.pace = { self.pace = {
-1: Int(instance.initial_pace) -1: Int(instance.initial_pace)
@ -36,78 +34,83 @@ class Literals():
# strikes[m][i] == "after move m we have at least i strikes" # strikes[m][i] == "after move m we have at least i strikes"
self.strikes = { self.strikes = {
-1: {i: Bool(i == 0) for i in range(0, instance.num_strikes + 1)} # no strikes when we start -1: {i: Bool(i == 0) for i in range(0, instance.num_strikes + 1)} # no strikes when we start
, **{ , **{
m: { m: {
0: Bool(True), 0: Bool(True),
**{ s: Symbol('m{}strikes{}'.format(m,s)) for s in range(1, instance.num_strikes) }, **{s: Symbol('m{}strikes{}'.format(m, s)) for s in range(1, instance.num_strikes)},
instance.num_strikes: Bool(False) # never so many clues that we lose. Implicitly forbids striking out instance.num_strikes: Bool(False)
} # never so many clues that we lose. Implicitly forbids striking out
for m in range(instance.max_winning_moves) }
} for m in range(instance.max_winning_moves)
} }
}
# extraturn[m] = "turn m is a move part of the extra round or a dummy turn" # extraturn[m] = "turn m is a move part of the extra round or a dummy turn"
self.extraround = { self.extraround = {
-1: Bool(False) -1: Bool(False)
, **{ , **{
m: Bool(False) if m < instance.draw_pile_size else Symbol('m{}extra'.format(m)) # it takes at least as many turns as cards in the draw pile to start the extra round m: Bool(False) if m < instance.draw_pile_size else Symbol('m{}extra'.format(m))
for m in range(0, instance.max_winning_moves) # it takes at least as many turns as cards in the draw pile to start the extra round
} for m in range(0, instance.max_winning_moves)
} }
}
# dummyturn[m] = "turn m is a dummy nurn and not actually part of the game" # dummyturn[m] = "turn m is a dummy nurn and not actually part of the game"
self.dummyturn = { self.dummyturn = {
-1: Bool(False) -1: Bool(False)
, **{ , **{
m: Bool(False) if m < instance.draw_pile_size + instance.num_players else Symbol('m{}dummy'.format(m)) m: Bool(False) if m < instance.draw_pile_size + instance.num_players else Symbol('m{}dummy'.format(m))
for m in range(0, instance.max_winning_moves) for m in range(0, instance.max_winning_moves)
} }
} }
# draw[m][i] == "at move m we play/discard deck[i]" # draw[m][i] == "at move m we play/discard deck[i]"
self.discard = { self.discard = {
m: {i: Symbol('m{}discard{}'.format(m, i)) for i in range(instance.deck_size)} m: {i: Symbol('m{}discard{}'.format(m, i)) for i in range(instance.deck_size)}
for m in range(instance.max_winning_moves) for m in range(instance.max_winning_moves)
} }
# draw[m][i] == "at move m we draw deck card i" # draw[m][i] == "at move m we draw deck card i"
self.draw = { self.draw = {
-1: { i: Bool(i == instance.num_dealt_cards - 1) for i in range(instance.num_dealt_cards - 1, instance.deck_size) } -1: {i: Bool(i == instance.num_dealt_cards - 1) for i in
, **{ range(instance.num_dealt_cards - 1, instance.deck_size)}
m: { , **{
instance.num_dealt_cards - 1: Bool(False), m: {
**{i: Symbol('m{}draw{}'.format(m, i)) for i in range(instance.num_dealt_cards, instance.deck_size)} instance.num_dealt_cards - 1: Bool(False),
} **{i: Symbol('m{}draw{}'.format(m, i)) for i in range(instance.num_dealt_cards, instance.deck_size)}
for m in range(instance.max_winning_moves) }
} for m in range(instance.max_winning_moves)
} }
}
# strike[m] = "at move m we get a strike" # strike[m] = "at move m we get a strike"
self.strike = { self.strike = {
-1: Bool(False) -1: Bool(False)
, **{ , **{
m: Symbol('m{}newstrike'.format(m)) m: Symbol('m{}newstrike'.format(m))
for m in range(instance.max_winning_moves) for m in range(instance.max_winning_moves)
} }
} }
# progress[m][card = (suitIndex, rank)] == "after move m we have played in suitIndex up to rank" # progress[m][card = (suitIndex, rank)] == "after move m we have played in suitIndex up to rank"
self.progress = { self.progress = {
-1: {(s, r): Bool(r == 0) for s in range(0, instance.num_suits) for r in range(0, 6)} # at start, have only played rank zero -1: {(s, r): Bool(r == 0) for s in range(0, instance.num_suits) for r in range(0, 6)}
, **{ # at start, have only played rank zero
m: { , **{
**{(s, 0): Bool(True) for s in range(0, instance.num_suits)}, m: {
**{(s, r): Symbol('m{}progress{}{}'.format(m, s, r)) for s in range(0, instance.num_suits) for r in range(1, 6)} **{(s, 0): Bool(True) for s in range(0, instance.num_suits)},
} **{(s, r): Symbol('m{}progress{}{}'.format(m, s, r)) for s in range(0, instance.num_suits) for r in
for m in range(instance.max_winning_moves) range(1, 6)}
} }
} for m in range(instance.max_winning_moves)
}
}
## Utility variables ## Utility variables
# discard_any[m] == "at move m we play/discard a card" # discard_any[m] == "at move m we play/discard a card"
self.discard_any = { m: Symbol('m{}discard_any'.format(m)) for m in range(instance.max_winning_moves) } self.discard_any = {m: Symbol('m{}discard_any'.format(m)) for m in range(instance.max_winning_moves)}
# draw_any[m] == "at move m we draw a card" # draw_any[m] == "at move m we draw a card"
self.draw_any = {m: Symbol('m{}draw_any'.format(m)) for m in range(instance.max_winning_moves)} self.draw_any = {m: Symbol('m{}draw_any'.format(m)) for m in range(instance.max_winning_moves)}
@ -122,11 +125,12 @@ class Literals():
self.incr_clues = {m: Symbol('m{}c+'.format(m)) for m in range(instance.max_winning_moves)} self.incr_clues = {m: Symbol('m{}c+'.format(m)) for m in range(instance.max_winning_moves)}
def solve_sat(starting_state: GameState | HanabiInstance, min_pace: Optional[int] = 0) -> Tuple[bool, Optional[GameState]]: def solve_sat(starting_state: hanab_game.GameState | hanab_game.HanabiInstance, min_pace: Optional[int] = 0) -> Tuple[
if isinstance(starting_state, HanabiInstance): bool, Optional[hanab_game.GameState]]:
if isinstance(starting_state, hanab_game.HanabiInstance):
instance = starting_state instance = starting_state
game_state = GameState(instance) game_state = hanab_game.GameState(instance)
elif isinstance(starting_state, GameState): elif isinstance(starting_state, hanab_game.GameState):
instance = starting_state.instance instance = starting_state.instance
game_state = starting_state game_state = starting_state
else: else:
@ -141,7 +145,7 @@ def solve_sat(starting_state: GameState | HanabiInstance, min_pace: Optional[int
starting_hands = [[card.deck_index for card in hand] for hand in game_state.hands] starting_hands = [[card.deck_index for card in hand] for hand in game_state.hands]
first_turn = len(game_state.actions) first_turn = len(game_state.actions)
if isinstance(starting_state, GameState): if isinstance(starting_state, hanab_game.GameState):
# have to set additional variables # have to set additional variables
# set initial clues # set initial clues
@ -157,7 +161,7 @@ def solve_sat(starting_state: GameState | HanabiInstance, min_pace: Optional[int
# check if extraround has started (usually not) # check if extraround has started (usually not)
ls.extraround[first_turn - 1] = Bool(game_state.remaining_extra_turns < game_state.num_players) ls.extraround[first_turn - 1] = Bool(game_state.remaining_extra_turns < game_state.num_players)
ls.dummyturn[first_turn -1] = Bool(False) ls.dummyturn[first_turn - 1] = Bool(False)
# set recent draws: important to model progress # set recent draws: important to model progress
# we just pretend that the last card drawn was in fact drawn last turn, # we just pretend that the last card drawn was in fact drawn last turn,
@ -169,7 +173,6 @@ def solve_sat(starting_state: GameState | HanabiInstance, min_pace: Optional[int
for m in range(first_turn, instance.max_winning_moves): for m in range(first_turn, instance.max_winning_moves):
ls.draw[m][game_state.progress - 1] = Bool(False) ls.draw[m][game_state.progress - 1] = Bool(False)
# model initial progress # model initial progress
for s in range(0, game_state.num_suits): for s in range(0, game_state.num_suits):
for r in range(0, 6): for r in range(0, 6):
@ -195,20 +198,24 @@ def solve_sat(starting_state: GameState | HanabiInstance, min_pace: Optional[int
Implies(ls.play[m], ls.discard_any[m]), Implies(ls.play[m], ls.discard_any[m]),
# definition of ls.play5 # definition of ls.play5
Iff(ls.play5[m], And(ls.play[m], Or(ls.discard[m][i] for i in range(instance.deck_size) if instance.deck[i].rank == 5))), Iff(ls.play5[m],
And(ls.play[m], Or(ls.discard[m][i] for i in range(instance.deck_size) if instance.deck[i].rank == 5))),
# definition of ls.incr_clues # definition of ls.incr_clues
Iff(ls.incr_clues[m], And(ls.discard_any[m], NotEquals(ls.clues[m-1], Int(16 if instance.clue_starved else 8)), Implies(ls.play[m], ls.play5[m]))), Iff(ls.incr_clues[m],
And(ls.discard_any[m], NotEquals(ls.clues[m - 1], Int(16 if instance.clue_starved else 8)),
Implies(ls.play[m], ls.play5[m]))),
# change of ls.clues # change of ls.clues
Implies(And(Not(ls.discard_any[m]), Not(ls.dummyturn[m])), Implies(And(Not(ls.discard_any[m]), Not(ls.dummyturn[m])),
Equals(ls.clues[m], ls.clues[m-1] - (2 if instance.clue_starved else 1))), Equals(ls.clues[m], ls.clues[m - 1] - (2 if instance.clue_starved else 1))),
Implies(ls.incr_clues[m], Equals(ls.clues[m], ls.clues[m-1] + 1)), Implies(ls.incr_clues[m], Equals(ls.clues[m], ls.clues[m - 1] + 1)),
Implies(And(Or(ls.discard_any[m], ls.dummyturn[m]), Not(ls.incr_clues[m])), Equals(ls.clues[m], ls.clues[m-1])), Implies(And(Or(ls.discard_any[m], ls.dummyturn[m]), Not(ls.incr_clues[m])),
Equals(ls.clues[m], ls.clues[m - 1])),
# change of pace # change of pace
Implies(And(ls.discard_any[m], Or(ls.strike[m], Not(ls.play[m]))), Equals(ls.pace[m], ls.pace[m-1] - 1)), Implies(And(ls.discard_any[m], Or(ls.strike[m], Not(ls.play[m]))), Equals(ls.pace[m], ls.pace[m - 1] - 1)),
Implies(Or(Not(ls.discard_any[m]), And(Not(ls.strike[m]), ls.play[m])), Equals(ls.pace[m], ls.pace[m-1])), Implies(Or(Not(ls.discard_any[m]), And(Not(ls.strike[m]), ls.play[m])), Equals(ls.pace[m], ls.pace[m - 1])),
# pace is nonnegative # pace is nonnegative
GE(ls.pace[m], Int(min_pace)), GE(ls.pace[m], Int(min_pace)),
@ -218,85 +225,95 @@ def solve_sat(starting_state: GameState | HanabiInstance, min_pace: Optional[int
# It's easy to see that if there is any solution to the instance, then there is also one where we only strike at 8 clues # It's easy to see that if there is any solution to the instance, then there is also one where we only strike at 8 clues
# (or not at all) -> Just strike later if neccessary # (or not at all) -> Just strike later if neccessary
# So, we decrease the solution space with this formulation, but do not change whether it's empty or not # So, we decrease the solution space with this formulation, but do not change whether it's empty or not
Iff(ls.strike[m], And(ls.discard_any[m], Not(ls.play[m]), Equals(ls.clues[m-1], Int(16 if instance.clue_starved else 8)))), Iff(ls.strike[m],
And(ls.discard_any[m], Not(ls.play[m]), Equals(ls.clues[m - 1], Int(16 if instance.clue_starved else 8)))),
# change of strikes # change of strikes
*[Iff(ls.strikes[m][i], Or(ls.strikes[m-1][i], And(ls.strikes[m-1][i-1], ls.strike[m]))) for i in range(1, instance.num_strikes + 1)], *[Iff(ls.strikes[m][i], Or(ls.strikes[m - 1][i], And(ls.strikes[m - 1][i - 1], ls.strike[m]))) for i in
range(1, instance.num_strikes + 1)],
# less than 0 clues not allowed # less than 0 clues not allowed
Implies(Not(ls.discard_any[m]), Or(GE(ls.clues[m-1], Int(1)), ls.dummyturn[m])), Implies(Not(ls.discard_any[m]), Or(GE(ls.clues[m - 1], Int(1)), ls.dummyturn[m])),
# we can only draw card i if the last ls.drawn card was i-1 # we can only draw card i if the last ls.drawn card was i-1
*[Implies(ls.draw[m][i], Or(And(ls.draw[m0][i-1], *[Not(ls.draw_any[m1]) for m1 in range(m0+1, m)]) for m0 in range(max(first_turn - 1, m-9), m))) for i in range(game_state.progress, instance.deck_size)], *[Implies(ls.draw[m][i], Or(
And(ls.draw[m0][i - 1], *[Not(ls.draw_any[m1]) for m1 in range(m0 + 1, m)]) for m0 in
range(max(first_turn - 1, m - 9), m))) for i in range(game_state.progress, instance.deck_size)],
# we can only draw at most one card (NOTE: redundant, FIXME: avoid quadratic formula) # we can only draw at most one card (NOTE: redundant, FIXME: avoid quadratic formula)
AtMostOne(ls.draw[m][i] for i in range(game_state.progress, instance.deck_size)), AtMostOne(ls.draw[m][i] for i in range(game_state.progress, instance.deck_size)),
# we can only discard a card if we drew it earlier... # we can only discard a card if we drew it earlier...
*[Implies(ls.discard[m][i], Or(ls.draw[m0][i] for m0 in range(m-instance.num_players, first_turn - 1, -instance.num_players))) for i in range(game_state.progress, instance.deck_size)], *[Implies(ls.discard[m][i],
Or(ls.draw[m0][i] for m0 in range(m - instance.num_players, first_turn - 1, -instance.num_players)))
for i in range(game_state.progress, instance.deck_size)],
# ...or if it was part of the initial hand # ...or if it was part of the initial hand
*[Not(ls.discard[m][i]) for i in range(0, game_state.progress) if i not in starting_hands[m % instance.num_players] ], *[Not(ls.discard[m][i]) for i in range(0, game_state.progress) if
i not in starting_hands[m % instance.num_players]],
# we can only discard a card if we did not discard it yet # we can only discard a card if we did not discard it yet
*[Implies(ls.discard[m][i], And(Not(ls.discard[m0][i]) for m0 in range(m-instance.num_players, first_turn - 1, -instance.num_players))) for i in range(instance.deck_size)], *[Implies(ls.discard[m][i], And(
Not(ls.discard[m0][i]) for m0 in range(m - instance.num_players, first_turn - 1, -instance.num_players)))
for i in range(instance.deck_size)],
# we can only discard at most one card (FIXME: avoid quadratic formula) # we can only discard at most one card (FIXME: avoid quadratic formula)
AtMostOne(ls.discard[m][i] for i in range(instance.deck_size)), AtMostOne(ls.discard[m][i] for i in range(instance.deck_size)),
# we can only play a card if it matches the progress # we can only play a card if it matches the progress
*[Implies( *[Implies(
And(ls.discard[m][i], ls.play[m]), And(ls.discard[m][i], ls.play[m]),
And( And(
Not(ls.progress[m-1][instance.deck[i].suitIndex, instance.deck[i].rank]), Not(ls.progress[m - 1][instance.deck[i].suitIndex, instance.deck[i].rank]),
ls.progress[m-1][instance.deck[i].suitIndex, instance.deck[i].rank-1 ] ls.progress[m - 1][instance.deck[i].suitIndex, instance.deck[i].rank - 1]
) )
) )
for i in range(instance.deck_size) for i in range(instance.deck_size)
], ],
# change of progress # change of progress
*[ *[
Iff( Iff(
ls.progress[m][s, r], ls.progress[m][s, r],
Or( Or(
ls.progress[m-1][s, r], ls.progress[m - 1][s, r],
And(ls.play[m], Or(ls.discard[m][i] And(ls.play[m], Or(ls.discard[m][i]
for i in range(0, instance.deck_size) for i in range(0, instance.deck_size)
if instance.deck[i] == DeckCard(s, r) )) if instance.deck[i] == hanab_game.DeckCard(s, r)))
)
) )
)
for s in range(0, instance.num_suits) for s in range(0, instance.num_suits)
for r in range(1, 6) for r in range(1, 6)
], ],
# extra round bool # extra round bool
Iff(ls.extraround[m], Or(ls.extraround[m-1], ls.draw[m-1][instance.deck_size-1])), Iff(ls.extraround[m], Or(ls.extraround[m - 1], ls.draw[m - 1][instance.deck_size - 1])),
# dummy turn bool # dummy turn bool
*[Iff(ls.dummyturn[m], Or(ls.dummyturn[m-1], ls.draw[m-1 - instance.num_players][instance.deck_size-1])) for i in range(0,1) if m >= instance.num_players] *[Iff(ls.dummyturn[m], Or(ls.dummyturn[m - 1], ls.draw[m - 1 - instance.num_players][instance.deck_size - 1]))
for i in range(0, 1) if m >= instance.num_players]
) )
win = And( win = And(
# maximum progress at each color # maximum progress at each color
*[ls.progress[instance.max_winning_moves-1][s, 5] for s in range(0, instance.num_suits)], *[ls.progress[instance.max_winning_moves - 1][s, 5] for s in range(0, instance.num_suits)],
# played every color/value combination (NOTE: redundant, but makes solving faster) # played every color/value combination (NOTE: redundant, but makes solving faster)
*[ *[
Or( Or(
And(ls.discard[m][i], ls.play[m]) And(ls.discard[m][i], ls.play[m])
for m in range(first_turn, instance.max_winning_moves) for m in range(first_turn, instance.max_winning_moves)
for i in range(instance.deck_size) for i in range(instance.deck_size)
if game_state.deck[i] == DeckCard(s, r) if game_state.deck[i] == hanab_game.DeckCard(s, r)
) )
for s in range(0, instance.num_suits) for s in range(0, instance.num_suits)
for r in range(1, 6) for r in range(1, 6)
if r > game_state.stacks[s] if r > game_state.stacks[s]
] ]
) )
constraints = And(*[valid_move(m) for m in range(first_turn, instance.max_winning_moves)], win) constraints = And(*[valid_move(m) for m in range(first_turn, instance.max_winning_moves)], win)
# print('Solving instance with {} variables, {} nodes'.format(len(get_atoms(constraints)), get_formula_size(constraints))) # print('Solving instance with {} variables, {} nodes'.format(len(get_atoms(constraints)), get_formula_size(constraints)))
model = get_model(constraints) model = get_model(constraints)
if model: if model:
@ -304,11 +321,11 @@ def solve_sat(starting_state: GameState | HanabiInstance, min_pace: Optional[int
solution = evaluate_model(model, copy.deepcopy(game_state), ls) solution = evaluate_model(model, copy.deepcopy(game_state), ls)
return True, solution return True, solution
else: else:
#conj = list(conjunctive_partition(constraints)) # conj = list(conjunctive_partition(constraints))
#print('statements: {}'.format(len(conj))) # print('statements: {}'.format(len(conj)))
#ucore = get_unsat_core(conj) # ucore = get_unsat_core(conj)
#print('unsat core size: {}'.format(len(ucore))) # print('unsat core size: {}'.format(len(ucore)))
#for f in ucore: # for f in ucore:
# print(f.serialize()) # print(f.serialize())
return False, None return False, None
@ -322,24 +339,29 @@ def log_model(model, cur_game_state, ls: Literals):
logger.debug('=== move {} ==='.format(m)) logger.debug('=== move {} ==='.format(m))
logger.debug('clues: {}'.format(model.get_py_value(ls.clues[m]))) logger.debug('clues: {}'.format(model.get_py_value(ls.clues[m])))
logger.debug('strikes: ' + ''.join(str(i) for i in range(1, 3) if model.get_py_value(ls.strikes[m][i]))) logger.debug('strikes: ' + ''.join(str(i) for i in range(1, 3) if model.get_py_value(ls.strikes[m][i])))
logger.debug('draw: ' + ', '.join('{}: {}'.format(i, deck[i]) for i in range(cur_game_state.progress, cur_game_state.instance.deck_size) if model.get_py_value(ls.draw[m][i]))) logger.debug('draw: ' + ', '.join(
logger.debug('discard: ' + ', '.join('{}: {}'.format(i, deck[i]) for i in range(cur_game_state.instance.deck_size) if model.get_py_value(ls.discard[m][i]))) '{}: {}'.format(i, deck[i]) for i in range(cur_game_state.progress, cur_game_state.instance.deck_size) if
model.get_py_value(ls.draw[m][i])))
logger.debug('discard: ' + ', '.join(
'{}: {}'.format(i, deck[i]) for i in range(cur_game_state.instance.deck_size) if
model.get_py_value(ls.discard[m][i])))
logger.debug('pace: {}'.format(model.get_py_value(ls.pace[m]))) logger.debug('pace: {}'.format(model.get_py_value(ls.pace[m])))
for s in range(0, cur_game_state.instance.num_suits): for s in range(0, cur_game_state.instance.num_suits):
logger.debug('progress {}: '.format(COLOR_INITIALS[s]) + ''.join(str(r) for r in range(1, 6) if model.get_py_value(ls.progress[m][s, r]))) logger.debug('progress {}: '.format(constants.COLOR_INITIALS[s]) + ''.join(
str(r) for r in range(1, 6) if model.get_py_value(ls.progress[m][s, r])))
flags = ['discard_any', 'draw_any', 'play', 'play5', 'incr_clues', 'strike', 'extraround', 'dummyturn'] flags = ['discard_any', 'draw_any', 'play', 'play5', 'incr_clues', 'strike', 'extraround', 'dummyturn']
logger.debug(', '.join(f for f in flags if model.get_py_value(getattr(ls, f)[m]))) logger.debug(', '.join(f for f in flags if model.get_py_value(getattr(ls, f)[m])))
# given the initial game state and the model found by the SAT solver, # given the initial game state and the model found by the SAT solver,
# evaluates the model to produce a full game history # evaluates the model to produce a full game history
def evaluate_model(model, cur_game_state: GameState, ls: Literals) -> GameState: def evaluate_model(model, cur_game_state: hanab_game.GameState, ls: Literals) -> hanab_game.GameState:
for m in range(len(cur_game_state.actions), cur_game_state.instance.max_winning_moves): for m in range(len(cur_game_state.actions), cur_game_state.instance.max_winning_moves):
if model.get_py_value(ls.dummyturn[m]) or cur_game_state.is_over(): if model.get_py_value(ls.dummyturn[m]) or cur_game_state.is_over():
break break
if model.get_py_value(ls.discard_any[m]): if model.get_py_value(ls.discard_any[m]):
card_idx = next(i for i in range(0, cur_game_state.instance.deck_size) if model.get_py_value(ls.discard[m][i])) card_idx = next(
i for i in range(0, cur_game_state.instance.deck_size) if model.get_py_value(ls.discard[m][i]))
if model.get_py_value(ls.play[m]) or model.get_py_value(ls.strike[m]): if model.get_py_value(ls.play[m]) or model.get_py_value(ls.strike[m]):
cur_game_state.play(card_idx) cur_game_state.play(card_idx)
else: else:
@ -348,39 +370,3 @@ def evaluate_model(model, cur_game_state: GameState, ls: Literals) -> GameState:
cur_game_state.clue() cur_game_state.clue()
return cur_game_state return cur_game_state
def run_deck():
puzzle = True
if puzzle:
deck_str = 'p5 p3 b4 r5 y4 y4 y5 r4 b2 y2 y3 g5 g2 g3 g4 p4 r3 b2 b3 b3 p4 b1 p2 b1 b1 p2 p1 p1 g1 r4 g1 r1 r3 r1 g1 r1 p1 b4 p3 g2 g3 g4 b5 y1 y1 y1 r2 r2 y2 y3'
deck = [DeckCard(COLOR_INITIALS.index(c[0]), int(c[1])) for c in deck_str.split(" ")]
num_p = 5
else:
deck_str = "15gfvqluvuwaqnmrkpkaignlaxpjbmsprksfcddeybfixchuhtwo"
deck_str = "15diuknfwhqbplsrlkxjuvfbwyacoaxgtudcerskqfnhpgampmiv"
deck_str = "15jdxlpobvikrnhkslcuwggimtphafquqfvcwadampxkeyfrbnsu"
deck = decompress_deck(deck_str)
num_p = 6
print(deck)
gs = GameState(HanabiInstance(deck, num_p))
if puzzle:
gs.play(2)
else:
strat = GreedyStrategy(gs)
for _ in range(17):
strat.make_move()
solvable, sol = solve_sat(gs, 0)
if solvable:
print(sol)
print(link(sol))
else:
print('unsolvable')
if __name__ == "__main__":
run_deck()

21
test.py
View file

@ -1,18 +1,9 @@
import json from hanabi.live.variants import Variant
from hanabi.live.variants import Suit
import alive_progress from hanabi.live.download_data import download_games, detailed_export_game
import requests from hanabi.database.database import conn, cur
from variants import Variant
from variants import Suit, variant_name
from site_api import *
from download_data import download_games, detailed_export_game
from check_game import check_game
from compress import link
from database.database import conn, cur
from database.init_database import init_database_tables, populate_static_tables
from hanabi.hanabi_cli import hanabi_cli
def find_double_dark_games(): def find_double_dark_games():
cur.execute("SELECT variants.id, variants.name, count(suits.id) from variants " cur.execute("SELECT variants.id, variants.name, count(suits.id) from variants "
@ -74,6 +65,8 @@ def export_all_seeds():
if __name__ == "__main__": if __name__ == "__main__":
hanabi_cli()
exit(0)
find_double_dark_games() find_double_dark_games()
exit(0) exit(0)
var_id = 964532 var_id = 964532