Spaces:
Sleeping
Sleeping
import gradio as gr | |
import pandas as pd | |
from googleapiclient.discovery import build | |
import plotly.express as px | |
import base64 | |
import numpy as np | |
from sklearn.feature_extraction.text import TfidfVectorizer | |
from sklearn.cluster import KMeans | |
from datetime import datetime, timedelta | |
import os | |
from huggingface_hub import InferenceApi # Hugging Face Hub API ์ฌ์ฉ | |
# YouTube Data API key, read from the YOUTUBE_API_KEY environment variable so
# the secret is not committed together with the source code. It falls back to
# an empty string when the variable is not set.
# NOTE(review): a key was previously hard-coded on this line; it should be
# treated as leaked and revoked.
YOUTUBE_API_KEY = os.getenv("YOUTUBE_API_KEY", "")
def create_client(model_name):
    """Return a Hugging Face ``InferenceApi`` handle for ``model_name``.

    The access token is read from the ``HF_TOKEN`` environment variable;
    when the variable is unset, ``None`` is passed as the token.
    """
    hf_token = os.environ.get("HF_TOKEN")
    client_options = {"repo_id": model_name, "token": hf_token}
    return InferenceApi(**client_options)


# Module-level inference client, created once at import time.
client = create_client("CohereForAI/c4ai-command-r-plus")
def get_video_stats(video_id):
    """Fetch title, channel, publish time and engagement counters for a video.

    Issues one ``videos().list`` request (parts ``snippet`` and
    ``statistics``) and reads the first item of the response.

    Args:
        video_id: ID of the video to look up.

    Returns:
        A dict with the keys ``"Video ID"``, ``"Title"``, ``"publishedAt"``,
        ``"Channel ID"``, ``"View Count"``, ``"Like Count"`` and
        ``"Comment Count"``. Counters missing from the response are
        reported as 0.

    Raises:
        IndexError: If the response contains no items.
    """
    service = build("youtube", "v3", developerKey=YOUTUBE_API_KEY)
    request = service.videos().list(part="snippet,statistics", id=video_id)
    item = request.execute()["items"][0]

    snippet = item["snippet"]
    record = {
        "Video ID": video_id,
        "Title": snippet["title"],
        "publishedAt": snippet["publishedAt"],
        "Channel ID": snippet["channelId"],
    }

    # Counters are optional in the response; a missing one counts as 0.
    statistics = item["statistics"]
    counter_fields = (
        ("View Count", "viewCount"),
        ("Like Count", "likeCount"),
        ("Comment Count", "commentCount"),
    )
    for column, api_field in counter_fields:
        record[column] = int(statistics.get(api_field, 0))
    return record
def get_channel_stats(channel_id):
    """Return the subscriber count of a channel, or 0 when it is unavailable.

    Issues one ``channels().list`` request for the ``statistics`` part.

    Args:
        channel_id: ID of the channel to look up.

    Returns:
        The subscriber count as an ``int``. 0 is returned when the response
        has no matching channel or carries no ``subscriberCount`` value.
    """
    youtube = build("youtube", "v3", developerKey=YOUTUBE_API_KEY)
    channel_response = youtube.channels().list(
        part="statistics",
        id=channel_id,
    ).execute()

    # The "items" entry may be missing altogether (not just empty) when no
    # channel matches, so it is read with .get() instead of direct indexing.
    items = channel_response.get("items") or []
    if not items:
        return 0

    # Same lenient pattern get_video_stats() uses for its counters: a channel
    # without a visible subscriber count is reported as 0 instead of raising
    # KeyError.
    statistics = items[0].get("statistics", {})
    return int(statistics.get("subscriberCount", 0))
def get_video_data(query, max_results, published_after, published_before):
    """Search videos and collect per-video and per-channel statistics.

    Pages through ``search().list`` (ordered by view count, restricted to
    the given publication window) until ``max_results`` video IDs have been
    gathered or no further page exists. Each video is then enriched via
    ``get_video_stats`` and ``get_channel_stats``.

    Args:
        query: Search text.
        max_results: Maximum number of videos to return. Coerced to ``int``
            because UI sliders may deliver a float.
        published_after: Lower bound of the publication window, passed to
            the search request as ``publishedAfter``.
        published_before: Upper bound of the publication window, passed to
            the search request as ``publishedBefore``.

    Returns:
        A ``pandas.DataFrame`` with one row per video: the fields produced
        by ``get_video_stats`` plus a ``"Subscriber Count"`` column. The
        frame is empty when nothing is requested or nothing is found.
    """
    max_results = int(max_results)
    if max_results <= 0:
        # Nothing requested: skip creating the API client altogether.
        return pd.DataFrame([])

    youtube = build("youtube", "v3", developerKey=YOUTUBE_API_KEY)
    page_size = 50  # page size the search request has always used

    video_ids = []
    next_page_token = None
    while len(video_ids) < max_results:
        remaining = max_results - len(video_ids)
        search_response = youtube.search().list(
            q=query,
            type="video",
            part="id",
            # Do not ask for more results than are still needed.
            maxResults=min(page_size, remaining),
            pageToken=next_page_token,
            order="viewCount",
            publishedAfter=published_after,
            publishedBefore=published_before,
        ).execute()

        page_items = search_response.get("items", [])
        video_ids.extend(item["id"]["videoId"] for item in page_items)
        next_page_token = search_response.get("nextPageToken")
        # Stop on the last page, and also on an empty page so that a token
        # without results cannot keep the loop spinning forever.
        if not page_items or not next_page_token:
            break
    video_ids = video_ids[:max_results]

    # Several videos often belong to the same channel; look each channel up
    # only once.
    subscribers_by_channel = {}
    video_stats = []
    for video_id in video_ids:
        stats = get_video_stats(video_id)
        channel_id = stats["Channel ID"]
        if channel_id not in subscribers_by_channel:
            subscribers_by_channel[channel_id] = get_channel_stats(channel_id)
        stats["Subscriber Count"] = subscribers_by_channel[channel_id]
        video_stats.append(stats)

    return pd.DataFrame(video_stats)
def download_csv(df, filename):
    """Build an HTML anchor that downloads ``df`` as a CSV file.

    The frame is serialised without its index, base64-encoded and embedded
    in a ``data:`` URI.

    Args:
        df: DataFrame to export.
        filename: Base name (without extension) used for the download and
            for the link text.

    Returns:
        The ``<a ...>`` element as a string.
    """
    csv_bytes = df.to_csv(index=False).encode()
    encoded_payload = base64.b64encode(csv_bytes).decode()
    return (
        f'<a href="data:file/csv;base64,{encoded_payload}" '
        f'download="{filename}.csv">Download {filename} CSV</a>'
    )
def visualize_video_ranking(video_stats_df):
    """Compute the per-video "active index" and build its bar chart.

    The active index is ``View Count / Subscriber Count``. The column
    ``"Active_Index"`` is added to ``video_stats_df`` in place.

    Args:
        video_stats_df: Frame with at least the columns ``"Video ID"``,
            ``"View Count"`` and ``"Subscriber Count"``.

    Returns:
        A tuple ``(video_stats_df, figure, csv_download_link)``. For an
        empty input frame the figure and the link are ``None``.
    """
    if video_stats_df.empty:
        # No rows (and possibly no columns): nothing to compute or plot.
        return video_stats_df, None, None

    subscribers = video_stats_df["Subscriber Count"]
    # A subscriber count of 0 would turn the ratio into inf (or NaN for
    # 0 / 0). Mask those rows and report an index of 0 for them instead.
    ratio = video_stats_df["View Count"] / subscribers.where(subscribers > 0)
    video_stats_df["Active_Index"] = ratio.fillna(0.0)

    csv_download_link = download_csv(video_stats_df, "video_stats")
    fig = px.bar(
        video_stats_df,
        x="Video ID",
        y="Active_Index",
        color="View Count",
        labels={"Video ID": "Video ID", "Active_Index": "Active_Index"},
        title="Video Active Index",
    )
    fig.update_layout(height=500, width=500)
    return video_stats_df, fig, csv_download_link
def analyze_titles(video_stats_df, n_clusters=5):
    """Cluster video titles with TF-IDF + KMeans and summarise each cluster.

    A ``"Cluster"`` column holding the KMeans label of every row is added to
    ``video_stats_df`` in place.

    Args:
        video_stats_df: Frame with a ``"Title"`` column.
        n_clusters: Requested number of clusters. Coerced to ``int`` and
            limited to the number of available titles, since KMeans cannot
            form more clusters than there are samples.

    Returns:
        A ``pandas.DataFrame`` with the columns ``"Cluster"`` and
        ``"Summary"``, one row per cluster actually formed. The frame is
        empty when ``video_stats_df`` is empty.
    """
    if video_stats_df.empty:
        # No titles to vectorise; return the usual columns with no rows.
        return pd.DataFrame({'Cluster': [], 'Summary': []})

    titles = video_stats_df['Title'].tolist()
    # UI sliders may deliver a float, and fewer videos than requested
    # clusters may have been found.
    n_clusters = max(1, min(int(n_clusters), len(titles)))

    vectorizer = TfidfVectorizer()
    tfidf_matrix = vectorizer.fit_transform(titles)
    kmeans = KMeans(n_clusters=n_clusters, random_state=42)
    kmeans.fit(tfidf_matrix)
    video_stats_df["Cluster"] = kmeans.labels_

    cluster_summaries = []
    for i in range(n_clusters):
        cluster_titles = video_stats_df[video_stats_df["Cluster"] == i]['Title'].tolist()
        if cluster_titles:
            summary = summarize_cluster(' '.join(cluster_titles), i)
        else:
            # Do not send an empty prompt to the summariser.
            summary = ""
        cluster_summaries.append(summary)

    return pd.DataFrame({'Cluster': range(n_clusters), 'Summary': cluster_summaries})
def summarize_cluster(cluster_text, cluster_num):
    """Ask the module-level inference ``client`` for a summary of a cluster.

    Args:
        cluster_text: Concatenated titles of the cluster, sent as the model
            input.
        cluster_num: Index of the cluster; used only in error messages.

    Returns:
        The ``"generated_text"`` of the first response entry, stripped of
        surrounding whitespace.

    Raises:
        RuntimeError: If the response is not a non-empty list of results.
    """
    response = client(inputs=cluster_text)

    # NOTE(review): failures appear to come back as a dict carrying an
    # "error" entry rather than as an exception - confirm against the
    # huggingface_hub version in use.
    if isinstance(response, dict) and "error" in response:
        raise RuntimeError(
            f"Inference API error for cluster {cluster_num}: {response['error']}"
        )
    if not isinstance(response, list) or not response:
        raise RuntimeError(
            f"Unexpected Inference API response for cluster {cluster_num}: {response!r}"
        )

    return response[0]["generated_text"].strip()
def main(query, max_results, period, page, n_clusters=5):
    """Gradio callback: run a YouTube search and render the selected page.

    Args:
        query: Search text. An empty value produces empty outputs.
        max_results: Maximum number of videos to analyse (coerced to int).
        period: Label of the look-back window chosen in the UI. Unknown or
            missing labels fall back to 30 days.
        page: ``"Video Ranking"`` or ``"Title Analysis"``.
        n_clusters: Number of title clusters for the "Title Analysis" page
            (coerced to int).

    Returns:
        A 3-tuple ``(dataframe, figure, html)`` matching the three output
        components. For "Title Analysis" only the dataframe is set. For an
        empty query or an unknown page all three entries are ``None``.
    """
    # Local import: the file-level import only brings in datetime/timedelta.
    from datetime import timezone

    # Always hand back one value per output component; check the cheap
    # conditions first so no API quota is spent on a request that cannot be
    # rendered.
    if not query or page not in ("Video Ranking", "Title Analysis"):
        return None, None, None

    # Look-back window in days for each period label of the UI dropdown.
    period_days = {
        "1์ฃผ์ผ": 7,
        "1๊ฐ์": 30,
        "3๊ฐ์": 90,
    }
    days = period_days.get(period, 30)  # default: 30 days

    # Timezone-aware "now", formatted as an RFC 3339 UTC timestamp.
    timestamp_format = "%Y-%m-%dT%H:%M:%SZ"
    now = datetime.now(timezone.utc)
    published_before = now.strftime(timestamp_format)
    published_after = (now - timedelta(days=days)).strftime(timestamp_format)

    # Slider values may arrive as floats.
    video_stats_df = get_video_data(
        query, int(max_results), published_after, published_before
    )

    if page == "Video Ranking":
        video_stats_df, fig, csv_download_link = visualize_video_ranking(video_stats_df)
        return video_stats_df, fig, csv_download_link

    cluster_summary_df = analyze_titles(video_stats_df, int(n_clusters))
    return cluster_summary_df, None, None
# Choices of the two dropdowns. They are named so that an initial selection
# can be derived from them; without one the callback would receive None.
_PERIOD_CHOICES = ["1์ฃผ์ผ", "1๊ฐ์", "3๊ฐ์"]
_PAGE_CHOICES = ["Video Ranking", "Title Analysis"]

# Gradio UI: five inputs feeding main(), three outputs (table, plot, HTML).
iface = gr.Interface(
    fn=main,
    inputs=[
        gr.components.Textbox(label="๊ฒ์ ์ฟผ๋ฆฌ"),
        # step=1 keeps both sliders on whole numbers; without an explicit
        # step Gradio derives one from the range, which can be fractional.
        gr.components.Slider(minimum=1, maximum=1000, value=5, step=1, label="์ต๋ ๊ฒฐ๊ณผ ์"),
        # Initial period matches the 30-day fallback used by main().
        gr.components.Dropdown(_PERIOD_CHOICES, value=_PERIOD_CHOICES[1], label="๊ธฐ๊ฐ"),
        gr.components.Dropdown(_PAGE_CHOICES, value=_PAGE_CHOICES[0], label="ํ์ด์ง"),
        gr.components.Slider(minimum=2, maximum=10, value=5, step=1, label="ํด๋ฌ์คํฐ ์"),
    ],
    outputs=[
        gr.components.Dataframe(label="๊ฒฐ๊ณผ"),
        gr.components.Plot(label="๊ทธ๋ํ"),
        gr.components.HTML(label="CSV ๋ค์ด๋ก๋ ๋งํฌ"),
    ],
    live=False,
    title="YouTube ๋ถ์ ๋๊ตฌ",
)

if __name__ == "__main__":
    iface.launch()