812 lines
32 KiB
Python
812 lines
32 KiB
Python
from flask import Flask, render_template, request, jsonify, redirect, url_for, session, make_response
|
|
from functools import wraps
|
|
from werkzeug.middleware.proxy_fix import ProxyFix
|
|
from datetime import timedelta
|
|
import malias_wrapper as malias_w
|
|
import os
|
|
import argparse
|
|
import sys
|
|
import secrets
|
|
import logging
|
|
import json
|
|
|
|
# Root logging setup: INFO threshold, "timestamp - LEVEL - message" records.
logging.basicConfig(
    datefmt='%Y-%m-%d %H:%M:%S',
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Named logger used by the rest of this module.
logger = logging.getLogger('mailcow-alias-manager')
|
|
|
|
app = Flask(__name__)

# Session-signing key. The fallback below ships with the source code, so it
# is public knowledge: while it is in use anyone can forge session cookies.
# A blank SECRET_KEY is treated like an unset one, and a warning is logged
# whenever the fallback ends up being used.
_DEFAULT_SECRET_KEY = 'malias-default-secret-key-please-change'
app.secret_key = os.getenv('SECRET_KEY') or _DEFAULT_SECRET_KEY
if app.secret_key == _DEFAULT_SECRET_KEY:
    logger.warning(
        "SECRET_KEY is not set - falling back to the built-in default key. "
        "Set the SECRET_KEY environment variable to a long random value."
    )
|
|
|
|
# Access mode is chosen by the ENABLE_PROXY environment variable:
#   "true" / "1" / "yes" -> behind a reverse proxy (Authelia/Zoraxy/Nginx/etc.)
#   anything else        -> direct IP:port access (the default)
ENABLE_PROXY = os.getenv('ENABLE_PROXY', 'false').lower() in ('true', '1', 'yes')


def _log_startup_banner(title, details):
    """Log a framed banner: the title, a "Configuration:" line, then details."""
    rule = "=" * 60
    logger.info(rule)
    logger.info(title)
    logger.info(rule)
    logger.info(" Configuration:")
    for detail in details:
        logger.info(detail)
    logger.info(rule)


if not ENABLE_PROXY:
    # Direct IP:port access - ProxyFix is deliberately not installed.
    _log_startup_banner(
        " ACCESS MODE: Direct IP:Port (ENABLE_PROXY=false)",
        (
            " - ProxyFix middleware: DISABLED",
            " - Cookie security: HTTP allowed (Secure=False)",
            " - SameSite policy: Lax (standard mode)",
            " - URL scheme: http://",
            " - Access via: http://your-ip:5172",
        ),
    )

    cookie_secure = False
    cookie_samesite = 'Lax'
    preferred_scheme = 'http'
else:
    # Reverse-proxy access - trust the X-Forwarded-* headers of two proxy hops.
    _log_startup_banner(
        " ACCESS MODE: Reverse Proxy (ENABLE_PROXY=true)",
        (
            " - ProxyFix middleware: ACTIVE",
            " - Trusted proxies: 2 (e.g., Zoraxy + Authelia)",
            " - Cookie security: HTTPS only (Secure=True)",
            " - SameSite policy: None (cross-origin auth)",
            " - URL scheme: https://",
        ),
    )

    app.wsgi_app = ProxyFix(
        app.wsgi_app,
        x_for=2,     # X-Forwarded-For
        x_proto=2,   # X-Forwarded-Proto (http/https)
        x_host=2,    # X-Forwarded-Host
        x_prefix=2,  # X-Forwarded-Prefix
    )

    cookie_secure = True
    cookie_samesite = 'None'
    preferred_scheme = 'https'
|
|
|
|
# Run the wrapper's database initialisation once, at import time.
malias_w.init_database()

# Session/cookie settings; the Secure flag, SameSite policy and URL scheme
# come from the access-mode selection above.
app.config.update({
    'PERMANENT_SESSION_LIFETIME': timedelta(hours=24),
    'SESSION_COOKIE_NAME': 'malias_session',
    'SESSION_COOKIE_SECURE': cookie_secure,
    'SESSION_COOKIE_HTTPONLY': True,
    'SESSION_COOKIE_SAMESITE': cookie_samesite,
    'SESSION_COOKIE_PATH': '/',
    'SESSION_COOKIE_DOMAIN': None,
    'SESSION_REFRESH_EACH_REQUEST': True,
    'PREFERRED_URL_SCHEME': preferred_scheme,
})
|
|
|
|
@app.after_request
def after_request(response):
    """Add CORS headers (proxy mode only) and no-cache headers to a response.

    Args:
        response: The outgoing Flask response object.

    Returns:
        The same response object with the extra headers set.
    """
    if ENABLE_PROXY:
        # CORS headers for proxy mode (needed for Authelia/Zoraxy).
        origin = request.headers.get('Origin')
        if origin:
            # NOTE(review): the Origin is echoed back without an allow-list,
            # so any site may issue credentialed requests - consider
            # restricting this to known origins.
            response.headers['Access-Control-Allow-Origin'] = origin
            response.headers['Access-Control-Allow-Credentials'] = 'true'
            # The body is the same but the headers depend on Origin, so
            # caches must key on it.
            response.vary.add('Origin')
        else:
            # Browsers reject "*" combined with Allow-Credentials: true, so
            # the credentials header is only sent with a concrete origin.
            response.headers['Access-Control-Allow-Origin'] = '*'

    # Cache control (applies to both modes)
    response.headers['Cache-Control'] = 'no-cache, no-store, must-revalidate'
    response.headers['Pragma'] = 'no-cache'
    response.headers['Expires'] = '0'

    return response
|
|
|
|
def get_authelia_user():
    """Return the user asserted by the reverse proxy for this request, or None.

    Lookup order:
      1. The first non-empty header from a fixed list of auth headers.
      2. The same header names inside a JSON object carried in the
         ``X-Forwarded-Headers`` header.
      3. (Proxy mode only) a pseudo-user derived from the ``authelia_session``
         cookie when the request looks like it came through Authelia.

    Returns:
        The detected user identifier, or None when nothing matched.
    """
    # Candidate header names (ORDER MATTERS - most specific first!)
    auth_headers = (
        'X-Authelia-Username',  # Authelia standard header (used by Zoraxy)
        'Remote-User',          # Common standard
        'X-Remote-User',
        'X-Forwarded-User',
        'REMOTE_USER',
        'Http-Remote-User',
        'Http-X-Remote-User',
        'X-Authenticated-User',
    )

    for header in auth_headers:
        user = request.headers.get(header)
        if user:
            logger.info(f"✅ Authelia user detected via header '{header}': {user}")
            return user

    # Some reverse proxies forward the original headers JSON-encoded.
    raw_forwarded = request.headers.get('X-Forwarded-Headers')
    if raw_forwarded is not None:
        try:
            fwd_headers = json.loads(raw_forwarded)
        except ValueError:
            # Best effort: a malformed value just means "no user found here".
            logger.debug("Ignoring X-Forwarded-Headers: value is not valid JSON")
            fwd_headers = None

        # Only a JSON object can map header names to values.
        if isinstance(fwd_headers, dict):
            for header in auth_headers:
                if header in fwd_headers:
                    user = fwd_headers[header]
                    logger.info(f"✅ Authelia user detected via forwarded headers - {header}: {user}")
                    return user

    # WORKAROUND: If Authelia session cookie exists and we're coming from auth.rune.pm,
    # assume user is authenticated (Authelia not forwarding headers properly).
    # NOTE(review): the cookie value is not validated here, so this relies on
    # the reverse proxy rejecting unauthenticated requests - confirm.
    if ENABLE_PROXY and 'authelia_session' in request.cookies:
        referer = request.headers.get('Referer', '')
        if 'auth.rune.pm' in referer or request.headers.get('X-Forwarded-Proto') == 'https':
            # The real username is unknown; build an identifier from the
            # first 8 characters of the session cookie.
            pseudo_user = f"authelia_user_{request.cookies.get('authelia_session')[:8]}"
            logger.info("✅ Authelia authentication detected via session cookie (headers not forwarded)")
            logger.info(f" Using pseudo-user identifier: {pseudo_user}")
            logger.info(" NOTE: Configure Authelia to forward Remote-User header for proper username")
            return pseudo_user

    # Log when no Authelia user found (for debugging)
    if ENABLE_PROXY:
        logger.debug("⚠️ No Authelia headers found in request")
        logger.debug(f"Available headers: {list(request.headers.keys())}")

    return None
|
|
|
|
def login_required(f):
    """Decorator: only run the wrapped route for an authenticated user.

    In proxy mode a user reported by get_authelia_user() is logged in
    automatically (the session is rebuilt when it is missing or belongs to a
    different user). Otherwise the existing session must be logged in; if it
    is not, JSON requests get a 401 payload and other requests are redirected
    to the login page.
    """
    @wraps(f)
    def guarded(*args, **kwargs):
        # Auto-login with Authelia (only when ENABLE_PROXY=true)
        proxy_user = get_authelia_user() if ENABLE_PROXY else None

        if proxy_user:
            different_user = session.get('authelia_user') != proxy_user
            if different_user or not session.get('logged_in'):
                logger.info(f"🔐 Auto-login via Authelia in API route: {proxy_user}")
                session.clear()
                session.permanent = True
                session['logged_in'] = True
                session['authelia_user'] = proxy_user
                session['user_token'] = secrets.token_urlsafe(32)
                session['auth_method'] = 'authelia'
                session.modified = True

                # Extra profile data: X-Authelia-* wins over Remote-*.
                for key, primary, fallback in (
                    ('remote_email', 'X-Authelia-Email', 'Remote-Email'),
                    ('remote_name', 'X-Authelia-DisplayName', 'Remote-Name'),
                    ('remote_groups', 'X-Authelia-Groups', 'Remote-Groups'),
                ):
                    session[key] = (request.headers.get(primary) or
                                    request.headers.get(fallback, ''))

                logger.info(f"✅ Auto-login in API route: {proxy_user}")

            return f(*args, **kwargs)

        # Regular session check (ENABLE_PROXY=false or no Authelia headers)
        if session.get('logged_in'):
            return f(*args, **kwargs)

        logger.warning("Access denied: User not authenticated")
        if request.is_json:
            denied = {'status': 'error', 'message': 'Not authenticated', 'redirect': '/login'}
            return jsonify(denied), 401
        return redirect(url_for('login'))

    return guarded
|
|
|
|
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Login page (GET) or JSON password login endpoint (POST).

    Flow:
      1. Drop a session created under the other access mode.
      2. In proxy mode, auto-login a user reported by get_authelia_user()
         and redirect to the index page.
      3. On POST, check the ``password`` field of the JSON body and answer
         with a JSON status object.
      4. Otherwise redirect logged-in users to the index page, or render
         the login form.
    """
    # Validate session mode matches current proxy setting
    if session.get('logged_in'):
        session_mode = session.get('auth_method', 'unknown')

        if ENABLE_PROXY and session_mode == 'local':
            logger.info("⚠️ Session mode mismatch: Clearing local session (proxy mode enabled)")
            session.clear()
        elif not ENABLE_PROXY and session_mode == 'authelia':
            logger.info("⚠️ Session mode mismatch: Clearing authelia session (proxy mode disabled)")
            session.clear()

    # Auto-login when ENABLE_PROXY=true and Authelia headers are present
    if ENABLE_PROXY:
        authelia_user = get_authelia_user()

        if authelia_user:
            if not session.get('logged_in'):
                logger.info(f"🔐 Auto-login: User '{authelia_user}' authenticated by Authelia")
                session.clear()
                session.permanent = True
                session['logged_in'] = True
                session['user_token'] = secrets.token_urlsafe(32)
                session['auth_method'] = 'authelia'
                session['authelia_user'] = authelia_user
                session.modified = True

                # Additional Authelia info (X-Authelia-* preferred over Remote-*)
                session['remote_email'] = (request.headers.get('X-Authelia-Email') or
                                           request.headers.get('Remote-Email', ''))
                session['remote_name'] = (request.headers.get('X-Authelia-DisplayName') or
                                          request.headers.get('Remote-Name', ''))
                session['remote_groups'] = (request.headers.get('X-Authelia-Groups') or
                                            request.headers.get('Remote-Groups', ''))

                logger.info(f"✅ Auto-login successful: {authelia_user} ({session.get('remote_email', 'no email')})")

            # Logged in via Authelia - go to the main page
            return redirect(url_for('index'))

        # ENABLE_PROXY=true but no Authelia headers found
        logger.warning("⚠️ ENABLE_PROXY=true but no Authelia headers detected!")
        logger.warning(" Make sure your reverse proxy forwards authentication headers")
        logger.warning(f" Available headers: {list(request.headers.keys())}")

    # Local password authentication
    if request.method == 'POST':
        # silent=True: a missing, malformed or non-object JSON body is
        # treated as an empty password instead of raising.
        payload = request.get_json(silent=True)
        password = payload.get('password', '') if isinstance(payload, dict) else ''
        logger.info("Login attempt with password (redacted)")

        if malias_w.verify_password(password):
            logger.info("Login successful with local password")
            session.clear()
            session.permanent = True
            session['logged_in'] = True
            session['user_token'] = secrets.token_urlsafe(32)
            session['auth_method'] = 'local'
            session.modified = True
            return jsonify({'status': 'success', 'message': 'Login successful'})

        logger.warning("Login failed: Invalid password")
        return jsonify({'status': 'error', 'message': 'Invalid password'})

    # If already logged in, redirect to index
    if session.get('logged_in'):
        return redirect(url_for('index'))

    # Troubleshooting output, only in debug mode
    if app.debug:
        logger.info(f"Cookies: {request.cookies}")
        logger.info(f"Client IP: {request.remote_addr}")
        logger.info(f"X-Forwarded-For: {request.headers.get('X-Forwarded-For')}")
        # Log all headers to see what's coming from Authelia
        logger.info(f"All headers: {dict(request.headers)}")

    # Show login form
    return render_template('login.html')
|
|
|
|
@app.route('/logout')
def logout():
    """Clear the local session and redirect to the right logout/login page.

    Sessions created via Authelia are redirected to Authelia's logout URL:
    the ``X-Authelia-URL`` request header when present, otherwise
    ``https://auth.<parent-domain>/logout`` derived from the request host.
    All other sessions are redirected to the local login page with the
    session cookie expired.
    """
    # Read the auth details before the session is wiped
    auth_method = session.get('auth_method')
    authelia_user = session.get('authelia_user')

    session.clear()

    if auth_method == 'authelia' or authelia_user:
        # NOTE(review): X-Authelia-URL is taken from the request as-is;
        # this assumes the reverse proxy sets or strips it - confirm.
        authelia_url = request.headers.get('X-Authelia-URL')
        if authelia_url:
            # rstrip avoids "//logout" when the header ends with a slash
            logout_url = f"{authelia_url.rstrip('/')}/logout"
            logger.info(f"Redirecting to Authelia logout: {logout_url}")
            return redirect(logout_url)

        # Fallback: guess "auth.<parent domain>". At least three labels are
        # required so that "example.com" is not turned into "auth.com".
        host_labels = request.host.split('.') if request.host else []
        if len(host_labels) >= 3:
            base_domain = '.'.join(host_labels[1:])  # 'example.com' from 'app.example.com'
            fallback_url = f"https://auth.{base_domain}/logout"
            logger.info(f"No Authelia URL header, trying fallback: {fallback_url}")
            return redirect(fallback_url)

    # Default case: back to the login page, with the session cookie expired
    response = redirect(url_for('login'))
    response.set_cookie(app.config['SESSION_COOKIE_NAME'], '', expires=0)
    return response
|
|
|
|
@app.route('/')
def index():
    """Render the main page for a logged-in user.

    In proxy mode a user reported by get_authelia_user() who has no active
    session is logged in on the spot. Anyone still without a session is
    redirected to the login page.
    """
    if ENABLE_PROXY:
        proxy_user = get_authelia_user()

        if proxy_user and not session.get('logged_in'):
            # Build a fresh session for the proxy-authenticated user
            logger.info(f"🔐 Auto-login via Authelia for user: {proxy_user}")
            session.clear()
            session.permanent = True
            session['logged_in'] = True
            session['authelia_user'] = proxy_user
            session['user_token'] = secrets.token_urlsafe(32)
            session['auth_method'] = 'authelia'
            session.modified = True

            # Extra profile data: X-Authelia-* wins over Remote-*.
            for key, primary, fallback in (
                ('remote_email', 'X-Authelia-Email', 'Remote-Email'),
                ('remote_name', 'X-Authelia-DisplayName', 'Remote-Name'),
                ('remote_groups', 'X-Authelia-Groups', 'Remote-Groups'),
            ):
                session[key] = (request.headers.get(primary) or
                                request.headers.get(fallback, ''))

            logger.info(f"✅ Auto-login successful: {proxy_user} ({session.get('remote_email', 'no email')})")
            return render_template('index.html')

    # No session -> login page
    if not session.get('logged_in'):
        return redirect(url_for('login'))

    # From here on the session is logged in; extra log line in debug mode
    if app.debug:
        logger.info(f"User authenticated with method: {session.get('auth_method', 'unknown')}")

    return render_template('index.html')
|
|
|
|
@app.route('/list_aliases', methods=['POST'])
@login_required
def list_aliases():
    """Return one page (20 entries) of aliases as JSON.

    The page number comes from the ``page`` field of the JSON body and
    defaults to 1. Any failure is reported as a JSON error object.
    """
    try:
        if not malias_w.get_settings_from_db():
            return jsonify({'status': 'error', 'message': 'Please configure Mailcow server first'})

        # Page number from the request body, default 1
        payload = request.json
        page = payload.get('page', 1) if payload else 1

        aliases_page = malias_w.get_all_aliases(page=page, per_page=20)
        return jsonify({'status': 'success', 'data': aliases_page})
    except Exception as exc:
        logger.exception("Error listing aliases")
        return jsonify({'status': 'error', 'message': f"Error: {exc}"})
|
|
|
|
@app.route('/sync_aliases', methods=['POST'])
@login_required
def sync_aliases():
    """Run malias_w.update_aliases() and report how many aliases were added."""
    try:
        added = malias_w.update_aliases()
    except Exception as exc:
        logger.exception("Error syncing aliases")
        return jsonify({'status': 'error', 'message': f"Error syncing: {exc}"})

    summary = f"Aliases synchronized successfully! Added {added} new aliases to local DB."
    return jsonify({'status': 'success', 'message': summary})
|
|
|
|
@app.route('/get_domains', methods=['POST'])
@login_required
def get_domains():
    """Return the mail domain names, as a list and as a joined message."""
    try:
        names = []
        for entry in malias_w.get_domains():
            names.append(entry['domain_name'])

        summary = "Domains: " + ', '.join(names)
        return jsonify({'status': 'success', 'message': summary, 'domains': names})
    except Exception as exc:
        logger.exception("Error getting domains")
        return jsonify({'status': 'error', 'message': f"Error: {exc}"})
|
|
|
|
@app.route('/create_alias', methods=['POST'])
@login_required
def create_alias():
    """Create an alias from the ``alias`` and ``goto`` fields of the JSON body."""
    try:
        payload = request.json
        alias = payload.get('alias', '')
        goto = payload.get('goto', '')

        # Both fields are mandatory
        if not (alias and goto):
            return jsonify({'status': 'error', 'message': 'Both alias and destination are required'})

        malias_w.create_alias(alias, goto)
        return jsonify({
            'status': 'success',
            'message': f"Alias {alias} created successfully for {goto}",
        })
    except Exception as exc:
        logger.exception("Error creating alias")
        return jsonify({'status': 'error', 'message': f"Error creating alias: {exc}"})
|
|
|
|
@app.route('/delete_alias', methods=['POST'])
@login_required
def delete_alias_route():
    """Delete the alias named in the ``alias`` field of the JSON body."""
    try:
        alias = request.json.get('alias', '')

        if alias:
            malias_w.delete_alias(alias)
            return jsonify({
                'status': 'success',
                'message': f"Alias {alias} deleted successfully",
            })

        return jsonify({'status': 'error', 'message': 'Alias is required'})
    except Exception as exc:
        logger.exception("Error deleting alias")
        return jsonify({'status': 'error', 'message': f"Error deleting alias: {exc}"})
|
|
|
|
@app.route('/delete_aliases_bulk', methods=['POST'])
@login_required
def delete_aliases_bulk():
    """Delete several aliases at once.

    Expects a JSON body with a non-empty ``aliases`` list. The response
    ``status`` is ``success`` (nothing failed), ``error`` (everything failed
    or the request was invalid) or ``partial``; the wrapper's raw result is
    returned under ``details``.
    """
    try:
        data = request.json
        aliases = data.get('aliases', [])

        # Rejects missing, empty and non-list values in one check
        if not aliases or not isinstance(aliases, list):
            return jsonify({'status': 'error', 'message': 'Aliases array is required'})

        result = malias_w.delete_multiple_aliases(aliases)

        success_count = result['success_count']
        failed_count = result['failed_count']
        total_count = success_count + failed_count

        if failed_count == 0:
            status = 'success'
            message = f"{success_count} of {total_count} aliases deleted successfully"
        else:
            # Name at most three failed aliases, then summarise the rest
            failures = result['failed_aliases']
            failed_list = ', '.join(item['alias'] for item in failures[:3])
            if len(failures) > 3:
                failed_list += f" and {len(failures) - 3} more"

            if success_count == 0:
                status = 'error'
                message = f"Failed to delete aliases: {failed_list}"
            else:
                status = 'partial'
                message = f"{success_count} of {total_count} aliases deleted successfully. Failed: {failed_list}"

        return jsonify({'status': status, 'message': message, 'details': result})
    except Exception as e:
        logger.exception("Error deleting aliases bulk")
        return jsonify({'status': 'error', 'message': f"Error deleting aliases: {str(e)}"})
|
|
|
|
@app.route('/create_timed_alias', methods=['POST'])
@login_required
def create_timed_alias():
    """Create a time-limited alias from ``username`` and ``domain`` in the JSON body."""
    try:
        payload = request.json
        username = payload.get('username', '')
        domain = payload.get('domain', '')

        # Both fields are mandatory
        if not (username and domain):
            return jsonify({'status': 'error', 'message': 'Both username and domain are required'})

        malias_w.create_timed_alias(username, domain)
        return jsonify({
            'status': 'success',
            'message': f"Timed alias created for {username} on domain {domain}",
        })
    except Exception as exc:
        logger.exception("Error creating timed alias")
        return jsonify({'status': 'error', 'message': f"Error: {exc}"})
|
|
|
|
@app.route('/search', methods=['POST'])
@login_required
def search():
    """Search aliases for the ``query`` field of the JSON body.

    Returns a JSON object whose ``message`` lists the matches as
    "alias → goto" lines, or says that nothing matched. A missing or
    malformed body is reported as "No search query provided".
    """
    # silent=True keeps a bad body on the JSON error path used by the other
    # routes instead of raising before the try block is reached.
    payload = request.get_json(silent=True)
    query = payload.get('query', '') if isinstance(payload, dict) else ''

    if not query:
        return jsonify({'status': 'error', 'message': 'No search query provided'})

    try:
        results = malias_w.search_aliases(query)

        if results:
            result_list = [f"{alias} → {goto}" for alias, goto in results]
            message = f"Found {len(results)} alias(es):\n" + "\n".join(result_list)
        else:
            message = f"No aliases found matching '{query}'"

        return jsonify({'status': 'success', 'message': message, 'query': query})
    except Exception as e:
        logger.exception("Error searching aliases")
        return jsonify({'status': 'error', 'message': f"Search error: {str(e)}"})
|
|
|
|
@app.route('/config', methods=['GET', 'POST'])
@login_required
def config_page():
    """Read (GET) or store (POST) the Mailcow connection settings.

    POST expects ``mailcow_server`` and ``mailcow_api_key`` in the JSON body
    and answers with a JSON status object. GET returns the current
    configuration, or empty values when it cannot be loaded.
    """
    if request.method == 'POST':
        # A missing or non-object body is treated as "no fields supplied"
        payload = request.get_json(silent=True)
        if not isinstance(payload, dict):
            payload = {}
        mailcow_server = payload.get('mailcow_server', '')
        mailcow_api_key = payload.get('mailcow_api_key', '')

        if not (mailcow_server and mailcow_api_key):
            return jsonify({'status': 'error', 'message': 'Server and API key are required'})

        try:
            malias_w.set_connection_info(mailcow_server, mailcow_api_key)
        except Exception as e:
            # Report the failure as JSON, like the other routes, rather than
            # letting it surface as an unhandled server error.
            logger.exception("Error saving Mailcow configuration")
            return jsonify({'status': 'error', 'message': f"Error saving configuration: {str(e)}"})
        return jsonify({'status': 'success', 'message': 'Mailcow configuration saved successfully'})

    # GET request - return current config
    try:
        current_config = malias_w.get_config()
        return jsonify(current_config)
    except Exception:
        # Still answer with an empty config, but leave a trace in the log
        logger.exception("Error loading Mailcow configuration")
        return jsonify({'mailcow_server': '', 'mailcow_api_key': ''})
|
|
|
|
@app.route('/change_password', methods=['POST'])
@login_required
def change_password_route():
    """Change the password after validating the submitted fields.

    The JSON body must contain ``old_password``, ``new_password`` and
    ``confirm_password``; the new password must match its confirmation and
    be at least 6 characters long.
    """
    def rejected(reason):
        # Uniform JSON error payload for the validation failures below
        return jsonify({'status': 'error', 'message': reason})

    try:
        payload = request.json
        old_password = payload.get('old_password', '')
        new_password = payload.get('new_password', '')
        confirm_password = payload.get('confirm_password', '')

        if not (old_password and new_password and confirm_password):
            return rejected('All fields are required')
        if new_password != confirm_password:
            return rejected('New passwords do not match')
        if len(new_password) < 6:
            return rejected('Password must be at least 6 characters')

        malias_w.change_password(old_password, new_password)
        return jsonify({'status': 'success', 'message': 'Password changed successfully'})
    except Exception as exc:
        logger.exception("Error changing password")
        return jsonify({'status': 'error', 'message': str(exc)})
|
|
|
|
# Add a health check endpoint for proxies
|
|
@app.route('/health')
def health_check():
    """Report service status plus session and proxy-header diagnostics as JSON."""
    # Cookie names only - values are masked
    masked_cookies = {name: '***' for name in request.cookies.keys()}

    report = {
        'status': 'ok',
        'authenticated': session.get('logged_in', False),
        'authelia_user': session.get('authelia_user', None),
        'auth_method': session.get('auth_method'),
        'version': '1.0.2',
        'cookies': masked_cookies,
        'authelia_headers_present': get_authelia_user() is not None,
        'zoraxy_headers_present': 'X-Forwarded-Server' in request.headers,
    }
    return jsonify(report)
|
|
|
|
# Add a debugging endpoint
|
|
@app.route('/debug')
def debug_info():
    """Return request, session and cookie-configuration diagnostics as JSON.

    Cookie values, credential-bearing headers and the ``user_token`` /
    ``csrf_token`` session entries are masked with ``***``.
    """
    # Always allow this in production to help with troubleshooting.
    # The raw Cookie/Authorization headers carry the same secrets that are
    # masked under 'cookies', so they are masked here as well.
    sensitive_headers = {'cookie', 'authorization', 'proxy-authorization'}
    safe_headers = {
        name: ('***' if name.lower() in sensitive_headers else value)
        for name, value in request.headers.items()
    }

    debug_data = {
        'headers': safe_headers,
        'cookies': {k: '***' for k in request.cookies.keys()},  # Don't expose cookie values
        'session': {k: ('***' if k in ['user_token', 'csrf_token'] else v) for k, v in session.items()} if session else {},
        'remote_addr': request.remote_addr,
        'scheme': request.scheme,
        'host': request.host,
        'path': request.path,
        'is_secure': request.is_secure,
        'x_forwarded_for': request.headers.get('X-Forwarded-For'),
        'x_forwarded_proto': request.headers.get('X-Forwarded-Proto'),
        'x_forwarded_host': request.headers.get('X-Forwarded-Host'),
        'x_forwarded_prefix': request.headers.get('X-Forwarded-Prefix'),
        'remote_user': get_authelia_user(),
    }

    # Additional server information
    debug_data['server_info'] = {
        'cookie_settings': {
            'SESSION_COOKIE_NAME': app.config['SESSION_COOKIE_NAME'],
            'SESSION_COOKIE_SECURE': app.config['SESSION_COOKIE_SECURE'],
            'SESSION_COOKIE_HTTPONLY': app.config['SESSION_COOKIE_HTTPONLY'],
            'SESSION_COOKIE_SAMESITE': app.config['SESSION_COOKIE_SAMESITE'],
            'SESSION_COOKIE_PATH': app.config['SESSION_COOKIE_PATH'],
        },
        'app_config': {
            'DEBUG': app.debug,
            'PREFERRED_URL_SCHEME': app.config['PREFERRED_URL_SCHEME'],
            'ENABLE_PROXY': ENABLE_PROXY,
        },
    }

    response = jsonify(debug_data)
    response.headers['Cache-Control'] = 'no-store, must-revalidate'
    response.headers['Pragma'] = 'no-cache'
    response.headers['Expires'] = '0'
    return response
|
|
|
|
# Dedicated endpoint for header diagnostics
|
|
@app.route('/headers')
def show_headers():
    """Return (and log) the request headers - useful for debugging proxies.

    Credential-bearing headers (Cookie, Authorization, Proxy-Authorization)
    are masked with ``***`` in both the log line and the response.
    """
    # This endpoint is not behind login_required, so session cookies and
    # credentials must not be echoed back or written to the log.
    sensitive_headers = {'cookie', 'authorization', 'proxy-authorization'}
    safe_headers = {
        name: ('***' if name.lower() in sensitive_headers else value)
        for name, value in request.headers.items()
    }

    # Log headers to help diagnose issues with Zoraxy/Authelia
    logger.info(f"Headers endpoint: All headers received: {safe_headers}")
    return jsonify({
        'headers': safe_headers,
        'remote_addr': request.remote_addr,
        'authelia_user': get_authelia_user(),
    })
|
|
|
|
# Self-test endpoint for cookies
|
|
@app.route('/cookie-test')
def cookie_test():
    """Set a one-hour test cookie that mirrors the session cookie attributes."""
    resp = make_response(jsonify({'status': 'ok', 'message': 'Cookie set test'}))

    # Every attribute comes from the session configuration. SameSite was
    # previously hard-coded to 'None', which browsers reject when the cookie
    # is not Secure (direct-access mode), so the test could fail even though
    # the real session cookie works.
    resp.set_cookie(
        'malias_test_cookie',
        'test-value',
        max_age=3600,
        secure=app.config['SESSION_COOKIE_SECURE'],
        httponly=app.config['SESSION_COOKIE_HTTPONLY'],
        samesite=app.config['SESSION_COOKIE_SAMESITE'],
        path=app.config['SESSION_COOKIE_PATH'],
    )

    return resp
|
|
|
|
# Endpoint to check if test cookie is set
|
|
@app.route('/cookie-check')
def cookie_check():
    """Report whether the ``malias_test_cookie`` test cookie came back."""
    received = request.cookies.get('malias_test_cookie')

    # Other cookies are listed by name only; their values are masked
    masked = {name: '***' for name in request.cookies.keys()}

    return jsonify({
        'test_cookie_present': received is not None,
        'test_cookie_value': received or None,
        'all_cookies': masked,
    })
|
|
|
|
# New endpoint to test Zoraxy auth configuration
|
|
@app.route('/authelia-test')
def authelia_test():
    """Report which Authelia-related headers and cookies reached the app.

    Returns a JSON object with the matching request headers (compared
    case-insensitively), cookies whose name contains "auth" (values masked),
    the user detected by get_authelia_user(), and a few request details.
    """
    all_headers = dict(request.headers)

    # Common Authelia-related headers, lower-cased once for matching
    auth_related_headers = {
        name.lower() for name in (
            'Remote-User', 'X-Remote-User', 'Remote-Groups', 'X-Remote-Groups',
            'Remote-Name', 'X-Remote-Name', 'Remote-Email', 'X-Remote-Email',
            'X-Authelia-URL', 'X-Original-URL', 'X-Forwarded-Proto',
        )
    }
    authelia_headers = {
        name: value
        for name, value in all_headers.items()
        if name.lower() in auth_related_headers
    }

    # Auth cookies by name only - values are hidden
    auth_cookies = {
        cookie_name: '***'
        for cookie_name in request.cookies
        if 'auth' in cookie_name.lower()
    }

    # Looked up once: the helper logs on every call
    authelia_user = get_authelia_user()

    return jsonify({
        'request_host': request.host,
        'authelia_user_detected': authelia_user is not None,
        'authelia_user': authelia_user,
        'authelia_headers': authelia_headers,
        'auth_cookies': auth_cookies,
        'all_headers_count': len(all_headers),
        'zoraxy_detected': any('zoraxy' in h.lower() for h in all_headers) or 'X-Forwarded-Server' in all_headers,
        'host_header': request.headers.get('Host'),
        'referer': request.headers.get('Referer'),
    })
|
|
|
|
if __name__ == '__main__':
    # Script entry point: resolve debug/host/port, print a banner, run Flask.
    parser = argparse.ArgumentParser(
        description='Mailcow Alias Manager - Web interface for managing mail aliases',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog='''
Examples:
  python3 app.py                       Run in normal mode on port 5172
  python3 app.py --debug               Run in debug mode with auto-reload
  python3 app.py --port 8080           Run on custom port 8080
  python3 app.py --debug --port 8080   Run in debug mode on port 8080
  python3 app.py --host 127.0.0.1      Bind to localhost only

Environment Variables (Docker):
  FLASK_DEBUG   Set to 'True' for debug mode (default: False)
  FLASK_PORT    Port to run on (default: 5172)
  FLASK_HOST    Host to bind to (default: 0.0.0.0)
  FLASK_ENV     Set to 'production' or 'development'
'''
    )

    parser.add_argument(
        '--debug',
        action='store_true',
        help='Enable debug mode with auto-reload and detailed error messages'
    )

    parser.add_argument(
        '--port',
        type=int,
        default=None,
        help='Port to run on (default: 5172 or FLASK_PORT env var)'
    )

    parser.add_argument(
        '--host',
        type=str,
        default=None,
        help='Host to bind to (default: 0.0.0.0 or FLASK_HOST env var)'
    )

    args = parser.parse_args()

    # Priority: Command-line args > Environment variables > Defaults
    # Debug mode
    if args.debug:
        debug_mode = True
    else:
        debug_mode = os.getenv('FLASK_DEBUG', 'False').lower() in ('true', '1', 'yes')

    # Set log level based on debug mode
    logger.setLevel(logging.DEBUG if debug_mode else logging.INFO)

    # Port
    if args.port is not None:
        port = args.port
    else:
        raw_port = os.getenv('FLASK_PORT', '5172')
        try:
            port = int(raw_port)
        except ValueError:
            # Exit with a usage error instead of a raw traceback
            parser.error(f"FLASK_PORT must be an integer, got {raw_port!r}")

    # Host
    if args.host is not None:
        host = args.host
    else:
        host = os.getenv('FLASK_HOST', '0.0.0.0')

    # Print startup info
    print('=' * 60)
    print(' Mailcow Alias Manager - Starting...')
    print('=' * 60)
    print(f' Mode: {"DEBUG (Development)" if debug_mode else "NORMAL (Production)"}')
    print(f' Host: {host}')
    print(f' Port: {port}')
    print(f' URL: http://{host}:{port}')
    print('=' * 60)
    if debug_mode:
        print(' ⚠️ WARNING: Debug mode is enabled!')
        print(' - Auto-reload on code changes')
        print(' - Detailed error messages shown')
        print(' - NOT suitable for production use')
        print('=' * 60)
    print()

    logger.info(f"Starting Mailcow Alias Manager - Debug: {debug_mode}, Host: {host}, Port: {port}")

    app.run(debug=debug_mode, host=host, port=port)