# Source snapshot: commit 859af74 (4,976 bytes)
import pandas as pd
import logging
import os
from typing import Dict, Any
from .synthetic_data_generator import SyntheticDataGenerator
logger = logging.getLogger(__name__)
def load_data(config: Dict[str, Any]) -> pd.DataFrame:
    """
    Load market data according to the configured source.

    Dispatches on ``config['data_source']['type']``: ``'csv'`` reads from
    disk via :func:`_load_csv_data`, ``'synthetic'`` generates data via
    :func:`_generate_synthetic_data`. Any failure is logged and re-raised.

    Args:
        config: Configuration dictionary with a ``data_source`` section.

    Returns:
        DataFrame with market data.

    Raises:
        ValueError: If the configured source type is not supported.
    """
    logger.info("Starting data ingestion process")
    try:
        source_type = config['data_source']['type']
        if source_type == 'csv':
            return _load_csv_data(config)
        if source_type == 'synthetic':
            return _generate_synthetic_data(config)
        raise ValueError(f"Unsupported data source type: {source_type}")
    except Exception as e:
        # Log at this boundary with traceback, then let the caller decide.
        logger.error(f"Error in data ingestion: {e}", exc_info=True)
        raise
def _load_csv_data(config: Dict[str, Any]) -> pd.DataFrame:
    """Load market data from the CSV file named in the config.

    Falls back to synthetic generation when the file is absent or lacks
    any of the required OHLCV columns, rather than failing hard.
    """
    csv_path = config['data_source']['path']
    if not os.path.exists(csv_path):
        logger.warning(f"CSV file not found at {csv_path}, generating synthetic data instead")
        return _generate_synthetic_data(config)

    logger.info(f"Loading data from CSV: {csv_path}")
    frame = pd.read_csv(csv_path)

    # A malformed file (missing OHLCV columns) also triggers the
    # synthetic-data fallback instead of an exception.
    expected = ['timestamp', 'open', 'high', 'low', 'close', 'volume']
    absent = [name for name in expected if name not in frame.columns]
    if absent:
        logger.warning(f"Missing columns in CSV: {absent}")
        logger.info("Generating synthetic data instead")
        return _generate_synthetic_data(config)

    # Parse timestamps so downstream consumers get datetime64 values.
    frame['timestamp'] = pd.to_datetime(frame['timestamp'])
    logger.info(f"Successfully loaded {len(frame)} data points from CSV")
    return frame
def _generate_synthetic_data(config: Dict[str, Any]) -> pd.DataFrame:
    """Generate synthetic OHLCV market data via SyntheticDataGenerator.

    Args:
        config: Configuration dictionary; reads the ``synthetic_data``
            section (output path, save flag) and the ``trading`` section
            (symbol, timeframe).

    Returns:
        DataFrame with generated OHLCV data.

    Raises:
        Exception: Re-raised after logging if generation or saving fails.
    """
    logger.info("Generating synthetic market data")
    try:
        data_path = config['synthetic_data']['data_path']
        # Create the target directory if needed. Guard against a bare
        # filename: os.path.dirname('file.csv') is '' and os.makedirs('')
        # raises FileNotFoundError.
        directory = os.path.dirname(data_path)
        if directory:
            os.makedirs(directory, exist_ok=True)

        generator = SyntheticDataGenerator(config)

        # Generate one year of OHLCV data at the configured timeframe.
        # NOTE(review): the date range is hard-coded to calendar 2024 —
        # confirm whether it should come from config instead.
        df = generator.generate_ohlcv_data(
            symbol=config['trading']['symbol'],
            start_date='2024-01-01',
            end_date='2024-12-31',
            frequency=config['trading']['timeframe']
        )

        # Persist to CSV unless explicitly disabled in the config.
        if config['synthetic_data'].get('generate_data', True):
            generator.save_to_csv(df, data_path)
            logger.info(f"Saved synthetic data to {data_path}")
        return df
    except Exception as e:
        logger.error(f"Error generating synthetic data: {e}", exc_info=True)
        raise
def validate_data(df: pd.DataFrame) -> bool:
    """
    Validate the loaded data for required fields and data quality.

    Checks performed: required OHLCV columns present, no negative prices
    or volumes, and internally consistent OHLC rows (high >= low, with
    open and close inside the [low, high] band). Null values are logged
    as a warning but do not fail validation.

    Args:
        df: DataFrame to validate.

    Returns:
        True if data is valid, False otherwise.
    """
    logger.info("Validating data quality")
    try:
        required_columns = ['timestamp', 'open', 'high', 'low', 'close', 'volume']
        absent = [c for c in required_columns if c not in df.columns]
        if absent:
            logger.error(f"Missing required columns: {absent}")
            return False

        # Nulls are reported but treated as non-fatal.
        null_counts = df[required_columns].isnull().sum()
        if null_counts.sum() > 0:
            logger.warning(f"Found null values: {null_counts.to_dict()}")

        price_columns = ['open', 'high', 'low', 'close']
        if df[price_columns].lt(0).any().any():
            logger.error("Found negative prices in data")
            return False

        if (df['volume'] < 0).any():
            logger.error("Found negative volumes in data")
            return False

        # A row is invalid when high < low, or open/close fall outside
        # the [low, high] band.
        bad_rows = (
            (df['high'] < df['low'])
            | (df['open'] > df['high'])
            | (df['open'] < df['low'])
            | (df['close'] > df['high'])
            | (df['close'] < df['low'])
        )
        if bad_rows.any():
            logger.error(f"Found {bad_rows.sum()} rows with invalid OHLC data")
            return False

        logger.info("Data validation passed")
        return True
    except Exception as e:
        logger.error(f"Error during data validation: {e}", exc_info=True)
        return False
|