"""Streamlit app that downloads OHLC data and tags simple rolling-window chart patterns."""

from datetime import datetime, timedelta

import numpy as np
import pandas as pd
import pytz
import streamlit as st
import yfinance as yf


def fetch_stock_data(ticker, start_datetime, end_datetime):
    """Download price data for *ticker* and return it with the date as a column.

    Args:
        ticker: Symbol passed straight to ``yf.download``.
        start_datetime: Start of the requested range.
        end_datetime: End of the requested range.

    Returns:
        A DataFrame whose former index has been moved into a regular column,
        or an empty DataFrame when the download produced no rows.
    """
    stock_data = yf.download(ticker, start=start_datetime, end=end_datetime)
    if stock_data is None or stock_data.empty:
        return pd.DataFrame()

    # Newer yfinance releases may return (field, ticker) MultiIndex columns even
    # for a single symbol; the detectors below index by plain 'High'/'Low'/'Close'.
    if isinstance(stock_data.columns, pd.MultiIndex):
        stock_data.columns = stock_data.columns.get_level_values(0)

    # Convert the DatetimeIndex to a column so rows are addressed positionally.
    return stock_data.reset_index()


def _add_rolling_extremes(df, window):
    """Write the rolling High maximum / Low minimum columns shared by the detectors."""
    df['high_roll_max'] = df['High'].rolling(window=window).max()
    df['low_roll_min'] = df['Low'].rolling(window=window).min()


def _label_rows(df, column, labelled_masks):
    """Create *column* (NaN where nothing matched) and fill it from (mask, label) pairs.

    Later pairs overwrite earlier ones where masks overlap. The column is created
    with object dtype so that writing strings into it is not an incompatible-dtype
    assignment on a float column.
    """
    df[column] = pd.Series(np.nan, index=df.index, dtype=object)
    for mask, label in labelled_masks:
        df.loc[mask, column] = label


def _trend_direction(series, window):
    """Return +1 / -1 / 0 per row: sign of (last - first) value in each rolling window.

    ``raw=True`` hands the callback a NumPy array, so ``w[-1]`` is positional;
    with the default Series argument ``x[-1]`` is a label lookup and fails on
    an integer index.
    """
    return series.rolling(window=window).apply(
        lambda w: np.sign(w[-1] - w[0]), raw=True
    )


def detect_head_shoulder(df, window=3):
    """Tag rows in a ``head_shoulder_pattern`` column using neighbouring-bar comparisons.

    A row is 'Head and Shoulder' when the rolling High maximum exceeds both the
    previous and the next bar's High while the row's own High is below both.
    'Inverse Head and Shoulder' is the mirrored test on Low. The test uses
    ``shift(-1)``, i.e. it looks one bar ahead. Mutates and returns *df*.
    """
    _add_rolling_extremes(df, window)
    high, low = df['High'], df['Low']
    prev_high, next_high = high.shift(1), high.shift(-1)
    prev_low, next_low = low.shift(1), low.shift(-1)

    mask_head_shoulder = (
        (df['high_roll_max'] > prev_high)
        & (df['high_roll_max'] > next_high)
        & (high < prev_high)
        & (high < next_high)
    )
    mask_inv_head_shoulder = (
        (df['low_roll_min'] < prev_low)
        & (df['low_roll_min'] < next_low)
        & (low > prev_low)
        & (low > next_low)
    )
    _label_rows(df, 'head_shoulder_pattern', [
        (mask_head_shoulder, 'Head and Shoulder'),
        (mask_inv_head_shoulder, 'Inverse Head and Shoulder'),
    ])
    return df


def detect_multiple_tops_bottoms(df, window=3):
    """Tag rows in ``multiple_top_bottom_pattern`` as 'Multiple Top' / 'Multiple Bottom'.

    Compares the rolling High/Low and Close extremes with the previous bar's
    High/Low and Close. Also writes the four rolling helper columns.
    Mutates and returns *df*.
    """
    _add_rolling_extremes(df, window)
    df['close_roll_max'] = df['Close'].rolling(window=window).max()
    df['close_roll_min'] = df['Close'].rolling(window=window).min()

    prev_close = df['Close'].shift(1)
    mask_top = (
        (df['high_roll_max'] >= df['High'].shift(1))
        & (df['close_roll_max'] < prev_close)
    )
    mask_bottom = (
        (df['low_roll_min'] <= df['Low'].shift(1))
        & (df['close_roll_min'] > prev_close)
    )
    _label_rows(df, 'multiple_top_bottom_pattern', [
        (mask_top, 'Multiple Top'),
        (mask_bottom, 'Multiple Bottom'),
    ])
    return df


def calculate_support_resistance(df, window=3, std_dev=2):
    """Write ``support`` / ``resistance`` as rolling mean -/+ ``std_dev`` rolling std.

    ``support`` is derived from Low, ``resistance`` from High.
    Mutates and returns *df*.
    """
    _add_rolling_extremes(df, window)
    rolling_high = df['High'].rolling(window=window)
    rolling_low = df['Low'].rolling(window=window)

    df['support'] = rolling_low.mean() - std_dev * rolling_low.std()
    df['resistance'] = rolling_high.mean() + std_dev * rolling_high.std()
    return df


def detect_triangle_pattern(df, window=3):
    """Tag rows in ``triangle_pattern`` as 'Ascending Triangle' / 'Descending Triangle'.

    Ascending: rolling extremes envelop the previous bar and Close rose.
    Descending: the reverse comparisons and Close fell. Mutates and returns *df*.
    """
    _add_rolling_extremes(df, window)
    prev_high, prev_low = df['High'].shift(1), df['Low'].shift(1)
    prev_close = df['Close'].shift(1)

    mask_asc = (
        (df['high_roll_max'] >= prev_high)
        & (df['low_roll_min'] <= prev_low)
        & (df['Close'] > prev_close)
    )
    mask_desc = (
        (df['high_roll_max'] <= prev_high)
        & (df['low_roll_min'] >= prev_low)
        & (df['Close'] < prev_close)
    )
    _label_rows(df, 'triangle_pattern', [
        (mask_asc, 'Ascending Triangle'),
        (mask_desc, 'Descending Triangle'),
    ])
    return df


def detect_wedge(df, window=3):
    """Tag rows in ``wedge_pattern`` as 'Wedge Up' / 'Wedge Down'.

    Requires both the High and the Low window trend (``trend_high`` /
    ``trend_low`` columns) to point the same way, plus the rolling-extreme
    comparisons against the previous bar. Mutates and returns *df*.
    """
    _add_rolling_extremes(df, window)
    df['trend_high'] = _trend_direction(df['High'], window)
    df['trend_low'] = _trend_direction(df['Low'], window)
    prev_high, prev_low = df['High'].shift(1), df['Low'].shift(1)

    mask_wedge_up = (
        (df['high_roll_max'] >= prev_high)
        & (df['low_roll_min'] <= prev_low)
        & (df['trend_high'] == 1)
        & (df['trend_low'] == 1)
    )
    mask_wedge_down = (
        (df['high_roll_max'] <= prev_high)
        & (df['low_roll_min'] >= prev_low)
        & (df['trend_high'] == -1)
        & (df['trend_low'] == -1)
    )
    _label_rows(df, 'wedge_pattern', [
        (mask_wedge_up, 'Wedge Up'),
        (mask_wedge_down, 'Wedge Down'),
    ])
    return df


def detect_channel(df, window=3, channel_range=0.1):
    """Tag rows in ``channel_pattern`` as 'Channel Up' / 'Channel Down'.

    Same tests as the wedge detector, with the extra requirement that the
    rolling High-Low spread is at most ``channel_range`` times the midpoint of
    the two rolling extremes. Mutates and returns *df*.
    """
    _add_rolling_extremes(df, window)
    df['trend_high'] = _trend_direction(df['High'], window)
    df['trend_low'] = _trend_direction(df['Low'], window)
    prev_high, prev_low = df['High'].shift(1), df['Low'].shift(1)

    spread = df['high_roll_max'] - df['low_roll_min']
    midpoint = (df['high_roll_max'] + df['low_roll_min']) / 2
    is_narrow = spread <= channel_range * midpoint

    mask_channel_up = (
        (df['high_roll_max'] >= prev_high)
        & (df['low_roll_min'] <= prev_low)
        & is_narrow
        & (df['trend_high'] == 1)
        & (df['trend_low'] == 1)
    )
    mask_channel_down = (
        (df['high_roll_max'] <= prev_high)
        & (df['low_roll_min'] >= prev_low)
        & is_narrow
        & (df['trend_high'] == -1)
        & (df['trend_low'] == -1)
    )
    _label_rows(df, 'channel_pattern', [
        (mask_channel_up, 'Channel Up'),
        (mask_channel_down, 'Channel Down'),
    ])
    return df


def detect_double_top_bottom(df, window=3, threshold=0.05):
    """Tag rows in ``double_pattern`` as 'Double Top' / 'Double Bottom'.

    Both neighbouring bars must have a High-Low range no larger than
    ``threshold`` times their own midpoint, in addition to the High (top) or
    Low (bottom) comparisons. Uses ``shift(-1)``, i.e. looks one bar ahead.
    Mutates and returns *df*.
    """
    _add_rolling_extremes(df, window)
    high, low = df['High'], df['Low']
    prev_high, next_high = high.shift(1), high.shift(-1)
    prev_low, next_low = low.shift(1), low.shift(-1)

    # The "tight neighbour bars" condition is identical for tops and bottoms.
    neighbours_tight = (
        ((prev_high - prev_low) <= threshold * (prev_high + prev_low) / 2)
        & ((next_high - next_low) <= threshold * (next_high + next_low) / 2)
    )

    mask_double_top = (
        (df['high_roll_max'] >= prev_high)
        & (df['high_roll_max'] >= next_high)
        & (high < prev_high)
        & (high < next_high)
        & neighbours_tight
    )
    mask_double_bottom = (
        (df['low_roll_min'] <= prev_low)
        & (df['low_roll_min'] <= next_low)
        & (low > prev_low)
        & (low > next_low)
        & neighbours_tight
    )
    _label_rows(df, 'double_pattern', [
        (mask_double_top, 'Double Top'),
        (mask_double_bottom, 'Double Bottom'),
    ])
    return df


def detect_trendline(df, window=2):
    """Fit a least-squares line to the previous *window* closes of every row.

    Writes ``slope`` and ``intercept`` (NaN for the first *window* rows), then
    ``support`` where the slope is positive and ``resistance`` where it is
    negative, each holding the fitted line evaluated at the row's position.

    NOTE: ``support`` / ``resistance`` are overwritten if they already exist
    (for example from ``calculate_support_resistance``).
    Mutates and returns *df*.
    """
    closes = df['Close'].to_numpy(dtype=float)
    row_count = len(df)
    slopes = np.full(row_count, np.nan)
    intercepts = np.full(row_count, np.nan)

    for i in range(window, row_count):
        x = np.arange(i - window, i)
        design = np.vstack([x, np.ones(len(x))]).T
        slopes[i], intercepts[i] = np.linalg.lstsq(
            design, closes[i - window:i], rcond=None
        )[0]

    df['slope'] = slopes
    df['intercept'] = intercepts

    # The fit is price against bar position, so the line value at row i is
    # slope * i + intercept (not slope * Close + intercept).
    line_value = slopes * np.arange(row_count) + intercepts
    df['support'] = np.where(slopes > 0, line_value, np.nan)
    df['resistance'] = np.where(slopes < 0, line_value, np.nan)
    return df


def find_pivots(df):
    """Write a ``signal`` column with 'HH', 'LL', 'LH' or 'HL' at turning points.

    A turning point is a row where the bar-to-bar difference of High (or Low)
    changes sign between this row and the next. Rows matching none of the
    masks keep ''; where masks overlap, the later assignment ('LL', 'LH',
    then 'HL') wins. Mutates and returns *df*.
    """
    high_diffs = df['High'].diff()
    low_diffs = df['Low'].diff()
    next_high_diffs = high_diffs.shift(-1)
    next_low_diffs = low_diffs.shift(-1)

    df['signal'] = ''
    for mask, tag in (
        ((high_diffs > 0) & (next_high_diffs < 0), 'HH'),
        ((low_diffs < 0) & (next_low_diffs > 0), 'LL'),
        ((high_diffs < 0) & (next_high_diffs > 0), 'LH'),
        ((low_diffs > 0) & (next_low_diffs < 0), 'HL'),
    ):
        df.loc[mask, 'signal'] = tag
    return df


# Detectors applied, in order, to the downloaded data.
_DETECTORS = (
    detect_head_shoulder,
    detect_multiple_tops_bottoms,
    calculate_support_resistance,
    detect_triangle_pattern,
    detect_wedge,
    detect_channel,
    detect_double_top_bottom,
    detect_trendline,
    find_pivots,
)


def main():
    """Render the Streamlit UI and run every detector on the requested range."""
    st.title('Live Stock Pattern Detection App')

    ticker = st.text_input('Enter Stock Ticker:', 'AAPL')

    # Plain widgets: writing the info box into the same st.empty() placeholder
    # would replace the date input and leave nothing for the user to edit.
    start_date = st.date_input('Start Date', pd.to_datetime('2022-01-01'))
    end_date = st.date_input('End Date', pd.to_datetime('2022-02-01'))

    st.info("Select Preferred Timezone:")
    preferred_timezone = st.selectbox('Timezone', list(pytz.all_timezones))

    # Every slider needs a distinct label: identical label/range/default
    # combinations collide on Streamlit's auto-generated widget identity.
    start_hour = st.slider('Select Start Time', 0, 23, 0)
    start_minute = st.slider('Start Minute', 0, 59, 0)
    start_second = st.slider('Start Second', 0, 59, 0)
    end_hour = st.slider('Select End Time', 0, 23, 23)
    end_minute = st.slider('End Minute', 0, 59, 59)
    end_second = st.slider('End Second', 0, 59, 59)

    tz = pytz.timezone(preferred_timezone)
    start_datetime = tz.localize(datetime(
        start_date.year, start_date.month, start_date.day,
        start_hour, start_minute, start_second,
    ))
    end_datetime = tz.localize(datetime(
        end_date.year, end_date.month, end_date.day,
        end_hour, end_minute, end_second,
    ))

    # The messages are labelled UTC, so convert before formatting.
    start_utc = start_datetime.astimezone(pytz.utc)
    end_utc = end_datetime.astimezone(pytz.utc)
    st.info(f"Start Date (UTC): {start_utc.strftime('%Y-%m-%d %H:%M:%S %Z')}")
    st.info(f"End Date (UTC): {end_utc.strftime('%Y-%m-%d %H:%M:%S %Z')}")

    if not st.button('Detect Patterns'):
        return

    ticker = ticker.strip()
    if not ticker:
        st.error('Please enter a stock ticker.')
        return
    if start_datetime >= end_datetime:
        st.error('The start must be earlier than the end.')
        return

    stock_data = fetch_stock_data(ticker, start_datetime, end_datetime)
    if stock_data.empty:
        st.warning('No data was returned for the selected ticker and range.')
        return

    for detector in _DETECTORS:
        stock_data = detector(stock_data)

    st.write(stock_data)


if __name__ == "__main__":
    main()