import gradio as gr
import pandas as pd
from googleapiclient.discovery import build
import plotly.express as px
import base64
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.cluster import KMeans
from datetime import datetime, timedelta
import os
from huggingface_hub import InferenceClient  # Hugging Face Hub Inference API

# Enter your YouTube API key here
YOUTUBE_API_KEY = "AIzaSyA9DEIHCYexeF2gSFW8cF6E3JTu9BhYxLc"


def create_client(model_name):
    token = os.getenv("HF_TOKEN")
    return InferenceClient(model=model_name, token=token)


client = create_client("CohereForAI/c4ai-command-r-plus")


def get_video_stats(video_id):
    """Fetch snippet and statistics for a single video."""
    youtube = build("youtube", "v3", developerKey=YOUTUBE_API_KEY)
    video_response = youtube.videos().list(
        part="snippet,statistics",
        id=video_id
    ).execute()

    video = video_response["items"][0]
    title = video["snippet"]["title"]
    channel_id = video["snippet"]["channelId"]
    publish_time = video["snippet"]["publishedAt"]
    view_count = int(video["statistics"].get("viewCount", 0))
    like_count = int(video["statistics"].get("likeCount", 0))
    comment_count = int(video["statistics"].get("commentCount", 0))

    return {
        "동영상 ID": video_id,
        "제목": title,
        "게시 시간": publish_time,
        "채널 ID": channel_id,
        "조회수": view_count,
        "좋아요 수": like_count,
        "댓글 수": comment_count,
    }


def get_channel_stats(channel_id):
    """Return the channel's subscriber count (0 if unavailable or hidden)."""
    youtube = build("youtube", "v3", developerKey=YOUTUBE_API_KEY)
    channel_response = youtube.channels().list(
        part="statistics",
        id=channel_id
    ).execute()

    if channel_response["items"]:
        channel = channel_response["items"][0]
        # The key is absent when a channel hides its subscriber count.
        subscriber_count = int(channel["statistics"].get("subscriberCount", 0))
    else:
        subscriber_count = 0

    return subscriber_count


def get_video_data(query, max_results, published_after, published_before):
    """Search for videos matching the query and collect per-video statistics."""
    youtube = build("youtube", "v3", developerKey=YOUTUBE_API_KEY)

    video_ids = []
    next_page_token = None

    while len(video_ids) < max_results:
        search_response = youtube.search().list(
            q=query,
            type="video",
            part="id",
            maxResults=50,
            pageToken=next_page_token,
            order="viewCount",
            publishedAfter=published_after,
            publishedBefore=published_before
        ).execute()

        video_ids.extend([item["id"]["videoId"] for item in search_response["items"]])
        next_page_token = search_response.get("nextPageToken")

        if not next_page_token:
            break

    video_ids = video_ids[:max_results]

    video_stats = []
    for video_id in video_ids:
        stats = get_video_stats(video_id)
        channel_id = stats["채널 ID"]
        subscriber_count = get_channel_stats(channel_id)
        stats["구독자 수"] = subscriber_count
        video_stats.append(stats)

    video_stats_df = pd.DataFrame(video_stats)
    return video_stats_df


def download_csv(df, filename):
    """Encode the DataFrame as a base64 data URI wrapped in an HTML download link."""
    csv = df.to_csv(index=False)
    b64 = base64.b64encode(csv.encode()).decode()
    href = f'<a href="data:file/csv;base64,{b64}" download="{filename}.csv">다운로드 {filename} CSV</a>'
    return href


def visualize_video_ranking(video_stats_df):
    # Activity index = views per subscriber; avoid division by zero for
    # channels with no (or hidden) subscriber count.
    subscribers = video_stats_df["구독자 수"].replace(0, np.nan)
    video_stats_df["활성 지수"] = video_stats_df["조회수"] / subscribers

    csv_download_link = download_csv(video_stats_df, "video_stats")

    fig = px.bar(video_stats_df, x="동영상 ID", y="활성 지수", color="조회수",
                 labels={"동영상 ID": "동영상 ID", "활성 지수": "활성 지수"},
                 title="동영상 활성 지수")
    fig.update_layout(height=500, width=500)

    return video_stats_df, fig, csv_download_link


def analyze_titles(video_stats_df, n_clusters=5):
    """Cluster video titles with TF-IDF + KMeans and summarize each cluster."""
    titles = video_stats_df['제목'].tolist()

    vectorizer = TfidfVectorizer()
    tfidf_matrix = vectorizer.fit_transform(titles)

    kmeans = KMeans(n_clusters=n_clusters, random_state=42)
    kmeans.fit(tfidf_matrix)
    labels = kmeans.labels_

    video_stats_df["클러스터"] = labels

    cluster_summaries = []
    for i in range(n_clusters):
        cluster_titles = video_stats_df[video_stats_df["클러스터"] == i]['제목'].tolist()
        cluster_text = ' '.join(cluster_titles)
        summary = summarize_cluster(cluster_text, i)
        cluster_summaries.append(summary)

    cluster_summary_df = pd.DataFrame({'클러스터': range(n_clusters), '요약': cluster_summaries})
    return cluster_summary_df


def summarize_cluster(cluster_text, cluster_num):
    prompt = f"다음 동영상을 분석하여 요약하고, 500자 이내로 동영상의 특징 및 인기 요인을 설명해주세요: {cluster_text}"
    # InferenceClient.text_generation() returns the generated text as a string.
    summary = client.text_generation(prompt, max_new_tokens=500).strip()
    return summary


def main(query, max_results, period, page, n_clusters=5):
    if query:
        # Build the publishedAfter/publishedBefore window from the selected period
        now = datetime.utcnow()
        published_before = now.isoformat("T") + "Z"
        if period == "1주일":
            published_after = (now - timedelta(days=7)).isoformat("T") + "Z"
        elif period == "1개월":
            published_after = (now - timedelta(days=30)).isoformat("T") + "Z"
        elif period == "3개월":
            published_after = (now - timedelta(days=90)).isoformat("T") + "Z"
        else:
            # Default: 1 month
            published_after = (now - timedelta(days=30)).isoformat("T") + "Z"

        video_stats_df = get_video_data(query, max_results, published_after, published_before)

        if page == "Video Ranking":
            video_stats_df, fig, csv_download_link = visualize_video_ranking(video_stats_df)
            return video_stats_df, fig, csv_download_link
        elif page == "Title Analysis":
            cluster_summary_df = analyze_titles(video_stats_df, n_clusters)
            return cluster_summary_df, None, None

    # No query (or unknown page): return empty outputs so Gradio can still render.
    return None, None, None


iface = gr.Interface(
    fn=main,
    inputs=[
        gr.components.Textbox(label="검색 쿼리"),
        gr.components.Number(label="최대 결과 수", value=5, precision=0, minimum=1, maximum=1000),
        gr.components.Dropdown(["1주일", "1개월", "3개월"], label="기간"),
        gr.components.Dropdown(["Video Ranking", "Title Analysis"], label="페이지"),
        gr.components.Number(label="클러스터 수", value=5, precision=0, minimum=2, maximum=10)
    ],
    outputs=[
        gr.components.Dataframe(label="결과"),
        gr.components.Plot(label="그래프"),
        gr.components.HTML(label="CSV 다운로드 링크")
    ],
    live=False,
    title="YouTube 분석 도구"
)

if __name__ == "__main__":
    iface.launch()
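
# A minimal sketch of a local run, assuming the script is saved as app.py
# (the filename is illustrative, not part of the original):
#
#   export HF_TOKEN=<your-hugging-face-token>   # read by create_client() above
#   python app.py                               # Gradio serves the UI on http://127.0.0.1:7860 by default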