Spaces:
Running
Running
import gradio as gr | |
import requests | |
import json | |
import os | |
from datetime import datetime, timedelta | |
from huggingface_hub import InferenceClient | |
API_KEY = os.getenv("SERPHOUSE_API_KEY") | |
hf_client = InferenceClient("CohereForAI/c4ai-command-r-plus-08-2024", token=os.getenv("HF_TOKEN")) | |
# κ΅κ°λ³ μ½λ λ§€ν | |
COUNTRY_CODES = { | |
"United States": "US", | |
"United Kingdom": "GB", | |
"Canada": "CA", | |
"Australia": "AU", | |
"Germany": "DE", | |
"France": "FR", | |
"Japan": "JP", | |
"South Korea": "KR", | |
"China": "CN", | |
"Taiwan": "TW", | |
"India": "IN", | |
"Brazil": "BR", | |
"Mexico": "MX", | |
"Russia": "RU", | |
"Italy": "IT", | |
"Spain": "ES", | |
"Netherlands": "NL", | |
"Singapore": "SG", | |
"Hong Kong": "HK", | |
"Indonesia": "ID", | |
"Malaysia": "MY", | |
"Philippines": "PH", | |
"Thailand": "TH", | |
"Vietnam": "VN" | |
} | |
MAJOR_COUNTRIES = list(COUNTRY_CODES.keys()) | |
def is_english(text): | |
return all(ord(char) < 128 for char in text.replace(' ', '')) | |
def translate_query(query, country): | |
try: | |
# μμ΄ μ λ ₯μΈ κ²½μ° λ²μνμ§ μμ | |
if is_english(query): | |
print(f"English query detected, using original: {query}") | |
return query[:255] | |
# νκΈ μ λ ₯μ΄κ³ South Koreaκ° μ νλ κ²½μ° λ²μνμ§ μμ | |
if country == "South Korea": | |
return query[:255] | |
if country in COUNTRY_CODES: | |
query = query[:100] | |
target_lang = COUNTRY_CODES[country] | |
prompt = f"""Translate this text to {target_lang} language. | |
For Japanese, use Kanji and Kana. | |
For Chinese (China), use Simplified Chinese. | |
For Chinese (Taiwan), use Traditional Chinese. | |
For Korean, use Hangul. | |
Only output the translated text without any explanation. | |
Text to translate: {query}""" | |
translated = hf_client.text_generation( | |
prompt, | |
max_new_tokens=50, | |
temperature=0.1 | |
) | |
translated = translated.strip()[:255] | |
print(f"Original query: {query}") | |
print(f"Translated query: {translated}") | |
return translated | |
return query[:255] | |
except Exception as e: | |
print(f"Translation error: {str(e)}") | |
return query[:255] | |
def search_serphouse(query, country, page=1, num_result=10): | |
url = "https://api.serphouse.com/serp/live" | |
# κ²μμ΄ λ²μ | |
translated_query = translate_query(query, country) | |
print(f"Original query: {query}") | |
print(f"Translated query: {translated_query}") | |
payload = { | |
"data": { | |
"q": translated_query, | |
"domain": "google.com", | |
"loc": country, # κ΅κ° μ΄λ¦ μΆκ° | |
"country_code": COUNTRY_CODES.get(country, "US"), | |
"lang": "en", | |
"device": "desktop", | |
"serp_type": "web", # webμΌλ‘ λ³κ²½ | |
"page": "1", | |
"verbatim": "0", | |
"gfilter": "0", | |
"num_result": "10" | |
} | |
} | |
headers = { | |
"accept": "application/json", | |
"content-type": "application/json", | |
"authorization": f"Bearer {API_KEY}" | |
} | |
# λλ²κΉ μ μν μΆλ ₯ | |
print("Full request payload:", json.dumps(payload, indent=2, ensure_ascii=False)) | |
try: | |
response = requests.post(url, json=payload, headers=headers) | |
print("Response status:", response.status_code) | |
print("Response content:", response.text) | |
response.raise_for_status() | |
return {"results": response.json(), "translated_query": translated_query} | |
except requests.RequestException as e: | |
error_msg = f"Error: {str(e)}" | |
if hasattr(response, 'text'): | |
error_msg += f"\nResponse content: {response.text}" | |
return {"error": error_msg, "translated_query": query} | |
def format_results_from_raw(response_data): | |
if "error" in response_data: | |
return "Error: " + response_data["error"], [] | |
try: | |
results = response_data["results"] | |
translated_query = response_data["translated_query"] | |
news_results = results.get('results', {}).get('organic_results', []) | |
if not news_results: | |
return "κ²μ κ²°κ³Όκ° μμ΅λλ€.", [] | |
articles = [] | |
for idx, result in enumerate(news_results, 1): | |
articles.append({ | |
"index": idx, | |
"title": result.get("title", "μ λͺ© μμ"), | |
"link": result.get("link", "#"), | |
"snippet": result.get("snippet", "λ΄μ© μμ"), | |
"channel": result.get("source", "μ μ μμ"), | |
"time": result.get("date", "μ μ μλ μκ°"), | |
"image_url": result.get("thumbnail", ""), | |
"translated_query": translated_query | |
}) | |
return "", articles | |
except Exception as e: | |
return f"κ²°κ³Ό μ²λ¦¬ μ€ μ€λ₯ λ°μ: {str(e)}", [] | |
def serphouse_search(query, country): | |
response_data = search_serphouse(query, country) | |
return format_results_from_raw(response_data) | |
css = """ | |
footer {visibility: hidden;} | |
""" | |
with gr.Blocks(theme="Nymbo/Nymbo_Theme", css=css, title="NewsAI μλΉμ€") as iface: | |
gr.Markdown("κ²μμ΄λ₯Ό μ λ ₯νκ³ μνλ κ΅κ°λ₯Ό μ ννλ©΄, ν΄λΉ κ΅κ°μ μΈμ΄λ‘ λ²μλ κ²μμ΄λ‘ λ΄μ€λ₯Ό κ²μν©λλ€.") | |
with gr.Column(): | |
with gr.Row(): | |
query = gr.Textbox(label="κ²μμ΄") | |
country = gr.Dropdown(MAJOR_COUNTRIES, label="κ΅κ°", value="South Korea") | |
# λ²μλ κ²μμ΄ νμ μ»΄ν¬λνΈ | |
translated_display = gr.Markdown(visible=True) | |
search_button = gr.Button("κ²μ", variant="primary") | |
progress = gr.Progress() | |
status_message = gr.Markdown(visible=False) | |
articles_state = gr.State([]) | |
article_components = [] | |
for i in range(100): | |
with gr.Group(visible=False) as article_group: | |
title = gr.Markdown() | |
image = gr.Image(width=200, height=150) | |
snippet = gr.Markdown() | |
info = gr.Markdown() | |
article_components.append({ | |
'group': article_group, | |
'title': title, | |
'image': image, | |
'snippet': snippet, | |
'info': info, | |
'index': i, | |
}) | |
def search_and_display(query, country, articles_state, progress=gr.Progress()): | |
progress(0, desc="κ²μμ΄ λ²μ μ€...") | |
translated_query = translate_query(query, country) | |
if is_english(query): | |
translated_display_text = f"μμ΄ κ²μμ΄: {query}" | |
elif country == "South Korea": | |
translated_display_text = f"κ²μμ΄: {query}" | |
elif translated_query != query: | |
translated_display_text = f"μλ³Έ κ²μμ΄: {query}\nλ²μλ κ²μμ΄: {translated_query}" | |
else: | |
translated_display_text = f"κ²μμ΄: {query}" | |
progress(0.2, desc="κ²μ μ€...") | |
response_data = search_serphouse(query, country) | |
error_message, articles = format_results_from_raw(response_data) | |
outputs = [gr.update(value=translated_display_text, visible=True)] | |
if error_message: | |
outputs.append(gr.update(value=error_message, visible=True)) | |
for comp in article_components: | |
outputs.extend([ | |
gr.update(visible=False), gr.update(), gr.update(), | |
gr.update(), gr.update() | |
]) | |
articles_state = [] | |
else: | |
outputs.append(gr.update(value="", visible=False)) | |
total_articles = len(articles) | |
for idx, comp in enumerate(article_components): | |
progress((idx + 1) / total_articles, desc=f"κ²°κ³Ό νμ μ€... {idx + 1}/{total_articles}") | |
if idx < len(articles): | |
article = articles[idx] | |
image_url = article['image_url'] | |
image_update = gr.update(value=image_url, visible=True) if image_url and not image_url.startswith('data:image') else gr.update(value=None, visible=False) | |
outputs.extend([ | |
gr.update(visible=True), | |
gr.update(value=f"### [{article['title']}]({article['link']})"), | |
image_update, | |
gr.update(value=f"**μμ½:** {article['snippet']}"), | |
gr.update(value=f"**μΆμ²:** {article['channel']} | **μκ°:** {article['time']}") | |
]) | |
else: | |
outputs.extend([ | |
gr.update(visible=False), gr.update(), gr.update(), | |
gr.update(), gr.update() | |
]) | |
articles_state = articles | |
progress(1.0, desc="μλ£!") | |
outputs.append(articles_state) | |
outputs.append(gr.update(visible=False)) | |
return outputs | |
search_outputs = [translated_display, gr.Markdown(visible=False)] | |
for comp in article_components: | |
search_outputs.extend([comp['group'], comp['title'], comp['image'], | |
comp['snippet'], comp['info']]) | |
search_outputs.extend([articles_state, status_message]) | |
search_button.click( | |
fn=search_and_display, | |
inputs=[query, country, articles_state], | |
outputs=search_outputs, | |
show_progress=True | |
) | |
iface.launch() |