Skip to content

API documentation

cli

strun() -> None

Entry point for running spatial-temporal CGF pipeline workflows.

Source code in src/rra_climate_health/cli.py
6
7
8
@click.group()
def strun() -> None:
    """Entry point for running spatial-temporal CGF pipeline workflows."""
    # Intentionally has no body: the function only anchors the click group.
    # NOTE(review): subcommands are presumably registered on this group
    # elsewhere in the package; that wiring is not visible here.

sttask() -> None

Entry point for running spatial-temporal CGF pipeline tasks.

Source code in src/rra_climate_health/cli.py
@click.group()
def sttask() -> None:
    """Entry point for running spatial-temporal CGF pipeline tasks."""
    # Intentionally has no body: the function only anchors the click group.
    # NOTE(review): "sttask" is also the runner name passed to
    # jobmon.run_parallel by the workflow commands; the individual task
    # commands are presumably registered on this group elsewhere.

data

get_run_directory(output_root: str | Path) -> Path

Gets a path to a datetime directory for a new output.

Parameters

`output_root` (`str | Path`): The root directory for all outputs.

Source code in src/rra_climate_health/data.py
def get_run_directory(output_root: str | Path) -> Path:
    """Gets a path to a datetime directory for a new output.

    Parameters
    ----------
    output_root
        The root directory for all outputs.

    """
    output_root = Path(output_root).resolve()
    launch_time = datetime.datetime.now(tz=datetime.UTC).strftime("%Y_%m_%d")
    today_runs = [
        int(run_dir.name.split(".")[1])
        for run_dir in output_root.iterdir()
        if run_dir.name.startswith(launch_time)
    ]
    run_version = max(today_runs) + 1 if today_runs else 1
    datetime_dir = output_root / f"{launch_time}.{run_version:0>2}"
    return datetime_dir

save_raster(raster: rt.RasterArray, output_path: str | Path, num_cores: int = 1, **kwargs: typing.Any) -> None

Save a raster to a file with standard parameters.

Source code in src/rra_climate_health/data.py
def save_raster(
    raster: rt.RasterArray,
    output_path: str | Path,
    num_cores: int = 1,
    **kwargs: typing.Any,
) -> None:
    """Write ``raster`` to ``output_path`` with the project's default options.

    Parameters
    ----------
    raster
        The raster to write.
    output_path
        Destination file for the raster.
    num_cores
        Value forwarded as the ``num_threads`` write option.
    **kwargs
        Extra write options. Any key given here replaces the default of the
        same name.

    """
    write_options: dict[str, typing.Any] = dict(
        tiled=True,
        blockxsize=512,
        blockysize=512,
        compress="ZSTD",
        predictor=2,  # horizontal differencing
        num_threads=num_cores,
        bigtiff="yes",
    )
    # Caller-supplied options take precedence over the defaults above.
    write_options.update(kwargs)

    # NOTE(review): ``touch`` is a helper defined outside this block; it is
    # called on the destination before the raster is written.
    touch(output_path, exist_ok=True)
    raster.to_file(output_path, **write_options)

data_prep

run_inference_data_prep

run_ldi_prep(output_root: str, year: list[str], queue: str) -> None

Prep LDI rasters from admin2 data

Source code in src/rra_climate_health/data_prep/run_inference_data_prep.py
@click.command()  # type: ignore[arg-type]
@clio.with_output_root(DEFAULT_ROOT)
@clio.with_year(allow_all=True)
@clio.with_queue()
def run_ldi_prep(output_root: str, year: list[str], queue: str) -> None:
    """Prep LDI rasters from admin2 data"""
    # Cluster resources requested for every ``ldi_prep`` task.
    resources = {
        "queue": queue,
        "cores": 1,
        "memory": "35Gb",
        "runtime": "1h",
        "project": "proj_rapidresponse",
    }
    # Arguments shared by every task; ``year`` is passed as a node argument.
    # NOTE(review): presumably jobmon fans out one task per year - confirm.
    shared_args = {"output-root": output_root}

    jobmon.run_parallel(
        runner="sttask",
        task_name="ldi_prep",
        node_args={"year": year},
        task_args=shared_args,
        task_resources=resources,
        max_attempts=1,
        log_root=str(output_root),
    )

run_ldi_prep_main(output_root: str | Path, year: int) -> None

Run LDI data preparation.

Source code in src/rra_climate_health/data_prep/run_inference_data_prep.py
def run_ldi_prep_main(
    output_root: str | Path,
    year: int,
) -> None:
    """Rasterize LDI values for one year and save one raster per percentile.

    Reads the subnational LDI table, fills missing ``ldipc`` values with the
    mean of their (year, national location, percentile) group, converts the
    values to a per-day figure, and burns them onto the raster template using
    the admin2 geometries. Each percentile's raster is saved through
    ``cm_data.save_ldi_raster``.

    Parameters
    ----------
    output_root
        Root directory of the pipeline outputs; the ``stunting`` subdirectory
        is used to construct the data interface.
    year
        The ``year_id`` whose rows are rasterized.

    """
    # Measure doesn't matter for this task
    measure_root = Path(output_root) / "stunting"
    cm_data = ClimateMalnutritionData(measure_root)
    print("Loading admin2 shapes and raster template")
    admin2 = cm_data.load_lbd_admin2_shapes()
    template = cm_data.load_raster_template()

    print("Loading LDI data")
    ldi = pd.read_csv(upstream_paths.LDIPC_SUBNATIONAL_FILEPATH)
    # Fill in missing values with national mean
    group_columns = ["year_id", "national_ihme_loc_id", "population_percentile"]
    group_mean = ldi.groupby(group_columns).ldipc.transform("mean")
    is_missing = ldi.ldipc.isna()
    ldi.loc[is_missing, "ldipc"] = group_mean.loc[is_missing]
    # Convert to daily, and drop 0th percentile, which is just 0.
    days_per_year = 365.25
    ldi["ldi_pc_pd"] = ldi["ldipc"] / days_per_year
    ldi = ldi[ldi.population_percentile > 0]

    print("Building shape map")
    ldi_locs = ldi["location_id"].unique().tolist()
    matched = admin2.loc[admin2.loc_id.isin(ldi_locs), ["loc_id", "geometry"]]
    matched = matched.rename(columns={"loc_id": "location_id"})
    # Series of geometries keyed by location_id.
    shape_map = matched.set_index("location_id").geometry

    print("Rasterizing LDI data")
    # The year filter is the same for every percentile, so build it once.
    in_year = ldi.year_id == year
    for percentile in ldi["population_percentile"].unique().tolist():
        print(f"Rasterizing percentile: {percentile}")
        selected = ldi.loc[(ldi.population_percentile == percentile) & in_year]
        values = selected.set_index("location_id").ldi_pc_pd
        # Keep only the first row for any repeated location.
        values = values[~values.index.duplicated()]
        shapes = [(shape_map.loc[loc], values.loc[loc]) for loc in values.index]
        burned = rasterize(
            shapes,
            out=np.zeros_like(template),
            transform=template.transform,
        )
        ldi_raster = rt.RasterArray(
            burned,
            transform=template.transform,
            crs=template.crs,
            no_data_value=np.nan,
        )
        cm_data.save_ldi_raster(ldi_raster, year, percentile)

run_ldi_prep_task(output_root: str, year: str) -> None

Run LDI data preparation.

Source code in src/rra_climate_health/data_prep/run_inference_data_prep.py
@click.command()  # type: ignore[arg-type]
@clio.with_output_root(DEFAULT_ROOT)
@clio.with_year()
def run_ldi_prep_task(output_root: str, year: str) -> None:
    """Run LDI data preparation."""
    # The CLI hands both values over as strings; convert them to the types
    # run_ldi_prep_main is annotated with before delegating.
    root_path = Path(output_root)
    year_id = int(year)
    run_ldi_prep_main(root_path, year_id)

run_training_data_prep

run_training_data_prep(output_root: str, source_type: str) -> None

Run training data prep.

Source code in src/rra_climate_health/data_prep/run_training_data_prep.py
@click.command()  # type: ignore[arg-type]
@clio.with_output_root(DEFAULT_ROOT)
@clio.with_source_type(allow_all=False)
def run_training_data_prep(output_root: str, source_type: str) -> None:
    """Run training data prep."""
    status_message = f"Running training data prep for {source_type}..."
    print(status_message)
    # A single source type is accepted (allow_all=False), so there is exactly
    # one call to the main routine and no loop over sources.
    run_training_data_prep_main(output_root, source_type)

upstream_paths

These are paths the RRA team does not own.

They should only be accessed from the data_prep subpackage. Downstream pipeline code should depend on data we manage wherever practical and load that data using the rra_climate_health.data module.

inference

run_inference

forecast_scenarios_task(output_root: str, measure: str, results_version: str, model_version: str) -> None

Run forecasting applying the inference results, and output diagnostics.

Source code in src/rra_climate_health/inference/run_inference.py
@click.command()  # type: ignore[arg-type]
@clio.with_output_root(DEFAULT_ROOT)
@clio.with_measure()
@clio.with_results_version()
@clio.with_model_version()
def forecast_scenarios_task(
    output_root: str,
    measure: str,
    results_version: str,
    model_version: str,
) -> None:
    """Run forecasting applying the inference results, and output diagnostics."""
    root_path = Path(output_root)

    # Step 1: build the forecasts for this results version.
    forecast_scenarios(root_path, measure, results_version)

    # Step 2: produce the diagnostics report; only this step needs the
    # model version.
    create_inference_diagnostics_report(
        root_path,
        measure,
        results_version,
        model_version,
    )

model_inference(output_root: str, model_version: str, measure: str, cmip6_scenario: list[str], year: list[str], queue: str) -> None

Run model inference.

Source code in src/rra_climate_health/inference/run_inference.py
@click.command()  # type: ignore[arg-type]
@clio.with_output_root(DEFAULT_ROOT)
@clio.with_model_version()
@clio.with_measure()
@clio.with_cmip6_scenario(allow_all=True)
@clio.with_year(allow_all=True)
@clio.with_queue()
def model_inference(
    output_root: str,
    model_version: str,
    measure: str,
    cmip6_scenario: list[str],
    year: list[str],
    queue: str,
) -> None:
    """Run model inference."""
    cm_data = ClimateMalnutritionData(Path(output_root) / measure)
    results_version = cm_data.new_results_version(model_version)
    print(
        f"Running inference for {measure} using {model_version}. Results version: {results_version}"
    )

    # Both stages log to the same results-version directory.
    log_root = str(cm_data.results / results_version)
    # Arguments common to both stages; each call receives its own copy.
    common_task_args = {
        "output-root": output_root,
        "model-version": model_version,
        "results-version": results_version,
    }
    project = "proj_rapidresponse"

    # Stage 1: "inference" tasks over measure, scenario and year.
    inference_resources = {
        "queue": queue,
        "cores": 1,
        "memory": "90Gb",
        "runtime": "240m",
        "project": project,
    }
    jobmon.run_parallel(
        runner="sttask",
        task_name="inference",
        node_args={
            "measure": [measure],
            "cmip6-scenario": cmip6_scenario,
            "year": year,
        },
        task_args=dict(common_task_args),
        task_resources=inference_resources,
        max_attempts=1,
        log_root=log_root,
    )

    # Stage 2: "forecast" task for the measure, launched after stage 1's
    # run_parallel call has returned.
    forecast_resources = {
        "queue": queue,
        "cores": 2,
        "memory": "30Gb",
        "runtime": "30m",
        "project": project,
    }
    jobmon.run_parallel(
        runner="sttask",
        task_name="forecast",
        node_args={"measure": [measure]},
        task_args=dict(common_task_args),
        task_resources=forecast_resources,
        max_attempts=1,
        log_root=log_root,
    )
    print(
        f"Inference complete, results can be found at {cm_data.results / results_version}"
    )

model_inference_task(output_root: str, measure: str, results_version: str, model_version: str, cmip6_scenario: str, year: str) -> None

Run model inference.

Source code in src/rra_climate_health/inference/run_inference.py
@click.command()  # type: ignore[arg-type]
@clio.with_output_root(DEFAULT_ROOT)
@clio.with_measure()
@clio.with_results_version()
@clio.with_model_version()
@clio.with_cmip6_scenario()
@clio.with_year()
def model_inference_task(
    output_root: str,
    measure: str,
    results_version: str,
    model_version: str,
    cmip6_scenario: str,
    year: str,
) -> None:
    """Run model inference."""
    # Convert the string CLI values that the main routine takes as
    # Path / int; everything else is forwarded untouched.
    root_path = Path(output_root)
    year_id = int(year)
    model_inference_main(
        root_path,
        measure,
        results_version,
        model_version,
        cmip6_scenario,
        year_id,
    )

training

run_training

model_training(model_specification_path: str, output_root: str, queue: str) -> None

Run model training.

Source code in src/rra_climate_health/training/run_training.py
@click.command()  # type: ignore[arg-type]
@click.argument(
    "model_specification_path",
    type=click.Path(exists=True),
)
@clio.with_output_root(DEFAULT_ROOT)
@clio.with_queue()
def model_training(
    model_specification_path: str,
    output_root: str,
    queue: str,
) -> None:
    """Run model training."""
    # Load the specification; its measure selects the output subdirectory.
    model_spec = ModelSpecification.from_yaml(model_specification_path)
    measure = model_spec.measure
    measure_root = Path(output_root) / measure
    cm_data = ClimateMalnutritionData(measure_root)

    # Reserve a new model version and store the specification under it, with
    # the version recorded inside the specification itself.
    model_version = cm_data.new_model_version()
    version_root = cm_data.models / model_version
    model_spec.version.model = model_version
    cm_data.save_model_specification(model_spec, model_version)

    print("Running model training for model version", model_version)

    # Training tasks are parameterized by age group and sex.
    jobmon.run_parallel(
        runner="sttask",
        task_name="training",
        node_args={
            "age-group-id": clio.VALID_AGE_GROUP_IDS,
            "sex-id": clio.VALID_SEX_IDS,
        },
        task_args={
            "output-root": output_root,
            "measure": measure,
            "model-version": model_version,
        },
        task_resources={
            "queue": queue,
            "cores": 1,
            "memory": "20Gb",
            "runtime": "1h",
            "project": "proj_rapidresponse",
        },
        max_attempts=1,
        log_root=str(version_root),
    )

    print("Model training complete. Results can be found at", version_root)

model_training_task(output_root: str, measure: str, model_version: str, age_group_id: str, sex_id: str) -> None

Run model training.

Source code in src/rra_climate_health/training/run_training.py
@click.command()  # type: ignore[arg-type]
@clio.with_output_root(DEFAULT_ROOT)
@clio.with_measure()
@clio.with_model_version()
@clio.with_age_group_id()
@clio.with_sex_id()
def model_training_task(
    output_root: str,
    measure: str,
    model_version: str,
    age_group_id: str,
    sex_id: str,
) -> None:
    """Run model training."""
    # The ids arrive from the CLI as strings; convert them to ints and the
    # root to a Path before delegating.
    root_path = Path(output_root)
    age_group = int(age_group_id)
    sex = int(sex_id)
    model_training_main(root_path, measure, model_version, age_group, sex)