Skip to content

shell

Symbol Details

class: ShellRun

class ShellRun:
    config: ShellConfig
    p_open: Popen | None = None
    current_attempt: int = 1

Stores dynamic behavior. Only created by this file never outside.

Args: _start_flag: Future[ShellRun]: Flag that is set when the run has both p_open and stdout/stderr reading started. During multiple attempts, only the 1st attempt will call it.

Field Type Default Since
config ShellConfig - 0.3.0
p_open Popen | None None 0.3.0
current_attempt int 1 0.3.0

Changes

Version Change
0.3.0 Made public

class: handle_interrupt_wait

class handle_interrupt_wait:
    interrupt_message: str
    immediate_kill: bool = False
Field Type Default Since
interrupt_message str - 0.3.0
immediate_kill bool False 0.3.0

Changes

Version Change
0.3.0 Made public

function: kill

def kill(run: ShellRun, immediate: bool = False, reason: str = '', abort_timeout: float = 3.0):
    ...

https://stackoverflow.com/questions/4789837/how-to-terminate-a-python-subprocess-launched-with-shell-true

Changes

Version Change
0.3.0 Made public

function: kill_all_runs

def kill_all_runs(immediate: bool = False, reason: str = '', abort_timeout: float = 3.0, *, skip_retry: bool = False):
    ...

Changes

Version Change
0.3.0 Made public

function: run_error

def run_error(run: ShellRun, timeout: float | None = 1) -> BaseException | None:
    ...

Changes

Version Change
0.3.0 Made public

class: run_pool

class run_pool:
    task_name: str
    total: int = 0
    max_concurrent_submits: int = 4
    threads_used_per_submit: int = 5
    sleep_time: float = 1
    sleep_callback: Callable[[], Any] | None = None
    exit_wait_timeout: float | None = None
    pool: ThreadPoolExecutor = ...
Field Type Default Since
task_name str - 0.3.0
total int 0 0.3.0
max_concurrent_submits int 4 0.3.0
threads_used_per_submit int 5 0.3.0
sleep_time float 1 0.3.0
sleep_callback Callable[[], Any] | None None 0.3.0
exit_wait_timeout float | None None 0.3.0
pool ThreadPoolExecutor ... 0.3.0

Changes

Version Change
0.3.0 Made public

function: stop_runs_and_pool

def stop_runs_and_pool(reason: str = 'atexit', immediate: bool = False):
    ...

Changes

Version Change
0.3.0 Made public

function: wait_on_ok_errors

def wait_on_ok_errors(*runs, timeout: float | None = None, skip_kill_timeouts: bool = False) -> tuple[list[ShellRun], list[tuple[BaseException, ShellRun]]]:
    ...

Changes

Version Change
0.3.0 Made public

exception: AbortRetryError

class AbortRetryError(Exception):
    ...

Raise from should_retry to stop retrying with a custom error.

Changes

Version Change
0.5.0 Made public