import feedparser
import requests
from typing import Dict, List, Any, Optional
from datetime import datetime
import time
import hashlib
import re
from urllib.parse import urlparse

from utils.logging import setup_logger
from utils.error_handling import handle_exceptions, IntegrationError
from utils.storage import load_data, save_data

# Initialize logger
logger = setup_logger(__name__)


class RSSFeedIntegration:
    """RSS Feed integration for content aggregation"""

    def __init__(self):
        """Initialize RSS Feed integration"""
        self.feeds = {}
        self.last_fetch = {}
        self.cached_entries = {}

    @handle_exceptions
    def add_feed(self, url: str, name: Optional[str] = None, category: str = "General") -> Dict[str, Any]:
        """Add an RSS feed

        Args:
            url: Feed URL
            name: Feed name (optional, will be extracted from feed if not provided)
            category: Feed category (default: General)

        Returns:
            Feed information
        """
        # Validate URL
        parsed_url = urlparse(url)
        if not parsed_url.scheme or not parsed_url.netloc:
            raise IntegrationError(f"Invalid feed URL: {url}")

        # Check if feed already exists
        feed_id = self._generate_feed_id(url)
        if feed_id in self.feeds:
            return self.feeds[feed_id]

        # Fetch feed to validate and get information
        try:
            feed_data = feedparser.parse(url)

            if feed_data.get("bozo", 0) == 1 and not feed_data.get("entries"):
                bozo_exception = feed_data.get("bozo_exception")
                error_msg = str(bozo_exception) if bozo_exception else "Unknown error"
                raise IntegrationError(f"Invalid feed: {error_msg}")

            # Extract feed information
            feed_info = {
                "id": feed_id,
                "url": url,
                "name": name or feed_data.feed.get("title", url),
                "description": feed_data.feed.get("description", ""),
                "category": category,
                "last_updated": feed_data.feed.get("updated", ""),
                "added_at": datetime.now().isoformat(),
                "entry_count": len(feed_data.entries)
            }

            # Store feed information
            self.feeds[feed_id] = feed_info
            self.last_fetch[feed_id] = time.time()

            # Cache entries
            self._cache_entries(feed_id, feed_data.entries)

            return feed_info
        except Exception as e:
            if not isinstance(e, IntegrationError):
                logger.error(f"Failed to add feed {url}: {str(e)}")
                raise IntegrationError(f"Failed to add feed: {str(e)}")
            raise

    @handle_exceptions
    def remove_feed(self, feed_id: str) -> bool:
        """Remove an RSS feed

        Args:
            feed_id: Feed ID

        Returns:
            True if successful, False otherwise
        """
        if feed_id not in self.feeds:
            return False

        # Remove feed and related data
        del self.feeds[feed_id]
        if feed_id in self.last_fetch:
            del self.last_fetch[feed_id]
        if feed_id in self.cached_entries:
            del self.cached_entries[feed_id]

        return True

    @handle_exceptions
    def get_feeds(self, category: Optional[str] = None) -> List[Dict[str, Any]]:
        """Get all feeds or feeds in a specific category

        Args:
            category: Feed category (optional)

        Returns:
            List of feed information
        """
        if category:
            return [feed for feed in self.feeds.values() if feed.get("category") == category]
        else:
            return list(self.feeds.values())

    @handle_exceptions
    def update_feed(self, feed_id: str, name: Optional[str] = None, category: Optional[str] = None) -> Dict[str, Any]:
        """Update feed information

        Args:
            feed_id: Feed ID
            name: New feed name (optional)
            category: New feed category (optional)

        Returns:
            Updated feed information
        """
        if feed_id not in self.feeds:
            raise IntegrationError(f"Feed not found: {feed_id}")

        feed_info = self.feeds[feed_id]

        # Update fields if provided
        if name is not None:
            feed_info["name"] = name
        if category is not None:
            feed_info["category"] = category

        # Update timestamp
        feed_info["updated_at"] = datetime.now().isoformat()

        # Store updated feed information
        self.feeds[feed_id] = feed_info

        return feed_info

    @handle_exceptions
    def fetch_feed_entries(self, feed_id: str, max_entries: int = 20, force_refresh: bool = False) -> List[Dict[str, Any]]:
        """Fetch entries from a feed

        Args:
            feed_id: Feed ID
            max_entries: Maximum number of entries to fetch (default: 20)
            force_refresh: Force refresh even if cache is recent (default: False)

        Returns:
            List of feed entries
        """
        if feed_id not in self.feeds:
            raise IntegrationError(f"Feed not found: {feed_id}")

        feed_info = self.feeds[feed_id]
        current_time = time.time()

        # Check if we need to refresh the cache
        cache_age = current_time - self.last_fetch.get(feed_id, 0)
        if force_refresh or cache_age > 300:  # Refresh if older than 5 minutes
            try:
                feed_data = feedparser.parse(feed_info["url"])

                # Update feed information
                feed_info["last_updated"] = feed_data.feed.get("updated", "")
                feed_info["entry_count"] = len(feed_data.entries)
                self.feeds[feed_id] = feed_info

                # Update cache
                self.last_fetch[feed_id] = current_time
                self._cache_entries(feed_id, feed_data.entries)
            except Exception as e:
                logger.error(f"Failed to fetch feed {feed_info['url']}: {str(e)}")
                # If we have cached entries, use them instead of failing
                if feed_id not in self.cached_entries:
                    raise IntegrationError(f"Failed to fetch feed: {str(e)}")

        # Return entries from cache
        entries = self.cached_entries.get(feed_id, [])
        return entries[:max_entries]

    @handle_exceptions
    def fetch_all_entries(self, max_entries_per_feed: int = 10, categories: Optional[List[str]] = None) -> Dict[str, List[Dict[str, Any]]]:
        """Fetch entries from all feeds or feeds in specific categories

        Args:
            max_entries_per_feed: Maximum number of entries per feed (default: 10)
            categories: List of categories to include (optional)

        Returns:
            Dictionary mapping feed IDs to lists of entries
        """
        result = {}

        # Get feeds to fetch
        feeds_to_fetch = self.feeds.values()
        if categories:
            feeds_to_fetch = [feed for feed in feeds_to_fetch if feed.get("category") in categories]

        # Fetch entries for each feed
        for feed in feeds_to_fetch:
            try:
                entries = self.fetch_feed_entries(feed["id"], max_entries_per_feed)
                result[feed["id"]] = entries
            except Exception as e:
                logger.error(f"Failed to fetch entries for feed {feed['url']}: {str(e)}")
                result[feed["id"]] = []

        return result

    @handle_exceptions
    def get_latest_entries(self, max_entries: int = 20, categories: Optional[List[str]] = None) -> List[Dict[str, Any]]:
        """Get latest entries from all feeds or feeds in specific categories

        Args:
            max_entries: Maximum number of entries to return (default: 20)
            categories: List of categories to include (optional)

        Returns:
            List of latest entries
        """
        # Fetch entries from all feeds
        all_entries = self.fetch_all_entries(max_entries, categories)

        # Flatten and sort entries
        entries = []
        for feed_id, feed_entries in all_entries.items():
            for entry in feed_entries:
                entry["feed_id"] = feed_id
                entry["feed_name"] = self.feeds[feed_id]["name"]
                entries.append(entry)

        # Sort by published date (newest first); entries without a parsed date
        # fall back to the epoch so the sort never compares None with a
        # time.struct_time
        entries.sort(key=lambda x: x.get("published_parsed") or time.gmtime(0), reverse=True)

        return entries[:max_entries]

    @handle_exceptions
    def search_entries(self, query: str, max_results: int = 20, categories: Optional[List[str]] = None) -> List[Dict[str, Any]]:
        """Search for entries matching a query

        Args:
            query: Search query
            max_results: Maximum number of results to return (default: 20)
            categories: List of categories to include (optional)

        Returns:
            List of matching entries
        """
        # Fetch entries from all feeds
        all_entries = self.fetch_all_entries(50, categories)  # Fetch more entries for better search results
        # Flatten entries
        entries = []
        for feed_id, feed_entries in all_entries.items():
            for entry in feed_entries:
                entry["feed_id"] = feed_id
                entry["feed_name"] = self.feeds[feed_id]["name"]
                entries.append(entry)

        # Search for matching entries
        query = query.lower()
        matching_entries = []

        for entry in entries:
            # Check if query matches title, summary, or content
            title = entry.get("title", "").lower()
            summary = entry.get("summary", "").lower()
            content = ""

            # Extract content from different possible formats
            if "content" in entry:
                for content_item in entry["content"]:
                    content += content_item.get("value", "").lower()

            # Check for match
            if query in title or query in summary or query in content:
                matching_entries.append(entry)

        return matching_entries[:max_results]

    @handle_exceptions
    def get_feed_categories(self) -> List[str]:
        """Get all feed categories

        Returns:
            List of categories
        """
        categories = set(feed.get("category", "General") for feed in self.feeds.values())
        return sorted(list(categories))

    @handle_exceptions
    def export_opml(self) -> str:
        """Export feeds as OPML

        Returns:
            OPML content as string
        """
        opml = '<?xml version="1.0" encoding="UTF-8"?>\n'
        opml += '<opml version="2.0">\n'
        opml += '  <head>\n'
        opml += '    <title>MONA RSS Feeds Export</title>\n'
        opml += f'    <dateCreated>{datetime.now().strftime("%a, %d %b %Y %H:%M:%S %z")}</dateCreated>\n'
        opml += '  </head>\n'
        opml += '  <body>\n'

        # Group feeds by category
        categories = {}
        for feed in self.feeds.values():
            category = feed.get("category", "General")
            if category not in categories:
                categories[category] = []
            categories[category].append(feed)

        # Add feeds by category
        for category, feeds in categories.items():
            opml += f'    <outline text="{category}" title="{category}">\n'
            for feed in feeds:
                # Escape double quotes so attribute values stay well-formed
                title = feed.get("name", "").replace('"', '&quot;')
                url = feed.get("url", "").replace('"', '&quot;')
                description = feed.get("description", "").replace('"', '&quot;')
                opml += (
                    f'      <outline type="rss" text="{title}" title="{title}" '
                    f'xmlUrl="{url}" description="{description}"/>\n'
                )
            opml += '    </outline>\n'

        opml += '  </body>\n'
        opml += '</opml>'

        return opml

    @handle_exceptions
    def import_opml(self, opml_content: str) -> Dict[str, Any]:
        """Import feeds from OPML

        Args:
            opml_content: OPML content as string

        Returns:
            Import results
        """
        import xml.etree.ElementTree as ET

        try:
            # Parse OPML content
            root = ET.fromstring(opml_content)

            # Find all outline elements with type="rss"
            results = {
                "total": 0,
                "imported": 0,
                "failed": 0,
                "existing": 0,
                "feeds": []
            }

            # Process outlines
            for outline in root.findall(".//outline"):
                # Check if this is a category or a feed
                outline_type = outline.get("type")
                if outline_type == "rss" or outline.get("xmlUrl"):
                    # This is a feed
                    results["total"] += 1
                    url = outline.get("xmlUrl")
                    title = outline.get("title") or outline.get("text")

                    # Find category (parent outline). Note: getparent() exists
                    # only on lxml elements; with the stdlib ElementTree the
                    # category falls back to "General".
                    category = "General"
                    parent = outline.getparent() if hasattr(outline, "getparent") else None
                    if parent is not None and parent.get("title"):
                        category = parent.get("title")

                    # Add feed
                    try:
                        feed_id = self._generate_feed_id(url)
                        if feed_id in self.feeds:
                            results["existing"] += 1
                            results["feeds"].append({
                                "url": url,
                                "title": title,
                                "status": "existing"
                            })
                        else:
                            self.add_feed(url, title, category)
                            results["imported"] += 1
                            results["feeds"].append({
                                "url": url,
                                "title": title,
                                "status": "imported"
                            })
                    except Exception as e:
                        results["failed"] += 1
                        results["feeds"].append({
                            "url": url,
                            "title": title,
                            "status": "failed",
                            "error": str(e)
                        })

            return results
        except Exception as e:
            logger.error(f"Failed to import OPML: {str(e)}")
            raise IntegrationError(f"Failed to import OPML: {str(e)}")

    def _generate_feed_id(self, url: str) -> str:
        """Generate a unique ID for a feed URL

        Args:
            url: Feed URL

        Returns:
            Feed ID
        """
        return hashlib.md5(url.encode()).hexdigest()

    def _cache_entries(self, feed_id: str, entries: List[Dict[str, Any]]) -> None:
        """Cache feed entries

        Args:
            feed_id: Feed ID
            entries: List of feed entries
        """
        # Process entries to standardize format
        processed_entries = []
        for entry in entries:
            # Extract basic information
            processed_entry = {
                "id": entry.get("id", ""),
                "title": entry.get("title", ""),
                "link": entry.get("link", ""),
                "summary": entry.get("summary", ""),
                "published": entry.get("published", ""),
                "published_parsed": entry.get("published_parsed"),
                "updated": entry.get("updated", ""),
                "updated_parsed": entry.get("updated_parsed"),
                "authors": entry.get("authors", []),
                "tags": entry.get("tags", [])
            }

            # Extract content
            if "content" in entry:
                processed_entry["content"] = entry["content"]

            # Clean up HTML in summary if present
            if processed_entry["summary"] and re.search(r"<[^>]+>", processed_entry["summary"]):
                # Simple HTML tag removal (a more robust solution would use a proper HTML parser)
                processed_entry["summary_text"] = re.sub(r"<[^>]+>", "", processed_entry["summary"])

            processed_entries.append(processed_entry)

        # Store in cache
        self.cached_entries[feed_id] = processed_entries
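

# Minimal usage sketch (not part of the original module): it assumes the
# utils.* dependencies above are importable and uses a placeholder feed URL,
# so treat it as an illustration rather than a test.
if __name__ == "__main__":
    rss = RSSFeedIntegration()

    # Register a feed; the URL below is a stand-in for any valid RSS/Atom feed.
    feed = rss.add_feed("https://example.com/feed.xml", name="Example Feed", category="News")
    print(f"Added feed {feed['name']} ({feed['entry_count']} entries)")

    # Pull the newest entries across all registered feeds.
    for entry in rss.get_latest_entries(max_entries=5):
        print(f"- [{entry['feed_name']}] {entry.get('title', '(untitled)')}")

    # Round-trip the subscription list through OPML.
    opml = rss.export_opml()
    results = rss.import_opml(opml)
    print(f"OPML import: {results['imported']} new, {results['existing']} existing")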