Spaces:
Sleeping
Sleeping
import gradio as gr | |
import pandas as pd | |
from googleapiclient.discovery import build | |
import plotly.express as px | |
import base64 | |
import numpy as np | |
from sklearn.feature_extraction.text import TfidfVectorizer | |
from sklearn.cluster import KMeans | |
from datetime import datetime, timedelta | |
import os | |
from huggingface_hub import InferenceClient # Hugging Face Hub API μ¬μ© | |
# YouTube Data API key.
# Prefer supplying it through the YOUTUBE_API_KEY environment variable (the
# same mechanism this file uses for HF_TOKEN). The literal below is kept only
# as a fallback so existing deployments keep working.
# NOTE(review): a key committed to source must be treated as leaked -- revoke
# it and remove the fallback once the environment variable is configured.
YOUTUBE_API_KEY = os.getenv(
    "YOUTUBE_API_KEY", "AIzaSyA9DEIHCYexeF2gSFW8cF6E3JTu9BhYxLc"
)
def create_client(model_name):
    """Build a Hugging Face ``InferenceClient`` bound to ``model_name``.

    The access token is read from the ``HF_TOKEN`` environment variable; when
    the variable is unset, ``None`` is passed as the token.
    """
    return InferenceClient(model=model_name, token=os.getenv("HF_TOKEN"))


# Shared module-level client used by summarize_cluster().
client = create_client("CohereForAI/c4ai-command-r-plus")
def get_video_stats(video_id):
    """Fetch snippet and statistics for a single YouTube video.

    Args:
        video_id: ID of the video to look up.

    Returns:
        dict holding the video ID, title, publish time, channel ID, view
        count, like count and comment count. Counters that the API response
        does not include are reported as 0.

    Raises:
        IndexError: if the API response contains no item for ``video_id``.
    """
    youtube = build("youtube", "v3", developerKey=YOUTUBE_API_KEY)
    response = youtube.videos().list(
        part="snippet,statistics",
        id=video_id,
    ).execute()

    items = response.get("items") or []
    if not items:
        # Same exception type the unguarded ``[0]`` lookup raised, but with a
        # message that says which video was missing.
        raise IndexError(f"No YouTube video found for id {video_id!r}")

    video = items[0]
    snippet = video["snippet"]
    statistics = video.get("statistics", {})

    return {
        "λμμ ID": video_id,
        "μ λͺ©": snippet["title"],
        "κ²μ μκ°": snippet["publishedAt"],
        "μ±λ ID": snippet["channelId"],
        "μ‘°νμ": int(statistics.get("viewCount", 0)),
        "μ’μμ μ": int(statistics.get("likeCount", 0)),
        "λκΈ μ": int(statistics.get("commentCount", 0)),
    }
def get_channel_stats(channel_id):
    """Return the subscriber count of a YouTube channel.

    Args:
        channel_id: ID of the channel to look up.

    Returns:
        The subscriber count as an int, or 0 when the channel is not found or
        the response carries no ``subscriberCount`` field.
    """
    youtube = build("youtube", "v3", developerKey=YOUTUBE_API_KEY)
    response = youtube.channels().list(
        part="statistics",
        id=channel_id,
    ).execute()

    # Tolerate a response without an "items" key as well as an empty list.
    items = response.get("items") or []
    if not items:
        return 0

    # NOTE(review): subscriberCount appears to be absent for channels that
    # hide their count -- confirm against the API reference. Fall back to 0
    # instead of raising KeyError.
    statistics = items[0].get("statistics", {})
    return int(statistics.get("subscriberCount", 0))
def get_video_data(query, max_results, published_after, published_before):
    """Search YouTube and collect per-video statistics into a DataFrame.

    Args:
        query: Search string passed to the YouTube search endpoint.
        max_results: Maximum number of videos to return.
        published_after: Lower publish-time bound, passed as ``publishedAfter``.
        published_before: Upper publish-time bound, passed as
            ``publishedBefore``.

    Returns:
        pandas.DataFrame with one row per video: the fields produced by
        ``get_video_stats`` plus the channel's subscriber count. The frame is
        empty (no columns) when the search yields nothing.
    """
    youtube = build("youtube", "v3", developerKey=YOUTUBE_API_KEY)
    max_results = int(max_results)

    video_ids = []
    next_page_token = None
    while len(video_ids) < max_results:
        # Never request more than is still needed; 50 is the page size the
        # original code used.
        page_size = min(50, max_results - len(video_ids))
        search_response = youtube.search().list(
            q=query,
            type="video",
            part="id",
            maxResults=page_size,
            pageToken=next_page_token,
            order="viewCount",
            publishedAfter=published_after,
            publishedBefore=published_before,
        ).execute()

        items = search_response.get("items", [])
        video_ids.extend(item["id"]["videoId"] for item in items)

        next_page_token = search_response.get("nextPageToken")
        # Stop on the last page, and also on an empty page so a token that
        # keeps coming back cannot spin the loop forever.
        if not next_page_token or not items:
            break

    video_ids = video_ids[:max_results]

    # Several videos often share a channel; look each channel up only once.
    subscribers_by_channel = {}
    video_stats = []
    for video_id in video_ids:
        stats = get_video_stats(video_id)
        channel_id = stats["μ±λ ID"]
        if channel_id not in subscribers_by_channel:
            subscribers_by_channel[channel_id] = get_channel_stats(channel_id)
        stats["ꡬλ μ μ"] = subscribers_by_channel[channel_id]
        video_stats.append(stats)

    return pd.DataFrame(video_stats)
def download_csv(df, filename):
    """Render ``df`` as an HTML download link with the CSV embedded inline.

    Args:
        df: DataFrame to serialise (written without the index column).
        filename: Base name of the downloaded file, without the ``.csv``
            extension. It is HTML-escaped before being placed in the markup.

    Returns:
        An ``<a>`` element string whose ``href`` is a base64 ``data:`` URI
        holding the CSV text.
    """
    import html  # local import: not part of this file's import block

    csv_text = df.to_csv(index=False)
    b64 = base64.b64encode(csv_text.encode()).decode()
    # The name lands inside an attribute value and in the link text, so it
    # must not be able to break out of the markup.
    safe_name = html.escape(str(filename), quote=True)
    return (
        f'<a href="data:text/csv;base64,{b64}" download="{safe_name}.csv">'
        f'λ€μ΄λ‘λ {safe_name} CSV</a>'
    )
def visualize_video_ranking(video_stats_df):
    """Add a views-per-subscriber score column and chart it per video.

    The score column is written onto ``video_stats_df`` itself, which is also
    the frame that is returned.

    Args:
        video_stats_df: Frame produced by ``get_video_data``.

    Returns:
        Tuple ``(video_stats_df, figure, csv_link_html)``. For an empty input
        frame the figure and link are ``None``.
    """
    if video_stats_df.empty:
        # Nothing to score or plot; avoid a KeyError on the missing columns.
        return video_stats_df, None, None

    # A subscriber count of 0 would yield inf (or NaN for 0/0); treat it as
    # "unknown" so the score becomes NaN instead.
    subscribers = video_stats_df["ꡬλ μ μ"].replace(0, np.nan)
    video_stats_df["νμ± μ§μ"] = video_stats_df["μ‘°νμ"] / subscribers

    csv_download_link = download_csv(video_stats_df, "video_stats")

    fig = px.bar(
        video_stats_df,
        x="λμμ ID",
        y="νμ± μ§μ",
        color="μ‘°νμ",
        labels={"λμμ ID": "λμμ ID", "νμ± μ§μ": "νμ± μ§μ"},
        title="λμμ νμ± μ§μ",
    )
    fig.update_layout(height=500, width=500)
    return video_stats_df, fig, csv_download_link
def analyze_titles(video_stats_df, n_clusters=5):
    """Cluster video titles with TF-IDF + KMeans and summarise each cluster.

    A cluster-label column is written onto ``video_stats_df`` as a side
    effect.

    Args:
        video_stats_df: Frame produced by ``get_video_data``.
        n_clusters: Requested number of clusters. It is capped at the number
            of titles, because KMeans cannot fit more clusters than samples.

    Returns:
        pandas.DataFrame with one row per cluster: the cluster index and the
        text returned by ``summarize_cluster``. Empty (two columns, no rows)
        when ``video_stats_df`` is empty.
    """
    if video_stats_df.empty:
        return pd.DataFrame({'ν΄λ¬μ€ν°': [], 'μμ½': []})

    titles = video_stats_df['μ λͺ©'].tolist()
    n_clusters = max(1, min(int(n_clusters), len(titles)))

    tfidf_matrix = TfidfVectorizer().fit_transform(titles)
    kmeans = KMeans(n_clusters=n_clusters, random_state=42)
    kmeans.fit(tfidf_matrix)
    video_stats_df["ν΄λ¬μ€ν°"] = kmeans.labels_

    cluster_summaries = []
    for cluster_id in range(n_clusters):
        in_cluster = video_stats_df["ν΄λ¬μ€ν°"] == cluster_id
        cluster_titles = video_stats_df[in_cluster]['μ λͺ©'].tolist()
        cluster_text = ' '.join(cluster_titles)
        cluster_summaries.append(summarize_cluster(cluster_text, cluster_id))

    return pd.DataFrame(
        {'ν΄λ¬μ€ν°': range(n_clusters), 'μμ½': cluster_summaries}
    )
def summarize_cluster(cluster_text, cluster_num):
    """Ask the hosted language model to summarise one cluster of titles.

    Args:
        cluster_text: Titles of the cluster joined into a single string.
        cluster_num: Index of the cluster. Currently unused; kept so existing
            callers keep working.

    Returns:
        The generated text with surrounding whitespace stripped.
    """
    prompt = f"λ€μ λμμμ λΆμνμ¬ μμ½νκ³ , 500μ μ΄λ΄λ‘ λμμμ νΉμ§ λ° μΈκΈ° μμΈμ μ€λͺ ν΄μ£ΌμΈμ: {cluster_text}"
    # InferenceClient has no ``generate`` method; ``text_generation`` is the
    # text-generation call and returns a plain string when details are not
    # requested.
    # NOTE(review): confirm the configured model is served for the
    # text-generation task and that 512 new tokens is an acceptable budget.
    generated = client.text_generation(prompt, max_new_tokens=512)
    return generated.strip()
def main(query, max_results, period, page, n_clusters=5):
    """Gradio entry point: run a search and build the requested page.

    Args:
        query: Search string. When empty, nothing is fetched.
        max_results: Maximum number of videos to analyse.
        period: Look-back window label from the period dropdown. Any
            unrecognised value (including ``None``) means 30 days.
        page: ``"Video Ranking"`` or ``"Title Analysis"``.
        n_clusters: Number of title clusters for the title-analysis page.

    Returns:
        A 3-tuple matching the interface outputs (table, plot, HTML link).
        The ranking page fills all three; the title-analysis page fills only
        the table. An empty query or unknown page yields
        ``(None, None, None)`` so the interface always receives three values.
    """
    if not query or page not in ("Video Ranking", "Title Analysis"):
        # Checked before any API call so no quota is spent on a request that
        # cannot produce output.
        return None, None, None

    # Period selection: map the dropdown label to a look-back in days,
    # defaulting to one month.
    days_by_period = {"1μ£ΌμΌ": 7, "1κ°μ": 30, "3κ°μ": 90}
    lookback_days = days_by_period.get(period, 30)

    now = datetime.utcnow()
    published_before = now.isoformat("T") + "Z"
    published_after = (now - timedelta(days=lookback_days)).isoformat("T") + "Z"

    video_stats_df = get_video_data(
        query, max_results, published_after, published_before
    )

    if page == "Title Analysis":
        return analyze_titles(video_stats_df, n_clusters), None, None

    video_stats_df, fig, csv_download_link = visualize_video_ranking(video_stats_df)
    return video_stats_df, fig, csv_download_link
# Gradio UI definition: five inputs feeding main(), three outputs
# (table, plot, HTML download link).
iface = gr.Interface(
    fn=main,
    inputs=[
        gr.components.Textbox(label="κ²μ 쿼리"),
        gr.components.Number(
            label="μ΅λ κ²°κ³Ό μ", value=5, precision=0, minimum=1, maximum=1000
        ),
        # Explicit defaults: without them both dropdowns start as None, and
        # main() has no page to render. "1κ°μ" is the window main() already
        # falls back to for an unrecognised period.
        gr.components.Dropdown(
            ["1μ£ΌμΌ", "1κ°μ", "3κ°μ"], label="κΈ°κ°", value="1κ°μ"
        ),
        gr.components.Dropdown(
            ["Video Ranking", "Title Analysis"],
            label="νμ΄μ§",
            value="Video Ranking",
        ),
        gr.components.Number(
            label="ν΄λ¬μ€ν° μ", value=5, precision=0, minimum=2, maximum=10
        ),
    ],
    outputs=[
        gr.components.Dataframe(label="κ²°κ³Ό"),
        gr.components.Plot(label="κ·Έλν"),
        gr.components.HTML(label="CSV λ€μ΄λ‘λ λ§ν¬"),
    ],
    live=False,
    title="YouTube λΆμ λꡬ",
)

if __name__ == "__main__":
    iface.launch()