"""Authentication provider. Replaces authprovider.js: - check_password: plaintext and apr1 MD5 hash verification - find_botuser: bot user lookup from config - find_ldapuser: LDAP authentication via ldap3 - authorize: full auth pipeline entry point """ import base64 import logging from flask import abort from passlib.hash import apr_md5_crypt from database import member_lookup from memberdata import realstatus as md_realstatus from permissions import ( find_config_flags, find_database_flags, impersonate, effective_permissions, ) logger = logging.getLogger(__name__) def check_password(password, hash_value): """Verify *password* against *hash_value*. Supports plaintext and apr1 (MD5 crypt) formats. Replaces Node.js apache-md5 module. """ logger.debug("Verifying password for hash type: %s", hash_value[:5] if hash_value else 'None') # Plaintext if not hash_value.startswith('$'): logger.debug("Plaintext password check") return password == hash_value # Parse algorithm tag: $apr1$... try: _, algo, _ = hash_value.split('$', 2) except ValueError: raise ValueError("Password hashing algorithm not selected") if algo == 'apr1': logger.debug("apr1 MD5 crypt verification for hash %s...", hash_value[:8]) try: return apr_md5_crypt.verify(password, hash_value) except Exception as exc: logger.warning("apr1 verify failed for hash %s: %s", hash_value[:8], exc) return False raise ValueError(f"Unsupported password hashing algorithm: {algo}") def _parse_basic_auth(request): """Extract (username, password) tuple from the Authorization header. Returns None if no Basic auth is present or parsing fails. """ auth_header = request.headers.get('Authorization', '') if not auth_header: logger.debug("No Authorization header present") return None parts = auth_header.split(' ', 1) if len(parts) != 2 or parts[0] != 'Basic': logger.debug("Invalid Authorization header format") return None try: decoded = base64.b64decode(parts[1]).decode('utf-8') username, _, password = decoded.partition(':') logger.debug("Basic auth parsed: username=%s", username) return (username, password) except Exception: logger.warning("Failed to decode Basic auth header") return None def find_botuser(ctx): """Try to authenticate as a configured bot user. If the Basic auth username matches a key in config.auth.bots and the password checks out, set ctx['username'] and return. Otherwise pass through silently (no username set yet). """ if ctx.get('username') is not None: # Already authenticated as bot or LDAP user logger.debug("Username already set: %s, skipping bot auth", ctx.get('username')) return ctx auth = _parse_basic_auth(ctx['request']) if auth is None: logger.debug("No Basic auth found, skipping bot auth") return ctx username, password = auth bots = ctx['config'].get('auth', {}).get('bots', {}) bot_pass = bots.get(username) if bot_pass is None: # Not a known bot — pass through to LDAP logger.debug("User '%s' not found in bot config, trying LDAP", username) return ctx logger.info("Attempting bot auth for user '%s'", username) try: match = check_password(password, bot_pass) except Exception as exc: logger.error("Bot password check error for user '%s': %s", username, exc) abort(500, description=str(exc)) if not match: logger.warning("Bot auth failed for user '%s' - invalid password", username) abort(401, description="Not authorized. #2") logger.info("Bot auth successful for user '%s'", username) ctx['username'] = username return ctx def find_ldapuser(ctx): """Try to authenticate against LDAP. If ctx already has a username (bot auth succeeded), short-circuit. Otherwise attempt LDAP bind with the Basic auth credentials. On success set ctx['username']. On failure silently pass through (same as Node.js — wrong password or unknown user is not an error, it just means "not this identity"). """ if ctx.get('username') is not None: logger.debug("Username already set: %s, skipping LDAP auth", ctx.get('username')) return ctx auth = _parse_basic_auth(ctx['request']) if auth is None: logger.debug("No Basic auth found, skipping LDAP auth") return ctx ldap_config = ctx['config'].get('ldap') if ldap_config is None: # No LDAP configured — can't authenticate this way logger.critical("NO LDAP configuration found") return ctx username, password = auth logger.info("Attempting LDAP auth for user '%s'", username) try: authenticated = _authenticate_ldap(ldap_config, username, password) except ConnectionError as exc: logger.error("LDAP connection failed for user '%s': %s", username, exc) abort(500, description="LDAP connection failed.") if authenticated: logger.info("LDAP auth successful for user '%s'", username) ctx['username'] = username else: logger.debug("LDAP auth failed for user '%s' - invalid credentials", username) return ctx def _authenticate_ldap(config, username, password): """Perform a single LDAP bind attempt. Returns True on success, False on bad credentials. Raises ConnectionError for server-level failures. """ from ldap3 import Server, Connection, ALL, SUBTREE server_url = config.get('url', 'ldap://localhost') logger.debug("LDAP bind attempt: server=%s, bindDN=%s", server_url, config.get('bindDN', '').format(username=username)) try: server = Server( config.get('url', 'ldap://localhost'), use_ssl=config.get('useSSL', False), connect_timeout=config.get('connectTimeout', 10), ) conn = Connection( server, auto_bind=True, user=config.get('bindDN', '').format(username=username), password=password, read_only=True, ) finally: pass # ldap3 handles cleanup if config.get('searchBase'): # Verify the user actually exists in the directory search_filter = config.get('searchFilter', '(uid={})').format(username=username) logger.debug("LDAP search: base=%s, filter=%s", config['searchBase'], search_filter) conn.search( search_base=config['searchBase'], search_filter=search_filter, search_scope=SUBTREE, attributes=config.get('attributes', []), ) logger.debug("LDAP search returned %d entries", len(conn.entries)) if not conn.entries: return False conn.unbind() return True def authorize(ctx): """Run the full authorization pipeline. Replaces the Promise chain in authprovider.js:authorize(). ctx is a dict with keys: config, request, response, permissions Returns ctx augmented with: username, flags, data, query, filter, permission Raises Flask 401 / 403 abort on auth failures. """ req = ctx['request'] perms = ctx.get('permissions', {}) auth = _parse_basic_auth(req) logger.debug("Authorization started, credentials present: %s", auth is not None) # No credentials at all — allow only if anonymous access is permitted if auth is None: if '_anonymous_' not in perms: logger.warning("Authorization failed: no credentials and anonymous access prohibited") abort(401, description="Not authorized, anonymous access prohibited.") # Anonymous path — set a sentinel username logger.info("Anonymous access granted") ctx['username'] = 'anonymous' ctx['flags'] = ['_anonymous_'] else: # Pipeline: bot → LDAP → config flags → DB flags → impersonate → permissions logger.debug("Running auth pipeline for user '%s'", auth[0] if auth else 'unknown') find_botuser(ctx) find_ldapuser(ctx) if ctx.get('username') is None: logger.warning("Authorization failed: authentication not successful") abort(401, description="Not authorized. #5") # Config-level flags ctx['flags'] = ['_anonymous_'] find_config_flags(ctx) find_database_flags(ctx) impersonate(ctx) effective_permissions(ctx) logger.info("Authorization complete for user '%s'", ctx.get('username', 'unknown')) return ctx