Skip to content

orchestrator

orchard.core.orchestrator

Experiment Lifecycle Orchestration.

This module provides RootOrchestrator, the central coordinator for experiment execution. It manages the complete lifecycle from configuration validation to resource cleanup, ensuring deterministic and reproducible ML experiments.

Architecture:

  • Dependency Injection: All external dependencies are injectable for testability
  • 7-Phase Initialization: Phases 1-6 at construction, phase 7 deferred to CLI
  • Context Manager: Automatic resource acquisition and cleanup
  • Protocol-Based: type-safe abstractions for mockability

Key Components:

  • RootOrchestrator: Main lifecycle controller

Related Protocols (defined in their respective modules):

  • InfraManagerProtocol: config/infrastructure_config.py
  • ReporterProtocol: logger/env_reporter.py
  • TimeTrackerProtocol: environment/timing.py
  • AuditSaverProtocol: io/serialization.py
Example

from orchard.core import Config, RootOrchestrator cfg = Config.from_recipe(Path("recipes/config_mini_cnn.yaml")) with RootOrchestrator(cfg) as orchestrator: ... device = orchestrator.get_device() ... paths = orchestrator.paths ... # Run training pipeline phases

RootOrchestrator(cfg, infra_manager=None, reporter=None, time_tracker=None, audit_saver=None, log_initializer=None, seed_setter=None, thread_applier=None, system_configurator=None, static_dir_setup=None, device_resolver=None, rank=None, local_rank=None)

Central coordinator for ML experiment lifecycle management.

Orchestrates the complete initialization sequence from configuration validation through resource provisioning to execution readiness. Implements a 7-phase initialization protocol (phases 1-6 eager, phase 7 deferred) with dependency injection for maximum testability.

The orchestrator follows the Single Responsibility Principle by delegating specialized tasks to injected dependencies while maintaining overall coordination. Uses the Context Manager pattern to guarantee resource cleanup even during failures.

Initialization Phases:

  1. Determinism: Global RNG seeding (Python, NumPy, PyTorch)
  2. Runtime Configuration: CPU thread affinity, system libraries
  3. Filesystem Provisioning: Dynamic workspace creation via RunPaths
  4. Logging Initialization: File-based persistent logging setup
  5. Config Persistence: YAML manifest export for auditability
  6. Infrastructure Guarding: OS-level resource locks (prevents race conditions)
  7. Environment Reporting: Comprehensive telemetry logging

Dependency Injection:

All external dependencies are injectable with sensible defaults:

  • infra_manager: OS resource management (locks, cleanup)
  • reporter: Environment telemetry engine
  • log_initializer: Logging setup strategy
  • seed_setter: RNG seeding function
  • thread_applier: CPU thread configuration
  • system_configurator: System library setup (matplotlib, etc)
  • static_dir_setup: Static directory creation
  • audit_saver: Config YAML + requirements snapshot persistence
  • device_resolver: Hardware device detection

Attributes:

Name Type Description
cfg Config

Validated global configuration (Single Source of Truth)

rank int

Global rank of this process (0 in single-process mode)

local_rank int

Node-local rank for GPU assignment (0 in single-process mode)

is_main_process bool

True for rank 0, False for non-main ranks

infra InfraManagerProtocol

Infrastructure resource manager

reporter ReporterProtocol

Environment telemetry engine

time_tracker TimeTrackerProtocol

Pipeline duration tracker

paths RunPaths | None

Session-specific directory structure (None on non-main ranks)

run_logger Logger | None

Active logger instance (None on non-main ranks)

repro_mode bool

Strict determinism flag

warn_only_mode bool

Warn-only mode for strict determinism

num_workers int

DataLoader worker processes

Example

cfg = Config.from_recipe(Path("recipes/config_mini_cnn.yaml")) with RootOrchestrator(cfg) as orch: ... device = orch.get_device() ... logger = orch.run_logger ... paths = orch.paths ... # Execute training pipeline with guaranteed cleanup

Notes:

  • Thread-safe: Single-instance locking via InfrastructureManager
  • Idempotent: initialize_core_services() is safe to call multiple times (subsequent calls return cached RunPaths without re-executing phases)
  • Auditable: All configuration saved to YAML in workspace
  • Deterministic: Reproducible experiments via strict seeding

Initializes orchestrator with dependency injection.

Parameters:

Name Type Description Default
cfg 'Config'

Validated global configuration (SSOT)

required
infra_manager InfraManagerProtocol | None

Infrastructure management handler (default: InfrastructureManager())

None
reporter ReporterProtocol | None

Environment reporting engine (default: Reporter())

None
time_tracker TimeTrackerProtocol | None

Pipeline duration tracker (default: TimeTracker())

None
audit_saver AuditSaverProtocol | None

Run-manifest persistence — config YAML + dependency snapshot (default: AuditSaver())

None
log_initializer Callable[..., Any] | None

Logging setup function (default: Logger.setup)

None
seed_setter Callable[..., Any] | None

RNG seeding function (default: set_seed)

None
thread_applier Callable[..., Any] | None

CPU thread configuration (default: apply_cpu_threads)

None
system_configurator Callable[..., Any] | None

System library setup (default: configure_system_libraries)

None
static_dir_setup Callable[..., Any] | None

Static directory creation (default: setup_static_directories)

None
device_resolver Callable[..., Any] | None

Device resolution (default: to_device_obj)

None
rank int | None

Global rank of this process (default: auto-detected from RANK env var). Rank 0 executes all phases; rank N skips filesystem, logging, config persistence, infrastructure locking, and reporting.

None
local_rank int | None

Node-local rank for GPU assignment (default: auto-detected from LOCAL_RANK env var). Used by device_resolver to select the correct GPU in multi-GPU distributed setups.

None
Source code in orchard/core/orchestrator.py
def __init__(
    self,
    cfg: "Config",
    infra_manager: InfraManagerProtocol | None = None,
    reporter: ReporterProtocol | None = None,
    time_tracker: TimeTrackerProtocol | None = None,
    audit_saver: AuditSaverProtocol | None = None,
    log_initializer: Callable[..., Any] | None = None,
    seed_setter: Callable[..., Any] | None = None,
    thread_applier: Callable[..., Any] | None = None,
    system_configurator: Callable[..., Any] | None = None,
    static_dir_setup: Callable[..., Any] | None = None,
    device_resolver: Callable[..., Any] | None = None,
    rank: int | None = None,
    local_rank: int | None = None,
) -> None:
    """
    Initializes orchestrator with dependency injection.

    Args:
        cfg: Validated global configuration (SSOT)
        infra_manager: Infrastructure management handler (default: InfrastructureManager())
        reporter: Environment reporting engine (default: Reporter())
        time_tracker: Pipeline duration tracker (default: TimeTracker())
        audit_saver: Run-manifest persistence — config YAML + dependency
            snapshot (default: AuditSaver())
        log_initializer: Logging setup function (default: Logger.setup)
        seed_setter: RNG seeding function (default: set_seed)
        thread_applier: CPU thread configuration (default: apply_cpu_threads)
        system_configurator: System library setup (default: configure_system_libraries)
        static_dir_setup: Static directory creation (default: setup_static_directories)
        device_resolver: Device resolution (default: to_device_obj)
        rank: Global rank of this process (default: auto-detected from RANK env var).
            Rank 0 executes all phases; rank N skips filesystem, logging,
            config persistence, infrastructure locking, and reporting.
        local_rank: Node-local rank for GPU assignment (default: auto-detected
            from LOCAL_RANK env var). Used by device_resolver to select the
            correct GPU in multi-GPU distributed setups.
    """
    self.cfg = cfg

    # Dependency injection: _resolve for objects, _resolve_callable for functions
    self.rank = _resolve(rank, get_rank)
    self.local_rank = _resolve(local_rank, get_local_rank)
    self.is_main_process = self.rank == 0
    self.infra = _resolve(infra_manager, InfrastructureManager)
    self.reporter = _resolve(reporter, Reporter)
    self.time_tracker = _resolve(time_tracker, TimeTracker)
    self._audit_saver = _resolve(audit_saver, AuditSaver)
    self._log_initializer = _resolve_callable(log_initializer, Logger.setup)
    self._seed_setter = _resolve_callable(seed_setter, set_seed)
    self._thread_applier = _resolve_callable(thread_applier, apply_cpu_threads)
    self._system_configurator = _resolve_callable(
        system_configurator, configure_system_libraries
    )
    self._static_dir_setup = _resolve_callable(static_dir_setup, setup_static_directories)
    self._device_resolver = _resolve_callable(device_resolver, to_device_obj)

    # Lazy initialization
    self._initialized: bool = False
    self._cleaned_up: bool = False
    self._infra_lock_acquired: bool = False
    self._applied_threads: int = 0
    self.paths: RunPaths | None = None
    self.run_logger: logging.Logger | None = None
    self._device_cache: torch.device | None = None

    # Policy extraction from SSOT
    self.repro_mode = self.cfg.hardware.use_deterministic_algorithms
    self.warn_only_mode = self.cfg.hardware.deterministic_warn_only
    self.num_workers = self.cfg.hardware.effective_num_workers

__enter__()

Context Manager entry — triggers the initialization sequence.

Starts the pipeline timer and delegates to initialize_core_services() for phases 1-6 (seeding, runtime config, filesystem, logging, config persistence, infrastructure locking, and device resolution). Phase 7 (environment reporting) is deferred to log_environment_report().

If any phase raises (including KeyboardInterrupt / SystemExit), cleanup() is called before re-raising to ensure partial resources (locks, file handles) are released even on failure.

Returns:

Type Description
'RootOrchestrator'

Fully initialized RootOrchestrator ready for pipeline execution.

Raises:

Type Description
BaseException

Re-raises any initialization error after cleanup.

Source code in orchard/core/orchestrator.py
def __enter__(self) -> "RootOrchestrator":
    """
    Context Manager entry — triggers the initialization sequence.

    Starts the pipeline timer and delegates to initialize_core_services()
    for phases 1-6 (seeding, runtime config, filesystem, logging,
    config persistence, infrastructure locking, and device resolution).
    Phase 7 (environment reporting) is deferred to log_environment_report().

    If any phase raises (including KeyboardInterrupt / SystemExit),
    cleanup() is called before re-raising to ensure partial resources
    (locks, file handles) are released even on failure.

    Returns:
        Fully initialized RootOrchestrator ready for pipeline execution.

    Raises:
        BaseException: Re-raises any initialization error after cleanup.
    """
    try:
        self.time_tracker.start()
        self.initialize_core_services()
        return self
    except BaseException:
        self.cleanup()
        raise

__exit__(exc_type, exc_val, exc_tb)

Context Manager exit — stops timer and guarantees resource teardown.

Invoked automatically when leaving the with block, whether the pipeline completed normally or raised an exception. Stops the timer, then delegates to cleanup() for infrastructure lock release and logging handler closure.

Error reporting is intentionally left to the caller (CLI layer), which has the user-facing context to log appropriate messages.

Returns False so that any exception propagates to the caller unchanged.

Parameters:

Name Type Description Default
exc_type type[BaseException] | None

Exception class if the block raised, else None.

required
exc_val BaseException | None

Exception instance if the block raised, else None.

required
exc_tb TracebackType | None

Traceback object if the block raised, else None.

required

Returns:

Type Description
Literal[False]

Always False — exceptions are never suppressed.

Source code in orchard/core/orchestrator.py
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> Literal[False]:
    """
    Context Manager exit — stops timer and guarantees resource teardown.

    Invoked automatically when leaving the ``with`` block, whether the
    pipeline completed normally or raised an exception. Stops the timer,
    then delegates to cleanup() for infrastructure lock release and
    logging handler closure.

    Error reporting is intentionally left to the caller (CLI layer),
    which has the user-facing context to log appropriate messages.

    Returns False so that any exception propagates to the caller unchanged.

    Args:
        exc_type: Exception class if the block raised, else None.
        exc_val: Exception instance if the block raised, else None.
        exc_tb: Traceback object if the block raised, else None.

    Returns:
        Always False — exceptions are never suppressed.
    """
    # Stop timer (duration already shown in pipeline summary)
    self.time_tracker.stop()

    self.cleanup()
    return False

initialize_core_services()

Executes linear sequence of environment initialization phases.

Synchronizes global state through phases 1-6, progressing from deterministic seeding to device resolution. Phase 7 (environment reporting) is deferred to log_environment_report().

In distributed mode (torchrun / DDP), only the main process (rank 0) executes phases 3-6 (filesystem, logging, config persistence, infra locking). All ranks execute phases 1-2 (seeding, threads) for identical RNG state and thread affinity, plus device resolution for DDP readiness (each rank binds to cuda:{local_rank}).

Idempotent: guarded by _initialized flag. If already initialized, returns existing RunPaths without re-executing any phase. This prevents orphaned directories (Phase 3 creates unique paths per call) and resource leaks (Phase 6 acquires filesystem locks).

Returns:

Type Description
RunPaths | None

Provisioned directory structure for rank 0, None for non-main ranks.

Raises:

Type Description
RuntimeError

If called after cleanup (single-use guard).

OrchardDeviceError

If device resolution fails at runtime.

Source code in orchard/core/orchestrator.py
def initialize_core_services(self) -> RunPaths | None:
    """
    Executes linear sequence of environment initialization phases.

    Synchronizes global state through phases 1-6, progressing from
    deterministic seeding to device resolution. Phase 7 (environment
    reporting) is deferred to log_environment_report().

    In distributed mode (torchrun / DDP), only the main process (rank 0)
    executes phases 3-6 (filesystem, logging, config persistence, infra
    locking).  All ranks execute phases 1-2 (seeding, threads) for
    identical RNG state and thread affinity, plus device resolution
    for DDP readiness (each rank binds to ``cuda:{local_rank}``).

    Idempotent: guarded by ``_initialized`` flag. If already initialized,
    returns existing RunPaths without re-executing any phase. This prevents
    orphaned directories (Phase 3 creates unique paths per call) and
    resource leaks (Phase 6 acquires filesystem locks).

    Returns:
        Provisioned directory structure for rank 0, None for non-main ranks.

    Raises:
        RuntimeError: If called after cleanup (single-use guard).
        OrchardDeviceError: If device resolution fails at runtime.
    """
    if self._cleaned_up:
        raise RuntimeError(
            "Cannot re-initialize after cleanup — "
            "RootOrchestrator is a single-use context manager"
        )
    if self._initialized:
        return self.paths

    # All ranks: deterministic seeding and thread configuration
    self._phase_1_determinism()
    applied_threads = self._phase_2_runtime_configuration()

    # Rank 0 only: filesystem, logging, persistence, locking, reporting
    if self.is_main_process:
        self._phase_3_filesystem_provisioning()
        self._phase_4_logging_initialization()

        # type guards: paths and logger are guaranteed after phases 3-4
        assert self.paths is not None, "Paths not initialized after phase 3"  # nosec B101
        assert self.run_logger is not None, "Logger not initialized after phase 4"  # nosec B101

        self._phase_5_run_manifest()
        self._phase_6_infrastructure_guarding()

        try:
            self._device_cache = self.get_device()
        except RuntimeError as e:
            # resolve_device in HardwareConfig already handles GPU-unavailable
            # at config-time. If we reach here with device="cuda" in config,
            # CUDA was available then — a runtime failure (e.g. driver crash)
            # is unrecoverable. Silently falling back to CPU would waste hours
            # of compute with GPU-tuned hyperparameters (batch size, mixed
            # precision, etc.). Fail fast so the user can fix the environment.
            raise OrchardDeviceError(
                f"{LogStyle.FAILURE} Device resolution failed at runtime "
                f"(config requested '{self.cfg.hardware.device}'): {e}"
            ) from e

    else:
        logger.debug("Rank %d: skipping phases 3-6 (non-main process).", self.rank)
        # Non-main ranks still need their device for DDP readiness
        try:
            self._device_cache = self.get_device()
        except RuntimeError as e:
            raise OrchardDeviceError(
                f"{LogStyle.FAILURE} Device resolution failed at runtime "
                f"(config requested '{self.cfg.hardware.device}'): {e}"
            ) from e

    self._applied_threads = applied_threads
    self._initialized = True
    return self.paths

log_environment_report()

Emit the environment initialization report (phase 7).

Designed to be called explicitly by the CLI app after external services (e.g. MLflow tracker) have been started, so that all enter/exit log messages appear in the correct chronological order.

Source code in orchard/core/orchestrator.py
def log_environment_report(self) -> None:
    """
    Emit the environment initialization report (phase 7).

    Designed to be called explicitly by the CLI app after external
    services (e.g. MLflow tracker) have been started, so that all
    enter/exit log messages appear in the correct chronological order.
    """
    if self._initialized and self.is_main_process:
        self._phase_7_environment_report(self._applied_threads)

cleanup()

Releases system resources and removes execution lock file.

Guarantees clean state for subsequent runs by unlinking InfrastructureManager guards and closing logging handlers. Non-main ranks skip resource release (they never acquired locks or opened file-based log handlers).

Source code in orchard/core/orchestrator.py
def cleanup(self) -> None:
    """
    Releases system resources and removes execution lock file.

    Guarantees clean state for subsequent runs by unlinking
    InfrastructureManager guards and closing logging handlers.
    Non-main ranks skip resource release (they never acquired locks
    or opened file-based log handlers).
    """
    if not self.is_main_process:
        self._cleaned_up = True
        return

    cleanup_logger = self.run_logger or logging.getLogger(LOGGER_NAME)
    try:
        if self._infra_lock_acquired:
            self.infra.release_resources(self.cfg, logger=cleanup_logger)
            self._infra_lock_acquired = False
    except (OSError, RuntimeError) as e:
        cleanup_logger.error("Failed to release system lock: %s", e)

    self._close_logging_handlers()
    self._cleaned_up = True

get_device()

Resolves and caches optimal computation device (CUDA/CPU/MPS).

Returns:

Type Description
device

PyTorch device object for model execution

Source code in orchard/core/orchestrator.py
def get_device(self) -> torch.device:
    """
    Resolves and caches optimal computation device (CUDA/CPU/MPS).

    Returns:
        PyTorch device object for model execution
    """
    if self._device_cache is None:
        self._device_cache = self._device_resolver(
            device_str=self.cfg.hardware.device,
            local_rank=self.local_rank,
        )
    return self._device_cache