|
"""Fetch Naver keyword statistics and blog post counts, and serve them via Gradio.

Monthly PC/mobile search volumes come from the Naver SearchAd API
("/keywordstool" endpoint); blog post counts come from the Naver Open API blog
search. Results are shown in a table and exported to an Excel file.
"""

import base64
import hashlib
import hmac
import json
import os
import tempfile
import time
import urllib.error
import urllib.parse
import urllib.request
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime

import gradio as gr
import pandas as pd
import requests
|
|
|
BASE_URL = "https://api.searchad.naver.com"
API_KEY = "010000000046604df3a0f6abf4c52824e0d5835c5cbeae279ced8b2bb9007b3cc566b190c7"
SECRET_KEY = "AQAAAABGYE3zoPar9MUoJODVg1xczNEcSuIBi66wWUy4p4gs/Q=="
CUSTOMER_ID = 2666992
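# NOTE: the credentials above are hardcoded sample values. In a real
# deployment they should be loaded from the environment instead, e.g.
# (hypothetical variable names):
#   API_KEY = os.environ["NAVER_API_KEY"]
#   SECRET_KEY = os.environ["NAVER_SECRET_KEY"]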
|
|
|
class NaverAPI:
    """Minimal client for the Naver SearchAd API."""

    def __init__(self, base_url, api_key, secret_key, customer_id):
        self.base_url = base_url
        self.api_key = api_key
        self.secret_key = secret_key
        self.customer_id = customer_id

    def generate_signature(self, timestamp, method, path):
        # Sign "{timestamp}.{method}.{path}" with HMAC-SHA256 using the secret
        # key, then base64-encode the digest.
        sign = f"{timestamp}.{method}.{path}"
        signature = hmac.new(self.secret_key.encode('utf-8'), sign.encode('utf-8'), hashlib.sha256).digest()
        return base64.b64encode(signature).decode('utf-8')

    def get_timestamp(self):
        # Millisecond Unix timestamp, as a string.
        return str(int(time.time() * 1000))

    def get_headers(self, method, uri):
        timestamp = self.get_timestamp()
        return {
            'Content-Type': 'application/json; charset=UTF-8',
            'X-Timestamp': timestamp,
            'X-API-KEY': self.api_key,
            'X-Customer': str(self.customer_id),
            'X-Signature': self.generate_signature(timestamp, method, uri),
        }

    def get_keywords_data(self, keywords):
        """Fetch search-volume stats for the given hint keywords."""
        uri = "/keywordstool"
        method = "GET"
        query = {
            'hintKeywords': ','.join(keywords),
            'showDetail': 1
        }
        headers = self.get_headers(method, uri)
        response = requests.get(self.base_url + uri, headers=headers, params=query)
        response.raise_for_status()
        return response.json()
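# Example usage (assuming valid credentials):
#   api = NaverAPI(BASE_URL, API_KEY, SECRET_KEY, CUSTOMER_ID)
#   data = api.get_keywords_data(["keyword1", "keyword2"])
#   data['keywordList']  # one entry per related keyword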
|
|
|
def get_blog_count(keyword):
    """Return the total number of blog posts matching `keyword` via the Naver Open API."""
    # Open API credentials (hardcoded here, like the SearchAd keys above).
    client_id = "421ZKFMM5TS1xmvsF7C0"
    client_secret = "h47UQHAOGV"
    enc_text = urllib.parse.quote(keyword)
    url = "https://openapi.naver.com/v1/search/blog?query=" + enc_text
    request = urllib.request.Request(url)
    request.add_header("X-Naver-Client-Id", client_id)
    request.add_header("X-Naver-Client-Secret", client_secret)
    try:
        # urlopen raises on non-2xx responses, so failures are handled in the
        # except clause rather than by checking the status code.
        response = urllib.request.urlopen(request)
        data = json.loads(response.read().decode('utf-8'))
        return data['total']
    except urllib.error.URLError:
        return 0
|
|
|
def get_keywords_data_chunk(chunk):
    # Each call builds its own NaverAPI client, so nothing is shared between
    # worker threads.
    api = NaverAPI(BASE_URL, API_KEY, SECRET_KEY, CUSTOMER_ID)
    return api.get_keywords_data(chunk)


def get_blog_count_parallel(keyword):
    # Pair the keyword with its blog count so parallel results stay labeled.
    return (keyword, get_blog_count(keyword))
|
|
|
def get_monthly_search_volumes(keywords):
    all_data = []
    chunk_size = 10

    # Fetch keyword stats in chunks, up to five requests in flight at a time.
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(get_keywords_data_chunk, keywords[i:i + chunk_size])
                   for i in range(0, len(keywords), chunk_size)]
        for future in futures:
            data = future.result()
            if 'keywordList' in data:
                all_data.extend(data['keywordList'])

    if not all_data:
        return [("Error", "No data returned or invalid response from API", "", "", "")]

    results = []
    unique_keywords = set()
    for item in all_data:
        keyword = item['relKeyword']
        if keyword not in unique_keywords:
            unique_keywords.add(keyword)
            monthly_pc = item['monthlyPcQcCnt']
            monthly_mobile = item['monthlyMobileQcCnt']

            # The API reports very low volumes as the string "< 10"; treat
            # those as 0 and strip thousands separators before converting.
            if isinstance(monthly_pc, str):
                monthly_pc = int(monthly_pc.replace(',', '').replace('< 10', '0'))
            if isinstance(monthly_mobile, str):
                monthly_mobile = int(monthly_mobile.replace(',', '').replace('< 10', '0'))

            total_searches = monthly_pc + monthly_mobile
            results.append((keyword, monthly_pc, monthly_mobile, total_searches))

            # Cap the output at 100 unique keywords.
            if len(results) >= 100:
                break

    # Look up blog post counts in parallel. Futures are collected in
    # submission order, so blog_futures[i] corresponds to results[i].
    with ThreadPoolExecutor(max_workers=5) as executor:
        blog_futures = [executor.submit(get_blog_count_parallel, result[0]) for result in results]
        for i, future in enumerate(blog_futures):
            keyword, blog_count = future.result()
            results[i] = results[i] + (blog_count,)

    return results
|
|
|
def save_to_excel(results, keyword):
    df = pd.DataFrame(results, columns=["Keyword", "Monthly PC Searches", "Monthly Mobile Searches", "Total Monthly Searches", "Blog Post Count"])
    now = datetime.now().strftime('%Y-%m-%d')
    sanitized_keyword = keyword.replace(' ', '_')
    filename = f"{now}_{sanitized_keyword}_related_keywords.xlsx"
    file_path = os.path.join(tempfile.gettempdir(), filename)
    df.to_excel(file_path, index=False)
    return file_path
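# Note: writing .xlsx files with DataFrame.to_excel requires an Excel engine
# such as openpyxl to be installed (pip install openpyxl).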
|
|
|
def display_search_volumes(keywords):
    # Gradio callback: parse the comma-separated input, fetch stats, and
    # export them to Excel.
    keyword_list = [keyword.strip() for keyword in keywords.split(',')]
    results = get_monthly_search_volumes(keyword_list)
    file_path = save_to_excel(results, keywords)
    return results, file_path
|
|
|
iface = gr.Interface(
    fn=display_search_volumes,
    inputs=gr.Textbox(placeholder="Enter keywords"),
    outputs=[
        gr.Dataframe(headers=["Keyword", "Monthly PC Searches", "Monthly Mobile Searches", "Total Monthly Searches", "Blog Post Count"]),
        gr.File(label="Download Excel File"),
    ],
    title="Naver Monthly Search Volume Finder",
)

iface.launch()
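# launch() serves the app locally by default; passing share=True would also
# create a temporary public URL if remote access is needed.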
|
|