Compare commits

..

No commits in common. "main" and "endgames" have entirely different histories.

6 changed files with 37 additions and 355 deletions

View file

@ -1,9 +0,0 @@
# Streak Hunting spreadsheet
This is currently not in a state that you could easily run this.
If you want to, contact me. Anyway:
In order to run, assumes that you
- Have the PyHanabi sources available as the `hanabi` module, installed and set up PostgreSQL along with it
- Have the `endgame-analyzer` executable in the local folder
- Have the `rust-hanabi` executable in the local folder

4
bdr.py
View file

@ -9,11 +9,11 @@ def analyze_game(instance: HanabLiveInstance, actions: List[Action]) -> Tuple[Li
termination = '' termination = ''
game = HanabLiveGameState(instance) game = HanabLiveGameState(instance)
for action in actions: for action in actions:
if action.type == ActionType.Discard or (action.type == ActionType.Play and not game.is_playable(instance.deck[action.target])): if action.type == ActionType.Discard:
discard = instance.deck[action.target] discard = instance.deck[action.target]
if not game.is_trash(discard): if not game.is_trash(discard):
if game.is_critical(discard): if game.is_critical(discard):
termination = 'Discard crit' if action.type == ActionType.Discard else 'Bomb crit' termination = 'Discard crit'
break break
if discard.rank != 1: if discard.rank != 1:
if discard in game.deck[game.progress:]: if discard in game.deck[game.progress:]:

19
bots.py
View file

@ -1,19 +0,0 @@
import subprocess
from games import get_game_json, GAMES_PATH
def run_strategy(game_id: int, strategy: str):
if strategy not in ['cheat', 'info', 'random']:
print('Strategy has to be one of cheat, info or random')
return
get_game_json(game_id)
result = subprocess.run(['./rust_hanabi', '--file', str((GAMES_PATH / str(game_id))), '--json-output', '%-' + strategy, '--strategy', strategy])
if result.returncode != 0:
print('Failed to run game {} with cheating stratgey'.format(game_id))
def ensure_bot_game_exists(game_id: int, strategy: str):
filename = GAMES_PATH / '{}-{}'.format(game_id, strategy)
if not filename.exists():
run_strategy(game_id, strategy)

View file

@ -1,106 +0,0 @@
import json
import re
import subprocess
from typing import Dict, Optional
from games import GAMES_PATH
from bots import ensure_bot_game_exists
from pathlib import Path
DATA_FILE = Path('endgame-data.json')
if not DATA_FILE.exists():
DATA_FILE.write_text('{}')
with open(DATA_FILE, 'r') as f:
DATA: Dict = json.loads(f.read())
def analyze_game(filename: str):
max_draw_pile_size = 15
try:
result = subprocess.run(['./endgame-analyzer', '-f', filename, '-d', str(max_draw_pile_size), '--interactive', '0', '--quiet', '-r'], stdout=subprocess.PIPE, timeout=30)
raw_output = result.stdout
except subprocess.TimeoutExpired as time_err:
raw_output = time_err.stdout
output = raw_output.decode('utf-8')
# Now, parse all results that we obtained (unclear how many depending on whether we ran into the timeout)
probabilities = {}
m = re.search(r"Specified draw pile size of ([0-9]+) cannot be reached with specified replay\.\nReplay ends at turn [0-9]+ with score of ([0-9]+)\.", output, re.M)
if m:
won = '100' if m.group(2) == '25' else 0
for draw_pile_size in range(1, int(m.group(1)) + 1):
probabilities[str(draw_pile_size)] = won
for m in re.finditer('Probability with ([0-9]+) cards left in deck: .*/.* ~ ([0-9.]+)', output):
probabilities[str(m.group(1))] = m.group(2)
return probabilities
def full_analyze_game(filename: str):
max_draw_pile_size = 10
try:
result = subprocess.run(['./endgame-analyzer', '-f', filename, '-d', str(max_draw_pile_size), '-i', '0', '--all-clues', '-r', '--quiet'], stdout=subprocess.PIPE, timeout=30)
raw_output = result.stdout
except subprocess.TimeoutExpired as time_err:
raw_output = time_err.stdout
output = raw_output.decode('utf-8')
probabilities = {}
zero_dict = {
(('+' if clue_modifier >= 0 else '') + str(clue_modifier)): 0 for clue_modifier in range(-8, 9)
}
hundred_dict = {
(('+' if clue_modifier >= 0 else '') + str(clue_modifier)): 100 for clue_modifier in range(-8, 9)
}
m = re.search(r"Specified draw pile size of ([0-9]+) cannot be reached with specified replay\.\nReplay ends at turn [0-9]+ with score of ([0-9]+)\.", output, re.M)
if m:
won = hundred_dict if m.group(2) == '25' else zero_dict
for draw_pile_size in range(1, int(m.group(1)) + 1):
probabilities[str(draw_pile_size)] = won
for m in re.finditer('Probability with ([0-9]+) cards left in deck and [0-8] clues \((.[0-8])\).*: .*/.* ~ ([0-9.]*)', output):
if m.group(1) not in probabilities.keys():
probabilities[m.group(1)] = {}
probabilities[m.group(1)][m.group(2)] = m.group(3)
return probabilities
def full_analyze_game_cached(game_id: int, strategy: Optional[str] = None):
key = 'all' if strategy is None else 'all-{}'.format(strategy)
if strategy is not None:
ensure_bot_game_exists(game_id, strategy)
cached = DATA.get(key, {}).get(str(game_id), None)
if cached is not None:
return cached
result = full_analyze_game(str(GAMES_PATH / str(game_id)) + ('-{}'.format(strategy) if strategy is not None else ''))
if key not in DATA.keys():
DATA[key] = {}
DATA[key][game_id] = result
save_cache()
return result
def analyze_game_cached(game_id: int, strategy: Optional[str] = None):
key = 'normal' if strategy is None else 'normal-{}'.format(strategy)
if strategy is not None:
ensure_bot_game_exists(game_id, strategy)
cached = DATA.get(key, {}).get(str(game_id), None)
if cached is not None:
return cached
result = analyze_game(str(GAMES_PATH / str(game_id)) + ('-{}'.format(strategy) if strategy is not None else ''))
if key not in DATA.keys():
DATA[key] = {}
DATA[key][game_id] = result
save_cache()
return result
def save_cache():
with open(DATA_FILE, 'w') as f:
f.writelines(json.dumps(DATA, indent=2))

View file

@ -1,27 +0,0 @@
import json
from typing import Dict, Optional
from pathlib import Path
from hanabi.live.site_api import get
GAMES_PATH = Path('games')
if not GAMES_PATH.exists():
GAMES_PATH.mkdir(parents=True)
def get_game_json(game_id: int, strategy: Optional[str] = None) -> Dict:
filename = GAMES_PATH / (str(game_id) + ('-{}'.format(strategy) if strategy is not None else ''))
if filename.exists():
with open(filename, 'r') as f:
return json.load(f)
if strategy is not None:
print('Failed to load replay of {} strategy version of game with id {}'.format(strategy, game_id))
return {}
game = get("export/" + str(game_id))
with open(filename, 'w') as f:
f.write(json.dumps(game, indent=2))
return game

View file

@ -3,38 +3,27 @@ import requests_cache
import json import json
import csv import csv
import pandas import pandas
from typing import List, Optional, Dict
from pathlib import Path
from hanabi.database import global_db_connection_manager from hanabi.database import global_db_connection_manager
from hanabi.live.hanab_live import parse_json_game, HanabLiveGameState from hanabi.live.site_api import get
from hanabi.live.compress import link
from bdr import describe_game from bdr import describe_game
from endgames import analyze_game_cached, full_analyze_game_cached from endgames import analyze_game_cached
from games import get_game_json
# Init db connection # Init db connection
global_db_connection_manager.read_config() global_db_connection_manager.read_config()
global_db_connection_manager.connect() global_db_connection_manager.connect()
session = requests_cache.CachedSession('.hanab-live.cache',expire_after=30000) session = requests_cache.CachedSession('.hanab-live.cache',expire_after=300)
OUT_PATH = Path('out')
if not OUT_PATH.exists():
OUT_PATH.mkdir(parents=True)
player_mapping = { player_mapping = {
'RamaNoVarjan': 'Ramanujan', 'RamaNoVarjan': 'Ramanujan',
'RamaNoVarjan2': 'Ramanujan',
'purplejoe2': 'PurpleJoe', 'purplejoe2': 'PurpleJoe',
'PurpleJoeVar': 'PurpleJoe', 'PurpleJoeVar': 'PurpleJoe',
'PurpleJoeVar2': 'PurpleJoe',
'yagami_blank': 'Yagami', 'yagami_blank': 'Yagami',
'yagami_black': 'Yagami', 'yagami_black': 'Yagami',
'MarkusKahlsen': 'Markus', 'MarkusKahlsen': 'Markus',
'NoVarkusKahlsen': 'Markus', 'NoVarkusKahlsen': 'Markus',
'NoVarkusKarlsen': 'Markus',
'spring': 'spring', 'spring': 'spring',
'str8tsknacker': 'str8tsknacker', 'str8tsknacker': 'str8tsknacker',
'novarknacker': 'str8tsknacker', 'novarknacker': 'str8tsknacker',
@ -76,7 +65,7 @@ def get_player_games(player: str):
return json.loads(r.text) return json.loads(r.text)
def collect_player_games() -> Dict[int, Entry]: def collect_player_games():
global_games = {} global_games = {}
for player in player_mapping.keys(): for player in player_mapping.keys():
print('Parsing games for player {}'.format(player)) print('Parsing games for player {}'.format(player))
@ -109,26 +98,18 @@ def collect_player_games() -> Dict[int, Entry]:
def analyze_games(games): def analyze_games(games):
retval = {} retval = {}
for game_id in games.keys(): for game_id in games.keys():
bdrs, termination = describe_game(get_game_json(game_id)) game = get("export/" + str(game_id))
bdrs, termination = describe_game(game)
retval[game_id] = bdrs, termination retval[game_id] = bdrs, termination
return retval return retval
def analyze_endgames(games):
def analyze_endgames(games, strategy: Optional[str] = None):
retval = {} retval = {}
for game_id in games: for game_id in games.keys():
print('Analysing endgames {} of game {}'.format('with strategy {}'.format(strategy) if strategy is not None else '', game_id)) print('Analysing endgames of game {}'.format(game_id))
result = analyze_game_cached(game_id, strategy) result = analyze_game_cached(game_id)
retval[game_id] = result
return retval
def full_analyze_endgames(games, strategy: Optional[str] = None):
retval = {}
for game_id in games:
print('Analysing all endgames {} of game {}'.format('with strategy {}'.format(strategy) if strategy is not None else '', game_id))
result = full_analyze_game_cached(game_id, strategy)
retval[game_id] = result retval[game_id] = result
print(result)
return retval return retval
@ -143,77 +124,19 @@ def sort_players_by_num_games(games_dict):
return sorted(player_cols, key = lambda col: -nums[col]) return sorted(player_cols, key = lambda col: -nums[col])
def lookup_val(endgame_dict, clue_modifier) -> str: if __name__ == "__main__":
if clue_modifier > 0: games = collect_player_games()
for lookup in range(clue_modifier, 0, -1): analysis = analyze_games(games)
val = endgame_dict.get('+' + str(lookup), None) endgames = analyze_endgames(games)
if val is not None:
return val
if clue_modifier < 0:
for lookup in range(clue_modifier, 0):
val = endgame_dict.get(str(lookup))
if val is not None:
return val
retval = endgame_dict.get('+0', None)
return retval
def make_endgame_tables(ids: List[int], strategy: Optional[str] = None):
endgames = analyze_endgames(ids, strategy)
postfix = '-{}'.format(strategy) if strategy is not None else ''
main_fname = OUT_PATH / ('endgames' + postfix + '.csv')
fieldnames = ['Game ID'] + [str(i) for i in range(1, 16)]
with open(main_fname, 'w', newline='') as f:
f.writelines([','.join(fieldnames), "\n"])
with open(main_fname, 'a', newline='') as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
for game_id, endgame in sorted(endgames.items()):
endgame['Game ID'] = "<a href='https://hanab.live/replay/{}'>{}</a>".format(game_id, game_id)
writer.writerow(endgame)
special_fname = str(OUT_PATH / ('endgames' + postfix + '_modifier_{}.csv'))
x = pandas.read_csv(main_fname)
x.to_html(main_fname.with_suffix('.html'), escape=False)
all_endgames = full_analyze_endgames(ids, strategy)
fieldnames = ['Game ID'] + [str(i) for i in range(1, 11)]
for clue_modifier in range(-2, 3):
filename = special_fname.format(clue_modifier)
with open(filename, 'w') as f:
f.writelines([','.join(fieldnames), "\n"])
with open(filename, 'a') as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
for game_id, endgame in sorted(all_endgames.items()):
# print(endgame)
row = {'Game ID': game_id}
for deck_size in range(1, 11):
val = lookup_val(endgame.get(str(deck_size), {}), clue_modifier)
if val is not None:
row[str(deck_size)] = val
else:
print("WARN: No results found for game {} and deck size {}: {}".format(game_id, deck_size, endgame.get(str(deck_size))))
writer.writerow(row)
print('processed file {}'.format(filename))
x = pandas.read_csv(filename)
x.to_html(Path(filename).with_suffix('.html'), escape=False)
def make_results_table(games, analysis):
streaks = {} streaks = {}
fieldnames = ['Game ID', 'Seed', 'Player #', 'Result', 'BDR'] fieldnames = ['Game ID', 'Seed', 'Player #', 'Result', 'BDR']
fieldnames += sort_players_by_num_games(games) fieldnames += sort_players_by_num_games(games)
fieldnames += ['Other'] fieldnames += ['Other']
with open('out/games.csv', 'w', newline='') as f: with open('games.csv', 'w', newline='') as f:
f.writelines([','.join(fieldnames), "\n"]) f.writelines([','.join(fieldnames), "\n"])
with open('out/games.csv', 'a', newline='') as f: with open('games.csv', 'a', newline='') as f:
writer = csv.DictWriter(f, fieldnames=fieldnames) writer = csv.DictWriter(f, fieldnames=fieldnames)
for game_id, entry in sorted(games.items()): for game_id, entry in sorted(games.items()):
bdrs, termination = analysis[game_id] bdrs, termination = analysis[game_id]
@ -240,98 +163,18 @@ def make_results_table(games, analysis):
row['Other'] = num_others + 1 row['Other'] = num_others + 1
writer.writerow(row) writer.writerow(row)
a = pandas.read_csv("out/games.csv") fieldnames = ['Game ID'] + [str(i) for i in range(1, 16)]
a.to_html("out/games.html", escape=False) with open('endgames.csv', 'w', newline='') as f:
f.writelines([','.join(fieldnames), "\n"])
with open('endgames.csv', 'a', newline='') as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
for game_id, endgame in sorted(endgames.items()):
endgame['Game ID'] = "<a href='https://hanab.live/replay/{}'>{}</a>".format(game_id, game_id)
writer.writerow(endgame)
def make_team_table(games, analysis): a = pandas.read_csv("games.csv")
stats = {3: {}, 4: {}, 5: {}} a.to_html("games.html", escape=False)
for game_id, entry in games.items():
normalized_players = sorted(map(lambda player: player_mapping.get(player, 'Other'), entry.players))
if 'Other' in normalized_players:
continue
num_p = len(normalized_players) b = pandas.read_csv('endgames.csv')
b.to_html("endgames.html", escape=False)
key = ", ".join(normalized_players)
bdr, termination = analysis[game_id]
if key not in stats[num_p]:
stats[num_p][key] = {
'Games': 0,
'BDR': 0,
'Win': 0,
'Loss': 0,
'Discard crit': 0,
'Bomb crit': 0,
'Strikeout': 0,
'VTK': 0,
'Lost Endgame': 0
}
stats[num_p][key]['Games'] += 1
stats[num_p][key]['BDR'] += len(bdr)
if entry.won:
stats[num_p][key]['Win'] += 1
else:
stats[num_p][key]['Loss'] += 1
stats[num_p][key][termination] += 1
for num_p in range(3, 6):
filename = Path('out/teams_{}.csv'.format(num_p))
with open(filename, 'w') as f:
writer = csv.DictWriter(f, fieldnames=['Team', 'Games', 'Win', 'Loss', 'BDR', 'Discard crit', 'Bomb crit', 'Strikeout', 'VTK', 'Lost Endgame'])
writer.writeheader()
for team, row in sorted(stats[num_p].items(), key=lambda item: -item[1]['Games']):
row['Team'] = team
writer.writerow(row)
x = pandas.read_csv(filename)
x.to_html(filename.with_suffix('.html'))
def create_replay_links(ids: List[int], strategy: str):
outfile = Path('out/{}_links.csv'.format(strategy))
with open(outfile, 'w') as f:
writer = csv.writer(f)
writer.writerow(["Game ID", "Result", "{} Replay Link".format(strategy)])
for game_id in ids:
replay = get_game_json(game_id, strategy)
instance, actions = parse_json_game(replay)
game = HanabLiveGameState(instance)
for action in actions:
game.make_action(action)
bdrs, termination = describe_game(replay)
writer.writerow([game_id, 'Win' if game.is_won() else termination, link(game)])
x = pandas.read_csv(outfile)
x.to_html(outfile.with_suffix('.html'), render_links=True)
def main():
games = collect_player_games()
analysis = analyze_games(games)
game_ids = sorted(int(key) for key in games.keys())
# This is the main table, tracking streaks, loss reasons, BDRs
make_results_table(games, analysis)
# This tracks team statistics
make_team_table(games, analysis)
# Additional endgame analysis stats:
# For the real games
make_endgame_tables(game_ids, None)
# For the cheating strategy
make_endgame_tables(game_ids, 'cheat')
# For the information strategy
make_endgame_tables(game_ids, 'info')
# Create JSON replay links to the bot games to watch
create_replay_links(game_ids, 'cheat')
create_replay_links(game_ids, 'info')
if __name__ == "__main__":
main()