Pt / app.py
Geek7's picture
Update app.py
3abff25 verified
raw
history blame
10.2 kB
import pandas as pd
import numpy as np
import yfinance as yf
import streamlit as st
from datetime import datetime, timedelta
import pytz
# Function to fetch stock data
def fetch_stock_data(ticker, start_datetime, end_datetime):
stock_data = yf.download(ticker, start=start_datetime, end=end_datetime)
return stock_data
# Function to detect head and shoulder patterns
def detect_head_shoulder(df, window=3):
roll_window = window
df['high_roll_max'] = df['High'].rolling(window=roll_window).max()
df['low_roll_min'] = df['Low'].rolling(window=roll_window).min()
mask_head_shoulder = (
(df['high_roll_max'] > df['High'].shift(1)) &
(df['high_roll_max'] > df['High'].shift(-1)) &
(df['High'] < df['High'].shift(1)) &
(df['High'] < df['High'].shift(-1))
)
mask_inv_head_shoulder = (
(df['low_roll_min'] < df['Low'].shift(1)) &
(df['low_roll_min'] < df['Low'].shift(-1)) &
(df['Low'] > df['Low'].shift(1)) &
(df['Low'] > df['Low'].shift(-1))
)
df['head_shoulder_pattern'] = np.nan
df.loc[mask_head_shoulder, 'head_shoulder_pattern'] = 'Head and Shoulder'
df.loc[mask_inv_head_shoulder, 'head_shoulder_pattern'] = 'Inverse Head and Shoulder'
return df
# Function to detect multiple tops and bottoms
def detect_multiple_tops_bottoms(df, window=3):
roll_window = window
df['high_roll_max'] = df['High'].rolling(window=roll_window).max()
df['low_roll_min'] = df['Low'].rolling(window=roll_window).min()
df['close_roll_max'] = df['Close'].rolling(window=roll_window).max()
df['close_roll_min'] = df['Close'].rolling(window=roll_window).min()
mask_top = (df['high_roll_max'] >= df['High'].shift(1)) & (df['close_roll_max'] < df['Close'].shift(1))
mask_bottom = (df['low_roll_min'] <= df['Low'].shift(1)) & (df['close_roll_min'] > df['Close'].shift(1))
df['multiple_top_bottom_pattern'] = np.nan
df.loc[mask_top, 'multiple_top_bottom_pattern'] = 'Multiple Top'
df.loc[mask_bottom, 'multiple_top_bottom_pattern'] = 'Multiple Bottom'
return df
# Function to calculate support and resistance levels
def calculate_support_resistance(df, window=3):
roll_window = window
std_dev = 2
df['high_roll_max'] = df['High'].rolling(window=roll_window).max()
df['low_roll_min'] = df['Low'].rolling(window=roll_window).min()
mean_high = df['High'].rolling(window=roll_window).mean()
std_high = df['High'].rolling(window=roll_window).std()
mean_low = df['Low'].rolling(window=roll_window).mean()
std_low = df['Low'].rolling(window=roll_window).std()
df['support'] = mean_low - std_dev * std_low
df['resistance'] = mean_high + std_dev * std_high
return df
# Function to detect triangle patterns
def detect_triangle_pattern(df, window=3):
roll_window = window
df['high_roll_max'] = df['High'].rolling(window=roll_window).max()
df['low_roll_min'] = df['Low'].rolling(window=roll_window).min()
mask_asc = (
(df['high_roll_max'] >= df['High'].shift(1)) &
(df['low_roll_min'] <= df['Low'].shift(1)) &
(df['Close'] > df['Close'].shift(1))
)
mask_desc = (
(df['high_roll_max'] <= df['High'].shift(1)) &
(df['low_roll_min'] >= df['Low'].shift(1)) &
(df['Close'] < df['Close'].shift(1))
)
df['triangle_pattern'] = np.nan
df.loc[mask_asc, 'triangle_pattern'] = 'Ascending Triangle'
df.loc[mask_desc, 'triangle_pattern'] = 'Descending Triangle'
return df
# Function to detect wedge patterns
def detect_wedge(df, window=3):
roll_window = window
df['high_roll_max'] = df['High'].rolling(window=roll_window).max()
df['low_roll_min'] = df['Low'].rolling(window=roll_window).min()
df['trend_high'] = df['High'].rolling(window=roll_window).apply(lambda x: 1 if (x[-1]-x[0]) > 0 else -1 if (x[-1]-x[0]) < 0 else 0)
df['trend_low'] = df['Low'].rolling(window=roll_window).apply(lambda x: 1 if (x[-1]-x[0]) > 0 else -1 if (x[-1]-x[0]) < 0 else 0)
mask_wedge_up = (
(df['high_roll_max'] >= df['High'].shift(1)) &
(df['low_roll_min'] <= df['Low'].shift(1)) &
(df['trend_high'] == 1) &
(df['trend_low'] == 1)
)
mask_wedge_down = (
(df['high_roll_max'] <= df['High'].shift(1)) &
(df['low_roll_min'] >= df['Low'].shift(1)) &
(df['trend_high'] == -1) &
(df['trend_low'] == -1)
)
df['wedge_pattern'] = np.nan
df.loc[mask_wedge_up, 'wedge_pattern'] = 'Wedge Up'
df.loc[mask_wedge_down, 'wedge_pattern'] = 'Wedge Down'
return df
# Function to detect channel patterns
def detect_channel(df, window=3):
roll_window = window
channel_range = 0.1
df['high_roll_max'] = df['High'].rolling(window=roll_window).max()
df['low_roll_min'] = df['Low'].rolling(window=roll_window).min()
df['trend_high'] = df['High'].rolling(window=roll_window).apply(lambda x: 1 if (x[-1]-x[0]) > 0 else -1 if (x[-1]-x[0]) < 0 else 0)
df['trend_low'] = df['Low'].rolling(window=roll_window).apply(lambda x: 1 if (x[-1]-x[0]) > 0 else -1 if (x[-1]-x[0]) <0 else 0)
mask_channel_up = (
(df['high_roll_max'] >= df['High'].shift(1)) &
(df['low_roll_min'] <= df['Low'].shift(1)) &
(df['high_roll_max'] - df['low_roll_min'] <= channel_range * (df['high_roll_max'] + df['low_roll_min'])/2) &
(df['trend_high'] == 1) &
(df['trend_low'] == 1)
)
mask_channel_down = (
(df['high_roll_max'] <= df['High'].shift(1)) &
(df['low_roll_min'] >= df['Low'].shift(1)) &
(df['high_roll_max'] - df['low_roll_min'] <= channel_range * (df['high_roll_max'] + df['low_roll_min'])/2) &
(df['trend_high'] == -1) &
(df['trend_low'] == -1)
)
df['channel_pattern'] = np.nan
df.loc[mask_channel_up, 'channel_pattern'] = 'Channel Up'
df.loc[mask_channel_down, 'channel_pattern'] = 'Channel Down'
return df
# Function to detect double top and bottom patterns
def detect_double_top_bottom(df, window=3, threshold=0.05):
roll_window = window
range_threshold = threshold
df['high_roll_max'] = df['High'].rolling(window=roll_window).max()
df['low_roll_min'] = df['Low'].rolling(window=roll_window).min()
mask_double_top = (
(df['high_roll_max'] >= df['High'].shift(1)) &
(df['high_roll_max'] >= df['High'].shift(-1)) &
(df['High'] < df['High'].shift(1)) &
(df['High'] < df['High'].shift(-1)) &
((df['High'].shift(1) - df['Low'].shift(1)) <= range_threshold * (df['High'].shift(1) + df['Low'].shift(1))/2) &
((df['High'].shift(-1) - df['Low'].shift(-1)) <= range_threshold * (df['High'].shift(-1) + df['Low'].shift(-1))/2)
)
mask_double_bottom = (
(df['low_roll_min'] <= df['Low'].shift(1)) &
(df['low_roll_min'] <= df['Low'].shift(-1)) &
(df['Low'] > df['Low'].shift(1)) &
(df['Low'] > df['Low'].shift(-1)) &
((df['High'].shift(1) - df['Low'].shift(1)) <= range_threshold * (df['High'].shift(1) + df['Low'].shift(1))/2) &
((df['High'].shift(-1) - df['Low'].shift(-1)) <= range_threshold * (df['High'].shift(-1) + df['Low'].shift(-1))/2)
)
df['double_pattern'] = np.nan
df.loc[mask_double_top, 'double_pattern'] = 'Double Top'
df.loc[mask_double_bottom, 'double_pattern'] = 'Double Bottom'
return df
# Function to detect trendlines
def detect_trendline(df, window=2):
roll_window = window
df['slope'] = np.nan
df['intercept'] = np.nan
for i in range(window, len(df)):
x = np.array(range(i-window, i))
y = df['Close'][i-window:i]
A = np.vstack([x, np.ones(len(x))]).T
m, c = np.linalg.lstsq(A, y, rcond=None)[0]
df.at[df.index[i], 'slope'] = m
df.at[df.index[i], 'intercept'] = c
mask_support = df['slope'] > 0
mask_resistance = df['slope'] < 0
df['support'] = np.nan
df['resistance'] = np.nan
df.loc[mask_support, 'support'] = df['Close'] * df['slope'] + df['intercept']
df.loc[mask_resistance, 'resistance'] = df['Close'] * df['slope'] + df['intercept']
return df
# Function to find pivots
def find_pivots(df):
high_diffs = df['High'].diff()
low_diffs = df['Low'].diff()
higher_high_mask = (high_diffs > 0) & (high_diffs.shift(-1) < 0)
lower_low_mask = (low_diffs < 0) & (low_diffs.shift(-1) > 0)
lower_high_mask = (high_diffs < 0) & (high_diffs.shift(-1) > 0)
higher_low_mask = (low_diffs > 0) & (low_diffs.shift(-1) < 0)
df['signal'] = ''
df.loc[higher_high_mask, 'signal'] = 'HH'
df.loc[lower_low_mask, 'signal'] = 'LL'
df.loc[lower_high_mask, 'signal'] = 'LH'
df.loc[higher_low_mask, 'signal'] = 'HL'
return df
# Streamlit App
def main():
st.title('Stock Pattern Detection App')
ticker = st.text_input('Enter Stock Ticker:', 'AAPL')
start_date = st.date_input('Start Date', pd.to_datetime('2020-01-01'))
end_date = st.date_input('End Date', pd.to_datetime('2022-01-01'))
st.info("Select Preferred Timezone:")
preferred_timezone = st.selectbox('Timezone', list(pytz.all_timezones))
start_time = st.time_input('Select Start Time', datetime(2022, 1, 1, 0, 0))
end_time = st.time_input('Select End Time', datetime(2022, 1, 1, 23, 59))
start_datetime = datetime.combine(start_date, start_time)
end_datetime = datetime.combine(end_date, end_time)
start_datetime = pytz.timezone(preferred_timezone).localize(start_datetime)
end_datetime = pytz.timezone(preferred_timezone).localize(end_datetime)
if st.button('Detect Patterns'):
stock_data = fetch_stock_data(ticker, start_datetime, end_datetime)
stock_data = detect_head_shoulder(stock_data)
stock_data = detect_multiple_tops_bottoms(stock_data)
stock_data = calculate_support_resistance(stock_data)
stock_data = detect_triangle_pattern(stock_data)
stock_data = detect_wedge(stock_data)
stock_data = detect_channel(stock_data)
stock_data = detect_double_top_bottom(stock_data)
stock_data = detect_trendline(stock_data)
stock_data = find_pivots(stock_data)
st.write(stock_data)
if __name__ == "__main__":
main()