""" Wrapper functions for malias to handle SQLite threading issues in Flask """ import sqlite3 from pathlib import Path import httpx import json from datetime import datetime import os import bcrypt # Use project subfolder for database project_dir = Path(__file__).parent filepath = project_dir.joinpath('data') database = filepath.joinpath('malias2.db') logfile = filepath.joinpath('malias2.log') def init_database(): """Initialize database schema if it doesn't exist""" Path(filepath).mkdir(parents=True, exist_ok=True) conn = sqlite3.connect(database) cursor = conn.cursor() # Create tables cursor.execute('''CREATE TABLE IF NOT EXISTS settings ( id INTEGER NOT NULL PRIMARY KEY, first_run INTEGER, server TEXT, apikey TEXT NOT NULL, data_copy INTEGER )''') cursor.execute('''CREATE TABLE IF NOT EXISTS aliases (id integer NOT NULL PRIMARY KEY, alias text NOT NULL, goto text NOT NULL, created text NOT NULL)''') cursor.execute('''CREATE TABLE IF NOT EXISTS timedaliases (id integer NOT NULL PRIMARY KEY, alias text NOT NULL, goto text NOT NULL, validity text NOT NULL)''') cursor.execute('''CREATE TABLE IF NOT EXISTS dbversion (version integer NOT NULL DEFAULT 0)''') cursor.execute('''CREATE TABLE IF NOT EXISTS auth (id integer NOT NULL PRIMARY KEY, password_hash text NOT NULL)''') # Check if settings exist, if not create default cursor.execute('SELECT count(*) FROM settings') count = cursor.fetchone()[0] if count == 0: cursor.execute('INSERT INTO settings VALUES (?, ?, ?, ?, ?)', (0, 1, 'dummy.server', 'DUMMY_KEY', 0)) # Check if auth table has password, if not create default cursor.execute('SELECT count(*) FROM auth') count = cursor.fetchone()[0] if count == 0: # Default password is "admin123" default_hash = bcrypt.hashpw("admin123".encode('utf-8'), bcrypt.gensalt()) cursor.execute('INSERT INTO auth VALUES (?, ?)', (0, default_hash.decode('utf-8'))) conn.commit() conn.close() def get_db_connection(): """Get a fresh database connection for the current thread""" Path(filepath).mkdir(parents=True, exist_ok=True) conn = sqlite3.connect(database) return conn def get_settings_from_db(): """Get Mailcow server and API key from database""" conn = get_db_connection() cursor = conn.cursor() cursor.execute('SELECT server, apikey FROM settings WHERE id = 0') data = cursor.fetchone() conn.close() if data and data[0] != 'dummy.server' and data[1] != 'DUMMY_KEY': return {'server': data[0], 'key': data[1]} return None def set_connection_info(server, apikey): """Set Mailcow connection information""" now = datetime.now().strftime("%m-%d-%Y %H:%M") conn = get_db_connection() cursor = conn.cursor() cursor.execute('UPDATE settings SET server = ?, apikey = ? WHERE id = 0', (server, apikey)) conn.commit() conn.close() return True def get_config(): """Get current configuration""" conn = get_db_connection() cursor = conn.cursor() cursor.execute('SELECT server, apikey FROM settings WHERE id = 0') data = cursor.fetchone() conn.close() if data: return { 'mailcow_server': data[0] if data[0] != 'dummy.server' else '', 'mailcow_api_key': data[1] if data[1] != 'DUMMY_KEY' else '' } return {'mailcow_server': '', 'mailcow_api_key': ''} def search_aliases(query): """Search for aliases in local database""" conn = get_db_connection() cursor = conn.cursor() search_term = '%' + query + '%' cursor.execute('SELECT alias, goto FROM aliases WHERE alias LIKE ? OR goto LIKE ?', (search_term, search_term)) results = cursor.fetchall() conn.close() return results def get_number_of_aliases_on_server(): """Get number of aliases from Mailcow server""" connection = get_settings_from_db() if not connection: raise Exception("No Mailcow server configured") req = httpx.get(f'https://{connection["server"]}/api/v1/get/alias/all', headers={"Content-Type": "application/json", 'X-API-Key': connection['key'] } ) data = req.json() return len(data) def get_all_aliases(page=1, per_page=20): """Get all aliases with pagination from Mailcow server""" connection = get_settings_from_db() if not connection: raise Exception("No Mailcow server configured") req = httpx.get(f'https://{connection["server"]}/api/v1/get/alias/all', headers={"Content-Type": "application/json", 'X-API-Key': connection['key'] } ) data = req.json() # Calculate pagination total_aliases = len(data) total_pages = (total_aliases + per_page - 1) // per_page # Ceiling division start_idx = (page - 1) * per_page end_idx = start_idx + per_page # Get the page slice page_data = data[start_idx:end_idx] # Format aliases aliases = [] for alias_data in page_data: aliases.append({ 'alias': alias_data['address'], 'goto': alias_data['goto'], 'active': alias_data.get('active', '1') }) return { 'aliases': aliases, 'page': page, 'per_page': per_page, 'total': total_aliases, 'total_pages': total_pages, 'has_prev': page > 1, 'has_next': page < total_pages } def get_domains(): """Get all mail domains from Mailcow""" connection = get_settings_from_db() if not connection: raise Exception("No Mailcow server configured") req = httpx.get(f'https://{connection["server"]}/api/v1/get/domain/all', headers={"Content-Type": "application/json", 'X-API-Key': connection['key'] } ) return req.json() def update_aliases(): """Sync aliases from server to local DB""" connection = get_settings_from_db() if not connection: raise Exception("No Mailcow server configured") now = datetime.now().strftime("%m-%d-%Y %H:%M") req = httpx.get(f'https://{connection["server"]}/api/v1/get/alias/all', headers={"Content-Type": "application/json", 'X-API-Key': connection['key'] } ) data = req.json() conn = get_db_connection() cursor = conn.cursor() count_alias = 0 for alias_data in data: cursor.execute('SELECT count(*) FROM aliases WHERE alias LIKE ? AND goto LIKE ?', (alias_data['address'], alias_data['goto'])) count = cursor.fetchone()[0] if count == 0: cursor.execute('INSERT INTO aliases VALUES (?, ?, ?, ?)', (alias_data['id'], alias_data['address'], alias_data['goto'], now)) count_alias += 1 conn.commit() conn.close() return count_alias def create_alias(alias, goto): """Create a new alias""" connection = get_settings_from_db() if not connection: raise Exception("No Mailcow server configured") now = datetime.now().strftime("%m-%d-%Y %H:%M") # Check if alias exists if check_alias_exists(alias): raise Exception(f"Alias {alias} already exists") # Create on server new_data = {'address': alias, 'goto': goto, 'active': "1"} new_data_json = json.dumps(new_data) req = httpx.post(f'https://{connection["server"]}/api/v1/add/alias', headers={"Content-Type": "application/json", 'X-API-Key': connection['key'] }, data=new_data_json ) # Get the new alias ID alias_id_val = get_alias_id(alias) if alias_id_val: # Add to local DB conn = get_db_connection() cursor = conn.cursor() cursor.execute('INSERT INTO aliases VALUES (?, ?, ?, ?)', (alias_id_val, alias, goto, now)) conn.commit() conn.close() return True else: raise Exception("Failed to create alias on server") def delete_alias(alias): """Delete an alias""" connection = get_settings_from_db() if not connection: raise Exception("No Mailcow server configured") alias_id_val = get_alias_id(alias) if not alias_id_val: raise Exception(f"Alias {alias} not found") # Delete from server delete_data = json.dumps({'id': alias_id_val}) req = httpx.post(f'https://{connection["server"]}/api/v1/delete/alias', headers={"Content-Type": "application/json", 'X-API-Key': connection['key'] }, data=delete_data ) # Delete from local DB conn = get_db_connection() cursor = conn.cursor() cursor.execute('DELETE FROM aliases WHERE id = ?', (alias_id_val,)) conn.commit() conn.close() return True def create_timed_alias(username, domain): """Create a time-limited alias""" connection = get_settings_from_db() if not connection: raise Exception("No Mailcow server configured") data = {'username': username, 'domain': domain, 'description': 'malias web interface'} data_json = json.dumps(data) req = httpx.post(f'https://{connection["server"]}/api/v1/add/time_limited_alias', data=data_json, headers={"Content-Type": "application/json", 'X-API-Key': connection['key'] } ) response = json.loads(req.text) if response and len(response) > 0: if response[0].get('type') == 'danger': raise Exception(response[0].get('msg', 'Unknown error')) return True def check_alias_exists(alias): """Check if alias exists on server or in local DB""" connection = get_settings_from_db() if not connection: return False # Check server req = httpx.get(f'https://{connection["server"]}/api/v1/get/alias/all', headers={"Content-Type": "application/json", 'X-API-Key': connection['key'] } ) data = req.json() for item in data: if alias == item['address'] or alias in item['goto']: return True # Check local DB conn = get_db_connection() cursor = conn.cursor() cursor.execute('SELECT count(*) FROM aliases WHERE alias = ? OR goto = ?', (alias, alias)) count = cursor.fetchone()[0] conn.close() return count >= 1 def get_alias_id(alias): """Get alias ID from server""" connection = get_settings_from_db() if not connection: return None req = httpx.get(f'https://{connection["server"]}/api/v1/get/alias/all', headers={"Content-Type": "application/json", 'X-API-Key': connection['key'] } ) data = req.json() for item in data: if item['address'] == alias: return item['id'] return None # Password management functions def verify_password(password): """Verify password against stored hash""" conn = get_db_connection() cursor = conn.cursor() cursor.execute('SELECT password_hash FROM auth WHERE id = 0') result = cursor.fetchone() conn.close() if not result: return False stored_hash = result[0].encode('utf-8') return bcrypt.checkpw(password.encode('utf-8'), stored_hash) def change_password(old_password, new_password): """Change password - requires old password for verification""" # Verify old password if not verify_password(old_password): raise Exception("Current password is incorrect") # Hash new password new_hash = bcrypt.hashpw(new_password.encode('utf-8'), bcrypt.gensalt()) # Update database conn = get_db_connection() cursor = conn.cursor() cursor.execute('UPDATE auth SET password_hash = ? WHERE id = 0', (new_hash.decode('utf-8'),)) conn.commit() conn.close() return True