import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import joblib
import gradio as gr
from apscheduler.schedulers.background import BackgroundScheduler
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torch.nn import TransformerEncoder, TransformerEncoderLayer
import optuna
from sklearn.metrics import mean_squared_error
import matplotlib.pyplot as plt
import seaborn as sns
import logging
# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Load and preprocess data
try:
    data = pd.read_csv('BANKNIFTY_OPTION_CHAIN_data.csv')
    scaler = StandardScaler()
    scaled_data = scaler.fit_transform(data[['open', 'high', 'low', 'close', 'volume', 'oi']])
    data[['open', 'high', 'low', 'close', 'volume', 'oi']] = scaled_data
    joblib.dump(scaler, 'scaler.gz')
    logging.info(f"Data loaded and preprocessed. Total data points: {len(data)}")
except Exception as e:
    logging.error(f"Error in data loading and preprocessing: {str(e)}")
    raise
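
# The code assumes the CSV provides at least the numeric columns 'open',
# 'high', 'low', 'close', 'volume', 'oi' (standardized above) plus an
# 'Expiry' string column in which weekly contracts contain a "W" marker.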
class BankNiftyDataset(Dataset):
    def __init__(self, data, seq_len, expiry_type, target_cols=None):
        self.data = data
        self.seq_len = seq_len
        self.expiry_type = expiry_type
        # Use a None sentinel instead of a mutable default argument.
        self.target_cols = target_cols if target_cols is not None else ['close']
        if self.expiry_type == "weekly":
            self.filtered_data = data[data['Expiry'].str.contains("W")]
        elif self.expiry_type == "monthly":
            self.filtered_data = data[~data['Expiry'].str.contains("W")]
        else:
            self.filtered_data = data
        if len(self.filtered_data) < self.seq_len:
            raise ValueError(f"Not enough data points for the specified sequence length. "
                             f"Got {len(self.filtered_data)} data points, need at least {self.seq_len}.")
        logging.info(f"{expiry_type.capitalize()} dataset created with {len(self.filtered_data)} data points")

    def __len__(self):
        return max(0, len(self.filtered_data) - self.seq_len + 1)

    def __getitem__(self, idx):
        if idx < 0 or idx >= len(self):
            raise IndexError("Index out of range")
        seq_data = self.filtered_data.iloc[idx:idx + self.seq_len]
        features = torch.tensor(seq_data[['open', 'high', 'low', 'close', 'volume', 'oi']].values, dtype=torch.float32)
        label = torch.tensor(seq_data[self.target_cols].iloc[-1].values, dtype=torch.float32)
        return features, label
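
# Illustrative usage (assuming the scaled `data` frame from above):
#   ds = BankNiftyDataset(data, seq_len=20, expiry_type="weekly",
#                         target_cols=['close', 'volume', 'oi'])
#   x, y = ds[0]  # x: (20, 6) float32 window, y: (3,) float32 target row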
class AdvancedModel(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, num_layers=2, nhead=4, dropout=0.1):
        super(AdvancedModel, self).__init__()
        # nn.LSTM/nn.GRU only apply dropout between stacked layers; zero it
        # for a single layer to avoid the PyTorch warning.
        rnn_dropout = dropout if num_layers > 1 else 0.0
        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers=num_layers, batch_first=True, dropout=rnn_dropout)
        self.gru = nn.GRU(input_dim, hidden_dim, num_layers=num_layers, batch_first=True, dropout=rnn_dropout)
        # d_model must be a positive multiple of nhead; round *up* so a large
        # nhead (e.g. 8 with input_dim=6) cannot collapse it to zero.
        transformer_dim = ((input_dim + nhead - 1) // nhead) * nhead
        self.input_proj = nn.Linear(input_dim, transformer_dim) if input_dim != transformer_dim else nn.Identity()
        encoder_layers = TransformerEncoderLayer(d_model=transformer_dim, nhead=nhead, dim_feedforward=hidden_dim, dropout=dropout)
        self.transformer = TransformerEncoder(encoder_layers, num_layers=num_layers)
        # The head consumes the last LSTM state, the last GRU state, and the
        # last transformer position: 2 * hidden_dim + transformer_dim features.
        self.fc = nn.Sequential(
            nn.Linear(hidden_dim * 2 + transformer_dim, hidden_dim),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim, output_dim)
        )

    def forward(self, x):
        lstm_out, _ = self.lstm(x)
        gru_out, _ = self.gru(x)
        transformer_input = self.input_proj(x)
        # TransformerEncoder expects (seq, batch, dim) without batch_first.
        transformer_out = self.transformer(transformer_input.transpose(0, 1)).transpose(0, 1)
        combined = torch.cat((lstm_out[:, -1, :], gru_out[:, -1, :], transformer_out[:, -1, :]), dim=1)
        return self.fc(combined)
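
# Shape sketch for a batch of B windows of length T:
#   x: (B, T, 6)
#   lstm_out[:, -1, :]        -> (B, hidden_dim)
#   gru_out[:, -1, :]         -> (B, hidden_dim)
#   transformer_out[:, -1, :] -> (B, transformer_dim)
#   combined: (B, 2*hidden_dim + transformer_dim) -> fc -> (B, output_dim)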
def objective(trial):
    try:
        input_dim = 6
        hidden_dim = trial.suggest_int("hidden_dim", 64, 256)
        output_dim = len(target_cols)
        num_layers = trial.suggest_int("num_layers", 1, 4)
        # hidden_dim is used as-is; only the transformer's d_model must be
        # divisible by nhead, which AdvancedModel guarantees internally.
        nhead = trial.suggest_int("nhead", 2, 8)
        dropout = trial.suggest_float("dropout", 0.1, 0.5)
        # suggest_loguniform is deprecated; suggest_float(..., log=True) is
        # the current Optuna API for log-scale search.
        lr = trial.suggest_float("lr", 1e-5, 1e-2, log=True)
        model = AdvancedModel(input_dim, hidden_dim, output_dim, num_layers, nhead, dropout)
        optimizer = optim.Adam(model.parameters(), lr=lr)
        criterion = nn.MSELoss()
        train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
        val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
        for epoch in range(10):
            train_model(model, optimizer, criterion, train_loader)
            val_loss = evaluate_model(model, criterion, val_loader)
        return val_loss
    except Exception as e:
        logging.error(f"Error in objective function: {str(e)}")
        return float('inf')
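
# Returning float('inf') from a failed trial lets the Optuna study move on to
# the next hyperparameter combination instead of aborting the whole search.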
def train_model(model, optimizer, criterion, train_loader):
    model.train()
    for features, label in train_loader:
        optimizer.zero_grad()
        output = model(features)
        loss = criterion(output, label)
        loss.backward()
        optimizer.step()

def evaluate_model(model, criterion, val_loader):
    model.eval()
    total_loss = 0
    with torch.no_grad():
        for features, label in val_loader:
            output = model(features)
            loss = criterion(output, label)
            total_loss += loss.item()
    return total_loss / len(val_loader)
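
# Note: this is a mean of per-batch losses, so a smaller final batch carries
# slightly more weight than the full-size batches.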
def generate_strategy(model, expiry_type):
    model.eval()
    dataset = BankNiftyDataset(data, seq_len, expiry_type, target_cols)
    loader = DataLoader(dataset, batch_size=1, shuffle=False)
    predictions = []
    with torch.no_grad():
        for features, _ in loader:
            output = model(features)
            predictions.append(output.squeeze().tolist())
    return predictions
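
# Predictions are in the standardized units produced by StandardScaler; the
# fitted scaler (saved as 'scaler.gz') would be needed to map them back to
# raw prices, volumes, and open interest, which this app does not do.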
def retrain_model():
    try:
        new_data = pd.read_csv('BANKNIFTY_OPTION_CHAIN_data.csv')
        new_scaled_data = scaler.transform(new_data[['open', 'high', 'low', 'close', 'volume', 'oi']])
        new_data[['open', 'high', 'low', 'close', 'volume', 'oi']] = new_scaled_data
        new_train_data, new_val_data = train_test_split(new_data, test_size=0.2, random_state=42)
        new_train_dataset = BankNiftyDataset(new_train_data, seq_len, "weekly", target_cols)
        new_val_dataset = BankNiftyDataset(new_val_data, seq_len, "weekly", target_cols)
        new_train_loader = DataLoader(new_train_dataset, batch_size=32, shuffle=True)
        new_val_loader = DataLoader(new_val_dataset, batch_size=32, shuffle=False)
        train_model(model, optimizer, criterion, new_train_loader)
        val_loss = evaluate_model(model, criterion, new_val_loader)
        logging.info(f'Validation Loss after retraining: {val_loss:.4f}')
        torch.save(model.state_dict(), 'retrained_model.pth')
    except Exception as e:
        logging.error(f"Error in retraining model: {str(e)}")
def plot_predictions(predictions, actual_values, title):
    # Return an explicit Figure rather than the pyplot module: two sequential
    # calls returning `plt` would hand Gradio the same current figure twice.
    fig, ax = plt.subplots(figsize=(12, 6))
    ax.plot(predictions, label='Predictions')
    ax.plot(actual_values, label='Actual Values')
    ax.set_title(title)
    ax.set_xlabel('Time')
    ax.set_ylabel('Value')
    ax.legend()
    return fig
def display_strategies():
    try:
        weekly_predictions = generate_strategy(model, "weekly")
        monthly_predictions = generate_strategy(model, "monthly")
        weekly_actual = data[data['Expiry'].str.contains("W")][target_cols].values[-len(weekly_predictions):]
        monthly_actual = data[~data['Expiry'].str.contains("W")][target_cols].values[-len(monthly_predictions):]
        weekly_plot = plot_predictions(weekly_predictions, weekly_actual, "Weekly Expiry Predictions vs Actual")
        monthly_plot = plot_predictions(monthly_predictions, monthly_actual, "Monthly Expiry Predictions vs Actual")
        weekly_mse = mean_squared_error(weekly_actual, weekly_predictions)
        monthly_mse = mean_squared_error(monthly_actual, monthly_predictions)
        return (
            f"Weekly Expiry Strategy Predictions (MSE: {weekly_mse:.4f}):\n{weekly_predictions}\n\n"
            f"Monthly Expiry Strategy Predictions (MSE: {monthly_mse:.4f}):\n{monthly_predictions}",
            weekly_plot,
            monthly_plot
        )
    except Exception as e:
        logging.error(f"Error in displaying strategies: {str(e)}")
        return "An error occurred while generating strategies.", None, None
# Main execution
if __name__ == "__main__":
    try:
        target_cols = ['close', 'volume', 'oi']
        seq_len = 20
        logging.info(f"Total data points: {len(data)}")
        logging.info(f"Weekly data points: {len(data[data['Expiry'].str.contains('W')])}")
        logging.info(f"Monthly data points: {len(data[~data['Expiry'].str.contains('W')])}")
        train_data, val_data = train_test_split(data, test_size=0.2, random_state=42)
        logging.info(f"Train data points: {len(train_data)}")
        logging.info(f"Validation data points: {len(val_data)}")
        train_dataset = BankNiftyDataset(train_data, seq_len, "weekly", target_cols)
        val_dataset = BankNiftyDataset(val_data, seq_len, "weekly", target_cols)
        study = optuna.create_study(direction="minimize")
        study.optimize(objective, n_trials=50)
        best_params = study.best_params
        logging.info(f"Best hyperparameters: {best_params}")
        input_dim = 6
        output_dim = len(target_cols)
        # objective() uses the suggested values unmodified, so best_params
        # reproduces the best trial's model exactly.
        model = AdvancedModel(input_dim, best_params['hidden_dim'], output_dim,
                              best_params['num_layers'], best_params['nhead'], best_params['dropout'])
        optimizer = optim.Adam(model.parameters(), lr=best_params['lr'])
        criterion = nn.MSELoss()
        # The `verbose` flag is deprecated in recent PyTorch releases; the
        # per-epoch logging below already reports the validation loss.
        scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=5)
        num_epochs = 100
        train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
        val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
        for epoch in range(num_epochs):
            train_model(model, optimizer, criterion, train_loader)
            val_loss = evaluate_model(model, criterion, val_loader)
            scheduler.step(val_loss)
            logging.info(f"Epoch {epoch + 1}/{num_epochs}, Validation Loss: {val_loss:.4f}")
        torch.save(model.state_dict(), 'final_model.pth')
        # Retrain hourly in a background thread while the Gradio app serves.
        retraining_scheduler = BackgroundScheduler()
        retraining_scheduler.add_job(retrain_model, 'interval', hours=1)
        retraining_scheduler.start()
        iface = gr.Interface(
            fn=display_strategies,
            inputs=None,
            outputs=[
                gr.Textbox(label="Strategy Predictions"),
                gr.Plot(label="Weekly Expiry Predictions"),
                gr.Plot(label="Monthly Expiry Predictions")
            ],
            title="Advanced BankNifty Option Chain Strategy Generator",
            description="This model predicts close price, volume, and open interest for weekly and monthly expiries."
        )
        iface.launch()
    except Exception as e:
        logging.error(f"Error in main execution: {str(e)}")