"""
Browser automation module for web scraping and analysis.
This module enables the AI assistant to control a web browser,
scrape content, and extract information from websites.
"""

import json
import logging
import re
import urllib.parse
from datetime import datetime

import requests
from bs4 import BeautifulSoup
from models import WebResource, Task, db

logger = logging.getLogger(__name__)


class BrowserAutomation:
    """Class for handling browser automation and web scraping"""
    
    def __init__(self, user_agent=None, headers=None):
        self.user_agent = user_agent or 'QuantumAI Assistant/1.0'
        self.headers = headers or {
            'User-Agent': self.user_agent,
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
        }
        self.session = requests.Session()
        self.session.headers.update(self.headers)
        
    def fetch_page(self, url, task_id=None):
        """
        Fetch a webpage and parse its content
        
        Args:
            url (str): The URL to fetch
            task_id (int, optional): Associated task ID
            
        Returns:
            dict: Result containing status, parsed content, and metadata
        """
        try:
            # Parse and normalize URL
            parsed_url = urllib.parse.urlparse(url)
            if not parsed_url.scheme:
                url = 'https://' + url
                
            logger.info(f"Fetching URL: {url}")
            response = self.session.get(url, timeout=15)
            response.raise_for_status()
            
            # Parse with BeautifulSoup
            soup = BeautifulSoup(response.text, 'html.parser')
            title = soup.title.get_text(strip=True) if soup.title else "No title found"
            
            # Store or update the web resource
            web_resource = self._store_web_resource(url, title, task_id)
            
            # Remove non-content elements (scripts, styles, metadata) before extracting text
            for tag in soup(["script", "style", "meta", "noscript"]):
                tag.extract()
                
            # Get text content and normalize whitespace
            text_content = soup.get_text(separator=' ')
            text_content = re.sub(r'\s+', ' ', text_content).strip()
            
            return {
                'status': 'success',
                'url': url,
                'title': title,
                'content': text_content,
                'html': response.text,
                'web_resource_id': web_resource.id,
                'timestamp': datetime.utcnow().isoformat()
            }
            
        except Exception as e:
            logger.error(f"Error fetching URL {url}: {str(e)}")
            return {
                'status': 'error',
                'url': url,
                'error': str(e),
                'timestamp': datetime.utcnow().isoformat()
            }
            
    def _store_web_resource(self, url, title, task_id=None):
        """Store or update web resource in the database"""
        try:
            web_resource = WebResource.query.filter_by(url=url).first()
            
            if not web_resource:
                web_resource = WebResource(
                    url=url,
                    title=title,
                    category='general',
                    last_accessed=datetime.utcnow(),
                )
                if task_id:
                    web_resource.task_id = task_id
                db.session.add(web_resource)
            else:
                web_resource.last_accessed = datetime.utcnow()
                web_resource.title = title
                
            db.session.commit()
            return web_resource
            
        except Exception as e:
            logger.error(f"Error storing web resource: {str(e)}")
            db.session.rollback()
            # Return a placeholder object if db operation fails
            return WebResource(url=url, title=title)
    
    def extract_links(self, html):
        """Extract all links from an HTML document"""
        soup = BeautifulSoup(html, 'html.parser')
        links = []
        
        for a_tag in soup.find_all('a', href=True):
            href = a_tag['href']
            text = a_tag.get_text(strip=True)
            
            if href.startswith('#') or href.startswith('javascript:'):
                continue
                
            links.append({
                'href': href,
                'text': text[:100] if text else ""
            })
            
        return links
    
    def extract_structured_data(self, html):
        """Extract structured data (JSON-LD, microdata) from an HTML document"""
        soup = BeautifulSoup(html, 'html.parser')
        structured_data = []
        
        # Extract JSON-LD
        for script in soup.find_all('script', type='application/ld+json'):
            # script.string is None for empty tags; json.loads(None) would raise TypeError
            if not script.string:
                continue
            try:
                data = json.loads(script.string)
                structured_data.append({
                    'type': 'json-ld',
                    'data': data
                })
            except json.JSONDecodeError:
                pass
                
        # TODO: Add RDFa extraction if needed; see the _extract_microdata sketch below
        
        return structured_data
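
    def _extract_microdata(self, soup):
        """Best-effort sketch of HTML microdata extraction (the TODO above).

        This helper is an illustrative assumption rather than a full
        schema.org parser: nested itemscopes are flattened and only simple
        property values are captured.
        """
        items = []
        for scope in soup.find_all(attrs={'itemscope': True}):
            item = {
                'type': 'microdata',
                'itemtype': scope.get('itemtype'),
                'properties': {}
            }
            for prop in scope.find_all(attrs={'itemprop': True}):
                # Prefer explicit content/href attributes, fall back to visible text
                value = prop.get('content') or prop.get('href') or prop.get_text(strip=True)
                item['properties'][prop.get('itemprop')] = value
            items.append(item)
        return items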
    
    def analyze_page_content(self, content, url=None):
        """Analyze page content to extract key information using NLP"""
        # This will be enhanced with our quantum NLP process
        # For now, just return a simple analysis
        word_count = len(content.split())
        # Drop empty fragments produced by trailing punctuation so the count is accurate
        sentences = [s for s in re.split(r'[.!?]+', content) if s.strip()]
        sentence_count = len(sentences)
        
        return {
            'word_count': word_count,
            'sentence_count': sentence_count,
            'average_sentence_length': word_count / max(1, sentence_count),
            'url': url
        }


# Helper functions for browser automation tasks
def create_scraping_task(url, title, description=None, scheduled_for=None):
    """Create a new web scraping task"""
    task = Task(
        title=title,
        description=description or f"Scrape content from {url}",
        status='pending',
        task_type='web_scrape',
        scheduled_for=scheduled_for,
        config={'url': url}
    )
    db.session.add(task)
    db.session.commit()
    return task


def execute_scraping_task(task_id):
    """Execute a web scraping task"""
    task = Task.query.get(task_id)
    if not task or task.task_type != 'web_scrape':
        return {'status': 'error', 'message': 'Invalid task'}
    
    try:
        task.status = 'in_progress'
        db.session.commit()
        
        url = (task.config or {}).get('url')
        browser = BrowserAutomation()
        result = browser.fetch_page(url, task_id=task.id)
        
        if result['status'] == 'success':
            # Also analyze the content
            analysis = browser.analyze_page_content(result['content'], url)
            result['analysis'] = analysis
            
            task.status = 'completed'
            task.completed_at = datetime.utcnow()
            task.result = result
        else:
            task.status = 'failed'
            task.result = result
            
        db.session.commit()
        return result
        
    except Exception as e:
        logger.error(f"Error executing task {task_id}: {str(e)}")
        task.status = 'failed'
        task.result = {'error': str(e)}
        db.session.commit()
        return {'status': 'error', 'message': str(e)}
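

# Illustrative usage sketch, not invoked by the application. It assumes the
# module is run inside an application context where the `models` db session
# is configured; https://example.com is a placeholder URL for demonstration.
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    browser = BrowserAutomation()
    page = browser.fetch_page('https://example.com')
    if page['status'] == 'success':
        print(f"Title: {page['title']}")
        print(f"Links found: {len(browser.extract_links(page['html']))}")
        print(f"Analysis: {browser.analyze_page_content(page['content'], page['url'])}")
    else:
        print(f"Fetch failed: {page['error']}")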