
rra_tools

cli_tools

convert_choice(value: str, choices: Collection[str]) -> list[str]

Convert a choice to a list of choices, handling the special 'All' choice.

Parameters

value
    The choice to convert.
choices
    The set of choices to choose from.

Returns

list[str]
    The list of choices.

Source code in src/rra_tools/cli_tools/options.py
def convert_choice(value: str, choices: Collection[str]) -> list[str]:
    """Convert a choice to a list of choices, handling the special 'All' choice.

    Parameters
    ----------
    value
        The choice to convert.
    choices
        The set of choices to choose from.

    Returns
    -------
    list[str]
        The list of choices.
    """
    if value == RUN_ALL:
        return list(choices)
    elif value in choices:
        return [value]
    else:
        msg = f"Invalid choice: {value}. Must be one of {choices} or {RUN_ALL}."
        raise ValueError(msg)
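
A quick sketch of the expected behavior (assuming the module's RUN_ALL sentinel is the string "ALL", as the with_choice docstring suggests):

from rra_tools.cli_tools.options import convert_choice

choices = ["usa", "can", "mex"]
convert_choice("usa", choices)  # -> ["usa"]
convert_choice("ALL", choices)  # -> ["usa", "can", "mex"], assuming RUN_ALL == "ALL"
convert_choice("deu", choices)  # raises ValueError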

handle_exceptions(func: Callable[P, T], logger: SupportsLogging, *, with_debugger: bool) -> Callable[P, T]

Logs uncaught exceptions from func and, when with_debugger is True, drops the user into an interactive debugger at the point of failure.

Source code in src/rra_tools/cli_tools/exceptions.py
def handle_exceptions[**P, T](
    func: Callable[P, T],
    logger: SupportsLogging,
    *,
    with_debugger: bool,
) -> Callable[P, T]:
    """Drops a user into an interactive debugger if func raises an error."""

    @functools.wraps(func)
    def wrapped(*args: P.args, **kwargs: P.kwargs) -> T:  # type: ignore[return]
        try:
            return func(*args, **kwargs)
        except (BdbQuit, KeyboardInterrupt):
            raise
        except Exception:
            msg = "Uncaught exception"
            logger.exception(msg)
            if with_debugger:
                import pdb  # noqa: T100
                import traceback

                traceback.print_exc()
                pdb.post_mortem()
            else:
                raise

    return wrapped
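
A usage sketch: wrapping an entry point so failures are logged before propagating (loguru's logger is used here as one object satisfying SupportsLogging; that pairing is an assumption, not a requirement of the API):

from loguru import logger

from rra_tools.cli_tools.exceptions import handle_exceptions

def main(denominator: int) -> float:
    return 1 / denominator

# With with_debugger=True the wrapper opens pdb.post_mortem() on failure;
# with False it logs the traceback and re-raises.
safe_main = handle_exceptions(main, logger, with_debugger=False)
safe_main(0)  # logs "Uncaught exception", then raises ZeroDivisionError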

import_module_from_info(module_info: ModuleInfo) -> ModuleType

Import a module from a ModuleInfo object.

Source code in src/rra_tools/cli_tools/importers.py
def import_module_from_info(module_info: ModuleInfo) -> ModuleType:
    """Import a module from a ModuleInfo object."""
    finder = module_info.module_finder
    spec = finder.find_spec(module_info.name)  # type: ignore[call-arg]
    module = spec.loader.load_module(module_info.name)  # type: ignore[union-attr]
    return module  # noqa: RET504
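
Combined with pkgutil, this can eagerly import every submodule of a package. A sketch (my_package is a hypothetical package):

import pkgutil

import my_package  # hypothetical package to scan
from rra_tools.cli_tools.importers import import_module_from_info

# pkgutil.iter_modules yields ModuleInfo(module_finder, name, ispkg) entries.
modules = [
    import_module_from_info(info)
    for info in pkgutil.iter_modules(my_package.__path__)
]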

process_choices(allow_all: bool, choices: Collection[str] | None) -> tuple[click.ParamType, str | None, bool]

Support function for creating options with choices.

A common pattern in RRA pipelines is to build CLIs that admit a choice of a specific set of values or a special value that represents all possible values. This function provides a way to handle this pattern in a consistent way.

There are four possible cases:

1. No choices are provided and RUN_ALL is allowed. This is useful when the set of choices is not known ahead of time, or is contingent on another option. For example, if there is a task that depends on location and year, but the years available depend on the location, the user might want to run a single year for a location (which they'll have to know ahead of time); or all years for a location, which would be the subset of years available for that location; or all years for all locations, which could be a different subset of years for each included location.
2. Choices are provided and RUN_ALL is allowed. This is useful when the set of choices is known ahead of time, but the user might want to run all of them.
3. No choices are provided and RUN_ALL is not allowed. This is useful when the set of choices is not known ahead of time, but the user must provide a value.
4. Choices are provided and RUN_ALL is not allowed. This is useful when the set of choices is known ahead of time and the user must provide a value.

Parameters

allow_all
    Whether to allow the special value RUN_ALL.
choices
    The set of choices to allow.

Returns

tuple[click.ParamType, str | None, bool]
    The option type, default value, and whether to show the default.

Source code in src/rra_tools/cli_tools/options.py
def process_choices(
    allow_all: bool,  # noqa: FBT001
    choices: Collection[str] | None,
) -> tuple[click.ParamType, str | None, bool]:
    """Support function for creating options with choices.

    A common pattern in RRA pipelines is to build CLIs that admit a choice
    of a specific set of values or a special value that represents all
    possible values. This function provides a way to handle this pattern
    in a consistent way.

    There are four possible cases:
    1. No choices are provided and RUN_ALL is allowed. This is useful when the
        set of choices is not known ahead of time, or is contingent on another
        option. For example, if there is a task that depends on location and year,
        but the years available depend on the location. The user might want to
        run a single year for a location (which they'll have to know ahead of time);
        or all years for a location, which would be the subset of years available
        for that location; or all years for all locations, which could be a different
        subset of years for each included location.
    2. Choices are provided and RUN_ALL is allowed. This is useful when the set of
        choices is known ahead of time, but the user might want to run all of them.
    3. No choices are provided and RUN_ALL is not allowed. This is useful when the
        set of choices is not known ahead of time, but the user must provide a value.
    4. Choices are provided and RUN_ALL is not allowed. This is useful when the set of
        choices is known ahead of time and the user must provide a value.

    Parameters
    ----------
    allow_all
        Whether to allow the special value RUN_ALL.
    choices
        The set of choices to allow.

    Returns
    -------
    tuple[click.ParamType, str | None, bool]
        The option type, default value, and whether to show the default.
    """

    if choices is None:
        option_type: click.ParamType = click.STRING
        default = RUN_ALL if allow_all else None
    else:
        choices = list(choices)
        if allow_all:
            choices.append(RUN_ALL)
            default = RUN_ALL
        else:
            default = None
        option_type = click.Choice(choices)
    show_default = default is not None
    return option_type, default, show_default
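
How the cases above translate into return values, as a sketch (again assuming RUN_ALL == "ALL"):

import click

from rra_tools.cli_tools.options import process_choices

# Case 2: known choices, ALL allowed -> Choice including ALL, defaulting to ALL.
option_type, default, show_default = process_choices(True, ["a", "b"])
# option_type is click.Choice(["a", "b", "ALL"]), default == "ALL", show_default is True

# Case 3: unknown choices, ALL not allowed -> free-form string, no default.
option_type, default, show_default = process_choices(False, None)
# option_type is click.STRING, default is None, show_default is False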

with_choice(name: str, short_name: str | None = None, *, allow_all: bool = True, choices: Collection[str] | None = None, convert: bool | None = None, **kwargs: Any) -> Callable[[Callable[P, T]], Callable[P, T]]

Create an option with a set of choices.

Parameters

name
    The name of the option.
short_name
    An optional short name for the option.
allow_all
    Whether to allow the special value "ALL", which represents all choices.
choices
    The set of choices to allow.
convert
    Whether to convert the provided argument to a list, resolving the special value "ALL" to all choices.

Source code in src/rra_tools/cli_tools/options.py
def with_choice[**P, T](
    name: str,
    short_name: str | None = None,
    *,
    allow_all: bool = True,
    choices: Collection[str] | None = None,
    convert: bool | None = None,
    **kwargs: Any,
) -> Callable[[Callable[P, T]], Callable[P, T]]:
    """Create an option with a set of choices.

    Parameters
    ----------
    name
        The name of the option.
    short_name
        An optional short name for the option.
    allow_all
        Whether to allow the special value "ALL", which represents all choices.
    choices
        The set of choices to allow.
    convert
        Whether to convert the provided argument to a list, resolving the special
        value "ALL" to all choices.

    """

    names = [f"--{name.replace('_', '-')}"]
    if short_name is not None:
        if len(short_name) != 1:
            msg = "Short names must be a single character."
            raise ValueError(msg)
        names.append(f"-{short_name}")
    option_type, default, show_default = process_choices(allow_all, choices)

    if choices and convert is None:
        convert = allow_all

    if convert:
        if not allow_all:
            msg = "Conversion is only supported when allow_all is True."
            raise ValueError(msg)
        if choices is None:
            msg = "Conversion is only supported when choices are provided."
            raise ValueError(msg)

        if "callback" in kwargs:
            old_callback = kwargs.pop("callback")

            def _callback(
                ctx: click.Context,
                param: click.Parameter,
                value: str,
            ) -> list[str]:
                value = old_callback(ctx, param, value)
                return convert_choice(value, choices)
        else:

            def _callback(
                ctx: click.Context,  # noqa: ARG001
                param: click.Parameter,  # noqa: ARG001
                value: str,
            ) -> list[str]:
                return convert_choice(value, choices)

        kwargs["callback"] = _callback

    return click.option(
        *names,
        type=option_type,
        default=default,
        show_default=show_default,
        **kwargs,
    )
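
Putting it together in a click command. A sketch (assuming with_choice is importable from rra_tools.cli_tools.options):

import click

from rra_tools.cli_tools.options import with_choice

@click.command()
@with_choice("location", "l", choices=["usa", "can", "mex"])
def run(location: list[str]) -> None:
    # With choices provided and allow_all=True (the default), the callback
    # converts the raw value, so `--location ALL` yields all three locations.
    for loc in location:
        click.echo(loc)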


jobmon

_process_args(args: dict[str, Collection[Any] | Any] | None) -> tuple[dict[str, Collection[Any]], str]

Process arguments for a task.

Parameters

args
    The arguments to process.

Returns

tuple[dict[str, Collection[Any]], str]
    The names of all non-flag and non-count arguments and the string representation of the arguments.

Source code in src/rra_tools/jobmon.py
def _process_args(
    args: dict[str, Collection[Any] | Any] | None,
) -> tuple[dict[str, Collection[Any]], str]:
    """Process arguments for a task.

    Parameters
    ----------
    args
        The arguments to process.

    Returns
    -------
    tuple[dict[str, Collection[Any]], str]
        The names of all non-flag and non-count arguments and the string
        representation of the arguments.
    """
    if args is None:
        return {}, ""
    out_args = {}
    arg_parts = []
    for k, v in args.items():
        if v is not None:
            arg_parts.append(f"--{k} {{{k.replace('-', '_')}}}")
            out_args[k.replace("-", "_")] = v
        elif len(k) == 1 or k in ["v", "vv", "vvv"]:
            arg_parts.append(f"-{k}")
        else:
            arg_parts.append(f"--{k}")
    arg_string = " ".join(arg_parts)
    return out_args, arg_string
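
A sketch of the expected behavior: non-None values render as templated options, while None values become flags (short flags for single-character or verbosity keys):

from rra_tools.jobmon import _process_args

out_args, arg_string = _process_args(
    {"location-id": [1, 2, 3], "verbose": None, "v": None}
)
# out_args == {"location_id": [1, 2, 3]}
# arg_string == "--location-id {location_id} --verbose -v"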

build_parallel_task_graph(jobmon_tool, runner: str, task_name: str, task_resources: dict[str, str | int], *, node_args: dict[str, Collection[Any] | None] | None = None, flat_node_args: tuple[tuple[str, ...], Collection[tuple[Any, ...]]] | None = None, task_args: dict[str, Any] | None = None, op_args: dict[str, Any] | None = None, max_attempts: int | None = None) -> list[Any]

Build a parallel task graph for jobmon.

Parameters

jobmon_tool
    The jobmon tool.
runner
    The runner to use for the task.
task_name
    The name of the task.
node_args
    The arguments to the task script that are unique to each task. The keys of the dict are the names of the arguments and the values are lists of the values to use for each task. A dict with multiple keys will result in a cartesian product of the values. Mutually exclusive with flat_node_args.
flat_node_args
    The arguments to the task script that are unique to each task. The first element of the tuple is the names of the arguments and the second element is a list of tuples of the values to use for each task. This can be used to avoid the cartesian product of node_args and just run a subset of the possible tasks. Mutually exclusive with node_args.
task_args
    The arguments to the task script that are the same for each task, but alter the behavior of the task (e.g. input and output root directories).
op_args
    Arguments that are passed to the task script but do not alter the logical behavior of the task (e.g. number of cores, logging verbosity).
task_resources
    The resources to allocate to the task.
max_attempts
    The maximum number of attempts to make for each task.

Returns

list
    A list of tasks to run.

Source code in src/rra_tools/jobmon.py
def build_parallel_task_graph(  # type: ignore[no-untyped-def] # noqa: PLR0913
    jobmon_tool,
    runner: str,
    task_name: str,
    task_resources: dict[str, str | int],
    *,
    node_args: dict[str, Collection[Any] | None] | None = None,
    flat_node_args: tuple[tuple[str, ...], Collection[tuple[Any, ...]]] | None = None,
    task_args: dict[str, Any] | None = None,
    op_args: dict[str, Any] | None = None,
    max_attempts: int | None = None,
) -> list[Any]:
    """Build a parallel task graph for jobmon.

    Parameters
    ----------
    jobmon_tool
        The jobmon tool.
    runner
        The runner to use for the task.
    task_name
        The name of the task.
    node_args
        The arguments to the task script that are unique to each task. The keys of
        the dict are the names of the arguments and the values are lists of the
        values to use for each task. A dict with multiple keys will result in a
        cartesian product of the values. Mutually exclusive with
        flat_node_args.
    flat_node_args
        The arguments to the task script that are unique to each task. The first
        element of the tuple is the names of the arguments and the second element
        is a list of tuples of the values to use for each task. This can be used
        to avoid the cartesian product of node_args and just run a subset of the
        possible tasks. Mutually exclusive with node_args.
    task_args
        The arguments to the task script that are the same for each task, but
        alter the behavior of the task (e.g. input and output root directories).
    op_args
        Arguments that are passed to the task script but do not alter the logical
        behavior of the task (e.g. number of cores, logging verbosity).
    task_resources
        The resources to allocate to the task.
    max_attempts
        The maximum number of attempts to make for each task.

    Returns
    -------
    list
        A list of tasks to run.
    """
    for arg in ["stdout", "stderr"]:
        task_resources[arg] = str(task_resources.get(arg, "/tmp"))  # noqa: S108

    if node_args is not None and flat_node_args is not None:
        msg = "node_args and flat_node_args are mutually exclusive."
        raise ValueError(msg)
    if flat_node_args is not None:
        node_arg_string = " ".join(
            f"--{arg} {{{arg.replace('-', '_')}}}" for arg in flat_node_args[0]
        )
        flat_node_args = (
            tuple([arg.replace("-", "_") for arg in flat_node_args[0]]),
            flat_node_args[1],
        )
        clean_node_args: dict[str, Collection[Any]] = {k: [] for k in flat_node_args[0]}
    else:
        clean_node_args, node_arg_string = _process_args(node_args)
    clean_task_args, task_arg_string = _process_args(task_args)
    clean_op_args, op_arg_string = _process_args(op_args)

    command_template = (
        f"{runner} {task_name} {node_arg_string} {task_arg_string} {op_arg_string}"
    )

    task_template = jobmon_tool.get_task_template(
        default_compute_resources=task_resources,
        template_name=f"{task_name}_task_template",
        default_cluster_name="slurm",
        command_template=command_template,
        node_args=list(clean_node_args),
        task_args=list(clean_task_args),
        op_args=list(clean_op_args),
    )

    if flat_node_args is not None:
        tasks = []
        arg_names, arg_values = flat_node_args
        for args in arg_values:
            task_args = {
                **dict(zip(arg_names, args, strict=False)),
                **clean_task_args,
                **clean_op_args,
            }
            task = task_template.create_task(
                **task_args,
                max_attempts=max_attempts,
            )
            tasks.append(task)
    else:
        tasks = task_template.create_tasks(
            **clean_node_args,
            **clean_task_args,
            **clean_op_args,
            max_attempts=max_attempts,
        )
    return tasks

get_jobmon_tool(workflow_name: str)

Get a jobmon tool for a given workflow name with a helpful error message.

Parameters

workflow_name
    The name of the workflow.

Returns

Tool
    A jobmon tool.

Raises

ModuleNotFoundError
    If jobmon is not installed.

Source code in src/rra_tools/jobmon.py
def get_jobmon_tool(workflow_name: str):  # type: ignore[no-untyped-def]
    """Get a jobmon tool for a given workflow name with a helpful error message.

    Parameters
    ----------
    workflow_name
        The name of the workflow.

    Returns
    -------
    Tool
        A jobmon tool.

    Raises
    ------
    ModuleNotFoundError
        If jobmon is not installed.
    """
    try:
        from jobmon.client.tool import Tool  # type: ignore[import-not-found]
    except ModuleNotFoundError as e:
        msg = (
            "Jobmon is not installed.\n"
            "Ensure you have a file in your home "
            "directory at '~/.pip/pip.conf' with contents\n\n"
            "[global]\n"
            "extra-index-url = https://artifactory.ihme.washington.edu/artifactory/api/pypi/pypi-shared/simple\n"
            "trusted-host = artifactory.ihme.washington.edu/artifactory/api/pypi/pypi-shared\n\n"
            "and run 'pip install jobmon_installer_ihme' to install jobmon."
        )
        raise ModuleNotFoundError(msg) from e

    return Tool(workflow_name)

run_parallel(runner: str, task_name: str, task_resources: dict[str, str | int], *, node_args: dict[str, Collection[Any] | None] | None = None, flat_node_args: tuple[tuple[str, ...], Collection[tuple[Any, ...]]] | None = None, task_args: dict[str, Any] | None = None, op_args: dict[str, Any] | None = None, concurrency_limit: int = 10000, max_attempts: int | None = None, log_root: str | Path | None = None, log_method: Callable[[str], None] = print) -> str

Run a parallel set of tasks using Jobmon.

This helper function encapsulates one of the simpler workflow patterns in Jobmon: a set of tasks that run in parallel, each with the same command but different arguments. More complicated workflows should be implemented directly.

Parameters

runner
    The runner to use for the task. Default is 'rptask'.
task_name
    The name of the task to run. Will also be used as the tool and workflow name.
task_resources
    The resources to allocate to the task.
node_args
    The arguments to the task script that are unique to each task. The keys of the dict are the names of the arguments and the values are lists of the values to use for each task. A dict with multiple keys will result in a cartesian product of the values. Mutually exclusive with flat_node_args.
flat_node_args
    The arguments to the task script that are unique to each task. The first element of the tuple is the names of the arguments and the second element is a list of tuples of the values to use for each task. This can be used to avoid the cartesian product of node_args and just run a subset of the possible tasks. Mutually exclusive with node_args.
task_args
    The arguments to the task script that are the same for each task, but alter the behavior of the task (e.g. input and output root directories).
op_args
    Arguments that are passed to the task script but do not alter the logical behavior of the task (e.g. number of cores, logging verbosity).
concurrency_limit
    The maximum number of tasks to run concurrently. Default is 10000.
max_attempts
    The maximum number of attempts to make for each task.
log_root
    The root directory for the logs. Default is None.
log_method
    The method to use for logging. Default is print.

Returns

str
    The status of the workflow.

Source code in src/rra_tools/jobmon.py
def run_parallel(  # noqa: PLR0913
    runner: str,
    task_name: str,
    task_resources: dict[str, str | int],
    *,
    node_args: dict[str, Collection[Any] | None] | None = None,
    flat_node_args: tuple[tuple[str, ...], Collection[tuple[Any, ...]]] | None = None,
    task_args: dict[str, Any] | None = None,
    op_args: dict[str, Any] | None = None,
    concurrency_limit: int = 10000,
    max_attempts: int | None = None,
    log_root: str | Path | None = None,
    log_method: Callable[[str], None] = print,
) -> str:
    """Run a parallel set of tasks using Jobmon.

    This helper function encapsulates one of the simpler workflow patterns in Jobmon:
    a set of tasks that run in parallel, each with the same command but
    different arguments. More complicated workflows should be implemented
    directly.

    Parameters
    ----------
    runner
        The runner to use for the task. Default is 'rptask'.
    task_name
        The name of the task to run.  Will also be used as the tool and workflow name.
    task_resources
        The resources to allocate to the task.
    node_args
        The arguments to the task script that are unique to each task. The keys of
        the dict are the names of the arguments and the values are lists of the
        values to use for each task. A dict with multiple keys will result in a
        cartesian product of the values. Mutually exclusive with
        flat_node_args.
    flat_node_args
        The arguments to the task script that are unique to each task. The first
        element of the tuple is the names of the arguments and the second element
        is a list of tuples of the values to use for each task. This can be used
        to avoid the cartesian product of node_args and just run a subset of the
        possible tasks. Mutually exclusive with node_args.
    task_args
        The arguments to the task script that are the same for each task, but
        alter the behavior of the task (e.g. input and output root directories).
    op_args
        Arguments that are passed to the task script but do not alter the logical
        behavior of the task (e.g. number of cores, logging verbosity).
    concurrency_limit
        The maximum number of tasks to run concurrently. Default is 10000.
    max_attempts
        The maximum number of attempts to make for each task.
    log_root
        The root directory for the logs. Default is None.
    log_method
        The method to use for logging. Default is print.

    Returns
    -------
    str
        The status of the workflow.
    """
    if node_args is not None and flat_node_args is not None:
        msg = "node_args and flat_node_args are mutually exclusive."
        raise ValueError(msg)

    if log_root is None:
        if task_args is None or "output-dir" not in task_args:
            msg = (
                "The task_args dictionary must contain an 'output-dir' key if no "
                "log_root is provided."
            )
            raise KeyError(msg)
        log_root = Path(task_args["output-dir"])
    log_dir = make_log_dir(log_root)
    task_resources["stdout"] = str(log_dir / "output")
    task_resources["standard_output"] = str(log_dir / "output")
    task_resources["stderr"] = str(log_dir / "error")
    task_resources["standard_error"] = str(log_dir / "error")

    tool = get_jobmon_tool(workflow_name=task_name)
    workflow = tool.create_workflow(
        name=f"{task_name}_{uuid.uuid4()}",
        max_concurrently_running=concurrency_limit,
    )

    tasks = build_parallel_task_graph(
        jobmon_tool=tool,
        task_name=task_name,
        node_args=node_args,
        flat_node_args=flat_node_args,
        task_args=task_args,
        op_args=op_args,
        task_resources=task_resources,
        runner=runner,
        max_attempts=max_attempts,
    )

    workflow.add_tasks(tasks)
    return run_workflow(workflow, log_method)
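
An end-to-end sketch; the task name, directories, and resource keys here are illustrative (not prescribed by this module), and the runner is expected to expose a make_plots subcommand:

from rra_tools.jobmon import run_parallel

status = run_parallel(
    runner="rptask",
    task_name="make_plots",  # hypothetical task exposed by the runner
    task_resources={
        "queue": "all.q",
        "cores": 1,
        "memory": "10G",
        "runtime": "60m",
        "project": "proj_rapidresponse",  # illustrative resource keys
    },
    node_args={"location": ["usa", "can", "mex"], "year": [2020, 2021]},
    task_args={"output-dir": "/path/to/output"},  # doubles as the log root
    op_args={"num-cores": 1},
)
print(status)  # the jobmon workflow status string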

logging

add_logging_sink(sink: TextIO | loguru.Writable | Callable[[loguru.Message], None] | Handler | Path, verbose: int, *, colorize: bool = False, serialize: bool = False) -> None

Add a new output file handle for logging.

Source code in src/rra_tools/logging/config.py
def add_logging_sink(
    sink: TextIO | loguru.Writable | Callable[[loguru.Message], None] | Handler | Path,
    verbose: int,
    *,
    colorize: bool = False,
    serialize: bool = False,
) -> None:
    """Add a new output file handle for logging."""
    level, message_format = LOG_FORMATS.get(
        verbose, LOG_FORMATS[max(LOG_FORMATS.keys())]
    )
    logger.add(
        sink,  # type: ignore[arg-type]
        colorize=colorize,
        level=level,
        format=message_format,
        serialize=serialize,
        filter={
            # Suppress logs up to the level provided.
            "urllib3": "WARNING",  # Uselessly (for us) noisy.
        },
    )

configure_logging_to_files(log_dir: str | Path) -> None

Sets up logging to a file in an output directory.

Logs written to files use the highest verbosity to allow for debugging if necessary.

Source code in src/rra_tools/logging/config.py
def configure_logging_to_files(log_dir: str | Path) -> None:
    """Sets up logging to a file in an output directory.

    Logs to files are done with the highest verbosity to allow
    for debugging if necessary.

    """
    mkdir(log_dir, exist_ok=True)
    add_logging_sink(
        Path(log_dir) / "main_log.json",
        verbose=3,
        serialize=True,
    )
    add_logging_sink(
        Path(log_dir) / "main_log.txt",
        verbose=3,
    )

configure_logging_to_terminal(verbose: int) -> None

Set up logging to sys.stdout.

This is presumed to be one of the first calls made in an application entry point. Any logging that occurs before this call won't be intercepted or handled with the standard logging configuration.

Source code in src/rra_tools/logging/config.py
def configure_logging_to_terminal(verbose: int) -> None:
    """Setup logging to sys.stdout.

    This is presumed to be one of the first calls made in an
    application entry point. Any logging that occurs before this
    call won't be intercepted or handled with the standard
    logging configuration.

    """
    logger.remove(0)  # Clear default configuration
    add_logging_sink(sys.stdout, verbose, colorize=True)
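
Typical startup wiring, as a sketch (assuming both functions are re-exported from rra_tools.logging):

from rra_tools.logging import (
    configure_logging_to_files,
    configure_logging_to_terminal,
)

# Terminal verbosity is user-controlled; file logs always capture everything.
configure_logging_to_terminal(verbose=1)
configure_logging_to_files("/path/to/output/logs")  # writes main_log.json and main_log.txt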


parallel

This module simplifies the use of multiprocessing. It provides a single function, run_parallel, that runs a function in parallel over a list of arguments.

is_notebook() -> bool

Are we running code in a jupyter notebook?

Code from https://stackoverflow.com/a/39662359

Source code in src/rra_tools/parallel.py
def is_notebook() -> bool:
    """Are we running code in a jupyter notebook?

    Code from https://stackoverflow.com/a/39662359
    """
    try:
        # The get_ipython function will be in the global namespace if we're in
        # an ipython-like environment (including jupyter notebooks).
        shell = get_ipython().__class__.__name__  # type: ignore[name-defined]
    except NameError:
        # Probably standard Python interpreter
        return False
    else:
        # Jupyter notebook or qtconsole
        return shell == "ZMQInteractiveShell"  # type: ignore[no-any-return]

run_parallel(runner: Callable[[T1], T2], arg_list: Collection[T1], *, num_cores: int = 1, progress_bar: bool = False, notebook_fallback: bool = True) -> list[T2]

Runs a single argument function in parallel over a list of arguments.

This function dodges multiprocessing if only a single process is requested to make functions more flexible to debugging. It also supports progress bars if requested.

Parameters

runner
    A single argument function to be run in parallel.
arg_list
    A list of arguments to be run over in parallel.
num_cores
    Maximum number of processes to be run in parallel. If num_cores == 1, the jobs will be run serially without invoking multiprocessing.
progress_bar
    Whether to display a progress bar for the running jobs.
notebook_fallback
    Whether to fall back to standard multiprocessing in a notebook. We use pathos for multiprocessing as it uses a more robust serialization library, but pathos has some leaky state and doesn't properly close down child processes when interrupted in a jupyter notebook.

Returns

list[T2]
    A list of the results of the parallel calls of the runner.

Source code in src/rra_tools/parallel.py
def run_parallel[T1, T2](
    runner: Callable[[T1], T2],
    arg_list: Collection[T1],
    *,
    num_cores: int = 1,
    progress_bar: bool = False,
    notebook_fallback: bool = True,
) -> list[T2]:
    """Runs a single argument function in parallel over a list of arguments.

    This function dodges multiprocessing if only a single process is requested to
    make functions more flexible to debugging. It also supports progress bars if
    requested.

    Parameters
    ----------
    runner
        A single argument function to be run in parallel.
    arg_list
        A list of arguments to be run over in parallel.
    num_cores
        Maximum number of processes to be run in parallel. If num_cores == 1,
        The jobs will be run serially without invoking multiprocessing.
    progress_bar
        Whether to display a progress bar for the running jobs.
    notebook_fallback
        Whether to fallback to standard multiprocessing in a notebook. We use `pathos`
        for multiprocessing as it uses a more robust serialization library, but `pathos`
        has some leaky state and doesn't properly close down child processes when
        interrupted in a jupyter notebook.

    Returns
    -------
    List[Any]
        A list of the results of the parallel calls of the runner.

    """

    if num_cores == 1:
        result = []
        for arg in tqdm.tqdm(arg_list, disable=not progress_bar):
            result.append(runner(arg))  # noqa: PERF401
    else:
        if is_notebook() and notebook_fallback:
            processing_pool_class = StdLibPool
        else:
            processing_pool_class = PathosPool

        with processing_pool_class(num_cores) as pool:
            result = list(
                tqdm.tqdm(
                    pool.imap(runner, arg_list),
                    total=len(arg_list),
                    disable=not progress_bar,
                )
            )
    return result
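
For example, a minimal sketch squaring numbers over four processes with a progress bar:

from rra_tools.parallel import run_parallel

def square(x: int) -> int:
    return x * x

# num_cores=1 would run serially, which makes dropping into a debugger easy.
results = run_parallel(square, list(range(100)), num_cores=4, progress_bar=True)
# results == [0, 1, 4, ..., 9801]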

plotting

Plotting utilities for RRA tools.

strip_axes(ax: Axes) -> Axes

Despine axis and remove ticks and labels.

Source code in src/rra_tools/plotting.py
def strip_axes(ax: Axes) -> Axes:
    """Despine axis and remove ticks and labels."""
    sns.despine(ax=ax, left=True, bottom=True)
    ax.set_xticks([])
    ax.set_yticks([])
    return ax

write_or_show(fig: Figure, plot_file: str | Path | None, **savefig_kwargs: Any) -> None

Write the figure to a file or show it.

Source code in src/rra_tools/plotting.py
def write_or_show(
    fig: Figure, plot_file: str | Path | None, **savefig_kwargs: Any
) -> None:
    """Write the figure to a file or show it."""
    if plot_file:
        fig.savefig(plot_file, **savefig_kwargs)
        plt.close(fig)
    else:
        plt.show()
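
The two utilities compose naturally, as in this sketch:

import matplotlib.pyplot as plt

from rra_tools.plotting import strip_axes, write_or_show

fig, ax = plt.subplots()
ax.plot([0, 1], [0, 1])
strip_axes(ax)  # despine and drop ticks/labels
# Saves and closes the figure; passing plot_file=None would call plt.show() instead.
write_or_show(fig, "line.png", dpi=300)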

shell_tools

_touch_clean(path: str | Path, mode: int = 0o664) -> None

Creates a file with the specified mode, overwriting the file if it exists.

This is a helper for the touch function; it is not meant to be used outside of this module.

Parameters

path
    The path of the file to create.
mode
    The permission mode to use in file creation.

Source code in src/rra_tools/shell_tools.py
def _touch_clean(path: str | Path, mode: int = 0o664) -> None:
    """Creates a file with the specified mode, overwriting the file if it exists.

    This function is a helper function for the `touch` function. It is not
    meant to be used outside of this module.

    Parameters
    ----------
    path
        The path of the file to create.
    mode
        The permission mode to use in file creation.

    """
    path = Path(path)
    old_umask = os.umask(0o777 - mode)
    try:
        path.touch()
    finally:
        os.umask(old_umask)

mkdir(path: str | Path, mode: int = 0o775, *, exist_ok: bool = False, parents: bool = False) -> None

Creates a directory and its parents with the specified mode.

This function is meant to combat permissions errors generated by the default umask behavior when creating parent directories (which would otherwise ignore the mode argument and use the default permissions).

Parameters

path
    The path of the directory to create.
mode
    The permission mode to use in directory creation.
exist_ok
    If False, raises FileExistsError if the directory already exists.
parents
    If False, raises FileNotFoundError if the directory's parent doesn't exist.

Source code in src/rra_tools/shell_tools.py
def mkdir(
    path: str | Path,
    mode: int = 0o775,
    *,
    exist_ok: bool = False,
    parents: bool = False,
) -> None:
    """Creates a directory and its parents with the specified mode.

    This method is meant to combat permissions errors generated by the default
    umask behavior when creating parent directories (i.e. ignore the mode
    argument and use the default permissions).

    Parameters
    ----------
    path
        The path of the directory to create.
    mode
        The permission mode to use in directory creation.
    exist_ok
        If False, raises FileExistsError if the directory already exists.
    parents
        If False, raises FileNotFoundError if the directory's parent doesn't
        exist.

    """
    path = Path(path)
    old_umask = os.umask(0o777 - mode)
    try:
        path.mkdir(exist_ok=exist_ok, parents=parents)
    finally:
        os.umask(old_umask)

touch(path: str | Path, mode: int = 0o664, *, exist_ok: bool = False, clobber: bool = False) -> None

Creates a file with the specified mode.

Parameters

path
    The path of the file to create.
mode
    The permission mode to use in file creation.
exist_ok
    If False, raises FileExistsError if the file already exists. If True, raises FileExistsError if path is a directory or its permissions do not match the mode argument.
clobber
    If True, overwrites the file if it already exists.

Source code in src/rra_tools/shell_tools.py
def touch(
    path: str | Path,
    mode: int = 0o664,
    *,
    exist_ok: bool = False,
    clobber: bool = False,
) -> None:
    """Creates a file with the specified mode.

    Parameters
    ----------
    path
        The path of the file to create.
    mode
        The permission mode to use in file creation.
    exist_ok
        If False, raises FileExistsError if the file already exists.
        If True, raises FileExistsError if path is a directory or permissions
        do not match the mode argument.
    clobber
        If True, overwrites the file if it already exists.

    """
    path = Path(path)
    if path.exists():
        if not path.is_file():
            msg = f"File exists at {path} and is not a file."
            raise FileExistsError(msg)
        if not exist_ok and not clobber:
            msg = f"File exists at {path}."
            raise FileExistsError(msg)

        if clobber:
            path.unlink()
            _touch_clean(path, mode)
        else:
            path_chmod = path.stat().st_mode & 0o777
            if path_chmod != mode:
                msg = (
                    f"File exists at {path} with mode {oct(path_chmod)} "
                    f"and not {oct(mode)}."
                )
                raise FileExistsError(msg)
    else:
        _touch_clean(path, mode)
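
A sketch of the group-writable defaults in use:

from rra_tools.shell_tools import mkdir, touch

mkdir("/tmp/demo/output", parents=True, exist_ok=True)  # 0o775 by default
touch("/tmp/demo/output/results.csv", exist_ok=True)    # 0o664 by default
# clobber=True replaces an existing file rather than checking its mode.
touch("/tmp/demo/output/results.csv", clobber=True)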

unzip_and_delete_archive(archive_path: str | Path, output_path: str | Path) -> None

Unzips an archive file to a directory and then deletes the archive.

Parameters

archive_path
    The path to the archive we want to unzip.
output_path
    The place to store the unzipped contents.

Source code in src/rra_tools/shell_tools.py
def unzip_and_delete_archive(archive_path: str | Path, output_path: str | Path) -> None:
    """Unzips an archive file to a directory and then deletes the archive.

    Parameters
    ----------
    archive_path
        The path to the archive we want to unzip.
    output_path
        The place to store the unzipped contents.

    """
    subprocess.run(shlex.split(f"unzip {archive_path} -d {output_path}"), check=True)
    subprocess.run(shlex.split(f"rm {archive_path}"), check=True)

wget(url: str, output_path: str | Path) -> None

Retrieves content at the url and stores it at an output path.

Parameters

url
    The url to retrieve the content from.
output_path
    Where we'll save the output to.

Source code in src/rra_tools/shell_tools.py
def wget(url: str, output_path: str | Path) -> None:
    """Retrieves content at the url and stores it at an output path.

    Parameters
    ----------
    url
        The url to retrieve the content from.
    output_path
        Where we'll save the output to.

    """
    subprocess.run(shlex.split(f"wget -O {output_path} {url}"), check=True)

translate

translate_dataframe(df: pd.DataFrame, columns: list[str] | None = None, source_language: str = 'auto', target_language: str = 'en') -> pd.DataFrame

Translate a dataframe using Google Translate.

Parameters

df
    The dataframe to translate.
columns
    The columns to translate. If None, all columns will be translated.
source_language
    The language of the input text. If 'auto', Google Translate will attempt to detect the language.
target_language
    The language to translate to.

Returns

pd.DataFrame
    The translated dataframe.

Source code in src/rra_tools/translate.py
def translate_dataframe(
    df: pd.DataFrame,
    columns: list[str] | None = None,
    source_language: str = "auto",
    target_language: str = "en",
) -> pd.DataFrame:
    """Translate a dataframe using Google Translate.

    Parameters
    ----------
    df
        The dataframe to translate.
    columns
        The columns to translate. If None, all columns will be translated.
    source_language
        The language of the input text. If 'auto', Google Translate will attempt
        to detect the language.
    target_language
        The language to translate to.

    Returns
    -------
    pd.DataFrame
        The translated dataframe.
    """
    df = df.copy()  # don't mutate the original dataframe

    if columns is None:
        columns = df.columns.tolist()
    translator = GoogleTranslator(source=source_language, target=target_language)
    for col in columns:
        df[f"{col}"] = translator.translate_batch(df[col].tolist())
    return df
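
For example, a sketch translating a single column (this performs network calls to Google Translate, presumably via deep-translator's GoogleTranslator):

import pandas as pd

from rra_tools.translate import translate_dataframe

df = pd.DataFrame({"fruta": ["manzana", "plátano"], "count": ["uno", "dos"]})
translated = translate_dataframe(df, columns=["fruta"], source_language="es")
# Only the "fruta" column is translated; the input dataframe is not mutated.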

translate_text_file(input_path: str | Path, output_path: str | Path, source_language: str = 'auto', target_language: str = 'en', input_encoding: str = 'utf-8') -> None

Translate a text file line-by-line using Google Translate.

This function will produce a new file interleaving the original lines with the translated lines. Google Translate is sometimes a little silly and so having the original line next to the translated line can be helpful, especially if you have some knowledge of the source language.

Parameters

input_path
    The path to the input file.
output_path
    The path to the output file.
source_language
    The language of the input text. If 'auto', Google Translate will attempt to detect the language.
target_language
    The language to translate to.
input_encoding
    The encoding of the input file.

Source code in src/rra_tools/translate.py
def translate_text_file(
    input_path: str | Path,
    output_path: str | Path,
    source_language: str = "auto",
    target_language: str = "en",
    input_encoding: str = "utf-8",
) -> None:
    """Translate a text file line-by-line using Google Translate.

    This function will produce a new file interleaving the original lines with
    the translated lines. Google Translate is sometimes a little silly and so
    having the original line next to the translated line can be helpful, especially
    if you have some knowledge of the source language.

    Parameters
    ----------
    input_path
        The path to the input file.
    output_path
        The path to the output file.
    source_language
        The language of the input text. If 'auto', Google Translate will attempt
        to detect the language.
    target_language
        The language to translate to.
    input_encoding
        The encoding of the input file.
    """
    # Note: encoding is an argument to open(), not to the Path constructor.
    with Path(input_path).open(encoding=input_encoding) as f:
        lines = f.readlines()

    translator = GoogleTranslator(source=source_language, target=target_language)
    translated_lines = translator.translate_batch(lines)

    with Path(output_path).open("w") as f:
        for in_line, out_line in zip(lines, translated_lines, strict=False):
            if in_line:
                f.write(f"{in_line.strip()}\n{out_line.strip()}\n\n")