"""
Cross-Asset Arbitrage Engine with Transformer Models

Author: Spencer Purdy

Description: A sophisticated arbitrage engine leveraging transformer models for
price forecasting across multiple asset classes and venues. Integrates CEX/DEX
venues with latency-aware execution, LLM-driven optimization, and comprehensive
risk analytics.
"""

import random
import warnings
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

import gradio as gr
import networkx as nx
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import torch
import torch.nn as nn
import torch.optim as optim
from plotly.subplots import make_subplots
from sklearn.preprocessing import StandardScaler
from torch.utils.data import DataLoader, TensorDataset

warnings.filterwarnings('ignore')

# Reproducibility for the simulated data and model initialisation.
np.random.seed(42)
torch.manual_seed(42)
random.seed(42)

# Market and cost-model constants.
TRADING_DAYS_PER_YEAR = 365      # crypto venues trade every calendar day
RISK_FREE_RATE = 0.045           # annualised risk-free rate
TRANSACTION_COST_CEX = 0.001     # 10 bps per CEX trade, as a fraction of notional
TRANSACTION_COST_DEX = 0.003     # 30 bps per DEX trade, as a fraction of notional
GAS_COST_USD = 5.0               # flat gas cost per DEX leg, in USD
MIN_PROFIT_THRESHOLD = 0.002     # 20 bps minimum net edge to trade
MAX_POSITION_SIZE = 100000       # USD notional cap per trade
LATENCY_CEX_MS = 50              # typical CEX round-trip latency
LATENCY_DEX_MS = 1000            # typical DEX confirmation latency

# Simulated universe of assets and venues.
ASSET_CLASSES = {
    'crypto_spot': ['BTC', 'ETH', 'SOL', 'MATIC', 'AVAX'],
    'crypto_futures': ['BTC-PERP', 'ETH-PERP', 'SOL-PERP'],
    'fx_pairs': ['EUR/USD', 'GBP/USD', 'USD/JPY', 'AUD/USD'],
    'equity_etfs': ['SPY', 'QQQ', 'IWM', 'EFA', 'EEM']
}

EXCHANGES = {
    'cex': ['Binance', 'Coinbase', 'Kraken', 'FTX'],
    'dex': ['Uniswap_V3', 'SushiSwap', 'Curve', 'Balancer']
}
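
# Worked example of the cost model (illustrative numbers): buying on a CEX and
# selling on a DEX pays TRANSACTION_COST_CEX + TRANSACTION_COST_DEX = 0.004
# (40 bps) of notional plus one GAS_COST_USD leg, so a $10,000 round trip costs
# roughly 10_000 * 0.004 + 5.0 = $45 before slippage. MIN_PROFIT_THRESHOLD is
# then applied to the edge net of these costs.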


@dataclass
class OrderBook:
    """Order book snapshot; bids sorted descending, asks ascending by price."""
    exchange: str
    asset: str
    timestamp: datetime
    bids: List[Tuple[float, float]]  # (price, size) levels
    asks: List[Tuple[float, float]]  # (price, size) levels

    def get_best_bid(self) -> Tuple[float, float]:
        """Get best bid price and size."""
        return self.bids[0] if self.bids else (0.0, 0.0)

    def get_best_ask(self) -> Tuple[float, float]:
        """Get best ask price and size."""
        return self.asks[0] if self.asks else (float('inf'), 0.0)

    def get_mid_price(self) -> float:
        """Calculate mid price from best bid and ask."""
        bid, _ = self.get_best_bid()
        ask, _ = self.get_best_ask()
        return (bid + ask) / 2 if bid > 0 and ask < float('inf') else 0.0
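
    # Usage sketch (hypothetical values): a book with bids=[(99.0, 2.0)] and
    # asks=[(101.0, 1.5)] reports get_best_bid() == (99.0, 2.0),
    # get_best_ask() == (101.0, 1.5) and get_mid_price() == 100.0.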


@dataclass
class ArbitrageOpportunity:
    """Data structure for storing identified arbitrage opportunities."""
    opportunity_id: str
    asset: str
    buy_exchange: str
    sell_exchange: str
    buy_price: float
    sell_price: float
    max_size: float
    expected_profit: float
    expected_profit_pct: float
    latency_risk: float
    timestamp: datetime
    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass
class ExecutionResult:
    """Data structure for storing execution results."""
    opportunity_id: str
    success: bool
    executed_size: float
    buy_fill_price: float
    sell_fill_price: float
    realized_profit: float
    slippage: float
    latency_ms: float
    gas_cost: float
    timestamp: datetime


class NumericalTransformer(nn.Module):
    """Transformer encoder adapted for numerical price prediction with training capability."""

    def __init__(self, input_dim: int = 10, hidden_dim: int = 128,
                 num_heads: int = 8, num_layers: int = 4,
                 prediction_horizon: int = 5):
        super().__init__()

        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.prediction_horizon = prediction_horizon

        # Project raw features into the model dimension.
        self.input_projection = nn.Linear(input_dim, hidden_dim)

        # Fixed sinusoidal positional encoding.
        self.positional_encoding = self._create_positional_encoding(1000, hidden_dim)

        encoder_layer = nn.TransformerEncoderLayer(
            d_model=hidden_dim,
            nhead=num_heads,
            dim_feedforward=hidden_dim * 4,
            dropout=0.1,
            activation='gelu',
            batch_first=True
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)

        # One linear head per predicted quantity, each emitting the full horizon.
        self.price_head = nn.Linear(hidden_dim, prediction_horizon)
        self.volatility_head = nn.Linear(hidden_dim, prediction_horizon)
        self.volume_head = nn.Linear(hidden_dim, prediction_horizon)
        self.uncertainty_head = nn.Linear(hidden_dim, prediction_horizon)

        self._init_weights()

    def _init_weights(self):
        """Initialize linear layers with Xavier-uniform weights and zero biases."""
        for module in self.modules():
            if isinstance(module, nn.Linear):
                nn.init.xavier_uniform_(module.weight)
                if module.bias is not None:
                    nn.init.zeros_(module.bias)

    def _create_positional_encoding(self, max_len: int, d_model: int) -> nn.Parameter:
        """Create sinusoidal positional encoding."""
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() *
                             (-np.log(10000.0) / d_model))

        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)

        return nn.Parameter(pe.unsqueeze(0), requires_grad=False)
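
    # The encoding follows the standard sinusoidal form:
    #   PE(pos, 2i)   = sin(pos / 10000^(2i / d_model))
    #   PE(pos, 2i+1) = cos(pos / 10000^(2i / d_model))
    # Storing it as a frozen parameter keeps it on the module's device without
    # receiving gradient updates.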

    def forward(self, x: torch.Tensor, mask: Optional[torch.Tensor] = None) -> Dict[str, torch.Tensor]:
        """
        Forward pass through the transformer.

        Args:
            x: Input tensor of shape (batch_size, seq_len, input_dim)
            mask: Optional key-padding mask

        Returns:
            Dictionary containing price, volatility, volume predictions and uncertainty
        """
        batch_size, seq_len, _ = x.shape

        x = self.input_projection(x)
        x = x + self.positional_encoding[:, :seq_len, :]

        encoded = self.transformer(x, src_key_padding_mask=mask)

        # Predict from the final time step's representation.
        last_hidden = encoded[:, -1, :]

        price_pred = self.price_head(last_hidden)
        # Bound volatility to (0, 0.1) and keep volume strictly positive.
        volatility_pred = torch.sigmoid(self.volatility_head(last_hidden)) * 0.1
        volume_pred = torch.exp(self.volume_head(last_hidden))
        uncertainty = torch.sigmoid(self.uncertainty_head(last_hidden))

        return {
            'price': price_pred,
            'volatility': volatility_pred,
            'volume': volume_pred,
            'uncertainty': uncertainty
        }
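
    # Shape sketch (hypothetical sizes): for x of shape (32, 50, 10) with the
    # default configuration, each head returns a (32, 5) tensor -- one value
    # per step of the five-step prediction horizon.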


class PriceForecastingEngine:
    """Engine for multi-asset price forecasting using per-asset-class transformers."""

    def __init__(self):
        self.models = {}
        self.scalers = {}
        self.training_history = defaultdict(list)
        self.is_trained = defaultdict(bool)

        for asset_class in ASSET_CLASSES.keys():
            self.models[asset_class] = NumericalTransformer()
            self.scalers[asset_class] = StandardScaler()
            self.is_trained[asset_class] = False

    def prepare_features(self, price_data: pd.DataFrame) -> np.ndarray:
        """Prepare the 10-dimensional feature matrix for transformer input."""
        features = []

        # Raw price levels.
        features.append(price_data['close'].values)
        features.append(price_data['high'].values)
        features.append(price_data['low'].values)

        # Log-scaled volume.
        features.append(np.log1p(price_data['volume'].values))

        # Simple returns.
        returns = price_data['close'].pct_change().fillna(0)
        features.append(returns.values)

        # Distance from short- and medium-term moving averages.
        ma_7 = price_data['close'].rolling(7).mean().bfill()
        ma_21 = price_data['close'].rolling(21).mean().bfill()
        features.append((price_data['close'] / ma_7 - 1).values)
        features.append((price_data['close'] / ma_21 - 1).values)

        # Rolling realised volatility.
        volatility = returns.rolling(20).std().bfill()
        features.append(volatility.values)

        # RSI, rescaled to [0, 1].
        rsi = self.calculate_rsi(price_data['close'])
        features.append(rsi.values / 100)

        # Simulated order-flow imbalance (placeholder noise in this simulation).
        ofi = np.random.normal(0, 0.1, len(price_data))
        features.append(ofi)

        feature_matrix = np.column_stack(features)

        return feature_matrix

    def calculate_rsi(self, prices: pd.Series, period: int = 14) -> pd.Series:
        """Calculate the Relative Strength Index."""
        delta = prices.diff()
        gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()

        rs = gain / loss
        rsi = 100 - (100 / (1 + rs))

        return rsi.fillna(50)
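
    # RSI = 100 - 100 / (1 + RS), where RS is the ratio of average gains to
    # average losses over the lookback window; a neutral 50 fills positions
    # where the window is not yet full.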

    def create_sequences(self, features: np.ndarray, targets: np.ndarray,
                         seq_len: int = 50, horizon: int = 5) -> Tuple[np.ndarray, np.ndarray]:
        """Create sliding-window sequences for training the transformer."""
        sequences = []
        target_sequences = []

        for i in range(seq_len, len(features) - horizon):
            sequences.append(features[i-seq_len:i])
            target_sequences.append(targets[i:i+horizon])

        return np.array(sequences), np.array(target_sequences)
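
    # Example (hypothetical sizes): with 200 rows of features, seq_len=50 and
    # horizon=5, this yields 145 windows of shape (50, n_features), each paired
    # with a (5,) vector of future returns.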

    def train_model(self, asset_class: str, price_data: pd.DataFrame,
                    epochs: int = 50, batch_size: int = 32):
        """Train the transformer model on historical data."""
        if len(price_data) < 100:
            return

        features = self.prepare_features(price_data)

        # Fit the scaler on the training data only.
        features_scaled = self.scalers[asset_class].fit_transform(features)

        # Targets are forward simple returns.
        returns = price_data['close'].pct_change().fillna(0).values

        X, y = self.create_sequences(features_scaled, returns)

        if len(X) == 0:
            return

        X_tensor = torch.FloatTensor(X)
        y_tensor = torch.FloatTensor(y)

        dataset = TensorDataset(X_tensor, y_tensor)
        loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

        model = self.models[asset_class]
        optimizer = optim.Adam(model.parameters(), lr=0.001)
        criterion = nn.MSELoss()

        model.train()
        for epoch in range(epochs):
            epoch_loss = 0.0

            for batch_X, batch_y in loader:
                optimizer.zero_grad()

                predictions = model(batch_X)

                # Only the price head is supervised directly; the other heads
                # share the encoder and are used as-is at inference.
                loss = criterion(predictions['price'], batch_y)

                loss.backward()
                optimizer.step()

                epoch_loss += loss.item()

            avg_loss = epoch_loss / len(loader)
            self.training_history[asset_class].append(avg_loss)

        self.is_trained[asset_class] = True
        model.eval()

    def forecast_prices(self, asset: str, price_history: pd.DataFrame,
                        horizon: int = 5) -> Dict[str, np.ndarray]:
        """Generate price forecasts using the transformer model."""
        asset_class = self._get_asset_class(asset)
        if not asset_class:
            return self._generate_random_forecast(horizon)

        # Lazily train on first use once enough history is available.
        if not self.is_trained[asset_class] and len(price_history) > 100:
            self.train_model(asset_class, price_history)

        features = self.prepare_features(price_history)

        # Reuse the training-time scaler where one exists; refitting at
        # inference would silently shift the feature distribution.
        if self.is_trained[asset_class]:
            features_scaled = self.scalers[asset_class].transform(features)
        else:
            features_scaled = self.scalers[asset_class].fit_transform(features)

        # Left-pad short histories with zeros up to the model's window length.
        seq_len = 50
        if len(features_scaled) < seq_len:
            padding = seq_len - len(features_scaled)
            features_scaled = np.vstack([
                np.zeros((padding, features_scaled.shape[1])),
                features_scaled
            ])

        sequence = features_scaled[-seq_len:].reshape(1, seq_len, -1)
        sequence_tensor = torch.FloatTensor(sequence)

        model = self.models[asset_class]
        model.eval()

        with torch.no_grad():
            predictions = model(sequence_tensor)

        current_price = price_history['close'].iloc[-1]

        # The price head predicts returns; the 0.01 factor damps them so an
        # imperfectly trained model cannot dominate the detected spreads.
        price_changes = predictions['price'].numpy()[0]
        price_forecast = current_price * (1 + price_changes * 0.01)

        return {
            'price': price_forecast,
            'volatility': predictions['volatility'].numpy()[0],
            'volume': predictions['volume'].numpy()[0],
            'uncertainty': predictions['uncertainty'].numpy()[0]
        }
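
    # Usage sketch (hypothetical data): given a DataFrame with open/high/low/
    # close/volume columns, forecast_prices('BTC', df) returns five-step arrays
    # for price, volatility, volume and model uncertainty.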

    def _get_asset_class(self, asset: str) -> Optional[str]:
        """Determine asset class for a given asset."""
        for asset_class, assets in ASSET_CLASSES.items():
            if asset in assets or any(asset.startswith(a) for a in assets):
                return asset_class
        return None

    def _generate_random_forecast(self, horizon: int) -> Dict[str, np.ndarray]:
        """Generate a random forecast as a fallback for unknown assets."""
        return {
            'price': np.random.normal(100, 2, horizon),
            'volatility': np.random.uniform(0.01, 0.05, horizon),
            'volume': np.random.lognormal(15, 0.5, horizon),
            'uncertainty': np.random.uniform(0.3, 0.7, horizon)
        }


class ExchangeSimulator:
    """Simulate exchange order books and execution with realistic market dynamics."""

    def __init__(self, exchange_type: str = 'cex'):
        self.exchange_type = exchange_type
        self.order_books = {}
        self.latency_ms = LATENCY_CEX_MS if exchange_type == 'cex' else LATENCY_DEX_MS
        self.transaction_cost = TRANSACTION_COST_CEX if exchange_type == 'cex' else TRANSACTION_COST_DEX

    def generate_order_book(self, asset: str, mid_price: float,
                            spread_bps: float = 10,
                            market_conditions: Optional[Dict[str, Any]] = None) -> OrderBook:
        """Generate a synthetic order book whose spread widens in thin, volatile markets."""
        if market_conditions:
            volatility = market_conditions.get('volatility', 0.02)
            liquidity = market_conditions.get('liquidity', 1.0)
            spread_bps *= (1 + volatility * 10) * (2 - liquidity)

        spread = mid_price * spread_bps / 10000

        # Ten levels per side, with size tapering away from the touch.
        n_levels = 10
        bids = []
        asks = []

        for i in range(n_levels):
            bid_price = mid_price - spread/2 - i * spread/10
            bid_size = np.random.lognormal(10, 1) * (n_levels - i) / n_levels
            bids.append((bid_price, bid_size))

            ask_price = mid_price + spread/2 + i * spread/10
            ask_size = np.random.lognormal(10, 1) * (n_levels - i) / n_levels
            asks.append((ask_price, ask_size))

        return OrderBook(
            exchange=self.exchange_type,
            asset=asset,
            timestamp=datetime.now(),
            bids=bids,
            asks=asks
        )

    def simulate_market_impact(self, size: float, liquidity: float) -> float:
        """Calculate market impact using a square-root model."""
        impact_bps = 10 * np.sqrt(size / liquidity)
        return impact_bps / 10000
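
    # Worked example: size=1_000 against liquidity=100_000 gives
    # 10 * sqrt(0.01) = 1 bps of impact, returned as the fraction 0.0001.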

    def execute_order(self, order_book: OrderBook, side: str,
                      size: float) -> Tuple[float, float]:
        """
        Simulate order execution by walking the book, with impact-based slippage.

        Returns: (fill_price, actual_size)
        """
        filled_size = 0
        total_cost = 0

        if side == 'buy':
            for ask_price, ask_size in order_book.asks:
                if filled_size >= size:
                    break
                fill_amount = min(size - filled_size, ask_size)
                filled_size += fill_amount
                total_cost += fill_amount * ask_price
        else:
            for bid_price, bid_size in order_book.bids:
                if filled_size >= size:
                    break
                fill_amount = min(size - filled_size, bid_size)
                filled_size += fill_amount
                total_cost += fill_amount * bid_price

        avg_fill_price = total_cost / filled_size if filled_size > 0 else 0

        # Penalise the fill by square-root market impact against total book depth.
        liquidity = sum(s for _, s in order_book.bids) + sum(s for _, s in order_book.asks)
        impact = self.simulate_market_impact(filled_size, liquidity)

        if side == 'buy':
            avg_fill_price *= (1 + impact)
        else:
            avg_fill_price *= (1 - impact)

        return avg_fill_price, filled_size
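
    # Walk-the-book example (hypothetical levels): buying 3 units against asks
    # [(100, 2), (101, 2)] fills 2 @ 100 and 1 @ 101, an average of ~100.33
    # before the market-impact adjustment.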


class ArbitrageDetector:
    """Detect arbitrage opportunities across venues with advanced filtering."""

    def __init__(self):
        self.opportunity_history = []
        self.min_profit_threshold = MIN_PROFIT_THRESHOLD

    def find_opportunities(self, order_books: Dict[str, Dict[str, OrderBook]],
                           transaction_costs: Dict[str, float],
                           forecasts: Optional[Dict[str, Dict[str, np.ndarray]]] = None) -> List[ArbitrageOpportunity]:
        """Find cross-venue arbitrage opportunities, optionally tilted by forecasts."""
        opportunities = []

        for asset in self._get_all_assets(order_books):
            asset_books = self._get_asset_order_books(order_books, asset)

            if len(asset_books) < 2:
                continue

            best_bid_exchange, best_bid_price, best_bid_size = self._find_best_bid(asset_books)
            best_ask_exchange, best_ask_price, best_ask_size = self._find_best_ask(asset_books)

            # An opportunity exists when one venue's bid crosses another's ask.
            if best_bid_price > best_ask_price:
                max_size = min(best_bid_size, best_ask_size,
                               MAX_POSITION_SIZE / best_ask_price)

                buy_cost = transaction_costs.get(best_ask_exchange, TRANSACTION_COST_CEX)
                sell_cost = transaction_costs.get(best_bid_exchange, TRANSACTION_COST_CEX)

                # Venue names are like 'Uniswap_V3', so check DEX membership
                # explicitly rather than searching for 'dex' in the name.
                gas_cost = 0
                if best_ask_exchange in EXCHANGES['dex'] or best_bid_exchange in EXCHANGES['dex']:
                    gas_cost = GAS_COST_USD

                gross_profit = (best_bid_price - best_ask_price) * max_size
                total_cost = (buy_cost + sell_cost) * best_ask_price * max_size + gas_cost
                net_profit = gross_profit - total_cost
                profit_pct = net_profit / (best_ask_price * max_size) if max_size > 0 else 0

                # Tilt the score toward trades the forecast also favours.
                forecast_adjustment = 0.0
                if forecasts and asset in forecasts:
                    price_forecast = forecasts[asset]['price'][0]
                    forecast_adjustment = (price_forecast - best_ask_price) / best_ask_price
                    profit_pct += forecast_adjustment * 0.5

                if profit_pct > self.min_profit_threshold:
                    opportunity = ArbitrageOpportunity(
                        opportunity_id=f"{asset}_{datetime.now().strftime('%Y%m%d_%H%M%S%f')}",
                        asset=asset,
                        buy_exchange=best_ask_exchange,
                        sell_exchange=best_bid_exchange,
                        buy_price=best_ask_price,
                        sell_price=best_bid_price,
                        max_size=max_size,
                        expected_profit=net_profit,
                        expected_profit_pct=profit_pct,
                        latency_risk=self._calculate_latency_risk(
                            best_ask_exchange, best_bid_exchange
                        ),
                        timestamp=datetime.now(),
                        metadata={
                            'spread': best_bid_price - best_ask_price,
                            'gas_cost': gas_cost,
                            'transaction_costs': buy_cost + sell_cost,
                            'forecast_impact': forecast_adjustment
                        }
                    )

                    opportunities.append(opportunity)
                    self.opportunity_history.append(opportunity)

        return opportunities
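
    # Numeric sketch (illustrative): buy 1 unit at 100 on venue A and sell at
    # 100.50 on venue B with 10 bps fees per leg and no gas. Gross profit is
    # 0.50, costs are (0.001 + 0.001) * 100 = 0.20, so net profit is 0.30 and
    # profit_pct = 0.003, clearing MIN_PROFIT_THRESHOLD (0.002).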

    def _get_all_assets(self, order_books: Dict[str, Dict[str, OrderBook]]) -> set:
        """Get all unique assets across exchanges."""
        assets = set()
        for exchange_books in order_books.values():
            assets.update(exchange_books.keys())
        return assets

    def _get_asset_order_books(self, order_books: Dict[str, Dict[str, OrderBook]],
                               asset: str) -> Dict[str, OrderBook]:
        """Get order books for a specific asset across exchanges."""
        asset_books = {}
        for exchange, books in order_books.items():
            if asset in books:
                asset_books[exchange] = books[asset]
        return asset_books

    def _find_best_bid(self, asset_books: Dict[str, OrderBook]) -> Tuple[str, float, float]:
        """Find the best bid across exchanges."""
        best_exchange = None
        best_price = 0
        best_size = 0

        for exchange, book in asset_books.items():
            bid_price, bid_size = book.get_best_bid()
            if bid_price > best_price:
                best_exchange = exchange
                best_price = bid_price
                best_size = bid_size

        return best_exchange, best_price, best_size

    def _find_best_ask(self, asset_books: Dict[str, OrderBook]) -> Tuple[str, float, float]:
        """Find the best ask across exchanges."""
        best_exchange = None
        best_price = float('inf')
        best_size = 0

        for exchange, book in asset_books.items():
            ask_price, ask_size = book.get_best_ask()
            if ask_price < best_price:
                best_exchange = exchange
                best_price = ask_price
                best_size = ask_size

        return best_exchange, best_price, best_size

    def _calculate_latency_risk(self, buy_exchange: str, sell_exchange: str) -> float:
        """Score latency risk in [0, 1]; mixed CEX/DEX legs are riskiest."""
        buy_is_dex = buy_exchange in EXCHANGES['dex']
        sell_is_dex = sell_exchange in EXCHANGES['dex']

        if buy_is_dex != sell_is_dex:
            return 0.8
        elif buy_is_dex:
            return 0.6
        else:
            return 0.3


class LLMStrategyOptimizer:
    """LLM-inspired strategy parameter optimization with machine learning."""

    def __init__(self):
        self.parameter_history = defaultdict(list)
        self.performance_history = []
        self.current_parameters = self._get_default_parameters()
        self.optimization_model = self._build_optimization_model()

    def _get_default_parameters(self) -> Dict[str, Any]:
        """Get default strategy parameters."""
        return {
            'min_profit_threshold': 0.002,
            'max_position_size': 100000,
            'risk_limit': 0.02,
            'correlation_threshold': 0.7,
            'rebalance_frequency': 300,
            'latency_buffer': 1.5,
            'confidence_threshold': 0.6,
            'max_concurrent_trades': 5
        }

    def _build_optimization_model(self) -> nn.Module:
        """Build a small feed-forward network for parameter optimization."""
        class ParameterOptimizer(nn.Module):
            def __init__(self):
                super().__init__()
                self.fc1 = nn.Linear(20, 64)
                self.fc2 = nn.Linear(64, 32)
                self.fc3 = nn.Linear(32, 8)
                self.relu = nn.ReLU()
                self.sigmoid = nn.Sigmoid()

            def forward(self, x):
                x = self.relu(self.fc1(x))
                x = self.relu(self.fc2(x))
                x = self.sigmoid(self.fc3(x))
                return x

        return ParameterOptimizer()
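
    # Note: this network is used with its random initial weights -- it acts as
    # a fixed nonlinear mixer of the 20 performance features, while the
    # rule-based adjustments in generate_parameter_suggestions carry the
    # interpretable logic.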

    def generate_parameter_suggestions(self,
                                       recent_performance: List[ExecutionResult],
                                       market_conditions: Dict[str, Any]) -> Dict[str, Any]:
        """Generate parameter adjustments from recent performance and market state."""
        suggestions = self.current_parameters.copy()

        if not recent_performance:
            return suggestions

        # Summarise recent execution quality.
        success_rate = sum(1 for r in recent_performance if r.success) / len(recent_performance)
        avg_slippage = np.mean([r.slippage for r in recent_performance])
        avg_profit = np.mean([r.realized_profit for r in recent_performance])
        profit_variance = np.var([r.realized_profit for r in recent_performance])

        # Assemble the 20-dimensional feature vector, roughly normalised.
        features = [
            success_rate,
            avg_slippage,
            avg_profit / 1000,
            profit_variance / 1000000,
            market_conditions.get('volatility', 0.02),
            market_conditions.get('liquidity', 1.0),
            len(recent_performance) / 100,
            self.current_parameters['min_profit_threshold'],
            self.current_parameters['max_position_size'] / 1000000,
            self.current_parameters['risk_limit'],
            self.current_parameters['correlation_threshold'],
            self.current_parameters['rebalance_frequency'] / 3600,
            self.current_parameters['latency_buffer'],
            self.current_parameters['confidence_threshold'],
            self.current_parameters['max_concurrent_trades'] / 10,
            market_conditions.get('max_volatility', 0.03),
            float(datetime.now().hour) / 24,
            float(datetime.now().weekday()) / 7,
            0.5,  # reserved feature slots
            0.5
        ]

        feature_tensor = torch.FloatTensor([features])

        with torch.no_grad():
            adjustments = self.optimization_model(feature_tensor).numpy()[0]

        # Map the eight sigmoid outputs onto their parameter ranges.
        suggestions['min_profit_threshold'] = 0.001 + adjustments[0] * 0.009
        suggestions['max_position_size'] = 10000 + adjustments[1] * 490000
        suggestions['risk_limit'] = 0.005 + adjustments[2] * 0.045
        suggestions['correlation_threshold'] = 0.5 + adjustments[3] * 0.4
        suggestions['rebalance_frequency'] = 60 + adjustments[4] * 3540
        suggestions['latency_buffer'] = 1.0 + adjustments[5] * 2.0
        suggestions['confidence_threshold'] = 0.5 + adjustments[6] * 0.4
        suggestions['max_concurrent_trades'] = int(1 + adjustments[7] * 9)

        # Rule-based overrides from execution quality.
        if success_rate < 0.7:
            suggestions['min_profit_threshold'] *= 1.1
            suggestions['confidence_threshold'] *= 1.05

        if avg_slippage > 0.001:
            suggestions['max_position_size'] *= 0.9
            suggestions['latency_buffer'] *= 1.1

        if avg_profit < 0:
            suggestions['risk_limit'] *= 0.9
            suggestions['max_concurrent_trades'] = max(1, suggestions['max_concurrent_trades'] - 1)

        # Rule-based overrides from market conditions.
        if market_conditions.get('volatility', 0) > 0.03:
            suggestions['min_profit_threshold'] *= 1.2
            suggestions['correlation_threshold'] *= 0.9

        if market_conditions.get('liquidity', 1) < 0.5:
            suggestions['max_position_size'] *= 0.7

        suggestions = self._apply_parameter_bounds(suggestions)

        self.parameter_history['suggestions'].append({
            'timestamp': datetime.now(),
            'parameters': suggestions,
            'reasoning': self._generate_reasoning(recent_performance, market_conditions),
            'performance_metrics': {
                'success_rate': success_rate,
                'avg_slippage': avg_slippage,
                'avg_profit': avg_profit
            }
        })

        self.current_parameters = suggestions
        return suggestions

    def _apply_parameter_bounds(self, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Clamp each parameter to its allowed range."""
        bounds = {
            'min_profit_threshold': (0.001, 0.01),
            'max_position_size': (10000, 500000),
            'risk_limit': (0.005, 0.05),
            'correlation_threshold': (0.5, 0.9),
            'rebalance_frequency': (60, 3600),
            'latency_buffer': (1.0, 3.0),
            'confidence_threshold': (0.5, 0.9),
            'max_concurrent_trades': (1, 10)
        }

        bounded = parameters.copy()
        for param, (min_val, max_val) in bounds.items():
            if param in bounded:
                bounded[param] = max(min_val, min(max_val, bounded[param]))

        return bounded

    def _generate_reasoning(self, performance: List[ExecutionResult],
                            market_conditions: Dict[str, Any]) -> str:
        """Generate a human-readable explanation for the parameter adjustments."""
        reasons = []

        if performance:
            success_rate = sum(1 for r in performance if r.success) / len(performance)
            if success_rate < 0.7:
                reasons.append("Low success rate detected - increasing selectivity")

            avg_slippage = np.mean([r.slippage for r in performance])
            if avg_slippage > 0.001:
                reasons.append("High slippage observed - adjusting execution parameters")

            avg_profit = np.mean([r.realized_profit for r in performance])
            if avg_profit < 0:
                reasons.append("Negative average profit - tightening risk controls")

        if market_conditions.get('volatility', 0) > 0.03:
            reasons.append("Elevated market volatility - implementing conservative measures")

        if market_conditions.get('liquidity', 1) < 0.5:
            reasons.append("Reduced liquidity conditions - scaling down position sizes")

        return "; ".join(reasons) if reasons else "Standard market conditions"


class RiskAnalytics:
    """Comprehensive risk analytics system with advanced metrics."""

    def __init__(self):
        self.position_history = []
        self.var_confidence = 0.95
        self.risk_metrics_history = []
        self.correlation_matrix = None

    def calculate_var(self, returns: np.ndarray, confidence: float = 0.95) -> float:
        """Calculate Value at Risk using historical simulation."""
        if len(returns) < 20:
            # Too little history for a stable percentile; assume 2%.
            return 0.02
        return np.percentile(returns, (1 - confidence) * 100)
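
    # Example (hypothetical returns): at 95% confidence this is the 5th
    # percentile of the return sample; a value of -0.021 means a one-day loss
    # of 2.1% or worse is expected on roughly 1 day in 20.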

    def calculate_cvar(self, returns: np.ndarray, confidence: float = 0.95) -> float:
        """Calculate Conditional Value at Risk (Expected Shortfall)."""
        var = self.calculate_var(returns, confidence)
        tail = returns[returns <= var]
        # With no observations in the tail, fall back to the VaR itself.
        return tail.mean() if len(tail) > 0 else var

    def calculate_sharpe_ratio(self, returns: np.ndarray) -> float:
        """Calculate the annualised Sharpe ratio."""
        if len(returns) < 2:
            return 0.0

        excess_returns = returns - RISK_FREE_RATE / TRADING_DAYS_PER_YEAR
        return np.sqrt(TRADING_DAYS_PER_YEAR) * excess_returns.mean() / (returns.std() + 1e-8)
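
    # Sharpe = sqrt(N) * mean(r - rf / N) / std(r) with N = TRADING_DAYS_PER_YEAR;
    # the 1e-8 term guards against division by zero for constant return series.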

    def calculate_sortino_ratio(self, returns: np.ndarray) -> float:
        """Calculate the Sortino ratio (penalising downside deviation only)."""
        if len(returns) < 2:
            return 0.0

        excess_returns = returns - RISK_FREE_RATE / TRADING_DAYS_PER_YEAR
        downside_returns = returns[returns < 0]

        if len(downside_returns) == 0:
            return float('inf')

        downside_std = np.std(downside_returns)
        return np.sqrt(TRADING_DAYS_PER_YEAR) * excess_returns.mean() / (downside_std + 1e-8)

    def calculate_max_drawdown(self, equity_curve: np.ndarray) -> float:
        """Calculate the maximum peak-to-trough drawdown."""
        peak = np.maximum.accumulate(equity_curve)
        drawdown = (peak - equity_curve) / peak
        return np.max(drawdown)
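
    # Example: an equity curve [1.0, 1.2, 0.9, 1.1] peaks at 1.2, so the
    # maximum drawdown is (1.2 - 0.9) / 1.2 = 0.25.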

    def calculate_calmar_ratio(self, returns: np.ndarray, equity_curve: np.ndarray) -> float:
        """Calculate the Calmar ratio (annualised return / max drawdown)."""
        max_dd = self.calculate_max_drawdown(equity_curve)
        if max_dd == 0:
            return float('inf')

        annual_return = returns.mean() * TRADING_DAYS_PER_YEAR
        return annual_return / max_dd

    def analyze_position_risk(self, positions: List[Dict[str, Any]],
                              market_data: Dict[str, pd.DataFrame]) -> Dict[str, Any]:
        """Analyze risk for current positions with comprehensive metrics."""
        if not positions:
            return self._empty_risk_metrics()

        position_values = []
        position_returns = []

        for position in positions:
            asset = position['asset']
            size = position['size']

            if asset in market_data:
                price = market_data[asset]['close'].iloc[-1]
                value = size * price
                position_values.append(value)

                returns = market_data[asset]['close'].pct_change().dropna()
                position_returns.append(returns)

        total_value = sum(position_values)

        if position_returns:
            # Value-weighted portfolio returns; all series are assumed to share
            # the same length and index, which holds for the simulated data.
            weights = np.array(position_values) / total_value
            portfolio_returns = np.zeros(len(position_returns[0]))

            for weight, returns in zip(weights, position_returns):
                portfolio_returns += weight * returns.values

            var = self.calculate_var(portfolio_returns)
            cvar = self.calculate_cvar(portfolio_returns)
            sharpe = self.calculate_sharpe_ratio(portfolio_returns)
            sortino = self.calculate_sortino_ratio(portfolio_returns)

            equity_curve = (1 + portfolio_returns).cumprod()
            max_dd = self.calculate_max_drawdown(equity_curve)
            calmar = self.calculate_calmar_ratio(portfolio_returns, equity_curve)

            # Average pairwise correlation across position return series.
            returns_df = pd.DataFrame({
                f'asset_{i}': returns.values
                for i, returns in enumerate(position_returns)
            })
            correlation_matrix = returns_df.corr()
            self.correlation_matrix = correlation_matrix

            avg_correlation = correlation_matrix.values[np.triu_indices_from(
                correlation_matrix.values, k=1)].mean()
        else:
            var = cvar = sharpe = sortino = max_dd = calmar = avg_correlation = 0

        # Herfindahl index: 1/n for equal weights, 1.0 for a single position.
        herfindahl_index = sum((v/total_value)**2 for v in position_values) if total_value > 0 else 0

        risk_metrics = {
            'total_exposure': total_value,
            'var_95': var,
            'cvar_95': cvar,
            'sharpe_ratio': sharpe,
            'sortino_ratio': sortino,
            'max_drawdown': max_dd,
            'calmar_ratio': calmar,
            'position_count': len(positions),
            'avg_correlation': avg_correlation,
            'concentration_risk': max(position_values) / total_value if total_value > 0 else 0,
            'herfindahl_index': herfindahl_index,
            'timestamp': datetime.now()
        }

        self.risk_metrics_history.append(risk_metrics)

        return risk_metrics

    def check_risk_limits(self, proposed_trade: ArbitrageOpportunity,
                          current_positions: List[Dict[str, Any]],
                          risk_parameters: Dict[str, Any]) -> Tuple[bool, str]:
        """Check whether a proposed trade violates any risk limit."""
        position_value = proposed_trade.max_size * proposed_trade.buy_price

        if position_value > risk_parameters['max_position_size']:
            return False, "Position size exceeds limit"

        current_exposure = sum(p['size'] * p['entry_price'] for p in current_positions)

        if current_exposure + position_value > risk_parameters['max_position_size'] * 5:
            return False, "Total exposure limit exceeded"

        if len(current_positions) >= risk_parameters['max_concurrent_trades']:
            return False, "Maximum concurrent trades reached"

        same_asset_positions = [p for p in current_positions if p['asset'] == proposed_trade.asset]
        if same_asset_positions:
            return False, "Already have position in this asset"

        if proposed_trade.expected_profit_pct < risk_parameters['min_profit_threshold']:
            return False, "Profit below minimum threshold"

        if proposed_trade.latency_risk > risk_parameters.get('max_latency_risk', 0.7):
            return False, "Latency risk too high"

        return True, "Risk checks passed"

    def _empty_risk_metrics(self) -> Dict[str, Any]:
        """Return zeroed risk metrics for an empty portfolio."""
        return {
            'total_exposure': 0,
            'var_95': 0,
            'cvar_95': 0,
            'sharpe_ratio': 0,
            'sortino_ratio': 0,
            'max_drawdown': 0,
            'calmar_ratio': 0,
            'position_count': 0,
            'avg_correlation': 0,
            'concentration_risk': 0,
            'herfindahl_index': 0,
            'timestamp': datetime.now()
        }


class LatencyAwareExecutionEngine:
    """Execution engine with realistic latency simulation and smart routing."""

    def __init__(self):
        self.execution_history = []
        self.latency_model = self._build_latency_model()
        self.slippage_model = self._build_slippage_model()
        self.execution_analytics = defaultdict(list)

    def _build_latency_model(self) -> Dict[str, Dict[str, float]]:
        """Build a latency model (milliseconds) for each exchange-pair type."""
        return {
            'cex_cex': {'mean': 100, 'std': 20},
            'cex_dex': {'mean': 1500, 'std': 500},
            'dex_dex': {'mean': 2000, 'std': 800},
        }
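
    # These latency figures are rough assumptions: same-class CEX legs settle
    # fastest, while any DEX leg inherits block-confirmation delay, hence the
    # much larger means and standard deviations.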

    def _build_slippage_model(self) -> Dict[str, float]:
        """Build base slippage (fractional) per volatility regime."""
        return {
            'low_volatility': 0.0005,
            'normal': 0.001,
            'high_volatility': 0.002,
            'extreme': 0.005
        }

    def simulate_execution(self, opportunity: ArbitrageOpportunity,
                           buy_exchange: ExchangeSimulator,
                           sell_exchange: ExchangeSimulator,
                           market_conditions: Dict[str, Any]) -> ExecutionResult:
        """Simulate order execution with realistic latency and slippage."""
        exchange_pair = self._get_exchange_pair_type(
            opportunity.buy_exchange,
            opportunity.sell_exchange
        )

        # Draw the round-trip latency for this venue pair.
        latency_params = self.latency_model[exchange_pair]
        total_latency = np.random.normal(
            latency_params['mean'],
            latency_params['std']
        )
        total_latency = max(0, total_latency)

        volatility_regime = self._get_volatility_regime(market_conditions)
        base_slippage = self.slippage_model[volatility_regime]

        # Prices drift while the order is in flight; drift scales with the
        # square root of elapsed time and with volatility.
        volatility = market_conditions.get('volatility', 0.02)
        price_drift = np.random.normal(0, base_slippage * np.sqrt(total_latency / 1000) * (1 + volatility * 10))

        # Execute the buy leg against a freshly drifted book.
        buy_price_adjusted = opportunity.buy_price * (1 + price_drift)
        buy_book = buy_exchange.generate_order_book(
            opportunity.asset,
            buy_price_adjusted,
            market_conditions=market_conditions
        )

        buy_fill_price, buy_fill_size = buy_exchange.execute_order(
            buy_book, 'buy', opportunity.max_size
        )

        # Execute the sell leg after additional latency.
        sell_latency = np.random.normal(50, 10)
        price_drift_sell = np.random.normal(
            0,
            base_slippage * np.sqrt((total_latency + sell_latency) / 1000) * (1 + volatility * 10)
        )

        sell_price_adjusted = opportunity.sell_price * (1 - price_drift_sell)
        sell_book = sell_exchange.generate_order_book(
            opportunity.asset,
            sell_price_adjusted,
            market_conditions=market_conditions
        )

        sell_fill_price, sell_fill_size = sell_exchange.execute_order(
            sell_book, 'sell', min(buy_fill_size, opportunity.max_size)
        )

        executed_size = min(buy_fill_size, sell_fill_size)

        # Fees on both legs, plus gas for any DEX leg (membership check, since
        # venue names such as 'Uniswap_V3' do not contain the substring 'dex').
        buy_cost = buy_exchange.transaction_cost * buy_fill_price * executed_size
        sell_cost = sell_exchange.transaction_cost * sell_fill_price * executed_size

        gas_cost = 0
        if opportunity.buy_exchange in EXCHANGES['dex']:
            gas_cost += GAS_COST_USD
        if opportunity.sell_exchange in EXCHANGES['dex']:
            gas_cost += GAS_COST_USD

        gross_profit = (sell_fill_price - buy_fill_price) * executed_size
        net_profit = gross_profit - buy_cost - sell_cost - gas_cost

        # Slippage: shortfall of realised gross profit versus the expected edge.
        expected_profit = (opportunity.sell_price - opportunity.buy_price) * executed_size
        slippage = (expected_profit - gross_profit) / expected_profit if expected_profit > 0 else 0

        success = net_profit > 0 and executed_size > 0

        result = ExecutionResult(
            opportunity_id=opportunity.opportunity_id,
            success=success,
            executed_size=executed_size,
            buy_fill_price=buy_fill_price,
            sell_fill_price=sell_fill_price,
            realized_profit=net_profit,
            slippage=slippage,
            latency_ms=total_latency,
            gas_cost=gas_cost,
            timestamp=datetime.now()
        )

        self.execution_history.append(result)

        self.execution_analytics['asset'].append(opportunity.asset)
        self.execution_analytics['exchange_pair'].append(exchange_pair)
        self.execution_analytics['volatility_regime'].append(volatility_regime)

        return result

    def _get_exchange_pair_type(self, buy_exchange: str, sell_exchange: str) -> str:
        """Classify the venue pair as cex_cex, cex_dex or dex_dex."""
        buy_is_dex = buy_exchange in EXCHANGES['dex']
        sell_is_dex = sell_exchange in EXCHANGES['dex']

        if buy_is_dex and sell_is_dex:
            return 'dex_dex'
        elif not buy_is_dex and not sell_is_dex:
            return 'cex_cex'
        else:
            return 'cex_dex'

    def _get_volatility_regime(self, market_conditions: Dict[str, Any]) -> str:
        """Bucket current volatility into a slippage regime."""
        volatility = market_conditions.get('volatility', 0.02)

        if volatility < 0.015:
            return 'low_volatility'
        elif volatility < 0.03:
            return 'normal'
        elif volatility < 0.05:
            return 'high_volatility'
        else:
            return 'extreme'

    def optimize_execution_path(self, opportunities: List[ArbitrageOpportunity],
                                current_positions: List[Dict[str, Any]],
                                risk_parameters: Dict[str, Any]) -> List[ArbitrageOpportunity]:
        """Rank opportunities and greedily select those that fit the risk budget."""
        if not opportunities:
            return []

        scored_opportunities = []

        for opp in opportunities:
            # Composite score: profit, discounted by latency risk, scaled by
            # relative size and forecast confidence.
            profit_score = opp.expected_profit_pct
            latency_penalty = opp.latency_risk * 0.5
            size_score = min(opp.max_size * opp.buy_price / risk_parameters['max_position_size'], 1.0)

            forecast_confidence = 1 - opp.metadata.get('forecast_uncertainty', 0.5)

            total_score = profit_score * (1 - latency_penalty) * size_score * forecast_confidence

            scored_opportunities.append((total_score, opp))

        scored_opportunities.sort(key=lambda x: x[0], reverse=True)

        # Greedy selection against a simulated position set.
        selected = []
        simulated_positions = current_positions.copy()

        for score, opp in scored_opportunities:
            can_add, reason = self._can_add_opportunity(
                opp, simulated_positions, risk_parameters
            )

            if can_add:
                selected.append(opp)
                simulated_positions.append({
                    'asset': opp.asset,
                    'size': opp.max_size,
                    'entry_price': opp.buy_price
                })

            if len(selected) >= risk_parameters['max_concurrent_trades']:
                break

        return selected

    def _can_add_opportunity(self, opportunity: ArbitrageOpportunity,
                             positions: List[Dict[str, Any]],
                             risk_parameters: Dict[str, Any]) -> Tuple[bool, str]:
        """Check whether an opportunity fits alongside existing positions."""
        for pos in positions:
            if pos['asset'] == opportunity.asset:
                return False, "Already have position in asset"

        current_exposure = sum(p['size'] * p['entry_price'] for p in positions)
        new_exposure = opportunity.max_size * opportunity.buy_price

        if current_exposure + new_exposure > risk_parameters['max_position_size'] * 5:
            return False, "Would exceed total exposure limit"

        return True, "OK"


class CrossAssetArbitrageEngine:
    """Main arbitrage engine coordinating all components."""

    def __init__(self):
        self.price_forecaster = PriceForecastingEngine()
        self.arbitrage_detector = ArbitrageDetector()
        self.strategy_optimizer = LLMStrategyOptimizer()
        self.risk_analytics = RiskAnalytics()
        self.execution_engine = LatencyAwareExecutionEngine()

        # One simulator per venue.
        self.exchanges = {}
        for exchange in EXCHANGES['cex']:
            self.exchanges[exchange] = ExchangeSimulator('cex')
        for exchange in EXCHANGES['dex']:
            self.exchanges[exchange] = ExchangeSimulator('dex')

        self.active_positions = []
        self.portfolio_value = 100000
        self.performance_history = []
        self.market_data_cache = {}
        self.forecasts_cache = {}

    def generate_market_data(self, assets: List[str], days: int = 100) -> Dict[str, pd.DataFrame]:
        """Generate realistic correlated hourly market data for multiple assets."""
        market_data = {}

        # Build a cross-asset correlation matrix: crypto pairs correlate
        # strongly, FX pairs moderately, everything else weakly.
        n_assets = len(assets)
        correlation_matrix = np.eye(n_assets)

        for i in range(n_assets):
            for j in range(i+1, n_assets):
                if (assets[i] in ASSET_CLASSES['crypto_spot'] and
                        assets[j] in ASSET_CLASSES['crypto_spot']):
                    corr = np.random.uniform(0.6, 0.9)
                elif (assets[i] in ASSET_CLASSES['fx_pairs'] and
                        assets[j] in ASSET_CLASSES['fx_pairs']):
                    corr = np.random.uniform(0.3, 0.6)
                else:
                    corr = np.random.uniform(-0.2, 0.3)

                correlation_matrix[i, j] = corr
                correlation_matrix[j, i] = corr

        # Per-asset return volatilities.
        mean_returns = np.zeros(n_assets)
        volatilities = []

        for asset in assets:
            if asset in ASSET_CLASSES['crypto_spot']:
                volatilities.append(0.015)
            elif asset in ASSET_CLASSES['fx_pairs']:
                volatilities.append(0.005)
            else:
                volatilities.append(0.01)

        cov_matrix = np.outer(volatilities, volatilities) * correlation_matrix

        # Draw correlated returns for all assets at once.
        returns = np.random.multivariate_normal(mean_returns, cov_matrix, days)

        for i, asset in enumerate(assets):
            if asset in ASSET_CLASSES['crypto_spot']:
                base_price = {'BTC': 45000, 'ETH': 3000, 'SOL': 100}.get(asset, 50)
            elif asset in ASSET_CLASSES['equity_etfs']:
                base_price = {'SPY': 450, 'QQQ': 380}.get(asset, 100)
            else:
                base_price = 1.0

            # Geometric price path from cumulative returns.
            prices = base_price * np.exp(np.cumsum(returns[:, i]))

            dates = pd.date_range(end=datetime.now(), periods=days, freq='h')

            data = pd.DataFrame({
                'open': prices * (1 + np.random.normal(0, 0.002, days)),
                'high': prices * (1 + np.abs(np.random.normal(0, 0.005, days))),
                'low': prices * (1 - np.abs(np.random.normal(0, 0.005, days))),
                'close': prices,
                'volume': np.random.lognormal(15, 0.5, days)
            }, index=dates)

            # Enforce OHLC consistency.
            data['high'] = data[['open', 'high', 'close']].max(axis=1)
            data['low'] = data[['open', 'low', 'close']].min(axis=1)

            market_data[asset] = data

        self.market_data_cache = market_data
        return market_data

    def update_order_books(self, market_data: Dict[str, pd.DataFrame]) -> Dict[str, Dict[str, OrderBook]]:
        """Generate current order books for all exchanges."""
        order_books = defaultdict(dict)

        market_conditions = self.calculate_market_conditions(market_data)

        for asset, data in market_data.items():
            current_price = data['close'].iloc[-1]

            for exchange_name, exchange in self.exchanges.items():
                # Small per-venue price dislocations create the arbitrage surface.
                price_variation = np.random.normal(0, 0.0005)
                adjusted_price = current_price * (1 + price_variation)

                # DEX books start wider; both classes widen with volatility.
                base_spread = 5 if exchange.exchange_type == 'cex' else 15
                volatility_adjustment = 1 + market_conditions['volatility'] * 20
                spread_bps = base_spread * volatility_adjustment

                order_book = exchange.generate_order_book(
                    asset, adjusted_price, spread_bps, market_conditions
                )

                order_books[exchange_name][asset] = order_book

        return dict(order_books)

    def calculate_market_conditions(self, market_data: Dict[str, pd.DataFrame]) -> Dict[str, Any]:
        """Summarise current market conditions across assets."""
        volatilities = []
        volumes = []
        spreads = []

        for asset, data in market_data.items():
            returns = data['close'].pct_change().dropna()

            # Annualise the std of the last 24 hourly returns.
            volatility = returns.iloc[-24:].std() * np.sqrt(365 * 24)
            volatilities.append(volatility)

            avg_volume = data['volume'].iloc[-24:].mean()
            volumes.append(avg_volume)

            # High-low range as a crude intraday spread proxy.
            spread = (data['high'] - data['low']).iloc[-24:].mean() / data['close'].iloc[-24:].mean()
            spreads.append(spread)

        return {
            'volatility': np.mean(volatilities),
            'max_volatility': np.max(volatilities),
            'liquidity': np.mean(volumes) / 1e6,
            'avg_spread': np.mean(spreads),
            'timestamp': datetime.now()
        }
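
    # Note: 'liquidity' is mean 24h volume scaled by 1e6 purely to bring it to
    # a convenient magnitude for the spread and position-sizing heuristics.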

    def generate_price_forecasts(self, market_data: Dict[str, pd.DataFrame]) -> Dict[str, Dict[str, np.ndarray]]:
        """Generate price forecasts for all assets."""
        forecasts = {}

        for asset, data in market_data.items():
            forecast = self.price_forecaster.forecast_prices(asset, data)
            forecasts[asset] = forecast

        self.forecasts_cache = forecasts
        return forecasts

    def run_arbitrage_cycle(self) -> Dict[str, Any]:
        """Run one complete detection-and-execution cycle."""
        # Bootstrap market data on first use: two assets per class.
        if not self.market_data_cache:
            assets = []
            for asset_class, asset_list in ASSET_CLASSES.items():
                assets.extend(asset_list[:2])
            self.market_data_cache = self.generate_market_data(assets)

        market_data = self.market_data_cache

        forecasts = self.generate_price_forecasts(market_data)
        order_books = self.update_order_books(market_data)
        market_conditions = self.calculate_market_conditions(market_data)

        # Let the optimizer adapt parameters to recent performance.
        recent_executions = self.execution_engine.execution_history[-20:]
        strategy_params = self.strategy_optimizer.generate_parameter_suggestions(
            recent_executions, market_conditions
        )

        transaction_costs = {
            exchange: sim.transaction_cost
            for exchange, sim in self.exchanges.items()
        }

        opportunities = self.arbitrage_detector.find_opportunities(
            order_books, transaction_costs, forecasts
        )

        filtered_opportunities = [
            opp for opp in opportunities
            if opp.expected_profit_pct >= strategy_params['min_profit_threshold']
        ]

        risk_metrics = self.risk_analytics.analyze_position_risk(
            self.active_positions, market_data
        )

        selected_opportunities = self.execution_engine.optimize_execution_path(
            filtered_opportunities, self.active_positions, strategy_params
        )

        execution_results = []

        for opportunity in selected_opportunities:
            can_execute, reason = self.risk_analytics.check_risk_limits(
                opportunity, self.active_positions, strategy_params
            )

            if can_execute:
                buy_exchange = self.exchanges[opportunity.buy_exchange]
                sell_exchange = self.exchanges[opportunity.sell_exchange]

                result = self.execution_engine.simulate_execution(
                    opportunity, buy_exchange, sell_exchange, market_conditions
                )

                execution_results.append(result)

                if result.success:
                    # Record the completed round trip; it is held briefly in
                    # active_positions to throttle immediate re-entry.
                    self.active_positions.append({
                        'asset': opportunity.asset,
                        'size': result.executed_size,
                        'entry_price': result.buy_fill_price,
                        'exit_price': result.sell_fill_price,
                        'profit': result.realized_profit,
                        'timestamp': result.timestamp
                    })

                    self.portfolio_value += result.realized_profit

        # Expire positions older than five minutes.
        self.active_positions = [p for p in self.active_positions
                                 if (datetime.now() - p['timestamp']).total_seconds() < 300]

        cycle_summary = {
            'timestamp': datetime.now(),
            'opportunities_found': len(opportunities),
            'opportunities_executed': len(execution_results),
            'successful_executions': sum(1 for r in execution_results if r.success),
            'total_profit': sum(r.realized_profit for r in execution_results),
            'portfolio_value': self.portfolio_value,
            'risk_metrics': risk_metrics,
            'market_conditions': market_conditions,
            'strategy_parameters': strategy_params
        }

        self.performance_history.append(cycle_summary)

        return cycle_summary
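

# Usage sketch (headless, hypothetical): the engine can be driven without the
# Gradio UI, e.g.
#   engine = CrossAssetArbitrageEngine()
#   for _ in range(10):
#       summary = engine.run_arbitrage_cycle()
#   print(summary['portfolio_value'])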


def create_opportunity_network(opportunities: List[ArbitrageOpportunity]) -> go.Figure:
    """Create a network visualization of arbitrage opportunities."""
    G = nx.Graph()

    # Venues become nodes; each opportunity becomes a profit-weighted edge.
    for opp in opportunities:
        G.add_edge(
            opp.buy_exchange,
            opp.sell_exchange,
            weight=opp.expected_profit_pct,
            asset=opp.asset
        )

    if len(G.nodes()) == 0:
        fig = go.Figure()
        fig.add_annotation(
            text="No arbitrage opportunities found",
            xref="paper", yref="paper",
            x=0.5, y=0.5, showarrow=False
        )
        return fig

    pos = nx.spring_layout(G, k=2, iterations=50)

    edge_trace = []
    for edge in G.edges(data=True):
        x0, y0 = pos[edge[0]]
        x1, y1 = pos[edge[1]]

        edge_trace.append(go.Scatter(
            x=[x0, x1, None],
            y=[y0, y1, None],
            mode='lines',
            line=dict(
                width=edge[2]['weight'] * 100,
                color='rgba(125,125,125,0.5)'
            ),
            hoverinfo='text',
            text=f"{edge[2]['asset']}: {edge[2]['weight']*100:.2f}%"
        ))

    # Colour DEX venues red and CEX venues blue (membership check, since venue
    # names such as 'Uniswap_V3' do not contain the substring 'dex').
    node_trace = go.Scatter(
        x=[pos[node][0] for node in G.nodes()],
        y=[pos[node][1] for node in G.nodes()],
        mode='markers+text',
        text=[node for node in G.nodes()],
        textposition="top center",
        marker=dict(
            size=20,
            color=['red' if node in EXCHANGES['dex'] else 'blue' for node in G.nodes()],
            line=dict(color='darkgray', width=2)
        ),
        hoverinfo='text'
    )

    fig = go.Figure(data=edge_trace + [node_trace])

    fig.update_layout(
        title="Arbitrage Opportunity Network",
        showlegend=False,
        xaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
        yaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
        height=500
    )

    return fig


def create_performance_dashboard(performance_history: List[Dict[str, Any]]) -> go.Figure:
    """Create a comprehensive performance dashboard."""
    if not performance_history:
        fig = go.Figure()
        fig.add_annotation(
            text="No performance data available",
            xref="paper", yref="paper",
            x=0.5, y=0.5, showarrow=False
        )
        return fig

    perf_df = pd.DataFrame(performance_history)

    fig = make_subplots(
        rows=3, cols=2,
        subplot_titles=(
            'Portfolio Value', 'Profit per Cycle',
            'Success Rate', 'Risk Metrics',
            'Opportunities vs Executions', 'Market Conditions'
        ),
        specs=[
            [{"type": "scatter"}, {"type": "scatter"}],
            [{"type": "scatter"}, {"type": "scatter"}],
            [{"type": "bar"}, {"type": "scatter"}]
        ],
        vertical_spacing=0.1,
        horizontal_spacing=0.1
    )

    # Portfolio value over time.
    fig.add_trace(
        go.Scatter(
            x=perf_df['timestamp'],
            y=perf_df['portfolio_value'],
            mode='lines',
            name='Portfolio Value',
            line=dict(color='blue', width=2)
        ),
        row=1, col=1
    )

    # Profit per cycle.
    fig.add_trace(
        go.Scatter(
            x=perf_df['timestamp'],
            y=perf_df['total_profit'],
            mode='lines+markers',
            name='Profit',
            line=dict(color='green')
        ),
        row=1, col=2
    )

    # Per-cycle execution success rate.
    perf_df['success_rate'] = perf_df.apply(
        lambda x: x['successful_executions'] / x['opportunities_executed'] if x['opportunities_executed'] > 0 else 0,
        axis=1
    )
    fig.add_trace(
        go.Scatter(
            x=perf_df['timestamp'],
            y=perf_df['success_rate'],
            mode='lines',
            name='Success Rate',
            line=dict(color='orange')
        ),
        row=2, col=1
    )

    # Sharpe ratio extracted from the nested risk metrics.
    sharpe_values = [m['sharpe_ratio'] for m in perf_df['risk_metrics']]
    fig.add_trace(
        go.Scatter(
            x=perf_df['timestamp'],
            y=sharpe_values,
            mode='lines',
            name='Sharpe Ratio',
            line=dict(color='purple')
        ),
        row=2, col=2
    )

    # Opportunities found vs executed.
    fig.add_trace(
        go.Bar(
            x=perf_df['timestamp'],
            y=perf_df['opportunities_found'],
            name='Found',
            marker_color='lightblue'
        ),
        row=3, col=1
    )
    fig.add_trace(
        go.Bar(
            x=perf_df['timestamp'],
            y=perf_df['opportunities_executed'],
            name='Executed',
            marker_color='darkblue'
        ),
        row=3, col=1
    )

    # Market volatility over time.
    volatility_values = [m['volatility'] for m in perf_df['market_conditions']]
    fig.add_trace(
        go.Scatter(
            x=perf_df['timestamp'],
            y=volatility_values,
            mode='lines',
            name='Volatility',
            line=dict(color='red')
        ),
        row=3, col=2
    )

    fig.update_layout(
        height=1000,
        showlegend=False,
        title_text="Cross-Asset Arbitrage Performance Dashboard"
    )

    fig.update_xaxes(title_text="Time", row=3, col=1)
    fig.update_xaxes(title_text="Time", row=3, col=2)
    fig.update_yaxes(title_text="Value ($)", row=1, col=1)
    fig.update_yaxes(title_text="Profit ($)", row=1, col=2)
    fig.update_yaxes(title_text="Rate", row=2, col=1)
    fig.update_yaxes(title_text="Sharpe", row=2, col=2)
    fig.update_yaxes(title_text="Count", row=3, col=1)
    fig.update_yaxes(title_text="Volatility", row=3, col=2)

    return fig


def create_execution_analysis(execution_history: List[ExecutionResult]) -> go.Figure:
    """Create an execution analysis visualization."""
    if not execution_history:
        fig = go.Figure()
        fig.add_annotation(
            text="No execution data available",
            xref="paper", yref="paper",
            x=0.5, y=0.5, showarrow=False
        )
        return fig

    exec_df = pd.DataFrame([
        {
            'timestamp': e.timestamp,
            'profit': e.realized_profit,
            'slippage': e.slippage,
            'latency': e.latency_ms,
            'success': e.success
        }
        for e in execution_history
    ])

    fig = make_subplots(
        rows=2, cols=2,
        subplot_titles=(
            'Profit Distribution', 'Slippage Analysis',
            'Latency Distribution', 'Success Rate Over Time'
        ),
        specs=[
            [{"type": "histogram"}, {"type": "scatter"}],
            [{"type": "histogram"}, {"type": "scatter"}]
        ]
    )

    # Profit distribution.
    fig.add_trace(
        go.Histogram(
            x=exec_df['profit'],
            nbinsx=30,
            name='Profit',
            marker_color='green'
        ),
        row=1, col=1
    )

    # Slippage per execution, coloured by success.
    fig.add_trace(
        go.Scatter(
            x=exec_df['timestamp'],
            y=exec_df['slippage'] * 100,
            mode='markers',
            name='Slippage',
            marker=dict(
                color=exec_df['success'].map({True: 'blue', False: 'red'}),
                size=8
            )
        ),
        row=1, col=2
    )

    # Latency distribution.
    fig.add_trace(
        go.Histogram(
            x=exec_df['latency'],
            nbinsx=30,
            name='Latency',
            marker_color='orange'
        ),
        row=2, col=1
    )

    # 20-trade rolling success rate.
    exec_df['success_int'] = exec_df['success'].astype(int)
    exec_df['success_rate_rolling'] = exec_df['success_int'].rolling(
        window=20, min_periods=1
    ).mean()

    fig.add_trace(
        go.Scatter(
            x=exec_df['timestamp'],
            y=exec_df['success_rate_rolling'],
            mode='lines',
            name='Success Rate',
            line=dict(color='purple', width=2)
        ),
        row=2, col=2
    )

    fig.update_layout(
        height=700,
        showlegend=False,
        title_text="Execution Analysis"
    )

    fig.update_xaxes(title_text="Profit ($)", row=1, col=1)
    fig.update_xaxes(title_text="Time", row=1, col=2)
    fig.update_xaxes(title_text="Latency (ms)", row=2, col=1)
    fig.update_xaxes(title_text="Time", row=2, col=2)
    fig.update_yaxes(title_text="Count", row=1, col=1)
    fig.update_yaxes(title_text="Slippage (%)", row=1, col=2)
    fig.update_yaxes(title_text="Count", row=2, col=1)
    fig.update_yaxes(title_text="Success Rate", row=2, col=2)

    return fig


def create_gradio_interface():
    """Create the main Gradio interface."""
    engine = CrossAssetArbitrageEngine()

    def run_arbitrage_simulation(n_cycles, initial_capital, min_profit_threshold):
        """Run the arbitrage simulation and build all output artifacts."""
        # Reset engine state for a fresh run.
        engine.portfolio_value = float(initial_capital)
        engine.performance_history = []
        engine.execution_engine.execution_history = []
        engine.active_positions = []

        # The slider is in percent; internal thresholds are fractions.
        engine.strategy_optimizer.current_parameters['min_profit_threshold'] = float(min_profit_threshold) / 100

        assets = []
        for asset_class, asset_list in ASSET_CLASSES.items():
            assets.extend(asset_list[:2])

        engine.generate_market_data(assets, days=200)

        cycle_summaries = []
        for i in range(int(n_cycles)):
            # Advance every asset's price series by one synthetic hourly bar.
            for asset, data in engine.market_data_cache.items():
                last_price = data['close'].iloc[-1]
                new_return = np.random.normal(0.0001, 0.01)
                new_price = last_price * (1 + new_return)

                new_row = pd.DataFrame({
                    'open': [new_price * (1 + np.random.normal(0, 0.002))],
                    'high': [new_price * (1 + abs(np.random.normal(0, 0.005)))],
                    'low': [new_price * (1 - abs(np.random.normal(0, 0.005)))],
                    'close': [new_price],
                    'volume': [np.random.lognormal(15, 0.5)]
                }, index=[data.index[-1] + pd.Timedelta(hours=1)])

                new_row['high'] = new_row[['open', 'high', 'close']].max(axis=1)
                new_row['low'] = new_row[['open', 'low', 'close']].min(axis=1)

                engine.market_data_cache[asset] = pd.concat([data, new_row])

                # Keep a fixed-length rolling window of history.
                engine.market_data_cache[asset] = engine.market_data_cache[asset].iloc[-200:]

            summary = engine.run_arbitrage_cycle()
            cycle_summaries.append(summary)

        opportunity_network = create_opportunity_network(
            engine.arbitrage_detector.opportunity_history[-50:]
        )

        performance_dashboard = create_performance_dashboard(
            engine.performance_history
        )

        execution_analysis = create_execution_analysis(
            engine.execution_engine.execution_history
        )

        # Aggregate summary statistics.
        total_profit = sum(s['total_profit'] for s in engine.performance_history)
        total_return = (engine.portfolio_value - initial_capital) / initial_capital

        if len(engine.execution_engine.execution_history) > 0:
            success_rate = sum(
                1 for e in engine.execution_engine.execution_history if e.success
            ) / len(engine.execution_engine.execution_history)
            avg_latency = np.mean([
                e.latency_ms for e in engine.execution_engine.execution_history
            ])
            avg_slippage = np.mean([
                e.slippage for e in engine.execution_engine.execution_history
            ])
        else:
            success_rate = avg_latency = avg_slippage = 0

        if engine.risk_analytics.risk_metrics_history:
            latest_risk = engine.risk_analytics.risk_metrics_history[-1]
            sharpe = latest_risk['sharpe_ratio']
            var_95 = latest_risk['var_95']
        else:
            sharpe = var_95 = 0

        summary_text = f"""
### Simulation Summary

**Performance Metrics:**
- Total Profit: ${total_profit:,.2f}
- Total Return: {total_return*100:.2f}%
- Final Portfolio Value: ${engine.portfolio_value:,.2f}
- Sharpe Ratio: {sharpe:.2f}
- VaR (95%): {var_95*100:.2f}%

**Execution Statistics:**
- Total Opportunities Found: {sum(s['opportunities_found'] for s in engine.performance_history)}
- Total Executions: {len(engine.execution_engine.execution_history)}
- Success Rate: {success_rate*100:.1f}%
- Average Latency: {avg_latency:.0f}ms
- Average Slippage: {avg_slippage*100:.2f}%

**Active Positions:** {len(engine.active_positions)}
"""

        recent_opps = []
        for opp in engine.arbitrage_detector.opportunity_history[-10:]:
            recent_opps.append({
                'Asset': opp.asset,
                'Buy Exchange': opp.buy_exchange,
                'Sell Exchange': opp.sell_exchange,
                'Spread': f"{(opp.sell_price - opp.buy_price)/opp.buy_price*100:.2f}%",
                'Expected Profit': f"${opp.expected_profit:.2f}",
                'Latency Risk': f"{opp.latency_risk:.2f}"
            })

        opportunities_df = pd.DataFrame(recent_opps) if recent_opps else pd.DataFrame()

        return (opportunity_network, performance_dashboard, execution_analysis,
                summary_text, opportunities_df)

    def analyze_strategy_parameters():
        """Analyze the evolution of the optimizer's strategy parameters."""
        # Return None for the plot output when there is nothing to show, so the
        # (Plot, Markdown) output pair stays type-consistent.
        if not engine.strategy_optimizer.parameter_history:
            return None, "No parameter history available"

        param_history = engine.strategy_optimizer.parameter_history.get('suggestions', [])

        if not param_history:
            return None, "No parameter suggestions generated"

        param_df = pd.DataFrame([
            {
                'timestamp': entry['timestamp'],
                **entry['parameters']
            }
            for entry in param_history
        ])

        fig = make_subplots(
            rows=2, cols=2,
            subplot_titles=(
                'Profit Threshold', 'Position Size',
                'Risk Limit', 'Confidence Threshold'
            )
        )

        params_to_plot = [
            ('min_profit_threshold', 1, 1, 'Threshold'),
            ('max_position_size', 1, 2, 'Size ($)'),
            ('risk_limit', 2, 1, 'Limit'),
            ('confidence_threshold', 2, 2, 'Threshold')
        ]

        for param, row, col, ylabel in params_to_plot:
            if param in param_df.columns:
                fig.add_trace(
                    go.Scatter(
                        x=param_df['timestamp'],
                        y=param_df[param],
                        mode='lines+markers',
                        name=param
                    ),
                    row=row, col=col
                )

        fig.update_layout(
            height=600,
            showlegend=False,
            title_text="Strategy Parameter Evolution"
        )

        latest_reasoning = param_history[-1]['reasoning'] if param_history else "No reasoning available"

        return fig, f"**Latest Optimization Reasoning:** {latest_reasoning}"

    with gr.Blocks(title="Cross-Asset Arbitrage Engine") as interface:
        gr.Markdown("""
        # Cross-Asset Arbitrage Engine with Transformer Models

        This sophisticated arbitrage engine leverages transformer models for price forecasting across multiple asset classes:
        - **Crypto Spot/Futures**: BTC, ETH, SOL and perpetual futures
        - **Foreign Exchange**: Major currency pairs
        - **Equity ETFs**: SPY, QQQ, IWM and international markets

        Features:
        - **Transformer Price Prediction**: Numerical-adapted transformers for multi-horizon forecasting
        - **Cross-Venue Execution**: Simulates CEX (Binance, Coinbase) and DEX (Uniswap V3) integration
        - **LLM Strategy Optimization**: Dynamic parameter adjustment based on performance
        - **Latency-Aware Execution**: Realistic order routing with slippage simulation
        - **Comprehensive Risk Analytics**: Real-time VaR, Sharpe ratio, and drawdown monitoring

        Author: Spencer Purdy
        """)

        with gr.Tab("Arbitrage Simulation"):
            with gr.Row():
                with gr.Column(scale=1):
                    n_cycles = gr.Slider(
                        minimum=10, maximum=100, value=50, step=10,
                        label="Number of Trading Cycles"
                    )
                    initial_capital = gr.Number(
                        value=100000, label="Initial Capital ($)", minimum=10000
                    )
                    min_profit = gr.Slider(
                        minimum=0.1, maximum=1.0, value=0.2, step=0.1,
                        label="Minimum Profit Threshold (%)"
                    )

                    run_btn = gr.Button("Run Simulation", variant="primary")

            with gr.Row():
                opportunity_network = gr.Plot(label="Arbitrage Opportunity Network")

            with gr.Row():
                performance_dashboard = gr.Plot(label="Performance Dashboard")

            with gr.Row():
                execution_analysis = gr.Plot(label="Execution Analysis")

            with gr.Row():
                with gr.Column(scale=1):
                    summary_display = gr.Markdown(label="Summary Statistics")

                with gr.Column(scale=1):
                    opportunities_table = gr.DataFrame(
                        label="Recent Arbitrage Opportunities"
                    )

        with gr.Tab("Strategy Analysis"):
            with gr.Row():
                analyze_btn = gr.Button("Analyze Strategy Parameters", variant="primary")

            with gr.Row():
                param_evolution = gr.Plot(label="Parameter Evolution")

            with gr.Row():
                param_reasoning = gr.Markdown(label="Optimization Reasoning")

        run_btn.click(
            fn=run_arbitrage_simulation,
            inputs=[n_cycles, initial_capital, min_profit],
            outputs=[
                opportunity_network, performance_dashboard,
                execution_analysis, summary_display, opportunities_table
            ]
        )

        analyze_btn.click(
            fn=analyze_strategy_parameters,
            inputs=[],
            outputs=[param_evolution, param_reasoning]
        )

        gr.Examples(
            examples=[
                [50, 100000, 0.2],
                [30, 50000, 0.3],
                [100, 200000, 0.15]
            ],
            inputs=[n_cycles, initial_capital, min_profit]
        )

    return interface


if __name__ == "__main__":
    interface = create_gradio_interface()
    interface.launch()