Compare commits

..

No commits in common. "main" and "cheating-bot" have entirely different histories.

33 changed files with 630 additions and 1435 deletions

View file

@ -48,27 +48,23 @@ source venv/bin/activate
pip install -r requirements.txt
```
### SAT-solver
After installing python, you should have installed `pysmt` with it, an interface to use SAT-solvers with python.
We still need a solver, for this, run
```
$ pysmt-install --help
// Pick a solver, e.g. z3 works
$ pysmt-install --z3
```
### PostgreSQL
You need to install PostgreSQL on your system, for installation instructions refer to your distribution.
Create a new database and user, for example:
```
$ sudo -iu postgres
$ psql
# CREATE USER hanabi WITH PASSWORD 'Insert password here';
# CREATE DATABASE "hanab-live" with owner "hanabi";
# CREATE DATABASE "hanab-live";
# \c hanab-live
# CREATE USER hanabi WITH PASSWORD '1234';
# GRANT ALL PRIVILEGES ON DATABASE "hanab-live" TO hanabi;
# GRANT USAGE ON SCHEMA public TO hanabi;
# GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO hanabi;
# GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA public TO hanabi;
```
Put the connection parameters in a config file (for the format, see `example_config.yaml`).
This should be located at your system default for the application `hanabi-suite`,
on POSIX systems this should be `~/.config/hanabi-suite/config.yaml`.
on POSIX systems this should be `~/.config/hanabi-suit/config.yaml`.
## Usage of stuff that already works:

58
cheating_strategy Normal file
View file

@ -0,0 +1,58 @@
Just some preliminary notes on how to implement a cheating bot
Based on https://github.com/fpvandoorn/hanabi
card types:
trash, playable, useful (dispensable), critical
pace := #(cards left in deck) + #players - #(cards left to play)
modified_pace := pace - #(players without useful cards)
endgame := #(cards left to play) - #(cards left in deck) = #players - pace
-> endgame >= 0 iff pace <= #players
in_endgame := endgame >= 0
discard_badness(card) :=
1 if trash
8 - #players if card useful but duplicate visible # TODO: should probably account for rank of card as well, currently, lowest one is chosen
80 - 10*rank if card is not critical but currently unique # this ensures we prefer to discard higher ranked cards
600 - 100*rank if only criticals in hand # essentially not relevant, since we are currently only optimizing for full score
Algorithm:
if (have playable card):
if (in endgame) and not (in extraround):
stall in the following situations:
- we have exactly one useful card, it is a 5, and a copy of each useful card is visible
- we have exactly one useful card, it is a 4, the player with the matching 5 has another critical card to play
- we have exactly one useful card (todo: maybe use critical here?), the deck has size 1, someone else has 2 crits
- we have exactly one playable card, it is a 4, and a further useful card, but the playable is redistributable in the following sense:
the other playing only has this one useful card, and the player holding the matching 5 sits after the to-be-redistributed player
- sth else that seems messy and is currently not understood, ignored for now
TODO: maybe introduce some midgame stalls here, since we know the deck?
play a card, matching the first of the following criteria. if several cards match, recurse with this set of cards
- if in extraround, play crit
- if in second last round and we have 2 crits, play crit
- play card with lowest rank
- play a critical card
- play unique card, i.e. not visible
- lowest suit index (for determinancy)
if 8 hints:
give a hint
if 0 hints:
discard card with lowest badness
stall in the following situations:
- #(cards in deck) == 2 and (card of rank 3 or lower is missing) and we have the connecting card
- #clues >= 8 - #(useful cards in hand), there are useful cards in the deck and either:
- the next player has no useful cards at all
- we have two more crits than the next player and they have trash
- we are in endgame and the deck only contains one card
- it is possible that no-one discards in the following round and we are not waiting for a card whose rank is smaller than pace // TODO: this feels like a weird condition
discard if (discard badness) + #hints < 10
stall if someone has a better discard

View file

@ -1,5 +1,4 @@
import argparse
import json
from typing import Optional
import verboselogs
@ -9,11 +8,8 @@ from hanabi.live import variants
from hanabi.live import check_game
from hanabi.live import download_data
from hanabi.live import compress
from hanabi.live import instance_finder
from hanabi.hanab_game import GameState
from hanabi.database import init_database
from hanabi.database import global_db_connection_manager
from hanabi.database.games_db_interface import load_instance
"""
Commands supported:
@ -25,16 +21,6 @@ analyze single game
"""
def subcommand_show(seed: str):
inst = load_instance(seed)
if inst is None:
logger.info("This seed does not exist in the database")
return
game = GameState(inst)
game.terminate()
l = compress.link(game)
logger.info("Deck: {}\nReplay Link: {}".format(inst.deck, l))
def subcommand_analyze(game_id: int, download: bool = False):
if download:
@ -51,13 +37,6 @@ def subcommand_analyze(game_id: int, download: bool = False):
)
def subcommand_decompress(game_link: str):
parts = game_link.split('replay-json/')
game_str = parts[-1].rstrip('/')
game = compress.decompress_game_state(game_str)
print(json.dumps(game.to_json()))
def subcommand_init(force: bool, populate: bool):
tables = init_database.get_existing_tables()
if len(tables) > 0 and not force:
@ -98,20 +77,9 @@ def subcommand_download(
logger.info("Successfully exported games for all variants")
def subcommand_solve(var_id: int, seed_class: int, num_players: Optional[int], timeout: int, num_threads: int):
instance_finder.solve_unknown_seeds(var_id, seed_class, num_players, timeout, num_threads)
def subcommand_gen_config():
global_db_connection_manager.create_config_file()
def add_show_seed_subparser(subparsers):
parser = subparsers.add_parser(
'show',
help='Show a seed and output a replay link for it'
)
parser.add_argument('seed', type=str)
def add_init_subparser(subparsers):
parser = subparsers.add_parser(
@ -161,19 +129,6 @@ def add_config_gen_subparser(subparsers):
parser = subparsers.add_parser('gen-config', help='Generate config file at default location')
def add_solve_subparser(subparsers):
parser = subparsers.add_parser('solve', help='Seed solving')
parser.add_argument('var_id', type=int, help='Variant id to solve instances from.', default=0)
parser.add_argument('--timeout', '-t', type=int, help='Timeout [s] for individual seeds.', default=150)
parser.add_argument('--class', '-c', type=int, dest='seed_class', help='Class of seed to analyze. 0 stands for hanab.live seeds', default=0)
parser.add_argument('--num_players', '-n', type=int, help='Restrict to number of players. If not specified, all player counts are analyzed.', default = None)
parser.add_argument('--num_threads', '-p', type=int, help='Number of threads to solve with.', default=4)
def add_decompress_subparser(subparsers):
parser = subparsers.add_parser('decompress', help='Decompress a hanab.live JSON-encoded replay link')
parser.add_argument('game_link', type=str)
def main_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
prog='hanabi_suite',
@ -186,9 +141,6 @@ def main_parser() -> argparse.ArgumentParser:
add_analyze_subparser(subparsers)
add_download_subparser(subparsers)
add_config_gen_subparser(subparsers)
add_solve_subparser(subparsers)
add_decompress_subparser(subparsers)
add_show_seed_subparser(subparsers)
return parser
@ -199,10 +151,7 @@ def hanabi_cli():
'analyze': subcommand_analyze,
'init': subcommand_init,
'download': subcommand_download,
'gen-config': subcommand_gen_config,
'solve': subcommand_solve,
'decompress': subcommand_decompress,
'show': subcommand_show
'gen-config': subcommand_gen_config
}[args.command]
if args.command != 'gen-config':

View file

@ -87,8 +87,8 @@ class DBConnectionManager:
logger.info("Initialised default config file {}".format(self.config_file))
def connect(self):
conn = psycopg2.connect("dbname='{}' user='{}' password='{}' host='localhost' sslmode='disable'".format(
self.db_name, self.db_user, self.db_pass),
conn = psycopg2.connect("dbname='{}' user='{}' password='{}' host='localhost'".format(
self.db_name, self.db_user, self.db_pass)
)
cur = conn.cursor()
self.lazy_conn.set_conn(conn)

View file

@ -0,0 +1,49 @@
DROP TABLE IF EXISTS seeds CASCADE;
CREATE TABLE seeds (
seed TEXT NOT NULL PRIMARY KEY,
num_players SMALLINT NOT NULL,
variant_id SMALLINT NOT NULL,
deck VARCHAR(62) NOT NULL,
starting_player SMALLINT NOT NULL DEFAULT 0,
feasible BOOLEAN DEFAULT NULL,
max_score_theoretical SMALLINT
);
CREATE INDEX seeds_variant_idx ON seeds (variant_id);
DROP TABLE IF EXISTS games CASCADE;
CREATE TABLE games (
id INT PRIMARY KEY,
seed TEXT NOT NULL REFERENCES seeds,
num_players SMALLINT NOT NULL,
score SMALLINT NOT NULL,
variant_id SMALLINT NOT NULL,
deck_plays BOOLEAN,
one_extra_card BOOLEAN,
one_less_card BOOLEAN,
all_or_nothing BOOLEAN,
detrimental_characters BOOLEAN,
num_turns SMALLINT,
actions TEXT
);
CREATE INDEX games_seed_score_idx ON games (seed, score);
CREATE INDEX games_var_seed_idx ON games (variant_id, seed);
CREATE INDEX games_player_idx ON games (num_players);
DROP TABLE IF EXISTS score_upper_bounds;
CREATE TABLE score_upper_bounds (
seed TEXT NOT NULL REFERENCES seeds ON DELETE CASCADE,
score_upper_bound SMALLINT NOT NULL,
reason SMALLINT NOT NULL,
UNIQUE (seed, reason)
);
DROP TABLE IF EXISTS score_lower_bounds;
CREATE TABLE score_lower_bounds (
seed TEXT NOT NULL REFERENCES seeds ON DELETE CASCADE,
score_lower_bound SMALLINT NOT NULL,
game_id INT REFERENCES games ON DELETE CASCADE,
actions TEXT,
CHECK (num_nonnulls(game_id, actions) = 1)
);

View file

@ -192,7 +192,7 @@ def _populate_variants(variants):
def _download_json_files():
logger.verbose("Downloading JSON files for suits and variants from github...")
base_url = "https://raw.githubusercontent.com/Hanabi-Live/hanabi-live/main/packages/game/src/json"
base_url = "https://raw.githubusercontent.com/Hanabi-Live/hanabi-live/main/packages/data/src/json"
cache_dir = Path(platformdirs.user_cache_dir(constants.APP_NAME))
cache_dir.mkdir(parents=True, exist_ok=True)
data = {}
@ -204,7 +204,7 @@ def _download_json_files():
url = base_url + "/" + file.name
response = requests.get(url)
if not response.status_code == 200:
err_msg = "Could not download initialization file {} from github (tried url {})".format(file.name, url)
err_msg = "Could not download initialization file {} from github (tried url {})".format(filename, url)
logger.error(err_msg)
raise RuntimeError(err_msg)
file.write_text(response.text)

View file

@ -25,12 +25,6 @@ class DeckCard:
raise ParseError("No rank specified in deck_card")
return DeckCard(suit_index, rank)
def to_json(self):
return {
"suitIndex": self.suitIndex,
"rank": self.rank
}
def colorize(self):
color = ["green", "blue", "magenta", "yellow", "white", "cyan"][self.suitIndex]
return colored(str(self), color)
@ -39,8 +33,6 @@ class DeckCard:
return self.suitIndex == other.suitIndex and self.rank == other.rank
def __repr__(self):
if self.suitIndex == 0 and self.rank == 0:
return "kt"
return constants.COLOR_INITIALS[self.suitIndex] + str(self.rank)
def __hash__(self):
@ -92,13 +84,6 @@ class Action:
action_value
)
def to_json(self):
return {
"type": self.type.value,
"target": self.target,
"value": self.value
}
def __repr__(self):
match self.type:
case ActionType.Play:
@ -254,26 +239,6 @@ class GameState:
self.clues -= 1
self._make_turn()
def make_action(self, action):
match action.type:
case ActionType.ColorClue | ActionType.RankClue:
assert self.clues >= 1
self.actions.append(action)
self.clues -= 1
self._make_turn()
# TODO: could check that the clue specified is in fact legal
case ActionType.Play:
self.play(action.target)
case ActionType.Discard:
self.discard(action.target)
case ActionType.EndGame | ActionType.VoteTerminate:
self.actions.append(action)
self.over = True
def terminate(self):
action = Action(ActionType.EndGame, 0, 0)
self.make_action(action)
# Forward some properties of the underlying instance
@property
def num_players(self):
@ -299,10 +264,6 @@ class GameState:
def deck_size(self):
return self.instance.deck_size
@property
def draw_pile_size(self):
return self.deck_size - self.progress
# Properties of GameState
def is_over(self):
@ -326,23 +287,6 @@ class GameState:
# Utilities
def is_playable(self, card: DeckCard):
return self.stacks[card.suitIndex] + 1 == card.rank
def is_trash(self, card: DeckCard):
return self.stacks[card.suitIndex] >= card.rank
def is_critical(self, card: DeckCard):
if card.rank == 5:
return True
if self.is_trash(card):
return False
count = 0
for hand in self.hands:
count += hand.count(card)
count += self.deck[self.progress:].count(card)
return count == 1
def holding_players(self, card):
for (player, hand) in enumerate(self.hands):
if card in hand:
@ -357,9 +301,9 @@ class GameState:
)
)
return {
"deck": [card.to_json() for card in self.instance.deck],
"deck": self.instance.deck,
"players": self.instance.player_names,
"actions": [action.to_json() for action in self.actions],
"actions": self.actions,
"first_player": 0,
"options": {
"variant": "No Variant",

View file

@ -8,8 +8,6 @@ from hanabi.live import hanab_live
from hanabi.live import compress
from hanabi.solvers import sat
from hanabi.database import games_db_interface
# returns minimal number T of turns (from game) after which instance was infeasible
# and a replay achieving maximum score while following the replay for the first (T-1) turns:
@ -21,15 +19,24 @@ from hanabi.database import games_db_interface
def check_game(game_id: int) -> Tuple[int, hanab_game.GameState]:
logger.debug("Analysing game {}".format(game_id))
with database.conn.cursor() as cur:
cur.execute("SELECT games.num_players, score, games.variant_id, starting_player FROM games "
cur.execute("SELECT games.num_players, deck, actions, score, games.variant_id, starting_player FROM games "
"INNER JOIN seeds ON seeds.seed = games.seed "
"WHERE games.id = (%s)",
(game_id,)
)
res = cur.fetchone()
if res is None:
raise ValueError("No game associated with id {} in database.".format(game_id))
(num_players, score, variant_id, starting_player) = res
instance, actions = games_db_interface.load_game_parts(game_id)
(num_players, compressed_deck, compressed_actions, score, variant_id, starting_player) = res
deck = compress.decompress_deck(compressed_deck)
actions = compress.decompress_actions(compressed_actions)
instance = hanab_live.HanabLiveInstance(
deck,
num_players,
variant_id=variant_id,
starting_player=starting_player
)
# check if the instance is already won
if instance.max_score == score:

View file

@ -177,7 +177,7 @@ def compress_game_state(state: Union[hanab_game.GameState, hanab_live.HanabLiveG
return with_dashes
def decompress_game_state(game_str: str) -> hanab_live.HanabLiveGameState:
def decompress_game_state(game_str: str) -> hanab_game.GameState:
game_str = game_str.replace("-", "")
parts = game_str.split(",")
if not len(parts) == 3:
@ -211,8 +211,8 @@ def decompress_game_state(game_str: str) -> hanab_live.HanabLiveGameState:
except ValueError:
raise ValueError("Expected variant id, found: {}".format(variant_id))
instance = hanab_live.HanabLiveInstance(deck, num_players, variant_id)
game = hanab_live.HanabLiveGameState(instance)
instance = hanab_game.HanabiInstance(deck, num_players)
game = hanab_game.GameState(instance)
# TODO: game is not in consistent state
game.actions = actions
@ -221,4 +221,4 @@ def decompress_game_state(game_str: str) -> hanab_live.HanabLiveGameState:
def link(game_state: hanab_game.GameState) -> str:
compressed = compress_game_state(game_state)
return "https://hanab.live/replay-json/{}".format(compressed)
return "https://hanab.live/shared-replay-json/{}".format(compressed)

View file

@ -1,21 +1,18 @@
import alive_progress
from typing import Dict, Optional, List
from typing import Dict, Optional
import psycopg2.errors
import psycopg2.extras
import platformdirs
import unidecode
from hanabi import hanab_game
from hanabi import constants
from hanabi import logger
from hanabi import database
from hanabi.live import site_api
from hanabi.live import compress
from hanabi.live import variants
from hanabi.live import hanab_live
from hanabi.database import games_db_interface
class GameExportError(ValueError):
@ -52,29 +49,6 @@ class GameExportInvalidNumberOfPlayersError(GameExportInvalidFormatError):
)
def ensure_users_in_db_and_get_ids(usernames: List[str]):
normalized_usernames = [unidecode.unidecode(username) for username in usernames]
psycopg2.extras.execute_values(
database.cur,
"INSERT INTO users (username, normalized_username)"
"VALUES %s "
"ON CONFLICT (username) DO NOTHING ",
zip(usernames, normalized_usernames)
)
# To only do one DB query, we sort by the normalized username.
ids = []
for username in usernames:
database.cur.execute(
"SELECT id FROM users "
"WHERE username = %s",
(username,)
)
(id, ) = database.cur.fetchone()
ids.append(id)
return ids
#
def detailed_export_game(
game_id: int
@ -120,19 +94,12 @@ def detailed_export_game(
options = game_json.get('options', {})
var_id = var_id or variants.variant_id(options.get('variant', 'No Variant'))
timed = options.get('timed', False)
time_base = options.get('timeBase', 0)
time_per_turn = options.get('timePerTurn', 0)
speedrun = options.get('speedrun', False)
card_cycle = options.get('cardCycle', False)
deck_plays = options.get('deckPlays', False)
empty_clues = options.get('emptyClues', False)
one_extra_card = options.get('oneExtraCard', False)
one_less_card = options.get('oneLessCard', False)
all_or_nothing = options.get('allOrNothing', False)
detrimental_characters = options.get('detrimentalCharacters', False)
starting_player = options.get('startingPlayer', 0)
detrimental_characters = options.get('detrimentalCharacters', False)
try:
actions = [hanab_game.Action.from_json(action) for action in game_json.get('actions', [])]
@ -164,53 +131,44 @@ def detailed_export_game(
game.make_action(action)
score = game.score
try:
compressed_deck = compress.compress_deck(deck)
except compress.InvalidFormatError as e:
logger.error("Failed to compress deck while exporting game {}: {}".format(game_id, deck))
raise GameExportInvalidFormatError(game_id, "Failed to compress deck") from e
try:
compressed_actions = compress.compress_actions(actions)
except compress.InvalidFormatError as e:
logger.error("Failed to compress actions while exporting game {}".format(game_id))
raise GameExportInvalidFormatError(game_id, "Failed to compress actions") from e
if not seed_exists:
database.cur.execute(
"INSERT INTO seeds (seed, num_players, starting_player, variant_id)"
"VALUES (%s, %s, %s, %s)"
"INSERT INTO seeds (seed, num_players, starting_player, variant_id, deck)"
"VALUES (%s, %s, %s, %s, %s)"
"ON CONFLICT (seed) DO NOTHING",
(seed, num_players, starting_player, var_id)
(seed, num_players, starting_player, var_id, compressed_deck)
)
logger.debug("New seed {} imported.".format(seed))
games_db_interface.store_deck_for_seed(seed, deck)
database.cur.execute(
"INSERT INTO games ("
"id, num_players, starting_player, variant_id, timed, time_base, time_per_turn, speedrun, card_cycle, "
"deck_plays, empty_clues, one_extra_card, one_less_card,"
"all_or_nothing, detrimental_characters, seed, score"
"id, num_players, score, seed, variant_id, deck_plays, one_extra_card, one_less_card,"
"all_or_nothing, detrimental_characters, actions"
")"
"VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
"VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
"ON CONFLICT (id) DO UPDATE SET ("
"timed, time_base, time_per_turn, speedrun, card_cycle, deck_plays, empty_clues, one_extra_card,"
"all_or_nothing, detrimental_characters"
"deck_plays, one_extra_card, one_less_card, all_or_nothing, actions, detrimental_characters"
") = ("
"EXCLUDED.timed, EXCLUDED.time_base, EXCLUDED.time_per_turn, EXCLUDED.speedrun, EXCLUDED.card_cycle, "
"EXCLUDED.deck_plays, EXCLUDED.empty_clues, EXCLUDED.one_extra_card,"
"EXCLUDED.all_or_nothing, EXCLUDED.detrimental_characters"
"EXCLUDED.deck_plays, EXCLUDED.one_extra_card, EXCLUDED.one_less_card, EXCLUDED.all_or_nothing,"
"EXCLUDED.actions, EXCLUDED.detrimental_characters"
")",
(
game_id, num_players, starting_player, var_id, timed, time_base, time_per_turn, speedrun, card_cycle,
deck_plays, empty_clues, one_extra_card, one_less_card,
all_or_nothing, detrimental_characters, seed, score
game_id, num_players, score, seed, var_id, deck_plays, one_extra_card, one_less_card,
all_or_nothing, detrimental_characters, compressed_actions
)
)
# Insert participants into database
ids = ensure_users_in_db_and_get_ids(players)
game_participant_values = []
for index, user_id in enumerate(ids):
game_participant_values.append((game_id, user_id, index))
psycopg2.extras.execute_values(
database.cur,
"INSERT INTO game_participants (game_id, user_id, seat) VALUES %s "
"ON CONFLICT (game_id, user_id) DO UPDATE SET seat = excluded.seat",
game_participant_values
)
games_db_interface.store_actions(game_id, actions)
logger.debug("Imported game {}".format(game_id))
@ -234,8 +192,6 @@ def _process_game_row(game: Dict, var_id, export_all_games: bool = False):
return
# raise GameExportInvalidNumberOfPlayersError(game_id, num_players, users)
# Ensure users in database and find out their ids
if export_all_games:
detailed_export_game(game_id, score=score, var_id=var_id)
logger.debug("Imported game {}".format(game_id))
@ -250,26 +206,12 @@ def _process_game_row(game: Dict, var_id, export_all_games: bool = False):
"ON CONFLICT (id) DO NOTHING",
(game_id, seed, num_players, score, var_id)
)
database.cur.execute("RELEASE seed_insert")
except psycopg2.errors.ForeignKeyViolation:
# Sometimes, seed is not present in the database yet, then we will have to query the full game details
# (including the seed) to export it accordingly
database.cur.execute("ROLLBACK TO seed_insert")
detailed_export_game(game_id, score=score, var_id=var_id)
# Insert participants into database
ids = ensure_users_in_db_and_get_ids(users)
game_participant_values = []
for index, user_id in enumerate(ids):
game_participant_values.append((game_id, user_id, index))
psycopg2.extras.execute_values(
database.cur,
"INSERT INTO game_participants (game_id, user_id, seat) VALUES %s "
"ON CONFLICT (game_id, user_id) DO UPDATE SET seat = excluded.seat",
game_participant_values
)
database.cur.execute("RELEASE seed_insert")
logger.debug("Imported game {}".format(game_id))

81
hanabi/live/hanab_live.py Normal file
View file

@ -0,0 +1,81 @@
from typing import List
from hanabi import hanab_game
from hanabi import constants
from hanabi.live import variants
class HanabLiveInstance(hanab_game.HanabiInstance):
def __init__(
self,
deck: List[hanab_game.DeckCard],
num_players: int,
variant_id: int,
one_extra_card: bool = False,
one_less_card: bool = False,
*args, **kwargs
):
assert 2 <= num_players <= 6
hand_size = constants.HAND_SIZES[num_players]
if one_less_card:
hand_size -= 1
if one_extra_card:
hand_size += 1
super().__init__(deck, num_players, hand_size=hand_size, *args, **kwargs)
self.variant_id = variant_id
self.variant = variants.Variant.from_db(self.variant_id)
@staticmethod
def select_standard_variant_id(instance: hanab_game.HanabiInstance):
err_msg = "Hanabi instance not supported by hanab.live, cannot convert to HanabLiveInstance: "
assert 3 <= instance.num_suits <= 6, \
err_msg + "Illegal number of suits ({}) found, must be in range [3,6]".format(instance.num_suits)
assert 0 <= instance.num_dark_suits <= 2, \
err_msg + "Illegal number of dark suits ({}) found, must be in range [0,2]".format(instance.num_dark_suits)
assert 4 <= instance.num_suits - instance.num_dark_suits, \
err_msg + "Illegal ratio of dark suits to suits, can have at most {} dark suits with {} total suits".format(
max(instance.num_suits - 4, 0), instance.num_suits
)
return constants.VARIANT_IDS_STANDARD_DISTRIBUTIONS[instance.num_suits][instance.num_dark_suits]
class HanabLiveGameState(hanab_game.GameState):
def __init__(self, instance: HanabLiveInstance):
super().__init__(instance)
self.instance: HanabLiveInstance = instance
def make_action(self, action):
match action.type:
case hanab_game.ActionType.ColorClue | hanab_game.ActionType.RankClue:
assert(self.clues > 0)
self.actions.append(action)
self.clues -= self.instance.clue_increment
self._make_turn()
# TODO: could check that the clue specified is in fact legal
case hanab_game.ActionType.Play:
self.play(action.target)
case hanab_game.ActionType.Discard:
self.discard(action.target)
case hanab_game.ActionType.EndGame | hanab_game.ActionType.VoteTerminate:
self.over = True
def _waste_clue(self) -> hanab_game.Action:
for player in range(self.turn + 1, self.turn + self.num_players):
for card in self.hands[player % self.num_players]:
for rank in self.instance.variant.ranks:
if self.instance.variant.rank_touches(card, rank):
return hanab_game.Action(
hanab_game.ActionType.RankClue,
player % self.num_players,
rank
)
for color in range(self.instance.variant.num_colors):
if self.instance.variant.color_touches(card, color):
return hanab_game.Action(
hanab_game.ActionType.ColorClue,
player % self.num_players,
color
)
raise RuntimeError("Current game state did not permit any legal clue."
"This case is incredibly rare and currently not handled.")

View file

@ -0,0 +1,185 @@
from typing import Optional
import pebble.concurrent
import concurrent.futures
import traceback
import alive_progress
import threading
import time
from hanabi import logger
from hanabi.solvers.sat import solve_sat
from hanabi import database
from hanabi.live import download_data
from hanabi.live import compress
from hanabi import hanab_game
from hanabi.solvers import greedy_solver
from hanabi.solvers import deck_analyzer
from hanabi.live import variants
MAX_PROCESSES = 6
def update_trivially_feasible_games(variant_id):
variant: variants.Variant = variants.Variant.from_db(variant_id)
database.cur.execute("SELECT seed FROM seeds WHERE variant_id = (%s) AND feasible is null", (variant_id,))
seeds = database.cur.fetchall()
logger.verbose('Checking variant {} (id {}), found {} seeds to check...'.format(variant.name, variant_id, len(seeds)))
with alive_progress.alive_bar(total=len(seeds), title='{} ({})'.format(variant.name, variant_id)) as bar:
for (seed,) in seeds:
database.cur.execute(
"SELECT id, deck_plays, one_extra_card, one_less_card, all_or_nothing, detrimental_characters "
"FROM games WHERE score = (%s) AND seed = (%s) ORDER BY id;",
(variant.max_score, seed)
)
res = database.cur.fetchall()
logger.debug("Checking seed {}: {:3} results".format(seed, len(res)))
for (game_id, a, b, c, d, e) in res:
if None in [a, b, c, d, e]:
logger.debug(' Game {} not found in database, exporting...'.format(game_id))
download_data.detailed_export_game(
game_id, var_id=variant_id, score=variant.max_score, seed_exists=True
)
database.cur.execute("SELECT deck_plays, one_extra_card, one_less_card, all_or_nothing, "
"detrimental_characters "
"FROM games WHERE id = (%s)",
(game_id,))
(a, b, c, d, e) = database.cur.fetchone()
else:
logger.debug(' Game {} already in database'.format(game_id))
valid = not any([a, b, c, d, e])
if valid:
print(a, b, c, d, e)
logger.verbose(
'Seed {:10} (variant {}) found to be feasible via game {:6}'.format(seed, variant_id, game_id))
database.cur.execute("UPDATE seeds SET (feasible, max_score_theoretical) = (%s, %s) WHERE seed = "
"(%s)", (True, variant.max_score, seed))
database.cur.execute(
"INSERT INTO score_lower_bounds (seed, score_lower_bound, game_id) VALUES (%s, %s, %s)",
(seed, variant.max_score, game_id)
)
database.conn.commit()
break
else:
logger.verbose(' Cheaty game {} found'.format(game_id))
bar()
def get_decks_for_all_seeds():
cur = database.conn.database.cursor()
cur.execute("SELECT id "
"FROM games "
" INNER JOIN seeds "
" ON seeds.seed = games.seed"
" WHERE"
" seeds.deck is null"
" AND"
" games.id = ("
" SELECT id FROM games WHERE games.seed = seeds.seed LIMIT 1"
" )"
)
print("Exporting decks for all seeds")
res = cur.fetchall()
with alive_progress.alive_bar(len(res), title="Exporting decks") as bar:
for (game_id,) in res:
download_data.detailed_export_game(game_id)
bar()
mutex = threading.Lock()
def solve_instance(instance: hanab_game.HanabiInstance):
# first, sanity check on running out of pace
result = deck_analyzer.analyze(instance)
if result is not None:
assert type(result) == deck_analyzer.InfeasibilityReason
logger.debug("found infeasible deck")
return False, None, None
for num_remaining_cards in [0, 20]:
# logger.info("trying with {} remaining cards".format(num_remaining_cards))
game = hanab_game.GameState(instance)
strat = greedy_solver.GreedyStrategy(game)
# make a number of greedy moves
while not game.is_over() and not game.is_known_lost():
if num_remaining_cards != 0 and game.progress == game.deck_size - num_remaining_cards:
break # stop solution here
strat.make_move()
# check if we won already
if game.is_won():
# print("won with greedy strat")
return True, game, num_remaining_cards
# now, apply sat solver
if not game.is_over():
logger.debug("continuing greedy sol with SAT")
solvable, sol = solve_sat(game)
if solvable is None:
return True, sol, num_remaining_cards
logger.debug(
"No success with {} remaining cards, reducing number of greedy moves, failed attempt was: {}".format(
num_remaining_cards, compress.link(game)))
# print("Aborting trying with greedy strat")
logger.debug("Starting full SAT solver")
game = hanab_game.GameState(instance)
a, b = solve_sat(game)
return a, b, instance.draw_pile_size
@pebble.concurrent.process(timeout=150)
def solve_seed_with_timeout(seed, num_players, deck_compressed, var_name: Optional[str] = None):
try:
logger.verbose("Starting to solve seed {}".format(seed))
deck = compress.decompress_deck(deck_compressed)
t0 = time.perf_counter()
solvable, solution, num_remaining_cards = solve_instance(hanab_game.HanabiInstance(deck, num_players))
t1 = time.perf_counter()
logger.verbose("Solved instance {} in {} seconds: {}".format(seed, round(t1 - t0, 2), solvable))
mutex.acquire()
if solvable is not None:
database.cur.execute("UPDATE seeds SET feasible = (%s) WHERE seed = (%s)", (solvable, seed))
database.conn.commit()
mutex.release()
if solvable == True:
logger.verbose("Success with {} cards left in draw by greedy solver on seed {}: {}\n".format(
num_remaining_cards, seed, compress.link(solution))
)
elif solvable == False:
logger.debug("seed {} was not solvable".format(seed))
logger.debug('{}-player, seed {:10}, {}\n'.format(num_players, seed, var_name))
elif solvable is None:
logger.verbose("seed {} skipped".format(seed))
else:
raise Exception("Programming Error")
except Exception as e:
print("exception in subprocess:")
traceback.print_exc()
def solve_seed(seed, num_players, deck_compressed, var_name: Optional[str] = None):
f = solve_seed_with_timeout(seed, num_players, deck_compressed, var_name)
try:
return f.result()
except TimeoutError:
logger.verbose("Solving on seed {} timed out".format(seed))
return
def solve_unknown_seeds(variant_id, variant_name: Optional[str] = None):
database.cur.execute(
"SELECT seed, num_players, deck FROM seeds WHERE variant_id = (%s) AND feasible IS NULL",
(variant_id,)
)
res = database.cur.fetchall()
with concurrent.futures.ProcessPoolExecutor(max_workers=MAX_PROCESSES) as executor:
fs = [executor.submit(solve_seed, r[0], r[1], r[2], variant_name) for r in res]
with alive_progress.alive_bar(len(res), title='Seed solving on {}'.format(variant_name)) as bar:
for f in concurrent.futures.as_completed(fs):
bar()

View file

@ -23,7 +23,7 @@ def get_all_variant_ids() -> List[int]:
return [var_id for (var_id,) in database.cur.fetchall()]
def variant_name(var_id) -> Optional[str]:
def variant_name(var_id) -> Optional[int]:
database.cur.execute(
"SELECT name FROM variants WHERE id = %s",
(var_id,)

View file

@ -0,0 +1,195 @@
from enum import Enum
from typing import List
import alive_progress
from hanabi import database
from hanabi import logger
from hanabi import hanab_game
from hanabi.live import compress
class InfeasibilityType(Enum):
OutOfPace = 0 # idx denotes index of last card drawn before being forced to reduce pace, value denotes how bad pace is
OutOfHandSize = 1 # idx denotes index of last card drawn before being forced to discard a crit
CritAtBottom = 3
class InfeasibilityReason:
def __init__(self, infeasibility_type: InfeasibilityType, score_upper_bound, value=None):
self.type = infeasibility_type
self.score_upper_bound = score_upper_bound
self.value = value
def __repr__(self):
match self.type:
case InfeasibilityType.OutOfPace:
return "Upper bound {}, since deck runs out of pace after drawing card {}".format(self.score_upper_bound, self.value)
case InfeasibilityType.OutOfHandSize:
return "Upper bound {}, since deck runs out of hand size after drawing card {}".format(self.score_upper_bound, self.value)
case InfeasibilityType.CritAtBottom:
return "Upper bound {}, sicne deck has critical non-5 at bottom".format(self.score_upper_bound)
def analyze(instance: hanab_game.HanabiInstance, only_find_first=False) -> List[InfeasibilityReason]:
"""
Checks instance for the following (easy) certificates for unfeasibility
- There is a critical non-5 at the bottom
- We necessarily run out of pace when playing this deck:
At some point, among all drawn cards, there are too few playable ones so that the next discard
reduces pace to a negative amount
- We run out of hand size when playing this deck:
At some point, there are too many critical cards (that also could not have been played) for the players
to hold collectively
:param instance: Instance to be analyzed
:param only_find_first: If true, we immediately return when finding the first infeasibility reason and don't
check for further ones. Might be slightly faster on some instances, especially dark ones.
:return: List with all reasons found. Empty if none is found.
In particular, if return value is not the empty list, the analyzed instance is unfeasible
"""
reasons = []
# check for critical non-fives at bottom of the deck
bottom_card = instance.deck[-1]
if bottom_card.rank != 5 and bottom_card.suitIndex in instance.dark_suits:
reasons.append(InfeasibilityReason(
InfeasibilityType.CritAtBottom,
instance.max_score - 5 + bottom_card.rank,
instance.deck_size - 1
))
if only_find_first:
return reasons
# we will sweep through the deck and pretend that
# - we keep all non-trash cards in our hands
# - we instantly play all playable cards as soon as we have them
# - we recurse on this instant-play
#
# For example, we assume that once we draw r2, we check if we can play r2.
# If yes, then we also check if we drew r3 earlier and so on.
# If not, then we keep r2 in our hands
#
# In total, this is equivalent to assuming that we infinitely many clues
# and infinite storage space in our hands (which is of course not true),
# but even in this setting, some games are infeasible due to pace issues
# that we can detect
#
# A small refinement is to pretend that we only have infinite storage for non-crit cards,
# for crit-cards, the usual hand card limit applies.
# This allows us to detect some seeds where there are simply too many unplayable cards to hold at some point
# that also can't be discarded
# this allows us to detect standard pace issue arguments
stacks = [0] * instance.num_suits
# we will ensure that stored_crits is a subset of stored_cards
stored_cards = set()
stored_crits = set()
min_forced_pace = instance.initial_pace
worst_pace_index = 0
max_forced_crit_discard = 0
worst_crit_index = 0
for (i, card) in enumerate(instance.deck):
if card.rank == stacks[card.suitIndex] + 1:
# card is playable
stacks[card.suitIndex] += 1
# check for further playables that we stored
for check_rank in range(card.rank + 1, 6):
check_card = hanab_game.DeckCard(card.suitIndex, check_rank)
if check_card in stored_cards:
stacks[card.suitIndex] += 1
stored_cards.remove(check_card)
if check_card in stored_crits:
stored_crits.remove(check_card)
else:
break
elif card.rank <= stacks[card.suitIndex]:
pass # card is trash
elif card.rank > stacks[card.suitIndex] + 1:
# need to store card
if card in stored_cards or card.rank == 5:
stored_crits.add(card)
stored_cards.add(card)
# check for out of handsize (this number can be negative, in which case nothing applies)
# Note the +1 at the end, which is there because we have to discard next,
# so even if we currently have as many crits as we can hold, we have to discard one
num_forced_crit_discards = len(stored_crits) - instance.num_players * instance.hand_size + 1
if len(stored_crits) - instance.num_players * instance.hand_size > max_forced_crit_discard:
worst_crit_index = i
max_forced_crit_discard = num_forced_crit_discards
if only_find_first:
reasons.append(InfeasibilityReason(
InfeasibilityType.OutOfPace,
instance.max_score + min_forced_pace,
worst_pace_index
))
return reasons
# the last - 1 is there because we have to discard 'next', causing a further draw
max_remaining_plays = (instance.deck_size - i - 1) + instance.num_players - 1
needed_plays = instance.max_score - sum(stacks)
cur_pace = max_remaining_plays - needed_plays
if cur_pace < min(0, min_forced_pace):
min_forced_pace = cur_pace
worst_pace_index = i
if only_find_first:
reasons.append(InfeasibilityReason(
InfeasibilityType.OutOfPace,
instance.max_score + min_forced_pace,
worst_pace_index
))
return reasons
# check that we correctly walked through the deck
assert (len(stored_cards) == 0)
assert (len(stored_crits) == 0)
assert (sum(stacks) == instance.max_score)
if max_forced_crit_discard > 0:
reasons.append(
InfeasibilityReason(
InfeasibilityType.OutOfHandSize,
instance.max_score - max_forced_crit_discard,
worst_crit_index
)
)
if min_forced_pace < 0:
reasons.append(InfeasibilityReason(
InfeasibilityType.OutOfPace,
instance.max_score + min_forced_pace,
worst_pace_index
))
return reasons
def run_on_database(variant_id):
database.cur.execute(
"SELECT seed, num_players, deck FROM seeds WHERE variant_id = (%s) ORDER BY (num_players, seed)",
(variant_id,)
)
res = database.cur.fetchall()
logger.verbose("Checking {} seeds of variant {} for infeasibility".format(len(res), variant_id))
with alive_progress.alive_bar(total=len(res), title='Check for infeasibility reasons in var {}'.format(variant_id)) as bar:
for (seed, num_players, deck_str) in res:
deck = compress.decompress_deck(deck_str)
reasons = analyze(hanab_game.HanabiInstance(deck, num_players))
for reason in reasons:
database.cur.execute(
"INSERT INTO score_upper_bounds (seed, score_upper_bound, reason) "
"VALUES (%s,%s,%s) "
"ON CONFLICT (seed, reason) DO UPDATE "
"SET score_upper_bound = EXCLUDED.score_upper_bound",
(seed, reason.score_upper_bound, reason.type.value)
)
database.cur.execute(
"UPDATE seeds SET feasible = (%s) WHERE seed = (%s)",
(False, seed)
)
bar()
database.conn.commit()

View file

@ -157,7 +157,7 @@ class GreedyStrategy():
hand_states = [[CardState(card_type(self.game_state, card), card, None) for card in self.game_state.hands[p]]
for p in range(self.game_state.num_players)]
# find dupes in players hands, mark one card crit and the other one trash
# find dupes in players hands, marke one card crit and the other one trash
p = False
for states in hand_states:
counter = collections.Counter(map(lambda state: state.card, states))

View file

@ -32,15 +32,6 @@ class Literals():
}
}
# progress[m] = i "after move m the next card drawn from the deck has index i"
self.next_draw = {
-1: Int(0)
, **{
m: Symbol('m{}progress'.format(m), INT)
for m in range(instance.max_winning_moves)
}
}
# strikes[m][i] == "after move m we have at least i strikes"
self.strikes = {
-1: {i: Bool(i == 0) for i in range(0, instance.num_strikes + 1)} # no strikes when we start
@ -222,10 +213,6 @@ def solve_sat(starting_state: hanab_game.GameState | hanab_game.HanabiInstance,
Implies(And(Or(ls.discard_any[m], ls.dummyturn[m]), Not(ls.incr_clues[m])),
Equals(ls.clues[m], ls.clues[m - 1])),
# change of progress
Implies(ls.draw_any[m], Equals(ls.next_draw[m], ls.next_draw[m-1] + 1)),
Implies(Not(ls.draw_any[m]), Equals(ls.next_draw[m], ls.next_draw[m-1])),
# change of pace
Implies(And(ls.discard_any[m], Or(ls.strike[m], Not(ls.play[m]))), Equals(ls.pace[m], ls.pace[m - 1] - 1)),
Implies(Or(Not(ls.discard_any[m]), And(Not(ls.strike[m]), ls.play[m])), Equals(ls.pace[m], ls.pace[m - 1])),
@ -328,7 +315,7 @@ def solve_sat(starting_state: hanab_game.GameState | hanab_game.HanabiInstance,
constraints = And(*[valid_move(m) for m in range(first_turn, instance.max_winning_moves)], win)
# print('Solving instance with {} variables, {} nodes'.format(len(get_atoms(constraints)), get_formula_size(constraints)))
model = get_model(constraints, solver_name="z3")
model = get_model(constraints)
if model:
log_model(model, game_state, ls)
solution = evaluate_model(model, copy.deepcopy(game_state), ls)

View file

@ -1,4 +1,4 @@
#! /usr//bin/env python3
#! /bin/python3
"""
Short executable file to start the command-line-interface for the hanabi package.

View file

@ -1,32 +0,0 @@
[project]
name = "hanabi"
version = "1.1.5"
description = "Hanabi interface"
readme = "README.md"
license = { file = "LICENSE" }
keywords = [ "hanabi" ]
authors = [
{ name = "Maximilian Keßler", email = "git@maximilian-kessler.de" }
]
dependencies = [
"requests",
"requests_cache",
"pysmt",
"termcolor",
"more_itertools",
"psycopg2",
"alive_progress",
"argparse",
"verboselogs",
"pebble",
"platformdirs",
"PyYAML",
"cython==0.29.36"
]
[project.urls]
"Homepage" = "https://gitlab.com/kesslermaximilian/hanabi"
[build-system]
requires = ["setuptools>=43.0.0", "wheel"]
build-backend = "setuptools.build_meta"

View file

@ -10,5 +10,3 @@ verboselogs
pebble
platformdirs
PyYAML
cython==0.29.36
unidecode

View file

@ -1,145 +0,0 @@
from typing import List, Tuple, Optional
import psycopg2.extras
import hanabi.hanab_game
import hanabi.live.hanab_live
from hanabi import logger
from hanabi.database import conn, cur
def get_actions_table_name(cert_game: bool):
return "certificate_game_actions" if cert_game else "game_actions"
def store_actions(game_id: int, actions: List[hanabi.hanab_game.Action], cert_game: bool = False):
vals = []
for turn, action in enumerate(actions):
vals.append((game_id, turn, action.type.value, action.target, action.value or 0))
psycopg2.extras.execute_values(
cur,
"INSERT INTO {} (game_id, turn, type, target, value) "
"VALUES %s "
"ON CONFLICT (game_id, turn) "
"DO NOTHING".format(get_actions_table_name(cert_game)),
vals
)
conn.commit()
def store_deck_for_seed(seed: str, deck: List[hanabi.hanab_game.DeckCard]):
vals = []
for index, card in enumerate(deck):
vals.append((seed, index, card.suitIndex, card.rank))
psycopg2.extras.execute_values(
cur,
"INSERT INTO decks (seed, deck_index, suit_index, rank) "
"VALUES %s "
"ON CONFLICT (seed, deck_index) DO UPDATE SET "
"(suit_index, rank) = (excluded.suit_index, excluded.rank)",
vals
)
conn.commit()
def load_actions(game_id: int, cert_game: bool = False) -> List[hanabi.hanab_game.Action]:
cur.execute("SELECT type, target, value FROM {} "
"WHERE game_id = %s "
"ORDER BY turn ASC".format(get_actions_table_name(cert_game)),
(game_id,))
actions = []
for action_type, target, value in cur.fetchall():
actions.append(
hanabi.hanab_game.Action(hanabi.hanab_game.ActionType(action_type), target, value)
)
if len(actions) == 0:
err_msg = "Failed to load actions for game id {} from DB: No actions stored.".format(game_id)
logger.error(err_msg)
raise ValueError(err_msg)
return actions
def load_deck(seed: str) -> List[hanabi.hanab_game.DeckCard]:
cur.execute("SELECT deck_index, suit_index, rank FROM decks "
"WHERE seed = %s "
"ORDER BY deck_index ASC",
(seed,)
)
deck = []
for index, (card_index, suit_index, rank) in enumerate(cur.fetchall()):
assert index == card_index
deck.append(
hanabi.hanab_game.DeckCard(suit_index, rank, card_index)
)
if len(deck) == 0:
err_msg = "Failed to load deck for seed {} from DB: No cards stored.".format(seed)
logger.error(err_msg)
raise ValueError(err_msg)
return deck
def load_instance(seed: str) -> Optional[hanabi.live.hanab_live.HanabLiveInstance]:
cur.execute(
"SELECT num_players, variant_id "
"FROM seeds WHERE seed = %s ",
(seed,)
)
res = cur.fetchone()
if res is None:
return None
(num_players, var_id) = res
deck = load_deck(seed)
return hanabi.live.hanab_live.HanabLiveInstance(deck, num_players, var_id)
def load_game_parts(game_id: int, cert_game: bool = False) -> Tuple[hanabi.live.hanab_live.HanabLiveInstance, List[hanabi.hanab_game.Action]]:
"""
Loads information on game from database
@param game_id: ID of game
@return: Instance (i.e. deck + settings) of game, list of actions, variant name
"""
cur.execute(
"SELECT "
"games.num_players, games.seed, games.one_extra_card, games.one_less_card, games.deck_plays, "
"games.all_or_nothing,"
"variants.clue_starved, variants.name, variants.id, variants.throw_it_in_a_hole "
"FROM games "
"INNER JOIN variants"
" ON games.variant_id = variants.id "
"WHERE games.id = %s",
(game_id,)
)
res = cur.fetchone()
if res is None:
err_msg = "Failed to retrieve game details of game {}.".format(game_id)
logger.error(err_msg)
raise ValueError(err_msg)
# Unpack results now
(num_players, seed, one_extra_card, one_less_card, deck_plays, all_or_nothing, clue_starved, variant_name, variant_id, throw_it_in_a_hole) = res
actions = load_actions(game_id, cert_game)
deck = load_deck(seed)
instance = hanabi.live.hanab_live.HanabLiveInstance(
deck=deck,
num_players=num_players,
variant_id=variant_id,
one_extra_card=one_extra_card,
one_less_card=one_less_card,
fives_give_clue=not throw_it_in_a_hole,
deck_plays=deck_plays,
all_or_nothing=all_or_nothing,
clue_starved=clue_starved
)
return instance, actions
def load_game(game_id: int, cert_game: bool = False) -> hanabi.live.hanab_live.HanabLiveGameState:
instance, actions = load_game_parts(game_id, cert_game)
game = hanabi.live.hanab_live.HanabLiveGameState(instance)
for action in actions:
game.make_action(action)
return game

View file

@ -1,181 +0,0 @@
DROP TABLE IF EXISTS users CASCADE;
CREATE TABLE users (
id SERIAL PRIMARY KEY,
username TEXT NOT NULL UNIQUE,
normalized_username TEXT NOT NULL UNIQUE
);
DROP TABLE IF EXISTS seeds CASCADE;
CREATE TABLE seeds (
seed TEXT NOT NULL PRIMARY KEY,
num_players SMALLINT NOT NULL,
variant_id SMALLINT NOT NULL,
starting_player SMALLINT NOT NULL DEFAULT 0,
/* Type of seed: 0 is from the website, all other integers are customly generated testsets, arbitrarily ordered into groups */
class SMALLINT NOT NULL DEFAULT 0,
/* For seeds on the website: Always 0. For custom seeds: Numbered within their class */
num INT NOT NULL DEFAULT 0,
feasible BOOLEAN DEFAULT NULL,
solved BOOLEAN GENERATED ALWAYS AS ( feasible IS NOT NULL ) STORED,
/*
If seed solved: Amount of time (in ms) to solve seed.
If seed not solved: Maximum amount of time spent on solving before timeout
*/
solve_time_ms INT NOT NULL DEFAULT 0,
max_score_theoretical SMALLINT
);
CREATE INDEX seeds_variant_class_feasible_idx ON seeds (variant_id, class, feasible);
DROP TABLE IF EXISTS decks CASCADE;
CREATE TABLE decks (
seed TEXT REFERENCES seeds (seed) ON DELETE CASCADE,
/* Order of card in deck*/
deck_index SMALLINT NOT NULL,
/* Suit */
suit_index SMALLINT NOT NULL,
/* Rank */
rank SMALLINT NOT NULL,
PRIMARY KEY (seed, deck_index)
);
DROP TABLE IF EXISTS games CASCADE;
CREATE TABLE games (
id INT PRIMARY KEY,
num_players SMALLINT NOT NULL,
starting_player SMALLINT NOT NULL DEFAULT 0,
variant_id SMALLINT NOT NULL,
timed BOOLEAN,
time_base INTEGER,
time_per_turn INTEGER,
speedrun BOOLEAN,
card_cycle BOOLEAN,
deck_plays BOOLEAN,
empty_clues BOOLEAN,
one_extra_card BOOLEAN,
one_less_card BOOLEAN,
all_or_nothing BOOLEAN,
detrimental_characters BOOLEAN,
seed TEXT NOT NULL REFERENCES seeds,
score SMALLINT NOT NULL,
num_turns SMALLINT
);
CREATE INDEX games_seed_score_idx ON games (seed, score);
CREATE INDEX games_var_seed_idx ON games (variant_id, seed);
CREATE INDEX games_player_idx ON games (num_players);
/* Example games finishing with max score, not necessarily played by humans. */
DROP TABLE IF EXISTS certificate_games CASCADE;
CREATE TABLE certificate_games (
id SERIAL PRIMARY KEY,
seed TEXT NOT NULL REFERENCES seeds ON DELETE CASCADE,
num_turns SMALLINT NOT NULL,
min_pace SMALLINT,
num_bdrs SMALLINT
);
CREATE INDEX certificate_games_seed_idx ON games (seed);
DROP TABLE IF EXISTS game_participants CASCADE;
CREATE TABLE game_participants (
id SERIAL PRIMARY KEY,
game_id INTEGER NOT NULL,
user_id INTEGER NOT NULL,
seat SMALLINT NOT NULL, /* Needed for the "GetNotes()" function */
FOREIGN KEY (game_id) REFERENCES games (id) ON DELETE CASCADE,
FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE,
CONSTRAINT game_participants_unique UNIQUE (game_id, user_id)
);
DROP FUNCTION IF EXISTS delete_game_of_deleted_participant;
CREATE FUNCTION delete_game_of_deleted_participant() RETURNS TRIGGER AS $_$
BEGIN
DELETE FROM games WHERE games.id = OLD.game_id;
RETURN OLD;
END $_$ LANGUAGE 'plpgsql';
CREATE TRIGGER delete_game_upon_participant_deletion
AFTER DELETE ON game_participants
FOR EACH ROW
EXECUTE PROCEDURE delete_game_of_deleted_participant();
DROP TABLE IF EXISTS game_participant_notes CASCADE;
CREATE TABLE game_participant_notes (
game_participant_id INTEGER NOT NULL,
card_order SMALLINT NOT NULL, /* "order" is a reserved word in PostgreSQL. */
note TEXT NOT NULL,
FOREIGN KEY (game_participant_id) REFERENCES game_participants (id) ON DELETE CASCADE,
PRIMARY KEY (game_participant_id, card_order)
);
DROP TABLE IF EXISTS game_actions CASCADE;
CREATE TABLE game_actions (
game_id INTEGER NOT NULL,
turn SMALLINT NOT NULL,
/**
* Corresponds to the "DatabaseGameActionType" enum.
*
* - 0 - play
* - 1 - discard
* - 2 - color clue
* - 3 - rank clue
* - 4 - game over
*/
type SMALLINT NOT NULL,
/**
* - If a play or a discard, corresponds to the order of the the card that was played/discarded.
* - If a clue, corresponds to the index of the player that received the clue.
* - If a game over, corresponds to the index of the player that caused the game to end or -1 if
* the game was terminated by the server.
*/
target SMALLINT NOT NULL,
/**
* - If a play or discard, then 0 (as NULL). It uses less database space and reduces code
* complexity to use a value of 0 for NULL than to use a SQL NULL:
* https://dev.mysql.com/doc/refman/8.0/en/data-size.html
* - If a color clue, then 0 if red, 1 if yellow, etc.
* - If a rank clue, then 1 if 1, 2 if 2, etc.
* - If a game over, then the value corresponds to the "endCondition" values in "constants.go".
*/
value SMALLINT NOT NULL,
FOREIGN KEY (game_id) REFERENCES games (id) ON DELETE CASCADE,
PRIMARY KEY (game_id, turn)
);
/* Functions the same as game_actions, just for certificate_games instead */
DROP TABLE IF EXISTS certificate_game_actions CASCADE;
CREATE TABLE certificate_game_actions (
game_id INTEGER NOT NULL,
turn SMALLINT NOT NULL,
type SMALLINT NOT NULL,
target SMALLINT NOT NULL,
value SMALLINT NOT NULL,
FOREIGN KEY (game_id) REFERENCES certificate_games (id) ON DELETE CASCADE,
PRIMARY KEY (game_id, turn)
);
DROP TABLE IF EXISTS infeasibility_reasons CASCADE;
CREATE TABLE infeasibility_reasons (
seed TEXT NOT NULL REFERENCES seeds (seed) ON DELETE CASCADE,
reason SMALLINT NOT NULL,
/*
Some value whose meaning depends on the type of reason, for example index when pace loss occurs.
Can be null for some reason.
*/
value SMALLINT,
PRIMARY KEY (seed, reason)
);

View file

@ -1,73 +0,0 @@
import hanabi.live.compress
from hanabi.hanab_game import DeckCard
from hanabi import database
from hanabi.live.variants import Variant
from hanabi.database import games_db_interface
import random
from src.hanabi.solvers.sat import solve_sat
def get_deck(variant: Variant):
deck = []
for suit_index, suit in enumerate(variant.suits):
if suit.dark:
for rank in range(1, 6):
deck.append(DeckCard(suit_index, rank))
else:
deck.append(DeckCard(suit_index, 1))
if not suit.reversed:
deck.append(DeckCard(suit_index, 1))
deck.append(DeckCard(suit_index, 1))
for rank in range(2,5):
deck.append(DeckCard(suit_index, rank))
deck.append(DeckCard(suit_index, rank))
deck.append(DeckCard(suit_index, 5))
if suit.reversed:
deck.append(DeckCard(suit_index, 5))
deck.append(DeckCard(suit_index, 5))
return deck
def generate_deck(variant: Variant, num_players: int, seed: int, seed_class: int = 1):
deck = get_deck(variant)
seed = "p{}c{}s{}".format(num_players, seed_class, seed)
random.seed(seed)
random.shuffle(deck)
return seed, deck
def link():
seed = "p5v0sunblinkingly-kobe-prescriptively"
deck = database.games_db_interface.load_deck(seed)
database.cur.execute("SELECT id FROM certificate_games WHERE seed = %s", (seed,))
(game_id, ) = database.cur.fetchone()
actions = database.games_db_interface.load_actions(game_id, True)
inst = hanabi.hanab_game.HanabiInstance(deck, 5)
game = hanabi.hanab_game.GameState(inst)
for action in actions:
game.make_action(action)
print(hanabi.live.compress.link(game))
def generate_decks_for_variant(variant_id: int, num_players: int, num_seeds: int, seed_class: int = 1):
variant = Variant.from_db(variant_id)
for seed_num in range(num_seeds):
seed, deck = generate_deck(variant, num_players, seed_num, seed_class)
database.cur.execute(
"INSERT INTO seeds (seed, num_players, starting_player, variant_id, class, num) "
"VALUES (%s, %s, %s, %s, %s, %s)"
"ON CONFLICT (seed) DO NOTHING",
(seed, num_players, 0, variant_id, seed_class, seed_num)
)
games_db_interface.store_deck_for_seed(seed, deck)
def main():
database.global_db_connection_manager.read_config()
database.global_db_connection_manager.connect()
link()
# generate_decks_for_variant(0, 2, 100)
if __name__ == '__main__':
main()

View file

@ -1,146 +0,0 @@
from typing import List, Dict, Tuple
from hanabi.hanab_game import Action, ParseError
from hanabi import hanab_game
from hanabi import constants
from hanabi.live import variants
class HanabLiveInstance(hanab_game.HanabiInstance):
def __init__(
self,
deck: List[hanab_game.DeckCard],
num_players: int,
variant_id: int,
one_extra_card: bool = False,
one_less_card: bool = False,
*args, **kwargs
):
self.one_extra_card = one_extra_card
self.one_less_card = one_less_card
assert 2 <= num_players <= 6
hand_size = constants.HAND_SIZES[num_players]
if one_less_card:
hand_size -= 1
if one_extra_card:
hand_size += 1
super().__init__(deck, num_players, hand_size=hand_size, *args, **kwargs)
self.variant_id = variant_id
self.variant = variants.Variant.from_db(self.variant_id)
@staticmethod
def select_standard_variant_id(instance: hanab_game.HanabiInstance):
err_msg = "Hanabi instance not supported by hanab.live, cannot convert to HanabLiveInstance: "
assert 3 <= instance.num_suits <= 6, \
err_msg + "Illegal number of suits ({}) found, must be in range [3,6]".format(instance.num_suits)
assert 0 <= instance.num_dark_suits <= 2, \
err_msg + "Illegal number of dark suits ({}) found, must be in range [0,2]".format(instance.num_dark_suits)
assert 4 <= max(instance.num_suits, 4) - instance.num_dark_suits, \
err_msg + "Illegal ratio of dark suits to suits, can have at most {} dark suits with {} total suits".format(
max(instance.num_suits - 4, 0), instance.num_suits
)
return constants.VARIANT_IDS_STANDARD_DISTRIBUTIONS[instance.num_suits][instance.num_dark_suits]
def parse_json_game(game_json: Dict, as_hanab_live_instance: bool = True) \
-> Tuple[HanabLiveInstance | hanab_game.HanabiInstance, List[Action]]:
game_id = game_json.get('id', None)
players = game_json.get('players', [])
num_players = len(players)
if num_players < 2 or num_players > 6:
raise ParseError(num_players)
options = game_json.get('options', {})
var_name = options.get('variant', 'No Variant')
deck_plays = options.get('deckPlays', False)
one_extra_card = options.get('oneExtraCard', False)
one_less_card = options.get('oneLessCard', False)
all_or_nothing = options.get('allOrNothing', False)
starting_player = options.get('startingPlayer', 0)
detrimental_characters = options.get('detrimentalCharacters', False)
try:
actions = [hanab_game.Action.from_json(action) for action in game_json.get('actions', [])]
except hanab_game.ParseError as e:
raise ParseError("Failed to parse actions") from e
try:
deck = [hanab_game.DeckCard.from_json(card) for card in game_json.get('deck', None)]
except hanab_game.ParseError as e:
raise ParseError("Failed to parse deck") from e
if detrimental_characters:
raise NotImplementedError(
"detrimental characters not supported, cannot determine score of game {}".format(game_id)
)
if as_hanab_live_instance:
var_id = variants.variant_id(var_name)
return HanabLiveInstance(
deck, num_players, var_id,
deck_plays=deck_plays,
one_less_card=one_less_card,
one_extra_card=one_extra_card,
all_or_nothing=all_or_nothing,
starting_player=starting_player
), actions
else:
hand_size = constants.HAND_SIZES[num_players]
if one_less_card:
hand_size -= 1
if one_extra_card:
hand_size += 1
clue_starved = 'Clue Starved' in var_name
return hanab_game.HanabiInstance(
deck, num_players, hand_size,
clue_starved=clue_starved,
deck_plays=deck_plays,
all_or_nothing=all_or_nothing,
starting_player=starting_player
), actions
class HanabLiveGameState(hanab_game.GameState):
def __init__(self, instance: HanabLiveInstance):
super().__init__(instance)
self.instance: HanabLiveInstance = instance
def to_json(self):
return {
"actions": [action.to_json() for action in self.actions],
"deck": [card.to_json() for card in self.deck],
"players": ["Alice", "Bob", "Cathy", "Donald", "Emily", "Frank"][:self.num_players],
"notes": [[]] * self.num_players,
"options": {
"variant": self.instance.variant_id,
"deckPlays": self.instance.deck_plays,
"oneExtraCard": self.instance.one_extra_card,
"oneLessCard": self.instance.one_less_card,
"allOrNothing": self.instance.all_or_nothing,
"startingPlayer": self.instance.starting_player
}
}
def _waste_clue(self) -> hanab_game.Action:
for player in range(self.turn + 1, self.turn + self.num_players):
for card in self.hands[player % self.num_players]:
for rank in self.instance.variant.ranks:
if self.instance.variant.rank_touches(card, rank):
return hanab_game.Action(
hanab_game.ActionType.RankClue,
player % self.num_players,
rank
)
for color in range(self.instance.variant.num_colors):
if self.instance.variant.color_touches(card, color):
return hanab_game.Action(
hanab_game.ActionType.ColorClue,
player % self.num_players,
color
)
raise RuntimeError("Current game state did not permit any legal clue."
"This case is incredibly rare and currently not handled.")

View file

@ -1,270 +0,0 @@
from dataclasses import dataclass
from types import NoneType
from typing import Optional, Tuple, List
import pebble.concurrent
import concurrent.futures
import traceback
import alive_progress
import threading
import time
import psycopg2.extras
import hanabi.hanab_game
from hanabi import logger
from hanabi.hanab_game import GameState
from hanabi.solvers.sat import solve_sat
from hanabi import database
from hanabi.live import download_data
from hanabi.live import compress
from hanabi import hanab_game
from hanabi.solvers import greedy_solver
from hanabi.solvers import deck_analyzer
from hanabi.live import variants
from hanabi.database.games_db_interface import store_actions
MAX_PROCESSES = 3
def update_trivially_feasible_games(variant_id):
variant: variants.Variant = variants.Variant.from_db(variant_id)
database.cur.execute("SELECT seed FROM seeds WHERE variant_id = (%s) AND feasible is null", (variant_id,))
seeds = database.cur.fetchall()
logger.verbose('Checking variant {} (id {}), found {} seeds to check...'.format(variant.name, variant_id, len(seeds)))
with alive_progress.alive_bar(total=len(seeds), title='{} ({})'.format(variant.name, variant_id)) as bar:
for (seed,) in seeds:
database.cur.execute(
"SELECT id, deck_plays, one_extra_card, one_less_card, all_or_nothing, detrimental_characters "
"FROM games WHERE score = (%s) AND seed = (%s) ORDER BY id;",
(variant.max_score, seed)
)
res = database.cur.fetchall()
logger.debug("Checking seed {}: {:3} results".format(seed, len(res)))
for (game_id, a, b, c, d, e) in res:
if None in [a, b, c, d, e]:
logger.debug(' Game {} not found in database, exporting...'.format(game_id))
download_data.detailed_export_game(
game_id, var_id=variant_id, score=variant.max_score, seed_exists=True
)
database.cur.execute("SELECT deck_plays, one_extra_card, one_less_card, all_or_nothing, "
"detrimental_characters "
"FROM games WHERE id = (%s)",
(game_id,))
(a, b, c, d, e) = database.cur.fetchone()
else:
logger.debug(' Game {} already in database'.format(game_id))
valid = not any([a, b, c, d, e])
if valid:
print(a, b, c, d, e)
logger.verbose(
'Seed {:10} (variant {}) found to be feasible via game {:6}'.format(seed, variant_id, game_id))
database.cur.execute("UPDATE seeds SET (feasible, max_score_theoretical) = (%s, %s) WHERE seed = "
"(%s)", (True, variant.max_score, seed))
database.cur.execute(
"INSERT INTO score_lower_bounds (seed, score_lower_bound, game_id) VALUES (%s, %s, %s)",
(seed, variant.max_score, game_id)
)
database.conn.commit()
break
else:
logger.verbose(' Cheaty game {} found'.format(game_id))
bar()
def get_decks_for_all_seeds():
cur = database.conn.database.cursor()
cur.execute("SELECT id "
"FROM games "
" INNER JOIN seeds "
" ON seeds.seed = games.seed"
" WHERE"
" seeds.deck is null"
" AND"
" games.id = ("
" SELECT id FROM games WHERE games.seed = seeds.seed LIMIT 1"
" )"
)
print("Exporting decks for all seeds")
res = cur.fetchall()
with alive_progress.alive_bar(len(res), title="Exporting decks") as bar:
for (game_id,) in res:
download_data.detailed_export_game(game_id)
bar()
@dataclass
class SolutionData:
infeasibility_reasons: Optional[List[deck_analyzer.InfeasibilityReason]]
seed: str = None
time_ms: int = 0
feasible: Optional[bool] = None
solution: Optional[GameState] = None
num_remaining_cards: Optional[int] = None
skipped: bool = False
def __init__(self):
self.infeasibility_reasons = []
def solve_instance(instance: hanab_game.HanabiInstance)-> SolutionData:
retval = SolutionData()
# first, sanity check on running out of pace
result = deck_analyzer.analyze(instance)
if len(result.infeasibility_reasons) != 0:
logger.verbose("found infeasible deck by preliminary analysis")
retval.feasible = False
retval.infeasibility_reasons = result.infeasibility_reasons
return retval
for num_remaining_cards in [0, 10, 20]:
# logger.info("trying with {} remaining cards".format(num_remaining_cards))
game = hanab_game.GameState(instance)
strat = greedy_solver.GreedyStrategy(game)
# make a number of greedy moves
while not game.is_over() and not game.is_known_lost():
if num_remaining_cards != 0 and game.progress == game.deck_size - num_remaining_cards:
break # stop solution here
strat.make_move()
# check if we won already
if game.is_won():
retval.feasible = True
retval.solution = game
retval.num_remaining_cards = num_remaining_cards
# print("won with greedy strat")
return retval
# now, apply sat solver
if not game.is_over():
logger.debug("continuing greedy sol with SAT")
solvable, solution = solve_sat(game)
if solvable:
retval.feasible = True
retval.solution = solution
retval.num_remaining_cards = num_remaining_cards
return retval
logger.debug(
"No success with {} remaining cards, reducing number of greedy moves, failed attempt was: {}".format(
num_remaining_cards, compress.link(game)))
logger.debug("Starting full SAT solver")
game = hanab_game.GameState(instance)
retval.feasible, retval.solution = solve_sat(game)
retval.num_remaining_cards = instance.draw_pile_size
if not retval.feasible:
assert len(retval.infeasibility_reasons) == 0
retval.infeasibility_reasons.append(deck_analyzer.InfeasibilityReason(deck_analyzer.InfeasibilityType.SAT))
return retval
def solve_seed(seed, num_players, deck, timeout: Optional[int] = 150) -> SolutionData:
try:
@pebble.concurrent.process(timeout=timeout)
def solve_seed_with_timeout(seed, num_players, deck) -> SolutionData:
try:
logger.verbose("Starting to solve seed {}".format(seed))
t0 = time.perf_counter()
retval = solve_instance(hanab_game.HanabiInstance(deck, num_players))
t1 = time.perf_counter()
retval.seed = seed
retval.time_ms = round((t1 - t0) * 1000)
logger.verbose("Solved instance {} in {} seconds: {}".format(seed, round(t1 - t0, 2), retval.feasible))
return retval
except Exception as e:
print("exception in subprocess:")
traceback.print_exc()
f = solve_seed_with_timeout(seed, num_players, deck)
try:
return f.result()
except TimeoutError:
retval = SolutionData()
retval.seed = seed
retval.feasible = None
retval.time_ms = 1000 * timeout
logger.verbose("Solving on seed {} timed out".format(seed))
return retval
except Exception as e:
print("exception in subprocess:")
traceback.print_exc()
def process_solve_result(result: SolutionData):
if result.feasible is not None:
database.cur.execute("UPDATE seeds SET (feasible, solve_time_ms) = (%s, %s) WHERE seed = (%s)",
(result.feasible, result.time_ms, result.seed))
if result.feasible:
assert result.solution is not None
database.cur.execute("INSERT INTO certificate_games (seed, num_turns) "
"VALUES (%s, %s) "
"RETURNING ID ", (result.seed, len(result.solution.actions)))
game_id = database.cur.fetchone()[0]
store_actions(game_id, result.solution.actions, True)
logger.verbose("Success with {} cards left in draw by greedy solver on seed {}: {}\n".format(
result.num_remaining_cards, result.seed, compress.link(result.solution))
)
else:
logger.debug("seed {} was not solvable".format(result.seed))
vals = [(result.seed, reason.type.value, reason.value) for reason in result.infeasibility_reasons]
psycopg2.extras.execute_values(
database.cur,
"INSERT INTO infeasibility_reasons (seed, reason, value) "
"VALUES %s "
"ON CONFLICT (seed, reason) DO NOTHING",
vals
)
database.conn.commit()
elif result.skipped:
logger.verbose("seed {} skipped".format(result.seed))
else:
database.cur.execute("UPDATE seeds SET solve_time_ms = %s WHERE seed = (%s)", (result.time_ms, result.seed))
database.conn.commit()
def solve_unknown_seeds(variant_id, seed_class: int = 0, num_players: Optional[int] = None, timeout: Optional[int] = 150, num_threads: int = 4):
variant_name = variants.variant_name(variant_id)
query = "SELECT seeds.seed, num_players, array_agg(suit_index order by deck_index asc), array_agg(rank order by deck_index asc) "\
"FROM seeds "\
"INNER JOIN decks ON seeds.seed = decks.seed "\
"WHERE variant_id = (%s) "\
"AND class = (%s) "\
"AND feasible IS NULL "\
"AND solve_time_ms < (%s)"
if num_players is not None:
query += "AND num_players = {} ".format(num_players)
query += "GROUP BY seeds.seed ORDER BY num"
database.cur.execute(query,
(variant_id, seed_class, 1000 * timeout)
)
res = database.cur.fetchall()
data = []
for (seed, num_players, suits, ranks) in res:
assert len(suits) == len(ranks)
deck = []
for (suit, rank) in zip(suits, ranks):
deck.append(hanabi.hanab_game.DeckCard(suit, rank))
data.append((seed, num_players, deck))
"""
with alive_progress.alive_bar(len(res), title='Seed solving on {}'.format(variant_name)) as bar:
for d in data:
solve_seed(d[0], d[1], d[2], timeout)
bar()
return
"""
with concurrent.futures.ProcessPoolExecutor(max_workers=num_threads) as executor:
fs = [executor.submit(solve_seed, d[0], d[1], d[2], timeout) for d in data]
with alive_progress.alive_bar(len(res), title='Seed solving on {}'.format(variant_name)) as bar:
for f in concurrent.futures.as_completed(fs):
result = f.result()
process_solve_result(result)
bar()

View file

@ -1,349 +0,0 @@
import collections
import dataclasses
from enum import Enum
from typing import List, Any, Optional, Tuple, Set
from dataclasses import dataclass
import alive_progress
import hanabi.hanab_game
from hanabi import database
from hanabi import logger
from hanabi import hanab_game
from hanabi.hanab_game import DeckCard
from hanabi.live import compress
from hanabi.database import games_db_interface
class InfeasibilityType(Enum):
Pace = 0 # idx denotes index of last card drawn before being forced to reduce pace, value denotes how bad pace is
DoubleBottom2With5s = 1 # same, special case for 2p
TripleBottom1With5s = 2 # same, special case for 2p
HandSize = 10 # idx denotes index of last card drawn before being forced to discard a crit
PaceAfterSqueeze = 11 # pace goes down to 0 after cards have been forcibly lost due to hand size that *might* have helped prevent this situation.
BottomTopDeck = 20 # Card distribution in a single suit in starting hands + near end of deck is impossible to win. value represents suit Index
# further reasons, currently not scanned for
DoubleBottomTopDeck = 30 # Card distribution in two suits in starting hands + near end of deck is impossible to win.
CritAtBottom = 40
# Default reason when we have nothing else
SAT = 50
Manual = 60
class InfeasibilityReason:
def __init__(self, infeasibility_type: InfeasibilityType, value=None):
self.type = infeasibility_type
self.value = value
def __repr__(self):
match self.type:
case InfeasibilityType.Pace:
return "Out of Pace after drawing card {}".format(self.value)
case InfeasibilityType.HandSize:
return "Out of hand size after drawing card {}".format(self.value)
case InfeasibilityType.CritAtBottom:
return "Critical non-5 at bottom"
case _:
return "{} ({})".format(self.type, self.value)
def __eq__(self, other):
return self.type == other.type and self.value == other.value
def __hash__(self):
return (self.type, self.value).__hash__()
def generate_all_choices(l: List[List[Any]]):
if len(l) == 0:
yield []
return
head, *tail = l
for option in head:
for back in generate_all_choices(tail):
yield [option] + back
# Returns index of the suit that makes deck infeasible, or None if it does not exist
def check_for_top_bottom_deck_loss(instance: hanab_game.HanabiInstance) -> Optional[int]:
hands = [instance.deck[p * instance.hand_size : (p+1) * instance.hand_size] for p in range(instance.num_players)]
# scan the deck in reverse order if any card is forced to be late
found = {}
# Note that only the last 4 cards are relevant for single-suit distribution loss
for i, card in enumerate(reversed(instance.deck[-4:])):
if card in found.keys():
found[card] += 1
else:
found[card] = 1
if found[card] >= 3 or (card.rank != 1 and found[card] >= 2):
max_rank_starting_extra_round = card.rank + (instance.deck_size - card.deck_index - 2)
# Next, need to figure out what positions of cards of the same suit are fixed
positions_by_rank = [[] for _ in range(6)]
for rank in range(max_rank_starting_extra_round, 6):
for player, hand in enumerate(hands):
card_test = DeckCard(card.suitIndex, rank)
for card_hand in hand:
if card_test == card_hand:
positions_by_rank[rank].append(player)
# clean up where we have free choice anyway
for rank, positions in enumerate(positions_by_rank):
if rank != 5 and len(positions) < 2:
positions.clear()
if len(positions) == 0:
positions.append(None)
# Now, iterate through all choices in starting hands (None stands for free choice of a card) and check them
assignment_found = False
for assignment in generate_all_choices(positions_by_rank):
cur_player = None
num_turns = 0
for rank in range(max_rank_starting_extra_round, 6):
if cur_player is None or assignment[rank] is None:
num_turns += 1
else:
# Note the -1 and +1 to output things in range [1,5] instead of [0,4]
num_turns += (assignment[rank] - cur_player - 1) % instance.num_players + 1
if assignment[rank] is not None:
cur_player = assignment[rank]
elif cur_player is not None:
cur_player = (cur_player + 1) % instance.num_players
if num_turns <= instance.num_players + 1:
assignment_found = True
# If no assignment worked out, the deck is infeasible because of this suit
if not assignment_found:
return card.suitIndex
# If we reach this point, we checked for every card near the bottom of the deck and found a possible endgame each
return None
def analyze_2p_bottom_loss(instance: hanab_game.HanabiInstance) -> List[InfeasibilityReason]:
reasons = []
filtered_deck = [card for card in instance.deck if card.rank != 5]
if instance.num_players == 2:
if filtered_deck[-1] == filtered_deck[-2] and filtered_deck[-1].rank == 2:
reasons.append(InfeasibilityReason(InfeasibilityType.Pace, filtered_deck[-2].deck_index - 1))
reasons.append(InfeasibilityReason(InfeasibilityType.DoubleBottom2With5s, filtered_deck[-2].deck_index - 1))
if filtered_deck[-1] == filtered_deck[-2] and filtered_deck[-2] == filtered_deck[-3] and filtered_deck[-3].rank == 1:
reasons.append(InfeasibilityReason(InfeasibilityType.Pace, filtered_deck[-3].deck_index - 1))
reasons.append(InfeasibilityReason(InfeasibilityType.TripleBottom1With5s, filtered_deck[-2].deck_index - 1))
return reasons
@dataclass
class ValueWithIndex:
value: int
index: int
stores_minimum: bool = True
def update(self, value, index):
if (self.stores_minimum and value < self.value) or (not self.stores_minimum and value > self.value):
self.value = value
self.index = index
def __repr__(self):
return "{} (at {})".format(self.value, self.index)
@dataclass
class AnalysisResult:
infeasibility_reasons: List[InfeasibilityReason] = dataclasses.field(default_factory=lambda: [])
min_pace: ValueWithIndex = dataclasses.field(default_factory=lambda: ValueWithIndex(100, 0, True))
max_stored_crits: ValueWithIndex = dataclasses.field(default_factory=lambda: ValueWithIndex(0, 0, False))
max_stored_cards: ValueWithIndex = dataclasses.field(default_factory=lambda: ValueWithIndex(0, 0, False))
def analyze_pace_and_hand_size(instance: hanab_game.HanabiInstance, do_squeeze: bool = True) -> AnalysisResult:
reasons = AnalysisResult()
# we will sweep through the deck and pretend that
# - we keep all non-trash cards in our hands
# - we instantly play all playable cards as soon as we have them
# - we recurse on this instant-play
#
# For example, we assume that once we draw r2, we check if we can play r2.
# If yes, then we also check if we drew r3 earlier and so on.
# If not, then we keep r2 in our hands
#
# In total, this is equivalent to assuming that we have infinitely many clues
# and infinite storage space in our hands (which is of course not true),
# but even in this setting, some games are infeasible due to pace issues
# that we can detect
#
# A small refinement is to pretend that we only have infinite storage for non-crit cards,
# for crit-cards, the usual hand card limit applies.
# This allows us to detect some seeds where there are simply too many unplayable cards to hold at some point
# that also can't be discarded
stacks = [0] * instance.num_suits
# we will ensure that stored_crits is a subset of stored_cards
stored_cards = set()
stored_crits = set()
pace_found = False
hand_size_found = False
squeeze = False
artificial_crits = set()
# Investigate BDRs. This catches special cases of Pace losses in 2p, as well as mark some cards critical because
# their second copies cannot be used.
filtered_deck = [card for card in instance.deck if card.rank != 5]
if instance.num_players == 2:
# In 2-player, the second-last card cannot be played if it is a 2
if filtered_deck[-2].rank == 2:
artificial_crits.add(filtered_deck[-2])
# In 2-player, in case there is double bottom 3 of the same suit, the card immediately before cannot be played:
# After playing that one and drawing the first 3, exactly 3,4,5 of the bottom suit have to be played
if filtered_deck[-1] == filtered_deck[-2] and filtered_deck[-2].rank == 3:
artificial_crits.add(filtered_deck[-3])
elif instance.num_players == 3:
if filtered_deck[-1] == filtered_deck[-2] and filtered_deck[-2].rank == 2:
artificial_crits.add(filtered_deck[-3])
# Last card in the deck can never be played unless it is a five.
if instance.deck[-1].rank != 5:
artificial_crits.add(instance.deck[-1])
for (card_index, card) in enumerate(instance.deck):
if card.rank == stacks[card.suitIndex] + 1:
# card is playable
stacks[card.suitIndex] += 1
# check for further playables that we stored
for check_rank in range(card.rank + 1, 6):
check_card = hanab_game.DeckCard(card.suitIndex, check_rank)
if check_card in stored_cards:
stacks[card.suitIndex] += 1
stored_cards.remove(check_card)
if check_card in stored_crits:
stored_crits.remove(check_card)
else:
break
elif card.rank <= stacks[card.suitIndex]:
pass # card is trash
elif card.rank > stacks[card.suitIndex] + 1:
# need to store card
if card in stored_cards or card.rank == 5 or card in artificial_crits:
stored_crits.add(card)
stored_cards.add(card)
hand_size_left_for_crits = instance.num_players * instance.hand_size - len(stored_crits) - 1
# In case we can only keep the critical cards exactly, get rid of all others
if hand_size_left_for_crits == 0 and do_squeeze:
# Note the very important copy here (!)
stored_cards = stored_crits.copy()
squeeze = True
# Use a bool flag to only mark this reason once
if hand_size_left_for_crits < 0 and not hand_size_found:
reasons.infeasibility_reasons.append(InfeasibilityReason(InfeasibilityType.HandSize, card_index))
hand_size_found = True
# the last - 1 is there because we have to discard 'next', causing a further draw
max_remaining_plays = (instance.deck_size - card_index - 1) + instance.num_players - 1
needed_plays = instance.max_score - sum(stacks)
cur_pace = max_remaining_plays - needed_plays
if cur_pace < 0 and not pace_found:
if squeeze:
# We checked single-suit pace losses beforehand (which can only occur in 2p)
reasons.infeasibility_reasons.append(InfeasibilityReason(InfeasibilityType.PaceAfterSqueeze, card_index))
else:
reasons.infeasibility_reasons.append(InfeasibilityReason(InfeasibilityType.Pace, card_index))
pace_found = True
# if card_index != instance.deck_size - 1:
reasons.min_pace.update(cur_pace, card_index)
reasons.max_stored_cards.update(len(stored_cards), card_index)
reasons.max_stored_crits.update(len(stored_crits), card_index)
return reasons
def analyze(instance: hanab_game.HanabiInstance) -> AnalysisResult:
# Check for pace and hand size problems:
result = analyze_pace_and_hand_size(instance)
# In case pace ran out after a squeeze from hand size, we want to run a clean pace analysis again
if any(map(lambda r: r.type == InfeasibilityType.PaceAfterSqueeze, result.infeasibility_reasons)):
result.infeasibility_reasons += analyze_pace_and_hand_size(instance, False).infeasibility_reasons
# Top/bottom deck losses in a single suit.
top_bottom_deck_loss = check_for_top_bottom_deck_loss(instance)
if top_bottom_deck_loss is not None:
result.infeasibility_reasons.append(InfeasibilityReason(InfeasibilityType.BottomTopDeck, top_bottom_deck_loss))
# Special cases of pace loss, categorization for 2p only
result.infeasibility_reasons += analyze_2p_bottom_loss(instance)
# check for critical non-fives at bottom of the deck
bottom_card = instance.deck[-1]
if bottom_card.rank != 5 and bottom_card.suitIndex in instance.dark_suits:
result.infeasibility_reasons.append(InfeasibilityReason(
InfeasibilityType.CritAtBottom,
instance.deck_size - 1
))
# clean up reasons to unique
result.infeasibility_reasons = list(set(result.infeasibility_reasons))
return result
def run_on_database(variant_id):
database.cur.execute(
"SELECT seed, num_players, deck FROM seeds WHERE variant_id = (%s) ORDER BY (num_players, seed)",
(variant_id,)
)
res = database.cur.fetchall()
logger.verbose("Checking {} seeds of variant {} for infeasibility".format(len(res), variant_id))
with alive_progress.alive_bar(total=len(res), title='Check for infeasibility reasons in var {}'.format(variant_id)) as bar:
for (seed, num_players, deck_str) in res:
deck = compress.decompress_deck(deck_str)
result = analyze(hanab_game.HanabiInstance(deck, num_players))
for reason in result.infeasibility_reasons:
database.cur.execute(
"INSERT INTO score_upper_bounds (seed, score_upper_bound, reason) "
"VALUES (%s,%s,%s) "
"ON CONFLICT (seed, reason) DO UPDATE "
"SET score_upper_bound = EXCLUDED.score_upper_bound",
(seed, reason.score_upper_bound, reason.type.value)
)
database.cur.execute(
"UPDATE seeds SET feasible = (%s) WHERE seed = (%s)",
(False, seed)
)
bar()
database.conn.commit()
def main():
seed = "p5v0sporcupines-underclass-phantasmagorical"
seed = 'p5c1s98804'
seed = 'p4c1s1116'
seed = 'p5c1s14459'
num_players = 5
database.global_db_connection_manager.read_config()
database.global_db_connection_manager.connect()
database.cur.execute("SELECT seed, num_players FROM seeds WHERE (feasible IS NULL OR feasible = false) AND class = 1 AND num_players = 5")
# for (seed, num_players) in database.cur.fetchall():
for _ in range(1):
deck = database.games_db_interface.load_deck(seed)
inst = hanabi.hanab_game.HanabiInstance(deck, num_players)
lost = check_for_top_bottom_deck_loss(inst)
if lost:
print(seed)
if __name__ == "__main__":
main()