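"""Daily news feed generator.

Loads topic-tagged documents from an Upstash vector store, summarizes each topic
into short headlines with OpenAI, and caches the resulting feed in Redis for 24 hours.
"""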
import os
import json
import redis
import numpy as np
from typing import List, Dict
from openai import OpenAI
from components.indexers.news_indexer import get_upstash_vector_store
from llama_index.core.vector_stores.types import VectorStoreQuery, MetadataFilter, MetadataFilters, FilterOperator
import logging
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Environment variables
REDIS_URL = os.environ.get("UPSTASH_REDIS_URL", "redis://localhost:6379")
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
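# NOTE: OPENAI_API_KEY is not validated here; if it is unset, the OpenAI call in
# summarize_topic() will fail at runtime.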
# Redis client
try:
    redis_client = redis.Redis.from_url(REDIS_URL, decode_responses=True)
except Exception as e:
    logging.error(f"[Redis Init Error]: {e}")
    # Redis is critical for caching, so raise rather than continue without it.
    raise
# Topics
TOPICS = ["India news", "World news", "Tech news", "Finance news", "Sports news"]
TOPIC_KEYS = [t.lower().replace(" news", "") for t in TOPICS]
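# TOPIC_KEYS lowercases each topic and strips the " news" suffix, e.g.
# ["india", "world", "tech", "finance", "sports"]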
# Summarization prompt
# The "numbered headline" instruction was removed so the LLM returns plain headlines.
BASE_PROMPT = (
    "You are Nuse's editorial summarizer. Read the excerpts below and extract the most important stories. "
    "Return up to 5 punchy headlines, each under 20 words. Each headline should be followed by a short explanation of why the story matters. "
    "Don't include filler like 'this is important because' or 'why this matters because...'; just state the logic and nothing else."
)
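# NOTE: summarize_topic() parses the model output as "Headline: explanation" lines;
# if the model drifts from that format, consider asking for it explicitly in BASE_PROMPT.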
# Load documents and metadata
def load_docs_by_topic_with_refs() -> Dict[str, List[Dict]]:
    """Load documents for each topic from the Upstash vector store, keeping metadata refs."""
    topic_docs = {key: [] for key in TOPIC_KEYS}
    logging.info("Starting to load documents by topic from Upstash Vector Store...")
    try:
        vector_store = get_upstash_vector_store()
        for topic_key in TOPIC_KEYS:
            filters = MetadataFilters(
                filters=[MetadataFilter(key="topic", value=topic_key, operator=FilterOperator.EQ)]
            )
            # A random vector for querying; adjust similarity_top_k based on your data density
            dummy_vector = np.random.rand(384).tolist()
            query = VectorStoreQuery(query_embedding=dummy_vector, similarity_top_k=50, filters=filters)
            logging.info(f"Querying for topic '{topic_key}' with filter value '{topic_key}'.")
            result = vector_store.query(query)
            logging.info(f"Found {len(result.nodes)} nodes for topic '{topic_key}'.")
            for node in result.nodes:
                content = node.get_content().strip()
                # IMPORTANT: retrieve the 'headline_id' from node.metadata
                headline_id = node.metadata.get("headline_id")
                # Other metadata (title, url, source) is kept for the summary output and debugging
                title = node.metadata.get("title", "No Title")
                url = node.metadata.get("url", "#")
                source = node.metadata.get("source", "Unknown Source")
                if content and headline_id is not None:
                    topic_docs[topic_key].append({
                        "text": content,
                        "headline_id": headline_id,  # Store the actual ID
                        "title": title,
                        "url": url,
                        "source": source
                    })
                elif content and headline_id is None:
                    logging.warning(f"Node found without 'headline_id' for topic '{topic_key}': URL {node.metadata.get('url', 'N/A')}")
    except Exception as e:
        logging.error(f"[load_docs_by_topic_with_refs Error]: {e}", exc_info=True)
    return topic_docs
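# NOTE: querying with a random 384-dimensional vector plus a metadata filter is simply a
# way to fetch up to similarity_top_k arbitrary nodes per topic; the vector length must
# match the dimension of the Upstash index.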
# Topic summarizer
def summarize_topic(topic_key: str, docs: List[Dict]) -> List[Dict]:
    """Summarize a topic's documents into headline/explanation pairs via OpenAI."""
    if not docs:
        logging.warning(f"No docs for topic: {topic_key}, skipping summarization.")
        return []
    # Use the first document's metadata as a representative reference for the summaries.
    # This assumes the summary is broadly about the topic and only needs *a* representative ID.
    representative_headline_id = docs[0].get("headline_id") if docs else None
    representative_article_link = docs[0].get("url") if docs else f"https://google.com/search?q={topic_key}+news"
    representative_title = docs[0].get("title") if docs else f"Summary for {topic_key}"
    # Concatenate document texts for summarization; ensure all texts are strings before joining
    content = "\n\n---\n\n".join([str(d["text"]) for d in docs if "text" in d and d["text"] is not None])
    if not content:
        logging.warning(f"No valid text content found in docs for topic: {topic_key}, skipping summarization.")
        return []
    content = content[:12000]  # Truncate to avoid excessive token usage
    logging.info(f"Summarizing topic via OpenAI: '{topic_key}' ({len(docs)} documents)")
    try:
        client = OpenAI(api_key=OPENAI_API_KEY)
        response = client.chat.completions.create(
            model="gpt-4",  # Consider "gpt-4o" or "gpt-3.5-turbo" for cost/speed
            messages=[
                {"role": "system", "content": BASE_PROMPT},
                {"role": "user", "content": content},
            ],
            max_tokens=512,
            temperature=0.7,
        )
        llm_output = response.choices[0].message.content.strip()
        # Parse "Headline: explanation" lines from the model output
        headlines = []
        for line in llm_output.splitlines():
            line = line.strip("-–—• ")  # Clean bullet points and dashes
            if ":" in line:
                parts = line.split(":", 1)  # Split only on the first colon
                if len(parts) == 2:
                    headline_text = parts[0].strip()
                    explanation_text = parts[1].strip()
                    if headline_text and explanation_text:  # Ensure both parts are non-empty
                        headlines.append({"summary": headline_text, "explanation": explanation_text})
        result = []
        # Assign the representative references to each generated summary
        for h_item in headlines:
            result.append({
                "summary": h_item["summary"],
                "explanation": h_item["explanation"],
                "headline_id": representative_headline_id,  # Use the representative ID
                "image_url": "https://source.unsplash.com/800x600/?news",
                "article_link": representative_article_link,  # Use the representative link
                "representative_title": representative_title  # Representative title for context
            })
        logging.info(f"Successfully generated {len(result)} summaries for topic '{topic_key}'.")
        return result
    except Exception as e:
        logging.error(f"[Summarize topic '{topic_key}' Error]: {e}", exc_info=True)
        return []
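# Each dict returned by summarize_topic() has the shape below (illustrative values only):
# {
#     "summary": "Example headline under twenty words",
#     "explanation": "One-line reason the story matters.",
#     "headline_id": <headline_id of the topic's first document>,
#     "image_url": "https://source.unsplash.com/800x600/?news",
#     "article_link": <url of the topic's first document>,
#     "representative_title": <title of the topic's first document>
# }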
# Generate and cache feed
def generate_and_cache_daily_feed():
    try:
        logging.info("Generating daily feed...")
        topic_docs = load_docs_by_topic_with_refs()  # Returns docs that carry 'headline_id'
        feed_map = {}
        for topic_key in TOPIC_KEYS:
            try:
                # Pass the documents directly to the summarizer
                summaries = summarize_topic(topic_key, topic_docs.get(topic_key, []))
                feed_map[topic_key] = summaries
            except Exception as e:
                logging.error(f"[Topic summarization loop error for '{topic_key}']: {e}", exc_info=True)
                feed_map[topic_key] = []
        final_feed = []
        for topic_display_name, topic_key in zip(TOPICS, TOPIC_KEYS):
            topic_feed = feed_map.get(topic_key, [])
            final_feed.append({
                "topic": topic_display_name,
                "feed": topic_feed
            })
        # Cache to Redis
        try:
            cache_key = "daily_news_feed_cache"
            redis_client.set(cache_key, json.dumps(final_feed, ensure_ascii=False))
            redis_client.expire(cache_key, 86400)  # Set expiry to 24 hours
            logging.info(f"Cached feed under key '{cache_key}' with 24-hour expiry.")
        except Exception as e:
            logging.error(f"[Redis cache error]: {e}", exc_info=True)
        return final_feed
    except Exception as e:
        logging.critical(f"[generate_and_cache_daily_feed Overall Error]: {e}", exc_info=True)
        return []
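# NOTE: redis-py's set() also accepts an expiry argument (e.g. ex=86400), which would
# combine the set/expire pair above into a single call.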
# Retrieve from cache
def get_cached_daily_feed():
    try:
        cache_key = "daily_news_feed_cache"
        cached = redis_client.get(cache_key)
        if cached:
            logging.info(f"Retrieved cached daily feed from '{cache_key}'.")
            return json.loads(cached)
        else:
            logging.info(f"No cached data found under key '{cache_key}'.")
            return []
    except Exception as e:
        logging.error(f"[get_cached_daily_feed error]: {e}", exc_info=True)
        return []
# Run if main
if __name__ == "__main__":
    feed = generate_and_cache_daily_feed()
    print("\n--- Generated Daily Feed ---")
    print(json.dumps(feed, indent=2, ensure_ascii=False))
    # Example of retrieving from cache:
    # cached_data = get_cached_daily_feed()
    # print("\n--- Cached Daily Feed ---")
    # print(json.dumps(cached_data, indent=2, ensure_ascii=False))