orchard.core.io

Input/Output & Persistence Utilities.

This module manages the pipeline's interaction with the filesystem, handling configuration serialization (YAML), model checkpoint restoration, and dataset integrity verification via MD5 checksums and schema validation.

AuditSaver

Default AuditSaverProtocol implementation.

Delegates to the module-level save_config_as_yaml, dump_requirements, and dump_git_info functions — no logic duplication.

save_config(data, yaml_path)

Persist configuration to a YAML file.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `data` | `Any` | Configuration object to serialize. | *required* |
| `yaml_path` | `Path` | Destination filesystem path. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `Path` | Confirmed path where the YAML was written. |

Source code in orchard/core/io/serialization.py
def save_config(self, data: Any, yaml_path: Path) -> Path:
    """
    Persist configuration to a YAML file.

    Args:
        data: Configuration object to serialize.
        yaml_path: Destination filesystem path.

    Returns:
        Confirmed path where the YAML was written.
    """
    return save_config_as_yaml(data, yaml_path)

dump_requirements(output_path)

Freeze installed packages for reproducibility.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `output_path` | `Path` | Filesystem path for the requirements snapshot. | *required* |
Source code in orchard/core/io/serialization.py
def dump_requirements(self, output_path: Path) -> None:
    """
    Freeze installed packages for reproducibility.

    Args:
        output_path: Filesystem path for the requirements snapshot.
    """
    dump_requirements(output_path)

dump_git_info(output_path)

Persist git commit hash and working tree status.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `output_path` | `Path` | Filesystem path for the git info snapshot. | *required* |
Source code in orchard/core/io/serialization.py
def dump_git_info(self, output_path: Path) -> None:
    """
    Persist git commit hash and working tree status.

    Args:
        output_path: Filesystem path for the git info snapshot.
    """
    dump_git_info(output_path)

AuditSaverProtocol

Bases: Protocol

Protocol for run-manifest persistence (config YAML + dependency snapshot).

Enables dependency injection of auditability operations in RootOrchestrator, keeping the constructor signature lean while allowing full mocking in tests.

save_config(data, yaml_path)

Persist configuration to a YAML file.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `data` | `Any` | Configuration object to serialize. | *required* |
| `yaml_path` | `Path` | Destination filesystem path. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `Path` | Confirmed path where the YAML was written. |

Source code in orchard/core/io/serialization.py
def save_config(self, data: Any, yaml_path: Path) -> Path:
    """
    Persist configuration to a YAML file.

    Args:
        data: Configuration object to serialize.
        yaml_path: Destination filesystem path.

    Returns:
        Confirmed path where the YAML was written.
    """
    ...  # pragma: no cover

dump_requirements(output_path)

Freeze installed packages for reproducibility.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `output_path` | `Path` | Filesystem path for the requirements snapshot. | *required* |
Source code in orchard/core/io/serialization.py
def dump_requirements(self, output_path: Path) -> None:
    """
    Freeze installed packages for reproducibility.

    Args:
        output_path: Filesystem path for the requirements snapshot.
    """
    ...  # pragma: no cover

dump_git_info(output_path)

Persist git commit hash and working tree status for auditability.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `output_path` | `Path` | Filesystem path for the git info snapshot. | *required* |
Source code in orchard/core/io/serialization.py
def dump_git_info(self, output_path: Path) -> None:
    """
    Persist git commit hash and working tree status for auditability.

    Args:
        output_path: Filesystem path for the git info snapshot.
    """
    ...  # pragma: no cover
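
Because the protocol is structural, any object with matching method signatures satisfies it, which is what makes mocking in tests straightforward. A minimal sketch (the `RecordingSaver` stub below is hypothetical, not part of orchard):

```python
from pathlib import Path
from typing import Any, Protocol


class AuditSaverProtocol(Protocol):
    """Structural type covering the three persistence operations."""

    def save_config(self, data: Any, yaml_path: Path) -> Path: ...
    def dump_requirements(self, output_path: Path) -> None: ...
    def dump_git_info(self, output_path: Path) -> None: ...


class RecordingSaver:
    """Hypothetical test double: records calls instead of touching the filesystem."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    def save_config(self, data: Any, yaml_path: Path) -> Path:
        self.calls.append("save_config")
        return yaml_path

    def dump_requirements(self, output_path: Path) -> None:
        self.calls.append("dump_requirements")

    def dump_git_info(self, output_path: Path) -> None:
        self.calls.append("dump_git_info")


# Structural typing: RecordingSaver satisfies the protocol without inheriting from it.
saver: AuditSaverProtocol = RecordingSaver()
saver.save_config({}, Path("config.yaml"))
saver.dump_requirements(Path("requirements.txt"))
```

An orchestrator that accepts an `AuditSaverProtocol` can then be exercised in tests without any disk I/O.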

load_model_weights(model, path, device)

Restores model state from a checkpoint using secure weight-only loading.

Loads PyTorch state_dict from disk with security hardening (weights_only=True) to prevent arbitrary code execution. Automatically maps tensors to target device.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `model` | `Module` | The model instance to populate with loaded weights. | *required* |
| `path` | `Path` | Filesystem path to the checkpoint file (`.pth`). | *required* |
| `device` | `device` | Target device for mapping the loaded tensors. | *required* |

Raises:

| Type | Description |
| --- | --- |
| `OrchardExportError` | If the checkpoint file does not exist at `path`. |

Example

    model = get_model(device, dataset_cfg=cfg.dataset, arch_cfg=cfg.architecture)
    checkpoint_path = Path("outputs/run_123/checkpoints/best_model.pth")
    load_model_weights(model, checkpoint_path, device)

Source code in orchard/core/io/checkpoints.py
def load_model_weights(model: torch.nn.Module, path: Path, device: torch.device) -> None:
    """
    Restores model state from a checkpoint using secure weight-only loading.

    Loads PyTorch state_dict from disk with security hardening (weights_only=True)
    to prevent arbitrary code execution. Automatically maps tensors to target device.

    Args:
        model: The model instance to populate with loaded weights
        path: Filesystem path to the checkpoint file (.pth)
        device: Target device for mapping the loaded tensors

    Raises:
        OrchardExportError: If the checkpoint file does not exist at path

    Example:
        >>> model = get_model(device, dataset_cfg=cfg.dataset, arch_cfg=cfg.architecture)
        >>> checkpoint_path = Path("outputs/run_123/checkpoints/best_model.pth")
        >>> load_model_weights(model, checkpoint_path, device)
    """
    if not path.exists():
        raise OrchardExportError(f"Model checkpoint not found at: {path}")

    # weights_only=True is used for security (avoids arbitrary code execution)
    state_dict = torch.load(path, map_location=device, weights_only=True)

    # Validate architecture compatibility before loading
    model_keys = set(model.state_dict().keys())
    checkpoint_keys = set(state_dict.keys())
    if model_keys != checkpoint_keys:
        missing = model_keys - checkpoint_keys
        unexpected = checkpoint_keys - model_keys
        parts = []
        if missing:
            parts.append(f"missing keys: {sorted(missing)[:5]}")
        if unexpected:
            parts.append(f"unexpected keys: {sorted(unexpected)[:5]}")
        raise OrchardExportError(
            f"Checkpoint architecture mismatch ({', '.join(parts)}). "
            "Ensure the config matches the architecture used during training."
        )

    model.load_state_dict(state_dict)
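
The architecture-compatibility check reduces to comparing two key sets. A torch-free sketch of the same logic, using illustrative layer names rather than a real model:

```python
# Simulated state_dict key sets; in practice these come from
# model.state_dict().keys() and the loaded checkpoint's keys.
model_keys = {"conv1.weight", "conv1.bias", "fc.weight", "fc.bias"}
checkpoint_keys = {"conv1.weight", "conv1.bias", "fc.weight", "head.weight"}

missing = model_keys - checkpoint_keys      # expected by the model, absent from disk
unexpected = checkpoint_keys - model_keys   # present on disk, unknown to the model

parts = []
if missing:
    parts.append(f"missing keys: {sorted(missing)[:5]}")
if unexpected:
    parts.append(f"unexpected keys: {sorted(unexpected)[:5]}")

print(", ".join(parts))
```

Truncating each list to five entries keeps the error message readable even when entire submodules are renamed.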

md5_checksum(path, chunk_size=_MD5_CHUNK_SIZE)

Calculates the MD5 checksum of a file using buffered reading.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `path` | `Path` | Path to the file to verify. | *required* |
| `chunk_size` | `int` | Read buffer size in bytes. | `_MD5_CHUNK_SIZE` |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `str` | `str` | The calculated hexadecimal MD5 hash. |

Source code in orchard/core/io/data_io.py
def md5_checksum(path: Path, chunk_size: int = _MD5_CHUNK_SIZE) -> str:
    """
    Calculates the MD5 checksum of a file using buffered reading.

    Args:
        path (Path): Path to the file to verify.
        chunk_size (int): Read buffer size in bytes.

    Returns:
        str: The calculated hexadecimal MD5 hash.
    """
    hash_md5 = hashlib.md5(usedforsecurity=False)  # pragma: no mutate
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):  # pragma: no mutate
            hash_md5.update(chunk)
    return hash_md5.hexdigest()
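
The buffered-hashing pattern is easy to reproduce standalone; the sketch below mirrors the function (`md5_of` is a stand-in name, not the orchard API):

```python
import hashlib
import tempfile
from pathlib import Path


def md5_of(path: Path, chunk_size: int = 1 << 16) -> str:
    """Hash a file in fixed-size chunks so large files never load fully into memory."""
    h = hashlib.md5(usedforsecurity=False)  # integrity check, not cryptography
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            h.update(chunk)
    return h.hexdigest()


with tempfile.TemporaryDirectory() as d:
    sample = Path(d) / "dataset.npz"
    sample.write_bytes(b"\x00" * 100_000)  # larger than one 64 KiB chunk
    digest = md5_of(sample)
    print(digest)
```

The `iter(callable, sentinel)` idiom loops until `read()` returns the empty-bytes sentinel, so the digest is identical to hashing the whole file at once while memory stays bounded by `chunk_size`.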

validate_npz_keys(data)

Validates that the loaded NPZ dataset contains all required dataset keys.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `data` | `NpzFile` | The loaded NPZ file object. | *required* |

Raises:

| Type | Description |
| --- | --- |
| `OrchardDatasetError` | If any required key (images/labels) is missing. |

Source code in orchard/core/io/data_io.py
def validate_npz_keys(data: np.lib.npyio.NpzFile) -> None:
    """
    Validates that the loaded NPZ dataset contains all required dataset keys.

    Args:
        data (np.lib.npyio.NpzFile): The loaded NPZ file object.

    Raises:
        OrchardDatasetError: If any required key (images/labels) is missing.
    """
    missing = _REQUIRED_NPZ_KEYS - set(data.files)
    if missing:
        found = list(data.files)
        raise OrchardDatasetError(
            f"NPZ archive is corrupted or invalid. Missing keys: {missing} | Found keys: {found}"
        )
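
The validation itself is a set difference over the archive's key list. A numpy-free sketch of the same check (`check_keys` and `ValueError` stand in for the real function and `OrchardDatasetError`):

```python
REQUIRED_KEYS = {"images", "labels"}  # mirrors the role of _REQUIRED_NPZ_KEYS


def check_keys(archive_keys: list[str]) -> None:
    """Raise if any required key is absent; extra keys are tolerated."""
    missing = REQUIRED_KEYS - set(archive_keys)
    if missing:
        raise ValueError(
            f"NPZ archive is corrupted or invalid. "
            f"Missing keys: {missing} | Found keys: {archive_keys}"
        )


check_keys(["images", "labels", "metadata"])  # passes: extras are ignored

try:
    check_keys(["images"])
except ValueError as e:
    print(e)  # reports that "labels" is missing
```

Reporting both the missing and the found keys makes it immediately clear whether the archive is the wrong file or merely an older schema.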

dump_requirements(output_path)

Freeze installed packages to a requirements file for reproducibility.

Invokes pip freeze --local to capture the exact dependency versions of the current environment. The output is prefixed with a Python version header for auditability.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `output_path` | `Path` | Filesystem path where the requirements file is written. | *required* |
Source code in orchard/core/io/serialization.py
def dump_requirements(output_path: Path) -> None:
    """
    Freeze installed packages to a requirements file for reproducibility.

    Invokes ``pip freeze --local`` to capture the exact dependency versions
    of the current environment. The output is prefixed with a Python version
    header for auditability.

    Args:
        output_path: Filesystem path where the requirements file is written.
    """
    import subprocess  # nosec B404
    import sys

    logger = logging.getLogger(LOGGER_NAME)

    try:
        result = subprocess.run(  # nosec B603
            [sys.executable, "-m", "pip", "freeze", "--local"],
            capture_output=True,
            text=True,
            timeout=30,
        )
        header = f"# Python {sys.version.split()[0]}\n"
        output_path.write_text(header + result.stdout, encoding="utf-8")
    except (subprocess.TimeoutExpired, OSError) as e:
        logger.error("Failed to dump requirements: %s", e)
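
The freeze-and-prefix behavior can be exercised directly; this sketch follows the same `pip freeze --local` invocation, writing to a temporary file rather than a run directory:

```python
import subprocess
import sys
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as d:
    out = Path(d) / "requirements.txt"
    result = subprocess.run(
        [sys.executable, "-m", "pip", "freeze", "--local"],
        capture_output=True,
        text=True,
        timeout=30,
    )
    # Prefix the snapshot with the interpreter version for auditability.
    header = f"# Python {sys.version.split()[0]}\n"
    out.write_text(header + result.stdout, encoding="utf-8")
    snapshot = out.read_text(encoding="utf-8")
    print(snapshot.splitlines()[0])
```

Using `sys.executable -m pip` rather than a bare `pip` guarantees the snapshot describes the interpreter actually running the pipeline, not whichever `pip` happens to be first on `PATH`.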

load_config_from_yaml(yaml_path)

Loads a raw configuration dictionary from a YAML file.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `yaml_path` | `Path` | Path to the source YAML file. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `dict[str, Any]` | The loaded configuration manifest. |

Raises:

| Type | Description |
| --- | --- |
| `FileNotFoundError` | If the specified path does not exist. |

Source code in orchard/core/io/serialization.py
def load_config_from_yaml(yaml_path: Path) -> dict[str, Any]:
    """
    Loads a raw configuration dictionary from a YAML file.

    Args:
        yaml_path (Path): Path to the source YAML file.

    Returns:
        dict[str, Any]: The loaded configuration manifest.

    Raises:
        FileNotFoundError: If the specified path does not exist.
    """
    if not yaml_path.exists():
        raise FileNotFoundError(f"YAML configuration file not found at: {yaml_path}")

    # Equivalent mutants: "r" is Python's default open mode; cast() has no runtime effect.
    with open(yaml_path, "r", encoding="utf-8") as f:  # pragma: no mutate
        return cast(dict[str, Any], yaml.safe_load(f))  # pragma: no mutate

save_config_as_yaml(data, yaml_path)

Serializes and persists configuration data to a YAML file.

This function coordinates the extraction of data from potentially complex objects (supporting Pydantic models, custom portable manifests, or raw dicts), applies recursive sanitization, and performs an atomic write to disk.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `data` | `Any` | The configuration object to save. Supports objects with `dump_portable()` or `model_dump()` methods, or standard dictionaries. | *required* |
| `yaml_path` | `Path` | The destination filesystem path. | *required* |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `Path` | `Path` | The confirmed path where the YAML was successfully written. |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If the data structure cannot be serialized. |
| `OSError` | If a filesystem-level error occurs (permissions, disk full). |

Source code in orchard/core/io/serialization.py
def save_config_as_yaml(data: Any, yaml_path: Path) -> Path:
    """
    Serializes and persists configuration data to a YAML file.

    This function coordinates the extraction of data from potentially complex
    objects (supporting Pydantic models, custom portable manifests, or raw dicts),
    applies recursive sanitization, and performs an atomic write to disk.

    Args:
        data (Any): The configuration object to save. Supports objects with
            'dump_portable()' or 'model_dump()' methods, or standard dictionaries.
        yaml_path (Path): The destination filesystem path.

    Returns:
        Path: The confirmed path where the YAML was successfully written.

    Raises:
        ValueError: If the data structure cannot be serialized.
        OSError: If a filesystem-level error occurs (permissions, disk full).
    """
    logger = logging.getLogger(LOGGER_NAME)

    # 1. Extraction & Sanitization Phase
    try:
        # Priority 1: Custom portability protocol
        if hasattr(data, "dump_portable"):
            raw_dict = data.dump_portable()

        # Priority 2: Pydantic model protocol
        elif hasattr(data, "model_dump"):
            try:
                raw_dict = data.model_dump(mode="json")
            except (TypeError, ValueError):  # pragma: no cover
                # Fallback for older Pydantic V2 versions or complex types
                raw_dict = data.model_dump()  # pragma: no mutate

        # Priority 3: Raw dictionary or other types
        else:
            raw_dict = data

        final_data = _sanitize_for_yaml(raw_dict)

    except Exception as e:
        logger.error("Serialization failed: object structure is incompatible. Error: %s", e)
        raise ValueError(f"Could not serialize configuration object: {e}") from e

    # 2. Persistence Phase (Atomic Write)
    try:
        _persist_yaml_atomic(final_data, yaml_path)
        logger.debug("Configuration frozen at → %s", yaml_path.name)
        return yaml_path

    except OSError as e:
        logger.error("IO Error: Could not write YAML to %s. Error: %s", yaml_path, e)
        raise
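
`_persist_yaml_atomic` is internal and not shown on this page; the standard pattern behind an atomic write, which it presumably follows, is write-to-sibling-temp-then-rename. A sketch under that assumption (`atomic_write_text` is a hypothetical helper, and plain text stands in for the YAML serialization):

```python
import os
import tempfile
from pathlib import Path


def atomic_write_text(text: str, dest: Path) -> Path:
    """Write via a temp file in the same directory, then rename over the target.

    Readers never observe a partially written file: os.replace is atomic on
    POSIX when source and destination live on the same filesystem.
    """
    fd, tmp_name = tempfile.mkstemp(dir=dest.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            f.write(text)
        os.replace(tmp_name, dest)
    except BaseException:
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)  # never leave temp debris behind
        raise
    return dest


with tempfile.TemporaryDirectory() as d:
    target = Path(d) / "config.yaml"
    written = atomic_write_text("seed: 42\n", target)
    content = written.read_text(encoding="utf-8")
    print(content)
```

Creating the temp file in `dest.parent` rather than the system temp directory is what keeps the final rename on one filesystem, and hence atomic.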