""" Cross-Asset Arbitrage Engine with Transformer Models Author: Spencer Purdy Description: A sophisticated arbitrage engine leveraging transformer models for price forecasting across multiple asset classes and venues. Integrates CEX/DEX venues with latency-aware execution, LLM-driven optimization, and comprehensive risk analytics. """ # Install required packages # !pip install -q transformers torch numpy pandas scikit-learn plotly gradio scipy statsmodels networkx # Core imports import numpy as np import pandas as pd import torch import torch.nn as nn import torch.optim as optim from datetime import datetime, timedelta import gradio as gr import plotly.graph_objects as go import plotly.express as px from plotly.subplots import make_subplots import json import random from typing import Dict, List, Tuple, Optional, Any, Union from dataclasses import dataclass, field from collections import defaultdict, deque import warnings warnings.filterwarnings('ignore') # Additional imports from scipy import stats from scipy.optimize import minimize from sklearn.preprocessing import StandardScaler import networkx as nx # Set random seeds for reproducibility np.random.seed(42) torch.manual_seed(42) random.seed(42) # Configuration constants TRADING_DAYS_PER_YEAR = 365 # Crypto markets trade 24/7 RISK_FREE_RATE = 0.045 # Current risk-free rate TRANSACTION_COST_CEX = 0.001 # 0.1% for centralized exchanges TRANSACTION_COST_DEX = 0.003 # 0.3% for decentralized exchanges GAS_COST_USD = 5.0 # Average gas cost for DEX transactions MIN_PROFIT_THRESHOLD = 0.002 # 0.2% minimum profit after costs MAX_POSITION_SIZE = 100000 # Maximum position size in USD LATENCY_CEX_MS = 50 # Average CEX latency in milliseconds LATENCY_DEX_MS = 1000 # Average DEX latency (block time) # Asset configuration ASSET_CLASSES = { 'crypto_spot': ['BTC', 'ETH', 'SOL', 'MATIC', 'AVAX'], 'crypto_futures': ['BTC-PERP', 'ETH-PERP', 'SOL-PERP'], 'fx_pairs': ['EUR/USD', 'GBP/USD', 'USD/JPY', 'AUD/USD'], 'equity_etfs': ['SPY', 'QQQ', 'IWM', 'EFA', 'EEM'] } # Exchange configuration EXCHANGES = { 'cex': ['Binance', 'Coinbase', 'Kraken', 'FTX'], 'dex': ['Uniswap_V3', 'SushiSwap', 'Curve', 'Balancer'] } @dataclass class OrderBook: """Order book data structure for managing bid/ask levels""" exchange: str asset: str timestamp: datetime bids: List[Tuple[float, float]] # List of (price, size) tuples asks: List[Tuple[float, float]] # List of (price, size) tuples def get_best_bid(self) -> Tuple[float, float]: """Get best bid price and size""" return self.bids[0] if self.bids else (0.0, 0.0) def get_best_ask(self) -> Tuple[float, float]: """Get best ask price and size""" return self.asks[0] if self.asks else (float('inf'), 0.0) def get_mid_price(self) -> float: """Calculate mid price from best bid and ask""" bid, _ = self.get_best_bid() ask, _ = self.get_best_ask() return (bid + ask) / 2 if bid > 0 and ask < float('inf') else 0.0 @dataclass class ArbitrageOpportunity: """Data structure for storing identified arbitrage opportunities""" opportunity_id: str asset: str buy_exchange: str sell_exchange: str buy_price: float sell_price: float max_size: float expected_profit: float expected_profit_pct: float latency_risk: float timestamp: datetime metadata: Dict[str, Any] = field(default_factory=dict) @dataclass class ExecutionResult: """Data structure for storing execution results""" opportunity_id: str success: bool executed_size: float buy_fill_price: float sell_fill_price: float realized_profit: float slippage: float latency_ms: float gas_cost: float timestamp: 

class NumericalTransformer(nn.Module):
    """Transformer architecture adapted for numerical price prediction with training capability."""

    def __init__(self, input_dim: int = 10, hidden_dim: int = 128, num_heads: int = 8,
                 num_layers: int = 4, prediction_horizon: int = 5):
        super().__init__()
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.prediction_horizon = prediction_horizon

        # Input projection layer
        self.input_projection = nn.Linear(input_dim, hidden_dim)

        # Positional encoding for sequence position information
        self.positional_encoding = self._create_positional_encoding(1000, hidden_dim)

        # Transformer encoder for feature extraction
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=hidden_dim,
            nhead=num_heads,
            dim_feedforward=hidden_dim * 4,
            dropout=0.1,
            activation='gelu',
            batch_first=True
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)

        # Output heads for different prediction targets
        self.price_head = nn.Linear(hidden_dim, prediction_horizon)
        self.volatility_head = nn.Linear(hidden_dim, prediction_horizon)
        self.volume_head = nn.Linear(hidden_dim, prediction_horizon)
        self.uncertainty_head = nn.Linear(hidden_dim, prediction_horizon)

        # Initialize weights
        self._init_weights()

    def _init_weights(self):
        """Initialize model weights using Xavier initialization."""
        for module in self.modules():
            if isinstance(module, nn.Linear):
                nn.init.xavier_uniform_(module.weight)
                if module.bias is not None:
                    nn.init.zeros_(module.bias)

    def _create_positional_encoding(self, max_len: int, d_model: int) -> nn.Parameter:
        """Create sinusoidal positional encoding."""
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        return nn.Parameter(pe.unsqueeze(0), requires_grad=False)

    def forward(self, x: torch.Tensor, mask: Optional[torch.Tensor] = None) -> Dict[str, torch.Tensor]:
        """
        Forward pass through the transformer.

        Args:
            x: Input tensor of shape (batch_size, seq_len, input_dim)
            mask: Optional attention mask

        Returns:
            Dictionary containing price, volatility, volume predictions and uncertainty
        """
        batch_size, seq_len, _ = x.shape

        # Project input to hidden dimension
        x = self.input_projection(x)

        # Add positional encoding
        x = x + self.positional_encoding[:, :seq_len, :]

        # Pass through transformer encoder
        encoded = self.transformer(x, src_key_padding_mask=mask)

        # Use last sequence position for prediction
        last_hidden = encoded[:, -1, :]

        # Generate predictions from specialized heads
        price_pred = self.price_head(last_hidden)
        volatility_pred = torch.sigmoid(self.volatility_head(last_hidden)) * 0.1  # Scale to [0, 0.1]
        volume_pred = torch.exp(self.volume_head(last_hidden))  # Ensure positive
        uncertainty = torch.sigmoid(self.uncertainty_head(last_hidden))

        return {
            'price': price_pred,
            'volatility': volatility_pred,
            'volume': volume_pred,
            'uncertainty': uncertainty
        }
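
# Illustrative smoke test (not part of the engine): verify the transformer's
# input/output shapes with random data. All dimensions here are hypothetical.
def _demo_transformer_shapes() -> None:
    model = NumericalTransformer(input_dim=10, prediction_horizon=5)
    model.eval()
    x = torch.randn(2, 50, 10)  # (batch, seq_len, features)
    with torch.no_grad():
        out = model(x)
    # Each head emits one value per forecast step: (batch, horizon)
    for name, tensor in out.items():
        print(f"{name}: {tuple(tensor.shape)}")  # expected (2, 5)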

class PriceForecastingEngine:
    """Engine for multi-asset price forecasting using transformers with actual training."""

    def __init__(self):
        self.models = {}   # One model per asset class
        self.scalers = {}  # Feature scalers
        self.training_history = defaultdict(list)
        self.is_trained = defaultdict(bool)

        # Initialize models for each asset class
        for asset_class in ASSET_CLASSES.keys():
            self.models[asset_class] = NumericalTransformer()
            self.scalers[asset_class] = StandardScaler()
            self.is_trained[asset_class] = False

    def prepare_features(self, price_data: pd.DataFrame) -> np.ndarray:
        """Prepare features for transformer input."""
        features = []

        # Price features
        features.append(price_data['close'].values)
        features.append(price_data['high'].values)
        features.append(price_data['low'].values)

        # Volume features (log-transformed)
        features.append(np.log1p(price_data['volume'].values))

        # Technical indicators
        returns = price_data['close'].pct_change().fillna(0)
        features.append(returns.values)

        # Moving averages
        ma_7 = price_data['close'].rolling(7).mean().bfill()
        ma_21 = price_data['close'].rolling(21).mean().bfill()
        features.append((price_data['close'] / ma_7 - 1).values)
        features.append((price_data['close'] / ma_21 - 1).values)

        # Volatility (rolling standard deviation)
        volatility = returns.rolling(20).std().bfill()
        features.append(volatility.values)

        # RSI (Relative Strength Index)
        rsi = self.calculate_rsi(price_data['close'])
        features.append(rsi.values / 100)  # Normalize to [0, 1]

        # Order flow imbalance (simulated for this example)
        ofi = np.random.normal(0, 0.1, len(price_data))
        features.append(ofi)

        # Stack features into matrix
        feature_matrix = np.column_stack(features)
        return feature_matrix

    def calculate_rsi(self, prices: pd.Series, period: int = 14) -> pd.Series:
        """Calculate Relative Strength Index."""
        delta = prices.diff()
        gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
        rs = gain / loss
        rsi = 100 - (100 / (1 + rs))
        return rsi.fillna(50)

    def create_sequences(self, features: np.ndarray, targets: np.ndarray,
                         seq_len: int = 50, horizon: int = 5) -> Tuple[np.ndarray, np.ndarray]:
        """Create sequences for training the transformer."""
        sequences = []
        target_sequences = []

        for i in range(seq_len, len(features) - horizon):
            sequences.append(features[i-seq_len:i])
            target_sequences.append(targets[i:i+horizon])

        return np.array(sequences), np.array(target_sequences)

    def train_model(self, asset_class: str, price_data: pd.DataFrame,
                    epochs: int = 50, batch_size: int = 32):
        """Train the transformer model on historical data."""
        if len(price_data) < 100:
            return  # Not enough data to train

        # Prepare features
        features = self.prepare_features(price_data)

        # Scale features
        features_scaled = self.scalers[asset_class].fit_transform(features)

        # Prepare targets (future returns)
        returns = price_data['close'].pct_change().fillna(0).values

        # Create sequences
        X, y = self.create_sequences(features_scaled, returns)

        if len(X) == 0:
            return  # Not enough sequences

        # Convert to tensors
        X_tensor = torch.FloatTensor(X)
        y_tensor = torch.FloatTensor(y)

        # Create data loader
        dataset = torch.utils.data.TensorDataset(X_tensor, y_tensor)
        loader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=True)

        # Setup training
        model = self.models[asset_class]
        optimizer = optim.Adam(model.parameters(), lr=0.001)
        criterion = nn.MSELoss()

        # Training loop
        model.train()
        for epoch in range(epochs):
            epoch_loss = 0.0
            for batch_X, batch_y in loader:
                optimizer.zero_grad()

                # Forward pass
                predictions = model(batch_X)

                # Calculate loss (using price predictions)
                loss = criterion(predictions['price'], batch_y)

                # Backward pass
                loss.backward()
                optimizer.step()

                epoch_loss += loss.item()

            # Record training history
            avg_loss = epoch_loss / len(loader)
            self.training_history[asset_class].append(avg_loss)

        # Mark as trained
        self.is_trained[asset_class] = True
        model.eval()
    def forecast_prices(self, asset: str, price_history: pd.DataFrame,
                        horizon: int = 5) -> Dict[str, np.ndarray]:
        """Generate price forecasts using transformer model."""
        # Determine asset class
        asset_class = self._get_asset_class(asset)
        if not asset_class:
            return self._generate_random_forecast(horizon)

        # Train model if not already trained
        if not self.is_trained[asset_class] and len(price_history) > 100:
            self.train_model(asset_class, price_history)

        # Prepare features
        features = self.prepare_features(price_history)

        # Scale features (reuse the scaler fitted during training; refitting at
        # inference time would silently shift the feature distribution)
        if self.is_trained[asset_class]:
            features_scaled = self.scalers[asset_class].transform(features)
        else:
            features_scaled = self.scalers[asset_class].fit_transform(features)

        # Create sequences
        seq_len = min(50, len(features_scaled))
        if len(features_scaled) < seq_len:
            # Pad if necessary
            padding = seq_len - len(features_scaled)
            features_scaled = np.vstack([
                np.zeros((padding, features_scaled.shape[1])),
                features_scaled
            ])

        # Get recent sequence
        sequence = features_scaled[-seq_len:].reshape(1, seq_len, -1)
        sequence_tensor = torch.FloatTensor(sequence)

        # Generate forecast
        model = self.models[asset_class]
        model.eval()

        with torch.no_grad():
            predictions = model(sequence_tensor)

        # Extract predictions
        current_price = price_history['close'].iloc[-1]

        # Convert relative predictions to absolute prices
        price_changes = predictions['price'].numpy()[0]
        price_forecast = current_price * (1 + price_changes * 0.01)  # Scale predictions

        return {
            'price': price_forecast,
            'volatility': predictions['volatility'].numpy()[0],
            'volume': predictions['volume'].numpy()[0],
            'uncertainty': predictions['uncertainty'].numpy()[0]
        }

    def _get_asset_class(self, asset: str) -> Optional[str]:
        """Determine asset class for a given asset (exact match first, then prefix)."""
        # Exact membership takes precedence so 'BTC-PERP' maps to crypto_futures
        # rather than matching the 'BTC' prefix in crypto_spot.
        for asset_class, assets in ASSET_CLASSES.items():
            if asset in assets:
                return asset_class
        for asset_class, assets in ASSET_CLASSES.items():
            if any(asset.startswith(a) for a in assets):
                return asset_class
        return None

    def _generate_random_forecast(self, horizon: int) -> Dict[str, np.ndarray]:
        """Generate random forecast as fallback."""
        return {
            'price': np.random.normal(100, 2, horizon),
            'volatility': np.random.uniform(0.01, 0.05, horizon),
            'volume': np.random.lognormal(15, 0.5, horizon),
            'uncertainty': np.random.uniform(0.3, 0.7, horizon)
        }
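
# Illustrative sketch (not part of the engine): running the forecaster on
# synthetic OHLCV history. Column names match what prepare_features expects;
# the price path and seed are hypothetical.
def _demo_forecast() -> None:
    rng = np.random.default_rng(0)
    prices = 45000 * np.exp(np.cumsum(rng.normal(0, 0.01, 200)))
    history = pd.DataFrame({
        'open': prices, 'high': prices * 1.005, 'low': prices * 0.995,
        'close': prices, 'volume': rng.lognormal(15, 0.5, 200),
    })
    engine = PriceForecastingEngine()
    forecast = engine.forecast_prices('BTC', history)  # trains on first call
    print("5-step price forecast:", np.round(forecast['price'], 2))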

class ExchangeSimulator:
    """Simulate exchange order books and execution with realistic market dynamics."""

    def __init__(self, exchange_type: str = 'cex'):
        self.exchange_type = exchange_type
        self.order_books = {}
        self.latency_ms = LATENCY_CEX_MS if exchange_type == 'cex' else LATENCY_DEX_MS
        self.transaction_cost = TRANSACTION_COST_CEX if exchange_type == 'cex' else TRANSACTION_COST_DEX

    def generate_order_book(self, asset: str, mid_price: float, spread_bps: float = 10,
                            market_conditions: Dict[str, Any] = None) -> OrderBook:
        """Generate realistic order book with dynamic spread based on market conditions."""
        # Adjust spread based on market conditions
        if market_conditions:
            volatility = market_conditions.get('volatility', 0.02)
            liquidity = market_conditions.get('liquidity', 1.0)
            spread_bps *= (1 + volatility * 10) * (2 - liquidity)

        spread = mid_price * spread_bps / 10000

        # Generate bid/ask levels with realistic depth
        n_levels = 10
        bids = []
        asks = []

        for i in range(n_levels):
            # Bid side
            bid_price = mid_price - spread/2 - i * spread/10
            bid_size = np.random.lognormal(10, 1) * (n_levels - i) / n_levels
            bids.append((bid_price, bid_size))

            # Ask side
            ask_price = mid_price + spread/2 + i * spread/10
            ask_size = np.random.lognormal(10, 1) * (n_levels - i) / n_levels
            asks.append((ask_price, ask_size))

        return OrderBook(
            exchange=self.exchange_type,
            asset=asset,
            timestamp=datetime.now(),
            bids=bids,
            asks=asks
        )

    def simulate_market_impact(self, size: float, liquidity: float) -> float:
        """Calculate market impact using square-root model."""
        # Almgren-Chriss square-root market impact model
        impact_bps = 10 * np.sqrt(size / liquidity)
        return impact_bps / 10000

    def execute_order(self, order_book: OrderBook, side: str, size: float) -> Tuple[float, float]:
        """
        Simulate order execution with realistic slippage.

        Returns:
            (fill_price, actual_size)
        """
        filled_size = 0
        total_cost = 0

        if side == 'buy':
            # Execute against asks
            for ask_price, ask_size in order_book.asks:
                if filled_size >= size:
                    break
                fill_amount = min(size - filled_size, ask_size)
                filled_size += fill_amount
                total_cost += fill_amount * ask_price
        else:  # sell
            # Execute against bids
            for bid_price, bid_size in order_book.bids:
                if filled_size >= size:
                    break
                fill_amount = min(size - filled_size, bid_size)
                filled_size += fill_amount
                total_cost += fill_amount * bid_price

        # Calculate average fill price
        avg_fill_price = total_cost / filled_size if filled_size > 0 else 0

        # Add market impact
        liquidity = sum(s for _, s in order_book.bids) + sum(s for _, s in order_book.asks)
        impact = self.simulate_market_impact(filled_size, liquidity)

        if side == 'buy':
            avg_fill_price *= (1 + impact)
        else:
            avg_fill_price *= (1 - impact)

        return avg_fill_price, filled_size
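
# Worked example (illustrative): the square-root impact model above gives
# impact_bps = 10 * sqrt(size / liquidity), so quadrupling size only doubles
# impact. The liquidity and size values below are hypothetical.
def _demo_market_impact() -> None:
    sim = ExchangeSimulator('cex')
    liquidity = 1_000_000
    for size in (10_000, 40_000, 160_000):
        impact = sim.simulate_market_impact(size, liquidity)
        print(f"size={size:>7,}: impact = {impact * 10_000:.1f} bps")
    # Expected progression: 1.0 bps -> 2.0 bps -> 4.0 bps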

class ArbitrageDetector:
    """Detect arbitrage opportunities across venues with advanced filtering."""

    def __init__(self):
        self.opportunity_history = []
        self.min_profit_threshold = MIN_PROFIT_THRESHOLD

    def find_opportunities(self, order_books: Dict[str, Dict[str, OrderBook]],
                           transaction_costs: Dict[str, float],
                           forecasts: Dict[str, Dict[str, np.ndarray]] = None) -> List[ArbitrageOpportunity]:
        """Find arbitrage opportunities across all venues with forecast integration."""
        opportunities = []

        # Check each asset
        for asset in self._get_all_assets(order_books):
            asset_books = self._get_asset_order_books(order_books, asset)

            if len(asset_books) < 2:
                continue

            # Find best bid and ask across all exchanges
            best_bid_exchange, best_bid_price, best_bid_size = self._find_best_bid(asset_books)
            best_ask_exchange, best_ask_price, best_ask_size = self._find_best_ask(asset_books)

            if best_bid_price > best_ask_price:
                # Calculate potential profit
                max_size = min(best_bid_size, best_ask_size, MAX_POSITION_SIZE / best_ask_price)

                # Calculate costs
                buy_cost = transaction_costs.get(best_ask_exchange, TRANSACTION_COST_CEX)
                sell_cost = transaction_costs.get(best_bid_exchange, TRANSACTION_COST_CEX)

                # Add gas cost for DEX legs. Match against the configured DEX list:
                # venue names such as 'Uniswap_V3' do not contain the substring 'dex',
                # so a substring test would never fire.
                gas_cost = 0
                if best_ask_exchange in EXCHANGES['dex'] or best_bid_exchange in EXCHANGES['dex']:
                    gas_cost = GAS_COST_USD

                # Calculate profit
                gross_profit = (best_bid_price - best_ask_price) * max_size
                total_cost = (buy_cost + sell_cost) * best_ask_price * max_size + gas_cost
                net_profit = gross_profit - total_cost
                profit_pct = net_profit / (best_ask_price * max_size) if max_size > 0 else 0

                # Adjust for price forecast if available
                forecast_adjustment = 0
                if forecasts and asset in forecasts:
                    price_forecast = forecasts[asset]['price'][0]
                    forecast_adjustment = (price_forecast - best_ask_price) / best_ask_price
                    profit_pct += forecast_adjustment * 0.5  # Weight forecast impact

                if profit_pct > self.min_profit_threshold:
                    opportunity = ArbitrageOpportunity(
                        opportunity_id=f"{asset}_{datetime.now().strftime('%Y%m%d_%H%M%S%f')}",
                        asset=asset,
                        buy_exchange=best_ask_exchange,
                        sell_exchange=best_bid_exchange,
                        buy_price=best_ask_price,
                        sell_price=best_bid_price,
                        max_size=max_size,
                        expected_profit=net_profit,
                        expected_profit_pct=profit_pct,
                        latency_risk=self._calculate_latency_risk(
                            best_ask_exchange, best_bid_exchange
                        ),
                        timestamp=datetime.now(),
                        metadata={
                            'spread': best_bid_price - best_ask_price,
                            'gas_cost': gas_cost,
                            'transaction_costs': buy_cost + sell_cost,
                            'forecast_impact': forecast_adjustment
                        }
                    )

                    opportunities.append(opportunity)
                    self.opportunity_history.append(opportunity)

        return opportunities

    def _get_all_assets(self, order_books: Dict[str, Dict[str, OrderBook]]) -> set:
        """Get all unique assets across exchanges."""
        assets = set()
        for exchange_books in order_books.values():
            assets.update(exchange_books.keys())
        return assets

    def _get_asset_order_books(self, order_books: Dict[str, Dict[str, OrderBook]],
                               asset: str) -> Dict[str, OrderBook]:
        """Get order books for specific asset across exchanges."""
        asset_books = {}
        for exchange, books in order_books.items():
            if asset in books:
                asset_books[exchange] = books[asset]
        return asset_books

    def _find_best_bid(self, asset_books: Dict[str, OrderBook]) -> Tuple[str, float, float]:
        """Find best bid across exchanges."""
        best_exchange = None
        best_price = 0
        best_size = 0

        for exchange, book in asset_books.items():
            bid_price, bid_size = book.get_best_bid()
            if bid_price > best_price:
                best_exchange = exchange
                best_price = bid_price
                best_size = bid_size

        return best_exchange, best_price, best_size

    def _find_best_ask(self, asset_books: Dict[str, OrderBook]) -> Tuple[str, float, float]:
        """Find best ask across exchanges."""
        best_exchange = None
        best_price = float('inf')
        best_size = 0

        for exchange, book in asset_books.items():
            ask_price, ask_size = book.get_best_ask()
            if ask_price < best_price:
                best_exchange = exchange
                best_price = ask_price
                best_size = ask_size

        return best_exchange, best_price, best_size

    def _calculate_latency_risk(self, buy_exchange: str, sell_exchange: str) -> float:
        """Calculate latency risk score (0-1)."""
        buy_is_dex = buy_exchange in EXCHANGES['dex']
        sell_is_dex = sell_exchange in EXCHANGES['dex']

        # Higher risk for cross-exchange-type arbitrage
        if buy_is_dex != sell_is_dex:
            return 0.8  # High risk due to different settlement times
        elif buy_is_dex:
            return 0.6  # Medium risk for DEX-DEX
        else:
            return 0.3  # Lower risk for CEX-CEX
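
# Worked example (illustrative): the net-profit arithmetic used above with
# hypothetical quotes. Buy 2 units at 100.00, sell at 100.50, 10 bps fees per
# leg, and one DEX leg paying GAS_COST_USD.
def _demo_profit_math() -> None:
    buy_price, sell_price, size = 100.00, 100.50, 2.0
    gross = (sell_price - buy_price) * size       # 1.00
    fees = (0.001 + 0.001) * buy_price * size     # 0.40
    net = gross - fees - GAS_COST_USD             # 1.00 - 0.40 - 5.00 = -4.40
    pct = net / (buy_price * size)                # -2.2%
    print(f"gross={gross:.2f} fees={fees:.2f} net={net:.2f} pct={pct:.2%}")
    # Fixed gas dominates small tickets, which is why MIN_PROFIT_THRESHOLD
    # filters out sub-scale DEX opportunities.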

class LLMStrategyOptimizer:
    """LLM-inspired strategy parameter optimization with machine learning."""

    def __init__(self):
        self.parameter_history = defaultdict(list)
        self.performance_history = []
        self.current_parameters = self._get_default_parameters()
        self.optimization_model = self._build_optimization_model()

    def _get_default_parameters(self) -> Dict[str, Any]:
        """Get default strategy parameters."""
        return {
            'min_profit_threshold': 0.002,
            'max_position_size': 100000,
            'risk_limit': 0.02,            # 2% per trade
            'correlation_threshold': 0.7,
            'rebalance_frequency': 300,    # seconds
            'latency_buffer': 1.5,         # multiplier for latency estimates
            'confidence_threshold': 0.6,
            'max_concurrent_trades': 5
        }

    def _build_optimization_model(self) -> nn.Module:
        """Build neural network for parameter optimization."""
        class ParameterOptimizer(nn.Module):
            def __init__(self):
                super().__init__()
                self.fc1 = nn.Linear(20, 64)  # Input features
                self.fc2 = nn.Linear(64, 32)
                self.fc3 = nn.Linear(32, 8)   # Output parameters
                self.relu = nn.ReLU()
                self.sigmoid = nn.Sigmoid()

            def forward(self, x):
                x = self.relu(self.fc1(x))
                x = self.relu(self.fc2(x))
                x = self.sigmoid(self.fc3(x))
                return x

        return ParameterOptimizer()

    def generate_parameter_suggestions(self, recent_performance: List[ExecutionResult],
                                       market_conditions: Dict[str, Any]) -> Dict[str, Any]:
        """Generate parameter adjustments using ML-based optimization."""
        suggestions = self.current_parameters.copy()

        if not recent_performance:
            return suggestions

        # Extract performance features
        success_rate = sum(1 for r in recent_performance if r.success) / len(recent_performance)
        avg_slippage = np.mean([r.slippage for r in recent_performance])
        avg_profit = np.mean([r.realized_profit for r in recent_performance])
        profit_variance = np.var([r.realized_profit for r in recent_performance])

        # Create feature vector (20 inputs, matching fc1 above)
        features = [
            success_rate,
            avg_slippage,
            avg_profit / 1000,          # Normalize
            profit_variance / 1000000,  # Normalize
            market_conditions.get('volatility', 0.02),
            market_conditions.get('liquidity', 1.0),
            len(recent_performance) / 100,  # Normalize
            self.current_parameters['min_profit_threshold'],
            self.current_parameters['max_position_size'] / 1000000,
            self.current_parameters['risk_limit'],
            self.current_parameters['correlation_threshold'],
            self.current_parameters['rebalance_frequency'] / 3600,
            self.current_parameters['latency_buffer'],
            self.current_parameters['confidence_threshold'],
            self.current_parameters['max_concurrent_trades'] / 10,
            # Additional market features
            market_conditions.get('max_volatility', 0.03),
            float(datetime.now().hour) / 24,       # Time of day
            float(datetime.now().weekday()) / 7,   # Day of week
            0.5,  # Placeholder for sentiment (would be real in production)
            0.5   # Placeholder for market regime (would be real in production)
        ]

        # Use ML model for optimization
        feature_tensor = torch.FloatTensor([features])
        with torch.no_grad():
            adjustments = self.optimization_model(feature_tensor).numpy()[0]

        # Apply ML-suggested adjustments
        suggestions['min_profit_threshold'] = 0.001 + adjustments[0] * 0.009
        suggestions['max_position_size'] = 10000 + adjustments[1] * 490000
        suggestions['risk_limit'] = 0.005 + adjustments[2] * 0.045
        suggestions['correlation_threshold'] = 0.5 + adjustments[3] * 0.4
        suggestions['rebalance_frequency'] = 60 + adjustments[4] * 3540
        suggestions['latency_buffer'] = 1.0 + adjustments[5] * 2.0
        suggestions['confidence_threshold'] = 0.5 + adjustments[6] * 0.4
        suggestions['max_concurrent_trades'] = int(1 + adjustments[7] * 9)

        # Rule-based adjustments on top of ML
        if success_rate < 0.7:
            suggestions['min_profit_threshold'] *= 1.1
            suggestions['confidence_threshold'] *= 1.05

        if avg_slippage > 0.001:
            suggestions['max_position_size'] *= 0.9
            suggestions['latency_buffer'] *= 1.1

        if avg_profit < 0:
            suggestions['risk_limit'] *= 0.9
            suggestions['max_concurrent_trades'] = max(1, suggestions['max_concurrent_trades'] - 1)

        # Market condition adjustments
        if market_conditions.get('volatility', 0) > 0.03:
            suggestions['min_profit_threshold'] *= 1.2
            suggestions['correlation_threshold'] *= 0.9

        if market_conditions.get('liquidity', 1) < 0.5:
            suggestions['max_position_size'] *= 0.7

        # Ensure parameters stay within reasonable bounds
        suggestions = self._apply_parameter_bounds(suggestions)

        # Store suggestions
        self.parameter_history['suggestions'].append({
            'timestamp': datetime.now(),
            'parameters': suggestions,
            'reasoning': self._generate_reasoning(recent_performance, market_conditions),
            'performance_metrics': {
                'success_rate': success_rate,
                'avg_slippage': avg_slippage,
                'avg_profit': avg_profit
            }
        })

        self.current_parameters = suggestions
        return suggestions
    def _apply_parameter_bounds(self, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Apply bounds to parameters."""
        bounds = {
            'min_profit_threshold': (0.001, 0.01),
            'max_position_size': (10000, 500000),
            'risk_limit': (0.005, 0.05),
            'correlation_threshold': (0.5, 0.9),
            'rebalance_frequency': (60, 3600),
            'latency_buffer': (1.0, 3.0),
            'confidence_threshold': (0.5, 0.9),
            'max_concurrent_trades': (1, 10)
        }

        bounded = parameters.copy()
        for param, (min_val, max_val) in bounds.items():
            if param in bounded:
                bounded[param] = max(min_val, min(max_val, bounded[param]))

        return bounded

    def _generate_reasoning(self, performance: List[ExecutionResult],
                            market_conditions: Dict[str, Any]) -> str:
        """Generate reasoning for parameter adjustments."""
        reasons = []

        if performance:
            success_rate = sum(1 for r in performance if r.success) / len(performance)
            if success_rate < 0.7:
                reasons.append("Low success rate detected - increasing selectivity")

            avg_slippage = np.mean([r.slippage for r in performance])
            if avg_slippage > 0.001:
                reasons.append("High slippage observed - adjusting execution parameters")

            avg_profit = np.mean([r.realized_profit for r in performance])
            if avg_profit < 0:
                reasons.append("Negative average profit - tightening risk controls")

        if market_conditions.get('volatility', 0) > 0.03:
            reasons.append("Elevated market volatility - implementing conservative measures")

        if market_conditions.get('liquidity', 1) < 0.5:
            reasons.append("Reduced liquidity conditions - scaling down position sizes")

        return "; ".join(reasons) if reasons else "Standard market conditions"
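
# Illustrative sketch (not part of the engine): how _apply_parameter_bounds
# clamps out-of-range suggestions. The raw values below are hypothetical.
def _demo_parameter_bounds() -> None:
    optimizer = LLMStrategyOptimizer()
    raw = optimizer.current_parameters.copy()
    raw['min_profit_threshold'] = 0.5   # far above the (0.001, 0.01) bound
    raw['max_concurrent_trades'] = 50   # above the (1, 10) bound
    bounded = optimizer._apply_parameter_bounds(raw)
    print(bounded['min_profit_threshold'])   # 0.01
    print(bounded['max_concurrent_trades'])  # 10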

class RiskAnalytics:
    """Comprehensive risk analytics system with advanced metrics."""

    def __init__(self):
        self.position_history = []
        self.var_confidence = 0.95
        self.risk_metrics_history = []
        self.correlation_matrix = None

    def calculate_var(self, returns: np.ndarray, confidence: float = 0.95) -> float:
        """Calculate Value at Risk using historical simulation."""
        if len(returns) < 20:
            return 0.02  # Default 2% VaR
        return np.percentile(returns, (1 - confidence) * 100)

    def calculate_cvar(self, returns: np.ndarray, confidence: float = 0.95) -> float:
        """Calculate Conditional Value at Risk (Expected Shortfall)."""
        var = self.calculate_var(returns, confidence)
        tail = returns[returns <= var]
        return tail.mean() if len(tail) > 0 else var

    def calculate_sharpe_ratio(self, returns: np.ndarray) -> float:
        """Calculate Sharpe ratio."""
        if len(returns) < 2:
            return 0.0
        excess_returns = returns - RISK_FREE_RATE / TRADING_DAYS_PER_YEAR
        return np.sqrt(TRADING_DAYS_PER_YEAR) * excess_returns.mean() / (returns.std() + 1e-8)

    def calculate_sortino_ratio(self, returns: np.ndarray) -> float:
        """Calculate Sortino ratio (downside deviation)."""
        if len(returns) < 2:
            return 0.0
        excess_returns = returns - RISK_FREE_RATE / TRADING_DAYS_PER_YEAR
        downside_returns = returns[returns < 0]
        if len(downside_returns) == 0:
            return float('inf')  # No downside risk
        downside_std = np.std(downside_returns)
        return np.sqrt(TRADING_DAYS_PER_YEAR) * excess_returns.mean() / (downside_std + 1e-8)

    def calculate_max_drawdown(self, equity_curve: np.ndarray) -> float:
        """Calculate maximum drawdown."""
        peak = np.maximum.accumulate(equity_curve)
        drawdown = (peak - equity_curve) / peak
        return np.max(drawdown)

    def calculate_calmar_ratio(self, returns: np.ndarray, equity_curve: np.ndarray) -> float:
        """Calculate Calmar ratio (return / max drawdown)."""
        max_dd = self.calculate_max_drawdown(equity_curve)
        if max_dd == 0:
            return float('inf')
        annual_return = returns.mean() * TRADING_DAYS_PER_YEAR
        return annual_return / max_dd

    def analyze_position_risk(self, positions: List[Dict[str, Any]],
                              market_data: Dict[str, pd.DataFrame]) -> Dict[str, Any]:
        """Analyze risk for current positions with comprehensive metrics."""
        if not positions:
            return self._empty_risk_metrics()

        # Calculate position values and correlations
        position_values = []
        position_returns = []

        for position in positions:
            asset = position['asset']
            size = position['size']
            if asset in market_data:
                price = market_data[asset]['close'].iloc[-1]
                value = size * price
                position_values.append(value)
                returns = market_data[asset]['close'].pct_change().dropna()
                position_returns.append(returns)

        total_value = sum(position_values)

        # Calculate portfolio metrics
        if position_returns:
            # Align return series to a common length before weighting
            min_len = min(len(r) for r in position_returns)
            position_returns = [r.iloc[-min_len:] for r in position_returns]

            # Create weighted portfolio returns
            weights = np.array(position_values) / total_value
            portfolio_returns = np.zeros(min_len)
            for weight, returns in zip(weights, position_returns):
                portfolio_returns += weight * returns.values

            # Calculate all risk metrics
            var = self.calculate_var(portfolio_returns)
            cvar = self.calculate_cvar(portfolio_returns)
            sharpe = self.calculate_sharpe_ratio(portfolio_returns)
            sortino = self.calculate_sortino_ratio(portfolio_returns)

            # Build equity curve
            equity_curve = (1 + portfolio_returns).cumprod()
            max_dd = self.calculate_max_drawdown(equity_curve)
            calmar = self.calculate_calmar_ratio(portfolio_returns, equity_curve)

            # Calculate correlation matrix
            returns_df = pd.DataFrame({
                f'asset_{i}': returns.values
                for i, returns in enumerate(position_returns)
            })
            correlation_matrix = returns_df.corr()
            self.correlation_matrix = correlation_matrix
            avg_correlation = correlation_matrix.values[np.triu_indices_from(
                correlation_matrix.values, k=1)].mean()
        else:
            var = cvar = sharpe = sortino = max_dd = calmar = avg_correlation = 0

        # Calculate additional risk metrics
        herfindahl_index = sum((v/total_value)**2 for v in position_values) if total_value > 0 else 0

        risk_metrics = {
            'total_exposure': total_value,
            'var_95': var,
            'cvar_95': cvar,
            'sharpe_ratio': sharpe,
            'sortino_ratio': sortino,
            'max_drawdown': max_dd,
            'calmar_ratio': calmar,
            'position_count': len(positions),
            'avg_correlation': avg_correlation,
            'concentration_risk': max(position_values) / total_value if total_value > 0 else 0,
            'herfindahl_index': herfindahl_index,
            'timestamp': datetime.now()
        }

        self.risk_metrics_history.append(risk_metrics)
        return risk_metrics

    def check_risk_limits(self, proposed_trade: ArbitrageOpportunity,
                          current_positions: List[Dict[str, Any]],
                          risk_parameters: Dict[str, Any]) -> Tuple[bool, str]:
        """Check if proposed trade violates risk limits."""
        # Check position limit
        position_value = proposed_trade.max_size * proposed_trade.buy_price
        if position_value > risk_parameters['max_position_size']:
            return False, "Position size exceeds limit"

        # Check total exposure
        current_exposure = sum(p['size'] * p['entry_price'] for p in current_positions)
        if current_exposure + position_value > risk_parameters['max_position_size'] * 5:
            return False, "Total exposure limit exceeded"

        # Check concurrent trades
        if len(current_positions) >= risk_parameters['max_concurrent_trades']:
            return False, "Maximum concurrent trades reached"

        # Check correlation with existing positions
        same_asset_positions = [p for p in current_positions if p['asset'] == proposed_trade.asset]
        if same_asset_positions:
            return False, "Already have position in this asset"

        # Check risk/reward ratio
        if proposed_trade.expected_profit_pct < risk_parameters['min_profit_threshold']:
            return False, "Profit below minimum threshold"

        # Check latency risk
        if proposed_trade.latency_risk > risk_parameters.get('max_latency_risk', 0.7):
            return False, "Latency risk too high"

        return True, "Risk checks passed"

    def _empty_risk_metrics(self) -> Dict[str, Any]:
        """Return empty risk metrics."""
        return {
            'total_exposure': 0,
            'var_95': 0,
            'cvar_95': 0,
            'sharpe_ratio': 0,
            'sortino_ratio': 0,
            'max_drawdown': 0,
            'calmar_ratio': 0,
            'position_count': 0,
            'avg_correlation': 0,
            'concentration_risk': 0,
            'herfindahl_index': 0,
            'timestamp': datetime.now()
        }
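
# Illustrative sketch (not part of the engine): the risk metrics above on a
# synthetic daily-return series. With ~0.1% mean daily return and 1% daily
# volatility, expect an annualized Sharpe roughly in the 1-2 range.
def _demo_risk_metrics() -> None:
    rng = np.random.default_rng(1)
    returns = rng.normal(0.001, 0.01, 252)
    analytics = RiskAnalytics()
    equity = (1 + returns).cumprod()
    print(f"VaR(95%):  {analytics.calculate_var(returns):.4f}")
    print(f"CVaR(95%): {analytics.calculate_cvar(returns):.4f}")
    print(f"Sharpe:    {analytics.calculate_sharpe_ratio(returns):.2f}")
    print(f"Max DD:    {analytics.calculate_max_drawdown(equity):.2%}")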

class LatencyAwareExecutionEngine:
    """Execution engine with realistic latency simulation and smart routing."""

    def __init__(self):
        self.execution_history = []
        self.latency_model = self._build_latency_model()
        self.slippage_model = self._build_slippage_model()
        self.execution_analytics = defaultdict(list)

    def _build_latency_model(self) -> Dict[str, Dict[str, float]]:
        """Build latency model for different exchange pairs."""
        return {
            'cex_cex': {'mean': 100, 'std': 20},    # CEX to CEX
            'cex_dex': {'mean': 1500, 'std': 500},  # CEX to DEX
            'dex_dex': {'mean': 2000, 'std': 800},  # DEX to DEX
        }

    def _build_slippage_model(self) -> Dict[str, float]:
        """Build slippage model based on market conditions."""
        return {
            'low_volatility': 0.0005,  # 5 bps
            'normal': 0.001,           # 10 bps
            'high_volatility': 0.002,  # 20 bps
            'extreme': 0.005           # 50 bps
        }

    def simulate_execution(self, opportunity: ArbitrageOpportunity,
                           buy_exchange: ExchangeSimulator,
                           sell_exchange: ExchangeSimulator,
                           market_conditions: Dict[str, Any]) -> ExecutionResult:
        """Simulate order execution with realistic latency and slippage."""
        # Determine exchange types
        exchange_pair = self._get_exchange_pair_type(
            opportunity.buy_exchange, opportunity.sell_exchange
        )

        # Simulate latency
        latency_params = self.latency_model[exchange_pair]
        total_latency = np.random.normal(latency_params['mean'], latency_params['std'])
        total_latency = max(0, total_latency)  # Ensure non-negative

        # Determine market volatility regime
        volatility_regime = self._get_volatility_regime(market_conditions)
        base_slippage = self.slippage_model[volatility_regime]

        # Calculate price movement during latency (correlated with volatility)
        volatility = market_conditions.get('volatility', 0.02)
        price_drift = np.random.normal(
            0, base_slippage * np.sqrt(total_latency / 1000) * (1 + volatility * 10)
        )

        # Simulate buy execution
        buy_price_adjusted = opportunity.buy_price * (1 + price_drift)
        buy_book = buy_exchange.generate_order_book(
            opportunity.asset, buy_price_adjusted, market_conditions=market_conditions
        )
        buy_fill_price, buy_fill_size = buy_exchange.execute_order(
            buy_book, 'buy', opportunity.max_size
        )

        # Simulate sell execution (with additional latency)
        sell_latency = np.random.normal(50, 10)
        price_drift_sell = np.random.normal(
            0, base_slippage * np.sqrt((total_latency + sell_latency) / 1000) * (1 + volatility * 10)
        )
        sell_price_adjusted = opportunity.sell_price * (1 - price_drift_sell)
        sell_book = sell_exchange.generate_order_book(
            opportunity.asset, sell_price_adjusted, market_conditions=market_conditions
        )
        sell_fill_price, sell_fill_size = sell_exchange.execute_order(
            sell_book, 'sell', min(buy_fill_size, opportunity.max_size)
        )

        # Calculate realized profit
        executed_size = min(buy_fill_size, sell_fill_size)

        # Transaction costs
        buy_cost = buy_exchange.transaction_cost * buy_fill_price * executed_size
        sell_cost = sell_exchange.transaction_cost * sell_fill_price * executed_size

        # Gas costs for DEX legs (match against the configured DEX list)
        gas_cost = 0
        if opportunity.buy_exchange in EXCHANGES['dex']:
            gas_cost += GAS_COST_USD
        if opportunity.sell_exchange in EXCHANGES['dex']:
            gas_cost += GAS_COST_USD

        # Net profit calculation
        gross_profit = (sell_fill_price - buy_fill_price) * executed_size
        net_profit = gross_profit - buy_cost - sell_cost - gas_cost

        # Calculate slippage
        expected_profit = (opportunity.sell_price - opportunity.buy_price) * executed_size
        slippage = (expected_profit - gross_profit) / expected_profit if expected_profit > 0 else 0

        # Determine success based on profitability
        success = net_profit > 0 and executed_size > 0
        result = ExecutionResult(
            opportunity_id=opportunity.opportunity_id,
            success=success,
            executed_size=executed_size,
            buy_fill_price=buy_fill_price,
            sell_fill_price=sell_fill_price,
            realized_profit=net_profit,
            slippage=slippage,
            latency_ms=total_latency,
            gas_cost=gas_cost,
            timestamp=datetime.now()
        )

        self.execution_history.append(result)

        # Track execution analytics
        self.execution_analytics['asset'].append(opportunity.asset)
        self.execution_analytics['exchange_pair'].append(exchange_pair)
        self.execution_analytics['volatility_regime'].append(volatility_regime)

        return result

    def _get_exchange_pair_type(self, buy_exchange: str, sell_exchange: str) -> str:
        """Determine exchange pair type."""
        buy_is_dex = 'dex' in buy_exchange.lower() or buy_exchange in EXCHANGES['dex']
        sell_is_dex = 'dex' in sell_exchange.lower() or sell_exchange in EXCHANGES['dex']

        if buy_is_dex and sell_is_dex:
            return 'dex_dex'
        elif not buy_is_dex and not sell_is_dex:
            return 'cex_cex'
        else:
            return 'cex_dex'

    def _get_volatility_regime(self, market_conditions: Dict[str, Any]) -> str:
        """Determine current volatility regime."""
        volatility = market_conditions.get('volatility', 0.02)

        if volatility < 0.015:
            return 'low_volatility'
        elif volatility < 0.03:
            return 'normal'
        elif volatility < 0.05:
            return 'high_volatility'
        else:
            return 'extreme'

    def optimize_execution_path(self, opportunities: List[ArbitrageOpportunity],
                                current_positions: List[Dict[str, Any]],
                                risk_parameters: Dict[str, Any]) -> List[ArbitrageOpportunity]:
        """Optimize execution order considering dependencies and risk."""
        if not opportunities:
            return []

        # Score opportunities based on multiple factors
        scored_opportunities = []
        for opp in opportunities:
            # Multi-factor scoring
            profit_score = opp.expected_profit_pct
            latency_penalty = opp.latency_risk * 0.5
            size_score = min(opp.max_size * opp.buy_price / risk_parameters['max_position_size'], 1.0)

            # Add forecast confidence if available
            forecast_confidence = 1 - opp.metadata.get('forecast_uncertainty', 0.5)

            # Combined score
            total_score = profit_score * (1 - latency_penalty) * size_score * forecast_confidence
            scored_opportunities.append((total_score, opp))

        # Sort by score (highest first)
        scored_opportunities.sort(key=lambda x: x[0], reverse=True)

        # Select top opportunities that don't violate risk limits
        selected = []
        simulated_positions = current_positions.copy()

        for score, opp in scored_opportunities:
            # Simulate adding this position
            can_add, reason = self._can_add_opportunity(opp, simulated_positions, risk_parameters)

            if can_add:
                selected.append(opp)
                simulated_positions.append({
                    'asset': opp.asset,
                    'size': opp.max_size,
                    'entry_price': opp.buy_price
                })

                if len(selected) >= risk_parameters['max_concurrent_trades']:
                    break

        return selected

    def _can_add_opportunity(self, opportunity: ArbitrageOpportunity,
                             positions: List[Dict[str, Any]],
                             risk_parameters: Dict[str, Any]) -> Tuple[bool, str]:
        """Check if opportunity can be added to positions."""
        # Check if already have position in asset
        for pos in positions:
            if pos['asset'] == opportunity.asset:
                return False, "Already have position in asset"

        # Check total exposure
        current_exposure = sum(p['size'] * p['entry_price'] for p in positions)
        new_exposure = opportunity.max_size * opportunity.buy_price
        if current_exposure + new_exposure > risk_parameters['max_position_size'] * 5:
            return False, "Would exceed total exposure limit"

        return True, "OK"
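
# Illustrative sketch (not part of the engine): routing classification and the
# latency prior it selects. Venue names come from the EXCHANGES config above.
def _demo_latency_model() -> None:
    engine = LatencyAwareExecutionEngine()
    for buy, sell in [('Binance', 'Coinbase'), ('Binance', 'Uniswap_V3'),
                      ('Uniswap_V3', 'Curve')]:
        pair = engine._get_exchange_pair_type(buy, sell)
        prior = engine.latency_model[pair]
        print(f"{buy} -> {sell}: {pair}, ~{prior['mean']}ms +/- {prior['std']}ms")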

class CrossAssetArbitrageEngine:
    """Main arbitrage engine coordinating all components."""

    def __init__(self):
        # Initialize components
        self.price_forecaster = PriceForecastingEngine()
        self.arbitrage_detector = ArbitrageDetector()
        self.strategy_optimizer = LLMStrategyOptimizer()
        self.risk_analytics = RiskAnalytics()
        self.execution_engine = LatencyAwareExecutionEngine()

        # Exchange simulators
        self.exchanges = {}
        for exchange in EXCHANGES['cex']:
            self.exchanges[exchange] = ExchangeSimulator('cex')
        for exchange in EXCHANGES['dex']:
            self.exchanges[exchange] = ExchangeSimulator('dex')

        # State management
        self.active_positions = []
        self.portfolio_value = 100000  # Starting capital
        self.performance_history = []
        self.market_data_cache = {}
        self.forecasts_cache = {}

    def generate_market_data(self, assets: List[str], days: int = 100) -> Dict[str, pd.DataFrame]:
        """Generate realistic correlated market data for multiple assets."""
        market_data = {}

        # Generate correlation matrix for assets
        n_assets = len(assets)
        correlation_matrix = np.eye(n_assets)

        # Add correlations between assets
        for i in range(n_assets):
            for j in range(i+1, n_assets):
                # Crypto assets are more correlated
                if (assets[i] in ASSET_CLASSES['crypto_spot'] and
                        assets[j] in ASSET_CLASSES['crypto_spot']):
                    corr = np.random.uniform(0.6, 0.9)
                # FX pairs have moderate correlation
                elif (assets[i] in ASSET_CLASSES['fx_pairs'] and
                        assets[j] in ASSET_CLASSES['fx_pairs']):
                    corr = np.random.uniform(0.3, 0.6)
                # Different asset classes have low correlation
                else:
                    corr = np.random.uniform(-0.2, 0.3)
                correlation_matrix[i, j] = corr
                correlation_matrix[j, i] = corr

        # Generate correlated returns
        mean_returns = np.zeros(n_assets)
        volatilities = []
        for asset in assets:
            if asset in ASSET_CLASSES['crypto_spot']:
                volatilities.append(0.015)  # Higher volatility
            elif asset in ASSET_CLASSES['fx_pairs']:
                volatilities.append(0.005)  # Lower volatility
            else:
                volatilities.append(0.01)   # Medium volatility

        cov_matrix = np.outer(volatilities, volatilities) * correlation_matrix

        # Generate returns
        returns = np.random.multivariate_normal(mean_returns, cov_matrix, days)

        # Generate price data for each asset
        for i, asset in enumerate(assets):
            # Base price
            if asset in ASSET_CLASSES['crypto_spot']:
                base_price = {'BTC': 45000, 'ETH': 3000, 'SOL': 100}.get(asset, 50)
            elif asset in ASSET_CLASSES['equity_etfs']:
                base_price = {'SPY': 450, 'QQQ': 380}.get(asset, 100)
            else:
                base_price = 1.0  # FX pairs

            # Generate prices from returns
            prices = base_price * np.exp(np.cumsum(returns[:, i]))

            # Generate OHLCV data
            dates = pd.date_range(end=datetime.now(), periods=days, freq='h')
            data = pd.DataFrame({
                'open': prices * (1 + np.random.normal(0, 0.002, days)),
                'high': prices * (1 + np.abs(np.random.normal(0, 0.005, days))),
                'low': prices * (1 - np.abs(np.random.normal(0, 0.005, days))),
                'close': prices,
                'volume': np.random.lognormal(15, 0.5, days)
            }, index=dates)

            # Ensure OHLC consistency
            data['high'] = data[['open', 'high', 'close']].max(axis=1)
            data['low'] = data[['open', 'low', 'close']].min(axis=1)

            market_data[asset] = data

        self.market_data_cache = market_data
        return market_data

    def update_order_books(self, market_data: Dict[str, pd.DataFrame]) -> Dict[str, Dict[str, OrderBook]]:
        """Generate current order books for all exchanges."""
        order_books = defaultdict(dict)

        # Get current market conditions
        market_conditions = self.calculate_market_conditions(market_data)

        for asset, data in market_data.items():
            current_price = data['close'].iloc[-1]

            # Generate order books for each exchange
            for exchange_name, exchange in self.exchanges.items():
                # Add price variation between exchanges
                price_variation = np.random.normal(0, 0.0005)
                adjusted_price = current_price * (1 + price_variation)
                # Vary spread based on exchange type and market conditions
                base_spread = 5 if exchange.exchange_type == 'cex' else 15
                volatility_adjustment = 1 + market_conditions['volatility'] * 20
                spread_bps = base_spread * volatility_adjustment

                order_book = exchange.generate_order_book(
                    asset, adjusted_price, spread_bps, market_conditions
                )
                order_books[exchange_name][asset] = order_book

        return dict(order_books)

    def calculate_market_conditions(self, market_data: Dict[str, pd.DataFrame]) -> Dict[str, Any]:
        """Calculate current market conditions."""
        volatilities = []
        volumes = []
        spreads = []

        for asset, data in market_data.items():
            returns = data['close'].pct_change().dropna()

            # Calculate volatility (annualized)
            volatility = returns.iloc[-24:].std() * np.sqrt(365 * 24)
            volatilities.append(volatility)

            # Calculate average volume
            avg_volume = data['volume'].iloc[-24:].mean()
            volumes.append(avg_volume)

            # Calculate spread proxy
            spread = (data['high'] - data['low']).iloc[-24:].mean() / data['close'].iloc[-24:].mean()
            spreads.append(spread)

        return {
            'volatility': np.mean(volatilities),
            'max_volatility': np.max(volatilities),
            'liquidity': np.mean(volumes) / 1e6,  # Normalize
            'avg_spread': np.mean(spreads),
            'timestamp': datetime.now()
        }

    def generate_price_forecasts(self, market_data: Dict[str, pd.DataFrame]) -> Dict[str, Dict[str, np.ndarray]]:
        """Generate price forecasts for all assets."""
        forecasts = {}
        for asset, data in market_data.items():
            forecast = self.price_forecaster.forecast_prices(asset, data)
            forecasts[asset] = forecast
        self.forecasts_cache = forecasts
        return forecasts

    def run_arbitrage_cycle(self) -> Dict[str, Any]:
        """Run complete arbitrage detection and execution cycle."""
        # Get current market data
        if not self.market_data_cache:
            assets = []
            for asset_class, asset_list in ASSET_CLASSES.items():
                assets.extend(asset_list[:2])  # Use first 2 from each class
            self.market_data_cache = self.generate_market_data(assets)

        market_data = self.market_data_cache

        # Generate price forecasts
        forecasts = self.generate_price_forecasts(market_data)

        # Update order books
        order_books = self.update_order_books(market_data)

        # Calculate market conditions
        market_conditions = self.calculate_market_conditions(market_data)

        # Update strategy parameters based on recent performance
        recent_executions = self.execution_engine.execution_history[-20:]
        strategy_params = self.strategy_optimizer.generate_parameter_suggestions(
            recent_executions, market_conditions
        )

        # Find arbitrage opportunities with forecast integration
        transaction_costs = {
            exchange: sim.transaction_cost for exchange, sim in self.exchanges.items()
        }
        opportunities = self.arbitrage_detector.find_opportunities(
            order_books, transaction_costs, forecasts
        )

        # Filter based on strategy parameters
        filtered_opportunities = [
            opp for opp in opportunities
            if opp.expected_profit_pct >= strategy_params['min_profit_threshold']
        ]

        # Risk analysis
        risk_metrics = self.risk_analytics.analyze_position_risk(
            self.active_positions, market_data
        )

        # Optimize execution order
        selected_opportunities = self.execution_engine.optimize_execution_path(
            filtered_opportunities, self.active_positions, strategy_params
        )

        # Execute selected opportunities
        execution_results = []
        for opportunity in selected_opportunities:
            # Final risk check
            can_execute, reason = self.risk_analytics.check_risk_limits(
                opportunity, self.active_positions, strategy_params
            )

            if can_execute:
                # Execute trade
                buy_exchange = self.exchanges[opportunity.buy_exchange]
                sell_exchange = self.exchanges[opportunity.sell_exchange]
                result = self.execution_engine.simulate_execution(
                    opportunity, buy_exchange, sell_exchange, market_conditions
                )
                execution_results.append(result)

                # Update positions if successful
                if result.success:
                    self.active_positions.append({
                        'asset': opportunity.asset,
                        'size': result.executed_size,
                        'entry_price': result.buy_fill_price,
                        'exit_price': result.sell_fill_price,
                        'profit': result.realized_profit,
                        'timestamp': result.timestamp
                    })

                    # Update portfolio value
                    self.portfolio_value += result.realized_profit

        # Clean up completed positions (for this simulation, all arb trades complete immediately)
        self.active_positions = [
            p for p in self.active_positions
            if (datetime.now() - p['timestamp']).total_seconds() < 300
        ]

        # Store performance metrics
        cycle_summary = {
            'timestamp': datetime.now(),
            'opportunities_found': len(opportunities),
            'opportunities_executed': len(execution_results),
            'successful_executions': sum(1 for r in execution_results if r.success),
            'total_profit': sum(r.realized_profit for r in execution_results),
            'portfolio_value': self.portfolio_value,
            'risk_metrics': risk_metrics,
            'market_conditions': market_conditions,
            'strategy_parameters': strategy_params
        }

        self.performance_history.append(cycle_summary)
        return cycle_summary


# Visualization functions
def create_opportunity_network(opportunities: List[ArbitrageOpportunity]) -> go.Figure:
    """Create network visualization of arbitrage opportunities."""
    # Create graph
    G = nx.Graph()

    # Add nodes and edges
    for opp in opportunities:
        G.add_edge(
            opp.buy_exchange,
            opp.sell_exchange,
            weight=opp.expected_profit_pct,
            asset=opp.asset
        )

    if len(G.nodes()) == 0:
        # Empty graph
        fig = go.Figure()
        fig.add_annotation(
            text="No arbitrage opportunities found",
            xref="paper", yref="paper",
            x=0.5, y=0.5, showarrow=False
        )
        return fig

    # Calculate layout
    pos = nx.spring_layout(G, k=2, iterations=50)

    # Create edge trace
    edge_trace = []
    for edge in G.edges(data=True):
        x0, y0 = pos[edge[0]]
        x1, y1 = pos[edge[1]]
        edge_trace.append(go.Scatter(
            x=[x0, x1, None], y=[y0, y1, None],
            mode='lines',
            line=dict(
                width=edge[2]['weight'] * 100,
                color='rgba(125,125,125,0.5)'
            ),
            hoverinfo='text',
            text=f"{edge[2]['asset']}: {edge[2]['weight']*100:.2f}%"
        ))

    # Create node trace (DEX venues in red, CEX venues in blue)
    node_trace = go.Scatter(
        x=[pos[node][0] for node in G.nodes()],
        y=[pos[node][1] for node in G.nodes()],
        mode='markers+text',
        text=[node for node in G.nodes()],
        textposition="top center",
        marker=dict(
            size=20,
            color=['red' if node in EXCHANGES['dex'] else 'blue' for node in G.nodes()],
            line=dict(color='darkgray', width=2)
        ),
        hoverinfo='text'
    )

    # Create figure
    fig = go.Figure(data=edge_trace + [node_trace])
    fig.update_layout(
        title="Arbitrage Opportunity Network",
        showlegend=False,
        xaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
        yaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
        height=500
    )

    return fig
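
# Illustrative sketch (not part of the engine): driving one detection/execution
# cycle headlessly, without the Gradio UI. The asset list is hypothetical and
# the first cycle is slow because forecasting models train on demand.
def _demo_single_cycle() -> None:
    engine = CrossAssetArbitrageEngine()
    engine.generate_market_data(['BTC', 'ETH', 'EUR/USD', 'SPY'], days=150)
    summary = engine.run_arbitrage_cycle()
    print(f"found={summary['opportunities_found']} "
          f"executed={summary['opportunities_executed']} "
          f"pnl={summary['total_profit']:.2f}")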
"scatter"}], [{"type": "bar"}, {"type": "scatter"}] ], vertical_spacing=0.1, horizontal_spacing=0.1 ) # Portfolio value fig.add_trace( go.Scatter( x=perf_df['timestamp'], y=perf_df['portfolio_value'], mode='lines', name='Portfolio Value', line=dict(color='blue', width=2) ), row=1, col=1 ) # Profit per cycle fig.add_trace( go.Scatter( x=perf_df['timestamp'], y=perf_df['total_profit'], mode='lines+markers', name='Profit', line=dict(color='green') ), row=1, col=2 ) # Success rate perf_df['success_rate'] = perf_df.apply( lambda x: x['successful_executions'] / x['opportunities_executed'] if x['opportunities_executed'] > 0 else 0, axis=1 ) fig.add_trace( go.Scatter( x=perf_df['timestamp'], y=perf_df['success_rate'], mode='lines', name='Success Rate', line=dict(color='orange') ), row=2, col=1 ) # Risk metrics (Sharpe ratio) sharpe_values = [m['sharpe_ratio'] for m in perf_df['risk_metrics']] fig.add_trace( go.Scatter( x=perf_df['timestamp'], y=sharpe_values, mode='lines', name='Sharpe Ratio', line=dict(color='purple') ), row=2, col=2 ) # Opportunities vs Executions fig.add_trace( go.Bar( x=perf_df['timestamp'], y=perf_df['opportunities_found'], name='Found', marker_color='lightblue' ), row=3, col=1 ) fig.add_trace( go.Bar( x=perf_df['timestamp'], y=perf_df['opportunities_executed'], name='Executed', marker_color='darkblue' ), row=3, col=1 ) # Market volatility volatility_values = [m['volatility'] for m in perf_df['market_conditions']] fig.add_trace( go.Scatter( x=perf_df['timestamp'], y=volatility_values, mode='lines', name='Volatility', line=dict(color='red') ), row=3, col=2 ) # Update layout fig.update_layout( height=1000, showlegend=False, title_text="Cross-Asset Arbitrage Performance Dashboard" ) # Update axes fig.update_xaxes(title_text="Time", row=3, col=1) fig.update_xaxes(title_text="Time", row=3, col=2) fig.update_yaxes(title_text="Value ($)", row=1, col=1) fig.update_yaxes(title_text="Profit ($)", row=1, col=2) fig.update_yaxes(title_text="Rate", row=2, col=1) fig.update_yaxes(title_text="Sharpe", row=2, col=2) fig.update_yaxes(title_text="Count", row=3, col=1) fig.update_yaxes(title_text="Volatility", row=3, col=2) return fig def create_execution_analysis(execution_history: List[ExecutionResult]) -> go.Figure: """Create execution analysis visualization""" if not execution_history: fig = go.Figure() fig.add_annotation( text="No execution data available", xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False ) return fig # Convert to DataFrame exec_df = pd.DataFrame([ { 'timestamp': e.timestamp, 'profit': e.realized_profit, 'slippage': e.slippage, 'latency': e.latency_ms, 'success': e.success } for e in execution_history ]) # Create subplots fig = make_subplots( rows=2, cols=2, subplot_titles=( 'Profit Distribution', 'Slippage Analysis', 'Latency Distribution', 'Success Rate Over Time' ), specs=[ [{"type": "histogram"}, {"type": "scatter"}], [{"type": "histogram"}, {"type": "scatter"}] ] ) # Profit distribution fig.add_trace( go.Histogram( x=exec_df['profit'], nbinsx=30, name='Profit', marker_color='green' ), row=1, col=1 ) # Slippage over time fig.add_trace( go.Scatter( x=exec_df['timestamp'], y=exec_df['slippage'] * 100, # Convert to percentage mode='markers', name='Slippage', marker=dict( color=exec_df['success'].map({True: 'blue', False: 'red'}), size=8 ) ), row=1, col=2 ) # Latency distribution fig.add_trace( go.Histogram( x=exec_df['latency'], nbinsx=30, name='Latency', marker_color='orange' ), row=2, col=1 ) # Success rate over time (rolling) exec_df['success_int'] = 

def create_execution_analysis(execution_history: List[ExecutionResult]) -> go.Figure:
    """Create execution analysis visualization."""
    if not execution_history:
        fig = go.Figure()
        fig.add_annotation(
            text="No execution data available",
            xref="paper", yref="paper",
            x=0.5, y=0.5, showarrow=False
        )
        return fig

    # Convert to DataFrame
    exec_df = pd.DataFrame([
        {
            'timestamp': e.timestamp,
            'profit': e.realized_profit,
            'slippage': e.slippage,
            'latency': e.latency_ms,
            'success': e.success
        }
        for e in execution_history
    ])

    # Create subplots
    fig = make_subplots(
        rows=2, cols=2,
        subplot_titles=(
            'Profit Distribution', 'Slippage Analysis',
            'Latency Distribution', 'Success Rate Over Time'
        ),
        specs=[
            [{"type": "histogram"}, {"type": "scatter"}],
            [{"type": "histogram"}, {"type": "scatter"}]
        ]
    )

    # Profit distribution
    fig.add_trace(
        go.Histogram(
            x=exec_df['profit'], nbinsx=30,
            name='Profit', marker_color='green'
        ),
        row=1, col=1
    )

    # Slippage over time
    fig.add_trace(
        go.Scatter(
            x=exec_df['timestamp'],
            y=exec_df['slippage'] * 100,  # Convert to percentage
            mode='markers', name='Slippage',
            marker=dict(
                color=exec_df['success'].map({True: 'blue', False: 'red'}),
                size=8
            )
        ),
        row=1, col=2
    )

    # Latency distribution
    fig.add_trace(
        go.Histogram(
            x=exec_df['latency'], nbinsx=30,
            name='Latency', marker_color='orange'
        ),
        row=2, col=1
    )

    # Success rate over time (rolling)
    exec_df['success_int'] = exec_df['success'].astype(int)
    exec_df['success_rate_rolling'] = exec_df['success_int'].rolling(
        window=20, min_periods=1
    ).mean()
    fig.add_trace(
        go.Scatter(
            x=exec_df['timestamp'], y=exec_df['success_rate_rolling'],
            mode='lines', name='Success Rate',
            line=dict(color='purple', width=2)
        ),
        row=2, col=2
    )

    # Update layout
    fig.update_layout(
        height=700,
        showlegend=False,
        title_text="Execution Analysis"
    )

    # Update axes
    fig.update_xaxes(title_text="Profit ($)", row=1, col=1)
    fig.update_xaxes(title_text="Time", row=1, col=2)
    fig.update_xaxes(title_text="Latency (ms)", row=2, col=1)
    fig.update_xaxes(title_text="Time", row=2, col=2)
    fig.update_yaxes(title_text="Count", row=1, col=1)
    fig.update_yaxes(title_text="Slippage (%)", row=1, col=2)
    fig.update_yaxes(title_text="Count", row=2, col=1)
    fig.update_yaxes(title_text="Success Rate", row=2, col=2)

    return fig
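
# Illustrative sketch (not part of the engine): feeding hand-built results into
# the execution-analysis figure. All values below are hypothetical.
def _demo_execution_analysis() -> None:
    rng = np.random.default_rng(2)
    results = [
        ExecutionResult(
            opportunity_id=f"demo_{i}", success=bool(rng.random() > 0.3),
            executed_size=1.0, buy_fill_price=100.0, sell_fill_price=100.3,
            realized_profit=float(rng.normal(5, 20)), slippage=float(rng.uniform(0, 0.002)),
            latency_ms=float(rng.normal(120, 30)), gas_cost=0.0,
            timestamp=datetime.now() + timedelta(seconds=i),
        )
        for i in range(50)
    ]
    create_execution_analysis(results).show()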

# Gradio Interface
def create_gradio_interface():
    """Create the main Gradio interface."""
    # Initialize engine
    engine = CrossAssetArbitrageEngine()

    def run_arbitrage_simulation(n_cycles, initial_capital, min_profit_threshold):
        """Run arbitrage simulation."""
        # Reset engine
        engine.portfolio_value = float(initial_capital)
        engine.performance_history = []
        engine.execution_engine.execution_history = []
        engine.active_positions = []

        # Update strategy parameters
        engine.strategy_optimizer.current_parameters['min_profit_threshold'] = float(min_profit_threshold) / 100

        # Generate initial market data
        assets = []
        for asset_class, asset_list in ASSET_CLASSES.items():
            assets.extend(asset_list[:2])  # Use 2 assets from each class
        engine.generate_market_data(assets, days=200)

        # Run simulation cycles
        cycle_summaries = []
        for i in range(int(n_cycles)):
            # Update market data (simulate price movement)
            for asset, data in engine.market_data_cache.items():
                # Add new price point
                last_price = data['close'].iloc[-1]
                new_return = np.random.normal(0.0001, 0.01)
                new_price = last_price * (1 + new_return)

                new_row = pd.DataFrame({
                    'open': [new_price * (1 + np.random.normal(0, 0.002))],
                    'high': [new_price * (1 + abs(np.random.normal(0, 0.005)))],
                    'low': [new_price * (1 - abs(np.random.normal(0, 0.005)))],
                    'close': [new_price],
                    'volume': [np.random.lognormal(15, 0.5)]
                }, index=[data.index[-1] + pd.Timedelta(hours=1)])

                # Ensure OHLC consistency
                new_row['high'] = new_row[['open', 'high', 'close']].max(axis=1)
                new_row['low'] = new_row[['open', 'low', 'close']].min(axis=1)

                engine.market_data_cache[asset] = pd.concat([data, new_row])

                # Keep only recent data
                engine.market_data_cache[asset] = engine.market_data_cache[asset].iloc[-200:]

            # Run arbitrage cycle
            summary = engine.run_arbitrage_cycle()
            cycle_summaries.append(summary)

        # Create visualizations
        opportunity_network = create_opportunity_network(
            engine.arbitrage_detector.opportunity_history[-50:]
        )
        performance_dashboard = create_performance_dashboard(engine.performance_history)
        execution_analysis = create_execution_analysis(
            engine.execution_engine.execution_history
        )

        # Calculate summary statistics
        total_profit = sum(s['total_profit'] for s in engine.performance_history)
        total_return = (engine.portfolio_value - initial_capital) / initial_capital

        if len(engine.execution_engine.execution_history) > 0:
            success_rate = sum(
                1 for e in engine.execution_engine.execution_history if e.success
            ) / len(engine.execution_engine.execution_history)
            avg_latency = np.mean([
                e.latency_ms for e in engine.execution_engine.execution_history
            ])
            avg_slippage = np.mean([
                e.slippage for e in engine.execution_engine.execution_history
            ])
        else:
            success_rate = avg_latency = avg_slippage = 0

        # Get latest risk metrics
        if engine.risk_analytics.risk_metrics_history:
            latest_risk = engine.risk_analytics.risk_metrics_history[-1]
            sharpe = latest_risk['sharpe_ratio']
            var_95 = latest_risk['var_95']
        else:
            sharpe = var_95 = 0

        summary_text = f"""
### Simulation Summary

**Performance Metrics:**
- Total Profit: ${total_profit:,.2f}
- Total Return: {total_return*100:.2f}%
- Final Portfolio Value: ${engine.portfolio_value:,.2f}
- Sharpe Ratio: {sharpe:.2f}
- VaR (95%): {var_95*100:.2f}%

**Execution Statistics:**
- Total Opportunities Found: {sum(s['opportunities_found'] for s in engine.performance_history)}
- Total Executions: {len(engine.execution_engine.execution_history)}
- Success Rate: {success_rate*100:.1f}%
- Average Latency: {avg_latency:.0f}ms
- Average Slippage: {avg_slippage*100:.2f}%

**Active Positions:** {len(engine.active_positions)}
"""

        # Latest opportunities table
        recent_opps = []
        for opp in engine.arbitrage_detector.opportunity_history[-10:]:
            recent_opps.append({
                'Asset': opp.asset,
                'Buy Exchange': opp.buy_exchange,
                'Sell Exchange': opp.sell_exchange,
                'Spread': f"{(opp.sell_price - opp.buy_price)/opp.buy_price*100:.2f}%",
                'Expected Profit': f"${opp.expected_profit:.2f}",
                'Latency Risk': f"{opp.latency_risk:.2f}"
            })

        opportunities_df = pd.DataFrame(recent_opps) if recent_opps else pd.DataFrame()

        return (opportunity_network, performance_dashboard, execution_analysis,
                summary_text, opportunities_df)

    def analyze_strategy_parameters():
        """Analyze current strategy parameters."""
        if not engine.strategy_optimizer.parameter_history:
            return "No parameter history available", ""

        # Get parameter evolution
        param_history = engine.strategy_optimizer.parameter_history.get('suggestions', [])
        if not param_history:
            return "No parameter suggestions generated", ""

        # Create parameter evolution chart
        param_df = pd.DataFrame([
            {'timestamp': entry['timestamp'], **entry['parameters']}
            for entry in param_history
        ])

        fig = make_subplots(
            rows=2, cols=2,
            subplot_titles=(
                'Profit Threshold', 'Position Size',
                'Risk Limit', 'Confidence Threshold'
            )
        )

        # Plot each parameter
        params_to_plot = [
            ('min_profit_threshold', 1, 1, 'Threshold'),
            ('max_position_size', 1, 2, 'Size ($)'),
            ('risk_limit', 2, 1, 'Limit'),
            ('confidence_threshold', 2, 2, 'Threshold')
        ]

        for param, row, col, ylabel in params_to_plot:
            if param in param_df.columns:
                fig.add_trace(
                    go.Scatter(
                        x=param_df['timestamp'], y=param_df[param],
                        mode='lines+markers', name=param
                    ),
                    row=row, col=col
                )

        fig.update_layout(
            height=600,
            showlegend=False,
            title_text="Strategy Parameter Evolution"
        )

        # Get latest reasoning
        latest_reasoning = param_history[-1]['reasoning'] if param_history else "No reasoning available"

        return fig, f"**Latest Optimization Reasoning:** {latest_reasoning}"

    # Create interface
    with gr.Blocks(title="Cross-Asset Arbitrage Engine") as interface:
        gr.Markdown("""
# Cross-Asset Arbitrage Engine with Transformer Models

This arbitrage engine leverages transformer models for price forecasting across multiple asset classes:
- **Crypto Spot/Futures**: BTC, ETH, SOL and perpetual futures
- **Foreign Exchange**: Major currency pairs
- **Equity ETFs**: SPY, QQQ, IWM and international markets

Features:
- **Transformer Price Prediction**: Numerical-adapted transformers for multi-horizon forecasting
- **Cross-Venue Execution**: Simulates CEX (Binance, Coinbase) and DEX (Uniswap V3) integration
- **LLM Strategy Optimization**: Dynamic parameter adjustment based on performance
- **Latency-Aware Execution**: Realistic order routing with slippage simulation
- **Comprehensive Risk Analytics**: Real-time VaR, Sharpe ratio, and drawdown monitoring

Author: Spencer Purdy
""")
        with gr.Tab("Arbitrage Simulation"):
            with gr.Row():
                with gr.Column(scale=1):
                    n_cycles = gr.Slider(
                        minimum=10, maximum=100, value=50, step=10,
                        label="Number of Trading Cycles"
                    )
                    initial_capital = gr.Number(
                        value=100000, label="Initial Capital ($)", minimum=10000
                    )
                    min_profit = gr.Slider(
                        minimum=0.1, maximum=1.0, value=0.2, step=0.1,
                        label="Minimum Profit Threshold (%)"
                    )
                    run_btn = gr.Button("Run Simulation", variant="primary")

            with gr.Row():
                opportunity_network = gr.Plot(label="Arbitrage Opportunity Network")

            with gr.Row():
                performance_dashboard = gr.Plot(label="Performance Dashboard")

            with gr.Row():
                execution_analysis = gr.Plot(label="Execution Analysis")

            with gr.Row():
                with gr.Column(scale=1):
                    summary_display = gr.Markdown(label="Summary Statistics")
                with gr.Column(scale=1):
                    opportunities_table = gr.DataFrame(label="Recent Arbitrage Opportunities")

        with gr.Tab("Strategy Analysis"):
            with gr.Row():
                analyze_btn = gr.Button("Analyze Strategy Parameters", variant="primary")
            with gr.Row():
                param_evolution = gr.Plot(label="Parameter Evolution")
            with gr.Row():
                param_reasoning = gr.Markdown(label="Optimization Reasoning")

        # Event handlers
        run_btn.click(
            fn=run_arbitrage_simulation,
            inputs=[n_cycles, initial_capital, min_profit],
            outputs=[
                opportunity_network, performance_dashboard,
                execution_analysis, summary_display, opportunities_table
            ]
        )

        analyze_btn.click(
            fn=analyze_strategy_parameters,
            inputs=[],
            outputs=[param_evolution, param_reasoning]
        )

        # Add examples
        gr.Examples(
            examples=[
                [50, 100000, 0.2],
                [30, 50000, 0.3],
                [100, 200000, 0.15]
            ],
            inputs=[n_cycles, initial_capital, min_profit]
        )

    return interface


# Launch application
if __name__ == "__main__":
    interface = create_gradio_interface()
    interface.launch()