"""
Cross-Asset Arbitrage Engine with Transformer Models
Author: Spencer Purdy
Description: A sophisticated arbitrage engine leveraging transformer models for price forecasting
across multiple asset classes and venues. Integrates CEX/DEX venues with latency-aware
execution, LLM-inspired strategy optimization, and comprehensive risk analytics.
"""
# Install required packages
# !pip install -q transformers torch numpy pandas scikit-learn plotly gradio scipy statsmodels networkx
# Core imports
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from datetime import datetime, timedelta
import gradio as gr
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
import json
import random
from typing import Dict, List, Tuple, Optional, Any, Union
from dataclasses import dataclass, field
from collections import defaultdict, deque
import warnings
warnings.filterwarnings('ignore')
# Additional imports
from scipy import stats
from scipy.optimize import minimize
from sklearn.preprocessing import StandardScaler
import networkx as nx
# Set random seeds for reproducibility
np.random.seed(42)
torch.manual_seed(42)
random.seed(42)
# Configuration constants
TRADING_DAYS_PER_YEAR = 365 # Crypto markets trade 24/7
RISK_FREE_RATE = 0.045  # Assumed annual risk-free rate
TRANSACTION_COST_CEX = 0.001 # 0.1% for centralized exchanges
TRANSACTION_COST_DEX = 0.003 # 0.3% for decentralized exchanges
GAS_COST_USD = 5.0 # Average gas cost for DEX transactions
MIN_PROFIT_THRESHOLD = 0.002 # 0.2% minimum profit after costs
MAX_POSITION_SIZE = 100000 # Maximum position size in USD
LATENCY_CEX_MS = 50 # Average CEX latency in milliseconds
LATENCY_DEX_MS = 1000 # Average DEX latency (block time)
# Asset configuration
ASSET_CLASSES = {
'crypto_spot': ['BTC', 'ETH', 'SOL', 'MATIC', 'AVAX'],
'crypto_futures': ['BTC-PERP', 'ETH-PERP', 'SOL-PERP'],
'fx_pairs': ['EUR/USD', 'GBP/USD', 'USD/JPY', 'AUD/USD'],
'equity_etfs': ['SPY', 'QQQ', 'IWM', 'EFA', 'EEM']
}
# Exchange configuration
EXCHANGES = {
'cex': ['Binance', 'Coinbase', 'Kraken', 'FTX'],
'dex': ['Uniswap_V3', 'SushiSwap', 'Curve', 'Balancer']
}
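# Venue classification helper. Substring tests like "'dex' in name.lower()"
# miss named DEX venues such as 'Uniswap_V3', so membership in EXCHANGES['dex']
# is the primary check; the substring test is kept as a fallback.
def is_dex(exchange: str) -> bool:
    """Return True if the exchange name refers to a decentralized venue."""
    return exchange in EXCHANGES['dex'] or 'dex' in exchange.lower()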
@dataclass
class OrderBook:
"""Order book data structure for managing bid/ask levels"""
exchange: str
asset: str
timestamp: datetime
bids: List[Tuple[float, float]] # List of (price, size) tuples
asks: List[Tuple[float, float]] # List of (price, size) tuples
def get_best_bid(self) -> Tuple[float, float]:
"""Get best bid price and size"""
return self.bids[0] if self.bids else (0.0, 0.0)
def get_best_ask(self) -> Tuple[float, float]:
"""Get best ask price and size"""
return self.asks[0] if self.asks else (float('inf'), 0.0)
def get_mid_price(self) -> float:
"""Calculate mid price from best bid and ask"""
bid, _ = self.get_best_bid()
ask, _ = self.get_best_ask()
return (bid + ask) / 2 if bid > 0 and ask < float('inf') else 0.0
@dataclass
class ArbitrageOpportunity:
"""Data structure for storing identified arbitrage opportunities"""
opportunity_id: str
asset: str
buy_exchange: str
sell_exchange: str
buy_price: float
sell_price: float
max_size: float
expected_profit: float
expected_profit_pct: float
latency_risk: float
timestamp: datetime
metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class ExecutionResult:
"""Data structure for storing execution results"""
opportunity_id: str
success: bool
executed_size: float
buy_fill_price: float
sell_fill_price: float
realized_profit: float
slippage: float
latency_ms: float
gas_cost: float
timestamp: datetime
class NumericalTransformer(nn.Module):
"""Transformer architecture adapted for numerical price prediction with training capability"""
def __init__(self, input_dim: int = 10, hidden_dim: int = 128,
num_heads: int = 8, num_layers: int = 4,
prediction_horizon: int = 5):
super().__init__()
self.input_dim = input_dim
self.hidden_dim = hidden_dim
self.prediction_horizon = prediction_horizon
# Input projection layer
self.input_projection = nn.Linear(input_dim, hidden_dim)
# Positional encoding for sequence position information
self.positional_encoding = self._create_positional_encoding(1000, hidden_dim)
# Transformer encoder for feature extraction
encoder_layer = nn.TransformerEncoderLayer(
d_model=hidden_dim,
nhead=num_heads,
dim_feedforward=hidden_dim * 4,
dropout=0.1,
activation='gelu',
batch_first=True
)
self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
# Output heads for different prediction targets
self.price_head = nn.Linear(hidden_dim, prediction_horizon)
self.volatility_head = nn.Linear(hidden_dim, prediction_horizon)
self.volume_head = nn.Linear(hidden_dim, prediction_horizon)
self.uncertainty_head = nn.Linear(hidden_dim, prediction_horizon)
# Initialize weights
self._init_weights()
def _init_weights(self):
"""Initialize model weights using Xavier initialization"""
for module in self.modules():
if isinstance(module, nn.Linear):
nn.init.xavier_uniform_(module.weight)
if module.bias is not None:
nn.init.zeros_(module.bias)
def _create_positional_encoding(self, max_len: int, d_model: int) -> nn.Parameter:
"""Create sinusoidal positional encoding"""
pe = torch.zeros(max_len, d_model)
position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
div_term = torch.exp(torch.arange(0, d_model, 2).float() *
(-np.log(10000.0) / d_model))
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
return nn.Parameter(pe.unsqueeze(0), requires_grad=False)
def forward(self, x: torch.Tensor, mask: Optional[torch.Tensor] = None) -> Dict[str, torch.Tensor]:
"""
Forward pass through the transformer
Args:
x: Input tensor of shape (batch_size, seq_len, input_dim)
mask: Optional attention mask
Returns:
Dictionary containing price, volatility, volume predictions and uncertainty
"""
batch_size, seq_len, _ = x.shape
# Project input to hidden dimension
x = self.input_projection(x)
# Add positional encoding
x = x + self.positional_encoding[:, :seq_len, :]
# Pass through transformer encoder
encoded = self.transformer(x, src_key_padding_mask=mask)
# Use last sequence position for prediction
last_hidden = encoded[:, -1, :]
# Generate predictions from specialized heads
price_pred = self.price_head(last_hidden)
volatility_pred = torch.sigmoid(self.volatility_head(last_hidden)) * 0.1 # Scale to [0, 0.1]
volume_pred = torch.exp(self.volume_head(last_hidden)) # Ensure positive
uncertainty = torch.sigmoid(self.uncertainty_head(last_hidden))
return {
'price': price_pred,
'volatility': volatility_pred,
'volume': volume_pred,
'uncertainty': uncertainty
}
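# Illustrative smoke test for NumericalTransformer. This is a sketch: the batch
# shapes below are assumptions for demonstration, and the engine never calls it.
def _demo_numerical_transformer():
    """Run a random batch through the model and verify output shapes."""
    model = NumericalTransformer(input_dim=10, prediction_horizon=5)
    x = torch.randn(2, 50, 10)  # (batch=2, seq_len=50, features=10)
    out = model(x)
    assert out['price'].shape == (2, 5)
    assert out['uncertainty'].shape == (2, 5)
    return out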
class PriceForecastingEngine:
"""Engine for multi-asset price forecasting using transformers with actual training"""
def __init__(self):
self.models = {} # One model per asset class
self.scalers = {} # Feature scalers
self.training_history = defaultdict(list)
self.is_trained = defaultdict(bool)
# Initialize models for each asset class
for asset_class in ASSET_CLASSES.keys():
self.models[asset_class] = NumericalTransformer()
self.scalers[asset_class] = StandardScaler()
self.is_trained[asset_class] = False
def prepare_features(self, price_data: pd.DataFrame) -> np.ndarray:
"""Prepare features for transformer input"""
features = []
# Price features
features.append(price_data['close'].values)
features.append(price_data['high'].values)
features.append(price_data['low'].values)
# Volume features (log-transformed)
features.append(np.log1p(price_data['volume'].values))
# Technical indicators
returns = price_data['close'].pct_change().fillna(0)
features.append(returns.values)
# Moving averages
        ma_7 = price_data['close'].rolling(7).mean().bfill()
        ma_21 = price_data['close'].rolling(21).mean().bfill()
features.append((price_data['close'] / ma_7 - 1).values)
features.append((price_data['close'] / ma_21 - 1).values)
# Volatility (rolling standard deviation)
        volatility = returns.rolling(20).std().bfill()
features.append(volatility.values)
# RSI (Relative Strength Index)
rsi = self.calculate_rsi(price_data['close'])
features.append(rsi.values / 100) # Normalize to [0, 1]
# Order flow imbalance (simulated for this example)
ofi = np.random.normal(0, 0.1, len(price_data))
features.append(ofi)
# Stack features into matrix
feature_matrix = np.column_stack(features)
return feature_matrix
def calculate_rsi(self, prices: pd.Series, period: int = 14) -> pd.Series:
"""Calculate Relative Strength Index"""
delta = prices.diff()
gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
rs = gain / loss
rsi = 100 - (100 / (1 + rs))
return rsi.fillna(50)
def create_sequences(self, features: np.ndarray, targets: np.ndarray,
seq_len: int = 50, horizon: int = 5) -> Tuple[np.ndarray, np.ndarray]:
"""Create sequences for training the transformer"""
sequences = []
target_sequences = []
for i in range(seq_len, len(features) - horizon):
sequences.append(features[i-seq_len:i])
target_sequences.append(targets[i:i+horizon])
return np.array(sequences), np.array(target_sequences)
def train_model(self, asset_class: str, price_data: pd.DataFrame,
epochs: int = 50, batch_size: int = 32):
"""Train the transformer model on historical data"""
if len(price_data) < 100:
return # Not enough data to train
# Prepare features
features = self.prepare_features(price_data)
# Scale features
features_scaled = self.scalers[asset_class].fit_transform(features)
# Prepare targets (future returns)
returns = price_data['close'].pct_change().fillna(0).values
# Create sequences
X, y = self.create_sequences(features_scaled, returns)
if len(X) == 0:
return # Not enough sequences
# Convert to tensors
X_tensor = torch.FloatTensor(X)
y_tensor = torch.FloatTensor(y)
# Create data loader
dataset = torch.utils.data.TensorDataset(X_tensor, y_tensor)
loader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=True)
# Setup training
model = self.models[asset_class]
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.MSELoss()
# Training loop
model.train()
for epoch in range(epochs):
epoch_loss = 0.0
for batch_X, batch_y in loader:
optimizer.zero_grad()
# Forward pass
predictions = model(batch_X)
# Calculate loss (using price predictions)
loss = criterion(predictions['price'], batch_y)
# Backward pass
loss.backward()
optimizer.step()
epoch_loss += loss.item()
# Record training history
avg_loss = epoch_loss / len(loader)
self.training_history[asset_class].append(avg_loss)
# Mark as trained
self.is_trained[asset_class] = True
model.eval()
def forecast_prices(self, asset: str, price_history: pd.DataFrame,
horizon: int = 5) -> Dict[str, np.ndarray]:
"""Generate price forecasts using transformer model"""
# Determine asset class
asset_class = self._get_asset_class(asset)
if not asset_class:
return self._generate_random_forecast(horizon)
# Train model if not already trained
if not self.is_trained[asset_class] and len(price_history) > 100:
self.train_model(asset_class, price_history)
# Prepare features
features = self.prepare_features(price_history)
        # Scale features (reuse the fitted scaler when available instead of
        # refitting on forecast-time data)
        if self.is_trained[asset_class]:
            features_scaled = self.scalers[asset_class].transform(features)
        else:
            features_scaled = self.scalers[asset_class].fit_transform(features)
        # Use a fixed lookback window, left-padding with zeros when history is short
        seq_len = 50
        if len(features_scaled) < seq_len:
            padding = seq_len - len(features_scaled)
            features_scaled = np.vstack([
                np.zeros((padding, features_scaled.shape[1])),
                features_scaled
            ])
# Get recent sequence
sequence = features_scaled[-seq_len:].reshape(1, seq_len, -1)
sequence_tensor = torch.FloatTensor(sequence)
# Generate forecast
model = self.models[asset_class]
model.eval()
with torch.no_grad():
predictions = model(sequence_tensor)
# Extract predictions
current_price = price_history['close'].iloc[-1]
# Convert relative predictions to absolute prices
price_changes = predictions['price'].numpy()[0]
price_forecast = current_price * (1 + price_changes * 0.01) # Scale predictions
return {
'price': price_forecast,
'volatility': predictions['volatility'].numpy()[0],
'volume': predictions['volume'].numpy()[0],
'uncertainty': predictions['uncertainty'].numpy()[0]
}
def _get_asset_class(self, asset: str) -> Optional[str]:
"""Determine asset class for a given asset"""
for asset_class, assets in ASSET_CLASSES.items():
if asset in assets or any(asset.startswith(a) for a in assets):
return asset_class
return None
def _generate_random_forecast(self, horizon: int) -> Dict[str, np.ndarray]:
"""Generate random forecast as fallback"""
return {
'price': np.random.normal(100, 2, horizon),
'volatility': np.random.uniform(0.01, 0.05, horizon),
'volume': np.random.lognormal(15, 0.5, horizon),
'uncertainty': np.random.uniform(0.3, 0.7, horizon)
}
class ExchangeSimulator:
"""Simulate exchange order books and execution with realistic market dynamics"""
def __init__(self, exchange_type: str = 'cex'):
self.exchange_type = exchange_type
self.order_books = {}
self.latency_ms = LATENCY_CEX_MS if exchange_type == 'cex' else LATENCY_DEX_MS
self.transaction_cost = TRANSACTION_COST_CEX if exchange_type == 'cex' else TRANSACTION_COST_DEX
def generate_order_book(self, asset: str, mid_price: float,
spread_bps: float = 10, market_conditions: Dict[str, Any] = None) -> OrderBook:
"""Generate realistic order book with dynamic spread based on market conditions"""
# Adjust spread based on market conditions
if market_conditions:
volatility = market_conditions.get('volatility', 0.02)
liquidity = market_conditions.get('liquidity', 1.0)
spread_bps *= (1 + volatility * 10) * (2 - liquidity)
spread = mid_price * spread_bps / 10000
# Generate bid/ask levels with realistic depth
n_levels = 10
bids = []
asks = []
for i in range(n_levels):
# Bid side
bid_price = mid_price - spread/2 - i * spread/10
bid_size = np.random.lognormal(10, 1) * (n_levels - i) / n_levels
bids.append((bid_price, bid_size))
# Ask side
ask_price = mid_price + spread/2 + i * spread/10
ask_size = np.random.lognormal(10, 1) * (n_levels - i) / n_levels
asks.append((ask_price, ask_size))
return OrderBook(
exchange=self.exchange_type,
asset=asset,
timestamp=datetime.now(),
bids=bids,
asks=asks
)
def simulate_market_impact(self, size: float, liquidity: float) -> float:
"""Calculate market impact using square-root model"""
# Almgren-Chriss square-root market impact model
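        # Example: a 10,000-unit order against 1,000,000 units of visible
        # depth gives 10 * sqrt(0.01) = 1 bp, i.e. 0.0001 as a fraction.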
impact_bps = 10 * np.sqrt(size / liquidity)
return impact_bps / 10000
def execute_order(self, order_book: OrderBook, side: str,
size: float) -> Tuple[float, float]:
"""
Simulate order execution with realistic slippage
Returns: (fill_price, actual_size)
"""
filled_size = 0
total_cost = 0
if side == 'buy':
# Execute against asks
for ask_price, ask_size in order_book.asks:
if filled_size >= size:
break
fill_amount = min(size - filled_size, ask_size)
filled_size += fill_amount
total_cost += fill_amount * ask_price
else: # sell
# Execute against bids
for bid_price, bid_size in order_book.bids:
if filled_size >= size:
break
fill_amount = min(size - filled_size, bid_size)
filled_size += fill_amount
total_cost += fill_amount * bid_price
# Calculate average fill price
avg_fill_price = total_cost / filled_size if filled_size > 0 else 0
# Add market impact
liquidity = sum(s for _, s in order_book.bids) + sum(s for _, s in order_book.asks)
impact = self.simulate_market_impact(filled_size, liquidity)
if side == 'buy':
avg_fill_price *= (1 + impact)
else:
avg_fill_price *= (1 - impact)
return avg_fill_price, filled_size
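    # Illustrative sketch of a single simulated fill; the asset name and sizes
    # below are assumptions for demonstration, and the engine never calls this.
    def demo_fill(self, mid_price: float = 45000.0, size: float = 5.0) -> Tuple[float, float]:
        """Generate a synthetic book for 'BTC' and market-buy against it."""
        book = self.generate_order_book('BTC', mid_price)
        return self.execute_order(book, 'buy', size)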
class ArbitrageDetector:
"""Detect arbitrage opportunities across venues with advanced filtering"""
def __init__(self):
self.opportunity_history = []
self.min_profit_threshold = MIN_PROFIT_THRESHOLD
def find_opportunities(self, order_books: Dict[str, Dict[str, OrderBook]],
transaction_costs: Dict[str, float],
forecasts: Dict[str, Dict[str, np.ndarray]] = None) -> List[ArbitrageOpportunity]:
"""Find arbitrage opportunities across all venues with forecast integration"""
opportunities = []
# Check each asset
for asset in self._get_all_assets(order_books):
asset_books = self._get_asset_order_books(order_books, asset)
if len(asset_books) < 2:
continue
# Find best bid and ask across all exchanges
best_bid_exchange, best_bid_price, best_bid_size = self._find_best_bid(asset_books)
best_ask_exchange, best_ask_price, best_ask_size = self._find_best_ask(asset_books)
if best_bid_price > best_ask_price:
# Calculate potential profit
max_size = min(best_bid_size, best_ask_size,
MAX_POSITION_SIZE / best_ask_price)
# Calculate costs
buy_cost = transaction_costs.get(best_ask_exchange, TRANSACTION_COST_CEX)
sell_cost = transaction_costs.get(best_bid_exchange, TRANSACTION_COST_CEX)
# Add gas cost for DEX
                gas_cost = 0
                if is_dex(best_ask_exchange) or is_dex(best_bid_exchange):
                    gas_cost = GAS_COST_USD
# Calculate profit
gross_profit = (best_bid_price - best_ask_price) * max_size
total_cost = (buy_cost + sell_cost) * best_ask_price * max_size + gas_cost
net_profit = gross_profit - total_cost
profit_pct = net_profit / (best_ask_price * max_size) if max_size > 0 else 0
                # Adjust for price forecast if available
                forecast_adjustment = 0.0
                if forecasts and asset in forecasts:
                    price_forecast = forecasts[asset]['price'][0]
                    forecast_adjustment = (price_forecast - best_ask_price) / best_ask_price
                    profit_pct += forecast_adjustment * 0.5  # Weight forecast impact
if profit_pct > self.min_profit_threshold:
opportunity = ArbitrageOpportunity(
opportunity_id=f"{asset}_{datetime.now().strftime('%Y%m%d_%H%M%S%f')}",
asset=asset,
buy_exchange=best_ask_exchange,
sell_exchange=best_bid_exchange,
buy_price=best_ask_price,
sell_price=best_bid_price,
max_size=max_size,
expected_profit=net_profit,
expected_profit_pct=profit_pct,
latency_risk=self._calculate_latency_risk(
best_ask_exchange, best_bid_exchange
),
timestamp=datetime.now(),
metadata={
'spread': best_bid_price - best_ask_price,
'gas_cost': gas_cost,
'transaction_costs': buy_cost + sell_cost,
                            'forecast_impact': forecast_adjustment
}
)
opportunities.append(opportunity)
self.opportunity_history.append(opportunity)
return opportunities
def _get_all_assets(self, order_books: Dict[str, Dict[str, OrderBook]]) -> set:
"""Get all unique assets across exchanges"""
assets = set()
for exchange_books in order_books.values():
assets.update(exchange_books.keys())
return assets
def _get_asset_order_books(self, order_books: Dict[str, Dict[str, OrderBook]],
asset: str) -> Dict[str, OrderBook]:
"""Get order books for specific asset across exchanges"""
asset_books = {}
for exchange, books in order_books.items():
if asset in books:
asset_books[exchange] = books[asset]
return asset_books
def _find_best_bid(self, asset_books: Dict[str, OrderBook]) -> Tuple[str, float, float]:
"""Find best bid across exchanges"""
best_exchange = None
best_price = 0
best_size = 0
for exchange, book in asset_books.items():
bid_price, bid_size = book.get_best_bid()
if bid_price > best_price:
best_exchange = exchange
best_price = bid_price
best_size = bid_size
return best_exchange, best_price, best_size
def _find_best_ask(self, asset_books: Dict[str, OrderBook]) -> Tuple[str, float, float]:
"""Find best ask across exchanges"""
best_exchange = None
best_price = float('inf')
best_size = 0
for exchange, book in asset_books.items():
ask_price, ask_size = book.get_best_ask()
if ask_price < best_price:
best_exchange = exchange
best_price = ask_price
best_size = ask_size
return best_exchange, best_price, best_size
def _calculate_latency_risk(self, buy_exchange: str, sell_exchange: str) -> float:
"""Calculate latency risk score (0-1)"""
        # Higher risk for cross-exchange-type arbitrage
        if is_dex(buy_exchange) != is_dex(sell_exchange):
            return 0.8  # High risk due to different settlement times
        elif is_dex(buy_exchange):
            return 0.6  # Medium risk for DEX-DEX
        else:
            return 0.3  # Lower risk for CEX-CEX
class LLMStrategyOptimizer:
"""LLM-inspired strategy parameter optimization with machine learning"""
def __init__(self):
self.parameter_history = defaultdict(list)
self.performance_history = []
self.current_parameters = self._get_default_parameters()
self.optimization_model = self._build_optimization_model()
def _get_default_parameters(self) -> Dict[str, Any]:
"""Get default strategy parameters"""
return {
'min_profit_threshold': 0.002,
'max_position_size': 100000,
'risk_limit': 0.02, # 2% per trade
'correlation_threshold': 0.7,
'rebalance_frequency': 300, # seconds
'latency_buffer': 1.5, # multiplier for latency estimates
'confidence_threshold': 0.6,
'max_concurrent_trades': 5
}
def _build_optimization_model(self) -> nn.Module:
"""Build neural network for parameter optimization"""
class ParameterOptimizer(nn.Module):
def __init__(self):
super().__init__()
self.fc1 = nn.Linear(20, 64) # Input features
self.fc2 = nn.Linear(64, 32)
self.fc3 = nn.Linear(32, 8) # Output parameters
self.relu = nn.ReLU()
self.sigmoid = nn.Sigmoid()
def forward(self, x):
x = self.relu(self.fc1(x))
x = self.relu(self.fc2(x))
x = self.sigmoid(self.fc3(x))
return x
return ParameterOptimizer()
def generate_parameter_suggestions(self,
recent_performance: List[ExecutionResult],
market_conditions: Dict[str, Any]) -> Dict[str, Any]:
"""Generate parameter adjustments using ML-based optimization"""
suggestions = self.current_parameters.copy()
if not recent_performance:
return suggestions
# Extract performance features
success_rate = sum(1 for r in recent_performance if r.success) / len(recent_performance)
avg_slippage = np.mean([r.slippage for r in recent_performance])
avg_profit = np.mean([r.realized_profit for r in recent_performance])
profit_variance = np.var([r.realized_profit for r in recent_performance])
# Create feature vector
features = [
success_rate,
avg_slippage,
avg_profit / 1000, # Normalize
profit_variance / 1000000, # Normalize
market_conditions.get('volatility', 0.02),
market_conditions.get('liquidity', 1.0),
len(recent_performance) / 100, # Normalize
self.current_parameters['min_profit_threshold'],
self.current_parameters['max_position_size'] / 1000000,
self.current_parameters['risk_limit'],
self.current_parameters['correlation_threshold'],
self.current_parameters['rebalance_frequency'] / 3600,
self.current_parameters['latency_buffer'],
self.current_parameters['confidence_threshold'],
self.current_parameters['max_concurrent_trades'] / 10,
# Additional market features
market_conditions.get('max_volatility', 0.03),
float(datetime.now().hour) / 24, # Time of day
float(datetime.now().weekday()) / 7, # Day of week
0.5, # Placeholder for sentiment (would be real in production)
0.5 # Placeholder for market regime (would be real in production)
]
# Use ML model for optimization
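        # Note: the optimizer network is never trained in this demo, so its
        # outputs act as a fixed pseudo-random prior; the rule-based
        # adjustments below provide the interpretable tuning.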
feature_tensor = torch.FloatTensor([features])
with torch.no_grad():
adjustments = self.optimization_model(feature_tensor).numpy()[0]
# Apply ML-suggested adjustments
suggestions['min_profit_threshold'] = 0.001 + adjustments[0] * 0.009
suggestions['max_position_size'] = 10000 + adjustments[1] * 490000
suggestions['risk_limit'] = 0.005 + adjustments[2] * 0.045
suggestions['correlation_threshold'] = 0.5 + adjustments[3] * 0.4
suggestions['rebalance_frequency'] = 60 + adjustments[4] * 3540
suggestions['latency_buffer'] = 1.0 + adjustments[5] * 2.0
suggestions['confidence_threshold'] = 0.5 + adjustments[6] * 0.4
suggestions['max_concurrent_trades'] = int(1 + adjustments[7] * 9)
# Rule-based adjustments on top of ML
if success_rate < 0.7:
suggestions['min_profit_threshold'] *= 1.1
suggestions['confidence_threshold'] *= 1.05
if avg_slippage > 0.001:
suggestions['max_position_size'] *= 0.9
suggestions['latency_buffer'] *= 1.1
if avg_profit < 0:
suggestions['risk_limit'] *= 0.9
suggestions['max_concurrent_trades'] = max(1, suggestions['max_concurrent_trades'] - 1)
# Market condition adjustments
if market_conditions.get('volatility', 0) > 0.03:
suggestions['min_profit_threshold'] *= 1.2
suggestions['correlation_threshold'] *= 0.9
if market_conditions.get('liquidity', 1) < 0.5:
suggestions['max_position_size'] *= 0.7
# Ensure parameters stay within reasonable bounds
suggestions = self._apply_parameter_bounds(suggestions)
# Store suggestions
self.parameter_history['suggestions'].append({
'timestamp': datetime.now(),
'parameters': suggestions,
'reasoning': self._generate_reasoning(recent_performance, market_conditions),
'performance_metrics': {
'success_rate': success_rate,
'avg_slippage': avg_slippage,
'avg_profit': avg_profit
}
})
self.current_parameters = suggestions
return suggestions
def _apply_parameter_bounds(self, parameters: Dict[str, Any]) -> Dict[str, Any]:
"""Apply bounds to parameters"""
bounds = {
'min_profit_threshold': (0.001, 0.01),
'max_position_size': (10000, 500000),
'risk_limit': (0.005, 0.05),
'correlation_threshold': (0.5, 0.9),
'rebalance_frequency': (60, 3600),
'latency_buffer': (1.0, 3.0),
'confidence_threshold': (0.5, 0.9),
'max_concurrent_trades': (1, 10)
}
bounded = parameters.copy()
for param, (min_val, max_val) in bounds.items():
if param in bounded:
bounded[param] = max(min_val, min(max_val, bounded[param]))
return bounded
def _generate_reasoning(self, performance: List[ExecutionResult],
market_conditions: Dict[str, Any]) -> str:
"""Generate reasoning for parameter adjustments"""
reasons = []
if performance:
success_rate = sum(1 for r in performance if r.success) / len(performance)
if success_rate < 0.7:
reasons.append("Low success rate detected - increasing selectivity")
avg_slippage = np.mean([r.slippage for r in performance])
if avg_slippage > 0.001:
reasons.append("High slippage observed - adjusting execution parameters")
avg_profit = np.mean([r.realized_profit for r in performance])
if avg_profit < 0:
reasons.append("Negative average profit - tightening risk controls")
if market_conditions.get('volatility', 0) > 0.03:
reasons.append("Elevated market volatility - implementing conservative measures")
if market_conditions.get('liquidity', 1) < 0.5:
reasons.append("Reduced liquidity conditions - scaling down position sizes")
return "; ".join(reasons) if reasons else "Standard market conditions"
class RiskAnalytics:
"""Comprehensive risk analytics system with advanced metrics"""
def __init__(self):
self.position_history = []
self.var_confidence = 0.95
self.risk_metrics_history = []
self.correlation_matrix = None
    def calculate_var(self, returns: np.ndarray, confidence: float = 0.95) -> float:
        """Calculate Value at Risk using historical simulation"""
        if len(returns) < 20:
            return -0.02  # Default: assume a 2% loss when history is too short
        # Historical simulation: the (1 - confidence) percentile of returns,
        # which is negative when the tail is a loss
        return np.percentile(returns, (1 - confidence) * 100)
    def calculate_cvar(self, returns: np.ndarray, confidence: float = 0.95) -> float:
        """Calculate Conditional Value at Risk (Expected Shortfall)"""
        var = self.calculate_var(returns, confidence)
        tail = returns[returns <= var]
        # Mean of the tail beyond VaR; fall back to VaR when the tail is empty
        return tail.mean() if len(tail) > 0 else var
def calculate_sharpe_ratio(self, returns: np.ndarray) -> float:
"""Calculate Sharpe ratio"""
if len(returns) < 2:
return 0.0
excess_returns = returns - RISK_FREE_RATE / TRADING_DAYS_PER_YEAR
return np.sqrt(TRADING_DAYS_PER_YEAR) * excess_returns.mean() / (returns.std() + 1e-8)
def calculate_sortino_ratio(self, returns: np.ndarray) -> float:
"""Calculate Sortino ratio (downside deviation)"""
if len(returns) < 2:
return 0.0
excess_returns = returns - RISK_FREE_RATE / TRADING_DAYS_PER_YEAR
downside_returns = returns[returns < 0]
if len(downside_returns) == 0:
return float('inf') # No downside risk
downside_std = np.std(downside_returns)
return np.sqrt(TRADING_DAYS_PER_YEAR) * excess_returns.mean() / (downside_std + 1e-8)
def calculate_max_drawdown(self, equity_curve: np.ndarray) -> float:
"""Calculate maximum drawdown"""
peak = np.maximum.accumulate(equity_curve)
drawdown = (peak - equity_curve) / peak
return np.max(drawdown)
def calculate_calmar_ratio(self, returns: np.ndarray, equity_curve: np.ndarray) -> float:
"""Calculate Calmar ratio (return / max drawdown)"""
max_dd = self.calculate_max_drawdown(equity_curve)
if max_dd == 0:
return float('inf')
annual_return = returns.mean() * TRADING_DAYS_PER_YEAR
return annual_return / max_dd
def analyze_position_risk(self, positions: List[Dict[str, Any]],
market_data: Dict[str, pd.DataFrame]) -> Dict[str, Any]:
"""Analyze risk for current positions with comprehensive metrics"""
if not positions:
return self._empty_risk_metrics()
# Calculate position values and correlations
position_values = []
position_returns = []
for position in positions:
asset = position['asset']
size = position['size']
if asset in market_data:
price = market_data[asset]['close'].iloc[-1]
value = size * price
position_values.append(value)
returns = market_data[asset]['close'].pct_change().dropna()
position_returns.append(returns)
total_value = sum(position_values)
# Calculate portfolio metrics
if position_returns:
            # Create weighted portfolio returns (series assumed equal length)
            weights = np.array(position_values) / total_value
            portfolio_returns = np.zeros(len(position_returns[0]))
            for weight, returns in zip(weights, position_returns):
                portfolio_returns += weight * returns.values
# Calculate all risk metrics
var = self.calculate_var(portfolio_returns)
cvar = self.calculate_cvar(portfolio_returns)
sharpe = self.calculate_sharpe_ratio(portfolio_returns)
sortino = self.calculate_sortino_ratio(portfolio_returns)
# Build equity curve
equity_curve = (1 + portfolio_returns).cumprod()
max_dd = self.calculate_max_drawdown(equity_curve)
calmar = self.calculate_calmar_ratio(portfolio_returns, equity_curve)
# Calculate correlation matrix
returns_df = pd.DataFrame({
f'asset_{i}': returns.values
for i, returns in enumerate(position_returns)
})
correlation_matrix = returns_df.corr()
self.correlation_matrix = correlation_matrix
            upper_triangle = correlation_matrix.values[np.triu_indices_from(
                correlation_matrix.values, k=1)]
            avg_correlation = upper_triangle.mean() if upper_triangle.size > 0 else 0.0
else:
var = cvar = sharpe = sortino = max_dd = calmar = avg_correlation = 0
# Calculate additional risk metrics
herfindahl_index = sum((v/total_value)**2 for v in position_values) if total_value > 0 else 0
risk_metrics = {
'total_exposure': total_value,
'var_95': var,
'cvar_95': cvar,
'sharpe_ratio': sharpe,
'sortino_ratio': sortino,
'max_drawdown': max_dd,
'calmar_ratio': calmar,
'position_count': len(positions),
'avg_correlation': avg_correlation,
'concentration_risk': max(position_values) / total_value if total_value > 0 else 0,
'herfindahl_index': herfindahl_index,
'timestamp': datetime.now()
}
self.risk_metrics_history.append(risk_metrics)
return risk_metrics
def check_risk_limits(self, proposed_trade: ArbitrageOpportunity,
current_positions: List[Dict[str, Any]],
risk_parameters: Dict[str, Any]) -> Tuple[bool, str]:
"""Check if proposed trade violates risk limits"""
# Check position limit
position_value = proposed_trade.max_size * proposed_trade.buy_price
if position_value > risk_parameters['max_position_size']:
return False, "Position size exceeds limit"
# Check total exposure
current_exposure = sum(p['size'] * p['entry_price'] for p in current_positions)
if current_exposure + position_value > risk_parameters['max_position_size'] * 5:
return False, "Total exposure limit exceeded"
# Check concurrent trades
if len(current_positions) >= risk_parameters['max_concurrent_trades']:
return False, "Maximum concurrent trades reached"
# Check correlation with existing positions
same_asset_positions = [p for p in current_positions if p['asset'] == proposed_trade.asset]
if same_asset_positions:
return False, "Already have position in this asset"
# Check risk/reward ratio
if proposed_trade.expected_profit_pct < risk_parameters['min_profit_threshold']:
return False, "Profit below minimum threshold"
# Check latency risk
if proposed_trade.latency_risk > risk_parameters.get('max_latency_risk', 0.7):
return False, "Latency risk too high"
return True, "Risk checks passed"
def _empty_risk_metrics(self) -> Dict[str, Any]:
"""Return empty risk metrics"""
return {
'total_exposure': 0,
'var_95': 0,
'cvar_95': 0,
'sharpe_ratio': 0,
'sortino_ratio': 0,
'max_drawdown': 0,
'calmar_ratio': 0,
'position_count': 0,
'avg_correlation': 0,
'concentration_risk': 0,
'herfindahl_index': 0,
'timestamp': datetime.now()
}
class LatencyAwareExecutionEngine:
"""Execution engine with realistic latency simulation and smart routing"""
def __init__(self):
self.execution_history = []
self.latency_model = self._build_latency_model()
self.slippage_model = self._build_slippage_model()
self.execution_analytics = defaultdict(list)
def _build_latency_model(self) -> Dict[str, Dict[str, float]]:
"""Build latency model for different exchange pairs"""
return {
'cex_cex': {'mean': 100, 'std': 20}, # CEX to CEX
'cex_dex': {'mean': 1500, 'std': 500}, # CEX to DEX
'dex_dex': {'mean': 2000, 'std': 800}, # DEX to DEX
}
def _build_slippage_model(self) -> Dict[str, float]:
"""Build slippage model based on market conditions"""
return {
'low_volatility': 0.0005, # 5 bps
'normal': 0.001, # 10 bps
'high_volatility': 0.002, # 20 bps
'extreme': 0.005 # 50 bps
}
def simulate_execution(self, opportunity: ArbitrageOpportunity,
buy_exchange: ExchangeSimulator,
sell_exchange: ExchangeSimulator,
market_conditions: Dict[str, Any]) -> ExecutionResult:
"""Simulate order execution with realistic latency and slippage"""
# Determine exchange types
exchange_pair = self._get_exchange_pair_type(
opportunity.buy_exchange,
opportunity.sell_exchange
)
# Simulate latency
latency_params = self.latency_model[exchange_pair]
total_latency = np.random.normal(
latency_params['mean'],
latency_params['std']
)
total_latency = max(0, total_latency) # Ensure non-negative
# Determine market volatility regime
volatility_regime = self._get_volatility_regime(market_conditions)
base_slippage = self.slippage_model[volatility_regime]
# Calculate price movement during latency (correlated with volatility)
volatility = market_conditions.get('volatility', 0.02)
price_drift = np.random.normal(0, base_slippage * np.sqrt(total_latency / 1000) * (1 + volatility * 10))
# Simulate buy execution
buy_price_adjusted = opportunity.buy_price * (1 + price_drift)
buy_book = buy_exchange.generate_order_book(
opportunity.asset,
buy_price_adjusted,
market_conditions=market_conditions
)
buy_fill_price, buy_fill_size = buy_exchange.execute_order(
buy_book, 'buy', opportunity.max_size
)
# Simulate sell execution (with additional latency)
sell_latency = np.random.normal(50, 10)
price_drift_sell = np.random.normal(
0,
base_slippage * np.sqrt((total_latency + sell_latency) / 1000) * (1 + volatility * 10)
)
sell_price_adjusted = opportunity.sell_price * (1 - price_drift_sell)
sell_book = sell_exchange.generate_order_book(
opportunity.asset,
sell_price_adjusted,
market_conditions=market_conditions
)
sell_fill_price, sell_fill_size = sell_exchange.execute_order(
sell_book, 'sell', min(buy_fill_size, opportunity.max_size)
)
# Calculate realized profit
executed_size = min(buy_fill_size, sell_fill_size)
# Transaction costs
buy_cost = buy_exchange.transaction_cost * buy_fill_price * executed_size
sell_cost = sell_exchange.transaction_cost * sell_fill_price * executed_size
# Gas costs for DEX
        gas_cost = 0
        if is_dex(opportunity.buy_exchange):
            gas_cost += GAS_COST_USD
        if is_dex(opportunity.sell_exchange):
            gas_cost += GAS_COST_USD
# Net profit calculation
gross_profit = (sell_fill_price - buy_fill_price) * executed_size
net_profit = gross_profit - buy_cost - sell_cost - gas_cost
# Calculate slippage
expected_profit = (opportunity.sell_price - opportunity.buy_price) * executed_size
slippage = (expected_profit - gross_profit) / expected_profit if expected_profit > 0 else 0
# Determine success based on profitability
success = net_profit > 0 and executed_size > 0
result = ExecutionResult(
opportunity_id=opportunity.opportunity_id,
success=success,
executed_size=executed_size,
buy_fill_price=buy_fill_price,
sell_fill_price=sell_fill_price,
realized_profit=net_profit,
slippage=slippage,
latency_ms=total_latency,
gas_cost=gas_cost,
timestamp=datetime.now()
)
self.execution_history.append(result)
# Track execution analytics
self.execution_analytics['asset'].append(opportunity.asset)
self.execution_analytics['exchange_pair'].append(exchange_pair)
self.execution_analytics['volatility_regime'].append(volatility_regime)
return result
def _get_exchange_pair_type(self, buy_exchange: str, sell_exchange: str) -> str:
"""Determine exchange pair type"""
        buy_is_dex = is_dex(buy_exchange)
        sell_is_dex = is_dex(sell_exchange)
if buy_is_dex and sell_is_dex:
return 'dex_dex'
elif not buy_is_dex and not sell_is_dex:
return 'cex_cex'
else:
return 'cex_dex'
def _get_volatility_regime(self, market_conditions: Dict[str, Any]) -> str:
"""Determine current volatility regime"""
volatility = market_conditions.get('volatility', 0.02)
if volatility < 0.015:
return 'low_volatility'
elif volatility < 0.03:
return 'normal'
elif volatility < 0.05:
return 'high_volatility'
else:
return 'extreme'
def optimize_execution_path(self, opportunities: List[ArbitrageOpportunity],
current_positions: List[Dict[str, Any]],
risk_parameters: Dict[str, Any]) -> List[ArbitrageOpportunity]:
"""Optimize execution order considering dependencies and risk"""
if not opportunities:
return []
# Score opportunities based on multiple factors
scored_opportunities = []
for opp in opportunities:
# Multi-factor scoring
profit_score = opp.expected_profit_pct
latency_penalty = opp.latency_risk * 0.5
size_score = min(opp.max_size * opp.buy_price / risk_parameters['max_position_size'], 1.0)
# Add forecast confidence if available
forecast_confidence = 1 - opp.metadata.get('forecast_uncertainty', 0.5)
# Combined score
total_score = profit_score * (1 - latency_penalty) * size_score * forecast_confidence
scored_opportunities.append((total_score, opp))
# Sort by score (highest first)
scored_opportunities.sort(key=lambda x: x[0], reverse=True)
# Select top opportunities that don't violate risk limits
selected = []
simulated_positions = current_positions.copy()
for score, opp in scored_opportunities:
# Simulate adding this position
can_add, reason = self._can_add_opportunity(
opp, simulated_positions, risk_parameters
)
if can_add:
selected.append(opp)
simulated_positions.append({
'asset': opp.asset,
'size': opp.max_size,
'entry_price': opp.buy_price
})
if len(selected) >= risk_parameters['max_concurrent_trades']:
break
return selected
def _can_add_opportunity(self, opportunity: ArbitrageOpportunity,
positions: List[Dict[str, Any]],
risk_parameters: Dict[str, Any]) -> Tuple[bool, str]:
"""Check if opportunity can be added to positions"""
# Check if already have position in asset
for pos in positions:
if pos['asset'] == opportunity.asset:
return False, "Already have position in asset"
# Check total exposure
current_exposure = sum(p['size'] * p['entry_price'] for p in positions)
new_exposure = opportunity.max_size * opportunity.buy_price
if current_exposure + new_exposure > risk_parameters['max_position_size'] * 5:
return False, "Would exceed total exposure limit"
return True, "OK"
class CrossAssetArbitrageEngine:
"""Main arbitrage engine coordinating all components"""
def __init__(self):
# Initialize components
self.price_forecaster = PriceForecastingEngine()
self.arbitrage_detector = ArbitrageDetector()
self.strategy_optimizer = LLMStrategyOptimizer()
self.risk_analytics = RiskAnalytics()
self.execution_engine = LatencyAwareExecutionEngine()
# Exchange simulators
self.exchanges = {}
for exchange in EXCHANGES['cex']:
self.exchanges[exchange] = ExchangeSimulator('cex')
for exchange in EXCHANGES['dex']:
self.exchanges[exchange] = ExchangeSimulator('dex')
# State management
self.active_positions = []
self.portfolio_value = 100000 # Starting capital
self.performance_history = []
self.market_data_cache = {}
self.forecasts_cache = {}
def generate_market_data(self, assets: List[str], days: int = 100) -> Dict[str, pd.DataFrame]:
"""Generate realistic correlated market data for multiple assets"""
market_data = {}
# Generate correlation matrix for assets
n_assets = len(assets)
correlation_matrix = np.eye(n_assets)
# Add correlations between assets
for i in range(n_assets):
for j in range(i+1, n_assets):
# Crypto assets are more correlated
if (assets[i] in ASSET_CLASSES['crypto_spot'] and
assets[j] in ASSET_CLASSES['crypto_spot']):
corr = np.random.uniform(0.6, 0.9)
# FX pairs have moderate correlation
elif (assets[i] in ASSET_CLASSES['fx_pairs'] and
assets[j] in ASSET_CLASSES['fx_pairs']):
corr = np.random.uniform(0.3, 0.6)
# Different asset classes have low correlation
else:
corr = np.random.uniform(-0.2, 0.3)
correlation_matrix[i, j] = corr
correlation_matrix[j, i] = corr
# Generate correlated returns
mean_returns = np.zeros(n_assets)
volatilities = []
for asset in assets:
if asset in ASSET_CLASSES['crypto_spot']:
volatilities.append(0.015) # Higher volatility
elif asset in ASSET_CLASSES['fx_pairs']:
volatilities.append(0.005) # Lower volatility
else:
volatilities.append(0.01) # Medium volatility
cov_matrix = np.outer(volatilities, volatilities) * correlation_matrix
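        # Randomly filled pairwise correlations need not form a positive
        # semi-definite matrix; np.random.multivariate_normal then warns and
        # samples from an approximate (SVD-based) factorization, which is
        # acceptable for synthetic data.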
# Generate returns
returns = np.random.multivariate_normal(mean_returns, cov_matrix, days)
# Generate price data for each asset
for i, asset in enumerate(assets):
            # Base price
            if asset in ASSET_CLASSES['crypto_spot']:
                base_price = {'BTC': 45000, 'ETH': 3000, 'SOL': 100}.get(asset, 50)
            elif asset in ASSET_CLASSES['crypto_futures']:
                # Perpetual futures track their underlying spot prices
                base_price = {'BTC-PERP': 45000, 'ETH-PERP': 3000, 'SOL-PERP': 100}.get(asset, 50)
            elif asset in ASSET_CLASSES['equity_etfs']:
                base_price = {'SPY': 450, 'QQQ': 380}.get(asset, 100)
            else:
                base_price = 1.0  # FX pairs
# Generate prices from returns
prices = base_price * np.exp(np.cumsum(returns[:, i]))
# Generate OHLCV data
            dates = pd.date_range(end=datetime.now(), periods=days, freq='h')
data = pd.DataFrame({
'open': prices * (1 + np.random.normal(0, 0.002, days)),
'high': prices * (1 + np.abs(np.random.normal(0, 0.005, days))),
'low': prices * (1 - np.abs(np.random.normal(0, 0.005, days))),
'close': prices,
'volume': np.random.lognormal(15, 0.5, days)
}, index=dates)
# Ensure OHLC consistency
data['high'] = data[['open', 'high', 'close']].max(axis=1)
data['low'] = data[['open', 'low', 'close']].min(axis=1)
market_data[asset] = data
self.market_data_cache = market_data
return market_data
def update_order_books(self, market_data: Dict[str, pd.DataFrame]) -> Dict[str, Dict[str, OrderBook]]:
"""Generate current order books for all exchanges"""
order_books = defaultdict(dict)
# Get current market conditions
market_conditions = self.calculate_market_conditions(market_data)
for asset, data in market_data.items():
current_price = data['close'].iloc[-1]
# Generate order books for each exchange
for exchange_name, exchange in self.exchanges.items():
# Add price variation between exchanges
price_variation = np.random.normal(0, 0.0005)
adjusted_price = current_price * (1 + price_variation)
# Vary spread based on exchange type and market conditions
base_spread = 5 if exchange.exchange_type == 'cex' else 15
volatility_adjustment = 1 + market_conditions['volatility'] * 20
spread_bps = base_spread * volatility_adjustment
order_book = exchange.generate_order_book(
asset, adjusted_price, spread_bps, market_conditions
)
order_books[exchange_name][asset] = order_book
return dict(order_books)
def calculate_market_conditions(self, market_data: Dict[str, pd.DataFrame]) -> Dict[str, Any]:
"""Calculate current market conditions"""
volatilities = []
volumes = []
spreads = []
for asset, data in market_data.items():
returns = data['close'].pct_change().dropna()
            # Per-period (hourly) volatility; the regime thresholds and spread
            # adjustments downstream expect this scale, not annualized values
            volatility = returns.iloc[-24:].std()
volatilities.append(volatility)
# Calculate average volume
avg_volume = data['volume'].iloc[-24:].mean()
volumes.append(avg_volume)
# Calculate spread proxy
spread = (data['high'] - data['low']).iloc[-24:].mean() / data['close'].iloc[-24:].mean()
spreads.append(spread)
return {
'volatility': np.mean(volatilities),
'max_volatility': np.max(volatilities),
'liquidity': np.mean(volumes) / 1e6, # Normalize
'avg_spread': np.mean(spreads),
'timestamp': datetime.now()
}
def generate_price_forecasts(self, market_data: Dict[str, pd.DataFrame]) -> Dict[str, Dict[str, np.ndarray]]:
"""Generate price forecasts for all assets"""
forecasts = {}
for asset, data in market_data.items():
forecast = self.price_forecaster.forecast_prices(asset, data)
forecasts[asset] = forecast
self.forecasts_cache = forecasts
return forecasts
def run_arbitrage_cycle(self) -> Dict[str, Any]:
"""Run complete arbitrage detection and execution cycle"""
# Get current market data
if not self.market_data_cache:
assets = []
for asset_class, asset_list in ASSET_CLASSES.items():
assets.extend(asset_list[:2]) # Use first 2 from each class
self.market_data_cache = self.generate_market_data(assets)
market_data = self.market_data_cache
# Generate price forecasts
forecasts = self.generate_price_forecasts(market_data)
# Update order books
order_books = self.update_order_books(market_data)
# Calculate market conditions
market_conditions = self.calculate_market_conditions(market_data)
# Update strategy parameters based on recent performance
recent_executions = self.execution_engine.execution_history[-20:]
strategy_params = self.strategy_optimizer.generate_parameter_suggestions(
recent_executions, market_conditions
)
# Find arbitrage opportunities with forecast integration
transaction_costs = {
exchange: sim.transaction_cost
for exchange, sim in self.exchanges.items()
}
opportunities = self.arbitrage_detector.find_opportunities(
order_books, transaction_costs, forecasts
)
# Filter based on strategy parameters
filtered_opportunities = [
opp for opp in opportunities
if opp.expected_profit_pct >= strategy_params['min_profit_threshold']
]
# Risk analysis
risk_metrics = self.risk_analytics.analyze_position_risk(
self.active_positions, market_data
)
# Optimize execution order
selected_opportunities = self.execution_engine.optimize_execution_path(
filtered_opportunities, self.active_positions, strategy_params
)
# Execute selected opportunities
execution_results = []
for opportunity in selected_opportunities:
# Final risk check
can_execute, reason = self.risk_analytics.check_risk_limits(
opportunity, self.active_positions, strategy_params
)
if can_execute:
# Execute trade
buy_exchange = self.exchanges[opportunity.buy_exchange]
sell_exchange = self.exchanges[opportunity.sell_exchange]
result = self.execution_engine.simulate_execution(
opportunity, buy_exchange, sell_exchange, market_conditions
)
execution_results.append(result)
# Update positions if successful
if result.success:
self.active_positions.append({
'asset': opportunity.asset,
'size': result.executed_size,
'entry_price': result.buy_fill_price,
'exit_price': result.sell_fill_price,
'profit': result.realized_profit,
'timestamp': result.timestamp
})
# Update portfolio value
self.portfolio_value += result.realized_profit
        # Clean up completed positions (in this simulation, arbitrage trades
        # complete immediately; entries are kept briefly for reporting)
        self.active_positions = [p for p in self.active_positions
                                 if (datetime.now() - p['timestamp']).total_seconds() < 300]
# Store performance metrics
cycle_summary = {
'timestamp': datetime.now(),
'opportunities_found': len(opportunities),
'opportunities_executed': len(execution_results),
'successful_executions': sum(1 for r in execution_results if r.success),
'total_profit': sum(r.realized_profit for r in execution_results),
'portfolio_value': self.portfolio_value,
'risk_metrics': risk_metrics,
'market_conditions': market_conditions,
'strategy_parameters': strategy_params
}
self.performance_history.append(cycle_summary)
return cycle_summary
# Visualization functions
def create_opportunity_network(opportunities: List[ArbitrageOpportunity]) -> go.Figure:
"""Create network visualization of arbitrage opportunities"""
# Create graph
G = nx.Graph()
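    # Undirected graph: edge width encodes the profit spread; buy/sell
    # direction is not drawn.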
# Add nodes and edges
for opp in opportunities:
G.add_edge(
opp.buy_exchange,
opp.sell_exchange,
weight=opp.expected_profit_pct,
asset=opp.asset
)
if len(G.nodes()) == 0:
# Empty graph
fig = go.Figure()
fig.add_annotation(
text="No arbitrage opportunities found",
xref="paper", yref="paper",
x=0.5, y=0.5, showarrow=False
)
return fig
# Calculate layout
pos = nx.spring_layout(G, k=2, iterations=50)
# Create edge trace
edge_trace = []
for edge in G.edges(data=True):
x0, y0 = pos[edge[0]]
x1, y1 = pos[edge[1]]
edge_trace.append(go.Scatter(
x=[x0, x1, None],
y=[y0, y1, None],
mode='lines',
line=dict(
width=edge[2]['weight'] * 100,
color='rgba(125,125,125,0.5)'
),
hoverinfo='text',
text=f"{edge[2]['asset']}: {edge[2]['weight']*100:.2f}%"
))
# Create node trace
node_trace = go.Scatter(
x=[pos[node][0] for node in G.nodes()],
y=[pos[node][1] for node in G.nodes()],
mode='markers+text',
text=[node for node in G.nodes()],
textposition="top center",
marker=dict(
size=20,
            color=['red' if is_dex(node) else 'blue' for node in G.nodes()],
line=dict(color='darkgray', width=2)
),
hoverinfo='text'
)
# Create figure
fig = go.Figure(data=edge_trace + [node_trace])
fig.update_layout(
title="Arbitrage Opportunity Network",
showlegend=False,
xaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
yaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
height=500
)
return fig
def create_performance_dashboard(performance_history: List[Dict[str, Any]]) -> go.Figure:
"""Create comprehensive performance dashboard"""
if not performance_history:
fig = go.Figure()
fig.add_annotation(
text="No performance data available",
xref="paper", yref="paper",
x=0.5, y=0.5, showarrow=False
)
return fig
# Convert to DataFrame
perf_df = pd.DataFrame(performance_history)
# Create subplots
fig = make_subplots(
rows=3, cols=2,
subplot_titles=(
'Portfolio Value', 'Profit per Cycle',
'Success Rate', 'Risk Metrics',
'Opportunities vs Executions', 'Market Conditions'
),
specs=[
[{"type": "scatter"}, {"type": "scatter"}],
[{"type": "scatter"}, {"type": "scatter"}],
[{"type": "bar"}, {"type": "scatter"}]
],
vertical_spacing=0.1,
horizontal_spacing=0.1
)
# Portfolio value
fig.add_trace(
go.Scatter(
x=perf_df['timestamp'],
y=perf_df['portfolio_value'],
mode='lines',
name='Portfolio Value',
line=dict(color='blue', width=2)
),
row=1, col=1
)
# Profit per cycle
fig.add_trace(
go.Scatter(
x=perf_df['timestamp'],
y=perf_df['total_profit'],
mode='lines+markers',
name='Profit',
line=dict(color='green')
),
row=1, col=2
)
# Success rate
perf_df['success_rate'] = perf_df.apply(
lambda x: x['successful_executions'] / x['opportunities_executed'] if x['opportunities_executed'] > 0 else 0,
axis=1
)
fig.add_trace(
go.Scatter(
x=perf_df['timestamp'],
y=perf_df['success_rate'],
mode='lines',
name='Success Rate',
line=dict(color='orange')
),
row=2, col=1
)
# Risk metrics (Sharpe ratio)
sharpe_values = [m['sharpe_ratio'] for m in perf_df['risk_metrics']]
fig.add_trace(
go.Scatter(
x=perf_df['timestamp'],
y=sharpe_values,
mode='lines',
name='Sharpe Ratio',
line=dict(color='purple')
),
row=2, col=2
)
# Opportunities vs Executions
fig.add_trace(
go.Bar(
x=perf_df['timestamp'],
y=perf_df['opportunities_found'],
name='Found',
marker_color='lightblue'
),
row=3, col=1
)
fig.add_trace(
go.Bar(
x=perf_df['timestamp'],
y=perf_df['opportunities_executed'],
name='Executed',
marker_color='darkblue'
),
row=3, col=1
)
# Market volatility
volatility_values = [m['volatility'] for m in perf_df['market_conditions']]
fig.add_trace(
go.Scatter(
x=perf_df['timestamp'],
y=volatility_values,
mode='lines',
name='Volatility',
line=dict(color='red')
),
row=3, col=2
)
# Update layout
fig.update_layout(
height=1000,
showlegend=False,
title_text="Cross-Asset Arbitrage Performance Dashboard"
)
# Update axes
fig.update_xaxes(title_text="Time", row=3, col=1)
fig.update_xaxes(title_text="Time", row=3, col=2)
fig.update_yaxes(title_text="Value ($)", row=1, col=1)
fig.update_yaxes(title_text="Profit ($)", row=1, col=2)
fig.update_yaxes(title_text="Rate", row=2, col=1)
fig.update_yaxes(title_text="Sharpe", row=2, col=2)
fig.update_yaxes(title_text="Count", row=3, col=1)
fig.update_yaxes(title_text="Volatility", row=3, col=2)
return fig
def create_execution_analysis(execution_history: List[ExecutionResult]) -> go.Figure:
"""Create execution analysis visualization"""
if not execution_history:
fig = go.Figure()
fig.add_annotation(
text="No execution data available",
xref="paper", yref="paper",
x=0.5, y=0.5, showarrow=False
)
return fig
# Convert to DataFrame
exec_df = pd.DataFrame([
{
'timestamp': e.timestamp,
'profit': e.realized_profit,
'slippage': e.slippage,
'latency': e.latency_ms,
'success': e.success
}
for e in execution_history
])
# Create subplots
fig = make_subplots(
rows=2, cols=2,
subplot_titles=(
'Profit Distribution', 'Slippage Analysis',
'Latency Distribution', 'Success Rate Over Time'
),
specs=[
[{"type": "histogram"}, {"type": "scatter"}],
[{"type": "histogram"}, {"type": "scatter"}]
]
)
# Profit distribution
fig.add_trace(
go.Histogram(
x=exec_df['profit'],
nbinsx=30,
name='Profit',
marker_color='green'
),
row=1, col=1
)
# Slippage over time
fig.add_trace(
go.Scatter(
x=exec_df['timestamp'],
y=exec_df['slippage'] * 100, # Convert to percentage
mode='markers',
name='Slippage',
marker=dict(
color=exec_df['success'].map({True: 'blue', False: 'red'}),
size=8
)
),
row=1, col=2
)
# Latency distribution
fig.add_trace(
go.Histogram(
x=exec_df['latency'],
nbinsx=30,
name='Latency',
marker_color='orange'
),
row=2, col=1
)
# Success rate over time (rolling)
exec_df['success_int'] = exec_df['success'].astype(int)
exec_df['success_rate_rolling'] = exec_df['success_int'].rolling(
window=20, min_periods=1
).mean()
fig.add_trace(
go.Scatter(
x=exec_df['timestamp'],
y=exec_df['success_rate_rolling'],
mode='lines',
name='Success Rate',
line=dict(color='purple', width=2)
),
row=2, col=2
)
# Update layout
fig.update_layout(
height=700,
showlegend=False,
title_text="Execution Analysis"
)
# Update axes
fig.update_xaxes(title_text="Profit ($)", row=1, col=1)
fig.update_xaxes(title_text="Time", row=1, col=2)
fig.update_xaxes(title_text="Latency (ms)", row=2, col=1)
fig.update_xaxes(title_text="Time", row=2, col=2)
fig.update_yaxes(title_text="Count", row=1, col=1)
fig.update_yaxes(title_text="Slippage (%)", row=1, col=2)
fig.update_yaxes(title_text="Count", row=2, col=1)
fig.update_yaxes(title_text="Success Rate", row=2, col=2)
return fig
# Gradio Interface
def create_gradio_interface():
"""Create the main Gradio interface"""
# Initialize engine
engine = CrossAssetArbitrageEngine()
def run_arbitrage_simulation(n_cycles, initial_capital, min_profit_threshold):
"""Run arbitrage simulation"""
# Reset engine
engine.portfolio_value = float(initial_capital)
engine.performance_history = []
engine.execution_engine.execution_history = []
engine.active_positions = []
        # Update strategy parameters (the detector keeps its own threshold, so update both)
        engine.strategy_optimizer.current_parameters['min_profit_threshold'] = float(min_profit_threshold) / 100
        engine.arbitrage_detector.min_profit_threshold = float(min_profit_threshold) / 100
# Generate initial market data
assets = []
for asset_class, asset_list in ASSET_CLASSES.items():
assets.extend(asset_list[:2]) # Use 2 assets from each class
engine.generate_market_data(assets, days=200)
# Run simulation cycles
cycle_summaries = []
for i in range(int(n_cycles)):
# Update market data (simulate price movement)
for asset, data in engine.market_data_cache.items():
# Add new price point
last_price = data['close'].iloc[-1]
new_return = np.random.normal(0.0001, 0.01)
new_price = last_price * (1 + new_return)
new_row = pd.DataFrame({
'open': [new_price * (1 + np.random.normal(0, 0.002))],
'high': [new_price * (1 + abs(np.random.normal(0, 0.005)))],
'low': [new_price * (1 - abs(np.random.normal(0, 0.005)))],
'close': [new_price],
'volume': [np.random.lognormal(15, 0.5)]
}, index=[data.index[-1] + pd.Timedelta(hours=1)])
# Ensure OHLC consistency
new_row['high'] = new_row[['open', 'high', 'close']].max(axis=1)
new_row['low'] = new_row[['open', 'low', 'close']].min(axis=1)
engine.market_data_cache[asset] = pd.concat([data, new_row])
# Keep only recent data
engine.market_data_cache[asset] = engine.market_data_cache[asset].iloc[-200:]
# Run arbitrage cycle
summary = engine.run_arbitrage_cycle()
cycle_summaries.append(summary)
# Create visualizations
opportunity_network = create_opportunity_network(
engine.arbitrage_detector.opportunity_history[-50:]
)
performance_dashboard = create_performance_dashboard(
engine.performance_history
)
execution_analysis = create_execution_analysis(
engine.execution_engine.execution_history
)
# Calculate summary statistics
total_profit = sum(s['total_profit'] for s in engine.performance_history)
total_return = (engine.portfolio_value - initial_capital) / initial_capital
if len(engine.execution_engine.execution_history) > 0:
success_rate = sum(
1 for e in engine.execution_engine.execution_history if e.success
) / len(engine.execution_engine.execution_history)
avg_latency = np.mean([
e.latency_ms for e in engine.execution_engine.execution_history
])
avg_slippage = np.mean([
e.slippage for e in engine.execution_engine.execution_history
])
else:
success_rate = avg_latency = avg_slippage = 0
# Get latest risk metrics
if engine.risk_analytics.risk_metrics_history:
latest_risk = engine.risk_analytics.risk_metrics_history[-1]
sharpe = latest_risk['sharpe_ratio']
var_95 = latest_risk['var_95']
else:
sharpe = var_95 = 0
summary_text = f"""
### Simulation Summary
**Performance Metrics:**
- Total Profit: ${total_profit:,.2f}
- Total Return: {total_return*100:.2f}%
- Final Portfolio Value: ${engine.portfolio_value:,.2f}
- Sharpe Ratio: {sharpe:.2f}
- VaR (95%): {var_95*100:.2f}%
**Execution Statistics:**
- Total Opportunities Found: {sum(s['opportunities_found'] for s in engine.performance_history)}
- Total Executions: {len(engine.execution_engine.execution_history)}
- Success Rate: {success_rate*100:.1f}%
- Average Latency: {avg_latency:.0f}ms
- Average Slippage: {avg_slippage*100:.2f}%
**Active Positions:** {len(engine.active_positions)}
"""
# Latest opportunities table
recent_opps = []
for opp in engine.arbitrage_detector.opportunity_history[-10:]:
recent_opps.append({
'Asset': opp.asset,
'Buy Exchange': opp.buy_exchange,
'Sell Exchange': opp.sell_exchange,
'Spread': f"{(opp.sell_price - opp.buy_price)/opp.buy_price*100:.2f}%",
'Expected Profit': f"${opp.expected_profit:.2f}",
'Latency Risk': f"{opp.latency_risk:.2f}"
})
opportunities_df = pd.DataFrame(recent_opps) if recent_opps else pd.DataFrame()
return (opportunity_network, performance_dashboard, execution_analysis,
summary_text, opportunities_df)
def analyze_strategy_parameters():
"""Analyze current strategy parameters"""
        if not engine.strategy_optimizer.parameter_history:
            # Return None for the Plot output so Gradio renders it as empty
            return None, "No parameter history available"
# Get parameter evolution
param_history = engine.strategy_optimizer.parameter_history.get('suggestions', [])
        if not param_history:
            return None, "No parameter suggestions generated"
# Create parameter evolution chart
param_df = pd.DataFrame([
{
'timestamp': entry['timestamp'],
**entry['parameters']
}
for entry in param_history
])
fig = make_subplots(
rows=2, cols=2,
subplot_titles=(
'Profit Threshold', 'Position Size',
'Risk Limit', 'Confidence Threshold'
)
)
# Plot each parameter
params_to_plot = [
('min_profit_threshold', 1, 1, 'Threshold'),
('max_position_size', 1, 2, 'Size ($)'),
('risk_limit', 2, 1, 'Limit'),
('confidence_threshold', 2, 2, 'Threshold')
]
for param, row, col, ylabel in params_to_plot:
if param in param_df.columns:
fig.add_trace(
go.Scatter(
x=param_df['timestamp'],
y=param_df[param],
mode='lines+markers',
name=param
),
row=row, col=col
)
fig.update_layout(
height=600,
showlegend=False,
title_text="Strategy Parameter Evolution"
)
# Get latest reasoning
latest_reasoning = param_history[-1]['reasoning'] if param_history else "No reasoning available"
return fig, f"**Latest Optimization Reasoning:** {latest_reasoning}"
# Create interface
with gr.Blocks(title="Cross-Asset Arbitrage Engine") as interface:
gr.Markdown("""
# Cross-Asset Arbitrage Engine with Transformer Models
This sophisticated arbitrage engine leverages transformer models for price forecasting across multiple asset classes:
- **Crypto Spot/Futures**: BTC, ETH, SOL and perpetual futures
- **Foreign Exchange**: Major currency pairs
- **Equity ETFs**: SPY, QQQ, IWM and international markets
Features:
- **Transformer Price Prediction**: Numerical-adapted transformers for multi-horizon forecasting
- **Cross-Venue Execution**: Simulates CEX (Binance, Coinbase) and DEX (Uniswap V3) integration
- **LLM-Inspired Strategy Optimization**: Dynamic parameter adjustment based on performance
- **Latency-Aware Execution**: Realistic order routing with slippage simulation
- **Comprehensive Risk Analytics**: Real-time VaR, Sharpe ratio, and drawdown monitoring
Author: Spencer Purdy
""")
with gr.Tab("Arbitrage Simulation"):
with gr.Row():
with gr.Column(scale=1):
n_cycles = gr.Slider(
minimum=10, maximum=100, value=50, step=10,
label="Number of Trading Cycles"
)
initial_capital = gr.Number(
value=100000, label="Initial Capital ($)", minimum=10000
)
min_profit = gr.Slider(
minimum=0.1, maximum=1.0, value=0.2, step=0.1,
label="Minimum Profit Threshold (%)"
)
run_btn = gr.Button("Run Simulation", variant="primary")
with gr.Row():
opportunity_network = gr.Plot(label="Arbitrage Opportunity Network")
with gr.Row():
performance_dashboard = gr.Plot(label="Performance Dashboard")
with gr.Row():
execution_analysis = gr.Plot(label="Execution Analysis")
with gr.Row():
with gr.Column(scale=1):
summary_display = gr.Markdown(label="Summary Statistics")
with gr.Column(scale=1):
opportunities_table = gr.DataFrame(
label="Recent Arbitrage Opportunities"
)
with gr.Tab("Strategy Analysis"):
with gr.Row():
analyze_btn = gr.Button("Analyze Strategy Parameters", variant="primary")
with gr.Row():
param_evolution = gr.Plot(label="Parameter Evolution")
with gr.Row():
param_reasoning = gr.Markdown(label="Optimization Reasoning")
# Event handlers
run_btn.click(
fn=run_arbitrage_simulation,
inputs=[n_cycles, initial_capital, min_profit],
outputs=[
opportunity_network, performance_dashboard,
execution_analysis, summary_display, opportunities_table
]
)
analyze_btn.click(
fn=analyze_strategy_parameters,
inputs=[],
outputs=[param_evolution, param_reasoning]
)
# Add examples
gr.Examples(
examples=[
[50, 100000, 0.2],
[30, 50000, 0.3],
[100, 200000, 0.15]
],
inputs=[n_cycles, initial_capital, min_profit]
)
return interface
# Launch application
if __name__ == "__main__":
interface = create_gradio_interface()
interface.launch()