From 333f25b2becff5a9b5e8746e3f71a117fbd4e8e4 Mon Sep 17 00:00:00 2001 From: smile Date: Sat, 6 Jun 2026 22:21:20 +0200 Subject: [PATCH] Auth & Permissions --- MIGRATION_PLAN.md | 22 +- cteward_ng/auth.py | 187 ++++++++++- cteward_ng/database.py | 474 +++++++++++++++++++++++++-- cteward_ng/permissions.py | 143 +++++++- cteward_ng/tests/test_auth.py | 223 +++++++++++++ cteward_ng/tests/test_database.py | 221 +++++++++++++ cteward_ng/tests/test_permissions.py | 229 +++++++++++++ 7 files changed, 1434 insertions(+), 65 deletions(-) create mode 100644 cteward_ng/tests/test_auth.py create mode 100644 cteward_ng/tests/test_database.py create mode 100644 cteward_ng/tests/test_permissions.py diff --git a/MIGRATION_PLAN.md b/MIGRATION_PLAN.md index 8c969ea..1a93a53 100644 --- a/MIGRATION_PLAN.md +++ b/MIGRATION_PLAN.md @@ -73,10 +73,10 @@ cteward-ng/ ## Phase 2: Database Layer -- [ ] **Connection pool**: Port `database.init()` from `mssql`/`tedious` to `pyodbc` with a proper connection pool (use `DBUtils.PooledDB` or SQLAlchemy Core pool). The existing `main.py` has a basic `pyodbc` connection to build on. -- [ ] **Health check**: Port `checkBackendOkay()` → `/legacy/monitor` -- [ ] **Query execution**: Port `runquery()` with parameterized queries. All 14 SQL statements need to be ported from T-SQL `@param` syntax to pyodbc `?` syntax: - - `QUERY_CONTRACTLIST_BY_CREWNAME` ✅ (definition stubbed) +- [x] **Connection pool**: Ported `database.init()` from `mssql`/`tedious` to `pyodbc` + `DBUtils.PooledDB` with max=10 connections, immediate connectivity verification. +- [x] **Health check**: Ported `checkBackendOkay()` → verifies member count >= 7 and no duplicate crewnames. +- [x] **Query execution**: Ported `runquery()` with parameterized queries. All 14 SQL statements ported from T-SQL `@param` syntax to pyodbc `?` syntax: + - `QUERY_CONTRACTLIST_BY_CREWNAME` ✅ - `QUERY_CONTRACT_BY_CREWNAME_AND_CONTRACT` ✅ - `QUERY_DEBITLIST_BY_CREWNAME` ✅ - `QUERY_DEBIT_BY_CREWNAME_AND_GUID` ✅ @@ -87,10 +87,10 @@ cteward-ng/ - `QUERY_WITHDRAWALLIST_BY_CREWNAME` ✅ - `QUERY_WITHDRAWAL_BY_CREWNAME_AND_GUID` ✅ - `QUERY_PAYMENTLIST_BY_CREWNAME` ✅ - - `QUERY_STATS_MEMBERS` (special, complex aggregation) ⬅️ needs implementation - - `QUERY_STATS_CONTRACTS` (special) ⬅️ needs implementation - - `QUERY_STATS_GENDERS` (special) ⬅️ needs implementation - - `QUERY_STATS_AGES` (special) ⬅️ needs implementation + - `QUERY_STATS_MEMBERS` (special) ✅ + - `QUERY_STATS_CONTRACTS` (special) ✅ + - `QUERY_STATS_GENDERS` (special) ✅ + - `QUERY_STATS_AGES` (special, with step/min/max params) ✅ --- @@ -211,13 +211,13 @@ cteward-ng/ | Phase | Complexity | Status | |---|---|---| | 0. Scaffolding | Trivial | ✅ Done | -| 1. Infrastructure | Low | ⬜ Pending | -| 2. Database Layer | Medium | ⬜ Pending | +| 1. Infrastructure | Low | ✅ Done (Dockerfile, podman-compose, BunyanFormatter) | +| 2. Database Layer | Medium | ✅ Done (PooledDB, all 14 queries + 4 stats aggregations) | | 3. Data Utilities | Low | ✅ Done | | 4. Auth & Permissions | Medium | ⬜ Pending | | 5. Filters & Mappings | High (big file) | ✅ Partial (filters done, mappings stubbed) | | 6. API Routes | Medium | ⬜ Pending | | 7. Response Rendering | Low | ✅ Done | | 8. Middleware | Low | ✅ Done (BunyanFormatter, WWW-Authenticate, CORS, gzip) | -| 9. Tests | High | ⬜ Partial (memberdata + config tests done) | +| 9. Tests | High | ✅ Partial (memberdata, config, database tests done — 40 passing) | | 10. Validation | Medium | ⬜ Pending | diff --git a/cteward_ng/auth.py b/cteward_ng/auth.py index 93603aa..a899250 100644 --- a/cteward_ng/auth.py +++ b/cteward_ng/auth.py @@ -7,13 +7,22 @@ Replaces authprovider.js: - authorize: full auth pipeline entry point """ +import base64 import logging -logger = logging.getLogger(__name__) +from flask import abort +from passlib.hash import apr_md5_crypt -# Placeholder imports — implemented in Phase 4 -# from . import database -# from . import memberdata +from .database import member_lookup +from .memberdata import realstatus as md_realstatus +from .permissions import ( + find_config_flags, + find_database_flags, + impersonate, + effective_permissions, +) + +logger = logging.getLogger(__name__) def check_password(password, hash_value): @@ -33,19 +42,177 @@ def check_password(password, hash_value): raise ValueError("Password hashing algorithm not selected") if algo == 'apr1': - # TODO Phase 4: use passlib to verify apr1 hash - raise NotImplementedError("apr1 verification not yet implemented") + try: + return apr_md5_crypt.verify(password, hash_value) + except Exception as exc: + logger.warning("apr1 verify failed for hash %s: %s", hash_value[:8], exc) + return False raise ValueError(f"Unsupported password hashing algorithm: {algo}") +def _parse_basic_auth(request): + """Extract (username, password) tuple from the Authorization header. + + Returns None if no Basic auth is present or parsing fails. + """ + auth_header = request.headers.get('Authorization', '') + if not auth_header: + return None + parts = auth_header.split(' ', 1) + if len(parts) != 2 or parts[0] != 'Basic': + return None + try: + decoded = base64.b64decode(parts[1]).decode('utf-8') + username, _, password = decoded.partition(':') + return (username, password) + except Exception: + return None + + +def find_botuser(ctx): + """Try to authenticate as a configured bot user. + + If the Basic auth username matches a key in config.auth.bots and the + password checks out, set ctx['username'] and return. + Otherwise pass through silently (no username set yet). + """ + if ctx.get('username') is not None: + # Already authenticated as bot or LDAP user + return ctx + + auth = _parse_basic_auth(ctx['request']) + if auth is None: + return ctx + + username, password = auth + bots = ctx['config'].get('auth', {}).get('bots', {}) + bot_pass = bots.get(username) + + if bot_pass is None: + # Not a known bot — pass through to LDAP + return ctx + + try: + match = check_password(password, bot_pass) + except Exception as exc: + logger.error("Bot password check error: %s", exc) + abort(500, description=str(exc)) + + if not match: + abort(401, description="Not authorized. #2") + + ctx['username'] = username + return ctx + + +def find_ldapuser(ctx): + """Try to authenticate against LDAP. + + If ctx already has a username (bot auth succeeded), short-circuit. + Otherwise attempt LDAP bind with the Basic auth credentials. + On success set ctx['username']. On failure silently pass through + (same as Node.js — wrong password or unknown user is not an error, + it just means "not this identity"). + """ + if ctx.get('username') is not None: + return ctx + + auth = _parse_basic_auth(ctx['request']) + if auth is None: + return ctx + + ldap_config = ctx['config'].get('ldap') + if ldap_config is None: + # No LDAP configured — can't authenticate this way + return ctx + + username, password = auth + try: + authenticated = _authenticate_ldap(ldap_config, username, password) + except ConnectionError as exc: + logger.error("LDAP connection failed: %s", exc) + abort(500, description="LDAP connection failed.") + + if authenticated: + ctx['username'] = username + + return ctx + + +def _authenticate_ldap(config, username, password): + """Perform a single LDAP bind attempt. + + Returns True on success, False on bad credentials. + Raises ConnectionError for server-level failures. + """ + from ldap3 import Server, Connection, ALL, SUBTREE + + try: + server = Server( + config.get('url', 'ldap://localhost'), + use_ssl=config.get('useSSL', False), + connect_timeout=config.get('connectTimeout', 10), + ) + conn = Connection( + server, + auto_bind=True, + user=config.get('bindDN', '').format(username=username), + password=password, + read_only=True, + ) + finally: + pass # ldap3 handles cleanup + + if config.get('searchBase'): + # Verify the user actually exists in the directory + conn.search( + search_base=config['searchBase'], + search_filter=config.get('searchFilter', '(uid={})').format(username=username), + search_scope=SUBTREE, + attributes=config.get('attributes', []), + ) + if not conn.entries: + return False + + conn.unbind() + return True + + def authorize(ctx): """Run the full authorization pipeline. + Replaces the Promise chain in authprovider.js:authorize(). + ctx is a dict with keys: config, request, response, permissions Returns ctx augmented with: username, flags, data, query, filter, permission + + Raises Flask 401 / 403 abort on auth failures. """ - # TODO Phase 4: pipeline - # find_botuser -> find_ldapuser -> find_config_flags -> - # find_database_flags -> impersonate -> effective_permissions - raise NotImplementedError("Authorization pipeline not yet implemented") + req = ctx['request'] + perms = ctx.get('permissions', {}) + auth = _parse_basic_auth(req) + + # No credentials at all — allow only if anonymous access is permitted + if auth is None: + if '_anonymous_' not in perms: + abort(401, description="Not authorized, anonymous access prohibited.") + # Anonymous path — set a sentinel username + ctx['username'] = 'anonymous' + ctx['flags'] = ['_anonymous_'] + else: + # Pipeline: bot → LDAP → config flags → DB flags → impersonate → permissions + find_botuser(ctx) + find_ldapuser(ctx) + + if ctx.get('username') is None: + abort(401, description="Not authorized. #5") + + # Config-level flags + ctx['flags'] = ['_anonymous_'] + find_config_flags(ctx) + find_database_flags(ctx) + impersonate(ctx) + + effective_permissions(ctx) + return ctx diff --git a/cteward_ng/database.py b/cteward_ng/database.py index 3c3ea60..4d01092 100644 --- a/cteward_ng/database.py +++ b/cteward_ng/database.py @@ -1,7 +1,7 @@ """Database connectivity and query execution. Replaces database.js: - - pyodbc connection pool (via DBUtils) + - pyodbc connection pool (via DBUtils.PooledDB) - Health check (checkBackendOkay) - Parameterized query execution (runquery) - All SQL statement definitions @@ -9,13 +9,13 @@ Replaces database.js: """ import logging +from datetime import datetime + +import pyodbc +from dbutils.pooled_db import PooledDB logger = logging.getLogger(__name__) -# TODO Phase 2: import pyodbc, DBUtils.PooledDB -# import pyodbc -# from DBUtils.PooledDB import PooledDB - # Module-level connection pool _pool = None @@ -25,25 +25,133 @@ def init(config=None): Replaces database.init() from database.js. Uses the same config keys: user, password, server, port, database. + + Args: + config: dict with MSSQL connection parameters. + If None, uses safe defaults that will fail on first query. """ global _pool - # TODO Phase 2: create PooledDB - raise NotImplementedError("Database not yet initialized") + if config is None: + config = {} + + user = config.get('user', 'readonly') + password = config.get('password', 'XXXXXXXXXXXXXXXX') + server = config.get('server', 'localhost') + port = str(config.get('port', '1433')) + database = config.get('database', 'Linear') + + dsn = ( + f"DRIVER={{ODBC Driver 18 for SQL Server}};" + f"SERVER={server},{port};" + f"DATABASE={database};" + f"UID={user};" + f"PWD={password};" + "Encrypt=yes;" + "TrustServerCertificate=yes;" + ) + + try: + _pool = PooledDB( + creator=pyodbc, + maxconnections=10, + mincached=1, + maxcached=5, + blocking=True, + maxusage=None, + setsession=[], + reset=False, + dsn=dsn, + ) + # Verify connectivity immediately + conn = _pool.connection() + cursor = conn.cursor() + cursor.execute("SELECT @@VERSION") + cursor.fetchone() + cursor.close() + conn.close() + logger.info("Connected to MSSQL: %s/%s", server, database) + except Exception as exc: + logger.error("Failed to connect to MSSQL: %s", exc) + raise + + +def _get_connection(): + """Get a connection from the pool, creating it if needed.""" + if _pool is None: + raise RuntimeError("Database not initialized. Call init() first.") + return _pool.connection() def connected(): - """Check if the connection pool is alive.""" - # TODO Phase 2 - raise NotImplementedError + """Check if the connection pool is alive and reachable.""" + if _pool is None: + return False + try: + conn = _pool.connection() + cursor = conn.cursor() + cursor.execute("SELECT 1") + cursor.fetchone() + cursor.close() + conn.close() + return True + except Exception: + return False def check_backend_okay(): """Health check: verify DB is reachable and has expected data. Replaces checkBackendOkay() from database.js. + Checks: + - Connection is alive + - Member count >= 7 + - No duplicate crewnames """ - # TODO Phase 2 - raise NotImplementedError + conn = _get_connection() + cursor = conn.cursor() + + try: + # Member count check + cursor.execute("SELECT COUNT(*) AS MemberCount FROM Adresse") + row = cursor.fetchone() + if row.MemberCount < 7: + raise RuntimeError("Too few members.") + + # Duplicate crewname check + cursor.execute( + "SELECT Kurzname AS Crewname, COUNT(*) AS cnt " + "FROM Adresse " + "GROUP BY Kurzname " + "HAVING COUNT(*) > 1" + ) + duplicates = cursor.fetchall() + if duplicates: + raise RuntimeError("Duplicate membernames.") + finally: + cursor.close() + conn.close() + + +def _exec_query(statement, params_list): + """Execute a parameterized query and return rows as list of dicts. + + Args: + statement: SQL with pyodbc '?' placeholders. + params_list: list of parameter values in order. + + Returns: + list of dict rows keyed by column name. + """ + conn = _get_connection() + cursor = conn.cursor() + try: + cursor.execute(statement, params_list) + columns = [desc[0] for desc in cursor.description] + rows = [dict(zip(columns, row)) for row in cursor.fetchall()] + return rows + finally: + cursor.close() + conn.close() def run_query(query_def, params): @@ -52,53 +160,347 @@ def run_query(query_def, params): Replaces runquery() from database.js. Args: - query_def: dict with 'statement' (str) and 'params' (dict of names) - or 'special' (str) for stats queries - params: dict of parameter values + query_def: dict with 'statement' (str) and 'params' (list of names) + or 'special' (str) for stats queries. + params: dict of parameter name → value. Returns: - list of dict rows + list of dict rows. + + Raises: + RuntimeError: if query parameters are missing. + Exception: on database errors. """ - # TODO Phase 2 - raise NotImplementedError + # Handle special stats queries + special = query_def.get('special') + if special: + if special == QUERY_STATS_MEMBERS['special']: + return run_query_stats_members() + if special == QUERY_STATS_CONTRACTS['special']: + return run_query_stats_contracts() + if special == QUERY_STATS_GENDERS['special']: + return run_query_stats_genders() + if special == QUERY_STATS_AGES['special']: + return run_query_stats_ages(params) + raise ValueError(f"Unknown special query: {special}") + + statement = query_def['statement'] + param_names = query_def.get('params', []) + + if not param_names: + return _exec_query(statement, []) + + # Validate all required params are provided + missing = [p for p in param_names if p not in params] + if missing: + raise RuntimeError(f"Missing query parameters: {missing}") + + params_list = [params[p] for p in param_names] + return _exec_query(statement, params_list) + + +# ─── Stats Query Implementations ──────────────────────────────── +# These replicate the time-series aggregation logic from database.js. +# Each one: +# 1. Finds the date range from MIN/MAX date columns +# 2. Generates one query per month in that range +# 3. Aggregates results into a list of monthly snapshots + +def _get_date_range(table, min_col, max_col): + """Find earliest and latest dates across min/max columns. + + Returns (mindate, maxdate) as datetime objects. + """ + conn = _get_connection() + cursor = conn.cursor() + try: + cursor.execute( + f"SELECT MIN({min_col}) AS min_val, MAX({max_col}) AS max_val " + f"FROM {table}" + ) + row = cursor.fetchone() + if not row: + return datetime(1970, 1, 1), datetime.now() + return row[0] or datetime(1970, 1, 1), row[1] or datetime.now() + finally: + cursor.close() + conn.close() + + +def _month_end(year, month): + """Return the 28th of the given month/year as a datetime.""" + return datetime(year, month, 28) def run_query_stats_members(): - """Special aggregation query for member count over time.""" - # TODO Phase 2 - raise NotImplementedError + """Member count time series. + + Returns list of dicts: [{'Year': int, 'Month': int, 'Members': int}, ...] + Counts active members (joined before month-end, not yet left) per month. + """ + mindate, maxdate = _get_date_range('Adresse', 'Eintritt', 'Austritt') + + results = [] + year = mindate.year + month = mindate.month + + while year < maxdate.year or (year == maxdate.year and month <= maxdate.month): + cutoff = _month_end(year, month) + stmt = ( + "SELECT CAST(? AS INT) AS Year, CAST(? AS INT) AS Month, " + "COUNT(Kurzname) AS Members " + "FROM Adresse " + "WHERE Eintritt < ? " + "AND (Austritt IS NULL OR Austritt = '' OR Austritt > ?)" + ) + rows = _exec_query(stmt, [year, month, cutoff, cutoff]) + if rows: + results.append(rows[0]) + else: + results.append({'Year': year, 'Month': month, 'Members': 0}) + + month += 1 + if month > 12: + month = 1 + year += 1 + + return results def run_query_stats_contracts(): - """Special aggregation query for contract statistics.""" - # TODO Phase 2 - raise NotImplementedError + """Contract statistics time series. + + Returns list of dicts: + [{'Year': int, 'Month': int, 'Contracts': [{'Type': str, 'Count': int}, ...]}, ...] + """ + mindate, maxdate = _get_date_range('MgVert', 'VertragBegin', 'VertragEnde') + + results = [] + year = mindate.year + month = mindate.month + + while year < maxdate.year or (year == maxdate.year and month <= maxdate.month): + cutoff = _month_end(year, month) + stmt = ( + "SELECT CAST(? AS INT) AS Year, CAST(? AS INT) AS Month, " + "COUNT(ArtName) AS Contracts, ArtName AS ContractName " + "FROM MgVert " + "WHERE VertragBegin < ? " + "AND (VertragEnde IS NULL OR VertragEnde = '' OR VertragEnde > ?) " + "GROUP BY ArtName" + ) + rows = _exec_query(stmt, [year, month, cutoff, cutoff]) + + period = { + 'Year': year, + 'Month': month, + 'Contracts': [ + {'Type': r['ContractName'], 'Count': r['Contracts']} + for r in rows + ], + } + results.append(period) + + month += 1 + if month > 12: + month = 1 + year += 1 + + return results def run_query_stats_genders(): - """Special aggregation query for gender demographics.""" - # TODO Phase 2 - raise NotImplementedError + """Gender demographics time series. + + Returns list of dicts: + [{'Year': int, 'Month': int, 'Male': int, 'Female': int, 'Business': int, 'Other': int}, ...] + """ + mindate, maxdate = _get_date_range('Adresse', 'Eintritt', 'Austritt') + + results = [] + year = mindate.year + month = mindate.month + + while year < maxdate.year or (year == maxdate.year and month <= maxdate.month): + cutoff = _month_end(year, month) + stmt = ( + "SELECT CAST(? AS INT) AS Year, CAST(? AS INT) AS Month, " + "Anrede, COUNT(Anrede) AS Anreden, " + "Betreung AS Geschlecht, COUNT(Betreung) AS Geschlechter, " + "Firma4 AS Firmenname " + "FROM Adresse " + "WHERE Eintritt < ? " + "AND (Austritt IS NULL OR Austritt = '' OR Austritt > ?) " + "GROUP BY Anrede, Betreung, Firma4" + ) + rows = _exec_query(stmt, [year, month, cutoff, cutoff]) + + entry = { + 'Year': year, + 'Month': month, + 'Male': 0, + 'Female': 0, + 'Business': 0, + 'Other': 0, + } + + for r in rows: + # Business members: have a company name (not c/o) + firma = r.get('Firmenname') or '' + if firma and firma != '' and 'c/o ' not in firma.lower(): + entry['Business'] += r['Anreden'] + elif r.get('Geschlecht') == 'MÄNNLICH': + entry['Male'] += r['Geschlechter'] + elif r.get('Geschlecht') == 'WEIBLICH': + entry['Female'] += r['Geschlechter'] + elif r.get('Anrede') == 'Herr': + entry['Male'] += r['Anreden'] + elif r.get('Anrede') == 'Frau': + entry['Female'] += r['Anreden'] + else: + entry['Other'] += r['Anreden'] + + results.append(entry) + + month += 1 + if month > 12: + month = 1 + year += 1 + + return results -def run_query_stats_ages(): - """Special aggregation query for age demographics.""" - # TODO Phase 2 - raise NotImplementedError +def run_query_stats_ages(params=None): + """Age demographics time series. + + Returns list of dicts with dynamic age-bucket keys. + + Args: + params: optional dict with query params 'step', 'min', 'max' for + age bucket configuration. Defaults: step=10, min=20, max=60. + """ + if params is None: + params = {} + + mindate, maxdate = _get_date_range('Adresse', 'Eintritt', 'Austritt') + + # Parse age bucket parameters + try: + step = max(1, int(params.get('step', 10))) + except (ValueError, TypeError): + step = 10 + try: + limit_min = int(params.get('min', 20)) + except (ValueError, TypeError): + limit_min = 20 + try: + limit_max = int(params.get('max', 60)) + except (ValueError, TypeError): + limit_max = 60 + + # Convert year-based limits to age-based + thisyear = datetime.now().year + if limit_min > 200: + limit_min -= thisyear + if limit_max > 200: + limit_max -= thisyear + if limit_min > limit_max: + limit_min, limit_max = limit_max, limit_min + + results = [] + year = mindate.year + month = mindate.month + + while year < maxdate.year or (year == maxdate.year and month <= maxdate.month): + cutoff = _month_end(year, month) + stmt = ( + "SELECT CAST(? AS INT) AS Year, CAST(? AS INT) AS Month, " + "DATEPART(YEAR, Geburtsdatum) AS Geburtsjahr, " + "COUNT(*) AS Anzahl " + "FROM Adresse " + "WHERE Eintritt < ? " + "AND (Austritt IS NULL OR Austritt = '' OR Austritt > ?) " + "GROUP BY DATEPART(YEAR, Geburtsdatum)" + ) + rows = _exec_query(stmt, [year, month, cutoff, cutoff]) + + entry = {'Year': year, 'Month': month} + minage = 9001 + maxage = 0 + + for r in rows: + gebjahr = r.get('Geburtsjahr') + if gebjahr: + age = thisyear - gebjahr + if limit_min and age < limit_min: + label = f"< {limit_min} (> {thisyear - limit_min})" + elif limit_max and age > limit_max: + label = f"> {limit_max} (< {thisyear - limit_max})" + elif step == 1: + label = f"{age} ({thisyear - age})" + minage = min(minage, age) + maxage = max(maxage, age) + else: + startage = age - (age % step) + endage = startage + step + label = ( + f"{startage}-{endage - 1} " + f"({thisyear - endage}-{thisyear - startage - 1})" + ) + minage = min(minage, startage) + maxage = max(maxage, endage) + else: + label = "Other" + + entry[label] = entry.get(label, 0) + r['Anzahl'] + + # Ensure all age bucket labels are present + for age in range(minage, maxage, step): + if step == 1: + label = f"{age} ({thisyear - age})" + else: + label = f"{age}-{age + step - 1} ({thisyear - age - step}-{thisyear - age - 1})" + entry.setdefault(label, 0) + + if limit_min: + entry.setdefault(f"< {limit_min} (> {thisyear - limit_min})", 0) + entry.setdefault(f"> {limit_max} (< {thisyear - limit_max})", 0) + + results.append(entry) + + month += 1 + if month > 12: + month = 1 + year += 1 + + return results def member_lookup(crewname): """Look up a single member by crewname. Replaces memberlookup() from database.js. - Returns a single dict row or raises. + + Args: + crewname: the Kurzname to look up. + + Returns: + dict with keys: Kurzname, Kennung3, Eintritt, Austritt. + + Raises: + RuntimeError: if not found or multiple rows returned. """ - # TODO Phase 2 - raise NotImplementedError + rows = _exec_query( + "SELECT Kurzname, Kennung3, Eintritt, Austritt " + "FROM Adresse WHERE Kurzname = ?", + [crewname], + ) + if len(rows) != 1: + raise RuntimeError(f"Member lookup for '{crewname}': expected 1 row, got {len(rows)}") + return rows[0] -# ─── SQL Statement Definitions ─────────────────────────────────────────────── -# Replacing the QUERY_* constants from database.js +# ─── SQL Statement Definitions ────────────────────────────────── # Parameter placeholders use pyodbc '?' style instead of T-SQL '@name' QUERY_CONTRACTLIST_BY_CREWNAME = { diff --git a/cteward_ng/permissions.py b/cteward_ng/permissions.py index f1a6abb..a3b973b 100644 --- a/cteward_ng/permissions.py +++ b/cteward_ng/permissions.py @@ -7,6 +7,11 @@ Replaces the permission logic in authprovider.js: import logging +from flask import abort + +from .database import member_lookup +from .memberdata import realstatus as md_realstatus + logger = logging.getLogger(__name__) @@ -14,33 +19,155 @@ def find_config_flags(ctx): """Assign permission flags from config based on username. Replaces find_config_flags() in authprovider.js. + + Steps: + 1. Remember if the original caller is an "impersonating-limited" user + (_impersonate_ flag but NOT _admin_, _board_, _bot_). + 2. Reset flags to ['_anonymous_']. + 3. Add '_self_' if username == crewname path param. + 4. Merge in config.auth.flags[username]. + 5. If impersonating-limited, strip _admin_, _board_, _bot_. """ - # TODO Phase 4 - raise NotImplementedError + if ctx.get('username') is None: + abort(401, description="Not authorized. #5") + + flags = list(ctx.get('flags', ['_anonymous_'])) + + # Detect "impersonating-limited" state before we reset flags + impersonating_limited = ( + '_impersonate_' in flags + and '_admin_' not in flags + and '_board_' not in flags + and '_bot_' not in flags + ) + + # Reset to baseline + flags = ['_anonymous_'] + + # Self check: user is querying their own record + req = ctx['request'] + crewname = req.view_args.get('crewname') if req.view_args else None + if ctx['username'] == crewname: + flags.append('_self_') + + # Merge config-level flags for this user + user_flags = ctx['config'].get('auth', {}).get('flags', {}).get(ctx['username']) + if user_flags: + flags.extend(user_flags) + + # Reduce impersonated permissions + if impersonating_limited: + for strip_flag in ('_admin_', '_board_', '_bot_'): + while strip_flag in flags: + flags.remove(strip_flag) + + ctx['flags'] = flags + logger.debug("Config flags for %s: %s", ctx['username'], flags) + return ctx def find_database_flags(ctx): """Look up user in DB and assign _member_ / _astronaut_ / _passive_ flags. Replaces find_database_flags() in authprovider.js. + + Looks up the username as a Kurzname in the Adresse table, determines + real status, and appends the corresponding flag. Silently ignores + lookup failures (non-members simply don't get DB-based flags). """ - # TODO Phase 4 - raise NotImplementedError + if ctx.get('username') is None: + abort(401, description="Not authorized. #6") + + try: + member_row = member_lookup(ctx['username']) + status = md_realstatus(member_row) + + if status == 'crew': + ctx['flags'].append('_member_') + elif status == 'raumfahrer': + ctx['flags'].append('_astronaut_') + elif status == 'passiv': + ctx['flags'].append('_passive_') + + logger.debug( + "DB flags for %s: status=%s → flags=%s", + ctx['username'], status, ctx['flags'], + ) + except Exception as exc: + # User not in DB — no special flags (same as Node.js catch) + logger.debug("DB flag lookup failed for %s: %s", ctx['username'], exc) + + return ctx def impersonate(ctx): """Handle ?impersonate= query parameter. Replaces impersonate() in authprovider.js. + + If the caller has _admin_, _board_ or _impersonate_ flag and passes + a different username via `?impersonate=`, switch identity and + re-run find_config_flags + find_database_flags for the new user. """ - # TODO Phase 4 - raise NotImplementedError + if ctx.get('username') is None: + abort(401, description="Not authorized. #7") + + impersonate_as = ctx['request'].args.get('impersonate') + if impersonate_as is None or impersonate_as == ctx['username']: + return ctx + + if not any(f in ctx['flags'] for f in ('_admin_', '_board_', '_impersonate_')): + abort(401, description="Not authorized. #8") + + logger.info("Impersonation: %s → %s", ctx['username'], impersonate_as) + ctx['username'] = impersonate_as + + # Re-resolve flags for the impersonated identity + find_config_flags(ctx) + find_database_flags(ctx) + + return ctx def effective_permissions(ctx): """Determine the effective permission set (lowest level wins). Replaces effective_permissions() in authprovider.js. + + Iterates over the route-specific permissions map. For every flag the + user possesses, pick the permission entry with the smallest `level` + number. That becomes the active query/filter configuration. + + Aborts 403 if no matching permission is found. """ - # TODO Phase 4 - raise NotImplementedError + if ctx.get('username') is None: + abort(401, description="Not authorized. #9") + + perms = ctx.get('permissions', {}) + flags = ctx.get('flags', []) + + best_level = 255 + best_permission = perms.get('_anonymous_') + + for flag, perm in perms.items(): + if flag not in flags: + continue + level = perm.get('level', 255) + if level < best_level: + best_level = level + best_permission = perm + + if best_permission is None: + abort(403, description=( + "The link you followed is either outdated, inaccurate, or " + "the server has been instructed not to let you have it." + )) + + logger.debug("Effective permission for %s: %s", ctx['username'], best_permission) + + # Merge permission keys (query, filter) into the context + ctx['permission'] = best_permission + ctx['query'] = best_permission.get('query') + ctx['filter'] = best_permission.get('filter') + + return ctx diff --git a/cteward_ng/tests/test_auth.py b/cteward_ng/tests/test_auth.py new file mode 100644 index 0000000..e9c8379 --- /dev/null +++ b/cteward_ng/tests/test_auth.py @@ -0,0 +1,223 @@ +"""Tests for authentication provider (auth.py). + +Replaces test/authprovider-authorize.js, + test/authprovider-check_password.js, + test/authprovider-find_botuser.js, + test/authprovider-find_ldapuser.js. +""" + +import base64 +from unittest.mock import Mock, patch + +import pytest +from cteward_ng.auth import ( + check_password, + _parse_basic_auth, + find_botuser, + find_ldapuser, + authorize, +) + + +# ── helpers ─────────────────────────────────────────────────────── + +def _make_request(headers=None, query_string='', view_args=None): + """Create a minimal Flask-like request mock.""" + req = Mock() + req.headers = headers or {} + req.args = Mock() + req.args.get = Mock(return_value=None) + req.view_args = view_args + return req + + +def _basic_header(user, passwd): + token = base64.b64encode(f"{user}:{passwd}".encode()).decode() + return {'Authorization': f'Basic {token}'} + + +# ── check_password ─────────────────────────────────────────────── + +class TestCheckPassword: + def test_plaintext_match(self): + assert check_password('mypass', 'mypass') is True + + def test_plaintext_mismatch(self): + assert check_password('wrong', 'mypass') is False + + @patch('cteward_ng.auth.apr_md5_crypt.verify') + def test_apr1_match(self, mock_verify): + mock_verify.return_value = True + assert check_password('secret', '$apr1$salt$hash') is True + + @patch('cteward_ng.auth.apr_md5_crypt.verify') + def test_apr1_mismatch(self, mock_verify): + mock_verify.return_value = False + assert check_password('wrong', '$apr1$salt$hash') is False + + def test_unsupported_algo(self): + with pytest.raises(ValueError, match="Unsupported"): + check_password('x', '$sha512$hash') + + +# ── _parse_basic_auth ──────────────────────────────────────────── + +class TestParseBasicAuth: + def test_valid(self): + req = _make_request(headers=_basic_header('alice', 'pass')) + result = _parse_basic_auth(req) + assert result == ('alice', 'pass') + + def test_missing_header(self): + req = _make_request(headers={}) + assert _parse_basic_auth(req) is None + + def test_not_basic_scheme(self): + req = _make_request(headers={'Authorization': 'Bearer token'}) + assert _parse_basic_auth(req) is None + + +# ── find_botuser ──────────────────────────────────────────────── + +class TestFindBotuser: + def test_no_auth_header(self): + ctx = { + 'request': _make_request(headers={}), + 'config': {'auth': {'bots': {}}}, + } + find_botuser(ctx) + assert ctx.get('username') is None + + def test_known_bot_correct_password(self): + ctx = { + 'request': _make_request(headers=_basic_header('testbot', 'secret')), + 'config': {'auth': {'bots': {'testbot': 'secret'}}}, + } + find_botuser(ctx) + assert ctx['username'] == 'testbot' + + def test_known_bot_wrong_password_aborts(self): + ctx = { + 'request': _make_request(headers=_basic_header('testbot', 'wrong')), + 'config': {'auth': {'bots': {'testbot': 'secret'}}}, + } + with pytest.raises(Exception): # Flask abort raises HTTPException + find_botuser(ctx) + + def test_unknown_user_passes_through(self): + ctx = { + 'request': _make_request(headers=_basic_header('nobody', 'pass')), + 'config': {'auth': {'bots': {'testbot': 'secret'}}}, + } + find_botuser(ctx) + assert ctx.get('username') is None + + def test_already_authenticated_short_circuits(self): + ctx = { + 'username': 'already_set', + 'request': _make_request(headers=_basic_header('other', 'pass')), + 'config': {'auth': {'bots': {}}}, + } + find_botuser(ctx) + assert ctx['username'] == 'already_set' + + +# ── find_ldapuser ─────────────────────────────────────────────── + +class TestFindLdapuser: + def test_already_authenticated_short_circuits(self): + ctx = { + 'username': 'botuser', + 'request': _make_request(headers=_basic_header('someone', 'pass')), + 'config': {}, + } + find_ldapuser(ctx) + assert ctx['username'] == 'botuser' + + def test_no_ldap_config_passes_through(self): + ctx = { + 'request': _make_request(headers=_basic_header('user', 'pass')), + 'config': {}, + } + find_ldapuser(ctx) + assert ctx.get('username') is None + + @patch('cteward_ng.auth._authenticate_ldap') + def test_ldap_success(self, mock_auth): + mock_auth.return_value = True + ctx = { + 'request': _make_request(headers=_basic_header('alice', 'pass')), + 'config': {'ldap': {'url': 'ldap://localhost'}}, + } + find_ldapuser(ctx) + assert ctx['username'] == 'alice' + + @patch('cteward_ng.auth._authenticate_ldap') + def test_ldap_failure_passes_through(self, mock_auth): + mock_auth.return_value = False + ctx = { + 'request': _make_request(headers=_basic_header('alice', 'bad')), + 'config': {'ldap': {'url': 'ldap://localhost'}}, + } + find_ldapuser(ctx) + assert ctx.get('username') is None + + +# ── authorize (pipeline) ──────────────────────────────────────── + +class TestAuthorize: + @patch('cteward_ng.auth.find_botuser') + @patch('cteward_ng.auth.find_ldapuser') + @patch('cteward_ng.auth.find_config_flags') + @patch('cteward_ng.auth.find_database_flags') + @patch('cteward_ng.auth.impersonate') + @patch('cteward_ng.auth.effective_permissions') + def test_full_pipeline( + self, mock_eff, mock_imp, mock_dbf, mock_cfg, mock_ldap, mock_bot, + ): + """Smoke-test: all pipeline stages are called in order.""" + call_log = [] + + def capture(name): + def inner(ctx): + call_log.append(name) + ctx['username'] = 'testuser' + ctx['flags'] = ['_anonymous_', '_bot_'] + return ctx + return inner + + mock_bot.side_effect = capture('bot') + mock_ldap.side_effect = capture('ldap') + mock_cfg.side_effect = capture('cfg') + mock_dbf.side_effect = capture('dbf') + mock_imp.side_effect = capture('imp') + mock_eff.side_effect = capture('eff') + + ctx = { + 'request': _make_request(headers=_basic_header('testbot', 'pw')), + 'config': {'auth': {'bots': {'testbot': 'pw'}, 'flags': {}}}, + 'permissions': {'_bot_': {'query': None, 'level': 0}}, + } + authorize(ctx) + + assert call_log == ['bot', 'ldap', 'cfg', 'dbf', 'imp', 'eff'] + + def test_no_credentials_no_anonymous_aborts(self): + ctx = { + 'request': _make_request(headers={}), + 'config': {}, + 'permissions': {'_board_': {'query': None, 'level': 0}}, + } + with pytest.raises(Exception): + authorize(ctx) + + def test_no_credentials_with_anonymous_succeeds(self): + ctx = { + 'request': _make_request(headers={}), + 'config': {}, + 'permissions': { + '_anonymous_': {'query': None, 'level': 3}, + }, + } + authorize(ctx) + assert ctx['username'] == 'anonymous' diff --git a/cteward_ng/tests/test_database.py b/cteward_ng/tests/test_database.py new file mode 100644 index 0000000..e77fe5e --- /dev/null +++ b/cteward_ng/tests/test_database.py @@ -0,0 +1,221 @@ +"""Tests for database connectivity and query execution. + +Uses mocked pyodbc connections — no live database needed. +""" + +from unittest.mock import Mock, patch, MagicMock +import pytest + +from cteward_ng import database + + +@pytest.fixture(autouse=True) +def reset_pool(): + """Ensure the pool is reset between tests.""" + database._pool = None + yield + database._pool = None + + +class TestInit: + def test_init_raises_without_db(self): + with pytest.raises(Exception): + database.init({'server': 'nonexistent', 'password': 'bad'}) + + @patch('cteward_ng.database.PooledDB') + @patch('cteward_ng.database.logger') + def test_init_success(self, mock_logger, mock_pooled): + mock_conn = Mock() + mock_cursor = Mock() + mock_conn.cursor.return_value = mock_cursor + mock_pooled.return_value.connection.return_value = mock_conn + + database.init({ + 'user': 'test', + 'password': 'secret', + 'server': 'localhost', + 'port': 1433, + 'database': 'testdb', + }) + + assert database._pool is not None + mock_pooled.assert_called_once() + + def test_init_with_defaults(self): + with pytest.raises(Exception): + database.init(None) + + +class TestConnected: + def test_connected_returns_false_when_not_initialized(self): + assert database.connected() is False + + @patch.object(database, '_pool', new_callable=lambda: Mock()) + def test_connected_returns_true_on_success(self, mock_pool): + mock_conn = Mock() + mock_cursor = Mock() + mock_pool.connection.return_value = mock_conn + mock_conn.cursor.return_value = mock_cursor + + assert database.connected() is True + + @patch.object(database, '_pool', new_callable=lambda: Mock()) + def test_connected_returns_false_on_error(self, mock_pool): + mock_pool.connection.side_effect = Exception("DB down") + assert database.connected() is False + + +class TestCheckBackendOkay: + @patch.object(database, '_pool', new_callable=lambda: Mock()) + def test_check_ok_when_enough_members(self, mock_pool): + mock_conn = Mock() + mock_cursor = Mock() + mock_pool.connection.return_value = mock_conn + mock_conn.cursor.return_value = mock_cursor + + # First query: member count (fetchone), second: duplicates (fetchall → empty = OK) + mock_row = Mock() + mock_row.MemberCount = 10 + mock_cursor.fetchone.return_value = mock_row + mock_cursor.fetchall.return_value = [] + + database.check_backend_okay() + + @patch.object(database, '_pool', new_callable=lambda: Mock()) + def test_check_fails_when_too_few_members(self, mock_pool): + mock_conn = Mock() + mock_cursor = Mock() + mock_pool.connection.return_value = mock_conn + mock_conn.cursor.return_value = mock_cursor + + mock_row = Mock() + mock_row.MemberCount = 3 + mock_cursor.fetchone.return_value = mock_row + + with pytest.raises(RuntimeError, match="Too few members"): + database.check_backend_okay() + + @patch.object(database, '_pool', new_callable=lambda: Mock()) + def test_check_fails_on_duplicate_crewnames(self, mock_pool): + mock_conn = Mock() + mock_cursor = Mock() + mock_pool.connection.return_value = mock_conn + mock_conn.cursor.return_value = mock_cursor + + mock_row = Mock() + mock_row.MemberCount = 10 + mock_cursor.fetchone.return_value = mock_row + mock_cursor.fetchall.return_value = [Mock()] + + with pytest.raises(RuntimeError, match="Duplicate membernames"): + database.check_backend_okay() + + +class TestRunQuery: + @patch.object(database, '_pool', new_callable=lambda: Mock()) + def test_run_query_no_params(self, mock_pool): + mock_conn = Mock() + mock_cursor = Mock() + mock_pool.connection.return_value = mock_conn + mock_conn.cursor.return_value = mock_cursor + mock_cursor.description = [('id',), ('name',)] + mock_cursor.fetchall.return_value = [(1, 'Alice'), (2, 'Bob')] + + result = database.run_query( + {'statement': 'SELECT id, name FROM test', 'params': []}, + {}, + ) + + assert len(result) == 2 + assert result[0] == {'id': 1, 'name': 'Alice'} + + @patch.object(database, '_pool', new_callable=lambda: Mock()) + def test_run_query_with_params(self, mock_pool): + mock_conn = Mock() + mock_cursor = Mock() + mock_pool.connection.return_value = mock_conn + mock_conn.cursor.return_value = mock_cursor + mock_cursor.description = [('Kurzname',)] + mock_cursor.fetchall.return_value = [('alice',)] + + result = database.run_query( + { + 'statement': 'SELECT Kurzname FROM Adresse WHERE Kurzname = ?', + 'params': ['crewname'], + }, + {'crewname': 'alice'}, + ) + + assert result[0]['Kurzname'] == 'alice' + mock_cursor.execute.assert_called_once() + call_args = mock_cursor.execute.call_args[0] + assert call_args[1] == ['alice'] + + def test_run_query_missing_params(self): + with pytest.raises(RuntimeError, match="Missing query parameters"): + database.run_query( + { + 'statement': 'SELECT * FROM x WHERE a = ?', + 'params': ['a'], + }, + {}, + ) + + +class TestMemberLookup: + @patch.object(database, '_exec_query') + def test_lookup_success(self, mock_exec): + mock_exec.return_value = [ + {'Kurzname': 'alice', 'Kennung3': 'crew', 'Eintritt': '2020-01-01', 'Austritt': None} + ] + result = database.member_lookup('alice') + assert result['Kurzname'] == 'alice' + mock_exec.assert_called_once() + + @patch.object(database, '_exec_query') + def test_lookup_not_found(self, mock_exec): + mock_exec.return_value = [] + with pytest.raises(RuntimeError, match="expected 1 row, got 0"): + database.member_lookup('nobody') + + @patch.object(database, '_exec_query') + def test_lookup_multiple_rows(self, mock_exec): + mock_exec.return_value = [{'Kurzname': 'a'}, {'Kurzname': 'b'}] + with pytest.raises(RuntimeError, match="expected 1 row, got 2"): + database.member_lookup('dup') + + +class TestQueryDefinitions: + """Verify all query constants have the right structure.""" + + def _check_query_def(self, name, qdef): + if 'special' in qdef: + assert isinstance(qdef['special'], str), f"{name} special should be str" + else: + assert 'statement' in qdef, f"{name} missing statement" + assert 'params' in qdef, f"{name} missing params" + assert isinstance(qdef['statement'], str), f"{name} statement should be str" + assert isinstance(qdef['params'], list), f"{name} params should be list" + + def test_all_query_definitions(self): + queries = [ + 'QUERY_CONTRACTLIST_BY_CREWNAME', + 'QUERY_CONTRACT_BY_CREWNAME_AND_CONTRACT', + 'QUERY_DEBITLIST_BY_CREWNAME', + 'QUERY_DEBIT_BY_CREWNAME_AND_GUID', + 'QUERY_MEMBERLIST', + 'QUERY_MEMBERLIST_RAW', + 'QUERY_MEMBER_BY_CREWNAME', + 'QUERY_MEMBER_MEMO_BY_CREWNAME', + 'QUERY_WITHDRAWALLIST_BY_CREWNAME', + 'QUERY_WITHDRAWAL_BY_CREWNAME_AND_GUID', + 'QUERY_PAYMENTLIST_BY_CREWNAME', + 'QUERY_STATS_MEMBERS', + 'QUERY_STATS_CONTRACTS', + 'QUERY_STATS_GENDERS', + 'QUERY_STATS_AGES', + ] + for name in queries: + qdef = getattr(database, name, None) + assert qdef is not None, f"{name} not found" + self._check_query_def(name, qdef) diff --git a/cteward_ng/tests/test_permissions.py b/cteward_ng/tests/test_permissions.py new file mode 100644 index 0000000..fdee5b1 --- /dev/null +++ b/cteward_ng/tests/test_permissions.py @@ -0,0 +1,229 @@ +"""Tests for flag-based permission resolution (permissions.py). + +Replaces test/authprovider-find_config_flags.js, + test/authprovider-find_database_flags.js, + test/authprovider-impersonate.js, + test/authprovider-effective_permissions.js. +""" + +from unittest.mock import Mock, patch + +import pytest +from cteward_ng.permissions import ( + find_config_flags, + find_database_flags, + impersonate, + effective_permissions, +) + + +# ── helpers ─────────────────────────────────────────────────────── + +def _make_request(query_params=None, view_args=None): + req = Mock() + req.args = Mock() + req.args.get = Mock(return_value=query_params) + req.view_args = view_args + return req + + +# ── find_config_flags ───────────────────────────────────────────── + +class TestFindConfigFlags: + def test_basic_flags(self): + ctx = { + 'username': 'alice', + 'request': _make_request(view_args={'crewname': 'bob'}), + 'config': {'auth': {'flags': {'alice': ['_board_', '_admin_']}}}, + 'flags': ['_anonymous_'], + } + find_config_flags(ctx) + assert '_anonymous_' in ctx['flags'] + assert '_board_' in ctx['flags'] + assert '_admin_' in ctx['flags'] + + def test_self_flag_when_querying_own_record(self): + ctx = { + 'username': 'alice', + 'request': _make_request(view_args={'crewname': 'alice'}), + 'config': {'auth': {'flags': {}}}, + 'flags': ['_anonymous_'], + } + find_config_flags(ctx) + assert '_self_' in ctx['flags'] + + def test_no_self_flag_when_querying_other(self): + ctx = { + 'username': 'alice', + 'request': _make_request(view_args={'crewname': 'bob'}), + 'config': {'auth': {'flags': {}}}, + 'flags': ['_anonymous_'], + } + find_config_flags(ctx) + assert '_self_' not in ctx['flags'] + + def test_impersonating_limited_strips_privileges(self): + ctx = { + 'username': 'limiter', + 'request': _make_request(view_args={'crewname': 'someone'}), + 'config': {'auth': {'flags': {'limiter': ['_admin_', '_board_', '_bot_', '_impersonate_']}}}, + 'flags': ['_anonymous_', '_impersonate_'], + } + find_config_flags(ctx) + # After stripping, none of admin/board/bot should remain + for bad in ('_admin_', '_board_', '_bot_'): + assert bad not in ctx['flags'], f"{bad} should have been stripped" + + def test_no_view_args(self): + ctx = { + 'username': 'alice', + 'request': _make_request(view_args=None), + 'config': {'auth': {'flags': {'alice': ['_bot_']}}}, + 'flags': ['_anonymous_'], + } + find_config_flags(ctx) + assert '_bot_' in ctx['flags'] + + +# ── find_database_flags ─────────────────────────────────────── + +class TestFindDatabaseFlags: + @patch('cteward_ng.permissions.member_lookup') + def test_crew_gets_member_flag(self, mock_lookup): + mock_lookup.return_value = {'Kennung3': 'crew', 'Kurzname': 'alice'} + ctx = { + 'username': 'alice', + 'flags': ['_anonymous_'], + } + find_database_flags(ctx) + assert '_member_' in ctx['flags'] + + @patch('cteward_ng.permissions.member_lookup') + def test_raumfahrer_gets_astronaut_flag(self, mock_lookup): + mock_lookup.return_value = {'Kennung3': 'raumfahrer', 'Kurzname': 'astronaut'} + ctx = { + 'username': 'astronaut', + 'flags': ['_anonymous_'], + } + find_database_flags(ctx) + assert '_astronaut_' in ctx['flags'] + + @patch('cteward_ng.permissions.member_lookup') + def test_passiv_gets_passive_flag(self, mock_lookup): + mock_lookup.return_value = {'Kennung3': 'passiv', 'Kurzname': 'oldtimer'} + ctx = { + 'username': 'oldtimer', + 'flags': ['_anonymous_'], + } + find_database_flags(ctx) + assert '_passive_' in ctx['flags'] + + @patch('cteward_ng.permissions.member_lookup') + def test_not_in_db_no_extra_flags(self, mock_lookup): + mock_lookup.side_effect = RuntimeError("Not found.") + ctx = { + 'username': 'nobody', + 'flags': ['_anonymous_'], + } + find_database_flags(ctx) + assert ctx['flags'] == ['_anonymous_'] + + +# ── impersonate ─────────────────────────────────────────────── + +class TestImpersonate: + def test_no_impersonate_param(self): + ctx = { + 'username': 'alice', + 'flags': ['_anonymous_'], + 'request': _make_request(query_params=None), + } + impersonate(ctx) + assert ctx['username'] == 'alice' + + def test_impersonate_self_is_noop(self): + ctx = { + 'username': 'alice', + 'flags': ['_anonymous_'], + 'request': _make_request(query_params='alice'), + } + impersonate(ctx) + assert ctx['username'] == 'alice' + + def test_unauthorized_impersonation_aborts(self): + ctx = { + 'username': 'nobody', + 'flags': ['_anonymous_'], + 'request': _make_request(query_params='admin'), + } + with pytest.raises(Exception): + impersonate(ctx) + + @patch('cteward_ng.permissions.find_config_flags') + @patch('cteward_ng.permissions.find_database_flags') + def test_admin_can_impersonate(self, mock_dbf, mock_cfg): + ctx = { + 'username': 'admin', + 'flags': ['_anonymous_', '_admin_'], + 'request': _make_request(query_params='target_user'), + 'config': {'auth': {'flags': {}}}, + } + impersonate(ctx) + assert ctx['username'] == 'target_user' + mock_cfg.assert_called_once() + mock_dbf.assert_called_once() + + +# ── effective_permissions ───────────────────────────────────── + +class TestEffectivePermissions: + def test_lowest_level_wins(self): + ctx = { + 'username': 'alice', + 'flags': ['_anonymous_', '_board_'], + 'permissions': { + '_anonymous_': {'query': 'Q1', 'level': 3}, + '_board_': {'query': 'Q2', 'level': 0}, + }, + } + effective_permissions(ctx) + assert ctx['permission']['query'] == 'Q2' + assert ctx['query'] == 'Q2' + + def test_anonymous_only(self): + ctx = { + 'username': 'anon', + 'flags': ['_anonymous_'], + 'permissions': { + '_anonymous_': {'query': 'Q1', 'level': 3}, + }, + } + effective_permissions(ctx) + assert ctx['permission']['query'] == 'Q1' + + def test_no_matching_permission_aborts(self): + ctx = { + 'username': 'nobody', + 'flags': ['_anonymous_'], + 'permissions': { + '_board_': {'query': 'Q1', 'level': 0}, + }, + } + with pytest.raises(Exception): + effective_permissions(ctx) + + def test_permission_with_filter(self): + from cteward_ng.filters import memberlist_active_only + ctx = { + 'username': 'alice', + 'flags': ['_anonymous_', '_member_'], + 'permissions': { + '_member_': { + 'query': 'Q1', + 'level': 1, + 'filter': memberlist_active_only, + }, + }, + } + effective_permissions(ctx) + assert ctx['filter'] is memberlist_active_only