datasets.py (5.82 kB)
from __future__ import annotations

import json
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional

import pandas as pd

DATA_DIR = Path(__file__).resolve().parent.parent / "data"


def _load_metadata_cache() -> Dict[str, dict]:
    """Read *.meta.json descriptors into a lookup keyed by dataset file name."""
    metadata: Dict[str, dict] = {}
    if not DATA_DIR.exists():
        return metadata
    for meta_path in sorted(DATA_DIR.glob("*.meta.json")):
        try:
            payload = json.loads(meta_path.read_text(encoding="utf-8"))
        except Exception as exc:  # pragma: no cover - rare error path
            metadata[meta_path.stem] = {
                "error": f"Failed to parse {meta_path.name}: {exc}",
                "__source": str(meta_path),
            }
            continue
        descriptor = dict(payload)
        descriptor.setdefault("__source", str(meta_path))
        dataset_key = meta_path.name.replace(".meta.json", "")
        metadata[dataset_key] = descriptor
        file_name = payload.get("file_name")
        if isinstance(file_name, str) and file_name:
            metadata.setdefault(file_name, descriptor)
    return metadata


_METADATA_CACHE: Dict[str, dict] = _load_metadata_cache()
_METADATA_KEYS: List[str] = sorted(_METADATA_CACHE)


def metadata_keys() -> List[str]:
    """Return sorted dataset metadata keys."""
    return list(_METADATA_KEYS)


def get_metadata(name: str) -> Optional[dict]:
    """Fetch metadata for a dataset, if available."""
    return _METADATA_CACHE.get(name)


def metadata_cache() -> Dict[str, dict]:
    """Expose a copy of the metadata cache for callers that need full access."""
    return dict(_METADATA_CACHE)


class DatasetLoadError(RuntimeError):
    """Raised when a dataset cannot be materialised into a DataFrame."""


_DELIMITER_MAP = {
    "TAB": "\t",
    "COMMA": ",",
    "PIPE": "|",
    "SEMICOLON": ";",
    "SPACE": " ",
}


@dataclass
class LoadedDataset:
    name: str
    frame: pd.DataFrame
    source_path: Path
    metadata: dict


def _normalise_name(name: str) -> str:
    name = name.strip()
    if name.endswith(".meta.json"):
        name = name[:-10]  # strip the ".meta.json" suffix (10 characters)
    return name


def _resolve_dataset_path(name: str, meta: dict) -> Path:
    candidates: List[Path] = [DATA_DIR / name]
    file_name = meta.get("file_name")
    if isinstance(file_name, str) and file_name:
        candidates.append(DATA_DIR / file_name)
    for path in candidates:
        if path.exists():
            return path
    raise DatasetLoadError(
        f"No data file found for '{name}'. Checked: {', '.join(map(str, candidates))}."
    )


def _detect_git_lfs_pointer(path: Path) -> bool:
    try:
        with path.open("rb") as handle:
            header = handle.read(256)
    except OSError as exc:  # pragma: no cover
        raise DatasetLoadError(f"Unable to read {path}: {exc}") from exc
    return header.startswith(b"version https://git-lfs.github.com/spec/v1")


def _resolve_delimiter(meta: dict) -> tuple[Optional[str], bool]:
    raw = meta.get("delimiter")
    if not raw:
        return None, False
    raw_upper = str(raw).strip().upper()
    if raw_upper == "NEWLINE":
        return None, True
    return _DELIMITER_MAP.get(raw_upper, None), False


def load_dataset(name: str, *, index_col: Optional[str] = None) -> LoadedDataset:
    """Load a dataset into a pandas DataFrame using its metadata."""
    key = _normalise_name(name)
    meta = get_metadata(key)
    if meta is None:
        raise DatasetLoadError(f"No metadata found for '{key}'. Available: {metadata_keys()}")
    path = _resolve_dataset_path(key, meta)
    if _detect_git_lfs_pointer(path):
        raise DatasetLoadError(
            f"Dataset '{key}' points to a Git LFS object that is not available locally ({path.name})."
        )
    delimiter, newline_mode = _resolve_delimiter(meta)
    encoding = meta.get("encoding", "utf-8")
    fields = list(meta.get("fields", {}))

    if newline_mode:
        # One value per line: build a single-column frame directly.
        values = path.read_text(encoding=encoding).splitlines()
        column_name = fields[0] if fields else "value"
        frame = pd.DataFrame({column_name: values})
        return LoadedDataset(name=key, frame=frame, source_path=path, metadata=meta)

    read_kwargs = {"encoding": encoding}
    if delimiter is not None:
        read_kwargs["sep"] = delimiter
    else:
        # No declared delimiter: let pandas sniff it, which requires the python engine.
        read_kwargs["sep"] = None
        read_kwargs["engine"] = "python"

    header_value = meta.get("header")
    if header_value is None:
        header_value = 0 if delimiter is not None else None
    if fields and delimiter is None:
        read_kwargs.setdefault("names", fields)
        read_kwargs.setdefault("header", header_value)
    elif fields:
        read_kwargs.setdefault("header", header_value)

    try:
        frame = pd.read_csv(path, **read_kwargs)
    except Exception as exc:
        raise DatasetLoadError(f"Failed loading '{key}' via pandas: {exc}") from exc

    if index_col and index_col in frame.columns:
        frame = frame.set_index(index_col)
    return LoadedDataset(name=key, frame=frame, source_path=path, metadata=meta)


def load_all_datasets() -> Dict[str, LoadedDataset]:
    """Load every dataset with available metadata; errors surface per dataset."""
    loaded: Dict[str, LoadedDataset] = {}
    for key in metadata_keys():
        try:
            loaded[key] = load_dataset(key)
        except DatasetLoadError as exc:
            base_meta = dict(get_metadata(key) or {})
            base_meta["error"] = str(exc)
            loaded[key] = LoadedDataset(
                name=key,
                frame=pd.DataFrame(),
                source_path=Path(""),
                metadata=base_meta,
            )
    return loaded
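
For context, a minimal usage sketch follows. It assumes the module is importable as datasets and that the sibling data/ directory already holds a raw file plus its *.meta.json descriptor; the cities.csv file name, descriptor contents, and column names are hypothetical examples, not part of this repository. Because _METADATA_CACHE is built at import time, descriptors must exist before the module is imported.

# Hypothetical layout (assumed, not shipped with the module):
#   data/cities.csv            raw comma-separated file
#   data/cities.csv.meta.json  {"file_name": "cities.csv", "delimiter": "COMMA",
#                               "encoding": "utf-8", "fields": ["city", "population"]}
from datasets import DatasetLoadError, load_all_datasets, load_dataset, metadata_keys

print(metadata_keys())  # every key that has a *.meta.json descriptor

try:
    result = load_dataset("cities.csv", index_col="city")
    print(result.source_path)
    print(result.frame.head())
except DatasetLoadError as exc:
    print(f"could not load dataset: {exc}")

# Bulk load: per-dataset failures are captured in metadata["error"] with an empty frame.
for key, item in load_all_datasets().items():
    print(key, "error" if "error" in item.metadata else f"{len(item.frame)} rows")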
