scaffolding and basics

This commit is contained in:
smile 2026-06-06 10:18:15 +02:00
parent e8b0e787ec
commit a5f928f4e6
18 changed files with 1115 additions and 0 deletions

2
cteward-ng/README.md Normal file
View file

@ -0,0 +1,2 @@
# cteward-st-lexware Python rewrite
# Read-only REST API backend for Lexware Vereinsverwaltung (MSSQL)

1
cteward-ng/__init__.py Normal file
View file

@ -0,0 +1 @@
"""cteward-st-lexware rewritten in Python/Flask."""

100
cteward-ng/app.py Normal file
View file

@ -0,0 +1,100 @@
"""Flask application factory and middleware setup."""
import base64
import logging
from logging.handlers import RotatingFileHandler
from flask import Flask, request, Response
from flask_cors import CORS
from flask_compress import Compress
from .config import load_config
def create_app(config_path=None):
"""Create and configure the Flask application.
Replaces the restify server + middleware chain from startup.js.
"""
app = Flask(__name__)
app.config.from_mapping(
SECRET_KEY='cteward-st-lexware',
)
# Load runtime config (JSON file)
app.cteward_config = load_config(config_path)
_setup_logging(app)
_setup_extensions(app)
_register_prehandlers(app)
_register_blueprints(app)
return app
def _setup_logging(app):
"""Setup structured JSON logging similar to bunyan."""
log_level = app.cteward_config.get('loglevel', 'info').upper()
logfile = app.cteward_config.get('logfile')
handler = (
RotatingFileHandler(logfile)
if logfile
else logging.StreamHandler()
)
handler.setLevel(getattr(logging, log_level, logging.INFO))
formatter = logging.Formatter(
'%(asctime)s %(name)s %(levelname)s %(message)s'
)
handler.setFormatter(formatter)
app.logger.handlers.clear()
app.logger.addHandler(handler)
app.logger.setLevel(handler.level)
def _setup_extensions(app):
"""Enable CORS and gzip compression."""
CORS(app)
Compress(app)
def _extract_basic_username(headers):
"""Extract username from Basic auth header for logging."""
auth_header = headers.get('Authorization', '')
if not auth_header:
return 'anonymous'
parts = auth_header.split(' ', 1)
if len(parts) == 2 and parts[0] == 'Basic':
try:
decoded = base64.b64decode(parts[1]).decode('utf-8')
return decoded.split(':')[0]
except Exception:
return 'anonymous'
return 'anonymous'
def _register_prehandlers(app):
"""Before-request hooks: extract username for logging."""
@app.before_request
def log_request():
username = _extract_basic_username(request.headers)
app.logger.info('%s %s %s', username, request.method, request.url)
@app.after_request
def www_authenticate(response):
"""Add WWW-Authenticate header when no auth is present."""
if 'Authorization' not in request.headers:
realm = app.cteward_config.get('server', {}).get(
'realm', 'cteward API access'
)
response.headers['WWW-Authenticate'] = f'Basic realm="{realm}"'
return response
def _register_blueprints(app):
"""Register all route blueprints."""
from .views import legacy_bp
app.register_blueprint(legacy_bp, url_prefix='/legacy')

51
cteward-ng/auth.py Normal file
View file

@ -0,0 +1,51 @@
"""Authentication provider.
Replaces authprovider.js:
- check_password: plaintext and apr1 MD5 hash verification
- find_botuser: bot user lookup from config
- find_ldapuser: LDAP authentication via ldap3
- authorize: full auth pipeline entry point
"""
import logging
logger = logging.getLogger(__name__)
# Placeholder imports — implemented in Phase 4
# from . import database
# from . import memberdata
def check_password(password, hash_value):
"""Verify *password* against *hash_value*.
Supports plaintext and apr1 (MD5 crypt) formats.
Replaces Node.js apache-md5 module.
"""
# Plaintext
if not hash_value.startswith('$'):
return password == hash_value
# Parse algorithm tag: $apr1$...
try:
_, algo, _ = hash_value.split('$', 2)
except ValueError:
raise ValueError("Password hashing algorithm not selected")
if algo == 'apr1':
# TODO Phase 4: use passlib to verify apr1 hash
raise NotImplementedError("apr1 verification not yet implemented")
raise ValueError(f"Unsupported password hashing algorithm: {algo}")
def authorize(ctx):
"""Run the full authorization pipeline.
ctx is a dict with keys: config, request, response, permissions
Returns ctx augmented with: username, flags, data, query, filter, permission
"""
# TODO Phase 4: pipeline
# find_botuser -> find_ldapuser -> find_config_flags ->
# find_database_flags -> impersonate -> effective_permissions
raise NotImplementedError("Authorization pipeline not yet implemented")

41
cteward-ng/config.py Normal file
View file

@ -0,0 +1,41 @@
"""Configuration loader.
Reads the same JSON config format as the Node.js app:
CTEWARD_ST_LEXWARE_CONFIG env var or /etc/cteward/st-lexware.json
Applies the same defaults as startup.js:
mssql={}, server={}, auth={}, auth.bots={}, auth.flags={}
"""
import json
import os
_DEFAULT_CONFIG_PATH = '/etc/cteward/st-lexware.json'
def load_config(config_path=None):
"""Load and apply defaults to the JSON configuration.
Mirrors the config loading in startup.js lines 16-28.
"""
path = config_path or os.environ.get(
'CTEWARD_ST_LEXWARE_CONFIG', _DEFAULT_CONFIG_PATH
)
config = {}
try:
with open(path, 'r') as fh:
config = json.load(fh)
except (FileNotFoundError, json.JSONDecodeError) as exc:
# Same behavior as Node.js: print warning, use defaults
print(f"Can't load configfile '{path}': {exc}")
# Apply defaults (mirrors startup.js)
config.setdefault('mssql', {})
config.setdefault('server', {})
config.setdefault('auth', {})
config['auth'].setdefault('bots', {})
config['auth'].setdefault('flags', {})
return config

205
cteward-ng/database.py Normal file
View file

@ -0,0 +1,205 @@
"""Database connectivity and query execution.
Replaces database.js:
- pyodbc connection pool (via DBUtils)
- Health check (checkBackendOkay)
- Parameterized query execution (runquery)
- All SQL statement definitions
- Stats query aggregations
"""
import logging
logger = logging.getLogger(__name__)
# TODO Phase 2: import pyodbc, DBUtils.PooledDB
# import pyodbc
# from DBUtils.PooledDB import PooledDB
# Module-level connection pool
_pool = None
def init(config=None):
"""Initialize the MSSQL connection pool.
Replaces database.init() from database.js.
Uses the same config keys: user, password, server, port, database.
"""
global _pool
# TODO Phase 2: create PooledDB
raise NotImplementedError("Database not yet initialized")
def connected():
"""Check if the connection pool is alive."""
# TODO Phase 2
raise NotImplementedError
def check_backend_okay():
"""Health check: verify DB is reachable and has expected data.
Replaces checkBackendOkay() from database.js.
"""
# TODO Phase 2
raise NotImplementedError
def run_query(query_def, params):
"""Execute a parameterized query and return rows as list of dicts.
Replaces runquery() from database.js.
Args:
query_def: dict with 'statement' (str) and 'params' (dict of names)
or 'special' (str) for stats queries
params: dict of parameter values
Returns:
list of dict rows
"""
# TODO Phase 2
raise NotImplementedError
def run_query_stats_members():
"""Special aggregation query for member count over time."""
# TODO Phase 2
raise NotImplementedError
def run_query_stats_contracts():
"""Special aggregation query for contract statistics."""
# TODO Phase 2
raise NotImplementedError
def run_query_stats_genders():
"""Special aggregation query for gender demographics."""
# TODO Phase 2
raise NotImplementedError
def run_query_stats_ages():
"""Special aggregation query for age demographics."""
# TODO Phase 2
raise NotImplementedError
def member_lookup(crewname):
"""Look up a single member by crewname.
Replaces memberlookup() from database.js.
Returns a single dict row or raises.
"""
# TODO Phase 2
raise NotImplementedError
# ─── SQL Statement Definitions ───────────────────────────────────────────────
# Replacing the QUERY_* constants from database.js
# Parameter placeholders use pyodbc '?' style instead of T-SQL '@name'
QUERY_CONTRACTLIST_BY_CREWNAME = {
'statement': (
'SELECT MgVert.* FROM Adresse, MgVert '
'WHERE Adresse.Kurzname = ? AND MgVert.AdrNr = Adresse.AdrNr'
),
'params': ['crewname'],
}
QUERY_CONTRACT_BY_CREWNAME_AND_CONTRACT = {
'statement': (
'SELECT MgVert.* FROM Adresse, MgVert '
'WHERE Adresse.Kurzname = ? AND MgVert.AdrNr = Adresse.AdrNr '
"AND (MgVert.VertragNr = ? OR MgVert.VertragNr = ' ' + ?)"
),
'params': ['crewname', 'contract'],
}
QUERY_DEBITLIST_BY_CREWNAME = {
'statement': (
'SELECT MgSolln.* FROM Adresse, MgSolln '
'WHERE Adresse.Kurzname = ? AND MgSolln.AdrNr = Adresse.AdrNr '
'ORDER BY MgSolln.Jahr, MgSolln.Zeitraum'
),
'params': ['crewname'],
}
QUERY_DEBIT_BY_CREWNAME_AND_GUID = {
'statement': (
'SELECT MgSolln.* FROM Adresse, MgSolln '
'WHERE Adresse.Kurzname = ? AND MgSolln.AdrNr = Adresse.AdrNr '
'AND MgSolln.GUID = ?'
),
'params': ['crewname', 'guid'],
}
QUERY_MEMBERLIST = {
'statement': (
"SELECT AdrNr, Firma4, Nachname, Vorname, Kurzname, Kennung3, "
"Telefon3, Kontaktwoher, Eintritt, Austritt "
"FROM Adresse WHERE Kurzname != '' ORDER BY Nachname"
),
'params': [],
}
QUERY_STATS_MEMBERS = {
'special': 'QUERY_STATS_MEMBERS',
}
QUERY_STATS_CONTRACTS = {
'special': 'QUERY_STATS_CONTRACTS',
}
QUERY_STATS_GENDERS = {
'special': 'QUERY_STATS_GENDERS',
}
QUERY_STATS_AGES = {
'special': 'QUERY_STATS_AGES',
}
QUERY_MEMBERLIST_RAW = {
'statement': 'SELECT * FROM Adresse ORDER BY Nachname',
'params': [],
}
QUERY_MEMBER_BY_CREWNAME = {
'statement': 'SELECT * FROM Adresse WHERE Kurzname = ?',
'params': ['crewname'],
}
QUERY_MEMBER_MEMO_BY_CREWNAME = {
'statement': (
'SELECT Memof.* FROM Adresse, Memof '
'WHERE Adresse.Kurzname = ? AND Memof.AdrNr = Adresse.AdrNr'
),
'params': ['crewname'],
}
QUERY_WITHDRAWALLIST_BY_CREWNAME = {
'statement': (
'SELECT MgLast.* FROM Adresse, MgLast '
'WHERE Adresse.Kurzname = ? AND MgLast.Adr_Nummer = Adresse.AdrNr'
),
'params': ['crewname'],
}
QUERY_WITHDRAWAL_BY_CREWNAME_AND_GUID = {
'statement': (
'SELECT MgLast.* FROM Adresse, MgLast '
'WHERE Adresse.Kurzname = ? AND MgLast.Adr_Nummer = Adresse.AdrNr '
'AND MgLast.GUID = ?'
),
'params': ['crewname', 'guid'],
}
QUERY_PAYMENTLIST_BY_CREWNAME = {
'statement': (
'SELECT F5bew4.* FROM Adresse, F5bew4 '
'WHERE Adresse.Kurzname = ? AND F5bew4.AdrNr = Adresse.AdrNr'
),
'params': ['crewname'],
}

127
cteward-ng/filters.py Normal file
View file

@ -0,0 +1,127 @@
"""Data filter functions.
Replaces filters.js:
- MEMBERLIST_ACTIVE_ONLY: keep only active crew/raumfahrer/passive
- MEMBERLIST_SELF_ONLY: keep only the requesting user's record
- runfilter: apply a filter function if configured
"""
def memberlist_active_only(ctx):
"""Filter member list to active members only.
Replaces MEMBERLIST_ACTIVE_ONLY from filters.js.
The requesting user is always included regardless of status.
"""
username = ctx.get('username', '')
newdata = []
for row in ctx.get('data', []):
# Always include self
if username == row.get('Kurzname'):
newdata.append(row)
continue
status = _realstatus(row)
if status not in ('crew', 'raumfahrer', 'passiv'):
continue
kurzname = row.get('Kurzname', '')
if not kurzname or kurzname[0] == 'X':
continue
newrow = {
'Kurzname': kurzname,
'Kennung3': row.get('Kennung3'),
'Eintritt': row.get('Eintritt'),
'Kontaktwoher': row.get('Kontaktwoher'),
}
if row.get('Nachname'):
newrow['Nachname'] = row['Nachname']
if row.get('Vorname'):
newrow['Vorname'] = row['Vorname']
if row.get('Austritt'):
newrow['Austritt'] = row['Austritt']
newdata.append(newrow)
ctx['data'] = newdata
return ctx
def memberlist_self_only(ctx):
"""Filter member list to only the requesting user.
Replaces MEMBERLIST_SELF_ONLY from filters.js.
"""
username = ctx.get('username', '')
newdata = []
for row in ctx.get('data', []):
if username == row.get('Kurzname'):
newdata.append(row)
break
ctx['data'] = newdata
return ctx
def runfilter(ctx):
"""Apply the configured filter, if any.
Replaces runfilter() from filters.js.
"""
filt = ctx.get('filter')
if filt is None:
return ctx
return filt(ctx)
# ── Internal helpers ──────────────────────────────────────────────────────────
# We import realstatus lazily to avoid circular imports
_realstatus = None
def _get_realstatus():
global _realstatus
if _realstatus is None:
from .memberdata import realstatus as _rs
_realstatus = _rs
return _realstatus
# Override the closure to use the real function
def memberlist_active_only(ctx):
rs = _get_realstatus()
username = ctx.get('username', '')
newdata = []
for row in ctx.get('data', []):
if username == row.get('Kurzname'):
newdata.append(row)
continue
status = rs(row)
if status not in ('crew', 'raumfahrer', 'passiv'):
continue
kurzname = row.get('Kurzname', '')
if not kurzname or kurzname[0] == 'X':
continue
newrow = {
'Kurzname': kurzname,
'Kennung3': row.get('Kennung3'),
'Eintritt': row.get('Eintritt'),
'Kontaktwoher': row.get('Kontaktwoher'),
}
if row.get('Nachname'):
newrow['Nachname'] = row['Nachname']
if row.get('Vorname'):
newrow['Vorname'] = row['Vorname']
if row.get('Austritt'):
newrow['Austritt'] = row['Austritt']
newdata.append(newrow)
ctx['data'] = newdata
return ctx

86
cteward-ng/mappings.py Normal file
View file

@ -0,0 +1,86 @@
"""Data mapping functions.
Replaces mappings.js transforms raw DB rows into API response shapes.
Mappers:
NONE, CONTRACT, CONTRACTLIST, DEBIT, DEBITLIST, CONTRIBUTIONS,
MEMBER, MEMO, MEMBERLIST, MEMBERLIST_TO_LDAPCSV,
WITHDRAWAL, WITHDRAWALLIST
"""
from . import memberdata
# Placeholder — full implementation in Phase 5
MAPPERS = {}
def none_mapper(ctx):
"""Identity mapper — returns context unchanged."""
return ctx
def contract_mapper(ctx):
"""Map a single contract record."""
# TODO Phase 5
raise NotImplementedError
def contractlist_mapper(ctx):
"""Map a list of contract records into paginated format."""
# TODO Phase 5
raise NotImplementedError
def debit_mapper(ctx):
"""Map a single debit record."""
# TODO Phase 5
raise NotImplementedError
def debitlist_mapper(ctx):
"""Map a list of debit records into paginated format."""
# TODO Phase 5
raise NotImplementedError
def contributions_mapper(ctx):
"""Aggregate contributions (billed/paid/unpaid) across contracts and years."""
# TODO Phase 5
raise NotImplementedError
def member_mapper(ctx):
"""Map a single member record with all fields."""
# TODO Phase 5
raise NotImplementedError
def memo_mapper(ctx):
"""Parse RTF memo and extract embedded JSON data."""
# TODO Phase 5
raise NotImplementedError
def memberlist_mapper(ctx):
"""Map a list of members into paginated format."""
# TODO Phase 5
raise NotImplementedError
def memberlist_to_ldapcsv_mapper(ctx):
"""Map member list to LDAP CSV export format."""
# TODO Phase 5
raise NotImplementedError
def withdrawal_mapper(ctx):
"""Map a single withdrawal record."""
# TODO Phase 5
raise NotImplementedError
def withdrawallist_mapper(ctx):
"""Map a list of withdrawal records into paginated format."""
# TODO Phase 5
raise NotImplementedError

99
cteward-ng/memberdata.py Normal file
View file

@ -0,0 +1,99 @@
"""Member data utility functions.
Replaces memberdata.js:
- realstatus: determine crew/passive/ex-crew/raumfahrer status
- datum: parse 'YYYYMMDD' strings to German date format (d.m.YYYY)
- datum_parsed: parse date strings via Python datetime
- patenarray: split comma-separated sponsor names
- cleanpaten: clean and rejoin sponsor names
"""
from datetime import datetime
def realstatus(member):
"""Determine the real membership status of *member*.
member: dict with keys Kennung3, Austritt, Kurzname
Returns one of: 'crew', 'raumfahrer', 'passiv', 'ex-crew', 'ex-raumfahrer'
"""
if member is None:
raise TypeError("Need a member record to work with")
status = member.get('Kennung3') or ''
# Normalize Kennung3 prefix
if not status or status.startswith('check'):
status = 'crew'
if status.startswith('crew'):
status = 'crew'
if status.startswith('raumfahrer'):
status = 'raumfahrer'
if status.startswith('passiv'):
status = 'passiv'
# Check for expiry date (Austritt in the past)
austritt = member.get('Austritt')
if austritt and austritt != '':
try:
austritt_dt = datetime.strptime(austritt, '%Y-%m-%dT%H:%M:%S.%fZ')
# Set to end of that day
austritt_dt = austritt_dt.replace(hour=23, minute=59, second=59, microsecond=999999)
if austritt_dt < datetime.utcnow():
if status == 'crew' or status == 'passiv':
status = 'ex-crew'
elif status == 'raumfahrer':
status = 'ex-raumfahrer'
except ValueError:
pass
# Disabled crewname prefix
kurzname = member.get('Kurzname') or ''
if kurzname.startswith('disabled-'):
status = 'ex-crew'
return status
def datum(isodate):
"""Parse 'YYYYMMDD' string to German date format 'd.m.YYYY'.
Returns '1.1.1970' on failure.
"""
if not isinstance(isodate, str) or len(isodate) != 8:
return '1.1.1970'
try:
dt = datetime.strptime(isodate, '%Y%m%d')
return f'{dt.day}.{dt.month + 1}.{dt.year}'
except ValueError:
return '1.1.1970'
def datum_parsed(isodate):
"""Parse a date string via Python datetime, return German format.
Returns '1.1.1970' on failure.
"""
try:
dt = datetime.fromisoformat(isodate.replace('Z', '+00:00'))
return f'{dt.day}.{dt.month}.{dt.year}'
except (ValueError, AttributeError):
return '1.1.1970'
def patenarray(patenstr):
"""Split a comma-separated sponsor string into a clean list.
Handles leading/trailing whitespace and empty entries.
"""
if not patenstr:
return []
if ',' not in patenstr:
stripped = patenstr.strip()
return [stripped] if stripped else []
return [p.strip() for p in patenstr.split(',') if p.strip()]
def cleanpaten(patenstr):
"""Clean and rejoin sponsor names as a single comma-separated string."""
return ','.join(patenarray(patenstr))

46
cteward-ng/permissions.py Normal file
View file

@ -0,0 +1,46 @@
"""Flag-based permission resolution.
Replaces the permission logic in authprovider.js:
find_config_flags, find_database_flags, impersonate,
effective_permissions.
"""
import logging
logger = logging.getLogger(__name__)
def find_config_flags(ctx):
"""Assign permission flags from config based on username.
Replaces find_config_flags() in authprovider.js.
"""
# TODO Phase 4
raise NotImplementedError
def find_database_flags(ctx):
"""Look up user in DB and assign _member_ / _astronaut_ / _passive_ flags.
Replaces find_database_flags() in authprovider.js.
"""
# TODO Phase 4
raise NotImplementedError
def impersonate(ctx):
"""Handle ?impersonate= query parameter.
Replaces impersonate() in authprovider.js.
"""
# TODO Phase 4
raise NotImplementedError
def effective_permissions(ctx):
"""Determine the effective permission set (lowest level wins).
Replaces effective_permissions() in authprovider.js.
"""
# TODO Phase 4
raise NotImplementedError

5
cteward-ng/pytest.ini Normal file
View file

@ -0,0 +1,5 @@
"""pytest configuration."""
[pytest]
testpaths = cteward-ng/tests
pythonpath = .

70
cteward-ng/renderers.py Normal file
View file

@ -0,0 +1,70 @@
"""Response renderers.
Replaces renderers.js:
- JSON_OUTPUT: JSON with 2-decimal float formatting + JSONP callback support
- CSV_OUTPUT: semicolon-delimited CSV
"""
import csv
import io
import json
from flask import jsonify, Response
def json_output(ctx):
"""Render data as JSON.
Replaces JSON_OUTPUT from renderers.js.
Supports JSONP via ?callback= query parameter.
Rounds floats to 2 decimal places.
"""
data = ctx.get('data', {})
def _replacer(val):
if isinstance(val, float):
return round(val, 2)
return val
json_data = json.dumps(data, default=_replacer, indent=2)
from flask import request
callback = request.args.get('callback')
if callback:
return Response(
f'{callback}({json_data})',
mimetype='application/javascript',
)
return Response(
json_data,
mimetype='application/json; charset=utf-8',
)
def csv_output(ctx):
"""Render data as semicolon-delimited CSV.
Replaces CSV_OUTPUT from renderers.js.
"""
data = ctx.get('data', [])
si = io.StringIO()
writer = csv.writer(
si,
delimiter=';',
lineterminator='\n',
)
if data and isinstance(data[0], dict):
# Write header
writer.writerow(data[0].keys())
for row in data:
writer.writerow(row.values())
body = si.getvalue()
si.close()
return Response(
body.encode('utf-8'),
mimetype='text/csv; charset=utf-8',
)

View file

@ -0,0 +1,10 @@
Flask>=3.0,<4.0
flask-cors>=4.0,<5.0
flask-compress>=1.15,<2.0
pyodbc>=5.0,<6.0
DBUtils>=3.1,<4.0
ldap3>=2.9,<3.0
passlib>=1.7,<2.0
gunicorn>=21.0,<23.0
pytest>=7.0,<9.0
pytest-mock>=3.0,<4.0

View file

@ -0,0 +1 @@
# Tests for cteward-ng

View file

@ -0,0 +1,22 @@
"""Test fixtures and app factory for the test suite.
Replaces test/000-startup.js bootstrap logic.
"""
import pytest
from cteward_ng.app import create_app
@pytest.fixture
def app():
"""Create a test Flask app using the test config."""
app = create_app(config_path='st-lexware-test.json')
app.config['TESTING'] = True
return app
@pytest.fixture
def client(app):
"""Test client for making requests."""
return app.test_client()

View file

@ -0,0 +1,55 @@
"""Test configuration loading.
Verifies that config.py loads the JSON config and applies defaults
the same way startup.js does.
"""
import json
import os
import tempfile
import pytest
from cteward_ng.config import load_config
class TestLoadConfig:
def test_loads_json_file(self):
with tempfile.NamedTemporaryFile(
mode='w', suffix='.json', delete=False
) as fh:
json.dump({'mssql': {'password': 'test'}}, fh)
fh.flush()
config = load_config(fh.name)
os.unlink(fh.name)
assert config['mssql']['password'] == 'test'
def test_applies_defaults(self):
with tempfile.NamedTemporaryFile(
mode='w', suffix='.json', delete=False
) as fh:
json.dump({}, fh)
fh.flush()
config = load_config(fh.name)
os.unlink(fh.name)
assert 'mssql' in config
assert 'server' in config
assert 'auth' in config
assert 'bots' in config['auth']
assert 'flags' in config['auth']
def test_missing_file_returns_empty(self):
config = load_config('/nonexistent/path.json')
assert config == {}
def test_invalid_json_returns_empty(self):
with tempfile.NamedTemporaryFile(
mode='w', suffix='.json', delete=False
) as fh:
fh.write('{invalid json}')
fh.flush()
config = load_config(fh.name)
os.unlink(fh.name)
assert config == {}

View file

@ -0,0 +1,86 @@
"""Unit tests for memberdata utility functions.
Replaces test/memberdata_datum.js, test/memberdata_datum_parsed.js,
test/memberdata_patenarray.js, test/memberdata_realstatus.js.
"""
import pytest
from cteward_ng.memberdata import (
realstatus,
datum,
datum_parsed,
patenarray,
cleanpaten,
)
class TestDatum:
def test_valid_yyyy_mmdd(self):
assert datum('20230115') == '15.2.2023'
def test_invalid_length(self):
assert datum('2023011') == '1.1.1970'
def test_not_a_string(self):
assert datum(12345) == '1.1.1970'
def test_invalid_format(self):
assert datum('notadate') == '1.1.1970'
def test_empty_string(self):
assert datum('') == '1.1.1970'
class TestDatumParsed:
def test_iso_format(self):
result = datum_parsed('2023-01-15T00:00:00.000Z')
assert '2023' in result
def test_invalid(self):
assert datum_parsed('not-a-date') == '1.1.1970'
class TestPatenarray:
def test_single_name(self):
assert patenarray('Alice') == ['Alice']
def test_multiple_names(self):
assert patenarray('Alice, Bob, Charlie') == ['Alice', 'Bob', 'Charlie']
def test_empty_string(self):
assert patenarray('') == []
def test_none(self):
assert patenarray(None) == []
def test_whitespace_handling(self):
assert patenarray(' Alice , Bob ') == ['Alice', 'Bob']
def test_empty_entries(self):
assert patenarray('Alice,,Bob') == ['Alice', 'Bob']
class TestCleanpaten:
def test_clean_and_rejoin(self):
assert cleanpaten(' Alice , Bob , Charlie ') == 'Alice,Bob,Charlie'
class TestRealstatus:
def test_none_raises(self):
with pytest.raises(TypeError):
realstatus(None)
def test_default_to_crew(self):
assert realstatus({}) == 'crew'
def test_explicit_crew(self):
assert realstatus({'Kennung3': 'crew'}) == 'crew'
def test_raumfahrer(self):
assert realstatus({'Kennung3': 'raumfahrer'}) == 'raumfahrer'
def test_passiv(self):
assert realstatus({'Kennung3': 'passiv'}) == 'passiv'
def test_disabled_prefix(self):
assert realstatus({'Kurzname': 'disabled-someone'}) == 'ex-crew'

108
cteward-ng/views.py Normal file
View file

@ -0,0 +1,108 @@
"""Legacy API route handlers.
Replaces all server.get() calls in startup.js.
Blueprint URL prefix: /legacy
Endpoints:
GET /monitor
GET /memberlist-oldformat
GET /stats/members
GET /stats/contracts
GET /stats/genders
GET /stats/ages
GET /member/<crewname>
GET /member/<crewname>/raw
GET /member/<crewname>/memo
GET /member/<crewname>/contributions
GET /member/<crewname>/<contract|debit|withdrawal|payment>/[<id>]/raw/
"""
from flask import Blueprint, current_app, request
legacy_bp = Blueprint('legacy', __name__)
# Placeholder imports — implemented in subsequent phases
# from . import auth
# from . import database
# from . import filters
# from . import mappings
# from . import renderers
@legacy_bp.route('/monitor')
def monitor():
"""Health check endpoint."""
# TODO Phase 6: call database.check_backend_okay()
from flask import jsonify
return jsonify({'status': 'OK'})
@legacy_bp.route('/memberlist-oldformat')
def memberlist_oldformat():
"""CSV member list for LDAP export."""
# TODO Phase 6
raise NotImplementedError("Not yet implemented")
@legacy_bp.route('/stats/members')
def stats_members():
"""Member count over time."""
# TODO Phase 6
raise NotImplementedError("Not yet implemented")
@legacy_bp.route('/stats/contracts')
def stats_contracts():
"""Contract statistics."""
# TODO Phase 6
raise NotImplementedError("Not yet implemented")
@legacy_bp.route('/stats/genders')
def stats_genders():
"""Gender demographics."""
# TODO Phase 6
raise NotImplementedError("Not yet implemented")
@legacy_bp.route('/stats/ages')
def stats_ages():
"""Age demographics."""
# TODO Phase 6
raise NotImplementedError("Not yet implemented")
@legacy_bp.route('/member/<crewname>')
def member(crewname):
"""Member details (or list when crewname is '' or '*')."""
# TODO Phase 6
raise NotImplementedError("Not yet implemented")
@legacy_bp.route('/member/<crewname>/raw')
def member_raw(crewname):
"""Raw DB member record."""
# TODO Phase 6
raise NotImplementedError("Not yet implemented")
@legacy_bp.route('/member/<crewname>/memo')
def member_memo(crewname):
"""RTF memo for a member."""
# TODO Phase 6
raise NotImplementedError("Not yet implemented")
@legacy_bp.route('/member/<crewname>/contributions')
def member_contributions(crewname):
"""Contribution summary for a member."""
# TODO Phase 6
raise NotImplementedError("Not yet implemented")
@legacy_bp.route('/member/<crewname>/<detail_type>/raw/')
@legacy_bp.route('/member/<crewname>/<detail_type>/<detail_id>/raw/')
def member_detail_raw(crewname, detail_type, detail_id=None):
"""Raw detail records (contract, debit, withdrawal, payment)."""
# TODO Phase 6
raise NotImplementedError("Not yet implemented")