2026-06-06 10:18:15 +02:00
|
|
|
"""Flag-based permission resolution.
|
|
|
|
|
|
|
|
|
|
Replaces the permission logic in authprovider.js:
|
|
|
|
|
find_config_flags, find_database_flags, impersonate,
|
|
|
|
|
effective_permissions.
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
import logging
|
|
|
|
|
|
2026-06-06 22:21:20 +02:00
|
|
|
from flask import abort
|
|
|
|
|
|
2026-06-08 20:33:47 +02:00
|
|
|
from database import member_lookup
|
|
|
|
|
from memberdata import realstatus as md_realstatus
|
2026-06-06 22:21:20 +02:00
|
|
|
|
2026-06-06 10:18:15 +02:00
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def find_config_flags(ctx):
    """Rebuild ``ctx['flags']`` from the static configuration.

    Python port of find_config_flags() from authprovider.js.

    The flag list is rebuilt from scratch on every call:

    * it always starts as ``['_anonymous_']``;
    * ``'_self_'`` is added when ``ctx['username']`` equals the ``crewname``
      path parameter of ``ctx['request']``;
    * the entries of ``ctx['config']['auth']['flags'][username]`` (if any)
      are appended;
    * if the flags present *before* the rebuild contained ``'_impersonate_'``
      but none of ``'_admin_'``, ``'_board_'`` or ``'_bot_'``, those three
      flags are filtered out of the result again.

    Aborts with 401 when ``ctx`` carries no username.  Stores the new list in
    ``ctx['flags']`` and returns ``ctx``.
    """
    username = ctx.get('username')
    if username is None:
        abort(401, description="Not authorized. #5")

    privileged = ('_admin_', '_board_', '_bot_')

    # Snapshot the incoming flags: the "limited impersonation" decision has
    # to be taken before the list is thrown away and rebuilt below.
    previous = tuple(ctx.get('flags', ['_anonymous_']))
    limited = '_impersonate_' in previous and not any(
        name in previous for name in privileged
    )

    # The caller is looking at their own record when the route's crewname
    # parameter matches the username.
    request = ctx['request']
    path_params = request.view_args
    crewname = path_params.get('crewname') if path_params else None

    rebuilt = ['_anonymous_']
    if username == crewname:
        rebuilt.append('_self_')

    # Per-user flags from the configuration, if any are defined.
    configured = ctx['config'].get('auth', {}).get('flags', {}).get(username)
    if configured:
        rebuilt += configured

    # A limited impersonator never gains the privileged flags of the
    # identity that is being resolved here.
    if limited:
        rebuilt = [name for name in rebuilt if name not in privileged]

    ctx['flags'] = rebuilt
    logger.debug("Config flags for %s: %s", username, rebuilt)
    return ctx
|
2026-06-06 10:18:15 +02:00
|
|
|
|
|
|
|
|
|
|
|
|
|
def find_database_flags(ctx):
    """Append the membership flag derived from the member database.

    Python port of find_database_flags() from authprovider.js.

    ``ctx['username']`` is handed to ``member_lookup`` (per the original
    notes: matched as a Kurzname in the Adresse table) and the resulting row
    to ``md_realstatus``.  The status selects the flag that is appended to
    ``ctx['flags']``:

    * ``'crew'``       -> ``'_member_'``
    * ``'raumfahrer'`` -> ``'_astronaut_'``
    * ``'passiv'``     -> ``'_passive_'``

    Any other status adds nothing.  Every exception raised while looking up,
    appending or logging is reported at debug level and otherwise ignored, so
    users without a database record simply get no database-based flag.

    Aborts with 401 when ``ctx`` carries no username.  Returns ``ctx``.
    """
    if ctx.get('username') is None:
        abort(401, description="Not authorized. #6")

    # (status, flag) pairs; matched by equality, first hit wins.
    status_flags = (
        ('crew', '_member_'),
        ('raumfahrer', '_astronaut_'),
        ('passiv', '_passive_'),
    )

    try:
        status = md_realstatus(member_lookup(ctx['username']))

        granted = next(
            (flag for known, flag in status_flags if status == known),
            None,
        )
        if granted is not None:
            ctx['flags'].append(granted)

        logger.debug(
            "DB flags for %s: status=%s → flags=%s",
            ctx['username'], status, ctx['flags'],
        )
    except Exception as exc:
        # Best effort by design (mirrors the Node.js catch): a failed lookup
        # leaves the flags untouched instead of failing the request.
        logger.debug("DB flag lookup failed for %s: %s", ctx['username'], exc)

    return ctx
|
2026-06-06 10:18:15 +02:00
|
|
|
|
|
|
|
|
|
|
|
|
|
def impersonate(ctx):
    """Handle the ``?impersonate=<name>`` query parameter.

    Python port of impersonate() from authprovider.js.

    When the parameter names a username different from the caller's and the
    caller holds one of ``'_admin_'``, ``'_board_'`` or ``'_impersonate_'``,
    ``ctx['username']`` is switched to that name and the flags are resolved
    again through ``find_config_flags`` and ``find_database_flags``.

    The request passes through unchanged when the parameter is absent, empty
    (``?impersonate=``) or equal to the caller's own username.  An empty
    value names no identity, so it is treated like an absent parameter
    instead of switching the request to the username ``''``.

    Aborts with 401 when ``ctx`` carries no username, or when impersonation
    is requested without one of the flags above (this includes a ``ctx``
    whose flags are missing or unset).  Returns ``ctx``.
    """
    if ctx.get('username') is None:
        abort(401, description="Not authorized. #7")

    impersonate_as = ctx['request'].args.get('impersonate')
    # ``not impersonate_as`` covers both a missing parameter (None) and an
    # empty one ('').
    if not impersonate_as or impersonate_as == ctx['username']:
        return ctx

    # Fail closed: missing or unset flags mean "no privileges", which yields
    # the same 401 as an unprivileged caller rather than a KeyError.
    caller_flags = ctx.get('flags') or ()
    if not any(f in caller_flags for f in ('_admin_', '_board_', '_impersonate_')):
        abort(401, description="Not authorized. #8")

    logger.info("Impersonation: %s → %s", ctx['username'], impersonate_as)
    ctx['username'] = impersonate_as

    # Re-resolve flags for the impersonated identity.  ctx['flags'] is left
    # untouched up to this point on purpose: find_config_flags inspects the
    # caller's flags to decide whether privileged flags must be stripped.
    find_config_flags(ctx)
    find_database_flags(ctx)

    return ctx
|
2026-06-06 10:18:15 +02:00
|
|
|
|
|
|
|
|
|
|
|
|
|
def effective_permissions(ctx):
    """Select the route permission that applies to the caller's flags.

    Python port of effective_permissions() from authprovider.js.

    ``ctx['permissions']`` maps flag names to permission entries.  Among the
    entries whose flag is contained in ``ctx['flags']``, the one with the
    numerically smallest ``level`` is selected; on a tie the entry listed
    first in the map wins.  Only levels below 255 take part in that choice,
    and a missing ``level`` counts as 255, so such entries never displace the
    fallback, which is the map's ``'_anonymous_'`` entry.

    The selected entry is stored in ``ctx['permission']``; its ``query`` and
    ``filter`` values are copied to ``ctx['query']`` and ``ctx['filter']``.

    Aborts with 401 when ``ctx`` carries no username and with 403 when no
    entry could be selected.  Returns ``ctx``.
    """
    if ctx.get('username') is None:
        abort(401, description="Not authorized. #9")

    route_perms = ctx.get('permissions', {})
    held = ctx.get('flags', [])

    # Lazily yield (level, entry) for every entry whose flag the caller
    # holds, in map order.
    ranked = (
        (entry.get('level', 255), entry)
        for name, entry in route_perms.items()
        if name in held
    )
    eligible = [pair for pair in ranked if pair[0] < 255]

    if eligible:
        # min() returns the first of several equal minima, i.e. the entry
        # that appears earliest in the permissions map.
        chosen = min(eligible, key=lambda pair: pair[0])[1]
    else:
        chosen = route_perms.get('_anonymous_')

    if chosen is None:
        abort(403, description=(
            "The link you followed is either outdated, inaccurate, or "
            "the server has been instructed not to let you have it."
        ))

    logger.debug("Effective permission for %s: %s", ctx['username'], chosen)

    # Expose the selected entry and its query/filter settings on the context.
    ctx['permission'] = chosen
    ctx['query'] = chosen.get('query')
    ctx['filter'] = chosen.get('filter')

    return ctx
|