import time
import threading

import joblib
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import TimeSeriesSplit
from sklearn.preprocessing import StandardScaler
import gradio as gr
# Load and preprocess data (re-read on every retrain)
def load_data():
    # Load the latest CSV data (assumed to be updated periodically)
    data = pd.read_csv('BANKNIFTY_OPTION_CHAIN_data.csv')
    # Feature engineering: moving averages and a 14-period rolling RSI
    data['SMA_20'] = data['close'].rolling(window=20).mean()
    data['SMA_50'] = data['close'].rolling(window=50).mean()
    delta = data['close'].diff()
    gain = delta.clip(lower=0).rolling(window=14).mean()
    loss = (-delta.clip(upper=0)).rolling(window=14).mean()
    data['RSI'] = 100 - (100 / (1 + gain / loss))
    data.fillna(0, inplace=True)
    return data

# Feature columns shared by training and inference
features = ['open', 'high', 'low', 'close', 'volume', 'oi', 'SMA_20', 'SMA_50', 'RSI']
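# Sanity check for the expected CSV layout (a sketch; the exact export format
# of BANKNIFTY_OPTION_CHAIN_data.csv is an assumption based on the columns
# used in this script).
REQUIRED_COLUMNS = ['open', 'high', 'low', 'close', 'volume', 'oi']

def validate_columns(df):
    # Fail fast if the option-chain export is missing a raw column
    missing = [c for c in REQUIRED_COLUMNS if c not in df.columns]
    if missing:
        raise ValueError(f"CSV missing required columns: {missing}")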
# Dataset of sliding windows: each item is a seq_len window of feature rows
# and the next bar's close as the label
class BankNiftyDataset(Dataset):
    def __init__(self, data, seq_len, features):
        self.data = data
        self.seq_len = seq_len
        self.features = features

    def __len__(self):
        return len(self.data) - self.seq_len

    def __getitem__(self, idx):
        seq_data = self.data.iloc[idx:idx + self.seq_len][self.features].values
        label = self.data['close'].iloc[idx + self.seq_len]
        return {
            'features': torch.tensor(seq_data, dtype=torch.float32),
            'label': torch.tensor(label, dtype=torch.float32)
        }
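# Quick shape check for the windowed dataset (illustrative only; assumes the
# CSV above exists). With seq_len=10 and the 9 feature columns, each sample
# is a (10, 9) float tensor plus a scalar label.
def _demo_dataset_shapes():
    ds = BankNiftyDataset(load_data(), seq_len=10, features=features)
    sample = ds[0]
    print(sample['features'].shape)  # torch.Size([10, 9])
    print(sample['label'].shape)     # torch.Size([])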
# Hybrid model: LSTM encoder followed by a transformer encoder
class TransformerLSTMModel(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, nhead=4, num_encoder_layers=2):
        super(TransformerLSTMModel, self).__init__()
        self.hidden_dim = hidden_dim
        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers=1, batch_first=True)
        # batch_first=True matches the (batch, seq, feature) layout of the LSTM output
        self.transformer_encoder = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(d_model=hidden_dim, nhead=nhead, batch_first=True),
            num_layers=num_encoder_layers
        )
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        # Initial LSTM state sized from the configured hidden dimension
        h0 = torch.zeros(1, x.size(0), self.hidden_dim).to(x.device)
        c0 = torch.zeros(1, x.size(0), self.hidden_dim).to(x.device)
        out, _ = self.lstm(x, (h0, c0))
        out = self.transformer_encoder(out)
        out = self.fc(out[:, -1, :])  # Predict from the final timestep
        return out
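# Forward-pass sanity check with random data (illustrative only): a batch of
# 4 windows of 10 timesteps x 9 features should map to a (4, 1) prediction.
def _demo_model_forward():
    model = TransformerLSTMModel(input_dim=9, hidden_dim=128, output_dim=1)
    dummy = torch.randn(4, 10, 9)  # (batch, seq_len, input_dim)
    print(model(dummy).shape)      # torch.Size([4, 1])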
# Train the model with walk-forward validation and persist the best weights
def retrain_model(data, seq_len=10, batch_size=32, n_splits=5):
    # Fit and persist the scaler so generate_strategy() applies the same
    # transform at inference time. Note: fitting on the full series leaks
    # future statistics into earlier folds; kept here for simplicity.
    scaler = StandardScaler()
    data = data.copy()
    data[features] = scaler.fit_transform(data[features])
    joblib.dump(scaler, 'scaler.pkl')

    input_dim = len(features)
    model = TransformerLSTMModel(input_dim=input_dim, hidden_dim=128, output_dim=1)
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.MSELoss()
    tscv = TimeSeriesSplit(n_splits=n_splits)
    best_loss = float('inf')

    for fold, (train_idx, val_idx) in enumerate(tscv.split(data)):
        train_data, val_data = data.iloc[train_idx], data.iloc[val_idx]
        train_dataset = BankNiftyDataset(train_data, seq_len, features)
        val_dataset = BankNiftyDataset(val_data, seq_len, features)
        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
        val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

        for epoch in range(10):  # Train for 10 epochs per fold
            model.train()
            for batch in train_loader:
                # Named `inputs` so the module-level `features` list stays visible
                inputs = batch['features']
                labels = batch['label'].unsqueeze(1)
                optimizer.zero_grad()
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()

            # Validation
            model.eval()
            val_loss = 0
            with torch.no_grad():
                for batch in val_loader:
                    inputs = batch['features']
                    labels = batch['label'].unsqueeze(1)
                    outputs = model(inputs)
                    val_loss += criterion(outputs, labels).item()
            val_loss /= len(val_loader)
            print(f'Fold {fold + 1}, Epoch {epoch + 1}, Val Loss: {val_loss:.6f}')

            # Save the best model seen across folds and epochs
            if val_loss < best_loss:
                best_loss = val_loss
                torch.save(model.state_dict(), 'best_model.pth')
                print("Model updated with new best performance.")
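# One-off retrain without the 24-hour scheduler (illustrative usage):
#   retrain_model(load_data(), seq_len=10, batch_size=32, n_splits=5)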
# Periodically check for new data and retrain
def schedule_retraining(interval_hours=24):
    while True:
        print("Retraining model...")
        data = load_data()  # Load the latest data
        retrain_model(data)  # Retrain the model
        print(f"Next retraining scheduled in {interval_hours} hours.")
        time.sleep(interval_hours * 3600)  # Sleep for the specified interval
# Gradio callback: scale one row of user inputs with the persisted scaler and
# predict the next close with the best saved model
def generate_strategy(open_, high, low, close, volume, oi, sma20, sma50, rsi):
    # Prepare new data
    new_data = pd.DataFrame({
        'open': [open_], 'high': [high], 'low': [low], 'close': [close],
        'volume': [volume], 'oi': [oi], 'SMA_20': [sma20], 'SMA_50': [sma50], 'RSI': [rsi]
    })
    scaler = joblib.load('scaler.pkl')  # Fitted and saved by retrain_model()
    new_data[features] = scaler.transform(new_data[features])
    seq_data = new_data[features].values

    # Load best model
    model = TransformerLSTMModel(input_dim=len(features), hidden_dim=128, output_dim=1)
    model.load_state_dict(torch.load('best_model.pth'))
    model.eval()

    # Make prediction on a (batch=1, seq=1, n_features) tensor. The model was
    # trained on seq_len=10 windows, so a single-bar input is a rough proxy.
    with torch.no_grad():
        seq_tensor = torch.tensor(seq_data, dtype=torch.float32).unsqueeze(0)
        output = model(seq_tensor)

    # Undo the scaling on the 'close' column to report a price-level value
    close_idx = features.index('close')
    pred = output.item() * scaler.scale_[close_idx] + scaler.mean_[close_idx]
    return f"Predicted next close: {pred:.2f}"
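# Illustrative call (the input values are made up; 'best_model.pth' and
# 'scaler.pkl' must exist from a completed retrain):
#   generate_strategy(48000, 48200, 47900, 48100, 1.2e6, 3.4e6,
#                     47950.0, 47500.0, 55.0)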
# Gradio interface for real-time predictions
inputs = [
    gr.Number(label="Open Price"),
    gr.Number(label="High Price"),
    gr.Number(label="Low Price"),
    gr.Number(label="Close Price"),
    gr.Number(label="Volume"),
    gr.Number(label="Open Interest"),
    gr.Number(label="SMA_20"),
    gr.Number(label="SMA_50"),
    gr.Number(label="RSI")
]
outputs = gr.Textbox(label="Predicted Strategy")
# Launch the Gradio interface for strategy prediction; launch() blocks, so the
# retraining loop runs in a background daemon thread rather than sequentially
if __name__ == "__main__":
    retrain_thread = threading.Thread(
        target=schedule_retraining, kwargs={'interval_hours': 24}, daemon=True
    )
    retrain_thread.start()  # Retrain every 24 hours
    gr.Interface(fn=generate_strategy, inputs=inputs, outputs=outputs,
                 title="BankNifty Strategy Generator").launch()