Files
oai/oai/utils/files.py
Rune Olsen b0cf88704e 2.1 (#2)
Final release of version 2.1.

Highlights:

### Core Features
- 🤖 Interactive chat with 300+ AI models via OpenRouter
- 🔍 Model selection with search and filtering
- 💾 Conversation save/load/export (Markdown, JSON, HTML)
- 📎 File attachments (images, PDFs, code files)
- 💰 Real-time cost tracking and credit monitoring
- 🎨 Rich terminal UI with syntax highlighting
- 📝 Persistent command history with search (Ctrl+R)
- 🌐 Online mode (web search capabilities)
- 🧠 Conversation memory toggle

### MCP Integration
- 🔧 **File Mode**: AI can read, search, and list local files
  - Automatic .gitignore filtering
  - Virtual environment exclusion
  - Large file handling (auto-truncates >50KB)

- ✍️ **Write Mode**: AI can modify files with permission
  - Create, edit, delete files
  - Move, copy, organize files
  - Always requires explicit opt-in

- 🗄️ **Database Mode**: AI can query SQLite databases
  - Read-only access (safe)
  - Schema inspection
  - Full SQL query support

Reviewed-on: #2
Co-authored-by: Rune Olsen <rune@rune.pm>
Co-committed-by: Rune Olsen <rune@rune.pm>
2026-02-03 09:02:44 +01:00

324 lines
9.6 KiB
Python

"""
File handling utilities for oAI.
This module provides safe file reading, type detection, and other
file-related operations used throughout the application.
"""
import os
import mimetypes
import base64
from pathlib import Path
from typing import Optional, Dict, Any, Tuple
from oai.constants import (
MAX_FILE_SIZE,
CONTENT_TRUNCATION_THRESHOLD,
SUPPORTED_CODE_EXTENSIONS,
ALLOWED_FILE_EXTENSIONS,
)
from oai.utils.logging import get_logger
def is_binary_file(file_path: Path) -> bool:
    """
    Check if a file appears to be binary.

    Only the first 8 KB of the file are sampled. The sample is considered
    binary when it contains a null byte or is not valid UTF-8. A multi-byte
    UTF-8 character that is merely cut in half by the end of the sample is
    NOT treated as invalid, as long as more data follows in the file.

    Args:
        file_path: Path to the file to check

    Returns:
        True if the file appears to be binary (or cannot be read),
        False otherwise
    """
    # Local import keeps this function self-contained.
    import codecs

    sample_size = 8192
    try:
        with open(file_path, "rb") as f:
            chunk = f.read(sample_size)
            # Peek one byte further to learn whether the sample was cut short
            # by the size limit or by the real end of the file.
            more_follows = bool(f.read(1))
    except Exception:
        # Best effort: anything that cannot be opened/read is reported as binary.
        return True

    # Null bytes are common in binary files and do not occur in normal text.
    if b"\x00" in chunk:
        return True

    # Decode incrementally so that a character split across the sample
    # boundary is buffered instead of raising. When the sample reaches the
    # end of the file (final=True) a dangling partial sequence is an error.
    decoder = codecs.getincrementaldecoder("utf-8")()
    try:
        decoder.decode(chunk, final=not more_follows)
    except UnicodeDecodeError:
        return True
    return False
def get_file_type(file_path: Path) -> Tuple[Optional[str], str]:
    """
    Determine the MIME type and category of a file.

    The checks run in a fixed order and the first match wins: image MIME
    type, PDF (by MIME type or ``.pdf`` extension), extension listed in
    SUPPORTED_CODE_EXTENSIONS, ``text/*`` MIME type, and finally a content
    check via ``is_binary_file``.

    Args:
        file_path: Path to the file. A plain string path is accepted as well
            and is converted to a Path, matching the other helpers in this
            module that coerce their argument.

    Returns:
        Tuple of (mime_type, category) where category is one of:
        'image', 'pdf', 'code', 'text', 'binary', 'unknown'.
        mime_type may be None when it cannot be guessed from the name.
    """
    # Coerce first so that string paths do not fail on `.suffix` below.
    path = Path(file_path)
    mime_type, _ = mimetypes.guess_type(str(file_path))
    ext = path.suffix.lower()

    if mime_type and mime_type.startswith("image/"):
        return mime_type, "image"
    if mime_type == "application/pdf" or ext == ".pdf":
        # Fall back to the canonical PDF type when only the extension matched.
        return mime_type or "application/pdf", "pdf"
    if ext in SUPPORTED_CODE_EXTENSIONS:
        return mime_type or "text/plain", "code"
    if mime_type and mime_type.startswith("text/"):
        return mime_type, "text"
    # Nothing matched by name: inspect the file content itself.
    if is_binary_file(path):
        return mime_type, "binary"
    return mime_type, "unknown"
def read_file_safe(
    file_path: Path,
    max_size: int = MAX_FILE_SIZE,
    truncate_threshold: int = CONTENT_TRUNCATION_THRESHOLD
) -> Dict[str, Any]:
    """
    Read a file defensively, reporting problems in the result instead of raising.

    The file is first read as UTF-8 text. If it is larger than
    ``truncate_threshold`` bytes AND has more than 600 lines, only the first
    500 and the last 100 lines are returned, joined by a marker that states
    how many lines were omitted. Files that cannot be decoded as UTF-8 are
    returned base64-encoded in full.

    Args:
        file_path: Path to the file to read
        max_size: Maximum file size to read (bytes); larger files are rejected
        truncate_threshold: File size (bytes) above which truncation is considered

    Returns:
        Dictionary containing:
        - content: File content (text or base64), None on error
        - size: File size in bytes (0 for errors other than "too large")
        - truncated: Whether content was truncated
        - encoding: 'text', 'base64', or None on error
        - error: Error message if reading failed, otherwise None
        Truncated results additionally carry 'total_lines' and 'lines_shown'.
    """
    logger = get_logger()

    def _failure(message: str, size: int = 0) -> Dict[str, Any]:
        # All failure results share one shape; only message and size vary.
        return {
            "content": None,
            "size": size,
            "truncated": False,
            "encoding": None,
            "error": message
        }

    # Number of lines kept from the start and from the end when truncating.
    keep_head = 500
    keep_tail = 100

    try:
        path = Path(file_path).resolve()
        if not path.exists():
            return _failure(f"File not found: {path}")
        if not path.is_file():
            return _failure(f"Not a file: {path}")

        file_size = path.stat().st_size
        if file_size > max_size:
            return _failure(
                f"File too large: {file_size / (1024*1024):.1f}MB (max: {max_size / (1024*1024):.0f}MB)",
                size=file_size,
            )

        # Text is attempted first; a decode failure switches to base64.
        try:
            content = path.read_text(encoding="utf-8")

            if file_size > truncate_threshold:
                lines = content.split("\n")
                total_lines = len(lines)
                # Only worth truncating when something would actually be dropped.
                if total_lines > (keep_head + keep_tail):
                    omitted = total_lines - keep_head - keep_tail
                    pieces = [
                        "\n".join(lines[:keep_head]),
                        f"\n\n... [TRUNCATED: {omitted} lines omitted] ...\n\n",
                        "\n".join(lines[-keep_tail:]),
                    ]
                    logger.info(f"Read file (truncated): {path} ({file_size} bytes, {total_lines} lines)")
                    return {
                        "content": "".join(pieces),
                        "size": file_size,
                        "truncated": True,
                        "total_lines": total_lines,
                        "lines_shown": keep_head + keep_tail,
                        "encoding": "text",
                        "error": None
                    }

            logger.info(f"Read file: {path} ({file_size} bytes)")
            return {
                "content": content,
                "size": file_size,
                "truncated": False,
                "encoding": "text",
                "error": None
            }
        except UnicodeDecodeError:
            # Not valid UTF-8: hand the raw bytes back as base64.
            raw = path.read_bytes()
            encoded = base64.b64encode(raw).decode("utf-8")
            logger.info(f"Read binary file: {path} ({file_size} bytes)")
            return {
                "content": encoded,
                "size": file_size,
                "truncated": False,
                "encoding": "base64",
                "error": None
            }
    except PermissionError as e:
        return _failure(f"Permission denied: {e}")
    except Exception as e:
        logger.error(f"Error reading file {file_path}: {e}")
        return _failure(str(e))
def get_file_extension(file_path: Path) -> str:
    """
    Get the lowercase file extension.

    Args:
        file_path: Path to the file. A plain string path is accepted as well
            and is converted to a Path, matching the other helpers in this
            module that coerce their argument.

    Returns:
        Lowercase extension including the dot (e.g., '.py'), or an empty
        string when the file name has no extension
    """
    # Coerce so that string paths work instead of failing on `.suffix`.
    return Path(file_path).suffix.lower()
def is_allowed_extension(file_path: Path) -> bool:
    """
    Tell whether a file's extension is on the attachment allow-list.

    Args:
        file_path: Path to the file

    Returns:
        True if the lowercase extension is in ALLOWED_FILE_EXTENSIONS,
        False otherwise
    """
    extension = get_file_extension(file_path)
    return extension in ALLOWED_FILE_EXTENSIONS
def format_file_size(size_bytes: int) -> str:
    """
    Render a byte count as a human-readable string.

    The value is divided by 1024 until its magnitude drops below 1024 or the
    largest unit (PB) is reached, and is always shown with one decimal place.

    Args:
        size_bytes: Size in bytes

    Returns:
        Formatted string (e.g., '512.0 B', '1.5 MB')
    """
    units = ("B", "KB", "MB", "GB", "TB")
    value = size_bytes
    step = 0
    # Climb one unit per division; stop once the value fits or units run out.
    while step < len(units) and not abs(value) < 1024:
        value /= 1024
        step += 1
    # Anything still too large after TB is reported in PB.
    suffix = units[step] if step < len(units) else "PB"
    return f"{value:.1f} {suffix}"
def prepare_file_attachment(
    file_path: Path,
    model_capabilities: Dict[str, Any]
) -> Optional[Dict[str, Any]]:
    """
    Build the API content block for attaching a file to a request.

    Images and PDFs are embedded as base64 data URLs, but only when the
    model's ``architecture.input_modalities`` list advertises support for
    them. Code and text files are decoded as UTF-8 and sent as a text block
    prefixed with the file name. Every failure is logged and yields None.

    Args:
        file_path: Path to the file
        model_capabilities: Model capability information; the
            'architecture' -> 'input_modalities' entry is consulted

    Returns:
        Content block dictionary for the API, or None if the file is
        missing, too large, unsupported, or cannot be read/decoded
    """
    logger = get_logger()
    path = Path(file_path).resolve()
    if not path.exists():
        logger.warning(f"File not found: {path}")
        return None

    mime_type, category = get_file_type(path)
    file_size = path.stat().st_size
    if file_size > MAX_FILE_SIZE:
        logger.warning(f"File too large: {path} ({format_file_size(file_size)})")
        return None

    def _modalities():
        # Looked up lazily so text/code attachments never touch the capabilities.
        return model_capabilities.get("architecture", {}).get("input_modalities", [])

    def _data_url(media_type, payload):
        encoded = base64.b64encode(payload).decode("utf-8")
        return f"data:{media_type};base64,{encoded}"

    try:
        file_data = path.read_bytes()

        if category == "image":
            if "image" not in _modalities():
                logger.warning("Model does not support images")
                return None
            return {
                "type": "image_url",
                "image_url": {"url": _data_url(mime_type, file_data)}
            }

        if category == "pdf":
            accepted = _modalities()
            # Any one of these modality names counts as PDF support.
            if not any(mod in accepted for mod in ("document", "pdf", "file")):
                logger.warning("Model does not support PDFs")
                return None
            # NOTE(review): PDFs are sent under the "image_url" block type as
            # well — confirm this is what the target API expects.
            return {
                "type": "image_url",
                "image_url": {"url": _data_url("application/pdf", file_data)}
            }

        if category in ("code", "text"):
            decoded = file_data.decode("utf-8")
            return {
                "type": "text",
                "text": f"File: {path.name}\n\n{decoded}"
            }

        logger.warning(f"Unsupported file type: {category} ({mime_type})")
        return None
    except UnicodeDecodeError:
        logger.error(f"Cannot decode file as UTF-8: {path}")
        return None
    except Exception as e:
        logger.error(f"Error preparing file attachment {path}: {e}")
        return None