import os
import json
import logging
from typing import List, Dict

import numpy as np
import redis
from openai import OpenAI

from components.indexers.news_indexer import get_upstash_vector_store
from llama_index.core.vector_stores.types import (
    VectorStoreQuery,
    MetadataFilter,
    MetadataFilters,
    FilterOperator,
)

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# 🔐 Environment variables
REDIS_URL = os.environ.get("UPSTASH_REDIS_URL", "redis://localhost:6379")
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")

# ✅ Redis client
try:
    redis_client = redis.Redis.from_url(REDIS_URL, decode_responses=True)
except Exception as e:
    logging.error(f"❌ [Redis Init Error]: {e}")
    # Redis is critical for caching, so re-raise rather than continuing without it.
    raise

# 📰 Topics
TOPICS = ["India news", "World news", "Tech news", "Finance news", "Sports news"]
TOPIC_KEYS = [t.lower().replace(" news", "") for t in TOPICS]

# 🧠 Summarization prompt
# The "numbered headline" instruction was removed; the LLM should return plain headlines.
BASE_PROMPT = (
    "You are Nuse's editorial summarizer. Read the excerpts below and extract the most important stories. "
    "Return up to 5 punchy headlines, each under 20 words. Each headline should be followed by a short "
    "explanation of why the story matters. Don't include unnecessary text like 'this is important because' "
    "or 'why this matters because'; just state the logic and nothing else."
)


# 📥 Load documents and metadata
def load_docs_by_topic_with_refs() -> Dict[str, List[Dict]]:
    topic_docs = {key: [] for key in TOPIC_KEYS}
    logging.info("Starting to load documents by topic from Upstash Vector Store...")
    try:
        vector_store = get_upstash_vector_store()
        for topic_key in TOPIC_KEYS:
            filters = MetadataFilters(
                filters=[MetadataFilter(key="topic", value=topic_key, operator=FilterOperator.EQ)]
            )
            # A random query vector; adjust similarity_top_k based on your data density.
            dummy_vector = np.random.rand(384).tolist()
            query = VectorStoreQuery(query_embedding=dummy_vector, similarity_top_k=50, filters=filters)
            logging.info(f"🔎 Querying for topic '{topic_key}' with filter value '{topic_key}'.")
            result = vector_store.query(query)
            logging.info(f"➡️ Found {len(result.nodes)} nodes for topic '{topic_key}'.")

            for node in result.nodes:
                content = node.get_content().strip()
                # IMPORTANT: retrieve the 'headline_id' from node.metadata.
                headline_id = node.metadata.get("headline_id")
                # Other metadata (title, url, source) is kept for the summary output and debugging.
                title = node.metadata.get("title", "No Title")
                url = node.metadata.get("url", "#")
                source = node.metadata.get("source", "Unknown Source")

                if content and headline_id is not None:
                    topic_docs[topic_key].append({
                        "text": content,
                        "headline_id": headline_id,  # Store the actual ID
                        "title": title,
                        "url": url,
                        "source": source,
                    })
                elif content and headline_id is None:
                    logging.warning(
                        f"Node found without 'headline_id' for topic '{topic_key}': "
                        f"URL {node.metadata.get('url', 'N/A')}"
                    )
    except Exception as e:
        logging.error(f"❌ [load_docs_by_topic_with_refs Error]: {e}", exc_info=True)
    return topic_docs


# 🧪 Topic summarizer
def summarize_topic(topic_key: str, docs: List[Dict]) -> List[Dict]:
    if not docs:
        logging.warning(f"⚠️ No docs for topic: {topic_key}, skipping summarization.")
        return []
    # Use the first document as the representative for the summary's ID, link, and title.
    # This assumes the summary is broadly about the topic and only needs *a* representative ID.
    representative_headline_id = docs[0].get("headline_id")
    representative_article_link = docs[0].get("url", f"https://google.com/search?q={topic_key}+news")
    representative_title = docs[0].get("title", f"Summary for {topic_key}")

    # Concatenate document texts for summarization, keeping only non-empty string texts.
    content = "\n\n---\n\n".join(str(d["text"]) for d in docs if d.get("text") is not None)
    if not content:
        logging.warning(f"⚠️ No valid text content found in docs for topic: {topic_key}, skipping summarization.")
        return []

    content = content[:12000]  # Truncate to avoid excessive token usage
    logging.info(f"🧠 Summarizing topic via OpenAI: '{topic_key}' ({len(docs)} documents)")

    try:
        client = OpenAI(api_key=OPENAI_API_KEY)
        response = client.chat.completions.create(
            model="gpt-4",  # Consider "gpt-4o" or "gpt-3.5-turbo" for cost/speed
            messages=[
                {"role": "system", "content": BASE_PROMPT},
                {"role": "user", "content": content},
            ],
            max_tokens=512,
            temperature=0.7,
        )
        llm_output = response.choices[0].message.content.strip()

        # Parse "headline: explanation" lines from the LLM output.
        headlines = []
        for line in llm_output.splitlines():
            line = line.strip("-–• ")  # Strip bullets and dashes
            if ":" in line:
                headline_text, explanation_text = [part.strip() for part in line.split(":", 1)]
                if headline_text and explanation_text:  # Keep only lines with both parts
                    headlines.append({"summary": headline_text, "explanation": explanation_text})

        # Attach the representative metadata to each generated summary.
        result = []
        for h_item in headlines:
            result.append({
                "summary": h_item["summary"],
                "explanation": h_item["explanation"],
                "headline_id": representative_headline_id,  # Representative ID
                "image_url": "https://source.unsplash.com/800x600/?news",
                "article_link": representative_article_link,  # Representative link
                "representative_title": representative_title,  # Representative title for context
            })

        logging.info(f"✅ Successfully generated {len(result)} summaries for topic '{topic_key}'.")
        return result
    except Exception as e:
        logging.error(f"❌ [Summarize topic '{topic_key}' Error]: {e}", exc_info=True)
        return []


# 🚀 Generate and cache feed
def generate_and_cache_daily_feed():
    try:
        logging.info("🆕 Generating daily feed...")
        topic_docs = load_docs_by_topic_with_refs()  # Docs now carry 'headline_id'
        feed_map = {}

        for topic_key in TOPIC_KEYS:
            try:
                summaries = summarize_topic(topic_key, topic_docs.get(topic_key, []))
                feed_map[topic_key] = summaries
            except Exception as e:
                logging.error(f"❌ [Topic summarization loop error for '{topic_key}']: {e}", exc_info=True)
                feed_map[topic_key] = []

        final_feed = []
        for topic_display_name, topic_key in zip(TOPICS, TOPIC_KEYS):
            final_feed.append({
                "topic": topic_display_name,
                "feed": feed_map.get(topic_key, []),
            })

        # Cache to Redis with a 24-hour expiry
        try:
            cache_key = "daily_news_feed_cache"
            redis_client.set(cache_key, json.dumps(final_feed, ensure_ascii=False))
            redis_client.expire(cache_key, 86400)
            logging.info(f"✅ Cached feed under key '{cache_key}' with 24-hour expiry.")
        except Exception as e:
            logging.error(f"❌ [Redis cache error]: {e}", exc_info=True)
error]: {e}", exc_info=True) return final_feed except Exception as e: logging.critical(f"โŒ [generate_and_cache_daily_feed Overall Error]: {e}", exc_info=True) return [] # ๐Ÿ“ฆ Retrieve from cache def get_cached_daily_feed(): try: cache_key = "daily_news_feed_cache" cached = redis_client.get(cache_key) if cached: logging.info(f"โœ… Retrieved cached daily feed from '{cache_key}'.") return json.loads(cached) else: logging.info(f"โ„น๏ธ No cached data found under key '{cache_key}'.") return [] except Exception as e: logging.error(f"โŒ [get_cached_daily_feed error]: {e}", exc_info=True) return [] # ๐Ÿงช Run if main if __name__ == "__main__": feed = generate_and_cache_daily_feed() print("\n--- Generated Daily Feed ---") print(json.dumps(feed, indent=2, ensure_ascii=False)) # Example of retrieving from cache # cached_data = get_cached_daily_feed() # print("\n--- Cached Daily Feed ---") # print(json.dumps(cached_data, indent=2, ensure_ascii=False))