put database related stuff into own python subpackage

This commit is contained in:
Maximilian Keßler 2023-05-13 19:39:18 +02:00
parent 76d585e656
commit 4e373be425
Signed by: max
GPG Key ID: BCC5A619923C0BA5
7 changed files with 147 additions and 154 deletions

View File

@ -1,7 +1,7 @@
import copy
from typing import Tuple, Optional
from database import conn
from database.database import conn
from compress import decompress_deck, decompress_actions, compress_actions, link
from hanabi import Action, GameState
from hanab_live import HanabLiveInstance, HanabLiveGameState

0
database/__init__.py Normal file
View File

79
database/database.py Normal file
View File

@ -0,0 +1,79 @@
import psycopg2
from typing import Optional, Dict
# global connection
conn = psycopg2.connect("dbname=hanab-live-2 user=postgres")
# cursor
cur = conn.cursor()
# init_database_tables()
# populate_static_tables()
class Game():
def __init__(self, info=None):
self.id = -1
self.num_players = -1
self.score = -1
self.seed = ""
self.variant_id = -1
self.deck_plays = None
self.one_extra_card = None
self.one_less_card = None
self.all_or_nothing = None
self.num_turns = None
if type(info) == dict:
self.__dict__.update(info)
@staticmethod
def from_tuple(t):
g = Game()
g.id = t[0]
g.num_players = t[1]
g.score = t[2]
g.seed = t[3]
g.variant_id = t[4]
g.deck_plays = t[5]
g.one_extra_card = t[6]
g.one_less_card = t[7]
g.all_or_nothing = t[8]
g.num_turns = t[9]
return g
def __eq__(self, other):
return self.__dict__ == other.__dict__
def load(game_id: int) -> Optional[Game]:
cur.execute("SELECT * from games WHERE id = {};".format(game_id))
a = cur.fetchone()
if a is None:
return None
else:
return Game.from_tuple(a)
def store(game: Game):
stored = load(game.id)
if stored is None:
# print("inserting game with id {} into DB".format(game.id))
cur.execute(
"INSERT INTO games"
"(id, num_players, score, seed, variant_id)"
"VALUES"
"(%s, %s, %s, %s, %s);",
(game.id, game.num_players, game.score, game.seed, game.variant_id)
)
print("Inserted game with id {}".format(game.id))
else:
pass
# if not stored == game:
# print("Already stored game with id {}, aborting".format(game.id))
# print("Stored game is: {}".format(stored.__dict__))
# print("New game is: {}".format(game.__dict__))
def commit():
conn.commit()

View File

@ -0,0 +1,29 @@
DROP TABLE IF EXISTS seeds CASCADE;
CREATE TABLE seeds (
seed TEXT NOT NULL PRIMARY KEY,
num_players SMALLINT NOT NULL,
variant_id SMALLINT NOT NULL,
deck VARCHAR(60) NOT NULL,
feasible BOOLEAN DEFAULT NULL,
max_score_theoretical SMALLINT
);
CREATE INDEX seeds_variant_idx ON seeds (variant_id);
DROP TABLE IF EXISTS games CASCADE;
CREATE TABLE games (
id INT PRIMARY KEY,
seed TEXT NOT NULL REFERENCES seeds,
num_players SMALLINT NOT NULL,
score SMALLINT NOT NULL,
variant_id SMALLINT NOT NULL,
deck_plays BOOLEAN,
one_extra_card BOOLEAN,
one_less_card BOOLEAN,
all_or_nothing BOOLEAN,
num_turns SMALLINT,
actions TEXT
);
CREATE INDEX games_seed_score_idx ON games (seed, score);
CREATE INDEX games_var_seed_idx ON games (variant_id, seed);
CREATE INDEX games_player_idx ON games (num_players);

View File

@ -1,99 +1,39 @@
import json
import psycopg2
from typing import Optional, Dict
import requests
from pathlib import Path
## global connection
conn = psycopg2.connect("dbname=hanab-live user=postgres")
## cursor
cur = conn.cursor()
# cur.execute("DROP TABLE games;")
# conn.commit()
# exit(0)
## check if table exists, else create it
def create_games_table():
tablename = "games"
cur.execute(
"SELECT EXISTS (SELECT FROM pg_tables WHERE schemaname = 'public' AND tablename = '{}');".format(tablename))
a = cur.fetchone()
if a[0] is False:
print("Creating table '{}'".format(tablename))
cur.execute(
"CREATE TABLE {} ("
"id INT PRIMARY KEY,"
"num_players SMALLINT NOT NULL,"
"score SMALLINT NOT NULL,"
"seed TEXT NOT NULL,"
"variant_id SMALLINT NOT NULL,"
"deck_plays BOOLEAN,"
"one_extra_card BOOLEAN,"
"one_less_card BOOLEAN,"
"all_or_nothing BOOLEAN,"
"num_turns SMALLINT,"
"actions TEXT"
")".format(tablename))
conn.commit()
from .database import cur, conn
# else:
# print("table already exists")
def create_seeds_table():
tablename = 'seeds'
cur.execute(
"SELECT EXISTS (SELECT FROM pg_tables WHERE schemaname = 'public' AND tablename = '{}');".format(tablename))
a = cur.fetchone()
if a[0] is False:
print("Creating table '{}'".format(tablename))
cur.execute(
"CREATE TABLE {} ("
"seed TEXT NOT NULL PRIMARY KEY,"
"num_players SMALLINT NOT NULL,"
"variant_id SMALLINT NOT NULL,"
"feasible BOOLEAN," # theoretical solvability
"max_score_theoretical SMALLINT," # if infeasible, max score
"deck VARCHAR(60)"
")".format(tablename))
conn.commit()
def init_static_tables():
# check if table already exists
create = False
tables = ['suits', 'colors', 'suit_colors', 'variants', 'variant_suits']
for table in tables:
cur.execute(
"SELECT EXISTS (SELECT FROM pg_tables WHERE schemaname = 'public' AND tablename = '{}');".format(table)
)
a = cur.fetchone()
if a[0] is False:
create = True
if not create:
pass
# init tables in database
with open("variant_suits_schema.sql", "r") as f:
def init_database_tables():
this = Path(__file__)
with open(this.parent / "games_seeds_schema.sql") as f:
cur.execute(f.read())
with open("suits.json", "r") as f:
suits: Dict = json.loads(f.read())
with open(this.parent / "variant_suits_schema.sql", "r") as f:
cur.execute(f.read())
with open('variants.json', 'r') as f:
variants = json.loads(f.read())
conn.commit()
def populate_static_tables():
_populate_static_tables(*_download_json_files())
def _populate_static_tables(suits, variants):
suits_to_reverse = set()
for var in variants:
for suit in var['suits']:
if 'Reversed' in suit:
suits_to_reverse.add(suit.replace(' Reversed', ''))
_populate_suits(suits, suits_to_reverse)
_populate_variants(variants)
conn.commit()
def _populate_suits(suits, suits_to_reverse):
for suit in suits:
name: str = suit['name']
display_name: str = suit.get('displayName', name)
@ -150,6 +90,8 @@ def init_static_tables():
(suit_id, color_id)
)
def _populate_variants(variants):
for var in variants:
var_id = var['id']
name = var['name']
@ -210,74 +152,17 @@ def init_static_tables():
(var_id, suit_id, index)
)
conn.commit()
create_games_table()
create_seeds_table()
init_static_tables()
class Game():
def __init__(self, info=None):
self.id = -1
self.num_players = -1
self.score = -1
self.seed = ""
self.variant_id = -1
self.deck_plays = None
self.one_extra_card = None
self.one_less_card = None
self.all_or_nothing = None
self.num_turns = None
if type(info) == dict:
self.__dict__.update(info)
@staticmethod
def from_tuple(t):
g = Game()
g.id = t[0]
g.num_players = t[1]
g.score = t[2]
g.seed = t[3]
g.variant_id = t[4]
g.deck_plays = t[5]
g.one_extra_card = t[6]
g.one_less_card = t[7]
g.all_or_nothing = t[8]
g.num_turns = t[9]
return g
def __eq__(self, other):
return self.__dict__ == other.__dict__
def load(game_id: int) -> Optional[Game]:
cur.execute("SELECT * from games WHERE id = {};".format(game_id))
a = cur.fetchone()
if a is None:
return None
else:
return Game.from_tuple(a)
def store(game: Game):
stored = load(game.id)
if stored is None:
# print("inserting game with id {} into DB".format(game.id))
cur.execute(
"INSERT INTO games"
"(id, num_players, score, seed, variant_id)"
"VALUES"
"(%s, %s, %s, %s, %s);",
(game.id, game.num_players, game.score, game.seed, game.variant_id)
)
else:
if not stored == game:
print("Already stored game with id {}, aborting".format(game.id))
print("Stored game is: {}".format(stored.__dict__))
print("New game is: {}".format(game.__dict__))
def commit():
conn.commit()
def _download_json_files():
base_url = "https://raw.githubusercontent.com/Hanabi-Live/hanabi-live/main/packages/data/src/json"
data = {}
for name in ["suits", "variants"]:
filename = name + '.json'
url = base_url + "/" + filename
response = requests.get(url)
if not response.status_code == 200:
raise RuntimeError(
"Could not download initialization file {} from github (tried url {})".format(filename, url)
)
data[name] = json.loads(response.text)
return data['suits'], data['variants']

View File

@ -2,7 +2,7 @@ import enum
from typing import List
from hanabi import DeckCard, ActionType
from database import cur
from database.database import cur
def variant_id(name):