Files
malias-web/app.py
2026-01-23 14:07:09 +01:00

796 lines
32 KiB
Python

from flask import Flask, render_template, request, jsonify, redirect, url_for, session, make_response
from functools import wraps
from werkzeug.middleware.proxy_fix import ProxyFix
from datetime import timedelta
import malias_wrapper as malias_w
import os
import argparse
import sys
import secrets
import logging
import json
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger('mailcow-alias-manager')
app = Flask(__name__)
app.secret_key = os.getenv('SECRET_KEY', 'malias-default-secret-key-please-change') # Consistent secret key
# Configure proxy mode based on environment variable
# ENABLE_PROXY=true: Behind reverse proxy (Authelia/Zoraxy/Nginx/etc.)
# ENABLE_PROXY=false: Direct IP:port access (default)
ENABLE_PROXY = os.getenv('ENABLE_PROXY', 'false').lower() in ('true', '1', 'yes')
if ENABLE_PROXY:
# Mode: Behind reverse proxy (Authelia/Zoraxy/Nginx/etc.)
logger.info("=" * 60)
logger.info(" ACCESS MODE: Reverse Proxy (ENABLE_PROXY=true)")
logger.info("=" * 60)
logger.info(" Configuration:")
logger.info(" - ProxyFix middleware: ACTIVE")
logger.info(" - Trusted proxies: 2 (e.g., Zoraxy + Authelia)")
logger.info(" - Cookie security: HTTPS only (Secure=True)")
logger.info(" - SameSite policy: None (cross-origin auth)")
logger.info(" - URL scheme: https://")
logger.info("=" * 60)
app.wsgi_app = ProxyFix(
app.wsgi_app,
x_for=2, # Trust X-Forwarded-For with 2 proxies
x_proto=2, # Trust X-Forwarded-Proto (http/https)
x_host=2, # Trust X-Forwarded-Host
x_prefix=2 # Trust X-Forwarded-Prefix
)
cookie_secure = True
cookie_samesite = 'None'
preferred_scheme = 'https'
else:
# Mode: Direct IP:port access (no proxy)
logger.info("=" * 60)
logger.info(" ACCESS MODE: Direct IP:Port (ENABLE_PROXY=false)")
logger.info("=" * 60)
logger.info(" Configuration:")
logger.info(" - ProxyFix middleware: DISABLED")
logger.info(" - Cookie security: HTTP allowed (Secure=False)")
logger.info(" - SameSite policy: Lax (standard mode)")
logger.info(" - URL scheme: http://")
logger.info(" - Access via: http://your-ip:5172")
logger.info("=" * 60)
# Don't apply ProxyFix for direct access
cookie_secure = False
cookie_samesite = 'Lax'
preferred_scheme = 'http'
# Initialize database on startup
malias_w.init_database()
# Session configuration - dynamic based on proxy mode
app.config.update(
PERMANENT_SESSION_LIFETIME=timedelta(hours=24),
SESSION_COOKIE_NAME='malias_session',
SESSION_COOKIE_SECURE=cookie_secure,
SESSION_COOKIE_HTTPONLY=True,
SESSION_COOKIE_SAMESITE=cookie_samesite,
SESSION_COOKIE_PATH='/',
SESSION_COOKIE_DOMAIN=None,
SESSION_REFRESH_EACH_REQUEST=True,
PREFERRED_URL_SCHEME=preferred_scheme
)
@app.after_request
def after_request(response):
"""Process the response before it's sent"""
if ENABLE_PROXY:
# Set CORS headers for proxy mode (needed for Authelia/Zoraxy)
response.headers['Access-Control-Allow-Origin'] = request.headers.get('Origin', '*')
response.headers['Access-Control-Allow-Credentials'] = 'true'
# Cache control (applies to both modes)
response.headers['Cache-Control'] = 'no-cache, no-store, must-revalidate'
response.headers['Pragma'] = 'no-cache'
response.headers['Expires'] = '0'
return response
def get_authelia_user():
"""Helper to get authenticated user from Authelia headers or session cookie"""
# Check several possible header variations (ORDER MATTERS - most specific first!)
auth_headers = [
'X-Authelia-Username', # Authelia standard header (used by Zoraxy)
'Remote-User', # Common standard
'X-Remote-User',
'X-Forwarded-User',
'REMOTE_USER',
'Http-Remote-User',
'Http-X-Remote-User',
'X-Authenticated-User'
]
for header in auth_headers:
user = request.headers.get(header)
if user:
logger.info(f"✅ Authelia user detected via header '{header}': {user}")
return user
# Check Zoraxy forwarded headers (sometimes encoded differently)
if 'X-Forwarded-Headers' in request.headers:
try:
# Some reverse proxies encode headers as JSON
fwd_headers = json.loads(request.headers.get('X-Forwarded-Headers'))
for header in auth_headers:
if header in fwd_headers:
user = fwd_headers[header]
logger.info(f"✅ Authelia user detected via forwarded headers - {header}: {user}")
return user
except:
pass
# WORKAROUND: If Authelia session cookie exists and we're coming from auth.rune.pm,
# assume user is authenticated (Authelia not forwarding headers properly)
if ENABLE_PROXY and 'authelia_session' in request.cookies:
referer = request.headers.get('Referer', '')
if 'auth.rune.pm' in referer or request.headers.get('X-Forwarded-Proto') == 'https':
# Valid Authelia session exists - assume authenticated
# Use a generic identifier since we don't have the actual username
pseudo_user = f"authelia_user_{request.cookies.get('authelia_session')[:8]}"
logger.info(f"✅ Authelia authentication detected via session cookie (headers not forwarded)")
logger.info(f" Using pseudo-user identifier: {pseudo_user}")
logger.info(f" NOTE: Configure Authelia to forward Remote-User header for proper username")
return pseudo_user
# Log when no Authelia user found (for debugging)
if ENABLE_PROXY:
logger.debug("⚠️ No Authelia headers found in request")
logger.debug(f"Available headers: {list(request.headers.keys())}")
return None
def login_required(f):
"""Decorator to require login for routes"""
@wraps(f)
def decorated_function(*args, **kwargs):
# Auto-login with Authelia (only when ENABLE_PROXY=true)
if ENABLE_PROXY:
authelia_user = get_authelia_user()
# If Authelia authenticated the user, auto-login
if authelia_user:
if not session.get('logged_in') or session.get('authelia_user') != authelia_user:
logger.info(f"🔐 Auto-login via Authelia in API route: {authelia_user}")
session.clear()
session.permanent = True
session['logged_in'] = True
session['authelia_user'] = authelia_user
session['user_token'] = secrets.token_urlsafe(32)
session['auth_method'] = 'authelia'
session.modified = True
# Store additional info (check both Remote-* and X-Authelia-* headers)
session['remote_email'] = (request.headers.get('X-Authelia-Email') or
request.headers.get('Remote-Email', ''))
session['remote_name'] = (request.headers.get('X-Authelia-DisplayName') or
request.headers.get('Remote-Name', ''))
session['remote_groups'] = (request.headers.get('X-Authelia-Groups') or
request.headers.get('Remote-Groups', ''))
logger.info(f"✅ Auto-login in API route: {authelia_user}")
return f(*args, **kwargs)
# Regular session check (when ENABLE_PROXY=false or no Authelia headers)
if not session.get('logged_in'):
logger.warning("Access denied: User not authenticated")
if request.is_json:
return jsonify({'status': 'error', 'message': 'Not authenticated', 'redirect': '/login'}), 401
return redirect(url_for('login'))
return f(*args, **kwargs)
return decorated_function
@app.route('/login', methods=['GET', 'POST'])
def login():
"""Login page or JSON login endpoint"""
# Validate session mode matches current proxy setting
if session.get('logged_in'):
session_mode = session.get('auth_method', 'unknown')
# Clear session if mode mismatch
if ENABLE_PROXY and session_mode == 'local':
logger.info("⚠️ Session mode mismatch: Clearing local session (proxy mode enabled)")
session.clear()
elif not ENABLE_PROXY and session_mode == 'authelia':
logger.info("⚠️ Session mode mismatch: Clearing authelia session (proxy mode disabled)")
session.clear()
# Auto-login when ENABLE_PROXY=true and Authelia headers are present
if ENABLE_PROXY:
authelia_user = get_authelia_user()
if authelia_user:
# User authenticated by Authelia - auto-login
if not session.get('logged_in'):
logger.info(f"🔐 Auto-login: User '{authelia_user}' authenticated by Authelia")
session.clear()
session.permanent = True
session['logged_in'] = True
session['user_token'] = secrets.token_urlsafe(32)
session['auth_method'] = 'authelia'
session['authelia_user'] = authelia_user
session.modified = True
# Get additional Authelia info (check both Remote-* and X-Authelia-* headers)
session['remote_email'] = (request.headers.get('X-Authelia-Email') or
request.headers.get('Remote-Email', ''))
session['remote_name'] = (request.headers.get('X-Authelia-DisplayName') or
request.headers.get('Remote-Name', ''))
session['remote_groups'] = (request.headers.get('X-Authelia-Groups') or
request.headers.get('Remote-Groups', ''))
logger.info(f"✅ Auto-login successful: {authelia_user} ({session.get('remote_email', 'no email')})")
# Already logged in via Authelia - redirect to main page
return redirect(url_for('index'))
else:
# ENABLE_PROXY=true but no Authelia headers found
logger.warning("⚠️ ENABLE_PROXY=true but no Authelia headers detected!")
logger.warning(" Make sure your reverse proxy forwards authentication headers")
logger.warning(f" Available headers: {list(request.headers.keys())}")
# Handle form submission for local authentication (only when ENABLE_PROXY=false)
if request.method == 'POST':
password = request.json.get('password', '')
logger.info("Login attempt with password (redacted)")
if malias_w.verify_password(password):
logger.info("Login successful with local password")
session.clear()
session.permanent = True
session['logged_in'] = True
session['user_token'] = secrets.token_urlsafe(32)
session['auth_method'] = 'local'
session.modified = True
# Return JSON response
response = jsonify({'status': 'success', 'message': 'Login successful'})
return response
else:
logger.warning("Login failed: Invalid password")
return jsonify({'status': 'error', 'message': 'Invalid password'})
# If already logged in, redirect to index
if session.get('logged_in'):
return redirect(url_for('index'))
# Check cookies and client IP for troubleshooting
if app.debug:
logger.info(f"Cookies: {request.cookies}")
logger.info(f"Client IP: {request.remote_addr}")
logger.info(f"X-Forwarded-For: {request.headers.get('X-Forwarded-For')}")
# Log all headers to see what's coming from Authelia
logger.info(f"All headers: {dict(request.headers)}")
# Show login form
return render_template('login.html')
@app.route('/logout')
def logout():
"""Logout"""
# Get auth method before clearing session
auth_method = session.get('auth_method')
authelia_user = session.get('authelia_user')
# Clear local session
session.clear()
# If user was authenticated via Authelia, redirect to app login (not Authelia logout)
# This keeps the Authelia session active for other apps
if ENABLE_PROXY and (auth_method == 'authelia' or authelia_user):
logger.info(f"Logout for Authelia user - redirecting to app login page")
# Just redirect back to login page - Authelia session stays active
response = redirect(url_for('login'))
response.set_cookie(app.config['SESSION_COOKIE_NAME'], '', expires=0)
return response
# Default case: redirect to login page
response = redirect(url_for('login'))
# Clear cookie by setting expired date
response.set_cookie(app.config['SESSION_COOKIE_NAME'], '', expires=0)
return response
@app.route('/')
def index():
"""Main page - requires login"""
# Auto-login with Authelia (only when ENABLE_PROXY=true)
if ENABLE_PROXY:
authelia_user = get_authelia_user()
if authelia_user and not session.get('logged_in'):
# Auto-login for users authenticated by Authelia
logger.info(f"🔐 Auto-login via Authelia for user: {authelia_user}")
session.clear()
session.permanent = True
session['logged_in'] = True
session['authelia_user'] = authelia_user
session['user_token'] = secrets.token_urlsafe(32)
session['auth_method'] = 'authelia'
session.modified = True
# Store additional Authelia info (check both Remote-* and X-Authelia-* headers)
session['remote_email'] = (request.headers.get('X-Authelia-Email') or
request.headers.get('Remote-Email', ''))
session['remote_name'] = (request.headers.get('X-Authelia-DisplayName') or
request.headers.get('Remote-Name', ''))
session['remote_groups'] = (request.headers.get('X-Authelia-Groups') or
request.headers.get('Remote-Groups', ''))
logger.info(f"✅ Auto-login successful: {authelia_user} ({session.get('remote_email', 'no email')})")
return render_template('index.html')
# Check if logged in
if not session.get('logged_in'):
return redirect(url_for('login'))
# Debug logging
if app.debug and session.get('logged_in'):
logger.info(f"User authenticated with method: {session.get('auth_method', 'unknown')}")
# Show main page
return render_template('index.html')
@app.route('/list_aliases', methods=['POST'])
@login_required
def list_aliases():
"""List all aliases from Mailcow with pagination"""
try:
connection = malias_w.get_settings_from_db()
if not connection:
return jsonify({'status': 'error', 'message': 'Please configure Mailcow server first'})
# Get page number from request, default to 1
page = request.json.get('page', 1) if request.json else 1
result = malias_w.get_all_aliases(page=page, per_page=20)
return jsonify({'status': 'success', 'data': result})
except Exception as e:
logger.exception("Error listing aliases")
return jsonify({'status': 'error', 'message': f"Error: {str(e)}"})
@app.route('/sync_aliases', methods=['POST'])
@login_required
def sync_aliases():
"""Sync aliases from server to local DB"""
try:
count = malias_w.update_aliases()
result = f"Aliases synchronized successfully! Added {count} new aliases to local DB."
return jsonify({'status': 'success', 'message': result})
except Exception as e:
logger.exception("Error syncing aliases")
return jsonify({'status': 'error', 'message': f"Error syncing: {str(e)}"})
@app.route('/get_domains', methods=['POST'])
@login_required
def get_domains():
"""Get all mail domains"""
try:
domains = malias_w.get_domains()
domain_list = [d['domain_name'] for d in domains]
result = f"Domains: {', '.join(domain_list)}"
return jsonify({'status': 'success', 'message': result, 'domains': domain_list})
except Exception as e:
logger.exception("Error getting domains")
return jsonify({'status': 'error', 'message': f"Error: {str(e)}"})
@app.route('/create_alias', methods=['POST'])
@login_required
def create_alias():
"""Create a new alias"""
try:
data = request.json
alias = data.get('alias', '')
goto = data.get('goto', '')
if not alias or not goto:
return jsonify({'status': 'error', 'message': 'Both alias and destination are required'})
malias_w.create_alias(alias, goto)
result = f"Alias {alias} created successfully for {goto}"
return jsonify({'status': 'success', 'message': result})
except Exception as e:
logger.exception("Error creating alias")
return jsonify({'status': 'error', 'message': f"Error creating alias: {str(e)}"})
@app.route('/delete_alias', methods=['POST'])
@login_required
def delete_alias_route():
"""Delete an alias"""
try:
data = request.json
alias = data.get('alias', '')
if not alias:
return jsonify({'status': 'error', 'message': 'Alias is required'})
malias_w.delete_alias(alias)
result = f"Alias {alias} deleted successfully"
return jsonify({'status': 'success', 'message': result})
except Exception as e:
logger.exception("Error deleting alias")
return jsonify({'status': 'error', 'message': f"Error deleting alias: {str(e)}"})
@app.route('/delete_aliases_bulk', methods=['POST'])
@login_required
def delete_aliases_bulk():
"""Delete multiple aliases at once"""
try:
data = request.json
aliases = data.get('aliases', [])
if not aliases or not isinstance(aliases, list):
return jsonify({'status': 'error', 'message': 'Aliases array is required'})
if len(aliases) == 0:
return jsonify({'status': 'error', 'message': 'No aliases provided'})
# Delete multiple aliases
result = malias_w.delete_multiple_aliases(aliases)
# Build response message
success_count = result['success_count']
failed_count = result['failed_count']
total_count = success_count + failed_count
if failed_count == 0:
message = f"{success_count} of {total_count} aliases deleted successfully"
return jsonify({
'status': 'success',
'message': message,
'details': result
})
elif success_count == 0:
# All failed
failed_list = ', '.join([f['alias'] for f in result['failed_aliases'][:3]])
if len(result['failed_aliases']) > 3:
failed_list += f" and {len(result['failed_aliases']) - 3} more"
message = f"Failed to delete aliases: {failed_list}"
return jsonify({
'status': 'error',
'message': message,
'details': result
})
else:
# Partial success
failed_list = ', '.join([f['alias'] for f in result['failed_aliases'][:3]])
if len(result['failed_aliases']) > 3:
failed_list += f" and {len(result['failed_aliases']) - 3} more"
message = f"{success_count} of {total_count} aliases deleted successfully. Failed: {failed_list}"
return jsonify({
'status': 'partial',
'message': message,
'details': result
})
except Exception as e:
logger.exception("Error deleting aliases bulk")
return jsonify({'status': 'error', 'message': f"Error deleting aliases: {str(e)}"})
@app.route('/create_timed_alias', methods=['POST'])
@login_required
def create_timed_alias():
"""Create a time-limited alias"""
try:
data = request.json
username = data.get('username', '')
domain = data.get('domain', '')
if not username or not domain:
return jsonify({'status': 'error', 'message': 'Both username and domain are required'})
malias_w.create_timed_alias(username, domain)
result = f"Timed alias created for {username} on domain {domain}"
return jsonify({'status': 'success', 'message': result})
except Exception as e:
logger.exception("Error creating timed alias")
return jsonify({'status': 'error', 'message': f"Error: {str(e)}"})
@app.route('/search', methods=['POST'])
@login_required
def search():
"""Search for aliases"""
query = request.json.get('query', '')
if not query:
return jsonify({'status': 'error', 'message': 'No search query provided'})
try:
results = malias_w.search_aliases(query)
if results:
result_list = [f"{alias}{goto}" for alias, goto in results]
message = f"Found {len(results)} alias(es):\n" + "\n".join(result_list)
else:
message = f"No aliases found matching '{query}'"
return jsonify({'status': 'success', 'message': message, 'query': query})
except Exception as e:
logger.exception("Error searching aliases")
return jsonify({'status': 'error', 'message': f"Search error: {str(e)}"})
@app.route('/config', methods=['GET', 'POST'])
@login_required
def config_page():
"""Configuration management for Mailcow connection"""
if request.method == 'POST':
mailcow_server = request.json.get('mailcow_server', '')
mailcow_api_key = request.json.get('mailcow_api_key', '')
if mailcow_server and mailcow_api_key:
malias_w.set_connection_info(mailcow_server, mailcow_api_key)
return jsonify({'status': 'success', 'message': 'Mailcow configuration saved successfully'})
else:
return jsonify({'status': 'error', 'message': 'Server and API key are required'})
# GET request - return current config
try:
current_config = malias_w.get_config()
return jsonify(current_config)
except Exception as e:
return jsonify({'mailcow_server': '', 'mailcow_api_key': ''})
@app.route('/change_password', methods=['POST'])
@login_required
def change_password_route():
"""Change password"""
try:
data = request.json
old_password = data.get('old_password', '')
new_password = data.get('new_password', '')
confirm_password = data.get('confirm_password', '')
if not old_password or not new_password or not confirm_password:
return jsonify({'status': 'error', 'message': 'All fields are required'})
if new_password != confirm_password:
return jsonify({'status': 'error', 'message': 'New passwords do not match'})
if len(new_password) < 6:
return jsonify({'status': 'error', 'message': 'Password must be at least 6 characters'})
malias_w.change_password(old_password, new_password)
return jsonify({'status': 'success', 'message': 'Password changed successfully'})
except Exception as e:
logger.exception("Error changing password")
return jsonify({'status': 'error', 'message': str(e)})
# Add a health check endpoint for proxies
@app.route('/health')
def health_check():
"""Health check endpoint for proxies"""
return jsonify({
'status': 'ok',
'authenticated': session.get('logged_in', False),
'authelia_user': session.get('authelia_user', None),
'auth_method': session.get('auth_method'),
'version': '1.0.2',
'cookies': {k: '***' for k in request.cookies.keys()}, # Only show cookie names
'authelia_headers_present': get_authelia_user() is not None,
'zoraxy_headers_present': 'X-Forwarded-Server' in request.headers
})
# Add a debugging endpoint
@app.route('/debug')
def debug_info():
"""Show debug information about the current request"""
# Always allow this in production to help with troubleshooting
debug_data = {
'headers': dict(request.headers),
'cookies': {k: '***' for k in request.cookies.keys()}, # Don't expose cookie values
'session': {k: ('***' if k in ['user_token', 'csrf_token'] else v) for k, v in session.items()} if session else {},
'remote_addr': request.remote_addr,
'scheme': request.scheme,
'host': request.host,
'path': request.path,
'is_secure': request.is_secure,
'x_forwarded_for': request.headers.get('X-Forwarded-For'),
'x_forwarded_proto': request.headers.get('X-Forwarded-Proto'),
'x_forwarded_host': request.headers.get('X-Forwarded-Host'),
'x_forwarded_prefix': request.headers.get('X-Forwarded-Prefix'),
'remote_user': get_authelia_user(),
}
# Additional server information
debug_data['server_info'] = {
'cookie_settings': {
'SESSION_COOKIE_NAME': app.config['SESSION_COOKIE_NAME'],
'SESSION_COOKIE_SECURE': app.config['SESSION_COOKIE_SECURE'],
'SESSION_COOKIE_HTTPONLY': app.config['SESSION_COOKIE_HTTPONLY'],
'SESSION_COOKIE_SAMESITE': app.config['SESSION_COOKIE_SAMESITE'],
'SESSION_COOKIE_PATH': app.config['SESSION_COOKIE_PATH'],
},
'app_config': {
'DEBUG': app.debug,
'PREFERRED_URL_SCHEME': app.config['PREFERRED_URL_SCHEME'],
'ENABLE_PROXY': ENABLE_PROXY
}
}
response = jsonify(debug_data)
response.headers['Cache-Control'] = 'no-store, must-revalidate'
response.headers['Pragma'] = 'no-cache'
response.headers['Expires'] = '0'
return response
# Dedicated endpoint for header diagnostics
@app.route('/headers')
def show_headers():
"""Show all request headers - useful for debugging proxies"""
# Log headers to help diagnose issues with Zoraxy/Authelia
logger.info(f"Headers endpoint: All headers received: {dict(request.headers)}")
return jsonify({
'headers': dict(request.headers),
'remote_addr': request.remote_addr,
'authelia_user': get_authelia_user()
})
# Self-test endpoint for cookies
@app.route('/cookie-test')
def cookie_test():
"""Test cookie handling"""
# Clear any existing cookie
resp = make_response(jsonify({'status': 'ok', 'message': 'Cookie set test'}))
# Set a test cookie with the same attributes as session
resp.set_cookie(
'malias_test_cookie',
'test-value',
max_age=3600,
secure=app.config['SESSION_COOKIE_SECURE'],
httponly=app.config['SESSION_COOKIE_HTTPONLY'],
samesite='None',
path=app.config['SESSION_COOKIE_PATH']
)
return resp
# Endpoint to check if test cookie is set
@app.route('/cookie-check')
def cookie_check():
"""Check if test cookie was properly set"""
test_cookie = request.cookies.get('malias_test_cookie')
return jsonify({
'test_cookie_present': test_cookie is not None,
'test_cookie_value': test_cookie if test_cookie else None,
'all_cookies': {k: '***' for k in request.cookies.keys()}
})
# New endpoint to test Zoraxy auth configuration
@app.route('/authelia-test')
def authelia_test():
"""Test if Authelia headers are correctly passed through Zoraxy"""
all_headers = dict(request.headers)
authelia_headers = {}
# Check for common Authelia-related headers
auth_related_headers = [
'Remote-User', 'X-Remote-User', 'Remote-Groups', 'X-Remote-Groups',
'Remote-Name', 'X-Remote-Name', 'Remote-Email', 'X-Remote-Email',
'X-Authelia-URL', 'X-Original-URL', 'X-Forwarded-Proto'
]
for header in auth_related_headers:
if header.lower() in [h.lower() for h in all_headers.keys()]:
for actual_header in all_headers.keys():
if header.lower() == actual_header.lower():
authelia_headers[actual_header] = all_headers[actual_header]
# Check for auth cookies
auth_cookies = {}
for cookie_name in request.cookies:
if 'auth' in cookie_name.lower():
auth_cookies[cookie_name] = '***' # Hide actual value
return jsonify({
'request_host': request.host,
'authelia_user_detected': get_authelia_user() is not None,
'authelia_user': get_authelia_user(),
'authelia_headers': authelia_headers,
'auth_cookies': auth_cookies,
'all_headers_count': len(all_headers),
'zoraxy_detected': any('zoraxy' in h.lower() for h in all_headers.keys()) or 'X-Forwarded-Server' in all_headers,
'host_header': request.headers.get('Host'),
'referer': request.headers.get('Referer'),
})
if __name__ == '__main__':
# Parse command-line arguments
parser = argparse.ArgumentParser(
description='Mailcow Alias Manager - Web interface for managing mail aliases',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog='''
Examples:
python3 app.py Run in normal mode on port 5172
python3 app.py --debug Run in debug mode with auto-reload
python3 app.py --port 8080 Run on custom port 8080
python3 app.py --debug --port 8080 Run in debug mode on port 8080
python3 app.py --host 127.0.0.1 Bind to localhost only
Environment Variables (Docker):
FLASK_DEBUG Set to 'True' for debug mode (default: False)
FLASK_PORT Port to run on (default: 5172)
FLASK_HOST Host to bind to (default: 0.0.0.0)
FLASK_ENV Set to 'production' or 'development'
'''
)
parser.add_argument(
'--debug',
action='store_true',
help='Enable debug mode with auto-reload and detailed error messages'
)
parser.add_argument(
'--port',
type=int,
default=None,
help='Port to run on (default: 5172 or FLASK_PORT env var)'
)
parser.add_argument(
'--host',
type=str,
default=None,
help='Host to bind to (default: 0.0.0.0 or FLASK_HOST env var)'
)
args = parser.parse_args()
# Priority: Command-line args > Environment variables > Defaults
# Debug mode
if args.debug:
debug_mode = True
else:
debug_mode = os.getenv('FLASK_DEBUG', 'False').lower() in ('true', '1', 'yes')
# Set log level based on debug mode
logger.setLevel(logging.DEBUG if debug_mode else logging.INFO)
# Port
if args.port is not None:
port = args.port
else:
port = int(os.getenv('FLASK_PORT', '5172'))
# Host
if args.host is not None:
host = args.host
else:
host = os.getenv('FLASK_HOST', '0.0.0.0')
# Print startup info
print('=' * 60)
print(' Mailcow Alias Manager - Starting...')
print('=' * 60)
print(f' Mode: {"DEBUG (Development)" if debug_mode else "NORMAL (Production)"}')
print(f' Host: {host}')
print(f' Port: {port}')
print(f' URL: http://{host}:{port}')
print('=' * 60)
if debug_mode:
print(' ⚠️ WARNING: Debug mode is enabled!')
print(' - Auto-reload on code changes')
print(' - Detailed error messages shown')
print(' - NOT suitable for production use')
print('=' * 60)
print()
logger.info(f"Starting Mailcow Alias Manager - Debug: {debug_mode}, Host: {host}, Port: {port}")
app.run(debug=debug_mode, host=host, port=port)