Spaces:
Sleeping
Sleeping
from flask import Flask, render_template, request, jsonify, Response | |
import google.generativeai as genai | |
import os | |
from PIL import Image | |
import tempfile | |
import io | |
import uuid | |
import time | |
app = Flask(__name__) | |
# Configuration Gemini | |
token = os.environ.get("TOKEN") | |
genai.configure(api_key=token) | |
safety_settings = [ | |
{"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_NONE"}, | |
{"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_NONE"}, | |
{"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_NONE"}, | |
{"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_NONE"}, | |
] | |
mm = """ resous cet exercice. tu répondras en détaillant au maximum ton procédé de calcul. réponse attendue uniquement en Latex""" | |
model = genai.GenerativeModel( | |
model_name="gemini-exp-1206", | |
safety_settings=safety_settings | |
) | |
# Dictionnaire pour stocker les réponses en cours de génération | |
pending_responses = {} | |
def home(): | |
return render_template('index.html') | |
def generate(): | |
if 'image' not in request.files: | |
return jsonify({'error': 'No image uploaded'}), 400 | |
image_file = request.files['image'] | |
request_id = str(uuid.uuid4()) # Générer un identifiant unique | |
# Sauvegarder temporairement l'image | |
with tempfile.NamedTemporaryFile(delete=False, suffix='.png') as temp_file: | |
image_file.save(temp_file.name) | |
try: | |
image = Image.open(temp_file.name) | |
# Convertir l'image en bytes pour le streaming | |
img_byte_arr = io.BytesIO() | |
image.save(img_byte_arr, format='PNG') | |
img_byte_arr = img_byte_arr.getvalue() | |
# Stocker la tâche de génération dans le dictionnaire | |
pending_responses[request_id] = { | |
'status': 'processing', | |
'response': model.generate_content([mm, {"mime_type": "image/png", "data": img_byte_arr}], stream=True) | |
} | |
return jsonify({'request_id': request_id}) | |
except Exception as e: | |
return jsonify({'error': str(e)}), 500 | |
finally: | |
# Nettoyer le fichier temporaire | |
os.unlink(temp_file.name) | |
def stream(request_id): | |
def generate_stream(): | |
while request_id in pending_responses and pending_responses[request_id]['status'] == 'processing': | |
try: | |
chunk = next(pending_responses[request_id]['response']) | |
yield f"data: {chunk.text}\n\n" | |
except StopIteration: | |
pending_responses[request_id]['status'] = 'completed' | |
except Exception as e: | |
yield f"data: Error: {str(e)}\n\n" | |
pending_responses[request_id]['status'] = 'error' | |
time.sleep(0.1) # Attendre un peu avant de vérifier à nouveau | |
if request_id in pending_responses: | |
del pending_responses[request_id] | |
return Response(generate_stream(), mimetype='text/event-stream') | |
if __name__ == '__main__': | |
app.run(debug=True) |