orchard

Orchard ML: Type-Safe Deep Learning for Reproducible Research.

Top-level convenience API re-exporting the most commonly used components from subpackages, so users and the orchard CLI can write:

from orchard import Config, RootOrchestrator, get_model

LogStyle

Unified logging style constants for consistent visual hierarchy.

Provides separators, symbols, indentation, and ANSI color codes used by all logging modules. Placed here (in paths.constants) rather than in logger.styles so that low-level packages (environment, config) can reference the constants without triggering circular imports.
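
A minimal sketch of the intended usage, with a stand-in class so the snippet runs anywhere: the constant names (INDENT, ARROW, SUCCESS, DOUBLE) match the source code further down this page, but the concrete values here are illustrative assumptions, not the library's.

import logging

class LogStyle:  # stand-in: the real constants live in paths.constants
    INDENT = "  "
    ARROW = "->"
    SUCCESS = "[OK]"
    DOUBLE = "=" * 60

logging.basicConfig(level=logging.INFO, format="%(message)s")
log = logging.getLogger("orchard.demo")

# The "%s%s ..." prefix pattern mirrors the calls in log_pipeline_summary below.
log.info(LogStyle.DOUBLE)
log.info("%s%s Phase complete : %s", LogStyle.INDENT, LogStyle.SUCCESS, "determinism")
log.info("%s%s Run directory  : %s", LogStyle.INDENT, LogStyle.ARROW, "runs/demo")
log.info(LogStyle.DOUBLE)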

RootOrchestrator(cfg, infra_manager=None, reporter=None, time_tracker=None, audit_saver=None, log_initializer=None, seed_setter=None, thread_applier=None, system_configurator=None, static_dir_setup=None, device_resolver=None, rank=None, local_rank=None)

Central coordinator for ML experiment lifecycle management.

Orchestrates the complete initialization sequence from configuration validation through resource provisioning to execution readiness. Implements a 7-phase initialization protocol (phases 1-6 eager, phase 7 deferred) with dependency injection for maximum testability.

The orchestrator follows the Single Responsibility Principle by delegating specialized tasks to injected dependencies while maintaining overall coordination. Uses the Context Manager pattern to guarantee resource cleanup even during failures.

Initialization Phases:

  1. Determinism: Global RNG seeding (Python, NumPy, PyTorch)
  2. Runtime Configuration: CPU thread affinity, system libraries
  3. Filesystem Provisioning: Dynamic workspace creation via RunPaths
  4. Logging Initialization: File-based persistent logging setup
  5. Config Persistence: YAML manifest export for auditability
  6. Infrastructure Guarding: OS-level resource locks (prevents race conditions)
  7. Environment Reporting: Comprehensive telemetry logging

Dependency Injection:

All external dependencies are injectable with sensible defaults:

  • infra_manager: OS resource management (locks, cleanup)
  • reporter: Environment telemetry engine
  • log_initializer: Logging setup strategy
  • seed_setter: RNG seeding function
  • thread_applier: CPU thread configuration
  • system_configurator: System library setup (matplotlib, etc.)
  • static_dir_setup: Static directory creation
  • audit_saver: Config YAML + requirements snapshot persistence
  • device_resolver: Hardware device detection
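
Because every collaborator is injectable, tests can swap side-effecting phases for recorders. A sketch, assuming a valid Config instance named cfg; the lambdas are hypothetical test doubles, not library helpers:

seed_calls = []
orch = RootOrchestrator(
    cfg,
    seed_setter=lambda *a, **kw: seed_calls.append((a, kw)),  # record instead of seeding
    thread_applier=lambda *a, **kw: 0,  # no-op: skip CPU thread pinning in tests
    rank=0,
    local_rank=0,
)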

Attributes:

  • cfg (Config): Validated global configuration (Single Source of Truth)
  • rank (int): Global rank of this process (0 in single-process mode)
  • local_rank (int): Node-local rank for GPU assignment (0 in single-process mode)
  • is_main_process (bool): True for rank 0, False for non-main ranks
  • infra (InfraManagerProtocol): Infrastructure resource manager
  • reporter (ReporterProtocol): Environment telemetry engine
  • time_tracker (TimeTrackerProtocol): Pipeline duration tracker
  • paths (RunPaths | None): Session-specific directory structure (None on non-main ranks)
  • run_logger (Logger | None): Active logger instance (None on non-main ranks)
  • repro_mode (bool): Strict determinism flag
  • warn_only_mode (bool): Warn-only mode for strict determinism
  • num_workers (int): DataLoader worker processes

Example

>>> cfg = Config.from_recipe(Path("recipes/config_mini_cnn.yaml"))
>>> with RootOrchestrator(cfg) as orch:
...     device = orch.get_device()
...     logger = orch.run_logger
...     paths = orch.paths
...     # Execute training pipeline with guaranteed cleanup

Notes:

  • Thread-safe: Single-instance locking via InfrastructureManager
  • Idempotent: initialize_core_services() is safe to call multiple times (subsequent calls return cached RunPaths without re-executing phases)
  • Auditable: All configuration saved to YAML in workspace
  • Deterministic: Reproducible experiments via strict seeding
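
The idempotency note above can be checked directly. A sketch, assuming a valid cfg; on rank 0 both calls return the same RunPaths object (on non-main ranks both return None):

with RootOrchestrator(cfg) as orch:
    first = orch.initialize_core_services()   # already executed by __enter__
    second = orch.initialize_core_services()  # cached: no phases re-run
    assert first is second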

Initializes orchestrator with dependency injection.

Parameters:

  • cfg (Config, required): Validated global configuration (SSOT)
  • infra_manager (InfraManagerProtocol | None): Infrastructure management handler (default: InfrastructureManager())
  • reporter (ReporterProtocol | None): Environment reporting engine (default: Reporter())
  • time_tracker (TimeTrackerProtocol | None): Pipeline duration tracker (default: TimeTracker())
  • audit_saver (AuditSaverProtocol | None): Run-manifest persistence (config YAML + dependency snapshot; default: AuditSaver())
  • log_initializer (Callable[..., Any] | None): Logging setup function (default: Logger.setup)
  • seed_setter (Callable[..., Any] | None): RNG seeding function (default: set_seed)
  • thread_applier (Callable[..., Any] | None): CPU thread configuration (default: apply_cpu_threads)
  • system_configurator (Callable[..., Any] | None): System library setup (default: configure_system_libraries)
  • static_dir_setup (Callable[..., Any] | None): Static directory creation (default: setup_static_directories)
  • device_resolver (Callable[..., Any] | None): Device resolution (default: to_device_obj)
  • rank (int | None): Global rank of this process (default: auto-detected from the RANK env var). Rank 0 executes all phases; non-zero ranks skip filesystem, logging, config persistence, infrastructure locking, and reporting.
  • local_rank (int | None): Node-local rank for GPU assignment (default: auto-detected from the LOCAL_RANK env var). Used by device_resolver to select the correct GPU in multi-GPU distributed setups.

Source code in orchard/core/orchestrator.py
def __init__(
    self,
    cfg: "Config",
    infra_manager: InfraManagerProtocol | None = None,
    reporter: ReporterProtocol | None = None,
    time_tracker: TimeTrackerProtocol | None = None,
    audit_saver: AuditSaverProtocol | None = None,
    log_initializer: Callable[..., Any] | None = None,
    seed_setter: Callable[..., Any] | None = None,
    thread_applier: Callable[..., Any] | None = None,
    system_configurator: Callable[..., Any] | None = None,
    static_dir_setup: Callable[..., Any] | None = None,
    device_resolver: Callable[..., Any] | None = None,
    rank: int | None = None,
    local_rank: int | None = None,
) -> None:
    """
    Initializes orchestrator with dependency injection.

    Args:
        cfg: Validated global configuration (SSOT)
        infra_manager: Infrastructure management handler (default: InfrastructureManager())
        reporter: Environment reporting engine (default: Reporter())
        time_tracker: Pipeline duration tracker (default: TimeTracker())
        audit_saver: Run-manifest persistence — config YAML + dependency
            snapshot (default: AuditSaver())
        log_initializer: Logging setup function (default: Logger.setup)
        seed_setter: RNG seeding function (default: set_seed)
        thread_applier: CPU thread configuration (default: apply_cpu_threads)
        system_configurator: System library setup (default: configure_system_libraries)
        static_dir_setup: Static directory creation (default: setup_static_directories)
        device_resolver: Device resolution (default: to_device_obj)
        rank: Global rank of this process (default: auto-detected from RANK env var).
            Rank 0 executes all phases; rank N skips filesystem, logging,
            config persistence, infrastructure locking, and reporting.
        local_rank: Node-local rank for GPU assignment (default: auto-detected
            from LOCAL_RANK env var). Used by device_resolver to select the
            correct GPU in multi-GPU distributed setups.
    """
    self.cfg = cfg

    # Dependency injection: _resolve for objects, _resolve_callable for functions
    self.rank = _resolve(rank, get_rank)
    self.local_rank = _resolve(local_rank, get_local_rank)
    self.is_main_process = self.rank == 0
    self.infra = _resolve(infra_manager, InfrastructureManager)
    self.reporter = _resolve(reporter, Reporter)
    self.time_tracker = _resolve(time_tracker, TimeTracker)
    self._audit_saver = _resolve(audit_saver, AuditSaver)
    self._log_initializer = _resolve_callable(log_initializer, Logger.setup)
    self._seed_setter = _resolve_callable(seed_setter, set_seed)
    self._thread_applier = _resolve_callable(thread_applier, apply_cpu_threads)
    self._system_configurator = _resolve_callable(
        system_configurator, configure_system_libraries
    )
    self._static_dir_setup = _resolve_callable(static_dir_setup, setup_static_directories)
    self._device_resolver = _resolve_callable(device_resolver, to_device_obj)

    # Lazy initialization
    self._initialized: bool = False
    self._cleaned_up: bool = False
    self._infra_lock_acquired: bool = False
    self._applied_threads: int = 0
    self.paths: RunPaths | None = None
    self.run_logger: logging.Logger | None = None
    self._device_cache: torch.device | None = None

    # Policy extraction from SSOT
    self.repro_mode = self.cfg.hardware.use_deterministic_algorithms
    self.warn_only_mode = self.cfg.hardware.deterministic_warn_only
    self.num_workers = self.cfg.hardware.effective_num_workers

__enter__()

Context Manager entry — triggers the initialization sequence.

Starts the pipeline timer and delegates to initialize_core_services() for phases 1-6 (seeding, runtime config, filesystem, logging, config persistence, infrastructure locking, and device resolution). Phase 7 (environment reporting) is deferred to log_environment_report().

If any phase raises (including KeyboardInterrupt / SystemExit), cleanup() is called before re-raising to ensure partial resources (locks, file handles) are released even on failure.

Returns:

  RootOrchestrator: Fully initialized RootOrchestrator ready for pipeline execution.

Raises:

  BaseException: Re-raises any initialization error after cleanup.

Source code in orchard/core/orchestrator.py
def __enter__(self) -> "RootOrchestrator":
    """
    Context Manager entry — triggers the initialization sequence.

    Starts the pipeline timer and delegates to initialize_core_services()
    for phases 1-6 (seeding, runtime config, filesystem, logging,
    config persistence, infrastructure locking, and device resolution).
    Phase 7 (environment reporting) is deferred to log_environment_report().

    If any phase raises (including KeyboardInterrupt / SystemExit),
    cleanup() is called before re-raising to ensure partial resources
    (locks, file handles) are released even on failure.

    Returns:
        Fully initialized RootOrchestrator ready for pipeline execution.

    Raises:
        BaseException: Re-raises any initialization error after cleanup.
    """
    try:
        self.time_tracker.start()
        self.initialize_core_services()
        return self
    except BaseException:
        self.cleanup()
        raise

__exit__(exc_type, exc_val, exc_tb)

Context Manager exit — stops timer and guarantees resource teardown.

Invoked automatically when leaving the with block, whether the pipeline completed normally or raised an exception. Stops the timer, then delegates to cleanup() for infrastructure lock release and logging handler closure.

Error reporting is intentionally left to the caller (CLI layer), which has the user-facing context to log appropriate messages.

Returns False so that any exception propagates to the caller unchanged.

Parameters:

  • exc_type (type[BaseException] | None, required): Exception class if the block raised, else None.
  • exc_val (BaseException | None, required): Exception instance if the block raised, else None.
  • exc_tb (TracebackType | None, required): Traceback object if the block raised, else None.

Returns:

  Literal[False]: Always False — exceptions are never suppressed.

Source code in orchard/core/orchestrator.py
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> Literal[False]:
    """
    Context Manager exit — stops timer and guarantees resource teardown.

    Invoked automatically when leaving the ``with`` block, whether the
    pipeline completed normally or raised an exception. Stops the timer,
    then delegates to cleanup() for infrastructure lock release and
    logging handler closure.

    Error reporting is intentionally left to the caller (CLI layer),
    which has the user-facing context to log appropriate messages.

    Returns False so that any exception propagates to the caller unchanged.

    Args:
        exc_type: Exception class if the block raised, else None.
        exc_val: Exception instance if the block raised, else None.
        exc_tb: Traceback object if the block raised, else None.

    Returns:
        Always False — exceptions are never suppressed.
    """
    # Stop timer (duration already shown in pipeline summary)
    self.time_tracker.stop()

    self.cleanup()
    return False

initialize_core_services()

Executes linear sequence of environment initialization phases.

Synchronizes global state through phases 1-6, progressing from deterministic seeding to device resolution. Phase 7 (environment reporting) is deferred to log_environment_report().

In distributed mode (torchrun / DDP), only the main process (rank 0) executes phases 3-6 (filesystem, logging, config persistence, infra locking). All ranks execute phases 1-2 (seeding, threads) for identical RNG state and thread affinity, plus device resolution for DDP readiness (each rank binds to cuda:{local_rank}).

Idempotent: guarded by _initialized flag. If already initialized, returns existing RunPaths without re-executing any phase. This prevents orphaned directories (Phase 3 creates unique paths per call) and resource leaks (Phase 6 acquires filesystem locks).

Returns:

  RunPaths | None: Provisioned directory structure for rank 0, None for non-main ranks.

Raises:

  RuntimeError: If called after cleanup (single-use guard).
  OrchardDeviceError: If device resolution fails at runtime.

Source code in orchard/core/orchestrator.py
def initialize_core_services(self) -> RunPaths | None:
    """
    Executes linear sequence of environment initialization phases.

    Synchronizes global state through phases 1-6, progressing from
    deterministic seeding to device resolution. Phase 7 (environment
    reporting) is deferred to log_environment_report().

    In distributed mode (torchrun / DDP), only the main process (rank 0)
    executes phases 3-6 (filesystem, logging, config persistence, infra
    locking).  All ranks execute phases 1-2 (seeding, threads) for
    identical RNG state and thread affinity, plus device resolution
    for DDP readiness (each rank binds to ``cuda:{local_rank}``).

    Idempotent: guarded by ``_initialized`` flag. If already initialized,
    returns existing RunPaths without re-executing any phase. This prevents
    orphaned directories (Phase 3 creates unique paths per call) and
    resource leaks (Phase 6 acquires filesystem locks).

    Returns:
        Provisioned directory structure for rank 0, None for non-main ranks.

    Raises:
        RuntimeError: If called after cleanup (single-use guard).
        OrchardDeviceError: If device resolution fails at runtime.
    """
    if self._cleaned_up:
        raise RuntimeError(
            "Cannot re-initialize after cleanup — "
            "RootOrchestrator is a single-use context manager"
        )
    if self._initialized:
        return self.paths

    # All ranks: deterministic seeding and thread configuration
    self._phase_1_determinism()
    applied_threads = self._phase_2_runtime_configuration()

    # Rank 0 only: filesystem, logging, persistence, locking, reporting
    if self.is_main_process:
        self._phase_3_filesystem_provisioning()
        self._phase_4_logging_initialization()

        # type guards: paths and logger are guaranteed after phases 3-4
        assert self.paths is not None, "Paths not initialized after phase 3"  # nosec B101
        assert self.run_logger is not None, "Logger not initialized after phase 4"  # nosec B101

        self._phase_5_run_manifest()
        self._phase_6_infrastructure_guarding()

        try:
            self._device_cache = self.get_device()
        except RuntimeError as e:
            # resolve_device in HardwareConfig already handles GPU-unavailable
            # at config-time. If we reach here with device="cuda" in config,
            # CUDA was available then — a runtime failure (e.g. driver crash)
            # is unrecoverable. Silently falling back to CPU would waste hours
            # of compute with GPU-tuned hyperparameters (batch size, mixed
            # precision, etc.). Fail fast so the user can fix the environment.
            raise OrchardDeviceError(
                f"{LogStyle.FAILURE} Device resolution failed at runtime "
                f"(config requested '{self.cfg.hardware.device}'): {e}"
            ) from e

    else:
        logger.debug("Rank %d: skipping phases 3-6 (non-main process).", self.rank)
        # Non-main ranks still need their device for DDP readiness
        try:
            self._device_cache = self.get_device()
        except RuntimeError as e:
            raise OrchardDeviceError(
                f"{LogStyle.FAILURE} Device resolution failed at runtime "
                f"(config requested '{self.cfg.hardware.device}'): {e}"
            ) from e

    self._applied_threads = applied_threads
    self._initialized = True
    return self.paths
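
A sketch of the rank-dependent return value, passing ranks explicitly instead of relying on RANK/LOCAL_RANK auto-detection (assumes a valid cfg on a single node):

main = RootOrchestrator(cfg, rank=0, local_rank=0)
assert main.initialize_core_services() is not None  # rank 0: phases 3-6 ran, RunPaths returned
main.cleanup()

worker = RootOrchestrator(cfg, rank=1, local_rank=1)
assert worker.initialize_core_services() is None    # non-main: phases 3-6 skipped
worker.cleanup()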

log_environment_report()

Emit the environment initialization report (phase 7).

Designed to be called explicitly by the CLI app after external services (e.g. MLflow tracker) have been started, so that all enter/exit log messages appear in the correct chronological order.

Source code in orchard/core/orchestrator.py
def log_environment_report(self) -> None:
    """
    Emit the environment initialization report (phase 7).

    Designed to be called explicitly by the CLI app after external
    services (e.g. MLflow tracker) have been started, so that all
    enter/exit log messages appear in the correct chronological order.
    """
    if self._initialized and self.is_main_process:
        self._phase_7_environment_report(self._applied_threads)
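
A sketch of the intended call order from the CLI layer (create_tracker and run_training_phase are documented further down this page; assumes a valid cfg):

with RootOrchestrator(cfg) as orch:
    tracker = create_tracker(cfg)   # bring up external services first
    orch.log_environment_report()   # phase 7 lands after the tracker's startup logs
    result = run_training_phase(orch, tracker=tracker)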

cleanup()

Releases system resources and removes execution lock file.

Guarantees clean state for subsequent runs by unlinking InfrastructureManager guards and closing logging handlers. Non-main ranks skip resource release (they never acquired locks or opened file-based log handlers).

Source code in orchard/core/orchestrator.py
def cleanup(self) -> None:
    """
    Releases system resources and removes execution lock file.

    Guarantees clean state for subsequent runs by unlinking
    InfrastructureManager guards and closing logging handlers.
    Non-main ranks skip resource release (they never acquired locks
    or opened file-based log handlers).
    """
    if not self.is_main_process:
        self._cleaned_up = True
        return

    cleanup_logger = self.run_logger or logging.getLogger(LOGGER_NAME)
    try:
        if self._infra_lock_acquired:
            self.infra.release_resources(self.cfg, logger=cleanup_logger)
            self._infra_lock_acquired = False
    except (OSError, RuntimeError) as e:
        cleanup_logger.error("Failed to release system lock: %s", e)

    self._close_logging_handlers()
    self._cleaned_up = True

get_device()

Resolves and caches optimal computation device (CUDA/CPU/MPS).

Returns:

  torch.device: PyTorch device object for model execution.

Source code in orchard/core/orchestrator.py
def get_device(self) -> torch.device:
    """
    Resolves and caches optimal computation device (CUDA/CPU/MPS).

    Returns:
        PyTorch device object for model execution
    """
    if self._device_cache is None:
        self._device_cache = self._device_resolver(
            device_str=self.cfg.hardware.device,
            local_rank=self.local_rank,
        )
    return self._device_cache
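
A usage sketch, assuming a valid cfg; the second call returns the cached device without invoking the resolver again:

with RootOrchestrator(cfg) as orch:
    device = orch.get_device()
    assert orch.get_device() is device  # cached after the first resolution
    model = get_model(device=device, dataset_cfg=cfg.dataset, arch_cfg=cfg.architecture)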

OrchardConfigError

Bases: OrchardError, ValueError

Configuration validation error (backward-compatible with ValueError).

OrchardDatasetError

Bases: OrchardError

Dataset loading, fetching, or validation error.

OrchardDeviceError

Bases: OrchardError, RuntimeError

Device resolution failed at runtime (e.g. driver crash after config validation).

OrchardError

Bases: Exception

Base exception for all Orchard ML errors.

OrchardExportError

Bases: OrchardError

Model export (ONNX) or checkpoint loading error.

OrchardInfrastructureError

Bases: OrchardError

OS-level resource lock acquisition or release failure.
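
A sketch of catching against this hierarchy, from most to least specific. The import path for the exception classes is not stated on this page, so the top-level re-export below is an assumption:

from pathlib import Path

# Assumption: the exceptions are re-exported at the package root like Config;
# adjust the import to the actual module if they are not.
from orchard import (
    Config,
    RootOrchestrator,
    OrchardConfigError,
    OrchardDeviceError,
    OrchardError,
)

try:
    cfg = Config.from_recipe(Path("recipes/config_mini_cnn.yaml"))
    with RootOrchestrator(cfg) as orch:
        ...
except OrchardConfigError:  # also catchable as ValueError (see bases above)
    raise
except OrchardDeviceError:  # also catchable as RuntimeError
    raise
except OrchardError:        # base class: any other Orchard ML failure
    raise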

get_model(device, dataset_cfg, arch_cfg, verbose=True)

Factory function to resolve, instantiate, and prepare architectures.

It maps configuration identifiers to specific builder functions via an internal registry. Structural parameters like input channels and class cardinality are derived from the 'effective' geometry resolved by the DatasetConfig.

Parameters:

  • device (torch.device, required): Hardware accelerator target.
  • dataset_cfg (DatasetConfig, required): Dataset sub-config with resolved metadata.
  • arch_cfg (ArchitectureConfig, required): Architecture sub-config with model selection.
  • verbose (bool, default: True): If True, emit builder-internal INFO logging.

Returns:

  nn.Module: The instantiated model synchronized with the target device.

Example

>>> model = get_model(device, dataset_cfg=cfg.dataset, arch_cfg=cfg.architecture)

Raises:

  ValueError: If the requested architecture is not found in the registry.

Source code in orchard/architectures/factory.py
def get_model(
    device: torch.device,
    dataset_cfg: DatasetConfig,
    arch_cfg: ArchitectureConfig,
    verbose: bool = True,
) -> nn.Module:
    """
    Factory function to resolve, instantiate, and prepare architectures.

    It maps configuration identifiers to specific builder functions via an
    internal registry. Structural parameters like input channels and class
    cardinality are derived from the 'effective' geometry resolved by
    the DatasetConfig.

    Args:
        device: Hardware accelerator target.
        dataset_cfg: Dataset sub-config with resolved metadata.
        arch_cfg: Architecture sub-config with model selection.
        verbose: If True, emit builder-internal INFO logging.

    Returns:
        nn.Module: The instantiated model synchronized with the target device.

    Example:
        >>> model = get_model(device, dataset_cfg=cfg.dataset, arch_cfg=cfg.architecture)

    Raises:
        ValueError: If the requested architecture is not found in the registry.
    """
    # Resolve structural dimensions from sub-configs
    in_channels = dataset_cfg.effective_in_channels
    num_classes = dataset_cfg.num_classes
    model_name_lower = arch_cfg.name.lower()

    if verbose:
        logger.info(
            "%s%s %-18s: %s | Input: %dx%dx%d | Output: %d classes",
            LogStyle.INDENT,
            LogStyle.ARROW,
            "Architecture",
            arch_cfg.name,
            dataset_cfg.img_size,
            dataset_cfg.img_size,
            in_channels,
            num_classes,
        )

    # Instance construction and adaptation.
    # When verbose=False (e.g. export phase), suppress builder-internal INFO logs
    # to avoid duplicating messages already shown during training.
    _prev_level = logger.level
    if not verbose:
        logger.setLevel(logging.WARNING)
    try:
        with _suppress_download_noise():
            model = _dispatch_builder(
                model_name_lower, num_classes, in_channels, arch_cfg, dataset_cfg.resolution
            )
    finally:
        logger.setLevel(_prev_level)

    # Centralised device placement (builders stay device-agnostic)
    model = model.to(device)

    # Parameter telemetry
    if verbose:
        total_params = sum(p.numel() for p in model.parameters())
        logger.info(
            "%s%s %-18s: %s | Parameters: %s",
            LogStyle.INDENT,
            LogStyle.ARROW,
            "Deployed",
            str(device).upper(),
            f"{total_params:,}",
        )

    return model

log_pipeline_summary(test_acc, macro_f1, best_model_path, run_dir, duration, test_auc=None, onnx_path=None, logger_instance=None)

Log final pipeline completion summary.

Called at the end of the pipeline after all phases complete. Consolidates key metrics and artifact locations.

Parameters:

  • test_acc (float, required): Final test accuracy
  • macro_f1 (float, required): Final macro F1 score
  • best_model_path (Path, required): Path to best model checkpoint
  • run_dir (Path, required): Root directory for this run
  • duration (str, required): Human-readable duration string
  • test_auc (float | None, default: None): Final test AUC (if available)
  • onnx_path (Path | None, default: None): Path to ONNX export (if performed)
  • logger_instance (Logger | None, default: None): Logger instance to use (defaults to module logger)

Source code in orchard/core/logger/progress.py
def log_pipeline_summary(
    test_acc: float,
    macro_f1: float,
    best_model_path: Path,
    run_dir: Path,
    duration: str,
    test_auc: float | None = None,
    onnx_path: Path | None = None,
    logger_instance: logging.Logger | None = None,
) -> None:
    """
    Log final pipeline completion summary.

    Called at the end of the pipeline after all phases complete.
    Consolidates key metrics and artifact locations.

    Args:
        test_acc: Final test accuracy
        macro_f1: Final macro F1 score
        best_model_path: Path to best model checkpoint
        run_dir: Root directory for this run
        duration: Human-readable duration string
        test_auc: Final test AUC (if available)
        onnx_path: Path to ONNX export (if performed)
        logger_instance: Logger instance to use (defaults to module logger)
    """
    log = logger_instance or logger

    I = LogStyle.INDENT  # noqa: E741  # pragma: no mutate
    A = LogStyle.ARROW  # pragma: no mutate
    S = LogStyle.SUCCESS  # pragma: no mutate

    Reporter.log_phase_header(log, "PIPELINE COMPLETE", LogStyle.DOUBLE)  # pragma: no mutate
    log.info("%s%s Test Accuracy  : %7.2f%%", I, S, test_acc * 100)
    log.info("%s%s Macro F1       : %8.4f", I, S, macro_f1)
    if test_auc is not None:
        log.info("%s%s Test AUC       : %8.4f", I, S, test_auc)
    log.info("%s%s Best Model     : %s", I, A, Path(best_model_path).name)
    if onnx_path:
        log.info("%s%s ONNX Export    : %s", I, A, Path(onnx_path).name)
    log.info("%s%s Run Directory  : %s", I, A, Path(run_dir).name)
    log.info("%s%s Duration       : %s", I, A, duration)
    log.info(LogStyle.DOUBLE)
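
A usage sketch; the metric values and paths are illustrative only:

from pathlib import Path

log_pipeline_summary(
    test_acc=0.912,
    macro_f1=0.8843,
    best_model_path=Path("runs/demo/checkpoints/best_model.pth"),  # illustrative
    run_dir=Path("runs/demo"),
    duration="0:12:34",
    test_auc=0.975,           # omit or pass None when AUC was not computed
    onnx_path=None,           # no export performed in this run
    logger_instance=None,     # falls back to the module logger
)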

run_export_phase(orchestrator, checkpoint_path, cfg=None)

Execute model export phase.

Exports trained model to production format (ONNX) with validation. Export format and opset version are read from cfg.export.

Parameters:

  • orchestrator (RootOrchestrator, required): Active RootOrchestrator providing paths, device, logger
  • checkpoint_path (Path, required): Path to trained model checkpoint (.pth)
  • cfg (Config | None, default: None): Optional config override (defaults to orchestrator's config)

Returns:

  Path | None: Path to exported model, or None if export config is absent

Example

>>> with RootOrchestrator(cfg) as orch:
...     best_path, *_ = run_training_phase(orch)
...     onnx_path = run_export_phase(orch, best_path)
...     print(f"Exported to: {onnx_path}")

Source code in orchard/pipeline/phases.py
def run_export_phase(
    orchestrator: RootOrchestrator,
    checkpoint_path: Path,
    cfg: Config | None = None,
) -> Path | None:
    """
    Execute model export phase.

    Exports trained model to production format (ONNX) with validation.
    Export format and opset version are read from ``cfg.export``.

    Args:
        orchestrator: Active RootOrchestrator providing paths, device, logger
        checkpoint_path: Path to trained model checkpoint (.pth)
        cfg: Optional config override (defaults to orchestrator's config)

    Returns:
        Path to exported model, or None if export config is absent

    Example:
        >>> with RootOrchestrator(cfg) as orch:
        ...     best_path, *_ = run_training_phase(orch)
        ...     onnx_path = run_export_phase(orch, best_path)
        ...     print(f"Exported to: {onnx_path}")
    """
    cfg = cfg or orchestrator.cfg

    if cfg.export is None:
        return None

    paths = orchestrator.paths
    run_logger = orchestrator.run_logger

    # type guards for MyPy
    assert run_logger is not None, _ERR_LOGGER_NOT_INIT  # nosec B101
    assert paths is not None, _ERR_PATHS_NOT_INIT  # nosec B101

    Reporter.log_phase_header(run_logger, "MODEL EXPORT")

    # Determine input shape from config (must match get_model's channel resolution)
    resolution = cfg.dataset.resolution
    input_shape = (cfg.dataset.effective_in_channels, resolution, resolution)

    # Export path (directory managed by RunPaths)
    onnx_path = paths.exports / "model.onnx"

    # Reload model architecture (on CPU for export)
    export_model = get_model(
        device=torch.device("cpu"),
        dataset_cfg=cfg.dataset,
        arch_cfg=cfg.architecture,
        verbose=False,
    )

    export_cfg = cfg.export  # guaranteed non-None (checked above)
    export_to_onnx(
        model=export_model,
        checkpoint_path=checkpoint_path,
        output_path=onnx_path,
        input_shape=input_shape,
        opset_version=export_cfg.opset_version,
        dynamic_axes=export_cfg.dynamic_axes,
        do_constant_folding=export_cfg.do_constant_folding,
        validate=export_cfg.validate_export,
    )

    # Post-export quantization
    quantized_path = None
    if export_cfg.quantize:
        quantized_path = quantize_model(
            onnx_path=onnx_path,
            backend=export_cfg.quantization_backend,
            weight_type=export_cfg.quantization_type,
        )

    # Numerical validation: compare PyTorch vs ONNX outputs
    if export_cfg.validate_export:
        is_valid = validate_export(
            pytorch_model=export_model,
            onnx_path=onnx_path,
            input_shape=input_shape,
            num_samples=export_cfg.validation_samples,
            max_deviation=export_cfg.max_deviation,
            label=export_cfg.format.upper(),
        )
        # `is False` (not `not is_valid`): None means onnxruntime is absent,
        # which is a skip — only warn when validation actually ran and failed.
        if is_valid is False:
            logger.warning(
                "  %s Numerical validation failed: ONNX outputs diverge from PyTorch model",
                LogStyle.WARNING,
            )

        # Validate quantized model (expected larger deviations from quantization)
        if quantized_path is not None:
            q_valid = validate_export(
                pytorch_model=export_model,
                onnx_path=quantized_path,
                input_shape=input_shape,
                num_samples=export_cfg.validation_samples,
                max_deviation=export_cfg.max_deviation * _QUANTIZED_TOLERANCE_FACTOR,
                label=export_cfg.quantization_type.upper(),
            )
            # See comment above: `is False` intentionally excludes None (skipped).
            if q_valid is False:
                logger.error(
                    "  %s Quantized model validation failed: "
                    "outputs diverge beyond 10x tolerance",
                    LogStyle.FAILURE,
                )

    # Inference latency benchmark
    if export_cfg.benchmark:
        benchmark_onnx_inference(
            onnx_path=onnx_path,
            input_shape=input_shape,
            num_runs=export_cfg.benchmark_runs,
            seed=cfg.training.seed,
            label=export_cfg.format.upper(),
        )
        if quantized_path:
            benchmark_onnx_inference(
                onnx_path=quantized_path,
                input_shape=input_shape,
                num_runs=export_cfg.benchmark_runs,
                seed=cfg.training.seed,
                label=export_cfg.quantization_type.upper(),
            )

    logger.info("  %s Export completed", LogStyle.SUCCESS)
    logger.info("    %s Output            : %s", LogStyle.ARROW, onnx_path.name)
    if quantized_path:
        logger.info("    %s Quantized         : %s", LogStyle.ARROW, quantized_path.name)

    return onnx_path

run_optimization_phase(orchestrator, cfg=None, tracker=None)

Execute hyperparameter optimization phase.

Runs Optuna study with configured trials, pruning, and early stopping. Generates visualizations (if enabled) and exports best configuration.

Parameters:

  • orchestrator (RootOrchestrator, required): Active RootOrchestrator providing paths, device, logger
  • cfg (Config | None, default: None): Optional config override (defaults to orchestrator's config)
  • tracker (TrackerProtocol | None, default: None): Optional experiment tracker for MLflow nested trial logging

Returns:

  tuple[Study, Path | None]: Tuple of (completed study, path to best_config.yaml or None)

Example

>>> with RootOrchestrator(cfg) as orch:
...     study, best_config_path = run_optimization_phase(orch)
...     print(f"Best AUC: {study.best_value:.4f}")

Source code in orchard/pipeline/phases.py
def run_optimization_phase(
    orchestrator: RootOrchestrator,
    cfg: Config | None = None,
    tracker: TrackerProtocol | None = None,
) -> tuple[optuna.Study, Path | None]:
    """
    Execute hyperparameter optimization phase.

    Runs Optuna study with configured trials, pruning, and early stopping.
    Generates visualizations (if enabled) and exports best configuration.

    Args:
        orchestrator: Active RootOrchestrator providing paths, device, logger
        cfg: Optional config override (defaults to orchestrator's config)
        tracker: Optional experiment tracker for MLflow nested trial logging

    Returns:
        tuple of (completed study, path to best_config.yaml or None)

    Example:
        >>> with RootOrchestrator(cfg) as orch:
        ...     study, best_config_path = run_optimization_phase(orch)
        ...     print(f"Best AUC: {study.best_value:.4f}")
    """
    cfg = cfg or orchestrator.cfg
    paths = orchestrator.paths
    device = orchestrator.get_device()
    run_logger = orchestrator.run_logger

    # type guards for MyPy
    assert run_logger is not None, _ERR_LOGGER_NOT_INIT  # nosec B101
    assert paths is not None, _ERR_PATHS_NOT_INIT  # nosec B101

    Reporter.log_phase_header(run_logger, "HYPERPARAMETER OPTIMIZATION", LogStyle.DOUBLE)

    # Execute Optuna study (includes post-processing: visualizations, best config export)
    study = run_optimization(cfg=cfg, device=device, paths=paths, tracker=tracker)

    log_optimization_summary(
        study=study,
        cfg=cfg,
        device=device,
        paths=paths,
    )

    # Best config path is in reports dir (exported by orchestrator if save_best_config=True)
    candidate = paths.reports / "best_config.yaml"
    best_config_path: Path | None = candidate if candidate.exists() else None

    return study, best_config_path

run_training_phase(orchestrator, cfg=None, tracker=None)

Execute model training phase.

Loads dataset, initializes model, runs training with validation, and performs final evaluation on test set.

Parameters:

  • orchestrator (RootOrchestrator, required): Active RootOrchestrator providing paths, device, logger
  • cfg (Config | None, default: None): Optional config override (defaults to orchestrator's config)
  • tracker (TrackerProtocol | None, default: None): Optional experiment tracker for MLflow metric logging

Returns:

  TrainingResult: Named tuple with best_model_path, train_losses, val_metrics, model, macro_f1, test_acc, test_auc.

Example

>>> with RootOrchestrator(cfg) as orch:
...     result = run_training_phase(orch)
...     print(f"Test Accuracy: {result.test_acc:.4f}")

Source code in orchard/pipeline/phases.py
def run_training_phase(
    orchestrator: RootOrchestrator,
    cfg: Config | None = None,
    tracker: TrackerProtocol | None = None,
) -> TrainingResult:
    """
    Execute model training phase.

    Loads dataset, initializes model, runs training with validation,
    and performs final evaluation on test set.

    Args:
        orchestrator: Active RootOrchestrator providing paths, device, logger
        cfg: Optional config override (defaults to orchestrator's config)
        tracker: Optional experiment tracker for MLflow metric logging

    Returns:
        TrainingResult named tuple with best_model_path, train_losses,
        val_metrics, model, macro_f1, test_acc, test_auc.

    Example:
        >>> with RootOrchestrator(cfg) as orch:
        ...     result = run_training_phase(orch)
        ...     print(f"Test Accuracy: {result.test_acc:.4f}")
    """
    cfg = cfg or orchestrator.cfg
    paths = orchestrator.paths
    device = orchestrator.get_device()
    run_logger = orchestrator.run_logger

    # type guards for MyPy
    assert run_logger is not None, _ERR_LOGGER_NOT_INIT  # nosec B101
    assert paths is not None, _ERR_PATHS_NOT_INIT  # nosec B101

    # Dataset metadata (respects data_root override via _ensure_metadata)
    ds_meta = cfg.dataset._ensure_metadata

    # DATA PREPARATION
    Reporter.log_phase_header(run_logger, "DATA PREPARATION")

    data = load_dataset(ds_meta)
    loaders = get_dataloaders(data, cfg.dataset, cfg.training, cfg.augmentation, cfg.num_workers)
    train_loader, val_loader, test_loader = loaders

    show_samples_for_dataset(
        loader=train_loader,
        dataset_name=cfg.dataset.dataset_name,
        run_paths=paths,
        mean=cfg.dataset.mean,
        std=cfg.dataset.std,
        arch_name=cfg.architecture.name,
        fig_dpi=cfg.evaluation.fig_dpi,
        num_samples=cfg.evaluation.n_samples,
        resolution=cfg.dataset.resolution,
    )

    # MODEL TRAINING
    Reporter.log_phase_header(
        run_logger, "TRAINING PIPELINE - " + cfg.architecture.name.upper(), LogStyle.DOUBLE
    )

    model = get_model(device=device, dataset_cfg=cfg.dataset, arch_cfg=cfg.architecture)

    class_weights = None
    if cfg.training.weighted_loss:
        train_labels = train_loader.dataset.labels.flatten()  # type: ignore[attr-defined]
        class_weights = compute_class_weights(train_labels, ds_meta.num_classes, device)

    task = get_task(cfg.task_type)
    criterion = task.criterion_factory.get_criterion(cfg.training, class_weights=class_weights)
    optimizer = get_optimizer(model, cfg.training)
    scheduler = get_scheduler(optimizer, cfg.training)

    trainer = ModelTrainer(
        model=model,
        train_loader=train_loader,
        val_loader=val_loader,
        optimizer=optimizer,
        scheduler=scheduler,
        criterion=criterion,
        device=device,
        training=cfg.training,
        output_path=paths.best_model_path,
        tracker=tracker,
    )

    best_model_path, train_losses, val_metrics_history = trainer.train()

    # FINAL EVALUATION
    Reporter.log_phase_header(run_logger, "FINAL EVALUATION")

    macro_f1, test_acc, test_auc = task.eval_pipeline.run_evaluation(
        model=model,
        test_loader=test_loader,
        train_losses=train_losses,
        val_metrics_history=val_metrics_history,
        class_names=ds_meta.classes,
        paths=paths,
        training=cfg.training,
        dataset=cfg.dataset,
        augmentation=cfg.augmentation,
        evaluation=cfg.evaluation,
        arch_name=cfg.architecture.name,
        aug_info=get_augmentations_description(
            cfg.augmentation,
            cfg.dataset.img_size,  # type: ignore[arg-type]
            cfg.training.mixup_alpha,
            ds_meta=ds_meta,
        ),
        tracker=tracker,
    )

    return TrainingResult(
        best_model_path=best_model_path,
        train_losses=train_losses,
        val_metrics=val_metrics_history,
        model=model,
        macro_f1=macro_f1,
        test_acc=test_acc,
        test_auc=test_auc,
    )

create_tracker(cfg)

Factory: returns MLflowTracker if tracking is configured, else NoOpTracker.

Parameters:

  • cfg (Any, required): Config object. If cfg.tracking is set and enabled, returns MLflowTracker.

Returns:

  TrackerProtocol: Active tracker instance.

Source code in orchard/tracking/tracker.py
def create_tracker(cfg: Any) -> TrackerProtocol:
    """
    Factory: returns MLflowTracker if tracking is configured, else NoOpTracker.

    Args:
        cfg: Config object. If cfg.tracking is set and enabled, returns MLflowTracker.

    Returns:
        Active tracker instance.
    """
    tracking_cfg = getattr(cfg, "tracking", None)
    if tracking_cfg is None or not tracking_cfg.enabled:
        return NoOpTracker()

    if not _MLFLOW_AVAILABLE:
        logger.warning(
            "Tracking enabled in config but mlflow is not installed. "
            "Install with: pip install mlflow"
        )
        return NoOpTracker()

    return MLflowTracker(experiment_name=tracking_cfg.experiment_name)
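
A usage sketch wiring the factory into the pipeline phases documented above (assumes a valid cfg):

tracker = create_tracker(cfg)  # NoOpTracker when tracking is disabled or mlflow is missing

with RootOrchestrator(cfg) as orch:
    orch.log_environment_report()
    result = run_training_phase(orch, tracker=tracker)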