File size: 3,353 Bytes
69210b9 a092d54 67fbb52 e465159 69210b9 e465159 f312f0d 1804706 e465159 69210b9 e465159 69210b9 e465159 69210b9 e465159 69210b9 e465159 69210b9 e465159 69210b9 e465159 69210b9 e465159 67fbb52 e465159 69210b9 3464963 69210b9 e465159 69210b9 e465159 69210b9 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
import json
import os
import sys
from typing import Dict, List, Optional

# Make the parent directory importable before the remaining imports run.
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))

import redis
import requests
from llama_index.core import VectorStoreIndex
from llama_index.core.query_engine import RetrieverQueryEngine
from llama_index.core.schema import Document
from llama_index.core.settings import Settings
# ✅ Disable OpenAI LLM fallback
Settings.llm = None
# Load environment variables
# Redis connection URL; falls back to a local instance when the variable is unset.
REDIS_URL = os.environ.get("UPSTASH_REDIS_URL", "redis://localhost:6379")
# NOTE(review): this value is used as the Redis *key name* for the cached feed,
# yet it is read from the UPSTASH_REDIS_TOKEN variable — confirm this is intended.
# It is None when the variable is unset.
REDIS_KEY = os.environ.get("UPSTASH_REDIS_TOKEN")
MISTRAL_URL = os.environ.get("MISTRAL_URL")  # Mistral inference endpoint (None when unset)
HF_TOKEN = os.environ.get("HF_TOKEN")  # Hugging Face access token (None when unset)
# ✅ Connect to Redis
# Module-level client shared by the cache writer and reader in this file.
redis_client = redis.Redis.from_url(REDIS_URL, decode_responses=True)
# Topics to query and summarize
TOPICS = ["India news", "World news", "Tech news", "Finance news", "Sports news"]
# 🧠 Prompt builder for summarization
def build_prompt(content: str, topic: str) -> str:
    """Return the summarization prompt for one piece of content.

    The prompt consists of fixed instructions (25-30 word summary, engaging,
    with emojis), the topic label, a blank line, and then the raw content.

    Args:
        content: Text to be summarized; appended verbatim after a blank line.
        topic: Topic label inserted after ``"Topic: "``.

    Returns:
        The complete prompt string.
    """
    # Plain literals: the instruction text has no placeholders, so it needs
    # no f-prefix; only the final line interpolates values.
    instructions = (
        "You are a news summarizer. Summarize the following content in 25-30 words. "
        "Make it engaging and informative. Include appropriate emojis."
    )
    return f"{instructions} Topic: {topic}\n\n{content}"
# Call Mistral API
def call_mistral(prompt: str) -> Optional[str]:
    """Send ``prompt`` to the Mistral endpoint and return the generated text.

    The request is a JSON POST to ``MISTRAL_URL`` with ``HF_TOKEN`` as a
    bearer token and a 20-second timeout. The reply text is read from
    ``outputs[0]["content"]`` of the JSON response body.

    Args:
        prompt: Text sent as the content of a single ``user`` message.

    Returns:
        The stripped response text, or ``None`` when the endpoint is not
        configured, the request fails, or the response does not have the
        expected shape. Failures are reported on stdout, never raised.
    """
    if not MISTRAL_URL:
        # Without this guard the failure surfaces as an opaque "Invalid URL"
        # error from requests.
        print("⚠️ Mistral error: MISTRAL_URL is not set")
        return None

    headers = {
        "Authorization": f"Bearer {HF_TOKEN}",
        "Content-Type": "application/json",
    }
    payload = {
        "inputs": [
            {"role": "user", "content": prompt},
        ],
    }
    try:
        response = requests.post(MISTRAL_URL, headers=headers, json=payload, timeout=20)
        response.raise_for_status()
        return response.json()["outputs"][0]["content"].strip()
    except (
        requests.RequestException,  # connection, timeout, HTTP status errors
        ValueError,  # body is not valid JSON
        KeyError,  # "outputs" / "content" missing
        IndexError,  # "outputs" is empty
        TypeError,  # response JSON is not the expected container type
        AttributeError,  # "content" is not a string
    ) as e:
        print(f"⚠️ Mistral error: {e}")
        return None
# Summarize a list of documents into a short news feed
def summarize_topic(docs: List[str], topic: str, max_items: int = 5) -> List[Dict]:
    """Summarize the leading documents of a topic into feed entries.

    Each of the first ``max_items`` documents is turned into a prompt with
    ``build_prompt`` and sent to ``call_mistral``. Documents that are blank,
    or for which no summary comes back, are skipped.

    Args:
        docs: Document texts for the topic; ``None`` is treated as empty.
        topic: Topic label used in the prompt and in the search link.
        max_items: Maximum number of leading documents to consider
            (default 5); values below zero are treated as zero.

    Returns:
        A list of dicts with the keys ``summary``, ``image_url`` and
        ``article_link``, one per successfully summarized document.
    """
    # Local import keeps this block self-contained.
    from urllib.parse import quote_plus

    # quote_plus still maps spaces to "+", but also escapes characters such
    # as "&" or "#" that would otherwise corrupt the query string.
    article_link = "https://google.com/search?q=" + quote_plus(topic)

    feed = []
    for doc in (docs or [])[:max(max_items, 0)]:
        if not doc or not doc.strip():
            # Nothing to summarize; do not spend a model call on it.
            continue
        summary = call_mistral(build_prompt(doc, topic))
        if summary:
            feed.append({
                "summary": summary,
                "image_url": "https://source.unsplash.com/800x600/?news",
                "article_link": article_link,
            })
    return feed
# Main pipeline
def generate_and_cache_daily_feed(documents: List[Document]) -> List[Dict]:
    """Build the per-topic news feed from ``documents`` and cache it in Redis.

    The documents are indexed with ``VectorStoreIndex``. For every entry in
    ``TOPICS`` the index is queried, the content of the returned source nodes
    is summarized with ``summarize_topic``, and the result is collected. The
    whole feed is then stored as one JSON string under ``REDIS_KEY``.

    Args:
        documents: Documents to index and query.

    Returns:
        A list with one dict per topic: ``topic`` (lower-cased, with the
        `` news`` suffix removed) and ``feed`` (the list of summaries).

    Raises:
        RuntimeError: If ``REDIS_KEY`` is not set.
    """
    # Fail before indexing and summarizing: a None key cannot be written to
    # Redis, and finding that out only after every model call wastes the run.
    if not REDIS_KEY:
        raise RuntimeError("REDIS_KEY is not set; cannot cache the daily feed")

    index = VectorStoreIndex.from_documents(documents)
    retriever = index.as_retriever()
    query_engine = RetrieverQueryEngine(retriever=retriever)

    final_feed = []
    for topic in TOPICS:
        # NOTE(review): the leading "π" looks like a mis-encoded emoji whose
        # original glyph cannot be recovered from here; left as found.
        print(f"\nπ Generating for: {topic}")
        response = query_engine.query(topic)
        docs = [str(node.get_content()) for node in response.source_nodes]
        print("Procured docs", docs)
        topic_feed = summarize_topic(docs, topic)
        final_feed.append({
            "topic": topic.lower().replace(" news", ""),
            "feed": topic_feed,
        })

    # 💾 Cache to Redis
    redis_client.set(REDIS_KEY, json.dumps(final_feed, ensure_ascii=False))
    # NOTE(review): REDIS_KEY is populated from a *_TOKEN environment variable
    # elsewhere in this file, so this line may print a secret — confirm.
    print(f"✅ Cached daily feed under key '{REDIS_KEY}'")
    return final_feed
# 🧪 Redis fetch (for use in APIs)
def get_cached_daily_feed():
    """Return the feed stored under ``REDIS_KEY``, decoded from JSON.

    Returns:
        The decoded feed, or an empty list when Redis holds no (or an empty)
        value for the key.
    """
    payload = redis_client.get(REDIS_KEY)
    if not payload:
        return []
    return json.loads(payload)
|