1 Commits
1.7 ... 1.8

Author SHA1 Message Date
53b6ae3a76 Added some more function. E.g. use of models. 2025-12-17 14:43:47 +01:00

265
oai.py
View File

@@ -82,7 +82,7 @@ app_logger.setLevel(logging.INFO)
# DB configuration
database = config_dir / 'oai_config.db'
DB_FILE = str(database)
version = '1.7'
version = '1.8'
def create_table_if_not_exists():
"""Ensure the config and conversation_sessions tables exist."""
@@ -347,6 +347,166 @@ def clear_screen():
except:
print("\n" * 100)
def has_web_search_capability(model: Dict[str, Any]) -> bool:
"""Check if model supports web search based on supported_parameters."""
supported_params = model.get("supported_parameters", [])
# Web search is typically indicated by 'tools' parameter support
return "tools" in supported_params
def has_image_capability(model: Dict[str, Any]) -> bool:
"""Check if model supports image input based on input modalities."""
architecture = model.get("architecture", {})
input_modalities = architecture.get("input_modalities", [])
return "image" in input_modalities
def supports_online_mode(model: Dict[str, Any]) -> bool:
"""Check if model supports :online suffix for web search."""
# Models that support tools parameter can use :online
return has_web_search_capability(model)
def get_effective_model_id(base_model_id: str, online_enabled: bool) -> str:
"""Get the effective model ID with :online suffix if enabled."""
if online_enabled and not base_model_id.endswith(':online'):
return f"{base_model_id}:online"
return base_model_id
def display_paginated_table(table: Table, title: str):
"""Display a table with pagination support using Rich console for colored output, repeating header on each page."""
# Get terminal height (subtract some lines for prompt and margins)
try:
terminal_height = os.get_terminal_size().lines - 8
except:
terminal_height = 20 # Fallback if terminal size can't be determined
# Create a segment-based approach to capture Rich-rendered output
from rich.segment import Segment
# Render the table to segments
segments = list(console.render(table))
# Convert segments to lines while preserving style
current_line_segments = []
all_lines = []
for segment in segments:
if segment.text == '\n':
all_lines.append(current_line_segments)
current_line_segments = []
else:
current_line_segments.append(segment)
# Add last line if not empty
if current_line_segments:
all_lines.append(current_line_segments)
total_lines = len(all_lines)
# If fits on one screen after segment analysis
if total_lines <= terminal_height:
console.print(Panel(table, title=title, title_align="left"))
return
# Separate header from data rows
# Typically the first 3 lines are: top border, header row, separator
header_lines = []
data_lines = []
# Find where the header ends (usually after the first horizontal line after header text)
header_end_index = 0
found_header_text = False
for i, line_segments in enumerate(all_lines):
# Check if this line contains header-style text (bold/magenta usually)
has_header_style = any(
seg.style and ('bold' in str(seg.style) or 'magenta' in str(seg.style))
for seg in line_segments
)
if has_header_style:
found_header_text = True
# After finding header text, the next line with box-drawing chars is the separator
if found_header_text and i > 0:
line_text = ''.join(seg.text for seg in line_segments)
# Check for horizontal line characters (─ ━ ╌ etc.)
if any(char in line_text for char in ['', '', '', '', '', '']):
header_end_index = i
break
# If we found a header separator, split there
if header_end_index > 0:
header_lines = all_lines[:header_end_index + 1] # Include the separator
data_lines = all_lines[header_end_index + 1:]
else:
# Fallback: assume first 3 lines are header
header_lines = all_lines[:min(3, len(all_lines))]
data_lines = all_lines[min(3, len(all_lines)):]
# Calculate how many data lines fit per page (accounting for header)
lines_per_page = terminal_height - len(header_lines)
# Display with pagination
current_line = 0
page_number = 1
while current_line < len(data_lines):
# Clear screen for each page
clear_screen()
# Print title
console.print(f"[bold cyan]{title} (Page {page_number})[/]")
# Print header on every page
for line_segments in header_lines:
for segment in line_segments:
console.print(segment.text, style=segment.style, end="")
console.print() # New line after each row
# Calculate how many data lines to show on this page
end_line = min(current_line + lines_per_page, len(data_lines))
# Print data lines for this page
for line_segments in data_lines[current_line:end_line]:
for segment in line_segments:
console.print(segment.text, style=segment.style, end="")
console.print() # New line after each row
# Update position
current_line = end_line
page_number += 1
# If there's more content, wait for user
if current_line < len(data_lines):
console.print(f"\n[dim yellow]--- Press SPACE for next page, or any other key to finish (Page {page_number - 1}, showing {end_line}/{len(data_lines)} data rows) ---[/dim yellow]")
try:
import sys
import tty
import termios
# Save terminal settings
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
try:
# Set terminal to raw mode to read single character
tty.setraw(fd)
char = sys.stdin.read(1)
# If not space, break pagination
if char != ' ':
break
finally:
# Restore terminal settings
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
except:
# Fallback for Windows or if termios not available
input_char = input().strip()
if input_char != '':
break
else:
# No more content
break
@app.command()
def chat():
global API_KEY, OPENROUTER_BASE_URL, STREAM_ENABLED, MAX_TOKEN, COST_WARNING_THRESHOLD
@@ -362,6 +522,7 @@ def chat():
conversation_memory_enabled = True # Memory ON by default
memory_start_index = 0 # Track when memory was last enabled
saved_conversations_cache = [] # Cache for /list results to use with /load by number
online_mode_enabled = False # Online mode (web search) disabled by default
app_logger.info("Starting new chat session with memory enabled") # Log session start
@@ -426,6 +587,39 @@ def chat():
console.print("[bold green]Retrying last prompt...[/]")
app_logger.info(f"Retrying prompt: {last_prompt[:100]}...")
user_input = last_prompt
elif user_input.lower().startswith("/online"):
args = user_input[8:].strip()
if not args:
status = "enabled" if online_mode_enabled else "disabled"
console.print(f"[bold blue]Online mode (web search) {status}.[/]")
if selected_model:
if supports_online_mode(selected_model):
console.print(f"[dim green]Current model '{selected_model['name']}' supports online mode.[/]")
else:
console.print(f"[dim yellow]Current model '{selected_model['name']}' does not support online mode.[/]")
continue
if args.lower() == "on":
if not selected_model:
console.print("[bold red]No model selected. Select a model first with '/model'.[/]")
continue
if not supports_online_mode(selected_model):
console.print(f"[bold red]Model '{selected_model['name']}' does not support online mode (web search).[/]")
console.print("[dim yellow]Online mode requires models with 'tools' parameter support.[/]")
app_logger.warning(f"Online mode activation failed - model {selected_model['id']} doesn't support it")
continue
online_mode_enabled = True
console.print("[bold green]Online mode enabled. Model will use web search capabilities.[/]")
console.print(f"[dim blue]Effective model ID: {get_effective_model_id(selected_model['id'], True)}[/]")
app_logger.info(f"Online mode enabled for model {selected_model['id']}")
elif args.lower() == "off":
online_mode_enabled = False
console.print("[bold green]Online mode disabled. Model will not use web search.[/]")
if selected_model:
console.print(f"[dim blue]Effective model ID: {selected_model['id']}[/]")
app_logger.info("Online mode disabled")
else:
console.print("[bold yellow]Usage: /online on|off (or /online to view status)[/]")
continue
elif user_input.lower().startswith("/memory"):
args = user_input[8:].strip()
if not args:
@@ -755,7 +949,7 @@ def chat():
console.print(f"[bold red]Model '{args}' not found.[/]")
continue
# Display model info (unchanged)
# Display model info
pricing = model_to_show.get("pricing", {})
architecture = model_to_show.get("architecture", {})
supported_params = ", ".join(model_to_show.get("supported_parameters", [])) or "None"
@@ -773,6 +967,7 @@ def chat():
table.add_row("Input Modalities", ", ".join(architecture.get("input_modalities", [])) or "None")
table.add_row("Output Modalities", ", ".join(architecture.get("output_modalities", [])) or "None")
table.add_row("Supported Parameters", supported_params)
table.add_row("Online Mode Support", "Yes" if supports_online_mode(model_to_show) else "No")
table.add_row("Top Provider Context Length", str(top_provider.get("context_length", "N/A")))
table.add_row("Max Completion Tokens", str(top_provider.get("max_completion_tokens", "N/A")))
table.add_row("Moderated", "Yes" if top_provider.get("is_moderated", False) else "No")
@@ -780,7 +975,7 @@ def chat():
console.print(Panel(table, title=f"[bold green]Model Info: {model_to_show['name']}[/]", title_align="left"))
continue
# Model selection (unchanged but with logging)
# Model selection with colored checkmarks (removed Web column)
elif user_input.startswith("/model"):
app_logger.info("User initiated model selection")
args = user_input[7:].strip()
@@ -791,10 +986,17 @@ def chat():
if not filtered_models:
console.print(f"[bold red]No models match '{search_term}'. Try '/model'.[/]")
continue
table = Table("No.", "Name", "ID", show_header=True, header_style="bold magenta")
# Create table with colored checkmarks (removed Web column)
table = Table("No.", "Name", "ID", "Image", show_header=True, header_style="bold magenta")
for i, model in enumerate(filtered_models, 1):
table.add_row(str(i), model["name"], model["id"])
console.print(Panel(table, title=f"[bold green]Available Models ({'All' if not search_term else f'Search: {search_term}'})[/]", title_align="left"))
image_support = "[green]✓[/green]" if has_image_capability(model) else "[red]✗[/red]"
table.add_row(str(i), model["name"], model["id"], image_support)
# Use pagination for the table
title = f"[bold green]Available Models ({'All' if not search_term else f'Search: {search_term}'})[/]"
display_paginated_table(table, title)
while True:
try:
choice = int(typer.prompt("Enter model number (or 0 to cancel)"))
@@ -802,7 +1004,13 @@ def chat():
break
if 1 <= choice <= len(filtered_models):
selected_model = filtered_models[choice - 1]
# Disable online mode when switching models (user must re-enable)
if online_mode_enabled:
online_mode_enabled = False
console.print("[dim yellow]Note: Online mode auto-disabled when changing models.[/]")
console.print(f"[bold cyan]Selected: {selected_model['name']} ({selected_model['id']})[/]")
if supports_online_mode(selected_model):
console.print("[dim green]This model supports online mode. Use '/online on' to enable web search.[/]")
app_logger.info(f"Model selected: {selected_model['name']} ({selected_model['id']})")
break
console.print("[bold red]Invalid choice. Try again.[/]")
@@ -926,10 +1134,17 @@ def chat():
if not filtered_models:
console.print(f"[bold red]No models match '{search_term}'. Try without search.[/]")
continue
table = Table("No.", "Name", "ID", show_header=True, header_style="bold magenta")
# Create table with colored checkmarks (removed Web column)
table = Table("No.", "Name", "ID", "Image", show_header=True, header_style="bold magenta")
for i, model in enumerate(filtered_models, 1):
table.add_row(str(i), model["name"], model["id"])
console.print(Panel(table, title=f"[bold green]Available Models for Default ({'All' if not search_term else f'Search: {search_term}'})[/]", title_align="left"))
image_support = "[green]✓[/green]" if has_image_capability(model) else "[red]✗[/red]"
table.add_row(str(i), model["name"], model["id"], image_support)
# Use pagination for the table
title = f"[bold green]Available Models for Default ({'All' if not search_term else f'Search: {search_term}'})[/]"
display_paginated_table(table, title)
while True:
try:
choice = int(typer.prompt("Enter model number (or 0 to cancel)"))
@@ -956,6 +1171,7 @@ def chat():
table.add_row("Streaming", "Enabled" if STREAM_ENABLED == "on" else "Disabled")
table.add_row("Default Model", DEFAULT_MODEL_ID or "[Not set]")
table.add_row("Current Model", "[Not set]" if selected_model is None else str(selected_model["name"]))
table.add_row("Online Mode", "Enabled" if online_mode_enabled else "Disabled")
table.add_row("Max Token", str(MAX_TOKEN))
table.add_row("Session Token", "[Not set]" if session_max_token == 0 else str(session_max_token))
table.add_row("Session System Prompt", session_system_prompt or "[Not set]")
@@ -998,6 +1214,8 @@ def chat():
token_value = session_max_token if session_max_token != 0 else " Not set"
console.print(f"[bold cyan]Token limits: Max= {MAX_TOKEN}, Session={token_value}[/]")
console.print("[bold blue]Active model[/] [bold red]%s[/]" %(str(selected_model["name"]) if selected_model else "None"))
if online_mode_enabled:
console.print("[bold cyan]Online mode: Enabled (web search active)[/]")
continue
if user_input.lower() == "/help":
@@ -1029,6 +1247,11 @@ def chat():
"View the next response in history.",
"/next"
)
help_table.add_row(
"/online [on|off]",
"Enable/disable online mode (web search) for current model. Only works with models that support tools.",
"/online on\n/online off"
)
help_table.add_row(
"/paste [prompt]",
"Paste plain text/code from clipboard and send to AI. Optional prompt can be added.",
@@ -1058,12 +1281,12 @@ def chat():
)
help_table.add_row(
"/info [model_id]",
"Display detailed info (pricing, modalities, context length, etc.) for current or specified model.",
"Display detailed info (pricing, modalities, context length, online support, etc.) for current or specified model.",
"/info\n/info gpt-4o"
)
help_table.add_row(
"/model [search]",
"Select or change the current model for the session. Supports searching by name or ID.",
"Select or change the current model for the session. Supports searching by name or ID. Shows image capabilities.",
"/model\n/model gpt"
)
@@ -1095,7 +1318,7 @@ def chat():
)
help_table.add_row(
"/config model [search]",
"Set default model that loads on startup. Doesn't change current session model.",
"Set default model that loads on startup. Doesn't change current session model. Shows image capabilities.",
"/config model gpt"
)
help_table.add_row(
@@ -1324,9 +1547,12 @@ def chat():
# Add current user message
api_messages.append({"role": "user", "content": message_content})
# Get effective model ID with :online suffix if enabled
effective_model_id = get_effective_model_id(selected_model["id"], online_mode_enabled)
# Build API params with app identification headers (using http_headers)
api_params = {
"model": selected_model["id"],
"model": effective_model_id,
"messages": api_messages,
"stream": STREAM_ENABLED == "on",
"http_headers": {
@@ -1343,12 +1569,15 @@ def chat():
file_count = len(file_attachments)
history_messages_count = len(session_history) - memory_start_index if conversation_memory_enabled else 0
memory_status = "ON" if conversation_memory_enabled else "OFF"
app_logger.info(f"API Request: Model '{selected_model['id']}', Prompt length: {len(text_part)} chars, {file_count} file(s) attached, Memory: {memory_status}, History sent: {history_messages_count} messages, Transforms: middle-out {'enabled' if middle_out_enabled else 'disabled'}, App: {APP_NAME} ({APP_URL}).")
online_status = "ON" if online_mode_enabled else "OFF"
app_logger.info(f"API Request: Model '{effective_model_id}' (Online: {online_status}), Prompt length: {len(text_part)} chars, {file_count} file(s) attached, Memory: {memory_status}, History sent: {history_messages_count} messages, Transforms: middle-out {'enabled' if middle_out_enabled else 'disabled'}, App: {APP_NAME} ({APP_URL}).")
# Send and handle response with metrics and timing
is_streaming = STREAM_ENABLED == "on"
if is_streaming:
console.print("[bold green]Streaming response...[/] [dim](Press Ctrl+C to cancel)[/]")
if online_mode_enabled:
console.print("[dim cyan]🌐 Online mode active - model has web search access[/]")
console.print("") # Add spacing before response
else:
console.print("[bold green]Thinking...[/]", end="\r")
@@ -1356,7 +1585,7 @@ def chat():
start_time = time.time() # Start timing request
try:
response = client.chat.send(**api_params)
app_logger.info(f"API call successful for model '{selected_model['id']}'")
app_logger.info(f"API call successful for model '{effective_model_id}'")
except Exception as e:
console.print(f"[bold red]Error sending request: {e}[/]")
app_logger.error(f"API Error: {type(e).__name__}: {e}")
@@ -1413,7 +1642,7 @@ def chat():
message_count += 1
# Log response metrics
app_logger.info(f"Response: Tokens - I:{input_tokens} O:{output_tokens} T:{input_tokens + output_tokens}, Cost: ${msg_cost:.4f}, Time: {response_time:.2f}s")
app_logger.info(f"Response: Tokens - I:{input_tokens} O:{output_tokens} T:{input_tokens + output_tokens}, Cost: ${msg_cost:.4f}, Time: {response_time:.2f}s, Online: {online_mode_enabled}")
# Per-message metrics display with context info
if conversation_memory_enabled:
@@ -1421,7 +1650,9 @@ def chat():
context_info = f", Context: {context_count} msg(s)" if context_count > 1 else ""
else:
context_info = ", Memory: OFF"
console.print(f"\n[dim blue]📊 Metrics: {input_tokens + output_tokens} tokens | ${msg_cost:.4f} | {response_time:.2f}s{context_info} | Session: {total_input_tokens + total_output_tokens} tokens | ${total_cost:.4f}[/]")
online_info = " 🌐" if online_mode_enabled else ""
console.print(f"\n[dim blue]📊 Metrics: {input_tokens + output_tokens} tokens | ${msg_cost:.4f} | {response_time:.2f}s{context_info}{online_info} | Session: {total_input_tokens + total_output_tokens} tokens | ${total_cost:.4f}[/]")
# Cost and credit alerts
warnings = []