"""Authentication provider. Replaces authprovider.js: - check_password: plaintext and apr1 MD5 hash verification - find_botuser: bot user lookup from config - find_ldapuser: LDAP authentication via ldap3 - authorize: full auth pipeline entry point """ import base64 import logging from flask import abort from passlib.hash import apr_md5_crypt from database import member_lookup from memberdata import realstatus as md_realstatus from permissions import ( find_config_flags, find_database_flags, impersonate, effective_permissions, ) logger = logging.getLogger(__name__) def check_password(password, hash_value): """Verify *password* against *hash_value*. Supports plaintext and apr1 (MD5 crypt) formats. Replaces Node.js apache-md5 module. """ # Plaintext if not hash_value.startswith('$'): return password == hash_value # Parse algorithm tag: $apr1$... try: _, algo, _ = hash_value.split('$', 2) except ValueError: raise ValueError("Password hashing algorithm not selected") if algo == 'apr1': try: return apr_md5_crypt.verify(password, hash_value) except Exception as exc: logger.warning("apr1 verify failed for hash %s: %s", hash_value[:8], exc) return False raise ValueError(f"Unsupported password hashing algorithm: {algo}") def _parse_basic_auth(request): """Extract (username, password) tuple from the Authorization header. Returns None if no Basic auth is present or parsing fails. """ auth_header = request.headers.get('Authorization', '') if not auth_header: return None parts = auth_header.split(' ', 1) if len(parts) != 2 or parts[0] != 'Basic': return None try: decoded = base64.b64decode(parts[1]).decode('utf-8') username, _, password = decoded.partition(':') return (username, password) except Exception: return None def find_botuser(ctx): """Try to authenticate as a configured bot user. If the Basic auth username matches a key in config.auth.bots and the password checks out, set ctx['username'] and return. Otherwise pass through silently (no username set yet). """ if ctx.get('username') is not None: # Already authenticated as bot or LDAP user return ctx auth = _parse_basic_auth(ctx['request']) if auth is None: return ctx username, password = auth bots = ctx['config'].get('auth', {}).get('bots', {}) bot_pass = bots.get(username) if bot_pass is None: # Not a known bot — pass through to LDAP return ctx try: match = check_password(password, bot_pass) except Exception as exc: logger.error("Bot password check error: %s", exc) abort(500, description=str(exc)) if not match: abort(401, description="Not authorized. #2") ctx['username'] = username return ctx def find_ldapuser(ctx): """Try to authenticate against LDAP. If ctx already has a username (bot auth succeeded), short-circuit. Otherwise attempt LDAP bind with the Basic auth credentials. On success set ctx['username']. On failure silently pass through (same as Node.js — wrong password or unknown user is not an error, it just means "not this identity"). """ if ctx.get('username') is not None: return ctx auth = _parse_basic_auth(ctx['request']) if auth is None: return ctx ldap_config = ctx['config'].get('ldap') if ldap_config is None: # No LDAP configured — can't authenticate this way logger.critical("NO LDAP") return ctx username, password = auth try: authenticated = _authenticate_ldap(ldap_config, username, password) except ConnectionError as exc: logger.error("LDAP connection failed: %s", exc) abort(500, description="LDAP connection failed.") if authenticated: ctx['username'] = username return ctx def _authenticate_ldap(config, username, password): """Perform a single LDAP bind attempt. Returns True on success, False on bad credentials. Raises ConnectionError for server-level failures. """ from ldap3 import Server, Connection, ALL, SUBTREE try: server = Server( config.get('url', 'ldap://localhost'), use_ssl=config.get('useSSL', False), connect_timeout=config.get('connectTimeout', 10), ) conn = Connection( server, auto_bind=True, user=config.get('bindDN', '').format(username=username), password=password, read_only=True, ) finally: pass # ldap3 handles cleanup if config.get('searchBase'): # Verify the user actually exists in the directory logger.critical("searchBase: %s", config.get('searchFilter', '(uid={})').format(username=username)) conn.search( search_base=config['searchBase'], search_filter=config.get('searchFilter', '(uid={})').format(username=username), search_scope=SUBTREE, attributes=config.get('attributes', []), ) logger.critical("search done") if not conn.entries: return False conn.unbind() return True def authorize(ctx): """Run the full authorization pipeline. Replaces the Promise chain in authprovider.js:authorize(). ctx is a dict with keys: config, request, response, permissions Returns ctx augmented with: username, flags, data, query, filter, permission Raises Flask 401 / 403 abort on auth failures. """ req = ctx['request'] perms = ctx.get('permissions', {}) auth = _parse_basic_auth(req) # No credentials at all — allow only if anonymous access is permitted if auth is None: if '_anonymous_' not in perms: abort(401, description="Not authorized, anonymous access prohibited.") # Anonymous path — set a sentinel username ctx['username'] = 'anonymous' ctx['flags'] = ['_anonymous_'] else: # Pipeline: bot → LDAP → config flags → DB flags → impersonate → permissions find_botuser(ctx) find_ldapuser(ctx) if ctx.get('username') is None: abort(401, description="Not authorized. #5") # Config-level flags ctx['flags'] = ['_anonymous_'] find_config_flags(ctx) find_database_flags(ctx) impersonate(ctx) effective_permissions(ctx) return ctx