From 29cae8f139726c4e62e55130468573455180a7b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maximilian=20Ke=C3=9Fler?= Date: Sat, 8 Jul 2023 09:48:22 +0200 Subject: [PATCH] rework analysis of upper bounds: compute all bounds now, insert into DB properly --- hanabi/solvers/deck_analyzer.py | 263 ++++++++++++++++---------------- 1 file changed, 129 insertions(+), 134 deletions(-) diff --git a/hanabi/solvers/deck_analyzer.py b/hanabi/solvers/deck_analyzer.py index 7994c32..4a94a09 100644 --- a/hanabi/solvers/deck_analyzer.py +++ b/hanabi/solvers/deck_analyzer.py @@ -1,7 +1,8 @@ -from hanabi.live import compress from enum import Enum +from typing import List from hanabi import database +from hanabi import logger from hanabi import hanab_game from hanabi.live import compress @@ -9,107 +10,85 @@ from hanabi.live import compress class InfeasibilityType(Enum): OutOfPace = 0 # idx denotes index of last card drawn before being forced to reduce pace, value denotes how bad pace is OutOfHandSize = 1 # idx denotes index of last card drawn before being forced to discard a crit - NotTrivial = 2 CritAtBottom = 3 -class InfeasibilityReason(): - def __init__(self, infeasibility_type, idx, value=None): +class InfeasibilityReason: + def __init__(self, infeasibility_type: InfeasibilityType, score_upper_bound, value=None): self.type = infeasibility_type - self.index = idx + self.score_upper_bound = score_upper_bound self.value = value def __repr__(self): match self.type: case InfeasibilityType.OutOfPace: - return "Deck runs out of pace ({}) after drawing card {}".format(self.value, self.index) + return "Upper bound {}, since deck runs out of pace after drawing card {}".format(self.score_upper_bound, self.value) case InfeasibilityType.OutOfHandSize: - return "Deck runs out of hand size after drawing card {}".format(self.index) + return "Upper bound {}, since deck runs out of hand size after drawing card {}".format(self.score_upper_bound, self.value) case InfeasibilityType.CritAtBottom: - return "Deck has crit non-5 at bottom (index {})".format(self.index) + return "Upper bound {}, sicne deck has critical non-5 at bottom".format(self.score_upper_bound) -def analyze_suit(occurrences): - # denotes the indexes of copies we can use wlog - picks = { - 1: 0, - **{r: None for r in range(2, 5)}, - 5: 0 - } +def analyze(instance: hanab_game.HanabiInstance, only_find_first=False) -> List[InfeasibilityReason]: + """ + Checks instance for the following (easy) certificates for unfeasibility + - There is a critical non-5 at the bottom + - We necessarily run out of pace when playing this deck: + At some point, among all drawn cards, there are too few playable ones so that the next discard + reduces pace to a negative amount + - We run out of hand size when playing this deck: + At some point, there are too many critical cards (that also could not have been played) for the players + to hold collectively + :param instance: Instance to be analyzed + :param only_find_first: If true, we immediately return when finding the first infeasibility reason and don't + check for further ones. Might be slightly faster on some instances, especially dark ones. + :return: List with all reasons found. Empty if none is found. + In particular, if return value is not the empty list, the analyzed instance is unfeasible + """ + reasons = [] - # denotes the intervals when cards will be played wlog - play_times = { - 1: [occurrences[1][0]], - **{r: None for _ in range(instance.num_suits) - for r in range(2, 6) - } - } + # check for critical non-fives at bottom of the deck + bottom_card = instance.deck[-1] + if bottom_card.rank != 5 and bottom_card.suitIndex in instance.dark_suits: + reasons.append(InfeasibilityReason( + InfeasibilityType.CritAtBottom, + instance.max_score - 5 + bottom_card.rank, + instance.deck_size - 1 + )) + if only_find_first: + return reasons - print("occurrences are: {}".format(occurrences)) - - for rank in range(2, 6): - - # general analysis - earliest_play = max(min(play_times[rank - 1]), min(occurrences[rank])) - latest_play = max(*play_times[rank - 1], *occurrences[rank]) - play_times[rank] = [earliest_play, latest_play] - - # check a few extra cases regarding the picks when the rank is not 5 - if rank != 5: - # check if we can just play the first copy - if max(play_times[rank - 1]) < min(occurrences[rank]): - picks[rank] = 0 - play_times[rank] = [min(occurrences[rank])] - continue - - # check if the second copy is not worse than the first when it comes, - # because we either have to wait for smaller cards anyway - # or the next card is not there anyway - if max(occurrences[rank]) < max(earliest_play, min(occurrences[rank + 1])): - picks[rank] = 1 - - return picks, play_times - - -def analyze_card_usage(instance: hanab_game.HanabiInstance): - storage_size = instance.num_players * instance.hand_size - for suit in range(instance.num_suits): - print("analysing suit {}: {}".format( - suit, - hanab_game.pp_deck((c for c in instance.deck if c.suitIndex == suit)) - ) - ) - - occurrences = { - rank: [max(0, i - storage_size + 1) for (i, card) in enumerate(instance.deck) if - card == hanab_game.DeckCard(suit, rank)] - for rank in range(1, 6) - } - - picks, play_times = analyze_suit(occurrences) - - print("did analysis:") - print("play times: ", play_times) - print("picks: ", picks) - print() - - -def analyze(instance: hanab_game.HanabiInstance, find_non_trivial=False) -> InfeasibilityReason | None: - if instance.deck[-1].rank != 5 and instance.deck[-1].suitIndex + instance.num_dark_suits >= instance.num_suits: - return InfeasibilityReason(InfeasibilityType.CritAtBottom, instance.deck_size - 1) - - # we will sweep through the deck and pretend that we instantly play all cards - # as soon as we have them (and recurse this) + # we will sweep through the deck and pretend that + # - we keep all non-trash cards in our hands + # - we instantly play all playable cards as soon as we have them + # - we recurse on this instant-play + # + # For example, we assume that once we draw r2, we check if we can play r2. + # If yes, then we also check if we drew r3 earlier and so on. + # If not, then we keep r2 in our hands + # + # In total, this is equivalent to assuming that we infinitely many clues + # and infinite storage space in our hands (which is of course not true), + # but even in this setting, some games are infeasible due to pace issues + # that we can detect + # + # A small refinement is to pretend that we only have infinite storage for non-crit cards, + # for crit-cards, the usual hand card limit applies. + # This allows us to detect some seeds where there are simply too many unplayable cards to hold at some point + # that also can't be discarded # this allows us to detect standard pace issue arguments stacks = [0] * instance.num_suits + + # we will ensure that stored_crits is a subset of stored_cards stored_cards = set() stored_crits = set() - min_forced_pace = 100 - worst_index = 0 + min_forced_pace = instance.initial_pace + worst_pace_index = 0 - ret = None + max_forced_crit_discard = 0 + worst_crit_index = 0 for (i, card) in enumerate(instance.deck): if card.rank == stacks[card.suitIndex] + 1: @@ -133,68 +112,84 @@ def analyze(instance: hanab_game.HanabiInstance, find_non_trivial=False) -> Infe stored_crits.add(card) stored_cards.add(card) - # check for out of handsize: - if len(stored_crits) == instance.num_players * instance.hand_size: - return InfeasibilityReason(InfeasibilityType.OutOfHandSize, i) - - if find_non_trivial and len(stored_cards) == instance.num_players * instance.hand_size: - ret = InfeasibilityReason(InfeasibilityType.NotTrivial, i) + # check for out of handsize (this number can be negative, in which case nothing applies) + # Note the +1 at the end, which is there because we have to discard next, + # so even if we currently have as many crits as we can hold, we have to discard one + num_forced_crit_discards = len(stored_crits) - instance.num_players * instance.hand_size + 1 + if len(stored_crits) - instance.num_players * instance.hand_size > max_forced_crit_discard: + worst_crit_index = i + max_forced_crit_discard = num_forced_crit_discards + if only_find_first: + reasons.append(InfeasibilityReason( + InfeasibilityType.OutOfPace, + instance.max_score + min_forced_pace, + worst_pace_index + )) + return reasons # the last - 1 is there because we have to discard 'next', causing a further draw max_remaining_plays = (instance.deck_size - i - 1) + instance.num_players - 1 - - needed_plays = 5 * instance.num_suits - sum(stacks) - missing = max_remaining_plays - needed_plays - if missing < min_forced_pace: - # print("update to {}: {}".format(i, missing)) - min_forced_pace = missing - worst_index = i + needed_plays = instance.max_score - sum(stacks) + cur_pace = max_remaining_plays - needed_plays + if cur_pace < min(0, min_forced_pace): + min_forced_pace = cur_pace + worst_pace_index = i + if only_find_first: + reasons.append(InfeasibilityReason( + InfeasibilityType.OutOfPace, + instance.max_score + min_forced_pace, + worst_pace_index + )) + return reasons # check that we correctly walked through the deck assert (len(stored_cards) == 0) assert (len(stored_crits) == 0) - assert (sum(stacks) == 5 * instance.num_suits) + assert (sum(stacks) == instance.max_score) + + if max_forced_crit_discard > 0: + reasons.append( + InfeasibilityReason( + InfeasibilityType.OutOfHandSize, + instance.max_score - max_forced_crit_discard, + worst_crit_index + ) + ) if min_forced_pace < 0: - return InfeasibilityReason(InfeasibilityType.OutOfPace, worst_index, min_forced_pace) - elif ret is not None: - return ret - else: - return None + reasons.append(InfeasibilityReason( + InfeasibilityType.OutOfPace, + instance.max_score + min_forced_pace, + worst_pace_index + )) + + return reasons -def run_on_database(): - cur = database.conn.cursor() - cur2 = database.conn.cursor() - for num_p in range(2, 6): - cur.execute( - "SELECT seed, num_players, deck from seeds where variant_id = 0 and num_players = (%s) order by seed asc", - (num_p,)) - res = cur.fetchall() - hand = 0 - pace = 0 - non_trivial = 0 - d = None - print("Checking {} {}-player seeds from database".format(len(res), num_p)) - for (seed, num_players, deck) in res: - deck = compress.decompress_deck(deck) - a = analyze(hanab_game.HanabiInstance(deck, num_players), True) - if type(a) == InfeasibilityReason: - if a.type == InfeasibilityType.OutOfHandSize: - # print("Seed {} infeasible: {}\n{}".format(seed, a, deck)) - hand += 1 - elif a.type == InfeasibilityType.OutOfPace: - pace += 1 - elif a.type == InfeasibilityType.NotTrivial: - non_trivial += 1 - d = seed, deck - - print("Found {} seeds running out of hand size, {} running out of pace and {} that are not trivial".format(hand, - pace, - non_trivial)) - if d is not None: - print("example non-trivial deck (seed {}): [{}]".format( - d[0], - ", ".join(c.colorize() for c in d[1]) - )) - print() +def run_on_database(variant_id): + database.cur.execute( + "SELECT seed, num_players, deck FROM seeds WHERE variant_id = (%s) ORDER BY (num_players, seed)", + (variant_id,) + ) + res = database.cur.fetchall() + logger.info("Checking {} seeds of variant {} for infeasibility".format(len(res), variant_id)) + for (seed, num_players, deck_str) in res: + deck = compress.decompress_deck(deck_str) + reasons = analyze(hanab_game.HanabiInstance(deck, num_players)) + if reasons: + print("found infeasible seed {}: {}".format(seed, reasons)) + else: + print("found nothing for seed {}".format(seed)) + for reason in reasons: + database.cur.execute( + "INSERT INTO score_upper_bounds (seed, score_upper_bound, reason) " + "VALUES (%s,%s,%s) " + "ON CONFLICT (seed, reason) DO UPDATE " + "SET score_upper_bound = EXCLUDED.score_upper_bound", + (seed, reason.score_upper_bound, reason.type.value) + ) + database.cur.execute( + "UPDATE seeds SET feasible = (%s) WHERE seed = (%s)", + (False, seed) + ) + database.conn.commit()