import os import glob import pandas as pd import numpy as np import gradio as gr from ta.momentum import RSIIndicator from ta.trend import MACD, SMAIndicator from ta.volatility import BollingerBands import lightgbm as lgb # ============================== # CONFIG # ============================== DATA_FOLDER = r"D:\Internship_Project\Crypto_Data_Tracker" # ============================== # LOAD & PREPARE DATA # ============================== def load_crypto_data(): csv_files = glob.glob(os.path.join(DATA_FOLDER, "*.csv")) all_data = [] for file in csv_files: coin_name = os.path.basename(file).replace('.csv', '') temp_df = pd.read_csv(file) temp_df['Coin'] = coin_name all_data.append(temp_df) df = pd.concat(all_data, ignore_index=True) # Detect date and price columns date_col = next((c for c in df.columns if 'date' in c.lower()), None) price_col = next((c for c in df.columns if 'close' in c.lower()), None) coin_col = 'Coin' df[date_col] = pd.to_datetime(df[date_col], errors='coerce') df = df.dropna(subset=[date_col]) # Add technical indicators def add_indicators(g): g = g.sort_values(by=date_col).copy() g['Daily_Return'] = g[price_col].pct_change() g['SMA_20'] = SMAIndicator(g[price_col], window=20).sma_indicator() g['RSI'] = RSIIndicator(g[price_col], window=14).rsi() macd = MACD(g[price_col]) g['MACD'] = macd.macd() g['MACD_Signal'] = macd.macd_signal() return g df = df.groupby(coin_col, group_keys=False).apply(add_indicators) return df, price_col, date_col, coin_col # ============================== # SIMPLE CRYPTO PREDICTOR # ============================== class SimpleCryptoPredictor: def __init__(self, df, price_col, date_col, coin_col): self.df = df.copy() self.price_col = price_col self.date_col = date_col self.coin_col = coin_col self.model = None self.available_coins = [] self.feature_columns = [] def initialize(self): coin_counts = self.df[self.coin_col].value_counts() self.available_coins = coin_counts[coin_counts >= 50].index.tolist() self._train_model() def _train_model(self): features_list = [] for coin in self.available_coins[:20]: coin_data = self.df[self.df[self.coin_col] == coin].copy() if len(coin_data) < 100: continue features_df = self._create_features(coin_data, include_target=True) if len(features_df) > 0: features_list.append(features_df) all_features = pd.concat(features_list, ignore_index=True) feature_cols = ['return_1d', 'return_3d', 'return_7d', 'rsi_norm', 'vol_7d', 'sma_signal', 'return_lag1', 'vol_lag1'] available_cols = [c for c in feature_cols if c in all_features.columns] X = all_features[available_cols].copy() y = all_features['target_return'].copy() mask = ~(X.isna().any(axis=1) | y.isna()) X = X[mask] y = y[mask] self.model = lgb.LGBMRegressor(n_estimators=100, max_depth=6, learning_rate=0.1, random_state=42) self.model.fit(X, y) self.feature_columns = available_cols def _create_features(self, coin_data, include_target=False): coin_data = coin_data.sort_values(self.date_col).copy() if len(coin_data) < 30: return pd.DataFrame() coin_data['return_1d'] = coin_data[self.price_col].pct_change(1) * 100 coin_data['return_3d'] = coin_data[self.price_col].pct_change(3) * 100 coin_data['return_7d'] = coin_data[self.price_col].pct_change(7) * 100 coin_data['rsi_norm'] = (coin_data['RSI'] - 50) / 50 coin_data['vol_7d'] = coin_data['return_1d'].rolling(7).std() coin_data['sma_20'] = coin_data[self.price_col].rolling(20).mean() coin_data['sma_signal'] = np.where(coin_data[self.price_col] > coin_data['sma_20'], 1, -1) coin_data['return_lag1'] = coin_data['return_1d'].shift(1) coin_data['vol_lag1'] = coin_data['vol_7d'].shift(1) if include_target: coin_data['price_future'] = coin_data[self.price_col].shift(-1) coin_data['target_return'] = ((coin_data['price_future'] - coin_data[self.price_col]) / coin_data[self.price_col] * 100) coin_data = coin_data.replace([np.inf, -np.inf], np.nan) return coin_data def predict_coin(self, coin_name): if coin_name not in self.available_coins: return f"Coin '{coin_name}' not found." coin_data = self.df[self.df[self.coin_col] == coin_name].copy() features_df = self._create_features(coin_data) latest = features_df.iloc[-1] feature_values = [latest.get(c, 0) for c in self.feature_columns] pred_return = self.model.predict([feature_values])[0] price = latest.get(self.price_col, 0) if pred_return > 3: rec = "STRONG BUY 🟢" elif pred_return > 1: rec = "BUY 🟢" elif pred_return > -1: rec = "HOLD 🟡" elif pred_return > -3: rec = "SELL 🔴" else: rec = "STRONG SELL 🔴" return f"{coin_name}: Price=${price:.4f}, Predicted Return={pred_return:+.2f}%, Recommendation={rec}" def find_opportunities(self, top_n=10): predictions = [] for coin in self.available_coins: coin_data = self.df[self.df[self.coin_col] == coin].copy() features_df = self._create_features(coin_data) if len(features_df) == 0: continue latest = features_df.iloc[-1] feature_values = [latest.get(c, 0) for c in self.feature_columns] pred_return = self.model.predict([feature_values])[0] predictions.append((coin, latest.get(self.price_col, 0), pred_return)) predictions.sort(key=lambda x: x[2], reverse=True) return pd.DataFrame(predictions[:top_n], columns=['Coin', 'Price', 'Predicted Return %']) # ============================== # INIT # ============================== df, price_col, date_col, coin_col = load_crypto_data() predictor = SimpleCryptoPredictor(df, price_col, date_col, coin_col) predictor.initialize() # ============================== # GRADIO APP # ============================== def predict_single(coin): return predictor.predict_coin(coin) def top_opportunities(n): df_top = predictor.find_opportunities(int(n)) return df_top coin_dropdown = gr.Dropdown(choices=predictor.available_coins, label="Select Coin") top_n_slider = gr.Slider(1, 20, value=10, step=1, label="Top N Opportunities") with gr.Blocks() as demo: gr.Markdown("## 📈 Crypto Prediction Dashboard") with gr.Row(): with gr.Column(): gr.Markdown("### Single Coin Prediction") coin_input = gr.Dropdown(choices=predictor.available_coins, label="Select Coin") predict_btn = gr.Button("Predict") prediction_output = gr.Textbox(label="Prediction Result") with gr.Column(): gr.Markdown("### Top Opportunities") top_n_input = gr.Slider(1, 20, value=10, step=1, label="Top N Opportunities") top_btn = gr.Button("Find Opportunities") table_output = gr.Dataframe(headers=["Coin", "Price", "Predicted Return %"]) predict_btn.click(predict_single, inputs=coin_input, outputs=prediction_output) top_btn.click(top_opportunities, inputs=top_n_input, outputs=table_output) if __name__ == "__main__": demo.launch()