# N_B_analysis-3 / app.py
import gradio as gr
import requests
from bs4 import BeautifulSoup
import urllib.parse  # used to resolve the iframe path to an absolute URL
import re
import logging
import tempfile
import pandas as pd
import mecab  # python-mecab-ko library
import os
import time
import hmac
import hashlib
import base64
# Helper for debug logging
def debug_log(message: str):
    print(f"[DEBUG] {message}")
# [Base code] - Naver blog scraping feature
def scrape_naver_blog(url: str) -> str:
    debug_log("scrape_naver_blog started")
    debug_log(f"Requested URL: {url}")
    headers = {
        "User-Agent": (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/96.0.4664.110 Safari/537.36"
        )
    }
    try:
        # 1) Request the blog's 'main' page
        response = requests.get(url, headers=headers)
        debug_log("HTTP GET request (main page) complete")
        if response.status_code != 200:
            debug_log(f"Request failed, status code: {response.status_code}")
            return f"An error occurred. Status code: {response.status_code}"

        # 2) Parse the main page
        soup = BeautifulSoup(response.text, "html.parser")
        debug_log("HTML parsing (main page) complete")

        # 3) Find the iframe that holds the actual post
        iframe = soup.select_one("iframe#mainFrame")
        if not iframe:
            debug_log("Could not find the iframe#mainFrame tag.")
            return "Could not find the content iframe."
        iframe_src = iframe.get("src")
        if not iframe_src:
            debug_log("The iframe has no src attribute.")
            return "Could not find the src of the content iframe."

        # 4) Resolve the iframe src against the page URL (make it absolute)
        parsed_iframe_url = urllib.parse.urljoin(url, iframe_src)
        debug_log(f"iframe request URL: {parsed_iframe_url}")

        # 5) Request and parse the iframe page
        iframe_response = requests.get(parsed_iframe_url, headers=headers)
        debug_log("HTTP GET request (iframe page) complete")
        if iframe_response.status_code != 200:
            debug_log(f"iframe request failed, status code: {iframe_response.status_code}")
            return f"An error occurred in the iframe. Status code: {iframe_response.status_code}"
        iframe_soup = BeautifulSoup(iframe_response.text, "html.parser")
        debug_log("HTML parsing (iframe page) complete")

        # 6) Extract the title and body
        title_div = iframe_soup.select_one('.se-module.se-module-text.se-title-text')
        title = title_div.get_text(strip=True) if title_div else "Title not found."
        debug_log(f"Extracted title: {title}")
        content_div = iframe_soup.select_one('.se-main-container')
        if content_div:
            content = content_div.get_text("\n", strip=True)
        else:
            content = "Body not found."
        debug_log("Body extraction complete")

        result = f"[Title]\n{title}\n\n[Body]\n{content}"
        debug_log("Title and body combined; ready to return")
        return result
    except Exception as e:
        debug_log(f"Error: {str(e)}")
        return f"An error occurred while scraping: {str(e)}"
# [Reference code 1] Morphological analysis feature
def analyze_text(text: str):
    logging.basicConfig(level=logging.DEBUG)
    logger = logging.getLogger(__name__)
    logger.debug("Original text: %s", text)

    # 1. Keep only Hangul syllables (strips spaces, Latin letters, symbols, etc.)
    filtered_text = re.sub(r'[^가-힣]', '', text)
    logger.debug("Filtered text (Korean only, spaces removed): %s", filtered_text)
    if not filtered_text:
        logger.debug("No valid Korean text.")
        return pd.DataFrame(columns=["Word", "Frequency"]), ""

    # 2. Morphological analysis with MeCab (extract nouns and compound nouns only)
    mecab_instance = mecab.MeCab()
    tokens = mecab_instance.pos(filtered_text)
    logger.debug("Morphological analysis result: %s", tokens)
    freq = {}
    for word, pos in tokens:
        if word and word.strip():
            # Noun POS tags in mecab-ko start with "NN" (NNG, NNP, ...)
            if pos.startswith("NN"):
                freq[word] = freq.get(word, 0) + 1
                logger.debug("Word: %s, POS: %s, current count: %d", word, pos, freq[word])

    # 3. Sort by frequency in descending order
    sorted_freq = sorted(freq.items(), key=lambda x: x[1], reverse=True)
    logger.debug("Word frequencies sorted in descending order: %s", sorted_freq)

    # 4. Build the result DataFrame
    df = pd.DataFrame(sorted_freq, columns=["Word", "Frequency"])
    logger.debug("Result DataFrame created, shape: %s", df.shape)

    # 5. Write the result to a temporary Excel file
    temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".xlsx")
    df.to_excel(temp_file.name, index=False, engine='openpyxl')
    temp_file.close()
    logger.debug("Excel file created: %s", temp_file.name)
    return df, temp_file.name
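
# Example sketch: analyze_text("네이버 블로그 네이버") first collapses the input
# to "네이버블로그네이버", then (assuming python-mecab-ko tags these words as
# nouns) returns a DataFrame like [("네이버", 2), ("블로그", 1)] plus the path
# of the generated Excel file.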
# [Reference code 2] Naver Ads API: search volume and blog post count lookup
def generate_signature(timestamp, method, uri, secret_key):
    message = f"{timestamp}.{method}.{uri}"
    digest = hmac.new(secret_key.encode("utf-8"), message.encode("utf-8"), hashlib.sha256).digest()
    return base64.b64encode(digest).decode()
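
# Signing sketch with hypothetical values: for timestamp "1700000000000",
# method "GET", and uri "/keywordstool", the signed message is
# "1700000000000.GET./keywordstool"; its HMAC-SHA256 digest, keyed with the
# secret and Base64-encoded, is the value sent in the X-Signature header.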
def get_header(method, uri, api_key, secret_key, customer_id):
    timestamp = str(round(time.time() * 1000))
    signature = generate_signature(timestamp, method, uri, secret_key)
    return {
        "Content-Type": "application/json; charset=UTF-8",
        "X-Timestamp": timestamp,
        "X-API-KEY": api_key,
        "X-Customer": str(customer_id),
        "X-Signature": signature
    }
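
# Note (assumption about the API's behavior): X-Timestamp must be the same
# millisecond value that was signed above; if the two drift apart, the
# server-side signature check should fail and the request be rejected.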
def fetch_related_keywords(keyword):
    debug_log(f"fetch_related_keywords called, keyword: {keyword}")
    API_KEY = os.environ["NAVER_API_KEY"]
    SECRET_KEY = os.environ["NAVER_SECRET_KEY"]
    CUSTOMER_ID = os.environ["NAVER_CUSTOMER_ID"]
    BASE_URL = "https://api.naver.com"
    uri = "/keywordstool"
    method = "GET"
    headers = get_header(method, uri, API_KEY, SECRET_KEY, CUSTOMER_ID)
    params = {
        "hintKeywords": [keyword],  # requests encodes the list as a query parameter
        "showDetail": "1"
    }
    response = requests.get(BASE_URL + uri, params=params, headers=headers)
    data = response.json()
    if "keywordList" not in data:
        return pd.DataFrame()
    df = pd.DataFrame(data["keywordList"])
    if len(df) > 100:
        df = df.head(100)  # cap the result at the top 100 keywords

    def parse_count(x):
        # The API reports very low volumes as strings such as "< 10", which
        # int() cannot parse; treat those (and other malformed values) as 0.
        try:
            return int(str(x).replace(",", ""))
        except (ValueError, TypeError):
            return 0

    df["PC Monthly Searches"] = df["monthlyPcQcCnt"].apply(parse_count)
    df["Mobile Monthly Searches"] = df["monthlyMobileQcCnt"].apply(parse_count)
    df["Total Monthly Searches"] = df["PC Monthly Searches"] + df["Mobile Monthly Searches"]
    df.rename(columns={"relKeyword": "Keyword"}, inplace=True)
    result_df = df[["Keyword", "PC Monthly Searches", "Mobile Monthly Searches", "Total Monthly Searches"]]
    debug_log("fetch_related_keywords complete")
    return result_df
def fetch_blog_count(keyword):
    debug_log(f"fetch_blog_count called, keyword: {keyword}")
    client_id = os.environ["NAVER_SEARCH_CLIENT_ID"]
    client_secret = os.environ["NAVER_SEARCH_CLIENT_SECRET"]
    url = "https://openapi.naver.com/v1/search/blog.json"
    headers = {
        "X-Naver-Client-Id": client_id,
        "X-Naver-Client-Secret": client_secret
    }
    params = {"query": keyword, "display": 1}
    response = requests.get(url, headers=headers, params=params)
    if response.status_code == 200:
        data = response.json()
        debug_log(f"fetch_blog_count result: {data.get('total', 0)}")
        return data.get("total", 0)
    else:
        debug_log(f"fetch_blog_count error, status code: {response.status_code}")
        return 0
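
# The Search API's "total" field is the count of blog documents matching the
# query; that count is what the app reports as the per-keyword blog post count.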
def create_excel_file(df):
    # Reserve a temporary .xlsx path, then write to it after the handle closes
    with tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False) as tmp:
        excel_path = tmp.name
    df.to_excel(excel_path, index=False)
    debug_log(f"Excel file created: {excel_path}")
    return excel_path
def process_keyword(keywords: str, include_related: bool):
    debug_log(f"process_keyword called, keywords: {keywords}, include related keywords: {include_related}")
    # One keyword per line
    input_keywords = [k.strip() for k in keywords.splitlines() if k.strip()]
    result_dfs = []
    for idx, kw in enumerate(input_keywords):
        df_kw = fetch_related_keywords(kw)
        if df_kw.empty:
            continue
        row_kw = df_kw[df_kw["Keyword"] == kw]
        if not row_kw.empty:
            result_dfs.append(row_kw)
        else:
            result_dfs.append(df_kw.head(1))
        # Related keywords are appended only for the first input keyword
        if include_related and idx == 0:
            df_related = df_kw[df_kw["Keyword"] != kw]
            if not df_related.empty:
                result_dfs.append(df_related)
    if result_dfs:
        result_df = pd.concat(result_dfs, ignore_index=True)
        result_df.drop_duplicates(subset=["Keyword"], inplace=True)
    else:
        result_df = pd.DataFrame(columns=["Keyword", "PC Monthly Searches", "Mobile Monthly Searches", "Total Monthly Searches"])
    result_df["Blog Post Count"] = result_df["Keyword"].apply(fetch_blog_count)
    result_df.sort_values(by="Total Monthly Searches", ascending=False, inplace=True)
    debug_log("process_keyword complete")
    return result_df, create_excel_file(result_df)
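
# Usage sketch (assumes the NAVER_* environment variables are set; the
# keywords are hypothetical):
# df, xlsx_path = process_keyword("맛집\n카페", include_related=True)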
# Morphological analysis enriched with search volume and blog post counts,
# combining [Reference code 1] and [Reference code 2] (with an option to drop frequency-1 words)
def morphological_analysis_and_enrich(text: str, remove_freq1: bool):
    debug_log("morphological_analysis_and_enrich started")
    df_freq, _ = analyze_text(text)
    if df_freq.empty:
        debug_log("Morphological analysis returned an empty DataFrame.")
        return df_freq, ""
    if remove_freq1:
        before_shape = df_freq.shape
        df_freq = df_freq[df_freq["Frequency"] != 1]
        debug_log(f"Frequency-1 words removed. {before_shape} -> {df_freq.shape}")

    # Turn the analyzed words into the newline-separated list process_keyword expects
    keywords = "\n".join(df_freq["Word"].tolist())
    debug_log(f"Analyzed keywords: {keywords}")

    # Use [Reference code 2] to look up search volume and blog post counts (without related keywords)
    df_keyword_info, _ = process_keyword(keywords, include_related=False)
    debug_log("Search volume and blog post count lookup complete")

    # Merge the frequency table with the search volume data, keyed on the word
    merged_df = pd.merge(df_freq, df_keyword_info, left_on="Word", right_on="Keyword", how="left")
    merged_df.drop(columns=["Keyword"], inplace=True)

    # Write the merged result to an Excel file
    merged_excel_path = create_excel_file(merged_df)
    debug_log("morphological_analysis_and_enrich complete")
    return merged_df, merged_excel_path
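
# End-to-end flow: scraped text -> noun frequency table -> per-word search
# volume and blog post counts, merged on the word. Words the keyword tool
# returns no data for keep NaN in the merged columns because of the left join.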
# Newly added feature: scrape the given blog link and show the result in an editable text box
def fetch_blog_content(url: str):
    debug_log("fetch_blog_content started")
    content = scrape_naver_blog(url)
    debug_log("fetch_blog_content complete")
    return content
# Gradio interface (single tab)
with gr.Blocks(title="Naver Blog Morphological Analysis Space", css=".gradio-container { max-width: 960px; margin: auto; }") as demo:
    gr.Markdown("# Naver Blog Morphological Analysis Space")
    with gr.Row():
        blog_url_input = gr.Textbox(label="Naver blog link", placeholder="e.g. https://blog.naver.com/ssboost/222983068507", lines=1)
    with gr.Row():
        scrape_button = gr.Button("Run Scraping")
    with gr.Row():
        blog_content_box = gr.Textbox(label="Blog content (editable)", lines=10, placeholder="The scraped blog content will appear here.")
    with gr.Row():
        remove_freq_checkbox = gr.Checkbox(label="Remove frequency-1 words", value=False)
    with gr.Row():
        analyze_button = gr.Button("Run Analysis")
    with gr.Row():
        analysis_result = gr.Dataframe(label="Analysis results (Word, Frequency, search volume, blog post count, etc.)")
    with gr.Row():
        analysis_excel = gr.File(label="Excel download")

    # "Run Scraping" pulls the post body from the URL into the editable text box
    scrape_button.click(fn=fetch_blog_content, inputs=blog_url_input, outputs=blog_content_box)
    # "Run Analysis" runs morphological analysis plus search volume / blog post count lookup on the (possibly edited) content
    analyze_button.click(fn=morphological_analysis_and_enrich, inputs=[blog_content_box, remove_freq_checkbox], outputs=[analysis_result, analysis_excel])
if __name__ == "__main__":
    debug_log("Launching Gradio app")
    demo.launch()
    debug_log("Gradio app exited")