# Spaces: Sleeping (page-status text that appears to have been captured along with the source; not part of the program)
# Imports nécessaires | |
import os | |
import uuid | |
#os.system('yt-dlp --cookies-from-browser chrome') | |
from selenium import webdriver | |
from selenium.webdriver.chrome.options import Options | |
import json | |
from datasets import load_dataset | |
import streamlit as st | |
from audio_recorder_streamlit import audio_recorder | |
import msoffcrypto | |
import docx | |
import pptx | |
#import pymupdf4llm | |
import tempfile | |
from typing import List, Optional, Dict, Any | |
from pydub import AudioSegment | |
from groq import Groq | |
from langchain.chains import LLMChain | |
from langchain_groq import ChatGroq | |
from langchain.prompts import PromptTemplate | |
from langchain.text_splitter import RecursiveCharacterTextSplitter | |
from langchain.schema import AIMessage, HumanMessage, SystemMessage | |
from datetime import datetime | |
import smtplib | |
from email.mime.text import MIMEText | |
from email.mime.multipart import MIMEMultipart | |
from email.mime.application import MIMEApplication | |
from reportlab.lib import colors | |
from reportlab.lib.pagesizes import letter | |
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer | |
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle | |
import re | |
from docx import Document | |
from pytube import YouTube | |
from moviepy import VideoFileClip | |
import yt_dlp | |
from youtube_transcript_api import YouTubeTranscriptApi | |
from urllib.parse import urlparse, parse_qs | |
import mimetypes | |
from ratelimit import limits, sleep_and_retry | |
import time | |
import fasttext | |
import requests | |
from requests.auth import HTTPBasicAuth | |
import pikepdf | |
import io | |
import pypdf | |
from PyPDF2 import PdfReader | |
from pptx import Presentation | |
import trafilatura | |
from bs4 import BeautifulSoup | |
from dotenv import load_dotenv | |
# Load environment variables once at import time (the original called
# load_dotenv() twice in a row; the second call was redundant).
load_dotenv()
# Credentials of the account used to send summaries by email.
# Both are None when the corresponding environment variable is not set.
SENDER_EMAIL = os.environ.get('SENDER_EMAIL')
SENDER_PASSWORD = os.environ.get('SENDER_PASSWORD')
# Global configuration
class Config:
    """Application-wide constants."""

    # Path of the fastText model file used for language detection.
    FASTTEXT_MODEL_PATH: str = "lid.176.bin"
# Download the fastText model on first run if it is not already present.
def _ensure_fasttext_model(model_path: str) -> None:
    """Fetch the fastText model to ``model_path`` unless the file already exists.

    The file is downloaded under a temporary ``.part`` name and renamed only
    once the transfer has finished, so an interrupted download cannot leave a
    truncated file that would later pass the ``os.path.exists`` check.
    """
    if os.path.exists(model_path):
        return
    import urllib.request
    partial_path = f"{model_path}.part"
    try:
        urllib.request.urlretrieve(
            'https://dl.fbaipublicfiles.com/fasttext/supervised-models/lid.176.bin',
            partial_path
        )
        os.replace(partial_path, model_path)
    finally:
        # Only true when the download or the rename failed.
        if os.path.exists(partial_path):
            os.remove(partial_path)


_ensure_fasttext_model(Config.FASTTEXT_MODEL_PATH)
# Classes principales | |
class PDFGenerator:
    """Render a text summary as a PDF document with ReportLab."""

    @staticmethod
    def create_pdf(content: str, filename: str) -> str:
        """Write ``content`` to a PDF at ``filename`` and return ``filename``.

        The document starts with a "Résumé" title and the current date.
        Non-blank lines starting with ``#`` become ``Heading2`` paragraphs;
        every other non-blank line becomes a body paragraph.

        Declared as a static method because it takes no ``self``; the existing
        ``PDFGenerator.create_pdf(...)`` call style keeps working, and calling
        it on an instance now works too.
        """
        # Paragraph() parses its text as XML-like markup, so characters such
        # as "<" or "&" in the summary must be escaped first.
        from xml.sax.saxutils import escape

        doc = SimpleDocTemplate(filename, pagesize=letter)
        styles = getSampleStyleSheet()
        custom_style = ParagraphStyle(
            'CustomStyle',
            parent=styles['Normal'],
            spaceBefore=12,
            spaceAfter=12,
            fontSize=12,
            leading=14,
        )
        title_style = ParagraphStyle(
            'CustomTitle',
            parent=styles['Heading1'],
            fontSize=16,
            spaceAfter=30,
        )
        story = [
            Paragraph("Résumé", title_style),
            Paragraph(f"Date: {datetime.now().strftime('%d/%m/%Y %H:%M')}", custom_style),
            Spacer(1, 20),
        ]
        for line in content.split('\n'):
            if not line.strip():
                continue
            if line.startswith('#'):
                story.append(Paragraph(escape(line.strip('# ')), styles['Heading2']))
            else:
                story.append(Paragraph(escape(line), custom_style))
        doc.build(story)
        return filename
class EmailSender:
    """Send a PDF attachment by email through smtp.gmail.com (STARTTLS, port 587)."""

    def __init__(self, sender_email: str, sender_password: str):
        """Store the credentials used to log in to the SMTP server."""
        self.sender_email = sender_email
        self.sender_password = sender_password

    def send_email(self, recipient_email: str, subject: str, body: str, pdf_path: str) -> bool:
        """Send ``body`` with the file at ``pdf_path`` attached.

        Returns:
            True when the message was handed to the SMTP server; False when
            anything failed (the error is also shown through ``st.error``).
        """
        try:
            msg = MIMEMultipart()
            msg['From'] = self.sender_email
            msg['To'] = recipient_email
            msg['Subject'] = subject
            msg.attach(MIMEText(body, 'plain'))
            with open(pdf_path, 'rb') as f:
                pdf_attachment = MIMEApplication(f.read(), _subtype='pdf')
            pdf_attachment.add_header('Content-Disposition', 'attachment', filename=os.path.basename(pdf_path))
            msg.attach(pdf_attachment)
            # The context manager closes the connection even when login or
            # sending raises; the timeout avoids blocking forever on a dead
            # network connection.
            with smtplib.SMTP('smtp.gmail.com', 587, timeout=30) as server:
                server.starttls()
                server.login(self.sender_email, self.sender_password)
                server.send_message(msg)
            return True
        except Exception as e:
            st.error(f"Erreur d'envoi d'email: {str(e)}")
            return False
class AudioProcessor:
    """Transcribe audio through the Groq API and summarise text with a Groq chat model."""

    # Output skeleton used when no custom prompt is supplied.
    DEFAULT_PROMPT = (
        "\n"
        "# Résumé\n"
        "[résumé ici]\n"
        "# Points Clés\n"
        "• [point 1]\n"
        "• [point 2]\n"
        "# Actions Recommandées\n"
        "1. [action 1]\n"
        "2. [action 2]\n"
        "# Conclusion\n"
        "[conclusion ici]\n"
    )

    def __init__(self, model_name: str, prompt: Optional[str] = None, chunk_length_ms: int = 300000):
        """Create the LLM, transcription client, language detector and text splitter.

        Args:
            model_name: Groq chat model identifier passed to ``ChatGroq``.
            prompt: Optional custom summary instructions.
            chunk_length_ms: Length of each audio chunk in milliseconds.
        """
        self.chunk_length_ms = chunk_length_ms
        self.llm = ChatGroq(model=model_name, temperature=0)
        # transcribe_chunk() reads self.groq_client, which was never created.
        # NOTE(review): Groq() is expected to take its API key from the
        # environment (GROQ_API_KEY per the client docs) - confirm deployment.
        self.groq_client = Groq()
        self.custom_prompt = prompt
        self.language_detector = fasttext.load_model(Config.FASTTEXT_MODEL_PATH)
        self.text_splitter = RecursiveCharacterTextSplitter(chunk_size=4000, chunk_overlap=200)

    def check_language(self, text: str) -> str:
        """Return "OUI" when the detector's top label is French, otherwise "NON"."""
        # Newlines are replaced because the text is passed as a single line.
        prediction = self.language_detector.predict(text.replace('\n', ' '))
        return "OUI" if prediction[0][0] == '__label__fr' else "NON"

    def translate_to_french(self, text: str) -> str:
        """Ask the chat model to translate ``text`` into French."""
        messages = [
            SystemMessage(content="Traduisez ce texte en français :"),
            HumanMessage(content=text)
        ]
        result = self._make_api_call(messages)
        return result.generations[0][0].text

    def _make_api_call(self, messages):
        """Send one list of chat messages to the LLM and return the raw result."""
        return self.llm.generate([messages])

    def chunk_audio(self, file_path: str) -> "List[AudioSegment]":
        """Load the audio file and slice it into ``chunk_length_ms`` pieces."""
        audio = AudioSegment.from_file(file_path)
        return [
            audio[i:i + self.chunk_length_ms]
            for i in range(0, len(audio), self.chunk_length_ms)
        ]

    def transcribe_chunk(self, audio_chunk: "AudioSegment") -> str:
        """Export one chunk to a temporary MP3 and return its transcription text."""
        temp_file = tempfile.NamedTemporaryFile(suffix='.mp3', delete=False)
        # Only the path is needed; close our handle before it is written to.
        temp_file.close()
        try:
            audio_chunk.export(temp_file.name, format="mp3")
            with open(temp_file.name, "rb") as audio_file:
                response = self.groq_client.audio.transcriptions.create(
                    file=audio_file,
                    model="whisper-large-v3-turbo",
                    language="fr"
                )
            return response.text
        finally:
            # Remove the temporary file even when export or the API call fails.
            os.unlink(temp_file.name)

    def _build_prompt_template(self) -> str:
        """Return a prompt template that always contains ``{transcript}``.

        The declared input variable was previously absent from the template,
        so the transcript never reached the model. A custom prompt that
        already contains ``{transcript}`` is used unchanged; otherwise literal
        braces are escaped and the transcript section is appended.
        """
        instructions = self.custom_prompt or self.DEFAULT_PROMPT
        if "{transcript}" in instructions:
            return instructions
        escaped = instructions.replace("{", "{{").replace("}", "}}")
        return f"{escaped}\n\nTranscription :\n{{transcript}}\n"

    def generate_summary(self, transcription: str) -> str:
        """Summarise ``transcription``; translate the result when it is not detected as French."""
        chain = LLMChain(
            llm=self.llm,
            prompt=PromptTemplate(
                template=self._build_prompt_template(),
                input_variables=["transcript"],
            )
        )
        summary = chain.run(transcript=transcription)
        if self.check_language(summary) == "NON":
            summary = self.translate_to_french(summary)
        return summary

    def summarize_long_transcription(self, transcription: str) -> str:
        """Summarise a long text piece by piece, then summarise the partial summaries.

        ``get_summary`` calls this method for multi-chunk transcriptions, but
        it was missing from the class.
        """
        pieces = self.text_splitter.split_text(transcription)
        if len(pieces) <= 1:
            return self.generate_summary(transcription)
        partial_summaries = [self.generate_summary(piece) for piece in pieces]
        return self.generate_summary("\n\n".join(partial_summaries))
class VideoProcessor:
    """Helpers for YouTube URLs, YouTube audio download and local video audio extraction."""

    # Hostnames treated as YouTube watch pages.
    _YOUTUBE_HOSTS = ('youtube.com', 'www.youtube.com', 'm.youtube.com', 'music.youtube.com')
    # Path prefixes whose next segment is the video id.
    _ID_PATH_PREFIXES = ('shorts', 'embed', 'live')

    def __init__(self):
        self.supported_formats = ['.mp4', '.avi', '.mov', '.mkv']
        self.cookie_file_path = "cookies.txt"

    def load_cookies(self):
        """Write the first "cookies" entry of the dataset's train split to ``cookie_file_path``."""
        dataset = load_dataset("Adjoumani/YoutubeCookiesDataset")
        cookies = dataset["train"]["cookies"][0]
        with open(self.cookie_file_path, "w", encoding="utf-8") as f:
            f.write(cookies)

    def extract_video_id(self, url: str) -> Optional[str]:
        """Return the video id found in ``url``, or None when there is none.

        Handles ``watch?v=ID`` on the YouTube hosts listed above,
        ``/shorts/ID``, ``/embed/ID``, ``/live/ID`` and ``youtu.be/ID``.
        A YouTube URL without a ``v`` parameter used to raise ``KeyError``;
        it now returns None.
        """
        parsed_url = urlparse(url)
        host = (parsed_url.hostname or "").lower()
        path_parts = [part for part in parsed_url.path.split("/") if part]
        if host == 'youtu.be':
            return path_parts[0] if path_parts else None
        if host in self._YOUTUBE_HOSTS:
            video_ids = parse_qs(parsed_url.query).get('v')
            if video_ids:
                return video_ids[0]
            if len(path_parts) >= 2 and path_parts[0] in self._ID_PATH_PREFIXES:
                return path_parts[1]
        return None

    def get_youtube_transcription(self, video_id: str) -> Optional[str]:
        """Return the French or English transcript text, or None when it cannot be fetched."""
        try:
            transcript_list = YouTubeTranscriptApi.get_transcript(video_id, languages=['fr', 'en'])
            return ' '.join(entry['text'] for entry in transcript_list)
        except Exception:
            # Best effort: the caller falls back to downloading the audio.
            return None

    def download_youtube_audio(self, url: str) -> str:
        """Download the audio of ``url`` as ``temp_audio.mp3`` and return that path."""
        ydl_opts = {
            'format': 'bestaudio/best',
            'postprocessors': [{
                'key': 'FFmpegExtractAudio',
                'preferredcodec': 'mp3',
                'preferredquality': '192',
            }],
            'outtmpl': 'temp_audio.%(ext)s',
            'cookiefile': self.cookie_file_path,
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([url])
        return 'temp_audio.mp3'

    def extract_audio_from_video(self, video_path: str) -> str:
        """Write the audio track of ``video_path`` next to it as ``.mp3`` and return the path.

        Raises:
            ValueError: when the clip has no audio track (previously an
                AttributeError on ``None``).
        """
        audio_path = f"{os.path.splitext(video_path)[0]}.mp3"
        with VideoFileClip(video_path) as video:
            if video.audio is None:
                raise ValueError("La vidéo ne contient aucune piste audio.")
            video.audio.write_audiofile(audio_path)
        return audio_path
class DocumentProcessor:
    """Extract plain text from PDF, DOCX and PPTX files, including password-protected ones."""

    def __init__(self, model_name: str, prompt: Optional[str] = None):
        self.llm = ChatGroq(model=model_name, temperature=0)
        self.custom_prompt = prompt
        self.language_detector = fasttext.load_model(Config.FASTTEXT_MODEL_PATH)

    def process_protected_pdf(self, file_path: str, password: Optional[str] = None) -> str:
        """Return the text of every page of the PDF, joined by newlines.

        When ``password`` is given the file is decrypted with pikepdf first.
        The decrypted copy is kept in memory rather than in a fixed
        "unlocked_temp.pdf" file, which could collide between sessions and
        was left on disk when extraction failed.
        """
        if password:
            unlocked = io.BytesIO()
            with pikepdf.open(file_path, password=password) as pdf:
                pdf.save(unlocked)
            unlocked.seek(0)
            reader = PdfReader(unlocked)
        else:
            reader = PdfReader(file_path)
        # Guard against pages for which no text could be extracted.
        return "\n".join(page.extract_text() or "" for page in reader.pages)

    def process_protected_office(self, file, file_type: str, password: Optional[str] = None) -> Optional[str]:
        """Return the text of a DOCX or PPTX file, decrypting it first when ``password`` is given.

        Returns None for any ``file_type`` other than 'docx' or 'pptx',
        as the original implementation did implicitly.
        """
        source = file
        if password:
            office_file = msoffcrypto.OfficeFile(file)
            office_file.load_key(password=password)
            source = io.BytesIO()
            office_file.decrypt(source)
            source.seek(0)
        if file_type == 'docx':
            doc = Document(source)
            return "\n".join([p.text for p in doc.paragraphs])
        if file_type == 'pptx':
            ppt = Presentation(source)
            return "\n".join([shape.text for slide in ppt.slides for shape in slide.shapes if hasattr(shape, "text")])
        return None
def model_selection_sidebar():
    """Render the configuration sidebar and return ``(model, prompt)``.

    ``model`` is the selected model identifier and ``prompt`` the free-text
    summary instructions (an empty string when nothing was typed).
    """
    with st.sidebar:
        st.title("Configuration")
        model = st.selectbox(
            "Sélectionnez un modèle",
            # "gemma2-9b-i" was a truncated identifier; the Groq model id is
            # "gemma2-9b-it". NOTE(review): confirm against the current Groq
            # model list, which changes over time.
            ["mixtral-8x7b-32768", "llama-3.3-70b-versatile", "gemma2-9b-it", "llama3-70b-8192"]
        )
        prompt = st.text_area(
            "Instructions personnalisées pour le résumé",
            placeholder="Ex: Résumé de réunion avec points clés et actions"
        )
    return model, prompt
def save_uploaded_file(uploaded_file) -> str:
    """Copy an uploaded file to a persistent temporary file and return its path.

    The temporary file keeps the extension of ``uploaded_file.name`` and is
    not deleted automatically.
    """
    _, extension = os.path.splitext(uploaded_file.name)
    handle = tempfile.NamedTemporaryFile(delete=False, suffix=extension)
    try:
        handle.write(uploaded_file.getvalue())
    finally:
        handle.close()
    return handle.name
def is_valid_email(email: str) -> bool:
    """Return True when ``email`` looks like ``local@domain.tld``."""
    return re.match(r'^[\w\.-]+@[\w\.-]+\.\w+$', email) is not None
def enhance_main():
    """Render the main page and dispatch to the handler for the chosen source type."""
    st.title("🧠 MultiModal Genius - Résumé Intelligent de Contenus Multimédias")
    st.subheader("Transformez vidéos, audios, textes, pages webs et plus en résumés clairs grâce à l'IA")
    # The sidebar must be rendered on every run: it used to be drawn only
    # while no processor existed, so it vanished on the next rerun and later
    # model/prompt changes were never applied.
    model_name, custom_prompt = model_selection_sidebar()
    processor_config = (model_name, custom_prompt)
    if (
        "audio_processor" not in st.session_state
        or st.session_state.get("audio_processor_config") != processor_config
    ):
        st.session_state.audio_processor = AudioProcessor(model_name, custom_prompt)
        st.session_state.audio_processor_config = processor_config
    if "auth_required" not in st.session_state:
        st.session_state.auth_required = False
    source_type = st.radio("Type de source", ["Audio/Vidéo", "Document", "Web"])
    try:
        if source_type == "Audio/Vidéo":
            process_audio_video()
        elif source_type == "Document":
            # NOTE(review): process_document / process_web are not defined in
            # the visible part of this file - confirm they exist elsewhere.
            process_document()
        else:  # Web
            process_web()
    except Exception as e:
        st.error(f"Une erreur est survenue: {str(e)}")
        st.error("Veuillez réessayer ou contacter le support.")
def process_audio_video():
    """Let the user pick an audio/video source and run the matching handler."""
    choice = st.radio("Choisissez votre source", ["Audio", "Vidéo locale", "YouTube"])
    if choice == "Audio":
        handle_audio_input()
        return
    if choice == "Vidéo locale":
        handle_video_input()
        return
    # Anything else is the YouTube option.
    handle_youtube_input()
def handle_audio_input():
    """Accept an uploaded audio file or a microphone recording and process it."""
    uploaded_file = st.file_uploader("Fichier audio", type=['mp3', 'wav', 'm4a', 'ogg'])
    audio_bytes = audio_recorder()
    if not (uploaded_file or audio_bytes):
        return
    process_and_display_results(uploaded_file, audio_bytes)
def handle_video_input():
    """Accept an uploaded video, extract its audio track and process it.

    The temporary video copy and the extracted audio file are removed once
    processing is over; they used to accumulate on disk.
    """
    uploaded_video = st.file_uploader("Fichier vidéo", type=['mp4', 'avi', 'mov', 'mkv'])
    if not uploaded_video:
        return
    st.video(uploaded_video)
    video_path = None
    audio_path = None
    try:
        with st.spinner("Extraction de l'audio..."):
            video_processor = VideoProcessor()
            video_path = save_uploaded_file(uploaded_video)
            audio_path = video_processor.extract_audio_from_video(video_path)
        process_and_display_results(audio_path)
    finally:
        for temp_path in (video_path, audio_path):
            if temp_path and os.path.exists(temp_path):
                os.remove(temp_path)
def handle_youtube_input():
    """Summarise a YouTube video from its URL.

    Uses the published transcript when one can be fetched; otherwise loads
    the cookies, downloads the audio and transcribes it.
    """
    youtube_url = st.text_input("URL YouTube")
    # The button is only rendered once a URL has been typed.
    if not (youtube_url and st.button("Analyser")):
        return
    video_processor = VideoProcessor()
    video_id = video_processor.extract_video_id(youtube_url)
    if not video_id:
        # Previously an unrecognised URL produced no feedback at all.
        st.error("URL YouTube invalide.")
        return
    st.video(youtube_url)
    with st.spinner("Traitement de la vidéo..."):
        transcription = video_processor.get_youtube_transcription(video_id)
        if transcription:
            process_and_display_results(None, None, transcription)
        else:
            video_processor.load_cookies()
            audio_path = video_processor.download_youtube_audio(youtube_url)
            process_and_display_results(audio_path)
def process_and_display_results(file_path=None, audio_bytes=None, transcription=None):
    """Transcribe the given audio if needed, then display transcription and summary.

    Args:
        file_path: Path of an audio file, or an uploaded-file object.
        audio_bytes: Raw recorded audio, used when ``file_path`` is empty.
        transcription: Ready-made transcript; when given, no audio is processed.
    """
    if transcription is None:
        # Path of a file created here, which this function must clean up.
        created_path = None
        if file_path:
            if isinstance(file_path, str):
                path = file_path
            else:
                path = created_path = save_uploaded_file(file_path)
        elif audio_bytes:
            path = created_path = save_audio_bytes(audio_bytes)
        else:
            return
        try:
            chunks = st.session_state.audio_processor.chunk_audio(path)
        finally:
            # The chunks have already been built from the file, so a file
            # created here is no longer needed.
            if created_path and os.path.exists(created_path):
                os.remove(created_path)
        transcriptions = []
        with st.expander("Transcription", expanded=False):
            progress_bar = st.progress(0)
            for i, chunk in enumerate(chunks):
                # Separate name: the loop used to overwrite the parameter.
                chunk_text = st.session_state.audio_processor.transcribe_chunk(chunk)
                if chunk_text:
                    transcriptions.append(chunk_text)
                progress_bar.progress((i + 1) / len(chunks))
        if not transcriptions:
            # Nothing was transcribed; this used to end without any message.
            st.error("La transcription a échoué")
            return
        transcription = " ".join(transcriptions)
    if transcription:
        display_transcription_and_summary(transcription)
def save_audio_bytes(audio_bytes: bytes) -> str:
    """Write ``audio_bytes`` to a new ``recording_*.wav`` file and return its path.

    The name combines a timestamp with a short random suffix: the timestamp
    alone has one-second resolution, so two recordings saved within the same
    second would overwrite each other.
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    file_path = f"recording_{timestamp}_{uuid.uuid4().hex[:8]}.wav"
    with open(file_path, 'wb') as f:
        f.write(audio_bytes)
    return file_path
def display_transcription_and_summary(transcription: str):
    """Show the transcription, then its summary and the download/email options."""
    st.subheader("Transcription")
    st.text_area("Texte transcrit:", value=transcription, height=200)
    st.subheader("Résumé et Analyse")
    summary = get_summary(transcription)
    if summary is None:
        # get_summary() can return None after reporting the error itself;
        # the PDF/DOCX generators cannot handle a None summary.
        return
    st.markdown(summary)
    display_summary_and_downloads(summary)
def get_summary(full_transcription):
    """Return a summary of the transcription, or None when there is none.

    A transcription that splits into more than one 16000-character piece is
    sent to ``summarize_long_transcription``; anything shorter goes straight
    to ``generate_summary``.
    """
    if full_transcription is None:
        st.error("La transcription a échoué")
        return None
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=16_000,
        chunk_overlap=200,
        length_function=len,
        separators=["\n\n", "\n", " ", ""],
    )
    piece_count = len(splitter.split_text(full_transcription))
    processor = st.session_state.audio_processor
    if piece_count > 1:
        return processor.summarize_long_transcription(full_transcription)
    return processor.generate_summary(full_transcription)
def display_summary_and_downloads(summary: str):
    """Offer the summary as PDF and DOCX downloads and as an email attachment."""
    stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    pdf_filename = f"resume_{stamp}.pdf"
    pdf_path = PDFGenerator.create_pdf(summary, pdf_filename)
    docx_filename = f"resume_{stamp}.docx"
    docx_path = generate_docx(summary, docx_filename)

    # One download button per format, side by side.
    pdf_column, docx_column = st.columns(2)
    with pdf_column:
        with open(pdf_path, "rb") as pdf_handle:
            st.download_button(
                "📥 Télécharger PDF",
                pdf_handle,
                file_name=pdf_filename,
                mime="application/pdf"
            )
    with docx_column:
        with open(docx_path, "rb") as docx_handle:
            st.download_button(
                "📥 Télécharger DOCX",
                docx_handle,
                file_name=docx_filename,
                mime="application/vnd.openxmlformats-officedocument.wordprocessingml.document"
            )

    st.markdown("### 📧 Recevoir le résumé par email")
    recipient_email = st.text_input("Entrez votre adresse email:")
    if not st.button("Envoyer par email"):
        return
    if not is_valid_email(recipient_email):
        st.error("Veuillez entrer une adresse email valide.")
        return
    with st.spinner("Envoi de l'email en cours..."):
        mailer = EmailSender(SENDER_EMAIL, SENDER_PASSWORD)
        delivered = mailer.send_email(
            recipient_email,
            "Résumé de votre contenu",
            "Veuillez trouver ci-joint le résumé de votre contenu.",
            pdf_path
        )
        if delivered:
            st.success("Email envoyé avec succès!")
        else:
            st.error("Échec de l'envoi de l'email.")
def generate_docx(content: str, filename: str):
    """Write ``content`` to a DOCX file at ``filename`` and return ``filename``.

    Non-blank lines starting with ``#`` become level-1 headings; the other
    non-blank lines become paragraphs.
    """
    document = Document()
    document.add_heading('Résumé', 0)
    document.add_paragraph(f"Date: {datetime.now().strftime('%d/%m/%Y %H:%M')}")
    non_blank_lines = (text for text in content.split('\n') if text.strip())
    for text in non_blank_lines:
        if not text.startswith('#'):
            document.add_paragraph(text)
            continue
        document.add_heading(text.strip('# '), level=1)
    document.save(filename)
    return filename
def cleanup_temporary_files():
    """Remove the fixed-name temporary media files if they exist (best effort)."""
    temp_files = ['temp_audio.mp3', 'temp_video.mp4']
    for temp_file in temp_files:
        try:
            os.remove(temp_file)
        except FileNotFoundError:
            # Nothing to clean up for this name.
            continue
        except OSError:
            # Best effort: a file that cannot be removed must not break the app.
            pass


# The entry point comes after cleanup_temporary_files(): it used to come
# first, so the ``finally`` clause raised NameError when run as a script.
if __name__ == "__main__":
    try:
        enhance_main()
    except Exception as e:
        st.error(f"Une erreur inattendue est survenue: {str(e)}")
        st.error("Veuillez réessayer ou contacter le support technique.")
    finally:
        cleanup_temporary_files()