rework analysis of upper bounds: compute all bounds now, insert into DB properly

This commit is contained in:
Maximilian Keßler 2023-07-08 09:48:22 +02:00
parent 91f3c73eb3
commit 29cae8f139
Signed by: max
GPG Key ID: BCC5A619923C0BA5

View File

@ -1,7 +1,8 @@
from hanabi.live import compress
from enum import Enum from enum import Enum
from typing import List
from hanabi import database from hanabi import database
from hanabi import logger
from hanabi import hanab_game from hanabi import hanab_game
from hanabi.live import compress from hanabi.live import compress
@ -9,107 +10,85 @@ from hanabi.live import compress
class InfeasibilityType(Enum): class InfeasibilityType(Enum):
OutOfPace = 0 # idx denotes index of last card drawn before being forced to reduce pace, value denotes how bad pace is OutOfPace = 0 # idx denotes index of last card drawn before being forced to reduce pace, value denotes how bad pace is
OutOfHandSize = 1 # idx denotes index of last card drawn before being forced to discard a crit OutOfHandSize = 1 # idx denotes index of last card drawn before being forced to discard a crit
NotTrivial = 2
CritAtBottom = 3 CritAtBottom = 3
class InfeasibilityReason(): class InfeasibilityReason:
def __init__(self, infeasibility_type, idx, value=None): def __init__(self, infeasibility_type: InfeasibilityType, score_upper_bound, value=None):
self.type = infeasibility_type self.type = infeasibility_type
self.index = idx self.score_upper_bound = score_upper_bound
self.value = value self.value = value
def __repr__(self): def __repr__(self):
match self.type: match self.type:
case InfeasibilityType.OutOfPace: case InfeasibilityType.OutOfPace:
return "Deck runs out of pace ({}) after drawing card {}".format(self.value, self.index) return "Upper bound {}, since deck runs out of pace after drawing card {}".format(self.score_upper_bound, self.value)
case InfeasibilityType.OutOfHandSize: case InfeasibilityType.OutOfHandSize:
return "Deck runs out of hand size after drawing card {}".format(self.index) return "Upper bound {}, since deck runs out of hand size after drawing card {}".format(self.score_upper_bound, self.value)
case InfeasibilityType.CritAtBottom: case InfeasibilityType.CritAtBottom:
return "Deck has crit non-5 at bottom (index {})".format(self.index) return "Upper bound {}, sicne deck has critical non-5 at bottom".format(self.score_upper_bound)
def analyze_suit(occurrences): def analyze(instance: hanab_game.HanabiInstance, only_find_first=False) -> List[InfeasibilityReason]:
# denotes the indexes of copies we can use wlog """
picks = { Checks instance for the following (easy) certificates for unfeasibility
1: 0, - There is a critical non-5 at the bottom
**{r: None for r in range(2, 5)}, - We necessarily run out of pace when playing this deck:
5: 0 At some point, among all drawn cards, there are too few playable ones so that the next discard
} reduces pace to a negative amount
- We run out of hand size when playing this deck:
At some point, there are too many critical cards (that also could not have been played) for the players
to hold collectively
:param instance: Instance to be analyzed
:param only_find_first: If true, we immediately return when finding the first infeasibility reason and don't
check for further ones. Might be slightly faster on some instances, especially dark ones.
:return: List with all reasons found. Empty if none is found.
In particular, if return value is not the empty list, the analyzed instance is unfeasible
"""
reasons = []
# denotes the intervals when cards will be played wlog # check for critical non-fives at bottom of the deck
play_times = { bottom_card = instance.deck[-1]
1: [occurrences[1][0]], if bottom_card.rank != 5 and bottom_card.suitIndex in instance.dark_suits:
**{r: None for _ in range(instance.num_suits) reasons.append(InfeasibilityReason(
for r in range(2, 6) InfeasibilityType.CritAtBottom,
} instance.max_score - 5 + bottom_card.rank,
} instance.deck_size - 1
))
if only_find_first:
return reasons
print("occurrences are: {}".format(occurrences)) # we will sweep through the deck and pretend that
# - we keep all non-trash cards in our hands
for rank in range(2, 6): # - we instantly play all playable cards as soon as we have them
# - we recurse on this instant-play
# general analysis #
earliest_play = max(min(play_times[rank - 1]), min(occurrences[rank])) # For example, we assume that once we draw r2, we check if we can play r2.
latest_play = max(*play_times[rank - 1], *occurrences[rank]) # If yes, then we also check if we drew r3 earlier and so on.
play_times[rank] = [earliest_play, latest_play] # If not, then we keep r2 in our hands
#
# check a few extra cases regarding the picks when the rank is not 5 # In total, this is equivalent to assuming that we infinitely many clues
if rank != 5: # and infinite storage space in our hands (which is of course not true),
# check if we can just play the first copy # but even in this setting, some games are infeasible due to pace issues
if max(play_times[rank - 1]) < min(occurrences[rank]): # that we can detect
picks[rank] = 0 #
play_times[rank] = [min(occurrences[rank])] # A small refinement is to pretend that we only have infinite storage for non-crit cards,
continue # for crit-cards, the usual hand card limit applies.
# This allows us to detect some seeds where there are simply too many unplayable cards to hold at some point
# check if the second copy is not worse than the first when it comes, # that also can't be discarded
# because we either have to wait for smaller cards anyway
# or the next card is not there anyway
if max(occurrences[rank]) < max(earliest_play, min(occurrences[rank + 1])):
picks[rank] = 1
return picks, play_times
def analyze_card_usage(instance: hanab_game.HanabiInstance):
storage_size = instance.num_players * instance.hand_size
for suit in range(instance.num_suits):
print("analysing suit {}: {}".format(
suit,
hanab_game.pp_deck((c for c in instance.deck if c.suitIndex == suit))
)
)
occurrences = {
rank: [max(0, i - storage_size + 1) for (i, card) in enumerate(instance.deck) if
card == hanab_game.DeckCard(suit, rank)]
for rank in range(1, 6)
}
picks, play_times = analyze_suit(occurrences)
print("did analysis:")
print("play times: ", play_times)
print("picks: ", picks)
print()
def analyze(instance: hanab_game.HanabiInstance, find_non_trivial=False) -> InfeasibilityReason | None:
if instance.deck[-1].rank != 5 and instance.deck[-1].suitIndex + instance.num_dark_suits >= instance.num_suits:
return InfeasibilityReason(InfeasibilityType.CritAtBottom, instance.deck_size - 1)
# we will sweep through the deck and pretend that we instantly play all cards
# as soon as we have them (and recurse this)
# this allows us to detect standard pace issue arguments # this allows us to detect standard pace issue arguments
stacks = [0] * instance.num_suits stacks = [0] * instance.num_suits
# we will ensure that stored_crits is a subset of stored_cards
stored_cards = set() stored_cards = set()
stored_crits = set() stored_crits = set()
min_forced_pace = 100 min_forced_pace = instance.initial_pace
worst_index = 0 worst_pace_index = 0
ret = None max_forced_crit_discard = 0
worst_crit_index = 0
for (i, card) in enumerate(instance.deck): for (i, card) in enumerate(instance.deck):
if card.rank == stacks[card.suitIndex] + 1: if card.rank == stacks[card.suitIndex] + 1:
@ -133,68 +112,84 @@ def analyze(instance: hanab_game.HanabiInstance, find_non_trivial=False) -> Infe
stored_crits.add(card) stored_crits.add(card)
stored_cards.add(card) stored_cards.add(card)
# check for out of handsize: # check for out of handsize (this number can be negative, in which case nothing applies)
if len(stored_crits) == instance.num_players * instance.hand_size: # Note the +1 at the end, which is there because we have to discard next,
return InfeasibilityReason(InfeasibilityType.OutOfHandSize, i) # so even if we currently have as many crits as we can hold, we have to discard one
num_forced_crit_discards = len(stored_crits) - instance.num_players * instance.hand_size + 1
if find_non_trivial and len(stored_cards) == instance.num_players * instance.hand_size: if len(stored_crits) - instance.num_players * instance.hand_size > max_forced_crit_discard:
ret = InfeasibilityReason(InfeasibilityType.NotTrivial, i) worst_crit_index = i
max_forced_crit_discard = num_forced_crit_discards
if only_find_first:
reasons.append(InfeasibilityReason(
InfeasibilityType.OutOfPace,
instance.max_score + min_forced_pace,
worst_pace_index
))
return reasons
# the last - 1 is there because we have to discard 'next', causing a further draw # the last - 1 is there because we have to discard 'next', causing a further draw
max_remaining_plays = (instance.deck_size - i - 1) + instance.num_players - 1 max_remaining_plays = (instance.deck_size - i - 1) + instance.num_players - 1
needed_plays = instance.max_score - sum(stacks)
needed_plays = 5 * instance.num_suits - sum(stacks) cur_pace = max_remaining_plays - needed_plays
missing = max_remaining_plays - needed_plays if cur_pace < min(0, min_forced_pace):
if missing < min_forced_pace: min_forced_pace = cur_pace
# print("update to {}: {}".format(i, missing)) worst_pace_index = i
min_forced_pace = missing if only_find_first:
worst_index = i reasons.append(InfeasibilityReason(
InfeasibilityType.OutOfPace,
instance.max_score + min_forced_pace,
worst_pace_index
))
return reasons
# check that we correctly walked through the deck # check that we correctly walked through the deck
assert (len(stored_cards) == 0) assert (len(stored_cards) == 0)
assert (len(stored_crits) == 0) assert (len(stored_crits) == 0)
assert (sum(stacks) == 5 * instance.num_suits) assert (sum(stacks) == instance.max_score)
if max_forced_crit_discard > 0:
reasons.append(
InfeasibilityReason(
InfeasibilityType.OutOfHandSize,
instance.max_score - max_forced_crit_discard,
worst_crit_index
)
)
if min_forced_pace < 0: if min_forced_pace < 0:
return InfeasibilityReason(InfeasibilityType.OutOfPace, worst_index, min_forced_pace) reasons.append(InfeasibilityReason(
elif ret is not None: InfeasibilityType.OutOfPace,
return ret instance.max_score + min_forced_pace,
else: worst_pace_index
return None
def run_on_database():
cur = database.conn.cursor()
cur2 = database.conn.cursor()
for num_p in range(2, 6):
cur.execute(
"SELECT seed, num_players, deck from seeds where variant_id = 0 and num_players = (%s) order by seed asc",
(num_p,))
res = cur.fetchall()
hand = 0
pace = 0
non_trivial = 0
d = None
print("Checking {} {}-player seeds from database".format(len(res), num_p))
for (seed, num_players, deck) in res:
deck = compress.decompress_deck(deck)
a = analyze(hanab_game.HanabiInstance(deck, num_players), True)
if type(a) == InfeasibilityReason:
if a.type == InfeasibilityType.OutOfHandSize:
# print("Seed {} infeasible: {}\n{}".format(seed, a, deck))
hand += 1
elif a.type == InfeasibilityType.OutOfPace:
pace += 1
elif a.type == InfeasibilityType.NotTrivial:
non_trivial += 1
d = seed, deck
print("Found {} seeds running out of hand size, {} running out of pace and {} that are not trivial".format(hand,
pace,
non_trivial))
if d is not None:
print("example non-trivial deck (seed {}): [{}]".format(
d[0],
", ".join(c.colorize() for c in d[1])
)) ))
print()
return reasons
def run_on_database(variant_id):
database.cur.execute(
"SELECT seed, num_players, deck FROM seeds WHERE variant_id = (%s) ORDER BY (num_players, seed)",
(variant_id,)
)
res = database.cur.fetchall()
logger.info("Checking {} seeds of variant {} for infeasibility".format(len(res), variant_id))
for (seed, num_players, deck_str) in res:
deck = compress.decompress_deck(deck_str)
reasons = analyze(hanab_game.HanabiInstance(deck, num_players))
if reasons:
print("found infeasible seed {}: {}".format(seed, reasons))
else:
print("found nothing for seed {}".format(seed))
for reason in reasons:
database.cur.execute(
"INSERT INTO score_upper_bounds (seed, score_upper_bound, reason) "
"VALUES (%s,%s,%s) "
"ON CONFLICT (seed, reason) DO UPDATE "
"SET score_upper_bound = EXCLUDED.score_upper_bound",
(seed, reason.score_upper_bound, reason.type.value)
)
database.cur.execute(
"UPDATE seeds SET feasible = (%s) WHERE seed = (%s)",
(False, seed)
)
database.conn.commit()