diff --git a/bdr.py b/bdr.py index 8d57e0b..b01cbef 100644 --- a/bdr.py +++ b/bdr.py @@ -13,7 +13,7 @@ def analyze_game(instance: HanabLiveInstance, actions: List[Action]) -> Tuple[Li discard = instance.deck[action.target] if not game.is_trash(discard): if game.is_critical(discard): - termination = 'Discard crit' + termination = 'Discard crit' if action.type == ActionType.Discard else 'Bomb crit' break if discard.rank != 1: if discard in game.deck[game.progress:]: diff --git a/bots.py b/bots.py new file mode 100644 index 0000000..430d8db --- /dev/null +++ b/bots.py @@ -0,0 +1,19 @@ +import subprocess + +from games import get_game_json, GAMES_PATH + + +def run_strategy(game_id: int, strategy: str): + if strategy not in ['cheat', 'info', 'random']: + print('Strategy has to be one of cheat, info or random') + return + get_game_json(game_id) + result = subprocess.run(['./rust_hanabi', '--file', str((GAMES_PATH / str(game_id))), '--json-output', '%-' + strategy, '--strategy', strategy]) + if result.returncode != 0: + print('Failed to run game {} with cheating stratgey'.format(game_id)) + + +def ensure_bot_game_exists(game_id: int, strategy: str): + filename = GAMES_PATH / '{}-{}'.format(game_id, strategy) + if not filename.exists(): + run_strategy(game_id, strategy) diff --git a/endgames.py b/endgames.py new file mode 100644 index 0000000..db49f7d --- /dev/null +++ b/endgames.py @@ -0,0 +1,106 @@ +import json +import re +import subprocess +from typing import Dict, Optional + +from games import GAMES_PATH +from bots import ensure_bot_game_exists + +from pathlib import Path +DATA_FILE = Path('endgame-data.json') + +if not DATA_FILE.exists(): + DATA_FILE.write_text('{}') + + +with open(DATA_FILE, 'r') as f: + DATA: Dict = json.loads(f.read()) + + +def analyze_game(filename: str): + max_draw_pile_size = 15 + try: + result = subprocess.run(['./endgame-analyzer', '-f', filename, '-d', str(max_draw_pile_size), '--interactive', '0', '--quiet', '-r'], stdout=subprocess.PIPE, timeout=30) + raw_output = result.stdout + except subprocess.TimeoutExpired as time_err: + raw_output = time_err.stdout + output = raw_output.decode('utf-8') + + # Now, parse all results that we obtained (unclear how many depending on whether we ran into the timeout) + probabilities = {} + + m = re.search(r"Specified draw pile size of ([0-9]+) cannot be reached with specified replay\.\nReplay ends at turn [0-9]+ with score of ([0-9]+)\.", output, re.M) + if m: + won = '100' if m.group(2) == '25' else 0 + for draw_pile_size in range(1, int(m.group(1)) + 1): + probabilities[str(draw_pile_size)] = won + + for m in re.finditer('Probability with ([0-9]+) cards left in deck: .*/.* ~ ([0-9.]+)', output): + probabilities[str(m.group(1))] = m.group(2) + + return probabilities + + +def full_analyze_game(filename: str): + max_draw_pile_size = 10 + try: + result = subprocess.run(['./endgame-analyzer', '-f', filename, '-d', str(max_draw_pile_size), '-i', '0', '--all-clues', '-r', '--quiet'], stdout=subprocess.PIPE, timeout=30) + raw_output = result.stdout + except subprocess.TimeoutExpired as time_err: + raw_output = time_err.stdout + output = raw_output.decode('utf-8') + + probabilities = {} + zero_dict = { + (('+' if clue_modifier >= 0 else '') + str(clue_modifier)): 0 for clue_modifier in range(-8, 9) + } + hundred_dict = { + (('+' if clue_modifier >= 0 else '') + str(clue_modifier)): 100 for clue_modifier in range(-8, 9) + } + + m = re.search(r"Specified draw pile size of ([0-9]+) cannot be reached with specified replay\.\nReplay ends at turn [0-9]+ with score of ([0-9]+)\.", output, re.M) + if m: + won = hundred_dict if m.group(2) == '25' else zero_dict + for draw_pile_size in range(1, int(m.group(1)) + 1): + probabilities[str(draw_pile_size)] = won + + for m in re.finditer('Probability with ([0-9]+) cards left in deck and [0-8] clues \((.[0-8])\).*: .*/.* ~ ([0-9.]*)', output): + if m.group(1) not in probabilities.keys(): + probabilities[m.group(1)] = {} + probabilities[m.group(1)][m.group(2)] = m.group(3) + return probabilities + + +def full_analyze_game_cached(game_id: int, strategy: Optional[str] = None): + key = 'all' if strategy is None else 'all-{}'.format(strategy) + if strategy is not None: + ensure_bot_game_exists(game_id, strategy) + cached = DATA.get(key, {}).get(str(game_id), None) + if cached is not None: + return cached + result = full_analyze_game(str(GAMES_PATH / str(game_id)) + ('-{}'.format(strategy) if strategy is not None else '')) + if key not in DATA.keys(): + DATA[key] = {} + DATA[key][game_id] = result + save_cache() + return result + + +def analyze_game_cached(game_id: int, strategy: Optional[str] = None): + key = 'normal' if strategy is None else 'normal-{}'.format(strategy) + if strategy is not None: + ensure_bot_game_exists(game_id, strategy) + cached = DATA.get(key, {}).get(str(game_id), None) + if cached is not None: + return cached + result = analyze_game(str(GAMES_PATH / str(game_id)) + ('-{}'.format(strategy) if strategy is not None else '')) + if key not in DATA.keys(): + DATA[key] = {} + DATA[key][game_id] = result + save_cache() + return result + + +def save_cache(): + with open(DATA_FILE, 'w') as f: + f.writelines(json.dumps(DATA, indent=2)) diff --git a/games.py b/games.py new file mode 100644 index 0000000..41a5264 --- /dev/null +++ b/games.py @@ -0,0 +1,27 @@ +import json + +from typing import Dict, Optional +from pathlib import Path + +from hanabi.live.site_api import get + +GAMES_PATH = Path('games') + +if not GAMES_PATH.exists(): + GAMES_PATH.mkdir(parents=True) + + +def get_game_json(game_id: int, strategy: Optional[str] = None) -> Dict: + filename = GAMES_PATH / (str(game_id) + ('-{}'.format(strategy) if strategy is not None else '')) + if filename.exists(): + with open(filename, 'r') as f: + return json.load(f) + + if strategy is not None: + print('Failed to load replay of {} strategy version of game with id {}'.format(strategy, game_id)) + return {} + + game = get("export/" + str(game_id)) + with open(filename, 'w') as f: + f.write(json.dumps(game, indent=2)) + return game diff --git a/get_sheet.py b/get_sheet.py index 4674427..3e4a78e 100644 --- a/get_sheet.py +++ b/get_sheet.py @@ -3,24 +3,38 @@ import requests_cache import json import csv import pandas - -from bdr import describe_game -from hanabi.live.site_api import get +from typing import List, Optional, Dict +from pathlib import Path from hanabi.database import global_db_connection_manager +from hanabi.live.hanab_live import parse_json_game, HanabLiveGameState +from hanabi.live.compress import link + +from bdr import describe_game +from endgames import analyze_game_cached, full_analyze_game_cached +from games import get_game_json + +# Init db connection global_db_connection_manager.read_config() global_db_connection_manager.connect() -session = requests_cache.CachedSession('.hanab-live.cache',expire_after=300) +session = requests_cache.CachedSession('.hanab-live.cache',expire_after=30000) + +OUT_PATH = Path('out') +if not OUT_PATH.exists(): + OUT_PATH.mkdir(parents=True) player_mapping = { 'RamaNoVarjan': 'Ramanujan', + 'RamaNoVarjan2': 'Ramanujan', 'purplejoe2': 'PurpleJoe', 'PurpleJoeVar': 'PurpleJoe', + 'PurpleJoeVar2': 'PurpleJoe', 'yagami_blank': 'Yagami', 'yagami_black': 'Yagami', 'MarkusKahlsen': 'Markus', 'NoVarkusKahlsen': 'Markus', + 'NoVarkusKarlsen': 'Markus', 'spring': 'spring', 'str8tsknacker': 'str8tsknacker', 'novarknacker': 'str8tsknacker', @@ -45,7 +59,8 @@ player_cols = set() for _, p in player_mapping.items(): player_cols.add(p) -class Entry(): + +class Entry: def __init__(self, game_id, seed, score, players, bdr=0): self.game_id = game_id self.seed = seed @@ -60,7 +75,8 @@ def get_player_games(player: str): if r.status_code == 200: return json.loads(r.text) -def collect_player_games(): + +def collect_player_games() -> Dict[int, Entry]: global_games = {} for player in player_mapping.keys(): print('Parsing games for player {}'.format(player)) @@ -89,15 +105,33 @@ def collect_player_games(): return global_games + def analyze_games(games): retval = {} for game_id in games.keys(): - game = get("export/" + str(game_id)) - bdrs, termination = describe_game(game) + bdrs, termination = describe_game(get_game_json(game_id)) retval[game_id] = bdrs, termination return retval +def analyze_endgames(games, strategy: Optional[str] = None): + retval = {} + for game_id in games: + print('Analysing endgames {} of game {}'.format('with strategy {}'.format(strategy) if strategy is not None else '', game_id)) + result = analyze_game_cached(game_id, strategy) + retval[game_id] = result + return retval + + +def full_analyze_endgames(games, strategy: Optional[str] = None): + retval = {} + for game_id in games: + print('Analysing all endgames {} of game {}'.format('with strategy {}'.format(strategy) if strategy is not None else '', game_id)) + result = full_analyze_game_cached(game_id, strategy) + retval[game_id] = result + return retval + + def sort_players_by_num_games(games_dict): nums = {} for _, entry in games_dict.items(): @@ -108,28 +142,88 @@ def sort_players_by_num_games(games_dict): nums[col] = num + 1 return sorted(player_cols, key = lambda col: -nums[col]) -if __name__ == "__main__": - games = collect_player_games() - analysis = analyze_games(games) + +def lookup_val(endgame_dict, clue_modifier) -> str: + if clue_modifier > 0: + for lookup in range(clue_modifier, 0, -1): + val = endgame_dict.get('+' + str(lookup), None) + if val is not None: + return val + if clue_modifier < 0: + for lookup in range(clue_modifier, 0): + val = endgame_dict.get(str(lookup)) + if val is not None: + return val + retval = endgame_dict.get('+0', None) + return retval + + +def make_endgame_tables(ids: List[int], strategy: Optional[str] = None): + endgames = analyze_endgames(ids, strategy) + + postfix = '-{}'.format(strategy) if strategy is not None else '' + main_fname = OUT_PATH / ('endgames' + postfix + '.csv') + + fieldnames = ['Game ID'] + [str(i) for i in range(1, 16)] + with open(main_fname, 'w', newline='') as f: + f.writelines([','.join(fieldnames), "\n"]) + + with open(main_fname, 'a', newline='') as f: + writer = csv.DictWriter(f, fieldnames=fieldnames) + for game_id, endgame in sorted(endgames.items()): + endgame['Game ID'] = "{}".format(game_id, game_id) + writer.writerow(endgame) + + special_fname = str(OUT_PATH / ('endgames' + postfix + '_modifier_{}.csv')) + + x = pandas.read_csv(main_fname) + x.to_html(main_fname.with_suffix('.html'), escape=False) + + all_endgames = full_analyze_endgames(ids, strategy) + fieldnames = ['Game ID'] + [str(i) for i in range(1, 11)] + for clue_modifier in range(-2, 3): + filename = special_fname.format(clue_modifier) + with open(filename, 'w') as f: + f.writelines([','.join(fieldnames), "\n"]) + with open(filename, 'a') as f: + writer = csv.DictWriter(f, fieldnames=fieldnames) + for game_id, endgame in sorted(all_endgames.items()): + # print(endgame) + row = {'Game ID': game_id} + for deck_size in range(1, 11): + val = lookup_val(endgame.get(str(deck_size), {}), clue_modifier) + if val is not None: + row[str(deck_size)] = val + else: + print("WARN: No results found for game {} and deck size {}: {}".format(game_id, deck_size, endgame.get(str(deck_size)))) + writer.writerow(row) + + print('processed file {}'.format(filename)) + + x = pandas.read_csv(filename) + x.to_html(Path(filename).with_suffix('.html'), escape=False) + + +def make_results_table(games, analysis): streaks = {} fieldnames = ['Game ID', 'Seed', 'Player #', 'Result', 'BDR'] fieldnames += sort_players_by_num_games(games) fieldnames += ['Other'] - with open('games.csv', 'w', newline='') as f: + with open('out/games.csv', 'w', newline='') as f: f.writelines([','.join(fieldnames), "\n"]) - with open('games.csv', 'a', newline='') as f: + with open('out/games.csv', 'a', newline='') as f: writer = csv.DictWriter(f, fieldnames=fieldnames) for game_id, entry in sorted(games.items()): bdrs, termination = analysis[game_id] row = { - 'Game ID': "{}".format(entry.game_id, entry.game_id), - 'Seed': "{}".format(entry.seed, entry.seed), - 'Player #': entry.num_players, - 'Result': 'Win' if entry.won else termination, - 'BDR': len(bdrs), - } + 'Game ID': "{}".format(entry.game_id, entry.game_id), + 'Seed': "{}".format(entry.seed, entry.seed), + 'Player #': entry.num_players, + 'Result': 'Win' if entry.won else termination, + 'BDR': len(bdrs), + } for player in entry.players: col = player_mapping.get(player, None) if col is not None: @@ -145,5 +239,99 @@ if __name__ == "__main__": num_others = row.get('Other', 0) row['Other'] = num_others + 1 writer.writerow(row) - a = pandas.read_csv("games.csv") - a.to_html("games.html", escape=False) + + a = pandas.read_csv("out/games.csv") + a.to_html("out/games.html", escape=False) + + +def make_team_table(games, analysis): + stats = {3: {}, 4: {}, 5: {}} + for game_id, entry in games.items(): + normalized_players = sorted(map(lambda player: player_mapping.get(player, 'Other'), entry.players)) + if 'Other' in normalized_players: + continue + + num_p = len(normalized_players) + + key = ", ".join(normalized_players) + + bdr, termination = analysis[game_id] + + if key not in stats[num_p]: + stats[num_p][key] = { + 'Games': 0, + 'BDR': 0, + 'Win': 0, + 'Loss': 0, + 'Discard crit': 0, + 'Bomb crit': 0, + 'Strikeout': 0, + 'VTK': 0, + 'Lost Endgame': 0 + } + + stats[num_p][key]['Games'] += 1 + stats[num_p][key]['BDR'] += len(bdr) + if entry.won: + stats[num_p][key]['Win'] += 1 + else: + stats[num_p][key]['Loss'] += 1 + stats[num_p][key][termination] += 1 + + for num_p in range(3, 6): + filename = Path('out/teams_{}.csv'.format(num_p)) + with open(filename, 'w') as f: + writer = csv.DictWriter(f, fieldnames=['Team', 'Games', 'Win', 'Loss', 'BDR', 'Discard crit', 'Bomb crit', 'Strikeout', 'VTK', 'Lost Endgame']) + writer.writeheader() + for team, row in sorted(stats[num_p].items(), key=lambda item: -item[1]['Games']): + row['Team'] = team + writer.writerow(row) + + x = pandas.read_csv(filename) + x.to_html(filename.with_suffix('.html')) + + +def create_replay_links(ids: List[int], strategy: str): + outfile = Path('out/{}_links.csv'.format(strategy)) + with open(outfile, 'w') as f: + writer = csv.writer(f) + writer.writerow(["Game ID", "Result", "{} Replay Link".format(strategy)]) + for game_id in ids: + replay = get_game_json(game_id, strategy) + instance, actions = parse_json_game(replay) + game = HanabLiveGameState(instance) + for action in actions: + game.make_action(action) + bdrs, termination = describe_game(replay) + writer.writerow([game_id, 'Win' if game.is_won() else termination, link(game)]) + + x = pandas.read_csv(outfile) + x.to_html(outfile.with_suffix('.html'), render_links=True) + + +def main(): + games = collect_player_games() + analysis = analyze_games(games) + game_ids = sorted(int(key) for key in games.keys()) + + # This is the main table, tracking streaks, loss reasons, BDRs + make_results_table(games, analysis) + + # This tracks team statistics + make_team_table(games, analysis) + + # Additional endgame analysis stats: + # For the real games + make_endgame_tables(game_ids, None) + # For the cheating strategy + make_endgame_tables(game_ids, 'cheat') + # For the information strategy + make_endgame_tables(game_ids, 'info') + + # Create JSON replay links to the bot games to watch + create_replay_links(game_ids, 'cheat') + create_replay_links(game_ids, 'info') + + +if __name__ == "__main__": + main()