phase 1 completed

This commit is contained in:
smile 2026-06-06 12:04:59 +02:00
parent a5f928f4e6
commit be5400d349
29 changed files with 318 additions and 24 deletions

2
cteward_ng/README.md Normal file
View file

@ -0,0 +1,2 @@
# cteward-st-lexware Python rewrite
# Read-only REST API backend for Lexware Vereinsverwaltung (MSSQL)

1
cteward_ng/__init__.py Normal file
View file

@ -0,0 +1 @@
"""cteward-st-lexware rewritten in Python/Flask."""

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

160
cteward_ng/app.py Normal file
View file

@ -0,0 +1,160 @@
"""Flask application factory and middleware setup."""
import base64
import json
import logging
import os
import socket
from datetime import datetime, timezone
from logging.handlers import RotatingFileHandler
from flask import Flask, request, Response
from flask_cors import CORS
from flask_compress import Compress
from .config import load_config
# Bunyan level mapping
_BUNYAN_LEVELS = {
logging.DEBUG: 20,
logging.INFO: 30,
logging.WARNING: 40,
logging.ERROR: 50,
logging.CRITICAL: 60,
}
# Reverse mapping: string name → Python level
_LEVEL_NAMES = {
'trace': logging.DEBUG,
'debug': logging.DEBUG,
'info': logging.INFO,
'warn': logging.WARNING,
'error': logging.ERROR,
'fatal': logging.CRITICAL,
}
class BunyanFormatter(logging.Formatter):
"""Produce bunyan-style JSON log lines.
Each log line is a single JSON object with keys:
name, hostname, pid, level, msg, time, v
Matches the format expected by the existing test suite.
"""
def __init__(self):
super().__init__()
self.hostname = socket.gethostname()
self.pid = os.getpid()
def format(self, record):
log_entry = {
'name': 'cteward-st-lexware',
'hostname': self.hostname,
'pid': self.pid,
'level': _BUNYAN_LEVELS.get(record.levelno, 30),
'msg': record.getMessage(),
'time': datetime.now(timezone.utc).isoformat(),
'v': 0,
}
# Attach request context if available
if hasattr(record, 'username'):
log_entry['username'] = record.username
if hasattr(record, 'method'):
log_entry['method'] = record.method
if hasattr(record, 'url'):
log_entry['url'] = record.url
return json.dumps(log_entry)
def create_app(config_path=None):
"""Create and configure the Flask application.
Replaces the restify server + middleware chain from startup.js.
"""
app = Flask(__name__)
app.config.from_mapping(
SECRET_KEY='cteward-st-lexware',
)
# Load runtime config (JSON file)
app.cteward_config = load_config(config_path)
_setup_logging(app)
_setup_extensions(app)
_register_prehandlers(app)
_register_blueprints(app)
return app
def _setup_logging(app):
"""Setup structured JSON logging similar to bunyan."""
log_level_name = app.cteward_config.get('loglevel', 'info').lower()
logfile = app.cteward_config.get('logfile')
log_level = _LEVEL_NAMES.get(log_level_name, logging.INFO)
handler = (
RotatingFileHandler(logfile)
if logfile
else logging.StreamHandler()
)
handler.setLevel(log_level)
handler.setFormatter(BunyanFormatter())
app.logger.handlers.clear()
app.logger.addHandler(handler)
app.logger.setLevel(log_level)
def _setup_extensions(app):
"""Enable CORS and gzip compression."""
CORS(app)
Compress(app)
def _extract_basic_username(headers):
"""Extract username from Basic auth header for logging."""
auth_header = headers.get('Authorization', '')
if not auth_header:
return 'anonymous'
parts = auth_header.split(' ', 1)
if len(parts) == 2 and parts[0] == 'Basic':
try:
decoded = base64.b64decode(parts[1]).decode('utf-8')
return decoded.split(':')[0]
except Exception:
return 'anonymous'
return 'anonymous'
def _register_prehandlers(app):
"""Before-request hooks: extract username for logging."""
@app.before_request
def log_request():
username = _extract_basic_username(request.headers)
extra = {'username': username, 'method': request.method, 'url': request.url}
# Attach extra fields to the log record so BunyanFormatter picks them up
app.logger.info(
'%s %s %s', username, request.method, request.url,
extra=extra,
extra_data=extra,
)
@app.after_request
def www_authenticate(response):
"""Add WWW-Authenticate header when no auth is present."""
if 'Authorization' not in request.headers:
realm = app.cteward_config.get('server', {}).get(
'realm', 'cteward API access'
)
response.headers['WWW-Authenticate'] = f'Basic realm="{realm}"'
return response
def _register_blueprints(app):
"""Register all route blueprints."""
from .views import legacy_bp
app.register_blueprint(legacy_bp, url_prefix='/legacy')

51
cteward_ng/auth.py Normal file
View file

@ -0,0 +1,51 @@
"""Authentication provider.
Replaces authprovider.js:
- check_password: plaintext and apr1 MD5 hash verification
- find_botuser: bot user lookup from config
- find_ldapuser: LDAP authentication via ldap3
- authorize: full auth pipeline entry point
"""
import logging
logger = logging.getLogger(__name__)
# Placeholder imports — implemented in Phase 4
# from . import database
# from . import memberdata
def check_password(password, hash_value):
"""Verify *password* against *hash_value*.
Supports plaintext and apr1 (MD5 crypt) formats.
Replaces Node.js apache-md5 module.
"""
# Plaintext
if not hash_value.startswith('$'):
return password == hash_value
# Parse algorithm tag: $apr1$...
try:
_, algo, _ = hash_value.split('$', 2)
except ValueError:
raise ValueError("Password hashing algorithm not selected")
if algo == 'apr1':
# TODO Phase 4: use passlib to verify apr1 hash
raise NotImplementedError("apr1 verification not yet implemented")
raise ValueError(f"Unsupported password hashing algorithm: {algo}")
def authorize(ctx):
"""Run the full authorization pipeline.
ctx is a dict with keys: config, request, response, permissions
Returns ctx augmented with: username, flags, data, query, filter, permission
"""
# TODO Phase 4: pipeline
# find_botuser -> find_ldapuser -> find_config_flags ->
# find_database_flags -> impersonate -> effective_permissions
raise NotImplementedError("Authorization pipeline not yet implemented")

41
cteward_ng/config.py Normal file
View file

@ -0,0 +1,41 @@
"""Configuration loader.
Reads the same JSON config format as the Node.js app:
CTEWARD_ST_LEXWARE_CONFIG env var or /etc/cteward/st-lexware.json
Applies the same defaults as startup.js:
mssql={}, server={}, auth={}, auth.bots={}, auth.flags={}
"""
import json
import os
_DEFAULT_CONFIG_PATH = '/etc/cteward/st-lexware.json'
def load_config(config_path=None):
"""Load and apply defaults to the JSON configuration.
Mirrors the config loading in startup.js lines 16-28.
"""
path = config_path or os.environ.get(
'CTEWARD_ST_LEXWARE_CONFIG', _DEFAULT_CONFIG_PATH
)
config = {}
try:
with open(path, 'r') as fh:
config = json.load(fh)
except (FileNotFoundError, json.JSONDecodeError) as exc:
# Same behavior as Node.js: print warning, use defaults
print(f"Can't load configfile '{path}': {exc}")
# Apply defaults (mirrors startup.js)
config.setdefault('mssql', {})
config.setdefault('server', {})
config.setdefault('auth', {})
config['auth'].setdefault('bots', {})
config['auth'].setdefault('flags', {})
return config

205
cteward_ng/database.py Normal file
View file

@ -0,0 +1,205 @@
"""Database connectivity and query execution.
Replaces database.js:
- pyodbc connection pool (via DBUtils)
- Health check (checkBackendOkay)
- Parameterized query execution (runquery)
- All SQL statement definitions
- Stats query aggregations
"""
import logging
logger = logging.getLogger(__name__)
# TODO Phase 2: import pyodbc, DBUtils.PooledDB
# import pyodbc
# from DBUtils.PooledDB import PooledDB
# Module-level connection pool
_pool = None
def init(config=None):
"""Initialize the MSSQL connection pool.
Replaces database.init() from database.js.
Uses the same config keys: user, password, server, port, database.
"""
global _pool
# TODO Phase 2: create PooledDB
raise NotImplementedError("Database not yet initialized")
def connected():
"""Check if the connection pool is alive."""
# TODO Phase 2
raise NotImplementedError
def check_backend_okay():
"""Health check: verify DB is reachable and has expected data.
Replaces checkBackendOkay() from database.js.
"""
# TODO Phase 2
raise NotImplementedError
def run_query(query_def, params):
"""Execute a parameterized query and return rows as list of dicts.
Replaces runquery() from database.js.
Args:
query_def: dict with 'statement' (str) and 'params' (dict of names)
or 'special' (str) for stats queries
params: dict of parameter values
Returns:
list of dict rows
"""
# TODO Phase 2
raise NotImplementedError
def run_query_stats_members():
"""Special aggregation query for member count over time."""
# TODO Phase 2
raise NotImplementedError
def run_query_stats_contracts():
"""Special aggregation query for contract statistics."""
# TODO Phase 2
raise NotImplementedError
def run_query_stats_genders():
"""Special aggregation query for gender demographics."""
# TODO Phase 2
raise NotImplementedError
def run_query_stats_ages():
"""Special aggregation query for age demographics."""
# TODO Phase 2
raise NotImplementedError
def member_lookup(crewname):
"""Look up a single member by crewname.
Replaces memberlookup() from database.js.
Returns a single dict row or raises.
"""
# TODO Phase 2
raise NotImplementedError
# ─── SQL Statement Definitions ───────────────────────────────────────────────
# Replacing the QUERY_* constants from database.js
# Parameter placeholders use pyodbc '?' style instead of T-SQL '@name'
QUERY_CONTRACTLIST_BY_CREWNAME = {
'statement': (
'SELECT MgVert.* FROM Adresse, MgVert '
'WHERE Adresse.Kurzname = ? AND MgVert.AdrNr = Adresse.AdrNr'
),
'params': ['crewname'],
}
QUERY_CONTRACT_BY_CREWNAME_AND_CONTRACT = {
'statement': (
'SELECT MgVert.* FROM Adresse, MgVert '
'WHERE Adresse.Kurzname = ? AND MgVert.AdrNr = Adresse.AdrNr '
"AND (MgVert.VertragNr = ? OR MgVert.VertragNr = ' ' + ?)"
),
'params': ['crewname', 'contract'],
}
QUERY_DEBITLIST_BY_CREWNAME = {
'statement': (
'SELECT MgSolln.* FROM Adresse, MgSolln '
'WHERE Adresse.Kurzname = ? AND MgSolln.AdrNr = Adresse.AdrNr '
'ORDER BY MgSolln.Jahr, MgSolln.Zeitraum'
),
'params': ['crewname'],
}
QUERY_DEBIT_BY_CREWNAME_AND_GUID = {
'statement': (
'SELECT MgSolln.* FROM Adresse, MgSolln '
'WHERE Adresse.Kurzname = ? AND MgSolln.AdrNr = Adresse.AdrNr '
'AND MgSolln.GUID = ?'
),
'params': ['crewname', 'guid'],
}
QUERY_MEMBERLIST = {
'statement': (
"SELECT AdrNr, Firma4, Nachname, Vorname, Kurzname, Kennung3, "
"Telefon3, Kontaktwoher, Eintritt, Austritt "
"FROM Adresse WHERE Kurzname != '' ORDER BY Nachname"
),
'params': [],
}
QUERY_STATS_MEMBERS = {
'special': 'QUERY_STATS_MEMBERS',
}
QUERY_STATS_CONTRACTS = {
'special': 'QUERY_STATS_CONTRACTS',
}
QUERY_STATS_GENDERS = {
'special': 'QUERY_STATS_GENDERS',
}
QUERY_STATS_AGES = {
'special': 'QUERY_STATS_AGES',
}
QUERY_MEMBERLIST_RAW = {
'statement': 'SELECT * FROM Adresse ORDER BY Nachname',
'params': [],
}
QUERY_MEMBER_BY_CREWNAME = {
'statement': 'SELECT * FROM Adresse WHERE Kurzname = ?',
'params': ['crewname'],
}
QUERY_MEMBER_MEMO_BY_CREWNAME = {
'statement': (
'SELECT Memof.* FROM Adresse, Memof '
'WHERE Adresse.Kurzname = ? AND Memof.AdrNr = Adresse.AdrNr'
),
'params': ['crewname'],
}
QUERY_WITHDRAWALLIST_BY_CREWNAME = {
'statement': (
'SELECT MgLast.* FROM Adresse, MgLast '
'WHERE Adresse.Kurzname = ? AND MgLast.Adr_Nummer = Adresse.AdrNr'
),
'params': ['crewname'],
}
QUERY_WITHDRAWAL_BY_CREWNAME_AND_GUID = {
'statement': (
'SELECT MgLast.* FROM Adresse, MgLast '
'WHERE Adresse.Kurzname = ? AND MgLast.Adr_Nummer = Adresse.AdrNr '
'AND MgLast.GUID = ?'
),
'params': ['crewname', 'guid'],
}
QUERY_PAYMENTLIST_BY_CREWNAME = {
'statement': (
'SELECT F5bew4.* FROM Adresse, F5bew4 '
'WHERE Adresse.Kurzname = ? AND F5bew4.AdrNr = Adresse.AdrNr'
),
'params': ['crewname'],
}

127
cteward_ng/filters.py Normal file
View file

@ -0,0 +1,127 @@
"""Data filter functions.
Replaces filters.js:
- MEMBERLIST_ACTIVE_ONLY: keep only active crew/raumfahrer/passive
- MEMBERLIST_SELF_ONLY: keep only the requesting user's record
- runfilter: apply a filter function if configured
"""
def memberlist_active_only(ctx):
"""Filter member list to active members only.
Replaces MEMBERLIST_ACTIVE_ONLY from filters.js.
The requesting user is always included regardless of status.
"""
username = ctx.get('username', '')
newdata = []
for row in ctx.get('data', []):
# Always include self
if username == row.get('Kurzname'):
newdata.append(row)
continue
status = _realstatus(row)
if status not in ('crew', 'raumfahrer', 'passiv'):
continue
kurzname = row.get('Kurzname', '')
if not kurzname or kurzname[0] == 'X':
continue
newrow = {
'Kurzname': kurzname,
'Kennung3': row.get('Kennung3'),
'Eintritt': row.get('Eintritt'),
'Kontaktwoher': row.get('Kontaktwoher'),
}
if row.get('Nachname'):
newrow['Nachname'] = row['Nachname']
if row.get('Vorname'):
newrow['Vorname'] = row['Vorname']
if row.get('Austritt'):
newrow['Austritt'] = row['Austritt']
newdata.append(newrow)
ctx['data'] = newdata
return ctx
def memberlist_self_only(ctx):
"""Filter member list to only the requesting user.
Replaces MEMBERLIST_SELF_ONLY from filters.js.
"""
username = ctx.get('username', '')
newdata = []
for row in ctx.get('data', []):
if username == row.get('Kurzname'):
newdata.append(row)
break
ctx['data'] = newdata
return ctx
def runfilter(ctx):
"""Apply the configured filter, if any.
Replaces runfilter() from filters.js.
"""
filt = ctx.get('filter')
if filt is None:
return ctx
return filt(ctx)
# ── Internal helpers ──────────────────────────────────────────────────────────
# We import realstatus lazily to avoid circular imports
_realstatus = None
def _get_realstatus():
global _realstatus
if _realstatus is None:
from .memberdata import realstatus as _rs
_realstatus = _rs
return _realstatus
# Override the closure to use the real function
def memberlist_active_only(ctx):
rs = _get_realstatus()
username = ctx.get('username', '')
newdata = []
for row in ctx.get('data', []):
if username == row.get('Kurzname'):
newdata.append(row)
continue
status = rs(row)
if status not in ('crew', 'raumfahrer', 'passiv'):
continue
kurzname = row.get('Kurzname', '')
if not kurzname or kurzname[0] == 'X':
continue
newrow = {
'Kurzname': kurzname,
'Kennung3': row.get('Kennung3'),
'Eintritt': row.get('Eintritt'),
'Kontaktwoher': row.get('Kontaktwoher'),
}
if row.get('Nachname'):
newrow['Nachname'] = row['Nachname']
if row.get('Vorname'):
newrow['Vorname'] = row['Vorname']
if row.get('Austritt'):
newrow['Austritt'] = row['Austritt']
newdata.append(newrow)
ctx['data'] = newdata
return ctx

86
cteward_ng/mappings.py Normal file
View file

@ -0,0 +1,86 @@
"""Data mapping functions.
Replaces mappings.js transforms raw DB rows into API response shapes.
Mappers:
NONE, CONTRACT, CONTRACTLIST, DEBIT, DEBITLIST, CONTRIBUTIONS,
MEMBER, MEMO, MEMBERLIST, MEMBERLIST_TO_LDAPCSV,
WITHDRAWAL, WITHDRAWALLIST
"""
from . import memberdata
# Placeholder — full implementation in Phase 5
MAPPERS = {}
def none_mapper(ctx):
"""Identity mapper — returns context unchanged."""
return ctx
def contract_mapper(ctx):
"""Map a single contract record."""
# TODO Phase 5
raise NotImplementedError
def contractlist_mapper(ctx):
"""Map a list of contract records into paginated format."""
# TODO Phase 5
raise NotImplementedError
def debit_mapper(ctx):
"""Map a single debit record."""
# TODO Phase 5
raise NotImplementedError
def debitlist_mapper(ctx):
"""Map a list of debit records into paginated format."""
# TODO Phase 5
raise NotImplementedError
def contributions_mapper(ctx):
"""Aggregate contributions (billed/paid/unpaid) across contracts and years."""
# TODO Phase 5
raise NotImplementedError
def member_mapper(ctx):
"""Map a single member record with all fields."""
# TODO Phase 5
raise NotImplementedError
def memo_mapper(ctx):
"""Parse RTF memo and extract embedded JSON data."""
# TODO Phase 5
raise NotImplementedError
def memberlist_mapper(ctx):
"""Map a list of members into paginated format."""
# TODO Phase 5
raise NotImplementedError
def memberlist_to_ldapcsv_mapper(ctx):
"""Map member list to LDAP CSV export format."""
# TODO Phase 5
raise NotImplementedError
def withdrawal_mapper(ctx):
"""Map a single withdrawal record."""
# TODO Phase 5
raise NotImplementedError
def withdrawallist_mapper(ctx):
"""Map a list of withdrawal records into paginated format."""
# TODO Phase 5
raise NotImplementedError

99
cteward_ng/memberdata.py Normal file
View file

@ -0,0 +1,99 @@
"""Member data utility functions.
Replaces memberdata.js:
- realstatus: determine crew/passive/ex-crew/raumfahrer status
- datum: parse 'YYYYMMDD' strings to German date format (d.m.YYYY)
- datum_parsed: parse date strings via Python datetime
- patenarray: split comma-separated sponsor names
- cleanpaten: clean and rejoin sponsor names
"""
from datetime import datetime
def realstatus(member):
"""Determine the real membership status of *member*.
member: dict with keys Kennung3, Austritt, Kurzname
Returns one of: 'crew', 'raumfahrer', 'passiv', 'ex-crew', 'ex-raumfahrer'
"""
if member is None:
raise TypeError("Need a member record to work with")
status = member.get('Kennung3') or ''
# Normalize Kennung3 prefix
if not status or status.startswith('check'):
status = 'crew'
if status.startswith('crew'):
status = 'crew'
if status.startswith('raumfahrer'):
status = 'raumfahrer'
if status.startswith('passiv'):
status = 'passiv'
# Check for expiry date (Austritt in the past)
austritt = member.get('Austritt')
if austritt and austritt != '':
try:
austritt_dt = datetime.strptime(austritt, '%Y-%m-%dT%H:%M:%S.%fZ')
# Set to end of that day
austritt_dt = austritt_dt.replace(hour=23, minute=59, second=59, microsecond=999999)
if austritt_dt < datetime.utcnow():
if status == 'crew' or status == 'passiv':
status = 'ex-crew'
elif status == 'raumfahrer':
status = 'ex-raumfahrer'
except ValueError:
pass
# Disabled crewname prefix
kurzname = member.get('Kurzname') or ''
if kurzname.startswith('disabled-'):
status = 'ex-crew'
return status
def datum(isodate):
"""Parse 'YYYYMMDD' string to German date format 'd.m.YYYY'.
Returns '1.1.1970' on failure.
"""
if not isinstance(isodate, str) or len(isodate) != 8:
return '1.1.1970'
try:
dt = datetime.strptime(isodate, '%Y%m%d')
return f'{dt.day}.{dt.month}.{dt.year}'
except ValueError:
return '1.1.1970'
def datum_parsed(isodate):
"""Parse a date string via Python datetime, return German format.
Returns '1.1.1970' on failure.
"""
try:
dt = datetime.fromisoformat(isodate.replace('Z', '+00:00'))
return f'{dt.day}.{dt.month}.{dt.year}'
except (ValueError, AttributeError):
return '1.1.1970'
def patenarray(patenstr):
"""Split a comma-separated sponsor string into a clean list.
Handles leading/trailing whitespace and empty entries.
"""
if not patenstr:
return []
if ',' not in patenstr:
stripped = patenstr.strip()
return [stripped] if stripped else []
return [p.strip() for p in patenstr.split(',') if p.strip()]
def cleanpaten(patenstr):
"""Clean and rejoin sponsor names as a single comma-separated string."""
return ','.join(patenarray(patenstr))

46
cteward_ng/permissions.py Normal file
View file

@ -0,0 +1,46 @@
"""Flag-based permission resolution.
Replaces the permission logic in authprovider.js:
find_config_flags, find_database_flags, impersonate,
effective_permissions.
"""
import logging
logger = logging.getLogger(__name__)
def find_config_flags(ctx):
"""Assign permission flags from config based on username.
Replaces find_config_flags() in authprovider.js.
"""
# TODO Phase 4
raise NotImplementedError
def find_database_flags(ctx):
"""Look up user in DB and assign _member_ / _astronaut_ / _passive_ flags.
Replaces find_database_flags() in authprovider.js.
"""
# TODO Phase 4
raise NotImplementedError
def impersonate(ctx):
"""Handle ?impersonate= query parameter.
Replaces impersonate() in authprovider.js.
"""
# TODO Phase 4
raise NotImplementedError
def effective_permissions(ctx):
"""Determine the effective permission set (lowest level wins).
Replaces effective_permissions() in authprovider.js.
"""
# TODO Phase 4
raise NotImplementedError

3
cteward_ng/pytest.ini Normal file
View file

@ -0,0 +1,3 @@
[pytest]
testpaths = cteward-ng/tests
pythonpath = .

70
cteward_ng/renderers.py Normal file
View file

@ -0,0 +1,70 @@
"""Response renderers.
Replaces renderers.js:
- JSON_OUTPUT: JSON with 2-decimal float formatting + JSONP callback support
- CSV_OUTPUT: semicolon-delimited CSV
"""
import csv
import io
import json
from flask import jsonify, Response
def json_output(ctx):
"""Render data as JSON.
Replaces JSON_OUTPUT from renderers.js.
Supports JSONP via ?callback= query parameter.
Rounds floats to 2 decimal places.
"""
data = ctx.get('data', {})
def _replacer(val):
if isinstance(val, float):
return round(val, 2)
return val
json_data = json.dumps(data, default=_replacer, indent=2)
from flask import request
callback = request.args.get('callback')
if callback:
return Response(
f'{callback}({json_data})',
mimetype='application/javascript',
)
return Response(
json_data,
mimetype='application/json; charset=utf-8',
)
def csv_output(ctx):
"""Render data as semicolon-delimited CSV.
Replaces CSV_OUTPUT from renderers.js.
"""
data = ctx.get('data', [])
si = io.StringIO()
writer = csv.writer(
si,
delimiter=';',
lineterminator='\n',
)
if data and isinstance(data[0], dict):
# Write header
writer.writerow(data[0].keys())
for row in data:
writer.writerow(row.values())
body = si.getvalue()
si.close()
return Response(
body.encode('utf-8'),
mimetype='text/csv; charset=utf-8',
)

View file

@ -0,0 +1,10 @@
Flask>=3.0,<4.0
flask-cors>=4.0,<5.0
flask-compress>=1.15,<2.0
pyodbc>=5.0,<6.0
DBUtils>=3.1,<4.0
ldap3>=2.9,<3.0
passlib>=1.7,<2.0
gunicorn>=21.0,<23.0
pytest>=7.0,<9.0
pytest-mock>=3.0,<4.0

View file

@ -0,0 +1 @@
# Tests for cteward-ng

Binary file not shown.

View file

@ -0,0 +1,22 @@
"""Test fixtures and app factory for the test suite.
Replaces test/000-startup.js bootstrap logic.
"""
import pytest
from cteward_ng.app import create_app
@pytest.fixture
def app():
"""Create a test Flask app using the test config."""
app = create_app(config_path='st-lexware-test.json')
app.config['TESTING'] = True
return app
@pytest.fixture
def client(app):
"""Test client for making requests."""
return app.test_client()

View file

@ -0,0 +1,59 @@
"""Test configuration loading.
Verifies that config.py loads the JSON config and applies defaults
the same way startup.js does.
"""
import json
import os
import tempfile
import pytest
from cteward_ng.config import load_config
class TestLoadConfig:
def test_loads_json_file(self):
with tempfile.NamedTemporaryFile(
mode='w', suffix='.json', delete=False
) as fh:
json.dump({'mssql': {'password': 'test'}}, fh)
fh.flush()
config = load_config(fh.name)
os.unlink(fh.name)
assert config['mssql']['password'] == 'test'
def test_applies_defaults(self):
with tempfile.NamedTemporaryFile(
mode='w', suffix='.json', delete=False
) as fh:
json.dump({}, fh)
fh.flush()
config = load_config(fh.name)
os.unlink(fh.name)
assert 'mssql' in config
assert 'server' in config
assert 'auth' in config
assert 'bots' in config['auth']
assert 'flags' in config['auth']
def test_missing_file_returns_defaults(self):
config = load_config('/nonexistent/path.json')
assert 'mssql' in config
assert 'server' in config
assert 'auth' in config
def test_invalid_json_returns_defaults(self):
with tempfile.NamedTemporaryFile(
mode='w', suffix='.json', delete=False
) as fh:
fh.write('{invalid json}')
fh.flush()
config = load_config(fh.name)
os.unlink(fh.name)
assert 'mssql' in config
assert 'server' in config
assert 'auth' in config

View file

@ -0,0 +1,86 @@
"""Unit tests for memberdata utility functions.
Replaces test/memberdata_datum.js, test/memberdata_datum_parsed.js,
test/memberdata_patenarray.js, test/memberdata_realstatus.js.
"""
import pytest
from cteward_ng.memberdata import (
realstatus,
datum,
datum_parsed,
patenarray,
cleanpaten,
)
class TestDatum:
def test_valid_yyyy_mmdd(self):
assert datum('20230115') == '15.1.2023'
def test_invalid_length(self):
assert datum('2023011') == '1.1.1970'
def test_not_a_string(self):
assert datum(12345) == '1.1.1970'
def test_invalid_format(self):
assert datum('notadate') == '1.1.1970'
def test_empty_string(self):
assert datum('') == '1.1.1970'
class TestDatumParsed:
def test_iso_format(self):
result = datum_parsed('2023-01-15T00:00:00.000Z')
assert '2023' in result
def test_invalid(self):
assert datum_parsed('not-a-date') == '1.1.1970'
class TestPatenarray:
def test_single_name(self):
assert patenarray('Alice') == ['Alice']
def test_multiple_names(self):
assert patenarray('Alice, Bob, Charlie') == ['Alice', 'Bob', 'Charlie']
def test_empty_string(self):
assert patenarray('') == []
def test_none(self):
assert patenarray(None) == []
def test_whitespace_handling(self):
assert patenarray(' Alice , Bob ') == ['Alice', 'Bob']
def test_empty_entries(self):
assert patenarray('Alice,,Bob') == ['Alice', 'Bob']
class TestCleanpaten:
def test_clean_and_rejoin(self):
assert cleanpaten(' Alice , Bob , Charlie ') == 'Alice,Bob,Charlie'
class TestRealstatus:
def test_none_raises(self):
with pytest.raises(TypeError):
realstatus(None)
def test_default_to_crew(self):
assert realstatus({}) == 'crew'
def test_explicit_crew(self):
assert realstatus({'Kennung3': 'crew'}) == 'crew'
def test_raumfahrer(self):
assert realstatus({'Kennung3': 'raumfahrer'}) == 'raumfahrer'
def test_passiv(self):
assert realstatus({'Kennung3': 'passiv'}) == 'passiv'
def test_disabled_prefix(self):
assert realstatus({'Kurzname': 'disabled-someone'}) == 'ex-crew'

108
cteward_ng/views.py Normal file
View file

@ -0,0 +1,108 @@
"""Legacy API route handlers.
Replaces all server.get() calls in startup.js.
Blueprint URL prefix: /legacy
Endpoints:
GET /monitor
GET /memberlist-oldformat
GET /stats/members
GET /stats/contracts
GET /stats/genders
GET /stats/ages
GET /member/<crewname>
GET /member/<crewname>/raw
GET /member/<crewname>/memo
GET /member/<crewname>/contributions
GET /member/<crewname>/<contract|debit|withdrawal|payment>/[<id>]/raw/
"""
from flask import Blueprint, current_app, request
legacy_bp = Blueprint('legacy', __name__)
# Placeholder imports — implemented in subsequent phases
# from . import auth
# from . import database
# from . import filters
# from . import mappings
# from . import renderers
@legacy_bp.route('/monitor')
def monitor():
"""Health check endpoint."""
# TODO Phase 6: call database.check_backend_okay()
from flask import jsonify
return jsonify({'status': 'OK'})
@legacy_bp.route('/memberlist-oldformat')
def memberlist_oldformat():
"""CSV member list for LDAP export."""
# TODO Phase 6
raise NotImplementedError("Not yet implemented")
@legacy_bp.route('/stats/members')
def stats_members():
"""Member count over time."""
# TODO Phase 6
raise NotImplementedError("Not yet implemented")
@legacy_bp.route('/stats/contracts')
def stats_contracts():
"""Contract statistics."""
# TODO Phase 6
raise NotImplementedError("Not yet implemented")
@legacy_bp.route('/stats/genders')
def stats_genders():
"""Gender demographics."""
# TODO Phase 6
raise NotImplementedError("Not yet implemented")
@legacy_bp.route('/stats/ages')
def stats_ages():
"""Age demographics."""
# TODO Phase 6
raise NotImplementedError("Not yet implemented")
@legacy_bp.route('/member/<crewname>')
def member(crewname):
"""Member details (or list when crewname is '' or '*')."""
# TODO Phase 6
raise NotImplementedError("Not yet implemented")
@legacy_bp.route('/member/<crewname>/raw')
def member_raw(crewname):
"""Raw DB member record."""
# TODO Phase 6
raise NotImplementedError("Not yet implemented")
@legacy_bp.route('/member/<crewname>/memo')
def member_memo(crewname):
"""RTF memo for a member."""
# TODO Phase 6
raise NotImplementedError("Not yet implemented")
@legacy_bp.route('/member/<crewname>/contributions')
def member_contributions(crewname):
"""Contribution summary for a member."""
# TODO Phase 6
raise NotImplementedError("Not yet implemented")
@legacy_bp.route('/member/<crewname>/<detail_type>/raw/')
@legacy_bp.route('/member/<crewname>/<detail_type>/<detail_id>/raw/')
def member_detail_raw(crewname, detail_type, detail_id=None):
"""Raw detail records (contract, debit, withdrawal, payment)."""
# TODO Phase 6
raise NotImplementedError("Not yet implemented")