Files
oai-web/server/web/routes.py
T
2026-04-28 09:41:56 +02:00

3986 lines
151 KiB
Python

"""
web/routes.py — REST API routes for credentials, audit log, agents, settings, etc.
All database calls are async (asyncpg).
"""
from __future__ import annotations
from datetime import datetime, timedelta, timezone
from typing import Optional
from fastapi import APIRouter, HTTPException, Request
from pydantic import BaseModel
from ..agents import tasks as agent_store
from ..agents.runner import agent_runner
from ..audit import audit_log
from ..config import settings
from ..database import credential_store, email_whitelist_store, filesystem_whitelist_store, web_whitelist_store
# Router object that the endpoint decorators below register their handlers on.
router = APIRouter()
# ── Credentials ───────────────────────────────────────────────────────────────
# Request body accepted by POST /credentials.
class CredentialIn(BaseModel):
    key: str  # name the value is stored under
    value: str  # value handed to credential_store.set
    description: str = ""  # optional note passed along with the value
@router.get("/credentials")
async def list_credentials(request: Request):
    """Return whatever `credential_store.list_keys()` yields, after the admin check."""
    _require_admin(request)
    keys = await credential_store.list_keys()
    return keys
@router.get("/credentials/{key:path}")
async def get_credential(request: Request, key: str):
    """Return the stored value for `key`, or respond 404 when the store has none.

    The admin check runs before the store is consulted.
    """
    _require_admin(request)
    stored = await credential_store.get(key)
    if stored is not None:
        return {"key": key, "value": stored}
    raise HTTPException(status_code=404, detail=f"Credential '{key}' not found.")
@router.post("/credentials")
async def set_credential(request: Request, body: CredentialIn):
    """Store the key/value/description from the request body, after the admin check."""
    _require_admin(request)
    name = body.key
    await credential_store.set(name, body.value, body.description)
    return {"ok": True, "key": name}
@router.delete("/credentials/{key:path}")
async def delete_credential(request: Request, key: str):
    """Delete the credential stored under `key`.

    Responds 400 for the `system:paused` key, which this endpoint refuses to
    remove, and 404 when the store reports nothing was deleted.
    """
    _require_admin(request)
    if key == "system:paused":
        raise HTTPException(status_code=400, detail="Cannot delete kill-switch key via this endpoint.")
    if await credential_store.delete(key):
        return {"ok": True, "key": key}
    raise HTTPException(status_code=404, detail=f"Credential '{key}' not found.")
# ── Audit log ─────────────────────────────────────────────────────────────────
@router.get("/audit")
async def query_audit(
    request: Request,
    start: str = "",
    end: str = "",
    tool_name: str = "",
    session_id: str = "",
    task_id: str = "",
    confirmed_only: bool = False,
    page: int = 1,
    per_page: int = 50,
):
    """Return one page of audit-log entries plus paging metadata.

    Empty-string filters are passed to the audit log as ``None``. Admin users
    are queried with no user filter; everyone else is restricted to entries
    carrying their own user id.

    Raises:
        HTTPException: 400 when ``page`` or ``per_page`` is below 1. Without
            this guard ``per_page=0`` would divide by zero when computing
            ``pages`` and ``page<1`` would produce a negative offset.
    """
    user = _require_auth(request)
    if page < 1:
        raise HTTPException(status_code=400, detail="page must be >= 1")
    if per_page < 1:
        raise HTTPException(status_code=400, detail="per_page must be >= 1")

    # Non-admins can only see their own audit entries
    force_user_id = None if user.is_admin else user.id

    # The same filter set drives both the page query and the total count, so
    # build it once to keep the two calls from drifting apart.
    filters = {
        "start": start or None,
        "end": end or None,
        "tool_name": tool_name or None,
        "session_id": session_id or None,
        "task_id": task_id or None,
        "confirmed_only": confirmed_only,
        "user_id": force_user_id,
    }
    entries, total = await asyncio_gather(
        audit_log.query(**filters, limit=per_page, offset=(page - 1) * per_page),
        audit_log.count(**filters),
    )
    return {
        "entries": [
            {
                "id": e.id,
                "timestamp": e.timestamp,
                "session_id": e.session_id,
                "tool_name": e.tool_name,
                "arguments": e.arguments,
                "result_summary": e.result_summary,
                "confirmed": e.confirmed,
                "task_id": e.task_id,
            }
            for e in entries
        ],
        "total": total,
        "page": page,
        "per_page": per_page,
        # Ceiling division; an empty result still reports one page.
        "pages": max(1, -(-total // per_page)),
    }
@router.delete("/audit")
async def clear_audit(request: Request, older_than_days: int = 0):
    """Purge audit entries and report what `audit_log.purge` returned.

    A positive `older_than_days` is forwarded as-is; zero or a negative value
    is forwarded as ``None``.
    """
    _require_admin(request)
    cutoff_days = older_than_days if older_than_days > 0 else None
    removed = await audit_log.purge(older_than_days=cutoff_days)
    return {"deleted": removed}
@router.get("/settings/audit-retention")
async def get_audit_retention(request: Request):
    """Return the configured audit retention in days (0 when unset).

    A stored value that cannot be parsed as an integer is reported as 0
    instead of failing the request.
    """
    _require_admin(request)
    raw = await credential_store.get("system:audit_retention_days")
    try:
        days = int(raw) if raw else 0
    except (TypeError, ValueError):
        # Same tolerance get_limits applies to its stored integers.
        days = 0
    return {"days": days}
@router.post("/settings/audit-retention")
async def set_audit_retention(request: Request, body: dict):
    """Store or clear the audit retention period.

    A positive ``days`` is stored under ``system:audit_retention_days``; zero,
    a negative number or a missing field deletes the setting. The response
    reports the effective value (0 when the setting was cleared).

    Raises:
        HTTPException: 400 when ``days`` cannot be converted to an integer.
    """
    _require_admin(request)
    try:
        days = int(body.get("days", 0))
    except (TypeError, ValueError):
        raise HTTPException(status_code=400, detail="days must be an integer") from None

    if days > 0:
        await credential_store.set(
            "system:audit_retention_days", str(days),
            description="Audit log retention in days (0 = keep forever)",
        )
    else:
        await credential_store.delete("system:audit_retention_days")
        days = 0
    return {"days": days}
# ── Email whitelist ────────────────────────────────────────────────────────────
# Request body accepted by POST /email-whitelist.
class EmailWhitelistEntry(BaseModel):
    email: str  # stripped and lower-cased by the handler before it is stored
    daily_limit: int = 0  # handler rejects negative values
@router.get("/email-whitelist")
async def list_email_whitelist(request: Request):
    """Return the email whitelist store's listing, after the admin check."""
    _require_admin(request)
    whitelist = await email_whitelist_store.list()
    return whitelist
@router.post("/email-whitelist")
async def add_email_whitelist(request: Request, body: EmailWhitelistEntry):
    """Add a stripped, lower-cased address to the email whitelist.

    Responds 400 when the address lacks an ``@`` or when ``daily_limit`` is
    negative.
    """
    _require_admin(request)
    address = body.email.strip().lower()
    # An empty string contains no "@", so this single test also rejects it.
    if "@" not in address:
        raise HTTPException(status_code=400, detail="Invalid email address")
    limit = body.daily_limit
    if limit < 0:
        raise HTTPException(status_code=400, detail="daily_limit must be >= 0")
    await email_whitelist_store.add(address, limit)
    return {"ok": True, "email": address, "daily_limit": limit}
@router.delete("/email-whitelist/{email:path}")
async def delete_email_whitelist(request: Request, email: str):
    """Remove an address from the email whitelist.

    The address is stripped and lower-cased first, matching what
    ``add_email_whitelist`` stores, so a mixed-case path segment still finds
    its entry. If that lookup removes nothing, the literal path value is tried
    as well.

    Raises:
        HTTPException: 404 when neither form is in the whitelist.
    """
    _require_admin(request)
    normalized = email.strip().lower()
    deleted = await email_whitelist_store.remove(normalized)
    if not deleted and normalized != email:
        # Covers an entry that was stored without normalisation.
        deleted = await email_whitelist_store.remove(email)
    if not deleted:
        raise HTTPException(status_code=404, detail=f"Email '{email}' not in whitelist")
    return {"ok": True, "email": email}
# ── Web whitelist ─────────────────────────────────────────────────────────────
# Request body accepted by POST /web-whitelist.
class WebWhitelistEntry(BaseModel):
    domain: str  # stripped by the handler; must not be empty afterwards
    note: str = ""  # passed to the store unchanged
@router.get("/web-whitelist")
async def list_web_whitelist(request: Request):
    """Return the web whitelist store's listing, after the admin check."""
    _require_admin(request)
    whitelist = await web_whitelist_store.list()
    return whitelist
@router.post("/web-whitelist")
async def add_web_whitelist(request: Request, body: WebWhitelistEntry):
    """Add a stripped domain (with its note) to the web whitelist; 400 if empty."""
    _require_admin(request)
    cleaned = body.domain.strip()
    if cleaned:
        await web_whitelist_store.add(cleaned, body.note)
        return {"ok": True, "domain": cleaned}
    raise HTTPException(status_code=400, detail="domain is required")
@router.delete("/web-whitelist/{domain:path}")
async def delete_web_whitelist(request: Request, domain: str):
    """Remove `domain` from the web whitelist; 404 when the store removed nothing."""
    _require_admin(request)
    if await web_whitelist_store.remove(domain):
        return {"ok": True, "domain": domain}
    raise HTTPException(status_code=404, detail=f"Domain '{domain}' not in whitelist")
# ── Filesystem whitelist ──────────────────────────────────────────────────────
# Request body accepted by POST /filesystem-whitelist.
class FilesystemWhitelistEntry(BaseModel):
    path: str  # handler requires an absolute path to an existing directory
    note: str = ""  # passed to the store unchanged
@router.get("/filesystem-whitelist")
async def list_filesystem_whitelist(request: Request):
    """Return the filesystem whitelist store's listing, after the admin check."""
    _require_admin(request)
    whitelist = await filesystem_whitelist_store.list()
    return whitelist
@router.post("/filesystem-whitelist")
async def add_filesystem_whitelist(request: Request, body: FilesystemWhitelistEntry):
    """Whitelist a directory after validating the submitted path.

    The stripped path must be non-empty, absolute, existing, and a directory;
    each failed check answers with its own 400 message.
    """
    _require_admin(request)
    from pathlib import Path

    path = body.path.strip()
    if not path:
        raise HTTPException(status_code=400, detail="path is required")

    candidate = Path(path)
    if not candidate.is_absolute():
        raise HTTPException(status_code=400, detail="path must be absolute (e.g. /home/rune/documents)")
    if not candidate.exists():
        raise HTTPException(status_code=400, detail=f"path does not exist: {path}")
    if not candidate.is_dir():
        raise HTTPException(status_code=400, detail=f"path is not a directory: {path}")

    await filesystem_whitelist_store.add(path, body.note)
    return {"ok": True, "path": path}
@router.delete("/filesystem-whitelist/{path:path}")
async def delete_filesystem_whitelist(request: Request, path: str):
    """Remove `path` from the filesystem whitelist; 404 when nothing was removed."""
    _require_admin(request)
    if await filesystem_whitelist_store.remove(path):
        return {"ok": True, "path": path}
    raise HTTPException(status_code=404, detail=f"Path '{path}' not in whitelist")
@router.get("/filesystem-browser")
async def filesystem_browser(request: Request, path: str = "/"):
    """List the non-symlink subdirectories of an absolute directory path.

    Responds 400 for a relative path and 404 when the path is missing or not a
    directory. Children that raise ``PermissionError`` are skipped, and a
    ``PermissionError`` from the listing itself ends the scan with whatever
    was collected. At most 300 entries are returned.
    """
    _require_admin(request)
    from pathlib import Path

    directory = Path(path)
    if not directory.is_absolute():
        raise HTTPException(status_code=400, detail="path must be absolute")
    if not (directory.exists() and directory.is_dir()):
        raise HTTPException(status_code=404, detail=f"Directory not found: {path}")

    subdirs = []
    try:
        for child in sorted(directory.iterdir()):
            try:
                is_real_dir = child.is_dir() and not child.is_symlink()
            except PermissionError:
                continue
            if is_real_dir:
                subdirs.append({"name": child.name, "path": str(child)})
    except PermissionError:
        pass

    # The filesystem root is its own parent; report no parent in that case.
    parent = None if directory == directory.parent else str(directory.parent)
    return {"path": str(directory), "parent": parent, "entries": subdirs[:300]}
# ── Models ────────────────────────────────────────────────────────────────────
@router.get("/models")
async def list_models(request: Request):
    """Return available models, the default model, providers and the access tier.

    The user is read from ``request.state.current_user``. When that is falsy
    the lookups run with ``user_id=None`` and ``is_admin=True``.
    """
    from ..providers.models import get_available_models, get_access_tier
    from ..providers.registry import get_available_providers

    current = request.state.current_user
    if current:
        user_id, is_admin = current.id, current.is_admin
    else:
        user_id, is_admin = None, True

    models, default = await get_available_models(user_id=user_id, is_admin=is_admin)
    access = await get_access_tier(user_id=user_id, is_admin=is_admin)
    providers = await get_available_providers(user_id=user_id)
    return {
        "models": models,
        "default": default,
        "providers": providers,
        "access": access,
    }
@router.get("/models/info")
async def models_info(request: Request):
    """Return `get_models_info` for the current user.

    When ``request.state.current_user`` is falsy the lookup runs with
    ``user_id=None`` and ``is_admin=True``.
    """
    from ..providers.models import get_models_info

    current = request.state.current_user
    if current:
        user_id, is_admin = current.id, current.is_admin
    else:
        user_id, is_admin = None, True
    return await get_models_info(user_id=user_id, is_admin=is_admin)
# ── Runtime limits ────────────────────────────────────────────────────────────
# Request body accepted by POST /settings/limits.
# A field left as None is not touched by the handler; supplied values must be >= 1.
class LimitsIn(BaseModel):
    max_tool_calls: Optional[int] = None
    max_autonomous_runs_per_hour: Optional[int] = None
    max_concurrent_runs: Optional[int] = None
# Request body accepted by POST /settings/proxy-trust.
class ProxyTrustIn(BaseModel):
    # Stored (stripped) as one string under system:trusted_proxy_ips.
    # NOTE(review): the expected separator for multiple IPs is not visible here — confirm.
    trusted_ips: str
@router.get("/settings/limits")
async def get_limits(request: Request):
    """Return the effective runtime limits together with their defaults.

    Each limit is read from the credential store. A missing, empty or
    non-integer stored value falls back to the default (taken from `settings`,
    or 3 for concurrent runs).
    """
    _require_admin(request)

    async def _read_int(key: str, fallback: int) -> int:
        raw = await credential_store.get(key)
        if not raw:
            return fallback
        try:
            return int(raw)
        except (ValueError, TypeError):
            return fallback

    defaults = {
        "max_tool_calls": settings.max_tool_calls,
        "max_autonomous_runs_per_hour": settings.max_autonomous_runs_per_hour,
        "max_concurrent_runs": 3,
    }
    tool_calls, runs_per_hour, concurrent = await asyncio_gather(
        _read_int("system:max_tool_calls", defaults["max_tool_calls"]),
        _read_int("system:max_autonomous_runs_per_hour", defaults["max_autonomous_runs_per_hour"]),
        _read_int("system:max_concurrent_runs", defaults["max_concurrent_runs"]),
    )
    return {
        "max_tool_calls": tool_calls,
        "max_autonomous_runs_per_hour": runs_per_hour,
        "max_concurrent_runs": concurrent,
        "defaults": defaults,
    }
@router.post("/settings/limits")
async def set_limits(request: Request, body: LimitsIn):
    """Update any supplied runtime limits and return the resulting limits.

    Fields left as ``None`` are not touched. All supplied fields are validated
    before anything is written, so a rejected request stores nothing.

    Raises:
        HTTPException: 400 when a supplied limit is below 1.
    """
    _require_admin(request)

    # (body field, credential key, stored description)
    limit_specs = (
        ("max_tool_calls", "system:max_tool_calls", "Max tool calls per agent run"),
        (
            "max_autonomous_runs_per_hour",
            "system:max_autonomous_runs_per_hour",
            "Max autonomous scheduler runs per hour",
        ),
        ("max_concurrent_runs", "system:max_concurrent_runs", "Max concurrent agent runs"),
    )

    pending = []
    for field, key, description in limit_specs:
        value = getattr(body, field)
        if value is None:
            continue
        if value < 1:
            raise HTTPException(status_code=400, detail=f"{field} must be >= 1")
        pending.append((key, str(value), description))

    for key, value, description in pending:
        await credential_store.set(key, value, description)

    # get_limits requires the request; calling it with no argument raised TypeError.
    return await get_limits(request)
@router.get("/queue")
async def get_queue_status(request: Request):
    """Return the agent runner's `queue_status` for an authenticated caller."""
    _require_auth(request)
    status = agent_runner.queue_status
    return status
# ── Browser trusted domains ───────────────────────────────────────────────────
# Request body accepted by POST /my/browser-trusted.
class BrowserDomainIn(BaseModel):
    domain: str  # lower-cased, stripped, and leading "*" / "." characters removed by the handler
    note: Optional[str] = None  # an empty note is stored as NULL
@router.get("/my/browser-trusted")
async def list_browser_trusted(request: Request):
    """Return the calling user's rows from `browser_approved_domains`, ordered by domain."""
    from ..database import get_pool as _gp

    user = _require_auth(request)
    pool = await _gp()
    sql = (
        "SELECT id, domain, note, created_at FROM browser_approved_domains "
        "WHERE owner_user_id = $1 ORDER BY domain"
    )
    records = await pool.fetch(sql, user.id)
    return [dict(record) for record in records]
@router.post("/my/browser-trusted")
async def add_browser_trusted(request: Request, body: BrowserDomainIn):
    """Insert a trusted browser domain for the calling user.

    The domain is lower-cased, stripped, and has leading ``*`` / ``.``
    characters removed; an empty result is a 400. An existing
    (owner, domain) pair is left untouched by the ``ON CONFLICT`` clause. Any
    exception from the insert is reported as a 400 carrying its message.
    """
    from ..database import get_pool as _gp

    user = _require_auth(request)
    domain = body.domain.lower().strip().lstrip("*.")
    if not domain:
        raise HTTPException(status_code=400, detail="Invalid domain")

    pool = await _gp()
    insert_sql = (
        "INSERT INTO browser_approved_domains (owner_user_id, domain, note) "
        "VALUES ($1, $2, $3) ON CONFLICT (owner_user_id, domain) DO NOTHING"
    )
    note = body.note or None
    try:
        await pool.execute(insert_sql, user.id, domain, note)
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))
    return {"ok": True, "domain": domain}
@router.delete("/my/browser-trusted/{domain:path}")
async def remove_browser_trusted(request: Request, domain: str):
    """Delete the calling user's trusted-domain row for `domain`.

    The path value gets the same lower-case / strip / leading ``*.`` cleanup
    as on insert. The response is ``{"ok": True}`` whether or not a row
    matched.
    """
    from ..database import get_pool as _gp

    user = _require_auth(request)
    cleaned = domain.lower().strip().lstrip("*.")
    pool = await _gp()
    delete_sql = "DELETE FROM browser_approved_domains WHERE owner_user_id = $1 AND domain = $2"
    await pool.execute(delete_sql, user.id, cleaned)
    return {"ok": True}
@router.get("/settings/provider")
async def get_default_provider(request: Request):
    """Return the effective default provider and the list of available ones.

    The stored `system:default_provider` wins over the configured default.
    If the chosen provider is not in the available list and that list is
    non-empty, the first available provider is reported instead.
    """
    _require_admin(request)
    from ..providers.registry import get_available_providers
    from ..config import settings as _settings

    stored = await credential_store.get("system:default_provider")
    available = await get_available_providers()
    chosen = stored if stored else _settings.default_provider
    # If the saved provider no longer has a key, fall back to the first available
    if available and chosen not in available:
        chosen = available[0]
    return {"default_provider": chosen, "available_providers": available}
# Request body accepted by POST /settings/provider.
class ProviderIn(BaseModel):
    default_provider: str  # handler accepts only "anthropic", "openrouter" or "openai"
@router.post("/settings/provider")
async def set_default_provider(request: Request, body: ProviderIn):
    """Store the default AI provider; 400 unless it is one of the three known names."""
    _require_admin(request)
    known_providers = {"anthropic", "openrouter", "openai"}
    provider = body.default_provider
    if provider not in known_providers:
        raise HTTPException(status_code=400, detail="Invalid provider. Use: anthropic, openrouter, openai")
    await credential_store.set("system:default_provider", provider, "Default AI provider")
    return {"default_provider": provider}
@router.get("/settings/default-models")
async def get_default_models(request: Request):
    """Return the stored default and free-tier chat models ("" when unset)."""
    _require_admin(request)
    default_model = await credential_store.get("system:default_chat_model")
    free_tier_model = await credential_store.get("system:default_chat_model_free")
    return {
        "default_model": default_model or "",
        "free_tier_model": free_tier_model or "",
    }
# Request body accepted by POST /settings/default-models.
# None leaves a setting untouched; a blank string deletes it.
class DefaultModelsIn(BaseModel):
    default_model: Optional[str] = None
    free_tier_model: Optional[str] = None
@router.post("/settings/default-models")
async def set_default_models(request: Request, body: DefaultModelsIn):
    """Update the default and/or free-tier chat model and return both settings.

    For each field: ``None`` leaves the setting alone, a non-blank value is
    stored stripped, and a blank value deletes the setting.
    """
    _require_admin(request)

    # (submitted value, credential key, stored description)
    targets = (
        (
            body.default_model,
            "system:default_chat_model",
            "Default chat model for all users",
        ),
        (
            body.free_tier_model,
            "system:default_chat_model_free",
            "Default chat model for free-tier users (no own API key)",
        ),
    )
    for submitted, key, description in targets:
        if submitted is None:
            continue
        model = submitted.strip()
        if model:
            await credential_store.set(key, model, description)
        else:
            await credential_store.delete(key)

    # get_default_models requires the request; calling it bare raised TypeError.
    return await get_default_models(request)
@router.get("/settings/proxy-trust")
async def get_proxy_trust(request: Request):
    """Return the stored trusted proxy IPs, or "127.0.0.1" when nothing is stored."""
    _require_admin(request)
    stored = await credential_store.get("system:trusted_proxy_ips")
    return {"trusted_ips": stored or "127.0.0.1"}
@router.post("/settings/proxy-trust")
async def set_proxy_trust(request: Request, body: ProxyTrustIn):
    """Store the stripped `trusted_ips` string and echo it back."""
    _require_admin(request)
    cleaned = body.trusted_ips.strip()
    await credential_store.set(
        "system:trusted_proxy_ips",
        cleaned,
        "Trusted reverse proxy IPs for X-Forwarded-For headers",
    )
    return {"trusted_ips": cleaned}
# Request body accepted by POST /settings/users-base-folder.
class UserBaseFolderIn(BaseModel):
    path: str = ""  # a blank path makes the handler delete the setting
@router.get("/settings/users-base-folder")
async def get_users_base_folder(request: Request):
    """Return the stored users base folder ("" when unset)."""
    _require_admin(request)
    stored = await credential_store.get("system:users_base_folder")
    return {"path": stored or ""}
@router.post("/settings/users-base-folder")
async def set_users_base_folder(request: Request, body: UserBaseFolderIn):
    """Set or clear the base folder that holds per-user subfolders.

    A blank path deletes the setting. Otherwise the path must be absolute and
    refer to an existing directory; it is then stored and
    ``_provision_user_folder`` is called for every user returned by
    ``list_users``.

    Raises:
        HTTPException: 400 when the path is relative, missing, or not a
            directory.
    """
    _require_admin(request)
    path = body.path.strip()
    if not path:
        await credential_store.delete("system:users_base_folder")
        return {"path": path}

    from pathlib import Path as _Path

    base = _Path(path)
    if not base.is_absolute():
        raise HTTPException(status_code=400, detail="Path must be absolute.")
    if not base.exists():
        raise HTTPException(status_code=400, detail="Path does not exist on the server.")
    if not base.is_dir():
        # A regular file cannot hold per-user subfolders.
        raise HTTPException(status_code=400, detail="Path is not a directory.")

    await credential_store.set("system:users_base_folder", path, "Base folder for per-user subfolders")

    # Provision folders for all existing users
    from ..users import list_users, _provision_user_folder

    for u in await list_users():
        await _provision_user_folder(u["username"])
    return {"path": path}
# ── Admin CalDAV / CardDAV settings ──────────────────────────────────────────
@router.get("/settings/caldav")
async def get_admin_caldav(request: Request):
    """Return the admin CalDAV / CardDAV / mail-host settings.

    Passwords are never returned, only ``*_set`` booleans. Text settings
    default to "" and the two flags are true only when stored as "1".
    """
    _require_admin(request)

    async def _text(key):
        return await credential_store.get(key) or ""

    async def _is_set(key):
        return bool(await credential_store.get(key))

    async def _flag(key):
        return (await credential_store.get(key)) == "1"

    return {
        "host": await _text("mailcow_host"),
        "username": await _text("mailcow_username"),
        "password_set": await _is_set("mailcow_password"),
        "calendar_name": await _text("caldav_calendar_name"),
        "contacts_allow_write": await _flag("contacts:allow_write"),
        "carddav_same_as_caldav": await _flag("carddav_same_as_caldav"),
        "carddav_url": await _text("carddav_url"),
        "carddav_username": await _text("carddav_username"),
        "carddav_password_set": await _is_set("carddav_password"),
        "imap_host": await _text("mailcow_imap_host"),
        "smtp_host": await _text("mailcow_smtp_host"),
        "smtp_port": await _text("mailcow_smtp_port"),
    }
@router.post("/settings/caldav")
async def set_admin_caldav(request: Request):
    """Update the admin CalDAV / CardDAV / mail-host settings from a JSON object.

    Text fields are stored when non-blank and deleted when blank. Passwords
    are only written when a new non-blank value is supplied. When
    ``carddav_same_as_caldav`` is truthy the separate CardDAV URL, username
    and password are deleted.

    Non-string JSON values (for example a numeric ``smtp_port``) are converted
    to text before stripping instead of failing on ``.strip()``.

    Raises:
        HTTPException: 400 when the request body is not a JSON object.
    """
    _require_admin(request)
    body = await request.json()
    if not isinstance(body, dict):
        raise HTTPException(status_code=400, detail="Request body must be a JSON object")

    def _clean(val) -> str:
        # Falsy values (None, "", 0) count as "not provided", as before.
        return str(val).strip() if val else ""

    async def _set(key, val, desc=""):
        text = _clean(val)
        if text:
            await credential_store.set(key, text, desc)
        else:
            await credential_store.delete(key)

    async def _set_bool(key, val):
        if val:
            await credential_store.set(key, "1")
        else:
            await credential_store.delete(key)

    await _set("mailcow_host", body.get("host"), "Mailcow hostname")
    await _set("mailcow_username", body.get("username"), "Mailcow username")
    # Only update password if a new value is provided
    pwd = _clean(body.get("password"))
    if pwd:
        await credential_store.set("mailcow_password", pwd, "Mailcow password")
    await _set("caldav_calendar_name", body.get("calendar_name"), "Default calendar name")
    await _set_bool("contacts:allow_write", body.get("contacts_allow_write"))

    same = bool(body.get("carddav_same_as_caldav"))
    await _set_bool("carddav_same_as_caldav", same)
    if same:
        for k in ("carddav_url", "carddav_username", "carddav_password"):
            await credential_store.delete(k)
    else:
        await _set("carddav_url", body.get("carddav_url"), "CardDAV server URL")
        await _set("carddav_username", body.get("carddav_username"), "CardDAV username")
        cpwd = _clean(body.get("carddav_password"))
        if cpwd:
            await credential_store.set("carddav_password", cpwd, "CardDAV password")

    # Email tool overrides (optional host overrides for IMAP/SMTP)
    await _set("mailcow_imap_host", body.get("imap_host"), "IMAP host override")
    await _set("mailcow_smtp_host", body.get("smtp_host"), "SMTP host override")
    await _set("mailcow_smtp_port", body.get("smtp_port"), "SMTP port override")
    return {"ok": True}
# ── Admin Pushover settings ───────────────────────────────────────────────────
@router.get("/settings/pushover")
async def get_admin_pushover(request: Request):
    """Report whether the Pushover app token and user key are stored (values are not returned)."""
    _require_admin(request)
    app_token = await credential_store.get("pushover_app_token")
    user_key = await credential_store.get("pushover_user_key")
    return {
        "app_token_set": bool(app_token),
        "user_key_set": bool(user_key),
    }
@router.post("/settings/pushover")
async def set_admin_pushover(request: Request):
    """Store any non-blank Pushover app token / user key from the JSON body.

    Blank or missing fields are ignored; nothing is ever cleared here.
    """
    _require_admin(request)
    body = await request.json()
    field_to_key = {
        "app_token": "pushover_app_token",
        "user_key": "pushover_user_key",
    }
    for field, key in field_to_key.items():
        submitted = (body.get(field) or "").strip()
        if not submitted:
            # Never clear on empty — must explicitly use delete endpoint
            continue
        await credential_store.set(key, submitted)
    return {"ok": True}
# ── User Pushover settings ────────────────────────────────────────────────────
@router.get("/my/pushover")
async def get_my_pushover(request: Request):
    """Report whether the app token is configured and whether the caller has a user key.

    The user comes from ``_require_auth`` and its id is read as an attribute,
    matching every other handler in this module (the previous ``user["id"]``
    subscript was the odd one out).
    """
    user = _require_auth(request)
    app_ok = bool(await credential_store.get("pushover_app_token"))
    user_key = await _user_settings_store.get(user.id, "pushover_user_key")
    return {"app_token_configured": app_ok, "user_key_set": bool(user_key)}
@router.post("/my/pushover")
async def set_my_pushover(request: Request):
    """Store the caller's Pushover user key when a non-blank one is supplied.

    A blank or missing ``user_key`` leaves the stored value untouched. The
    user id is read as an attribute (``user.id``), as in the other handlers in
    this module, instead of the previous ``user["id"]`` subscript.
    """
    user = _require_auth(request)
    body = await request.json()
    key = (body.get("user_key") or "").strip()
    if key:
        await _user_settings_store.set(user.id, "pushover_user_key", key)
    return {"ok": True}
@router.delete("/my/pushover")
async def delete_my_pushover(request: Request):
    """Delete the caller's stored Pushover user key.

    The user id is read as an attribute (``user.id``), as in the other
    handlers in this module, instead of the previous ``user["id"]`` subscript.
    """
    user = _require_auth(request)
    await _user_settings_store.delete(user.id, "pushover_user_key")
    return {"ok": True}
# ── Agents ────────────────────────────────────────────────────────────────────
# Request body accepted by POST /agents; every field is forwarded to
# agent_store.create_agent.
class AgentIn(BaseModel):
    name: str
    prompt: str
    model: str
    description: str = ""
    can_create_subagents: bool = False
    allowed_tools: Optional[list[str]] = None  # None is forwarded as an empty list
    schedule: Optional[str] = None
    enabled: bool = True
    max_tool_calls: Optional[int] = None
    prompt_mode: str = "combined"
# Request body accepted by PUT /agents/{agent_id}.
# The handler drops fields whose value is None, except max_tool_calls, which
# is always forwarded (None included).
class AgentUpdate(BaseModel):
    name: Optional[str] = None
    prompt: Optional[str] = None
    model: Optional[str] = None
    description: Optional[str] = None
    can_create_subagents: Optional[bool] = None
    allowed_tools: Optional[list[str]] = None
    schedule: Optional[str] = None
    enabled: Optional[bool] = None
    max_tool_calls: Optional[int] = None
    prompt_mode: Optional[str] = None
@router.get("/agents")
async def list_agents(request: Request):
    """List agents visible to the caller, leaving out email-handler agents.

    Admins are listed with no owner filter; other users only get agents they
    own. Agents referenced by `email_accounts.agent_id` are removed from the
    result.
    """
    user = _require_auth(request)
    owner_filter = user.id if not user.is_admin else None
    agents = await agent_store.list_agents(owner_user_id=owner_filter)

    # Exclude email handling agents — they are managed via Email Accounts settings
    from ..database import get_pool as _gp

    pool = await _gp()
    account_rows = await pool.fetch(
        "SELECT agent_id FROM email_accounts WHERE agent_id IS NOT NULL"
    )
    handler_ids = set()
    for row in account_rows:
        handler_ids.add(str(row["agent_id"]))

    visible = []
    for agent in agents:
        if agent["id"] not in handler_ids:
            visible.append(agent)
    return visible
@router.post("/agents", status_code=201)
async def create_agent(request: Request, body: AgentIn):
    """Create an agent owned by the caller and hand it to the runner for scheduling.

    When the caller's id equals `SYNTHETIC_API_ADMIN.id` the agent is stored
    with no owner.
    """
    user = _require_auth(request)
    # Stamp owner; synthetic API-key admin gets NULL (no real user row)
    from ..auth import SYNTHETIC_API_ADMIN

    if user.id == SYNTHETIC_API_ADMIN.id:
        owner_user_id = None
    else:
        owner_user_id = user.id

    new_agent_fields = {
        "name": body.name,
        "prompt": body.prompt,
        "model": body.model,
        "description": body.description,
        "can_create_subagents": body.can_create_subagents,
        "allowed_tools": body.allowed_tools or [],
        "schedule": body.schedule,
        "enabled": body.enabled,
        "max_tool_calls": body.max_tool_calls,
        "prompt_mode": body.prompt_mode,
        "owner_user_id": owner_user_id,
    }
    created = await agent_store.create_agent(**new_agent_fields)
    agent_runner.reschedule(created)
    return created
@router.get("/agents/{agent_id}")
async def get_agent(request: Request, agent_id: str):
    """Load one agent and return the result of `_check_agent_access` for the caller."""
    user = _require_auth(request)
    found = await agent_store.get_agent(agent_id)
    checked = _check_agent_access(found, user)
    return checked
@router.put("/agents/{agent_id}")
async def update_agent(request: Request, agent_id: str, body: AgentUpdate):
    """Apply the supplied fields to an agent and reschedule it.

    Fields whose value is ``None`` are dropped, except ``max_tool_calls``,
    which is always forwarded. Responds 404 when the store returns no agent.
    """
    user = _require_auth(request)
    existing = await agent_store.get_agent(agent_id)
    _check_agent_access(existing, user)

    changes = {}
    for field_name, value in body.model_dump().items():
        if field_name == "max_tool_calls" or value is not None:
            changes[field_name] = value

    agent = await agent_store.update_agent(agent_id, **changes)
    if agent:
        agent_runner.reschedule(agent)
        return agent
    raise HTTPException(status_code=404, detail="Agent not found")
@router.delete("/agents/{agent_id}")
async def delete_agent(request: Request, agent_id: str):
    """Delete an agent and remove it from the runner; 404 when nothing was deleted."""
    user = _require_auth(request)
    existing = await agent_store.get_agent(agent_id)
    _check_agent_access(existing, user)

    was_deleted = await agent_store.delete_agent(agent_id)
    if was_deleted:
        agent_runner.remove(agent_id)
        return {"ok": True}
    raise HTTPException(status_code=404, detail="Agent not found")
@router.post("/agents/{agent_id}/toggle")
async def toggle_agent(request: Request, agent_id: str):
    """Flip an agent's ``enabled`` flag, reschedule it, and return the updated agent.

    Raises:
        HTTPException: 404 when the store returns no agent for the update,
            the same handling ``update_agent`` applies, so that ``None`` is
            never passed to the runner.
    """
    user = _require_auth(request)
    agent = _check_agent_access(await agent_store.get_agent(agent_id), user)
    updated = await agent_store.update_agent(agent_id, enabled=not agent["enabled"])
    if not updated:
        raise HTTPException(status_code=404, detail="Agent not found")
    agent_runner.reschedule(updated)
    return updated
@router.post("/agents/{agent_id}/run")
async def run_agent_now(request: Request, agent_id: str):
    """Ask the runner to run the agent immediately and return what it reports."""
    user = _require_auth(request)
    target = await agent_store.get_agent(agent_id)
    _check_agent_access(target, user)
    started = await agent_runner.run_agent_now(agent_id)
    return {"ok": True, "run": started}
@router.post("/agents/{agent_id}/stop")
async def stop_agent(request: Request, agent_id: str):
    """Stop the agent's active run; 404 when the runner finds no active run.

    The ``ok`` field carries whatever `agent_runner.stop_run` returned.
    """
    user = _require_auth(request)
    target = await agent_store.get_agent(agent_id)
    _check_agent_access(target, user)

    active_run_id = await agent_runner.find_active_run(agent_id)
    if active_run_id:
        return {"ok": agent_runner.stop_run(active_run_id), "run_id": active_run_id}
    raise HTTPException(status_code=404, detail="No active run found for this agent")
@router.get("/agents/{agent_id}/runs")
async def get_agent_runs(request: Request, agent_id: str):
    """Return the stored runs for one agent after the access check."""
    user = _require_auth(request)
    target = await agent_store.get_agent(agent_id)
    _check_agent_access(target, user)
    runs = await agent_store.list_runs(agent_id=agent_id)
    return runs
# ── Agent runs (cross-agent) ──────────────────────────────────────────────────
@router.get("/agent-runs")
async def list_all_runs(
    request: Request,
    since: str = "7d",
    start: str = "",
    end: str = "",
    status: str = "",
):
    """List agent runs across agents, each annotated with its agent's name.

    An explicit ``start`` wins over ``since``; otherwise ``since`` may be
    ``today``, ``7d`` or ``30d``, and any other value applies no lower bound.
    Runs of email-handler agents are excluded, and non-admins only see runs of
    agents they own. The ``end`` parameter is accepted but not used by this
    handler.
    """
    user = _require_auth(request)

    now = datetime.now(timezone.utc)
    since_dt: str | None
    if start:
        since_dt = start
    else:
        window_starts = {
            "today": now.replace(hour=0, minute=0, second=0, microsecond=0),
            "7d": now - timedelta(days=7),
            "30d": now - timedelta(days=30),
        }
        window_start = window_starts.get(since)
        since_dt = window_start.isoformat() if window_start is not None else None

    runs = await agent_store.list_runs(since=since_dt, status=status or None)

    # Exclude email handler agents (managed via Email Accounts, not the Agents page)
    from ..database import get_pool as _gp

    _pool = await _gp()
    account_rows = await _pool.fetch(
        "SELECT agent_id FROM email_accounts WHERE agent_id IS NOT NULL"
    )
    handler_ids = {str(row["agent_id"]) for row in account_rows}
    runs = [run for run in runs if run["agent_id"] not in handler_ids]

    # Non-admins see only runs for their own agents
    if not user.is_admin:
        own_agents = await agent_store.list_agents(owner_user_id=user.id)
        own_ids = {agent["id"] for agent in own_agents}
        runs = [run for run in runs if run["agent_id"] in own_ids]

    # Look each agent up once and reuse its name for all of its runs.
    names_by_agent: dict[str, str] = {}
    for run in runs:
        agent_id = run["agent_id"]
        if agent_id not in names_by_agent:
            agent = await agent_store.get_agent(agent_id)
            names_by_agent[agent_id] = (agent or {}).get("name", "")
        run["agent_name"] = names_by_agent[agent_id]
    return runs
@router.get("/agent-runs/{run_id}")
async def get_agent_run(request: Request, run_id: str):
    """Return one run; 404 when it is not found, with an access check on its agent."""
    user = _require_auth(request)
    from ..agents.tasks import get_run

    run = await get_run(run_id)
    if not run:
        raise HTTPException(status_code=404, detail="Run not found")

    owning_agent = await agent_store.get_agent(run["agent_id"])
    _check_agent_access(owning_agent, user)
    return run
@router.post("/agent-runs/{run_id}/stop")
async def stop_run(request: Request, run_id: str):
    """Stop a run by id (admin check first); 404 when the runner reports it was not stopped."""
    _require_admin(request)
    if agent_runner.stop_run(run_id):
        return {"ok": True, "run_id": run_id}
    raise HTTPException(status_code=404, detail="Run not found or already stopped")
# ── Usage / cost overview ─────────────────────────────────────────────────────
@router.get("/usage")
async def get_usage(
    request: Request,
    since: str = "7d",
    start: str = "",
    end: str = "",
):
    """Aggregate token and cost usage by agent for the usage overview page.

    The response has three parts: ``by_agent`` (one row per agent from
    ``agent_runs``), ``chat`` (totals from ``conversations`` rows with no
    task id) and ``summary`` (the two combined).

    Time window: an explicit ``start`` wins over ``since``; ``since`` may be
    ``today``, ``7d`` or ``30d``, and any other value applies no lower bound.
    ``end``, when given, is an upper bound on ``started_at``.

    Non-admin callers get 403 when their ``use_admin_keys`` setting is truthy
    or when none of their own provider API keys is set; otherwise they only
    see their own agents and their own conversations.
    """
    user = _require_auth(request)
    if not user.is_admin:
        from ..database import user_settings_store as _uss
        # Users running on the admin's keys are not shown usage figures.
        if await _uss.get(user.id, "use_admin_keys"):
            raise HTTPException(status_code=403, detail="Not available on admin API keys")
        # At least one personal provider key must be configured.
        has_own = (
            await _uss.get(user.id, "anthropic_api_key") or
            await _uss.get(user.id, "openrouter_api_key") or
            await _uss.get(user.id, "openai_api_key")
        )
        if not has_own:
            raise HTTPException(status_code=403, detail="Not available on admin API keys")
    from ..database import get_pool as _gp
    pool = await _gp()
    now = datetime.now(timezone.utc)
    # Lower bound of the reporting window, as an ISO string (or the raw `start`).
    since_dt: str | None = None
    if start:
        since_dt = start
    elif since == "today":
        since_dt = now.replace(hour=0, minute=0, second=0, microsecond=0).isoformat()
    elif since == "7d":
        since_dt = (now - timedelta(days=7)).isoformat()
    elif since == "30d":
        since_dt = (now - timedelta(days=30)).isoformat()
    # since == "all" → no date filter
    # Build WHERE clauses
    clauses: list[str] = ["ar.status IN ('success', 'error', 'stopped')"]
    params: list = []
    # `n` is the next positional placeholder number ($1, $2, ...); it must stay
    # in step with the order values are appended to `params`.
    n = 1
    if since_dt:
        clauses.append(f"ar.started_at >= ${n}"); params.append(since_dt); n += 1
    if end:
        clauses.append(f"ar.started_at <= ${n}"); params.append(end); n += 1
    # Non-admin sees only their own agents
    if not user.is_admin:
        own_agents = await agent_store.list_agents(owner_user_id=user.id)
        own_ids = [a["id"] for a in own_agents]
        if own_ids:
            # One placeholder per owned agent id, continuing the numbering.
            placeholders = ", ".join(f"${n + i}" for i in range(len(own_ids)))
            clauses.append(f"ar.agent_id IN ({placeholders})")
            params.extend(own_ids)
            n += len(own_ids)
        else:
            # No agents → empty result
            # (this early return carries no "chat" section)
            return {"summary": {"runs": 0, "input_tokens": 0, "output_tokens": 0, "cost_usd": None}, "by_agent": []}
    where = "WHERE " + " AND ".join(clauses)
    # Only the clause text built above is interpolated; all values are bound
    # as positional parameters.
    rows = await pool.fetch(
        f"""
SELECT
ar.agent_id,
a.name AS agent_name,
a.model AS agent_model,
COUNT(*) AS runs,
SUM(ar.input_tokens) AS input_tokens,
SUM(ar.output_tokens) AS output_tokens,
SUM(ar.cost_usd) AS cost_usd
FROM agent_runs ar
LEFT JOIN agents a ON a.id = ar.agent_id
{where}
GROUP BY ar.agent_id, a.name, a.model
ORDER BY cost_usd DESC NULLS LAST, (SUM(ar.input_tokens) + SUM(ar.output_tokens)) DESC
""",
        *params,
    )
    by_agent = []
    total_input = 0
    total_output = 0
    # Stays None until at least one row reports a non-NULL cost.
    total_cost: float | None = None
    total_runs = 0
    for row in rows:
        inp = int(row["input_tokens"] or 0)
        out = int(row["output_tokens"] or 0)
        cost = float(row["cost_usd"]) if row["cost_usd"] is not None else None
        runs = int(row["runs"])
        total_input += inp
        total_output += out
        total_runs += runs
        if cost is not None:
            total_cost = (total_cost or 0.0) + cost
        by_agent.append({
            "agent_id": str(row["agent_id"]),
            "agent_name": row["agent_name"] or "",
            "model": row["agent_model"] or "",
            "runs": runs,
            "input_tokens": inp,
            "output_tokens": out,
            "cost_usd": cost,
        })
    # ── Chat session usage ────────────────────────────────────────────────────
    chat_clauses: list[str] = ["task_id IS NULL"]  # chat only, not agent/task runs
    chat_params: list = []
    # Separate placeholder counter for the conversations query.
    cn = 1
    if since_dt:
        chat_clauses.append(f"started_at >= ${cn}"); chat_params.append(since_dt); cn += 1
    if end:
        chat_clauses.append(f"started_at <= ${cn}"); chat_params.append(end); cn += 1
    if not user.is_admin:
        chat_clauses.append(f"user_id = ${cn}"); chat_params.append(user.id); cn += 1
    chat_where = "WHERE " + " AND ".join(chat_clauses)
    chat_row = await pool.fetchrow(
        f"""
SELECT
COUNT(*) AS sessions,
SUM(input_tokens) AS input_tokens,
SUM(output_tokens) AS output_tokens,
SUM(cost_usd) AS cost_usd
FROM conversations
{chat_where}
""",
        *chat_params,
    )
    chat_inp = int(chat_row["input_tokens"] or 0)
    chat_out = int(chat_row["output_tokens"] or 0)
    chat_cost = float(chat_row["cost_usd"]) if chat_row["cost_usd"] is not None else None
    chat_sessions = int(chat_row["sessions"] or 0)
    # Fold the chat totals into the overall summary; chat sessions count as runs.
    total_input += chat_inp
    total_output += chat_out
    total_runs += chat_sessions
    if chat_cost is not None:
        total_cost = (total_cost or 0.0) + chat_cost
    return {
        "summary": {
            "runs": total_runs,
            "input_tokens": total_input,
            "output_tokens": total_output,
            "cost_usd": total_cost,
        },
        "by_agent": by_agent,
        "chat": {
            "sessions": chat_sessions,
            "input_tokens": chat_inp,
            "output_tokens": chat_out,
            "cost_usd": chat_cost,
        },
    }
# ── Usage: clear cost data ────────────────────────────────────────────────────
@router.delete("/usage/cost")
async def clear_cost_data(request: Request):
    """Delete all agent_runs and null out conversation cost_usd. Admin only.

    Both statements run inside one transaction on a single connection, so a
    failure in the second statement rolls back the first instead of leaving
    the cost data half-cleared.
    """
    _require_admin(request)
    from ..database import get_pool as _gp

    pool = await _gp()
    async with pool.acquire() as conn:
        async with conn.transaction():
            await conn.execute("DELETE FROM agent_runs")
            await conn.execute("UPDATE conversations SET cost_usd = NULL WHERE cost_usd IS NOT NULL")
    return {"ok": True}
# ── Inbox triggers ────────────────────────────────────────────────────────────
class InboxTriggerIn(BaseModel):
    """Request body used when creating or updating an inbox trigger."""

    # Word that selects this trigger.
    trigger_word: str
    # Identifier of the agent associated with the trigger.
    agent_id: str
    # Optional free-text description; empty by default.
    description: str = ""
    # Triggers are enabled unless the client says otherwise.
    enabled: bool = True
@router.get("/inbox-triggers")
async def list_inbox_triggers(request: Request):
    """Return the result of the inbox trigger store's ``list_triggers``. Admin only."""
    _require_admin(request)
    from ..inbox.triggers import list_triggers as _fetch_triggers

    triggers = await _fetch_triggers()
    return triggers
@router.post("/inbox-triggers", status_code=201)
async def create_inbox_trigger(request: Request, body: InboxTriggerIn):
    """Create an inbox trigger from the request body and return the store's result. Admin only."""
    _require_admin(request)
    from ..inbox.triggers import create_trigger

    created = await create_trigger(
        body.trigger_word,
        body.agent_id,
        body.description,
        body.enabled,
    )
    return created
@router.put("/inbox-triggers/{id}")
async def update_inbox_trigger(request: Request, id: str, body: InboxTriggerIn):
    """Overwrite all editable fields of an inbox trigger. Admin only.

    Raises:
        HTTPException: 404 when ``update_trigger`` reports a falsy result.
    """
    _require_admin(request)
    from ..inbox.triggers import update_trigger

    changes = {
        "trigger_word": body.trigger_word,
        "agent_id": body.agent_id,
        "description": body.description,
        "enabled": body.enabled,
    }
    updated = await update_trigger(id, **changes)
    if updated:
        return {"ok": True}
    raise HTTPException(status_code=404, detail="Trigger not found")
@router.delete("/inbox-triggers/{id}")
async def delete_inbox_trigger(request: Request, id: str):
    """Delete an inbox trigger by id. Admin only.

    Raises:
        HTTPException: 404 when ``delete_trigger`` reports a falsy result.
    """
    _require_admin(request)
    from ..inbox.triggers import delete_trigger

    removed = await delete_trigger(id)
    if removed:
        return {"ok": True}
    raise HTTPException(status_code=404, detail="Trigger not found")
@router.post("/inbox-triggers/{id}/toggle")
async def toggle_inbox_trigger(request: Request, id: str):
    """Call ``toggle_trigger`` for the given inbox trigger id. Admin only.

    The result of ``toggle_trigger`` is not inspected; the endpoint always
    answers ``{"ok": True}`` unless the call raises.
    """
    _require_admin(request)
    from ..inbox.triggers import toggle_trigger as _toggle

    await _toggle(id)
    return {"ok": True}
@router.get("/inbox/status")
async def inbox_status(request: Request):
    """Report the inbox listener's ``status`` and ``all_statuses()``. Admin only."""
    _require_admin(request)
    from ..inbox.listener import inbox_listener as listener

    report = {}
    report["global"] = listener.status
    report["all"] = listener.all_statuses()
    return report
@router.post("/inbox/reconnect")
async def inbox_reconnect(request: Request):
    """Ask the inbox listener to reconnect. Admin only."""
    _require_admin(request)
    from ..inbox.listener import inbox_listener as listener

    listener.reconnect()
    return {"ok": True}
@router.post("/inbox/disconnect")
async def inbox_disconnect(request: Request):
    """Stop the inbox listener. Admin only."""
    _require_admin(request)
    from ..inbox.listener import inbox_listener as listener

    listener.stop()
    return {"ok": True}
# ── Telegram ──────────────────────────────────────────────────────────────────
@router.get("/telegram/status")
async def telegram_status(request: Request):
    """Return the Telegram listener's ``status`` attribute. Admin only."""
    _require_admin(request)
    from ..telegram.listener import telegram_listener as listener

    current = listener.status
    return current
@router.post("/telegram/reconnect")
async def telegram_reconnect(request: Request):
    """Ask the Telegram listener to reconnect. Admin only."""
    _require_admin(request)
    from ..telegram.listener import telegram_listener as listener

    listener.reconnect()
    return {"ok": True}
@router.post("/telegram/disconnect")
async def telegram_disconnect(request: Request):
    """Stop the Telegram listener. Admin only."""
    _require_admin(request)
    from ..telegram.listener import telegram_listener as listener

    listener.stop()
    return {"ok": True}
@router.get("/telegram-whitelist")
async def list_telegram_whitelist(request: Request):
    """Return the result of the Telegram ``list_whitelist`` call. Admin only."""
    _require_admin(request)
    from ..telegram.triggers import list_whitelist as _fetch_whitelist

    entries = await _fetch_whitelist()
    return entries
@router.post("/telegram-whitelist")
async def add_telegram_whitelist(request: Request, body: dict):
    """Add a chat id to the Telegram whitelist. Admin only.

    ``chat_id`` may arrive as a JSON string or a JSON integer; both are
    normalised to a stripped string before being stored. Any other type
    (including ``null`` and booleans) is treated as missing.

    Raises:
        HTTPException: 400 when no usable ``chat_id`` is supplied.
    """
    _require_admin(request)
    from ..telegram.triggers import add_to_whitelist

    raw_chat_id = body.get("chat_id")
    # bool is a subclass of int, so it has to be rejected explicitly.
    if isinstance(raw_chat_id, bool) or not isinstance(raw_chat_id, (str, int)):
        chat_id = ""
    else:
        # Previously a numeric chat_id crashed on .strip() with a 500.
        chat_id = str(raw_chat_id).strip()
    if not chat_id:
        raise HTTPException(status_code=400, detail="chat_id required")
    return await add_to_whitelist(chat_id, body.get("label", ""))
@router.delete("/telegram-whitelist/{chat_id}")
async def remove_telegram_whitelist(request: Request, chat_id: str):
    """Remove a chat id from the Telegram whitelist. Admin only.

    Raises:
        HTTPException: 404 when ``remove_from_whitelist`` reports a falsy result.
    """
    _require_admin(request)
    from ..telegram.triggers import remove_from_whitelist

    removed = await remove_from_whitelist(chat_id)
    if removed:
        return {"ok": True}
    raise HTTPException(status_code=404, detail="chat_id not found")
@router.get("/telegram-triggers")
async def list_telegram_triggers(request: Request):
    """Return the result of the Telegram trigger store's ``list_triggers``. Admin only."""
    _require_admin(request)
    from ..telegram.triggers import list_triggers as _fetch_triggers

    triggers = await _fetch_triggers()
    return triggers
@router.post("/telegram-triggers")
async def create_telegram_trigger(request: Request, body: dict):
    """Create a Telegram trigger from a free-form JSON body. Admin only.

    ``trigger_word`` and ``agent_id`` must be non-blank strings. Non-string
    values (numbers, ``null``, objects) are rejected with the same 400 as a
    missing value instead of crashing on ``.strip()``.

    Raises:
        HTTPException: 400 when either required field is missing or blank.
    """
    _require_admin(request)
    from ..telegram.triggers import create_trigger

    raw_word = body.get("trigger_word")
    raw_agent = body.get("agent_id")
    trigger_word = raw_word.strip() if isinstance(raw_word, str) else ""
    agent_id = raw_agent.strip() if isinstance(raw_agent, str) else ""
    if not trigger_word or not agent_id:
        raise HTTPException(status_code=400, detail="trigger_word and agent_id required")
    return await create_trigger(
        trigger_word=trigger_word,
        agent_id=agent_id,
        description=body.get("description", ""),
        enabled=body.get("enabled", True),
    )
@router.put("/telegram-triggers/{id}")
async def update_telegram_trigger(request: Request, id: str, body: dict):
    """Update a Telegram trigger with the whitelisted fields of ``body``. Admin only.

    Only ``trigger_word``, ``agent_id``, ``description`` and ``enabled`` are
    forwarded to ``update_trigger``; every other key is dropped.

    Raises:
        HTTPException: 404 when ``update_trigger`` reports a falsy result.
    """
    _require_admin(request)
    from ..telegram.triggers import update_trigger

    permitted = ("trigger_word", "agent_id", "description", "enabled")
    safe_body = {}
    for field, value in body.items():
        if field in permitted:
            safe_body[field] = value
    updated = await update_trigger(id, **safe_body)
    if updated:
        return {"ok": True}
    raise HTTPException(status_code=404, detail="Trigger not found")
@router.delete("/telegram-triggers/{id}")
async def delete_telegram_trigger(request: Request, id: str):
    """Delete a Telegram trigger by id. Admin only.

    Raises:
        HTTPException: 404 when ``delete_trigger`` reports a falsy result.
    """
    _require_admin(request)
    from ..telegram.triggers import delete_trigger

    removed = await delete_trigger(id)
    if removed:
        return {"ok": True}
    raise HTTPException(status_code=404, detail="Trigger not found")
@router.post("/telegram-triggers/{id}/toggle")
async def toggle_telegram_trigger(request: Request, id: str):
    """Call ``toggle_trigger`` for the given Telegram trigger id. Admin only.

    The result of ``toggle_trigger`` is not inspected; the endpoint always
    answers ``{"ok": True}`` unless the call raises.
    """
    _require_admin(request)
    from ..telegram.triggers import toggle_trigger as _toggle

    await _toggle(id)
    return {"ok": True}
@router.get("/telegram/default-agent")
async def get_telegram_default_agent(request: Request):
    """Return the stored ``telegram:default_agent_id`` (empty string when unset). Admin only."""
    _require_admin(request)
    stored = await credential_store.get("telegram:default_agent_id")
    if not stored:
        stored = ""
    return {"agent_id": stored}
@router.post("/telegram/default-agent")
async def set_telegram_default_agent(request: Request, body: dict):
    """Set or clear the default Telegram agent. Admin only.

    A non-blank ``agent_id`` is stored under ``telegram:default_agent_id``.
    A missing, ``null`` or blank ``agent_id`` deletes the stored value.

    Raises:
        HTTPException: 400 when ``agent_id`` is present but not a string.
    """
    _require_admin(request)
    raw_agent_id = body.get("agent_id")
    if raw_agent_id is None:
        # Explicit JSON null means "clear", same as omitting the key.
        raw_agent_id = ""
    if not isinstance(raw_agent_id, str):
        # Previously this crashed on .strip() and surfaced as a 500.
        raise HTTPException(status_code=400, detail="agent_id must be a string")
    agent_id = raw_agent_id.strip()
    if agent_id:
        await credential_store.set(
            "telegram:default_agent_id",
            agent_id,
            description="Default Telegram agent (no trigger match)",
        )
    else:
        await credential_store.delete("telegram:default_agent_id")
    return {"ok": True}
# ── 2nd Brain ─────────────────────────────────────────────────────────────────
class CaptureIn(BaseModel):
    """Request body for capturing a thought into the 2nd Brain."""

    # Text handed to the ingest step.
    content: str
@router.get("/brain/status")
async def get_brain_status(request: Request):
    """Report brain DB connectivity together with stats and recent thoughts. Admin only.

    Returns ``{"connected": False}`` when no pool exists, and adds an
    ``error`` string when fetching stats or recent thoughts raises.
    """
    _require_admin(request)
    from ..brain.database import browse_thoughts, get_pool, get_stats

    if get_pool() is None:
        return {"connected": False}
    try:
        stats = await get_stats()
        recent = await browse_thoughts(limit=10)
    except Exception as exc:
        # Any failure is reported to the caller as "not connected".
        return {"connected": False, "error": str(exc)}
    return {"connected": True, "stats": stats, "recent": recent}
@router.get("/brain/search")
async def search_brain(request: Request, q: str = "", limit: int = 20):
    """Run a semantic search over the brain with a fixed 0.3 threshold. Admin only.

    Raises:
        HTTPException: 503 when the brain pool is unavailable, 400 when the
            query is blank.
    """
    _require_admin(request)
    from ..brain.database import get_pool
    from ..brain.search import semantic_search

    query = q.strip()
    if get_pool() is None:
        raise HTTPException(status_code=503, detail="Brain DB not available")
    if not query:
        raise HTTPException(status_code=400, detail="Query required")
    matches = await semantic_search(query, threshold=0.3, limit=limit)
    return {"results": matches}
@router.post("/brain/capture")
async def capture_thought(request: Request, body: CaptureIn):
    """Ingest ``body.content`` as a thought and echo its id and confirmation. Admin only.

    Raises:
        HTTPException: 503 when the brain pool is unavailable.
    """
    _require_admin(request)
    from ..brain.database import get_pool
    from ..brain.ingest import ingest_thought

    if get_pool() is None:
        raise HTTPException(status_code=503, detail="Brain DB not available")
    outcome = await ingest_thought(body.content)
    response = {"ok": True}
    response["id"] = outcome["id"]
    response["confirmation"] = outcome["confirmation"]
    return response
# ── System prompt files (SOUL.md / USER.md) ───────────────────────────────────
# Maps the public prompt key used in the URL to the on-disk file name.
_PROMPT_FILES = dict(
    soul="SOUL.md",
    user="USER.md",
)
def _prompt_file_path(key: str):
    """Resolve a prompt key to its file path three directories above this module.

    Raises:
        HTTPException: 404 when ``key`` has no entry in ``_PROMPT_FILES``.
    """
    from pathlib import Path

    filename = _PROMPT_FILES.get(key)
    if filename:
        module_dir = Path(__file__).parent
        base_dir = module_dir.parent.parent
        return base_dir / filename
    raise HTTPException(status_code=404, detail="Unknown prompt file")
@router.get("/system-prompt/{key}")
async def get_system_prompt(request: Request, key: str):
    """Return the UTF-8 text of a prompt file, or an empty string if it does not exist. Admin only."""
    _require_admin(request)
    prompt_path = _prompt_file_path(key)
    if prompt_path.exists():
        text = prompt_path.read_text(encoding="utf-8")
    else:
        text = ""
    return {"content": text}
@router.post("/system-prompt/{key}")
async def save_system_prompt(request: Request, key: str, body: dict):
    """Overwrite a prompt file with ``body["content"]`` (UTF-8). Admin only.

    A missing ``content`` key writes an empty file, as before.

    Raises:
        HTTPException: 404 for an unknown ``key`` (from ``_prompt_file_path``),
            400 when ``content`` is present but not a string.
    """
    _require_admin(request)
    path = _prompt_file_path(key)
    content = body.get("content", "")
    if not isinstance(content, str):
        # write_text() raises TypeError on non-str input; report it as a
        # client error rather than letting it surface as a 500.
        raise HTTPException(status_code=400, detail="content must be a string")
    path.write_text(content, encoding="utf-8")
    return {"ok": True}
# ── Tool list ──────────────────────────────────────────────────────────────────
@router.get("/tools")
async def list_tools(request: Request):
    """List the tools visible to the authenticated user.

    Admins see every registry tool. Other users do not see ``bash`` or the
    registry's ``mcp__`` tools; instead their own per-user MCP tools are
    appended. Returns an empty list when the registry is not initialised.
    """
    user = _require_auth(request)
    from ..main import _registry

    if _registry is None:
        return []

    listing = []
    for tool in _registry.all_tools():
        tool_name = tool.name
        if tool_name == "bash" and not user.is_admin:
            continue
        from_mcp = tool_name.startswith("mcp__")
        # Non-admins don't see global MCP tools — they get their own below
        if from_mcp and not user.is_admin:
            continue
        server = tool_name.split("__")[1] if from_mcp else None
        entry = {
            "name": tool_name,
            "is_mcp": from_mcp,
            "server": server,
            "server_display_name": getattr(tool, "_server_display_name", None),
        }
        listing.append(entry)

    if user.is_admin:
        return listing

    # Inject per-user MCP tools for non-admin users
    from ..mcp_client.manager import discover_user_mcp_tools

    personal_tools = await discover_user_mcp_tools(user.id)
    for tool in personal_tools:
        fallback_display = getattr(tool, "server_name", None)
        listing.append({
            "name": tool.name,
            "is_mcp": True,
            "server": tool.name.split("__")[1] if "__" in tool.name else None,
            "server_display_name": getattr(tool, "_server_display_name", fallback_display),
        })
    return listing
# ── MCP servers ────────────────────────────────────────────────────────────────
class McpServerIn(BaseModel):
    """Request body for registering a global MCP server."""

    name: str
    url: str
    # Transport identifier; "sse" unless the client overrides it.
    transport: str = "sse"
    # Empty string when no API key is supplied.
    api_key: str = ""
    headers: Optional[dict] = None
    enabled: bool = True
class McpServerUpdate(BaseModel):
    """Partial-update body for a global MCP server; every field defaults to ``None``."""

    name: Optional[str] = None
    url: Optional[str] = None
    transport: Optional[str] = None
    api_key: Optional[str] = None
    headers: Optional[dict] = None
    enabled: Optional[bool] = None
@router.get("/mcp-servers")
async def list_mcp_servers(request: Request):
    """List servers stored under the ``"GLOBAL"`` user id, without secrets. Admin only."""
    _require_admin(request)
    from ..mcp_client.store import list_servers

    servers = await list_servers(include_secrets=False, user_id="GLOBAL")
    return servers
@router.post("/mcp-servers", status_code=201)
async def create_mcp_server(request: Request, body: McpServerIn):
    """Create a global MCP server and reload the registry's server tools. Admin only.

    Raises:
        HTTPException: 400 when a ``"GLOBAL"`` server with the same name exists.
    """
    _require_admin(request)
    from ..main import _registry
    from ..mcp_client.manager import reload_server_tools
    from ..mcp_client.store import create_server, list_servers

    for known in await list_servers(user_id="GLOBAL"):
        if known["name"] == body.name:
            raise HTTPException(status_code=400, detail="A server with that name already exists")

    created = await create_server(
        name=body.name,
        url=body.url,
        transport=body.transport,
        api_key=body.api_key,
        headers=body.headers,
        enabled=body.enabled,
    )
    if _registry:
        reload_server_tools(_registry)
    return created
@router.get("/mcp-servers/{server_id}")
async def get_mcp_server(request: Request, server_id: str):
    """Fetch one MCP server record, secrets included. Admin only.

    Raises:
        HTTPException: 404 when ``get_server`` returns a falsy value.
    """
    _require_admin(request)
    from ..mcp_client.store import get_server

    record = await get_server(server_id, include_secrets=True)
    if record:
        return record
    raise HTTPException(status_code=404, detail="MCP server not found")
@router.put("/mcp-servers/{server_id}")
async def update_mcp_server(request: Request, server_id: str, body: McpServerUpdate):
    """Apply the non-``None`` fields of ``body`` to an MCP server. Admin only.

    Raises:
        HTTPException: 404 when ``update_server`` returns a falsy value.
    """
    _require_admin(request)
    from ..main import _registry
    from ..mcp_client.manager import reload_server_tools
    from ..mcp_client.store import update_server

    # Fields left as None are treated as "not supplied" and skipped.
    fields = {}
    for field_name, value in body.model_dump().items():
        if value is not None:
            fields[field_name] = value

    updated = await update_server(server_id, **fields)
    if not updated:
        raise HTTPException(status_code=404, detail="MCP server not found")
    if _registry:
        reload_server_tools(_registry)
    return updated
@router.delete("/mcp-servers/{server_id}")
async def delete_mcp_server(request: Request, server_id: str):
    """Delete an MCP server and reload the registry's server tools. Admin only.

    Raises:
        HTTPException: 404 when ``delete_server`` returns a falsy value.
    """
    _require_admin(request)
    from ..main import _registry
    from ..mcp_client.manager import reload_server_tools
    from ..mcp_client.store import delete_server

    was_deleted = await delete_server(server_id)
    if not was_deleted:
        raise HTTPException(status_code=404, detail="MCP server not found")
    if _registry:
        reload_server_tools(_registry)
    return {"ok": True}
@router.post("/mcp-servers/{server_id}/toggle")
async def toggle_mcp_server(request: Request, server_id: str):
    """Flip an MCP server's ``enabled`` flag and return the updated record. Admin only.

    Raises:
        HTTPException: 404 when the server cannot be found.
    """
    _require_admin(request)
    from ..main import _registry
    from ..mcp_client.manager import reload_server_tools
    from ..mcp_client.store import get_server, update_server

    current = await get_server(server_id)
    if not current:
        raise HTTPException(status_code=404, detail="MCP server not found")

    flipped = not current["enabled"]
    updated = await update_server(server_id, enabled=flipped)
    if _registry:
        reload_server_tools(_registry)
    return updated
@router.post("/mcp-servers/{server_id}/refresh")
async def refresh_mcp_server(request: Request, server_id: str):
    """Re-discover a server's tools and report their count and names. Admin only.

    Raises:
        HTTPException: 404 when the server cannot be found.
    """
    _require_admin(request)
    from ..main import _registry
    from ..mcp_client.manager import discover_tools, reload_server_tools
    from ..mcp_client.store import get_server

    record = await get_server(server_id, include_secrets=True)
    if not record:
        raise HTTPException(status_code=404, detail="MCP server not found")

    discovered = await discover_tools(record)
    if _registry:
        reload_server_tools(_registry)

    tool_names = []
    for item in discovered:
        tool_names.append(item["tool_name"])
    return {"tool_count": len(discovered), "tools": tool_names}
# ── Security settings ─────────────────────────────────────────────────────────
# Extensions (lowercase, no leading dot) used when no custom upload policy is stored.
_UPLOAD_DEFAULT_EXTENSIONS: list[str] = [
    # Text / code
    "txt", "md", "csv", "json", "xml", "yaml", "yml", "toml",
    "ini", "conf", "cfg",
    "html", "css", "js", "ts", "jsx", "tsx", "py", "sh", "bash",
    "log", "sql",
    "env", "rst", "diff", "patch", "tsv", "nfo", "gitignore", "dockerfile",
    "rb", "go", "java", "c", "cpp", "h", "rs", "swift", "kt",
    # Images
    "jpg", "jpeg", "png", "gif", "webp", "svg",
    "bmp", "ico", "tiff", "avif", "heic", "heif",
    # Documents
    "pdf",
]

# Exact filenames (no extension) that are always allowed regardless of extension rules.
# Checked by lowercase basename — not editable via the policy UI.
_UPLOAD_ALLOWED_EXACT_NAMES: frozenset[str] = frozenset({
    # SSH
    "known_hosts", "authorized_keys", "config",
    # build
    "makefile", "procfile", "dockerfile",
    # config dotfiles
    ".env", ".gitignore", ".htaccess",
})

# Fallback limits used when no stored value is available.
_UPLOAD_DEFAULT_MAX_MB = 50
_UPLOAD_DEFAULT_MAX_FILES = 20
async def _get_upload_policy() -> dict:
    """Load the upload policy from the credential store, falling back to defaults.

    Returns:
        A dict with ``allowed_extensions`` (list of str), ``max_file_size_mb``
        (int) and ``max_files`` (int). Each value falls back to its module
        default when the stored value is missing or malformed.
    """
    import json as _json

    raw_ext = await credential_store.get("system:upload_allowed_extensions")
    raw_mb = await credential_store.get("system:upload_max_file_size_mb")
    raw_n = await credential_store.get("system:upload_max_files")

    def _int_or(raw, default: int) -> int:
        # Empty/missing values and unparsable strings both yield the default.
        try:
            return int(raw) if raw else default
        except (ValueError, TypeError):
            return default

    exts = None
    if raw_ext:
        try:
            parsed = _json.loads(raw_ext)
        except (ValueError, TypeError):
            # json.JSONDecodeError is a ValueError subclass.
            parsed = None
        # Only accept the stored value when it really is a list of strings;
        # valid JSON of another shape (object, number, string) is ignored.
        if isinstance(parsed, list) and all(isinstance(e, str) for e in parsed):
            exts = parsed
    if exts is None:
        # Hand out a copy so callers cannot mutate the module-level default.
        exts = list(_UPLOAD_DEFAULT_EXTENSIONS)

    return {
        "allowed_extensions": exts,
        "max_file_size_mb": _int_or(raw_mb, _UPLOAD_DEFAULT_MAX_MB),
        "max_files": _int_or(raw_n, _UPLOAD_DEFAULT_MAX_FILES),
    }
class SecuritySettingsIn(BaseModel):
    """Partial-update body for security settings; every field defaults to ``None``."""

    # On/off switches.
    sanitize_enhanced: Optional[bool] = None
    canary_enabled: Optional[bool] = None
    output_validation_enabled: Optional[bool] = None
    llm_screen_enabled: Optional[bool] = None
    llm_screen_model: Optional[str] = None
    llm_screen_block: Optional[bool] = None
    truncation_enabled: Optional[bool] = None
    # Character limits.
    max_web_chars: Optional[int] = None
    max_email_chars: Optional[int] = None
    max_file_chars: Optional[int] = None
    max_subject_chars: Optional[int] = None
    # Upload policy.
    upload_allowed_extensions: Optional[list[str]] = None
    upload_max_file_size_mb: Optional[int] = None
    upload_max_files: Optional[int] = None
@router.get("/settings/security")
async def get_security_settings(request: Request):
    """Return all security toggles, limits, the screening model and the upload policy. Admin only.

    Toggles are true only when the stored value is exactly ``"1"``. Limits
    fall back to their defaults when missing or not parseable as an int.
    """
    _require_admin(request)

    async def _flag(key: str) -> bool:
        stored = await credential_store.get(key)
        return stored == "1"

    async def _limit(key: str, default: int) -> int:
        stored = await credential_store.get(key)
        try:
            if stored:
                return int(stored)
            return default
        except (ValueError, TypeError):
            return default

    fetched = await asyncio_gather(
        _flag("system:security_sanitize_enhanced"),
        _flag("system:security_canary_enabled"),
        _flag("system:security_output_validation_enabled"),
        _flag("system:security_truncation_enabled"),
        _flag("system:security_llm_screen_enabled"),
        _flag("system:security_llm_screen_block"),
        _limit("system:security_max_web_chars", 20000),
        _limit("system:security_max_email_chars", 6000),
        _limit("system:security_max_file_chars", 20000),
        _limit("system:security_max_subject_chars", 200),
        credential_store.get("system:security_llm_screen_model"),
        _get_upload_policy(),
    )
    # Unpack in the same order the lookups were scheduled above.
    (
        sanitize_enhanced,
        canary_enabled,
        output_validation_enabled,
        truncation_enabled,
        llm_screen_enabled,
        llm_screen_block,
        max_web_chars,
        max_email_chars,
        max_file_chars,
        max_subject_chars,
        llm_screen_model,
        upload_policy,
    ) = fetched

    settings_view = {
        "sanitize_enhanced": sanitize_enhanced,
        "canary_enabled": canary_enabled,
        "output_validation_enabled": output_validation_enabled,
        "truncation_enabled": truncation_enabled,
        "max_web_chars": max_web_chars,
        "max_email_chars": max_email_chars,
        "max_file_chars": max_file_chars,
        "max_subject_chars": max_subject_chars,
        "llm_screen_enabled": llm_screen_enabled,
        "llm_screen_model": llm_screen_model or "google/gemini-flash-1.5",
        "llm_screen_block": llm_screen_block,
    }
    settings_view.update(upload_policy)
    return settings_view
@router.post("/settings/security")
async def save_security_settings(request: Request, body: SecuritySettingsIn):
    """Persist the security settings supplied in ``body``. Admin only.

    Only fields that are not ``None`` are written. Boolean toggles are stored
    as ``"1"``/``"0"``, limits as decimal strings, and the upload extension
    list as JSON. After the writes finish (or fail), the matching entries in
    the security_screening toggle and limit caches are invalidated.
    """
    _require_admin(request)
    import json as _json

    from ..security_screening import _invalidate_toggle_cache, _limit_cache

    # (store key, submitted value, description) for every boolean toggle.
    toggles = (
        ("system:security_sanitize_enhanced", body.sanitize_enhanced,
         "Enhanced prompt injection sanitization (Option 1)"),
        ("system:security_canary_enabled", body.canary_enabled,
         "Canary token injection detection (Option 2)"),
        ("system:security_output_validation_enabled", body.output_validation_enabled,
         "Output validation for external-origin sessions (Option 4)"),
        ("system:security_truncation_enabled", body.truncation_enabled,
         "Configurable content truncation limits (Option 5)"),
        ("system:security_llm_screen_enabled", body.llm_screen_enabled,
         "LLM content screening for external tool results (Option 3)"),
        ("system:security_llm_screen_block", body.llm_screen_block,
         "LLM screening block mode (vs flag mode)"),
    )
    # (store key, submitted value, description) for every character limit.
    limits = (
        ("system:security_max_web_chars", body.max_web_chars,
         "Max web page chars returned to agent"),
        ("system:security_max_email_chars", body.max_email_chars,
         "Max email body chars returned to agent"),
        ("system:security_max_file_chars", body.max_file_chars,
         "Max file content chars returned to agent"),
        ("system:security_max_subject_chars", body.max_subject_chars,
         "Max email subject chars returned to agent"),
    )

    ops = []
    stale_toggles: list[str] = []
    stale_limits: list[str] = []

    for key, flag, description in toggles:
        if flag is not None:
            ops.append(credential_store.set(key, "1" if flag else "0", description))
            stale_toggles.append(key)

    for key, limit, description in limits:
        if limit is not None:
            ops.append(credential_store.set(key, str(limit), description))
            stale_limits.append(key)

    if body.llm_screen_model is not None:
        ops.append(credential_store.set(
            "system:security_llm_screen_model",
            body.llm_screen_model,
            "Model used for LLM content screening",
        ))

    if body.upload_allowed_extensions is not None:
        # Trim whitespace BEFORE removing leading dots so " .txt" becomes
        # "txt", then drop entries that end up empty (e.g. "." or "  ").
        normalised = (
            e.strip().lstrip(".").strip().lower()
            for e in body.upload_allowed_extensions
        )
        # dict.fromkeys de-duplicates while keeping first-seen order.
        exts = list(dict.fromkeys(e for e in normalised if e))
        ops.append(credential_store.set(
            "system:upload_allowed_extensions",
            _json.dumps(exts),
            "Allowed file extensions for user file uploads",
        ))
    if body.upload_max_file_size_mb is not None:
        ops.append(credential_store.set(
            "system:upload_max_file_size_mb",
            str(max(1, body.upload_max_file_size_mb)),
            "Max file size in MB for user uploads",
        ))
    if body.upload_max_files is not None:
        ops.append(credential_store.set(
            "system:upload_max_files",
            str(max(1, body.upload_max_files)),
            "Max number of files per upload request",
        ))

    try:
        if ops:
            await asyncio_gather(*ops)
    finally:
        # Invalidate only after the writes were attempted: dropping the cached
        # value earlier lets a concurrent reader re-cache the old value before
        # the new one is stored. Runs on failure too, since some writes may
        # already have gone through.
        for key in stale_toggles:
            _invalidate_toggle_cache(key)
        for key in stale_limits:
            _limit_cache.pop(key, None)
    return {"ok": True}
# ── Login lockouts ────────────────────────────────────────────────────────────
@router.get("/settings/login-lockouts")
async def get_login_lockouts(request: Request):
    """Return the login limiter's ``list_locked()`` result. Admin only."""
    _require_admin(request)
    from ..login_limiter import list_locked as _list_locked

    locked = _list_locked()
    return locked
@router.delete("/settings/login-lockouts")
async def unlock_all_ips(request: Request):
    """Clear every login lockout and report the value returned by ``unlock_all``. Admin only."""
    _require_admin(request)
    from ..login_limiter import unlock_all as _unlock_all

    unlocked = _unlock_all()
    return {"ok": True, "unlocked": unlocked}
@router.delete("/settings/login-lockouts/{ip:path}")
async def unlock_ip(request: Request, ip: str):
    """Remove one IP from the login lockout list. Admin only.

    Raises:
        HTTPException: 404 when ``unlock`` reports a falsy result for ``ip``.
    """
    _require_admin(request)
    from ..login_limiter import unlock as _unlock

    released = _unlock(ip)
    if released:
        return {"ok": True, "ip": ip}
    raise HTTPException(status_code=404, detail=f"IP {ip!r} not found in lockout list")
# ── Branding ───────────────────────────────────────────────────────────────────
@router.get("/settings/branding")
async def get_branding(request: Request):
    """Return the stored brand name and whether a custom logo file exists. Admin only.

    ``logo_filename`` is only reported when the file is present under
    ``BASE_DIR / "web" / "static"``.
    """
    _require_admin(request)
    from ..main import BASE_DIR

    stored_name, stored_logo = await asyncio_gather(
        credential_store.get("system:brand_name"),
        credential_store.get("system:brand_logo_filename"),
    )
    logo_filename = stored_logo or ""

    has_custom_logo = False
    if logo_filename:
        has_custom_logo = (BASE_DIR / "web" / "static" / logo_filename).exists()

    return {
        "brand_name": stored_name or "",
        "has_custom_logo": has_custom_logo,
        "logo_filename": logo_filename if has_custom_logo else "",
    }
class BrandingNameIn(BaseModel):
    """Request body carrying the custom brand name."""

    # A blank value removes the stored brand name.
    brand_name: str
@router.post("/settings/branding")
async def save_branding(request: Request, body: BrandingNameIn):
    """Store the trimmed brand name, or delete it when blank, then refresh brand globals. Admin only."""
    _require_admin(request)
    from ..main import _refresh_brand_globals

    trimmed = body.brand_name.strip()
    if not trimmed:
        await credential_store.delete("system:brand_name")
    else:
        await credential_store.set("system:brand_name", trimmed, "Custom brand name shown in sidebar")
    await _refresh_brand_globals()
    return {"ok": True}
@router.post("/settings/branding/logo")
async def upload_brand_logo():
    """Always reject with 405; the actual upload handler lives at ``/settings/branding/logo/upload``."""
    rejection = HTTPException(status_code=405, detail="Use multipart POST")
    raise rejection
_MAX_LOGO_BYTES = 2 * 1024 * 1024  # 2 MB


@router.post("/settings/branding/logo/upload")
async def upload_brand_logo_file(request: Request):
    """Store an uploaded image as the custom sidebar logo. Admin only.

    The multipart form must contain a ``file`` part. The logo is saved as
    ``logo_custom<ext>`` under ``BASE_DIR / "web" / "static"``. A previously
    stored logo with a different filename is removed after the new one has
    been written and recorded.

    Raises:
        HTTPException: 400 when no file part is supplied or the extension is
            not an accepted image type, 413 when the file exceeds 2 MB.
    """
    _require_admin(request)
    from pathlib import Path

    from ..main import BASE_DIR, _refresh_brand_globals

    form = await request.form()
    upload = form.get("file")
    # A plain text field named "file" arrives as str and has no read() or
    # filename; treat it like a missing file instead of crashing with a 500.
    if not upload or isinstance(upload, str) or not hasattr(upload, "read"):
        raise HTTPException(status_code=400, detail="No file provided")

    # The filename may be None/empty; such uploads default to ".png" as before.
    original_name = getattr(upload, "filename", None) or ""
    ext = Path(original_name).suffix.lower() or ".png"
    # SVG excluded: can embed JavaScript and execute when served directly
    if ext not in {".png", ".jpg", ".jpeg", ".gif", ".webp"}:
        raise HTTPException(status_code=400, detail="Unsupported image type (png, jpg, gif, webp only)")

    data = await upload.read()
    if len(data) > _MAX_LOGO_BYTES:
        raise HTTPException(status_code=413, detail="File too large (max 2 MB)")

    static_dir = BASE_DIR / "web" / "static"
    filename = f"logo_custom{ext}"
    old_filename = await credential_store.get("system:brand_logo_filename") or ""

    # Write and register the new logo first so a failed write cannot leave the
    # stored filename pointing at a logo that was already deleted.
    (static_dir / filename).write_bytes(data)
    await credential_store.set("system:brand_logo_filename", filename, "Custom sidebar logo filename")

    if old_filename and old_filename != filename:
        (static_dir / old_filename).unlink(missing_ok=True)

    await _refresh_brand_globals()
    return {"ok": True, "filename": filename}
@router.delete("/settings/branding/logo")
async def delete_brand_logo(request: Request):
    """Remove the custom logo file and its stored filename, then refresh brand globals. Admin only."""
    _require_admin(request)
    from ..main import BASE_DIR, _refresh_brand_globals

    stored = await credential_store.get("system:brand_logo_filename")
    logo_filename = stored or ""
    if logo_filename:
        logo_path = BASE_DIR / "web" / "static" / logo_filename
        logo_path.unlink(missing_ok=True)
    await credential_store.delete("system:brand_logo_filename")
    await _refresh_brand_globals()
    return {"ok": True}
# ── API Key ────────────────────────────────────────────────────────────────────
@router.get("/settings/api-key")
async def get_api_key_status(request: Request):
    """Report whether an API key hash is stored and when it was created. Admin only."""
    _require_admin(request)
    stored_hash, created_at = await asyncio_gather(
        credential_store.get("system:api_key_hash"),
        credential_store.get("system:api_key_created_at"),
    )
    is_configured = stored_hash is not None
    return {"configured": is_configured, "created_at": created_at}
@router.post("/settings/api-key")
async def generate_api_key(request: Request):
    """Generate a new ``oai_``-prefixed API key and return it. Admin only.

    Only the key's SHA-256 hex digest and a UTC creation timestamp are
    written to the credential store; the raw key appears only in the response.
    """
    _require_admin(request)
    import hashlib
    import secrets

    token = secrets.token_urlsafe(32)
    raw_key = "oai_" + token
    digest = hashlib.sha256(raw_key.encode()).hexdigest()
    created_at = datetime.now(timezone.utc).isoformat()

    writes = (
        credential_store.set("system:api_key_hash", digest, description="API key hash (SHA-256)"),
        credential_store.set("system:api_key_created_at", created_at, description="API key creation timestamp"),
    )
    await asyncio_gather(*writes)
    return {"key": raw_key}
@router.delete("/settings/api-key")
async def revoke_api_key(request: Request):
    """Delete the stored API key hash and its creation timestamp. Admin only."""
    _require_admin(request)
    deletions = (
        credential_store.delete("system:api_key_hash"),
        credential_store.delete("system:api_key_created_at"),
    )
    await asyncio_gather(*deletions)
    return {"ok": True}
# ── Provider API keys (admin — global) ────────────────────────────────────────
# Credential descriptions per supported provider.
_PROVIDER_LABELS = dict(
    anthropic="Anthropic API key",
    openrouter="OpenRouter API key",
    openai="OpenAI API key",
)
# Provider names accepted by the provider-key endpoints.
_VALID_PROVIDERS = frozenset(("anthropic", "openrouter", "openai"))
class ProviderKeyIn(BaseModel):
    """Request body for storing a global provider API key."""

    # "anthropic", "openrouter", or "openai"
    provider: str
    key: str
@router.get("/settings/provider-keys")
async def get_provider_keys(request: Request):
    """Report which global provider keys are set, without revealing them. Admin only."""
    _require_admin(request)
    anthropic_key = await credential_store.get("system:anthropic_api_key")
    openrouter_key = await credential_store.get("system:openrouter_api_key")
    openai_key = await credential_store.get("system:openai_api_key")
    return {
        "anthropic_set": bool(anthropic_key),
        "openrouter_set": bool(openrouter_key),
        "openai_set": bool(openai_key),
    }
@router.post("/settings/provider-keys")
async def set_provider_key(request: Request, body: ProviderKeyIn):
    """Store a trimmed global API key for one provider. Admin only.

    Raises:
        HTTPException: 400 for an unknown provider or a blank key.
    """
    _require_admin(request)
    provider = body.provider
    if provider not in _VALID_PROVIDERS:
        raise HTTPException(status_code=400, detail="provider must be 'anthropic', 'openrouter', or 'openai'")
    trimmed_key = body.key.strip()
    if not trimmed_key:
        raise HTTPException(status_code=400, detail="key must not be empty")
    await credential_store.set(f"system:{provider}_api_key", trimmed_key, _PROVIDER_LABELS[provider])
    return {"ok": True}
@router.delete("/settings/provider-keys/{provider}")
async def delete_provider_key(request: Request, provider: str):
    """Delete the global API key of one provider. Admin only.

    Raises:
        HTTPException: 400 for an unknown provider.
    """
    _require_admin(request)
    if provider in _VALID_PROVIDERS:
        await credential_store.delete(f"system:{provider}_api_key")
        return {"ok": True}
    raise HTTPException(status_code=400, detail="provider must be 'anthropic', 'openrouter', or 'openai'")
# ── Per-user API keys (/api/my/provider-keys) ─────────────────────────────────
from ..database import user_settings_store as _user_settings_store
class MyProviderKeyIn(BaseModel):
    """Request body for storing the calling user's own provider API key."""

    # Checked against _VALID_PROVIDERS by the handler.
    provider: str
    key: str
@router.get("/my/provider-keys")
async def get_my_provider_keys(request: Request):
    """Report which personal provider keys the caller has set, plus access restrictions.

    The restriction flags are always false for admins and for users whose
    ``use_admin_keys`` setting is truthy.
    """
    user = _require_auth(request)
    own_anthropic = await _user_settings_store.get(user.id, "anthropic_api_key")
    own_openrouter = await _user_settings_store.get(user.id, "openrouter_api_key")
    own_openai = await _user_settings_store.get(user.id, "openai_api_key")
    # Determine access tier based on global keys
    global_anthropic = await credential_store.get("system:anthropic_api_key")
    global_openrouter = await credential_store.get("system:openrouter_api_key")

    # Admin-granted full access bypasses all restrictions. The setting is only
    # looked up for non-admins, since admins are never restricted.
    if user.is_admin:
        restricted = False
    else:
        granted = bool(await _user_settings_store.get(user.id, "use_admin_keys"))
        restricted = not granted

    has_anthropic = bool(own_anthropic)
    has_openrouter = bool(own_openrouter)
    return {
        "anthropic_set": has_anthropic,
        "openrouter_set": has_openrouter,
        "openai_set": bool(own_openai),
        # Non-admin access restrictions (lifted if use_admin_keys is granted)
        "anthropic_blocked": restricted and not has_anthropic and bool(global_anthropic),
        "openrouter_free_only": restricted and not has_openrouter and bool(global_openrouter),
        "openai_blocked": restricted,
    }
@router.post("/my/provider-keys")
async def set_my_provider_key(request: Request, body: MyProviderKeyIn):
    """Store a trimmed personal API key for one provider for the calling user.

    Raises:
        HTTPException: 400 for an unknown provider or a blank key.
    """
    user = _require_auth(request)
    provider = body.provider
    if provider not in _VALID_PROVIDERS:
        raise HTTPException(status_code=400, detail="provider must be 'anthropic', 'openrouter', or 'openai'")
    trimmed_key = body.key.strip()
    if not trimmed_key:
        raise HTTPException(status_code=400, detail="key must not be empty")
    await _user_settings_store.set(user.id, f"{provider}_api_key", trimmed_key)
    return {"ok": True}
@router.delete("/my/provider-keys/{provider}")
async def delete_my_provider_key(request: Request, provider: str):
    """Delete the calling user's personal API key for one provider.

    Raises:
        HTTPException: 400 for an unknown provider.
    """
    user = _require_auth(request)
    if provider in _VALID_PROVIDERS:
        await _user_settings_store.delete(user.id, f"{provider}_api_key")
        return {"ok": True}
    raise HTTPException(status_code=400, detail="provider must be 'anthropic', 'openrouter', or 'openai'")
# ── Admin: grant/revoke user admin key access ─────────────────────────────────
class AdminKeysAccessIn(BaseModel):
    """Request body toggling a user's ``use_admin_keys`` setting."""

    # True stores the setting; False deletes it.
    enabled: bool
@router.get("/users/{user_id}/admin-keys")
async def get_user_admin_keys_access(request: Request, user_id: str):
    """Report whether the user's ``use_admin_keys`` setting is truthy. Admin only."""
    _require_admin(request)
    stored = await _user_settings_store.get(user_id, "use_admin_keys")
    enabled = bool(stored)
    return {"enabled": enabled}
@router.post("/users/{user_id}/admin-keys")
async def set_user_admin_keys_access(request: Request, user_id: str, body: AdminKeysAccessIn):
    """Set or remove a user's ``use_admin_keys`` setting. Admin only.

    Raises:
        HTTPException: 404 when ``get_user_by_id`` returns a falsy value.
    """
    _require_admin(request)
    from ..users import get_user_by_id

    target = await get_user_by_id(user_id)
    if not target:
        raise HTTPException(status_code=404, detail="User not found")
    if not body.enabled:
        await _user_settings_store.delete(user_id, "use_admin_keys")
    else:
        await _user_settings_store.set(user_id, "use_admin_keys", "1")
    return {"ok": True}
# ── Per-user personality (/api/my/personality) ────────────────────────────────
class MyPersonalityIn(BaseModel):
    """Request body for the caller's personality settings; ``None`` leaves a field untouched."""

    soul: Optional[str] = None
    user_context: Optional[str] = None
@router.get("/my/personality")
async def get_my_personality(request: Request):
    """Return the caller's stored personality texts, using empty strings when unset."""
    user = _require_auth(request)
    stored_soul = await _user_settings_store.get(user.id, "personality_soul")
    stored_context = await _user_settings_store.get(user.id, "personality_user")
    return {
        "soul": stored_soul or "",
        "user_context": stored_context or "",
    }
@router.post("/my/personality")
async def set_my_personality(request: Request, body: MyPersonalityIn):
    """Save the caller's personality texts.

    For each supplied field, a non-blank value is stored trimmed and a blank
    value deletes the setting. ``personality_setup_done`` is always set.
    """
    user = _require_auth(request)
    submitted = (
        ("personality_soul", body.soul),
        ("personality_user", body.user_context),
    )
    for setting_key, text in submitted:
        if text is None:
            continue
        trimmed = text.strip()
        if trimmed:
            await _user_settings_store.set(user.id, setting_key, trimmed)
        else:
            await _user_settings_store.delete(user.id, setting_key)
    # Mark profile setup as done when user explicitly saves
    await _user_settings_store.set(user.id, "personality_setup_done", "1")
    return {"ok": True}
@router.post("/my/personality/dismiss-nag")
async def dismiss_personality_nag(request: Request):
    """Set the caller's ``personality_setup_done`` setting to ``"1"``."""
    caller = _require_auth(request)
    await _user_settings_store.set(caller.id, "personality_setup_done", "1")
    return {"ok": True}
# ── Per-user brain settings (/api/my/brain-settings) ─────────────────────────
@router.get("/my/brain-settings")
async def get_my_brain_settings(request: Request):
    """Report whether the caller's ``brain_auto_approve`` setting is truthy."""
    caller = _require_auth(request)
    stored = await _user_settings_store.get(caller.id, "brain_auto_approve")
    auto_approve = bool(stored)
    return {"brain_auto_approve": auto_approve}
@router.post("/my/brain-settings")
async def set_my_brain_settings(request: Request, body: dict):
    """Store or delete the caller's ``brain_auto_approve`` setting.

    Any truthy ``brain_auto_approve`` value stores ``"1"``; a falsy or
    missing value deletes the setting.
    """
    caller = _require_auth(request)
    wants_auto_approve = body.get("brain_auto_approve", False)
    if not wants_auto_approve:
        await _user_settings_store.delete(caller.id, "brain_auto_approve")
    else:
        await _user_settings_store.set(caller.id, "brain_auto_approve", "1")
    return {"ok": True}
# ── Per-user MCP servers (/api/my/mcp-servers) ───────────────────────────────
class MyMcpServerIn(BaseModel):
    # Request body for POST /my/mcp-servers; the handler forwards these fields
    # to create_server() together with the caller's user id.
    name: str  # must not match the name of a server the caller already has
    url: str
    transport: str = "sse"
    api_key: Optional[str] = None  # the handler passes "" when this is None/empty
    headers: Optional[dict] = None
    enabled: bool = True
class MyMcpServerUpdate(BaseModel):
    # Request body for PUT /my/mcp-servers/{server_id}. Every field is
    # optional; the handler drops fields that are None before calling
    # update_server(), so a value cannot be reset to None through this model.
    name: Optional[str] = None
    url: Optional[str] = None
    transport: Optional[str] = None
    api_key: Optional[str] = None
    headers: Optional[dict] = None
    enabled: Optional[bool] = None
@router.get("/my/mcp-servers")
async def list_my_mcp_servers(request: Request):
    """List the caller's MCP servers, requesting them without secrets."""
    current = _require_auth(request)
    from ..mcp_client.store import list_servers

    servers = await list_servers(include_secrets=False, user_id=current.id)
    return servers
@router.post("/my/mcp-servers", status_code=201)
async def create_my_mcp_server(request: Request, body: MyMcpServerIn):
    """Create an MCP server for the caller; 400 if they already use that name."""
    current = _require_auth(request)
    from ..mcp_client.store import list_servers, create_server

    for known in await list_servers(user_id=current.id):
        if known["name"] == body.name:
            raise HTTPException(status_code=400, detail="A server with that name already exists")

    return await create_server(
        name=body.name,
        url=body.url,
        transport=body.transport,
        api_key=body.api_key or "",
        headers=body.headers,
        enabled=body.enabled,
        user_id=current.id,
    )
@router.get("/my/mcp-servers/{server_id}")
async def get_my_mcp_server(request: Request, server_id: str):
    """Return one of the caller's MCP servers; 404 if missing or owned by someone else."""
    current = _require_auth(request)
    from ..mcp_client.store import get_server

    record = await get_server(server_id, include_secrets=False)
    if not (record and record.get("user_id") == current.id):
        raise HTTPException(status_code=404, detail="MCP server not found")
    return record
@router.put("/my/mcp-servers/{server_id}")
async def update_my_mcp_server(request: Request, server_id: str, body: MyMcpServerUpdate):
    """Apply the non-None fields of the body to one of the caller's MCP servers."""
    current = _require_auth(request)
    from ..mcp_client.store import get_server, update_server

    record = await get_server(server_id)
    if not (record and record.get("user_id") == current.id):
        raise HTTPException(status_code=404, detail="MCP server not found")

    # Only forward fields the client actually supplied.
    changes = {}
    for field_name, value in body.model_dump().items():
        if value is not None:
            changes[field_name] = value
    return await update_server(server_id, **changes)
@router.delete("/my/mcp-servers/{server_id}")
async def delete_my_mcp_server(request: Request, server_id: str):
    """Delete one of the caller's MCP servers; 404 if missing or not theirs."""
    current = _require_auth(request)
    from ..mcp_client.store import get_server, delete_server

    record = await get_server(server_id)
    is_mine = bool(record) and record.get("user_id") == current.id
    if not is_mine:
        raise HTTPException(status_code=404, detail="MCP server not found")
    await delete_server(server_id)
    return {"ok": True}
@router.post("/my/mcp-servers/{server_id}/toggle")
async def toggle_my_mcp_server(request: Request, server_id: str):
    """Flip the enabled flag of one of the caller's MCP servers."""
    current = _require_auth(request)
    from ..mcp_client.store import get_server, update_server

    record = await get_server(server_id)
    if not (record and record.get("user_id") == current.id):
        raise HTTPException(status_code=404, detail="MCP server not found")
    flipped = not record["enabled"]
    return await update_server(server_id, enabled=flipped)
# ── Per-user inbox (/api/my/inbox) ────────────────────────────────────────────
# Field names accepted by the /my/inbox/config endpoints. Each one is stored as
# a per-user setting named "inbox_<field>"; fields whose name contains
# "password" are reported by the GET endpoint only as a boolean.
_INBOX_CRED_KEYS = [
    "imap_host", "imap_port", "imap_username", "imap_password",
    "smtp_host", "smtp_port", "smtp_username", "smtp_password",
]
@router.get("/my/inbox/config")
async def get_my_inbox_config(request: Request):
    """Return the caller's inbox settings; password fields only as set/unset booleans."""
    current = _require_auth(request)
    config = {}
    for field in _INBOX_CRED_KEYS:
        stored = await _user_settings_store.get(current.id, f"inbox_{field}")
        if "password" in field:
            # Never echo secrets back; only say whether one is stored.
            config[field] = bool(stored)
        else:
            config[field] = stored or ""
    return config
@router.post("/my/inbox/config")
async def set_my_inbox_config(request: Request):
    """Store the inbox fields present in the JSON body; blank values delete the setting."""
    current = _require_auth(request)
    payload = await request.json()
    for field in _INBOX_CRED_KEYS:
        if field not in payload:
            continue
        raw = payload[field]
        cleaned = str(raw).strip() if raw else ""
        setting_name = f"inbox_{field}"
        if cleaned:
            await _user_settings_store.set(current.id, setting_name, cleaned)
        else:
            await _user_settings_store.delete(current.id, setting_name)
    return {"ok": True}
@router.post("/my/inbox/reconnect")
async def reconnect_my_inbox(request: Request):
    """Ask the inbox listener to start for the calling user."""
    current = _require_auth(request)
    from ..inbox.listener import inbox_listener as listener

    listener.start_for_user(current.id)
    return {"ok": True}
@router.get("/my/inbox/status")
async def get_my_inbox_status(request: Request):
    """Return the first listener status entry for the caller, or an unconfigured stub."""
    current = _require_auth(request)
    from ..inbox.listener import inbox_listener

    # all_statuses() returns a list; pick the first entry tagged with this user.
    for status in inbox_listener.all_statuses():
        if status.get("user_id") == current.id:
            return status
    return {"configured": False, "connected": False, "error": None}
class MyInboxTriggerIn(BaseModel):
    # Request body used by both the create (POST) and update (PUT) endpoints
    # under /my/inbox/triggers.
    trigger_word: str
    agent_id: str
    description: Optional[str] = ""  # the create handler converts None to ""
    enabled: bool = True
@router.get("/my/inbox/triggers")
async def list_my_inbox_triggers(request: Request):
    """List the inbox triggers that belong to the caller."""
    current = _require_auth(request)
    from ..inbox.triggers import list_triggers

    mine = await list_triggers(user_id=current.id)
    return mine
@router.post("/my/inbox/triggers", status_code=201)
async def create_my_inbox_trigger(request: Request, body: MyInboxTriggerIn):
    """Create an inbox trigger owned by the caller and return the created record."""
    current = _require_auth(request)
    from ..inbox.triggers import create_trigger

    new_trigger = {
        "trigger_word": body.trigger_word,
        "agent_id": body.agent_id,
        "description": body.description or "",
        "enabled": body.enabled,
        "user_id": current.id,
    }
    return await create_trigger(**new_trigger)
@router.put("/my/inbox/triggers/{trigger_id}")
async def update_my_inbox_trigger(request: Request, trigger_id: str, body: MyInboxTriggerIn):
    """Update one of the caller's inbox triggers with the non-None body fields."""
    current = _require_auth(request)
    from ..inbox.triggers import update_trigger, list_triggers

    # Ownership check: the trigger must appear in the caller's own list.
    owned_ids = [t["id"] for t in await list_triggers(user_id=current.id)]
    if trigger_id not in owned_ids:
        raise HTTPException(status_code=404, detail="Trigger not found")

    changes = {}
    for field_name, value in body.model_dump().items():
        if value is not None:
            changes[field_name] = value
    await update_trigger(trigger_id, **changes)
    return {"ok": True}
@router.delete("/my/inbox/triggers/{trigger_id}")
async def delete_my_inbox_trigger(request: Request, trigger_id: str):
    """Delete one of the caller's inbox triggers; 404 if it is not in their list."""
    current = _require_auth(request)
    from ..inbox.triggers import delete_trigger, list_triggers

    owned_ids = [t["id"] for t in await list_triggers(user_id=current.id)]
    if trigger_id not in owned_ids:
        raise HTTPException(status_code=404, detail="Trigger not found")
    await delete_trigger(trigger_id)
    return {"ok": True}
@router.post("/my/inbox/triggers/{trigger_id}/toggle")
async def toggle_my_inbox_trigger(request: Request, trigger_id: str):
    """Toggle one of the caller's inbox triggers; 404 if it is not in their list."""
    current = _require_auth(request)
    from ..inbox.triggers import toggle_trigger, list_triggers

    owned_ids = [t["id"] for t in await list_triggers(user_id=current.id)]
    if trigger_id not in owned_ids:
        raise HTTPException(status_code=404, detail="Trigger not found")
    await toggle_trigger(trigger_id)
    return {"ok": True}
# ── Per-user Telegram (/api/my/telegram) ──────────────────────────────────────
@router.get("/my/telegram/config")
async def get_my_telegram_config(request: Request):
    """Report whether the caller has a Telegram bot token stored (never the token itself)."""
    current = _require_auth(request)
    stored_token = await _user_settings_store.get(current.id, "telegram_bot_token")
    has_token = bool(stored_token)
    return {"bot_token_set": has_token}
@router.post("/my/telegram/config")
async def set_my_telegram_config(request: Request):
    """Store the caller's Telegram bot token, or delete it when none is given.

    Reads ``bot_token`` from the JSON body. A missing, ``null`` or blank value
    removes the stored token; anything else is stringified, stripped and saved
    under the "telegram_bot_token" user setting.
    """
    user = _require_auth(request)
    body = await request.json()
    raw_token = body.get("bot_token")
    # JSON null arrives as None; str(None) would store the literal "None" as a token.
    token = "" if raw_token is None else str(raw_token).strip()
    if token:
        await _user_settings_store.set(user.id, "telegram_bot_token", token)
    else:
        await _user_settings_store.delete(user.id, "telegram_bot_token")
    return {"ok": True}
@router.delete("/my/telegram/config")
async def delete_my_telegram_config(request: Request):
    """Remove the caller's Telegram bot token and stop their Telegram listener."""
    current = _require_auth(request)
    await _user_settings_store.delete(current.id, "telegram_bot_token")
    from ..telegram.listener import telegram_listener as listener

    listener.stop_for_user(current.id)
    return {"ok": True}
@router.post("/my/telegram/reconnect")
async def reconnect_my_telegram(request: Request):
    """Ask the Telegram listener to start for the calling user."""
    current = _require_auth(request)
    from ..telegram.listener import telegram_listener as listener

    listener.start_for_user(current.id)
    return {"ok": True}
@router.get("/my/telegram/status")
async def get_my_telegram_status(request: Request):
    """Return the Telegram listener status keyed by the caller's id, or an unconfigured stub."""
    current = _require_auth(request)
    from ..telegram.listener import telegram_listener

    not_configured = {"configured": False, "running": False, "error": None}
    by_user = telegram_listener.all_statuses()
    return by_user.get(current.id, not_configured)
@router.get("/my/telegram-whitelist")
async def list_my_telegram_whitelist(request: Request):
    """List the Telegram whitelist entries that belong to the caller."""
    current = _require_auth(request)
    from ..telegram.triggers import list_whitelist

    entries = await list_whitelist(user_id=current.id)
    return entries
@router.post("/my/telegram-whitelist")
async def add_my_telegram_whitelist(request: Request):
    """Add a chat id to the caller's Telegram whitelist.

    Reads ``chat_id`` (required) and ``label`` (optional) from the JSON body.
    Raises 400 when ``chat_id`` is missing, ``null`` or blank.
    """
    user = _require_auth(request)
    body = await request.json()

    def _as_text(value) -> str:
        # JSON null arrives as None; str(None) would yield the literal "None",
        # which previously passed the "required" check and was whitelisted.
        return "" if value is None else str(value).strip()

    chat_id = _as_text(body.get("chat_id"))
    label = _as_text(body.get("label"))
    if not chat_id:
        raise HTTPException(status_code=400, detail="chat_id is required")
    from ..telegram.triggers import add_to_whitelist
    return await add_to_whitelist(chat_id, label=label, user_id=user.id)
@router.delete("/my/telegram-whitelist/{chat_id}")
async def remove_my_telegram_whitelist(request: Request, chat_id: str):
    """Remove a chat id from the caller's whitelist; 404 if it is not listed."""
    current = _require_auth(request)
    from ..telegram.triggers import remove_from_whitelist, list_whitelist

    listed_ids = [entry["chat_id"] for entry in await list_whitelist(user_id=current.id)]
    if chat_id not in listed_ids:
        raise HTTPException(status_code=404, detail="Chat ID not found in your whitelist")
    await remove_from_whitelist(chat_id, user_id=current.id)
    return {"ok": True}
class MyTelegramTriggerIn(BaseModel):
    # Request body used by both the create (POST) and update (PUT) endpoints
    # under /my/telegram-triggers.
    trigger_word: str
    agent_id: str
    description: Optional[str] = ""  # the create handler converts None to ""
    enabled: bool = True
@router.get("/my/telegram-triggers")
async def list_my_telegram_triggers(request: Request):
    """List the Telegram triggers that belong to the caller."""
    current = _require_auth(request)
    from ..telegram.triggers import list_triggers

    mine = await list_triggers(user_id=current.id)
    return mine
@router.post("/my/telegram-triggers", status_code=201)
async def create_my_telegram_trigger(request: Request, body: MyTelegramTriggerIn):
    """Create a Telegram trigger owned by the caller and return the created record."""
    current = _require_auth(request)
    from ..telegram.triggers import create_trigger

    new_trigger = {
        "trigger_word": body.trigger_word,
        "agent_id": body.agent_id,
        "description": body.description or "",
        "enabled": body.enabled,
        "user_id": current.id,
    }
    return await create_trigger(**new_trigger)
@router.put("/my/telegram-triggers/{trigger_id}")
async def update_my_telegram_trigger(
    request: Request, trigger_id: str, body: MyTelegramTriggerIn
):
    """Update one of the caller's Telegram triggers with the non-None body fields."""
    current = _require_auth(request)
    from ..telegram.triggers import update_trigger, list_triggers

    # Ownership check: the trigger must appear in the caller's own list.
    owned_ids = [t["id"] for t in await list_triggers(user_id=current.id)]
    if trigger_id not in owned_ids:
        raise HTTPException(status_code=404, detail="Trigger not found")

    changes = {}
    for field_name, value in body.model_dump().items():
        if value is not None:
            changes[field_name] = value
    await update_trigger(trigger_id, **changes)
    return {"ok": True}
@router.delete("/my/telegram-triggers/{trigger_id}")
async def delete_my_telegram_trigger(request: Request, trigger_id: str):
    """Delete one of the caller's Telegram triggers; 404 if it is not in their list."""
    current = _require_auth(request)
    from ..telegram.triggers import delete_trigger, list_triggers

    owned_ids = [t["id"] for t in await list_triggers(user_id=current.id)]
    if trigger_id not in owned_ids:
        raise HTTPException(status_code=404, detail="Trigger not found")
    await delete_trigger(trigger_id)
    return {"ok": True}
@router.post("/my/telegram-triggers/{trigger_id}/toggle")
async def toggle_my_telegram_trigger(request: Request, trigger_id: str):
    """Toggle one of the caller's Telegram triggers; 404 if it is not in their list."""
    current = _require_auth(request)
    from ..telegram.triggers import toggle_trigger, list_triggers

    owned_ids = [t["id"] for t in await list_triggers(user_id=current.id)]
    if trigger_id not in owned_ids:
        raise HTTPException(status_code=404, detail="Trigger not found")
    await toggle_trigger(trigger_id)
    return {"ok": True}
# ── Conversations (chat history) ──────────────────────────────────────────────
class ConversationRenameIn(BaseModel):
    # Request body for PATCH /conversations/{conv_id}. The handler strips the
    # title, truncates it to 120 characters and falls back to "Chat" if empty.
    title: str
@router.get("/conversations")
async def list_conversations(
    request: Request,
    page: int = 1,
    per_page: int = 40,
    q: str = "",
):
    """Return one page of the caller's chat conversations (rows with no task_id).

    Args:
        page: 1-based page number; values below 1 are treated as 1.
        per_page: page size; negative values are treated as 0.
        q: optional substring matched case-insensitively against the title.

    Returns:
        A dict with the page of ``conversations``, the ``total`` number of
        matching rows, and the effective ``page`` / ``per_page``.
    """
    user = _require_auth(request)
    from ..database import get_pool

    # PostgreSQL rejects negative LIMIT/OFFSET values, so clamp only the
    # inputs that would otherwise surface as a 500.
    page = max(page, 1)
    per_page = max(per_page, 0)
    offset = (page - 1) * per_page

    pool = await get_pool()

    # Build WHERE conditions with correct parameter indices
    conditions = ["task_id IS NULL"]
    filter_params: list = []

    # Always scope to the requesting user — admins are not special here
    filter_params.append(user.id)
    conditions.append(f"user_id = ${len(filter_params)}")

    if q:
        filter_params.append(f"%{q}%")
        conditions.append(f"title ILIKE ${len(filter_params)}")

    where = "WHERE " + " AND ".join(conditions)
    n = len(filter_params)

    total = await pool.fetchval(
        f"SELECT COUNT(*) FROM conversations {where}",
        *filter_params,
    )
    rows = await pool.fetch(
        f"""SELECT id, title, model, started_at, ended_at, task_id,
        CASE WHEN jsonb_typeof(messages) = 'array'
        THEN jsonb_array_length(messages) ELSE 0 END AS message_count
        FROM conversations
        {where}
        ORDER BY ended_at DESC NULLS LAST
        LIMIT ${n + 1} OFFSET ${n + 2}""",
        *filter_params, per_page, offset,
    )
    return {"conversations": [dict(r) for r in rows], "total": total, "page": page, "per_page": per_page}
@router.patch("/conversations/{conv_id}")
async def rename_conversation(request: Request, conv_id: str, body: ConversationRenameIn):
    """Rename a conversation; allowed for its owner or an admin.

    The title is stripped, cut to 120 characters and defaults to "Chat".
    """
    current = _require_auth(request)
    from ..database import get_pool

    pool = await get_pool()
    record = await pool.fetchrow("SELECT user_id FROM conversations WHERE id = $1", conv_id)
    if not record:
        raise HTTPException(status_code=404, detail="Conversation not found")
    is_owner = record["user_id"] == current.id
    if not (current.is_admin or is_owner):
        raise HTTPException(status_code=403, detail="Not your conversation")

    new_title = body.title.strip()[:120]
    if not new_title:
        new_title = "Chat"
    await pool.execute("UPDATE conversations SET title = $1 WHERE id = $2", new_title, conv_id)
    return {"ok": True, "title": new_title}
@router.delete("/conversations/{conv_id}")
async def delete_conversation(request: Request, conv_id: str):
    """Delete a conversation; allowed for its owner or an admin."""
    current = _require_auth(request)
    from ..database import get_pool

    pool = await get_pool()
    record = await pool.fetchrow("SELECT user_id FROM conversations WHERE id = $1", conv_id)
    if not record:
        raise HTTPException(status_code=404, detail="Conversation not found")
    is_owner = record["user_id"] == current.id
    if not (current.is_admin or is_owner):
        raise HTTPException(status_code=403, detail="Not your conversation")

    await pool.execute("DELETE FROM conversations WHERE id = $1", conv_id)
    return {"ok": True}
def _render_conversation_markdown(title, model, started_at, messages, agent_name) -> str:
    """Render a conversation as a Markdown transcript.

    Malformed data is skipped rather than raising: a non-list ``messages``
    value is treated as empty and non-dict entries are ignored.
    """
    lines = [f"# {title}", ""]
    # Two trailing spaces are Markdown's hard line break, keeping each
    # metadata item on its own line.
    if model:
        lines.append(f"**Model:** {model}  ")
    if started_at:
        lines.append(f"**Date:** {str(started_at)[:19]}  ")
    lines += ["", "---", ""]

    if not isinstance(messages, list):
        messages = []
    for msg in messages:
        if not isinstance(msg, dict):
            continue
        role = msg.get("role", "")
        # content may be null (e.g. tool-call-only turns); str(None) would
        # otherwise be exported as the text "None".
        content_parts = msg.get("content")
        if content_parts is None:
            content_parts = ""
        if isinstance(content_parts, list):
            text = " ".join(
                p.get("text", "") for p in content_parts
                if isinstance(p, dict) and p.get("type") == "text"
            )
        else:
            text = str(content_parts)
        if not text.strip():
            continue
        speaker = "You" if role == "user" else agent_name
        lines += [f"**{speaker}**", "", text, "", "---", ""]
    return "\n".join(lines)


@router.get("/conversations/{conv_id}/export")
async def export_conversation(request: Request, conv_id: str, format: str = "markdown"):
    """Download a conversation as Markdown or JSON.

    ``format="json"`` returns a JSON document; any other value returns a
    Markdown transcript. Raises 404 when the conversation does not exist and
    403 when the caller is neither its owner nor an admin.
    """
    import json as _json
    from fastapi.responses import Response

    user = _require_auth(request)
    from ..database import get_pool
    pool = await get_pool()
    row = await pool.fetchrow(
        "SELECT * FROM conversations WHERE id = $1", conv_id
    )
    if not row:
        raise HTTPException(status_code=404, detail="Conversation not found")
    if not user.is_admin and row.get("user_id") != user.id:
        raise HTTPException(status_code=403, detail="Not your conversation")

    messages = row.get("messages") or []
    if isinstance(messages, str):
        # The column may come back as raw JSON text; decode it best-effort.
        try:
            messages = _json.loads(messages)
        except Exception:
            messages = []

    started_at = row.get("started_at") or ""
    title = row.get("title") or "Conversation"
    model = row.get("model") or ""

    from ..config import settings as _settings
    agent_name = _settings.agent_name

    if format == "json":
        content = _json.dumps({
            "id": str(row["id"]),
            "title": title,
            "model": model,
            "started_at": str(started_at),
            "messages": messages,
        }, indent=2, default=str)
        filename = f"conversation-{conv_id[:8]}.json"
        media_type = "application/json"
    else:
        content = _render_conversation_markdown(title, model, started_at, messages, agent_name)
        filename = f"conversation-{conv_id[:8]}.md"
        media_type = "text/markdown"

    return Response(
        content=content,
        media_type=media_type,
        headers={"Content-Disposition": f'attachment; filename="{filename}"'},
    )
# ── Agent templates ────────────────────────────────────────────────────────────
@router.get("/agent-templates")
async def list_agent_templates():
    """Return whatever list_templates() from the agent_templates module provides."""
    from ..agent_templates import list_templates

    templates = list_templates()
    return templates
@router.get("/agent-templates/{template_id}")
async def get_agent_template(template_id: str):
    """Return a single agent template, or 404 when the lookup yields nothing."""
    from ..agent_templates import get_template

    template = get_template(template_id)
    if template:
        return template
    raise HTTPException(status_code=404, detail="Template not found")
# ── User management (admin only) ───────────────────────────────────────────────
class UserCreateIn(BaseModel):
    # Request body for POST /users (admin only). Validation beyond the types
    # is done in the handler.
    username: str  # rejected with 409 if a user with this name already exists
    password: str  # handler requires at least 8 characters
    email: str  # handler strips/lowercases it and requires an "@"
    role: str = "user"  # handler accepts only "admin" or "user"
class UserUpdateIn(BaseModel):
    # Request body for PUT /users/{user_id} (admin only). Fields left as None
    # are not changed; the handler rejects a body where every field is None.
    role: Optional[str] = None  # handler accepts only "admin" or "user"
    is_active: Optional[bool] = None  # an admin cannot set this to False on themselves
    password: Optional[str] = None  # handler requires at least 8 characters
    email: Optional[str] = None  # an empty string is stored as None
class ChangePasswordIn(BaseModel):
    # Request body for POST /users/me/password.
    current_password: str  # verified against the stored password hash
    new_password: str  # handler requires at least 8 characters
def _require_admin(request: Request):
    """Return the current user if their role is "admin"; otherwise raise 403."""
    current = getattr(request.state, "current_user", None)
    if current is not None and current.role == "admin":
        return current
    raise HTTPException(status_code=403, detail="Admin access required")
def _require_auth(request: Request):
    """Return the user attached to request.state, raising 401 when there is none."""
    current = getattr(request.state, "current_user", None)
    if current is not None:
        return current
    raise HTTPException(status_code=401, detail="Authentication required")
def _check_agent_access(agent: dict | None, user) -> dict:
    """Return ``agent`` when ``user`` may access it, otherwise raise 404.

    Access is granted to admins and to the agent's owner. A missing agent and
    a foreign agent both produce the same 404 (never 403), so the response
    does not reveal whether another user's agent exists.
    """
    if agent is not None:
        if user.is_admin or agent.get("owner_user_id") == user.id:
            return agent
    raise HTTPException(status_code=404, detail="Agent not found")
@router.get("/users")
async def list_users_api(request: Request):
    """Return all users (admin only)."""
    _require_admin(request)
    from ..users import list_users

    everyone = await list_users()
    return everyone
@router.post("/users", status_code=201)
async def create_user_api(request: Request, body: UserCreateIn):
    """Create a user (admin only) after validating password, role and email.

    Raises 400 for a short password, an unknown role or an invalid email, and
    409 when the username is already taken.
    """
    _require_admin(request)

    password_too_short = len(body.password) < 8
    if password_too_short:
        raise HTTPException(status_code=400, detail="Password must be at least 8 characters")
    if body.role not in ("admin", "user"):
        raise HTTPException(status_code=400, detail="role must be 'admin' or 'user'")

    normalized_email = body.email.strip().lower()
    if not (normalized_email and "@" in normalized_email):
        raise HTTPException(status_code=400, detail="A valid email address is required")

    from ..users import create_user, get_user_by_username

    already_there = await get_user_by_username(body.username)
    if already_there:
        raise HTTPException(status_code=409, detail="Username already exists")
    return await create_user(body.username, body.password, body.role, normalized_email)
@router.put("/users/{user_id}")
async def update_user_api(request: Request, user_id: str, body: UserUpdateIn):
    """Update a user's role, active flag, password and/or email (admin only).

    Raises 400 when an admin tries to deactivate themselves, when a supplied
    value is invalid, or when nothing was supplied; 404 when the user does not
    exist. Returns the user as re-read after the update.
    """
    acting_admin = _require_admin(request)
    deactivating_self = user_id == acting_admin.id and body.is_active is False
    if deactivating_self:
        raise HTTPException(status_code=400, detail="Cannot deactivate your own account")

    from ..users import update_user, get_user_by_id

    target = await get_user_by_id(user_id)
    if not target:
        raise HTTPException(status_code=404, detail="User not found")

    changes = {}

    new_role = body.role
    if new_role is not None:
        if new_role not in ("admin", "user"):
            raise HTTPException(status_code=400, detail="role must be 'admin' or 'user'")
        changes["role"] = new_role

    if body.is_active is not None:
        changes["is_active"] = body.is_active

    new_password = body.password
    if new_password is not None:
        if len(new_password) < 8:
            raise HTTPException(status_code=400, detail="Password must be at least 8 characters")
        changes["password"] = new_password

    if body.email is not None:
        normalized_email = body.email.strip().lower()
        if normalized_email and "@" not in normalized_email:
            raise HTTPException(status_code=400, detail="Invalid email address")
        # An empty string clears the address.
        changes["email"] = normalized_email or None

    if not changes:
        raise HTTPException(status_code=400, detail="No fields to update")

    await update_user(user_id, **changes)

    # Stop or restart automated listeners when is_active changes.
    # "is_active" is only ever stored as a bool, so None means "not changed".
    activation = changes.get("is_active")
    if activation is not None:
        from ..inbox.listener import inbox_listener
        from ..telegram.listener import telegram_listener

        for listener in (inbox_listener, telegram_listener):
            if activation:
                listener.start_for_user(user_id)
            else:
                listener.stop_for_user(user_id)

    return await get_user_by_id(user_id)
@router.delete("/users/{user_id}")
async def delete_user_api(request: Request, user_id: str):
    """Delete a user (admin only); admins cannot delete their own account."""
    acting_admin = _require_admin(request)
    if user_id == acting_admin.id:
        raise HTTPException(status_code=400, detail="Cannot delete your own account")

    from ..users import delete_user

    removed = await delete_user(user_id)
    if not removed:
        raise HTTPException(status_code=404, detail="User not found")
    return {"ok": True}
@router.post("/users/me/password")
async def change_own_password(request: Request, body: ChangePasswordIn):
    """Change the caller's password after verifying the current one.

    Raises 400 when the current password does not verify or the new one is
    shorter than 8 characters.
    """
    current = _require_auth(request)
    from ..auth import verify_password
    from ..users import get_user_by_username, update_user

    account = await get_user_by_username(current.username)
    verified = bool(account) and verify_password(body.current_password, account["password_hash"])
    if not verified:
        raise HTTPException(status_code=400, detail="Current password is incorrect")

    new_password = body.new_password
    if len(new_password) < 8:
        raise HTTPException(status_code=400, detail="New password must be at least 8 characters")

    await update_user(current.id, password=new_password)
    return {"ok": True}
# ── Helper: thin wrapper around asyncio.gather for use inside routes ───────────
# ── Email accounts (per-user handling + trigger accounts) ─────────────────────
@router.get("/my/email-accounts")
async def list_my_email_accounts(request: Request):
    """List the caller's email accounts, each passed through mask_account()."""
    current = _require_auth(request)
    from ..inbox.accounts import list_accounts, mask_account

    masked = []
    for account in await list_accounts(user_id=current.id):
        masked.append(mask_account(account))
    return masked
def _email_account_int_field(payload: dict, key: str, default):
    """Return ``payload[key]`` as an int, or ``default`` when missing/falsy.

    Raises a 400 HTTPException when a value is present but not an integer.
    """
    raw = payload.get(key)
    if not raw:
        return default
    try:
        return int(raw)
    except (TypeError, ValueError):
        raise HTTPException(status_code=400, detail=f"{key} must be an integer") from None


@router.post("/my/email-accounts", status_code=201)
async def create_my_email_account(request: Request):
    """Create a "handling" email account for the caller plus its dedicated agent.

    Numeric inputs are validated before anything is written, and the
    auto-created agent is removed again if creating the account fails, so a
    bad request does not leave an orphan agent behind.

    Returns the new account passed through mask_account().
    Raises 400 when imap_port, smtp_port or initial_load_limit is not an integer.
    """
    user = _require_auth(request)
    body = await request.json()
    from ..inbox.accounts import create_account, mask_account
    from ..agents.tasks import create_agent, delete_agent

    label = body.get("label", "")
    # Tolerate JSON null for the optional text fields.
    agent_model = str(body.get("agent_model") or "").strip()
    agent_prompt = str(body.get("agent_prompt") or "").strip()
    agent_max_tool_calls = body.get("agent_max_tool_calls")
    if agent_max_tool_calls is not None:
        try:
            agent_max_tool_calls = int(agent_max_tool_calls)
        except (TypeError, ValueError):
            # An unparseable limit is treated as "no limit supplied".
            agent_max_tool_calls = None
    agent_prompt_mode = body.get("agent_prompt_mode") or "combined"

    # Validate everything that can fail before creating any record.
    imap_port = _email_account_int_field(body, "imap_port", 993)
    smtp_port = _email_account_int_field(body, "smtp_port", None)
    initial_load_limit = _email_account_int_field(body, "initial_load_limit", 200)

    # Auto-create a dedicated agent for this account
    agent = await create_agent(
        name=f"Email Handler: {label}",
        prompt=agent_prompt,
        model=agent_model,
        allowed_tools=["email"],
        created_by="user",
        owner_user_id=user.id,
        max_tool_calls=agent_max_tool_calls,
        prompt_mode=agent_prompt_mode,
    )
    agent_id = agent["id"]

    try:
        acct = await create_account(
            label=label,
            account_type="handling",
            imap_host=body.get("imap_host", ""),
            imap_port=imap_port,
            imap_username=body.get("imap_username", ""),
            imap_password=body.get("imap_password", ""),
            smtp_host=body.get("smtp_host") or None,
            smtp_port=smtp_port,
            smtp_username=body.get("smtp_username") or None,
            smtp_password=body.get("smtp_password") or None,
            agent_id=agent_id,
            user_id=user.id,
            initial_load_limit=initial_load_limit,
            monitored_folders=body.get("monitored_folders") or ["INBOX"],
            extra_tools=body.get("extra_tools") or [],
            telegram_chat_id=body.get("telegram_chat_id") or None,
            telegram_keyword=body.get("telegram_keyword") or None,
            enabled=body.get("enabled", True),
        )
    except Exception:
        # Roll back the agent created above, then surface the original error.
        await delete_agent(agent_id)
        raise
    return mask_account(acct)
@router.put("/my/email-accounts/{account_id}")
async def update_my_email_account(request: Request, account_id: str):
    """Update one of the caller's email accounts and, if linked, its agent.

    Agent-related keys in the JSON body are applied to the linked agent;
    whitelisted account keys are applied to the account. Returns the re-read
    account passed through mask_account(). Raises 404 when the account is
    missing or belongs to another user.
    """
    current = _require_auth(request)
    from ..inbox.accounts import get_account, update_account, mask_account
    from ..agents.tasks import update_agent

    account = await get_account(account_id)
    if not (account and str(account.get("user_id")) == current.id):
        raise HTTPException(status_code=404, detail="Account not found")

    payload = await request.json()

    # Update the linked agent's model, prompt, and cost-control fields if provided
    linked_agent_id = account.get("agent_id")
    if linked_agent_id:
        agent_changes = {}
        if "agent_model" in payload:
            agent_changes["model"] = payload["agent_model"]
        if "agent_prompt" in payload:
            agent_changes["prompt"] = payload["agent_prompt"]
        if "label" in payload:
            agent_changes["name"] = f"Email Handler: {payload['label']}"
        if "agent_max_tool_calls" in payload:
            limit = payload["agent_max_tool_calls"]
            agent_changes["max_tool_calls"] = None if limit is None else int(limit)
        if "agent_prompt_mode" in payload:
            agent_changes["prompt_mode"] = payload["agent_prompt_mode"] or "combined"
        if agent_changes:
            await update_agent(account["agent_id"], **agent_changes)

    updatable = {
        "label", "imap_host", "imap_port", "imap_username", "imap_password",
        "smtp_host", "smtp_port", "smtp_username", "smtp_password",
        "enabled", "initial_load_limit", "monitored_folders", "extra_tools",
        "telegram_chat_id", "telegram_keyword",
    }
    account_changes = {}
    for key, value in payload.items():
        if key in updatable:
            account_changes[key] = value
    await update_account(account_id, **account_changes)

    refreshed = await get_account(account_id)
    return mask_account(refreshed)
@router.delete("/my/email-accounts/{account_id}")
async def delete_my_email_account(request: Request, account_id: str):
    """Stop, then delete one of the caller's email accounts and its linked agent."""
    current = _require_auth(request)
    from ..inbox.accounts import get_account, delete_account
    from ..inbox.listener import inbox_listener
    from ..agents.tasks import delete_agent

    account = await get_account(account_id)
    if not (account and str(account.get("user_id")) == current.id):
        raise HTTPException(status_code=404, detail="Account not found")

    inbox_listener.stop_account(account_id)
    # Remember the agent before the account row disappears.
    agent_to_remove = account.get("agent_id")
    await delete_account(account_id)
    if agent_to_remove:
        await delete_agent(agent_to_remove)
    return {"ok": True}
@router.post("/my/email-accounts/{account_id}/toggle")
async def toggle_my_email_account(request: Request, account_id: str):
    """Toggle one of the caller's email accounts and start/stop its listener to match."""
    current = _require_auth(request)
    from ..inbox.accounts import get_account, toggle_account, mask_account
    from ..inbox.listener import inbox_listener

    account = await get_account(account_id)
    if not (account and str(account.get("user_id")) == current.id):
        raise HTTPException(status_code=404, detail="Account not found")

    await toggle_account(account_id)
    refreshed = await get_account(account_id)
    if not refreshed.get("enabled"):
        inbox_listener.stop_account(account_id)
    else:
        inbox_listener.start_account(account_id, refreshed)
    return mask_account(refreshed)
@router.post("/my/email-accounts/{account_id}/reconnect")
async def reconnect_my_email_account(request: Request, account_id: str):
    """Restart the listener for one of the caller's email accounts."""
    current = _require_auth(request)
    from ..inbox.accounts import get_account
    from ..inbox.listener import inbox_listener

    account = await get_account(account_id)
    if not (account and str(account.get("user_id")) == current.id):
        raise HTTPException(status_code=404, detail="Account not found")

    inbox_listener.restart_account(account_id, account)
    return {"ok": True}
@router.post("/my/email-accounts/{account_id}/pause")
async def pause_my_email_account(request: Request, account_id: str):
    """Pause one of the caller's email accounts and stop its listener."""
    current = _require_auth(request)
    from ..inbox.accounts import get_account, pause_account
    from ..inbox.listener import inbox_listener

    account = await get_account(account_id)
    if not (account and str(account.get("user_id")) == current.id):
        raise HTTPException(status_code=404, detail="Account not found")

    await pause_account(account_id)
    inbox_listener.stop_account(account_id)
    return {"ok": True, "paused": True}
@router.post("/my/email-accounts/{account_id}/resume")
async def resume_my_email_account(request: Request, account_id: str):
    """Resume one of the caller's email accounts; restart its listener if enabled."""
    current = _require_auth(request)
    from ..inbox.accounts import get_account, resume_account
    from ..inbox.listener import inbox_listener

    account = await get_account(account_id)
    if not (account and str(account.get("user_id")) == current.id):
        raise HTTPException(status_code=404, detail="Account not found")

    await resume_account(account_id)
    refreshed = await get_account(account_id)
    should_listen = bool(refreshed) and bool(refreshed.get("enabled"))
    if should_listen:
        inbox_listener.start_account(account_id, refreshed)
    return {"ok": True, "paused": False}
# ── Per-user Brain MCP key ────────────────────────────────────────────────────
import secrets as _secrets
async def _get_or_create_brain_key(user_id: str) -> str:
    """Fetch the user's brain MCP key, creating and storing a new one if absent."""
    existing = await _user_settings_store.get(user_id, "brain_mcp_key")
    if existing:
        return existing
    fresh = _secrets.token_hex(32)
    await _user_settings_store.set(user_id, "brain_mcp_key", fresh)
    return fresh
@router.get("/my/brain/key")
async def get_my_brain_key(request: Request):
    """Return the caller's brain MCP key, generating one on first use."""
    current = _require_auth(request)
    brain_key = await _get_or_create_brain_key(current.id)
    return {"key": brain_key}
@router.post("/my/brain/key/regenerate")
async def regenerate_my_brain_key(request: Request):
    """Replace the caller's brain MCP key with a new random one and return it."""
    current = _require_auth(request)
    fresh = _secrets.token_hex(32)
    await _user_settings_store.set(current.id, "brain_mcp_key", fresh)
    return {"key": fresh}
@router.get("/my/data-folder")
async def get_my_data_folder(request: Request):
    """Return the caller's user folder path as reported by get_user_folder(), or ""."""
    current = _require_auth(request)
    from ..users import get_user_folder

    folder = await get_user_folder(current.id)
    if not folder:
        folder = ""
    return {"data_folder": folder}
# ── Per-user SSH key ───────────────────────────────────────────────────────────
@router.get("/my/ssh/pubkey")
async def get_my_ssh_pubkey(request: Request):
    """Return the caller's ed25519 public key from their user folder, if present.

    The response carries ``no_folder: True`` when the user has no folder.
    """
    current = _require_auth(request)
    from ..users import get_user_folder
    from pathlib import Path

    folder = await get_user_folder(current.id)
    if not folder:
        return {"exists": False, "pubkey": None, "no_folder": True}

    public_key_file = Path(folder, ".ssh", "id_ed25519.pub")
    if public_key_file.exists():
        return {"exists": True, "pubkey": public_key_file.read_text().strip()}
    return {"exists": False, "pubkey": None}
@router.post("/my/ssh/generate")
async def generate_my_ssh_key(request: Request):
    """Generate an ed25519 key pair in the caller's ``.ssh`` folder via ssh-keygen.

    The JSON body may carry ``force``; without it an existing key yields 409.
    Raises 400 when the user has no folder and 500 when ssh-keygen fails.
    Returns the new public key.
    """
    import asyncio
    from pathlib import Path

    current = _require_auth(request)
    payload = await request.json()
    overwrite = bool(payload.get("force", False))

    from ..users import get_user_folder

    folder = await get_user_folder(current.id)
    if not folder:
        raise HTTPException(status_code=400, detail="No user folder configured. Ask your administrator to set system:users_base_folder.")

    ssh_dir = Path(folder) / ".ssh"
    private_key = ssh_dir / "id_ed25519"
    public_key = ssh_dir / "id_ed25519.pub"

    if not overwrite and private_key.exists():
        raise HTTPException(status_code=409, detail="SSH key already exists.")

    ssh_dir.mkdir(mode=0o700, parents=True, exist_ok=True)

    if overwrite:
        # Remove any previous pair before ssh-keygen writes the new one.
        for stale in (private_key, public_key):
            stale.unlink(missing_ok=True)

    key_comment = getattr(current, "email", "") or getattr(current, "username", "agent")
    command = [
        "ssh-keygen", "-t", "ed25519", "-C", key_comment,
        "-f", str(private_key), "-N", "",
    ]
    proc = await asyncio.create_subprocess_exec(
        *command,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    _, stderr = await proc.communicate()
    if proc.returncode != 0:
        raise HTTPException(status_code=500, detail=f"ssh-keygen failed: {stderr.decode()}")

    private_key.chmod(0o600)
    public_key.chmod(0o644)
    ssh_dir.chmod(0o700)

    return {"pubkey": public_key.read_text().strip()}
# ── Per-user file browser ──────────────────────────────────────────────────────
import io as _io
import os as _os
import zipfile as _zipfile
from datetime import datetime as _dt, timezone as _tz
from fastapi.responses import FileResponse, StreamingResponse
def _resolve_user_path(base: str, rel_path: str) -> str:
    """Return the real path of ``rel_path`` inside ``base``.

    Leading slashes/backslashes on ``rel_path`` are dropped so it is always
    joined relative to ``base``. Raises a 403 HTTPException when the resolved
    path is neither ``base`` itself nor located underneath it.
    """
    root = _os.path.realpath(base)
    if rel_path:
        candidate = _os.path.join(root, rel_path.lstrip("/\\"))
    else:
        candidate = root
    resolved = _os.path.realpath(candidate)
    stays_inside = resolved == root or resolved.startswith(root + _os.sep)
    if not stays_inside:
        raise HTTPException(status_code=403, detail="Access denied")
    return resolved
def _fmt_size(n: int) -> str:
    """Format a byte count as a short human-readable string.

    Values below 1024 are shown as whole bytes (``"512 B"``); larger values are
    scaled by 1024 per step and shown with one decimal in KB, MB or GB. GB is
    the largest unit, so bigger values stay in GB (``"2048.0 GB"``).
    """
    units = ("B", "KB", "MB", "GB")
    size = float(n)
    index = 0
    # Scale exactly once per unit step; the value must not be divided again
    # when it is formatted, or every non-byte size comes out 1024x too small.
    while size >= 1024 and index < len(units) - 1:
        size /= 1024
        index += 1
    unit = units[index]
    if unit == "B":
        return f"{size:.0f} {unit}"
    return f"{size:.1f} {unit}"
@router.get("/my/files")
async def list_my_files(request: Request, path: str = ""):
    """List one directory of the caller's personal folder.

    Returns ``{"path", "parent", "entries"}`` where all paths are relative to the
    user's folder, ``parent`` is ``None`` at the top level, and entries are
    ordered directories first, then by case-insensitive name. Entries that fail
    to stat are skipped; an unreadable directory yields an empty entry list.

    Raises:
        HTTPException 404: no folder configured, folder missing, or path not found.
        HTTPException 400: the path is not a directory.
        HTTPException 403: the path escapes the user's folder.
    """
    user = _require_auth(request)
    from ..users import get_user_folder

    folder = await get_user_folder(user.id)
    if not folder:
        raise HTTPException(status_code=404, detail="no_folder")
    if not _os.path.isdir(folder):
        raise HTTPException(status_code=404, detail="folder_missing")

    current = _resolve_user_path(folder, path)
    if not _os.path.exists(current):
        raise HTTPException(status_code=404, detail="Path not found")
    if not _os.path.isdir(current):
        raise HTTPException(status_code=400, detail="Path is not a directory")

    root = _os.path.realpath(folder)

    def _dirs_first(item):
        # False sorts before True, so directories come first.
        return (not item.is_dir(), item.name.lower())

    listing = []
    try:
        for item in sorted(_os.scandir(current), key=_dirs_first):
            try:
                info = item.stat(follow_symlinks=False)
                relative = _os.path.relpath(item.path, root)
                directory = item.is_dir()
                listing.append({
                    "name": item.name,
                    "path": relative,
                    "is_dir": directory,
                    "size": None if directory else info.st_size,
                    "modified": _dt.fromtimestamp(info.st_mtime, tz=_tz.utc).isoformat(),
                })
            except OSError:
                # Entry vanished or is unreadable: leave it out of the listing.
                continue
    except PermissionError:
        pass

    if current == root:
        parent = None
    else:
        parent = _os.path.relpath(_os.path.dirname(current), root)
        if parent == ".":
            parent = ""

    here = _os.path.relpath(current, root)
    return {
        "path": "" if here == "." else here,
        "parent": parent,
        "entries": listing,
    }
@router.get("/my/files/download")
async def download_my_file(request: Request, path: str):
    """Send a single file from the caller's folder as a binary download.

    Raises:
        HTTPException 404: no folder configured or the path is not a file.
        HTTPException 403: the path escapes the user's folder.
    """
    user = _require_auth(request)
    from ..users import get_user_folder

    folder = await get_user_folder(user.id)
    if not folder:
        raise HTTPException(status_code=404, detail="No files folder configured")

    file_path = _resolve_user_path(folder, path)
    if _os.path.isfile(file_path):
        download_name = _os.path.basename(file_path)
        return FileResponse(
            file_path,
            filename=download_name,
            media_type="application/octet-stream",
        )
    raise HTTPException(status_code=404, detail="File not found")
# File extensions (lower-case, including the leading dot) that the
# /my/files/view endpoint accepts for inline text viewing; any other
# extension is answered with HTTP 415.
_FB_TEXT_EXTS = {
    ".md", ".txt", ".json", ".xml", ".yaml", ".yml", ".csv",
    ".html", ".htm", ".css", ".js", ".ts", ".jsx", ".tsx",
    ".py", ".sh", ".bash", ".zsh", ".log", ".sql", ".toml",
    ".ini", ".conf", ".cfg", ".env", ".gitignore", ".dockerfile",
    ".rst", ".tex", ".diff", ".patch", ".nfo", ".tsv",
}
@router.get("/my/files/view")
async def view_my_file(request: Request, path: str):
    """Return the text content of a file in the caller's folder for inline viewing.

    Only files whose extension is in ``_FB_TEXT_EXTS`` are served. The file is
    decoded as UTF-8 with undecodable bytes replaced, and at most 512 * 1024
    characters are read.

    Returns:
        ``{"content", "size", "truncated"}`` where ``size`` is the on-disk size
        in bytes and ``truncated`` is True when that size exceeds the read limit.

    Raises:
        HTTPException 404: no folder configured or the path is not a file.
        HTTPException 403: the path escapes the user's folder.
        HTTPException 415: the file type is not on the allow-list.
        HTTPException 500: the file could not be read.
    """
    user = _require_auth(request)
    from ..users import get_user_folder
    base = await get_user_folder(user.id)
    if not base:
        raise HTTPException(status_code=404, detail="No files folder configured")
    target = _resolve_user_path(base, path)
    if not _os.path.isfile(target):
        raise HTTPException(status_code=404, detail="File not found")

    # os.path.splitext() treats a leading dot as part of the root, so bare
    # dotfiles such as ".gitignore" or ".env" report an empty extension. Fall
    # back to the whole name so the matching allow-list entries can apply.
    name = _os.path.basename(target).lower()
    ext = _os.path.splitext(name)[1]
    if not ext and name.startswith("."):
        ext = name
    if ext not in _FB_TEXT_EXTS:
        raise HTTPException(status_code=415, detail="File type not supported for viewing")

    size = _os.path.getsize(target)
    _MAX_VIEW = 512 * 1024  # 512 KB
    try:
        with open(target, "r", encoding="utf-8", errors="replace") as fh:
            content = fh.read(_MAX_VIEW)
    except OSError as exc:
        raise HTTPException(status_code=500, detail=str(exc)) from exc
    return {"content": content, "size": size, "truncated": size > _MAX_VIEW}
@router.get("/my/files/download-zip")
async def download_my_zip(request: Request, path: str = ""):
    """Download a directory of the caller's folder as a ZIP archive.

    The archive is built in memory on the default executor so the event loop is
    not blocked. Files are added in sorted order with paths relative to the
    requested directory. Files that cannot be read, and files whose real path
    lies outside the user's folder (symlinks pointing elsewhere), are skipped.

    Raises:
        HTTPException 404: no folder configured.
        HTTPException 400: the path is not a directory.
        HTTPException 403: the path escapes the user's folder.
    """
    import asyncio as _asyncio2
    from urllib.parse import quote as _quote

    user = _require_auth(request)
    from ..users import get_user_folder
    base = await get_user_folder(user.id)
    if not base:
        raise HTTPException(status_code=404, detail="No files folder configured")
    target = _resolve_user_path(base, path)
    if not _os.path.isdir(target):
        raise HTTPException(status_code=400, detail="Path is not a directory")
    folder_name = _os.path.basename(target) or user.username
    real_base = _os.path.realpath(base)

    def _inside_base(candidate: str) -> bool:
        # Same containment rule as _resolve_user_path, applied per file:
        # ZipFile.write() follows symlinks, so a link to a file outside the
        # user's folder would otherwise leak that file's content.
        real = _os.path.realpath(candidate)
        return real == real_base or real.startswith(real_base + _os.sep)

    def _make_zip() -> bytes:
        buf = _io.BytesIO()
        with _zipfile.ZipFile(buf, "w", _zipfile.ZIP_DEFLATED) as zf:
            for root, dirs, files in _os.walk(target):
                dirs[:] = sorted(dirs)  # in-place so os.walk descends in sorted order
                for fname in sorted(files):
                    fpath = _os.path.join(root, fname)
                    if not _inside_base(fpath):
                        continue
                    arcname = _os.path.relpath(fpath, target)
                    try:
                        zf.write(fpath, arcname)
                    except OSError:
                        # Best effort: an unreadable file must not abort the archive.
                        continue
        return buf.getvalue()

    zip_bytes = await _asyncio2.get_running_loop().run_in_executor(None, _make_zip)

    # Plain names keep the simple quoted form. Names with quotes, spaces or
    # non-ASCII characters use the percent-encoded filename* form, since header
    # values must be latin-1 encodable and a raw quote would break the header.
    zip_name = f"{folder_name}.zip"
    encoded_name = _quote(zip_name)
    if encoded_name == zip_name:
        disposition = f'attachment; filename="{zip_name}"'
    else:
        disposition = f"attachment; filename*=utf-8''{encoded_name}"
    return StreamingResponse(
        iter([zip_bytes]),
        media_type="application/zip",
        headers={"Content-Disposition": disposition},
    )
@router.get("/my/files/upload-policy")
async def get_upload_policy(request: Request):
    """Return the current upload policy to any authenticated user."""
    _require_auth(request)
    policy = await _get_upload_policy()
    return policy
@router.post("/my/files/upload")
async def upload_my_files(request: Request, path: str = ""):
    """Store uploaded files in a directory of the caller's folder.

    Every file part of the multipart form is considered, including several
    parts sent under the same field name. Each file is checked against the
    upload policy (allowed extensions, or allowed exact names for files
    without an extension, and maximum size); files failing a check are reported
    in ``rejected`` instead of aborting the whole upload.

    Returns:
        ``{"ok": True, "uploaded": [names], "rejected": [{"name", "reason"}]}``.

    Raises:
        HTTPException 404: no folder configured.
        HTTPException 400: the target is not a directory, or the request
            carries more files than the policy allows.
        HTTPException 403: a destination path escapes the user's folder.
    """
    user = _require_auth(request)
    from ..users import get_user_folder
    base = await get_user_folder(user.id)
    if not base:
        raise HTTPException(status_code=404, detail="No files folder configured")
    target_dir = _resolve_user_path(base, path)
    if not _os.path.isdir(target_dir):
        raise HTTPException(status_code=400, detail="Target path is not a directory")

    policy = await _get_upload_policy()
    allowed_exts = {e.lower() for e in policy["allowed_extensions"]}
    max_bytes = policy["max_file_size_mb"] * 1024 * 1024
    max_files = policy["max_files"]

    form = await request.form()
    # multi_items() yields every (name, value) pair. values() keeps only one
    # value per field name, which silently drops files when a client sends
    # several of them under the same field.
    fields = [
        value for _, value in form.multi_items()
        if getattr(value, "filename", None)
    ]
    if len(fields) > max_files:
        raise HTTPException(status_code=400, detail=f"Too many files — max {max_files} per upload")

    uploaded = []
    rejected = []
    for field in fields:
        # Drop any directory part the client put into the filename.
        safe_name = _os.path.basename(field.filename)
        if not safe_name:
            continue
        has_ext = "." in safe_name
        ext = safe_name.rsplit(".", 1)[-1].lower() if has_ext else ""
        name_lower = safe_name.lower()
        # Extensionless files are only accepted when their exact name is allow-listed.
        allowed = (ext in allowed_exts) if has_ext else (name_lower in _UPLOAD_ALLOWED_EXACT_NAMES)
        if not allowed:
            reason = f"File type .{ext} not allowed" if has_ext else f"Extensionless file '{safe_name}' not permitted"
            rejected.append({"name": safe_name, "reason": reason})
            continue
        content = await field.read()
        if len(content) > max_bytes:
            rejected.append({"name": safe_name, "reason": f"File exceeds {policy['max_file_size_mb']} MB limit"})
            continue
        # Re-resolve the final path so the containment check covers the file name too.
        dest = _resolve_user_path(base, _os.path.join(path, safe_name))
        with open(dest, "wb") as f:
            f.write(content)
        uploaded.append(safe_name)
    return {"ok": True, "uploaded": uploaded, "rejected": rejected}
@router.delete("/my/files")
async def delete_my_file(request: Request, path: str):
    """Delete a single file from the caller's data folder.

    Files whose name starts with ``memory_`` or ``reasoning_`` are protected and
    are always refused with 403, even when they do not exist. Directories are
    refused with 400.
    TODO: replace hard-coded prefixes with a .jarvisignore-style mechanism.
    """
    user = _require_auth(request)
    from ..users import get_user_folder

    folder = await get_user_folder(user.id)
    if not folder:
        raise HTTPException(status_code=404, detail="No files folder configured")

    victim = _resolve_user_path(folder, path)
    filename = _os.path.basename(victim)
    if filename.startswith(("memory_", "reasoning_")):
        raise HTTPException(status_code=403, detail=f"'{filename}' is a protected file and cannot be deleted")
    if not _os.path.exists(victim):
        raise HTTPException(status_code=404, detail="File not found")
    if _os.path.isdir(victim):
        raise HTTPException(status_code=400, detail="Use a dedicated endpoint to delete directories")

    _os.remove(victim)
    return {"ok": True}
@router.get("/my/email-accounts/available-extra-tools")
async def available_extra_tools(request: Request):
    """List the notification tools that are configured for the current user.

    Telegram is offered when the user has a personal bot token, or when the user
    is an admin and a global token exists. Pushover relies on global credentials
    and is offered to admins only.
    """
    user = _require_auth(request)
    is_admin = user.role == "admin"
    tools = []

    # Telegram: personal token first, global token as an admin-only fallback.
    telegram_token = await _user_settings_store.get(user.id, "telegram_bot_token")
    if not telegram_token and is_admin:
        telegram_token = await credential_store.get("telegram:bot_token")
    if telegram_token:
        tools.append({"id": "telegram", "label": "Telegram", "description": "Send Telegram messages to whitelisted chats"})

    # Pushover: both global credentials must be present.
    if is_admin:
        app_token = await credential_store.get("pushover_app_token")
        user_key = await credential_store.get("pushover_user_key")
        if app_token and user_key:
            tools.append({"id": "pushover", "label": "Pushover", "description": "Send push notifications to your phone"})

    return tools
@router.get("/my/telegram/whitelisted-chats")
async def my_whitelisted_telegram_chats(request: Request):
    """Return the Telegram chats available to the current user for email account binding.

    The user's own whitelist is merged with the ``GLOBAL`` one; when a chat id
    appears in both, the user's own row wins. A missing label falls back to the
    chat id.
    """
    user = _require_auth(request)
    from ..telegram.triggers import list_whitelist

    own_rows = await list_whitelist(user.id)
    global_rows = await list_whitelist("GLOBAL")

    # A dict keeps insertion order, so the first occurrence of a chat id both
    # fixes its position and blocks later duplicates.
    merged: dict = {}
    for row in own_rows + global_rows:
        chat_id = row["chat_id"]
        if chat_id in merged:
            continue
        merged[chat_id] = {"chat_id": chat_id, "label": row.get("label") or chat_id}
    return list(merged.values())
@router.post("/my/email-accounts/list-folders-preview")
async def list_folders_preview(request: Request):
    """Load the IMAP folder list using credentials supplied in the request (for new accounts).

    Request body (JSON): ``imap_host``, ``imap_username``, ``imap_password`` and
    an optional ``imap_port`` (default 993).

    Raises:
        HTTPException 400: the body is not a JSON object, a required field is
            missing, the port is not numeric, or the folder listing failed.
    """
    _require_auth(request)
    body = await request.json()
    if not isinstance(body, dict):
        raise HTTPException(status_code=400, detail="Request body must be a JSON object")

    # "or ''" also covers explicit JSON nulls, which would otherwise crash on .strip().
    host = str(body.get("imap_host") or "").strip()
    username = str(body.get("imap_username") or "").strip()
    password = body.get("imap_password") or ""
    try:
        port = int(body.get("imap_port") or 993)
    except (TypeError, ValueError):
        raise HTTPException(status_code=400, detail="imap_port must be a number") from None
    if not host or not username or not password:
        raise HTTPException(status_code=400, detail="host, username and password are required")

    from ..tools.email_handling_tool import EmailHandlingTool
    tool = EmailHandlingTool(account={
        "imap_host": host, "imap_port": port,
        "imap_username": username, "imap_password": password,
        "monitored_folders": None,  # show all folders for picker
    })
    result = await tool._list_folders()
    if not result.success:
        raise HTTPException(status_code=400, detail=result.error)
    return result.data
@router.post("/my/email-accounts/{account_id}/list-folders")
async def list_folders_for_account(request: Request, account_id: str):
    """Return the folder list of one of the caller's stored email accounts.

    Accounts that do not exist or belong to another user are both answered with
    404. A failed folder listing is answered with 400.
    """
    user = _require_auth(request)
    from ..inbox.accounts import get_account
    from ..tools.email_handling_tool import EmailHandlingTool

    account = await get_account(account_id)
    owned = bool(account) and str(account.get("user_id")) == user.id
    if not owned:
        raise HTTPException(status_code=404, detail="Account not found")

    # Clear the folder filter so the picker sees every server folder,
    # not just the currently monitored selection.
    unfiltered = dict(account, monitored_folders=None)
    outcome = await EmailHandlingTool(account=unfiltered)._list_folders()
    if outcome.success:
        return outcome.data
    raise HTTPException(status_code=400, detail=outcome.error)
# ── Per-user CalDAV config ─────────────────────────────────────────────────────
@router.get("/my/caldav/config")
async def get_my_caldav_config(request: Request):
    """Return the caller's stored CalDAV/CardDAV settings.

    Text settings that are not stored come back as empty strings; the two flags
    (``carddav_same_as_caldav``, ``contacts_allow_write``) are True only when the
    stored value is ``"1"``.
    """
    user = _require_auth(request)

    # (response key, stored setting key, is boolean flag)
    layout = (
        ("url", "caldav_url", False),
        ("username", "caldav_username", False),
        ("password", "caldav_password", False),
        ("calendar_name", "caldav_calendar_name", False),
        ("carddav_same_as_caldav", "carddav_same_as_caldav", True),
        ("carddav_url", "carddav_url", False),
        ("carddav_username", "carddav_username", False),
        ("carddav_password", "carddav_password", False),
        ("contacts_allow_write", "contacts_allow_write", True),
    )

    config = {}
    for response_key, setting_key, is_flag in layout:
        stored = await _user_settings_store.get(user.id, setting_key)
        config[response_key] = (stored == "1") if is_flag else (stored or "")
    return config
@router.post("/my/caldav/config")
async def set_my_caldav_config(request: Request):
    """Store the caller's CalDAV/CardDAV settings from a JSON body.

    Non-empty text values are saved; empty ones delete the stored setting,
    except for the two password fields, where an empty value leaves the stored
    password untouched. The two flags are stored as ``"1"`` or deleted.
    """
    user = _require_auth(request)
    body = await request.json()

    async def _apply_text(body_key, setting_key, *, keep_when_blank):
        # Save a non-empty value; otherwise delete unless the field is a
        # password that should survive a blank submission.
        text = (body.get(body_key) or "").strip()
        if text:
            await _user_settings_store.set(user.id, setting_key, text)
        elif not keep_when_blank:
            await _user_settings_store.delete(user.id, setting_key)

    # CalDAV fields
    await _apply_text("url", "caldav_url", keep_when_blank=False)
    await _apply_text("username", "caldav_username", keep_when_blank=False)
    await _apply_text("password", "caldav_password", keep_when_blank=True)
    await _apply_text("calendar_name", "caldav_calendar_name", keep_when_blank=False)

    # CardDAV fields
    if bool(body.get("carddav_same_as_caldav")):
        await _user_settings_store.set(user.id, "carddav_same_as_caldav", "1")
        # Separate CardDAV credentials are not needed when the same server is used.
        for leftover in ("carddav_url", "carddav_username", "carddav_password"):
            await _user_settings_store.delete(user.id, leftover)
    else:
        await _user_settings_store.delete(user.id, "carddav_same_as_caldav")
        await _apply_text("carddav_url", "carddav_url", keep_when_blank=False)
        await _apply_text("carddav_username", "carddav_username", keep_when_blank=False)
        await _apply_text("carddav_password", "carddav_password", keep_when_blank=True)

    # Per-user contacts write permission
    if bool(body.get("contacts_allow_write")):
        await _user_settings_store.set(user.id, "contacts_allow_write", "1")
    else:
        await _user_settings_store.delete(user.id, "contacts_allow_write")

    return {"ok": True}
@router.post("/my/caldav/test")
async def test_my_caldav_config(request: Request):
    """Try to connect to the caller's configured CalDAV server.

    Always answers with ``{"success": bool, "message": str}``; connection
    errors are reported in ``message`` rather than raised.
    """
    user = _require_auth(request)
    from ..tools.caldav_tool import _get_caldav_config
    cfg = await _get_caldav_config(user_id=user.id)
    if not cfg.get("url") or not cfg.get("username") or not cfg.get("password"):
        return {"success": False, "message": "CalDAV credentials not configured"}

    def _count_calendars() -> int:
        # The calls below are synchronous network I/O, so this helper runs on
        # the executor instead of inside the coroutine.
        import caldav
        url = cfg["url"]
        if not url.startswith(("http://", "https://")):
            url = "https://" + url
        # Bare host URLs are expanded to the per-user SOGo DAV path.
        if "/SOGo/dav/" not in url:
            url = f"{url.rstrip('/')}/SOGo/dav/{cfg['username']}/"
        client = caldav.DAVClient(url=url, username=cfg["username"], password=cfg["password"])
        principal = client.principal()
        return len(principal.calendars())

    try:
        import asyncio
        found = await asyncio.get_running_loop().run_in_executor(None, _count_calendars)
        return {"success": True, "message": f"Connected — {found} calendar(s) found"}
    except Exception as e:
        # Deliberately broad: any failure is a test result, not a server error.
        return {"success": False, "message": str(e)}
@router.post("/my/caldav/test-carddav")
async def test_my_carddav_config(request: Request):
    """Probe the caller's CardDAV address book with a depth-0 PROPFIND.

    Always answers with ``{"success": bool, "message": str}``; HTTP 200 and 207
    count as success, and exceptions are reported in ``message``.
    """
    user = _require_auth(request)
    from ..tools.contacts_tool import _get_carddav_config, _sogo_carddav_url

    cfg = await _get_carddav_config(user_id=user.id)
    if not (cfg.get("url") and cfg.get("username") and cfg.get("password")):
        return {"success": False, "message": "CardDAV credentials not configured"}

    try:
        import httpx

        abook_url = _sogo_carddav_url(cfg["url"], cfg["username"])
        propfind_xml = (
            '<?xml version="1.0" encoding="utf-8"?>'
            '<D:propfind xmlns:D="DAV:"><D:prop><D:resourcetype/></D:prop></D:propfind>'
        )
        request_headers = {"Content-Type": "application/xml; charset=utf-8", "Depth": "0"}
        credentials = (cfg["username"], cfg["password"])
        async with httpx.AsyncClient(auth=credentials, timeout=10) as client:
            response = await client.request(
                "PROPFIND", abook_url,
                content=propfind_xml,
                headers=request_headers,
            )
        if response.status_code in (200, 207):
            return {"success": True, "message": f"Connected to {abook_url}"}
        return {"success": False, "message": f"Server returned HTTP {response.status_code}"}
    except Exception as e:
        return {"success": False, "message": str(e)}
@router.delete("/my/caldav/config")
async def delete_my_caldav_config(request: Request):
    """Remove every stored CalDAV, CardDAV and contacts-permission setting of the caller."""
    user = _require_auth(request)
    caldav_keys = ("caldav_url", "caldav_username", "caldav_password", "caldav_calendar_name")
    carddav_keys = ("carddav_same_as_caldav", "carddav_url", "carddav_username", "carddav_password")
    for setting_key in (*caldav_keys, *carddav_keys, "contacts_allow_write"):
        await _user_settings_store.delete(user.id, setting_key)
    return {"ok": True}
# ── Inbox all-accounts status (admin) ─────────────────────────────────────────
@router.get("/inbox/accounts")
async def list_inbox_accounts(request: Request):
    """Admin only: return all inbox accounts, each passed through ``mask_account``."""
    _require_admin(request)
    from ..inbox.accounts import list_accounts, mask_account

    every_account = await list_accounts()
    return list(map(mask_account, every_account))
import asyncio as _asyncio
async def asyncio_gather(*coros):
    """Await all given awaitables concurrently and return their results in argument order."""
    results = await _asyncio.gather(*coros)
    return results
# ── MFA (TOTP) setup & management ─────────────────────────────────────────────
#
# Pending secrets are held in memory until confirmed; never written to DB until
# the user successfully verifies their first TOTP code.
import time as _time
# Process-local registry of TOTP secrets that are awaiting confirmation. An
# entry is written when setup begins and removed on successful confirmation
# or once it is found to be expired.
_pending_mfa: dict[str, tuple[str, float]] = {} # user_id → (secret, expiry_epoch)
# Lifetime of a pending entry in seconds, added to time.time() when setup begins.
_MFA_SETUP_TTL = 600 # 10 minutes to confirm
@router.get("/my/mfa/status")
async def mfa_status(request: Request):
    """Report whether the current user has a TOTP secret stored (``{"enabled": bool}``)."""
    user = _require_auth(request)
    from ..users import get_user_by_id

    record = await get_user_by_id(user.id)
    enabled = bool(record and record.get("totp_secret"))
    return {"enabled": enabled}
@router.post("/my/mfa/setup/begin")
async def mfa_setup_begin(request: Request):
    """Start TOTP enrolment for the current user.

    A fresh secret is generated and kept in ``_pending_mfa`` with an expiry of
    ``_MFA_SETUP_TTL`` seconds from now, replacing any earlier pending secret.
    Returns the secret together with a QR code for the provisioning URI.
    """
    user = _require_auth(request)
    from ..auth import generate_totp_secret, make_totp_provisioning_uri, make_totp_qr_png_b64

    secret = generate_totp_secret()
    expires_at = _time.time() + _MFA_SETUP_TTL
    _pending_mfa[user.id] = (secret, expires_at)

    provisioning_uri = make_totp_provisioning_uri(secret, user.username)
    return {"qr": make_totp_qr_png_b64(provisioning_uri), "secret": secret}
# Request body for POST /my/mfa/setup/confirm.
class MfaConfirmIn(BaseModel):
    code: str  # TOTP code entered by the user; stripped, then verified against the pending secret
@router.post("/my/mfa/setup/confirm")
async def mfa_setup_confirm(request: Request, body: MfaConfirmIn):
    """Finish TOTP enrolment by verifying a code against the pending secret.

    The secret is written to the user record only after the code verifies; the
    pending entry is then discarded. A wrong code leaves the pending entry in
    place so the user can retry until it expires.

    Raises:
        HTTPException 400: no pending setup, the setup expired, or the code is invalid.
    """
    user = _require_auth(request)

    pending = _pending_mfa.get(user.id)
    expired = not pending or _time.time() > pending[1]
    if expired:
        _pending_mfa.pop(user.id, None)
        raise HTTPException(status_code=400, detail="Setup session expired. Please start again.")

    secret = pending[0]
    from ..auth import verify_totp
    code_ok = verify_totp(secret, body.code.strip())
    if not code_ok:
        raise HTTPException(status_code=400, detail="Invalid code. Try again.")

    from ..users import update_user
    await update_user(user.id, totp_secret=secret)
    _pending_mfa.pop(user.id, None)
    return {"ok": True}
# Request body for DELETE /my/mfa/disable; both values must verify before MFA is removed.
class MfaDisableIn(BaseModel):
    password: str  # checked against the user's stored password hash
    code: str  # current TOTP code; stripped, then verified against the stored secret
@router.delete("/my/mfa/disable")
async def mfa_disable(request: Request, body: MfaDisableIn):
    """Turn off TOTP for the current user after re-checking password and code.

    Raises:
        HTTPException 400: wrong password, MFA not enabled, or invalid TOTP code.
    """
    user = _require_auth(request)
    from ..auth import verify_password, verify_totp
    from ..users import get_user_by_username, update_user

    account = await get_user_by_username(user.username)
    password_ok = bool(account) and verify_password(body.password, account["password_hash"])
    if not password_ok:
        raise HTTPException(status_code=400, detail="Password is incorrect")

    stored_secret = account.get("totp_secret")
    if not stored_secret:
        raise HTTPException(status_code=400, detail="MFA is not enabled")

    submitted = body.code.strip()
    if not verify_totp(stored_secret, submitted):
        raise HTTPException(status_code=400, detail="Invalid TOTP code")

    await update_user(user.id, totp_secret=None)
    return {"ok": True}
@router.delete("/users/{user_id}/mfa")
async def admin_clear_mfa(request: Request, user_id: str):
    """Admin only: clear the stored TOTP secret of any user; 404 for unknown users."""
    _require_admin(request)
    from ..users import get_user_by_id, update_user

    target_user = await get_user_by_id(user_id)
    if not target_user:
        raise HTTPException(status_code=404, detail="User not found")
    await update_user(user_id, totp_secret=None)
    return {"ok": True}
# ── Theme ──────────────────────────────────────────────────────────────────────
@router.get("/my/theme")
async def get_my_theme(request: Request):
    """Return the caller's active theme id (default when none is stored) and the theme list."""
    user = _require_auth(request)
    from ..web.themes import DEFAULT_THEME, theme_list

    stored = await _user_settings_store.get(user.id, "theme")
    active = stored or DEFAULT_THEME
    return {"active": active, "themes": theme_list()}
# Request body for POST /my/theme.
class ThemeIn(BaseModel):
    theme_id: str  # must be a key of THEMES, otherwise the request is rejected with 400
@router.post("/my/theme")
async def set_my_theme(request: Request, body: ThemeIn):
    """Select the caller's theme.

    Choosing the default theme deletes the stored setting instead of saving it.

    Raises:
        HTTPException 400: the theme id is unknown.
    """
    user = _require_auth(request)
    from ..web.themes import THEMES, DEFAULT_THEME

    chosen = body.theme_id
    if chosen not in THEMES:
        raise HTTPException(status_code=400, detail="Unknown theme")

    if chosen != DEFAULT_THEME:
        await _user_settings_store.set(user.id, "theme", chosen)
    else:
        await _user_settings_store.delete(user.id, "theme")
    return {"ok": True}
# ── User profile (display name) ────────────────────────────────────────────────
@router.get("/my/profile")
async def get_my_profile(request: Request):
    """Return the caller's username, email and display name (empty strings when unset)."""
    user = _require_auth(request)
    from ..users import get_user_by_id

    record = await get_user_by_id(user.id)
    if record:
        return {
            "username": record["username"],
            "email": record.get("email") or "",
            "display_name": record.get("display_name") or "",
        }
    raise HTTPException(status_code=404, detail="User not found")
# Request body for POST /my/profile.
class ProfileUpdateIn(BaseModel):
    display_name: str = ""  # stripped before saving; a blank value is stored as None
@router.post("/my/profile")
async def update_my_profile(request: Request, body: ProfileUpdateIn):
    """Save the caller's display name; a blank name clears it (stored as ``None``)."""
    user = _require_auth(request)
    from ..users import update_user

    cleaned = body.display_name.strip()
    await update_user(user.id, display_name=cleaned or None)
    return {"ok": True}
# ── Webhook endpoints (inbound triggers) ──────────────────────────────────────
# Request body for creating an inbound webhook endpoint
# (POST /webhooks and POST /my/webhooks).
class WebhookEndpointIn(BaseModel):
    name: str  # required; rejected with 400 when blank after stripping
    agent_id: str = ""  # passed to create_endpoint as "" when not given
    description: str = ""
    allow_get: bool = True  # passed through to create_endpoint; presumably permits GET-triggered calls — confirm
# Partial-update body for an inbound webhook endpoint. Fields left as None are
# dropped before update_endpoint is called, so only supplied fields are passed on.
class WebhookEndpointUpdate(BaseModel):
    name: Optional[str] = None
    agent_id: Optional[str] = None
    description: Optional[str] = None
    allow_get: Optional[bool] = None
    enabled: Optional[bool] = None
@router.get("/webhooks")
async def list_webhooks(request: Request):
    """Admin only: return the inbound webhook endpoints from ``list_endpoints()``."""
    _require_admin(request)
    from ..webhooks.endpoints import list_endpoints

    endpoints = await list_endpoints()
    return endpoints
@router.post("/webhooks", status_code=201)
async def create_webhook(request: Request, body: WebhookEndpointIn):
    """Admin only: create an inbound webhook endpoint.

    The response includes the endpoint token — only time it's returned.

    Raises:
        HTTPException 400: the name is blank.
    """
    _require_admin(request)
    from ..webhooks.endpoints import create_endpoint

    name = body.name.strip()
    if not name:
        raise HTTPException(status_code=400, detail="Name is required")

    created = await create_endpoint(
        name=name,
        agent_id=body.agent_id or "",
        description=body.description,
        allow_get=body.allow_get,
    )
    return created
@router.put("/webhooks/{endpoint_id}")
async def update_webhook(request: Request, endpoint_id: str, body: WebhookEndpointUpdate):
    """Admin only: apply the supplied (non-None) fields to a webhook endpoint; 404 if unknown."""
    _require_admin(request)
    from ..webhooks.endpoints import update_endpoint

    changes = {}
    for field_name, value in body.model_dump().items():
        if value is not None:
            changes[field_name] = value

    updated = await update_endpoint(endpoint_id, **changes)
    if updated is None:
        raise HTTPException(status_code=404, detail="Webhook not found")
    return updated
@router.delete("/webhooks/{endpoint_id}")
async def delete_webhook(request: Request, endpoint_id: str):
    """Admin only: delete a webhook endpoint; 404 when nothing was deleted."""
    _require_admin(request)
    from ..webhooks.endpoints import delete_endpoint

    if await delete_endpoint(endpoint_id):
        return {"ok": True}
    raise HTTPException(status_code=404, detail="Webhook not found")
@router.post("/webhooks/{endpoint_id}/rotate")
async def rotate_webhook_token(request: Request, endpoint_id: str):
    """Admin only: rotate a webhook endpoint's token and return the new one; 404 if unknown."""
    _require_admin(request)
    from ..webhooks.endpoints import get_endpoint, rotate_token

    existing = await get_endpoint(endpoint_id)
    if existing is None:
        raise HTTPException(status_code=404, detail="Webhook not found")
    return {"ok": True, "token": await rotate_token(endpoint_id)}
# ── User-scoped webhook endpoints (non-admin) ─────────────────────────────────
@router.get("/my/webhooks")
async def list_my_webhooks(request: Request):
    """Return the inbound webhook endpoints owned by the current user."""
    _require_auth(request)
    owner_id = request.state.current_user["id"]
    from ..webhooks.endpoints import list_endpoints

    mine = await list_endpoints(owner_user_id=owner_id)
    return mine
@router.post("/my/webhooks", status_code=201)
async def create_my_webhook(request: Request, body: WebhookEndpointIn):
    """Create an inbound webhook endpoint owned by the current user.

    The response includes the endpoint token — only time it's returned.

    Raises:
        HTTPException 400: the name is blank.
    """
    _require_auth(request)
    owner_id = request.state.current_user["id"]
    from ..webhooks.endpoints import create_endpoint

    name = body.name.strip()
    if not name:
        raise HTTPException(status_code=400, detail="Name is required")

    created = await create_endpoint(
        name=name,
        agent_id=body.agent_id or "",
        description=body.description,
        allow_get=body.allow_get,
        owner_user_id=owner_id,
    )
    return created
@router.put("/my/webhooks/{endpoint_id}")
async def update_my_webhook(request: Request, endpoint_id: str, body: WebhookEndpointUpdate):
    """Apply the supplied (non-None) fields to one of the caller's webhook endpoints.

    Ownership is checked by looking the endpoint up with the caller as owner;
    endpoints not found that way are answered with 404.
    """
    _require_auth(request)
    owner_id = request.state.current_user["id"]
    from ..webhooks.endpoints import update_endpoint, get_endpoint

    owned = await get_endpoint(endpoint_id, owner_user_id=owner_id)
    if owned is None:
        raise HTTPException(status_code=404, detail="Webhook not found")

    changes = {}
    for field_name, value in body.model_dump().items():
        if value is not None:
            changes[field_name] = value
    return await update_endpoint(endpoint_id, **changes)
@router.delete("/my/webhooks/{endpoint_id}")
async def delete_my_webhook(request: Request, endpoint_id: str):
    """Delete one of the caller's webhook endpoints; 404 when nothing was deleted."""
    _require_auth(request)
    owner_id = request.state.current_user["id"]
    from ..webhooks.endpoints import delete_endpoint

    if await delete_endpoint(endpoint_id, owner_user_id=owner_id):
        return {"ok": True}
    raise HTTPException(status_code=404, detail="Webhook not found")
@router.post("/my/webhooks/{endpoint_id}/rotate")
async def rotate_my_webhook_token(request: Request, endpoint_id: str):
    """Rotate the token of one of the caller's webhook endpoints and return the new token."""
    _require_auth(request)
    owner_id = request.state.current_user["id"]
    from ..webhooks.endpoints import get_endpoint, rotate_token

    owned = await get_endpoint(endpoint_id, owner_user_id=owner_id)
    if owned is None:
        raise HTTPException(status_code=404, detail="Webhook not found")
    return {"ok": True, "token": await rotate_token(endpoint_id)}
# ── Webhook targets (outbound) ────────────────────────────────────────────────
# Request body for creating an outbound webhook target
# (POST /webhook-targets and POST /my/webhook-targets).
class WebhookTargetIn(BaseModel):
    name: str  # required; rejected with 400 when blank after stripping
    url: str  # required; rejected with 400 when blank after stripping
    secret_header: str = ""  # an empty value is stored as NULL
# Partial-update body for an outbound webhook target. Fields left as None are
# skipped; the remaining field names are used as column names in the UPDATE.
class WebhookTargetUpdate(BaseModel):
    name: Optional[str] = None
    url: Optional[str] = None
    secret_header: Optional[str] = None
    enabled: Optional[bool] = None
@router.get("/webhook-targets")
async def list_webhook_targets(request: Request):
    """Admin only: return all rows of ``webhook_targets`` ordered by name."""
    _require_admin(request)
    from ..database import get_pool

    pool = await get_pool()
    records = await pool.fetch("SELECT * FROM webhook_targets ORDER BY name")
    return list(map(dict, records))
@router.post("/webhook-targets", status_code=201)
async def create_webhook_target(request: Request, body: WebhookTargetIn):
    """Admin only: insert a new outbound webhook target and return the created row.

    Raises:
        HTTPException 400: name or URL is blank.
        HTTPException 409: the database reported a uniqueness violation.
    """
    _require_admin(request)

    name = body.name.strip()
    if not name:
        raise HTTPException(status_code=400, detail="Name is required")
    url = body.url.strip()
    if not url:
        raise HTTPException(status_code=400, detail="URL is required")

    from ..database import get_pool
    from datetime import datetime, timezone

    pool = await get_pool()
    created_at = datetime.now(timezone.utc).isoformat()
    insert_sql = (
        "\n"
        "INSERT INTO webhook_targets (name, url, secret_header, created_at)\n"
        "VALUES ($1, $2, $3, $4) RETURNING *\n"
    )
    try:
        record = await pool.fetchrow(
            insert_sql,
            name, url, body.secret_header or None,
            created_at,
        )
    except Exception as e:
        # Uniqueness violations are recognised by their message text.
        if "unique" in str(e).lower():
            raise HTTPException(status_code=409, detail="A target with that name already exists")
        raise
    return dict(record)
@router.put("/webhook-targets/{target_id}")
async def update_webhook_target(request: Request, target_id: str, body: WebhookTargetUpdate):
    """Admin only: update the supplied (non-None) fields of a webhook target.

    Raises:
        HTTPException 400: no field was supplied.
        HTTPException 404: no row with that id exists after the update.
    """
    _require_admin(request)

    changes = {}
    for column, value in body.model_dump().items():
        if value is not None:
            changes[column] = value
    if not changes:
        raise HTTPException(status_code=400, detail="No fields to update")

    from ..database import get_pool
    pool = await get_pool()

    # $1 is the row id, so value placeholders start at $2. Column names come
    # from the model's own field names, never from request text.
    assignments = [
        f"{column} = ${position}"
        for position, column in enumerate(changes, start=2)
    ]
    update_sql = "UPDATE webhook_targets SET " + ", ".join(assignments) + " WHERE id = $1::uuid"
    await pool.execute(update_sql, target_id, *changes.values())

    record = await pool.fetchrow("SELECT * FROM webhook_targets WHERE id = $1::uuid", target_id)
    if not record:
        raise HTTPException(status_code=404, detail="Target not found")
    return dict(record)
@router.delete("/webhook-targets/{target_id}")
async def delete_webhook_target(request: Request, target_id: str):
    """Admin only: delete a webhook target; 404 when no row was removed."""
    _require_admin(request)
    from ..database import get_pool, _rowcount

    pool = await get_pool()
    outcome = await pool.execute(
        "DELETE FROM webhook_targets WHERE id = $1::uuid", target_id
    )
    removed = _rowcount(outcome)
    if removed == 0:
        raise HTTPException(status_code=404, detail="Target not found")
    return {"ok": True}
# ── User-scoped webhook targets (non-admin) ───────────────────────────────────
@router.get("/my/webhook-targets")
async def list_my_webhook_targets(request: Request):
    """Return the webhook targets owned by the current user, ordered by name."""
    _require_auth(request)
    owner_id = request.state.current_user["id"]
    from ..database import get_pool

    pool = await get_pool()
    records = await pool.fetch(
        "SELECT * FROM webhook_targets WHERE owner_user_id = $1 ORDER BY name", owner_id
    )
    return list(map(dict, records))
@router.post("/my/webhook-targets", status_code=201)
async def create_my_webhook_target(request: Request, body: WebhookTargetIn):
    """Insert a new outbound webhook target owned by the current user and return the row.

    Raises:
        HTTPException 400: name or URL is blank.
        HTTPException 409: the database reported a uniqueness violation.
    """
    _require_auth(request)
    owner_id = request.state.current_user["id"]

    name = body.name.strip()
    if not name:
        raise HTTPException(status_code=400, detail="Name is required")
    url = body.url.strip()
    if not url:
        raise HTTPException(status_code=400, detail="URL is required")

    from ..database import get_pool
    from datetime import datetime, timezone

    pool = await get_pool()
    created_at = datetime.now(timezone.utc).isoformat()
    insert_sql = (
        "\n"
        "INSERT INTO webhook_targets (name, url, secret_header, owner_user_id, created_at)\n"
        "VALUES ($1, $2, $3, $4, $5) RETURNING *\n"
    )
    try:
        record = await pool.fetchrow(
            insert_sql,
            name, url, body.secret_header or None, owner_id, created_at,
        )
    except Exception as e:
        # Uniqueness violations are recognised by their message text.
        if "unique" in str(e).lower():
            raise HTTPException(status_code=409, detail="A target with that name already exists")
        raise
    return dict(record)
@router.put("/my/webhook-targets/{target_id}")
async def update_my_webhook_target(request: Request, target_id: str, body: WebhookTargetUpdate):
    """Update the supplied (non-None) fields of one of the caller's webhook targets.

    Raises:
        HTTPException 400: no field was supplied.
        HTTPException 404: the target does not exist or belongs to someone else.
    """
    _require_auth(request)
    owner_id = request.state.current_user["id"]

    changes = {}
    for column, value in body.model_dump().items():
        if value is not None:
            changes[column] = value
    if not changes:
        raise HTTPException(status_code=400, detail="No fields to update")

    from ..database import get_pool
    pool = await get_pool()

    # Ownership gate: the UPDATE below filters by id only.
    owned = await pool.fetchrow(
        "SELECT id FROM webhook_targets WHERE id = $1::uuid AND owner_user_id = $2",
        target_id, owner_id,
    )
    if not owned:
        raise HTTPException(status_code=404, detail="Target not found")

    # $1 is the row id, so value placeholders start at $2.
    assignments = [
        f"{column} = ${position}"
        for position, column in enumerate(changes, start=2)
    ]
    update_sql = "UPDATE webhook_targets SET " + ", ".join(assignments) + " WHERE id = $1::uuid"
    await pool.execute(update_sql, target_id, *changes.values())

    record = await pool.fetchrow("SELECT * FROM webhook_targets WHERE id = $1::uuid", target_id)
    return dict(record)
@router.delete("/my/webhook-targets/{target_id}")
async def delete_my_webhook_target(request: Request, target_id: str):
    """Delete one of the caller's webhook targets; 404 when no owned row was removed."""
    _require_auth(request)
    owner_id = request.state.current_user["id"]
    from ..database import get_pool, _rowcount

    pool = await get_pool()
    outcome = await pool.execute(
        "DELETE FROM webhook_targets WHERE id = $1::uuid AND owner_user_id = $2",
        target_id, owner_id,
    )
    removed = _rowcount(outcome)
    if removed == 0:
        raise HTTPException(status_code=404, detail="Target not found")
    return {"ok": True}
# ── Page Change Monitors ───────────────────────────────────────────────────────
# Request body for POST /watched-pages (create a page change monitor).
class WatchedPageIn(BaseModel):
    name: str  # required; rejected with 400 when blank after stripping
    url: str  # required; rejected with 400 when blank after stripping
    schedule: str = "0 * * * *"  # looks like a cron expression (hourly) — confirm against the monitor scheduler
    css_selector: Optional[str] = None  # an empty value is stored as None
    agent_id: Optional[str] = None  # an empty value is stored as None
    notification_mode: str = "agent"
# Partial-update body for PUT /watched-pages/{page_id}. Fields left as None
# are dropped before the store update is called.
class WatchedPageUpdate(BaseModel):
    name: Optional[str] = None
    url: Optional[str] = None
    schedule: Optional[str] = None
    css_selector: Optional[str] = None
    agent_id: Optional[str] = None
    notification_mode: Optional[str] = None
    enabled: Optional[bool] = None
@router.get("/watched-pages")
async def list_watched_pages(request: Request):
    """Return watched pages: all of them for admins, only the caller's own otherwise."""
    user = _require_auth(request)
    from ..monitors.store import list_watched_pages as _list

    # Admins pass owner_user_id=None, i.e. no owner filter.
    if user.is_admin:
        owner_filter = None
    else:
        owner_filter = user.id
    return await _list(owner_user_id=owner_filter)
@router.post("/watched-pages", status_code=201)
async def create_watched_page(request: Request, body: WatchedPageIn):
    """Create a page change monitor owned by the caller and hand it to the page monitor.

    Raises:
        HTTPException 400: name or URL is blank.
    """
    user = _require_auth(request)

    name = body.name.strip()
    url = body.url.strip()
    if not (name and url):
        raise HTTPException(status_code=400, detail="Name and URL are required")

    from ..monitors.store import create_watched_page as _create
    from ..monitors.page_monitor import page_monitor

    created = await _create(
        name=name,
        url=url,
        schedule=body.schedule,
        css_selector=body.css_selector or None,
        agent_id=body.agent_id or None,
        notification_mode=body.notification_mode,
        owner_user_id=user.id,
    )
    page_monitor.reschedule(created)
    return created
@router.put("/watched-pages/{page_id}")
async def update_watched_page(request: Request, page_id: str, body: WatchedPageUpdate):
    """Apply the supplied (non-None) fields to a watched page and reschedule it.

    Non-admins may only touch their own pages; a missing page and a page owned
    by someone else are both answered with 404.
    """
    user = _require_auth(request)
    from ..monitors.store import get_watched_page, update_watched_page as _update

    existing = await get_watched_page(page_id)
    accessible = existing and (user.is_admin or str(existing.get("owner_user_id")) == user.id)
    if not accessible:
        raise HTTPException(status_code=404, detail="Page not found")

    changes = {}
    for field_name, value in body.model_dump().items():
        if value is not None:
            changes[field_name] = value
    refreshed = await _update(page_id, **changes)

    from ..monitors.page_monitor import page_monitor
    page_monitor.reschedule(refreshed)
    return refreshed
@router.delete("/watched-pages/{page_id}")
async def delete_watched_page(request: Request, page_id: str):
    """Delete a watched page and remove it from the page monitor.

    Raises:
        HTTPException: 404 when the page does not exist or the caller is
            neither an admin nor its owner.
    """
    current_user = _require_auth(request)
    from ..monitors.store import get_watched_page
    from ..monitors.store import delete_watched_page as store_delete

    existing = await get_watched_page(page_id)
    if not existing:
        raise HTTPException(status_code=404, detail="Page not found")

    # Admins may act on any page; everyone else only on pages they own.
    may_modify = current_user.is_admin or str(existing.get("owner_user_id")) == current_user.id
    if not may_modify:
        raise HTTPException(status_code=404, detail="Page not found")

    await store_delete(page_id)
    from ..monitors.page_monitor import page_monitor

    page_monitor.remove(page_id)
    return {"ok": True}
@router.post("/watched-pages/{page_id}/check-now")
async def check_page_now(request: Request, page_id: str):
    """Run an immediate check of a watched page and return the monitor's result.

    Raises:
        HTTPException: 404 when the page does not exist or the caller is
            neither an admin nor its owner.
    """
    current_user = _require_auth(request)
    from ..monitors.store import get_watched_page

    existing = await get_watched_page(page_id)
    if not existing:
        raise HTTPException(status_code=404, detail="Page not found")

    # Admins may act on any page; everyone else only on pages they own.
    may_access = current_user.is_admin or str(existing.get("owner_user_id")) == current_user.id
    if not may_access:
        raise HTTPException(status_code=404, detail="Page not found")

    from ..monitors.page_monitor import page_monitor

    return await page_monitor.check_now(page_id)
# ── RSS Feed Monitors ──────────────────────────────────────────────────────────
class RssFeedIn(BaseModel):
    """Request body accepted by ``POST /rss-feeds``."""

    # Display name; the create handler rejects values that are blank after stripping.
    name: str
    # Feed address; also rejected by the create handler when blank.
    url: str
    # Schedule expression; the default looks like an every-four-hours cron
    # spec — confirm the accepted syntax against the RSS monitor.
    schedule: str = "0 */4 * * *"
    # Optional agent reference; the create handler stores empty values as None.
    agent_id: Optional[str] = None
    # Notification mode; the set of accepted values is not visible in this module.
    notification_mode: str = "agent"
    # Forwarded unchanged to the store; no bounds are enforced in this module.
    max_items_per_run: int = 5
class RssFeedUpdate(BaseModel):
    """Request body accepted by ``PUT /rss-feeds/{feed_id}``.

    Every field is optional and defaults to ``None``; the update handler
    forwards only the fields whose value is not ``None``.
    """

    name: Optional[str] = None
    url: Optional[str] = None
    schedule: Optional[str] = None
    agent_id: Optional[str] = None
    notification_mode: Optional[str] = None
    max_items_per_run: Optional[int] = None
    # Has no counterpart in RssFeedIn, so it can only be set via update.
    enabled: Optional[bool] = None
@router.get("/rss-feeds")
async def list_rss_feeds(request: Request):
    """Return the RSS feeds visible to the authenticated caller.

    Non-admin callers are filtered by their own user id; admins pass
    ``owner_user_id=None`` (presumably meaning "no owner filter" — confirm
    against ``monitors.store``).
    """
    current_user = _require_auth(request)
    from ..monitors.store import list_rss_feeds as fetch_feeds

    if current_user.is_admin:
        owner_filter = None
    else:
        owner_filter = current_user.id
    return await fetch_feeds(owner_user_id=owner_filter)
@router.post("/rss-feeds", status_code=201)
async def create_rss_feed(request: Request, body: RssFeedIn):
    """Create an RSS feed monitor owned by the caller and schedule it.

    Raises:
        HTTPException: 400 when the name or URL is blank after stripping.
    """
    current_user = _require_auth(request)

    # Strip once and reuse the cleaned values for both validation and storage.
    clean_name = body.name.strip()
    clean_url = body.url.strip()
    if not (clean_name and clean_url):
        raise HTTPException(status_code=400, detail="Name and URL are required")

    from ..monitors.store import create_rss_feed as store_create
    from ..monitors.rss_monitor import rss_monitor

    created = await store_create(
        name=clean_name,
        url=clean_url,
        schedule=body.schedule,
        # An empty agent id is stored as None.
        agent_id=body.agent_id or None,
        notification_mode=body.notification_mode,
        max_items_per_run=body.max_items_per_run,
        owner_user_id=current_user.id,
    )
    rss_monitor.reschedule(created)
    return created
@router.put("/rss-feeds/{feed_id}")
async def update_rss_feed(request: Request, feed_id: str, body: RssFeedUpdate):
    """Apply a partial update to an RSS feed and reschedule its monitor.

    Only body fields whose value is not ``None`` are forwarded to the store,
    so a field cannot be reset to null through this endpoint. ``name`` and
    ``url`` are stripped and must not be blank, matching the rules enforced
    by ``create_rss_feed``.

    Raises:
        HTTPException: 404 when the feed does not exist or the caller is
            neither an admin nor its owner; 400 when ``name`` or ``url`` is
            supplied but blank after stripping.
    """
    user = _require_auth(request)
    from ..monitors.store import get_rss_feed, update_rss_feed as _update

    feed = await get_rss_feed(feed_id)
    if not feed:
        raise HTTPException(status_code=404, detail="Feed not found")
    # Non-owners get the same 404 as for a missing feed.
    if not user.is_admin and str(feed.get("owner_user_id")) != user.id:
        raise HTTPException(status_code=404, detail="Feed not found")

    fields = {k: v for k, v in body.model_dump().items() if v is not None}

    # Creation rejects blank names/URLs and stores them stripped; enforce the
    # same on update so a feed cannot be renamed to "" or whitespace.
    for key in ("name", "url"):
        if key in fields:
            fields[key] = fields[key].strip()
            if not fields[key]:
                raise HTTPException(status_code=400, detail="Name and URL cannot be empty")

    updated = await _update(feed_id, **fields)
    from ..monitors.rss_monitor import rss_monitor

    rss_monitor.reschedule(updated)
    return updated
@router.delete("/rss-feeds/{feed_id}")
async def delete_rss_feed(request: Request, feed_id: str):
    """Delete an RSS feed and remove it from the RSS monitor.

    Raises:
        HTTPException: 404 when the feed does not exist or the caller is
            neither an admin nor its owner.
    """
    current_user = _require_auth(request)
    from ..monitors.store import get_rss_feed
    from ..monitors.store import delete_rss_feed as store_delete

    existing = await get_rss_feed(feed_id)
    if not existing:
        raise HTTPException(status_code=404, detail="Feed not found")

    # Admins may act on any feed; everyone else only on feeds they own.
    may_modify = current_user.is_admin or str(existing.get("owner_user_id")) == current_user.id
    if not may_modify:
        raise HTTPException(status_code=404, detail="Feed not found")

    await store_delete(feed_id)
    from ..monitors.rss_monitor import rss_monitor

    rss_monitor.remove(feed_id)
    return {"ok": True}
@router.post("/rss-feeds/{feed_id}/fetch-now")
async def fetch_feed_now(request: Request, feed_id: str):
    """Run an immediate fetch of an RSS feed and return the monitor's result.

    Raises:
        HTTPException: 404 when the feed does not exist or the caller is
            neither an admin nor its owner.
    """
    current_user = _require_auth(request)
    from ..monitors.store import get_rss_feed

    existing = await get_rss_feed(feed_id)
    if not existing:
        raise HTTPException(status_code=404, detail="Feed not found")

    # Admins may act on any feed; everyone else only on feeds they own.
    may_access = current_user.is_admin or str(existing.get("owner_user_id")) == current_user.id
    if not may_access:
        raise HTTPException(status_code=404, detail="Feed not found")

    from ..monitors.rss_monitor import rss_monitor

    return await rss_monitor.fetch_now(feed_id)