import gradio as gr
import requests
from bs4 import BeautifulSoup
import urllib.parse  # used to resolve the iframe's (possibly relative) URL
import re
import logging
import tempfile
import pandas as pd
import mecab  # python-mecab-ko library
import os
import time
import hmac
import hashlib
import base64
# Debug logging helper
def debug_log(message: str):
    print(f"[DEBUG] {message}")
# --- Naver blog scraping ---
def scrape_naver_blog(url: str) -> str:
    debug_log("scrape_naver_blog started")
    debug_log(f"Requested URL: {url}")
    headers = {
        "User-Agent": (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/96.0.4664.110 Safari/537.36"
        )
    }
    try:
        # timeout added so a stalled connection cannot hang the app indefinitely
        response = requests.get(url, headers=headers, timeout=10)
        debug_log("HTTP GET request (main page) completed")
        if response.status_code != 200:
            debug_log(f"Request failed, status code: {response.status_code}")
            return f"였λ₯˜κ°€ λ°œμƒν–ˆμŠ΅λ‹ˆλ‹€. μƒνƒœμ½”λ“œ: {response.status_code}"
        soup = BeautifulSoup(response.text, "html.parser")
        debug_log("HTML parsing (main page) completed")
        # Naver blog pages render the actual post inside iframe#mainFrame
        iframe = soup.select_one("iframe#mainFrame")
        if not iframe:
            debug_log("iframe#mainFrame tag not found.")
            return "λ³Έλ¬Έ iframe을 찾을 수 μ—†μŠ΅λ‹ˆλ‹€."
        iframe_src = iframe.get("src")
        if not iframe_src:
            debug_log("iframe src does not exist.")
            return "λ³Έλ¬Έ iframe의 srcλ₯Ό 찾을 수 μ—†μŠ΅λ‹ˆλ‹€."
        # Resolve the iframe src against the original URL
        parsed_iframe_url = urllib.parse.urljoin(url, iframe_src)
        debug_log(f"iframe page request URL: {parsed_iframe_url}")
        iframe_response = requests.get(parsed_iframe_url, headers=headers, timeout=10)
        debug_log("HTTP GET request (iframe page) completed")
        if iframe_response.status_code != 200:
            debug_log(f"iframe request failed, status code: {iframe_response.status_code}")
            return f"iframeμ—μ„œ 였λ₯˜κ°€ λ°œμƒν–ˆμŠ΅λ‹ˆλ‹€. μƒνƒœμ½”λ“œ: {iframe_response.status_code}"
        iframe_soup = BeautifulSoup(iframe_response.text, "html.parser")
        debug_log("HTML parsing (iframe page) completed")
        title_div = iframe_soup.select_one('.se-module.se-module-text.se-title-text')
        title = title_div.get_text(strip=True) if title_div else "제λͺ©μ„ 찾을 수 μ—†μŠ΅λ‹ˆλ‹€."
        debug_log(f"Extracted title: {title}")
        content_div = iframe_soup.select_one('.se-main-container')
        if content_div:
            content = content_div.get_text("\n", strip=True)
        else:
            content = "본문을 찾을 수 μ—†μŠ΅λ‹ˆλ‹€."
        debug_log("Body extraction completed")
        result = f"[제λͺ©]\n{title}\n\n[λ³Έλ¬Έ]\n{content}"
        debug_log("Title and body combined")
        return result
    except Exception as e:
        debug_log(f"Error occurred: {str(e)}")
        return f"μŠ€ν¬λž˜ν•‘ 쀑 였λ₯˜κ°€ λ°œμƒν–ˆμŠ΅λ‹ˆλ‹€: {str(e)}"
# --- Morphological analysis (reference code 1) ---
def analyze_text(text: str):
    # basicConfig is a no-op after the first call, so repeated invocations are safe
    logging.basicConfig(level=logging.DEBUG)
    logger = logging.getLogger(__name__)
    logger.debug("Original text: %s", text)
    # Keep whitespace alongside Hangul so words from different positions are
    # not fused together before tokenization
    filtered_text = re.sub(r'[^κ°€-힣\s]', ' ', text)
    logger.debug("Filtered text: %s", filtered_text)
    if not filtered_text.strip():
        logger.debug("No valid Korean text.")
        return pd.DataFrame(columns=["단어", "λΉˆλ„μˆ˜"]), ""
    mecab_instance = mecab.MeCab()
    tokens = mecab_instance.pos(filtered_text)
    logger.debug("Morphological analysis result: %s", tokens)
    freq = {}
    for word, pos in tokens:
        # Count only nouns (POS tags starting with "NN": NNG, NNP, ...)
        if word and word.strip() and pos.startswith("NN"):
            freq[word] = freq.get(word, 0) + 1
            logger.debug("Word: %s, POS: %s, frequency: %d", word, pos, freq[word])
    sorted_freq = sorted(freq.items(), key=lambda x: x[1], reverse=True)
    logger.debug("Sorted word frequencies: %s", sorted_freq)
    df = pd.DataFrame(sorted_freq, columns=["단어", "λΉˆλ„μˆ˜"])
    logger.debug("Morphological analysis DataFrame created, shape: %s", df.shape)
    # Reserve a temp path, then write after the handle is closed (Windows-safe)
    with tempfile.NamedTemporaryFile(delete=False, suffix=".xlsx") as tmp:
        excel_path = tmp.name
    df.to_excel(excel_path, index=False, engine='openpyxl')
    logger.debug("Excel file created: %s", excel_path)
    return df, excel_path
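# Behavior sketch (hedged): python-mecab-ko's pos() returns (surface, tag)
# pairs, and only "NN*" tags are counted above. Illustrative output only;
# exact tags depend on the installed dictionary.
#
#   >>> mecab.MeCab().pos("넀이버 λΈ”λ‘œκ·Έ 뢄석")
#   [('넀이버', 'NNP'), ('λΈ”λ‘œκ·Έ', 'NNG'), ('뢄석', 'NNG')]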
# --- Naver Search and Ads API helpers (reference code 2) ---
def generate_signature(timestamp, method, uri, secret_key):
    # The Naver Searchad API signs "timestamp.method.uri" with HMAC-SHA256
    message = f"{timestamp}.{method}.{uri}"
    digest = hmac.new(secret_key.encode("utf-8"), message.encode("utf-8"), hashlib.sha256).digest()
    return base64.b64encode(digest).decode()
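# Signature sketch (hedged values): for timestamp "1700000000000" and a GET on
# "/keywordstool", the signed message is "1700000000000.GET./keywordstool";
# the result is the Base64 form of the 32-byte HMAC-SHA256 digest (44 chars).
#
#   >>> sig = generate_signature("1700000000000", "GET", "/keywordstool", "my-secret")
#   >>> len(sig)
#   44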
def get_header(method, uri, api_key, secret_key, customer_id):
    timestamp = str(round(time.time() * 1000))  # current time in milliseconds
    signature = generate_signature(timestamp, method, uri, secret_key)
    return {
        "Content-Type": "application/json; charset=UTF-8",
        "X-Timestamp": timestamp,
        "X-API-KEY": api_key,
        "X-Customer": str(customer_id),
        "X-Signature": signature
    }
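# Header sketch (hedged): the values below are illustrative placeholders,
# not real credentials; timestamp and signature vary per call.
#
#   >>> get_header("GET", "/keywordstool", "api-key", "secret", "123")
#   {'Content-Type': 'application/json; charset=UTF-8', 'X-Timestamp': '...',
#    'X-API-KEY': 'api-key', 'X-Customer': '123', 'X-Signature': '...'}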
def fetch_related_keywords(keyword):
    debug_log(f"fetch_related_keywords called, keyword: {keyword}")
    API_KEY = os.environ["NAVER_API_KEY"]
    SECRET_KEY = os.environ["NAVER_SECRET_KEY"]
    CUSTOMER_ID = os.environ["NAVER_CUSTOMER_ID"]
    BASE_URL = "https://api.naver.com"
    uri = "/keywordstool"
    method = "GET"
    headers = get_header(method, uri, API_KEY, SECRET_KEY, CUSTOMER_ID)
    params = {
        "hintKeywords": [keyword],
        "showDetail": "1"
    }
    response = requests.get(BASE_URL + uri, params=params, headers=headers, timeout=10)
    data = response.json()
    if "keywordList" not in data:
        return pd.DataFrame()
    df = pd.DataFrame(data["keywordList"])
    if len(df) > 100:
        df = df.head(100)  # cap at the first 100 related keywords

    def parse_count(x):
        # Counts may arrive as strings such as "1,000" or "< 10"; fall back to 0
        try:
            return int(str(x).replace(",", ""))
        except (ValueError, TypeError):
            return 0

    df["PCμ›”κ²€μƒ‰λŸ‰"] = df["monthlyPcQcCnt"].apply(parse_count)
    df["λͺ¨λ°”μΌμ›”κ²€μƒ‰λŸ‰"] = df["monthlyMobileQcCnt"].apply(parse_count)
    df["ν† νƒˆμ›”κ²€μƒ‰λŸ‰"] = df["PCμ›”κ²€μƒ‰λŸ‰"] + df["λͺ¨λ°”μΌμ›”κ²€μƒ‰λŸ‰"]
    df.rename(columns={"relKeyword": "μ •λ³΄ν‚€μ›Œλ“œ"}, inplace=True)
    result_df = df[["μ •λ³΄ν‚€μ›Œλ“œ", "PCμ›”κ²€μƒ‰λŸ‰", "λͺ¨λ°”μΌμ›”κ²€μƒ‰λŸ‰", "ν† νƒˆμ›”κ²€μƒ‰λŸ‰"]]
    debug_log("fetch_related_keywords completed")
    return result_df
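# Response sketch (hedged): a keywordList entry is assumed to look roughly
# like the following; only the three fields used above matter here.
#
#   {"relKeyword": "ν‚€μ›Œλ“œ1", "monthlyPcQcCnt": "1,200",
#    "monthlyMobileQcCnt": "3,400", ...}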
def fetch_blog_count(keyword):
    debug_log(f"fetch_blog_count called, keyword: {keyword}")
    client_id = os.environ["NAVER_SEARCH_CLIENT_ID"]
    client_secret = os.environ["NAVER_SEARCH_CLIENT_SECRET"]
    url = "https://openapi.naver.com/v1/search/blog.json"
    headers = {
        "X-Naver-Client-Id": client_id,
        "X-Naver-Client-Secret": client_secret
    }
    # display=1: only the total count is needed, not the actual results
    params = {"query": keyword, "display": 1}
    response = requests.get(url, headers=headers, params=params, timeout=10)
    if response.status_code == 200:
        data = response.json()
        debug_log(f"fetch_blog_count result: {data.get('total', 0)}")
        return data.get("total", 0)
    else:
        debug_log(f"fetch_blog_count error, status code: {response.status_code}")
        return 0
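# Response sketch (hedged): the blog search endpoint is assumed to return JSON
# with a "total" field for the number of matching documents, e.g.
#
#   {"total": 12345, "start": 1, "display": 1, "items": [...]}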
def create_excel_file(df):
    # Reserve a temp path, then write after the handle is closed (Windows-safe)
    with tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False) as tmp:
        excel_path = tmp.name
    df.to_excel(excel_path, index=False)
    debug_log(f"Excel file created: {excel_path}")
    return excel_path
def process_keyword(keywords: str, include_related: bool):
    debug_log(f"process_keyword called, keywords: {keywords}, include related searches: {include_related}")
    # One keyword per line; blank lines are ignored
    input_keywords = [k.strip() for k in keywords.splitlines() if k.strip()]
    result_dfs = []
    for idx, kw in enumerate(input_keywords):
        df_kw = fetch_related_keywords(kw)
        if df_kw.empty:
            continue
        row_kw = df_kw[df_kw["μ •λ³΄ν‚€μ›Œλ“œ"] == kw]
        if not row_kw.empty:
            result_dfs.append(row_kw)
        else:
            result_dfs.append(df_kw.head(1))
        # Related keywords are appended only for the first input keyword
        if include_related and idx == 0:
            df_related = df_kw[df_kw["μ •λ³΄ν‚€μ›Œλ“œ"] != kw]
            if not df_related.empty:
                result_dfs.append(df_related)
    if result_dfs:
        result_df = pd.concat(result_dfs, ignore_index=True)
        result_df.drop_duplicates(subset=["μ •λ³΄ν‚€μ›Œλ“œ"], inplace=True)
    else:
        result_df = pd.DataFrame(columns=["μ •λ³΄ν‚€μ›Œλ“œ", "PCμ›”κ²€μƒ‰λŸ‰", "λͺ¨λ°”μΌμ›”κ²€μƒ‰λŸ‰", "ν† νƒˆμ›”κ²€μƒ‰λŸ‰"])
    result_df["λΈ”λ‘œκ·Έλ¬Έμ„œμˆ˜"] = result_df["μ •λ³΄ν‚€μ›Œλ“œ"].apply(fetch_blog_count)
    result_df.sort_values(by="ν† νƒˆμ›”κ²€μƒ‰λŸ‰", ascending=False, inplace=True)
    debug_log("process_keyword completed")
    return result_df, create_excel_file(result_df)
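# Usage sketch (hedged): requires the NAVER_* environment variables to be set;
# keyword strings here are placeholders.
#
#   >>> df, xlsx = process_keyword("ν‚€μ›Œλ“œ1\nν‚€μ›Œλ“œ2", include_related=False)
#   >>> list(df.columns)
#   ['μ •λ³΄ν‚€μ›Œλ“œ', 'PCμ›”κ²€μƒ‰λŸ‰', 'λͺ¨λ°”μΌμ›”κ²€μƒ‰λŸ‰', 'ν† νƒˆμ›”κ²€μƒ‰λŸ‰', 'λΈ”λ‘œκ·Έλ¬Έμ„œμˆ˜']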
# --- Merge morphological analysis with search volume / blog document counts ---
def morphological_analysis_and_enrich(text: str, remove_freq1: bool):
    debug_log("morphological_analysis_and_enrich started")
    df_freq, _ = analyze_text(text)
    if df_freq.empty:
        debug_log("Morphological analysis returned an empty DataFrame.")
        return df_freq, ""
    if remove_freq1:
        before_shape = df_freq.shape
        df_freq = df_freq[df_freq["λΉˆλ„μˆ˜"] != 1]
        debug_log(f"Frequency-1 removal applied. {before_shape} -> {df_freq.shape}")
    keywords = "\n".join(df_freq["단어"].tolist())
    debug_log(f"Analyzed keywords: {keywords}")
    df_keyword_info, _ = process_keyword(keywords, include_related=False)
    debug_log("Search volume and blog document count lookup completed")
    merged_df = pd.merge(df_freq, df_keyword_info, left_on="단어", right_on="μ •λ³΄ν‚€μ›Œλ“œ", how="left")
    merged_df.drop(columns=["μ •λ³΄ν‚€μ›Œλ“œ"], inplace=True)
    merged_excel_path = create_excel_file(merged_df)
    debug_log("morphological_analysis_and_enrich completed")
    return merged_df, merged_excel_path
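# Merge sketch (hedged, illustrative numbers): the left join keeps every
# extracted word even when the keyword API returns no row for it (those
# cells become NaN), e.g.
#
#   단어    λΉˆλ„μˆ˜  PCμ›”κ²€μƒ‰λŸ‰  λͺ¨λ°”μΌμ›”κ²€μƒ‰λŸ‰  ν† νƒˆμ›”κ²€μƒ‰λŸ‰  λΈ”λ‘œκ·Έλ¬Έμ„œμˆ˜
#   ν‚€μ›Œλ“œ  5       1200        3400            4600          12345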
# --- Direct keyword analysis (standalone) ---
def direct_keyword_analysis(text: str, keyword_input: str):
    debug_log("direct_keyword_analysis started")
    # Keywords may be separated by newlines or commas
    keywords = re.split(r'[\n,]+', keyword_input)
    keywords = [kw.strip() for kw in keywords if kw.strip()]
    debug_log(f"Entered keyword list: {keywords}")
    results = []
    for kw in keywords:
        count = text.count(kw)  # simple substring frequency
        results.append((kw, count))
        debug_log(f"Frequency of keyword '{kw}': {count}")
    df = pd.DataFrame(results, columns=["ν‚€μ›Œλ“œ", "λΉˆλ„μˆ˜"])
    excel_path = create_excel_file(df)
    debug_log("direct_keyword_analysis completed")
    return df, excel_path
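# Usage sketch (hedged placeholder keywords):
#
#   >>> df, _ = direct_keyword_analysis("ν‚€μ›Œλ“œ1 ν…μŠ€νŠΈ ν‚€μ›Œλ“œ1", "ν‚€μ›Œλ“œ1, ν‚€μ›Œλ“œ2")
#   >>> df.values.tolist()
#   [['ν‚€μ›Œλ“œ1', 2], ['ν‚€μ›Œλ“œ2', 0]]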
# --- Combined analysis (morphological + direct keyword analysis) ---
def combined_analysis(blog_text: str, remove_freq1: bool, direct_keyword_input: str):
    debug_log("combined_analysis started")
    merged_df, _ = morphological_analysis_and_enrich(blog_text, remove_freq1)
    if "μ§μ ‘μž…λ ₯" not in merged_df.columns:
        merged_df["μ§μ ‘μž…λ ₯"] = ""
    direct_keywords = re.split(r'[\n,]+', direct_keyword_input)
    direct_keywords = [kw.strip() for kw in direct_keywords if kw.strip()]
    debug_log(f"Entered direct keywords: {direct_keywords}")
    for dk in direct_keywords:
        if dk in merged_df["단어"].values:
            # Already extracted by the morphological pass: just mark it
            merged_df.loc[merged_df["단어"] == dk, "μ§μ ‘μž…λ ₯"] = "μ§μ ‘μž…λ ₯"
        else:
            # Not extracted: count it directly and look up its search volume
            freq = blog_text.count(dk)
            df_direct, _ = process_keyword(dk, include_related=False)
            if (not df_direct.empty) and (dk in df_direct["μ •λ³΄ν‚€μ›Œλ“œ"].values):
                row = df_direct[df_direct["μ •λ³΄ν‚€μ›Œλ“œ"] == dk].iloc[0]
                pc = row.get("PCμ›”κ²€μƒ‰λŸ‰", None)
                mobile = row.get("λͺ¨λ°”μΌμ›”κ²€μƒ‰λŸ‰", None)
                total = row.get("ν† νƒˆμ›”κ²€μƒ‰λŸ‰", None)
                blog_count = row.get("λΈ”λ‘œκ·Έλ¬Έμ„œμˆ˜", None)
            else:
                pc = mobile = total = blog_count = None
            new_row = {
                "단어": dk,
                "λΉˆλ„μˆ˜": freq,
                "PCμ›”κ²€μƒ‰λŸ‰": pc,
                "λͺ¨λ°”μΌμ›”κ²€μƒ‰λŸ‰": mobile,
                "ν† νƒˆμ›”κ²€μƒ‰λŸ‰": total,
                "λΈ”λ‘œκ·Έλ¬Έμ„œμˆ˜": blog_count,
                "μ§μ ‘μž…λ ₯": "μ§μ ‘μž…λ ₯"
            }
            merged_df = pd.concat([merged_df, pd.DataFrame([new_row])], ignore_index=True)
    merged_df = merged_df.sort_values(by="λΉˆλ„μˆ˜", ascending=False).reset_index(drop=True)
    combined_excel = create_excel_file(merged_df)
    debug_log("combined_analysis completed")
    return merged_df, combined_excel
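# Flow sketch (hedged, hypothetical keyword "ν‚€μ›Œλ“œA"): a direct keyword that
# the morphological pass did not find is appended as a new row:
#
#   {"단어": "ν‚€μ›Œλ“œA", "λΉˆλ„μˆ˜": blog_text.count("ν‚€μ›Œλ“œA"),
#    "PCμ›”κ²€μƒ‰λŸ‰": ..., "μ§μ ‘μž…λ ₯": "μ§μ ‘μž…λ ₯"}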
# --- Analysis handler ---
def analysis_handler(blog_text: str, remove_freq1: bool, direct_keyword_input: str, direct_keyword_only: bool):
    debug_log("analysis_handler started")
    if direct_keyword_only:
        # Run direct keyword analysis only
        return direct_keyword_analysis(blog_text, direct_keyword_input)
    else:
        # Combined analysis (morphological + direct keyword analysis)
        return combined_analysis(blog_text, remove_freq1, direct_keyword_input)
# --- μŠ€ν¬λž˜ν•‘ μ‹€ν–‰ ---
def fetch_blog_content(url: str):
debug_log("fetch_blog_content ν•¨μˆ˜ μ‹œμž‘")
content = scrape_naver_blog(url)
debug_log("fetch_blog_content ν•¨μˆ˜ μ™„λ£Œ")
return content
# --- Gradio interface setup ---
custom_css = """
.gradio-container { max-width: 960px; margin: auto; }
.centered-button-row { justify-content: center; }
"""
with gr.Blocks(title="넀이버 λΈ”λ‘œκ·Έ ν˜•νƒœμ†Œ 뢄석 슀페이슀", css=custom_css) as demo:
    gr.Markdown("# 넀이버 λΈ”λ‘œκ·Έ ν˜•νƒœμ†Œ 뢄석 슀페이슀")
    # Blog link and scrape button in one group (button centered)
    with gr.Group():
        blog_url_input = gr.Textbox(label="넀이버 λΈ”λ‘œκ·Έ 링크", placeholder="예: https://blog.naver.com/ssboost/222983068507", lines=1)
        with gr.Row(elem_classes="centered-button-row"):
            scrape_button = gr.Button("μŠ€ν¬λž˜ν•‘ μ‹€ν–‰")
    with gr.Row():
        blog_content_box = gr.Textbox(label="λΈ”λ‘œκ·Έ λ‚΄μš© (μˆ˜μ • κ°€λŠ₯)", lines=10, placeholder="μŠ€ν¬λž˜ν•‘λœ λΈ”λ‘œκ·Έ λ‚΄μš©μ΄ 여기에 ν‘œμ‹œλ©λ‹ˆλ‹€.")
    with gr.Row():
        remove_freq_checkbox = gr.Checkbox(label="λΉˆλ„μˆ˜1 제거", value=True)
    # "Analyze direct keyword input only" option below "λΉˆλ„μˆ˜1 제거" (unchecked by default)
    with gr.Row():
        direct_keyword_only_checkbox = gr.Checkbox(label="직접 ν‚€μ›Œλ“œ μž…λ ₯만 뢄석", value=False)
    with gr.Row():
        direct_keyword_box = gr.Textbox(label="직접 ν‚€μ›Œλ“œ μž…λ ₯ (μ—”ν„° λ˜λŠ” ','둜 ꡬ뢄)", lines=2, placeholder="예: ν‚€μ›Œλ“œ1, ν‚€μ›Œλ“œ2\nν‚€μ›Œλ“œ3")
    with gr.Row():
        analyze_button = gr.Button("뢄석 μ‹€ν–‰")
    # The result table spans the full width; the Excel download sits in its own row below
    with gr.Row():
        result_df = gr.Dataframe(label="톡합 뢄석 κ²°κ³Ό (단어, λΉˆλ„μˆ˜, κ²€μƒ‰λŸ‰, λΈ”λ‘œκ·Έλ¬Έμ„œμˆ˜, μ§μ ‘μž…λ ₯)", interactive=True)
    with gr.Row():
        excel_file = gr.File(label="Excel λ‹€μš΄λ‘œλ“œ")
    # Event wiring
    scrape_button.click(fn=fetch_blog_content, inputs=blog_url_input, outputs=blog_content_box)
    analyze_button.click(fn=analysis_handler,
                         inputs=[blog_content_box, remove_freq_checkbox, direct_keyword_box, direct_keyword_only_checkbox],
                         outputs=[result_df, excel_file])
if __name__ == "__main__":
    debug_log("Starting Gradio app")
    demo.launch()
    debug_log("Gradio app stopped")