diff --git a/install/database_schema.sql b/install/database_schema.sql index 5850250..7b1d2fe 100644 --- a/install/database_schema.sql +++ b/install/database_schema.sql @@ -382,4 +382,21 @@ CREATE TABLE endgames ( enumerator INTEGER NOT NULL CHECK (enumerator >= 0), denominator INTEGER NOT NULL CHECK (denominator > 0), PRIMARY KEY (game_id, turn, action_type, suit_index, rank) +); + +/** + We store separately whether we analyzed a certain game already and what the termination reason for the analysis was: + 0 if evaluation completed within specified time and memory + 1 if evaluation ran into timeout + 2 if evaluation was empty because state is unreachable + 3 if evaluation ran out of memory + This is also necessary because for some endgames, because in case 2 we will not have data, + simply because the game replay ended too early. + To avoid re-analyzing these seeds, we mark all seeds analyzed in this table. +*/ +DROP TABLE IF EXISTS endgames_analyzed; +CREATE TABLE endgames_analyzed ( + game_id INTEGER REFERENCES games (id), + termination_reason SMALLINT NOT NULL, + PRIMARY KEY (game_id) ); \ No newline at end of file diff --git a/src/constants.py b/src/constants.py index cfe049d..dd0e165 100644 --- a/src/constants.py +++ b/src/constants.py @@ -50,7 +50,7 @@ UNWINNABLE_SEED_FRACTION = 0.02 WEBSITE_OUTPUT_DIRECTORY = 'build' -ENDGAME_MAX_DRAW_PILE_SIZE = 15 +ENDGAME_MAX_DRAW_PILE_SIZE = 15 # Not interested in game states with more than 15 cards, this should be enough. ENDGAME_MEMORY_BYTES = 4 * 1024 * 1024 * 1024 # 4 GB of memory -# In seconds -ENDGAME_TIMEOUT = 10 +ENDGAME_TIMEOUT_SECONDS = 60 * 15 # 15 Minutes per game by default +ENDGAME_ANALYSIS_QUERY_INTERVAL_MINUTES = 5 # Re-query database every 5 minutes diff --git a/src/endgames.py b/src/endgames.py index 4407b93..67a8d42 100644 --- a/src/endgames.py +++ b/src/endgames.py @@ -2,12 +2,14 @@ import json import subprocess import re import resource +import time from typing import List, Dict, Tuple from dataclasses import dataclass from pathlib import Path import platformdirs import psycopg2.extras +import psycopg2.errors import hanabi.hanab_game import hanabi.constants @@ -16,6 +18,7 @@ import hanabi.live.compress import constants import games_db_interface from database import conn_manager +from log_setup import logger @dataclass @@ -29,7 +32,7 @@ class EndgameAction: def analyze_and_store_game(game_id: int) -> int: actions, return_code = analyze_game_from_db(game_id) - store_endgame_actions(game_id, actions) + store_endgame_actions(game_id, actions, return_code) return return_code @@ -60,7 +63,8 @@ def analyze_endgame_from_file(filename: str) -> Tuple[List[EndgameAction], int]: @return: List of all evaluated actions and return code why evaluation finished: 0 if evaluation completed within specified time and memory 1 if evaluation ran into timeout - 2 if evaluation ran out of memory + 2 if evaluation was empty because state is unreachable + 3 if evaluation ran out of memory No guarantee can be made on what actions are actually evaluated, these might be more or less depending on timeouts and/or resource limitation. @@ -77,19 +81,19 @@ def analyze_endgame_from_file(filename: str) -> Tuple[List[EndgameAction], int]: args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, - timeout=constants.ENDGAME_TIMEOUT, + timeout=constants.ENDGAME_TIMEOUT_SECONDS, preexec_fn=set_memory_limit ) if result.returncode != 0: # 2 is the return code to report that the specified game state is not reachable # In this case, there is nothing to analyze, so we will return an empty list and normal program termination. if result.returncode == 2: - return [], 0 + return [], 2 # 3 is the return code used by the subprocess to indicate an out of memory exception # Since we intentionally limited the memory, this is actually not an exception for us, # we will simply parse the results we have and report the OOM exception. if result.returncode == 3: - return_code = 2 + return_code = 3 else: raise RuntimeError( "Abnormal program termination of endgame-analyzer subprocess: Call of\n" @@ -107,7 +111,8 @@ def analyze_endgame_from_file(filename: str) -> Tuple[List[EndgameAction], int]: return_code = 1 raw_output = time_err.stdout - output = raw_output.decode('utf-8') + # It could be that we got no output. In that case, we also cannot parse anything + output = raw_output.decode('utf-8') if raw_output else "" pattern = r"Turn (?P\d+), (?P\w+)(?:\s(?P\w\w))?: (?P\d+)/(?P\d+)" @@ -118,11 +123,14 @@ def set_memory_limit(): resource.setrlimit(resource.RLIMIT_DATA, (constants.ENDGAME_MEMORY_BYTES, constants.ENDGAME_MEMORY_BYTES)) -def store_endgame_actions(game_id: int, endgame_actions: List[EndgameAction]) -> None: +def store_endgame_actions(game_id: int, endgame_actions: List[EndgameAction], result_code) -> None: values = [] for action in endgame_actions: values.append((game_id, action.turn, action.action_type.value, action.card.suitIndex, action.card.rank, action.enumerator, action.denominator)) + # Remove duplicates (even though we expect none), otherwise this causes errors on insertion. + values = list(set(values)) + conn = conn_manager.get_connection() cur = conn.cursor() psycopg2.extras.execute_values( @@ -134,6 +142,15 @@ def store_endgame_actions(game_id: int, endgame_actions: List[EndgameAction]) -> "SET (enumerator, denominator) = (EXCLUDED.enumerator, EXCLUDED.denominator)", values ) + # Mark this game as analyzed. + cur.execute( + "INSERT INTO endgames_analyzed " + "VALUES (%s, %s) " + "ON CONFLICT (game_id) " + "DO UPDATE " + "SET termination_reason = EXCLUDED.termination_reason", + (game_id, result_code) + ) conn.commit() @@ -187,3 +204,36 @@ def parse_card(card: str) -> hanabi.hanab_game.DeckCard: assert suit is not None return hanabi.hanab_game.DeckCard(suit, rank) + +def work_thread(): + """ + Will continuously query database to analyze endgames. + @return: + """ + conn = conn_manager.get_connection() + cur = conn.cursor() + while True: + cur.execute( + "SELECT games.id " + "FROM games " + "LEFT OUTER JOIN endgames_analyzed " + " ON endgames_analyzed.game_id = games.id " + "WHERE endgames_analyzed.termination_reason IS NULL " + "ORDER BY games.league_id DESC " + "LIMIT 1", + (False,) + ) + res = cur.fetchone() + if res is None: + logger.info("No game found to analyze. Going to sleep for {} Minutes".format( + constants.ENDGAME_ANALYSIS_QUERY_INTERVAL_MINUTES) + ) + time.sleep(60 * constants.ENDGAME_ANALYSIS_QUERY_INTERVAL_MINUTES) + else: + (game_id, ) = res + logger.info("Analyisng endgame of {}".format(game_id)) + return_code = analyze_and_store_game(game_id) + print("Finished endgame analysis of {}: Returncode {}".format(game_id, return_code)) + + +work_thread()