hanabi-league/src/ratings.py
Maximilian Keßler a377dd74af
Implement correct values for rating system
Added k-factors to config.
Implemented the season 0 rating change logic.
2023-11-24 17:15:38 +01:00

229 lines
9.7 KiB
Python

from typing import List, Dict, Tuple
import utils
from database import conn_manager
import psycopg2.extras
from log_setup import logger
import constants
from config import config_manager
def get_development_coefficient(num_games, player_rating):
config = config_manager.get_config()
if num_games <= config.k_factor_num_early_games:
return config.k_factor_for_few_games
if player_rating >= config.k_factor_high_rating_cutoff:
return config.k_factor_for_high_rating
return config.k_factor_normal
def expected_result(player_rating, var_rating):
expected = (1 - constants.UNWINNABLE_SEED_FRACTION) / (1 + pow(10, (var_rating - player_rating) / 400))
return expected
def compute_rating_changes(user_ratings: Dict[int, float], games_played: Dict[int, float], variant_rating: float, win: bool) -> Tuple[Dict[int, float], float]:
"""
@param user_ratings: Mapping of user ids to ratings that played this game.
@param games_played: Mapping of users ids to the number of games these users played so far.
@param variant_rating: Rating of the variant that was played
@param win: Whether the team won the game
@return: Mapping of user ids to their rating *changes* and *change* in variant rating
"""
expected_score = sum(expected_result(player_rating, variant_rating) for player_rating in user_ratings.values()) / len(user_ratings)
actual_score = 1 if win else 0
user_changes = {}
for user_id, num_games in games_played.items():
coefficient = get_development_coefficient(num_games, user_ratings[user_id])
user_changes[user_id] = coefficient * (actual_score - expected_score)
variant_change = config_manager.get_config().k_factor_for_variants * (expected_score - actual_score)
return user_changes, variant_change
def next_game_to_rate():
cur = conn_manager.get_new_cursor()
cur.execute("SELECT games.id FROM games "
"LEFT OUTER JOIN user_ratings"
" ON games.league_id = user_ratings.league_id "
"WHERE user_ratings.league_id IS NULL "
"ORDER BY games.league_id ASC "
"LIMIT 1"
)
query_result = cur.fetchone()
if query_result is None:
return
(game_id,) = query_result
return game_id
def get_current_user_ratings(user_ids: List[int], rating_type: int) -> Dict[int, float]:
"""
Fetches the current ratings for specified players and rating type from DB
@return: Mapping user_id -> current rating
"""
cur = conn_manager.get_new_cursor()
cur.execute("SELECT user_id, rating FROM user_base_ratings "
"WHERE user_id IN ({}) AND type = %s".format(", ".join("%s" for _ in user_ids)),
user_ids + [rating_type]
)
base_ratings = cur.fetchall()
# This query is a bit tricky:
# The subclause transforms the user_ratings table into the same table (with lesse columns), except that we now
# group entries corresponding to the same (user_id, type) and replace all of them with just the maximum league id
# Then we can do an inner join with this specific table, where we join again on (user_id, type), but now also
# require that the league id matches the max_league_id column from the subclause-generated table.
# Since an inner join only returns rows that match both tables, this will act as a filter on the initial table,
# even though we do not retrieve any values from the subclause-table
cur.execute("SELECT user_ratings.user_id, value_after FROM user_ratings "
"INNER JOIN ("
" SELECT user_id, type, MAX(league_id) AS max_league_id"
" FROM user_ratings "
" GROUP BY (user_id, type)"
" ) AS latest_user_ratings "
" ON"
" user_ratings.league_id = latest_user_ratings.max_league_id"
" AND user_ratings.user_id = latest_user_ratings.user_id"
" AND user_ratings.type = latest_user_ratings.type "
"WHERE "
" user_ratings.user_id IN ({})"
" AND user_ratings.type = %s"
.format(", ".join("%s" for _ in user_ids))
, user_ids + [rating_type]
)
current_ratings = cur.fetchall()
ratings: Dict[int, float] = {}
for user_id, base_rating in base_ratings:
ratings[user_id] = base_rating
for user_id, rating in current_ratings:
ratings[user_id] = rating
return ratings
def get_current_variant_rating(variant_id: int, num_players: int) -> float:
cur = conn_manager.get_new_cursor()
# Again, this query is tricky. For explanation, see the corresponding query for the user ratings
cur.execute("SELECT value_after FROM variant_ratings "
"INNER JOIN ("
" SELECT variant_id, num_players, MAX(league_id) AS max_league_id"
" FROM variant_ratings "
" GROUP BY (variant_id, num_players)"
" ) AS latest_variant_ratings "
" ON"
" variant_ratings.league_id = latest_variant_ratings.max_league_id "
" AND variant_ratings.variant_id = latest_variant_ratings.variant_id "
" AND variant_ratings.num_players = latest_variant_ratings.num_players "
"WHERE variant_ratings.variant_id = %s AND variant_ratings.num_players = %s",
(variant_id, num_players)
)
query_result = cur.fetchone()
if query_result is not None:
(current_rating, ) = query_result
return current_rating
# Reaching this point of code execution just means this is the first game for this variant rating
cur.execute("SELECT rating FROM variant_base_ratings "
"WHERE variant_id = %s AND num_players = %s",
(variant_id, num_players)
)
query_result = cur.fetchone()
if query_result is None:
err_msg = "Failed to get current variant rating for variant {}.".format(variant_id)
logger.error(err_msg)
raise ValueError(err_msg)
(base_rating, ) = query_result
return base_rating
def process_rating_of_next_game() -> bool:
game_id = next_game_to_rate()
if game_id is None:
logger.verbose("All games already processed for rating changes.")
return False
logger.verbose("Processing rating for game {}".format(game_id))
cur = conn_manager.get_new_cursor()
# Fetch data on the game played
cur.execute(
"SELECT games.league_id, games.num_players, games.score, variants.num_suits, variants.clue_starved, variants.id "
"FROM games "
"INNER JOIN variants "
" ON games.variant_id = variants.id "
"WHERE games.id = %s",
(game_id,)
)
league_id, num_players, score, num_suits, clue_starved, variant_id = cur.fetchone()
# Fetch game participants and how many games they played each so far
cur.execute("SELECT game_participants.user_id, COUNT(games.id) "
"FROM game_participants "
"INNER JOIN games "
" ON games.id = game_participants.game_id "
"WHERE user_id IN"
" ("
" SELECT game_participants.user_id FROM games "
" INNER JOIN game_participants "
" ON games.id = game_participants.game_id "
" WHERE games.id = %s"
" )"
"AND league_id <= %s "
"GROUP BY user_id",
(game_id, league_id)
)
games_played = {}
for (user_id, num_games) in cur.fetchall():
games_played[user_id] = num_games
if len(games_played) != num_players:
err_msg = "Player number mismatch: Expected {} participants for game {}, but only found {} in DB: [{}]".format(
num_players, game_id, len(games_played), ", ".join(games_played)
)
logger.error(err_msg)
raise ValueError(err_msg)
# Fetch current ratings of variant and players involved
rating_type = utils.get_rating_type(clue_starved)
user_ratings = get_current_user_ratings(list(games_played.keys()), rating_type)
variant_rating = get_current_variant_rating(variant_id, num_players)
# Calculate changes in rating
user_changes, variant_change = compute_rating_changes(user_ratings, games_played, variant_rating, score == 5 * num_suits)
# Update database for variants
cur.execute("INSERT INTO variant_ratings (league_id, variant_id, num_players, change, value_after) "
"VALUES (%s, %s, %s, %s, %s)",
(league_id, variant_id, num_players, variant_change, variant_rating + variant_change)
)
# Note: We do not commit here, only after players have been processed as well
user_ratings_vals = []
for user_id, change in user_changes.items():
user_ratings_vals.append((league_id, user_id, rating_type, change, user_ratings[user_id] + change))
# This updates the player rating.
psycopg2.extras.execute_values(
cur,
"INSERT INTO user_ratings (league_id, user_id, type, change, value_after) "
"VALUES %s",
user_ratings_vals
)
conn_manager.get_connection().commit()
return True
def process_rating_of_all_games():
# It might seem a bit tedious of processing every single game separately,
# which means reading and writing to the database more than we would strictly need.
# However, since we have such a small number of games, this is fast enough without problems,
# and makes the program structure easier, therefore avoiding mistakes and improving granularity
# of our database updates.
while process_rating_of_next_game():
pass