|
"""Naver keyword search-volume checker (Gradio app).

Looks up monthly PC/mobile search volumes through the Naver Search Ad API,
optionally expands the input to related keywords, counts matching blog posts
through the Naver Open API, and exports the results to an Excel file.
"""

import base64
import hashlib
import hmac
import json
import os
import tempfile
import time
import urllib.parse
import urllib.request
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime

import gradio as gr
import pandas as pd
import requests
from dotenv import load_dotenv

# Load API credentials from a local .env file.
load_dotenv()
|
|
# Naver Search Ad API credentials.
BASE_URL = "https://api.searchad.naver.com"
API_KEY = os.environ.get("NAVER_API_KEY")
SECRET_KEY = os.environ.get("NAVER_SECRET_KEY")
CUSTOMER_ID = 2666992

# Naver Open API (blog search) credentials.
CLIENT_ID = os.environ.get("NAVER_CLIENT_ID")
CLIENT_SECRET = os.environ.get("NAVER_CLIENT_SECRET")

if not API_KEY or not SECRET_KEY or not CLIENT_ID or not CLIENT_SECRET:
    raise ValueError("Required environment variables are not set. Check the .env file.")
else:
    print("Environment variables loaded successfully.")
|
|
class NaverAPI:
    """Thin client for the Naver Search Ad API keyword tool."""

    def __init__(self, base_url, api_key, secret_key, customer_id):
        self.base_url = base_url
        self.api_key = api_key
        self.secret_key = secret_key
        self.customer_id = customer_id

    def generate_signature(self, timestamp, method, path):
        # Sign "{timestamp}.{method}.{path}" with HMAC-SHA256 and base64-encode it.
        sign = f"{timestamp}.{method}.{path}"
        signature = hmac.new(self.secret_key.encode('utf-8'), sign.encode('utf-8'), hashlib.sha256).digest()
        return base64.b64encode(signature).decode('utf-8')

    def get_timestamp(self):
        # Unix time in milliseconds, as the API expects.
        return str(int(time.time() * 1000))

    def get_headers(self, method, uri):
        timestamp = self.get_timestamp()
        headers = {
            'Content-Type': 'application/json; charset=UTF-8',
            'X-Timestamp': timestamp,
            'X-API-KEY': self.api_key,
            'X-Customer': str(self.customer_id),
            'X-Signature': self.generate_signature(timestamp, method, uri),
        }
        return headers

    def get_keywords_data(self, keywords):
        # Fetch search-volume stats (and related keywords) for the hint keywords.
        uri = "/keywordstool"
        method = "GET"
        query = {
            'hintKeywords': ','.join(keywords),
            'showDetail': 1
        }
        headers = self.get_headers(method, uri)
        response = requests.get(self.base_url + uri, headers=headers, params=query)
        response.raise_for_status()
        return response.json()
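# Minimal usage sketch for the client above (illustrative; the app's real
# calls happen in the helper functions below):
#
#   api = NaverAPI(BASE_URL, API_KEY, SECRET_KEY, CUSTOMER_ID)
#   data = api.get_keywords_data(["keyword"])
#
# Each entry of data["keywordList"] carries relKeyword, monthlyPcQcCnt and
# monthlyMobileQcCnt, which the volume parsers below consume.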
|
|
|
def get_blog_count(keyword):
    # Return the total number of Naver blog posts matching the keyword,
    # or 0 on any failure.
    if isinstance(keyword, bytes):
        keyword = keyword.decode('utf-8')
    elif not isinstance(keyword, str):
        keyword = str(keyword)

    encText = urllib.parse.quote(keyword)
    url = "https://openapi.naver.com/v1/search/blog?query=" + encText
    request = urllib.request.Request(url)
    request.add_header("X-Naver-Client-Id", CLIENT_ID)
    request.add_header("X-Naver-Client-Secret", CLIENT_SECRET)
    try:
        response = urllib.request.urlopen(request)
        if response.getcode() == 200:
            response_body = response.read()
            data = json.loads(response_body.decode('utf-8'))
            return data.get('total', 0)
        else:
            return 0
    except Exception as e:
        print(f"Error fetching blog count for keyword '{keyword}': {e}")
        return 0
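# Note that, unlike the signed Search Ad API above, the Open API blog search
# authenticates with plain client-id/secret headers; no timestamp or HMAC
# signature is involved.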
|
|
|
def get_keywords_data_chunk(chunk):
    # Thread worker: build a client and fetch stats for one chunk of keywords.
    api = NaverAPI(BASE_URL, API_KEY, SECRET_KEY, CUSTOMER_ID)
    return api.get_keywords_data(chunk)


def get_blog_count_parallel(keyword):
    # Thread worker: pair the keyword with its blog count.
    return (keyword, get_blog_count(keyword))
|
|
def parse_count(value):
    # The API reports low volumes as the string "< 10" and may comma-format
    # larger ones; normalize both to integers (e.g. "< 10" -> 0, "1,234" -> 1234).
    if isinstance(value, str):
        value = value.replace(',', '').replace('< 10', '0')
        try:
            return int(value)
        except ValueError:
            return 0
    return value


def get_search_volumes(keyword):
    """Fetch the monthly search volumes for a single keyword."""
    api = NaverAPI(BASE_URL, API_KEY, SECRET_KEY, CUSTOMER_ID)
    try:
        data = api.get_keywords_data([keyword])
        if 'keywordList' in data and len(data['keywordList']) > 0:
            # The tool also returns related keywords; keep only the exact match.
            for item in data['keywordList']:
                if item['relKeyword'].strip().lower() == keyword.strip().lower():
                    monthly_pc = parse_count(item.get('monthlyPcQcCnt', 0))
                    monthly_mobile = parse_count(item.get('monthlyMobileQcCnt', 0))
                    total_searches = monthly_pc + monthly_mobile
                    return (keyword, monthly_pc, monthly_mobile, total_searches)
            return (keyword, 0, 0, 0)
        else:
            return (keyword, 0, 0, 0)
    except Exception as e:
        print(f"Error fetching search volumes for keyword '{keyword}': {e}")
        return (keyword, 0, 0, 0)
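# Illustrative return shape (numbers are made up, not real API output):
#   get_search_volumes("camping") -> ("camping", 3200, 18400, 21600)
#   i.e. (keyword, monthly_pc, monthly_mobile, total_searches)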
|
|
|
def get_monthly_search_volumes(keywords, include_related_keywords=True):
    all_data = []
    results = []

    if include_related_keywords:
        # Query the keyword tool in chunks and merge the returned lists,
        # which include related keywords beyond the inputs.
        chunk_size = 10

        with ThreadPoolExecutor(max_workers=5) as executor:
            futures = [executor.submit(get_keywords_data_chunk, keywords[i:i + chunk_size])
                       for i in range(0, len(keywords), chunk_size)]
            for future in futures:
                try:
                    data = future.result()
                    if 'keywordList' in data:
                        all_data.extend(data['keywordList'])
                except Exception as e:
                    print(f"Error fetching keywords data chunk: {e}")

        if not all_data:
            return [("Error", "No data was returned, or the API response was invalid.", "", "", "")]

        # Deduplicate keywords while preserving API order, capped at 100 rows.
        unique_keywords = set()
        for item in all_data:
            keyword = item['relKeyword']
            if keyword not in unique_keywords:
                unique_keywords.add(keyword)
                monthly_pc = parse_count(item.get('monthlyPcQcCnt', 0))
                monthly_mobile = parse_count(item.get('monthlyMobileQcCnt', 0))
                total_searches = monthly_pc + monthly_mobile
                results.append((keyword, monthly_pc, monthly_mobile, total_searches))

            if len(results) >= 100:
                break

    else:
        # Input keywords only: fetch each keyword's volumes in parallel.
        # Map futures back to their keywords so error handling can name
        # the keyword that failed.
        with ThreadPoolExecutor(max_workers=5) as executor:
            future_to_keyword = {executor.submit(get_search_volumes, keyword): keyword
                                 for keyword in keywords}
            for future, keyword in future_to_keyword.items():
                try:
                    results.append(future.result())
                except Exception as e:
                    print(f"Error fetching search volumes for keyword '{keyword}': {e}")
                    results.append((keyword, 0, 0, 0))

        if not results:
            return [("Error", "No data was returned, or the API response was invalid.", "", "", "")]

    # Append a blog-post count to each (keyword, pc, mobile, total) row.
    # Both branches produce rows of the same shape, so they share this step.
    with ThreadPoolExecutor(max_workers=5) as executor:
        blog_futures = [executor.submit(get_blog_count_parallel, result[0]) for result in results]
        for i, future in enumerate(blog_futures):
            try:
                _, blog_count = future.result()
                results[i] = results[i] + (blog_count,)
            except Exception as e:
                print(f"Error fetching blog count for keyword '{results[i][0]}': {e}")
                results[i] = results[i] + ("Error",)

    return results
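# Every row returned above is (keyword, monthly_pc, monthly_mobile,
# total_searches, blog_count), matching the Excel columns and the Gradio
# table headers defined below.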
|
|
|
def save_to_excel(results, keyword):
    # Write the rows to an .xlsx file in the system temp directory and
    # return the path for the Gradio download component.
    df = pd.DataFrame(results, columns=["Keyword", "Monthly PC Searches", "Monthly Mobile Searches", "Total Monthly Searches", "Blog Post Count"])
    now = datetime.now().strftime('%Y-%m-%d')
    sanitized_keyword = keyword.replace(' ', '_')
    filename = f"{now}_{sanitized_keyword}_related_keywords.xlsx"
    file_path = os.path.join(tempfile.gettempdir(), filename)
    df.to_excel(file_path, index=False)
    return file_path
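# Example path (date- and input-dependent; the temp dir varies by OS), e.g.:
#   /tmp/2025-01-01_camping_gear_related_keywords.xlsx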
|
|
|
def display_search_volumes(keywords, include_related):
    # Gradio handler: parse the comma-separated input, fetch the volumes,
    # and return the table rows plus the Excel file path.
    keyword_list = [keyword.strip() for keyword in keywords.split(',') if keyword.strip()]
    if not keyword_list:
        return [("Error", "No keywords were entered.", "", "", "")], None
    results = get_monthly_search_volumes(keyword_list, include_related_keywords=include_related)
    file_path = save_to_excel(results, keywords)
    return results, file_path
|
|
iface = gr.Interface(
    fn=display_search_volumes,
    inputs=[
        gr.Textbox(placeholder="Enter keywords (comma-separated)", lines=2),
        gr.Checkbox(label="Include related keywords", value=True)
    ],
    outputs=[
        gr.Dataframe(headers=["Keyword", "Monthly PC Searches", "Monthly Mobile Searches", "Total Monthly Searches", "Blog Post Count"]),
        gr.File(label="Download Excel file")
    ],
    title="Naver Monthly Search Volume Checker",
    description="Look up monthly search volumes and blog post counts for keywords. Choose whether to include related keywords.",
)

# share=True also serves the app through a temporary public Gradio link.
iface.launch(share=True)
|
|