Documentation

isolation

Subprocess isolation utilities for clean memory measurement.

This module provides utilities for running benchmark measurements in isolated subprocesses to avoid memory pollution from prior allocations.

Classes

class

IsolatedResult

Result from an isolated subprocess execution.

Attributes

NameTypeDescription
errorstr | None
execution_time_sfloat
memory_mbfloat
resultAny | None
successbool

Methods

__init__(self, success: bool, result: Any | None = None, error: str | None = None, memory_mb: float = 0.0, execution_time_s: float = 0.0) None
Parameters
  • success: bool
  • result: Any | None= None
  • error: str | None= None
  • memory_mb: float= 0.0
  • execution_time_s: float= 0.0
class

WorkerPool

Manager for parallel worker processes.

Manager for parallel worker processes. Handles spawning, synchronization, and memory measurement for multiple worker processes.

Attributes

NameTypeDescription
_ctx
_processeslist[]
_ready_eventslist[]
_result_queue | None
_start_event | None
context
num_workers

Methods

__init__(self, num_workers: int, context: Literal[('spawn', 'fork')] = 'spawn')

Initialize worker pool.

Parameters
  • num_workers: int
  • context: Literal[('spawn', 'fork')]= 'spawn'
get_pids(self) list[int]

Get list of worker process IDs.

get_results(self, timeout: float = 120.0) list[Any]

Collect results from all workers.

Parameters
  • timeout: float= 120.0
get_total_memory_mb(self) float

Get total memory usage across all workers.

shutdown(self, timeout: float = 10.0) None

Shutdown all worker processes.

Parameters
  • timeout: float= 10.0
signal_start(self) None

Signal all workers to proceed.

start_workers(self, worker_fn: Callable, worker_args: tuple = {'cls': 'ExprTuple', 'elements': [], 'implicit': False}, worker_kwargs: dict | None = None) None

Start worker processes.

Parameters
  • worker_fn: Callable
  • worker_args: tuple
  • worker_kwargs: dict | None= None
wait_for_ready(self, timeout: float = 300.0) bool

Wait for all workers to signal ready.

Parameters
  • timeout: float= 300.0

Functions

function
get_dir_size_mb(path: Path) float

Get total size of directory in MB.

Get total size of directory in MB. Args: path: Path to directory Returns: Total size in megabytes
Parameters
  • path: Path
function
get_memory_mb(pid: int | None = None) float

Get process memory usage in MB (RSS).

Get process memory usage in MB (RSS). Args: pid: Process ID to measure. If None, uses current process. Returns: Memory usage in megabytes (RSS - Resident Set Size)
Parameters
  • pid: int | None= None
function
get_total_memory_mb(pids: list[int]) float

Get total memory usage across multiple processes in MB (RSS).

Get total memory usage across multiple processes in MB (RSS). Args: pids: List of process IDs to measure Returns: Total memory usage in megabytes
Parameters
  • pids: list[int]
function
measure_memory_in_subprocess(load_fn: Callable[([], Any)], timeout: float = 300.0) tuple[(float, float, Any)]

Measure memory usage of a loading function in an isolated subprocess.

Measure memory usage of a loading function in an isolated subprocess. Args: load_fn: Function that loads something and returns it timeout: Maximum time to wait Returns: Tuple of (memory_mb, load_time_s, result)
Parameters
  • load_fn: Callable[([], Any)]
  • timeout: float= 300.0
function
run_isolated(func: Callable[(..., T)], args: tuple = {'cls': 'ExprTuple', 'elements': [], 'implicit': False}, kwargs: dict | None = None, timeout: float = 300.0, context: Literal[('spawn', 'fork')] = 'spawn') IsolatedResult

Run a function in an isolated subprocess.

Run a function in an isolated subprocess. This ensures clean memory measurement by running in a fresh process that doesn't have any prior allocations. Args: func: Function to execute (must be picklable) args: Positional arguments for the function kwargs: Keyword arguments for the function timeout: Maximum time to wait for result in seconds context: Multiprocessing context ('spawn' for clean memory, 'fork' for COW) Returns: IsolatedResult with success status, result, and memory measurement
Parameters
  • func: Callable[(..., T)]
  • args: tuple
  • kwargs: dict | None= None
  • timeout: float= 300.0
  • context: Literal[('spawn', 'fork')]= 'spawn'