na_ver-1 / app.py
Kims12's picture
Update app.py
08b8a4f verified
import gradio as gr
import pandas as pd
import re
from io import BytesIO
import tempfile
import os
import time
import hmac
import hashlib
import base64
import requests
# --- 넀이버 κ΄‘κ³  API: μ„œλͺ… 생성 및 헀더 ꡬ성 ---
def generate_signature(timestamp, method, uri, secret_key):
message = f"{timestamp}.{method}.{uri}"
digest = hmac.new(secret_key.encode("utf-8"), message.encode("utf-8"), hashlib.sha256).digest()
return base64.b64encode(digest).decode()
def get_header(method, uri, api_key, secret_key, customer_id):
timestamp = str(round(time.time() * 1000))
signature = generate_signature(timestamp, method, uri, secret_key)
return {
"Content-Type": "application/json; charset=UTF-8",
"X-Timestamp": timestamp,
"X-API-KEY": api_key,
"X-Customer": str(customer_id),
"X-Signature": signature
}
# --- 넀이버 κ΄‘κ³  API: κ²€μƒ‰λŸ‰ 쑰회 (연관검색어 μ œμ™Έ) ---
def fetch_related_keywords(keyword):
API_KEY = os.environ.get("NAVER_API_KEY")
SECRET_KEY = os.environ.get("NAVER_SECRET_KEY")
CUSTOMER_ID = os.environ.get("NAVER_CUSTOMER_ID")
if not API_KEY or not SECRET_KEY or not CUSTOMER_ID:
return pd.DataFrame()
BASE_URL = "https://api.naver.com"
uri = "/keywordstool"
method = "GET"
headers = get_header(method, uri, API_KEY, SECRET_KEY, CUSTOMER_ID)
params = {
"hintKeywords": [keyword],
"showDetail": "1"
}
try:
response = requests.get(BASE_URL + uri, params=params, headers=headers)
data = response.json()
except Exception as e:
return pd.DataFrame()
if "keywordList" not in data:
return pd.DataFrame()
df = pd.DataFrame(data["keywordList"])
if len(df) > 100:
df = df.head(100)
def parse_count(x):
try:
return int(str(x).replace(",", ""))
except:
return 0
df["PCμ›”κ²€μƒ‰λŸ‰"] = df["monthlyPcQcCnt"].apply(parse_count)
df["λͺ¨λ°”μΌμ›”κ²€μƒ‰λŸ‰"] = df["monthlyMobileQcCnt"].apply(parse_count)
df["ν† νƒˆμ›”κ²€μƒ‰λŸ‰"] = df["PCμ›”κ²€μƒ‰λŸ‰"] + df["λͺ¨λ°”μΌμ›”κ²€μƒ‰λŸ‰"]
df.rename(columns={"relKeyword": "μ •λ³΄ν‚€μ›Œλ“œ"}, inplace=True)
result_df = df[["μ •λ³΄ν‚€μ›Œλ“œ", "PCμ›”κ²€μƒ‰λŸ‰", "λͺ¨λ°”μΌμ›”κ²€μƒ‰λŸ‰", "ν† νƒˆμ›”κ²€μƒ‰λŸ‰"]]
return result_df
# --- 넀이버 검색 API: λΈ”λ‘œκ·Έ λ¬Έμ„œμˆ˜ 쑰회 ---
def fetch_blog_count(keyword):
client_id = os.environ.get("NAVER_SEARCH_CLIENT_ID")
client_secret = os.environ.get("NAVER_SEARCH_CLIENT_SECRET")
if not client_id or not client_secret:
return 0
url = "https://openapi.naver.com/v1/search/blog.json"
headers = {
"X-Naver-Client-Id": client_id,
"X-Naver-Client-Secret": client_secret
}
params = {"query": keyword, "display": 1}
try:
response = requests.get(url, headers=headers, params=params)
if response.status_code == 200:
data = response.json()
return data.get("total", 0)
else:
return 0
except:
return 0
def process_excel(file_bytes):
"""
μ—…λ‘œλ“œλœ μ—‘μ…€ νŒŒμΌμ—μ„œ D4μ…€λΆ€ν„° Dμ—΄μ˜ μƒν’ˆλͺ…을 μΆ”μΆœν•˜μ—¬,
각 μ…€μ—μ„œ 특수문자λ₯Ό μ œκ±°ν•œ ν›„ 곡백 κΈ°μ€€μœΌλ‘œ ν‚€μ›Œλ“œλ₯Ό μΆ”μΆœν•©λ‹ˆλ‹€.
ν•œ μ…€ λ‚΄μ—μ„œ μ€‘λ³΅λœ ν‚€μ›Œλ“œλŠ” ν•œ 번만 μΉ΄μš΄νŠΈν•˜κ³ , 전체 셀에 λŒ€ν•΄
ν‚€μ›Œλ“œμ˜ λΉˆλ„λ₯Ό κ³„μ‚°ν•©λ‹ˆλ‹€.
이후, 각 ν‚€μ›Œλ“œμ— λŒ€ν•΄ 넀이버 APIλ₯Ό ν™œμš©ν•˜μ—¬
- PCμ›”κ²€μƒ‰λŸ‰, λͺ¨λ°”μΌμ›”κ²€μƒ‰λŸ‰, ν† νƒˆμ›”κ²€μƒ‰λŸ‰ 및
- 넀이버 검색 APIλ₯Ό ν†΅ν•œ λΈ”λ‘œκ·Έ λ¬Έμ„œμˆ˜λ₯Ό μ‘°νšŒν•˜μ—¬
κ²°κ³Ό μ—‘μ…€ 파일과 λ°μ΄ν„°ν”„λ ˆμž„μœΌλ‘œ 좜λ ₯ν•©λ‹ˆλ‹€.
μ΅œμ’… μ—‘μ…€ 파일의 μ—΄ ꡬ성은 λ‹€μŒκ³Ό κ°™μŠ΅λ‹ˆλ‹€.
Aμ—΄ : ν‚€μ›Œλ“œ
Bμ—΄ : λΉˆλ„μˆ˜
Cμ—΄ : PCμ›”κ²€μƒ‰λŸ‰
Dμ—΄ : λͺ¨λ°”μΌμ›”κ²€μƒ‰λŸ‰
Eμ—΄ : ν† νƒˆμ›”κ²€μƒ‰λŸ‰
Fμ—΄ : λΈ”λ‘œκ·Έλ¬Έμ„œμˆ˜
μ—λŸ¬ λ°œμƒ μ‹œ, μ—λŸ¬ λ©”μ‹œμ§€λ₯Ό ν…μŠ€νŠΈ 파일과 λ°μ΄ν„°ν”„λ ˆμž„ ν˜•νƒœλ‘œ λ°˜ν™˜ν•©λ‹ˆλ‹€.
"""
# μ—‘μ…€ 파일 읽기
try:
df = pd.read_excel(BytesIO(file_bytes), header=None, engine="openpyxl")
except Exception as e:
error_message = "μ—‘μ…€ νŒŒμΌμ„ μ½λŠ” 쀑 였λ₯˜κ°€ λ°œμƒν•˜μ˜€μŠ΅λ‹ˆλ‹€: " + str(e)
temp_error = tempfile.NamedTemporaryFile(delete=False, suffix=".txt", mode="wb")
temp_error.write(error_message.encode("utf-8"))
temp_error.close()
error_df = pd.DataFrame({"μ—λŸ¬": [error_message]})
return temp_error.name, error_df
# μ—‘μ…€ 파일 ν˜•μ‹ 체크 (μ΅œμ†Œ 4μ—΄, μ΅œμ†Œ 4ν–‰)
if df.shape[1] < 4 or df.shape[0] < 4:
error_message = "μ—‘μ…€ 파일의 ν˜•μ‹μ΄ μ˜¬λ°”λ₯΄μ§€ μ•ŠμŠ΅λ‹ˆλ‹€."
temp_error = tempfile.NamedTemporaryFile(delete=False, suffix=".txt", mode="wb")
temp_error.write(error_message.encode("utf-8"))
temp_error.close()
error_df = pd.DataFrame({"μ—λŸ¬": [error_message]})
return temp_error.name, error_df
# Dμ—΄(4번째 μ—΄, 인덱슀 3)μ—μ„œ 4ν–‰(인덱슀 3)λΆ€ν„° 데이터λ₯Ό κ°€μ Έμ˜΄
product_names_series = df.iloc[3:, 3]
product_names_series = product_names_series.dropna()
keyword_counts = {}
for cell in product_names_series:
if not isinstance(cell, str):
cell = str(cell)
cleaned = re.sub(r'[^0-9a-zA-Zκ°€-힣\s]', '', cell)
keywords = cleaned.split()
unique_keywords = set(keywords)
for keyword in unique_keywords:
keyword_counts[keyword] = keyword_counts.get(keyword, 0) + 1
sorted_keywords = sorted(keyword_counts.items(), key=lambda x: (-x[1], x[0]))
# 각 ν‚€μ›Œλ“œμ— λŒ€ν•΄ 넀이버 APIλ₯Ό ν™œμš©ν•˜μ—¬ κ²€μƒ‰λŸ‰ 및 λΈ”λ‘œκ·Έ λ¬Έμ„œμˆ˜ 쑰회
result_data = []
for keyword, count in sorted_keywords:
pc_search = 0
mobile_search = 0
total_search = 0
df_api = fetch_related_keywords(keyword)
if not df_api.empty:
row = df_api[df_api["μ •λ³΄ν‚€μ›Œλ“œ"] == keyword]
if row.empty:
row = df_api.iloc[[0]]
pc_search = int(row["PCμ›”κ²€μƒ‰λŸ‰"].iloc[0])
mobile_search = int(row["λͺ¨λ°”μΌμ›”κ²€μƒ‰λŸ‰"].iloc[0])
total_search = int(row["ν† νƒˆμ›”κ²€μƒ‰λŸ‰"].iloc[0])
blog_count = fetch_blog_count(keyword)
result_data.append({
"ν‚€μ›Œλ“œ": keyword,
"λΉˆλ„μˆ˜": count,
"PCμ›”κ²€μƒ‰λŸ‰": pc_search,
"λͺ¨λ°”μΌμ›”κ²€μƒ‰λŸ‰": mobile_search,
"ν† νƒˆμ›”κ²€μƒ‰λŸ‰": total_search,
"λΈ”λ‘œκ·Έλ¬Έμ„œμˆ˜": blog_count
})
result_df = pd.DataFrame(result_data)
# κ²°κ³Ό μ—‘μ…€ 파일 생성 (헀더: Aμ—΄λΆ€ν„° Fμ—΄κΉŒμ§€)
output = BytesIO()
try:
with pd.ExcelWriter(output, engine="openpyxl") as writer:
result_df.to_excel(writer, index=False, startrow=1, header=False)
worksheet = writer.sheets["Sheet1"]
worksheet.cell(row=1, column=1, value="ν‚€μ›Œλ“œ")
worksheet.cell(row=1, column=2, value="λΉˆλ„μˆ˜")
worksheet.cell(row=1, column=3, value="PCμ›”κ²€μƒ‰λŸ‰")
worksheet.cell(row=1, column=4, value="λͺ¨λ°”μΌμ›”κ²€μƒ‰λŸ‰")
worksheet.cell(row=1, column=5, value="ν† νƒˆμ›”κ²€μƒ‰λŸ‰")
worksheet.cell(row=1, column=6, value="λΈ”λ‘œκ·Έλ¬Έμ„œμˆ˜")
output.seek(0)
except Exception as e:
error_message = "μ—‘μ…€ νŒŒμΌμ„ μƒμ„±ν•˜λŠ” 쀑 였λ₯˜κ°€ λ°œμƒν•˜μ˜€μŠ΅λ‹ˆλ‹€: " + str(e)
temp_error = tempfile.NamedTemporaryFile(delete=False, suffix=".txt", mode="wb")
temp_error.write(error_message.encode("utf-8"))
temp_error.close()
error_df = pd.DataFrame({"μ—λŸ¬": [error_message]})
return temp_error.name, error_df
temp_excel = tempfile.NamedTemporaryFile(delete=False, suffix=".xlsx", mode="wb")
temp_excel.write(output.getvalue())
temp_excel.close()
return temp_excel.name, result_df
iface = gr.Interface(
fn=process_excel,
inputs=gr.File(label="μ—‘μ…€ 파일 μ—…λ‘œλ“œ", type="binary"),
outputs=[
gr.File(label="κ²°κ³Ό μ—‘μ…€ 파일"),
gr.DataFrame(label="ν‚€μ›Œλ“œ 뢄석 ν‘œ")
],
title="μ—‘μ…€ μƒν’ˆλͺ… ν‚€μ›Œλ“œ μΆ”μΆœ 및 κ²€μƒ‰λŸ‰/λΈ”λ‘œκ·Έ λ¬Έμ„œμˆ˜ 쑰회",
description=(
"μ—‘μ…€ 파일의 D4μ…€λΆ€ν„° D열에 μžˆλŠ” μƒν’ˆλͺ… 데이터λ₯Ό λΆ„μ„ν•˜μ—¬, "
"특수문자λ₯Ό μ œκ±°ν•œ ν›„ 곡백 κΈ°μ€€μœΌλ‘œ ν‚€μ›Œλ“œλ₯Ό μΆ”μΆœν•©λ‹ˆλ‹€. "
"각 ν‚€μ›Œλ“œμ— λŒ€ν•΄ 넀이버 APIλ₯Ό ν™œμš©ν•˜μ—¬ PC/λͺ¨λ°”일/ν† νƒˆ μ›” κ²€μƒ‰λŸ‰κ³Ό "
"넀이버 λΈ”λ‘œκ·Έ λ¬Έμ„œμˆ˜λ₯Ό μ‘°νšŒν•œ κ²°κ³Όλ₯Ό μ—‘μ…€ 파일과 ν‘œ(λ°μ΄ν„°ν”„λ ˆμž„)둜 좜λ ₯ν•©λ‹ˆλ‹€."
)
)
iface.launch()