# Youtube_Anal_v1 / app.py
# AIRider's picture
# Update app.py
# f5fe94e verified
import gradio as gr
import pandas as pd
from googleapiclient.discovery import build
import plotly.express as px
import base64
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.cluster import KMeans
from datetime import datetime, timedelta
import os
from huggingface_hub import InferenceClient # Hugging Face Hub API μ‚¬μš©
# YouTube Data API key used for every `build("youtube", "v3", ...)` call below.
# The YOUTUBE_API_KEY environment variable takes precedence, mirroring how
# HF_TOKEN is read from the environment in create_client.
# SECURITY: the literal fallback is a credential committed to source; it is
# kept only so existing deployments keep working. Treat it as compromised:
# rotate it and remove the fallback once the environment variable is set.
YOUTUBE_API_KEY = (
    os.getenv("YOUTUBE_API_KEY") or "AIzaSyA9DEIHCYexeF2gSFW8cF6E3JTu9BhYxLc"
)
def create_client(model_name):
    """Build a Hugging Face ``InferenceClient`` bound to ``model_name``.

    The access token is taken from the ``HF_TOKEN`` environment variable;
    ``None`` is passed as the token when the variable is unset.
    """
    return InferenceClient(model=model_name, token=os.environ.get("HF_TOKEN"))
# Module-level inference client; summarize_cluster sends its prompts through it.
client = create_client(model_name="CohereForAI/c4ai-command-r-plus")
def get_video_stats(video_id):
    """Fetch the snippet and statistics of one YouTube video.

    Sends a ``videos().list`` request for the ``snippet`` and ``statistics``
    parts of ``video_id`` and reads the first item of the response.

    Returns:
        A dict holding the video id, title, publish time, channel id and the
        view / like / comment counts as ints, keyed by the column labels the
        rest of this file uses. A counter missing from the response is
        reported as 0.

    Raises:
        IndexError: if the response contains no items.
    """
    service = build("youtube", "v3", developerKey=YOUTUBE_API_KEY)
    request = service.videos().list(part="snippet,statistics", id=video_id)
    item = request.execute()["items"][0]

    snippet = item["snippet"]
    title = snippet["title"]
    channel_id = snippet["channelId"]
    published_at = snippet["publishedAt"]

    counters = item["statistics"]
    views, likes, comments = (
        int(counters.get(field, 0))
        for field in ("viewCount", "likeCount", "commentCount")
    )

    return {
        "λ™μ˜μƒ ID": video_id,
        "제λͺ©": title,
        "κ²Œμ‹œ μ‹œκ°„": published_at,
        "채널 ID": channel_id,
        "쑰회수": views,
        "μ’‹μ•„μš” 수": likes,
        "λŒ“κΈ€ 수": comments,
    }
def get_channel_stats(channel_id):
    """Return the subscriber count of a YouTube channel, or 0 when unknown.

    Sends a ``channels().list`` request for the ``statistics`` part of
    ``channel_id``.

    Returns:
        The subscriber count as an int. 0 is returned when the response has
        no items for the channel or when the statistics carry no
        ``subscriberCount`` field.
    """
    youtube = build("youtube", "v3", developerKey=YOUTUBE_API_KEY)
    channel_response = youtube.channels().list(
        part="statistics",
        id=channel_id,
    ).execute()

    # Use .get(): the "items" key may be missing altogether, not just empty.
    items = channel_response.get("items") or []
    if not items:
        return 0

    # subscriberCount can be absent (e.g. the channel hides it); fall back to
    # 0 the same way get_video_stats defaults its missing counters.
    statistics = items[0].get("statistics", {})
    return int(statistics.get("subscriberCount", 0))
def get_video_data(query, max_results, published_after, published_before):
    """Search YouTube and collect per-video statistics into a DataFrame.

    Args:
        query: Search terms passed to ``search().list``.
        max_results: Maximum number of videos to return; coerced to int.
        published_after: Lower publish-time bound forwarded as
            ``publishedAfter``.
        published_before: Upper publish-time bound forwarded as
            ``publishedBefore``.

    Returns:
        A DataFrame with one row per video (the dict from ``get_video_stats``
        plus the channel's subscriber count). The frame is empty when nothing
        is requested or nothing is found.
    """
    max_results = int(max_results)
    if max_results <= 0:
        # Nothing requested: skip building a client and calling the API.
        return pd.DataFrame([])

    youtube = build("youtube", "v3", developerKey=YOUTUBE_API_KEY)

    # Page through the search results (ordered by view count) until enough
    # ids are collected or the API reports no further page.
    video_ids = []
    next_page_token = None
    while len(video_ids) < max_results:
        search_response = youtube.search().list(
            q=query,
            type="video",
            part="id",
            maxResults=50,
            pageToken=next_page_token,
            order="viewCount",
            publishedAfter=published_after,
            publishedBefore=published_before,
        ).execute()
        video_ids.extend(
            item["id"]["videoId"] for item in search_response.get("items", [])
        )
        next_page_token = search_response.get("nextPageToken")
        if not next_page_token:
            break

    # Several results often belong to the same channel; look each channel up
    # once instead of once per video.
    subscribers_by_channel = {}
    video_stats = []
    for video_id in video_ids[:max_results]:
        stats = get_video_stats(video_id)
        channel_id = stats["채널 ID"]
        if channel_id not in subscribers_by_channel:
            subscribers_by_channel[channel_id] = get_channel_stats(channel_id)
        stats["κ΅¬λ…μž 수"] = subscribers_by_channel[channel_id]
        video_stats.append(stats)

    return pd.DataFrame(video_stats)
def download_csv(df, filename):
    """Return an HTML anchor that downloads ``df`` as ``<filename>.csv``.

    The frame is serialised without its index, UTF-8 encoded, and embedded in
    the link as a base64 ``data:`` URL.

    Args:
        df: DataFrame to export.
        filename: Download name without the ``.csv`` extension; it is also
            shown in the link text.

    Returns:
        The ``<a ...>`` element as a string.
    """
    csv_text = df.to_csv(index=False)
    payload = base64.b64encode(csv_text.encode("utf-8")).decode("ascii")
    # text/csv is the registered CSV media type; the explicit charset keeps
    # non-ASCII cell text from being decoded with the wrong encoding.
    return (
        f'<a href="data:text/csv;charset=utf-8;base64,{payload}" '
        f'download="{filename}.csv">λ‹€μš΄λ‘œλ“œ {filename} CSV</a>'
    )
def visualize_video_ranking(video_stats_df):
    """Add the activity index column and build its bar chart and CSV link.

    The activity index is the view count divided by the subscriber count. It
    is written into ``video_stats_df`` in place as a new column.

    Args:
        video_stats_df: Frame with the video id, view count and subscriber
            count columns.

    Returns:
        A ``(dataframe, figure, csv_link_html)`` tuple.
    """
    subscribers = video_stats_df["κ΅¬λ…μž 수"]
    # A subscriber count of 0 would turn the ratio into inf (or NaN for 0/0).
    # Mask those rows so the index is simply missing instead.
    video_stats_df["ν™œμ„± μ§€μˆ˜"] = video_stats_df["쑰회수"] / subscribers.where(
        subscribers > 0
    )

    csv_download_link = download_csv(video_stats_df, "video_stats")

    fig = px.bar(
        video_stats_df,
        x="λ™μ˜μƒ ID",
        y="ν™œμ„± μ§€μˆ˜",
        color="쑰회수",
        labels={"λ™μ˜μƒ ID": "λ™μ˜μƒ ID", "ν™œμ„± μ§€μˆ˜": "ν™œμ„± μ§€μˆ˜"},
        title="λ™μ˜μƒ ν™œμ„± μ§€μˆ˜",
    )
    fig.update_layout(height=500, width=500)
    return video_stats_df, fig, csv_download_link
def analyze_titles(video_stats_df, n_clusters=5):
    """Cluster the video titles and summarise each cluster.

    Titles are vectorised with TF-IDF and grouped with KMeans
    (``random_state=42``). The cluster label of every row is written into
    ``video_stats_df`` in place, and the concatenated titles of each cluster
    are passed to ``summarize_cluster``.

    Args:
        video_stats_df: Frame containing the title column.
        n_clusters: Requested number of clusters. It is capped at the number
            of titles, because KMeans cannot form more clusters than samples.

    Returns:
        A DataFrame with one row per cluster: the cluster number and its
        summary. The frame is empty when there are no titles.
    """
    titles = video_stats_df['제λͺ©'].tolist()
    if not titles:
        # Nothing to cluster; return the same two-column shape with no rows.
        return pd.DataFrame({'ν΄λŸ¬μŠ€ν„°': [], 'μš”μ•½': []})

    n_clusters = max(1, min(int(n_clusters), len(titles)))

    tfidf_matrix = TfidfVectorizer().fit_transform(titles)
    kmeans = KMeans(n_clusters=n_clusters, random_state=42)
    kmeans.fit(tfidf_matrix)
    labels = kmeans.labels_
    video_stats_df["ν΄λŸ¬μŠ€ν„°"] = labels

    cluster_summaries = [
        summarize_cluster(
            ' '.join(
                title for title, label in zip(titles, labels) if label == cluster
            ),
            cluster,
        )
        for cluster in range(n_clusters)
    ]
    return pd.DataFrame(
        {'ν΄λŸ¬μŠ€ν„°': range(n_clusters), 'μš”μ•½': cluster_summaries}
    )
def summarize_cluster(cluster_text, cluster_num):
    """Ask the shared inference client to summarise one cluster of titles.

    Args:
        cluster_text: The cluster's titles joined into one string.
        cluster_num: Index of the cluster. Not used in the prompt; kept so the
            signature stays unchanged for callers.

    Returns:
        The generated summary with surrounding whitespace removed, or an empty
        string when ``cluster_text`` is empty or blank (no request is sent).
    """
    if not cluster_text or not cluster_text.strip():
        return ""

    prompt = f"λ‹€μŒ λ™μ˜μƒμ„ λΆ„μ„ν•˜μ—¬ μš”μ•½ν•˜κ³ , 500자 μ΄λ‚΄λ‘œ λ™μ˜μƒμ˜ νŠΉμ§• 및 인기 μš”μΈμ„ μ„€λͺ…ν•΄μ£Όμ„Έμš”: {cluster_text}"
    # InferenceClient exposes text_generation(), not generate(); without
    # details=True it returns the generated text as a plain str.
    generated = client.text_generation(prompt, max_new_tokens=512)
    return generated.strip()
def main(query, max_results, period, page, n_clusters=5):
    """Gradio callback: run a search and render the selected page.

    Args:
        query: Search terms. An empty query produces empty outputs.
        max_results: Maximum number of videos to analyse.
        period: One of the period dropdown values (7, 30 or 90 days back from
            now); any other value, including ``None``, means 30 days.
        page: ``"Video Ranking"`` or ``"Title Analysis"``.
        n_clusters: Number of title clusters for the title analysis page.

    Returns:
        Always a 3-tuple ``(dataframe, figure, csv_link_html)`` matching the
        three interface outputs; entries that do not apply are ``None``.
    """
    empty_outputs = (None, None, None)
    # Reject unusable input before spending any API calls on it.
    if not query or page not in ("Video Ranking", "Title Analysis"):
        return empty_outputs

    # Period setup: translate the dropdown value into a look-back window.
    lookback_days = {"1주일": 7, "1κ°œμ›”": 30, "3κ°œμ›”": 90}.get(
        period, 30
    )  # default: 1 month
    now = datetime.utcnow()
    published_before = now.isoformat("T") + "Z"
    published_after = (now - timedelta(days=lookback_days)).isoformat("T") + "Z"

    video_stats_df = get_video_data(
        query, max_results, published_after, published_before
    )
    if video_stats_df.empty:
        # No results: the expected columns are absent, so skip the pages that
        # index into them.
        return video_stats_df, None, None

    if page == "Video Ranking":
        video_stats_df, fig, csv_download_link = visualize_video_ranking(
            video_stats_df
        )
        return video_stats_df, fig, csv_download_link

    cluster_summary_df = analyze_titles(video_stats_df, n_clusters)
    return cluster_summary_df, None, None
# Input widgets, in the positional order of main's parameters:
# query, max_results, period, page, n_clusters.
_input_components = [
    gr.components.Textbox(label="검색 쿼리"),
    gr.components.Number(
        label="μ΅œλŒ€ κ²°κ³Ό 수", value=5, precision=0, minimum=1, maximum=1000
    ),
    gr.components.Dropdown(["1주일", "1κ°œμ›”", "3κ°œμ›”"], label="κΈ°κ°„"),
    gr.components.Dropdown(["Video Ranking", "Title Analysis"], label="νŽ˜μ΄μ§€"),
    gr.components.Number(
        label="ν΄λŸ¬μŠ€ν„° 수", value=5, precision=0, minimum=2, maximum=10
    ),
]

# Output widgets, matching main's (dataframe, figure, csv link) return tuple.
_output_components = [
    gr.components.Dataframe(label="κ²°κ³Ό"),
    gr.components.Plot(label="κ·Έλž˜ν”„"),
    gr.components.HTML(label="CSV λ‹€μš΄λ‘œλ“œ 링크"),
]

# Gradio interface wiring the widgets to main; live=False is passed explicitly.
iface = gr.Interface(
    fn=main,
    inputs=_input_components,
    outputs=_output_components,
    live=False,
    title="YouTube 뢄석 도ꡬ",
)
# Launch the Gradio app only when this file is executed as a script.
if __name__ == "__main__":
    iface.launch()