Spaces:
Runtime error
Runtime error
import os | |
import glob | |
import pandas as pd | |
import numpy as np | |
import gradio as gr | |
from ta.momentum import RSIIndicator | |
from ta.trend import MACD, SMAIndicator | |
from ta.volatility import BollingerBands | |
import lightgbm as lgb | |
# ============================== | |
# CONFIG | |
# ============================== | |
DATA_FOLDER = r"D:\Internship_Project\Crypto_Data_Tracker" | |
# ============================== | |
# LOAD & PREPARE DATA | |
# ============================== | |
def load_crypto_data(): | |
csv_files = glob.glob(os.path.join(DATA_FOLDER, "*.csv")) | |
all_data = [] | |
for file in csv_files: | |
coin_name = os.path.basename(file).replace('.csv', '') | |
temp_df = pd.read_csv(file) | |
temp_df['Coin'] = coin_name | |
all_data.append(temp_df) | |
df = pd.concat(all_data, ignore_index=True) | |
# Detect date and price columns | |
date_col = next((c for c in df.columns if 'date' in c.lower()), None) | |
price_col = next((c for c in df.columns if 'close' in c.lower()), None) | |
coin_col = 'Coin' | |
df[date_col] = pd.to_datetime(df[date_col], errors='coerce') | |
df = df.dropna(subset=[date_col]) | |
# Add technical indicators | |
def add_indicators(g): | |
g = g.sort_values(by=date_col).copy() | |
g['Daily_Return'] = g[price_col].pct_change() | |
g['SMA_20'] = SMAIndicator(g[price_col], window=20).sma_indicator() | |
g['RSI'] = RSIIndicator(g[price_col], window=14).rsi() | |
macd = MACD(g[price_col]) | |
g['MACD'] = macd.macd() | |
g['MACD_Signal'] = macd.macd_signal() | |
return g | |
df = df.groupby(coin_col, group_keys=False).apply(add_indicators) | |
return df, price_col, date_col, coin_col | |
# ============================== | |
# SIMPLE CRYPTO PREDICTOR | |
# ============================== | |
class SimpleCryptoPredictor: | |
def __init__(self, df, price_col, date_col, coin_col): | |
self.df = df.copy() | |
self.price_col = price_col | |
self.date_col = date_col | |
self.coin_col = coin_col | |
self.model = None | |
self.available_coins = [] | |
self.feature_columns = [] | |
def initialize(self): | |
coin_counts = self.df[self.coin_col].value_counts() | |
self.available_coins = coin_counts[coin_counts >= 50].index.tolist() | |
self._train_model() | |
def _train_model(self): | |
features_list = [] | |
for coin in self.available_coins[:20]: | |
coin_data = self.df[self.df[self.coin_col] == coin].copy() | |
if len(coin_data) < 100: | |
continue | |
features_df = self._create_features(coin_data, include_target=True) | |
if len(features_df) > 0: | |
features_list.append(features_df) | |
all_features = pd.concat(features_list, ignore_index=True) | |
feature_cols = ['return_1d', 'return_3d', 'return_7d', 'rsi_norm', | |
'vol_7d', 'sma_signal', 'return_lag1', 'vol_lag1'] | |
available_cols = [c for c in feature_cols if c in all_features.columns] | |
X = all_features[available_cols].copy() | |
y = all_features['target_return'].copy() | |
mask = ~(X.isna().any(axis=1) | y.isna()) | |
X = X[mask] | |
y = y[mask] | |
self.model = lgb.LGBMRegressor(n_estimators=100, max_depth=6, learning_rate=0.1, random_state=42) | |
self.model.fit(X, y) | |
self.feature_columns = available_cols | |
def _create_features(self, coin_data, include_target=False): | |
coin_data = coin_data.sort_values(self.date_col).copy() | |
if len(coin_data) < 30: | |
return pd.DataFrame() | |
coin_data['return_1d'] = coin_data[self.price_col].pct_change(1) * 100 | |
coin_data['return_3d'] = coin_data[self.price_col].pct_change(3) * 100 | |
coin_data['return_7d'] = coin_data[self.price_col].pct_change(7) * 100 | |
coin_data['rsi_norm'] = (coin_data['RSI'] - 50) / 50 | |
coin_data['vol_7d'] = coin_data['return_1d'].rolling(7).std() | |
coin_data['sma_20'] = coin_data[self.price_col].rolling(20).mean() | |
coin_data['sma_signal'] = np.where(coin_data[self.price_col] > coin_data['sma_20'], 1, -1) | |
coin_data['return_lag1'] = coin_data['return_1d'].shift(1) | |
coin_data['vol_lag1'] = coin_data['vol_7d'].shift(1) | |
if include_target: | |
coin_data['price_future'] = coin_data[self.price_col].shift(-1) | |
coin_data['target_return'] = ((coin_data['price_future'] - coin_data[self.price_col]) / coin_data[self.price_col] * 100) | |
coin_data = coin_data.replace([np.inf, -np.inf], np.nan) | |
return coin_data | |
def predict_coin(self, coin_name): | |
if coin_name not in self.available_coins: | |
return f"Coin '{coin_name}' not found." | |
coin_data = self.df[self.df[self.coin_col] == coin_name].copy() | |
features_df = self._create_features(coin_data) | |
latest = features_df.iloc[-1] | |
feature_values = [latest.get(c, 0) for c in self.feature_columns] | |
pred_return = self.model.predict([feature_values])[0] | |
price = latest.get(self.price_col, 0) | |
if pred_return > 3: | |
rec = "STRONG BUY π’" | |
elif pred_return > 1: | |
rec = "BUY π’" | |
elif pred_return > -1: | |
rec = "HOLD π‘" | |
elif pred_return > -3: | |
rec = "SELL π΄" | |
else: | |
rec = "STRONG SELL π΄" | |
return f"{coin_name}: Price=${price:.4f}, Predicted Return={pred_return:+.2f}%, Recommendation={rec}" | |
def find_opportunities(self, top_n=10): | |
predictions = [] | |
for coin in self.available_coins: | |
coin_data = self.df[self.df[self.coin_col] == coin].copy() | |
features_df = self._create_features(coin_data) | |
if len(features_df) == 0: | |
continue | |
latest = features_df.iloc[-1] | |
feature_values = [latest.get(c, 0) for c in self.feature_columns] | |
pred_return = self.model.predict([feature_values])[0] | |
predictions.append((coin, latest.get(self.price_col, 0), pred_return)) | |
predictions.sort(key=lambda x: x[2], reverse=True) | |
return pd.DataFrame(predictions[:top_n], columns=['Coin', 'Price', 'Predicted Return %']) | |
# ============================== | |
# INIT | |
# ============================== | |
df, price_col, date_col, coin_col = load_crypto_data() | |
predictor = SimpleCryptoPredictor(df, price_col, date_col, coin_col) | |
predictor.initialize() | |
# ============================== | |
# GRADIO APP | |
# ============================== | |
def predict_single(coin): | |
return predictor.predict_coin(coin) | |
def top_opportunities(n): | |
df_top = predictor.find_opportunities(int(n)) | |
return df_top | |
coin_dropdown = gr.Dropdown(choices=predictor.available_coins, label="Select Coin") | |
top_n_slider = gr.Slider(1, 20, value=10, step=1, label="Top N Opportunities") | |
with gr.Blocks() as demo: | |
gr.Markdown("## π Crypto Prediction Dashboard") | |
with gr.Row(): | |
with gr.Column(): | |
gr.Markdown("### Single Coin Prediction") | |
coin_input = gr.Dropdown(choices=predictor.available_coins, label="Select Coin") | |
predict_btn = gr.Button("Predict") | |
prediction_output = gr.Textbox(label="Prediction Result") | |
with gr.Column(): | |
gr.Markdown("### Top Opportunities") | |
top_n_input = gr.Slider(1, 20, value=10, step=1, label="Top N Opportunities") | |
top_btn = gr.Button("Find Opportunities") | |
table_output = gr.Dataframe(headers=["Coin", "Price", "Predicted Return %"]) | |
predict_btn.click(predict_single, inputs=coin_input, outputs=prediction_output) | |
top_btn.click(top_opportunities, inputs=top_n_input, outputs=table_output) | |
if __name__ == "__main__": | |
demo.launch() | |