add: metadata regeneration from parquet datasets when missing
parent 6f63736880
commit fc06b3d69f
bun.lock | 1 +
@@ -1,5 +1,6 @@
 {
   "lockfileVersion": 1,
+  "configVersion": 0,
   "workspaces": {
     "": {
       "name": "cvsa",
@@ -155,6 +155,63 @@ class ParquetDatasetStorage:
             logger.error(f"Failed to save dataset {dataset_id}: {e}")
             return False
 
+    def _regenerate_metadata_from_parquet(self, dataset_id: str) -> Optional[Dict[str, Any]]:
+        """
+        Regenerate metadata from parquet file when metadata JSON is missing or corrupted
+
+        Args:
+            dataset_id: Dataset ID
+
+        Returns:
+            Dict: Regenerated metadata, or None if failed
+        """
+        try:
+            data_file, metadata_file = self._get_dataset_files(dataset_id)
+            if not data_file.exists():
+                logger.warning(f"Parquet file not found for dataset {dataset_id}, cannot regenerate metadata")
+                return None
+
+            # Read parquet file to extract basic information
+            table = pq.read_table(data_file, columns=['aid', 'label', 'embedding', 'metadata_json', 'user_labels_json'])
+
+            # Extract embedding dimension from first row
+            first_embedding = table.column('embedding')[0]
+            embedding_dim = len(first_embedding) if first_embedding else 0
+
+            # Get total records count
+            total_records = len(table)
+
+            # Generate regenerated metadata
+            regenerated_metadata = {
+                'dataset_id': dataset_id,
+                'description': f'Auto-regenerated metadata for dataset {dataset_id}',
+                'stats': {
+                    'regenerated': True,
+                    'regeneration_reason': 'missing_or_corrupted_metadata_file'
+                },
+                'created_at': datetime.fromtimestamp(data_file.stat().st_mtime).isoformat(),
+                'file_format': 'parquet_v1',
+                'embedding_dimension': embedding_dim,
+                'total_records': total_records,
+                'columns': ['aid', 'label', 'inconsistent', 'text_checksum', 'embedding', 'metadata_json', 'user_labels_json'],
+                'file_size_bytes': data_file.stat().st_size,
+                'compression': 'zstd'
+            }
+
+            # Save regenerated metadata to file
+            with open(metadata_file, 'w', encoding='utf-8') as f:
+                json.dump(regenerated_metadata, f, ensure_ascii=False, indent=2)
+
+            # Update cache
+            self.metadata_cache[dataset_id] = regenerated_metadata
+
+            logger.info(f"Successfully regenerated metadata for dataset {dataset_id} from parquet file")
+            return regenerated_metadata
+
+        except Exception as e:
+            logger.error(f"Failed to regenerate metadata for dataset {dataset_id}: {e}")
+            return None
+
     def load_dataset_metadata(self, dataset_id: str) -> Optional[Dict[str, Any]]:
         """
         Quickly load dataset metadata (without loading the entire file)
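Everything in the rebuilt metadata above comes either from the parquet rows or from the file's stat(), so the recovery logic can be read as a standalone recipe. A minimal sketch of that recipe outside the class, assuming only an 'embedding' list column (the file path and helper name are illustrative, not from this diff):

# Sketch: derive minimal metadata from a parquet file alone.
# The 'embedding' column and output keys mirror the patch; the
# num_rows guard is an extra safety check not present in the patch.
from datetime import datetime
from pathlib import Path

import pyarrow.parquet as pq

def sketch_regenerate(data_file: Path) -> dict:
    table = pq.read_table(data_file, columns=['embedding'])
    first = table.column('embedding')[0].as_py() if table.num_rows else None
    return {
        'dataset_id': data_file.stem,
        'embedding_dimension': len(first) if first else 0,
        'total_records': table.num_rows,
        'created_at': datetime.fromtimestamp(data_file.stat().st_mtime).isoformat(),
        'file_size_bytes': data_file.stat().st_size,
    }

One caveat: the patch indexes table.column('embedding')[0] unconditionally, so an empty parquet file would raise IndexError inside the try block and regeneration would fail with a logged error rather than succeed with zero records.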
@@ -172,7 +229,9 @@ class ParquetDatasetStorage:
         # Load from file
         _, metadata_file = self._get_dataset_files(dataset_id)
         if not metadata_file.exists():
-            return None
+            # Try to regenerate metadata from parquet file
+            logger.warning(f"Metadata file not found for dataset {dataset_id}, attempting to regenerate from parquet")
+            return self._regenerate_metadata_from_parquet(dataset_id)
 
         try:
             with open(metadata_file, 'r', encoding='utf-8') as f:
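With this fallback, a missing metadata JSON no longer makes the dataset invisible. A hypothetical way to exercise the branch (the no-arg constructor and the 'demo' dataset id are assumed for illustration, not taken from this diff):

# Hypothetical: force the missing-metadata fallback for dataset 'demo'.
storage = ParquetDatasetStorage()           # constructor signature assumed
_, metadata_file = storage._get_dataset_files('demo')
metadata_file.unlink(missing_ok=True)       # delete the JSON sidecar
storage.metadata_cache.pop('demo', None)    # make sure the file path is taken

meta = storage.load_dataset_metadata('demo')
assert meta is not None and meta['stats']['regenerated']

The cache eviction matters: the "# Load from file" comment suggests cached entries are served before touching disk, so a stale cache entry would mask the missing file.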
@@ -184,7 +243,9 @@ class ParquetDatasetStorage:
 
         except Exception as e:
             logger.error(f"Failed to load metadata for {dataset_id}: {e}")
-            return None
+            # Try to regenerate metadata from parquet file
+            logger.warning(f"Metadata file corrupted for dataset {dataset_id}, attempting to regenerate from parquet")
+            return self._regenerate_metadata_from_parquet(dataset_id)
 
     def load_dataset_partial(self, dataset_id: str, columns: Optional[List[str]] = None,
                              filters: Optional[Dict[str, Any]] = None) -> Optional[pd.DataFrame]:
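The corrupted-file branch behaves the same way, except regeneration also overwrites the bad JSON with a fresh, valid copy. A hypothetical exercise of it, under the same assumed constructor and dataset id as above:

# Hypothetical: force the corrupted-metadata fallback for dataset 'demo'.
import json

storage = ParquetDatasetStorage()           # constructor signature assumed
_, metadata_file = storage._get_dataset_files('demo')
metadata_file.write_text('{not json', encoding='utf-8')
storage.metadata_cache.pop('demo', None)    # ensure the corrupted file is read

meta = storage.load_dataset_metadata('demo')
assert meta is not None and meta['stats']['regenerated']
assert json.loads(metadata_file.read_text(encoding='utf-8'))  # sidecar is valid again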
@@ -338,17 +399,29 @@ class ParquetDatasetStorage:
         """List metadata for all datasets"""
         datasets = []
 
-        for dataset_id, metadata in self.metadata_cache.items():
-            datasets.append({
-                "dataset_id": dataset_id,
-                "description": metadata.get("description"),
-                "stats": metadata.get("stats", {}),
-                "created_at": metadata.get("created_at"),
-                "total_records": metadata.get("total_records", 0),
-                "file_size_mb": round(metadata.get("file_size_bytes", 0) / (1024 * 1024), 2),
-                "embedding_dimension": metadata.get("embedding_dimension"),
-                "file_format": metadata.get("file_format")
-            })
+        # Scan for all parquet files and try to load their metadata
+        for parquet_file in self.storage_dir.glob("*.parquet"):
+            try:
+                # Extract dataset_id from filename (remove .parquet extension)
+                dataset_id = parquet_file.stem
+                print(dataset_id)
+
+                # Try to load metadata, this will automatically regenerate if missing
+                metadata = self.load_dataset_metadata(dataset_id)
+                if metadata:
+                    datasets.append({
+                        "dataset_id": dataset_id,
+                        "description": metadata.get("description"),
+                        "stats": metadata.get("stats", {}),
+                        "created_at": metadata.get("created_at"),
+                        "total_records": metadata.get("total_records", 0),
+                        "file_size_mb": round(metadata.get("file_size_bytes", 0) / (1024 * 1024), 2),
+                        "embedding_dimension": metadata.get("embedding_dimension"),
+                        "file_format": metadata.get("file_format")
+                    })
+            except Exception as e:
+                logger.warning(f"Failed to process dataset {parquet_file.stem}: {e}")
+                continue
 
         # Sort by creation time descending
         datasets.sort(key=lambda x: x["created_at"], reverse=True)
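Because list_datasets now walks storage_dir for *.parquet files instead of iterating the in-memory metadata_cache, orphaned datasets (parquet present, JSON sidecar lost) are rediscovered and their metadata rebuilt on the fly via load_dataset_metadata. A hypothetical call, assuming the method returns the sorted list (the trailing context suggests this but does not show the return statement):

# Hypothetical: orphaned parquet files now appear with regenerated metadata.
storage = ParquetDatasetStorage()           # constructor signature assumed
for entry in storage.list_datasets():
    print(entry['dataset_id'], entry['total_records'], f"{entry['file_size_mb']} MB")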