diff --git a/bun.lock b/bun.lock
index df0a796..f72d880 100644
--- a/bun.lock
+++ b/bun.lock
@@ -1,5 +1,6 @@
 {
   "lockfileVersion": 1,
+  "configVersion": 0,
   "workspaces": {
     "": {
       "name": "cvsa",
diff --git a/ml_new/data/dataset_storage_parquet.py b/ml_new/data/dataset_storage_parquet.py
index 87e9402..6a7d449 100644
--- a/ml_new/data/dataset_storage_parquet.py
+++ b/ml_new/data/dataset_storage_parquet.py
@@ -155,6 +155,63 @@ class ParquetDatasetStorage:
             logger.error(f"Failed to save dataset {dataset_id}: {e}")
             return False
 
+    def _regenerate_metadata_from_parquet(self, dataset_id: str) -> Optional[Dict[str, Any]]:
+        """
+        Regenerate metadata from parquet file when metadata JSON is missing or corrupted
+
+        Args:
+            dataset_id: Dataset ID
+
+        Returns:
+            Dict: Regenerated metadata, or None if failed
+        """
+        try:
+            data_file, metadata_file = self._get_dataset_files(dataset_id)
+            if not data_file.exists():
+                logger.warning(f"Parquet file not found for dataset {dataset_id}, cannot regenerate metadata")
+                return None
+
+            # Read parquet file to extract basic information
+            table = pq.read_table(data_file, columns=['aid', 'label', 'embedding', 'metadata_json', 'user_labels_json'])
+
+            # Extract embedding dimension from first row
+            first_embedding = table.column('embedding')[0]
+            embedding_dim = len(first_embedding) if first_embedding else 0
+
+            # Get total records count
+            total_records = len(table)
+
+            # Generate regenerated metadata
+            regenerated_metadata = {
+                'dataset_id': dataset_id,
+                'description': f'Auto-regenerated metadata for dataset {dataset_id}',
+                'stats': {
+                    'regenerated': True,
+                    'regeneration_reason': 'missing_or_corrupted_metadata_file'
+                },
+                'created_at': datetime.fromtimestamp(data_file.stat().st_mtime).isoformat(),
+                'file_format': 'parquet_v1',
+                'embedding_dimension': embedding_dim,
+                'total_records': total_records,
+                'columns': ['aid', 'label', 'inconsistent', 'text_checksum', 'embedding', 'metadata_json', 'user_labels_json'],
+                'file_size_bytes': data_file.stat().st_size,
+                'compression': 'zstd'
+            }
+
+            # Save regenerated metadata to file
+            with open(metadata_file, 'w', encoding='utf-8') as f:
+                json.dump(regenerated_metadata, f, ensure_ascii=False, indent=2)
+
+            # Update cache
+            self.metadata_cache[dataset_id] = regenerated_metadata
+
+            logger.info(f"Successfully regenerated metadata for dataset {dataset_id} from parquet file")
+            return regenerated_metadata
+
+        except Exception as e:
+            logger.error(f"Failed to regenerate metadata for dataset {dataset_id}: {e}")
+            return None
+
     def load_dataset_metadata(self, dataset_id: str) -> Optional[Dict[str, Any]]:
         """
         Quickly load dataset metadata (without loading the entire file)
@@ -172,7 +229,9 @@ class ParquetDatasetStorage:
         # Load from file
         _, metadata_file = self._get_dataset_files(dataset_id)
         if not metadata_file.exists():
-            return None
+            # Try to regenerate metadata from parquet file
+            logger.warning(f"Metadata file not found for dataset {dataset_id}, attempting to regenerate from parquet")
+            return self._regenerate_metadata_from_parquet(dataset_id)
 
         try:
             with open(metadata_file, 'r', encoding='utf-8') as f:
@@ -184,7 +243,9 @@ class ParquetDatasetStorage:
 
         except Exception as e:
             logger.error(f"Failed to load metadata for {dataset_id}: {e}")
-            return None
+            # Try to regenerate metadata from parquet file
+            logger.warning(f"Metadata file corrupted for dataset {dataset_id}, attempting to regenerate from parquet")
+            return self._regenerate_metadata_from_parquet(dataset_id)
 
     def load_dataset_partial(self, dataset_id: str, columns: Optional[List[str]] = None,
                              filters: Optional[Dict[str, Any]] = None) -> Optional[pd.DataFrame]:
@@ -338,17 +399,29 @@ class ParquetDatasetStorage:
         """List metadata for all datasets"""
         datasets = []
 
-        for dataset_id, metadata in self.metadata_cache.items():
-            datasets.append({
-                "dataset_id": dataset_id,
-                "description": metadata.get("description"),
-                "stats": metadata.get("stats", {}),
-                "created_at": metadata.get("created_at"),
-                "total_records": metadata.get("total_records", 0),
-                "file_size_mb": round(metadata.get("file_size_bytes", 0) / (1024 * 1024), 2),
-                "embedding_dimension": metadata.get("embedding_dimension"),
-                "file_format": metadata.get("file_format")
-            })
+        # Scan for all parquet files and try to load their metadata
+        for parquet_file in self.storage_dir.glob("*.parquet"):
+            try:
+                # Extract dataset_id from filename (remove .parquet extension)
+                dataset_id = parquet_file.stem
+                logger.debug(f"Found dataset file: {parquet_file.name}")
+
+                # Try to load metadata; this will automatically regenerate it if missing
+                metadata = self.load_dataset_metadata(dataset_id)
+                if metadata:
+                    datasets.append({
+                        "dataset_id": dataset_id,
+                        "description": metadata.get("description"),
+                        "stats": metadata.get("stats", {}),
+                        "created_at": metadata.get("created_at"),
+                        "total_records": metadata.get("total_records", 0),
+                        "file_size_mb": round(metadata.get("file_size_bytes", 0) / (1024 * 1024), 2),
+                        "embedding_dimension": metadata.get("embedding_dimension"),
+                        "file_format": metadata.get("file_format")
+                    })
+            except Exception as e:
+                logger.warning(f"Failed to process dataset {parquet_file.stem}: {e}")
+                continue
 
         # Sort by creation time descending
         datasets.sort(key=lambda x: x["created_at"], reverse=True)
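
Reviewer note: the sketch below shows the recovery idea this patch adds, in isolation: deriving a metadata sidecar from nothing but the parquet file itself. It is a minimal, hypothetical example and not part of the patch; the input path, the sidecar naming scheme ("<stem>.metadata.json"), and the reliance on an "embedding" list column are assumptions mirroring the code above, and the real naming is decided by _get_dataset_files.

# Hedged sketch: rebuild a minimal metadata dict from a bare parquet file,
# loosely mirroring _regenerate_metadata_from_parquet above. Paths and the
# sidecar filename are hypothetical.
import json
from datetime import datetime
from pathlib import Path

import pyarrow.parquet as pq


def rebuild_metadata(parquet_path: str) -> dict:
    path = Path(parquet_path)
    pf = pq.ParquetFile(path)              # reads only the footer, not the data
    total_records = pf.metadata.num_rows

    # Infer the embedding dimension from the first row group only.
    embedding_dim = 0
    if total_records > 0:
        first = pf.read_row_group(0, columns=["embedding"]).column("embedding")[0]
        embedding_dim = len(first.as_py() or [])

    meta = {
        "dataset_id": path.stem,
        "total_records": total_records,
        "embedding_dimension": embedding_dim,
        "file_size_bytes": path.stat().st_size,
        "created_at": datetime.fromtimestamp(path.stat().st_mtime).isoformat(),
        "stats": {"regenerated": True},
    }
    # Hypothetical sidecar name; the class derives it via _get_dataset_files.
    path.with_name(f"{path.stem}.metadata.json").write_text(
        json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8"
    )
    return meta


if __name__ == "__main__":
    print(rebuild_metadata("datasets/example.parquet"))  # hypothetical path

The patch itself reads a column subset of the full table, which is simpler; probing only the footer plus the first row group, as in this sketch, is one way to keep regeneration cheap for large files.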