271 lines
7.4 KiB
Python
271 lines
7.4 KiB
Python
#!/usr/bin/python3
|
|
import sqlite3
|
|
from pathlib import Path
|
|
from sqlite3 import Error
|
|
import urllib.request
|
|
import json
|
|
import logging
|
|
import argparse
|
|
import requests
|
|
import os
|
|
import time
|
|
import sys
|
|
from types import SimpleNamespace
|
|
from datetime import datetime
|
|
from string import ascii_letters, digits
|
|
from rich import print
|
|
from argparse import RawTextHelpFormatter
|
|
|
|
# Info pages for dev
|
|
# https://mailcow.docs.apiary.io/#reference/aliases/get-aliases/get-aliases
|
|
# https://demo.mailcow.email/api/#/Aliases
|
|
|
|
|
|
homefilepath = Path.home()
|
|
filepath = homefilepath.joinpath('.config/malias')
|
|
database = filepath.joinpath('malias.db')
|
|
logfile = filepath.joinpath('malias.log')
|
|
Path(filepath).mkdir(parents=True, exist_ok=True)
|
|
logging.basicConfig(filename=logfile,level=logging.INFO,format='%(message)s')
|
|
app_version = '0.1'
|
|
|
|
|
|
def connect_database():
|
|
Path(filepath).mkdir(parents=True, exist_ok=True)
|
|
conn = None
|
|
try:
|
|
conn = sqlite3.connect(database)
|
|
except Error as e:
|
|
logging.error(time.strftime("%Y-%m-%d %H:%M") + ' - Error : ' + str(e))
|
|
print(e)
|
|
finally:
|
|
if conn:
|
|
c = conn.cursor()
|
|
c.execute('''CREATE TABLE IF NOT EXISTS apikey
|
|
(id integer NOT NULL PRIMARY KEY,
|
|
api text NOT NULL)''')
|
|
c.execute('''CREATE TABLE IF NOT EXISTS settings (
|
|
id INTEGER NOT NULL PRIMARY KEY,
|
|
first_run INTEGER DEFAULT 0,
|
|
server TEXT DEFAULT "TEST"
|
|
)''')
|
|
c.execute('''CREATE TABLE IF NOT EXISTS aliases
|
|
(id integer NOT NULL PRIMARY KEY,
|
|
alias text NOT NULL,
|
|
goto text NOT NULL,
|
|
created text NOT NULL)''')
|
|
|
|
conn.commit()
|
|
first_run(conn)
|
|
return conn
|
|
|
|
|
|
def first_run(conn):
|
|
now = datetime.now().strftime("%m-%d-%Y %H:%M")
|
|
cursor = conn.cursor()
|
|
cursor.execute('SELECT count(*) FROM settings')
|
|
count = cursor.fetchone()[0]
|
|
if count == 0:
|
|
logging.error(now + ' - First run!')
|
|
cursor.execute('INSERT INTO settings values(?,?,?)', (1, 1, 'dummy.server'))
|
|
cursor.execute('INSERT INTO apikey values(?,?)', (1, 'DUMMY_KEY'))
|
|
conn.commit()
|
|
return None
|
|
|
|
|
|
|
|
def get_settings(kind):
|
|
cursor = conn.cursor()
|
|
cursor.execute('SELECT * FROM settings')
|
|
data = cursor.fetchall()
|
|
first_run_status = data[0][1]
|
|
mail_server = data[0][2]
|
|
if kind == 'mail_server':
|
|
if mail_server == 'dummy.server':
|
|
print('Error: No MailCow server active. Please add one with [b]malias -m [i]your.server[/i][/b]')
|
|
exit(0)
|
|
else:
|
|
return mail_server
|
|
if kind == 'first_run_status':
|
|
return first_run_status
|
|
|
|
|
|
|
|
def get_api():
|
|
cursor = conn.cursor()
|
|
cursor.execute('SELECT api FROM apikey')
|
|
apikey = cursor.fetchone()[0]
|
|
if apikey == 'DUMMY_KEY':
|
|
print('Missing API key. Please add with [b]malias -k [i]YOUR-API-KEY[/i][/b]')
|
|
exit(0)
|
|
else:
|
|
return apikey
|
|
|
|
|
|
def set_mailserver(server):
|
|
now = datetime.now().strftime("%m-%d-%Y %H:%M")
|
|
cursor = conn.cursor()
|
|
cursor.execute('UPDATE settings SET server = ? WHERE id = 1',(server,))
|
|
logging.info(now + ' - Info : MailCow server updated')
|
|
print('Your mail server has been updated.')
|
|
conn.commit()
|
|
|
|
|
|
|
|
def apikey(key):
|
|
now = datetime.now().strftime("%m-%d-%Y %H:%M")
|
|
cursor = conn.cursor()
|
|
cursor.execute('UPDATE apikey SET api = ? WHERE id = 1',(key,))
|
|
logging.info(now + ' - Info : API key updated')
|
|
print('Your API key has been updated.')
|
|
conn.commit()
|
|
|
|
|
|
def create(alias,to_address):
|
|
now = datetime.now().strftime("%m-%d-%Y %H:%M")
|
|
server = get_settings('mail_server')
|
|
apikey = get_api()
|
|
if checklist(alias) == True:
|
|
logging.error(now + ' - Error : alias %s exists on the MailCow instance %s ' %(alias,server))
|
|
print('\n[b]Error[/b] : alias %s exists on the MailCow instance %s \n' %(alias,server))
|
|
exit(0)
|
|
else:
|
|
values = """
|
|
{
|
|
"address": alias,
|
|
"goto": to_address,
|
|
"active": "1"
|
|
}
|
|
"""
|
|
|
|
headers = {
|
|
'Content-Type': 'application/json',
|
|
'X-API-Key': apikey
|
|
}
|
|
request = Request('https://rune.pm/api/v1/add/alias', data=values, headers=headers)
|
|
|
|
response_body = urlopen(request).read()
|
|
print (response_body)
|
|
|
|
|
|
|
|
def delete(alias):
|
|
print('Delete : ' + alias)
|
|
|
|
|
|
def checklist(alias):
|
|
apikey = get_api()
|
|
mail_server = get_settings('mail_server')
|
|
req = urllib.request.Request('https://'+mail_server+'/api/v1/get/alias/all')
|
|
req.add_header('Content-Type', 'application/json')
|
|
req.add_header('X-API-Key', apikey)
|
|
current = urllib.request.urlopen(req)
|
|
remote = current.read().decode('utf-8')
|
|
remoteData = json.loads(remote)
|
|
i = 0
|
|
for search in remoteData:
|
|
if remoteData[i]['address'] == alias:
|
|
return True
|
|
i=i+1
|
|
return None
|
|
|
|
|
|
def number_of_aliases_on_server():
|
|
apikey = get_api()
|
|
mail_server = get_settings('mail_server')
|
|
req = urllib.request.Request('https://'+mail_server+'/api/v1/get/alias/all')
|
|
req.add_header('Content-Type', 'application/json')
|
|
req.add_header('X-API-Key', apikey)
|
|
current = urllib.request.urlopen(req)
|
|
remote = current.read().decode('utf-8')
|
|
remoteData = json.loads(remote)
|
|
i = 0
|
|
for count in remoteData:
|
|
i=i+1
|
|
|
|
return i
|
|
|
|
def number_of_aliases_in_db():
|
|
cursor = conn.cursor()
|
|
cursor.execute('SELECT count(*) FROM aliases')
|
|
count = cursor.fetchone()[0]
|
|
return count
|
|
|
|
|
|
def search(alias):
|
|
results = checklist(alias)
|
|
mail_server = get_settings('mail_server')
|
|
if results == True:
|
|
print('\n\nThe mail address %s exists. Using the MailCow instance : %s\n\n'%(alias,mail_server))
|
|
else:
|
|
print('\n\nThe mail address %s [b]does not[/b] exists. Using the MailCow instance : %s\n\n'%(alias,mail_server))
|
|
|
|
|
|
|
|
def show_current_info():
|
|
API = get_api()
|
|
mail_server = get_settings('mail_server')
|
|
|
|
if API == 'DUMMY_KEY':
|
|
API = 'Missing API Key!'
|
|
|
|
if mail_server == 'dummy.server':
|
|
mail_server = 'Missing address to MailCow instance!'
|
|
|
|
aliases_server = number_of_aliases_on_server()
|
|
alias_db = number_of_aliases_in_db()
|
|
|
|
print('\n[b]malias[/b] - Manage aliases on MailCow Instance.')
|
|
print('===================================================')
|
|
print('API key : [b]%s[/b]' % (API))
|
|
print('MailCow Instance : [b]%s[/b]' % (mail_server))
|
|
print('Logfile : [b]%s[/b]' % (logfile))
|
|
print('Aliases on server : [b]%s[/b]' % (aliases_server))
|
|
print('Aliases in DB : [b]%s[/b]' % (alias_db))
|
|
print('')
|
|
print('App version : [b]%s[/b] (https://gitlab.pm/rune/malias)' % (app_version))
|
|
print('')
|
|
|
|
|
|
conn = connect_database()
|
|
|
|
|
|
parser = argparse.ArgumentParser(prog='malias',
|
|
description='Application description',
|
|
formatter_class=RawTextHelpFormatter,
|
|
epilog='Making MailCow easier...')
|
|
|
|
parser.add_argument('-k', '--api', help='Add/Change API key.\n\n',
|
|
nargs=1, metavar=('APIkey'), required=False, action="append")
|
|
|
|
parser.add_argument('-s', '--search', help='Search for alias.\n\n',
|
|
nargs=1, metavar=('alias@domain.com'), required=False, action="append")
|
|
|
|
parser.add_argument('-m', '--server', help='Add/Uppdate MailCow instance.\n\n',
|
|
nargs=1, metavar=('alias@domain.com'), required=False, action="append")
|
|
|
|
|
|
parser.add_argument('-a', '--add', help='Add new alias.\n\n',
|
|
nargs=2, metavar=('alias@domain.com', 'to@domain.com'), required=False, action="append")
|
|
|
|
parser.add_argument('-v', '--version', help='Show current version and config info\n\n',
|
|
required=False, action='store_true')
|
|
|
|
args = vars(parser.parse_args())
|
|
|
|
if args['api']:
|
|
apikey(args['api'][0][0])
|
|
elif args['search']:
|
|
search(args['search'][0][0])
|
|
elif args['server']:
|
|
set_mailserver(args['server'][0][0])
|
|
elif args['add']:
|
|
create(args['add'][0][0],args['add'][0][1])
|
|
elif args['version']:
|
|
show_current_info()
|
|
|
|
|
|
else:
|
|
# get_settings(first_run_status)
|
|
get_api()
|
|
print('\n\nError use [b]malias -h[/b] to see the help screen!\n\n\n') |