Testpdf / app.py
Docfile's picture
Update app.py
f59ae01 verified
raw
history blame
7.75 kB
from flask import Flask, render_template, request, jsonify, Response, stream_with_context
from google import genai
import os
from PIL import Image
import io
import base64
import json
import requests
import threading
import uuid
import time
app = Flask(__name__)
# API Keys
GOOGLE_API_KEY = os.environ.get("GEMINI_API_KEY")
TELEGRAM_BOT_TOKEN = "8004545342:AAGcZaoDjYg8dmbbXRsR1N3TfSSbEiAGz88"
TELEGRAM_CHAT_ID = "-1002497861230"
# Initialize Gemini client
client = genai.Client(api_key=GOOGLE_API_KEY)
# Dictionnaire pour stocker les résultats des tâches en cours
task_results = {}
def send_to_telegram(image_data, caption="Nouvelle image uploadée"):
"""Envoie l'image à un chat Telegram spécifié"""
try:
url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendPhoto"
files = {'photo': ('image.png', image_data)}
data = {'chat_id': TELEGRAM_CHAT_ID, 'caption': caption}
response = requests.post(url, files=files, data=data)
if response.status_code == 200:
print("Image envoyée avec succès à Telegram")
return True
else:
print(f"Erreur lors de l'envoi à Telegram: {response.text}")
return False
except Exception as e:
print(f"Exception lors de l'envoi à Telegram: {e}")
return False
def send_document_to_telegram(text_content, filename="reponse.txt", caption="Réponse"):
"""Envoie un fichier texte à un chat Telegram spécifié"""
try:
url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendDocument"
files = {'document': (filename, text_content.encode('utf-8'), 'text/plain')}
data = {'chat_id': TELEGRAM_CHAT_ID, 'caption': caption}
response = requests.post(url, files=files, data=data)
if response.status_code == 200:
print("Document envoyé avec succès à Telegram")
return True
else:
print(f"Erreur lors de l'envoi du document à Telegram: {response.text}")
return False
except Exception as e:
print(f"Exception lors de l'envoi du document à Telegram: {e}")
return False
def process_image_background(task_id, image_data):
"""Traite l'image en arrière-plan et met à jour le statut de la tâche"""
try:
# Mettre à jour le statut
task_results[task_id]['status'] = 'processing'
# Ouvrir l'image pour la traiter
img = Image.open(io.BytesIO(image_data))
# Traitement pour Gemini
buffered = io.BytesIO()
img.save(buffered, format="PNG")
img_str = base64.b64encode(buffered.getvalue()).decode()
# Générer une réponse complète
full_response = ""
try:
response = client.models.generate_content(
model="gemini-2.5-pro-exp-03-25",
contents=[
{'inline_data': {'mime_type': 'image/png', 'data': img_str}},
"Résous ça en français with rendering latex"
])
# Extraire le texte complet
for part in response.candidates[0].content.parts:
full_response += part.text
# Envoyer la réponse par Telegram
send_document_to_telegram(
full_response,
filename=f"reponse_{task_id}.txt",
caption=f"Réponse pour la tâche {task_id}"
)
# Mettre à jour le résultat
task_results[task_id]['status'] = 'completed'
task_results[task_id]['response'] = full_response
except Exception as e:
print(f"Error during generation: {e}")
task_results[task_id]['status'] = 'error'
task_results[task_id]['error'] = str(e)
except Exception as e:
print(f"Exception in background task: {e}")
task_results[task_id]['status'] = 'error'
task_results[task_id]['error'] = str(e)
@app.route('/')
def index():
return render_template('index.html')
@app.route('/solve', methods=['POST'])
def solve():
try:
# Lire l'image
image_data = request.files['image'].read()
# Envoyer l'image à Telegram
send_to_telegram(image_data, "Nouvelle image pour résolution")
# Créer un identifiant unique pour cette tâche
task_id = str(uuid.uuid4())
# Initialiser le dictionnaire de résultats pour cette tâche
task_results[task_id] = {
'status': 'pending',
'response': '',
'time_started': time.time()
}
# Lancer le traitement en arrière-plan
threading.Thread(
target=process_image_background,
args=(task_id, image_data)
).start()
# Retourner immédiatement l'ID de la tâche
return jsonify({
'task_id': task_id,
'status': 'pending'
})
except Exception as e:
print(f"Exception during task creation: {e}")
return jsonify({'error': 'Une erreur inattendue est survenue'}), 500
@app.route('/task/<task_id>', methods=['GET'])
def get_task_status(task_id):
"""Récupère le statut d'une tâche en cours"""
if task_id not in task_results:
return jsonify({'error': 'Tâche introuvable'}), 404
task = task_results[task_id]
# Nettoyer les tâches terminées après 30 minutes
current_time = time.time()
if (task['status'] in ['completed', 'error'] and
current_time - task.get('time_started', 0) > 1800):
# Ne pas supprimer maintenant, mais prévoir un nettoyage futur
pass
response = {
'status': task['status']
}
if task['status'] == 'completed':
response['response'] = task['response']
elif task['status'] == 'error':
response['error'] = task.get('error', 'Une erreur inconnue est survenue')
return jsonify(response)
@app.route('/stream/<task_id>', methods=['GET'])
def stream_task_progress(task_id):
"""Stream les mises à jour de progression d'une tâche"""
def generate():
if task_id not in task_results:
yield f'data: {json.dumps({"error": "Tâche introuvable"})}\n\n'
return
last_status = None
while True:
task = task_results.get(task_id)
if not task:
yield f'data: {json.dumps({"error": "Tâche introuvable"})}\n\n'
break
current_status = task['status']
# Si le statut a changé, envoyer une mise à jour
if current_status != last_status:
yield f'data: {json.dumps({"status": current_status})}\n\n'
last_status = current_status
# Si la tâche est terminée ou en erreur, envoyer les résultats et terminer
if current_status == 'completed':
yield f'data: {json.dumps({"status": "completed", "response": task["response"]})}\n\n'
break
elif current_status == 'error':
yield f'data: {json.dumps({"status": "error", "error": task.get("error", "Une erreur est survenue")})}\n\n'
break
# Attendre un peu avant la prochaine vérification
time.sleep(0.5)
return Response(
stream_with_context(generate()),
mimetype='text/event-stream',
headers={
'Cache-Control': 'no-cache',
'X-Accel-Buffering': 'no'
}
)
if __name__ == '__main__':
app.run(debug=True)