import os import logging import re import requests import hashlib import PyPDF2 import numpy as np import pandas as pd from io import BytesIO from typing import List, Dict, Any, Tuple from urllib.parse import urlparse, urljoin from concurrent.futures import ThreadPoolExecutor, as_completed from bs4 import BeautifulSoup from pathlib import Path from datetime import datetime from sklearn.feature_extraction.text import TfidfVectorizer from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry from transformers import pipeline from sentence_transformers import SentenceTransformer, util import torch import spacy import matplotlib.pyplot as plt from utils import sanitize_filename logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) class SEOSpaceAnalyzer: def __init__(self, max_urls: int = 20, max_workers: int = 4) -> None: self.max_urls = max_urls self.max_workers = max_workers self.session = self._configure_session() self.models = self._load_models() self.base_dir = Path("content_storage") self.base_dir.mkdir(parents=True, exist_ok=True) self.current_analysis: Dict[str, Any] = {} def _load_models(self) -> Dict[str, Any]: try: device = 0 if torch.cuda.is_available() else -1 logger.info("Cargando modelos NLP...") models = { 'summarizer': pipeline("summarization", model="facebook/bart-large-cnn", device=device), 'ner': pipeline("ner", model="dslim/bert-base-NER", aggregation_strategy="simple", device=device), 'semantic': SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2'), 'spacy': spacy.load("es_core_news_lg") } logger.info("Modelos cargados correctamente.") return models except Exception as e: logger.error(f"Error cargando modelos: {e}") raise def _configure_session(self) -> requests.Session: session = requests.Session() retry = Retry( total=3, backoff_factor=1, status_forcelist=[500, 502, 503, 504], allowed_methods=['GET', 'HEAD'] ) adapter = HTTPAdapter(max_retries=retry) session.mount('http://', adapter) session.mount('https://', adapter) session.headers.update({ 'User-Agent': 'Mozilla/5.0 (compatible; SEOBot/1.0)', 'Accept-Language': 'es-ES,es;q=0.9' }) return session def analyze_sitemap(self, sitemap_url: str) -> Tuple[Dict, List[str], Dict, Dict, List[Dict], Dict, Dict]: try: urls = self._parse_sitemap(sitemap_url) if not urls: return {"error": "No se pudieron extraer URLs del sitemap"}, [], {}, {}, [], {}, {} results: List[Dict] = [] with ThreadPoolExecutor(max_workers=self.max_workers) as executor: futures = {executor.submit(self._process_url, url): url for url in urls[:self.max_urls]} for future in as_completed(futures): url = futures[future] try: res = future.result() results.append(res) logger.info(f"Procesado: {url}") except Exception as e: logger.error(f"Error procesando {url}: {e}") results.append({'url': url, 'status': 'error', 'error': str(e)}) summaries, entities = self._apply_nlp(results) similarities = self._compute_semantic_similarity(results) self.current_analysis = { 'stats': self._calculate_stats(results), 'content_analysis': self._analyze_content(results), 'links': self._analyze_links(results), 'recommendations': self._generate_seo_recommendations(results), 'details': results, 'summaries': summaries, 'entities': entities, 'similarities': similarities, 'timestamp': datetime.now().isoformat() } a = self.current_analysis return a['stats'], a['recommendations'], a['content_analysis'], a['links'], a['details'], a['summaries'], a['similarities'] except Exception as e: logger.error(f"Error en análisis: {e}") return {"error": str(e)}, [], {}, {}, [], {}, {} def _process_url(self, url: str) -> Dict: try: response = self.session.get(url, timeout=15) response.raise_for_status() content_type = response.headers.get('Content-Type', '') result: Dict[str, Any] = {'url': url, 'status': 'success'} if 'application/pdf' in content_type: result.update(self._process_pdf(response.content)) elif 'text/html' in content_type: result.update(self._process_html(response.text, url)) else: result.update({'type': 'unknown', 'content': '', 'word_count': 0}) self._save_content(url, response.content) return result except requests.exceptions.Timeout as e: return {'url': url, 'status': 'error', 'error': "Timeout"} except requests.exceptions.HTTPError as e: return {'url': url, 'status': 'error', 'error': "HTTP Error"} except Exception as e: return {'url': url, 'status': 'error', 'error': str(e)} def _process_html(self, html: str, base_url: str) -> Dict: soup = BeautifulSoup(html, 'html.parser') clean_text = self._clean_text(soup.get_text()) return { 'type': 'html', 'content': clean_text, 'word_count': len(clean_text.split()), 'metadata': self._extract_metadata(soup), 'links': self._extract_links(soup, base_url) } def _process_pdf(self, content: bytes) -> Dict: try: text = "" with BytesIO(content) as pdf_file: reader = PyPDF2.PdfReader(pdf_file) for page in reader.pages: extracted = page.extract_text() text += extracted if extracted else "" clean_text = self._clean_text(text) return { 'type': 'pdf', 'content': clean_text, 'word_count': len(clean_text.split()), 'page_count': len(reader.pages) } except Exception as e: return {'type': 'pdf', 'error': str(e)} def _clean_text(self, text: str) -> str: if not text: return "" text = re.sub(r'\s+', ' ', text) return re.sub(r'[^\w\sáéíóúñÁÉÍÓÚÑ]', ' ', text).strip() def _extract_metadata(self, soup: BeautifulSoup) -> Dict: metadata = {'title': '', 'description': '', 'keywords': [], 'og': {}} if soup.title and soup.title.string: metadata['title'] = soup.title.string.strip()[:200] for meta in soup.find_all('meta'): name = meta.get('name', '').lower() prop = meta.get('property', '').lower() content = meta.get('content', '') if name == 'description': metadata['description'] = content[:300] elif name == 'keywords': metadata['keywords'] = [kw.strip() for kw in content.split(',') if kw.strip()] elif prop.startswith('og:'): metadata['og'][prop[3:]] = content return metadata def _extract_links(self, soup: BeautifulSoup, base_url: str) -> List[Dict]: links: List[Dict] = [] base_netloc = urlparse(base_url).netloc for tag in soup.find_all('a', href=True): try: href = tag['href'].strip() if not href or href.startswith('javascript:'): continue full_url = urljoin(base_url, href) parsed = urlparse(full_url) links.append({ 'url': full_url, 'type': 'internal' if parsed.netloc == base_netloc else 'external', 'anchor': self._clean_text(tag.get_text())[:100], 'file_type': self._get_file_type(parsed.path) }) except: continue return links def _get_file_type(self, path: str) -> str: ext = Path(path).suffix.lower() return ext[1:] if ext else 'html' def _parse_sitemap(self, sitemap_url: str) -> List[str]: try: response = self.session.get(sitemap_url, timeout=10) response.raise_for_status() if 'xml' not in response.headers.get('Content-Type', ''): return [] soup = BeautifulSoup(response.text, 'lxml-xml') urls: List[str] = [] if soup.find('sitemapindex'): for sitemap in soup.find_all('loc'): url = sitemap.text.strip() if url.endswith('.xml'): urls.extend(self._parse_sitemap(url)) else: urls = [loc.text.strip() for loc in soup.find_all('loc')] return list({url for url in urls if url.startswith('http')}) except: return [] def _save_content(self, url: str, content: bytes) -> None: try: parsed = urlparse(url) domain_dir = self.base_dir / parsed.netloc raw_path = parsed.path.lstrip('/') if not raw_path or raw_path.endswith('/'): raw_path = os.path.join(raw_path, 'index.html') if raw_path else 'index.html' safe_path = sanitize_filename(raw_path) save_path = domain_dir / safe_path save_path.parent.mkdir(parents=True, exist_ok=True) with open(save_path, 'wb') as f: f.write(content) except: pass def _apply_nlp(self, results: List[Dict]) -> Tuple[Dict[str, str], Dict[str, List[str]]]: summaries = {} entities = {} for r in results: if r.get('status') != 'success' or not r.get('content'): continue content = r['content'] if len(content.split()) > 300: try: summary = self.models['summarizer'](content[:1024], max_length=100, min_length=30, do_sample=False)[0]['summary_text'] summaries[r['url']] = summary except: pass try: ents = self.models['ner'](content[:1000]) entities[r['url']] = list(set([e['word'] for e in ents if e['entity_group'] in ['PER', 'ORG', 'LOC']])) except: pass return summaries, entities def _compute_semantic_similarity(self, results: List[Dict]) -> Dict[str, List[Dict]]: contents = [(r['url'], r['content']) for r in results if r.get('status') == 'success' and r.get('content')] if len(contents) < 2: return {} try: urls, texts = zip(*contents) embeddings = self.models['semantic'].encode(texts, convert_to_tensor=True) sim_matrix = util.pytorch_cos_sim(embeddings, embeddings) similarity_dict = {} for i, url in enumerate(urls): scores = list(sim_matrix[i]) top_indices = sorted(range(len(scores)), key=lambda j: scores[j], reverse=True) top_similar = [ {"url": urls[j], "score": float(scores[j])} for j in top_indices if j != i and float(scores[j]) > 0.5 ][:3] similarity_dict[url] = top_similar return similarity_dict except: return {} def _calculate_stats(self, results: List[Dict]) -> Dict: successful = [r for r in results if r.get('status') == 'success'] content_types = [r.get('type', 'unknown') for r in successful] avg_word_count = round(np.mean([r.get('word_count', 0) for r in successful]) if successful else 0, 1) return { 'total_urls': len(results), 'successful': len(successful), 'failed': len(results) - len(successful), 'content_types': pd.Series(content_types).value_counts().to_dict(), 'avg_word_count': avg_word_count, 'failed_urls': [r['url'] for r in results if r.get('status') != 'success'] } def _analyze_content(self, results: List[Dict]) -> Dict: successful = [r for r in results if r.get('status') == 'success' and r.get('content')] texts = [r['content'] for r in successful if len(r['content'].split()) > 10] if not texts: return {'top_keywords': [], 'content_samples': []} try: stop_words = list(self.models['spacy'].Defaults.stop_words) vectorizer = TfidfVectorizer(stop_words=stop_words, max_features=50, ngram_range=(1, 2)) tfidf = vectorizer.fit_transform(texts) feature_names = vectorizer.get_feature_names_out() sorted_indices = np.argsort(np.asarray(tfidf.sum(axis=0)).ravel())[-10:] top_keywords = feature_names[sorted_indices][::-1].tolist() except: top_keywords = [] samples = [{'url': r['url'], 'sample': r['content'][:500] + '...' if len(r['content']) > 500 else r['content']} for r in successful[:3]] return {'top_keywords': top_keywords, 'content_samples': samples} def _analyze_links(self, results: List[Dict]) -> Dict: all_links = [] for result in results: if result.get('links'): all_links.extend(result['links']) if not all_links: return {'internal_links': {}, 'external_domains': {}, 'common_anchors': {}, 'file_types': {}} df = pd.DataFrame(all_links) return { 'internal_links': df[df['type'] == 'internal']['url'].value_counts().head(20).to_dict(), 'external_domains': df[df['type'] == 'external']['url'].apply(lambda x: urlparse(x).netloc).value_counts().head(10).to_dict(), 'common_anchors': df['anchor'].value_counts().head(10).to_dict(), 'file_types': df['file_type'].value_counts().to_dict() } def _generate_seo_recommendations(self, results: List[Dict]) -> List[str]: successful = [r for r in results if r.get('status') == 'success'] if not successful: return ["No se pudo analizar ningún contenido exitosamente"] recs = [] missing_titles = sum(1 for r in successful if not r.get('metadata', {}).get('title')) if missing_titles: recs.append(f"📌 Añadir títulos a {missing_titles} páginas") short_descriptions = sum(1 for r in successful if not r.get('metadata', {}).get('description')) if short_descriptions: recs.append(f"📌 Añadir meta descripciones a {short_descriptions} páginas") short_content = sum(1 for r in successful if r.get('word_count', 0) < 300) if short_content: recs.append(f"📝 Ampliar contenido en {short_content} páginas (menos de 300 palabras)") all_links = [link for r in results for link in r.get('links', [])] if all_links: df_links = pd.DataFrame(all_links) internal_links = df_links[df_links['type'] == 'internal'] if len(internal_links) > 100: recs.append(f"🔗 Optimizar estructura de enlaces internos ({len(internal_links)} enlaces)") return recs if recs else ["✅ No se detectaron problemas críticos de SEO"] def plot_internal_links(self, links_data: Dict) -> Any: internal_links = links_data.get('internal_links', {}) fig, ax = plt.subplots() if not internal_links: ax.text(0.5, 0.5, 'No hay enlaces internos', ha='center', va='center', transform=ax.transAxes) ax.axis('off') else: names = list(internal_links.keys()) counts = list(internal_links.values()) ax.barh(names, counts) ax.set_xlabel("Cantidad de enlaces") ax.set_title("Top 20 Enlaces Internos") plt.tight_layout() return fig