Spaces:
Sleeping
Sleeping
import streamlit as st | |
import google.generativeai as genai | |
import os | |
from dotenv import load_dotenv | |
from PIL import Image | |
import tempfile | |
import time | |
import ssl | |
# Charger les variables d'environnement | |
load_dotenv() | |
# Configurer la clé API | |
genai.configure(api_key=os.getenv("GOOGLE_API_KEY")) | |
# Paramètres de sécurité | |
safety_settings = [ | |
{"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_NONE"}, | |
{"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_NONE"}, | |
{"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_NONE"}, | |
{"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_NONE"}, | |
] | |
def role_to_streamlit(role): | |
return "assistant" if role == "model" else role | |
def upload_and_process_file(file_path): | |
max_retries = 3 | |
retry_delay = 2 | |
for attempt in range(max_retries): | |
try: | |
if not os.path.exists(file_path): | |
raise FileNotFoundError(f"Le fichier {file_path} n'existe pas") | |
file_size = os.path.getsize(file_path) | |
if file_size == 0: | |
raise ValueError(f"Le fichier {file_path} est vide") | |
uploaded_file = genai.upload_file(path=file_path) | |
timeout = 300 | |
start_time = time.time() | |
while uploaded_file.state.name == "PROCESSING": | |
if time.time() - start_time > timeout: | |
raise TimeoutError("Timeout pendant le traitement du fichier") | |
time.sleep(10) | |
uploaded_file = genai.get_file(uploaded_file.name) | |
if uploaded_file.state.name == "FAILED": | |
raise ValueError(f"Échec du traitement: {uploaded_file.state.name}") | |
return uploaded_file | |
except Exception as e: | |
if attempt < max_retries - 1: | |
time.sleep(retry_delay * (attempt + 1)) | |
else: | |
raise | |
def allowed_file(filename): | |
ALLOWED_EXTENSIONS = {'txt','mp4','mp3','pdf', 'png', 'jpg', 'jpeg', 'gif'} | |
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS | |
# Initialiser le modèle | |
model = genai.GenerativeModel('gemini-1.5-flash', | |
safety_settings=safety_settings, | |
system_instruction="Tu es un assistant intelligent. ton but est d'assister au mieux que tu peux. tu as été créé par Aenir et tu t'appelles Mariam") | |
# Configuration de la page Streamlit | |
st.set_page_config(page_title="Mariam - Assistant IA", page_icon="🤖") | |
st.title("Mariam AI - Chat Intelligent") | |
# Initialiser l'historique de chat | |
if "chat" not in st.session_state: | |
st.session_state.chat = model.start_chat(history=[]) | |
# CSS personnalisé (amélioré) | |
st.markdown(""" | |
<style> | |
/* Conteneur principal pour la zone de saisie et les uploads */ | |
.input-area { | |
display: flex; | |
align-items: center; | |
gap: 10px; | |
margin-bottom: 10px; | |
} | |
/* Style pour la zone de saisie */ | |
.chat-input { | |
flex-grow: 1; /* Permet à la zone de saisie de prendre l'espace disponible */ | |
} | |
/* Style pour les conteneurs d'upload */ | |
.upload-container { | |
display: flex; | |
align-items: center; | |
gap: 5px; | |
} | |
/* Style pour les icônes (ajustez la taille si nécessaire) */ | |
.upload-icon { | |
font-size: 1.5em; | |
} | |
</style> | |
""", unsafe_allow_html=True) | |
# Afficher l'historique des messages | |
for message in st.session_state.chat.history: | |
with st.chat_message(role_to_streamlit(message.role)): | |
st.markdown(message.parts[0].text) | |
if len(message.parts) > 1: | |
for part in message.parts[1:]: | |
if hasattr(part, 'image'): | |
st.image(part.image) | |
# Créer le conteneur principal pour la zone de saisie et les uploads | |
input_area = st.container() | |
with input_area: | |
# Zone de saisie | |
prompt = st.chat_input("Que puis-je faire pour vous ?", key="chat_input") | |
# Colonnes pour les icônes d'upload (utiliser plus de colonnes pour un espacement plus fin) | |
ucol1, ucol2, ucol3, ucol4 = st.columns([1,1,1,1]) | |
with ucol1: | |
# Icône d'upload de fichiers | |
st.markdown("<span class='upload-icon'>📁</span>", unsafe_allow_html=True) | |
with ucol2: | |
# Upload de fichiers | |
uploaded_files = st.file_uploader("", type=["txt", "mp4", "mp3", "pdf"], | |
accept_multiple_files=True, key="files", | |
label_visibility="collapsed") | |
with ucol3: | |
# Icône d'upload d'images | |
st.markdown("<span class='upload-icon'>📸</span>", unsafe_allow_html=True) | |
with ucol4: | |
# Upload d'images | |
uploaded_images = st.file_uploader("", type=["jpg", "jpeg", "png", "gif"], | |
accept_multiple_files=True, key="images", | |
label_visibility="collapsed") | |
# Appliquer les styles aux éléments | |
st.markdown(f""" | |
<style> | |
div[data-testid='stChatInput'] {{ | |
flex-grow: 1; | |
}} | |
div[data-testid='stFileUploader'] {{ | |
display: inline-flex; | |
padding-left: 0; | |
padding-right: 0; | |
padding-bottom: 0; | |
padding-top: 0; | |
margin-left: 0; | |
margin-right: 0; | |
}} | |
div[data-testid='stFileUploader'] > div:nth-child(2) {{ | |
display: none; | |
}} | |
</style> | |
""", unsafe_allow_html=True) | |
if prompt: | |
content = [prompt] | |
temp_files = [] | |
try: | |
# Traitement des images | |
if uploaded_images: | |
for img_file in uploaded_images: | |
if allowed_file(img_file.name): | |
image = Image.open(img_file) | |
content.append(image) | |
st.chat_message("user").image(image) | |
# Traitement des autres fichiers | |
if uploaded_files: | |
for file in uploaded_files: | |
if allowed_file(file.name): | |
with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(file.name)[1]) as temp_file: | |
temp_file.write(file.getvalue()) | |
temp_files.append(temp_file.name) | |
uploaded_file = upload_and_process_file(temp_file.name) | |
content.append(uploaded_file) | |
# Afficher le message utilisateur | |
st.chat_message("user").markdown(prompt) | |
# Envoyer le message et afficher la réponse | |
response = st.session_state.chat.send_message(content) | |
with st.chat_message("assistant"): | |
st.markdown(response.text) | |
except Exception as e: | |
st.error(f"Une erreur est survenue : {str(e)}") | |
finally: | |
# Nettoyage des fichiers temporaires | |
for temp_file in temp_files: | |
try: | |
os.unlink(temp_file) | |
except Exception as e: | |
print(f"Erreur lors de la suppression du fichier temporaire {temp_file}: {e}") |