import json
from typing import Dict, List, Optional

import platformdirs
import requests_cache
import psycopg2.extras

import hanabi.live.hanab_live
import hanabi.hanab_game

import constants
import games_db_interface
import database
import utils
from database import conn_manager
from log_setup import logger
from config import config_manager

# Location of the on-disk HTTP cache. Computed once so that the session and the path printed below always agree.
_REQUESTS_CACHE_PATH = platformdirs.user_cache_dir(constants.APP_NAME) + '/hanab.live.requests-cache'

# This ensures that we do not pose too many requests to the website, especially while in development where we
# might frequently re-initialize the database.
session = requests_cache.CachedSession(
    _REQUESTS_CACHE_PATH,
    urls_expire_after={
        # Game exports will never change, so cache them forever
        'hanab.live/export/*': requests_cache.NEVER_EXPIRE,
        # Cache history requests for a limited time (see constants.USER_HISTORY_CACHE_TIME)
        'hanab.live/api/v1/history-full/*': constants.USER_HISTORY_CACHE_TIME
    }
)
print(_REQUESTS_CACHE_PATH)


class GameInfo:
    """
    Plain container for the per-game data that is extracted from a history entry and stored in the database.
    """

    def __init__(self, game_id: int, num_players: int, variant_id: int, seed: str, score: int, num_turns: int,
                 user_ids: List[int], normalized_usernames: List[str]):
        self.game_id = game_id
        self.num_players = num_players
        self.variant_id = variant_id
        self.seed = seed
        self.score = score
        self.num_turns = num_turns
        # League-specific user ids of the participants, in the same order as normalized_usernames
        self.user_ids = user_ids
        self.normalized_usernames = normalized_usernames


def fetch_games_for_player(username: str, latest_game_id: int):
    """
    Requests the game history of a player from hanab.live (through the cached session).
    @param username: Username to request the history for
    @param latest_game_id: Only games with an id strictly greater than this are requested
    @return: The decoded JSON response (callers in this module treat it as a list of game dictionaries)
    @raise ConnectionError: If the server did not answer with HTTP status 200
    """
    logger.verbose("Fetching games for username {} more recent than id {}".format(username, latest_game_id))
    url = "https://hanab.live/api/v1/history-full/{}?start={}".format(username, latest_game_id + 1)
    response = session.get(url)
    if response.status_code != 200:
        err_msg = "Failed to fetch games for username {}, requested URL {}".format(username, url)
        logger.error(err_msg)
        raise ConnectionError(err_msg)
    return json.loads(response.text)


def process_game_entry(game_json: Dict, username_dict: Dict, variant_ids: List[int]) -> Optional[GameInfo]:
    """
    Checks whether a single history entry qualifies as a league game and converts it to a GameInfo.
    @param game_json: One game entry as returned by fetch_games_for_player
    @param username_dict: Mapping of normalized username -> (league specific) user id of all registered players
    @param variant_ids: Variant ids that are accepted
    @return: The GameInfo for this game, or None if the game was rejected
    """
    logger.debug("Processing entry {}".format(game_json))

    # Parse game properties
    game_id = game_json["id"]
    players = game_json["playerNames"]
    num_players = len(players)
    seed = game_json["seed"]
    score = game_json["score"]
    num_turns = game_json["numTurns"]
    start_time = game_json["datetimeStarted"]
    game_options = game_json["options"]
    var_id = game_options["variantID"]

    normalized_usernames = [utils.normalize_username(username) for username in players]

    # Now, check if the game is one that we accept for league.
    # NOTE(review): a list (not a generator) is used, so every check is evaluated even if an earlier one fails -
    # presumably so that each check can report its own rejection reason; confirm before short-circuiting.
    if not all([
        utils.are_game_options_allowed(game_id, game_options),
        utils.is_player_count_allowed(game_id, num_players),
        utils.is_game_id_allowed(game_id),
        utils.is_time_allowed(game_id, start_time)
    ]):
        return None
    if var_id not in variant_ids:
        logger.debug("Rejected game {} due to invalid variant id {}".format(game_id, var_id))
        return None

    # Everything matches, so we can parse the participants now
    user_ids = []  # This will be a list of the (league specific) user_id's that played this game.
    for normalized_username in normalized_usernames:
        user_id = username_dict.get(normalized_username)
        if user_id is None:
            logger.debug("Rejected game {} due to unregistered participant {}".format(game_id, normalized_username))
            return None
        user_ids.append(user_id)

    # Store the actual variant of the game (this used to be hard-coded to 0, discarding the validated var_id)
    return GameInfo(game_id, num_players, var_id, seed, score, num_turns, user_ids, normalized_usernames)


def fetch_games_for_all_players() -> Dict[int, GameInfo]:
    """
    Fetches the new games of all registered players and filters them for valid league games.
    Nothing is written to the database here, see store_new_games for that.
    @return: Mapping of game id -> GameInfo for all accepted games
    @raise ConnectionError: If fetching the history of some player failed
    """
    logger.info("Fetching new games.")
    cur = conn_manager.get_new_cursor()
    cur.execute("SELECT user_accounts.normalized_username, user_accounts.user_id, downloads.latest_game_id "
                "FROM user_accounts "
                "LEFT OUTER JOIN downloads "
                " ON user_accounts.normalized_username = downloads.normalized_username"
                )
    # This will be a mapping of normalized username -> user ID that we built from the DB data
    username_dict = {}

    # This will be a mapping of game id -> JSON data that we get from hanab.live, where we will collect all the
    # possibly relevant games now. Keying by id de-duplicates games that show up in several players' histories.
    games: Dict[int, Dict] = {}
    for username, user_id, latest_game_id in cur.fetchall():
        username_dict[username] = user_id
        # Use the starting id as fallback if we have not downloaded any games for this player yet.
        if latest_game_id is None:
            latest_game_id = config_manager.get_config().starting_game_id
        player_games = fetch_games_for_player(username, latest_game_id)
        for game in player_games:
            games[game['id']] = game

    logger.info("Found {} potential league game(s) to process.".format(len(games)))

    allowed_variants = database.get_variant_ids()

    # This will hold the processed games that we will add to the database.
    good_games: Dict[int, GameInfo] = {}
    for game_id, game in games.items():
        game_info = process_game_entry(game, username_dict, allowed_variants)
        if game_info is not None:
            good_games[game_id] = game_info
    logger.verbose("Found {} valid league game(s).".format(len(good_games)))
    return good_games


def store_new_games(games: Dict[int, GameInfo]):
    """
    Stores the given games, their participants and the per-player download progress in the database.
    All insertions are committed together at the end.
    @param games: Mapping of game id -> GameInfo, as returned by fetch_games_for_all_players
    """
    conn = conn_manager.get_connection()
    cur = conn.cursor()

    games_vals = []
    game_participants_vals = []
    latest_game_ids: Dict[str, int] = {}

    # Now, iterate over all games and convert to tuples for insertion
    for game in sorted(games.values(), key=lambda game_info: game_info.game_id):
        games_vals.append(
            (game.game_id, game.num_players, game.variant_id, game.seed, game.score, game.num_turns)
        )
        for player_id in game.user_ids:
            game_participants_vals.append((game.game_id, player_id))
        # Note that this gives the maximum in the end since we process the games in order
        for normalized_username in game.normalized_usernames:
            latest_game_ids[normalized_username] = game.game_id

    # Do the insertions
    # Notice that on conflict we can just do nothing: In that case, we already have the game in the DB for some reason
    # (for example, because we forced a download refresh)
    psycopg2.extras.execute_values(
        cur,
        "INSERT INTO games (id, num_players, variant_id, seed, score, num_turns) "
        "VALUES %s "
        "ON CONFLICT (id) DO NOTHING",
        games_vals
    )
    psycopg2.extras.execute_values(
        cur,
        "INSERT INTO game_participants (game_id, user_id) "
        "VALUES %s "
        "ON CONFLICT (game_id, user_id) DO NOTHING",
        game_participants_vals
    )
    # Here, we want to update on insertion conflict
    psycopg2.extras.execute_values(
        cur,
        "INSERT INTO downloads (normalized_username, latest_game_id) "
        "VALUES %s "
        "ON CONFLICT (normalized_username) "
        "DO UPDATE SET (normalized_username, latest_game_id) = (EXCLUDED.normalized_username, EXCLUDED.latest_game_id)",
        # Materialize the view, execute_values is documented to take a sequence
        list(latest_game_ids.items())
    )
    logger.info("Added {} game(s) to database.".format(len(games)))
    # We only commit after performing all insertions. This guarantees that the download table is always in sync
    # with the actual games stored in the database.
    conn.commit()


def detailed_fetch_game(game_id: int) -> bool:
    """
    Fetches full game details from the server and stores it in local DB if this game is a league game.
    @param game_id: Game ID from hanab.live
    @return: Whether the processed game was accepted as a league game, i.e. inserted into the DB
    @raise ConnectionError: If the server did not answer with HTTP status 200
    """
    logger.verbose("Fetching details on game {}.".format(game_id))
    url = "https://hanab.live/export/{}".format(game_id)
    response = session.get(url)
    if response.status_code != 200:
        err_msg = "Failed to fetch game {}, requested URL {}".format(game_id, url)
        logger.error(err_msg)
        raise ConnectionError(err_msg)
    game_json = json.loads(response.text)

    game_id = game_json["id"]
    players = game_json["players"]
    num_players = len(players)
    seed = game_json["seed"]
    game_options = game_json.get("options", {})
    var_name = game_options.get("variant", "No Variant")

    if not utils.are_game_options_allowed(game_id, game_options):
        return False

    if not utils.is_player_count_allowed(game_id, num_players):
        return False

    if not utils.is_game_id_allowed(game_id):
        return False

    var_id = database.get_variant_id(var_name)
    if var_id is None:
        # Log the variant name here: the id is always None in this branch and carries no information
        logger.debug("Rejected game {} due to invalid variant {}".format(game_id, var_name))
        return False

    # All game options are ok, now check that all participants are registered.
    normalized_usernames = [utils.normalize_username(username) for username in players]
    user_ids = database.get_user_ids_from_normalized_usernames(normalized_usernames)
    # The return value here is a str if there was an unregistered participant
    if isinstance(user_ids, str):
        logger.debug("Rejected game {} due to unregistered participant {}".format(game_id, user_ids))
        return False

    # Now, we can start to actually process the game details.
    # The game is parsed exactly once, and only after the cheap rejection checks above have passed.
    instance, actions = hanabi.live.hanab_live.parse_json_game(game_json, False)
    game = hanabi.hanab_game.GameState(instance)

    # In order to figure out the score, we will need to play the game once
    for action in actions:
        game.make_action(action)

    conn = conn_manager.get_connection()
    cur = conn_manager.get_new_cursor()

    # Insert the game into the database if not present
    cur.execute("INSERT INTO games "
                "(id, num_players, variant_id, seed, score, num_turns) "
                "VALUES "
                "(%s, %s, %s, %s, %s, %s) "
                "ON CONFLICT (id) DO NOTHING",
                (game_id, num_players, var_id, seed, game.score, len(actions)))
    # TODO Max: Check if len(actions) is the correct number here, in case there was a VTK action, we might want to subtract one

    game_participants_vals = [(game_id, user_id, seat) for seat, user_id in enumerate(user_ids)]

    # This inserts the game participants now.
    # Note that the participants might already be stored, but not their seat (since we do not know the seat when only
    # getting a game list from the server on fetching the games played by some player)
    psycopg2.extras.execute_values(
        cur,
        "INSERT INTO game_participants "
        "(game_id, user_id, seat) "
        "VALUES %s "
        "ON CONFLICT (game_id, user_id) DO UPDATE "
        "SET seat = EXCLUDED.seat",
        game_participants_vals
    )
    # DB is now in a consistent state again: We made sure the game and its participants are added.
    conn.commit()

    # It remains to store the seed and action data for this game
    # Note that we store the seed first. This ensures that whenever we have actions stored, we also have the seed stored.
    games_db_interface.store_deck_for_seed(seed, instance.deck)
    games_db_interface.store_actions(game_id, actions)
    logger.debug("Fetched all game details of game {}.".format(game_id))

    # Do some sanity checks that loading the stored data did not change it
    assert actions == games_db_interface.load_actions(game_id)
    assert instance.deck == games_db_interface.load_deck(seed)

    return True


def fetch_all_game_details():
    """
    Fetches and stores the detailed data (deck and actions) of all games in the database that lack it.
    @raise ConnectionError: If fetching some game from the server failed
    """
    logger.info("Fetching detailed game data for all games.")
    cur = conn_manager.get_new_cursor()

    # Get all games with no actions
    cur.execute("SELECT id FROM games "
                "LEFT OUTER JOIN game_actions"
                "  ON games.id = game_actions.game_id "
                "WHERE game_actions.game_id IS NULL"
                )
    for (game_id,) in cur.fetchall():
        detailed_fetch_game(game_id)

    # Just to make sure, check that we have data on all seeds
    cur.execute("SELECT id FROM games "
                "LEFT OUTER JOIN seeds"
                "  ON seeds.seed = games.seed "
                "WHERE seeds.seed IS NULL"
                )
    games_without_seeds = [game_id for (game_id,) in cur.fetchall()]
    if games_without_seeds:
        # The ids are not strings, so they have to be converted before they can be joined
        logger.warning("Detected the following games without seed data, but with actions already stored: {}. "
                       "Fetching them now".format(", ".join(str(game_id) for game_id in games_without_seeds))
                       )
    for game_id in games_without_seeds:
        detailed_fetch_game(game_id)