Spaces:
Sleeping
Sleeping
import streamlit as st | |
import google.generativeai as genai | |
import os | |
import tempfile | |
import PIL.Image | |
import time | |
import ssl | |
from dotenv import load_dotenv | |
# Pull environment variables from a local .env file (python-dotenv), if one exists.
load_dotenv()

# Hand the Gemini SDK the API key found in the GOOGLE_API_KEY environment variable.
genai.configure(api_key=os.getenv("GOOGLE_API_KEY"))

# Each listed harm category is paired with the "BLOCK_NONE" threshold.
safety_settings = [
    {"category": harm_category, "threshold": "BLOCK_NONE"}
    for harm_category in (
        "HARM_CATEGORY_HARASSMENT",
        "HARM_CATEGORY_HATE_SPEECH",
        "HARM_CATEGORY_SEXUALLY_EXPLICIT",
        "HARM_CATEGORY_DANGEROUS_CONTENT",
    )
]

# Single shared model instance; the system instruction (in French) names the
# assistant "Mariam" and states it was created by Aenir.
model = genai.GenerativeModel(
    'gemini-1.5-flash-002',
    safety_settings=safety_settings,
    system_instruction="Tu es un assistant intelligent. ton but est d'assister au mieux que tu peux. tu as été créé par Aenir et tu t'appelles Mariam",
)
def role_to_streamlit(role):
    """Translate a Gemini chat role into the role name used by Streamlit.

    Gemini labels the assistant's turns "model", whereas Streamlit's chat
    widgets use "assistant". Any other role is returned unchanged.
    """
    return "assistant" if role == "model" else role
# Create the Gemini chat session once and keep it in Streamlit's session state
# so later runs of this script reuse the same object.
if "chat" not in st.session_state:
    st.session_state.chat = model.start_chat(history=[])

# Page title.
st.title("Mariam AI!")

# Optional attachments; several files may be selected at once.
uploaded_files = st.file_uploader("Choose a file", accept_multiple_files=True)

# Replay the stored conversation above the input box. Only the text of the
# first part of each message is rendered.
for past_message in st.session_state.chat.history:
    st.chat_message(role_to_streamlit(past_message.role)).markdown(
        past_message.parts[0].text
    )
def upload_and_process_file(file_path):
    """Upload a local file through the Gemini SDK and wait until it is processed.

    The file is validated once up front. The upload and the wait for
    processing are then attempted up to three times, pausing a little longer
    after each failed attempt.

    Args:
        file_path: Path of the local file to upload.

    Returns:
        The file object obtained from ``genai.upload_file`` (refreshed with
        ``genai.get_file``) once its state is no longer "PROCESSING".

    Raises:
        FileNotFoundError: If ``file_path`` does not exist.
        ValueError: If the file is empty, or if the last attempt ends with the
            file in the "FAILED" state.
        TimeoutError: If the last attempt waits more than five minutes for
            processing to finish.
        Exception: Any other error raised by the last attempt is re-raised.
    """
    max_retries = 3
    retry_delay = 2  # seconds, multiplied by the attempt number
    timeout = 300  # seconds (5 minutes) allowed for processing per attempt
    poll_interval = 10  # seconds between two state checks

    # Validate before entering the retry loop: retrying cannot repair a
    # missing or empty file, it would only delay the error by several seconds.
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"Le fichier {file_path} n'existe pas")
    if os.path.getsize(file_path) == 0:
        raise ValueError(f"Le fichier {file_path} est vide")

    for attempt in range(max_retries):
        try:
            print(f"Tentative d'upload {attempt + 1}/{max_retries} pour {file_path}")

            # Upload the file.
            uploaded_file = genai.upload_file(path=file_path)
            print(f"Upload réussi: {uploaded_file.uri}")

            # Poll until the file leaves the PROCESSING state or the timeout hits.
            start_time = time.time()
            while uploaded_file.state.name == "PROCESSING":
                if time.time() - start_time > timeout:
                    raise TimeoutError("Timeout pendant le traitement du fichier")
                print(
                    f"En attente du traitement... Temps écoulé: {int(time.time() - start_time)}s")
                time.sleep(poll_interval)
                uploaded_file = genai.get_file(uploaded_file.name)

            if uploaded_file.state.name == "FAILED":
                raise ValueError(
                    f"Échec du traitement: {uploaded_file.state.name}")

            print(f"Traitement terminé avec succès: {uploaded_file.uri}")
            return uploaded_file
        except Exception as e:
            # SSL failures keep their own log label; every error is otherwise
            # handled the same way (retry with a growing pause, re-raise at the end).
            if isinstance(e, ssl.SSLError):
                print(
                    f"Erreur SSL lors de l'upload (tentative {attempt + 1}): {e}")
            else:
                print(
                    f"Erreur lors de l'upload (tentative {attempt + 1}): {e}")
            if attempt >= max_retries - 1:
                raise
            time.sleep(retry_delay * (attempt + 1))
# Accept the user's next message and send it, together with any attachments,
# through the stored chat session so Gemini receives the earlier turns as context.
if prompt := st.chat_input("Hey?"):
    # Echo the user's message immediately.
    st.chat_message("user").markdown(prompt)

    image_suffixes = ('.png', '.jpg', '.jpeg', '.gif')
    content = [prompt]  # the prompt stays first: the history view renders parts[0]
    temp_files = []     # paths to delete once the request is finished
    opened_images = []  # PIL images to close before their files are deleted

    try:
        # Copy every upload to a temporary file, then attach it either as an
        # image or as a file uploaded through the Gemini SDK.
        for uploaded_file in uploaded_files or []:
            suffix = os.path.splitext(uploaded_file.name)[1]
            with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp_file:
                # Register the path before writing so cleanup always sees it.
                temp_files.append(temp_file.name)
                temp_file.write(uploaded_file.getvalue())

            # The temporary file is closed (and therefore flushed) at this
            # point, so its full content is on disk for the steps below.
            if uploaded_file.name.lower().endswith(image_suffixes):
                image = PIL.Image.open(temp_file.name)
                opened_images.append(image)
                content.append(image)
            else:
                content.append(upload_and_process_file(temp_file.name))

        # send_message() submits the stored history along with the new content
        # and appends both the user turn and the reply to chat.history, so no
        # manual history bookkeeping is needed.
        # NOTE(review): attachments become part of the history and are sent
        # again on later turns — confirm this is acceptable.
        response = st.session_state.chat.send_message(content)

        # Display the response.
        with st.chat_message("assistant"):
            st.markdown(response.text)
    except Exception as e:
        st.error(f"An error occurred: {e}")
    finally:
        # Close images first so their files are no longer held open.
        for image in opened_images:
            try:
                image.close()
            except Exception as e:
                print(f"Error closing image: {e}")
        # Cleanup temporary files.
        for temp_file in temp_files:
            try:
                os.unlink(temp_file)
            except Exception as e:
                print(f"Error deleting temporary file {temp_file}: {e}")