|
|
|
|
|
|
|
|
|
|
|
|
|
|
import gradio as gr
|
|
|
import pandas as pd
|
|
|
import numpy as np
|
|
|
import matplotlib.pyplot as plt
|
|
|
import matplotlib.dates as mdates
|
|
|
import mplfinance as mpf
|
|
|
import datetime as dt
|
|
|
import requests
|
|
|
import bs4 as bs
|
|
|
import pickle
|
|
|
import os
|
|
|
import time
|
|
|
import threading
|
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
|
import warnings
|
|
|
warnings.filterwarnings('ignore')
|
|
|
|
|
|
|
|
|
# Module-level defaults. In the visible source only `refresh_interval` is
# written (by `auto_refresh`); the other three names are not referenced again
# and mirror the instance attributes of the same names on `StockDashboard`.
sp500_tickers = []
stock_data = {}
last_refresh = None
# `auto_refresh` overwrites this with a value in seconds.
# NOTE(review): the unit of this initial value (30) is presumably seconds too - confirm.
refresh_interval = 30
|
|
|
|
|
|
class StockDashboard:
    """Load S&P 500 tickers, fetch price history and build matplotlib charts.

    Every chart builder returns a ``(figure_or_None, status_message)`` tuple and
    never raises; errors are reported through the status message.  When real
    market data cannot be obtained, deterministic synthetic data is generated
    instead and tagged with ``df.attrs["is_fallback"] = True`` so callers can
    tell it apart from real quotes.
    """

    # On-disk ticker cache, created in the current working directory.
    _TICKER_CACHE = "sp500tickers.pickle"
    _WIKIPEDIA_URL = 'https://en.wikipedia.org/wiki/List_of_S%26P_500_companies'

    # Used when Wikipedia cannot be scraped (duplicates of the original list removed).
    _FALLBACK_TICKERS = (
        'AAPL', 'MSFT', 'GOOGL', 'AMZN', 'NVDA', 'TSLA', 'META', 'BRK-B', 'UNH', 'JNJ',
        'JPM', 'PG', 'HD', 'MA', 'PFE', 'ABBV', 'BAC', 'KO', 'PEP', 'AVGO', 'TMO', 'COST',
        'WMT', 'MRK', 'ACN', 'DHR', 'VZ', 'ADBE', 'NFLX', 'CRM', 'PYPL', 'INTC', 'QCOM',
        'TXN', 'HON', 'NKE', 'PM', 'LOW', 'ORCL', 'IBM', 'AMD', 'RTX', 'INTU', 'SPGI',
        'ISRG', 'GILD', 'AMAT', 'ADI', 'MDLZ', 'REGN', 'VRTX', 'KLAC', 'PANW', 'SNPS',
        'CDNS', 'MELI', 'MU', 'ASML', 'CHTR', 'MAR', 'ORLY', 'MNST', 'PAYX', 'CTAS',
        'ADP', 'ODFL', 'CPRT', 'ROST', 'BIIB', 'DXCM', 'ALGN', 'IDXX', 'FAST', 'SGEN',
        'VRSK', 'WDAY', 'CTSH', 'EXC', 'XEL', 'AEP', 'SO', 'DUK', 'D', 'DTE', 'NEE',
        'SRE', 'AEE', 'EIX', 'PEG', 'WEC', 'CMS', 'ATO', 'LNT', 'PNW', 'NI', 'BKH',
        'CNP', 'OGS', 'NFG', 'SWX', 'UGI', 'AES', 'NRG', 'VST', 'ETR', 'FE', 'PPL',
    )

    # chart_type -> (title prefix, colour cycle); same values as the original branches.
    _CHART_STYLES = {
        "candlestick": ('Multi-Stock Candlestick Chart',
                        ['green', 'blue', 'red', 'purple', 'orange', 'brown', 'pink', 'gray']),
        "line": ('Multi-Stock Line Chart',
                 ['blue', 'red', 'green', 'purple', 'orange', 'brown', 'pink', 'gray']),
        "ohlc": ('Multi-Stock OHLC Chart',
                 ['red', 'blue', 'green', 'purple', 'orange', 'brown', 'pink', 'gray']),
        "renko": ('Multi-Stock Price Chart',
                  ['purple', 'blue', 'red', 'green', 'orange', 'brown', 'pink', 'gray']),
    }

    # Extra calendar days fetched so the 50-row SMA has history before the
    # displayed window starts (50 trading days is roughly 72 calendar days).
    _SMA_WARMUP_DAYS = 75

    def __init__(self):
        self.sp500_tickers = []
        self.stock_data = {}
        self.last_refresh = None
        self.refresh_interval = 30

    # ------------------------------------------------------------------
    # Ticker list handling
    # ------------------------------------------------------------------
    def _scrape_wikipedia_tickers(self):
        """Return the ticker symbols from Wikipedia's ``constituents`` table.

        Raises an exception when the request fails, no HTML parser is usable,
        or the table yields no symbols.
        """
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }
        resp = requests.get(self._WIKIPEDIA_URL, headers=headers, timeout=10)
        resp.raise_for_status()

        soup = None
        # Try parsers in order of preference; a missing parser raises.
        for parser in ("lxml", "html.parser", "html5lib"):
            try:
                soup = bs.BeautifulSoup(resp.text, parser)
                break
            except Exception as parser_error:
                print(f"Parser {parser} failed: {parser_error}")
        if soup is None:
            raise RuntimeError("All HTML parsers failed")

        table = soup.find('table', {'id': 'constituents'})
        tickers = []
        if table is not None:
            for row in table.find_all('tr')[1:]:  # [1:] skips the header row
                cells = row.find_all('td')
                if not cells:
                    continue
                # Use the dashed form (BRK-B), matching the fallback list.
                symbol = cells[0].get_text(strip=True).replace('.', '-')
                if symbol:
                    tickers.append(symbol)
        if not tickers:
            raise RuntimeError("No tickers found in Wikipedia table")
        return list(dict.fromkeys(tickers))  # de-duplicate, keep order

    def _write_ticker_cache(self, tickers):
        """Best-effort write of *tickers* to the pickle cache; failures are logged."""
        try:
            with open(self._TICKER_CACHE, "wb") as f:
                pickle.dump(list(tickers), f)
        except OSError as cache_error:
            print(f"Could not write ticker cache: {cache_error}")

    def save_sp500_tickers(self):
        """Fetch the S&P 500 ticker list, cache it on disk and return a status string.

        Wikipedia is tried first; on any failure the built-in fallback list is
        used.  The result is stored in ``self.sp500_tickers``.  A failure to
        write the cache file does not discard the fetched list.
        """
        current_time = dt.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        try:
            tickers = self._scrape_wikipedia_tickers()
            print(f"Successfully scraped {len(tickers)} tickers from Wikipedia")
        except Exception as wiki_error:
            print(f"Wikipedia scraping failed: {wiki_error}")
            print("Using fallback S&P 500 ticker list")
            tickers = list(dict.fromkeys(self._FALLBACK_TICKERS))

        self.sp500_tickers = tickers
        self._write_ticker_cache(tickers)
        return f"Successfully loaded {len(tickers)} S&P 500 tickers at {current_time}"

    @staticmethod
    def _format_age(age):
        """Render a ``timedelta`` as '<n> days/hours/minutes/seconds ago'."""
        age = max(age, dt.timedelta(0))  # guard against mtimes in the future
        if age.days > 0:
            return f"{age.days} days ago"
        if age.seconds >= 3600:
            return f"{age.seconds // 3600} hours ago"
        if age.seconds >= 60:
            return f"{age.seconds // 60} minutes ago"
        return f"{age.seconds} seconds ago"

    def load_sp500_tickers(self):
        """Load tickers from the pickle cache, fetching them if the cache is unusable.

        Returns a human-readable status string.  A missing, unreadable, empty
        or malformed cache triggers :meth:`save_sp500_tickers`.
        """
        try:
            if not os.path.exists(self._TICKER_CACHE):
                return self.save_sp500_tickers()

            file_time = os.path.getmtime(self._TICKER_CACHE)
            file_age = dt.datetime.now() - dt.datetime.fromtimestamp(file_time)

            # NOTE(security): pickle executes arbitrary code on load; this is
            # acceptable only because the file is a local cache written by
            # this application.  Do not point it at untrusted files.
            with open(self._TICKER_CACHE, "rb") as f:
                cached = pickle.load(f)

            if (not isinstance(cached, list) or not cached
                    or not all(isinstance(t, str) for t in cached)):
                raise ValueError("ticker cache is empty or malformed")

            # Older caches may still hold dotted symbols (BRK.B).
            self.sp500_tickers = [t.replace('.', '-') for t in cached]
            age_str = self._format_age(file_age)
            return f"Loaded {len(self.sp500_tickers)} tickers from cache (last updated {age_str} from Wikipedia)"
        except Exception as e:
            print(f"Could not load ticker cache: {e}")
            return self.save_sp500_tickers()

    # ------------------------------------------------------------------
    # Price data
    # ------------------------------------------------------------------
    @staticmethod
    def _is_fallback(df):
        """Return True when *df* was produced by :meth:`_generate_fallback_data`."""
        return bool(getattr(df, "attrs", {}).get("is_fallback", False))

    def get_stock_data(self, ticker, days=365):
        """Return an OHLCV DataFrame for *ticker* covering the last *days* days.

        yfinance is imported lazily.  If it is missing, returns no rows, or
        both query styles fail, synthetic fallback data is returned instead
        (``None`` only if even that cannot be generated).
        """
        try:
            import yfinance as yf
        except ImportError as e:
            print(f"yfinance import failed: {e}")
            return self._generate_fallback_data(ticker, days)

        try:
            end = dt.datetime.now()
            start = end - dt.timedelta(days=days)
            df = yf.Ticker(ticker).history(start=start, end=end)
            if df.empty:
                print(f"No data received for {ticker}")
                return self._generate_fallback_data(ticker, days)
            return df
        except Exception as yf_error:
            print(f"yfinance error for {ticker}: {yf_error}")

        # Second attempt: let yfinance resolve the window from a period string.
        try:
            df = yf.Ticker(ticker).history(period=f"{days}d")
            if not df.empty:
                print(f"Alternative method worked for {ticker}")
                return df
        except Exception as alt_error:
            print(f"Alternative method also failed for {ticker}: {alt_error}")

        return self._generate_fallback_data(ticker, days)

    def _generate_fallback_data(self, ticker, days):
        """Generate synthetic daily OHLCV data for *ticker*, or ``None`` on error.

        The series is a random walk starting near 100 with one row per calendar
        day (``days + 1`` rows, both endpoints included).  The random stream is
        seeded from a CRC32 of the ticker, so the same ticker always yields the
        same values, and a private generator is used so NumPy's global random
        state is left untouched.  The frame is tagged with
        ``attrs["is_fallback"] = True``.
        """
        try:
            import zlib

            end_date = dt.datetime.now()
            start_date = end_date - dt.timedelta(days=days)
            dates = pd.date_range(start=start_date, end=end_date, freq='D')
            n = len(dates)

            # Built-in hash() of str is salted per process; CRC32 is stable.
            rng = np.random.default_rng(zlib.crc32(str(ticker).encode("utf-8")))
            prices = 100 * np.exp(np.cumsum(rng.normal(0, 0.02, n)))

            df = pd.DataFrame(
                {
                    'Open': prices * (1 + rng.normal(0, 0.005, n)),
                    'High': prices * (1 + np.abs(rng.normal(0, 0.01, n))),
                    'Low': prices * (1 - np.abs(rng.normal(0, 0.01, n))),
                    'Close': prices,
                    'Volume': rng.integers(1000000, 10000000, n),
                },
                index=dates,
            )

            # Force High/Low to bracket Open and Close.
            df['High'] = df[['Open', 'Close', 'High']].max(axis=1)
            df['Low'] = df[['Open', 'Close', 'Low']].min(axis=1)
            df.attrs['is_fallback'] = True

            print(f"Generated fallback data for {ticker}")
            return df
        except Exception as e:
            print(f"Failed to generate fallback data for {ticker}: {e}")
            return None

    # ------------------------------------------------------------------
    # Chart helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _new_figure(figsize, height_ratios=None):
        """Create a figure outside pyplot and return ``(fig, axes)``.

        Figures built this way are not registered in pyplot's global figure
        list, so they are garbage-collected normally in a long-running server.
        With *height_ratios* a stacked column of axes is returned, otherwise a
        single Axes.
        """
        from matplotlib.figure import Figure

        fig = Figure(figsize=figsize)
        if height_ratios is None:
            return fig, fig.subplots()
        axes = fig.subplots(len(height_ratios), 1,
                            gridspec_kw={'height_ratios': list(height_ratios)})
        return fig, axes

    @staticmethod
    def _draw_candles(ax, ticker, df, base_color):
        """Draw candle bodies and thin wicks for one ticker on *ax*."""
        import matplotlib.colors as mcolors

        red, green, blue = mcolors.to_rgb(base_color)
        # Down candles use a reddened, darker variant of the ticker colour.
        down_color = (min(1.0, red * 0.8 + 0.2), max(0.0, green * 0.6), max(0.0, blue * 0.6))

        body_width = 0.6
        wick_width = body_width * 0.1

        rising = df[df['Close'] >= df['Open']]
        falling = df[df['Close'] < df['Open']]
        for frame, color, suffix in ((rising, base_color, 'Up'), (falling, down_color, 'Down')):
            body_top = frame[['Open', 'Close']].max(axis=1)
            body_bottom = frame[['Open', 'Close']].min(axis=1)
            ax.bar(frame.index, frame['Close'] - frame['Open'], body_width, bottom=frame['Open'],
                   color=color, alpha=0.8, label=f'{ticker} ({suffix})')
            ax.bar(frame.index, frame['High'] - body_top, wick_width, bottom=body_top,
                   color=color, alpha=0.8)
            ax.bar(frame.index, body_bottom - frame['Low'], wick_width, bottom=frame['Low'],
                   color=color, alpha=0.8)

    def _draw_price_series(self, ax, ticker, df, chart_type, color):
        """Draw one ticker's price panel in the style selected by *chart_type*."""
        if chart_type == "candlestick":
            self._draw_candles(ax, ticker, df, color)
        elif chart_type == "ohlc":
            ax.plot(df.index, df['Close'], color=color, linewidth=1, label=f'{ticker} Close', alpha=0.8)
            for column, linestyle in (('Open', '--'), ('High', ':'), ('Low', '-.')):
                ax.plot(df.index, df[column], color=color, linewidth=1, alpha=0.6, linestyle=linestyle)
        else:
            # "line" and "renko" both render the closing price as a line.
            ax.plot(df.index, df['Close'], color=color, linewidth=2, label=ticker)

    @staticmethod
    def _daily_close(df):
        """Return the Close column indexed by timezone-naive calendar dates.

        Real quotes and synthetic data can carry differently-zoned or
        time-of-day indexes; normalising lets the series align by date.
        """
        index = pd.DatetimeIndex(df.index)
        if index.tz is not None:
            index = index.tz_localize(None)
        series = pd.Series(df['Close'].to_numpy(), index=index.normalize())
        return series[~series.index.duplicated(keep='last')]

    # ------------------------------------------------------------------
    # Charts
    # ------------------------------------------------------------------
    def create_price_chart(self, tickers, chart_type="candlestick", days=100):
        """Build a price + volume chart for one or more tickers.

        Args:
            tickers: a ticker symbol or a list of symbols.
            chart_type: one of "candlestick", "line", "ohlc", "renko".
            days: number of most recent rows (and calendar days fetched).

        Returns:
            ``(figure, status_message)``; ``figure`` is ``None`` when nothing
            could be drawn (no tickers, unknown chart type, no data, error).
        """
        try:
            if not tickers:
                return None, "No tickers selected"
            if isinstance(tickers, str):
                tickers = [tickers]
            if chart_type not in self._CHART_STYLES:
                return None, f"Unsupported chart type: {chart_type}"

            all_data = {}
            fallback_used = []
            real_data_used = []
            for ticker in tickers:
                df = self.get_stock_data(ticker, days)
                if df is None or df.empty:
                    continue
                all_data[ticker] = df.tail(days)
                if self._is_fallback(df):
                    fallback_used.append(ticker)
                else:
                    real_data_used.append(ticker)

            if not all_data:
                return None, "No data available for selected tickers"

            status_msg = f"Generated {chart_type} chart for {len(all_data)} stocks"
            if fallback_used:
                status_msg += f" (Fallback data used for: {', '.join(fallback_used)})"
            if real_data_used:
                status_msg += f" (Real data used for: {', '.join(real_data_used)})"

            title, palette = self._CHART_STYLES[chart_type]
            fig, (ax1, ax2) = self._new_figure((12, 10), height_ratios=(3, 1))

            for position, (ticker, df) in enumerate(all_data.items()):
                color = palette[position % len(palette)]
                self._draw_price_series(ax1, ticker, df, chart_type, color)
                ax2.bar(df.index, df['Volume'], alpha=0.5, color=color, label=ticker)

            ax1.set_title(f'{title} (Last {days} days)')
            ax1.set_ylabel('Price ($)')
            ax1.legend()
            ax1.grid(True, alpha=0.3)

            ax2.set_ylabel('Volume')
            ax2.set_xlabel('Date')
            ax2.legend()
            ax2.grid(True, alpha=0.3)

            fig.tight_layout()
            return fig, status_msg
        except Exception as e:
            return None, f"Error creating chart: {str(e)}"

    def create_technical_analysis_chart(self, ticker, days=100):
        """Build a three-panel chart: price with SMAs, MACD, and volume.

        Extra history is fetched before the displayed window so the moving
        averages are already populated at the left edge of the chart.

        Returns:
            ``(figure, status_message)``; ``figure`` is ``None`` on failure.
        """
        try:
            history = self.get_stock_data(ticker, days + self._SMA_WARMUP_DAYS)
            if history is None or history.empty:
                return None, f"No data available for {ticker}"

            used_fallback = self._is_fallback(history)
            df = history.copy()  # never add columns to a slice/view

            df['SMA_20'] = df['Close'].rolling(window=20).mean()
            df['SMA_50'] = df['Close'].rolling(window=50).mean()
            df['EMA_12'] = df['Close'].ewm(span=12).mean()
            df['EMA_26'] = df['Close'].ewm(span=26).mean()
            df['MACD'] = df['EMA_12'] - df['EMA_26']
            df['Signal'] = df['MACD'].ewm(span=9).mean()

            # Trim the warm-up rows: keep only the requested window.
            cutoff = df.index.max() - pd.Timedelta(days=days)
            df = df[df.index >= cutoff].tail(days)

            fig, (ax1, ax2, ax3) = self._new_figure((12, 12), height_ratios=(3, 1, 1))

            ax1.plot(df.index, df['Close'], label='Close Price', linewidth=2)
            ax1.plot(df.index, df['SMA_20'], label='20-Day SMA', alpha=0.7)
            ax1.plot(df.index, df['SMA_50'], label='50-Day SMA', alpha=0.7)
            ax1.set_title(f'{ticker} - Technical Analysis (Last {days} days)')
            ax1.set_ylabel('Price ($)')
            ax1.legend()
            ax1.grid(True, alpha=0.3)

            ax2.plot(df.index, df['MACD'], label='MACD', color='blue')
            ax2.plot(df.index, df['Signal'], label='Signal', color='red')
            ax2.bar(df.index, df['MACD'] - df['Signal'], alpha=0.3, color='gray')
            ax2.set_ylabel('MACD')
            ax2.legend()
            ax2.grid(True, alpha=0.3)

            ax3.bar(df.index, df['Volume'], alpha=0.7, color='green')
            ax3.set_ylabel('Volume')
            ax3.set_xlabel('Date')
            ax3.grid(True, alpha=0.3)

            fig.tight_layout()

            message = f"Generated technical analysis chart for {ticker}"
            if used_fallback:
                message += " (fallback data)"
            return fig, message
        except Exception as e:
            return None, f"Error creating technical analysis chart: {str(e)}"

    def create_correlation_heatmap(self, selected_tickers):
        """Build a heatmap of pairwise closing-price correlations (365 days).

        Requires 2-50 selected tickers, of which at least two must yield data.

        Returns:
            ``(figure, status_message)``; ``figure`` is ``None`` on failure.
        """
        try:
            if not selected_tickers or len(selected_tickers) < 2:
                return None, "Please select at least 2 stocks for correlation analysis"
            if len(selected_tickers) > 50:
                return None, "Please select no more than 50 stocks for correlation analysis"

            closes = {}
            for ticker in selected_tickers:
                df = self.get_stock_data(ticker, 365)
                if df is not None and not df.empty:
                    closes[ticker] = self._daily_close(df)

            if not closes:
                return None, "No stock data available for correlation analysis"
            if len(closes) < 2:
                return None, "Not enough stock data available for correlation analysis"

            corr_df = pd.DataFrame(closes).corr()
            labels = list(corr_df.columns)

            fig, ax = self._new_figure((12, 10))
            im = ax.imshow(corr_df.to_numpy(), cmap='RdYlGn', vmin=-1, vmax=1)

            cbar = fig.colorbar(im, ax=ax)
            cbar.ax.set_ylabel('Correlation', rotation=-90, va="bottom")

            ax.set_xticks(range(len(labels)))
            ax.set_yticks(range(len(labels)))
            ax.set_xticklabels(labels, rotation=45, ha='right')
            ax.set_yticklabels(labels)

            # Print each coefficient inside its cell.
            for row, col in np.ndindex(corr_df.shape):
                ax.text(col, row, f'{corr_df.iloc[row, col]:.2f}',
                        ha="center", va="center", color="black", fontsize=8)

            ax.set_title(f'Stock Correlation Heatmap ({len(labels)} Selected Stocks)')
            fig.tight_layout()

            return fig, f"Generated correlation heatmap for {len(labels)} stocks"
        except Exception as e:
            return None, f"Error creating correlation heatmap: {str(e)}"

    def get_market_summary(self, selected_tickers):
        """Return ``(html_table_or_text, status_message)`` for the given tickers.

        For each ticker with data the table lists the latest close, the change
        versus the previous close (absolute and percent) and the latest volume.
        """
        try:
            if not selected_tickers:
                return "Please select stocks for market summary", "No stocks selected"

            summary_data = []
            for ticker in selected_tickers:
                df = self.get_stock_data(ticker, 30)
                if df is None or df.empty:
                    continue
                closes = df['Close'].dropna()  # skip incomplete trailing rows
                if closes.empty:
                    continue

                current_price = closes.iloc[-1]
                prev_price = closes.iloc[-2] if len(closes) > 1 else current_price
                change = current_price - prev_price
                change_pct = (change / prev_price) * 100 if prev_price != 0 else 0

                summary_data.append({
                    'Ticker': ticker,
                    'Current Price': f"${current_price:.2f}",
                    'Change': f"${change:.2f}",
                    'Change %': f"{change_pct:.2f}%",
                    'Volume': f"{df['Volume'].iloc[-1]:,.0f}"
                })

            if not summary_data:
                return "No data available", "Unable to generate market summary"

            summary_df = pd.DataFrame(summary_data)
            html = summary_df.to_html(index=False, classes='table table-striped')
            return html, f"Market summary generated for {len(summary_data)} stocks"
        except Exception as e:
            return f"Error: {str(e)}", "Error generating market summary"

    def refresh_data(self):
        """Record the refresh time and make sure the ticker list is loaded.

        Price data is not cached by this class, so there is nothing else to
        invalidate.  Returns a status string.
        """
        try:
            self.last_refresh = dt.datetime.now()
            if not self.sp500_tickers:
                self.load_sp500_tickers()
            return f"Data refreshed at {self.last_refresh.strftime('%Y-%m-%d %H:%M:%S')}"
        except Exception as e:
            return f"Error refreshing data: {str(e)}"
|
|
|
|
|
|
|
|
|
# Single shared dashboard instance used by every UI callback below.
dashboard = StockDashboard()
# Populate the ticker list at import time so the checkbox group can be built
# with real choices; this reads the pickle cache or, if that fails, calls
# save_sp500_tickers() (which performs a network request).
dashboard.load_sp500_tickers()
|
|
|
|
|
|
|
|
|
def update_chart(selected_tickers, chart_type, days):
    """Build the chart requested in the UI and return ``(figure, message)``.

    The "technical" chart type is single-ticker, so only the first selected
    ticker is charted; every other type is delegated to
    ``dashboard.create_price_chart`` with the full selection.  ``figure`` is
    ``None`` whenever no chart could be produced.
    """
    if not selected_tickers:
        return None, "Please select at least one stock ticker"

    period = int(days)
    if chart_type == "technical":
        figure, message = dashboard.create_technical_analysis_chart(selected_tickers[0], period)
    else:
        figure, message = dashboard.create_price_chart(selected_tickers, chart_type, period)

    # A missing figure is passed through as None alongside its message.
    return figure, message
|
|
|
|
|
|
def update_correlation(selected_tickers):
    """Return ``(figure, message)`` from the dashboard's correlation heatmap."""
    heatmap, message = dashboard.create_correlation_heatmap(selected_tickers)
    return heatmap, message
|
|
|
|
|
|
def update_market_summary(selected_tickers):
    """Return ``(summary_html, message)`` from the dashboard's market summary."""
    html, message = dashboard.get_market_summary(selected_tickers)
    return html, message
|
|
|
|
|
|
def refresh_all_data():
    """Trigger ``dashboard.refresh_data()`` and return its status string."""
    return dashboard.refresh_data()
|
|
|
|
|
|
def auto_refresh(interval):
    """Store the refresh interval and return a confirmation message.

    *interval* is given in milliseconds; the module-level ``refresh_interval``
    is set to the equivalent number of seconds.
    NOTE(review): no code in the visible source reads ``refresh_interval``, so
    this appears to have no further effect - confirm.
    """
    global refresh_interval
    seconds = int(interval) / 1000
    refresh_interval = seconds
    return f"Auto-refresh interval set to {interval} milliseconds ({seconds:.1f} seconds)"
|
|
|
|
|
|
def initialize_dashboard():
    """Load the ticker list when the page opens and return the status string."""
    status = dashboard.load_sp500_tickers()
    return status
|
|
|
|
|
|
def update_ticker_choices():
    """Return a component update that sets the ticker checkbox choices.

    Uses the loaded tickers, or a single "Loading..." placeholder when the
    list is still empty.  ``gr.update`` is used instead of the per-component
    ``gr.CheckboxGroup.update`` classmethod, which was removed in Gradio 4;
    ``gr.update`` is available in both Gradio 3.x and later releases.
    """
    choices = dashboard.sp500_tickers or ["Loading..."]
    return gr.update(choices=choices)
|
|
|
|
|
|
|
|
|
# Gradio UI definition: a controls column on the left, chart and ticker
# selection on the right, followed by the event wiring.
# NOTE(review): the source formatting was lost, so the nesting below (in
# particular the ticker-checkbox column inside the chart column) is a
# reconstruction - confirm against the intended layout.
# NOTE(review): the leading symbols in several labels look like mis-encoded
# emoji; they are kept exactly as found.
with gr.Blocks(title="Stock Market Dashboard") as demo:
    gr.Markdown("# π Real-Time Stock Market Dashboard")
    gr.Markdown("Monitor S&P 500 stocks with live data and advanced visualizations")

    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("### ποΈ Dashboard Controls")

            # Chart type; "technical" is routed to the single-ticker chart by update_chart.
            chart_type = gr.Dropdown(
                choices=["candlestick", "line", "ohlc", "renko", "technical"],
                label="Chart Type",
                value="candlestick"
            )

            # Look-back window in days, passed to the chart builders.
            days_slider = gr.Slider(
                minimum=5,
                maximum=365,
                value=100,
                step=5,
                label="Time Period (days)"
            )

            update_chart_btn = gr.Button("π Update Chart")

            # Refresh interval control; the value is handed to auto_refresh in milliseconds.
            gr.Markdown("### β° Auto-Refresh Settings")
            refresh_interval_input = gr.Slider(
                minimum=500,
                maximum=5000,
                value=1000,
                step=500,
                label="Refresh Interval (milliseconds)"
            )
            set_refresh_btn = gr.Button("π Set Refresh Interval")

            refresh_data_btn = gr.Button("π Refresh All Data")

            # Shared status line for refresh / interval / startup messages.
            status_output = gr.Textbox(label="Status", interactive=False)

            # Correlation heatmap section.
            gr.Markdown("### π₯ Correlation Analysis")
            gr.Markdown("Select 2-50 stocks for correlation analysis")
            update_corr_btn = gr.Button("π Update Correlation")
            correlation_output = gr.Plot(label="Correlation Heatmap")
            correlation_msg = gr.Textbox(label="Correlation Message", interactive=False)

            # Market summary table section.
            gr.Markdown("### π Market Summary")
            gr.Markdown("Select stocks for market summary")
            update_summary_btn = gr.Button("π Update Summary")
            summary_output = gr.HTML(label="Market Summary")
            summary_msg = gr.Textbox(label="Summary Message", interactive=False)

        with gr.Column(scale=2):
            gr.Markdown("### π Stock Chart")
            chart_output = gr.Plot(label="Stock Chart")
            chart_msg = gr.Textbox(label="Chart Message", interactive=False)

            # Ticker selection shared by the chart, correlation and summary actions.
            gr.Markdown("#### π Select Stocks (S&P 500)")
            gr.Markdown("Choose one or more stocks to analyze:")

            with gr.Column(scale=1):
                ticker_checkboxes = gr.CheckboxGroup(
                    choices=dashboard.sp500_tickers if dashboard.sp500_tickers else ["Loading..."],
                    label="Available S&P 500 Stocks",
                    value=[],
                    interactive=True
                )

    # Event wiring: each button calls one module-level callback.
    update_chart_btn.click(
        fn=update_chart,
        inputs=[ticker_checkboxes, chart_type, days_slider],
        outputs=[chart_output, chart_msg]
    )

    update_corr_btn.click(
        fn=update_correlation,
        inputs=[ticker_checkboxes],
        outputs=[correlation_output, correlation_msg]
    )

    update_summary_btn.click(
        fn=update_market_summary,
        inputs=[ticker_checkboxes],
        outputs=[summary_output, summary_msg]
    )

    refresh_data_btn.click(
        fn=refresh_all_data,
        inputs=[],
        outputs=[status_output]
    )

    set_refresh_btn.click(
        fn=auto_refresh,
        inputs=[refresh_interval_input],
        outputs=[status_output]
    )

    # On page load: report the ticker-load status ...
    demo.load(initialize_dashboard, outputs=[status_output])

    # ... and refresh the checkbox choices from the loaded ticker list.
    demo.load(update_ticker_choices, outputs=[ticker_checkboxes])
|
|
|
|
|
|
|
|
|
# Start the Gradio server only when executed as a script.
if __name__ == "__main__":
    # server_name="0.0.0.0" listens on all network interfaces, port 7860;
    # share=False means no public Gradio share link is requested.
    demo.launch(share=False, server_name="0.0.0.0", server_port=7860)
|
|
|
|