Access DB only in main process, pass results

Accessing the database connection within the created child processes
repeatedly caused problems, this is not officially supported by
psycopg2.
We thus pass all relevant computed information back to the main thread
(via futures) and handle it there synchronously.
This commit is contained in:
Maximilian Keßler 2024-10-12 13:53:38 +02:00
parent 444476c685
commit b021bfeb71
2 changed files with 76 additions and 46 deletions

View file

@ -1,4 +1,6 @@
from typing import Optional, Tuple
from dataclasses import dataclass
from types import NoneType
from typing import Optional, Tuple, List
import pebble.concurrent
import concurrent.futures
@ -90,15 +92,26 @@ def get_decks_for_all_seeds():
bar()
mutex = threading.Lock()
@dataclass
class SolutionData:
seed: str = None
time_ms: int = 0
feasible: Optional[bool] = None
solution: Optional[GameState] = None
infeasibility_reasons: Optional[List[deck_analyzer.InfeasibilityReason]] = None
num_remaining_cards: Optional[int] = None
skipped: bool = False
def solve_instance(instance: hanab_game.HanabiInstance)-> Tuple[bool, Optional[GameState], Optional[int]]:
def solve_instance(instance: hanab_game.HanabiInstance)-> SolutionData:
retval = SolutionData()
# first, sanity check on running out of pace
result = deck_analyzer.analyze(instance)
# print(result)
if len(result) != 0:
logger.verbose("found infeasible deck by preliminary analysis")
return False, None, None
retval.feasible = False
retval.infeasibility_reasons = result
return retval
for num_remaining_cards in [0, 20]:
# logger.info("trying with {} remaining cards".format(num_remaining_cards))
game = hanab_game.GameState(instance)
@ -112,62 +125,52 @@ def solve_instance(instance: hanab_game.HanabiInstance)-> Tuple[bool, Optional[G
# check if we won already
if game.is_won():
retval.feasible = True
retval.solution = game
retval.num_remaining_cards = num_remaining_cards
# print("won with greedy strat")
return True, game, num_remaining_cards
return retval
# now, apply sat solver
if not game.is_over():
logger.debug("continuing greedy sol with SAT")
solvable, sol = solve_sat(game)
if solvable is None:
return True, sol, num_remaining_cards
solvable, solution = solve_sat(game)
if solvable:
retval.feasible = True
retval.solution = solution
retval.num_remaining_cards = num_remaining_cards
return retval
logger.debug(
"No success with {} remaining cards, reducing number of greedy moves, failed attempt was: {}".format(
num_remaining_cards, compress.link(game)))
# print("Aborting trying with greedy strat")
logger.debug("Starting full SAT solver")
game = hanab_game.GameState(instance)
a, b = solve_sat(game)
return a, b, instance.draw_pile_size
retval.feasible, retval.solution = solve_sat(game)
retval.num_remaining_cards = instance.draw_pile_size
if not retval.feasible:
assert len(retval.infeasibility_reasons) == 0
retval.infeasibility_reasons.append(deck_analyzer.InfeasibilityReason(deck_analyzer.InfeasibilityType.SAT))
return retval
def solve_seed(seed, num_players, deck, var_name: str, timeout: Optional[int] = 150):
def solve_seed(seed, num_players, deck, timeout: Optional[int] = 150) -> SolutionData:
try:
@pebble.concurrent.process(timeout=timeout)
def solve_seed_with_timeout(seed, num_players, deck):
def solve_seed_with_timeout(seed, num_players, deck) -> SolutionData:
try:
logger.verbose("Starting to solve seed {}".format(seed))
t0 = time.perf_counter()
solvable, solution, num_remaining_cards = solve_instance(hanab_game.HanabiInstance(deck, num_players))
retval = solve_instance(hanab_game.HanabiInstance(deck, num_players))
t1 = time.perf_counter()
logger.verbose("Solved instance {} in {} seconds: {}".format(seed, round(t1 - t0, 2), solvable))
mutex.acquire()
if solvable is not None:
time_ms = round((t1 - t0) * 1000)
database.cur.execute("UPDATE seeds SET (feasible, solve_time_ms) = (%s, %s) WHERE seed = (%s)",
(solvable, time_ms, seed))
if solvable:
database.cur.execute("INSERT INTO certificate_games (seed, num_turns) "
"VALUES (%s, %s) "
"RETURNING ID ", (seed, len(solution.actions)))
game_id = database.cur.fetchone()[0]
store_actions(game_id, solution.actions, True)
database.conn.commit()
mutex.release()
if solvable:
logger.verbose("Success with {} cards left in draw by greedy solver on seed {}: {}\n".format(
num_remaining_cards, seed, compress.link(solution))
)
elif not solvable:
logger.debug("seed {} was not solvable".format(seed))
logger.debug('{}-player, seed {:10}, {}\n'.format(num_players, seed, var_name))
elif solvable is None:
logger.verbose("seed {} skipped".format(seed))
else:
raise Exception("Programming Error")
retval.seed = seed
retval.time_ms = round((t1 - t0) * 1000)
logger.verbose("Solved instance {} in {} seconds: {}".format(seed, round(t1 - t0, 2), retval.feasible))
return retval
except Exception as e:
print("exception in subprocess:")
@ -177,17 +180,40 @@ def solve_seed(seed, num_players, deck, var_name: str, timeout: Optional[int] =
try:
return f.result()
except TimeoutError:
retval = SolutionData()
retval.seed = seed
retval.feasible = None
retval.time_ms = 1000 * timeout
logger.verbose("Solving on seed {} timed out".format(seed))
mutex.acquire()
database.cur.execute("UPDATE seeds SET solve_time_ms = %s WHERE seed = (%s)", (1000 * timeout, seed))
database.conn.commit()
mutex.release()
return
return retval
except Exception as e:
print("exception in subprocess:")
traceback.print_exc()
def process_solve_result(result: SolutionData):
if result.feasible is not None:
database.cur.execute("UPDATE seeds SET (feasible, solve_time_ms) = (%s, %s) WHERE seed = (%s)",
(result.feasible, result.time_ms, result.seed))
if result.feasible:
assert result.solution is not None
database.cur.execute("INSERT INTO certificate_games (seed, num_turns) "
"VALUES (%s, %s) "
"RETURNING ID ", (result.seed, len(result.solution.actions)))
game_id = database.cur.fetchone()[0]
store_actions(game_id, result.solution.actions, True)
logger.verbose("Success with {} cards left in draw by greedy solver on seed {}: {}\n".format(
result.num_remaining_cards, result.seed, compress.link(result.solution))
)
else:
logger.debug("seed {} was not solvable".format(result.seed))
elif result.skipped:
logger.verbose("seed {} skipped".format(result.seed))
else:
database.cur.execute("UPDATE seeds SET solve_time_ms = %s WHERE seed = (%s)", (result.time_ms, result.seed))
database.conn.commit()
def solve_unknown_seeds(variant_id, seed_class: int = 0, num_players: Optional[int] = None, timeout: Optional[int] = 150):
variant_name = variants.variant_name(variant_id)
query = "SELECT seeds.seed, num_players, array_agg(suit_index order by deck_index asc), array_agg(rank order by deck_index asc) "\
@ -224,4 +250,6 @@ def solve_unknown_seeds(variant_id, seed_class: int = 0, num_players: Optional[i
fs = [executor.submit(solve_seed, d[0], d[1], d[2], timeout) for d in data]
with alive_progress.alive_bar(len(res), title='Seed solving on {}'.format(variant_name)) as bar:
for f in concurrent.futures.as_completed(fs):
result = f.result()
process_solve_result(result)
bar()

View file

@ -1,5 +1,6 @@
from enum import Enum
from typing import List
from dataclasses import dataclass
import alive_progress
@ -25,6 +26,7 @@ class InfeasibilityType(Enum):
BottomTopDeck = 20
DoubleBottomTopDeck = 30
CritAtBottom = 40
SAT = 50
class InfeasibilityReason: