Implement fetching new league users

This commit is contained in:
Maximilian Keßler 2023-12-09 13:51:10 +01:00
parent e6407aeaae
commit 42a0d7059c
Signed by: max
GPG key ID: BCC5A619923C0BA5
6 changed files with 145 additions and 6 deletions

View file

@ -68,7 +68,9 @@
DROP TABLE IF EXISTS users CASCADE; DROP TABLE IF EXISTS users CASCADE;
CREATE TABLE users ( CREATE TABLE users (
id SERIAL PRIMARY KEY, id SERIAL PRIMARY KEY,
player_name TEXT NOT NULL UNIQUE player_name TEXT NOT NULL UNIQUE,
/* Can be null */
discord_tag TEXT UNIQUE
); );

View file

@ -81,3 +81,8 @@ excluded_variants:
- Synesthesia - Synesthesia
- Throw - Throw
- White - White
# ID of Google spreadsheet containing the signups
# If the url of your spreadsheet is https://docs.google.com/spreadsheets/d/.../edit#gid=...,
# the spreadsheet id is the (here omitted) part between /d/ and /edit
spreadsheet-id: ''

18
main.py
View file

@ -9,6 +9,8 @@ import config
import constants import constants
import database import database
import log_setup import log_setup
import fetch_players
import fetch_games
from log_setup import logger from log_setup import logger
@ -42,6 +44,16 @@ def subcommand_generate_config():
config.create_config() config.create_config()
def subcommand_fetch(target: str):
if target in ["all", "players"]:
fetch_players.fetch_players_interactive()
if target in ["all", "games"]:
games = fetch_games.fetch_games_for_all_players()
fetch_games.store_new_games(games)
if target in ["all", "game-details"]:
fetch_games.fetch_all_game_details()
def get_parser() -> argparse.ArgumentParser: def get_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
prog=constants.APP_NAME, prog=constants.APP_NAME,
@ -57,6 +69,9 @@ def get_parser() -> argparse.ArgumentParser:
subparsers.add_parser('generate-config', help='Generate config file at default location') subparsers.add_parser('generate-config', help='Generate config file at default location')
fetch_parser = subparsers.add_parser('fetch', help='Fetch new data.')
fetch_parser.add_argument(dest='target', choices=['all', 'players', 'games', 'game-details'], default='all')
return parser return parser
@ -66,7 +81,8 @@ def main():
subcommand_func = { subcommand_func = {
'init': subcommand_init, 'init': subcommand_init,
'generate-config': subcommand_generate_config 'generate-config': subcommand_generate_config,
'fetch': subcommand_fetch
}[args.command] }[args.command]
if args.verbose: if args.verbose:

View file

@ -196,6 +196,11 @@ class Config:
logger.error("Unexpected config format for entry {} in 'variant_base_ratings'".format(variant_name)) logger.error("Unexpected config format for entry {} in 'variant_base_ratings'".format(variant_name))
@property
@check_config_attr
def google_spreadsheet_id(self):
return self._config["spreadsheet-id"]
def get_config_path(): def get_config_path():
config_dir = Path(platformdirs.user_config_dir(constants.APP_NAME, ensure_exists=True)) config_dir = Path(platformdirs.user_config_dir(constants.APP_NAME, ensure_exists=True))

View file

@ -197,10 +197,11 @@ def add_user_name_to_player(hanabi_username: str, player_name: str):
existing_username, existing_player_name existing_username, existing_player_name
)) ))
else: else:
logger.error( err_msg = "Hanabi username {} is already associated to player {}, cannot register it to player {}.".format(
"Hanabi username {} is already associated to player {}, cannot register it to player {}.".format( res[0], res[1], player_name
res[0], res[1], player_name )
)) logger.error(err_msg)
raise ValueError(err_msg)
return return
cur.execute( cur.execute(
"INSERT INTO user_accounts (username, normalized_username, user_id) VALUES (%s, %s, %s)", "INSERT INTO user_accounts (username, normalized_username, user_id) VALUES (%s, %s, %s)",
@ -249,3 +250,9 @@ def get_variant_ids():
cur = conn_manager.get_new_cursor() cur = conn_manager.get_new_cursor()
cur.execute("SELECT id FROM variants") cur.execute("SELECT id FROM variants")
return [var_id for (var_id,) in cur.fetchall()] return [var_id for (var_id,) in cur.fetchall()]
def get_player_names():
cur = conn_manager.get_new_cursor()
cur.execute("SELECT player_name FROM users ORDER BY player_name")
return [player_name for (player_name,) in cur.fetchall()]

104
src/fetch_players.py Normal file
View file

@ -0,0 +1,104 @@
from dataclasses import dataclass
from typing import List, Optional
import requests_cache
import platformdirs
import csv
import constants
from config import config_manager
import database
from log_setup import logger
session = requests_cache.CachedSession(
platformdirs.user_cache_dir(constants.APP_NAME) + '/google-docs.requests-cache',
urls_expire_after={"*": 60 * 5} # Cache entries for 5 minutes (good especially during development)
)
@dataclass
class Registration:
user_account: str
player_name: str
discord_tag: Optional[str]
def fetch_csv() -> List[str]:
config = config_manager.get_config()
url = "https://docs.google.com/spreadsheets/d/{}/gviz/tq?tqx=out:csv".format(config.google_spreadsheet_id)
response = session.get(url)
if not response.status_code == 200:
err_msg = "Failed to fetch signup spreadsheet, url tried was {}".format(url)
logger.error(err_msg)
raise ConnectionError(err_msg)
return response.text.split("\n")
def parse_csv(lines: List[str]):
reader = csv.reader(lines, delimiter=",", quotechar='"')
ret = []
for row in reader:
if len(row) != 0:
user_account = row[1]
player_name = row[2]
discord_tag = row[3]
ret.append(Registration(user_account, player_name, discord_tag))
# We ignore the first row, since that corresponds to the column names
return ret[1:]
def get_new_registrations():
data = fetch_csv()
registrations = sorted(parse_csv(data), key=lambda registration: registration.player_name.lower())
registered_players = database.get_player_names()
new_registrations = []
i = 0 # Index in registrations
j = 0 # Index in registered players
# We will traverse both lists at the same time, where we maintain the invariant that j always points to the
# lowest index whose player name is at least as big as where i points to:
while i < len(registrations):
if j == len(registered_players) or registrations[i].player_name.lower() < registered_players[j].lower():
new_registrations.append(registrations[i])
i += 1
elif registrations[i].player_name == registered_players[j]:
i += 1
else:
j += 1
return new_registrations
def check_account_exists(user_account):
response = session.get("https://hanab.live/history/{}".format(user_account))
ok = response.status_code == 200
return ok
def ask_for_registration_confirmations(registrations: List[Registration], check_existence: bool = True) -> List[Registration]:
accepted = []
for registration in registrations:
if check_existence and not check_account_exists(registration.user_account):
logger.warning("Rejecting registration {}: Account does not exist on hanab.live".format(registration))
continue
response = input("Accept registration account={}, player_name={}, discord_tag={}? [Y/n]".format(
registration.user_account, registration.player_name, registration.discord_tag
))
if response in ["y", "Y", ""]:
accepted.append(registration)
logger.info("Accepted registration".format())
else:
logger.info("Rejected registration")
return accepted
def fetch_players_interactive():
logger.info("Fetching new players.")
regs = get_new_registrations()
if len(regs) == 0:
logger.info("No new players to register")
return
accepted = ask_for_registration_confirmations(regs)
for registration in accepted:
database.add_player(registration.user_account, registration.player_name)
logger.info("Successfully registered {} new players".format(len(accepted)))