#!/usr/bin/python3 -W ignore::DeprecationWarning import sys import os import requests import time # For response time tracking from pathlib import Path from typing import Optional, List, Dict, Any import typer from rich.console import Console from rich.panel import Panel from rich.table import Table from rich.text import Text from rich.markdown import Markdown from rich.live import Live from openrouter import OpenRouter import pyperclip import mimetypes import base64 import re import sqlite3 import json import datetime import logging # Added missing import for logging from prompt_toolkit import PromptSession from prompt_toolkit.history import FileHistory from rich.logging import RichHandler from prompt_toolkit.auto_suggest import AutoSuggestFromHistory app = typer.Typer() # Application identification for OpenRouter APP_NAME = "oAI" APP_URL = "https://iurl.no/oai" # Paths home = Path.home() config_dir = home / '.config' / 'oai' history_file = config_dir / 'history.txt' # Persistent input history file database = config_dir / 'oai_config.db' log_file = config_dir / 'oai.log' # Create dirs if needed config_dir.mkdir(parents=True, exist_ok=True) # Rich console for chat UI (separate from logging) console = Console() # Supported code file extensions SUPPORTED_CODE_EXTENSIONS = { '.py', '.js', '.ts', '.cs', '.java', '.c', '.cpp', '.h', '.hpp', '.rb', '.ruby', '.php', '.swift', '.kt', '.kts', '.go', '.sh', '.bat', '.ps1', '.R', '.scala', '.pl', '.lua', '.dart', '.elm', '.xml', '.json', '.yaml', '.yml', '.md', '.txt' } # Session metrics constants (per 1M tokens, in USD; adjustable) MODEL_PRICING = { 'input': 3.0, # $3/M input tokens (adjustable) 'output': 15.0 # $15/M output tokens (adjustable) } LOW_CREDIT_RATIO = 0.1 # Warn if credits left < 10% of total LOW_CREDIT_AMOUNT = 1.0 # Warn if credits left < $1 in absolute terms HIGH_COST_WARNING = "cost_warning_threshold" # Configurable key for cost threshold, default $0.01 # Setup Rich-powered logging to file as per [rich.readthedocs.io](https://rich.readthedocs.io/en/latest/logging.html) log_console = Console(file=open(log_file, 'a'), width=120) # Append to log file, wider for tracebacks handler = RichHandler( console=log_console, level=logging.INFO, rich_tracebacks=True, tracebacks_suppress=['requests', 'openrouter', 'urllib3', 'httpx', 'openai'] # Suppress irrelevant tracebacks for cleaner logs ) logging.basicConfig( level=logging.NOTSET, # Let handler control what gets written format="%(message)s", # Rich formats it datefmt="[%X]", handlers=[handler] ) app_logger = logging.getLogger("oai_app") app_logger.setLevel(logging.INFO) # DB configuration database = config_dir / 'oai_config.db' DB_FILE = str(database) version = '1.7' def create_table_if_not_exists(): """Ensure the config and conversation_sessions tables exist.""" os.makedirs(config_dir, exist_ok=True) with sqlite3.connect(DB_FILE) as conn: conn.execute('''CREATE TABLE IF NOT EXISTS config ( key TEXT PRIMARY KEY, value TEXT NOT NULL )''') conn.execute('''CREATE TABLE IF NOT EXISTS conversation_sessions ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, timestamp TEXT NOT NULL, data TEXT NOT NULL -- JSON of session_history )''') conn.commit() def get_config(key: str) -> Optional[str]: create_table_if_not_exists() with sqlite3.connect(DB_FILE) as conn: cursor = conn.execute('SELECT value FROM config WHERE key = ?', (key,)) result = cursor.fetchone() return result[0] if result else None def set_config(key: str, value: str): create_table_if_not_exists() with sqlite3.connect(DB_FILE) as conn: conn.execute('INSERT OR REPLACE INTO config (key, value) VALUES (?, ?)', (key, value)) conn.commit() def save_conversation(name: str, data: List[Dict[str, str]]): """Save conversation history to DB.""" timestamp = datetime.datetime.now().isoformat() data_json = json.dumps(data) with sqlite3.connect(DB_FILE) as conn: conn.execute('INSERT INTO conversation_sessions (name, timestamp, data) VALUES (?, ?, ?)', (name, timestamp, data_json)) conn.commit() def load_conversation(name: str) -> Optional[List[Dict[str, str]]]: """Load conversation history from DB (latest by timestamp).""" with sqlite3.connect(DB_FILE) as conn: cursor = conn.execute('SELECT data FROM conversation_sessions WHERE name = ? ORDER BY timestamp DESC LIMIT 1', (name,)) result = cursor.fetchone() if result: return json.loads(result[0]) return None def delete_conversation(name: str) -> int: """Delete all conversation sessions with the given name. Returns number of deleted rows.""" with sqlite3.connect(DB_FILE) as conn: cursor = conn.execute('DELETE FROM conversation_sessions WHERE name = ?', (name,)) conn.commit() return cursor.rowcount def list_conversations() -> List[Dict[str, Any]]: """List all saved conversations from DB with metadata.""" with sqlite3.connect(DB_FILE) as conn: cursor = conn.execute(''' SELECT name, MAX(timestamp) as last_saved, data FROM conversation_sessions GROUP BY name ORDER BY last_saved DESC ''') conversations = [] for row in cursor.fetchall(): name, timestamp, data_json = row data = json.loads(data_json) conversations.append({ 'name': name, 'timestamp': timestamp, 'message_count': len(data) }) return conversations def estimate_cost(input_tokens: int, output_tokens: int) -> float: """Estimate cost in USD based on token counts.""" return (input_tokens * MODEL_PRICING['input'] / 1_000_000) + (output_tokens * MODEL_PRICING['output'] / 1_000_000) def export_as_markdown(session_history: List[Dict[str, str]], session_system_prompt: str = "") -> str: """Export conversation history as Markdown.""" lines = ["# Conversation Export", ""] if session_system_prompt: lines.extend([f"**System Prompt:** {session_system_prompt}", ""]) lines.append(f"**Export Date:** {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") lines.append("") lines.append("---") lines.append("") for i, entry in enumerate(session_history, 1): lines.append(f"## Message {i}") lines.append("") lines.append("**User:**") lines.append("") lines.append(entry['prompt']) lines.append("") lines.append("**Assistant:**") lines.append("") lines.append(entry['response']) lines.append("") lines.append("---") lines.append("") return "\n".join(lines) def export_as_json(session_history: List[Dict[str, str]], session_system_prompt: str = "") -> str: """Export conversation history as JSON.""" export_data = { "export_date": datetime.datetime.now().isoformat(), "system_prompt": session_system_prompt, "message_count": len(session_history), "messages": session_history } return json.dumps(export_data, indent=2, ensure_ascii=False) def export_as_html(session_history: List[Dict[str, str]], session_system_prompt: str = "") -> str: """Export conversation history as HTML.""" # Escape HTML special characters def escape_html(text): return text.replace('&', '&').replace('<', '<').replace('>', '>').replace('"', '"').replace("'", ''') html_parts = [ "", "", "", " ", " ", " Conversation Export", " ", "", "", "
", "

💬 Conversation Export

", f"
📅 Exported: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
", f"
📊 Total Messages: {len(session_history)}
", "
", ] if session_system_prompt: html_parts.extend([ "
", " ⚙️ System Prompt", f"
{escape_html(session_system_prompt)}
", "
", ]) for i, entry in enumerate(session_history, 1): html_parts.extend([ "
", f"
Message {i} of {len(session_history)}
", "
", "
👤 User
", f"
{escape_html(entry['prompt'])}
", "
", "
", "
🤖 Assistant
", f"
{escape_html(entry['response'])}
", "
", "
", ]) html_parts.extend([ "
", "

Generated by oAI Chat • https://iurl.no/oai

", "
", "", "", ]) return "\n".join(html_parts) # Load configs API_KEY = get_config('api_key') OPENROUTER_BASE_URL = get_config('base_url') or "https://openrouter.ai/api/v1" STREAM_ENABLED = get_config('stream_enabled') or "on" DEFAULT_MODEL_ID = get_config('default_model') MAX_TOKEN = int(get_config('max_token') or "100000") COST_WARNING_THRESHOLD = float(get_config(HIGH_COST_WARNING) or "0.01") # Configurable cost threshold for alerts # Fetch models with app identification headers models_data = [] text_models = [] try: headers = { "Authorization": f"Bearer {API_KEY}", "HTTP-Referer": APP_URL, "X-Title": APP_NAME } if API_KEY else { "HTTP-Referer": APP_URL, "X-Title": APP_NAME } response = requests.get(f"{OPENROUTER_BASE_URL}/models", headers=headers) response.raise_for_status() models_data = response.json()["data"] text_models = [m for m in models_data if "modalities" not in m or "video" not in (m.get("modalities") or [])] selected_model_default = None if DEFAULT_MODEL_ID: selected_model_default = next((m for m in text_models if m["id"] == DEFAULT_MODEL_ID), None) if not selected_model_default: console.print(f"[bold yellow]Warning: Default model '{DEFAULT_MODEL_ID}' unavailable. Use '/config model'.[/]") except Exception as e: models_data = [] text_models = [] app_logger.error(f"Failed to fetch models: {e}") def get_credits(api_key: str, base_url: str = OPENROUTER_BASE_URL) -> Optional[Dict[str, str]]: if not api_key: return None url = f"{base_url}/credits" headers = { "Authorization": f"Bearer {api_key}", "HTTP-Referer": APP_URL, "X-Title": APP_NAME } try: response = requests.get(url, headers=headers) response.raise_for_status() data = response.json().get('data', {}) total_credits = float(data.get('total_credits', 0)) total_usage = float(data.get('total_usage', 0)) credits_left = total_credits - total_usage return { 'total_credits': f"${total_credits:.2f}", 'used_credits': f"${total_usage:.2f}", 'credits_left': f"${credits_left:.2f}" } except Exception as e: console.print(f"[bold red]Error fetching credits: {e}[/]") return None def check_credit_alerts(credits_data: Optional[Dict[str, str]]) -> List[str]: """Check and return list of credit-related alerts.""" alerts = [] if credits_data: credits_left_value = float(credits_data['credits_left'].strip('$')) total_credits_value = float(credits_data['total_credits'].strip('$')) if credits_left_value < LOW_CREDIT_AMOUNT: alerts.append(f"Critical credit alert: Less than ${LOW_CREDIT_AMOUNT:.2f} left ({credits_data['credits_left']})") elif credits_left_value < total_credits_value * LOW_CREDIT_RATIO: alerts.append(f"Low credit alert: Credits left < 10% of total ({credits_data['credits_left']})") return alerts def clear_screen(): try: print("\033[H\033[J", end="", flush=True) except: print("\n" * 100) @app.command() def chat(): global API_KEY, OPENROUTER_BASE_URL, STREAM_ENABLED, MAX_TOKEN, COST_WARNING_THRESHOLD session_max_token = 0 session_system_prompt = "" session_history = [] current_index = -1 total_input_tokens = 0 total_output_tokens = 0 total_cost = 0.0 message_count = 0 middle_out_enabled = False # Session-level middle-out transform flag conversation_memory_enabled = True # Memory ON by default memory_start_index = 0 # Track when memory was last enabled saved_conversations_cache = [] # Cache for /list results to use with /load by number app_logger.info("Starting new chat session with memory enabled") # Log session start if not API_KEY: console.print("[bold red]API key not found. Use '/config api'.[/]") try: new_api_key = typer.prompt("Enter API key") if new_api_key.strip(): set_config('api_key', new_api_key.strip()) API_KEY = new_api_key.strip() console.print("[bold green]API key saved. Re-run.[/]") else: raise typer.Exit() except: console.print("[bold red]No API key. Exiting.[/]") raise typer.Exit() if not text_models: console.print("[bold red]No models available. Check API key/URL.[/]") raise typer.Exit() # Check for credit alerts at startup credits_data = get_credits(API_KEY, OPENROUTER_BASE_URL) startup_credit_alerts = check_credit_alerts(credits_data) if startup_credit_alerts: startup_alert_msg = " | ".join(startup_credit_alerts) console.print(f"[bold red]⚠️ Startup {startup_alert_msg}[/]") app_logger.warning(f"Startup credit alerts: {startup_alert_msg}") selected_model = selected_model_default # Initialize OpenRouter client client = OpenRouter(api_key=API_KEY) if selected_model: console.print(f"[bold blue]Welcome to oAI![/] [bold red]Active model: {selected_model['name']}[/]") else: console.print("[bold blue]Welcome to oAI![/] [italic blue]Select a model with '/model'.[/]") if not selected_model: console.print("[bold yellow]No model selected. Use '/model'.[/]") # Persistent input history session = PromptSession(history=FileHistory(str(history_file))) while True: try: user_input = session.prompt("You> ", auto_suggest=AutoSuggestFromHistory()).strip() if user_input.lower() in ["exit", "quit", "bye"]: total_tokens = total_input_tokens + total_output_tokens app_logger.info(f"Session ended. Total messages: {message_count}, Total tokens: {total_tokens}, Total cost: ${total_cost:.4f}") # Log session summary console.print("[bold yellow]Goodbye![/]") return # Commands with logging if user_input.lower() == "/retry": if not session_history: console.print("[bold red]No history to retry.[/]") app_logger.warning("Retry attempted with no history") continue last_prompt = session_history[-1]['prompt'] console.print("[bold green]Retrying last prompt...[/]") app_logger.info(f"Retrying prompt: {last_prompt[:100]}...") user_input = last_prompt elif user_input.lower().startswith("/memory"): args = user_input[8:].strip() if not args: status = "enabled" if conversation_memory_enabled else "disabled" history_count = len(session_history) - memory_start_index if conversation_memory_enabled and memory_start_index < len(session_history) else 0 console.print(f"[bold blue]Conversation memory {status}.[/]") if conversation_memory_enabled: console.print(f"[dim blue]Tracking {history_count} message(s) since memory enabled.[/]") else: console.print(f"[dim yellow]Memory disabled. Each request is independent (saves tokens/cost).[/]") continue if args.lower() == "on": conversation_memory_enabled = True memory_start_index = len(session_history) # Remember where we started console.print("[bold green]Conversation memory enabled. Will remember conversations from this point forward.[/]") console.print(f"[dim blue]Memory will track messages starting from index {memory_start_index}.[/]") app_logger.info(f"Conversation memory enabled at index {memory_start_index}") elif args.lower() == "off": conversation_memory_enabled = False console.print("[bold green]Conversation memory disabled. API calls will not include history (lower cost).[/]") console.print(f"[dim yellow]Note: Messages are still saved locally but not sent to API.[/]") app_logger.info("Conversation memory disabled") else: console.print("[bold yellow]Usage: /memory on|off (or /memory to view status)[/]") continue elif user_input.lower().startswith("/paste"): # Get optional prompt after /paste optional_prompt = user_input[7:].strip() try: clipboard_content = pyperclip.paste() except Exception as e: console.print(f"[bold red]Failed to access clipboard: {e}[/]") app_logger.error(f"Clipboard access error: {e}") continue if not clipboard_content or not clipboard_content.strip(): console.print("[bold red]Clipboard is empty.[/]") app_logger.warning("Paste attempted with empty clipboard") continue # Validate it's text (check if it's valid UTF-8 and printable) try: # Try to encode/decode to ensure it's valid text clipboard_content.encode('utf-8') # Show preview of pasted content preview_lines = clipboard_content.split('\n')[:10] # First 10 lines preview_text = '\n'.join(preview_lines) if len(clipboard_content.split('\n')) > 10: preview_text += "\n... (content truncated for preview)" char_count = len(clipboard_content) line_count = len(clipboard_content.split('\n')) console.print(Panel( preview_text, title=f"[bold cyan]📋 Clipboard Content Preview ({char_count} chars, {line_count} lines)[/]", title_align="left", border_style="cyan" )) # Build the final prompt if optional_prompt: final_prompt = f"{optional_prompt}\n\n```\n{clipboard_content}\n```" console.print(f"[dim blue]Sending with prompt: '{optional_prompt}'[/]") else: final_prompt = clipboard_content console.print("[dim blue]Sending clipboard content without additional prompt[/]") # Set user_input to the pasted content so it gets processed normally user_input = final_prompt app_logger.info(f"Pasted content from clipboard: {char_count} chars, {line_count} lines, with prompt: {bool(optional_prompt)}") except UnicodeDecodeError: console.print("[bold red]Clipboard contains non-text (binary) data. Only plain text is supported.[/]") app_logger.error("Paste failed - clipboard contains binary data") continue except Exception as e: console.print(f"[bold red]Error processing clipboard content: {e}[/]") app_logger.error(f"Clipboard processing error: {e}") continue elif user_input.lower().startswith("/export"): args = user_input[8:].strip().split(maxsplit=1) if len(args) != 2: console.print("[bold red]Usage: /export [/]") console.print("[bold yellow]Formats: md (Markdown), json (JSON), html (HTML)[/]") console.print("[bold yellow]Example: /export md my_conversation.md[/]") continue export_format = args[0].lower() filename = args[1] if not session_history: console.print("[bold red]No conversation history to export.[/]") continue # Validate format if export_format not in ['md', 'json', 'html']: console.print("[bold red]Invalid format. Use: md, json, or html[/]") continue try: # Generate export content if export_format == 'md': content = export_as_markdown(session_history, session_system_prompt) elif export_format == 'json': content = export_as_json(session_history, session_system_prompt) elif export_format == 'html': content = export_as_html(session_history, session_system_prompt) # Write to file export_path = Path(filename).expanduser() with open(export_path, 'w', encoding='utf-8') as f: f.write(content) console.print(f"[bold green]✅ Conversation exported to: {export_path.absolute()}[/]") console.print(f"[dim blue]Format: {export_format.upper()} | Messages: {len(session_history)} | Size: {len(content)} bytes[/]") app_logger.info(f"Conversation exported as {export_format} to {export_path} ({len(session_history)} messages)") except Exception as e: console.print(f"[bold red]Export failed: {e}[/]") app_logger.error(f"Export error: {e}") continue elif user_input.lower().startswith("/save"): args = user_input[6:].strip() if not args: console.print("[bold red]Usage: /save [/]") continue if not session_history: console.print("[bold red]No history to save.[/]") continue save_conversation(args, session_history) console.print(f"[bold green]Conversation saved as '{args}'.[/]") app_logger.info(f"Conversation saved as '{args}' with {len(session_history)} messages") continue elif user_input.lower().startswith("/load"): args = user_input[6:].strip() if not args: console.print("[bold red]Usage: /load [/]") console.print("[bold yellow]Tip: Use /list to see numbered conversations[/]") continue # Check if input is a number conversation_name = None if args.isdigit(): conv_number = int(args) if saved_conversations_cache and 1 <= conv_number <= len(saved_conversations_cache): conversation_name = saved_conversations_cache[conv_number - 1]['name'] console.print(f"[bold cyan]Loading conversation #{conv_number}: '{conversation_name}'[/]") else: console.print(f"[bold red]Invalid conversation number: {conv_number}[/]") console.print(f"[bold yellow]Use /list to see available conversations (1-{len(saved_conversations_cache) if saved_conversations_cache else 0})[/]") continue else: conversation_name = args loaded_data = load_conversation(conversation_name) if not loaded_data: console.print(f"[bold red]Conversation '{conversation_name}' not found.[/]") app_logger.warning(f"Load failed for '{conversation_name}' - not found") continue session_history = loaded_data current_index = len(session_history) - 1 # When loading, reset memory tracking if memory is enabled if conversation_memory_enabled: memory_start_index = 0 # Include all loaded messages in memory total_input_tokens = 0 total_output_tokens = 0 total_cost = 0.0 message_count = 0 console.print(f"[bold green]Conversation '{conversation_name}' loaded with {len(session_history)} messages.[/]") app_logger.info(f"Conversation '{conversation_name}' loaded with {len(session_history)} messages") continue elif user_input.lower().startswith("/delete"): args = user_input[8:].strip() if not args: console.print("[bold red]Usage: /delete [/]") console.print("[bold yellow]Tip: Use /list to see numbered conversations[/]") continue # Check if input is a number conversation_name = None if args.isdigit(): conv_number = int(args) if saved_conversations_cache and 1 <= conv_number <= len(saved_conversations_cache): conversation_name = saved_conversations_cache[conv_number - 1]['name'] console.print(f"[bold cyan]Deleting conversation #{conv_number}: '{conversation_name}'[/]") else: console.print(f"[bold red]Invalid conversation number: {conv_number}[/]") console.print(f"[bold yellow]Use /list to see available conversations (1-{len(saved_conversations_cache) if saved_conversations_cache else 0})[/]") continue else: conversation_name = args # Confirm deletion try: confirm = typer.confirm(f"Delete conversation '{conversation_name}'? This cannot be undone.", default=False) if not confirm: console.print("[bold yellow]Deletion cancelled.[/]") continue except (EOFError, KeyboardInterrupt): console.print("\n[bold yellow]Deletion cancelled.[/]") continue deleted_count = delete_conversation(conversation_name) if deleted_count > 0: console.print(f"[bold green]Conversation '{conversation_name}' deleted ({deleted_count} version(s) removed).[/]") app_logger.info(f"Conversation '{conversation_name}' deleted - {deleted_count} version(s)") # Refresh cache if deleted conversation was in it if saved_conversations_cache: saved_conversations_cache = [c for c in saved_conversations_cache if c['name'] != conversation_name] else: console.print(f"[bold red]Conversation '{conversation_name}' not found.[/]") app_logger.warning(f"Delete failed for '{conversation_name}' - not found") continue elif user_input.lower() == "/list": conversations = list_conversations() if not conversations: console.print("[bold yellow]No saved conversations found.[/]") app_logger.info("User viewed conversation list - empty") saved_conversations_cache = [] continue # Update cache for /load command saved_conversations_cache = conversations table = Table("No.", "Name", "Messages", "Last Saved", show_header=True, header_style="bold magenta") for idx, conv in enumerate(conversations, 1): # Parse ISO timestamp and format it nicely try: dt = datetime.datetime.fromisoformat(conv['timestamp']) formatted_time = dt.strftime('%Y-%m-%d %H:%M:%S') except: formatted_time = conv['timestamp'] table.add_row( str(idx), conv['name'], str(conv['message_count']), formatted_time ) console.print(Panel(table, title=f"[bold green]Saved Conversations ({len(conversations)} total)[/]", title_align="left", subtitle="[dim]Use /load or /delete to manage conversations[/]", subtitle_align="right")) app_logger.info(f"User viewed conversation list - {len(conversations)} conversations") continue elif user_input.lower() == "/prev": if not session_history or current_index <= 0: console.print("[bold red]No previous response.[/]") continue current_index -= 1 prev_response = session_history[current_index]['response'] # Render as markdown with proper formatting md = Markdown(prev_response) console.print(Panel(md, title=f"[bold green]Previous Response ({current_index + 1}/{len(session_history)})[/]", title_align="left")) app_logger.debug(f"Viewed previous response at index {current_index}") continue elif user_input.lower() == "/next": if not session_history or current_index >= len(session_history) - 1: console.print("[bold red]No next response.[/]") continue current_index += 1 next_response = session_history[current_index]['response'] # Render as markdown with proper formatting md = Markdown(next_response) console.print(Panel(md, title=f"[bold green]Next Response ({current_index + 1}/{len(session_history)})[/]", title_align="left")) app_logger.debug(f"Viewed next response at index {current_index}") continue elif user_input.lower() == "/stats": credits = get_credits(API_KEY, OPENROUTER_BASE_URL) credits_left = credits['credits_left'] if credits else "Unknown" stats = f"Total Input: {total_input_tokens}, Total Output: {total_output_tokens}, Total Tokens: {total_input_tokens + total_output_tokens}, Total Cost: ${total_cost:.4f}, Avg Cost/Message: ${total_cost / message_count:.4f}" if message_count > 0 else "No messages." table = Table("Metric", "Value", show_header=True, header_style="bold magenta") table.add_row("Session Stats", stats) table.add_row("Credits Left", credits_left) console.print(Panel(table, title="[bold green]Session Cost Summary[/]", title_align="left")) app_logger.info(f"User viewed stats: {stats}") # Cost warnings in /stats warnings = check_credit_alerts(credits) if warnings: warning_text = '|'.join(warnings) console.print(f"[bold red]⚠️ {warning_text}[/]") app_logger.warning(f"Warnings in stats: {warning_text}") continue elif user_input.lower().startswith("/middleout"): args = user_input[11:].strip() if not args: console.print(f"[bold blue]Middle-out transform {'enabled' if middle_out_enabled else 'disabled'}.[/]") continue if args.lower() == "on": middle_out_enabled = True console.print("[bold green]Middle-out transform enabled.[/]") elif args.lower() == "off": middle_out_enabled = False console.print("[bold green]Middle-out transform disabled.[/]") else: console.print("[bold yellow]Usage: /middleout on|off (or /middleout to view status)[/]") continue elif user_input.lower() == "/reset": confirm = typer.confirm("Reset conversation context? This clears history and prompt.", default=False) if not confirm: console.print("[bold yellow]Reset cancelled.[/]") continue session_history = [] current_index = -1 session_system_prompt = "" memory_start_index = 0 # Reset memory tracking total_input_tokens = 0 total_output_tokens = 0 total_cost = 0.0 message_count = 0 console.print("[bold green]Conversation context reset.[/]") app_logger.info("Conversation context reset by user") continue elif user_input.lower().startswith("/info"): args = user_input[6:].strip() if not args: if not selected_model: console.print("[bold red]No model selected and no model ID provided. Use '/model' first or '/info '.[/]") continue model_to_show = selected_model else: model_to_show = next((m for m in models_data if m["id"] == args or m.get("canonical_slug") == args or args.lower() in m["name"].lower()), None) if not model_to_show: console.print(f"[bold red]Model '{args}' not found.[/]") continue # Display model info (unchanged) pricing = model_to_show.get("pricing", {}) architecture = model_to_show.get("architecture", {}) supported_params = ", ".join(model_to_show.get("supported_parameters", [])) or "None" top_provider = model_to_show.get("top_provider", {}) table = Table("Property", "Value", show_header=True, header_style="bold magenta") table.add_row("ID", model_to_show["id"]) table.add_row("Name", model_to_show["name"]) table.add_row("Description", model_to_show.get("description", "N/A")) table.add_row("Context Length", str(model_to_show.get("context_length", "N/A"))) table.add_row("Pricing - Prompt ($/M tokens)", pricing.get("prompt", "N/A")) table.add_row("Pricing - Completion ($/M tokens)", pricing.get("completion", "N/A")) table.add_row("Pricing - Request ($)", pricing.get("request", "N/A")) table.add_row("Pricing - Image ($)", pricing.get("image", "N/A")) table.add_row("Input Modalities", ", ".join(architecture.get("input_modalities", [])) or "None") table.add_row("Output Modalities", ", ".join(architecture.get("output_modalities", [])) or "None") table.add_row("Supported Parameters", supported_params) table.add_row("Top Provider Context Length", str(top_provider.get("context_length", "N/A"))) table.add_row("Max Completion Tokens", str(top_provider.get("max_completion_tokens", "N/A"))) table.add_row("Moderated", "Yes" if top_provider.get("is_moderated", False) else "No") console.print(Panel(table, title=f"[bold green]Model Info: {model_to_show['name']}[/]", title_align="left")) continue # Model selection (unchanged but with logging) elif user_input.startswith("/model"): app_logger.info("User initiated model selection") args = user_input[7:].strip() search_term = args if args else "" filtered_models = text_models if search_term: filtered_models = [m for m in text_models if search_term.lower() in m["name"].lower() or search_term.lower() in m["id"].lower()] if not filtered_models: console.print(f"[bold red]No models match '{search_term}'. Try '/model'.[/]") continue table = Table("No.", "Name", "ID", show_header=True, header_style="bold magenta") for i, model in enumerate(filtered_models, 1): table.add_row(str(i), model["name"], model["id"]) console.print(Panel(table, title=f"[bold green]Available Models ({'All' if not search_term else f'Search: {search_term}'})[/]", title_align="left")) while True: try: choice = int(typer.prompt("Enter model number (or 0 to cancel)")) if choice == 0: break if 1 <= choice <= len(filtered_models): selected_model = filtered_models[choice - 1] console.print(f"[bold cyan]Selected: {selected_model['name']} ({selected_model['id']})[/]") app_logger.info(f"Model selected: {selected_model['name']} ({selected_model['id']})") break console.print("[bold red]Invalid choice. Try again.[/]") except ValueError: console.print("[bold red]Invalid input. Enter a number.[/]") continue elif user_input.startswith("/maxtoken"): # (unchanged) args = user_input[10:].strip() if not args: console.print(f"[bold blue]Current session max tokens: {session_max_token}[/]") continue try: new_limit = int(args) if new_limit < 1: console.print("[bold red]Session token limit must be at least 1.[/]") continue if new_limit > MAX_TOKEN: console.print(f"[bold yellow]Cannot exceed stored max ({MAX_TOKEN}). Capping.[/]") new_limit = MAX_TOKEN session_max_token = new_limit console.print(f"[bold green]Session max tokens set to: {session_max_token}[/]") except ValueError: console.print("[bold red]Invalid token limit. Provide a positive integer.[/]") continue elif user_input.startswith("/system"): # (unchanged but added to history view) args = user_input[8:].strip() if not args: if session_system_prompt: console.print(f"[bold blue]Current session system prompt:[/] {session_system_prompt}") else: console.print("[bold blue]No session system prompt set.[/]") continue if args.lower() == "clear": session_system_prompt = "" console.print("[bold green]Session system prompt cleared.[/]") else: session_system_prompt = args console.print(f"[bold green]Session system prompt set to: {session_system_prompt}[/]") continue elif user_input.startswith("/config"): args = user_input[8:].strip().lower() if args == "api": try: new_api_key = typer.prompt("Enter new API key") if new_api_key.strip(): set_config('api_key', new_api_key.strip()) API_KEY = new_api_key.strip() # Reinitialize client with new API key client = OpenRouter(api_key=API_KEY) console.print("[bold green]API key updated![/]") else: console.print("[bold yellow]No change.[/]") except Exception as e: console.print(f"[bold red]Error updating API key: {e}[/]") elif args == "url": try: new_url = typer.prompt("Enter new base URL") if new_url.strip(): set_config('base_url', new_url.strip()) OPENROUTER_BASE_URL = new_url.strip() console.print("[bold green]Base URL updated![/]") else: console.print("[bold yellow]No change.[/]") except Exception as e: console.print(f"[bold red]Error updating URL: {e}[/]") elif args.startswith("costwarning"): sub_args = args[11:].strip() # Extract everything after "costwarning" if not sub_args: console.print(f"[bold blue]Stored cost warning threshold: ${COST_WARNING_THRESHOLD:.4f}[/]") continue try: new_threshold = float(sub_args) if new_threshold < 0: console.print("[bold red]Cost warning threshold must be >= 0.[/]") continue set_config(HIGH_COST_WARNING, str(new_threshold)) COST_WARNING_THRESHOLD = new_threshold console.print(f"[bold green]Cost warning threshold set to ${COST_WARNING_THRESHOLD:.4f}[/]") except ValueError: console.print("[bold red]Invalid cost threshold. Provide a valid number.[/]") elif args.startswith("stream"): sub_args = args[7:].strip() if sub_args in ["on", "off"]: set_config('stream_enabled', sub_args) STREAM_ENABLED = sub_args console.print(f"[bold green]Streaming {'enabled' if sub_args == 'on' else 'disabled'}.[/]") else: console.print("[bold yellow]Usage: /config stream on|off[/]") elif args.startswith("maxtoken"): sub_args = args[9:].strip() if not sub_args: console.print(f"[bold blue]Stored max token limit: {MAX_TOKEN}[/]") continue try: new_max = int(sub_args) if new_max < 1: console.print("[bold red]Max token limit must be at least 1.[/]") continue if new_max > 1000000: console.print("[bold yellow]Capped at 1M for safety.[/]") new_max = 1000000 set_config('max_token', str(new_max)) MAX_TOKEN = new_max if session_max_token > MAX_TOKEN: session_max_token = MAX_TOKEN console.print(f"[bold yellow]Session adjusted to {session_max_token}.[/]") console.print(f"[bold green]Stored max token limit updated to: {MAX_TOKEN}[/]") except ValueError: console.print("[bold red]Invalid token limit.[/]") elif args.startswith("model"): sub_args = args[6:].strip() search_term = sub_args if sub_args else "" filtered_models = text_models if search_term: filtered_models = [m for m in text_models if search_term.lower() in m["name"].lower() or search_term.lower() in m["id"].lower()] if not filtered_models: console.print(f"[bold red]No models match '{search_term}'. Try without search.[/]") continue table = Table("No.", "Name", "ID", show_header=True, header_style="bold magenta") for i, model in enumerate(filtered_models, 1): table.add_row(str(i), model["name"], model["id"]) console.print(Panel(table, title=f"[bold green]Available Models for Default ({'All' if not search_term else f'Search: {search_term}'})[/]", title_align="left")) while True: try: choice = int(typer.prompt("Enter model number (or 0 to cancel)")) if choice == 0: break if 1 <= choice <= len(filtered_models): default_model = filtered_models[choice - 1] set_config('default_model', default_model["id"]) current_name = selected_model['name'] if selected_model else "None" console.print(f"[bold cyan]Default model set to: {default_model['name']} ({default_model['id']}). Current unchanged: {current_name}[/]") break console.print("[bold red]Invalid choice. Try again.[/]") except ValueError: console.print("[bold red]Invalid input. Enter a number.[/]") else: DEFAULT_MODEL_ID = get_config('default_model') memory_status = "Enabled" if conversation_memory_enabled else "Disabled" memory_tracked = len(session_history) - memory_start_index if conversation_memory_enabled else 0 table = Table("Setting", "Value", show_header=True, header_style="bold magenta", width=console.width - 10) table.add_row("API Key", API_KEY or "[Not set]") table.add_row("Base URL", OPENROUTER_BASE_URL or "[Not set]") table.add_row("DB Path", str(database) or "[Not set]") table.add_row("Logfile", str(log_file) or "[Not set]") table.add_row("Streaming", "Enabled" if STREAM_ENABLED == "on" else "Disabled") table.add_row("Default Model", DEFAULT_MODEL_ID or "[Not set]") table.add_row("Current Model", "[Not set]" if selected_model is None else str(selected_model["name"])) table.add_row("Max Token", str(MAX_TOKEN)) table.add_row("Session Token", "[Not set]" if session_max_token == 0 else str(session_max_token)) table.add_row("Session System Prompt", session_system_prompt or "[Not set]") table.add_row("Cost Warning Threshold", f"${COST_WARNING_THRESHOLD:.4f}") table.add_row("Middle-out Transform", "Enabled" if middle_out_enabled else "Disabled") table.add_row("Conversation Memory", f"{memory_status} ({memory_tracked} tracked)" if conversation_memory_enabled else memory_status) table.add_row("History Size", str(len(session_history))) table.add_row("Current History Index", str(current_index) if current_index >= 0 else "[None]") table.add_row("App Name", APP_NAME) table.add_row("App URL", APP_URL) credits = get_credits(API_KEY, OPENROUTER_BASE_URL) if credits: table.add_row("Total Credits", credits['total_credits']) table.add_row("Used Credits", credits['used_credits']) table.add_row("Credits Left", credits['credits_left']) else: table.add_row("Total Credits", "[Unavailable - Check API key]") table.add_row("Used Credits", "[Unavailable - Check API key]") table.add_row("Credits Left", "[Unavailable - Check API key]") console.print(Panel(table, title="[bold green]Current Configurations[/]", title_align="left", subtitle="[bold green]oAI Version %s" % version, subtitle_align="right")) continue if user_input.lower() == "/credits": credits = get_credits(API_KEY, OPENROUTER_BASE_URL) if credits: console.print(f"[bold green]Credits left: {credits['credits_left']}[/]") alerts = check_credit_alerts(credits) if alerts: for alert in alerts: console.print(f"[bold red]⚠️ {alert}[/]") else: console.print("[bold red]Unable to fetch credits. Check your API key or network.[/]") continue if user_input.lower() == "/clear": clear_screen() DEFAULT_MODEL_ID = get_config('default_model') token_value = session_max_token if session_max_token != 0 else " Not set" console.print(f"[bold cyan]Token limits: Max= {MAX_TOKEN}, Session={token_value}[/]") console.print("[bold blue]Active model[/] [bold red]%s[/]" %(str(selected_model["name"]) if selected_model else "None")) continue if user_input.lower() == "/help": help_table = Table("Command", "Description", "Example", show_header=True, header_style="bold cyan", width=console.width - 10) # ===== SESSION COMMANDS ===== help_table.add_row( "[bold yellow]━━━ SESSION COMMANDS ━━━[/]", "", "" ) help_table.add_row( "/clear", "Clear the terminal screen for a clean interface. You can also use the keycombo [bold]ctrl+l[/]", "/clear" ) help_table.add_row( "/help", "Show this help menu with all available commands.", "/help" ) help_table.add_row( "/memory [on|off]", "Toggle conversation memory. ON sends history (AI remembers), OFF sends only current message (saves cost).", "/memory\n/memory off" ) help_table.add_row( "/next", "View the next response in history.", "/next" ) help_table.add_row( "/paste [prompt]", "Paste plain text/code from clipboard and send to AI. Optional prompt can be added.", "/paste\n/paste Explain this code" ) help_table.add_row( "/prev", "View the previous response in history.", "/prev" ) help_table.add_row( "/reset", "Clear conversation history and reset system prompt (resets session metrics). Requires confirmation.", "/reset" ) help_table.add_row( "/retry", "Resend the last prompt from history.", "/retry" ) # ===== MODEL COMMANDS ===== help_table.add_row( "[bold yellow]━━━ MODEL COMMANDS ━━━[/]", "", "" ) help_table.add_row( "/info [model_id]", "Display detailed info (pricing, modalities, context length, etc.) for current or specified model.", "/info\n/info gpt-4o" ) help_table.add_row( "/model [search]", "Select or change the current model for the session. Supports searching by name or ID.", "/model\n/model gpt" ) # ===== CONFIGURATION COMMANDS (ALPHABETICAL) ===== help_table.add_row( "[bold yellow]━━━ CONFIGURATION ━━━[/]", "", "" ) help_table.add_row( "/config", "View all current configurations, including limits, credits, and history.", "/config" ) help_table.add_row( "/config api", "Set or update the OpenRouter API key.", "/config api" ) help_table.add_row( "/config costwarning [value]", "Set the cost warning threshold. Alerts when response exceeds this cost (in USD).", "/config costwarning 0.05" ) help_table.add_row( "/config maxtoken [value]", "Set stored max token limit (persisted in DB). View current if no value provided.", "/config maxtoken 50000" ) help_table.add_row( "/config model [search]", "Set default model that loads on startup. Doesn't change current session model.", "/config model gpt" ) help_table.add_row( "/config stream [on|off]", "Enable or disable response streaming.", "/config stream off" ) help_table.add_row( "/config url", "Set or update the base URL for OpenRouter API.", "/config url" ) # ===== TOKEN & SYSTEM COMMANDS ===== help_table.add_row( "[bold yellow]━━━ TOKEN & SYSTEM ━━━[/]", "", "" ) help_table.add_row( "/maxtoken [value]", "Set temporary session token limit (≤ stored max). View current if no value provided.", "/maxtoken 2000" ) help_table.add_row( "/middleout [on|off]", "Enable/disable middle-out transform to compress prompts exceeding context size.", "/middleout on" ) help_table.add_row( "/system [prompt|clear]", "Set session-level system prompt to guide AI behavior. Use 'clear' to reset.", "/system You are a Python expert" ) # ===== CONVERSATION MANAGEMENT ===== help_table.add_row( "[bold yellow]━━━ CONVERSATION MGMT ━━━[/]", "", "" ) help_table.add_row( "/delete ", "Delete a saved conversation by name or number (from /list). Requires confirmation.", "/delete my_chat\n/delete 3" ) help_table.add_row( "/export ", "Export conversation to file. Formats: md (Markdown), json (JSON), html (HTML).", "/export md notes.md\n/export html report.html" ) help_table.add_row( "/list", "List all saved conversations with numbers, message counts, and timestamps.", "/list" ) help_table.add_row( "/load ", "Load a saved conversation by name or number (from /list). Resets session metrics.", "/load my_chat\n/load 3" ) help_table.add_row( "/save ", "Save current conversation history to database.", "/save my_chat" ) # ===== MONITORING & STATS ===== help_table.add_row( "[bold yellow]━━━ MONITORING & STATS ━━━[/]", "", "" ) help_table.add_row( "/credits", "Display credits left on your OpenRouter account with alerts.", "/credits" ) help_table.add_row( "/stats", "Display session cost summary: tokens, cost, credits left, and warnings.", "/stats" ) # ===== FILE ATTACHMENTS ===== help_table.add_row( "[bold yellow]━━━ INPUT METHODS ━━━[/]", "", "" ) help_table.add_row( "@/path/to/file", "Attach files to messages: images (PNG, JPG, etc.), PDFs, and code files (.py, .js, etc.).", "Debug @script.py\nSummarize @document.pdf\nAnalyze @image.png" ) help_table.add_row( "Clipboard paste", "Use /paste to send clipboard content (plain text/code) to AI.", "/paste\n/paste Explain this" ) # ===== EXIT ===== help_table.add_row( "[bold yellow]━━━ EXIT ━━━[/]", "", "" ) help_table.add_row( "exit | quit | bye", "Quit the chat application and display session summary.", "exit" ) console.print(Panel( help_table, title="[bold cyan]oAI Chat Help (Version %s)[/]" % version, title_align="center", subtitle="💡 Tip: Commands are case-insensitive • Memory ON by default (toggle with /memory) • Visit: https://iurl.no/oai", subtitle_align="center", border_style="cyan" )) continue if not selected_model: console.print("[bold yellow]Select a model first with '/model'.[/]") continue # Process file attachments with PDF support content_blocks = [] text_part = user_input file_attachments = [] for match in re.finditer(r'@([^\s]+)', user_input): file_path = match.group(1) expanded_path = os.path.expanduser(os.path.abspath(file_path)) if not os.path.exists(expanded_path) or os.path.isdir(expanded_path): console.print(f"[bold red]File not found or is a directory: {expanded_path}[/]") continue file_size = os.path.getsize(expanded_path) if file_size > 10 * 1024 * 1024: console.print(f"[bold red]File too large (>10MB): {expanded_path}[/]") continue mime_type, _ = mimetypes.guess_type(expanded_path) file_ext = os.path.splitext(expanded_path)[1].lower() try: with open(expanded_path, 'rb') as f: file_data = f.read() # Handle images if mime_type and mime_type.startswith('image/'): modalities = selected_model.get("architecture", {}).get("input_modalities", []) if "image" not in modalities: console.print("[bold red]Selected model does not support image attachments.[/]") console.print(f"[dim yellow]Supported modalities: {', '.join(modalities) if modalities else 'text only'}[/]") continue b64_data = base64.b64encode(file_data).decode('utf-8') content_blocks.append({"type": "image_url", "image_url": {"url": f"data:{mime_type};base64,{b64_data}"}}) console.print(f"[dim green]✓ Image attached: {os.path.basename(expanded_path)} ({file_size / 1024:.1f} KB)[/]") # Handle PDFs elif mime_type == 'application/pdf' or file_ext == '.pdf': modalities = selected_model.get("architecture", {}).get("input_modalities", []) # Check for various possible modality indicators for PDFs supports_pdf = any(mod in modalities for mod in ["document", "pdf", "file"]) if not supports_pdf: console.print("[bold red]Selected model does not support PDF attachments.[/]") console.print(f"[dim yellow]Supported modalities: {', '.join(modalities) if modalities else 'text only'}[/]") continue b64_data = base64.b64encode(file_data).decode('utf-8') content_blocks.append({"type": "image_url", "image_url": {"url": f"data:application/pdf;base64,{b64_data}"}}) console.print(f"[dim green]✓ PDF attached: {os.path.basename(expanded_path)} ({file_size / 1024:.1f} KB)[/]") # Handle code/text files elif (mime_type == 'text/plain' or file_ext in SUPPORTED_CODE_EXTENSIONS): text_content = file_data.decode('utf-8') content_blocks.append({"type": "text", "text": f"Code File: {os.path.basename(expanded_path)}\n\n{text_content}"}) console.print(f"[dim green]✓ Code file attached: {os.path.basename(expanded_path)} ({file_size / 1024:.1f} KB)[/]") else: console.print(f"[bold red]Unsupported file type ({mime_type}) for {expanded_path}.[/]") console.print("[bold yellow]Supported types: images (PNG, JPG, etc.), PDFs, and code files (.py, .js, etc.)[/]") continue file_attachments.append(file_path) app_logger.info(f"File attached: {os.path.basename(expanded_path)}, Type: {mime_type or file_ext}, Size: {file_size / 1024:.1f} KB") except UnicodeDecodeError: console.print(f"[bold red]Cannot decode {expanded_path} as UTF-8. File may be binary or use unsupported encoding.[/]") app_logger.error(f"UTF-8 decode error for {expanded_path}") continue except Exception as e: console.print(f"[bold red]Error reading file {expanded_path}: {e}[/]") app_logger.error(f"File read error for {expanded_path}: {e}") continue text_part = re.sub(r'@([^\s]+)', '', text_part).strip() # Build message content if text_part or content_blocks: message_content = [] if text_part: message_content.append({"type": "text", "text": text_part}) message_content.extend(content_blocks) else: console.print("[bold red]Prompt cannot be empty.[/]") continue # Build API messages with conversation history if memory is enabled api_messages = [] # Add system prompt if set if session_system_prompt: api_messages.append({"role": "system", "content": session_system_prompt}) # Add conversation history only if memory is enabled (from memory start point onwards) if conversation_memory_enabled: # Only include history from when memory was last enabled for i in range(memory_start_index, len(session_history)): history_entry = session_history[i] api_messages.append({ "role": "user", "content": history_entry['prompt'] }) api_messages.append({ "role": "assistant", "content": history_entry['response'] }) # Add current user message api_messages.append({"role": "user", "content": message_content}) # Build API params with app identification headers (using http_headers) api_params = { "model": selected_model["id"], "messages": api_messages, "stream": STREAM_ENABLED == "on", "http_headers": { "HTTP-Referer": APP_URL, "X-Title": APP_NAME } } if session_max_token > 0: api_params["max_tokens"] = session_max_token if middle_out_enabled: api_params["transforms"] = ["middle-out"] # Log API request file_count = len(file_attachments) history_messages_count = len(session_history) - memory_start_index if conversation_memory_enabled else 0 memory_status = "ON" if conversation_memory_enabled else "OFF" app_logger.info(f"API Request: Model '{selected_model['id']}', Prompt length: {len(text_part)} chars, {file_count} file(s) attached, Memory: {memory_status}, History sent: {history_messages_count} messages, Transforms: middle-out {'enabled' if middle_out_enabled else 'disabled'}, App: {APP_NAME} ({APP_URL}).") # Send and handle response with metrics and timing is_streaming = STREAM_ENABLED == "on" if is_streaming: console.print("[bold green]Streaming response...[/] [dim](Press Ctrl+C to cancel)[/]") console.print("") # Add spacing before response else: console.print("[bold green]Thinking...[/]", end="\r") start_time = time.time() # Start timing request try: response = client.chat.send(**api_params) app_logger.info(f"API call successful for model '{selected_model['id']}'") except Exception as e: console.print(f"[bold red]Error sending request: {e}[/]") app_logger.error(f"API Error: {type(e).__name__}: {e}") continue response_time = time.time() - start_time # Calculate response time full_response = "" if is_streaming: try: # Use Live display for smooth streaming with proper wrapping with Live("", console=console, refresh_per_second=10, auto_refresh=True) as live: for chunk in response: if hasattr(chunk, 'error') and chunk.error: console.print(f"\n[bold red]Stream error: {chunk.error.message}[/]") app_logger.error(f"Stream error: {chunk.error.message}") break if hasattr(chunk.choices[0].delta, 'content') and chunk.choices[0].delta.content: content_chunk = chunk.choices[0].delta.content full_response += content_chunk # Update live display with markdown rendering md = Markdown(full_response) live.update(md) # Add newline after streaming completes console.print("") except KeyboardInterrupt: console.print("\n[bold yellow]Streaming cancelled![/]") app_logger.info("Streaming cancelled by user") continue else: full_response = response.choices[0].message.content if response.choices else "" console.print(f"\r{' ' * 20}\r", end="") if full_response: # Render response with proper markdown formatting if not is_streaming: # Only show panel for non-streaming (streaming already displayed) md = Markdown(full_response) console.print(Panel(md, title="[bold green]AI Response[/]", title_align="left", border_style="green")) session_history.append({'prompt': user_input, 'response': full_response}) current_index = len(session_history) - 1 # Process metrics for per-message display and session tracking usage = getattr(response, 'usage', None) input_tokens = usage.input_tokens if usage and hasattr(usage, 'input_tokens') else 0 output_tokens = usage.output_tokens if usage and hasattr(usage, 'output_tokens') else 0 msg_cost = usage.total_cost_usd if usage and hasattr(usage, 'total_cost_usd') else estimate_cost(input_tokens, output_tokens) total_input_tokens += input_tokens total_output_tokens += output_tokens total_cost += msg_cost message_count += 1 # Log response metrics app_logger.info(f"Response: Tokens - I:{input_tokens} O:{output_tokens} T:{input_tokens + output_tokens}, Cost: ${msg_cost:.4f}, Time: {response_time:.2f}s") # Per-message metrics display with context info if conversation_memory_enabled: context_count = len(session_history) - memory_start_index context_info = f", Context: {context_count} msg(s)" if context_count > 1 else "" else: context_info = ", Memory: OFF" console.print(f"\n[dim blue]📊 Metrics: {input_tokens + output_tokens} tokens | ${msg_cost:.4f} | {response_time:.2f}s{context_info} | Session: {total_input_tokens + total_output_tokens} tokens | ${total_cost:.4f}[/]") # Cost and credit alerts warnings = [] if msg_cost > COST_WARNING_THRESHOLD: warnings.append(f"High cost alert: This response exceeded configurable threshold ${COST_WARNING_THRESHOLD:.4f}") credits_data = get_credits(API_KEY, OPENROUTER_BASE_URL) if credits_data: warning_alerts = check_credit_alerts(credits_data) warnings.extend(warning_alerts) if warnings: warning_text = ' | '.join(warnings) console.print(f"[bold red]⚠️ {warning_text}[/]") app_logger.warning(f"Warnings triggered: {warning_text}") # Add spacing before copy prompt console.print("") try: copy_choice = input("💾 Type 'c' to copy response, or press Enter to continue: ").strip().lower() if copy_choice == "c": pyperclip.copy(full_response) console.print("[bold green]✅ Response copied to clipboard![/]") except (EOFError, KeyboardInterrupt): pass # Add spacing after interaction console.print("") else: console.print("[bold red]No response received.[/]") app_logger.error("No response from API") except KeyboardInterrupt: console.print("\n[bold yellow]Input interrupted. Continuing...[/]") app_logger.warning("Input interrupted by Ctrl+C") continue except EOFError: console.print("\n[bold yellow]Goodbye![/]") total_tokens = total_input_tokens + total_output_tokens app_logger.info(f"Session ended via EOF. Total messages: {message_count}, Total tokens: {total_tokens}, Total cost: ${total_cost:.4f}") return except Exception as e: console.print(f"[bold red]Error: {e}[/]") console.print("[bold yellow]Try again or select a model.[/]") app_logger.error(f"Unexpected error: {type(e).__name__}: {e}") if __name__ == "__main__": clear_screen() app()