#!/usr/bin/python3 import sqlite3 from pathlib import Path from sqlite3 import Error import urllib.request import json import logging import argparse import requests import os import time import sys from types import SimpleNamespace from datetime import datetime from string import ascii_letters, digits from rich import print from argparse import RawTextHelpFormatter # Info pages for dev # https://mailcow.docs.apiary.io/#reference/aliases/get-aliases/get-aliases # https://demo.mailcow.email/api/#/Aliases homefilepath = Path.home() filepath = homefilepath.joinpath('.config/malias') database = filepath.joinpath('malias.db') logfile = filepath.joinpath('malias.log') Path(filepath).mkdir(parents=True, exist_ok=True) logging.basicConfig(filename=logfile,level=logging.INFO,format='%(message)s') app_version = '0.1' def connect_database(): Path(filepath).mkdir(parents=True, exist_ok=True) conn = None try: conn = sqlite3.connect(database) except Error as e: logging.error(time.strftime("%Y-%m-%d %H:%M") + ' - Error : ' + str(e)) print(e) finally: if conn: c = conn.cursor() c.execute('''CREATE TABLE IF NOT EXISTS apikey (id integer NOT NULL PRIMARY KEY, api text NOT NULL)''') c.execute('''CREATE TABLE IF NOT EXISTS aliases (id integer NOT NULL PRIMARY KEY, alias text NOT NULL, created text NOT NULL)''') return conn def get_api(): cursor = conn.cursor() cursor.execute('SELECT COUNT(*) FROM apikey') count = cursor.fetchone()[0] if count == 0: return None else: cursor.execute('SELECT * FROM apikey') rows = cursor.fetchone() return rows[1] def apikey(key): cursor = conn.cursor() cursor.execute('SELECT COUNT(*) FROM apikey') count = cursor.fetchone()[0] if count == 0: cursor.execute('INSERT INTO apikey values(?,?)', (1, key)) logging.info(time.strftime("%Y-%m-%d %H:%M") + ' - Info : API key added') print('Your API key has been added.') else: cursor.execute('UPDATE apikey SET api = ? WHERE id = 1',(key,)) logging.info(time.strftime("%Y-%m-%d %H:%M") + ' - Info : API key updated') print('Your API key has been updated.') conn.commit() def create(alias): print('Create : '+ alias) def delete(alias): print('Delete : ' + alias) def checklist(alias): apikey = get_api() req = urllib.request.Request('https://rune.pm/api/v1/get/alias/all') req.add_header('Content-Type', 'application/json') req.add_header('X-API-Key', apikey) current = urllib.request.urlopen(req) remote = current.read().decode('utf-8') remoteData = json.loads(remote) i = 0 for search in remoteData: if remoteData[i]['address'] == alias: return True i=i+1 return None def search(alias): print('Search for : ' + alias) result = checklist(alias) print(result) conn = connect_database() # updatedb() parser = argparse.ArgumentParser(prog='malias', description='Application descript', formatter_class=RawTextHelpFormatter, epilog='Making Mailcow easier...') parser.add_argument('-k', '--api', help='Add/Change API key.\n\n', nargs=1, metavar=('APIkey'), required=False, action="append") parser.add_argument('-s', '--search', help='Search for alias.\n\n', nargs=1, metavar=('alias@domain.com'), required=False, action="append") args = vars(parser.parse_args()) if len(args)<1: parser.print_help(sys.stderr) sys.exit(1) if args['api']: apikey(args['api'][0][0]) elif args['search']: search(args['search'][0][0]) else: print('Error ')