import time
import hashlib
import hmac
import base64
import requests
import gradio as gr
import urllib.request
import urllib.parse
import json
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
import os
import tempfile
from datetime import datetime
BASE_URL = "https://api.searchad.naver.com"
API_KEY = "010000000046604df3a0f6abf4c52824e0d5835c5cbeae279ced8b2bb9007b3cc566b190c7"
SECRET_KEY = "AQAAAABGYE3zoPar9MUoJODVg1xczNEcSuIBi66wWUy4p4gs/Q=="
CUSTOMER_ID = 2666992
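
# NOTE: the API key, secret key, and customer ID above are hardcoded, as in the
# original script; reading them from the environment instead (for example via
# os.environ.get("NAVER_API_KEY"), a hypothetical variable name) would keep the
# credentials out of source control.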
class NaverAPI:
    def __init__(self, base_url, api_key, secret_key, customer_id):
        self.base_url = base_url
        self.api_key = api_key
        self.secret_key = secret_key
        self.customer_id = customer_id

    def generate_signature(self, timestamp, method, path):
        sign = f"{timestamp}.{method}.{path}"
        signature = hmac.new(self.secret_key.encode('utf-8'), sign.encode('utf-8'), hashlib.sha256).digest()
        return base64.b64encode(signature).decode('utf-8')

    def get_timestamp(self):
        return str(int(time.time() * 1000))

    def get_headers(self, method, uri):
        timestamp = self.get_timestamp()
        headers = {
            'Content-Type': 'application/json; charset=UTF-8',
            'X-Timestamp': timestamp,
            'X-API-KEY': self.api_key,
            'X-Customer': str(self.customer_id),
            'X-Signature': self.generate_signature(timestamp, method, uri),
        }
        return headers

    def get_keywords_data(self, keywords):
        uri = "/keywordstool"
        method = "GET"
        query = {
            'hintKeywords': ','.join(keywords),
            'showDetail': 1
        }
        headers = self.get_headers(method, uri)
        response = requests.get(self.base_url + uri, headers=headers, params=query)
        response.raise_for_status()  # raise an exception on HTTP errors
        return response.json()
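
# The Naver SearchAd API authenticates each request with an HMAC signature:
# X-Signature = base64(HMAC-SHA256(secret_key, f"{timestamp}.{method}.{path}"))
# where X-Timestamp carries the same millisecond timestamp that was signed.
# generate_signature() and get_headers() above implement this scheme.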
def get_blog_count(keyword):
    client_id = "421ZKFMM5TS1xmvsF7C0"
    client_secret = "h47UQHAOGV"
    encText = urllib.parse.quote(keyword)
    url = "https://openapi.naver.com/v1/search/blog?query=" + encText
    request = urllib.request.Request(url)
    request.add_header("X-Naver-Client-Id", client_id)
    request.add_header("X-Naver-Client-Secret", client_secret)
    response = urllib.request.urlopen(request)
    rescode = response.getcode()
    if rescode == 200:
        response_body = response.read()
        data = json.loads(response_body.decode('utf-8'))
        return data['total']
    else:
        return 0
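
# Note: urllib.request.urlopen() raises urllib.error.HTTPError for non-2xx
# responses rather than returning them, so the rescode check above is only a
# defensive guard; wrapping the call in try/except and returning 0 on failure
# would keep one bad keyword from aborting the whole batch.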
def get_keywords_data_chunk(chunk):
    api = NaverAPI(BASE_URL, API_KEY, SECRET_KEY, CUSTOMER_ID)
    return api.get_keywords_data(chunk)

def get_blog_count_parallel(keyword):
    return (keyword, get_blog_count(keyword))
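
# Thin wrappers that keep each ThreadPoolExecutor submission self-contained:
# every chunk request builds its own NaverAPI client, and every blog lookup
# returns a (keyword, count) pair so results can be matched back by position.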
def get_monthly_search_volumes(keywords):
    all_data = []
    chunk_size = 10  # request keywords in chunks of 10

    # fetch keyword data in parallel
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(get_keywords_data_chunk, keywords[i:i+chunk_size]) for i in range(0, len(keywords), chunk_size)]
        for future in futures:
            data = future.result()
            if 'keywordList' in data:
                all_data.extend(data['keywordList'])

    if not all_data:
        return [("Error", "No data returned or invalid response from API", "", "", "")]  # pad with the blog post count column

    results = []
    unique_keywords = set()
    for item in all_data:
        keyword = item['relKeyword']
        if keyword not in unique_keywords:
            unique_keywords.add(keyword)
            monthly_pc = item['monthlyPcQcCnt']
            monthly_mobile = item['monthlyMobileQcCnt']
            if isinstance(monthly_pc, str):
                monthly_pc = int(monthly_pc.replace(',', '').replace('< 10', '0'))
            if isinstance(monthly_mobile, str):
                monthly_mobile = int(monthly_mobile.replace(',', '').replace('< 10', '0'))
            total_searches = monthly_pc + monthly_mobile
            results.append((keyword, monthly_pc, monthly_mobile, total_searches))
        if len(results) >= 100:
            break

    # fetch blog post counts in parallel
    with ThreadPoolExecutor(max_workers=5) as executor:
        blog_futures = [executor.submit(get_blog_count_parallel, result[0]) for result in results]
        for i, future in enumerate(blog_futures):
            keyword, blog_count = future.result()
            results[i] = (results[i][0], results[i][1], results[i][2], results[i][3], blog_count)

    return results
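
# Each row returned above has the shape
# (keyword, monthly_pc, monthly_mobile, total_searches, blog_count),
# capped at 100 unique keywords.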
def save_to_excel(results, keyword):
    df = pd.DataFrame(results, columns=["Keyword", "Monthly PC Searches", "Monthly Mobile Searches", "Total Monthly Searches", "Blog Post Count"])
    now = datetime.now().strftime('%Y-%m-%d')
    sanitized_keyword = keyword.replace(' ', '_')
    filename = f"{now}_{sanitized_keyword}_related_keywords.xlsx"
    file_path = os.path.join(tempfile.gettempdir(), filename)
    df.to_excel(file_path, index=False)
    return file_path
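
# pandas writes .xlsx files through the openpyxl engine, so openpyxl must be
# installed alongside pandas for save_to_excel() to succeed.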
def display_search_volumes(keywords):
    keyword_list = [keyword.strip() for keyword in keywords.split(',')]
    results = get_monthly_search_volumes(keyword_list)
    file_path = save_to_excel(results, keywords)
    return results, file_path

iface = gr.Interface(
    fn=display_search_volumes,
    inputs=gr.Textbox(placeholder="Enter keywords, separated by commas"),
    outputs=[
        gr.Dataframe(headers=["Keyword", "Monthly PC Searches", "Monthly Mobile Searches", "Total Monthly Searches", "Blog Post Count"]),
        gr.File(label="Download Excel File")
    ],
    title="Naver Monthly Search Volume Checker",
)
iface.launch()