Docfile's picture
Update app.py
4c4251c verified
raw
history blame
3.13 kB
from flask import Flask, render_template, request, jsonify, Response
import google.generativeai as genai
import os
from PIL import Image
import tempfile
import io
import uuid
import time
app = Flask(__name__)
# Configuration Gemini
token = os.environ.get("TOKEN")
genai.configure(api_key=token)
safety_settings = [
{"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_NONE"},
{"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_NONE"},
{"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_NONE"},
{"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_NONE"},
]
mm = """ resous cet exercice. tu répondras en détaillant au maximum ton procédé de calcul. réponse attendue uniquement en Latex"""
model = genai.GenerativeModel(
model_name="gemini-exp-1206",
safety_settings=safety_settings
)
# Dictionnaire pour stocker les réponses en cours de génération
pending_responses = {}
@app.route('/')
def home():
return render_template('index.html')
@app.route('/generate', methods=['POST'])
def generate():
if 'image' not in request.files:
return jsonify({'error': 'No image uploaded'}), 400
image_file = request.files['image']
request_id = str(uuid.uuid4()) # Générer un identifiant unique
# Sauvegarder temporairement l'image
with tempfile.NamedTemporaryFile(delete=False, suffix='.png') as temp_file:
image_file.save(temp_file.name)
try:
image = Image.open(temp_file.name)
# Convertir l'image en bytes pour le streaming
img_byte_arr = io.BytesIO()
image.save(img_byte_arr, format='PNG')
img_byte_arr = img_byte_arr.getvalue()
# Stocker la tâche de génération dans le dictionnaire
pending_responses[request_id] = {
'status': 'processing',
'response': model.generate_content([mm, {"mime_type": "image/png", "data": img_byte_arr}], stream=True)
}
return jsonify({'request_id': request_id})
except Exception as e:
return jsonify({'error': str(e)}), 500
finally:
# Nettoyer le fichier temporaire
os.unlink(temp_file.name)
@app.route('/stream/<request_id>')
def stream(request_id):
def generate_stream():
while request_id in pending_responses and pending_responses[request_id]['status'] == 'processing':
try:
chunk = next(pending_responses[request_id]['response'])
yield f"data: {chunk.text}\n\n"
except StopIteration:
pending_responses[request_id]['status'] = 'completed'
except Exception as e:
yield f"data: Error: {str(e)}\n\n"
pending_responses[request_id]['status'] = 'error'
time.sleep(0.1) # Attendre un peu avant de vérifier à nouveau
if request_id in pending_responses:
del pending_responses[request_id]
return Response(generate_stream(), mimetype='text/event-stream')
if __name__ == '__main__':
app.run(debug=True)