|
import logging |
|
import gradio as gr |
|
import pandas as pd |
|
import torch |
|
import numpy as np |
|
import matplotlib.pyplot as plt |
|
from GoogleNews import GoogleNews |
|
from transformers import pipeline |
|
from datetime import datetime, timedelta |
|
import matplotlib |
|
matplotlib.use('Agg') |
|
|
|
|
|
# Emit timestamped, INFO-level log records for the whole application.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Hugging Face model id handed to the sentiment-analysis pipeline below.
SENTIMENT_ANALYSIS_MODEL = (
    "mrm8488/distilroberta-finetuned-financial-news-sentiment-analysis"
)

# Run on the GPU when torch can see one, otherwise fall back to the CPU.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
logging.info(f"Using device: {DEVICE}")

# The pipeline is created once at import time and reused by every request.
logging.info("Initializing sentiment analysis model...")
sentiment_analyzer = pipeline(
    task="sentiment-analysis",
    model=SENTIMENT_ANALYSIS_MODEL,
    device=DEVICE,
)
logging.info("Model initialized successfully")
|
|
|
def fetch_articles(query, max_articles=30):
    """Fetch up to ``max_articles`` unique Google News results for ``query``.

    Result pages 1 through 10 are requested in order until enough articles
    have been collected or a page contributes nothing new.

    Args:
        query: Search phrase passed to ``GoogleNews.search``.
        max_articles: Upper bound on the number of articles returned.

    Returns:
        A list of article dicts as produced by ``GoogleNews.result()``,
        de-duplicated and truncated to ``max_articles`` entries.

    Raises:
        gr.Error: If anything goes wrong while querying Google News.
    """
    try:
        logging.info(f"Fetching up to {max_articles} articles for query: '{query}'")
        googlenews = GoogleNews(lang="en")
        googlenews.search(query)

        articles = []
        seen = set()

        def add_new(results):
            """Append not-yet-seen results to ``articles``; return how many were added."""
            added = 0
            for item in results or []:
                key = (item.get("link"), item.get("title"))
                if key in seen:
                    continue
                seen.add(key)
                articles.append(item)
                added += 1
            return added

        # NOTE(review): GoogleNews.result() appears to return the accumulated
        # result list rather than only the latest page (confirm against the
        # installed version). De-duplicating makes the loop correct either
        # way: blindly extending would append every earlier article again.
        add_new(googlenews.result())

        page = 2
        while len(articles) < max_articles and page <= 10:
            logging.info(f"Fetched {len(articles)} articles so far. Getting page {page}...")
            googlenews.get_page(page)

            # A page that adds nothing new means the results are exhausted.
            if not add_new(googlenews.result()):
                logging.info(f"No more results found after page {page-1}")
                break

            page += 1

        articles = articles[:max_articles]

        logging.info(f"Successfully fetched {len(articles)} articles")
        return articles
    except Exception as e:
        logging.error(
            f"Error while searching articles for query: '{query}'. Error: {e}"
        )
        # Surface a user-friendly message in the UI, keeping the cause chained.
        raise gr.Error(
            f"Unable to search articles for query: '{query}'. Try again later...",
            duration=5,
        ) from e
|
|
|
def analyze_article_sentiment(article):
    """Classify an article's description and attach the result to the article.

    Args:
        article: Article dict with at least ``title`` and ``desc`` keys.

    Returns:
        The same ``article`` dict, mutated in place with a ``sentiment`` key
        holding the first prediction returned by ``sentiment_analyzer``.
    """
    logging.info(f"Analyzing sentiment for article: {article['title']}")
    predictions = sentiment_analyzer(article["desc"])
    # Only the first prediction for the single input text is kept.
    article["sentiment"] = predictions[0]
    return article
|
|
|
def _relative_age_in_hours(text):
    """Return the age in hours described by a string such as ``"3 hours ago"``.

    Returns ``None`` when ``text`` is not of the form ``<count> <unit> ago``
    with a recognised unit.
    """
    hours_per_unit = {
        "sec": 1 / 3600, "secs": 1 / 3600,
        "second": 1 / 3600, "seconds": 1 / 3600,
        "min": 1 / 60, "mins": 1 / 60,
        "minute": 1 / 60, "minutes": 1 / 60,
        "hr": 1, "hrs": 1, "hour": 1, "hours": 1,
        "day": 24, "days": 24,
        "week": 24 * 7, "weeks": 24 * 7,
        "month": 24 * 30, "months": 24 * 30,
        "year": 24 * 365, "years": 24 * 365,
    }
    words = text.strip().lower().split()
    if len(words) != 3 or words[2] != "ago":
        return None

    count_word, unit = words[0], words[1]
    if count_word in ("a", "an"):
        count = 1
    elif count_word.isdecimal():
        count = int(count_word)
    else:
        return None

    factor = hours_per_unit.get(unit)
    if factor is None:
        return None
    return count * factor


def calculate_time_weight(article_date_str):
    """Compute a recency weight for an article from its date string.

    The weight starts at 0.24 for an article less than one hour old and
    drops by 0.01 for every full hour of age, with a floor of 0.01:

    - under 1 hour old: 0.24
    - 10 hours old: 0.14
    - 23 hours or older (or an unparseable date): 0.01

    Both relative strings (``"3 hours ago"``) and a set of absolute formats
    are understood. Timestamps in the future are treated as "just now".

    Args:
        article_date_str: Publication date as text.

    Returns:
        A float weight between 0.01 and 0.24.
    """
    if not isinstance(article_date_str, str):
        logging.warning(f"Could not parse date: {article_date_str}, using default 24h ago")
        return 0.01

    hours_diff = _relative_age_in_hours(article_date_str)

    if hours_diff is None:
        date_formats = [
            '%a, %d %b %Y %H:%M:%S %z',
            '%Y-%m-%d %H:%M:%S',
            '%a, %d %b %Y %H:%M:%S',
            '%Y-%m-%dT%H:%M:%S%z',
            '%a %b %d, %Y',
            '%d %b %Y',
        ]

        parsed_date = None
        cleaned = article_date_str.strip()
        for format_str in date_formats:
            try:
                parsed_date = datetime.strptime(cleaned, format_str)
                break
            except ValueError:
                continue

        if parsed_date is None:
            logging.warning(f"Could not parse date: {article_date_str}, using default 24h ago")
            return 0.01

        # Take "now" in the article's own timezone (or naive local time when
        # the article has none). Merely stamping the local wall clock with
        # the article's tzinfo would shift the age by the UTC-offset gap.
        now = datetime.now(tz=parsed_date.tzinfo)
        hours_diff = (now - parsed_date).total_seconds() / 3600

    # A future timestamp (clock skew, timezone mismatch) must not push the
    # weight above the 0.24 maximum.
    hours_diff = max(0.0, hours_diff)

    if hours_diff > 24:
        return 0.01
    return max(0.01, 0.24 - (0.01 * int(hours_diff)))
|
|
|
def calculate_sentiment_score(sentiment_label, time_weight):
    """Map a sentiment label to its base score and its time-weighted bonus.

    Base scores: ``positive`` is +3, ``neutral`` is 0, ``negative`` is -3.
    Any other label scores 0.

    Args:
        sentiment_label: Label string produced by the sentiment model.
        time_weight: Recency weight to multiply the base score by.

    Returns:
        A ``(base_score, weighted_addition)`` tuple, where
        ``weighted_addition`` is ``base_score * time_weight``.
    """
    points_by_label = dict(positive=3, neutral=0, negative=-3)

    if sentiment_label in points_by_label:
        points = points_by_label[sentiment_label]
    else:
        points = 0

    return points, points * time_weight
|
|
|
def analyze_asset_sentiment(asset_name):
    """Run the full pipeline for one asset: fetch, classify, score, summarize.

    Args:
        asset_name: Name of the trading asset typed by the user.

    Returns:
        A ``(dataframe, image_path)`` tuple: the per-article table built by
        ``convert_to_dataframe`` and the chart path returned by
        ``create_sentiment_summary``.

    Raises:
        gr.Error: If the asset name is blank, no articles are found, or the
            article search fails.
    """
    asset_name = (asset_name or "").strip()
    if not asset_name:
        raise gr.Error("Please enter the name of a trading asset.", duration=5)

    logging.info(f"Starting sentiment analysis for asset: {asset_name}")
    logging.info("Fetching up to 30 articles")
    articles = fetch_articles(asset_name, max_articles=30)

    # The summary divides by the article count and the table indexes
    # per-article columns, so stop here with a clear message when empty.
    if not articles:
        raise gr.Error(
            f"No articles found for '{asset_name}'. Try a different name.",
            duration=5,
        )

    logging.info("Analyzing sentiment of each article")
    analyzed_articles = [analyze_article_sentiment(article) for article in articles]

    # Attach the recency weight and the derived scores to every article.
    for article in analyzed_articles:
        time_weight = calculate_time_weight(article.get("date", ""))
        article["time_weight"] = time_weight

        sentiment_label = article["sentiment"]["label"]
        base_score, weighted_addition = calculate_sentiment_score(sentiment_label, time_weight)

        article["base_score"] = base_score
        article["weighted_addition"] = weighted_addition
        article["total_score"] = base_score + weighted_addition

    logging.info("Sentiment analysis completed")

    sentiment_summary = create_sentiment_summary(analyzed_articles, asset_name)

    return convert_to_dataframe(analyzed_articles), sentiment_summary
|
|
|
def create_sentiment_summary(analyzed_articles, asset_name):
    """Summarize the sentiment results and render them as a chart image.

    The left panel is a pie chart of positive/neutral/negative article
    counts; the right panel is a bar chart of ``total_score`` for up to 15
    articles. A text box with counts, percentages and score sums is drawn
    underneath the panels.

    Args:
        analyzed_articles: Article dicts carrying ``sentiment`` (with a
            ``label`` key), ``base_score`` and ``total_score``. ``date`` and
            ``time_weight`` are used when present.
        asset_name: Asset name shown in the chart titles and used to build
            the output file name.

    Returns:
        Path of the saved PNG, relative to the current working directory.
    """
    total_articles = len(analyzed_articles)

    label_counts = {"positive": 0, "neutral": 0, "negative": 0}
    for article in analyzed_articles:
        label = article["sentiment"]["label"]
        if label in label_counts:
            label_counts[label] += 1
    positive_count = label_counts["positive"]
    neutral_count = label_counts["neutral"]
    negative_count = label_counts["negative"]

    def share(count):
        """Percentage of all articles; 0.0 when there are no articles."""
        return count / total_articles * 100 if total_articles else 0.0

    base_score_sum = sum(a["base_score"] for a in analyzed_articles)
    weighted_score_sum = sum(a["total_score"] for a in analyzed_articles)

    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 6))
    try:
        # Left panel: sentiment distribution.
        labels = ['Positive', 'Neutral', 'Negative']
        sizes = [positive_count, neutral_count, negative_count]
        colors = ['green', 'gray', 'red']

        if any(sizes):
            ax1.pie(sizes, labels=labels, colors=colors, autopct='%1.1f%%', startangle=90)
            ax1.axis('equal')
        else:
            # A pie chart cannot be drawn from all-zero wedge sizes.
            ax1.text(0.5, 0.5, 'No sentiment data', ha='center', va='center')
            ax1.axis('off')
        ax1.set_title(f'Sentiment Distribution for {asset_name}')

        # Right panel: most recent articles first. The recency weight is the
        # primary key because raw date strings do not sort chronologically;
        # the date string only breaks ties.
        sorted_articles = sorted(
            analyzed_articles,
            key=lambda a: (a.get("time_weight", 0), str(a.get("date") or "")),
            reverse=True,
        )
        display_articles = sorted_articles[:15]

        dates = [str(a.get("date") or "")[:10] for a in display_articles]
        scores = [a.get("total_score", 0) for a in display_articles]
        bar_colors = ['green' if s > 0 else 'red' if s < 0 else 'gray' for s in scores]

        ax2.bar(range(len(dates)), scores, color=bar_colors)
        ax2.set_xticks(range(len(dates)))
        ax2.set_xticklabels(dates, rotation=45, ha='right')
        ax2.set_ylabel('Weighted Sentiment Score')
        ax2.set_title(f'Recent Article Scores for {asset_name}')
        ax2.axhline(y=0, color='black', linestyle='-', alpha=0.3)

        # Text box under the panels.
        summary_lines = [
            "",
            f"Analysis Summary for {asset_name}:",
            f"Total Articles: {total_articles}",
            f"Positive: {positive_count} ({share(positive_count):.1f}%)",
            f"Neutral: {neutral_count} ({share(neutral_count):.1f}%)",
            f"Negative: {negative_count} ({share(negative_count):.1f}%)",
            "",
            f"Base Score Sum: {base_score_sum:.2f}",
            f"Weighted Score Sum: {weighted_score_sum:.2f}",
            "",
        ]
        summary_text = "\n".join(summary_lines)

        plt.figtext(0.5, 0.01, summary_text, ha='center', fontsize=10, bbox={"facecolor":"orange", "alpha":0.2, "pad":5})

        plt.tight_layout(rect=[0, 0.1, 1, 0.95])

        # asset_name is user input: keep only filename-safe characters so it
        # cannot introduce path separators or escape the working directory.
        safe_name = "".join(
            ch if ch.isalnum() or ch in "-_" else "_" for ch in str(asset_name)
        ) or "asset"
        fig_path = f"sentiment_summary_{safe_name}.png"
        plt.savefig(fig_path)
    finally:
        # Always release the figure, even if plotting or saving failed.
        plt.close(fig)

    return fig_path
|
|
|
def convert_to_dataframe(analyzed_articles):
    """Build the display table shown in the UI from the analyzed articles.

    Args:
        analyzed_articles: Article dicts with ``title``, ``link``, ``desc``,
            ``date``, ``sentiment`` (with a ``label`` key), ``base_score``,
            ``time_weight`` and ``total_score``.

    Returns:
        A ``pandas.DataFrame`` with the columns ``Sentiment``, ``Title``,
        ``Description``, ``Date``, ``Base Score``, ``Weight`` and
        ``Total Score``. An empty input yields an empty frame with the same
        columns.
    """
    # Local import keeps this block self-contained.
    import html

    columns = ["Sentiment", "Title", "Description", "Date", "Base Score", "Weight", "Total Score"]

    def sentiment_badge(sentiment):
        """Return a coloured HTML badge for a sentiment label."""
        colors = {
            "negative": "red",
            "neutral": "gray",
            "positive": "green",
        }
        color = colors.get(sentiment, "grey")
        return f'<span style="background-color: {color}; color: white; padding: 2px 6px; border-radius: 4px;">{sentiment}</span>'

    def title_link(article):
        """Return the title as an HTML link.

        Title and URL come from external search results, so both are
        escaped before being placed into markup.
        """
        href = html.escape(str(article["link"]), quote=True)
        text = html.escape(str(article["title"]))
        return f'<a href="{href}" target="_blank">{text}</a>'

    rows = [
        {
            "Sentiment": sentiment_badge(article["sentiment"]["label"]),
            "Title": title_link(article),
            "Description": article["desc"],
            "Date": article["date"],
            "Base Score": article["base_score"],
            "Weight": f'{article["time_weight"] * 100:.0f}%',
            "Total Score": f'{article["total_score"]:.2f}',
        }
        for article in analyzed_articles
    ]

    # Passing ``columns`` fixes the column order and keeps the schema intact
    # when there are no rows.
    return pd.DataFrame(rows, columns=columns)
|
|
|
# Page layout: an input row, a button row, example queries, then the summary
# image and the per-article table.
with gr.Blocks() as iface:
    gr.Markdown("# Trading Asset Sentiment Analysis")
    gr.Markdown(
        "Enter the name of a trading asset, and I'll fetch recent articles and analyze their sentiment!"
    )

    with gr.Row():
        input_asset = gr.Textbox(
            label="Asset Name",
            placeholder="Enter the name of the trading asset...",
            lines=1,
        )

    with gr.Row():
        analyze_button = gr.Button("Analyze Sentiment", size="sm")

    # Clickable sample queries that fill the text box.
    gr.Examples(
        examples=["Bitcoin", "Tesla", "Apple", "Amazon"],
        inputs=input_asset,
    )

    with gr.Row():
        with gr.Column():
            with gr.Blocks():
                gr.Markdown("## Sentiment Summary")
                sentiment_summary = gr.Image(type="filepath", label="Sentiment Analysis Summary")

    with gr.Row():
        with gr.Column():
            with gr.Blocks():
                gr.Markdown("## Articles and Sentiment Analysis")
                # Headers match the columns returned by convert_to_dataframe.
                articles_output = gr.Dataframe(
                    headers=["Sentiment", "Title", "Description", "Date", "Base Score", "Weight", "Total Score"],
                    datatype=["markdown", "html", "markdown", "markdown", "number", "markdown", "markdown"],
                    wrap=False,
                )

    # The handler returns (dataframe, image path), in this output order.
    analyze_button.click(
        fn=analyze_asset_sentiment,
        inputs=[input_asset],
        outputs=[articles_output, sentiment_summary],
    )

logging.info("Launching Gradio interface")
iface.queue().launch()