|
import os |
|
import json |
|
import mimetypes |
|
from flask import Flask, request, session, jsonify, redirect, url_for, flash, render_template |
|
from dotenv import load_dotenv |
|
import google.generativeai as genai |
|
from werkzeug.utils import secure_filename |
|
import markdown |
|
from flask_session import Session |
|
import pprint |
|
|
|
|
|
load_dotenv() |
|
|
|
app = Flask(__name__) |
|
|
|
|
|
|
|
|
|
app.config['SECRET_KEY'] = os.getenv('FLASK_SECRET_KEY', 'une-super-cle-secrete-a-changer') |
|
|
|
|
|
UPLOAD_FOLDER = 'temp' |
|
ALLOWED_EXTENSIONS = {'txt', 'pdf', 'png', 'jpg', 'jpeg'} |
|
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER |
|
app.config['MAX_CONTENT_LENGTH'] = 25 * 1024 * 1024 |
|
|
|
|
|
os.makedirs(UPLOAD_FOLDER, exist_ok=True) |
|
print(f"Dossier d'upload configuré : {os.path.abspath(UPLOAD_FOLDER)}") |
|
|
|
|
|
app.config['SESSION_TYPE'] = 'filesystem' |
|
app.config['SESSION_PERMANENT'] = False |
|
app.config['SESSION_USE_SIGNER'] = True |
|
app.config['SESSION_FILE_DIR'] = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'flask_session') |
|
|
|
app.config['SESSION_COOKIE_SAMESITE'] = 'None' |
|
|
|
app.config['SESSION_COOKIE_SECURE'] = True |
|
|
|
|
|
os.makedirs(app.config['SESSION_FILE_DIR'], exist_ok=True) |
|
print(f"Dossier pour les sessions serveur configuré : {app.config['SESSION_FILE_DIR']}") |
|
|
|
|
|
server_session = Session(app) |
|
|
|
|
|
MODEL_FLASH = 'gemini-2.0-flash' |
|
MODEL_PRO = 'gemini-2.5-pro-exp-03-25' |
|
SYSTEM_INSTRUCTION = "Tu es un assistant intelligent et amical nommé Mariam. Tu assistes les utilisateurs au mieux de tes capacités. Tu as été créé par Aenir." |
|
SAFETY_SETTINGS = [ |
|
{"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_NONE"}, |
|
{"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_NONE"}, |
|
{"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_NONE"}, |
|
{"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_NONE"}, |
|
] |
|
GEMINI_CONFIGURED = False |
|
try: |
|
gemini_api_key = os.getenv("GOOGLE_API_KEY") |
|
if not gemini_api_key: |
|
print("ERREUR: Clé API GOOGLE_API_KEY manquante dans le fichier .env") |
|
else: |
|
genai.configure(api_key=gemini_api_key) |
|
models_list = [m.name for m in genai.list_models()] |
|
if f'models/{MODEL_FLASH}' in models_list and f'models/{MODEL_PRO}' in models_list: |
|
print(f"Configuration Gemini effectuée. Modèles requis ({MODEL_FLASH}, {MODEL_PRO}) disponibles.") |
|
print(f"System instruction: {SYSTEM_INSTRUCTION}") |
|
GEMINI_CONFIGURED = True |
|
else: |
|
print(f"ERREUR: Les modèles requis ({MODEL_FLASH}, {MODEL_PRO}) ne sont pas tous disponibles via l'API.") |
|
print(f"Modèles trouvés: {models_list}") |
|
|
|
except Exception as e: |
|
print(f"ERREUR Critique lors de la configuration initiale de Gemini : {e}") |
|
print("L'application fonctionnera sans les fonctionnalités IA.") |
|
|
|
|
|
|
|
def allowed_file(filename): |
|
"""Vérifie si l'extension du fichier est autorisée.""" |
|
return '.' in filename and \ |
|
filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS |
|
|
|
def prepare_gemini_history(chat_history): |
|
"""Convertit l'historique stocké en session au format attendu par Gemini API.""" |
|
print(f"--- DEBUG [prepare_gemini_history]: Entrée avec {len(chat_history)} messages") |
|
gemini_history = [] |
|
for i, message in enumerate(list(chat_history)): |
|
role = 'user' if message.get('role') == 'user' else 'model' |
|
text_part = message.get('raw_text') |
|
|
|
print(f" [prepare_gemini_history] Message {i} (rôle session: {message.get('role')}, rôle gemini: {role}): raw_text présent? {'Oui' if text_part is not None else 'NON'}, contenu début: '{str(text_part)[:60]}...'") |
|
|
|
if text_part: |
|
parts = [text_part] |
|
gemini_history.append({'role': role, 'parts': parts}) |
|
else: |
|
|
|
print(f" AVERTISSEMENT [prepare_gemini_history]: raw_text vide ou absent pour le message {i}, ignoré pour l'historique Gemini.") |
|
|
|
print(f"--- DEBUG [prepare_gemini_history]: Sortie avec {len(gemini_history)} messages formatés pour Gemini") |
|
return gemini_history |
|
|
|
|
|
|
|
@app.route('/') |
|
def root(): |
|
"""Sert la page HTML principale.""" |
|
print("--- LOG: Appel route '/' ---") |
|
return render_template('index.html') |
|
|
|
@app.route('/api/history', methods=['GET']) |
|
def get_history(): |
|
"""Fournit l'historique de chat stocké en session au format JSON.""" |
|
print("\n--- DEBUG [/api/history]: Début requête GET ---") |
|
if 'chat_history' not in session: |
|
session['chat_history'] = [] |
|
print(" [/api/history]: Session 'chat_history' initialisée (vide).") |
|
|
|
display_history = [] |
|
current_history = session.get('chat_history', []) |
|
print(f" [/api/history]: Historique récupéré de la session serveur: {len(current_history)} messages.") |
|
|
|
|
|
|
|
|
|
|
|
for i, msg in enumerate(current_history): |
|
|
|
if isinstance(msg, dict) and 'role' in msg and 'text' in msg: |
|
display_history.append({ |
|
'role': msg.get('role'), |
|
'text': msg.get('text') |
|
}) |
|
else: |
|
|
|
print(f" AVERTISSEMENT [/api/history]: Format invalide dans l'historique session au message {i}: {msg}") |
|
|
|
print(f" [/api/history]: Historique préparé pour le frontend: {len(display_history)} messages.") |
|
return jsonify({'success': True, 'history': display_history}) |
|
|
|
@app.route('/api/chat', methods=['POST']) |
|
def chat_api(): |
|
"""Gère les nouvelles requêtes de chat via AJAX.""" |
|
print(f"\n---===================================---") |
|
print(f"--- DEBUG [/api/chat]: Nouvelle requête POST ---") |
|
|
|
if not GEMINI_CONFIGURED: |
|
print("--- ERREUR [/api/chat]: Tentative d'appel sans configuration Gemini valide.") |
|
return jsonify({'success': False, 'error': "Le service IA n'est pas configuré correctement."}), 503 |
|
|
|
|
|
prompt = request.form.get('prompt', '').strip() |
|
use_web_search = request.form.get('web_search', 'false').lower() == 'true' |
|
file = request.files.get('file') |
|
use_advanced = request.form.get('advanced_reasoning', 'false').lower() == 'true' |
|
|
|
print(f" [/api/chat]: Prompt reçu: '{prompt[:50]}...'") |
|
print(f" [/api/chat]: Recherche Web: {use_web_search}, Raisonnement Avancé: {use_advanced}") |
|
print(f" [/api/chat]: Fichier: {file.filename if file else 'Aucun'}") |
|
|
|
|
|
if not prompt and not file: |
|
print("--- ERREUR [/api/chat]: Prompt et fichier vides.") |
|
return jsonify({'success': False, 'error': 'Veuillez fournir un message ou un fichier.'}), 400 |
|
|
|
|
|
if 'chat_history' not in session: |
|
session['chat_history'] = [] |
|
history_before_user_add = list(session.get('chat_history', [])) |
|
print(f"--- DEBUG [/api/chat]: Historique en session AVANT ajout user message: {len(history_before_user_add)} messages") |
|
|
|
|
|
|
|
|
|
|
|
uploaded_gemini_file = None |
|
uploaded_filename = None |
|
filepath_to_delete = None |
|
|
|
|
|
if file and file.filename != '': |
|
print(f"--- LOG [/api/chat]: Traitement du fichier '{file.filename}'") |
|
if allowed_file(file.filename): |
|
try: |
|
filename = secure_filename(file.filename) |
|
filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename) |
|
file.save(filepath) |
|
filepath_to_delete = filepath |
|
uploaded_filename = filename |
|
print(f" [/api/chat]: Fichier '{filename}' sauvegardé dans '{filepath}'") |
|
mime_type = mimetypes.guess_type(filepath)[0] or 'application/octet-stream' |
|
print(f" [/api/chat]: Upload Google AI (Mime: {mime_type})...") |
|
uploaded_gemini_file = genai.upload_file(path=filepath, mime_type=mime_type) |
|
print(f" [/api/chat]: Fichier Google AI '{uploaded_gemini_file.name}' uploadé.") |
|
except Exception as e: |
|
print(f"--- ERREUR [/api/chat]: Échec traitement/upload fichier '{filename}': {e}") |
|
if filepath_to_delete and os.path.exists(filepath_to_delete): |
|
try: os.remove(filepath_to_delete) |
|
except OSError as del_e: print(f" Erreur suppression fichier temp après erreur: {del_e}") |
|
return jsonify({'success': False, 'error': f"Erreur traitement fichier: {e}"}), 500 |
|
else: |
|
print(f"--- ERREUR [/api/chat]: Type de fichier non autorisé: {file.filename}") |
|
return jsonify({'success': False, 'error': f"Type de fichier non autorisé."}), 400 |
|
|
|
|
|
raw_user_text = prompt |
|
display_user_text = f"[{uploaded_filename}] {prompt}" if uploaded_filename and prompt else (prompt or f"[{uploaded_filename}]") |
|
user_history_entry = { |
|
'role': 'user', |
|
'text': display_user_text, |
|
'raw_text': raw_user_text, |
|
} |
|
|
|
|
|
if not isinstance(session.get('chat_history'), list): |
|
print("--- ERREUR [/api/chat]: 'chat_history' n'est pas une liste! Réinitialisation.") |
|
session['chat_history'] = [] |
|
session['chat_history'].append(user_history_entry) |
|
|
|
|
|
history_after_user_add = list(session.get('chat_history', [])) |
|
print(f"--- DEBUG [/api/chat]: Historique en session APRES ajout user message: {len(history_after_user_add)} messages") |
|
|
|
|
|
|
|
|
|
|
|
current_gemini_parts = [] |
|
|
|
if uploaded_gemini_file and not raw_user_text: |
|
raw_user_text = f"Décris le contenu de ce fichier : {uploaded_filename}" |
|
final_prompt_for_gemini = raw_user_text |
|
current_gemini_parts.append(uploaded_gemini_file) |
|
current_gemini_parts.append(final_prompt_for_gemini) |
|
print(f" [/api/chat]: Fichier seul détecté, prompt généré: '{final_prompt_for_gemini}'") |
|
elif uploaded_gemini_file and raw_user_text: |
|
final_prompt_for_gemini = raw_user_text |
|
current_gemini_parts.append(uploaded_gemini_file) |
|
current_gemini_parts.append(final_prompt_for_gemini) |
|
else: |
|
final_prompt_for_gemini = raw_user_text |
|
if final_prompt_for_gemini: |
|
current_gemini_parts.append(final_prompt_for_gemini) |
|
|
|
|
|
selected_model_name = MODEL_PRO if use_advanced else MODEL_FLASH |
|
|
|
|
|
if not current_gemini_parts: |
|
print("--- ERREUR [/api/chat]: Aucune donnée (texte ou fichier valide) à envoyer après traitement.") |
|
if session.get('chat_history'): session['chat_history'].pop() |
|
return jsonify({'success': False, 'error': "Impossible d'envoyer une requête vide."}), 400 |
|
|
|
|
|
try: |
|
|
|
|
|
history_for_gemini_prep = list(session.get('chat_history', []))[:-1] |
|
gemini_history_to_send = prepare_gemini_history(history_for_gemini_prep) |
|
|
|
|
|
contents_for_gemini = gemini_history_to_send + [{'role': 'user', 'parts': current_gemini_parts}] |
|
|
|
|
|
print(f"--- DEBUG [/api/chat]: Préparation de l'envoi à l'API Gemini (Modèle: {selected_model_name}) ---") |
|
print(f" Nombre total de tours (historique + actuel): {len(contents_for_gemini)}") |
|
print(f" Nombre de messages d'historique formatés envoyés: {len(gemini_history_to_send)}") |
|
print(f" Google Search activé: {use_web_search}") |
|
print(f" Contenu détaillé des 'parts' envoyées:") |
|
for i, turn in enumerate(contents_for_gemini): |
|
role = turn.get('role') |
|
parts_details = [] |
|
for part in turn.get('parts', []): |
|
if isinstance(part, str): |
|
parts_details.append(f"Text({len(part)} chars): '{part[:60].replace(chr(10), ' ')}...'") |
|
elif hasattr(part, 'name') and hasattr(part, 'mime_type'): |
|
parts_details.append(f"File(name={part.name}, mime={part.mime_type})") |
|
else: |
|
parts_details.append(f"UnknownPart({type(part)})") |
|
print(f" Turn {i} (role: {role}): {', '.join(parts_details)}") |
|
print("--------------------------------------------------------------------") |
|
|
|
|
|
active_model = genai.GenerativeModel( |
|
model_name=selected_model_name, |
|
safety_settings=SAFETY_SETTINGS, |
|
system_instruction=SYSTEM_INSTRUCTION |
|
) |
|
print(f"--- LOG [/api/chat]: Envoi de la requête à {selected_model_name}...") |
|
|
|
|
|
if use_web_search: |
|
print(f"--- LOG [/api/chat]: Activation Google Search Retrieval...") |
|
response = active_model.generate_content( |
|
contents=contents_for_gemini, |
|
tools='google_search_retrieval' |
|
) |
|
else: |
|
response = active_model.generate_content( |
|
contents=contents_for_gemini |
|
) |
|
|
|
|
|
response_text_raw = "" |
|
response_html = "" |
|
search_metadata = None |
|
|
|
try: |
|
if response.parts: |
|
response_text_raw = response.text |
|
print(f"--- LOG [/api/chat]: Réponse reçue de Gemini (brute, début): '{response_text_raw[:100]}...'") |
|
|
|
|
|
if use_web_search and hasattr(response, 'candidates') and response.candidates: |
|
candidate = response.candidates[0] |
|
if hasattr(candidate, 'citation_metadata'): |
|
metadata = candidate.citation_metadata |
|
search_metadata = {} |
|
|
|
|
|
if hasattr(metadata, 'citations'): |
|
search_pages = [] |
|
for citation in metadata.citations: |
|
if hasattr(citation, 'start_index') and hasattr(citation, 'end_index'): |
|
search_pages.append({ |
|
"title": citation.title if hasattr(citation, 'title') else "Sans titre", |
|
"url": citation.uri if hasattr(citation, 'uri') else "#", |
|
"snippet": f"Citation: {citation.start_index}-{citation.end_index}" |
|
}) |
|
if search_pages: |
|
search_metadata["search_pages"] = search_pages |
|
|
|
print(f"--- LOG [/api/chat]: Métadonnées de citation récupérées:") |
|
if search_metadata and search_metadata.get("search_pages"): |
|
print(f" Pages: {len(search_metadata['search_pages'])} sources") |
|
else: |
|
feedback_info = f"Feedback: {response.prompt_feedback}" if response.prompt_feedback else "Pas de feedback détaillé." |
|
print(f"--- AVERTISSEMENT [/api/chat]: Réponse Gemini sans 'parts'. {feedback_info}") |
|
if response.prompt_feedback and response.prompt_feedback.block_reason: |
|
reason = response.prompt_feedback.block_reason.name |
|
response_text_raw = f"Désolé, ma réponse a été bloquée ({reason})." |
|
|
|
|
|
response_html = markdown.markdown(response_text_raw, extensions=['extra']) |
|
print(f"--- LOG [/api/chat]: Réponse convertie en HTML") |
|
|
|
|
|
assistant_history_entry = { |
|
'role': 'assistant', |
|
'text': response_html, |
|
'raw_text': response_text_raw, |
|
'search_metadata': search_metadata |
|
} |
|
|
|
|
|
if not isinstance(session.get('chat_history'), list): |
|
print("--- ERREUR [/api/chat]: 'chat_history' n'est pas une liste pendant l'ajout de réponse!") |
|
session['chat_history'] = [] |
|
session['chat_history'].append(assistant_history_entry) |
|
|
|
|
|
if filepath_to_delete and os.path.exists(filepath_to_delete): |
|
try: |
|
os.remove(filepath_to_delete) |
|
print(f"--- LOG [/api/chat]: Fichier temporaire '{filepath_to_delete}' supprimé.") |
|
except OSError as e: |
|
print(f"--- ERREUR [/api/chat]: Impossible de supprimer le fichier temporaire: {e}") |
|
|
|
|
|
response_object = { |
|
'success': True, |
|
'text': response_html, |
|
'search_metadata': search_metadata |
|
} |
|
return jsonify(response_object) |
|
|
|
except Exception as process_error: |
|
print(f"--- ERREUR [/api/chat]: Traitement de la réponse: {process_error}") |
|
|
|
if filepath_to_delete and os.path.exists(filepath_to_delete): |
|
try: os.remove(filepath_to_delete) |
|
except OSError as e: print(f" Erreur suppression fichier temp après erreur: {e}") |
|
return jsonify({'success': False, 'error': f"Erreur traitement réponse: {process_error}"}), 500 |
|
|
|
except Exception as call_error: |
|
print(f"--- ERREUR [/api/chat]: Appel à l'API Gemini: {call_error}") |
|
|
|
if filepath_to_delete and os.path.exists(filepath_to_delete): |
|
try: os.remove(filepath_to_delete) |
|
except OSError as e: print(f" Erreur suppression fichier temp après erreur: {e}") |
|
return jsonify({'success': False, 'error': f"Erreur API Gemini: {call_error}"}), 500 |
|
|
|
if __name__ == '__main__': |
|
|
|
if 'GOOGLE_API_KEY' not in os.environ: |
|
print("Attention: La variable d'environnement GOOGLE_API_KEY n'est pas définie.") |
|
print("Vous devrez la définir avant d'exécuter l'application:") |
|
print("export GOOGLE_API_KEY=votre_clé_api") |
|
|
|
app.run(debug=True, host='0.0.0.0', port=int(os.environ.get('PORT', 5000))) |