"""
Browser automation module for web scraping and analysis.

This module enables the AI assistant to control a web browser, scrape content,
and extract information from websites.
"""
import json
import logging
import re
import urllib.parse
from datetime import datetime

import requests
from bs4 import BeautifulSoup

from models import WebResource, Task, db

logger = logging.getLogger(__name__)


class BrowserAutomation:
    """Class for handling browser automation and web scraping"""

    def __init__(self, user_agent=None, headers=None):
        """
        Set up the HTTP session used for all requests.

        Args:
            user_agent (str, optional): User-Agent string; defaults to
                'QuantumAI Assistant/1.0'
            headers (dict, optional): Full set of request headers. When
                given, it replaces the default header set entirely.
        """
        self.user_agent = user_agent or 'QuantumAI Assistant/1.0'
        self.headers = headers or {
            'User-Agent': self.user_agent,
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
        }
        self.session = requests.Session()
        self.session.headers.update(self.headers)

    def fetch_page(self, url, task_id=None):
        """
        Fetch a webpage and parse its content

        Args:
            url (str): The URL to fetch
            task_id (int, optional): Associated task ID

        Returns:
            dict: Result containing status, parsed content, and metadata.
                On failure the dict has status 'error' and an 'error'
                message instead of the content fields; no exception is
                propagated to the caller.
        """
        try:
            # Parse and normalize URL
            parsed_url = urllib.parse.urlparse(url)
            if not parsed_url.scheme:
                url = 'https://' + url

            logger.info("Fetching URL: %s", url)
            response = self.session.get(url, timeout=15)
            response.raise_for_status()

            # Parse with BeautifulSoup
            soup = BeautifulSoup(response.text, 'html.parser')

            # `.string` is None for an empty <title> or one with nested
            # tags, so fall back to the placeholder in that case as well
            # as when the tag is missing altogether.
            raw_title = soup.title.string if soup.title else None
            title = str(raw_title) if raw_title else "No title found"

            # Store or update the web resource
            web_resource = self._store_web_resource(url, title, task_id)

            # Extract main content, remove scripts, styles, etc.
            for tag in soup(["script", "style", "meta", "noscript"]):
                tag.extract()

            # Get text content and normalize whitespace
            text_content = soup.get_text(separator=' ')
            text_content = re.sub(r'\s+', ' ', text_content).strip()

            return {
                'status': 'success',
                'url': url,
                'title': title,
                'content': text_content,
                'html': response.text,
                'web_resource_id': web_resource.id,
                'timestamp': datetime.utcnow().isoformat()
            }
        except Exception as e:
            # Boundary handler: callers expect an error dict, not a raise.
            logger.error("Error fetching URL %s: %s", url, e)
            return {
                'status': 'error',
                'url': url,
                'error': str(e),
                'timestamp': datetime.utcnow().isoformat()
            }

    def _store_web_resource(self, url, title, task_id=None):
        """
        Store or update web resource in the database

        Args:
            url (str): URL used as the lookup key
            title (str): Page title to store
            task_id (int, optional): Task to attach to a newly created
                resource (an existing resource keeps its current task)

        Returns:
            WebResource: The stored row, or an unsaved placeholder object
                if the database operation fails.
        """
        try:
            web_resource = WebResource.query.filter_by(url=url).first()

            if not web_resource:
                web_resource = WebResource(
                    url=url,
                    title=title,
                    category='general',
                    last_accessed=datetime.utcnow(),
                )
                if task_id:
                    web_resource.task_id = task_id
                db.session.add(web_resource)
            else:
                web_resource.last_accessed = datetime.utcnow()
                web_resource.title = title

            db.session.commit()
            return web_resource
        except Exception as e:
            logger.error("Error storing web resource: %s", e)
            db.session.rollback()
            # Return a placeholder object if db operation fails
            return WebResource(url=url, title=title)

    def extract_links(self, html):
        """
        Extract all links from an HTML document

        Args:
            html (str): HTML markup to scan

        Returns:
            list[dict]: One {'href', 'text'} entry per <a href> tag, with
                the text truncated to 100 characters. In-page anchors
                ('#...') and 'javascript:' links are skipped.
        """
        soup = BeautifulSoup(html, 'html.parser')
        links = []

        for a_tag in soup.find_all('a', href=True):
            href = a_tag['href']
            if href.startswith(('#', 'javascript:')):
                continue

            text = a_tag.get_text(strip=True)
            links.append({
                'href': href,
                'text': text[:100] if text else ""
            })

        return links

    def extract_structured_data(self, html):
        """
        Extract structured data (JSON-LD, microdata) from an HTML document

        Args:
            html (str): HTML markup to scan

        Returns:
            list[dict]: One {'type': 'json-ld', 'data': ...} entry per
                JSON-LD script tag that contains valid JSON. Empty or
                malformed tags are skipped.
        """
        soup = BeautifulSoup(html, 'html.parser')
        structured_data = []

        # Extract JSON-LD
        for script in soup.find_all('script', type='application/ld+json'):
            raw = script.string
            # `.string` is None for an empty tag; json.loads(None) would
            # raise TypeError and abort the whole extraction.
            if not raw:
                continue
            try:
                data = json.loads(raw)
            except json.JSONDecodeError:
                logger.debug("Skipping malformed JSON-LD block")
                continue
            structured_data.append({
                'type': 'json-ld',
                'data': data
            })

        # TODO: Add microdata and RDFa extraction if needed

        return structured_data

    def analyze_page_content(self, content, url=None):
        """
        Analyze page content to extract key information using NLP

        Args:
            content (str): Plain text to analyze
            url (str, optional): Source URL, echoed back in the result

        Returns:
            dict: 'word_count', 'sentence_count',
                'average_sentence_length' and 'url'.
        """
        # This will be enhanced with our quantum NLP process
        # For now, just return a simple analysis
        word_count = len(content.split())

        # re.split leaves an empty piece after the final terminator (and
        # for empty input); drop blank pieces so they are not counted.
        sentences = [s for s in re.split(r'[.!?]+', content) if s.strip()]
        sentence_count = len(sentences)

        return {
            'word_count': word_count,
            'sentence_count': sentence_count,
            'average_sentence_length': word_count / max(1, sentence_count),
            'url': url
        }


# Helper functions for browser automation tasks

def create_scraping_task(url, title, description=None, scheduled_for=None):
    """
    Create a new web scraping task

    Args:
        url (str): URL to scrape; stored under config['url']
        title (str): Task title
        description (str, optional): Defaults to "Scrape content from <url>"
        scheduled_for (optional): Passed through to Task.scheduled_for

    Returns:
        Task: The committed task, created with status 'pending'.
    """
    task = Task(
        title=title,
        description=description or f"Scrape content from {url}",
        status='pending',
        task_type='web_scrape',
        scheduled_for=scheduled_for,
        config={'url': url}
    )
    db.session.add(task)
    db.session.commit()
    return task


def execute_scraping_task(task_id):
    """
    Execute a web scraping task

    Args:
        task_id: Primary key of the Task to run

    Returns:
        dict: The fetch result (with an added 'analysis' entry on
            success), or {'status': 'error', 'message': ...} when the task
            is missing, is not a 'web_scrape' task, or execution raises.
    """
    task = Task.query.get(task_id)
    if not task or task.task_type != 'web_scrape':
        return {'status': 'error', 'message': 'Invalid task'}

    try:
        task.status = 'in_progress'
        db.session.commit()

        url = task.config.get('url')
        browser = BrowserAutomation()
        result = browser.fetch_page(url, task_id=task.id)

        if result['status'] == 'success':
            # Also analyze the content
            analysis = browser.analyze_page_content(result['content'], url)
            result['analysis'] = analysis

            task.status = 'completed'
            task.completed_at = datetime.utcnow()
            task.result = result
        else:
            task.status = 'failed'
            task.result = result

        db.session.commit()
        return result
    except Exception as e:
        logger.error("Error executing task %s: %s", task_id, e)
        # If the failure came from the database, the session must be
        # rolled back before it can be used to record the failure.
        db.session.rollback()
        try:
            task.status = 'failed'
            task.result = {'error': str(e)}
            db.session.commit()
        except Exception as commit_error:
            # Recording the failure is best-effort; still report the
            # original error to the caller.
            logger.error(
                "Could not record failure for task %s: %s",
                task_id, commit_error,
            )
            db.session.rollback()
        return {'status': 'error', 'message': str(e)}