orchard

Orchard ML: Type-Safe Deep Learning for Reproducible Research.

Top-level convenience API re-exporting the most commonly used components from subpackages, so users and the orchard CLI can write:

from orchard import Config, RootOrchestrator, get_model

LogStyle

Unified logging style constants for consistent visual hierarchy.

Provides separators, symbols, indentation, and ANSI color codes used by all logging modules. Placed here (in paths.constants) rather than in logger.styles so that low-level packages (environment, config) can reference the constants without triggering circular imports.

RootOrchestrator(cfg, infra_manager=None, reporter=None, time_tracker=None, audit_saver=None, log_initializer=None, seed_setter=None, thread_applier=None, system_configurator=None, static_dir_setup=None, device_resolver=None, rank=None, local_rank=None)

Central coordinator for ML experiment lifecycle management.

Orchestrates the complete initialization sequence from configuration validation through resource provisioning to execution readiness. Implements a 7-phase initialization protocol (phases 1-6 eager, phase 7 deferred) with dependency injection for maximum testability.

The orchestrator follows the Single Responsibility Principle by delegating specialized tasks to injected dependencies while maintaining overall coordination. Uses the Context Manager pattern to guarantee resource cleanup even during failures.

Initialization Phases:

  1. Determinism: Global RNG seeding (Python, NumPy, PyTorch)
  2. Runtime Configuration: CPU thread affinity, system libraries
  3. Filesystem Provisioning: Dynamic workspace creation via RunPaths
  4. Logging Initialization: File-based persistent logging setup
  5. Config Persistence: YAML manifest export for auditability
  6. Infrastructure Guarding: OS-level resource locks (prevents race conditions)
  7. Environment Reporting: Comprehensive telemetry logging

Dependency Injection:

All external dependencies are injectable with sensible defaults:

  • infra_manager: OS resource management (locks, cleanup)
  • reporter: Environment telemetry engine
  • log_initializer: Logging setup strategy
  • seed_setter: RNG seeding function
  • thread_applier: CPU thread configuration
  • system_configurator: System library setup (matplotlib, etc)
  • static_dir_setup: Static directory creation
  • audit_saver: Config YAML + requirements snapshot persistence
  • device_resolver: Hardware device detection
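
The constructor source on this page calls two helpers, _resolve and _resolve_callable, whose bodies are not shown. The following is a minimal sketch of how such default resolution might work, assuming the first helper calls a default factory and the second returns the default function uncalled. Both helper bodies, the stand-in Reporter, FakeReporter, and this set_seed are hypothetical.

```python
from typing import Any, Callable, Optional, TypeVar

T = TypeVar("T")


def _resolve(value: Optional[T], default_factory: Callable[[], T]) -> T:
    """Return the injected object, or build the default by calling the factory."""
    # "is not None" (not truthiness) so that falsy values such as rank 0 survive.
    return value if value is not None else default_factory()


def _resolve_callable(
    value: Optional[Callable[..., Any]], default: Callable[..., Any]
) -> Callable[..., Any]:
    """Return the injected function, or the default function itself (not called)."""
    return value if value is not None else default


class Reporter:
    """Stand-in for a default reporter (hypothetical)."""


class FakeReporter:
    """Test double injected in place of the default (hypothetical)."""


def set_seed(seed: int) -> str:
    """Stand-in seeding function that only reports what it was asked to do."""
    return f"seeded {seed}"


default_reporter = _resolve(None, Reporter)         # factory is called
fake_reporter = _resolve(FakeReporter(), Reporter)  # injected object wins
explicit_rank = _resolve(0, lambda: 7)              # 0 is kept, not replaced
seed_setter = _resolve_callable(None, set_seed)     # function returned as-is
```

Under these assumptions a test can pass a fake for any collaborator and leave the rest at their defaults.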

Attributes:

  • cfg (Config): Validated global configuration (Single Source of Truth)
  • rank (int): Global rank of this process (0 in single-process mode)
  • local_rank (int): Node-local rank for GPU assignment (0 in single-process mode)
  • is_main_process (bool): True for rank 0, False for non-main ranks
  • infra (InfraManagerProtocol): Infrastructure resource manager
  • reporter (ReporterProtocol): Environment telemetry engine
  • time_tracker (TimeTrackerProtocol): Pipeline duration tracker
  • paths (RunPaths | None): Session-specific directory structure (None on non-main ranks)
  • run_logger (Logger | None): Active logger instance (None on non-main ranks)
  • repro_mode (bool): Strict determinism flag
  • warn_only_mode (bool): Warn-only mode for strict determinism
  • num_workers (int): DataLoader worker processes

Example

cfg = Config.from_recipe(Path("recipes/config_mini_cnn.yaml"))
with RootOrchestrator(cfg) as orch:
    device = orch.get_device()
    logger = orch.run_logger
    paths = orch.paths
    # Execute training pipeline with guaranteed cleanup

Notes:

  • Thread-safe: Single-instance locking via InfrastructureManager
  • Idempotent: initialize_core_services() is safe to call multiple times (subsequent calls return cached RunPaths without re-executing phases)
  • Auditable: All configuration saved to YAML in workspace
  • Deterministic: Reproducible experiments via strict seeding

Initializes orchestrator with dependency injection.

Parameters:

  • cfg ('Config', required): Validated global configuration (SSOT)
  • infra_manager (InfraManagerProtocol | None, default None): Infrastructure management handler (default: InfrastructureManager())
  • reporter (ReporterProtocol | None, default None): Environment reporting engine (default: Reporter())
  • time_tracker (TimeTrackerProtocol | None, default None): Pipeline duration tracker (default: TimeTracker())
  • audit_saver (AuditSaverProtocol | None, default None): Run-manifest persistence: config YAML + dependency snapshot (default: AuditSaver())
  • log_initializer (Callable[..., Any] | None, default None): Logging setup function (default: Logger.setup)
  • seed_setter (Callable[..., Any] | None, default None): RNG seeding function (default: set_seed)
  • thread_applier (Callable[..., Any] | None, default None): CPU thread configuration (default: apply_cpu_threads)
  • system_configurator (Callable[..., Any] | None, default None): System library setup (default: configure_system_libraries)
  • static_dir_setup (Callable[..., Any] | None, default None): Static directory creation (default: setup_static_directories)
  • device_resolver (Callable[..., Any] | None, default None): Device resolution (default: to_device_obj)
  • rank (int | None, default None): Global rank of this process (default: auto-detected from RANK env var). Rank 0 executes all phases; rank N skips filesystem, logging, config persistence, infrastructure locking, and reporting.
  • local_rank (int | None, default None): Node-local rank for GPU assignment (default: auto-detected from LOCAL_RANK env var). Used by device_resolver to select the correct GPU in multi-GPU distributed setups.
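
The rank and local_rank defaults are described as auto-detected from the RANK and LOCAL_RANK environment variables, which launchers such as torchrun set for each worker. The bodies of get_rank and get_local_rank are not shown on this page. The sketch below assumes they fall back to 0 when the variable is unset; the env parameter is added here only to make the example testable.

```python
import os
from typing import Mapping


def get_rank(env: Mapping[str, str] = os.environ) -> int:
    """Global rank from RANK; 0 when unset (single-process mode)."""
    return int(env.get("RANK", "0"))


def get_local_rank(env: Mapping[str, str] = os.environ) -> int:
    """Node-local rank from LOCAL_RANK; 0 when unset."""
    return int(env.get("LOCAL_RANK", "0"))


# Simulated environment of the second process on a node.
worker_env = {"RANK": "5", "LOCAL_RANK": "1"}
is_main_process = get_rank(worker_env) == 0
```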
Source code in orchard/core/orchestrator.py
def __init__(
    self,
    cfg: "Config",
    infra_manager: InfraManagerProtocol | None = None,
    reporter: ReporterProtocol | None = None,
    time_tracker: TimeTrackerProtocol | None = None,
    audit_saver: AuditSaverProtocol | None = None,
    log_initializer: Callable[..., Any] | None = None,
    seed_setter: Callable[..., Any] | None = None,
    thread_applier: Callable[..., Any] | None = None,
    system_configurator: Callable[..., Any] | None = None,
    static_dir_setup: Callable[..., Any] | None = None,
    device_resolver: Callable[..., Any] | None = None,
    rank: int | None = None,
    local_rank: int | None = None,
) -> None:
    """
    Initializes orchestrator with dependency injection.

    Args:
        cfg: Validated global configuration (SSOT)
        infra_manager: Infrastructure management handler (default: InfrastructureManager())
        reporter: Environment reporting engine (default: Reporter())
        time_tracker: Pipeline duration tracker (default: TimeTracker())
        audit_saver: Run-manifest persistence — config YAML + dependency
            snapshot (default: AuditSaver())
        log_initializer: Logging setup function (default: Logger.setup)
        seed_setter: RNG seeding function (default: set_seed)
        thread_applier: CPU thread configuration (default: apply_cpu_threads)
        system_configurator: System library setup (default: configure_system_libraries)
        static_dir_setup: Static directory creation (default: setup_static_directories)
        device_resolver: Device resolution (default: to_device_obj)
        rank: Global rank of this process (default: auto-detected from RANK env var).
            Rank 0 executes all phases; rank N skips filesystem, logging,
            config persistence, infrastructure locking, and reporting.
        local_rank: Node-local rank for GPU assignment (default: auto-detected
            from LOCAL_RANK env var). Used by device_resolver to select the
            correct GPU in multi-GPU distributed setups.
    """
    self.cfg = cfg

    # Dependency injection: _resolve for objects, _resolve_callable for functions
    self.rank = _resolve(rank, get_rank)
    self.local_rank = _resolve(local_rank, get_local_rank)
    self.is_main_process = self.rank == 0
    self.infra = _resolve(infra_manager, InfrastructureManager)
    self.reporter = _resolve(reporter, Reporter)
    self.time_tracker = _resolve(time_tracker, TimeTracker)
    self._audit_saver = _resolve(audit_saver, AuditSaver)
    self._log_initializer = _resolve_callable(log_initializer, Logger.setup)
    self._seed_setter = _resolve_callable(seed_setter, set_seed)
    self._thread_applier = _resolve_callable(thread_applier, apply_cpu_threads)
    self._system_configurator = _resolve_callable(
        system_configurator, configure_system_libraries
    )
    self._static_dir_setup = _resolve_callable(static_dir_setup, setup_static_directories)
    self._device_resolver = _resolve_callable(device_resolver, to_device_obj)

    # Lazy initialization
    self._initialized: bool = False
    self._cleaned_up: bool = False
    self._infra_lock_acquired: bool = False
    self._applied_threads: int = 0
    self.paths: RunPaths | None = None
    self.run_logger: logging.Logger | None = None
    self._device_cache: torch.device | None = None

    # Policy extraction from SSOT
    self.repro_mode = self.cfg.hardware.use_deterministic_algorithms
    self.warn_only_mode = self.cfg.hardware.deterministic_warn_only
    self.num_workers = self.cfg.hardware.effective_num_workers

__enter__()

Context Manager entry — triggers the initialization sequence.

Starts the pipeline timer and delegates to initialize_core_services() for phases 1-6 (seeding, runtime config, filesystem, logging, config persistence, and infrastructure locking), followed by device resolution. Phase 7 (environment reporting) is deferred to log_environment_report().

If any phase raises (including KeyboardInterrupt / SystemExit), cleanup() is called before re-raising to ensure partial resources (locks, file handles) are released even on failure.

Returns:

  • 'RootOrchestrator': Fully initialized RootOrchestrator ready for pipeline execution.

Raises:

  • BaseException: Re-raises any initialization error after cleanup.

Source code in orchard/core/orchestrator.py
def __enter__(self) -> "RootOrchestrator":
    """
    Context Manager entry — triggers the initialization sequence.

    Starts the pipeline timer and delegates to initialize_core_services()
    for phases 1-6 (seeding, runtime config, filesystem, logging,
    config persistence, and infrastructure locking), followed by device
    resolution. Phase 7 (environment reporting) is deferred to
    log_environment_report().

    If any phase raises (including KeyboardInterrupt / SystemExit),
    cleanup() is called before re-raising to ensure partial resources
    (locks, file handles) are released even on failure.

    Returns:
        Fully initialized RootOrchestrator ready for pipeline execution.

    Raises:
        BaseException: Re-raises any initialization error after cleanup.
    """
    try:
        self.time_tracker.start()
        self.initialize_core_services()
        return self
    except BaseException:
        self.cleanup()
        raise
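
Python does not call __exit__ when __enter__ itself raises, which is why the method above cleans up before re-raising. The toy class below sketches that behavior in isolation; Resource and its fields are hypothetical and carry none of the real orchestrator's logic.

```python
class Resource:
    """Toy context manager whose setup may fail part-way through."""

    def __init__(self, fail: bool = False) -> None:
        self.fail = fail
        self.lock_held = False
        self.events = []

    def _initialize(self) -> None:
        self.lock_held = True  # a partial resource is now held
        self.events.append("lock acquired")
        if self.fail:
            raise RuntimeError("setup failed after acquiring the lock")

    def cleanup(self) -> None:
        if self.lock_held:
            self.lock_held = False
            self.events.append("lock released")

    def __enter__(self) -> "Resource":
        try:
            self._initialize()
            return self
        except BaseException:
            # __exit__ will not run for a failed __enter__, so release here.
            self.cleanup()
            raise

    def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
        self.cleanup()
        return False


broken = Resource(fail=True)
try:
    with broken:
        pass  # never reached: __enter__ raises first
except RuntimeError:
    pass
```

Without the try/except inside __enter__, the lock in this sketch would stay held after the failure.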

__exit__(exc_type, exc_val, exc_tb)

Context Manager exit — stops timer and guarantees resource teardown.

Invoked automatically when leaving the with block, whether the pipeline completed normally or raised an exception. Stops the timer, then delegates to cleanup() for infrastructure lock release and logging handler closure.

Error reporting is intentionally left to the caller (CLI layer), which has the user-facing context to log appropriate messages.

Returns False so that any exception propagates to the caller unchanged.

Parameters:

  • exc_type (type[BaseException] | None, required): Exception class if the block raised, else None.
  • exc_val (BaseException | None, required): Exception instance if the block raised, else None.
  • exc_tb (TracebackType | None, required): Traceback object if the block raised, else None.

Returns:

  • Literal[False]: Always False; exceptions are never suppressed.

Source code in orchard/core/orchestrator.py
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> Literal[False]:
    """
    Context Manager exit — stops timer and guarantees resource teardown.

    Invoked automatically when leaving the ``with`` block, whether the
    pipeline completed normally or raised an exception. Stops the timer,
    then delegates to cleanup() for infrastructure lock release and
    logging handler closure.

    Error reporting is intentionally left to the caller (CLI layer),
    which has the user-facing context to log appropriate messages.

    Returns False so that any exception propagates to the caller unchanged.

    Args:
        exc_type: Exception class if the block raised, else None.
        exc_val: Exception instance if the block raised, else None.
        exc_tb: Traceback object if the block raised, else None.

    Returns:
        Always False — exceptions are never suppressed.
    """
    # Stop timer (duration already shown in pipeline summary)
    self.time_tracker.stop()

    self.cleanup()
    return False
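
Because __exit__ returns False, teardown runs first and the original exception then reaches the caller, which decides how to report it. The sketch below shows that division of labor with a hypothetical Session class and a hypothetical run_cli caller.

```python
class Session:
    """Toy context manager: always tears down, never suppresses."""

    def __init__(self) -> None:
        self.closed = False

    def __enter__(self) -> "Session":
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
        self.closed = True  # teardown happens on success and on failure
        return False        # let any exception propagate unchanged


def run_cli() -> str:
    """Caller layer: owns the user-facing error message."""
    session = Session()
    try:
        with session:
            raise ValueError("bad learning rate")
    except ValueError as exc:
        return f"teardown done={session.closed}; caller reports: {exc}"
    return "ok"


message = run_cli()
```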

initialize_core_services()

Executes linear sequence of environment initialization phases.

Synchronizes global state through phases 1-6, progressing from deterministic seeding to device resolution. Phase 7 (environment reporting) is deferred to log_environment_report().

In distributed mode (torchrun / DDP), only the main process (rank 0) executes phases 3-6 (filesystem, logging, config persistence, infra locking). All ranks execute phases 1-2 (seeding, threads) for identical RNG state and thread affinity, plus device resolution for DDP readiness (each rank binds to cuda:{local_rank}).

Idempotent: guarded by _initialized flag. If already initialized, returns existing RunPaths without re-executing any phase. This prevents orphaned directories (Phase 3 creates unique paths per call) and resource leaks (Phase 6 acquires filesystem locks).

Returns:

  • RunPaths | None: Provisioned directory structure for rank 0, None for non-main ranks.

Raises:

  • RuntimeError: If called after cleanup (single-use guard).
  • OrchardDeviceError: If device resolution fails at runtime.

Source code in orchard/core/orchestrator.py
def initialize_core_services(self) -> RunPaths | None:
    """
    Executes linear sequence of environment initialization phases.

    Synchronizes global state through phases 1-6, progressing from
    deterministic seeding to device resolution. Phase 7 (environment
    reporting) is deferred to log_environment_report().

    In distributed mode (torchrun / DDP), only the main process (rank 0)
    executes phases 3-6 (filesystem, logging, config persistence, infra
    locking).  All ranks execute phases 1-2 (seeding, threads) for
    identical RNG state and thread affinity, plus device resolution
    for DDP readiness (each rank binds to ``cuda:{local_rank}``).

    Idempotent: guarded by ``_initialized`` flag. If already initialized,
    returns existing RunPaths without re-executing any phase. This prevents
    orphaned directories (Phase 3 creates unique paths per call) and
    resource leaks (Phase 6 acquires filesystem locks).

    Returns:
        Provisioned directory structure for rank 0, None for non-main ranks.

    Raises:
        RuntimeError: If called after cleanup (single-use guard).
        OrchardDeviceError: If device resolution fails at runtime.
    """
    if self._cleaned_up:
        raise RuntimeError(
            "Cannot re-initialize after cleanup — "
            "RootOrchestrator is a single-use context manager"
        )
    if self._initialized:
        return self.paths

    # All ranks: deterministic seeding and thread configuration
    self._phase_1_determinism()
    applied_threads = self._phase_2_runtime_configuration()

    # Rank 0 only: filesystem, logging, persistence, locking
    # (reporting, phase 7, is deferred to log_environment_report())
    if self.is_main_process:
        self._phase_3_filesystem_provisioning()
        self._phase_4_logging_initialization()

        # type guards: paths and logger are guaranteed after phases 3-4
        assert self.paths is not None, "Paths not initialized after phase 3"  # nosec B101
        assert self.run_logger is not None, "Logger not initialized after phase 4"  # nosec B101

        self._phase_5_run_manifest()
        self._phase_6_infrastructure_guarding()

        try:
            self._device_cache = self.get_device()
        except RuntimeError as e:
            # resolve_device in HardwareConfig already handles GPU-unavailable
            # at config-time. If we reach here with device="cuda" in config,
            # CUDA was available then — a runtime failure (e.g. driver crash)
            # is unrecoverable. Silently falling back to CPU would waste hours
            # of compute with GPU-tuned hyperparameters (batch size, mixed
            # precision, etc.). Fail fast so the user can fix the environment.
            raise OrchardDeviceError(
                f"{LogStyle.FAILURE} Device resolution failed at runtime "
                f"(config requested '{self.cfg.hardware.device}'): {e}"
            ) from e

    else:
        logger.debug("Rank %d: skipping phases 3-6 (non-main process).", self.rank)
        # Non-main ranks still need their device for DDP readiness
        try:
            self._device_cache = self.get_device()
        except RuntimeError as e:
            raise OrchardDeviceError(
                f"{LogStyle.FAILURE} Device resolution failed at runtime "
                f"(config requested '{self.cfg.hardware.device}'): {e}"
            ) from e

    self._applied_threads = applied_threads
    self._initialized = True
    return self.paths
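
The guards in this method combine three behaviors: repeat calls return the cached result, calls after cleanup fail, and non-main ranks skip workspace creation. A reduced sketch of that control flow follows; MiniOrchestrator and its counter-based workspace names are hypothetical stand-ins for RunPaths provisioning.

```python
import itertools


class MiniOrchestrator:
    """Toy stand-in: one-time setup that hands out a unique workspace name."""

    _counter = itertools.count(1)  # each real provisioning consumes one id

    def __init__(self, rank: int = 0) -> None:
        self.is_main_process = rank == 0
        self.paths = None
        self._initialized = False
        self._cleaned_up = False

    def initialize(self):
        if self._cleaned_up:
            raise RuntimeError("cannot re-initialize after cleanup")
        if self._initialized:
            return self.paths  # cached: no second workspace is created
        if self.is_main_process:
            self.paths = f"run_{next(self._counter):03d}"  # rank 0 only
        self._initialized = True
        return self.paths

    def cleanup(self) -> None:
        self._cleaned_up = True


main = MiniOrchestrator(rank=0)
first = main.initialize()
second = main.initialize()  # idempotent: same workspace as the first call
worker = MiniOrchestrator(rank=1)
worker_paths = worker.initialize()  # non-main rank: nothing provisioned
```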

log_environment_report()

Emit the environment initialization report (phase 7).

Designed to be called explicitly by the CLI app after external services (e.g. MLflow tracker) have been started, so that all enter/exit log messages appear in the correct chronological order.

Source code in orchard/core/orchestrator.py
def log_environment_report(self) -> None:
    """
    Emit the environment initialization report (phase 7).

    Designed to be called explicitly by the CLI app after external
    services (e.g. MLflow tracker) have been started, so that all
    enter/exit log messages appear in the correct chronological order.
    """
    if self._initialized and self.is_main_process:
        self._phase_7_environment_report(self._applied_threads)

cleanup()

Releases system resources and removes execution lock file.

Guarantees clean state for subsequent runs by unlinking InfrastructureManager guards and closing logging handlers. Non-main ranks skip resource release (they never acquired locks or opened file-based log handlers).

Source code in orchard/core/orchestrator.py
def cleanup(self) -> None:
    """
    Releases system resources and removes execution lock file.

    Guarantees clean state for subsequent runs by unlinking
    InfrastructureManager guards and closing logging handlers.
    Non-main ranks skip resource release (they never acquired locks
    or opened file-based log handlers).
    """
    if not self.is_main_process:
        self._cleaned_up = True
        return

    cleanup_logger = self.run_logger or logging.getLogger(LOGGER_NAME)
    try:
        if self._infra_lock_acquired:
            self.infra.release_resources(self.cfg, logger=cleanup_logger)
            self._infra_lock_acquired = False
    except (OSError, RuntimeError) as e:
        cleanup_logger.error("Failed to release system lock: %s", e)

    self._close_logging_handlers()
    self._cleaned_up = True

get_device()

Resolves and caches optimal computation device (CUDA/CPU/MPS).

Returns:

  • device: PyTorch device object for model execution

Source code in orchard/core/orchestrator.py
def get_device(self) -> torch.device:
    """
    Resolves and caches optimal computation device (CUDA/CPU/MPS).

    Returns:
        PyTorch device object for model execution
    """
    if self._device_cache is None:
        self._device_cache = self._device_resolver(
            device_str=self.cfg.hardware.device,
            local_rank=self.local_rank,
        )
    return self._device_cache
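
The method above resolves the device once and serves later calls from the cache. The sketch below reproduces that pattern with an injected resolver that records its calls; strings stand in for torch.device objects, and DeviceHolder and fake_resolver are hypothetical.

```python
from typing import Callable, List, Optional, Tuple


class DeviceHolder:
    """Caches the result of an injected device resolver."""

    def __init__(self, device_str: str, local_rank: int,
                 resolver: Callable[..., str]) -> None:
        self.device_str = device_str
        self.local_rank = local_rank
        self._resolver = resolver
        self._cache: Optional[str] = None

    def get_device(self) -> str:
        if self._cache is None:  # resolve only on the first call
            self._cache = self._resolver(
                device_str=self.device_str,
                local_rank=self.local_rank,
            )
        return self._cache


calls: List[Tuple[str, int]] = []


def fake_resolver(device_str: str, local_rank: int) -> str:
    """Test double: records each call and builds a device label."""
    calls.append((device_str, local_rank))
    return f"{device_str}:{local_rank}" if device_str == "cuda" else device_str


holder = DeviceHolder("cuda", 1, fake_resolver)
first = holder.get_device()
second = holder.get_device()  # served from the cache
```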

TaskComponents(criterion_factory, training_step, validation_metrics, eval_pipeline, fallback_metrics, early_stopping_thresholds) dataclass

Immutable bundle of task-specific strategy implementations.

Attributes:

  • criterion_factory (TaskCriterionFactory): Builds the loss function for this task.
  • training_step (TaskTrainingStep): Executes the forward pass and computes training loss.
  • validation_metrics (TaskValidationMetrics): Computes per-epoch validation metrics.
  • eval_pipeline (TaskEvalPipeline): Orchestrates inference, visualization, and reporting.
  • fallback_metrics (Mapping[str, float]): Metrics returned when validation fails during Optuna trials. Must contain at least the monitored metric key.
  • early_stopping_thresholds (Mapping[str, float]): Default early-stopping thresholds per metric name (e.g. {"accuracy": 0.995, "auc": 0.9999}).
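
An immutable strategy bundle of this kind can be sketched with a frozen dataclass. The example below assumes frozen=True is how immutability is achieved, which this page does not confirm, and uses a reduced hypothetical field set with a plain function standing in for a strategy object.

```python
from dataclasses import FrozenInstanceError, dataclass
from types import MappingProxyType
from typing import Callable, Mapping


@dataclass(frozen=True)
class MiniTaskComponents:
    """Reduced, hypothetical version of the bundle described above."""

    training_step: Callable[[float, float], float]
    fallback_metrics: Mapping[str, float]
    early_stopping_thresholds: Mapping[str, float]


def squared_error(prediction: float, target: float) -> float:
    """Stand-in strategy: squared difference between two numbers."""
    return (prediction - target) ** 2


components = MiniTaskComponents(
    training_step=squared_error,
    fallback_metrics=MappingProxyType({"accuracy": 0.0}),
    early_stopping_thresholds=MappingProxyType({"accuracy": 0.995, "auc": 0.9999}),
)

# Frozen dataclasses reject attribute reassignment after construction.
try:
    components.training_step = None
    reassigned = True
except FrozenInstanceError:
    reassigned = False
```

Wrapping the mappings in MappingProxyType keeps their contents read-only as well, since frozen=True only blocks rebinding the attributes.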

OrchardConfigError

Bases: OrchardError, ValueError

Configuration validation error (backward-compatible with ValueError).

OrchardDatasetError

Bases: OrchardError

Dataset loading, fetching, or validation error.

OrchardDeviceError

Bases: OrchardError, RuntimeError

Device resolution failed at runtime (e.g. driver crash after config validation).

OrchardError

Bases: Exception

Base exception for all Orchard ML errors.

OrchardExportError

Bases: OrchardError

Model export (ONNX) or checkpoint loading error.

OrchardInfrastructureError

Bases: OrchardError

OS-level resource lock acquisition or release failure.
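
The dual bases listed above mean existing handlers for built-in exceptions keep working. The sketch below redeclares three of the classes with the bases stated on this page, with empty bodies as an assumption, and checks which except clauses catch them; caught_as is a hypothetical helper.

```python
class OrchardError(Exception):
    """Base exception (bases as listed above; body is a placeholder)."""


class OrchardConfigError(OrchardError, ValueError):
    """Also a ValueError, so older `except ValueError` handlers still match."""


class OrchardDeviceError(OrchardError, RuntimeError):
    """Also a RuntimeError."""


def caught_as(exc: Exception, kind: type) -> bool:
    """Raise exc and report whether an `except kind` clause catches it."""
    try:
        raise exc
    except kind:
        return True
    except Exception:
        return False


config_as_value_error = caught_as(OrchardConfigError("bad field"), ValueError)
device_as_runtime_error = caught_as(OrchardDeviceError("driver crash"), RuntimeError)
config_as_orchard_error = caught_as(OrchardConfigError("bad field"), OrchardError)
base_as_value_error = caught_as(OrchardError("generic"), ValueError)
```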

ClassificationCriterionAdapter

Builds classification loss functions (CrossEntropy / Focal).

get_criterion(training, class_weights=None)

Delegate to the existing criterion factory.

Parameters:

  • training (TrainingConfig, required): Training sub-config with criterion parameters.
  • class_weights (Tensor | None, default None): Optional per-class weights for imbalanced datasets.

Returns:

  • Module: Loss module (CrossEntropyLoss or FocalLoss).

Source code in orchard/tasks/classification/criterion_adapter.py
def get_criterion(
    self,
    training: TrainingConfig,
    class_weights: torch.Tensor | None = None,
) -> nn.Module:
    """
    Delegate to the existing criterion factory.

    Args:
        training: Training sub-config with criterion parameters.
        class_weights: Optional per-class weights for imbalanced datasets.

    Returns:
        Loss module (CrossEntropyLoss or FocalLoss).
    """
    return get_criterion(training, class_weights=class_weights)

ClassificationEvalPipelineAdapter

Orchestrates classification inference, visualization, and reporting.

run_evaluation(model, test_loader, train_losses, val_metrics_history, class_names, paths, training, dataset, augmentation, evaluation, arch_name, aug_info='N/A', tracker=None)

Delegate to the existing final evaluation pipeline.

Parameters:

  • model (Module, required): Trained model (already on target device).
  • test_loader (DataLoader[Any], required): DataLoader for test set.
  • train_losses (list[float], required): Training loss history per epoch.
  • val_metrics_history (list[Mapping[str, float]], required): Validation metrics history per epoch.
  • class_names (list[str], required): List of class label strings.
  • paths (RunPaths, required): RunPaths for artifact output.
  • training (TrainingConfig, required): Training sub-config.
  • dataset (DatasetConfig, required): Dataset sub-config.
  • augmentation (AugmentationConfig, required): Augmentation sub-config.
  • evaluation (EvaluationConfig, required): Evaluation sub-config.
  • arch_name (str, required): Architecture identifier.
  • aug_info (str, default 'N/A'): Augmentation description string.
  • tracker (TrackerProtocol | None, default None): Optional experiment tracker for final metrics.

Returns:

  • Mapping[str, float]: Mapping of metric names to float values.

Source code in orchard/tasks/classification/evaluation_adapter.py
def run_evaluation(
    self,
    model: nn.Module,
    test_loader: DataLoader[Any],
    train_losses: list[float],
    val_metrics_history: list[Mapping[str, float]],
    class_names: list[str],
    paths: RunPaths,
    training: TrainingConfig,
    dataset: DatasetConfig,
    augmentation: AugmentationConfig,
    evaluation: EvaluationConfig,
    arch_name: str,
    aug_info: str = "N/A",  # pragma: no mutate
    tracker: TrackerProtocol | None = None,
) -> Mapping[str, float]:
    """
    Delegate to the existing final evaluation pipeline.

    Args:
        model: Trained model (already on target device).
        test_loader: DataLoader for test set.
        train_losses: Training loss history per epoch.
        val_metrics_history: Validation metrics history per epoch.
        class_names: List of class label strings.
        paths: RunPaths for artifact output.
        training: Training sub-config.
        dataset: Dataset sub-config.
        augmentation: Augmentation sub-config.
        evaluation: Evaluation sub-config.
        arch_name: Architecture identifier.
        aug_info: Augmentation description string.
        tracker: Optional experiment tracker for final metrics.

    Returns:
        Mapping of metric names to float values.
    """
    macro_f1, test_acc, test_auc = run_final_evaluation(
        model=model,
        test_loader=test_loader,
        train_losses=train_losses,
        val_metrics_history=val_metrics_history,
        class_names=class_names,
        paths=paths,
        training=training,
        dataset=dataset,
        augmentation=augmentation,
        evaluation=evaluation,
        arch_name=arch_name,
        aug_info=aug_info,
        tracker=tracker,
    )
    return MappingProxyType(
        {
            METRIC_F1: macro_f1,
            METRIC_ACCURACY: test_acc,
            METRIC_AUC: test_auc,
        }
    )
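
The adapter returns its results wrapped in MappingProxyType, which gives callers a read-only view. The sketch below shows the effect; the values of the three METRIC_* constants are assumptions, since this page does not show their definitions, and to_metrics is a hypothetical helper.

```python
from types import MappingProxyType
from typing import Mapping

# Assumed values; the real constants are defined elsewhere in the package.
METRIC_F1 = "f1"
METRIC_ACCURACY = "accuracy"
METRIC_AUC = "auc"


def to_metrics(macro_f1: float, test_acc: float, test_auc: float) -> Mapping[str, float]:
    """Pack three scalar results into a read-only mapping."""
    return MappingProxyType(
        {
            METRIC_F1: macro_f1,
            METRIC_ACCURACY: test_acc,
            METRIC_AUC: test_auc,
        }
    )


metrics = to_metrics(0.91, 0.93, 0.98)

# Item assignment on a mappingproxy raises TypeError.
try:
    metrics[METRIC_ACCURACY] = 1.0
    mutated = True
except TypeError:
    mutated = False
```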

ClassificationMetricsAdapter

Computes per-epoch classification metrics (loss, accuracy, AUC, F1).

compute_validation_metrics(model, val_loader, criterion, device)

Delegate to the existing validation engine.

Parameters:

  • model (Module, required): Neural network model to evaluate.
  • val_loader (DataLoader[Any], required): Validation data provider.
  • criterion (Module, required): Loss function.
  • device (device, required): Hardware target.

Returns:

  • Mapping[str, float]: Immutable mapping with keys: loss, accuracy, auc, f1.

Source code in orchard/tasks/classification/metrics_adapter.py
def compute_validation_metrics(
    self,
    model: nn.Module,
    val_loader: DataLoader[Any],
    criterion: nn.Module,
    device: torch.device,
) -> Mapping[str, float]:
    """
    Delegate to the existing validation engine.

    Args:
        model: Neural network model to evaluate.
        val_loader: Validation data provider.
        criterion: Loss function.
        device: Hardware target.

    Returns:
        Immutable mapping with keys: loss, accuracy, auc, f1.
    """
    return validate_epoch(model, val_loader, criterion, device)

ClassificationTrainingStepAdapter

Computes classification training loss with optional MixUp blending.

compute_training_loss(model, inputs, targets, criterion, mixup_fn=None, device=None)

Execute classification forward pass and compute loss.

When mixup_fn is provided, inputs and targets are blended before the forward pass and the loss is computed as a convex combination of the two target sets.

Parameters:

  • model (Module, required): Neural network producing logits.
  • inputs (Any, required): Batch of input tensors.
  • targets (Any, required): Batch of target tensors.
  • criterion (Module, required): Loss function (e.g. CrossEntropyLoss).
  • mixup_fn (Callable[..., Any] | None, default None): Optional MixUp augmentation callable.
  • device (device | None, default None): Target device for tensor placement.

Returns:

  • Tensor: Scalar loss tensor for backward pass.

Source code in orchard/tasks/classification/training_step_adapter.py
def compute_training_loss(
    self,
    model: nn.Module,
    inputs: Any,
    targets: Any,
    criterion: nn.Module,
    mixup_fn: Callable[..., Any] | None = None,
    device: torch.device | None = None,
) -> torch.Tensor:
    """
    Execute classification forward pass and compute loss.

    When ``mixup_fn`` is provided, inputs and targets are blended
    before the forward pass and the loss is computed as a convex
    combination of the two target sets.

    Args:
        model: Neural network producing logits.
        inputs: Batch of input tensors.
        targets: Batch of target tensors.
        criterion: Loss function (e.g. CrossEntropyLoss).
        mixup_fn: Optional MixUp augmentation callable.
        device: Target device for tensor placement.

    Returns:
        Scalar loss tensor for backward pass.
    """
    if device is not None:
        inputs = inputs.to(device)
        targets = targets.to(device)
    if mixup_fn is not None:
        inputs, y_a, y_b, lam = mixup_fn(inputs, targets)
        outputs = model(inputs)
        loss: torch.Tensor = lam * criterion(outputs, y_a) + (1 - lam) * criterion(outputs, y_b)
        return loss
    outputs = model(inputs)
    result: torch.Tensor = criterion(outputs, targets)
    return result
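
The MixUp branch above weights the loss against each target set by lam and 1 - lam. The sketch below works through that arithmetic on a single sample in plain Python. It covers only the loss combination, not the input blending done by mixup_fn; cross_entropy and mixup_loss are hypothetical helpers, not the library's functions.

```python
import math
from typing import Sequence


def cross_entropy(logits: Sequence[float], target: int) -> float:
    """Single-sample cross-entropy: log-sum-exp of the logits minus the target logit."""
    log_norm = math.log(sum(math.exp(z) for z in logits))
    return log_norm - logits[target]


def mixup_loss(logits: Sequence[float], y_a: int, y_b: int, lam: float) -> float:
    """Convex combination of the losses against the two target labels."""
    return lam * cross_entropy(logits, y_a) + (1 - lam) * cross_entropy(logits, y_b)


logits = [2.0, 0.5, -1.0]
loss_a = cross_entropy(logits, 0)  # loss if the true label were class 0
loss_b = cross_entropy(logits, 2)  # loss if the true label were class 2
blended = mixup_loss(logits, y_a=0, y_b=2, lam=0.7)
```

With lam between 0 and 1 the blended loss always lies between the two per-target losses, and lam = 1 recovers the unmixed loss against y_a.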

DetectionCriterionAdapter

Returns a no-op sentinel criterion for detection tasks.

get_criterion(training, class_weights=None)

Return a sentinel criterion.

Detection models compute their own losses internally (classification loss, box regression loss, objectness, RPN box regression). The returned module raises RuntimeError if its forward() is ever called, making misuse immediately visible.

Parameters:

  • training (TrainingConfig, required): Training sub-config (ignored for detection).
  • class_weights (Tensor | None, default None): Per-class weights (ignored for detection).

Returns:

  • Module: Sentinel nn.Module that raises on forward.

Source code in orchard/tasks/detection/criterion_adapter.py
def get_criterion(
    self,
    training: TrainingConfig,  # noqa: ARG002
    class_weights: torch.Tensor | None = None,  # noqa: ARG002
) -> nn.Module:
    """
    Return a sentinel criterion.

    Detection models compute their own losses internally (classification
    loss, box regression loss, objectness, RPN box regression). The returned
    module raises ``RuntimeError`` if its ``forward()`` is ever called,
    making misuse immediately visible.

    Args:
        training: Training sub-config (ignored for detection).
        class_weights: Per-class weights (ignored for detection).

    Returns:
        Sentinel ``nn.Module`` that raises on forward.
    """
    return _DetectionNoOpCriterion()
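
The body of _DetectionNoOpCriterion is not shown on this page. The following is a minimal sketch of the sentinel idea, using a plain class in place of nn.Module so that it runs without PyTorch. The loss-dictionary keys and the summation in detection_training_loss are assumptions that mirror the four losses named above.

```python
from typing import Any, Mapping


class NoOpCriterion:
    """Plain-Python stand-in for a sentinel criterion that must never be called."""

    def forward(self, *args: Any, **kwargs: Any) -> None:
        raise RuntimeError(
            "Detection models compute their losses internally; "
            "this criterion must not be called."
        )

    def __call__(self, *args: Any, **kwargs: Any) -> None:
        # Mimics how calling a module dispatches to forward().
        return self.forward(*args, **kwargs)


def detection_training_loss(loss_dict: Mapping[str, float]) -> float:
    """Assumed pattern: total loss is the sum of the model's component losses."""
    return sum(loss_dict.values())


criterion = NoOpCriterion()
try:
    criterion([0.1, 0.9], [1])  # accidental classification-style call
    misuse_detected = False
except RuntimeError:
    misuse_detected = True

total = detection_training_loss(
    {
        "loss_classifier": 0.5,
        "loss_box_reg": 0.25,
        "loss_objectness": 0.125,
        "loss_rpn_box_reg": 0.125,
    }
)
```

A sentinel that raises keeps the shared criterion slot filled for every task while turning an accidental call into an immediate error.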

DetectionEvalPipelineAdapter

Orchestrates detection inference, mAP computation, and reporting.

run_evaluation(model, test_loader, train_losses, val_metrics_history, class_names, paths, training, dataset, augmentation, evaluation, arch_name, aug_info='N/A', tracker=None)

Run detection evaluation pipeline.

Computes mAP metrics on the test set, plots training loss curves, and optionally logs metrics to the experiment tracker.

Parameters:

  • model (Module, required): Trained detection model (already on target device).
  • test_loader (DataLoader[Any], required): DataLoader for test set.
  • train_losses (list[float], required): Training loss history per epoch.
  • val_metrics_history (list[Mapping[str, float]], required): Validation metrics history per epoch.
  • class_names (list[str], required): List of class label strings.
  • paths (RunPaths, required): RunPaths for artifact output.
  • training (TrainingConfig, required): Training sub-config.
  • dataset (DatasetConfig, required): Dataset sub-config.
  • augmentation (AugmentationConfig, required): Augmentation sub-config.
  • evaluation (EvaluationConfig, required): Evaluation sub-config.
  • arch_name (str, required): Architecture identifier.
  • aug_info (str, default 'N/A'): Augmentation description string.
  • tracker (TrackerProtocol | None, default None): Optional experiment tracker for final metrics.

Returns:

Type Description
Mapping[str, float]

Read-only mapping of detection metric names (mAP, mAP@50, mAP@75) to float values.

Source code in orchard/tasks/detection/evaluation_adapter.py
def run_evaluation(
    self,
    model: nn.Module,
    test_loader: DataLoader[Any],
    train_losses: list[float],
    val_metrics_history: list[Mapping[str, float]],
    class_names: list[str],
    paths: RunPaths,
    training: TrainingConfig,
    dataset: DatasetConfig,
    augmentation: AugmentationConfig,  # noqa: ARG002
    evaluation: EvaluationConfig,
    arch_name: str,
    aug_info: str = "N/A",  # noqa: ARG002
    tracker: TrackerProtocol | None = None,
) -> Mapping[str, float]:
    """
    Run detection evaluation pipeline.

    Computes mAP metrics on the test set, plots training loss curves,
    and optionally logs metrics to the experiment tracker.

    Args:
        model: Trained detection model (already on target device).
        test_loader: DataLoader for test set.
        train_losses: Training loss history per epoch.
        val_metrics_history: Validation metrics history per epoch.
        class_names: List of class label strings.
        paths: RunPaths for artifact output.
        training: Training sub-config.
        dataset: Dataset sub-config.
        augmentation: Augmentation sub-config.
        evaluation: Evaluation sub-config.
        arch_name: Architecture identifier.
        aug_info: Augmentation description string.
        tracker: Optional experiment tracker for final metrics.

    Returns:
        Mapping of detection metric names to float values.
    """
    device = next(model.parameters()).device

    # Inference + mAP computation
    model.eval()
    metric = MeanAveragePrecision(iou_type="bbox")

    with torch.no_grad():
        for images, targets in test_loader:
            images_on_device = [img.to(device) for img in images]
            predictions = model(images_on_device)
            metric.update(
                [to_cpu(p) for p in predictions],
                [to_cpu(t) for t in targets],
            )

    result = metric.compute()
    test_metrics = {
        METRIC_MAP: float(result["map"]),
        METRIC_MAP_50: float(result["map_50"]),
        METRIC_MAP_75: float(result["map_75"]),
    }

    # Log results
    logger.info(
        "%s%s %-18s: mAP=%.4f  mAP@50=%.4f  mAP@75=%.4f",
        LogStyle.INDENT,
        LogStyle.ARROW,
        "Test Metrics",
        test_metrics[METRIC_MAP],
        test_metrics[METRIC_MAP_50],
        test_metrics[METRIC_MAP_75],
    )

    # Bbox visualization grid
    if evaluation.save_predictions_grid:
        show_detections(
            model=model,
            loader=test_loader,
            device=device,
            classes=class_names,
            save_path=paths.figures / f"detection_samples_{arch_name}_{dataset.resolution}.png",
            ctx=PlotContext(
                arch_name=arch_name,
                resolution=dataset.resolution,
                fig_dpi=evaluation.fig_dpi,
                plot_style=evaluation.plot_style,
                cmap_confusion=evaluation.cmap_confusion,
                grid_cols=evaluation.grid_cols,
                n_samples=evaluation.n_samples,
                fig_size_predictions=evaluation.fig_size_predictions,
                mean=dataset.mean,
                std=dataset.std,
            ),
        )

    # Training curves — plot mAP instead of loss (METRIC_LOSS is a 0.0 sentinel)
    val_map = [m.get(METRIC_MAP, 0.0) for m in val_metrics_history]
    ctx = PlotContext(
        arch_name=arch_name,
        resolution=dataset.resolution,
        fig_dpi=evaluation.fig_dpi,
        plot_style=evaluation.plot_style,
        cmap_confusion=evaluation.cmap_confusion,
        grid_cols=evaluation.grid_cols,
        n_samples=evaluation.n_samples,
        fig_size_predictions=evaluation.fig_size_predictions,
    )
    plot_training_curves(
        train_losses=train_losses,
        val_metric_values=val_map,
        out_path=paths.figures / "training_curves.png",
        ctx=ctx,
        val_label="Validation mAP",
    )

    # Structured report (Excel/CSV/JSON) — args tested in test_reporting.py
    report = create_structured_report(
        val_metrics=val_metrics_history,
        test_metrics=test_metrics,
        train_losses=train_losses,
        best_path=paths.best_model_path,
        log_path=paths.logs / "session.log",
        arch_name=arch_name,
        dataset=dataset,
        training=training,
        task_type="detection",
    )
    report.save(paths.final_report_path, fmt=evaluation.report_format)

    # Tracker logging
    if tracker is not None:
        full_metrics = {METRIC_LOSS: 0.0, **test_metrics}  # sentinel for tracker schema
        tracker.log_test_metrics(full_metrics)

    return MappingProxyType(test_metrics)
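
Two small steps in the listing above are easy to miss: the per-epoch mAP series is read with a 0.0 default, and the tracker payload is the test metrics plus a loss sentinel. A minimal sketch, assuming the metric constants resolve to the strings `"map"` and `"loss"`; all numeric values are made up for illustration.

```python
METRIC_MAP = "map"    # assumed value of the constant
METRIC_LOSS = "loss"  # assumed value of the constant

# Illustrative validation history; the second epoch has no mAP entry.
val_metrics_history = [
    {"loss": 0.0, "map": 0.25},
    {"loss": 0.0},
    {"loss": 0.0, "map": 0.5},
]

# Missing entries fall back to 0.0 so the curve keeps one point per epoch.
val_map = [m.get(METRIC_MAP, 0.0) for m in val_metrics_history]

test_metrics = {"map": 0.5, "map_50": 0.75, "map_75": 0.5}

# The sentinel is listed first, so a real "loss" key in test_metrics
# would override it if one were ever present.
full_metrics = {METRIC_LOSS: 0.0, **test_metrics}
```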

DetectionMetricsAdapter

Computes mAP validation metrics for object detection.

compute_validation_metrics(model, val_loader, criterion, device)

Run detection inference and compute mAP metrics.

Iterates the validation loader, collects predictions and targets, then computes mean Average Precision at multiple IoU thresholds.

Detection models do not produce a single validation loss in eval mode, so "loss" is returned as 0.0.

Parameters:

Name Type Description Default
model Module

Detection model to evaluate.

required
val_loader DataLoader[Any]

Validation data provider.

required
criterion Module

Ignored (detection models compute losses internally).

required
device device

Hardware target for inference.

required

Returns:

Type Description
Mapping[str, float]

Immutable mapping with keys: loss, map, map_50, map_75.

Source code in orchard/tasks/detection/metrics_adapter.py
def compute_validation_metrics(
    self,
    model: nn.Module,
    val_loader: DataLoader[Any],
    criterion: nn.Module,  # noqa: ARG002
    device: torch.device,
) -> Mapping[str, float]:
    """
    Run detection inference and compute mAP metrics.

    Iterates the validation loader, collects predictions and targets,
    then computes mean Average Precision at multiple IoU thresholds.

    Detection models do not produce a single validation loss in eval
    mode, so ``"loss"`` is returned as ``0.0``.

    Args:
        model: Detection model to evaluate.
        val_loader: Validation data provider.
        criterion: Ignored (detection models compute losses internally).
        device: Hardware target for inference.

    Returns:
        Immutable mapping with keys: ``loss``, ``map``, ``map_50``, ``map_75``.
    """
    model.eval()
    metric = MeanAveragePrecision(iou_type="bbox")

    with torch.no_grad():
        for images, targets in val_loader:
            images = [img.to(device) for img in images]
            predictions = model(images)
            metric.update(
                [to_cpu(p) for p in predictions],
                [to_cpu(t) for t in targets],
            )

    result = metric.compute()

    return MappingProxyType(
        {
            METRIC_LOSS: 0.0,  # sentinel — detection models don't expose validation loss
            METRIC_MAP: float(result["map"]),
            METRIC_MAP_50: float(result["map_50"]),
            METRIC_MAP_75: float(result["map_75"]),
        }
    )
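
The return value is wrapped in `MappingProxyType`, which is what makes it immutable for callers. A short standard-library sketch of that behaviour, with illustrative values:

```python
from types import MappingProxyType

metrics = MappingProxyType({"loss": 0.0, "map": 0.5})  # illustrative values

try:
    metrics["map"] = 1.0  # the proxy does not support item assignment
    mutated = True
except TypeError:
    mutated = False

# Callers that need to modify the values can take a plain-dict copy.
editable = dict(metrics)
editable["map"] = 1.0
```

Reading works exactly as with a dict, so code that only consumes the metrics does not need to change.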

DetectionTrainingStepAdapter

Computes detection training loss by summing model-internal losses.

compute_training_loss(model, inputs, targets, criterion, mixup_fn=None, device=None)

Execute detection forward pass and compute total loss.

Moves images and target dicts to device, calls the model in training mode (which returns a loss dict), and sums all loss components into a single scalar for backpropagation.

Parameters:

Name Type Description Default
model Module

Detection model (e.g. Faster R-CNN) in training mode.

required
inputs Any

List of image tensors, one per image in the batch.

required
targets Any

List of target dicts, each with boxes and labels.

required
criterion Module

Ignored (detection models compute losses internally).

required
mixup_fn Callable[..., Any] | None

Ignored (MixUp is not applicable to detection).

None
device device | None

Target device for tensor placement.

None

Returns:

Type Description
Tensor

Scalar loss tensor (sum of all loss components).

Source code in orchard/tasks/detection/training_step_adapter.py
def compute_training_loss(
    self,
    model: nn.Module,
    inputs: Any,
    targets: Any,
    criterion: nn.Module,  # noqa: ARG002
    mixup_fn: Callable[..., Any] | None = None,  # noqa: ARG002
    device: torch.device | None = None,
) -> torch.Tensor:
    """
    Execute detection forward pass and compute total loss.

    Moves images and target dicts to device, calls the model in
    training mode (which returns a loss dict), and sums all loss
    components into a single scalar for backpropagation.

    Args:
        model: Detection model (e.g. Faster R-CNN) in training mode.
        inputs: List of image tensors, one per image in the batch.
        targets: List of target dicts, each with ``boxes`` and ``labels``.
        criterion: Ignored (detection models compute losses internally).
        mixup_fn: Ignored (MixUp is not applicable to detection).
        device: Target device for tensor placement.

    Returns:
        Scalar loss tensor (sum of all loss components).
    """
    if device is not None:
        images = [img.to(device) for img in inputs]
        targets_on_device: list[dict[str, Any]] = [
            {k: v.to(device) for k, v in t.items()} for t in targets
        ]
    else:
        images = list(inputs)
        targets_on_device = list(targets)

    loss_dict = model(images, targets_on_device)
    total_loss = torch.stack(list(loss_dict.values())).sum()
    return total_loss
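
The reduction at the end of the listing collapses the model's loss dict into one scalar. The sketch below uses plain floats in place of tensors so it runs without PyTorch; the key names and values are illustrative and depend on the detection model in use.

```python
# Illustrative loss dict, as a detection model might return in training mode.
loss_dict = {
    "loss_classifier": 0.5,
    "loss_box_reg": 0.25,
    "loss_objectness": 0.125,
    "loss_rpn_box_reg": 0.125,
}

# With tensors the listing uses torch.stack(list(loss_dict.values())).sum();
# for plain floats the built-in sum is the equivalent reduction.
total_loss = sum(loss_dict.values())
```

Every component contributes with weight 1, so any per-component weighting would have to happen inside the model itself.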

get_model(device, dataset_cfg, arch_cfg, verbose=True)

Factory function to resolve, instantiate, and prepare architectures.

Maps the configured architecture name to a builder function via an internal registry. The number of input channels and the number of classes are taken from the DatasetConfig (effective_in_channels and num_classes), which resolves the 'effective' input geometry.

Parameters:

Name Type Description Default
device device

Hardware accelerator target.

required
dataset_cfg DatasetConfig

Dataset sub-config with resolved metadata.

required
arch_cfg ArchitectureConfig

Architecture sub-config with model selection.

required
verbose bool

If True, emit builder-internal INFO logging.

True

Returns:

Type Description
Module

The instantiated model, moved to the target device.

Example

model = get_model(device, dataset_cfg=cfg.dataset, arch_cfg=cfg.architecture)

Raises:

Type Description
ValueError

If the requested architecture is not found in the registry.
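
The registry lookup and the ValueError on an unknown name can be sketched as follows. The registry contents, the `build` helper, and the builder signature are hypothetical; the internals of `_dispatch_builder` are not shown on this page.

```python
# Hypothetical registry: keys are lowercase names, values are builder callables.
_BUILDERS = {
    "toynet": lambda num_classes, in_channels: ("toynet", in_channels, num_classes),
}


def build(name, num_classes, in_channels):
    # Lookup is case-insensitive because the name is lowercased first.
    builder = _BUILDERS.get(name.lower())
    if builder is None:
        raise ValueError(
            f"Architecture '{name}' is not registered. "
            f"Available: {sorted(_BUILDERS)}"
        )
    return builder(num_classes, in_channels)


model_spec = build("ToyNet", num_classes=10, in_channels=3)

try:
    build("unknown_arch", num_classes=10, in_channels=3)
    unknown_rejected = False
except ValueError:
    unknown_rejected = True
```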

Source code in orchard/architectures/factory.py
def get_model(
    device: torch.device,
    dataset_cfg: DatasetConfig,
    arch_cfg: ArchitectureConfig,
    verbose: bool = True,
) -> nn.Module:
    """
    Factory function to resolve, instantiate, and prepare architectures.

    It maps configuration identifiers to specific builder functions via an
    internal registry. Structural parameters like input channels and class
    cardinality are derived from the 'effective' geometry resolved by
    the DatasetConfig.

    Args:
        device: Hardware accelerator target.
        dataset_cfg: Dataset sub-config with resolved metadata.
        arch_cfg: Architecture sub-config with model selection.
        verbose: If True, emit builder-internal INFO logging.

    Returns:
        nn.Module: The instantiated model synchronized with the target device.

    Example:
        >>> model = get_model(device, dataset_cfg=cfg.dataset, arch_cfg=cfg.architecture)

    Raises:
        ValueError: If the requested architecture is not found in the registry.
    """
    # Resolve structural dimensions from sub-configs
    in_channels = dataset_cfg.effective_in_channels
    num_classes = dataset_cfg.num_classes
    model_name_lower = arch_cfg.name.lower()

    if verbose:
        logger.info(
            "%s%s %-18s: %s | Input: %dx%dx%d | Output: %d classes",
            LogStyle.INDENT,
            LogStyle.ARROW,
            "Architecture",
            arch_cfg.name,
            dataset_cfg.img_size,
            dataset_cfg.img_size,
            in_channels,
            num_classes,
        )

    # Instance construction and adaptation.
    # When verbose=False (e.g. export phase), suppress builder-internal INFO logs
    # to avoid duplicating messages already shown during training.
    _prev_level = logger.level
    if not verbose:
        logger.setLevel(logging.WARNING)
    try:
        with _suppress_download_noise():
            model = _dispatch_builder(
                model_name_lower, num_classes, in_channels, arch_cfg, dataset_cfg.resolution
            )
    finally:
        logger.setLevel(_prev_level)

    # Centralised device placement (builders stay device-agnostic)
    model = model.to(device)

    # Parameter telemetry
    if verbose:
        total_params = sum(p.numel() for p in model.parameters())
        logger.info(
            "%s%s %-18s: %s | Parameters: %s",
            LogStyle.INDENT,
            LogStyle.ARROW,
            "Deployed",
            str(device).upper(),
            f"{total_params:,}",
        )

    return model
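
The `verbose=False` branch relies on raising the logger level temporarily and restoring it in a `finally` block. A minimal standard-library sketch of that pattern; the logger name and the `build_quietly` helper are hypothetical.

```python
import logging

logger = logging.getLogger("sketch.factory")  # hypothetical logger name
logger.setLevel(logging.INFO)


def build_quietly(builder, verbose):
    prev_level = logger.level
    if not verbose:
        # Hide INFO messages emitted while the builder runs.
        logger.setLevel(logging.WARNING)
    try:
        return builder()
    finally:
        # Restore the previous level even if the builder raises.
        logger.setLevel(prev_level)


def noisy_builder():
    logger.info("building")
    # Report whether INFO records would currently be emitted.
    return logger.isEnabledFor(logging.INFO)


info_enabled_when_quiet = build_quietly(noisy_builder, verbose=False)
info_enabled_when_verbose = build_quietly(noisy_builder, verbose=True)
```

Because the level is changed on a shared logger object, this approach affects every caller of that logger for the duration of the build.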

log_pipeline_summary(test_metrics, best_model_path, run_dir, duration, onnx_path=None, logger_instance=None)

Log final pipeline completion summary.

Called at the end of the pipeline after all phases complete. Consolidates key metrics and artifact locations.

Parameters:

Name Type Description Default
test_metrics Mapping[str, float]

Task-specific test metrics mapping.

required
best_model_path Path

Path to best model checkpoint

required
run_dir Path

Root directory for this run

required
duration str

Human-readable duration string

required
onnx_path Path | None

Path to ONNX export (if performed)

None
logger_instance Logger | None

Logger instance to use (defaults to module logger)

None

Source code in orchard/core/logger/progress.py
def log_pipeline_summary(
    test_metrics: Mapping[str, float],
    best_model_path: Path,
    run_dir: Path,
    duration: str,
    onnx_path: Path | None = None,
    logger_instance: logging.Logger | None = None,
) -> None:
    """
    Log final pipeline completion summary.

    Called at the end of the pipeline after all phases complete.
    Consolidates key metrics and artifact locations.

    Args:
        test_metrics: Task-specific test metrics mapping.
        best_model_path: Path to best model checkpoint
        run_dir: Root directory for this run
        duration: Human-readable duration string
        onnx_path: Path to ONNX export (if performed)
        logger_instance: Logger instance to use (defaults to module logger)
    """
    log = logger_instance or logger

    I = LogStyle.INDENT  # noqa: E741
    A = LogStyle.ARROW
    S = LogStyle.SUCCESS

    Reporter.log_phase_header(log, "PIPELINE COMPLETE", LogStyle.DOUBLE)
    for key, value in test_metrics.items():
        label = key.replace("_", " ").title()
        if key == METRIC_ACCURACY:
            log.info("%s%s %-15s: %7.2f%%", I, S, label, value * 100)
        else:
            log.info("%s%s %-15s: %8.4f", I, S, label, value)
    log.info("%s%s Best Model     : %s", I, A, Path(best_model_path).name)
    if onnx_path:
        log.info("%s%s ONNX Export    : %s", I, A, Path(onnx_path).name)
    log.info("%s%s Run Directory  : %s", I, A, Path(run_dir).name)
    log.info("%s%s Duration       : %s", I, A, duration)
    log.info(LogStyle.DOUBLE)
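
The per-metric formatting rule in the loop above (title-cased label, accuracy shown as a percentage, everything else to four decimals) can be isolated like this. The sketch assumes `METRIC_ACCURACY` resolves to `"accuracy"`; `format_metric` is a hypothetical helper and the metric values are illustrative.

```python
METRIC_ACCURACY = "accuracy"  # assumed value of the constant


def format_metric(key, value):
    # "map_50" becomes "Map 50", "accuracy" becomes "Accuracy".
    label = key.replace("_", " ").title()
    if key == METRIC_ACCURACY:
        # Accuracy is stored as a fraction and displayed as a percentage.
        return "%-15s: %7.2f%%" % (label, value * 100)
    return "%-15s: %8.4f" % (label, value)


lines = [format_metric(k, v) for k, v in {"accuracy": 0.935, "map_50": 0.5}.items()]
```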

register_task(task_type, components)

Register a task-specific component bundle.

Parameters:

Name Type Description Default
task_type str

Identifier matching a TaskType literal (e.g. "classification").

required
components TaskComponents

Frozen bundle of strategy implementations.

required

Raises:

Type Description
OrchardConfigError

If task_type is already registered.

Source code in orchard/core/task_registry.py
def register_task(task_type: str, components: TaskComponents) -> None:
    """
    Register a task-specific component bundle.

    Args:
        task_type: Identifier matching a ``TaskType`` literal
            (e.g. ``"classification"``).
        components: Frozen bundle of strategy implementations.

    Raises:
        OrchardConfigError: If ``task_type`` is already registered.
    """
    if task_type in _TASK_REGISTRY:
        raise OrchardConfigError(
            f"Task '{task_type}' is already registered. "
            "Duplicate registration indicates a plugin conflict."
        )
    _TASK_REGISTRY[task_type] = components

run_export_phase(orchestrator, checkpoint_path, cfg=None)

Execute model export phase.

Exports trained model to production format (ONNX) with validation. Export format and opset version are read from cfg.export.

Parameters:

Name Type Description Default
orchestrator RootOrchestrator

Active RootOrchestrator providing paths, device, logger

required
checkpoint_path Path

Path to trained model checkpoint (.pth)

required
cfg Config | None

Optional config override (defaults to orchestrator's config)

None

Returns:

Type Description
Path | None

Path to the exported ONNX model, or None if the export config is absent or the task is detection (ONNX export is not yet supported for detection)

Example

with RootOrchestrator(cfg) as orch:
    best_path, *_ = run_training_phase(orch)
    onnx_path = run_export_phase(orch, best_path)
    print(f"Exported to: {onnx_path}")

Source code in orchard/pipeline/phases.py
def run_export_phase(
    orchestrator: RootOrchestrator,
    checkpoint_path: Path,
    cfg: Config | None = None,
) -> Path | None:
    """
    Execute model export phase.

    Exports trained model to production format (ONNX) with validation.
    Export format and opset version are read from ``cfg.export``.

    Args:
        orchestrator: Active RootOrchestrator providing paths, device, logger
        checkpoint_path: Path to trained model checkpoint (.pth)
        cfg: Optional config override (defaults to orchestrator's config)

    Returns:
        Path to exported model, or None if export config is absent or the
        task is detection (ONNX export not yet supported)

    Example:
        >>> with RootOrchestrator(cfg) as orch:
        ...     best_path, *_ = run_training_phase(orch)
        ...     onnx_path = run_export_phase(orch, best_path)
        ...     print(f"Exported to: {onnx_path}")
    """
    cfg = cfg or orchestrator.cfg

    if cfg.export is None:
        return None

    # TODO(detection): Remove pragma when ONNX export is implemented for detection
    if cfg.task_type == "detection":  # pragma: no mutate block
        import warnings

        warnings.warn(
            "ONNX export is not yet supported for detection tasks. Skipping.",
            UserWarning,
            stacklevel=2,
        )
        return None

    paths = orchestrator.paths
    run_logger = orchestrator.run_logger

    # type guards for MyPy
    assert run_logger is not None, _ERR_LOGGER_NOT_INIT  # nosec B101
    assert paths is not None, _ERR_PATHS_NOT_INIT  # nosec B101

    Reporter.log_phase_header(run_logger, "MODEL EXPORT")

    # Determine input shape from config (must match get_model's channel resolution)
    resolution = cfg.dataset.resolution
    input_shape = (cfg.dataset.effective_in_channels, resolution, resolution)

    # Export path (directory managed by RunPaths)
    onnx_path = paths.exports / "model.onnx"

    # Reload model architecture (on CPU for export)
    export_model = get_model(
        device=torch.device("cpu"),
        dataset_cfg=cfg.dataset,
        arch_cfg=cfg.architecture,
        verbose=False,
    )

    export_cfg = cfg.export  # guaranteed non-None (checked above)
    export_to_onnx(
        model=export_model,
        checkpoint_path=checkpoint_path,
        output_path=onnx_path,
        input_shape=input_shape,
        opset_version=export_cfg.opset_version,
        dynamic_axes=export_cfg.dynamic_axes,
        do_constant_folding=export_cfg.do_constant_folding,
        validate=export_cfg.validate_export,
    )

    # Post-export quantization
    quantized_path = None
    if export_cfg.quantize:
        quantized_path = quantize_model(
            onnx_path=onnx_path,
            backend=export_cfg.quantization_backend,
            weight_type=export_cfg.quantization_type,
        )

    # Numerical validation: compare PyTorch vs ONNX outputs
    if export_cfg.validate_export:
        _validate_exported_models(export_model, onnx_path, quantized_path, input_shape, export_cfg)

    # Inference latency benchmark
    if export_cfg.benchmark:
        _benchmark_exported_models(
            onnx_path, quantized_path, input_shape, export_cfg, cfg.training.seed
        )

    logger.info("  %s Export completed", LogStyle.SUCCESS)
    logger.info("    %s Output            : %s", LogStyle.ARROW, onnx_path.name)
    if quantized_path:
        logger.info("    %s Quantized         : %s", LogStyle.ARROW, quantized_path.name)

    return onnx_path
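
The two early exits and the input-shape derivation at the top of the function can be sketched without any Orchard imports. `plan_export` and its parameters are hypothetical; only the warning text and the `(channels, resolution, resolution)` layout come from the listing above.

```python
import warnings


def plan_export(task_type, in_channels, resolution, export_enabled=True):
    if not export_enabled:
        return None  # mirrors the cfg.export is None branch
    if task_type == "detection":
        warnings.warn(
            "ONNX export is not yet supported for detection tasks. Skipping.",
            UserWarning,
            stacklevel=2,
        )
        return None
    # Channels-first shape for a single square image, without a batch axis.
    return (in_channels, resolution, resolution)


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    detection_plan = plan_export("detection", 3, 224)

classification_plan = plan_export("classification", 3, 224)
disabled_plan = plan_export("classification", 3, 224, export_enabled=False)
```

Callers should therefore treat a `None` result as "nothing was exported" rather than as an error.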

run_optimization_phase(orchestrator, cfg=None, tracker=None)

Execute hyperparameter optimization phase.

Runs Optuna study with configured trials, pruning, and early stopping. Generates visualizations (if enabled) and exports best configuration.

Parameters:

Name Type Description Default
orchestrator RootOrchestrator

Active RootOrchestrator providing paths, device, logger

required
cfg Config | None

Optional config override (defaults to orchestrator's config)

None
tracker TrackerProtocol | None

Optional experiment tracker for MLflow nested trial logging

None

Returns:

Type Description
tuple[Study, Path | None]

tuple of (completed study, path to best_config.yaml or None)

Example

with RootOrchestrator(cfg) as orch:
    study, best_config_path = run_optimization_phase(orch)
    print(f"Best AUC: {study.best_value:.4f}")

Source code in orchard/pipeline/phases.py
def run_optimization_phase(
    orchestrator: RootOrchestrator,
    cfg: Config | None = None,
    tracker: TrackerProtocol | None = None,
) -> tuple[optuna.Study, Path | None]:
    """
    Execute hyperparameter optimization phase.

    Runs Optuna study with configured trials, pruning, and early stopping.
    Generates visualizations (if enabled) and exports best configuration.

    Args:
        orchestrator: Active RootOrchestrator providing paths, device, logger
        cfg: Optional config override (defaults to orchestrator's config)
        tracker: Optional experiment tracker for MLflow nested trial logging

    Returns:
        tuple of (completed study, path to best_config.yaml or None)

    Example:
        >>> with RootOrchestrator(cfg) as orch:
        ...     study, best_config_path = run_optimization_phase(orch)
        ...     print(f"Best AUC: {study.best_value:.4f}")
    """
    cfg = cfg or orchestrator.cfg
    paths = orchestrator.paths
    device = orchestrator.get_device()
    run_logger = orchestrator.run_logger

    # type guards for MyPy
    assert run_logger is not None, _ERR_LOGGER_NOT_INIT  # nosec B101
    assert paths is not None, _ERR_PATHS_NOT_INIT  # nosec B101

    Reporter.log_phase_header(run_logger, "HYPERPARAMETER OPTIMIZATION", LogStyle.DOUBLE)

    # Execute Optuna study (includes post-processing: visualizations, best config export)
    study = run_optimization(cfg=cfg, device=device, paths=paths, tracker=tracker)

    log_optimization_summary(
        study=study,
        cfg=cfg,
        device=device,
        paths=paths,
    )

    # Best config path is in reports dir (exported by orchestrator if save_best_config=True)
    candidate = paths.reports / BEST_CONFIG_FILENAME
    best_config_path: Path | None = candidate if candidate.exists() else None

    return study, best_config_path
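
The second element of the returned tuple is `None` unless the best-config file exists in the reports directory. A minimal sketch of that check, assuming `BEST_CONFIG_FILENAME` is `"best_config.yaml"` (the name used in the Returns description); `resolve_best_config` is a hypothetical helper.

```python
import tempfile
from pathlib import Path

BEST_CONFIG_FILENAME = "best_config.yaml"  # assumed value of the constant


def resolve_best_config(reports_dir):
    candidate = reports_dir / BEST_CONFIG_FILENAME
    # Return the path only when the file was actually written.
    return candidate if candidate.exists() else None


with tempfile.TemporaryDirectory() as tmp:
    reports = Path(tmp)
    before = resolve_best_config(reports)
    (reports / BEST_CONFIG_FILENAME).write_text("placeholder: true\n")
    after = resolve_best_config(reports)
    found_name = after.name if after is not None else None
```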

run_training_phase(orchestrator, cfg=None, tracker=None)

Execute model training phase.

Loads dataset, initializes model, runs training with validation, and performs final evaluation on test set.

Parameters:

Name Type Description Default
orchestrator RootOrchestrator

Active RootOrchestrator providing paths, device, logger

required
cfg Config | None

Optional config override (defaults to orchestrator's config)

None
tracker TrackerProtocol | None

Optional experiment tracker for MLflow metric logging

None

Returns:

Type Description
TrainingResult

TrainingResult named tuple with best_model_path, train_losses, val_metrics, model, test_metrics.

Example

with RootOrchestrator(cfg) as orch:
    result = run_training_phase(orch)
    print(f"Test metrics: {result.test_metrics}")

Source code in orchard/pipeline/phases.py
def run_training_phase(
    orchestrator: RootOrchestrator,
    cfg: Config | None = None,
    tracker: TrackerProtocol | None = None,
) -> TrainingResult:
    """
    Execute model training phase.

    Loads dataset, initializes model, runs training with validation,
    and performs final evaluation on test set.

    Args:
        orchestrator: Active RootOrchestrator providing paths, device, logger
        cfg: Optional config override (defaults to orchestrator's config)
        tracker: Optional experiment tracker for MLflow metric logging

    Returns:
        TrainingResult named tuple with best_model_path, train_losses,
        val_metrics, model, test_metrics.

    Example:
        >>> with RootOrchestrator(cfg) as orch:
        ...     result = run_training_phase(orch)
        ...     print(f"Test metrics: {result.test_metrics}")
    """
    cfg = cfg or orchestrator.cfg
    paths = orchestrator.paths
    device = orchestrator.get_device()
    run_logger = orchestrator.run_logger

    # type guards for MyPy
    assert run_logger is not None, _ERR_LOGGER_NOT_INIT  # nosec B101
    assert paths is not None, _ERR_PATHS_NOT_INIT  # nosec B101

    # Dataset metadata (respects data_root override via _ensure_metadata)
    ds_meta = cfg.dataset._ensure_metadata

    # DATA PREPARATION
    Reporter.log_phase_header(run_logger, "DATA PREPARATION")

    data = load_dataset(ds_meta)
    loaders = get_dataloaders(
        data,
        cfg.dataset,
        cfg.training,
        cfg.augmentation,
        cfg.num_workers,
        task_type=cfg.task_type,
    )
    train_loader, val_loader, test_loader = loaders

    # show_samples_for_dataset assumes stacked Tensor batches — detection
    # batches (list[Tensor]) would crash on denormalization.
    if cfg.task_type != "detection":  # pragma: no mutate
        show_samples_for_dataset(
            loader=train_loader,
            dataset_name=cfg.dataset.dataset_name,
            run_paths=paths,
            mean=cfg.dataset.mean,
            std=cfg.dataset.std,
            arch_name=cfg.architecture.name,
            fig_dpi=cfg.evaluation.fig_dpi,
            num_samples=cfg.evaluation.n_samples,
            resolution=cfg.dataset.resolution,
        )

    # MODEL TRAINING
    Reporter.log_phase_header(
        run_logger, "TRAINING PIPELINE - " + cfg.architecture.name.upper(), LogStyle.DOUBLE
    )

    model = get_model(device=device, dataset_cfg=cfg.dataset, arch_cfg=cfg.architecture)

    class_weights = None
    if cfg.task_type == "classification" and cfg.training.weighted_loss:
        ds = cast(VisionDataset, train_loader.dataset)  # pragma: no mutate
        train_labels = ds.labels.flatten()
        class_weights = compute_class_weights(train_labels, ds_meta.num_classes, device)

    task = get_task(cfg.task_type)
    criterion = task.criterion_factory.get_criterion(cfg.training, class_weights=class_weights)
    optimizer = get_optimizer(model, cfg.training)
    scheduler = get_scheduler(optimizer, cfg.training)

    trainer = ModelTrainer(
        model=model,
        train_loader=train_loader,
        val_loader=val_loader,
        optimizer=optimizer,
        scheduler=scheduler,
        criterion=criterion,
        device=device,
        training=cfg.training,
        output_path=paths.best_model_path,
        tracker=tracker,
        training_step=task.training_step,
        validation_metrics=task.validation_metrics,
    )

    best_model_path, train_losses, val_metrics_history = trainer.train()

    # FINAL EVALUATION
    Reporter.log_phase_header(run_logger, "FINAL EVALUATION")

    test_metrics = task.eval_pipeline.run_evaluation(
        model=model,
        test_loader=test_loader,
        train_losses=train_losses,
        val_metrics_history=val_metrics_history,
        class_names=ds_meta.classes,
        paths=paths,
        training=cfg.training,
        dataset=cfg.dataset,
        augmentation=cfg.augmentation,
        evaluation=cfg.evaluation,
        arch_name=cfg.architecture.name,
        aug_info=get_augmentations_description(
            cfg.augmentation,
            cast("int", cfg.dataset.img_size),
            cfg.training.mixup_alpha,
            ds_meta=ds_meta,
        ),
        tracker=tracker,
    )

    return TrainingResult(
        best_model_path=best_model_path,
        train_losses=train_losses,
        val_metrics=val_metrics_history,
        model=model,
        test_metrics=test_metrics,
    )
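
Because the result is a named tuple, it supports both attribute access and positional unpacking (the `best_path, *_ = ...` form used in the export example). `TrainingResultSketch` below is a hypothetical stand-in with the documented field names and simplified types; the file name and numbers are placeholders.

```python
from pathlib import Path
from typing import Any, NamedTuple


class TrainingResultSketch(NamedTuple):
    """Hypothetical stand-in mirroring the documented field order."""

    best_model_path: Path
    train_losses: list
    val_metrics: list
    model: Any
    test_metrics: dict


result = TrainingResultSketch(
    best_model_path=Path("best_model.pth"),  # placeholder file name
    train_losses=[0.9, 0.5],
    val_metrics=[{"map": 0.25}],
    model=None,
    test_metrics={"map": 0.5},
)

# Positional unpacking keeps only the first field.
best_path, *_ = result

# Attribute access does not depend on field order.
final_map = result.test_metrics["map"]
```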

create_tracker(cfg)

Factory: returns MLflowTracker if tracking is configured, else NoOpTracker.

Parameters:

Name Type Description Default
cfg Any

Config object. If cfg.tracking is set and enabled and mlflow is installed, returns MLflowTracker; otherwise returns NoOpTracker.

required

Returns:

Type Description
TrackerProtocol

Active tracker instance.

Source code in orchard/tracking/tracker.py
def create_tracker(cfg: Any) -> TrackerProtocol:
    """
    Factory: returns MLflowTracker if tracking is configured, else NoOpTracker.

    Args:
        cfg: Config object. If cfg.tracking is set and enabled, returns MLflowTracker.

    Returns:
        Active tracker instance.
    """
    tracking_cfg = getattr(cfg, "tracking", None)
    if tracking_cfg is None or not tracking_cfg.enabled:
        return NoOpTracker()

    if not _MLFLOW_AVAILABLE:
        logger.warning(
            "Tracking enabled in config but mlflow is not installed. "
            "Install with: pip install mlflow"
        )
        return NoOpTracker()

    return MLflowTracker(experiment_name=tracking_cfg.experiment_name)
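
The fallback logic has three outcomes: tracking absent or disabled, tracking enabled without the backend installed, and tracking fully available. The sketch below reproduces that decision with hypothetical stand-in classes; `backend_available` plays the role of the module-level `_MLFLOW_AVAILABLE` flag, and no MLflow API is used.

```python
from types import SimpleNamespace


class NoOpTrackerSketch:
    """Hypothetical tracker that accepts calls and does nothing."""

    def log_test_metrics(self, metrics):
        pass


class RecordingTrackerSketch:
    """Hypothetical stand-in for a real tracker backend."""

    def __init__(self, experiment_name):
        self.experiment_name = experiment_name


def make_tracker(cfg, backend_available):
    tracking_cfg = getattr(cfg, "tracking", None)
    if tracking_cfg is None or not tracking_cfg.enabled:
        return NoOpTrackerSketch()
    if not backend_available:
        # Enabled in config but the backend is missing: degrade gracefully.
        return NoOpTrackerSketch()
    return RecordingTrackerSketch(tracking_cfg.experiment_name)


enabled_cfg = SimpleNamespace(
    tracking=SimpleNamespace(enabled=True, experiment_name="demo")
)

no_section = make_tracker(SimpleNamespace(), backend_available=True)
no_backend = make_tracker(enabled_cfg, backend_available=False)
active = make_tracker(enabled_cfg, backend_available=True)
```

Returning a no-op object instead of `None` lets calling code invoke tracker methods unconditionally.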