|
import yfinance as yf |
|
import pandas as pd |
|
import numpy as np |
|
from sklearn.preprocessing import MinMaxScaler |
|
from tensorflow import keras |
|
from tensorflow.keras.models import Sequential |
|
from tensorflow.keras.layers import Dense, LSTM, GRU |
|
from statsmodels.tsa.arima.model import ARIMA |
|
from kerastuner.tuners import RandomSearch |
|
|
|
|
|
def get_stock_data(symbol, start_date, end_date): |
|
stock_data = yf.download(symbol, start=start_date, end=end_date) |
|
return stock_data['Close'] |
|
|
|
|
|
def prepare_data(data): |
|
scaler = MinMaxScaler(feature_range=(0, 1)) |
|
scaled_data = scaler.fit_transform(data.values.reshape(-1, 1)) |
|
return scaled_data, scaler |
|
|
|
|
|
def create_lstm_model(input_shape, hp): |
|
model = Sequential() |
|
model.add(LSTM(units=hp.Int('units', min_value=32, max_value=512, step=32), |
|
return_sequences=True, input_shape=input_shape)) |
|
model.add(LSTM(units=hp.Int('units', min_value=32, max_value=512, step=32), |
|
return_sequences=True)) |
|
model.add(LSTM(units=hp.Int('units', min_value=32, max_value=512, step=32))) |
|
model.add(Dense(units=1)) |
|
model.compile(optimizer='adam', loss='mean_squared_error') |
|
return model |
|
|
|
|
|
def create_gru_model(input_shape, hp): |
|
model = Sequential() |
|
model.add(GRU(units=hp.Int('units', min_value=32, max_value=512, step=32), |
|
return_sequences=True, input_shape=input_shape)) |
|
model.add(GRU(units=hp.Int('units', min_value=32, max_value=512, step=32), |
|
return_sequences=True)) |
|
model.add(GRU(units=hp.Int('units', min_value=32, max_value=512, step=32))) |
|
model.add(Dense(units=1)) |
|
model.compile(optimizer='adam', loss='mean_squared_error') |
|
return model |
|
|
|
|
|
def create_arima_model(data, hp): |
|
|
|
p = hp.Int('p', min_value=1, max_value=5, step=1) |
|
d = hp.Int('d', min_value=0, max_value=1, step=1) |
|
q = hp.Int('q', min_value=1, max_value=5, step=1) |
|
|
|
model = ARIMA(data, order=(p, d, q)) |
|
return model |
|
|
|
|
|
def create_tuner(model_builder, objective): |
|
return RandomSearch(model_builder, |
|
objective=objective, |
|
max_epochs=10, |
|
factor=3, |
|
directory='keras_tuner_logs', |
|
project_name='stock_price_forecasting') |
|
|
|
|
|
def tune_arima_model(data, tuner, hp): |
|
|
|
model = tuner.oracle.get_best_trials(1)[0].hyperparameters.values |
|
order = (model['p'], model['d'], model['q']) |
|
|
|
|
|
arima_model = ARIMA(data, order=order) |
|
arima_model_fit = arima_model.fit() |
|
|
|
|
|
forecast_steps = tuner.oracle.get_best_trials(1)[0].metrics.values['steps'] |
|
arima_forecast = arima_model_fit.get_forecast(steps=forecast_steps) |
|
arima_predictions = arima_forecast.predicted_mean |
|
|
|
return arima_predictions |
|
|
|
|
|
def ensemble_forecast(predictions_list): |
|
return pd.DataFrame(predictions_list).mean(axis=0) |
|
|
|
|
|
symbol = 'AAPL' |
|
start_date = '2021-01-01' |
|
end_date = '2022-01-01' |
|
stock_prices = get_stock_data(symbol, start_date, end_date) |
|
|
|
|
|
input_shape = (60, 1) |
|
|
|
|
|
def objective(hp): |
|
lstm_model = create_lstm_model(input_shape, hp) |
|
lstm_model.fit(x_train, y_train, epochs=10, validation_split=0.2) |
|
loss = lstm_model.evaluate(x_test, y_test) |
|
return loss |
|
|
|
|
|
tuner_lstm = create_tuner(create_lstm_model, objective) |
|
|
|
|
|
scaled_data, scaler = prepare_data(stock_prices) |
|
input_data = scaled_data.reshape(-1, 1) |
|
|
|
train_size = int(len(input_data) * 0.80) |
|
train_data, test_data = input_data[0:train_size, :], input_data[train_size:len(input_data), :] |
|
|
|
x_train, y_train = [], [] |
|
for i in range(60, len(train_data)): |
|
x_train.append(train_data[i - 60:i, 0]) |
|
y_train.append(train_data[i, 0]) |
|
|
|
x_train, y_train = np.array(x_train), np.array(y_train) |
|
x_train = np.reshape(x_train, (x_train.shape[0], x_train.shape[1], 1)) |
|
|
|
x_test, y_test = [], [] |
|
for i in range(60, len(test_data)): |
|
x_test.append(test_data[i - 60:i, 0]) |
|
y_test.append(test_data[i, 0]) |
|
|
|
x_test, y_test = np.array(x_test), np.array(y_test) |
|
x_test = np.reshape(x_test, (x_test.shape[0], x_test.shape[1], 1)) |
|
|
|
|
|
tuner_lstm.search(x_train, y_train, epochs=10, validation_split=0.2) |
|
|
|
|
|
best_lstm_model = tuner_lstm.get_best_models(num_models=1)[0] |
|
|
|
|
|
lstm_predictions = best_lstm_model.predict(x_test) |
|
lstm_predictions = scaler.inverse_transform(lstm_predictions) |
|
|
|
|
|
tuner_arima = create_tuner(create_arima_model, 'val_loss') |
|
tuner_arima.search(stock_prices, epochs=10, validation_split=0.2) |
|
|
|
|
|
arima_predictions = tune_arima_model(stock_prices, tuner_arima, x_test, y_test) |
|
|
|
|
|
tuner_gru = create_tuner(create_gru_model, objective) |
|
tuner |