|
import streamlit as st |
|
import pandas as pd |
|
from sklearn.model_selection import train_test_split |
|
from datasets import Dataset |
|
from transformers import AutoTokenizer, AutoModelForSequenceClassification, TrainingArguments, Trainer, get_linear_schedule_with_warmup |
|
import numpy as np |
|
import torch |
|
from transformers import pipeline |
|
from collections import Counter |
|
import time |
|
from tqdm import tqdm |
|
import evaluate |
|
|
|
|
|
def load_and_process_data(news_file, trend_file): |
|
news_df = pd.read_csv(news_file) |
|
trend_df = pd.read_csv(trend_file) |
|
trend_df = trend_df.rename(columns={'Symbol': 'Stock'}) |
|
news_labeled_df = news_df.merge(trend_df[['Stock', 'Trend']], on='Stock', how='left') |
|
news_labeled_df = news_labeled_df[news_labeled_df['Trend'].isin(['Positive', 'Negative'])] |
|
label_map = {'Negative': 0, 'Positive': 1} |
|
news_labeled_df['label'] = news_labeled_df['Trend'].map(label_map) |
|
return news_labeled_df |
|
|
|
|
|
def check_class_imbalance(df): |
|
class_counts = df['label'].value_counts() |
|
st.write("**Class Distribution:**", class_counts.to_dict()) |
|
if class_counts.min() / class_counts.max() < 0.5: |
|
st.warning("Warning: Class imbalance detected. Consider balancing techniques.") |
|
|
|
|
|
def split_data(df): |
|
stocks = df['Stock'].unique() |
|
train_val_stocks, test_stocks = train_test_split(stocks, test_size=0.2, random_state=42) |
|
train_stocks, val_stocks = train_test_split(train_val_stocks, test_size=0.25, random_state=42) |
|
train_df = df[df['Stock'].isin(train_stocks)] |
|
val_df = df[df['Stock'].isin(val_stocks)] |
|
test_df = df[df['Stock'].isin(test_stocks)] |
|
return train_df, val_df, test_df |
|
|
|
|
|
def tokenize_datasets(train_df, val_df, test_df, tokenizer): |
|
train_dataset = Dataset.from_pandas(train_df[['Headline', 'label']]) |
|
val_dataset = Dataset.from_pandas(val_df[['Headline', 'label']]) |
|
test_dataset = Dataset.from_pandas(test_df[['Headline', 'label']]) |
|
def tokenize_function(examples): |
|
return tokenizer(examples['Headline'], padding='max_length', truncation=True, max_length=128) |
|
tokenized_train = train_dataset.map(tokenize_function, batched=True) |
|
tokenized_val = val_dataset.map(tokenize_function, batched=True) |
|
tokenized_test = test_dataset.map(tokenize_function, batched=True) |
|
return tokenized_train, tokenized_val, tokenized_test |
|
|
|
|
|
@st.cache_resource |
|
def load_model(): |
|
model = AutoModelForSequenceClassification.from_pretrained( |
|
"yiyanghkust/finbert-tone", |
|
num_labels=2, |
|
ignore_mismatched_sizes=True |
|
) |
|
for param in model.bert.encoder.layer[:6].parameters(): |
|
param.requires_grad = False |
|
return model |
|
|
|
|
|
def train_model(tokenized_train, tokenized_val, model): |
|
training_args = TrainingArguments( |
|
output_dir="./results", |
|
num_train_epochs=5, |
|
per_device_train_batch_size=32, |
|
per_device_eval_batch_size=32, |
|
eval_strategy="epoch", |
|
save_strategy="epoch", |
|
load_best_model_at_end=True, |
|
metric_for_best_model="accuracy", |
|
learning_rate=5e-5, |
|
weight_decay=0.1, |
|
report_to="none", |
|
) |
|
total_steps = len(tokenized_train) // training_args.per_device_train_batch_size * training_args.num_train_epochs |
|
optimizer = torch.optim.AdamW(model.parameters(), lr=training_args.learning_rate) |
|
trainer = Trainer( |
|
model=model, |
|
args=training_args, |
|
train_dataset=tokenized_train, |
|
eval_dataset=tokenized_val, |
|
compute_metrics=lambda eval_pred: {"accuracy": evaluate.load("accuracy").compute(predictions=np.argmax(eval_pred.predictions, axis=1), references=eval_pred.label_ids)}, |
|
optimizers=(optimizer, get_linear_schedule_with_warmup(optimizer, num_warmup_steps=0, num_training_steps=total_steps)), |
|
) |
|
trainer.train() |
|
trainer.save_model("./fine_tuned_model") |
|
return trainer |
|
|
|
|
|
def evaluate_model(pipe, df, model_name=""): |
|
results = [] |
|
total_start = time.perf_counter() |
|
for stock, group in tqdm(df.groupby("Stock")): |
|
headlines = group["Headline"].tolist() |
|
true_trend = group["Trend"].iloc[0] |
|
try: |
|
preds = pipe(headlines, truncation=True) |
|
except Exception as e: |
|
st.error(f"Error for {stock}: {e}") |
|
continue |
|
labels = [p['label'] for p in preds] |
|
count = Counter(labels) |
|
num_pos, num_neg = count.get("Positive", 0), count.get("Negative", 0) |
|
predicted_trend = "Positive" if num_pos > num_neg else "Negative" |
|
match = predicted_trend == true_trend |
|
results.append(match) |
|
total_runtime = time.perf_counter() - total_start |
|
accuracy = sum(results) / len(results) if results else 0 |
|
st.write(f"**🔍 Evaluation Summary for {model_name}**") |
|
st.write(f"✅ Accuracy: {accuracy:.2%}") |
|
st.write(f"⏱ Total Runtime: {total_runtime:.2f} seconds") |
|
return accuracy |
|
|
|
|
|
st.title("Financial Sentiment Analysis with FinBERT") |
|
st.markdown("Upload your CSV files to train and evaluate a sentiment analysis model on financial news headlines.") |
|
|
|
st.header("Upload CSV Files") |
|
news_file = st.file_uploader("Upload Train_stock_news.csv", type="csv") |
|
trend_file = st.file_uploader("Upload Training_price_comparison.csv", type="csv") |
|
|
|
if news_file and trend_file: |
|
with st.spinner("Processing data..."): |
|
df = load_and_process_data(news_file, trend_file) |
|
check_class_imbalance(df) |
|
train_df, val_df, test_df = split_data(df) |
|
st.write(f"**Training stocks:** {len(train_df['Stock'].unique())}") |
|
st.write(f"**Validation stocks:** {len(val_df['Stock'].unique())}") |
|
st.write(f"**Test stocks:** {len(test_df['Stock'].unique())}") |
|
|
|
tokenizer = AutoTokenizer.from_pretrained("yiyanghkust/finbert-tone") |
|
tokenized_train, tokenized_val, tokenized_test = tokenize_datasets(train_df, val_df, test_df, tokenizer) |
|
|
|
model = load_model() |
|
|
|
with st.spinner("Training model..."): |
|
trainer = train_model(tokenized_train, tokenized_val, model) |
|
|
|
st.success("Model training completed!") |
|
|
|
|
|
original_pipe = pipeline("text-classification", model="yiyanghkust/finbert-tone") |
|
st.write("Evaluating original model...") |
|
original_accuracy = evaluate_model(original_pipe, test_df, model_name="Original Model") |
|
|
|
|
|
fine_tuned_pipe = pipeline("text-classification", model="./fine_tuned_model") |
|
st.write("Evaluating fine-tuned model...") |
|
fine_tuned_accuracy = evaluate_model(fine_tuned_pipe, test_df, model_name="Fine-tuned Model") |
|
|
|
st.write(f"**Comparison:**") |
|
st.write(f"Original Model Accuracy: {original_accuracy:.2%}") |
|
st.write(f"Fine-tuned Model Accuracy: {fine_tuned_accuracy:.2%}") |
|
else: |
|
st.warning("Please upload both CSV files to proceed.") |