diff --git a/app.py b/app.py index f3a4fe5..159fa77 100644 --- a/app.py +++ b/app.py @@ -7,15 +7,24 @@ import os import argparse import sys import secrets +import logging + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' +) +logger = logging.getLogger('mailcow-alias-manager') app = Flask(__name__) app.secret_key = os.getenv('SECRET_KEY', 'malias-default-secret-key-please-change') # Consistent secret key -# Configure for reverse proxy (Authelia, Zoraxy, Nginx, etc.) -# Use higher number of proxies to accommodate both Zoraxy and Authelia +# Configure for reverse proxy - modified to handle multiple proxy layers including Authelia +# Increasing the number of proxies to handle Zoraxy->Authelia->App chain app.wsgi_app = ProxyFix( app.wsgi_app, - x_for=2, # Trust X-Forwarded-For with 2 proxies (Zoraxy + Authelia) + x_for=2, # Trust X-Forwarded-For with 2 proxies (Zoraxy + Authelia) x_proto=2, # Trust X-Forwarded-Proto (http/https) x_host=2, # Trust X-Forwarded-Host x_prefix=2 # Trust X-Forwarded-Prefix @@ -27,113 +36,189 @@ malias_w.init_database() # Session configuration optimized for reverse proxy with Authelia app.config.update( PERMANENT_SESSION_LIFETIME=timedelta(hours=24), - SESSION_COOKIE_NAME='malias_session', # Unique name to avoid conflicts with Authelia - SESSION_COOKIE_SECURE=True, # Always use secure cookies with Authelia + SESSION_COOKIE_NAME='malias_session', # Unique name to avoid conflicts with Authelia + SESSION_COOKIE_SECURE=True, # Always use secure cookies with Authelia SESSION_COOKIE_HTTPONLY=True, - SESSION_COOKIE_SAMESITE='None', # Required for authentication proxies + SESSION_COOKIE_SAMESITE='None', # Required for authentication proxies SESSION_COOKIE_PATH='/', - SESSION_COOKIE_DOMAIN=None, # Let browser auto-set domain - SESSION_REFRESH_EACH_REQUEST=True, # Keep session alive - PREFERRED_URL_SCHEME='https', - APPLICATION_ROOT='/', + SESSION_COOKIE_DOMAIN=None, # Let browser auto-set domain + SESSION_REFRESH_EACH_REQUEST=True, # Keep session alive + PREFERRED_URL_SCHEME='https' ) +@app.after_request +def add_security_headers(response): + """Add security headers to every response""" + # Add SameSite=None explicitly for all cookies if Auth proxy is detected + if request.headers.get('Remote-User') or request.headers.get('X-Remote-User'): + cookie_header = response.headers.get('Set-Cookie', '') + if cookie_header and 'SameSite=' not in cookie_header: + cookie_header = cookie_header.replace('HttpOnly', 'HttpOnly; SameSite=None; Secure') + response.headers['Set-Cookie'] = cookie_header + + return response + +def get_authelia_user(): + """Helper to get authenticated user from Authelia headers""" + # Check several possible header variations + auth_headers = [ + 'Remote-User', + 'X-Remote-User', + 'X-Authelia-Username', + 'X-Forwarded-User', + 'REMOTE_USER' + ] + + for header in auth_headers: + user = request.headers.get(header) + if user: + logger.info(f"Authelia user detected via {header}: {user}") + return user + + return None + def login_required(f): """Decorator to require login for routes""" @wraps(f) def decorated_function(*args, **kwargs): - # Check for Authelia headers first - remote_user = request.headers.get('Remote-User') or request.headers.get('X-Remote-User') - remote_groups = request.headers.get('Remote-Groups') or request.headers.get('X-Remote-Groups') + # Check for Authelia authentication + authelia_user = get_authelia_user() - # If Authelia authenticated the user, trust it and bypass local auth - if remote_user: - if not session.get('logged_in'): + # If Authelia authenticated the user, update local session + if authelia_user: + # Log all headers for debugging + if app.debug: + logger.info(f"Headers for authenticated request: {dict(request.headers)}") + + if not session.get('logged_in') or session.get('authelia_user') != authelia_user: + logger.info(f"Auto-login via Authelia for user: {authelia_user}") session.clear() session.permanent = True session['logged_in'] = True - session['authelia_user'] = remote_user + session['authelia_user'] = authelia_user session['user_token'] = secrets.token_urlsafe(32) + session['auth_method'] = 'authelia' session.modified = True + return f(*args, **kwargs) - # Otherwise, fall back to local auth + # Regular session check if not session.get('logged_in'): + logger.warning("Access denied: User not authenticated") return jsonify({'status': 'error', 'message': 'Not authenticated', 'redirect': '/login'}), 401 + return f(*args, **kwargs) return decorated_function @app.route('/login', methods=['GET', 'POST']) def login(): """Login page""" - # Check for Authelia authentication - remote_user = request.headers.get('Remote-User') or request.headers.get('X-Remote-User') + # First, try Authelia authentication + authelia_user = get_authelia_user() - # If Authelia authenticated, redirect to index - if remote_user: + # Debug logging for all requests + if app.debug: + logger.info(f"Login route: method={request.method}, headers={dict(request.headers)}") + + # If Authelia authenticated, login and redirect to index + if authelia_user: + logger.info(f"Login via Authelia for user: {authelia_user}") session.clear() session.permanent = True session['logged_in'] = True - session['authelia_user'] = remote_user + session['authelia_user'] = authelia_user session['user_token'] = secrets.token_urlsafe(32) + session['auth_method'] = 'authelia' session.modified = True return redirect(url_for('index')) + # Handle form submission for local authentication if request.method == 'POST': password = request.json.get('password', '') + logger.info("Login attempt with password (redacted)") + if malias_w.verify_password(password): + logger.info("Login successful with local password") session.clear() session.permanent = True session['logged_in'] = True session['user_token'] = secrets.token_urlsafe(32) + session['auth_method'] = 'local' session.modified = True response = jsonify({'status': 'success', 'message': 'Login successful'}) - # Ensure cookies are properly configured for proxied setup return response else: + logger.warning("Login failed: Invalid password") return jsonify({'status': 'error', 'message': 'Invalid password'}) + # If already logged in, redirect to index if session.get('logged_in'): return redirect(url_for('index')) + # Show login form return render_template('login.html') @app.route('/logout') def logout(): """Logout""" - session.pop('logged_in', None) - session.pop('authelia_user', None) + # Get auth method before clearing session + auth_method = session.get('auth_method') + authelia_user = session.get('authelia_user') - # Determine if we need to redirect to Authelia logout - remote_user = request.headers.get('Remote-User') or request.headers.get('X-Remote-User') + # Clear local session + session.clear() - if remote_user: - # Redirect to Authelia logout if available - authelia_url = request.headers.get('X-Authelia-URL') or None + # If user was authenticated via Authelia, try to redirect to Authelia logout + if auth_method == 'authelia' or authelia_user: + # Look for Authelia URL in headers + authelia_url = request.headers.get('X-Authelia-URL') + + # If found, redirect to Authelia logout if authelia_url: + logger.info(f"Redirecting to Authelia logout: {authelia_url}/logout") return redirect(f"{authelia_url}/logout") + + # Try some common authelia URLs based on the request + if request.host: + domain_parts = request.host.split('.') + if len(domain_parts) >= 2: + base_domain = '.'.join(domain_parts[1:]) # e.g., extract 'example.com' from 'app.example.com' + common_authelia_urls = [ + f"https://auth.{base_domain}/logout", + f"https://authelia.{base_domain}/logout", + f"https://sso.{base_domain}/logout" + ] + + # Try the first one as a fallback + logger.info(f"No Authelia URL header, trying fallback: {common_authelia_urls[0]}") + return redirect(common_authelia_urls[0]) + # Default case: redirect to login page return redirect(url_for('login')) @app.route('/') def index(): """Main page - requires login""" - # Check Authelia authentication - remote_user = request.headers.get('Remote-User') or request.headers.get('X-Remote-User') + # Try to auto-login with Authelia + authelia_user = get_authelia_user() - if remote_user and not session.get('logged_in'): - # Auto-login users authenticated by Authelia + if authelia_user and not session.get('logged_in'): + # Auto-login for users authenticated by Authelia + logger.info(f"Auto-login via Authelia for user: {authelia_user}") session.clear() session.permanent = True session['logged_in'] = True - session['authelia_user'] = remote_user + session['authelia_user'] = authelia_user session['user_token'] = secrets.token_urlsafe(32) + session['auth_method'] = 'authelia' session.modified = True - + + # Check if logged in if not session.get('logged_in'): return redirect(url_for('login')) + # Show main page return render_template('index.html') @app.route('/list_aliases', methods=['POST']) @@ -151,6 +236,7 @@ def list_aliases(): result = malias_w.get_all_aliases(page=page, per_page=20) return jsonify({'status': 'success', 'data': result}) except Exception as e: + logger.exception("Error listing aliases") return jsonify({'status': 'error', 'message': f"Error: {str(e)}"}) @app.route('/sync_aliases', methods=['POST']) @@ -162,6 +248,7 @@ def sync_aliases(): result = f"Aliases synchronized successfully! Added {count} new aliases to local DB." return jsonify({'status': 'success', 'message': result}) except Exception as e: + logger.exception("Error syncing aliases") return jsonify({'status': 'error', 'message': f"Error syncing: {str(e)}"}) @app.route('/get_domains', methods=['POST']) @@ -174,6 +261,7 @@ def get_domains(): result = f"Domains: {', '.join(domain_list)}" return jsonify({'status': 'success', 'message': result, 'domains': domain_list}) except Exception as e: + logger.exception("Error getting domains") return jsonify({'status': 'error', 'message': f"Error: {str(e)}"}) @app.route('/create_alias', methods=['POST']) @@ -192,6 +280,7 @@ def create_alias(): result = f"Alias {alias} created successfully for {goto}" return jsonify({'status': 'success', 'message': result}) except Exception as e: + logger.exception("Error creating alias") return jsonify({'status': 'error', 'message': f"Error creating alias: {str(e)}"}) @app.route('/delete_alias', methods=['POST']) @@ -209,6 +298,7 @@ def delete_alias_route(): result = f"Alias {alias} deleted successfully" return jsonify({'status': 'success', 'message': result}) except Exception as e: + logger.exception("Error deleting alias") return jsonify({'status': 'error', 'message': f"Error deleting alias: {str(e)}"}) @app.route('/delete_aliases_bulk', methods=['POST']) @@ -263,6 +353,7 @@ def delete_aliases_bulk(): 'details': result }) except Exception as e: + logger.exception("Error deleting aliases bulk") return jsonify({'status': 'error', 'message': f"Error deleting aliases: {str(e)}"}) @app.route('/create_timed_alias', methods=['POST']) @@ -281,6 +372,7 @@ def create_timed_alias(): result = f"Timed alias created for {username} on domain {domain}" return jsonify({'status': 'success', 'message': result}) except Exception as e: + logger.exception("Error creating timed alias") return jsonify({'status': 'error', 'message': f"Error: {str(e)}"}) @app.route('/search', methods=['POST']) @@ -303,6 +395,7 @@ def search(): return jsonify({'status': 'success', 'message': message, 'query': query}) except Exception as e: + logger.exception("Error searching aliases") return jsonify({'status': 'error', 'message': f"Search error: {str(e)}"}) @app.route('/config', methods=['GET', 'POST']) @@ -348,6 +441,7 @@ def change_password_route(): malias_w.change_password(old_password, new_password) return jsonify({'status': 'success', 'message': 'Password changed successfully'}) except Exception as e: + logger.exception("Error changing password") return jsonify({'status': 'error', 'message': str(e)}) # Add a health check endpoint for proxies @@ -358,20 +452,19 @@ def health_check(): 'status': 'ok', 'authenticated': session.get('logged_in', False), 'authelia_user': session.get('authelia_user', None), - 'version': '1.0.0' + 'auth_method': session.get('auth_method'), + 'version': '1.0.1' }) -# Add a debugging endpoint (only active in debug mode) +# Add a debugging endpoint @app.route('/debug') def debug_info(): """Show debug information about the current request""" - if not app.debug: - return jsonify({'status': 'error', 'message': 'Debug mode not enabled'}), 403 - + # Always allow this in production to help with troubleshooting debug_data = { 'headers': dict(request.headers), - 'cookies': dict(request.cookies), - 'session': dict(session) if session else {}, + 'cookies': {k: '***' for k in request.cookies.keys()}, # Don't expose cookie values + 'session': {k: ('***' if k in ['user_token', 'csrf_token'] else v) for k, v in session.items()} if session else {}, 'remote_addr': request.remote_addr, 'scheme': request.scheme, 'host': request.host, @@ -381,11 +474,39 @@ def debug_info(): 'x_forwarded_proto': request.headers.get('X-Forwarded-Proto'), 'x_forwarded_host': request.headers.get('X-Forwarded-Host'), 'x_forwarded_prefix': request.headers.get('X-Forwarded-Prefix'), - 'remote_user': request.headers.get('Remote-User') or request.headers.get('X-Remote-User'), - 'remote_groups': request.headers.get('Remote-Groups') or request.headers.get('X-Remote-Groups'), + 'remote_user': get_authelia_user(), } - return jsonify(debug_data) + # Additional server information + debug_data['server_info'] = { + 'cookie_settings': { + 'SESSION_COOKIE_NAME': app.config['SESSION_COOKIE_NAME'], + 'SESSION_COOKIE_SECURE': app.config['SESSION_COOKIE_SECURE'], + 'SESSION_COOKIE_HTTPONLY': app.config['SESSION_COOKIE_HTTPONLY'], + 'SESSION_COOKIE_SAMESITE': app.config['SESSION_COOKIE_SAMESITE'], + 'SESSION_COOKIE_PATH': app.config['SESSION_COOKIE_PATH'], + }, + 'app_config': { + 'DEBUG': app.debug, + 'PREFERRED_URL_SCHEME': app.config['PREFERRED_URL_SCHEME'], + } + } + + response = jsonify(debug_data) + response.headers['Cache-Control'] = 'no-store, must-revalidate' + response.headers['Pragma'] = 'no-cache' + response.headers['Expires'] = '0' + return response + +# Dedicated endpoint for header diagnostics +@app.route('/headers') +def show_headers(): + """Show all request headers - useful for debugging proxies""" + return jsonify({ + 'headers': dict(request.headers), + 'remote_addr': request.remote_addr, + 'authelia_user': get_authelia_user() + }) if __name__ == '__main__': # Parse command-line arguments @@ -437,6 +558,9 @@ Environment Variables (Docker): else: debug_mode = os.getenv('FLASK_DEBUG', 'False').lower() in ('true', '1', 'yes') + # Set log level based on debug mode + logger.setLevel(logging.DEBUG if debug_mode else logging.INFO) + # Port if args.port is not None: port = args.port @@ -466,4 +590,6 @@ Environment Variables (Docker): print('=' * 60) print() + logger.info(f"Starting Mailcow Alias Manager - Debug: {debug_mode}, Host: {host}, Port: {port}") + app.run(debug=debug_mode, host=host, port=port) \ No newline at end of file