Spaces:
Runtime error
Runtime error
import discord | |
import logging | |
import os | |
import re | |
import asyncio | |
import subprocess | |
from huggingface_hub import InferenceClient | |
from googleapiclient.discovery import build | |
from google.oauth2.credentials import Credentials | |
from google_auth_oauthlib.flow import InstalledAppFlow | |
from google.auth.transport.requests import Request | |
from youtube_transcript_api import YouTubeTranscriptApi | |
from youtube_transcript_api.formatters import TextFormatter | |
from dotenv import load_dotenv | |
# Load environment variables.
load_dotenv()

# Path of the JSON_TOKEN.json client-secrets file, and of the cached token.
credentials_path = 'JSON_TOKEN.json'
token_path = 'token.json'

# Logging configuration.
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s:%(levelname)s:%(name)s:%(message)s',
    handlers=[logging.StreamHandler()],
)

# Intent configuration: start from the defaults, then switch on each flag
# the bot relies on.
intents = discord.Intents.default()
for _intent_flag in ('message_content', 'messages', 'guilds', 'guild_messages'):
    setattr(intents, _intent_flag, True)

# Inference API client configuration.
hf_client = InferenceClient(
    "CohereForAI/c4ai-command-r-plus",
    token=os.getenv("HF_TOKEN"),
)

# OAuth 2.0 authentication configuration.
SCOPES = ["https://www.googleapis.com/auth/youtube.force-ssl"]
creds = None
def authorize():
    """Run the interactive OAuth 2.0 flow and cache the resulting credentials.

    Prints the authorization URL, reads the authorization code from standard
    input, exchanges it for a token, and writes the credentials as JSON to
    ``token_path``.

    Returns:
        The credentials object held by the completed flow.
    """
    # NOTE(review): this needs an interactive stdin (``input``); confirm the
    # deployment environment provides one.
    oauth_flow = InstalledAppFlow.from_client_secrets_file(credentials_path, SCOPES)
    consent_url = oauth_flow.authorization_url(prompt='consent')[0]
    print('Please go to this URL and finish the authentication: {}'.format(consent_url))

    # Ask the operator to enter the authorization code.
    auth_code = input('Enter the authorization code: ')
    oauth_flow.fetch_token(code=auth_code)

    granted = oauth_flow.credentials
    with open(token_path, 'w') as token_file:
        token_file.write(granted.to_json())
    return granted
# Reuse the cached token when one exists; otherwise run the interactive flow.
if os.path.exists(token_path):
    creds = Credentials.from_authorized_user_file(token_path, SCOPES)
else:
    creds = authorize()

# Renew credentials that are missing or no longer valid.
if not creds or not creds.valid:
    if creds and creds.expired and creds.refresh_token:
        creds.refresh(Request())
        # Persist the refreshed credentials so the token file stays current
        # instead of holding the expired token forever.
        with open(token_path, 'w') as token_file:
            token_file.write(creds.to_json())
    else:
        creds = authorize()

youtube_service = build('youtube', 'v3', credentials=creds)

# ID of the specific Discord channel the bot works in.
_channel_id_raw = os.getenv("DISCORD_CHANNEL_ID")
if not _channel_id_raw:
    # Fail with an explicit message instead of the opaque TypeError that
    # int(None) would raise.
    raise RuntimeError(
        "The DISCORD_CHANNEL_ID environment variable must be set to a channel ID."
    )
SPECIFIC_CHANNEL_ID = int(_channel_id_raw)
class MyClient(discord.Client):
    """Discord client that turns a posted YouTube URL into comment replies.

    For a message in the configured channel (or one of its threads) the
    client extracts a video ID, fetches the transcript and comments,
    generates replies, shows them in a Discord thread and posts them to
    YouTube. Only one message is processed at a time.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # True while a message is being handled; messages arriving meanwhile
        # are dropped.
        self.is_processing = False
        # Handle of the web.py child process; None until the first on_ready.
        self._web_process = None

    async def on_ready(self):
        """Start web.py and send the usage hint, once per process."""
        logging.info(f'{self.user}๋ก ๋ก๊ทธ์ธ๋์์ต๋๋ค!')
        # on_ready can be dispatched more than once (e.g. after a reconnect);
        # without this guard each dispatch would spawn another web.py process
        # and repeat the greeting.
        if self._web_process is not None:
            return
        # Run the web.py file.
        self._web_process = subprocess.Popen(["python", "web.py"])
        logging.info("Web.py server has been started.")
        # Send the guidance message when the bot starts.
        channel = self.get_channel(SPECIFIC_CHANNEL_ID)
        if channel:
            await channel.send("์ ํ๋ธ ๋น๋์ค URL์ ์ ๋ ฅํ๋ฉด, ์๋ง๊ณผ ๋๊ธ์ ๊ธฐ๋ฐ์ผ๋ก ๋ต๊ธ์ ์์ฑํฉ๋๋ค.")

    async def on_message(self, message):
        """Handle one incoming message; see the class docstring for the pipeline."""
        if message.author == self.user:
            return
        if not self.is_message_in_specific_channel(message):
            return
        if self.is_processing:
            return
        self.is_processing = True
        try:
            video_id = extract_video_id(message.content)
            if video_id:
                transcript = await get_best_available_transcript(video_id)
                comments = await get_video_comments(video_id)
                if comments and transcript:
                    replies = await generate_replies(comments, transcript)
                    await create_thread_and_send_replies(message, video_id, comments, replies)
                    await post_replies_to_youtube(video_id, comments, replies)
                else:
                    await message.channel.send("์๋ง์ด๋ ๋๊ธ์ ๊ฐ์ ธ์ฌ ์ ์์ต๋๋ค.")
            else:
                await message.channel.send("์ ํจํ ์ ํ๋ธ ๋น๋์ค URL์ ์ ๊ณตํด ์ฃผ์ธ์.")
        finally:
            # Always release the flag so a failure cannot block later messages.
            self.is_processing = False

    def is_message_in_specific_channel(self, message):
        """Return True for the configured channel or a thread whose parent it is."""
        return message.channel.id == SPECIFIC_CHANNEL_ID or (
            isinstance(message.channel, discord.Thread) and message.channel.parent_id == SPECIFIC_CHANNEL_ID
        )
# Matches a YouTube URL anywhere in a text and captures the 11-character ID.
_YOUTUBE_URL_RE = re.compile(
    # Do not start in the middle of another host name ("notyoutube.com").
    r'(?<![\w.-])'
    r'(?:https?://)?(?:www\.|m\.|music\.)?'
    r'(?:youtube|youtu|youtube-nocookie)\.(?:com|be)/'
    # Optional path prefix: any "...?v=" / "...?a=b&v=" query, or a known
    # path form.
    r'(?:[^\s?#]*\?(?:[^\s#]*?&)?v=|embed/|v/|shorts/|live/)?'
    # Exactly 11 ID characters, not followed by a further ID character.
    r'([A-Za-z0-9_-]{11})(?![A-Za-z0-9_-])'
)


def extract_video_id(url):
    """Extract the video ID from a YouTube video URL.

    The URL may appear anywhere inside ``url`` (for example in the middle of
    a chat message).

    Args:
        url: Text that may contain a YouTube URL.

    Returns:
        The 11-character video ID, or None when no YouTube video URL is
        found or ``url`` is not a string.
    """
    if not isinstance(url, str):
        return None
    match = _YOUTUBE_URL_RE.search(url)
    video_id = match.group(1) if match else None
    logging.debug(f'Extracted video ID: {video_id}')
    return video_id
def _fetch_transcript_blocking(video_id):
    """Fetch a transcript: Korean first, then English, then any listed one.

    Blocking helper; performs the network requests synchronously.

    Returns:
        The fetched transcript, or None when none could be retrieved.
    """
    for language_code, label in (('ko', 'Korean'), ('en', 'English')):
        try:
            return YouTubeTranscriptApi.get_transcript(video_id, languages=[language_code])
        except Exception as e:
            logging.warning(f'Error fetching {label} transcript: {e}')
    try:
        # Neither Korean nor English is available: take the first transcript
        # the video lists. (find_manually_created_transcript() needs a
        # language list, so calling it without arguments always failed.)
        # NOTE(review): presumably manually created transcripts are listed
        # before generated ones; confirm against the library version in use.
        for candidate in YouTubeTranscriptApi.list_transcripts(video_id):
            return candidate.fetch()
        logging.error('Error fetching alternative transcript: no transcript listed')
    except Exception as e:
        logging.error(f'Error fetching alternative transcript: {e}')
    return None


async def get_best_available_transcript(video_id):
    """Fetch the transcript of a YouTube video as plain text.

    The blocking transcript requests run in the default executor so the
    event loop stays responsive while they are in flight.

    Args:
        video_id: YouTube video ID.

    Returns:
        The transcript formatted as text, or None when no transcript could
        be fetched.
    """
    loop = asyncio.get_running_loop()
    transcript = await loop.run_in_executor(None, _fetch_transcript_blocking, video_id)
    if transcript is None:
        return None
    # Format the transcript.
    transcript_text = TextFormatter().format_transcript(transcript)
    logging.debug(f'Fetched transcript: {transcript_text}')
    return transcript_text
async def get_video_comments(video_id):
    """Fetch the top-level comments of a YouTube video.

    Only the first page of results (at most 100 comment threads) is read.
    The blocking HTTP request runs in the default executor so the event loop
    is not stalled.

    Args:
        video_id: YouTube video ID.

    Returns:
        A list of ``(comment_text, comment_id)`` tuples. The list is empty
        when the video has no comments or the request fails; the failure is
        logged.
    """
    request = youtube_service.commentThreads().list(
        part='snippet',
        videoId=video_id,
        maxResults=100,  # fetch at most 100 comments
    )
    loop = asyncio.get_running_loop()
    try:
        response = await loop.run_in_executor(None, request.execute)
    except Exception as e:
        # The caller treats an empty result as "could not fetch", so report
        # the failure that way instead of aborting the message handler.
        logging.error(f'Error fetching comments for video {video_id}: {e}')
        return []

    comments = []
    for item in response.get('items', []):
        top_level = item['snippet']['topLevelComment']
        # Keep the comment text together with its ID.
        comments.append((top_level['snippet']['textOriginal'], top_level['id']))
    logging.debug(f'Fetched comments: {comments}')
    return comments
async def generate_replies(comments, transcript):
    """Generate one LLM reply per comment, using the transcript as context.

    Each chat-completion request is blocking, so it runs in the default
    executor. Requests are issued one after another, in comment order.

    Args:
        comments: Iterable of ``(comment_text, comment_id)`` tuples.
        transcript: Transcript text placed in the system message.

    Returns:
        A list of reply strings, in the same order as ``comments``.
    """
    # Both are loop-invariant: resolve the event loop and build the system
    # message once instead of on every iteration.
    loop = asyncio.get_running_loop()
    system_message = {"role": "system", "content": f"๋น๋์ค ์๋ง: {transcript}"}

    replies = []
    for comment, _ in comments:
        messages = [system_message, {"role": "user", "content": comment}]
        response = await loop.run_in_executor(
            None,
            # Bind the current messages explicitly rather than by closure.
            lambda msgs=messages: hf_client.chat_completion(
                msgs, max_tokens=400, temperature=0.7, top_p=0.85),
        )
        content = None
        if response.choices and response.choices[0].message:
            content = response.choices[0].message['content']
        if content is not None:
            reply = content.strip()
        else:
            # No choice, no message, or a message without content.
            reply = "๋ต๊ธ์ ์์ฑํ ์ ์์ต๋๋ค."
        replies.append(reply)
    logging.debug(f'Generated replies: {replies}')
    return replies
async def create_thread_and_send_replies(message, video_id, comments, replies):
    """Send each comment and its reply as an embed into a Discord thread.

    A new thread is created from ``message``. If ``message`` was itself
    posted inside a thread, which cannot host another thread, the embeds are
    sent into that existing thread instead.

    Args:
        message: The Discord message that triggered the processing.
        video_id: YouTube video ID (not used here; kept for the call signature).
        comments: Iterable of ``(comment_text, comment_id)`` tuples.
        replies: Reply strings, in the same order as ``comments``.
    """
    if isinstance(message.channel, discord.Thread):
        thread = message.channel
    else:
        thread = await message.channel.create_thread(
            name=f"{message.author.name}์ ๋๊ธ ๋ต๊ธ", message=message)
    for (comment, _), reply in zip(comments, replies):
        description = f"**๋๊ธ**: {comment}\n**๋ต๊ธ**: {reply}"
        # Discord rejects embed descriptions longer than 4096 characters.
        embed = discord.Embed(description=description[:4096])
        await thread.send(embed=embed)
async def post_replies_to_youtube(video_id, comments, replies):
    """Post the generated replies to YouTube as replies to their comments.

    Each insert request is blocking, so it runs in the default executor. A
    failure for one comment is logged and does not stop the remaining ones.

    Args:
        video_id: YouTube video ID (not used here; kept for the call signature).
        comments: Iterable of ``(comment_text, comment_id)`` tuples.
        replies: Reply strings, in the same order as ``comments``.
    """
    loop = asyncio.get_running_loop()
    for (_, comment_id), reply in zip(comments, replies):
        request_body = {
            'snippet': {
                'parentId': comment_id,
                'textOriginal': reply,
            }
        }
        try:
            request = youtube_service.comments().insert(part='snippet', body=request_body)
            response = await loop.run_in_executor(None, request.execute)
            logging.debug(f'Posted reply to comment: {comment_id} with response: {response}')
        except Exception as e:
            logging.error(f'Error posting reply to comment {comment_id}: {e}')
            # Not every exception carries resp/content; reading them directly
            # would raise AttributeError inside this handler.
            logging.debug(f'Response: {getattr(e, "resp", None)} | Content: {getattr(e, "content", None)}')
if __name__ == "__main__":
    # Build the client with the configured intents, then start it with the
    # bot token read from the environment.
    discord_client = MyClient(intents=intents)
    bot_token = os.environ.get('DISCORD_TOKEN')
    discord_client.run(bot_token)