Compare commits

..

No commits in common. "main" and "cheating-bot" have entirely different histories.

33 changed files with 628 additions and 1434 deletions

View file

@ -48,27 +48,23 @@ source venv/bin/activate
pip install -r requirements.txt pip install -r requirements.txt
``` ```
### SAT-solver
After installing python, you should have installed `pysmt` with it, an interface to use SAT-solvers with python.
We still need a solver, for this, run
```
$ pysmt-install --help
// Pick a solver, e.g. z3 works
$ pysmt-install --z3
```
### PostgreSQL ### PostgreSQL
You need to install PostgreSQL on your system, for installation instructions refer to your distribution. You need to install PostgreSQL on your system, for installation instructions refer to your distribution.
Create a new database and user, for example: Create a new database and user, for example:
``` ```
$ sudo -iu postgres $ sudo -iu postgres
$ psql $ psql
# CREATE USER hanabi WITH PASSWORD 'Insert password here'; # CREATE DATABASE "hanab-live";
# CREATE DATABASE "hanab-live" with owner "hanabi"; # \c hanab-live
# CREATE USER hanabi WITH PASSWORD '1234';
# GRANT ALL PRIVILEGES ON DATABASE "hanab-live" TO hanabi;
# GRANT USAGE ON SCHEMA public TO hanabi;
# GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO hanabi;
# GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA public TO hanabi;
``` ```
Put the connection parameters in a config file (for the format, see `example_config.yaml`). Put the connection parameters in a config file (for the format, see `example_config.yaml`).
This should be located at your system default for the application `hanabi-suite`, This should be located at your system default for the application `hanabi-suite`,
on POSIX systems this should be `~/.config/hanabi-suite/config.yaml`. on POSIX systems this should be `~/.config/hanabi-suit/config.yaml`.
## Usage of stuff that already works: ## Usage of stuff that already works:

58
cheating_strategy Normal file
View file

@ -0,0 +1,58 @@
Just some preliminary notes on how to implement a cheating bot
Based on https://github.com/fpvandoorn/hanabi
card types:
trash, playable, useful (dispensable), critical
pace := #(cards left in deck) + #players - #(cards left to play)
modified_pace := pace - #(players without useful cards)
endgame := #(cards left to play) - #(cards left in deck) = #players - pace
-> endgame >= 0 iff pace <= #players
in_endgame := endgame >= 0
discard_badness(card) :=
1 if trash
8 - #players if card useful but duplicate visible # TODO: should probably account for rank of card as well, currently, lowest one is chosen
80 - 10*rank if card is not critical but currently unique # this ensures we prefer to discard higher ranked cards
600 - 100*rank if only criticals in hand # essentially not relevant, since we are currently only optimizing for full score
Algorithm:
if (have playable card):
if (in endgame) and not (in extraround):
stall in the following situations:
- we have exactly one useful card, it is a 5, and a copy of each useful card is visible
- we have exactly one useful card, it is a 4, the player with the matching 5 has another critical card to play
- we have exactly one useful card (todo: maybe use critical here?), the deck has size 1, someone else has 2 crits
- we have exactly one playable card, it is a 4, and a further useful card, but the playable is redistributable in the following sense:
the other playing only has this one useful card, and the player holding the matching 5 sits after the to-be-redistributed player
- sth else that seems messy and is currently not understood, ignored for now
TODO: maybe introduce some midgame stalls here, since we know the deck?
play a card, matching the first of the following criteria. if several cards match, recurse with this set of cards
- if in extraround, play crit
- if in second last round and we have 2 crits, play crit
- play card with lowest rank
- play a critical card
- play unique card, i.e. not visible
- lowest suit index (for determinancy)
if 8 hints:
give a hint
if 0 hints:
discard card with lowest badness
stall in the following situations:
- #(cards in deck) == 2 and (card of rank 3 or lower is missing) and we have the connecting card
- #clues >= 8 - #(useful cards in hand), there are useful cards in the deck and either:
- the next player has no useful cards at all
- we have two more crits than the next player and they have trash
- we are in endgame and the deck only contains one card
- it is possible that no-one discards in the following round and we are not waiting for a card whose rank is smaller than pace // TODO: this feels like a weird condition
discard if (discard badness) + #hints < 10
stall if someone has a better discard

View file

@ -1,5 +1,4 @@
import argparse import argparse
import json
from typing import Optional from typing import Optional
import verboselogs import verboselogs
@ -9,11 +8,8 @@ from hanabi.live import variants
from hanabi.live import check_game from hanabi.live import check_game
from hanabi.live import download_data from hanabi.live import download_data
from hanabi.live import compress from hanabi.live import compress
from hanabi.live import instance_finder
from hanabi.hanab_game import GameState
from hanabi.database import init_database from hanabi.database import init_database
from hanabi.database import global_db_connection_manager from hanabi.database import global_db_connection_manager
from hanabi.database.games_db_interface import load_instance
""" """
Commands supported: Commands supported:
@ -25,16 +21,6 @@ analyze single game
""" """
def subcommand_show(seed: str):
inst = load_instance(seed)
if inst is None:
logger.info("This seed does not exist in the database")
return
game = GameState(inst)
game.terminate()
l = compress.link(game)
logger.info("Deck: {}\nReplay Link: {}".format(inst.deck, l))
def subcommand_analyze(game_id: int, download: bool = False): def subcommand_analyze(game_id: int, download: bool = False):
if download: if download:
@ -51,13 +37,6 @@ def subcommand_analyze(game_id: int, download: bool = False):
) )
def subcommand_decompress(game_link: str):
parts = game_link.split('replay-json/')
game_str = parts[-1].rstrip('/')
game = compress.decompress_game_state(game_str)
print(json.dumps(game.to_json()))
def subcommand_init(force: bool, populate: bool): def subcommand_init(force: bool, populate: bool):
tables = init_database.get_existing_tables() tables = init_database.get_existing_tables()
if len(tables) > 0 and not force: if len(tables) > 0 and not force:
@ -98,20 +77,9 @@ def subcommand_download(
logger.info("Successfully exported games for all variants") logger.info("Successfully exported games for all variants")
def subcommand_solve(var_id: int, seed_class: int, num_players: Optional[int], timeout: int, num_threads: int):
instance_finder.solve_unknown_seeds(var_id, seed_class, num_players, timeout, num_threads)
def subcommand_gen_config(): def subcommand_gen_config():
global_db_connection_manager.create_config_file() global_db_connection_manager.create_config_file()
def add_show_seed_subparser(subparsers):
parser = subparsers.add_parser(
'show',
help='Show a seed and output a replay link for it'
)
parser.add_argument('seed', type=str)
def add_init_subparser(subparsers): def add_init_subparser(subparsers):
parser = subparsers.add_parser( parser = subparsers.add_parser(
@ -161,19 +129,6 @@ def add_config_gen_subparser(subparsers):
parser = subparsers.add_parser('gen-config', help='Generate config file at default location') parser = subparsers.add_parser('gen-config', help='Generate config file at default location')
def add_solve_subparser(subparsers):
parser = subparsers.add_parser('solve', help='Seed solving')
parser.add_argument('var_id', type=int, help='Variant id to solve instances from.', default=0)
parser.add_argument('--timeout', '-t', type=int, help='Timeout [s] for individual seeds.', default=150)
parser.add_argument('--class', '-c', type=int, dest='seed_class', help='Class of seed to analyze. 0 stands for hanab.live seeds', default=0)
parser.add_argument('--num_players', '-n', type=int, help='Restrict to number of players. If not specified, all player counts are analyzed.', default = None)
parser.add_argument('--num_threads', '-p', type=int, help='Number of threads to solve with.', default=4)
def add_decompress_subparser(subparsers):
parser = subparsers.add_parser('decompress', help='Decompress a hanab.live JSON-encoded replay link')
parser.add_argument('game_link', type=str)
def main_parser() -> argparse.ArgumentParser: def main_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
prog='hanabi_suite', prog='hanabi_suite',
@ -186,9 +141,6 @@ def main_parser() -> argparse.ArgumentParser:
add_analyze_subparser(subparsers) add_analyze_subparser(subparsers)
add_download_subparser(subparsers) add_download_subparser(subparsers)
add_config_gen_subparser(subparsers) add_config_gen_subparser(subparsers)
add_solve_subparser(subparsers)
add_decompress_subparser(subparsers)
add_show_seed_subparser(subparsers)
return parser return parser
@ -199,10 +151,7 @@ def hanabi_cli():
'analyze': subcommand_analyze, 'analyze': subcommand_analyze,
'init': subcommand_init, 'init': subcommand_init,
'download': subcommand_download, 'download': subcommand_download,
'gen-config': subcommand_gen_config, 'gen-config': subcommand_gen_config
'solve': subcommand_solve,
'decompress': subcommand_decompress,
'show': subcommand_show
}[args.command] }[args.command]
if args.command != 'gen-config': if args.command != 'gen-config':

View file

@ -87,8 +87,8 @@ class DBConnectionManager:
logger.info("Initialised default config file {}".format(self.config_file)) logger.info("Initialised default config file {}".format(self.config_file))
def connect(self): def connect(self):
conn = psycopg2.connect("dbname='{}' user='{}' password='{}' host='localhost' sslmode='disable'".format( conn = psycopg2.connect("dbname='{}' user='{}' password='{}' host='localhost'".format(
self.db_name, self.db_user, self.db_pass), self.db_name, self.db_user, self.db_pass)
) )
cur = conn.cursor() cur = conn.cursor()
self.lazy_conn.set_conn(conn) self.lazy_conn.set_conn(conn)

View file

@ -0,0 +1,49 @@
DROP TABLE IF EXISTS seeds CASCADE;
CREATE TABLE seeds (
seed TEXT NOT NULL PRIMARY KEY,
num_players SMALLINT NOT NULL,
variant_id SMALLINT NOT NULL,
deck VARCHAR(62) NOT NULL,
starting_player SMALLINT NOT NULL DEFAULT 0,
feasible BOOLEAN DEFAULT NULL,
max_score_theoretical SMALLINT
);
CREATE INDEX seeds_variant_idx ON seeds (variant_id);
DROP TABLE IF EXISTS games CASCADE;
CREATE TABLE games (
id INT PRIMARY KEY,
seed TEXT NOT NULL REFERENCES seeds,
num_players SMALLINT NOT NULL,
score SMALLINT NOT NULL,
variant_id SMALLINT NOT NULL,
deck_plays BOOLEAN,
one_extra_card BOOLEAN,
one_less_card BOOLEAN,
all_or_nothing BOOLEAN,
detrimental_characters BOOLEAN,
num_turns SMALLINT,
actions TEXT
);
CREATE INDEX games_seed_score_idx ON games (seed, score);
CREATE INDEX games_var_seed_idx ON games (variant_id, seed);
CREATE INDEX games_player_idx ON games (num_players);
DROP TABLE IF EXISTS score_upper_bounds;
CREATE TABLE score_upper_bounds (
seed TEXT NOT NULL REFERENCES seeds ON DELETE CASCADE,
score_upper_bound SMALLINT NOT NULL,
reason SMALLINT NOT NULL,
UNIQUE (seed, reason)
);
DROP TABLE IF EXISTS score_lower_bounds;
CREATE TABLE score_lower_bounds (
seed TEXT NOT NULL REFERENCES seeds ON DELETE CASCADE,
score_lower_bound SMALLINT NOT NULL,
game_id INT REFERENCES games ON DELETE CASCADE,
actions TEXT,
CHECK (num_nonnulls(game_id, actions) = 1)
);

View file

@ -192,7 +192,7 @@ def _populate_variants(variants):
def _download_json_files(): def _download_json_files():
logger.verbose("Downloading JSON files for suits and variants from github...") logger.verbose("Downloading JSON files for suits and variants from github...")
base_url = "https://raw.githubusercontent.com/Hanabi-Live/hanabi-live/main/packages/game/src/json" base_url = "https://raw.githubusercontent.com/Hanabi-Live/hanabi-live/main/packages/data/src/json"
cache_dir = Path(platformdirs.user_cache_dir(constants.APP_NAME)) cache_dir = Path(platformdirs.user_cache_dir(constants.APP_NAME))
cache_dir.mkdir(parents=True, exist_ok=True) cache_dir.mkdir(parents=True, exist_ok=True)
data = {} data = {}
@ -204,7 +204,7 @@ def _download_json_files():
url = base_url + "/" + file.name url = base_url + "/" + file.name
response = requests.get(url) response = requests.get(url)
if not response.status_code == 200: if not response.status_code == 200:
err_msg = "Could not download initialization file {} from github (tried url {})".format(file.name, url) err_msg = "Could not download initialization file {} from github (tried url {})".format(filename, url)
logger.error(err_msg) logger.error(err_msg)
raise RuntimeError(err_msg) raise RuntimeError(err_msg)
file.write_text(response.text) file.write_text(response.text)

View file

@ -25,12 +25,6 @@ class DeckCard:
raise ParseError("No rank specified in deck_card") raise ParseError("No rank specified in deck_card")
return DeckCard(suit_index, rank) return DeckCard(suit_index, rank)
def to_json(self):
return {
"suitIndex": self.suitIndex,
"rank": self.rank
}
def colorize(self): def colorize(self):
color = ["green", "blue", "magenta", "yellow", "white", "cyan"][self.suitIndex] color = ["green", "blue", "magenta", "yellow", "white", "cyan"][self.suitIndex]
return colored(str(self), color) return colored(str(self), color)
@ -39,8 +33,6 @@ class DeckCard:
return self.suitIndex == other.suitIndex and self.rank == other.rank return self.suitIndex == other.suitIndex and self.rank == other.rank
def __repr__(self): def __repr__(self):
if self.suitIndex == 0 and self.rank == 0:
return "kt"
return constants.COLOR_INITIALS[self.suitIndex] + str(self.rank) return constants.COLOR_INITIALS[self.suitIndex] + str(self.rank)
def __hash__(self): def __hash__(self):
@ -92,13 +84,6 @@ class Action:
action_value action_value
) )
def to_json(self):
return {
"type": self.type.value,
"target": self.target,
"value": self.value
}
def __repr__(self): def __repr__(self):
match self.type: match self.type:
case ActionType.Play: case ActionType.Play:
@ -254,26 +239,6 @@ class GameState:
self.clues -= 1 self.clues -= 1
self._make_turn() self._make_turn()
def make_action(self, action):
match action.type:
case ActionType.ColorClue | ActionType.RankClue:
assert self.clues >= 1
self.actions.append(action)
self.clues -= 1
self._make_turn()
# TODO: could check that the clue specified is in fact legal
case ActionType.Play:
self.play(action.target)
case ActionType.Discard:
self.discard(action.target)
case ActionType.EndGame | ActionType.VoteTerminate:
self.actions.append(action)
self.over = True
def terminate(self):
action = Action(ActionType.EndGame, 0, 0)
self.make_action(action)
# Forward some properties of the underlying instance # Forward some properties of the underlying instance
@property @property
def num_players(self): def num_players(self):
@ -299,10 +264,6 @@ class GameState:
def deck_size(self): def deck_size(self):
return self.instance.deck_size return self.instance.deck_size
@property
def draw_pile_size(self):
return self.deck_size - self.progress
# Properties of GameState # Properties of GameState
def is_over(self): def is_over(self):
@ -326,23 +287,6 @@ class GameState:
# Utilities # Utilities
def is_playable(self, card: DeckCard):
return self.stacks[card.suitIndex] + 1 == card.rank
def is_trash(self, card: DeckCard):
return self.stacks[card.suitIndex] >= card.rank
def is_critical(self, card: DeckCard):
if card.rank == 5:
return True
if self.is_trash(card):
return False
count = 0
for hand in self.hands:
count += hand.count(card)
count += self.deck[self.progress:].count(card)
return count == 1
def holding_players(self, card): def holding_players(self, card):
for (player, hand) in enumerate(self.hands): for (player, hand) in enumerate(self.hands):
if card in hand: if card in hand:
@ -357,9 +301,9 @@ class GameState:
) )
) )
return { return {
"deck": [card.to_json() for card in self.instance.deck], "deck": self.instance.deck,
"players": self.instance.player_names, "players": self.instance.player_names,
"actions": [action.to_json() for action in self.actions], "actions": self.actions,
"first_player": 0, "first_player": 0,
"options": { "options": {
"variant": "No Variant", "variant": "No Variant",

View file

@ -8,8 +8,6 @@ from hanabi.live import hanab_live
from hanabi.live import compress from hanabi.live import compress
from hanabi.solvers import sat from hanabi.solvers import sat
from hanabi.database import games_db_interface
# returns minimal number T of turns (from game) after which instance was infeasible # returns minimal number T of turns (from game) after which instance was infeasible
# and a replay achieving maximum score while following the replay for the first (T-1) turns: # and a replay achieving maximum score while following the replay for the first (T-1) turns:
@ -21,15 +19,24 @@ from hanabi.database import games_db_interface
def check_game(game_id: int) -> Tuple[int, hanab_game.GameState]: def check_game(game_id: int) -> Tuple[int, hanab_game.GameState]:
logger.debug("Analysing game {}".format(game_id)) logger.debug("Analysing game {}".format(game_id))
with database.conn.cursor() as cur: with database.conn.cursor() as cur:
cur.execute("SELECT games.num_players, score, games.variant_id, starting_player FROM games " cur.execute("SELECT games.num_players, deck, actions, score, games.variant_id, starting_player FROM games "
"INNER JOIN seeds ON seeds.seed = games.seed "
"WHERE games.id = (%s)", "WHERE games.id = (%s)",
(game_id,) (game_id,)
) )
res = cur.fetchone() res = cur.fetchone()
if res is None: if res is None:
raise ValueError("No game associated with id {} in database.".format(game_id)) raise ValueError("No game associated with id {} in database.".format(game_id))
(num_players, score, variant_id, starting_player) = res (num_players, compressed_deck, compressed_actions, score, variant_id, starting_player) = res
instance, actions = games_db_interface.load_game_parts(game_id) deck = compress.decompress_deck(compressed_deck)
actions = compress.decompress_actions(compressed_actions)
instance = hanab_live.HanabLiveInstance(
deck,
num_players,
variant_id=variant_id,
starting_player=starting_player
)
# check if the instance is already won # check if the instance is already won
if instance.max_score == score: if instance.max_score == score:

View file

@ -177,7 +177,7 @@ def compress_game_state(state: Union[hanab_game.GameState, hanab_live.HanabLiveG
return with_dashes return with_dashes
def decompress_game_state(game_str: str) -> hanab_live.HanabLiveGameState: def decompress_game_state(game_str: str) -> hanab_game.GameState:
game_str = game_str.replace("-", "") game_str = game_str.replace("-", "")
parts = game_str.split(",") parts = game_str.split(",")
if not len(parts) == 3: if not len(parts) == 3:
@ -211,8 +211,8 @@ def decompress_game_state(game_str: str) -> hanab_live.HanabLiveGameState:
except ValueError: except ValueError:
raise ValueError("Expected variant id, found: {}".format(variant_id)) raise ValueError("Expected variant id, found: {}".format(variant_id))
instance = hanab_live.HanabLiveInstance(deck, num_players, variant_id) instance = hanab_game.HanabiInstance(deck, num_players)
game = hanab_live.HanabLiveGameState(instance) game = hanab_game.GameState(instance)
# TODO: game is not in consistent state # TODO: game is not in consistent state
game.actions = actions game.actions = actions
@ -221,4 +221,4 @@ def decompress_game_state(game_str: str) -> hanab_live.HanabLiveGameState:
def link(game_state: hanab_game.GameState) -> str: def link(game_state: hanab_game.GameState) -> str:
compressed = compress_game_state(game_state) compressed = compress_game_state(game_state)
return "https://hanab.live/replay-json/{}".format(compressed) return "https://hanab.live/shared-replay-json/{}".format(compressed)

View file

@ -1,21 +1,18 @@
import alive_progress import alive_progress
from typing import Dict, Optional, List from typing import Dict, Optional
import psycopg2.errors import psycopg2.errors
import psycopg2.extras
import platformdirs import platformdirs
import unidecode
from hanabi import hanab_game from hanabi import hanab_game
from hanabi import constants from hanabi import constants
from hanabi import logger from hanabi import logger
from hanabi import database from hanabi import database
from hanabi.live import site_api from hanabi.live import site_api
from hanabi.live import compress
from hanabi.live import variants from hanabi.live import variants
from hanabi.live import hanab_live from hanabi.live import hanab_live
from hanabi.database import games_db_interface
class GameExportError(ValueError): class GameExportError(ValueError):
@ -52,29 +49,6 @@ class GameExportInvalidNumberOfPlayersError(GameExportInvalidFormatError):
) )
def ensure_users_in_db_and_get_ids(usernames: List[str]):
normalized_usernames = [unidecode.unidecode(username) for username in usernames]
psycopg2.extras.execute_values(
database.cur,
"INSERT INTO users (username, normalized_username)"
"VALUES %s "
"ON CONFLICT (username) DO NOTHING ",
zip(usernames, normalized_usernames)
)
# To only do one DB query, we sort by the normalized username.
ids = []
for username in usernames:
database.cur.execute(
"SELECT id FROM users "
"WHERE username = %s",
(username,)
)
(id, ) = database.cur.fetchone()
ids.append(id)
return ids
# #
def detailed_export_game( def detailed_export_game(
game_id: int game_id: int
@ -120,19 +94,12 @@ def detailed_export_game(
options = game_json.get('options', {}) options = game_json.get('options', {})
var_id = var_id or variants.variant_id(options.get('variant', 'No Variant')) var_id = var_id or variants.variant_id(options.get('variant', 'No Variant'))
timed = options.get('timed', False)
time_base = options.get('timeBase', 0)
time_per_turn = options.get('timePerTurn', 0)
speedrun = options.get('speedrun', False)
card_cycle = options.get('cardCycle', False)
deck_plays = options.get('deckPlays', False) deck_plays = options.get('deckPlays', False)
empty_clues = options.get('emptyClues', False)
one_extra_card = options.get('oneExtraCard', False) one_extra_card = options.get('oneExtraCard', False)
one_less_card = options.get('oneLessCard', False) one_less_card = options.get('oneLessCard', False)
all_or_nothing = options.get('allOrNothing', False) all_or_nothing = options.get('allOrNothing', False)
detrimental_characters = options.get('detrimentalCharacters', False)
starting_player = options.get('startingPlayer', 0) starting_player = options.get('startingPlayer', 0)
detrimental_characters = options.get('detrimentalCharacters', False)
try: try:
actions = [hanab_game.Action.from_json(action) for action in game_json.get('actions', [])] actions = [hanab_game.Action.from_json(action) for action in game_json.get('actions', [])]
@ -164,53 +131,44 @@ def detailed_export_game(
game.make_action(action) game.make_action(action)
score = game.score score = game.score
try:
compressed_deck = compress.compress_deck(deck)
except compress.InvalidFormatError as e:
logger.error("Failed to compress deck while exporting game {}: {}".format(game_id, deck))
raise GameExportInvalidFormatError(game_id, "Failed to compress deck") from e
try:
compressed_actions = compress.compress_actions(actions)
except compress.InvalidFormatError as e:
logger.error("Failed to compress actions while exporting game {}".format(game_id))
raise GameExportInvalidFormatError(game_id, "Failed to compress actions") from e
if not seed_exists: if not seed_exists:
database.cur.execute( database.cur.execute(
"INSERT INTO seeds (seed, num_players, starting_player, variant_id)" "INSERT INTO seeds (seed, num_players, starting_player, variant_id, deck)"
"VALUES (%s, %s, %s, %s)" "VALUES (%s, %s, %s, %s, %s)"
"ON CONFLICT (seed) DO NOTHING", "ON CONFLICT (seed) DO NOTHING",
(seed, num_players, starting_player, var_id) (seed, num_players, starting_player, var_id, compressed_deck)
) )
logger.debug("New seed {} imported.".format(seed)) logger.debug("New seed {} imported.".format(seed))
games_db_interface.store_deck_for_seed(seed, deck)
database.cur.execute( database.cur.execute(
"INSERT INTO games (" "INSERT INTO games ("
"id, num_players, starting_player, variant_id, timed, time_base, time_per_turn, speedrun, card_cycle, " "id, num_players, score, seed, variant_id, deck_plays, one_extra_card, one_less_card,"
"deck_plays, empty_clues, one_extra_card, one_less_card," "all_or_nothing, detrimental_characters, actions"
"all_or_nothing, detrimental_characters, seed, score"
")" ")"
"VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)" "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
"ON CONFLICT (id) DO UPDATE SET (" "ON CONFLICT (id) DO UPDATE SET ("
"timed, time_base, time_per_turn, speedrun, card_cycle, deck_plays, empty_clues, one_extra_card," "deck_plays, one_extra_card, one_less_card, all_or_nothing, actions, detrimental_characters"
"all_or_nothing, detrimental_characters"
") = (" ") = ("
"EXCLUDED.timed, EXCLUDED.time_base, EXCLUDED.time_per_turn, EXCLUDED.speedrun, EXCLUDED.card_cycle, " "EXCLUDED.deck_plays, EXCLUDED.one_extra_card, EXCLUDED.one_less_card, EXCLUDED.all_or_nothing,"
"EXCLUDED.deck_plays, EXCLUDED.empty_clues, EXCLUDED.one_extra_card," "EXCLUDED.actions, EXCLUDED.detrimental_characters"
"EXCLUDED.all_or_nothing, EXCLUDED.detrimental_characters"
")", ")",
( (
game_id, num_players, starting_player, var_id, timed, time_base, time_per_turn, speedrun, card_cycle, game_id, num_players, score, seed, var_id, deck_plays, one_extra_card, one_less_card,
deck_plays, empty_clues, one_extra_card, one_less_card, all_or_nothing, detrimental_characters, compressed_actions
all_or_nothing, detrimental_characters, seed, score
) )
) )
# Insert participants into database
ids = ensure_users_in_db_and_get_ids(players)
game_participant_values = []
for index, user_id in enumerate(ids):
game_participant_values.append((game_id, user_id, index))
psycopg2.extras.execute_values(
database.cur,
"INSERT INTO game_participants (game_id, user_id, seat) VALUES %s "
"ON CONFLICT (game_id, user_id) DO UPDATE SET seat = excluded.seat",
game_participant_values
)
games_db_interface.store_actions(game_id, actions)
logger.debug("Imported game {}".format(game_id)) logger.debug("Imported game {}".format(game_id))
@ -234,8 +192,6 @@ def _process_game_row(game: Dict, var_id, export_all_games: bool = False):
return return
# raise GameExportInvalidNumberOfPlayersError(game_id, num_players, users) # raise GameExportInvalidNumberOfPlayersError(game_id, num_players, users)
# Ensure users in database and find out their ids
if export_all_games: if export_all_games:
detailed_export_game(game_id, score=score, var_id=var_id) detailed_export_game(game_id, score=score, var_id=var_id)
logger.debug("Imported game {}".format(game_id)) logger.debug("Imported game {}".format(game_id))
@ -250,26 +206,12 @@ def _process_game_row(game: Dict, var_id, export_all_games: bool = False):
"ON CONFLICT (id) DO NOTHING", "ON CONFLICT (id) DO NOTHING",
(game_id, seed, num_players, score, var_id) (game_id, seed, num_players, score, var_id)
) )
database.cur.execute("RELEASE seed_insert")
except psycopg2.errors.ForeignKeyViolation: except psycopg2.errors.ForeignKeyViolation:
# Sometimes, seed is not present in the database yet, then we will have to query the full game details # Sometimes, seed is not present in the database yet, then we will have to query the full game details
# (including the seed) to export it accordingly # (including the seed) to export it accordingly
database.cur.execute("ROLLBACK TO seed_insert") database.cur.execute("ROLLBACK TO seed_insert")
detailed_export_game(game_id, score=score, var_id=var_id) detailed_export_game(game_id, score=score, var_id=var_id)
database.cur.execute("RELEASE seed_insert")
# Insert participants into database
ids = ensure_users_in_db_and_get_ids(users)
game_participant_values = []
for index, user_id in enumerate(ids):
game_participant_values.append((game_id, user_id, index))
psycopg2.extras.execute_values(
database.cur,
"INSERT INTO game_participants (game_id, user_id, seat) VALUES %s "
"ON CONFLICT (game_id, user_id) DO UPDATE SET seat = excluded.seat",
game_participant_values
)
logger.debug("Imported game {}".format(game_id)) logger.debug("Imported game {}".format(game_id))

81
hanabi/live/hanab_live.py Normal file
View file

@ -0,0 +1,81 @@
from typing import List
from hanabi import hanab_game
from hanabi import constants
from hanabi.live import variants
class HanabLiveInstance(hanab_game.HanabiInstance):
def __init__(
self,
deck: List[hanab_game.DeckCard],
num_players: int,
variant_id: int,
one_extra_card: bool = False,
one_less_card: bool = False,
*args, **kwargs
):
assert 2 <= num_players <= 6
hand_size = constants.HAND_SIZES[num_players]
if one_less_card:
hand_size -= 1
if one_extra_card:
hand_size += 1
super().__init__(deck, num_players, hand_size=hand_size, *args, **kwargs)
self.variant_id = variant_id
self.variant = variants.Variant.from_db(self.variant_id)
@staticmethod
def select_standard_variant_id(instance: hanab_game.HanabiInstance):
err_msg = "Hanabi instance not supported by hanab.live, cannot convert to HanabLiveInstance: "
assert 3 <= instance.num_suits <= 6, \
err_msg + "Illegal number of suits ({}) found, must be in range [3,6]".format(instance.num_suits)
assert 0 <= instance.num_dark_suits <= 2, \
err_msg + "Illegal number of dark suits ({}) found, must be in range [0,2]".format(instance.num_dark_suits)
assert 4 <= instance.num_suits - instance.num_dark_suits, \
err_msg + "Illegal ratio of dark suits to suits, can have at most {} dark suits with {} total suits".format(
max(instance.num_suits - 4, 0), instance.num_suits
)
return constants.VARIANT_IDS_STANDARD_DISTRIBUTIONS[instance.num_suits][instance.num_dark_suits]
class HanabLiveGameState(hanab_game.GameState):
def __init__(self, instance: HanabLiveInstance):
super().__init__(instance)
self.instance: HanabLiveInstance = instance
def make_action(self, action):
match action.type:
case hanab_game.ActionType.ColorClue | hanab_game.ActionType.RankClue:
assert(self.clues > 0)
self.actions.append(action)
self.clues -= self.instance.clue_increment
self._make_turn()
# TODO: could check that the clue specified is in fact legal
case hanab_game.ActionType.Play:
self.play(action.target)
case hanab_game.ActionType.Discard:
self.discard(action.target)
case hanab_game.ActionType.EndGame | hanab_game.ActionType.VoteTerminate:
self.over = True
def _waste_clue(self) -> hanab_game.Action:
for player in range(self.turn + 1, self.turn + self.num_players):
for card in self.hands[player % self.num_players]:
for rank in self.instance.variant.ranks:
if self.instance.variant.rank_touches(card, rank):
return hanab_game.Action(
hanab_game.ActionType.RankClue,
player % self.num_players,
rank
)
for color in range(self.instance.variant.num_colors):
if self.instance.variant.color_touches(card, color):
return hanab_game.Action(
hanab_game.ActionType.ColorClue,
player % self.num_players,
color
)
raise RuntimeError("Current game state did not permit any legal clue."
"This case is incredibly rare and currently not handled.")

View file

@ -0,0 +1,185 @@
from typing import Optional
import pebble.concurrent
import concurrent.futures
import traceback
import alive_progress
import threading
import time
from hanabi import logger
from hanabi.solvers.sat import solve_sat
from hanabi import database
from hanabi.live import download_data
from hanabi.live import compress
from hanabi import hanab_game
from hanabi.solvers import greedy_solver
from hanabi.solvers import deck_analyzer
from hanabi.live import variants
MAX_PROCESSES = 6
def update_trivially_feasible_games(variant_id):
variant: variants.Variant = variants.Variant.from_db(variant_id)
database.cur.execute("SELECT seed FROM seeds WHERE variant_id = (%s) AND feasible is null", (variant_id,))
seeds = database.cur.fetchall()
logger.verbose('Checking variant {} (id {}), found {} seeds to check...'.format(variant.name, variant_id, len(seeds)))
with alive_progress.alive_bar(total=len(seeds), title='{} ({})'.format(variant.name, variant_id)) as bar:
for (seed,) in seeds:
database.cur.execute(
"SELECT id, deck_plays, one_extra_card, one_less_card, all_or_nothing, detrimental_characters "
"FROM games WHERE score = (%s) AND seed = (%s) ORDER BY id;",
(variant.max_score, seed)
)
res = database.cur.fetchall()
logger.debug("Checking seed {}: {:3} results".format(seed, len(res)))
for (game_id, a, b, c, d, e) in res:
if None in [a, b, c, d, e]:
logger.debug(' Game {} not found in database, exporting...'.format(game_id))
download_data.detailed_export_game(
game_id, var_id=variant_id, score=variant.max_score, seed_exists=True
)
database.cur.execute("SELECT deck_plays, one_extra_card, one_less_card, all_or_nothing, "
"detrimental_characters "
"FROM games WHERE id = (%s)",
(game_id,))
(a, b, c, d, e) = database.cur.fetchone()
else:
logger.debug(' Game {} already in database'.format(game_id))
valid = not any([a, b, c, d, e])
if valid:
print(a, b, c, d, e)
logger.verbose(
'Seed {:10} (variant {}) found to be feasible via game {:6}'.format(seed, variant_id, game_id))
database.cur.execute("UPDATE seeds SET (feasible, max_score_theoretical) = (%s, %s) WHERE seed = "
"(%s)", (True, variant.max_score, seed))
database.cur.execute(
"INSERT INTO score_lower_bounds (seed, score_lower_bound, game_id) VALUES (%s, %s, %s)",
(seed, variant.max_score, game_id)
)
database.conn.commit()
break
else:
logger.verbose(' Cheaty game {} found'.format(game_id))
bar()
def get_decks_for_all_seeds():
cur = database.conn.database.cursor()
cur.execute("SELECT id "
"FROM games "
" INNER JOIN seeds "
" ON seeds.seed = games.seed"
" WHERE"
" seeds.deck is null"
" AND"
" games.id = ("
" SELECT id FROM games WHERE games.seed = seeds.seed LIMIT 1"
" )"
)
print("Exporting decks for all seeds")
res = cur.fetchall()
with alive_progress.alive_bar(len(res), title="Exporting decks") as bar:
for (game_id,) in res:
download_data.detailed_export_game(game_id)
bar()
mutex = threading.Lock()
def solve_instance(instance: hanab_game.HanabiInstance):
# first, sanity check on running out of pace
result = deck_analyzer.analyze(instance)
if result is not None:
assert type(result) == deck_analyzer.InfeasibilityReason
logger.debug("found infeasible deck")
return False, None, None
for num_remaining_cards in [0, 20]:
# logger.info("trying with {} remaining cards".format(num_remaining_cards))
game = hanab_game.GameState(instance)
strat = greedy_solver.GreedyStrategy(game)
# make a number of greedy moves
while not game.is_over() and not game.is_known_lost():
if num_remaining_cards != 0 and game.progress == game.deck_size - num_remaining_cards:
break # stop solution here
strat.make_move()
# check if we won already
if game.is_won():
# print("won with greedy strat")
return True, game, num_remaining_cards
# now, apply sat solver
if not game.is_over():
logger.debug("continuing greedy sol with SAT")
solvable, sol = solve_sat(game)
if solvable is None:
return True, sol, num_remaining_cards
logger.debug(
"No success with {} remaining cards, reducing number of greedy moves, failed attempt was: {}".format(
num_remaining_cards, compress.link(game)))
# print("Aborting trying with greedy strat")
logger.debug("Starting full SAT solver")
game = hanab_game.GameState(instance)
a, b = solve_sat(game)
return a, b, instance.draw_pile_size
@pebble.concurrent.process(timeout=150)
def solve_seed_with_timeout(seed, num_players, deck_compressed, var_name: Optional[str] = None):
try:
logger.verbose("Starting to solve seed {}".format(seed))
deck = compress.decompress_deck(deck_compressed)
t0 = time.perf_counter()
solvable, solution, num_remaining_cards = solve_instance(hanab_game.HanabiInstance(deck, num_players))
t1 = time.perf_counter()
logger.verbose("Solved instance {} in {} seconds: {}".format(seed, round(t1 - t0, 2), solvable))
mutex.acquire()
if solvable is not None:
database.cur.execute("UPDATE seeds SET feasible = (%s) WHERE seed = (%s)", (solvable, seed))
database.conn.commit()
mutex.release()
if solvable == True:
logger.verbose("Success with {} cards left in draw by greedy solver on seed {}: {}\n".format(
num_remaining_cards, seed, compress.link(solution))
)
elif solvable == False:
logger.debug("seed {} was not solvable".format(seed))
logger.debug('{}-player, seed {:10}, {}\n'.format(num_players, seed, var_name))
elif solvable is None:
logger.verbose("seed {} skipped".format(seed))
else:
raise Exception("Programming Error")
except Exception as e:
print("exception in subprocess:")
traceback.print_exc()
def solve_seed(seed, num_players, deck_compressed, var_name: Optional[str] = None):
f = solve_seed_with_timeout(seed, num_players, deck_compressed, var_name)
try:
return f.result()
except TimeoutError:
logger.verbose("Solving on seed {} timed out".format(seed))
return
def solve_unknown_seeds(variant_id, variant_name: Optional[str] = None):
database.cur.execute(
"SELECT seed, num_players, deck FROM seeds WHERE variant_id = (%s) AND feasible IS NULL",
(variant_id,)
)
res = database.cur.fetchall()
with concurrent.futures.ProcessPoolExecutor(max_workers=MAX_PROCESSES) as executor:
fs = [executor.submit(solve_seed, r[0], r[1], r[2], variant_name) for r in res]
with alive_progress.alive_bar(len(res), title='Seed solving on {}'.format(variant_name)) as bar:
for f in concurrent.futures.as_completed(fs):
bar()

View file

@ -23,7 +23,7 @@ def get_all_variant_ids() -> List[int]:
return [var_id for (var_id,) in database.cur.fetchall()] return [var_id for (var_id,) in database.cur.fetchall()]
def variant_name(var_id) -> Optional[str]: def variant_name(var_id) -> Optional[int]:
database.cur.execute( database.cur.execute(
"SELECT name FROM variants WHERE id = %s", "SELECT name FROM variants WHERE id = %s",
(var_id,) (var_id,)

View file

@ -0,0 +1,195 @@
from enum import Enum
from typing import List
import alive_progress
from hanabi import database
from hanabi import logger
from hanabi import hanab_game
from hanabi.live import compress
class InfeasibilityType(Enum):
OutOfPace = 0 # idx denotes index of last card drawn before being forced to reduce pace, value denotes how bad pace is
OutOfHandSize = 1 # idx denotes index of last card drawn before being forced to discard a crit
CritAtBottom = 3
class InfeasibilityReason:
def __init__(self, infeasibility_type: InfeasibilityType, score_upper_bound, value=None):
self.type = infeasibility_type
self.score_upper_bound = score_upper_bound
self.value = value
def __repr__(self):
match self.type:
case InfeasibilityType.OutOfPace:
return "Upper bound {}, since deck runs out of pace after drawing card {}".format(self.score_upper_bound, self.value)
case InfeasibilityType.OutOfHandSize:
return "Upper bound {}, since deck runs out of hand size after drawing card {}".format(self.score_upper_bound, self.value)
case InfeasibilityType.CritAtBottom:
return "Upper bound {}, sicne deck has critical non-5 at bottom".format(self.score_upper_bound)
def analyze(instance: hanab_game.HanabiInstance, only_find_first=False) -> List[InfeasibilityReason]:
"""
Checks instance for the following (easy) certificates for unfeasibility
- There is a critical non-5 at the bottom
- We necessarily run out of pace when playing this deck:
At some point, among all drawn cards, there are too few playable ones so that the next discard
reduces pace to a negative amount
- We run out of hand size when playing this deck:
At some point, there are too many critical cards (that also could not have been played) for the players
to hold collectively
:param instance: Instance to be analyzed
:param only_find_first: If true, we immediately return when finding the first infeasibility reason and don't
check for further ones. Might be slightly faster on some instances, especially dark ones.
:return: List with all reasons found. Empty if none is found.
In particular, if return value is not the empty list, the analyzed instance is unfeasible
"""
reasons = []
# check for critical non-fives at bottom of the deck
bottom_card = instance.deck[-1]
if bottom_card.rank != 5 and bottom_card.suitIndex in instance.dark_suits:
reasons.append(InfeasibilityReason(
InfeasibilityType.CritAtBottom,
instance.max_score - 5 + bottom_card.rank,
instance.deck_size - 1
))
if only_find_first:
return reasons
# we will sweep through the deck and pretend that
# - we keep all non-trash cards in our hands
# - we instantly play all playable cards as soon as we have them
# - we recurse on this instant-play
#
# For example, we assume that once we draw r2, we check if we can play r2.
# If yes, then we also check if we drew r3 earlier and so on.
# If not, then we keep r2 in our hands
#
# In total, this is equivalent to assuming that we infinitely many clues
# and infinite storage space in our hands (which is of course not true),
# but even in this setting, some games are infeasible due to pace issues
# that we can detect
#
# A small refinement is to pretend that we only have infinite storage for non-crit cards,
# for crit-cards, the usual hand card limit applies.
# This allows us to detect some seeds where there are simply too many unplayable cards to hold at some point
# that also can't be discarded
# this allows us to detect standard pace issue arguments
stacks = [0] * instance.num_suits
# we will ensure that stored_crits is a subset of stored_cards
stored_cards = set()
stored_crits = set()
min_forced_pace = instance.initial_pace
worst_pace_index = 0
max_forced_crit_discard = 0
worst_crit_index = 0
for (i, card) in enumerate(instance.deck):
if card.rank == stacks[card.suitIndex] + 1:
# card is playable
stacks[card.suitIndex] += 1
# check for further playables that we stored
for check_rank in range(card.rank + 1, 6):
check_card = hanab_game.DeckCard(card.suitIndex, check_rank)
if check_card in stored_cards:
stacks[card.suitIndex] += 1
stored_cards.remove(check_card)
if check_card in stored_crits:
stored_crits.remove(check_card)
else:
break
elif card.rank <= stacks[card.suitIndex]:
pass # card is trash
elif card.rank > stacks[card.suitIndex] + 1:
# need to store card
if card in stored_cards or card.rank == 5:
stored_crits.add(card)
stored_cards.add(card)
# check for out of handsize (this number can be negative, in which case nothing applies)
# Note the +1 at the end, which is there because we have to discard next,
# so even if we currently have as many crits as we can hold, we have to discard one
num_forced_crit_discards = len(stored_crits) - instance.num_players * instance.hand_size + 1
if len(stored_crits) - instance.num_players * instance.hand_size > max_forced_crit_discard:
worst_crit_index = i
max_forced_crit_discard = num_forced_crit_discards
if only_find_first:
reasons.append(InfeasibilityReason(
InfeasibilityType.OutOfPace,
instance.max_score + min_forced_pace,
worst_pace_index
))
return reasons
# the last - 1 is there because we have to discard 'next', causing a further draw
max_remaining_plays = (instance.deck_size - i - 1) + instance.num_players - 1
needed_plays = instance.max_score - sum(stacks)
cur_pace = max_remaining_plays - needed_plays
if cur_pace < min(0, min_forced_pace):
min_forced_pace = cur_pace
worst_pace_index = i
if only_find_first:
reasons.append(InfeasibilityReason(
InfeasibilityType.OutOfPace,
instance.max_score + min_forced_pace,
worst_pace_index
))
return reasons
# check that we correctly walked through the deck
assert (len(stored_cards) == 0)
assert (len(stored_crits) == 0)
assert (sum(stacks) == instance.max_score)
if max_forced_crit_discard > 0:
reasons.append(
InfeasibilityReason(
InfeasibilityType.OutOfHandSize,
instance.max_score - max_forced_crit_discard,
worst_crit_index
)
)
if min_forced_pace < 0:
reasons.append(InfeasibilityReason(
InfeasibilityType.OutOfPace,
instance.max_score + min_forced_pace,
worst_pace_index
))
return reasons
def run_on_database(variant_id):
database.cur.execute(
"SELECT seed, num_players, deck FROM seeds WHERE variant_id = (%s) ORDER BY (num_players, seed)",
(variant_id,)
)
res = database.cur.fetchall()
logger.verbose("Checking {} seeds of variant {} for infeasibility".format(len(res), variant_id))
with alive_progress.alive_bar(total=len(res), title='Check for infeasibility reasons in var {}'.format(variant_id)) as bar:
for (seed, num_players, deck_str) in res:
deck = compress.decompress_deck(deck_str)
reasons = analyze(hanab_game.HanabiInstance(deck, num_players))
for reason in reasons:
database.cur.execute(
"INSERT INTO score_upper_bounds (seed, score_upper_bound, reason) "
"VALUES (%s,%s,%s) "
"ON CONFLICT (seed, reason) DO UPDATE "
"SET score_upper_bound = EXCLUDED.score_upper_bound",
(seed, reason.score_upper_bound, reason.type.value)
)
database.cur.execute(
"UPDATE seeds SET feasible = (%s) WHERE seed = (%s)",
(False, seed)
)
bar()
database.conn.commit()

View file

@ -32,15 +32,6 @@ class Literals():
} }
} }
# progress[m] = i "after move m the next card drawn from the deck has index i"
self.next_draw = {
-1: Int(0)
, **{
m: Symbol('m{}progress'.format(m), INT)
for m in range(instance.max_winning_moves)
}
}
# strikes[m][i] == "after move m we have at least i strikes" # strikes[m][i] == "after move m we have at least i strikes"
self.strikes = { self.strikes = {
-1: {i: Bool(i == 0) for i in range(0, instance.num_strikes + 1)} # no strikes when we start -1: {i: Bool(i == 0) for i in range(0, instance.num_strikes + 1)} # no strikes when we start
@ -222,10 +213,6 @@ def solve_sat(starting_state: hanab_game.GameState | hanab_game.HanabiInstance,
Implies(And(Or(ls.discard_any[m], ls.dummyturn[m]), Not(ls.incr_clues[m])), Implies(And(Or(ls.discard_any[m], ls.dummyturn[m]), Not(ls.incr_clues[m])),
Equals(ls.clues[m], ls.clues[m - 1])), Equals(ls.clues[m], ls.clues[m - 1])),
# change of progress
Implies(ls.draw_any[m], Equals(ls.next_draw[m], ls.next_draw[m-1] + 1)),
Implies(Not(ls.draw_any[m]), Equals(ls.next_draw[m], ls.next_draw[m-1])),
# change of pace # change of pace
Implies(And(ls.discard_any[m], Or(ls.strike[m], Not(ls.play[m]))), Equals(ls.pace[m], ls.pace[m - 1] - 1)), Implies(And(ls.discard_any[m], Or(ls.strike[m], Not(ls.play[m]))), Equals(ls.pace[m], ls.pace[m - 1] - 1)),
Implies(Or(Not(ls.discard_any[m]), And(Not(ls.strike[m]), ls.play[m])), Equals(ls.pace[m], ls.pace[m - 1])), Implies(Or(Not(ls.discard_any[m]), And(Not(ls.strike[m]), ls.play[m])), Equals(ls.pace[m], ls.pace[m - 1])),

View file

@ -1,4 +1,4 @@
#! /usr//bin/env python3 #! /bin/python3
""" """
Short executable file to start the command-line-interface for the hanabi package. Short executable file to start the command-line-interface for the hanabi package.

View file

@ -1,32 +0,0 @@
[project]
name = "hanabi"
version = "1.1.5"
description = "Hanabi interface"
readme = "README.md"
license = { file = "LICENSE" }
keywords = [ "hanabi" ]
authors = [
{ name = "Maximilian Keßler", email = "git@maximilian-kessler.de" }
]
dependencies = [
"requests",
"requests_cache",
"pysmt",
"termcolor",
"more_itertools",
"psycopg2",
"alive_progress",
"argparse",
"verboselogs",
"pebble",
"platformdirs",
"PyYAML",
"cython==0.29.36"
]
[project.urls]
"Homepage" = "https://gitlab.com/kesslermaximilian/hanabi"
[build-system]
requires = ["setuptools>=43.0.0", "wheel"]
build-backend = "setuptools.build_meta"

View file

@ -10,5 +10,3 @@ verboselogs
pebble pebble
platformdirs platformdirs
PyYAML PyYAML
cython==0.29.36
unidecode

View file

@ -1,145 +0,0 @@
from typing import List, Tuple, Optional
import psycopg2.extras
import hanabi.hanab_game
import hanabi.live.hanab_live
from hanabi import logger
from hanabi.database import conn, cur
def get_actions_table_name(cert_game: bool):
return "certificate_game_actions" if cert_game else "game_actions"
def store_actions(game_id: int, actions: List[hanabi.hanab_game.Action], cert_game: bool = False):
vals = []
for turn, action in enumerate(actions):
vals.append((game_id, turn, action.type.value, action.target, action.value or 0))
psycopg2.extras.execute_values(
cur,
"INSERT INTO {} (game_id, turn, type, target, value) "
"VALUES %s "
"ON CONFLICT (game_id, turn) "
"DO NOTHING".format(get_actions_table_name(cert_game)),
vals
)
conn.commit()
def store_deck_for_seed(seed: str, deck: List[hanabi.hanab_game.DeckCard]):
vals = []
for index, card in enumerate(deck):
vals.append((seed, index, card.suitIndex, card.rank))
psycopg2.extras.execute_values(
cur,
"INSERT INTO decks (seed, deck_index, suit_index, rank) "
"VALUES %s "
"ON CONFLICT (seed, deck_index) DO UPDATE SET "
"(suit_index, rank) = (excluded.suit_index, excluded.rank)",
vals
)
conn.commit()
def load_actions(game_id: int, cert_game: bool = False) -> List[hanabi.hanab_game.Action]:
cur.execute("SELECT type, target, value FROM {} "
"WHERE game_id = %s "
"ORDER BY turn ASC".format(get_actions_table_name(cert_game)),
(game_id,))
actions = []
for action_type, target, value in cur.fetchall():
actions.append(
hanabi.hanab_game.Action(hanabi.hanab_game.ActionType(action_type), target, value)
)
if len(actions) == 0:
err_msg = "Failed to load actions for game id {} from DB: No actions stored.".format(game_id)
logger.error(err_msg)
raise ValueError(err_msg)
return actions
def load_deck(seed: str) -> List[hanabi.hanab_game.DeckCard]:
cur.execute("SELECT deck_index, suit_index, rank FROM decks "
"WHERE seed = %s "
"ORDER BY deck_index ASC",
(seed,)
)
deck = []
for index, (card_index, suit_index, rank) in enumerate(cur.fetchall()):
assert index == card_index
deck.append(
hanabi.hanab_game.DeckCard(suit_index, rank, card_index)
)
if len(deck) == 0:
err_msg = "Failed to load deck for seed {} from DB: No cards stored.".format(seed)
logger.error(err_msg)
raise ValueError(err_msg)
return deck
def load_instance(seed: str) -> Optional[hanabi.live.hanab_live.HanabLiveInstance]:
cur.execute(
"SELECT num_players, variant_id "
"FROM seeds WHERE seed = %s ",
(seed,)
)
res = cur.fetchone()
if res is None:
return None
(num_players, var_id) = res
deck = load_deck(seed)
return hanabi.live.hanab_live.HanabLiveInstance(deck, num_players, var_id)
def load_game_parts(game_id: int, cert_game: bool = False) -> Tuple[hanabi.live.hanab_live.HanabLiveInstance, List[hanabi.hanab_game.Action]]:
"""
Loads information on game from database
@param game_id: ID of game
@return: Instance (i.e. deck + settings) of game, list of actions, variant name
"""
cur.execute(
"SELECT "
"games.num_players, games.seed, games.one_extra_card, games.one_less_card, games.deck_plays, "
"games.all_or_nothing,"
"variants.clue_starved, variants.name, variants.id, variants.throw_it_in_a_hole "
"FROM games "
"INNER JOIN variants"
" ON games.variant_id = variants.id "
"WHERE games.id = %s",
(game_id,)
)
res = cur.fetchone()
if res is None:
err_msg = "Failed to retrieve game details of game {}.".format(game_id)
logger.error(err_msg)
raise ValueError(err_msg)
# Unpack results now
(num_players, seed, one_extra_card, one_less_card, deck_plays, all_or_nothing, clue_starved, variant_name, variant_id, throw_it_in_a_hole) = res
actions = load_actions(game_id, cert_game)
deck = load_deck(seed)
instance = hanabi.live.hanab_live.HanabLiveInstance(
deck=deck,
num_players=num_players,
variant_id=variant_id,
one_extra_card=one_extra_card,
one_less_card=one_less_card,
fives_give_clue=not throw_it_in_a_hole,
deck_plays=deck_plays,
all_or_nothing=all_or_nothing,
clue_starved=clue_starved
)
return instance, actions
def load_game(game_id: int, cert_game: bool = False) -> hanabi.live.hanab_live.HanabLiveGameState:
instance, actions = load_game_parts(game_id, cert_game)
game = hanabi.live.hanab_live.HanabLiveGameState(instance)
for action in actions:
game.make_action(action)
return game

View file

@ -1,181 +0,0 @@
DROP TABLE IF EXISTS users CASCADE;
CREATE TABLE users (
id SERIAL PRIMARY KEY,
username TEXT NOT NULL UNIQUE,
normalized_username TEXT NOT NULL UNIQUE
);
DROP TABLE IF EXISTS seeds CASCADE;
CREATE TABLE seeds (
seed TEXT NOT NULL PRIMARY KEY,
num_players SMALLINT NOT NULL,
variant_id SMALLINT NOT NULL,
starting_player SMALLINT NOT NULL DEFAULT 0,
/* Type of seed: 0 is from the website, all other integers are customly generated testsets, arbitrarily ordered into groups */
class SMALLINT NOT NULL DEFAULT 0,
/* For seeds on the website: Always 0. For custom seeds: Numbered within their class */
num INT NOT NULL DEFAULT 0,
feasible BOOLEAN DEFAULT NULL,
solved BOOLEAN GENERATED ALWAYS AS ( feasible IS NOT NULL ) STORED,
/*
If seed solved: Amount of time (in ms) to solve seed.
If seed not solved: Maximum amount of time spent on solving before timeout
*/
solve_time_ms INT NOT NULL DEFAULT 0,
max_score_theoretical SMALLINT
);
CREATE INDEX seeds_variant_custom_feasible_idx ON seeds (variant_id, custom, feasible);
DROP TABLE IF EXISTS decks CASCADE;
CREATE TABLE decks (
seed TEXT REFERENCES seeds (seed) ON DELETE CASCADE,
/* Order of card in deck*/
deck_index SMALLINT NOT NULL,
/* Suit */
suit_index SMALLINT NOT NULL,
/* Rank */
rank SMALLINT NOT NULL,
PRIMARY KEY (seed, deck_index)
);
DROP TABLE IF EXISTS games CASCADE;
CREATE TABLE games (
id INT PRIMARY KEY,
num_players SMALLINT NOT NULL,
starting_player SMALLINT NOT NULL DEFAULT 0,
variant_id SMALLINT NOT NULL,
timed BOOLEAN,
time_base INTEGER,
time_per_turn INTEGER,
speedrun BOOLEAN,
card_cycle BOOLEAN,
deck_plays BOOLEAN,
empty_clues BOOLEAN,
one_extra_card BOOLEAN,
one_less_card BOOLEAN,
all_or_nothing BOOLEAN,
detrimental_characters BOOLEAN,
seed TEXT NOT NULL REFERENCES seeds,
score SMALLINT NOT NULL,
num_turns SMALLINT
);
CREATE INDEX games_seed_score_idx ON games (seed, score);
CREATE INDEX games_var_seed_idx ON games (variant_id, seed);
CREATE INDEX games_player_idx ON games (num_players);
/* Example games finishing with max score, not necessarily played by humans. */
DROP TABLE IF EXISTS certificate_games;
CREATE TABLE certificate_games (
id SERIAL PRIMARY KEY,
seed TEXT NOT NULL REFERENCES seeds,
num_turns SMALLINT NOT NULL,
min_pace SMALLINT,
num_bdrs SMALLINT
);
CREATE INDEX certificate_games_seed_idx ON games (seed);
DROP TABLE IF EXISTS game_participants CASCADE;
CREATE TABLE game_participants (
id SERIAL PRIMARY KEY,
game_id INTEGER NOT NULL,
user_id INTEGER NOT NULL,
seat SMALLINT NOT NULL, /* Needed for the "GetNotes()" function */
FOREIGN KEY (game_id) REFERENCES games (id) ON DELETE CASCADE,
FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE,
CONSTRAINT game_participants_unique UNIQUE (game_id, user_id)
);
DROP FUNCTION IF EXISTS delete_game_of_deleted_participant;
CREATE FUNCTION delete_game_of_deleted_participant() RETURNS TRIGGER AS $_$
BEGIN
DELETE FROM games WHERE games.id = OLD.game_id;
RETURN OLD;
END $_$ LANGUAGE 'plpgsql';
CREATE TRIGGER delete_game_upon_participant_deletion
AFTER DELETE ON game_participants
FOR EACH ROW
EXECUTE PROCEDURE delete_game_of_deleted_participant();
DROP TABLE IF EXISTS game_participant_notes CASCADE;
CREATE TABLE game_participant_notes (
game_participant_id INTEGER NOT NULL,
card_order SMALLINT NOT NULL, /* "order" is a reserved word in PostgreSQL. */
note TEXT NOT NULL,
FOREIGN KEY (game_participant_id) REFERENCES game_participants (id) ON DELETE CASCADE,
PRIMARY KEY (game_participant_id, card_order)
);
DROP TABLE IF EXISTS game_actions CASCADE;
CREATE TABLE game_actions (
game_id INTEGER NOT NULL,
turn SMALLINT NOT NULL,
/**
* Corresponds to the "DatabaseGameActionType" enum.
*
* - 0 - play
* - 1 - discard
* - 2 - color clue
* - 3 - rank clue
* - 4 - game over
*/
type SMALLINT NOT NULL,
/**
* - If a play or a discard, corresponds to the order of the the card that was played/discarded.
* - If a clue, corresponds to the index of the player that received the clue.
* - If a game over, corresponds to the index of the player that caused the game to end or -1 if
* the game was terminated by the server.
*/
target SMALLINT NOT NULL,
/**
* - If a play or discard, then 0 (as NULL). It uses less database space and reduces code
* complexity to use a value of 0 for NULL than to use a SQL NULL:
* https://dev.mysql.com/doc/refman/8.0/en/data-size.html
* - If a color clue, then 0 if red, 1 if yellow, etc.
* - If a rank clue, then 1 if 1, 2 if 2, etc.
* - If a game over, then the value corresponds to the "endCondition" values in "constants.go".
*/
value SMALLINT NOT NULL,
FOREIGN KEY (game_id) REFERENCES games (id) ON DELETE CASCADE,
PRIMARY KEY (game_id, turn)
);
/* Functions the same as game_actions, just for certificate_games instead */
DROP TABLE IF EXISTS certificate_game_actions CASCADE;
CREATE TABLE certificate_game_actions (
game_id INTEGER NOT NULL,
turn SMALLINT NOT NULL,
type SMALLINT NOT NULL,
target SMALLINT NOT NULL,
value SMALLINT NOT NULL,
FOREIGN KEY (game_id) REFERENCES certificate_games (id) ON DELETE CASCADE,
PRIMARY KEY (game_id, turn)
);
DROP TABLE IF EXISTS infeasibility_reasons;
CREATE TABLE infeasibility_reasons (
seed TEXT NOT NULL REFERENCES seeds (seed) ON DELETE CASCADE,
reason SMALLINT NOT NULL,
/*
Some value whose meaning depends on the type of reason, for example index when pace loss occurs.
Can be null for some reason.
*/
value SMALLINT,
PRIMARY KEY (seed, reason)
);

View file

@ -1,73 +0,0 @@
import hanabi.live.compress
from hanabi.hanab_game import DeckCard
from hanabi import database
from hanabi.live.variants import Variant
from hanabi.database import games_db_interface
import random
from src.hanabi.solvers.sat import solve_sat
def get_deck(variant: Variant):
deck = []
for suit_index, suit in enumerate(variant.suits):
if suit.dark:
for rank in range(1, 6):
deck.append(DeckCard(suit_index, rank))
else:
deck.append(DeckCard(suit_index, 1))
if not suit.reversed:
deck.append(DeckCard(suit_index, 1))
deck.append(DeckCard(suit_index, 1))
for rank in range(2,5):
deck.append(DeckCard(suit_index, rank))
deck.append(DeckCard(suit_index, rank))
deck.append(DeckCard(suit_index, 5))
if suit.reversed:
deck.append(DeckCard(suit_index, 5))
deck.append(DeckCard(suit_index, 5))
return deck
def generate_deck(variant: Variant, num_players: int, seed: int, seed_class: int = 1):
deck = get_deck(variant)
seed = "p{}c{}s{}".format(num_players, seed_class, seed)
random.seed(seed)
random.shuffle(deck)
return seed, deck
def link():
seed = "p5v0sunblinkingly-kobe-prescriptively"
deck = database.games_db_interface.load_deck(seed)
database.cur.execute("SELECT id FROM certificate_games WHERE seed = %s", (seed,))
(game_id, ) = database.cur.fetchone()
actions = database.games_db_interface.load_actions(game_id, True)
inst = hanabi.hanab_game.HanabiInstance(deck, 5)
game = hanabi.hanab_game.GameState(inst)
for action in actions:
game.make_action(action)
print(hanabi.live.compress.link(game))
def generate_decks_for_variant(variant_id: int, num_players: int, num_seeds: int, seed_class: int = 1):
variant = Variant.from_db(variant_id)
for seed_num in range(num_seeds):
seed, deck = generate_deck(variant, num_players, seed_num, seed_class)
database.cur.execute(
"INSERT INTO seeds (seed, num_players, starting_player, variant_id, class, num) "
"VALUES (%s, %s, %s, %s, %s, %s)"
"ON CONFLICT (seed) DO NOTHING",
(seed, num_players, 0, variant_id, seed_class, seed_num)
)
games_db_interface.store_deck_for_seed(seed, deck)
def main():
database.global_db_connection_manager.read_config()
database.global_db_connection_manager.connect()
link()
# generate_decks_for_variant(0, 2, 100)
if __name__ == '__main__':
main()

View file

@ -1,146 +0,0 @@
from typing import List, Dict, Tuple
from hanabi.hanab_game import Action, ParseError
from hanabi import hanab_game
from hanabi import constants
from hanabi.live import variants
class HanabLiveInstance(hanab_game.HanabiInstance):
def __init__(
self,
deck: List[hanab_game.DeckCard],
num_players: int,
variant_id: int,
one_extra_card: bool = False,
one_less_card: bool = False,
*args, **kwargs
):
self.one_extra_card = one_extra_card
self.one_less_card = one_less_card
assert 2 <= num_players <= 6
hand_size = constants.HAND_SIZES[num_players]
if one_less_card:
hand_size -= 1
if one_extra_card:
hand_size += 1
super().__init__(deck, num_players, hand_size=hand_size, *args, **kwargs)
self.variant_id = variant_id
self.variant = variants.Variant.from_db(self.variant_id)
@staticmethod
def select_standard_variant_id(instance: hanab_game.HanabiInstance):
err_msg = "Hanabi instance not supported by hanab.live, cannot convert to HanabLiveInstance: "
assert 3 <= instance.num_suits <= 6, \
err_msg + "Illegal number of suits ({}) found, must be in range [3,6]".format(instance.num_suits)
assert 0 <= instance.num_dark_suits <= 2, \
err_msg + "Illegal number of dark suits ({}) found, must be in range [0,2]".format(instance.num_dark_suits)
assert 4 <= max(instance.num_suits, 4) - instance.num_dark_suits, \
err_msg + "Illegal ratio of dark suits to suits, can have at most {} dark suits with {} total suits".format(
max(instance.num_suits - 4, 0), instance.num_suits
)
return constants.VARIANT_IDS_STANDARD_DISTRIBUTIONS[instance.num_suits][instance.num_dark_suits]
def parse_json_game(game_json: Dict, as_hanab_live_instance: bool = True) \
-> Tuple[HanabLiveInstance | hanab_game.HanabiInstance, List[Action]]:
game_id = game_json.get('id', None)
players = game_json.get('players', [])
num_players = len(players)
if num_players < 2 or num_players > 6:
raise ParseError(num_players)
options = game_json.get('options', {})
var_name = options.get('variant', 'No Variant')
deck_plays = options.get('deckPlays', False)
one_extra_card = options.get('oneExtraCard', False)
one_less_card = options.get('oneLessCard', False)
all_or_nothing = options.get('allOrNothing', False)
starting_player = options.get('startingPlayer', 0)
detrimental_characters = options.get('detrimentalCharacters', False)
try:
actions = [hanab_game.Action.from_json(action) for action in game_json.get('actions', [])]
except hanab_game.ParseError as e:
raise ParseError("Failed to parse actions") from e
try:
deck = [hanab_game.DeckCard.from_json(card) for card in game_json.get('deck', None)]
except hanab_game.ParseError as e:
raise ParseError("Failed to parse deck") from e
if detrimental_characters:
raise NotImplementedError(
"detrimental characters not supported, cannot determine score of game {}".format(game_id)
)
if as_hanab_live_instance:
var_id = variants.variant_id(var_name)
return HanabLiveInstance(
deck, num_players, var_id,
deck_plays=deck_plays,
one_less_card=one_less_card,
one_extra_card=one_extra_card,
all_or_nothing=all_or_nothing,
starting_player=starting_player
), actions
else:
hand_size = constants.HAND_SIZES[num_players]
if one_less_card:
hand_size -= 1
if one_extra_card:
hand_size += 1
clue_starved = 'Clue Starved' in var_name
return hanab_game.HanabiInstance(
deck, num_players, hand_size,
clue_starved=clue_starved,
deck_plays=deck_plays,
all_or_nothing=all_or_nothing,
starting_player=starting_player
), actions
class HanabLiveGameState(hanab_game.GameState):
def __init__(self, instance: HanabLiveInstance):
super().__init__(instance)
self.instance: HanabLiveInstance = instance
def to_json(self):
return {
"actions": [action.to_json() for action in self.actions],
"deck": [card.to_json() for card in self.deck],
"players": ["Alice", "Bob", "Cathy", "Donald", "Emily", "Frank"][:self.num_players],
"notes": [[]] * self.num_players,
"options": {
"variant": self.instance.variant_id,
"deckPlays": self.instance.deck_plays,
"oneExtraCard": self.instance.one_extra_card,
"oneLessCard": self.instance.one_less_card,
"allOrNothing": self.instance.all_or_nothing,
"startingPlayer": self.instance.starting_player
}
}
def _waste_clue(self) -> hanab_game.Action:
for player in range(self.turn + 1, self.turn + self.num_players):
for card in self.hands[player % self.num_players]:
for rank in self.instance.variant.ranks:
if self.instance.variant.rank_touches(card, rank):
return hanab_game.Action(
hanab_game.ActionType.RankClue,
player % self.num_players,
rank
)
for color in range(self.instance.variant.num_colors):
if self.instance.variant.color_touches(card, color):
return hanab_game.Action(
hanab_game.ActionType.ColorClue,
player % self.num_players,
color
)
raise RuntimeError("Current game state did not permit any legal clue."
"This case is incredibly rare and currently not handled.")

View file

@ -1,270 +0,0 @@
from dataclasses import dataclass
from types import NoneType
from typing import Optional, Tuple, List
import pebble.concurrent
import concurrent.futures
import traceback
import alive_progress
import threading
import time
import psycopg2.extras
import hanabi.hanab_game
from hanabi import logger
from hanabi.hanab_game import GameState
from hanabi.solvers.sat import solve_sat
from hanabi import database
from hanabi.live import download_data
from hanabi.live import compress
from hanabi import hanab_game
from hanabi.solvers import greedy_solver
from hanabi.solvers import deck_analyzer
from hanabi.live import variants
from hanabi.database.games_db_interface import store_actions
MAX_PROCESSES = 3
def update_trivially_feasible_games(variant_id):
variant: variants.Variant = variants.Variant.from_db(variant_id)
database.cur.execute("SELECT seed FROM seeds WHERE variant_id = (%s) AND feasible is null", (variant_id,))
seeds = database.cur.fetchall()
logger.verbose('Checking variant {} (id {}), found {} seeds to check...'.format(variant.name, variant_id, len(seeds)))
with alive_progress.alive_bar(total=len(seeds), title='{} ({})'.format(variant.name, variant_id)) as bar:
for (seed,) in seeds:
database.cur.execute(
"SELECT id, deck_plays, one_extra_card, one_less_card, all_or_nothing, detrimental_characters "
"FROM games WHERE score = (%s) AND seed = (%s) ORDER BY id;",
(variant.max_score, seed)
)
res = database.cur.fetchall()
logger.debug("Checking seed {}: {:3} results".format(seed, len(res)))
for (game_id, a, b, c, d, e) in res:
if None in [a, b, c, d, e]:
logger.debug(' Game {} not found in database, exporting...'.format(game_id))
download_data.detailed_export_game(
game_id, var_id=variant_id, score=variant.max_score, seed_exists=True
)
database.cur.execute("SELECT deck_plays, one_extra_card, one_less_card, all_or_nothing, "
"detrimental_characters "
"FROM games WHERE id = (%s)",
(game_id,))
(a, b, c, d, e) = database.cur.fetchone()
else:
logger.debug(' Game {} already in database'.format(game_id))
valid = not any([a, b, c, d, e])
if valid:
print(a, b, c, d, e)
logger.verbose(
'Seed {:10} (variant {}) found to be feasible via game {:6}'.format(seed, variant_id, game_id))
database.cur.execute("UPDATE seeds SET (feasible, max_score_theoretical) = (%s, %s) WHERE seed = "
"(%s)", (True, variant.max_score, seed))
database.cur.execute(
"INSERT INTO score_lower_bounds (seed, score_lower_bound, game_id) VALUES (%s, %s, %s)",
(seed, variant.max_score, game_id)
)
database.conn.commit()
break
else:
logger.verbose(' Cheaty game {} found'.format(game_id))
bar()
def get_decks_for_all_seeds():
cur = database.conn.database.cursor()
cur.execute("SELECT id "
"FROM games "
" INNER JOIN seeds "
" ON seeds.seed = games.seed"
" WHERE"
" seeds.deck is null"
" AND"
" games.id = ("
" SELECT id FROM games WHERE games.seed = seeds.seed LIMIT 1"
" )"
)
print("Exporting decks for all seeds")
res = cur.fetchall()
with alive_progress.alive_bar(len(res), title="Exporting decks") as bar:
for (game_id,) in res:
download_data.detailed_export_game(game_id)
bar()
@dataclass
class SolutionData:
infeasibility_reasons: Optional[List[deck_analyzer.InfeasibilityReason]]
seed: str = None
time_ms: int = 0
feasible: Optional[bool] = None
solution: Optional[GameState] = None
num_remaining_cards: Optional[int] = None
skipped: bool = False
def __init__(self):
self.infeasibility_reasons = []
def solve_instance(instance: hanab_game.HanabiInstance)-> SolutionData:
retval = SolutionData()
# first, sanity check on running out of pace
result = deck_analyzer.analyze(instance)
if len(result) != 0:
logger.verbose("found infeasible deck by preliminary analysis")
retval.feasible = False
retval.infeasibility_reasons = result
return retval
for num_remaining_cards in [0, 10, 20]:
# logger.info("trying with {} remaining cards".format(num_remaining_cards))
game = hanab_game.GameState(instance)
strat = greedy_solver.GreedyStrategy(game)
# make a number of greedy moves
while not game.is_over() and not game.is_known_lost():
if num_remaining_cards != 0 and game.progress == game.deck_size - num_remaining_cards:
break # stop solution here
strat.make_move()
# check if we won already
if game.is_won():
retval.feasible = True
retval.solution = game
retval.num_remaining_cards = num_remaining_cards
# print("won with greedy strat")
return retval
# now, apply sat solver
if not game.is_over():
logger.debug("continuing greedy sol with SAT")
solvable, solution = solve_sat(game)
if solvable:
retval.feasible = True
retval.solution = solution
retval.num_remaining_cards = num_remaining_cards
return retval
logger.debug(
"No success with {} remaining cards, reducing number of greedy moves, failed attempt was: {}".format(
num_remaining_cards, compress.link(game)))
logger.debug("Starting full SAT solver")
game = hanab_game.GameState(instance)
retval.feasible, retval.solution = solve_sat(game)
retval.num_remaining_cards = instance.draw_pile_size
if not retval.feasible:
assert len(retval.infeasibility_reasons) == 0
retval.infeasibility_reasons.append(deck_analyzer.InfeasibilityReason(deck_analyzer.InfeasibilityType.SAT))
return retval
def solve_seed(seed, num_players, deck, timeout: Optional[int] = 150) -> SolutionData:
try:
@pebble.concurrent.process(timeout=timeout)
def solve_seed_with_timeout(seed, num_players, deck) -> SolutionData:
try:
logger.verbose("Starting to solve seed {}".format(seed))
t0 = time.perf_counter()
retval = solve_instance(hanab_game.HanabiInstance(deck, num_players))
t1 = time.perf_counter()
retval.seed = seed
retval.time_ms = round((t1 - t0) * 1000)
logger.verbose("Solved instance {} in {} seconds: {}".format(seed, round(t1 - t0, 2), retval.feasible))
return retval
except Exception as e:
print("exception in subprocess:")
traceback.print_exc()
f = solve_seed_with_timeout(seed, num_players, deck)
try:
return f.result()
except TimeoutError:
retval = SolutionData()
retval.seed = seed
retval.feasible = None
retval.time_ms = 1000 * timeout
logger.verbose("Solving on seed {} timed out".format(seed))
return retval
except Exception as e:
print("exception in subprocess:")
traceback.print_exc()
def process_solve_result(result: SolutionData):
if result.feasible is not None:
database.cur.execute("UPDATE seeds SET (feasible, solve_time_ms) = (%s, %s) WHERE seed = (%s)",
(result.feasible, result.time_ms, result.seed))
if result.feasible:
assert result.solution is not None
database.cur.execute("INSERT INTO certificate_games (seed, num_turns) "
"VALUES (%s, %s) "
"RETURNING ID ", (result.seed, len(result.solution.actions)))
game_id = database.cur.fetchone()[0]
store_actions(game_id, result.solution.actions, True)
logger.verbose("Success with {} cards left in draw by greedy solver on seed {}: {}\n".format(
result.num_remaining_cards, result.seed, compress.link(result.solution))
)
else:
logger.debug("seed {} was not solvable".format(result.seed))
vals = [(result.seed, reason.type.value, reason.value) for reason in result.infeasibility_reasons]
psycopg2.extras.execute_values(
database.cur,
"INSERT INTO infeasibility_reasons (seed, reason, value) "
"VALUES %s "
"ON CONFLICT (seed, reason) DO NOTHING",
vals
)
database.conn.commit()
elif result.skipped:
logger.verbose("seed {} skipped".format(result.seed))
else:
database.cur.execute("UPDATE seeds SET solve_time_ms = %s WHERE seed = (%s)", (result.time_ms, result.seed))
database.conn.commit()
def solve_unknown_seeds(variant_id, seed_class: int = 0, num_players: Optional[int] = None, timeout: Optional[int] = 150, num_threads: int = 4):
variant_name = variants.variant_name(variant_id)
query = "SELECT seeds.seed, num_players, array_agg(suit_index order by deck_index asc), array_agg(rank order by deck_index asc) "\
"FROM seeds "\
"INNER JOIN decks ON seeds.seed = decks.seed "\
"WHERE variant_id = (%s) "\
"AND class = (%s) "\
"AND feasible IS NULL "\
"AND solve_time_ms < (%s)"
if num_players is not None:
query += "AND num_players = {} ".format(num_players)
query += "GROUP BY seeds.seed ORDER BY num"
database.cur.execute(query,
(variant_id, seed_class, 1000 * timeout)
)
res = database.cur.fetchall()
data = []
for (seed, num_players, suits, ranks) in res:
assert len(suits) == len(ranks)
deck = []
for (suit, rank) in zip(suits, ranks):
deck.append(hanabi.hanab_game.DeckCard(suit, rank))
data.append((seed, num_players, deck))
"""
with alive_progress.alive_bar(len(res), title='Seed solving on {}'.format(variant_name)) as bar:
for d in data:
solve_seed(d[0], d[1], d[2], timeout)
bar()
return
"""
with concurrent.futures.ProcessPoolExecutor(max_workers=num_threads) as executor:
fs = [executor.submit(solve_seed, d[0], d[1], d[2], timeout) for d in data]
with alive_progress.alive_bar(len(res), title='Seed solving on {}'.format(variant_name)) as bar:
for f in concurrent.futures.as_completed(fs):
result = f.result()
process_solve_result(result)
bar()

View file

@ -1,350 +0,0 @@
import collections
from enum import Enum
from typing import List, Any, Optional, Tuple
from dataclasses import dataclass
import alive_progress
import hanabi.hanab_game
from hanabi import database
from hanabi import logger
from hanabi import hanab_game
from hanabi.hanab_game import DeckCard
from hanabi.live import compress
from hanabi.database import games_db_interface
class InfeasibilityType(Enum):
Pace = 0 # idx denotes index of last card drawn before being forced to reduce pace, value denotes how bad pace is
DoubleBottom2With5s = 1 # same, special case for 2p
TripleBottom1With5s = 2 # same, special case for 2p
MultiSuitBdr = 3
PaceAfterSqueeze = 4
HandSize = 10 # idx denotes index of last card drawn before being forced to discard a crit
HandSizeDirect = 11
HandSizeWithSqueeze = 12
HandSizeWithBdr = 13
HandSizeWithBdrSqueeze = 14
BottomTopDeck = 20
# further reasons, currently not scanned for
DoubleBottomTopDeck = 30
CritAtBottom = 40
# Default reason when we have nothing else
SAT = 50
class InfeasibilityReason:
def __init__(self, infeasibility_type: InfeasibilityType, value=None):
self.type = infeasibility_type
self.value = value
def __repr__(self):
return "{} ({})".format(self.type, self.value)
match self.type:
case InfeasibilityType.Pace:
return "Out of Pace after drawing card {}".format(self.value)
case InfeasibilityType.HandSize:
return "Out of hand size after drawing card {}".format(self.value)
case InfeasibilityType.CritAtBottom:
return "Critical non-5 at bottom"
def generate_all_choices(l: List[List[Any]]):
if len(l) == 0:
yield []
return
head, *tail = l
for option in head:
for back in generate_all_choices(tail):
yield [option] + back
def check_for_top_bottom_deck_loss(instance: hanab_game.HanabiInstance) -> bool:
hands = [instance.deck[p * instance.hand_size : (p+1) * instance.hand_size] for p in range(instance.num_players)]
# scan the deck in reverse order if any card is forced to be late
found = {}
# Note that only the last 4 cards are relevant for single-suit distribution loss
for i, card in enumerate(reversed(instance.deck[-4:])):
if card in found.keys():
found[card] += 1
else:
found[card] = 1
if found[card] >= 3 or (card.rank != 1 and found[card] >= 2):
max_rank_starting_extra_round = card.rank + (instance.deck_size - card.deck_index - 2)
# Next, need to figure out what positions of cards of the same suit are fixed
positions_by_rank = [[] for _ in range(6)]
for rank in range(max_rank_starting_extra_round, 6):
for player, hand in enumerate(hands):
card_test = DeckCard(card.suitIndex, rank)
for card_hand in hand:
if card_test == card_hand:
positions_by_rank[rank].append(player)
# clean up where we have free choice anyway
for rank, positions in enumerate(positions_by_rank):
if rank != 5 and len(positions) < 2:
positions.clear()
if len(positions) == 0:
positions.append(None)
# Now, iterate through all choices in starting hands (None stands for free choice of a card) and check them
assignment_found = False
for assignment in generate_all_choices(positions_by_rank):
cur_player = None
num_turns = 0
for rank in range(max_rank_starting_extra_round, 6):
if cur_player is None or assignment[rank] is None:
num_turns += 1
else:
# Note the -1 and +1 to output things in range [1,5] instead of [0,4]
num_turns += (assignment[rank] - cur_player - 1) % instance.num_players + 1
if assignment[rank] is not None:
cur_player = assignment[rank]
elif cur_player is not None:
cur_player = (cur_player + 1) % instance.num_players
if num_turns <= instance.num_players + 1:
assignment_found = True
# If no assignment worked out, the deck is infeasible because of this suit
if not assignment_found:
return True
# If we reach this point, we checked for every card near the bottom of the deck and found a possible endgame each
return False
def analyze(instance: hanab_game.HanabiInstance, only_find_first=False) -> List[InfeasibilityReason]:
"""
Checks instance for the following (easy) certificates for unfeasibility
- There is a critical non-5 at the bottom
- We necessarily run out of pace when playing this deck:
At some point, among all drawn cards, there are too few playable ones so that the next discard
reduces pace to a negative amount
- We run out of hand size when playing this deck:
At some point, there are too many critical cards (that also could not have been played) for the players
to hold collectively
:param instance: Instance to be analyzed
:param only_find_first: If true, we immediately return when finding the first infeasibility reason and don't
check for further ones. Might be slightly faster on some instances, especially dark ones.
:return: List with all reasons found. Empty if none is found.
In particular, if return value is not the empty list, the analyzed instance is unfeasible
"""
reasons = []
top_bottom_deck_loss = check_for_top_bottom_deck_loss(instance)
if top_bottom_deck_loss:
reasons.append(InfeasibilityReason(InfeasibilityType.BottomTopDeck))
if only_find_first:
return reasons
# check for critical non-fives at bottom of the deck
bottom_card = instance.deck[-1]
if bottom_card.rank != 5 and bottom_card.suitIndex in instance.dark_suits:
reasons.append(InfeasibilityReason(
InfeasibilityType.CritAtBottom,
instance.deck_size - 1
))
if only_find_first:
return reasons
# we will sweep through the deck and pretend that
# - we keep all non-trash cards in our hands
# - we instantly play all playable cards as soon as we have them
# - we recurse on this instant-play
#
# For example, we assume that once we draw r2, we check if we can play r2.
# If yes, then we also check if we drew r3 earlier and so on.
# If not, then we keep r2 in our hands
#
# In total, this is equivalent to assuming that we have infinitely many clues
# and infinite storage space in our hands (which is of course not true),
# but even in this setting, some games are infeasible due to pace issues
# that we can detect
#
# A small refinement is to pretend that we only have infinite storage for non-crit cards,
# for crit-cards, the usual hand card limit applies.
# This allows us to detect some seeds where there are simply too many unplayable cards to hold at some point
# that also can't be discarded
stacks = [0] * instance.num_suits
# we will ensure that stored_crits is a subset of stored_cards
stored_cards = set()
stored_crits = set()
pace_found = False
hand_size_found = False
squeeze = False
considered_bdr = False
artificial_crits = set()
# Investigate BDRs. This catches special cases of Pace losses in 2p, as well as mark some cards critical because
# their second copies cannot be used.
filtered_deck = [card for card in instance.deck if card.rank != 5]
if instance.num_players == 2:
if filtered_deck[-1] == filtered_deck[-2] and filtered_deck[-1].rank == 2:
reasons.append(InfeasibilityReason(InfeasibilityType.Pace, filtered_deck[-2].deck_index - 1))
if only_find_first:
return reasons
reasons.append(InfeasibilityReason(InfeasibilityType.DoubleBottom2With5s, filtered_deck[-2].deck_index - 1))
pace_found = True
if filtered_deck[-1] == filtered_deck[-2] and filtered_deck[-2] == filtered_deck[-3] and filtered_deck[-3].rank == 1:
reasons.append(InfeasibilityReason(InfeasibilityType.Pace, filtered_deck[-3].deck_index - 1))
if only_find_first:
return reasons
reasons.append(InfeasibilityReason(InfeasibilityType.DoubleBottom2With5s, filtered_deck[-2].deck_index - 1))
pace_found = True
# In 2-player, the second-last card cannot be played if it is a 2
if filtered_deck[-2].rank == 2:
artificial_crits.add(filtered_deck[-2])
# In 2-player, in case there is double bottom 3 of the same suit, the card immediately before cannot be played:
# After playing that one and drawing the first 3, exactly 3,4,5 of the bottom suit have to be played
if filtered_deck[-1] == filtered_deck[-2] and filtered_deck[-2].rank == 3:
artificial_crits.add(filtered_deck[-2])
# Last card in the deck can never be played
if instance.deck[-1].rank != 5:
artificial_crits.add(instance.deck[-1])
for (i, card) in enumerate(instance.deck):
if card.rank == stacks[card.suitIndex] + 1:
# card is playable
stacks[card.suitIndex] += 1
# check for further playables that we stored
for check_rank in range(card.rank + 1, 6):
check_card = hanab_game.DeckCard(card.suitIndex, check_rank)
if check_card in stored_cards:
stacks[card.suitIndex] += 1
stored_cards.remove(check_card)
if check_card in stored_crits:
stored_crits.remove(check_card)
else:
break
elif card.rank <= stacks[card.suitIndex]:
pass # card is trash
elif card.rank > stacks[card.suitIndex] + 1:
# need to store card
if card in stored_cards or card.rank == 5:
stored_crits.add(card)
elif card in artificial_crits:
stored_crits.add(card)
considered_bdr = True
stored_cards.add(card)
hand_size_left_for_crits = instance.num_players * instance.hand_size - len(stored_crits) - 1
# In case we can only keep the critical cards exactly, get rid of all others
if hand_size_left_for_crits == 0:
# Note the very important copy here (!)
stored_cards = stored_crits.copy()
squeeze = True
# Use a bool flag to only mark this reason once
if hand_size_left_for_crits < 0 and not hand_size_found:
reasons.append(InfeasibilityReason(
InfeasibilityType.HandSize,
i
))
if only_find_first:
return reasons
hand_size_found = True
# More detailed analysis of loss, categorization only
if squeeze:
if considered_bdr:
reasons.append(InfeasibilityReason(InfeasibilityType.HandSizeWithBdrSqueeze, i))
else:
reasons.append(InfeasibilityReason(InfeasibilityType.HandSizeWithSqueeze, i))
else:
if considered_bdr:
reasons.append(InfeasibilityReason(InfeasibilityType.HandSizeWithBdr, i))
else:
reasons.append(InfeasibilityReason(InfeasibilityType.HandSizeDirect, i))
# the last - 1 is there because we have to discard 'next', causing a further draw
max_remaining_plays = (instance.deck_size - i - 1) + instance.num_players - 1
needed_plays = instance.max_score - sum(stacks)
cur_pace = max_remaining_plays - needed_plays
if cur_pace < 0 and not pace_found and not hand_size_found:
reasons.append(InfeasibilityReason(
InfeasibilityType.Pace,
i
))
if only_find_first:
return reasons
# We checked single-suit pace losses beforehand (which can only occur in 2p)
if squeeze:
reasons.append(InfeasibilityReason(InfeasibilityType.PaceAfterSqueeze, i))
else:
reasons.append(InfeasibilityReason(
InfeasibilityType.MultiSuitBdr,
i
))
pace_found = True
return reasons
def run_on_database(variant_id):
database.cur.execute(
"SELECT seed, num_players, deck FROM seeds WHERE variant_id = (%s) ORDER BY (num_players, seed)",
(variant_id,)
)
res = database.cur.fetchall()
logger.verbose("Checking {} seeds of variant {} for infeasibility".format(len(res), variant_id))
with alive_progress.alive_bar(total=len(res), title='Check for infeasibility reasons in var {}'.format(variant_id)) as bar:
for (seed, num_players, deck_str) in res:
deck = compress.decompress_deck(deck_str)
reasons = analyze(hanab_game.HanabiInstance(deck, num_players))
for reason in reasons:
database.cur.execute(
"INSERT INTO score_upper_bounds (seed, score_upper_bound, reason) "
"VALUES (%s,%s,%s) "
"ON CONFLICT (seed, reason) DO UPDATE "
"SET score_upper_bound = EXCLUDED.score_upper_bound",
(seed, reason.score_upper_bound, reason.type.value)
)
database.cur.execute(
"UPDATE seeds SET feasible = (%s) WHERE seed = (%s)",
(False, seed)
)
bar()
database.conn.commit()
def main():
seed = "p5v0sporcupines-underclass-phantasmagorical"
seed = 'p5c1s98804'
seed = 'p4c1s1116'
seed = 'p5c1s14459'
num_players = 5
database.global_db_connection_manager.read_config()
database.global_db_connection_manager.connect()
database.cur.execute("SELECT seed, num_players FROM seeds WHERE (feasible IS NULL OR feasible = false) AND class = 1 AND num_players = 5")
# for (seed, num_players) in database.cur.fetchall():
for _ in range(1):
deck = database.games_db_interface.load_deck(seed)
inst = hanabi.hanab_game.HanabiInstance(deck, num_players)
lost = check_for_top_bottom_deck_loss(inst)
if lost:
print(seed)
if __name__ == "__main__":
main()