""" | |
Browser automation module for web scraping and analysis. | |
This module enables the AI assistant to control a web browser, | |
scrape content, and extract information from websites. | |
""" | |
import json | |
import logging | |
import re | |
import urllib.parse | |
from datetime import datetime | |
import requests | |
from bs4 import BeautifulSoup | |
from models import WebResource, Task, db | |
logger = logging.getLogger(__name__) | |
class BrowserAutomation:
    """Class for handling browser automation and web scraping"""

    def __init__(self, user_agent=None, headers=None):
        self.user_agent = user_agent or 'QuantumAI Assistant/1.0'
        self.headers = headers or {
            'User-Agent': self.user_agent,
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
        }
        self.session = requests.Session()
        self.session.headers.update(self.headers)
    def fetch_page(self, url, task_id=None):
        """
        Fetch a webpage and parse its content.

        Args:
            url (str): The URL to fetch
            task_id (int, optional): Associated task ID

        Returns:
            dict: Result containing status, parsed content, and metadata
        """
        try:
            # Parse and normalize the URL, defaulting to HTTPS if no scheme is given
            parsed_url = urllib.parse.urlparse(url)
            if not parsed_url.scheme:
                url = 'https://' + url

            logger.info(f"Fetching URL: {url}")
            response = self.session.get(url, timeout=15)
            response.raise_for_status()

            # Parse with BeautifulSoup
            soup = BeautifulSoup(response.text, 'html.parser')
            # get_text() is safer than .string, which returns None when
            # the <title> tag contains nested markup
            title = soup.title.get_text(strip=True) if soup.title else "No title found"

            # Store or update the web resource
            web_resource = self._store_web_resource(url, title, task_id)

            # Remove scripts, styles, and other non-content tags
            for tag in soup(["script", "style", "meta", "noscript"]):
                tag.extract()

            # Get text content and normalize whitespace
            text_content = soup.get_text(separator=' ')
            text_content = re.sub(r'\s+', ' ', text_content).strip()

            return {
                'status': 'success',
                'url': url,
                'title': title,
                'content': text_content,
                'html': response.text,
                'web_resource_id': web_resource.id,
                'timestamp': datetime.utcnow().isoformat()
            }
        except Exception as e:
            logger.error(f"Error fetching URL {url}: {str(e)}")
            return {
                'status': 'error',
                'url': url,
                'error': str(e),
                'timestamp': datetime.utcnow().isoformat()
            }
    def _store_web_resource(self, url, title, task_id=None):
        """Store or update a web resource in the database"""
        try:
            web_resource = WebResource.query.filter_by(url=url).first()
            if not web_resource:
                web_resource = WebResource(
                    url=url,
                    title=title,
                    category='general',
                    last_accessed=datetime.utcnow(),
                )
                if task_id:
                    web_resource.task_id = task_id
                db.session.add(web_resource)
            else:
                web_resource.last_accessed = datetime.utcnow()
                web_resource.title = title
            db.session.commit()
            return web_resource
        except Exception as e:
            logger.error(f"Error storing web resource: {str(e)}")
            db.session.rollback()
            # Return an unsaved placeholder object (its id will be None)
            # if the db operation fails
            return WebResource(url=url, title=title)
    def extract_links(self, html):
        """Extract all links from an HTML document"""
        soup = BeautifulSoup(html, 'html.parser')
        links = []
        for a_tag in soup.find_all('a', href=True):
            href = a_tag['href']
            text = a_tag.get_text(strip=True)
            # Skip in-page anchors and javascript pseudo-links
            if href.startswith('#') or href.startswith('javascript:'):
                continue
            links.append({
                'href': href,
                'text': text[:100] if text else ""
            })
        return links
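
    # Note: extract_links returns hrefs exactly as they appear in the markup,
    # so relative paths like "/about" are not resolved. A minimal sketch of
    # how a caller might absolutize them with the standard library (the base
    # URL here is a hypothetical example):
    #
    #     base = 'https://example.com/page'
    #     absolute = [urllib.parse.urljoin(base, link['href']) for link in links]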
    def extract_structured_data(self, html):
        """Extract structured data (JSON-LD, microdata) from an HTML document"""
        soup = BeautifulSoup(html, 'html.parser')
        structured_data = []
        # Extract JSON-LD blocks; script.string can be None for empty tags,
        # so fall back to '' and let the JSONDecodeError handler skip it
        for script in soup.find_all('script', type='application/ld+json'):
            try:
                data = json.loads(script.string or '')
                structured_data.append({
                    'type': 'json-ld',
                    'data': data
                })
            except json.JSONDecodeError:
                pass
        # TODO: Add microdata and RDFa extraction if needed
        return structured_data
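
    def extract_microdata(self, html):
        """Hedged sketch of the microdata extraction flagged in the TODO
        above, not part of the original module: collects itemprop values
        under each itemscope element. Assumes flat (non-nested) microdata
        only; RDFa is still left for future work.
        """
        soup = BeautifulSoup(html, 'html.parser')
        items = []
        for scope in soup.find_all(attrs={'itemscope': True}):
            item = {'type': scope.get('itemtype', ''), 'properties': {}}
            for prop in scope.find_all(attrs={'itemprop': True}):
                # Prefer machine-readable attributes over visible text
                value = (prop.get('content') or prop.get('href')
                         or prop.get_text(strip=True))
                item['properties'][prop['itemprop']] = value
            items.append(item)
        return items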
    def analyze_page_content(self, content, url=None):
        """Analyze page content to extract key information using NLP"""
        # This will be enhanced with our quantum NLP process.
        # For now, return a simple statistical analysis.
        word_count = len(content.split())
        # Drop the empty strings re.split leaves around leading/trailing
        # punctuation so the sentence count is not inflated
        sentences = [s for s in re.split(r'[.!?]+', content) if s.strip()]
        sentence_count = len(sentences)
        return {
            'word_count': word_count,
            'sentence_count': sentence_count,
            'average_sentence_length': word_count / max(1, sentence_count),
            'url': url
        }
# Helper functions for browser automation tasks

def create_scraping_task(url, title, description=None, scheduled_for=None):
    """Create a new web scraping task"""
    task = Task(
        title=title,
        description=description or f"Scrape content from {url}",
        status='pending',
        task_type='web_scrape',
        scheduled_for=scheduled_for,
        config={'url': url}
    )
    db.session.add(task)
    db.session.commit()
    return task
def execute_scraping_task(task_id):
    """Execute a web scraping task"""
    task = Task.query.get(task_id)
    if not task or task.task_type != 'web_scrape':
        return {'status': 'error', 'message': 'Invalid task'}
    try:
        task.status = 'in_progress'
        db.session.commit()

        # Guard against a missing config so .get() is never called on None
        url = (task.config or {}).get('url')
        browser = BrowserAutomation()
        result = browser.fetch_page(url, task_id=task.id)

        if result['status'] == 'success':
            # Also analyze the content
            analysis = browser.analyze_page_content(result['content'], url)
            result['analysis'] = analysis
            task.status = 'completed'
            task.completed_at = datetime.utcnow()
            task.result = result
        else:
            task.status = 'failed'
            task.result = result
        db.session.commit()
        return result
    except Exception as e:
        logger.error(f"Error executing task {task_id}: {str(e)}")
        # Roll back any half-applied changes before recording the failure,
        # otherwise the session may refuse the follow-up commit
        db.session.rollback()
        task.status = 'failed'
        task.result = {'error': str(e)}
        db.session.commit()
        return {'status': 'error', 'message': str(e)}
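
# --- Minimal usage sketch (illustrative, not part of the original module) ---
# The parsing helpers below need no database; the db-backed functions
# (fetch_page, create_scraping_task, execute_scraping_task) assume an active
# SQLAlchemy session bound to the Flask app behind `models`. The sample HTML
# is a hypothetical example.
if __name__ == '__main__':
    sample_html = (
        "<html><head><title>Example</title></head>"
        "<body><a href='/about'>About us</a>"
        "<p>Hello world. Short page!</p></body></html>"
    )
    browser = BrowserAutomation()
    print(browser.extract_links(sample_html))
    # -> [{'href': '/about', 'text': 'About us'}]
    soup = BeautifulSoup(sample_html, 'html.parser')
    print(browser.analyze_page_content(soup.get_text(separator=' ')))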