Files
oai/oai/commands/handlers.py

1480 lines
50 KiB
Python

"""
Command handlers for oAI.
This module implements all the slash commands available in the chat interface.
Each command is registered with the global registry.
"""
import json
from typing import Any, Dict, List, Optional
from oai.commands.registry import (
Command,
CommandContext,
CommandHelp,
CommandResult,
CommandStatus,
registry,
)
from oai.constants import COMMAND_HELP, VALID_COMMANDS
from oai.utils.export import export_as_html, export_as_json, export_as_markdown
from oai.utils.logging import get_logger
class HelpCommand(Command):
"""Display help information for commands."""
@property
def name(self) -> str:
return "/help"
@property
def help(self) -> CommandHelp:
return CommandHelp(
description="Display help information for commands.",
usage="/help [command|topic]",
examples=[
("Show all commands", "/help"),
("Get help for a specific command", "/help /model"),
("Get detailed MCP help", "/help mcp"),
],
notes="Use /help without arguments to see the full command list.",
)
def execute(self, args: str, context: CommandContext) -> CommandResult:
logger = get_logger()
if args:
# Show help for specific command/topic
self._show_command_help(args)
else:
# Show general help
self._show_general_help(context)
logger.info(f"Displayed help for: {args or 'general'}")
return CommandResult.success()
def _show_command_help(self, command_or_topic: str) -> None:
"""Show help for a specific command or topic."""
# Handle topics (like 'mcp')
if not command_or_topic.startswith("/"):
if command_or_topic.lower() == "mcp":
help_data = COMMAND_HELP.get("mcp", {})
if help_data:
content = []
content.append(f"[bold cyan]Description:[/]")
content.append(help_data.get("description", ""))
content.append("")
content.append(help_data.get("notes", ""))
display_panel(
"\n".join(content),
title="[bold green]MCP - Model Context Protocol Guide[/]",
border_style="green",
)
return
command_or_topic = "/" + command_or_topic
help_data = COMMAND_HELP.get(command_or_topic)
if not help_data:
pass
return
content = []
if help_data.get("aliases"):
content.append(f"[bold cyan]Aliases:[/] {', '.join(help_data['aliases'])}")
content.append("")
content.append("[bold cyan]Description:[/]")
content.append(help_data.get("description", ""))
content.append("")
content.append("[bold cyan]Usage:[/]")
content.append(f"[yellow]{help_data.get('usage', '')}[/]")
content.append("")
examples = help_data.get("examples", [])
if examples:
content.append("[bold cyan]Examples:[/]")
for desc, example in examples:
if not desc and not example:
content.append("")
elif desc.startswith("━━━"):
content.append(f"[bold yellow]{desc}[/]")
else:
if desc:
content.append(f" [dim]{desc}:[/]")
if example:
content.append(f" [green]{example}[/]")
content.append("")
notes = help_data.get("notes")
if notes:
content.append("[bold cyan]Notes:[/]")
content.append(f"[dim]{notes}[/]")
display_panel(
"\n".join(content),
title=f"[bold green]Help: {command_or_topic}[/]",
border_style="green",
)
def _show_general_help(self, context: CommandContext) -> None:
"""Show general help with all commands."""
from rich.table import Table
table = Table(
"Command",
"Description",
"Example",
show_header=True,
header_style="bold magenta",
show_lines=False,
)
# Group commands by category
categories = [
("[bold cyan]━━━ CHAT ━━━[/]", [
("/retry", "Resend the last prompt.", "/retry"),
("/memory", "Toggle conversation memory.", "/memory on"),
("/online", "Toggle online mode (web search).", "/online on"),
("/paste", "Paste from clipboard with optional prompt.", "/paste Explain"),
]),
("[bold cyan]━━━ NAVIGATION ━━━[/]", [
("/prev", "View previous response in history.", "/prev"),
("/next", "View next response in history.", "/next"),
("/reset", "Clear conversation history.", "/reset"),
]),
("[bold cyan]━━━ MODEL & CONFIG ━━━[/]", [
("/model", "Select AI model.", "/model gpt"),
("/info", "Show model information.", "/info"),
("/config", "View or change settings.", "/config stream on"),
("/maxtoken", "Set session token limit.", "/maxtoken 2000"),
("/system", "Set system prompt.", "/system You are an expert"),
]),
("[bold cyan]━━━ SAVE & EXPORT ━━━[/]", [
("/save", "Save conversation.", "/save my_chat"),
("/load", "Load saved conversation.", "/load my_chat"),
("/delete", "Delete saved conversation.", "/delete my_chat"),
("/list", "List saved conversations.", "/list"),
("/export", "Export conversation.", "/export md notes.md"),
]),
("[bold cyan]━━━ STATS & INFO ━━━[/]", [
("/stats", "Show session statistics.", "/stats"),
("/credits", "Show account credits.", "/credits"),
("/middleout", "Toggle middle-out compression.", "/middleout on"),
]),
("[bold cyan]━━━ MCP (FILE/DB ACCESS) ━━━[/]", [
("/mcp on", "Enable MCP server.", "/mcp on"),
("/mcp add", "Add folder or database.", "/mcp add ~/Documents"),
("/mcp status", "Show MCP status.", "/mcp status"),
("/mcp write", "Toggle write mode.", "/mcp write on"),
]),
("[bold cyan]━━━ UTILITY ━━━[/]", [
("/clear", "Clear the screen.", "/clear"),
("/help", "Show this help.", "/help /model"),
]),
("[bold yellow]━━━ EXIT ━━━[/]", [
("exit", "Quit the application.", "exit"),
]),
]
for header, commands in categories:
table.add_row(header, "", "")
for cmd, desc, example in commands:
table.add_row(cmd, desc, example)
from oai.constants import APP_VERSION
display_panel(
table,
title=f"[bold cyan]oAI Chat Help (Version {APP_VERSION})[/]",
subtitle="💡 Use /help <command> for details • /help mcp for MCP guide",
)
class ClearCommand(Command):
"""Clear the terminal screen."""
@property
def name(self) -> str:
return "/clear"
@property
def aliases(self) -> List[str]:
return ["/cl"]
@property
def help(self) -> CommandHelp:
return CommandHelp(
description="Clear the terminal screen.",
usage="/clear",
aliases=["/cl"],
notes="You can also use Ctrl+L.",
)
def execute(self, args: str, context: CommandContext) -> CommandResult:
pass
return CommandResult.success()
class MemoryCommand(Command):
"""Toggle conversation memory."""
@property
def name(self) -> str:
return "/memory"
@property
def help(self) -> CommandHelp:
return CommandHelp(
description="Toggle conversation memory.",
usage="/memory [on|off]",
examples=[
("Check status", "/memory"),
("Enable memory", "/memory on"),
("Disable memory", "/memory off"),
],
notes="When off, each message is independent (saves tokens).",
)
def execute(self, args: str, context: CommandContext) -> CommandResult:
if not args:
status = "enabled" if context.memory_enabled else "disabled"
return CommandResult.success()
if args.lower() == "on":
context.memory_enabled = True
return CommandResult.success(data={"memory_enabled": True})
elif args.lower() == "off":
context.memory_enabled = False
context.memory_start_index = len(context.session_history)
return CommandResult.success(data={"memory_enabled": False})
else:
pass
return CommandResult.error("Invalid argument")
class OnlineCommand(Command):
"""Toggle online mode (web search)."""
@property
def name(self) -> str:
return "/online"
@property
def help(self) -> CommandHelp:
return CommandHelp(
description="Enable or disable online mode (web search).",
usage="/online [on|off]",
examples=[
("Check status", "/online"),
("Enable web search", "/online on"),
("Disable web search", "/online off"),
],
notes="Not all models support online mode.",
)
def execute(self, args: str, context: CommandContext) -> CommandResult:
if not args:
status = "enabled" if context.online_enabled else "disabled"
return CommandResult.success()
if args.lower() == "on":
if context.selected_model_raw:
params = context.selected_model_raw.get("supported_parameters", [])
if "tools" not in params:
pass # Model doesn't support tools, but allow enabling anyway
context.online_enabled = True
return CommandResult.success(data={"online_enabled": True})
elif args.lower() == "off":
context.online_enabled = False
return CommandResult.success(data={"online_enabled": False})
else:
pass
return CommandResult.error("Invalid argument")
class ResetCommand(Command):
"""Reset conversation history."""
@property
def name(self) -> str:
return "/reset"
@property
def help(self) -> CommandHelp:
return CommandHelp(
description="Clear conversation history and reset system prompt.",
usage="/reset",
notes="Requires confirmation. Resets all session metrics.",
)
def execute(self, args: str, context: CommandContext) -> CommandResult:
# TUI handles confirmation via modal
# For now, just reset directly since TUI shows confirmation dialog
context.session_history.clear()
context.session_system_prompt = ""
context.memory_start_index = 0
context.current_index = 0
context.total_input_tokens = 0
context.total_output_tokens = 0
context.total_cost = 0.0
context.message_count = 0
get_logger().info("Conversation reset by user")
return CommandResult.success()
class StatsCommand(Command):
"""Display session statistics."""
@property
def name(self) -> str:
return "/stats"
@property
def help(self) -> CommandHelp:
return CommandHelp(
description="Display session statistics.",
usage="/stats",
notes="Shows tokens, costs, and credits.",
)
def execute(self, args: str, context: CommandContext) -> CommandResult:
from rich.table import Table
table = Table(
"Metric",
"Value",
show_header=True,
header_style="bold magenta",
)
table.add_row("Input Tokens", f"{context.total_input_tokens:,}")
table.add_row("Output Tokens", f"{context.total_output_tokens:,}")
table.add_row(
"Total Tokens",
f"{context.total_input_tokens + context.total_output_tokens:,}",
)
table.add_row("Total Cost", f"${context.total_cost:.4f}")
if context.message_count > 0:
avg_cost = context.total_cost / context.message_count
table.add_row("Avg Cost/Message", f"${avg_cost:.4f}")
table.add_row("Messages", str(context.message_count))
# Get credits if provider available
if context.provider:
credits = context.provider.get_credits()
if credits:
table.add_row("", "")
table.add_row("[bold]Credits[/]", "")
table.add_row(
"Credits Left",
credits.get("credits_left_formatted", "N/A"),
)
return CommandResult.success()
class CreditsCommand(Command):
"""Display account credits."""
@property
def name(self) -> str:
return "/credits"
@property
def help(self) -> CommandHelp:
return CommandHelp(
description="Display your OpenRouter account credits.",
usage="/credits",
)
def execute(self, args: str, context: CommandContext) -> CommandResult:
if not context.provider:
pass
return CommandResult.error("No provider")
credits = context.provider.get_credits()
if not credits:
pass
return CommandResult.error("Failed to fetch credits")
from rich.table import Table
table = Table(
"Metric",
"Value",
show_header=True,
header_style="bold magenta",
)
table.add_row("Total Credits", credits.get("total_credits_formatted", "N/A"))
table.add_row("Used Credits", credits.get("used_credits_formatted", "N/A"))
table.add_row("Credits Left", credits.get("credits_left_formatted", "N/A"))
# Check for warnings
from oai.constants import LOW_CREDIT_AMOUNT, LOW_CREDIT_RATIO
credits_left = credits.get("credits_left", 0)
total = credits.get("total_credits", 0)
warnings = []
if credits_left < LOW_CREDIT_AMOUNT:
warnings.append(f"Less than ${LOW_CREDIT_AMOUNT:.2f} remaining!")
elif total > 0 and credits_left < total * LOW_CREDIT_RATIO:
warnings.append("Less than 10% of credits remaining!")
for warning in warnings:
pass # Warnings handled by TUI
return CommandResult.success(data=credits)
class ExportCommand(Command):
"""Export conversation to file."""
@property
def name(self) -> str:
return "/export"
@property
def help(self) -> CommandHelp:
return CommandHelp(
description="Export conversation to a file.",
usage="/export <format> <filename>",
examples=[
("Export as Markdown", "/export md notes.md"),
("Export as JSON", "/export json conversation.json"),
("Export as HTML", "/export html report.html"),
],
notes="Available formats: md, json, html",
)
def execute(self, args: str, context: CommandContext) -> CommandResult:
parts = args.split(maxsplit=1)
if len(parts) != 2:
pass
return CommandResult.error("Invalid arguments")
fmt, filename = parts
fmt = fmt.lower()
if fmt not in ["md", "json", "html"]:
pass
return CommandResult.error("Invalid format")
if not context.session_history:
pass
return CommandResult.error("Empty history")
try:
if fmt == "md":
content = export_as_markdown(
context.session_history,
context.session_system_prompt,
)
elif fmt == "json":
content = export_as_json(
context.session_history,
context.session_system_prompt,
)
else: # html
content = export_as_html(
context.session_history,
context.session_system_prompt,
)
with open(filename, "w", encoding="utf-8") as f:
f.write(content)
get_logger().info(f"Exported conversation to {filename} ({fmt})")
return CommandResult.success()
except Exception as e:
pass
return CommandResult.error(str(e))
class MiddleOutCommand(Command):
"""Toggle middle-out compression."""
@property
def name(self) -> str:
return "/middleout"
@property
def help(self) -> CommandHelp:
return CommandHelp(
description="Toggle middle-out transform for long prompts.",
usage="/middleout [on|off]",
examples=[
("Check status", "/middleout"),
("Enable", "/middleout on"),
("Disable", "/middleout off"),
],
notes="Compresses prompts exceeding context size.",
)
def execute(self, args: str, context: CommandContext) -> CommandResult:
if not args:
status = "enabled" if context.middle_out_enabled else "disabled"
return CommandResult.success()
if args.lower() == "on":
context.middle_out_enabled = True
return CommandResult.success(data={"middle_out_enabled": True})
elif args.lower() == "off":
context.middle_out_enabled = False
return CommandResult.success(data={"middle_out_enabled": False})
else:
pass
return CommandResult.error("Invalid argument")
class MaxTokenCommand(Command):
"""Set session token limit."""
@property
def name(self) -> str:
return "/maxtoken"
@property
def help(self) -> CommandHelp:
return CommandHelp(
description="Set session token limit.",
usage="/maxtoken [value]",
examples=[
("View current", "/maxtoken"),
("Set to 2000", "/maxtoken 2000"),
],
notes="Cannot exceed stored max token setting.",
)
def execute(self, args: str, context: CommandContext) -> CommandResult:
if not args:
if context.session_max_token > 0:
message = f"Session max token: {context.session_max_token}"
else:
message = "No session token limit set (using model default)."
return CommandResult.success(message=message)
try:
value = int(args)
if value <= 0:
message = "Token limit must be positive."
return CommandResult.error(message)
# Check against stored limit
stored_max = 100000 # Default
if context.settings:
stored_max = context.settings.max_tokens
warning_msg = None
if value > stored_max:
warning_msg = f"⚠️ Value {value} exceeds stored limit {stored_max}. Using {stored_max}."
value = stored_max
context.session_max_token = value
message = f"✓ Session max token set to {value}."
if warning_msg and context.is_tui:
message = f"{warning_msg}\n{message}"
return CommandResult.success(message=message, data={"session_max_token": value})
except ValueError:
message = "Please enter a valid number."
return CommandResult.error(message)
class SystemCommand(Command):
"""Set session system prompt."""
@property
def name(self) -> str:
return "/system"
@property
def help(self) -> CommandHelp:
return CommandHelp(
description="Set or clear the session system prompt.",
usage="/system [prompt|clear|default <prompt>]",
examples=[
("View current", "/system"),
("Set prompt", "/system You are a Python expert"),
("Multiline prompt", r"/system You are an expert.\nRespond clearly."),
("Blank prompt", '/system ""'),
("Save as default", "/system default You are a Python expert"),
("Revert to default", "/system clear"),
],
notes=r'Use \n for newlines. Use /system "" for blank, /system clear to revert to default.',
)
def execute(self, args: str, context: CommandContext) -> CommandResult:
from oai.constants import DEFAULT_SYSTEM_PROMPT
if not args:
# Show current session prompt and default
msg = []
if context.session_system_prompt:
msg.append(f"Session prompt: {context.session_system_prompt}")
else:
msg.append("Session prompt: [blank]")
if context.settings:
if context.settings.default_system_prompt is None:
msg.append(f"Default prompt: [hardcoded] {DEFAULT_SYSTEM_PROMPT[:60]}...")
elif context.settings.default_system_prompt == "":
msg.append("Default prompt: [blank]")
else:
msg.append(f"Custom default: {context.settings.default_system_prompt}")
message = "\n".join(msg)
return CommandResult.success(message=message)
if args.lower() == "clear":
# Revert to hardcoded default
if context.settings:
context.settings.clear_default_system_prompt()
context.session_system_prompt = DEFAULT_SYSTEM_PROMPT
message = f"✓ Reverted to hardcoded default system prompt.\nDefault: {DEFAULT_SYSTEM_PROMPT[:60]}..."
else:
context.session_system_prompt = DEFAULT_SYSTEM_PROMPT
message = "✓ Session prompt reverted to default."
return CommandResult.success(message=message)
# Check for default command
if args.lower().startswith("default "):
prompt = args[8:] # Remove "default " prefix (keep trailing spaces)
if not prompt:
message = "Usage: /system default <prompt>"
return CommandResult.error(message)
# Decode escape sequences like \n for newlines
prompt = prompt.encode().decode('unicode_escape')
if context.settings:
context.settings.set_default_system_prompt(prompt)
context.session_system_prompt = prompt
if prompt:
message = f"✓ Default system prompt saved: {prompt}"
else:
message = "✓ Default system prompt set to blank."
get_logger().info(f"Default system prompt updated: {prompt[:50]}...")
return CommandResult.success(message=message)
else:
message = "Settings not available"
return CommandResult.error(message)
# Decode escape sequences like \n for newlines
prompt = args.encode().decode('unicode_escape')
context.session_system_prompt = prompt
if prompt:
message = f"✓ Session system prompt set: {prompt}\nTip: Use '/system default <prompt>' to save as default"
else:
message = "✓ Session system prompt set to blank."
get_logger().info(f"System prompt updated: {prompt[:50]}...")
return CommandResult.success(message=message)
class RetryCommand(Command):
"""Resend the last prompt."""
@property
def name(self) -> str:
return "/retry"
@property
def help(self) -> CommandHelp:
return CommandHelp(
description="Resend the last prompt.",
usage="/retry",
notes="Requires at least one message in history.",
)
def execute(self, args: str, context: CommandContext) -> CommandResult:
if not context.session_history:
message = "No message to retry."
return CommandResult.error(message)
last_prompt = context.session_history[-1].get("prompt", "")
if not last_prompt:
message = "Last message has no prompt."
return CommandResult.error(message)
# Return the prompt to be re-sent
preview = last_prompt[:50] + "..." if len(last_prompt) > 50 else last_prompt
message = f"Retrying: {preview}"
return CommandResult.success(message=message, data={"retry_prompt": last_prompt})
class PrevCommand(Command):
"""View previous response."""
@property
def name(self) -> str:
return "/prev"
@property
def help(self) -> CommandHelp:
return CommandHelp(
description="View previous response in history.",
usage="/prev",
)
def execute(self, args: str, context: CommandContext) -> CommandResult:
if not context.session_history:
pass
return CommandResult.error("Empty history")
if context.current_index <= 0:
pass
return CommandResult.success()
context.current_index -= 1
entry = context.session_history[context.current_index]
# TUI handles display
return CommandResult.success(data={"show_history_entry": entry, "index": context.current_index})
class NextCommand(Command):
"""View next response."""
@property
def name(self) -> str:
return "/next"
@property
def help(self) -> CommandHelp:
return CommandHelp(
description="View next response in history.",
usage="/next",
)
def execute(self, args: str, context: CommandContext) -> CommandResult:
if not context.session_history:
pass
return CommandResult.error("Empty history")
if context.current_index >= len(context.session_history) - 1:
pass
return CommandResult.success()
context.current_index += 1
entry = context.session_history[context.current_index]
# TUI handles display
return CommandResult.success(data={"show_history_entry": entry, "index": context.current_index})
class ConfigCommand(Command):
"""View or modify configuration."""
@property
def name(self) -> str:
return "/config"
@property
def help(self) -> CommandHelp:
return CommandHelp(
description="View or modify application configuration.",
usage="/config [setting] [value]",
examples=[
("View all settings", "/config"),
("Set API key", "/config api"),
("Enable streaming", "/config stream on"),
],
)
def execute(self, args: str, context: CommandContext) -> CommandResult:
from rich.table import Table
if not context.settings:
pass
return CommandResult.error("No settings")
settings = context.settings
if not args:
# Show all settings
from oai.constants import DEFAULT_SYSTEM_PROMPT
table = Table("Setting", "Value", show_header=True, header_style="bold magenta")
table.add_row("API Key", "***" + settings.api_key[-4:] if settings.api_key else "Not set")
table.add_row("Base URL", settings.base_url)
table.add_row("Default Model", settings.default_model or "Not set")
# Show system prompt status
if settings.default_system_prompt is None:
system_prompt_display = f"[default] {DEFAULT_SYSTEM_PROMPT[:40]}..."
elif settings.default_system_prompt == "":
system_prompt_display = "[blank]"
else:
system_prompt_display = settings.default_system_prompt[:50] + "..." if len(settings.default_system_prompt) > 50 else settings.default_system_prompt
table.add_row("System Prompt", system_prompt_display)
table.add_row("Streaming", "on" if settings.stream_enabled else "off")
table.add_row("Cost Warning", f"${settings.cost_warning_threshold:.4f}")
table.add_row("Max Tokens", str(settings.max_tokens))
table.add_row("Default Online", "on" if settings.default_online_mode else "off")
table.add_row("Log Level", settings.log_level)
return CommandResult.success()
parts = args.split(maxsplit=1)
setting = parts[0].lower()
value = parts[1] if len(parts) > 1 else None
if setting == "api":
if value:
settings.set_api_key(value)
else:
# TUI handles API key input via modal
return CommandResult.success(data={"show_api_key_input": True})
elif setting == "stream":
if value and value.lower() in ["on", "off"]:
settings.set_stream_enabled(value.lower() == "on")
else:
pass # Invalid argument, silently ignore
elif setting == "model":
if value:
# Show model selector with search term, same as /model
return CommandResult.success(data={"show_model_selector": True, "search": value, "set_as_default": True})
else:
# Show model selector without search filter
return CommandResult.success(data={"show_model_selector": True, "search": "", "set_as_default": True})
elif setting == "system":
from oai.constants import DEFAULT_SYSTEM_PROMPT
if value:
# Decode escape sequences like \n for newlines
value = value.encode().decode('unicode_escape')
settings.set_default_system_prompt(value)
if value:
pass
else:
pass
else:
if settings.default_system_prompt is None:
pass
elif settings.default_system_prompt == "":
pass
else:
pass
elif setting == "online":
if value and value.lower() in ["on", "off"]:
settings.set_default_online_mode(value.lower() == "on")
else:
pass
elif setting == "costwarning":
if value:
try:
settings.set_cost_warning_threshold(float(value))
print_success(f"Cost warning set to: ${float(value):.4f}")
except ValueError:
pass
else:
pass
elif setting == "maxtoken":
if value:
try:
settings.set_max_tokens(int(value))
except ValueError:
pass
else:
pass
elif setting == "loglevel":
valid_levels = ["debug", "info", "warning", "error", "critical"]
if value and value.lower() in valid_levels:
settings.set_log_level(value.lower())
print_success(f"Log level set to: {value.lower()}")
else:
print_info(f"Valid levels: {', '.join(valid_levels)}")
else:
pass
return CommandResult.error("Unknown setting")
return CommandResult.success()
class ListCommand(Command):
"""List saved conversations."""
@property
def name(self) -> str:
return "/list"
@property
def help(self) -> CommandHelp:
return CommandHelp(
description="List all saved conversations.",
usage="/list",
)
def execute(self, args: str, context: CommandContext) -> CommandResult:
from oai.config.database import Database
from rich.table import Table
db = Database()
conversations = db.list_conversations()
if not conversations:
pass
return CommandResult.success()
table = Table("No.", "Name", "Messages", "Last Saved", show_header=True, header_style="bold magenta")
for i, conv in enumerate(conversations, 1):
table.add_row(
str(i),
conv["name"],
str(conv["message_count"]),
conv["timestamp"][:19] if conv.get("timestamp") else "-",
)
return CommandResult.success(data={"conversations": conversations})
class SaveCommand(Command):
"""Save conversation."""
@property
def name(self) -> str:
return "/save"
@property
def help(self) -> CommandHelp:
return CommandHelp(
description="Save the current conversation.",
usage="/save <name>",
)
def execute(self, args: str, context: CommandContext) -> CommandResult:
if not args:
pass
return CommandResult.error("Missing name")
if not context.session_history:
pass
return CommandResult.error("Empty history")
from oai.config.database import Database
db = Database()
db.save_conversation(args, context.session_history)
get_logger().info(f"Saved conversation: {args}")
return CommandResult.success()
class LoadCommand(Command):
"""Load saved conversation."""
@property
def name(self) -> str:
return "/load"
@property
def help(self) -> CommandHelp:
return CommandHelp(
description="Load a saved conversation.",
usage="/load <name|number>",
)
def execute(self, args: str, context: CommandContext) -> CommandResult:
if not args:
pass
return CommandResult.error("Missing name")
from oai.config.database import Database
db = Database()
# Check if it's a number
name = args
if args.isdigit():
conversations = db.list_conversations()
index = int(args) - 1
if 0 <= index < len(conversations):
name = conversations[index]["name"]
else:
print_error(f"Invalid number. Use 1-{len(conversations)}")
return CommandResult.error("Invalid number")
data = db.load_conversation(name)
if not data:
pass
return CommandResult.error("Not found")
print_success(f"Loaded conversation '{name}' ({len(data)} messages)")
get_logger().info(f"Loaded conversation: {name}")
return CommandResult.success(data={"load_conversation": name, "history": data})
class DeleteCommand(Command):
"""Delete saved conversation."""
@property
def name(self) -> str:
return "/delete"
@property
def help(self) -> CommandHelp:
return CommandHelp(
description="Delete a saved conversation.",
usage="/delete <name|number>",
)
def execute(self, args: str, context: CommandContext) -> CommandResult:
if not args:
pass
return CommandResult.error("Missing name")
from oai.config.database import Database
db = Database()
# Check if it's a number
name = args
if args.isdigit():
conversations = db.list_conversations()
index = int(args) - 1
if 0 <= index < len(conversations):
name = conversations[index]["name"]
else:
print_error(f"Invalid number. Use 1-{len(conversations)}")
return CommandResult.error("Invalid number")
# TUI handles confirmation via modal
count = db.delete_conversation(name)
if count > 0:
pass
get_logger().info(f"Deleted conversation: {name}")
else:
pass
return CommandResult.error("Not found")
return CommandResult.success()
class InfoCommand(Command):
"""Show model information."""
@property
def name(self) -> str:
return "/info"
@property
def help(self) -> CommandHelp:
return CommandHelp(
description="Display detailed model information.",
usage="/info [model_id]",
)
def execute(self, args: str, context: CommandContext) -> CommandResult:
from rich.table import Table
if not context.provider:
pass
return CommandResult.error("No provider")
model_id = args.strip() if args else None
if not model_id and context.selected_model_raw:
model_id = context.selected_model_raw.get("id")
if not model_id:
pass
return CommandResult.error("No model")
# Get raw model data
model = None
if hasattr(context.provider, "get_raw_model"):
model = context.provider.get_raw_model(model_id)
if not model:
pass
return CommandResult.error("Not found")
table = Table("Property", "Value", show_header=True, header_style="bold magenta")
table.add_row("ID", model.get("id", ""))
table.add_row("Name", model.get("name", ""))
table.add_row("Context Length", f"{model.get('context_length', 0):,}")
# Pricing
pricing = model.get("pricing", {})
if pricing:
prompt_price = float(pricing.get("prompt", 0)) * 1_000_000
completion_price = float(pricing.get("completion", 0)) * 1_000_000
table.add_row("Input Price", f"${prompt_price:.2f}/M tokens")
table.add_row("Output Price", f"${completion_price:.2f}/M tokens")
# Capabilities
arch = model.get("architecture", {})
input_mod = arch.get("input_modalities", [])
output_mod = arch.get("output_modalities", [])
supported = model.get("supported_parameters", [])
table.add_row("Input Modalities", ", ".join(input_mod) if input_mod else "text")
table.add_row("Output Modalities", ", ".join(output_mod) if output_mod else "text")
table.add_row("Image Support", "" if "image" in input_mod else "")
table.add_row("Tool Support", "" if "tools" in supported else "")
table.add_row("Online Support", "" if "tools" in supported else "")
return CommandResult.success()
class MCPCommand(Command):
"""MCP management command."""
@property
def name(self) -> str:
return "/mcp"
@property
def help(self) -> CommandHelp:
return CommandHelp(
description="Manage MCP (Model Context Protocol).",
usage="/mcp <command> [args]",
examples=[
("Enable MCP", "/mcp on"),
("Show status", "/mcp status"),
("Add folder", "/mcp add ~/Documents"),
],
)
def execute(self, args: str, context: CommandContext) -> CommandResult:
if not context.mcp_manager:
message = "MCP not available"
return CommandResult.error(message)
mcp = context.mcp_manager
parts = args.strip().split(maxsplit=1)
cmd = parts[0].lower() if parts else ""
cmd_args = parts[1] if len(parts) > 1 else ""
if cmd in ["on", "enable"]:
result = mcp.enable()
if result["success"]:
message = result.get("message", "✓ MCP enabled")
if result.get("folder_count", 0) == 0:
message += "\nTip: Add folders with: /mcp add ~/Documents"
return CommandResult.success(message=message)
else:
message = result.get("error", "Failed to enable MCP")
return CommandResult.error(message)
elif cmd in ["off", "disable"]:
result = mcp.disable()
if result["success"]:
message = "✓ MCP disabled"
return CommandResult.success(message=message)
else:
message = result.get("error", "Failed to disable MCP")
return CommandResult.error(message)
elif cmd == "status":
result = mcp.get_status()
if result["success"]:
# Format as text for TUI
status_str = "Active ✓" if result["enabled"] else "Inactive ✗"
mode = result.get("mode_info", {}).get("mode_display", "files")
folders = result.get("folder_count", 0)
databases = result.get("database_count", 0)
write_mode = "Enabled" if result.get("write_enabled") else "Disabled"
gitignore = result.get("gitignore_status", "on")
message = f"**MCP Status**\n\n"
message += f"**Status:** {status_str}\n"
message += f"**Mode:** {mode}\n"
message += f"**Folders:** {folders}\n"
message += f"**Databases:** {databases}\n"
message += f"**Write Mode:** {write_mode}\n"
message += f"**.gitignore:** {gitignore}"
if not context.is_tui:
from rich.table import Table
table = Table("Property", "Value", show_header=True, header_style="bold magenta")
table.add_row("Status", status_str)
table.add_row("Mode", mode)
table.add_row("Folders", str(folders))
table.add_row("Databases", str(databases))
table.add_row("Write Mode", write_mode)
table.add_row(".gitignore", gitignore)
return CommandResult.success()
else:
return CommandResult.success(message=message)
return CommandResult.error("Failed to get MCP status")
elif cmd == "add":
if cmd_args.startswith("db "):
db_path = cmd_args[3:].strip()
result = mcp.add_database(db_path)
else:
result = mcp.add_folder(cmd_args)
if result["success"]:
message = f"✓ Added: {cmd_args}"
return CommandResult.success(message=message)
else:
message = result.get("error", "Failed to add")
return CommandResult.error(message)
elif cmd in ["remove", "rem"]:
if cmd_args.startswith("db "):
result = mcp.remove_database(cmd_args[3:].strip())
else:
result = mcp.remove_folder(cmd_args)
if result["success"]:
message = f"✓ Removed: {cmd_args}"
return CommandResult.success(message=message)
else:
message = result.get("error", "Failed to remove")
return CommandResult.error(message)
elif cmd == "list":
result = mcp.list_folders()
if result["success"]:
folders = result.get("folders", [])
if not folders:
message = "No folders added. Use: /mcp add <path>"
return CommandResult.success(message=message)
if context.is_tui:
# Format as text for TUI
message = "**MCP Folders**\n\n"
for f in folders:
message += f"**{f['number']}.** {f['path']}\n"
message += f" Files: {f.get('file_count', 0)}, Size: {f.get('size_mb', 0):.1f} MB\n"
return CommandResult.success(message=message)
else:
# Rich table for CLI
from rich.table import Table
table = Table("No.", "Path", "Files", "Size", show_header=True, header_style="bold magenta")
for f in folders:
table.add_row(
str(f["number"]),
f["path"],
str(f.get("file_count", 0)),
f"{f.get('size_mb', 0):.1f} MB",
)
return CommandResult.success()
return CommandResult.error("Failed to list folders")
elif cmd == "db":
if cmd_args == "list":
result = mcp.list_databases()
if result["success"]:
databases = result.get("databases", [])
if not databases:
message = "No databases added. Use: /mcp add db <path>"
return CommandResult.success(message=message)
if context.is_tui:
# Format as text for TUI
message = "**MCP Databases**\n\n"
for db in databases:
message += f"**{db['number']}.** {db['name']}\n"
message += f" Tables: {db.get('table_count', 0)}, Size: {db.get('size_mb', 0):.1f} MB\n"
return CommandResult.success(message=message)
else:
# Rich table for CLI
from rich.table import Table
table = Table("No.", "Name", "Tables", "Size", show_header=True, header_style="bold magenta")
for db in databases:
table.add_row(
str(db["number"]),
db["name"],
str(db.get("table_count", 0)),
f"{db.get('size_mb', 0):.1f} MB",
)
return CommandResult.success()
return CommandResult.error("Failed to list databases")
elif cmd_args.isdigit():
result = mcp.switch_mode("database", int(cmd_args))
if result["success"]:
message = result.get("message", "✓ Switched to database mode")
return CommandResult.success(message=message)
else:
message = result.get("error", "Failed to switch mode")
return CommandResult.error(message)
else:
message = "Usage: /mcp db list or /mcp db <number>"
return CommandResult.success(message=message)
elif cmd == "files":
result = mcp.switch_mode("files")
if result["success"]:
message = "✓ Switched to file mode"
return CommandResult.success(message=message)
else:
message = result.get("error", "Failed to switch to file mode")
return CommandResult.error(message)
elif cmd == "write":
if cmd_args.lower() == "on":
mcp.enable_write()
message = "✓ Write mode enabled"
return CommandResult.success(message=message)
elif cmd_args.lower() == "off":
mcp.disable_write()
message = "✓ Write mode disabled"
return CommandResult.success(message=message)
else:
status = "Enabled" if mcp.write_enabled else "Disabled"
message = f"Write mode: {status}"
return CommandResult.success(message=message)
elif cmd == "gitignore":
if cmd_args.lower() in ["on", "off"]:
result = mcp.toggle_gitignore(cmd_args.lower() == "on")
if result["success"]:
message = result.get("message", "✓ Updated")
return CommandResult.success(message=message)
else:
message = result.get("error", "Failed to update gitignore")
return CommandResult.error(message)
else:
message = "Usage: /mcp gitignore [on|off]"
return CommandResult.success(message=message)
else:
message = f"Unknown MCP command: {cmd}\nAvailable: on, off, status, add, remove, list, db, files, write, gitignore"
return CommandResult.error(message)
class PasteCommand(Command):
"""Paste from clipboard."""
@property
def name(self) -> str:
return "/paste"
@property
def help(self) -> CommandHelp:
return CommandHelp(
description="Paste from clipboard and send to AI.",
usage="/paste [prompt]",
)
def execute(self, args: str, context: CommandContext) -> CommandResult:
try:
import pyperclip
content = pyperclip.paste()
except ImportError:
message = "pyperclip not installed"
return CommandResult.error(message)
except Exception as e:
message = f"Failed to access clipboard: {e}"
return CommandResult.error(str(e))
if not content:
message = "Clipboard is empty"
return CommandResult.error(message)
# Build the prompt
if args:
full_prompt = f"{args}\n\n```\n{content}\n```"
else:
full_prompt = content
return CommandResult.success(data={"paste_prompt": full_prompt})
class ModelCommand(Command):
"""Select AI model."""
@property
def name(self) -> str:
return "/model"
@property
def help(self) -> CommandHelp:
return CommandHelp(
description="Select or search for AI models.",
usage="/model [search_term]",
)
def execute(self, args: str, context: CommandContext) -> CommandResult:
# This is handled specially in the CLI, but we need a handler
# to prevent it from being sent to the AI
return CommandResult.success(data={"show_model_selector": True, "search": args})
def register_all_commands() -> None:
"""Register all built-in commands with the global registry."""
commands = [
HelpCommand(),
ClearCommand(),
MemoryCommand(),
OnlineCommand(),
ResetCommand(),
StatsCommand(),
CreditsCommand(),
ExportCommand(),
MiddleOutCommand(),
MaxTokenCommand(),
SystemCommand(),
RetryCommand(),
PrevCommand(),
NextCommand(),
ConfigCommand(),
ListCommand(),
SaveCommand(),
LoadCommand(),
DeleteCommand(),
InfoCommand(),
MCPCommand(),
PasteCommand(),
ModelCommand(),
]
for command in commands:
try:
registry.register(command)
except ValueError as e:
get_logger().warning(f"Failed to register command: {e}")