"""Main orchestrator for News Agent"""

import asyncio
from datetime import datetime

from .config import get_config
from .logger import setup_logger, get_logger
from .storage.database import Database
from .storage.models import DigestEntry
from .aggregator.rss_fetcher import RSSFetcher
from .ai.client import OpenRouterClient
from .ai.filter import ArticleFilter
from .ai.summarizer import ArticleSummarizer
from .email.generator import EmailGenerator
from .email.sender import EmailSender


async def main():
    """Main execution flow"""
    # Set up logging
    setup_logger()
    logger = get_logger()

    try:
        # Load configuration; fields used below include database.path,
        # database.retention_days, rss_sources, ai.filtering.max_articles,
        # and email.subject_template
        config = get_config()

        # Initialize database
        db = Database(config.database.path)
        await db.initialize()

        # Clean up old articles
        await db.cleanup_old_articles(config.database.retention_days)

        # Initialize RSS fetcher
        fetcher = RSSFetcher()

        # Fetch articles from all sources (silently)
        articles = await fetcher.fetch_all(config.rss_sources)

        if not articles:
            await fetcher.close()
            return

        # Save articles to database (deduplication)
        new_articles_count = await db.save_articles(articles)

        # Log only the summary
        logger.info(f"Total articles fetched from all sources: {len(articles)}")
        logger.info(
            f"Saved {new_articles_count} new articles "
            f"(filtered {len(articles) - new_articles_count} duplicates)"
        )

        await fetcher.close()

        # Get unprocessed articles
        unprocessed = await db.get_unprocessed_articles()

        if not unprocessed:
            return

        # Initialize AI components
        ai_client = OpenRouterClient()
        filter_ai = ArticleFilter(ai_client)
        summarizer = ArticleSummarizer(ai_client)

        # Filter articles by relevance (silently); the result is a list of
        # (article, relevance_score) pairs, unpacked as such below
        filtered_articles = await filter_ai.filter_articles(
            unprocessed, max_articles=config.ai.filtering.max_articles
        )

        if not filtered_articles:
            # Mark all as processed but not included
            for article in unprocessed:
                await db.update_article_processing(
                    article.id, relevance_score=0.0, ai_summary="", included=False
                )

            # Still save the run with costs (for filtering only)
            session_cost = ai_client.get_session_cost()
            await db.save_run(
                articles_fetched=len(articles),
                articles_processed=len(unprocessed),
                articles_included=0,
                total_cost=session_cost,
            )
            return

        # Summarize filtered articles (batch processing for speed, silently)
        # Extract just the articles for batch summarization
        articles_to_summarize = [article for article, score in filtered_articles]
        summaries_dict = await summarizer.summarize_batch(articles_to_summarize)

        # Create digest entries with summaries
        digest_entries = []
        for article, score in filtered_articles:
            # Use .get() so one missing summary (e.g. a failed batch item)
            # degrades to an empty string instead of raising a KeyError
            summary = summaries_dict.get(article.id, "")

            # Update database
            await db.update_article_processing(
                article.id, relevance_score=score, ai_summary=summary, included=True
            )

            # Create digest entry
            entry = DigestEntry(
                article=article,
                relevance_score=score,
                ai_summary=summary,
                category=article.category,
            )
            digest_entries.append(entry)

        # Mark remaining unprocessed articles as processed but not included
        processed_ids = {article.id for article, _ in filtered_articles}
        for article in unprocessed:
            if article.id not in processed_ids:
                await db.update_article_processing(
                    article.id, relevance_score=0.0, ai_summary="", included=False
                )

        # Get cost information
        session_cost = ai_client.get_session_cost()
        total_cost = await db.get_total_cost()
        cumulative_cost = total_cost + session_cost

        # Save run statistics with costs
        await db.save_run(
            articles_fetched=len(articles),
            articles_processed=len(unprocessed),
            articles_included=len(digest_entries),
            total_cost=session_cost,
            filtering_cost=0.0,  # could split this if tracking separately
            summarization_cost=0.0,
        )

        # Generate email (silently) with cost info
        generator = EmailGenerator()

        # e.g. "Monday, January 05, 2026"
        date_str = datetime.now().strftime("%A, %B %d, %Y")
        subject = config.email.subject_template.format(date=date_str)

        html_content, text_content = generator.generate_digest_email(
            digest_entries, date_str, subject, session_cost, cumulative_cost
        )

        # Send email (silently)
        sender = EmailSender()
        success = sender.send(subject, html_content, text_content)

        if not success:
            logger.error("Failed to send email")

    except Exception as e:
        logger.error(f"Fatal error in main execution: {e}", exc_info=True)
        raise


def run():
    """Entry point for command-line execution"""
    asyncio.run(main())


if __name__ == "__main__":
    run()
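
# NOTE: the module path below is an assumption for illustration, not confirmed
# by this file. If this module is installed as news_agent/main.py, the digest
# could be run directly with:
#   python -m news_agent.main
# or wired up as a console-script entry point that calls run().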