# MoneyRadar / app.py
# (Hugging Face Space page header captured with this source:
#  ginipick's picture / Update app.py / 606e64a verified / raw / history blame / 32.2 kB)
import gradio as gr
import requests
import json
import os
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import lru_cache
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from openai import OpenAI
from bs4 import BeautifulSoup
import re
import pathlib
import sqlite3
import pytz
#########################################################
# ν•œκ΅­ κΈ°μ—… 리슀트
#########################################################
# Watchlist driving the automatic search/analysis tab and the aggregate report.
# NOTE(review): despite the banner above ("ν•œκ΅­ κΈ°μ—… 리슀트" = Korean company
# list), most entries are US tech tickers plus generic finance keywords --
# the name appears to be historical. Confirm before renaming.
KOREAN_COMPANIES = [
    "NVIDIA",
    "ALPHABET",
    "APPLE",
    "TESLA",
    "AMAZON",
    "MICROSOFT",
    "META",
    "INTEL",
    "SAMSUNG",
    "HYNIX",
    "BITCOIN",
    "crypto",
    "stock",
    "Economics",
    "Finance",
    "investing"
]
#########################################################
# 곡톡 ν•¨μˆ˜
#########################################################
def convert_to_seoul_time(timestamp_str):
    """
    Label a 'YYYY-MM-DD HH:MM:SS' timestamp string as KST.

    NOTE: the previous docstring claimed a UTC->KST conversion, but no
    wall-clock shift was ever performed -- the naive datetime was merely
    localized to Asia/Seoul, which keeps the same wall time. Timestamps
    written by save_to_db() are already Seoul time, so labeling (not
    converting) is the correct behavior; this version keeps it while
    dropping the needless pytz machinery.

    Returns the input unchanged if it does not match the expected format.
    """
    try:
        # strptime both validates and normalizes the input (zero padding etc.).
        dt = datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S')
        return dt.strftime('%Y-%m-%d %H:%M:%S KST')
    except (ValueError, TypeError) as e:
        # Malformed input: report and fall back to the original string.
        print(f"μ‹œκ°„ λ³€ν™˜ 였λ₯˜: {str(e)}")
        return timestamp_str
def analyze_sentiment_batch(articles, client):
    """
    Run one aggregated sentiment analysis (positive/negative/neutral) over
    the given news articles through the OpenAI-compatible chat API.

    Returns the model's answer as a string, or an error message string if
    anything (prompt building or the API call) fails.
    """
    try:
        # Fold every article's title/snippet into a single prompt payload.
        pieces = []
        for article in articles:
            title = article.get('title', '')
            snippet = article.get('snippet', '')
            pieces.append(f"제λͺ©: {title}\nλ‚΄μš©: {snippet}")
        combined_text = "\n\n".join(pieces)
        # Fixed-format Korean instruction prompt for the sentiment report.
        prompt = f"""λ‹€μŒ λ‰΄μŠ€ λͺ¨μŒμ— λŒ€ν•΄ μ „λ°˜μ μΈ 감성 뢄석을 μˆ˜ν–‰ν•˜μ„Έμš”:
λ‰΄μŠ€ λ‚΄μš©:
{combined_text}
λ‹€μŒ ν˜•μ‹μœΌλ‘œ λΆ„μ„ν•΄μ£Όμ„Έμš”:
1. μ „λ°˜μ  감성: [긍정/λΆ€μ •/쀑립]
2. μ£Όμš” 긍정적 μš”μ†Œ:
- [ν•­λͺ©1]
- [ν•­λͺ©2]
3. μ£Όμš” 뢀정적 μš”μ†Œ:
- [ν•­λͺ©1]
- [ν•­λͺ©2]
4. μ’…ν•© 평가: [상세 μ„€λͺ…]
"""
        completion = client.chat.completions.create(
            model="CohereForAI/c4ai-command-r-plus-08-2024",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.3,
            max_tokens=1000
        )
        return completion.choices[0].message.content
    except Exception as e:
        return f"감성 뢄석 μ‹€νŒ¨: {str(e)}"
#########################################################
# DB μ΄ˆκΈ°ν™” 및 μž…μΆœλ ₯ ν•¨μˆ˜
#########################################################
def init_db():
    """
    Ensure the SQLite database file (search_results.db) exists and that the
    'searches' cache table is present. Safe to call repeatedly.
    """
    db_path = pathlib.Path("search_results.db")
    connection = sqlite3.connect(db_path)
    try:
        connection.execute('''CREATE TABLE IF NOT EXISTS searches
                 (id INTEGER PRIMARY KEY AUTOINCREMENT,
                  keyword TEXT,
                  country TEXT,
                  results TEXT,
                  timestamp DATETIME DEFAULT CURRENT_TIMESTAMP)''')
        connection.commit()
    finally:
        connection.close()
def save_to_db(keyword, country, results):
    """
    Persist one search result set for (keyword, country) into the DB.

    `results` is any JSON-serializable object; it is stored as a JSON string
    together with the current Seoul (KST) wall-clock time, matching what
    convert_to_seoul_time() later labels as KST.

    Fixes vs. previous version:
    - the connection leaked if the INSERT raised (no try/finally); now the
      handle is always closed and the transaction commits/rolls back cleanly;
    - stdlib zoneinfo replaces the third-party pytz for the KST clock.
    """
    from zoneinfo import ZoneInfo  # stdlib tz support (Python 3.9+)

    timestamp = datetime.now(ZoneInfo('Asia/Seoul')).strftime('%Y-%m-%d %H:%M:%S')
    conn = sqlite3.connect("search_results.db")
    try:
        # `with conn` commits on success and rolls back on error.
        with conn:
            conn.execute("""INSERT INTO searches
                            (keyword, country, results, timestamp)
                            VALUES (?, ?, ?, ?)""",
                         (keyword, country, json.dumps(results), timestamp))
    finally:
        conn.close()
def load_from_db(keyword, country):
    """
    Fetch the most recent cached result for (keyword, country).

    Returns (decoded_json, 'YYYY-MM-DD HH:MM:SS KST') on a hit, or
    (None, None) when nothing has been stored yet.
    """
    conn = sqlite3.connect("search_results.db")
    cur = conn.cursor()
    cur.execute("""SELECT results, timestamp
                 FROM searches
                 WHERE keyword=? AND country=?
                 ORDER BY timestamp DESC
                 LIMIT 1""",
              (keyword, country))
    row = cur.fetchone()
    conn.close()
    if row is None:
        return None, None
    payload, stored_at = row
    return json.loads(payload), convert_to_seoul_time(stored_at)
def load_by_id(search_id):
    """
    Load one search record by its primary key.

    Returns a dict with keys keyword / country / data (JSON-decoded results)
    / timestamp (KST-labeled), or None when the id does not exist.
    """
    conn = sqlite3.connect("search_results.db")
    cur = conn.cursor()
    cur.execute("SELECT keyword, country, results, timestamp FROM searches WHERE id=?",
              (search_id,))
    record = cur.fetchone()
    conn.close()
    if record is None:
        return None
    keyword, country, results_json, ts = record
    return {
        "keyword": keyword,
        "country": country,
        "data": json.loads(results_json),
        "timestamp": convert_to_seoul_time(ts),
    }
#########################################################
# κ²°κ³Ό ν‘œμ‹œ
#########################################################
def display_results(articles):
    """Render the article list as a Markdown string (one section per article)."""
    sections = []
    for rank, art in enumerate(articles, 1):
        sections.append(
            f"### {rank}. {art['title']}\n"
            f"좜처: {art['channel']}\n"
            f"μ‹œκ°„: {art['time']}\n"
            f"링크: {art['link']}\n"
            f"μš”μ•½: {art['snippet']}\n\n"
        )
    return "".join(sections)
#########################################################
# SerpHouse API: λ²ˆμ—­ / μš”μ²­ / 응닡 가곡
#########################################################
API_KEY = os.getenv("SERPHOUSE_API_KEY")  # SerpHouse API key from the environment (None if unset)
def is_english(text):
    """
    Return True when every character of `text` -- ignoring spaces, '-', and
    '_' -- falls in the ASCII range; False otherwise.
    """
    relevant = (ch for ch in text if ch not in " -_")
    return all(ord(ch) < 128 for ch in relevant)
@lru_cache(maxsize=100)
def translate_query(query, country):
    """
    Translate `query` into the target language for `country` via the
    unofficial Google Translate endpoint (cached per (query, country)).

    ASCII-only queries and countries without a language mapping are returned
    unchanged; any failure also falls back to the original query.
    """
    try:
        # Guard clauses: nothing to translate, or no target language known.
        if is_english(query):
            return query
        if country not in COUNTRY_LANGUAGES:
            return query
        params = {
            "client": "gtx",
            "sl": "auto",
            "tl": COUNTRY_LANGUAGES[country],
            "dt": "t",
            "q": query,
        }
        http = requests.Session()
        http.mount('https://', HTTPAdapter(max_retries=Retry(total=3, backoff_factor=0.5)))
        resp = http.get("https://translate.googleapis.com/translate_a/single",
                        params=params, timeout=(5, 10))
        # The response body is a nested list; [0][0][0] holds the translation.
        return resp.json()[0][0][0]
    except Exception as e:
        print(f"λ²ˆμ—­ 였λ₯˜: {str(e)}")
        return query
def serphouse_search(query, country, page=1, num_result=10):
    """
    Query the SerpHouse live 'news' endpoint for the last 24 hours and return
    (error_message, articles); error_message is "" on success.

    NOTE(review): `num_result` is accepted but not forwarded -- the payload
    hard-codes "num": "100" (kept as-is to preserve behavior; confirm intent).
    """
    # 24-hour window ending now (UTC dates, as the API expects).
    now = datetime.utcnow()
    yesterday = now - timedelta(days=1)
    date_range = f"{yesterday.strftime('%Y-%m-%d')},{now.strftime('%Y-%m-%d')}"
    # Translate the query into the destination country's language first.
    translated_query = translate_query(query, country)
    payload = {
        "data": {
            "q": translated_query,
            "domain": "google.com",
            "loc": COUNTRY_LOCATIONS.get(country, "United States"),
            "lang": COUNTRY_LANGUAGES.get(country, "en"),
            "device": "desktop",
            "serp_type": "news",
            "page": str(page),
            "num": "100",
            "date_range": date_range,
            "sort_by": "date"
        }
    }
    headers = {
        "accept": "application/json",
        "content-type": "application/json",
        "authorization": f"Bearer {API_KEY}"
    }
    try:
        # Retry transient server errors and rate limits on the POST.
        retry_policy = Retry(
            total=5,
            backoff_factor=1,
            status_forcelist=[500, 502, 503, 504, 429],
            allowed_methods=["POST"]
        )
        http = requests.Session()
        adapter = HTTPAdapter(max_retries=retry_policy)
        http.mount('http://', adapter)
        http.mount('https://', adapter)
        resp = http.post(
            "https://api.serphouse.com/serp/live",
            json=payload,
            headers=headers,
            timeout=(30, 30)
        )
        resp.raise_for_status()
        # Hand the raw payload to the formatter, which filters/normalizes it.
        return format_results_from_raw({
            "results": resp.json(),
            "translated_query": translated_query
        })
    except requests.exceptions.Timeout:
        return ("검색 μ‹œκ°„μ΄ μ΄ˆκ³Όλ˜μ—ˆμŠ΅λ‹ˆλ‹€. μž μ‹œ ν›„ λ‹€μ‹œ μ‹œλ„ν•΄μ£Όμ„Έμš”.", [])
    except requests.exceptions.RequestException as e:
        return (f"검색 쀑 였λ₯˜κ°€ λ°œμƒν–ˆμŠ΅λ‹ˆλ‹€: {str(e)}", [])
    except Exception as e:
        return (f"예기치 μ•Šμ€ 였λ₯˜κ°€ λ°œμƒν–ˆμŠ΅λ‹ˆλ‹€: {str(e)}", [])
def format_results_from_raw(response_data):
    """
    Convert a raw SerpHouse payload into ("", articles) on success or
    ("<error text>", []) on failure.

    Articles from Korean outlets -- matched by domain/channel substrings or
    by Korea-related keywords in the title -- are filtered out.
    """
    if "error" in response_data:
        return ("Error: " + response_data["error"], [])
    try:
        raw = response_data["results"]
        translated_query = response_data["translated_query"]
        news_items = raw.get('results', {}).get('results', {}).get('news', [])
        if not news_items:
            return ("검색 κ²°κ³Όκ°€ μ—†μŠ΅λ‹ˆλ‹€.", [])
        korean_domains = [
            '.kr', 'korea', 'korean', 'yonhap', 'hankyung', 'chosun',
            'donga', 'joins', 'hani', 'koreatimes', 'koreaherald'
        ]
        korean_keywords = [
            'korea', 'korean', 'seoul', 'busan', 'incheon', 'daegu',
            'gwangju', 'daejeon', 'ulsan', 'sejong'
        ]

        def _looks_korean(url, title, channel):
            # Domain/channel match OR Korea keyword in the title.
            if any(d in url or d in channel for d in korean_domains):
                return True
            return any(k in title for k in korean_keywords)

        articles = []
        for position, item in enumerate(news_items, 1):
            url = item.get("url", item.get("link", "")).lower()
            title = item.get("title", "").lower()
            channel = item.get("channel", item.get("source", "")).lower()
            if _looks_korean(url, title, channel):
                continue
            articles.append({
                "index": position,  # 1-based position in the unfiltered list
                "title": item.get("title", "제λͺ© μ—†μŒ"),
                "link": url,  # note: lowercased URL, matching prior behavior
                "snippet": item.get("snippet", "λ‚΄μš© μ—†μŒ"),
                "channel": item.get("channel", item.get("source", "μ•Œ 수 μ—†μŒ")),
                "time": item.get("time", item.get("date", "μ•Œ 수 μ—†λŠ” μ‹œκ°„")),
                "image_url": item.get("img", item.get("thumbnail", "")),
                "translated_query": translated_query,
            })
        return ("", articles)
    except Exception as e:
        return (f"κ²°κ³Ό 처리 쀑 였λ₯˜ λ°œμƒ: {str(e)}", [])
#########################################################
# κ΅­κ°€ μ„€μ •
#########################################################
# Country -> Google Translate target-language code (used as the `tl` param
# in translate_query and the `lang` field of the SerpHouse payload).
# FIX: Luxembourg/Malta/Cyprus previously mapped to country names instead of
# language codes, which would have produced invalid `tl` values; corrected
# to lb (Luxembourgish), mt (Maltese), and el (Greek, the majority language
# of Cyprus).
COUNTRY_LANGUAGES = {
    "United States": "en",
    "KOREA": "ko",
    "United Kingdom": "en",
    "Taiwan": "zh-TW",
    "Canada": "en",
    "Australia": "en",
    "Germany": "de",
    "France": "fr",
    "Japan": "ja",
    "China": "zh",
    "India": "hi",
    "Brazil": "pt",
    "Mexico": "es",
    "Russia": "ru",
    "Italy": "it",
    "Spain": "es",
    "Netherlands": "nl",
    "Singapore": "en",
    "Hong Kong": "zh-HK",
    "Indonesia": "id",
    "Malaysia": "ms",
    "Philippines": "tl",
    "Thailand": "th",
    "Vietnam": "vi",
    "Belgium": "nl",
    "Denmark": "da",
    "Finland": "fi",
    "Ireland": "en",
    "Norway": "no",
    "Poland": "pl",
    "Sweden": "sv",
    "Switzerland": "de",
    "Austria": "de",
    "Czech Republic": "cs",
    "Greece": "el",
    "Hungary": "hu",
    "Portugal": "pt",
    "Romania": "ro",
    "Turkey": "tr",
    "Israel": "he",
    "Saudi Arabia": "ar",
    "United Arab Emirates": "ar",
    "South Africa": "en",
    "Argentina": "es",
    "Chile": "es",
    "Colombia": "es",
    "Peru": "es",
    "Venezuela": "es",
    "New Zealand": "en",
    "Bangladesh": "bn",
    "Pakistan": "ur",
    "Egypt": "ar",
    "Morocco": "ar",
    "Nigeria": "en",
    "Kenya": "sw",
    "Ukraine": "uk",
    "Croatia": "hr",
    "Slovakia": "sk",
    "Bulgaria": "bg",
    "Serbia": "sr",
    "Estonia": "et",
    "Latvia": "lv",
    "Lithuania": "lt",
    "Slovenia": "sl",
    "Luxembourg": "lb",
    "Malta": "mt",
    "Cyprus": "el",
    "Iceland": "is"
}
# Country -> SerpHouse `loc` value (human-readable location names).
# FIX: Estonia/Latvia/Lithuania/Slovenia previously carried the two-letter
# LANGUAGE codes ("et"/"lv"/"lt"/"sl"), evidently copy-pasted from
# COUNTRY_LANGUAGES; corrected to full country names like every other entry.
# NOTE(review): "KOREA" -> "kr" still breaks the full-name convention --
# confirm whether SerpHouse accepts "kr" here or expects "South Korea".
COUNTRY_LOCATIONS = {
    "United States": "United States",
    "KOREA": "kr",
    "United Kingdom": "United Kingdom",
    "Taiwan": "Taiwan",
    "Canada": "Canada",
    "Australia": "Australia",
    "Germany": "Germany",
    "France": "France",
    "Japan": "Japan",
    "China": "China",
    "India": "India",
    "Brazil": "Brazil",
    "Mexico": "Mexico",
    "Russia": "Russia",
    "Italy": "Italy",
    "Spain": "Spain",
    "Netherlands": "Netherlands",
    "Singapore": "Singapore",
    "Hong Kong": "Hong Kong",
    "Indonesia": "Indonesia",
    "Malaysia": "Malaysia",
    "Philippines": "Philippines",
    "Thailand": "Thailand",
    "Vietnam": "Vietnam",
    "Belgium": "Belgium",
    "Denmark": "Denmark",
    "Finland": "Finland",
    "Ireland": "Ireland",
    "Norway": "Norway",
    "Poland": "Poland",
    "Sweden": "Sweden",
    "Switzerland": "Switzerland",
    "Austria": "Austria",
    "Czech Republic": "Czech Republic",
    "Greece": "Greece",
    "Hungary": "Hungary",
    "Portugal": "Portugal",
    "Romania": "Romania",
    "Turkey": "Turkey",
    "Israel": "Israel",
    "Saudi Arabia": "Saudi Arabia",
    "United Arab Emirates": "United Arab Emirates",
    "South Africa": "South Africa",
    "Argentina": "Argentina",
    "Chile": "Chile",
    "Colombia": "Colombia",
    "Peru": "Peru",
    "Venezuela": "Venezuela",
    "New Zealand": "New Zealand",
    "Bangladesh": "Bangladesh",
    "Pakistan": "Pakistan",
    "Egypt": "Egypt",
    "Morocco": "Morocco",
    "Nigeria": "Nigeria",
    "Kenya": "Kenya",
    "Ukraine": "Ukraine",
    "Croatia": "Croatia",
    "Slovakia": "Slovakia",
    "Bulgaria": "Bulgaria",
    "Serbia": "Serbia",
    "Estonia": "Estonia",
    "Latvia": "Latvia",
    "Lithuania": "Lithuania",
    "Slovenia": "Slovenia",
    "Luxembourg": "Luxembourg",
    "Malta": "Malta",
    "Cyprus": "Cyprus",
    "Iceland": "Iceland"
}
#########################################################
# 검색/좜λ ₯ ν•¨μˆ˜ (κΈ°μ—… 검색, λ‘œλ“œ)
#########################################################
def search_company(company):
    """
    Search `company` news (United States edition), run sentiment analysis,
    cache both in the DB, and return a Markdown report.

    Returns the raw error message on search failure, or a "no results"
    message when the search succeeded but found nothing.
    """
    error_message, articles = serphouse_search(company, "United States")
    if error_message or not articles:
        if error_message:
            return error_message
        return f"{company}에 λŒ€ν•œ 검색 κ²°κ³Όκ°€ μ—†μŠ΅λ‹ˆλ‹€."
    analysis = analyze_sentiment_batch(articles, client)
    # Cache articles + analysis together so load_company can replay them.
    save_to_db(company, "United States", {
        "articles": articles,
        "analysis": analysis
    })
    report = display_results(articles)
    report += f"\n\n### 뢄석 보고\n{analysis}\n"
    return report
def load_company(company):
    """
    Return the cached article list + sentiment analysis for `company`
    (United States search) as Markdown, or a "nothing stored" message.
    """
    data, timestamp = load_from_db(company, "United States")
    if not data:
        return f"{company}에 λŒ€ν•œ μ €μž₯된 κ²°κ³Όκ°€ μ—†μŠ΅λ‹ˆλ‹€."
    parts = [
        f"### {company} 검색 κ²°κ³Ό\nμ €μž₯ μ‹œκ°„: {timestamp}\n\n",
        display_results(data.get("articles", [])),
        f"\n\n### 뢄석 보고\n{data.get('analysis', '')}\n",
    ]
    return "".join(parts)
#########################################################
# 전체 톡계
#########################################################
def show_stats():
    """
    Build the "EarnBOT 뢄석 리포트": for each company in KOREAN_COMPANIES,
    show when its latest cached record was stored, how many articles it
    holds, and the stored sentiment analysis.

    Fix vs. previous version: results are emitted strictly in
    KOREAN_COMPANIES order. The old implementation collected
    ThreadPoolExecutor futures with as_completed(), so the report order
    changed from run to run; since the per-item work is a trivial
    json.loads, the threads bought nothing -- sequential processing is
    simpler AND deterministic.
    """
    conn = sqlite3.connect("search_results.db")
    c = conn.cursor()
    output = "## EarnBOT 뢄석 리포트\n\n"
    # Latest cached row per company, preserving watchlist order.
    data_list = []
    for company in KOREAN_COMPANIES:
        c.execute("""
            SELECT results, timestamp
            FROM searches
            WHERE keyword = ?
            ORDER BY timestamp DESC
            LIMIT 1
        """, (company,))
        row = c.fetchone()
        if row:
            results_json, tstamp = row
            data_list.append((company, tstamp, results_json))
    conn.close()
    for comp, tstamp, results_json in data_list:
        data = json.loads(results_json)
        articles = data.get("articles", [])
        analysis = data.get("analysis", "")
        output += f"### {comp}\n"
        output += f"- λ§ˆμ§€λ§‰ μ—…λ°μ΄νŠΈ: {convert_to_seoul_time(tstamp)}\n"
        output += f"- μ €μž₯된 기사 수: {len(articles)}건\n\n"
        if analysis:
            output += "#### λ‰΄μŠ€ 감성 뢄석\n"
            output += f"{analysis}\n\n"
        output += "---\n\n"
    return output
#########################################################
# 전체 검색/좜λ ₯ + μ’…ν•© 보고
#########################################################
def search_all_companies():
    """
    Search + analyze every company in KOREAN_COMPANIES (5 worker threads)
    and concatenate the per-company reports.

    Fix vs. previous version: uses executor.map, which yields results in
    submission order, so the combined report always lists companies in
    KOREAN_COMPANIES order -- the old as_completed() loop produced a
    different ordering on every run.
    """
    overall_result = "# [전체 검색 κ²°κ³Ό]\n\n"
    with ThreadPoolExecutor(max_workers=5) as executor:
        # map() preserves input order while still running searches in parallel.
        for comp, res_text in zip(KOREAN_COMPANIES,
                                  executor.map(search_company, KOREAN_COMPANIES)):
            overall_result += f"## {comp}\n"
            overall_result += res_text + "\n\n"
    return overall_result
def load_all_companies():
    """Dump the cached (articles + analysis) report for every tracked company."""
    sections = ["# [전체 좜λ ₯ κ²°κ³Ό]\n\n"]
    for comp in KOREAN_COMPANIES:
        sections.append(f"## {comp}\n")
        sections.append(load_company(comp))
        sections.append("\n")
    return "".join(sections)
def full_summary_report():
    """
    Run the whole pipeline and return one combined Markdown report:
    (1) search + analyze every company (stored to DB),
    (2) reload everything from the DB,
    (3) append the aggregate sentiment statistics.
    """
    search_part = search_all_companies()
    load_part = load_all_companies()
    stats_part = show_stats()
    header = (
        "# 전체 뢄석 보고 μš”μ•½\n\n"
        "μ•„λž˜ μˆœμ„œλ‘œ μ‹€ν–‰λ˜μ—ˆμŠ΅λ‹ˆλ‹€:\n"
        "1. λͺ¨λ“  μ’…λͺ© 검색(병렬) + 뢄석 => 2. λͺ¨λ“  μ’…λͺ© DB κ²°κ³Ό 좜λ ₯ => 3. 전체 감성 뢄석 톡계\n\n"
    )
    return (header
            + f"{search_part}\n\n"
            + f"{load_part}\n\n"
            + "## [전체 감성 뢄석 톡계]\n\n"
            + f"{stats_part}")
#########################################################
# (μΆ”κ°€) μ‚¬μš©μž μž„μ˜ 검색 + 뢄석
#########################################################
def search_custom(query, country):
    """
    Ad-hoc search: fetch + analyze (query, country), store the result in the
    DB, then reload the stored record and render it as Markdown. Reloading
    (rather than formatting the fresh data) also verifies the DB round-trip.
    """
    error_message, articles = serphouse_search(query, country)
    if error_message:
        return f"였λ₯˜ λ°œμƒ: {error_message}"
    if not articles:
        return "검색 κ²°κ³Όκ°€ μ—†μŠ΅λ‹ˆλ‹€."
    analysis = analyze_sentiment_batch(articles, client)
    save_to_db(query, country, {
        "articles": articles,
        "analysis": analysis
    })
    loaded_data, timestamp = load_from_db(query, country)
    if not loaded_data:
        return "DBμ—μ„œ λ‘œλ“œ μ‹€νŒ¨"
    body = [
        "## [μ‚¬μš©μž μž„μ˜ 검색 κ²°κ³Ό]\n\n",
        f"**ν‚€μ›Œλ“œ**: {query}\n\n",
        f"**κ΅­κ°€**: {country}\n\n",
        f"**μ €μž₯ μ‹œκ°„**: {timestamp}\n\n",
        display_results(loaded_data.get("articles", [])),
        f"### λ‰΄μŠ€ 감성 뢄석\n{loaded_data.get('analysis', '')}\n",
    ]
    return "".join(body)
#########################################################
# (μΆ”κ°€) νžˆμŠ€ν† λ¦¬ ν•¨μˆ˜
#########################################################
def get_custom_search_history():
    """
    List manual (non-KOREAN_COMPANIES) search records as (id, label) pairs,
    newest first. Label example:
    "12 | 2025-01-22 10:23:00 KST | Apple (United States)"
    """
    tracked = {name.lower() for name in KOREAN_COMPANIES}
    conn = sqlite3.connect("search_results.db")
    cur = conn.cursor()
    cur.execute("""SELECT id, keyword, country, timestamp
                 FROM searches
                 ORDER BY timestamp DESC""")
    rows = cur.fetchall()
    conn.close()
    return [
        (str(sid), f"{sid} | {convert_to_seoul_time(ts)} | {kw} ({cty})")
        for sid, kw, cty, ts in rows
        if kw.lower() not in tracked
    ]
def view_history_record(record_id):
    """
    Render the stored record for `record_id` (articles + analysis) as
    Markdown; returns a short message when the id is empty or unknown.
    """
    if not record_id:
        return "기둝이 μ—†μŠ΅λ‹ˆλ‹€."
    record = load_by_id(int(record_id))
    if not record:
        return "ν•΄λ‹Ή ID의 기둝이 μ—†μŠ΅λ‹ˆλ‹€."
    stored = record["data"]  # {"articles": [...], "analysis": ...}
    header = (
        "### [νžˆμŠ€ν† λ¦¬ 검색 κ²°κ³Ό]\n\n"
        f"- ID: {record_id}\n"
        f"- ν‚€μ›Œλ“œ: {record['keyword']}\n"
        f"- κ΅­κ°€: {record['country']}\n"
        f"- μ €μž₯ μ‹œκ°„: {record['timestamp']}\n\n"
    )
    return (header
            + display_results(stored.get("articles", []))
            + f"\n\n### 뢄석 보고\n{stored.get('analysis', '')}\n")
#########################################################
# Gradio interface setup
#########################################################
# The Hugging Face Inference API is accessed through the OpenAI-compatible
# client; analyze_sentiment_batch() routes chat completions through it.
ACCESS_TOKEN = os.getenv("HF_TOKEN")
if not ACCESS_TOKEN:
    # Fail fast at import time: every analysis call needs this token.
    raise ValueError("HF_TOKEN environment variable is not set")
client = OpenAI(
    base_url="https://api-inference.huggingface.co/v1/",
    api_key=ACCESS_TOKEN,
)
# App-wide CSS injected into gr.Blocks below (currently just hides the footer).
css = """
/* μ „μ—­ μŠ€νƒ€μΌ */
footer {visibility: hidden;}
/* 기타 CSS λ“±... (μ΄ν•˜ 동일) */
"""
# Build the Gradio UI. NOTE(review): the source indentation was stripped in
# transit; the nesting below is reconstructed from the context managers and
# comments -- confirm against the deployed Space.
with gr.Blocks(theme="Yntec/HaleyCH_Theme_Orange", css=css, title="NewsAI μ„œλΉ„μŠ€") as iface:
    # Make sure the SQLite cache exists before any button handler runs.
    init_db()
    with gr.Tabs():
        # --- Tab 1: automatic search/analysis over the fixed watchlist ---
        with gr.Tab("μ§€μ • μžλ™ 검색/뢄석"):
            gr.Markdown("## EarnBot: κΈ€λ‘œλ²Œ λΉ…ν…Œν¬ κΈ°μ—… 및 투자 전망 AI μžλ™ 뢄석")
            gr.Markdown("- '전체 뢄석 보고 μš”μ•½' 클릭 μ‹œ 전체 μžλ™ 보고 생성.\n"
                        "- μ•„λž˜ κ°œλ³„ μ’…λͺ©μ˜ '검색(DB μžλ™ μ €μž₯)'κ³Ό '좜λ ₯(DB μžλ™ 호좜)'도 κ°€λŠ₯.\n"
                        "- ν•˜λ‹¨ 'μˆ˜λ™ 검색 νžˆμŠ€ν† λ¦¬'μ—μ„œ 이전에 μˆ˜λ™ μž…λ ₯ν•œ 검색어 기둝을 확인 κ°€λŠ₯.")
            # One-click pipeline: search all -> reload all -> aggregate stats.
            with gr.Row():
                full_report_btn = gr.Button("전체 뢄석 보고 μš”μ•½", variant="primary")
            full_report_display = gr.Markdown()
            full_report_btn.click(fn=full_summary_report, outputs=full_report_display)
            # Per-company search/load buttons, laid out two companies per row.
            with gr.Column():
                for i in range(0, len(KOREAN_COMPANIES), 2):
                    with gr.Row():
                        # Left column
                        with gr.Column():
                            company = KOREAN_COMPANIES[i]
                            with gr.Group():
                                gr.Markdown(f"### {company}")
                                with gr.Row():
                                    search_btn = gr.Button("검색", variant="primary")
                                    load_btn = gr.Button("좜λ ₯", variant="secondary")
                                result_display = gr.Markdown()
                                # Bind the current company via a default arg so the
                                # lambda does not late-bind the loop variable.
                                search_btn.click(
                                    fn=lambda c=company: search_company(c),
                                    inputs=[],
                                    outputs=result_display
                                )
                                load_btn.click(
                                    fn=lambda c=company: load_company(c),
                                    inputs=[],
                                    outputs=result_display
                                )
                        # Right column (only when an odd-indexed company remains)
                        if i + 1 < len(KOREAN_COMPANIES):
                            with gr.Column():
                                company = KOREAN_COMPANIES[i + 1]
                                with gr.Group():
                                    gr.Markdown(f"### {company}")
                                    with gr.Row():
                                        search_btn = gr.Button("검색", variant="primary")
                                        load_btn = gr.Button("좜λ ₯", variant="secondary")
                                    result_display = gr.Markdown()
                                    search_btn.click(
                                        fn=lambda c=company: search_company(c),
                                        inputs=[],
                                        outputs=result_display
                                    )
                                    load_btn.click(
                                        fn=lambda c=company: load_company(c),
                                        inputs=[],
                                        outputs=result_display
                                    )
            gr.Markdown("---")
            gr.Markdown("### μˆ˜λ™ 검색 νžˆμŠ€ν† λ¦¬")
            with gr.Row():
                refresh_hist_btn = gr.Button("νžˆμŠ€ν† λ¦¬ κ°±μ‹ ", variant="secondary")
                history_dropdown = gr.Dropdown(label="검색 기둝 λͺ©λ‘", choices=[], value=None)
                hist_view_btn = gr.Button("보기", variant="primary")
            hist_result_display = gr.Markdown()
            def update_history_dropdown():
                # Refresh the dropdown with labels for manual searches only.
                history_list = get_custom_search_history()
                choice_list = []
                for (id_val, label) in history_list:
                    choice_list.append(label)
                return gr.update(choices=choice_list, value=None)
            refresh_hist_btn.click(
                fn=update_history_dropdown,
                inputs=[],
                outputs=history_dropdown
            )
            def show_history_record(selected_label):
                # The record id is the first "|"-separated field of the label.
                if not selected_label:
                    return "νžˆμŠ€ν† λ¦¬κ°€ μ„ νƒλ˜μ§€ μ•Šμ•˜μŠ΅λ‹ˆλ‹€."
                splitted = selected_label.split("|")
                if len(splitted) < 2:
                    return "ν˜•μ‹ 였λ₯˜"
                record_id = splitted[0].strip()
                return view_history_record(record_id)
            hist_view_btn.click(
                fn=show_history_record,
                inputs=[history_dropdown],
                outputs=hist_result_display
            )
        # --- Tab 2: manual keyword + country search ---
        with gr.Tab("μˆ˜λ™ 검색/뢄석"):
            gr.Markdown("## μ‚¬μš©μž μž„μ˜ ν‚€μ›Œλ“œ + κ΅­κ°€ 검색/뢄석")
            gr.Markdown("검색 κ²°κ³Όκ°€ DB에 μ €μž₯되며, μ•„λž˜ 'μˆ˜λ™ 검색 νžˆμŠ€ν† λ¦¬'μ—μ„œλ„ 확인 κ°€λŠ₯ν•©λ‹ˆλ‹€.")
            with gr.Row():
                with gr.Column():
                    user_input = gr.Textbox(
                        label="검색어 μž…λ ₯",
                        placeholder="예) Apple, Samsung λ“± 자유둭게"
                    )
                with gr.Column():
                    country_selection = gr.Dropdown(
                        choices=list(COUNTRY_LOCATIONS.keys()),
                        value="United States",
                        label="κ΅­κ°€ 선택"
                    )
                with gr.Column():
                    custom_search_btn = gr.Button("μ‹€ν–‰", variant="primary")
            custom_search_output = gr.Markdown()
            custom_search_btn.click(
                fn=search_custom,
                inputs=[user_input, country_selection],
                outputs=custom_search_output
            )
            gr.Markdown("---")
            gr.Markdown("### μˆ˜λ™ 검색 νžˆμŠ€ν† λ¦¬ (두 번째 νƒ­)")
            with gr.Row():
                refresh_hist_btn2 = gr.Button("νžˆμŠ€ν† λ¦¬ κ°±μ‹ ", variant="secondary")
                history_dropdown2 = gr.Dropdown(label="검색 기둝 λͺ©λ‘", choices=[], value=None)
                hist_view_btn2 = gr.Button("보기", variant="primary")
            hist_result_display2 = gr.Markdown()
            # Reuses the handlers defined in tab 1 (`with` blocks do not
            # create a new scope, so the names are visible here).
            refresh_hist_btn2.click(
                fn=update_history_dropdown,
                inputs=[],
                outputs=history_dropdown2
            )
            hist_view_btn2.click(
                fn=show_history_record,
                inputs=[history_dropdown2],
                outputs=hist_result_display2
            )
# Start the app at module level (Hugging Face Spaces runs the file directly).
iface.launch(
    server_name="0.0.0.0",  # listen on all interfaces (container-friendly)
    server_port=7860,       # default Spaces port
    share=True,             # also open a public gradio.live tunnel
    ssl_verify=False,       # NOTE(review): disables TLS certificate checks -- confirm intended
    show_error=True         # surface handler tracebacks in the UI
)