import os
import feedparser
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.docstore.document import Document
import logging
from huggingface_hub import HfApi, login, snapshot_download
from huggingface_hub.utils import HfHubHTTPError
import json
import dateutil.parser
import hashlib
import re
from datetime import datetime

# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# --- Constants ---
# Local and repository configuration
LOCAL_DB_DIR = "chroma_db_news"  # Using a more descriptive local directory name
FEEDS_FILE = "rss_feeds.json"
COLLECTION_NAME = "news_articles"
REPO_ID = "broadfield-dev/news-rag-db" # Your Hugging Face Hub repo ID

# RSS feed fetching configuration
MAX_ARTICLES_PER_FEED = 1000

# Hugging Face credentials
HF_API_TOKEN = os.getenv("HF_TOKEN")
if not HF_API_TOKEN:
    raise ValueError("Hugging Face API token not found. Please set the HF_TOKEN environment variable.")

# --- Global Initializations ---
# Initialize Hugging Face API
login(token=HF_API_TOKEN)
hf_api = HfApi()

# Initialize embedding model once to be reused
logger.info("Loading embedding model...")
embedding_model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
logger.info("Embedding model loaded.")


def setup_local_db():
    """
    Ensures the local database directory exists.
    If it doesn't, it attempts to download from Hugging Face Hub.
    If the Hub repo is empty or doesn't exist, it creates a new local directory.
    This function handles the "build a new dataset if one does not exist" requirement.
    """
    if os.path.exists(LOCAL_DB_DIR):
        logger.info(f"Local database found at '{LOCAL_DB_DIR}'.")
        return

    logger.info(f"Local database not found. Attempting to download from Hugging Face Hub repo: {REPO_ID}")
    try:
        # snapshot_download is the correct function for downloading a whole repository/folder
        snapshot_download(
            repo_id=REPO_ID,
            repo_type="dataset",
            local_dir=LOCAL_DB_DIR,
            token=HF_API_TOKEN,
        )
        logger.info(f"Database downloaded successfully from {REPO_ID} to {LOCAL_DB_DIR}.")
    except HfHubHTTPError as e:
        # This error (e.g., 404 Not Found) is expected if the repo is new or empty.
        logger.warning(
            f"Failed to download from Hub (Repo might be new or empty): {e}. "
            f"A new local database will be created at '{LOCAL_DB_DIR}'."
        )
        os.makedirs(LOCAL_DB_DIR, exist_ok=True)
    except Exception as e:
        logger.error(f"An unexpected error occurred during DB download: {e}")
        logger.info(f"Creating a new local database at '{LOCAL_DB_DIR}'.")
        os.makedirs(LOCAL_DB_DIR, exist_ok=True)

def clean_text(text):
    """Clean text by removing HTML tags and extra whitespace."""
    if not text or not isinstance(text, str):
        return ""
    # Remove HTML tags
    text = re.sub(r'<.*?>', '', text)
    # Normalize whitespace
    text = ' '.join(text.split())
    return text.strip().lower()
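# Example of the behaviour implied by clean_text (HTML stripped, whitespace
# collapsed, text lowercased):
#   clean_text("<p>Breaking  News</p>")  ->  "breaking news"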

def fetch_rss_feeds():
    """Fetches and parses articles from a list of RSS feeds in a JSON file."""
    articles = []
    seen_keys = set()
    
    try:
        with open(FEEDS_FILE, 'r') as f:
            feed_categories = json.load(f)
    except FileNotFoundError:
        logger.error(f"'{FEEDS_FILE}' not found. Please create it. No feeds to process.")
        return []

    for category, feeds in feed_categories.items():
        for feed_info in feeds:
            feed_url = feed_info.get("url")
            if not feed_url:
                logger.warning(f"Skipping feed with no URL in category '{category}'")
                continue

            try:
                logger.info(f"Fetching {feed_url}")
                feed = feedparser.parse(feed_url)
                if feed.bozo:
                    logger.warning(f"Feed parsing error for {feed_url}: {feed.bozo_exception}")
                    continue
                
                for entry in feed.entries[:MAX_ARTICLES_PER_FEED]:
                    title = clean_text(entry.get("title", "No Title"))
                    link = entry.get("link", "") # Don't clean link URL
                    description = clean_text(entry.get("summary", entry.get("description", "")))
                    
                    if not description: # Skip articles without content
                        continue

                    published = "Unknown Date"
                    for date_field in ["published", "updated", "created", "pubDate"]:
                        if date_field in entry:
                            try:
                                parsed_date = dateutil.parser.parse(entry[date_field])
                                published = parsed_date.strftime("%Y-%m-%d %H:%M:%S")
                                break
                            except (ValueError, TypeError):
                                continue

                    # Create a unique key to deduplicate articles before processing
                    description_hash = hashlib.sha256(description.encode('utf-8')).hexdigest()
                    key = f"{title}|{link}|{published}|{description_hash}"
                    
                    if key not in seen_keys:
                        seen_keys.add(key)
                        articles.append({
                            "title": entry.get("title", "No Title"),
                            "link": link,
                            "description": description,
                            "published": published,
                            "category": category,
                        })
            except Exception as e:
                logger.error(f"Error fetching or parsing feed {feed_url}: {e}")
                
    logger.info(f"Total unique articles fetched: {len(articles)}")
    return articles

def process_and_store_articles(articles, vector_db):
    """Processes articles and stores them in the Chroma DB, avoiding duplicates."""
    new_docs = []
    new_doc_ids = []
    
    # Get all existing document IDs from the database once to check for duplicates
    try:
        existing_ids = set(vector_db.get(include=[])["ids"])
        logger.info(f"Found {len(existing_ids)} existing documents in the database.")
    except Exception as e:
        logger.error(f"Could not retrieve existing IDs from DB. Assuming empty. Error: {e}")
        existing_ids = set()

    for article in articles:
        try:
            # Recreate the same unique ID format for checking against the DB
            title = clean_text(article["title"])
            link = article["link"]
            published = article["published"]
            description = article["description"]
            description_hash = hashlib.sha256(description.encode('utf-8')).hexdigest()
            doc_id = f"{title}|{link}|{published}|{description_hash}"

            if doc_id in existing_ids:
                logger.debug(f"Skipping duplicate article (ID already in DB): {title[:50]}...")
                continue
            
            # Add to our in-memory set to avoid duplicates from the same batch
            existing_ids.add(doc_id)

            metadata = {
                "title": article["title"],
                "link": article["link"],
                "published": article["published"],
                "category": article["category"],
                # Store original description if needed, or keep it clean
                # "original_description": article["description"], 
            }
            # The Document object itself doesn't take an ID
            doc = Document(page_content=description, metadata=metadata)
            new_docs.append(doc)
            new_doc_ids.append(doc_id)

        except Exception as e:
            logger.error(f"Error processing article '{article.get('title', 'N/A')}': {e}")
    
    if new_docs:
        logger.info(f"Adding {len(new_docs)} new documents to the database...")
        try:
            # Provide the list of documents and a parallel list of their unique IDs
            vector_db.add_documents(documents=new_docs, ids=new_doc_ids)
            vector_db.persist() # Save changes to disk
            logger.info("Successfully added new documents and persisted the database.")
        except Exception as e:
            logger.error(f"Failed to add documents to Chroma DB: {e}")
    else:
        logger.info("No new articles to add to the database.")

def upload_to_hf_hub():
    """Uploads the local Chroma DB directory to the Hugging Face Hub."""
    if not os.path.exists(LOCAL_DB_DIR):
        logger.warning(f"Local database directory '{LOCAL_DB_DIR}' not found. Nothing to upload.")
        return
    
    try:
        # Ensure the repo exists before uploading.
        hf_api.create_repo(repo_id=REPO_ID, repo_type="dataset", exist_ok=True)

        logger.info(f"Uploading database from '{LOCAL_DB_DIR}' to Hugging Face repo: {REPO_ID}...")
        # upload_folder is the recommended way to upload a directory's contents.
        hf_api.upload_folder(
            folder_path=LOCAL_DB_DIR,
            repo_id=REPO_ID,
            repo_type="dataset",
            commit_message=f"Update database - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
        )
        logger.info(f"Database successfully uploaded to {REPO_ID}.")
    except Exception as e:
        logger.error(f"Error uploading to Hugging Face Hub: {e}")


def main():
    """Main execution function to run the data pipeline."""
    # 1. Ensure local DB exists by downloading from Hub or creating a new one
    setup_local_db()

    # 2. Initialize the vector DB object *after* the directory is guaranteed to exist
    logger.info("Initializing Chroma vector database...")
    vector_db = Chroma(
        persist_directory=LOCAL_DB_DIR,
        embedding_function=embedding_model,
        collection_name=COLLECTION_NAME
    )
    logger.info("Chroma DB initialized.")

    # 3. Fetch new articles from RSS feeds
    articles = fetch_rss_feeds()

    # 4. Process new articles and add them to the DB
    if articles:
        process_and_store_articles(articles, vector_db)
    
    # 5. Upload the potentially updated database back to the Hub
    upload_to_hf_hub()
    
    logger.info("Script finished.")


if __name__ == "__main__":
    main()
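# --- Example: querying the persisted database (illustrative sketch) ---
# Not part of the pipeline above. It shows one way a downstream consumer could
# load the same collection and run a similarity search, assuming the same
# LangChain version and embedding model as this script:
#
#   db = Chroma(
#       persist_directory=LOCAL_DB_DIR,
#       embedding_function=embedding_model,
#       collection_name=COLLECTION_NAME,
#   )
#   for doc in db.similarity_search("latest AI research news", k=5):
#       print(doc.metadata["title"], doc.metadata["link"])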