2 Commits
main ... 3.0

Author SHA1 Message Date
1191fa6d19 Updated README 2026-02-04 11:26:08 +01:00
6298158d3c oAI version 3.0 beta 1 2026-02-04 11:22:53 +01:00
32 changed files with 3592 additions and 1864 deletions

1
.gitignore vendored
View File

@@ -45,3 +45,4 @@ b0.sh
requirements.txt requirements.txt
system_prompt.txt system_prompt.txt
CLAUDE* CLAUDE*
SESSION*_COMPLETE.md

View File

@@ -1,19 +1,21 @@
# oAI - OpenRouter AI Chat Client # oAI - OpenRouter AI Chat Client
A powerful, extensible terminal-based chat client for OpenRouter API with **MCP (Model Context Protocol)** support, enabling AI to access local files and query SQLite databases. A powerful, modern **Textual TUI** chat client for OpenRouter API with **MCP (Model Context Protocol)** support, enabling AI to access local files and query SQLite databases.
## Features ## Features
### Core Features ### Core Features
- 🖥️ **Modern Textual TUI** with async streaming and beautiful interface
- 🤖 Interactive chat with 300+ AI models via OpenRouter - 🤖 Interactive chat with 300+ AI models via OpenRouter
- 🔍 Model selection with search and filtering - 🔍 Model selection with search, filtering, and capability icons
- 💾 Conversation save/load/export (Markdown, JSON, HTML) - 💾 Conversation save/load/export (Markdown, JSON, HTML)
- 📎 File attachments (images, PDFs, code files) - 📎 File attachments (images, PDFs, code files)
- 💰 Real-time cost tracking and credit monitoring - 💰 Real-time cost tracking and credit monitoring
- 🎨 Rich terminal UI with syntax highlighting - 🎨 Dark theme with syntax highlighting and Markdown rendering
- 📝 Persistent command history with search (Ctrl+R) - 📝 Command history navigation (Up/Down arrows)
- 🌐 Online mode (web search capabilities) - 🌐 Online mode (web search capabilities)
- 🧠 Conversation memory toggle - 🧠 Conversation memory toggle
- ⌨️ Keyboard shortcuts (F1=Help, F2=Models, Ctrl+S=Stats)
### MCP Integration ### MCP Integration
- 🔧 **File Mode**: AI can read, search, and list local files - 🔧 **File Mode**: AI can read, search, and list local files
@@ -38,26 +40,15 @@ A powerful, extensible terminal-based chat client for OpenRouter API with **MCP
## Installation ## Installation
### Option 1: Install from Source (Recommended) ### Option 1: Pre-built Binary (macOS/Linux) (Recommended)
```bash
# Clone the repository
git clone https://gitlab.pm/rune/oai.git
cd oai
# Install with pip
pip install -e .
```
### Option 2: Pre-built Binary (macOS/Linux)
Download from [Releases](https://gitlab.pm/rune/oai/releases): Download from [Releases](https://gitlab.pm/rune/oai/releases):
- **macOS (Apple Silicon)**: `oai_v2.1.0_mac_arm64.zip` - **macOS (Apple Silicon)**: `oai_v3.0.0_mac_arm64.zip`
- **Linux (x86_64)**: `oai_v2.1.0_linux_x86_64.zip` - **Linux (x86_64)**: `oai_v3.0.0_linux_x86_64.zip`
```bash ```bash
# Extract and install # Extract and install
unzip oai_v2.1.0_*.zip unzip oai_v3.0.0_*.zip
mkdir -p ~/.local/bin mkdir -p ~/.local/bin
mv oai ~/.local/bin/ mv oai ~/.local/bin/
@@ -73,14 +64,30 @@ xattr -cr ~/.local/bin/oai
export PATH="$HOME/.local/bin:$PATH" export PATH="$HOME/.local/bin:$PATH"
``` ```
### Option 2: Install from Source
```bash
# Clone the repository
git clone https://gitlab.pm/rune/oai.git
cd oai
# Install with pip
pip install -e .
```
## Quick Start ## Quick Start
```bash ```bash
# Start the chat client # Start oAI (launches TUI)
oai chat oai
# Or with options # Or with options
oai chat --model gpt-4o --mcp oai --model gpt-4o --online --mcp
# Show version
oai version
``` ```
On first run, you'll be prompted for your OpenRouter API key. On first run, you'll be prompted for your OpenRouter API key.
@@ -88,12 +95,14 @@ On first run, you'll be prompted for your OpenRouter API key.
### Basic Commands ### Basic Commands
```bash ```bash
# In the chat interface: # In the TUI interface:
/model # Select AI model /model # Select AI model (or press F2)
/help # Show all commands /help # Show all commands (or press F1)
/mcp on # Enable file/database access /mcp on # Enable file/database access
/stats # View session statistics /stats # View session statistics (or press Ctrl+S)
exit # Quit /config # View configuration settings
/credits # Check account credits
Ctrl+Q # Quit
``` ```
## MCP (Model Context Protocol) ## MCP (Model Context Protocol)
@@ -184,21 +193,22 @@ MCP allows the AI to interact with your local files and databases.
## CLI Options ## CLI Options
```bash ```bash
oai chat [OPTIONS] oai [OPTIONS]
Options: Options:
-m, --model TEXT Model ID to use -m, --model TEXT Model ID to use
-s, --system TEXT System prompt -s, --system TEXT System prompt
-o, --online Enable online mode -o, --online Enable online mode
--mcp Enable MCP server --mcp Enable MCP server
-v, --version Show version
--help Show help --help Show help
``` ```
Other commands: Commands:
```bash ```bash
oai config [setting] [value] # Configure settings oai # Launch TUI (default)
oai version # Show version oai version # Show version information
oai credits # Check credits oai --help # Show help message
``` ```
## Configuration ## Configuration
@@ -218,14 +228,18 @@ oai/
├── oai/ ├── oai/
│ ├── __init__.py │ ├── __init__.py
│ ├── __main__.py # Entry point for python -m oai │ ├── __main__.py # Entry point for python -m oai
│ ├── cli.py # Main CLI interface │ ├── cli.py # Main CLI entry point
│ ├── constants.py # Configuration constants │ ├── constants.py # Configuration constants
│ ├── commands/ # Slash command handlers │ ├── commands/ # Slash command handlers
│ ├── config/ # Settings and database │ ├── config/ # Settings and database
│ ├── core/ # Chat client and session │ ├── core/ # Chat client and session
│ ├── mcp/ # MCP server and tools │ ├── mcp/ # MCP server and tools
│ ├── providers/ # AI provider abstraction │ ├── providers/ # AI provider abstraction
│ ├── ui/ # Terminal UI utilities │ ├── tui/ # Textual TUI interface
│ │ ├── app.py # Main TUI application
│ │ ├── widgets/ # Custom widgets
│ │ ├── screens/ # Modal screens
│ │ └── styles.tcss # TUI styling
│ └── utils/ # Logging, export, etc. │ └── utils/ # Logging, export, etc.
├── pyproject.toml # Package configuration ├── pyproject.toml # Package configuration
├── build.sh # Binary build script ├── build.sh # Binary build script
@@ -266,7 +280,18 @@ pip install -e . --force-reinstall
## Version History ## Version History
### v2.1.0 (Current) ### v3.0.0 (Current)
- 🎨 **Complete migration to Textual TUI** - Modern async terminal interface
- 🗑️ **Removed CLI interface** - TUI-only for cleaner codebase (11.6% smaller)
- 🖱️ **Modal screens** - Help, stats, config, credits, model selector
- ⌨️ **Keyboard shortcuts** - F1 (help), F2 (models), Ctrl+S (stats), etc.
- 🎯 **Capability indicators** - Visual icons for model features (vision, tools, online)
- 🎨 **Consistent dark theme** - Professional styling throughout
- 📊 **Enhanced model selector** - Search, filter, capability columns
- 🚀 **Default command** - Just run `oai` to launch TUI
- 🧹 **Code cleanup** - Removed 1,300+ lines of CLI code
### v2.1.0
- 🏗️ Complete codebase refactoring to modular package structure - 🏗️ Complete codebase refactoring to modular package structure
- 🔌 Extensible provider architecture for adding new AI providers - 🔌 Extensible provider architecture for adding new AI providers
- 📦 Proper Python packaging with pyproject.toml - 📦 Proper Python packaging with pyproject.toml

View File

@@ -1,55 +1,27 @@
""" """
Main CLI entry point for oAI. Main CLI entry point for oAI.
This module provides the command-line interface for the oAI chat application. This module provides the command-line interface for the oAI TUI application.
""" """
import os
import sys import sys
from pathlib import Path
from typing import Optional from typing import Optional
import typer import typer
from prompt_toolkit import PromptSession
from prompt_toolkit.auto_suggest import AutoSuggestFromHistory
from prompt_toolkit.history import FileHistory
from rich.markdown import Markdown
from rich.panel import Panel
from oai import __version__ from oai import __version__
from oai.commands import register_all_commands, registry from oai.commands import register_all_commands
from oai.commands.registry import CommandContext, CommandStatus
from oai.config.database import Database
from oai.config.settings import Settings from oai.config.settings import Settings
from oai.constants import ( from oai.constants import APP_URL, APP_VERSION
APP_NAME,
APP_URL,
APP_VERSION,
CONFIG_DIR,
HISTORY_FILE,
VALID_COMMANDS,
)
from oai.core.client import AIClient from oai.core.client import AIClient
from oai.core.session import ChatSession from oai.core.session import ChatSession
from oai.mcp.manager import MCPManager from oai.mcp.manager import MCPManager
from oai.providers.base import UsageStats
from oai.providers.openrouter import OpenRouterProvider
from oai.ui.console import (
clear_screen,
console,
display_panel,
print_error,
print_info,
print_success,
print_warning,
)
from oai.ui.tables import create_model_table, display_paginated_table
from oai.utils.logging import LoggingManager, get_logger from oai.utils.logging import LoggingManager, get_logger
# Create Typer app # Create Typer app
app = typer.Typer( app = typer.Typer(
name="oai", name="oai",
help=f"oAI - OpenRouter AI Chat Client\n\nVersion: {APP_VERSION}", help=f"oAI - OpenRouter AI Chat Client (TUI)\n\nVersion: {APP_VERSION}",
add_completion=False, add_completion=False,
epilog="For more information, visit: " + APP_URL, epilog="For more information, visit: " + APP_URL,
) )
@@ -65,374 +37,6 @@ def main_callback(
help="Show version information", help="Show version information",
is_flag=True, is_flag=True,
), ),
) -> None:
"""Main callback to handle global options."""
# Show version with update check if --version flag
if version_flag:
version_info = check_for_updates(APP_VERSION)
console.print(version_info)
raise typer.Exit()
# Show version with update check when --help is requested
if "--help" in sys.argv or "-h" in sys.argv:
version_info = check_for_updates(APP_VERSION)
console.print(f"\n{version_info}\n")
# Continue to subcommand if provided
if ctx.invoked_subcommand is None:
return
def check_for_updates(current_version: str) -> str:
"""Check for available updates."""
import requests
from packaging import version as pkg_version
try:
response = requests.get(
"https://gitlab.pm/api/v1/repos/rune/oai/releases/latest",
headers={"Content-Type": "application/json"},
timeout=1.0,
)
response.raise_for_status()
data = response.json()
version_online = data.get("tag_name", "").lstrip("v")
if not version_online:
return f"[bold green]oAI version {current_version}[/]"
current = pkg_version.parse(current_version)
latest = pkg_version.parse(version_online)
if latest > current:
return (
f"[bold green]oAI version {current_version}[/] "
f"[bold red](Update available: {current_version}{version_online})[/]"
)
return f"[bold green]oAI version {current_version} (up to date)[/]"
except Exception:
return f"[bold green]oAI version {current_version}[/]"
def show_welcome(settings: Settings, version_info: str) -> None:
"""Display welcome message."""
console.print(Panel.fit(
f"{version_info}\n\n"
"[bold cyan]Commands:[/] /help for commands, /model to select model\n"
"[bold cyan]MCP:[/] /mcp on to enable file/database access\n"
"[bold cyan]Exit:[/] Type 'exit', 'quit', or 'bye'",
title=f"[bold green]Welcome to {APP_NAME}[/]",
border_style="green",
))
def select_model(client: AIClient, search_term: Optional[str] = None) -> Optional[dict]:
"""Display model selection interface."""
try:
models = client.provider.get_raw_models()
if not models:
print_error("No models available")
return None
# Filter by search term if provided
if search_term:
search_lower = search_term.lower()
models = [m for m in models if search_lower in m.get("id", "").lower()]
if not models:
print_error(f"No models found matching '{search_term}'")
return None
# Create and display table
table = create_model_table(models)
display_paginated_table(
table,
f"[bold green]Available Models ({len(models)})[/]",
)
# Prompt for selection
console.print("")
try:
choice = input("Enter model number (or press Enter to cancel): ").strip()
except (EOFError, KeyboardInterrupt):
return None
if not choice:
return None
try:
index = int(choice) - 1
if 0 <= index < len(models):
selected = models[index]
print_success(f"Selected model: {selected['id']}")
return selected
except ValueError:
pass
print_error("Invalid selection")
return None
except Exception as e:
print_error(f"Failed to fetch models: {e}")
return None
def run_chat_loop(
session: ChatSession,
prompt_session: PromptSession,
settings: Settings,
) -> None:
"""Run the main chat loop."""
logger = get_logger()
mcp_manager = session.mcp_manager
while True:
try:
# Build prompt prefix
prefix = "You> "
if mcp_manager and mcp_manager.enabled:
if mcp_manager.mode == "files":
if mcp_manager.write_enabled:
prefix = "[🔧✍️ MCP: Files+Write] You> "
else:
prefix = "[🔧 MCP: Files] You> "
elif mcp_manager.mode == "database" and mcp_manager.selected_db_index is not None:
prefix = f"[🗄️ MCP: DB #{mcp_manager.selected_db_index + 1}] You> "
# Get user input
user_input = prompt_session.prompt(
prefix,
auto_suggest=AutoSuggestFromHistory(),
).strip()
if not user_input:
continue
# Handle escape sequence
if user_input.startswith("//"):
user_input = user_input[1:]
# Check for exit
if user_input.lower() in ["exit", "quit", "bye"]:
console.print(
f"\n[bold yellow]Goodbye![/]\n"
f"[dim]Session: {session.stats.total_tokens:,} tokens, "
f"${session.stats.total_cost:.4f}[/]"
)
logger.info(
f"Session ended. Messages: {session.stats.message_count}, "
f"Tokens: {session.stats.total_tokens}, "
f"Cost: ${session.stats.total_cost:.4f}"
)
return
# Check for unknown commands
if user_input.startswith("/"):
cmd_word = user_input.split()[0].lower()
if not registry.is_command(user_input):
# Check if it's a valid command prefix
is_valid = any(cmd_word.startswith(cmd) for cmd in VALID_COMMANDS)
if not is_valid:
print_error(f"Unknown command: {cmd_word}")
print_info("Type /help to see available commands.")
continue
# Try to execute as command
context = session.get_context()
result = registry.execute(user_input, context)
if result:
# Update session state from context
session.memory_enabled = context.memory_enabled
session.memory_start_index = context.memory_start_index
session.online_enabled = context.online_enabled
session.middle_out_enabled = context.middle_out_enabled
session.session_max_token = context.session_max_token
session.current_index = context.current_index
session.system_prompt = context.session_system_prompt
if result.status == CommandStatus.EXIT:
return
# Handle special results
if result.data:
# Retry - resend last prompt
if "retry_prompt" in result.data:
user_input = result.data["retry_prompt"]
# Fall through to send message
# Paste - send clipboard content
elif "paste_prompt" in result.data:
user_input = result.data["paste_prompt"]
# Fall through to send message
# Model selection
elif "show_model_selector" in result.data:
search = result.data.get("search", "")
model = select_model(session.client, search if search else None)
if model:
session.set_model(model)
# If this came from /config model, also save as default
if result.data.get("set_as_default"):
settings.set_default_model(model["id"])
print_success(f"Default model set to: {model['id']}")
continue
# Load conversation
elif "load_conversation" in result.data:
history = result.data.get("history", [])
session.history.clear()
from oai.core.session import HistoryEntry
for entry in history:
session.history.append(HistoryEntry(
prompt=entry.get("prompt", ""),
response=entry.get("response", ""),
prompt_tokens=entry.get("prompt_tokens", 0),
completion_tokens=entry.get("completion_tokens", 0),
msg_cost=entry.get("msg_cost", 0.0),
))
session.current_index = len(session.history) - 1
continue
else:
# Normal command completed
continue
else:
# Command completed with no special data
continue
# Ensure model is selected
if not session.selected_model:
print_warning("Please select a model first with /model")
continue
# Send message
stream = settings.stream_enabled
if mcp_manager and mcp_manager.enabled:
tools = session.get_mcp_tools()
if tools:
stream = False # Disable streaming with tools
if stream:
console.print(
"[bold green]Streaming response...[/] "
"[dim](Press Ctrl+C to cancel)[/]"
)
if session.online_enabled:
console.print("[dim cyan]🌐 Online mode active[/]")
console.print("")
try:
response_text, usage, response_time = session.send_message(
user_input,
stream=stream,
)
except Exception as e:
print_error(f"Error: {e}")
logger.error(f"Message error: {e}")
continue
if not response_text:
print_error("No response received")
continue
# Display non-streaming response
if not stream:
console.print()
display_panel(
Markdown(response_text),
title="[bold green]AI Response[/]",
border_style="green",
)
# Calculate cost and tokens
cost = 0.0
tokens = 0
estimated = False
if usage and (usage.prompt_tokens > 0 or usage.completion_tokens > 0):
tokens = usage.total_tokens
if usage.total_cost_usd:
cost = usage.total_cost_usd
else:
cost = session.client.estimate_cost(
session.selected_model["id"],
usage.prompt_tokens,
usage.completion_tokens,
)
else:
# Estimate tokens when usage not available (streaming fallback)
# Rough estimate: ~4 characters per token for English text
est_input_tokens = len(user_input) // 4 + 1
est_output_tokens = len(response_text) // 4 + 1
tokens = est_input_tokens + est_output_tokens
cost = session.client.estimate_cost(
session.selected_model["id"],
est_input_tokens,
est_output_tokens,
)
# Create estimated usage for session tracking
usage = UsageStats(
prompt_tokens=est_input_tokens,
completion_tokens=est_output_tokens,
total_tokens=tokens,
)
estimated = True
# Add to history
session.add_to_history(user_input, response_text, usage, cost)
# Display metrics
est_marker = "~" if estimated else ""
context_info = ""
if session.memory_enabled:
context_count = len(session.history) - session.memory_start_index
if context_count > 1:
context_info = f", Context: {context_count} msg(s)"
else:
context_info = ", Memory: OFF"
online_emoji = " 🌐" if session.online_enabled else ""
mcp_emoji = ""
if mcp_manager and mcp_manager.enabled:
if mcp_manager.mode == "files":
mcp_emoji = " 🔧"
elif mcp_manager.mode == "database":
mcp_emoji = " 🗄️"
console.print(
f"\n[dim blue]📊 {est_marker}{tokens} tokens | {est_marker}${cost:.4f} | {response_time:.2f}s"
f"{context_info}{online_emoji}{mcp_emoji} | "
f"Session: {est_marker}{session.stats.total_tokens:,} tokens | "
f"{est_marker}${session.stats.total_cost:.4f}[/]"
)
# Check warnings
warnings = session.check_warnings()
for warning in warnings:
print_warning(warning)
# Offer to copy
console.print("")
try:
from oai.ui.prompts import prompt_copy_response
prompt_copy_response(response_text)
except Exception:
pass
console.print("")
except KeyboardInterrupt:
console.print("\n[dim]Input cancelled[/]")
continue
except EOFError:
console.print("\n[bold yellow]Goodbye![/]")
return
@app.command()
def chat(
model: Optional[str] = typer.Option( model: Optional[str] = typer.Option(
None, None,
"--model", "--model",
@@ -457,22 +61,35 @@ def chat(
help="Enable MCP server", help="Enable MCP server",
), ),
) -> None: ) -> None:
"""Start an interactive chat session.""" """Main callback - launches TUI by default."""
if version_flag:
typer.echo(f"oAI version {APP_VERSION}")
raise typer.Exit()
# If no subcommand provided, launch TUI
if ctx.invoked_subcommand is None:
_launch_tui(model, system, online, mcp)
def _launch_tui(
model: Optional[str] = None,
system: Optional[str] = None,
online: bool = False,
mcp: bool = False,
) -> None:
"""Launch the Textual TUI interface."""
# Setup logging # Setup logging
logging_manager = LoggingManager() logging_manager = LoggingManager()
logging_manager.setup() logging_manager.setup()
logger = get_logger() logger = get_logger()
# Clear screen
clear_screen()
# Load settings # Load settings
settings = Settings.load() settings = Settings.load()
# Check API key # Check API key
if not settings.api_key: if not settings.api_key:
print_error("No API key configured") typer.echo("Error: No API key configured", err=True)
print_info("Run: oai --config api to set your API key") typer.echo("Run: oai config api to set your API key", err=True)
raise typer.Exit(1) raise typer.Exit(1)
# Initialize client # Initialize client
@@ -482,236 +99,99 @@ def chat(
base_url=settings.base_url, base_url=settings.base_url,
) )
except Exception as e: except Exception as e:
print_error(f"Failed to initialize client: {e}") typer.echo(f"Error: Failed to initialize client: {e}", err=True)
raise typer.Exit(1) raise typer.Exit(1)
# Register commands # Register commands
register_all_commands() register_all_commands()
# Check for updates and show welcome # Initialize MCP manager (always create it, even if not enabled)
version_info = check_for_updates(APP_VERSION)
show_welcome(settings, version_info)
# Initialize MCP manager
mcp_manager = MCPManager() mcp_manager = MCPManager()
if mcp: if mcp:
result = mcp_manager.enable() try:
if result["success"]: result = mcp_manager.enable()
print_success("MCP enabled") if result["success"]:
else: logger.info("MCP server enabled in files mode")
print_warning(f"MCP: {result.get('error', 'Failed to enable')}") else:
logger.warning(f"MCP: {result.get('error', 'Failed to enable')}")
except Exception as e:
logger.warning(f"Failed to enable MCP: {e}")
# Create session # Create session with MCP manager
session = ChatSession( session = ChatSession(
client=client, client=client,
settings=settings, settings=settings,
mcp_manager=mcp_manager, mcp_manager=mcp_manager,
) )
# Set system prompt # Set system prompt if provided
if system: if system:
session.system_prompt = system session.set_system_prompt(system)
print_info(f"System prompt: {system}")
# Set online mode # Enable online mode if requested
if online: if online:
session.online_enabled = True session.online_enabled = True
print_info("Online mode enabled")
# Select model # Set model if specified, otherwise use default
if model: if model:
raw_model = client.get_raw_model(model) raw_model = client.get_raw_model(model)
if raw_model: if raw_model:
session.set_model(raw_model) session.set_model(raw_model)
else: else:
print_warning(f"Model '{model}' not found") logger.warning(f"Model '{model}' not found")
elif settings.default_model: elif settings.default_model:
raw_model = client.get_raw_model(settings.default_model) raw_model = client.get_raw_model(settings.default_model)
if raw_model: if raw_model:
session.set_model(raw_model) session.set_model(raw_model)
else: else:
print_warning(f"Default model '{settings.default_model}' not available") logger.warning(f"Default model '{settings.default_model}' not available")
# Setup prompt session # Run Textual app
HISTORY_FILE.parent.mkdir(parents=True, exist_ok=True) from oai.tui.app import oAIChatApp
prompt_session = PromptSession(
history=FileHistory(str(HISTORY_FILE)),
)
# Run chat loop app_instance = oAIChatApp(session, settings, model)
run_chat_loop(session, prompt_session, settings) app_instance.run()
@app.command() @app.command()
def config( def tui(
setting: Optional[str] = typer.Argument( model: Optional[str] = typer.Option(
None, None,
help="Setting to configure (api, url, model, system, stream, costwarning, maxtoken, online, log, loglevel)", "--model",
"-m",
help="Model ID to use",
), ),
value: Optional[str] = typer.Argument( system: Optional[str] = typer.Option(
None, None,
help="Value to set", "--system",
"-s",
help="System prompt",
),
online: bool = typer.Option(
False,
"--online",
"-o",
help="Enable online mode",
),
mcp: bool = typer.Option(
False,
"--mcp",
help="Enable MCP server",
), ),
) -> None: ) -> None:
"""View or modify configuration settings.""" """Start Textual TUI interface (alias for just running 'oai')."""
settings = Settings.load() _launch_tui(model, system, online, mcp)
if not setting:
# Show all settings
from rich.table import Table
from oai.constants import DEFAULT_SYSTEM_PROMPT
table = Table("Setting", "Value", show_header=True, header_style="bold magenta")
table.add_row("API Key", "***" + settings.api_key[-4:] if settings.api_key else "Not set")
table.add_row("Base URL", settings.base_url)
table.add_row("Default Model", settings.default_model or "Not set")
# Show system prompt status
if settings.default_system_prompt is None:
system_prompt_display = f"[default] {DEFAULT_SYSTEM_PROMPT[:40]}..."
elif settings.default_system_prompt == "":
system_prompt_display = "[blank]"
else:
system_prompt_display = settings.default_system_prompt[:50] + "..." if len(settings.default_system_prompt) > 50 else settings.default_system_prompt
table.add_row("System Prompt", system_prompt_display)
table.add_row("Streaming", "on" if settings.stream_enabled else "off")
table.add_row("Cost Warning", f"${settings.cost_warning_threshold:.4f}")
table.add_row("Max Tokens", str(settings.max_tokens))
table.add_row("Default Online", "on" if settings.default_online_mode else "off")
table.add_row("Log Level", settings.log_level)
display_panel(table, title="[bold green]Configuration[/]")
return
setting = setting.lower()
if setting == "api":
if value:
settings.set_api_key(value)
else:
from oai.ui.prompts import prompt_input
new_key = prompt_input("Enter API key", password=True)
if new_key:
settings.set_api_key(new_key)
print_success("API key updated")
elif setting == "url":
settings.set_base_url(value or "https://openrouter.ai/api/v1")
print_success(f"Base URL set to: {settings.base_url}")
elif setting == "model":
if value:
settings.set_default_model(value)
print_success(f"Default model set to: {value}")
else:
print_info(f"Current default model: {settings.default_model or 'Not set'}")
elif setting == "system":
from oai.constants import DEFAULT_SYSTEM_PROMPT
if value:
# Decode escape sequences like \n for newlines
value = value.encode().decode('unicode_escape')
settings.set_default_system_prompt(value)
if value:
print_success(f"Default system prompt set to: {value}")
else:
print_success("Default system prompt set to blank.")
else:
if settings.default_system_prompt is None:
print_info(f"Using hardcoded default: {DEFAULT_SYSTEM_PROMPT[:60]}...")
elif settings.default_system_prompt == "":
print_info("System prompt: [blank]")
else:
print_info(f"System prompt: {settings.default_system_prompt}")
elif setting == "stream":
if value and value.lower() in ["on", "off"]:
settings.set_stream_enabled(value.lower() == "on")
print_success(f"Streaming {'enabled' if settings.stream_enabled else 'disabled'}")
else:
print_info("Usage: oai config stream [on|off]")
elif setting == "costwarning":
if value:
try:
threshold = float(value)
settings.set_cost_warning_threshold(threshold)
print_success(f"Cost warning threshold set to: ${threshold:.4f}")
except ValueError:
print_error("Please enter a valid number")
else:
print_info(f"Current threshold: ${settings.cost_warning_threshold:.4f}")
elif setting == "maxtoken":
if value:
try:
max_tok = int(value)
settings.set_max_tokens(max_tok)
print_success(f"Max tokens set to: {max_tok}")
except ValueError:
print_error("Please enter a valid number")
else:
print_info(f"Current max tokens: {settings.max_tokens}")
elif setting == "online":
if value and value.lower() in ["on", "off"]:
settings.set_default_online_mode(value.lower() == "on")
print_success(f"Default online mode {'enabled' if settings.default_online_mode else 'disabled'}")
else:
print_info("Usage: oai config online [on|off]")
elif setting == "loglevel":
valid_levels = ["debug", "info", "warning", "error", "critical"]
if value and value.lower() in valid_levels:
settings.set_log_level(value.lower())
print_success(f"Log level set to: {value.lower()}")
else:
print_info(f"Valid levels: {', '.join(valid_levels)}")
else:
print_error(f"Unknown setting: {setting}")
@app.command() @app.command()
def version() -> None: def version() -> None:
"""Show version information.""" """Show version information."""
version_info = check_for_updates(APP_VERSION) typer.echo(f"oAI version {APP_VERSION}")
console.print(version_info) typer.echo(f"Visit {APP_URL} for more information")
@app.command()
def credits() -> None:
"""Check account credits."""
settings = Settings.load()
if not settings.api_key:
print_error("No API key configured")
raise typer.Exit(1)
client = AIClient(api_key=settings.api_key, base_url=settings.base_url)
credits_data = client.get_credits()
if not credits_data:
print_error("Failed to fetch credits")
raise typer.Exit(1)
from rich.table import Table
table = Table("Metric", "Value", show_header=True, header_style="bold magenta")
table.add_row("Total Credits", credits_data.get("total_credits_formatted", "N/A"))
table.add_row("Used Credits", credits_data.get("used_credits_formatted", "N/A"))
table.add_row("Credits Left", credits_data.get("credits_left_formatted", "N/A"))
display_panel(table, title="[bold green]Account Credits[/]")
def main() -> None: def main() -> None:
"""Main entry point.""" """Entry point for the CLI."""
# Default to 'chat' command if no arguments provided
if len(sys.argv) == 1:
sys.argv.append("chat")
app() app()

File diff suppressed because it is too large Load Diff

View File

@@ -98,6 +98,7 @@ class CommandContext:
total_output_tokens: int = 0 total_output_tokens: int = 0
total_cost: float = 0.0 total_cost: float = 0.0
message_count: int = 0 message_count: int = 0
is_tui: bool = False # Flag for TUI mode
current_index: int = 0 current_index: int = 0

View File

@@ -15,7 +15,7 @@ import logging
# ============================================================================= # =============================================================================
APP_NAME = "oAI" APP_NAME = "oAI"
APP_VERSION = "2.1.0" APP_VERSION = "3.0.0"
APP_URL = "https://iurl.no/oai" APP_URL = "https://iurl.no/oai"
APP_DESCRIPTION = "OpenRouter AI Chat Client with MCP Integration" APP_DESCRIPTION = "OpenRouter AI Chat Client with MCP Integration"

View File

@@ -9,10 +9,7 @@ import asyncio
import json import json
import time import time
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple from typing import Any, AsyncIterator, Callable, Dict, Iterator, List, Optional, Tuple
from rich.live import Live
from rich.markdown import Markdown
from oai.commands.registry import CommandContext, CommandResult, registry from oai.commands.registry import CommandContext, CommandResult, registry
from oai.config.database import Database from oai.config.database import Database
@@ -25,16 +22,6 @@ from oai.constants import (
from oai.core.client import AIClient from oai.core.client import AIClient
from oai.mcp.manager import MCPManager from oai.mcp.manager import MCPManager
from oai.providers.base import ChatResponse, StreamChunk, UsageStats from oai.providers.base import ChatResponse, StreamChunk, UsageStats
from oai.ui.console import (
console,
display_markdown,
display_panel,
print_error,
print_info,
print_success,
print_warning,
)
from oai.ui.prompts import prompt_copy_response
from oai.utils.logging import get_logger from oai.utils.logging import get_logger
@@ -396,7 +383,7 @@ class ChatSession:
if not tool_calls: if not tool_calls:
return response return response
console.print(f"\n[dim yellow]🔧 AI requesting {len(tool_calls)} tool call(s)...[/]") # Tool calls requested by AI
tool_results = [] tool_results = []
for tc in tool_calls: for tc in tool_calls:
@@ -417,15 +404,17 @@ class ChatSession:
f'{k}="{v}"' if isinstance(v, str) else f"{k}={v}" f'{k}="{v}"' if isinstance(v, str) else f"{k}={v}"
for k, v in args.items() for k, v in args.items()
) )
console.print(f"[dim cyan] → {tc.function.name}({args_display})[/]") # Executing tool: {tc.function.name}
# Execute tool # Execute tool
result = asyncio.run(self.execute_tool(tc.function.name, args)) result = asyncio.run(self.execute_tool(tc.function.name, args))
if "error" in result: if "error" in result:
console.print(f"[dim red] ✗ Error: {result['error']}[/]") # Tool execution error logged
pass
else: else:
self._display_tool_success(tc.function.name, result) # Tool execution successful
pass
tool_results.append({ tool_results.append({
"tool_call_id": tc.id, "tool_call_id": tc.id,
@@ -452,38 +441,12 @@ class ChatSession:
}) })
api_messages.extend(tool_results) api_messages.extend(tool_results)
console.print("\n[dim cyan]💭 Processing tool results...[/]") # Processing tool results
loop_count += 1 loop_count += 1
self.logger.warning(f"Reached max tool loops ({max_loops})") self.logger.warning(f"Reached max tool loops ({max_loops})")
console.print(f"[bold yellow]⚠️ Reached maximum tool calls ({max_loops})[/]")
return response return response
def _display_tool_success(self, tool_name: str, result: Dict[str, Any]) -> None:
"""Display a success message for a tool call."""
if tool_name == "search_files":
count = result.get("count", 0)
console.print(f"[dim green] ✓ Found {count} file(s)[/]")
elif tool_name == "read_file":
size = result.get("size", 0)
truncated = " (truncated)" if result.get("truncated") else ""
console.print(f"[dim green] ✓ Read {size} bytes{truncated}[/]")
elif tool_name == "list_directory":
count = result.get("count", 0)
console.print(f"[dim green] ✓ Listed {count} item(s)[/]")
elif tool_name == "inspect_database":
if "table" in result:
console.print(f"[dim green] ✓ Inspected table: {result['table']}[/]")
else:
console.print(f"[dim green] ✓ Inspected database ({result.get('table_count', 0)} tables)[/]")
elif tool_name == "search_database":
count = result.get("count", 0)
console.print(f"[dim green] ✓ Found {count} match(es)[/]")
elif tool_name == "query_database":
count = result.get("count", 0)
console.print(f"[dim green] ✓ Query returned {count} row(s)[/]")
else:
console.print("[dim green] ✓ Success[/]")
def _stream_response( def _stream_response(
self, self,
@@ -521,27 +484,296 @@ class ChatSession:
usage: Optional[UsageStats] = None usage: Optional[UsageStats] = None
try: try:
with Live("", console=console, refresh_per_second=10) as live: for chunk in response:
for chunk in response: if chunk.error:
if chunk.error: self.logger.error(f"Stream error: {chunk.error}")
console.print(f"\n[bold red]Stream error: {chunk.error}[/]") break
break
if chunk.delta_content: if chunk.delta_content:
full_text += chunk.delta_content full_text += chunk.delta_content
live.update(Markdown(full_text)) if on_chunk:
if on_chunk: on_chunk(chunk.delta_content)
on_chunk(chunk.delta_content)
if chunk.usage: if chunk.usage:
usage = chunk.usage usage = chunk.usage
except KeyboardInterrupt: except KeyboardInterrupt:
console.print("\n[bold yellow]⚠️ Streaming interrupted[/]") self.logger.info("Streaming interrupted")
return "", None return "", None
return full_text, usage return full_text, usage
# ========== ASYNC METHODS FOR TUI ==========
async def send_message_async(
self,
user_input: str,
stream: bool = True,
) -> AsyncIterator[StreamChunk]:
"""
Async version of send_message for Textual TUI.
Args:
user_input: User's input text
stream: Whether to stream the response
Yields:
StreamChunk objects for progressive display
"""
if not self.selected_model:
raise ValueError("No model selected")
messages = self.build_api_messages(user_input)
tools = self.get_mcp_tools()
if tools:
# Disable streaming when tools are present
stream = False
model_id = self.selected_model["id"]
if self.online_enabled:
if hasattr(self.client.provider, "get_effective_model_id"):
model_id = self.client.provider.get_effective_model_id(model_id, True)
transforms = ["middle-out"] if self.middle_out_enabled else None
max_tokens = None
if self.session_max_token > 0:
max_tokens = self.session_max_token
if tools:
# Use async tool handling flow
async for chunk in self._send_with_tools_async(
messages=messages,
model_id=model_id,
tools=tools,
max_tokens=max_tokens,
transforms=transforms,
):
yield chunk
elif stream:
# Use async streaming flow
async for chunk in self._stream_response_async(
messages=messages,
model_id=model_id,
max_tokens=max_tokens,
transforms=transforms,
):
yield chunk
else:
# Non-streaming request
response = self.client.chat(
messages=messages,
model=model_id,
stream=False,
max_tokens=max_tokens,
transforms=transforms,
)
if isinstance(response, ChatResponse):
# Yield single chunk with complete response
chunk = StreamChunk(
id="",
delta_content=response.content,
usage=response.usage,
error=None,
)
yield chunk
async def _send_with_tools_async(
self,
messages: List[Dict[str, Any]],
model_id: str,
tools: List[Dict[str, Any]],
max_tokens: Optional[int] = None,
transforms: Optional[List[str]] = None,
) -> AsyncIterator[StreamChunk]:
"""
Async version of _send_with_tools for TUI.
Args:
messages: API messages
model_id: Model ID
tools: Tool definitions
max_tokens: Max tokens
transforms: Transforms list
Yields:
StreamChunk objects including tool call notifications
"""
max_loops = 5
loop_count = 0
api_messages = list(messages)
while loop_count < max_loops:
response = self.client.chat(
messages=api_messages,
model=model_id,
stream=False,
max_tokens=max_tokens,
tools=tools,
tool_choice="auto",
transforms=transforms,
)
if not isinstance(response, ChatResponse):
raise ValueError("Expected ChatResponse")
tool_calls = response.tool_calls
if not tool_calls:
# Final response, yield it
chunk = StreamChunk(
id="",
delta_content=response.content,
usage=response.usage,
error=None,
)
yield chunk
return
# Yield notification about tool calls
tool_notification = f"\n🔧 AI requesting {len(tool_calls)} tool call(s)...\n"
yield StreamChunk(id="", delta_content=tool_notification, usage=None, error=None)
tool_results = []
for tc in tool_calls:
try:
args = json.loads(tc.function.arguments)
except json.JSONDecodeError as e:
self.logger.error(f"Failed to parse tool arguments: {e}")
tool_results.append({
"tool_call_id": tc.id,
"role": "tool",
"name": tc.function.name,
"content": json.dumps({"error": f"Invalid arguments: {e}"}),
})
continue
# Yield tool call display
args_display = ", ".join(
f'{k}="{v}"' if isinstance(v, str) else f"{k}={v}"
for k, v in args.items()
)
tool_display = f"{tc.function.name}({args_display})\n"
yield StreamChunk(id="", delta_content=tool_display, usage=None, error=None)
# Execute tool (await instead of asyncio.run)
result = await self.execute_tool(tc.function.name, args)
if "error" in result:
error_msg = f" ✗ Error: {result['error']}\n"
yield StreamChunk(id="", delta_content=error_msg, usage=None, error=None)
else:
success_msg = self._format_tool_success(tc.function.name, result)
yield StreamChunk(id="", delta_content=success_msg, usage=None, error=None)
tool_results.append({
"tool_call_id": tc.id,
"role": "tool",
"name": tc.function.name,
"content": json.dumps(result),
})
# Add assistant message with tool calls
api_messages.append({
"role": "assistant",
"content": response.content,
"tool_calls": [
{
"id": tc.id,
"type": tc.type,
"function": {
"name": tc.function.name,
"arguments": tc.function.arguments,
},
}
for tc in tool_calls
],
})
# Add tool results
api_messages.extend(tool_results)
loop_count += 1
# Max loops reached
yield StreamChunk(
id="",
delta_content="\n⚠️ Maximum tool call loops reached\n",
usage=None,
error="Max loops reached"
)
def _format_tool_success(self, tool_name: str, result: Dict[str, Any]) -> str:
"""Format a success message for a tool call."""
if tool_name == "search_files":
count = result.get("count", 0)
return f" ✓ Found {count} file(s)\n"
elif tool_name == "read_file":
size = result.get("size", 0)
truncated = " (truncated)" if result.get("truncated") else ""
return f" ✓ Read {size} bytes{truncated}\n"
elif tool_name == "list_directory":
count = result.get("count", 0)
return f" ✓ Listed {count} item(s)\n"
elif tool_name == "inspect_database":
if "table" in result:
return f" ✓ Inspected table: {result['table']}\n"
else:
return f" ✓ Inspected database ({result.get('table_count', 0)} tables)\n"
elif tool_name == "search_database":
count = result.get("count", 0)
return f" ✓ Found {count} match(es)\n"
elif tool_name == "query_database":
count = result.get("count", 0)
return f" ✓ Query returned {count} row(s)\n"
else:
return " ✓ Success\n"
async def _stream_response_async(
self,
messages: List[Dict[str, Any]],
model_id: str,
max_tokens: Optional[int] = None,
transforms: Optional[List[str]] = None,
) -> AsyncIterator[StreamChunk]:
"""
Async version of _stream_response for TUI.
Args:
messages: API messages
model_id: Model ID
max_tokens: Max tokens
transforms: Transforms
Yields:
StreamChunk objects
"""
response = self.client.chat(
messages=messages,
model=model_id,
stream=True,
max_tokens=max_tokens,
transforms=transforms,
)
if isinstance(response, ChatResponse):
# Non-streaming response
chunk = StreamChunk(
id="",
delta_content=response.content,
usage=response.usage,
error=None,
)
yield chunk
return
# Stream the response
for chunk in response:
if chunk.error:
yield StreamChunk(id="", delta_content=None, usage=None, error=chunk.error)
break
yield chunk
# ========== END ASYNC METHODS ==========
def add_to_history( def add_to_history(
self, self,
prompt: str, prompt: str,

View File

@@ -269,10 +269,17 @@ class OpenRouterProvider(AIProvider):
completion_tokens = usage_data.get("output_tokens", 0) or 0 completion_tokens = usage_data.get("output_tokens", 0) or 0
# Get cost if available # Get cost if available
# OpenRouter returns cost in different places:
# 1. As 'total_cost_usd' in usage object (rare)
# 2. As 'usage' at root level (common - this is the dollar amount)
total_cost = None
if hasattr(usage_data, "total_cost_usd"): if hasattr(usage_data, "total_cost_usd"):
total_cost = getattr(usage_data, "total_cost_usd", None) total_cost = getattr(usage_data, "total_cost_usd", None)
elif hasattr(usage_data, "usage"):
# OpenRouter puts cost as 'usage' field (dollar amount)
total_cost = getattr(usage_data, "usage", None)
elif isinstance(usage_data, dict): elif isinstance(usage_data, dict):
total_cost = usage_data.get("total_cost_usd") total_cost = usage_data.get("total_cost_usd") or usage_data.get("usage")
return UsageStats( return UsageStats(
prompt_tokens=prompt_tokens, prompt_tokens=prompt_tokens,

5
oai/tui/__init__.py Normal file
View File

@@ -0,0 +1,5 @@
"""Textual TUI interface for oAI."""
from oai.tui.app import oAIChatApp
__all__ = ["oAIChatApp"]

1002
oai/tui/app.py Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,21 @@
"""TUI screens for oAI."""
from oai.tui.screens.config_screen import ConfigScreen
from oai.tui.screens.conversation_selector import ConversationSelectorScreen
from oai.tui.screens.credits_screen import CreditsScreen
from oai.tui.screens.dialogs import AlertDialog, ConfirmDialog, InputDialog
from oai.tui.screens.help_screen import HelpScreen
from oai.tui.screens.model_selector import ModelSelectorScreen
from oai.tui.screens.stats_screen import StatsScreen
__all__ = [
"AlertDialog",
"ConfirmDialog",
"ConfigScreen",
"ConversationSelectorScreen",
"CreditsScreen",
"InputDialog",
"HelpScreen",
"ModelSelectorScreen",
"StatsScreen",
]

View File

@@ -0,0 +1,107 @@
"""Configuration screen for oAI TUI."""
from textual.app import ComposeResult
from textual.containers import Container, Vertical
from textual.screen import ModalScreen
from textual.widgets import Button, Static
from oai.config.settings import Settings
class ConfigScreen(ModalScreen[None]):
"""Modal screen displaying configuration settings."""
DEFAULT_CSS = """
ConfigScreen {
align: center middle;
}
ConfigScreen > Container {
width: 70;
height: auto;
background: #1e1e1e;
border: solid #555555;
}
ConfigScreen .header {
dock: top;
width: 100%;
height: auto;
background: #2d2d2d;
color: #cccccc;
padding: 0 2;
}
ConfigScreen .content {
width: 100%;
height: auto;
background: #1e1e1e;
padding: 2;
color: #cccccc;
}
ConfigScreen .footer {
dock: bottom;
width: 100%;
height: auto;
background: #2d2d2d;
padding: 1 2;
align: center middle;
}
"""
def __init__(self, settings: Settings):
super().__init__()
self.settings = settings
def compose(self) -> ComposeResult:
"""Compose the screen."""
with Container():
yield Static("[bold]Configuration[/]", classes="header")
with Vertical(classes="content"):
yield Static(self._get_config_text(), markup=True)
with Vertical(classes="footer"):
yield Button("Close", id="close", variant="primary")
def _get_config_text(self) -> str:
"""Generate the configuration text."""
from oai.constants import DEFAULT_SYSTEM_PROMPT
# API Key display
api_key_display = "***" + self.settings.api_key[-4:] if self.settings.api_key else "Not set"
# System prompt display
if self.settings.default_system_prompt is None:
system_prompt_display = f"[default] {DEFAULT_SYSTEM_PROMPT[:40]}..."
elif self.settings.default_system_prompt == "":
system_prompt_display = "[blank]"
else:
prompt = self.settings.default_system_prompt
system_prompt_display = prompt[:50] + "..." if len(prompt) > 50 else prompt
return f"""
[bold cyan]═══ CONFIGURATION ═══[/]
[bold]API Key:[/] {api_key_display}
[bold]Base URL:[/] {self.settings.base_url}
[bold]Default Model:[/] {self.settings.default_model or "Not set"}
[bold]System Prompt:[/] {system_prompt_display}
[bold]Streaming:[/] {"on" if self.settings.stream_enabled else "off"}
[bold]Cost Warning:[/] ${self.settings.cost_warning_threshold:.4f}
[bold]Max Tokens:[/] {self.settings.max_tokens}
[bold]Default Online:[/] {"on" if self.settings.default_online_mode else "off"}
[bold]Log Level:[/] {self.settings.log_level}
[dim]Use /config [setting] [value] to modify settings[/]
"""
def on_button_pressed(self, event: Button.Pressed) -> None:
"""Handle button press."""
self.dismiss()
def on_key(self, event) -> None:
"""Handle keyboard shortcuts."""
if event.key in ("escape", "enter"):
self.dismiss()

View File

@@ -0,0 +1,205 @@
"""Conversation selector screen for oAI TUI."""
from typing import List, Optional
from textual.app import ComposeResult
from textual.containers import Container, Vertical
from textual.screen import ModalScreen
from textual.widgets import Button, DataTable, Input, Static
class ConversationSelectorScreen(ModalScreen[Optional[dict]]):
"""Modal screen for selecting a saved conversation."""
DEFAULT_CSS = """
ConversationSelectorScreen {
align: center middle;
}
ConversationSelectorScreen > Container {
width: 80%;
height: 70%;
background: #1e1e1e;
border: solid #555555;
layout: vertical;
}
ConversationSelectorScreen .header {
height: 3;
width: 100%;
background: #2d2d2d;
color: #cccccc;
padding: 0 2;
content-align: center middle;
}
ConversationSelectorScreen .search-input {
height: 3;
width: 100%;
background: #2a2a2a;
border: solid #555555;
margin: 0 0 1 0;
}
ConversationSelectorScreen .search-input:focus {
border: solid #888888;
}
ConversationSelectorScreen DataTable {
height: 1fr;
width: 100%;
background: #1e1e1e;
border: solid #555555;
}
ConversationSelectorScreen .footer {
height: 5;
width: 100%;
background: #2d2d2d;
padding: 1 2;
align: center middle;
}
ConversationSelectorScreen Button {
margin: 0 1;
}
"""
def __init__(self, conversations: List[dict]):
super().__init__()
self.all_conversations = conversations
self.filtered_conversations = conversations
self.selected_conversation: Optional[dict] = None
def compose(self) -> ComposeResult:
"""Compose the screen."""
with Container():
yield Static(
f"[bold]Load Conversation[/] [dim]({len(self.all_conversations)} saved)[/]",
classes="header"
)
yield Input(placeholder="Search conversations...", id="search-input", classes="search-input")
yield DataTable(id="conv-table", cursor_type="row", show_header=True, zebra_stripes=True)
with Vertical(classes="footer"):
yield Button("Load", id="load", variant="success")
yield Button("Cancel", id="cancel", variant="error")
def on_mount(self) -> None:
"""Initialize the table when mounted."""
table = self.query_one("#conv-table", DataTable)
# Add columns
table.add_column("#", width=5)
table.add_column("Name", width=40)
table.add_column("Messages", width=12)
table.add_column("Last Saved", width=20)
# Populate table
self._populate_table()
# Focus table if list is small (fits on screen), otherwise focus search
if len(self.all_conversations) <= 10:
table.focus()
else:
search_input = self.query_one("#search-input", Input)
search_input.focus()
def _populate_table(self) -> None:
"""Populate the table with conversations."""
table = self.query_one("#conv-table", DataTable)
table.clear()
for idx, conv in enumerate(self.filtered_conversations, 1):
name = conv.get("name", "Unknown")
message_count = str(conv.get("message_count", 0))
last_saved = conv.get("last_saved", "Unknown")
# Format timestamp if it's a full datetime
if "T" in last_saved or len(last_saved) > 20:
try:
# Truncate to just date and time
last_saved = last_saved[:19].replace("T", " ")
except:
pass
table.add_row(
str(idx),
name,
message_count,
last_saved,
key=str(idx)
)
def on_input_changed(self, event: Input.Changed) -> None:
"""Filter conversations based on search input."""
if event.input.id != "search-input":
return
search_term = event.value.lower()
if not search_term:
self.filtered_conversations = self.all_conversations
else:
self.filtered_conversations = [
c for c in self.all_conversations
if search_term in c.get("name", "").lower()
]
self._populate_table()
def on_data_table_row_selected(self, event: DataTable.RowSelected) -> None:
"""Handle row selection (click)."""
try:
row_index = int(event.row_key.value) - 1
if 0 <= row_index < len(self.filtered_conversations):
self.selected_conversation = self.filtered_conversations[row_index]
except (ValueError, IndexError):
pass
def on_data_table_row_highlighted(self, event) -> None:
"""Handle row highlight (arrow key navigation)."""
try:
table = self.query_one("#conv-table", DataTable)
if table.cursor_row is not None:
row_data = table.get_row_at(table.cursor_row)
if row_data:
row_index = int(row_data[0]) - 1
if 0 <= row_index < len(self.filtered_conversations):
self.selected_conversation = self.filtered_conversations[row_index]
except (ValueError, IndexError, AttributeError):
pass
def on_button_pressed(self, event: Button.Pressed) -> None:
"""Handle button press."""
if event.button.id == "load":
if self.selected_conversation:
self.dismiss(self.selected_conversation)
else:
self.dismiss(None)
else:
self.dismiss(None)
def on_key(self, event) -> None:
"""Handle keyboard shortcuts."""
if event.key == "escape":
self.dismiss(None)
elif event.key == "enter":
# If in search input, move to table
search_input = self.query_one("#search-input", Input)
if search_input.has_focus:
table = self.query_one("#conv-table", DataTable)
table.focus()
# If in table, select current row
else:
table = self.query_one("#conv-table", DataTable)
if table.cursor_row is not None:
try:
row_data = table.get_row_at(table.cursor_row)
if row_data:
row_index = int(row_data[0]) - 1
if 0 <= row_index < len(self.filtered_conversations):
selected = self.filtered_conversations[row_index]
self.dismiss(selected)
except (ValueError, IndexError, AttributeError):
if self.selected_conversation:
self.dismiss(self.selected_conversation)

View File

@@ -0,0 +1,125 @@
"""Credits screen for oAI TUI."""
from typing import Optional, Dict, Any
from textual.app import ComposeResult
from textual.containers import Container, Vertical
from textual.screen import ModalScreen
from textual.widgets import Button, Static
from oai.core.client import AIClient
class CreditsScreen(ModalScreen[None]):
"""Modal screen displaying account credits."""
DEFAULT_CSS = """
CreditsScreen {
align: center middle;
}
CreditsScreen > Container {
width: 60;
height: auto;
background: #1e1e1e;
border: solid #555555;
}
CreditsScreen .header {
dock: top;
width: 100%;
height: auto;
background: #2d2d2d;
color: #cccccc;
padding: 0 2;
}
CreditsScreen .content {
width: 100%;
height: auto;
background: #1e1e1e;
padding: 2;
color: #cccccc;
}
CreditsScreen .footer {
dock: bottom;
width: 100%;
height: auto;
background: #2d2d2d;
padding: 1 2;
align: center middle;
}
"""
def __init__(self, client: AIClient):
super().__init__()
self.client = client
self.credits_data: Optional[Dict[str, Any]] = None
def compose(self) -> ComposeResult:
"""Compose the screen."""
with Container():
yield Static("[bold]Account Credits[/]", classes="header")
with Vertical(classes="content"):
yield Static("[dim]Loading...[/]", id="credits-content", markup=True)
with Vertical(classes="footer"):
yield Button("Close", id="close", variant="primary")
def on_mount(self) -> None:
"""Fetch credits when mounted."""
self.fetch_credits()
def fetch_credits(self) -> None:
"""Fetch and display credits information."""
try:
self.credits_data = self.client.provider.get_credits()
content = self.query_one("#credits-content", Static)
content.update(self._get_credits_text())
except Exception as e:
content = self.query_one("#credits-content", Static)
content.update(f"[red]Error fetching credits:[/]\n{str(e)}")
def _get_credits_text(self) -> str:
"""Generate the credits text."""
if not self.credits_data:
return "[yellow]No credit information available[/]"
total = self.credits_data.get("total_credits", 0)
used = self.credits_data.get("used_credits", 0)
remaining = self.credits_data.get("credits_left", 0)
# Calculate percentage used
if total > 0:
percent_used = (used / total) * 100
percent_remaining = (remaining / total) * 100
else:
percent_used = 0
percent_remaining = 0
# Color code based on remaining credits
if percent_remaining > 50:
remaining_color = "green"
elif percent_remaining > 20:
remaining_color = "yellow"
else:
remaining_color = "red"
return f"""
[bold cyan]═══ OPENROUTER CREDITS ═══[/]
[bold]Total Credits:[/] ${total:.2f}
[bold]Used:[/] ${used:.2f} [dim]({percent_used:.1f}%)[/]
[bold]Remaining:[/] [{remaining_color}]${remaining:.2f}[/] [dim]({percent_remaining:.1f}%)[/]
[dim]Visit openrouter.ai to add more credits[/]
"""
def on_button_pressed(self, event: Button.Pressed) -> None:
"""Handle button press."""
self.dismiss()
def on_key(self, event) -> None:
"""Handle keyboard shortcuts."""
if event.key in ("escape", "enter"):
self.dismiss()

236
oai/tui/screens/dialogs.py Normal file
View File

@@ -0,0 +1,236 @@
"""Modal dialog screens for oAI TUI."""
from typing import Callable, Optional
from textual.app import ComposeResult
from textual.containers import Container, Horizontal, Vertical
from textual.screen import ModalScreen
from textual.widgets import Button, Input, Label, Static
class ConfirmDialog(ModalScreen[bool]):
"""A confirmation dialog with Yes/No buttons."""
DEFAULT_CSS = """
ConfirmDialog {
align: center middle;
}
ConfirmDialog > Container {
width: 60;
height: auto;
background: #2d2d2d;
border: solid #555555;
padding: 2;
}
ConfirmDialog Label {
width: 100%;
content-align: center middle;
margin-bottom: 2;
color: #cccccc;
}
ConfirmDialog Horizontal {
width: 100%;
height: auto;
align: center middle;
}
ConfirmDialog Button {
margin: 0 1;
}
"""
def __init__(
self,
message: str,
title: str = "Confirm",
yes_label: str = "Yes",
no_label: str = "No",
):
super().__init__()
self.message = message
self.title = title
self.yes_label = yes_label
self.no_label = no_label
def compose(self) -> ComposeResult:
"""Compose the dialog."""
with Container():
yield Static(f"[bold]{self.title}[/]", classes="dialog-title")
yield Label(self.message)
with Horizontal():
yield Button(self.yes_label, id="yes", variant="success")
yield Button(self.no_label, id="no", variant="error")
def on_button_pressed(self, event: Button.Pressed) -> None:
"""Handle button press."""
if event.button.id == "yes":
self.dismiss(True)
else:
self.dismiss(False)
def on_key(self, event) -> None:
"""Handle keyboard shortcuts."""
if event.key == "escape":
self.dismiss(False)
elif event.key == "enter":
self.dismiss(True)
class InputDialog(ModalScreen[Optional[str]]):
"""An input dialog for text entry."""
DEFAULT_CSS = """
InputDialog {
align: center middle;
}
InputDialog > Container {
width: 70;
height: auto;
background: #2d2d2d;
border: solid #555555;
padding: 2;
}
InputDialog Label {
width: 100%;
margin-bottom: 1;
color: #cccccc;
}
InputDialog Input {
width: 100%;
margin-bottom: 2;
background: #3a3a3a;
border: solid #555555;
}
InputDialog Input:focus {
border: solid #888888;
}
InputDialog Horizontal {
width: 100%;
height: auto;
align: center middle;
}
InputDialog Button {
margin: 0 1;
}
"""
def __init__(
self,
message: str,
title: str = "Input",
default: str = "",
placeholder: str = "",
):
super().__init__()
self.message = message
self.title = title
self.default = default
self.placeholder = placeholder
def compose(self) -> ComposeResult:
"""Compose the dialog."""
with Container():
yield Static(f"[bold]{self.title}[/]", classes="dialog-title")
yield Label(self.message)
yield Input(
value=self.default,
placeholder=self.placeholder,
id="input-field"
)
with Horizontal():
yield Button("OK", id="ok", variant="primary")
yield Button("Cancel", id="cancel")
def on_mount(self) -> None:
"""Focus the input field when mounted."""
input_field = self.query_one("#input-field", Input)
input_field.focus()
def on_button_pressed(self, event: Button.Pressed) -> None:
"""Handle button press."""
if event.button.id == "ok":
input_field = self.query_one("#input-field", Input)
self.dismiss(input_field.value)
else:
self.dismiss(None)
def on_input_submitted(self, event: Input.Submitted) -> None:
"""Handle Enter key in input field."""
self.dismiss(event.value)
def on_key(self, event) -> None:
"""Handle keyboard shortcuts."""
if event.key == "escape":
self.dismiss(None)
class AlertDialog(ModalScreen[None]):
"""A simple alert/message dialog."""
DEFAULT_CSS = """
AlertDialog {
align: center middle;
}
AlertDialog > Container {
width: 60;
height: auto;
background: #2d2d2d;
border: solid #555555;
padding: 2;
}
AlertDialog Label {
width: 100%;
content-align: center middle;
margin-bottom: 2;
color: #cccccc;
}
AlertDialog Horizontal {
width: 100%;
height: auto;
align: center middle;
}
"""
def __init__(self, message: str, title: str = "Alert", variant: str = "default"):
super().__init__()
self.message = message
self.title = title
self.variant = variant
def compose(self) -> ComposeResult:
"""Compose the dialog."""
# Choose color based on variant (using design system)
color = "$primary"
if self.variant == "error":
color = "$error"
elif self.variant == "success":
color = "$success"
elif self.variant == "warning":
color = "$warning"
with Container():
yield Static(f"[bold {color}]{self.title}[/]", classes="dialog-title")
yield Label(self.message)
with Horizontal():
yield Button("OK", id="ok", variant="primary")
def on_button_pressed(self, event: Button.Pressed) -> None:
"""Handle button press."""
self.dismiss()
def on_key(self, event) -> None:
"""Handle keyboard shortcuts."""
if event.key in ("escape", "enter"):
self.dismiss()

View File

@@ -0,0 +1,138 @@
"""Help screen for oAI TUI."""
from textual.app import ComposeResult
from textual.containers import Container, Vertical
from textual.screen import ModalScreen
from textual.widgets import Button, Static
class HelpScreen(ModalScreen[None]):
"""Modal screen displaying help and commands."""
DEFAULT_CSS = """
HelpScreen {
align: center middle;
}
HelpScreen > Container {
width: 90%;
height: 85%;
background: #1e1e1e;
border: solid #555555;
}
HelpScreen .header {
dock: top;
width: 100%;
height: auto;
background: #2d2d2d;
color: #cccccc;
padding: 0 2;
}
HelpScreen .content {
height: 1fr;
background: #1e1e1e;
padding: 2;
overflow-y: auto;
color: #cccccc;
}
HelpScreen .footer {
dock: bottom;
width: 100%;
height: auto;
background: #2d2d2d;
padding: 1 2;
align: center middle;
}
"""
def compose(self) -> ComposeResult:
"""Compose the screen."""
with Container():
yield Static("[bold]oAI Help & Commands[/]", classes="header")
with Vertical(classes="content"):
yield Static(self._get_help_text(), markup=True)
with Vertical(classes="footer"):
yield Button("Close", id="close", variant="primary")
def _get_help_text(self) -> str:
"""Generate the help text."""
return """
[bold cyan]═══ KEYBOARD SHORTCUTS ═══[/]
[bold]F1[/] Show this help (Ctrl+H may not work)
[bold]F2[/] Open model selector (Ctrl+M may not work)
[bold]Ctrl+S[/] Show session statistics
[bold]Ctrl+L[/] Clear chat display
[bold]Ctrl+P[/] Show previous message
[bold]Ctrl+N[/] Show next message
[bold]Ctrl+Q[/] Quit application
[bold]Up/Down[/] Navigate input history
[bold]ESC[/] Close dialogs
[dim]Note: Some Ctrl keys may be captured by your terminal[/]
[bold cyan]═══ SLASH COMMANDS ═══[/]
[bold yellow]Session Control:[/]
/reset Clear conversation history (with confirmation)
/clear Clear the chat display
/memory on/off Toggle conversation memory
/online on/off Toggle online search mode
/exit, /quit, /bye Exit the application
[bold yellow]Model & Configuration:[/]
/model [search] Open model selector with optional search
/config View configuration settings
/config api Set API key (prompts for input)
/config stream on Enable streaming responses
/system [prompt] Set session system prompt
/maxtoken [n] Set session token limit
[bold yellow]Conversation Management:[/]
/save [name] Save current conversation
/load [name] Load saved conversation (shows picker if no name)
/list List all saved conversations
/delete <name> Delete a saved conversation
[bold yellow]Export:[/]
/export md [file] Export as Markdown
/export json [file] Export as JSON
/export html [file] Export as HTML
[bold yellow]History Navigation:[/]
/prev Show previous message in history
/next Show next message in history
[bold yellow]MCP (Model Context Protocol):[/]
/mcp on Enable MCP file access
/mcp off Disable MCP
/mcp status Show MCP status
/mcp add <path> Add folder for file access
/mcp list List registered folders
/mcp write Toggle write permissions
[bold yellow]Information & Utilities:[/]
/help Show this help screen
/stats Show session statistics
/credits Check account credits
/retry Retry last prompt
/paste Paste from clipboard and send
[bold cyan]═══ TIPS ═══[/]
• Type [bold]/[/] to see command suggestions with [bold]Tab[/] to autocomplete
• Use [bold]Up/Down arrows[/] to navigate your input history
• Type [bold]//[/] at start to escape commands (sends /help as literal message)
• All messages support [bold]Markdown formatting[/] with syntax highlighting
• Responses stream in real-time for better interactivity
• Enable MCP to let AI access your local files and databases
• Use [bold]F1[/] or [bold]F2[/] if Ctrl shortcuts don't work in your terminal
"""
def on_button_pressed(self, event: Button.Pressed) -> None:
"""Handle button press."""
self.dismiss()
def on_key(self, event) -> None:
"""Handle keyboard shortcuts."""
if event.key in ("escape", "enter"):
self.dismiss()

View File

@@ -0,0 +1,254 @@
"""Model selector screen for oAI TUI."""
from typing import List, Optional
from textual.app import ComposeResult
from textual.containers import Container, Vertical
from textual.screen import ModalScreen
from textual.widgets import Button, DataTable, Input, Label, Static
class ModelSelectorScreen(ModalScreen[Optional[dict]]):
"""Modal screen for selecting an AI model."""
DEFAULT_CSS = """
ModelSelectorScreen {
align: center middle;
}
ModelSelectorScreen > Container {
width: 90%;
height: 85%;
background: #1e1e1e;
border: solid #555555;
layout: vertical;
}
ModelSelectorScreen .header {
height: 3;
width: 100%;
background: #2d2d2d;
color: #cccccc;
padding: 0 2;
content-align: center middle;
}
ModelSelectorScreen .search-input {
height: 3;
width: 100%;
background: #2a2a2a;
border: solid #555555;
margin: 0 0 1 0;
}
ModelSelectorScreen .search-input:focus {
border: solid #888888;
}
ModelSelectorScreen DataTable {
height: 1fr;
width: 100%;
background: #1e1e1e;
border: solid #555555;
}
ModelSelectorScreen .footer {
height: 5;
width: 100%;
background: #2d2d2d;
padding: 1 2;
align: center middle;
}
ModelSelectorScreen Button {
margin: 0 1;
}
"""
def __init__(self, models: List[dict], current_model: Optional[str] = None):
super().__init__()
self.all_models = models
self.filtered_models = models
self.current_model = current_model
self.selected_model: Optional[dict] = None
def compose(self) -> ComposeResult:
"""Compose the screen."""
with Container():
yield Static(
f"[bold]Select Model[/] [dim]({len(self.all_models)} available)[/]",
classes="header"
)
yield Input(placeholder="Search to filter models...", id="search-input", classes="search-input")
yield DataTable(id="model-table", cursor_type="row", show_header=True, zebra_stripes=True)
with Vertical(classes="footer"):
yield Button("Select", id="select", variant="success")
yield Button("Cancel", id="cancel", variant="error")
def on_mount(self) -> None:
"""Initialize the table when mounted."""
table = self.query_one("#model-table", DataTable)
# Add columns
table.add_column("#", width=5)
table.add_column("Model ID", width=35)
table.add_column("Name", width=30)
table.add_column("Context", width=10)
table.add_column("Price", width=12)
table.add_column("Img", width=4)
table.add_column("Tools", width=6)
table.add_column("Online", width=7)
# Populate table
self._populate_table()
# Focus table if list is small (fits on screen), otherwise focus search
if len(self.filtered_models) <= 20:
table.focus()
else:
search_input = self.query_one("#search-input", Input)
search_input.focus()
def _populate_table(self) -> None:
"""Populate the table with models."""
table = self.query_one("#model-table", DataTable)
table.clear()
rows_added = 0
for idx, model in enumerate(self.filtered_models, 1):
try:
model_id = model.get("id", "")
name = model.get("name", "")
context = str(model.get("context_length", "N/A"))
# Format pricing
pricing = model.get("pricing", {})
prompt_price = pricing.get("prompt", "0")
completion_price = pricing.get("completion", "0")
# Convert to numbers and format
try:
prompt = float(prompt_price) * 1000000 # Convert to per 1M tokens
completion = float(completion_price) * 1000000
if prompt == 0 and completion == 0:
price = "Free"
else:
price = f"${prompt:.2f}/${completion:.2f}"
except:
price = "N/A"
# Check capabilities
architecture = model.get("architecture", {})
modality = architecture.get("modality", "")
supported_params = model.get("supported_parameters", [])
# Vision support: check if modality contains "image"
supports_vision = "image" in modality
# Tool support: check if "tools" or "tool_choice" in supported_parameters
supports_tools = "tools" in supported_params or "tool_choice" in supported_params
# Online support: check if model can use :online suffix (most models can)
# Models that already have :online in their ID support it
supports_online = ":online" in model_id or model_id not in ["openrouter/free"]
# Format capability indicators
img_indicator = "" if supports_vision else "-"
tools_indicator = "" if supports_tools else "-"
web_indicator = "" if supports_online else "-"
# Add row
table.add_row(
str(idx),
model_id,
name,
context,
price,
img_indicator,
tools_indicator,
web_indicator,
key=str(idx)
)
rows_added += 1
except Exception:
# Silently skip rows that fail to add
pass
def on_input_changed(self, event: Input.Changed) -> None:
"""Filter models based on search input."""
if event.input.id != "search-input":
return
search_term = event.value.lower()
if not search_term:
self.filtered_models = self.all_models
else:
self.filtered_models = [
m for m in self.all_models
if search_term in m.get("id", "").lower()
or search_term in m.get("name", "").lower()
]
self._populate_table()
def on_data_table_row_selected(self, event: DataTable.RowSelected) -> None:
"""Handle row selection (click or arrow navigation)."""
try:
row_index = int(event.row_key.value) - 1
if 0 <= row_index < len(self.filtered_models):
self.selected_model = self.filtered_models[row_index]
except (ValueError, IndexError):
pass
def on_data_table_row_highlighted(self, event) -> None:
"""Handle row highlight (arrow key navigation)."""
try:
table = self.query_one("#model-table", DataTable)
if table.cursor_row is not None:
row_data = table.get_row_at(table.cursor_row)
if row_data:
row_index = int(row_data[0]) - 1
if 0 <= row_index < len(self.filtered_models):
self.selected_model = self.filtered_models[row_index]
except (ValueError, IndexError, AttributeError):
pass
def on_button_pressed(self, event: Button.Pressed) -> None:
"""Handle button press."""
if event.button.id == "select":
if self.selected_model:
self.dismiss(self.selected_model)
else:
# No selection, dismiss without result
self.dismiss(None)
else:
self.dismiss(None)
def on_key(self, event) -> None:
"""Handle keyboard shortcuts."""
if event.key == "escape":
self.dismiss(None)
elif event.key == "enter":
# If in search input, move to table
search_input = self.query_one("#search-input", Input)
if search_input.has_focus:
table = self.query_one("#model-table", DataTable)
table.focus()
# If in table or anywhere else, select current row
else:
table = self.query_one("#model-table", DataTable)
# Get the currently highlighted row
if table.cursor_row is not None:
try:
row_key = table.get_row_at(table.cursor_row)
if row_key:
row_index = int(row_key[0]) - 1
if 0 <= row_index < len(self.filtered_models):
selected = self.filtered_models[row_index]
self.dismiss(selected)
except (ValueError, IndexError, AttributeError):
# Fall back to previously selected model
if self.selected_model:
self.dismiss(self.selected_model)

View File

@@ -0,0 +1,129 @@
"""Statistics screen for oAI TUI."""
from textual.app import ComposeResult
from textual.containers import Container, Vertical
from textual.screen import ModalScreen
from textual.widgets import Button, Static
from oai.core.session import ChatSession
class StatsScreen(ModalScreen[None]):
"""Modal screen displaying session statistics."""
DEFAULT_CSS = """
StatsScreen {
align: center middle;
}
StatsScreen > Container {
width: 70;
height: auto;
background: #1e1e1e;
border: solid #555555;
}
StatsScreen .header {
dock: top;
width: 100%;
height: auto;
background: #2d2d2d;
color: #cccccc;
padding: 0 2;
}
StatsScreen .content {
width: 100%;
height: auto;
background: #1e1e1e;
padding: 2;
color: #cccccc;
}
StatsScreen .footer {
dock: bottom;
width: 100%;
height: auto;
background: #2d2d2d;
padding: 1 2;
align: center middle;
}
"""
def __init__(self, session: ChatSession):
super().__init__()
self.session = session
def compose(self) -> ComposeResult:
"""Compose the screen."""
with Container():
yield Static("[bold]Session Statistics[/]", classes="header")
with Vertical(classes="content"):
yield Static(self._get_stats_text(), markup=True)
with Vertical(classes="footer"):
yield Button("Close", id="close", variant="primary")
def _get_stats_text(self) -> str:
"""Generate the statistics text."""
stats = self.session.stats
# Calculate averages
avg_input = stats.total_input_tokens // stats.message_count if stats.message_count > 0 else 0
avg_output = stats.total_output_tokens // stats.message_count if stats.message_count > 0 else 0
avg_cost = stats.total_cost / stats.message_count if stats.message_count > 0 else 0
# Get model info
model_name = "None"
model_context = "N/A"
if self.session.selected_model:
model_name = self.session.selected_model.get("name", "Unknown")
model_context = str(self.session.selected_model.get("context_length", "N/A"))
# MCP status
mcp_status = "Disabled"
if self.session.mcp_manager and self.session.mcp_manager.enabled:
mode = self.session.mcp_manager.mode
if mode == "files":
write = " (Write)" if self.session.mcp_manager.write_enabled else ""
mcp_status = f"Enabled - Files{write}"
elif mode == "database":
db_idx = self.session.mcp_manager.selected_db_index
if db_idx is not None:
db_name = self.session.mcp_manager.databases[db_idx]["name"]
mcp_status = f"Enabled - Database ({db_name})"
return f"""
[bold cyan]═══ SESSION INFO ═══[/]
[bold]Messages:[/] {stats.message_count}
[bold]Current Model:[/] {model_name}
[bold]Context Length:[/] {model_context}
[bold]Memory:[/] {"Enabled" if self.session.memory_enabled else "Disabled"}
[bold]Online Mode:[/] {"Enabled" if self.session.online_enabled else "Disabled"}
[bold]MCP:[/] {mcp_status}
[bold cyan]═══ TOKEN USAGE ═══[/]
[bold]Input Tokens:[/] {stats.total_input_tokens:,}
[bold]Output Tokens:[/] {stats.total_output_tokens:,}
[bold]Total Tokens:[/] {stats.total_tokens:,}
[bold]Avg Input/Msg:[/] {avg_input:,}
[bold]Avg Output/Msg:[/] {avg_output:,}
[bold cyan]═══ COSTS ═══[/]
[bold]Total Cost:[/] ${stats.total_cost:.6f}
[bold]Avg Cost/Msg:[/] ${avg_cost:.6f}
[bold cyan]═══ HISTORY ═══[/]
[bold]History Size:[/] {len(self.session.history)} entries
[bold]Current Index:[/] {self.session.current_index + 1 if self.session.history else 0}
[bold]Memory Start:[/] {self.session.memory_start_index + 1}
"""
def on_button_pressed(self, event: Button.Pressed) -> None:
"""Handle button press."""
self.dismiss()
def on_key(self, event) -> None:
"""Handle keyboard shortcuts."""
if event.key in ("escape", "enter"):
self.dismiss()

167
oai/tui/styles.tcss Normal file
View File

@@ -0,0 +1,167 @@
/* Textual CSS for oAI TUI - Using Textual Design System */
Screen {
background: $background;
overflow: hidden;
}
Header {
dock: top;
height: auto;
background: #2d2d2d;
color: #cccccc;
padding: 0 1;
border-bottom: solid #555555;
}
ChatDisplay {
background: $background;
border: none;
padding: 1;
scrollbar-background: $background;
scrollbar-color: $primary;
overflow-y: auto;
}
UserMessageWidget {
margin: 0 0 1 0;
padding: 1;
background: $surface;
border-left: thick $success;
height: auto;
}
SystemMessageWidget {
margin: 0 0 1 0;
padding: 1;
background: #2a2a2a;
border-left: thick #888888;
height: auto;
color: #cccccc;
}
AssistantMessageWidget {
margin: 0 0 1 0;
padding: 1;
background: $panel;
border-left: thick $accent;
height: auto;
}
#assistant-label {
margin-bottom: 1;
color: #cccccc;
}
#assistant-content {
height: auto;
max-height: 100%;
color: $text;
}
InputBar {
dock: bottom;
height: auto;
background: #2d2d2d;
align: center middle;
border-top: solid #555555;
padding: 1;
}
#input-prefix {
width: auto;
padding: 0 1;
content-align: center middle;
color: #888888;
}
#input-prefix.prefix-hidden {
display: none;
}
#chat-input {
width: 85%;
height: 5;
min-height: 5;
background: #3a3a3a;
border: none;
padding: 1 2;
color: #ffffff;
content-align: left top;
}
#chat-input:focus {
background: #404040;
}
#command-dropdown {
display: none;
dock: bottom;
offset-y: -5;
offset-x: 7.5%;
height: auto;
max-height: 12;
width: 85%;
background: #2d2d2d;
border: solid #555555;
padding: 0;
layer: overlay;
}
#command-dropdown.visible {
display: block;
}
#command-dropdown #command-list {
background: #2d2d2d;
border: none;
scrollbar-background: #2d2d2d;
scrollbar-color: #555555;
}
Footer {
dock: bottom;
height: auto;
background: #252525;
color: #888888;
padding: 0 1;
}
/* Button styles */
Button {
height: 3;
min-width: 10;
background: #3a3a3a;
color: #cccccc;
border: none;
}
Button:hover {
background: #4a4a4a;
}
Button:focus {
background: #505050;
}
Button.-primary {
background: #3a3a3a;
}
Button.-success {
background: #2d5016;
color: #90ee90;
}
Button.-success:hover {
background: #3a6b1e;
}
Button.-error {
background: #5a1a1a;
color: #ff6b6b;
}
Button.-error:hover {
background: #6e2222;
}

View File

@@ -0,0 +1,17 @@
"""TUI widgets for oAI."""
from oai.tui.widgets.chat_display import ChatDisplay
from oai.tui.widgets.footer import Footer
from oai.tui.widgets.header import Header
from oai.tui.widgets.input_bar import InputBar
from oai.tui.widgets.message import AssistantMessageWidget, SystemMessageWidget, UserMessageWidget
__all__ = [
"ChatDisplay",
"Footer",
"Header",
"InputBar",
"UserMessageWidget",
"SystemMessageWidget",
"AssistantMessageWidget",
]

View File

@@ -0,0 +1,21 @@
"""Chat display widget for oAI TUI."""
from textual.containers import ScrollableContainer
from textual.widgets import Static
class ChatDisplay(ScrollableContainer):
"""Scrollable container for chat messages."""
def __init__(self):
super().__init__(id="chat-display")
async def add_message(self, widget: Static) -> None:
"""Add a message widget to the display."""
await self.mount(widget)
self.scroll_end(animate=False)
def clear_messages(self) -> None:
"""Clear all messages from the display."""
for child in list(self.children):
child.remove()

View File

@@ -0,0 +1,178 @@
"""Command dropdown menu for TUI input."""
from textual.app import ComposeResult
from textual.containers import VerticalScroll
from textual.widget import Widget
from textual.widgets import Label, OptionList
from textual.widgets.option_list import Option
from oai.commands import registry
class CommandDropdown(VerticalScroll):
"""Dropdown menu showing available commands."""
DEFAULT_CSS = """
CommandDropdown {
display: none;
height: auto;
max-height: 12;
width: 80;
background: #2d2d2d;
border: solid #555555;
padding: 0;
layer: overlay;
}
CommandDropdown.visible {
display: block;
}
CommandDropdown OptionList {
height: auto;
max-height: 12;
background: #2d2d2d;
border: none;
padding: 0;
}
CommandDropdown OptionList > .option-list--option {
padding: 0 2;
color: #cccccc;
background: transparent;
}
CommandDropdown OptionList > .option-list--option-highlighted {
background: #3e3e3e;
color: #ffffff;
}
"""
def __init__(self):
"""Initialize the command dropdown."""
super().__init__(id="command-dropdown")
self._all_commands = []
self._load_commands()
def _load_commands(self) -> None:
"""Load all available commands."""
# Get base commands with descriptions
base_commands = [
("/help", "Show help screen"),
("/model", "Select AI model"),
("/stats", "Show session statistics"),
("/credits", "Check account credits"),
("/clear", "Clear chat display"),
("/reset", "Reset conversation history"),
("/memory on", "Enable conversation memory"),
("/memory off", "Disable memory"),
("/online on", "Enable online search"),
("/online off", "Disable online search"),
("/save", "Save current conversation"),
("/load", "Load saved conversation"),
("/list", "List saved conversations"),
("/delete", "Delete a conversation"),
("/export md", "Export as Markdown"),
("/export json", "Export as JSON"),
("/export html", "Export as HTML"),
("/prev", "Show previous message"),
("/next", "Show next message"),
("/config", "View configuration"),
("/config api", "Set API key"),
("/system", "Set system prompt"),
("/maxtoken", "Set token limit"),
("/retry", "Retry last prompt"),
("/paste", "Paste from clipboard"),
("/mcp on", "Enable MCP file access"),
("/mcp off", "Disable MCP"),
("/mcp status", "Show MCP status"),
("/mcp add", "Add folder/database"),
("/mcp remove", "Remove folder/database"),
("/mcp list", "List folders"),
("/mcp write on", "Enable write mode"),
("/mcp write off", "Disable write mode"),
("/mcp files", "Switch to file mode"),
("/mcp db list", "List databases"),
]
self._all_commands = base_commands
def compose(self) -> ComposeResult:
"""Compose the dropdown."""
yield OptionList(id="command-list")
def show_commands(self, filter_text: str = "") -> None:
"""Show commands matching the filter.
Args:
filter_text: Text to filter commands by
"""
option_list = self.query_one("#command-list", OptionList)
option_list.clear_options()
if not filter_text.startswith("/"):
self.remove_class("visible")
return
# Remove the leading slash for filtering
filter_without_slash = filter_text[1:].lower()
# Filter commands - show if filter text is contained anywhere in the command
if filter_without_slash:
matching = [
(cmd, desc) for cmd, desc in self._all_commands
if filter_without_slash in cmd[1:].lower() # Skip the / in command for matching
]
else:
# Show all commands when just "/" is typed
matching = self._all_commands
if not matching:
self.remove_class("visible")
return
# Add options - limit to 10 results
for cmd, desc in matching[:10]:
# Format: command in white, description in gray, separated by spaces
label = f"{cmd} [dim]{desc}[/]" if desc else cmd
option_list.add_option(Option(label, id=cmd))
self.add_class("visible")
# Auto-select first option
if len(option_list._options) > 0:
option_list.highlighted = 0
def hide(self) -> None:
"""Hide the dropdown."""
self.remove_class("visible")
def get_selected_command(self) -> str | None:
"""Get the currently selected command.
Returns:
Selected command text or None
"""
option_list = self.query_one("#command-list", OptionList)
if option_list.highlighted is not None:
option = option_list.get_option_at_index(option_list.highlighted)
return option.id
return None
def move_selection_up(self) -> None:
"""Move selection up in the list."""
option_list = self.query_one("#command-list", OptionList)
if option_list.option_count > 0:
if option_list.highlighted is None:
option_list.highlighted = option_list.option_count - 1
elif option_list.highlighted > 0:
option_list.highlighted -= 1
def move_selection_down(self) -> None:
"""Move selection down in the list."""
option_list = self.query_one("#command-list", OptionList)
if option_list.option_count > 0:
if option_list.highlighted is None:
option_list.highlighted = 0
elif option_list.highlighted < option_list.option_count - 1:
option_list.highlighted += 1

View File

@@ -0,0 +1,58 @@
"""Command suggester for TUI input."""
from typing import Iterable
from textual.suggester import Suggester
from oai.commands import registry
class CommandSuggester(Suggester):
"""Suggester that provides command completions."""
def __init__(self):
"""Initialize the command suggester."""
super().__init__(use_cache=False, case_sensitive=False)
# Get all command names from registry
self._commands = []
self._update_commands()
def _update_commands(self) -> None:
"""Update the list of available commands."""
# Get all registered command names
command_names = registry.get_all_names()
# Add common MCP subcommands for better UX
mcp_subcommands = [
"/mcp on",
"/mcp off",
"/mcp status",
"/mcp add",
"/mcp remove",
"/mcp list",
"/mcp write on",
"/mcp write off",
"/mcp files",
"/mcp db list",
]
self._commands = command_names + mcp_subcommands
async def get_suggestion(self, value: str) -> str | None:
"""Get a command suggestion based on the current input.
Args:
value: Current input value
Returns:
Suggested completion or None
"""
if not value or not value.startswith("/"):
return None
# Find the first command that starts with the input
value_lower = value.lower()
for cmd in self._commands:
if cmd.lower().startswith(value_lower) and cmd.lower() != value_lower:
# Return the rest of the command (after what's already typed)
return cmd[len(value):]
return None

39
oai/tui/widgets/footer.py Normal file
View File

@@ -0,0 +1,39 @@
"""Footer widget for oAI TUI."""
from textual.app import ComposeResult
from textual.widgets import Static
class Footer(Static):
"""Footer displaying session metrics."""
def __init__(self):
super().__init__()
self.tokens_in = 0
self.tokens_out = 0
self.cost = 0.0
self.messages = 0
def compose(self) -> ComposeResult:
"""Compose the footer."""
yield Static(self._format_footer(), id="footer-content")
def _format_footer(self) -> str:
"""Format the footer text."""
return (
f"[dim]Messages: {self.messages} | "
f"Tokens: {self.tokens_in + self.tokens_out:,} "
f"({self.tokens_in:,} in, {self.tokens_out:,} out) | "
f"Cost: ${self.cost:.4f}[/]"
)
def update_stats(
self, tokens_in: int, tokens_out: int, cost: float, messages: int
) -> None:
"""Update the displayed statistics."""
self.tokens_in = tokens_in
self.tokens_out = tokens_out
self.cost = cost
self.messages = messages
content = self.query_one("#footer-content", Static)
content.update(self._format_footer())

65
oai/tui/widgets/header.py Normal file
View File

@@ -0,0 +1,65 @@
"""Header widget for oAI TUI."""
from textual.app import ComposeResult
from textual.widgets import Static
from typing import Optional, Dict, Any
class Header(Static):
"""Header displaying app title, version, current model, and capabilities."""
def __init__(self, version: str = "3.0.0", model: str = "", model_info: Optional[Dict[str, Any]] = None):
super().__init__()
self.version = version
self.model = model
self.model_info = model_info or {}
def compose(self) -> ComposeResult:
"""Compose the header."""
yield Static(self._format_header(), id="header-content")
def _format_capabilities(self) -> str:
"""Format capability icons based on model info."""
if not self.model_info:
return ""
icons = []
# Check vision support
architecture = self.model_info.get("architecture", {})
modality = architecture.get("modality", "")
if "image" in modality:
icons.append("[bold cyan]👁️[/]") # Bright if supported
else:
icons.append("[dim]👁️[/]") # Dim if not supported
# Check tool support
supported_params = self.model_info.get("supported_parameters", [])
if "tools" in supported_params or "tool_choice" in supported_params:
icons.append("[bold cyan]🔧[/]")
else:
icons.append("[dim]🔧[/]")
# Check online support (most models support :online suffix)
model_id = self.model_info.get("id", "")
if ":online" in model_id or model_id not in ["openrouter/free"]:
icons.append("[bold cyan]🌐[/]")
else:
icons.append("[dim]🌐[/]")
return " ".join(icons) if icons else ""
def _format_header(self) -> str:
"""Format the header text."""
model_text = f" | {self.model}" if self.model else ""
capabilities = self._format_capabilities()
capabilities_text = f" {capabilities}" if capabilities else ""
return f"[bold cyan]oAI[/] [dim]v{self.version}[/]{model_text}{capabilities_text}"
def update_model(self, model: str, model_info: Optional[Dict[str, Any]] = None) -> None:
"""Update the displayed model and capabilities."""
self.model = model
if model_info:
self.model_info = model_info
content = self.query_one("#header-content", Static)
content.update(self._format_header())

View File

@@ -0,0 +1,49 @@
"""Input bar widget for oAI TUI."""
from textual.app import ComposeResult
from textual.containers import Horizontal
from textual.widgets import Input, Static
class InputBar(Horizontal):
"""Input bar with prompt prefix and text input."""
def __init__(self):
super().__init__(id="input-bar")
self.mcp_status = ""
self.online_mode = False
def compose(self) -> ComposeResult:
"""Compose the input bar."""
yield Static(self._format_prefix(), id="input-prefix", classes="prefix-hidden" if not (self.mcp_status or self.online_mode) else "")
yield Input(
placeholder="Type a message or /command...",
id="chat-input"
)
def _format_prefix(self) -> str:
"""Format the input prefix with status indicators."""
indicators = []
if self.mcp_status:
indicators.append(f"[cyan]{self.mcp_status}[/]")
if self.online_mode:
indicators.append("[green]🌐[/]")
prefix = " ".join(indicators) + " " if indicators else ""
return f"{prefix}[bold]>[/]"
def update_mcp_status(self, status: str) -> None:
"""Update MCP status indicator."""
self.mcp_status = status
prefix = self.query_one("#input-prefix", Static)
prefix.update(self._format_prefix())
def update_online_mode(self, online: bool) -> None:
"""Update online mode indicator."""
self.online_mode = online
prefix = self.query_one("#input-prefix", Static)
prefix.update(self._format_prefix())
def get_input(self) -> Input:
"""Get the input widget."""
return self.query_one("#chat-input", Input)

View File

@@ -0,0 +1,69 @@
"""Message widgets for oAI TUI."""
from typing import Any, AsyncIterator, Tuple
from rich.markdown import Markdown
from textual.app import ComposeResult
from textual.widgets import RichLog, Static
class UserMessageWidget(Static):
"""Widget for displaying user messages."""
def __init__(self, content: str):
super().__init__()
self.content = content
def compose(self) -> ComposeResult:
"""Compose the user message."""
yield Static(f"[bold green]You:[/] {self.content}")
class SystemMessageWidget(Static):
"""Widget for displaying system/info messages without 'You:' prefix."""
def __init__(self, content: str):
super().__init__()
self.content = content
def compose(self) -> ComposeResult:
"""Compose the system message."""
yield Static(self.content)
class AssistantMessageWidget(Static):
"""Widget for displaying assistant responses with streaming support."""
def __init__(self, model_name: str = "Assistant"):
super().__init__()
self.model_name = model_name
self.full_text = ""
def compose(self) -> ComposeResult:
"""Compose the assistant message."""
yield Static(f"[bold]{self.model_name}:[/]", id="assistant-label")
yield RichLog(id="assistant-content", highlight=True, markup=True, wrap=True)
async def stream_response(self, response_iterator: AsyncIterator) -> Tuple[str, Any]:
"""Stream tokens progressively and return final text and usage."""
log = self.query_one("#assistant-content", RichLog)
self.full_text = ""
usage = None
async for chunk in response_iterator:
if hasattr(chunk, "delta_content") and chunk.delta_content:
self.full_text += chunk.delta_content
log.clear()
log.write(Markdown(self.full_text))
if hasattr(chunk, "usage") and chunk.usage:
usage = chunk.usage
return self.full_text, usage
def set_content(self, content: str) -> None:
"""Set the complete content (non-streaming)."""
self.full_text = content
log = self.query_one("#assistant-content", RichLog)
log.clear()
log.write(Markdown(content))

View File

@@ -1,51 +0,0 @@
"""
UI utilities for oAI.
This module provides rich terminal UI components and display helpers
for the chat application.
"""
from oai.ui.console import (
console,
clear_screen,
display_panel,
display_table,
display_markdown,
print_error,
print_warning,
print_success,
print_info,
)
from oai.ui.tables import (
create_model_table,
create_stats_table,
create_help_table,
display_paginated_table,
)
from oai.ui.prompts import (
prompt_confirm,
prompt_choice,
prompt_input,
)
__all__ = [
# Console utilities
"console",
"clear_screen",
"display_panel",
"display_table",
"display_markdown",
"print_error",
"print_warning",
"print_success",
"print_info",
# Table utilities
"create_model_table",
"create_stats_table",
"create_help_table",
"display_paginated_table",
# Prompt utilities
"prompt_confirm",
"prompt_choice",
"prompt_input",
]

View File

@@ -1,242 +0,0 @@
"""
Console utilities for oAI.
This module provides the Rich console instance and common display functions
for formatted terminal output.
"""
from typing import Any, Optional
from rich.console import Console
from rich.markdown import Markdown
from rich.panel import Panel
from rich.table import Table
from rich.text import Text
# Global console instance for the application
console = Console()
def clear_screen() -> None:
"""
Clear the terminal screen.
Uses ANSI escape codes for fast clearing, with a fallback
for terminals that don't support them.
"""
try:
print("\033[H\033[J", end="", flush=True)
except Exception:
# Fallback: print many newlines
print("\n" * 100)
def display_panel(
content: Any,
title: Optional[str] = None,
subtitle: Optional[str] = None,
border_style: str = "green",
title_align: str = "left",
subtitle_align: str = "right",
) -> None:
"""
Display content in a bordered panel.
Args:
content: Content to display (string, Table, or Markdown)
title: Optional panel title
subtitle: Optional panel subtitle
border_style: Border color/style
title_align: Title alignment ("left", "center", "right")
subtitle_align: Subtitle alignment
"""
panel = Panel(
content,
title=title,
subtitle=subtitle,
border_style=border_style,
title_align=title_align,
subtitle_align=subtitle_align,
)
console.print(panel)
def display_table(
table: Table,
title: Optional[str] = None,
subtitle: Optional[str] = None,
) -> None:
"""
Display a table with optional title panel.
Args:
table: Rich Table to display
title: Optional panel title
subtitle: Optional panel subtitle
"""
if title:
display_panel(table, title=title, subtitle=subtitle)
else:
console.print(table)
def display_markdown(
content: str,
panel: bool = False,
title: Optional[str] = None,
) -> None:
"""
Display markdown-formatted content.
Args:
content: Markdown text to display
panel: Whether to wrap in a panel
title: Optional panel title (if panel=True)
"""
md = Markdown(content)
if panel:
display_panel(md, title=title)
else:
console.print(md)
def print_error(message: str, prefix: str = "Error:") -> None:
"""
Print an error message in red.
Args:
message: Error message to display
prefix: Prefix before the message (default: "Error:")
"""
console.print(f"[bold red]{prefix}[/] {message}")
def print_warning(message: str, prefix: str = "Warning:") -> None:
"""
Print a warning message in yellow.
Args:
message: Warning message to display
prefix: Prefix before the message (default: "Warning:")
"""
console.print(f"[bold yellow]{prefix}[/] {message}")
def print_success(message: str, prefix: str = "") -> None:
"""
Print a success message in green.
Args:
message: Success message to display
prefix: Prefix before the message (default: "")
"""
console.print(f"[bold green]{prefix}[/] {message}")
def print_info(message: str, dim: bool = False) -> None:
"""
Print an informational message in cyan.
Args:
message: Info message to display
dim: Whether to dim the message
"""
if dim:
console.print(f"[dim cyan]{message}[/]")
else:
console.print(f"[bold cyan]{message}[/]")
def print_metrics(
tokens: int,
cost: float,
time_seconds: float,
context_info: str = "",
online: bool = False,
mcp_mode: Optional[str] = None,
tool_loops: int = 0,
session_tokens: int = 0,
session_cost: float = 0.0,
) -> None:
"""
Print formatted metrics for a response.
Args:
tokens: Total tokens used
cost: Cost in USD
time_seconds: Response time
context_info: Context information string
online: Whether online mode is active
mcp_mode: MCP mode ("files", "database", or None)
tool_loops: Number of tool call loops
session_tokens: Total session tokens
session_cost: Total session cost
"""
parts = [
f"📊 Metrics: {tokens} tokens",
f"${cost:.4f}",
f"{time_seconds:.2f}s",
]
if context_info:
parts.append(context_info)
if online:
parts.append("🌐")
if mcp_mode == "files":
parts.append("🔧")
elif mcp_mode == "database":
parts.append("🗄️")
if tool_loops > 0:
parts.append(f"({tool_loops} tool loop(s))")
parts.append(f"Session: {session_tokens} tokens")
parts.append(f"${session_cost:.4f}")
console.print(f"\n[dim blue]{' | '.join(parts)}[/]")
def format_size(size_bytes: int) -> str:
"""
Format a size in bytes to a human-readable string.
Args:
size_bytes: Size in bytes
Returns:
Formatted size string (e.g., "1.5 MB")
"""
for unit in ["B", "KB", "MB", "GB", "TB"]:
if abs(size_bytes) < 1024.0:
return f"{size_bytes:.1f} {unit}"
size_bytes /= 1024.0
return f"{size_bytes:.1f} PB"
def format_tokens(tokens: int) -> str:
"""
Format token count with thousands separators.
Args:
tokens: Number of tokens
Returns:
Formatted token string (e.g., "1,234,567")
"""
return f"{tokens:,}"
def format_cost(cost: float, precision: int = 4) -> str:
"""
Format cost in USD.
Args:
cost: Cost in dollars
precision: Decimal places
Returns:
Formatted cost string (e.g., "$0.0123")
"""
return f"${cost:.{precision}f}"

View File

@@ -1,274 +0,0 @@
"""
Prompt utilities for oAI.
This module provides functions for gathering user input
through confirmations, choices, and text prompts.
"""
from typing import List, Optional, TypeVar
import typer
from oai.ui.console import console
T = TypeVar("T")
def prompt_confirm(
message: str,
default: bool = False,
abort: bool = False,
) -> bool:
"""
Prompt the user for a yes/no confirmation.
Args:
message: The question to ask
default: Default value if user presses Enter
abort: Whether to abort on "no" response
Returns:
True if user confirms, False otherwise
"""
try:
return typer.confirm(message, default=default, abort=abort)
except (EOFError, KeyboardInterrupt):
console.print("\n[yellow]Cancelled[/]")
return False
def prompt_choice(
message: str,
choices: List[str],
default: Optional[str] = None,
) -> Optional[str]:
"""
Prompt the user to select from a list of choices.
Args:
message: The question to ask
choices: List of valid choices
default: Default choice if user presses Enter
Returns:
Selected choice or None if cancelled
"""
# Display choices
console.print(f"\n[bold cyan]{message}[/]")
for i, choice in enumerate(choices, 1):
default_marker = " [default]" if choice == default else ""
console.print(f" {i}. {choice}{default_marker}")
try:
response = input("\nEnter number or value: ").strip()
if not response and default:
return default
# Try as number first
try:
index = int(response) - 1
if 0 <= index < len(choices):
return choices[index]
except ValueError:
pass
# Try as exact match
if response in choices:
return response
# Try case-insensitive match
response_lower = response.lower()
for choice in choices:
if choice.lower() == response_lower:
return choice
console.print(f"[red]Invalid choice: {response}[/]")
return None
except (EOFError, KeyboardInterrupt):
console.print("\n[yellow]Cancelled[/]")
return None
def prompt_input(
message: str,
default: Optional[str] = None,
password: bool = False,
required: bool = False,
) -> Optional[str]:
"""
Prompt the user for text input.
Args:
message: The prompt message
default: Default value if user presses Enter
password: Whether to hide input (for sensitive data)
required: Whether input is required (loops until provided)
Returns:
User input or default, None if cancelled
"""
prompt_text = message
if default:
prompt_text += f" [{default}]"
prompt_text += ": "
try:
while True:
if password:
import getpass
response = getpass.getpass(prompt_text)
else:
response = input(prompt_text).strip()
if not response:
if default:
return default
if required:
console.print("[yellow]Input required[/]")
continue
return None
return response
except (EOFError, KeyboardInterrupt):
console.print("\n[yellow]Cancelled[/]")
return None
def prompt_number(
message: str,
min_value: Optional[int] = None,
max_value: Optional[int] = None,
default: Optional[int] = None,
) -> Optional[int]:
"""
Prompt the user for a numeric input.
Args:
message: The prompt message
min_value: Minimum allowed value
max_value: Maximum allowed value
default: Default value if user presses Enter
Returns:
Integer value or None if cancelled
"""
prompt_text = message
if default is not None:
prompt_text += f" [{default}]"
prompt_text += ": "
try:
while True:
response = input(prompt_text).strip()
if not response:
if default is not None:
return default
return None
try:
value = int(response)
except ValueError:
console.print("[red]Please enter a valid number[/]")
continue
if min_value is not None and value < min_value:
console.print(f"[red]Value must be at least {min_value}[/]")
continue
if max_value is not None and value > max_value:
console.print(f"[red]Value must be at most {max_value}[/]")
continue
return value
except (EOFError, KeyboardInterrupt):
console.print("\n[yellow]Cancelled[/]")
return None
def prompt_selection(
items: List[T],
message: str = "Select an item",
display_func: Optional[callable] = None,
allow_cancel: bool = True,
) -> Optional[T]:
"""
Prompt the user to select an item from a list.
Args:
items: List of items to choose from
message: The selection prompt
display_func: Function to convert item to display string
allow_cancel: Whether to allow cancellation
Returns:
Selected item or None if cancelled
"""
if not items:
console.print("[yellow]No items to select[/]")
return None
display = display_func or str
console.print(f"\n[bold cyan]{message}[/]")
for i, item in enumerate(items, 1):
console.print(f" {i}. {display(item)}")
if allow_cancel:
console.print(f" 0. Cancel")
try:
while True:
response = input("\nEnter number: ").strip()
try:
index = int(response)
except ValueError:
console.print("[red]Please enter a valid number[/]")
continue
if allow_cancel and index == 0:
return None
if 1 <= index <= len(items):
return items[index - 1]
console.print(f"[red]Please enter a number between 1 and {len(items)}[/]")
except (EOFError, KeyboardInterrupt):
console.print("\n[yellow]Cancelled[/]")
return None
def prompt_copy_response(response: str) -> bool:
"""
Prompt user to copy a response to clipboard.
Args:
response: The response text
Returns:
True if copied, False otherwise
"""
try:
copy_choice = input("💾 Type 'c' to copy response, or press Enter to continue: ").strip().lower()
if copy_choice == "c":
try:
import pyperclip
pyperclip.copy(response)
console.print("[bold green]✅ Response copied to clipboard![/]")
return True
except ImportError:
console.print("[yellow]pyperclip not installed - cannot copy to clipboard[/]")
except Exception as e:
console.print(f"[red]Failed to copy: {e}[/]")
except (EOFError, KeyboardInterrupt):
pass
return False

View File

@@ -1,373 +0,0 @@
"""
Table utilities for oAI.
This module provides functions for creating and displaying
formatted tables with pagination support.
"""
import os
import sys
from typing import Any, Dict, List, Optional
from rich.panel import Panel
from rich.table import Table
from oai.ui.console import clear_screen, console
def create_model_table(
models: List[Dict[str, Any]],
show_capabilities: bool = True,
) -> Table:
"""
Create a table displaying available AI models.
Args:
models: List of model dictionaries
show_capabilities: Whether to show capability columns
Returns:
Rich Table with model information
"""
if show_capabilities:
table = Table(
"No.",
"Model ID",
"Context",
"Image",
"Online",
"Tools",
show_header=True,
header_style="bold magenta",
)
else:
table = Table(
"No.",
"Model ID",
"Context",
show_header=True,
header_style="bold magenta",
)
for i, model in enumerate(models, 1):
model_id = model.get("id", "Unknown")
context = model.get("context_length", 0)
context_str = f"{context:,}" if context else "-"
if show_capabilities:
# Get modalities and parameters
architecture = model.get("architecture", {})
input_modalities = architecture.get("input_modalities", [])
supported_params = model.get("supported_parameters", [])
has_image = "" if "image" in input_modalities else "-"
has_online = "" if "tools" in supported_params else "-"
has_tools = "" if "tools" in supported_params or "functions" in supported_params else "-"
table.add_row(
str(i),
model_id,
context_str,
has_image,
has_online,
has_tools,
)
else:
table.add_row(str(i), model_id, context_str)
return table
def create_stats_table(stats: Dict[str, Any]) -> Table:
"""
Create a table displaying session statistics.
Args:
stats: Dictionary with statistics data
Returns:
Rich Table with stats
"""
table = Table(
"Metric",
"Value",
show_header=True,
header_style="bold magenta",
)
# Token stats
if "input_tokens" in stats:
table.add_row("Input Tokens", f"{stats['input_tokens']:,}")
if "output_tokens" in stats:
table.add_row("Output Tokens", f"{stats['output_tokens']:,}")
if "total_tokens" in stats:
table.add_row("Total Tokens", f"{stats['total_tokens']:,}")
# Cost stats
if "total_cost" in stats:
table.add_row("Total Cost", f"${stats['total_cost']:.4f}")
if "avg_cost" in stats:
table.add_row("Avg Cost/Message", f"${stats['avg_cost']:.4f}")
# Message stats
if "message_count" in stats:
table.add_row("Messages", str(stats["message_count"]))
# Credits
if "credits_left" in stats:
table.add_row("Credits Left", stats["credits_left"])
return table
def create_help_table(commands: Dict[str, Dict[str, str]]) -> Table:
"""
Create a help table for commands.
Args:
commands: Dictionary of command info
Returns:
Rich Table with command help
"""
table = Table(
"Command",
"Description",
"Example",
show_header=True,
header_style="bold magenta",
show_lines=False,
)
for cmd, info in commands.items():
description = info.get("description", "")
example = info.get("example", "")
table.add_row(cmd, description, example)
return table
def create_folder_table(
folders: List[Dict[str, Any]],
gitignore_info: str = "",
) -> Table:
"""
Create a table for MCP folder listing.
Args:
folders: List of folder dictionaries
gitignore_info: Optional gitignore status info
Returns:
Rich Table with folder information
"""
table = Table(
"No.",
"Path",
"Files",
"Size",
show_header=True,
header_style="bold magenta",
)
for folder in folders:
number = str(folder.get("number", ""))
path = folder.get("path", "")
if folder.get("exists", True):
files = f"📁 {folder.get('file_count', 0)}"
size = f"{folder.get('size_mb', 0):.1f} MB"
else:
files = "[red]Not found[/red]"
size = "-"
table.add_row(number, path, files, size)
return table
def create_database_table(databases: List[Dict[str, Any]]) -> Table:
"""
Create a table for MCP database listing.
Args:
databases: List of database dictionaries
Returns:
Rich Table with database information
"""
table = Table(
"No.",
"Name",
"Tables",
"Size",
"Status",
show_header=True,
header_style="bold magenta",
)
for db in databases:
number = str(db.get("number", ""))
name = db.get("name", "")
table_count = f"{db.get('table_count', 0)} tables"
size = f"{db.get('size_mb', 0):.1f} MB"
if db.get("warning"):
status = f"[red]{db['warning']}[/red]"
else:
status = "[green]✓[/green]"
table.add_row(number, name, table_count, size, status)
return table
def display_paginated_table(
table: Table,
title: str,
terminal_height: Optional[int] = None,
) -> None:
"""
Display a table with pagination for large datasets.
Allows navigating through pages with keyboard input.
Press SPACE for next page, any other key to exit.
Args:
table: Rich Table to display
title: Title for the table
terminal_height: Override terminal height (auto-detected if None)
"""
# Get terminal dimensions
try:
term_height = terminal_height or os.get_terminal_size().lines - 8
except OSError:
term_height = 20
# Render table to segments
from rich.segment import Segment
segments = list(console.render(table))
# Group segments into lines
current_line_segments: List[Segment] = []
all_lines: List[List[Segment]] = []
for segment in segments:
if segment.text == "\n":
all_lines.append(current_line_segments)
current_line_segments = []
else:
current_line_segments.append(segment)
if current_line_segments:
all_lines.append(current_line_segments)
total_lines = len(all_lines)
# If table fits in one screen, just display it
if total_lines <= term_height:
console.print(Panel(table, title=title, title_align="left"))
return
# Extract header and footer lines
header_lines: List[List[Segment]] = []
data_lines: List[List[Segment]] = []
footer_line: List[Segment] = []
# Find header end (line after the header text with border)
header_end_index = 0
found_header_text = False
for i, line_segments in enumerate(all_lines):
has_header_style = any(
seg.style and ("bold" in str(seg.style) or "magenta" in str(seg.style))
for seg in line_segments
)
if has_header_style:
found_header_text = True
if found_header_text and i > 0:
line_text = "".join(seg.text for seg in line_segments)
if any(char in line_text for char in ["", "", "", "", "", ""]):
header_end_index = i
break
# Extract footer (bottom border)
if all_lines:
last_line_text = "".join(seg.text for seg in all_lines[-1])
if any(char in last_line_text for char in ["", "", "", "", "", ""]):
footer_line = all_lines[-1]
all_lines = all_lines[:-1]
# Split into header and data
if header_end_index > 0:
header_lines = all_lines[: header_end_index + 1]
data_lines = all_lines[header_end_index + 1 :]
else:
header_lines = all_lines[: min(3, len(all_lines))]
data_lines = all_lines[min(3, len(all_lines)) :]
lines_per_page = term_height - len(header_lines)
current_line = 0
page_number = 1
# Paginate
while current_line < len(data_lines):
clear_screen()
console.print(f"[bold cyan]{title} (Page {page_number})[/]")
# Print header
for line_segments in header_lines:
for segment in line_segments:
console.print(segment.text, style=segment.style, end="")
console.print()
# Print data rows for this page
end_line = min(current_line + lines_per_page, len(data_lines))
for line_segments in data_lines[current_line:end_line]:
for segment in line_segments:
console.print(segment.text, style=segment.style, end="")
console.print()
# Print footer
if footer_line:
for segment in footer_line:
console.print(segment.text, style=segment.style, end="")
console.print()
current_line = end_line
page_number += 1
# Prompt for next page
if current_line < len(data_lines):
console.print(
f"\n[dim yellow]--- Press SPACE for next page, "
f"or any other key to finish (Page {page_number - 1}, "
f"showing {end_line}/{len(data_lines)} data rows) ---[/dim yellow]"
)
try:
import termios
import tty
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
try:
tty.setraw(fd)
char = sys.stdin.read(1)
if char != " ":
break
finally:
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
except (ImportError, OSError, AttributeError):
# Fallback for non-Unix systems
try:
user_input = input()
if user_input.strip():
break
except (EOFError, KeyboardInterrupt):
break

View File

@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project] [project]
name = "oai" name = "oai"
version = "2.1.0" version = "3.0.0"
description = "OpenRouter AI Chat Client - A feature-rich terminal-based chat application" description = "OpenRouter AI Chat Client - A feature-rich terminal-based chat application"
readme = "README.md" readme = "README.md"
license = {text = "MIT"} license = {text = "MIT"}
@@ -44,10 +44,10 @@ dependencies = [
"markdown-it-py>=3.0.0", "markdown-it-py>=3.0.0",
"openrouter>=0.0.19", "openrouter>=0.0.19",
"packaging>=21.0", "packaging>=21.0",
"prompt-toolkit>=3.0.0",
"pyperclip>=1.8.0", "pyperclip>=1.8.0",
"requests>=2.28.0", "requests>=2.28.0",
"rich>=13.0.0", "rich>=13.0.0",
"textual>=0.50.0",
"typer>=0.9.0", "typer>=0.9.0",
"mcp>=1.0.0", "mcp>=1.0.0",
] ]
@@ -73,7 +73,7 @@ Documentation = "https://iurl.no/oai"
oai = "oai.cli:main" oai = "oai.cli:main"
[tool.setuptools] [tool.setuptools]
packages = ["oai", "oai.commands", "oai.config", "oai.core", "oai.mcp", "oai.providers", "oai.ui", "oai.utils"] packages = ["oai", "oai.commands", "oai.config", "oai.core", "oai.mcp", "oai.providers", "oai.tui", "oai.tui.widgets", "oai.tui.screens", "oai.utils"]
[tool.setuptools.package-data] [tool.setuptools.package-data]
oai = ["py.typed"] oai = ["py.typed"]