Skip to content

environment

orchard.core.environment

Environment & Infrastructure Abstraction Layer.

This package centralizes hardware acceleration discovery, system-level optimizations, and reproducibility protocols. It provides a unified interface to ensure consistent execution across Local, HPC, and Docker environments.

DuplicateProcessCleaner(script_name=None)

Scans and optionally terminates duplicate instances of the current script.

Attributes:

Name Type Description
script_path str

Absolute path of the script to match against running processes.

current_pid int

PID of the current process.

Source code in orchard/core/environment/guards.py
def __init__(self, script_name: str | None = None) -> None:
    self.script_path = str(Path(script_name or sys.argv[0]).resolve())
    self.current_pid = os.getpid()

detect_duplicates()

Detects other Python processes running the same script.

Returns:

Type Description
list[Process]

list of psutil.Process instances representing duplicates.

Source code in orchard/core/environment/guards.py
def detect_duplicates(self) -> list[psutil.Process]:
    """
    Detects other Python processes running the same script.

    Returns:
        list of psutil.Process instances representing duplicates.
    """
    duplicates = []

    for proc in psutil.process_iter(["pid", "name", "cmdline"]):
        try:
            info = proc.info
            if not info["cmdline"] or info["pid"] == self.current_pid:
                continue

            # Check if process is Python
            cmd0 = Path(info["cmdline"][0]).name.lower()
            if "python" not in cmd0:
                continue

            # Match exact script path in cmdline.
            # Individual args are arbitrary strings (flags, values, even
            # embedded NUL bytes) — resolve() can raise ValueError/OSError
            # on such input, which previously aborted the whole scan.
            # Skip unresolvable args instead.
            cmdline_paths = []
            for arg in info["cmdline"][1:]:
                try:
                    cmdline_paths.append(str(Path(arg).resolve()))
                except (ValueError, OSError):
                    continue
            if self.script_path in cmdline_paths:
                duplicates.append(proc)

        except (psutil.NoSuchProcess, psutil.AccessDenied):
            continue

    return duplicates

terminate_duplicates(logger=None)

Terminates detected duplicate processes.

In distributed mode (torchrun / DDP), termination is skipped entirely because sibling rank processes are intentional, not duplicates.

Parameters:

Name Type Description Default
logger Logger | None

Logger for reporting terminated PIDs.

None

Returns:

Type Description
int

Number of terminated duplicate processes (0 in distributed mode).

Source code in orchard/core/environment/guards.py
def terminate_duplicates(self, logger: logging.Logger | None = None) -> int:
    """
    Terminates detected duplicate processes.

    In distributed mode (torchrun / DDP), termination is skipped entirely
    because sibling rank processes are intentional, not duplicates.

    Args:
        logger (logging.Logger | None): Logger for reporting terminated PIDs.

    Returns:
        Number of terminated duplicate processes (0 in distributed mode).
    """
    if is_distributed():
        if logger:
            logger.debug("Distributed mode: skipping duplicate process cleanup.")
        return 0

    terminated = 0

    for proc in self.detect_duplicates():
        # Graceful SIGTERM first; escalate to SIGKILL only if the process
        # fails to exit within the one-second grace period.
        for stop in (proc.terminate, proc.kill):
            try:
                stop()
                proc.wait(timeout=1)
                terminated += 1
                break
            except psutil.TimeoutExpired:
                # Did not exit in time — escalate (or give up after kill).
                continue
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                # Process vanished or is off-limits — move on.
                break

    if terminated and logger:
        logger.info(f" {LogStyle.ARROW} Cleaned {terminated} duplicate process(es). Cooling down...")
        time.sleep(1.5)

    return terminated

TimeTracker()

Default implementation of TimeTrackerProtocol.

Tracks elapsed time between start() and stop() calls, providing both raw seconds and formatted output.

Source code in orchard/core/environment/timing.py
def __init__(self) -> None:
    self._start_time: float | None = None
    self._end_time: float | None = None

elapsed_seconds property

Total elapsed time in seconds.

elapsed_formatted property

Human-readable elapsed time string (e.g., '1h 23m 45s').

start()

Record pipeline start time.

Source code in orchard/core/environment/timing.py
def start(self) -> None:
    """Record pipeline start time, discarding any previous stop time."""
    # Clear the end marker first so a restarted tracker never pairs a
    # fresh start with a stale stop timestamp.
    self._end_time = None
    self._start_time = time.time()

stop()

Record stop time and return elapsed seconds.

Source code in orchard/core/environment/timing.py
def stop(self) -> float:
    """Record stop time and return elapsed seconds."""
    now = time.time()
    self._end_time = now
    return self.elapsed_seconds

TimeTrackerProtocol

Bases: Protocol

Protocol for pipeline duration tracking.

elapsed_seconds property

Total elapsed time in seconds.

elapsed_formatted property

Human-readable elapsed time string.

start()

Record pipeline start time.

Source code in orchard/core/environment/timing.py
def start(self) -> None:
    """Record pipeline start time.

    Protocol stub — the body is intentionally empty; concrete
    implementations (e.g. TimeTracker) supply the behavior.
    """
    ...  # pragma: no cover

stop()

Record stop time and return elapsed seconds.

Source code in orchard/core/environment/timing.py
def stop(self) -> float:
    """Record stop time and return elapsed seconds.

    Protocol stub — the body is intentionally empty; concrete
    implementations (e.g. TimeTracker) supply the behavior.
    """
    ...  # pragma: no cover

get_local_rank()

Return the node-local rank of the current process.

Reads from the LOCAL_RANK environment variable. Used primarily for per-rank GPU assignment (torch.device(f"cuda:{local_rank}")). Returns 0 in single-process mode.

Source code in orchard/core/environment/distributed.py
def get_local_rank() -> int:
    """
    Return the node-local rank of the current process.

    Reads from the ``LOCAL_RANK`` environment variable.  Used primarily
    for per-rank GPU assignment (``torch.device(f"cuda:{local_rank}")``).
    Returns 0 in single-process mode.
    """
    raw = os.environ.get("LOCAL_RANK")
    return 0 if raw is None else int(raw)

get_rank()

Return the global rank of the current process.

Reads from the RANK environment variable set by torchrun or torch.distributed.launch. Returns 0 when running outside a distributed context (single-process default).

Source code in orchard/core/environment/distributed.py
def get_rank() -> int:
    """
    Return the global rank of the current process.

    Reads from the ``RANK`` environment variable set by torchrun or
    torch.distributed.launch.  Returns 0 when running outside a
    distributed context (single-process default).
    """
    raw = os.environ.get("RANK")
    return int(raw) if raw is not None else 0

get_world_size()

Return the total number of distributed processes.

Reads from the WORLD_SIZE environment variable. Returns 1 when running outside a distributed context.

Source code in orchard/core/environment/distributed.py
def get_world_size() -> int:
    """
    Return the total number of distributed processes.

    Reads from the ``WORLD_SIZE`` environment variable.  Returns 1
    when running outside a distributed context.
    """
    raw = os.environ.get("WORLD_SIZE")
    return 1 if raw is None else int(raw)

is_distributed()

Detect whether the current process was launched in a distributed context.

Returns True when either RANK or LOCAL_RANK is present in the environment, indicating a torchrun or equivalent launcher.

Source code in orchard/core/environment/distributed.py
def is_distributed() -> bool:
    """
    Detect whether the current process was launched in a distributed context.

    Returns True when either ``RANK`` or ``LOCAL_RANK`` is present in
    the environment, indicating a torchrun or equivalent launcher.
    """
    return any(var in os.environ for var in ("RANK", "LOCAL_RANK"))

is_main_process()

Check whether the current process is the main (rank 0) process.

Always returns True in single-process mode. In distributed mode, only the process with RANK=0 returns True.

Source code in orchard/core/environment/distributed.py
def is_main_process() -> bool:
    """
    Check whether the current process is the main (rank 0) process.

    Always returns True in single-process mode.  In distributed mode,
    only the process with ``RANK=0`` returns True.
    """
    rank = get_rank()
    return rank == 0

ensure_single_instance(lock_file, logger)

Implements a cooperative advisory lock to guarantee singleton execution.

Leverages Unix 'flock' to create an exclusive lock on a sentinel file. If the lock cannot be acquired immediately, it indicates another instance is active, and the process will abort to prevent filesystem or GPU race conditions.

In distributed mode (torchrun / DDP), only the main process (rank 0) acquires the lock. Non-main ranks skip locking entirely to avoid deadlocking against the rank-0 held lock.

Parameters:

Name Type Description Default
lock_file Path

Filesystem path where the lock sentinel will reside.

required
logger Logger

Active logger for reporting acquisition status.

required

Raises:

Type Description
SystemExit

If an existing lock is detected on the system.

Source code in orchard/core/environment/guards.py
def ensure_single_instance(lock_file: Path, logger: logging.Logger) -> None:
    """
    Implements a cooperative advisory lock to guarantee singleton execution.

    Leverages Unix 'flock' to create an exclusive lock on a sentinel file.
    If the lock cannot be acquired immediately, it indicates another instance
    is active, and the process will abort to prevent filesystem or GPU
    race conditions.

    In distributed mode (torchrun / DDP), only the main process (rank 0)
    acquires the lock.  Non-main ranks skip locking entirely to avoid
    deadlocking against the rank-0 held lock.

    Args:
        lock_file (Path): Filesystem path where the lock sentinel will reside.
        logger (logging.Logger): Active logger for reporting acquisition status.

    Raises:
        SystemExit: If an existing lock is detected on the system.
    """
    global _lock_fd

    # In distributed mode, only rank 0 manages the lock
    if not is_main_process():
        # FIX: previous message labeled os.getpid() as "Rank %d" — the
        # value is a process ID, not a rank; report it as such.
        logger.debug("PID %d: skipping lock acquisition (non-main process).", os.getpid())
        return

    # Locking is currently only supported on Unix-like systems via fcntl
    if platform.system() in ("Linux", "Darwin") and HAS_FCNTL:
        f: IO[str] | None = None
        try:
            lock_file.parent.mkdir(parents=True, exist_ok=True)
            # Append mode: never truncate — another holder may own the file.
            f = open(lock_file, "a")

            # Attempt to acquire an exclusive lock without blocking
            fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
            _lock_fd = f
            logger.info("  %s System lock acquired", LogStyle.ARROW)

        except (IOError, BlockingIOError):
            # Close our handle so the fd doesn't leak before aborting.
            if f is not None:
                f.close()
            logger.error(
                " %s CRITICAL: Another instance is already running. Aborting.",
                LogStyle.WARNING,
            )
            sys.exit(1)

release_single_instance(lock_file)

Safely releases the system lock and unlinks the sentinel file.

Guarantees that the file descriptor is closed and the lock is returned to the OS. Designed to be called during normal shutdown or within exception handling blocks.

Parameters:

Name Type Description Default
lock_file Path

Filesystem path to the sentinel file to be removed.

required
Source code in orchard/core/environment/guards.py
def release_single_instance(lock_file: Path) -> None:
    """
    Safely releases the system lock and unlinks the sentinel file.

    Guarantees that the file descriptor is closed and the lock is returned
    to the OS. Designed to be called during normal shutdown or within
    exception handling blocks.

    Args:
        lock_file (Path): Filesystem path to the sentinel file to be removed.
    """
    global _lock_fd

    if _lock_fd:
        try:
            if HAS_FCNTL:
                try:
                    fcntl.flock(_lock_fd, fcntl.LOCK_UN)
                except (OSError, IOError):
                    # Unlock may fail if process is already terminated
                    pass

            try:
                _lock_fd.close()
            except (OSError, IOError):  # pragma: no cover
                # Close may fail if fd is already closed
                pass
        finally:
            _lock_fd = None

    # Unlink with missing_ok to avoid the TOCTOU race between an
    # exists() check and the unlink() call (another process may have
    # already removed the file — that is fine).
    try:
        lock_file.unlink(missing_ok=True)
    except OSError:  # pragma: no cover
        # Other OS errors (permissions, etc.) - safe to ignore during cleanup
        pass

apply_cpu_threads(num_workers)

Sets optimal compute threads to avoid resource contention.

Synchronizes PyTorch, OMP, and MKL thread counts.

Parameters:

Name Type Description Default
num_workers int

Active DataLoader workers

required

Returns:

Type Description
int

Number of threads assigned to compute operations

Source code in orchard/core/environment/hardware.py
def apply_cpu_threads(num_workers: int) -> int:
    """
    Sets optimal compute threads to avoid resource contention.

    Synchronizes PyTorch, OMP, and MKL thread counts.

    Args:
        num_workers: Active DataLoader workers

    Returns:
        Number of threads assigned to compute operations
    """
    available_cores = os.cpu_count() or 1
    # Reserve cores for DataLoader workers, but never drop below 2 threads.
    compute_threads = max(2, available_cores - num_workers)

    torch.set_num_threads(compute_threads)
    for env_var in ("OMP_NUM_THREADS", "MKL_NUM_THREADS"):
        os.environ[env_var] = str(compute_threads)

    return compute_threads

configure_system_libraries()

Configures libraries for headless environments and reduces logging noise.

  • Sets Matplotlib to 'Agg' backend on Linux/Docker (no GUI)
  • Configures font embedding for PDF/PS exports
  • Suppresses verbose Matplotlib warnings
Source code in orchard/core/environment/hardware.py
def configure_system_libraries() -> None:
    """
    Configures libraries for headless environments and reduces logging noise.

    - Sets Matplotlib to 'Agg' backend on Linux/Docker (no GUI)
    - Configures font embedding for PDF/PS exports
    - Suppresses verbose Matplotlib warnings
    """
    on_linux = platform.system() == "Linux"
    in_docker = os.environ.get("IN_DOCKER") == "TRUE" or Path("/.dockerenv").exists()

    if not (on_linux or in_docker):
        return

    # Headless-safe backend; fonttype 42 embeds TrueType fonts so
    # PDF/PS exports keep selectable/editable text.
    matplotlib.use("Agg")
    matplotlib.rcParams.update({"pdf.fonttype": 42, "ps.fonttype": 42})
    logging.getLogger("matplotlib").setLevel(logging.WARNING)

detect_best_device()

Detects the most performant accelerator (CUDA > MPS > CPU).

Returns:

Type Description
str

Device string: 'cuda', 'mps', or 'cpu'

Source code in orchard/core/environment/hardware.py
def detect_best_device() -> str:
    """
    Detects the most performant accelerator (CUDA > MPS > CPU).

    Returns:
        Device string: 'cuda', 'mps', or 'cpu'
    """
    # Preference order: CUDA first, then Apple MPS, CPU as the fallback.
    for device, available in (("cuda", torch.cuda.is_available), ("mps", has_mps_backend)):
        if available():
            return device
    return "cpu"

get_accelerator_name()

Returns accelerator model name (CUDA GPU or Apple Silicon) or empty string.

Source code in orchard/core/environment/hardware.py
def get_accelerator_name() -> str:
    """Returns accelerator model name (CUDA GPU or Apple Silicon) or empty string."""
    name = ""
    if torch.cuda.is_available():
        # Report the first visible CUDA device.
        name = torch.cuda.get_device_name(0)
    elif has_mps_backend():
        name = f"Apple Silicon ({platform.machine()})"
    return name

get_num_workers()

Determines optimal DataLoader workers with RAM stability cap.

Returns:

Type Description
int

Recommended number of subprocesses (2-8 range)

Source code in orchard/core/environment/hardware.py
def get_num_workers() -> int:
    """
    Determines optimal DataLoader workers with RAM stability cap.

    Returns:
        Recommended number of subprocesses (2-8 range)
    """
    cores = os.cpu_count() or _MIN_WORKERS
    # Small machines get the floor; otherwise half the cores, capped for RAM.
    return _MIN_WORKERS if cores <= 4 else min(cores // 2, _MAX_WORKERS)

get_vram_info(device_idx=0)

Retrieves VRAM availability for a CUDA device.

Note

MPS (Apple Silicon) does not expose VRAM info via PyTorch — torch.mps.mem_get_info() does not exist. Returns 'N/A' for non-CUDA devices until Apple provides a public API.

Parameters:

Name Type Description Default
device_idx int

GPU index to query

0

Returns:

Type Description
str

Formatted string 'X.XX GB / Y.YY GB' or status message

Source code in orchard/core/environment/hardware.py
def get_vram_info(device_idx: int = 0) -> str:
    """
    Retrieves VRAM availability for a CUDA device.

    Note:
        MPS (Apple Silicon) does not expose VRAM info via PyTorch —
        ``torch.mps.mem_get_info()`` does not exist. Returns 'N/A' for
        non-CUDA devices until Apple provides a public API.

    Args:
        device_idx: GPU index to query

    Returns:
        Formatted string 'X.XX GB / Y.YY GB' or status message
    """
    if not torch.cuda.is_available():
        return "N/A"

    try:
        if device_idx >= torch.cuda.device_count():
            return "Invalid Device Index"
        free_bytes, total_bytes = torch.cuda.mem_get_info(device_idx)
    except RuntimeError as err:
        logging.debug("VRAM query failed: %s", err)
        return "Query Failed"

    gib = 1024**3
    return f"{free_bytes / gib:.2f} GB / {total_bytes / gib:.2f} GB"

has_mps_backend()

Check if MPS backend is available (macOS Apple Silicon).

Source code in orchard/core/environment/hardware.py
def has_mps_backend() -> bool:
    """Check if MPS backend is available (macOS Apple Silicon)."""
    # getattr guards older torch builds that lack the mps backend entirely.
    mps = getattr(torch.backends, "mps", None)
    return mps is not None and mps.is_available()

to_device_obj(device_str, local_rank=0)

Converts device string to PyTorch device object.

In distributed multi-GPU setups, uses local_rank to select the correct GPU and calls torch.cuda.set_device() for CUDA affinity.

Parameters:

Name Type Description Default
device_str str

'cuda', 'cpu', or 'auto' (auto-selects best available)

required
local_rank int

Node-local process rank for GPU assignment (default 0). Used to select cuda:{local_rank} in multi-GPU setups. Ignored for non-CUDA devices.

0

Returns:

Type Description
device

torch.device object

Raises:

Type Description
ValueError

If CUDA requested but unavailable, or invalid device string

Source code in orchard/core/environment/hardware.py
def to_device_obj(device_str: str, local_rank: int = 0) -> torch.device:
    """
    Converts device string to PyTorch device object.

    In distributed multi-GPU setups, uses ``local_rank`` to select the
    correct GPU and calls ``torch.cuda.set_device()`` for CUDA affinity.

    Args:
        device_str: 'cuda', 'cpu', or 'auto' (auto-selects best available)
        local_rank: Node-local process rank for GPU assignment (default 0).
            Used to select ``cuda:{local_rank}`` in multi-GPU setups.
            Ignored for non-CUDA devices.

    Returns:
        torch.device object

    Raises:
        ValueError: If CUDA requested but unavailable, or invalid device string
    """
    resolved = detect_best_device() if device_str == "auto" else device_str

    if resolved == "cuda" and not torch.cuda.is_available():
        raise ValueError("CUDA requested but not available")

    if resolved not in ("cuda", "cpu", "mps"):
        raise ValueError(f"Unsupported device: {resolved}")

    if resolved == "cuda" and local_rank > 0:
        # Pin this process to its node-local GPU before building the device.
        torch.cuda.set_device(local_rank)
        return torch.device(f"cuda:{local_rank}")

    return torch.device(resolved)

determine_tta_mode(use_tta, device_type, tta_mode='full')

Reports the active TTA ensemble policy.

The ensemble complexity is driven by the tta_mode config field, not by hardware. This guarantees identical predictions on CPU, CUDA and MPS for the same config, preserving cross-platform determinism.

Parameters:

Name Type Description Default
use_tta bool

Whether Test-Time Augmentation is enabled.

required
device_type str

The type of active device ('cpu', 'cuda', 'mps').

required
tta_mode str

Config-driven ensemble complexity ('full' or 'light').

'full'

Returns:

Type Description
str

Descriptive string of the TTA operation mode.

Source code in orchard/core/environment/policy.py
def determine_tta_mode(use_tta: bool, device_type: str, tta_mode: str = "full") -> str:
    """
    Reports the active TTA ensemble policy.

    The ensemble complexity is driven by the ``tta_mode`` config field,
    not by hardware.  This guarantees identical predictions on CPU, CUDA
    and MPS for the same config, preserving cross-platform determinism.

    Args:
        use_tta: Whether Test-Time Augmentation is enabled.
        device_type: The type of active device ('cpu', 'cuda', 'mps').
        tta_mode: Config-driven ensemble complexity ('full' or 'light').

    Returns:
        Descriptive string of the TTA operation mode.
    """
    return f"{tta_mode.upper()} ({device_type.upper()})" if use_tta else "DISABLED"

set_seed(seed, strict=False, warn_only=False)

Seed all PRNGs and optionally enforce deterministic algorithms.

Seeds Python's random, NumPy, and PyTorch (CPU + CUDA + MPS). In strict mode, additionally forces deterministic kernels at the cost of reduced performance.

Note

PYTHONHASHSEED is set here for completeness, but CPython reads it only at interpreter startup — the runtime assignment has no effect on the running process. The project Dockerfile handles this correctly (ENV PYTHONHASHSEED=0). For bare-metal runs, prefix the command: PYTHONHASHSEED=42 orchard run <recipe>. Full bit-exact determinism additionally requires strict=True and num_workers=0 (both enforced automatically in Docker via DOCKER_REPRODUCIBILITY_MODE).

Parameters:

Name Type Description Default
seed int

The seed value to set across all PRNGs.

required
strict bool

If True, enforces deterministic algorithms (5-30% perf penalty).

False
warn_only bool

If True (and strict=True), uses warn-only mode for torch.use_deterministic_algorithms — logs warnings instead of raising errors for non-deterministic ops. Ignored when strict is False.

False
Source code in orchard/core/environment/reproducibility.py
def set_seed(seed: int, strict: bool = False, warn_only: bool = False) -> None:  # pragma: no mutate
    """
    Seed all PRNGs and optionally enforce deterministic algorithms.

    Seeds Python's ``random``, NumPy, and PyTorch (CPU + CUDA + MPS).
    In strict mode, additionally forces deterministic kernels at the
    cost of reduced performance.

    Note:
        ``PYTHONHASHSEED`` is set here for completeness, but CPython reads it
        only at interpreter startup — the runtime assignment has no effect on
        the running process. The project Dockerfile handles this correctly
        (``ENV PYTHONHASHSEED=0``). For bare-metal runs, prefix the command:
        ``PYTHONHASHSEED=42 orchard run <recipe>``. Full bit-exact determinism
        additionally requires ``strict=True`` and ``num_workers=0`` (both
        enforced automatically in Docker via ``DOCKER_REPRODUCIBILITY_MODE``).

    Args:
        seed: The seed value to set across all PRNGs.
        strict: If True, enforces deterministic algorithms (5-30% perf penalty).
        warn_only: If True (and strict=True), uses warn-only mode for
            ``torch.use_deterministic_algorithms`` — logs warnings instead of
            raising errors for non-deterministic ops. Ignored when strict
            is False.
    """
    random.seed(seed)

    # Best-effort: effective only if set before interpreter startup (see Note)
    hashseed_matched = os.environ.get("PYTHONHASHSEED") == str(seed)
    os.environ["PYTHONHASHSEED"] = str(seed)
    if strict and not hashseed_matched:
        _stacklevel = 2  # pragma: no mutate
        warnings.warn(
            f"PYTHONHASHSEED={seed} set at runtime, but CPython reads it only at "
            "interpreter startup. For bare-metal determinism: "
            f"PYTHONHASHSEED={seed} orchard run <recipe>",
            stacklevel=_stacklevel,
        )

    np.random.seed(seed)
    torch.manual_seed(seed)

    cuda_present = torch.cuda.is_available()
    mps_present = hasattr(torch.backends, "mps") and torch.backends.mps.is_available()

    if cuda_present:
        torch.cuda.manual_seed_all(seed)
        # cuDNN autotuning selects kernels non-deterministically; pin it down.
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False

        if strict:
            os.environ["CUBLAS_WORKSPACE_CONFIG"] = ":4096:8"

    if mps_present:
        torch.mps.manual_seed(seed)

    if strict:
        if mps_present:
            _stacklevel = 2  # pragma: no mutate
            warnings.warn(
                "MPS backend has partial determinism support in PyTorch. "
                "Some operations may not have deterministic implementations. "
                "Consider using CPU for fully deterministic experiments.",
                stacklevel=_stacklevel,
            )
        torch.use_deterministic_algorithms(True, warn_only=warn_only)

worker_init_fn(worker_id)

Initialize PRNGs for a DataLoader worker subprocess.

Each worker receives a unique but deterministic sub-seed derived from the parent seed, ensuring augmentation diversity while maintaining reproducibility across runs.

Called automatically by DataLoader when num_workers > 0. In strict reproducibility mode, num_workers is forced to 0 by HardwareConfig, so this function is never invoked.

Parameters:

Name Type Description Default
worker_id int

Subprocess ID provided by DataLoader (0-based).

required
Source code in orchard/core/environment/reproducibility.py
def worker_init_fn(worker_id: int) -> None:
    """
    Initialize PRNGs for a DataLoader worker subprocess.

    Each worker receives a unique but deterministic sub-seed derived from
    the parent seed, ensuring augmentation diversity while maintaining
    reproducibility across runs.

    Called automatically by DataLoader when ``num_workers > 0``.
    In strict reproducibility mode, ``num_workers`` is forced to 0 by
    HardwareConfig, so this function is never invoked.

    Args:
        worker_id: Subprocess ID provided by DataLoader (0-based).
    """
    info = torch.utils.data.get_worker_info()
    if info is None:
        # Not inside a DataLoader worker (e.g. num_workers=0): nothing to do.
        return

    # Deterministic per (parent_seed, worker_id), folded into 32 bits.
    worker_seed = (info.seed + worker_id) % 2**32

    # Synchronize all major PRNGs for this worker.
    for seeder in (random.seed, np.random.seed, torch.manual_seed):
        seeder(worker_seed)