Merge branch 'greedy-solver'

This commit is contained in:
Maximilian Keßler 2023-06-24 17:26:51 +02:00
commit 558a341aeb
Signed by: max
GPG key ID: BCC5A619923C0BA5
13 changed files with 352 additions and 120 deletions

2
.gitignore vendored
View file

@ -12,5 +12,5 @@ remaining_cards.txt
lost_seeds.txt
utils.py
infeasible_instances.txt
debug_log.txt
verbose_log.txt
debug_log.txt

54
cheating_strategy Normal file
View file

@ -0,0 +1,54 @@
card types:
trash, playable, useful (dispensable), critical
pace := #(cards left in deck) + #players - #(cards left to play)
modified_pace := pace - #(players without useful cards)
endgame := #(cards left to play) - #(cards left in deck) = #players - pace
-> endgame >= 0 iff pace <= #players
in_endgame := endgame >= 0
discard_badness(card) :=
1 if trash
8 - #players if card useful but duplicate visible # TODO: should probably account for rank of card as well, currently, lowest one is chosen
80 - 10*rank if card is not critical but currently unique # this ensures we prefer to discard higher ranked cards
600 - 100*rank if only criticals in hand # essentially not relevant, since we are currently only optimizing for full score
Algorithm:
if (have playable card):
if (in endgame) and not (in extraround):
stall in the following situations:
- we have exactly one useful card, it is a 5, and a copy of each useful card is visible
- we have exactly one useful card, it is a 4, the player with the matching 5 has another critical card to play
- we have exactly one useful card (todo: maybe use critical here?), the deck has size 1, someone else has 2 crits
- we have exactly one playable card, it is a 4, and a further useful card, but the playable is redistributable in the following sense:
the other player only has this one useful card, and the player holding the matching 5 sits after the to-be-redistributed player
- something else that seems messy and is currently not understood, ignored for now
TODO: maybe introduce some midgame stalls here, since we know the deck?
play a card, matching the first of the following criteria. if several cards match, recurse with this set of cards
- if in extraround, play crit
- if in second last round and we have 2 crits, play crit
- play card with lowest rank
- play a critical card
- play unique card, i.e. not visible
- lowest suit index (for determinism)
if 8 hints:
give a hint
if 0 hints:
discard card with lowest badness
stall in the following situations:
- #(cards in deck) == 2 and (card of rank 3 or lower is missing) and we have the connecting card
- #clues >= 8 - #(useful cards in hand), there are useful cards in the deck and either:
- the next player has no useful cards at all
- we have two more crits than the next player and they have trash
- we are in endgame and the deck only contains one card
- it is possible that no-one discards in the following round and we are not waiting for a card whose rank is smaller than pace // TODO: this feels like a weird condition
discard if (discard badness) + #hints < 10
stall if someone has a better discard

View file

@ -0,0 +1 @@
from .database import cur, conn

View file

@ -15,6 +15,7 @@ CREATE TABLE games (
id INT PRIMARY KEY,
seed TEXT NOT NULL REFERENCES seeds,
num_players SMALLINT NOT NULL,
starting_player SMALLINT NOT NULL DEFAULT 0,
score SMALLINT NOT NULL,
variant_id SMALLINT NOT NULL,
deck_plays BOOLEAN,

View file

@ -12,6 +12,7 @@ class InfeasibilityType(Enum):
OutOfPace = 0 # idx denotes index of last card drawn before being forced to reduce pace, value denotes how bad pace is
OutOfHandSize = 1 # idx denotes index of last card drawn before being forced to discard a crit
NotTrivial = 2
CritAtBottom = 3
class InfeasibilityReason():
@ -26,6 +27,9 @@ class InfeasibilityReason():
return "Deck runs out of pace ({}) after drawing card {}".format(self.value, self.index)
case InfeasibilityType.OutOfHandSize:
return "Deck runs out of hand size after drawing card {}".format(self.index)
case InfeasibilityType.CritAtBottom:
return "Deck has crit non-5 at bottom (index {})".format(self.index)
def analyze_suit(occurrences):
# denotes the indexes of copies we can use wlog
@ -97,6 +101,9 @@ def analyze_card_usage(instance: HanabiInstance):
def analyze(instance: HanabiInstance, find_non_trivial=False) -> InfeasibilityReason | None:
if instance.deck[-1].rank != 5 and instance.deck[-1].suitIndex + instance.num_dark_suits >= instance.num_suits:
return InfeasibilityReason(InfeasibilityType.CritAtBottom, instance.deck_size - 1)
# we will sweep through the deck and pretend that we instantly play all cards
# as soon as we have them (and recurse this)
# this allows us to detect standard pace issue arguments

View file

@ -42,6 +42,7 @@ def detailed_export_game(game_id: int, score: Optional[int] = None, var_id: Opti
one_extra_card = options.get('oneExtraCard', False)
one_less_card = options.get('oneLessCard', False)
all_or_nothing = options.get('allOrNothing', False)
starting_player = options.get('startingPlayer', 0)
actions = [Action.from_json(action) for action in game_json.get('actions', [])]
deck = [DeckCard.from_json(card) for card in game_json.get('deck', None)]
@ -49,11 +50,18 @@ def detailed_export_game(game_id: int, score: Optional[int] = None, var_id: Opti
assert seed is not None, assert_msg
if score is None:
if deck_plays or one_less_card or one_extra_card or all_or_nothing:
# TODO: need to incorporate extra options here regarding hand size etc
raise RuntimeError('Not implemented.')
# need to play through the game once to find out its score
game = HanabLiveGameState(HanabLiveInstance(deck, num_players, var_id))
game = HanabLiveGameState(
HanabLiveInstance(
deck, num_players, var_id,
deck_plays=deck_plays,
one_less_card=one_less_card,
one_extra_card=one_extra_card,
all_or_nothing=all_or_nothing
),
starting_player
)
print(game.instance.hand_size, game.instance.num_players)
for action in actions:
game.make_action(action)
score = game.score

View file

@ -3,6 +3,7 @@ import collections
import sys
from enum import Enum
from log_setup import logger
from typing import Tuple, List, Optional
from time import sleep
from hanabi import DeckCard, Action, ActionType, GameState, HanabiInstance
@ -11,10 +12,12 @@ from database.database import conn
class CardType(Enum):
    """Classification of a card relative to the current game state.

    Each member name is defined exactly once: ``Enum`` refuses to create a
    class in which the same member name is assigned twice.
    """
    # Generic "useful but not critical" marker. It keeps its own value (-1) so
    # that it is not an alias of DuplicateVisible.
    Dispensable = -1
    Trash = 0
    Playable = 1
    Critical = 2
    # Useful card of which at least two copies are currently held in hands.
    DuplicateVisible = 3
    # Useful, non-critical card of which fewer than two copies are held in hands.
    UniqueVisible = 4
class CardState():
@ -31,8 +34,10 @@ class CardState():
return "Playable ({}) with weight {}".format(self.card, self.weight)
case CardType.Critical:
return "Critical ({})".format(self.card)
case CardType.Dispensable:
return "Dispensable ({}) with weight {}".format(self.card, self.weight)
case CardType.DuplicateVisible:
return "Useful (duplicate visible) ({}) with weight {}".format(self.card, self.weight)
case CardType.UniqueVisible:
return "Useful (unique visible) ({}) with weight {}".format(self.card, self.weight)
# TODO
@ -45,7 +50,92 @@ def card_type(game_state, card):
elif card.rank == 5 or card in game_state.trash:
return CardType.Critical
else:
return CardType.Dispensable
visible_cards = sum((game_state.hands[player] for player in range(game_state.num_players)), [])
if visible_cards.count(card) >= 2:
return CardType.DuplicateVisible
else:
return CardType.UniqueVisible
class WeightedCard:
    """A card bundled with an optional integer weight.

    :param card: the wrapped card object (stored as-is)
    :param weight: weight attached to the card, ``None`` if not assigned
    """

    def __init__(self, card, weight: Optional[int] = None):
        self.weight = weight
        self.card = card

    def __repr__(self):
        # Rendered as "<card> with weight <weight>"
        return f"{self.card} with weight {self.weight}"
class HandState:
def __init__(self, player: int, game_state: GameState):
self.trash = []
self.playable = []
self.critical = []
self.dupes = []
self.uniques = []
for card in game_state.hands[player]:
match card_type(game_state, card):
case CardType.Trash:
self.trash.append(WeightedCard(card))
case CardType.Playable:
if card not in map(lambda c: c.card, self.playable):
self.playable.append(WeightedCard(card))
else:
self.trash.append(card)
case CardType.Critical:
self.critical.append(WeightedCard(card))
case CardType.UniqueVisible:
self.uniques.append(WeightedCard(card))
case CardType.DuplicateVisible:
copy = next((w for w in self.dupes if w.card == card), None)
if copy is not None:
self.dupes.remove(copy)
self.critical.append(copy)
self.trash.append(card)
else:
self.dupes.append(WeightedCard(card))
self.playable.sort(key=lambda c: c.card.rank)
self.dupes.sort(key=lambda c: c.card.rank)
self.uniques.sort(key=lambda c: c.card.rank)
if len(self.trash) > 0:
self.best_discard = self.trash[0]
self.discard_badness = 0
elif len(self.dupes) > 0:
self.best_discard = self.dupes[0]
self.discard_badness = 8 - game_state.num_players
elif len(self.uniques) > 0:
self.best_discard = self.uniques[-1]
self.discard_badness = 80 - 10 * self.best_discard.card.rank
elif len(self.playable) > 0:
self.best_discard = self.playable[-1]
self.discard_badness = 80 - 10 * self.best_discard.card.rank
else:
assert len(self.critical) > 0, "Programming error."
self.best_discard = self.critical[-1]
self.discard_badness = 600 - 100*self.best_discard.card.rank
def num_useful_cards(self):
return len(self.dupes) + len(self.uniques) + len(self.playable) + len(self.critical)
class CheatingStrategy:
    """Strategy skeleton operating on a shared game state.

    ``make_move`` currently only dumps diagnostic information and then
    terminates the interpreter; no action is performed on the game state.
    """

    def __init__(self, game_state: GameState):
        self.game_state = game_state

    def make_move(self):
        gs = self.game_state
        hand_states = [HandState(seat, gs) for seat in range(gs.num_players)]
        # pace reduced by one for every hand that consists of trash only
        # (computed but not used yet)
        modified_pace = gs.pace - sum(
            len(hs.trash) == gs.hand_size for hs in hand_states
        )
        # hand of the player to move (not used yet)
        cur_hand = hand_states[gs.turn]
        print([hs.__dict__ for hs in hand_states])
        print(gs.pace)
        exit(0)
class GreedyStrategy():
@ -157,7 +247,7 @@ class GreedyStrategy():
def run_deck(instance: HanabiInstance) -> GameState:
    """Play the given instance with ``CheatingStrategy`` until the game is over.

    :param instance: the instance to play
    :return: the game state after the loop has finished
    """
    gs = GameState(instance)
    # Only one strategy object is built; a second one that was immediately
    # overwritten served no purpose.
    strat = CheatingStrategy(gs)
    while not gs.is_over():
        strat.make_move()
    return gs
@ -190,3 +280,9 @@ def run_samples(num_players, sample_size):
logger.info("Won {} ({}%) and lost {} ({}%) from sample of {} test games using greedy strategy.".format(
won, round(100 * won / sample_size, 2), lost, round(100 * lost / sample_size, 2), sample_size
))
if __name__ == "__main__":
    # Run the sampler for 2 to 5 players; the sample size is taken from the
    # first command line argument. An empty line separates the runs.
    for num_players in (2, 3, 4, 5):
        run_samples(num_players, int(sys.argv[1]))
        print()

View file

@ -41,8 +41,8 @@ class HanabLiveInstance(hanabi.HanabiInstance):
class HanabLiveGameState(hanabi.GameState):
def __init__(self, instance: HanabLiveInstance):
super().__init__(instance)
def __init__(self, instance: HanabLiveInstance, starting_player: int = 0):
super().__init__(instance, starting_player)
self.instance: HanabLiveInstance = instance
def make_action(self, action):

View file

@ -103,6 +103,7 @@ class HanabiInstance:
self.fives_give_clue = fives_give_clue
self.deck_plays = deck_plays,
self.all_or_nothing = all_or_nothing
assert not self.all_or_nothing, "All or nothing not implemented"
# normalize deck indices
for (idx, card) in enumerate(self.deck):
@ -114,6 +115,8 @@ class HanabiInstance:
self.player_names = constants.PLAYER_NAMES[:self.num_players]
self.deck_size = len(self.deck)
self.initial_pace = self.deck_size - 5 * self.num_suits - self.num_players * (self.hand_size - 1)
# # maximum number of moves in any game that can achieve max score each suit gives 15 moves, as we can play
# and discard 5 cards each and give 5 clues. dark suits only give 5 moves, since no discards are added number
# of cards that remain in players hands after end of game. they cost 2 turns each, since we cannot discard
@ -143,7 +146,7 @@ class HanabiInstance:
class GameState:
def __init__(self, instance: HanabiInstance):
def __init__(self, instance: HanabiInstance, starting_player: int = 0):
# will not be modified
self.instance = instance
@ -154,9 +157,8 @@ class GameState:
self.stacks = [0 for i in range(0, self.instance.num_suits)]
self.strikes = 0
self.clues = 8
self.turn = 0
self.pace = self.instance.deck_size - 5 * self.instance.num_suits - self.instance.num_players * (
self.instance.hand_size - 1)
self.turn = starting_player
self.pace = self.instance.initial_pace
self.remaining_extra_turns = self.instance.num_players + 1
self.trash = []
@ -181,6 +183,7 @@ class GameState:
else:
self.strikes += 1
self.trash.append(self.instance.deck[card_idx])
self.pace -= 1
self.actions.append(Action(ActionType.Play, target=card_idx))
self._replace(card_idx, allow_not_present=self.instance.deck_plays and (card_idx == self.deck_size - 1))
self._make_turn()
@ -233,7 +236,7 @@ class GameState:
return self.over or self.is_known_lost()
def is_won(self):
    """Return whether the current score equals the maximum score of the instance."""
    # NOTE(review): relies on the instance exposing `max_score`; its definition
    # is not visible here. A bare `instance` name is not defined in this scope,
    # so the value has to be read through `self.instance`.
    return self.score == self.instance.max_score
def is_known_lost(self):
return self.in_lost_state
@ -273,6 +276,27 @@ class GameState:
}
}
# Query helpers for implementing bots
def copy_holders(self, card: DeckCard, exclude_player: Optional[int]):
    """List the players whose hand contains the given card.

    :param card: card to look for (compared via ``in`` against each hand)
    :param exclude_player: player index to leave out, or ``None`` to consider everyone
    :return: player indices in increasing order
    """
    holders = []
    for seat in range(self.num_players):
        if seat == exclude_player:
            continue
        if card in self.hands[seat]:
            holders.append(seat)
    return holders
@staticmethod
def in_strict_order(player_a, player_b, player_c):
"""
Check whether the three given players sit in order, where equality is not allowed
:param player_a:
:param player_b:
:param player_c:
:return:
"""
return player_a < player_b < player_c or player_b < player_c < player_a or player_c < player_a < player_b
def is_in_extra_round(self):
    """Return True once at most `num_players` extra turns remain."""
    turns_left = self.remaining_extra_turns
    return turns_left <= self.instance.num_players
# Private helpers
# increments turn counter and tracks extra round
@ -287,7 +311,7 @@ class GameState:
# replaces the specified card (has to be in current player's hand) with the next card of the deck (if nonempty)
def _replace(self, card_idx, allow_not_present: bool = False):
try:
idx_in_hand = next((i for (i, card) in enumerate(self.cur_hand) if card.deck_index == card_idx), None)
idx_in_hand = next((i for (i, card) in enumerate(self.cur_hand) if card.deck_index == card_idx))
except StopIteration:
if not allow_not_present:
raise

View file

@ -1,17 +1,24 @@
from typing import Optional
import pebble.concurrent
import concurrent.futures
import traceback
from sat import solve_sat
from database import conn
from download_data import export_game
from variants import VARIANTS, variant_name
from database.database import conn, cur
from download_data import detailed_export_game
from alive_progress import alive_bar
from compress import decompress_deck, link
import concurrent.futures
from hanabi import HanabiInstance
from threading import Lock
from time import perf_counter
from greedy_solver import GameState, GreedyStrategy
from log_setup.logger_setup import logger
from log_setup import logger
from deck_analyzer import analyze, InfeasibilityReason
from variants import Variant
MAX_PROCESSES = 6
MAX_PROCESSES=4
def update_seeds_db():
cur2 = conn.cursor()
@ -33,53 +40,47 @@ def update_seeds_db():
def get_decks_of_seeds():
cur = conn.cursor()
cur2 = conn.cursor()
cur.execute("SELECT seed FROM seeds WHERE deck is NULL")
for (seed,) in cur:
cur2.execute("SELECT id FROM games WHERE seed = (%s)", (seed,))
cur.execute("SELECT seed, variant_id FROM seeds WHERE deck is NULL")
for (seed, variant_id) in cur:
cur2.execute("SELECT id FROM games WHERE seed = (%s) LIMIT 1", (seed,))
(game_id,) = cur2.fetchone()
print("Exporting game {} for seed {}.".format(game_id, seed))
export_game(game_id)
logger.verbose("Exporting game {} for seed {}.".format(game_id, seed))
detailed_export_game(game_id, var_id=variant_id, seed_exists=True)
conn.commit()
def update_trivially_feasible_games():
cur = conn.cursor()
for var in VARIANTS:
cur.execute("SELECT seed FROM seeds WHERE variant_id = (%s) AND feasible is null", (var['id'],))
def update_trivially_feasible_games(variant_id):
variant: Variant = Variant.from_db(variant_id)
cur.execute("SELECT seed FROM seeds WHERE variant_id = (%s) AND feasible is null", (variant_id,))
seeds = cur.fetchall()
print('Checking variant {} (id {}), found {} seeds to check...'.format(var['name'], var['id'], len(seeds)))
print('Checking variant {} (id {}), found {} seeds to check...'.format(variant.name, variant_id, len(seeds)))
with alive_bar(total=len(seeds), title='{} ({})'.format(var['name'], var['id'])) as bar:
with alive_bar(total=len(seeds), title='{} ({})'.format(variant.name, variant_id)) as bar:
for (seed,) in seeds:
cur.execute("SELECT id, deck_plays, one_extra_card, one_less_card, all_or_nothing "
"FROM games WHERE score = (%s) AND seed = (%s) ORDER BY id;",
(5 * len(var['suits']), seed)
(variant.max_score, seed)
)
res = cur.fetchall()
print("Checking seed {}: {:3} results".format(seed, len(res)))
logger.debug("Checking seed {}: {:3} results".format(seed, len(res)))
for (game_id, a, b, c, d) in res:
if None in [a, b, c, d]:
print(' Game {} not found in database, exporting...'.format(game_id))
succ, valid = export_game(game_id)
if not succ:
print('Error exporting game {}.'.format(game_id))
continue
logger.debug(' Game {} not found in database, exporting...'.format(game_id))
detailed_export_game(game_id, var_id=variant_id)
else:
logger.debug(' Game {} already in database'.format(game_id, valid))
valid = not any([a, b, c, d])
print(' Game {} already in database, valid: {}'.format(game_id, valid))
if valid:
print('Seed {:10} (variant {} / {}) found to be feasible via game {:6}'.format(seed, var['id'], var['name'], game_id))
logger.verbose('Seed {:10} (variant {}) found to be feasible via game {:6}'.format(seed, variant_id, game_id))
cur.execute("UPDATE seeds SET feasible = (%s) WHERE seed = (%s)", (True, seed))
conn.commit()
break
else:
print(' Cheaty game found')
logger.verbose(' Cheaty game found')
bar()
def get_decks_for_all_seeds():
cur = conn.cursor()
cur.execute("SELECT id "
@ -103,16 +104,17 @@ def get_decks_for_all_seeds():
mutex = Lock()
def solve_instance(num_players, deck):
def solve_instance(instance: HanabiInstance):
# first, sanity check on running out of pace
result = analyze(deck, num_players)
result = analyze(instance)
if result is not None:
assert type(result) == InfeasibilityReason
logger.info("found infeasible deck")
logger.debug("found infeasible deck")
return False, None, None
for num_remaining_cards in [0, 5, 10, 20, 30]:
for num_remaining_cards in [0, 20]:
# logger.info("trying with {} remaining cards".format(num_remaining_cards))
game = GameState(num_players, deck)
game = GameState(instance)
strat = GreedyStrategy(game)
# make a number of greedy moves
@ -128,65 +130,75 @@ def solve_instance(num_players, deck):
# now, apply sat solver
if not game.is_over():
logger.info("continuing greedy sol with SAT")
logger.debug("continuing greedy sol with SAT")
solvable, sol = solve_sat(game)
if solvable:
if solvable is None:
return True, sol, num_remaining_cards
logger.info("No success with {} remaining cards, reducing number of greedy moves, failed attempt was: {}".format(num_remaining_cards, link(game.to_json())))
logger.debug(
"No success with {} remaining cards, reducing number of greedy moves, failed attempt was: {}".format(
num_remaining_cards, link(game)))
# print("Aborting trying with greedy strat")
logger.info("Starting full SAT solver")
game = GameState(num_players, deck)
logger.debug("Starting full SAT solver")
game = GameState(instance)
a, b = solve_sat(game)
return a, b, 99
return a, b, instance.draw_pile_size
def solve_seed(seed, num_players, deck_compressed, var_id):
@pebble.concurrent.process(timeout=150)
def solve_seed_with_timeout(seed, num_players, deck_compressed, var_name: Optional[str] = None):
try:
logger.verbose("Starting to solve seed {}".format(seed))
deck = decompress_deck(deck_compressed)
t0 = perf_counter()
solvable, solution, num_remaining_cards = solve_instance(num_players, deck)
solvable, solution, num_remaining_cards = solve_instance(HanabiInstance(deck, num_players))
t1 = perf_counter()
logger.info("Solved instance {} in {} seconds".format(seed, round(t1-t0, 2)))
logger.verbose("Solved instance {} in {} seconds: {}".format(seed, round(t1 - t0, 2), solvable))
mutex.acquire()
if solvable is not None:
lcur = conn.cursor()
lcur.execute("UPDATE seeds SET feasible = (%s) WHERE seed = (%s)", (solvable, seed))
cur.execute("UPDATE seeds SET feasible = (%s) WHERE seed = (%s)", (solvable, seed))
conn.commit()
mutex.release()
if solvable == True:
with open("remaining_cards.txt", "a") as f:
f.write("Success with {} cards left in draw by greedy solver on seed {}: {}\n".format(num_remaining_cards, seed ,link(solution.to_json())))
logger.verbose("Success with {} cards left in draw by greedy solver on seed {}: {}\n".format(
num_remaining_cards, seed, link(solution))
)
elif solvable == False:
logger.info("seed {} was not solvable".format(seed))
with open('infeasible_instances.txt', 'a') as f:
f.write('{}-player, seed {:10}, {}\n'.format(num_players, seed, variant_name(var_id)))
logger.debug("seed {} was not solvable".format(seed))
logger.debug('{}-player, seed {:10}, {}\n'.format(num_players, seed, var_name))
elif solvable is None:
logger.info("seed {} skipped".format(seed))
logger.verbose("seed {} skipped".format(seed))
else:
raise Exception("Programming Error")
mutex.release()
except Exception:
traceback.format_exc()
except Exception as e:
print("exception in subprocess:")
traceback.print_exc()
def solve_unknown_seeds():
cur = conn.cursor()
for var in VARIANTS:
cur.execute("SELECT seed, num_players, deck FROM seeds WHERE variant_id = (%s) AND feasible IS NULL AND deck IS NOT NULL", (var['id'],))
def solve_seed(seed, num_players, deck_compressed, var_name: Optional[str] = None):
    """Solve a seed via ``solve_seed_with_timeout`` and wait for the outcome.

    :param seed: seed identifier, used for logging
    :param num_players: number of players of the instance
    :param deck_compressed: compressed deck, forwarded unchanged
    :param var_name: optional variant name, forwarded unchanged
    :return: the result of the solver future, or ``None`` if it timed out
    """
    f = solve_seed_with_timeout(seed, num_players, deck_compressed, var_name)
    try:
        return f.result()
    # Before Python 3.11 the timeout raised by futures is
    # concurrent.futures.TimeoutError, which is not the builtin TimeoutError;
    # catch both so that a timeout is reported instead of propagating.
    except (TimeoutError, concurrent.futures.TimeoutError):
        logger.verbose("Solving on seed {} timed out".format(seed))
        return None
def solve_unknown_seeds(variant_id, variant_name: Optional[str] = None):
cur.execute("SELECT seed, num_players, deck FROM seeds WHERE variant_id = (%s) AND feasible IS NULL", (variant_id,))
res = cur.fetchall()
# for r in res:
# solve_seed(r[0], r[1], r[2], var['id'])
# solve_seed(r[0], r[1], r[2], variant_name)
with concurrent.futures.ProcessPoolExecutor(max_workers=MAX_PROCESSES) as executor:
fs = [executor.submit(solve_seed, r[0], r[1], r[2], var['id']) for r in res]
with alive_bar(len(res), title='Seed solving on {}'.format(var['name'])) as bar:
fs = [executor.submit(solve_seed, r[0], r[1], r[2], variant_name) for r in res]
with alive_bar(len(res), title='Seed solving on {}'.format(variant_name)) as bar:
for f in concurrent.futures.as_completed(fs):
bar()
break
solve_unknown_seeds()
update_trivially_feasible_games(0)
solve_unknown_seeds(0, "No Variant")

View file

@ -7,3 +7,4 @@ psycopg2
alive_progress
argparse
verboselogs
pebble

24
test.py
View file

@ -14,6 +14,28 @@ from database.database import conn, cur
from database.init_database import init_database_tables, populate_static_tables
def find_double_dark_games():
    """Print game and seed counts for every variant with exactly two dark suits.

    Variants are listed by decreasing number of games. Uses the module-level
    cursor for the variant query and a second cursor for the per-variant counts.
    """
    cur.execute("SELECT variants.id, variants.name, count(suits.id) from variants "
                "inner join variant_suits on variants.id = variant_suits.variant_id "
                "left join suits on suits.id = variant_suits.suit_id "
                "where suits.dark = (%s) "
                "group by variants.id "
                "order by count(suits.id), variants.id",
                (True,)
                )
    counter = conn.cursor()
    stats = []
    for (var_id, var_name, num_dark_suits) in cur.fetchall():
        if num_dark_suits != 2:
            continue
        counter.execute("select count(*) from games where variant_id = (%s)", (var_id,))
        num_games = counter.fetchone()[0]
        counter.execute("select count(*) from seeds where variant_id = (%s)", (var_id, ))
        num_seeds = counter.fetchone()[0]
        stats.append((var_name, num_games, num_seeds))
    # most played variants first
    stats.sort(key=lambda entry: -entry[1])
    for (name, games, seeds) in stats:
        print("{}: {} games on {} seeds".format(name, games, seeds))
def test_suits():
suit = Suit.from_db(55)
print(suit.__dict__)
@ -52,6 +74,8 @@ def export_all_seeds():
if __name__ == "__main__":
find_double_dark_games()
exit(0)
var_id = 964532
export_all_seeds()
exit(0)

View file

@ -218,6 +218,10 @@ class Variant:
return True
return suit.color_touches(self.colors[value])
@property
def max_score(self):
return self.num_suits * 5
@staticmethod
def from_db(var_id):
cur.execute(