diff --git a/README.md b/README.md index c07b3af..64f3220 100644 --- a/README.md +++ b/README.md @@ -28,6 +28,14 @@ oAI is a feature-rich command-line chat application that provides an interactive - Supports code files, text, JSON, YAML, and more - Large file handling (auto-truncates >50KB) +- โœ๏ธ **Write Mode** (NEW!): AI can modify files with your permission + - Create and edit files within allowed folders + - Delete files (always requires confirmation) + - Move, copy, and organize files + - Create directories + - Ignores .gitignore for write operations + - OFF by default - explicit opt-in required + - ๐Ÿ—„๏ธ **Database Mode**: AI can query your SQLite databases - Read-only access (no data modification possible) - Schema inspection (tables, columns, indexes) @@ -39,6 +47,8 @@ oAI is a feature-rich command-line chat application that provides an interactive - ๐Ÿ”’ **Security Features**: - Explicit folder/database approval required - System directory blocking + - Write mode OFF by default (non-persistent) + - Delete operations always require user confirmation - Read-only database access - SQL injection protection - Query timeout (5 seconds) @@ -133,13 +143,18 @@ oai You> /model # Enable MCP for file access -You> /mcp enable +You> /mcp on You> /mcp add ~/Documents -# Ask AI to help with files +# Ask AI to help with files (read-only) [๐Ÿ”ง MCP: Files] You> List all Python files in Documents [๐Ÿ”ง MCP: Files] You> Read and explain main.py +# Enable write mode to let AI modify files +You> /mcp write on +[๐Ÿ”งโœ๏ธ MCP: Files+Write] You> Create a new Python file with helper functions +[๐Ÿ”งโœ๏ธ MCP: Files+Write] You> Refactor main.py to use async/await + # Switch to database mode You> /mcp add db ~/myapp/data.db You> /mcp db 1 @@ -153,7 +168,7 @@ You> /mcp db 1 **Setup:** ```bash -/mcp enable # Start MCP server +/mcp on # Start MCP server /mcp add ~/Projects # Grant access to folder /mcp add ~/Documents # Add another folder /mcp list # View all allowed folders @@ -167,16 +182,26 @@ You> /mcp db 1 "What's in my Documents folder?" ``` -**Available Tools:** +**Available Tools (Read-Only):** - `read_file` - Read complete file contents - `list_directory` - List files/folders (recursive optional) - `search_files` - Search by name or content +**Available Tools (Write Mode - requires `/mcp write on`):** +- `write_file` - Create new files or overwrite existing ones +- `edit_file` - Find and replace text in existing files +- `delete_file` - Delete files (always requires confirmation) +- `create_directory` - Create directories +- `move_file` - Move or rename files +- `copy_file` - Copy files to new locations + **Features:** -- โœ… Automatic .gitignore filtering +- โœ… Automatic .gitignore filtering (read operations only) - โœ… Skips virtual environments (venv, node_modules) - โœ… Handles large files (auto-truncates >50KB) - โœ… Cross-platform (macOS, Linux, Windows via WSL) +- โœ… Write mode OFF by default for safety +- โœ… Delete operations require user confirmation with LLM's reason ### Database Mode @@ -210,13 +235,41 @@ You> /mcp db 1 - โœ… WHERE, GROUP BY, HAVING, ORDER BY, LIMIT - โŒ INSERT/UPDATE/DELETE (blocked for safety) +### Write Mode + +**Enable Write Mode:** +```bash +/mcp write on # Enable write mode (shows warning, requires confirmation) +``` + +**Natural Language Usage:** +``` +"Create a new Python file called utils.py with helper functions" +"Edit main.py and replace the old API endpoint with the new one" +"Delete the backup.old file" (will prompt for confirmation) +"Create a directory called tests" +"Move config.json to the config folder" +``` + +**Important:** +- โš ๏ธ Write mode is **OFF by default** and resets each session +- โš ๏ธ Delete operations **always** require user confirmation +- โš ๏ธ All operations are limited to allowed MCP folders +- โœ… Write operations ignore .gitignore (can write to any file in allowed folders) + +**Disable Write Mode:** +```bash +/mcp write off # Disable write mode (back to read-only) +``` + ### Mode Management ```bash -/mcp status # Show current mode, stats, folders/databases +/mcp status # Show current mode, write mode, stats, folders/databases /mcp files # Switch to file mode /mcp db # Switch to database mode /mcp gitignore on # Enable .gitignore filtering (default) +/mcp write on|off # Enable/disable write mode /mcp remove 2 # Remove folder/database by number ``` @@ -238,9 +291,9 @@ You> /mcp db 1 ### MCP Commands ``` -/mcp enable Start MCP server -/mcp disable Stop MCP server -/mcp status Show comprehensive status +/mcp on Start MCP server +/mcp off Stop MCP server +/mcp status Show comprehensive status (includes write mode) /mcp add Add folder for file access /mcp add db Add SQLite database /mcp list List all folders @@ -249,6 +302,8 @@ You> /mcp db 1 /mcp files Switch to file mode /mcp remove Remove folder/database /mcp gitignore on Enable .gitignore filtering +/mcp write on Enable write mode (create/edit/delete files) +/mcp write off Disable write mode (read-only) ``` ### Model Commands @@ -356,7 +411,11 @@ PDF (models with document support) - โœ… Database opened in `mode=ro` ### File System Safety -- โœ… Read-only access (no write/delete) +- โœ… Read-only by default (write mode requires explicit opt-in) +- โœ… Write mode OFF by default each session (non-persistent) +- โœ… Delete operations always require user confirmation +- โœ… Write operations limited to allowed folders only +- โœ… System directories blocked - โœ… Virtual environment exclusion - โœ… Build artifact filtering - โœ… Maximum file size (10 MB) @@ -444,14 +503,19 @@ ls -la database.db ## Version History -### v2.1.0-beta (Current) +### v2.1.0-RC1 (Current) - โœจ **NEW**: MCP (Model Context Protocol) integration - โœจ **NEW**: File system access (read, search, list) +- โœจ **NEW**: Write mode - AI can create, edit, and delete files + - 6 write tools: write_file, edit_file, delete_file, create_directory, move_file, copy_file + - OFF by default - requires explicit `/mcp write on` activation + - Delete operations always require user confirmation + - Non-persistent setting (resets each session) - โœจ **NEW**: SQLite database querying (read-only) - โœจ **NEW**: Dual mode support (Files & Database) - โœจ **NEW**: .gitignore filtering - โœจ **NEW**: Binary data handling in databases -- โœจ **NEW**: Mode indicators in prompt +- โœจ **NEW**: Mode indicators in prompt (shows โœ๏ธ when write mode active) - โœจ **NEW**: Comprehensive `/help mcp` guide - ๐Ÿ”ง Improved error handling for tool calls - ๐Ÿ”ง Enhanced logging for MCP operations diff --git a/oai.py b/oai.py index 339c744..746f759 100644 --- a/oai.py +++ b/oai.py @@ -27,6 +27,9 @@ from prompt_toolkit import PromptSession from prompt_toolkit.history import FileHistory from rich.logging import RichHandler from prompt_toolkit.auto_suggest import AutoSuggestFromHistory +from prompt_toolkit.key_binding import KeyBindings +from prompt_toolkit.filters import Condition +from prompt_toolkit.application.current import get_app from packaging import version as pkg_version import io import platform @@ -45,7 +48,7 @@ except ImportError: print("Warning: MCP library not found. Install with: pip install mcp") # App version -version = '2.1.0-beta' +version = '2.1.0-RC1' app = typer.Typer() @@ -112,7 +115,7 @@ MCP (Model Context Protocol) gives your AI assistant direct access to: โ•ฐโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ•ฏ SETUP: - /mcp enable Start MCP server + /mcp on Start MCP server /mcp add ~/Documents Grant access to folder /mcp add ~/Code/project Add another folder /mcp list View all allowed folders @@ -135,6 +138,41 @@ MANAGEMENT: /mcp gitignore on|off Toggle .gitignore filtering /mcp status Show comprehensive status +โ•ญโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ•ฎ +โ”‚ โœ๏ธ WRITE MODE (OPTIONAL) โ”‚ +โ•ฐโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ•ฏ + +ENABLE WRITE MODE: + /mcp write on Enable file modifications (requires confirmation) + /mcp write off Disable write mode (back to read-only) + +WHAT WRITE MODE ALLOWS: + โ€ข Create new files or overwrite existing ones + โ€ข Edit files (find and replace text) + โ€ข Delete files (always requires confirmation) + โ€ข Create directories + โ€ข Move and copy files + +USAGE (after enabling write mode): + "Create a new Python file called utils.py" + "Edit main.py and update the API endpoint" + "Delete the old backup.txt file" + "Create a tests directory" + "Move config.json to the configs folder" + +SAFETY FEATURES: + โœ“ OFF by default (resets each session) + โœ“ Requires explicit activation with user confirmation + โœ“ Delete operations ALWAYS require user confirmation + โœ“ Limited to allowed MCP folders only + โœ“ Ignores .gitignore (can write to any file in allowed folders) + โœ“ System directories remain blocked + +IMPORTANT: + โš ๏ธ Write mode is powerful - use with caution! + โš ๏ธ Always review what the AI plans to modify + โš ๏ธ Deletions will show file details + AI's reason for confirmation + โ•ญโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ•ฎ โ”‚ ๐Ÿ—„๏ธ DATABASE MODE โ”‚ โ•ฐโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ•ฏ @@ -176,15 +214,18 @@ SAFETY: โ•ฐโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ•ฏ MODE INDICATORS: - [๐Ÿ”ง MCP: Files] You're in file mode - [๐Ÿ—„๏ธ MCP: DB #1] You're querying database #1 + [๐Ÿ”ง MCP: Files] You're in file mode (read-only) + [๐Ÿ”งโœ๏ธ MCP: Files+Write] You're in file mode with write permissions + [๐Ÿ—„๏ธ MCP: DB #1] You're querying database #1 QUICK REFERENCE: - /mcp status See current mode, stats, folders/databases + /mcp status See current mode, write mode, stats, folders/databases /mcp files Switch to file mode (default) /mcp db Switch to database mode /mcp db list List all databases with details /mcp list List all folders + /mcp write on Enable write mode (create/edit/delete files) + /mcp write off Disable write mode (read-only) TROUBLESHOOTING: โ€ข No results? Check /mcp status to see what's accessible @@ -194,7 +235,9 @@ TROUBLESHOOTING: SECURITY NOTES: โ€ข MCP only accesses explicitly added folders/databases - โ€ข File mode: read-only access (no write/delete) + โ€ข File mode: read-only by default (write mode requires opt-in) + โ€ข Write mode: OFF by default, resets each session + โ€ข Delete operations: Always require user confirmation โ€ข Database mode: SELECT queries only (no modifications) โ€ข System directories are blocked automatically โ€ข Each addition requires your explicit confirmation @@ -206,8 +249,8 @@ For command-specific help: /help /mcp 'description': 'Manage MCP (Model Context Protocol) for local file access and SQLite database querying. When enabled with a function-calling model, AI can automatically search, read, list files, and query databases. Supports two modes: Files (default) and Database.', 'usage': '/mcp [args]', 'examples': [ - ('Enable MCP server', '/mcp enable'), - ('Disable MCP server', '/mcp disable'), + ('Enable MCP server', '/mcp on'), + ('Disable MCP server', '/mcp off'), ('Show MCP status and current mode', '/mcp status'), ('', ''), ('โ”โ”โ” FILE MODE โ”โ”โ”', ''), @@ -216,6 +259,8 @@ For command-specific help: /help /mcp ('Remove folder by number', '/mcp remove 2'), ('List allowed folders', '/mcp list'), ('Toggle .gitignore filtering', '/mcp gitignore on'), + ('Enable write mode', '/mcp write on'), + ('Disable write mode', '/mcp write off'), ('', ''), ('โ”โ”โ” DATABASE MODE โ”โ”โ”', ''), ('Add SQLite database', '/mcp add db ~/app/data.db'), @@ -227,10 +272,18 @@ For command-specific help: /help /mcp 'notes': '''MCP allows AI to read local files and query SQLite databases. Works automatically with function-calling models (GPT-4, Claude, etc.). FILE MODE (default): +- Read-only by default (write mode requires opt-in) - Automatically loads and respects .gitignore patterns - Skips virtual environments and build artifacts - Supports search, read, and list operations +WRITE MODE (optional): +- Enable with /mcp write on (requires confirmation) +- Allows creating, editing, and deleting files +- OFF by default, resets each session +- Delete operations always require user confirmation +- Limited to allowed folders only + DATABASE MODE: - Read-only access (no data modification) - Execute SELECT queries with JOINs, subqueries, CTEs @@ -1354,6 +1407,396 @@ class MCPFilesystemServer: log_mcp_stat('search_files', search_path or "all", False, str(e)) return {'error': error_msg} + # ======================================================================== + # FILE WRITE METHODS + # ======================================================================== + + async def write_file(self, file_path: str, content: str) -> Dict[str, Any]: + """Create a new file or overwrite an existing file with content. + + Note: Ignores .gitignore - allows writing to any file in allowed folders. + """ + try: + path = self.config.normalize_path(file_path) + + # Permission check: must be in allowed folders + if not self.is_allowed_path(path): + error_msg = f"Access denied: {path} is not in allowed MCP folders" + app_logger.warning(error_msg) + log_mcp_stat('write_file', str(path.parent), False, error_msg) + return {'error': error_msg} + + # Check if parent directory exists, create if needed + parent_dir = path.parent + if not parent_dir.exists(): + parent_dir.mkdir(parents=True, exist_ok=True) + app_logger.info(f"Created parent directory: {parent_dir}") + + # Determine if creating new file or overwriting + is_new_file = not path.exists() + + # Write content to file + path.write_text(content, encoding='utf-8') + file_size = path.stat().st_size + + app_logger.info(f"{'Created' if is_new_file else 'Updated'} file: {path} ({file_size} bytes)") + log_mcp_stat('write_file', str(path.parent), True) + + return { + 'success': True, + 'path': str(path), + 'size': file_size, + 'created': is_new_file + } + + except PermissionError as e: + error_msg = f"Permission denied writing to {file_path}: {e}" + app_logger.error(error_msg) + log_mcp_stat('write_file', file_path, False, str(e)) + return {'error': error_msg, 'error_type': 'PermissionError'} + except UnicodeEncodeError as e: + error_msg = f"Encoding error writing to {file_path}: {e}" + app_logger.error(error_msg) + log_mcp_stat('write_file', file_path, False, str(e)) + return {'error': error_msg, 'error_type': 'EncodingError'} + except Exception as e: + error_msg = f"Error writing file {file_path}: {e}" + app_logger.error(error_msg) + log_mcp_stat('write_file', file_path, False, str(e)) + return {'error': error_msg} + + async def edit_file(self, file_path: str, old_text: str, new_text: str) -> Dict[str, Any]: + """Find and replace text in an existing file. + + Note: Ignores .gitignore - allows editing any file in allowed folders. + """ + try: + path = self.config.normalize_path(file_path) + + # Permission check + if not self.is_allowed_path(path): + error_msg = f"Access denied: {path} is not in allowed MCP folders" + app_logger.warning(error_msg) + log_mcp_stat('edit_file', str(path.parent), False, error_msg) + return {'error': error_msg} + + # File must exist + if not path.exists(): + error_msg = f"File not found: {path}" + app_logger.warning(error_msg) + log_mcp_stat('edit_file', str(path), False, error_msg) + return {'error': error_msg} + + if not path.is_file(): + error_msg = f"Not a file: {path}" + app_logger.warning(error_msg) + log_mcp_stat('edit_file', str(path), False, error_msg) + return {'error': error_msg} + + # Read current content + current_content = path.read_text(encoding='utf-8') + + # Check if old_text exists in file + if old_text not in current_content: + error_msg = f"Text not found in file: '{old_text[:50]}...'" + app_logger.warning(f"Edit failed - text not found in {path}") + log_mcp_stat('edit_file', str(path), False, error_msg) + return {'error': error_msg} + + # Count occurrences + occurrence_count = current_content.count(old_text) + + if occurrence_count > 1: + error_msg = f"Text appears {occurrence_count} times in file. Please provide more context to make the match unique." + app_logger.warning(f"Edit failed - ambiguous match in {path}") + log_mcp_stat('edit_file', str(path), False, error_msg) + return {'error': error_msg} + + # Perform replacement + new_content = current_content.replace(old_text, new_text, 1) + path.write_text(new_content, encoding='utf-8') + + app_logger.info(f"Edited file: {path} (1 replacement)") + log_mcp_stat('edit_file', str(path.parent), True) + + return { + 'success': True, + 'path': str(path), + 'changes': 1 + } + + except PermissionError as e: + error_msg = f"Permission denied editing {file_path}: {e}" + app_logger.error(error_msg) + log_mcp_stat('edit_file', file_path, False, str(e)) + return {'error': error_msg, 'error_type': 'PermissionError'} + except UnicodeDecodeError as e: + error_msg = f"Cannot read file (encoding issue): {file_path}" + app_logger.error(error_msg) + log_mcp_stat('edit_file', file_path, False, str(e)) + return {'error': error_msg, 'error_type': 'EncodingError'} + except Exception as e: + error_msg = f"Error editing file {file_path}: {e}" + app_logger.error(error_msg) + log_mcp_stat('edit_file', file_path, False, str(e)) + return {'error': error_msg} + + async def delete_file(self, file_path: str, reason: str) -> Dict[str, Any]: + """Delete a file with user confirmation. + + Always requires user to confirm deletion with the LLM's provided reason. + Note: Ignores .gitignore - allows deleting any file in allowed folders. + """ + try: + path = self.config.normalize_path(file_path) + + # Permission check + if not self.is_allowed_path(path): + error_msg = f"Access denied: {path} is not in allowed MCP folders" + app_logger.warning(error_msg) + log_mcp_stat('delete_file', str(path.parent), False, error_msg) + return {'error': error_msg} + + # File must exist + if not path.exists(): + error_msg = f"File not found: {path}" + app_logger.warning(error_msg) + log_mcp_stat('delete_file', str(path), False, error_msg) + return {'error': error_msg} + + if not path.is_file(): + error_msg = f"Not a file: {path}" + app_logger.warning(error_msg) + log_mcp_stat('delete_file', str(path), False, error_msg) + return {'error': error_msg} + + # Get file info for confirmation prompt + file_size = path.stat().st_size + file_mtime = datetime.datetime.fromtimestamp(path.stat().st_mtime) + + # User confirmation required + console.print(Panel.fit( + f"[bold yellow]โš ๏ธ DELETE FILE?[/]\n\n" + f"Path: [cyan]{path}[/]\n" + f"Size: {file_size:,} bytes\n" + f"Modified: {file_mtime.strftime('%Y-%m-%d %H:%M:%S')}\n\n" + f"[bold]LLM Reason:[/] {reason}", + title="Delete File Confirmation", + border_style="yellow" + )) + + confirm = typer.confirm("Delete this file?", default=False) + + if not confirm: + app_logger.info(f"User cancelled file deletion: {path}") + log_mcp_stat('delete_file', str(path), False, "User cancelled") + return { + 'success': False, + 'user_cancelled': True, + 'path': str(path) + } + + # Delete the file + path.unlink() + + app_logger.info(f"Deleted file: {path}") + log_mcp_stat('delete_file', str(path.parent), True) + + return { + 'success': True, + 'path': str(path), + 'user_cancelled': False + } + + except PermissionError as e: + error_msg = f"Permission denied deleting {file_path}: {e}" + app_logger.error(error_msg) + log_mcp_stat('delete_file', file_path, False, str(e)) + return {'error': error_msg, 'error_type': 'PermissionError'} + except Exception as e: + error_msg = f"Error deleting file {file_path}: {e}" + app_logger.error(error_msg) + log_mcp_stat('delete_file', file_path, False, str(e)) + return {'error': error_msg} + + async def create_directory(self, dir_path: str) -> Dict[str, Any]: + """Create a directory (and parent directories if needed). + + Note: Ignores .gitignore - allows creating directories anywhere in allowed folders. + """ + try: + path = self.config.normalize_path(dir_path) + + # Permission check + if not self.is_allowed_path(path): + error_msg = f"Access denied: {path} is not in allowed MCP folders" + app_logger.warning(error_msg) + log_mcp_stat('create_directory', str(path.parent), False, error_msg) + return {'error': error_msg} + + # Check if already exists + already_exists = path.exists() + + if already_exists and not path.is_dir(): + error_msg = f"Path exists but is not a directory: {path}" + app_logger.warning(error_msg) + log_mcp_stat('create_directory', str(path), False, error_msg) + return {'error': error_msg} + + # Create directory (and parents) + path.mkdir(parents=True, exist_ok=True) + + if already_exists: + app_logger.info(f"Directory already exists: {path}") + else: + app_logger.info(f"Created directory: {path}") + + log_mcp_stat('create_directory', str(path.parent), True) + + return { + 'success': True, + 'path': str(path), + 'created': not already_exists + } + + except PermissionError as e: + error_msg = f"Permission denied creating directory {dir_path}: {e}" + app_logger.error(error_msg) + log_mcp_stat('create_directory', dir_path, False, str(e)) + return {'error': error_msg, 'error_type': 'PermissionError'} + except Exception as e: + error_msg = f"Error creating directory {dir_path}: {e}" + app_logger.error(error_msg) + log_mcp_stat('create_directory', dir_path, False, str(e)) + return {'error': error_msg} + + async def move_file(self, source_path: str, dest_path: str) -> Dict[str, Any]: + """Move or rename a file. + + Note: Ignores .gitignore - allows moving any file within allowed folders. + """ + try: + source = self.config.normalize_path(source_path) + dest = self.config.normalize_path(dest_path) + + # Both paths must be in allowed folders + if not self.is_allowed_path(source): + error_msg = f"Access denied: source {source} is not in allowed MCP folders" + app_logger.warning(error_msg) + log_mcp_stat('move_file', str(source.parent), False, error_msg) + return {'error': error_msg} + + if not self.is_allowed_path(dest): + error_msg = f"Access denied: destination {dest} is not in allowed MCP folders" + app_logger.warning(error_msg) + log_mcp_stat('move_file', str(dest.parent), False, error_msg) + return {'error': error_msg} + + # Source must exist + if not source.exists(): + error_msg = f"Source file not found: {source}" + app_logger.warning(error_msg) + log_mcp_stat('move_file', str(source), False, error_msg) + return {'error': error_msg} + + if not source.is_file(): + error_msg = f"Source is not a file: {source}" + app_logger.warning(error_msg) + log_mcp_stat('move_file', str(source), False, error_msg) + return {'error': error_msg} + + # Create destination parent directory if needed + dest.parent.mkdir(parents=True, exist_ok=True) + + # Move/rename the file + import shutil + shutil.move(str(source), str(dest)) + + app_logger.info(f"Moved file: {source} -> {dest}") + log_mcp_stat('move_file', str(source.parent), True) + + return { + 'success': True, + 'source': str(source), + 'destination': str(dest) + } + + except PermissionError as e: + error_msg = f"Permission denied moving file: {e}" + app_logger.error(error_msg) + log_mcp_stat('move_file', source_path, False, str(e)) + return {'error': error_msg, 'error_type': 'PermissionError'} + except Exception as e: + error_msg = f"Error moving file from {source_path} to {dest_path}: {e}" + app_logger.error(error_msg) + log_mcp_stat('move_file', source_path, False, str(e)) + return {'error': error_msg} + + async def copy_file(self, source_path: str, dest_path: str) -> Dict[str, Any]: + """Copy a file to a new location. + + Note: Ignores .gitignore - allows copying any file within allowed folders. + """ + try: + source = self.config.normalize_path(source_path) + dest = self.config.normalize_path(dest_path) + + # Both paths must be in allowed folders + if not self.is_allowed_path(source): + error_msg = f"Access denied: source {source} is not in allowed MCP folders" + app_logger.warning(error_msg) + log_mcp_stat('copy_file', str(source.parent), False, error_msg) + return {'error': error_msg} + + if not self.is_allowed_path(dest): + error_msg = f"Access denied: destination {dest} is not in allowed MCP folders" + app_logger.warning(error_msg) + log_mcp_stat('copy_file', str(dest.parent), False, error_msg) + return {'error': error_msg} + + # Source must exist + if not source.exists(): + error_msg = f"Source file not found: {source}" + app_logger.warning(error_msg) + log_mcp_stat('copy_file', str(source), False, error_msg) + return {'error': error_msg} + + if not source.is_file(): + error_msg = f"Source is not a file: {source}" + app_logger.warning(error_msg) + log_mcp_stat('copy_file', str(source), False, error_msg) + return {'error': error_msg} + + # Create destination parent directory if needed + dest.parent.mkdir(parents=True, exist_ok=True) + + # Copy the file + import shutil + shutil.copy2(str(source), str(dest)) + + file_size = dest.stat().st_size + + app_logger.info(f"Copied file: {source} -> {dest} ({file_size} bytes)") + log_mcp_stat('copy_file', str(source.parent), True) + + return { + 'success': True, + 'source': str(source), + 'destination': str(dest), + 'size': file_size + } + + except PermissionError as e: + error_msg = f"Permission denied copying file: {e}" + app_logger.error(error_msg) + log_mcp_stat('copy_file', source_path, False, str(e)) + return {'error': error_msg, 'error_type': 'PermissionError'} + except Exception as e: + error_msg = f"Error copying file from {source_path} to {dest_path}: {e}" + app_logger.error(error_msg) + log_mcp_stat('copy_file', source_path, False, str(e)) + return {'error': error_msg} + # ======================================================================== # SQLite DATABASE METHODS # ======================================================================== @@ -1710,6 +2153,7 @@ class MCPManager: def __init__(self): self.enabled = False + self.write_enabled = False # Write mode off by default (non-persistent) self.mode = "files" # "files" or "database" self.selected_db_index = None @@ -1864,12 +2308,57 @@ class MCPManager: 'error': str(e) } + def enable_write(self) -> None: + """Enable write mode with user confirmation.""" + if not self.enabled: + console.print("[bold red]Error:[/] MCP must be enabled first. Use '/mcp on'") + app_logger.warning("Attempted to enable write mode without MCP enabled") + return + + # Show warning panel + console.print(Panel.fit( + "[bold yellow]โš ๏ธ WRITE MODE WARNING[/]\n\n" + "Enabling write mode allows the LLM to:\n" + "โ€ข Create new files\n" + "โ€ข Modify existing files\n" + "โ€ข Delete files (with confirmation)\n" + "โ€ข Create directories\n" + "โ€ข Move and copy files\n\n" + "[dim]Operations are limited to your allowed MCP folders.[/]\n" + "[dim]Deletions will always require your confirmation.[/]\n\n" + "[bold red]Use with caution. Review LLM changes carefully.[/]", + title="MCP Write Mode", + border_style="yellow" + )) + + confirm = typer.confirm("Enable write mode?", default=False) + + if confirm: + self.write_enabled = True + console.print("[bold green]โœ“ Write mode enabled[/]") + app_logger.info("MCP write mode enabled by user") + log_mcp_stat('write_mode_enabled', '', True) + else: + console.print("[yellow]Cancelled. Write mode remains disabled.[/]") + app_logger.info("User cancelled write mode enablement") + + def disable_write(self) -> None: + """Disable write mode.""" + if not self.write_enabled: + console.print("[dim]Write mode is already disabled.[/]") + return + + self.write_enabled = False + console.print("[bold green]โœ“ Write mode disabled[/]") + app_logger.info("MCP write mode disabled") + log_mcp_stat('write_mode_disabled', '', True) + def switch_mode(self, new_mode: str, db_index: Optional[int] = None) -> Dict[str, Any]: """Switch between files and database mode.""" if not self.enabled: return { 'success': False, - 'error': 'MCP is not enabled. Use /mcp enable first' + 'error': 'MCP is not enabled. Use /mcp on first' } if new_mode == "files": @@ -1918,7 +2407,7 @@ class MCPManager: if not self.enabled: return { 'success': False, - 'error': 'MCP is not enabled. Use /mcp enable first' + 'error': 'MCP is not enabled. Use /mcp on first' } try: @@ -1998,7 +2487,7 @@ class MCPManager: if not self.enabled: return { 'success': False, - 'error': 'MCP is not enabled. Use /mcp enable first' + 'error': 'MCP is not enabled. Use /mcp on first' } try: @@ -2055,7 +2544,7 @@ class MCPManager: if not self.enabled: return { 'success': False, - 'error': 'MCP is not enabled. Use /mcp enable first' + 'error': 'MCP is not enabled. Use /mcp on first' } try: @@ -2320,6 +2809,7 @@ class MCPManager: return { 'success': True, 'enabled': self.enabled, + 'write_enabled': self.write_enabled, 'uptime': uptime, 'mode_info': mode_info, 'folder_count': len(self.allowed_folders), @@ -2358,8 +2848,15 @@ class MCPManager: 'error': 'MCP is not enabled' } + # Check write permissions for write operations + write_tools = {'write_file', 'edit_file', 'delete_file', 'create_directory', 'move_file', 'copy_file'} + if tool_name in write_tools and not self.write_enabled: + return { + 'error': 'Write operations disabled. Enable with /mcp write on' + } + try: - # File mode tools + # File mode tools (read-only) if tool_name == 'read_file': return await self.server.read_file(kwargs.get('file_path', '')) elif tool_name == 'list_directory': @@ -2402,6 +2899,38 @@ class MCPManager: kwargs.get('query', ''), kwargs.get('limit') ) + + # Write mode tools + elif tool_name == 'write_file': + return await self.server.write_file( + kwargs.get('file_path', ''), + kwargs.get('content', '') + ) + elif tool_name == 'edit_file': + return await self.server.edit_file( + kwargs.get('file_path', ''), + kwargs.get('old_text', ''), + kwargs.get('new_text', '') + ) + elif tool_name == 'delete_file': + return await self.server.delete_file( + kwargs.get('file_path', ''), + kwargs.get('reason', 'No reason provided') + ) + elif tool_name == 'create_directory': + return await self.server.create_directory( + kwargs.get('dir_path', '') + ) + elif tool_name == 'move_file': + return await self.server.move_file( + kwargs.get('source_path', ''), + kwargs.get('dest_path', '') + ) + elif tool_name == 'copy_file': + return await self.server.copy_file( + kwargs.get('source_path', ''), + kwargs.get('dest_path', '') + ) else: return { 'error': f'Unknown tool: {tool_name}' @@ -2431,7 +2960,8 @@ class MCPManager: allowed_dirs_str = ", ".join(str(f) for f in self.allowed_folders) - return [ + # Base read-only tools + tools = [ { "type": "function", "function": { @@ -2498,6 +3028,139 @@ class MCPManager: } } ] + + # Add write tools if write mode is enabled + if self.write_enabled: + tools.extend([ + { + "type": "function", + "function": { + "name": "write_file", + "description": f"Create a new file or overwrite an existing file with the specified content. Works within allowed directories: {allowed_dirs_str}. Ignores .gitignore patterns - can write to any file in allowed folders. Automatically creates parent directories if needed.", + "parameters": { + "type": "object", + "properties": { + "file_path": { + "type": "string", + "description": "Full path to the file to create or overwrite (e.g., /Users/username/project/src/main.py)" + }, + "content": { + "type": "string", + "description": "The complete content to write to the file" + } + }, + "required": ["file_path", "content"] + } + } + }, + { + "type": "function", + "function": { + "name": "edit_file", + "description": f"Make targeted edits to an existing file by finding and replacing specific text. The old_text must match exactly and appear only once in the file. For multiple matches, provide more context to make the match unique. Works within allowed directories: {allowed_dirs_str}. Ignores .gitignore patterns.", + "parameters": { + "type": "object", + "properties": { + "file_path": { + "type": "string", + "description": "Full path to the file to edit" + }, + "old_text": { + "type": "string", + "description": "The exact text to find and replace. Must match exactly and appear only once. Include enough context to make the match unique." + }, + "new_text": { + "type": "string", + "description": "The new text to replace the old text with" + } + }, + "required": ["file_path", "old_text", "new_text"] + } + } + }, + { + "type": "function", + "function": { + "name": "delete_file", + "description": f"Delete a file from the filesystem. ALWAYS requires user confirmation before deletion. The user will see the file path, size, modification time, and your reason before deciding. Works within allowed directories: {allowed_dirs_str}. Ignores .gitignore patterns.", + "parameters": { + "type": "object", + "properties": { + "file_path": { + "type": "string", + "description": "Full path to the file to delete" + }, + "reason": { + "type": "string", + "description": "Clear explanation for why this file should be deleted. The user will see this reason in the confirmation prompt." + } + }, + "required": ["file_path", "reason"] + } + } + }, + { + "type": "function", + "function": { + "name": "create_directory", + "description": f"Create a new directory (and all parent directories if needed). If the directory already exists, returns success without error. Works within allowed directories: {allowed_dirs_str}. Ignores .gitignore patterns.", + "parameters": { + "type": "object", + "properties": { + "dir_path": { + "type": "string", + "description": "Full path to the directory to create (e.g., /Users/username/project/src/components)" + } + }, + "required": ["dir_path"] + } + } + }, + { + "type": "function", + "function": { + "name": "move_file", + "description": f"Move or rename a file. Both source and destination must be within allowed directories: {allowed_dirs_str}. Creates destination parent directories if needed. Ignores .gitignore patterns.", + "parameters": { + "type": "object", + "properties": { + "source_path": { + "type": "string", + "description": "Full path to the file to move/rename" + }, + "dest_path": { + "type": "string", + "description": "Full path for the new location/name" + } + }, + "required": ["source_path", "dest_path"] + } + } + }, + { + "type": "function", + "function": { + "name": "copy_file", + "description": f"Copy a file to a new location. Both source and destination must be within allowed directories: {allowed_dirs_str}. Creates destination parent directories if needed. Preserves file metadata. Ignores .gitignore patterns.", + "parameters": { + "type": "object", + "properties": { + "source_path": { + "type": "string", + "description": "Full path to the file to copy" + }, + "dest_path": { + "type": "string", + "description": "Full path for the copy destination" + } + }, + "required": ["source_path", "dest_path"] + } + } + } + ]) + + return tools def _get_database_tools_schema(self) -> List[Dict[str, Any]]: """Get database mode tools schema.""" @@ -2951,6 +3614,7 @@ def display_paginated_table(table: Table, title: str): header_lines = [] data_lines = [] + footer_line = [] header_end_index = 0 found_header_text = False @@ -2970,6 +3634,14 @@ def display_paginated_table(table: Table, title: str): header_end_index = i break + # Extract footer (bottom border) - it's the last line of the table + if all_lines: + last_line_text = ''.join(seg.text for seg in all_lines[-1]) + # Check if last line is a border line (contains box drawing characters) + if any(char in last_line_text for char in ['โ”€', 'โ”', 'โ”ด', 'โ•ง', 'โ”˜', 'โ””']): + footer_line = all_lines[-1] + all_lines = all_lines[:-1] # Remove footer from all_lines + if header_end_index > 0: header_lines = all_lines[:header_end_index + 1] data_lines = all_lines[header_end_index + 1:] @@ -2999,6 +3671,12 @@ def display_paginated_table(table: Table, title: str): console.print(segment.text, style=segment.style, end="") console.print() + # Add footer (bottom border) on each page + if footer_line: + for segment in footer_line: + console.print(segment.text, style=segment.style, end="") + console.print() + current_line = end_line page_number += 1 @@ -3089,7 +3767,21 @@ def chat(): if not selected_model: console.print("[bold yellow]No model selected. Use '/model'.[/]") - session = PromptSession(history=FileHistory(str(history_file))) + # Create custom key bindings to add tab support for autocomplete + kb = KeyBindings() + + @kb.add('tab') + def _(event): + """Accept suggestion with tab key.""" + b = event.current_buffer + suggestion = b.suggestion + if suggestion and b.document.is_cursor_at_the_end: + b.insert_text(suggestion.text) + + session = PromptSession( + history=FileHistory(str(history_file)), + key_bindings=kb + ) while True: try: @@ -3099,7 +3791,10 @@ def chat(): prompt_prefix = "You> " if mcp_manager.enabled: if mcp_manager.mode == "files": - prompt_prefix = "[๐Ÿ”ง MCP: Files] You> " + if mcp_manager.write_enabled: + prompt_prefix = "[๐Ÿ”งโœ๏ธ MCP: Files+Write] You> " + else: + prompt_prefix = "[๐Ÿ”ง MCP: Files] You> " elif mcp_manager.mode == "database" and mcp_manager.selected_db_index is not None: db = mcp_manager.databases[mcp_manager.selected_db_index] prompt_prefix = f"[๐Ÿ—„๏ธ MCP: DB #{mcp_manager.selected_db_index + 1}] You> " @@ -3141,7 +3836,7 @@ def chat(): mcp_command = parts[0].lower() if parts else "" mcp_args = parts[1] if len(parts) > 1 else "" - if mcp_command == "enable": + if mcp_command in ["enable", "on"]: result = mcp_manager.enable() if result['success']: console.print(f"[bold green]โœ“ {result['message']}[/]") @@ -3162,7 +3857,7 @@ def chat(): console.print(f"[bold red]โŒ {result['error']}[/]") continue - elif mcp_command == "disable": + elif mcp_command in ["disable", "off"]: result = mcp_manager.disable() if result['success']: console.print(f"[bold green]โœ“ {result['message']}[/]") @@ -3472,7 +4167,7 @@ def chat(): console.print(f"[bold blue].gitignore filtering: {status}[/]") console.print(f"[dim cyan]Loaded {pattern_count} pattern(s) from .gitignore files[/]") else: - console.print("[bold yellow]MCP is not enabled. Enable with /mcp enable[/]") + console.print("[bold yellow]MCP is not enabled. Enable with /mcp on[/]") continue if mcp_args.lower() == "on": @@ -3493,6 +4188,28 @@ def chat(): console.print("[bold yellow]Usage: /mcp gitignore on|off[/]") continue + elif mcp_command == "write": + if not mcp_args: + # Show current write mode status + if mcp_manager.enabled: + status = "[bold green]Enabled โš ๏ธ[/]" if mcp_manager.write_enabled else "[dim]Disabled[/]" + console.print(f"[bold blue]Write Mode:[/] {status}") + if mcp_manager.write_enabled: + console.print("[dim cyan]LLM can create, modify, and delete files in allowed folders[/]") + else: + console.print("[dim cyan]Use '/mcp write on' to enable write operations[/]") + else: + console.print("[bold yellow]MCP is not enabled. Enable with /mcp on[/]") + continue + + if mcp_args.lower() == "on": + mcp_manager.enable_write() + elif mcp_args.lower() == "off": + mcp_manager.disable_write() + else: + console.print("[bold yellow]Usage: /mcp write on|off[/]") + continue + elif mcp_command == "status": result = mcp_manager.get_status() @@ -3529,6 +4246,10 @@ def chat(): if result['gitignore_patterns'] > 0: table.add_row(".gitignore Patterns", str(result['gitignore_patterns'])) + # Write mode status + write_status = "[green]Enabled โš ๏ธ[/]" if result['write_enabled'] else "[dim]Disabled[/]" + table.add_row("Write Mode", write_status) + if result['enabled']: table.add_row("", "") table.add_row("[bold]Tools Available[/]", "") @@ -3581,8 +4302,8 @@ def chat(): else: console.print("[bold cyan]MCP (Model Context Protocol) Commands:[/]\n") - console.print(" [bold yellow]/mcp enable[/] - Start MCP server") - console.print(" [bold yellow]/mcp disable[/] - Stop MCP server") + console.print(" [bold yellow]/mcp on[/] - Start MCP server") + console.print(" [bold yellow]/mcp off[/] - Stop MCP server") console.print(" [bold yellow]/mcp status[/] - Show comprehensive MCP status") console.print("") console.print(" [bold cyan]FILE MODE (default):[/]") @@ -4390,14 +5111,14 @@ def chat(): "" ) help_table.add_row( - "/mcp enable", + "/mcp on", "Start MCP server for file and database access.", - "/mcp enable" + "/mcp on" ) help_table.add_row( - "/mcp disable", + "/mcp off", "Stop MCP server.", - "/mcp disable" + "/mcp off" ) help_table.add_row( "/mcp status", @@ -4449,6 +5170,11 @@ def chat(): "Toggle .gitignore filtering (respects project excludes).", "/mcp gitignore on" ) + help_table.add_row( + "/mcp write [on|off]", + "Enable/disable write mode (create, edit, delete files). Requires confirmation.", + "/mcp write on" + ) # MODEL COMMANDS help_table.add_row( @@ -5111,11 +5837,14 @@ All queries are read-only. INSERT/UPDATE/DELETE are not allowed.""" continue # Now it's safe to continue else: full_response = response.choices[0].message.content if response.choices else "" - console.print(f"\r{' ' * 50}\r", end="") + # Clear any processing messages before showing response + console.print(f"\r{' ' * 100}\r", end="") if full_response: if not is_streaming: md = Markdown(full_response) + # Add newline before Panel to ensure clean rendering + console.print() console.print(Panel(md, title="[bold green]AI Response[/]", title_align="left", border_style="green")) session_history.append({'prompt': user_input, 'response': full_response})