rra_tools
cli_tools
convert_choice(value: str, choices: Collection[str]) -> list[str]
Convert a choice to a list of choices, handling the special 'All' choice.
Parameters
value: The choice to convert.
choices: The set of choices to choose from.
Returns
list[str]: The list of choices.
Source code in src/rra_tools/cli_tools/options.py
handle_exceptions(func: Callable[P, T], logger: SupportsLogging, *, with_debugger: bool) -> Callable[P, T]
Drops a user into an interactive debugger if func raises an error.
Source code in src/rra_tools/cli_tools/exceptions.py
import_module_from_info(module_info: ModuleInfo) -> ModuleType
Import a module from a ModuleInfo object.
Source code in src/rra_tools/cli_tools/importers.py
process_choices(allow_all: bool, choices: Collection[str] | None) -> tuple[click.ParamType, str | None, bool]
Support function for creating options with choices.
A common pattern in RRA pipelines is to build CLIs that admit a choice of a specific set of values or a special value that represents all possible values. This function provides a way to handle this pattern in a consistent way.
There are four possible cases:
1. No choices are provided and RUN_ALL is allowed. This is useful when the set of choices is not known ahead of time, or is contingent on another option. For example, a task may depend on location and year, where the years available depend on the location. The user might want to run a single year for a location (which they'll have to know ahead of time); all years for a location, which would be the subset of years available for that location; or all years for all locations, which could be a different subset of years for each included location.
2. Choices are provided and RUN_ALL is allowed. This is useful when the set of choices is known ahead of time, but the user might want to run all of them.
3. No choices are provided and RUN_ALL is not allowed. This is useful when the set of choices is not known ahead of time, but the user must provide a value.
4. Choices are provided and RUN_ALL is not allowed. This is useful when the set of choices is known ahead of time and the user must provide a value.
Parameters
allow_all: Whether to allow the special value RUN_ALL.
choices: The set of choices to allow.
Returns
tuple[click.ParamType, str | None, bool]: The option type, default value, and whether to show the default.
Source code in src/rra_tools/cli_tools/options.py
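As a concrete illustration of the pattern, here is a minimal, self-contained sketch of how a sentinel value expands to the full choice set. The sentinel name RUN_ALL and the logic are assumptions based on the description above, not the rra_tools source:

```python
# Minimal sketch of the choice-resolution pattern described above.
# RUN_ALL is an assumed sentinel name; the real implementation lives in
# src/rra_tools/cli_tools/options.py and may differ in detail.
RUN_ALL = "ALL"

def resolve_choice(value: str, choices: list[str]) -> list[str]:
    """Expand the sentinel to every choice; wrap a single value in a list."""
    if value == RUN_ALL:
        return list(choices)
    return [value]
```

Downstream task code can then always iterate over a list, regardless of whether the user asked for one value or all of them.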
with_choice(name: str, short_name: str | None = None, *, allow_all: bool = True, choices: Collection[str] | None = None, convert: bool | None = None, **kwargs: Any) -> Callable[[Callable[P, T]], Callable[P, T]]
Create an option with a set of choices.
Parameters
name: The name of the option.
short_name: An optional short name for the option.
allow_all: Whether to allow the special value "ALL", which represents all choices.
choices: The set of choices to allow.
convert: Whether to convert the provided argument to a list, resolving the special value "ALL" to all choices.
Source code in src/rra_tools/cli_tools/options.py
exceptions
handle_exceptions(func: Callable[P, T], logger: SupportsLogging, *, with_debugger: bool) -> Callable[P, T]
Drops a user into an interactive debugger if func raises an error.
Source code in src/rra_tools/cli_tools/exceptions.py
importers
import_module_from_info(module_info: ModuleInfo) -> ModuleType
Import a module from a ModuleInfo object.
Source code in src/rra_tools/cli_tools/importers.py
options
convert_choice(value: str, choices: Collection[str]) -> list[str]
Convert a choice to a list of choices, handling the special 'All' choice.
Parameters
value: The choice to convert.
choices: The set of choices to choose from.
Returns
list[str]: The list of choices.
Source code in src/rra_tools/cli_tools/options.py
process_choices(allow_all: bool, choices: Collection[str] | None) -> tuple[click.ParamType, str | None, bool]
Support function for creating options with choices.
A common pattern in RRA pipelines is to build CLIs that admit a choice of a specific set of values or a special value that represents all possible values. This function provides a way to handle this pattern in a consistent way.
There are four possible cases:
1. No choices are provided and RUN_ALL is allowed. This is useful when the set of choices is not known ahead of time, or is contingent on another option. For example, a task may depend on location and year, where the years available depend on the location. The user might want to run a single year for a location (which they'll have to know ahead of time); all years for a location, which would be the subset of years available for that location; or all years for all locations, which could be a different subset of years for each included location.
2. Choices are provided and RUN_ALL is allowed. This is useful when the set of choices is known ahead of time, but the user might want to run all of them.
3. No choices are provided and RUN_ALL is not allowed. This is useful when the set of choices is not known ahead of time, but the user must provide a value.
4. Choices are provided and RUN_ALL is not allowed. This is useful when the set of choices is known ahead of time and the user must provide a value.
Parameters
allow_all: Whether to allow the special value RUN_ALL.
choices: The set of choices to allow.
Returns
tuple[click.ParamType, str | None, bool]: The option type, default value, and whether to show the default.
Source code in src/rra_tools/cli_tools/options.py
with_choice(name: str, short_name: str | None = None, *, allow_all: bool = True, choices: Collection[str] | None = None, convert: bool | None = None, **kwargs: Any) -> Callable[[Callable[P, T]], Callable[P, T]]
Create an option with a set of choices.
Parameters
name: The name of the option.
short_name: An optional short name for the option.
allow_all: Whether to allow the special value "ALL", which represents all choices.
choices: The set of choices to allow.
convert: Whether to convert the provided argument to a list, resolving the special value "ALL" to all choices.
Source code in src/rra_tools/cli_tools/options.py
jobmon
_process_args(args: dict[str, Collection[Any] | Any] | None) -> tuple[dict[str, Collection[Any]], str]
Process arguments for a task.
Parameters
args: The arguments to process.
Returns
tuple[dict[str, Collection[Any]], str]: The names of all non-flag and non-count arguments and the string representation of the arguments.
Source code in src/rra_tools/jobmon.py
build_parallel_task_graph(jobmon_tool, runner: str, task_name: str, task_resources: dict[str, str | int], *, node_args: dict[str, Collection[Any] | None] | None = None, flat_node_args: tuple[tuple[str, ...], Collection[tuple[Any, ...]]] | None = None, task_args: dict[str, Any] | None = None, op_args: dict[str, Any] | None = None, max_attempts: int | None = None) -> list[Any]
Build a parallel task graph for jobmon.
Parameters
jobmon_tool: The jobmon tool.
runner: The runner to use for the task.
task_name: The name of the task.
node_args: The arguments to the task script that are unique to each task. The keys of the dict are the names of the arguments and the values are lists of the values to use for each task. A dict with multiple keys will result in a Cartesian product of the values. Mutually exclusive with flat_node_args.
flat_node_args: The arguments to the task script that are unique to each task. The first element of the tuple is the names of the arguments and the second element is a list of tuples of the values to use for each task. This can be used to avoid the Cartesian product of node_args and run just a subset of the possible tasks. Mutually exclusive with node_args.
task_args: The arguments to the task script that are the same for each task but alter its behavior (e.g. input and output root directories).
op_args: Arguments that are passed to the task script but do not alter the logical behavior of the task (e.g. number of cores, logging verbosity).
task_resources: The resources to allocate to the task.
max_attempts: The maximum number of attempts to make for each task.
Returns
list: A list of tasks to run.
Source code in src/rra_tools/jobmon.py
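The difference between node_args and flat_node_args can be sketched with itertools. This is illustrative only, not jobmon's or rra_tools's internal code:

```python
from itertools import product

# node_args: a dict of value lists expands to the Cartesian product.
node_args = {"location": ["usa", "can"], "year": [2020, 2021]}
names = list(node_args)
expanded = [dict(zip(names, combo)) for combo in product(*node_args.values())]
# -> four tasks: every (location, year) pair.

# flat_node_args: enumerate only the combinations you actually want.
arg_names, arg_values = ("location", "year"), [("usa", 2020), ("can", 2021)]
subset = [dict(zip(arg_names, values)) for values in arg_values]
# -> two tasks instead of four.
```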
get_jobmon_tool(workflow_name: str)
Get a jobmon tool for a given workflow name with a helpful error message.
Parameters
workflow_name: The name of the workflow.
Returns
Tool: A jobmon tool.
Raises
ModuleNotFoundError: If jobmon is not installed.
Source code in src/rra_tools/jobmon.py
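The "helpful error message" is a guarded-import pattern. Here is a generic, self-contained sketch of the idea; the actual message text in rra_tools.jobmon is not reproduced here:

```python
import importlib
from types import ModuleType

def import_with_hint(module_name: str, hint: str) -> ModuleType:
    """Import a module, re-raising ModuleNotFoundError with install guidance."""
    try:
        return importlib.import_module(module_name)
    except ModuleNotFoundError as err:
        raise ModuleNotFoundError(f"{module_name} is not installed. {hint}") from err
```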
run_parallel(runner: str, task_name: str, task_resources: dict[str, str | int], *, node_args: dict[str, Collection[Any] | None] | None = None, flat_node_args: tuple[tuple[str, ...], Collection[tuple[Any, ...]]] | None = None, task_args: dict[str, Any] | None = None, op_args: dict[str, Any] | None = None, concurrency_limit: int = 10000, max_attempts: int | None = None, log_root: str | Path | None = None, log_method: Callable[[str], None] = print) -> str
Run a parallel set of tasks using Jobmon.
This helper function encapsulates one of the simpler workflow patterns in Jobmon: a set of tasks that run in parallel, each with the same command but different arguments. More complicated workflows should be implemented directly.
Parameters
runner: The runner to use for the task. Default is 'rptask'.
task_name: The name of the task to run. Will also be used as the tool and workflow name.
task_resources: The resources to allocate to the task.
node_args: The arguments to the task script that are unique to each task. The keys of the dict are the names of the arguments and the values are lists of the values to use for each task. A dict with multiple keys will result in a Cartesian product of the values. Mutually exclusive with flat_node_args.
flat_node_args: The arguments to the task script that are unique to each task. The first element of the tuple is the names of the arguments and the second element is a list of tuples of the values to use for each task. This can be used to avoid the Cartesian product of node_args and run just a subset of the possible tasks. Mutually exclusive with node_args.
task_args: The arguments to the task script that are the same for each task but alter its behavior (e.g. input and output root directories).
op_args: Arguments that are passed to the task script but do not alter the logical behavior of the task (e.g. number of cores, logging verbosity).
concurrency_limit: The maximum number of tasks to run concurrently. Default is 10000.
max_attempts: The maximum number of attempts to make for each task.
log_root: The root directory for the logs. Default is None.
log_method: The method to use for logging. Default is print.
Returns
str: The status of the workflow.
Source code in src/rra_tools/jobmon.py
logging
add_logging_sink(sink: TextIO | loguru.Writable | Callable[[loguru.Message], None] | Handler | Path, verbose: int, *, colorize: bool = False, serialize: bool = False) -> None
Add a new output file handle for logging.
Source code in src/rra_tools/logging/config.py
configure_logging_to_files(log_dir: str | Path) -> None
Sets up logging to a file in an output directory.
File logs are written at the highest verbosity to allow for debugging if necessary.
Source code in src/rra_tools/logging/config.py
configure_logging_to_terminal(verbose: int) -> None
Set up logging to sys.stdout.
This is presumed to be one of the first calls made in an application entry point. Any logging that occurs before this call won't be intercepted or handled with the standard logging configuration.
Source code in src/rra_tools/logging/config.py
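rra_tools logs through loguru, but the idea of mapping a verbose count to a log level can be sketched with the standard library. The exact mapping below is an assumption for illustration:

```python
import logging
import sys

# Assumed mapping: 0 -> WARNING, 1 -> INFO, 2 or more -> DEBUG.
_LEVELS = [logging.WARNING, logging.INFO, logging.DEBUG]

def configure_terminal_logging(verbose: int) -> None:
    """Route log records to sys.stdout at a verbosity-dependent level."""
    level = _LEVELS[min(verbose, len(_LEVELS) - 1)]
    logging.basicConfig(
        level=level,
        handlers=[logging.StreamHandler(sys.stdout)],
        force=True,  # replace any handlers configured earlier
    )
```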
config
add_logging_sink(sink: TextIO | loguru.Writable | Callable[[loguru.Message], None] | Handler | Path, verbose: int, *, colorize: bool = False, serialize: bool = False) -> None
Add a new output file handle for logging.
Source code in src/rra_tools/logging/config.py
configure_logging_to_files(log_dir: str | Path) -> None
Sets up logging to a file in an output directory.
File logs are written at the highest verbosity to allow for debugging if necessary.
Source code in src/rra_tools/logging/config.py
configure_logging_to_terminal(verbose: int) -> None
Set up logging to sys.stdout.
This is presumed to be one of the first calls made in an application entry point. Any logging that occurs before this call won't be intercepted or handled with the standard logging configuration.
Source code in src/rra_tools/logging/config.py
parallel
This module simplifies the use of multiprocessing. It provides a single function, run_parallel, that runs a function in parallel over a list of arguments.
is_notebook() -> bool
Are we running code in a Jupyter notebook?
Code from https://stackoverflow.com/a/39662359
Source code in src/rra_tools/parallel.py
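The detection trick from the linked Stack Overflow answer looks like this (a sketch of the cited approach):

```python
def is_notebook() -> bool:
    """Detect a Jupyter notebook by inspecting the active IPython shell."""
    try:
        shell = get_ipython().__class__.__name__  # only defined under IPython
    except NameError:
        return False  # plain Python interpreter
    # ZMQInteractiveShell backs notebooks and qtconsole;
    # TerminalInteractiveShell is IPython in a terminal.
    return shell == "ZMQInteractiveShell"
```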
run_parallel(runner: Callable[[T1], T2], arg_list: Collection[T1], *, num_cores: int = 1, progress_bar: bool = False, notebook_fallback: bool = True) -> list[T2]
Runs a single argument function in parallel over a list of arguments.
This function dodges multiprocessing if only a single process is requested to make functions more flexible to debugging. It also supports progress bars if requested.
Parameters
runner: A single-argument function to be run in parallel.
arg_list: A list of arguments to be run over in parallel.
num_cores: Maximum number of processes to be run in parallel. If num_cores == 1, the jobs will be run serially without invoking multiprocessing.
progress_bar: Whether to display a progress bar for the running jobs.
notebook_fallback: Whether to fall back to standard multiprocessing in a notebook. We use pathos for multiprocessing as it uses a more robust serialization library, but pathos has some leaky state and doesn't properly close down child processes when interrupted in a Jupyter notebook.
Returns
list[T2]: A list of the results of the parallel calls of the runner.
Source code in src/rra_tools/parallel.py
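The serial dodge described above can be sketched as follows. The real function uses pathos and supports progress bars; this is a simplified stand-in:

```python
from multiprocessing import Pool
from typing import Callable, Collection, TypeVar

T1 = TypeVar("T1")
T2 = TypeVar("T2")

def run_parallel_sketch(
    runner: Callable[[T1], T2],
    arg_list: Collection[T1],
    *,
    num_cores: int = 1,
) -> list[T2]:
    """Run serially when num_cores == 1 so the code is easy to debug."""
    if num_cores == 1:
        return [runner(arg) for arg in arg_list]
    with Pool(num_cores) as pool:
        return pool.map(runner, list(arg_list))
```

With num_cores == 1 you can drop straight into a debugger inside runner, which is the flexibility the docstring refers to.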
plotting
Plotting utilities for RRA tools.
strip_axes(ax: Axes) -> Axes
write_or_show(fig: Figure, plot_file: str | Path | None, **savefig_kwargs: Any) -> None
Write the figure to a file or show it.
shell_tools
_touch_clean(path: str | Path, mode: int = 436) -> None
Creates a file with the specified mode, overwriting the file if it exists.
This function is a helper for touch and is not meant to be used outside of this module.
Parameters
path: The path of the file to create.
mode: The permission mode to use in file creation (the default 436 is 0o664).
Source code in src/rra_tools/shell_tools.py
mkdir(path: str | Path, mode: int = 509, *, exist_ok: bool = False, parents: bool = False) -> None
Creates a directory and its parents with the specified mode.
This method is meant to combat permissions errors generated by the default umask behavior when creating parent directories (i.e. ignore the mode argument and use the default permissions).
Parameters
path: The path of the directory to create.
mode: The permission mode to use in directory creation (the default 509 is 0o775).
exist_ok: If False, raises FileExistsError if the directory already exists.
parents: If False, raises FileNotFoundError if the directory's parent doesn't exist.
Source code in src/rra_tools/shell_tools.py
touch(path: str | Path, mode: int = 436, *, exist_ok: bool = False, clobber: bool = False) -> None
Creates a file with the specified mode.
Parameters
path: The path of the file to create.
mode: The permission mode to use in file creation (the default 436 is 0o664).
exist_ok: If False, raises FileExistsError if the file already exists. If True, raises FileExistsError if path is a directory or the permissions do not match the mode argument.
clobber: If True, overwrites the file if it already exists.
Source code in src/rra_tools/shell_tools.py
unzip_and_delete_archive(archive_path: str | Path, output_path: str | Path) -> None
Unzips an archive file to a directory and then deletes the archive.
Parameters
archive_path: The path to the archive we want to unzip.
output_path: The place to store the unzipped contents.
Source code in src/rra_tools/shell_tools.py
wget(url: str, output_path: str | Path) -> None
Retrieves content at the url and stores it at an output path.
Parameters
url: The url to retrieve the content from.
output_path: Where we'll save the output.
Source code in src/rra_tools/shell_tools.py
translate
translate_dataframe(df: pd.DataFrame, columns: list[str] | None = None, source_language: str = 'auto', target_language: str = 'en') -> pd.DataFrame
Translate a dataframe using Google Translate.
Parameters
df: The dataframe to translate.
columns: The columns to translate. If None, all columns will be translated.
source_language: The language of the input text. If 'auto', Google Translate will attempt to detect the language.
target_language: The language to translate to.
Returns
pd.DataFrame: The translated dataframe.
Source code in src/rra_tools/translate.py
translate_text_file(input_path: str | Path, output_path: str | Path, source_language: str = 'auto', target_language: str = 'en', input_encoding: str = 'utf-8') -> None
Translate a text file line-by-line using Google Translate.
This function will produce a new file interleaving the original lines with the translated lines. Google Translate is sometimes a little silly and so having the original line next to the translated line can be helpful, especially if you have some knowledge of the source language.
Parameters
input_path: The path to the input file.
output_path: The path to the output file.
source_language: The language of the input text. If 'auto', Google Translate will attempt to detect the language.
target_language: The language to translate to.
input_encoding: The encoding of the input file.
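The interleaving behavior can be sketched with the translation call abstracted behind a callable. The real function calls Google Translate; str.upper stands in for it in the test below:

```python
from typing import Callable

def interleave_translation(lines: list[str], translate: Callable[[str], str]) -> list[str]:
    """Pair each original line with its translation, as described above."""
    out: list[str] = []
    for line in lines:
        out.append(line)             # original line
        out.append(translate(line))  # translation directly beneath it
    return out
```

Keeping the original line next to its translation is what makes it easy to spot the occasional odd machine translation.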