import gradio as gr
import requests
import json
import os
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import lru_cache
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from openai import OpenAI
from bs4 import BeautifulSoup
import re
import pathlib
import sqlite3
import pytz
#########################################################
# Company / keyword watch list
#########################################################
KOREAN_COMPANIES = [
    "NVIDIA",
    "ALPHABET",
    "APPLE",
    "TESLA",
    "AMAZON",
    "MICROSOFT",
    "META",
    "INTEL",
    "SAMSUNG",
    "HYNIX",
    "BITCOIN",
    "crypto",
    "stock",
    "Economics",
    "Finance",
    "investing"
]
#########################################################
# Common utilities
#########################################################
def convert_to_seoul_time(timestamp_str):
    """
    Attach the Seoul (KST) timezone label to a 'YYYY-MM-DD HH:MM:SS' timestamp
    string and return it as 'YYYY-MM-DD HH:MM:SS KST'. Timestamps are written
    by save_to_db() in Seoul wall-clock time, so no clock shift is applied
    (tz.localize() only attaches the timezone).
    """
    try:
        dt = datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S')
        seoul_tz = pytz.timezone('Asia/Seoul')
        seoul_time = seoul_tz.localize(dt)
        return seoul_time.strftime('%Y-%m-%d %H:%M:%S KST')
    except Exception as e:
        print(f"Time conversion error: {str(e)}")
        return timestamp_str
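# Illustrative example:
#   convert_to_seoul_time("2025-01-22 10:23:00")  # -> "2025-01-22 10:23:00 KST"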
def analyze_sentiment_batch(articles, client):
    """
    Run sentiment analysis (positive/negative/neutral) over the combined
    titles/snippets of the given news articles via the OpenAI-compatible API,
    and return the summarized result as a string.
    """
    try:
        # Combine all articles (title, snippet) into a single text block.
        combined_text = "\n\n".join([
            f"Title: {article.get('title', '')}\nContent: {article.get('snippet', '')}"
            for article in articles
        ])
        # Prompt requesting the sentiment analysis.
        prompt = f"""Perform an overall sentiment analysis of the following news collection:

News content:
{combined_text}

Please analyze it in the following format:
1. Overall sentiment: [Positive/Negative/Neutral]
2. Key positive factors:
   - [Item 1]
   - [Item 2]
3. Key negative factors:
   - [Item 1]
   - [Item 2]
4. Overall assessment: [detailed explanation]
"""
        response = client.chat.completions.create(
            model="CohereForAI/c4ai-command-r-plus-08-2024",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.3,
            max_tokens=1000
        )
        return response.choices[0].message.content
    except Exception as e:
        return f"Sentiment analysis failed: {str(e)}"
#########################################################
# DB initialization and I/O helpers
#########################################################
def init_db():
    """
    Create the SQLite DB file (search_results.db) if it does not exist,
    and create the 'searches' table if it does not exist.
    """
    db_path = pathlib.Path("search_results.db")
    conn = sqlite3.connect(db_path)
    c = conn.cursor()
    c.execute('''CREATE TABLE IF NOT EXISTS searches
                 (id INTEGER PRIMARY KEY AUTOINCREMENT,
                  keyword TEXT,
                  country TEXT,
                  results TEXT,
                  timestamp DATETIME DEFAULT CURRENT_TIMESTAMP)''')
    conn.commit()
    conn.close()
def save_to_db(keyword, country, results):
    """
    Save the search results (a JSON-serializable value) for the given
    (keyword, country) pair to the DB, timestamped in Seoul time.
    """
    conn = sqlite3.connect("search_results.db")
    c = conn.cursor()
    seoul_tz = pytz.timezone('Asia/Seoul')
    now = datetime.now(seoul_tz)
    timestamp = now.strftime('%Y-%m-%d %H:%M:%S')
    c.execute("""INSERT INTO searches
                 (keyword, country, results, timestamp)
                 VALUES (?, ?, ?, ?)""",
              (keyword, country, json.dumps(results), timestamp))
    conn.commit()
    conn.close()
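# Timestamps are stored as zero-padded 'YYYY-MM-DD HH:MM:SS' strings, so the
# lexicographic ORDER BY timestamp DESC used below also sorts chronologically.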
def load_from_db(keyword, country):
    """
    Load the most recent search result for (keyword, country) from the DB.
    If found, return (JSON-decoded value, stored time as
    'YYYY-MM-DD HH:MM:SS KST'); otherwise return (None, None).
    """
    conn = sqlite3.connect("search_results.db")
    c = conn.cursor()
    c.execute("""SELECT results, timestamp
                 FROM searches
                 WHERE keyword=? AND country=?
                 ORDER BY timestamp DESC
                 LIMIT 1""",
              (keyword, country))
    result = c.fetchone()
    conn.close()
    if result:
        return json.loads(result[0]), convert_to_seoul_time(result[1])
    return None, None
def load_by_id(search_id):
    """
    Load a specific search record by its PRIMARY KEY (id).
    Returns a dict with keyword, country, the JSON-decoded results under
    'data', and the stored timestamp; returns None if the id does not exist.
    """
    conn = sqlite3.connect("search_results.db")
    c = conn.cursor()
    c.execute("SELECT keyword, country, results, timestamp FROM searches WHERE id=?",
              (search_id,))
    row = c.fetchone()
    conn.close()
    if row:
        keyword, country, results_json, ts = row
        data = json.loads(results_json)
        return {
            "keyword": keyword,
            "country": country,
            "data": data,
            "timestamp": convert_to_seoul_time(ts)
        }
    return None
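# Illustrative round trip (assumes the inserted row received id 1):
#   save_to_db("Apple", "United States", {"articles": [], "analysis": "..."})
#   load_by_id(1)
#   # -> {"keyword": "Apple", "country": "United States", "data": {...},
#   #     "timestamp": "... KST"}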
#########################################################
# Result rendering
#########################################################
def display_results(articles):
    """
    Format the article list as readable Markdown.
    """
    output = ""
    for idx, article in enumerate(articles, 1):
        output += f"### {idx}. {article['title']}\n"
        output += f"Source: {article['channel']}\n"
        output += f"Time: {article['time']}\n"
        output += f"Link: {article['link']}\n"
        output += f"Summary: {article['snippet']}\n\n"
    return output
#########################################################
# SerpHouse API: translation / request / response shaping
#########################################################
API_KEY = os.getenv("SERPHOUSE_API_KEY")  # SERPHOUSE_API_KEY environment variable

def is_english(text):
    """
    Return True if every character of text (ignoring spaces, '-' and '_')
    is within the ASCII range, else False.
    """
    return all(ord(char) < 128 for char in text.replace(' ', '').replace('-', '').replace('_', ''))
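# Examples:
#   is_english("Apple Inc.")  # True  (all ASCII once spaces/'-'/'_' are stripped)
#   is_english("삼성전자")      # False (Hangul falls outside the ASCII range)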
def translate_query(query, country):
    """
    Translate the query into the language of the given country.
    Returned unchanged if is_english(query) is True or the country has no
    language mapping.
    """
    try:
        if is_english(query):
            return query
        if country in COUNTRY_LANGUAGES:
            target_lang = COUNTRY_LANGUAGES[country]
            url = "https://translate.googleapis.com/translate_a/single"
            params = {
                "client": "gtx",
                "sl": "auto",
                "tl": target_lang,
                "dt": "t",
                "q": query
            }
            session = requests.Session()
            retries = Retry(total=3, backoff_factor=0.5)
            session.mount('https://', HTTPAdapter(max_retries=retries))
            response = session.get(url, params=params, timeout=(5, 10))
            translated_text = response.json()[0][0][0]
            return translated_text
        return query
    except Exception as e:
        print(f"Translation error: {str(e)}")
        return query
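# Note: translate.googleapis.com with client="gtx" is an unofficial,
# unauthenticated endpoint; it may be rate-limited or change without notice.
# Illustrative call:
#   translate_query("반도체", "Japan")  # sent with tl="ja"; returns the first candidate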
def serphouse_search(query, country, page=1, num_result=10):
    """
    Send a live 'news' search request to the SerpHouse API and return
    (error_message, article_list); error_message is empty on success.
    Note: the payload requests a fixed page size of 100, so num_result is
    currently unused.
    """
    url = "https://api.serphouse.com/serp/live"
    now = datetime.utcnow()
    yesterday = now - timedelta(days=1)
    date_range = f"{yesterday.strftime('%Y-%m-%d')},{now.strftime('%Y-%m-%d')}"
    # Translate the query for the target country.
    translated_query = translate_query(query, country)
    payload = {
        "data": {
            "q": translated_query,
            "domain": "google.com",
            "loc": COUNTRY_LOCATIONS.get(country, "United States"),
            "lang": COUNTRY_LANGUAGES.get(country, "en"),
            "device": "desktop",
            "serp_type": "news",
            "page": str(page),
            "num": "100",
            "date_range": date_range,
            "sort_by": "date"
        }
    }
    headers = {
        "accept": "application/json",
        "content-type": "application/json",
        "authorization": f"Bearer {API_KEY}"
    }
    try:
        session = requests.Session()
        retries = Retry(
            total=5,
            backoff_factor=1,
            status_forcelist=[500, 502, 503, 504, 429],
            allowed_methods=["POST"]
        )
        adapter = HTTPAdapter(max_retries=retries)
        session.mount('http://', adapter)
        session.mount('https://', adapter)
        # API call
        response = session.post(
            url,
            json=payload,
            headers=headers,
            timeout=(30, 30)
        )
        response.raise_for_status()
        response_data = response.json()
        # Extract only the article portion from the response data.
        return format_results_from_raw({
            "results": response_data,
            "translated_query": translated_query
        })
    except requests.exceptions.Timeout:
        return ("The search timed out. Please try again shortly.", [])
    except requests.exceptions.RequestException as e:
        return (f"An error occurred during the search: {str(e)}", [])
    except Exception as e:
        return (f"An unexpected error occurred: {str(e)}", [])
def format_results_from_raw(response_data):
    """
    From the raw SerpHouse payload (response_data):
    - if an 'error' key is present, return ("Error: ...", [])
    - on success, return ("", [article1, article2, ...])
    - articles from Korean domains or with Korean keywords are excluded
      (filtered out).
    """
    if "error" in response_data:
        return ("Error: " + response_data["error"], [])
    try:
        results = response_data["results"]
        translated_query = response_data["translated_query"]
        news_results = results.get('results', {}).get('results', {}).get('news', [])
        if not news_results:
            return ("No search results.", [])
        # Exclude articles from Korean domains / with Korean keywords.
        korean_domains = [
            '.kr', 'korea', 'korean', 'yonhap', 'hankyung', 'chosun',
            'donga', 'joins', 'hani', 'koreatimes', 'koreaherald'
        ]
        korean_keywords = [
            'korea', 'korean', 'seoul', 'busan', 'incheon', 'daegu',
            'gwangju', 'daejeon', 'ulsan', 'sejong'
        ]
        filtered_articles = []
        for idx, result in enumerate(news_results, 1):
            url = result.get("url", result.get("link", "")).lower()
            title = result.get("title", "").lower()
            channel = result.get("channel", result.get("source", "")).lower()
            # Filter out Korea-related articles.
            is_korean_content = (
                any(domain in url or domain in channel for domain in korean_domains) or
                any(keyword in title for keyword in korean_keywords)
            )
            if not is_korean_content:
                filtered_articles.append({
                    "index": idx,
                    "title": result.get("title", "No title"),
                    "link": url,
                    "snippet": result.get("snippet", "No content"),
                    "channel": result.get("channel", result.get("source", "Unknown")),
                    "time": result.get("time", result.get("date", "Unknown time")),
                    "image_url": result.get("img", result.get("thumbnail", "")),
                    "translated_query": translated_query
                })
        return ("", filtered_articles)
    except Exception as e:
        return (f"Error while processing results: {str(e)}", [])
#########################################################
# Country settings
#########################################################
COUNTRY_LANGUAGES = {
    "United States": "en",
    "KOREA": "ko",
    "United Kingdom": "en",
    "Taiwan": "zh-TW",
    "Canada": "en",
    "Australia": "en",
    "Germany": "de",
    "France": "fr",
    "Japan": "ja",
    "China": "zh",
    "India": "hi",
    "Brazil": "pt",
    "Mexico": "es",
    "Russia": "ru",
    "Italy": "it",
    "Spain": "es",
    "Netherlands": "nl",
    "Singapore": "en",
    "Hong Kong": "zh-HK",
    "Indonesia": "id",
    "Malaysia": "ms",
    "Philippines": "tl",
    "Thailand": "th",
    "Vietnam": "vi",
    "Belgium": "nl",
    "Denmark": "da",
    "Finland": "fi",
    "Ireland": "en",
    "Norway": "no",
    "Poland": "pl",
    "Sweden": "sv",
    "Switzerland": "de",
    "Austria": "de",
    "Czech Republic": "cs",
    "Greece": "el",
    "Hungary": "hu",
    "Portugal": "pt",
    "Romania": "ro",
    "Turkey": "tr",
    "Israel": "he",
    "Saudi Arabia": "ar",
    "United Arab Emirates": "ar",
    "South Africa": "en",
    "Argentina": "es",
    "Chile": "es",
    "Colombia": "es",
    "Peru": "es",
    "Venezuela": "es",
    "New Zealand": "en",
    "Bangladesh": "bn",
    "Pakistan": "ur",
    "Egypt": "ar",
    "Morocco": "ar",
    "Nigeria": "en",
    "Kenya": "sw",
    "Ukraine": "uk",
    "Croatia": "hr",
    "Slovakia": "sk",
    "Bulgaria": "bg",
    "Serbia": "sr",
    "Estonia": "et",
    "Latvia": "lv",
    "Lithuania": "lt",
    "Slovenia": "sl",
    "Luxembourg": "lb",
    "Malta": "mt",
    "Cyprus": "el",
    "Iceland": "is"
}
COUNTRY_LOCATIONS = {
    "United States": "United States",
    "KOREA": "South Korea",
    "United Kingdom": "United Kingdom",
    "Taiwan": "Taiwan",
    "Canada": "Canada",
    "Australia": "Australia",
    "Germany": "Germany",
    "France": "France",
    "Japan": "Japan",
    "China": "China",
    "India": "India",
    "Brazil": "Brazil",
    "Mexico": "Mexico",
    "Russia": "Russia",
    "Italy": "Italy",
    "Spain": "Spain",
    "Netherlands": "Netherlands",
    "Singapore": "Singapore",
    "Hong Kong": "Hong Kong",
    "Indonesia": "Indonesia",
    "Malaysia": "Malaysia",
    "Philippines": "Philippines",
    "Thailand": "Thailand",
    "Vietnam": "Vietnam",
    "Belgium": "Belgium",
    "Denmark": "Denmark",
    "Finland": "Finland",
    "Ireland": "Ireland",
    "Norway": "Norway",
    "Poland": "Poland",
    "Sweden": "Sweden",
    "Switzerland": "Switzerland",
    "Austria": "Austria",
    "Czech Republic": "Czech Republic",
    "Greece": "Greece",
    "Hungary": "Hungary",
    "Portugal": "Portugal",
    "Romania": "Romania",
    "Turkey": "Turkey",
    "Israel": "Israel",
    "Saudi Arabia": "Saudi Arabia",
    "United Arab Emirates": "United Arab Emirates",
    "South Africa": "South Africa",
    "Argentina": "Argentina",
    "Chile": "Chile",
    "Colombia": "Colombia",
    "Peru": "Peru",
    "Venezuela": "Venezuela",
    "New Zealand": "New Zealand",
    "Bangladesh": "Bangladesh",
    "Pakistan": "Pakistan",
    "Egypt": "Egypt",
    "Morocco": "Morocco",
    "Nigeria": "Nigeria",
    "Kenya": "Kenya",
    "Ukraine": "Ukraine",
    "Croatia": "Croatia",
    "Slovakia": "Slovakia",
    "Bulgaria": "Bulgaria",
    "Serbia": "Serbia",
    "Estonia": "Estonia",
    "Latvia": "Latvia",
    "Lithuania": "Lithuania",
    "Slovenia": "Slovenia",
    "Luxembourg": "Luxembourg",
    "Malta": "Malta",
    "Cyprus": "Cyprus",
    "Iceland": "Iceland"
}
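# Both dicts are keyed by the same country names; a quick sanity check while
# editing them (optional, hence commented out):
#   assert set(COUNTRY_LANGUAGES) == set(COUNTRY_LOCATIONS)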
#########################################################
# Search / load helpers (per company)
#########################################################
def search_company(company):
    """
    Search for a company from KOREAN_COMPANIES with the United States as the
    target market, run sentiment analysis, save both to the DB, and return
    the article list plus the analysis as Markdown.
    """
    error_message, articles = serphouse_search(company, "United States")
    if not error_message and articles:
        analysis = analyze_sentiment_batch(articles, client)
        store_dict = {
            "articles": articles,
            "analysis": analysis
        }
        # Save to the DB.
        save_to_db(company, "United States", store_dict)
        # Render the results.
        output = display_results(articles)
        output += f"\n\n### Analysis Report\n{analysis}\n"
        return output
    else:
        if error_message:
            return error_message
        return f"No search results for {company}."
def load_company(company):
    """
    Load the stored (company, United States) search result from the DB and
    return the articles plus the analysis.
    """
    data, timestamp = load_from_db(company, "United States")
    if data:
        articles = data.get("articles", [])
        analysis = data.get("analysis", "")
        output = f"### {company} Search Results\nStored at: {timestamp}\n\n"
        output += display_results(articles)
        output += f"\n\n### Analysis Report\n{analysis}\n"
        return output
    return f"No stored results for {company}."
#########################################################
# Overall statistics
#########################################################
def show_stats():
    """
    Renamed from the former "Korean company news analysis report" to
    "EarnBOT Analysis Report". Shows the article count of each company's
    latest DB record (articles + analysis) along with the sentiment
    analysis result.
    """
    conn = sqlite3.connect("search_results.db")
    c = conn.cursor()
    output = "## EarnBOT Analysis Report\n\n"
    data_list = []
    for company in KOREAN_COMPANIES:
        c.execute("""
            SELECT results, timestamp
            FROM searches
            WHERE keyword = ?
            ORDER BY timestamp DESC
            LIMIT 1
        """, (company,))
        row = c.fetchone()
        if row:
            results_json, tstamp = row
            data_list.append((company, tstamp, results_json))
    conn.close()

    def analyze_data(item):
        comp, tstamp, results_json = item
        data = json.loads(results_json)
        articles = data.get("articles", [])
        analysis = data.get("analysis", "")
        count_articles = len(articles)
        return (comp, tstamp, count_articles, analysis)

    results_list = []
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(analyze_data, dl) for dl in data_list]
        for future in as_completed(futures):
            results_list.append(future.result())
    for comp, tstamp, count, analysis in results_list:
        seoul_time = convert_to_seoul_time(tstamp)
        output += f"### {comp}\n"
        output += f"- Last updated: {seoul_time}\n"
        output += f"- Stored articles: {count}\n\n"
        if analysis:
            output += "#### News Sentiment Analysis\n"
            output += f"{analysis}\n\n"
        output += "---\n\n"
    return output
#########################################################
# Search/load everything + combined report
#########################################################
def search_all_companies():
    """
    Search + analyze every entry in KOREAN_COMPANIES in parallel.
    """
    overall_result = "# [Full Search Results]\n\n"

    def do_search(comp):
        return comp, search_company(comp)

    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(do_search, c) for c in KOREAN_COMPANIES]
        for future in as_completed(futures):
            comp, res_text = future.result()
            overall_result += f"## {comp}\n"
            overall_result += res_text + "\n\n"
    return overall_result

def load_all_companies():
    """
    Print the stored DB values (articles + analysis) for every entry in
    KOREAN_COMPANIES.
    """
    overall_result = "# [Full Load Results]\n\n"
    for comp in KOREAN_COMPANIES:
        overall_result += f"## {comp}\n"
        overall_result += load_company(comp)
        overall_result += "\n"
    return overall_result
def full_summary_report():
    """
    1) search_all_companies => articles + analysis => saved to DB
    2) load_all_companies   => loaded back from the DB
    3) show_stats           => overall sentiment statistics
    """
    search_result_text = search_all_companies()
    load_result_text = load_all_companies()
    stats_text = show_stats()
    combined_report = (
        "# Full Analysis Report Summary\n\n"
        "Executed in the following order:\n"
        "1. Search all tickers (parallel) + analysis => 2. Print DB results for all tickers => 3. Overall sentiment statistics\n\n"
        f"{search_result_text}\n\n"
        f"{load_result_text}\n\n"
        "## [Overall Sentiment Analysis Statistics]\n\n"
        f"{stats_text}"
    )
    return combined_report
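# Pipeline note: full_summary_report() re-runs the live search (network + LLM
# calls) for every entry in KOREAN_COMPANIES, then re-reads the freshly saved
# rows, so the "search" and "load" sections should show the same articles.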
#########################################################
# (Additional) Custom user search + analysis
#########################################################
def search_custom(query, country):
    """
    1) Search + analyze for the given query & country => save to the DB
    2) Reload from the DB => display the articles + analysis
    """
    error_message, articles = serphouse_search(query, country)
    if error_message:
        return f"Error: {error_message}"
    if not articles:
        return "No search results."
    analysis = analyze_sentiment_batch(articles, client)
    save_data = {
        "articles": articles,
        "analysis": analysis
    }
    save_to_db(query, country, save_data)
    loaded_data, timestamp = load_from_db(query, country)
    if not loaded_data:
        return "Failed to load from the DB."
    arts = loaded_data.get("articles", [])
    analy = loaded_data.get("analysis", "")
    out = "## [Custom Search Results]\n\n"
    out += f"**Keyword**: {query}\n\n"
    out += f"**Country**: {country}\n\n"
    out += f"**Stored at**: {timestamp}\n\n"
    out += display_results(arts)
    out += f"### News Sentiment Analysis\n{analy}\n"
    return out
#########################################################
# (Additional) History helpers
#########################################################
def get_custom_search_history():
    """
    Return only the records whose keyword is NOT in KOREAN_COMPANIES, as a
    list of (id, label) tuples.
    Example label: "12 | 2025-01-22 10:23:00 KST | Apple (United States)"
    """
    company_set = set(k.lower() for k in KOREAN_COMPANIES)
    conn = sqlite3.connect("search_results.db")
    c = conn.cursor()
    c.execute("""SELECT id, keyword, country, timestamp
                 FROM searches
                 ORDER BY timestamp DESC""")
    rows = c.fetchall()
    conn.close()
    history_list = []
    for sid, kw, cty, ts in rows:
        if kw.lower() not in company_set:
            display_time = convert_to_seoul_time(ts)
            label = f"{sid} | {display_time} | {kw} ({cty})"
            history_list.append((str(sid), label))
    return history_list
def view_history_record(record_id):
    """
    Render the articles + analysis loaded via load_by_id() for the given
    record_id as Markdown.
    """
    if not record_id:
        return "No record."
    data = load_by_id(int(record_id))
    if not data:
        return "No record found for that ID."
    keyword = data["keyword"]
    country = data["country"]
    timestamp = data["timestamp"]
    stored = data["data"]  # {"articles": [...], "analysis": ...}
    articles = stored.get("articles", [])
    analysis = stored.get("analysis", "")
    out = "### [History Search Result]\n\n"
    out += f"- ID: {record_id}\n"
    out += f"- Keyword: {keyword}\n"
    out += f"- Country: {country}\n"
    out += f"- Stored at: {timestamp}\n\n"
    out += display_results(articles)
    out += f"\n\n### Analysis Report\n{analysis}\n"
    return out
#########################################################
# Gradio interface
#########################################################
ACCESS_TOKEN = os.getenv("HF_TOKEN")
if not ACCESS_TOKEN:
    raise ValueError("HF_TOKEN environment variable is not set")

client = OpenAI(
    base_url="https://api-inference.huggingface.co/v1/",
    api_key=ACCESS_TOKEN,
)

css = """
/* Global styles */
footer {visibility: hidden;}
/* Other CSS, etc. (unchanged below) */
"""
with gr.Blocks(theme="Yntec/HaleyCH_Theme_Orange", css=css, title="NewsAI Service") as iface:
    init_db()

    # Top-level tabs
    with gr.Tabs():
        with gr.Tab("Automatic Search/Analysis"):
            gr.Markdown("## EarnBot: automated AI analysis of global big-tech companies and investment outlook")
            gr.Markdown("- Click 'Full Analysis Report Summary' to generate the full automated report.\n"
                        "- Each ticker below also supports 'Search' (auto-saves to the DB) and 'Load' (auto-reads from the DB).\n"
                        "- The 'Manual Search History' section at the bottom lists previously entered manual queries.")
            with gr.Row():
                full_report_btn = gr.Button("Full Analysis Report Summary", variant="primary")
            full_report_display = gr.Markdown()
            full_report_btn.click(fn=full_summary_report, outputs=full_report_display)
            # Per-ticker search/load
            with gr.Column():
                for i in range(0, len(KOREAN_COMPANIES), 2):
                    with gr.Row():
                        # Left column
                        with gr.Column():
                            company = KOREAN_COMPANIES[i]
                            with gr.Group():
                                gr.Markdown(f"### {company}")
                                with gr.Row():
                                    search_btn = gr.Button("Search", variant="primary")
                                    load_btn = gr.Button("Load", variant="secondary")
                                result_display = gr.Markdown()
                                search_btn.click(
                                    fn=lambda c=company: search_company(c),
                                    inputs=[],
                                    outputs=result_display
                                )
                                load_btn.click(
                                    fn=lambda c=company: load_company(c),
                                    inputs=[],
                                    outputs=result_display
                                )
                        # Right column
                        if i + 1 < len(KOREAN_COMPANIES):
                            with gr.Column():
                                company = KOREAN_COMPANIES[i + 1]
                                with gr.Group():
                                    gr.Markdown(f"### {company}")
                                    with gr.Row():
                                        search_btn = gr.Button("Search", variant="primary")
                                        load_btn = gr.Button("Load", variant="secondary")
                                    result_display = gr.Markdown()
                                    search_btn.click(
                                        fn=lambda c=company: search_company(c),
                                        inputs=[],
                                        outputs=result_display
                                    )
                                    load_btn.click(
                                        fn=lambda c=company: load_company(c),
                                        inputs=[],
                                        outputs=result_display
                                    )
gr.Markdown("---") | |
gr.Markdown("### μλ κ²μ νμ€ν 리") | |
with gr.Row(): | |
refresh_hist_btn = gr.Button("νμ€ν 리 κ°±μ ", variant="secondary") | |
history_dropdown = gr.Dropdown(label="κ²μ κΈ°λ‘ λͺ©λ‘", choices=[], value=None) | |
hist_view_btn = gr.Button("보기", variant="primary") | |
hist_result_display = gr.Markdown() | |
def update_history_dropdown(): | |
history_list = get_custom_search_history() | |
choice_list = [] | |
for (id_val, label) in history_list: | |
choice_list.append(label) | |
return gr.update(choices=choice_list, value=None) | |
refresh_hist_btn.click( | |
fn=update_history_dropdown, | |
inputs=[], | |
outputs=history_dropdown | |
) | |
def show_history_record(selected_label): | |
if not selected_label: | |
return "νμ€ν λ¦¬κ° μ νλμ§ μμμ΅λλ€." | |
splitted = selected_label.split("|") | |
if len(splitted) < 2: | |
return "νμ μ€λ₯" | |
record_id = splitted[0].strip() | |
return view_history_record(record_id) | |
hist_view_btn.click( | |
fn=show_history_record, | |
inputs=[history_dropdown], | |
outputs=hist_result_display | |
) | |
        # Second tab: manual search/analysis
        with gr.Tab("Manual Search/Analysis"):
            gr.Markdown("## Custom keyword + country search/analysis")
            gr.Markdown("Search results are saved to the DB and can also be reviewed in the 'Manual Search History' below.")
            with gr.Row():
                with gr.Column():
                    user_input = gr.Textbox(
                        label="Search query",
                        placeholder="e.g. Apple, Samsung, or anything else"
                    )
                with gr.Column():
                    country_selection = gr.Dropdown(
                        choices=list(COUNTRY_LOCATIONS.keys()),
                        value="United States",
                        label="Country"
                    )
                with gr.Column():
                    custom_search_btn = gr.Button("Run", variant="primary")
            custom_search_output = gr.Markdown()
            custom_search_btn.click(
                fn=search_custom,
                inputs=[user_input, country_selection],
                outputs=custom_search_output
            )

            gr.Markdown("---")
            gr.Markdown("### Manual Search History (second tab)")
            with gr.Row():
                refresh_hist_btn2 = gr.Button("Refresh History", variant="secondary")
                history_dropdown2 = gr.Dropdown(label="Search history", choices=[], value=None)
            hist_view_btn2 = gr.Button("View", variant="primary")
            hist_result_display2 = gr.Markdown()
            refresh_hist_btn2.click(
                fn=update_history_dropdown,
                inputs=[],
                outputs=history_dropdown2
            )
            hist_view_btn2.click(
                fn=show_history_record,
                inputs=[history_dropdown2],
                outputs=hist_result_display2
            )
iface.launch(
    server_name="0.0.0.0",
    server_port=7860,
    share=True,
    ssl_verify=False,
    show_error=True
)