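# Mariam AI: Streamlit chat application backed by Gemini ('gemini-2.0-flash-exp'),
# with optional web search via Serper.dev and file uploads through the Gemini Files API.
# Assumed setup (sketch, not stated in the original file): pip install streamlit
# google-generativeai python-dotenv, plus a .env file providing GOOGLE_API_KEY.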
import streamlit as st
import google.generativeai as genai
import os
from dotenv import load_dotenv
import http.client
import json
from typing import Iterator, Optional

load_dotenv()

# Configure the Gemini API key from the environment
genai.configure(api_key=os.getenv("GOOGLE_API_KEY"))

safety_settings = [
    {"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_NONE"},
    {"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_NONE"},
    {"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_NONE"},
    {"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_NONE"},
]

model = genai.GenerativeModel(
    'gemini-2.0-flash-exp',
    tools='code_execution',
    safety_settings=safety_settings,
    system_instruction="Tu es un assistant intelligent. Ton but est d'assister au mieux que tu peux. Tu as été créé par Aenir et tu t'appelles Mariam",
)
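# Optional sketch (not used by this app): generation parameters can be tuned via
# generation_config when constructing the model. The values below are illustrative
# assumptions, not settings taken from the original code.
# model = genai.GenerativeModel(
#     'gemini-2.0-flash-exp',
#     generation_config={"temperature": 0.7, "max_output_tokens": 2048},
#     tools='code_execution',
#     safety_settings=safety_settings,
#     system_instruction="...",
# )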
def perform_web_search(query: str) -> Optional[dict]:
    """Query the Serper.dev search API and return the parsed JSON response, or None on error."""
    conn = http.client.HTTPSConnection("google.serper.dev")
    payload = json.dumps({"q": query})
    headers = {
        'X-API-KEY': '9b90a274d9e704ff5b21c0367f9ae1161779b573',
        'Content-Type': 'application/json'
    }
    try:
        conn.request("POST", "/search", payload, headers)
        res = conn.getresponse()
        data = json.loads(res.read().decode("utf-8"))
        return data
    except Exception as e:
        st.error(f"Erreur lors de la recherche web : {e}")
        return None
    finally:
        conn.close()
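# Illustrative sketch of the Serper response shape consumed by format_search_results
# below (field names come from that function; the exact nesting is an assumption):
# {
#     "knowledgeGraph": {"title": "...", "type": "...", "description": "..."},
#     "organic": [{"title": "...", "snippet": "...", "link": "..."}],
#     "peopleAlsoAsk": [{"question": "...", "snippet": "..."}]
# }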
def format_search_results(data: dict) -> str:
    """Format the Serper search results as Markdown to be injected into the prompt."""
    if not data:
        return "Aucun résultat trouvé"

    result = ""

    if 'knowledgeGraph' in data:
        kg = data['knowledgeGraph']
        result += f"### {kg.get('title', '')}\n"
        result += f"*{kg.get('type', '')}*\n\n"
        result += f"{kg.get('description', '')}\n\n"

    if 'organic' in data:
        result += "### Résultats principaux:\n"
        for item in data['organic'][:3]:
            result += f"- **{item.get('title', '')}**\n"
            result += f"  {item.get('snippet', '')}\n"
            result += f"  [Lien]({item.get('link', '')})\n\n"

    if 'peopleAlsoAsk' in data:
        result += "### Questions fréquentes:\n"
        for item in data['peopleAlsoAsk'][:2]:
            result += f"- **{item.get('question', '')}**\n"
            result += f"  {item.get('snippet', '')}\n\n"

    return result
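# Hypothetical usage sketch (main() below feeds this output into the Gemini prompt):
#     results = perform_web_search("actualités IA")
#     if results:
#         st.markdown(format_search_results(results))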
def stream_response(prompt: str, uploaded_file=None) -> Iterator[str]:
    """Stream the response from Gemini"""
    try:
        if uploaded_file:
            response = model.generate_content([uploaded_file, "\n\n", prompt], stream=True)
        else:
            response = model.generate_content(prompt, stream=True)
        for chunk in response:
            if chunk.text:
                yield chunk.text
    except Exception as e:
        yield f"Erreur lors de la génération de la réponse : {str(e)}"
def role_to_streamlit(role: str) -> str:
    """Map the Gemini history role ("model") to the Streamlit role name ("assistant")."""
    return "assistant" if role == "model" else role


def process_uploaded_file(file) -> object:
    """Save the uploaded file to a temporary path and push it to the Gemini Files API."""
    if file is not None:
        file_path = os.path.join("temp", file.name)
        with open(file_path, "wb") as f:
            f.write(file.getbuffer())
        try:
            return genai.upload_file(file_path)
        except Exception as e:
            st.error(f"Erreur lors du téléchargement du fichier : {e}")
            return None
        finally:
            # Clean up the temporary file
            if os.path.exists(file_path):
                os.remove(file_path)
def main():
    # Initialize session state
    if "chat" not in st.session_state:
        st.session_state.chat = model.start_chat(history=[])
    if "messages" not in st.session_state:
        st.session_state.messages = []
    if "web_search" not in st.session_state:
        st.session_state.web_search = False

    st.title("Mariam AI!")

    # Settings sidebar
    with st.sidebar:
        st.title("Paramètres")
        st.session_state.web_search = st.toggle("Activer la recherche web",
                                                value=st.session_state.web_search)

        # File upload
        uploaded_file = st.file_uploader("Télécharger un fichier (image/document)",
                                         type=['jpg', 'mp4', 'mp3', 'jpeg', 'png', 'pdf', 'txt'])

    # Display chat history from session state
    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.markdown(message["content"])

    # Chat input
    if prompt := st.chat_input("Hey?"):
        # Add user message to chat history
        st.session_state.messages.append({"role": "user", "content": prompt})
        # Display user message
        st.chat_message("user").markdown(prompt)

        # Handle file upload
        uploaded_gemini_file = None
        if uploaded_file:
            uploaded_gemini_file = process_uploaded_file(uploaded_file)

        try:
            # Perform web search if enabled
            if st.session_state.web_search:
                with st.spinner("Recherche web en cours..."):
                    web_results = perform_web_search(prompt)
                    if web_results:
                        formatted_results = format_search_results(web_results)
                        prompt = (
                            f"Question: {prompt}\n\n"
                            f"Résultats de recherche web:\n{formatted_results}\n\n"
                            "Pourrais-tu analyser ces informations et me donner une réponse complète?"
                        )

            # Display assistant message with streaming
            with st.chat_message("assistant"):
                message_placeholder = st.empty()
                full_response = ""
                # Stream the response
                for chunk in stream_response(prompt, uploaded_gemini_file):
                    full_response += chunk
                    # Update the message placeholder with the accumulated response
                    message_placeholder.markdown(full_response + "▌")
                # Remove the cursor and update with the final response
                message_placeholder.markdown(full_response)

            # Add assistant response to chat history
            st.session_state.messages.append({"role": "assistant", "content": full_response})
        except Exception as e:
            st.error(f"Erreur lors de l'envoi du message : {e}")
if __name__ == "__main__":
    # Create temp directory
    os.makedirs("temp", exist_ok=True)
    main()