first commit
src/storage/database.py · new file · 194 lines
@@ -0,0 +1,194 @@
"""SQLite database operations for article storage and deduplication"""

import aiosqlite
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional

from .models import Article
from ..logger import get_logger

logger = get_logger()


class Database:
    """Async SQLite database manager"""

    def __init__(self, db_path: str | Path):
        self.db_path = Path(db_path)
        self.db_path.parent.mkdir(parents=True, exist_ok=True)

    async def initialize(self):
        """Create database tables if they don't exist"""
        async with aiosqlite.connect(self.db_path) as db:
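            # Timestamps are stored as ISO-8601 TEXT and boolean flags as INTEGER
            # 0/1 (serialized in save_article, restored in _row_to_article).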
            await db.execute(
                """
                CREATE TABLE IF NOT EXISTS articles (
                    id TEXT PRIMARY KEY,
                    url TEXT NOT NULL UNIQUE,
                    title TEXT NOT NULL,
                    summary TEXT,
                    content TEXT NOT NULL,
                    published TEXT NOT NULL,
                    source TEXT NOT NULL,
                    category TEXT NOT NULL,
                    fetched_at TEXT NOT NULL,
                    relevance_score REAL,
                    ai_summary TEXT,
                    processed INTEGER DEFAULT 0,
                    included_in_digest INTEGER DEFAULT 0
                )
                """
            )
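
            # Both indexes back the date-ordered reads: idx_published for the
            # ORDER BY published queries, idx_fetched_at for the digest lookup
            # and retention cleanup.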
            await db.execute(
                """
                CREATE INDEX IF NOT EXISTS idx_published
                ON articles(published)
                """
            )

            await db.execute(
                """
                CREATE INDEX IF NOT EXISTS idx_fetched_at
                ON articles(fetched_at)
                """
            )

            await db.commit()

        logger.info(f"Database initialized at {self.db_path}")

    async def article_exists(self, article_id: str) -> bool:
        """Check if article already exists in database"""
        async with aiosqlite.connect(self.db_path) as db:
            async with db.execute("SELECT 1 FROM articles WHERE id = ?", (article_id,)) as cursor:
                result = await cursor.fetchone()
                return result is not None

    async def save_article(self, article: Article) -> bool:
        """Save article to database. Returns True if saved, False if duplicate"""
        if await self.article_exists(article.id):
            logger.debug(f"Article already exists: {article.title}")
            return False

        async with aiosqlite.connect(self.db_path) as db:
            await db.execute(
                """
                INSERT INTO articles (
                    id, url, title, summary, content, published, source,
                    category, fetched_at, relevance_score, ai_summary,
                    processed, included_in_digest
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    article.id,
                    str(article.url),
                    article.title,
                    article.summary,
                    article.content,
                    article.published.isoformat(),
                    article.source,
                    article.category,
                    article.fetched_at.isoformat(),
                    article.relevance_score,
                    article.ai_summary,
                    int(article.processed),
                    int(article.included_in_digest),
                ),
            )
            await db.commit()

        logger.debug(f"Saved article: {article.title}")
        return True

    async def save_articles(self, articles: list[Article]) -> int:
        """Save multiple articles. Returns count of new articles saved"""
        count = 0
        for article in articles:
            if await self.save_article(article):
                count += 1
        return count

    async def get_unprocessed_articles(self, limit: Optional[int] = None) -> list[Article]:
        """Get articles that haven't been processed by AI yet"""
        query = """
            SELECT * FROM articles
            WHERE processed = 0
            ORDER BY published DESC
        """
        if limit is not None:
            query += f" LIMIT {int(limit)}"

        async with aiosqlite.connect(self.db_path) as db:
            db.row_factory = aiosqlite.Row
            async with db.execute(query) as cursor:
                rows = await cursor.fetchall()
                return [self._row_to_article(row) for row in rows]

    async def update_article_processing(
        self, article_id: str, relevance_score: float, ai_summary: str, included: bool
    ):
        """Update article with AI processing results"""
        async with aiosqlite.connect(self.db_path) as db:
            await db.execute(
                """
                UPDATE articles
                SET relevance_score = ?,
                    ai_summary = ?,
                    processed = 1,
                    included_in_digest = ?
                WHERE id = ?
                """,
                (relevance_score, ai_summary, int(included), article_id),
            )
            await db.commit()

    async def get_todays_digest_articles(self) -> list[Article]:
        """Get all articles included in today's digest"""
        today = datetime.now().date()
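        # SQLite's date() reduces the stored ISO-8601 fetched_at to YYYY-MM-DD,
        # which matches today.isoformat() exactly.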
        async with aiosqlite.connect(self.db_path) as db:
            db.row_factory = aiosqlite.Row
            async with db.execute(
                """
                SELECT * FROM articles
                WHERE included_in_digest = 1
                AND date(fetched_at) = ?
                ORDER BY relevance_score DESC, published DESC
                """,
                (today.isoformat(),),
            ) as cursor:
                rows = await cursor.fetchall()
                return [self._row_to_article(row) for row in rows]

    async def cleanup_old_articles(self, retention_days: int):
        """Delete articles older than retention period"""
        cutoff_date = datetime.now() - timedelta(days=retention_days)
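        # ISO-8601 timestamps sort lexicographically (given a consistent format),
        # so plain string comparison against the cutoff is chronologically correct.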
        async with aiosqlite.connect(self.db_path) as db:
            cursor = await db.execute(
                "DELETE FROM articles WHERE fetched_at < ?", (cutoff_date.isoformat(),)
            )
            deleted = cursor.rowcount
            await db.commit()

        if deleted > 0:
            logger.info(f"Cleaned up {deleted} old articles")

    def _row_to_article(self, row: aiosqlite.Row) -> Article:
        """Convert database row to Article model"""
        return Article(
            id=row["id"],
            url=row["url"],
            title=row["title"],
            summary=row["summary"],
            content=row["content"],
            published=datetime.fromisoformat(row["published"]),
            source=row["source"],
            category=row["category"],
            fetched_at=datetime.fromisoformat(row["fetched_at"]),
            relevance_score=row["relevance_score"],
            ai_summary=row["ai_summary"],
            processed=bool(row["processed"]),
            included_in_digest=bool(row["included_in_digest"]),
        )
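
# A minimal usage sketch (not part of this commit; the path and values shown
# are illustrative, the Article fields come from the model imported above):
#
#     import asyncio
#
#     async def main() -> None:
#         db = Database("data/articles.db")
#         await db.initialize()
#         for article in await db.get_unprocessed_articles(limit=10):
#             await db.update_article_processing(
#                 article.id,
#                 relevance_score=0.5,
#                 ai_summary="one-line summary",
#                 included=True,
#             )
#         await db.cleanup_old_articles(retention_days=30)
#
#     asyncio.run(main())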