cteward-ng/cteward_ng/database.py
2026-06-08 23:13:38 +02:00

658 lines
21 KiB
Python

"""Database connectivity and query execution.
Replaces database.js:
- pyodbc connection pool (via DBUtils.PooledDB)
- Health check (checkBackendOkay)
- Parameterized query execution (runquery)
- All SQL statement definitions
- Stats query aggregations
"""
import logging
from contextlib import closing
from datetime import datetime

import pyodbc
from dbutils.pooled_db import PooledDB
logger = logging.getLogger(__name__)
# Module-level connection pool. init() assigns it; it is None until then, and
# stays None in fake-data mode, where init() returns before creating a pool.
_pool = None
def _odbc_value(value):
    """Quote ``value`` for safe use as an ODBC connection-string attribute.

    A value containing any of the characters ``[]{}(),;?*=!@`` or
    leading/trailing whitespace is wrapped in braces, with each ``}`` doubled,
    so e.g. a password containing ``;`` cannot terminate the attribute early.
    Other values are returned unchanged (converted to ``str``).
    """
    text = str(value)
    if text != text.strip() or any(ch in text for ch in "[]{}(),;?*=!@"):
        return "{" + text.replace("}", "}}") + "}"
    return text


def init(config=None):
    """Initialize the MSSQL connection pool.

    Python port of database.init() from database.js, reading the same config
    keys: user, password, server, port, database. Missing keys fall back to
    built-in placeholder defaults (the default password is a placeholder).

    Args:
        config: dict of MSSQL connection parameters, or None for all defaults.
            If ``config['fakedata']`` is truthy, no pool is created (tests).

    Raises:
        Exception: whatever pyodbc/DBUtils raise while creating the pool or
            running the ``SELECT @@VERSION`` connectivity check. In that case
            the module-level pool is left as it was (not replaced).
    """
    global _pool
    if config is None:
        config = {}
    # Fake-data mode (tests): never touch a real server.
    if config.get('fakedata'):
        logger.info("Fake data mode enabled - skipping database connection")
        return
    user = config.get('user', 'readonly')
    password = config.get('password', 'XXXXXXXXXXXXXXXX')
    server = config.get('server', 'localhost')
    port = str(config.get('port', '1433'))
    database = config.get('database', 'Linear')
    logger.info("Initializing MSSQL connection pool: server=%s, database=%s", server, database)
    dsn = (
        "DRIVER={ODBC Driver 18 for SQL Server};"
        f"SERVER={server},{port};"
        f"DATABASE={_odbc_value(database)};"
        f"UID={_odbc_value(user)};"
        f"PWD={_odbc_value(password)};"
        "Encrypt=yes;"
        "TrustServerCertificate=yes;"
    )
    # Only a prefix is logged; the PWD attribute always starts past it.
    logger.debug("ODBC DSN: %s...", dsn[:50])

    def _connect():
        # pyodbc.connect() accepts the connection string only positionally;
        # keyword arguments are appended as separate attributes. Passing
        # dsn=<string> through PooledDB would therefore produce
        # "dsn=DRIVER=...;" (a DSN-name lookup) instead of a driver connection.
        return pyodbc.connect(dsn)

    pool = None
    try:
        pool = PooledDB(
            creator=_connect,
            maxconnections=10,
            mincached=1,
            maxcached=5,
            blocking=True,
            maxusage=None,
            setsession=[],
            reset=False,
            # Stated explicitly rather than relying on DBUtils to infer the
            # driver module from a function creator.
            failures=(pyodbc.OperationalError, pyodbc.InterfaceError, pyodbc.InternalError),
        )
        # Verify connectivity immediately so a bad configuration fails at
        # startup; closing() returns cursor and connection even on failure.
        with closing(pool.connection()) as conn, closing(conn.cursor()) as cursor:
            cursor.execute("SELECT @@VERSION")
            cursor.fetchone()
    except Exception as exc:
        logger.error("Failed to connect to MSSQL: %s", exc)
        if pool is not None:
            try:
                pool.close()
            except Exception:  # best effort; the original error is re-raised
                logger.debug("Error while closing failed pool", exc_info=True)
        raise
    # Publish the pool only once it has been shown to work.
    _pool = pool
    logger.info("Connected to MSSQL: %s/%s", server, database)
def _get_connection():
    """Return a connection from the pool, failing loudly if init() never ran.

    The pool is never created here; init() must have been called first.

    Raises:
        RuntimeError: when the module-level pool has not been created.
    """
    pool = _pool
    if pool is not None:
        logger.debug("Acquiring connection from pool")
        return pool.connection()
    message = "Database not initialized. Call init() first."
    logger.error(message)
    raise RuntimeError(message)
def connected():
    """Return True if the pool exists and can execute ``SELECT 1``.

    Returns False, instead of raising, when the pool is not initialized or
    when acquiring a connection, querying, or releasing resources fails; the
    error is logged. The connection and cursor are released even when the
    probe fails.
    """
    if _pool is None:
        logger.debug("Connection pool not initialized")
        return False
    try:
        with closing(_pool.connection()) as conn, closing(conn.cursor()) as cursor:
            cursor.execute("SELECT 1")
            cursor.fetchone()
    except Exception as exc:  # health probe: any failure means "not connected"
        logger.error("Database connectivity check failed: %s", exc)
        return False
    logger.debug("Database connectivity check passed")
    return True
def check_backend_okay():
    """Health check: verify the DB is reachable and holds plausible data.

    Python port of checkBackendOkay() from database.js. Checks, in order:
      - a pooled connection can be acquired and queried,
      - table Adresse has at least 7 rows,
      - no Kurzname (crewname) value occurs more than once.

    Raises:
        RuntimeError: "Database not initialized. ..." if init() has not run;
            "Too few members." or "Duplicate membernames." on a failed check.
        Exception: any database error raised while querying.
    """
    logger.info("Running backend health check")
    # Acquire the cursor inside the context so the connection is returned to
    # the pool even if cursor creation fails.
    with closing(_get_connection()) as conn, closing(conn.cursor()) as cursor:
        # Member count check
        cursor.execute("SELECT COUNT(*) AS MemberCount FROM Adresse")
        row = cursor.fetchone()
        logger.debug("Member count: %d", row.MemberCount)
        if row.MemberCount < 7:
            raise RuntimeError("Too few members.")
        # Duplicate crewname check.
        # NOTE(review): GROUP BY also groups empty/NULL Kurzname values, so two
        # rows without a crewname count as a duplicate. QUERY_MEMBERLIST filters
        # Kurzname != '', which suggests such rows may exist -- confirm intent.
        cursor.execute(
            "SELECT Kurzname AS Crewname, COUNT(*) AS cnt "
            "FROM Adresse "
            "GROUP BY Kurzname "
            "HAVING COUNT(*) > 1"
        )
        duplicates = cursor.fetchall()
        logger.debug("Duplicate crewname check returned %d results", len(duplicates))
        if duplicates:
            raise RuntimeError("Duplicate membernames.")
    logger.info("Backend health check passed")
def _exec_query(statement, params_list):
    """Run ``statement`` with positional ``params_list`` and return dict rows.

    Args:
        statement: SQL with pyodbc '?' placeholders.
        params_list: parameter values, one per placeholder, in order.

    Returns:
        list of dicts keyed by result column name; an empty list when the
        statement produces no result set.

    Raises:
        RuntimeError: if init() has not created the pool.
        Exception: any database error raised by the driver.
    """
    # NOTE: parameter values are logged at DEBUG level and may contain
    # personal data (e.g. crewnames).
    logger.debug("Executing query: %s with params %s", statement[:80], params_list)
    # Cursor is created inside the context so the connection is always
    # returned to the pool, even if cursor creation fails.
    with closing(_get_connection()) as conn, closing(conn.cursor()) as cursor:
        cursor.execute(statement, params_list)
        if cursor.description is None:
            # No result set (e.g. a non-SELECT statement): nothing to map.
            logger.debug("Query returned no result set")
            return []
        columns = [desc[0] for desc in cursor.description]
        rows = [dict(zip(columns, row)) for row in cursor.fetchall()]
    logger.debug("Query returned %d rows", len(rows))
    return rows
def run_query(query_def, params):
    """Execute a query definition and return its rows as a list of dicts.

    Python port of runquery() from database.js.

    Args:
        query_def: either a dict with 'statement' (SQL using '?' placeholders)
            and optional 'params' (keys of ``params``, one per placeholder, in
            order), or a dict with 'special' naming a stats aggregation.
        params: dict of parameter name -> value; None is treated as empty.

    Returns:
        list of dict rows (stats queries return their own row shapes).

    Raises:
        ValueError: if 'special' names an unknown stats query.
        RuntimeError: if a name in query_def['params'] is missing from params.
        Exception: on database errors.
    """
    if params is None:
        params = {}
    special = query_def.get('special')
    logger.debug("run_query called with special=%s", special)
    if special:
        # special name -> (label for the log, zero-argument runner)
        stats_runners = {
            QUERY_STATS_MEMBERS['special']: ('members', run_query_stats_members),
            QUERY_STATS_CONTRACTS['special']: ('contracts', run_query_stats_contracts),
            QUERY_STATS_GENDERS['special']: ('genders', run_query_stats_genders),
            # Only the ages aggregation takes request parameters.
            QUERY_STATS_AGES['special']: ('ages', lambda: run_query_stats_ages(params)),
        }
        runner_entry = stats_runners.get(special)
        if runner_entry is None:
            raise ValueError(f"Unknown special query: {special}")
        label, runner = runner_entry
        logger.info("Executing stats query: %s", label)
        return runner()
    statement = query_def['statement']
    param_names = query_def.get('params', [])
    if not param_names:
        logger.debug("Executing query with no params")
        return _exec_query(statement, [])
    # Validate all required params are provided
    missing = [p for p in param_names if p not in params]
    if missing:
        logger.error("Missing query parameters: %s", missing)
        raise RuntimeError(f"Missing query parameters: {missing}")
    params_list = [params[p] for p in param_names]
    logger.debug("Executing query with params: %s", params_list)
    return _exec_query(statement, params_list)
# ─── Stats Query Implementations ────────────────────────────────
# These replicate the time-series aggregation logic from database.js.
# Each one:
#   1. finds the date range (MIN of the start-date column, MAX of the
#      end-date column),
#   2. runs one query per calendar month in that range, using the 28th of
#      the month as the cutoff date (see _month_end), and
#   3. collects the results into a list of monthly snapshots, oldest first.
def _get_date_range(table, min_col, max_col):
    """Return the span of dates stored in ``table``.

    Runs ``SELECT MIN(min_col), MAX(max_col) FROM table``.

    Args:
        table, min_col, max_col: SQL identifiers interpolated directly into
            the statement (identifiers cannot be bound as parameters), so they
            must be trusted constants -- never caller-supplied input.

    Returns:
        (mindate, maxdate) as delivered by the driver (presumably datetime
        values for these date columns -- confirm column types). A NULL MIN
        falls back to 1970-01-01 and a NULL MAX to the current local time,
        e.g. for an empty table.
    """
    logger.debug("Getting date range for table=%s, min_col=%s, max_col=%s", table, min_col, max_col)
    # Cursor is created inside the context so the connection is always
    # returned to the pool, even if cursor creation fails.
    with closing(_get_connection()) as conn, closing(conn.cursor()) as cursor:
        cursor.execute(
            f"SELECT MIN({min_col}) AS min_val, MAX({max_col}) AS max_val "
            f"FROM {table}"
        )
        row = cursor.fetchone()
    if not row:
        logger.debug("No data found in %s, using epoch dates", table)
        return datetime(1970, 1, 1), datetime.now()
    logger.debug("Date range for %s: %s to %s", table, row[0], row[1])
    return row[0] or datetime(1970, 1, 1), row[1] or datetime.now()
def _month_end(year, month):
"""Return the 28th of the given month/year as a datetime."""
return datetime(year, month, 28)
def run_query_stats_members():
    """Build the monthly member-count series.

    For every calendar month from the earliest Eintritt to the latest Austritt
    in Adresse, counts non-NULL Kurzname values among rows whose Eintritt is
    before that month's cutoff (_month_end) and whose Austritt is NULL, empty
    or after the cutoff.

    Returns:
        list of dicts ``{'Year': int, 'Month': int, 'Members': int}``, one per
        month, oldest first.
    """
    logger.info("Starting members stats query")
    first, last = _get_date_range('Adresse', 'Eintritt', 'Austritt')
    logger.debug("Members date range: %s to %s", first, last)
    count_sql = (
        "SELECT CAST(? AS INT) AS Year, CAST(? AS INT) AS Month, "
        "COUNT(Kurzname) AS Members "
        "FROM Adresse "
        "WHERE Eintritt < ? "
        "AND (Austritt IS NULL OR Austritt = '' OR Austritt > ?)"
    )
    series = []
    # Walk months as a flat index (year * 12 + month - 1) so the rollover from
    # December to January falls out of divmod.
    first_index = first.year * 12 + first.month - 1
    last_index = last.year * 12 + last.month - 1
    for index in range(first_index, last_index + 1):
        year, zero_based_month = divmod(index, 12)
        month = zero_based_month + 1
        cutoff = _month_end(year, month)
        found = _exec_query(count_sql, [year, month, cutoff, cutoff])
        series.append(found[0] if found else {'Year': year, 'Month': month, 'Members': 0})
    logger.info("Members stats query complete: %d months returned", len(series))
    return series
def run_query_stats_contracts():
    """Build the monthly contract-type series.

    For every calendar month from the earliest VertragBegin to the latest
    VertragEnde in MgVert, counts non-NULL ArtName values among contracts
    running at that month's cutoff (VertragBegin before it; VertragEnde NULL,
    empty or after it), grouped by ArtName.

    Returns:
        list of dicts, oldest month first:
        [{'Year': int, 'Month': int,
          'Contracts': [{'Type': <ArtName>, 'Count': int}, ...]}, ...]
    """
    logger.info("Starting contracts stats query")
    start, end = _get_date_range('MgVert', 'VertragBegin', 'VertragEnde')
    logger.debug("Contracts date range: %s to %s", start, end)
    by_type_sql = (
        "SELECT CAST(? AS INT) AS Year, CAST(? AS INT) AS Month, "
        "COUNT(ArtName) AS Contracts, ArtName AS ContractName "
        "FROM MgVert "
        "WHERE VertragBegin < ? "
        "AND (VertragEnde IS NULL OR VertragEnde = '' OR VertragEnde > ?) "
        "GROUP BY ArtName"
    )

    def months():
        # Yield (year, month) pairs from start's month through end's month.
        y, m = start.year, start.month
        while (y, m) <= (end.year, end.month):
            yield y, m
            y, m = (y + 1, 1) if m == 12 else (y, m + 1)

    series = []
    for year, month in months():
        cutoff = _month_end(year, month)
        grouped = _exec_query(by_type_sql, [year, month, cutoff, cutoff])
        series.append({
            'Year': year,
            'Month': month,
            'Contracts': [{'Type': g['ContractName'], 'Count': g['Contracts']} for g in grouped],
        })
    logger.info("Contracts stats query complete: %d months returned", len(series))
    return series
def _gender_bucket(row):
    """Classify one grouped Adresse row into a gender-stats bucket.

    Precedence (first match wins):
      1. 'Business' -- Firmenname is non-empty and does not contain "c/o "
         (case-insensitive);
      2. 'Male' / 'Female' -- Geschlecht is 'MÄNNLICH' / 'WEIBLICH';
      3. 'Male' / 'Female' -- Anrede is 'Herr' / 'Frau';
      4. 'Other' otherwise.
    """
    firma = row.get('Firmenname') or ''
    if firma and 'c/o ' not in firma.lower():
        return 'Business'
    gender = row.get('Geschlecht')
    if gender == 'MÄNNLICH':
        return 'Male'
    if gender == 'WEIBLICH':
        return 'Female'
    salutation = row.get('Anrede')
    if salutation == 'Herr':
        return 'Male'
    if salutation == 'Frau':
        return 'Female'
    return 'Other'


def run_query_stats_genders():
    """Gender demographics time series.

    For each month from the earliest Eintritt to the latest Austritt, groups
    the members active at that month's cutoff (Eintritt before it; Austritt
    NULL, empty or after it) by Anrede, Betreung and Firma4, and adds each
    group's size to the bucket chosen by _gender_bucket().

    Returns:
        list of dicts, oldest month first:
        [{'Year': int, 'Month': int, 'Male': int, 'Female': int,
          'Business': int, 'Other': int}, ...]
    """
    logger.info("Starting gender stats query")
    mindate, maxdate = _get_date_range('Adresse', 'Eintritt', 'Austritt')
    logger.debug("Gender stats date range: %s to %s", mindate, maxdate)
    # COUNT(*) rather than COUNT(Anrede)/COUNT(Betreung): those skip NULLs, so
    # a group whose Anrede is NULL would contribute 0 to Business/Other and
    # its members would be missing from every bucket.
    stmt = (
        "SELECT CAST(? AS INT) AS Year, CAST(? AS INT) AS Month, "
        "Anrede, Betreung AS Geschlecht, Firma4 AS Firmenname, "
        "COUNT(*) AS Anzahl "
        "FROM Adresse "
        "WHERE Eintritt < ? "
        "AND (Austritt IS NULL OR Austritt = '' OR Austritt > ?) "
        "GROUP BY Anrede, Betreung, Firma4"
    )
    results = []
    year, month = mindate.year, mindate.month
    while year < maxdate.year or (year == maxdate.year and month <= maxdate.month):
        cutoff = _month_end(year, month)
        entry = {
            'Year': year,
            'Month': month,
            'Male': 0,
            'Female': 0,
            'Business': 0,
            'Other': 0,
        }
        for r in _exec_query(stmt, [year, month, cutoff, cutoff]):
            entry[_gender_bucket(r)] += r['Anzahl']
        results.append(entry)
        month += 1
        if month > 12:
            month = 1
            year += 1
    logger.info("Gender stats query complete: %d months returned", len(results))
    return results
def _int_param(params, key, default):
    """Return ``int(params[key])``, or ``default`` if missing or unparseable."""
    try:
        return int(params.get(key, default))
    except (ValueError, TypeError):
        return default


def _age_bucket_params(params, this_year):
    """Parse age-bucket settings from ``params``.

    Returns:
        (step, limit_min, limit_max) as ages, with step >= 1 and
        limit_min <= limit_max. 'min'/'max' values above 200 are treated as
        birth years and converted to ages (age = this_year - birth year).
    """
    step = max(1, _int_param(params, 'step', 10))
    limit_min = _int_param(params, 'min', 20)
    limit_max = _int_param(params, 'max', 60)
    if limit_min > 200:
        limit_min = this_year - limit_min
    if limit_max > 200:
        limit_max = this_year - limit_max
    # A birth-year range maps to a reversed age range; restore min <= max.
    if limit_min > limit_max:
        limit_min, limit_max = limit_max, limit_min
    return step, limit_min, limit_max


def _age_bucket_label(start_age, step, this_year):
    """Return the label for the age bucket starting at ``start_age``.

    The label shows the inclusive age range and, in parentheses, the
    inclusive birth-year range it covers (birth year = this_year - age).
    For ``step == 1`` the bucket is a single age.
    """
    if step == 1:
        return f"{start_age} ({this_year - start_age})"
    end_age = start_age + step - 1
    return f"{start_age}-{end_age} ({this_year - end_age}-{this_year - start_age})"


def run_query_stats_ages(params=None):
    """Age demographics time series.

    For each month from the earliest Eintritt to the latest Austritt, counts
    the members active at that month's cutoff per birth year and sorts them
    into age buckets. Ages are computed relative to the current year
    (age = current year - birth year), so a given birth year maps to the same
    label in every month of the series.

    Args:
        params: optional dict with 'step', 'min' and 'max' (defaults 10, 20,
            60). 'min'/'max' values above 200 are taken as birth years and
            converted to ages; unparseable values fall back to the defaults.
            A 'min' or 'max' of 0 disables the corresponding outlier bucket.

    Returns:
        list of dicts ``{'Year': int, 'Month': int, <label>: int, ...}``, one
        per month, with labels "< min (> year)", "> max (< year)",
        "a-b (y1-y2)" (or "a (y)" for step 1), and "Other" for rows without
        a birth year.
    """
    logger.info("Starting age stats query")
    if params is None:
        params = {}
    mindate, maxdate = _get_date_range('Adresse', 'Eintritt', 'Austritt')
    logger.debug("Age stats date range: %s to %s", mindate, maxdate)
    thisyear = datetime.now().year
    step, limit_min, limit_max = _age_bucket_params(params, thisyear)
    logger.debug("Age stats params: step=%d, min=%d, max=%d", step, limit_min, limit_max)
    below_label = f"< {limit_min} (> {thisyear - limit_min})"
    above_label = f"> {limit_max} (< {thisyear - limit_max})"
    stmt = (
        "SELECT CAST(? AS INT) AS Year, CAST(? AS INT) AS Month, "
        "DATEPART(YEAR, Geburtsdatum) AS Geburtsjahr, "
        "COUNT(*) AS Anzahl "
        "FROM Adresse "
        "WHERE Eintritt < ? "
        "AND (Austritt IS NULL OR Austritt = '' OR Austritt > ?) "
        "GROUP BY DATEPART(YEAR, Geburtsdatum)"
    )
    results = []
    year, month = mindate.year, mindate.month
    while year < maxdate.year or (year == maxdate.year and month <= maxdate.month):
        cutoff = _month_end(year, month)
        rows = _exec_query(stmt, [year, month, cutoff, cutoff])
        entry = {'Year': year, 'Month': month}
        first_start = last_start = None
        for r in rows:
            gebjahr = r.get('Geburtsjahr')
            if not gebjahr:
                label = "Other"
            else:
                age = thisyear - gebjahr
                if limit_min and age < limit_min:
                    label = below_label
                elif limit_max and age > limit_max:
                    label = above_label
                else:
                    start = age - (age % step)  # equals age when step == 1
                    label = _age_bucket_label(start, step, thisyear)
                    first_start = start if first_start is None else min(first_start, start)
                    last_start = start if last_start is None else max(last_start, start)
            entry[label] = entry.get(label, 0) + r['Anzahl']
        # Ensure every bucket between the lowest and highest seen is present.
        if first_start is not None:
            for start in range(first_start, last_start + 1, step):
                entry.setdefault(_age_bucket_label(start, step, thisyear), 0)
        if limit_min:
            entry.setdefault(below_label, 0)
        if limit_max:
            entry.setdefault(above_label, 0)
        results.append(entry)
        month += 1
        if month > 12:
            month = 1
            year += 1
    logger.info("Age stats query complete: %d months returned", len(results))
    return results
def member_lookup(crewname):
    """Fetch exactly one member record by crewname.

    Python port of memberlookup() from database.js.

    Args:
        crewname: the Kurzname value to match.

    Returns:
        dict with keys Kurzname, Kennung3, Eintritt, Austritt.

    Raises:
        RuntimeError: when zero rows or more than one row match.
    """
    logger.debug("Member lookup for crewname: %s", crewname)
    matches = _exec_query(
        "SELECT Kurzname, Kennung3, Eintritt, Austritt "
        "FROM Adresse WHERE Kurzname = ?",
        [crewname],
    )
    hits = len(matches)
    logger.debug("Member lookup returned %d rows", hits)
    if hits == 1:
        logger.info("Member lookup successful for '%s'", crewname)
        (member,) = matches
        return member
    logger.error("Member lookup for '%s': expected 1 row, got %d", crewname, hits)
    raise RuntimeError(f"Member lookup for '{crewname}': expected 1 row, got {hits}")
# ─── SQL Statement Definitions ──────────────────────────────────
# Each plain query is a dict consumed by run_query():
#   'statement': SQL with pyodbc '?' placeholders (instead of T-SQL '@name'
#                parameters), and
#   'params':    keys of run_query()'s params dict, one per '?', in
#                placeholder order. '?' markers are positional, so a value
#                used twice in a statement must be listed twice.
# Stats entries carry only a 'special' name, which run_query() dispatches to
# the matching run_query_stats_*() function.

# All MgVert (contract) rows of the member with this crewname.
QUERY_CONTRACTLIST_BY_CREWNAME = {
    'statement': (
        'SELECT MgVert.* FROM Adresse, MgVert '
        'WHERE Adresse.Kurzname = ? AND MgVert.AdrNr = Adresse.AdrNr'
    ),
    'params': ['crewname'],
}
# One contract of a member. VertragNr is matched both as given and with one
# leading space (presumably some stored numbers are space-padded -- confirm).
# 'contract' fills two placeholders, so it is listed twice in 'params'.
QUERY_CONTRACT_BY_CREWNAME_AND_CONTRACT = {
    'statement': (
        'SELECT MgVert.* FROM Adresse, MgVert '
        'WHERE Adresse.Kurzname = ? AND MgVert.AdrNr = Adresse.AdrNr '
        "AND (MgVert.VertragNr = ? OR MgVert.VertragNr = ' ' + ?)"
    ),
    'params': ['crewname', 'contract', 'contract'],
}
# All MgSolln (debit) rows of a member, ordered by Jahr, then Zeitraum.
QUERY_DEBITLIST_BY_CREWNAME = {
    'statement': (
        'SELECT MgSolln.* FROM Adresse, MgSolln '
        'WHERE Adresse.Kurzname = ? AND MgSolln.AdrNr = Adresse.AdrNr '
        'ORDER BY MgSolln.Jahr, MgSolln.Zeitraum'
    ),
    'params': ['crewname'],
}
# A single MgSolln (debit) row of a member, selected by GUID.
QUERY_DEBIT_BY_CREWNAME_AND_GUID = {
    'statement': (
        'SELECT MgSolln.* FROM Adresse, MgSolln '
        'WHERE Adresse.Kurzname = ? AND MgSolln.AdrNr = Adresse.AdrNr '
        'AND MgSolln.GUID = ?'
    ),
    'params': ['crewname', 'guid'],
}
# Selected Adresse columns for rows with a non-empty Kurzname, by Nachname.
QUERY_MEMBERLIST = {
    'statement': (
        "SELECT AdrNr, Firma4, Nachname, Vorname, Kurzname, Kennung3, "
        "Telefon3, Kontaktwoher, Eintritt, Austritt "
        "FROM Adresse WHERE Kurzname != '' ORDER BY Nachname"
    ),
    'params': [],
}
# Stats aggregations; run_query() dispatches on the 'special' name.
QUERY_STATS_MEMBERS = {
    'special': 'QUERY_STATS_MEMBERS',
}
QUERY_STATS_CONTRACTS = {
    'special': 'QUERY_STATS_CONTRACTS',
}
QUERY_STATS_GENDERS = {
    'special': 'QUERY_STATS_GENDERS',
}
QUERY_STATS_AGES = {
    'special': 'QUERY_STATS_AGES',
}
# Every Adresse column of every row (including empty Kurzname), by Nachname.
QUERY_MEMBERLIST_RAW = {
    'statement': 'SELECT * FROM Adresse ORDER BY Nachname',
    'params': [],
}
# Every Adresse column of the member(s) with this crewname.
QUERY_MEMBER_BY_CREWNAME = {
    'statement': 'SELECT * FROM Adresse WHERE Kurzname = ?',
    'params': ['crewname'],
}
# All Memof (memo) rows of a member.
QUERY_MEMBER_MEMO_BY_CREWNAME = {
    'statement': (
        'SELECT Memof.* FROM Adresse, Memof '
        'WHERE Adresse.Kurzname = ? AND Memof.AdrNr = Adresse.AdrNr'
    ),
    'params': ['crewname'],
}
# All MgLast (withdrawal) rows of a member; MgLast links via Adr_Nummer.
QUERY_WITHDRAWALLIST_BY_CREWNAME = {
    'statement': (
        'SELECT MgLast.* FROM Adresse, MgLast '
        'WHERE Adresse.Kurzname = ? AND MgLast.Adr_Nummer = Adresse.AdrNr'
    ),
    'params': ['crewname'],
}
# A single MgLast (withdrawal) row of a member, selected by GUID.
QUERY_WITHDRAWAL_BY_CREWNAME_AND_GUID = {
    'statement': (
        'SELECT MgLast.* FROM Adresse, MgLast '
        'WHERE Adresse.Kurzname = ? AND MgLast.Adr_Nummer = Adresse.AdrNr '
        'AND MgLast.GUID = ?'
    ),
    'params': ['crewname', 'guid'],
}
# All F5bew4 (payment) rows of a member.
QUERY_PAYMENTLIST_BY_CREWNAME = {
    'statement': (
        'SELECT F5bew4.* FROM Adresse, F5bew4 '
        'WHERE Adresse.Kurzname = ? AND F5bew4.AdrNr = Adresse.AdrNr'
    ),
    'params': ['crewname'],
}