ragV98's picture
rebuild trigger
e2dc38b
raw
history blame
9.69 kB
import os
import json
import redis
import numpy as np
from typing import List, Dict
from openai import OpenAI
from components.indexers.news_indexer import get_upstash_vector_store
from llama_index.core.vector_stores.types import VectorStoreQuery, MetadataFilter, MetadataFilters, FilterOperator
import logging
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# πŸ” Environment variables
REDIS_URL = os.environ.get("UPSTASH_REDIS_URL", "redis://localhost:6379")
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
# βœ… Redis client
try:
redis_client = redis.Redis.from_url(REDIS_URL, decode_responses=True)
except Exception as e:
logging.error(f"❌ [Redis Init Error]: {e}")
# Decide if you want to raise an exception or continue without Redis
raise # It's critical for caching, so raising is appropriate here
# πŸ“° Topics
TOPICS = ["India news", "World news", "Tech news", "Finance news", "Sports news"]
TOPIC_KEYS = [t.lower().replace(" news", "") for t in TOPICS]
# 🧠 Summarization Prompt
# Removed the "numbered headline" instruction as we want LLM to just give plain headlines
BASE_PROMPT = (
"You are Nuse’s editorial summarizer. Read the excerpts below and extract the most important stories. "
"Return up to 5 punchy headlines, each under 20 words. Each headline should be followed by a short explanation of why the story matters."
"Don't include unnecessary text like 'this is important because' or 'why this matters because..' just state the logic and nothing else."
)
# πŸ“₯ Load documents and metadata
def load_docs_by_topic_with_refs() -> Dict[str, List[Dict]]:
topic_docs = {key: [] for key in TOPIC_KEYS}
logging.info("Starting to load documents by topic from Upstash Vector Store...")
try:
vector_store = get_upstash_vector_store()
for topic_key in zip(TOPICS, TOPIC_KEYS):
filters = MetadataFilters(
filters=[MetadataFilter(key="topic", value=topic_key, operator=FilterOperator.EQ)]
)
# A random vector for querying, adjust similarity_top_k based on your data density
dummy_vector = np.random.rand(384).tolist()
query = VectorStoreQuery(query_embedding=dummy_vector, similarity_top_k=50, filters=filters)
logging.info(f"πŸ”Ž Querying for topic '{topic_key}' with filter value '{topic_key}'.")
result = vector_store.query(query)
logging.info(f"➑️ Found {len(result.nodes)} nodes for topic '{topic_key}'.")
for node in result.nodes:
content = node.get_content().strip()
# *** IMPORTANT: Retrieve the 'headline_id' from node.metadata ***
headline_id = node.metadata.get("headline_id")
# You can also get other metadata like title, url, source if needed for the summary output or debugging
title = node.metadata.get("title", "No Title")
url = node.metadata.get("url", "#")
source = node.metadata.get("source", "Unknown Source")
if content and headline_id is not None:
topic_docs[topic_key].append({
"text": content,
"headline_id": headline_id, # Store the actual ID
"title": title,
"url": url,
"source": source
})
elif content and headline_id is None:
logging.warning(f"Node found without 'headline_id' for topic '{topic_key}': URL {node.metadata.get('url', 'N/A')}")
except Exception as e:
logging.error(f"❌ [load_docs_by_topic_with_refs Error]: {e}", exc_info=True)
return topic_docs
# πŸ§ͺ Topic summarizer
def summarize_topic(topic_key: str, docs: List[Dict]) -> List[Dict]: # Removed start_index
if not docs:
logging.warning(f"⚠️ No docs for topic: {topic_key}, skipping summarization.")
return []
# Get the headline_id of the first document to use as a representative ID for the summary
# This assumes the summary is broadly about the topic, and we just need *a* representative ID.
representative_headline_id = docs[0].get("headline_id") if docs else None
representative_article_link = docs[0].get("url") if docs else f"https://google.com/search?q={topic_key}+news"
representative_title = docs[0].get("title") if docs else f"Summary for {topic_key}"
# Concatenate document texts for summarization
# Ensure all document texts are strings before joining
content = "\n\n---\n\n".join([str(d["text"]) for d in docs if "text" in d and d["text"] is not None])
if not content:
logging.warning(f"⚠️ No valid text content found in docs for topic: {topic_key}, skipping summarization.")
return []
content = content[:12000] # Truncate to avoid excessive token usage
logging.info(f"🧠 Summarizing topic via OpenAI: '{topic_key}' ({len(docs)} documents)")
try:
client = OpenAI(api_key=OPENAI_API_KEY)
response = client.chat.completions.create(
model="gpt-4", # Consider "gpt-4o" or "gpt-3.5-turbo" for cost/speed
messages=[
{"role": "system", "content": BASE_PROMPT},
{"role": "user", "content": content},
],
max_tokens=512,
temperature=0.7,
)
llm_output = response.choices[0].message.content.strip()
# Parse headlines based on the expected format
headlines = []
for line in llm_output.splitlines():
line = line.strip("-–‒ ") # Clean bullet points
if ":" in line:
parts = line.split(':', 1) # Split only on the first colon
if len(parts) == 2:
headline_text = parts[0].strip()
explanation_text = parts[1].strip()
if headline_text and explanation_text: # Ensure both parts are non-empty
headlines.append({"summary": headline_text, "explanation": explanation_text})
result = []
# Assign the representative ID to each generated summary
for i, h_item in enumerate(headlines):
result.append({
"summary": h_item["summary"],
"explanation": h_item["explanation"],
"headline_id": representative_headline_id, # Use the representative ID
"image_url": "https://source.unsplash.com/800x600/?news",
"article_link": representative_article_link, # Use representative link
"representative_title": representative_title # Add the representative title for context
})
logging.info(f"βœ… Successfully generated {len(result)} summaries for topic '{topic_key}'.")
return result
except Exception as e:
logging.error(f"❌ [Summarize topic '{topic_key}' Error]: {e}", exc_info=True)
return []
# πŸš€ Generate and cache feed
def generate_and_cache_daily_feed():
try:
logging.info("πŸ†• Generating daily feed...")
topic_docs = load_docs_by_topic_with_refs() # This now returns docs with 'headline_id'
feed_map = {}
# global_ref is no longer needed in this context for generating summary IDs
for topic_key in TOPIC_KEYS:
try:
# Pass the documents directly to the summarizer
summaries = summarize_topic(topic_key, topic_docs.get(topic_key, []))
feed_map[topic_key] = summaries
except Exception as e:
logging.error(f"❌ [Topic summarization loop error for '{topic_key}']: {e}", exc_info=True)
feed_map[topic_key] = []
final_feed = []
for topic_display_name, topic_key in zip(TOPICS, TOPIC_KEYS):
topic_feed = feed_map.get(topic_key, [])
final_feed.append({
"topic": topic_display_name,
"feed": topic_feed
})
# Cache to Redis
try:
cache_key = "daily_news_feed_cache"
redis_client.set(cache_key, json.dumps(final_feed, ensure_ascii=False))
redis_client.expire(cache_key, 86400) # Set expiry to 24 hours
logging.info(f"βœ… Cached feed under key '{cache_key}' with 24-hour expiry.")
except Exception as e:
logging.error(f"❌ [Redis cache error]: {e}", exc_info=True)
return final_feed
except Exception as e:
logging.critical(f"❌ [generate_and_cache_daily_feed Overall Error]: {e}", exc_info=True)
return []
# πŸ“¦ Retrieve from cache
def get_cached_daily_feed():
try:
cache_key = "daily_news_feed_cache"
cached = redis_client.get(cache_key)
if cached:
logging.info(f"βœ… Retrieved cached daily feed from '{cache_key}'.")
return json.loads(cached)
else:
logging.info(f"ℹ️ No cached data found under key '{cache_key}'.")
return []
except Exception as e:
logging.error(f"❌ [get_cached_daily_feed error]: {e}", exc_info=True)
return []
# πŸ§ͺ Run if main
if __name__ == "__main__":
feed = generate_and_cache_daily_feed()
print("\n--- Generated Daily Feed ---")
print(json.dumps(feed, indent=2, ensure_ascii=False))
# Example of retrieving from cache
# cached_data = get_cached_daily_feed()
# print("\n--- Cached Daily Feed ---")
# print(json.dumps(cached_data, indent=2, ensure_ascii=False))