"""SQLite database operations for article storage and deduplication"""
|
|
|
|
import aiosqlite
|
|
import json
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
from .models import Article
|
|
from ..logger import get_logger
|
|
|
|
logger = get_logger()
|
|
|
|
|
|
class Database:
    """Async SQLite database manager for article storage and deduplication.

    Each method opens its own short-lived aiosqlite connection, so instances
    are cheap to share across tasks; SQLite itself serializes the writes.
    """

    def __init__(self, db_path: str | Path):
        """Record the database location and ensure its parent directory exists.

        Args:
            db_path: Path to the SQLite file; parent directories are created
                eagerly so the first connect cannot fail on a missing folder.
        """
        self.db_path = Path(db_path)
        self.db_path.parent.mkdir(parents=True, exist_ok=True)

    async def initialize(self) -> None:
        """Create the articles table and its indexes if they don't exist."""
        async with aiosqlite.connect(self.db_path) as db:
            await db.execute(
                """
                CREATE TABLE IF NOT EXISTS articles (
                    id TEXT PRIMARY KEY,
                    url TEXT NOT NULL UNIQUE,
                    title TEXT NOT NULL,
                    summary TEXT,
                    content TEXT NOT NULL,
                    published TEXT NOT NULL,
                    source TEXT NOT NULL,
                    category TEXT NOT NULL,
                    fetched_at TEXT NOT NULL,
                    relevance_score REAL,
                    ai_summary TEXT,
                    processed INTEGER DEFAULT 0,
                    included_in_digest INTEGER DEFAULT 0
                )
                """
            )

            # Dates are stored as ISO-8601 text, which sorts chronologically,
            # so plain text indexes support the ORDER BY / range queries below.
            await db.execute(
                """
                CREATE INDEX IF NOT EXISTS idx_published
                ON articles(published)
                """
            )

            await db.execute(
                """
                CREATE INDEX IF NOT EXISTS idx_fetched_at
                ON articles(fetched_at)
                """
            )

            await db.commit()

        logger.debug(f"Database initialized at {self.db_path}")

    async def article_exists(self, article_id: str) -> bool:
        """Return True if an article with this id is already stored."""
        async with aiosqlite.connect(self.db_path) as db:
            async with db.execute("SELECT 1 FROM articles WHERE id = ?", (article_id,)) as cursor:
                result = await cursor.fetchone()
                return result is not None

    async def save_article(self, article: Article) -> bool:
        """Save an article. Returns True if saved, False if it was a duplicate.

        Uses INSERT OR IGNORE instead of a separate existence check followed
        by a plain INSERT: the check-then-insert pair ran on two different
        connections and could race with a concurrent writer, and a row whose
        url (UNIQUE) collided under a different id raised IntegrityError.
        With OR IGNORE both duplicate cases are detected atomically via
        rowcount and reported as False.
        """
        async with aiosqlite.connect(self.db_path) as db:
            cursor = await db.execute(
                """
                INSERT OR IGNORE INTO articles (
                    id, url, title, summary, content, published, source,
                    category, fetched_at, relevance_score, ai_summary,
                    processed, included_in_digest
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    article.id,
                    str(article.url),
                    article.title,
                    article.summary,
                    article.content,
                    article.published.isoformat(),
                    article.source,
                    article.category,
                    article.fetched_at.isoformat(),
                    article.relevance_score,
                    article.ai_summary,
                    int(article.processed),
                    int(article.included_in_digest),
                ),
            )
            await db.commit()
            # rowcount == 0 means the INSERT was ignored (duplicate id or url).
            if cursor.rowcount == 0:
                logger.debug(f"Article already exists: {article.title}")
                return False

        logger.debug(f"Saved article: {article.title}")
        return True

    async def save_articles(self, articles: list[Article]) -> int:
        """Save multiple articles. Returns the count of new articles saved."""
        count = 0
        for article in articles:
            if await self.save_article(article):
                count += 1
        return count

    async def get_unprocessed_articles(self, limit: Optional[int] = None) -> list[Article]:
        """Get articles not yet processed by AI, newest first.

        Args:
            limit: Maximum number of rows to return; None (or 0) means all.
        """
        query = """
            SELECT * FROM articles
            WHERE processed = 0
            ORDER BY published DESC
        """
        # Bind LIMIT as a parameter rather than interpolating it into the SQL
        # string — keeps the query safe even if limit ever comes from outside.
        params: tuple = ()
        if limit:
            query += " LIMIT ?"
            params = (limit,)

        async with aiosqlite.connect(self.db_path) as db:
            db.row_factory = aiosqlite.Row
            async with db.execute(query, params) as cursor:
                rows = await cursor.fetchall()
                return [self._row_to_article(row) for row in rows]

    async def update_article_processing(
        self, article_id: str, relevance_score: float, ai_summary: str, included: bool
    ) -> None:
        """Store AI processing results and mark the article processed.

        Args:
            article_id: Primary key of the row to update.
            relevance_score: Score assigned by the AI pass.
            ai_summary: Generated summary text.
            included: Whether the article made it into the digest.
        """
        async with aiosqlite.connect(self.db_path) as db:
            await db.execute(
                """
                UPDATE articles
                SET relevance_score = ?,
                    ai_summary = ?,
                    processed = 1,
                    included_in_digest = ?
                WHERE id = ?
                """,
                (relevance_score, ai_summary, int(included), article_id),
            )
            await db.commit()

    async def get_todays_digest_articles(self) -> list[Article]:
        """Get all articles included in today's digest, best-scored first."""
        # NOTE(review): datetime.now() is naive local time, so "today" follows
        # the server's timezone — confirm that matches the fetch pipeline.
        today = datetime.now().date()
        async with aiosqlite.connect(self.db_path) as db:
            db.row_factory = aiosqlite.Row
            async with db.execute(
                """
                SELECT * FROM articles
                WHERE included_in_digest = 1
                AND date(fetched_at) = ?
                ORDER BY relevance_score DESC, published DESC
                """,
                (today.isoformat(),),
            ) as cursor:
                rows = await cursor.fetchall()
                return [self._row_to_article(row) for row in rows]

    async def cleanup_old_articles(self, retention_days: int) -> None:
        """Delete articles whose fetched_at is older than the retention window.

        ISO-8601 timestamps compare chronologically as text, so a plain
        string comparison against the cutoff is correct here.
        """
        cutoff_date = datetime.now() - timedelta(days=retention_days)
        async with aiosqlite.connect(self.db_path) as db:
            cursor = await db.execute(
                "DELETE FROM articles WHERE fetched_at < ?", (cutoff_date.isoformat(),)
            )
            deleted = cursor.rowcount
            await db.commit()

        if deleted > 0:
            logger.debug(f"Cleaned up {deleted} old articles")

    def _row_to_article(self, row: aiosqlite.Row) -> Article:
        """Convert a database row back into an Article model.

        Inverse of save_article's serialization: ISO strings back to
        datetime, INTEGER flags back to bool.
        """
        return Article(
            id=row["id"],
            url=row["url"],
            title=row["title"],
            summary=row["summary"],
            content=row["content"],
            published=datetime.fromisoformat(row["published"]),
            source=row["source"],
            category=row["category"],
            fetched_at=datetime.fromisoformat(row["fetched_at"]),
            relevance_score=row["relevance_score"],
            ai_summary=row["ai_summary"],
            processed=bool(row["processed"]),
            included_in_digest=bool(row["included_in_digest"]),
        )
|