refactor: proper classes for hanabi instances and games
This commit is contained in:
parent
c43b0fc475
commit
b0d0aaae1b
3 changed files with 338 additions and 9 deletions
33
constants.py
Normal file
33
constants.py
Normal file
|
@ -0,0 +1,33 @@
|
||||||
|
# constants.py
|
||||||
|
|
||||||
|
|
||||||
|
# some values shared by all (default) hanabi instances
|
||||||
|
HAND_SIZES = {2: 5, 3: 5, 4: 4, 5: 4, 6: 3}
|
||||||
|
NUM_STRIKES = 3
|
||||||
|
COLOR_INITIALS = 'rygbp'
|
||||||
|
PLAYER_NAMES = ["Alice", "Bob", "Cathy", "Donald", "Emily", "Frank"]
|
||||||
|
|
||||||
|
|
||||||
|
#### hanab.live stuff
|
||||||
|
|
||||||
|
# Id of no variant
|
||||||
|
NO_VARIANT_ID = 0
|
||||||
|
|
||||||
|
# a map (num_suits, num_dark_suits) -> variant id of a variant on hanab.live fitting that distribution
|
||||||
|
VARIANT_IDS_STANDARD_DISTRIBUTIONS = {
|
||||||
|
3: {
|
||||||
|
0: 18 # 3 Suits
|
||||||
|
},
|
||||||
|
4: {
|
||||||
|
0: 15 # 4 Suits
|
||||||
|
},
|
||||||
|
5: {
|
||||||
|
0: 0, # No Variant
|
||||||
|
1: 21 # Black (5 Suits)
|
||||||
|
},
|
||||||
|
6: {
|
||||||
|
0: 1, # 6 Suits
|
||||||
|
1: 2, # Black (6 Suits)
|
||||||
|
2: 60, # Black & Gray
|
||||||
|
}
|
||||||
|
}
|
280
hanabi.py
Normal file
280
hanabi.py
Normal file
|
@ -0,0 +1,280 @@
|
||||||
|
from typing import Optional, List
|
||||||
|
from enum import Enum
|
||||||
|
|
||||||
|
import constants
|
||||||
|
|
||||||
|
|
||||||
|
class DeckCard():
|
||||||
|
def __init__(self, suitIndex: int, rank: int, deck_index=None):
|
||||||
|
self.suitIndex: int = suitIndex
|
||||||
|
self.rank: int = rank
|
||||||
|
self.deck_index: Optional[int] = deck_index
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def from_json(deck_card):
|
||||||
|
return DeckCard(**deck_card)
|
||||||
|
|
||||||
|
def colorize(self):
|
||||||
|
color = ["green", "blue", "magenta", "yellow", "white", "cyan"][self.suitIndex]
|
||||||
|
return colored(str(self), color)
|
||||||
|
|
||||||
|
def __eq__(self, other):
|
||||||
|
return self.suitIndex == other.suitIndex and self.rank == other.rank
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return constants.COLOR_INITIALS[self.suitIndex] + str(self.rank)
|
||||||
|
|
||||||
|
def __hash__(self):
|
||||||
|
# should be injective enough, we never use cards with ranks differing by 1000
|
||||||
|
return 1000 * self.suitIndex + self.rank
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
class ActionType(Enum):
|
||||||
|
Play = 0
|
||||||
|
Discard = 1
|
||||||
|
ColorClue = 2
|
||||||
|
RankClue = 3
|
||||||
|
EndGame = 4
|
||||||
|
VoteTerminate = 5 ## hack: online, this is encoded as a 10
|
||||||
|
|
||||||
|
|
||||||
|
class Action():
|
||||||
|
def __init__(self, type_: ActionType, target: int, value: Optional[int] = None):
|
||||||
|
self.type = type_
|
||||||
|
self.target = target
|
||||||
|
self.value = value
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def from_json(action):
|
||||||
|
return Action(
|
||||||
|
ActionType(action['type']),
|
||||||
|
int(action['target']),
|
||||||
|
action.get('value', None)
|
||||||
|
)
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
match self.type:
|
||||||
|
case ActionType.Play:
|
||||||
|
return "Play card {}".format(self.target)
|
||||||
|
case ActionType.Discard:
|
||||||
|
return "Discard card {}".format(self.target)
|
||||||
|
case ActionType.ColorClue:
|
||||||
|
return "Clue color {} to player {}".format(self.value, self.target)
|
||||||
|
case ActionType.RankClue:
|
||||||
|
return "Clue rank {} to player {}".format(self.value, self.target)
|
||||||
|
case ActionType.EndGame:
|
||||||
|
return "Player {} ends the game (code {})".format(self.target, self.value)
|
||||||
|
case ActionType.VoteTerminate:
|
||||||
|
return "Players vote to terminate the game (code {})".format(self.value)
|
||||||
|
return "Undefined action"
|
||||||
|
|
||||||
|
|
||||||
|
class HanabiInstance():
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
deck: List[DeckCard], # assumes a default deck, every suit has to be distributed either [1,1,1,2,2,3,3,4,4,5] or [1,2,3,4,5]
|
||||||
|
num_players: int, # number of players that play this deck, in range [2,6]
|
||||||
|
hand_size: Optional[int] = None, # number of cards that each player holds
|
||||||
|
num_strikes: Optional[int] = None, # number of strikes that leads to game loss
|
||||||
|
variant_id: Optional[int] = None # optional: variant id of hanab.live, useful if instance gets exported to be viewed in browser
|
||||||
|
):
|
||||||
|
assert(2 <= num_players <= 6)
|
||||||
|
|
||||||
|
# defining properties
|
||||||
|
self.deck = deck
|
||||||
|
self.num_players = num_players
|
||||||
|
self.hand_size = hand_size or constants.HAND_SIZES[self.num_players]
|
||||||
|
self.num_strikes = num_strikes or constants.NUM_STRIKES
|
||||||
|
|
||||||
|
# normalize deck indices
|
||||||
|
for (idx, card) in enumerate(self.deck):
|
||||||
|
card.deck_index = idx
|
||||||
|
|
||||||
|
# deducable properties, to be calculated once
|
||||||
|
self.num_suits = max(map(lambda c: c.suitIndex, deck)) + 1
|
||||||
|
self.num_dark_suits = (len(deck) - 10 * self.num_suits) // (-5)
|
||||||
|
self.player_names = constants.PLAYER_NAMES[:self.num_players]
|
||||||
|
self.deck_size = len(self.deck)
|
||||||
|
|
||||||
|
## maximum number of moves in any game that can achieve max score
|
||||||
|
# each suit gives 15 moves, as we can play and discard 5 cards each and give 5 clues. dark suits only give 5 moves, since no discards are added
|
||||||
|
# number of cards that remain in players hands after end of game. they cost 2 turns each, since we cannot discard them and also have one clue less
|
||||||
|
# 8 clues at beginning, one further clue for each suit but one (the clue of the last 5 is never useful since it is gained in the extra-round)
|
||||||
|
# subtract a further move for a second 5-clue that can't be used in 5 or 6-player games, since the extraround starts too soon
|
||||||
|
self.max_winning_moves = 15 * self.num_suits - 10 * self.num_dark_suits \
|
||||||
|
- 2 * self.num_players * (self.hand_size - 1) \
|
||||||
|
+ 8 + (self.num_suits - 1) \
|
||||||
|
+ (-1 if self.num_players >= 5 else 0)
|
||||||
|
|
||||||
|
# TODO: set a meaningful default here for export?
|
||||||
|
self._variant_id: Optional[int] = variant_id
|
||||||
|
|
||||||
|
@property
|
||||||
|
def num_dealt_cards(self):
|
||||||
|
return self.num_players * self.hand_size
|
||||||
|
|
||||||
|
@property
|
||||||
|
def draw_pile_size(self):
|
||||||
|
return self.deck_size - self.num_dealt_cards
|
||||||
|
|
||||||
|
@property
|
||||||
|
def variant_id(self):
|
||||||
|
if self._variant_id is not None:
|
||||||
|
return self._variant_id
|
||||||
|
else:
|
||||||
|
# ensure no key error can happen
|
||||||
|
assert(self.is_standard())
|
||||||
|
return constants.VARIANT_IDS_STANDARD_DISTRIBUTIONS[self.num_suits][self.num_dark_suits]
|
||||||
|
|
||||||
|
# returns True if the instance has values matching hanabi-live rules
|
||||||
|
# (i.e. standard + extra variants with 5 / 6 suits)
|
||||||
|
def is_standard(self):
|
||||||
|
return all([
|
||||||
|
2 <= self.num_players <= 6,
|
||||||
|
self.hand_size == constants.HAND_SIZES[self.num_players],
|
||||||
|
self.num_strikes == constants.NUM_STRIKES,
|
||||||
|
3 <= self.num_suits <= 6,
|
||||||
|
0 <= self.num_dark_suits <= 2,
|
||||||
|
4 <= self.num_suits - self.num_dark_suits or self.num_suits == 3
|
||||||
|
# TODO: check that variant id matches deck distribution
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
class GameState():
|
||||||
|
def __init__(self, instance: HanabiInstance):
|
||||||
|
# will not be modified
|
||||||
|
self.instance = instance
|
||||||
|
|
||||||
|
# dynamic game state
|
||||||
|
self.progress = self.instance.num_players * self.instance.hand_size # index of next card to be drawn
|
||||||
|
self.hands = [self.instance.deck[self.instance.hand_size * p : self.instance.hand_size * (p+1)] for p in range(0, self.instance.num_players)]
|
||||||
|
self.stacks = [0 for i in range(0, self.instance.num_suits)]
|
||||||
|
self.strikes = 0
|
||||||
|
self.clues = 8
|
||||||
|
self.turn = 0
|
||||||
|
self.pace = self.instance.deck_size - 5 * self.instance.num_suits - self.instance.num_players * (self.instance.hand_size - 1)
|
||||||
|
self.remaining_extra_turns = self.instance.num_players + 1
|
||||||
|
self.trash = []
|
||||||
|
|
||||||
|
# can be set to true if game is known to be in a lost state
|
||||||
|
self.in_lost_state = False
|
||||||
|
|
||||||
|
# will track replay as game progresses
|
||||||
|
self.actions = []
|
||||||
|
|
||||||
|
|
||||||
|
## Methods to control game state change
|
||||||
|
|
||||||
|
def make_action(self, Action):
|
||||||
|
match Action.ActionType:
|
||||||
|
case ActionType.clue:
|
||||||
|
self.clue()
|
||||||
|
case ActionType.Play:
|
||||||
|
self.play(action.target)
|
||||||
|
|
||||||
|
def play(self, card_idx):
|
||||||
|
card = self.instance.deck[card_idx]
|
||||||
|
if card.rank == self.stacks[card.suitIndex] + 1:
|
||||||
|
self.stacks[card.suitIndex] += 1
|
||||||
|
if card.rank == 5 and self.clues != 8:
|
||||||
|
self.clues += 1
|
||||||
|
else:
|
||||||
|
self.strikes += 1
|
||||||
|
assert (self.strikes < instance.num_strikes)
|
||||||
|
self.trash.append(instance.deck[card_idx])
|
||||||
|
self.actions.append(Action(ActionType.Play, target=card_idx))
|
||||||
|
self.__replace(card_idx)
|
||||||
|
self.__make_turn()
|
||||||
|
|
||||||
|
def discard(self, card_idx):
|
||||||
|
assert(self.clues < 8)
|
||||||
|
self.actions.append(Action(ActionType.Discard, target=card_idx))
|
||||||
|
self.clues += 1
|
||||||
|
self.pace -= 1
|
||||||
|
self.trash.append(instance.deck[card_idx])
|
||||||
|
self.__replace(card_idx)
|
||||||
|
self.__make_turn()
|
||||||
|
|
||||||
|
def clue(self):
|
||||||
|
assert(self.clues > 0)
|
||||||
|
self.actions.append(
|
||||||
|
Action(
|
||||||
|
ActionType.RankClue,
|
||||||
|
target=(self.turn +1) % self.instance.num_players, # clue next plyaer
|
||||||
|
value=self.hands[(self.turn +1) % self.instance.num_players][0].rank # clue index 0
|
||||||
|
)
|
||||||
|
)
|
||||||
|
self.clues -= 1
|
||||||
|
self.__make_turn()
|
||||||
|
|
||||||
|
|
||||||
|
# Properties of GameState
|
||||||
|
|
||||||
|
def is_over(self):
|
||||||
|
return all(s == 5 for s in self.stacks) or (self.remaining_extra_turns == 0) or (self.is_known_lost())
|
||||||
|
|
||||||
|
def is_won(self):
|
||||||
|
return self.score == 5 * instance.num_suits
|
||||||
|
|
||||||
|
def is_known_lost(self):
|
||||||
|
return self.in_lost_state
|
||||||
|
|
||||||
|
@property
|
||||||
|
def score(self):
|
||||||
|
return sum(self.stacks)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def cur_hand(self):
|
||||||
|
return self.hands[self.turn]
|
||||||
|
|
||||||
|
|
||||||
|
# Utilities
|
||||||
|
|
||||||
|
def holding_players(self, card):
|
||||||
|
for (player, hand) in enumerate(self.hands):
|
||||||
|
if card in hand:
|
||||||
|
yield player
|
||||||
|
|
||||||
|
|
||||||
|
def to_json(self):
|
||||||
|
# ensure we have at least one action
|
||||||
|
if len(self.actions) == 0:
|
||||||
|
self.actions.append(Action(
|
||||||
|
ActionType.EndGame,
|
||||||
|
target=0
|
||||||
|
)
|
||||||
|
)
|
||||||
|
return {
|
||||||
|
"deck": instance.deck,
|
||||||
|
"players": instance.players,
|
||||||
|
"actions": self.actions,
|
||||||
|
"first_player": 0,
|
||||||
|
"options": {
|
||||||
|
"variant": "No Variant",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
# Private helpers
|
||||||
|
|
||||||
|
# increments turn counter and tracks extra round
|
||||||
|
def __make_turn(self):
|
||||||
|
assert(self.remaining_extra_turns > 0)
|
||||||
|
self.turn = (self.turn + 1) % self.instance.num_players
|
||||||
|
if self.progress == self.instance.deck_size:
|
||||||
|
self.remaining_extra_turns -= 1
|
||||||
|
|
||||||
|
# replaces the specified card (has to be in current player's hand) with the next card of the deck (if nonempty)
|
||||||
|
def __replace(self, card_idx):
|
||||||
|
idx_in_hand = next((i for (i, card) in enumerate(self.cur_hand) if card.deck_index == card_idx), None)
|
||||||
|
|
||||||
|
assert(idx_in_hand is not None)
|
||||||
|
|
||||||
|
for i in range(idx_in_hand, self.instance.hand_size - 1):
|
||||||
|
self.cur_hand[i] = self.cur_hand[i + 1]
|
||||||
|
if self.progress < self.instance.deck_size:
|
||||||
|
self.cur_hand[self.instance.hand_size - 1] = self.instance.deck[self.progress]
|
||||||
|
self.progress += 1
|
||||||
|
|
|
@ -11,6 +11,7 @@ from threading import Lock
|
||||||
from time import sleep, perf_counter
|
from time import sleep, perf_counter
|
||||||
from greedy_solver import GameState, GreedyStrategy
|
from greedy_solver import GameState, GreedyStrategy
|
||||||
from logger_setup import logger
|
from logger_setup import logger
|
||||||
|
from deck_analyzer import analyze, InfeasibilityReason, InfeasibilityType
|
||||||
|
|
||||||
|
|
||||||
MAX_PROCESSES=4
|
MAX_PROCESSES=4
|
||||||
|
@ -106,8 +107,14 @@ def get_decks_for_all_seeds():
|
||||||
mutex = Lock()
|
mutex = Lock()
|
||||||
|
|
||||||
def solve_instance(num_players, deck):
|
def solve_instance(num_players, deck):
|
||||||
|
# first, sanity check on running out of pace
|
||||||
|
result = analyze(deck, num_players)
|
||||||
|
if result is not None:
|
||||||
|
assert type(result) == InfeasibilityReason
|
||||||
|
logger.info("found infeasible deck")
|
||||||
|
return False, None, None
|
||||||
for num_remaining_cards in [0, 5, 10, 20, 30]:
|
for num_remaining_cards in [0, 5, 10, 20, 30]:
|
||||||
# print("trying with {} greedy moves".format(num_greedy_moves))
|
# logger.info("trying with {} remaining cards".format(num_remaining_cards))
|
||||||
game = GameState(num_players, deck)
|
game = GameState(num_players, deck)
|
||||||
strat = GreedyStrategy(game)
|
strat = GreedyStrategy(game)
|
||||||
|
|
||||||
|
@ -124,10 +131,11 @@ def solve_instance(num_players, deck):
|
||||||
|
|
||||||
# now, apply sat solver
|
# now, apply sat solver
|
||||||
if not game.is_over():
|
if not game.is_over():
|
||||||
|
logger.info("continuing greedy sol with SAT")
|
||||||
solvable, sol = solve_sat(game)
|
solvable, sol = solve_sat(game)
|
||||||
if solvable:
|
if solvable:
|
||||||
return solvable, sol, num_remaining_cards
|
return True, sol, num_remaining_cards
|
||||||
logger.info("No success, reducing number of greedy moves, failed attempt was: {}".format(link(game.to_json())))
|
logger.info("No success with {} remaining cards, reducing number of greedy moves, failed attempt was: {}".format(num_remaining_cards, link(game.to_json())))
|
||||||
# print("Aborting trying with greedy strat")
|
# print("Aborting trying with greedy strat")
|
||||||
logger.info("Starting full SAT solver")
|
logger.info("Starting full SAT solver")
|
||||||
game = GameState(num_players, deck)
|
game = GameState(num_players, deck)
|
||||||
|
@ -143,17 +151,22 @@ def solve_seed(seed, num_players, deck_compressed, var_id):
|
||||||
logger.info("Solved instance {} in {} seconds".format(seed, round(t1-t0, 2)))
|
logger.info("Solved instance {} in {} seconds".format(seed, round(t1-t0, 2)))
|
||||||
|
|
||||||
mutex.acquire()
|
mutex.acquire()
|
||||||
lcur = conn.cursor()
|
if solvable is not None:
|
||||||
lcur.execute("UPDATE seeds SET feasible = (%s) WHERE seed = (%s)", (solvable, seed))
|
lcur = conn.cursor()
|
||||||
conn.commit()
|
lcur.execute("UPDATE seeds SET feasible = (%s) WHERE seed = (%s)", (solvable, seed))
|
||||||
if solvable:
|
conn.commit()
|
||||||
|
|
||||||
|
if solvable == True:
|
||||||
with open("remaining_cards.txt", "a") as f:
|
with open("remaining_cards.txt", "a") as f:
|
||||||
f.write("Success with {} cards left in draw by greedy solver on seed {}: {}\n".format(num_remaining_cards, seed ,link(solution.to_json())))
|
f.write("Success with {} cards left in draw by greedy solver on seed {}: {}\n".format(num_remaining_cards, seed ,link(solution.to_json())))
|
||||||
|
elif solvable == False:
|
||||||
if not solvable:
|
|
||||||
logger.info("seed {} was not solvable".format(seed))
|
logger.info("seed {} was not solvable".format(seed))
|
||||||
with open('infeasible_instances.txt', 'a') as f:
|
with open('infeasible_instances.txt', 'a') as f:
|
||||||
f.write('{}-player, seed {:10}, {}\n'.format(num_players, seed, variant_name(var_id)))
|
f.write('{}-player, seed {:10}, {}\n'.format(num_players, seed, variant_name(var_id)))
|
||||||
|
elif solvable is None:
|
||||||
|
logger.info("seed {} skipped".format(seed))
|
||||||
|
else:
|
||||||
|
raise Exception("Programming Error")
|
||||||
|
|
||||||
mutex.release()
|
mutex.release()
|
||||||
|
|
||||||
|
@ -164,6 +177,9 @@ def solve_unknown_seeds():
|
||||||
cur.execute("SELECT seed, num_players, deck FROM seeds WHERE variant_id = (%s) AND feasible IS NULL AND deck IS NOT NULL", (var['id'],))
|
cur.execute("SELECT seed, num_players, deck FROM seeds WHERE variant_id = (%s) AND feasible IS NULL AND deck IS NOT NULL", (var['id'],))
|
||||||
res = cur.fetchall()
|
res = cur.fetchall()
|
||||||
|
|
||||||
|
# for r in res:
|
||||||
|
# solve_seed(r[0], r[1], r[2], var['id'])
|
||||||
|
|
||||||
with concurrent.futures.ProcessPoolExecutor(max_workers=MAX_PROCESSES) as executor:
|
with concurrent.futures.ProcessPoolExecutor(max_workers=MAX_PROCESSES) as executor:
|
||||||
fs = [executor.submit(solve_seed, r[0], r[1], r[2], var['id']) for r in res]
|
fs = [executor.submit(solve_seed, r[0], r[1], r[2], var['id']) for r in res]
|
||||||
with alive_bar(len(res), title='Seed solving on {}'.format(var['name'])) as bar:
|
with alive_bar(len(res), title='Seed solving on {}'.format(var['name'])) as bar:
|
||||||
|
|
Loading…
Reference in a new issue