Initial commit
This commit is contained in:
345
oai.py
Normal file
345
oai.py
Normal file
@@ -0,0 +1,345 @@
|
||||
#!/usr/bin/python3 -W ignore::DeprecationWarning
|
||||
import os
|
||||
import requests
|
||||
from pathlib import Path
|
||||
from typing import Optional, List
|
||||
import typer
|
||||
from rich.console import Console
|
||||
from rich.panel import Panel
|
||||
from rich.table import Table
|
||||
from rich.text import Text
|
||||
from openrouter import OpenRouter
|
||||
import pyperclip
|
||||
import mimetypes
|
||||
import base64
|
||||
import re
|
||||
import sqlite3 # Added for SQLite DB integration
|
||||
|
||||
app = typer.Typer()
|
||||
console = Console()
|
||||
|
||||
# DB configuration - define DB file and table setup
|
||||
homefilepath = Path.home()
|
||||
filepath = homefilepath.joinpath('.config/oai')
|
||||
database = filepath.joinpath('oai_config.db')
|
||||
DB_FILE = database
|
||||
|
||||
def create_table_if_not_exists():
|
||||
"""Ensure the config table exists and directories are created."""
|
||||
os.makedirs(os.path.dirname(DB_FILE), exist_ok=True) # Create directories if needed
|
||||
with sqlite3.connect(DB_FILE) as conn:
|
||||
conn.execute('''CREATE TABLE IF NOT EXISTS config (
|
||||
key TEXT PRIMARY KEY,
|
||||
value TEXT NOT NULL
|
||||
)''')
|
||||
conn.commit()
|
||||
|
||||
def get_config(key: str) -> Optional[str]:
|
||||
"""Fetch a config value from the DB."""
|
||||
create_table_if_not_exists() # Ensure table exists
|
||||
with sqlite3.connect(DB_FILE) as conn:
|
||||
cursor = conn.execute('SELECT value FROM config WHERE key = ?', (key,))
|
||||
result = cursor.fetchone()
|
||||
return result[0] if result else None
|
||||
|
||||
def set_config(key: str, value: str):
|
||||
"""Set or update a config value in the DB."""
|
||||
create_table_if_not_exists() # Ensure table exists
|
||||
with sqlite3.connect(DB_FILE) as conn:
|
||||
conn.execute('INSERT OR REPLACE INTO config (key, value) VALUES (?, ?)', (key, value))
|
||||
conn.commit()
|
||||
|
||||
# Load configurations from DB on startup (or set defaults if not present)
|
||||
API_KEY = get_config('api_key')
|
||||
OPENROUTER_BASE_URL = get_config('base_url') or "https://openrouter.ai/api/v1" # Default if not set
|
||||
STREAM_ENABLED = get_config('stream_enabled') or "on" # Default to streaming on
|
||||
|
||||
# Fetch models once at module level (with loaded BASE_URL)
|
||||
models_data = []
|
||||
text_models = [] # Filtered models: allow "image" but exclude "video"
|
||||
try:
|
||||
headers = {"Authorization": f"Bearer {API_KEY}"} if API_KEY else {}
|
||||
response = requests.get(f"{OPENROUTER_BASE_URL}/models", headers=headers)
|
||||
response.raise_for_status()
|
||||
models_data = response.json()["data"]
|
||||
# Filter: Exclude "video" models, but allow "image" for attachments
|
||||
text_models = [m for m in models_data if "modalities" not in m or "video" not in (m.get("modalities") or [])]
|
||||
except Exception as e:
|
||||
models_data = []
|
||||
text_models = []
|
||||
|
||||
@app.command()
|
||||
def chat():
|
||||
"""Start the OAI chat app with OpenRouter models."""
|
||||
global API_KEY, OPENROUTER_BASE_URL, STREAM_ENABLED # Allow updates via /config commands
|
||||
|
||||
if not API_KEY:
|
||||
console.print("[bold red]API key not found in database. Please set it with '/config api'.[/]")
|
||||
# Prompt for API key on startup if missing
|
||||
try:
|
||||
new_api_key = typer.prompt("Enter your API key")
|
||||
if new_api_key.strip():
|
||||
set_config('api_key', new_api_key.strip())
|
||||
API_KEY = new_api_key.strip()
|
||||
console.print("[bold green]API key saved. Re-run the app or continue with '/model'.[/]")
|
||||
else:
|
||||
raise typer.Exit()
|
||||
except Exception:
|
||||
console.print("[bold red]No API key provided. Exiting.[/]")
|
||||
raise typer.Exit()
|
||||
|
||||
if not text_models:
|
||||
console.print("[bold red]No suitable models available or error fetching models (check API key and base URL).[/]")
|
||||
raise typer.Exit()
|
||||
|
||||
selected_model = None
|
||||
client = OpenRouter(api_key=API_KEY)
|
||||
|
||||
console.print("[bold blue]Welcome to OAI! Type your message, '/model [search]' to select/change model, '/config api/url/stream' to configure, '/help' for examples, or 'exit'/'quit' to end.[/]")
|
||||
if not selected_model:
|
||||
console.print("[bold yellow]No model selected. Use '/model' to choose one.[/]")
|
||||
|
||||
while True:
|
||||
try:
|
||||
user_input = typer.prompt("You").strip()
|
||||
if user_input.lower() in ["exit", "quit", "bye"]:
|
||||
console.print("[bold yellow]Goodbye![/]")
|
||||
return
|
||||
|
||||
# Handle /model command (unchanged)
|
||||
if user_input.startswith("/model"):
|
||||
args = user_input[7:].strip() # Get everything after "/model" as search term
|
||||
search_term = args if args else ""
|
||||
filtered_models = text_models
|
||||
if search_term:
|
||||
# Substring filter (case-insensitive) on name or id
|
||||
filtered_models = [m for m in text_models if search_term.lower() in m["name"].lower() or search_term.lower() in m["id"].lower()]
|
||||
if not filtered_models:
|
||||
console.print(f"[bold red]No models match '{search_term}'. Try '/model' without search.[/]")
|
||||
continue
|
||||
# Display filtered models
|
||||
table = Table("No.", "Name", "ID", show_header=True, header_style="bold magenta")
|
||||
for i, model in enumerate(filtered_models, 1):
|
||||
table.add_row(str(i), model["name"], model["id"])
|
||||
console.print(Panel(table, title=f"[bold green]Available Models ({'All' if not search_term else f'Search: {search_term}'})[/]", title_align="left"))
|
||||
# Prompt selection
|
||||
while True:
|
||||
try:
|
||||
choice = int(typer.prompt("Enter model number (or 0 to cancel)"))
|
||||
if choice == 0:
|
||||
break
|
||||
if 1 <= choice <= len(filtered_models):
|
||||
selected_model = filtered_models[choice - 1]
|
||||
console.print(f"[bold cyan]Selected: {selected_model['name']} ({selected_model['id']})[/]")
|
||||
break
|
||||
console.print("[bold red]Invalid choice. Try again.[/]")
|
||||
except ValueError:
|
||||
console.print("[bold red]Invalid input. Enter a number.[/]")
|
||||
continue
|
||||
|
||||
# Handle /config command (updated to include stream toggle)
|
||||
if user_input.startswith("/config"):
|
||||
args = user_input[8:].strip().lower() # Get args after "/config"
|
||||
if args == "api":
|
||||
try:
|
||||
new_api_key = typer.prompt("Enter new API key")
|
||||
if new_api_key.strip():
|
||||
set_config('api_key', new_api_key.strip())
|
||||
API_KEY = new_api_key.strip()
|
||||
client = OpenRouter(api_key=API_KEY) # Reinitialize client
|
||||
console.print("[bold green]API key updated and client reinitialized![/]")
|
||||
else:
|
||||
console.print("[bold yellow]No change made.[/]")
|
||||
except Exception as e:
|
||||
console.print(f"[bold red]Error updating API key: {e}[/]")
|
||||
elif args == "url":
|
||||
try:
|
||||
new_url = typer.prompt("Enter new base URL (e.g., https://openrouter.ai/api/v1)")
|
||||
if new_url.strip():
|
||||
set_config('base_url', new_url.strip())
|
||||
OPENROUTER_BASE_URL = new_url.strip()
|
||||
console.print("[bold green]Base URL updated![/]")
|
||||
else:
|
||||
console.print("[bold yellow]No change made.[/]")
|
||||
except Exception as e:
|
||||
console.print(f"[bold red]Error updating base URL: {e}[/]")
|
||||
elif args.startswith("stream"):
|
||||
sub_args = args[7:].strip() # After "stream"
|
||||
if sub_args in ["on", "off"]:
|
||||
set_config('stream_enabled', sub_args)
|
||||
STREAM_ENABLED = sub_args
|
||||
console.print(f"[bold green]Streaming {'enabled' if sub_args == 'on' else 'disabled'}.[/]")
|
||||
else:
|
||||
console.print("[bold yellow]Usage: /config stream on|off[/]")
|
||||
else:
|
||||
# /config with no args: Display current configs
|
||||
table = Table("Setting", "Value", show_header=True, header_style="bold magenta")
|
||||
table.add_row("API Key", API_KEY or "[Not set]")
|
||||
table.add_row("Base URL", OPENROUTER_BASE_URL or "[Not set]")
|
||||
table.add_row("Streaming", "Enabled" if STREAM_ENABLED == "on" else "Disabled")
|
||||
table.add_row("Database", str(database) or "[Not set]")
|
||||
console.print(Panel(table, title="[bold green]Current Configurations[/]", title_align="left"))
|
||||
continue
|
||||
|
||||
# Handle /help command (new: display examples for each command)
|
||||
if user_input.lower() == "/help":
|
||||
help_table = Table("Command", "Description", "Example", show_header=True, header_style="bold cyan")
|
||||
help_table.add_row(
|
||||
"/model [search]",
|
||||
"Select or change the model for chatting. Supports searching by name or ID.",
|
||||
"/model gpt\nYou: 1\n(Selects first matching model)"
|
||||
)
|
||||
help_table.add_row(
|
||||
"/config api",
|
||||
"Set or update the OpenRouter API key.",
|
||||
"/config api\nEnter new API key: sk-...\n[bold green]API key updated and client reinitialized![/bold green]"
|
||||
)
|
||||
help_table.add_row(
|
||||
"/config url",
|
||||
"Set or update the base URL for OpenRouter.",
|
||||
"/config url\nEnter new base URL: https://api.example.com/v1\n[bold green]Base URL updated![/bold green]"
|
||||
)
|
||||
help_table.add_row(
|
||||
"/config stream on|off",
|
||||
"Enable or disable response streaming.",
|
||||
"/config stream off\n[bold green]Streaming disabled.[/bold green]"
|
||||
)
|
||||
help_table.add_row(
|
||||
"/config",
|
||||
"View all current configurations.",
|
||||
"/config\n(Displays table of API Key, Base URL, etc.)"
|
||||
)
|
||||
help_table.add_row(
|
||||
"/help",
|
||||
"Show this help menu with examples.",
|
||||
"/help\n(Displays this table)"
|
||||
)
|
||||
help_table.add_row(
|
||||
"Chatting with files",
|
||||
"Attach images or plain text files to messages using '@path'.",
|
||||
"Explain this @/Users/me/image.jpg\n(Attaches image.jpg if supported by the model)"
|
||||
)
|
||||
help_table.add_row(
|
||||
"Exiting",
|
||||
"Quit the chat app with either 'exit', 'quit' or 'bye'",
|
||||
"exit\n[bold yellow]Goodbye![/bold yellow]"
|
||||
)
|
||||
console.print(Panel(help_table, title="[bold cyan]OAI Chat Help - Command Examples[/]", title_align="center"))
|
||||
continue
|
||||
|
||||
if not selected_model:
|
||||
console.print("[bold yellow]Select a model first with '/model'.[/]")
|
||||
continue
|
||||
|
||||
# Process file attachments in the prompt (unchanged)
|
||||
content_blocks = []
|
||||
text_part = user_input
|
||||
file_attachments = []
|
||||
|
||||
# Regex to find @path (e.g., @/Users/user/file.jpg or @c:\folder\file.txt)
|
||||
file_pattern = r'@([^\s]+)' # @ followed by non-spaces
|
||||
for match in re.finditer(file_pattern, user_input):
|
||||
file_path = match.group(1)
|
||||
expanded_path = os.path.expanduser(os.path.abspath(file_path))
|
||||
if not os.path.exists(expanded_path) or os.path.isdir(expanded_path):
|
||||
console.print(f"[bold red]File not found or is a directory: {expanded_path}[/]")
|
||||
continue
|
||||
file_size = os.path.getsize(expanded_path)
|
||||
if file_size > 10 * 1024 * 1024: # 10MB limit
|
||||
console.print(f"[bold red]File too large (>10MB): {expanded_path}[/]")
|
||||
continue
|
||||
mime_type, _ = mimetypes.guess_type(expanded_path)
|
||||
try:
|
||||
with open(expanded_path, 'rb') as f:
|
||||
file_data = f.read()
|
||||
if mime_type and mime_type.startswith('image/'):
|
||||
if "image" not in selected_model.get("modalities", []):
|
||||
console.print("[bold red]Selected model does not support image attachments.[/]")
|
||||
continue
|
||||
b64_data = base64.b64encode(file_data).decode('utf-8')
|
||||
content_blocks.append({"type": "image_url", "image_url": {"url": f"data:{mime_type};base64,{b64_data}"}})
|
||||
elif mime_type == 'text/plain':
|
||||
text_content = file_data.decode('utf-8')
|
||||
content_blocks.append({"type": "text", "text": text_content})
|
||||
else:
|
||||
console.print(f"[bold red]Unsupported file type: {mime_type} for {expanded_path}. Only images and plain text supported.[/]")
|
||||
continue
|
||||
file_attachments.append(file_path)
|
||||
except Exception as e:
|
||||
console.print(f"[bold red]Error reading file {expanded_path}: {e}[/]")
|
||||
continue
|
||||
# Remove @path from text_part
|
||||
text_part = re.sub(file_pattern, '', text_part).strip()
|
||||
|
||||
# Build message content (unchanged)
|
||||
if text_part or content_blocks:
|
||||
message_content = []
|
||||
if text_part:
|
||||
message_content.append({"type": "text", "text": text_part})
|
||||
message_content.extend(content_blocks)
|
||||
else:
|
||||
console.print("[bold red]Prompt cannot be empty.[/]")
|
||||
continue
|
||||
|
||||
# Send to model with streaming (enable/disable based on config)
|
||||
is_streaming = STREAM_ENABLED == "on"
|
||||
if is_streaming:
|
||||
console.print("[bold green]Streaming response... (Press Ctrl+C to cancel)[/]")
|
||||
else:
|
||||
console.print("[bold green]Thinking...[/]", end="\r")
|
||||
|
||||
try:
|
||||
response = client.chat.send(
|
||||
model=selected_model["id"],
|
||||
messages=[{"role": "user", "content": message_content}],
|
||||
stream=is_streaming # Enable/disable streaming
|
||||
)
|
||||
except Exception as e:
|
||||
console.print(f"[bold red]Error sending request: {e}[/]")
|
||||
continue
|
||||
|
||||
if is_streaming:
|
||||
# Handle streaming response
|
||||
full_response = ""
|
||||
try:
|
||||
for chunk in response:
|
||||
# Check for mid-stream errors (per OpenRouter docs)
|
||||
if 'error' in chunk and chunk.error:
|
||||
console.print(f"\n[bold red]Stream error: {chunk.error.message}[/]")
|
||||
break
|
||||
# Print streaming content
|
||||
if chunk.choices and chunk.choices[0].delta.content:
|
||||
content_chunk = chunk.choices[0].delta.content
|
||||
print(content_chunk, end="", flush=True)
|
||||
full_response += content_chunk
|
||||
if full_response:
|
||||
console.print() # New line after streaming
|
||||
except KeyboardInterrupt:
|
||||
console.print("\n[bold yellow]Streaming cancelled![/]")
|
||||
continue
|
||||
else:
|
||||
# Non-streaming fallback
|
||||
full_response = response.choices[0].message.content if response.choices else ""
|
||||
console.print(f"\r{' ' * 20}\r", end="") # Clear "Thinking..." overlay
|
||||
|
||||
if full_response:
|
||||
# Pretty print response panel (only if we have content)
|
||||
console.print(Panel(full_response, title="[bold green]AI Response[/]", title_align="left"))
|
||||
|
||||
# Clipboard prompt (unchanged)
|
||||
try:
|
||||
copy_choice = input("Type 'c' to copy response to clipboard, or press Enter to continue: ").strip().lower()
|
||||
if copy_choice == "c":
|
||||
pyperclip.copy(full_response)
|
||||
console.print("[bold green]Response copied to clipboard![/]")
|
||||
except (EOFError, KeyboardInterrupt):
|
||||
pass # Handle non-interactive environments gracefully
|
||||
else:
|
||||
console.print("[bold red]No response received.[/]")
|
||||
|
||||
except Exception as e:
|
||||
console.print(f"[bold red]Error: {e}[/]")
|
||||
console.print("[bold yellow]Try again or use '/model' to select a different model.[/]")
|
||||
|
||||
if __name__ == "__main__":
|
||||
app()
|
||||
Reference in New Issue
Block a user