Skip to content

climate_data

Climate Data

This package contains modules for extracting, processing, harmonizing, and downscaling climate data. It sources historical climate data from the European Centre for Medium-Range Weather Forecasts (ECMWF) ERA5 dataset and future climate data from the Coupled Model Intercomparison Project Phase 6 (CMIP6).

aggregate

hierarchy

Runner for the collate stage of the climate aggregates pipeline.

This module provides functions to collate block-level datasets into measure-level datasets. The compilation process: 1. Loads raw results from all blocks for a given hierarchy, measure, and scenario 2. Aggregates the data up the location hierarchy 3. Produces views for subset hierarchies 4. Saves the results as measure-level datasets

hierarchy(agg_version: str, hierarchy: list[str], agg_measure: list[str], agg_scenario: list[str], population_model_dir: str, output_dir: str, queue: str) -> None

Collate block-level datasets into measure-level datasets.

This command: 1. Identifies which combinations of hierarchy, measure, and scenario need compilation 2. Creates parallel jobs to collate each combination 3. Runs the jobs using jobmon

Source code in src/climate_data/aggregate/hierarchy.py
@click.command()
@clio.with_agg_version()
@clio.with_hierarchy(allow_all=True)
@clio.with_agg_measure(allow_all=True)
@clio.with_agg_scenario(allow_all=True)
@clio.with_input_directory("population-model", cdc.POPULATION_MODEL_ROOT)
@clio.with_output_directory(cdc.AGGREGATE_ROOT)
@clio.with_queue()
def hierarchy(
    agg_version: str,
    hierarchy: list[str],
    agg_measure: list[str],
    agg_scenario: list[str],
    population_model_dir: str,
    output_dir: str,
    queue: str,
) -> None:
    """Collate block-level datasets into measure-level datasets.

    This command:
    1. Identifies which combinations of hierarchy, measure, and scenario need compilation
    2. Creates parallel jobs to collate each combination
    3. Runs the jobs using jobmon
    """
    ca_data = ClimateAggregateData(output_dir)

    n_jobs = len(hierarchy) * len(agg_measure) * len(agg_scenario)
    print(f"Running {n_jobs} jobs")

    jobmon.run_parallel(
        runner="cdtask aggregate",
        task_name="hierarchy",
        node_args={
            "hierarchy": hierarchy,
            "agg-measure": agg_measure,
            "agg-scenario": agg_scenario,
        },
        task_args={
            "agg-version": agg_version,
            "population-model-dir": population_model_dir,
            "output-dir": output_dir,
        },
        task_resources={
            "queue": queue,
            "cores": 1,
            "memory": "50G",
            "runtime": "200m",
            "project": "proj_rapidresponse",
        },
        log_root=ca_data.log_dir("aggregate_hierarchy"),
        max_attempts=3,
    )

hierarchy_main(agg_version: str, hierarchy: str, measure: str, scenario: str, population_model_dir: str, output_dir: str, *, progress_bar: bool = False) -> None

Collate block-level datasets into measure/scenario-level datasets.

This function: 1. Loads all block-level datasets for a given hierarchy, measure, and scenario 2. Combines them into a single dataset 3. Aggregates the data up the location hierarchy 4. Produces views for subset hierarchies 5. Saves the results as measure-level datasets

Parameters

agg_version The version identifier hierarchy The full aggregation hierarchy to process measure The climate measure to process scenario The climate scenario to process population_model_dir Path to the population model directory output_dir Path to save results progress_bar Whether to show a progress bar

Source code in src/climate_data/aggregate/hierarchy.py
def hierarchy_main(
    agg_version: str,
    hierarchy: str,
    measure: str,
    scenario: str,
    population_model_dir: str,
    output_dir: str,
    *,
    progress_bar: bool = False,
) -> None:
    """Collate block-level datasets into measure/scenario-level datasets.

    This function:
    1. Loads all block-level datasets for a given hierarchy, measure, and scenario
    2. Combines them into a single dataset
    3. Aggregates the data up the location hierarchy
    4. Produces views for subset hierarchies
    5. Saves the results as measure-level datasets

    Parameters
    ----------
    agg_version
        The version identifier
    hierarchy
        The full aggregation hierarchy to process
    measure
        The climate measure to process
    scenario
        The climate scenario to process
    population_model_dir
        Path to the population model directory
    output_dir
        Path to save results
    progress_bar
        Whether to show a progress bar
    """
    print(f"Compiling {measure} for {scenario} in {hierarchy}")
    ca_data = ClimateAggregateData(output_dir)
    pm_data = PopulationModelData(population_model_dir)
    draws = cdc.DRAWS

    # Load hierarchy data for aggregation
    hierarchy_df = pm_data.load_subset_hierarchy(hierarchy)

    # Get all block keys
    modeling_frame = pm_data.load_modeling_frame()
    block_keys = modeling_frame.block_key.unique()

    # Load and combine all block-level results
    desc_template = "{draw:5} {block_key:15}"
    pbar = tqdm.tqdm(
        total=len(block_keys) * len(draws),
        desc=desc_template.format(draw="DRAW", block_key="BLOCK KEY"),
        disable=not progress_bar,
    )

    all_results = []
    pop_df: pd.DataFrame | None = None

    for draw in draws:
        save_population = (
            measure == "mean_temperature" and scenario == "ssp245" and draw == "000"
        )
        draw_results = []
        for block_key in block_keys:
            pbar.set_description(desc_template.format(draw=draw, block_key=block_key))

            draw_df = ca_data.load_raw_results(
                agg_version,
                hierarchy,
                block_key,
                draw,
                measure=measure,
                scenario=scenario,
            ).drop(columns=["scenario", "measure"])
            draw_results.append(draw_df)

            pbar.update()
        draw_df = (
            pd.concat(draw_results, ignore_index=True)
            .groupby(["location_id", "year_id"])
            .sum()
            .reset_index()
        )

        agg_df = utils.aggregate_climate_to_hierarchy(
            draw_df,
            hierarchy_df,
        ).set_index(["location_id", "year_id"])
        all_results.append(agg_df["value"].rename(draw))

        if save_population:
            pop_df = agg_df["population"]

    pbar.close()

    combined_results = pd.concat(all_results, axis=1)

    # Produce views for subset hierarchies
    subset_hierarchies = cdc.HIERARCHY_MAP[hierarchy]
    for subset_hierarchy in subset_hierarchies:
        # Load the subset hierarchy
        subset_hierarchy_df = pm_data.load_subset_hierarchy(subset_hierarchy)

        # Filter results to only include locations in the subset hierarchy
        subset_location_ids = subset_hierarchy_df["location_id"].tolist()
        subset_results = combined_results.loc[subset_location_ids]

        # Save results for the subset hierarchy
        ca_data.save_results(
            subset_results,
            agg_version,
            subset_hierarchy,
            scenario,
            measure,
        )

        if pop_df is not None:
            subset_pop = pop_df.loc[subset_location_ids].reset_index()
            ca_data.save_population(subset_pop, agg_version, subset_hierarchy)

hierarchy_task(agg_version: str, hierarchy: str, agg_measure: str, agg_scenario: str, population_model_dir: str, output_dir: str, *, progress_bar: bool) -> None

Collate block-level datasets into measure-level datasets.

This command collates results for a specific hierarchy, measure, and scenario.

Source code in src/climate_data/aggregate/hierarchy.py
@click.command()
@clio.with_agg_version()
@clio.with_hierarchy()
@clio.with_agg_measure()
@clio.with_agg_scenario()
@clio.with_input_directory("population-model", cdc.POPULATION_MODEL_ROOT)
@clio.with_output_directory(cdc.AGGREGATE_ROOT)
@clio.with_progress_bar()
def hierarchy_task(
    agg_version: str,
    hierarchy: str,
    agg_measure: str,
    agg_scenario: str,
    population_model_dir: str,
    output_dir: str,
    *,
    progress_bar: bool,
) -> None:
    """Collate block-level datasets into measure-level datasets.

    This command collates results for a specific hierarchy, measure, and scenario.
    """
    hierarchy_main(
        agg_version,
        hierarchy,
        agg_measure,
        agg_scenario,
        population_model_dir,
        output_dir,
        progress_bar=progress_bar,
    )

utils

aggregate_climate_to_hierarchy(data: pd.DataFrame, hierarchy: pd.DataFrame) -> pd.DataFrame

Create all aggregate climate values for a given hierarchy from most-detailed data.

Parameters

data The most-detailed climate data to aggregate. hierarchy The hierarchy to aggregate the data to.

Returns

pd.DataFrame The climate data with values for all levels of the hierarchy.

Source code in src/climate_data/aggregate/utils.py
def aggregate_climate_to_hierarchy(
    data: pd.DataFrame, hierarchy: pd.DataFrame
) -> pd.DataFrame:
    """Create all aggregate climate values for a given hierarchy from most-detailed data.

    Parameters
    ----------
    data
        The most-detailed climate data to aggregate.
    hierarchy
        The hierarchy to aggregate the data to.

    Returns
    -------
    pd.DataFrame
        The climate data with values for all levels of the hierarchy.
    """
    results = data.set_index("location_id").copy()

    # Most detailed locations can be at multiple levels of the hierarchy,
    # so we loop over all levels from most detailed to global, aggregating
    # level by level and appending the results to the data.
    for level in reversed(list(range(1, hierarchy.level.max() + 1))):
        level_mask = hierarchy.level == level
        parent_map = hierarchy.loc[level_mask].set_index("location_id").parent_id

        subset = results.loc[parent_map.index]
        subset["parent_id"] = parent_map

        parent_values = (
            subset.groupby(["year_id", "parent_id"])[["weighted_climate", "population"]]
            .sum()
            .reset_index()
            .rename(columns={"parent_id": "location_id"})
            .set_index("location_id")
        )
        results = pd.concat([results, parent_values])
    results = (
        results.reset_index()
        .sort_values(["location_id", "year_id"])
        .reset_index(drop=True)
    )
    results["value"] = results["weighted_climate"] / results["population"]
    return results

build_bounds_map(raster_template: rt.RasterArray, shape_values: list[tuple[Polygon | MultiPolygon, int]]) -> dict[int, tuple[slice, slice]]

Build a map of location IDs to buffered slices of the raster template.

Parameters

raster_template The raster template to build the bounds map for. shape_values A list of tuples where the first element is a shapely Polygon or MultiPolygon in the CRS of the raster template and the second element is the location ID of the shape.

Returns

dict[int, tuple[slice, slice]] A dictionary mapping location IDs to a tuple of slices representing the bounds of the location in the raster template. The slices are buffered by 10 pixels to ensure that the entire shape is included in the mask.

Source code in src/climate_data/aggregate/utils.py
def build_bounds_map(
    raster_template: rt.RasterArray,
    shape_values: list[tuple[Polygon | MultiPolygon, int]],
) -> dict[int, tuple[slice, slice]]:
    """Build a map of location IDs to buffered slices of the raster template.

    Parameters
    ----------
    raster_template
        The raster template to build the bounds map for.
    shape_values
        A list of tuples where the first element is a shapely Polygon or MultiPolygon
        in the CRS of the raster template and the second element is the location ID
        of the shape.

    Returns
    -------
    dict[int, tuple[slice, slice]]
        A dictionary mapping location IDs to a tuple of slices representing the bounds
        of the location in the raster template. The slices are buffered by 10 pixels
        to ensure that the entire shape is included in the mask.
    """
    # The tranform maps pixel coordinates to the CRS coordinates.
    # This mask is the inverse of that transform.
    to_pixel = ~raster_template.transform

    bounds_map = {}
    for shp, loc_id in shape_values:
        xmin, ymin, xmax, ymax = shp.bounds
        pxmin, pymin = to_pixel * (xmin, ymax)
        pixel_buffer = 10
        pxmin = max(0, int(pxmin) - pixel_buffer)
        pymin = max(0, int(pymin) - pixel_buffer)
        pxmax, pymax = to_pixel * (xmax, ymin)
        pxmax = min(raster_template.width, int(pxmax) + pixel_buffer)
        pymax = min(raster_template.height, int(pymax) + pixel_buffer)
        bounds_map[loc_id] = (slice(pymin, pymax), slice(pxmin, pxmax))

    return bounds_map

build_location_masks(hierarchy: str, block_key: str, pm_data: PopulationModelData) -> tuple[dict[str, slice], dict[int, tuple[slice, slice, npt.NDArray[np.bool_]]]]

Build location masks for each location in the hierarchy.

Parameters

hierarchy The name of the hierarchy to build location masks for. Must be one of of the keys of the HIERARCHY_MAP constant. pm_data PopulationModelData object to load the population model data.

Returns

tuple[dict[int, tuple[slice, slice]], npt.NDArray[np.uint32]] The first element is a dictionary mapping location IDs to a tuple of slices representing the bounds of the location in the mask. This is useful for subseting the mask and data arrays before processing as downstream operations scale with the number of pixels in the mask. The second element is the mask itself, a 2D array of uint32 values where each location ID is represented by a unique integer value.

Source code in src/climate_data/aggregate/utils.py
def build_location_masks(
    hierarchy: str,
    block_key: str,
    pm_data: PopulationModelData,
) -> tuple[dict[str, slice], dict[int, tuple[slice, slice, npt.NDArray[np.bool_]]]]:
    """Build location masks for each location in the hierarchy.

    Parameters
    ----------
    hierarchy
        The name of the hierarchy to build location masks for. Must be one of
        of the keys of the HIERARCHY_MAP constant.
    pm_data
        PopulationModelData object to load the population model data.

    Returns
    -------
    tuple[dict[int, tuple[slice, slice]], npt.NDArray[np.uint32]]
        The first element is a dictionary mapping location IDs to a tuple of
        slices representing the bounds of the location in the mask. This is useful
        for subseting the mask and data arrays before processing as downstream
        operations scale with the number of pixels in the mask. The second element
        is the mask itself, a 2D array of uint32 values where each location ID is
        represented by a unique integer value.
    """
    template = pm_data.load_results("2020q1", block_key)
    template_bbox = get_bbox(template, "EPSG:4326")
    bounds = template_bbox.bounds
    raking_shapes = pm_data.load_raking_shapes(hierarchy, bounds=bounds)
    raking_shapes = raking_shapes[raking_shapes.intersects(template_bbox)].to_crs(
        template.crs
    )

    # Get some bounds to subset the climate rasters with.
    lon_min, lat_min, lon_max, lat_max = bounds
    buffer = 0.1  # Degrees, this is the resolution of the climate raster
    lon_min, lon_max = max(-180, lon_min - buffer), min(180, lon_max + buffer)
    lat_min, lat_max = max(-90, lat_min - buffer), min(90, lat_max + buffer)
    climate_slice = {
        "longitude": slice(lon_min, lon_max),
        "latitude": slice(lat_max, lat_min),
    }

    shape_values = [
        (shape, loc_id)
        for loc_id, shape in raking_shapes.set_index("location_id")
        .geometry.to_dict()
        .items()
    ]
    bounds_map = build_bounds_map(template, shape_values)

    location_mask = np.zeros_like(template, dtype=np.uint32)
    location_mask = rasterize(
        shape_values,
        out=location_mask,
        transform=template.transform,
        merge_alg=MergeAlg.replace,
    )
    final_bounds_map = {
        location_id: (rows, cols, location_mask[rows, cols] == location_id)
        for location_id, (rows, cols) in bounds_map.items()
    }
    return climate_slice, final_bounds_map, location_mask

get_bbox(raster: rt.RasterArray, crs: str | None = None) -> shapely.Polygon

Get the bounding box of a raster array.

Parameters

raster The raster array to get the bounding box of. crs The CRS to return the bounding box in. If None, the bounding box is returned in the CRS of the raster.

Returns

shapely.Polybon The bounding box of the raster in the CRS specified by the crs parameter.

Source code in src/climate_data/aggregate/utils.py
def get_bbox(raster: rt.RasterArray, crs: str | None = None) -> shapely.Polygon:
    """Get the bounding box of a raster array.

    Parameters
    ----------
    raster
        The raster array to get the bounding box of.
    crs
        The CRS to return the bounding box in. If None, the bounding box
        is returned in the CRS of the raster.

    Returns
    -------
    shapely.Polybon
        The bounding box of the raster in the CRS specified by the crs parameter.
    """
    if raster.crs not in MAX_BOUNDS:
        msg = f"Unsupported CRS: {raster.crs}"
        raise ValueError(msg)

    xmin_clip, xmax_clip, ymin_clip, ymax_clip = MAX_BOUNDS[raster.crs]
    xmin, xmax, ymin, ymax = raster.bounds

    xmin = np.clip(xmin, xmin_clip, xmax_clip)
    xmax = np.clip(xmax, xmin_clip, xmax_clip)
    ymin = np.clip(ymin, ymin_clip, ymax_clip)
    ymax = np.clip(ymax, ymin_clip, ymax_clip)

    bbox = gpd.GeoSeries([shapely.box(xmin, ymin, xmax, ymax)], crs=raster.crs)
    out_bbox = bbox.to_crs(crs) if crs is not None else bbox.copy()

    # Check that our transformation didn't do something weird
    # (e.g. artificially clip the bounds or have the bounds extend over the
    # antimeridian)
    check_bbox = out_bbox.to_crs(raster.crs)
    area_change = (np.abs(bbox.area - check_bbox.area) / bbox.area).iloc[0]
    tolerance = 1e-6
    if area_change > tolerance:
        msg = f"Area change: {area_change}"
        raise ValueError(msg)

    return cast(shapely.Polygon, out_bbox.iloc[0])

cli

cdrun() -> None

Entry point for running climate downscale workflows.

Source code in src/climate_data/cli.py
6
7
8
@click.group()
def cdrun() -> None:
    """Entry point for running climate downscale workflows."""

cdtask() -> None

Entry point for running climate downscale tasks.

Source code in src/climate_data/cli.py
@click.group()
def cdtask() -> None:
    """Entry point for running climate downscale tasks."""

cli_options

Climate Data CLI Options

This module provides a set of CLI options for extracting climate data from the ERA5 and CMIP6 datasets. These options are used to specify the data to extract, such as the year, month, variable, and dataset. It also provides global variables representing the full space of valid values for these options.

with_agg_measure(*, allow_all: bool = False) -> Callable[[Callable[P, T]], Callable[P, T]]

Add aggregation measure option to a command.

Source code in src/climate_data/cli_options.py
def with_agg_measure[**P, T](
    *,
    allow_all: bool = False,
) -> Callable[[Callable[P, T]], Callable[P, T]]:
    """Add aggregation measure option to a command."""
    return with_choice(
        "agg-measure",
        allow_all=allow_all,
        choices=cdc.AGGREGATION_MEASURES,
        help="Climate measure to process.",
    )

with_agg_scenario(*, allow_all: bool = False) -> Callable[[Callable[P, T]], Callable[P, T]]

Add aggregation scenario option to a command.

Source code in src/climate_data/cli_options.py
def with_agg_scenario[**P, T](
    *,
    allow_all: bool = False,
) -> Callable[[Callable[P, T]], Callable[P, T]]:
    """Add aggregation scenario option to a command."""
    return with_choice(
        "agg-scenario",
        allow_all=allow_all,
        choices=cdc.AGGREGATION_SCENARIOS,
        help="Climate scenario to process.",
    )

with_agg_version() -> Callable[[Callable[P, T]], Callable[P, T]]

Add aggregation version option to a command.

Source code in src/climate_data/cli_options.py
def with_agg_version[**P, T]() -> Callable[[Callable[P, T]], Callable[P, T]]:
    """Add aggregation version option to a command."""
    return click.option(
        "--agg-version",
        help="Aggregation version to process.",
        required=True,
    )

with_block_key(*, allow_all: bool = False) -> Callable[[Callable[P, T]], Callable[P, T]]

Add block key option to a command.

Source code in src/climate_data/cli_options.py
def with_block_key[**P, T](
    *,
    allow_all: bool = False,
) -> Callable[[Callable[P, T]], Callable[P, T]]:
    """Add block key option to a command."""
    return with_choice(
        "block-key",
        allow_all=allow_all,
        choices=None,  # Will be populated at runtime
        help="Block key to process.",
    )

with_hierarchy(choices: Sequence[str] = cdc.HIERARCHY_MAP, *, allow_all: bool = False) -> Callable[[Callable[P, T]], Callable[P, T]]

Add hierarchy option to a command.

Source code in src/climate_data/cli_options.py
def with_hierarchy[**P, T](
    choices: Sequence[str] = cdc.HIERARCHY_MAP,
    *,
    allow_all: bool = False,
) -> Callable[[Callable[P, T]], Callable[P, T]]:
    """Add hierarchy option to a command."""
    return with_choice(
        "hierarchy",
        allow_all=allow_all,
        choices=choices,
        help="Hierarchy to process.",
        convert=allow_all,
    )

with_location_id() -> Callable[[Callable[P, T]], Callable[P, T]]

Add location ID option to a command.

Source code in src/climate_data/cli_options.py
def with_location_id[**P, T]() -> Callable[[Callable[P, T]], Callable[P, T]]:
    """Add location ID option to a command."""
    return click.option(
        "--location-id",
        "-l",
        type=click.INT,
        help="Location ID to process.",
    )

with_year(years: Collection[str], *, allow_all: bool = False) -> Callable[[Callable[P, T]], Callable[P, T]]

Create a CLI option for selecting a year.

Source code in src/climate_data/cli_options.py
def with_year[**P, T](
    years: Collection[str],
    *,
    allow_all: bool = False,
) -> Callable[[Callable[P, T]], Callable[P, T]]:
    """Create a CLI option for selecting a year."""
    return with_choice(
        "year",
        "y",
        allow_all=allow_all,
        choices=years,
        help="Year to extract data for.",
        convert=allow_all,
    )

data

Climate Data Management

This module provides a class for managing the climate data used in the project. It includes methods for loading and saving data, as well as for accessing the various directories where data is stored. This abstraction allows for easy access to the data and ensures that all data is stored in a consistent and organized manner. It also provides a central location for managing the data, which makes it easier to update and maintain the path structure of the data as needed.

This module generally does not load or process data itself, though some exceptions are made for metadata which is generally loaded and cached on disk.

The main classes are: - PopulationModelData: Handles data from the gridded population modeling pipeline. This includes population estimates and projections as well as the location hierarchies for the population data. This class provides read-only access to the data. - ClimateData: Handles gridded climate data from the climate downscaling pipeline. This includes climate data for different scenarios and measures. This class provides read and write access to the data. - ClimateAggregateData: Handles the output data structure for climate aggregates. This includes raw results at the block level, final results at the measure level, and versioned results for different pipeline versions. This class provides both read and write access to the data.

ClimateAggregateData

Manages the output data structure for climate aggregates.

This class manages the file organization and paths for: 1. Reading and writing raw results at block level 2. Reading and writing final results at measure and scenario level 3. Versioning of results

Source code in src/climate_data/data.py
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
class ClimateAggregateData:
    """Manages the output data structure for climate aggregates.

    This class manages the file organization and paths for:
    1. Reading and writing raw results at block level
    2. Reading and writing final results at measure and scenario level
    3. Versioning of results
    """

    def __init__(
        self,
        root: str | Path = cdc.AGGREGATE_ROOT,
    ) -> None:
        """Initialize the climate aggregate data manager.

        Parameters
        ----------
        root
            Path to the model root directory
        """
        self._root = Path(root)
        self._create_model_root()

    def _create_model_root(self) -> None:
        """Create the model root directory and logs directory."""
        mkdir(self.root, exist_ok=True)
        mkdir(self.logs, exist_ok=True)

    @property
    def root(self) -> Path:
        """Get the root directory for model data."""
        return self._root

    @property
    def logs(self) -> Path:
        """Get the directory for log files."""
        return self.root / "logs"

    def log_dir(self, step_name: str) -> Path:
        """Get the directory for logs from a specific pipeline step.

        Parameters
        ----------
        step_name
            The name of the pipeline step

        Returns
        -------
        Path
            The directory for step-specific logs
        """
        return self.logs / step_name

    def version_root(self, version: str) -> Path:
        """Get the root directory for a specific version.

        Parameters
        ----------
        version
            The version identifier

        Returns
        -------
        Path
            The directory for version-specific data
        """
        return self.root / version

    def raw_results_root(self, version: str) -> Path:
        """Get the directory for raw results (block-level).

        Parameters
        ----------
        version
            The version identifier

        Returns
        -------
        Path
            The directory for raw results
        """
        return self.version_root(version) / "raw-results"

    def raw_results_path(
        self, version: str, hierarchy: str, block_key: str, draw: str
    ) -> Path:
        """Get the path to raw results for a specific hierarchy, block, and draw.

        Parameters
        ----------
        version
            The version identifier
        hierarchy
            The location hierarchy
        block_key
            The block key
        draw
            The draw of the climate data (e.g. "000")

        Returns
        -------
        Path
            The path to the raw results file
        """
        root = self.raw_results_root(version)
        return root / hierarchy / block_key / f"{draw}.parquet"

    def save_raw_results(
        self,
        df: pd.DataFrame,
        version: str,
        hierarchy: str,
        block_key: str,
        draw: str,
    ) -> None:
        """Save raw results for a specific hierarchy, block, and draw.

        Parameters
        ----------
        df
            The results to save
        version
            The version identifier
        hierarchy
            The location hierarchy
        block_key
            The block key
        draw
            The draw of the climate data to save (e.g. "000")
        """
        path = self.raw_results_path(version, hierarchy, block_key, draw)
        mkdir(path.parent, exist_ok=True, parents=True)
        save_parquet(df, path)

    def load_raw_results(
        self,
        version: str,
        hierarchy: str,
        block_key: str,
        draw: str,
        measure: str | None = None,
        scenario: str | None = None,
    ) -> pd.DataFrame:
        """Load raw results for a specific hierarchy, block, and draw.

        Parameters
        ----------
        version
            The version identifier
        hierarchy
            The location hierarchy
        block_key
            The block key
        draw
            The draw of the climate data to load (e.g. "000")
        measure
            If provided, filter results to only include this measure
        scenario
            If provided, filter results to only include this scenario

        Returns
        -------
        pd.DataFrame
            The raw results
        """
        path = self.raw_results_path(version, hierarchy, block_key, draw)

        # Build filters for parquet's read_parquet function
        filters = []
        if measure is not None:
            filters.append(("measure", "==", measure))
        if scenario is not None:
            filters.append(("scenario", "==", scenario))

        return pd.read_parquet(path, filters=filters)

    def results_root(self, version: str) -> Path:
        """Get the directory for final results (measure-level).

        Parameters
        ----------
        version
            The version identifier

        Returns
        -------
        Path
            The directory for final results
        """
        return self.version_root(version) / "results"

    def population_path(self, version: str, hierarchy: str) -> Path:
        """Get the path to population data for a specific hierarchy.

        Parameters
        ----------
        version
            The version identifier
        hierarchy
            The location hierarchy

        Returns
        -------
        Path
            The path to the population data file
        """
        return self.results_root(version) / hierarchy / "population.parquet"

    def save_population(self, df: pd.DataFrame, version: str, hierarchy: str) -> None:
        """Save population data for a specific hierarchy.

        Parameters
        ----------
        df
            The population data to save
        version
            The version identifier
        hierarchy
            The location hierarchy
        """
        path = self.population_path(version, hierarchy)
        mkdir(path.parent, exist_ok=True, parents=True)
        save_parquet(df, path)

    def load_population(
        self, version: str, hierarchy: str, location_id: int | None = None
    ) -> pd.DataFrame:
        """Load population data for a specific hierarchy and optionally location.

        Parameters
        ----------
        version
            The version identifier
        hierarchy
            The location hierarchy
        location_id
            If provided, load only data for this location

        Returns
        -------
        pd.DataFrame
            The population data
        """
        path = self.population_path(version, hierarchy)
        if location_id is not None:
            filters = [("location_id", "==", location_id)]
            return pd.read_parquet(path, filters=filters)
        return pd.read_parquet(path)

    def results_path(
        self, version: str, hierarchy: str, scenario: str, measure: str
    ) -> Path:
        """Get the path to final results for a specific scenario and measure.

        Parameters
        ----------
        version
            The version identifier
        hierarchy
            The location hierarchy
        scenario
            The climate scenario
        measure
            The climate measure

        Returns
        -------
        Path
            The path to the results file
        """
        return self.results_root(version) / hierarchy / f"{measure}_{scenario}.parquet"

    def save_results(
        self,
        df: pd.DataFrame,
        version: str,
        hierarchy: str,
        scenario: str,
        measure: str,
    ) -> None:
        """Save final results for a specific scenario and measure.

        Parameters
        ----------
        df
            The results to save
        version
            The version identifier
        hierarchy
            The location hierarchy
        scenario
            The climate scenario
        measure
            The climate measure
        """
        path = self.results_path(version, hierarchy, scenario, measure)
        mkdir(path.parent, exist_ok=True, parents=True)
        save_parquet(df, path)

    def load_results(
        self,
        version: str,
        hierarchy: str,
        scenario: str,
        measure: str,
        location_id: int | None = None,
    ) -> pd.DataFrame:
        """Load final results for a specific scenario and measure.

        Parameters
        ----------
        version
            The version identifier
        hierarchy
            The location hierarchy
        scenario
            The climate scenario
        measure
            The climate measure
        location_id
            If provided, load only data for this location

        Returns
        -------
        pd.DataFrame
            The results
        """
        path = self.results_path(version, hierarchy, scenario, measure)
        if location_id is not None:
            filters = [("location_id", "==", location_id)]
            return pd.read_parquet(path, filters=filters)
        return pd.read_parquet(path)

    def diagnostics_root(self, version: str, hierarchy: str) -> Path:
        """Get the path to the diagnostics directory.

        Parameters
        ----------
        version
            The version identifier

        Returns
        -------
        Path
            The path to the diagnostics directory
        """
        return self.version_root(version) / "diagnostics" / hierarchy

    def grid_plots_pages_root(self, version: str, hierarchy: str) -> Path:
        return self.diagnostics_root(version, hierarchy) / "grid_plots_pages"

    def grid_plots_page_path(
        self, version: str, hierarchy: str, location_id: int
    ) -> Path:
        path = self.grid_plots_pages_root(version, hierarchy) / f"{location_id}.pdf"
        mkdir(path.parent, exist_ok=True, parents=True)
        return path

    def grid_plots_path(self, version: str, hierarchy: str) -> Path:
        path = self.diagnostics_root(version, hierarchy) / f"grid_plots_{hierarchy}.pdf"
        mkdir(path.parent, exist_ok=True, parents=True)
        return path

logs: Path property

Get the directory for log files.

root: Path property

Get the root directory for model data.

__init__(root: str | Path = cdc.AGGREGATE_ROOT) -> None

Initialize the climate aggregate data manager.

Parameters

root Path to the model root directory

Source code in src/climate_data/data.py
def __init__(
    self,
    root: str | Path = cdc.AGGREGATE_ROOT,
) -> None:
    """Initialize the climate aggregate data manager.

    Parameters
    ----------
    root
        Path to the model root directory
    """
    self._root = Path(root)
    self._create_model_root()

_create_model_root() -> None

Create the model root directory and logs directory.

Source code in src/climate_data/data.py
def _create_model_root(self) -> None:
    """Create the model root directory and logs directory."""
    mkdir(self.root, exist_ok=True)
    mkdir(self.logs, exist_ok=True)

diagnostics_root(version: str, hierarchy: str) -> Path

Get the path to the diagnostics directory.

Parameters

version The version identifier

Returns

Path The path to the diagnostics directory

Source code in src/climate_data/data.py
def diagnostics_root(self, version: str, hierarchy: str) -> Path:
    """Get the path to the diagnostics directory.

    Parameters
    ----------
    version
        The version identifier

    Returns
    -------
    Path
        The path to the diagnostics directory
    """
    return self.version_root(version) / "diagnostics" / hierarchy

load_population(version: str, hierarchy: str, location_id: int | None = None) -> pd.DataFrame

Load population data for a specific hierarchy and optionally location.

Parameters

version The version identifier hierarchy The location hierarchy location_id If provided, load only data for this location

Returns

pd.DataFrame The population data

Source code in src/climate_data/data.py
def load_population(
    self, version: str, hierarchy: str, location_id: int | None = None
) -> pd.DataFrame:
    """Load population data for a specific hierarchy and optionally location.

    Parameters
    ----------
    version
        The version identifier
    hierarchy
        The location hierarchy
    location_id
        If provided, load only data for this location

    Returns
    -------
    pd.DataFrame
        The population data
    """
    path = self.population_path(version, hierarchy)
    if location_id is not None:
        filters = [("location_id", "==", location_id)]
        return pd.read_parquet(path, filters=filters)
    return pd.read_parquet(path)

load_raw_results(version: str, hierarchy: str, block_key: str, draw: str, measure: str | None = None, scenario: str | None = None) -> pd.DataFrame

Load raw results for a specific hierarchy, block, and draw.

Parameters

version The version identifier hierarchy The location hierarchy block_key The block key draw The draw of the climate data to load (e.g. "000") measure If provided, filter results to only include this measure scenario If provided, filter results to only include this scenario

Returns

pd.DataFrame The raw results

Source code in src/climate_data/data.py
def load_raw_results(
    self,
    version: str,
    hierarchy: str,
    block_key: str,
    draw: str,
    measure: str | None = None,
    scenario: str | None = None,
) -> pd.DataFrame:
    """Load raw results for a specific hierarchy, block, and draw.

    Parameters
    ----------
    version
        The version identifier
    hierarchy
        The location hierarchy
    block_key
        The block key
    draw
        The draw of the climate data to load (e.g. "000")
    measure
        If provided, filter results to only include this measure
    scenario
        If provided, filter results to only include this scenario

    Returns
    -------
    pd.DataFrame
        The raw results
    """
    path = self.raw_results_path(version, hierarchy, block_key, draw)

    # Build filters for parquet's read_parquet function
    filters = []
    if measure is not None:
        filters.append(("measure", "==", measure))
    if scenario is not None:
        filters.append(("scenario", "==", scenario))

    return pd.read_parquet(path, filters=filters)

load_results(version: str, hierarchy: str, scenario: str, measure: str, location_id: int | None = None) -> pd.DataFrame

Load final results for a specific scenario and measure.

Parameters

version The version identifier hierarchy The location hierarchy scenario The climate scenario measure The climate measure location_id If provided, load only data for this location

Returns

pd.DataFrame The results

Source code in src/climate_data/data.py
def load_results(
    self,
    version: str,
    hierarchy: str,
    scenario: str,
    measure: str,
    location_id: int | None = None,
) -> pd.DataFrame:
    """Load final results for a specific scenario and measure.

    Parameters
    ----------
    version
        The version identifier
    hierarchy
        The location hierarchy
    scenario
        The climate scenario
    measure
        The climate measure
    location_id
        If provided, load only data for this location

    Returns
    -------
    pd.DataFrame
        The results
    """
    path = self.results_path(version, hierarchy, scenario, measure)
    if location_id is not None:
        filters = [("location_id", "==", location_id)]
        return pd.read_parquet(path, filters=filters)
    return pd.read_parquet(path)

log_dir(step_name: str) -> Path

Get the directory for logs from a specific pipeline step.

Parameters

step_name The name of the pipeline step

Returns

Path The directory for step-specific logs

Source code in src/climate_data/data.py
def log_dir(self, step_name: str) -> Path:
    """Get the directory for logs from a specific pipeline step.

    Parameters
    ----------
    step_name
        The name of the pipeline step

    Returns
    -------
    Path
        The directory for step-specific logs
    """
    return self.logs / step_name

population_path(version: str, hierarchy: str) -> Path

Get the path to population data for a specific hierarchy.

Parameters

version The version identifier hierarchy The location hierarchy

Returns

Path The path to the population data file

Source code in src/climate_data/data.py
def population_path(self, version: str, hierarchy: str) -> Path:
    """Get the path to population data for a specific hierarchy.

    Parameters
    ----------
    version
        The version identifier
    hierarchy
        The location hierarchy

    Returns
    -------
    Path
        The path to the population data file
    """
    return self.results_root(version) / hierarchy / "population.parquet"

raw_results_path(version: str, hierarchy: str, block_key: str, draw: str) -> Path

Get the path to raw results for a specific hierarchy, block, and draw.

Parameters

version The version identifier hierarchy The location hierarchy block_key The block key draw The draw of the climate data (e.g. "000")

Returns

Path The path to the raw results file

Source code in src/climate_data/data.py
def raw_results_path(
    self, version: str, hierarchy: str, block_key: str, draw: str
) -> Path:
    """Get the path to raw results for a specific hierarchy, block, and draw.

    Parameters
    ----------
    version
        The version identifier
    hierarchy
        The location hierarchy
    block_key
        The block key
    draw
        The draw of the climate data (e.g. "000")

    Returns
    -------
    Path
        The path to the raw results file
    """
    root = self.raw_results_root(version)
    return root / hierarchy / block_key / f"{draw}.parquet"

raw_results_root(version: str) -> Path

Get the directory for raw results (block-level).

Parameters

version The version identifier

Returns

Path The directory for raw results

Source code in src/climate_data/data.py
def raw_results_root(self, version: str) -> Path:
    """Get the directory for raw results (block-level).

    Parameters
    ----------
    version
        The version identifier

    Returns
    -------
    Path
        The directory for raw results
    """
    return self.version_root(version) / "raw-results"

results_path(version: str, hierarchy: str, scenario: str, measure: str) -> Path

Get the path to final results for a specific scenario and measure.

Parameters

version The version identifier hierarchy The location hierarchy scenario The climate scenario measure The climate measure

Returns

Path The path to the results file

Source code in src/climate_data/data.py
def results_path(
    self, version: str, hierarchy: str, scenario: str, measure: str
) -> Path:
    """Get the path to final results for a specific scenario and measure.

    Parameters
    ----------
    version
        The version identifier
    hierarchy
        The location hierarchy
    scenario
        The climate scenario
    measure
        The climate measure

    Returns
    -------
    Path
        The path to the results file
    """
    return self.results_root(version) / hierarchy / f"{measure}_{scenario}.parquet"

results_root(version: str) -> Path

Get the directory for final results (measure-level).

Parameters

version The version identifier

Returns

Path The directory for final results

Source code in src/climate_data/data.py
def results_root(self, version: str) -> Path:
    """Get the directory for final results (measure-level).

    Parameters
    ----------
    version
        The version identifier

    Returns
    -------
    Path
        The directory for final results
    """
    return self.version_root(version) / "results"

save_population(df: pd.DataFrame, version: str, hierarchy: str) -> None

Save population data for a specific hierarchy.

Parameters

df The population data to save version The version identifier hierarchy The location hierarchy

Source code in src/climate_data/data.py
def save_population(self, df: pd.DataFrame, version: str, hierarchy: str) -> None:
    """Save population data for a specific hierarchy.

    Parameters
    ----------
    df
        The population data to save
    version
        The version identifier
    hierarchy
        The location hierarchy
    """
    path = self.population_path(version, hierarchy)
    mkdir(path.parent, exist_ok=True, parents=True)
    save_parquet(df, path)

save_raw_results(df: pd.DataFrame, version: str, hierarchy: str, block_key: str, draw: str) -> None

Save raw results for a specific hierarchy, block, and draw.

Parameters

df The results to save version The version identifier hierarchy The location hierarchy block_key The block key draw The draw of the climate data to save (e.g. "000")

Source code in src/climate_data/data.py
def save_raw_results(
    self,
    df: pd.DataFrame,
    version: str,
    hierarchy: str,
    block_key: str,
    draw: str,
) -> None:
    """Save raw results for a specific hierarchy, block, and draw.

    Parameters
    ----------
    df
        The results to save
    version
        The version identifier
    hierarchy
        The location hierarchy
    block_key
        The block key
    draw
        The draw of the climate data to save (e.g. "000")
    """
    path = self.raw_results_path(version, hierarchy, block_key, draw)
    mkdir(path.parent, exist_ok=True, parents=True)
    save_parquet(df, path)

save_results(df: pd.DataFrame, version: str, hierarchy: str, scenario: str, measure: str) -> None

Save final results for a specific scenario and measure.

Parameters

df The results to save version The version identifier hierarchy The location hierarchy scenario The climate scenario measure The climate measure

Source code in src/climate_data/data.py
def save_results(
    self,
    df: pd.DataFrame,
    version: str,
    hierarchy: str,
    scenario: str,
    measure: str,
) -> None:
    """Save final results for a specific scenario and measure.

    Parameters
    ----------
    df
        The results to save
    version
        The version identifier
    hierarchy
        The location hierarchy
    scenario
        The climate scenario
    measure
        The climate measure
    """
    path = self.results_path(version, hierarchy, scenario, measure)
    mkdir(path.parent, exist_ok=True, parents=True)
    save_parquet(df, path)

version_root(version: str) -> Path

Get the root directory for a specific version.

Parameters

version The version identifier

Returns

Path The directory for version-specific data

Source code in src/climate_data/data.py
def version_root(self, version: str) -> Path:
    """Get the root directory for a specific version.

    Parameters
    ----------
    version
        The version identifier

    Returns
    -------
    Path
        The directory for version-specific data
    """
    return self.root / version

ClimateData

Class for managing the climate data used in the project.

Source code in src/climate_data/data.py
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
class ClimateData:
    """Class for managing the climate data used in the project."""

    def __init__(
        self,
        root: str | Path = cdc.MODEL_ROOT,
        *,
        read_only: bool = False,
    ) -> None:
        self._root = Path(root)
        self._credentials_root = self._root / "credentials"
        self._read_only = read_only
        if not read_only:
            self._create_model_root()

    def _create_model_root(self) -> None:
        mkdir(self.root, exist_ok=True)
        mkdir(self.credentials_root, exist_ok=True)

        mkdir(self.extracted_data, exist_ok=True)
        mkdir(self.extracted_era5, exist_ok=True)
        mkdir(self.extracted_cmip6, exist_ok=True)
        mkdir(self.ncei_climate_stations, exist_ok=True)
        mkdir(self.open_topography_elevation, exist_ok=True)
        mkdir(self.rub_local_climate_zones, exist_ok=True)

        mkdir(self.downscale_model, exist_ok=True)
        mkdir(self.predictors, exist_ok=True)
        mkdir(self.training_data, exist_ok=True)

        mkdir(self.results, exist_ok=True)
        mkdir(self.results_metadata, exist_ok=True)
        mkdir(self.daily_results, exist_ok=True)
        mkdir(self.raw_daily_results, exist_ok=True)
        mkdir(self.annual_results, exist_ok=True)
        mkdir(self.raw_annual_results, exist_ok=True)

    @property
    def root(self) -> Path:
        return self._root

    @property
    def credentials_root(self) -> Path:
        return self._credentials_root

    ##################
    # Extracted data #
    ##################

    @property
    def extracted_data(self) -> Path:
        return self.root / "extracted_data"

    @property
    def extracted_era5(self) -> Path:
        return self.extracted_data / "era5"

    def extracted_era5_path(
        self, dataset: str, variable: str, year: int | str, month: str
    ) -> Path:
        return self.extracted_era5 / f"{dataset}_{variable}_{year}_{month}.nc"

    @property
    def extracted_cmip6(self) -> Path:
        return self.extracted_data / "cmip6"

    def load_koppen_geiger_model_inclusion(
        self, *, return_full_criteria: bool = False
    ) -> pd.DataFrame:
        meta_path = self.extracted_cmip6 / "koppen_geiger_model_inclusion.parquet"

        if not meta_path.exists():
            df = pd.read_html(
                "https://www.nature.com/articles/s41597-023-02549-6/tables/3"
            )[0]
            df.columns = [  # type: ignore[assignment]
                "source_id",
                "member_count",
                "mean_trend",
                "std_dev_trend",
                "transient_climate_response",
                "equilibrium_climate_sensitivity",
                "included_raw",
            ]
            df["included"] = df["included_raw"].apply({"Yes": True, "No": False}.get)
            save_parquet(df, meta_path)

        df = pd.read_parquet(meta_path)
        if return_full_criteria:
            return df
        return df[["source_id", "included"]]

    def load_cmip6_metadata(self) -> pd.DataFrame:
        meta_path = self.extracted_cmip6 / "cmip6-metadata.parquet"

        if not meta_path.exists():
            external_path = "https://storage.googleapis.com/cmip6/cmip6-zarr-consolidated-stores.csv"
            meta = pd.read_csv(external_path)
            save_parquet(meta, meta_path)

        return pd.read_parquet(meta_path)

    def extracted_cmip6_path(
        self,
        variable: str,
        experiment: str,
        gcm_member: str,
    ) -> Path:
        return self.extracted_cmip6 / f"{variable}_{experiment}_{gcm_member}.nc"

    def get_gcms(
        self,
        source_variables: Collection[str],
    ) -> list[str]:
        inclusion_meta = self.load_scenario_inclusion_metadata()[source_variables]
        inclusion_meta = inclusion_meta[inclusion_meta.all(axis=1)]
        return [
            f"{model}_{variant}" for model, variant in inclusion_meta.index.tolist()
        ]

    @property
    def ncei_climate_stations(self) -> Path:
        return self.extracted_data / "ncei_climate_stations"

    def save_ncei_climate_stations(self, df: pd.DataFrame, year: int | str) -> None:
        if self._read_only:
            msg = "Cannot save NCEI climate stations to read-only data"
            raise ValueError(msg)
        path = self.ncei_climate_stations / f"{year}.parquet"
        save_parquet(df, path)

    def load_ncei_climate_stations(self, year: int | str) -> pd.DataFrame:
        return pd.read_parquet(self.ncei_climate_stations / f"{year}.parquet")

    @property
    def open_topography_elevation(self) -> Path:
        return self.extracted_data / "open_topography_elevation"

    @property
    def rub_local_climate_zones(self) -> Path:
        return self.extracted_data / "rub_local_climate_zones"

    ###################
    # Downscale model #
    ###################

    @property
    def downscale_model(self) -> Path:
        return self.root / "downscale_model"

    @property
    def predictors(self) -> Path:
        return self.downscale_model / "predictors"

    def save_predictor(
        self,
        predictor: rt.RasterArray,
        name: str,
        lat_start: int,
        lon_start: int,
    ) -> None:
        if self._read_only:
            msg = "Cannot save predictors to read-only data"
            raise ValueError(msg)
        path = self.predictors / f"{name}_{lat_start}_{lon_start}.tif"
        save_raster(predictor, path)

    def load_predictor(self, name: str) -> rt.RasterArray:
        paths = list(self.predictors.glob(f"{name}_*.tif"))
        return rt.load_mf_raster(paths)

    @property
    def training_data(self) -> Path:
        return self.downscale_model / "training_data"

    def save_training_data(self, df: pd.DataFrame, year: int | str) -> None:
        if self._read_only:
            msg = "Cannot save training data to read-only data"
            raise ValueError(msg)
        path = self.training_data / f"{year}.parquet"
        save_parquet(df, path)

    def load_training_data(self, year: int | str) -> pd.DataFrame:
        return pd.read_parquet(self.training_data / f"{year}.parquet")

    ###########
    # Results #
    ###########

    @property
    def results(self) -> Path:
        return self.root / "results"

    @property
    def results_metadata(self) -> Path:
        return self.results / "metadata"

    def save_scenario_metadata(self, df: pd.DataFrame) -> None:
        if self._read_only:
            msg = "Cannot save scenario metadata to read-only data"
            raise ValueError(msg)
        path = self.results_metadata / "scenario_metadata.parquet"
        save_parquet(df, path)

    def load_scenario_metadata(self) -> pd.DataFrame:
        path = self.results_metadata / "scenario_metadata.parquet"
        return pd.read_parquet(path)

    def save_scenario_inclusion_metadata(self, df: pd.DataFrame) -> None:
        if self._read_only:
            msg = "Cannot save scenario inclusion metadata to read-only data"
            raise ValueError(msg)
        # Need to save to our scripts directory for doc building
        scripts_root = Path(__file__).parent.parent.parent / "scripts"
        for root_dir in [self.results_metadata, scripts_root]:
            path = root_dir / "scenario_inclusion_metadata.parquet"
            save_parquet(df, path)

    def load_scenario_inclusion_metadata(self) -> pd.DataFrame:
        path = self.results_metadata / "scenario_inclusion_metadata.parquet"
        return pd.read_parquet(path)

    @property
    def daily_results(self) -> Path:
        return self.results / "daily"

    @property
    def raw_daily_results(self) -> Path:
        # return self.daily_results / "raw"
        return cdc.AGGREGATE_ROOT / "erf-scratch"

    def raw_daily_results_path(
        self,
        scenario: str,
        variable: str,
        year: int | str,
        gcm_member: str,
    ) -> Path:
        return self.raw_daily_results / scenario / variable / f"{year}_{gcm_member}.nc"

    def save_raw_daily_results(
        self,
        results_ds: xr.Dataset,
        scenario: str,
        variable: str,
        year: int | str,
        gcm_member: str,
        encoding_kwargs: dict[str, Any],
    ) -> None:
        if self._read_only:
            msg = "Cannot save raw daily results to read-only data"
            raise ValueError(msg)
        path = self.raw_daily_results_path(scenario, variable, year, gcm_member)
        mkdir(path.parent, exist_ok=True, parents=True)
        save_xarray(results_ds, path, encoding_kwargs)

    def load_raw_daily_results(
        self,
        scenario: str,
        variable: str,
        year: int | str,
        gcm_member: str,
    ) -> xr.Dataset:
        path = self.raw_daily_results_path(scenario, variable, year, gcm_member)
        return xr.open_dataset(path)

    def daily_results_path(
        self,
        scenario: str,
        variable: str,
        year: int | str,
    ) -> Path:
        return self.daily_results / scenario / variable / f"{year}.nc"

    def save_daily_results(
        self,
        results_ds: xr.Dataset,
        scenario: str,
        variable: str,
        year: int | str,
        encoding_kwargs: dict[str, Any],
    ) -> None:
        if self._read_only:
            msg = "Cannot save daily results to read-only data"
            raise ValueError(msg)
        path = self.daily_results_path(scenario, variable, year)
        mkdir(path.parent, exist_ok=True, parents=True)
        save_xarray(results_ds, path, encoding_kwargs)

    def load_daily_results(
        self,
        scenario: str,
        variable: str,
        year: int | str,
    ) -> xr.Dataset:
        results_path = self.daily_results_path(scenario, variable, year)
        return xr.open_dataset(results_path)

    @property
    def annual_results(self) -> Path:
        return self.results / "annual"

    @property
    def raw_annual_results(self) -> Path:
        return self.annual_results / "raw"

    def raw_annual_results_path(
        self,
        scenario: str,
        variable: str,
        year: int | str,
        gcm_member: str,
    ) -> Path:
        return self.raw_annual_results / scenario / variable / f"{year}_{gcm_member}.nc"

    def save_raw_annual_results(
        self,
        results_ds: xr.Dataset,
        scenario: str,
        variable: str,
        year: int | str,
        gcm_member: str,
        encoding_kwargs: dict[str, Any],
    ) -> None:
        if self._read_only:
            msg = "Cannot save raw annual results to read-only data"
            raise ValueError(msg)
        path = self.raw_annual_results_path(scenario, variable, year, gcm_member)
        mkdir(path.parent, exist_ok=True, parents=True)
        save_xarray(results_ds, path, encoding_kwargs)

    @property
    def compiled_annual_results(self) -> Path:
        return self.raw_annual_results / "compiled"

    def compiled_annual_results_path(
        self,
        scenario: str,
        variable: str,
        gcm_member: str,
    ) -> Path:
        return self.compiled_annual_results / scenario / variable / f"{gcm_member}.nc"

    def list_gcm_members(self, scenario: str, variable: str) -> list[str]:
        return [
            p.stem
            for p in self.compiled_annual_results_path(
                scenario, variable, ""
            ).parent.glob("*.nc")
        ]

    def save_compiled_annual_results(
        self,
        results_ds: xr.Dataset,
        scenario: str,
        variable: str,
        gcm_member: str,
        encoding_kwargs: dict[str, Any],
    ) -> None:
        if self._read_only:
            msg = "Cannot save compiled annual results to read-only data"
            raise ValueError(msg)
        path = self.compiled_annual_results_path(scenario, variable, gcm_member)
        mkdir(path.parent, exist_ok=True, parents=True)
        save_xarray(results_ds, path, encoding_kwargs)

    def load_compiled_annual_results(
        self,
        scenario: str,
        variable: str,
        gcm_member: str,
    ) -> xr.Dataset:
        path = self.compiled_annual_results_path(scenario, variable, gcm_member)
        return xr.open_dataset(path)

    def annual_results_path(
        self,
        scenario: str,
        variable: str,
        draw: int | str,
    ) -> Path:
        return self.annual_results / scenario / variable / f"{draw:0>3}.nc"

    def link_annual_draw(
        self,
        draw: int | str,
        scenario: str,
        variable: str,
        gcm_member: str,
    ) -> None:
        if self._read_only:
            msg = "Cannot link annual draw to read-only data"
            raise ValueError(msg)
        source_path = self.compiled_annual_results_path(scenario, variable, gcm_member)
        dest_path = self.annual_results_path(scenario, variable, draw)
        mkdir(dest_path.parent, exist_ok=True, parents=True)
        if dest_path.exists():
            dest_path.unlink()
        dest_path.symlink_to(source_path)

    def draw_results_path(self, scenario: str, measure: str, draw: str) -> Path:
        """Get the path to annual results for a specific scenario, measure, and draw.

        Parameters
        ----------
        scenario
            The climate scenario (e.g. "ssp126")
        measure
            The climate measure (e.g. "mean_temperature")
        draw
            The draw of the climate data to load (e.g. "000")

        Returns
        -------
        Path
            The path to the results file
        """
        return self.annual_results / scenario / measure / f"{draw}.nc"

    def load_draw_results(self, scenario: str, measure: str, draw: str) -> xr.Dataset:
        """Load annual climate results for a specific scenario, measure, and draw.

        Parameters
        ----------
        scenario
            The climate scenario (e.g. "ssp126")
        measure
            The climate measure (e.g. "mean_temperature")
        draw
            The draw of the climate data to load (e.g. "000")

        Returns
        -------
        xr.Dataset
            The climate data in xarray format
        """
        path = self.annual_results_path(scenario, measure, draw)
        ds = xr.open_dataset(path, decode_coords="all")
        ds = ds.rio.write_crs("EPSG:4326")
        return ds

draw_results_path(scenario: str, measure: str, draw: str) -> Path

Get the path to annual results for a specific scenario, measure, and draw.

Parameters

scenario The climate scenario (e.g. "ssp126") measure The climate measure (e.g. "mean_temperature") draw The draw of the climate data to load (e.g. "000")

Returns

Path The path to the results file

Source code in src/climate_data/data.py
def draw_results_path(self, scenario: str, measure: str, draw: str) -> Path:
    """Get the path to annual results for a specific scenario, measure, and draw.

    Parameters
    ----------
    scenario
        The climate scenario (e.g. "ssp126")
    measure
        The climate measure (e.g. "mean_temperature")
    draw
        The draw of the climate data to load (e.g. "000")

    Returns
    -------
    Path
        The path to the results file
    """
    return self.annual_results / scenario / measure / f"{draw}.nc"

load_draw_results(scenario: str, measure: str, draw: str) -> xr.Dataset

Load annual climate results for a specific scenario, measure, and draw.

Parameters

scenario The climate scenario (e.g. "ssp126") measure The climate measure (e.g. "mean_temperature") draw The draw of the climate data to load (e.g. "000")

Returns

xr.Dataset The climate data in xarray format

Source code in src/climate_data/data.py
def load_draw_results(self, scenario: str, measure: str, draw: str) -> xr.Dataset:
    """Load annual climate results for a specific scenario, measure, and draw.

    Parameters
    ----------
    scenario
        The climate scenario (e.g. "ssp126")
    measure
        The climate measure (e.g. "mean_temperature")
    draw
        The draw of the climate data to load (e.g. "000")

    Returns
    -------
    xr.Dataset
        The climate data in xarray format
    """
    path = self.annual_results_path(scenario, measure, draw)
    ds = xr.open_dataset(path, decode_coords="all")
    ds = ds.rio.write_crs("EPSG:4326")
    return ds

PopulationModelData

Handles population data and location hierarchies.

This class manages: 1. Population projections at different time points 2. Location hierarchies (GBD, LSAE, etc.) 3. Spatial data for aggregation

The population data is used as weights when aggregating climate data to different location hierarchies.

Source code in src/climate_data/data.py
class PopulationModelData:
    """Handles population data and location hierarchies.

    This class manages:
    1. Population projections at different time points
    2. Location hierarchies (GBD, LSAE, etc.)
    3. Spatial data for aggregation

    The population data is used as weights when aggregating climate data
    to different location hierarchies.
    """

    def __init__(
        self,
        root: str | Path = cdc.POPULATION_MODEL_ROOT,
    ) -> None:
        """Initialize the population model data manager.

        Parameters
        ----------
        root : str | Path
            Path to the population model root directory
        """
        self._root = Path(root)

    @property
    def root(self) -> Path:
        """Get the root directory for population model data."""
        return self._root

    @property
    def results(self) -> Path:
        """Get the directory containing current model results."""
        return Path(self.root, "results") / "current"

    @property
    def model_spec_path(self) -> Path:
        """Get the path to the model specification file."""
        return self.results / "specification.yaml"

    def load_model_spec(self) -> dict[str, Any]:
        """Load the model specification file.

        Returns
        -------
        dict
            The model specification containing paths and parameters
        """
        return cast(dict[str, Any], yaml.safe_load(self.model_spec_path.read_text()))

    def load_modeling_frame(self) -> gpd.GeoDataFrame:
        """Load the modeling frame containing spatial information.

        The modeling frame is a subdivision of the world into equal-area blocks.
        Each block is assigned a unique key that is used to parallelize
        pipeline steps in both population modeling and in this pipeline's
        aggregation step.

        Returns
        -------
        gpd.GeoDataFrame
            The modeling frame with spatial information and block keys
        """
        model_spec = self.load_model_spec()
        raw_root = Path(model_spec["output_root"])
        model_frame_path = raw_root.parent.parent / "modeling_frame.parquet"
        return gpd.read_parquet(model_frame_path)

    def load_results(self, time_point: str, block_key: str) -> rt.RasterArray:
        """Load population results for a specific time point and block.

        Parameters
        ----------
        time_point
            The time point to load (e.g. "2020q1")
        block_key
            The block key to load (e.g. "B-0021X-0003Y")

        Returns
        -------
        rt.RasterArray
            The population raster data
        """
        model_spec = self.load_model_spec()
        raw_root = Path(model_spec["output_root"])
        path = raw_root / "raked_predictions" / time_point / f"{block_key}.tif"
        return rt.load_raster(path)

    @property
    def raking_data(self) -> Path:
        """Get the directory containing data used to rake the population estimates.

        Raking enforces admin-level consistency between gridded population data
        and GBD/FHS population estimates. We'll use these same hierarchies to
        aggregate the climate data.

        """
        return self.root / "admin-inputs" / "raking"

    def load_raking_shapes(
        self, full_aggregation_hierarchy: str, bounds: tuple[float, float, float, float]
    ) -> gpd.GeoDataFrame:
        """Load shapes for a full aggregation hierarchy within given bounds.

        Parameters
        ----------
        full_aggregation_hierarchy
            The full aggregation hierarchy to load (e.g. "gbd_2021")
        bounds
            The bounds to load (xmin, ymin, xmax, ymax)

        Returns
        -------
        gpd.GeoDataFrame
            The shapes for the given hierarchy and bounds
        """
        if full_aggregation_hierarchy == "gbd_2021":
            shape_path = (
                self.raking_data / f"shapes_{full_aggregation_hierarchy}.parquet"
            )
            gdf = gpd.read_parquet(shape_path, bbox=bounds)

            # We're using population data here instead of a hierarchy because
            # The populations include extra locations we've supplemented that aren't
            # modeled in GBD (e.g. locations with zero population or places that
            # GBD uses population scalars from WPP to model)
            pop_path = (
                self.raking_data / f"population_{full_aggregation_hierarchy}.parquet"
            )
            pop = pd.read_parquet(pop_path)

            keep_cols = ["location_id", "location_name", "most_detailed", "parent_id"]
            keep_mask = (
                (pop.year_id == pop.year_id.max())  # Year doesn't matter
                & (pop.most_detailed == 1)
            )
            out = gdf.merge(pop.loc[keep_mask, keep_cols], on="location_id", how="left")
        elif full_aggregation_hierarchy in ["lsae_1209", "lsae_1285"]:
            # This is only a2 geoms, so already most detailed
            shape_path = (
                self.raking_data
                / "gbd-inputs"
                / f"shapes_{full_aggregation_hierarchy}_a2.parquet"
            )
            out = gpd.read_parquet(shape_path, bbox=bounds)
        else:
            msg = f"Unknown pixel hierarchy: {full_aggregation_hierarchy}"
            raise ValueError(msg)
        return out

    def load_raking_populations(self, hierarchy: str) -> pd.DataFrame:
        path = self.raking_data / f"population_{hierarchy}.parquet"
        return pd.read_parquet(path)

    def load_lsae_mapping_shapes(self, admin_level: int) -> gpd.GeoDataFrame:
        """Load the LSAE mapping shapes for a given admin level.

        Parameters
        ----------
        admin_level
            The admin level to load (0, 1, or 2)

        Returns
        -------
        gpd.GeoDataFrame
            The LSAE mapping shapes for the given admin level
        """
        assert admin_level in [0, 1, 2]
        path = f"/home/j/WORK/11_geospatial/admin_shapefiles/current/lbd_standard_admin_{admin_level}_simplified.shp"
        gdf = (
            gpd.read_file(path)
            .rename(columns={"loc_id": "location_id"})
            .loc[:, ["location_id", "geometry"]]
        )
        return gdf

    def load_subset_hierarchy(self, subset_hierarchy: str) -> pd.DataFrame:
        """Load a subset location hierarchy.

        The subset hierarchy might be equal to the full aggregation hierarchy,
        but it might also be a subset of the full aggregation hierarchy.
        These hierarchies are used to provide different views of aggregated
        climate data.

        Parameters
        ----------
        subset_hierarchy
            The administrative hierarchy to load (e.g. "gbd_2021")

        Returns
        -------
        pd.DataFrame
            The hierarchy data with parent-child relationships
        """
        allowed_hierarchies = ["gbd_2021", "fhs_2021", "lsae_1209", "lsae_1285"]
        if subset_hierarchy not in allowed_hierarchies:
            msg = f"Unknown admin hierarchy: {subset_hierarchy}"
            raise ValueError(msg)
        path = self.raking_data / "gbd-inputs" / f"hierarchy_{subset_hierarchy}.parquet"
        hierarchy_df = pd.read_parquet(path)
        if subset_hierarchy == "gbd_2021":
            to_drop_parents = [
                ## FROM POPULATION MODEL RAKING DATA PREP
                # Drop UK UTLAs from these regions
                4618,
                4919,
                4620,
                4621,
                4622,
                4623,
                4624,
                4625,
                4626,
                # Drop the India urban/rural splits from these states
                4841,
                4842,
                4843,
                4844,
                4846,
                4849,
                4850,
                4851,
                4852,
                4853,
                4854,
                4855,
                4856,
                4857,
                4859,
                4860,
                4861,
                4862,
                4863,
                4864,
                4865,
                4867,
                4868,
                4869,
                4870,
                4871,
                4872,
                4873,
                4874,
                4875,
                44538,
                # Drop the Maori/non-Maori split from New Zealand
                72,
            ]
            hierarchy_df = hierarchy_df.loc[
                ~hierarchy_df["parent_id"].isin(to_drop_parents)
            ]
            hierarchy_df.loc[
                hierarchy_df["location_id"].isin(to_drop_parents), "most_detailed"
            ] = 1

        return hierarchy_df

model_spec_path: Path property

Get the path to the model specification file.

raking_data: Path property

Get the directory containing data used to rake the population estimates.

Raking enforces admin-level consistency between gridded population data and GBD/FHS population estimates. We'll use these same hierarchies to aggregate the climate data.

results: Path property

Get the directory containing current model results.

root: Path property

Get the root directory for population model data.

__init__(root: str | Path = cdc.POPULATION_MODEL_ROOT) -> None

Initialize the population model data manager.

Parameters

root : str | Path Path to the population model root directory

Source code in src/climate_data/data.py
def __init__(
    self,
    root: str | Path = cdc.POPULATION_MODEL_ROOT,
) -> None:
    """Initialize the population model data manager.

    Parameters
    ----------
    root : str | Path
        Path to the population model root directory
    """
    self._root = Path(root)

load_lsae_mapping_shapes(admin_level: int) -> gpd.GeoDataFrame

Load the LSAE mapping shapes for a given admin level.

Parameters

admin_level The admin level to load (0, 1, or 2)

Returns

gpd.GeoDataFrame The LSAE mapping shapes for the given admin level

Source code in src/climate_data/data.py
def load_lsae_mapping_shapes(self, admin_level: int) -> gpd.GeoDataFrame:
    """Load the LSAE mapping shapes for a given admin level.

    Parameters
    ----------
    admin_level
        The admin level to load (0, 1, or 2)

    Returns
    -------
    gpd.GeoDataFrame
        The LSAE mapping shapes for the given admin level
    """
    assert admin_level in [0, 1, 2]
    path = f"/home/j/WORK/11_geospatial/admin_shapefiles/current/lbd_standard_admin_{admin_level}_simplified.shp"
    gdf = (
        gpd.read_file(path)
        .rename(columns={"loc_id": "location_id"})
        .loc[:, ["location_id", "geometry"]]
    )
    return gdf

load_model_spec() -> dict[str, Any]

Load the model specification file.

Returns

dict The model specification containing paths and parameters

Source code in src/climate_data/data.py
def load_model_spec(self) -> dict[str, Any]:
    """Load the model specification file.

    Returns
    -------
    dict
        The model specification containing paths and parameters
    """
    return cast(dict[str, Any], yaml.safe_load(self.model_spec_path.read_text()))

load_modeling_frame() -> gpd.GeoDataFrame

Load the modeling frame containing spatial information.

The modeling frame is a subdivision of the world into equal-area blocks. Each block is assigned a unique key that is used to parallelize pipeline steps in both population modeling and in this pipeline's aggregation step.

Returns

gpd.GeoDataFrame The modeling frame with spatial information and block keys

Source code in src/climate_data/data.py
def load_modeling_frame(self) -> gpd.GeoDataFrame:
    """Load the modeling frame containing spatial information.

    The modeling frame is a subdivision of the world into equal-area blocks.
    Each block is assigned a unique key that is used to parallelize
    pipeline steps in both population modeling and in this pipeline's
    aggregation step.

    Returns
    -------
    gpd.GeoDataFrame
        The modeling frame with spatial information and block keys
    """
    model_spec = self.load_model_spec()
    raw_root = Path(model_spec["output_root"])
    model_frame_path = raw_root.parent.parent / "modeling_frame.parquet"
    return gpd.read_parquet(model_frame_path)

load_raking_shapes(full_aggregation_hierarchy: str, bounds: tuple[float, float, float, float]) -> gpd.GeoDataFrame

Load shapes for a full aggregation hierarchy within given bounds.

Parameters

full_aggregation_hierarchy The full aggregation hierarchy to load (e.g. "gbd_2021") bounds The bounds to load (xmin, ymin, xmax, ymax)

Returns

gpd.GeoDataFrame The shapes for the given hierarchy and bounds

Source code in src/climate_data/data.py
def load_raking_shapes(
    self, full_aggregation_hierarchy: str, bounds: tuple[float, float, float, float]
) -> gpd.GeoDataFrame:
    """Load shapes for a full aggregation hierarchy within given bounds.

    Parameters
    ----------
    full_aggregation_hierarchy
        The full aggregation hierarchy to load (e.g. "gbd_2021")
    bounds
        The bounds to load (xmin, ymin, xmax, ymax)

    Returns
    -------
    gpd.GeoDataFrame
        The shapes for the given hierarchy and bounds
    """
    if full_aggregation_hierarchy == "gbd_2021":
        shape_path = (
            self.raking_data / f"shapes_{full_aggregation_hierarchy}.parquet"
        )
        gdf = gpd.read_parquet(shape_path, bbox=bounds)

        # We're using population data here instead of a hierarchy because
        # The populations include extra locations we've supplemented that aren't
        # modeled in GBD (e.g. locations with zero population or places that
        # GBD uses population scalars from WPP to model)
        pop_path = (
            self.raking_data / f"population_{full_aggregation_hierarchy}.parquet"
        )
        pop = pd.read_parquet(pop_path)

        keep_cols = ["location_id", "location_name", "most_detailed", "parent_id"]
        keep_mask = (
            (pop.year_id == pop.year_id.max())  # Year doesn't matter
            & (pop.most_detailed == 1)
        )
        out = gdf.merge(pop.loc[keep_mask, keep_cols], on="location_id", how="left")
    elif full_aggregation_hierarchy in ["lsae_1209", "lsae_1285"]:
        # This is only a2 geoms, so already most detailed
        shape_path = (
            self.raking_data
            / "gbd-inputs"
            / f"shapes_{full_aggregation_hierarchy}_a2.parquet"
        )
        out = gpd.read_parquet(shape_path, bbox=bounds)
    else:
        msg = f"Unknown pixel hierarchy: {full_aggregation_hierarchy}"
        raise ValueError(msg)
    return out

load_results(time_point: str, block_key: str) -> rt.RasterArray

Load population results for a specific time point and block.

Parameters

time_point The time point to load (e.g. "2020q1") block_key The block key to load (e.g. "B-0021X-0003Y")

Returns

rt.RasterArray The population raster data

Source code in src/climate_data/data.py
def load_results(self, time_point: str, block_key: str) -> rt.RasterArray:
    """Load population results for a specific time point and block.

    Parameters
    ----------
    time_point
        The time point to load (e.g. "2020q1")
    block_key
        The block key to load (e.g. "B-0021X-0003Y")

    Returns
    -------
    rt.RasterArray
        The population raster data
    """
    model_spec = self.load_model_spec()
    raw_root = Path(model_spec["output_root"])
    path = raw_root / "raked_predictions" / time_point / f"{block_key}.tif"
    return rt.load_raster(path)

load_subset_hierarchy(subset_hierarchy: str) -> pd.DataFrame

Load a subset location hierarchy.

The subset hierarchy might be equal to the full aggregation hierarchy, but it might also be a subset of the full aggregation hierarchy. These hierarchies are used to provide different views of aggregated climate data.

Parameters

subset_hierarchy The administrative hierarchy to load (e.g. "gbd_2021")

Returns

pd.DataFrame The hierarchy data with parent-child relationships

Source code in src/climate_data/data.py
def load_subset_hierarchy(self, subset_hierarchy: str) -> pd.DataFrame:
    """Load a subset location hierarchy.

    The subset hierarchy might be equal to the full aggregation hierarchy,
    but it might also be a subset of the full aggregation hierarchy.
    These hierarchies are used to provide different views of aggregated
    climate data.

    Parameters
    ----------
    subset_hierarchy
        The administrative hierarchy to load (e.g. "gbd_2021")

    Returns
    -------
    pd.DataFrame
        The hierarchy data with parent-child relationships
    """
    allowed_hierarchies = ["gbd_2021", "fhs_2021", "lsae_1209", "lsae_1285"]
    if subset_hierarchy not in allowed_hierarchies:
        msg = f"Unknown admin hierarchy: {subset_hierarchy}"
        raise ValueError(msg)
    path = self.raking_data / "gbd-inputs" / f"hierarchy_{subset_hierarchy}.parquet"
    hierarchy_df = pd.read_parquet(path)
    if subset_hierarchy == "gbd_2021":
        to_drop_parents = [
            ## FROM POPULATION MODEL RAKING DATA PREP
            # Drop UK UTLAs from these regions
            4618,
            4919,
            4620,
            4621,
            4622,
            4623,
            4624,
            4625,
            4626,
            # Drop the India urban/rural splits from these states
            4841,
            4842,
            4843,
            4844,
            4846,
            4849,
            4850,
            4851,
            4852,
            4853,
            4854,
            4855,
            4856,
            4857,
            4859,
            4860,
            4861,
            4862,
            4863,
            4864,
            4865,
            4867,
            4868,
            4869,
            4870,
            4871,
            4872,
            4873,
            4874,
            4875,
            44538,
            # Drop the Maori/non-Maori split from New Zealand
            72,
        ]
        hierarchy_df = hierarchy_df.loc[
            ~hierarchy_df["parent_id"].isin(to_drop_parents)
        ]
        hierarchy_df.loc[
            hierarchy_df["location_id"].isin(to_drop_parents), "most_detailed"
        ] = 1

    return hierarchy_df

save_parquet(df: pd.DataFrame, output_path: str | Path) -> None

Save a pandas DataFrame to a file with standard parameters.

Parameters

df The DataFrame to save. output_path The path to save the DataFrame to.

Source code in src/climate_data/data.py
def save_parquet(
    df: pd.DataFrame,
    output_path: str | Path,
) -> None:
    """Save a pandas DataFrame to a file with standard parameters.

    Parameters
    ----------
    df
        The DataFrame to save.
    output_path
        The path to save the DataFrame to.
    """
    touch(output_path, clobber=True)
    df.to_parquet(output_path)

save_raster(raster: rt.RasterArray, output_path: str | Path, num_cores: int = 1, **kwargs: Any) -> None

Save a raster to a file with standard parameters.

Parameters

raster The raster to save. output_path The path to save the raster to. num_cores The number of cores to use for compression.

Source code in src/climate_data/data.py
def save_raster(
    raster: rt.RasterArray,
    output_path: str | Path,
    num_cores: int = 1,
    **kwargs: Any,
) -> None:
    """Save a raster to a file with standard parameters.

    Parameters
    ----------
    raster
        The raster to save.
    output_path
        The path to save the raster to.
    num_cores
        The number of cores to use for compression.
    """
    save_params = {
        "tiled": True,
        "blockxsize": 512,
        "blockysize": 512,
        "compress": "ZSTD",
        "predictor": 2,  # horizontal differencing
        "num_threads": num_cores,
        "bigtiff": "yes",
        **kwargs,
    }
    touch(output_path, clobber=True)
    raster.to_file(output_path, **save_params)

save_raster_to_cog(raster: rt.RasterArray, output_path: str | Path, num_cores: int = 1, resampling: str = 'nearest') -> None

Save a raster to a COG file.

A COG file is a cloud-optimized GeoTIFF that is optimized for use in cloud storage systems. This function saves the raster to a COG file with the specified resampling method.

Parameters

raster The raster to save. output_path The path to save the raster to. num_cores The number of cores to use for compression. resampling The resampling method to use when building the overviews.

Source code in src/climate_data/data.py
def save_raster_to_cog(
    raster: rt.RasterArray,
    output_path: str | Path,
    num_cores: int = 1,
    resampling: str = "nearest",
) -> None:
    """Save a raster to a COG file.

    A COG file is a cloud-optimized GeoTIFF that is optimized for use in cloud storage
    systems. This function saves the raster to a COG file with the specified resampling
    method.

    Parameters
    ----------
    raster
        The raster to save.
    output_path
        The path to save the raster to.
    num_cores
        The number of cores to use for compression.
    resampling
        The resampling method to use when building the overviews.
    """
    cog_save_params = {
        "driver": "COG",
        "overview_resampling": resampling,
    }
    save_raster(raster, output_path, num_cores, **cog_save_params)

save_xarray(ds: xr.Dataset, output_path: str | Path, encoding_kwargs: dict[str, Any]) -> None

Save an xarray dataset to a file with standard parameters.

Parameters

ds The dataset to save. output_path The path to save the dataset to. encoding_kwargs The encoding parameters to use when saving the dataset.

Source code in src/climate_data/data.py
def save_xarray(
    ds: xr.Dataset,
    output_path: str | Path,
    encoding_kwargs: dict[str, Any],
) -> None:
    """Save an xarray dataset to a file with standard parameters.

    Parameters
    ----------
    ds
        The dataset to save.
    output_path
        The path to save the dataset to.
    encoding_kwargs
        The encoding parameters to use when saving the dataset.
    """
    touch(output_path, clobber=True)
    encoding = {
        "dtype": "int16",
        "_FillValue": -32767,
        "zlib": True,
        "complevel": 1,
    }
    encoding.update(encoding_kwargs)
    ds.to_netcdf(output_path, encoding={"value": encoding})

diagnostics

utils

get_locations_depth_first(hierarchy: pd.DataFrame) -> list[int]

Return location ids sorted by a depth first search of the hierarchy.

Locations at the same level are sorted alphabetically by name.

Source code in src/climate_data/diagnostics/utils.py
def get_locations_depth_first(hierarchy: pd.DataFrame) -> list[int]:
    """Return location ids sorted by a depth first search of the hierarchy.

    Locations at the same level are sorted alphabetically by name.
    """

    def _get_locations(location: pd.Series):
        locs = [location.location_id]

        children = hierarchy[
            (hierarchy.parent_id == location.location_id)
            & (hierarchy.location_id != location.location_id)
        ]
        for child in children.sort_values("location_ascii_name").itertuples():
            locs.extend(_get_locations(child))
        return locs

    top_locs = hierarchy[hierarchy.location_id == hierarchy.parent_id]
    locations = []
    for top_loc in top_locs.sort_values("location_ascii_name").itertuples():
        locations.extend(_get_locations(top_loc))

    return locations

extract

Climate Data Extraction

This module contains pipelines for extracting climate data from various sources.

cmip6

CMIP6 Data Extraction

extract_cmip6(cmip6_source: list[str], cmip6_experiment: list[str], cmip6_variable: list[str], output_dir: str, queue: str, overwrite: bool) -> None

Extract CMIP6 data.

Extracts CMIP6 data for the given source, experiment, and variable. We use the the table at https://www.nature.com/articles/s41597-023-02549-6/tables/3 to determine which CMIP6 source_ids to include. See ClimateData.load_koppen_geiger_model_inclusion to load and examine this table. The extraction criteria does not completely capture model inclusion criteria as it does not account for the year range avaialable in the data. This determiniation is made when we proccess the data in later steps.

Source code in src/climate_data/extract/cmip6.py
@click.command()
@clio.with_cmip6_source(allow_all=True)
@clio.with_cmip6_experiment(allow_all=True)
@clio.with_cmip6_variable(allow_all=True)
@clio.with_output_directory(cdc.MODEL_ROOT)
@clio.with_queue()
@clio.with_overwrite()
def extract_cmip6(
    cmip6_source: list[str],
    cmip6_experiment: list[str],
    cmip6_variable: list[str],
    output_dir: str,
    queue: str,
    overwrite: bool,
) -> None:
    """Extract CMIP6 data.

    Extracts CMIP6 data for the given source, experiment, and variable. We use the
    the table at https://www.nature.com/articles/s41597-023-02549-6/tables/3 to determine
    which CMIP6 source_ids to include. See `ClimateData.load_koppen_geiger_model_inclusion`
    to load and examine this table. The extraction criteria does not completely
    capture model inclusion criteria as it does not account for the year range avaialable
    in the data. This determiniation is made when we proccess the data in later steps.
    """
    overwrite_arg = {"overwrite": None} if overwrite else {}

    jobmon.run_parallel(
        runner="cdtask",
        task_name="extract cmip6",
        node_args={
            "cmip6-source": cmip6_source,
            "cmip6-experiment": cmip6_experiment,
            "cmip6-variable": cmip6_variable,
        },
        task_args={
            "output-dir": output_dir,
            **overwrite_arg,
        },
        task_resources={
            "queue": queue,
            "cores": 1,
            "memory": "10G",
            "runtime": "3000m",
            "project": "proj_rapidresponse",
        },
        max_attempts=1,
        concurrency_limit=50,
    )

load_cmip_data(zarr_path: str) -> xr.Dataset

Loads a CMIP6 dataset from a zarr path.

Source code in src/climate_data/extract/cmip6.py
def load_cmip_data(zarr_path: str) -> xr.Dataset:
    """Loads a CMIP6 dataset from a zarr path."""
    gcs = gcsfs.GCSFileSystem(token="anon")  # noqa: S106
    mapper = gcs.get_mapper(zarr_path)
    ds = xr.open_zarr(mapper, consolidated=True)
    ds = ds.drop_vars(
        ["lat_bnds", "lon_bnds", "time_bnds", "height", "time_bounds", "bnds"],
        errors="ignore",
    )
    return ds  # type: ignore[no-any-return]

elevation

extract_elevation(model_name: str, output_dir: str, queue: str) -> None

Download elevation data from Open Topography.

Source code in src/climate_data/extract/elevation.py
@click.command()
@click.option(
    "--generate-name",
    required=True,
    type=click.Choice(ELEVATION_MODELS),
    help="Name of the elevation model to download.",
)
@clio.with_output_directory(cdc.MODEL_ROOT)
@clio.with_queue()
def extract_elevation(
    model_name: str,
    output_dir: str,
    queue: str,
) -> None:
    """Download elevation data from Open Topography."""
    invalid = True
    if invalid:
        msg = "Downloaded using aws cli, this implementation is not valid"
        raise NotImplementedError(msg)

    lat_starts = list(range(-90, 90, FETCH_SIZE))
    lon_starts = list(range(-180, 180, FETCH_SIZE))

    jobmon.run_parallel(
        runner="cdtask",
        task_name="extract elevation",
        node_args={
            "model-name": [model_name],
            "lat-start": lat_starts,
            "lon-start": lon_starts,
        },
        task_args={
            "output-dir": output_dir,
        },
        task_resources={
            "queue": queue,
            "cores": 1,
            "memory": "10G",
            "runtime": "240m",
            "project": "proj_rapidresponse",
        },
    )

extract_elevation_task(model_name: str, lat_start: int, lon_start: int, output_dir: str) -> None

Download elevation data from Open Topography.

Source code in src/climate_data/extract/elevation.py
@click.command()
@click.option(
    "--model-name",
    required=True,
    type=click.Choice(ELEVATION_MODELS),
    help="Name of the elevation model to download.",
)
@click.option(
    "--lat-start",
    required=True,
    type=int,
    help="Latitude of the top-left corner of the tile.",
)
@click.option(
    "--lon-start",
    required=True,
    type=int,
    help="Longitude of the top-left corner of the tile.",
)
@clio.with_output_directory(cdc.MODEL_ROOT)
def extract_elevation_task(
    model_name: str,
    lat_start: int,
    lon_start: int,
    output_dir: str,
) -> None:
    """Download elevation data from Open Topography."""
    invalid = True
    if invalid:
        msg = "Downloaded using aws cli, this implementation is not valid"
        raise NotImplementedError(msg)

    extract_elevation_main(model_name, lat_start, lon_start, output_dir)

era5

ERA5 Data Extraction

generate

utils

buck_vapor_pressure(temperature_c: xr.Dataset) -> xr.Dataset

Approximate vapor pressure of water.

https://en.wikipedia.org/wiki/Arden_Buck_equation https://journals.ametsoc.org/view/journals/apme/20/12/1520-0450_1981_020_1527_nefcvp_2_0_co_2.xml

Parameters

temperature_c Temperature in Celsius

Returns

xr.Dataset Vapor pressure in hPa

Source code in src/climate_data/generate/utils.py
def buck_vapor_pressure(temperature_c: xr.Dataset) -> xr.Dataset:
    """Approximate vapor pressure of water.

    https://en.wikipedia.org/wiki/Arden_Buck_equation
    https://journals.ametsoc.org/view/journals/apme/20/12/1520-0450_1981_020_1527_nefcvp_2_0_co_2.xml

    Parameters
    ----------
    temperature_c
        Temperature in Celsius

    Returns
    -------
    xr.Dataset
        Vapor pressure in hPa
    """
    over_water = 6.1121 * np.exp(
        (18.678 - temperature_c / 234.5) * (temperature_c / (257.14 + temperature_c))
    )
    over_ice = 6.1115 * np.exp(
        (23.036 - temperature_c / 333.7) * (temperature_c / (279.82 + temperature_c))
    )
    vp = xr.where(temperature_c > 0, over_water, over_ice)  # type: ignore[no-untyped-call]
    return vp  # type: ignore[no-any-return]

identity(ds: xr.Dataset) -> xr.Dataset

Identity transformation

Source code in src/climate_data/generate/utils.py
def identity(ds: xr.Dataset) -> xr.Dataset:
    """Identity transformation"""
    return ds

interpolate_to_target_latlon(ds: xr.Dataset, method: str = 'nearest', target_lon: xr.DataArray = cdc.TARGET_LONGITUDE, target_lat: xr.DataArray = cdc.TARGET_LATITUDE) -> xr.Dataset

Interpolate a dataset to a target latitude and longitude grid.

Parameters

ds Dataset to interpolate method Interpolation method target_lon Target longitude grid target_lat Target latitude grid

Returns

xr.Dataset Interpolated dataset

Source code in src/climate_data/generate/utils.py
def interpolate_to_target_latlon(
    ds: xr.Dataset,
    method: str = "nearest",
    target_lon: xr.DataArray = cdc.TARGET_LONGITUDE,
    target_lat: xr.DataArray = cdc.TARGET_LATITUDE,
) -> xr.Dataset:
    """Interpolate a dataset to a target latitude and longitude grid.

    Parameters
    ----------
    ds
        Dataset to interpolate
    method
        Interpolation method
    target_lon
        Target longitude grid
    target_lat
        Target latitude grid

    Returns
    -------
    xr.Dataset
        Interpolated dataset
    """
    return (
        ds.interp(longitude=target_lon, latitude=target_lat, method=method)  # type: ignore[arg-type]
        .interpolate_na(dim="longitude", method="nearest", fill_value="extrapolate")
        .sortby("latitude")
        .interpolate_na(dim="latitude", method="nearest", fill_value="extrapolate")
        .sortby("latitude", ascending=False)
    )

kelvin_to_celsius(temperature_k: xr.Dataset) -> xr.Dataset

Convert temperature from Kelvin to Celsius

Parameters

temperature_k Temperature in Kelvin

Returns

xr.Dataset Temperature in Celsius

Source code in src/climate_data/generate/utils.py
def kelvin_to_celsius(temperature_k: xr.Dataset) -> xr.Dataset:
    """Convert temperature from Kelvin to Celsius

    Parameters
    ----------
    temperature_k
        Temperature in Kelvin

    Returns
    -------
    xr.Dataset
        Temperature in Celsius
    """
    return temperature_k - 273.15

meter_to_millimeter(rainfall_m: xr.Dataset) -> xr.Dataset

Convert rainfall from meters to millimeters

Parameters

rainfall_m Rainfall in meters

Returns

xr.Dataset Rainfall in millimeters

Source code in src/climate_data/generate/utils.py
def meter_to_millimeter(rainfall_m: xr.Dataset) -> xr.Dataset:
    """Convert rainfall from meters to millimeters

    Parameters
    ----------
    rainfall_m
        Rainfall in meters

    Returns
    -------
    xr.Dataset
        Rainfall in millimeters
    """
    return 1000 * rainfall_m

precipitation_flux_to_rainfall(precipitation_flux: xr.Dataset) -> xr.Dataset

Convert precipitation flux to rainfall

Parameters

precipitation_flux Precipitation flux in kg m-2 s-1

Returns

xr.Dataset Rainfall in mm/day

Source code in src/climate_data/generate/utils.py
def precipitation_flux_to_rainfall(precipitation_flux: xr.Dataset) -> xr.Dataset:
    """Convert precipitation flux to rainfall

    Parameters
    ----------
    precipitation_flux
        Precipitation flux in kg m-2 s-1

    Returns
    -------
    xr.Dataset
        Rainfall in mm/day
    """
    seconds_per_day = 86400
    mm_per_kg_m2 = 1
    return seconds_per_day * mm_per_kg_m2 * precipitation_flux

rh_percent(temperature_c: xr.Dataset, dewpoint_temperature_c: xr.Dataset) -> xr.Dataset

Calculate relative humidity from temperature and dewpoint temperature.

Parameters

temperature_c Temperature in Celsius dewpoint_temperature_c Dewpoint temperature in Celsius

Returns

xr.Dataset Relative humidity as a percentage

Source code in src/climate_data/generate/utils.py
def rh_percent(
    temperature_c: xr.Dataset, dewpoint_temperature_c: xr.Dataset
) -> xr.Dataset:
    """Calculate relative humidity from temperature and dewpoint temperature.

    Parameters
    ----------
    temperature_c
        Temperature in Celsius
    dewpoint_temperature_c
        Dewpoint temperature in Celsius

    Returns
    -------
    xr.Dataset
        Relative humidity as a percentage
    """
    # saturation vapour pressure
    svp = buck_vapor_pressure(temperature_c)
    # actual vapour pressure
    vp = buck_vapor_pressure(dewpoint_temperature_c)
    return 100 * vp / svp

scale_wind_speed_height(wind_speed_10m: xr.Dataset) -> xr.Dataset

Scaling wind speed from a height of 10 meters to a height of 2 meters

Reference: Bröde et al. (2012) https://doi.org/10.1007/s00484-011-0454-1

Parameters

wind_speed_10m The 10m wind speed [m/s]. May be signed (ie a velocity component)

Returns

xr.DataSet The 2m wind speed [m/s]. May be signed (ie a velocity component)

Source code in src/climate_data/generate/utils.py
def scale_wind_speed_height(wind_speed_10m: xr.Dataset) -> xr.Dataset:
    """Scaling wind speed from a height of 10 meters to a height of 2 meters

    Reference: Bröde et al. (2012)
    https://doi.org/10.1007/s00484-011-0454-1

    Parameters
    ----------
    wind_speed_10m
        The 10m wind speed [m/s]. May be signed (ie a velocity component)

    Returns
    -------
    xr.DataSet
        The 2m wind speed [m/s]. May be signed (ie a velocity component)
    """
    scale_factor = np.log10(2 / 0.01) / np.log10(10 / 0.01)
    return scale_factor * wind_speed_10m  # type: ignore[no-any-return]

vector_magnitude(x: xr.Dataset, y: xr.Dataset) -> xr.Dataset

Calculate the magnitude of a vector.

Source code in src/climate_data/generate/utils.py
def vector_magnitude(x: xr.Dataset, y: xr.Dataset) -> xr.Dataset:
    """Calculate the magnitude of a vector."""
    return np.sqrt(x**2 + y**2)  # type: ignore[return-value]

special

temperature_zone

generate_temperature_zone_main(gcm_member: str, cmip6_experiment: str, output_dir: str | Path) -> None

Generate the temperature zone for a given scenario and gcm member.

Parameters

gcm_member The gcm member to generate the temperature zone for. cmip6_experiment The cmip6 experiment to generate the temperature zone for. output_dir The directory to save the temperature zone to.

Source code in src/climate_data/special/temperature_zone.py
def generate_temperature_zone_main(
    gcm_member: str,
    cmip6_experiment: str,
    output_dir: str | Path,
) -> None:
    """Generate the temperature zone for a given scenario and gcm member.

    Parameters
    ----------
    gcm_member
        The gcm member to generate the temperature zone for.
    cmip6_experiment
        The cmip6 experiment to generate the temperature zone for.
    output_dir
        The directory to save the temperature zone to.
    """
    print(f"Generating temperature zone for {cmip6_experiment} {gcm_member}")
    cdata = ClimateData(output_dir)
    ds = cdata.load_compiled_annual_results(
        cmip6_experiment, "mean_temperature", gcm_member
    )
    temperature_zone = ds.rolling(year=10).mean().sel(year=slice(1990, 2100))
    print(f"Saving temperature zone for {cmip6_experiment} {gcm_member}")
    cdata.save_compiled_annual_results(
        temperature_zone,
        scenario=cmip6_experiment,
        variable="temperature_zone",
        gcm_member=gcm_member,
        encoding_kwargs={"scale_factor": 0.01, "add_offset": 0.0},
    )

utils

_to_idx(arr, bins)

Convert an array of values to an array of indices into a set of bins.T

Parameters

arr A N-dimensional array of values to convert. bins A 1-dimensional array of bins. Bins must be sorted, increasing, and not contain duplicates. Every value in arr will be converted to an index into bins, with values outside of bins being clipped to the nearest bin edge. Bins are 0-indexed and left-inclusive, so idx 0 contains values from -infinity to bins[1], idx 1 contains values from bins[1] to bins[2], etc.

Returns:

Type Description

The array of indices into the bins.

Source code in src/climate_data/special/utils.py
def _to_idx(arr, bins):
    """Convert an array of values to an array of indices into a set of bins.T

    Parameters
    ----------
    arr
       A N-dimensional array of values to convert.
    bins
       A 1-dimensional array of bins. Bins must be sorted, increasing,
       and not contain duplicates. Every value in `arr` will be converted
       to an index into `bins`, with values outside of `bins` being clipped
       to the nearest bin edge. Bins are 0-indexed and left-inclusive,
       so idx 0 contains values from -infinity to bins[1], idx 1 contains
       values from bins[1] to bins[2], etc.

    Returns:
        The array of indices into the bins.
    """
    return np.clip(np.digitize(arr, bins), 1, len(bins)) - 1

aggregate_to_hierarchy(data: pd.DataFrame, hierarchy: pd.DataFrame) -> pd.DataFrame

Create all aggregate climate values for a given hierarchy from most-detailed data.

Parameters

data The most-detailed climate data to aggregate. hierarchy The hierarchy to aggregate the data to.

Returns

pd.DataFrame The climate data with values for all levels of the hierarchy.

Source code in src/climate_data/special/utils.py
def aggregate_to_hierarchy(data: pd.DataFrame, hierarchy: pd.DataFrame) -> pd.DataFrame:
    """Create all aggregate climate values for a given hierarchy from most-detailed data.

    Parameters
    ----------
    data
        The most-detailed climate data to aggregate.
    hierarchy
        The hierarchy to aggregate the data to.

    Returns
    -------
    pd.DataFrame
        The climate data with values for all levels of the hierarchy.
    """
    agg_cols = sorted(
        set(data.columns) - {"location_id", "year_id", "temperature_zone"}
    )

    results = data.set_index("location_id")

    # Most detailed locations can be at multiple levels of the hierarchy,
    # so we loop over all levels from most detailed to global, aggregating
    # level by level and appending the results to the data.

    for level in reversed(list(range(1, hierarchy.level.max() + 1))):
        level_mask = hierarchy.level == level
        parent_map = hierarchy.loc[level_mask].set_index("location_id").parent_id

        subset = results.loc[results.index.intersection(parent_map.index)]
        subset["parent_id"] = parent_map

        parent_values = (
            subset.groupby(["year_id", "parent_id", "temperature_zone"])[agg_cols]
            .sum()
            .reset_index()
            .rename(columns={"parent_id": "location_id"})
            .set_index("location_id")
        )
        results = pd.concat([results, parent_values])
    results = (
        results.reset_index()
        .sort_values(["location_id", "year_id", "temperature_zone"])
        .reset_index(drop=True)
    )

    return results

utils

Climate Data Utilities

Utility functions for working with climate data.

make_raster_template(x_min: int | float, y_min: int | float, stride: int | float, resolution: int | float, crs: str = 'EPSG:4326') -> rt.RasterArray

Create a raster template with the specified dimensions and resolution.

A raster template is a RasterArray with a specified extent, resolution, and CRS. The data values are initialized to zero. This function is useful for creating a template to use when resampling another raster to a common grid.

Parameters

x_min The minimum x-coordinate of the raster. y_min The minimum y-coordinate of the raster. stride The length of one side of the raster in the x and y directions measured in the units of the provided coordinate reference system. resolution The resolution of the raster in the units of the provided coordinate reference system. crs The coordinate reference system of the generated raster.

Returns

rt.RasterArray A raster template with the specified dimensions and resolution.

Source code in src/climate_data/utils.py
def make_raster_template(
    x_min: int | float,
    y_min: int | float,
    stride: int | float,
    resolution: int | float,
    crs: str = "EPSG:4326",
) -> rt.RasterArray:
    """Create a raster template with the specified dimensions and resolution.

    A raster template is a RasterArray with a specified extent, resolution, and CRS. The data
    values are initialized to zero. This function is useful for creating a template to use
    when resampling another raster to a common grid.

    Parameters
    ----------
    x_min
        The minimum x-coordinate of the raster.
    y_min
        The minimum y-coordinate of the raster.
    stride
        The length of one side of the raster in the x and y directions measured in the units
        of the provided coordinate reference system.
    resolution
        The resolution of the raster in the units of the provided coordinate reference system.
    crs
        The coordinate reference system of the generated raster.

    Returns
    -------
    rt.RasterArray
        A raster template with the specified dimensions and resolution.
    """
    tolerance = 1e-12
    evenly_divides = (stride % resolution < tolerance) or (
        resolution - stride % resolution < tolerance
    )
    if not evenly_divides:
        msg = "Stride must be a multiple of resolution"
        raise ValueError(msg)

    transform = Affine(
        a=resolution,
        b=0,
        c=x_min,
        d=0,
        e=-resolution,
        f=y_min + stride,
    )

    n_pix = int(stride / resolution)

    data = np.zeros((n_pix, n_pix), dtype=np.int8)
    return rt.RasterArray(
        data,
        transform,
        crs=crs,
        no_data_value=-1,
    )

to_raster(ds: xr.DataArray, no_data_value: float | int, lat_col: str = 'lat', lon_col: str = 'lon', crs: str = 'EPSG:4326') -> rt.RasterArray

Convert an xarray DataArray to a RasterArray.

Parameters

ds The xarray DataArray to convert. no_data_value The value to use for missing data. This should be consistent with the dtype of the data. lat_col The name of the latitude coordinate in the dataset. lon_col The name of the longitude coordinate in the dataset. crs The coordinate reference system of the data.

Returns

rt.RasterArray The RasterArray representation of the input data.

Source code in src/climate_data/utils.py
def to_raster(
    ds: xr.DataArray,
    no_data_value: float | int,
    lat_col: str = "lat",
    lon_col: str = "lon",
    crs: str = "EPSG:4326",
) -> rt.RasterArray:
    """Convert an xarray DataArray to a RasterArray.

    Parameters
    ----------
    ds
        The xarray DataArray to convert.
    no_data_value
        The value to use for missing data. This should be consistent with the dtype of the data.
    lat_col
        The name of the latitude coordinate in the dataset.
    lon_col
        The name of the longitude coordinate in the dataset.
    crs
        The coordinate reference system of the data.

    Returns
    -------
    rt.RasterArray
        The RasterArray representation of the input data.
    """
    lat, lon = ds[lat_col].data, ds[lon_col].data

    dlat = (lat[1:] - lat[:-1]).mean()
    dlon = (lon[1:] - lon[:-1]).mean()

    transform = Affine(
        a=dlon,
        b=0.0,
        c=lon[0],
        d=0.0,
        e=-dlat,
        f=lat[-1],
    )
    return rt.RasterArray(
        data=ds.data[::-1],
        transform=transform,
        crs=crs,
        no_data_value=no_data_value,
    )