421 lines
13 KiB
Python
421 lines
13 KiB
Python
"""
Wrapper functions for malias to handle SQLite threading issues in Flask
"""
|
|
import sqlite3
|
|
from pathlib import Path
|
|
import httpx
|
|
import json
|
|
from datetime import datetime
|
|
import os
|
|
import bcrypt
|
|
|
|
# Database and log file live in a "data" subfolder next to this module
project_dir = Path(__file__).parent
filepath = project_dir / 'data'
database = filepath / 'malias2.db'
logfile = filepath / 'malias2.log'
|
|
|
|
def init_database():
    """Initialize the database schema and default rows if they don't exist.

    Creates the data directory when needed, creates the ``settings``,
    ``aliases``, ``timedaliases``, ``dbversion`` and ``auth`` tables if they
    are missing, and seeds:

    - a placeholder ``settings`` row (``dummy.server`` / ``DUMMY_KEY``) when
      the table is empty, and
    - a bcrypt hash of the default password when ``auth`` is empty.

    The connection is closed even if one of the statements raises.
    """
    Path(filepath).mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(database)
    try:
        cursor = conn.cursor()

        # Create tables
        cursor.execute('''CREATE TABLE IF NOT EXISTS settings (
            id INTEGER NOT NULL PRIMARY KEY,
            first_run INTEGER,
            server TEXT,
            apikey TEXT NOT NULL,
            data_copy INTEGER
        )''')
        cursor.execute('''CREATE TABLE IF NOT EXISTS aliases
            (id integer NOT NULL PRIMARY KEY,
            alias text NOT NULL,
            goto text NOT NULL,
            created text NOT NULL)''')
        cursor.execute('''CREATE TABLE IF NOT EXISTS timedaliases
            (id integer NOT NULL PRIMARY KEY,
            alias text NOT NULL,
            goto text NOT NULL,
            validity text NOT NULL)''')
        cursor.execute('''CREATE TABLE IF NOT EXISTS dbversion
            (version integer NOT NULL DEFAULT 0)''')
        cursor.execute('''CREATE TABLE IF NOT EXISTS auth
            (id integer NOT NULL PRIMARY KEY,
            password_hash text NOT NULL)''')

        # Seed the placeholder settings row; other functions treat these
        # dummy values as "not configured".
        cursor.execute('SELECT count(*) FROM settings')
        if cursor.fetchone()[0] == 0:
            cursor.execute('INSERT INTO settings VALUES (?, ?, ?, ?, ?)',
                           (0, 1, 'dummy.server', 'DUMMY_KEY', 0))

        # Seed the default password when no password has been stored yet.
        cursor.execute('SELECT count(*) FROM auth')
        if cursor.fetchone()[0] == 0:
            # Default password is "admin123"
            # NOTE(review): this is a well-known default credential; it
            # should be changed right after the first login.
            default_hash = bcrypt.hashpw("admin123".encode('utf-8'), bcrypt.gensalt())
            cursor.execute('INSERT INTO auth VALUES (?, ?)',
                           (0, default_hash.decode('utf-8')))

        conn.commit()
    finally:
        # Release the connection even when a statement above failed.
        conn.close()
|
|
|
|
def get_db_connection():
    """Open and return a new SQLite connection to the application database.

    The data directory is created first if it does not exist. Every call
    opens a fresh connection, so the caller is responsible for closing it.
    """
    data_dir = Path(filepath)
    data_dir.mkdir(parents=True, exist_ok=True)
    return sqlite3.connect(database)
|
|
|
|
def get_settings_from_db():
    """Get Mailcow server and API key from the database.

    Returns:
        ``{'server': ..., 'key': ...}`` when the settings row (id 0) exists
        and neither value is still the placeholder written by
        ``init_database``; otherwise ``None``.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute('SELECT server, apikey FROM settings WHERE id = 0')
        row = cursor.fetchone()
    finally:
        # Close the connection even if the query fails.
        conn.close()

    if row is None:
        return None

    server, apikey = row
    # The placeholder values mean the connection was never configured.
    if server == 'dummy.server' or apikey == 'DUMMY_KEY':
        return None
    return {'server': server, 'key': apikey}
|
|
|
|
def set_connection_info(server, apikey):
    """Store the Mailcow connection information in the settings row (id 0).

    Args:
        server: Mailcow server host name.
        apikey: API key used for requests to that server.

    Returns:
        ``True`` once the update has been committed.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute('UPDATE settings SET server = ?, apikey = ? WHERE id = 0',
                       (server, apikey))
        conn.commit()
    finally:
        # Close the connection even if the update fails.
        conn.close()
    return True
|
|
|
|
def get_config():
    """Get the current configuration for display.

    Returns:
        Dictionary with ``mailcow_server`` and ``mailcow_api_key``. A value
        is an empty string when it is still the placeholder written by
        ``init_database`` or when the settings row (id 0) does not exist.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute('SELECT server, apikey FROM settings WHERE id = 0')
        row = cursor.fetchone()
    finally:
        # Close the connection even if the query fails.
        conn.close()

    if not row:
        return {'mailcow_server': '', 'mailcow_api_key': ''}

    server, apikey = row
    return {
        'mailcow_server': server if server != 'dummy.server' else '',
        'mailcow_api_key': apikey if apikey != 'DUMMY_KEY' else '',
    }
|
|
|
|
def search_aliases(query):
    """Search for aliases in the local database by address prefix.

    Only aliases whose address starts with ``query`` are returned. LIKE
    wildcards typed by the user (``%`` and ``_``) are escaped so they are
    matched literally instead of widening the search.

    Args:
        query: Start of the alias address to look for.

    Returns:
        List of ``(alias, goto)`` tuples.
    """
    # Escape the escape character first, then the two LIKE wildcards.
    escaped = (query.replace('\\', '\\\\')
                    .replace('%', '\\%')
                    .replace('_', '\\_'))
    search_term = escaped + '%'  # Match from start only

    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(
            "SELECT alias, goto FROM aliases WHERE alias LIKE ? ESCAPE '\\'",
            (search_term,))
        results = cursor.fetchall()
    finally:
        # Close the connection even if the query fails.
        conn.close()
    return results
|
|
|
|
def get_number_of_aliases_on_server():
    """Get the number of aliases from the Mailcow server.

    Returns:
        Length of the payload returned by the ``get/alias/all`` endpoint.

    Raises:
        Exception: If no Mailcow server is configured.
        httpx.HTTPStatusError: If the server answers with an error status.
    """
    connection = get_settings_from_db()
    if not connection:
        raise Exception("No Mailcow server configured")

    req = httpx.get(
        f'https://{connection["server"]}/api/v1/get/alias/all',
        headers={
            "Content-Type": "application/json",
            'X-API-Key': connection['key'],
        },
    )
    # Without this check an error payload would be counted as alias data.
    req.raise_for_status()
    return len(req.json())
|
|
|
|
def get_all_aliases(page=1, per_page=20):
    """Get one page of aliases from the Mailcow server.

    The complete alias list is fetched from the server and sliced locally.

    Args:
        page: 1-based page number. Values below 1 are treated as page 1.
        per_page: Number of aliases per page; must be at least 1.

    Returns:
        Dictionary with the keys ``aliases`` (list of dicts with ``alias``,
        ``goto`` and ``active``), ``page``, ``per_page``, ``total``,
        ``total_pages``, ``has_prev`` and ``has_next``.

    Raises:
        ValueError: If ``per_page`` is smaller than 1.
        Exception: If no Mailcow server is configured.
        httpx.HTTPStatusError: If the server answers with an error status.
    """
    # Validate before any I/O: per_page == 0 would divide by zero and a
    # negative value would produce meaningless slices.
    if per_page < 1:
        raise ValueError("per_page must be at least 1")
    # A page below 1 would turn into a negative slice start.
    page = max(page, 1)

    connection = get_settings_from_db()
    if not connection:
        raise Exception("No Mailcow server configured")

    req = httpx.get(
        f'https://{connection["server"]}/api/v1/get/alias/all',
        headers={
            "Content-Type": "application/json",
            'X-API-Key': connection['key'],
        },
    )
    req.raise_for_status()
    data = req.json()

    # Calculate pagination
    total_aliases = len(data)
    total_pages = (total_aliases + per_page - 1) // per_page  # Ceiling division
    start_idx = (page - 1) * per_page
    end_idx = start_idx + per_page

    # Format the aliases of the requested page
    aliases = [
        {
            'alias': alias_data['address'],
            'goto': alias_data['goto'],
            'active': alias_data.get('active', '1'),
        }
        for alias_data in data[start_idx:end_idx]
    ]

    return {
        'aliases': aliases,
        'page': page,
        'per_page': per_page,
        'total': total_aliases,
        'total_pages': total_pages,
        'has_prev': page > 1,
        'has_next': page < total_pages,
    }
|
|
|
|
def get_domains():
    """Get all mail domains from Mailcow.

    Returns:
        The decoded JSON payload of the ``get/domain/all`` endpoint.

    Raises:
        Exception: If no Mailcow server is configured.
        httpx.HTTPStatusError: If the server answers with an error status.
    """
    connection = get_settings_from_db()
    if not connection:
        raise Exception("No Mailcow server configured")

    req = httpx.get(
        f'https://{connection["server"]}/api/v1/get/domain/all',
        headers={
            "Content-Type": "application/json",
            'X-API-Key': connection['key'],
        },
    )
    # Do not hand an error payload to the caller as if it were domain data.
    req.raise_for_status()
    return req.json()
|
|
|
|
def update_aliases():
    """Sync aliases from the server into the local database.

    Every server alias for which no local row with the same address and
    destination exists is written to the ``aliases`` table. If a local row
    with the same id already exists (for example because the destination
    changed on the server), that row is replaced instead of raising a
    primary-key error.

    Returns:
        Number of aliases that were inserted or replaced.

    Raises:
        Exception: If no Mailcow server is configured.
        httpx.HTTPStatusError: If the server answers with an error status.
    """
    connection = get_settings_from_db()
    if not connection:
        raise Exception("No Mailcow server configured")

    now = datetime.now().strftime("%m-%d-%Y %H:%M")
    req = httpx.get(
        f'https://{connection["server"]}/api/v1/get/alias/all',
        headers={
            "Content-Type": "application/json",
            'X-API-Key': connection['key'],
        },
    )
    req.raise_for_status()
    data = req.json()

    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        count_alias = 0
        for alias_data in data:
            # Compare with "=" instead of LIKE so that "_" and "%" inside an
            # address are not treated as wildcards; NOCASE keeps the
            # comparison case-insensitive as it was with LIKE.
            cursor.execute(
                'SELECT count(*) FROM aliases '
                'WHERE alias = ? COLLATE NOCASE AND goto = ? COLLATE NOCASE',
                (alias_data['address'], alias_data['goto']))
            if cursor.fetchone()[0] == 0:
                cursor.execute(
                    'INSERT OR REPLACE INTO aliases VALUES (?, ?, ?, ?)',
                    (alias_data['id'], alias_data['address'],
                     alias_data['goto'], now))
                count_alias += 1
        conn.commit()
    finally:
        # Close the connection even if a statement fails.
        conn.close()
    return count_alias
|
|
|
|
def create_alias(alias, goto):
    """Create a new alias on the server and record it in the local database.

    Args:
        alias: Alias address to create.
        goto: Destination the alias forwards to.

    Returns:
        ``True`` when the alias was created and stored locally.

    Raises:
        Exception: If no Mailcow server is configured, if the alias already
            exists, or if the alias cannot be found on the server after the
            create request.
        httpx.HTTPStatusError: If the server answers the create request with
            an error status.
    """
    connection = get_settings_from_db()
    if not connection:
        raise Exception("No Mailcow server configured")

    now = datetime.now().strftime("%m-%d-%Y %H:%M")

    # Check if alias exists
    if check_alias_exists(alias):
        raise Exception(f"Alias {alias} already exists")

    # Create on server
    new_data_json = json.dumps({'address': alias, 'goto': goto, 'active': "1"})
    req = httpx.post(
        f'https://{connection["server"]}/api/v1/add/alias',
        headers={
            "Content-Type": "application/json",
            'X-API-Key': connection['key'],
        },
        # Raw, already-encoded bodies belong in ``content``; ``data`` is
        # meant for form fields.
        content=new_data_json,
    )
    req.raise_for_status()

    # The id is looked up on the server; a missing id means the alias was
    # not created.
    alias_id_val = get_alias_id(alias)
    if not alias_id_val:
        raise Exception("Failed to create alias on server")

    # Add to local DB
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute('INSERT INTO aliases VALUES (?, ?, ?, ?)',
                       (alias_id_val, alias, goto, now))
        conn.commit()
    finally:
        # Close the connection even if the insert fails.
        conn.close()
    return True
|
|
|
|
def delete_alias(alias):
    """Delete an alias on the server and remove it from the local database.

    The local row is only removed after the server accepted the request, so
    an HTTP failure no longer leaves the local copy out of sync.

    Args:
        alias: Alias address to delete.

    Returns:
        ``True`` when the delete request succeeded and the local row was
        removed.

    Raises:
        Exception: If no Mailcow server is configured or the alias is not
            found on the server.
        httpx.HTTPStatusError: If the server answers the delete request with
            an error status.
    """
    connection = get_settings_from_db()
    if not connection:
        raise Exception("No Mailcow server configured")

    alias_id_val = get_alias_id(alias)
    if not alias_id_val:
        raise Exception(f"Alias {alias} not found")

    # Delete from server
    delete_data = json.dumps({'id': alias_id_val})
    req = httpx.post(
        f'https://{connection["server"]}/api/v1/delete/alias',
        headers={
            "Content-Type": "application/json",
            'X-API-Key': connection['key'],
        },
        # Raw, already-encoded bodies belong in ``content``; ``data`` is
        # meant for form fields.
        content=delete_data,
    )
    # Stop here on an HTTP error so the local row is kept.
    req.raise_for_status()

    # Delete from local DB
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute('DELETE FROM aliases WHERE id = ?', (alias_id_val,))
        conn.commit()
    finally:
        # Close the connection even if the delete fails.
        conn.close()

    return True
|
|
|
|
def create_timed_alias(username, domain):
    """Create a time-limited alias on the server.

    Args:
        username: Value sent as ``username`` in the request body.
        domain: Value sent as ``domain`` in the request body.

    Returns:
        ``True`` when the server did not report a failure.

    Raises:
        Exception: If no Mailcow server is configured, or if the first entry
            of the response list has type ``danger`` (the server's ``msg``
            is used as the error text).
        httpx.HTTPStatusError: If the server answers with an error status
            and no ``danger`` entry was reported.
    """
    connection = get_settings_from_db()
    if not connection:
        raise Exception("No Mailcow server configured")

    data_json = json.dumps({
        'username': username,
        'domain': domain,
        'description': 'malias web interface',
    })

    req = httpx.post(
        f'https://{connection["server"]}/api/v1/add/time_limited_alias',
        # Raw, already-encoded bodies belong in ``content``; ``data`` is
        # meant for form fields.
        content=data_json,
        headers={
            "Content-Type": "application/json",
            'X-API-Key': connection['key'],
        },
    )

    response = json.loads(req.text)

    # Only a list payload is inspected; indexing any other non-empty payload
    # with [0] would fail with an unrelated error.
    first = response[0] if isinstance(response, list) and response else None
    if isinstance(first, dict) and first.get('type') == 'danger':
        raise Exception(first.get('msg', 'Unknown error'))

    # Surface HTTP-level failures that came without a "danger" entry.
    req.raise_for_status()
    return True
|
|
|
|
def check_alias_exists(alias):
    """Check if an address is already used on the server or in the local DB.

    An address counts as existing when it equals the address of a server
    alias, is one of a server alias's destinations, or appears as ``alias``
    or ``goto`` in the local ``aliases`` table.

    Args:
        alias: Address to look for.

    Returns:
        ``True`` if the address is in use, ``False`` otherwise or when no
        Mailcow server is configured.

    Raises:
        httpx.HTTPStatusError: If the server answers with an error status.
    """
    connection = get_settings_from_db()
    if not connection:
        return False

    # Check server
    req = httpx.get(
        f'https://{connection["server"]}/api/v1/get/alias/all',
        headers={
            "Content-Type": "application/json",
            'X-API-Key': connection['key'],
        },
    )
    req.raise_for_status()

    for item in req.json():
        goto = item['goto']
        # Compare whole addresses: a plain substring test would report
        # "a@x.com" as existing because of a destination "anna@x.com".
        # NOTE(review): assumes a string goto is a comma-separated list of
        # addresses - confirm against the Mailcow API.
        if isinstance(goto, str):
            targets = [target.strip() for target in goto.split(',')]
        else:
            targets = goto
        if alias == item['address'] or alias in targets:
            return True

    # Check local DB
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute('SELECT count(*) FROM aliases WHERE alias = ? OR goto = ?',
                       (alias, alias))
        count = cursor.fetchone()[0]
    finally:
        # Close the connection even if the query fails.
        conn.close()

    return count >= 1
|
|
|
|
def get_alias_id(alias):
    """Get the id of an alias from the server.

    Args:
        alias: Alias address to look up.

    Returns:
        The ``id`` of the first server alias whose address equals ``alias``,
        or ``None`` when there is no such alias or no Mailcow server is
        configured.

    Raises:
        httpx.HTTPStatusError: If the server answers with an error status.
    """
    connection = get_settings_from_db()
    if not connection:
        return None

    req = httpx.get(
        f'https://{connection["server"]}/api/v1/get/alias/all',
        headers={
            "Content-Type": "application/json",
            'X-API-Key': connection['key'],
        },
    )
    # An error payload must not be searched as if it were the alias list.
    req.raise_for_status()

    return next(
        (item['id'] for item in req.json() if item['address'] == alias),
        None,
    )
|
|
|
|
# Password management functions
|
|
def verify_password(password):
    """Verify a password against the stored bcrypt hash.

    Args:
        password: Plain-text password to check.

    Returns:
        ``True`` if the password matches the hash stored in the ``auth`` row
        with id 0; ``False`` if it does not match or no such row exists.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute('SELECT password_hash FROM auth WHERE id = 0')
        result = cursor.fetchone()
    finally:
        # Close the connection even if the query fails.
        conn.close()

    if not result:
        return False

    # The hash is stored as text; bcrypt works on bytes.
    stored_hash = result[0].encode('utf-8')
    return bcrypt.checkpw(password.encode('utf-8'), stored_hash)
|
|
|
|
def change_password(old_password, new_password):
    """Change the password; the current password is required.

    Args:
        old_password: Current plain-text password.
        new_password: New plain-text password; must not be empty.

    Returns:
        ``True`` once the new hash has been committed.

    Raises:
        Exception: If the new password is empty or the current password is
            incorrect.
    """
    # Reject an empty password up front; storing its hash would leave the
    # account effectively unprotected.
    if not new_password:
        raise Exception("New password must not be empty")

    # Verify old password
    if not verify_password(old_password):
        raise Exception("Current password is incorrect")

    # Hash new password
    new_hash = bcrypt.hashpw(new_password.encode('utf-8'), bcrypt.gensalt())

    # Update database
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute('UPDATE auth SET password_hash = ? WHERE id = 0',
                       (new_hash.decode('utf-8'),))
        conn.commit()
    finally:
        # Close the connection even if the update fails.
        conn.close()

    return True
|
|
|
|
def delete_multiple_aliases(alias_list):
    """
    Delete several aliases, collecting failures instead of stopping early

    Args:
        alias_list: List of alias email addresses to delete

    Returns:
        Dictionary with:
        - success_count: Number of successfully deleted aliases
        - failed_count: Number of failed deletions
        - failed_aliases: List of dicts with 'alias' and 'error' for each failure
    """
    deleted = 0
    failures = []

    for address in alias_list:
        try:
            delete_alias(address)
        except Exception as exc:
            # Record the failure and carry on with the remaining aliases.
            failures.append({'alias': address, 'error': str(exc)})
        else:
            deleted += 1

    return {
        'success_count': deleted,
        'failed_count': len(failures),
        'failed_aliases': failures,
    }
|