cteward-ng/cteward_ng/auth.py
2026-06-06 22:21:20 +02:00

218 lines
6.3 KiB
Python

"""Authentication provider.
Replaces authprovider.js:
- check_password: plaintext and apr1 MD5 hash verification
- find_botuser: bot user lookup from config
- find_ldapuser: LDAP authentication via ldap3
- authorize: full auth pipeline entry point
"""
import base64
import logging
from flask import abort
from passlib.hash import apr_md5_crypt
from .database import member_lookup
from .memberdata import realstatus as md_realstatus
from .permissions import (
find_config_flags,
find_database_flags,
impersonate,
effective_permissions,
)
logger = logging.getLogger(__name__)
def check_password(password, hash_value):
"""Verify *password* against *hash_value*.
Supports plaintext and apr1 (MD5 crypt) formats.
Replaces Node.js apache-md5 module.
"""
# Plaintext
if not hash_value.startswith('$'):
return password == hash_value
# Parse algorithm tag: $apr1$...
try:
_, algo, _ = hash_value.split('$', 2)
except ValueError:
raise ValueError("Password hashing algorithm not selected")
if algo == 'apr1':
try:
return apr_md5_crypt.verify(password, hash_value)
except Exception as exc:
logger.warning("apr1 verify failed for hash %s: %s", hash_value[:8], exc)
return False
raise ValueError(f"Unsupported password hashing algorithm: {algo}")
def _parse_basic_auth(request):
"""Extract (username, password) tuple from the Authorization header.
Returns None if no Basic auth is present or parsing fails.
"""
auth_header = request.headers.get('Authorization', '')
if not auth_header:
return None
parts = auth_header.split(' ', 1)
if len(parts) != 2 or parts[0] != 'Basic':
return None
try:
decoded = base64.b64decode(parts[1]).decode('utf-8')
username, _, password = decoded.partition(':')
return (username, password)
except Exception:
return None
def find_botuser(ctx):
"""Try to authenticate as a configured bot user.
If the Basic auth username matches a key in config.auth.bots and the
password checks out, set ctx['username'] and return.
Otherwise pass through silently (no username set yet).
"""
if ctx.get('username') is not None:
# Already authenticated as bot or LDAP user
return ctx
auth = _parse_basic_auth(ctx['request'])
if auth is None:
return ctx
username, password = auth
bots = ctx['config'].get('auth', {}).get('bots', {})
bot_pass = bots.get(username)
if bot_pass is None:
# Not a known bot — pass through to LDAP
return ctx
try:
match = check_password(password, bot_pass)
except Exception as exc:
logger.error("Bot password check error: %s", exc)
abort(500, description=str(exc))
if not match:
abort(401, description="Not authorized. #2")
ctx['username'] = username
return ctx
def find_ldapuser(ctx):
"""Try to authenticate against LDAP.
If ctx already has a username (bot auth succeeded), short-circuit.
Otherwise attempt LDAP bind with the Basic auth credentials.
On success set ctx['username']. On failure silently pass through
(same as Node.js — wrong password or unknown user is not an error,
it just means "not this identity").
"""
if ctx.get('username') is not None:
return ctx
auth = _parse_basic_auth(ctx['request'])
if auth is None:
return ctx
ldap_config = ctx['config'].get('ldap')
if ldap_config is None:
# No LDAP configured — can't authenticate this way
return ctx
username, password = auth
try:
authenticated = _authenticate_ldap(ldap_config, username, password)
except ConnectionError as exc:
logger.error("LDAP connection failed: %s", exc)
abort(500, description="LDAP connection failed.")
if authenticated:
ctx['username'] = username
return ctx
def _authenticate_ldap(config, username, password):
"""Perform a single LDAP bind attempt.
Returns True on success, False on bad credentials.
Raises ConnectionError for server-level failures.
"""
from ldap3 import Server, Connection, ALL, SUBTREE
try:
server = Server(
config.get('url', 'ldap://localhost'),
use_ssl=config.get('useSSL', False),
connect_timeout=config.get('connectTimeout', 10),
)
conn = Connection(
server,
auto_bind=True,
user=config.get('bindDN', '').format(username=username),
password=password,
read_only=True,
)
finally:
pass # ldap3 handles cleanup
if config.get('searchBase'):
# Verify the user actually exists in the directory
conn.search(
search_base=config['searchBase'],
search_filter=config.get('searchFilter', '(uid={})').format(username=username),
search_scope=SUBTREE,
attributes=config.get('attributes', []),
)
if not conn.entries:
return False
conn.unbind()
return True
def authorize(ctx):
"""Run the full authorization pipeline.
Replaces the Promise chain in authprovider.js:authorize().
ctx is a dict with keys: config, request, response, permissions
Returns ctx augmented with: username, flags, data, query, filter, permission
Raises Flask 401 / 403 abort on auth failures.
"""
req = ctx['request']
perms = ctx.get('permissions', {})
auth = _parse_basic_auth(req)
# No credentials at all — allow only if anonymous access is permitted
if auth is None:
if '_anonymous_' not in perms:
abort(401, description="Not authorized, anonymous access prohibited.")
# Anonymous path — set a sentinel username
ctx['username'] = 'anonymous'
ctx['flags'] = ['_anonymous_']
else:
# Pipeline: bot → LDAP → config flags → DB flags → impersonate → permissions
find_botuser(ctx)
find_ldapuser(ctx)
if ctx.get('username') is None:
abort(401, description="Not authorized. #5")
# Config-level flags
ctx['flags'] = ['_anonymous_']
find_config_flags(ctx)
find_database_flags(ctx)
impersonate(ctx)
effective_permissions(ctx)
return ctx