import time
import hashlib
import hmac
import base64
import requests
import gradio as gr
import urllib.request
import urllib.parse
import json
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
import os
import tempfile
from datetime import datetime
from dotenv import load_dotenv  # For loading variables from a .env file.
# Load the environment variables from the .env file.
load_dotenv()
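# Example .env (placeholder values; the variable names are the ones this
# script reads below):
#   NAVER_API_KEY=your-searchad-api-key
#   NAVER_SECRET_KEY=your-searchad-secret-key
#   NAVER_CLIENT_ID=your-openapi-client-id
#   NAVER_CLIENT_SECRET=your-openapi-client-secret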
# ν™˜κ²½ λ³€μˆ˜μ—μ„œ API 킀와 μ‹œν¬λ¦Ώ ν‚€λ₯Ό λΆˆλŸ¬μ˜΅λ‹ˆλ‹€.
BASE_URL = "https://api.searchad.naver.com"
API_KEY = os.environ.get("NAVER_API_KEY")
SECRET_KEY = os.environ.get("NAVER_SECRET_KEY")
CUSTOMER_ID = 2666992
# ν™˜κ²½ λ³€μˆ˜μ—μ„œ ν΄λΌμ΄μ–ΈνŠΈ ID와 μ‹œν¬λ¦Ώμ„ λΆˆλŸ¬μ˜΅λ‹ˆλ‹€.
CLIENT_ID = os.environ.get("NAVER_CLIENT_ID")
CLIENT_SECRET = os.environ.get("NAVER_CLIENT_SECRET")
# ν™˜κ²½ λ³€μˆ˜ λ‘œλ“œ 확인
if not API_KEY or not SECRET_KEY or not CLIENT_ID or not CLIENT_SECRET:
raise ValueError("ν•„μˆ˜ ν™˜κ²½ λ³€μˆ˜κ°€ μ„€μ •λ˜μ§€ μ•Šμ•˜μŠ΅λ‹ˆλ‹€. .env νŒŒμΌμ„ ν™•μΈν•˜μ„Έμš”.")
else:
print("ν™˜κ²½ λ³€μˆ˜κ°€ μ •μƒμ μœΌλ‘œ λ‘œλ“œλ˜μ—ˆμŠ΅λ‹ˆλ‹€.")
class NaverAPI:
    def __init__(self, base_url, api_key, secret_key, customer_id):
        self.base_url = base_url
        self.api_key = api_key
        self.secret_key = secret_key
        self.customer_id = customer_id
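    # SearchAd API request signing: the string "{timestamp}.{METHOD}.{path}"
    # is signed with the secret key using HMAC-SHA256, and the result is sent
    # base64-encoded in the X-Signature header.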
    def generate_signature(self, timestamp, method, path):
        sign = f"{timestamp}.{method}.{path}"
        signature = hmac.new(self.secret_key.encode('utf-8'), sign.encode('utf-8'), hashlib.sha256).digest()
        return base64.b64encode(signature).decode('utf-8')

    def get_timestamp(self):
        # Millisecond-resolution Unix timestamp, as the API expects.
        return str(int(time.time() * 1000))

    def get_headers(self, method, uri):
        timestamp = self.get_timestamp()
        headers = {
            'Content-Type': 'application/json; charset=UTF-8',
            'X-Timestamp': timestamp,
            'X-API-KEY': self.api_key,
            'X-Customer': str(self.customer_id),
            'X-Signature': self.generate_signature(timestamp, method, uri),
        }
        return headers
    def get_keywords_data(self, keywords):
        uri = "/keywordstool"
        method = "GET"
        query = {
            'hintKeywords': ','.join(keywords),
            'showDetail': 1
        }
        headers = self.get_headers(method, uri)
        response = requests.get(self.base_url + uri, headers=headers, params=query)
        response.raise_for_status()  # Raise an exception on HTTP errors.
        return response.json()
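# Shared helper for the volume parsers below. The keywordstool API reports
# monthly counts either as integers or as strings (comma-formatted, or the
# literal "< 10" for very small volumes); normalize all of these to int.
def parse_count(value):
    if isinstance(value, str):
        value = value.replace(',', '').replace('< 10', '0')
        try:
            return int(value)
        except ValueError:
            return 0
    return value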
def get_blog_count(keyword):
    # Use the client ID and secret loaded from the environment above.
    client_id = CLIENT_ID
    client_secret = CLIENT_SECRET
    # Decode the keyword if it arrives as bytes.
    if isinstance(keyword, bytes):
        keyword = keyword.decode('utf-8')
    elif not isinstance(keyword, str):
        keyword = str(keyword)
    encText = urllib.parse.quote(keyword)
    url = "https://openapi.naver.com/v1/search/blog?query=" + encText
    request = urllib.request.Request(url)
    request.add_header("X-Naver-Client-Id", client_id)
    request.add_header("X-Naver-Client-Secret", client_secret)
    try:
        response = urllib.request.urlopen(request)
        rescode = response.getcode()
        if rescode == 200:
            response_body = response.read()
            data = json.loads(response_body.decode('utf-8'))
            return data.get('total', 0)
        else:
            return 0
    except Exception as e:
        print(f"Error fetching blog count for keyword '{keyword}': {e}")
        return 0
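# The related-keyword path batches the input keywords into chunks (10 per
# request below) and fans them out across a small thread pool; each worker
# builds its own NaverAPI client.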
def get_keywords_data_chunk(chunk):
    api = NaverAPI(BASE_URL, API_KEY, SECRET_KEY, CUSTOMER_ID)
    return api.get_keywords_data(chunk)

def get_blog_count_parallel(keyword):
    return (keyword, get_blog_count(keyword))
def get_search_volumes(keyword):
    """Fetch the monthly search volumes for a single keyword."""
    api = NaverAPI(BASE_URL, API_KEY, SECRET_KEY, CUSTOMER_ID)
    try:
        data = api.get_keywords_data([keyword])
        if 'keywordList' in data and len(data['keywordList']) > 0:
            # Find the keywordList entry that matches the input keyword.
            for item in data['keywordList']:
                if item['relKeyword'].strip().lower() == keyword.strip().lower():
                    monthly_pc = parse_count(item.get('monthlyPcQcCnt', 0))
                    monthly_mobile = parse_count(item.get('monthlyMobileQcCnt', 0))
                    total_searches = monthly_pc + monthly_mobile
                    return (keyword, monthly_pc, monthly_mobile, total_searches)
            # No entry matched the input keyword.
            return (keyword, 0, 0, 0)
        else:
            return (keyword, 0, 0, 0)
    except Exception as e:
        print(f"Error fetching search volumes for keyword '{keyword}': {e}")
        return (keyword, 0, 0, 0)
def get_monthly_search_volumes(keywords, include_related_keywords=True):
    all_data = []
    results = []
    if include_related_keywords:
        chunk_size = 10  # Request keywords in chunks of 10.
        # Fan the chunk requests out in parallel.
        with ThreadPoolExecutor(max_workers=5) as executor:
            futures = [executor.submit(get_keywords_data_chunk, keywords[i:i+chunk_size]) for i in range(0, len(keywords), chunk_size)]
            for future in futures:
                try:
                    data = future.result()
                    if 'keywordList' in data:
                        all_data.extend(data['keywordList'])
                except Exception as e:
                    print(f"Error fetching keywords data chunk: {e}")
        if not all_data:
            return [("Error", "No data was returned, or the API response was invalid.", "", "", "")]
        unique_keywords = set()
        for item in all_data:
            keyword = item['relKeyword']
            if keyword not in unique_keywords:
                unique_keywords.add(keyword)
                monthly_pc = parse_count(item.get('monthlyPcQcCnt', 0))
                monthly_mobile = parse_count(item.get('monthlyMobileQcCnt', 0))
                total_searches = monthly_pc + monthly_mobile
                results.append((keyword, monthly_pc, monthly_mobile, total_searches))
            if len(results) >= 100:
                break  # Cap the related-keyword list at 100 entries.
    else:
        # Related keywords are excluded, so process only the input keywords.
        with ThreadPoolExecutor(max_workers=5) as executor:
            futures = [(keyword, executor.submit(get_search_volumes, keyword)) for keyword in keywords]
            for keyword, future in futures:
                try:
                    results.append(future.result())
                except Exception as e:
                    print(f"Error fetching search volumes for keyword '{keyword}': {e}")
                    results.append((keyword, 0, 0, 0))
    if not results:
        return [("Error", "No data was returned, or the API response was invalid.", "", "", "")]
    # Fetch blog document counts in parallel. blog_futures[i] was submitted
    # for results[i], so the two lists can be merged by index.
    with ThreadPoolExecutor(max_workers=5) as executor:
        blog_futures = [executor.submit(get_blog_count_parallel, result[0]) for result in results]
        for i, future in enumerate(blog_futures):
            try:
                keyword, blog_count = future.result()
                results[i] = (results[i][0], results[i][1], results[i][2], results[i][3], blog_count)
            except Exception as e:
                print(f"Error fetching blog count for keyword '{results[i][0]}': {e}")
                results[i] = (results[i][0], results[i][1], results[i][2], results[i][3], "Error")
    return results
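# The result table is also written to a dated .xlsx file in the system temp
# directory so the Gradio interface can offer it as a download.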
def save_to_excel(results, keyword):
    df = pd.DataFrame(results, columns=["Keyword", "PC Monthly Searches", "Mobile Monthly Searches", "Total Monthly Searches", "Blog Post Count"])
    now = datetime.now().strftime('%Y-%m-%d')
    sanitized_keyword = keyword.replace(' ', '_')
    filename = f"{now}_{sanitized_keyword}_related_keywords.xlsx"
    file_path = os.path.join(tempfile.gettempdir(), filename)
    df.to_excel(file_path, index=False)
    return file_path
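# Gradio callback: split the comma-separated input, fetch the volumes, and
# return both the table rows and the path of the Excel file.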
def display_search_volumes(keywords, include_related):
    keyword_list = [keyword.strip() for keyword in keywords.split(',') if keyword.strip()]
    if not keyword_list:
        return [("Error", "No keywords were entered.", "", "", "")], None
    results = get_monthly_search_volumes(keyword_list, include_related_keywords=include_related)
    file_path = save_to_excel(results, keywords)
    return results, file_path
iface = gr.Interface(
    fn=display_search_volumes,
    inputs=[
        gr.Textbox(placeholder="Enter keywords (comma-separated)", lines=2),
        gr.Checkbox(label="Include related keywords", value=True)  # Toggle for related keywords.
    ],
    outputs=[
        gr.Dataframe(headers=["Keyword", "PC Monthly Searches", "Mobile Monthly Searches", "Total Monthly Searches", "Blog Post Count"]),
        gr.File(label="Download Excel file")
    ],
    title="Naver Monthly Search Volume Checker",
    description="Check keywords' monthly search volumes and blog post counts. Choose whether to include related keywords.",
)
iface.launch(share=True)  # share=True creates a public link.
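# Note: with share=True, Gradio serves the app locally and also opens a
# temporary public link; set share=False to keep the app local-only.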