forked from Hanabi/hanabi-league
309 lines
12 KiB
Python
309 lines
12 KiB
Python
import json
|
|
from dataclasses import dataclass
|
|
from typing import Dict, List, Optional
|
|
|
|
import platformdirs
|
|
import requests_cache
|
|
import psycopg2.extras
|
|
|
|
import hanabi.live.hanab_live
|
|
import hanabi.hanab_game
|
|
|
|
import constants
|
|
import games_db_interface
|
|
import database
|
|
import utils
|
|
from database import conn_manager
|
|
from log_setup import logger
|
|
from config import config_manager
|
|
|
|
# Module-level HTTP session that is shared by all requests issued from this file.
# This ensures that we do not pose too many requests to the website, especially while in development where we
# might frequently re-initialize the database.
session = requests_cache.CachedSession(
    # The cache lives below the per-user cache directory of this application
    platformdirs.user_cache_dir(constants.APP_NAME) + '/hanab.live.requests-cache',
    urls_expire_after={
        # Game exports will never change, so cache them forever
        'hanab.live/export/*': requests_cache.NEVER_EXPIRE,
        # Cache history requests only for a limited time (constants.USER_HISTORY_CACHE_TIME, 5 minutes according
        # to the original note here), this seems like a reasonable time
        'hanab.live/api/v1/history-full/*': constants.USER_HISTORY_CACHE_TIME
    }
)
|
|
|
|
@dataclass
class GameInfo:
    """
    Summary of a single game that was accepted as a league game, as built by process_game_entry.

    The field order is part of the interface: instances are constructed positionally.
    """
    # Id of the game as reported by hanab.live
    game_id: int
    # Number of players that took part in the game
    num_players: int
    # Variant id taken from the game options
    variant_id: int
    # Seed of the game as reported by hanab.live
    seed: str
    # Score reported by hanab.live
    score: int
    # Number of turns reported by hanab.live
    num_turns: int
    # League specific user ids of the participants, in the same order as normalized_usernames
    user_ids: List[int]
    # Normalized usernames of the participants, in the order the server listed the players
    normalized_usernames: List[str]
    # Start time, kept as the string delivered by the server
    datetime_started: str
    # End time, kept as the string delivered by the server
    datetime_ended: str
|
|
|
|
|
|
def fetch_games_for_player(username: str, latest_game_id: int):
    """
    Requests the game history of one player from hanab.live, restricted to games newer than a given id.

    The request goes through the module-level cached session, so a repeated call may be answered from the cache.

    @param username: Username to request the history for
    @param latest_game_id: Id of the newest game that is already known, the request starts at the id after it
    @return: The decoded JSON body of the response
    @raise ConnectionError: If the server did not answer with HTTP status 200
    """
    logger.verbose(f"Fetching games for username {username} more recent than id {latest_game_id}")

    first_requested_id = latest_game_id + 1
    url = f"https://hanab.live/api/v1/history-full/{username}?start={first_requested_id}"
    response = session.get(url)

    # Success path first, everything below is error handling
    if response.status_code == 200:
        return json.loads(response.text)

    err_msg = f"Failed to fetch games for username {username}, requested URL {url}"
    logger.error(err_msg)
    raise ConnectionError(err_msg)
|
|
|
|
|
|
def process_game_entry(game_json: Dict, username_dict: Dict, variant_ids: List[int]) -> Optional[GameInfo]:
    """
    Converts one game entry of a player history into a GameInfo, provided the game is accepted for the league.

    @param game_json: Game entry as delivered by the server. The keys "id", "playerNames", "seed", "score",
        "numTurns", "datetimeStarted", "datetimeFinished" and "options" (containing "variantID") are read.
    @param username_dict: Mapping of normalized username -> (league specific) user id of all registered players
    @param variant_ids: Ids of the variants that are accepted for league games
    @return: The parsed game, or None if the game was rejected
    @raise KeyError: If one of the keys listed above is missing from game_json
    """
    logger.debug("Processing entry {}".format(game_json))

    # Parse game properties
    game_id = game_json["id"]
    players = game_json["playerNames"]
    num_players = len(players)
    seed = game_json["seed"]
    score = game_json["score"]
    num_turns = game_json["numTurns"]
    start_time = game_json["datetimeStarted"]
    end_time = game_json["datetimeFinished"]

    game_options = game_json["options"]
    var_id = game_options["variantID"]

    normalized_usernames = [utils.normalize_username(username) for username in players]

    # Now, check if the game is one that we accept for league.
    # The list is built eagerly on purpose, so every single check runs even if an earlier one already failed
    # (presumably each helper reports its own rejection reason, they all receive the game id - verify in utils).
    if not all([
        utils.are_game_options_allowed(game_id, game_options),
        utils.is_player_count_allowed(game_id, num_players),
        utils.is_game_id_allowed(game_id),
        utils.is_time_allowed(game_id, start_time)
    ]):
        return None

    if var_id not in variant_ids:
        logger.debug("Rejected game {} due to invalid variant id {}".format(game_id, var_id))
        return None

    # Everything matches, so we can parse the participants now
    user_ids = []  # This will be a list of the (league specific) user_id's that played this game.
    for normalized_username in normalized_usernames:
        user_id = username_dict.get(normalized_username, None)
        if user_id is None:
            # A single unregistered participant disqualifies the whole game
            logger.debug("Rejected game {} due to unregistered participant {}".format(game_id, normalized_username))
            return None
        user_ids.append(user_id)

    return GameInfo(
        game_id, num_players, var_id, seed, score, num_turns, user_ids, normalized_usernames, start_time, end_time
    )
|
|
|
|
|
|
def fetch_games_for_all_players():
    """
    Downloads the not yet known games of all registered players and filters them for league games.

    For every row of user_accounts, the history of that player is requested starting after the latest game id
    recorded in the downloads table, or after the configured starting game id if no such record exists.
    The fetched games are collected by game id, so a game returned for several players is only processed once.

    @return: Mapping of game id -> GameInfo for every fetched game that process_game_entry accepted
    """
    logger.info("Fetching new games.")
    cursor = conn_manager.get_new_cursor()
    cursor.execute(
        "SELECT user_accounts.normalized_username, user_accounts.user_id, downloads.latest_game_id "
        "FROM user_accounts "
        "LEFT OUTER JOIN downloads "
        " ON user_accounts.normalized_username = downloads.normalized_username"
    )

    # normalized username -> user id, built from the rows of the query above
    user_id_by_name = {}
    # game id -> JSON entry from hanab.live for every game that might be relevant
    candidates: Dict[int, Dict] = {}

    for name, user_id, last_known_id in cursor.fetchall():
        user_id_by_name[name] = user_id
        # Players without a download record start at the configured starting id
        start_after = (
            last_known_id if last_known_id is not None
            else config_manager.get_config().starting_game_id
        )
        candidates.update((entry['id'], entry) for entry in fetch_games_for_player(name, start_after))

    logger.info("Found {} potential league game(s) to process.".format(len(candidates)))

    allowed_variants = database.get_variant_ids()

    # Process every candidate exactly once (the name mapping is complete by now) and keep the accepted ones
    processed = (
        (game_id, process_game_entry(entry, user_id_by_name, allowed_variants))
        for game_id, entry in candidates.items()
    )
    good_games: Dict[int, GameInfo] = {
        game_id: game_info for game_id, game_info in processed if game_info is not None
    }

    logger.verbose("Found {} valid league game(s).".format(len(good_games)))
    return good_games
|
|
|
|
|
|
def store_new_games(games: Dict[int, GameInfo]):
    """
    Writes the given games, their participants and the per-player download progress to the database.

    All three inserts run on the same cursor and are committed together at the very end.

    @param games: Mapping of game id -> GameInfo of the games to store
    """
    connection = conn_manager.get_connection()
    cursor = connection.cursor()

    # Walk the games in ascending id order, the download progress below relies on it
    ordered_games = sorted(games.values(), key=lambda info: info.game_id)

    # One row per game
    game_rows = [
        (g.game_id, g.num_players, g.variant_id, g.seed, g.score, g.num_turns, g.datetime_started, g.datetime_ended)
        for g in ordered_games
    ]
    # One row per (game, participant) pair
    participant_rows = [(g.game_id, user_id) for g in ordered_games for user_id in g.user_ids]
    # Later games overwrite earlier ones, so every player ends up with the maximum id among their games
    newest_game_ids: Dict[str, int] = {
        name: g.game_id for g in ordered_games for name in g.normalized_usernames
    }

    # Existing rows are left untouched on conflict: in that case the game is in the DB already for some reason
    # (for example, because a download refresh was forced)
    psycopg2.extras.execute_values(
        cursor,
        "INSERT INTO games (id, num_players, variant_id, seed, score, num_turns, datetime_started, datetime_finished) "
        "VALUES %s "
        "ON CONFLICT (id) DO NOTHING",
        game_rows
    )
    psycopg2.extras.execute_values(
        cursor,
        "INSERT INTO game_participants (game_id, user_id) "
        "VALUES %s "
        "ON CONFLICT (game_id, user_id) DO NOTHING",
        participant_rows
    )
    # The download progress, in contrast, is overwritten on conflict
    psycopg2.extras.execute_values(
        cursor,
        "INSERT INTO downloads (normalized_username, latest_game_id) "
        "VALUES %s "
        "ON CONFLICT (normalized_username) "
        "DO UPDATE SET (normalized_username, latest_game_id) = (EXCLUDED.normalized_username, EXCLUDED.latest_game_id)",
        newest_game_ids.items()
    )
    logger.info("Added {} game(s) to database.".format(len(games)))

    # A single commit after all insertions guarantees that the download table is always in sync with the
    # actual games stored in the database.
    connection.commit()
|
|
|
|
|
|
def detailed_fetch_game(game_id: int) -> bool:
    """
    Fetches full game details from the server and stores it in local DB if this game is a league game.
    @warning: Game data has to be present in database already, game details will then be added.
    @param game_id: Game ID from hanab.live
    @return: Whether the processed game was accepted as a league game, i.e. inserted into the DB
    @raise ConnectionError: If the server did not answer the export request with HTTP status 200
    """
    logger.verbose("Fetching details on game {}.".format(game_id))
    url = "https://hanab.live/export/{}".format(game_id)
    response = session.get(url)
    if response.status_code != 200:
        err_msg = "Failed to fetch game {}, requested URL {}".format(game_id, url)
        logger.error(err_msg)
        raise ConnectionError(err_msg)
    game_json = json.loads(response.text)

    # From here on, the id reported by the server is used
    game_id = game_json["id"]
    players = game_json["players"]
    num_players = len(players)
    seed = game_json["seed"]
    # Missing options fall back to an empty dict and a missing variant name to "No Variant"
    game_options = game_json.get("options", {})
    var_name = game_options.get("variant", "No Variant")

    if not utils.are_game_options_allowed(game_id, game_options):
        return False

    if not utils.is_player_count_allowed(game_id, num_players):
        return False

    if not utils.is_game_id_allowed(game_id):
        return False

    var_id = database.get_variant_id(var_name)
    if var_id is None:
        # There is no id to report in this branch, so log the variant name that could not be resolved
        logger.debug("Rejected game {} due to invalid variant {}".format(game_id, var_name))
        return False

    # All game options are ok, now check that every participant is registered.
    normalized_usernames = [utils.normalize_username(username) for username in players]
    user_ids = database.get_user_ids_from_normalized_usernames(normalized_usernames)
    # The return value here is a str if there was an unregistered participant
    if isinstance(user_ids, str):
        logger.debug("Rejected game {} due to unregistered participant {}".format(game_id, user_ids))
        return False

    # Now, we can start to actually process the game details.
    # The game is parsed only once and only after all the rejection checks above have passed.
    instance, actions = hanabi.live.hanab_live.parse_json_game(game_json, False)
    game = hanabi.hanab_game.GameState(instance)
    # Play the game once. The resulting state is not read in this function.
    # NOTE(review): the original comment states this is needed to figure out the score - confirm whether the
    # replay is still required.
    for action in actions:
        game.make_action(action)

    conn = conn_manager.get_connection()
    cur = conn_manager.get_new_cursor()
    # Note that we assume the game is present in the database already

    # The seat of a participant is their position in the list of user ids
    game_participants_vals = [(game_id, user_id, seat) for seat, user_id in enumerate(user_ids)]

    # This inserts the game participants now.
    # Note that the participants might already be stored, but not their seat (since we do not know the seat when only
    # getting a game list from the server on fetching the games played by some player)
    psycopg2.extras.execute_values(
        cur,
        "INSERT INTO game_participants "
        "(game_id, user_id, seat) "
        "VALUES %s "
        "ON CONFLICT (game_id, user_id) DO UPDATE "
        "SET seat = EXCLUDED.seat",
        game_participants_vals
    )
    # DB is now in a consistent state again: We made sure the game and its participants are added.
    conn.commit()

    # It remains to store the seed and action data for this game
    # Note that we store the seed first. This ensures that whenever we have actions stored, we also have the seed stored.
    games_db_interface.store_deck_for_seed(seed, instance.deck)
    games_db_interface.store_actions(game_id, actions)
    logger.debug("Fetched all game details of game {}.".format(game_id))

    # Do some sanity checks that loading the stored data did not change it
    assert actions == games_db_interface.load_actions(game_id)
    assert instance.deck == games_db_interface.load_deck(seed)
    return True
|
|
|
|
|
|
def fetch_all_game_details():
    """
    Fetches the detailed data of every stored game that has no actions or no seed data stored yet.

    First, detailed_fetch_game is called for every game without stored actions. Afterwards, games whose seed
    is still missing from the seeds table are reported with a warning and fetched as well.
    """
    logger.info("Fetching detailed game data for all games.")
    cur = conn_manager.get_new_cursor()

    # Get all games with no actions
    cur.execute("SELECT id FROM games "
                "LEFT OUTER JOIN game_actions"
                " ON games.id = game_actions.game_id "
                "WHERE game_actions.game_id IS NULL"
                )
    for (game_id,) in cur.fetchall():
        detailed_fetch_game(game_id)

    # Just to make sure, check that we have data on all seeds
    cur.execute("SELECT id FROM games "
                "LEFT OUTER JOIN seeds"
                " ON seeds.seed = games.seed "
                "WHERE seeds.seed IS NULL"
                )
    games_without_seeds = [game_id for (game_id,) in cur.fetchall()]
    if games_without_seeds:
        # The ids are not strings, so they have to be converted before they can be joined
        logger.warning(
            "Detected the following games without seed data, but with actions already stored: {}. "
            "Fetching them now".format(", ".join(str(game_id) for game_id in games_without_seeds))
        )
        for game_id in games_without_seeds:
            detailed_fetch_game(game_id)
|