"""Gradio app that reports Naver monthly search volumes and blog post counts.

For a comma-separated list of seed keywords the app queries the Naver Search
Ad keyword tool for related keywords and their monthly PC/mobile query
counts, looks up the number of blog documents for each related keyword via
the Naver Open API, shows the table in the UI and offers it as an Excel file.
"""

import base64
import hashlib
import hmac
import json
import os
import re
import tempfile
import time
import urllib.error
import urllib.parse
import urllib.request
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime

import gradio as gr
import pandas as pd
import requests

BASE_URL = "https://api.searchad.naver.com"

# SECURITY NOTE(review): these credentials are committed in source. They can
# be overridden through environment variables; the literals are kept only as
# backward-compatible fallbacks and should be rotated and removed.
API_KEY = os.environ.get(
    "NAVER_AD_API_KEY",
    "010000000046604df3a0f6abf4c52824e0d5835c5cbeae279ced8b2bb9007b3cc566b190c7",
)
SECRET_KEY = os.environ.get(
    "NAVER_AD_SECRET_KEY",
    "AQAAAABGYE3zoPar9MUoJODVg1xczNEcSuIBi66wWUy4p4gs/Q==",
)
CUSTOMER_ID = os.environ.get("NAVER_AD_CUSTOMER_ID", 2666992)

# Credentials for the Naver Open API blog search (same security note applies).
NAVER_CLIENT_ID = os.environ.get("NAVER_CLIENT_ID", "421ZKFMM5TS1xmvsF7C0")
NAVER_CLIENT_SECRET = os.environ.get("NAVER_CLIENT_SECRET", "h47UQHAOGV")

# Seconds to wait on any single HTTP call; without a timeout a stalled
# connection would block a worker thread (and the UI request) indefinitely.
REQUEST_TIMEOUT = 10

# Column headers shared by the Excel export and the Gradio table.
COLUMNS = ["키워드", "PC월검색량", "모바일월검색량", "토탈월검색량", "블로그문서수"]

# Characters that are path separators or invalid in file names on common
# file systems, plus ASCII control characters.
_UNSAFE_FILENAME_CHARS = re.compile(r'[\\/:*?"<>|\x00-\x1f]')

# Cap on the keyword part of the export file name so that long keyword lists
# do not exceed file-system name length limits.
_MAX_KEYWORD_CHARS_IN_FILENAME = 50


class NaverAPI:
    """Minimal client for the Naver Search Ad API keyword tool."""

    def __init__(self, base_url, api_key, secret_key, customer_id):
        """Store the endpoint and the credentials used to sign requests."""
        self.base_url = base_url
        self.api_key = api_key
        self.secret_key = secret_key
        self.customer_id = customer_id

    def generate_signature(self, timestamp, method, path):
        """Return the base64 HMAC-SHA256 of ``"{timestamp}.{method}.{path}"``.

        The secret key is used as the HMAC key.
        """
        sign = f"{timestamp}.{method}.{path}"
        signature = hmac.new(
            self.secret_key.encode('utf-8'),
            sign.encode('utf-8'),
            hashlib.sha256,
        ).digest()
        return base64.b64encode(signature).decode('utf-8')

    def get_timestamp(self):
        """Return the current Unix time in milliseconds as a string."""
        return str(int(time.time() * 1000))

    def get_headers(self, method, uri):
        """Build the signed request headers for *method* and *uri*."""
        timestamp = self.get_timestamp()
        return {
            'Content-Type': 'application/json; charset=UTF-8',
            'X-Timestamp': timestamp,
            'X-API-KEY': self.api_key,
            'X-Customer': str(self.customer_id),
            'X-Signature': self.generate_signature(timestamp, method, uri),
        }

    def get_keywords_data(self, keywords):
        """Query ``/keywordstool`` for *keywords* and return the decoded JSON.

        Args:
            keywords: Iterable of hint keywords; they are sent comma-joined.

        Raises:
            requests.HTTPError: If the API answers with an error status.
            requests.Timeout: If the API does not answer within
                ``REQUEST_TIMEOUT`` seconds.
        """
        uri = "/keywordstool"
        method = "GET"
        query = {
            'hintKeywords': ','.join(keywords),
            'showDetail': 1,
        }
        headers = self.get_headers(method, uri)
        response = requests.get(
            self.base_url + uri,
            headers=headers,
            params=query,
            timeout=REQUEST_TIMEOUT,
        )
        response.raise_for_status()  # raise on HTTP error status
        return response.json()


def get_blog_count(keyword):
    """Return the ``total`` blog document count the Naver Open API reports.

    Returns 0 when the API answers with a non-200 status. ``urlopen`` raises
    ``HTTPError`` for error statuses instead of returning the response, so
    that exception is mapped to the same 0 fallback; otherwise one failed
    lookup would abort the whole batch.
    """
    url = "https://openapi.naver.com/v1/search/blog?query=" + urllib.parse.quote(keyword)
    request = urllib.request.Request(url)
    request.add_header("X-Naver-Client-Id", NAVER_CLIENT_ID)
    request.add_header("X-Naver-Client-Secret", NAVER_CLIENT_SECRET)
    try:
        # The context manager closes the connection even if decoding fails.
        with urllib.request.urlopen(request, timeout=REQUEST_TIMEOUT) as response:
            if response.getcode() != 200:
                return 0
            data = json.loads(response.read().decode('utf-8'))
    except urllib.error.HTTPError:
        return 0
    return data['total']


def get_keywords_data_chunk(chunk):
    """Fetch keyword-tool data for one chunk of hint keywords."""
    api = NaverAPI(BASE_URL, API_KEY, SECRET_KEY, CUSTOMER_ID)
    return api.get_keywords_data(chunk)


def get_blog_count_parallel(keyword):
    """Return ``(keyword, blog_count)``; convenient as a thread-pool task."""
    return (keyword, get_blog_count(keyword))


def _parse_count(value):
    """Convert a query-count field to ``int``.

    String values may contain thousands separators or the literal ``'< 10'``,
    which is counted as 0. Non-string values are returned unchanged.
    """
    if isinstance(value, str):
        return int(value.replace(',', '').replace('< 10', '0'))
    return value


def get_monthly_search_volumes(keywords, chunk_size=10, max_results=100):
    """Collect search volumes and blog counts for keywords related to *keywords*.

    Args:
        keywords: List of hint keywords.
        chunk_size: Number of hint keywords sent per keyword-tool request.
            NOTE(review): confirm this against the API's documented
            ``hintKeywords`` limit.
        max_results: Maximum number of distinct related keywords to return.

    Returns:
        A list of ``(keyword, pc, mobile, total, blog_count)`` tuples, or a
        single ``("Error", message, "", "", "")`` row when the keyword tool
        returned no data.
    """
    chunks = [keywords[i:i + chunk_size] for i in range(0, len(keywords), chunk_size)]

    # Keyword-tool requests run in parallel; map() keeps the chunk order.
    all_data = []
    with ThreadPoolExecutor(max_workers=5) as executor:
        for data in executor.map(get_keywords_data_chunk, chunks):
            if 'keywordList' in data:
                all_data.extend(data['keywordList'])

    if not all_data:
        return [("Error", "No data returned or invalid response from API", "", "", "")]

    # Keep the first occurrence of each related keyword, up to max_results.
    rows = []
    seen = set()
    for item in all_data:
        keyword = item['relKeyword']
        if keyword in seen:
            continue
        seen.add(keyword)
        monthly_pc = _parse_count(item['monthlyPcQcCnt'])
        monthly_mobile = _parse_count(item['monthlyMobileQcCnt'])
        rows.append((keyword, monthly_pc, monthly_mobile, monthly_pc + monthly_mobile))
        if len(rows) >= max_results:
            break

    # Blog document counts are fetched in parallel and appended as a column.
    with ThreadPoolExecutor(max_workers=5) as executor:
        blog_counts = list(executor.map(get_blog_count_parallel, (row[0] for row in rows)))

    return [row + (blog_count,) for row, (_, blog_count) in zip(rows, blog_counts)]


def save_to_excel(results, keyword):
    """Write *results* to an Excel file in the temp directory; return its path.

    The file name embeds today's date and *keyword*. Spaces, path separators
    and other characters that are invalid in file names are replaced with
    ``_`` so the keyword can neither break the write nor escape the temp
    directory, and the keyword part is truncated to a safe length.
    """
    df = pd.DataFrame(results, columns=COLUMNS)
    now = datetime.now().strftime('%Y-%m-%d')
    sanitized_keyword = _UNSAFE_FILENAME_CHARS.sub('_', keyword.replace(' ', '_'))
    sanitized_keyword = sanitized_keyword[:_MAX_KEYWORD_CHARS_IN_FILENAME]
    filename = f"{now}_{sanitized_keyword}_연관검색어.xlsx"
    file_path = os.path.join(tempfile.gettempdir(), filename)
    df.to_excel(file_path, index=False)
    return file_path


def display_search_volumes(keywords):
    """Gradio handler: return the result rows and the path of the Excel export.

    *keywords* is the raw comma-separated text from the UI. Blank entries
    (empty input, trailing commas) are dropped so they are not sent to the
    API as empty hint keywords.
    """
    keywords = keywords or ""
    keyword_list = [kw.strip() for kw in keywords.split(',') if kw.strip()]
    results = get_monthly_search_volumes(keyword_list)
    file_path = save_to_excel(results, keywords)
    return results, file_path


iface = gr.Interface(
    fn=display_search_volumes,
    inputs=gr.Textbox(placeholder="키워드를 입력하세요"),
    outputs=[
        gr.Dataframe(headers=list(COLUMNS)),
        gr.File(label="다운로드 엑셀 파일"),
    ],
    title="네이버 월검색량 검색기",
)

# Launch only when run as a script so importing this module has no side
# effects beyond building the interface object.
if __name__ == "__main__":
    iface.launch()