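"""Dataset discovery and loading helpers.

Each dataset under ``data/`` is described by a ``*.meta.json`` descriptor;
``load_dataset`` uses that metadata (delimiter, encoding, field names) to
materialise the file into a pandas DataFrame.
"""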
from __future__ import annotations
import json
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional
import pandas as pd
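# Repository-level data/ directory; assumes this module sits one level below the project root.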
DATA_DIR = Path(__file__).resolve().parent.parent / "data"
def _load_metadata_cache() -> Dict[str, dict]:
"""Read *.meta.json descriptors into a lookup keyed by dataset file name."""
metadata: Dict[str, dict] = {}
if not DATA_DIR.exists():
return metadata
for meta_path in sorted(DATA_DIR.glob("*.meta.json")):
try:
payload = json.loads(meta_path.read_text(encoding="utf-8"))
        except Exception as exc:  # pragma: no cover - rare error path
            # Key the error descriptor by the dataset name (Path.stem would leave a
            # trailing ".meta"), so later lookups still surface the parse failure.
            dataset_key = meta_path.name.replace(".meta.json", "")
            metadata[dataset_key] = {
                "error": f"Failed to parse {meta_path.name}: {exc}",
                "__source": str(meta_path),
            }
            continue
descriptor = dict(payload)
descriptor.setdefault("__source", str(meta_path))
dataset_key = meta_path.name.replace(".meta.json", "")
metadata[dataset_key] = descriptor
file_name = payload.get("file_name")
if isinstance(file_name, str) and file_name:
metadata.setdefault(file_name, descriptor)
return metadata
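# Discovered once at import time; the accessors below treat the cache as read-only.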
_METADATA_CACHE: Dict[str, dict] = _load_metadata_cache()
_METADATA_KEYS: List[str] = sorted(_METADATA_CACHE)
def metadata_keys() -> List[str]:
"""Return sorted dataset metadata keys."""
return list(_METADATA_KEYS)
def get_metadata(name: str) -> Optional[dict]:
"""Fetch metadata for a dataset, if available."""
return _METADATA_CACHE.get(name)
def metadata_cache() -> Dict[str, dict]:
"""Expose a copy of the metadata cache for callers that need full access."""
return dict(_METADATA_CACHE)
class DatasetLoadError(RuntimeError):
"""Raised when a dataset cannot be materialised into a DataFrame."""
_DELIMITER_MAP = {
"TAB": "\t",
"COMMA": ",",
"PIPE": "|",
"SEMICOLON": ";",
"SPACE": " ",
}
@dataclass
class LoadedDataset:
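    """A dataset materialised as a DataFrame, together with its source path and metadata."""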
name: str
frame: pd.DataFrame
source_path: Path
metadata: dict
def _normalise_name(name: str) -> str:
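    """Strip surrounding whitespace and a trailing ``.meta.json`` suffix from ``name``."""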
name = name.strip()
if name.endswith(".meta.json"):
        name = name[: -len(".meta.json")]
return name
def _resolve_dataset_path(name: str, meta: dict) -> Path:
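    """Return the first existing data file for ``name``, also trying the metadata ``file_name``."""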
    candidates: List[Path] = [DATA_DIR / name]
file_name = meta.get("file_name")
if isinstance(file_name, str) and file_name:
candidates.append(DATA_DIR / file_name)
for path in candidates:
if path.exists():
return path
raise DatasetLoadError(f"No data file found for '{name}'. Checked: {', '.join(map(str, candidates))}.")
def _detect_git_lfs_pointer(path: Path) -> bool:
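    """Return True when ``path`` contains a Git LFS pointer instead of the actual data."""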
try:
with path.open("rb") as handle:
header = handle.read(256)
except OSError as exc: # pragma: no cover
raise DatasetLoadError(f"Unable to read {path}: {exc}") from exc
return header.startswith(b"version https://git-lfs.github.com/spec/v1")
def _resolve_delimiter(meta: dict) -> tuple[Optional[str], bool]:
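    """Translate the metadata ``delimiter`` value into a pandas separator and a newline-per-record flag."""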
raw = meta.get("delimiter")
if not raw:
return None, False
raw_upper = str(raw).strip().upper()
if raw_upper == "NEWLINE":
return None, True
return _DELIMITER_MAP.get(raw_upper, None), False
def load_dataset(name: str, *, index_col: Optional[str] = None) -> LoadedDataset:
"""Load a dataset into a pandas DataFrame using its metadata."""
key = _normalise_name(name)
meta = get_metadata(key)
if meta is None:
raise DatasetLoadError(f"No metadata found for '{key}'. Available: {metadata_keys()}")
path = _resolve_dataset_path(key, meta)
if _detect_git_lfs_pointer(path):
raise DatasetLoadError(f"Dataset '{key}' points to a Git LFS object that is not available locally ({path.name}).")
delimiter, newline_mode = _resolve_delimiter(meta)
encoding = meta.get("encoding", "utf-8")
fields = list(meta.get("fields", {}))
if newline_mode:
values = path.read_text(encoding=encoding).splitlines()
column_name = fields[0] if fields else "value"
frame = pd.DataFrame({column_name: values})
return LoadedDataset(name=key, frame=frame, source_path=path, metadata=meta)
read_kwargs = {
"encoding": encoding,
}
if delimiter is not None:
read_kwargs["sep"] = delimiter
else:
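        # sep=None asks pandas to sniff the delimiter, which requires the python engine.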
read_kwargs["sep"] = None
read_kwargs["engine"] = "python"
    header_value = meta.get("header")
    if header_value is None:
        # Assume a header row only when the delimiter is known; sniffed files are read headerless.
        header_value = 0 if delimiter is not None else None
    if fields and delimiter is None:
        # No explicit delimiter: supply the metadata field names and let pandas sniff the separator.
        read_kwargs.setdefault("names", fields)
        read_kwargs.setdefault("header", header_value)
    elif fields:
        read_kwargs.setdefault("header", header_value)
try:
frame = pd.read_csv(path, **read_kwargs)
except Exception as exc:
raise DatasetLoadError(f"Failed loading '{key}' via pandas: {exc}") from exc
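    # Only set the index when the requested column actually exists; otherwise leave the frame as-is.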
if index_col and index_col in frame.columns:
frame = frame.set_index(index_col)
return LoadedDataset(name=key, frame=frame, source_path=path, metadata=meta)
def load_all_datasets() -> Dict[str, LoadedDataset]:
"""Load every dataset with available metadata; errors surface per dataset."""
loaded: Dict[str, LoadedDataset] = {}
for key in metadata_keys():
try:
loaded[key] = load_dataset(key)
except DatasetLoadError as exc:
base_meta = dict(get_metadata(key) or {})
base_meta["error"] = str(exc)
loaded[key] = LoadedDataset(
name=key,
frame=pd.DataFrame(),
source_path=Path(""),
metadata=base_meta,
)
return loaded
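

if __name__ == "__main__":
    # Illustrative smoke test, not part of the library API: enumerate every dataset
    # that has a *.meta.json descriptor and report whether it loads. The names and
    # results depend entirely on which descriptors and data files exist under DATA_DIR.
    for key, dataset in load_all_datasets().items():
        status = dataset.metadata.get("error") or f"{len(dataset.frame)} rows x {len(dataset.frame.columns)} columns"
        print(f"{key}: {status}")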