cteward-ng/cteward_ng/permissions.py
2026-06-08 20:33:47 +02:00

173 lines
5.4 KiB
Python

"""Flag-based permission resolution.
Replaces the permission logic in authprovider.js:
find_config_flags, find_database_flags, impersonate,
effective_permissions.
"""
import logging
from flask import abort
from database import member_lookup
from memberdata import realstatus as md_realstatus
logger = logging.getLogger(__name__)
def find_config_flags(ctx):
    """Assign permission flags from config based on username.

    Replaces find_config_flags() in authprovider.js.

    Steps:
      1. Remember if the original caller is an "impersonating-limited" user
         (_impersonate_ flag but NOT _admin_, _board_, _bot_).
      2. Reset flags to ['_anonymous_'].
      3. Add '_self_' if username == crewname path param.
      4. Merge in config.auth.flags[username].
      5. If impersonating-limited, strip _admin_, _board_, _bot_.

    Args:
        ctx: Mutable request context (dict-like). Reads ``username``,
            ``flags``, ``request`` (its ``view_args``) and ``config``;
            overwrites ``flags``.

    Returns:
        The same ``ctx`` object, with ``ctx['flags']`` replaced.

    Raises:
        Calls ``abort(401)`` when ``ctx`` carries no username.
    """
    if ctx.get('username') is None:
        abort(401, description="Not authorized. #5")

    privileged = ('_admin_', '_board_', '_bot_')

    # Flags held *before* this call. They are only inspected, never kept.
    # ``or ()`` also covers an explicit ``ctx['flags'] = None``, which would
    # otherwise make ``list(None)`` raise TypeError.
    previous_flags = list(ctx.get('flags') or ())

    # Detect the "impersonating-limited" state before the flags are reset.
    impersonating_limited = (
        '_impersonate_' in previous_flags
        and not any(flag in previous_flags for flag in privileged)
    )

    # Reset to baseline.
    flags = ['_anonymous_']

    # Self check: the user is querying their own record.
    req = ctx['request']
    crewname = req.view_args.get('crewname') if req.view_args else None
    if ctx['username'] == crewname:
        flags.append('_self_')

    # Merge config-level flags for this user.
    user_flags = ctx['config'].get('auth', {}).get('flags', {}).get(ctx['username'])
    if user_flags:
        flags.extend(user_flags)

    # A limited impersonator must not inherit privileged flags from the
    # identity whose config entry was just merged.
    if impersonating_limited:
        flags = [flag for flag in flags if flag not in privileged]

    ctx['flags'] = flags
    logger.debug("Config flags for %s: %s", ctx['username'], flags)
    return ctx
def find_database_flags(ctx):
    """Look up user in DB and assign _member_ / _astronaut_ / _passive_ flags.

    Replaces find_database_flags() in authprovider.js.

    Calls ``member_lookup(username)`` (per the original notes: a Kurzname
    lookup in the Adresse table), derives the real status via
    ``md_realstatus`` and appends the corresponding flag. Lookup failures
    are deliberately ignored: non-members simply get no DB-based flags.

    Args:
        ctx: Mutable request context (dict-like). Reads ``username``;
            appends to ``flags`` (created if missing or None).

    Returns:
        The same ``ctx`` object.

    Raises:
        Calls ``abort(401)`` when ``ctx`` carries no username.
    """
    if ctx.get('username') is None:
        abort(401, description="Not authorized. #6")

    # Keep the best-effort ``try`` limited to the lookup itself, so that
    # problems in the flag bookkeeping below are not silently reported as
    # a failed DB lookup.
    try:
        member_row = member_lookup(ctx['username'])
        status = md_realstatus(member_row)
    except Exception as exc:
        # User not in DB — no special flags (same as Node.js catch)
        logger.debug("DB flag lookup failed for %s: %s", ctx['username'], exc)
        return ctx

    if status == 'crew':
        db_flag = '_member_'
    elif status == 'raumfahrer':
        db_flag = '_astronaut_'
    elif status == 'passiv':
        db_flag = '_passive_'
    else:
        db_flag = None

    if db_flag is not None:
        # Create the list on demand rather than failing when no earlier
        # step has populated ``ctx['flags']``.
        if ctx.get('flags') is None:
            ctx['flags'] = []
        ctx['flags'].append(db_flag)

    logger.debug(
        "DB flags for %s: status=%s → flags=%s",
        ctx['username'], status, ctx.get('flags'),
    )
    return ctx
def impersonate(ctx):
    """Handle ?impersonate= query parameter.

    Replaces impersonate() in authprovider.js.

    If the caller has _admin_, _board_ or _impersonate_ flag and passes
    a different username via `?impersonate=<name>`, switch identity and
    re-run find_config_flags + find_database_flags for the new user.

    Args:
        ctx: Mutable request context (dict-like). Reads ``username``,
            ``flags`` and ``request`` (its ``args``); may overwrite
            ``username`` and, through the re-run helpers, ``flags``.

    Returns:
        The same ``ctx`` object.

    Raises:
        Calls ``abort(401)`` when ``ctx`` carries no username, or when the
        caller requests impersonation without one of the required flags.
    """
    if ctx.get('username') is None:
        abort(401, description="Not authorized. #7")

    # NOTE(review): an empty ``?impersonate=`` value is not None and is
    # therefore treated as a real target name — confirm this is intended.
    impersonate_as = ctx['request'].args.get('impersonate')
    if impersonate_as is None or impersonate_as == ctx['username']:
        return ctx

    if not any(f in ctx['flags'] for f in ('_admin_', '_board_', '_impersonate_')):
        abort(401, description="Not authorized. #8")

    # Separate caller and target so the audit line is unambiguous, and log
    # the query-string value with %r so embedded control characters (e.g.
    # newlines) are escaped instead of forging extra log lines.
    logger.info("Impersonation: %s → %r", ctx['username'], impersonate_as)
    ctx['username'] = impersonate_as

    # Re-resolve flags for the impersonated identity. ctx['flags'] still
    # holds the caller's flags at this point, which find_config_flags
    # inspects before resetting them.
    find_config_flags(ctx)
    find_database_flags(ctx)
    return ctx
def effective_permissions(ctx):
    """Select the permission entry that applies to the current user.

    Replaces effective_permissions() in authprovider.js.

    Among the entries of ``ctx['permissions']`` whose key is one of the
    user's flags, the entry with the smallest ``level`` wins, provided that
    level is below 255; on ties the first entry in mapping order is kept.
    If no entry qualifies, the ``'_anonymous_'`` entry is used as fallback.
    The winner is stored as ``ctx['permission']`` and its ``query`` and
    ``filter`` values are copied into ``ctx``.

    Calls ``abort(401)`` when ``ctx`` carries no username and ``abort(403)``
    when neither a qualifying entry nor a fallback exists.
    """
    if ctx.get('username') is None:
        abort(401, description="Not authorized. #9")

    route_permissions = ctx.get('permissions', {})
    held_flags = ctx.get('flags', [])
    ceiling = 255

    # Fallback used when no held flag offers a level below the ceiling.
    chosen = route_permissions.get('_anonymous_')

    # Collect (level, entry) pairs for every held flag that beats the ceiling.
    ranked = []
    for flag_name, entry in route_permissions.items():
        if flag_name in held_flags:
            entry_level = entry.get('level', ceiling)
            if entry_level < ceiling:
                ranked.append((entry_level, entry))

    if ranked:
        # min() returns the first minimal pair, i.e. the earliest entry
        # among those sharing the lowest level.
        chosen = min(ranked, key=lambda pair: pair[0])[1]

    if chosen is None:
        abort(403, description=(
            "The link you followed is either outdated, inaccurate, or "
            "the server has been instructed not to let you have it."
        ))

    logger.debug("Effective permission for %s: %s", ctx['username'], chosen)

    # Expose the winning entry and its query/filter settings on the context.
    ctx['permission'] = chosen
    ctx['query'] = chosen.get('query')
    ctx['filter'] = chosen.get('filter')
    return ctx