"""Database connectivity and query execution. Replaces database.js: - pyodbc connection pool (via DBUtils.PooledDB) - Health check (checkBackendOkay) - Parameterized query execution (runquery) - All SQL statement definitions - Stats query aggregations """ import logging from datetime import datetime import pyodbc from dbutils.pooled_db import PooledDB logger = logging.getLogger(__name__) # Module-level connection pool _pool = None def init(config=None): """Initialize the MSSQL connection pool. Replaces database.init() from database.js. Uses the same config keys: user, password, server, port, database. Args: config: dict with MSSQL connection parameters. If None, uses safe defaults that will fail on first query. If config['fakedata'] is True, skips actual connection (for tests). """ global _pool if config is None: config = {} # Support fake data mode for testing if config.get('fakedata'): logger.info("Fake data mode enabled - skipping database connection") return user = config.get('user', 'readonly') password = config.get('password', 'XXXXXXXXXXXXXXXX') server = config.get('server', 'localhost') port = str(config.get('port', '1433')) database = config.get('database', 'Linear') logger.info("Initializing MSSQL connection pool: server=%s, database=%s", server, database) dsn = ( f"DRIVER={{ODBC Driver 18 for SQL Server}};" f"SERVER={server},{port};" f"DATABASE={database};" f"UID={user};" f"PWD={password};" "Encrypt=yes;" "TrustServerCertificate=yes;" ) logger.debug("ODBC DSN: %s...", dsn[:50]) try: _pool = PooledDB( creator=pyodbc, maxconnections=10, mincached=1, maxcached=5, blocking=True, maxusage=None, setsession=[], reset=False, dsn=dsn, ) # Verify connectivity immediately conn = _pool.connection() cursor = conn.cursor() cursor.execute("SELECT @@VERSION") cursor.fetchone() cursor.close() conn.close() logger.info("Connected to MSSQL: %s/%s", server, database) except Exception as exc: logger.error("Failed to connect to MSSQL: %s", exc) raise def _get_connection(): """Get a connection from the pool, creating it if needed.""" if _pool is None: logger.error("Database not initialized. Call init() first.") raise RuntimeError("Database not initialized. Call init() first.") logger.debug("Acquiring connection from pool") return _pool.connection() def connected(): """Check if the connection pool is alive and reachable.""" if _pool is None: logger.debug("Connection pool not initialized") return False try: conn = _pool.connection() cursor = conn.cursor() cursor.execute("SELECT 1") cursor.fetchone() cursor.close() conn.close() logger.debug("Database connectivity check passed") return True except Exception as exc: logger.error("Database connectivity check failed: %s", exc) return False def check_backend_okay(): """Health check: verify DB is reachable and has expected data. Replaces checkBackendOkay() from database.js. Checks: - Connection is alive - Member count >= 7 - No duplicate crewnames """ logger.info("Running backend health check") conn = _get_connection() cursor = conn.cursor() try: # Member count check cursor.execute("SELECT COUNT(*) AS MemberCount FROM Adresse") row = cursor.fetchone() logger.debug("Member count: %d", row.MemberCount) if row.MemberCount < 7: raise RuntimeError("Too few members.") # Duplicate crewname check cursor.execute( "SELECT Kurzname AS Crewname, COUNT(*) AS cnt " "FROM Adresse " "GROUP BY Kurzname " "HAVING COUNT(*) > 1" ) duplicates = cursor.fetchall() logger.debug("Duplicate crewname check returned %d results", len(duplicates)) if duplicates: raise RuntimeError("Duplicate membernames.") finally: cursor.close() conn.close() logger.info("Backend health check passed") def _exec_query(statement, params_list): """Execute a parameterized query and return rows as list of dicts. Args: statement: SQL with pyodbc '?' placeholders. params_list: list of parameter values in order. Returns: list of dict rows keyed by column name. """ logger.debug("Executing query: %s with params %s", statement[:80], params_list) conn = _get_connection() cursor = conn.cursor() try: cursor.execute(statement, params_list) columns = [desc[0] for desc in cursor.description] rows = [dict(zip(columns, row)) for row in cursor.fetchall()] logger.debug("Query returned %d rows", len(rows)) return rows finally: cursor.close() conn.close() def run_query(query_def, params): """Execute a parameterized query and return rows as list of dicts. Replaces runquery() from database.js. Args: query_def: dict with 'statement' (str) and 'params' (list of names) or 'special' (str) for stats queries. params: dict of parameter name → value. Returns: list of dict rows. Raises: RuntimeError: if query parameters are missing. Exception: on database errors. """ logger.debug("run_query called with special=%s", query_def.get('special')) # Handle special stats queries special = query_def.get('special') if special: if special == QUERY_STATS_MEMBERS['special']: logger.info("Executing stats query: members") return run_query_stats_members() if special == QUERY_STATS_CONTRACTS['special']: logger.info("Executing stats query: contracts") return run_query_stats_contracts() if special == QUERY_STATS_GENDERS['special']: logger.info("Executing stats query: genders") return run_query_stats_genders() if special == QUERY_STATS_AGES['special']: logger.info("Executing stats query: ages") return run_query_stats_ages(params) raise ValueError(f"Unknown special query: {special}") statement = query_def['statement'] param_names = query_def.get('params', []) if not param_names: logger.debug("Executing query with no params") return _exec_query(statement, []) # Validate all required params are provided missing = [p for p in param_names if p not in params] if missing: logger.error("Missing query parameters: %s", missing) raise RuntimeError(f"Missing query parameters: {missing}") params_list = [params[p] for p in param_names] logger.debug("Executing query with params: %s", params_list) return _exec_query(statement, params_list) # ─── Stats Query Implementations ──────────────────────────────── # These replicate the time-series aggregation logic from database.js. # Each one: # 1. Finds the date range from MIN/MAX date columns # 2. Generates one query per month in that range # 3. Aggregates results into a list of monthly snapshots def _get_date_range(table, min_col, max_col): """Find earliest and latest dates across min/max columns. Returns (mindate, maxdate) as datetime objects. """ logger.debug("Getting date range for table=%s, min_col=%s, max_col=%s", table, min_col, max_col) conn = _get_connection() cursor = conn.cursor() try: cursor.execute( f"SELECT MIN({min_col}) AS min_val, MAX({max_col}) AS max_val " f"FROM {table}" ) row = cursor.fetchone() if not row: logger.debug("No data found in %s, using epoch dates", table) return datetime(1970, 1, 1), datetime.now() logger.debug("Date range for %s: %s to %s", table, row[0], row[1]) return row[0] or datetime(1970, 1, 1), row[1] or datetime.now() finally: cursor.close() conn.close() def _month_end(year, month): """Return the 28th of the given month/year as a datetime.""" return datetime(year, month, 28) def run_query_stats_members(): """Member count time series. Returns list of dicts: [{'Year': int, 'Month': int, 'Members': int}, ...] Counts active members (joined before month-end, not yet left) per month. """ logger.info("Starting members stats query") mindate, maxdate = _get_date_range('Adresse', 'Eintritt', 'Austritt') logger.debug("Members date range: %s to %s", mindate, maxdate) results = [] year = mindate.year month = mindate.month while year < maxdate.year or (year == maxdate.year and month <= maxdate.month): cutoff = _month_end(year, month) stmt = ( "SELECT CAST(? AS INT) AS Year, CAST(? AS INT) AS Month, " "COUNT(Kurzname) AS Members " "FROM Adresse " "WHERE Eintritt < ? " "AND (Austritt IS NULL OR Austritt = '' OR Austritt > ?)" ) rows = _exec_query(stmt, [year, month, cutoff, cutoff]) if rows: results.append(rows[0]) else: results.append({'Year': year, 'Month': month, 'Members': 0}) month += 1 if month > 12: month = 1 year += 1 logger.info("Members stats query complete: %d months returned", len(results)) return results def run_query_stats_contracts(): """Contract statistics time series. Returns list of dicts: [{'Year': int, 'Month': int, 'Contracts': [{'Type': str, 'Count': int}, ...]}, ...] """ logger.info("Starting contracts stats query") mindate, maxdate = _get_date_range('MgVert', 'VertragBegin', 'VertragEnde') logger.debug("Contracts date range: %s to %s", mindate, maxdate) results = [] year = mindate.year month = mindate.month while year < maxdate.year or (year == maxdate.year and month <= maxdate.month): cutoff = _month_end(year, month) stmt = ( "SELECT CAST(? AS INT) AS Year, CAST(? AS INT) AS Month, " "COUNT(ArtName) AS Contracts, ArtName AS ContractName " "FROM MgVert " "WHERE VertragBegin < ? " "AND (VertragEnde IS NULL OR VertragEnde = '' OR VertragEnde > ?) " "GROUP BY ArtName" ) rows = _exec_query(stmt, [year, month, cutoff, cutoff]) period = { 'Year': year, 'Month': month, 'Contracts': [ {'Type': r['ContractName'], 'Count': r['Contracts']} for r in rows ], } results.append(period) month += 1 if month > 12: month = 1 year += 1 logger.info("Contracts stats query complete: %d months returned", len(results)) return results def run_query_stats_genders(): """Gender demographics time series. Returns list of dicts: [{'Year': int, 'Month': int, 'Male': int, 'Female': int, 'Business': int, 'Other': int}, ...] """ logger.info("Starting gender stats query") mindate, maxdate = _get_date_range('Adresse', 'Eintritt', 'Austritt') logger.debug("Gender stats date range: %s to %s", mindate, maxdate) results = [] year = mindate.year month = mindate.month while year < maxdate.year or (year == maxdate.year and month <= maxdate.month): cutoff = _month_end(year, month) stmt = ( "SELECT CAST(? AS INT) AS Year, CAST(? AS INT) AS Month, " "Anrede, COUNT(Anrede) AS Anreden, " "Betreung AS Geschlecht, COUNT(Betreung) AS Geschlechter, " "Firma4 AS Firmenname " "FROM Adresse " "WHERE Eintritt < ? " "AND (Austritt IS NULL OR Austritt = '' OR Austritt > ?) " "GROUP BY Anrede, Betreung, Firma4" ) rows = _exec_query(stmt, [year, month, cutoff, cutoff]) entry = { 'Year': year, 'Month': month, 'Male': 0, 'Female': 0, 'Business': 0, 'Other': 0, } for r in rows: # Business members: have a company name (not c/o) firma = r.get('Firmenname') or '' if firma and firma != '' and 'c/o ' not in firma.lower(): entry['Business'] += r['Anreden'] elif r.get('Geschlecht') == 'MÄNNLICH': entry['Male'] += r['Geschlechter'] elif r.get('Geschlecht') == 'WEIBLICH': entry['Female'] += r['Geschlechter'] elif r.get('Anrede') == 'Herr': entry['Male'] += r['Anreden'] elif r.get('Anrede') == 'Frau': entry['Female'] += r['Anreden'] else: entry['Other'] += r['Anreden'] results.append(entry) month += 1 if month > 12: month = 1 year += 1 logger.info("Gender stats query complete: %d months returned", len(results)) return results def run_query_stats_ages(params=None): """Age demographics time series. Returns list of dicts with dynamic age-bucket keys. Args: params: optional dict with query params 'step', 'min', 'max' for age bucket configuration. Defaults: step=10, min=20, max=60. """ logger.info("Starting age stats query") if params is None: params = {} mindate, maxdate = _get_date_range('Adresse', 'Eintritt', 'Austritt') logger.debug("Age stats date range: %s to %s", mindate, maxdate) # Parse age bucket parameters try: step = max(1, int(params.get('step', 10))) except (ValueError, TypeError): step = 10 try: limit_min = int(params.get('min', 20)) except (ValueError, TypeError): limit_min = 20 try: limit_max = int(params.get('max', 60)) except (ValueError, TypeError): limit_max = 60 logger.debug("Age stats params: step=%d, min=%d, max=%d", step, limit_min, limit_max) # Convert year-based limits to age-based thisyear = datetime.now().year if limit_min > 200: limit_min -= thisyear if limit_max > 200: limit_max -= thisyear if limit_min > limit_max: limit_min, limit_max = limit_max, limit_min results = [] year = mindate.year month = mindate.month while year < maxdate.year or (year == maxdate.year and month <= maxdate.month): cutoff = _month_end(year, month) stmt = ( "SELECT CAST(? AS INT) AS Year, CAST(? AS INT) AS Month, " "DATEPART(YEAR, Geburtsdatum) AS Geburtsjahr, " "COUNT(*) AS Anzahl " "FROM Adresse " "WHERE Eintritt < ? " "AND (Austritt IS NULL OR Austritt = '' OR Austritt > ?) " "GROUP BY DATEPART(YEAR, Geburtsdatum)" ) rows = _exec_query(stmt, [year, month, cutoff, cutoff]) entry = {'Year': year, 'Month': month} minage = 9001 maxage = 0 for r in rows: gebjahr = r.get('Geburtsjahr') if gebjahr: age = thisyear - gebjahr if limit_min and age < limit_min: label = f"< {limit_min} (> {thisyear - limit_min})" elif limit_max and age > limit_max: label = f"> {limit_max} (< {thisyear - limit_max})" elif step == 1: label = f"{age} ({thisyear - age})" minage = min(minage, age) maxage = max(maxage, age) else: startage = age - (age % step) endage = startage + step label = ( f"{startage}-{endage - 1} " f"({thisyear - endage}-{thisyear - startage - 1})" ) minage = min(minage, startage) maxage = max(maxage, endage) else: label = "Other" entry[label] = entry.get(label, 0) + r['Anzahl'] # Ensure all age bucket labels are present for age in range(minage, maxage, step): if step == 1: label = f"{age} ({thisyear - age})" else: label = f"{age}-{age + step - 1} ({thisyear - age - step}-{thisyear - age - 1})" entry.setdefault(label, 0) if limit_min: entry.setdefault(f"< {limit_min} (> {thisyear - limit_min})", 0) entry.setdefault(f"> {limit_max} (< {thisyear - limit_max})", 0) results.append(entry) month += 1 if month > 12: month = 1 year += 1 logger.info("Age stats query complete: %d months returned", len(results)) return results def member_lookup(crewname): """Look up a single member by crewname. Replaces memberlookup() from database.js. Args: crewname: the Kurzname to look up. Returns: dict with keys: Kurzname, Kennung3, Eintritt, Austritt. Raises: RuntimeError: if not found or multiple rows returned. """ logger.debug("Member lookup for crewname: %s", crewname) rows = _exec_query( "SELECT Kurzname, Kennung3, Eintritt, Austritt " "FROM Adresse WHERE Kurzname = ?", [crewname], ) logger.debug("Member lookup returned %d rows", len(rows)) if len(rows) != 1: logger.error("Member lookup for '%s': expected 1 row, got %d", crewname, len(rows)) raise RuntimeError(f"Member lookup for '{crewname}': expected 1 row, got {len(rows)}") logger.info("Member lookup successful for '%s'", crewname) return rows[0] # ─── SQL Statement Definitions ────────────────────────────────── # Parameter placeholders use pyodbc '?' style instead of T-SQL '@name' QUERY_CONTRACTLIST_BY_CREWNAME = { 'statement': ( 'SELECT MgVert.* FROM Adresse, MgVert ' 'WHERE Adresse.Kurzname = ? AND MgVert.AdrNr = Adresse.AdrNr' ), 'params': ['crewname'], } QUERY_CONTRACT_BY_CREWNAME_AND_CONTRACT = { 'statement': ( 'SELECT MgVert.* FROM Adresse, MgVert ' 'WHERE Adresse.Kurzname = ? AND MgVert.AdrNr = Adresse.AdrNr ' "AND (MgVert.VertragNr = ? OR MgVert.VertragNr = ' ' + ?)" ), 'params': ['crewname', 'contract'], } QUERY_DEBITLIST_BY_CREWNAME = { 'statement': ( 'SELECT MgSolln.* FROM Adresse, MgSolln ' 'WHERE Adresse.Kurzname = ? AND MgSolln.AdrNr = Adresse.AdrNr ' 'ORDER BY MgSolln.Jahr, MgSolln.Zeitraum' ), 'params': ['crewname'], } QUERY_DEBIT_BY_CREWNAME_AND_GUID = { 'statement': ( 'SELECT MgSolln.* FROM Adresse, MgSolln ' 'WHERE Adresse.Kurzname = ? AND MgSolln.AdrNr = Adresse.AdrNr ' 'AND MgSolln.GUID = ?' ), 'params': ['crewname', 'guid'], } QUERY_MEMBERLIST = { 'statement': ( "SELECT AdrNr, Firma4, Nachname, Vorname, Kurzname, Kennung3, " "Telefon3, Kontaktwoher, Eintritt, Austritt " "FROM Adresse WHERE Kurzname != '' ORDER BY Nachname" ), 'params': [], } QUERY_STATS_MEMBERS = { 'special': 'QUERY_STATS_MEMBERS', } QUERY_STATS_CONTRACTS = { 'special': 'QUERY_STATS_CONTRACTS', } QUERY_STATS_GENDERS = { 'special': 'QUERY_STATS_GENDERS', } QUERY_STATS_AGES = { 'special': 'QUERY_STATS_AGES', } QUERY_MEMBERLIST_RAW = { 'statement': 'SELECT * FROM Adresse ORDER BY Nachname', 'params': [], } QUERY_MEMBER_BY_CREWNAME = { 'statement': 'SELECT * FROM Adresse WHERE Kurzname = ?', 'params': ['crewname'], } QUERY_MEMBER_MEMO_BY_CREWNAME = { 'statement': ( 'SELECT Memof.* FROM Adresse, Memof ' 'WHERE Adresse.Kurzname = ? AND Memof.AdrNr = Adresse.AdrNr' ), 'params': ['crewname'], } QUERY_WITHDRAWALLIST_BY_CREWNAME = { 'statement': ( 'SELECT MgLast.* FROM Adresse, MgLast ' 'WHERE Adresse.Kurzname = ? AND MgLast.Adr_Nummer = Adresse.AdrNr' ), 'params': ['crewname'], } QUERY_WITHDRAWAL_BY_CREWNAME_AND_GUID = { 'statement': ( 'SELECT MgLast.* FROM Adresse, MgLast ' 'WHERE Adresse.Kurzname = ? AND MgLast.Adr_Nummer = Adresse.AdrNr ' 'AND MgLast.GUID = ?' ), 'params': ['crewname', 'guid'], } QUERY_PAYMENTLIST_BY_CREWNAME = { 'statement': ( 'SELECT F5bew4.* FROM Adresse, F5bew4 ' 'WHERE Adresse.Kurzname = ? AND F5bew4.AdrNr = Adresse.AdrNr' ), 'params': ['crewname'], }