import time import hashlib import hmac import base64 import requests import gradio as gr import urllib.request import urllib.parse import json import pandas as pd from concurrent.futures import ThreadPoolExecutor import os import tempfile from datetime import datetime from dotenv import load_dotenv # dotenv 추가 # .env 파일의 환경 변수를 로드합니다. load_dotenv() # 환경 변수에서 API 키와 시크릿 키를 불러옵니다. BASE_URL = "https://api.searchad.naver.com" API_KEY = os.environ.get("NAVER_API_KEY") SECRET_KEY = os.environ.get("NAVER_SECRET_KEY") CUSTOMER_ID = 2666992 # 환경 변수에서 클라이언트 ID와 시크릿을 불러옵니다. CLIENT_ID = os.environ.get("NAVER_CLIENT_ID") CLIENT_SECRET = os.environ.get("NAVER_CLIENT_SECRET") # 환경 변수 로드 확인 if not API_KEY or not SECRET_KEY or not CLIENT_ID or not CLIENT_SECRET: raise ValueError("필수 환경 변수가 설정되지 않았습니다. .env 파일을 확인하세요.") else: print("환경 변수가 정상적으로 로드되었습니다.") class NaverAPI: def __init__(self, base_url, api_key, secret_key, customer_id): self.base_url = base_url self.api_key = api_key self.secret_key = secret_key self.customer_id = customer_id def generate_signature(self, timestamp, method, path): sign = f"{timestamp}.{method}.{path}" signature = hmac.new(self.secret_key.encode('utf-8'), sign.encode('utf-8'), hashlib.sha256).digest() return base64.b64encode(signature).decode('utf-8') def get_timestamp(self): return str(int(time.time() * 1000)) def get_headers(self, method, uri): timestamp = self.get_timestamp() headers = { 'Content-Type': 'application/json; charset=UTF-8', 'X-Timestamp': timestamp, 'X-API-KEY': self.api_key, 'X-Customer': str(self.customer_id), 'X-Signature': self.generate_signature(timestamp, method, uri), } return headers def get_keywords_data(self, keywords): uri = "/keywordstool" method = "GET" query = { 'hintKeywords': ','.join(keywords), 'showDetail': 1 } headers = self.get_headers(method, uri) response = requests.get(self.base_url + uri, headers=headers, params=query) response.raise_for_status() # HTTP 오류 발생 시 예외 발생 return response.json() def get_blog_count(keyword): # 클라이언트 ID와 시크릿을 환경 변수에서 불러옵니다. client_id = CLIENT_ID client_secret = CLIENT_SECRET # keyword가 바이트 타입일 경우 디코딩 if isinstance(keyword, bytes): keyword = keyword.decode('utf-8') elif not isinstance(keyword, str): keyword = str(keyword) encText = urllib.parse.quote(keyword) url = "https://openapi.naver.com/v1/search/blog?query=" + encText request = urllib.request.Request(url) request.add_header("X-Naver-Client-Id", client_id) request.add_header("X-Naver-Client-Secret", client_secret) try: response = urllib.request.urlopen(request) rescode = response.getcode() if rescode == 200: response_body = response.read() data = json.loads(response_body.decode('utf-8')) return data.get('total', 0) else: return 0 except Exception as e: print(f"Error fetching blog count for keyword '{keyword}': {e}") return 0 def get_keywords_data_chunk(chunk): api = NaverAPI(BASE_URL, API_KEY, SECRET_KEY, CUSTOMER_ID) return api.get_keywords_data(chunk) def get_blog_count_parallel(keyword): return (keyword, get_blog_count(keyword)) def get_monthly_search_volumes(keywords, include_related_keywords=True): all_data = [] chunk_size = 10 # 키워드를 10개씩 나누어 요청 if include_related_keywords: # API 병렬 요청 with ThreadPoolExecutor(max_workers=5) as executor: futures = [executor.submit(get_keywords_data_chunk, keywords[i:i+chunk_size]) for i in range(0, len(keywords), chunk_size)] for future in futures: try: data = future.result() if 'keywordList' in data: all_data.extend(data['keywordList']) except Exception as e: print(f"Error fetching keywords data chunk: {e}") else: # 연관검색어를 포함하지 않으므로 입력 키워드만 처리 for keyword in keywords: # 가상의 데이터 구조을 맞추기 위해 all_data.append({ 'relKeyword': keyword, 'monthlyPcQcCnt': '0', # 실제 API에서 값을 가져오려면 별도 요청 필요 'monthlyMobileQcCnt': '0' }) if not all_data: return [("Error", "데이터가 반환되지 않았거나 API 응답이 유효하지 않습니다.", "", "", "")] # 블로그 문서 수 칼럼 추가 results = [] unique_keywords = set() for item in all_data: keyword = item['relKeyword'] if keyword not in unique_keywords: unique_keywords.add(keyword) monthly_pc = item.get('monthlyPcQcCnt', 0) monthly_mobile = item.get('monthlyMobileQcCnt', 0) if isinstance(monthly_pc, str): monthly_pc = monthly_pc.replace(',', '').replace('< 10', '0') try: monthly_pc = int(monthly_pc) except ValueError: monthly_pc = 0 if isinstance(monthly_mobile, str): monthly_mobile = monthly_mobile.replace(',', '').replace('< 10', '0') try: monthly_mobile = int(monthly_mobile) except ValueError: monthly_mobile = 0 total_searches = monthly_pc + monthly_mobile results.append((keyword, monthly_pc, monthly_mobile, total_searches)) if len(results) >= 100 and include_related_keywords: break if include_related_keywords: # 블로그 문서 수 병렬 요청 with ThreadPoolExecutor(max_workers=5) as executor: blog_futures = [executor.submit(get_blog_count_parallel, result[0]) for result in results] for i, future in enumerate(blog_futures): try: keyword, blog_count = future.result() results[i] = (results[i][0], results[i][1], results[i][2], results[i][3], blog_count) except Exception as e: print(f"Error fetching blog count for keyword '{results[i][0]}': {e}") results[i] = (results[i][0], results[i][1], results[i][2], results[i][3], "Error") else: # 블로그 문서 수 병렬 요청 (연관검색어가 아닐 경우 각 키워드에 대해) with ThreadPoolExecutor(max_workers=5) as executor: blog_futures = [executor.submit(get_blog_count_parallel, keyword) for keyword in results] temp_results = [] for future in blog_futures: try: keyword, blog_count = future.result() temp_results.append((keyword, 0, 0, 0, blog_count)) except Exception as e: print(f"Error fetching blog count for keyword '{keyword}': {e}") temp_results.append((keyword, 0, 0, 0, "Error")) results = temp_results return results def save_to_excel(results, keyword): df = pd.DataFrame(results, columns=["키워드", "PC월검색량", "모바일월검색량", "토탈월검색량", "블로그문서수"]) now = datetime.now().strftime('%Y-%m-%d') sanitized_keyword = keyword.replace(' ', '_') filename = f"{now}_{sanitized_keyword}_연관검색어.xlsx" file_path = os.path.join(tempfile.gettempdir(), filename) df.to_excel(file_path, index=False) return file_path def display_search_volumes(keywords, include_related): keyword_list = [keyword.strip() for keyword in keywords.split(',')] results = get_monthly_search_volumes(keyword_list, include_related_keywords=include_related) file_path = save_to_excel(results, keywords) return results, file_path iface = gr.Interface( fn=display_search_volumes, inputs=[ gr.Textbox(placeholder="키워드를 입력하세요"), gr.Checkbox(label="연관검색어 포함", value=True) # 연관검색어 토글 추가 ], outputs=[ gr.Dataframe(headers=["키워드", "PC월검색량", "모바일월검색량", "토탈월검색량", "블로그문서수"]), gr.File(label="다운로드 엑셀 파일") ], title="네이버 월검색량 검색기", description="키워드의 월 검색량과 블로그 문서 수를 확인할 수 있습니다. 연관검색어를 포함할지 선택하세요.", ) iface.launch(share=True) # share=True를 추가하여 공개 링크 생성