Files
oai/oai.py
2025-12-04 12:12:33 +01:00

345 lines
17 KiB
Python

#!/usr/bin/python3 -W ignore::DeprecationWarning
import os
import requests
from pathlib import Path
from typing import Optional, List
import typer
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
from rich.text import Text
from openrouter import OpenRouter
import pyperclip
import mimetypes
import base64
import re
import sqlite3 # Added for SQLite DB integration
app = typer.Typer()
console = Console()
# DB configuration - define DB file and table setup
homefilepath = Path.home()
filepath = homefilepath.joinpath('.config/oai')
database = filepath.joinpath('oai_config.db')
DB_FILE = database
def create_table_if_not_exists():
"""Ensure the config table exists and directories are created."""
os.makedirs(os.path.dirname(DB_FILE), exist_ok=True) # Create directories if needed
with sqlite3.connect(DB_FILE) as conn:
conn.execute('''CREATE TABLE IF NOT EXISTS config (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
)''')
conn.commit()
def get_config(key: str) -> Optional[str]:
"""Fetch a config value from the DB."""
create_table_if_not_exists() # Ensure table exists
with sqlite3.connect(DB_FILE) as conn:
cursor = conn.execute('SELECT value FROM config WHERE key = ?', (key,))
result = cursor.fetchone()
return result[0] if result else None
def set_config(key: str, value: str):
"""Set or update a config value in the DB."""
create_table_if_not_exists() # Ensure table exists
with sqlite3.connect(DB_FILE) as conn:
conn.execute('INSERT OR REPLACE INTO config (key, value) VALUES (?, ?)', (key, value))
conn.commit()
# Load configurations from DB on startup (or set defaults if not present)
API_KEY = get_config('api_key')
OPENROUTER_BASE_URL = get_config('base_url') or "https://openrouter.ai/api/v1" # Default if not set
STREAM_ENABLED = get_config('stream_enabled') or "on" # Default to streaming on
# Fetch models once at module level (with loaded BASE_URL)
models_data = []
text_models = [] # Filtered models: allow "image" but exclude "video"
try:
headers = {"Authorization": f"Bearer {API_KEY}"} if API_KEY else {}
response = requests.get(f"{OPENROUTER_BASE_URL}/models", headers=headers)
response.raise_for_status()
models_data = response.json()["data"]
# Filter: Exclude "video" models, but allow "image" for attachments
text_models = [m for m in models_data if "modalities" not in m or "video" not in (m.get("modalities") or [])]
except Exception as e:
models_data = []
text_models = []
@app.command()
def chat():
"""Start the OAI chat app with OpenRouter models."""
global API_KEY, OPENROUTER_BASE_URL, STREAM_ENABLED # Allow updates via /config commands
if not API_KEY:
console.print("[bold red]API key not found in database. Please set it with '/config api'.[/]")
# Prompt for API key on startup if missing
try:
new_api_key = typer.prompt("Enter your API key")
if new_api_key.strip():
set_config('api_key', new_api_key.strip())
API_KEY = new_api_key.strip()
console.print("[bold green]API key saved. Re-run the app or continue with '/model'.[/]")
else:
raise typer.Exit()
except Exception:
console.print("[bold red]No API key provided. Exiting.[/]")
raise typer.Exit()
if not text_models:
console.print("[bold red]No suitable models available or error fetching models (check API key and base URL).[/]")
raise typer.Exit()
selected_model = None
client = OpenRouter(api_key=API_KEY)
console.print("[bold blue]Welcome to OAI! Type your message, '/model [search]' to select/change model, '/config api/url/stream' to configure, '/help' for examples, or 'exit'/'quit' to end.[/]")
if not selected_model:
console.print("[bold yellow]No model selected. Use '/model' to choose one.[/]")
while True:
try:
user_input = typer.prompt("You").strip()
if user_input.lower() in ["exit", "quit", "bye"]:
console.print("[bold yellow]Goodbye![/]")
return
# Handle /model command (unchanged)
if user_input.startswith("/model"):
args = user_input[7:].strip() # Get everything after "/model" as search term
search_term = args if args else ""
filtered_models = text_models
if search_term:
# Substring filter (case-insensitive) on name or id
filtered_models = [m for m in text_models if search_term.lower() in m["name"].lower() or search_term.lower() in m["id"].lower()]
if not filtered_models:
console.print(f"[bold red]No models match '{search_term}'. Try '/model' without search.[/]")
continue
# Display filtered models
table = Table("No.", "Name", "ID", show_header=True, header_style="bold magenta")
for i, model in enumerate(filtered_models, 1):
table.add_row(str(i), model["name"], model["id"])
console.print(Panel(table, title=f"[bold green]Available Models ({'All' if not search_term else f'Search: {search_term}'})[/]", title_align="left"))
# Prompt selection
while True:
try:
choice = int(typer.prompt("Enter model number (or 0 to cancel)"))
if choice == 0:
break
if 1 <= choice <= len(filtered_models):
selected_model = filtered_models[choice - 1]
console.print(f"[bold cyan]Selected: {selected_model['name']} ({selected_model['id']})[/]")
break
console.print("[bold red]Invalid choice. Try again.[/]")
except ValueError:
console.print("[bold red]Invalid input. Enter a number.[/]")
continue
# Handle /config command (updated to include stream toggle)
if user_input.startswith("/config"):
args = user_input[8:].strip().lower() # Get args after "/config"
if args == "api":
try:
new_api_key = typer.prompt("Enter new API key")
if new_api_key.strip():
set_config('api_key', new_api_key.strip())
API_KEY = new_api_key.strip()
client = OpenRouter(api_key=API_KEY) # Reinitialize client
console.print("[bold green]API key updated and client reinitialized![/]")
else:
console.print("[bold yellow]No change made.[/]")
except Exception as e:
console.print(f"[bold red]Error updating API key: {e}[/]")
elif args == "url":
try:
new_url = typer.prompt("Enter new base URL (e.g., https://openrouter.ai/api/v1)")
if new_url.strip():
set_config('base_url', new_url.strip())
OPENROUTER_BASE_URL = new_url.strip()
console.print("[bold green]Base URL updated![/]")
else:
console.print("[bold yellow]No change made.[/]")
except Exception as e:
console.print(f"[bold red]Error updating base URL: {e}[/]")
elif args.startswith("stream"):
sub_args = args[7:].strip() # After "stream"
if sub_args in ["on", "off"]:
set_config('stream_enabled', sub_args)
STREAM_ENABLED = sub_args
console.print(f"[bold green]Streaming {'enabled' if sub_args == 'on' else 'disabled'}.[/]")
else:
console.print("[bold yellow]Usage: /config stream on|off[/]")
else:
# /config with no args: Display current configs
table = Table("Setting", "Value", show_header=True, header_style="bold magenta")
table.add_row("API Key", API_KEY or "[Not set]")
table.add_row("Base URL", OPENROUTER_BASE_URL or "[Not set]")
table.add_row("Streaming", "Enabled" if STREAM_ENABLED == "on" else "Disabled")
table.add_row("Database", str(database) or "[Not set]")
console.print(Panel(table, title="[bold green]Current Configurations[/]", title_align="left"))
continue
# Handle /help command (new: display examples for each command)
if user_input.lower() == "/help":
help_table = Table("Command", "Description", "Example", show_header=True, header_style="bold cyan")
help_table.add_row(
"/model [search]",
"Select or change the model for chatting. Supports searching by name or ID.",
"/model gpt\nYou: 1\n(Selects first matching model)"
)
help_table.add_row(
"/config api",
"Set or update the OpenRouter API key.",
"/config api\nEnter new API key: sk-...\n[bold green]API key updated and client reinitialized![/bold green]"
)
help_table.add_row(
"/config url",
"Set or update the base URL for OpenRouter.",
"/config url\nEnter new base URL: https://api.example.com/v1\n[bold green]Base URL updated![/bold green]"
)
help_table.add_row(
"/config stream on|off",
"Enable or disable response streaming.",
"/config stream off\n[bold green]Streaming disabled.[/bold green]"
)
help_table.add_row(
"/config",
"View all current configurations.",
"/config\n(Displays table of API Key, Base URL, etc.)"
)
help_table.add_row(
"/help",
"Show this help menu with examples.",
"/help\n(Displays this table)"
)
help_table.add_row(
"Chatting with files",
"Attach images or plain text files to messages using '@path'.",
"Explain this @/Users/me/image.jpg\n(Attaches image.jpg if supported by the model)"
)
help_table.add_row(
"Exiting",
"Quit the chat app with either 'exit', 'quit' or 'bye'",
"exit\n[bold yellow]Goodbye![/bold yellow]"
)
console.print(Panel(help_table, title="[bold cyan]OAI Chat Help - Command Examples[/]", title_align="center"))
continue
if not selected_model:
console.print("[bold yellow]Select a model first with '/model'.[/]")
continue
# Process file attachments in the prompt (unchanged)
content_blocks = []
text_part = user_input
file_attachments = []
# Regex to find @path (e.g., @/Users/user/file.jpg or @c:\folder\file.txt)
file_pattern = r'@([^\s]+)' # @ followed by non-spaces
for match in re.finditer(file_pattern, user_input):
file_path = match.group(1)
expanded_path = os.path.expanduser(os.path.abspath(file_path))
if not os.path.exists(expanded_path) or os.path.isdir(expanded_path):
console.print(f"[bold red]File not found or is a directory: {expanded_path}[/]")
continue
file_size = os.path.getsize(expanded_path)
if file_size > 10 * 1024 * 1024: # 10MB limit
console.print(f"[bold red]File too large (>10MB): {expanded_path}[/]")
continue
mime_type, _ = mimetypes.guess_type(expanded_path)
try:
with open(expanded_path, 'rb') as f:
file_data = f.read()
if mime_type and mime_type.startswith('image/'):
if "image" not in selected_model.get("modalities", []):
console.print("[bold red]Selected model does not support image attachments.[/]")
continue
b64_data = base64.b64encode(file_data).decode('utf-8')
content_blocks.append({"type": "image_url", "image_url": {"url": f"data:{mime_type};base64,{b64_data}"}})
elif mime_type == 'text/plain':
text_content = file_data.decode('utf-8')
content_blocks.append({"type": "text", "text": text_content})
else:
console.print(f"[bold red]Unsupported file type: {mime_type} for {expanded_path}. Only images and plain text supported.[/]")
continue
file_attachments.append(file_path)
except Exception as e:
console.print(f"[bold red]Error reading file {expanded_path}: {e}[/]")
continue
# Remove @path from text_part
text_part = re.sub(file_pattern, '', text_part).strip()
# Build message content (unchanged)
if text_part or content_blocks:
message_content = []
if text_part:
message_content.append({"type": "text", "text": text_part})
message_content.extend(content_blocks)
else:
console.print("[bold red]Prompt cannot be empty.[/]")
continue
# Send to model with streaming (enable/disable based on config)
is_streaming = STREAM_ENABLED == "on"
if is_streaming:
console.print("[bold green]Streaming response... (Press Ctrl+C to cancel)[/]")
else:
console.print("[bold green]Thinking...[/]", end="\r")
try:
response = client.chat.send(
model=selected_model["id"],
messages=[{"role": "user", "content": message_content}],
stream=is_streaming # Enable/disable streaming
)
except Exception as e:
console.print(f"[bold red]Error sending request: {e}[/]")
continue
if is_streaming:
# Handle streaming response
full_response = ""
try:
for chunk in response:
# Check for mid-stream errors (per OpenRouter docs)
if 'error' in chunk and chunk.error:
console.print(f"\n[bold red]Stream error: {chunk.error.message}[/]")
break
# Print streaming content
if chunk.choices and chunk.choices[0].delta.content:
content_chunk = chunk.choices[0].delta.content
print(content_chunk, end="", flush=True)
full_response += content_chunk
if full_response:
console.print() # New line after streaming
except KeyboardInterrupt:
console.print("\n[bold yellow]Streaming cancelled![/]")
continue
else:
# Non-streaming fallback
full_response = response.choices[0].message.content if response.choices else ""
console.print(f"\r{' ' * 20}\r", end="") # Clear "Thinking..." overlay
if full_response:
# Pretty print response panel (only if we have content)
console.print(Panel(full_response, title="[bold green]AI Response[/]", title_align="left"))
# Clipboard prompt (unchanged)
try:
copy_choice = input("Type 'c' to copy response to clipboard, or press Enter to continue: ").strip().lower()
if copy_choice == "c":
pyperclip.copy(full_response)
console.print("[bold green]Response copied to clipboard![/]")
except (EOFError, KeyboardInterrupt):
pass # Handle non-interactive environments gracefully
else:
console.print("[bold red]No response received.[/]")
except Exception as e:
console.print(f"[bold red]Error: {e}[/]")
console.print("[bold yellow]Try again or use '/model' to select a different model.[/]")
if __name__ == "__main__":
app()