import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error
import joblib
import gradio as gr
import optuna
import matplotlib.pyplot as plt
from apscheduler.schedulers.background import BackgroundScheduler
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torch.nn import TransformerEncoder, TransformerEncoderLayer
# Load the option-chain history, standardize the numeric columns, and persist the
# fitted scaler so the same transform can be reused on fresh data.
data = pd.read_csv('BANKNIFTY_OPTION_CHAIN_data.csv')
scaler = StandardScaler()
data[['open', 'high', 'low', 'close', 'volume', 'oi']] = scaler.fit_transform(
    data[['open', 'high', 'low', 'close', 'volume', 'oi']])
joblib.dump(scaler, 'scaler.gz')
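# Illustrative sketch (not part of the original flow): a separate inference process
# would restore the fitted scaler like this before transforming new rows.
#     scaler = joblib.load('scaler.gz')
#     live_scaled = scaler.transform(live_df[['open', 'high', 'low', 'close', 'volume', 'oi']])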
class BankNiftyDataset(Dataset):
    """Sliding-window dataset over the option-chain rows for one expiry type."""

    def __init__(self, data, seq_len, expiry_type, target_cols=None):
        self.data = data
        self.seq_len = seq_len
        self.expiry_type = expiry_type
        self.target_cols = target_cols if target_cols is not None else ['close']
        if self.expiry_type == "weekly":
            self.filtered_data = data[data['Expiry'].str.contains("W")]
        elif self.expiry_type == "monthly":
            self.filtered_data = data[~data['Expiry'].str.contains("W")]
        else:
            raise ValueError(f"Unknown expiry_type: {expiry_type!r}")

    def __len__(self):
        return len(self.filtered_data) - self.seq_len

    def __getitem__(self, idx):
        # A window of seq_len rows; the label comes from the window's last row.
        seq_data = self.filtered_data.iloc[idx:idx + self.seq_len]
        features = torch.tensor(
            seq_data[['open', 'high', 'low', 'close', 'volume', 'oi']].values,
            dtype=torch.float32)
        label = torch.tensor(seq_data[self.target_cols].iloc[-1].values, dtype=torch.float32)
        return features, label
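# Quick shape check (illustrative; assumes the CSV has an 'Expiry' column whose
# weekly codes contain "W"): one sample pairs a (seq_len, 6) feature window with
# a len(target_cols) label vector.
#     _ds = BankNiftyDataset(data, seq_len=20, expiry_type="weekly", target_cols=['close'])
#     _x, _y = _ds[0]   # _x.shape == torch.Size([20, 6]), _y.shape == torch.Size([1])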
class AdvancedModel(nn.Module):
    """Ensemble of LSTM, GRU, and Transformer encoders over the same input sequence."""

    def __init__(self, input_dim, hidden_dim, output_dim, num_layers=2, nhead=4, dropout=0.1):
        super().__init__()
        # PyTorch only applies inter-layer RNN dropout when num_layers > 1.
        rnn_dropout = dropout if num_layers > 1 else 0.0
        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers=num_layers, batch_first=True, dropout=rnn_dropout)
        self.gru = nn.GRU(input_dim, hidden_dim, num_layers=num_layers, batch_first=True, dropout=rnn_dropout)
        # nhead must evenly divide d_model (= input_dim) for multi-head attention.
        encoder_layers = TransformerEncoderLayer(d_model=input_dim, nhead=nhead, dim_feedforward=hidden_dim, dropout=dropout)
        self.transformer = TransformerEncoder(encoder_layers, num_layers=num_layers)
        # The transformer branch keeps width input_dim, so the concatenated
        # representation is 2 * hidden_dim + input_dim wide (not hidden_dim * 3).
        self.fc = nn.Sequential(
            nn.Linear(hidden_dim * 2 + input_dim, hidden_dim),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim, output_dim))

    def forward(self, x):
        lstm_out, _ = self.lstm(x)
        gru_out, _ = self.gru(x)
        # TransformerEncoder expects (seq_len, batch, d_model) by default.
        transformer_out = self.transformer(x.transpose(0, 1)).transpose(0, 1)
        # Concatenate the final time step of each branch.
        combined = torch.cat((lstm_out[:, -1, :], gru_out[:, -1, :], transformer_out[:, -1, :]), dim=1)
        return self.fc(combined)
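# Sanity check (illustrative): a dummy batch confirms the branch widths line up
# with the head's expected 2 * hidden_dim + input_dim input features.
#     _m = AdvancedModel(input_dim=6, hidden_dim=64, output_dim=1, nhead=2)
#     _out = _m(torch.randn(8, 20, 6))   # -> torch.Size([8, 1])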
def objective(trial):
    input_dim = 6
    hidden_dim = trial.suggest_int("hidden_dim", 64, 256)
    output_dim = len(target_cols)
    num_layers = trial.suggest_int("num_layers", 1, 4)
    # nhead must divide d_model (= input_dim = 6), so sample from its divisors.
    nhead = trial.suggest_categorical("nhead", [2, 3, 6])
    dropout = trial.suggest_float("dropout", 0.1, 0.5)
    lr = trial.suggest_float("lr", 1e-5, 1e-2, log=True)  # suggest_loguniform is deprecated
    model = AdvancedModel(input_dim, hidden_dim, output_dim, num_layers, nhead, dropout)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    criterion = nn.MSELoss()
    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
    for epoch in range(10):  # reduced epoch count to keep each trial fast
        train_model(model, optimizer, criterion, train_loader)
    val_loss = evaluate_model(model, criterion, val_loader)
    return val_loss
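# Optional refinement (a sketch, assuming Optuna's default pruner is acceptable):
# reporting the per-epoch loss inside the trial loop lets Optuna cut unpromising
# trials short.
#     trial.report(val_loss, epoch)
#     if trial.should_prune():
#         raise optuna.TrialPruned()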
def train_model(model, optimizer, criterion, train_loader):
    """Run one epoch of gradient updates."""
    model.train()
    for features, label in train_loader:
        optimizer.zero_grad()
        output = model(features)
        loss = criterion(output, label)
        loss.backward()
        optimizer.step()

def evaluate_model(model, criterion, val_loader):
    """Return the mean validation loss over all batches."""
    model.eval()
    total_loss = 0.0
    with torch.no_grad():
        for features, label in val_loader:
            output = model(features)
            total_loss += criterion(output, label).item()
    return total_loss / len(val_loader)
def generate_strategy(model, expiry_type):
    """Predict the target columns for every window of the chosen expiry type.

    Outputs are in the standardized (scaled) space, matching the inputs.
    """
    model.eval()
    dataset = BankNiftyDataset(data, seq_len, expiry_type, target_cols)
    loader = DataLoader(dataset, batch_size=1, shuffle=False)
    predictions = []
    with torch.no_grad():
        for features, _ in loader:
            output = model(features)
            predictions.append(output.squeeze().tolist())
    return predictions
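# The predictions above stay in standardized units. A minimal sketch for mapping a
# single target column back to its original scale (helper name is hypothetical):
def unscale_column(values, fitted_scaler, col='close',
                   cols=('open', 'high', 'low', 'close', 'volume', 'oi')):
    # StandardScaler stores per-column mean_/scale_, so one column can be
    # inverted directly without rebuilding the full 6-column matrix.
    i = cols.index(col)
    return np.asarray(values) * fitted_scaler.scale_[i] + fitted_scaler.mean_[i]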
def retrain_model():
    """Reload the CSV, fine-tune the global model, and checkpoint the result."""
    new_data = pd.read_csv('BANKNIFTY_OPTION_CHAIN_data.csv')
    new_data[['open', 'high', 'low', 'close', 'volume', 'oi']] = scaler.transform(
        new_data[['open', 'high', 'low', 'close', 'volume', 'oi']])
    # Caveat: a random split shuffles time order, so validation rows can predate
    # training rows; a chronological split would avoid look-ahead leakage.
    new_train_data, new_val_data = train_test_split(new_data, test_size=0.2, random_state=42)
    new_train_dataset = BankNiftyDataset(new_train_data, seq_len, "weekly", target_cols)
    new_val_dataset = BankNiftyDataset(new_val_data, seq_len, "weekly", target_cols)
    new_train_loader = DataLoader(new_train_dataset, batch_size=32, shuffle=True)
    new_val_loader = DataLoader(new_val_dataset, batch_size=32, shuffle=False)
    train_model(model, optimizer, criterion, new_train_loader)
    val_loss = evaluate_model(model, criterion, new_val_loader)
    print(f'Validation Loss after retraining: {val_loss:.4f}')
    torch.save(model.state_dict(), 'retrained_model.pth')
def plot_predictions(predictions, actual_values, title):
    # Return a Figure object rather than the plt module, so each call produces
    # an independent plot that Gradio can render safely.
    fig, ax = plt.subplots(figsize=(12, 6))
    ax.plot(predictions, label='Predictions')
    ax.plot(actual_values, label='Actual Values')
    ax.set_title(title)
    ax.set_xlabel('Time')
    ax.set_ylabel('Value (scaled)')
    ax.legend()
    return fig
def display_strategies():
    weekly_predictions = generate_strategy(model, "weekly")
    monthly_predictions = generate_strategy(model, "monthly")
    # Align the actuals with the prediction windows (both are in scaled units).
    weekly_actual = data[data['Expiry'].str.contains("W")][target_cols].values[-len(weekly_predictions):]
    monthly_actual = data[~data['Expiry'].str.contains("W")][target_cols].values[-len(monthly_predictions):]
    weekly_plot = plot_predictions(weekly_predictions, weekly_actual, "Weekly Expiry Predictions vs Actual")
    monthly_plot = plot_predictions(monthly_predictions, monthly_actual, "Monthly Expiry Predictions vs Actual")
    weekly_mse = mean_squared_error(weekly_actual, weekly_predictions)
    monthly_mse = mean_squared_error(monthly_actual, monthly_predictions)
    return (
        f"Weekly Expiry Strategy Predictions (MSE: {weekly_mse:.4f}):\n{weekly_predictions}\n\n"
        f"Monthly Expiry Strategy Predictions (MSE: {monthly_mse:.4f}):\n{monthly_predictions}",
        weekly_plot,
        monthly_plot,
    )
# Hyperparameter optimization
target_cols = ['close', 'volume', 'oi']  # predict multiple targets jointly
seq_len = 20  # window length in rows
train_data, val_data = train_test_split(data, test_size=0.2, random_state=42)
train_dataset = BankNiftyDataset(train_data, seq_len, "weekly", target_cols)
val_dataset = BankNiftyDataset(val_data, seq_len, "weekly", target_cols)
study = optuna.create_study(direction="minimize")
study.optimize(objective, n_trials=50)
best_params = study.best_params
print("Best hyperparameters:", best_params)
# Initialize the model with the best parameters
input_dim = 6
output_dim = len(target_cols)
model = AdvancedModel(input_dim, best_params['hidden_dim'], output_dim,
                      best_params['num_layers'], best_params['nhead'], best_params['dropout'])
optimizer = optim.Adam(model.parameters(), lr=best_params['lr'])
criterion = nn.MSELoss()
# Learning rate scheduler (named lr_scheduler so it is not shadowed by the
# background retraining scheduler below)
lr_scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=5)
# Training loop
num_epochs = 100
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
for epoch in range(num_epochs):
    train_model(model, optimizer, criterion, train_loader)
    val_loss = evaluate_model(model, criterion, val_loader)
    lr_scheduler.step(val_loss)
    print(f"Epoch {epoch+1}/{num_epochs}, Validation Loss: {val_loss:.4f}")
# Save the final model
torch.save(model.state_dict(), 'final_model.pth')
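# To serve the saved weights elsewhere (illustrative), rebuild the architecture
# with the same best_params and load the state dict:
#     _served = AdvancedModel(input_dim, best_params['hidden_dim'], output_dim,
#                             best_params['num_layers'], best_params['nhead'],
#                             best_params['dropout'])
#     _served.load_state_dict(torch.load('final_model.pth'))
#     _served.eval()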
# Background scheduler for automatic hourly retraining
retrain_scheduler = BackgroundScheduler()
retrain_scheduler.add_job(retrain_model, 'interval', hours=1)
retrain_scheduler.start()
# Gradio interface
iface = gr.Interface(
    fn=display_strategies,
    inputs=None,
    outputs=[
        gr.Textbox(label="Strategy Predictions"),
        gr.Plot(label="Weekly Expiry Predictions"),
        gr.Plot(label="Monthly Expiry Predictions"),
    ],
    title="Advanced BankNifty Option Chain Strategy Generator",
    description="This model predicts close price, volume, and open interest for weekly and monthly expiries.",
)
iface.launch()