import psycopg2
import psycopg2.extensions

import constants
from config import read_db_config
from log_setup import logger


class DBConnectionManager:
    """Lazily create and cache a single psycopg2 connection."""

    def __init__(self):
        # No connection is opened until connect()/get_connection() is called.
        self._conn = None

    def connect(self):
        """
        Read the database config and open a new connection to localhost.

        The new connection replaces whatever connection was cached before.
        """
        config = read_db_config()
        # Never write the password to the log; name and user are enough to
        # diagnose connection problems.
        logger.debug("Establishing database connection with dbname={}, user={}".format(
            config.db_name, config.db_user
        ))
        # Keyword arguments let psycopg2 build the connection string itself,
        # so quotes or backslashes in a credential cannot corrupt it.
        self._conn = psycopg2.connect(
            dbname=config.db_name,
            user=config.db_user,
            password=config.db_pass,
            host="localhost",
        )
        logger.debug("Established database connection.")

    def get_connection(self) -> psycopg2.extensions.connection:
        """
        Get the database connection.

        If not already connected, this reads the database config file and
        connects to the DB. Otherwise, the already active connection is
        returned.
        """
        if self._conn is None:
            self.connect()
        return self._conn


# Global instance that will hold our DB connection
conn_manager = DBConnectionManager()


def get_existing_tables():
    """
    Return the names from constants.DB_TABLE_NAMES that exist as tables
    in the 'public' schema of the connected database.
    """
    table_names = tuple(constants.DB_TABLE_NAMES)
    if not table_names:
        # An empty "IN ()" list is a SQL syntax error; nothing can match.
        return []

    conn = conn_manager.get_connection()
    # The context manager closes the cursor even if the query fails.
    with conn.cursor() as cur:
        # psycopg2 adapts a Python tuple to a properly quoted SQL IN list.
        cur.execute(
            "SELECT tablename FROM pg_tables WHERE"
            " schemaname = 'public' AND"
            " tablename IN %s",
            (table_names,),
        )
        return [table for (table,) in cur.fetchall()]


def init_database():
    """
    Execute the schema script at constants.DATABASE_SCHEMA_PATH and commit.

    Warning: This drops all existing tables from the database
    """
    # Read the script before touching the DB so a missing file cannot leave
    # a transaction open.
    with open(constants.DATABASE_SCHEMA_PATH, "r") as f:
        schema_sql = f.read()

    conn = conn_manager.get_connection()
    # "with conn" commits on success and rolls back on error, so a failing
    # script does not leave the shared connection in an aborted transaction.
    with conn:
        with conn.cursor() as cur:
            cur.execute(schema_sql)
    logger.verbose("Initialized DB tables.")