# diagnostics.py
import logging
import os
from pathlib import Path

import pandas as pd
import pyarrow.parquet as pq

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("metadata_manager.log"),
        logging.StreamHandler()
    ]
)

def diagnose_parquet_files(directory_path):
    """Verify that every parquet file in a directory is readable and valid.

    Returns True if at least one file could be read successfully, False otherwise.
    """
    logger = logging.getLogger("ParquetDiagnostic")
    logger.info(f"Starting parquet file diagnostics in {directory_path}")
    
    dir_path = Path(directory_path)
    if not dir_path.exists():
        logger.error(f"Directory does not exist: {dir_path}")
        return False
    
    all_files = list(dir_path.glob("*.parquet"))
    logger.info(f"Found {len(all_files)} parquet files")
    
    if not all_files:
        logger.warning("No parquet files found")
        return False
    
    success_count = 0
    issue_count = 0
    total_rows = 0
    
    for file_path in all_files:
        logger.info(f"Diagnosing file: {file_path}")
        file_size_mb = os.path.getsize(file_path) / (1024 * 1024)
        logger.info(f"File size: {file_size_mb:.2f} MB")
        
        try:
            # Read the file metadata without loading the data itself
            parquet_file = pq.ParquetFile(file_path)
            
            # Log schema information
            schema = parquet_file.schema.to_arrow_schema()
            logger.info(f"Schema: {schema}")
            
            # Log file metadata
            metadata = parquet_file.metadata
            num_rows = metadata.num_rows
            num_columns = len(schema.names)
            logger.info(f"Rows: {num_rows}, Columns: {num_columns}")
            
            # Read the full file with pandas to verify the data can actually be loaded
            df = pd.read_parquet(file_path, engine='pyarrow')
            actual_rows = len(df)
            
            logger.info(f"Successfully read {actual_rows} rows")
            total_rows += actual_rows
            success_count += 1
            
        except Exception as e:
            logger.error(f"Failed to read file {file_path}: {str(e)}", exc_info=True)
            issue_count += 1
            
            # Try alternate engines if primary fails
            try:
                logger.info("Attempting to read with fastparquet engine")
                sample_df = pd.read_parquet(file_path, engine='fastparquet')
                logger.info(f"fastparquet succeeded, read {len(sample_df)} rows")
            except Exception as e2:
                logger.error(f"fastparquet also failed: {str(e2)}")
    
    logger.info(f"Diagnostics complete: {success_count} files OK, {issue_count} files with issues")
    logger.info(f"Total rows across all files: {total_rows}")
    
    return success_count > 0
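

# --- Example usage (a sketch, not part of the original module) ---
# The helper below shows one way to read only a small sample from a large file
# using pyarrow's ParquetFile.iter_batches, instead of loading the whole file
# as the diagnostics above do. The function name and batch size are illustrative.
def read_parquet_sample(file_path, batch_size=1000):
    """Read at most `batch_size` rows from a parquet file as a DataFrame."""
    parquet_file = pq.ParquetFile(file_path)
    for batch in parquet_file.iter_batches(batch_size=batch_size):
        return batch.to_pandas()
    return pd.DataFrame()  # empty file: no batches to read


# Running the module as a script runs the diagnostics against a directory passed
# on the command line; the "./data" default below is an assumption.
if __name__ == "__main__":
    import sys

    target_dir = sys.argv[1] if len(sys.argv) > 1 else "./data"
    ok = diagnose_parquet_files(target_dir)
    sys.exit(0 if ok else 1)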