# SEO / app.py
# Hugging Face Space file by Merlintxu.
# Commit 03a9af3 (verified): "Update app.py" -- 18.4 kB
import os
import json
import logging
import re
import requests
import hashlib
import PyPDF2
import numpy as np
import pandas as pd
from io import BytesIO
from typing import List, Dict, Optional, Tuple
from urllib.parse import urlparse, urljoin
from concurrent.futures import ThreadPoolExecutor, as_completed
from bs4 import BeautifulSoup
from pathlib import Path
from datetime import datetime
from collections import defaultdict
from sklearn.feature_extraction.text import TfidfVectorizer
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from transformers import pipeline
from sentence_transformers import SentenceTransformer
import spacy
import torch
import gradio as gr
import matplotlib.pyplot as plt
# Logging setup: INFO level, each record prefixed with a timestamp and its level.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-level logger used throughout this file.
logger = logging.getLogger(__name__)
class SEOSpaceAnalyzer:
    """Crawl the pages listed in a sitemap and derive basic SEO statistics.

    The analyzer downloads each URL, extracts text, links and metadata from
    HTML pages (and text from PDFs), stores the raw payloads under
    ``content_storage/`` and aggregates the results into a report dictionary
    kept in ``current_analysis``.
    """

    # Maximum nesting of sitemap indexes that will be followed; guards
    # against self-referencing or cyclic sitemap indexes.
    _MAX_SITEMAP_DEPTH = 3

    def __init__(self):
        """Create the HTTP session, load the NLP models and prepare storage."""
        self.session = self._configure_session()
        self.models = self._load_models()
        self.base_dir = Path("content_storage")
        self.base_dir.mkdir(parents=True, exist_ok=True)
        self.current_analysis = {}

    def _configure_session(self) -> "requests.Session":
        """Return an HTTP session that retries GET/HEAD on 5xx responses."""
        session = requests.Session()
        retry = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[500, 502, 503, 504],
            allowed_methods=['GET', 'HEAD']
        )
        adapter = HTTPAdapter(max_retries=retry)
        session.mount('http://', adapter)
        session.mount('https://', adapter)
        session.headers.update({
            'User-Agent': 'Mozilla/5.0 (compatible; SEOBot/1.0)',
            'Accept-Language': 'es-ES,es;q=0.9'
        })
        return session

    def _load_models(self) -> Dict:
        """Load the NLP models, on GPU when CUDA is available.

        Returns:
            A dict with the keys ``summarizer``, ``ner``, ``semantic`` and
            ``spacy``.

        Raises:
            Exception: whatever the underlying loaders raise; the error is
                logged and re-raised.
        """
        try:
            # transformers pipelines take a CUDA device index, or -1 for CPU.
            device = 0 if torch.cuda.is_available() else -1
            return {
                'summarizer': pipeline(
                    "summarization",
                    model="facebook/bart-large-cnn",
                    device=device
                ),
                'ner': pipeline(
                    "ner",
                    model="dslim/bert-base-NER",
                    aggregation_strategy="simple",
                    device=device
                ),
                'semantic': SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2'),
                'spacy': spacy.load("es_core_news_lg")
            }
        except Exception as e:
            logger.error(f"Error loading models: {e}")
            raise

    def analyze_sitemap(self, sitemap_url: str, max_urls: int = 20,
                        max_workers: int = 4) -> Dict:
        """Analyze the pages listed in a sitemap.

        Args:
            sitemap_url: URL of the sitemap (or sitemap index) to crawl.
            max_urls: Maximum number of URLs to download; ``None`` means no
                limit.
            max_workers: Number of download threads.

        Returns:
            The analysis report (also stored in ``current_analysis``), or
            ``{"error": message}`` when nothing could be analyzed.
        """
        if not sitemap_url or not sitemap_url.strip():
            return {"error": "No se pudieron extraer URLs del sitemap"}
        try:
            urls = self._parse_sitemap(sitemap_url.strip())
            if not urls:
                return {"error": "No se pudieron extraer URLs del sitemap"}
            results = []
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                futures = {
                    executor.submit(self._process_url, url): url
                    for url in urls[:max_urls]
                }
                for future in as_completed(futures):
                    url = futures[future]
                    try:
                        results.append(future.result())
                    except Exception as e:
                        # One failing page must not abort the whole crawl.
                        logger.error(f"Error processing {url}: {e}")
                        results.append({'url': url, 'status': 'error', 'error': str(e)})
            self.current_analysis = {
                'stats': self._calculate_stats(results),
                'content_analysis': self._analyze_content(results),
                'links': self._analyze_links(results),
                'recommendations': self._generate_seo_recommendations(results),
                'timestamp': datetime.now().isoformat()
            }
            return self.current_analysis
        except Exception as e:
            logger.error(f"Error en análisis: {str(e)}")
            return {"error": str(e)}

    def _process_url(self, url: str) -> Dict:
        """Download one URL and extract its content according to its type.

        Returns:
            A result dict that always has ``url`` and ``status``; HTML and
            PDF responses add the fields produced by ``_process_html`` /
            ``_process_pdf``.
        """
        try:
            response = self.session.get(url, timeout=15)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            logger.warning(f"Error procesando {url}: {str(e)}")
            return {'url': url, 'status': 'error', 'error': str(e)}
        # Media types are case-insensitive, so compare in lower case.
        content_type = response.headers.get('Content-Type', '').lower()
        result = {'url': url, 'status': 'success'}
        if 'application/pdf' in content_type:
            result.update(self._process_pdf(response.content))
        elif 'text/html' in content_type:
            result.update(self._process_html(response.text, url))
        self._save_content(url, response.content)
        return result

    def _process_html(self, html: str, base_url: str) -> Dict:
        """Extract visible text, links and metadata from an HTML document."""
        soup = BeautifulSoup(html, 'html.parser')
        links = self._extract_links(soup, base_url)
        metadata = self._extract_metadata(soup)
        # Script/style bodies are not page copy; drop them so they do not
        # inflate the word count or pollute the keyword analysis.
        for tag in soup.find_all(['script', 'style', 'noscript', 'template']):
            tag.decompose()
        # A separator keeps words of adjacent elements from being glued.
        clean_text = self._clean_text(soup.get_text(separator=' '))
        return {
            'type': 'html',
            'content': clean_text,
            'word_count': len(clean_text.split()),
            'links': links,
            'metadata': metadata
        }

    def _process_pdf(self, content: bytes) -> Dict:
        """Extract the text of a PDF document.

        Returns:
            On success a dict with ``type``, ``content``, ``word_count`` and
            ``page_count``. When the PDF cannot be read, a dict with
            ``type``, ``status`` set to ``'error'`` and ``error``.
        """
        # The exception class lives in the ``errors`` submodule; it is not
        # exported at package level by the releases that provide PdfReader.
        from PyPDF2.errors import PdfReadError
        try:
            with BytesIO(content) as pdf_file:
                reader = PyPDF2.PdfReader(pdf_file)
                pages = reader.pages
                # Read everything while the underlying stream is still open.
                page_count = len(pages)
                # extract_text() may return None; join pages with a newline
                # so the last word of a page is not fused with the next one.
                text = "\n".join(page.extract_text() or "" for page in pages)
            clean_text = self._clean_text(text)
            return {
                'type': 'pdf',
                'content': clean_text,
                'word_count': len(clean_text.split()),
                'page_count': page_count
            }
        except PdfReadError as e:
            logger.error(f"Error reading PDF: {e}")
            return {'type': 'pdf', 'status': 'error', 'error': str(e)}

    def _clean_text(self, text: str) -> str:
        """Replace punctuation with spaces and collapse runs of whitespace.

        ``\\w`` is Unicode-aware for ``str`` patterns, so accented letters
        are preserved without listing them explicitly.
        """
        if not text:
            return ""
        # Punctuation first, whitespace second: the other order leaves the
        # spaces introduced by the punctuation replacement uncollapsed.
        without_punctuation = re.sub(r'[^\w\s]', ' ', text)
        return re.sub(r'\s+', ' ', without_punctuation).strip()

    def _extract_links(self, soup: "BeautifulSoup", base_url: str) -> List[Dict]:
        """Collect the http(s) links of a page, classified internal/external.

        Pure fragment links and non-http schemes (``javascript:``,
        ``mailto:``, ``tel:`` ...) are skipped because they do not point to
        crawlable pages.
        """
        links = []
        base_netloc = urlparse(base_url).netloc.lower()
        for tag in soup.find_all('a', href=True):
            try:
                href = tag['href'].strip()
                if not href or href.startswith('#'):
                    continue
                full_url = urljoin(base_url, href)
                parsed = urlparse(full_url)
                if parsed.scheme not in ('http', 'https'):
                    continue
                is_internal = parsed.netloc.lower() == base_netloc
                links.append({
                    'url': full_url,
                    'type': 'internal' if is_internal else 'external',
                    'anchor': self._clean_text(tag.get_text())[:100],
                    'file_type': self._get_file_type(parsed.path)
                })
            except Exception as e:
                # A single malformed anchor must not discard the others.
                logger.warning(f"Error processing link {tag.get('href')}: {e}")
                continue
        return links

    def _get_file_type(self, path: str) -> str:
        """Return the lower-cased extension of ``path``, or 'html' if none."""
        ext = Path(path).suffix.lower()
        return ext[1:] if ext else 'html'

    def _extract_metadata(self, soup: "BeautifulSoup") -> Dict:
        """Extract title, description, keywords and Open Graph properties."""
        metadata = {
            'title': '',
            'description': '',
            'keywords': [],
            'og': {}
        }
        if soup.title and soup.title.string:
            metadata['title'] = soup.title.string.strip()[:200]
        for meta in soup.find_all('meta'):
            name = meta.get('name', '').lower()
            property_ = meta.get('property', '').lower()
            content = meta.get('content', '')
            if name == 'description':
                metadata['description'] = content[:300]
            elif name == 'keywords':
                metadata['keywords'] = [kw.strip() for kw in content.split(',') if kw.strip()]
            elif property_.startswith('og:'):
                # Store Open Graph values without the 'og:' prefix.
                metadata['og'][property_[3:]] = content
        return metadata

    def _parse_sitemap(self, sitemap_url: str, _depth: int = 0) -> List[str]:
        """Return the page URLs listed in a sitemap or sitemap index.

        Args:
            sitemap_url: URL of the sitemap document.
            _depth: Internal recursion depth used when following sitemap
                indexes; callers should not pass it.

        Returns:
            De-duplicated http(s) URLs in document order; an empty list on
            any error.
        """
        if _depth > self._MAX_SITEMAP_DEPTH:
            logger.warning(f"Sitemap nesting too deep, skipping: {sitemap_url}")
            return []
        try:
            response = self.session.get(sitemap_url, timeout=10)
            response.raise_for_status()
            if 'xml' not in response.headers.get('Content-Type', '').lower():
                logger.warning(f"El sitemap no parece ser XML: {sitemap_url}")
                return []
            urls = []
            soup = BeautifulSoup(response.text, 'lxml-xml')
            if soup.find('sitemapindex'):
                # Every <sitemap><loc> of an index is a child sitemap,
                # whatever its file extension or query string.
                for entry in soup.find_all('sitemap'):
                    loc = entry.find('loc')
                    if loc is None:
                        continue
                    child_url = loc.text.strip()
                    if child_url and child_url != sitemap_url:
                        urls.extend(self._parse_sitemap(child_url, _depth + 1))
            else:
                urls = [loc.text.strip() for loc in soup.find_all('loc')]
            # dict.fromkeys de-duplicates while keeping the sitemap order,
            # so the URLs selected for crawling are deterministic.
            return list(dict.fromkeys(url for url in urls if url.startswith('http')))
        except Exception as e:
            logger.error(f"Error parsing sitemap {sitemap_url}: {e}")
            return []

    def _save_content(self, url: str, content: bytes) -> None:
        """Store a downloaded payload under ``base_dir/<host>/<path>``.

        Failures are logged and never propagated: saving is best-effort.
        """
        try:
            parsed = urlparse(url)
            relative = parsed.path.lstrip('/')
            if not relative or relative.endswith('/'):
                relative = relative + 'index.html'
            storage_root = self.base_dir.resolve()
            save_path = (storage_root / parsed.netloc / relative).resolve()
            # URLs come from an untrusted sitemap: refuse '..' segments that
            # would write outside the storage directory.
            if storage_root not in save_path.parents:
                logger.warning(f"Refusing to save outside storage directory: {url}")
                return
            save_path.parent.mkdir(parents=True, exist_ok=True)
            with open(save_path, 'wb') as f:
                f.write(content)
        except Exception as e:
            logger.error(f"Error saving content for {url}: {e}")

    def _calculate_stats(self, results: List[Dict]) -> Dict:
        """Compute counts, content-type distribution and average word count."""
        successful = [r for r in results if r.get('status') == 'success']
        type_counts = pd.Series(
            [r.get('type', 'unknown') for r in successful], dtype=object
        ).value_counts()
        word_counts = [r.get('word_count', 0) for r in successful]
        # np.mean([]) yields NaN plus a RuntimeWarning; report 0.0 instead.
        avg_word_count = round(float(np.mean(word_counts)), 1) if word_counts else 0.0
        return {
            'total_urls': len(results),
            'successful': len(successful),
            'failed': len(results) - len(successful),
            # Native str/int values keep the report JSON-serializable.
            'content_types': {str(k): int(v) for k, v in type_counts.items()},
            'avg_word_count': avg_word_count,
            'failed_urls': [r['url'] for r in results if r.get('status') != 'success']
        }

    def _analyze_content(self, results: List[Dict]) -> Dict:
        """Extract the top TF-IDF keywords and a few content samples."""
        successful = [r for r in results if r.get('status') == 'success' and r.get('content')]
        # Very short documents only add noise to the TF-IDF model.
        texts = [r['content'] for r in successful if len(r['content'].split()) > 10]
        if not texts:
            return {'top_keywords': [], 'content_samples': []}
        try:
            # Import the submodule explicitly: ``import spacy`` alone does
            # not guarantee that ``spacy.lang.es`` is loaded.
            from spacy.lang.es.stop_words import STOP_WORDS
            vectorizer = TfidfVectorizer(
                stop_words=sorted(STOP_WORDS),
                max_features=50,
                ngram_range=(1, 2)
            )
            tfidf = vectorizer.fit_transform(texts)
            feature_names = vectorizer.get_feature_names_out()
            # Sum the scores per term over all documents, keep the 10 best.
            term_scores = np.asarray(tfidf.sum(axis=0)).ravel()
            top_indices = np.argsort(term_scores)[-10:]
            top_keywords = feature_names[top_indices][::-1].tolist()
        except Exception as e:
            logger.error(f"Error en análisis TF-IDF: {str(e)}")
            top_keywords = []
        return {
            'top_keywords': top_keywords,
            'content_samples': [
                {
                    'url': r['url'],
                    # Only mark truncation when text was actually cut.
                    'sample': r['content'][:500] + ('...' if len(r['content']) > 500 else '')
                }
                for r in successful[:3]
            ]
        }

    def _analyze_links(self, results: List[Dict]) -> Dict:
        """Aggregate link targets, external domains, anchors and file types."""
        all_links = []
        for result in results:
            if result.get('links'):
                all_links.extend(result['links'])
        if not all_links:
            return {
                'internal_links': {},
                'external_domains': {},
                'common_anchors': {},
                'file_types': {}
            }
        df = pd.DataFrame(all_links)
        internal_urls = df[df['type'] == 'internal']['url']
        external_domains = df[df['type'] == 'external']['url'].apply(
            lambda x: urlparse(x).netloc
        )
        return {
            'internal_links': internal_urls.value_counts().head(20).to_dict(),
            'external_domains': external_domains.value_counts().head(10).to_dict(),
            'common_anchors': df['anchor'].value_counts().head(10).to_dict(),
            'file_types': df['file_type'].value_counts().to_dict()
        }

    def _generate_seo_recommendations(self, results: List[Dict]) -> List[str]:
        """Build human-readable SEO recommendations from the crawl results."""
        successful = [r for r in results if r.get('status') == 'success']
        if not successful:
            return ["No se pudo analizar ningún contenido exitosamente"]
        recs = []
        # Title/description only exist for HTML pages; PDFs and other
        # resources must not be reported as lacking them.
        html_pages = [r for r in successful if r.get('type') == 'html']
        missing_titles = sum(
            1 for r in html_pages if not r.get('metadata', {}).get('title')
        )
        if missing_titles:
            recs.append(f"📌 Añadir títulos a {missing_titles} páginas")
        missing_descriptions = sum(
            1 for r in html_pages if not r.get('metadata', {}).get('description')
        )
        if missing_descriptions:
            recs.append(f"📌 Añadir meta descripciones a {missing_descriptions} páginas")
        # Only documents whose text was extracted have a word count.
        short_content = sum(
            1 for r in successful if 'word_count' in r and r['word_count'] < 300
        )
        if short_content:
            recs.append(f"📝 Ampliar contenido en {short_content} páginas (menos de 300 palabras)")
        # An internal link is suspicious only when its target is a URL that
        # failed during this crawl; fragments are ignored for the comparison.
        failed_urls = {
            r['url'] for r in results
            if r.get('status') != 'success' and r.get('url')
        }
        broken_links = sum(
            1
            for r in successful
            for link in r.get('links', [])
            if link.get('type') == 'internal'
            and link.get('url', '').split('#', 1)[0] in failed_urls
        )
        if broken_links:
            recs.append(f"🔗 Revisar {broken_links} enlaces internos (posibles rotos)")
        return recs if recs else ["✅ No se detectaron problemas críticos de SEO"]
def create_interface():
    """Build the Gradio Blocks UI wired to a single ``SEOSpaceAnalyzer``.

    Returns:
        The ``gr.Blocks`` application, ready to be launched.
    """
    analyzer = SEOSpaceAnalyzer()
    report_path = analyzer.base_dir / "seo_report.json"

    def run_analysis(sitemap_url):
        """Run the analysis and split the report across the output widgets."""
        analysis = analyzer.analyze_sitemap(sitemap_url)
        if "error" in analysis:
            return f"❌ Error: {analysis['error']}", None, None, None, None
        return (
            "✅ Análisis completado",
            analysis.get('stats'),
            analysis.get('recommendations'),
            analysis.get('content_analysis'),
            analysis.get('links'),
        )

    def export_report():
        """Write the latest analysis to disk and return the file path."""
        if not analyzer.current_analysis:
            raise gr.Error("No hay ningún análisis disponible para descargar")
        with open(report_path, 'w', encoding='utf-8') as fh:
            json.dump(
                analyzer.current_analysis,
                fh,
                ensure_ascii=False,
                indent=2,
                # NumPy scalars expose .item(); anything else is stringified.
                default=lambda o: o.item() if hasattr(o, 'item') else str(o),
            )
        return str(report_path)

    with gr.Blocks(title="SEO Analyzer Pro", theme=gr.themes.Soft()) as interface:
        gr.Markdown(
            "\n"
            "# 🕵️ SEO Analyzer Pro\n"
            "**Analizador SEO avanzado con modelos de lenguaje**\n"
            "Sube la URL de un sitemap.xml para analizar todo el sitio web.\n"
        )
        with gr.Row():
            with gr.Column():
                sitemap_input = gr.Textbox(
                    label="URL del Sitemap",
                    placeholder="https://ejemplo.com/sitemap.xml",
                    interactive=True
                )
                analyze_btn = gr.Button("Analizar Sitio", variant="primary")
                with gr.Row():
                    clear_btn = gr.Button("Limpiar")
                    download_btn = gr.Button("Descargar Reporte", variant="secondary")
                # Receives the exported report so the user can download it.
                report_file = gr.File(label="Reporte", interactive=False)
            with gr.Column():
                status_output = gr.Textbox(label="Estado del Análisis", interactive=False)
        with gr.Tabs():
            with gr.Tab("📊 Resumen"):
                stats_output = gr.JSON(label="Estadísticas Generales")
                recommendations_output = gr.JSON(label="Recomendaciones SEO")
            with gr.Tab("📝 Contenido"):
                content_output = gr.JSON(label="Análisis de Contenido")
                gr.Examples(
                    examples=[
                        {"content": "Ejemplo de análisis de contenido..."}
                    ],
                    inputs=[content_output],
                    label="Ejemplos de Salida"
                )
            with gr.Tab("🔗 Enlaces"):
                links_output = gr.JSON(label="Análisis de Enlaces")
                with gr.Accordion("Visualización de Enlaces", open=False):
                    links_plot = gr.Plot()
            with gr.Tab("📂 Documentos"):
                gr.Markdown(
                    "\n"
                    "### Documentos Encontrados\n"
                    "Los documentos descargados se guardan en la carpeta `content_storage/`\n"
                )
                file_explorer = gr.FileExplorer(glob="content_storage/**/*")

        # Event handlers
        result_outputs = [
            status_output,
            stats_output,
            recommendations_output,
            content_output,
            links_output,
        ]
        analyze_btn.click(
            fn=run_analysis,
            inputs=sitemap_input,
            outputs=result_outputs,
            show_progress="full"
        )
        clear_btn.click(
            # Empty status text plus one None per JSON panel.
            fn=lambda: [""] + [None] * 4,
            outputs=result_outputs
        )
        download_btn.click(
            fn=export_report,
            outputs=report_file
        )
    return interface
if __name__ == "__main__":
    # Check that the spaCy model is installed before building the UI, so a
    # missing model produces an actionable message instead of a traceback.
    try:
        spacy.load("es_core_news_lg")
    except OSError:
        logger.error("Modelo spaCy 'es_core_news_lg' no encontrado. Ejecute:")
        logger.error("python -m spacy download es_core_news_lg")
        # ``exit`` is injected by the ``site`` module and is not guaranteed
        # to exist; raising SystemExit always works.
        raise SystemExit(1) from None
    app = create_interface()
    app.launch(
        server_name="0.0.0.0",
        server_port=7860,
        show_error=True,
        share=False
    )