# Source captured from a Hugging Face Spaces page (Space status at capture time: Sleeping).
import gradio as gr | |
import anthropic | |
import PyPDF2 | |
import pandas as pd | |
import numpy as np | |
import io | |
import os | |
import json | |
import zipfile | |
import tempfile | |
from typing import Dict, List, Tuple, Union | |
import re | |
from pathlib import Path | |
import openpyxl | |
from dataclasses import dataclass | |
from enum import Enum | |
# Hugging Face configuration: switch off Gradio analytics.
os.environ["GRADIO_ANALYTICS_ENABLED"] = "False"

# Module-level Anthropic client; process_files hands it to AIAnalyzer.
client = anthropic.Anthropic()
# Enum para tipos de análisis | |
class AnalysisType(Enum):
    """Kinds of analysis that can be chosen for a piece of content."""

    # Text that describes mathematical models (e.g. a scientific article).
    MATHEMATICAL_MODEL = "mathematical_model"
    # Experimental data meant for parameter fitting.
    DATA_FITTING = "data_fitting"
    # The content could not be classified.
    UNKNOWN = "unknown"
# Estructura modular para modelos | |
@dataclass
class MathematicalModel:
    """Description of one mathematical model stored in the registry.

    Declared as a dataclass so that instances can be created with keyword
    arguments, which is how ``ModelRegistry`` builds its default entries.
    Without the decorator the class has no ``__init__`` accepting fields and
    every such construction raises ``TypeError``.
    """

    name: str  # model name; key of the model inside its category
    equation: str  # human-readable equation
    parameters: List[str]  # labels of the model parameters
    application: str  # short description of the intended use
    sources: List[str]  # origin labels for the model
    category: str  # registry category the model is filed under
# Sistema de registro de modelos escalable | |
class ModelRegistry:
    """Registry of mathematical models grouped by category.

    ``models`` maps a category name to a ``{model name: model}`` dict. The
    built-in models are registered as soon as the registry is created.
    """

    def __init__(self):
        self.models = {}
        self._initialize_default_models()

    def register_model(self, model: MathematicalModel):
        """Store ``model`` under its category, replacing a same-named entry."""
        self.models.setdefault(model.category, {})[model.name] = model

    def get_model(self, category: str, name: str) -> MathematicalModel:
        """Return the model filed under ``category`` and ``name``.

        ``None`` is returned when the category or the name is not registered.
        """
        models_in_category = self.models.get(category, {})
        return models_in_category.get(name)

    def get_all_models(self) -> Dict:
        """Return the whole ``{category: {name: model}}`` mapping."""
        return self.models

    def _initialize_default_models(self):
        """Register the built-in models."""
        defaults = (
            # Growth models
            MathematicalModel(
                name="Monod",
                equation="μ = μmax × (S / (Ks + S))",
                parameters=["μmax (h⁻¹)", "Ks (g/L)"],
                application="Crecimiento limitado por sustrato único",
                sources=["Cambridge", "MIT", "DTU"],
                category="crecimiento_biomasa",
            ),
            MathematicalModel(
                name="Logístico",
                equation="dX/dt = μmax × X × (1 - X/Xmax)",
                parameters=["μmax (h⁻¹)", "Xmax (g/L)"],
                application="Sistemas cerrados batch",
                sources=["Cranfield", "Swansea", "HAL Theses"],
                category="crecimiento_biomasa",
            ),
            MathematicalModel(
                name="Gompertz",
                equation="X(t) = Xmax × exp(-exp((μmax × e / Xmax) × (λ - t) + 1))",
                parameters=["λ (h)", "μmax (h⁻¹)", "Xmax (g/L)"],
                application="Crecimiento con fase lag pronunciada",
                sources=["Lund University", "NC State"],
                category="crecimiento_biomasa",
            ),
            # Enzymatic models
            MathematicalModel(
                name="Michaelis-Menten",
                equation="v = Vmax × S / (Km + S)",
                parameters=["Vmax", "Km"],
                application="Cinética enzimática básica",
                sources=["Warsaw Univ Tech", "Food Processing"],
                category="consumo_sustrato",
            ),
            # Product formation models
            MathematicalModel(
                name="Luedeking-Piret",
                equation="dP/dt = α × (dX/dt) + β × X",
                parameters=["α (asociado)", "β (no asociado)"],
                application="Producción mixta asociada/no asociada",
                sources=["Cambridge", "E-Century"],
                category="formacion_producto",
            ),
        )
        for default_model in defaults:
            self.register_model(default_model)
# Global registry instance
model_registry = ModelRegistry()

# Claude models offered in the model dropdown, keyed by API model identifier
CLAUDE_MODELS = {
    "claude-3-5-sonnet-20241022": {
        "name": "Claude 3.5 Sonnet",
        "description": "Modelo rápido y eficiente",
        "max_tokens": 4000,
        "best_for": "Análisis general",
    },
    "claude-3-opus-20240229": {
        "name": "Claude 3 Opus",
        "description": "Modelo más potente",
        "max_tokens": 4000,
        "best_for": "Análisis complejos",
    },
    "claude-3-haiku-20240307": {
        "name": "Claude 3 Haiku",
        "description": "Modelo más rápido",
        "max_tokens": 4000,
        "best_for": "Análisis rápidos",
    },
}
class FileProcessor:
    """Stateless helpers that turn raw file bytes into text, tables or members.

    Every helper is a static method, so it works both on the class and on an
    instance (``FileProcessor().read_csv(data)``). Without ``@staticmethod``
    an instance call would bind the instance itself as the payload argument.
    """

    @staticmethod
    def extract_text_from_pdf(pdf_file) -> str:
        """Return the text of all pages of a PDF given as raw bytes.

        Each page's text is followed by a newline. On any failure a message
        starting with ``"Error al leer PDF:"`` is returned instead of raising.
        """
        try:
            pdf_reader = PyPDF2.PdfReader(io.BytesIO(pdf_file))
            # Guard against a page that yields no text so that one empty page
            # does not discard the text of the whole document.
            return "".join(
                (page.extract_text() or "") + "\n" for page in pdf_reader.pages
            )
        except Exception as e:
            return f"Error al leer PDF: {str(e)}"

    @staticmethod
    def read_csv(csv_file) -> Union[pd.DataFrame, None]:
        """Parse CSV bytes into a DataFrame; return ``None`` when parsing fails."""
        try:
            return pd.read_csv(io.BytesIO(csv_file))
        except Exception as e:
            # Report the reason instead of failing silently; callers only
            # check for ``None``.
            print(f"Error leyendo CSV: {e}")
            return None

    @staticmethod
    def read_excel(excel_file) -> Union[pd.DataFrame, None]:
        """Parse Excel bytes into a DataFrame; return ``None`` when parsing fails."""
        try:
            return pd.read_excel(io.BytesIO(excel_file))
        except Exception as e:
            print(f"Error leyendo Excel: {e}")
            return None

    @staticmethod
    def extract_from_zip(zip_file) -> List[Tuple[str, bytes]]:
        """Return ``(member name, member bytes)`` for the files in a ZIP.

        Entries under ``__MACOSX`` and directory entries are skipped. When the
        archive cannot be read the error is printed and the members collected
        so far (possibly none) are returned.
        """
        files = []
        try:
            with zipfile.ZipFile(io.BytesIO(zip_file), 'r') as zip_ref:
                for info in zip_ref.infolist():
                    if info.filename.startswith('__MACOSX') or info.is_dir():
                        continue
                    files.append((info.filename, zip_ref.read(info)))
        except Exception as e:
            print(f"Error procesando ZIP: {e}")
        return files
class AIAnalyzer:
    """Runs the Claude-backed analyses over extracted text or tabular data.

    ``client`` must expose ``messages.create(...)``; ``model_registry`` is
    stored on the instance but not consulted by the methods of this class.
    """

    # Model and input budget used for the quick document classification.
    DETECTION_MODEL = "claude-3-haiku-20240307"
    DETECTION_MAX_CHARS = 1000
    # Number of leading characters of an article sent for model identification.
    ARTICLE_MAX_CHARS = 3000

    _DETECTION_PROMPT = (
        "\n"
        "Analiza este contenido y determina si es:\n"
        "1. Un artículo científico que describe modelos matemáticos biotecnológicos\n"
        "2. Datos experimentales para ajuste de parámetros\n"
        'Responde solo con: "MODELO" o "DATOS"\n'
    )

    _IDENTIFY_MODELS_PROMPT = (
        "\n"
        "Analiza este texto científico e identifica:\n"
        "1. Modelos matemáticos biotecnológicos descritos\n"
        "2. Ecuaciones específicas\n"
        "3. Parámetros mencionados\n"
        "4. Aplicaciones biotecnológicas\n"
        "5. Microorganismos y procesos\n"
        "Formato JSON con estructura:\n"
        "{\n"
        '"modelos": ["nombre1", "nombre2"],\n'
        '"ecuaciones": ["eq1", "eq2"],\n'
        '"parametros": ["param1", "param2"],\n'
        '"aplicaciones": ["app1", "app2"],\n'
        '"microorganismos": ["org1", "org2"]\n'
        "}\n"
    )

    _RECOMMEND_PROMPT = (
        "\n"
        "Basado en los modelos identificados, proporciona:\n"
        "1. Estrategia de implementación\n"
        "2. Consideraciones experimentales\n"
        "3. Métodos de validación\n"
        "4. Posibles limitaciones\n"
    )

    _FITTING_PROMPT = (
        "\n"
        "Analiza estos datos experimentales y determina:\n"
        "1. Variables independientes y dependientes\n"
        "2. Posibles modelos matemáticos aplicables\n"
        "3. Método de ajuste recomendado\n"
        "4. Parámetros a estimar\n"
        "5. Calidad esperada del ajuste\n"
        "Proporciona código Python para el ajuste.\n"
    )

    def __init__(self, client, model_registry):
        self.client = client
        self.model_registry = model_registry

    def _ask(self, model: str, max_tokens: int, content: str) -> str:
        """Send one user message and return the text of the first reply block."""
        response = self.client.messages.create(
            model=model,
            max_tokens=max_tokens,
            messages=[{"role": "user", "content": content}],
        )
        return response.content[0].text

    def detect_analysis_type(self, content: Union[str, pd.DataFrame]) -> AnalysisType:
        """Classify ``content`` as a modelling article, fitting data or unknown.

        A DataFrame is always treated as fitting data. Text is classified by
        the detection model from its first ``DETECTION_MAX_CHARS`` characters.
        Any failure while asking or reading the answer yields ``UNKNOWN``.
        """
        if isinstance(content, pd.DataFrame):
            return AnalysisType.DATA_FITTING

        try:
            answer = self._ask(
                self.DETECTION_MODEL,
                10,
                f"{self._DETECTION_PROMPT}\n\n{content[:self.DETECTION_MAX_CHARS]}",
            )
            verdict = answer.strip().upper()
        except Exception:
            # Deliberately best-effort, but no longer a bare ``except`` so
            # KeyboardInterrupt and SystemExit still propagate.
            return AnalysisType.UNKNOWN

        if "MODELO" in verdict:
            return AnalysisType.MATHEMATICAL_MODEL
        if "DATOS" in verdict:
            return AnalysisType.DATA_FITTING
        return AnalysisType.UNKNOWN

    def analyze_mathematical_article(self, text: str, claude_model: str) -> Dict:
        """Identify the models described in ``text`` and ask for recommendations.

        Two requests are made with ``claude_model``: one over the first
        ``ARTICLE_MAX_CHARS`` characters to identify models, and one over that
        answer to obtain implementation recommendations.

        Returns a dict with the keys ``tipo``, ``modelos`` and
        ``recomendaciones``, or ``{"error": message}`` when a request fails.
        """
        try:
            models_info = self._ask(
                claude_model,
                2000,
                f"{self._IDENTIFY_MODELS_PROMPT}\n\nTEXTO:\n{text[:self.ARTICLE_MAX_CHARS]}",
            )
            recommendations = self._ask(
                claude_model,
                2000,
                f"{self._RECOMMEND_PROMPT}\n\nMODELOS:\n{models_info}",
            )
            return {
                "tipo": "Artículo de Modelos Matemáticos",
                "modelos": models_info,
                "recomendaciones": recommendations,
            }
        except Exception as e:
            return {"error": str(e)}

    def analyze_fitting_data(self, data: pd.DataFrame, claude_model: str) -> Dict:
        """Summarize ``data`` and ask ``claude_model`` how to fit it.

        Returns a dict with the keys ``tipo``, ``analisis`` and
        ``resumen_datos``, or ``{"error": message}`` when building the summary
        or the request fails.
        """
        try:
            # Built inside the ``try`` so that a frame that cannot be
            # described is reported as an error dict instead of raising.
            data_summary = (
                "\n"
                f"Columnas: {list(data.columns)}\n"
                f"Forma: {data.shape}\n"
                "Primeras filas:\n"
                f"{data.head().to_string()}\n"
                "Estadísticas:\n"
                f"{data.describe().to_string()}\n"
            )
            analysis = self._ask(
                claude_model,
                3000,
                f"{self._FITTING_PROMPT}\n\nDATOS:\n{data_summary}",
            )
            return {
                "tipo": "Datos para Ajuste",
                "analisis": analysis,
                "resumen_datos": data_summary,
            }
        except Exception as e:
            return {"error": str(e)}
_DATA_EXTENSIONS = ('.csv', '.xlsx', '.xls')


def _to_json(result) -> str:
    """Render an analysis result as indented JSON, keeping non-ASCII text."""
    return json.dumps(result, indent=2, ensure_ascii=False)


def _analyze_pdf(analyzer, pdf_bytes, claude_model: str):
    """Extract a PDF's text and analyze it when it describes mathematical models.

    Returns ``(text, result)``; ``result`` is ``None`` when the detected
    analysis type is not ``MATHEMATICAL_MODEL``.
    """
    text = FileProcessor.extract_text_from_pdf(pdf_bytes)
    if analyzer.detect_analysis_type(text) == AnalysisType.MATHEMATICAL_MODEL:
        return text, analyzer.analyze_mathematical_article(text, claude_model)
    return text, None


def _table_report(analyzer, extension: str, table_bytes, claude_model: str) -> str:
    """Return the report for a CSV/Excel payload, or a notice when it cannot be read."""
    if extension == '.csv':
        df = FileProcessor.read_csv(table_bytes)
    else:
        df = FileProcessor.read_excel(table_bytes)
    if df is None:
        return "⚠️ No se pudo leer el archivo de datos"
    return _to_json(analyzer.analyze_fitting_data(df, claude_model))


def process_files(files, claude_model: str) -> str:
    """Analyze every uploaded file and return one Markdown report.

    Args:
        files: uploaded files, either path strings (what
            ``gr.File(type="filepath")`` provides) or objects exposing the
            path as ``.name``. ``None`` entries are skipped.
        claude_model: model identifier forwarded to the analyzer.

    Returns:
        The report sections joined by newlines. A file that cannot be opened
        is reported in place and does not stop the remaining files.
    """
    analyzer = AIAnalyzer(client, model_registry)
    results = []

    for file in files or []:
        if file is None:
            continue

        # Path strings have no ``.name`` attribute, so resolve the path first
        # and use it both for opening and for display.
        file_path = file if isinstance(file, (str, os.PathLike)) else file.name
        file_name = os.fspath(file_path)
        file_ext = Path(file_name).suffix.lower()

        try:
            with open(file_path, 'rb') as f:
                file_content = f.read()
        except OSError as e:
            results.append(f"## ⚠️ No se pudo abrir el archivo: {file_name} ({e})")
            results.append("\n---\n")
            continue

        if file_ext == '.zip':
            # Helpers are called through the class so they work whether or
            # not FileProcessor declares them as static methods.
            extracted_files = FileProcessor.extract_from_zip(file_content)
            results.append(f"## 📦 Archivo ZIP: {file_name}")
            results.append(f"Contiene {len(extracted_files)} archivos\n")

            for name, content in extracted_files:
                sub_ext = Path(name).suffix.lower()
                results.append(f"### 📄 {name}")

                if sub_ext == '.pdf':
                    text, result = _analyze_pdf(analyzer, content, claude_model)
                    if result is None:
                        result = {"tipo": "PDF no reconocido", "contenido": text[:500]}
                    results.append(_to_json(result))
                elif sub_ext in _DATA_EXTENSIONS:
                    results.append(_table_report(analyzer, sub_ext, content, claude_model))
                else:
                    results.append("Tipo de archivo no soportado")

                results.append("\n---\n")

        elif file_ext == '.pdf':
            results.append(f"## 📄 PDF: {file_name}")
            text, result = _analyze_pdf(analyzer, file_content, claude_model)
            if result is None:
                result = {"tipo": "PDF - Contenido no identificado", "texto": text[:1000]}
            results.append(_to_json(result))

        elif file_ext in _DATA_EXTENSIONS:
            results.append(f"## 📊 Archivo de datos: {file_name}")
            results.append(_table_report(analyzer, file_ext, file_content, claude_model))

        else:
            results.append(f"## ⚠️ Tipo de archivo no soportado: {file_name}")

        results.append("\n---\n")

    return "\n".join(results)
def generate_implementation_code(analysis_results: str) -> str:
    """Return a Python script that fits biotechnological models to data.

    The script is a fixed template defining ``BiotechModelFitter`` plus a few
    common model functions. The template is kept at column 0 so that the
    emitted code has valid indentation.

    Args:
        analysis_results: accepted for interface compatibility; the template
            does not depend on it.

    Returns:
        The source code of the script as a string.
    """
    code = """
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.integrate import odeint
from scipy.optimize import curve_fit, differential_evolution
from sklearn.metrics import r2_score, mean_squared_error
import seaborn as sns

# Configuración de visualización
plt.style.use('seaborn-v0_8-darkgrid')
sns.set_palette("husl")


class BiotechModelFitter:
    \"\"\"Clase para ajuste de modelos biotecnológicos\"\"\"

    def __init__(self):
        self.models = {}
        self.fitted_params = {}
        self.results = {}

    def add_model(self, name, func, param_names):
        \"\"\"Registra un nuevo modelo\"\"\"
        self.models[name] = {
            'function': func,
            'parameters': param_names
        }

    def fit_model(self, model_name, x_data, y_data, bounds=None):
        \"\"\"Ajusta modelo a datos

        bounds usa el formato de curve_fit: (inferiores, superiores).
        \"\"\"
        if model_name not in self.models:
            raise ValueError(f"Modelo {model_name} no registrado")
        model_func = self.models[model_name]['function']
        # Intentar ajuste con curve_fit
        try:
            if bounds:
                popt, pcov = curve_fit(model_func, x_data, y_data, bounds=bounds)
            else:
                popt, pcov = curve_fit(model_func, x_data, y_data)
            # Calcular métricas
            y_pred = model_func(x_data, *popt)
            r2 = r2_score(y_data, y_pred)
            rmse = np.sqrt(mean_squared_error(y_data, y_pred))
            self.fitted_params[model_name] = popt
            self.results[model_name] = {
                'parameters': dict(zip(self.models[model_name]['parameters'], popt)),
                'covariance': pcov,
                'r2': r2,
                'rmse': rmse
            }
            return True
        except Exception as e:
            print(f"Error en ajuste: {e}")
            # Intentar con optimización global
            return self._global_fit(model_name, x_data, y_data, bounds)

    def _global_fit(self, model_name, x_data, y_data, bounds):
        \"\"\"Ajuste global con differential evolution\"\"\"
        model_func = self.models[model_name]['function']

        def objective(params):
            y_pred = model_func(x_data, *params)
            return np.sum((y_data - y_pred)**2)

        n_params = len(self.models[model_name]['parameters'])
        if bounds:
            # curve_fit recibe (inferiores, superiores); differential_evolution
            # necesita un par (min, max) por parámetro
            lower, upper = bounds
            bounds = list(zip(np.broadcast_to(lower, n_params),
                              np.broadcast_to(upper, n_params)))
        else:
            # Bounds por defecto
            bounds = [(0, 100)] * n_params
        result = differential_evolution(objective, bounds)
        if result.success:
            popt = result.x
            y_pred = model_func(x_data, *popt)
            r2 = r2_score(y_data, y_pred)
            rmse = np.sqrt(mean_squared_error(y_data, y_pred))
            self.fitted_params[model_name] = popt
            self.results[model_name] = {
                'parameters': dict(zip(self.models[model_name]['parameters'], popt)),
                'r2': r2,
                'rmse': rmse,
                'optimization_result': result
            }
            return True
        return False

    def plot_results(self, x_data, y_data, models_to_plot=None):
        \"\"\"Visualiza resultados del ajuste\"\"\"
        plt.figure(figsize=(12, 8))
        # Datos experimentales
        plt.scatter(x_data, y_data, label='Datos experimentales',
                    s=50, alpha=0.7, edgecolors='black')
        # Modelos ajustados
        if models_to_plot is None:
            models_to_plot = self.fitted_params.keys()
        x_smooth = np.linspace(x_data.min(), x_data.max(), 300)
        for model_name in models_to_plot:
            if model_name in self.fitted_params:
                model_func = self.models[model_name]['function']
                params = self.fitted_params[model_name]
                y_smooth = model_func(x_smooth, *params)
                r2 = self.results[model_name]['r2']
                plt.plot(x_smooth, y_smooth,
                         label=f'{model_name} (R² = {r2:.4f})',
                         linewidth=2.5)
        plt.xlabel('Variable Independiente', fontsize=12)
        plt.ylabel('Variable Dependiente', fontsize=12)
        plt.title('Ajuste de Modelos Biotecnológicos', fontsize=14, fontweight='bold')
        plt.legend(loc='best', frameon=True, shadow=True)
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        return plt.gcf()

    def generate_report(self):
        \"\"\"Genera reporte de resultados\"\"\"
        report = "# Reporte de Ajuste de Modelos\\n\\n"
        for model_name, results in self.results.items():
            report += f"## Modelo: {model_name}\\n\\n"
            report += f"### Parámetros ajustados:\\n"
            for param, value in results['parameters'].items():
                report += f"- **{param}**: {value:.6f}\\n"
            report += f"\\n### Métricas de ajuste:\\n"
            report += f"- **R²**: {results['r2']:.6f}\\n"
            report += f"- **RMSE**: {results['rmse']:.6f}\\n\\n"
        return report


# Modelos predefinidos comunes
def monod_model(S, mu_max, Ks):
    return mu_max * S / (Ks + S)


def logistic_growth(t, K, r, t0):
    return K / (1 + np.exp(-r * (t - t0)))


def gompertz_model(t, A, mu, lambda_param):
    return A * np.exp(-np.exp(mu * np.e / A * (lambda_param - t) + 1))


def michaelis_menten(S, Vmax, Km):
    return Vmax * S / (Km + S)


# Ejemplo de uso
if __name__ == "__main__":
    # Crear instancia del ajustador
    fitter = BiotechModelFitter()
    # Registrar modelos
    fitter.add_model('Monod', monod_model, ['mu_max', 'Ks'])
    fitter.add_model('Michaelis-Menten', michaelis_menten, ['Vmax', 'Km'])
    fitter.add_model('Logistic', logistic_growth, ['K', 'r', 't0'])
    print("Sistema de ajuste listo para usar!")
    print("Carga tus datos y utiliza fitter.fit_model()")
"""
    return code
# Interfaz Gradio optimizada para HuggingFace | |
def create_interface():
    """Build the Gradio Blocks UI and return it without launching it.

    The layout has an input column (file upload, model dropdown, analyze
    button, model description) and an output column (analysis report and
    generated code), followed by examples and a footer.
    """
    default_model = "claude-3-5-sonnet-20241022"

    custom_css = (
        "\n"
        ".gradio-container {\n"
        "font-family: 'Arial', sans-serif;\n"
        "}\n"
    )

    header_markdown = (
        "\n"
        "# 🧬 Analizador Inteligente de Modelos Biotecnológicos\n"
        "### 🎯 Capacidades:\n"
        "- **Detección automática** del tipo de documento (artículo científico vs datos experimentales)\n"
        "- **Análisis de PDFs** con modelos matemáticos biotecnológicos\n"
        "- **Procesamiento de datos** CSV/Excel para ajuste de parámetros\n"
        "- **Soporte para múltiples archivos** y archivos ZIP\n"
        "- **Generación de código** Python para implementación\n"
        "### 📁 Tipos de archivo soportados:\n"
        "- PDF (artículos científicos o reportes de datos)\n"
        "- CSV/Excel (datos experimentales)\n"
        "- ZIP (múltiples archivos)\n"
    )

    footer_markdown = (
        "\n"
        "---\n"
        "### 🔧 Características técnicas:\n"
        "- **Base de modelos escalable**: Fácil adición de nuevos modelos matemáticos\n"
        "- **Análisis con IA**: Detección automática del contexto y tipo de análisis\n"
        "- **Optimizado para HuggingFace**: Configuración lista para deployment\n"
        "- **Código modular**: Arquitectura flexible y mantenible\n"
        "### 📖 Instrucciones:\n"
        "1. Sube uno o varios archivos (PDF, CSV, Excel o ZIP)\n"
        "2. El sistema detectará automáticamente el tipo de análisis necesario\n"
        "3. Revisa los resultados y el código generado\n"
        "4. Copia el código para tu implementación\n"
    )

    def update_model_info(model):
        """Return the Markdown description of the selected Claude model."""
        info = CLAUDE_MODELS[model]
        return (
            "\n"
            f"**{info['name']}**\n"
            f"{info['description']}\n"
            f"*Mejor para: {info['best_for']}*\n"
        )

    def run_analysis(files, model):
        """Return (report, generated code), or a prompt to upload files."""
        if files:
            return process_files(files, model), generate_implementation_code("")
        return "Por favor sube archivos para analizar", ""

    with gr.Blocks(
        title="Analizador Inteligente de Modelos Biotecnológicos",
        theme=gr.themes.Soft(),
        css=custom_css,
    ) as demo:
        gr.Markdown(header_markdown)

        with gr.Row():
            # Input column
            with gr.Column(scale=1):
                files_input = gr.File(
                    label="📁 Subir archivos",
                    file_count="multiple",
                    file_types=[".pdf", ".csv", ".xlsx", ".xls", ".zip"],
                    type="filepath",
                )
                model_selector = gr.Dropdown(
                    choices=list(CLAUDE_MODELS.keys()),
                    value=default_model,
                    label="🤖 Modelo Claude",
                    info="Selecciona el modelo de IA",
                )
                analyze_btn = gr.Button("🚀 Analizar", variant="primary", size="lg")

                # Description of the currently selected model
                model_info = gr.Markdown()
                model_selector.change(
                    update_model_info,
                    inputs=[model_selector],
                    outputs=[model_info],
                )

            # Output column
            with gr.Column(scale=2):
                analysis_output = gr.Markdown(label="📊 Resultados del Análisis")
                code_output = gr.Code(
                    label="💻 Código de Implementación",
                    language="python",
                    interactive=True,
                )

        # Examples
        gr.Examples(
            examples=[
                [["examples/growth_kinetics.pdf"]],
                [["examples/experimental_data.csv"]],
                [["examples/multiple_files.zip"]],
            ],
            inputs=[files_input],
            label="📚 Ejemplos",
        )

        # Footer
        gr.Markdown(footer_markdown)

        # Events
        analyze_btn.click(
            fn=run_analysis,
            inputs=[files_input, model_selector],
            outputs=[analysis_output, code_output],
        )

        # Show the description of the default model on page load
        demo.load(
            fn=lambda: update_model_info(default_model),
            outputs=[model_info],
        )

    return demo
# Función principal para HuggingFace Spaces | |
def main():
    """Return the app to serve.

    When ``ANTHROPIC_API_KEY`` is set, the full interface is returned.
    Otherwise a warning is printed and a minimal interface that only tells
    the user to configure the key is returned.
    """
    if os.getenv("ANTHROPIC_API_KEY"):
        return create_interface()

    print("⚠️ Configura ANTHROPIC_API_KEY en los secretos de HuggingFace Space")
    return gr.Interface(
        fn=lambda x: "Por favor configura ANTHROPIC_API_KEY en los secretos del Space",
        inputs=gr.Textbox(),
        outputs=gr.Textbox(),
        title="Error de Configuración",
    )
# Local execution
if __name__ == "__main__":
    demo = main()
    if demo:
        launch_options = {
            "server_name": "0.0.0.0",
            "server_port": 7860,
            "share": False,
        }
        demo.launch(**launch_options)