import gradio as gr import requests import json import os from datetime import datetime, timedelta from concurrent.futures import ThreadPoolExecutor, as_completed from functools import lru_cache from requests.adapters import HTTPAdapter from requests.packages.urllib3.util.retry import Retry from openai import OpenAI from bs4 import BeautifulSoup import re import pathlib import sqlite3 import pytz # 한국 기업 리스트 KOREAN_COMPANIES = [ "NVIDIA", "ALPHABET", "APPLE", "TESLA", "AMAZON", "MICROSOFT", "META", "INTEL", "SAMSUNG", "HYNIX", "BITCOIN", "crypto", "stock", "Economics", "Finance", "investing" ] def convert_to_seoul_time(timestamp_str): try: dt = datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S') seoul_tz = pytz.timezone('Asia/Seoul') seoul_time = seoul_tz.localize(dt) return seoul_time.strftime('%Y-%m-%d %H:%M:%S KST') except Exception as e: print(f"시간 변환 오류: {str(e)}") return timestamp_str def analyze_sentiment_batch(articles, client): """ OpenAI API를 통해 뉴스 기사들의 종합 감성 분석을 수행 """ try: # 모든 기사의 제목과 내용을 하나의 텍스트로 결합 combined_text = "\n\n".join([ f"제목: {article.get('title', '')}\n내용: {article.get('snippet', '')}" for article in articles ]) prompt = f"""다음 뉴스 모음에 대해 전반적인 감성 분석을 수행하세요: 뉴스 내용: {combined_text} 다음 형식으로 분석해주세요: 1. 전반적 감성: [긍정/부정/중립] 2. 주요 긍정적 요소: - [항목1] - [항목2] 3. 주요 부정적 요소: - [항목1] - [항목2] 4. 종합 평가: [상세 설명] """ response = client.chat.completions.create( model="CohereForAI/c4ai-command-r-plus-08-2024", messages=[{"role": "user", "content": prompt}], temperature=0.3, max_tokens=1000 ) return response.choices[0].message.content except Exception as e: return f"감성 분석 실패: {str(e)}" # DB 초기화 함수 def init_db(): db_path = pathlib.Path("search_results.db") conn = sqlite3.connect(db_path) c = conn.cursor() c.execute('''CREATE TABLE IF NOT EXISTS searches (id INTEGER PRIMARY KEY AUTOINCREMENT, keyword TEXT, country TEXT, results TEXT, timestamp DATETIME DEFAULT CURRENT_TIMESTAMP)''') conn.commit() conn.close() def save_to_db(keyword, country, results): """ 특정 (keyword, country) 조합에 대한 검색 결과를 DB에 저장 """ conn = sqlite3.connect("search_results.db") c = conn.cursor() seoul_tz = pytz.timezone('Asia/Seoul') now = datetime.now(seoul_tz) timestamp = now.strftime('%Y-%m-%d %H:%M:%S') c.execute("""INSERT INTO searches (keyword, country, results, timestamp) VALUES (?, ?, ?, ?)""", (keyword, country, json.dumps(results), timestamp)) conn.commit() conn.close() def load_from_db(keyword, country): """ 특정 (keyword, country) 조합에 대한 가장 최근 검색 결과를 DB에서 불러오기 """ conn = sqlite3.connect("search_results.db") c = conn.cursor() c.execute("SELECT results, timestamp FROM searches WHERE keyword=? AND country=? ORDER BY timestamp DESC LIMIT 1", (keyword, country)) result = c.fetchone() conn.close() if result: return json.loads(result[0]), convert_to_seoul_time(result[1]) return None, None def display_results(articles): """ 뉴스 기사 목록을 Markdown 문자열로 변환하여 반환 """ output = "" for idx, article in enumerate(articles, 1): output += f"### {idx}. {article['title']}\n" output += f"출처: {article['channel']}\n" output += f"시간: {article['time']}\n" output += f"링크: {article['link']}\n" output += f"요약: {article['snippet']}\n\n" return output def search_company(company): """ 단일 기업(또는 키워드)에 대해 미국 뉴스 검색, DB 저장 후 결과 Markdown 반환 """ error_message, articles = serphouse_search(company, "United States") if not error_message and articles: save_to_db(company, "United States", articles) return display_results(articles) return f"{company}에 대한 검색 결과가 없습니다." def load_company(company): """ DB에서 단일 기업(또는 키워드)의 미국 뉴스 검색 결과를 불러와 Markdown 반환 """ results, timestamp = load_from_db(company, "United States") if results: return f"### {company} 검색 결과\n저장 시간: {timestamp}\n\n" + display_results(results) return f"{company}에 대한 저장된 결과가 없습니다." def show_stats(): """ KOREAN_COMPANIES 목록 내 모든 기업에 대해: - 가장 최근 DB 저장 일자 - 기사 수 - 감성 분석 결과 를 순차(또는 병렬)로 조회하여 보고서 형태로 반환 """ conn = sqlite3.connect("search_results.db") c = conn.cursor() output = "## 한국 기업 뉴스 분석 리포트\n\n" # 모든 기업에 대해 DB에서 읽어올 (company, timestamp, articles) 목록 수집 data_list = [] for company in KOREAN_COMPANIES: c.execute(""" SELECT results, timestamp FROM searches WHERE keyword = ? ORDER BY timestamp DESC LIMIT 1 """, (company,)) row = c.fetchone() if row: results_json, timestamp = row articles = json.loads(results_json) seoul_time = convert_to_seoul_time(timestamp) data_list.append((company, seoul_time, articles)) conn.close() # (옵션) 각 기업 감성 분석을 병렬 처리 def analyze_data(item): comp, tstamp, arts = item sentiment = "" if arts: sentiment = analyze_sentiment_batch(arts, client) return (comp, tstamp, len(arts), sentiment) # ThreadPoolExecutor로 병렬 감성 분석 results_list = [] with ThreadPoolExecutor(max_workers=5) as executor: futures = [executor.submit(analyze_data, dl) for dl in data_list] for future in as_completed(futures): results_list.append(future.result()) # 결과 정렬(원하는 순서대로) - 여기서는 기업명 기준 or 그냥 순서 없음 for comp, tstamp, count, sentiment in results_list: output += f"### {comp}\n" output += f"- 마지막 업데이트: {tstamp}\n" output += f"- 저장된 기사 수: {count}건\n\n" if sentiment: output += "#### 뉴스 감성 분석\n" output += f"{sentiment}\n\n" output += "---\n\n" return output ### (1) 전체 검색: 멀티스레드 적용 def search_all_companies(): """ KOREAN_COMPANIES 리스트 내 모든 기업에 대해, 검색을 병렬(쓰레드)로 수행 후 결과를 합쳐 Markdown 형태로 반환 """ overall_result = "# [전체 검색 결과]\n\n" def do_search(comp): return comp, search_company(comp) with ThreadPoolExecutor(max_workers=5) as executor: futures = [executor.submit(do_search, c) for c in KOREAN_COMPANIES] for future in as_completed(futures): comp, res_text = future.result() overall_result += f"## {comp}\n" overall_result += res_text + "\n\n" return overall_result def load_all_companies(): """ KOREAN_COMPANIES 리스트 내 모든 기업에 대해, DB에서 불러온 결과를 순차(또는 병렬)로 합쳐서 Markdown 형태로 반환 """ overall_result = "# [전체 출력 결과]\n\n" for comp in KOREAN_COMPANIES: overall_result += f"## {comp}\n" overall_result += load_company(comp) overall_result += "\n" return overall_result def full_summary_report(): """ (1) 모든 기업 검색 -> (2) DB에서 모든 기업 불러오기 -> (3) 감성 분석 통계 순서대로 실행하여, 전체 리포트를 합쳐 반환 """ # 1) 전체 검색(병렬) search_result_text = search_all_companies() # 2) 전체 출력(순차) load_result_text = load_all_companies() # 3) 전체 통계(감성 분석) stats_text = show_stats() combined_report = ( "# 전체 분석 보고 요약\n\n" "아래 순서로 실행되었습니다:\n" "1. 모든 종목 검색(병렬) → 2. 모든 종목 DB 결과 출력 → 3. 전체 감성 분석 통계\n\n" f"{search_result_text}\n\n" f"{load_result_text}\n\n" "## [전체 감성 분석 통계]\n\n" f"{stats_text}" ) return combined_report ### (2) 사용자 임의 검색 + 국가 선택 기능 def search_custom(query, country): """ 사용자가 입력한 (query, country)를 대상으로 - 검색 (API 요청) - DB 저장 - DB 로드 후 감성 분석 - 최종 결과를 Markdown 형태로 반환 """ # 1) 검색 error_message, articles = serphouse_search(query, country) if error_message: return f"오류 발생: {error_message}" if not articles: return "검색 결과가 없습니다." # 2) DB 저장 save_to_db(query, country, articles) # 3) DB에서 다시 불러오기 results, timestamp = load_from_db(query, country) if not results: return f"DB 로드 실패: 저장된 결과가 없습니다." # 4) 감성 분석 sentiment_analysis = analyze_sentiment_batch(results, client) # 5) 최종 리포트(기사 목록 + 감성 분석) output = f"## [사용자 임의 검색 결과]\n\n" output += f"**키워드**: {query}\n\n" output += f"**국가**: {country}\n\n" output += f"**저장 시간**: {timestamp}\n\n" output += display_results(results) output += "### 뉴스 감성 분석\n" output += f"{sentiment_analysis}\n" return output ### (필수) API 인증 ACCESS_TOKEN = os.getenv("HF_TOKEN") if not ACCESS_TOKEN: raise ValueError("HF_TOKEN environment variable is not set") client = OpenAI( base_url="https://api-inference.huggingface.co/v1/", api_key=ACCESS_TOKEN, ) API_KEY = os.getenv("SERPHOUSE_API_KEY") ### 국가별 설정 COUNTRY_LANGUAGES = { "United States": "en", "KOREA": "ko", "United Kingdom": "en", "Taiwan": "zh-TW", "Canada": "en", "Australia": "en", "Germany": "de", "France": "fr", "Japan": "ja", "China": "zh", "India": "hi", "Brazil": "pt", "Mexico": "es", "Russia": "ru", "Italy": "it", "Spain": "es", "Netherlands": "nl", "Singapore": "en", "Hong Kong": "zh-HK", "Indonesia": "id", "Malaysia": "ms", "Philippines": "tl", "Thailand": "th", "Vietnam": "vi", "Belgium": "nl", "Denmark": "da", "Finland": "fi", "Ireland": "en", "Norway": "no", "Poland": "pl", "Sweden": "sv", "Switzerland": "de", "Austria": "de", "Czech Republic": "cs", "Greece": "el", "Hungary": "hu", "Portugal": "pt", "Romania": "ro", "Turkey": "tr", "Israel": "he", "Saudi Arabia": "ar", "United Arab Emirates": "ar", "South Africa": "en", "Argentina": "es", "Chile": "es", "Colombia": "es", "Peru": "es", "Venezuela": "es", "New Zealand": "en", "Bangladesh": "bn", "Pakistan": "ur", "Egypt": "ar", "Morocco": "ar", "Nigeria": "en", "Kenya": "sw", "Ukraine": "uk", "Croatia": "hr", "Slovakia": "sk", "Bulgaria": "bg", "Serbia": "sr", "Estonia": "et", "Latvia": "lv", "Lithuania": "lt", "Slovenia": "sl", "Luxembourg": "Luxembourg", "Malta": "Malta", "Cyprus": "Cyprus", "Iceland": "Iceland" } COUNTRY_LOCATIONS = { "United States": "United States", "KOREA": "kr", "United Kingdom": "United Kingdom", "Taiwan": "Taiwan", "Canada": "Canada", "Australia": "Australia", "Germany": "Germany", "France": "France", "Japan": "Japan", "China": "China", "India": "India", "Brazil": "Brazil", "Mexico": "Mexico", "Russia": "Russia", "Italy": "Italy", "Spain": "Spain", "Netherlands": "Netherlands", "Singapore": "Singapore", "Hong Kong": "Hong Kong", "Indonesia": "Indonesia", "Malaysia": "Malaysia", "Philippines": "Philippines", "Thailand": "Thailand", "Vietnam": "Vietnam", "Belgium": "Belgium", "Denmark": "Denmark", "Finland": "Finland", "Ireland": "Ireland", "Norway": "Norway", "Poland": "Poland", "Sweden": "Sweden", "Switzerland": "Switzerland", "Austria": "Austria", "Czech Republic": "Czech Republic", "Greece": "Greece", "Hungary": "Hungary", "Portugal": "Portugal", "Romania": "Romania", "Turkey": "Turkey", "Israel": "Israel", "Saudi Arabia": "Saudi Arabia", "United Arab Emirates": "United Arab Emirates", "South Africa": "South Africa", "Argentina": "Argentina", "Chile": "Chile", "Colombia": "Colombia", "Peru": "Peru", "Venezuela": "Venezuela", "New Zealand": "New Zealand", "Bangladesh": "Bangladesh", "Pakistan": "Pakistan", "Egypt": "Egypt", "Morocco": "Morocco", "Nigeria": "Nigeria", "Kenya": "Kenya", "Ukraine": "Ukraine", "Croatia": "Croatia", "Slovakia": "Slovakia", "Bulgaria": "Bulgaria", "Serbia": "Serbia", "Estonia": "et", "Latvia": "lv", "Lithuania": "lt", "Slovenia": "sl", "Luxembourg": "Luxembourg", "Malta": "Malta", "Cyprus": "Cyprus", "Iceland": "Iceland" } @lru_cache(maxsize=100) def translate_query(query, country): """ Google Translation API(비공식) 사용하여 검색어를 해당 국가 언어로 번역 """ try: if is_english(query): return query if country in COUNTRY_LANGUAGES: if country == "South Korea": return query target_lang = COUNTRY_LANGUAGES[country] url = "https://translate.googleapis.com/translate_a/single" params = { "client": "gtx", "sl": "auto", "tl": target_lang, "dt": "t", "q": query } session = requests.Session() retries = Retry(total=3, backoff_factor=0.5) session.mount('https://', HTTPAdapter(max_retries=retries)) response = session.get(url, params=params, timeout=(5, 10)) translated_text = response.json()[0][0][0] return translated_text return query except Exception as e: print(f"번역 오류: {str(e)}") return query def is_english(text): return all(ord(char) < 128 for char in text.replace(' ', '').replace('-', '').replace('_', '')) def search_serphouse(query, country, page=1, num_result=10): """ SerpHouse API에 실시간 검색 요청을 보내어, '뉴스' 탭 (sort_by=date)에서 해당 query에 대한 기사 목록을 가져온다. """ url = "https://api.serphouse.com/serp/live" now = datetime.utcnow() yesterday = now - timedelta(days=1) date_range = f"{yesterday.strftime('%Y-%m-%d')},{now.strftime('%Y-%m-%d')}" translated_query = translate_query(query, country) payload = { "data": { "q": translated_query, "domain": "google.com", "loc": COUNTRY_LOCATIONS.get(country, "United States"), "lang": COUNTRY_LANGUAGES.get(country, "en"), "device": "desktop", "serp_type": "news", "page": str(page), "num": "100", "date_range": date_range, "sort_by": "date" } } headers = { "accept": "application/json", "content-type": "application/json", "authorization": f"Bearer {API_KEY}" } try: session = requests.Session() retries = Retry( total=5, backoff_factor=1, status_forcelist=[500, 502, 503, 504, 429], allowed_methods=["POST"] ) adapter = HTTPAdapter(max_retries=retries) session.mount('http://', adapter) session.mount('https://', adapter) response = session.post( url, json=payload, headers=headers, timeout=(30, 30) ) response.raise_for_status() return {"results": response.json(), "translated_query": translated_query} except requests.exceptions.Timeout: return { "error": "검색 시간이 초과되었습니다. 잠시 후 다시 시도해주세요.", "translated_query": query } except requests.exceptions.RequestException as e: return { "error": f"검색 중 오류가 발생했습니다: {str(e)}", "translated_query": query } except Exception as e: return { "error": f"예기치 않은 오류가 발생했습니다: {str(e)}", "translated_query": query } def format_results_from_raw(response_data): """ SerpHouse API의 응답 데이터를 가공하여, (에러메시지, 기사리스트) 형태로 반환. """ if "error" in response_data: return "Error: " + response_data["error"], [] try: results = response_data["results"] translated_query = response_data["translated_query"] # 실제 뉴스 결과 news_results = results.get('results', {}).get('results', {}).get('news', []) if not news_results: return "검색 결과가 없습니다.", [] # 한국 도메인 및 한국 관련 키워드 포함 기사 제외 korean_domains = [ '.kr', 'korea', 'korean', 'yonhap', 'hankyung', 'chosun', 'donga', 'joins', 'hani', 'koreatimes', 'koreaherald' ] korean_keywords = [ 'korea', 'korean', 'seoul', 'busan', 'incheon', 'daegu', 'gwangju', 'daejeon', 'ulsan', 'sejong' ] filtered_articles = [] for idx, result in enumerate(news_results, 1): url = result.get("url", result.get("link", "")).lower() title = result.get("title", "").lower() channel = result.get("channel", result.get("source", "")).lower() is_korean_content = ( any(domain in url or domain in channel for domain in korean_domains) or any(keyword in title for keyword in korean_keywords) ) # 한국어 뉴스(또는 한국 도메인) 제외 if not is_korean_content: filtered_articles.append({ "index": idx, "title": result.get("title", "제목 없음"), "link": url, "snippet": result.get("snippet", "내용 없음"), "channel": result.get("channel", result.get("source", "알 수 없음")), "time": result.get("time", result.get("date", "알 수 없는 시간")), "image_url": result.get("img", result.get("thumbnail", "")), "translated_query": translated_query }) return "", filtered_articles except Exception as e: return f"결과 처리 중 오류 발생: {str(e)}", [] def serphouse_search(query, country): """ 검색 및 결과 포매팅까지 일괄 처리 """ response_data = search_serphouse(query, country) return format_results_from_raw(response_data) # CSS (UI 커스터마이징) css = """ /* 전역 스타일 */ footer {visibility: hidden;} /* 레이아웃 컨테이너 */ #status_area { background: rgba(255, 255, 255, 0.9); padding: 15px; border-bottom: 1px solid #ddd; margin-bottom: 20px; box-shadow: 0 2px 5px rgba(0,0,0,0.1); } #results_area { padding: 10px; margin-top: 10px; } /* 탭 스타일 */ .tabs { border-bottom: 2px solid #ddd !important; margin-bottom: 20px !important; } .tab-nav { border-bottom: none !important; margin-bottom: 0 !important; } .tab-nav button { font-weight: bold !important; padding: 10px 20px !important; } .tab-nav button.selected { border-bottom: 2px solid #1f77b4 !important; color: #1f77b4 !important; } /* 상태 메시지 */ #status_area .markdown-text { font-size: 1.1em; color: #2c3e50; padding: 10px 0; } /* 기본 컨테이너 */ .group { border: 1px solid #eee; padding: 15px; margin-bottom: 15px; border-radius: 5px; background: white; } /* 버튼 스타일 */ .primary-btn { background: #1f77b4 !important; border: none !important; } /* 입력 필드 */ .textbox { border: 1px solid #ddd !important; border-radius: 4px !important; } /* 프로그레스바 컨테이너 */ .progress-container { position: fixed; top: 0; left: 0; width: 100%; height: 6px; background: #e0e0e0; z-index: 1000; } /* 프로그레스bar */ .progress-bar { height: 100%; background: linear-gradient(90deg, #2196F3, #00BCD4); box-shadow: 0 0 10px rgba(33, 150, 243, 0.5); transition: width 0.3s ease; animation: progress-glow 1.5s ease-in-out infinite; } /* 프로그레스 텍스트 */ .progress-text { position: fixed; top: 8px; left: 50%; transform: translateX(-50%); background: #333; color: white; padding: 4px 12px; border-radius: 15px; font-size: 14px; z-index: 1001; box-shadow: 0 2px 5px rgba(0,0,0,0.2); } /* 프로그레스바 애니메이션 */ @keyframes progress-glow { 0% { box-shadow: 0 0 5px rgba(33, 150, 243, 0.5); } 50% { box-shadow: 0 0 20px rgba(33, 150, 243, 0.8); } 100% { box-shadow: 0 0 5px rgba(33, 150, 243, 0.5); } } /* 반응형 디자인 */ @media (max-width: 768px) { .group { padding: 10px; margin-bottom: 15px; } .progress-text { font-size: 12px; padding: 3px 10px; } } /* 로딩 상태 표시 개선 */ .loading { opacity: 0.7; pointer-events: none; transition: opacity 0.3s ease; } /* 결과 컨테이너 애니메이션 */ .group { transition: all 0.3s ease; opacity: 0; transform: translateY(20px); } .group.visible { opacity: 1; transform: translateY(0); } /* Examples 스타일링 */ .examples-table { margin-top: 10px !important; margin-bottom: 20px !important; } .examples-table button { background-color: #f0f0f0 !important; border: 1px solid #ddd !important; border-radius: 4px !important; padding: 5px 10px !important; margin: 2px !important; transition: all 0.3s ease !important; } .examples-table button:hover { background-color: #e0e0e0 !important; transform: translateY(-1px) !important; box-shadow: 0 2px 5px rgba(0,0,0,0.1) !important; } .examples-table .label { font-weight: bold !important; color: #444 !important; margin-bottom: 5px !important; } """ import gradio as gr with gr.Blocks(theme="Yntec/HaleyCH_Theme_Orange", css=css, title="NewsAI 서비스") as iface: init_db() with gr.Tabs(): # 첫 번째 탭 with gr.Tab("Earnbot"): gr.Markdown("## EarnBot: 글로벌 빅테크 기업 및 투자 전망 AI 자동 분석") gr.Markdown(" * '전체 분석 보고 요약' 클릭 시 전체 자동 보고 생성.\n * 아래 개별 종목의 '검색(DB 자동 저장)'과 '출력(DB 자동 호출)'도 가능.\n * 추가로, 원하는 임의 키워드 및 국가로 검색/분석할 수도 있습니다.") # (2) 사용자 임의 검색 섹션 with gr.Group(): gr.Markdown("### 사용자 임의 검색") with gr.Row(): with gr.Column(): user_input = gr.Textbox( label="검색어 입력", placeholder="예) Apple, Samsung 등 자유롭게" ) with gr.Column(): country_selection = gr.Dropdown( choices=list(COUNTRY_LOCATIONS.keys()), value="United States", label="국가 선택" ) with gr.Column(): custom_search_btn = gr.Button("실행", variant="primary") custom_search_output = gr.Markdown() custom_search_btn.click( fn=search_custom, inputs=[user_input, country_selection], outputs=custom_search_output ) # 전체 분석 보고 요약 버튼 with gr.Row(): full_report_btn = gr.Button("전체 분석 보고 요약", variant="primary") full_report_display = gr.Markdown() full_report_btn.click( fn=full_summary_report, outputs=full_report_display ) # 기존 개별 기업 검색/출력 영역 with gr.Column(): for i in range(0, len(KOREAN_COMPANIES), 2): with gr.Row(): # 왼쪽 열 with gr.Column(): company = KOREAN_COMPANIES[i] with gr.Group(): gr.Markdown(f"### {company}") with gr.Row(): search_btn = gr.Button("검색", variant="primary") load_btn = gr.Button("출력", variant="secondary") result_display = gr.Markdown() search_btn.click( fn=lambda c=company: search_company(c), outputs=result_display ) load_btn.click( fn=lambda c=company: load_company(c), outputs=result_display ) # 오른쪽 열 if i + 1 < len(KOREAN_COMPANIES): with gr.Column(): company = KOREAN_COMPANIES[i + 1] with gr.Group(): gr.Markdown(f"### {company}") with gr.Row(): search_btn = gr.Button("검색", variant="primary") load_btn = gr.Button("출력", variant="secondary") result_display = gr.Markdown() search_btn.click( fn=lambda c=company: search_company(c), outputs=result_display ) load_btn.click( fn=lambda c=company: load_company(c), outputs=result_display ) iface.launch( server_name="0.0.0.0", server_port=7860, share=True, ssl_verify=False, show_error=True )