import io
import os
import re
import smtplib
import tempfile
import time
import urllib.request
from datetime import datetime
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from typing import List, Optional, Dict, Any
from urllib.parse import urlparse, parse_qs

import docx
import fasttext
import msoffcrypto
import pikepdf
import pptx
import pypdf
import requests
import streamlit as st
import trafilatura
import yt_dlp
from audio_recorder_streamlit import audio_recorder
from bs4 import BeautifulSoup
from docx import Document
from groq import Groq
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
from langchain.schema import AIMessage, HumanMessage, SystemMessage
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_groq import ChatGroq
from moviepy import VideoFileClip
from pptx import Presentation
from pydub import AudioSegment
from PyPDF2 import PdfReader
from pytube import YouTube
from ratelimit import limits, sleep_and_retry
from reportlab.lib import colors
from reportlab.lib.pagesizes import letter
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer
from requests.auth import HTTPBasicAuth
from youtube_transcript_api import YouTubeTranscriptApi


class Config:
    """Central application configuration.

    Each credential is read from the environment variable of the same name
    when it is set; otherwise the historical in-file value is used so that
    existing deployments keep working.
    """

    # SECURITY: the fallback literals below are real credentials committed to
    # source control. Rotate them, set the environment variables instead, and
    # then delete the fallbacks.
    GROQ_API_KEY = os.environ.get(
        "GROQ_API_KEY",
        "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",
    )
    SENDER_EMAIL = os.environ.get("SENDER_EMAIL", "adjoumanideyanvo1@gmail.com")
    SENDER_PASSWORD = os.environ.get("SENDER_PASSWORD", "fkev txsk ldjg nyqs")

    # Local path of the fastText language-identification model and the URL it
    # is downloaded from when the file is missing.
    FASTTEXT_MODEL_PATH = "lid.176.bin"
    FASTTEXT_MODEL_URL = (
        "https://dl.fbaipublicfiles.com/fasttext/supervised-models/lid.176.bin"
    )


# Streamlit re-executes this script on every interaction, so the model is
# downloaded only when it is not already on disk. The download goes to a
# ".part" file first so an interrupted transfer never leaves a truncated model
# at the final path.
if not os.path.exists(Config.FASTTEXT_MODEL_PATH):
    _partial_model_path = Config.FASTTEXT_MODEL_PATH + ".part"
    urllib.request.urlretrieve(Config.FASTTEXT_MODEL_URL, _partial_model_path)
    os.replace(_partial_model_path, Config.FASTTEXT_MODEL_PATH)
# Substring the Groq client puts in the error text when a rate limit is hit.
_RATE_LIMIT_MARKER = "rate_limit_exceeded"
# Seconds to wait before retrying a rate-limited call.
_RATE_LIMIT_WAIT_SECONDS = 60
# Upper bound on retries so a persistent rate limit cannot loop forever.
_MAX_RATE_LIMIT_RETRIES = 5

# Prompt used by AudioProcessor.generate_summary when no custom prompt is set.
_DEFAULT_SUMMARY_PROMPT = """
Vous êtes un assistant expert spécialisé dans le résumé et l'analyse d'enregistrements audio en langue française.
Voici la transcription à analyser:
{transcript}

Veuillez fournir:
1. Un résumé concis (3-4 phrases)
2. Les points clés (maximum 5 points)
3. Les actions recommandées (si pertinent)
4. Une conclusion brève

Format souhaité:
# Résumé
[votre résumé]

# Points Clés
• [point 1]
• [point 2]
...

# Actions Recommandées
1. [action 1]
2. [action 2]
...

# Conclusion
[votre conclusion]
"""


def _retry_on_rate_limit(operation, warning: Optional[str] = None):
    """Call ``operation()`` and retry it after a pause while it is rate limited.

    Args:
        operation: Zero-argument callable performing the API call.
        warning: Optional message shown with ``st.warning`` before each wait.

    Returns:
        Whatever ``operation()`` returns.

    Raises:
        Exception: Any error that is not a rate-limit error, or the rate-limit
            error itself once ``_MAX_RATE_LIMIT_RETRIES`` retries are used up.
    """
    retries_left = _MAX_RATE_LIMIT_RETRIES
    while True:
        try:
            return operation()
        except Exception as exc:
            if _RATE_LIMIT_MARKER not in str(exc) or retries_left <= 0:
                raise
            retries_left -= 1
            if warning:
                st.warning(warning)
            time.sleep(_RATE_LIMIT_WAIT_SECONDS)


def _remove_quietly(path: str) -> None:
    """Delete ``path`` if possible; cleanup is best-effort and never raises."""
    try:
        os.remove(path)
    except OSError:
        pass


class PDFGenerator:
    """Render a text summary into a PDF file with ReportLab."""

    @staticmethod
    def create_pdf(content: str, filename: str) -> str:
        """Write ``content`` to ``filename`` as a PDF and return ``filename``.

        Lines starting with ``#`` become headings; blank lines are skipped.
        """
        # Imported locally so this block brings its own dependency into scope.
        from xml.sax.saxutils import escape

        doc = SimpleDocTemplate(filename, pagesize=letter)
        styles = getSampleStyleSheet()
        body_style = ParagraphStyle(
            'CustomStyle',
            parent=styles['Normal'],
            spaceBefore=12,
            spaceAfter=12,
            fontSize=12,
            leading=14,
        )
        title_style = ParagraphStyle(
            'CustomTitle',
            parent=styles['Heading1'],
            fontSize=16,
            spaceAfter=30,
        )

        story = [
            Paragraph("Résumé Audio", title_style),
            Paragraph(f"Date: {datetime.now().strftime('%d/%m/%Y %H:%M')}", body_style),
            Spacer(1, 20),
        ]
        for line in content.split('\n'):
            if not line.strip():
                continue
            # Paragraph parses its text as XML-like markup, so raw "<" or "&"
            # in model output must be escaped or the build fails.
            if line.startswith('#'):
                story.append(Paragraph(escape(line.strip('# ')), styles['Heading2']))
            else:
                story.append(Paragraph(escape(line), body_style))

        doc.build(story)
        return filename


class EmailSender:
    """Send a summary PDF by e-mail through Gmail's SMTP server."""

    def __init__(self, sender_email: str, sender_password: str):
        """Store the credentials; empty values fall back to ``Config``."""
        self.sender_email = sender_email or Config.SENDER_EMAIL
        self.sender_password = sender_password or Config.SENDER_PASSWORD

    def send_email(self, recipient_email: str, subject: str, body: str, pdf_path: str) -> bool:
        """Send ``body`` with the PDF at ``pdf_path`` attached.

        Returns:
            True on success; False after reporting the error with ``st.error``.
        """
        try:
            msg = MIMEMultipart()
            msg['From'] = self.sender_email
            msg['To'] = recipient_email
            msg['Subject'] = subject
            msg.attach(MIMEText(body, 'plain'))

            with open(pdf_path, 'rb') as f:
                pdf_attachment = MIMEApplication(f.read(), _subtype='pdf')
            pdf_attachment.add_header(
                'Content-Disposition',
                'attachment',
                filename=os.path.basename(pdf_path),
            )
            msg.attach(pdf_attachment)

            # The context manager closes the connection even when login or
            # sending fails.
            with smtplib.SMTP('smtp.gmail.com', 587, timeout=30) as server:
                server.starttls()
                server.login(self.sender_email, self.sender_password)
                server.send_message(msg)
            return True
        except Exception as e:
            st.error(f"Erreur d'envoi d'email: {str(e)}")
            return False


class _LLMTextMixin:
    """LLM helpers shared by AudioProcessor and DocumentProcessor.

    Subclasses must set ``self.llm`` (chat model) and
    ``self.language_detector`` (fastText model) in ``__init__``.
    """

    def split_text(self, text: str, max_tokens: int = 4000) -> List[str]:
        """Split ``text`` into chunks of roughly ``max_tokens`` tokens."""
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=max_tokens * 4,  # rough tokens -> characters estimate
            chunk_overlap=200,
            length_function=len,
            separators=["\n\n", "\n", " ", ""],
        )
        return text_splitter.split_text(text)

    def check_language(self, text: str) -> str:
        """Return "OUI" when ``text`` is detected as French, else "NON"."""
        if not text.strip():
            # Nothing to detect, and nothing that would need translating.
            return "OUI"
        # The text is passed on a single line, so newlines become spaces.
        labels = self.language_detector.predict(text.replace('\n', ' '))[0]
        return "OUI" if labels and labels[0] == '__label__fr' else "NON"

    @sleep_and_retry
    @limits(calls=5000, period=60)
    def _make_api_call(self, messages):
        """Send one chat request to the LLM under the client-side rate limit."""
        return self.llm.generate([messages])

    def translate_to_french(self, text: str) -> str:
        """Translate ``text`` to French, keeping its format and structure."""
        messages = [
            SystemMessage(content="Vous êtes un traducteur professionnel. Traduisez le texte suivant en français en conservant le format et la structure:"),
            HumanMessage(content=text),
        ]
        result = _retry_on_rate_limit(lambda: self._make_api_call(messages))
        return result.generations[0][0].text

    def _summarize_in_chunks(self, text: str) -> str:
        """Summarize each chunk of ``text``, then merge the partial summaries.

        A rate-limited request is retried for the same chunk, so no chunk is
        dropped and completed chunks are not redone.

        Raises:
            ValueError: If ``text`` yields no chunk to summarize.
        """
        chunks = self.split_text(text, max_tokens=4000)
        if not chunks:
            raise ValueError("Aucun texte à résumer")

        system_message = SystemMessage(content="Vous êtes un assistant expert en résumé de texte en français.")
        partial_summaries = []
        for i, chunk in enumerate(chunks):
            st.write(f"Traitement du segment {i + 1}/{len(chunks)}...")
            messages = [
                system_message,
                HumanMessage(content=f"Résumez ce texte en français : {chunk}"),
            ]
            result = _retry_on_rate_limit(
                lambda messages=messages: self._make_api_call(messages),
                f"Limite de taux atteinte au segment {i+1}. Attente avant nouvelle tentative...",
            )
            partial_summary = result.generations[0][0].text
            if self.check_language(partial_summary) == "NON":
                partial_summary = self.translate_to_french(partial_summary)
            partial_summaries.append(partial_summary)

        final_prompt = (
            "Combinez ces résumés partiels en un résumé global cohérent en langue française :\n"
            f"{' '.join(partial_summaries)}\n"
        )
        final_messages = [system_message, HumanMessage(content=final_prompt)]
        final_result = _retry_on_rate_limit(
            lambda: self._make_api_call(final_messages),
            "Limite de taux atteinte lors de la génération du résumé final. Attente avant nouvelle tentative...",
        )
        final_summary = final_result.generations[0][0].text
        if self.check_language(final_summary) == "NON":
            st.warning("Résumé final dans une autre langue. Traduction en cours...")
            final_summary = self.translate_to_french(final_summary)
        return final_summary


class AudioProcessor(_LLMTextMixin):
    """Chunk audio, transcribe it with Groq Whisper and summarize the text."""

    def __init__(self, model_name: str, prompt: str = None, chunk_length_ms: int = 300000):
        """Create the Groq clients and load the language detector.

        Args:
            model_name: Groq chat model used for summaries and translations.
            prompt: Optional custom summary instructions.
            chunk_length_ms: Maximum length of one audio chunk in milliseconds.
        """
        self.chunk_length_ms = chunk_length_ms
        self.groq_client = Groq(api_key=Config.GROQ_API_KEY)
        self.llm = ChatGroq(
            model=model_name,
            temperature=0,
            api_key=Config.GROQ_API_KEY,
        )
        self.custom_prompt = prompt
        self.language_detector = fasttext.load_model(Config.FASTTEXT_MODEL_PATH)
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=4000,
            chunk_overlap=200,
        )
        # Kept for compatibility; the limit actually enforced is the one in
        # the decorators on _make_api_call.
        self.CALLS_PER_MINUTE = 5000
        self.PERIOD = 60  # seconds

    def chunk_audio(self, file_path: str) -> List[AudioSegment]:
        """Load ``file_path`` and cut it into ``chunk_length_ms`` segments.

        Returns:
            The segments, or an empty list (after ``st.error``) on failure.
        """
        try:
            audio = AudioSegment.from_file(file_path)
            if len(audio) < self.chunk_length_ms:
                return [audio]
            return [
                audio[start:start + self.chunk_length_ms]
                for start in range(0, len(audio), self.chunk_length_ms)
            ]
        except Exception as e:
            st.error(f"Error processing audio file: {str(e)}")
            return []

    def transcribe_chunk(self, audio_chunk: AudioSegment) -> str:
        """Transcribe one audio segment.

        Returns:
            The transcribed text, or "" (after ``st.error``) on failure.
        """
        temp_path = None
        try:
            # The handle is closed before exporting so the exporter can reopen
            # the path on every platform.
            with tempfile.NamedTemporaryFile(suffix='.mp3', delete=False) as temp_file:
                temp_path = temp_file.name
            audio_chunk.export(temp_path, format="mp3")

            def _transcribe():
                with open(temp_path, "rb") as audio_file:
                    return self.groq_client.audio.transcriptions.create(
                        file=audio_file,
                        model="whisper-large-v3-turbo",
                        language="fr",
                    )

            response = _retry_on_rate_limit(
                _transcribe,
                "Limite de taux atteinte pendant la transcription. Attente avant nouvelle tentative...",
            )
            return response.text
        except Exception as e:
            st.error(f"Transcription error: {str(e)}")
            return ""
        finally:
            # Remove the temp file on both the success and the error path.
            if temp_path:
                _remove_quietly(temp_path)

    def _build_prompt_template(self) -> str:
        """Return the summary template, always containing ``{transcript}``.

        Braces in a custom prompt are escaped so free text cannot break
        formatting, and the transcript is appended when the custom prompt does
        not reference it.
        """
        if not self.custom_prompt:
            return _DEFAULT_SUMMARY_PROMPT
        placeholder = "{transcript}"
        template = (
            self.custom_prompt
            .replace("{", "{{")
            .replace("}", "}}")
            .replace("{{transcript}}", placeholder)
        )
        if placeholder not in template:
            template = f"{template}\n\nVoici la transcription à analyser:\n{placeholder}"
        return template

    def generate_summary(self, transcription: str) -> str:
        """Summarize a transcription that fits in a single LLM request."""
        template = self._build_prompt_template()

        def _run_chain():
            chain = LLMChain(
                llm=self.llm,
                prompt=PromptTemplate(
                    template=template,
                    input_variables=["transcript"],
                ),
            )
            return chain.run(transcript=transcription)

        summary = _retry_on_rate_limit(
            _run_chain,
            "Limite de taux atteinte. Attente avant nouvelle tentative...",
        )
        if self.check_language(summary) == "NON":
            st.warning("Résumé généré dans une autre langue. Traduction en cours...")
            summary = self.translate_to_french(summary)
        return summary

    def summarize_long_transcription(self, transcription: str) -> str:
        """Summarize a transcription too long for a single request."""
        return self._summarize_in_chunks(transcription)


class VideoProcessor:
    """Extract audio or transcripts from local videos and YouTube URLs."""

    # Name of the file yt-dlp produces with the 'outtmpl' and mp3
    # post-processor configured below.
    _YOUTUBE_AUDIO_PATH = 'temp_audio.mp3'

    def __init__(self):
        self.supported_formats = ['.mp4', '.avi', '.mov', '.mkv']
        self.ydl_opts = {
            'cookiesfrombrowser': ('chromium',),
            'format': 'bestaudio/best',
            'postprocessors': [{
                'key': 'FFmpegExtractAudio',
                'preferredcodec': 'mp3',
                'preferredquality': '192',
            }],
            'outtmpl': 'temp_audio.%(ext)s',
        }

    def extract_video_id(self, url: str) -> Optional[str]:
        """Return the YouTube video id found in ``url``, or None.

        Handles ``watch?v=``, ``/embed/``, ``/shorts/`` and ``/live/`` URLs on
        youtube.com hosts, and ``youtu.be`` short links.
        """
        try:
            parsed_url = urlparse(url)
        except ValueError:
            return None

        host = (parsed_url.hostname or "").lower()
        if host in ('www.youtube.com', 'youtube.com', 'm.youtube.com', 'music.youtube.com'):
            ids = parse_qs(parsed_url.query).get('v')
            if ids:
                return ids[0]
            segments = [part for part in parsed_url.path.split('/') if part]
            if len(segments) >= 2 and segments[0] in ('embed', 'shorts', 'live'):
                return segments[1]
            return None
        if host == 'youtu.be':
            return parsed_url.path.lstrip('/').split('/')[0] or None
        return None

    def get_youtube_transcription(self, video_id: str) -> Optional[str]:
        """Return the French or English transcript text, or None.

        Any failure returns None so the caller can fall back to downloading
        and transcribing the audio.
        """
        try:
            transcript_list = YouTubeTranscriptApi.get_transcript(video_id, languages=['fr', 'en'])
            return ' '.join(entry['text'] for entry in transcript_list)
        except Exception:
            return None

    def download_youtube_audio(self, url: str) -> str:
        """Download the audio track of ``url`` as MP3 and return its path.

        Raises:
            RuntimeError: If the download or the audio extraction fails.
        """
        ydl_opts = {
            'format': 'bestaudio/best',
            'postprocessors': [{
                'key': 'FFmpegExtractAudio',
                'preferredcodec': 'mp3',
                'preferredquality': '192',
            }],
            'outtmpl': 'temp_audio.%(ext)s',
        }
        # Cookies are optional: use the file only when it is actually present.
        cookie_file_path = "cookies.txt"
        if os.path.exists(cookie_file_path):
            ydl_opts['cookiefile'] = cookie_file_path

        try:
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                ydl.download([url])
        except Exception as e:
            raise RuntimeError(f"Erreur lors du téléchargement : {str(e)}") from e
        # Callers pass the return value straight to the audio pipeline, so it
        # must be the file path rather than a status message.
        return self._YOUTUBE_AUDIO_PATH

    def extract_audio_from_video(self, video_path: str) -> str:
        """Write the audio track of ``video_path`` next to it as ``.mp3``.

        Returns:
            Path of the MP3 file.

        Raises:
            Exception: Re-raised after ``st.error`` when extraction fails,
                including when the video has no audio track.
        """
        try:
            audio_path = f"{os.path.splitext(video_path)[0]}.mp3"
            with VideoFileClip(video_path) as video:
                if video.audio is None:
                    raise ValueError("La vidéo ne contient pas de piste audio")
                video.audio.write_audiofile(audio_path)
            return audio_path
        except Exception as e:
            st.error(f"Erreur lors de l'extraction audio: {str(e)}")
            raise


class DocumentProcessor(_LLMTextMixin):
    """Extract text from documents and web pages and summarize it."""

    def __init__(self, model_name: str, prompt: str = None, language_detector=None):
        """Create the chat model and set up language detection.

        Args:
            model_name: Groq chat model used for summaries and translations.
            prompt: Optional custom instructions (stored, not yet applied).
            language_detector: Already-loaded fastText model to reuse; when
                omitted the model is loaded from ``Config.FASTTEXT_MODEL_PATH``.
        """
        self.llm = ChatGroq(
            model=model_name,
            temperature=0,
            api_key=Config.GROQ_API_KEY,
        )
        self.custom_prompt = prompt
        if language_detector is None:
            language_detector = fasttext.load_model(Config.FASTTEXT_MODEL_PATH)
        self.language_detector = language_detector

    def process_protected_pdf(self, file_path: str, password: str = None) -> str:
        """Extract the text of a PDF, unlocking it first when a password is given.

        Args:
            file_path: Path or binary file-like object of the PDF.
            password: PDF password, if the file needs one.

        Returns:
            The concatenated text of all pages.

        Raises:
            ValueError: If the password is wrong.
            RuntimeError: For any other processing error.
        """
        try:
            if password:
                # Decrypt into memory: no fixed-name temp file that concurrent
                # sessions could overwrite or that leaks on error.
                unlocked = io.BytesIO()
                with pikepdf.open(file_path, password=password) as pdf:
                    pdf.save(unlocked)
                unlocked.seek(0)
                reader = PdfReader(unlocked)
            else:
                reader = PdfReader(file_path)
            # extract_text() can return None for pages without text.
            return "".join(page.extract_text() or "" for page in reader.pages)
        except pikepdf.PasswordError:
            raise ValueError("Mot de passe PDF incorrect")
        except Exception as e:
            raise RuntimeError(f"Erreur lors du traitement du PDF : {e}") from e

    @staticmethod
    def _extract_office_text(stream, file_type: str) -> str:
        """Return the text of a decrypted 'docx' or 'pptx' stream."""
        if file_type == 'docx':
            document = docx.Document(stream)
            return "\n".join(p.text for p in document.paragraphs)
        presentation = pptx.Presentation(stream)
        return "\n".join(
            shape.text
            for slide in presentation.slides
            for shape in slide.shapes
            if hasattr(shape, "text")
        )

    def process_protected_office(self, file, file_type: str, password: str = None) -> str:
        """Extract the text of an Office file, decrypting it when needed.

        Args:
            file: Binary file-like object of the document.
            file_type: 'docx' or 'pptx'.
            password: Document password, or None when it is not protected.

        Returns:
            The extracted text, one paragraph or shape per line.

        Raises:
            ValueError: If ``file_type`` is unsupported or the password is wrong.
            RuntimeError: For any other processing error.
        """
        # Checked outside the try block so this ValueError reaches callers
        # instead of being wrapped into RuntimeError.
        if file_type not in ('docx', 'pptx'):
            raise ValueError("Type de fichier non supporté. Utilisez 'docx' ou 'pptx'.")
        try:
            source = file
            if password:
                office_file = msoffcrypto.OfficeFile(file)
                office_file.load_key(password=password)
                decrypted = io.BytesIO()
                office_file.decrypt(decrypted)
                decrypted.seek(0)
                source = decrypted
            return self._extract_office_text(source, file_type)
        except msoffcrypto.exceptions.InvalidKeyError:
            raise ValueError("Mot de passe incorrect ou fichier non valide.")
        except Exception as e:
            raise RuntimeError(f"Erreur lors du traitement du fichier Office : {e}") from e

    def scrape_web_content(self, url: str, auth: Dict[str, str] = None) -> str:
        """Download ``url`` and return its main text content.

        Args:
            url: Page to fetch.
            auth: Optional dict with 'username' and 'password' for HTTP Basic
                authentication.

        Raises:
            ValueError: On HTTP errors, network errors, or when no content can
                be extracted.
        """
        try:
            if auth:
                with requests.Session() as session:
                    session.auth = HTTPBasicAuth(auth['username'], auth['password'])
                    response = session.get(url, timeout=30)
            else:
                response = requests.get(url, timeout=30)
            response.raise_for_status()

            downloaded = trafilatura.extract(response.text)
            if not downloaded:
                raise ValueError("Impossible d'extraire le contenu de cette page")
            return downloaded
        except requests.exceptions.HTTPError as e:
            status_code = e.response.status_code
            if status_code == 401:
                raise ValueError("Authentification requise pour accéder à cette page") from e
            if status_code == 404:
                raise ValueError("Page introuvable") from e
            raise ValueError(f"Erreur HTTP: {status_code}") from e
        except requests.exceptions.RequestException as e:
            raise ValueError("URL invalide ou inaccessible") from e

    def summarize_text(self, transcription: str) -> str:
        """Summarize ``transcription`` chunk by chunk and merge the results."""
        return self._summarize_in_chunks(transcription)


def generate_docx(content: str, filename: str):
    """Write ``content`` to ``filename`` as a Word document and return the path.

    Lines starting with ``#`` become level-1 headings; blank lines are skipped.
    """
    doc = Document()
    doc.add_heading('Résumé Audio', 0)
    doc.add_paragraph(f"Date: {datetime.now().strftime('%d/%m/%Y %H:%M')}")

    for line in content.split('\n'):
        if not line.strip():
            continue
        if line.startswith('#'):
            doc.add_heading(line.strip('# '), level=1)
        else:
            doc.add_paragraph(line)

    doc.save(filename)
    return filename


def model_selection_sidebar():
    """Render the sidebar and return ``(model, prompt)`` as chosen by the user."""
    with st.sidebar:
        st.title("Configuration")
        model = st.selectbox(
            "Sélectionnez un modèle",
            [
                "mixtral-8x7b-32768",
                "llama-3.3-70b-versatile",
                # Was "gemma2-9b-i", a truncated model id.
                "gemma2-9b-it",
                "llama3-70b-8192",
            ],
        )
        prompt = st.text_area(
            "Instructions personnalisées pour le résumé",
            placeholder="Ex: Résumé de réunion avec points clés et actions",
        )
        return model, prompt


def save_uploaded_file(uploaded_file) -> str:
    """Save an uploaded file to a temp file and return its path.

    The caller is responsible for deleting the file.
    """
    suffix = os.path.splitext(uploaded_file.name)[1]
    with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_file:
        tmp_file.write(uploaded_file.getvalue())
        return tmp_file.name


def is_valid_email(email: str) -> bool:
    """Return True when ``email`` looks like ``name@domain.tld``."""
    pattern = r'^[\w\.-]+@[\w\.-]+\.\w+$'
    return bool(re.match(pattern, email))


def enhance_main():
    """Application entry point: page setup, sidebar and source dispatch."""
    st.set_page_config(page_title="Multimodal Content Summarizer", page_icon="📝")

    # The sidebar is rendered on every run; otherwise it disappears after the
    # first interaction and model/prompt changes are never applied.
    model_name, custom_prompt = model_selection_sidebar()
    processor_config = (model_name, custom_prompt)
    if (
        "audio_processor" not in st.session_state
        or st.session_state.get("processor_config") != processor_config
    ):
        st.session_state.audio_processor = AudioProcessor(model_name, custom_prompt)
        st.session_state.processor_config = processor_config

    if "auth_required" not in st.session_state:
        st.session_state.auth_required = False

    source_type = st.radio("Type de source", ["Audio/Vidéo", "Document", "Web"])

    try:
        if source_type == "Audio/Vidéo":
            process_audio_video()
        elif source_type == "Document":
            process_document()
        else:  # Web
            process_web()
    except Exception as e:
        st.error(f"Une erreur est survenue: {str(e)}")
        st.error("Veuillez réessayer ou contacter le support.")


def process_audio_video():
    """Let the user choose an audio/video source and dispatch to its handler."""
    source = st.radio("Choisissez votre source", ["Audio", "Vidéo locale", "YouTube"])

    if source == "Audio":
        handle_audio_input()
    elif source == "Vidéo locale":
        handle_video_input()
    else:  # YouTube
        handle_youtube_input()


def handle_audio_input():
    """Process an uploaded audio file or a microphone recording."""
    uploaded_file = st.file_uploader("Fichier audio", type=['mp3', 'wav', 'm4a', 'ogg'])
    audio_bytes = audio_recorder()

    if uploaded_file or audio_bytes:
        process_and_display_results(uploaded_file, audio_bytes)


def handle_video_input():
    """Process an uploaded video: extract its audio, transcribe and summarize."""
    uploaded_video = st.file_uploader("Fichier vidéo", type=['mp4', 'avi', 'mov', 'mkv'])
    if not uploaded_video:
        return

    st.video(uploaded_video)
    video_path = None
    audio_path = None
    try:
        with st.spinner("Extraction de l'audio..."):
            video_processor = VideoProcessor()
            video_path = save_uploaded_file(uploaded_video)
            audio_path = video_processor.extract_audio_from_video(video_path)
        process_and_display_results(audio_path)
    finally:
        # Both files are temporary copies created by this handler.
        for path in (video_path, audio_path):
            if path:
                _remove_quietly(path)


def handle_youtube_input():
    """Process a YouTube URL using its transcript, or its audio as fallback."""
    youtube_url = st.text_input("URL YouTube")
    if not youtube_url:
        return

    if st.button("Analyser"):
        video_processor = VideoProcessor()
        video_id = video_processor.extract_video_id(youtube_url)
        if not video_id:
            st.error("URL YouTube invalide.")
            return
        with st.spinner("Traitement de la vidéo..."):
            transcription = video_processor.get_youtube_transcription(video_id)
            if not transcription:
                audio_path = video_processor.download_youtube_audio(youtube_url)
                try:
                    transcription = get_transcription(audio_path)
                finally:
                    cleanup_temporary_files()
        # st.button is True only for the run triggered by the click, so the
        # result is kept in the session to survive later reruns (for example
        # when the e-mail form is used).
        st.session_state.youtube_result = (youtube_url, transcription)

    stored = st.session_state.get("youtube_result")
    if stored and stored[0] == youtube_url and stored[1]:
        st.video(youtube_url)
        process_and_display_results(None, None, stored[1])


def process_and_display_results(file_path=None, audio_bytes=None, transcription=None):
    """Transcribe the input if needed, then show transcription and summary.

    Args:
        file_path: Path string or uploaded-file object holding the audio.
        audio_bytes: Raw recorded audio, used when ``file_path`` is empty.
        transcription: Ready-made transcription; skips the transcription step.
    """
    try:
        if transcription is None:
            transcription = get_transcription(file_path, audio_bytes)
        if transcription:
            display_transcription_and_summary(transcription)
    finally:
        cleanup_temporary_files()


def get_transcription(file_path=None, audio_bytes=None) -> str:
    """Transcribe an audio source chunk by chunk.

    Returns:
        The joined transcription, or None when there is no input or nothing
        could be transcribed.
    """
    created_here = False
    if file_path:
        if isinstance(file_path, str):
            path = file_path
        else:
            path = save_uploaded_file(file_path)
            created_here = True
    elif audio_bytes:
        path = save_audio_bytes(audio_bytes)
        created_here = True
    else:
        return None

    try:
        processor = st.session_state.audio_processor
        chunks = processor.chunk_audio(path)
        transcriptions = []

        with st.expander("Transcription", expanded=False):
            progress_bar = st.progress(0)
            for i, chunk in enumerate(chunks):
                chunk_text = processor.transcribe_chunk(chunk)
                if chunk_text:
                    transcriptions.append(chunk_text)
                progress_bar.progress((i + 1) / len(chunks))

        return " ".join(transcriptions) if transcriptions else None
    finally:
        # Only files created by this function are removed; a caller-supplied
        # path stays under the caller's control.
        if created_here:
            _remove_quietly(path)


def display_transcription_and_summary(transcription: str):
    """Show the transcription, its summary, download buttons and e-mail form."""
    st.subheader("Transcription")
    st.text_area("Texte transcrit:", value=transcription, height=200)

    st.subheader("Résumé et Analyse")
    summary = get_summary(transcription)
    st.markdown(summary)

    pdf_path = generate_and_download_documents(summary)
    handle_email_sending(summary, pdf_path)


def get_summary(transcription: str) -> str:
    """Summarize ``transcription``, reusing the last result when unchanged.

    The result is cached in the session, keyed by the model/prompt setting and
    the transcription, so a rerun does not call the LLM again.
    """
    cache_key = (st.session_state.get("processor_config"), transcription)
    cached = st.session_state.get("summary_cache")
    if cached and cached[0] == cache_key:
        return cached[1]

    processor = st.session_state.audio_processor
    chunks = processor.split_text(transcription)
    if len(chunks) > 1:
        summary = processor.summarize_long_transcription(transcription)
    else:
        summary = processor.generate_summary(transcription)

    st.session_state.summary_cache = (cache_key, summary)
    return summary


def _write_summary_documents(summary: str):
    """Write the summary as PDF and DOCX in the temp dir; return both paths."""
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    output_dir = tempfile.gettempdir()
    pdf_path = PDFGenerator.create_pdf(
        summary, os.path.join(output_dir, f"resume_{timestamp}.pdf")
    )
    docx_path = generate_docx(
        summary, os.path.join(output_dir, f"resume_{timestamp}.docx")
    )
    return pdf_path, docx_path


def _render_download_buttons(pdf_path: str, docx_path: str) -> None:
    """Render the PDF and DOCX download buttons side by side."""
    col1, col2 = st.columns(2)
    with col1:
        with open(pdf_path, "rb") as pdf_file:
            st.download_button(
                "📥 Télécharger PDF",
                pdf_file.read(),
                file_name=os.path.basename(pdf_path),
                mime="application/pdf",
            )
    with col2:
        with open(docx_path, "rb") as docx_file:
            st.download_button(
                "📥 Télécharger DOCX",
                docx_file.read(),
                file_name=os.path.basename(docx_path),
                mime="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
            )


def _render_email_form(pdf_path: str, subject: str) -> None:
    """Render the recipient field and send button; send the PDF on click."""
    recipient_email = st.text_input("Entrez votre adresse email:")
    if not st.button("Envoyer par email"):
        return
    if not is_valid_email(recipient_email):
        st.error("Veuillez entrer une adresse email valide.")
        return

    with st.spinner("Envoi de l'email en cours..."):
        email_sender = EmailSender(Config.SENDER_EMAIL, Config.SENDER_PASSWORD)
        sent = email_sender.send_email(
            recipient_email,
            subject,
            "Veuillez trouver ci-joint le résumé de votre contenu.",
            pdf_path,
        )
    if sent:
        st.success("Email envoyé avec succès!")
    else:
        st.error("Échec de l'envoi de l'email.")


def generate_and_download_documents(summary: str):
    """Create the PDF/DOCX files, render download buttons, return the PDF path."""
    pdf_path, docx_path = _write_summary_documents(summary)
    _render_download_buttons(pdf_path, docx_path)
    return pdf_path


def handle_email_sending(summary: str, pdf_path: Optional[str] = None):
    """Render the e-mail form for an audio/video summary.

    Args:
        summary: Summary text, used to build the PDF when ``pdf_path`` is None.
        pdf_path: Existing PDF to attach. Passing it avoids generating the
            documents, and their download buttons, a second time.
    """
    st.subheader("📧 Recevoir le résumé par email")
    if pdf_path is None:
        pdf_path, _ = _write_summary_documents(summary)
    _render_email_form(pdf_path, "Résumé de votre contenu audio/vidéo")


def cleanup_temporary_files():
    """Remove the fixed-name temp files created by the YouTube/video flows."""
    for temp_file in ('temp_audio.mp3', 'temp_video.mp4'):
        if os.path.exists(temp_file):
            _remove_quietly(temp_file)


def process_document():
    """Let the user upload a document, then summarize it."""
    file = st.file_uploader("Chargez votre document", type=['pdf', 'docx', 'pptx', 'txt'])
    password = st.text_input("Mot de passe (si protégé)", type="password")

    if not file:
        return
    try:
        audio_processor = st.session_state.audio_processor
        doc_processor = DocumentProcessor(
            audio_processor.llm.model_name,
            audio_processor.custom_prompt,
            # Reuse the loaded model instead of reloading it on every rerun.
            language_detector=audio_processor.language_detector,
        )
        text = process_document_with_password(file, password, doc_processor)
        if text:
            summary = doc_processor.summarize_text(text)
            display_summary_and_downloads(summary)
    except ValueError as e:
        st.error(str(e))


def process_document_with_password(file, password: str, doc_processor: DocumentProcessor) -> Optional[str]:
    """Extract the text of an uploaded document according to its extension.

    Returns:
        The extracted text, or None (after ``st.error``) when the format is
        unsupported or extraction raises ValueError.
    """
    file_extension = os.path.splitext(file.name)[1].lower()

    try:
        if file_extension == '.pdf':
            return doc_processor.process_protected_pdf(file, password)
        if file_extension in ['.docx', '.pptx']:
            return doc_processor.process_protected_office(file, file_extension[1:], password)
        if file_extension == '.txt':
            return file.read().decode('utf-8')
        st.error("Format de fichier non supporté")
        return None
    except ValueError as e:
        st.error(str(e))
        return None


def process_web():
    """Let the user enter a URL, then scrape and summarize the page."""
    url = st.text_input("URL du site web")
    auth_required = st.checkbox("Authentification requise")

    auth = None
    if auth_required:
        username = st.text_input("Nom d'utilisateur")
        password = st.text_input("Mot de passe", type="password")
        auth = {"username": username, "password": password}

    if url and st.button("Analyser"):
        try:
            audio_processor = st.session_state.audio_processor
            doc_processor = DocumentProcessor(
                audio_processor.llm.model_name,
                audio_processor.custom_prompt,
                language_detector=audio_processor.language_detector,
            )
            text = doc_processor.scrape_web_content(url, auth)
            if text:
                # Kept in the session so the result, and the e-mail form below
                # it, survive the rerun that follows the button click.
                st.session_state.web_result = (url, doc_processor.summarize_text(text))
        except ValueError as e:
            st.error(str(e))

    stored = st.session_state.get("web_result")
    if stored and stored[0] == url and stored[1]:
        display_summary_and_downloads(stored[1])


def display_summary_and_downloads(summary: str):
    """Show a document/web summary with download buttons and the e-mail form."""
    st.markdown("### 📝 Résumé et Analyse")
    st.markdown(summary)

    pdf_path, docx_path = _write_summary_documents(summary)
    _render_download_buttons(pdf_path, docx_path)

    st.markdown("### 📧 Recevoir le résumé par email")
    _render_email_form(pdf_path, "Résumé de votre contenu")


def save_audio_bytes(audio_bytes: bytes) -> str:
    """Write recorded audio bytes to a timestamped ``.wav`` file; return its path.

    The caller is responsible for deleting the file.
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    file_path = f"recording_{timestamp}.wav"
    with open(file_path, 'wb') as f:
        f.write(audio_bytes)
    return file_path


if __name__ == "__main__":
    try:
        enhance_main()
    except Exception as e:
        st.error(f"Une erreur inattendue est survenue: {str(e)}")
        st.error("Veuillez réessayer ou contacter le support technique.")
    finally:
        cleanup_temporary_files()