#!/usr/bin/python3 -W ignore::DeprecationWarning import sys import os import requests import time import asyncio from pathlib import Path from typing import Optional, List, Dict, Any import typer from rich.console import Console from rich.panel import Panel from rich.table import Table from rich.text import Text from rich.markdown import Markdown from rich.live import Live from openrouter import OpenRouter import pyperclip import mimetypes import base64 import re import sqlite3 import json import datetime import logging from logging.handlers import RotatingFileHandler from prompt_toolkit import PromptSession from prompt_toolkit.history import FileHistory from rich.logging import RichHandler from prompt_toolkit.auto_suggest import AutoSuggestFromHistory from packaging import version as pkg_version import io import platform import shutil import subprocess import fnmatch import signal # MCP imports try: from mcp import ClientSession, StdioServerParameters from mcp.client.stdio import stdio_client MCP_AVAILABLE = True except ImportError: MCP_AVAILABLE = False print("Warning: MCP library not found. Install with: pip install mcp") # App version version = '2.1.0-beta' app = typer.Typer() # Application identification APP_NAME = "oAI" APP_URL = "https://iurl.no/oai" # Paths home = Path.home() config_dir = home / '.config' / 'oai' cache_dir = home / '.cache' / 'oai' history_file = config_dir / 'history.txt' database = config_dir / 'oai_config.db' log_file = config_dir / 'oai.log' # Create dirs config_dir.mkdir(parents=True, exist_ok=True) cache_dir.mkdir(parents=True, exist_ok=True) # Rich console console = Console() # Valid commands VALID_COMMANDS = { '/retry', '/online', '/memory', '/paste', '/export', '/save', '/load', '/delete', '/list', '/prev', '/next', '/stats', '/middleout', '/reset', '/info', '/model', '/maxtoken', '/system', '/config', '/credits', '/clear', '/cl', '/help', '/mcp' } # Command help database (COMPLETE - includes MCP comprehensive guide) COMMAND_HELP = { '/clear': { 'aliases': ['/cl'], 'description': 'Clear the terminal screen for a clean interface.', 'usage': '/clear\n/cl', 'examples': [ ('Clear screen', '/clear'), ('Using short alias', '/cl'), ], 'notes': 'You can also use the keyboard shortcut Ctrl+L to clear the screen.' }, '/help': { 'description': 'Display help information for commands.', 'usage': '/help [command|topic]', 'examples': [ ('Show all commands', '/help'), ('Get help for a specific command', '/help /model'), ('Get detailed MCP help', '/help mcp'), ], 'notes': 'Use /help without arguments to see the full command list, /help for detailed command info, or /help mcp for comprehensive MCP documentation.' }, 'mcp': { 'description': 'Complete guide to MCP (Model Context Protocol) - file access and database querying for AI agents.', 'usage': 'See detailed examples below', 'examples': [], 'notes': ''' MCP (Model Context Protocol) gives your AI assistant direct access to: • Local files and folders (read, search, list) • SQLite databases (inspect, search, query) ╭─────────────────────────────────────────────────────────╮ │ 🗂️ FILE MODE │ ╰─────────────────────────────────────────────────────────╯ SETUP: /mcp enable Start MCP server /mcp add ~/Documents Grant access to folder /mcp add ~/Code/project Add another folder /mcp list View all allowed folders USAGE (just ask naturally): "List all Python files in my Code folder" "Read the contents of Documents/notes.txt" "Search for files containing 'budget'" "What's in my Documents folder?" FEATURES: ✓ Automatically respects .gitignore patterns ✓ Skips virtual environments (venv, node_modules, etc.) ✓ Handles large files (auto-truncates >50KB) ✓ Cross-platform (macOS, Linux, Windows) MANAGEMENT: /mcp remove ~/Desktop Remove folder access /mcp remove 2 Remove by number /mcp gitignore on|off Toggle .gitignore filtering /mcp status Show comprehensive status ╭─────────────────────────────────────────────────────────╮ │ 🗄️ DATABASE MODE │ ╰─────────────────────────────────────────────────────────╯ SETUP: /mcp add db ~/app/data.db Add specific database /mcp add db ~/.config/oai/oai_config.db /mcp db list View all databases SWITCH TO DATABASE: /mcp db 1 Work with database #1 /mcp db 2 Switch to database #2 /mcp files Switch back to file mode USAGE (after selecting a database): "Show me all tables in this database" "Find any records mentioning 'Santa Clause'" "How many users are in the database?" "Show me the schema for the orders table" "Get the 10 most recent transactions" FEATURES: ✓ Read-only mode (no data modification possible) ✓ Smart schema inspection ✓ Full-text search across all tables ✓ SQL query execution (SELECT only) ✓ JOINs, subqueries, CTEs supported ✓ Automatic query timeout (5 seconds) ✓ Result limits (max 1000 rows) SAFETY: • All queries are read-only (INSERT/UPDATE/DELETE blocked) • Database opened in read-only mode • Query validation prevents dangerous operations • Timeout protection prevents infinite loops ╭─────────────────────────────────────────────────────────╮ │ 💡 TIPS & TRICKS │ ╰─────────────────────────────────────────────────────────╯ MODE INDICATORS: [🔧 MCP: Files] You're in file mode [🗄️ MCP: DB #1] You're querying database #1 QUICK REFERENCE: /mcp status See current mode, stats, folders/databases /mcp files Switch to file mode (default) /mcp db Switch to database mode /mcp db list List all databases with details /mcp list List all folders TROUBLESHOOTING: • No results? Check /mcp status to see what's accessible • Wrong mode? Use /mcp files or /mcp db to switch • Database errors? Ensure file exists and is valid SQLite • .gitignore not working? Check /mcp status shows patterns SECURITY NOTES: • MCP only accesses explicitly added folders/databases • File mode: read-only access (no write/delete) • Database mode: SELECT queries only (no modifications) • System directories are blocked automatically • Each addition requires your explicit confirmation For command-specific help: /help /mcp ''' }, '/mcp': { 'description': 'Manage MCP (Model Context Protocol) for local file access and SQLite database querying. When enabled with a function-calling model, AI can automatically search, read, list files, and query databases. Supports two modes: Files (default) and Database.', 'usage': '/mcp [args]', 'examples': [ ('Enable MCP server', '/mcp enable'), ('Disable MCP server', '/mcp disable'), ('Show MCP status and current mode', '/mcp status'), ('', ''), ('━━━ FILE MODE ━━━', ''), ('Add folder for file access', '/mcp add ~/Documents'), ('Remove folder by path', '/mcp remove ~/Desktop'), ('Remove folder by number', '/mcp remove 2'), ('List allowed folders', '/mcp list'), ('Toggle .gitignore filtering', '/mcp gitignore on'), ('', ''), ('━━━ DATABASE MODE ━━━', ''), ('Add SQLite database', '/mcp add db ~/app/data.db'), ('List all databases', '/mcp db list'), ('Switch to database #1', '/mcp db 1'), ('Remove database', '/mcp remove db 2'), ('Switch back to file mode', '/mcp files'), ], 'notes': '''MCP allows AI to read local files and query SQLite databases. Works automatically with function-calling models (GPT-4, Claude, etc.). FILE MODE (default): - Automatically loads and respects .gitignore patterns - Skips virtual environments and build artifacts - Supports search, read, and list operations DATABASE MODE: - Read-only access (no data modification) - Execute SELECT queries with JOINs, subqueries, CTEs - Full-text search across all tables - Schema inspection and data exploration - Automatic query validation and timeout protection Use /help mcp for comprehensive guide with examples.''' }, '/memory': { 'description': 'Toggle conversation memory. When ON, the AI remembers conversation history. When OFF, each request is independent (saves tokens and cost).', 'usage': '/memory [on|off]', 'examples': [ ('Check current memory status', '/memory'), ('Enable conversation memory', '/memory on'), ('Disable memory (save costs)', '/memory off'), ], 'notes': 'Memory is ON by default. Disabling memory reduces API costs but the AI won\'t remember previous messages. Messages are still saved locally for your reference.' }, '/online': { 'description': 'Enable or disable online mode (web search capabilities) for the current session.', 'usage': '/online [on|off]', 'examples': [ ('Check online mode status', '/online'), ('Enable web search', '/online on'), ('Disable web search', '/online off'), ], 'notes': 'Not all models support online mode. The model must have "tools" parameter support. This setting overrides the default online mode configured with /config online.' }, '/paste': { 'description': 'Paste plain text or code from clipboard and send to the AI. Optionally add a prompt.', 'usage': '/paste [prompt]', 'examples': [ ('Paste clipboard content', '/paste'), ('Paste with a question', '/paste Explain this code'), ('Paste and ask for review', '/paste Review this for bugs'), ], 'notes': 'Only plain text is supported. Binary clipboard data will be rejected. The clipboard content is shown as a preview before sending.' }, '/retry': { 'description': 'Resend the last prompt from conversation history.', 'usage': '/retry', 'examples': [ ('Retry last message', '/retry'), ], 'notes': 'Useful when you get an error or want a different response to the same prompt. Requires at least one message in history.' }, '/next': { 'description': 'View the next response in conversation history.', 'usage': '/next', 'examples': [ ('Navigate to next response', '/next'), ], 'notes': 'Navigate through conversation history. Use /prev to go backward.' }, '/prev': { 'description': 'View the previous response in conversation history.', 'usage': '/prev', 'examples': [ ('Navigate to previous response', '/prev'), ], 'notes': 'Navigate through conversation history. Use /next to go forward.' }, '/reset': { 'description': 'Clear conversation history and reset system prompt. This resets all session metrics.', 'usage': '/reset', 'examples': [ ('Reset conversation', '/reset'), ], 'notes': 'Requires confirmation. This clears all message history, resets the system prompt, and resets token/cost counters. Use when starting a completely new conversation topic.' }, '/info': { 'description': 'Display detailed information about a model including pricing, capabilities, context length, and online support.', 'usage': '/info [model_id]', 'examples': [ ('Show current model info', '/info'), ('Show specific model info', '/info gpt-4o'), ('Check model capabilities', '/info claude-3-opus'), ], 'notes': 'Without arguments, shows info for the currently selected model. Displays pricing per million tokens, supported modalities (text, image, etc.), and parameter support.' }, '/model': { 'description': 'Select or change the AI model for the current session. Shows image and online capabilities.', 'usage': '/model [search_term]', 'examples': [ ('List all models', '/model'), ('Search for GPT models', '/model gpt'), ('Search for Claude models', '/model claude'), ], 'notes': 'Models are numbered for easy selection. The table shows Image (✓ if model accepts images) and Online (✓ if model supports web search) columns.' }, '/config': { 'description': 'View or modify application configuration settings.', 'usage': '/config [setting] [value]', 'examples': [ ('View all settings', '/config'), ('Set API key', '/config api'), ('Set default model', '/config model'), ('Enable streaming', '/config stream on'), ('Set cost warning threshold', '/config costwarning 0.05'), ('Set log level', '/config loglevel debug'), ('Set default online mode', '/config online on'), ], 'notes': 'Available settings: api (API key), url (base URL), model (default model), stream (on/off), costwarning (threshold $), maxtoken (limit), online (default on/off), log (size MB), loglevel (debug/info/warning/error/critical).' }, '/maxtoken': { 'description': 'Set a temporary session token limit (cannot exceed stored max token limit).', 'usage': '/maxtoken [value]', 'examples': [ ('View current session limit', '/maxtoken'), ('Set session limit to 2000', '/maxtoken 2000'), ('Set to 50000', '/maxtoken 50000'), ], 'notes': 'This is a session-only setting and cannot exceed the stored max token limit (set with /config maxtoken). View without arguments to see current value.' }, '/system': { 'description': 'Set or clear the session-level system prompt to guide AI behavior.', 'usage': '/system [prompt|clear]', 'examples': [ ('View current system prompt', '/system'), ('Set as Python expert', '/system You are a Python expert'), ('Set as code reviewer', '/system You are a senior code reviewer. Focus on bugs and best practices.'), ('Clear system prompt', '/system clear'), ], 'notes': 'System prompts influence how the AI responds throughout the session. Use "clear" to remove the current system prompt.' }, '/save': { 'description': 'Save the current conversation history to the database.', 'usage': '/save ', 'examples': [ ('Save conversation', '/save my_chat'), ('Save with descriptive name', '/save python_debugging_2024'), ], 'notes': 'Saved conversations can be loaded later with /load. Use descriptive names to easily find them later.' }, '/load': { 'description': 'Load a saved conversation from the database by name or number.', 'usage': '/load ', 'examples': [ ('Load by name', '/load my_chat'), ('Load by number from /list', '/load 3'), ], 'notes': 'Use /list to see numbered conversations. Loading a conversation replaces current history and resets session metrics.' }, '/delete': { 'description': 'Delete a saved conversation from the database. Requires confirmation.', 'usage': '/delete ', 'examples': [ ('Delete by name', '/delete my_chat'), ('Delete by number from /list', '/delete 3'), ], 'notes': 'Use /list to see numbered conversations. This action cannot be undone!' }, '/list': { 'description': 'List all saved conversations with numbers, message counts, and timestamps.', 'usage': '/list', 'examples': [ ('Show saved conversations', '/list'), ], 'notes': 'Conversations are numbered for easy use with /load and /delete commands. Shows message count and last saved time.' }, '/export': { 'description': 'Export the current conversation to a file in various formats.', 'usage': '/export ', 'examples': [ ('Export as Markdown', '/export md notes.md'), ('Export as JSON', '/export json conversation.json'), ('Export as HTML', '/export html report.html'), ], 'notes': 'Available formats: md (Markdown), json (JSON), html (HTML). The export includes all messages and the system prompt if set.' }, '/stats': { 'description': 'Display session statistics including tokens, costs, and credits.', 'usage': '/stats', 'examples': [ ('View session statistics', '/stats'), ], 'notes': 'Shows total input/output tokens, total cost, average cost per message, and remaining credits. Also displays credit warnings if applicable.' }, '/credits': { 'description': 'Display your OpenRouter account credits and usage.', 'usage': '/credits', 'examples': [ ('Check credits', '/credits'), ], 'notes': 'Shows total credits, used credits, and credits left. Displays warnings if credits are low (< $1 or < 10% of total).' }, '/middleout': { 'description': 'Enable or disable middle-out transform to compress prompts exceeding context size.', 'usage': '/middleout [on|off]', 'examples': [ ('Check status', '/middleout'), ('Enable compression', '/middleout on'), ('Disable compression', '/middleout off'), ], 'notes': 'Middle-out transform intelligently compresses long prompts to fit within model context limits. Useful for very long conversations.' }, } # Supported code extensions SUPPORTED_CODE_EXTENSIONS = { '.py', '.js', '.ts', '.cs', '.java', '.c', '.cpp', '.h', '.hpp', '.rb', '.ruby', '.php', '.swift', '.kt', '.kts', '.go', '.sh', '.bat', '.ps1', '.R', '.scala', '.pl', '.lua', '.dart', '.elm', '.xml', '.json', '.yaml', '.yml', '.md', '.txt' } # Pricing MODEL_PRICING = { 'input': 3.0, 'output': 15.0 } LOW_CREDIT_RATIO = 0.1 LOW_CREDIT_AMOUNT = 1.0 HIGH_COST_WARNING = "cost_warning_threshold" # Valid log levels VALID_LOG_LEVELS = { 'debug': logging.DEBUG, 'info': logging.INFO, 'warning': logging.WARNING, 'error': logging.ERROR, 'critical': logging.CRITICAL } # System directories to block (cross-platform) SYSTEM_DIRS_BLACKLIST = { '/System', '/Library', '/private', '/usr', '/bin', '/sbin', # macOS '/boot', '/dev', '/proc', '/sys', '/root', # Linux 'C:\\Windows', 'C:\\Program Files', 'C:\\Program Files (x86)' # Windows } # Database functions def create_table_if_not_exists(): """Ensure tables exist.""" os.makedirs(config_dir, exist_ok=True) with sqlite3.connect(str(database)) as conn: conn.execute('''CREATE TABLE IF NOT EXISTS config ( key TEXT PRIMARY KEY, value TEXT NOT NULL )''') conn.execute('''CREATE TABLE IF NOT EXISTS conversation_sessions ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, timestamp TEXT NOT NULL, data TEXT NOT NULL )''') # MCP configuration table conn.execute('''CREATE TABLE IF NOT EXISTS mcp_config ( key TEXT PRIMARY KEY, value TEXT NOT NULL )''') # MCP statistics table conn.execute('''CREATE TABLE IF NOT EXISTS mcp_stats ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TEXT NOT NULL, tool_name TEXT NOT NULL, folder TEXT, success INTEGER NOT NULL, error_message TEXT )''') # MCP databases table conn.execute('''CREATE TABLE IF NOT EXISTS mcp_databases ( id INTEGER PRIMARY KEY AUTOINCREMENT, path TEXT NOT NULL UNIQUE, name TEXT NOT NULL, size INTEGER, tables TEXT, added_timestamp TEXT NOT NULL )''') conn.commit() def get_config(key: str) -> Optional[str]: create_table_if_not_exists() with sqlite3.connect(str(database)) as conn: cursor = conn.execute('SELECT value FROM config WHERE key = ?', (key,)) result = cursor.fetchone() return result[0] if result else None def set_config(key: str, value: str): create_table_if_not_exists() with sqlite3.connect(str(database)) as conn: conn.execute('INSERT OR REPLACE INTO config (key, value) VALUES (?, ?)', (key, value)) conn.commit() def get_mcp_config(key: str) -> Optional[str]: """Get MCP configuration value.""" create_table_if_not_exists() with sqlite3.connect(str(database)) as conn: cursor = conn.execute('SELECT value FROM mcp_config WHERE key = ?', (key,)) result = cursor.fetchone() return result[0] if result else None def set_mcp_config(key: str, value: str): """Set MCP configuration value.""" create_table_if_not_exists() with sqlite3.connect(str(database)) as conn: conn.execute('INSERT OR REPLACE INTO mcp_config (key, value) VALUES (?, ?)', (key, value)) conn.commit() def log_mcp_stat(tool_name: str, folder: Optional[str], success: bool, error_message: Optional[str] = None): """Log MCP tool usage statistics.""" create_table_if_not_exists() timestamp = datetime.datetime.now().isoformat() with sqlite3.connect(str(database)) as conn: conn.execute( 'INSERT INTO mcp_stats (timestamp, tool_name, folder, success, error_message) VALUES (?, ?, ?, ?, ?)', (timestamp, tool_name, folder, 1 if success else 0, error_message) ) conn.commit() def get_mcp_stats() -> Dict[str, Any]: """Get MCP usage statistics.""" create_table_if_not_exists() with sqlite3.connect(str(database)) as conn: cursor = conn.execute(''' SELECT COUNT(*) as total_calls, SUM(CASE WHEN tool_name = 'read_file' THEN 1 ELSE 0 END) as reads, SUM(CASE WHEN tool_name = 'list_directory' THEN 1 ELSE 0 END) as lists, SUM(CASE WHEN tool_name = 'search_files' THEN 1 ELSE 0 END) as searches, SUM(CASE WHEN tool_name = 'inspect_database' THEN 1 ELSE 0 END) as db_inspects, SUM(CASE WHEN tool_name = 'search_database' THEN 1 ELSE 0 END) as db_searches, SUM(CASE WHEN tool_name = 'query_database' THEN 1 ELSE 0 END) as db_queries, MAX(timestamp) as last_used FROM mcp_stats ''') row = cursor.fetchone() return { 'total_calls': row[0] or 0, 'reads': row[1] or 0, 'lists': row[2] or 0, 'searches': row[3] or 0, 'db_inspects': row[4] or 0, 'db_searches': row[5] or 0, 'db_queries': row[6] or 0, 'last_used': row[7] } # Rotating Rich Handler class RotatingRichHandler(RotatingFileHandler): """Custom handler combining RotatingFileHandler with Rich formatting.""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.rich_console = Console(file=io.StringIO(), width=120, force_terminal=False) self.rich_handler = RichHandler( console=self.rich_console, show_time=True, show_path=True, rich_tracebacks=True, tracebacks_suppress=['requests', 'openrouter', 'urllib3', 'httpx', 'openai'] ) def emit(self, record): try: self.rich_handler.emit(record) output = self.rich_console.file.getvalue() self.rich_console.file.seek(0) self.rich_console.file.truncate(0) if output: self.stream.write(output) self.flush() except Exception: self.handleError(record) # Logging setup LOG_MAX_SIZE_MB = int(get_config('log_max_size_mb') or "10") LOG_BACKUP_COUNT = int(get_config('log_backup_count') or "2") LOG_LEVEL_STR = get_config('log_level') or "info" LOG_LEVEL = VALID_LOG_LEVELS.get(LOG_LEVEL_STR.lower(), logging.INFO) app_handler = None app_logger = None def setup_logging(): """Setup or reset logging configuration.""" global app_handler, LOG_MAX_SIZE_MB, LOG_BACKUP_COUNT, LOG_LEVEL, app_logger root_logger = logging.getLogger() if app_handler is not None: root_logger.removeHandler(app_handler) try: app_handler.close() except: pass # Check if log needs rotation if os.path.exists(log_file): current_size = os.path.getsize(log_file) max_bytes = LOG_MAX_SIZE_MB * 1024 * 1024 if current_size >= max_bytes: import shutil timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S') backup_file = f"{log_file}.{timestamp}" try: shutil.move(str(log_file), backup_file) except Exception as e: print(f"Warning: Could not rotate log file: {e}") # Clean old backups log_dir = os.path.dirname(log_file) log_basename = os.path.basename(log_file) backup_pattern = f"{log_basename}.*" import glob backups = sorted(glob.glob(os.path.join(log_dir, backup_pattern))) while len(backups) > LOG_BACKUP_COUNT: oldest = backups.pop(0) try: os.remove(oldest) except: pass app_handler = RotatingRichHandler( filename=str(log_file), maxBytes=LOG_MAX_SIZE_MB * 1024 * 1024, backupCount=LOG_BACKUP_COUNT, encoding='utf-8' ) app_handler.setLevel(logging.NOTSET) root_logger.setLevel(logging.WARNING) root_logger.addHandler(app_handler) # Suppress noisy loggers logging.getLogger('asyncio').setLevel(logging.WARNING) logging.getLogger('urllib3').setLevel(logging.WARNING) logging.getLogger('requests').setLevel(logging.WARNING) logging.getLogger('httpx').setLevel(logging.WARNING) logging.getLogger('httpcore').setLevel(logging.WARNING) logging.getLogger('openai').setLevel(logging.WARNING) logging.getLogger('openrouter').setLevel(logging.WARNING) app_logger = logging.getLogger("oai_app") app_logger.setLevel(LOG_LEVEL) app_logger.propagate = True return app_logger def set_log_level(level_str: str) -> bool: """Set application log level.""" global LOG_LEVEL, LOG_LEVEL_STR, app_logger level_str_lower = level_str.lower() if level_str_lower not in VALID_LOG_LEVELS: return False LOG_LEVEL = VALID_LOG_LEVELS[level_str_lower] LOG_LEVEL_STR = level_str_lower if app_logger: app_logger.setLevel(LOG_LEVEL) return True def reload_logging_config(): """Reload logging configuration.""" global LOG_MAX_SIZE_MB, LOG_BACKUP_COUNT, LOG_LEVEL, LOG_LEVEL_STR, app_logger LOG_MAX_SIZE_MB = int(get_config('log_max_size_mb') or "10") LOG_BACKUP_COUNT = int(get_config('log_backup_count') or "2") LOG_LEVEL_STR = get_config('log_level') or "info" LOG_LEVEL = VALID_LOG_LEVELS.get(LOG_LEVEL_STR.lower(), logging.INFO) app_logger = setup_logging() return app_logger app_logger = setup_logging() logger = logging.getLogger(__name__) # Load configs API_KEY = get_config('api_key') OPENROUTER_BASE_URL = get_config('base_url') or "https://openrouter.ai/api/v1" STREAM_ENABLED = get_config('stream_enabled') or "on" DEFAULT_MODEL_ID = get_config('default_model') MAX_TOKEN = int(get_config('max_token') or "100000") COST_WARNING_THRESHOLD = float(get_config(HIGH_COST_WARNING) or "0.01") DEFAULT_ONLINE_MODE = get_config('default_online_mode') or "off" # Fetch models models_data = [] text_models = [] try: headers = { "Authorization": f"Bearer {API_KEY}", "HTTP-Referer": APP_URL, "X-Title": APP_NAME } if API_KEY else { "HTTP-Referer": APP_URL, "X-Title": APP_NAME } response = requests.get(f"{OPENROUTER_BASE_URL}/models", headers=headers) response.raise_for_status() models_data = response.json()["data"] text_models = [m for m in models_data if "modalities" not in m or "video" not in (m.get("modalities") or [])] selected_model_default = None if DEFAULT_MODEL_ID: selected_model_default = next((m for m in text_models if m["id"] == DEFAULT_MODEL_ID), None) if not selected_model_default: console.print(f"[bold yellow]Warning: Default model '{DEFAULT_MODEL_ID}' unavailable. Use '/config model'.[/]") except Exception as e: models_data = [] text_models = [] app_logger.error(f"Failed to fetch models: {e}") # ============================================================================ # MCP INTEGRATION CLASSES # ============================================================================ class CrossPlatformMCPConfig: """Handle OS-specific MCP configuration.""" def __init__(self): self.system = platform.system() self.is_macos = self.system == "Darwin" self.is_linux = self.system == "Linux" self.is_windows = self.system == "Windows" app_logger.info(f"Detected OS: {self.system}") def get_default_allowed_dirs(self) -> List[Path]: """Get safe default directories.""" home = Path.home() if self.is_macos: return [ home / "Documents", home / "Desktop", home / "Downloads" ] elif self.is_linux: dirs = [home / "Documents"] try: for xdg_dir in ["DOCUMENTS", "DESKTOP", "DOWNLOAD"]: result = subprocess.run( ["xdg-user-dir", xdg_dir], capture_output=True, text=True, timeout=1 ) if result.returncode == 0: dir_path = Path(result.stdout.strip()) if dir_path.exists(): dirs.append(dir_path) except: dirs.extend([ home / "Desktop", home / "Downloads" ]) return list(set(dirs)) elif self.is_windows: return [ home / "Documents", home / "Desktop", home / "Downloads" ] return [home] def get_python_command(self) -> str: """Get Python command.""" import sys return sys.executable def get_filesystem_warning(self) -> str: """Get OS-specific security warning.""" if self.is_macos: return """ ⚠️ macOS Security Notice: The Filesystem MCP server needs access to your selected folder. You may see a security prompt - click 'Allow' to proceed. (System Settings > Privacy & Security > Files and Folders) """ elif self.is_linux: return """ ⚠️ Linux Security Notice: The Filesystem MCP server will access your selected folder. Ensure oAI has appropriate file permissions. """ elif self.is_windows: return """ ⚠️ Windows Security Notice: The Filesystem MCP server will access your selected folder. You may need to grant file access permissions. """ return "" def normalize_path(self, path: str) -> Path: """Normalize path for current OS.""" return Path(os.path.expanduser(path)).resolve() def is_system_directory(self, path: Path) -> bool: """Check if path is a system directory.""" path_str = str(path) for blocked in SYSTEM_DIRS_BLACKLIST: if path_str.startswith(blocked): return True return False def is_safe_path(self, requested_path: Path, allowed_dirs: List[Path]) -> bool: """Check if path is within allowed directories.""" try: requested = requested_path.resolve() for allowed in allowed_dirs: try: allowed_resolved = allowed.resolve() requested.relative_to(allowed_resolved) return True except ValueError: continue return False except: return False def get_folder_stats(self, folder: Path) -> Dict[str, Any]: """Get folder statistics.""" try: if not folder.exists() or not folder.is_dir(): return {'exists': False} file_count = 0 total_size = 0 for item in folder.rglob('*'): if item.is_file(): file_count += 1 try: total_size += item.stat().st_size except: pass return { 'exists': True, 'file_count': file_count, 'total_size': total_size, 'size_mb': total_size / (1024 * 1024) } except Exception as e: app_logger.error(f"Error getting folder stats for {folder}: {e}") return {'exists': False, 'error': str(e)} class GitignoreParser: """Parse .gitignore files and check if paths should be ignored.""" def __init__(self): self.patterns = [] # List of (pattern, is_negation, source_dir) def add_gitignore(self, gitignore_path: Path): """Parse and add patterns from a .gitignore file.""" if not gitignore_path.exists(): return try: source_dir = gitignore_path.parent with open(gitignore_path, 'r', encoding='utf-8') as f: for line_num, line in enumerate(f, 1): line = line.rstrip('\n\r') # Skip empty lines and comments if not line or line.startswith('#'): continue # Check for negation pattern is_negation = line.startswith('!') if is_negation: line = line[1:] # Remove leading slash (make relative to gitignore location) if line.startswith('/'): line = line[1:] self.patterns.append((line, is_negation, source_dir)) app_logger.debug(f"Loaded {len(self.patterns)} patterns from {gitignore_path}") except Exception as e: app_logger.warning(f"Error reading {gitignore_path}: {e}") def should_ignore(self, path: Path) -> bool: """Check if a path should be ignored based on gitignore patterns.""" if not self.patterns: return False ignored = False for pattern, is_negation, source_dir in self.patterns: # Only apply pattern if path is under the source directory try: rel_path = path.relative_to(source_dir) except ValueError: # Path is not relative to this gitignore's directory continue rel_path_str = str(rel_path) # Check if pattern matches if self._match_pattern(pattern, rel_path_str, path.is_dir()): if is_negation: ignored = False # Negation patterns un-ignore else: ignored = True return ignored def _match_pattern(self, pattern: str, path: str, is_dir: bool) -> bool: """Match a gitignore pattern against a path.""" # Directory-only pattern (ends with /) if pattern.endswith('/'): if not is_dir: return False pattern = pattern[:-1] # ** matches any number of directories if '**' in pattern: # Convert ** to regex-like pattern pattern_parts = pattern.split('**') if len(pattern_parts) == 2: prefix, suffix = pattern_parts # Match if path starts with prefix and ends with suffix if prefix: if not path.startswith(prefix.rstrip('/')): return False if suffix: suffix = suffix.lstrip('/') if not (path.endswith(suffix) or f'/{suffix}' in path): return False return True # Direct match if fnmatch.fnmatch(path, pattern): return True # Match as subdirectory pattern if '/' not in pattern: # Pattern without / matches in any directory parts = path.split('/') if any(fnmatch.fnmatch(part, pattern) for part in parts): return True return False class SQLiteQueryValidator: """Validate SQLite queries for read-only safety.""" @staticmethod def is_safe_query(query: str) -> tuple[bool, str]: """ Validate that query is a safe read-only SELECT. Returns (is_safe, error_message) """ query_upper = query.strip().upper() # Must start with SELECT or WITH (for CTEs) if not (query_upper.startswith('SELECT') or query_upper.startswith('WITH')): return False, "Only SELECT queries are allowed (including WITH/CTE)" # Block dangerous keywords even in SELECT dangerous_keywords = [ 'INSERT', 'UPDATE', 'DELETE', 'DROP', 'CREATE', 'ALTER', 'TRUNCATE', 'REPLACE', 'ATTACH', 'DETACH', 'PRAGMA', 'VACUUM', 'REINDEX' ] # Check for dangerous keywords (but allow them in string literals) # Simple check: look for keywords outside of quotes query_no_strings = re.sub(r"'[^']*'", '', query_upper) query_no_strings = re.sub(r'"[^"]*"', '', query_no_strings) for keyword in dangerous_keywords: if re.search(r'\b' + keyword + r'\b', query_no_strings): return False, f"Keyword '{keyword}' not allowed in read-only mode" return True, "" # P2 class MCPFilesystemServer: """MCP Filesystem Server with file access and SQLite database querying.""" def __init__(self, allowed_folders: List[Path]): self.allowed_folders = allowed_folders self.config = CrossPlatformMCPConfig() self.max_file_size = 10 * 1024 * 1024 # 10MB limit self.max_list_items = 1000 # Max items to return in list_directory self.respect_gitignore = True # Default enabled # Initialize gitignore parser self.gitignore_parser = GitignoreParser() self._load_gitignores() # SQLite configuration self.max_query_timeout = 5 # seconds self.max_query_results = 1000 # max rows self.default_query_limit = 100 # default rows app_logger.info(f"MCP Filesystem Server initialized with {len(allowed_folders)} folders") def _load_gitignores(self): """Load all .gitignore files from allowed folders.""" gitignore_count = 0 for folder in self.allowed_folders: if not folder.exists(): continue # Load root .gitignore root_gitignore = folder / '.gitignore' if root_gitignore.exists(): self.gitignore_parser.add_gitignore(root_gitignore) gitignore_count += 1 app_logger.info(f"Loaded .gitignore from {folder}") # Load nested .gitignore files try: for gitignore_path in folder.rglob('.gitignore'): if gitignore_path != root_gitignore: self.gitignore_parser.add_gitignore(gitignore_path) gitignore_count += 1 app_logger.debug(f"Loaded nested .gitignore: {gitignore_path}") except Exception as e: app_logger.warning(f"Error loading nested .gitignores from {folder}: {e}") if gitignore_count > 0: app_logger.info(f"Loaded {gitignore_count} .gitignore file(s) with {len(self.gitignore_parser.patterns)} total patterns") def reload_gitignores(self): """Reload all .gitignore files (call when allowed_folders changes).""" self.gitignore_parser = GitignoreParser() self._load_gitignores() app_logger.info("Reloaded .gitignore patterns") def is_allowed_path(self, path: Path) -> bool: """Check if path is allowed.""" return self.config.is_safe_path(path, self.allowed_folders) async def read_file(self, file_path: str) -> Dict[str, Any]: """Read file contents.""" try: path = self.config.normalize_path(file_path) if not self.is_allowed_path(path): error_msg = f"Access denied: {path} not in allowed folders" app_logger.warning(error_msg) log_mcp_stat('read_file', str(path.parent), False, error_msg) return {'error': error_msg} if not path.exists(): error_msg = f"File not found: {path}" app_logger.warning(error_msg) log_mcp_stat('read_file', str(path.parent), False, error_msg) return {'error': error_msg} if not path.is_file(): error_msg = f"Not a file: {path}" app_logger.warning(error_msg) log_mcp_stat('read_file', str(path.parent), False, error_msg) return {'error': error_msg} # Check if file should be ignored if self.respect_gitignore and self.gitignore_parser.should_ignore(path): error_msg = f"File ignored by .gitignore: {path}" app_logger.warning(error_msg) log_mcp_stat('read_file', str(path.parent), False, error_msg) return {'error': error_msg} file_size = path.stat().st_size if file_size > self.max_file_size: error_msg = f"File too large: {file_size / (1024*1024):.1f}MB (max: {self.max_file_size / (1024*1024):.0f}MB)" app_logger.warning(error_msg) log_mcp_stat('read_file', str(path.parent), False, error_msg) return {'error': error_msg} # Try to read as text try: content = path.read_text(encoding='utf-8') # If file is large (>50KB), provide summary instead of full content max_content_size = 50 * 1024 # 50KB if file_size > max_content_size: lines = content.split('\n') total_lines = len(lines) # Return first 500 lines + last 100 lines with notice head_lines = 500 tail_lines = 100 if total_lines > (head_lines + tail_lines): truncated_content = ( '\n'.join(lines[:head_lines]) + f"\n\n... [TRUNCATED: {total_lines - head_lines - tail_lines} lines omitted to stay within size limits] ...\n\n" + '\n'.join(lines[-tail_lines:]) ) app_logger.info(f"Read file (truncated): {path} ({file_size} bytes, {total_lines} lines)") log_mcp_stat('read_file', str(path.parent), True) return { 'content': truncated_content, 'path': str(path), 'size': file_size, 'truncated': True, 'total_lines': total_lines, 'lines_shown': head_lines + tail_lines, 'note': f'File truncated: showing first {head_lines} and last {tail_lines} lines of {total_lines} total' } app_logger.info(f"Read file: {path} ({file_size} bytes)") log_mcp_stat('read_file', str(path.parent), True) return { 'content': content, 'path': str(path), 'size': file_size } except UnicodeDecodeError: error_msg = f"Cannot decode file as UTF-8: {path}" app_logger.warning(error_msg) log_mcp_stat('read_file', str(path.parent), False, error_msg) return {'error': error_msg} except Exception as e: error_msg = f"Error reading file: {e}" app_logger.error(error_msg) log_mcp_stat('read_file', file_path, False, str(e)) return {'error': error_msg} async def list_directory(self, dir_path: str, recursive: bool = False) -> Dict[str, Any]: """List directory contents.""" try: path = self.config.normalize_path(dir_path) if not self.is_allowed_path(path): error_msg = f"Access denied: {path} not in allowed folders" app_logger.warning(error_msg) log_mcp_stat('list_directory', str(path), False, error_msg) return {'error': error_msg} if not path.exists(): error_msg = f"Directory not found: {path}" app_logger.warning(error_msg) log_mcp_stat('list_directory', str(path), False, error_msg) return {'error': error_msg} if not path.is_dir(): error_msg = f"Not a directory: {path}" app_logger.warning(error_msg) log_mcp_stat('list_directory', str(path), False, error_msg) return {'error': error_msg} items = [] pattern = '**/*' if recursive else '*' # Directories to skip (hardcoded common patterns) skip_dirs = { '.venv', 'venv', 'env', 'virtualenv', 'site-packages', 'dist-packages', '__pycache__', '.pytest_cache', '.mypy_cache', 'node_modules', '.git', '.svn', '.idea', '.vscode', 'build', 'dist', 'eggs', '.eggs' } def should_skip_path(item_path: Path) -> bool: """Check if path should be skipped.""" # Check hardcoded skip directories path_parts = item_path.parts if any(part in skip_dirs for part in path_parts): return True # Check gitignore patterns (if enabled) if self.respect_gitignore and self.gitignore_parser.should_ignore(item_path): return True return False for item in path.glob(pattern): # Stop if we hit the limit if len(items) >= self.max_list_items: break # Skip excluded directories if should_skip_path(item): continue try: stat = item.stat() items.append({ 'name': item.name, 'path': str(item), 'type': 'directory' if item.is_dir() else 'file', 'size': stat.st_size if item.is_file() else 0, 'modified': datetime.datetime.fromtimestamp(stat.st_mtime).isoformat() }) except: continue truncated = len(items) >= self.max_list_items app_logger.info(f"Listed directory: {path} ({len(items)} items, recursive={recursive}, truncated={truncated})") log_mcp_stat('list_directory', str(path), True) result = { 'path': str(path), 'items': items, 'count': len(items), 'truncated': truncated } if truncated: result['note'] = f'Results limited to {self.max_list_items} items. Use more specific search path or disable recursive mode.' return result except Exception as e: error_msg = f"Error listing directory: {e}" app_logger.error(error_msg) log_mcp_stat('list_directory', dir_path, False, str(e)) return {'error': error_msg} async def search_files(self, pattern: str, search_path: Optional[str] = None, content_search: bool = False) -> Dict[str, Any]: """Search for files.""" try: # Determine search roots if search_path: path = self.config.normalize_path(search_path) if not self.is_allowed_path(path): error_msg = f"Access denied: {path} not in allowed folders" app_logger.warning(error_msg) log_mcp_stat('search_files', str(path), False, error_msg) return {'error': error_msg} search_roots = [path] else: search_roots = self.allowed_folders matches = [] # Directories to skip (virtual environments, caches, etc.) skip_dirs = { '.venv', 'venv', 'env', 'virtualenv', 'site-packages', 'dist-packages', '__pycache__', '.pytest_cache', '.mypy_cache', 'node_modules', '.git', '.svn', '.idea', '.vscode', 'build', 'dist', 'eggs', '.eggs' } def should_skip_path(item_path: Path) -> bool: """Check if path should be skipped.""" # Check hardcoded skip directories path_parts = item_path.parts if any(part in skip_dirs for part in path_parts): return True # Check gitignore patterns (if enabled) if self.respect_gitignore and self.gitignore_parser.should_ignore(item_path): return True return False for root in search_roots: if not root.exists(): continue # Filename search if not content_search: for item in root.rglob(pattern): if item.is_file() and not should_skip_path(item): try: stat = item.stat() matches.append({ 'path': str(item), 'name': item.name, 'size': stat.st_size, 'modified': datetime.datetime.fromtimestamp(stat.st_mtime).isoformat() }) except: continue else: # Content search (slower) for item in root.rglob('*'): if item.is_file() and not should_skip_path(item): try: # Skip large files if item.stat().st_size > self.max_file_size: continue # Try to read and search content try: content = item.read_text(encoding='utf-8') if pattern.lower() in content.lower(): stat = item.stat() matches.append({ 'path': str(item), 'name': item.name, 'size': stat.st_size, 'modified': datetime.datetime.fromtimestamp(stat.st_mtime).isoformat() }) except: continue except: continue search_type = "content" if content_search else "filename" app_logger.info(f"Searched files: pattern='{pattern}', type={search_type}, found={len(matches)}") log_mcp_stat('search_files', str(search_roots[0]) if search_roots else None, True) return { 'pattern': pattern, 'search_type': search_type, 'matches': matches, 'count': len(matches) } except Exception as e: error_msg = f"Error searching files: {e}" app_logger.error(error_msg) log_mcp_stat('search_files', search_path or "all", False, str(e)) return {'error': error_msg} # ======================================================================== # SQLite DATABASE METHODS # ======================================================================== async def inspect_database(self, db_path: str, table_name: Optional[str] = None) -> Dict[str, Any]: """Inspect SQLite database schema.""" try: path = Path(db_path).resolve() if not path.exists(): error_msg = f"Database not found: {path}" app_logger.warning(error_msg) log_mcp_stat('inspect_database', str(path.parent), False, error_msg) return {'error': error_msg} if not path.is_file(): error_msg = f"Not a file: {path}" app_logger.warning(error_msg) log_mcp_stat('inspect_database', str(path.parent), False, error_msg) return {'error': error_msg} # Open database in read-only mode try: conn = sqlite3.connect(f'file:{path}?mode=ro', uri=True) cursor = conn.cursor() except sqlite3.DatabaseError as e: error_msg = f"Not a valid SQLite database: {path} ({e})" app_logger.warning(error_msg) log_mcp_stat('inspect_database', str(path.parent), False, error_msg) return {'error': error_msg} try: if table_name: # Get info for specific table cursor.execute("SELECT sql FROM sqlite_master WHERE type='table' AND name=?", (table_name,)) result = cursor.fetchone() if not result: return {'error': f"Table '{table_name}' not found"} create_sql = result[0] # Get column info cursor.execute(f"PRAGMA table_info({table_name})") columns = [] for row in cursor.fetchall(): columns.append({ 'id': row[0], 'name': row[1], 'type': row[2], 'not_null': bool(row[3]), 'default_value': row[4], 'primary_key': bool(row[5]) }) # Get indexes cursor.execute(f"PRAGMA index_list({table_name})") indexes = [] for row in cursor.fetchall(): indexes.append({ 'name': row[1], 'unique': bool(row[2]) }) # Get row count cursor.execute(f"SELECT COUNT(*) FROM {table_name}") row_count = cursor.fetchone()[0] app_logger.info(f"Inspected table: {table_name} in {path}") log_mcp_stat('inspect_database', str(path.parent), True) return { 'database': str(path), 'table': table_name, 'create_sql': create_sql, 'columns': columns, 'indexes': indexes, 'row_count': row_count } else: # Get all tables cursor.execute("SELECT name, sql FROM sqlite_master WHERE type='table' ORDER BY name") tables = [] for row in cursor.fetchall(): table = row[0] # Get row count cursor.execute(f"SELECT COUNT(*) FROM {table}") count = cursor.fetchone()[0] tables.append({ 'name': table, 'row_count': count }) # Get indexes cursor.execute("SELECT name, tbl_name FROM sqlite_master WHERE type='index' ORDER BY name") indexes = [{'name': row[0], 'table': row[1]} for row in cursor.fetchall()] # Get triggers cursor.execute("SELECT name, tbl_name FROM sqlite_master WHERE type='trigger' ORDER BY name") triggers = [{'name': row[0], 'table': row[1]} for row in cursor.fetchall()] # Get database size db_size = path.stat().st_size app_logger.info(f"Inspected database: {path} ({len(tables)} tables)") log_mcp_stat('inspect_database', str(path.parent), True) return { 'database': str(path), 'size': db_size, 'size_mb': db_size / (1024 * 1024), 'tables': tables, 'table_count': len(tables), 'indexes': indexes, 'triggers': triggers } finally: conn.close() except Exception as e: error_msg = f"Error inspecting database: {e}" app_logger.error(error_msg) log_mcp_stat('inspect_database', db_path, False, str(e)) return {'error': error_msg} async def search_database(self, db_path: str, search_term: str, table_name: Optional[str] = None, column_name: Optional[str] = None) -> Dict[str, Any]: """Search for a value across database tables.""" try: path = Path(db_path).resolve() if not path.exists(): error_msg = f"Database not found: {path}" app_logger.warning(error_msg) log_mcp_stat('search_database', str(path.parent), False, error_msg) return {'error': error_msg} # Open database in read-only mode try: conn = sqlite3.connect(f'file:{path}?mode=ro', uri=True) cursor = conn.cursor() except sqlite3.DatabaseError as e: error_msg = f"Not a valid SQLite database: {path} ({e})" app_logger.warning(error_msg) log_mcp_stat('search_database', str(path.parent), False, error_msg) return {'error': error_msg} try: matches = [] # Get tables to search if table_name: cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name=?", (table_name,)) tables = [row[0] for row in cursor.fetchall()] if not tables: return {'error': f"Table '{table_name}' not found"} else: cursor.execute("SELECT name FROM sqlite_master WHERE type='table'") tables = [row[0] for row in cursor.fetchall()] for table in tables: # Get columns for this table cursor.execute(f"PRAGMA table_info({table})") columns = [row[1] for row in cursor.fetchall()] # Filter columns if specified if column_name: if column_name not in columns: continue columns = [column_name] # Search in each column for column in columns: try: # Use LIKE for partial matching query = f"SELECT * FROM {table} WHERE {column} LIKE ? LIMIT {self.default_query_limit}" cursor.execute(query, (f'%{search_term}%',)) results = cursor.fetchall() if results: # Get column names col_names = [desc[0] for desc in cursor.description] for row in results: # Convert row to dict, handling binary data row_dict = {} for col_name, value in zip(col_names, row): if isinstance(value, bytes): # Convert bytes to hex string or base64 try: # Try to decode as UTF-8 first row_dict[col_name] = value.decode('utf-8') except UnicodeDecodeError: # If binary, show as hex string row_dict[col_name] = f"" else: row_dict[col_name] = value matches.append({ 'table': table, 'column': column, 'row': row_dict }) except sqlite3.Error: # Skip columns that can't be searched (e.g., BLOBs) continue app_logger.info(f"Searched database: {path} for '{search_term}' - found {len(matches)} matches") log_mcp_stat('search_database', str(path.parent), True) result = { 'database': str(path), 'search_term': search_term, 'matches': matches, 'count': len(matches) } if table_name: result['table_filter'] = table_name if column_name: result['column_filter'] = column_name if len(matches) >= self.default_query_limit: result['note'] = f'Results limited to {self.default_query_limit} matches. Use more specific search or query_database for full results.' return result finally: conn.close() except Exception as e: error_msg = f"Error searching database: {e}" app_logger.error(error_msg) log_mcp_stat('search_database', db_path, False, str(e)) return {'error': error_msg} async def query_database(self, db_path: str, query: str, limit: Optional[int] = None) -> Dict[str, Any]: """Execute a read-only SQL query on database.""" try: path = Path(db_path).resolve() if not path.exists(): error_msg = f"Database not found: {path}" app_logger.warning(error_msg) log_mcp_stat('query_database', str(path.parent), False, error_msg) return {'error': error_msg} # Validate query is_safe, error_msg = SQLiteQueryValidator.is_safe_query(query) if not is_safe: app_logger.warning(f"Unsafe query rejected: {error_msg}") log_mcp_stat('query_database', str(path.parent), False, error_msg) return {'error': error_msg} # Open database in read-only mode try: conn = sqlite3.connect(f'file:{path}?mode=ro', uri=True) cursor = conn.cursor() except sqlite3.DatabaseError as e: error_msg = f"Not a valid SQLite database: {path} ({e})" app_logger.warning(error_msg) log_mcp_stat('query_database', str(path.parent), False, error_msg) return {'error': error_msg} try: # Set query timeout def timeout_handler(signum, frame): raise TimeoutError(f"Query exceeded {self.max_query_timeout} second timeout") # Note: signal.alarm only works on Unix-like systems if hasattr(signal, 'SIGALRM'): signal.signal(signal.SIGALRM, timeout_handler) signal.alarm(self.max_query_timeout) try: # Execute query cursor.execute(query) # Get results result_limit = min(limit or self.default_query_limit, self.max_query_results) results = cursor.fetchmany(result_limit) # Get column names columns = [desc[0] for desc in cursor.description] if cursor.description else [] # Convert to list of dicts, handling binary data rows = [] for row in results: row_dict = {} for col_name, value in zip(columns, row): if isinstance(value, bytes): # Convert bytes to readable format try: # Try to decode as UTF-8 first row_dict[col_name] = value.decode('utf-8') except UnicodeDecodeError: # If binary, show as hex string or summary row_dict[col_name] = f"" else: row_dict[col_name] = value rows.append(row_dict) # Check if more results available has_more = len(results) == result_limit if has_more: # Try to fetch one more to check one_more = cursor.fetchone() has_more = one_more is not None finally: if hasattr(signal, 'SIGALRM'): signal.alarm(0) # Cancel timeout app_logger.info(f"Executed query on {path}: returned {len(rows)} rows") log_mcp_stat('query_database', str(path.parent), True) result = { 'database': str(path), 'query': query, 'columns': columns, 'rows': rows, 'count': len(rows) } if has_more: result['truncated'] = True result['note'] = f'Results limited to {result_limit} rows. Use LIMIT clause in query for more control.' return result except TimeoutError as e: error_msg = str(e) app_logger.warning(error_msg) log_mcp_stat('query_database', str(path.parent), False, error_msg) return {'error': error_msg} except sqlite3.Error as e: error_msg = f"SQL error: {e}" app_logger.warning(error_msg) log_mcp_stat('query_database', str(path.parent), False, error_msg) return {'error': error_msg} finally: if hasattr(signal, 'SIGALRM'): signal.alarm(0) # Ensure timeout is cancelled conn.close() except Exception as e: error_msg = f"Error executing query: {e}" app_logger.error(error_msg) log_mcp_stat('query_database', db_path, False, str(e)) return {'error': error_msg} class MCPManager: """Manage MCP server lifecycle, tool calls, and mode switching.""" def __init__(self): self.enabled = False self.mode = "files" # "files" or "database" self.selected_db_index = None self.server: Optional[MCPFilesystemServer] = None # File/folder mode self.allowed_folders: List[Path] = [] # Database mode self.databases: List[Dict[str, Any]] = [] self.config = CrossPlatformMCPConfig() self.session_start_time: Optional[datetime.datetime] = None # Load persisted data self._load_folders() self._load_databases() app_logger.info("MCP Manager initialized") def _load_folders(self): """Load allowed folders from database.""" folders_json = get_mcp_config('allowed_folders') if folders_json: try: folder_paths = json.loads(folders_json) self.allowed_folders = [self.config.normalize_path(p) for p in folder_paths] app_logger.info(f"Loaded {len(self.allowed_folders)} folders from config") except Exception as e: app_logger.error(f"Error loading MCP folders: {e}") self.allowed_folders = [] def _save_folders(self): """Save allowed folders to database.""" folder_paths = [str(p) for p in self.allowed_folders] set_mcp_config('allowed_folders', json.dumps(folder_paths)) app_logger.info(f"Saved {len(self.allowed_folders)} folders to config") def _load_databases(self): """Load databases from database.""" create_table_if_not_exists() try: with sqlite3.connect(str(database)) as conn: cursor = conn.execute('SELECT id, path, name, size, tables, added_timestamp FROM mcp_databases ORDER BY id') self.databases = [] for row in cursor.fetchall(): tables_list = json.loads(row[4]) if row[4] else [] self.databases.append({ 'id': row[0], 'path': row[1], 'name': row[2], 'size': row[3], 'tables': tables_list, 'added': row[5] }) app_logger.info(f"Loaded {len(self.databases)} databases from config") except Exception as e: app_logger.error(f"Error loading databases: {e}") self.databases = [] def _save_database(self, db_info: Dict[str, Any]): """Save a database to config.""" create_table_if_not_exists() try: with sqlite3.connect(str(database)) as conn: conn.execute( 'INSERT INTO mcp_databases (path, name, size, tables, added_timestamp) VALUES (?, ?, ?, ?, ?)', (db_info['path'], db_info['name'], db_info['size'], json.dumps(db_info['tables']), db_info['added']) ) conn.commit() # Get the ID cursor = conn.execute('SELECT id FROM mcp_databases WHERE path = ?', (db_info['path'],)) db_info['id'] = cursor.fetchone()[0] app_logger.info(f"Saved database {db_info['name']} to config") except Exception as e: app_logger.error(f"Error saving database: {e}") def _remove_database_from_config(self, db_path: str): """Remove database from config.""" create_table_if_not_exists() try: with sqlite3.connect(str(database)) as conn: conn.execute('DELETE FROM mcp_databases WHERE path = ?', (db_path,)) conn.commit() app_logger.info(f"Removed database from config: {db_path}") except Exception as e: app_logger.error(f"Error removing database from config: {e}") def enable(self) -> Dict[str, Any]: """Enable MCP server.""" if not MCP_AVAILABLE: return { 'success': False, 'error': 'MCP library not installed. Run: pip install mcp' } if self.enabled: return { 'success': False, 'error': 'MCP is already enabled' } try: self.server = MCPFilesystemServer(self.allowed_folders) self.enabled = True self.session_start_time = datetime.datetime.now() set_mcp_config('mcp_enabled', 'true') app_logger.info("MCP Filesystem Server enabled") return { 'success': True, 'folder_count': len(self.allowed_folders), 'database_count': len(self.databases), 'message': 'MCP Filesystem Server started successfully' } except Exception as e: app_logger.error(f"Error enabling MCP: {e}") return { 'success': False, 'error': str(e) } def disable(self) -> Dict[str, Any]: """Disable MCP server.""" if not self.enabled: return { 'success': False, 'error': 'MCP is not enabled' } try: self.server = None self.enabled = False self.session_start_time = None self.mode = "files" self.selected_db_index = None set_mcp_config('mcp_enabled', 'false') app_logger.info("MCP Filesystem Server disabled") return { 'success': True, 'message': 'MCP Filesystem Server stopped' } except Exception as e: app_logger.error(f"Error disabling MCP: {e}") return { 'success': False, 'error': str(e) } def switch_mode(self, new_mode: str, db_index: Optional[int] = None) -> Dict[str, Any]: """Switch between files and database mode.""" if not self.enabled: return { 'success': False, 'error': 'MCP is not enabled. Use /mcp enable first' } if new_mode == "files": self.mode = "files" self.selected_db_index = None app_logger.info("Switched to file mode") return { 'success': True, 'mode': 'files', 'message': 'Switched to file mode', 'tools': ['read_file', 'list_directory', 'search_files'] } elif new_mode == "database": if db_index is None: return { 'success': False, 'error': 'Database index required. Use /mcp db ' } if db_index < 1 or db_index > len(self.databases): return { 'success': False, 'error': f'Invalid database number. Use 1-{len(self.databases)}' } self.mode = "database" self.selected_db_index = db_index - 1 # Convert to 0-based db = self.databases[self.selected_db_index] app_logger.info(f"Switched to database mode: {db['name']}") return { 'success': True, 'mode': 'database', 'database': db, 'message': f"Switched to database #{db_index}: {db['name']}", 'tools': ['inspect_database', 'search_database', 'query_database'] } else: return { 'success': False, 'error': f'Invalid mode: {new_mode}' } def add_folder(self, folder_path: str) -> Dict[str, Any]: """Add folder to allowed list.""" if not self.enabled: return { 'success': False, 'error': 'MCP is not enabled. Use /mcp enable first' } try: path = self.config.normalize_path(folder_path) # Validation checks if not path.exists(): return { 'success': False, 'error': f'Directory does not exist: {path}' } if not path.is_dir(): return { 'success': False, 'error': f'Not a directory: {path}' } if self.config.is_system_directory(path): return { 'success': False, 'error': f'Cannot add system directory: {path}' } # Check if already added if path in self.allowed_folders: return { 'success': False, 'error': f'Folder already in allowed list: {path}' } # Check for nested paths parent_folder = None for existing in self.allowed_folders: try: path.relative_to(existing) parent_folder = existing break except ValueError: continue # Get folder stats stats = self.config.get_folder_stats(path) # Add folder self.allowed_folders.append(path) self._save_folders() # Update server and reload gitignores if self.server: self.server.allowed_folders = self.allowed_folders self.server.reload_gitignores() app_logger.info(f"Added folder to MCP: {path}") result = { 'success': True, 'path': str(path), 'stats': stats, 'total_folders': len(self.allowed_folders) } if parent_folder: result['warning'] = f'Note: {parent_folder} is already allowed (parent folder)' return result except Exception as e: app_logger.error(f"Error adding folder: {e}") return { 'success': False, 'error': str(e) } def remove_folder(self, folder_ref: str) -> Dict[str, Any]: """Remove folder from allowed list (by path or number).""" if not self.enabled: return { 'success': False, 'error': 'MCP is not enabled. Use /mcp enable first' } try: # Check if it's a number if folder_ref.isdigit(): index = int(folder_ref) - 1 if 0 <= index < len(self.allowed_folders): path = self.allowed_folders[index] else: return { 'success': False, 'error': f'Invalid folder number: {folder_ref}' } else: # Treat as path path = self.config.normalize_path(folder_ref) if path not in self.allowed_folders: return { 'success': False, 'error': f'Folder not in allowed list: {path}' } # Remove folder self.allowed_folders.remove(path) self._save_folders() # Update server and reload gitignores if self.server: self.server.allowed_folders = self.allowed_folders self.server.reload_gitignores() app_logger.info(f"Removed folder from MCP: {path}") result = { 'success': True, 'path': str(path), 'total_folders': len(self.allowed_folders) } if len(self.allowed_folders) == 0: result['warning'] = 'No allowed folders remaining. Add one with /mcp add ' return result except Exception as e: app_logger.error(f"Error removing folder: {e}") return { 'success': False, 'error': str(e) } def add_database(self, db_path: str) -> Dict[str, Any]: """Add SQLite database.""" if not self.enabled: return { 'success': False, 'error': 'MCP is not enabled. Use /mcp enable first' } try: path = Path(db_path).resolve() # Validation if not path.exists(): return { 'success': False, 'error': f'Database file not found: {path}' } if not path.is_file(): return { 'success': False, 'error': f'Not a file: {path}' } # Check if already added if any(db['path'] == str(path) for db in self.databases): return { 'success': False, 'error': f'Database already added: {path.name}' } # Validate it's a SQLite database and get schema try: conn = sqlite3.connect(f'file:{path}?mode=ro', uri=True) cursor = conn.cursor() # Get tables cursor.execute("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name") tables = [row[0] for row in cursor.fetchall()] conn.close() except sqlite3.DatabaseError as e: return { 'success': False, 'error': f'Not a valid SQLite database: {e}' } # Get file size db_size = path.stat().st_size # Create database entry db_info = { 'path': str(path), 'name': path.name, 'size': db_size, 'tables': tables, 'added': datetime.datetime.now().isoformat() } # Save to config self._save_database(db_info) # Add to list self.databases.append(db_info) app_logger.info(f"Added database: {path.name}") return { 'success': True, 'database': db_info, 'number': len(self.databases), 'message': f'Added database #{len(self.databases)}: {path.name}' } except Exception as e: app_logger.error(f"Error adding database: {e}") return { 'success': False, 'error': str(e) } def remove_database(self, db_ref: str) -> Dict[str, Any]: """Remove database (by number or path).""" if not self.enabled: return { 'success': False, 'error': 'MCP is not enabled' } try: # Check if it's a number if db_ref.isdigit(): index = int(db_ref) - 1 if 0 <= index < len(self.databases): db = self.databases[index] else: return { 'success': False, 'error': f'Invalid database number: {db_ref}' } else: # Treat as path path = str(Path(db_ref).resolve()) db = next((d for d in self.databases if d['path'] == path), None) if not db: return { 'success': False, 'error': f'Database not found: {db_ref}' } index = self.databases.index(db) # If currently selected, deselect if self.mode == "database" and self.selected_db_index == index: self.mode = "files" self.selected_db_index = None # Remove from config self._remove_database_from_config(db['path']) # Remove from list self.databases.pop(index) app_logger.info(f"Removed database: {db['name']}") result = { 'success': True, 'database': db, 'message': f"Removed database: {db['name']}" } if len(self.databases) == 0: result['warning'] = 'No databases remaining. Add one with /mcp add db ' return result except Exception as e: app_logger.error(f"Error removing database: {e}") return { 'success': False, 'error': str(e) } def list_databases(self) -> Dict[str, Any]: """List all databases.""" try: db_list = [] for idx, db in enumerate(self.databases, 1): db_info = { 'number': idx, 'name': db['name'], 'path': db['path'], 'size_mb': db['size'] / (1024 * 1024), 'tables': db['tables'], 'table_count': len(db['tables']), 'added': db['added'] } # Check if file still exists if not Path(db['path']).exists(): db_info['warning'] = 'File not found' db_list.append(db_info) return { 'success': True, 'databases': db_list, 'count': len(db_list) } except Exception as e: app_logger.error(f"Error listing databases: {e}") return { 'success': False, 'error': str(e) } def toggle_gitignore(self, enabled: bool) -> Dict[str, Any]: """Toggle .gitignore filtering.""" if not self.enabled: return { 'success': False, 'error': 'MCP is not enabled' } if not self.server: return { 'success': False, 'error': 'MCP server not running' } self.server.respect_gitignore = enabled status = "enabled" if enabled else "disabled" app_logger.info(f".gitignore filtering {status}") return { 'success': True, 'message': f'.gitignore filtering {status}', 'pattern_count': len(self.server.gitignore_parser.patterns) if enabled else 0 } def list_folders(self) -> Dict[str, Any]: """List all allowed folders with stats.""" try: folders_info = [] total_files = 0 total_size = 0 for idx, folder in enumerate(self.allowed_folders, 1): stats = self.config.get_folder_stats(folder) folder_info = { 'number': idx, 'path': str(folder), 'exists': stats.get('exists', False) } if stats.get('exists'): folder_info['file_count'] = stats['file_count'] folder_info['size_mb'] = stats['size_mb'] total_files += stats['file_count'] total_size += stats['total_size'] folders_info.append(folder_info) return { 'success': True, 'folders': folders_info, 'total_folders': len(self.allowed_folders), 'total_files': total_files, 'total_size_mb': total_size / (1024 * 1024) } except Exception as e: app_logger.error(f"Error listing folders: {e}") return { 'success': False, 'error': str(e) } def get_status(self) -> Dict[str, Any]: """Get comprehensive MCP status.""" try: stats = get_mcp_stats() uptime = None if self.session_start_time: delta = datetime.datetime.now() - self.session_start_time hours = delta.seconds // 3600 minutes = (delta.seconds % 3600) // 60 uptime = f"{hours}h {minutes}m" if hours > 0 else f"{minutes}m" folder_info = self.list_folders() gitignore_status = "enabled" if (self.server and self.server.respect_gitignore) else "disabled" gitignore_patterns = len(self.server.gitignore_parser.patterns) if self.server else 0 # Current mode info mode_info = { 'mode': self.mode, 'mode_display': '🔧 Files' if self.mode == 'files' else f'🗄️ DB #{self.selected_db_index + 1}' } if self.mode == 'database' and self.selected_db_index is not None: db = self.databases[self.selected_db_index] mode_info['database'] = db return { 'success': True, 'enabled': self.enabled, 'uptime': uptime, 'mode_info': mode_info, 'folder_count': len(self.allowed_folders), 'database_count': len(self.databases), 'total_files': folder_info.get('total_files', 0), 'total_size_mb': folder_info.get('total_size_mb', 0), 'stats': stats, 'tools_available': self._get_current_tools(), 'gitignore_status': gitignore_status, 'gitignore_patterns': gitignore_patterns } except Exception as e: app_logger.error(f"Error getting MCP status: {e}") return { 'success': False, 'error': str(e) } def _get_current_tools(self) -> List[str]: """Get tools available in current mode.""" if not self.enabled: return [] if self.mode == "files": return ['read_file', 'list_directory', 'search_files'] elif self.mode == "database": return ['inspect_database', 'search_database', 'query_database'] return [] async def call_tool(self, tool_name: str, **kwargs) -> Dict[str, Any]: """Call an MCP tool.""" if not self.enabled or not self.server: return { 'error': 'MCP is not enabled' } try: # File mode tools if tool_name == 'read_file': return await self.server.read_file(kwargs.get('file_path', '')) elif tool_name == 'list_directory': return await self.server.list_directory( kwargs.get('dir_path', ''), kwargs.get('recursive', False) ) elif tool_name == 'search_files': return await self.server.search_files( kwargs.get('pattern', ''), kwargs.get('search_path'), kwargs.get('content_search', False) ) # Database mode tools elif tool_name == 'inspect_database': if self.mode != 'database' or self.selected_db_index is None: return {'error': 'Not in database mode. Use /mcp db first'} db = self.databases[self.selected_db_index] return await self.server.inspect_database( db['path'], kwargs.get('table_name') ) elif tool_name == 'search_database': if self.mode != 'database' or self.selected_db_index is None: return {'error': 'Not in database mode. Use /mcp db first'} db = self.databases[self.selected_db_index] return await self.server.search_database( db['path'], kwargs.get('search_term', ''), kwargs.get('table_name'), kwargs.get('column_name') ) elif tool_name == 'query_database': if self.mode != 'database' or self.selected_db_index is None: return {'error': 'Not in database mode. Use /mcp db first'} db = self.databases[self.selected_db_index] return await self.server.query_database( db['path'], kwargs.get('query', ''), kwargs.get('limit') ) else: return { 'error': f'Unknown tool: {tool_name}' } except Exception as e: app_logger.error(f"Error calling MCP tool {tool_name}: {e}") return { 'error': str(e) } def get_tools_schema(self) -> List[Dict[str, Any]]: """Get MCP tools as OpenAI function calling schema (for current mode).""" if not self.enabled: return [] if self.mode == "files": return self._get_file_tools_schema() elif self.mode == "database": return self._get_database_tools_schema() return [] def _get_file_tools_schema(self) -> List[Dict[str, Any]]: """Get file mode tools schema.""" if len(self.allowed_folders) == 0: return [] allowed_dirs_str = ", ".join(str(f) for f in self.allowed_folders) return [ { "type": "function", "function": { "name": "search_files", "description": f"Search for files in the user's local filesystem. Allowed directories: {allowed_dirs_str}. Can search by filename pattern (e.g., '*.py' for Python files) or search within file contents. Automatically respects .gitignore patterns and excludes virtual environments.", "parameters": { "type": "object", "properties": { "pattern": { "type": "string", "description": "Search pattern. For filename search, use glob patterns like '*.py', '*.txt', 'report*'. For content search, use plain text to search for." }, "content_search": { "type": "boolean", "description": "If true, searches inside file contents for the pattern (SLOW - use only when specifically asked to search file contents). If false, searches only filenames (FAST - use for finding files by name). Default is false. Virtual environments and package directories are automatically excluded.", "default": False }, "search_path": { "type": "string", "description": "Optional: specific directory to search in. If not provided, searches all allowed directories.", } }, "required": ["pattern"] } } }, { "type": "function", "function": { "name": "read_file", "description": "Read the complete contents of a text file from the user's local filesystem. Only works for files within allowed directories. Maximum file size: 10MB. Files larger than 50KB are automatically truncated. Respects .gitignore patterns.", "parameters": { "type": "object", "properties": { "file_path": { "type": "string", "description": "Full path to the file to read (e.g., /Users/username/Documents/report.txt or ~/Documents/notes.md)" } }, "required": ["file_path"] } } }, { "type": "function", "function": { "name": "list_directory", "description": "List all files and subdirectories in a directory from the user's local filesystem. Only works for directories within allowed paths. Automatically filters out virtual environments, build artifacts, and .gitignore patterns. Limited to 1000 items.", "parameters": { "type": "object", "properties": { "dir_path": { "type": "string", "description": "Directory path to list (e.g., ~/Documents or /Users/username/Projects)" }, "recursive": { "type": "boolean", "description": "If true, lists all files in subdirectories recursively. If false, lists only immediate children. Default is true. WARNING: Recursive listings can be very large - use specific paths when possible.", "default": True } }, "required": ["dir_path"] } } } ] def _get_database_tools_schema(self) -> List[Dict[str, Any]]: """Get database mode tools schema.""" if self.selected_db_index is None or self.selected_db_index >= len(self.databases): return [] db = self.databases[self.selected_db_index] db_name = db['name'] tables_str = ", ".join(db['tables']) return [ { "type": "function", "function": { "name": "inspect_database", "description": f"Inspect the schema of the currently selected SQLite database ({db_name}). Can get all tables or details for a specific table including columns, types, indexes, and row counts. Available tables: {tables_str}.", "parameters": { "type": "object", "properties": { "table_name": { "type": "string", "description": f"Optional: specific table to inspect. If not provided, returns info for all tables. Available: {tables_str}" } }, "required": [] } } }, { "type": "function", "function": { "name": "search_database", "description": f"Search for a value across all tables in the database ({db_name}). Performs a LIKE search (partial matching) across all columns or specific table/column. Returns matching rows with all column data. Limited to {self.server.default_query_limit} results.", "parameters": { "type": "object", "properties": { "search_term": { "type": "string", "description": "Value to search for (partial match supported)" }, "table_name": { "type": "string", "description": f"Optional: limit search to specific table. Available: {tables_str}" }, "column_name": { "type": "string", "description": "Optional: limit search to specific column within the table" } }, "required": ["search_term"] } } }, { "type": "function", "function": { "name": "query_database", "description": f"Execute a read-only SQL query on the database ({db_name}). Supports SELECT queries including JOINs, subqueries, CTEs, aggregations, etc. Maximum {self.server.max_query_results} rows returned. Query timeout: {self.server.max_query_timeout} seconds. INSERT/UPDATE/DELETE/DROP are blocked for safety.", "parameters": { "type": "object", "properties": { "query": { "type": "string", "description": f"SQL SELECT query to execute. Available tables: {tables_str}. Example: SELECT * FROM table WHERE column = 'value' LIMIT 10" }, "limit": { "type": "integer", "description": f"Optional: maximum rows to return (default {self.server.default_query_limit}, max {self.server.max_query_results}). You can also use LIMIT in your query." } }, "required": ["query"] } } } ] # Global MCP manager mcp_manager = MCPManager() # ============================================================================ # HELPER FUNCTIONS # ============================================================================ def supports_function_calling(model: Dict[str, Any]) -> bool: """Check if model supports function calling.""" supported_params = model.get("supported_parameters", []) return "tools" in supported_params or "functions" in supported_params def check_for_updates(current_version: str) -> str: """Check for updates.""" try: response = requests.get( 'https://gitlab.pm/api/v1/repos/rune/oai/releases/latest', headers={"Content-Type": "application/json"}, timeout=1.0, allow_redirects=True ) response.raise_for_status() data = response.json() version_online = data.get('tag_name', '').lstrip('v') if not version_online: logger.warning("No version found in API response") return f"[bold green]oAI version {current_version}[/]" current = pkg_version.parse(current_version) latest = pkg_version.parse(version_online) if latest > current: logger.info(f"Update available: {current_version} → {version_online}") return f"[bold green]oAI version {current_version} [/][bold red](Update available: {current_version} → {version_online})[/]" else: logger.debug(f"Already up to date: {current_version}") return f"[bold green]oAI version {current_version} (up to date)[/]" except: return f"[bold green]oAI version {current_version}[/]" def save_conversation(name: str, data: List[Dict[str, str]]): """Save conversation.""" timestamp = datetime.datetime.now().isoformat() data_json = json.dumps(data) with sqlite3.connect(str(database)) as conn: conn.execute('INSERT INTO conversation_sessions (name, timestamp, data) VALUES (?, ?, ?)', (name, timestamp, data_json)) conn.commit() def load_conversation(name: str) -> Optional[List[Dict[str, str]]]: """Load conversation.""" with sqlite3.connect(str(database)) as conn: cursor = conn.execute('SELECT data FROM conversation_sessions WHERE name = ? ORDER BY timestamp DESC LIMIT 1', (name,)) result = cursor.fetchone() if result: return json.loads(result[0]) return None def delete_conversation(name: str) -> int: """Delete conversation.""" with sqlite3.connect(str(database)) as conn: cursor = conn.execute('DELETE FROM conversation_sessions WHERE name = ?', (name,)) conn.commit() return cursor.rowcount def list_conversations() -> List[Dict[str, Any]]: """List conversations.""" with sqlite3.connect(str(database)) as conn: cursor = conn.execute(''' SELECT name, MAX(timestamp) as last_saved, data FROM conversation_sessions GROUP BY name ORDER BY last_saved DESC ''') conversations = [] for row in cursor.fetchall(): name, timestamp, data_json = row data = json.loads(data_json) conversations.append({ 'name': name, 'timestamp': timestamp, 'message_count': len(data) }) return conversations def estimate_cost(input_tokens: int, output_tokens: int) -> float: """Estimate cost.""" return (input_tokens * MODEL_PRICING['input'] / 1_000_000) + (output_tokens * MODEL_PRICING['output'] / 1_000_000) def has_web_search_capability(model: Dict[str, Any]) -> bool: """Check web search capability.""" supported_params = model.get("supported_parameters", []) return "tools" in supported_params def has_image_capability(model: Dict[str, Any]) -> bool: """Check image capability.""" architecture = model.get("architecture", {}) input_modalities = architecture.get("input_modalities", []) return "image" in input_modalities def supports_online_mode(model: Dict[str, Any]) -> bool: """Check online mode support.""" return has_web_search_capability(model) def get_effective_model_id(base_model_id: str, online_enabled: bool) -> str: """Get effective model ID.""" if online_enabled and not base_model_id.endswith(':online'): return f"{base_model_id}:online" return base_model_id def export_as_markdown(session_history: List[Dict[str, str]], session_system_prompt: str = "") -> str: """Export as Markdown.""" lines = ["# Conversation Export", ""] if session_system_prompt: lines.extend([f"**System Prompt:** {session_system_prompt}", ""]) lines.append(f"**Export Date:** {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") lines.append("") lines.append("---") lines.append("") for i, entry in enumerate(session_history, 1): lines.append(f"## Message {i}") lines.append("") lines.append("**User:**") lines.append("") lines.append(entry['prompt']) lines.append("") lines.append("**Assistant:**") lines.append("") lines.append(entry['response']) lines.append("") lines.append("---") lines.append("") return "\n".join(lines) def export_as_json(session_history: List[Dict[str, str]], session_system_prompt: str = "") -> str: """Export as JSON.""" export_data = { "export_date": datetime.datetime.now().isoformat(), "system_prompt": session_system_prompt, "message_count": len(session_history), "messages": session_history } return json.dumps(export_data, indent=2, ensure_ascii=False) def export_as_html(session_history: List[Dict[str, str]], session_system_prompt: str = "") -> str: """Export as HTML.""" def escape_html(text): return text.replace('&', '&').replace('<', '<').replace('>', '>').replace('"', '"').replace("'", ''') html_parts = [ "", "", "", " ", " ", " Conversation Export", " ", "", "", "
", "

💬 Conversation Export

", f"
📅 Exported: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
", f"
📊 Total Messages: {len(session_history)}
", "
", ] if session_system_prompt: html_parts.extend([ "
", " ⚙️ System Prompt", f"
{escape_html(session_system_prompt)}
", "
", ]) for i, entry in enumerate(session_history, 1): html_parts.extend([ "
", f"
Message {i} of {len(session_history)}
", "
", "
👤 User
", f"
{escape_html(entry['prompt'])}
", "
", "
", "
🤖 Assistant
", f"
{escape_html(entry['response'])}
", "
", "
", ]) html_parts.extend([ "
", "

Generated by oAI Chat • https://iurl.no/oai

", "
", "", "", ]) return "\n".join(html_parts) def show_command_help(command_or_topic: str): """Display detailed help for command or topic.""" # Handle topics (like 'mcp') if not command_or_topic.startswith('/'): if command_or_topic.lower() == 'mcp': help_data = COMMAND_HELP['mcp'] help_content = [] help_content.append(f"[bold cyan]Description:[/]") help_content.append(help_data['description']) help_content.append("") help_content.append(help_data['notes']) console.print(Panel( "\n".join(help_content), title="[bold green]MCP - Model Context Protocol Guide[/]", title_align="left", border_style="green", width=console.width - 4 )) app_logger.info("Displayed MCP comprehensive guide") return else: command_or_topic = '/' + command_or_topic if command_or_topic not in COMMAND_HELP: console.print(f"[bold red]Unknown command: {command_or_topic}[/]") console.print("[bold yellow]Type /help to see all available commands.[/]") console.print("[bold yellow]Type /help mcp for comprehensive MCP guide.[/]") app_logger.warning(f"Help requested for unknown command: {command_or_topic}") return help_data = COMMAND_HELP[command_or_topic] help_content = [] if 'aliases' in help_data: aliases_str = ", ".join(help_data['aliases']) help_content.append(f"[bold cyan]Aliases:[/] {aliases_str}") help_content.append("") help_content.append(f"[bold cyan]Description:[/]") help_content.append(help_data['description']) help_content.append("") help_content.append(f"[bold cyan]Usage:[/]") help_content.append(f"[yellow]{help_data['usage']}[/]") help_content.append("") if 'examples' in help_data and help_data['examples']: help_content.append(f"[bold cyan]Examples:[/]") for desc, example in help_data['examples']: if not desc and not example: help_content.append("") elif desc.startswith('━━━'): help_content.append(f"[bold yellow]{desc}[/]") else: help_content.append(f" [dim]{desc}:[/]" if desc else "") help_content.append(f" [green]{example}[/]" if example else "") help_content.append("") if 'notes' in help_data: help_content.append(f"[bold cyan]Notes:[/]") help_content.append(f"[dim]{help_data['notes']}[/]") console.print(Panel( "\n".join(help_content), title=f"[bold green]Help: {command_or_topic}[/]", title_align="left", border_style="green", width=console.width - 4 )) app_logger.info(f"Displayed detailed help for command: {command_or_topic}") def get_credits(api_key: str, base_url: str = OPENROUTER_BASE_URL) -> Optional[Dict[str, str]]: """Get credits.""" if not api_key: return None url = f"{base_url}/credits" headers = { "Authorization": f"Bearer {api_key}", "HTTP-Referer": APP_URL, "X-Title": APP_NAME } try: response = requests.get(url, headers=headers) response.raise_for_status() data = response.json().get('data', {}) total_credits = float(data.get('total_credits', 0)) total_usage = float(data.get('total_usage', 0)) credits_left = total_credits - total_usage return { 'total_credits': f"${total_credits:.2f}", 'used_credits': f"${total_usage:.2f}", 'credits_left': f"${credits_left:.2f}" } except Exception as e: console.print(f"[bold red]Error fetching credits: {e}[/]") return None def check_credit_alerts(credits_data: Optional[Dict[str, str]]) -> List[str]: """Check credit alerts.""" alerts = [] if credits_data: credits_left_value = float(credits_data['credits_left'].strip('$')) total_credits_value = float(credits_data['total_credits'].strip('$')) if credits_left_value < LOW_CREDIT_AMOUNT: alerts.append(f"Critical credit alert: Less than ${LOW_CREDIT_AMOUNT:.2f} left ({credits_data['credits_left']})") elif credits_left_value < total_credits_value * LOW_CREDIT_RATIO: alerts.append(f"Low credit alert: Credits left < 10% of total ({credits_data['credits_left']})") return alerts def clear_screen(): """Clear screen.""" try: print("\033[H\033[J", end="", flush=True) except: print("\n" * 100) def display_paginated_table(table: Table, title: str): """Display paginated table.""" try: terminal_height = os.get_terminal_size().lines - 8 except: terminal_height = 20 from rich.segment import Segment segments = list(console.render(table)) current_line_segments = [] all_lines = [] for segment in segments: if segment.text == '\n': all_lines.append(current_line_segments) current_line_segments = [] else: current_line_segments.append(segment) if current_line_segments: all_lines.append(current_line_segments) total_lines = len(all_lines) if total_lines <= terminal_height: console.print(Panel(table, title=title, title_align="left")) return header_lines = [] data_lines = [] header_end_index = 0 found_header_text = False for i, line_segments in enumerate(all_lines): has_header_style = any( seg.style and ('bold' in str(seg.style) or 'magenta' in str(seg.style)) for seg in line_segments ) if has_header_style: found_header_text = True if found_header_text and i > 0: line_text = ''.join(seg.text for seg in line_segments) if any(char in line_text for char in ['─', '━', '┼', '╪', '┤', '├']): header_end_index = i break if header_end_index > 0: header_lines = all_lines[:header_end_index + 1] data_lines = all_lines[header_end_index + 1:] else: header_lines = all_lines[:min(3, len(all_lines))] data_lines = all_lines[min(3, len(all_lines)):] lines_per_page = terminal_height - len(header_lines) current_line = 0 page_number = 1 while current_line < len(data_lines): clear_screen() console.print(f"[bold cyan]{title} (Page {page_number})[/]") for line_segments in header_lines: for segment in line_segments: console.print(segment.text, style=segment.style, end="") console.print() end_line = min(current_line + lines_per_page, len(data_lines)) for line_segments in data_lines[current_line:end_line]: for segment in line_segments: console.print(segment.text, style=segment.style, end="") console.print() current_line = end_line page_number += 1 if current_line < len(data_lines): console.print(f"\n[dim yellow]--- Press SPACE for next page, or any other key to finish (Page {page_number - 1}, showing {end_line}/{len(data_lines)} data rows) ---[/dim yellow]") try: import sys import tty import termios fd = sys.stdin.fileno() old_settings = termios.tcgetattr(fd) try: tty.setraw(fd) char = sys.stdin.read(1) if char != ' ': break finally: termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) except: input_char = input().strip() if input_char != '': break else: break # P3 # ============================================================================ # MAIN CHAT FUNCTION WITH FULL MCP SUPPORT (FILES + DATABASE) # ============================================================================ @app.command() def chat(): global API_KEY, OPENROUTER_BASE_URL, STREAM_ENABLED, MAX_TOKEN, COST_WARNING_THRESHOLD, DEFAULT_ONLINE_MODE, LOG_MAX_SIZE_MB, LOG_BACKUP_COUNT, LOG_LEVEL, LOG_LEVEL_STR, app_logger session_max_token = 0 session_system_prompt = "" session_history = [] current_index = -1 total_input_tokens = 0 total_output_tokens = 0 total_cost = 0.0 message_count = 0 middle_out_enabled = False conversation_memory_enabled = True memory_start_index = 0 saved_conversations_cache = [] online_mode_enabled = DEFAULT_ONLINE_MODE == "on" app_logger.info("Starting new chat session with memory enabled") if not API_KEY: console.print("[bold red]API key not found. Use '/config api'.[/]") try: new_api_key = typer.prompt("Enter API key") if new_api_key.strip(): set_config('api_key', new_api_key.strip()) API_KEY = new_api_key.strip() console.print("[bold green]API key saved. Re-run.[/]") else: raise typer.Exit() except: console.print("[bold red]No API key. Exiting.[/]") raise typer.Exit() if not text_models: console.print("[bold red]No models available. Check API key/URL.[/]") raise typer.Exit() credits_data = get_credits(API_KEY, OPENROUTER_BASE_URL) startup_credit_alerts = check_credit_alerts(credits_data) if startup_credit_alerts: startup_alert_msg = " | ".join(startup_credit_alerts) console.print(f"[bold red]⚠️ Startup {startup_alert_msg}[/]") app_logger.warning(f"Startup credit alerts: {startup_alert_msg}") selected_model = selected_model_default client = OpenRouter(api_key=API_KEY) if selected_model: online_status = "enabled" if online_mode_enabled else "disabled" console.print(f"[bold blue]Welcome to oAI![/] [bold red]Active model: {selected_model['name']}[/] [dim cyan](Online mode: {online_status})[/]") else: console.print("[bold blue]Welcome to oAI![/] [italic blue]Select a model with '/model'.[/]") if not selected_model: console.print("[bold yellow]No model selected. Use '/model'.[/]") session = PromptSession(history=FileHistory(str(history_file))) while True: try: # ============================================================ # BUILD PROMPT PREFIX WITH MODE INDICATOR # ============================================================ prompt_prefix = "You> " if mcp_manager.enabled: if mcp_manager.mode == "files": prompt_prefix = "[🔧 MCP: Files] You> " elif mcp_manager.mode == "database" and mcp_manager.selected_db_index is not None: db = mcp_manager.databases[mcp_manager.selected_db_index] prompt_prefix = f"[🗄️ MCP: DB #{mcp_manager.selected_db_index + 1}] You> " # ============================================================ # INITIALIZE LOOP VARIABLES # ============================================================ text_part = "" file_attachments = [] content_blocks = [] user_input = session.prompt(prompt_prefix, auto_suggest=AutoSuggestFromHistory()).strip() # Handle escape sequence if user_input.startswith("//"): user_input = user_input[1:] # Check unknown commands elif user_input.startswith("/") and user_input.lower() not in ["exit", "quit", "bye"]: command_word = user_input.split()[0].lower() if user_input.split() else user_input.lower() if not any(command_word.startswith(cmd) for cmd in VALID_COMMANDS): console.print(f"[bold red]Unknown command: {command_word}[/]") console.print("[bold yellow]Type /help to see all available commands.[/]") app_logger.warning(f"Unknown command attempted: {command_word}") continue if user_input.lower() in ["exit", "quit", "bye"]: total_tokens = total_input_tokens + total_output_tokens app_logger.info(f"Session ended. Total messages: {message_count}, Total tokens: {total_tokens}, Total cost: ${total_cost:.4f}") console.print("[bold yellow]Goodbye![/]") return # ============================================================ # MCP COMMANDS # ============================================================ if user_input.lower().startswith("/mcp"): parts = user_input[5:].strip().split(maxsplit=1) mcp_command = parts[0].lower() if parts else "" mcp_args = parts[1] if len(parts) > 1 else "" if mcp_command == "enable": result = mcp_manager.enable() if result['success']: console.print(f"[bold green]✓ {result['message']}[/]") if result['folder_count'] == 0 and result['database_count'] == 0: console.print("\n[bold yellow]No folders or databases configured yet.[/]") console.print("\n[bold cyan]To get started:[/]") console.print(" [bold yellow]Files:[/] /mcp add ~/Documents") console.print(" [bold yellow]Databases:[/] /mcp add db ~/app/data.db") else: folders_msg = f"{result['folder_count']} folder(s)" if result['folder_count'] else "no folders" dbs_msg = f"{result['database_count']} database(s)" if result['database_count'] else "no databases" console.print(f"\n[bold cyan]MCP active with {folders_msg} and {dbs_msg}.[/]") warning = mcp_manager.config.get_filesystem_warning() if warning: console.print(warning) else: console.print(f"[bold red]❌ {result['error']}[/]") continue elif mcp_command == "disable": result = mcp_manager.disable() if result['success']: console.print(f"[bold green]✓ {result['message']}[/]") else: console.print(f"[bold red]❌ {result['error']}[/]") continue elif mcp_command == "add": # Check if it's a database or folder if mcp_args.startswith("db "): # Database db_path = mcp_args[3:].strip() if not db_path: console.print("[bold red]Usage: /mcp add db [/]") continue result = mcp_manager.add_database(db_path) if result['success']: db = result['database'] console.print("\n[bold yellow]⚠️ Database Check:[/]") console.print(f"Adding: [bold]{db['name']}[/]") console.print(f"Size: {db['size'] / (1024 * 1024):.2f} MB") console.print(f"Tables: {', '.join(db['tables'])} ({len(db['tables'])} total)") if db['size'] > 100 * 1024 * 1024: # > 100MB console.print("\n[bold yellow]⚠️ Large database detected! Queries may be slow.[/]") console.print("\nMCP will be able to:") console.print(" [green]✓[/green] Inspect database schema") console.print(" [green]✓[/green] Search for data across tables") console.print(" [green]✓[/green] Execute read-only SQL queries") console.print(" [red]✗[/red] Modify data (read-only mode)") try: confirm = typer.confirm("\nProceed?", default=True) if not confirm: # Remove it mcp_manager.remove_database(str(result['number'])) console.print("[bold yellow]Cancelled. Database not added.[/]") continue except (EOFError, KeyboardInterrupt): mcp_manager.remove_database(str(result['number'])) console.print("\n[bold yellow]Cancelled. Database not added.[/]") continue console.print(f"\n[bold green]✓ {result['message']}[/]") console.print(f"\n[bold cyan]Use '/mcp db {result['number']}' to start querying this database.[/]") else: console.print(f"[bold red]❌ {result['error']}[/]") else: # Folder folder_path = mcp_args if not folder_path: console.print("[bold red]Usage: /mcp add or /mcp add db [/]") continue result = mcp_manager.add_folder(folder_path) if result['success']: stats = result['stats'] console.print("\n[bold yellow]⚠️ Security Check:[/]") console.print(f"You are granting MCP access to: [bold]{result['path']}[/]") if stats.get('exists'): file_count = stats['file_count'] size_mb = stats['size_mb'] console.print(f"This folder contains: [bold]{file_count} files ({size_mb:.1f} MB)[/]") console.print("\nMCP will be able to:") console.print(" [green]✓[/green] Read files in this folder") console.print(" [green]✓[/green] List and search files") console.print(" [green]✓[/green] Access subfolders recursively") console.print(" [green]✓[/green] Automatically respect .gitignore patterns") console.print(" [red]✗[/red] Delete or modify files (read-only)") try: confirm = typer.confirm("\nProceed?", default=True) if not confirm: mcp_manager.allowed_folders.remove(mcp_manager.config.normalize_path(folder_path)) mcp_manager._save_folders() console.print("[bold yellow]Cancelled. Folder not added.[/]") continue except (EOFError, KeyboardInterrupt): mcp_manager.allowed_folders.remove(mcp_manager.config.normalize_path(folder_path)) mcp_manager._save_folders() console.print("\n[bold yellow]Cancelled. Folder not added.[/]") continue console.print(f"\n[bold green]✓ Added {result['path']} to MCP allowed folders[/]") console.print(f"[dim cyan]MCP now has access to {result['total_folders']} folder(s) total.[/]") if result.get('warning'): console.print(f"\n[bold yellow]⚠️ {result['warning']}[/]") else: console.print(f"[bold red]❌ {result['error']}[/]") continue elif mcp_command in ["remove", "rem"]: # Check if it's a database or folder if mcp_args.startswith("db "): # Database db_ref = mcp_args[3:].strip() if not db_ref: console.print("[bold red]Usage: /mcp remove db [/]") continue result = mcp_manager.remove_database(db_ref) if result['success']: console.print(f"\n[bold yellow]Removing: {result['database']['name']}[/]") try: confirm = typer.confirm("Confirm removal?", default=False) if not confirm: console.print("[bold yellow]Cancelled.[/]") # Re-add it mcp_manager.databases.append(result['database']) mcp_manager._save_database(result['database']) continue except (EOFError, KeyboardInterrupt): console.print("\n[bold yellow]Cancelled.[/]") mcp_manager.databases.append(result['database']) mcp_manager._save_database(result['database']) continue console.print(f"\n[bold green]✓ {result['message']}[/]") if result.get('warning'): console.print(f"\n[bold yellow]⚠️ {result['warning']}[/]") else: console.print(f"[bold red]❌ {result['error']}[/]") else: # Folder if not mcp_args: console.print("[bold red]Usage: /mcp remove or /mcp remove db [/]") continue result = mcp_manager.remove_folder(mcp_args) if result['success']: console.print(f"\n[bold yellow]Removing: {result['path']}[/]") try: confirm = typer.confirm("Confirm removal?", default=False) if not confirm: mcp_manager.allowed_folders.append(mcp_manager.config.normalize_path(result['path'])) mcp_manager._save_folders() console.print("[bold yellow]Cancelled. Folder not removed.[/]") continue except (EOFError, KeyboardInterrupt): mcp_manager.allowed_folders.append(mcp_manager.config.normalize_path(result['path'])) mcp_manager._save_folders() console.print("\n[bold yellow]Cancelled. Folder not removed.[/]") continue console.print(f"\n[bold green]✓ Removed {result['path']} from MCP allowed folders[/]") console.print(f"[dim cyan]MCP now has access to {result['total_folders']} folder(s) total.[/]") if result.get('warning'): console.print(f"\n[bold yellow]⚠️ {result['warning']}[/]") else: console.print(f"[bold red]❌ {result['error']}[/]") continue elif mcp_command == "list": result = mcp_manager.list_folders() if result['success']: if result['total_folders'] == 0: console.print("[bold yellow]No folders configured.[/]") console.print("\n[bold cyan]Add a folder with: /mcp add ~/Documents[/]") continue status_indicator = "[green]✓[/green]" if mcp_manager.enabled else "[red]✗[/red]" table = Table( "No.", "Path", "Files", "Size", show_header=True, header_style="bold magenta" ) for folder_info in result['folders']: number = str(folder_info['number']) path = folder_info['path'] if folder_info['exists']: files = f"📁 {folder_info['file_count']}" size = f"{folder_info['size_mb']:.1f} MB" else: files = "[red]Not found[/red]" size = "-" table.add_row(number, path, files, size) gitignore_info = "" if mcp_manager.server: gitignore_status = "on" if mcp_manager.server.respect_gitignore else "off" pattern_count = len(mcp_manager.server.gitignore_parser.patterns) gitignore_info = f" | .gitignore: {gitignore_status} ({pattern_count} patterns)" console.print(Panel( table, title=f"[bold green]MCP Folders: {'Active' if mcp_manager.enabled else 'Inactive'} {status_indicator}[/]", title_align="left", subtitle=f"[dim]Total: {result['total_folders']} folders, {result['total_files']} files ({result['total_size_mb']:.1f} MB){gitignore_info}[/]", subtitle_align="right" )) else: console.print(f"[bold red]❌ {result['error']}[/]") continue elif mcp_command == "db": if not mcp_args: # Show current database or list all if mcp_manager.mode == "database" and mcp_manager.selected_db_index is not None: db = mcp_manager.databases[mcp_manager.selected_db_index] console.print(f"[bold cyan]Currently using database #{mcp_manager.selected_db_index + 1}: {db['name']}[/]") console.print(f"[dim]Path: {db['path']}[/]") console.print(f"[dim]Tables: {', '.join(db['tables'])}[/]") else: console.print("[bold yellow]Not in database mode. Use '/mcp db ' to select a database.[/]") # Also show hint to list console.print("\n[dim]Use '/mcp db list' to see all databases[/]") continue if mcp_args == "list": result = mcp_manager.list_databases() if result['success']: if result['count'] == 0: console.print("[bold yellow]No databases configured.[/]") console.print("\n[bold cyan]Add a database with: /mcp add db ~/app/data.db[/]") continue table = Table( "No.", "Name", "Tables", "Size", "Status", show_header=True, header_style="bold magenta" ) for db_info in result['databases']: number = str(db_info['number']) name = db_info['name'] table_count = f"{db_info['table_count']} tables" size = f"{db_info['size_mb']:.1f} MB" if db_info.get('warning'): status = f"[red]{db_info['warning']}[/red]" else: status = "[green]✓[/green]" table.add_row(number, name, table_count, size, status) console.print(Panel( table, title="[bold green]MCP Databases[/]", title_align="left", subtitle=f"[dim]Total: {result['count']} database(s) | Use '/mcp db ' to select[/]", subtitle_align="right" )) else: console.print(f"[bold red]❌ {result['error']}[/]") continue # Switch to database mode try: db_num = int(mcp_args) result = mcp_manager.switch_mode("database", db_num) if result['success']: db = result['database'] console.print(f"\n[bold green]✓ {result['message']}[/]") console.print(f"[dim cyan]Tables: {', '.join(db['tables'])}[/]") console.print(f"\n[bold cyan]Available tools:[/] inspect_database, search_database, query_database") console.print(f"\n[bold yellow]You can now ask questions about this database![/]") console.print(f"[dim]Examples:[/]") console.print(f" 'Show me all tables'") console.print(f" 'Search for records mentioning X'") console.print(f" 'How many rows in the users table?'") console.print(f"\n[dim]Switch back to files: /mcp files[/]") else: console.print(f"[bold red]❌ {result['error']}[/]") except ValueError: console.print(f"[bold red]Invalid database number: {mcp_args}[/]") console.print(f"[bold yellow]Use '/mcp db list' to see available databases[/]") continue elif mcp_command == "files": result = mcp_manager.switch_mode("files") if result['success']: console.print(f"[bold green]✓ {result['message']}[/]") console.print(f"\n[bold cyan]Available tools:[/] read_file, list_directory, search_files") else: console.print(f"[bold red]❌ {result['error']}[/]") continue elif mcp_command == "gitignore": if not mcp_args: if mcp_manager.server: status = "enabled" if mcp_manager.server.respect_gitignore else "disabled" pattern_count = len(mcp_manager.server.gitignore_parser.patterns) console.print(f"[bold blue].gitignore filtering: {status}[/]") console.print(f"[dim cyan]Loaded {pattern_count} pattern(s) from .gitignore files[/]") else: console.print("[bold yellow]MCP is not enabled. Enable with /mcp enable[/]") continue if mcp_args.lower() == "on": result = mcp_manager.toggle_gitignore(True) if result['success']: console.print(f"[bold green]✓ {result['message']}[/]") console.print(f"[dim cyan]Using {result['pattern_count']} .gitignore pattern(s)[/]") else: console.print(f"[bold red]❌ {result['error']}[/]") elif mcp_args.lower() == "off": result = mcp_manager.toggle_gitignore(False) if result['success']: console.print(f"[bold green]✓ {result['message']}[/]") console.print("[bold yellow]Warning: All files will be visible, including those in .gitignore[/]") else: console.print(f"[bold red]❌ {result['error']}[/]") else: console.print("[bold yellow]Usage: /mcp gitignore on|off[/]") continue elif mcp_command == "status": result = mcp_manager.get_status() if result['success']: status_color = "green" if result['enabled'] else "red" status_text = "Active ✓" if result['enabled'] else "Inactive ✗" table = Table( "Property", "Value", show_header=True, header_style="bold magenta" ) table.add_row("Status", f"[{status_color}]{status_text}[/{status_color}]") # Mode info mode_info = result['mode_info'] table.add_row("Current Mode", mode_info['mode_display']) if 'database' in mode_info: db = mode_info['database'] table.add_row("Database Path", db['path']) table.add_row("Database Tables", ', '.join(db['tables'])) if result['uptime']: table.add_row("Uptime", result['uptime']) table.add_row("", "") table.add_row("[bold]Configuration[/]", "") table.add_row("Allowed Folders", str(result['folder_count'])) table.add_row("Databases", str(result['database_count'])) table.add_row("Total Files Accessible", str(result['total_files'])) table.add_row("Total Size", f"{result['total_size_mb']:.1f} MB") table.add_row(".gitignore Filtering", result['gitignore_status']) if result['gitignore_patterns'] > 0: table.add_row(".gitignore Patterns", str(result['gitignore_patterns'])) if result['enabled']: table.add_row("", "") table.add_row("[bold]Tools Available[/]", "") for tool in result['tools_available']: table.add_row(f" ✓ {tool}", "") stats = result['stats'] if stats['total_calls'] > 0: table.add_row("", "") table.add_row("[bold]Session Stats[/]", "") table.add_row("Total Tool Calls", str(stats['total_calls'])) if stats['reads'] > 0: table.add_row("Files Read", str(stats['reads'])) if stats['lists'] > 0: table.add_row("Directories Listed", str(stats['lists'])) if stats['searches'] > 0: table.add_row("File Searches", str(stats['searches'])) if stats['db_inspects'] > 0: table.add_row("DB Inspections", str(stats['db_inspects'])) if stats['db_searches'] > 0: table.add_row("DB Searches", str(stats['db_searches'])) if stats['db_queries'] > 0: table.add_row("SQL Queries", str(stats['db_queries'])) if stats['last_used']: try: last_used = datetime.datetime.fromisoformat(stats['last_used']) time_ago = datetime.datetime.now() - last_used if time_ago.seconds < 60: time_str = f"{time_ago.seconds} seconds ago" elif time_ago.seconds < 3600: time_str = f"{time_ago.seconds // 60} minutes ago" else: time_str = f"{time_ago.seconds // 3600} hours ago" table.add_row("Last Used", time_str) except: pass console.print(Panel( table, title="[bold green]MCP Status[/]", title_align="left" )) else: console.print(f"[bold red]❌ {result['error']}[/]") continue else: console.print("[bold cyan]MCP (Model Context Protocol) Commands:[/]\n") console.print(" [bold yellow]/mcp enable[/] - Start MCP server") console.print(" [bold yellow]/mcp disable[/] - Stop MCP server") console.print(" [bold yellow]/mcp status[/] - Show comprehensive MCP status") console.print("") console.print(" [bold cyan]FILE MODE (default):[/]") console.print(" [bold yellow]/mcp add [/] - Add folder for file access") console.print(" [bold yellow]/mcp remove [/] - Remove folder") console.print(" [bold yellow]/mcp list[/] - List all folders") console.print(" [bold yellow]/mcp files[/] - Switch to file mode") console.print(" [bold yellow]/mcp gitignore [on|off][/] - Toggle .gitignore filtering") console.print("") console.print(" [bold cyan]DATABASE MODE:[/]") console.print(" [bold yellow]/mcp add db [/] - Add SQLite database") console.print(" [bold yellow]/mcp db list[/] - List all databases") console.print(" [bold yellow]/mcp db [/] - Switch to database mode") console.print(" [bold yellow]/mcp remove db [/]- Remove database") console.print("") console.print("[dim]Use '/help /mcp' for detailed command help[/]") console.print("[dim]Use '/help mcp' for comprehensive MCP guide[/]") continue # ============================================================ # ALL OTHER COMMANDS # ============================================================ if user_input.lower() == "/retry": if not session_history: console.print("[bold red]No history to retry.[/]") app_logger.warning("Retry attempted with no history") continue last_prompt = session_history[-1]['prompt'] console.print("[bold green]Retrying last prompt...[/]") app_logger.info(f"Retrying prompt: {last_prompt[:100]}...") user_input = last_prompt elif user_input.lower().startswith("/online"): args = user_input[8:].strip() if not args: status = "enabled" if online_mode_enabled else "disabled" default_status = "enabled" if DEFAULT_ONLINE_MODE == "on" else "disabled" console.print(f"[bold blue]Online mode (web search) {status}.[/]") console.print(f"[dim blue]Default setting: {default_status} (use '/config online on|off' to change)[/]") if selected_model: if supports_online_mode(selected_model): console.print(f"[dim green]Current model '{selected_model['name']}' supports online mode.[/]") else: console.print(f"[dim yellow]Current model '{selected_model['name']}' does not support online mode.[/]") continue if args.lower() == "on": if not selected_model: console.print("[bold red]No model selected. Select a model first with '/model'.[/]") continue if not supports_online_mode(selected_model): console.print(f"[bold red]Model '{selected_model['name']}' does not support online mode (web search).[/]") console.print("[dim yellow]Online mode requires models with 'tools' parameter support.[/]") app_logger.warning(f"Online mode activation failed - model {selected_model['id']} doesn't support it") continue online_mode_enabled = True console.print("[bold green]Online mode enabled for this session. Model will use web search capabilities.[/]") console.print(f"[dim blue]Effective model ID: {get_effective_model_id(selected_model['id'], True)}[/]") app_logger.info(f"Online mode enabled for model {selected_model['id']}") elif args.lower() == "off": online_mode_enabled = False console.print("[bold green]Online mode disabled for this session. Model will not use web search.[/]") if selected_model: console.print(f"[dim blue]Effective model ID: {selected_model['id']}[/]") app_logger.info("Online mode disabled") else: console.print("[bold yellow]Usage: /online on|off (or /online to view status)[/]") continue elif user_input.lower().startswith("/memory"): args = user_input[8:].strip() if not args: status = "enabled" if conversation_memory_enabled else "disabled" history_count = len(session_history) - memory_start_index if conversation_memory_enabled and memory_start_index < len(session_history) else 0 console.print(f"[bold blue]Conversation memory {status}.[/]") if conversation_memory_enabled: console.print(f"[dim blue]Tracking {history_count} message(s) since memory enabled.[/]") else: console.print(f"[dim yellow]Memory disabled. Each request is independent (saves tokens/cost).[/]") continue if args.lower() == "on": conversation_memory_enabled = True memory_start_index = len(session_history) console.print("[bold green]Conversation memory enabled. Will remember conversations from this point forward.[/]") console.print(f"[dim blue]Memory will track messages starting from index {memory_start_index}.[/]") app_logger.info(f"Conversation memory enabled at index {memory_start_index}") elif args.lower() == "off": conversation_memory_enabled = False console.print("[bold green]Conversation memory disabled. API calls will not include history (lower cost).[/]") console.print(f"[dim yellow]Note: Messages are still saved locally but not sent to API.[/]") app_logger.info("Conversation memory disabled") else: console.print("[bold yellow]Usage: /memory on|off (or /memory to view status)[/]") continue elif user_input.lower().startswith("/paste"): optional_prompt = user_input[7:].strip() try: clipboard_content = pyperclip.paste() except Exception as e: console.print(f"[bold red]Failed to access clipboard: {e}[/]") app_logger.error(f"Clipboard access error: {e}") continue if not clipboard_content or not clipboard_content.strip(): console.print("[bold red]Clipboard is empty.[/]") app_logger.warning("Paste attempted with empty clipboard") continue try: clipboard_content.encode('utf-8') preview_lines = clipboard_content.split('\n')[:10] preview_text = '\n'.join(preview_lines) if len(clipboard_content.split('\n')) > 10: preview_text += "\n... (content truncated for preview)" char_count = len(clipboard_content) line_count = len(clipboard_content.split('\n')) console.print(Panel( preview_text, title=f"[bold cyan]📋 Clipboard Content Preview ({char_count} chars, {line_count} lines)[/]", title_align="left", border_style="cyan" )) if optional_prompt: final_prompt = f"{optional_prompt}\n\n```\n{clipboard_content}\n```" console.print(f"[dim blue]Sending with prompt: '{optional_prompt}'[/]") else: final_prompt = clipboard_content console.print("[dim blue]Sending clipboard content without additional prompt[/]") user_input = final_prompt app_logger.info(f"Pasted content from clipboard: {char_count} chars, {line_count} lines, with prompt: {bool(optional_prompt)}") except UnicodeDecodeError: console.print("[bold red]Clipboard contains non-text (binary) data. Only plain text is supported.[/]") app_logger.error("Paste failed - clipboard contains binary data") continue except Exception as e: console.print(f"[bold red]Error processing clipboard content: {e}[/]") app_logger.error(f"Clipboard processing error: {e}") continue elif user_input.lower().startswith("/export"): args = user_input[8:].strip().split(maxsplit=1) if len(args) != 2: console.print("[bold red]Usage: /export [/]") console.print("[bold yellow]Formats: md (Markdown), json (JSON), html (HTML)[/]") console.print("[bold yellow]Example: /export md my_conversation.md[/]") continue export_format = args[0].lower() filename = args[1] if not session_history: console.print("[bold red]No conversation history to export.[/]") continue if export_format not in ['md', 'json', 'html']: console.print("[bold red]Invalid format. Use: md, json, or html[/]") continue try: if export_format == 'md': content = export_as_markdown(session_history, session_system_prompt) elif export_format == 'json': content = export_as_json(session_history, session_system_prompt) elif export_format == 'html': content = export_as_html(session_history, session_system_prompt) export_path = Path(filename).expanduser() with open(export_path, 'w', encoding='utf-8') as f: f.write(content) console.print(f"[bold green]✅ Conversation exported to: {export_path.absolute()}[/]") console.print(f"[dim blue]Format: {export_format.upper()} | Messages: {len(session_history)} | Size: {len(content)} bytes[/]") app_logger.info(f"Conversation exported as {export_format} to {export_path} ({len(session_history)} messages)") except Exception as e: console.print(f"[bold red]Export failed: {e}[/]") app_logger.error(f"Export error: {e}") continue elif user_input.lower().startswith("/save"): args = user_input[6:].strip() if not args: console.print("[bold red]Usage: /save [/]") continue if not session_history: console.print("[bold red]No history to save.[/]") continue save_conversation(args, session_history) console.print(f"[bold green]Conversation saved as '{args}'.[/]") app_logger.info(f"Conversation saved as '{args}' with {len(session_history)} messages") continue elif user_input.lower().startswith("/load"): args = user_input[6:].strip() if not args: console.print("[bold red]Usage: /load [/]") console.print("[bold yellow]Tip: Use /list to see numbered conversations[/]") continue conversation_name = None if args.isdigit(): conv_number = int(args) if saved_conversations_cache and 1 <= conv_number <= len(saved_conversations_cache): conversation_name = saved_conversations_cache[conv_number - 1]['name'] console.print(f"[bold cyan]Loading conversation #{conv_number}: '{conversation_name}'[/]") else: console.print(f"[bold red]Invalid conversation number: {conv_number}[/]") console.print(f"[bold yellow]Use /list to see available conversations (1-{len(saved_conversations_cache) if saved_conversations_cache else 0})[/]") continue else: conversation_name = args loaded_data = load_conversation(conversation_name) if not loaded_data: console.print(f"[bold red]Conversation '{conversation_name}' not found.[/]") app_logger.warning(f"Load failed for '{conversation_name}' - not found") continue session_history = loaded_data current_index = len(session_history) - 1 if conversation_memory_enabled: memory_start_index = 0 total_input_tokens = 0 total_output_tokens = 0 total_cost = 0.0 message_count = 0 console.print(f"[bold green]Conversation '{conversation_name}' loaded with {len(session_history)} messages.[/]") app_logger.info(f"Conversation '{conversation_name}' loaded with {len(session_history)} messages") continue elif user_input.lower().startswith("/delete"): args = user_input[8:].strip() if not args: console.print("[bold red]Usage: /delete [/]") console.print("[bold yellow]Tip: Use /list to see numbered conversations[/]") continue conversation_name = None if args.isdigit(): conv_number = int(args) if saved_conversations_cache and 1 <= conv_number <= len(saved_conversations_cache): conversation_name = saved_conversations_cache[conv_number - 1]['name'] console.print(f"[bold cyan]Deleting conversation #{conv_number}: '{conversation_name}'[/]") else: console.print(f"[bold red]Invalid conversation number: {conv_number}[/]") console.print(f"[bold yellow]Use /list to see available conversations (1-{len(saved_conversations_cache) if saved_conversations_cache else 0})[/]") continue else: conversation_name = args try: confirm = typer.confirm(f"Delete conversation '{conversation_name}'? This cannot be undone.", default=False) if not confirm: console.print("[bold yellow]Deletion cancelled.[/]") continue except (EOFError, KeyboardInterrupt): console.print("\n[bold yellow]Deletion cancelled.[/]") continue deleted_count = delete_conversation(conversation_name) if deleted_count > 0: console.print(f"[bold green]Conversation '{conversation_name}' deleted ({deleted_count} version(s) removed).[/]") app_logger.info(f"Conversation '{conversation_name}' deleted - {deleted_count} version(s)") if saved_conversations_cache: saved_conversations_cache = [c for c in saved_conversations_cache if c['name'] != conversation_name] else: console.print(f"[bold red]Conversation '{conversation_name}' not found.[/]") app_logger.warning(f"Delete failed for '{conversation_name}' - not found") continue elif user_input.lower() == "/list": conversations = list_conversations() if not conversations: console.print("[bold yellow]No saved conversations found.[/]") app_logger.info("User viewed conversation list - empty") saved_conversations_cache = [] continue saved_conversations_cache = conversations table = Table("No.", "Name", "Messages", "Last Saved", show_header=True, header_style="bold magenta") for idx, conv in enumerate(conversations, 1): try: dt = datetime.datetime.fromisoformat(conv['timestamp']) formatted_time = dt.strftime('%Y-%m-%d %H:%M:%S') except: formatted_time = conv['timestamp'] table.add_row( str(idx), conv['name'], str(conv['message_count']), formatted_time ) console.print(Panel(table, title=f"[bold green]Saved Conversations ({len(conversations)} total)[/]", title_align="left", subtitle="[dim]Use /load or /delete to manage conversations[/]", subtitle_align="right")) app_logger.info(f"User viewed conversation list - {len(conversations)} conversations") continue elif user_input.lower() == "/prev": if not session_history or current_index <= 0: console.print("[bold red]No previous response.[/]") continue current_index -= 1 prev_response = session_history[current_index]['response'] md = Markdown(prev_response) console.print(Panel(md, title=f"[bold green]Previous Response ({current_index + 1}/{len(session_history)})[/]", title_align="left")) app_logger.debug(f"Viewed previous response at index {current_index}") continue elif user_input.lower() == "/next": if not session_history or current_index >= len(session_history) - 1: console.print("[bold red]No next response.[/]") continue current_index += 1 next_response = session_history[current_index]['response'] md = Markdown(next_response) console.print(Panel(md, title=f"[bold green]Next Response ({current_index + 1}/{len(session_history)})[/]", title_align="left")) app_logger.debug(f"Viewed next response at index {current_index}") continue elif user_input.lower() == "/stats": credits = get_credits(API_KEY, OPENROUTER_BASE_URL) credits_left = credits['credits_left'] if credits else "Unknown" stats = f"Total Input: {total_input_tokens}, Total Output: {total_output_tokens}, Total Tokens: {total_input_tokens + total_output_tokens}, Total Cost: ${total_cost:.4f}, Avg Cost/Message: ${total_cost / message_count:.4f}" if message_count > 0 else "No messages." table = Table("Metric", "Value", show_header=True, header_style="bold magenta") table.add_row("Session Stats", stats) table.add_row("Credits Left", credits_left) console.print(Panel(table, title="[bold green]Session Cost Summary[/]", title_align="left")) app_logger.info(f"User viewed stats: {stats}") warnings = check_credit_alerts(credits) if warnings: warning_text = '|'.join(warnings) console.print(f"[bold red]⚠️ {warning_text}[/]") app_logger.warning(f"Warnings in stats: {warning_text}") continue elif user_input.lower().startswith("/middleout"): args = user_input[11:].strip() if not args: console.print(f"[bold blue]Middle-out transform {'enabled' if middle_out_enabled else 'disabled'}.[/]") continue if args.lower() == "on": middle_out_enabled = True console.print("[bold green]Middle-out transform enabled.[/]") elif args.lower() == "off": middle_out_enabled = False console.print("[bold green]Middle-out transform disabled.[/]") else: console.print("[bold yellow]Usage: /middleout on|off (or /middleout to view status)[/]") continue elif user_input.lower() == "/reset": try: confirm = typer.confirm("Reset conversation context? This clears history and prompt.", default=False) if not confirm: console.print("[bold yellow]Reset cancelled.[/]") continue except (EOFError, KeyboardInterrupt): console.print("\n[bold yellow]Reset cancelled.[/]") continue session_history = [] current_index = -1 session_system_prompt = "" memory_start_index = 0 total_input_tokens = 0 total_output_tokens = 0 total_cost = 0.0 message_count = 0 console.print("[bold green]Conversation context reset.[/]") app_logger.info("Conversation context reset by user") continue elif user_input.lower().startswith("/info"): args = user_input[6:].strip() if not args: if not selected_model: console.print("[bold red]No model selected and no model ID provided. Use '/model' first or '/info '.[/]") continue model_to_show = selected_model else: model_to_show = next((m for m in models_data if m["id"] == args or m.get("canonical_slug") == args or args.lower() in m["name"].lower()), None) if not model_to_show: console.print(f"[bold red]Model '{args}' not found.[/]") continue pricing = model_to_show.get("pricing", {}) architecture = model_to_show.get("architecture", {}) supported_params = ", ".join(model_to_show.get("supported_parameters", [])) or "None" top_provider = model_to_show.get("top_provider", {}) table = Table("Property", "Value", show_header=True, header_style="bold magenta") table.add_row("ID", model_to_show["id"]) table.add_row("Name", model_to_show["name"]) table.add_row("Description", model_to_show.get("description", "N/A")) table.add_row("Context Length", str(model_to_show.get("context_length", "N/A"))) table.add_row("Online Support", "Yes" if supports_online_mode(model_to_show) else "No") table.add_row("Pricing - Prompt ($/M tokens)", pricing.get("prompt", "N/A")) table.add_row("Pricing - Completion ($/M tokens)", pricing.get("completion", "N/A")) table.add_row("Pricing - Request ($)", pricing.get("request", "N/A")) table.add_row("Pricing - Image ($)", pricing.get("image", "N/A")) table.add_row("Input Modalities", ", ".join(architecture.get("input_modalities", [])) or "None") table.add_row("Output Modalities", ", ".join(architecture.get("output_modalities", [])) or "None") table.add_row("Supported Parameters", supported_params) table.add_row("Top Provider Context Length", str(top_provider.get("context_length", "N/A"))) table.add_row("Max Completion Tokens", str(top_provider.get("max_completion_tokens", "N/A"))) table.add_row("Moderated", "Yes" if top_provider.get("is_moderated", False) else "No") console.print(Panel(table, title=f"[bold green]Model Info: {model_to_show['name']}[/]", title_align="left")) continue elif user_input.startswith("/model"): app_logger.info("User initiated model selection") args = user_input[7:].strip() search_term = args if args else "" filtered_models = text_models if search_term: filtered_models = [m for m in text_models if search_term.lower() in m["name"].lower() or search_term.lower() in m["id"].lower()] if not filtered_models: console.print(f"[bold red]No models match '{search_term}'. Try '/model'.[/]") continue table = Table("No.", "Name", "ID", "Image", "Online", show_header=True, header_style="bold magenta") for i, model in enumerate(filtered_models, 1): image_support = "[green]✓[/green]" if has_image_capability(model) else "[red]✗[/red]" online_support = "[green]✓[/green]" if supports_online_mode(model) else "[red]✗[/red]" table.add_row(str(i), model["name"], model["id"], image_support, online_support) title = f"[bold green]Available Models ({'All' if not search_term else f'Search: {search_term}'})[/]" display_paginated_table(table, title) while True: try: choice = int(typer.prompt("Enter model number (or 0 to cancel)")) if choice == 0: break if 1 <= choice <= len(filtered_models): selected_model = filtered_models[choice - 1] if supports_online_mode(selected_model) and DEFAULT_ONLINE_MODE == "on": online_mode_enabled = True console.print(f"[bold cyan]Selected: {selected_model['name']} ({selected_model['id']})[/]") console.print("[dim cyan]✓ Online mode auto-enabled (default setting). Use '/online off' to disable for this session.[/]") else: online_mode_enabled = False console.print(f"[bold cyan]Selected: {selected_model['name']} ({selected_model['id']})[/]") if supports_online_mode(selected_model): console.print("[dim green]✓ This model supports online mode. Use '/online on' to enable web search.[/]") app_logger.info(f"Model selected: {selected_model['name']} ({selected_model['id']}), Online: {online_mode_enabled}") break console.print("[bold red]Invalid choice. Try again.[/]") except ValueError: console.print("[bold red]Invalid input. Enter a number.[/]") continue elif user_input.startswith("/maxtoken"): args = user_input[10:].strip() if not args: console.print(f"[bold blue]Current session max tokens: {session_max_token}[/]") continue try: new_limit = int(args) if new_limit < 1: console.print("[bold red]Session token limit must be at least 1.[/]") continue if new_limit > MAX_TOKEN: console.print(f"[bold yellow]Cannot exceed stored max ({MAX_TOKEN}). Capping.[/]") new_limit = MAX_TOKEN session_max_token = new_limit console.print(f"[bold green]Session max tokens set to: {session_max_token}[/]") except ValueError: console.print("[bold red]Invalid token limit. Provide a positive integer.[/]") continue elif user_input.startswith("/system"): args = user_input[8:].strip() if not args: if session_system_prompt: console.print(f"[bold blue]Current session system prompt:[/] {session_system_prompt}") else: console.print("[bold blue]No session system prompt set.[/]") continue if args.lower() == "clear": session_system_prompt = "" console.print("[bold green]Session system prompt cleared.[/]") else: session_system_prompt = args console.print(f"[bold green]Session system prompt set to: {session_system_prompt}[/]") continue elif user_input.startswith("/config"): args = user_input[8:].strip().lower() update = check_for_updates(version) if args == "api": try: new_api_key = typer.prompt("Enter new API key") if new_api_key.strip(): set_config('api_key', new_api_key.strip()) API_KEY = new_api_key.strip() client = OpenRouter(api_key=API_KEY) console.print("[bold green]API key updated![/]") else: console.print("[bold yellow]No change.[/]") except Exception as e: console.print(f"[bold red]Error updating API key: {e}[/]") elif args == "url": try: new_url = typer.prompt("Enter new base URL") if new_url.strip(): set_config('base_url', new_url.strip()) OPENROUTER_BASE_URL = new_url.strip() console.print("[bold green]Base URL updated![/]") else: console.print("[bold yellow]No change.[/]") except Exception as e: console.print(f"[bold red]Error updating URL: {e}[/]") elif args.startswith("costwarning"): sub_args = args[11:].strip() if not sub_args: console.print(f"[bold blue]Stored cost warning threshold: ${COST_WARNING_THRESHOLD:.4f}[/]") continue try: new_threshold = float(sub_args) if new_threshold < 0: console.print("[bold red]Cost warning threshold must be >= 0.[/]") continue set_config(HIGH_COST_WARNING, str(new_threshold)) COST_WARNING_THRESHOLD = new_threshold console.print(f"[bold green]Cost warning threshold set to ${COST_WARNING_THRESHOLD:.4f}[/]") except ValueError: console.print("[bold red]Invalid cost threshold. Provide a valid number.[/]") elif args.startswith("stream"): sub_args = args[7:].strip() if sub_args in ["on", "off"]: set_config('stream_enabled', sub_args) STREAM_ENABLED = sub_args console.print(f"[bold green]Streaming {'enabled' if sub_args == 'on' else 'disabled'}.[/]") else: console.print("[bold yellow]Usage: /config stream on|off[/]") elif args.startswith("loglevel"): sub_args = args[8:].strip() if not sub_args: console.print(f"[bold blue]Current log level: {LOG_LEVEL_STR.upper()}[/]") console.print(f"[dim yellow]Valid levels: debug, info, warning, error, critical[/]") continue if sub_args.lower() in VALID_LOG_LEVELS: if set_log_level(sub_args): set_config('log_level', sub_args.lower()) console.print(f"[bold green]Log level set to: {sub_args.upper()}[/]") app_logger.info(f"Log level changed to {sub_args.upper()}") else: console.print(f"[bold red]Failed to set log level.[/]") else: console.print(f"[bold red]Invalid log level: {sub_args}[/]") console.print(f"[bold yellow]Valid levels: debug, info, warning, error, critical[/]") elif args.startswith("log"): sub_args = args[4:].strip() if not sub_args: console.print(f"[bold blue]Current log file size limit: {LOG_MAX_SIZE_MB} MB[/]") console.print(f"[bold blue]Log backup count: {LOG_BACKUP_COUNT} files[/]") console.print(f"[bold blue]Log level: {LOG_LEVEL_STR.upper()}[/]") console.print(f"[dim yellow]Total max disk usage: ~{LOG_MAX_SIZE_MB * (LOG_BACKUP_COUNT + 1)} MB[/]") continue try: new_size_mb = int(sub_args) if new_size_mb < 1: console.print("[bold red]Log size must be at least 1 MB.[/]") continue if new_size_mb > 100: console.print("[bold yellow]Warning: Log size > 100MB. Capping at 100MB.[/]") new_size_mb = 100 set_config('log_max_size_mb', str(new_size_mb)) LOG_MAX_SIZE_MB = new_size_mb app_logger = reload_logging_config() console.print(f"[bold green]Log size limit set to {new_size_mb} MB and applied immediately.[/]") console.print(f"[dim cyan]Log file rotated if it exceeded the new limit.[/]") app_logger.info(f"Log size limit updated to {new_size_mb} MB and reloaded") except ValueError: console.print("[bold red]Invalid size. Provide a number in MB.[/]") elif args.startswith("online"): sub_args = args[7:].strip() if not sub_args: current_default = "enabled" if DEFAULT_ONLINE_MODE == "on" else "disabled" console.print(f"[bold blue]Default online mode: {current_default}[/]") console.print("[dim yellow]This sets the default for new models. Use '/online on|off' to override in current session.[/]") continue if sub_args in ["on", "off"]: set_config('default_online_mode', sub_args) DEFAULT_ONLINE_MODE = sub_args console.print(f"[bold green]Default online mode {'enabled' if sub_args == 'on' else 'disabled'}.[/]") console.print("[dim blue]Note: This affects new model selections. Current session unchanged.[/]") app_logger.info(f"Default online mode set to {sub_args}") else: console.print("[bold yellow]Usage: /config online on|off[/]") elif args.startswith("maxtoken"): sub_args = args[9:].strip() if not sub_args: console.print(f"[bold blue]Stored max token limit: {MAX_TOKEN}[/]") continue try: new_max = int(sub_args) if new_max < 1: console.print("[bold red]Max token limit must be at least 1.[/]") continue if new_max > 1000000: console.print("[bold yellow]Capped at 1M for safety.[/]") new_max = 1000000 set_config('max_token', str(new_max)) MAX_TOKEN = new_max if session_max_token > MAX_TOKEN: session_max_token = MAX_TOKEN console.print(f"[bold yellow]Session adjusted to {session_max_token}.[/]") console.print(f"[bold green]Stored max token limit updated to: {MAX_TOKEN}[/]") except ValueError: console.print("[bold red]Invalid token limit.[/]") elif args.startswith("model"): sub_args = args[6:].strip() search_term = sub_args if sub_args else "" filtered_models = text_models if search_term: filtered_models = [m for m in text_models if search_term.lower() in m["name"].lower() or search_term.lower() in m["id"].lower()] if not filtered_models: console.print(f"[bold red]No models match '{search_term}'. Try without search.[/]") continue table = Table("No.", "Name", "ID", "Image", "Online", show_header=True, header_style="bold magenta") for i, model in enumerate(filtered_models, 1): image_support = "[green]✓[/green]" if has_image_capability(model) else "[red]✗[/red]" online_support = "[green]✓[/green]" if supports_online_mode(model) else "[red]✗[/red]" table.add_row(str(i), model["name"], model["id"], image_support, online_support) title = f"[bold green]Available Models for Default ({'All' if not search_term else f'Search: {search_term}'})[/]" display_paginated_table(table, title) while True: try: choice = int(typer.prompt("Enter model number (or 0 to cancel)")) if choice == 0: break if 1 <= choice <= len(filtered_models): default_model = filtered_models[choice - 1] set_config('default_model', default_model["id"]) current_name = selected_model['name'] if selected_model else "None" console.print(f"[bold cyan]Default model set to: {default_model['name']} ({default_model['id']}). Current unchanged: {current_name}[/]") break console.print("[bold red]Invalid choice. Try again.[/]") except ValueError: console.print("[bold red]Invalid input. Enter a number.[/]") else: DEFAULT_MODEL_ID = get_config('default_model') memory_status = "Enabled" if conversation_memory_enabled else "Disabled" memory_tracked = len(session_history) - memory_start_index if conversation_memory_enabled else 0 mcp_status = "Enabled" if mcp_manager.enabled else "Disabled" mcp_folders = len(mcp_manager.allowed_folders) mcp_databases = len(mcp_manager.databases) mcp_mode = mcp_manager.mode gitignore_status = "enabled" if (mcp_manager.server and mcp_manager.server.respect_gitignore) else "disabled" gitignore_patterns = len(mcp_manager.server.gitignore_parser.patterns) if mcp_manager.server else 0 table = Table("Setting", "Value", show_header=True, header_style="bold magenta", width=console.width - 10) table.add_row("API Key", API_KEY or "[Not set]") table.add_row("Base URL", OPENROUTER_BASE_URL or "[Not set]") table.add_row("DB Path", str(database) or "[Not set]") table.add_row("Logfile", str(log_file) or "[Not set]") table.add_row("Log Size Limit", f"{LOG_MAX_SIZE_MB} MB") table.add_row("Log Backups", str(LOG_BACKUP_COUNT)) table.add_row("Log Level", LOG_LEVEL_STR.upper()) table.add_row("Streaming", "Enabled" if STREAM_ENABLED == "on" else "Disabled") table.add_row("Default Model", DEFAULT_MODEL_ID or "[Not set]") table.add_row("Current Model", "[Not set]" if selected_model is None else str(selected_model["name"])) table.add_row("Default Online Mode", "Enabled" if DEFAULT_ONLINE_MODE == "on" else "Disabled") table.add_row("Session Online Mode", "Enabled" if online_mode_enabled else "Disabled") table.add_row("MCP Status", f"{mcp_status} ({mcp_folders} folders, {mcp_databases} DBs)") table.add_row("MCP Mode", f"{mcp_mode}") table.add_row("MCP .gitignore", f"{gitignore_status} ({gitignore_patterns} patterns)") table.add_row("Max Token", str(MAX_TOKEN)) table.add_row("Session Token", "[Not set]" if session_max_token == 0 else str(session_max_token)) table.add_row("Session System Prompt", session_system_prompt or "[Not set]") table.add_row("Cost Warning Threshold", f"${COST_WARNING_THRESHOLD:.4f}") table.add_row("Middle-out Transform", "Enabled" if middle_out_enabled else "Disabled") table.add_row("Conversation Memory", f"{memory_status} ({memory_tracked} tracked)" if conversation_memory_enabled else memory_status) table.add_row("History Size", str(len(session_history))) table.add_row("Current History Index", str(current_index) if current_index >= 0 else "[None]") table.add_row("App Name", APP_NAME) table.add_row("App URL", APP_URL) credits = get_credits(API_KEY, OPENROUTER_BASE_URL) if credits: table.add_row("Total Credits", credits['total_credits']) table.add_row("Used Credits", credits['used_credits']) table.add_row("Credits Left", credits['credits_left']) else: table.add_row("Total Credits", "[Unavailable - Check API key]") table.add_row("Used Credits", "[Unavailable - Check API key]") table.add_row("Credits Left", "[Unavailable - Check API key]") console.print(Panel(table, title="[bold green]Current Configurations[/]", title_align="left", subtitle="%s" %(update), subtitle_align="right")) continue if user_input.lower() == "/credits": credits = get_credits(API_KEY, OPENROUTER_BASE_URL) if credits: console.print(f"[bold green]Credits left: {credits['credits_left']}[/]") alerts = check_credit_alerts(credits) if alerts: for alert in alerts: console.print(f"[bold red]⚠️ {alert}[/]") else: console.print("[bold red]Unable to fetch credits. Check your API key or network.[/]") continue if user_input.lower() == "/clear" or user_input.lower() == "/cl": clear_screen() DEFAULT_MODEL_ID = get_config('default_model') token_value = session_max_token if session_max_token != 0 else "Not set" console.print(f"[bold cyan]Token limits: Max= {MAX_TOKEN}, Session={token_value}[/]") console.print("[bold blue]Active model[/] [bold red]%s[/]" %(str(selected_model["name"]) if selected_model else "None")) if online_mode_enabled: console.print("[bold cyan]Online mode: Enabled (web search active)[/]") if mcp_manager.enabled: gitignore_status = "on" if (mcp_manager.server and mcp_manager.server.respect_gitignore) else "off" mode_display = "Files" if mcp_manager.mode == "files" else f"DB #{mcp_manager.selected_db_index + 1}" console.print(f"[bold cyan]MCP: Enabled (Mode: {mode_display}, {len(mcp_manager.allowed_folders)} folders, {len(mcp_manager.databases)} DBs, .gitignore {gitignore_status})[/]") continue if user_input.lower().startswith("/help"): args = user_input[6:].strip() if args: show_command_help(args) continue help_table = Table("Command", "Description", "Example", show_header=True, header_style="bold cyan", width=console.width - 10) # SESSION COMMANDS help_table.add_row( "[bold yellow]━━━ SESSION COMMANDS ━━━[/]", "", "" ) help_table.add_row( "/clear or /cl", "Clear terminal screen. Keyboard shortcut: [bold]Ctrl+L[/]", "/clear\n/cl" ) help_table.add_row( "/help [command|topic]", "Show help menu or detailed help. Use '/help mcp' for MCP guide.", "/help\n/help mcp" ) help_table.add_row( "/memory [on|off]", "Toggle conversation memory. ON sends history, OFF is stateless (saves cost).", "/memory\n/memory off" ) help_table.add_row( "/next", "View next response in history.", "/next" ) help_table.add_row( "/online [on|off]", "Enable/disable online mode (web search) for session.", "/online on\n/online off" ) help_table.add_row( "/paste [prompt]", "Paste clipboard text/code. Optional prompt.", "/paste\n/paste Explain" ) help_table.add_row( "/prev", "View previous response in history.", "/prev" ) help_table.add_row( "/reset", "Clear history and reset system prompt (requires confirmation).", "/reset" ) help_table.add_row( "/retry", "Resend last prompt.", "/retry" ) # MCP COMMANDS help_table.add_row( "[bold yellow]━━━ MCP (FILE & DATABASE ACCESS) ━━━[/]", "", "" ) help_table.add_row( "/mcp enable", "Start MCP server for file and database access.", "/mcp enable" ) help_table.add_row( "/mcp disable", "Stop MCP server.", "/mcp disable" ) help_table.add_row( "/mcp status", "Show mode, stats, folders, databases, and .gitignore status.", "/mcp status" ) help_table.add_row( "/mcp add ", "Add folder for file access (auto-loads .gitignore).", "/mcp add ~/Documents" ) help_table.add_row( "/mcp add db ", "Add SQLite database for querying.", "/mcp add db ~/app.db" ) help_table.add_row( "/mcp list", "List all allowed folders with stats.", "/mcp list" ) help_table.add_row( "/mcp db list", "List all databases with details.", "/mcp db list" ) help_table.add_row( "/mcp db ", "Switch to database mode (select DB by number).", "/mcp db 1" ) help_table.add_row( "/mcp files", "Switch to file mode (default).", "/mcp files" ) help_table.add_row( "/mcp remove ", "Remove folder by path or number.", "/mcp remove 2" ) help_table.add_row( "/mcp remove db ", "Remove database by number.", "/mcp remove db 1" ) help_table.add_row( "/mcp gitignore [on|off]", "Toggle .gitignore filtering (respects project excludes).", "/mcp gitignore on" ) # MODEL COMMANDS help_table.add_row( "[bold yellow]━━━ MODEL COMMANDS ━━━[/]", "", "" ) help_table.add_row( "/info [model_id]", "Display model details (pricing, capabilities, context).", "/info\n/info gpt-4o" ) help_table.add_row( "/model [search]", "Select/change model. Shows image and online capabilities.", "/model\n/model gpt" ) # CONFIGURATION help_table.add_row( "[bold yellow]━━━ CONFIGURATION ━━━[/]", "", "" ) help_table.add_row( "/config", "View all configurations.", "/config" ) help_table.add_row( "/config api", "Set/update API key.", "/config api" ) help_table.add_row( "/config costwarning [val]", "Set cost warning threshold (USD).", "/config costwarning 0.05" ) help_table.add_row( "/config log [size_mb]", "Set log file size limit. Takes effect immediately.", "/config log 20" ) help_table.add_row( "/config loglevel [level]", "Set log verbosity (debug/info/warning/error/critical).", "/config loglevel debug" ) help_table.add_row( "/config maxtoken [val]", "Set stored max token limit.", "/config maxtoken 50000" ) help_table.add_row( "/config model [search]", "Set default startup model.", "/config model gpt" ) help_table.add_row( "/config online [on|off]", "Set default online mode for new models.", "/config online on" ) help_table.add_row( "/config stream [on|off]", "Enable/disable response streaming.", "/config stream off" ) help_table.add_row( "/config url", "Set/update base URL.", "/config url" ) # TOKEN & SYSTEM help_table.add_row( "[bold yellow]━━━ TOKEN & SYSTEM ━━━[/]", "", "" ) help_table.add_row( "/maxtoken [value]", "Set temporary session token limit.", "/maxtoken 2000" ) help_table.add_row( "/middleout [on|off]", "Enable/disable prompt compression for large contexts.", "/middleout on" ) help_table.add_row( "/system [prompt|clear]", "Set session system prompt. Use 'clear' to reset.", "/system You are a Python expert" ) # CONVERSATION MGMT help_table.add_row( "[bold yellow]━━━ CONVERSATION MGMT ━━━[/]", "", "" ) help_table.add_row( "/delete ", "Delete saved conversation (requires confirmation).", "/delete my_chat\n/delete 3" ) help_table.add_row( "/export ", "Export conversation (md/json/html).", "/export md notes.md" ) help_table.add_row( "/list", "List saved conversations with numbers and stats.", "/list" ) help_table.add_row( "/load ", "Load saved conversation.", "/load my_chat\n/load 3" ) help_table.add_row( "/save ", "Save current conversation.", "/save my_chat" ) # MONITORING help_table.add_row( "[bold yellow]━━━ MONITORING & STATS ━━━[/]", "", "" ) help_table.add_row( "/credits", "Display OpenRouter credits with alerts.", "/credits" ) help_table.add_row( "/stats", "Display session stats: tokens, cost, credits.", "/stats" ) # INPUT METHODS help_table.add_row( "[bold yellow]━━━ INPUT METHODS ━━━[/]", "", "" ) help_table.add_row( "@/path/to/file", "Attach files: images (PNG, JPG), PDFs, code files.", "Debug @script.py\nAnalyze @image.png" ) help_table.add_row( "Clipboard paste", "Use /paste to send clipboard content.", "/paste\n/paste Explain" ) help_table.add_row( "// escape", "Start with // to send literal / character.", "//help sends '/help' as text" ) # EXIT help_table.add_row( "[bold yellow]━━━ EXIT ━━━[/]", "", "" ) help_table.add_row( "exit | quit | bye", "Quit with session summary.", "exit" ) console.print(Panel( help_table, title="[bold cyan]oAI Chat Help (Version %s)[/]" % version, title_align="center", subtitle="💡 Commands are case-insensitive • Use /help for details • Use /help mcp for MCP guide • Memory ON by default • MCP has file & DB modes • Use // to escape / • Visit: https://iurl.no/oai", subtitle_align="center", border_style="cyan" )) continue if not selected_model: console.print("[bold yellow]Select a model first with '/model'.[/]") continue # ============================================================ # PROCESS FILE ATTACHMENTS # ============================================================ text_part = user_input for match in re.finditer(r'@([^\s]+)', user_input): file_path = match.group(1) expanded_path = os.path.expanduser(os.path.abspath(file_path)) if not os.path.exists(expanded_path) or os.path.isdir(expanded_path): console.print(f"[bold red]File not found or is a directory: {expanded_path}[/]") continue file_size = os.path.getsize(expanded_path) if file_size > 10 * 1024 * 1024: console.print(f"[bold red]File too large (>10MB): {expanded_path}[/]") continue mime_type, _ = mimetypes.guess_type(expanded_path) file_ext = os.path.splitext(expanded_path)[1].lower() try: with open(expanded_path, 'rb') as f: file_data = f.read() if mime_type and mime_type.startswith('image/'): modalities = selected_model.get("architecture", {}).get("input_modalities", []) if "image" not in modalities: console.print("[bold red]Selected model does not support image attachments.[/]") console.print(f"[dim yellow]Supported modalities: {', '.join(modalities) if modalities else 'text only'}[/]") continue b64_data = base64.b64encode(file_data).decode('utf-8') content_blocks.append({"type": "image_url", "image_url": {"url": f"data:{mime_type};base64,{b64_data}"}}) console.print(f"[dim green]✓ Image attached: {os.path.basename(expanded_path)} ({file_size / 1024:.1f} KB)[/]") elif mime_type == 'application/pdf' or file_ext == '.pdf': modalities = selected_model.get("architecture", {}).get("input_modalities", []) supports_pdf = any(mod in modalities for mod in ["document", "pdf", "file"]) if not supports_pdf: console.print("[bold red]Selected model does not support PDF attachments.[/]") console.print(f"[dim yellow]Supported modalities: {', '.join(modalities) if modalities else 'text only'}[/]") continue b64_data = base64.b64encode(file_data).decode('utf-8') content_blocks.append({"type": "image_url", "image_url": {"url": f"data:application/pdf;base64,{b64_data}"}}) console.print(f"[dim green]✓ PDF attached: {os.path.basename(expanded_path)} ({file_size / 1024:.1f} KB)[/]") elif (mime_type == 'text/plain' or file_ext in SUPPORTED_CODE_EXTENSIONS): text_content = file_data.decode('utf-8') content_blocks.append({"type": "text", "text": f"Code File: {os.path.basename(expanded_path)}\n\n{text_content}"}) console.print(f"[dim green]✓ Code file attached: {os.path.basename(expanded_path)} ({file_size / 1024:.1f} KB)[/]") else: console.print(f"[bold red]Unsupported file type ({mime_type}) for {expanded_path}.[/]") console.print("[bold yellow]Supported types: images (PNG, JPG, etc.), PDFs, and code files (.py, .js, etc.)[/]") continue file_attachments.append(file_path) app_logger.info(f"File attached: {os.path.basename(expanded_path)}, Type: {mime_type or file_ext}, Size: {file_size / 1024:.1f} KB") except UnicodeDecodeError: console.print(f"[bold red]Cannot decode {expanded_path} as UTF-8. File may be binary or use unsupported encoding.[/]") app_logger.error(f"UTF-8 decode error for {expanded_path}") continue except Exception as e: console.print(f"[bold red]Error reading file {expanded_path}: {e}[/]") app_logger.error(f"File read error for {expanded_path}: {e}") continue text_part = re.sub(r'@([^\s]+)', '', text_part).strip() # Build message content if text_part or content_blocks: message_content = [] if text_part: message_content.append({"type": "text", "text": text_part}) message_content.extend(content_blocks) else: console.print("[bold red]Prompt cannot be empty.[/]") continue # ============================================================ # BUILD API MESSAGES # ============================================================ api_messages = [] if session_system_prompt: api_messages.append({"role": "system", "content": session_system_prompt}) # Add database context if in database mode if mcp_manager.enabled and mcp_manager.mode == "database" and mcp_manager.selected_db_index is not None: db = mcp_manager.databases[mcp_manager.selected_db_index] db_context = f"""You are currently connected to SQLite database: {db['name']} Available tables: {', '.join(db['tables'])} You can: - Inspect the database schema with inspect_database - Search for data across tables with search_database - Execute read-only SQL queries with query_database All queries are read-only. INSERT/UPDATE/DELETE are not allowed.""" api_messages.append({"role": "system", "content": db_context}) if conversation_memory_enabled: for i in range(memory_start_index, len(session_history)): history_entry = session_history[i] api_messages.append({ "role": "user", "content": history_entry['prompt'] }) api_messages.append({ "role": "assistant", "content": history_entry['response'] }) api_messages.append({"role": "user", "content": message_content}) # Get effective model ID effective_model_id = get_effective_model_id(selected_model["id"], online_mode_enabled) # ======================================================================== # BUILD API PARAMS WITH MCP TOOLS SUPPORT # ======================================================================== api_params = { "model": effective_model_id, "messages": api_messages, "stream": STREAM_ENABLED == "on", "http_headers": { "HTTP-Referer": APP_URL, "X-Title": APP_NAME } } # Add MCP tools if enabled mcp_tools_added = False mcp_tools = [] if mcp_manager.enabled: if supports_function_calling(selected_model): mcp_tools = mcp_manager.get_tools_schema() if mcp_tools: api_params["tools"] = mcp_tools api_params["tool_choice"] = "auto" mcp_tools_added = True # IMPORTANT: Disable streaming if MCP tools are present if api_params["stream"]: api_params["stream"] = False app_logger.info("Disabled streaming due to MCP tool calls") app_logger.info(f"Added {len(mcp_tools)} MCP tools to request (mode: {mcp_manager.mode})") else: app_logger.debug(f"Model {selected_model['id']} doesn't support function calling - MCP tools not added") if session_max_token > 0: api_params["max_tokens"] = session_max_token if middle_out_enabled: api_params["transforms"] = ["middle-out"] # Log API request file_count = len(file_attachments) history_messages_count = len(session_history) - memory_start_index if conversation_memory_enabled else 0 memory_status = "ON" if conversation_memory_enabled else "OFF" online_status = "ON" if online_mode_enabled else "OFF" if mcp_tools_added: if mcp_manager.mode == "files": mcp_status = f"ON (Files, {len(mcp_manager.allowed_folders)} folders, {len(mcp_tools)} tools)" elif mcp_manager.mode == "database": db = mcp_manager.databases[mcp_manager.selected_db_index] mcp_status = f"ON (DB: {db['name']}, {len(mcp_tools)} tools)" else: mcp_status = f"ON ({len(mcp_tools)} tools)" else: mcp_status = "OFF" app_logger.info(f"API Request: Model '{effective_model_id}' (Online: {online_status}, MCP: {mcp_status}), Prompt length: {len(text_part)} chars, {file_count} file(s) attached, Memory: {memory_status}, History sent: {history_messages_count} messages.") # Send request is_streaming = api_params["stream"] if is_streaming: console.print("[bold green]Streaming response...[/] [dim](Press Ctrl+C to cancel)[/]") if online_mode_enabled: console.print("[dim cyan]🌐 Online mode active - model has web search access[/]") console.print("") else: thinking_msg = "Thinking" if mcp_tools_added: if mcp_manager.mode == "database": thinking_msg = "Thinking (database tools available)" else: thinking_msg = "Thinking (MCP tools available)" console.print(f"[bold green]{thinking_msg}...[/]", end="\r") if online_mode_enabled: console.print(f"[dim cyan]🌐 Online mode active[/]") if mcp_tools_added: if mcp_manager.mode == "files": gitignore_status = "on" if (mcp_manager.server and mcp_manager.server.respect_gitignore) else "off" console.print(f"[dim cyan]🔧 MCP active - AI can access {len(mcp_manager.allowed_folders)} folder(s) with {len(mcp_tools)} tools (.gitignore {gitignore_status})[/]") elif mcp_manager.mode == "database": db = mcp_manager.databases[mcp_manager.selected_db_index] console.print(f"[dim cyan]🗄️ MCP active - AI can query database: {db['name']} ({len(db['tables'])} tables, {len(mcp_tools)} tools)[/]") start_time = time.time() try: response = client.chat.send(**api_params) app_logger.info(f"API call successful for model '{effective_model_id}'") except Exception as e: console.print(f"[bold red]Error sending request: {e}[/]") app_logger.error(f"API Error: {type(e).__name__}: {e}") continue # ======================================================================== # HANDLE TOOL CALLS (MCP FUNCTION CALLING) # ======================================================================== tool_call_loop_count = 0 max_tool_loops = 5 while tool_call_loop_count < max_tool_loops: wants_tool_call = False if hasattr(response, 'choices') and response.choices: message = response.choices[0].message if hasattr(message, 'tool_calls') and message.tool_calls: wants_tool_call = True tool_calls = message.tool_calls console.print(f"\n[dim yellow]🔧 AI requesting {len(tool_calls)} tool call(s)...[/]") app_logger.info(f"Model requested {len(tool_calls)} tool calls") tool_results = [] for tool_call in tool_calls: tool_name = tool_call.function.name try: tool_args = json.loads(tool_call.function.arguments) except json.JSONDecodeError as e: app_logger.error(f"Failed to parse tool arguments: {e}") tool_results.append({ "tool_call_id": tool_call.id, "role": "tool", "name": tool_name, "content": json.dumps({"error": f"Invalid arguments: {e}"}) }) continue args_display = ', '.join(f'{k}="{v}"' if isinstance(v, str) else f'{k}={v}' for k, v in tool_args.items()) console.print(f"[dim cyan] → Calling: {tool_name}({args_display})[/]") app_logger.info(f"Executing MCP tool: {tool_name} with args: {tool_args}") try: result = asyncio.run(mcp_manager.call_tool(tool_name, **tool_args)) except Exception as e: app_logger.error(f"MCP tool execution error: {e}") result = {"error": str(e)} if 'error' in result: result_content = json.dumps({"error": result['error']}) console.print(f"[dim red] ✗ Error: {result['error']}[/]") app_logger.warning(f"MCP tool {tool_name} returned error: {result['error']}") else: # Display appropriate success message based on tool if tool_name == 'search_files': count = result.get('count', 0) console.print(f"[dim green] ✓ Found {count} file(s)[/]") elif tool_name == 'read_file': size = result.get('size', 0) truncated = " (truncated)" if result.get('truncated') else "" console.print(f"[dim green] ✓ Read file ({size} bytes{truncated})[/]") elif tool_name == 'list_directory': count = result.get('count', 0) truncated = " (limited to 1000)" if result.get('truncated') else "" console.print(f"[dim green] ✓ Listed {count} item(s){truncated}[/]") elif tool_name == 'inspect_database': if 'table' in result: console.print(f"[dim green] ✓ Inspected table: {result['table']} ({result['row_count']} rows)[/]") else: console.print(f"[dim green] ✓ Inspected database ({result['table_count']} tables)[/]") elif tool_name == 'search_database': count = result.get('count', 0) console.print(f"[dim green] ✓ Found {count} match(es)[/]") elif tool_name == 'query_database': count = result.get('count', 0) truncated = " (truncated)" if result.get('truncated') else "" console.print(f"[dim green] ✓ Query returned {count} row(s){truncated}[/]") else: console.print(f"[dim green] ✓ Success[/]") result_content = json.dumps(result, indent=2) app_logger.info(f"MCP tool {tool_name} succeeded") tool_results.append({ "tool_call_id": tool_call.id, "role": "tool", "name": tool_name, "content": result_content }) api_messages.append({ "role": "assistant", "content": message.content, "tool_calls": [ { "id": tc.id, "type": "function", "function": { "name": tc.function.name, "arguments": tc.function.arguments } } for tc in tool_calls ] }) api_messages.extend(tool_results) console.print("\n[dim cyan]💭 Processing tool results...[/]") app_logger.info("Sending tool results back to model") followup_params = { "model": effective_model_id, "messages": api_messages, "stream": False, # Keep streaming disabled for follow-ups "http_headers": { "HTTP-Referer": APP_URL, "X-Title": APP_NAME } } if mcp_tools_added: followup_params["tools"] = mcp_tools followup_params["tool_choice"] = "auto" if session_max_token > 0: followup_params["max_tokens"] = session_max_token try: response = client.chat.send(**followup_params) tool_call_loop_count += 1 app_logger.info(f"Follow-up request successful (loop {tool_call_loop_count})") except Exception as e: console.print(f"[bold red]Error getting follow-up response: {e}[/]") app_logger.error(f"Follow-up API error: {e}") break if not wants_tool_call: break if tool_call_loop_count >= max_tool_loops: console.print(f"[bold yellow]⚠️ Reached maximum tool call depth ({max_tool_loops}). Stopping.[/]") app_logger.warning(f"Hit max tool call loop limit: {max_tool_loops}") response_time = time.time() - start_time # ======================================================================== # PROCESS FINAL RESPONSE # ======================================================================== full_response = "" stream_interrupted = False if is_streaming: try: with Live("", console=console, refresh_per_second=10, auto_refresh=True) as live: try: for chunk in response: if hasattr(chunk, 'error') and chunk.error: console.print(f"\n[bold red]Stream error: {chunk.error.message}[/]") app_logger.error(f"Stream error: {chunk.error.message}") break if hasattr(chunk.choices[0].delta, 'content') and chunk.choices[0].delta.content: content_chunk = chunk.choices[0].delta.content full_response += content_chunk md = Markdown(full_response) live.update(md) except KeyboardInterrupt: stream_interrupted = True console.print("\n[bold yellow]⚠️ Streaming interrupted![/]") app_logger.info("Streaming interrupted by user (Ctrl+C)") except Exception as stream_error: stream_interrupted = True console.print(f"\n[bold red]Stream error: {stream_error}[/]") app_logger.error(f"Stream processing error: {stream_error}") if not stream_interrupted: console.print("") except KeyboardInterrupt: # Outer interrupt handler (in case inner misses it) stream_interrupted = True console.print("\n[bold yellow]⚠️ Streaming interrupted![/]") app_logger.info("Streaming interrupted by user (outer)") except Exception as e: stream_interrupted = True console.print(f"\n[bold red]Error during streaming: {e}[/]") app_logger.error(f"Streaming error: {e}") # If stream was interrupted, skip processing and continue to next prompt if stream_interrupted: if full_response: console.print(f"\n[dim yellow]Partial response received ({len(full_response)} chars). Discarding...[/]") console.print("[dim blue]💡 Ready for next prompt[/]\n") app_logger.info("Stream cleanup completed, returning to prompt") # Force close the response if possible try: if hasattr(response, 'close'): response.close() elif hasattr(response, '__exit__'): response.__exit__(None, None, None) except: pass # Recreate client to be safe try: client = OpenRouter(api_key=API_KEY) app_logger.info("Client recreated after stream interruption") except: pass continue # Now it's safe to continue else: full_response = response.choices[0].message.content if response.choices else "" console.print(f"\r{' ' * 50}\r", end="") if full_response: if not is_streaming: md = Markdown(full_response) console.print(Panel(md, title="[bold green]AI Response[/]", title_align="left", border_style="green")) session_history.append({'prompt': user_input, 'response': full_response}) current_index = len(session_history) - 1 usage = getattr(response, 'usage', None) input_tokens = usage.input_tokens if usage and hasattr(usage, 'input_tokens') else 0 output_tokens = usage.output_tokens if usage and hasattr(usage, 'output_tokens') else 0 msg_cost = usage.total_cost_usd if usage and hasattr(usage, 'total_cost_usd') else estimate_cost(input_tokens, output_tokens) total_input_tokens += input_tokens total_output_tokens += output_tokens total_cost += msg_cost message_count += 1 app_logger.info(f"Response: Tokens - I:{input_tokens} O:{output_tokens} T:{input_tokens + output_tokens}, Cost: ${msg_cost:.4f}, Time: {response_time:.2f}s, Tool loops: {tool_call_loop_count}") if conversation_memory_enabled: context_count = len(session_history) - memory_start_index context_info = f", Context: {context_count} msg(s)" if context_count > 1 else "" else: context_info = ", Memory: OFF" online_info = " 🌐" if online_mode_enabled else "" mcp_info = "" if mcp_tools_added: if mcp_manager.mode == "files": mcp_info = " 🔧" elif mcp_manager.mode == "database": mcp_info = " 🗄️" tool_info = f" ({tool_call_loop_count} tool loop(s))" if tool_call_loop_count > 0 else "" console.print(f"\n[dim blue]📊 Metrics: {input_tokens + output_tokens} tokens | ${msg_cost:.4f} | {response_time:.2f}s{context_info}{online_info}{mcp_info}{tool_info} | Session: {total_input_tokens + total_output_tokens} tokens | ${total_cost:.4f}[/]") warnings = [] if msg_cost > COST_WARNING_THRESHOLD: warnings.append(f"High cost alert: This response exceeded threshold ${COST_WARNING_THRESHOLD:.4f}") credits_data = get_credits(API_KEY, OPENROUTER_BASE_URL) if credits_data: warning_alerts = check_credit_alerts(credits_data) warnings.extend(warning_alerts) if warnings: warning_text = ' | '.join(warnings) console.print(f"[bold red]⚠️ {warning_text}[/]") app_logger.warning(f"Warnings triggered: {warning_text}") console.print("") try: copy_choice = input("💾 Type 'c' to copy response, or press Enter to continue: ").strip().lower() if copy_choice == "c": pyperclip.copy(full_response) console.print("[bold green]✅ Response copied to clipboard![/]") except (EOFError, KeyboardInterrupt): pass console.print("") else: console.print("[bold red]No response received.[/]") app_logger.error("No response from API") except KeyboardInterrupt: console.print("\n[bold yellow]Input interrupted. Continuing...[/]") app_logger.warning("Input interrupted by Ctrl+C") continue except EOFError: console.print("\n[bold yellow]Goodbye![/]") total_tokens = total_input_tokens + total_output_tokens app_logger.info(f"Session ended via EOF. Total messages: {message_count}, Total tokens: {total_tokens}, Total cost: ${total_cost:.4f}") return except Exception as e: console.print(f"[bold red]Error: {e}[/]") console.print("[bold yellow]Try again or select a model.[/]") app_logger.error(f"Unexpected error: {type(e).__name__}: {e}") if __name__ == "__main__": clear_screen() app()