Spaces:
Sleeping
Sleeping
| import os | |
| #os.system('yt-dlp --cookies-from-browser chrome') | |
| from selenium import webdriver | |
| from selenium.webdriver.chrome.options import Options | |
| import json | |
| from datasets import load_dataset | |
| import streamlit as st | |
| from audio_recorder_streamlit import audio_recorder | |
| import msoffcrypto | |
| import docx | |
| import pptx | |
| #import pymupdf4llm | |
| import tempfile | |
| from typing import List, Optional, Dict, Any | |
| from pydub import AudioSegment | |
| from groq import Groq | |
| from langchain.chains import LLMChain | |
| from langchain_groq import ChatGroq | |
| from langchain.prompts import PromptTemplate | |
| from langchain.text_splitter import RecursiveCharacterTextSplitter | |
| from langchain.schema import AIMessage, HumanMessage, SystemMessage | |
| from datetime import datetime | |
| import smtplib | |
| from email.mime.text import MIMEText | |
| from email.mime.multipart import MIMEMultipart | |
| from email.mime.application import MIMEApplication | |
| from reportlab.lib import colors | |
| from reportlab.lib.pagesizes import letter | |
| from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer | |
| from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle | |
| import re | |
| from docx import Document | |
| from pytube import YouTube | |
| from moviepy import VideoFileClip | |
| import yt_dlp | |
| from youtube_transcript_api import YouTubeTranscriptApi | |
| from urllib.parse import urlparse, parse_qs | |
| from ratelimit import limits, sleep_and_retry | |
| import time | |
| import fasttext | |
| import requests | |
| from requests.auth import HTTPBasicAuth | |
| import pikepdf | |
| import io | |
| import pypdf | |
| from PyPDF2 import PdfReader | |
| from pptx import Presentation | |
| import trafilatura | |
| from bs4 import BeautifulSoup | |
| from dotenv import load_dotenv | |
| # Load variables from a local .env file (python-dotenv) before reading them below. | |
| load_dotenv() | |
| # SMTP credentials used by EmailSender; os.environ.get yields None when a variable is unset. | |
| SENDER_EMAIL = os.environ.get('SENDER_EMAIL') | |
| SENDER_PASSWORD = os.environ.get('SENDER_PASSWORD') | |
class Config:
    """Central place for application-wide configuration values.

    Attributes:
        SENDER_EMAIL: Address used to send summary e-mails, read from the
            ``SENDER_EMAIL`` environment variable (``None`` when unset).
        SENDER_PASSWORD: Password for that account, read from the
            ``SENDER_PASSWORD`` environment variable (``None`` when unset).
        FASTTEXT_MODEL_PATH: Local path of the fastText language-identification
            model loaded by the processors.
    """

    # These two attributes are read elsewhere in the file as
    # Config.SENDER_EMAIL / Config.SENDER_PASSWORD, so they must exist here;
    # secrets are taken from the environment rather than hard-coded.
    SENDER_EMAIL = os.environ.get('SENDER_EMAIL')
    SENDER_PASSWORD = os.environ.get('SENDER_PASSWORD')
    FASTTEXT_MODEL_PATH = "lid.176.bin"
import urllib.request

# Fetch the fastText language-identification model only when it is missing.
# Streamlit re-executes the script on every interaction, so an unconditional
# download would re-fetch the model each time. The file is first written under
# a temporary name and then renamed, so an interrupted download never leaves a
# truncated model at the final path.
if not os.path.exists(Config.FASTTEXT_MODEL_PATH):
    _partial_model_path = Config.FASTTEXT_MODEL_PATH + ".part"
    urllib.request.urlretrieve(
        'https://dl.fbaipublicfiles.com/fasttext/supervised-models/lid.176.bin',
        _partial_model_path,
    )
    os.replace(_partial_model_path, Config.FASTTEXT_MODEL_PATH)
| # Classes PDFGenerator et EmailSender restent inchangées... | |
class PDFGenerator:
    """Render a Markdown-like text summary into a PDF file with ReportLab."""

    @staticmethod
    def create_pdf(content: str, filename: str) -> str:
        """Write ``content`` to a letter-sized PDF and return its path.

        Lines starting with ``#`` become second-level headings; every other
        non-blank line becomes a body paragraph. Blank lines are skipped.

        Args:
            content: Text to render, one paragraph or heading per line.
            filename: Destination path of the PDF.

        Returns:
            ``filename``, unchanged.
        """
        # Local import: only this method needs it.
        from xml.sax.saxutils import escape

        doc = SimpleDocTemplate(filename, pagesize=letter)
        styles = getSampleStyleSheet()
        body_style = ParagraphStyle(
            'CustomStyle',
            parent=styles['Normal'],
            spaceBefore=12,
            spaceAfter=12,
            fontSize=12,
            leading=14,
        )
        title_style = ParagraphStyle(
            'CustomTitle',
            parent=styles['Heading1'],
            fontSize=16,
            spaceAfter=30,
        )

        story = [
            Paragraph("Résumé Audio", title_style),
            Paragraph(f"Date: {datetime.now().strftime('%d/%m/%Y %H:%M')}", body_style),
            Spacer(1, 20),
        ]

        for line in content.split('\n'):
            stripped = line.strip()
            if not stripped:
                continue
            # Paragraph interprets its text as XML-like markup, so raw '&' or
            # '<' coming from model output must be escaped before rendering.
            if stripped.startswith('#'):
                story.append(Paragraph(escape(stripped.strip('# ')), styles['Heading2']))
            else:
                story.append(Paragraph(escape(line), body_style))

        doc.build(story)
        return filename
class EmailSender:
    """Send a summary PDF as an e-mail attachment through Gmail SMTP."""

    def __init__(self, sender_email: str, sender_password: str):
        """Store the SMTP credentials.

        Args:
            sender_email: Address to send from. Falls back to the module-level
                ``SENDER_EMAIL`` when empty or ``None``.
            sender_password: Password for that account. Falls back to the
                module-level ``SENDER_PASSWORD`` when empty or ``None``.
        """
        # Honour the arguments instead of silently discarding them; the
        # environment-derived globals remain the fallback.
        self.sender_email = sender_email or SENDER_EMAIL
        self.sender_password = sender_password or SENDER_PASSWORD

    def send_email(self, recipient_email: str, subject: str, body: str, pdf_path: str) -> bool:
        """Send ``body`` with the PDF at ``pdf_path`` attached.

        Args:
            recipient_email: Destination address.
            subject: Message subject.
            body: Plain-text message body.
            pdf_path: Path of the PDF file to attach.

        Returns:
            ``True`` when the message was handed to the SMTP server, ``False``
            when any step failed (the error is shown through Streamlit).
        """
        try:
            msg = MIMEMultipart()
            msg['From'] = self.sender_email
            msg['To'] = recipient_email
            msg['Subject'] = subject
            msg.attach(MIMEText(body, 'plain'))

            with open(pdf_path, 'rb') as f:
                pdf_attachment = MIMEApplication(f.read(), _subtype='pdf')
            pdf_attachment.add_header(
                'Content-Disposition', 'attachment', filename=os.path.basename(pdf_path)
            )
            msg.attach(pdf_attachment)

            # The context manager closes the connection even when TLS, login
            # or sending fails; the timeout keeps a dead server from hanging
            # the request indefinitely.
            with smtplib.SMTP('smtp.gmail.com', 587, timeout=30) as server:
                server.starttls()
                server.login(self.sender_email, self.sender_password)
                server.send_message(msg)
            return True
        except Exception as e:
            # Deliberate catch-all boundary: report to the UI and signal
            # failure through the return value.
            st.error(f"Erreur d'envoi d'email: {str(e)}")
            return False
class AudioProcessor:
    """Transcribe audio with Groq Whisper and summarise transcripts in French.

    Rate-limit errors from the Groq API (detected by the substring
    ``rate_limit_exceeded`` in the exception text) are retried a bounded number
    of times, waiting ``RATE_LIMIT_WAIT_SECONDS`` between attempts.
    """

    # Pause between two attempts after a rate-limit error, in seconds.
    RATE_LIMIT_WAIT_SECONDS = 60
    # Number of additional attempts after the first rate-limited call.
    MAX_RATE_LIMIT_RETRIES = 5

    # Prompt used when no custom instructions are supplied.
    DEFAULT_SUMMARY_PROMPT = """
Vous êtes un assistant expert spécialisé dans le résumé et l'analyse d'enregistrements audio en langue française.
Voici la transcription à analyser:
{transcript}
Veuillez fournir:
1. Un résumé concis (3-4 phrases)
2. Les points clés (maximum 5 points)
3. Les actions recommandées (si pertinent)
4. Une conclusion brève
Format souhaité:
# Résumé
[votre résumé]
# Points Clés
• [point 1]
• [point 2]
...
# Actions Recommandées
1. [action 1]
2. [action 2]
...
# Conclusion
[votre conclusion]
"""

    def __init__(self, model_name: str, prompt: str = None, chunk_length_ms: int = 300000):
        """Create the API clients and load the language detector.

        Args:
            model_name: Groq chat model used for summaries and translation.
            prompt: Optional custom summary instructions; the default prompt
                is used when empty or ``None``.
            chunk_length_ms: Maximum length of one audio chunk in milliseconds.
        """
        self.chunk_length_ms = chunk_length_ms
        self.groq_client = Groq()
        self.llm = ChatGroq(
            model=model_name,
            temperature=0,
        )
        self.custom_prompt = prompt
        self.language_detector = fasttext.load_model(Config.FASTTEXT_MODEL_PATH)
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=4000,
            chunk_overlap=200
        )
        # Rate-limit figures kept as instance attributes for compatibility.
        self.CALLS_PER_MINUTE = 5000
        self.PERIOD = 60  # seconds

    def _with_rate_limit_retry(self, operation, *args, warning: str = None, **kwargs):
        """Call ``operation`` and retry it while the API reports a rate limit.

        Args:
            operation: Callable to invoke.
            *args: Positional arguments forwarded to ``operation``.
            warning: Optional message shown through Streamlit before each wait.
            **kwargs: Keyword arguments forwarded to ``operation``.

        Returns:
            Whatever ``operation`` returns.

        Raises:
            Exception: The original error when it is not a rate-limit error,
                or when ``MAX_RATE_LIMIT_RETRIES`` retries were exhausted.
        """
        for attempt in range(self.MAX_RATE_LIMIT_RETRIES + 1):
            try:
                return operation(*args, **kwargs)
            except Exception as e:
                out_of_retries = attempt >= self.MAX_RATE_LIMIT_RETRIES
                if "rate_limit_exceeded" not in str(e) or out_of_retries:
                    raise
                if warning:
                    st.warning(warning)
                time.sleep(self.RATE_LIMIT_WAIT_SECONDS)

    def check_language(self, text: str) -> str:
        """Return ``"OUI"`` when the detector's top label is French, else ``"NON"``."""
        # Newlines are replaced because the detector is given a single line.
        prediction = self.language_detector.predict(text.replace('\n', ' '))
        return "OUI" if prediction[0][0] == '__label__fr' else "NON"

    def _make_api_call(self, messages):
        """Send one list of chat messages to the LLM and return the raw result."""
        return self.llm.generate([messages])

    def _generate_text(self, messages) -> str:
        """Send ``messages`` to the LLM and return the text of the first generation."""
        return self._make_api_call(messages).generations[0][0].text

    def _ensure_french(self, text: str, warning: str = None) -> str:
        """Return ``text`` unchanged when detected as French, else its translation."""
        if self.check_language(text) != "NON":
            return text
        if warning:
            st.warning(warning)
        return self.translate_to_french(text)

    def translate_to_french(self, text: str) -> str:
        """Translate ``text`` to French with the LLM, retrying on rate limits."""
        messages = [
            SystemMessage(content="Vous êtes un traducteur professionnel agréé en Français. Traduisez le texte suivant en français en conservant le format et la structure:"),
            HumanMessage(content=text)
        ]
        return self._with_rate_limit_retry(self._generate_text, messages)

    def chunk_audio(self, file_path: str) -> "List[AudioSegment]":
        """Load an audio file and split it into ``chunk_length_ms`` pieces.

        Returns:
            The list of chunks, or an empty list when the file cannot be
            loaded (the error is shown through Streamlit).
        """
        try:
            audio = AudioSegment.from_file(file_path)
            if len(audio) < self.chunk_length_ms:
                return [audio]
            return [
                audio[start:start + self.chunk_length_ms]
                for start in range(0, len(audio), self.chunk_length_ms)
            ]
        except Exception as e:
            st.error(f"Error processing audio file: {str(e)}")
            return []

    def transcribe_chunk(self, audio_chunk: "AudioSegment") -> str:
        """Transcribe one audio chunk with Groq Whisper.

        The chunk is exported to a temporary MP3 file which is always removed
        afterwards, including on retries and errors.

        Returns:
            The transcribed text, or ``""`` when transcription failed (the
            error is shown through Streamlit).
        """
        temp_path = None
        try:
            # Reserve a path, then close the handle before exporting so the
            # file is not held open while another writer targets it.
            with tempfile.NamedTemporaryFile(suffix='.mp3', delete=False) as temp_file:
                temp_path = temp_file.name
            audio_chunk.export(temp_path, format="mp3")

            def _transcribe():
                # Re-open on every attempt so a retry reads from the start.
                with open(temp_path, "rb") as audio_file:
                    return self.groq_client.audio.transcriptions.create(
                        file=audio_file,
                        model="whisper-large-v3-turbo",
                        language="fr"
                    )

            response = self._with_rate_limit_retry(
                _transcribe,
                warning="Limite de taux atteinte pendant la transcription. Attente avant nouvelle tentative...",
            )
            return response.text
        except Exception as e:
            st.error(f"Transcription error: {str(e)}")
            return ""
        finally:
            if temp_path and os.path.exists(temp_path):
                os.unlink(temp_path)

    def split_text(self, text: str, max_tokens: int = 4000) -> List[str]:
        """Split ``text`` into chunks of roughly ``max_tokens`` tokens each."""
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=max_tokens * 4,  # rough estimate: 4 characters per token
            chunk_overlap=200,
            length_function=len,
            separators=["\n\n", "\n", " ", ""]
        )
        return text_splitter.split_text(text)

    def _build_prompt_template(self) -> str:
        """Return the summary prompt template containing ``{transcript}``.

        A custom prompt that already contains the placeholder is used as is.
        Otherwise its braces are escaped (so they are not read as template
        variables) and the transcript section is appended; without this the
        transcript would never reach the model.
        """
        if not self.custom_prompt:
            return self.DEFAULT_SUMMARY_PROMPT
        if "{transcript}" in self.custom_prompt:
            return self.custom_prompt
        escaped = self.custom_prompt.replace("{", "{{").replace("}", "}}")
        return f"{escaped}\n\nVoici la transcription à analyser:\n{{transcript}}\n"

    def generate_summary(self, transcription: str) -> str:
        """Summarise a transcript that fits into a single LLM call.

        Returns:
            The summary in French (translated when the model answered in
            another language).
        """
        chain = LLMChain(
            llm=self.llm,
            prompt=PromptTemplate(
                template=self._build_prompt_template(),
                input_variables=["transcript"]
            )
        )
        summary = self._with_rate_limit_retry(
            chain.run,
            warning="Limite de taux atteinte. Attente avant nouvelle tentative...",
            transcript=transcription,
        )
        return self._ensure_french(
            summary,
            warning="Résumé généré dans une autre langue. Traduction en cours...",
        )

    def summarize_long_transcription(self, transcription: str) -> str:
        """Summarise a long transcript chunk by chunk, then merge the results.

        Each chunk is retried in place when rate-limited, so no chunk is
        skipped and completed chunks are never recomputed.

        Returns:
            The combined summary in French.
        """
        chunks = self.split_text(transcription, max_tokens=4000)
        partial_summaries = []
        for i, chunk in enumerate(chunks):
            st.write(f"Traitement du segment {i + 1}/{len(chunks)}...")
            messages = [
                SystemMessage(content="Vous êtes un assistant expert en résumé de texte en français."),
                HumanMessage(content=f"Résumez ce texte en français : {chunk}")
            ]
            partial_summary = self._with_rate_limit_retry(
                self._generate_text,
                messages,
                warning=f"Limite de taux atteinte au segment {i+1}. Attente avant nouvelle tentative...",
            )
            partial_summaries.append(self._ensure_french(partial_summary))

        final_prompt = f"""Combinez ces résumés partiels en un résumé global cohérent en langue française :
{' '.join(partial_summaries)}
"""
        messages = [
            SystemMessage(content="Vous êtes un assistant expert en résumé de texte en français."),
            HumanMessage(content=final_prompt)
        ]
        final_summary = self._with_rate_limit_retry(
            self._generate_text,
            messages,
            warning="Limite de taux atteinte lors de la génération du résumé final. Attente avant nouvelle tentative...",
        )
        return self._ensure_french(
            final_summary,
            warning="Résumé final dans une autre langue. Traduction en cours...",
        )
class VideoProcessor:
    """Resolve YouTube URLs and obtain audio tracks from videos."""

    # Hosts whose URLs carry the video id in ``?v=`` or in the path.
    _YOUTUBE_HOSTS = ('www.youtube.com', 'youtube.com', 'm.youtube.com', 'music.youtube.com')
    # Path prefixes directly followed by the video id.
    _PATH_ID_PREFIXES = ('/embed/', '/shorts/', '/live/', '/v/')

    def __init__(self):
        """Set the supported local formats and the base yt-dlp options."""
        self.supported_formats = ['.mp4', '.avi', '.mov', '.mkv']
        self.ydl_opts = {
            'format': 'bestaudio/best',
            'postprocessors': [{
                'key': 'FFmpegExtractAudio',
                'preferredcodec': 'mp3',
                'preferredquality': '192',
            }],
            'outtmpl': 'temp_audio.%(ext)s'
        }

    def extract_video_id(self, url: str) -> Optional[str]:
        """Return the YouTube video id contained in ``url``.

        Handles ``watch?v=`` URLs on the desktop, mobile and music hosts,
        ``/embed/``, ``/shorts/``, ``/live/`` and ``/v/`` paths, and
        ``youtu.be`` short links.

        Returns:
            The video id, or ``None`` when ``url`` is not a recognised
            YouTube URL or carries no id.
        """
        try:
            parsed_url = urlparse(url)
            host = (parsed_url.hostname or '').lower()
            if host in self._YOUTUBE_HOSTS:
                ids = parse_qs(parsed_url.query).get('v')
                if ids:
                    return ids[0]
                for prefix in self._PATH_ID_PREFIXES:
                    if parsed_url.path.startswith(prefix):
                        return parsed_url.path[len(prefix):].split('/')[0] or None
                return None
            if host == 'youtu.be':
                return parsed_url.path[1:].split('/')[0] or None
            return None
        except (ValueError, TypeError, AttributeError):
            # Malformed or non-string input: treat as "not a YouTube URL".
            return None

    def get_youtube_transcription(self, video_id: str) -> Optional[str]:
        """Return the published French or English transcript, if any.

        Returns:
            The transcript text joined with spaces, or ``None`` when it could
            not be fetched; callers treat ``None`` as "fall back to audio".
        """
        try:
            transcript_list = YouTubeTranscriptApi.get_transcript(video_id, languages=['fr', 'en'])
            return ' '.join(entry['text'] for entry in transcript_list)
        except Exception:
            # Best-effort lookup: any failure means no usable transcript.
            return None

    def download_youtube_audio(self, url: str) -> str:
        """Download the audio track of ``url`` as ``temp_audio.mp3``.

        Returns:
            The path of the generated MP3 file.

        Raises:
            RuntimeError: When the download or conversion fails, or the
                expected output file was not produced.
        """
        audio_path = 'temp_audio.mp3'
        try:
            # Reuse the base options and add the cookie file.
            ydl_opts = {**self.ydl_opts, 'cookiefile': "cookies.txt"}

            # Drop a leftover file from an earlier run so the existence check
            # below reflects this download only.
            if os.path.exists(audio_path):
                os.remove(audio_path)

            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                ydl.download([url])

            if not os.path.exists(audio_path):
                raise FileNotFoundError(f"Le fichier {audio_path} n'a pas été généré.")
            return audio_path
        except Exception as e:
            raise RuntimeError(f"Erreur lors du téléchargement : {str(e)}") from e

    def extract_audio_from_video(self, video_path: str) -> str:
        """Write the audio track of a local video next to it as an MP3 file.

        Returns:
            The path of the MP3 file (same base name as ``video_path``).

        Raises:
            ValueError: When the video has no audio track.
            Exception: Any error raised while reading the video or writing
                the audio; it is shown through Streamlit and re-raised.
        """
        try:
            audio_path = f"{os.path.splitext(video_path)[0]}.mp3"
            with VideoFileClip(video_path) as video:
                if video.audio is None:
                    raise ValueError("La vidéo ne contient aucune piste audio.")
                video.audio.write_audiofile(audio_path)
            return audio_path
        except Exception as e:
            st.error(f"Erreur lors de l'extraction audio: {str(e)}")
            raise
class DocumentProcessor:
    """Extract text from PDF, Office and web sources and summarise it in French.

    Rate-limit errors from the Groq API (detected by the substring
    ``rate_limit_exceeded`` in the exception text) are retried a bounded number
    of times, waiting ``RATE_LIMIT_WAIT_SECONDS`` between attempts.
    """

    # Pause between two attempts after a rate-limit error, in seconds.
    RATE_LIMIT_WAIT_SECONDS = 60
    # Number of additional attempts after the first rate-limited call.
    MAX_RATE_LIMIT_RETRIES = 5

    def __init__(self, model_name: str, prompt: str = None):
        """Create the LLM client and load the language detector.

        Args:
            model_name: Groq chat model used for summaries and translation.
            prompt: Optional custom instructions, stored as ``custom_prompt``.
        """
        self.llm = ChatGroq(
            model=model_name,
            temperature=0,
        )
        self.custom_prompt = prompt
        # Same model file as the one configured for the rest of the app.
        self.language_detector = fasttext.load_model(Config.FASTTEXT_MODEL_PATH)

    def _with_rate_limit_retry(self, operation, *args, warning: str = None, **kwargs):
        """Call ``operation`` and retry it while the API reports a rate limit.

        Args:
            operation: Callable to invoke.
            *args: Positional arguments forwarded to ``operation``.
            warning: Optional message shown through Streamlit before each wait.
            **kwargs: Keyword arguments forwarded to ``operation``.

        Returns:
            Whatever ``operation`` returns.

        Raises:
            Exception: The original error when it is not a rate-limit error,
                or when ``MAX_RATE_LIMIT_RETRIES`` retries were exhausted.
        """
        for attempt in range(self.MAX_RATE_LIMIT_RETRIES + 1):
            try:
                return operation(*args, **kwargs)
            except Exception as e:
                out_of_retries = attempt >= self.MAX_RATE_LIMIT_RETRIES
                if "rate_limit_exceeded" not in str(e) or out_of_retries:
                    raise
                if warning:
                    st.warning(warning)
                time.sleep(self.RATE_LIMIT_WAIT_SECONDS)

    def split_text(self, text: str, max_tokens: int = 4000) -> List[str]:
        """Split ``text`` into chunks of roughly ``max_tokens`` tokens each."""
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=max_tokens * 4,  # rough estimate: 4 characters per token
            chunk_overlap=200,
            length_function=len,
            separators=["\n\n", "\n", " ", ""]
        )
        return text_splitter.split_text(text)

    def check_language(self, text: str) -> str:
        """Return ``"OUI"`` when the detector's top label is French, else ``"NON"``."""
        # Newlines are replaced because the detector is given a single line.
        prediction = self.language_detector.predict(text.replace('\n', ' '))
        return "OUI" if prediction[0][0] == '__label__fr' else "NON"

    def translate_to_french(self, text: str) -> str:
        """Translate ``text`` to French with the LLM, retrying on rate limits."""
        messages = [
            SystemMessage(content="Vous êtes un traducteur professionnel agrée en Français. Traduisez le texte suivant en français en conservant le format et la structure:"),
            HumanMessage(content=text)
        ]
        return self._with_rate_limit_retry(self._generate_text, messages)

    def _make_api_call(self, messages):
        """Send one list of chat messages to the LLM and return the raw result."""
        return self.llm.generate([messages])

    def _generate_text(self, messages) -> str:
        """Send ``messages`` to the LLM and return the text of the first generation."""
        return self._make_api_call(messages).generations[0][0].text

    def _ensure_french(self, text: str, warning: str = None) -> str:
        """Return ``text`` unchanged when detected as French, else its translation."""
        if self.check_language(text) != "NON":
            return text
        if warning:
            st.warning(warning)
        return self.translate_to_french(text)

    def process_protected_pdf(self, file_path: str, password: str = None) -> str:
        """Extract the text of a PDF, unlocking it first when a password is given.

        Args:
            file_path: Path of the PDF file.
            password: Password of the PDF, or ``None`` / empty when it is not
                protected.

        Returns:
            The concatenated text of all pages.

        Raises:
            ValueError: When the password is wrong.
            RuntimeError: On any other processing error.
        """
        try:
            if password:
                # Decrypt into memory instead of a fixed file in the working
                # directory: nothing to clean up and no clash between
                # concurrent sessions.
                unlocked = io.BytesIO()
                with pikepdf.open(file_path, password=password) as pdf:
                    pdf.save(unlocked)
                unlocked.seek(0)
                reader = PdfReader(unlocked)
            else:
                reader = PdfReader(file_path)
            # Guard against pages for which no text comes back.
            return "".join(page.extract_text() or "" for page in reader.pages)
        except pikepdf.PasswordError:
            raise ValueError("Mot de passe PDF incorrect")
        except Exception as e:
            raise RuntimeError(f"Erreur lors du traitement du PDF : {e}") from e

    def process_protected_office(self, file, file_type: str, password: str = None) -> str:
        """Extract the text of a Word or PowerPoint file, decrypting it if needed.

        Args:
            file: File object (or path) of the Office document.
            file_type: ``'docx'`` or ``'pptx'``.
            password: Password of the file, or ``None`` / empty when it is not
                protected.

        Returns:
            Paragraph texts (docx) or shape texts (pptx) joined by newlines.

        Raises:
            ValueError: When the password is wrong or the file is invalid.
            RuntimeError: On any other error, including an unsupported
                ``file_type``.
        """
        try:
            source = file
            if password:
                office_file = msoffcrypto.OfficeFile(file)
                office_file.load_key(password=password)
                decrypted = io.BytesIO()
                office_file.decrypt(decrypted)
                decrypted.seek(0)
                source = decrypted

            if file_type == 'docx':
                document = docx.Document(source)
                return "\n".join(p.text for p in document.paragraphs)
            if file_type == 'pptx':
                presentation = pptx.Presentation(source)
                return "\n".join(
                    shape.text
                    for slide in presentation.slides
                    for shape in slide.shapes
                    if hasattr(shape, "text")
                )
            raise ValueError("Type de fichier non supporté. Utilisez 'docx' ou 'pptx'.")
        except msoffcrypto.exceptions.InvalidKeyError:
            raise ValueError("Mot de passe incorrect ou fichier non valide.")
        except Exception as e:
            raise RuntimeError(f"Erreur lors du traitement du fichier Office : {e}") from e

    def scrape_web_content(self, url: str, auth: Dict[str, str] = None) -> str:
        """Download a web page and return its main textual content.

        Args:
            url: Address of the page.
            auth: Optional HTTP basic-auth credentials with the keys
                ``'username'`` and ``'password'``.

        Returns:
            The text extracted by trafilatura.

        Raises:
            ValueError: When the page cannot be fetched or no content can be
                extracted from it.
        """
        try:
            if auth:
                # The context manager releases the session's connections.
                with requests.Session() as session:
                    session.auth = HTTPBasicAuth(auth['username'], auth['password'])
                    response = session.get(url, timeout=30)
            else:
                response = requests.get(url, timeout=30)
            response.raise_for_status()
            downloaded = trafilatura.extract(response.text)
            if not downloaded:
                raise ValueError("Impossible d'extraire le contenu de cette page")
            return downloaded
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 401:
                raise ValueError("Authentification requise pour accéder à cette page") from e
            elif e.response.status_code == 404:
                raise ValueError("Page introuvable") from e
            else:
                raise ValueError(f"Erreur HTTP: {e.response.status_code}") from e
        except requests.exceptions.RequestException as e:
            raise ValueError("URL invalide ou inaccessible") from e

    def summarize_text(self, transcription: str) -> str:
        """Summarise a text chunk by chunk, then merge the partial summaries.

        Each chunk is retried in place when rate-limited, so no chunk is
        skipped and completed chunks are never recomputed.

        Returns:
            The combined summary in French.
        """
        chunks = self.split_text(transcription, max_tokens=4000)
        partial_summaries = []
        for i, chunk in enumerate(chunks):
            st.write(f"Traitement du segment {i + 1}/{len(chunks)}...")
            messages = [
                SystemMessage(content="Vous êtes un assistant expert en résumé de texte en français."),
                HumanMessage(content=f"Résumez ce texte en français : {chunk}")
            ]
            partial_summary = self._with_rate_limit_retry(
                self._generate_text,
                messages,
                warning=f"Limite de taux atteinte au segment {i+1}. Attente avant nouvelle tentative...",
            )
            partial_summaries.append(self._ensure_french(partial_summary))

        final_prompt = f"""Combinez ces résumés partiels en un résumé global cohérent en langue française :
{' '.join(partial_summaries)}
"""
        messages = [
            SystemMessage(content="Vous êtes un assistant expert en résumé de texte en français."),
            HumanMessage(content=final_prompt)
        ]
        final_summary = self._with_rate_limit_retry(
            self._generate_text,
            messages,
            warning="Limite de taux atteinte lors de la génération du résumé final. Attente avant nouvelle tentative...",
        )
        return self._ensure_french(
            final_summary,
            warning="Résumé final dans une autre langue. Traduction en cours...",
        )
def generate_docx(content: str, filename: str):
    """Write ``content`` to a Word document at ``filename`` and return the path.

    Lines beginning with ``#`` become level-1 headings, other non-blank lines
    become paragraphs, and blank lines are dropped.
    """
    document = Document()
    document.add_heading('Résumé Audio', 0)
    document.add_paragraph(f"Date: {datetime.now().strftime('%d/%m/%Y %H:%M')}")

    non_blank_lines = (entry for entry in content.split('\n') if entry.strip())
    for entry in non_blank_lines:
        if entry.startswith('#'):
            document.add_heading(entry.strip('# '), level=1)
            continue
        document.add_paragraph(entry)

    document.save(filename)
    return filename
def model_selection_sidebar():
    """Render the sidebar controls and return the chosen settings.

    Returns:
        A ``(model, prompt)`` tuple: the selected Groq model id and the
        custom summary instructions typed by the user (possibly empty).
    """
    with st.sidebar:
        st.title("Configuration")
        model = st.selectbox(
            "Sélectionnez un modèle",
            [
                "mixtral-8x7b-32768",
                "llama-3.3-70b-versatile",
                # Was "gemma2-9b-i", apparently a truncated id; the published
                # Groq id is "gemma2-9b-it".
                "gemma2-9b-it",
                "llama3-70b-8192"
            ]
        )
        prompt = st.text_area(
            "Instructions personnalisées pour le résumé",
            placeholder="Ex: Résumé de réunion avec points clés et actions"
        )
    return model, prompt
def save_uploaded_file(uploaded_file) -> str:
    """Persist an uploaded file to a temporary file and return its path.

    The temporary file keeps the extension of ``uploaded_file.name`` and is
    not deleted automatically.
    """
    _, extension = os.path.splitext(uploaded_file.name)
    destination = tempfile.NamedTemporaryFile(delete=False, suffix=extension)
    with destination:
        destination.write(uploaded_file.getvalue())
    return destination.name
def is_valid_email(email: str) -> bool:
    """Return ``True`` when ``email`` looks like ``local@domain.tld``.

    The whole string must match: ``fullmatch`` is used because ``$`` with
    ``re.match`` also accepts a trailing newline. Non-string input is
    rejected instead of raising.
    """
    pattern = r'[\w\.-]+@[\w\.-]+\.\w+'
    return isinstance(email, str) and re.fullmatch(pattern, email) is not None
def enhance_main():
    """Entry point of the Streamlit app: page setup, sidebar and source routing.

    The sidebar is rendered on every run (Streamlit only shows widgets that
    are called during the current run), and the audio processor kept in the
    session state is rebuilt whenever the selected model or custom prompt
    changes. Any error raised while handling the chosen source is reported
    in the page instead of crashing the app.
    """
    st.set_page_config(page_title="Multimodal Content Summarizer", page_icon="📝")
    st.title("🧠 **MultiModal Genius - Résumé Intelligent de Contenus Multimédias**")
    st.subheader("Transformez vidéos, audios, textes, pages webs et plus en résumés clairs et percutants grâce à la puissance de l'IA")
    with st.expander("Notice d'utilisation 📜"):
        st.markdown("""
## **Bienvenue dans l'application MultiModal Genius !** 🎉
Cette application exploite la puissance de l'IA pour résumer des contenus multimédias variés, tels que des **documents**, **vidéos YouTube**, **audios**, **pages web**, et bien plus encore ! 🧠✨
### **Comment utiliser l'application ?**
1. **Documents** 📄 :
- **Formats supportés** : `.pdf`, `.docx`, `.pptx`
- Chargez un document via le bouton **"Télécharger un fichier"**.
- ⚠️ **Remarque** : Les documents contenant plus de **10 pages** peuvent entraîner des résultats imprécis en raison des limitations des modèles d'IA.
2. **Vidéos YouTube** 📹 :
- Collez simplement l'URL de la vidéo.
- La vidéo est automatiquement découpée en segments pour une analyse et un résumé précis.
- **Durée du traitement** : Plus la vidéo est longue, plus le traitement peut prendre du temps.
3. **Audios** 🎵 :
- Téléchargez un fichier audio au format `.mp3`.
- L'audio sera transcrit par blocs (chunks) avant d'être résumé.
- ⚠️ **Remarque** : Les fichiers audio de grande taille peuvent rallonger le processus.
4. **Pages Web** 🌐 :
- Fournissez l'URL de la page.
- Le contenu textuel sera extrait, découpé en blocs, puis résumé.
### **Pourquoi le résumé peut être long ?**
- **Traitement volumineux** : Les contenus trop longs ou complexes nécessitent un découpage en plusieurs blocs (chunks). Ces blocs sont analysés et traduits avant d'être rassemblés pour un résumé final.
- **Limites des modèles IA** : Certains contenus trop volumineux peuvent provoquer des hallucinations du modèle (résultats incohérents ou incorrects).
### **Fonctionnalités à venir 🚀**
- **Description d'images** 🖼️ : Transformez vos images en descriptions riches et détaillées.
- **Extraction de données** 📊 : Convertissez vos contenus en **format JSON** structuré.
- **Amélioration des résumés longs** : Réduction des hallucinations grâce à des optimisations.
- Et bien plus encore ! 🎯
### **Astuce pour une meilleure expérience**
- **Préférez des contenus courts ou moyennement volumineux** pour des résultats optimaux.
- En cas de traitement long, un indicateur de progression vous tiendra informé. ⏳
### **Nous sommes là pour vous aider !**
Si vous rencontrez un problème ou avez une suggestion pour améliorer l'application, n'hésitez pas à nous contacter. 🙌
""")

    # Render the sidebar on every run so it stays visible, and rebuild the
    # processor only when the selection actually changed (construction loads
    # the language model, so it should not happen on every rerun).
    model_name, custom_prompt = model_selection_sidebar()
    processor_config = (model_name, custom_prompt)
    if (
        "audio_processor" not in st.session_state
        or st.session_state.get("audio_processor_config") != processor_config
    ):
        st.session_state.audio_processor = AudioProcessor(model_name, custom_prompt)
        st.session_state.audio_processor_config = processor_config

    if "auth_required" not in st.session_state:
        st.session_state.auth_required = False

    source_type = st.radio("Type de source", ["Audio/Vidéo", "Document", "Web"])
    try:
        if source_type == "Audio/Vidéo":
            process_audio_video()
        elif source_type == "Document":
            process_document()
        else:  # Web
            process_web()
    except Exception as e:
        # Top-level boundary: surface the problem to the user.
        st.error(f"Une erreur est survenue: {str(e)}")
        st.error("Veuillez réessayer ou contacter le support.")
def process_audio_video():
    """Ask which audio/video source to use and run the matching handler."""
    source = st.radio("Choisissez votre source", ["Audio", "Vidéo locale", "YouTube"])
    handlers = {
        "Audio": handle_audio_input,
        "Vidéo locale": handle_video_input,
    }
    # Any other choice is the YouTube option.
    handler = handlers.get(source, handle_youtube_input)
    handler()
def handle_audio_input():
    """Collect an uploaded or recorded audio clip and process it when present."""
    uploaded_file = st.file_uploader("Fichier audio", type=['mp3', 'wav', 'm4a', 'ogg'])
    audio_bytes = audio_recorder()
    nothing_provided = not (uploaded_file or audio_bytes)
    if nothing_provided:
        return
    process_and_display_results(uploaded_file, audio_bytes)
def handle_video_input():
    """Process an uploaded video: extract its audio, transcribe and summarise.

    The temporary copy of the video and the extracted audio file are removed
    once processing has finished, whether it succeeded or not.
    """
    uploaded_video = st.file_uploader("Fichier vidéo", type=['mp4', 'avi', 'mov', 'mkv'])
    if not uploaded_video:
        return

    st.video(uploaded_video)
    video_path = None
    audio_path = None
    try:
        with st.spinner("Extraction de l'audio..."):
            video_processor = VideoProcessor()
            video_path = save_uploaded_file(uploaded_video)
            audio_path = video_processor.extract_audio_from_video(video_path)
        process_and_display_results(audio_path)
    finally:
        # Both files are created with delete=False / written next to the
        # video, so nothing else removes them.
        for path in (video_path, audio_path):
            if path and os.path.exists(path):
                os.remove(path)
def handle_youtube_input():
    """Process a YouTube URL entered by the user.

    The published transcript is used when available; otherwise the audio is
    downloaded and transcribed. A URL from which no video id can be extracted
    is reported to the user instead of being ignored silently.
    """
    youtube_url = st.text_input("URL YouTube")
    if not (youtube_url and st.button("Analyser")):
        return

    video_processor = VideoProcessor()
    video_id = video_processor.extract_video_id(youtube_url)
    if not video_id:
        st.error("URL YouTube invalide ou non reconnue.")
        return

    st.video(youtube_url)
    with st.spinner("Traitement de la vidéo..."):
        transcription = video_processor.get_youtube_transcription(video_id)
        if transcription:
            process_and_display_results(None, None, transcription)
        else:
            audio_path = video_processor.download_youtube_audio(youtube_url)
            process_and_display_results(audio_path)
def process_and_display_results(file_path=None, audio_bytes=None, transcription=None):
    """Obtain a transcript if none was supplied, show the results, then clean up.

    Temporary files are cleaned up whether or not processing succeeded.
    """
    try:
        text = transcription
        if text is None:
            text = get_transcription(file_path, audio_bytes)
        if not text:
            return
        display_transcription_and_summary(text)
    finally:
        cleanup_temporary_files()
def get_transcription(file_path=None, audio_bytes=None) -> str:
    """Transcribe an audio source chunk by chunk.

    ``file_path`` may be a path string or an uploaded file object (which is
    saved first); ``audio_bytes`` is used only when ``file_path`` is falsy.
    Returns the chunk transcripts joined by spaces, or ``None`` when there is
    no input or nothing could be transcribed.
    """
    if file_path:
        if isinstance(file_path, str):
            audio_location = file_path
        else:
            audio_location = save_uploaded_file(file_path)
    elif audio_bytes:
        audio_location = save_audio_bytes(audio_bytes)
    else:
        return None

    segments = st.session_state.audio_processor.chunk_audio(audio_location)
    collected = []
    with st.expander("Transcription", expanded=False):
        progress_bar = st.progress(0)
        total = len(segments)
        for position, segment in enumerate(segments, start=1):
            piece = st.session_state.audio_processor.transcribe_chunk(segment)
            if piece:
                collected.append(piece)
            progress_bar.progress(position / total)

    if not collected:
        return None
    return " ".join(collected)
def display_transcription_and_summary(transcription: str):
    """Show the transcript and its summary, then offer downloads and e-mail."""
    # Transcript section.
    st.subheader("Transcription")
    st.text_area("Texte transcrit:", value=transcription, height=200)

    # Summary section.
    st.subheader("Résumé et Analyse")
    generated_summary = get_summary(transcription)
    st.markdown(generated_summary)

    # Export options: file downloads first, then e-mail delivery.
    generate_and_download_documents(generated_summary)
    handle_email_sending(generated_summary)
def get_summary(transcription: str) -> str:
    """Summarise ``transcription``, using the chunked path when it splits into several pieces."""
    pieces = st.session_state.audio_processor.split_text(transcription)
    if len(pieces) <= 1:
        return st.session_state.audio_processor.generate_summary(transcription)
    return st.session_state.audio_processor.summarize_long_transcription(transcription)
def generate_and_download_documents(summary: str):
    """Build PDF and DOCX versions of the summary and render download buttons.

    Both files share a ``resume_<YYYYmmdd_HHMMSS>`` base name. The buttons
    are laid out side by side in two columns, PDF on the left.

    Returns:
        The path returned by ``PDFGenerator.create_pdf`` for the PDF.
    """
    stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    pdf_filename = f"resume_{stamp}.pdf"
    docx_filename = f"resume_{stamp}.docx"

    # Generate the PDF first, then the DOCX.
    pdf_path = PDFGenerator.create_pdf(summary, pdf_filename)
    docx_path = generate_docx(summary, docx_filename)

    # One (label, path on disk, download name, MIME type) entry per column.
    downloads = (
        ("📥 Télécharger PDF", pdf_path, pdf_filename, "application/pdf"),
        (
            "📥 Télécharger DOCX",
            docx_path,
            docx_filename,
            "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        ),
    )
    for column, (label, path, download_name, mime_type) in zip(st.columns(2), downloads):
        with column, open(path, "rb") as handle:
            st.download_button(label, handle, file_name=download_name, mime=mime_type)
    return pdf_path
def handle_email_sending(summary: str):
    """Render the send-by-email form and mail the summary as a PDF.

    Shows an address input and a send button. When the button is pressed
    with a valid address (per ``is_valid_email``), a PDF of the summary is
    generated and sent through ``EmailSender``; the outcome is reported with
    a success or error message.

    Args:
        summary: Summary text to convert to PDF and attach.
    """
    st.subheader("📧 Recevoir le résumé par email")
    recipient_email = st.text_input("Entrez votre adresse email:")
    if not st.button("Envoyer par email"):
        return
    if not is_valid_email(recipient_email):
        st.error("Veuillez entrer une adresse email valide.")
        return

    with st.spinner("Envoi de l'email en cours..."):
        # Build only the PDF attachment here. Going through
        # generate_and_download_documents() would render a second pair of
        # download buttons (the caller has already drawn them) and generate
        # a DOCX that is never attached.
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        pdf_path = PDFGenerator.create_pdf(summary, f"resume_{timestamp}.pdf")

        email_sender = EmailSender(Config.SENDER_EMAIL, Config.SENDER_PASSWORD)
        sent = email_sender.send_email(
            recipient_email,
            "Résumé de votre contenu audio/vidéo",
            "Veuillez trouver ci-joint le résumé de votre contenu.",
            pdf_path,
        )
        if sent:
            st.success("Email envoyé avec succès!")
        else:
            st.error("Échec de l'envoi de l'email.")
def cleanup_temporary_files():
    """Remove the fixed-name scratch files from the working directory.

    Deletes ``temp_audio.mp3`` and ``temp_video.mp4`` if present. Cleanup is
    best effort: a file that is missing or cannot be removed is skipped, and
    no exception is raised.
    """
    for temp_file in ('temp_audio.mp3', 'temp_video.mp4'):
        try:
            # Attempt the removal directly rather than checking existence
            # first: the check-then-remove sequence is racy and redundant.
            os.remove(temp_file)
        except OSError:
            # Covers a missing file, a permission problem, or the path being
            # a directory. Cleanup must never mask the real outcome of the
            # run, so the failure is deliberately ignored.
            continue
def process_document():
    """Render the document upload form and summarize the uploaded file.

    Accepts PDF, DOCX, PPTX and TXT uploads plus an optional password. Once
    a file is present, its text is extracted with
    ``process_document_with_password``, summarized by a ``DocumentProcessor``
    built from the session audio processor's model name and custom prompt,
    and shown with ``display_summary_and_downloads``. A ``ValueError`` raised
    along the way is shown as an error message.
    """
    uploaded = st.file_uploader("Chargez votre document", type=['pdf', 'docx', 'pptx', 'txt'])
    password = st.text_input("Mot de passe (si protégé)", type="password")
    if not uploaded:
        return

    try:
        audio_processor = st.session_state.audio_processor
        doc_processor = DocumentProcessor(
            audio_processor.llm.model_name,
            audio_processor.custom_prompt,
        )
        text = process_document_with_password(uploaded, password, doc_processor)
        if not text:
            # Extraction failed or produced nothing: no summary to show.
            return
        display_summary_and_downloads(doc_processor.summarize_text(text))
    except ValueError as err:
        st.error(str(err))
def process_document_with_password(file, password: str, doc_processor: DocumentProcessor) -> Optional[str]:
    """Extract text from an uploaded document, dispatching on its extension.

    Args:
        file: Uploaded file object; its ``name`` gives the extension and
            ``read()`` is used for ``.txt`` files.
        password: Password forwarded to the PDF and Office handlers.
        doc_processor: Provides ``process_protected_pdf`` and
            ``process_protected_office``.

    Returns:
        The extracted text, or ``None`` after showing an error message when
        the extension is unsupported or a ``ValueError`` is raised.
    """
    suffix = os.path.splitext(file.name)[1].lower()
    try:
        if suffix == '.pdf':
            return doc_processor.process_protected_pdf(file, password)
        if suffix in ('.docx', '.pptx'):
            # The Office handler takes the extension without its leading dot.
            return doc_processor.process_protected_office(file, suffix[1:], password)
        if suffix == '.txt':
            raw = file.read()
            return raw.decode('utf-8')
        st.error("Format de fichier non supporté")
        return None
    except ValueError as err:
        st.error(str(err))
        return None
def process_web():
    """Render the web-page form and summarize the page at the given URL.

    Optionally collects a username and password, passed to
    ``scrape_web_content`` as a ``{"username", "password"}`` dict (``None``
    when authentication is not requested). When a URL is present and the
    analyse button is pressed, the page text is scraped, summarized, and
    shown with ``display_summary_and_downloads``. A ``ValueError`` raised
    along the way is shown as an error message.
    """
    url = st.text_input("URL du site web")

    auth = None
    if st.checkbox("Authentification requise"):
        username = st.text_input("Nom d'utilisateur")
        password = st.text_input("Mot de passe", type="password")
        auth = {"username": username, "password": password}

    # The button is only rendered once a URL has been entered.
    if not (url and st.button("Analyser")):
        return

    try:
        audio_processor = st.session_state.audio_processor
        doc_processor = DocumentProcessor(
            audio_processor.llm.model_name,
            audio_processor.custom_prompt,
        )
        page_text = doc_processor.scrape_web_content(url, auth)
        if not page_text:
            return
        display_summary_and_downloads(doc_processor.summarize_text(page_text))
    except ValueError as err:
        st.error(str(err))
def display_summary_and_downloads(summary: str):
    """Show a summary, its download buttons, and a send-by-email form.

    Args:
        summary: Markdown summary text to display, export, and mail.

    The PDF/DOCX generation and the two download buttons are delegated to
    ``generate_and_download_documents``. The PDF path it returns is reused
    as the e-mail attachment.
    """
    st.markdown("### 📝 Résumé et Analyse")
    st.markdown(summary)

    # Previously this function carried a line-for-line copy of the helper's
    # generation and download code; calling the helper keeps one copy.
    pdf_path = generate_and_download_documents(summary)

    # Optional delivery by e-mail.
    st.markdown("### 📧 Recevoir le résumé par email")
    recipient_email = st.text_input("Entrez votre adresse email:")
    if not st.button("Envoyer par email"):
        return
    if not is_valid_email(recipient_email):
        st.error("Veuillez entrer une adresse email valide.")
        return

    with st.spinner("Envoi de l'email en cours..."):
        email_sender = EmailSender(Config.SENDER_EMAIL, Config.SENDER_PASSWORD)
        sent = email_sender.send_email(
            recipient_email,
            "Résumé de votre contenu",
            "Veuillez trouver ci-joint le résumé de votre contenu.",
            pdf_path,
        )
        if sent:
            st.success("Email envoyé avec succès!")
        else:
            st.error("Échec de l'envoi de l'email.")
def save_audio_bytes(audio_bytes: bytes) -> str:
    """Write recorded audio bytes to a uniquely named temporary ``.wav`` file.

    The file is created in the system temporary directory with a
    ``recording_<YYYYmmdd_HHMMSS>_`` prefix. ``tempfile.mkstemp`` guarantees
    a fresh name, so two recordings saved within the same second no longer
    overwrite each other, and the working directory is not filled with
    recordings.

    Args:
        audio_bytes: Raw audio content to write.

    Returns:
        Path of the file that was written. The file is not deleted here.
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    fd, file_path = tempfile.mkstemp(prefix=f"recording_{timestamp}_", suffix=".wav")
    # mkstemp returns an open OS-level descriptor; wrap it so it is closed
    # even if the write fails.
    with os.fdopen(fd, 'wb') as f:
        f.write(audio_bytes)
    return file_path
# Script entry point: run the app, report any uncaught error in the UI,
# and always remove the scratch files afterwards.
if __name__ == "__main__":
    try:
        enhance_main()
    except Exception as exc:
        # Top-level boundary: show the failure to the user instead of a
        # traceback.
        for message in (
            f"Une erreur inattendue est survenue: {exc!s}",
            "Veuillez réessayer ou contacter le support technique.",
        ):
            st.error(message)
    finally:
        cleanup_temporary_files()