import gradio as gr
import pandas as pd
import re
from io import BytesIO
import tempfile
import os
import time
import hmac
import hashlib
import base64
import requests
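
# Required environment variables:
#   NAVER_API_KEY, NAVER_SECRET_KEY, NAVER_CUSTOMER_ID
#       Credentials for the Naver keyword tool (SearchAd) API used by fetch_related_keywords().
#   NAVER_SEARCH_CLIENT_ID, NAVER_SEARCH_CLIENT_SECRET
#       Credentials for the Naver Open API blog search used by fetch_blog_count().
# If any of them are missing, the corresponding lookup falls back to an empty result / zero.
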
def generate_signature(timestamp, method, uri, secret_key):
    # Sign "{timestamp}.{method}.{uri}" with HMAC-SHA256 and return the base64-encoded digest.
    message = f"{timestamp}.{method}.{uri}"
    digest = hmac.new(secret_key.encode("utf-8"), message.encode("utf-8"), hashlib.sha256).digest()
    return base64.b64encode(digest).decode()


def get_header(method, uri, api_key, secret_key, customer_id):
    # Build the authentication headers expected by the signed keyword tool endpoint.
    timestamp = str(round(time.time() * 1000))
    signature = generate_signature(timestamp, method, uri, secret_key)
    return {
        "Content-Type": "application/json; charset=UTF-8",
        "X-Timestamp": timestamp,
        "X-API-KEY": api_key,
        "X-Customer": str(customer_id),
        "X-Signature": signature
    }
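
# Example (hypothetical values): get_header("GET", "/keywordstool", api_key, secret_key, customer_id)
# signs the message "<timestamp>.GET./keywordstool" and returns the X-Timestamp, X-API-KEY,
# X-Customer and X-Signature headers that fetch_related_keywords() sends with its request.
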
def fetch_related_keywords(keyword):
    # Query the Naver keyword tool (/keywordstool) for keywords related to `keyword`,
    # together with their monthly PC and mobile search volumes.
    API_KEY = os.environ.get("NAVER_API_KEY")
    SECRET_KEY = os.environ.get("NAVER_SECRET_KEY")
    CUSTOMER_ID = os.environ.get("NAVER_CUSTOMER_ID")

    if not API_KEY or not SECRET_KEY or not CUSTOMER_ID:
        return pd.DataFrame()

    BASE_URL = "https://api.naver.com"
    uri = "/keywordstool"
    method = "GET"
    headers = get_header(method, uri, API_KEY, SECRET_KEY, CUSTOMER_ID)
    params = {
        "hintKeywords": [keyword],
        "showDetail": "1"
    }
    try:
        response = requests.get(BASE_URL + uri, params=params, headers=headers)
        data = response.json()
    except Exception:
        return pd.DataFrame()
    if "keywordList" not in data:
        return pd.DataFrame()
    df = pd.DataFrame(data["keywordList"])
    if len(df) > 100:
        df = df.head(100)

    def parse_count(x):
        # Counts may come back as strings (e.g. with thousands separators or non-numeric
        # placeholders); anything that cannot be parsed as an integer counts as 0.
        try:
            return int(str(x).replace(",", ""))
        except Exception:
            return 0

    df["PC Monthly Searches"] = df["monthlyPcQcCnt"].apply(parse_count)
    df["Mobile Monthly Searches"] = df["monthlyMobileQcCnt"].apply(parse_count)
    df["Total Monthly Searches"] = df["PC Monthly Searches"] + df["Mobile Monthly Searches"]
    df.rename(columns={"relKeyword": "Related Keyword"}, inplace=True)
    result_df = df[["Related Keyword", "PC Monthly Searches", "Mobile Monthly Searches", "Total Monthly Searches"]]
    return result_df
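
# Example (hypothetical keyword, requires valid credentials): fetch_related_keywords("laptop")
# returns at most 100 rows with the columns "Related Keyword", "PC Monthly Searches",
# "Mobile Monthly Searches" and "Total Monthly Searches"; any failure yields an empty DataFrame.
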
def fetch_blog_count(keyword):
    # Return the total number of Naver blog posts matching `keyword`
    # (0 if credentials are missing or the request fails).
    client_id = os.environ.get("NAVER_SEARCH_CLIENT_ID")
    client_secret = os.environ.get("NAVER_SEARCH_CLIENT_SECRET")
    if not client_id or not client_secret:
        return 0
    url = "https://openapi.naver.com/v1/search/blog.json"
    headers = {
        "X-Naver-Client-Id": client_id,
        "X-Naver-Client-Secret": client_secret
    }
    params = {"query": keyword, "display": 1}
    try:
        response = requests.get(url, headers=headers, params=params)
        if response.status_code == 200:
            data = response.json()
            return data.get("total", 0)
        else:
            return 0
    except Exception:
        return 0
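
# Example (hypothetical keyword, requires valid credentials): fetch_blog_count("laptop") sends a
# single-result query to /v1/search/blog.json and reads the "total" field of the JSON response,
# i.e. the number of blog posts matching the query.
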
def process_excel(file_bytes):
    """
    Extract product names from column D of the uploaded Excel file, starting at cell D4.
    Special characters are removed from each cell and keywords are split on whitespace.
    A keyword repeated within a single cell is counted only once; frequencies are then
    accumulated across all cells.

    For each keyword, the Naver APIs are used to look up
      - the PC, mobile, and total monthly search volumes, and
      - the blog post count via the Naver Search API,
    and the results are returned as an Excel file and a DataFrame.

    The resulting Excel file is laid out as follows:
      Column A: Keyword
      Column B: Frequency
      Column C: PC Monthly Searches
      Column D: Mobile Monthly Searches
      Column E: Total Monthly Searches
      Column F: Blog Post Count

    On error, the error message is returned as a text file and a DataFrame.
    """

    try:
        df = pd.read_excel(BytesIO(file_bytes), header=None, engine="openpyxl")
    except Exception as e:
        error_message = "An error occurred while reading the Excel file: " + str(e)
        temp_error = tempfile.NamedTemporaryFile(delete=False, suffix=".txt", mode="wb")
        temp_error.write(error_message.encode("utf-8"))
        temp_error.close()
        error_df = pd.DataFrame({"Error": [error_message]})
        return temp_error.name, error_df

    if df.shape[1] < 4 or df.shape[0] < 4:
        error_message = "The Excel file format is not valid."
        temp_error = tempfile.NamedTemporaryFile(delete=False, suffix=".txt", mode="wb")
        temp_error.write(error_message.encode("utf-8"))
        temp_error.close()
        error_df = pd.DataFrame({"Error": [error_message]})
        return temp_error.name, error_df

    # Column D (index 3) from row 4 (index 3) onwards holds the product names.
    product_names_series = df.iloc[3:, 3]
    product_names_series = product_names_series.dropna()

    keyword_counts = {}
    for cell in product_names_series:
        if not isinstance(cell, str):
            cell = str(cell)
        # Strip everything except digits, Latin letters, Hangul and whitespace, then split.
        cleaned = re.sub(r'[^0-9a-zA-Z가-힣\s]', '', cell)
        keywords = cleaned.split()
        unique_keywords = set(keywords)
        for keyword in unique_keywords:
            keyword_counts[keyword] = keyword_counts.get(keyword, 0) + 1

    # Sort by descending frequency, then alphabetically for ties.
    sorted_keywords = sorted(keyword_counts.items(), key=lambda x: (-x[1], x[0]))

    result_data = []
    for keyword, count in sorted_keywords:
        pc_search = 0
        mobile_search = 0
        total_search = 0
        df_api = fetch_related_keywords(keyword)
        if not df_api.empty:
            # Prefer the row matching the keyword exactly; otherwise fall back to the first row.
            row = df_api[df_api["Related Keyword"] == keyword]
            if row.empty:
                row = df_api.iloc[[0]]
            pc_search = int(row["PC Monthly Searches"].iloc[0])
            mobile_search = int(row["Mobile Monthly Searches"].iloc[0])
            total_search = int(row["Total Monthly Searches"].iloc[0])
        blog_count = fetch_blog_count(keyword)
        result_data.append({
            "Keyword": keyword,
            "Frequency": count,
            "PC Monthly Searches": pc_search,
            "Mobile Monthly Searches": mobile_search,
            "Total Monthly Searches": total_search,
            "Blog Post Count": blog_count
        })
    result_df = pd.DataFrame(result_data)

    # Write the results to an in-memory Excel workbook, filling the header row manually.
    output = BytesIO()
    try:
        with pd.ExcelWriter(output, engine="openpyxl") as writer:
            result_df.to_excel(writer, index=False, startrow=1, header=False)
            worksheet = writer.sheets["Sheet1"]
            worksheet.cell(row=1, column=1, value="Keyword")
            worksheet.cell(row=1, column=2, value="Frequency")
            worksheet.cell(row=1, column=3, value="PC Monthly Searches")
            worksheet.cell(row=1, column=4, value="Mobile Monthly Searches")
            worksheet.cell(row=1, column=5, value="Total Monthly Searches")
            worksheet.cell(row=1, column=6, value="Blog Post Count")
        output.seek(0)
    except Exception as e:
        error_message = "An error occurred while generating the Excel file: " + str(e)
        temp_error = tempfile.NamedTemporaryFile(delete=False, suffix=".txt", mode="wb")
        temp_error.write(error_message.encode("utf-8"))
        temp_error.close()
        error_df = pd.DataFrame({"Error": [error_message]})
        return temp_error.name, error_df

    temp_excel = tempfile.NamedTemporaryFile(delete=False, suffix=".xlsx", mode="wb")
    temp_excel.write(output.getvalue())
    temp_excel.close()

    return temp_excel.name, result_df
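
# A minimal standalone sketch for testing process_excel without the Gradio UI.
# "products.xlsx" is a hypothetical file name; the Naver credentials above must be set.
if __name__ == "__main__" and os.path.exists("products.xlsx"):
    with open("products.xlsx", "rb") as f:
        sample_excel_path, sample_table = process_excel(f.read())
    print(sample_excel_path)
    print(sample_table.head())
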
iface = gr.Interface(
    fn=process_excel,
    inputs=gr.File(label="Upload Excel File", type="binary"),
    outputs=[
        gr.File(label="Result Excel File"),
        gr.DataFrame(label="Keyword Analysis Table")
    ],
    title="Extract Keywords from Excel Product Names and Look Up Search Volumes / Blog Post Counts",
    description=(
        "Analyzes the product name data in column D of the uploaded Excel file, starting at cell D4: "
        "special characters are removed and keywords are extracted on whitespace. "
        "For each keyword, the Naver APIs are used to look up the PC, mobile and total monthly "
        "search volumes as well as the Naver blog post count, and the results are returned as an "
        "Excel file and a table (DataFrame)."
    )
)

iface.launch()