import os
import json
import logging
from typing import List, Dict

import numpy as np
import redis
from openai import OpenAI

from components.indexers.news_indexer import get_upstash_vector_store
from llama_index.core.vector_stores.types import (
    VectorStoreQuery,
    MetadataFilter,
    MetadataFilters,
    FilterOperator,
)

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

REDIS_URL = os.environ.get("UPSTASH_REDIS_URL", "redis://localhost:6379")
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
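# Note: OPENAI_API_KEY has no default here; if it is unset, the OpenAI client created in
# summarize_topic() will not be able to authenticate.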
try:
    redis_client = redis.Redis.from_url(REDIS_URL, decode_responses=True)
except Exception as e:
    logging.error(f"❌ [Redis Init Error]: {e}")
    raise

TOPICS = ["India news", "World news", "Tech news", "Finance news", "Sports news"]
TOPIC_KEYS = [t.lower().replace(" news", "") for t in TOPICS]
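# Derived keys: "india", "world", "tech", "finance", "sports". These are assumed to match the
# "topic" metadata values written at indexing time, since the vector-store query filters on them.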
BASE_PROMPT = (
    "You are Nuse's editorial summarizer. Read the excerpts below and extract the most important stories. "
    "Return up to 5 punchy headlines, each under 20 words. Each headline should be followed by a short explanation of why the story matters. "
    "Don't include unnecessary text like 'this is important because' or 'why this matters because...'; just state the logic and nothing else."
)
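# Note: summarize_topic() keeps only response lines shaped like "<headline>: <explanation>",
# so the model output is assumed to follow a "Headline: explanation" line format.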

def load_docs_by_topic_with_refs() -> Dict[str, List[Dict]]:
    topic_docs = {key: [] for key in TOPIC_KEYS}
    logging.info("Starting to load documents by topic from Upstash Vector Store...")
    try:
        vector_store = get_upstash_vector_store()
        for topic_key in TOPIC_KEYS:
            filters = MetadataFilters(
                filters=[MetadataFilter(key="topic", value=topic_key, operator=FilterOperator.EQ)]
            )

            dummy_vector = np.random.rand(384).tolist()
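            # NOTE: retrieval here is driven by the metadata filter, so the query embedding above
            # is just a random placeholder; its length (384) is assumed to match the dimension the
            # Upstash index was created with.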
            query = VectorStoreQuery(query_embedding=dummy_vector, similarity_top_k=50, filters=filters)

            logging.info(f"🔍 Querying vector store for topic '{topic_key}'.")
            result = vector_store.query(query)
            logging.info(f"➡️ Found {len(result.nodes)} nodes for topic '{topic_key}'.")

            for node in result.nodes:
                content = node.get_content().strip()
                headline_id = node.metadata.get("headline_id")
                title = node.metadata.get("title", "No Title")
                url = node.metadata.get("url", "#")
                source = node.metadata.get("source", "Unknown Source")

                if content and headline_id is not None:
                    topic_docs[topic_key].append({
                        "text": content,
                        "headline_id": headline_id,
                        "title": title,
                        "url": url,
                        "source": source
                    })
                elif content and headline_id is None:
                    logging.warning(f"Node found without 'headline_id' for topic '{topic_key}': URL {node.metadata.get('url', 'N/A')}")
    except Exception as e:
        logging.error(f"❌ [load_docs_by_topic_with_refs Error]: {e}", exc_info=True)
    return topic_docs
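# Illustrative shape of the return value (hypothetical values):
# {"india": [{"text": "...", "headline_id": 42, "title": "...", "url": "https://...", "source": "..."}], ...}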

def summarize_topic(topic_key: str, docs: List[Dict]) -> List[Dict]:
    if not docs:
        logging.warning(f"⚠️ No docs for topic: {topic_key}, skipping summarization.")
        return []

    # Use the first document as the representative reference for this topic's summaries.
    representative_headline_id = docs[0].get("headline_id")
    representative_article_link = docs[0].get("url", f"https://google.com/search?q={topic_key}+news")
    representative_title = docs[0].get("title", f"Summary for {topic_key}")

    content = "\n\n---\n\n".join([str(d["text"]) for d in docs if "text" in d and d["text"] is not None])

    if not content:
        logging.warning(f"⚠️ No valid text content found in docs for topic: {topic_key}, skipping summarization.")
        return []

    # Truncate to keep the prompt within a safe context size.
    content = content[:12000]

    logging.info(f"🧠 Summarizing topic via OpenAI: '{topic_key}' ({len(docs)} documents)")
    try:
        client = OpenAI(api_key=OPENAI_API_KEY)
        response = client.chat.completions.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": BASE_PROMPT},
                {"role": "user", "content": content},
            ],
            max_tokens=512,
            temperature=0.7,
        )
        llm_output = response.choices[0].message.content.strip()
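        # Illustrative example of the raw model output the parser below expects:
        #   Markets rally on rate-cut hopes: Lower rates would ease borrowing costs for firms.
        #   Monsoon arrives early: Early rains will shape sowing decisions in key farm states.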
        headlines = []
        for line in llm_output.splitlines():
            line = line.strip("-–• ")
            if ":" in line:
                parts = line.split(':', 1)
                if len(parts) == 2:
                    headline_text = parts[0].strip()
                    explanation_text = parts[1].strip()
                    if headline_text and explanation_text:
                        headlines.append({"summary": headline_text, "explanation": explanation_text})

        result = []
        for h_item in headlines:
            result.append({
                "summary": h_item["summary"],
                "explanation": h_item["explanation"],
                "headline_id": representative_headline_id,
                "image_url": "https://source.unsplash.com/800x600/?news",
                "article_link": representative_article_link,
                "representative_title": representative_title
            })

        logging.info(f"✅ Successfully generated {len(result)} summaries for topic '{topic_key}'.")
        return result
    except Exception as e:
        logging.error(f"❌ [Summarize topic '{topic_key}' Error]: {e}", exc_info=True)
        return []


def generate_and_cache_daily_feed():
    try:
        logging.info("🚀 Generating daily feed...")
        topic_docs = load_docs_by_topic_with_refs()
        feed_map = {}

        for topic_key in TOPIC_KEYS:
            try:
                summaries = summarize_topic(topic_key, topic_docs.get(topic_key, []))
                feed_map[topic_key] = summaries
            except Exception as e:
                logging.error(f"❌ [Topic summarization loop error for '{topic_key}']: {e}", exc_info=True)
                feed_map[topic_key] = []

        final_feed = []
        for topic_display_name, topic_key in zip(TOPICS, TOPIC_KEYS):
            topic_feed = feed_map.get(topic_key, [])
            final_feed.append({
                "topic": topic_display_name,
                "feed": topic_feed
            })

        # Cache the assembled feed in Redis for 24 hours.
        try:
            cache_key = "daily_news_feed_cache"
            redis_client.set(cache_key, json.dumps(final_feed, ensure_ascii=False))
            redis_client.expire(cache_key, 86400)
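            # redis-py note: the value and TTL can also be set atomically with
            # redis_client.set(cache_key, ..., ex=86400).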
logging.info(f"β
Cached feed under key '{cache_key}' with 24-hour expiry.") |
|
except Exception as e: |
|
logging.error(f"β [Redis cache error]: {e}", exc_info=True) |
|
|
|
return final_feed |
|
|
|
except Exception as e: |
|
logging.critical(f"β [generate_and_cache_daily_feed Overall Error]: {e}", exc_info=True) |
|
return [] |
|
|
|
|
|
def get_cached_daily_feed(): |
|
try: |
|
cache_key = "daily_news_feed_cache" |
|
cached = redis_client.get(cache_key) |
|
if cached: |
|
logging.info(f"β
Retrieved cached daily feed from '{cache_key}'.") |
|
return json.loads(cached) |
|
else: |
|
logging.info(f"βΉοΈ No cached data found under key '{cache_key}'.") |
|
return [] |
|
except Exception as e: |
|
logging.error(f"β [get_cached_daily_feed error]: {e}", exc_info=True) |
|
return [] |
|
|
|
|
|
if __name__ == "__main__": |
|
feed = generate_and_cache_daily_feed() |
|
print("\n--- Generated Daily Feed ---") |
|
print(json.dumps(feed, indent=2, ensure_ascii=False)) |
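    # To read the cached copy later (e.g. from an API handler), call get_cached_daily_feed().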