import logging import os import io import base64 import json import requests import threading import uuid import time import tempfile import subprocess import shutil import re from flask import Flask, render_template, request, jsonify, Response, stream_with_context, send_from_directory from google import genai from google.genai import types from PIL import Image # --- Configuration du Logging --- # Configuration d'un logger qui écrit dans la console (stdout). # C'est la pratique recommandée pour les applications conteneurisées (Docker) ou déployées sur des services comme Heroku/Render. logging.basicConfig( level=logging.INFO, # Niveau de log par défaut. Changer à logging.DEBUG pour plus de détails. format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) logger = logging.getLogger(__name__) # --- Configuration de l'Application Flask --- app = Flask(__name__) # --- Constantes et Variables Globales --- GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY") TELEGRAM_BOT_TOKEN = "8004545342:AAGcZaoDjYg8dmbbXRsR1N3TfSSbEiAGz88" TELEGRAM_CHAT_ID = "-1002564204301" GENERATED_PDF_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'generated_pdfs') # --- Initialisation des Services Externes --- client = None if GOOGLE_API_KEY: try: client = genai.Client(api_key=GOOGLE_API_KEY) logger.info("Client Google GenAI initialisé avec succès.") except Exception as e: logger.critical(f"Erreur critique lors de l'initialisation du client Gemini: {e}", exc_info=True) else: logger.critical("GEMINI_API_KEY non trouvé dans les variables d'environnement. Le service ne fonctionnera pas.") task_results = {} # --- Fonctions Utilitaires --- def load_prompt_from_file(filename): """Charge le contenu d'un fichier de prompt.""" try: prompts_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'prompts') filepath = os.path.join(prompts_dir, filename) logger.info(f"Chargement du prompt depuis '{filepath}'") with open(filepath, 'r', encoding='utf-8') as f: return f.read() except Exception as e: logger.error(f"Erreur lors du chargement du prompt '{filename}': {e}", exc_info=True) return "" def get_prompt_for_style(style): """Retourne le prompt approprié en fonction du style demandé.""" logger.info(f"Sélection du prompt pour le style: '{style}'") return load_prompt_from_file('prompt_light.txt') if style == 'light' else load_prompt_from_file('prompt_colorful.txt') def check_latex_installation(): """Vérifie si pdflatex est installé et accessible dans le PATH.""" logger.info("Vérification de l'installation de LaTeX (pdflatex)...") try: # Exécute 'pdflatex -version' pour vérifier son existence. # capture_output=True masque la sortie, check=True lève une exception en cas d'échec. subprocess.run(["pdflatex", "-version"], capture_output=True, check=True, timeout=10) logger.info("Vérification réussie: pdflatex est installé et fonctionnel.") return True except (subprocess.CalledProcessError, FileNotFoundError, subprocess.TimeoutExpired) as e: logger.warning(f"pdflatex n'est pas installé ou n'est pas dans le PATH. La génération de PDF sera désactivée. Erreur: {e}") return False IS_LATEX_INSTALLED = check_latex_installation() def clean_latex_code(latex_code): """Extrait le code LaTeX brut des blocs de code formatés (```latex ... ```).""" logger.info("Nettoyage du code LaTeX reçu de Gemini...") # Cherche un bloc de code explicite ```latex ... ``` match_latex = re.search(r"```(?:latex|tex)\s*(.*?)\s*```", latex_code, re.DOTALL | re.IGNORECASE) if match_latex: logger.info("Bloc de code 'latex' ou 'tex' trouvé et extrait.") return match_latex.group(1).strip() # Plan B : Cherche un bloc de code générique qui commence par \documentclass match_generic = re.search(r"```\s*(\\documentclass.*?)\s*```", latex_code, re.DOTALL | re.IGNORECASE) if match_generic: logger.info("Bloc de code générique avec '\\documentclass' trouvé et extrait.") return match_generic.group(1).strip() logger.warning("Aucun bloc de code LaTeX (```...```) n'a été trouvé. Utilisation de la réponse brute.") return latex_code.strip() def latex_to_pdf(latex_code, output_filename_base, output_dir): """Compile une chaîne de code LaTeX en fichier PDF.""" if not IS_LATEX_INSTALLED: logger.error("Tentative de compilation LaTeX alors que pdflatex n'est pas disponible.") return None, "Erreur: pdflatex n'est pas installé sur le serveur." tex_filename = f"{output_filename_base}.tex" tex_path = os.path.join(output_dir, tex_filename) pdf_path = os.path.join(output_dir, f"{output_filename_base}.pdf") logger.info(f"Début de la compilation LaTeX vers PDF pour '{output_filename_base}'") try: # Écriture du fichier .tex with open(tex_path, "w", encoding="utf-8") as tex_file: tex_file.write(latex_code) logger.info(f"Fichier .tex '{tex_path}' créé avec succès.") # Copie de l'environnement et configuration pour UTF-8 pour éviter les erreurs d'encodage my_env = os.environ.copy() my_env["LC_ALL"] = "C.UTF-8" my_env["LANG"] = "C.UTF-8" last_result = None # Exécution de pdflatex deux fois pour résoudre les références (table des matières, etc.) for i in range(2): logger.info(f"Exécution de pdflatex - Passe {i+1}/2...") process = subprocess.run( ["pdflatex", "-interaction=nonstopmode", "-output-directory", output_dir, tex_path], capture_output=True, text=True, check=False, encoding="utf-8", errors="replace", env=my_env, timeout=60 ) last_result = process # Si le PDF n'est pas créé et que la première passe a échoué, inutile de continuer if not os.path.exists(pdf_path) and process.returncode != 0: logger.warning(f"La passe {i+1} de pdflatex a échoué et aucun PDF n'a été créé. Arrêt de la compilation.") break if os.path.exists(pdf_path): logger.info(f"PDF généré avec succès : '{pdf_path}'") return pdf_path, f"PDF généré: {os.path.basename(pdf_path)}" else: error_log = last_result.stdout + "\n" + last_result.stderr if last_result else "Aucun résultat de compilation disponible." logger.error(f"Échec de la compilation PDF pour '{tex_filename}'. Log de pdflatex:\n{error_log}") return None, f"Erreur de compilation PDF. Log: ...{error_log[-1000:]}" # Retourne les 1000 derniers caractères du log except Exception as e: logger.error(f"Exception pendant la génération du PDF: {e}", exc_info=True) return None, f"Exception durant la génération du PDF: {str(e)}" def send_to_telegram(file_data, filename, caption="Nouveau fichier uploadé"): """Envoie un fichier au canal Telegram configuré.""" logger.info(f"Préparation de l'envoi du fichier '{filename}' à Telegram.") try: if filename.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.webp')): url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendPhoto" files = {'photo': (filename, file_data)} log_msg = f"Envoi de l'image '{filename}' à Telegram..." else: url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendDocument" files = {'document': (filename, file_data)} log_msg = f"Envoi du document '{filename}' à Telegram..." logger.info(log_msg) data = {'chat_id': TELEGRAM_CHAT_ID, 'caption': caption} response = requests.post(url, files=files, data=data, timeout=30) response.raise_for_status() # Lève une exception si le statut HTTP est une erreur (4xx ou 5xx) logger.info(f"Fichier '{filename}' envoyé avec succès à Telegram.") except Exception as e: logger.error(f"Erreur lors de l'envoi à Telegram: {e}", exc_info=True) # --- Logique Principale (Worker en arrière-plan) --- def process_files_background(task_id, files_data, resolution_style): """Fonction exécutée en thread pour traiter les fichiers, appeler Gemini et générer le PDF.""" logger.info(f"[Task {task_id}] Démarrage du traitement en arrière-plan.") task_results[task_id]['status'] = 'processing' uploaded_file_refs = [] try: if not client: raise ConnectionError("Le client Gemini n'est pas initialisé.") contents = [] logger.info(f"[Task {task_id}] Préparation des fichiers pour l'API Gemini.") for file_info in files_data: if file_info['type'].startswith('image/'): logger.info(f"[Task {task_id}] Traitement de l'image '{file_info['filename']}'.") img = Image.open(io.BytesIO(file_info['data'])) buffered = io.BytesIO() img.save(buffered, format="PNG") # Convertit en PNG pour la consistance img_base64_str = base64.b64encode(buffered.getvalue()).decode() contents.append({'inline_data': {'mime_type': 'image/png', 'data': img_base64_str}}) elif file_info['type'] == 'application/pdf': logger.info(f"[Task {task_id}] Upload du PDF '{file_info['filename']}' vers Google GenAI File API.") with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as temp_pdf: temp_pdf.write(file_info['data']) temp_pdf_path = temp_pdf.name # Upload du fichier et ajout de la référence à la liste de nettoyage file_ref = client.files.upload(file=temp_pdf_path) uploaded_file_refs.append(file_ref) contents.append(file_ref) os.unlink(temp_pdf_path) # Supprime le fichier temporaire local logger.info(f"[Task {task_id}] PDF '{file_info['filename']}' uploadé avec succès. Référence: {file_ref.name}") if not contents: raise ValueError("Aucun contenu valide (image ou PDF) n'a été traité.") prompt_to_use = get_prompt_for_style(resolution_style) if not prompt_to_use: raise ValueError(f"Le fichier de prompt pour le style '{resolution_style}' est introuvable ou vide.") contents.append(prompt_to_use) task_results[task_id]['status'] = 'generating_latex' logger.info(f"[Task {task_id}] Envoi de la requête à l'API Gemini (modèle gemini-2.5-pro).") gemini_response = client.models.generate_content( model="gemini-2.5-flash", contents=contents, config=types.GenerateContentConfig(tools=[types.Tool(code_execution=types.ToolCodeExecution)]) ) logger.info(f"[Task {task_id}] Réponse reçue de Gemini.") full_latex_response = "" if gemini_response.candidates and gemini_response.candidates[0].content and gemini_response.candidates[0].content.parts: for part in gemini_response.candidates[0].content.parts: if hasattr(part, 'text') and part.text: full_latex_response += part.text if not full_latex_response.strip(): raise ValueError("La réponse de Gemini était vide.") logger.debug(f"[Task {task_id}] Réponse brute de Gemini:\n---\n{full_latex_response[:500]}...\n---") task_results[task_id]['status'] = 'cleaning_latex' cleaned_latex = clean_latex_code(full_latex_response) logger.debug(f"[Task {task_id}] Code LaTeX nettoyé:\n---\n{cleaned_latex[:500]}...\n---") task_results[task_id]['status'] = 'generating_pdf' pdf_filename_base = f"solution_{task_id}" pdf_file_path, pdf_message = latex_to_pdf(cleaned_latex, pdf_filename_base, GENERATED_PDF_DIR) if pdf_file_path: task_results[task_id]['status'] = 'completed' task_results[task_id]['pdf_filename'] = os.path.basename(pdf_file_path) task_results[task_id]['response'] = f"PDF généré avec succès: {os.path.basename(pdf_file_path)}" logger.info(f"[Task {task_id}] Tâche terminée avec succès. PDF: {os.path.basename(pdf_file_path)}") else: raise RuntimeError(f"Échec de la génération du PDF: {pdf_message}") except Exception as e: logger.error(f"[Task {task_id}] Une erreur est survenue dans le thread de traitement.", exc_info=True) task_results[task_id]['status'] = 'error' task_results[task_id]['error'] = str(e) task_results[task_id]['response'] = f"Une erreur est survenue: {str(e)}" finally: # Nettoyage des fichiers uploadés à l'API Gemini if uploaded_file_refs: logger.info(f"[Task {task_id}] Nettoyage des {len(uploaded_file_refs)} fichiers temporaires de l'API Gemini.") for file_ref in uploaded_file_refs: try: client.files.delete(file_ref) logger.info(f"[Task {task_id}] Fichier temporaire Gemini '{file_ref.name}' supprimé.") except Exception as del_e: logger.warning(f"[Task {task_id}] Échec de la suppression du fichier temporaire Gemini '{file_ref.name}': {del_e}") # --- Routes Flask (API Endpoints) --- @app.route('/') def index(): logger.info(f"Requête servie pour l'endpoint '/' depuis {request.remote_addr}") return render_template('index.html') @app.route('/solve', methods=['POST']) def solve(): logger.info(f"Nouvelle requête sur /solve depuis {request.remote_addr}") try: if 'user_files' not in request.files: logger.warning(f"/solve: Requête de {request.remote_addr} sans 'user_files'.") return jsonify({'error': 'Aucun champ de fichier dans la requête'}), 400 uploaded_files = request.files.getlist('user_files') if not uploaded_files or all(f.filename == '' for f in uploaded_files): logger.warning(f"/solve: Requête de {request.remote_addr} avec champ 'user_files' mais sans fichiers.") return jsonify({'error': 'Aucun fichier sélectionné'}), 400 resolution_style = request.form.get('style', 'colorful') files_data = [] file_count = {'images': 0, 'pdfs': 0} for file in uploaded_files: if not file.filename: continue file_data = file.read() file_type = file.content_type or 'application/octet-stream' # Validation et traitement des fichiers if file_type.startswith('image/'): file_count['images'] += 1 files_data.append({'filename': file.filename, 'data': file_data, 'type': file_type}) send_to_telegram(file_data, file.filename, f"Image reçue: {file.filename} (Style: {resolution_style})") elif file_type == 'application/pdf': if file_count['pdfs'] >= 1: logger.warning(f"/solve: Requête de {request.remote_addr} avec plusieurs PDFs. Rejetée.") return jsonify({'error': 'Un seul fichier PDF est autorisé par requête'}), 400 file_count['pdfs'] += 1 files_data.append({'filename': file.filename, 'data': file_data, 'type': file_type}) send_to_telegram(file_data, file.filename, f"PDF reçu: {file.filename} (Style: {resolution_style})") else: logger.warning(f"/solve: Fichier non supporté '{file.filename}' de type '{file_type}' uploadé par {request.remote_addr}.") if not files_data: logger.warning(f"/solve: Aucun fichier valide (image/pdf) trouvé dans la requête de {request.remote_addr}.") return jsonify({'error': 'Aucun fichier valide (image ou PDF) n\'a été fourni'}), 400 task_id = str(uuid.uuid4()) task_results[task_id] = { 'status': 'pending', 'response': '', 'error': None, 'time_started': time.time(), 'style': resolution_style, 'file_count': file_count, 'first_filename': files_data[0]['filename'] } logger.info(f"Création de la tâche {task_id} pour {file_count['images']} image(s) et {file_count['pdfs']} PDF(s). Style: {resolution_style}.") threading.Thread(target=process_files_background, args=(task_id, files_data, resolution_style)).start() return jsonify({'task_id': task_id, 'status': 'pending', 'first_filename': files_data[0]['filename']}) except Exception as e: logger.error(f"Erreur inattendue dans l'endpoint /solve: {e}", exc_info=True) return jsonify({'error': f'Erreur interne du serveur: {e}'}), 500 @app.route('/task/', methods=['GET']) def get_task_status(task_id): logger.debug(f"Requête de statut pour la tâche {task_id}") task = task_results.get(task_id) if not task: logger.warning(f"Tentative d'accès à une tâche inexistante: {task_id}") return jsonify({'error': 'Tâche introuvable'}), 404 response_data = {'status': task['status'], 'response': task.get('response'), 'error': task.get('error')} if task['status'] == 'completed': response_data['download_url'] = f"/download/{task_id}" return jsonify(response_data) @app.route('/stream/', methods=['GET']) def stream_task_progress(task_id): """Endpoint pour Server-Sent Events (SSE) pour streamer la progression.""" def generate(): logger.info(f"Nouvelle connexion de streaming (SSE) pour la tâche {task_id}") last_status_sent = None while True: task = task_results.get(task_id) if not task: logger.warning(f"La tâche {task_id} a disparu pendant le streaming.") yield f'data: {json.dumps({"error": "La tâche a été perdue", "status": "error"})}\n\n' break current_status = task['status'] if current_status != last_status_sent: data_to_send = {"status": current_status} if current_status == 'completed': data_to_send["response"] = task.get("response", "") data_to_send["download_url"] = f"/download/{task_id}" elif current_status == 'error': data_to_send["error"] = task.get("error", "Erreur inconnue") logger.info(f"[Task {task_id}] Envoi de la mise à jour de statut via SSE: {current_status}") yield f'data: {json.dumps(data_to_send)}\n\n' last_status_sent = current_status if current_status in ['completed', 'error']: logger.info(f"Fermeture de la connexion SSE pour la tâche terminée/échouée {task_id}") break time.sleep(1) # Attendre 1 seconde avant de vérifier à nouveau return Response(stream_with_context(generate()), mimetype='text/event-stream', headers={'Cache-Control': 'no-cache', 'X-Accel-Buffering': 'no'}) @app.route('/download/') def download_pdf(task_id): logger.info(f"Requête de téléchargement pour la tâche {task_id}") task = task_results.get(task_id) if not task or task['status'] != 'completed' or 'pdf_filename' not in task: logger.warning(f"Échec du téléchargement pour la tâche {task_id}: Fichier non trouvé ou tâche non terminée.") return "Fichier non trouvé ou la tâche n'est pas encore terminée.", 404 try: logger.info(f"Envoi du fichier '{task['pdf_filename']}' pour la tâche {task_id}") return send_from_directory(GENERATED_PDF_DIR, task['pdf_filename'], as_attachment=True) except FileNotFoundError: logger.error(f"Le fichier PDF '{task['pdf_filename']}' pour la tâche {task_id} est introuvable sur le disque.") return "Erreur: Fichier introuvable sur le serveur.", 404 if __name__ == '__main__': logger.info("Démarrage de l'application Flask.") # Création du répertoire pour les PDFs générés s'il n'existe pas os.makedirs(GENERATED_PDF_DIR, exist_ok=True) logger.info(f"Répertoire pour les PDFs générés assuré d'exister: '{GENERATED_PDF_DIR}'") # Vérifications cri