381 lines
19 KiB
Python
381 lines
19 KiB
Python
#!/usr/bin/python3 -W ignore::DeprecationWarning
|
|
import sys
|
|
import os
|
|
import requests
|
|
from pathlib import Path
|
|
from typing import Optional, List
|
|
import typer
|
|
from rich.console import Console
|
|
from rich.panel import Panel
|
|
from rich.table import Table
|
|
from rich.text import Text
|
|
from openrouter import OpenRouter
|
|
import pyperclip
|
|
import mimetypes
|
|
import base64
|
|
import re
|
|
import sqlite3 # Added for SQLite DB integration
|
|
from prompt_toolkit import PromptSession
|
|
|
|
app = typer.Typer()
|
|
console = Console()
|
|
|
|
# DB configuration - define DB file and table setup
|
|
homefilepath = Path.home()
|
|
filepath = homefilepath.joinpath('.config/oai')
|
|
database = filepath.joinpath('oai_config.db')
|
|
DB_FILE = database
|
|
|
|
def create_table_if_not_exists():
    """Ensure the config table exists and directories are created.

    Creates the parent directory of ``DB_FILE`` when it is missing, then
    issues ``CREATE TABLE IF NOT EXISTS`` for the key/value ``config`` table.
    """
    # Path(...).parent is "." for a bare filename, whereas os.path.dirname()
    # would return "" and make the directory creation fail.
    Path(DB_FILE).parent.mkdir(parents=True, exist_ok=True)

    # sqlite3's connection context manager only commits or rolls back the
    # transaction; it does not close the connection, so close it explicitly.
    conn = sqlite3.connect(DB_FILE)
    try:
        with conn:  # commit on success, roll back on error
            conn.execute('''CREATE TABLE IF NOT EXISTS config (
                key TEXT PRIMARY KEY,
                value TEXT NOT NULL
            )''')
    finally:
        conn.close()
|
|
|
|
def get_config(key: str) -> Optional[str]:
    """Fetch a config value from the DB.

    Args:
        key: Name of the setting to look up in the ``config`` table.

    Returns:
        The stored value, or ``None`` when no row exists for ``key``.
    """
    create_table_if_not_exists()  # Ensure table exists

    # The sqlite3 connection context manager does not close the connection,
    # so release the handle explicitly once the row has been read.
    conn = sqlite3.connect(DB_FILE)
    try:
        row = conn.execute('SELECT value FROM config WHERE key = ?', (key,)).fetchone()
    finally:
        conn.close()
    return row[0] if row else None
|
|
|
|
def set_config(key: str, value: str):
    """Set or update a config value in the DB.

    Args:
        key: Name of the setting; it is the table's primary key, so an
            existing row with the same key is replaced.
        value: Text to store for ``key``.
    """
    create_table_if_not_exists()  # Ensure table exists

    # The sqlite3 connection context manager only handles the transaction;
    # the connection itself has to be closed explicitly.
    conn = sqlite3.connect(DB_FILE)
    try:
        with conn:  # commit on success, roll back on error
            conn.execute('INSERT OR REPLACE INTO config (key, value) VALUES (?, ?)', (key, value))
    finally:
        conn.close()
|
|
|
|
# Load configurations from DB on startup (or set defaults if not present)
API_KEY = get_config('api_key')
OPENROUTER_BASE_URL = get_config('base_url') or "https://openrouter.ai/api/v1"  # Default if not set
STREAM_ENABLED = get_config('stream_enabled') or "on"  # Default to streaming on

# Fetch models once at module level (with loaded BASE_URL)
models_data = []
text_models = []  # Filtered models: allow "image" but exclude "video"
try:
    headers = {"Authorization": f"Bearer {API_KEY}"} if API_KEY else {}
    # A timeout keeps an unreachable host from hanging the import forever, and
    # stripping a trailing slash avoids requesting ".../v1//models" when the
    # configured base URL ends with "/".
    response = requests.get(
        f"{OPENROUTER_BASE_URL.rstrip('/')}/models",
        headers=headers,
        timeout=15,
    )
    response.raise_for_status()
    models_data = response.json()["data"]
    # Filter: Exclude "video" models, but allow "image" for attachments.
    # A missing or null "modalities" entry counts as "no video".
    text_models = [m for m in models_data if "video" not in (m.get("modalities") or [])]
except Exception:
    # Deliberately best-effort: chat() reports an empty model list to the user
    # instead of the import failing here.
    models_data = []
    text_models = []
|
|
|
|
# Function to clear the screen using ANSI escape sequences (cross-platform)
|
|
def clear_screen():
    """Wipe the terminal with ANSI escape codes, falling back to a run of newlines."""
    # ESC[H moves the cursor to the top-left corner; ESC[J erases from the
    # cursor to the end of the display.
    home_and_erase = "\033[H\033[J"
    try:
        print(home_and_erase, end="", flush=True)
    except Exception:
        # Writing the escape sequence failed: push old output away instead.
        print("\n" * 100)
|
|
|
|
# Matches '@' followed by a run of non-whitespace characters (an attachment path).
_ATTACHMENT_PATTERN = re.compile(r'@([^\s]+)')
# Largest file accepted as an attachment (10MB).
_MAX_ATTACHMENT_BYTES = 10 * 1024 * 1024


def _build_help_table():
    """Build the table of commands, descriptions and examples shown by '/help'."""
    help_table = Table("Command", "Description", "Example", show_header=True, header_style="bold cyan")
    rows = [
        (
            # The backslash keeps Rich from consuming "[search]" as a markup tag.
            "/model \\[search]",
            "Select or change the model for chatting. Supports searching by name or ID.",
            "/model gpt\nYou: 1\n(Selects first matching model)",
        ),
        (
            "/config api",
            "Set or update the OpenRouter API key.",
            "/config api\nEnter new API key: sk-...\n[bold green]API key updated and client reinitialized![/bold green]",
        ),
        (
            "/config url",
            "Set or update the base URL for OpenRouter.",
            "/config url\nEnter new base URL: https://api.example.com/v1\n[bold green]Base URL updated![/bold green]",
        ),
        (
            "/config stream on|off",
            "Enable or disable response streaming.",
            "/config stream off\n[bold green]Streaming disabled.[/bold green]",
        ),
        (
            "/config",
            "View all current configurations.",
            "/config\n(Displays table of API Key, Base URL, etc.)",
        ),
        (
            "/clear",
            "Clear the terminal screen for a clean interface.",
            "/clear\n[bold cyan]Screen cleared. Ready for your next input![/bold cyan]",
        ),
        (
            "/help",
            "Show this help menu with examples.",
            "/help\n(Displays this table)",
        ),
        (
            "Chatting with files",
            "Attach images or plain text files to messages using '@path'.",
            "Explain this @/Users/me/image.jpg\n(Attaches image.jpg if supported by model)",
        ),
        (
            "Exiting",
            "Quit the chat app with either 'exit', 'quit' or 'bye'",
            "exit\n[bold yellow]Goodbye![/bold yellow]",
        ),
    ]
    for row in rows:
        help_table.add_row(*row)
    return help_table


def _load_attachments(user_input, selected_model):
    """Turn '@path' references in a prompt into message content blocks.

    Args:
        user_input: The raw prompt typed by the user.
        selected_model: The model dict chosen via '/model'; its "modalities"
            entry decides whether image attachments are accepted.

    Returns:
        A ``(content_blocks, text_part)`` tuple: the image/text blocks built
        from the files that could be attached, and the prompt text with only
        those attached references removed. References that could not be
        attached (missing file, unsupported type, e-mail addresses,
        '@decorator' mentions, ...) stay in the text so the model still sees
        what the user wrote.
    """
    content_blocks = []
    attached_spans = []
    # NOTE(review): this reads a top-level "modalities" key, matching the
    # filter applied when the model list is fetched - confirm against the
    # /models response schema that this is where modalities are reported.
    supported_modalities = selected_model.get("modalities") or []

    for match in _ATTACHMENT_PATTERN.finditer(user_input):
        file_path = match.group(1)
        # '~' has to be expanded before the path is made absolute; the other
        # order turns "~/x" into "<cwd>/~/x", which expanduser() leaves alone.
        expanded_path = os.path.abspath(os.path.expanduser(file_path))
        if not os.path.exists(expanded_path) or os.path.isdir(expanded_path):
            console.print(f"[bold red]File not found or is a directory: {expanded_path}[/]")
            continue
        if os.path.getsize(expanded_path) > _MAX_ATTACHMENT_BYTES:
            console.print(f"[bold red]File too large (>10MB): {expanded_path}[/]")
            continue

        mime_type, _ = mimetypes.guess_type(expanded_path)
        is_image = bool(mime_type) and mime_type.startswith('image/')
        # Reject unsupported files before reading them into memory.
        if not is_image and mime_type != 'text/plain':
            console.print(f"[bold red]Unsupported file type: {mime_type} for {expanded_path}. Only images and plain text supported.[/]")
            continue
        if is_image and "image" not in supported_modalities:
            console.print("[bold red]Selected model does not support image attachments.[/]")
            continue

        try:
            with open(expanded_path, 'rb') as f:
                file_data = f.read()
            if is_image:
                b64_data = base64.b64encode(file_data).decode('utf-8')
                content_blocks.append({"type": "image_url", "image_url": {"url": f"data:{mime_type};base64,{b64_data}"}})
            else:
                content_blocks.append({"type": "text", "text": file_data.decode('utf-8')})
        except Exception as e:
            console.print(f"[bold red]Error reading file {expanded_path}: {e}[/]")
            continue
        attached_spans.append(match.span())

    # Cut only the successfully attached '@path' references out of the prompt.
    kept_pieces = []
    cursor = 0
    for start, end in attached_spans:
        kept_pieces.append(user_input[cursor:start])
        cursor = end
    kept_pieces.append(user_input[cursor:])
    return content_blocks, "".join(kept_pieces).strip()


@app.command()
def chat():
    """Start the oAI chat app with OpenRouter models.

    Runs an interactive prompt loop. Input starting with '/model' or
    '/config', and the exact inputs '/clear' and '/help', are handled
    locally; 'exit', 'quit' or 'bye' (or Ctrl+D) ends the session; anything
    else is sent to the selected model, optionally with '@path' file
    attachments, and the reply is shown in a panel with an offer to copy it
    to the clipboard.

    Raises:
        typer.Exit: If no API key is available or no models could be loaded.
    """
    global API_KEY, OPENROUTER_BASE_URL, STREAM_ENABLED  # Allow updates via /config commands

    if not API_KEY:
        console.print("[bold red]API key not found in database. Please set it with '/config api'.[/]")
        # Prompt for API key on startup if missing
        try:
            new_api_key = typer.prompt("Enter your API key")
            if new_api_key.strip():
                set_config('api_key', new_api_key.strip())
                API_KEY = new_api_key.strip()
                console.print("[bold green]API key saved. Re-run the app or continue with '/model'.[/]")
            else:
                raise typer.Exit()
        except Exception:
            # Any failure while obtaining or saving the key ends the session.
            console.print("[bold red]No API key provided. Exiting.[/]")
            raise typer.Exit()

    if not text_models:
        console.print("[bold red]No suitable models available or error fetching models (check API key and base URL).[/]")
        raise typer.Exit()

    selected_model = None
    client = OpenRouter(api_key=API_KEY)

    # "\\[search]" escapes the bracket so Rich prints it instead of treating
    # it as a markup tag.
    console.print("[bold blue]Welcome to oAI! Type your message, '/model \\[search]' to select/change model, '/config api/url/stream' to configure, '/help' for examples, or 'exit'/'quit' to end.[/]")
    # No model can be selected yet at this point, so always show the hint.
    console.print("[bold yellow]No model selected. Use '/model' to choose one.[/]")

    # Initialize PromptSession for input history and arrow keys
    session = PromptSession(history=None)  # 'None' uses InMemoryHistory by default; can be customized later for disk persistence

    while True:
        try:
            user_input = session.prompt("You> ").strip()
            if user_input.lower() in ["exit", "quit", "bye"]:
                console.print("[bold yellow]Goodbye![/]")
                return

            # Handle /model command
            if user_input.startswith("/model"):
                search_term = user_input[7:].strip()  # Everything after "/model" is the search term
                filtered_models = text_models
                if search_term:
                    # Substring filter (case-insensitive) on name or id
                    needle = search_term.lower()
                    filtered_models = [
                        m for m in text_models
                        if needle in m["name"].lower() or needle in m["id"].lower()
                    ]
                    if not filtered_models:
                        console.print(f"[bold red]No models match '{search_term}'. Try '/model' without search.[/]")
                        continue

                # Display filtered models
                table = Table("No.", "Name", "ID", show_header=True, header_style="bold magenta")
                for i, model in enumerate(filtered_models, 1):
                    table.add_row(str(i), model["name"], model["id"])
                scope = 'All' if not search_term else f'Search: {search_term}'
                console.print(Panel(table, title=f"[bold green]Available Models ({scope})[/]", title_align="left"))

                # Prompt selection until a valid number (or 0 to cancel) is given
                while True:
                    try:
                        choice = int(typer.prompt("Enter model number (or 0 to cancel)"))
                        if choice == 0:
                            break
                        if 1 <= choice <= len(filtered_models):
                            selected_model = filtered_models[choice - 1]
                            console.print(f"[bold cyan]Selected: {selected_model['name']} ({selected_model['id']})[/]")
                            break
                        console.print("[bold red]Invalid choice. Try again.[/]")
                    except ValueError:
                        console.print("[bold red]Invalid input. Enter a number.[/]")
                continue

            # Handle /config command (api key, base URL, stream toggle, or overview)
            if user_input.startswith("/config"):
                args = user_input[8:].strip().lower()  # Get args after "/config"
                if args == "api":
                    try:
                        new_api_key = typer.prompt("Enter new API key")
                        if new_api_key.strip():
                            set_config('api_key', new_api_key.strip())
                            API_KEY = new_api_key.strip()
                            client = OpenRouter(api_key=API_KEY)  # Reinitialize client
                            console.print("[bold green]API key updated and client reinitialized![/]")
                        else:
                            console.print("[bold yellow]No change made.[/]")
                    except Exception as e:
                        console.print(f"[bold red]Error updating API key: {e}[/]")
                elif args == "url":
                    try:
                        new_url = typer.prompt("Enter new base URL (e.g., https://openrouter.ai/api/v1)")
                        if new_url.strip():
                            set_config('base_url', new_url.strip())
                            OPENROUTER_BASE_URL = new_url.strip()
                            console.print("[bold green]Base URL updated![/]")
                        else:
                            console.print("[bold yellow]No change made.[/]")
                    except Exception as e:
                        console.print(f"[bold red]Error updating base URL: {e}[/]")
                elif args.startswith("stream"):
                    sub_args = args[7:].strip()  # After "stream"
                    if sub_args in ["on", "off"]:
                        set_config('stream_enabled', sub_args)
                        STREAM_ENABLED = sub_args
                        console.print(f"[bold green]Streaming {'enabled' if sub_args == 'on' else 'disabled'}.[/]")
                    else:
                        console.print("[bold yellow]Usage: /config stream on|off[/]")
                else:
                    # /config with no (or unrecognised) args: Display current configs
                    table = Table("Setting", "Value", show_header=True, header_style="bold magenta")
                    table.add_row("API Key", API_KEY or "[Not set]")
                    table.add_row("Base URL", OPENROUTER_BASE_URL or "[Not set]")
                    table.add_row("Streaming", "Enabled" if STREAM_ENABLED == "on" else "Disabled")
                    table.add_row("Database", str(database) or "[Not set]")
                    table.add_row("Current Model", "[Not set]" if selected_model is None else str(selected_model["name"]))
                    console.print(Panel(table, title="[bold green]Current Configurations[/]", title_align="left"))
                continue

            # Handle /clear command to clear the screen
            if user_input.lower() == "/clear":
                clear_screen()
                console.print("[bold cyan]Screen cleared. Ready for your next input![/]")
                continue

            # Handle /help command
            if user_input.lower() == "/help":
                console.print(Panel(_build_help_table(), title="[bold cyan]oAI Chat Help - Command Examples[/]", title_align="center"))
                continue

            if not selected_model:
                console.print("[bold yellow]Select a model first with '/model'.[/]")
                continue

            # Process file attachments in the prompt
            content_blocks, text_part = _load_attachments(user_input, selected_model)

            # Build message content: the prompt text first, then the attachments
            if not (text_part or content_blocks):
                console.print("[bold red]Prompt cannot be empty.[/]")
                continue
            message_content = []
            if text_part:
                message_content.append({"type": "text", "text": text_part})
            message_content.extend(content_blocks)

            # Send to model with streaming (enable/disable based on config)
            is_streaming = STREAM_ENABLED == "on"
            if is_streaming:
                console.print("[bold green]Streaming response... (Press Ctrl+C to cancel)[/]")
            else:
                console.print("[bold green]Thinking...[/]", end="\r")

            try:
                response = client.chat.send(
                    model=selected_model["id"],
                    messages=[{"role": "user", "content": message_content}],
                    stream=is_streaming,  # Enable/disable streaming
                )
            except Exception as e:
                console.print(f"[bold red]Error sending request: {e}[/]")
                continue

            if is_streaming:
                # Handle streaming response: echo chunks as they arrive
                response_parts = []
                try:
                    for chunk in response:
                        # Check for mid-stream errors. getattr() is used
                        # because an `in` test on a chunk object is not a
                        # reliable attribute check.
                        stream_error = getattr(chunk, "error", None)
                        if stream_error:
                            detail = getattr(stream_error, "message", stream_error)
                            console.print(f"\n[bold red]Stream error: {detail}[/]")
                            break
                        # Print streaming content
                        if chunk.choices and chunk.choices[0].delta.content:
                            content_chunk = chunk.choices[0].delta.content
                            print(content_chunk, end="", flush=True)
                            response_parts.append(content_chunk)
                    full_response = "".join(response_parts)
                    if full_response:
                        console.print()  # New line after streaming
                except KeyboardInterrupt:
                    console.print("\n[bold yellow]Streaming cancelled![/]")
                    continue
            else:
                # Non-streaming fallback
                full_response = response.choices[0].message.content if response.choices else ""
                console.print(f"\r{' ' * 20}\r", end="")  # Clear "Thinking..." overlay

            if full_response:
                # Pretty print response panel (only if we have content). Text()
                # shows the reply literally; a plain str would be parsed as
                # Rich markup, so "[i]" or "[/]" in a reply would be restyled
                # or raise a markup error.
                console.print(Panel(Text(full_response), title="[bold green]AI Response[/]", title_align="left"))

                # Clipboard prompt
                try:
                    copy_choice = input("Type 'c' to copy response to clipboard, or press Enter to continue: ").strip().lower()
                    if copy_choice == "c":
                        pyperclip.copy(full_response)
                        console.print("[bold green]Response copied to clipboard![/]")
                except (EOFError, KeyboardInterrupt):
                    pass  # Handle non-interactive environments gracefully
            else:
                console.print("[bold red]No response received.[/]")

        except KeyboardInterrupt:
            # Handle Ctrl+C during prompt input (continue loop instead of crashing)
            console.print("\n[bold yellow]Input interrupted. Continuing...[/]")
            continue
        except EOFError:
            # Handle Ctrl+D (exit loop gracefully)
            console.print("\n[bold yellow]Goodbye![/]")
            return
        except Exception as e:
            # Last-resort boundary: report the problem and keep the session alive
            console.print(f"[bold red]Error: {e}[/]")
            console.print("[bold yellow]Try again or use '/model' to select a different model.[/]")
|
|
# Script entry point: hand control to the Typer application when run directly.
if __name__ == "__main__":
    app()