import gradio as gr
import requests
from bs4 import BeautifulSoup
import urllib.parse  # used to resolve the iframe path against the page URL
import re
import logging
import tempfile
import pandas as pd
import mecab  # uses the python-mecab-ko library
import os
import time
import hmac
import hashlib
import base64
# Debugging (log) helper
def debug_log(message: str):
    print(f"[DEBUG] {message}")
# --- Naver blog scraping ---
def scrape_naver_blog(url: str) -> str:
    debug_log("scrape_naver_blog started")
    debug_log(f"Requested URL: {url}")
    headers = {
        "User-Agent": (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/96.0.4664.110 Safari/537.36"
        )
    }
    try:
        response = requests.get(url, headers=headers, timeout=10)
        debug_log("HTTP GET request (main page) complete")
        if response.status_code != 200:
            debug_log(f"Request failed, status code: {response.status_code}")
            return f"An error occurred. Status code: {response.status_code}"

        soup = BeautifulSoup(response.text, "html.parser")
        debug_log("HTML parsing (main page) complete")

        # Naver blog pages load the actual post inside iframe#mainFrame.
        iframe = soup.select_one("iframe#mainFrame")
        if not iframe:
            debug_log("Could not find the iframe#mainFrame tag.")
            return "Could not find the post iframe."
        iframe_src = iframe.get("src")
        if not iframe_src:
            debug_log("The iframe has no src attribute.")
            return "Could not find the src of the post iframe."

        # Resolve the (possibly relative) iframe src against the page URL.
        parsed_iframe_url = urllib.parse.urljoin(url, iframe_src)
        debug_log(f"iframe page request URL: {parsed_iframe_url}")

        iframe_response = requests.get(parsed_iframe_url, headers=headers, timeout=10)
        debug_log("HTTP GET request (iframe page) complete")
        if iframe_response.status_code != 200:
            debug_log(f"iframe request failed, status code: {iframe_response.status_code}")
            return f"An error occurred in the iframe. Status code: {iframe_response.status_code}"

        iframe_soup = BeautifulSoup(iframe_response.text, "html.parser")
        debug_log("HTML parsing (iframe page) complete")

        title_div = iframe_soup.select_one('.se-module.se-module-text.se-title-text')
        title = title_div.get_text(strip=True) if title_div else "Title not found."
        debug_log(f"Extracted title: {title}")

        content_div = iframe_soup.select_one('.se-main-container')
        if content_div:
            content = content_div.get_text("\n", strip=True)
        else:
            content = "Post body not found."
        debug_log("Body extraction complete")

        result = f"[Title]\n{title}\n\n[Body]\n{content}"
        debug_log("Title and body combined")
        return result
    except Exception as e:
        debug_log(f"Error occurred: {str(e)}")
        return f"An error occurred while scraping: {str(e)}"
# --- Morphological analysis (reference code 1) ---
def analyze_text(text: str):
    logging.basicConfig(level=logging.DEBUG)
    logger = logging.getLogger(__name__)
    logger.debug("Original text: %s", text)

    # Keep only Hangul syllables and whitespace; replacing other characters
    # with spaces preserves word boundaries for the tokenizer.
    filtered_text = re.sub(r'[^가-힣\s]', ' ', text)
    logger.debug("Filtered text: %s", filtered_text)
    if not filtered_text.strip():
        logger.debug("No valid Korean text.")
        return pd.DataFrame(columns=["Word", "Frequency"]), ""

    # Tokenize with python-mecab-ko and keep only nouns (POS tags starting with NN).
    mecab_instance = mecab.MeCab()
    tokens = mecab_instance.pos(filtered_text)
    logger.debug("Morphological analysis result: %s", tokens)

    freq = {}
    for word, pos in tokens:
        if word and word.strip() and pos.startswith("NN"):
            freq[word] = freq.get(word, 0) + 1
            logger.debug("Word: %s, POS: %s, count: %d", word, pos, freq[word])

    sorted_freq = sorted(freq.items(), key=lambda x: x[1], reverse=True)
    logger.debug("Sorted word frequencies: %s", sorted_freq)
    df = pd.DataFrame(sorted_freq, columns=["Word", "Frequency"])
    logger.debug("Frequency DataFrame created, shape: %s", df.shape)

    temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".xlsx")
    df.to_excel(temp_file.name, index=False, engine='openpyxl')
    temp_file.close()
    logger.debug("Excel file created: %s", temp_file.name)
    return df, temp_file.name
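
# Usage sketch (illustrative input): returns a (DataFrame, xlsx_path) pair.
#   df, path = analyze_text("네이버 블로그 분석 예시")
#   df.head()  # columns: Word, Frequency — nouns sorted by descending frequency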
# --- Naver search and SearchAd API helpers (reference code 2) ---
def generate_signature(timestamp, method, uri, secret_key):
    message = f"{timestamp}.{method}.{uri}"
    digest = hmac.new(secret_key.encode("utf-8"), message.encode("utf-8"), hashlib.sha256).digest()
    return base64.b64encode(digest).decode()
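
# The request is signed as base64(HMAC-SHA256("{timestamp}.{method}.{uri}", secret_key)),
# matching the header scheme built in get_header below. A minimal sketch with
# made-up inputs (the key and timestamp are illustrative, not real credentials):
#   generate_signature("1700000000000", "GET", "/keywordstool", "my-secret")
#   -> a 44-character base64 string, sent as the X-Signature header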
def get_header(method, uri, api_key, secret_key, customer_id):
    timestamp = str(round(time.time() * 1000))
    signature = generate_signature(timestamp, method, uri, secret_key)
    return {
        "Content-Type": "application/json; charset=UTF-8",
        "X-Timestamp": timestamp,
        "X-API-KEY": api_key,
        "X-Customer": str(customer_id),
        "X-Signature": signature
    }
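
# The functions below read credentials from environment variables:
#   NAVER_API_KEY, NAVER_SECRET_KEY, NAVER_CUSTOMER_ID      (SearchAd API)
#   NAVER_SEARCH_CLIENT_ID, NAVER_SEARCH_CLIENT_SECRET      (Open API blog search)
# They raise KeyError if any of these are unset.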
def fetch_related_keywords(keyword):
    debug_log(f"fetch_related_keywords called, keyword: {keyword}")
    API_KEY = os.environ["NAVER_API_KEY"]
    SECRET_KEY = os.environ["NAVER_SECRET_KEY"]
    CUSTOMER_ID = os.environ["NAVER_CUSTOMER_ID"]

    BASE_URL = "https://api.naver.com"
    uri = "/keywordstool"
    method = "GET"
    headers = get_header(method, uri, API_KEY, SECRET_KEY, CUSTOMER_ID)

    params = {
        "hintKeywords": [keyword],
        "showDetail": "1"
    }
    response = requests.get(BASE_URL + uri, params=params, headers=headers, timeout=10)
    if response.status_code != 200:
        debug_log(f"fetch_related_keywords failed, status code: {response.status_code}")
        return pd.DataFrame()
    data = response.json()
    if "keywordList" not in data:
        return pd.DataFrame()

    df = pd.DataFrame(data["keywordList"])
    if len(df) > 100:
        df = df.head(100)

    # Normalize monthly counts to int; fall back to 0 on unparseable values.
    def parse_count(x):
        try:
            return int(str(x).replace(",", ""))
        except (ValueError, TypeError):
            return 0

    df["PC Monthly Searches"] = df["monthlyPcQcCnt"].apply(parse_count)
    df["Mobile Monthly Searches"] = df["monthlyMobileQcCnt"].apply(parse_count)
    df["Total Monthly Searches"] = df["PC Monthly Searches"] + df["Mobile Monthly Searches"]
    df.rename(columns={"relKeyword": "Keyword"}, inplace=True)
    result_df = df[["Keyword", "PC Monthly Searches", "Mobile Monthly Searches", "Total Monthly Searches"]]
    debug_log("fetch_related_keywords complete")
    return result_df
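
# Note: as far as I know, /keywordstool reports very low volumes as the string
# "< 10" rather than a number, which is why parse_count coerces unparseable
# values to 0 instead of raising.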
def fetch_blog_count(keyword):
    debug_log(f"fetch_blog_count called, keyword: {keyword}")
    client_id = os.environ["NAVER_SEARCH_CLIENT_ID"]
    client_secret = os.environ["NAVER_SEARCH_CLIENT_SECRET"]
    url = "https://openapi.naver.com/v1/search/blog.json"
    headers = {
        "X-Naver-Client-Id": client_id,
        "X-Naver-Client-Secret": client_secret
    }
    params = {"query": keyword, "display": 1}
    response = requests.get(url, headers=headers, params=params, timeout=10)
    if response.status_code == 200:
        data = response.json()
        debug_log(f"fetch_blog_count result: {data.get('total', 0)}")
        return data.get("total", 0)
    else:
        debug_log(f"fetch_blog_count error, status code: {response.status_code}")
        return 0
def create_excel_file(df):
    with tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False) as tmp:
        excel_path = tmp.name
    df.to_excel(excel_path, index=False)
    debug_log(f"Excel file created: {excel_path}")
    return excel_path
def process_keyword(keywords: str, include_related: bool):
    debug_log(f"process_keyword called, keywords: {keywords}, include related: {include_related}")
    input_keywords = [k.strip() for k in keywords.splitlines() if k.strip()]
    result_dfs = []

    for idx, kw in enumerate(input_keywords):
        df_kw = fetch_related_keywords(kw)
        if df_kw.empty:
            continue
        # Prefer the exact row for the keyword itself; fall back to the top result.
        row_kw = df_kw[df_kw["Keyword"] == kw]
        if not row_kw.empty:
            result_dfs.append(row_kw)
        else:
            result_dfs.append(df_kw.head(1))
        # Related keywords are only appended for the first input keyword.
        if include_related and idx == 0:
            df_related = df_kw[df_kw["Keyword"] != kw]
            if not df_related.empty:
                result_dfs.append(df_related)

    if result_dfs:
        result_df = pd.concat(result_dfs, ignore_index=True)
        result_df.drop_duplicates(subset=["Keyword"], inplace=True)
    else:
        result_df = pd.DataFrame(columns=["Keyword", "PC Monthly Searches", "Mobile Monthly Searches", "Total Monthly Searches"])

    result_df["Blog Post Count"] = result_df["Keyword"].apply(fetch_blog_count)
    result_df.sort_values(by="Total Monthly Searches", ascending=False, inplace=True)
    debug_log("process_keyword complete")
    return result_df, create_excel_file(result_df)
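
# Usage sketch (keywords are newline-separated; related keywords are pulled only
# for the first one when include_related=True):
#   df, xlsx = process_keyword("키워드1\n키워드2", include_related=False)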
# --- Merge morphological analysis with search volume / blog post counts ---
def morphological_analysis_and_enrich(text: str, remove_freq1: bool):
    debug_log("morphological_analysis_and_enrich started")
    df_freq, _ = analyze_text(text)
    if df_freq.empty:
        debug_log("Morphological analysis returned an empty DataFrame.")
        return df_freq, ""

    if remove_freq1:
        before_shape = df_freq.shape
        df_freq = df_freq[df_freq["Frequency"] != 1]
        debug_log(f"Removed frequency-1 words. {before_shape} -> {df_freq.shape}")

    keywords = "\n".join(df_freq["Word"].tolist())
    debug_log(f"Analyzed keywords: {keywords}")

    df_keyword_info, _ = process_keyword(keywords, include_related=False)
    debug_log("Search volume and blog post count lookup complete")

    merged_df = pd.merge(df_freq, df_keyword_info, left_on="Word", right_on="Keyword", how="left")
    merged_df.drop(columns=["Keyword"], inplace=True)
    merged_excel_path = create_excel_file(merged_df)
    debug_log("morphological_analysis_and_enrich complete")
    return merged_df, merged_excel_path
# --- Direct keyword analysis (standalone) ---
def direct_keyword_analysis(text: str, keyword_input: str):
    debug_log("direct_keyword_analysis started")
    keywords = re.split(r'[\n,]+', keyword_input)
    keywords = [kw.strip() for kw in keywords if kw.strip()]
    debug_log(f"Input keyword list: {keywords}")
    results = []
    for kw in keywords:
        count = text.count(kw)
        results.append((kw, count))
        debug_log(f"Keyword '{kw}' frequency: {count}")
    df = pd.DataFrame(results, columns=["Keyword", "Frequency"])
    excel_path = create_excel_file(df)
    debug_log("direct_keyword_analysis complete")
    return df, excel_path
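
# Note: str.count counts non-overlapping occurrences, e.g. "banana".count("ana")
# is 1, not 2 — acceptable for keyword frequency here.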
# --- Combined analysis (morphological analysis + direct keywords) ---
def combined_analysis(blog_text: str, remove_freq1: bool, direct_keyword_input: str):
    debug_log("combined_analysis started")
    merged_df, _ = morphological_analysis_and_enrich(blog_text, remove_freq1)
    if "Direct Input" not in merged_df.columns:
        merged_df["Direct Input"] = ""

    direct_keywords = re.split(r'[\n,]+', direct_keyword_input)
    direct_keywords = [kw.strip() for kw in direct_keywords if kw.strip()]
    debug_log(f"Direct keywords entered: {direct_keywords}")

    for dk in direct_keywords:
        if dk in merged_df["Word"].values:
            # Already found by the morphological analysis: just flag it.
            merged_df.loc[merged_df["Word"] == dk, "Direct Input"] = "Direct Input"
        else:
            # Not in the analysis: count it in the raw text and look up its volume.
            freq = blog_text.count(dk)
            df_direct, _ = process_keyword(dk, include_related=False)
            if (not df_direct.empty) and (dk in df_direct["Keyword"].values):
                row = df_direct[df_direct["Keyword"] == dk].iloc[0]
                pc = row.get("PC Monthly Searches", None)
                mobile = row.get("Mobile Monthly Searches", None)
                total = row.get("Total Monthly Searches", None)
                blog_count = row.get("Blog Post Count", None)
            else:
                pc = mobile = total = blog_count = None
            new_row = {
                "Word": dk,
                "Frequency": freq,
                "PC Monthly Searches": pc,
                "Mobile Monthly Searches": mobile,
                "Total Monthly Searches": total,
                "Blog Post Count": blog_count,
                "Direct Input": "Direct Input"
            }
            merged_df = pd.concat([merged_df, pd.DataFrame([new_row])], ignore_index=True)

    merged_df = merged_df.sort_values(by="Frequency", ascending=False).reset_index(drop=True)
    combined_excel = create_excel_file(merged_df)
    debug_log("combined_analysis complete")
    return merged_df, combined_excel
# --- Analysis handler ---
def analysis_handler(blog_text: str, remove_freq1: bool, direct_keyword_input: str, direct_keyword_only: bool):
    debug_log("analysis_handler started")
    if direct_keyword_only:
        # Direct keyword analysis only
        return direct_keyword_analysis(blog_text, direct_keyword_input)
    else:
        # Combined analysis (morphological analysis + direct keywords)
        return combined_analysis(blog_text, remove_freq1, direct_keyword_input)
# --- Run scraping ---
def fetch_blog_content(url: str):
    debug_log("fetch_blog_content started")
    content = scrape_naver_blog(url)
    debug_log("fetch_blog_content complete")
    return content
# --- Gradio interface ---
custom_css = """
.gradio-container { max-width: 960px; margin: auto; }
.centered-button-row { justify-content: center; }
"""

with gr.Blocks(title="Naver Blog Morphological Analysis Space", css=custom_css) as demo:
    gr.Markdown("# Naver Blog Morphological Analysis Space")

    # Blog link and scrape button in one group (button centered)
    with gr.Group():
        blog_url_input = gr.Textbox(label="Naver blog link", placeholder="e.g. https://blog.naver.com/ssboost/222983068507", lines=1)
        with gr.Row(elem_classes="centered-button-row"):
            scrape_button = gr.Button("Run Scraping")

    with gr.Row():
        blog_content_box = gr.Textbox(label="Blog content (editable)", lines=10, placeholder="The scraped blog content will be shown here.")

    with gr.Row():
        remove_freq_checkbox = gr.Checkbox(label="Remove frequency-1 words", value=True)
    # Below "Remove frequency-1 words", an option to analyze direct keywords only (off by default)
    with gr.Row():
        direct_keyword_only_checkbox = gr.Checkbox(label="Analyze direct keywords only", value=False)

    with gr.Row():
        direct_keyword_box = gr.Textbox(label="Direct keyword input (separate with Enter or ',')", lines=2, placeholder="e.g. keyword1, keyword2\nkeyword3")
    with gr.Row():
        analyze_button = gr.Button("Run Analysis")

    # Results table uses the full width; the Excel download sits in its own row below.
    with gr.Row():
        result_df = gr.Dataframe(label="Combined analysis results (Word, Frequency, Search Volume, Blog Post Count, Direct Input)", interactive=True)
    with gr.Row():
        excel_file = gr.File(label="Excel download")

    # Wire up events
    scrape_button.click(fn=fetch_blog_content, inputs=blog_url_input, outputs=blog_content_box)
    analyze_button.click(fn=analysis_handler,
                         inputs=[blog_content_box, remove_freq_checkbox, direct_keyword_box, direct_keyword_only_checkbox],
                         outputs=[result_df, excel_file])
if __name__ == "__main__":
    debug_log("Launching Gradio app")
    demo.launch()
    debug_log("Gradio app exited")