#!/usr/bin/python3 -W ignore::DeprecationWarning
# oAI: an interactive terminal chat client for OpenRouter models.
#
# Settings (API key, base URL, streaming flag, default model, max token limit)
# are persisted in a small SQLite key/value table under ~/.config/oai.
import base64
import mimetypes
import os
import re
import sqlite3
import sys
from contextlib import closing
from pathlib import Path
from typing import Optional, List

import pyperclip
import requests
import typer
from openrouter import OpenRouter
from prompt_toolkit import PromptSession
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
from rich.text import Text

app = typer.Typer()
console = Console()

# DB configuration - define DB file and table setup
homefilepath = Path.home()
filepath = homefilepath.joinpath('.config/oai')
database = filepath.joinpath('oai_config.db')
DB_FILE = database
version = '1.0'

# Seconds to wait for any OpenRouter HTTP call before giving up; without a
# timeout `requests` can block forever on a stalled connection.
REQUEST_TIMEOUT = 30
# Fallback for the stored max token limit when nothing (or garbage) is saved.
DEFAULT_MAX_TOKEN = 100000
# Largest attachment accepted via '@path' (10MB).
MAX_ATTACHMENT_BYTES = 10 * 1024 * 1024
# '@' followed by non-spaces, e.g. @/Users/user/file.jpg or @c:\folder\file.txt
FILE_PATTERN = re.compile(r'@([^\s]+)')


def create_table_if_not_exists():
    """Ensure the config table exists and directories are created."""
    os.makedirs(os.path.dirname(DB_FILE), exist_ok=True)  # Create directories if needed
    # sqlite3's own context manager only commits/rolls back; `closing` is what
    # actually releases the connection handle.
    with closing(sqlite3.connect(DB_FILE)) as conn:
        with conn:
            conn.execute('''CREATE TABLE IF NOT EXISTS config (
                key TEXT PRIMARY KEY,
                value TEXT NOT NULL
            )''')


def get_config(key: str) -> Optional[str]:
    """Fetch a config value from the DB.

    Returns the stored string, or None when no row exists for `key`.
    """
    create_table_if_not_exists()  # Ensure table exists
    with closing(sqlite3.connect(DB_FILE)) as conn:
        row = conn.execute('SELECT value FROM config WHERE key = ?', (key,)).fetchone()
    return row[0] if row else None


def set_config(key: str, value: str):
    """Set or update a config value in the DB."""
    create_table_if_not_exists()  # Ensure table exists
    with closing(sqlite3.connect(DB_FILE)) as conn:
        with conn:  # commits on success, rolls back on error
            conn.execute('INSERT OR REPLACE INTO config (key, value) VALUES (?, ?)', (key, value))


# Load configurations from DB on startup (or set defaults if not present)
API_KEY = get_config('api_key')
OPENROUTER_BASE_URL = get_config('base_url') or "https://openrouter.ai/api/v1"  # Default if not set
STREAM_ENABLED = get_config('stream_enabled') or "on"  # Default to streaming on
DEFAULT_MODEL_ID = get_config('default_model')  # Load default model ID from DB
try:
    MAX_TOKEN = int(get_config('max_token') or DEFAULT_MAX_TOKEN)  # Load max token limit, default to 100k
except ValueError:
    # A corrupted stored value must not prevent the app from starting.
    MAX_TOKEN = DEFAULT_MAX_TOKEN

# Fetch models once at module level (with loaded BASE_URL)
models_data = []
text_models = []  # Filtered models: allow "image" but exclude "video"
selected_model_default = None  # Bound up-front so it exists even if the fetch fails
try:
    headers = {"Authorization": f"Bearer {API_KEY}"} if API_KEY else {}
    response = requests.get(f"{OPENROUTER_BASE_URL}/models", headers=headers, timeout=REQUEST_TIMEOUT)
    response.raise_for_status()
    models_data = response.json()["data"]
    # Filter: Exclude "video" models, but allow "image" for attachments
    text_models = [m for m in models_data if "video" not in (m.get("modalities") or [])]
    # After fetching models, load default model if set and available
    if DEFAULT_MODEL_ID:
        selected_model_default = next((m for m in text_models if m["id"] == DEFAULT_MODEL_ID), None)
        if not selected_model_default:
            console.print(f"[bold yellow]Warning: Saved default model '{DEFAULT_MODEL_ID}' is not available. Use '/config model' to set a new one.[/]")
except Exception:
    # Best effort: chat() reports the empty model list to the user and exits.
    models_data = []
    text_models = []
    selected_model_default = None


# **Function to fetch credit information from OpenRouter API**
def get_credits(api_key: str, base_url: str = OPENROUTER_BASE_URL) -> Optional[dict]:
    """
    Fetch credit information from OpenRouter API.

    Returns a dict with 'total_credits', 'used_credits', 'credits_left'
    (each formatted as a dollar string) or None on error / missing key.
    Based on OpenRouter's /credits endpoint ([openrouter.ai/docs/limits](https://openrouter.ai/docs/limits)).
    """
    if not api_key:
        return None
    url = f"{base_url}/credits"
    headers = {"Authorization": f"Bearer {api_key}"}
    try:
        response = requests.get(url, headers=headers, timeout=REQUEST_TIMEOUT)
        response.raise_for_status()
        data = response.json().get('data', {})
        total_credits = float(data.get('total_credits', 0))
        total_usage = float(data.get('total_usage', 0))
        credits_left = total_credits - total_usage
        return {
            'total_credits': f"${total_credits:.2f}",
            'used_credits': f"${total_usage:.2f}",
            'credits_left': f"${credits_left:.2f}"
        }
    except Exception as e:
        # Gracefully handle errors (e.g., invalid API key, network issues)
        console.print(f"[bold red]Error fetching credits: {e}[/]")
        return None


# Function to clear the screen using ANSI escape sequences (cross-platform)
def clear_screen():
    """Clear the terminal screen using ANSI escape codes, with fallback to newlines."""
    try:
        # ANSI sequence to clear screen and move cursor to top-left
        print("\033[H\033[J", end="", flush=True)
    except Exception:
        # Fallback: Fill with newlines
        print("\n" * 100)


def _choose_model(search_term: str, title: str, retry_hint: str) -> Optional[dict]:
    """List models matching `search_term` and prompt the user to pick one.

    `title` heads the table panel and `retry_hint` is the command named in the
    "no match" message. Returns the chosen model dict, or None when nothing
    matches or the user cancels with 0.
    """
    candidates = text_models
    if search_term:
        # Substring filter (case-insensitive) on name or id
        needle = search_term.lower()
        candidates = [m for m in text_models if needle in m["name"].lower() or needle in m["id"].lower()]
        if not candidates:
            console.print(f"[bold red]No models match '{search_term}'. Try '{retry_hint}' without search.[/]")
            return None

    table = Table("No.", "Name", "ID", show_header=True, header_style="bold magenta")
    for number, model in enumerate(candidates, 1):
        table.add_row(str(number), model["name"], model["id"])
    scope = 'All' if not search_term else f'Search: {search_term}'
    console.print(Panel(table, title=f"[bold green]{title} ({scope})[/]", title_align="left"))

    while True:
        try:
            choice = int(typer.prompt("Enter model number (or 0 to cancel)"))
        except ValueError:
            console.print("[bold red]Invalid input. Enter a number.[/]")
            continue
        if choice == 0:
            return None
        if 1 <= choice <= len(candidates):
            return candidates[choice - 1]
        console.print("[bold red]Invalid choice. Try again.[/]")


def _collect_attachments(user_input: str, model: dict):
    """Turn '@path' tokens in `user_input` into message content blocks.

    Returns `(text_part, content_blocks)`: the prompt with every '@path' token
    removed, and one block per attachment that could be read. Problems with an
    individual file are reported on the console and that file is skipped.
    """
    content_blocks = []
    for match in FILE_PATTERN.finditer(user_input):
        file_path = match.group(1)
        # Expand '~' BEFORE making the path absolute; the other order turns
        # '~/x' into '<cwd>/~/x', which expanduser then leaves untouched.
        expanded_path = os.path.abspath(os.path.expanduser(file_path))
        if not os.path.exists(expanded_path) or os.path.isdir(expanded_path):
            console.print(f"[bold red]File not found or is a directory: {expanded_path}[/]")
            continue
        if os.path.getsize(expanded_path) > MAX_ATTACHMENT_BYTES:  # 10MB limit
            console.print(f"[bold red]File too large (>10MB): {expanded_path}[/]")
            continue
        mime_type, _ = mimetypes.guess_type(expanded_path)
        try:
            with open(expanded_path, 'rb') as f:
                file_data = f.read()
            if mime_type and mime_type.startswith('image/'):
                # `or []` guards against an explicit null in the model record.
                # NOTE(review): confirm "modalities" is the key the models
                # endpoint actually uses to advertise image input.
                if "image" not in (model.get("modalities") or []):
                    console.print("[bold red]Selected model does not support image attachments.[/]")
                    continue
                b64_data = base64.b64encode(file_data).decode('utf-8')
                content_blocks.append({"type": "image_url", "image_url": {"url": f"data:{mime_type};base64,{b64_data}"}})
            elif mime_type == 'text/plain':
                content_blocks.append({"type": "text", "text": file_data.decode('utf-8')})
            else:
                console.print(f"[bold red]Unsupported file type: {mime_type} for {expanded_path}. Only images and plain text supported.[/]")
                continue
        except Exception as e:
            console.print(f"[bold red]Error reading file {expanded_path}: {e}[/]")
            continue
    # Remove @path from the text that is sent as the prompt
    text_part = FILE_PATTERN.sub('', user_input).strip()
    return text_part, content_blocks


# (command, description, example) rows shown by /help.
_HELP_ROWS = (
    (
        "/model [search]",
        "Select or change the current model for the session. Supports searching by name or ID (not persisted).",
        "/model gpt\nYou: 1\n(Selects first matching model for this chat only)",
    ),
    (
        "/maxtoken [value]",
        "Set temporary session token limit (≤ stored max). View limit if no value provided.",
        "/maxtoken 2000\n[bold green]Session max tokens set to: 2000[/bold green]",
    ),
    (
        "/config model [search]",
        "Set a default model that loads on startup; displays models for selection (persisted in DB, doesn't change current session model).",
        "/config model gpt\n(Shows GPT models; user selects to set as default)",
    ),
    (
        "/config maxtoken [value]",
        "Set stored max token limit (persisted in DB).",
        "/config maxtoken 50000\n[bold green]Stored max token limit updated to: 50000[/bold green]",
    ),
    (
        "/config api",
        "Set or update the OpenRouter API key.",
        "/config api\nEnter new API key: sk-...\n[bold green]API key updated and client reinitialized![/]",
    ),
    (
        "/config url",
        "Set or update the base URL for OpenRouter.",
        "/config url\nEnter new base URL: https://api.example.com/v1\n[bold green]Base URL updated![/]",
    ),
    (
        "/config stream on/off",
        "Enable or disable response streaming.",
        "/config stream off\n[bold green]Streaming disabled.[/]",
    ),
    (
        "/config",
        "View all current configurations, including credits and token limits.",
        "/config\n(Displays table with token limits & credits)",
    ),
    (
        "/credits",
        "Display credits left on your OpenRouter account.",
        "/credits\n[bold green]Credits left: $4.23[/bold green]",
    ),
    (
        "/clear",
        "Clear the terminal screen for a clean interface.",
        "/clear\n[bold cyan]Screen cleared. Ready for your next input![/]",
    ),
    (
        "/help",
        "Show this help menu with examples.",
        "/help\n(Displays this table)",
    ),
    (
        "Chatting with files",
        "Attach images or plain text files to messages using '@path'.",
        "Explain this @/Users/me/image.jpg\n(Attaches image.jpg if supported by model)",
    ),
    (
        "Exiting",
        "Quit the chat app with either 'exit', 'quit' or 'bye'",
        "exit\n[bold yellow]Goodbye![/]",
    ),
)


def _print_help():
    """Render the /help table of commands with descriptions and examples."""
    help_table = Table("Command", "Description", "Example", show_header=True, header_style="bold cyan")
    for row in _HELP_ROWS:
        help_table.add_row(*row)
    console.print(Panel(
        help_table,
        title=f"[bold cyan]oAI (Version {version}) Chat Help - Command Examples[/]",
        title_align="center",
        subtitle="oAI can be found at https://iurl.no/oai",
        subtitle_align="center",
    ))


@app.command()
def chat():
    """Start the oAI chat app with OpenRouter models."""
    # Runs the interactive loop: slash commands are handled first, anything
    # else is sent to the currently selected model as a single user message.
    global API_KEY, OPENROUTER_BASE_URL, STREAM_ENABLED, MAX_TOKEN  # Declare globals for updates
    session_max_token = 0  # 0 means "no session limit set"

    if not API_KEY:
        console.print("[bold red]API key not found in database. Please set it with '/config api'.[/]")
        # Prompt for API key on startup if missing
        try:
            new_api_key = typer.prompt("Enter your API key").strip()
            if new_api_key:
                set_config('api_key', new_api_key)
                API_KEY = new_api_key
        except Exception:
            # Aborted prompt or failed save: treated the same as "no key given".
            new_api_key = ""
        if not new_api_key:
            console.print("[bold red]No API key provided. Exiting.[/]")
            raise typer.Exit()
        console.print("[bold green]API key saved. Re-run the app or continue with '/model'.[/]")

    if not text_models:
        console.print("[bold red]No suitable models available or error fetching models (check API key and base URL).[/]")
        raise typer.Exit()

    # Initialize selected_model with default if available, else None (session-specific changes via /model)
    selected_model = selected_model_default  # Set from DB load, or None
    client = OpenRouter(api_key=API_KEY)

    if selected_model:
        console.print(f"[bold blue]Welcome to oAI! Type your message, use '/help' for examples, or 'exit'/'quit'/'bye' to end.[/] \n[bold red]Active model : {selected_model['name']}[/]")
    else:
        console.print("[bold blue]Welcome to oAI![/] [italic blue]No active model. Use: '/model [search]' to select model.[/] [bold blue]Use '/help' for examples, or 'exit'/'quit'/'bye' to end.[/]")
    if not selected_model:
        console.print("[bold yellow]No model selected. Use '/model' to choose one.[/]")

    # Initialize PromptSession for input history and arrow keys
    session = PromptSession(history=None)  # 'None' uses InMemoryHistory by default; can be customized later for disk persistence

    while True:
        try:
            user_input = session.prompt("You> ").strip()
            if user_input.lower() in ["exit", "quit", "bye"]:
                console.print("[bold yellow]Goodbye![/]")
                return

            # Handle /model command (only changes current session model)
            if user_input.startswith("/model"):
                picked = _choose_model(user_input[7:].strip(), "Available Models", "/model")
                if picked:
                    selected_model = picked
                    console.print(f"[bold cyan]Selected: {selected_model['name']} ({selected_model['id']})[/]")
                continue

            # Handle /maxtoken command (session limit)
            if user_input.startswith("/maxtoken"):
                args = user_input[10:].strip()
                if not args:
                    console.print(f"[bold blue]Current session max tokens: {session_max_token}[/]")
                    continue
                try:
                    new_limit = int(args)
                except ValueError:
                    console.print("[bold red]Invalid token limit. Provide a positive integer.[/]")
                    continue
                if new_limit < 1:
                    console.print("[bold red]Session token limit must be at least 1.[/]")
                    continue
                if new_limit > MAX_TOKEN:
                    console.print(f"[bold yellow]Warning: Cannot exceed stored max token limit ({MAX_TOKEN}). Capping to {MAX_TOKEN}.[/]")
                    new_limit = MAX_TOKEN
                session_max_token = new_limit
                console.print(f"[bold green]Session max tokens set to: {session_max_token}[/]")
                continue

            # Handle /config command
            if user_input.startswith("/config"):
                args = user_input[8:].strip().lower()  # Get args after "/config"
                if args == "api":
                    try:
                        new_api_key = typer.prompt("Enter new API key")
                        if new_api_key.strip():
                            set_config('api_key', new_api_key.strip())
                            API_KEY = new_api_key.strip()
                            client = OpenRouter(api_key=API_KEY)  # Reinitialize client
                            console.print("[bold green]API key updated and client reinitialized![/]")
                        else:
                            console.print("[bold yellow]No change made.[/]")
                    except Exception as e:
                        console.print(f"[bold red]Error updating API key: {e}[/]")
                elif args == "url":
                    try:
                        new_url = typer.prompt("Enter new base URL (e.g., https://openrouter.ai/api/v1)")
                        if new_url.strip():
                            set_config('base_url', new_url.strip())
                            OPENROUTER_BASE_URL = new_url.strip()
                            console.print("[bold green]Base URL updated![/]")
                        else:
                            console.print("[bold yellow]No change made.[/]")
                    except Exception as e:
                        console.print(f"[bold red]Error updating base URL: {e}[/]")
                elif args.startswith("stream"):
                    sub_args = args[7:].strip()  # After "stream"
                    if sub_args in ["on", "off"]:
                        set_config('stream_enabled', sub_args)
                        STREAM_ENABLED = sub_args
                        console.print(f"[bold green]Streaming {'enabled' if sub_args == 'on' else 'disabled'}.[/]")
                    else:
                        console.print("[bold yellow]Usage: /config stream on|off[/]")
                # Handle /config maxtoken (global already declared at top)
                elif args.startswith("maxtoken"):
                    sub_args = args[9:].strip()
                    if not sub_args:
                        console.print(f"[bold blue]Stored max token limit: {MAX_TOKEN}[/]")
                        continue
                    try:
                        new_max = int(sub_args)
                    except ValueError:
                        console.print("[bold red]Invalid token limit. Provide a positive integer.[/]")
                        continue
                    if new_max < 1:
                        console.print("[bold red]Max token limit must be at least 1.[/]")
                        continue
                    if new_max > 1000000:  # Reasonable upper cap to prevent typos
                        console.print("[bold yellow]Warning: Max token limit capped at 1M for safety.[/]")
                        new_max = 1000000
                    set_config('max_token', str(new_max))
                    MAX_TOKEN = new_max
                    console.print(f"[bold green]Stored max token limit updated to: {MAX_TOKEN}[/]")
                    # Adjust session if needed
                    if session_max_token > MAX_TOKEN:
                        session_max_token = MAX_TOKEN
                        console.print(f"[bold yellow]Session token limit adjusted to match: {session_max_token}[/]")
                # Handle /config model: save a default (DOES NOT change current selected_model)
                elif args.startswith("model"):
                    default_model = _choose_model(args[6:].strip(), "Available Models for Default", "/config model")
                    if default_model:
                        set_config('default_model', default_model["id"])  # Save default model ID to DB
                        current_name = selected_model['name'] if selected_model else "None"
                        console.print(f"[bold cyan]Default model set to: {default_model['name']} ({default_model['id']}). Current model unchanged: {current_name}[/]")
                else:
                    # Display all configs
                    default_model_id = get_config('default_model')  # Re-read: may have changed this session
                    table = Table("Setting", "Value", show_header=True, header_style="bold magenta")
                    table.add_row("API Key", API_KEY or "[Not set]")
                    table.add_row("Base URL", OPENROUTER_BASE_URL or "[Not set]")
                    table.add_row("Streaming", "Enabled" if STREAM_ENABLED == "on" else "Disabled")
                    table.add_row("Default Model", default_model_id or "[Not set]")
                    table.add_row("Current Model", "[Not set]" if selected_model is None else str(selected_model["name"]))
                    table.add_row("Max Token", str(MAX_TOKEN))
                    table.add_row("Session Token", "[Not set]" if session_max_token == 0 else str(session_max_token))
                    # Fetch and display credit info
                    credits = get_credits(API_KEY, OPENROUTER_BASE_URL)
                    if credits:
                        table.add_row("Total Credits", credits['total_credits'])
                        table.add_row("Used Credits", credits['used_credits'])
                        table.add_row("Credits Left", credits['credits_left'])
                    else:
                        table.add_row("Total Credits", "[Unavailable - Check API key]")
                        table.add_row("Used Credits", "[Unavailable - Check API key]")
                        table.add_row("Credits Left", "[Unavailable - Check API key]")
                    console.print(Panel(table, title="[bold green]Current Configurations[/]", title_align="left"))
                continue

            # **Handle /credits command to display credits left**
            if user_input.lower() == "/credits":
                credits = get_credits(API_KEY, OPENROUTER_BASE_URL)
                if credits:
                    console.print(f"[bold green]Credits left: {credits['credits_left']}[/]")
                else:
                    console.print("[bold red]Unable to fetch credits. Check your API key or network.[/]")
                continue

            # Handle /clear command to clear the screen
            if user_input.lower() == "/clear":
                os.system("clear" if os.name == "posix" else "cls")  # Cross-platform clear
                token_value = session_max_token if session_max_token != 0 else " Not set"
                console.print(f"[bold cyan]Token limits: Max= {MAX_TOKEN}, Session={token_value}[/]")
                # Report the model this session is actually using, not the
                # stored default (the two diverge after '/model').
                active_name = selected_model["name"] if selected_model else "None"
                console.print(f"[bold blue]Active model[/] [bold red]{active_name}[/]")
                continue

            # Handle /help command.
            if user_input.lower() == "/help":
                _print_help()
                continue

            if not selected_model:
                console.print("[bold yellow]Select a model first with '/model'.[/]")
                continue

            # Process file attachments in the prompt
            text_part, content_blocks = _collect_attachments(user_input, selected_model)

            # Build message content
            if not text_part and not content_blocks:
                console.print("[bold red]Prompt cannot be empty.[/]")
                continue
            message_content = []
            if text_part:
                message_content.append({"type": "text", "text": text_part})
            message_content.extend(content_blocks)

            # Prepare API call params, including max_tokens if session limit is set
            is_streaming = STREAM_ENABLED == "on"
            api_params = {
                "model": selected_model["id"],
                "messages": [{"role": "user", "content": message_content}],
                "stream": is_streaming
            }
            if session_max_token > 0:
                api_params["max_tokens"] = session_max_token

            # Send to model with streaming (enable/disable based on config)
            if is_streaming:
                console.print("[bold green]Streaming response... (Press Ctrl+C to cancel)[/]")
            else:
                console.print("[bold green]Thinking...[/]", end="\r")

            try:
                response = client.chat.send(**api_params)
            except Exception as e:
                console.print(f"[bold red]Error sending request: {e}[/]")
                continue

            if is_streaming:
                # Handle streaming response
                pieces = []
                try:
                    for chunk in response:
                        # Check for mid-stream errors. Attribute lookup is used
                        # because chunks are accessed as objects below; a
                        # membership test on such an object is not a field check.
                        stream_error = getattr(chunk, "error", None)
                        if stream_error:
                            console.print(f"\n[bold red]Stream error: {stream_error.message}[/]")
                            break
                        # Print streaming content
                        if chunk.choices and chunk.choices[0].delta.content:
                            content_chunk = chunk.choices[0].delta.content
                            print(content_chunk, end="", flush=True)
                            pieces.append(content_chunk)
                    full_response = "".join(pieces)
                    if full_response:
                        console.print()  # New line after streaming
                except KeyboardInterrupt:
                    console.print("\n[bold yellow]Streaming cancelled![/]")
                    continue
            else:
                # Non-streaming fallback
                full_response = response.choices[0].message.content if response.choices else ""
                console.print(f"\r{' ' * 20}\r", end="")  # Clear "Thinking..." overlay

            if full_response:
                # Pretty print response panel (only if we have content). Wrap in
                # Text so brackets in the model's reply are shown literally
                # instead of being parsed as Rich markup.
                console.print(Panel(Text(full_response), title="[bold green]AI Response[/]", title_align="left"))

                # Clipboard prompt
                try:
                    copy_choice = input("Type 'c' to copy response to clipboard, or press Enter to continue: ").strip().lower()
                    if copy_choice == "c":
                        pyperclip.copy(full_response)
                        console.print("[bold green]Response copied to clipboard![/]")
                except (EOFError, KeyboardInterrupt):
                    pass  # Handle non-interactive environments gracefully
            else:
                console.print("[bold red]No response received.[/]")

        except KeyboardInterrupt:
            # Handle Ctrl+C during prompt input (continue loop instead of crashing)
            console.print("\n[bold yellow]Input interrupted. Continuing...[/]")
            continue
        except EOFError:
            # Handle Ctrl+D (exit loop gracefully)
            console.print("\n[bold yellow]Goodbye![/]")
            return
        except Exception as e:
            console.print(f"[bold red]Error: {e}[/]")
            console.print("[bold yellow]Try again or use '/model' to select a different model.[/]")


if __name__ == "__main__":
    clear_screen()
    app()