418 lines
14 KiB
Python
418 lines
14 KiB
Python
#!/usr/bin/python3
|
|
import sqlite3
|
|
from pathlib import Path
|
|
from sqlite3 import Error
|
|
import urllib.request
|
|
import json
|
|
import logging
|
|
import argparse
|
|
import requests
|
|
import os
|
|
import time
|
|
import sys
|
|
from types import SimpleNamespace
|
|
from datetime import datetime
|
|
from string import ascii_letters, digits
|
|
from rich import print
|
|
from argparse import RawTextHelpFormatter
|
|
#from urllib import Request, urlopen
|
|
from urllib.request import urlopen
|
|
|
|
# Info pages for dev
|
|
# https://mailcow.docs.apiary.io/#reference/aliases/get-aliases/get-aliases
|
|
# https://demo.mailcow.email/api/#/Aliases
|
|
|
|
|
|
# Version string shown by show_current_info().
app_version = '0.2.1'

# All application state (database and log file) lives under ~/.config/malias.
homefilepath = Path.home()
filepath = homefilepath / '.config/malias'
database = filepath / 'malias.db'
logfile = filepath / 'malias.log'

# The directory has to exist before logging opens the log file inside it.
filepath.mkdir(parents=True, exist_ok=True)
logging.basicConfig(
    format='%(message)s',
    level=logging.INFO,
    filename=logfile,
)
|
|
|
|
|
|
def connect_database():
    """Open the malias SQLite database and make sure its schema exists.

    The configuration directory is created if missing, the ``apikey``,
    ``settings`` and ``aliases`` tables are created if they do not exist yet,
    and first_run() is called to seed the placeholder rows.

    Returns:
        The open ``sqlite3.Connection``, or ``None`` when the database could
        not be opened (the error is logged and printed in that case).
    """
    Path(filepath).mkdir(parents=True, exist_ok=True)
    try:
        conn = sqlite3.connect(database)
    except Error as e:
        logging.error(time.strftime("%Y-%m-%d %H:%M") + ' - Error : ' + str(e))
        print(e)
        # Nothing more can be done without a connection; callers get None.
        return None

    # Schema setup happens on the normal path (not in a ``finally`` clause),
    # so it never runs while another exception is propagating.
    c = conn.cursor()
    c.execute('''CREATE TABLE IF NOT EXISTS apikey
                 (id integer NOT NULL PRIMARY KEY,
                  api text NOT NULL)''')
    c.execute('''CREATE TABLE IF NOT EXISTS settings (
                    id INTEGER NOT NULL PRIMARY KEY,
                    first_run INTEGER,
                    server TEXT,
                    data_copy INTEGER
                )''')
    c.execute('''CREATE TABLE IF NOT EXISTS aliases
                 (id integer NOT NULL PRIMARY KEY,
                  alias text NOT NULL,
                  goto text NOT NULL,
                  created text NOT NULL)''')
    conn.commit()

    first_run(conn)
    return conn
|
|
|
|
|
|
|
|
def first_run(conn):
    """Seed the database with placeholder rows on the very first run.

    When the ``settings`` table is empty, one settings row (with the
    placeholder server ``'dummy.server'``) and one apikey row (with the
    placeholder key ``'DUMMY_KEY'``) are inserted and committed. When the
    table already has rows, nothing is changed.

    Args:
        conn: Open SQLite connection whose ``settings`` and ``apikey``
            tables already exist.

    Returns:
        None
    """
    cursor = conn.cursor()
    cursor.execute('SELECT count(*) FROM settings')
    count = cursor.fetchone()[0]
    if count == 0:
        now = datetime.now().strftime("%m-%d-%Y %H:%M")
        # A first run is a normal event, so it is logged as info, not error.
        logging.info(now + ' - First run!')
        cursor.execute('INSERT INTO settings values(?,?,?,?)', (1, 1, 'dummy.server', 0))
        cursor.execute('INSERT INTO apikey values(?,?)', (1, 'DUMMY_KEY'))
        conn.commit()
    return None
|
|
|
|
|
|
|
|
def get_settings(kind):
    """Return one value from the first row of the ``settings`` table.

    Args:
        kind: Which value to return: ``'mail_server'``,
            ``'first_run_status'`` or ``'copy_status'``.

    Returns:
        The requested value, or ``None`` when *kind* is not one of the
        names above.

    Exits the program (after printing a hint) when the mail server is
    requested but still holds the placeholder ``'dummy.server'``.
    """
    cursor = conn.cursor()
    cursor.execute('SELECT * FROM settings')
    row = cursor.fetchall()[0]
    first_run_status = row[1]
    mail_server = row[2]
    copy_status = row[3]

    if kind == 'mail_server':
        if mail_server == 'dummy.server':
            print('Error: No mailcow server active. Please add one with [b]malias -m [i]your.server[/i][/b]')
            # sys.exit instead of the site-provided exit(), which is meant
            # for the interactive shell and may be absent in programs.
            sys.exit(0)
        return mail_server
    if kind == 'first_run_status':
        return first_run_status
    if kind == 'copy_status':
        return copy_status
    return None
|
|
|
|
|
|
|
|
def get_api():
    """Return the API key stored in the ``apikey`` table.

    Returns:
        The stored API key.

    Exits the program (after printing a hint) when the stored key is still
    the placeholder ``'DUMMY_KEY'``.
    """
    cursor = conn.cursor()
    cursor.execute('SELECT api FROM apikey')
    apikey = cursor.fetchone()[0]
    if apikey == 'DUMMY_KEY':
        print('Missing API key. Please add with [b]malias -k [i]YOUR-API-KEY[/i][/b]')
        # sys.exit instead of the site-provided exit(), which is meant for
        # the interactive shell and may be absent in programs.
        sys.exit(0)
    return apikey
|
|
|
|
|
|
def set_mailserver(server):
    """Store *server* as the mailcow host in settings row 1.

    The change is logged, confirmed on the console and then committed.

    Args:
        server: Host name of the mailcow instance.
    """
    timestamp = datetime.now().strftime("%m-%d-%Y %H:%M")
    db = conn.cursor()
    db.execute('UPDATE settings SET server = ? WHERE id = 1', (server,))
    logging.info('%s - Info : mailcow server updated' % timestamp)
    print('Your mail server has been updated.')
    conn.commit()
|
|
|
|
|
|
|
|
def apikey(key):
    """Store *key* as the API key in apikey row 1.

    The change is logged, confirmed on the console and then committed.

    Args:
        key: The API key to store.
    """
    timestamp = datetime.now().strftime("%m-%d-%Y %H:%M")
    db = conn.cursor()
    db.execute('UPDATE apikey SET api = ? WHERE id = 1', (key,))
    logging.info('%s - Info : API key updated' % timestamp)
    print('Your API key has been updated.')
    conn.commit()
|
|
|
|
|
|
def create(alias,to_address):
    """Create *alias* on the mailcow server and record it in the local DB.

    The alias is first looked up with checklist(); when it is already known
    the error is logged and printed and the program exits. Otherwise the
    alias is posted to the server's ``/api/v1/add/alias`` endpoint and
    alias_id() is used to confirm that the server now lists it before the
    row is inserted locally.

    Args:
        alias: Address of the alias to create.
        to_address: Address the alias should deliver to.
    """
    now = datetime.now().strftime("%m-%d-%Y %H:%M")
    server = get_settings('mail_server')
    apikey = get_api()

    if checklist(alias):
        logging.error(now + ' - Error : alias %s exists on the mailcow instance %s ' %(alias,server))
        print('\n[b]Error[/b] : alias %s exists on the mailcow instance %s \n' %(alias,server))
        sys.exit(0)

    data = {'address': alias,'goto': to_address,'active': "1"}
    headers = {'X-API-Key': apikey, "Content-Type": "application/json"}
    requests.post('https://'+server+'/api/v1/add/alias',
                  data=json.dumps(data), headers=headers)

    # Success is judged by whether the server lists the alias afterwards,
    # not by the response of the POST itself.
    mail_id = alias_id(alias)
    if mail_id is None:
        logging.error(now + ' - Error : alias %s not created on the mailcow instance %s ' %(alias,server))
        # The console message now matches the log entry: the alias was not
        # created (the old text wrongly claimed it already existed).
        print('\n[b]Error[/b] : alias %s not created on the mailcow instance %s \n' %(alias,server))
        return

    cursor = conn.cursor()
    cursor.execute('INSERT INTO aliases values(?,?,?,?)', (mail_id, alias,to_address,now))
    conn.commit()
    logging.info(now + ' - Info : alias %s created for %s on the mailcow instance %s ' %(alias,to_address,server))
    print('\n[b]Info[/b] : alias %s created for %s on the mailcow instance %s \n' %(alias,to_address,server))
|
|
|
|
|
|
|
|
def delete_alias(alias):
    """Delete *alias* from the mailcow server and from the local DB.

    The alias is looked up with checklist() and alias_id(). When found, its
    id is posted to the server's ``/api/v1/delete/alias`` endpoint; on a
    ``'success'`` response the matching local row (if any) is removed too.
    Every outcome is printed, and server outcomes are also logged.

    Args:
        alias: Address of the alias to delete.
    """
    # Computed up front: every logging branch below needs the timestamp
    # (it used to be assigned in only one of them).
    now = datetime.now().strftime("%m-%d-%Y %H:%M")
    server = get_settings('mail_server')
    apikey = get_api()

    the_alias_id = alias_id(alias) if checklist(alias) else None
    if the_alias_id is None:
        # Either checklist() did not find the alias, or it matched without
        # the server listing an alias with exactly this address.
        print('\n[b]Error[/b] : The alias %s not found' % alias)
        return

    data = {'id': the_alias_id}
    headers = {'X-API-Key': apikey, "Content-Type": "application/json"}
    response = requests.post('https://'+server+'/api/v1/delete/alias',
                             data=json.dumps(data), headers=headers)
    response_data = response.json()

    if response_data[0]['type'] != 'success':
        logging.error(now + ' - Error : alias %s NOT deleted from the mailcow instance %s. Error : %s' %(alias,server,response_data))
        print('\n[b]Error[/b] : alias %s NOT deleted from the mailcow instance %s. Error : %s' %(alias,server,response_data))
        return

    if check_local_db(the_alias_id) == 1:
        cursor = conn.cursor()
        cursor.execute('DELETE from aliases where id = ?',(the_alias_id,))
        logging.info(now + ' - Info : alias %s deleted from the mailcow instance %s and Local DB' %(alias,server))
        conn.commit()
        print('\n[b]Info[/b] : alias %s deleted from the mailcow instance %s and local DB' %(alias,server))
    else:
        logging.info(now + ' - Info : alias %s deleted from the mailcow instance %s.' %(alias,server))
        print('\n[b]Info[/b] : alias %s deleted from the mailcow instance %s.' %(alias,server))
|
|
|
|
|
|
|
|
def copy_data():
    """Import all aliases from the mailcow server into the local DB once.

    When the ``copy_status`` setting is 0, the alias list is fetched from
    the server's ``/api/v1/get/alias/all`` endpoint, each entry is stored in
    the ``aliases`` table and ``data_copy`` is set to 1. Otherwise only a
    note that the import has already happened is printed.
    """
    apikey = get_api()
    mail_server = get_settings('mail_server')

    if get_settings('copy_status') != 0:
        print('\n[b]Info[/b] : aliases alreday imported from the mailcow instance %s to local DB\n' %(mail_server))
        return

    now = datetime.now().strftime("%m-%d-%Y %H:%M")
    req = urllib.request.Request('https://'+mail_server+'/api/v1/get/alias/all')
    req.add_header('Content-Type', 'application/json')
    req.add_header('X-API-Key', apikey)
    # The context manager closes the HTTP response once it has been read.
    with urllib.request.urlopen(req) as response:
        remote_aliases = json.loads(response.read().decode('utf-8'))

    cursor = conn.cursor()
    # OR IGNORE: an alias id that is already stored locally (e.g. added via
    # create() before the import) keeps its row instead of aborting the
    # whole import with an IntegrityError.
    cursor.executemany(
        'INSERT OR IGNORE INTO aliases values(?,?,?,?)',
        [(entry['id'], entry['address'], entry['goto'], now) for entry in remote_aliases],
    )
    cursor.execute('UPDATE settings SET data_copy = ? WHERE id = 1',(1,))
    conn.commit()
    logging.info(now + ' - Info : aliases imported from the mailcow instance %s to local DB' %(mail_server))
    print('\n[b]Info[/b] : aliases imported from the mailcow instance %s to local DB\n' %(mail_server))
|
|
|
|
|
|
def checklist(alias):
    """Check whether *alias* is already known on the server or locally.

    The server's alias list is fetched from ``/api/v1/get/alias/all`` and
    the local ``aliases`` table is queried as well.

    Args:
        alias: Address to look for.

    Returns:
        ``True`` when a match is found in either place, otherwise ``None``.
    """
    apikey = get_api()
    mail_server = get_settings('mail_server')
    req = urllib.request.Request('https://'+mail_server+'/api/v1/get/alias/all')
    req.add_header('Content-Type', 'application/json')
    req.add_header('X-API-Key', apikey)
    # The context manager closes the HTTP response once it has been read.
    with urllib.request.urlopen(req) as response:
        remote_aliases = json.loads(response.read().decode('utf-8'))

    alias_exist = None
    # NOTE(review): this is a substring match against both the alias address
    # and its destination, so 'a@x.tld' also matches 'aa@x.tld' -- confirm
    # whether an exact comparison was intended.
    if any(alias in entry['address'] or alias in entry['goto'] for entry in remote_aliases):
        alias_exist = True

    cursor = conn.cursor()
    # NOTE(review): LIKE treats '_' and '%' inside *alias* as wildcards.
    cursor.execute('SELECT count(*) FROM aliases where alias like ? or goto like ?', (alias,alias,))
    count = cursor.fetchone()[0]
    if count >= 1:
        alias_exist = True

    return alias_exist
|
|
|
|
|
|
def list_alias():
    """Print every alias on the mailcow server together with its target.

    The list is fetched from the server's ``/api/v1/get/alias/all``
    endpoint; each alias address is padded to 20 characters for display.
    """
    apikey = get_api()
    mail_server = get_settings('mail_server')
    req = urllib.request.Request('https://'+mail_server+'/api/v1/get/alias/all')
    req.add_header('Content-Type', 'application/json')
    req.add_header('X-API-Key', apikey)
    # The context manager closes the HTTP response once it has been read.
    with urllib.request.urlopen(req) as response:
        remote_aliases = json.loads(response.read().decode('utf-8'))

    print('\n[b]malias[/b] - All aliases on %s' %(mail_server))
    print('===================================================')
    for entry in remote_aliases:
        the_alias = entry['address'].ljust(20,' ')
        print(the_alias + '\tgoes to\t\t' + entry['goto'])
|
|
|
|
|
|
|
|
def alias_id(alias):
    """Look up the server-side id of *alias*.

    The alias list is fetched from the server's ``/api/v1/get/alias/all``
    endpoint and searched for an entry whose address equals *alias*.

    Args:
        alias: Exact alias address to look for.

    Returns:
        The ``id`` of the first matching entry, or ``None`` when the server
        lists no alias with that address.
    """
    apikey = get_api()
    mail_server = get_settings('mail_server')
    req = urllib.request.Request('https://'+mail_server+'/api/v1/get/alias/all')
    req.add_header('Content-Type', 'application/json')
    req.add_header('X-API-Key', apikey)
    # The context manager closes the HTTP response once it has been read.
    with urllib.request.urlopen(req) as response:
        remote_aliases = json.loads(response.read().decode('utf-8'))

    for entry in remote_aliases:
        if entry['address'] == alias:
            return entry['id']
    return None
|
|
|
|
|
|
def number_of_aliases_in_db():
    """Return the number of rows in the local ``aliases`` table."""
    (total,) = conn.execute('SELECT count(*) FROM aliases').fetchone()
    return total
|
|
|
|
|
|
|
|
def number_of_aliases_on_server():
    """Return how many aliases the mailcow server reports.

    The count is the length of the list returned by the server's
    ``/api/v1/get/alias/all`` endpoint.
    """
    apikey = get_api()
    mail_server = get_settings('mail_server')
    req = urllib.request.Request('https://'+mail_server+'/api/v1/get/alias/all')
    req.add_header('Content-Type', 'application/json')
    req.add_header('X-API-Key', apikey)
    # The context manager closes the HTTP response once it has been read.
    with urllib.request.urlopen(req) as response:
        remote_aliases = json.loads(response.read().decode('utf-8'))
    return len(remote_aliases)
|
|
|
|
|
|
def check_local_db(alias_id):
    """Return how many local ``aliases`` rows have the id *alias_id*.

    Args:
        alias_id: Alias id to look for in the local database.
    """
    query = 'SELECT count(*) FROM aliases where id = ?'
    (matches,) = conn.execute(query, (alias_id,)).fetchone()
    return matches
|
|
|
|
|
|
|
|
|
|
def search(alias):
    """Print all aliases whose address or target contains *alias*.

    When the ``data_copy`` setting is 1 the local ``aliases`` table is
    searched; otherwise the alias list is fetched from the server's
    ``/api/v1/get/alias/all`` endpoint and filtered here. The last line
    printed states which source was used.

    Args:
        alias: Text to search for in alias addresses and targets.
    """
    apikey = get_api()
    mail_server = get_settings('mail_server')
    cursor = conn.cursor()
    cursor.execute('SELECT data_copy FROM settings where id = 1')
    result = cursor.fetchone()[0]

    if result == 1:
        search_term = '%'+alias+'%'
        cursor.execute('SELECT * from aliases where alias like ? or goto like ?',(search_term,search_term,))
        rows = cursor.fetchall()
        print('\nAliases on %s that contains %s' %(mail_server,alias))
        print('=================================================================')
        for row in rows:
            # Row layout follows the aliases table: index 1 is the alias
            # address and index 2 is its target.
            the_alias = row[1].ljust(20,' ')
            print(the_alias + '\tgoes to\t\t' + row[2])
        print('\n\nData from local DB')
    else:
        req = urllib.request.Request('https://'+mail_server+'/api/v1/get/alias/all')
        req.add_header('Content-Type', 'application/json')
        req.add_header('X-API-Key', apikey)
        # The context manager closes the HTTP response once it has been read.
        with urllib.request.urlopen(req) as response:
            remote_aliases = json.loads(response.read().decode('utf-8'))
        print('\nAliases on %s that contains %s' %(mail_server,alias))
        print('=================================================')
        for entry in remote_aliases:
            if alias in entry['address'] or alias in entry['goto']:
                print(entry['address'] + '\tgoes to\t\t' + entry['goto'])
        print('\n\nData from server')
|
|
|
|
|
|
|
|
def show_current_info():
    """Print the current configuration, alias counts and app version.

    The API key and server are read straight from the database rather than
    through get_api()/get_settings(): those helpers exit the program when a
    placeholder value is stored, which made the "Missing ..." texts below
    unreachable. When either value is still a placeholder, the server is
    not queried and the server alias count is shown as ``n/a``.
    """
    cursor = conn.cursor()
    cursor.execute('SELECT api FROM apikey')
    API = cursor.fetchone()[0]
    cursor.execute('SELECT server FROM settings')
    mail_server = cursor.fetchone()[0]

    configured = True
    if API == 'DUMMY_KEY':
        API = 'Missing API Key!'
        configured = False
    if mail_server == 'dummy.server':
        mail_server = 'Missing address to mailcow instance!'
        configured = False

    # Counting server aliases needs a real key and server.
    aliases_server = number_of_aliases_on_server() if configured else 'n/a'
    alias_db = number_of_aliases_in_db()

    print('\n[b]malias[/b] - Manage aliases on mailcow Instance.')
    print('===================================================')
    print('API key : [b]%s[/b]' % (API))
    print('mailcow Instance : [b]%s[/b]' % (mail_server))
    print('Logfile : [b]%s[/b]' % (logfile))
    print('Aliases on server : [b]%s[/b]' % (aliases_server))
    print('Aliases in DB : [b]%s[/b]' % (alias_db))
    print('')
    print('App version : [b]%s[/b] (https://gitlab.pm/rune/malias)' % (app_version))
    print('')
|
|
|
|
|
|
# Shared database connection used by every function in this file.
conn = connect_database()
if conn is None:
    # connect_database() has already logged and printed the sqlite error;
    # stop here instead of failing later with an AttributeError on None.
    sys.exit(1)

# Command line definition. Options that take values use action="append"
# together with nargs, so each parsed value is a list of lists; the
# dispatch below therefore reads the first occurrence with [0][...].
parser = argparse.ArgumentParser(prog='malias',
    description='This is a simple application to help you create and delete aliases on a mailcow instance.\nIf you find any issues or would like to submit a PR - please head over to https://gitlab.pm/rune/malias. \n\nI hope this makes your mailcow life a bit easier!',
    formatter_class=RawTextHelpFormatter,
    epilog='Making mailcow easier...')

parser.add_argument('-k', '--api', help='Add/Change API key.\n\n',
    nargs=1, metavar=('APIkey'), required=False, action="append")

parser.add_argument('-s', '--search', help='Search for alias.\n\n',
    nargs=1, metavar=('alias@domain.com'), required=False, action="append")

parser.add_argument('-m', '--server', help='Add/Uppdate mailcow instance.\n\n',
    nargs=1, metavar=('mailcow-server.tld'), required=False, action="append")

parser.add_argument('-a', '--add', help='Add new alias.\n\n',
    nargs=2, metavar=('alias@domain.com', 'to@domain.com'), required=False, action="append")

parser.add_argument('-d', '--delete', help='Delete alias.\n\n',
    nargs=1, metavar=('alias@domain.com'), required=False, action="append")

parser.add_argument('-v', '--version', help='Show current version and config info\n\n',
    required=False, action='store_true')

parser.add_argument('-c', '--copy', help='Copy alias data from mailcow server to local DB.\n\n',
    required=False, action='store_true')

parser.add_argument('-l', '--list', help='List all aliases on the Mailcow instance.\n\n',
    required=False, action='store_true')

args = vars(parser.parse_args())

# Only the first matching option is acted on, in this fixed order.
if args['api']:
    apikey(args['api'][0][0])
elif args['search']:
    search(args['search'][0][0])
elif args['server']:
    set_mailserver(args['server'][0][0])
elif args['add']:
    create(args['add'][0][0],args['add'][0][1])
elif args['delete']:
    delete_alias(args['delete'][0][0])
elif args['version']:
    show_current_info()
elif args['copy']:
    copy_data()
elif args['list']:
    list_alias()
else:
    print('\n\nEh, sorry! I need something more to help you! If you write [b]malias -h[/b] I\'ll show a help screen to get you going!!!\n\n\n')