Files
malias-web/app.py

746 lines
28 KiB
Python

from flask import Flask, render_template, request, jsonify, redirect, url_for, session, make_response
from functools import wraps
from werkzeug.middleware.proxy_fix import ProxyFix
from datetime import timedelta
import malias_wrapper as malias_w
import os
import argparse
import sys
import secrets
import logging
import json
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger('mailcow-alias-manager')
app = Flask(__name__)
app.secret_key = os.getenv('SECRET_KEY', 'malias-default-secret-key-please-change') # Consistent secret key
# Configure for reverse proxy - modified to handle multiple proxy layers including Authelia
# Increasing the number of proxies to handle Zoraxy->Authelia->App chain
app.wsgi_app = ProxyFix(
app.wsgi_app,
x_for=2, # Trust X-Forwarded-For with 2 proxies (Zoraxy + Authelia)
x_proto=2, # Trust X-Forwarded-Proto (http/https)
x_host=2, # Trust X-Forwarded-Host
x_prefix=2 # Trust X-Forwarded-Prefix
)
# Initialize database on startup
malias_w.init_database()
# Session configuration optimized for reverse proxy with Authelia
app.config.update(
PERMANENT_SESSION_LIFETIME=timedelta(hours=24),
SESSION_COOKIE_NAME='malias_session', # Unique name to avoid conflicts with Authelia
SESSION_COOKIE_SECURE=True, # Always use secure cookies with Authelia
SESSION_COOKIE_HTTPONLY=True,
SESSION_COOKIE_SAMESITE='None', # Required for authentication proxies
SESSION_COOKIE_PATH='/',
SESSION_COOKIE_DOMAIN=None, # Let browser auto-set domain
SESSION_REFRESH_EACH_REQUEST=True, # Keep session alive
PREFERRED_URL_SCHEME='https'
)
# Set this to True to use the special cookie fix for Zoraxy
ZORAXY_COOKIE_FIX = True
def fix_cookie_for_zoraxy(response):
"""Special handler for Zoraxy cookie issues with SameSite=None"""
if not ZORAXY_COOKIE_FIX:
return response
# Get all cookies from the response
cookies = response.headers.getlist('Set-Cookie')
if not cookies:
return response
# Clear existing cookies
del response.headers['Set-Cookie']
# Fix each cookie and add it back
for cookie in cookies:
if 'malias_session' in cookie and 'SameSite' not in cookie:
# Add SameSite=None and Secure attributes
if 'HttpOnly' in cookie:
cookie = cookie.replace('HttpOnly', 'HttpOnly; SameSite=None; Secure')
else:
cookie += '; SameSite=None; Secure'
response.headers.add('Set-Cookie', cookie)
return response
@app.after_request
def after_request(response):
"""Process the response before it's sent"""
# Apply special cookie fix for Zoraxy
response = fix_cookie_for_zoraxy(response)
# Set CORS headers to allow Zoraxy and Authelia to work together
response.headers['Access-Control-Allow-Origin'] = request.headers.get('Origin', '*')
response.headers['Access-Control-Allow-Credentials'] = 'true'
# Cache control
response.headers['Cache-Control'] = 'no-cache, no-store, must-revalidate'
response.headers['Pragma'] = 'no-cache'
response.headers['Expires'] = '0'
return response
def get_authelia_user():
"""Helper to get authenticated user from Authelia headers"""
# Check several possible header variations
auth_headers = [
'Remote-User',
'X-Remote-User',
'X-Authelia-Username',
'X-Forwarded-User',
'REMOTE_USER',
'Http-Remote-User',
'Http-X-Remote-User',
'X-Authenticated-User'
]
for header in auth_headers:
user = request.headers.get(header)
if user:
logger.info(f"Authelia user detected via {header}: {user}")
return user
# Check Zoraxy forwarded headers (sometimes encoded differently)
if 'X-Forwarded-Headers' in request.headers:
try:
# Some reverse proxies encode headers as JSON
fwd_headers = json.loads(request.headers.get('X-Forwarded-Headers'))
for header in auth_headers:
if header in fwd_headers:
user = fwd_headers[header]
logger.info(f"Authelia user detected via forwarded headers - {header}: {user}")
return user
except:
pass
return None
def login_required(f):
"""Decorator to require login for routes"""
@wraps(f)
def decorated_function(*args, **kwargs):
# Check for Authelia authentication
authelia_user = get_authelia_user()
# If Authelia authenticated the user, update local session
if authelia_user:
# Log all headers for debugging
if app.debug:
logger.info(f"Headers for authenticated request: {dict(request.headers)}")
if not session.get('logged_in') or session.get('authelia_user') != authelia_user:
logger.info(f"Auto-login via Authelia for user: {authelia_user}")
session.clear()
session.permanent = True
session['logged_in'] = True
session['authelia_user'] = authelia_user
session['user_token'] = secrets.token_urlsafe(32)
session['auth_method'] = 'authelia'
session.modified = True
return f(*args, **kwargs)
# Regular session check
if not session.get('logged_in'):
logger.warning("Access denied: User not authenticated")
if request.is_json:
return jsonify({'status': 'error', 'message': 'Not authenticated', 'redirect': '/login'}), 401
return redirect(url_for('login'))
return f(*args, **kwargs)
return decorated_function
@app.route('/login', methods=['GET', 'POST'])
def login():
"""Login page"""
# First, try Authelia authentication
authelia_user = get_authelia_user()
# Debug logging for all requests
if app.debug:
logger.info(f"Login route: method={request.method}, headers={dict(request.headers)}")
# If Authelia authenticated, login and redirect to index
if authelia_user:
logger.info(f"Login via Authelia for user: {authelia_user}")
session.clear()
session.permanent = True
session['logged_in'] = True
session['authelia_user'] = authelia_user
session['user_token'] = secrets.token_urlsafe(32)
session['auth_method'] = 'authelia'
session.modified = True
# Set a cookie manually to ensure it's properly formatted for Zoraxy
response = redirect(url_for('index'))
# Set cookie parameters to work with Zoraxy/Authelia
response.set_cookie(
key=app.config['SESSION_COOKIE_NAME'],
value=request.cookies.get(app.config['SESSION_COOKIE_NAME']),
max_age=int(app.config['PERMANENT_SESSION_LIFETIME'].total_seconds()),
path=app.config['SESSION_COOKIE_PATH'],
secure=app.config['SESSION_COOKIE_SECURE'],
httponly=app.config['SESSION_COOKIE_HTTPONLY'],
samesite='None'
)
return response
# Handle form submission for local authentication
if request.method == 'POST':
password = request.json.get('password', '')
logger.info("Login attempt with password (redacted)")
if malias_w.verify_password(password):
logger.info("Login successful with local password")
session.clear()
session.permanent = True
session['logged_in'] = True
session['user_token'] = secrets.token_urlsafe(32)
session['auth_method'] = 'local'
session.modified = True
# Return JSON response
response = jsonify({'status': 'success', 'message': 'Login successful'})
# Manually set cookie with correct parameters for Zoraxy
if ZORAXY_COOKIE_FIX:
max_age = int(app.config['PERMANENT_SESSION_LIFETIME'].total_seconds())
cookie_value = request.cookies.get(app.config['SESSION_COOKIE_NAME']) or session.sid
response.set_cookie(
app.config['SESSION_COOKIE_NAME'],
cookie_value,
max_age=max_age,
secure=app.config['SESSION_COOKIE_SECURE'],
httponly=app.config['SESSION_COOKIE_HTTPONLY'],
samesite='None',
path=app.config['SESSION_COOKIE_PATH']
)
logger.info(f"Set fixed cookie for Zoraxy: {app.config['SESSION_COOKIE_NAME']}")
return response
else:
logger.warning("Login failed: Invalid password")
return jsonify({'status': 'error', 'message': 'Invalid password'})
# If already logged in, redirect to index
if session.get('logged_in'):
return redirect(url_for('index'))
# Check cookies and client IP for troubleshooting
if app.debug:
logger.info(f"Cookies: {request.cookies}")
logger.info(f"Client IP: {request.remote_addr}")
logger.info(f"X-Forwarded-For: {request.headers.get('X-Forwarded-For')}")
# Show login form
return render_template('login.html')
@app.route('/logout')
def logout():
"""Logout"""
# Get auth method before clearing session
auth_method = session.get('auth_method')
authelia_user = session.get('authelia_user')
# Clear local session
session.clear()
# If user was authenticated via Authelia, try to redirect to Authelia logout
if auth_method == 'authelia' or authelia_user:
# Look for Authelia URL in headers
authelia_url = request.headers.get('X-Authelia-URL')
# If found, redirect to Authelia logout
if authelia_url:
logger.info(f"Redirecting to Authelia logout: {authelia_url}/logout")
return redirect(f"{authelia_url}/logout")
# Try some common authelia URLs based on the request
if request.host:
domain_parts = request.host.split('.')
if len(domain_parts) >= 2:
base_domain = '.'.join(domain_parts[1:]) # e.g., extract 'example.com' from 'app.example.com'
common_authelia_urls = [
f"https://auth.{base_domain}/logout",
f"https://authelia.{base_domain}/logout",
f"https://sso.{base_domain}/logout"
]
# Try the first one as a fallback
logger.info(f"No Authelia URL header, trying fallback: {common_authelia_urls[0]}")
return redirect(common_authelia_urls[0])
# Default case: redirect to login page
response = redirect(url_for('login'))
# Clear cookie by setting expired date
response.set_cookie(app.config['SESSION_COOKIE_NAME'], '', expires=0)
return response
@app.route('/')
def index():
"""Main page - requires login"""
# Try to auto-login with Authelia
authelia_user = get_authelia_user()
if authelia_user and not session.get('logged_in'):
# Auto-login for users authenticated by Authelia
logger.info(f"Auto-login via Authelia for user: {authelia_user}")
session.clear()
session.permanent = True
session['logged_in'] = True
session['authelia_user'] = authelia_user
session['user_token'] = secrets.token_urlsafe(32)
session['auth_method'] = 'authelia'
session.modified = True
# Set cookie manually with correct parameters
response = make_response(render_template('index.html'))
if ZORAXY_COOKIE_FIX:
max_age = int(app.config['PERMANENT_SESSION_LIFETIME'].total_seconds())
cookie_value = request.cookies.get(app.config['SESSION_COOKIE_NAME']) or session.sid
response.set_cookie(
app.config['SESSION_COOKIE_NAME'],
cookie_value,
max_age=max_age,
secure=app.config['SESSION_COOKIE_SECURE'],
httponly=app.config['SESSION_COOKIE_HTTPONLY'],
samesite='None',
path=app.config['SESSION_COOKIE_PATH']
)
logger.info(f"Set fixed cookie for Zoraxy on index: {app.config['SESSION_COOKIE_NAME']}")
return response
# Check if logged in
if not session.get('logged_in'):
return redirect(url_for('login'))
# Debug logging
if app.debug and session.get('logged_in'):
logger.info(f"User authenticated with method: {session.get('auth_method', 'unknown')}")
# Show main page
return render_template('index.html')
@app.route('/list_aliases', methods=['POST'])
@login_required
def list_aliases():
"""List all aliases from Mailcow with pagination"""
try:
connection = malias_w.get_settings_from_db()
if not connection:
return jsonify({'status': 'error', 'message': 'Please configure Mailcow server first'})
# Get page number from request, default to 1
page = request.json.get('page', 1) if request.json else 1
result = malias_w.get_all_aliases(page=page, per_page=20)
return jsonify({'status': 'success', 'data': result})
except Exception as e:
logger.exception("Error listing aliases")
return jsonify({'status': 'error', 'message': f"Error: {str(e)}"})
@app.route('/sync_aliases', methods=['POST'])
@login_required
def sync_aliases():
"""Sync aliases from server to local DB"""
try:
count = malias_w.update_aliases()
result = f"Aliases synchronized successfully! Added {count} new aliases to local DB."
return jsonify({'status': 'success', 'message': result})
except Exception as e:
logger.exception("Error syncing aliases")
return jsonify({'status': 'error', 'message': f"Error syncing: {str(e)}"})
@app.route('/get_domains', methods=['POST'])
@login_required
def get_domains():
"""Get all mail domains"""
try:
domains = malias_w.get_domains()
domain_list = [d['domain_name'] for d in domains]
result = f"Domains: {', '.join(domain_list)}"
return jsonify({'status': 'success', 'message': result, 'domains': domain_list})
except Exception as e:
logger.exception("Error getting domains")
return jsonify({'status': 'error', 'message': f"Error: {str(e)}"})
@app.route('/create_alias', methods=['POST'])
@login_required
def create_alias():
"""Create a new alias"""
try:
data = request.json
alias = data.get('alias', '')
goto = data.get('goto', '')
if not alias or not goto:
return jsonify({'status': 'error', 'message': 'Both alias and destination are required'})
malias_w.create_alias(alias, goto)
result = f"Alias {alias} created successfully for {goto}"
return jsonify({'status': 'success', 'message': result})
except Exception as e:
logger.exception("Error creating alias")
return jsonify({'status': 'error', 'message': f"Error creating alias: {str(e)}"})
@app.route('/delete_alias', methods=['POST'])
@login_required
def delete_alias_route():
"""Delete an alias"""
try:
data = request.json
alias = data.get('alias', '')
if not alias:
return jsonify({'status': 'error', 'message': 'Alias is required'})
malias_w.delete_alias(alias)
result = f"Alias {alias} deleted successfully"
return jsonify({'status': 'success', 'message': result})
except Exception as e:
logger.exception("Error deleting alias")
return jsonify({'status': 'error', 'message': f"Error deleting alias: {str(e)}"})
@app.route('/delete_aliases_bulk', methods=['POST'])
@login_required
def delete_aliases_bulk():
"""Delete multiple aliases at once"""
try:
data = request.json
aliases = data.get('aliases', [])
if not aliases or not isinstance(aliases, list):
return jsonify({'status': 'error', 'message': 'Aliases array is required'})
if len(aliases) == 0:
return jsonify({'status': 'error', 'message': 'No aliases provided'})
# Delete multiple aliases
result = malias_w.delete_multiple_aliases(aliases)
# Build response message
success_count = result['success_count']
failed_count = result['failed_count']
total_count = success_count + failed_count
if failed_count == 0:
message = f"{success_count} of {total_count} aliases deleted successfully"
return jsonify({
'status': 'success',
'message': message,
'details': result
})
elif success_count == 0:
# All failed
failed_list = ', '.join([f['alias'] for f in result['failed_aliases'][:3]])
if len(result['failed_aliases']) > 3:
failed_list += f" and {len(result['failed_aliases']) - 3} more"
message = f"Failed to delete aliases: {failed_list}"
return jsonify({
'status': 'error',
'message': message,
'details': result
})
else:
# Partial success
failed_list = ', '.join([f['alias'] for f in result['failed_aliases'][:3]])
if len(result['failed_aliases']) > 3:
failed_list += f" and {len(result['failed_aliases']) - 3} more"
message = f"{success_count} of {total_count} aliases deleted successfully. Failed: {failed_list}"
return jsonify({
'status': 'partial',
'message': message,
'details': result
})
except Exception as e:
logger.exception("Error deleting aliases bulk")
return jsonify({'status': 'error', 'message': f"Error deleting aliases: {str(e)}"})
@app.route('/create_timed_alias', methods=['POST'])
@login_required
def create_timed_alias():
"""Create a time-limited alias"""
try:
data = request.json
username = data.get('username', '')
domain = data.get('domain', '')
if not username or not domain:
return jsonify({'status': 'error', 'message': 'Both username and domain are required'})
malias_w.create_timed_alias(username, domain)
result = f"Timed alias created for {username} on domain {domain}"
return jsonify({'status': 'success', 'message': result})
except Exception as e:
logger.exception("Error creating timed alias")
return jsonify({'status': 'error', 'message': f"Error: {str(e)}"})
@app.route('/search', methods=['POST'])
@login_required
def search():
"""Search for aliases"""
query = request.json.get('query', '')
if not query:
return jsonify({'status': 'error', 'message': 'No search query provided'})
try:
results = malias_w.search_aliases(query)
if results:
result_list = [f"{alias}{goto}" for alias, goto in results]
message = f"Found {len(results)} alias(es):\n" + "\n".join(result_list)
else:
message = f"No aliases found matching '{query}'"
return jsonify({'status': 'success', 'message': message, 'query': query})
except Exception as e:
logger.exception("Error searching aliases")
return jsonify({'status': 'error', 'message': f"Search error: {str(e)}"})
@app.route('/config', methods=['GET', 'POST'])
@login_required
def config_page():
"""Configuration management for Mailcow connection"""
if request.method == 'POST':
mailcow_server = request.json.get('mailcow_server', '')
mailcow_api_key = request.json.get('mailcow_api_key', '')
if mailcow_server and mailcow_api_key:
malias_w.set_connection_info(mailcow_server, mailcow_api_key)
return jsonify({'status': 'success', 'message': 'Mailcow configuration saved successfully'})
else:
return jsonify({'status': 'error', 'message': 'Server and API key are required'})
# GET request - return current config
try:
current_config = malias_w.get_config()
return jsonify(current_config)
except Exception as e:
return jsonify({'mailcow_server': '', 'mailcow_api_key': ''})
@app.route('/change_password', methods=['POST'])
@login_required
def change_password_route():
"""Change password"""
try:
data = request.json
old_password = data.get('old_password', '')
new_password = data.get('new_password', '')
confirm_password = data.get('confirm_password', '')
if not old_password or not new_password or not confirm_password:
return jsonify({'status': 'error', 'message': 'All fields are required'})
if new_password != confirm_password:
return jsonify({'status': 'error', 'message': 'New passwords do not match'})
if len(new_password) < 6:
return jsonify({'status': 'error', 'message': 'Password must be at least 6 characters'})
malias_w.change_password(old_password, new_password)
return jsonify({'status': 'success', 'message': 'Password changed successfully'})
except Exception as e:
logger.exception("Error changing password")
return jsonify({'status': 'error', 'message': str(e)})
# Add a health check endpoint for proxies
@app.route('/health')
def health_check():
"""Health check endpoint for proxies"""
return jsonify({
'status': 'ok',
'authenticated': session.get('logged_in', False),
'authelia_user': session.get('authelia_user', None),
'auth_method': session.get('auth_method'),
'version': '1.0.2',
'cookies': {k: '***' for k in request.cookies.keys()}, # Only show cookie names
'authelia_headers_present': get_authelia_user() is not None,
'zoraxy_headers_present': 'X-Forwarded-Server' in request.headers
})
# Add a debugging endpoint
@app.route('/debug')
def debug_info():
"""Show debug information about the current request"""
# Always allow this in production to help with troubleshooting
debug_data = {
'headers': dict(request.headers),
'cookies': {k: '***' for k in request.cookies.keys()}, # Don't expose cookie values
'session': {k: ('***' if k in ['user_token', 'csrf_token'] else v) for k, v in session.items()} if session else {},
'remote_addr': request.remote_addr,
'scheme': request.scheme,
'host': request.host,
'path': request.path,
'is_secure': request.is_secure,
'x_forwarded_for': request.headers.get('X-Forwarded-For'),
'x_forwarded_proto': request.headers.get('X-Forwarded-Proto'),
'x_forwarded_host': request.headers.get('X-Forwarded-Host'),
'x_forwarded_prefix': request.headers.get('X-Forwarded-Prefix'),
'remote_user': get_authelia_user(),
}
# Additional server information
debug_data['server_info'] = {
'cookie_settings': {
'SESSION_COOKIE_NAME': app.config['SESSION_COOKIE_NAME'],
'SESSION_COOKIE_SECURE': app.config['SESSION_COOKIE_SECURE'],
'SESSION_COOKIE_HTTPONLY': app.config['SESSION_COOKIE_HTTPONLY'],
'SESSION_COOKIE_SAMESITE': app.config['SESSION_COOKIE_SAMESITE'],
'SESSION_COOKIE_PATH': app.config['SESSION_COOKIE_PATH'],
},
'app_config': {
'DEBUG': app.debug,
'PREFERRED_URL_SCHEME': app.config['PREFERRED_URL_SCHEME'],
'ZORAXY_COOKIE_FIX': ZORAXY_COOKIE_FIX
}
}
response = jsonify(debug_data)
response.headers['Cache-Control'] = 'no-store, must-revalidate'
response.headers['Pragma'] = 'no-cache'
response.headers['Expires'] = '0'
return response
# Dedicated endpoint for header diagnostics
@app.route('/headers')
def show_headers():
"""Show all request headers - useful for debugging proxies"""
return jsonify({
'headers': dict(request.headers),
'remote_addr': request.remote_addr,
'authelia_user': get_authelia_user()
})
# Self-test endpoint for cookies
@app.route('/cookie-test')
def cookie_test():
"""Test cookie handling"""
# Clear any existing cookie
resp = make_response(jsonify({'status': 'ok', 'message': 'Cookie set test'}))
# Set a test cookie with the same attributes as session
resp.set_cookie(
'malias_test_cookie',
'test-value',
max_age=3600,
secure=app.config['SESSION_COOKIE_SECURE'],
httponly=app.config['SESSION_COOKIE_HTTPONLY'],
samesite='None',
path=app.config['SESSION_COOKIE_PATH']
)
return resp
# Endpoint to check if test cookie is set
@app.route('/cookie-check')
def cookie_check():
"""Check if test cookie was properly set"""
test_cookie = request.cookies.get('malias_test_cookie')
return jsonify({
'test_cookie_present': test_cookie is not None,
'test_cookie_value': test_cookie if test_cookie else None,
'all_cookies': {k: '***' for k in request.cookies.keys()}
})
if __name__ == '__main__':
# Parse command-line arguments
parser = argparse.ArgumentParser(
description='Mailcow Alias Manager - Web interface for managing mail aliases',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog='''
Examples:
python3 app.py Run in normal mode on port 5172
python3 app.py --debug Run in debug mode with auto-reload
python3 app.py --port 8080 Run on custom port 8080
python3 app.py --debug --port 8080 Run in debug mode on port 8080
python3 app.py --host 127.0.0.1 Bind to localhost only
Environment Variables (Docker):
FLASK_DEBUG Set to 'True' for debug mode (default: False)
FLASK_PORT Port to run on (default: 5172)
FLASK_HOST Host to bind to (default: 0.0.0.0)
FLASK_ENV Set to 'production' or 'development'
'''
)
parser.add_argument(
'--debug',
action='store_true',
help='Enable debug mode with auto-reload and detailed error messages'
)
parser.add_argument(
'--port',
type=int,
default=None,
help='Port to run on (default: 5172 or FLASK_PORT env var)'
)
parser.add_argument(
'--host',
type=str,
default=None,
help='Host to bind to (default: 0.0.0.0 or FLASK_HOST env var)'
)
args = parser.parse_args()
# Priority: Command-line args > Environment variables > Defaults
# Debug mode
if args.debug:
debug_mode = True
else:
debug_mode = os.getenv('FLASK_DEBUG', 'False').lower() in ('true', '1', 'yes')
# Set log level based on debug mode
logger.setLevel(logging.DEBUG if debug_mode else logging.INFO)
# Port
if args.port is not None:
port = args.port
else:
port = int(os.getenv('FLASK_PORT', '5172'))
# Host
if args.host is not None:
host = args.host
else:
host = os.getenv('FLASK_HOST', '0.0.0.0')
# Print startup info
print('=' * 60)
print(' Mailcow Alias Manager - Starting...')
print('=' * 60)
print(f' Mode: {"DEBUG (Development)" if debug_mode else "NORMAL (Production)"}')
print(f' Host: {host}')
print(f' Port: {port}')
print(f' URL: http://{host}:{port}')
print('=' * 60)
if debug_mode:
print(' ⚠️ WARNING: Debug mode is enabled!')
print(' - Auto-reload on code changes')
print(' - Detailed error messages shown')
print(' - NOT suitable for production use')
print('=' * 60)
print()
logger.info(f"Starting Mailcow Alias Manager - Debug: {debug_mode}, Host: {host}, Port: {port}")
app.run(debug=debug_mode, host=host, port=port)