import gradio as gr
import requests
import json
import os
from datetime import datetime, timedelta, timezone
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import lru_cache
from requests.adapters import HTTPAdapter
# `requests.packages.urllib3` is a deprecated vendored alias; import directly.
from urllib3.util.retry import Retry
from openai import OpenAI
from bs4 import BeautifulSoup
import re
import pathlib
import sqlite3
import pytz

# Keywords tracked by the dashboard: big-tech tickers plus broad market topics.
# (Variable name kept for backward compatibility with the rest of the file.)
KOREAN_COMPANIES = [
    "NVIDIA",
    "ALPHABET",
    "APPLE",
    "TESLA",
    "AMAZON",
    "MICROSOFT",
    "META",
    "INTEL",
    "SAMSUNG",
    "HYNIX",
    "BITCOIN",
    "crypto",
    "stock",
    "Economics",
    "Finance",
    "investing",
]


######################################################################
# Common helper: time conversion
######################################################################
def convert_to_seoul_time(timestamp_str):
    """Convert a naive 'YYYY-MM-DD HH:MM:SS' string to 'YYYY-MM-DD HH:MM:SS KST'.

    The input is treated as wall-clock Seoul time (it is localized, not shifted).
    On any parse error the original string is returned unchanged so callers
    never crash on malformed timestamps.
    """
    try:
        dt = datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S')
        seoul_tz = pytz.timezone('Asia/Seoul')
        seoul_time = seoul_tz.localize(dt)
        return seoul_time.strftime('%Y-%m-%d %H:%M:%S KST')
    except Exception as e:
        print(f"시간 변환 오류: {str(e)}")
        return timestamp_str


######################################################################
# Common helper: sentiment analysis
######################################################################
def analyze_sentiment_batch(articles, client):
    """Run one combined sentiment analysis over a batch of news articles.

    Joins each article's title/snippet into a single prompt (the prompt asks
    the model to answer in Korean) and sends it through the provided OpenAI-
    compatible ``client``. Returns the model's text, or an error string on
    failure — callers embed the result directly in Markdown either way.
    """
    try:
        # Concatenate title + snippet for every article into one document.
        combined_text = "\n\n".join([
            f"제목: {article.get('title', '')}\n내용: {article.get('snippet', '')}"
            for article in articles
        ])

        # Prompt explicitly requests a Korean-language answer.
        prompt = f"""다음 뉴스 모음에 대해 전반적인 감성 분석을 수행하세요. (한국어로 작성하세요)

뉴스 내용:
{combined_text}

다음 형식으로 분석해주세요:
1. 전반적 감성: [긍정/부정/중립]
2. 주요 긍정적 요소:
   - [항목1]
   - [항목2]
3. 주요 부정적 요소:
   - [항목1]
   - [항목2]
4. 종합 평가: [상세 설명]
"""
        response = client.chat.completions.create(
            model="CohereForAI/c4ai-command-r-plus-08-2024",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.3,
            max_tokens=1000,
        )
        return response.choices[0].message.content
    except Exception as e:
        return f"감성 분석 실패: {str(e)}"


######################################################################
# DB initialization and I/O
######################################################################
def init_db():
    """Create search_results.db (and the 'searches' table) if missing."""
    db_path = pathlib.Path("search_results.db")
    conn = sqlite3.connect(db_path)
    c = conn.cursor()
    c.execute('''CREATE TABLE IF NOT EXISTS searches
                 (id INTEGER PRIMARY KEY AUTOINCREMENT,
                  keyword TEXT,
                  country TEXT,
                  results TEXT,
                  timestamp DATETIME DEFAULT CURRENT_TIMESTAMP)''')
    conn.commit()
    conn.close()


def save_to_db(keyword, country, results):
    """Insert one (keyword, country, JSON results) row, timestamped in KST."""
    conn = sqlite3.connect("search_results.db")
    c = conn.cursor()
    seoul_tz = pytz.timezone('Asia/Seoul')
    now = datetime.now(seoul_tz)
    # Stored as a naive KST string; convert_to_seoul_time re-labels it on read.
    timestamp = now.strftime('%Y-%m-%d %H:%M:%S')
    c.execute("""INSERT INTO searches (keyword, country, results, timestamp)
                 VALUES (?, ?, ?, ?)""",
              (keyword, country, json.dumps(results), timestamp))
    conn.commit()
    conn.close()


def load_from_db(keyword, country):
    """Load the most recent stored result for (keyword, country).

    Returns:
        (parsed_results, kst_timestamp_str) on success, or (None, None) when
        no row exists.
    """
    conn = sqlite3.connect("search_results.db")
    c = conn.cursor()
    c.execute("""SELECT results, timestamp FROM searches
                 WHERE keyword=? AND country=?
                 ORDER BY timestamp DESC LIMIT 1""",
              (keyword, country))
    row = c.fetchone()
    conn.close()
    if row:
        return json.loads(row[0]), convert_to_seoul_time(row[1])
    return None, None


######################################################################
# SerpHouse API (search functions)
######################################################################
API_KEY = os.getenv("SERPHOUSE_API_KEY")


def is_english(text):
    """Return True when every character (ignoring space/-/_) is ASCII.

    NOTE(review): despite the name this is an ASCII check, not a language
    check — e.g. French accented text returns False. Kept as-is because
    translate_query only uses it to skip translation of ASCII queries.
    """
    return all(ord(char) < 128 for char in text.replace(' ', '').replace('-', '').replace('_', ''))


@lru_cache(maxsize=100)
def translate_query(query, country):
    """Translate ``query`` into the language configured for ``country``.

    Uses the unofficial Google Translate 'gtx' endpoint. ASCII queries and
    unknown countries pass through unchanged; any failure falls back to the
    original query. Results are memoized via lru_cache.

    Relies on the module-level COUNTRY_LANGUAGES mapping (defined later in
    this file, resolved at call time).
    """
    try:
        # Already English/ASCII: no translation needed.
        if is_english(query):
            return query

        if country in COUNTRY_LANGUAGES:
            target_lang = COUNTRY_LANGUAGES[country]

            url = "https://translate.googleapis.com/translate_a/single"
            params = {
                "client": "gtx",
                "sl": "auto",
                "tl": target_lang,
                "dt": "t",
                "q": query,
            }

            session = requests.Session()
            retries = Retry(total=3, backoff_factor=0.5)
            session.mount('https://', HTTPAdapter(max_retries=retries))

            resp = session.get(url, params=params, timeout=(5, 10))
            # Fail fast into the except-fallback on HTTP errors instead of
            # crashing on an unexpected JSON body.
            resp.raise_for_status()
            # The endpoint splits long inputs into segments; join them all
            # rather than keeping only the first ([0][0][0]).
            translated_text = "".join(seg[0] for seg in resp.json()[0] if seg[0])
            return translated_text

        return query
    except Exception as e:
        print(f"번역 오류: {str(e)}")
        return query


def search_serphouse(query, country, page=1, num_result=10):
    """Run a live SerpHouse news search (sort_by=date, last 24 hours).

    Returns a dict with either {"results": <raw JSON>, "translated_query"}
    or {"error": <message>, "translated_query"} — never raises.

    NOTE(review): ``num_result`` is currently ignored; the request always
    asks for 100 results ("num": "100"). Kept for interface compatibility.
    """
    url = "https://api.serphouse.com/serp/live"

    # Date window: yesterday..today (UTC). timezone-aware replacement for
    # the deprecated datetime.utcnow().
    now = datetime.now(timezone.utc)
    yesterday = now - timedelta(days=1)
    date_range = f"{yesterday.strftime('%Y-%m-%d')},{now.strftime('%Y-%m-%d')}"

    translated_query = translate_query(query, country)

    payload = {
        "data": {
            "q": translated_query,
            "domain": "google.com",
            "loc": COUNTRY_LOCATIONS.get(country, "United States"),
            "lang": COUNTRY_LANGUAGES.get(country, "en"),
            "device": "desktop",
            "serp_type": "news",
            "page": str(page),
            "num": "100",
            "date_range": date_range,
            "sort_by": "date",
        }
    }

    headers = {
        "accept": "application/json",
        "content-type": "application/json",
        "authorization": f"Bearer {API_KEY}",
    }

    try:
        session = requests.Session()
        retries = Retry(
            total=5,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["POST"],
        )
        adapter = HTTPAdapter(max_retries=retries)
        session.mount('http://', adapter)
        session.mount('https://', adapter)

        resp = session.post(url, json=payload, headers=headers, timeout=(30, 30))
        resp.raise_for_status()

        return {
            "results": resp.json(),
            "translated_query": translated_query,
        }
    except requests.exceptions.Timeout:
        return {
            "error": "검색 시간이 초과되었습니다. 잠시 후 다시 시도해주세요.",
            "translated_query": query,
        }
    except requests.exceptions.RequestException as e:
        return {
            "error": f"검색 중 오류가 발생했습니다: {str(e)}",
            "translated_query": query,
        }
    except Exception as e:
        return {
            "error": f"예기치 않은 오류가 발생했습니다: {str(e)}",
            "translated_query": query,
        }


def format_results_from_raw(response_data):
    """Shape a SerpHouse response into (error_message, articles_list).

    Filters out Korean-domain / Korea-related items; returns
    ("검색 결과가 없습니다.", []) when the news tab is empty.
    """
    if "error" in response_data:
        return "Error: " + response_data["error"], []

    try:
        results = response_data["results"]
        translated_query = response_data["translated_query"]

        # Actual news-tab hits are nested two levels deep.
        news_results = results.get('results', {}).get('results', {}).get('news', [])
        if not news_results:
            return "검색 결과가 없습니다.", []

        # Heuristics for excluding Korean outlets/topics.
        korean_domains = [
            '.kr', 'korea', 'korean', 'yonhap', 'hankyung', 'chosun',
            'donga', 'joins', 'hani', 'koreatimes', 'koreaherald',
        ]
        korean_keywords = [
            'korea', 'korean', 'seoul', 'busan', 'incheon', 'daegu',
            'gwangju', 'daejeon', 'ulsan', 'sejong',
        ]

        filtered_articles = []
        for idx, result in enumerate(news_results, 1):
            # Lowercase copies are used for matching only; the stored link
            # keeps the original case (URL paths can be case-sensitive).
            raw_url = result.get("url", result.get("link", ""))
            url = raw_url.lower()
            title = result.get("title", "").lower()
            channel = result.get("channel", result.get("source", "")).lower()

            is_korean_content = (
                any(domain in url or domain in channel for domain in korean_domains)
                or any(keyword in title for keyword in korean_keywords)
            )

            if not is_korean_content:
                filtered_articles.append({
                    "index": idx,
                    "title": result.get("title", "제목 없음"),
                    "link": raw_url,
                    "snippet": result.get("snippet", "내용 없음"),
                    "channel": result.get("channel", result.get("source", "알 수 없음")),
                    "time": result.get("time", result.get("date", "알 수 없는 시간")),
                    "image_url": result.get("img", result.get("thumbnail", "")),
                    "translated_query": translated_query,
                })

        return "", filtered_articles
    except Exception as e:
        return f"결과 처리 중 오류 발생: {str(e)}", []


def serphouse_search(query, country):
    """Full pipeline: search_serphouse -> format_results_from_raw.

    Returns (error_message, articles_list).
    """
    response_data = search_serphouse(query, country)
    return format_results_from_raw(response_data)


######################################################################
# Article list -> Markdown
######################################################################
def display_results(articles):
    """Render a list of article dicts as a Markdown string."""
    output = ""
    for idx, article in enumerate(articles, 1):
        output += f"### {idx}. {article['title']}\n"
        output += f"출처: {article['channel']}\n"
        output += f"시간: {article['time']}\n"
        output += f"링크: {article['link']}\n"
        output += f"요약: {article['snippet']}\n\n"
    return output


######################################################################
# Search / display functions
######################################################################
def search_company(company):
    """Search US news for ``company``, analyze sentiment, persist, render.

    Uses the module-level ``client`` (defined later in this file). On error
    returns the error message; with no hits returns a not-found message.
    """
    error_message, articles = serphouse_search(company, "United States")
    if not error_message and articles:
        analysis = analyze_sentiment_batch(articles, client)
        data_to_store = {
            "articles": articles,
            "analysis": analysis,
        }
        save_to_db(company, "United States", data_to_store)

        out = display_results(articles)
        out += f"\n\n### 분석 보고\n{analysis}\n"
        return out
    else:
        if error_message:
            return error_message
        return f"{company}에 대한 검색 결과가 없습니다."
def load_company(company):
    """Load the stored (company, "United States") result and render it.

    Shows the saved articles plus the saved sentiment analysis, or a
    not-found message when nothing has been stored yet.
    """
    loaded, ts = load_from_db(company, "United States")
    if loaded:
        articles = loaded.get("articles", [])
        analysis = loaded.get("analysis", "")
        out = f"### {company} 검색 결과\n저장 시간: {ts}\n\n"
        out += display_results(articles)
        out += f"\n\n### 분석 보고\n{analysis}\n"
        return out
    return f"{company}에 대한 저장된 결과가 없습니다."


######################################################################
# Statistics (EarnBOT analysis report)
######################################################################
def show_stats():
    """Build the EarnBOT report: for each tracked keyword, the most recent
    DB row rendered as article count + sentiment analysis + KST timestamp.
    """
    conn = sqlite3.connect("search_results.db")
    c = conn.cursor()

    output = "## EarnBOT 분석 리포트\n\n"

    # Latest stored row per keyword (any country — matches original query).
    data_list = []
    for comp in KOREAN_COMPANIES:
        c.execute("""
            SELECT results, timestamp
            FROM searches
            WHERE keyword = ?
            ORDER BY timestamp DESC
            LIMIT 1
        """, (comp,))
        row = c.fetchone()
        if row:
            results_json, ts = row
            data_list.append((comp, ts, results_json))

    conn.close()

    def analyze_data(item):
        # Parse one stored JSON blob into (keyword, ts, article count, analysis).
        comp, tstamp, json_str = item
        data_obj = json.loads(json_str)
        articles = data_obj.get("articles", [])
        analysis = data_obj.get("analysis", "")
        return (comp, tstamp, len(articles), analysis)

    # executor.map preserves input order, so the report sections always come
    # out in KOREAN_COMPANIES order (the previous as_completed() loop emitted
    # them in nondeterministic completion order).
    with ThreadPoolExecutor(max_workers=5) as executor:
        results_list = list(executor.map(analyze_data, data_list))

    for comp, tstamp, count_articles, analysis in results_list:
        kst_time = convert_to_seoul_time(tstamp)
        output += f"### {comp}\n"
        output += f"- 마지막 업데이트: {kst_time}\n"
        output += f"- 저장된 기사 수: {count_articles}건\n\n"
        if analysis:
            output += "#### 뉴스 감성 분석\n"
            output += f"{analysis}\n\n"
        output += "---\n\n"

    return output


######################################################################
# Combined search + display + analysis
######################################################################
def search_all_companies():
    """Search + analyze every tracked keyword in parallel (each call also
    persists to the DB) and concatenate the per-keyword Markdown reports.
    """
    overall = "# [전체 검색 결과]\n\n"

    def do_search(comp):
        return comp, search_company(comp)

    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(do_search, c) for c in KOREAN_COMPANIES]
        # Collect in submission order (not as_completed) so the report
        # sections are deterministic; the searches still run in parallel.
        for future in futures:
            comp, res = future.result()
            overall += f"## {comp}\n"
            overall += res + "\n\n"

    return overall


def load_all_companies():
    """Load every tracked keyword's stored articles + analysis from the DB."""
    overall = "# [전체 출력 결과]\n\n"
    for comp in KOREAN_COMPANIES:
        overall += f"## {comp}\n"
        overall += load_company(comp)
        overall += "\n"
    return overall


def full_summary_report():
    """Run the full pipeline and combine the three reports:

    1) parallel search + analysis (persisted to DB),
    2) DB dump of all keywords,
    3) overall sentiment statistics.
    """
    search_text = search_all_companies()
    load_text = load_all_companies()
    stats_text = show_stats()

    combined = (
        "# 전체 분석 보고 요약\n\n"
        "아래 순서로 실행되었습니다:\n"
        "1. 모든 종목 검색(병렬) + 분석 => 2. 모든 종목 DB 결과 출력 => 3. 전체 감성 분석 통계\n\n"
        f"{search_text}\n\n"
        f"{load_text}\n\n"
        "## [전체 감성 분석 통계]\n\n"
        f"{stats_text}"
    )
    return combined


######################################################################
# Ad-hoc user search
######################################################################
def search_custom(query, country):
    """Ad-hoc search: search + analyze (query, country), save to DB, then
    re-load from the DB and render articles + analysis as Markdown.
    """
    error_message, articles = serphouse_search(query, country)
    if error_message:
        return f"오류 발생: {error_message}"
    if not articles:
        return "검색 결과가 없습니다."

    analysis = analyze_sentiment_batch(articles, client)
    store = {
        "articles": articles,
        "analysis": analysis,
    }
    save_to_db(query, country, store)

    # Round-trip through the DB so the display reflects exactly what was saved.
    loaded, ts = load_from_db(query, country)
    if not loaded:
        return "DB에서 로드 실패"

    arts = loaded.get("articles", [])
    analy = loaded.get("analysis", "")

    out = f"## [사용자 임의 검색 결과]\n\n"
    out += f"**키워드**: {query}\n\n"
    out += f"**국가**: {country}\n\n"
    out += f"**저장 시간**: {ts}\n\n"
    out += display_results(arts)
    out += f"### 뉴스 감성 분석\n{analy}\n"
    return out


######################################################################
# Hugging Face OpenAI-compatible client
######################################################################
ACCESS_TOKEN = os.getenv("HF_TOKEN")
if not ACCESS_TOKEN:
    raise ValueError("HF_TOKEN environment variable is not set")

client = OpenAI(
    base_url="https://api-inference.huggingface.co/v1/",
    api_key=ACCESS_TOKEN,
)


######################################################################
# Country settings
######################################################################
# Target language per country, as ISO 639-1 codes consumed by translate_query
# and the SerpHouse "lang" parameter.
COUNTRY_LANGUAGES = {
    "United States": "en",
    "KOREA": "ko",
    "United Kingdom": "en",
    "Taiwan": "zh-TW",
    "Canada": "en",
    "Australia": "en",
    "Germany": "de",
    "France": "fr",
    "Japan": "ja",
    "China": "zh",
    "India": "hi",
    "Brazil": "pt",
    "Mexico": "es",
    "Russia": "ru",
    "Italy": "it",
    "Spain": "es",
    "Netherlands": "nl",
    "Singapore": "en",
    "Hong Kong": "zh-HK",
    "Indonesia": "id",
    "Malaysia": "ms",
    "Philippines": "tl",
    "Thailand": "th",
    "Vietnam": "vi",
    "Belgium": "nl",
    "Denmark": "da",
    "Finland": "fi",
    "Ireland": "en",
    "Norway": "no",
    "Poland": "pl",
    "Sweden": "sv",
    "Switzerland": "de",
    "Austria": "de",
    "Czech Republic": "cs",
    "Greece": "el",
    "Hungary": "hu",
    "Portugal": "pt",
    "Romania": "ro",
    "Turkey": "tr",
    "Israel": "he",
    "Saudi Arabia": "ar",
    "United Arab Emirates": "ar",
    "South Africa": "en",
    "Argentina": "es",
    "Chile": "es",
    "Colombia": "es",
    "Peru": "es",
    "Venezuela": "es",
    "New Zealand": "en",
    "Bangladesh": "bn",
    "Pakistan": "ur",
    "Egypt": "ar",
    "Morocco": "ar",
    "Nigeria": "en",
    "Kenya": "sw",
    "Ukraine": "uk",
    "Croatia": "hr",
    "Slovakia": "sk",
    "Bulgaria": "bg",
    "Serbia": "sr",
    "Estonia": "et",
    "Latvia": "lv",
    "Lithuania": "lt",
    "Slovenia": "sl",
    # Fixed: these four previously held country names ("Luxembourg", "Malta",
    # "Cyprus", "Iceland") instead of language codes, which are invalid values
    # for the translate `tl=` and SerpHouse `lang` parameters.
    "Luxembourg": "lb",
    "Malta": "mt",
    "Cyprus": "el",
    "Iceland": "is",
}

# Location string per country for the SerpHouse "loc" parameter.
COUNTRY_LOCATIONS = {
    "United States": "United States",
    # NOTE(review): "kr" breaks the country-name convention used by every
    # other entry; possibly intentional for the API — confirm before changing.
    "KOREA": "kr",
    "United Kingdom": "United Kingdom",
    "Taiwan": "Taiwan",
    "Canada": "Canada",
    "Australia": "Australia",
    "Germany": "Germany",
    "France": "France",
    "Japan": "Japan",
    "China": "China",
    "India": "India",
    "Brazil": "Brazil",
    "Mexico": "Mexico",
    "Russia": "Russia",
    "Italy": "Italy",
    "Spain": "Spain",
    "Netherlands": "Netherlands",
    "Singapore": "Singapore",
    "Hong Kong": "Hong Kong",
    "Indonesia": "Indonesia",
    "Malaysia": "Malaysia",
    "Philippines": "Philippines",
    "Thailand": "Thailand",
    "Vietnam": "Vietnam",
    "Belgium": "Belgium",
    "Denmark": "Denmark",
    "Finland": "Finland",
    "Ireland": "Ireland",
    "Norway": "Norway",
    "Poland": "Poland",
    "Sweden": "Sweden",
    "Switzerland": "Switzerland",
    "Austria": "Austria",
    "Czech Republic": "Czech Republic",
    "Greece": "Greece",
    "Hungary": "Hungary",
    "Portugal": "Portugal",
    "Romania": "Romania",
    "Turkey": "Turkey",
    "Israel": "Israel",
    "Saudi Arabia": "Saudi Arabia",
    "United Arab Emirates": "United Arab Emirates",
    "South Africa": "South Africa",
    "Argentina": "Argentina",
    "Chile": "Chile",
    "Colombia": "Colombia",
    "Peru": "Peru",
    "Venezuela": "Venezuela",
    "New Zealand": "New Zealand",
    "Bangladesh": "Bangladesh",
    "Pakistan": "Pakistan",
    "Egypt": "Egypt",
    "Morocco": "Morocco",
    "Nigeria": "Nigeria",
    "Kenya": "Kenya",
    "Ukraine": "Ukraine",
    "Croatia": "Croatia",
    "Slovakia": "Slovakia",
    "Bulgaria": "Bulgaria",
    "Serbia": "Serbia",
    # Fixed: these four previously held language codes ("et", "lv", "lt",
    # "sl") instead of location names, inconsistent with every other entry.
    "Estonia": "Estonia",
    "Latvia": "Latvia",
    "Lithuania": "Lithuania",
    "Slovenia": "Slovenia",
    "Luxembourg": "Luxembourg",
    "Malta": "Malta",
    "Cyprus": "Cyprus",
    "Iceland": "Iceland",
}

css = """
/* 전역 스타일 */
footer {visibility: hidden;}

/* 레이아웃 스타일, 탭 스타일, 등등 */
#status_area {
    background: rgba(255, 255, 255, 0.9);
    padding: 15px;
    border-bottom: 1px solid #ddd;
    margin-bottom: 20px;
    box-shadow: 0 2px 5px rgba(0,0,0,0.1);
}
#results_area {
    padding: 10px;
    margin-top: 10px;
}
.tabs {
    border-bottom: 2px solid #ddd !important;
    margin-bottom: 20px !important;
}
.tab-nav {
    border-bottom: none !important;
    margin-bottom: 0 !important;
}
.tab-nav button {
    font-weight: bold !important;
    padding: 10px 20px !important;
}
.tab-nav button.selected {
    border-bottom: 2px solid #1f77b4 !important;
    color: #1f77b4 !important;
}

/* 상태 메시지 */
#status_area .markdown-text {
    font-size: 1.1em;
    color: #2c3e50;
    padding: 10px 0;
}
.group {
    border: 1px solid #eee;
    padding: 15px;
    margin-bottom: 15px;
    border-radius: 5px;
    background: white;
}

/* 버튼 스타일 */
.primary-btn {
    background: #1f77b4 !important;
    border: none !important;
}

/* 기타 ... */
"""

# (Removed a redundant duplicate `import gradio as gr` here; gr is already
# imported at the top of the file.)

with gr.Blocks(theme="Yntec/HaleyCH_Theme_Orange", css=css, title="NewsAI 서비스") as iface:
    # Ensure the DB schema exists before any button handler runs.
    init_db()

    with gr.Tabs():
        # First (and only) tab.
        with gr.Tab("Earnbot"):
            gr.Markdown("## EarnBot: 글로벌 빅테크 기업 및 투자 전망 AI 자동 분석")
            gr.Markdown(" * '전체 분석 보고 요약' 클릭 시 전체 자동 보고 생성.\n * 아래 개별 종목의 '검색(DB 자동 저장)'과 '출력(DB 자동 호출)'도 가능.\n * 추가로, 원하는 임의 키워드 및 국가로 검색/분석할 수도 있습니다.")

            # Ad-hoc user search section.
            with gr.Group():
                gr.Markdown("### 사용자 임의 검색")
                with gr.Row():
                    with gr.Column():
                        user_input = gr.Textbox(
                            label="검색어 입력",
                            placeholder="예) Apple, Samsung 등 자유롭게"
                        )
                    with gr.Column():
                        country_selection = gr.Dropdown(
                            choices=list(COUNTRY_LOCATIONS.keys()),
                            value="United States",
                            label="국가 선택"
                        )
                    with gr.Column():
                        custom_search_btn = gr.Button("실행", variant="primary")

                custom_search_output = gr.Markdown()

                custom_search_btn.click(
                    fn=search_custom,
                    inputs=[user_input, country_selection],
                    outputs=custom_search_output
                )

            # Full summary-report button.
            with gr.Row():
                full_report_btn = gr.Button("전체 분석 보고 요약", variant="primary")
            full_report_display = gr.Markdown()

            full_report_btn.click(
                fn=full_summary_report,
                outputs=full_report_display
            )

            # Tracked-keyword grid: search / load buttons, two per row.
            with gr.Column():
                for i in range(0, len(KOREAN_COMPANIES), 2):
                    with gr.Row():
                        # Left column.
                        with gr.Column():
                            company = KOREAN_COMPANIES[i]
                            with gr.Group():
                                gr.Markdown(f"### {company}")
                                with gr.Row():
                                    search_btn = gr.Button("검색", variant="primary")
                                    load_btn = gr.Button("출력", variant="secondary")
                                result_display = gr.Markdown()

                                # `c=company` default binds the current value
                                # (avoids the late-binding closure pitfall).
                                search_btn.click(
                                    fn=lambda c=company: search_company(c),
                                    outputs=result_display
                                )
                                load_btn.click(
                                    fn=lambda c=company: load_company(c),
                                    outputs=result_display
                                )

                        # Right column (only when an odd index remains).
                        if i + 1 < len(KOREAN_COMPANIES):
                            with gr.Column():
                                company = KOREAN_COMPANIES[i + 1]
                                with gr.Group():
                                    gr.Markdown(f"### {company}")
                                    with gr.Row():
                                        search_btn = gr.Button("검색", variant="primary")
                                        load_btn = gr.Button("출력", variant="secondary")
                                    result_display = gr.Markdown()

                                    search_btn.click(
                                        fn=lambda c=company: search_company(c),
                                        outputs=result_display
                                    )
                                    load_btn.click(
                                        fn=lambda c=company: load_company(c),
                                        outputs=result_display
                                    )

iface.launch(
    server_name="0.0.0.0",
    server_port=7860,
    share=True,
    ssl_verify=False,
    show_error=True
)