Spaces:
Runtime error
Runtime error
import discord | |
import logging | |
import os | |
import re | |
import asyncio | |
import subprocess | |
import aiohttp | |
import time | |
from huggingface_hub import InferenceClient | |
from googleapiclient.discovery import build | |
from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound | |
from youtube_transcript_api.formatters import TextFormatter | |
from dotenv import load_dotenv | |
from pytube import YouTube | |
import whisper | |
import torch | |
from transformers import AutoProcessor, AutoModelForSpeechSeq2Seq | |
import librosa | |
# νκ²½ λ³μ λ‘λ | |
load_dotenv() | |
# λ‘κΉ μ€μ | |
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s:%(levelname)s:%(name)s:%(message)s', handlers=[logging.StreamHandler()]) | |
# μΈν νΈ μ€μ | |
intents = discord.Intents.default() | |
intents.message_content = True | |
intents.messages = True | |
intents.guilds = True | |
intents.guild_messages = True | |
# μΆλ‘ API ν΄λΌμ΄μΈνΈ μ€μ | |
hf_client = InferenceClient("CohereForAI/c4ai-command-r-plus-08-2024", token=os.getenv("HF_TOKEN")) | |
whisper_client = InferenceClient("openai/whisper-large-v3", token=os.getenv("HF_TOKEN")) | |
# YouTube API μ€μ | |
API_KEY = os.getenv("YOUTUBE_API_KEY") | |
youtube_service = build('youtube', 'v3', developerKey=API_KEY) | |
# νΉμ μ±λ ID | |
SPECIFIC_CHANNEL_ID = int(os.getenv("DISCORD_CHANNEL_ID")) | |
# μ μ‘ μ€ν¨ μ μ¬μλ νμ | |
MAX_RETRIES = 3 | |
class MyClient(discord.Client): | |
def __init__(self, *args, **kwargs): | |
super().__init__(*args, **kwargs) | |
self.is_processing = False | |
self.session = None | |
async def on_ready(self): | |
logging.info(f'{self.user}λ‘ λ‘κ·ΈμΈλμμ΅λλ€!') | |
# web.py νμΌ μ€ν | |
subprocess.Popen(["python", "web.py"]) | |
logging.info("Web.py μλ²κ° μμλμμ΅λλ€.") | |
# aiohttp ν΄λΌμ΄μΈνΈ μΈμ μμ± | |
self.session = aiohttp.ClientSession() | |
# λ΄μ΄ μμλ λ μλ΄ λ©μμ§λ₯Ό μ μ‘ | |
channel = self.get_channel(SPECIFIC_CHANNEL_ID) | |
if channel: | |
await channel.send("μ νλΈ λΉλμ€ URLμ μ λ ₯νλ©΄, μλ§κ³Ό λκΈμ κΈ°λ°μΌλ‘ λ΅κΈμ μμ±ν©λλ€.") | |
async def on_message(self, message): | |
if message.author == self.user: | |
return | |
if not self.is_message_in_specific_channel(message): | |
return | |
if self.is_processing: | |
return | |
self.is_processing = True | |
try: | |
video_id = extract_video_id(message.content) | |
if video_id: | |
transcript, language = await get_best_available_transcript(video_id) | |
comments = await get_video_comments(video_id) | |
if comments: | |
if transcript: | |
replies = await generate_replies(comments, transcript) | |
await create_thread_and_send_replies(message, video_id, comments, replies, self.session) | |
else: | |
await message.channel.send("μλ§μ κ°μ Έμ¬ μ μμ΅λλ€. Whisper λͺ¨λΈμ μ¬μ©νμ¬ μλ§μ μμ±ν©λλ€.") | |
transcript = await generate_whisper_transcript(video_id) | |
if transcript: | |
replies = await generate_replies(comments, transcript) | |
await create_thread_and_send_replies(message, video_id, comments, replies, self.session) | |
else: | |
await message.channel.send("Whisper λͺ¨λΈλ‘λ μλ§μ μμ±ν μ μμ΅λλ€. λκΈλ§μ κΈ°λ°μΌλ‘ λ΅λ³μ μμ±ν©λλ€.") | |
replies = await generate_replies(comments, "") | |
await create_thread_and_send_replies(message, video_id, comments, replies, self.session) | |
else: | |
await message.channel.send("λκΈμ κ°μ Έμ¬ μ μμ΅λλ€.") | |
else: | |
await message.channel.send("μ ν¨ν μ νλΈ λΉλμ€ URLμ μ κ³΅ν΄ μ£ΌμΈμ.") | |
finally: | |
self.is_processing = False | |
def is_message_in_specific_channel(self, message): | |
return message.channel.id == SPECIFIC_CHANNEL_ID or ( | |
isinstance(message.channel, discord.Thread) and message.channel.parent_id == SPECIFIC_CHANNEL_ID | |
) | |
async def close(self): | |
# aiohttp ν΄λΌμ΄μΈνΈ μΈμ μ’ λ£ | |
if self.session: | |
await self.session.close() | |
await super().close() | |
def extract_video_id(url): | |
video_id = None | |
youtube_regex = ( | |
r'(https?://)?(www\.)?' | |
'(youtube|youtu|youtube-nocookie)\.(com|be)/' | |
'(watch\?v=|embed/|v/|.+\?v=)?([^&=%\?]{11})') | |
match = re.match(youtube_regex, url) | |
if match: | |
video_id = match.group(6) | |
logging.debug(f'μΆμΆλ λΉλμ€ ID: {video_id}') | |
return video_id | |
async def get_best_available_transcript(video_id, max_retries=5, delay=10): | |
async def fetch_transcript(language): | |
try: | |
transcript = await asyncio.to_thread(YouTubeTranscriptApi.get_transcript, video_id, languages=[language]) | |
return transcript, language | |
except (NoTranscriptFound, TranscriptsDisabled): | |
logging.warning(f'{language} μλ§μ΄ μ 곡λμ§ μμ.') | |
return None, None | |
except Exception as e: | |
logging.warning(f'{language} μλ§ κ°μ Έμ€κΈ° μ€λ₯: {e}') | |
return None, None | |
for attempt in range(max_retries): | |
# μ°μ νκ΅μ΄ μλ§μ μλ | |
ko_transcript, ko_lang = await fetch_transcript('ko') | |
if ko_transcript: | |
return ko_transcript, ko_lang | |
# μμ΄ μλ§ μλ | |
en_transcript, en_lang = await fetch_transcript('en') | |
if en_transcript: | |
return en_transcript, en_lang | |
try: | |
# λΉλμ€μ μλ§ λͺ©λ‘μ΄ μλμ§ νμΈ | |
transcripts = await asyncio.to_thread(YouTubeTranscriptApi.list_transcripts, video_id) | |
# μλμΌλ‘ μμ±λ μλ§μ΄ μλμ§ νμΈ | |
manual_transcript = transcripts.find_manually_created_transcript(['ko', 'en']) | |
transcript = await asyncio.to_thread(manual_transcript.fetch) | |
return transcript, manual_transcript.language_code | |
except (NoTranscriptFound, TranscriptsDisabled) as e: | |
logging.warning(f'μλ μλ§μ μ°Ύμ μ μμ: {e}') | |
except Exception as e: | |
if attempt < max_retries - 1: | |
logging.error(f'μλ§ κ°μ Έμ€κΈ° μ€ν¨ (μλ {attempt + 1}/{max_retries}): {e}') | |
await asyncio.sleep(delay) | |
else: | |
logging.error(f'μ΅μ’ μλ§ κ°μ Έμ€κΈ° μ€ν¨: {e}') | |
return None, None | |
return None, None | |
async def generate_whisper_transcript(video_id): | |
try: | |
# YouTube λΉλμ€ λ€μ΄λ‘λ | |
yt = YouTube(f'https://www.youtube.com/watch?v={video_id}') | |
audio_stream = yt.streams.filter(only_audio=True).first() | |
audio_file = audio_stream.download(output_path='temp', filename=f'{video_id}.mp3') | |
# μ€λμ€ νμΌ λ‘λ | |
audio, sr = librosa.load(audio_file, sr=16000) | |
# Whisper λͺ¨λΈ λ° νλ‘μΈμ λ‘λ | |
device = "cuda" if torch.cuda.is_available() else "cpu" | |
processor = AutoProcessor.from_pretrained("openai/whisper-large-v3") | |
model = AutoModelForSpeechSeq2Seq.from_pretrained("openai/whisper-large-v3").to(device) | |
# μ€λμ€ μ²λ¦¬ | |
input_features = processor(audio, sampling_rate=sr, return_tensors="pt").input_features.to(device) | |
# μμ± | |
predicted_ids = model.generate(input_features) | |
transcription = processor.batch_decode(predicted_ids, skip_special_tokens=True) | |
# μμ νμΌ μμ | |
os.remove(audio_file) | |
return transcription[0] | |
except Exception as e: | |
logging.error(f'Whisper μλ§ μμ± μ€ν¨: {e}') | |
return None | |
async def get_video_comments(video_id): | |
comments = [] | |
response = youtube_service.commentThreads().list( | |
part='snippet', | |
videoId=video_id, | |
maxResults=100 # μ΅λ 100κ°μ λκΈ κ°μ Έμ€κΈ° | |
).execute() | |
for item in response.get('items', []): | |
comment = item['snippet']['topLevelComment']['snippet']['textOriginal'] | |
comment_id = item['snippet']['topLevelComment']['id'] | |
comments.append((comment, comment_id)) | |
logging.debug(f'κ°μ Έμ¨ λκΈ: {comments}') | |
return comments | |
async def generate_replies(comments, transcript): | |
replies = [] | |
for comment, _ in comments: | |
messages = [ | |
{"role": "system", "content": f"""λμ μ΄λ¦μ OpenFreeAIμ΄λ€. λ΅κΈ μμ±ν κ°μ₯ λ§μ§λ§μ λμ μ΄λ¦μ λ°νκ³ κ³΅μνκ² μΈμ¬νλΌ. λΉλμ€ μλ§: {transcript}"""}, | |
{"role": "user", "content": comment} | |
] | |
loop = asyncio.get_event_loop() | |
response = await loop.run_in_executor(None, lambda: hf_client.chat_completion( | |
messages, max_tokens=250, temperature=0.7, top_p=0.85)) | |
if response.choices and response.choices[0].message: | |
reply = response.choices[0].message['content'].strip() | |
else: | |
reply = "λ΅κΈμ μμ±ν μ μμ΅λλ€." | |
replies.append(reply) | |
logging.debug(f'μμ±λ λ΅κΈ: {replies}') | |
return replies | |
async def create_thread_and_send_replies(message, video_id, comments, replies, session): | |
thread = await message.channel.create_thread(name=f"{message.author.name}μ λκΈ λ΅κΈ", message=message) | |
for (comment, comment_id), reply in zip(comments, replies): | |
embed = discord.Embed(description=f"**λκΈ**: {comment}\n**λ΅κΈ**: {reply}") | |
await thread.send(embed=embed) | |
if __name__ == "__main__": | |
discord_client = MyClient(intents=intents) | |
discord_client.run(os.getenv('DISCORD_TOKEN')) |