# -*- coding: utf-8 -*-
"""H2H model.ipynb

Automatically generated by Colab.

Original file is located at
    https://colab.research.google.com/drive/1uxbLGJ4l9i0bdWy43Oz4rgsyTdA5FTSd
"""

!pip install yfinance
# Data and computation
import pandas as pd
import numpy as np

# Plotting
import matplotlib.pyplot as plt
import seaborn as sns
sns.set_style('whitegrid')
plt.style.use("fivethirtyeight")

# Yahoo Finance data import
import yfinance as yf

# Machine Learning
from sklearn.model_selection import train_test_split, GridSearchCV, TimeSeriesSplit
from sklearn.tree import DecisionTreeClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix, roc_curve, auc
from sklearn.preprocessing import StandardScaler, RobustScaler
from sklearn.pipeline import Pipeline
import scipy.stats as stats

# Misc
import warnings
warnings.filterwarnings('ignore')
# Select stocks (NIFTY 50 constituents) and date range
symbols = ['ADANIENT.NS', 'ADANIPORTS.NS', 'APOLLOHOSP.NS', 'ASIANPAINT.NS',
           'AXISBANK.NS', 'BAJAJ-AUTO.NS', 'BAJFINANCE.NS', 'BAJAJFINSV.NS', 'BEL.NS',
           'BHARTIARTL.NS', 'CIPLA.NS', 'COALINDIA.NS', 'DRREDDY.NS', 'EICHERMOT.NS',
           'GRASIM.NS', 'HCLTECH.NS', 'HDFCBANK.NS', 'HDFCLIFE.NS', 'HEROMOTOCO.NS',
           'HINDALCO.NS', 'HINDUNILVR.NS', 'ICICIBANK.NS', 'INDUSINDBK.NS', 'INFY.NS',
           'ITC.NS', 'JIOFIN.NS', 'JSWSTEEL.NS', 'KOTAKBANK.NS', 'LT.NS', 'M&M.NS',
           'MARUTI.NS', 'NESTLEIND.NS', 'NTPC.NS', 'ONGC.NS', 'POWERGRID.NS',
           'RELIANCE.NS', 'SBILIFE.NS', 'SHRIRAMFIN.NS', 'SBIN.NS', 'SUNPHARMA.NS',
           'TATACONSUM.NS', 'TCS.NS', 'TATAMOTORS.NS', 'TATASTEEL.NS', 'TECHM.NS',
           'TITAN.NS', 'TRENT.NS', 'ULTRACEMCO.NS', 'WIPRO.NS', 'ETERNAL.NS']

start_date = '2024-07-31'
end_date = '2025-07-31'
# Download daily OHLCV data for all stocks
raw_data = yf.download(symbols, start=start_date, end=end_date)

# Flatten MultiIndex columns: each column becomes a flat string
# like 'Close_RELIANCE.NS' or 'Volume_TCS.NS'
raw_data.columns = ['_'.join(col).strip() for col in raw_data.columns.values]

# Each stock is processed separately below and then stacked to long format
data = raw_data.copy()
data
# Helper functions
def SMA(series, period):
    """Simple moving average."""
    return series.rolling(window=period).mean()

def EMA(series, period):
    """Exponential moving average."""
    return series.ewm(span=period, adjust=False).mean()

def MACD(series, fast=12, slow=26, signal=9):
    """MACD line, signal line, and histogram."""
    ema_fast = EMA(series, fast)
    ema_slow = EMA(series, slow)
    macd = ema_fast - ema_slow
    macd_signal = EMA(macd, signal)
    macd_hist = macd - macd_signal
    return macd, macd_signal, macd_hist

def RSI(series, period=14):
    """Wilder-style RSI via exponentially weighted averages of gains/losses."""
    delta = series.diff()
    gain = (delta.where(delta > 0, 0)).ewm(alpha=1/period, min_periods=period).mean()
    loss = (-delta.where(delta < 0, 0)).ewm(alpha=1/period, min_periods=period).mean()
    RS = gain / loss
    return 100 - (100 / (1 + RS))
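
# A quick sanity check of the indicator helpers on a synthetic random walk
# (illustrative only -- the series below is made-up data, not market prices):
_rng = np.random.default_rng(0)
_demo = pd.Series(100 + _rng.normal(0, 1, 300).cumsum())
_macd, _sig, _hist = MACD(_demo)
assert np.allclose(_hist, _macd - _sig)               # histogram = MACD - signal
assert RSI(_demo, 14).dropna().between(0, 100).all()  # RSI bounded in [0, 100]
assert SMA(_demo, 20).isna().sum() == 19              # 19-row warm-up window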
def create_volatility_features(df):
    # Daily return, if not already present
    if 'return_1d' not in df.columns:
        df['return_1d'] = df['Close'].pct_change()
    # Realized volatility over several windows
    for period in [5, 10, 20, 30]:
        df[f'volatility_{period}d'] = df['return_1d'].rolling(period).std()
    # Volatility ratios (short-term vs. long-term)
    df['vol_ratio_5_20'] = df['volatility_5d'] / df['volatility_20d']
    df['vol_ratio_10_20'] = df['volatility_10d'] / df['volatility_20d']
    # Volatility rank: where current volatility sits in its recent range
    df['vol_rank_20'] = df['volatility_5d'].rolling(20).rank(pct=True)
    df['vol_rank_50'] = df['volatility_5d'].rolling(50).rank(pct=True)
    return df
def create_enhanced_lag_features(df):
    """Add lag features -- critical for time series."""
    # Price momentum lags
    for lag in [1, 2, 3, 5, 10]:
        df[f'return_lag_{lag}'] = df['return_1d'].shift(lag)
    # Technical indicator lags
    for lag in [1, 2, 3]:
        if 'RSI14' in df.columns:
            df[f'rsi_lag_{lag}'] = df['RSI14'].shift(lag)
        if 'MACD' in df.columns:
            df[f'macd_lag_{lag}'] = df['MACD'].shift(lag)
    # Volume lags (requires volume_ratio_20 to be computed first)
    if 'volume_ratio_20' in df.columns:
        for lag in [1, 2]:
            df[f'volume_ratio_lag_{lag}'] = df['volume_ratio_20'].shift(lag)
    return df
def create_volume_features(df):
    """Add volume features."""
    # Volume moving averages
    df['volume_sma_10'] = df['Volume'].rolling(10).mean()
    df['volume_sma_20'] = df['Volume'].rolling(20).mean()
    df['volume_sma_50'] = df['Volume'].rolling(50).mean()
    # Volume ratios
    df['volume_ratio_10'] = df['Volume'] / df['volume_sma_10']
    df['volume_ratio_20'] = df['Volume'] / df['volume_sma_20']
    df['volume_ratio_50'] = df['Volume'] / df['volume_sma_50']
    # Price-volume features
    df['price_volume'] = df['Close'] * df['Volume']
    df['pv_sma_5'] = df['price_volume'].rolling(5).mean()
    # Volume momentum
    df['volume_momentum_5'] = df['Volume'] / df['Volume'].shift(5)
    return df
def create_momentum_features(df):
    """Add momentum features."""
    # Price momentum
    for period in [3, 5, 10, 20]:
        df[f'momentum_{period}d'] = df['Close'] / df['Close'].shift(period) - 1
    # Rate of change. Note: this is algebraically identical to momentum above;
    # both columns are kept because both names appear in the feature list.
    for period in [5, 10]:
        df[f'roc_{period}d'] = (df['Close'] - df['Close'].shift(period)) / df['Close'].shift(period)
    return df
def create_position_features(df):
    """Add price position features."""
    # Price position within the recent high-low range (0 = at the low, 1 = at the high)
    for period in [10, 20, 50]:
        df[f'high_{period}d'] = df['High'].rolling(period).max()
        df[f'low_{period}d'] = df['Low'].rolling(period).min()
        df[f'price_position_{period}'] = (df['Close'] - df[f'low_{period}d']) / (df[f'high_{period}d'] - df[f'low_{period}d'])
    # Bollinger Band position (requires SMA20)
    if 'SMA20' in df.columns:
        bb_std = df['Close'].rolling(20).std()
        df['bb_upper'] = df['SMA20'] + (bb_std * 2)
        df['bb_lower'] = df['SMA20'] - (bb_std * 2)
        df['bb_position'] = (df['Close'] - df['bb_lower']) / (df['bb_upper'] - df['bb_lower'])
    return df
def create_rolling_stats(df):
    """Add rolling statistical features."""
    # Rolling statistics of returns
    for period in [5, 10]:
        df[f'return_mean_{period}d'] = df['return_1d'].rolling(period).mean()
        df[f'return_std_{period}d'] = df['return_1d'].rolling(period).std()
        df[f'return_skew_{period}d'] = df['return_1d'].rolling(period).skew()
        df[f'return_kurt_{period}d'] = df['return_1d'].rolling(period).kurt()
    # Rolling statistics of RSI
    if 'RSI14' in df.columns:
        df['rsi_mean_5d'] = df['RSI14'].rolling(5).mean()
        df['rsi_std_5d'] = df['RSI14'].rolling(5).std()
    return df
# Process each stock separately, then concatenate into one long-format frame for ML
all_ml_data = []
for ticker in symbols:
    df = pd.DataFrame({
        'Open': data[f'Open_{ticker}'],
        'High': data[f'High_{ticker}'],
        'Low': data[f'Low_{ticker}'],
        'Close': data[f'Close_{ticker}'],
        'Volume': data[f'Volume_{ticker}']
    })
    df['SMA20'] = SMA(df['Close'], 20)
    df['SMA50'] = SMA(df['Close'], 50)
    df['EMA20'] = EMA(df['Close'], 20)
    df['EMA50'] = EMA(df['Close'], 50)
    df['RSI14'] = RSI(df['Close'], 14)
    df['RSI20'] = RSI(df['Close'], 20)
    df['MACD'], df['MACD_signal'], df['MACD_hist'] = MACD(df['Close'])
    df = create_volatility_features(df)
    df = create_volume_features(df)       # volume ratios first, so their lags exist below
    df = create_enhanced_lag_features(df)
    df = create_momentum_features(df)
    df = create_position_features(df)
    df = create_rolling_stats(df)
    # Feature: SMA20 above SMA50 (bullish crossover)
    df['SMA_crossover'] = (df['SMA20'] > df['SMA50']).astype(int)
    # Feature: RSI oversold signal
    df['RSI_oversold'] = (df['RSI14'] < 30).astype(int)
    # Target: next-day up/down
    df['next_close'] = df['Close'].shift(-1)
    df['target'] = (df['next_close'] > df['Close']).astype(int)
    df['ticker'] = ticker
    # Drop rows with NaN (from indicator warm-up windows)
    df = df.dropna().copy()
    all_ml_data.append(df)

# Concatenate all stocks
ml_data = pd.concat(all_ml_data)
ml_data.reset_index(inplace=True)

ml_data
ml_data.columns
# Plot the closing price series for each ticker
for ticker in ml_data['ticker'].unique():
    plt.figure(figsize=(20, 12))
    plt.plot(
        ml_data[ml_data['ticker'] == ticker]['Date'],
        ml_data[ml_data['ticker'] == ticker]['Close'],
        label=f"{ticker}"
    )
    plt.title("Closing Price Over Time")
    plt.xlabel("Date")
    plt.ylabel("Close Price (INR)")
    plt.legend(loc='upper left')
    plt.show()
sample = ml_data[ml_data['ticker'] == 'RELIANCE.NS'].copy()

plt.figure(figsize=(14, 7))
plt.plot(sample['Date'], sample['Close'], label='Close')
plt.plot(sample['Date'], sample['SMA20'], label='SMA20')
plt.plot(sample['Date'], sample['SMA50'], label='SMA50')
plt.title('RELIANCE: Close with SMA20 & SMA50')
plt.legend()
plt.show()

plt.figure(figsize=(14, 4))
plt.plot(sample['Date'], sample['RSI14'], label='RSI14', color='green')
plt.axhline(70, linestyle='--', color='r')  # overbought threshold
plt.axhline(30, linestyle='--', color='b')  # oversold threshold
plt.title('RELIANCE: RSI14 Time Series')
plt.legend()
plt.show()

plt.figure(figsize=(14, 4))
plt.plot(sample['Date'], sample['MACD'], label='MACD')
plt.plot(sample['Date'], sample['MACD_signal'], label='MACD Signal')
plt.title('RELIANCE: MACD & Signal')
plt.legend()
plt.show()
# Select features. Note: 'next_close' is deliberately excluded -- it is the
# basis of the target, and using it as a feature would leak tomorrow's price
# into the model. Duplicate entries are also removed.
features = [
    'Close', 'Volume', 'SMA20', 'SMA50', 'EMA20', 'EMA50',
    'RSI14', 'MACD', 'MACD_signal', 'MACD_hist',
    'SMA_crossover', 'RSI_oversold',
    'return_1d', 'volatility_5d', 'volatility_10d', 'volatility_20d',
    'volatility_30d', 'vol_ratio_5_20', 'vol_ratio_10_20', 'vol_rank_20',
    'vol_rank_50', 'return_lag_1', 'return_lag_2', 'return_lag_3',
    'return_lag_5', 'return_lag_10', 'rsi_lag_1', 'macd_lag_1', 'rsi_lag_2',
    'macd_lag_2', 'rsi_lag_3', 'macd_lag_3', 'volume_sma_10',
    'volume_sma_20', 'volume_sma_50', 'volume_ratio_10', 'volume_ratio_20',
    'volume_ratio_50', 'price_volume', 'pv_sma_5', 'volume_momentum_5',
    'momentum_3d', 'momentum_5d', 'momentum_10d', 'momentum_20d', 'roc_5d',
    'roc_10d', 'high_10d', 'low_10d', 'price_position_10', 'high_20d',
    'low_20d', 'price_position_20', 'high_50d', 'low_50d',
    'price_position_50', 'bb_upper', 'bb_lower', 'bb_position'
]
target = 'target'

# Sort chronologically across all tickers so the time-series split below is valid
ml_data = ml_data.sort_values('Date').reset_index(drop=True)

X = ml_data[features]
y = ml_data[target]
sns.countplot(x='target', data=ml_data)
plt.title("Class Balance: Next-Day Up/Down Distribution")
plt.xlabel("0 = Down, 1 = Up")
plt.ylabel("Count")
plt.show()
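
# A simple reference point for the accuracy numbers reported below:
# the majority-class baseline that any useful model should beat.
baseline_acc = ml_data['target'].value_counts(normalize=True).max()
print(f"Majority-class baseline accuracy: {baseline_acc:.4f}")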
# Chronological split via TimeSeriesSplit; the last fold serves as the final
# train/test split (earliest dates for training, latest for testing)
tscv = TimeSeriesSplit(n_splits=5)
*_, (train_index, test_index) = tscv.split(X)

X_train_raw, X_test_raw = X.iloc[train_index], X.iloc[test_index]
y_train, y_test = y.iloc[train_index], y.iloc[test_index]

# Scale features (recommended for logistic regression). The scaler is fitted
# on the training fold only, so test-set statistics cannot leak into training.
scaler = RobustScaler()
X_train = scaler.fit_transform(X_train_raw)
X_test = scaler.transform(X_test_raw)
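
# Only the last fold is used above for the final evaluation. As a sketch of a
# fuller walk-forward check (an addition, not part of the original pipeline),
# a fresh scaler and model can be fitted and scored on every fold:
fold_scores = []
for tr_idx, te_idx in tscv.split(X):
    sc = RobustScaler().fit(X.iloc[tr_idx])
    clf = LogisticRegression(random_state=42, max_iter=1000)
    clf.fit(sc.transform(X.iloc[tr_idx]), y.iloc[tr_idx])
    fold_scores.append(clf.score(sc.transform(X.iloc[te_idx]), y.iloc[te_idx]))
print("Walk-forward accuracy per fold:", np.round(fold_scores, 4))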
# Hyperparameter tuning for the Decision Tree
param_grid = {
    'max_depth': [5, 7, 10, 15],
    'min_samples_split': [2, 5, 10],
    'min_samples_leaf': [1, 2, 4],
    'criterion': ['gini', 'entropy']
}

# Decision Tree Classifier (time-series-aware CV inside the grid search)
dt_model = GridSearchCV(DecisionTreeClassifier(random_state=42), param_grid, cv=tscv, n_jobs=-1)
dt_model.fit(X_train, y_train)
dt_preds = dt_model.predict(X_test)

# Logistic Regression
lr_model = LogisticRegression(random_state=42, max_iter=1000)
lr_model.fit(X_train, y_train)
lr_preds = lr_model.predict(X_test)
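
# `Pipeline` is imported above but not otherwise used. A minimal sketch of how
# scaling and the classifier could be bundled, so that any cross-validation
# refits the scaler inside each fold (equivalent in spirit to the manual
# scaling above, shown here as an alternative):
lr_pipe = Pipeline([
    ('scale', RobustScaler()),
    ('clf', LogisticRegression(random_state=42, max_iter=1000)),
])
lr_pipe.fit(X_train_raw, y_train)
print("Pipeline test accuracy:", lr_pipe.score(X_test_raw, y_test))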
# Performance
def print_metrics(model_name, y_true, y_pred):
    print(f"\n=== {model_name} ===")
    print("Accuracy:", accuracy_score(y_true, y_pred))
    print(classification_report(y_true, y_pred, target_names=['Down', 'Up']))

print_metrics("Decision Tree", y_test, dt_preds)
print_metrics("Logistic Regression", y_test, lr_preds)

# Decision Tree feature importances, taken from the best estimator found by
# the grid search (best_estimator_, not best_index_, which is just the index
# of the winning parameter combination)
importances = pd.Series(dt_model.best_estimator_.feature_importances_, index=features)
importances = importances.sort_values(ascending=False)
print("\nTop Feature Importances (Decision Tree):")
print(importances)
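
# A quick visual of the top importances, in the same plotting style as the
# rest of the notebook (a sketch; the top-15 cutoff is arbitrary):
plt.figure(figsize=(10, 8))
importances.head(15).sort_values().plot(kind='barh')
plt.title('Top 15 Feature Importances (Decision Tree)')
plt.xlabel('Importance')
plt.tight_layout()
plt.show()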
plt.figure(figsize=(35, 20))
corr = ml_data[[
    'Close', 'Volume', 'SMA20', 'SMA50', 'EMA20', 'EMA50',
    'RSI14', 'MACD', 'MACD_signal', 'MACD_hist',
    'SMA_crossover', 'RSI_oversold',
    'return_1d', 'volatility_5d', 'volatility_10d', 'volatility_20d',
    'volatility_30d', 'vol_ratio_5_20', 'vol_ratio_10_20', 'vol_rank_20',
    'vol_rank_50', 'return_lag_1', 'return_lag_2', 'return_lag_3',
    'return_lag_5', 'return_lag_10', 'rsi_lag_1', 'macd_lag_1', 'rsi_lag_2',
]].corr()
sns.heatmap(corr, annot=True, cmap='coolwarm', center=0)
plt.title("Correlation Heatmap of Features")
plt.show()
# Confusion matrix for the Logistic Regression model
cm = confusion_matrix(y_test, lr_preds)

# Display the confusion matrix as a heatmap
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=['Down', 'Up'], yticklabels=['Down', 'Up'])
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.title('Confusion Matrix for LR Model')
plt.show()

print("\n======= Confusion Matrix ========\n")
print(cm)
from sklearn.metrics import precision_recall_curve, average_precision_score

# Predicted probabilities for the positive class
y_scores = lr_model.predict_proba(X_test)[:, 1]

# Precision and recall across decision thresholds
precision, recall, _ = precision_recall_curve(y_test, y_scores)

# Average Precision (AP) score
average_precision = average_precision_score(y_test, y_scores)

# Plot the Precision-Recall curve
plt.figure(figsize=(8, 6))
plt.plot(recall, precision, color='red', lw=2, label=f'Precision-Recall curve (AP = {average_precision:0.2f})')
plt.xlabel('Recall')
plt.ylabel('Precision')
plt.title('Precision-Recall Curve')
plt.ylim([0.0, 1.05])
plt.xlim([0.0, 1.0])
plt.legend(loc="lower left")
plt.tight_layout()
plt.show()

print(f"\nAverage Precision (AP) for Logistic Regression: {average_precision:.4f}")
# ROC curve (reusing the positive-class probabilities computed above)
fpr, tpr, thresholds = roc_curve(y_test, y_scores)
roc_auc = auc(fpr, tpr)

# Plot ROC curve
plt.figure(figsize=(8, 6))
plt.plot(fpr, tpr, color='red', lw=2, label=f'ROC curve (area = {roc_auc:0.2f})')
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic (ROC) Curve')
plt.legend(loc="lower right")
plt.show()

print(f"\nAUC for Logistic Regression: {roc_auc:.4f}")
from sklearn.metrics import f1_score

# F1 score
f1 = f1_score(y_test, lr_preds)
print(f"\nF1 Score for Logistic Regression: {f1:.4f}")

# Classification report
print("\nClassification Report for Logistic Regression:")
print(classification_report(y_test, lr_preds, target_names=['Down', 'Up']))

# Training vs. test accuracy for Logistic Regression
lr_train_accuracy = lr_model.score(X_train, y_train)
print(f"\nLogistic Regression Training Accuracy: {lr_train_accuracy:.4f}")
print(f"Logistic Regression Test Accuracy: {accuracy_score(y_test, lr_preds):.4f}")
import pickle

# Save the Logistic Regression model (context managers ensure the file handles are closed)
filename = 'logistic_regression_model.pkl'
with open(filename, 'wb') as f:
    pickle.dump(lr_model, f)
print(f"Logistic Regression model saved to {filename}")

# Load the saved Logistic Regression model
with open(filename, 'rb') as f:
    loaded_model = pickle.load(f)
print(f"Logistic Regression model loaded from {filename}")

# Test the loaded model
loaded_preds = loaded_model.predict(X_test)

# Evaluate the loaded model's performance
print("\n=== Loaded Logistic Regression Model Performance ===")
print("\nAccuracy:", accuracy_score(y_test, loaded_preds), "\n")
print(classification_report(y_test, loaded_preds, target_names=['Down', 'Up']))
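
# A minimal usage sketch (assumes `ml_data`, `features`, and the fitted
# `scaler` from above are still in scope): score the most recent row per
# ticker with the loaded model to get next-day up-probabilities.
latest = ml_data.sort_values('Date').groupby('ticker').tail(1)
latest_proba = loaded_model.predict_proba(scaler.transform(latest[features]))[:, 1]
for t, p in sorted(zip(latest['ticker'], latest_proba), key=lambda x: -x[1]):
    print(f"{t}: P(up next day) = {p:.3f}")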