Testys commited on
Commit
4e62c61
·
1 Parent(s): d286a45

Update search_utils.py

Browse files
Files changed (1) hide show
  1. search_utils.py +53 -18
search_utils.py CHANGED
@@ -40,11 +40,11 @@ class MetadataManager:
40
  self.shard_dir.mkdir(parents=True, exist_ok=True)
41
 
42
  def _unzip_if_needed(self):
43
- """Handle ZIP extraction with validation and retries"""
44
  zip_path = Path("metadata_shards.zip")
45
 
46
- # Check if we need to unzip
47
- if not any(self.shard_dir.glob("*.parquet")):
48
  logger.info("No parquet files found, checking for zip archive")
49
 
50
  if not zip_path.exists():
@@ -53,28 +53,63 @@ class MetadataManager:
53
  logger.info(f"Extracting {zip_path} to {self.shard_dir}")
54
  try:
55
  with zipfile.ZipFile(zip_path, 'r') as zip_ref:
56
- # Validate zip contents before extraction
57
- zip_files = zip_ref.namelist()
58
- if not any(fname.endswith('.parquet') for fname in zip_files):
59
- raise ValueError("ZIP file contains no parquet files")
60
-
61
  zip_ref.extractall(self.shard_dir)
62
- logger.info(f"Extracted {len(zip_files)} files")
63
 
64
- # Verify extraction succeeded
65
- if not any(self.shard_dir.glob("*.parquet")):
 
 
 
 
 
 
 
 
 
66
  raise RuntimeError("Extraction completed but no parquet files found")
67
 
 
 
68
  except Exception as e:
69
  logger.error(f"Failed to extract zip file: {str(e)}")
70
- # Clean up partial extraction
71
- if any(self.shard_dir.iterdir()):
72
- for f in self.shard_dir.glob("*"):
73
- f.unlink()
74
  raise
75
-
76
- else:
77
- logger.info("Parquet files already exist in cache directory")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
78
 
79
  def _build_shard_map(self):
80
  """Create validated index range to shard mapping"""
 
40
  self.shard_dir.mkdir(parents=True, exist_ok=True)
41
 
42
  def _unzip_if_needed(self):
43
+ """Handle ZIP extraction with nested directory handling"""
44
  zip_path = Path("metadata_shards.zip")
45
 
46
+ # Check if we need to unzip by looking for parquet files in any subdirectory
47
+ if not any(self.shard_dir.rglob("*.parquet")):
48
  logger.info("No parquet files found, checking for zip archive")
49
 
50
  if not zip_path.exists():
 
53
  logger.info(f"Extracting {zip_path} to {self.shard_dir}")
54
  try:
55
  with zipfile.ZipFile(zip_path, 'r') as zip_ref:
56
+ # Check for nested directory structure in zip
57
+ zip_root = self._get_zip_root(zip_ref)
58
+
59
+ # Extract while preserving structure
 
60
  zip_ref.extractall(self.shard_dir)
 
61
 
62
+ # Handle nested directory if exists
63
+ if zip_root:
64
+ nested_dir = self.shard_dir / zip_root
65
+ if nested_dir.exists():
66
+ # Move files up from nested directory
67
+ self._flatten_directory(nested_dir, self.shard_dir)
68
+ nested_dir.rmdir()
69
+
70
+ # Verify extraction
71
+ parquet_files = list(self.shard_dir.rglob("*.parquet"))
72
+ if not parquet_files:
73
  raise RuntimeError("Extraction completed but no parquet files found")
74
 
75
+ logger.info(f"Found {len(parquet_files)} parquet files after extraction")
76
+
77
  except Exception as e:
78
  logger.error(f"Failed to extract zip file: {str(e)}")
79
+ self._clean_failed_extraction()
 
 
 
80
  raise
81
+
82
+ def _get_zip_root(self, zip_ref):
83
+ """Identify common root directory in zip file"""
84
+ try:
85
+ first_file = zip_ref.namelist()[0]
86
+ if '/' in first_file:
87
+ return first_file.split('/')[0]
88
+ return ""
89
+ except Exception as e:
90
+ logger.warning(f"Error detecting zip root: {str(e)}")
91
+ return ""
92
+
93
+ def _flatten_directory(self, src_dir, dest_dir):
94
+ """Move files from nested directory to destination"""
95
+ for item in src_dir.iterdir():
96
+ if item.is_dir():
97
+ self._flatten_directory(item, dest_dir)
98
+ item.rmdir()
99
+ else:
100
+ target = dest_dir / item.name
101
+ if target.exists():
102
+ target.unlink()
103
+ item.rename(target)
104
+
105
+ def _clean_failed_extraction(self):
106
+ """Remove any extracted files after failed attempt"""
107
+ logger.info("Cleaning up failed extraction")
108
+ for item in self.shard_dir.iterdir():
109
+ if item.is_dir():
110
+ shutil.rmtree(item)
111
+ else:
112
+ item.unlink()
113
 
114
  def _build_shard_map(self):
115
  """Create validated index range to shard mapping"""