add worker thread for endgame analysis

This commit is contained in:
Maximilian Keßler 2024-01-13 15:35:49 +01:00
parent 20f4cfc67e
commit 816bf0d940
Signed by: max
GPG key ID: BCC5A619923C0BA5
3 changed files with 77 additions and 10 deletions

View file

@ -382,4 +382,21 @@ CREATE TABLE endgames (
enumerator INTEGER NOT NULL CHECK (enumerator >= 0), enumerator INTEGER NOT NULL CHECK (enumerator >= 0),
denominator INTEGER NOT NULL CHECK (denominator > 0), denominator INTEGER NOT NULL CHECK (denominator > 0),
PRIMARY KEY (game_id, turn, action_type, suit_index, rank) PRIMARY KEY (game_id, turn, action_type, suit_index, rank)
);
/**
We store separately whether we analyzed a certain game already and what the termination reason for the analysis was:
0 if evaluation completed within specified time and memory
1 if evaluation ran into timeout
2 if evaluation was empty because state is unreachable
3 if evaluation ran out of memory
This is also necessary because for some endgames, because in case 2 we will not have data,
simply because the game replay ended too early.
To avoid re-analyzing these seeds, we mark all seeds analyzed in this table.
*/
DROP TABLE IF EXISTS endgames_analyzed;
CREATE TABLE endgames_analyzed (
game_id INTEGER REFERENCES games (id),
termination_reason SMALLINT NOT NULL,
PRIMARY KEY (game_id)
); );

View file

@ -50,7 +50,7 @@ UNWINNABLE_SEED_FRACTION = 0.02
WEBSITE_OUTPUT_DIRECTORY = 'build' WEBSITE_OUTPUT_DIRECTORY = 'build'
ENDGAME_MAX_DRAW_PILE_SIZE = 15 ENDGAME_MAX_DRAW_PILE_SIZE = 15 # Not interested in game states with more than 15 cards, this should be enough.
ENDGAME_MEMORY_BYTES = 4 * 1024 * 1024 * 1024 # 4 GB of memory ENDGAME_MEMORY_BYTES = 4 * 1024 * 1024 * 1024 # 4 GB of memory
# In seconds ENDGAME_TIMEOUT_SECONDS = 60 * 15 # 15 Minutes per game by default
ENDGAME_TIMEOUT = 10 ENDGAME_ANALYSIS_QUERY_INTERVAL_MINUTES = 5 # Re-query database every 5 minutes

View file

@ -2,12 +2,14 @@ import json
import subprocess import subprocess
import re import re
import resource import resource
import time
from typing import List, Dict, Tuple from typing import List, Dict, Tuple
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path from pathlib import Path
import platformdirs import platformdirs
import psycopg2.extras import psycopg2.extras
import psycopg2.errors
import hanabi.hanab_game import hanabi.hanab_game
import hanabi.constants import hanabi.constants
@ -16,6 +18,7 @@ import hanabi.live.compress
import constants import constants
import games_db_interface import games_db_interface
from database import conn_manager from database import conn_manager
from log_setup import logger
@dataclass @dataclass
@ -29,7 +32,7 @@ class EndgameAction:
def analyze_and_store_game(game_id: int) -> int: def analyze_and_store_game(game_id: int) -> int:
actions, return_code = analyze_game_from_db(game_id) actions, return_code = analyze_game_from_db(game_id)
store_endgame_actions(game_id, actions) store_endgame_actions(game_id, actions, return_code)
return return_code return return_code
@ -60,7 +63,8 @@ def analyze_endgame_from_file(filename: str) -> Tuple[List[EndgameAction], int]:
@return: List of all evaluated actions and return code why evaluation finished: @return: List of all evaluated actions and return code why evaluation finished:
0 if evaluation completed within specified time and memory 0 if evaluation completed within specified time and memory
1 if evaluation ran into timeout 1 if evaluation ran into timeout
2 if evaluation ran out of memory 2 if evaluation was empty because state is unreachable
3 if evaluation ran out of memory
No guarantee can be made on what actions are actually evaluated, these might be more or less depending on No guarantee can be made on what actions are actually evaluated, these might be more or less depending on
timeouts and/or resource limitation. timeouts and/or resource limitation.
@ -77,19 +81,19 @@ def analyze_endgame_from_file(filename: str) -> Tuple[List[EndgameAction], int]:
args, args,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, stderr=subprocess.PIPE,
timeout=constants.ENDGAME_TIMEOUT, timeout=constants.ENDGAME_TIMEOUT_SECONDS,
preexec_fn=set_memory_limit preexec_fn=set_memory_limit
) )
if result.returncode != 0: if result.returncode != 0:
# 2 is the return code to report that the specified game state is not reachable # 2 is the return code to report that the specified game state is not reachable
# In this case, there is nothing to analyze, so we will return an empty list and normal program termination. # In this case, there is nothing to analyze, so we will return an empty list and normal program termination.
if result.returncode == 2: if result.returncode == 2:
return [], 0 return [], 2
# 3 is the return code used by the subprocess to indicate an out of memory exception # 3 is the return code used by the subprocess to indicate an out of memory exception
# Since we intentionally limited the memory, this is actually not an exception for us, # Since we intentionally limited the memory, this is actually not an exception for us,
# we will simply parse the results we have and report the OOM exception. # we will simply parse the results we have and report the OOM exception.
if result.returncode == 3: if result.returncode == 3:
return_code = 2 return_code = 3
else: else:
raise RuntimeError( raise RuntimeError(
"Abnormal program termination of endgame-analyzer subprocess: Call of\n" "Abnormal program termination of endgame-analyzer subprocess: Call of\n"
@ -107,7 +111,8 @@ def analyze_endgame_from_file(filename: str) -> Tuple[List[EndgameAction], int]:
return_code = 1 return_code = 1
raw_output = time_err.stdout raw_output = time_err.stdout
output = raw_output.decode('utf-8') # It could be that we got no output. In that case, we also cannot parse anything
output = raw_output.decode('utf-8') if raw_output else ""
pattern = r"Turn (?P<turn>\d+), (?P<type>\w+)(?:\s(?P<card>\w\w))?: (?P<enumerator>\d+)/(?P<denominator>\d+)" pattern = r"Turn (?P<turn>\d+), (?P<type>\w+)(?:\s(?P<card>\w\w))?: (?P<enumerator>\d+)/(?P<denominator>\d+)"
@ -118,11 +123,14 @@ def set_memory_limit():
resource.setrlimit(resource.RLIMIT_DATA, (constants.ENDGAME_MEMORY_BYTES, constants.ENDGAME_MEMORY_BYTES)) resource.setrlimit(resource.RLIMIT_DATA, (constants.ENDGAME_MEMORY_BYTES, constants.ENDGAME_MEMORY_BYTES))
def store_endgame_actions(game_id: int, endgame_actions: List[EndgameAction]) -> None: def store_endgame_actions(game_id: int, endgame_actions: List[EndgameAction], result_code) -> None:
values = [] values = []
for action in endgame_actions: for action in endgame_actions:
values.append((game_id, action.turn, action.action_type.value, action.card.suitIndex, action.card.rank, action.enumerator, action.denominator)) values.append((game_id, action.turn, action.action_type.value, action.card.suitIndex, action.card.rank, action.enumerator, action.denominator))
# Remove duplicates (even though we expect none), otherwise this causes errors on insertion.
values = list(set(values))
conn = conn_manager.get_connection() conn = conn_manager.get_connection()
cur = conn.cursor() cur = conn.cursor()
psycopg2.extras.execute_values( psycopg2.extras.execute_values(
@ -134,6 +142,15 @@ def store_endgame_actions(game_id: int, endgame_actions: List[EndgameAction]) ->
"SET (enumerator, denominator) = (EXCLUDED.enumerator, EXCLUDED.denominator)", "SET (enumerator, denominator) = (EXCLUDED.enumerator, EXCLUDED.denominator)",
values values
) )
# Mark this game as analyzed.
cur.execute(
"INSERT INTO endgames_analyzed "
"VALUES (%s, %s) "
"ON CONFLICT (game_id) "
"DO UPDATE "
"SET termination_reason = EXCLUDED.termination_reason",
(game_id, result_code)
)
conn.commit() conn.commit()
@ -187,3 +204,36 @@ def parse_card(card: str) -> hanabi.hanab_game.DeckCard:
assert suit is not None assert suit is not None
return hanabi.hanab_game.DeckCard(suit, rank) return hanabi.hanab_game.DeckCard(suit, rank)
def work_thread():
"""
Will continuously query database to analyze endgames.
@return:
"""
conn = conn_manager.get_connection()
cur = conn.cursor()
while True:
cur.execute(
"SELECT games.id "
"FROM games "
"LEFT OUTER JOIN endgames_analyzed "
" ON endgames_analyzed.game_id = games.id "
"WHERE endgames_analyzed.termination_reason IS NULL "
"ORDER BY games.league_id DESC "
"LIMIT 1",
(False,)
)
res = cur.fetchone()
if res is None:
logger.info("No game found to analyze. Going to sleep for {} Minutes".format(
constants.ENDGAME_ANALYSIS_QUERY_INTERVAL_MINUTES)
)
time.sleep(60 * constants.ENDGAME_ANALYSIS_QUERY_INTERVAL_MINUTES)
else:
(game_id, ) = res
logger.info("Analyisng endgame of {}".format(game_id))
return_code = analyze_and_store_game(game_id)
print("Finished endgame analysis of {}: Returncode {}".format(game_id, return_code))
work_thread()