import gradio as gr
import requests
import json
import os
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor
from functools import lru_cache
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from openai import OpenAI
from bs4 import BeautifulSoup
import re
import pathlib
import sqlite3
import pytz

# List of Korean companies to track
KOREAN_COMPANIES = [
    "SAMSUNG", "HYNIX", "HYUNDAI", "KIA", "LG", "HANWHA", "SKT", "Lotte",
    "KOGAS", "KEPCO", "SK", "POSCO", "DOOSAN", "WOORI", "KAKAO", "Celltrion"
]


def convert_to_seoul_time(timestamp_str):
    try:
        # Parse the stored timestamp into a naive datetime object
        dt = datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S')
        # Interpret the naive timestamp as Seoul (KST) local time
        seoul_tz = pytz.timezone('Asia/Seoul')
        seoul_time = seoul_tz.localize(dt)
        return seoul_time.strftime('%Y-%m-%d %H:%M:%S KST')
    except Exception as e:
        print(f"시간 변환 오류: {str(e)}")
        return timestamp_str


def analyze_sentiment_batch(articles, client):
    try:
        # Combine every article's title and snippet into a single text block
        combined_text = "\n\n".join([
            f"제목: {article.get('title', '')}\n내용: {article.get('snippet', '')}"
            for article in articles
        ])

        prompt = f"""다음 뉴스 모음에 대해 전반적인 감성 분석을 수행하세요:

뉴스 내용:
{combined_text}

다음 형식으로 분석해주세요:
1. 전반적 감성: [긍정/부정/중립]
2. 주요 긍정적 요소:
   - [항목1]
   - [항목2]
3. 주요 부정적 요소:
   - [항목1]
   - [항목2]
4. 종합 평가: [상세 설명]
"""

        response = client.chat.completions.create(
            model="CohereForAI/c4ai-command-r-plus-08-2024",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.3,
            max_tokens=1000
        )
        return response.choices[0].message.content
    except Exception as e:
        return f"감성 분석 실패: {str(e)}"


# Database initialization
def init_db():
    db_path = pathlib.Path("search_results.db")
    conn = sqlite3.connect(db_path)
    c = conn.cursor()
    c.execute('''CREATE TABLE IF NOT EXISTS searches
                 (id INTEGER PRIMARY KEY AUTOINCREMENT,
                  keyword TEXT,
                  country TEXT,
                  results TEXT,
                  timestamp DATETIME DEFAULT CURRENT_TIMESTAMP)''')
    conn.commit()
    conn.close()


def save_to_db(keyword, country, results):
    conn = sqlite3.connect("search_results.db")
    c = conn.cursor()
    # Take the current time in Seoul, then strip the timezone info before saving
    seoul_tz = pytz.timezone('Asia/Seoul')
    now = datetime.now(seoul_tz)
    timestamp = now.strftime('%Y-%m-%d %H:%M:%S')
    c.execute("""INSERT INTO searches (keyword, country, results, timestamp)
                 VALUES (?, ?, ?, ?)""",
              (keyword, country, json.dumps(results), timestamp))
    conn.commit()
    conn.close()


def load_from_db(keyword, country):
    conn = sqlite3.connect("search_results.db")
    c = conn.cursor()
    c.execute("SELECT results, timestamp FROM searches WHERE keyword=? AND country=? ORDER BY timestamp DESC LIMIT 1",
              (keyword, country))
    result = c.fetchone()
    conn.close()
    if result:
        return json.loads(result[0]), convert_to_seoul_time(result[1])
    return None, None


def display_results(articles):
    output = ""
    for idx, article in enumerate(articles, 1):
        output += f"### {idx}. {article['title']}\n"
        output += f"출처: {article['channel']}\n"
        output += f"시간: {article['time']}\n"
        output += f"링크: {article['link']}\n"
        output += f"요약: {article['snippet']}\n\n"
    return output


def search_company(company):
    error_message, articles = serphouse_search(company, "United States")
    if not error_message and articles:
        save_to_db(company, "United States", articles)
        return display_results(articles)
    return f"{company}에 대한 검색 결과가 없습니다."


def load_company(company):
    results, timestamp = load_from_db(company, "United States")
    if results:
        return f"### {company} 검색 결과\n저장 시간: {timestamp}\n\n" + display_results(results)
    return f"{company}에 대한 저장된 결과가 없습니다."

def show_stats():
    conn = sqlite3.connect("search_results.db")
    c = conn.cursor()

    output = "## 한국 기업 뉴스 분석 리포트\n\n"

    for company in KOREAN_COMPANIES:
        c.execute("""
            SELECT results, timestamp
            FROM searches
            WHERE keyword = ?
            ORDER BY timestamp DESC
            LIMIT 1
        """, (company,))
        result = c.fetchone()

        if result:
            results_json, timestamp = result
            articles = json.loads(results_json)
            seoul_time = convert_to_seoul_time(timestamp)

            output += f"### {company}\n"
            output += f"- 마지막 업데이트: {seoul_time}\n"
            output += f"- 저장된 기사 수: {len(articles)}건\n\n"

            if articles:
                # Batch sentiment analysis across all stored articles
                sentiment_analysis = analyze_sentiment_batch(articles, client)
                output += "#### 뉴스 감성 분석\n"
                output += f"{sentiment_analysis}\n\n"

            output += "---\n\n"

    conn.close()
    return output


ACCESS_TOKEN = os.getenv("HF_TOKEN")
if not ACCESS_TOKEN:
    raise ValueError("HF_TOKEN environment variable is not set")

client = OpenAI(
    base_url="https://api-inference.huggingface.co/v1/",
    api_key=ACCESS_TOKEN,
)

API_KEY = os.getenv("SERPHOUSE_API_KEY")

# Country-to-language-code mapping (the first tab mainly uses 'United States')
COUNTRY_LANGUAGES = {
    "United States": "en", "KOREA": "ko", "United Kingdom": "en", "Taiwan": "zh-TW",
    "Canada": "en", "Australia": "en", "Germany": "de", "France": "fr",
    "Japan": "ja", "China": "zh", "India": "hi", "Brazil": "pt",
    "Mexico": "es", "Russia": "ru", "Italy": "it", "Spain": "es",
    "Netherlands": "nl", "Singapore": "en", "Hong Kong": "zh-HK", "Indonesia": "id",
    "Malaysia": "ms", "Philippines": "tl", "Thailand": "th", "Vietnam": "vi",
    "Belgium": "nl", "Denmark": "da", "Finland": "fi", "Ireland": "en",
    "Norway": "no", "Poland": "pl", "Sweden": "sv", "Switzerland": "de",
    "Austria": "de", "Czech Republic": "cs", "Greece": "el", "Hungary": "hu",
    "Portugal": "pt", "Romania": "ro", "Turkey": "tr", "Israel": "he",
    "Saudi Arabia": "ar", "United Arab Emirates": "ar", "South Africa": "en", "Argentina": "es",
    "Chile": "es", "Colombia": "es", "Peru": "es", "Venezuela": "es",
    "New Zealand": "en", "Bangladesh": "bn", "Pakistan": "ur", "Egypt": "ar",
    "Morocco": "ar", "Nigeria": "en", "Kenya": "sw", "Ukraine": "uk",
    "Croatia": "hr", "Slovakia": "sk", "Bulgaria": "bg", "Serbia": "sr",
    "Estonia": "et", "Latvia": "lv", "Lithuania": "lt", "Slovenia": "sl",
    "Luxembourg": "fr", "Malta": "mt", "Cyprus": "el", "Iceland": "is"
}

# Country-to-location mapping used for the SerpHouse "loc" parameter
COUNTRY_LOCATIONS = {
    "United States": "United States", "KOREA": "kr", "United Kingdom": "United Kingdom", "Taiwan": "Taiwan",
    "Canada": "Canada", "Australia": "Australia", "Germany": "Germany", "France": "France",
    "Japan": "Japan", "China": "China", "India": "India", "Brazil": "Brazil",
    "Mexico": "Mexico", "Russia": "Russia", "Italy": "Italy", "Spain": "Spain",
    "Netherlands": "Netherlands", "Singapore": "Singapore", "Hong Kong": "Hong Kong", "Indonesia": "Indonesia",
    "Malaysia": "Malaysia", "Philippines": "Philippines", "Thailand": "Thailand", "Vietnam": "Vietnam",
    "Belgium": "Belgium", "Denmark": "Denmark", "Finland": "Finland", "Ireland": "Ireland",
    "Norway": "Norway", "Poland": "Poland", "Sweden": "Sweden", "Switzerland": "Switzerland",
    "Austria": "Austria", "Czech Republic": "Czech Republic", "Greece": "Greece", "Hungary": "Hungary",
    "Portugal": "Portugal", "Romania": "Romania", "Turkey": "Turkey", "Israel": "Israel",
    "Saudi Arabia": "Saudi Arabia", "United Arab Emirates": "United Arab Emirates", "South Africa": "South Africa",
    "Argentina": "Argentina", "Chile": "Chile", "Colombia": "Colombia", "Peru": "Peru",
    "Venezuela": "Venezuela", "New Zealand": "New Zealand", "Bangladesh": "Bangladesh", "Pakistan": "Pakistan",
    "Egypt": "Egypt", "Morocco": "Morocco",
"Morocco", "Nigeria": "Nigeria", "Kenya": "Kenya", "Ukraine": "Ukraine", "Croatia": "Croatia", "Slovakia": "Slovakia", "Bulgaria": "Bulgaria", "Serbia": "Serbia", "Estonia": "Estonia", "Latvia": "Latvia", "Lithuania": "Lithuania", "Slovenia": "Slovenia", "Luxembourg": "Luxembourg", "Malta": "Malta", "Cyprus": "Cyprus", "Iceland": "Iceland" } @lru_cache(maxsize=100) def translate_query(query, country): try: if is_english(query): return query if country in COUNTRY_LANGUAGES: if country == "South Korea": return query target_lang = COUNTRY_LANGUAGES[country] url = "https://translate.googleapis.com/translate_a/single" params = { "client": "gtx", "sl": "auto", "tl": target_lang, "dt": "t", "q": query } session = requests.Session() retries = Retry(total=3, backoff_factor=0.5) session.mount('https://', HTTPAdapter(max_retries=retries)) response = session.get(url, params=params, timeout=(5, 10)) translated_text = response.json()[0][0][0] return translated_text return query except Exception as e: print(f"번역 오류: {str(e)}") return query def is_english(text): return all(ord(char) < 128 for char in text.replace(' ', '').replace('-', '').replace('_', '')) def search_serphouse(query, country, page=1, num_result=10): url = "https://api.serphouse.com/serp/live" now = datetime.utcnow() yesterday = now - timedelta(days=1) date_range = f"{yesterday.strftime('%Y-%m-%d')},{now.strftime('%Y-%m-%d')}" translated_query = translate_query(query, country) payload = { "data": { "q": translated_query, "domain": "google.com", "loc": COUNTRY_LOCATIONS.get(country, "United States"), "lang": COUNTRY_LANGUAGES.get(country, "en"), "device": "desktop", "serp_type": "news", "page": "1", "num": "100", "date_range": date_range, "sort_by": "date" } } headers = { "accept": "application/json", "content-type": "application/json", "authorization": f"Bearer {API_KEY}" } try: session = requests.Session() retries = Retry( total=5, backoff_factor=1, status_forcelist=[500, 502, 503, 504, 429], allowed_methods=["POST"] ) adapter = HTTPAdapter(max_retries=retries) session.mount('http://', adapter) session.mount('https://', adapter) response = session.post( url, json=payload, headers=headers, timeout=(30, 30) ) response.raise_for_status() return {"results": response.json(), "translated_query": translated_query} except requests.exceptions.Timeout: return { "error": "검색 시간이 초과되었습니다. 
    except requests.exceptions.RequestException as e:
        return {
            "error": f"검색 중 오류가 발생했습니다: {str(e)}",
            "translated_query": query
        }
    except Exception as e:
        return {
            "error": f"예기치 않은 오류가 발생했습니다: {str(e)}",
            "translated_query": query
        }


def format_results_from_raw(response_data):
    if "error" in response_data:
        return "Error: " + response_data["error"], []

    try:
        results = response_data["results"]
        translated_query = response_data["translated_query"]

        news_results = results.get('results', {}).get('results', {}).get('news', [])
        if not news_results:
            return "검색 결과가 없습니다.", []

        # Korean domains and Korea-related keywords used to filter out Korean-sourced coverage
        korean_domains = [
            '.kr', 'korea', 'korean', 'yonhap', 'hankyung', 'chosun',
            'donga', 'joins', 'hani', 'koreatimes', 'koreaherald'
        ]
        korean_keywords = [
            'korea', 'korean', 'seoul', 'busan', 'incheon', 'daegu',
            'gwangju', 'daejeon', 'ulsan', 'sejong'
        ]

        filtered_articles = []
        for idx, result in enumerate(news_results, 1):
            url = result.get("url", result.get("link", "")).lower()
            title = result.get("title", "").lower()
            channel = result.get("channel", result.get("source", "")).lower()

            # Keep only articles that are not from Korean outlets or Korea-focused titles
            is_korean_content = (
                any(domain in url or domain in channel for domain in korean_domains) or
                any(keyword in title for keyword in korean_keywords)
            )

            if not is_korean_content:
                filtered_articles.append({
                    "index": idx,
                    "title": result.get("title", "제목 없음"),
                    "link": url,
                    "snippet": result.get("snippet", "내용 없음"),
                    "channel": result.get("channel", result.get("source", "알 수 없음")),
                    "time": result.get("time", result.get("date", "알 수 없는 시간")),
                    "image_url": result.get("img", result.get("thumbnail", "")),
                    "translated_query": translated_query
                })

        return "", filtered_articles
    except Exception as e:
        return f"결과 처리 중 오류 발생: {str(e)}", []


def serphouse_search(query, country):
    response_data = search_serphouse(query, country)
    return format_results_from_raw(response_data)


css = """
/* Global styles */
footer {visibility: hidden;}

/* Layout containers */
#status_area {
    background: rgba(255, 255, 255, 0.9);
    padding: 15px;
    border-bottom: 1px solid #ddd;
    margin-bottom: 20px;
    box-shadow: 0 2px 5px rgba(0,0,0,0.1);
}

#results_area {
    padding: 10px;
    margin-top: 10px;
}

/* Tab styles */
.tabs {
    border-bottom: 2px solid #ddd !important;
    margin-bottom: 20px !important;
}

.tab-nav {
    border-bottom: none !important;
    margin-bottom: 0 !important;
}

.tab-nav button {
    font-weight: bold !important;
    padding: 10px 20px !important;
}

.tab-nav button.selected {
    border-bottom: 2px solid #1f77b4 !important;
    color: #1f77b4 !important;
}

/* Status message */
#status_area .markdown-text {
    font-size: 1.1em;
    color: #2c3e50;
    padding: 10px 0;
}

/* Base container */
.group {
    border: 1px solid #eee;
    padding: 15px;
    margin-bottom: 15px;
    border-radius: 5px;
    background: white;
}

/* Button styles */
.primary-btn {
    background: #1f77b4 !important;
    border: none !important;
}

/* Input fields */
.textbox {
    border: 1px solid #ddd !important;
    border-radius: 4px !important;
}

/* Progress bar container */
.progress-container {
    position: fixed;
    top: 0;
    left: 0;
    width: 100%;
    height: 6px;
    background: #e0e0e0;
    z-index: 1000;
}

/* Progress bar */
.progress-bar {
    height: 100%;
    background: linear-gradient(90deg, #2196F3, #00BCD4);
    box-shadow: 0 0 10px rgba(33, 150, 243, 0.5);
    transition: width 0.3s ease;
    animation: progress-glow 1.5s ease-in-out infinite;
}

/* Progress text */
.progress-text {
    position: fixed;
    top: 8px;
    left: 50%;
    transform: translateX(-50%);
    background: #333;
    color: white;
    padding: 4px 12px;
    border-radius: 15px;
    font-size: 14px;
    z-index: 1001;
    box-shadow: 0 2px 5px rgba(0,0,0,0.2);
}
/* Progress bar animation */
@keyframes progress-glow {
    0% { box-shadow: 0 0 5px rgba(33, 150, 243, 0.5); }
    50% { box-shadow: 0 0 20px rgba(33, 150, 243, 0.8); }
    100% { box-shadow: 0 0 5px rgba(33, 150, 243, 0.5); }
}

/* Responsive design */
@media (max-width: 768px) {
    .group {
        padding: 10px;
        margin-bottom: 15px;
    }
    .progress-text {
        font-size: 12px;
        padding: 3px 10px;
    }
}

/* Improved loading state indicator */
.loading {
    opacity: 0.7;
    pointer-events: none;
    transition: opacity 0.3s ease;
}

/* Result container animation */
.group {
    transition: all 0.3s ease;
    opacity: 0;
    transform: translateY(20px);
}

.group.visible {
    opacity: 1;
    transform: translateY(0);
}

/* Examples styling */
.examples-table {
    margin-top: 10px !important;
    margin-bottom: 20px !important;
}

.examples-table button {
    background-color: #f0f0f0 !important;
    border: 1px solid #ddd !important;
    border-radius: 4px !important;
    padding: 5px 10px !important;
    margin: 2px !important;
    transition: all 0.3s ease !important;
}

.examples-table button:hover {
    background-color: #e0e0e0 !important;
    transform: translateY(-1px) !important;
    box-shadow: 0 2px 5px rgba(0,0,0,0.1) !important;
}

.examples-table .label {
    font-weight: bold !important;
    color: #444 !important;
    margin-bottom: 5px !important;
}
"""

with gr.Blocks(theme="Yntec/HaleyCH_Theme_Orange", css=css, title="NewsAI 서비스") as iface:
    init_db()

    with gr.Tabs():
        # Only the first tab (DB search) is kept
        with gr.Tab("DB 검색"):
            gr.Markdown("## 한국 주요 기업 미국 뉴스 DB")
            gr.Markdown("각 기업의 미국 뉴스를 검색하여 DB에 저장하고 불러올 수 있습니다.")

            with gr.Column():
                # Lay companies out two per row
                for i in range(0, len(KOREAN_COMPANIES), 2):
                    with gr.Row():
                        # Left column
                        with gr.Column():
                            company = KOREAN_COMPANIES[i]
                            with gr.Group():
                                gr.Markdown(f"### {company}")
                                with gr.Row():
                                    search_btn = gr.Button("검색", variant="primary")
                                    load_btn = gr.Button("출력", variant="secondary")
                                result_display = gr.Markdown()

                                search_btn.click(
                                    fn=lambda c=company: search_company(c),
                                    outputs=result_display
                                )
                                load_btn.click(
                                    fn=lambda c=company: load_company(c),
                                    outputs=result_display
                                )

                        # Right column
                        if i + 1 < len(KOREAN_COMPANIES):
                            with gr.Column():
                                company = KOREAN_COMPANIES[i + 1]
                                with gr.Group():
                                    gr.Markdown(f"### {company}")
                                    with gr.Row():
                                        search_btn = gr.Button("검색", variant="primary")
                                        load_btn = gr.Button("출력", variant="secondary")
                                    result_display = gr.Markdown()

                                    search_btn.click(
                                        fn=lambda c=company: search_company(c),
                                        outputs=result_display
                                    )
                                    load_btn.click(
                                        fn=lambda c=company: load_company(c),
                                        outputs=result_display
                                    )

            # Overall search statistics
            with gr.Row():
                stats_btn = gr.Button("전체 검색 통계 보기", variant="secondary")
                stats_display = gr.Markdown()

            stats_btn.click(
                fn=show_stats,
                outputs=stats_display
            )

iface.launch(
    server_name="0.0.0.0",
    server_port=7860,
    share=True,
    ssl_verify=False,
    show_error=True
)