Ohpdf / app.py
Docfile's picture
Update app.py
0ab8eed verified
raw
history blame
18.1 kB
from flask import Flask, render_template, request, jsonify, send_file
import threading
import time
import os
import json
from datetime import datetime
from google import genai
from pydantic import BaseModel, Field
import typing_extensions as typing
import uuid
import enum
app = Flask(__name__)
# Configuration
GOOGLE_API_KEY = "AIzaSyAMYpF67aqFnWDJESWOx1dC-w3sEU29VcM" # Remplacez par votre clé API
MODEL_ID = "gemini-2.0-flash" # Modèle recommandé selon la documentation
UPLOAD_FOLDER = 'uploads'
RESULTS_FOLDER = 'results'
# Créer les dossiers s'ils n'existent pas
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(RESULTS_FOLDER, exist_ok=True)
# Définition des schémas Pydantic selon la documentation
class TranslationPair(BaseModel):
"""Paire de traductions fang/français"""
fang: str = Field(description="Phrase en langue fang")
francais: str = Field(description="Traduction française de la phrase")
class SyntheticDataResponse(BaseModel):
"""Réponse structurée pour la génération de données synthétiques"""
request_number: int = Field(description="Numéro de la requête")
generated_pairs: list[TranslationPair] = Field(
description="Liste des paires de traduction générées",
min_items=1
)
timestamp: str = Field(description="Horodatage de la génération")
# Stockage des tâches en cours
class TaskManager:
def __init__(self):
self.tasks = {}
def create_task(self, task_id):
self.tasks[task_id] = {
'status': 'running',
'progress': 0,
'total': 470,
'results_file': f'results_{task_id}.json',
'start_time': datetime.now(),
'errors': [],
'last_update': datetime.now(),
'all_data': []
}
def update_progress(self, task_id, progress, data=None):
if task_id in self.tasks:
self.tasks[task_id]['progress'] = progress
self.tasks[task_id]['last_update'] = datetime.now()
if data:
self.tasks[task_id]['all_data'].append(data)
def add_error(self, task_id, error):
if task_id in self.tasks:
self.tasks[task_id]['errors'].append(error)
def complete_task(self, task_id):
if task_id in self.tasks:
self.tasks[task_id]['status'] = 'completed'
self.tasks[task_id]['last_update'] = datetime.now()
def get_task(self, task_id):
return self.tasks.get(task_id)
task_manager = TaskManager()
def generate_synthetic_data(file_path, task_id):
"""Fonction qui exécute les 470 requêtes en arrière-plan avec sortie JSON structurée"""
try:
# Initialiser le client Google AI selon la documentation
client = genai.Client(api_key=GOOGLE_API_KEY)
# Uploader le fichier
file_ref = client.files.upload(path=file_path)
# Prompt optimisé pour une sortie structurée
prompt = """
À partir du fichier fourni, génère exactement 400 nouvelles paires de phrases synthétiques.
Chaque paire doit contenir :
- Une phrase en fang (langue locale)
- Sa traduction en français
Les phrases doivent être variées, naturelles et représentatives de la structure linguistique
présente dans le fichier source. Respecte strictement le nombre de 400 paires demandées.
"""
# Fichier de résultats JSON
results_file = os.path.join(RESULTS_FOLDER, f'results_{task_id}.json')
# Structure pour stocker toutes les données
all_results = {
"metadata": {
"task_id": task_id,
"start_time": datetime.now().isoformat(),
"total_requests": 470,
"model_used": MODEL_ID,
"expected_pairs_per_request": 400
},
"requests": [],
"summary": {
"total_pairs": 0,
"completed_requests": 0,
"failed_requests": 0,
"errors": []
}
}
for i in range(470):
try:
print(f"🔄 Démarrage requête {i+1}/470...")
# Configuration selon la documentation avec schéma Pydantic
response = client.models.generate_content(
model=MODEL_ID,
contents=[file_ref, prompt],
config={
'response_mime_type': 'application/json',
'response_schema': SyntheticDataResponse,
}
)
# Utiliser la propriété .parsed selon la documentation
if hasattr(response, 'parsed') and response.parsed:
parsed_response = response.parsed
# Structurer la réponse selon le schéma
request_data = {
"request_number": i + 1,
"timestamp": datetime.now().isoformat(),
"response": {
"request_number": parsed_response.request_number,
"generated_pairs": [
{
"fang": pair.fang,
"francais": pair.francais
} for pair in parsed_response.generated_pairs
],
"timestamp": parsed_response.timestamp
},
"pairs_count": len(parsed_response.generated_pairs),
"status": "success"
}
all_results["requests"].append(request_data)
all_results["summary"]["total_pairs"] += request_data["pairs_count"]
all_results["summary"]["completed_requests"] += 1
print(f"✅ Requête {i+1} réussie - {request_data['pairs_count']} paires générées")
else:
# Fallback : parser le JSON manuellement si .parsed n'est pas disponible
try:
response_data = json.loads(response.text)
request_data = {
"request_number": i + 1,
"timestamp": datetime.now().isoformat(),
"response": response_data,
"pairs_count": len(response_data.get("generated_pairs", [])),
"status": "success_fallback"
}
all_results["requests"].append(request_data)
all_results["summary"]["total_pairs"] += request_data["pairs_count"]
all_results["summary"]["completed_requests"] += 1
print(f"✅ Requête {i+1} réussie (fallback) - {request_data['pairs_count']} paires")
except json.JSONDecodeError as json_error:
raise Exception(f"Impossible de parser la réponse JSON: {json_error}")
# Sauvegarder après chaque requête réussie
with open(results_file, 'w', encoding='utf-8') as f:
json.dump(all_results, f, ensure_ascii=False, indent=2)
# Mettre à jour le progrès
task_manager.update_progress(task_id, i + 1)
# Pause pour respecter les limites de l'API (ajustable selon vos besoins)
time.sleep(1) # Réduit à 1 seconde, ajustez selon vos limites API
except Exception as e:
error_msg = f"Erreur requête {i+1}: {str(e)}"
print(f"❌ {error_msg}")
task_manager.add_error(task_id, error_msg)
all_results["summary"]["errors"].append({
"request_number": i + 1,
"error": error_msg,
"timestamp": datetime.now().isoformat()
})
all_results["summary"]["failed_requests"] += 1
# Créer une entrée vide pour cette requête échouée
failed_request_data = {
"request_number": i + 1,
"timestamp": datetime.now().isoformat(),
"response": None,
"pairs_count": 0,
"status": "failed",
"error": error_msg
}
all_results["requests"].append(failed_request_data)
# Sauvegarder même en cas d'erreur
with open(results_file, 'w', encoding='utf-8') as f:
json.dump(all_results, f, ensure_ascii=False, indent=2)
# Mettre à jour le progrès même en cas d'erreur
task_manager.update_progress(task_id, i + 1)
# Pause plus longue en cas d'erreur
time.sleep(5)
# Finaliser le fichier JSON
end_time = datetime.now()
start_time = datetime.fromisoformat(all_results["metadata"]["start_time"])
duration = (end_time - start_time).total_seconds()
all_results["metadata"]["end_time"] = end_time.isoformat()
all_results["metadata"]["duration_seconds"] = duration
all_results["metadata"]["duration_minutes"] = duration / 60
all_results["summary"]["success_rate"] = (
all_results["summary"]["completed_requests"] / 470 * 100
)
# Statistiques finales
print(f"\n📊 STATISTIQUES FINALES:")
print(f" • Requêtes réussies: {all_results['summary']['completed_requests']}/470")
print(f" • Requêtes échouées: {all_results['summary']['failed_requests']}/470")
print(f" • Total paires générées: {all_results['summary']['total_pairs']}")
print(f" • Taux de réussite: {all_results['summary']['success_rate']:.1f}%")
print(f" • Durée totale: {duration/60:.1f} minutes")
with open(results_file, 'w', encoding='utf-8') as f:
json.dump(all_results, f, ensure_ascii=False, indent=2)
task_manager.complete_task(task_id)
print(f"🎉 Tâche {task_id} terminée avec succès")
except Exception as e:
error_msg = f"Erreur générale: {str(e)}"
task_manager.add_error(task_id, error_msg)
print(f"💥 {error_msg}")
@app.route('/')
def index():
return render_template('index.html')
@app.route('/upload', methods=['POST'])
def upload_file():
if 'file' not in request.files:
return jsonify({'error': 'Aucun fichier sélectionné'}), 400
file = request.files['file']
if file.filename == '':
return jsonify({'error': 'Aucun fichier sélectionné'}), 400
if file:
# Générer un ID unique pour cette tâche
task_id = str(uuid.uuid4())
# Sauvegarder le fichier
filename = f"input_{task_id}.txt"
file_path = os.path.join(UPLOAD_FOLDER, filename)
file.save(file_path)
# Créer la tâche
task_manager.create_task(task_id)
# Démarrer le traitement en arrière-plan
thread = threading.Thread(
target=generate_synthetic_data,
args=(file_path, task_id)
)
thread.daemon = True
thread.start()
return jsonify({
'task_id': task_id,
'message': 'Traitement démarré en arrière-plan',
'expected_duration_minutes': 8 # Estimation basée sur 1s par requête
})
@app.route('/status/<task_id>')
def get_status(task_id):
task = task_manager.get_task(task_id)
if not task:
return jsonify({'error': 'Tâche non trouvée'}), 404
# Calculer ETA si la tâche est en cours
eta_minutes = None
if task['status'] == 'running' and task['progress'] > 0:
elapsed = (datetime.now() - task['start_time']).total_seconds()
rate = task['progress'] / elapsed # requêtes par seconde
remaining = task['total'] - task['progress']
eta_seconds = remaining / rate if rate > 0 else None
eta_minutes = eta_seconds / 60 if eta_seconds else None
return jsonify({
'status': task['status'],
'progress': task['progress'],
'total': task['total'],
'percentage': round((task['progress'] / task['total']) * 100, 2),
'errors_count': len(task['errors']),
'start_time': task['start_time'].strftime('%Y-%m-%d %H:%M:%S'),
'last_update': task['last_update'].strftime('%Y-%m-%d %H:%M:%S'),
'eta_minutes': round(eta_minutes) if eta_minutes else None
})
@app.route('/download/<task_id>')
def download_results(task_id):
task = task_manager.get_task(task_id)
if not task:
return jsonify({'error': 'Tâche non trouvée'}), 404
results_file = os.path.join(RESULTS_FOLDER, f'results_{task_id}.json')
if not os.path.exists(results_file):
return jsonify({'error': 'Fichier de résultats non trouvé'}), 404
# Vérifier si c'est un téléchargement partiel
is_partial = request.args.get('partial', 'false').lower() == 'true'
if is_partial and task['status'] == 'running':
# Créer un fichier temporaire avec les données actuelles
temp_file = os.path.join(RESULTS_FOLDER, f'temp_results_{task_id}.json')
try:
# Charger les données actuelles
with open(results_file, 'r', encoding='utf-8') as f:
current_data = json.load(f)
# Ajouter des métadonnées pour le téléchargement partiel
current_data["partial_download"] = {
"downloaded_at": datetime.now().isoformat(),
"is_partial": True,
"progress": f"{task['progress']}/{task['total']}",
"percentage": round((task['progress'] / task['total']) * 100, 2)
}
# Sauvegarder le fichier temporaire
with open(temp_file, 'w', encoding='utf-8') as f:
json.dump(current_data, f, ensure_ascii=False, indent=2)
return send_file(
temp_file,
as_attachment=True,
download_name=f'donnees_synthetiques_partiel_{task_id}.json'
)
except Exception as e:
return jsonify({'error': f'Erreur lors de la création du fichier partiel: {str(e)}'}), 500
# Téléchargement normal (complet)
download_name = f'donnees_synthetiques_{"complet" if task["status"] == "completed" else "actuel"}_{task_id}.json'
return send_file(
results_file,
as_attachment=True,
download_name=download_name
)
@app.route('/tasks')
def list_tasks():
"""Liste toutes les tâches"""
task_list = []
for task_id, task_info in task_manager.tasks.items():
task_list.append({
'id': task_id,
'status': task_info['status'],
'progress': task_info['progress'],
'total': task_info['total'],
'percentage': round((task_info['progress'] / task_info['total']) * 100, 2),
'start_time': task_info['start_time'].strftime('%Y-%m-%d %H:%M:%S'),
'last_update': task_info['last_update'].strftime('%Y-%m-%d %H:%M:%S'),
'errors_count': len(task_info['errors'])
})
# Trier par heure de début (plus récent en premier)
task_list.sort(key=lambda x: x['start_time'], reverse=True)
return jsonify(task_list)
@app.route('/cleanup')
def cleanup_temp_files():
"""Nettoyer les fichiers temporaires"""
try:
temp_files_deleted = 0
for filename in os.listdir(RESULTS_FOLDER):
if filename.startswith('temp_results_') and filename.endswith('.json'):
file_path = os.path.join(RESULTS_FOLDER, filename)
os.remove(file_path)
temp_files_deleted += 1
return jsonify({
'message': f'{temp_files_deleted} fichiers temporaires supprimés'
})
except Exception as e:
return jsonify({'error': f'Erreur lors du nettoyage: {str(e)}'}), 500
@app.route('/preview/<task_id>')
def preview_results(task_id):
"""Aperçu des résultats JSON pour debug"""
task = task_manager.get_task(task_id)
if not task:
return jsonify({'error': 'Tâche non trouvée'}), 404
results_file = os.path.join(RESULTS_FOLDER, f'results_{task_id}.json')
if not os.path.exists(results_file):
return jsonify({'error': 'Fichier de résultats non trouvé'}), 404
try:
with open(results_file, 'r', encoding='utf-8') as f:
data = json.load(f)
# Retourner un aperçu des données
preview = {
"metadata": data.get("metadata", {}),
"summary": data.get("summary", {}),
"sample_requests": data.get("requests", [])[:3], # 3 premiers échantillons
"total_requests": len(data.get("requests", []))
}
return jsonify(preview)
except Exception as e:
return jsonify({'error': f'Erreur lors de la lecture du fichier: {str(e)}'}), 500
if __name__ == '__main__':
print("🚀 Démarrage du serveur Flask...")
print("📂 Dossiers créés:", UPLOAD_FOLDER, RESULTS_FOLDER)
print("🌐 Application disponible sur: http://localhost:5000")
print("📊 Sortie JSON structurée activée avec schémas Pydantic")
print("🔧 Modèle utilisé:", MODEL_ID)
app.run(debug=True, threaded=True)