orchard.core

Core Utilities Package.

This package exposes the essential components for configuration, logging, system management, project constants, and the dynamic dataset registry. It also includes the RootOrchestrator, which manages experiment lifecycle initialization.

InfraManagerProtocol

Bases: Protocol

Protocol defining infrastructure management interface.

Enables dependency injection and mocking in tests while ensuring consistent lifecycle management across implementations.

prepare_environment(cfg, logger)

Prepare execution environment before experiment run.

Parameters:

  • cfg ('HardwareAwareConfig'): Configuration with hardware manifest access. [required]
  • logger (Logger): Logger instance for status reporting. [required]
Source code in orchard/core/config/infrastructure_config.py
def prepare_environment(self, cfg: "HardwareAwareConfig", logger: logging.Logger) -> None:
    """
    Prepare execution environment before experiment run.

    Args:
        cfg: Configuration with hardware manifest access.
        logger: Logger instance for status reporting.
    """
    ...  # pragma: no cover

release_resources(cfg, logger)

Release resources allocated during environment preparation.

Parameters:

  • cfg ('HardwareAwareConfig'): Configuration used during resource allocation. [required]
  • logger (Logger): Logger instance for status reporting. [required]
Source code in orchard/core/config/infrastructure_config.py
def release_resources(self, cfg: "HardwareAwareConfig", logger: logging.Logger) -> None:
    """
    Release resources allocated during environment preparation.

    Args:
        cfg: Configuration used during resource allocation.
        logger: Logger instance for status reporting.
    """
    ...  # pragma: no cover
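Because InfraManagerProtocol is a structural Protocol, any class with matching method signatures satisfies it; no inheritance is required. A minimal sketch of a no-op test double (the class name NoOpInfraManager is illustrative, not part of the package):

```python
import logging
from typing import Protocol, runtime_checkable


@runtime_checkable
class InfraManagerProtocol(Protocol):
    # Structural re-statement of the protocol shown above.
    def prepare_environment(self, cfg, logger: logging.Logger) -> None: ...
    def release_resources(self, cfg, logger: logging.Logger) -> None: ...


class NoOpInfraManager:
    """Hypothetical test double: records calls instead of touching the OS."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    def prepare_environment(self, cfg, logger: logging.Logger) -> None:
        self.calls.append("prepare")

    def release_resources(self, cfg, logger: logging.Logger) -> None:
        self.calls.append("release")
```

This is the shape the dependency-injection hooks in RootOrchestrator expect: swap the double in for tests, the real InfrastructureManager in production.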

TimeTracker()

Default implementation of TimeTrackerProtocol.

Tracks elapsed time between start() and stop() calls, providing both raw seconds and formatted output.

Source code in orchard/core/environment/timing.py
def __init__(self) -> None:
    self._start_time: float | None = None
    self._end_time: float | None = None

elapsed_seconds property

Total elapsed time in seconds.

elapsed_formatted property

Human-readable elapsed time string (e.g., '1h 23m 45s').

start()

Record pipeline start time.

Source code in orchard/core/environment/timing.py
def start(self) -> None:
    """Record pipeline start time."""
    self._start_time = time.time()
    self._end_time = None

stop()

Record stop time and return elapsed seconds.

Source code in orchard/core/environment/timing.py
def stop(self) -> float:
    """Record stop time and return elapsed seconds."""
    self._end_time = time.time()
    return self.elapsed_seconds
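The elapsed_seconds and elapsed_formatted property bodies are not reproduced above; a self-contained sketch of the start/stop contract, assuming elapsed_seconds is a plain end-minus-start subtraction:

```python
import time


class TimeTrackerSketch:
    """Illustrative stand-in for TimeTracker; the property body is assumed."""

    def __init__(self) -> None:
        self._start_time: float | None = None
        self._end_time: float | None = None

    def start(self) -> None:
        self._start_time = time.time()
        self._end_time = None  # allow restarting the tracker

    def stop(self) -> float:
        self._end_time = time.time()
        return self.elapsed_seconds

    @property
    def elapsed_seconds(self) -> float:
        # Assumed implementation: end (or now, if still running) minus start.
        if self._start_time is None:
            return 0.0
        end = self._end_time if self._end_time is not None else time.time()
        return end - self._start_time
```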

TimeTrackerProtocol

Bases: Protocol

Protocol for pipeline duration tracking.

elapsed_seconds property

Total elapsed time in seconds.

elapsed_formatted property

Human-readable elapsed time string.

start()

Record pipeline start time.

Source code in orchard/core/environment/timing.py
def start(self) -> None:
    """Record pipeline start time."""
    ...  # pragma: no cover

stop()

Record stop time and return elapsed seconds.

Source code in orchard/core/environment/timing.py
def stop(self) -> float:
    """Record stop time and return elapsed seconds."""
    ...  # pragma: no cover

Logger(name=LOGGER_NAME, log_dir=None, log_to_file=True, level=logging.INFO, max_bytes=5 * 1024 * 1024, backup_count=5)

Manages centralized logging configuration with singleton-like behavior.

Provides a unified logging interface for the entire framework with support for dynamic reconfiguration. Initially bootstraps with console-only output, then transitions to dual console+file logging when experiment directories become available.

The logger implements pseudo-singleton semantics via class-level tracking (_configured_names) to prevent duplicate handler registration while allowing intentional reconfiguration when log directories are provided.

Lifecycle
  1. Bootstrap Phase: Console-only logging (no log_dir specified)
  2. Orchestration Phase: RootOrchestrator calls setup() with log_dir
  3. Reconfiguration: Existing handlers removed, file handler added

Class Attributes: _configured_names (dict[str, bool]): Tracks which logger names have been configured

Attributes:

  • name (str): Logger identifier (typically LOGGER_NAME constant)
  • log_dir (Path | None): Directory for log file storage
  • log_to_file (bool): Enable file logging (requires log_dir)
  • level (int): Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
  • max_bytes (int): Maximum log file size before rotation (default: 5MB)
  • backup_count (int): Number of rotated log files to retain (default: 5)
  • _log (Logger): Underlying Python logger instance

Example

>>> # Bootstrap phase (console-only)
>>> logger = Logger().get_logger()
>>> logger.info("Framework initializing...")

>>> # Orchestration phase (add file logging)
>>> logger = Logger.setup(
...     name=LOGGER_NAME,
...     log_dir=Path("./outputs/run_123/logs"),
...     level="INFO"
... )
>>> logger.info("Logging to file now")

Notes:

  • Reconfiguration is idempotent: calling setup() multiple times is safe
  • All handlers are properly closed before reconfiguration
  • Log files use UTC timestamps for consistency across time zones
  • RotatingFileHandler prevents disk space exhaustion

Initializes the Logger with specified configuration.

Parameters:

  • name (str): Logger identifier (default: LOGGER_NAME constant)
  • log_dir (Path | None): Directory for log file storage (None = console-only) (default: None)
  • log_to_file (bool): Enable file logging if log_dir provided (default: True)
  • level (int): Logging level as integer constant (default: logging.INFO)
  • max_bytes (int): Maximum log file size before rotation in bytes (default: 5 * 1024 * 1024)
  • backup_count (int): Number of rotated backup files to retain (default: 5)
Source code in orchard/core/logger/logger.py
def __init__(
    self,
    name: str = LOGGER_NAME,
    log_dir: Path | None = None,
    log_to_file: bool = True,
    level: int = logging.INFO,
    max_bytes: int = 5 * 1024 * 1024,
    backup_count: int = 5,
) -> None:
    """
    Initializes the Logger with specified configuration.

    Args:
        name: Logger identifier (default: LOGGER_NAME constant)
        log_dir: Directory for log file storage (None = console-only)
        log_to_file: Enable file logging if log_dir provided (default: True)
        level: Logging level as integer constant (default: logging.INFO)
        max_bytes: Maximum log file size before rotation in bytes (default: 5MB)
        backup_count: Number of rotated backup files to retain (default: 5)
    """
    self.name = name
    self.log_dir = log_dir
    self.log_to_file = log_to_file and (log_dir is not None)
    self.level = level
    self.max_bytes = max_bytes
    self.backup_count = backup_count

    self._log = logging.getLogger(name)

    if name not in Logger._configured_names or log_dir is not None:
        self._setup_logger()
        Logger._configured_names[name] = True

get_logger()

Returns the configured logging.Logger instance.

Returns:

  • Logger: The underlying Python logging.Logger instance with configured handlers

Source code in orchard/core/logger/logger.py
def get_logger(self) -> logging.Logger:
    """
    Returns the configured logging.Logger instance.

    Returns:
        The underlying Python logging.Logger instance with configured handlers
    """
    return self._log

setup(name, log_dir=None, level='INFO', **kwargs) classmethod

Main entry point for configuring the logger, called by RootOrchestrator.

Bridges semantic LogLevel strings (INFO, DEBUG, WARNING) to Python logging constants. Provides convenient string-based level specification while internally using numeric logging constants.

Parameters:

  • name (str): Logger identifier (typically LOGGER_NAME constant) [required]
  • log_dir (Path | None): Directory for log file storage (None = console-only mode) (default: None)
  • level (str): Logging level as string (DEBUG, INFO, WARNING, ERROR, CRITICAL) (default: 'INFO')
  • **kwargs (Any): Additional arguments passed to Logger constructor (default: {})

Returns:

  • Logger: Configured logging.Logger instance ready for use

Environment Variables

DEBUG: If set to "1", overrides level to DEBUG regardless of level parameter

Example

>>> logger = Logger.setup(
...     name="OrchardML",
...     log_dir=Path("./outputs/run_123/logs"),
...     level="INFO"
... )
>>> logger.info("Training started")

Source code in orchard/core/logger/logger.py
@classmethod
def setup(
    cls, name: str, log_dir: Path | None = None, level: str = "INFO", **kwargs: Any
) -> logging.Logger:
    """
    Main entry point for configuring the logger, called by RootOrchestrator.

    Bridges semantic LogLevel strings (INFO, DEBUG, WARNING) to Python logging
    constants. Provides convenient string-based level specification while internally
    using numeric logging constants.

    Args:
        name: Logger identifier (typically LOGGER_NAME constant)
        log_dir: Directory for log file storage (None = console-only mode)
        level: Logging level as string (DEBUG, INFO, WARNING, ERROR, CRITICAL)
        **kwargs (Any): Additional arguments passed to Logger constructor

    Returns:
        Configured logging.Logger instance ready for use

    Environment Variables:
        DEBUG: If set to "1", overrides level to DEBUG regardless of level parameter

    Example:
        >>> logger = Logger.setup(
        ...     name="OrchardML",
        ...     log_dir=Path("./outputs/run_123/logs"),
        ...     level="INFO"
        ... )
        >>> logger.info("Training started")
    """
    if os.getenv("DEBUG") == "1":
        numeric_level = logging.DEBUG
    else:
        numeric_level = getattr(logging, level.upper(), logging.INFO)

    return cls(name=name, log_dir=log_dir, level=numeric_level, **kwargs).get_logger()
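The level resolution at the end of setup() can be exercised in isolation. A sketch mirroring its two rules (DEBUG=1 in the environment wins; unknown level strings fall back to INFO):

```python
import logging
import os


def resolve_level(level: str) -> int:
    """Mirror of Logger.setup's level resolution, for illustration only."""
    if os.getenv("DEBUG") == "1":
        return logging.DEBUG  # env override beats the explicit parameter
    # getattr falls back to INFO for unrecognized level names
    return getattr(logging, level.upper(), logging.INFO)
```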

LogStyle

Unified logging style constants for consistent visual hierarchy.

Provides separators, symbols, indentation, and ANSI color codes used by all logging modules. Placed here (in paths.constants) rather than in logger.styles so that low-level packages (environment, config) can reference the constants without triggering circular imports.

Reporter

Bases: BaseModel

Centralized logging and reporting utility for experiment lifecycle events.

Transforms complex configuration states and hardware objects into human-readable logs. Called by Orchestrator during initialization.

log_phase_header(log, title, style=None) staticmethod

Log a centered phase header with separator lines.

Parameters:

  • log (Logger): Logger instance to write to. [required]
  • title (str): Header text (will be uppercased and centered). [required]
  • style (str | None): Separator string (defaults to LogStyle.HEAVY). (default: None)
Source code in orchard/core/logger/env_reporter.py
@staticmethod
def log_phase_header(
    log: logging.Logger,
    title: str,
    style: str | None = None,
) -> None:
    """
    Log a centered phase header with separator lines.

    Args:
        log: Logger instance to write to.
        title: Header text (will be uppercased and centered).
        style: Separator string (defaults to ``LogStyle.HEAVY``).
    """
    sep = style if style is not None else LogStyle.HEAVY
    log.info("")
    log.info(sep)
    log.info(title.center(LogStyle.HEADER_WIDTH))
    log.info(sep)
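The resulting layout is a blank line, a separator, the centered title, and a closing separator. A sketch with stand-in values for LogStyle.HEAVY and LogStyle.HEADER_WIDTH (the real constants live in paths.constants):

```python
HEAVY = "=" * 60   # stand-in for LogStyle.HEAVY
HEADER_WIDTH = 60  # stand-in for LogStyle.HEADER_WIDTH


def render_phase_header(title: str) -> list[str]:
    """Returns the lines log_phase_header would emit, for illustration."""
    return ["", HEAVY, title.center(HEADER_WIDTH), HEAVY]


lines = render_phase_header("ENVIRONMENT INITIALIZATION")
```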

log_initial_status(logger_instance, cfg, paths, device, applied_threads, num_workers)

Logs verified baseline environment configuration upon initialization.

Parameters:

Name Type Description Default
logger_instance Logger

Active experiment logger

required
cfg 'Config'

Validated global configuration manifest

required
paths 'RunPaths'

Dynamic path orchestrator for current session

required
device 'torch.device'

Resolved PyTorch compute device

required
applied_threads int

Number of intra-op threads assigned

required
num_workers int

Number of DataLoader workers

required
Source code in orchard/core/logger/env_reporter.py
def log_initial_status(
    self,
    logger_instance: logging.Logger,
    cfg: "Config",
    paths: "RunPaths",
    device: "torch.device",
    applied_threads: int,
    num_workers: int,
) -> None:
    """
    Logs verified baseline environment configuration upon initialization.

    Args:
        logger_instance: Active experiment logger
        cfg: Validated global configuration manifest
        paths: Dynamic path orchestrator for current session
        device: Resolved PyTorch compute device
        applied_threads: Number of intra-op threads assigned
        num_workers: Number of DataLoader workers
    """
    # Header Block
    Reporter.log_phase_header(
        logger_instance, "ENVIRONMENT INITIALIZATION"
    )  # pragma: no mutate

    I = LogStyle.INDENT  # noqa: E741
    A = LogStyle.ARROW

    # Experiment identifier
    logger_instance.info("%s%s %-18s: %s", I, A, "Experiment", cfg.run_slug)
    logger_instance.info("")

    # Task Section
    logger_instance.info("[TASK]")
    logger_instance.info("%s%s %-18s: %s", I, A, "Type", cfg.task_type.capitalize())
    logger_instance.info("")

    # Hardware Section
    self._log_hardware_section(logger_instance, cfg, device, applied_threads, num_workers)
    logger_instance.info("")

    # Dataset Section
    self._log_dataset_section(logger_instance, cfg)
    logger_instance.info("")

    # Strategy Section
    self._log_strategy_section(logger_instance, cfg, device)
    logger_instance.info("")

    # Hyperparameters Section
    logger_instance.info("[HYPERPARAMETERS]")
    logger_instance.info("%s%s %-18s: %s", I, A, "Epochs", cfg.training.epochs)
    logger_instance.info("%s%s %-18s: %s", I, A, "Batch Size", cfg.training.batch_size)
    lr = cfg.training.learning_rate
    lr_str = f"{lr:.2e}" if isinstance(lr, (float, int)) else str(lr)
    logger_instance.info("%s%s %-18s: %s", I, A, "Initial LR", lr_str)
    logger_instance.info("")

    # Tracking Section (only if configured)
    self._log_tracking_section(logger_instance, cfg)

    # Optimization Section (only if configured)
    self._log_optimization_section(logger_instance, cfg)

    # Export Section (only if configured)
    self._log_export_section(logger_instance, cfg)

    # Filesystem Section
    logger_instance.info("[FILESYSTEM]")
    logger_instance.info("%s%s %-18s: %s", I, A, "Run Root", paths.root.name)
    logger_instance.info(
        "%s%s %-18s: config.yaml, requirements.txt, git_info.txt", I, A, "Manifest"
    )

    # Closing separator
    logger_instance.info(LogStyle.HEAVY)
    logger_instance.info("")
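Every key-value line above uses the same "%s%s %-18s: %s" template, which left-pads the key to 18 characters so the colons align into a column. A quick sketch with stand-in values for LogStyle.INDENT and LogStyle.ARROW:

```python
INDENT = "  "  # stand-in for LogStyle.INDENT
ARROW = "->"   # stand-in for LogStyle.ARROW

# Same %-style template as log_initial_status: key is padded to 18 chars.
line = "%s%s %-18s: %s" % (INDENT, ARROW, "Epochs", 10)
```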

DatasetMetadata

Bases: BaseModel

Immutable metadata container for a dataset entry.

Ensures dataset-specific constants are grouped and frozen throughout pipeline execution. Serves as the static definition that feeds the dynamic DatasetConfig.

Attributes:

  • name (str): Short identifier (e.g., 'pathmnist', 'galaxy10').
  • display_name (str): Human-readable name for reporting.
  • md5_checksum (str): MD5 hash for download integrity verification.
  • url (str): Source URL for dataset download.
  • path (Path): Local path to the .npz archive.
  • classes (list[str]): Class labels in index order.
  • in_channels (int): Number of image channels (1=grayscale, 3=RGB).
  • native_resolution (int | None): Native pixel resolution (e.g., 28, 224).
  • mean (tuple[float, ...]): Channel-wise normalization mean.
  • std (tuple[float, ...]): Channel-wise normalization standard deviation.
  • is_anatomical (bool): Whether images have fixed anatomical orientation.
  • is_texture_based (bool): Whether classification relies on texture patterns.

normalization_info property

Formatted mean/std for reporting.

resolution_str property

Formatted resolution string (e.g., '28x28', '224x224').

num_classes property

Total number of target classes.

DatasetRegistryWrapper

Bases: BaseModel

Pydantic wrapper for multi-domain dataset registries.

Merges domain-specific registries (medical, space) based on the selected resolution and provides validated, deep-copied access to dataset metadata entries.

Attributes:

  • resolution (int): Target dataset resolution (28, 32, 64, 128, or 224).
  • registry (dict[str, DatasetMetadata]): Deep-copied metadata registry for the selected resolution.

get_dataset(name)

Retrieves specific DatasetMetadata by name.

Parameters:

  • name (str): Dataset identifier [required]

Returns:

  • DatasetMetadata: Deep copy of DatasetMetadata

Raises:

  • KeyError: If dataset not found in registry

Source code in orchard/core/metadata/wrapper.py
def get_dataset(self, name: str) -> DatasetMetadata:
    """
    Retrieves specific DatasetMetadata by name.

    Args:
        name: Dataset identifier

    Returns:
        Deep copy of DatasetMetadata

    Raises:
        KeyError: If dataset not found in registry
    """
    if name not in self.registry:
        available = list(self.registry.keys())
        raise KeyError(f"Dataset '{name}' not found. Available: {available}")

    return copy.deepcopy(self.registry[name])
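The deep copy is the point: callers can mutate the returned metadata without corrupting the shared registry. A self-contained sketch of the same lookup-or-KeyError pattern using plain dicts:

```python
import copy

# Toy registry standing in for the DatasetMetadata mapping
registry = {"pathmnist": {"display_name": "PathMNIST", "classes": ["a", "b"]}}


def get_dataset(name: str) -> dict:
    """Same lookup shape as DatasetRegistryWrapper.get_dataset."""
    if name not in registry:
        raise KeyError(f"Dataset '{name}' not found. Available: {list(registry)}")
    return copy.deepcopy(registry[name])


meta = get_dataset("pathmnist")
meta["classes"].append("c")  # mutating the copy...
```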

RootOrchestrator(cfg, infra_manager=None, reporter=None, time_tracker=None, audit_saver=None, log_initializer=None, seed_setter=None, thread_applier=None, system_configurator=None, static_dir_setup=None, device_resolver=None, rank=None, local_rank=None)

Central coordinator for ML experiment lifecycle management.

Orchestrates the complete initialization sequence from configuration validation through resource provisioning to execution readiness. Implements a 7-phase initialization protocol (phases 1-6 eager, phase 7 deferred) with dependency injection for maximum testability.

The orchestrator follows the Single Responsibility Principle by delegating specialized tasks to injected dependencies while maintaining overall coordination. Uses the Context Manager pattern to guarantee resource cleanup even during failures.

Initialization Phases:

  1. Determinism: Global RNG seeding (Python, NumPy, PyTorch)
  2. Runtime Configuration: CPU thread affinity, system libraries
  3. Filesystem Provisioning: Dynamic workspace creation via RunPaths
  4. Logging Initialization: File-based persistent logging setup
  5. Config Persistence: YAML manifest export for auditability
  6. Infrastructure Guarding: OS-level resource locks (prevents race conditions)
  7. Environment Reporting: Comprehensive telemetry logging

Dependency Injection:

All external dependencies are injectable with sensible defaults:

  • infra_manager: OS resource management (locks, cleanup)
  • reporter: Environment telemetry engine
  • log_initializer: Logging setup strategy
  • seed_setter: RNG seeding function
  • thread_applier: CPU thread configuration
  • system_configurator: System library setup (matplotlib, etc)
  • static_dir_setup: Static directory creation
  • audit_saver: Config YAML + requirements snapshot persistence
  • device_resolver: Hardware device detection

Attributes:

  • cfg (Config): Validated global configuration (Single Source of Truth)
  • rank (int): Global rank of this process (0 in single-process mode)
  • local_rank (int): Node-local rank for GPU assignment (0 in single-process mode)
  • is_main_process (bool): True for rank 0, False for non-main ranks
  • infra (InfraManagerProtocol): Infrastructure resource manager
  • reporter (ReporterProtocol): Environment telemetry engine
  • time_tracker (TimeTrackerProtocol): Pipeline duration tracker
  • paths (RunPaths | None): Session-specific directory structure (None on non-main ranks)
  • run_logger (Logger | None): Active logger instance (None on non-main ranks)
  • repro_mode (bool): Strict determinism flag
  • warn_only_mode (bool): Warn-only mode for strict determinism
  • num_workers (int): DataLoader worker processes

Example

>>> cfg = Config.from_recipe(Path("recipes/config_mini_cnn.yaml"))
>>> with RootOrchestrator(cfg) as orch:
...     device = orch.get_device()
...     logger = orch.run_logger
...     paths = orch.paths
...     # Execute training pipeline with guaranteed cleanup

Notes:

  • Thread-safe: Single-instance locking via InfrastructureManager
  • Idempotent: initialize_core_services() is safe to call multiple times (subsequent calls return cached RunPaths without re-executing phases)
  • Auditable: All configuration saved to YAML in workspace
  • Deterministic: Reproducible experiments via strict seeding

Initializes orchestrator with dependency injection.

Parameters:

  • cfg ('Config'): Validated global configuration (SSOT) [required]
  • infra_manager (InfraManagerProtocol | None): Infrastructure management handler (default: InfrastructureManager())
  • reporter (ReporterProtocol | None): Environment reporting engine (default: Reporter())
  • time_tracker (TimeTrackerProtocol | None): Pipeline duration tracker (default: TimeTracker())
  • audit_saver (AuditSaverProtocol | None): Run-manifest persistence: config YAML + dependency snapshot (default: AuditSaver())
  • log_initializer (Callable[..., Any] | None): Logging setup function (default: Logger.setup)
  • seed_setter (Callable[..., Any] | None): RNG seeding function (default: set_seed)
  • thread_applier (Callable[..., Any] | None): CPU thread configuration (default: apply_cpu_threads)
  • system_configurator (Callable[..., Any] | None): System library setup (default: configure_system_libraries)
  • static_dir_setup (Callable[..., Any] | None): Static directory creation (default: setup_static_directories)
  • device_resolver (Callable[..., Any] | None): Device resolution (default: to_device_obj)
  • rank (int | None): Global rank of this process (default: auto-detected from RANK env var). Rank 0 executes all phases; rank N skips filesystem, logging, config persistence, infrastructure locking, and reporting.
  • local_rank (int | None): Node-local rank for GPU assignment (default: auto-detected from LOCAL_RANK env var). Used by device_resolver to select the correct GPU in multi-GPU distributed setups.
Source code in orchard/core/orchestrator.py
def __init__(
    self,
    cfg: "Config",
    infra_manager: InfraManagerProtocol | None = None,
    reporter: ReporterProtocol | None = None,
    time_tracker: TimeTrackerProtocol | None = None,
    audit_saver: AuditSaverProtocol | None = None,
    log_initializer: Callable[..., Any] | None = None,
    seed_setter: Callable[..., Any] | None = None,
    thread_applier: Callable[..., Any] | None = None,
    system_configurator: Callable[..., Any] | None = None,
    static_dir_setup: Callable[..., Any] | None = None,
    device_resolver: Callable[..., Any] | None = None,
    rank: int | None = None,
    local_rank: int | None = None,
) -> None:
    """
    Initializes orchestrator with dependency injection.

    Args:
        cfg: Validated global configuration (SSOT)
        infra_manager: Infrastructure management handler (default: InfrastructureManager())
        reporter: Environment reporting engine (default: Reporter())
        time_tracker: Pipeline duration tracker (default: TimeTracker())
        audit_saver: Run-manifest persistence — config YAML + dependency
            snapshot (default: AuditSaver())
        log_initializer: Logging setup function (default: Logger.setup)
        seed_setter: RNG seeding function (default: set_seed)
        thread_applier: CPU thread configuration (default: apply_cpu_threads)
        system_configurator: System library setup (default: configure_system_libraries)
        static_dir_setup: Static directory creation (default: setup_static_directories)
        device_resolver: Device resolution (default: to_device_obj)
        rank: Global rank of this process (default: auto-detected from RANK env var).
            Rank 0 executes all phases; rank N skips filesystem, logging,
            config persistence, infrastructure locking, and reporting.
        local_rank: Node-local rank for GPU assignment (default: auto-detected
            from LOCAL_RANK env var). Used by device_resolver to select the
            correct GPU in multi-GPU distributed setups.
    """
    self.cfg = cfg

    # Dependency injection: _resolve for objects, _resolve_callable for functions
    self.rank = _resolve(rank, get_rank)
    self.local_rank = _resolve(local_rank, get_local_rank)
    self.is_main_process = self.rank == 0
    self.infra = _resolve(infra_manager, InfrastructureManager)
    self.reporter = _resolve(reporter, Reporter)
    self.time_tracker = _resolve(time_tracker, TimeTracker)
    self._audit_saver = _resolve(audit_saver, AuditSaver)
    self._log_initializer = _resolve_callable(log_initializer, Logger.setup)
    self._seed_setter = _resolve_callable(seed_setter, set_seed)
    self._thread_applier = _resolve_callable(thread_applier, apply_cpu_threads)
    self._system_configurator = _resolve_callable(
        system_configurator, configure_system_libraries
    )
    self._static_dir_setup = _resolve_callable(static_dir_setup, setup_static_directories)
    self._device_resolver = _resolve_callable(device_resolver, to_device_obj)

    # Lazy initialization
    self._initialized: bool = False
    self._cleaned_up: bool = False
    self._infra_lock_acquired: bool = False
    self._applied_threads: int = 0
    self.paths: RunPaths | None = None
    self.run_logger: logging.Logger | None = None
    self._device_cache: torch.device | None = None

    # Policy extraction from SSOT
    self.repro_mode = self.cfg.hardware.use_deterministic_algorithms
    self.warn_only_mode = self.cfg.hardware.deterministic_warn_only
    self.num_workers = self.cfg.hardware.effective_num_workers

__enter__()

Context Manager entry — triggers the initialization sequence.

Starts the pipeline timer and delegates to initialize_core_services() for phases 1-6 (seeding, runtime config, filesystem, logging, config persistence, infrastructure locking, and device resolution). Phase 7 (environment reporting) is deferred to log_environment_report().

If any phase raises (including KeyboardInterrupt / SystemExit), cleanup() is called before re-raising to ensure partial resources (locks, file handles) are released even on failure.

Returns:

  • 'RootOrchestrator': Fully initialized RootOrchestrator ready for pipeline execution.

Raises:

  • BaseException: Re-raises any initialization error after cleanup.

Source code in orchard/core/orchestrator.py
def __enter__(self) -> "RootOrchestrator":
    """
    Context Manager entry — triggers the initialization sequence.

    Starts the pipeline timer and delegates to initialize_core_services()
    for phases 1-6 (seeding, runtime config, filesystem, logging,
    config persistence, infrastructure locking, and device resolution).
    Phase 7 (environment reporting) is deferred to log_environment_report().

    If any phase raises (including KeyboardInterrupt / SystemExit),
    cleanup() is called before re-raising to ensure partial resources
    (locks, file handles) are released even on failure.

    Returns:
        Fully initialized RootOrchestrator ready for pipeline execution.

    Raises:
        BaseException: Re-raises any initialization error after cleanup.
    """
    try:
        self.time_tracker.start()
        self.initialize_core_services()
        return self
    except BaseException:
        self.cleanup()
        raise

__exit__(exc_type, exc_val, exc_tb)

Context Manager exit — stops timer and guarantees resource teardown.

Invoked automatically when leaving the with block, whether the pipeline completed normally or raised an exception. Stops the timer, then delegates to cleanup() for infrastructure lock release and logging handler closure.

Error reporting is intentionally left to the caller (CLI layer), which has the user-facing context to log appropriate messages.

Returns False so that any exception propagates to the caller unchanged.

Parameters:

  • exc_type (type[BaseException] | None): Exception class if the block raised, else None. [required]
  • exc_val (BaseException | None): Exception instance if the block raised, else None. [required]
  • exc_tb (TracebackType | None): Traceback object if the block raised, else None. [required]

Returns:

  • Literal[False]: Always False; exceptions are never suppressed.

Source code in orchard/core/orchestrator.py
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> Literal[False]:
    """
    Context Manager exit — stops timer and guarantees resource teardown.

    Invoked automatically when leaving the ``with`` block, whether the
    pipeline completed normally or raised an exception. Stops the timer,
    then delegates to cleanup() for infrastructure lock release and
    logging handler closure.

    Error reporting is intentionally left to the caller (CLI layer),
    which has the user-facing context to log appropriate messages.

    Returns False so that any exception propagates to the caller unchanged.

    Args:
        exc_type: Exception class if the block raised, else None.
        exc_val: Exception instance if the block raised, else None.
        exc_tb: Traceback object if the block raised, else None.

    Returns:
        Always False — exceptions are never suppressed.
    """
    # Stop timer (duration already shown in pipeline summary)
    self.time_tracker.stop()

    self.cleanup()
    return False
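The two halves of the contract (cleanup-on-failure in __enter__, non-suppressing cleanup in __exit__) are worth seeing in miniature. A hypothetical stand-in, not the real orchestrator:

```python
class LifecycleSketch:
    """Minimal stand-in showing cleanup on both failure and success paths."""

    def __init__(self, fail_on_init: bool = False) -> None:
        self.fail_on_init = fail_on_init
        self.cleaned = False

    def __enter__(self) -> "LifecycleSketch":
        try:
            if self.fail_on_init:
                raise RuntimeError("phase failed")  # simulate a bad init phase
            return self
        except BaseException:
            self.cleanup()  # release partial resources before re-raising
            raise

    def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
        self.cleanup()
        return False  # never suppress caller-visible exceptions

    def cleanup(self) -> None:
        self.cleaned = True
```

If __enter__ raises, Python never calls __exit__, which is exactly why the entry path must trigger its own cleanup before re-raising.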

initialize_core_services()

Executes linear sequence of environment initialization phases.

Synchronizes global state through phases 1-6, progressing from deterministic seeding to device resolution. Phase 7 (environment reporting) is deferred to log_environment_report().

In distributed mode (torchrun / DDP), only the main process (rank 0) executes phases 3-6 (filesystem, logging, config persistence, infra locking). All ranks execute phases 1-2 (seeding, threads) for identical RNG state and thread affinity, plus device resolution for DDP readiness (each rank binds to cuda:{local_rank}).

Idempotent: guarded by _initialized flag. If already initialized, returns existing RunPaths without re-executing any phase. This prevents orphaned directories (Phase 3 creates unique paths per call) and resource leaks (Phase 6 acquires filesystem locks).

Returns:

  • RunPaths | None: Provisioned directory structure for rank 0, None for non-main ranks.

Raises:

  • RuntimeError: If called after cleanup (single-use guard).
  • OrchardDeviceError: If device resolution fails at runtime.

Source code in orchard/core/orchestrator.py
def initialize_core_services(self) -> RunPaths | None:
    """
    Executes linear sequence of environment initialization phases.

    Synchronizes global state through phases 1-6, progressing from
    deterministic seeding to device resolution. Phase 7 (environment
    reporting) is deferred to log_environment_report().

    In distributed mode (torchrun / DDP), only the main process (rank 0)
    executes phases 3-6 (filesystem, logging, config persistence, infra
    locking).  All ranks execute phases 1-2 (seeding, threads) for
    identical RNG state and thread affinity, plus device resolution
    for DDP readiness (each rank binds to ``cuda:{local_rank}``).

    Idempotent: guarded by ``_initialized`` flag. If already initialized,
    returns existing RunPaths without re-executing any phase. This prevents
    orphaned directories (Phase 3 creates unique paths per call) and
    resource leaks (Phase 6 acquires filesystem locks).

    Returns:
        Provisioned directory structure for rank 0, None for non-main ranks.

    Raises:
        RuntimeError: If called after cleanup (single-use guard).
        OrchardDeviceError: If device resolution fails at runtime.
    """
    if self._cleaned_up:
        raise RuntimeError(
            "Cannot re-initialize after cleanup — "
            "RootOrchestrator is a single-use context manager"
        )
    if self._initialized:
        return self.paths

    # All ranks: deterministic seeding and thread configuration
    self._phase_1_determinism()
    applied_threads = self._phase_2_runtime_configuration()

    # Rank 0 only: filesystem, logging, persistence, locking, reporting
    if self.is_main_process:
        self._phase_3_filesystem_provisioning()
        self._phase_4_logging_initialization()

        # type guards: paths and logger are guaranteed after phases 3-4
        assert self.paths is not None, "Paths not initialized after phase 3"  # nosec B101
        assert self.run_logger is not None, "Logger not initialized after phase 4"  # nosec B101

        self._phase_5_run_manifest()
        self._phase_6_infrastructure_guarding()

        try:
            self._device_cache = self.get_device()
        except RuntimeError as e:
            # resolve_device in HardwareConfig already handles GPU-unavailable
            # at config-time. If we reach here with device="cuda" in config,
            # CUDA was available then — a runtime failure (e.g. driver crash)
            # is unrecoverable. Silently falling back to CPU would waste hours
            # of compute with GPU-tuned hyperparameters (batch size, mixed
            # precision, etc.). Fail fast so the user can fix the environment.
            raise OrchardDeviceError(
                f"{LogStyle.FAILURE} Device resolution failed at runtime "
                f"(config requested '{self.cfg.hardware.device}'): {e}"
            ) from e

    else:
        logger.debug("Rank %d: skipping phases 3-6 (non-main process).", self.rank)
        # Non-main ranks still need their device for DDP readiness
        try:
            self._device_cache = self.get_device()
        except RuntimeError as e:
            raise OrchardDeviceError(
                f"{LogStyle.FAILURE} Device resolution failed at runtime "
                f"(config requested '{self.cfg.hardware.device}'): {e}"
            ) from e

    self._applied_threads = applied_threads
    self._initialized = True
    return self.paths

log_environment_report()

Emit the environment initialization report (phase 7).

Designed to be called explicitly by the CLI app after external services (e.g. MLflow tracker) have been started, so that all enter/exit log messages appear in the correct chronological order.

Source code in orchard/core/orchestrator.py
def log_environment_report(self) -> None:
    """
    Emit the environment initialization report (phase 7).

    Designed to be called explicitly by the CLI app after external
    services (e.g. MLflow tracker) have been started, so that all
    enter/exit log messages appear in the correct chronological order.
    """
    if self._initialized and self.is_main_process:
        self._phase_7_environment_report(self._applied_threads)

cleanup()

Releases system resources and removes execution lock file.

Guarantees clean state for subsequent runs by unlinking InfrastructureManager guards and closing logging handlers. Non-main ranks skip resource release (they never acquired locks or opened file-based log handlers).

Source code in orchard/core/orchestrator.py
def cleanup(self) -> None:
    """
    Releases system resources and removes execution lock file.

    Guarantees clean state for subsequent runs by unlinking
    InfrastructureManager guards and closing logging handlers.
    Non-main ranks skip resource release (they never acquired locks
    or opened file-based log handlers).
    """
    if not self.is_main_process:
        self._cleaned_up = True
        return

    cleanup_logger = self.run_logger or logging.getLogger(LOGGER_NAME)
    try:
        if self._infra_lock_acquired:
            self.infra.release_resources(self.cfg, logger=cleanup_logger)
            self._infra_lock_acquired = False
    except (OSError, RuntimeError) as e:
        cleanup_logger.error("Failed to release system lock: %s", e)

    self._close_logging_handlers()
    self._cleaned_up = True

get_device()

Resolves and caches optimal computation device (CUDA/CPU/MPS).

Returns:

Type Description
device

PyTorch device object for model execution

Source code in orchard/core/orchestrator.py
def get_device(self) -> torch.device:
    """
    Resolves and caches optimal computation device (CUDA/CPU/MPS).

    Returns:
        PyTorch device object for model execution
    """
    if self._device_cache is None:
        self._device_cache = self._device_resolver(
            device_str=self.cfg.hardware.device,
            local_rank=self.local_rank,
        )
    return self._device_cache

RunPaths

Bases: BaseModel

Immutable container for experiment-specific directory paths.

Implements atomic run isolation using a deterministic hashing strategy that combines DATE + DATASET_SLUG + MODEL_SLUG + CONFIG_HASH to create unique, collision-free directory structures. The Pydantic frozen model ensures paths cannot be modified after creation.

Attributes:

Name Type Description
run_id str

Unique identifier in format YYYYMMDD_dataset_model_hash.

dataset_slug str

Normalized lowercase dataset name.

architecture_slug str

Sanitized alphanumeric architecture identifier.

root Path

Base directory for all run artifacts.

figures Path

Directory for plots, confusion matrices, ROC curves.

checkpoints Path

Directory for saved checkpoints (.pth files).

reports Path

Directory for config mirrors, CSV/XLSX summaries.

logs Path

Directory for training logs and session output.

database Path

Directory for SQLite optimization studies.

exports Path

Directory for production exports (ONNX).

Example

Directory structure created::

outputs/20260208_organcmnist_efficientnetb0_a3f7c2/
├── figures/
├── checkpoints/
├── reports/
├── logs/
├── database/
└── exports/

best_model_path property

Path for the best-performing model checkpoint.

Returns:

Type Description
Path

Path in format: checkpoints/best_{architecture_slug}.pth

final_report_path property

Path for the comprehensive experiment summary report.

Returns:

Type Description
Path

Path to reports/training_summary.xlsx

create(dataset_slug, architecture_name, training_cfg, base_dir=None) classmethod

Factory method to create and initialize a unique run environment.

Creates a new RunPaths instance with a deterministic unique ID based on dataset, model, and training configuration. Physically creates all subdirectories on the filesystem.

Parameters:

Name Type Description Default
dataset_slug str

Dataset identifier (e.g., 'organcmnist'). Will be normalized to lowercase.

required
architecture_name str

Human-readable model name (e.g., 'EfficientNet-B0'). Special characters are stripped, converted to lowercase.

required
training_cfg dict[str, Any]

Dictionary of hyperparameters used for hash generation. Supports nested dicts, but only hashable primitives (int, float, str, bool, list) contribute to the hash.

required
base_dir Path | None

Custom base directory for outputs. Defaults to OUTPUTS_ROOT (typically './outputs').

None

Returns:

Type Description
'RunPaths'

Fully initialized RunPaths instance with all directories created.

Raises:

Type Description
ValueError

If dataset_slug or architecture_name is not a string.

Example

>>> paths = RunPaths.create(
...     dataset_slug="OrganCMNIST",
...     architecture_name="EfficientNet-B0",
...     training_cfg={"batch_size": 32, "lr": 0.001}
... )
>>> paths.dataset_slug
'organcmnist'
>>> paths.architecture_slug
'efficientnetb0'

Source code in orchard/core/paths/run_paths.py
@classmethod
def create(
    cls,
    dataset_slug: str,
    architecture_name: str,
    training_cfg: dict[str, Any],
    base_dir: Path | None = None,
) -> "RunPaths":
    """
    Factory method to create and initialize a unique run environment.

    Creates a new RunPaths instance with a deterministic unique ID based
    on dataset, model, and training configuration. Physically creates all
    subdirectories on the filesystem.

    Args:
        dataset_slug: Dataset identifier (e.g., 'organcmnist'). Will be
            normalized to lowercase.
        architecture_name: Human-readable model name (e.g., 'EfficientNet-B0').
            Special characters are stripped, converted to lowercase.
        training_cfg: Dictionary of hyperparameters used for hash generation.
            Supports nested dicts, but only hashable primitives (int, float,
            str, bool, list) contribute to the hash.
        base_dir: Custom base directory for outputs. Defaults to OUTPUTS_ROOT
            (typically './outputs').

    Returns:
        Fully initialized RunPaths instance with all directories created.

    Raises:
        ValueError: If dataset_slug or architecture_name is not a string.

    Example:
        >>> paths = RunPaths.create(
        ...     dataset_slug="OrganCMNIST",
        ...     architecture_name="EfficientNet-B0",
        ...     training_cfg={"batch_size": 32, "lr": 0.001}
        ... )
        >>> paths.dataset_slug
        'organcmnist'
        >>> paths.architecture_slug
        'efficientnetb0'
    """
    if not isinstance(dataset_slug, str):
        raise ValueError(f"Expected string for dataset_slug but got {type(dataset_slug)}")
    ds_slug = dataset_slug.lower()

    if not isinstance(architecture_name, str):
        raise ValueError(
            f"Expected string for architecture_name but got {type(architecture_name)}"
        )
    a_slug = re.sub(r"[^a-zA-Z0-9]", "", architecture_name.lower())

    # Determine the unique run ID
    run_id = cls._generate_unique_id(ds_slug, a_slug, training_cfg)

    base = Path(base_dir or OUTPUTS_ROOT)
    root_path = base / run_id

    # No collision fallback needed: run_timestamp guarantees uniqueness

    instance = cls(
        run_id=run_id,
        dataset_slug=ds_slug,
        architecture_slug=a_slug,
        root=root_path,
        figures=root_path / "figures",
        checkpoints=root_path / "checkpoints",
        reports=root_path / "reports",
        logs=root_path / "logs",
        database=root_path / "database",
        exports=root_path / "exports",
    )

    instance._setup_run_directories()
    return instance
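The run-id recipe (DATE + DATASET_SLUG + MODEL_SLUG + CONFIG_HASH) can be sketched in miniature. `sketch_run_id`, the SHA-256 digest, and the 6-character truncation are illustrative assumptions — the actual derivation lives in `RunPaths._generate_unique_id` and may differ:

```python
import hashlib
import json
from datetime import date

def sketch_run_id(dataset_slug: str, arch_slug: str, cfg: dict) -> str:
    # Hypothetical mirror of the run-id scheme: a stable digest of the
    # training config keeps the directory name deterministic per config.
    digest = hashlib.sha256(json.dumps(cfg, sort_keys=True).encode()).hexdigest()[:6]
    return f"{date.today():%Y%m%d}_{dataset_slug}_{arch_slug}_{digest}"

rid = sketch_run_id("organcmnist", "efficientnetb0", {"batch_size": 32, "lr": 0.001})
print(rid)  # e.g. 20260208_organcmnist_efficientnetb0_<6-char hash>
```

Sorting the keys before hashing is what makes the ID order-insensitive: two configs with the same values in a different insertion order map to the same run directory.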

get_fig_path(filename)

Generate path for a visualization artifact.

Parameters:

Name Type Description Default
filename str

Name of the figure file (e.g., 'confusion_matrix.png').

required

Returns:

Type Description
Path

Absolute path within the figures directory.

Source code in orchard/core/paths/run_paths.py
def get_fig_path(self, filename: str) -> Path:
    """
    Generate path for a visualization artifact.

    Args:
        filename: Name of the figure file (e.g., 'confusion_matrix.png').

    Returns:
        Absolute path within the figures directory.
    """
    return self.figures / filename

get_config_path()

Get path for the archived run configuration.

Returns:

Type Description
Path

Path to reports/config.yaml

Source code in orchard/core/paths/run_paths.py
def get_config_path(self) -> Path:
    """
    Get path for the archived run configuration.

    Returns:
        Path to reports/config.yaml
    """
    return self.reports / "config.yaml"

get_db_path()

Get path for Optuna SQLite study database.

The database directory is created during RunPaths initialization, ensuring the parent directory exists before Optuna writes to it.

Returns:

Type Description
Path

Path to database/study.db

Source code in orchard/core/paths/run_paths.py
def get_db_path(self) -> Path:
    """
    Get path for Optuna SQLite study database.

    The database directory is created during RunPaths initialization,
    ensuring the parent directory exists before Optuna writes to it.

    Returns:
        Path to database/study.db
    """
    return self.database / "study.db"

__repr__()

Return string representation with run_id and root path.

Source code in orchard/core/paths/run_paths.py
def __repr__(self) -> str:
    """
    Return string representation with run_id and root path.
    """
    return f"RunPaths(run_id='{self.run_id}', root={self.root})"

apply_cpu_threads(num_workers)

Sets optimal compute threads to avoid resource contention.

Synchronizes PyTorch, OMP, and MKL thread counts.

Parameters:

Name Type Description Default
num_workers int

Active DataLoader workers

required

Returns:

Type Description
int

Number of threads assigned to compute operations

Source code in orchard/core/environment/hardware.py
def apply_cpu_threads(num_workers: int) -> int:
    """
    Sets optimal compute threads to avoid resource contention.

    Synchronizes PyTorch, OMP, and MKL thread counts.

    Args:
        num_workers: Active DataLoader workers

    Returns:
        Number of threads assigned to compute operations
    """
    total_cores = os.cpu_count() or 1
    optimal_threads = max(2, total_cores - num_workers)

    torch.set_num_threads(optimal_threads)
    os.environ["OMP_NUM_THREADS"] = str(optimal_threads)
    os.environ["MKL_NUM_THREADS"] = str(optimal_threads)

    return optimal_threads
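The partitioning rule above is small enough to sketch standalone; `plan_cpu_threads` is a hypothetical name for the same arithmetic, without the PyTorch/OMP/MKL side effects:

```python
def plan_cpu_threads(total_cores: int, num_workers: int) -> int:
    # Mirrors apply_cpu_threads: reserve one core per DataLoader worker,
    # but never starve compute below 2 threads.
    return max(2, total_cores - num_workers)

print(plan_cpu_threads(12, 4))  # -> 8 compute threads alongside 4 workers
print(plan_cpu_threads(4, 4))   # -> 2 (the floor kicks in)
```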

configure_system_libraries()

Configures libraries for headless environments and reduces logging noise.

- Sets Matplotlib to 'Agg' backend on Linux/Docker (no GUI)
- Configures font embedding for PDF/PS exports
- Suppresses verbose Matplotlib warnings

Source code in orchard/core/environment/hardware.py
def configure_system_libraries() -> None:
    """
    Configures libraries for headless environments and reduces logging noise.

    - Sets Matplotlib to 'Agg' backend on Linux/Docker (no GUI)
    - Configures font embedding for PDF/PS exports
    - Suppresses verbose Matplotlib warnings
    """
    is_linux = platform.system() == "Linux"
    is_docker = os.environ.get("IN_DOCKER") == "TRUE" or Path("/.dockerenv").exists()

    if is_linux or is_docker:
        matplotlib.use("Agg")
        matplotlib.rcParams["pdf.fonttype"] = 42
        matplotlib.rcParams["ps.fonttype"] = 42
        logging.getLogger("matplotlib").setLevel(logging.WARNING)

detect_best_device()

Detects the most performant accelerator (CUDA > MPS > CPU).

Returns:

Type Description
str

Device string: 'cuda', 'mps', or 'cpu'

Source code in orchard/core/environment/hardware.py
def detect_best_device() -> str:
    """
    Detects the most performant accelerator (CUDA > MPS > CPU).

    Returns:
        Device string: 'cuda', 'mps', or 'cpu'
    """
    if torch.cuda.is_available():
        return "cuda"
    if has_mps_backend():
        return "mps"
    return "cpu"
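The preference chain (CUDA > MPS > CPU) can be exercised without a GPU by lifting the availability checks into parameters; `pick_device` is an illustrative stand-in, not part of the package:

```python
def pick_device(cuda_ok: bool, mps_ok: bool) -> str:
    # Same fallback order as detect_best_device, with availability injected
    # so the logic is testable on any machine.
    if cuda_ok:
        return "cuda"
    if mps_ok:
        return "mps"
    return "cpu"

print(pick_device(False, True))   # -> mps
print(pick_device(False, False))  # -> cpu
```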

determine_tta_mode(use_tta, device_type, tta_mode='full')

Reports the active TTA ensemble policy.

The ensemble complexity is driven by the tta_mode config field, not by hardware. This guarantees identical predictions on CPU, CUDA and MPS for the same config, preserving cross-platform determinism.

Parameters:

Name Type Description Default
use_tta bool

Whether Test-Time Augmentation is enabled.

required
device_type str

The type of active device ('cpu', 'cuda', 'mps').

required
tta_mode str

Config-driven ensemble complexity ('full' or 'light').

'full'

Returns:

Type Description
str

Descriptive string of the TTA operation mode.

Source code in orchard/core/environment/policy.py
def determine_tta_mode(use_tta: bool, device_type: str, tta_mode: str = "full") -> str:
    """
    Reports the active TTA ensemble policy.

    The ensemble complexity is driven by the ``tta_mode`` config field,
    not by hardware.  This guarantees identical predictions on CPU, CUDA
    and MPS for the same config, preserving cross-platform determinism.

    Args:
        use_tta: Whether Test-Time Augmentation is enabled.
        device_type: The type of active device ('cpu', 'cuda', 'mps').
        tta_mode: Config-driven ensemble complexity ('full' or 'light').

    Returns:
        Descriptive string of the TTA operation mode.
    """
    if not use_tta:
        return "DISABLED"

    mode_label = tta_mode.upper()
    return f"{mode_label} ({device_type.upper()})"
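Because the policy is config-driven, the same inputs always produce the same label regardless of hardware. A standalone copy of the logic makes the three possible shapes of output concrete:

```python
def determine_tta_mode(use_tta: bool, device_type: str, tta_mode: str = "full") -> str:
    # Same logic as orchard.core.environment.policy.determine_tta_mode:
    # the device only decorates the label, it never changes the ensemble.
    if not use_tta:
        return "DISABLED"
    return f"{tta_mode.upper()} ({device_type.upper()})"

print(determine_tta_mode(False, "cuda"))         # -> DISABLED
print(determine_tta_mode(True, "cuda"))          # -> FULL (CUDA)
print(determine_tta_mode(True, "mps", "light"))  # -> LIGHT (MPS)
```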

ensure_single_instance(lock_file, logger)

Implements a cooperative advisory lock to guarantee singleton execution.

Leverages Unix 'flock' to create an exclusive lock on a sentinel file. If the lock cannot be acquired immediately, it indicates another instance is active, and the process will abort to prevent filesystem or GPU race conditions.

In distributed mode (torchrun / DDP), only the main process (rank 0) acquires the lock. Non-main ranks skip locking entirely to avoid deadlocking against the rank-0 held lock.

Parameters:

Name Type Description Default
lock_file Path

Filesystem path where the lock sentinel will reside.

required
logger Logger

Active logger for reporting acquisition status.

required

Raises:

Type Description
SystemExit

If an existing lock is detected on the system.

Source code in orchard/core/environment/guards.py
def ensure_single_instance(lock_file: Path, logger: logging.Logger) -> None:
    """
    Implements a cooperative advisory lock to guarantee singleton execution.

    Leverages Unix 'flock' to create an exclusive lock on a sentinel file.
    If the lock cannot be acquired immediately, it indicates another instance
    is active, and the process will abort to prevent filesystem or GPU
    race conditions.

    In distributed mode (torchrun / DDP), only the main process (rank 0)
    acquires the lock.  Non-main ranks skip locking entirely to avoid
    deadlocking against the rank-0 held lock.

    Args:
        lock_file (Path): Filesystem path where the lock sentinel will reside.
        logger (logging.Logger): Active logger for reporting acquisition status.

    Raises:
        SystemExit: If an existing lock is detected on the system.
    """
    global _lock_fd

    # In distributed mode, only rank 0 manages the lock
    if not is_main_process():
        logger.debug("PID %d: skipping lock acquisition (non-main process).", os.getpid())
        return

    # Locking is currently only supported on Unix-like systems via fcntl
    if platform.system() in ("Linux", "Darwin") and HAS_FCNTL:
        f: IO[str] | None = None
        try:
            lock_file.parent.mkdir(parents=True, exist_ok=True)
            f = open(lock_file, "a")

            # Attempt to acquire an exclusive lock without blocking
            fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
            _lock_fd = f
            logger.info("  %s System lock acquired", LogStyle.ARROW)

        except (IOError, BlockingIOError):
            if f is not None:
                f.close()
            logger.error(
                " %s CRITICAL: Another instance is already running. Aborting.",
                LogStyle.WARNING,
            )
            sys.exit(1)

get_accelerator_name()

Returns accelerator model name (CUDA GPU or Apple Silicon) or empty string.

Source code in orchard/core/environment/hardware.py
def get_accelerator_name() -> str:
    """Returns accelerator model name (CUDA GPU or Apple Silicon) or empty string."""
    if torch.cuda.is_available():
        return torch.cuda.get_device_name(0)
    if has_mps_backend():
        return f"Apple Silicon ({platform.machine()})"
    return ""

get_num_workers()

Determines optimal DataLoader workers with RAM stability cap.

Returns:

Type Description
int

Recommended number of subprocesses (2-8 range)

Source code in orchard/core/environment/hardware.py
def get_num_workers() -> int:
    """
    Determines optimal DataLoader workers with RAM stability cap.

    Returns:
        Recommended number of subprocesses (2-8 range)
    """
    total_cores = os.cpu_count() or _MIN_WORKERS
    if total_cores <= 4:
        return _MIN_WORKERS
    return min(total_cores // 2, _MAX_WORKERS)
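The sizing rule reads directly off the source: small machines get the floor, larger ones half their cores, capped for RAM stability. A standalone mirror (with the module constants `_MIN_WORKERS=2` / `_MAX_WORKERS=8` assumed from the documented 2-8 range):

```python
_MIN_WORKERS = 2  # assumed values, matching the documented 2-8 range
_MAX_WORKERS = 8

def plan_num_workers(total_cores: int) -> int:
    # Mirrors get_num_workers with the core count injected for testability.
    if total_cores <= 4:
        return _MIN_WORKERS
    return min(total_cores // 2, _MAX_WORKERS)

print(plan_num_workers(4))   # -> 2
print(plan_num_workers(12))  # -> 6
print(plan_num_workers(32))  # -> 8 (cap)
```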

has_mps_backend()

Check if MPS backend is available (macOS Apple Silicon).

Source code in orchard/core/environment/hardware.py
def has_mps_backend() -> bool:
    """Check if MPS backend is available (macOS Apple Silicon)."""
    return hasattr(torch.backends, "mps") and torch.backends.mps.is_available()

release_single_instance(lock_file)

Safely releases the system lock and unlinks the sentinel file.

Guarantees that the file descriptor is closed and the lock is returned to the OS. Designed to be called during normal shutdown or within exception handling blocks.

Parameters:

Name Type Description Default
lock_file Path

Filesystem path to the sentinel file to be removed.

required
Source code in orchard/core/environment/guards.py
def release_single_instance(lock_file: Path) -> None:
    """
    Safely releases the system lock and unlinks the sentinel file.

    Guarantees that the file descriptor is closed and the lock is returned
    to the OS. Designed to be called during normal shutdown or within
    exception handling blocks.

    Args:
        lock_file (Path): Filesystem path to the sentinel file to be removed.
    """
    global _lock_fd

    if _lock_fd:
        try:
            if HAS_FCNTL:
                try:
                    fcntl.flock(_lock_fd, fcntl.LOCK_UN)
                except (OSError, IOError):
                    # Unlock may fail if process is already terminated
                    pass

            try:
                _lock_fd.close()
            except (OSError, IOError):  # pragma: no cover
                # Close may fail if fd is already closed
                pass
        finally:
            _lock_fd = None

    # Attempt unlink directly to avoid TOCTOU race condition
    # (file could be deleted between exists() check and unlink() call)
    try:
        lock_file.unlink()
    except FileNotFoundError:
        # File was already removed by another process - expected in race conditions
        pass
    except OSError:  # pragma: no cover
        # Other OS errors (permissions, etc.) - safe to ignore during cleanup
        pass

set_seed(seed, strict=False, warn_only=False)

Seed all PRNGs and optionally enforce deterministic algorithms.

Seeds Python's random, NumPy, and PyTorch (CPU + CUDA + MPS). In strict mode, additionally forces deterministic kernels at the cost of reduced performance.

Note

PYTHONHASHSEED is set here for completeness, but CPython reads it only at interpreter startup — the runtime assignment has no effect on the running process. The project Dockerfile handles this correctly (ENV PYTHONHASHSEED=0). For bare-metal runs, prefix the command: PYTHONHASHSEED=42 orchard run <recipe>. Full bit-exact determinism additionally requires strict=True and num_workers=0 (both enforced automatically in Docker via DOCKER_REPRODUCIBILITY_MODE).

Parameters:

Name Type Description Default
seed int

The seed value to set across all PRNGs.

required
strict bool

If True, enforces deterministic algorithms (5-30% perf penalty).

False
warn_only bool

If True (and strict=True), uses warn-only mode for torch.use_deterministic_algorithms — logs warnings instead of raising errors for non-deterministic ops. Ignored when strict is False.

False
Source code in orchard/core/environment/reproducibility.py
def set_seed(seed: int, strict: bool = False, warn_only: bool = False) -> None:  # pragma: no mutate
    """
    Seed all PRNGs and optionally enforce deterministic algorithms.

    Seeds Python's ``random``, NumPy, and PyTorch (CPU + CUDA + MPS).
    In strict mode, additionally forces deterministic kernels at the
    cost of reduced performance.

    Note:
        ``PYTHONHASHSEED`` is set here for completeness, but CPython reads it
        only at interpreter startup — the runtime assignment has no effect on
        the running process. The project Dockerfile handles this correctly
        (``ENV PYTHONHASHSEED=0``). For bare-metal runs, prefix the command:
        ``PYTHONHASHSEED=42 orchard run <recipe>``. Full bit-exact determinism
        additionally requires ``strict=True`` and ``num_workers=0`` (both
        enforced automatically in Docker via ``DOCKER_REPRODUCIBILITY_MODE``).

    Args:
        seed: The seed value to set across all PRNGs.
        strict: If True, enforces deterministic algorithms (5-30% perf penalty).
        warn_only: If True (and strict=True), uses warn-only mode for
            ``torch.use_deterministic_algorithms`` — logs warnings instead of
            raising errors for non-deterministic ops. Ignored when strict
            is False.
    """
    random.seed(seed)

    # Best-effort: effective only if set before interpreter startup (see Note)
    already_set = os.environ.get("PYTHONHASHSEED") == str(seed)
    os.environ["PYTHONHASHSEED"] = str(seed)
    if strict and not already_set:
        _stacklevel = 2  # pragma: no mutate
        warnings.warn(
            f"PYTHONHASHSEED={seed} set at runtime, but CPython reads it only at "
            "interpreter startup. For bare-metal determinism: "
            f"PYTHONHASHSEED={seed} orchard run <recipe>",
            stacklevel=_stacklevel,
        )

    np.random.seed(seed)
    torch.manual_seed(seed)

    has_cuda = torch.cuda.is_available()
    has_mps = hasattr(torch.backends, "mps") and torch.backends.mps.is_available()

    if has_cuda:
        torch.cuda.manual_seed_all(seed)
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False

        if strict:
            os.environ["CUBLAS_WORKSPACE_CONFIG"] = ":4096:8"

    if has_mps:
        torch.mps.manual_seed(seed)

    if strict:
        if has_mps:
            _stacklevel = 2  # pragma: no mutate
            warnings.warn(
                "MPS backend has partial determinism support in PyTorch. "
                "Some operations may not have deterministic implementations. "
                "Consider using CPU for fully deterministic experiments.",
                stacklevel=_stacklevel,
            )
        torch.use_deterministic_algorithms(True, warn_only=warn_only)

to_device_obj(device_str, local_rank=0)

Converts device string to PyTorch device object.

In distributed multi-GPU setups, uses local_rank to select the correct GPU and calls torch.cuda.set_device() for CUDA affinity.

Parameters:

Name Type Description Default
device_str str

'cuda', 'cpu', or 'auto' (auto-selects best available)

required
local_rank int

Node-local process rank for GPU assignment (default 0). Used to select cuda:{local_rank} in multi-GPU setups. Ignored for non-CUDA devices.

0

Returns:

Type Description
device

torch.device object

Raises:

Type Description
ValueError

If CUDA requested but unavailable, or invalid device string

Source code in orchard/core/environment/hardware.py
def to_device_obj(device_str: str, local_rank: int = 0) -> torch.device:
    """
    Converts device string to PyTorch device object.

    In distributed multi-GPU setups, uses ``local_rank`` to select the
    correct GPU and calls ``torch.cuda.set_device()`` for CUDA affinity.

    Args:
        device_str: 'cuda', 'cpu', or 'auto' (auto-selects best available)
        local_rank: Node-local process rank for GPU assignment (default 0).
            Used to select ``cuda:{local_rank}`` in multi-GPU setups.
            Ignored for non-CUDA devices.

    Returns:
        torch.device object

    Raises:
        ValueError: If CUDA requested but unavailable, or invalid device string
    """
    if device_str == "auto":
        device_str = detect_best_device()

    if device_str == "cuda" and not torch.cuda.is_available():
        raise ValueError("CUDA requested but not available")

    if device_str not in ("cuda", "cpu", "mps"):
        raise ValueError(f"Unsupported device: {device_str}")

    if device_str == "cuda" and local_rank > 0:
        torch.cuda.set_device(local_rank)
        return torch.device(f"cuda:{local_rank}")

    return torch.device(device_str)
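The validation order matters: 'auto' is resolved first, then the availability check, then the whitelist. A pure-Python mirror with availability injected (so it runs without torch) makes the three failure/success paths explicit; `resolve_device_str` is an illustrative name, not part of the package:

```python
def resolve_device_str(device_str: str, cuda_ok: bool, mps_ok: bool) -> str:
    # Mirrors to_device_obj's validation; device-object creation and
    # torch.cuda.set_device() affinity are omitted.
    if device_str == "auto":
        device_str = "cuda" if cuda_ok else "mps" if mps_ok else "cpu"
    if device_str == "cuda" and not cuda_ok:
        raise ValueError("CUDA requested but not available")
    if device_str not in ("cuda", "cpu", "mps"):
        raise ValueError(f"Unsupported device: {device_str}")
    return device_str

print(resolve_device_str("auto", cuda_ok=False, mps_ok=False))  # -> cpu
print(resolve_device_str("auto", cuda_ok=True, mps_ok=True))    # -> cuda
```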

worker_init_fn(worker_id)

Initialize PRNGs for a DataLoader worker subprocess.

Each worker receives a unique but deterministic sub-seed derived from the parent seed, ensuring augmentation diversity while maintaining reproducibility across runs.

Called automatically by DataLoader when num_workers > 0. In strict reproducibility mode, num_workers is forced to 0 by HardwareConfig, so this function is never invoked.

Parameters:

Name Type Description Default
worker_id int

Subprocess ID provided by DataLoader (0-based).

required
Source code in orchard/core/environment/reproducibility.py
def worker_init_fn(worker_id: int) -> None:
    """
    Initialize PRNGs for a DataLoader worker subprocess.

    Each worker receives a unique but deterministic sub-seed derived from
    the parent seed, ensuring augmentation diversity while maintaining
    reproducibility across runs.

    Called automatically by DataLoader when ``num_workers > 0``.
    In strict reproducibility mode, ``num_workers`` is forced to 0 by
    HardwareConfig, so this function is never invoked.

    Args:
        worker_id: Subprocess ID provided by DataLoader (0-based).
    """
    worker_info = torch.utils.data.get_worker_info()
    if worker_info is None:
        return

    # Derive unique sub-seed: deterministic per (parent_seed, worker_id)
    base_seed = worker_info.seed
    seed = (base_seed + worker_id) % 2**32

    # Synchronize all major PRNGs for this worker
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
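The sub-seed derivation is the interesting part: each worker gets a distinct seed, yet the mapping from (parent seed, worker id) is fully deterministic, and the modulo keeps the result inside the 32-bit range NumPy's legacy seeding accepts.

```python
def derive_worker_seed(base_seed: int, worker_id: int) -> int:
    # Same derivation as worker_init_fn: unique per worker, deterministic
    # per (parent_seed, worker_id), wrapped into the 32-bit seed range.
    return (base_seed + worker_id) % 2**32

seeds = [derive_worker_seed(1234, w) for w in range(4)]
print(seeds)  # -> [1234, 1235, 1236, 1237]
```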

load_config_from_yaml(yaml_path)

Loads a raw configuration dictionary from a YAML file.

Parameters:

Name Type Description Default
yaml_path Path

Path to the source YAML file.

required

Returns:

Type Description
dict[str, Any]

dict[str, Any]: The loaded configuration manifest.

Raises:

Type Description
FileNotFoundError

If the specified path does not exist.

Source code in orchard/core/io/serialization.py
def load_config_from_yaml(yaml_path: Path) -> dict[str, Any]:
    """
    Loads a raw configuration dictionary from a YAML file.

    Args:
        yaml_path (Path): Path to the source YAML file.

    Returns:
        dict[str, Any]: The loaded configuration manifest.

    Raises:
        FileNotFoundError: If the specified path does not exist.
    """
    if not yaml_path.exists():
        raise FileNotFoundError(f"YAML configuration file not found at: {yaml_path}")

    # Equivalent mutants: "r" is Python's default open mode; cast() has no runtime effect.
    with open(yaml_path, "r", encoding="utf-8") as f:  # pragma: no mutate
        return cast(dict[str, Any], yaml.safe_load(f))  # pragma: no mutate
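End to end, the loader is a thin wrapper over `yaml.safe_load` with an explicit existence check. A self-contained sketch (PyYAML assumed available, as in the project; the recipe content below is invented for illustration):

```python
import tempfile
from pathlib import Path

import yaml  # PyYAML

def load_config_from_yaml(yaml_path: Path) -> dict:
    # Mirrors the documented behaviour: a missing file fails loudly
    # instead of silently yielding an empty config.
    if not yaml_path.exists():
        raise FileNotFoundError(f"YAML configuration file not found at: {yaml_path}")
    with open(yaml_path, "r", encoding="utf-8") as f:
        return yaml.safe_load(f)

with tempfile.TemporaryDirectory() as d:
    cfg_path = Path(d) / "recipe.yaml"
    cfg_path.write_text("training:\n  batch_size: 32\n  lr: 0.001\n")
    cfg = load_config_from_yaml(cfg_path)

print(cfg["training"]["batch_size"])  # -> 32
```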

load_model_weights(model, path, device)

Restores model state from a checkpoint using secure weight-only loading.

Loads PyTorch state_dict from disk with security hardening (weights_only=True) to prevent arbitrary code execution. Automatically maps tensors to target device.

Parameters:

Name Type Description Default
model Module

The model instance to populate with loaded weights

required
path Path

Filesystem path to the checkpoint file (.pth)

required
device device

Target device for mapping the loaded tensors

required

Raises:

Type Description
OrchardExportError

If the checkpoint file does not exist at path

Example

>>> model = get_model(device, dataset_cfg=cfg.dataset, arch_cfg=cfg.architecture)
>>> checkpoint_path = Path("outputs/run_123/checkpoints/best_model.pth")
>>> load_model_weights(model, checkpoint_path, device)

Source code in orchard/core/io/checkpoints.py
def load_model_weights(model: torch.nn.Module, path: Path, device: torch.device) -> None:
    """
    Restores model state from a checkpoint using secure weight-only loading.

    Loads PyTorch state_dict from disk with security hardening (weights_only=True)
    to prevent arbitrary code execution. Automatically maps tensors to target device.

    Args:
        model: The model instance to populate with loaded weights
        path: Filesystem path to the checkpoint file (.pth)
        device: Target device for mapping the loaded tensors

    Raises:
        OrchardExportError: If the checkpoint file does not exist at path

    Example:
        >>> model = get_model(device, dataset_cfg=cfg.dataset, arch_cfg=cfg.architecture)
        >>> checkpoint_path = Path("outputs/run_123/checkpoints/best_model.pth")
        >>> load_model_weights(model, checkpoint_path, device)
    """
    if not path.exists():
        raise OrchardExportError(f"Model checkpoint not found at: {path}")

    # weights_only=True is used for security (avoids arbitrary code execution)
    state_dict = torch.load(path, map_location=device, weights_only=True)

    # Validate architecture compatibility before loading
    model_keys = set(model.state_dict().keys())
    checkpoint_keys = set(state_dict.keys())
    if model_keys != checkpoint_keys:
        missing = model_keys - checkpoint_keys
        unexpected = checkpoint_keys - model_keys
        parts = []
        if missing:
            parts.append(f"missing keys: {sorted(missing)[:5]}")
        if unexpected:
            parts.append(f"unexpected keys: {sorted(unexpected)[:5]}")
        raise OrchardExportError(
            f"Checkpoint architecture mismatch ({', '.join(parts)}). "
            "Ensure the config matches the architecture used during training."
        )

    model.load_state_dict(state_dict)
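The architecture-compatibility check reduces to two set differences over the state_dict key sets. A torch-free sketch (layer names below are illustrative):

```python
# Compare the model's expected keys against the checkpoint's keys
# and report both directions of the mismatch.
def diff_state_keys(model_keys: set[str], ckpt_keys: set[str]) -> tuple[set[str], set[str]]:
    missing = model_keys - ckpt_keys      # expected by the model, absent from checkpoint
    unexpected = ckpt_keys - model_keys   # present in checkpoint, unknown to the model
    return missing, unexpected

missing, unexpected = diff_state_keys(
    {"conv1.weight", "fc.weight"},        # hypothetical layer names
    {"conv1.weight", "head.weight"},
)
assert missing == {"fc.weight"}
assert unexpected == {"head.weight"}
```

An empty result in both directions is what load_model_weights requires before calling load_state_dict.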

md5_checksum(path, chunk_size=_MD5_CHUNK_SIZE)

Calculates the MD5 checksum of a file using buffered reading.

Parameters:

Name Type Description Default
path Path

Path to the file to verify.

required
chunk_size int

Read buffer size in bytes.

_MD5_CHUNK_SIZE

Returns:

Name Type Description
str str

The calculated hexadecimal MD5 hash.

Source code in orchard/core/io/data_io.py
def md5_checksum(path: Path, chunk_size: int = _MD5_CHUNK_SIZE) -> str:
    """
    Calculates the MD5 checksum of a file using buffered reading.

    Args:
        path (Path): Path to the file to verify.
        chunk_size (int): Read buffer size in bytes.

    Returns:
        str: The calculated hexadecimal MD5 hash.
    """
    hash_md5 = hashlib.md5(usedforsecurity=False)  # pragma: no mutate
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):  # pragma: no mutate
            hash_md5.update(chunk)
    return hash_md5.hexdigest()
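The iter()-with-sentinel idiom used above works over any binary stream, not just files. A minimal in-memory check that the buffered digest matches a one-shot digest of the same bytes:

```python
import hashlib
import io

# iter() with a b"" sentinel keeps calling f.read(chunk_size)
# until the stream is exhausted.
payload = b"orchard" * 10_000
hash_md5 = hashlib.md5(usedforsecurity=False)
f = io.BytesIO(payload)
for chunk in iter(lambda: f.read(1024), b""):
    hash_md5.update(chunk)

# Buffered and one-shot digests of the same bytes must agree.
assert hash_md5.hexdigest() == hashlib.md5(payload, usedforsecurity=False).hexdigest()
```

The chunked form keeps memory bounded for large dataset archives, which is why the helper defaults to a fixed read buffer.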

save_config_as_yaml(data, yaml_path)

Serializes and persists configuration data to a YAML file.

This function coordinates the extraction of data from potentially complex objects (supporting Pydantic models, custom portable manifests, or raw dicts), applies recursive sanitization, and performs an atomic write to disk.

Parameters:

Name Type Description Default
data Any

The configuration object to save. Supports objects with 'dump_portable()' or 'model_dump()' methods, or standard dictionaries.

required
yaml_path Path

The destination filesystem path.

required

Returns:

Name Type Description
Path Path

The confirmed path where the YAML was successfully written.

Raises:

Type Description
ValueError

If the data structure cannot be serialized.

OSError

If a filesystem-level error occurs (permissions, disk full).

Source code in orchard/core/io/serialization.py
def save_config_as_yaml(data: Any, yaml_path: Path) -> Path:
    """
    Serializes and persists configuration data to a YAML file.

    This function coordinates the extraction of data from potentially complex
    objects (supporting Pydantic models, custom portable manifests, or raw dicts),
    applies recursive sanitization, and performs an atomic write to disk.

    Args:
        data (Any): The configuration object to save. Supports objects with
            'dump_portable()' or 'model_dump()' methods, or standard dictionaries.
        yaml_path (Path): The destination filesystem path.

    Returns:
        Path: The confirmed path where the YAML was successfully written.

    Raises:
        ValueError: If the data structure cannot be serialized.
        OSError: If a filesystem-level error occurs (permissions, disk full).
    """
    logger = logging.getLogger(LOGGER_NAME)

    # 1. Extraction & Sanitization Phase
    try:
        # Priority 1: Custom portability protocol
        if hasattr(data, "dump_portable"):
            raw_dict = data.dump_portable()

        # Priority 2: Pydantic model protocol
        elif hasattr(data, "model_dump"):
            try:
                raw_dict = data.model_dump(mode="json")
            except (TypeError, ValueError):  # pragma: no cover
                # Fallback for older Pydantic V2 versions or complex types
                raw_dict = data.model_dump()  # pragma: no mutate

        # Priority 3: Raw dictionary or other types
        else:
            raw_dict = data

        final_data = _sanitize_for_yaml(raw_dict)

    except Exception as e:
        logger.error("Serialization failed: object structure is incompatible. Error: %s", e)
        raise ValueError(f"Could not serialize configuration object: {e}") from e

    # 2. Persistence Phase (Atomic Write)
    try:
        _persist_yaml_atomic(final_data, yaml_path)
        logger.debug("Configuration frozen at → %s", yaml_path.name)
        return yaml_path

    except OSError as e:
        logger.error("IO Error: Could not write YAML to %s. Error: %s", yaml_path, e)
        raise
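The extraction priority (dump_portable, then model_dump, then raw dict) can be sketched in isolation. The classes below are illustrative stand-ins, not part of the package:

```python
from typing import Any

def extract_raw_dict(data: Any) -> Any:
    # Priority 1: custom portability protocol
    if hasattr(data, "dump_portable"):
        return data.dump_portable()
    # Priority 2: Pydantic model protocol
    if hasattr(data, "model_dump"):
        return data.model_dump(mode="json")
    # Priority 3: plain dict or other serializable types
    return data

class PortableManifest:  # hypothetical manifest with the custom protocol
    def dump_portable(self) -> dict[str, Any]:
        return {"epochs": 10}

class FakeModel:  # hypothetical stand-in for a Pydantic model
    def model_dump(self, mode: str = "python") -> dict[str, Any]:
        return {"lr": 0.01}

assert extract_raw_dict(PortableManifest()) == {"epochs": 10}
assert extract_raw_dict(FakeModel()) == {"lr": 0.01}
assert extract_raw_dict({"seed": 42}) == {"seed": 42}
```

Duck typing via hasattr keeps the function decoupled from any particular config class hierarchy.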

validate_npz_keys(data)

Validates that the loaded NPZ archive contains all required dataset keys.

Parameters:

Name Type Description Default
data NpzFile

The loaded NPZ file object.

required

Raises:

Type Description
OrchardDatasetError

If any required key (images/labels) is missing.

Source code in orchard/core/io/data_io.py
def validate_npz_keys(data: np.lib.npyio.NpzFile) -> None:
    """
    Validates that the loaded NPZ dataset contains all required dataset keys.

    Args:
        data (np.lib.npyio.NpzFile): The loaded NPZ file object.

    Raises:
        OrchardDatasetError: If any required key (images/labels) is missing.
    """
    missing = _REQUIRED_NPZ_KEYS - set(data.files)
    if missing:
        found = list(data.files)
        raise OrchardDatasetError(
            f"NPZ archive is corrupted or invalid. Missing keys: {missing} | Found keys: {found}"
        )
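The set-difference validation generalizes to any list of archive keys. A stdlib-only sketch, assuming the required keys are images and labels as the docstring indicates:

```python
REQUIRED_NPZ_KEYS = {"images", "labels"}  # assumed from the docstring

def check_keys(found_keys: list[str]) -> None:
    missing = REQUIRED_NPZ_KEYS - set(found_keys)
    if missing:
        raise ValueError(f"Missing keys: {missing} | Found keys: {found_keys}")

check_keys(["images", "labels", "metadata"])  # extra keys are tolerated
try:
    check_keys(["images"])
except ValueError as e:
    assert "labels" in str(e)
```

Note that only missing keys fail validation; extra keys in the archive pass through untouched.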

log_optimization_header(cfg, logger_instance=None)

Log Optuna optimization configuration details.

Logs search-specific parameters only (dataset/model already shown in environment).

Parameters:

Name Type Description Default
cfg 'Config'

Configuration with optuna settings

required
logger_instance Logger | None

Logger instance to use (defaults to module logger)

None
Source code in orchard/core/logger/progress.py
def log_optimization_header(cfg: "Config", logger_instance: logging.Logger | None = None) -> None:
    """
    Log Optuna optimization configuration details.

    Logs search-specific parameters only (dataset/model already shown in environment).

    Args:
        cfg: Configuration with optuna settings
        logger_instance: Logger instance to use (defaults to module logger)
    """
    log = logger_instance or logger

    # Search configuration (no duplicate header - phase header already shown)
    log.info("")
    I = LogStyle.INDENT  # noqa: E741  # pragma: no mutate
    A = LogStyle.ARROW  # pragma: no mutate
    log.info("%s%s Dataset      : %s", I, A, cfg.dataset.dataset_name)
    model_search = "Enabled" if cfg.optuna.enable_model_search else "Disabled"  # pragma: no mutate
    log.info("%s%s Model Search : %s", I, A, model_search)
    if cfg.optuna.model_pool is not None:
        log.info("%s%s Model Pool   : %s", I, A, ", ".join(cfg.optuna.model_pool))
    log.info("%s%s Search Space : %s", I, A, cfg.optuna.search_space_preset)
    log.info("%s%s Trials       : %s", I, A, cfg.optuna.n_trials)
    log.info("%s%s Epochs/Trial : %s", I, A, cfg.optuna.epochs)
    log.info("%s%s Metric       : %s", I, A, cfg.training.monitor_metric)
    pruning = "Enabled" if cfg.optuna.enable_pruning else "Disabled"  # pragma: no mutate
    log.info("%s%s Pruning      : %s", I, A, pruning)

    if cfg.optuna.enable_early_stopping:
        threshold = cfg.optuna.early_stopping_threshold or "auto"  # pragma: no mutate
        log.info(
            "%s%s Early Stop   : Enabled (threshold=%s, patience=%s)",
            I,
            A,
            threshold,
            cfg.optuna.early_stopping_patience,
        )

    log.info("")

log_optimization_summary(study, cfg, device, paths, logger_instance=None)

Log optimization study completion summary.

Parameters:

Name Type Description Default
study 'optuna.Study'

Completed Optuna study

required
cfg 'Config'

Configuration object

required
device 'torch.device'

PyTorch device used

required
paths 'RunPaths'

Run paths for artifacts

required
logger_instance Logger | None

Logger instance to use (defaults to module logger)

None
Source code in orchard/core/logger/progress.py
def log_optimization_summary(
    study: "optuna.Study",
    cfg: "Config",
    device: "torch.device",
    paths: "RunPaths",
    logger_instance: logging.Logger | None = None,
) -> None:
    """
    Log optimization study completion summary.

    Args:
        study: Completed Optuna study
        cfg: Configuration object
        device: PyTorch device used
        paths: Run paths for artifacts
        logger_instance: Logger instance to use (defaults to module logger)
    """
    log = logger_instance or logger
    completed, pruned, failed = _count_trial_states(study)

    I = LogStyle.INDENT  # noqa: E741  # pragma: no mutate
    A = LogStyle.ARROW  # pragma: no mutate
    S = LogStyle.SUCCESS  # pragma: no mutate
    W = LogStyle.WARNING  # pragma: no mutate

    Reporter.log_phase_header(log, "OPTIMIZATION SUMMARY", LogStyle.DOUBLE)  # pragma: no mutate
    log.info("%s%s Dataset        : %s", I, A, cfg.dataset.dataset_name)
    log.info("%s%s Search Space   : %s", I, A, cfg.optuna.search_space_preset)
    log.info("%s%s Total Trials   : %d", I, A, len(study.trials))
    log.info("%s%s Completed      : %d", I, S, len(completed))
    log.info("%s%s Pruned         : %d", I, A, len(pruned))

    if failed:
        log.info("%s%s Failed         : %d", I, W, len(failed))

    if completed:
        try:
            log.info(
                "%s%s Best %-9s : %.6f",
                I,
                S,
                cfg.training.monitor_metric.upper(),
                study.best_value,
            )
            log.info("%s%s Best Trial     : %d", I, S, study.best_trial.number)
        except ValueError:  # pragma: no cover
            # fmt: off
            log.error("%s%s Best trial lookup failed (check study integrity)", I, W)  # pragma: no mutate
            # fmt: on
    else:
        log.warning("%s%s No trials completed", I, W)

    log.info("%s%s Device         : %s", I, A, str(device).upper())
    log.info("%s%s Artifacts      : %s", I, A, Path(paths.root).name)
    log.info(LogStyle.DOUBLE)
    log.info("")

log_pipeline_summary(test_acc, macro_f1, best_model_path, run_dir, duration, test_auc=None, onnx_path=None, logger_instance=None)

Log final pipeline completion summary.

Called at the end of the pipeline after all phases complete. Consolidates key metrics and artifact locations.

Parameters:

Name Type Description Default
test_acc float

Final test accuracy

required
macro_f1 float

Final macro F1 score

required
best_model_path Path

Path to best model checkpoint

required
run_dir Path

Root directory for this run

required
duration str

Human-readable duration string

required
test_auc float | None

Final test AUC (if available)

None
onnx_path Path | None

Path to ONNX export (if performed)

None
logger_instance Logger | None

Logger instance to use (defaults to module logger)

None
Source code in orchard/core/logger/progress.py
def log_pipeline_summary(
    test_acc: float,
    macro_f1: float,
    best_model_path: Path,
    run_dir: Path,
    duration: str,
    test_auc: float | None = None,
    onnx_path: Path | None = None,
    logger_instance: logging.Logger | None = None,
) -> None:
    """
    Log final pipeline completion summary.

    Called at the end of the pipeline after all phases complete.
    Consolidates key metrics and artifact locations.

    Args:
        test_acc: Final test accuracy
        macro_f1: Final macro F1 score
        best_model_path: Path to best model checkpoint
        run_dir: Root directory for this run
        duration: Human-readable duration string
        test_auc: Final test AUC (if available)
        onnx_path: Path to ONNX export (if performed)
        logger_instance: Logger instance to use (defaults to module logger)
    """
    log = logger_instance or logger

    I = LogStyle.INDENT  # noqa: E741  # pragma: no mutate
    A = LogStyle.ARROW  # pragma: no mutate
    S = LogStyle.SUCCESS  # pragma: no mutate

    Reporter.log_phase_header(log, "PIPELINE COMPLETE", LogStyle.DOUBLE)  # pragma: no mutate
    log.info("%s%s Test Accuracy  : %7.2f%%", I, S, test_acc * 100)
    log.info("%s%s Macro F1       : %8.4f", I, S, macro_f1)
    if test_auc is not None:
        log.info("%s%s Test AUC       : %8.4f", I, S, test_auc)
    log.info("%s%s Best Model     : %s", I, A, Path(best_model_path).name)
    if onnx_path:
        log.info("%s%s ONNX Export    : %s", I, A, Path(onnx_path).name)
    log.info("%s%s Run Directory  : %s", I, A, Path(run_dir).name)
    log.info("%s%s Duration       : %s", I, A, duration)
    log.info(LogStyle.DOUBLE)

log_trial_start(trial_number, params, logger_instance=None)

Log trial start with formatted parameters (grouped by category).

Parameters:

Name Type Description Default
trial_number int

Trial index

required
params dict[str, Any]

Sampled hyperparameters

required
logger_instance Logger | None

Logger instance to use (defaults to module logger)

None
Source code in orchard/core/logger/progress.py
def log_trial_start(
    trial_number: int, params: dict[str, Any], logger_instance: logging.Logger | None = None
) -> None:
    """
    Log trial start with formatted parameters (grouped by category).

    Args:
        trial_number: Trial index
        params: Sampled hyperparameters
        logger_instance: Logger instance to use (defaults to module logger)
    """
    log = logger_instance or logger

    log.info(LogStyle.LIGHT)
    log.info("[Trial %d Hyperparameters]", trial_number)

    categories = {
        "Optimization": ["learning_rate", "weight_decay", "momentum", "min_lr"],
        "Loss": ["criterion_type", "focal_gamma", "label_smoothing"],
        "Regularization": ["mixup_alpha", "dropout"],
        "Scheduling": ["scheduler_type", "scheduler_patience", "batch_size"],
        "Augmentation": ["rotation_angle", "jitter_val", "min_scale"],
        "Architecture": ["model_name", "pretrained", "weight_variant"],
    }

    for category_name, keys in categories.items():
        category_params = {k: params[k] for k in keys if k in params}
        if category_params:
            log.info("%s[%s]", LogStyle.INDENT, category_name)
            for key, value in category_params.items():
                log.info(
                    "%s%s %-20s : %s",
                    LogStyle.DOUBLE_INDENT,
                    LogStyle.BULLET,
                    key,
                    _format_param_value(value),
                )

    log.info(LogStyle.LIGHT)
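The per-category filtering above is a plain dict comprehension over the sampled parameters; keys absent from a trial are simply skipped. A sketch with a subset of the categories:

```python
categories = {
    "Optimization": ["learning_rate", "weight_decay", "momentum"],
    "Loss": ["criterion_type", "focal_gamma"],
}
params = {"learning_rate": 3e-4, "criterion_type": "focal"}  # a sampled trial

# Group sampled params by category, dropping keys the trial did not sample.
grouped = {
    name: {k: params[k] for k in keys if k in params}
    for name, keys in categories.items()
}
assert grouped["Optimization"] == {"learning_rate": 3e-4}
assert grouped["Loss"] == {"criterion_type": "focal"}
```

In log_trial_start an empty category produces no header line at all, which keeps trial logs compact when the search space is narrow.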

get_project_root()

Dynamically locate the project root by searching for anchor files.

Traverses upward from the current file's directory until a marker file (.git or pyproject.toml) is found. Supports Docker environments via an IN_DOCKER environment variable override.

Returns:

Type Description
Path

Resolved absolute Path to the project root directory.

Note:

- IN_DOCKER=1 or IN_DOCKER=TRUE returns /app
- Falls back to fixed parent traversal if no markers found
Source code in orchard/core/paths/root.py
def get_project_root() -> Path:
    """
    Dynamically locate the project root by searching for anchor files.

    Traverses upward from current file's directory until finding a marker
    file (.git or pyproject.toml). Supports Docker environments via
    IN_DOCKER environment variable override.

    Returns:
        Resolved absolute Path to the project root directory.

    Note:

        - IN_DOCKER=1 or IN_DOCKER=TRUE returns /app
        - Falls back to fixed parent traversal if no markers found
    """
    # Environment override for Docker setups
    if str(os.getenv("IN_DOCKER")).upper() in ("1", "TRUE"):
        return Path("/app").resolve()

    # Start from the directory of this file
    current_path = Path(__file__).resolve().parent

    # Look for markers that define the project root
    # Note: .git is most reliable; README.md alone can exist in subdirectories
    root_markers = {".git", "pyproject.toml"}

    for parent in [current_path] + list(current_path.parents):
        if any((parent / marker).exists() for marker in root_markers):
            return parent

    # Fallback if no markers are found
    try:
        if len(current_path.parents) >= 3:
            return current_path.parents[2]
    except IndexError:  # pragma: no cover
        pass

    # Final fallback
    return current_path.parent.parent  # pragma: no cover

setup_static_directories()

Ensure core project directories exist at startup.

Creates DATASET_DIR and OUTPUTS_ROOT if they do not exist, preventing runtime errors during data fetching or artifact creation. Uses mkdir(parents=True, exist_ok=True) for idempotent operation.

Source code in orchard/core/paths/root.py
def setup_static_directories() -> None:
    """
    Ensure core project directories exist at startup.

    Creates DATASET_DIR and OUTPUTS_ROOT if they do not exist, preventing
    runtime errors during data fetching or artifact creation. Uses
    mkdir(parents=True, exist_ok=True) for idempotent operation.
    """
    for directory in STATIC_DIRS:
        directory.mkdir(parents=True, exist_ok=True)
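The idempotence of the call pattern is easy to verify in isolation; the directory names below are stand-ins for DATASET_DIR and OUTPUTS_ROOT:

```python
import tempfile
from pathlib import Path

d = Path(tempfile.mkdtemp()) / "outputs" / "runs"  # hypothetical output tree
d.mkdir(parents=True, exist_ok=True)  # creates all missing ancestors
d.mkdir(parents=True, exist_ok=True)  # second call is a no-op, raises nothing
assert d.is_dir()
```

parents=True avoids a separate existence check per ancestor, and exist_ok=True makes repeated startup calls safe.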