Skip to content

aggregate

hierarchy

Runner for the collate stage of the climate aggregates pipeline.

This module provides functions to collate block-level datasets into measure-level datasets. The compilation process: 1. Loads raw results from all blocks for a given hierarchy, measure, and scenario 2. Aggregates the data up the location hierarchy 3. Produces views for subset hierarchies 4. Saves the results as measure-level datasets

hierarchy(agg_version: str, hierarchy: list[str], agg_measure: list[str], agg_scenario: list[str], population_model_dir: str, output_dir: str, queue: str) -> None

Collate block-level datasets into measure-level datasets.

This command: 1. Identifies which combinations of hierarchy, measure, and scenario need compilation 2. Creates parallel jobs to collate each combination 3. Runs the jobs using jobmon

Source code in src/climate_data/aggregate/hierarchy.py
@click.command()
@clio.with_agg_version()
@clio.with_hierarchy(allow_all=True)
@clio.with_agg_measure(allow_all=True)
@clio.with_agg_scenario(allow_all=True)
@clio.with_input_directory("population-model", cdc.POPULATION_MODEL_ROOT)
@clio.with_output_directory(cdc.AGGREGATE_ROOT)
@clio.with_queue()
def hierarchy(
    agg_version: str,
    hierarchy: list[str],
    agg_measure: list[str],
    agg_scenario: list[str],
    population_model_dir: str,
    output_dir: str,
    queue: str,
) -> None:
    """Collate block-level datasets into measure-level datasets.

    This command:
    1. Identifies which combinations of hierarchy, measure, and scenario need compilation
    2. Creates parallel jobs to collate each combination
    3. Runs the jobs using jobmon
    """
    ca_data = ClimateAggregateData(output_dir)

    n_jobs = len(hierarchy) * len(agg_measure) * len(agg_scenario)
    print(f"Running {n_jobs} jobs")

    jobmon.run_parallel(
        runner="cdtask aggregate",
        task_name="hierarchy",
        node_args={
            "hierarchy": hierarchy,
            "agg-measure": agg_measure,
            "agg-scenario": agg_scenario,
        },
        task_args={
            "agg-version": agg_version,
            "population-model-dir": population_model_dir,
            "output-dir": output_dir,
        },
        task_resources={
            "queue": queue,
            "cores": 1,
            "memory": "50G",
            "runtime": "200m",
            "project": "proj_rapidresponse",
        },
        log_root=ca_data.log_dir("aggregate_hierarchy"),
        max_attempts=3,
    )

hierarchy_main(agg_version: str, hierarchy: str, measure: str, scenario: str, population_model_dir: str, output_dir: str, *, progress_bar: bool = False) -> None

Collate block-level datasets into measure/scenario-level datasets.

This function: 1. Loads all block-level datasets for a given hierarchy, measure, and scenario 2. Combines them into a single dataset 3. Aggregates the data up the location hierarchy 4. Produces views for subset hierarchies 5. Saves the results as measure-level datasets

Parameters

agg_version The version identifier hierarchy The full aggregation hierarchy to process measure The climate measure to process scenario The climate scenario to process population_model_dir Path to the population model directory output_dir Path to save results progress_bar Whether to show a progress bar

Source code in src/climate_data/aggregate/hierarchy.py
def hierarchy_main(
    agg_version: str,
    hierarchy: str,
    measure: str,
    scenario: str,
    population_model_dir: str,
    output_dir: str,
    *,
    progress_bar: bool = False,
) -> None:
    """Collate block-level datasets into measure/scenario-level datasets.

    This function:
    1. Loads all block-level datasets for a given hierarchy, measure, and scenario
    2. Combines them into a single dataset
    3. Aggregates the data up the location hierarchy
    4. Produces views for subset hierarchies
    5. Saves the results as measure-level datasets

    Parameters
    ----------
    agg_version
        The version identifier
    hierarchy
        The full aggregation hierarchy to process
    measure
        The climate measure to process
    scenario
        The climate scenario to process
    population_model_dir
        Path to the population model directory
    output_dir
        Path to save results
    progress_bar
        Whether to show a progress bar
    """
    print(f"Compiling {measure} for {scenario} in {hierarchy}")
    ca_data = ClimateAggregateData(output_dir)
    pm_data = PopulationModelData(population_model_dir)
    draws = cdc.DRAWS

    # Load hierarchy data for aggregation
    hierarchy_df = pm_data.load_subset_hierarchy(hierarchy)

    # Get all block keys
    modeling_frame = pm_data.load_modeling_frame()
    block_keys = modeling_frame.block_key.unique()

    # Load and combine all block-level results
    desc_template = "{draw:5} {block_key:15}"
    pbar = tqdm.tqdm(
        total=len(block_keys) * len(draws),
        desc=desc_template.format(draw="DRAW", block_key="BLOCK KEY"),
        disable=not progress_bar,
    )

    all_results = []
    pop_df: pd.DataFrame | None = None

    for draw in draws:
        save_population = (
            measure == "mean_temperature" and scenario == "ssp245" and draw == "000"
        )
        draw_results = []
        for block_key in block_keys:
            pbar.set_description(desc_template.format(draw=draw, block_key=block_key))

            draw_df = ca_data.load_raw_results(
                agg_version,
                hierarchy,
                block_key,
                draw,
                measure=measure,
                scenario=scenario,
            ).drop(columns=["scenario", "measure"])
            draw_results.append(draw_df)

            pbar.update()
        draw_df = (
            pd.concat(draw_results, ignore_index=True)
            .groupby(["location_id", "year_id"])
            .sum()
            .reset_index()
        )

        agg_df = utils.aggregate_climate_to_hierarchy(
            draw_df,
            hierarchy_df,
        ).set_index(["location_id", "year_id"])
        all_results.append(agg_df["value"].rename(draw))

        if save_population:
            pop_df = agg_df["population"]

    pbar.close()

    combined_results = pd.concat(all_results, axis=1)

    # Produce views for subset hierarchies
    subset_hierarchies = cdc.HIERARCHY_MAP[hierarchy]
    for subset_hierarchy in subset_hierarchies:
        # Load the subset hierarchy
        subset_hierarchy_df = pm_data.load_subset_hierarchy(subset_hierarchy)

        # Filter results to only include locations in the subset hierarchy
        subset_location_ids = subset_hierarchy_df["location_id"].tolist()
        subset_results = combined_results.loc[subset_location_ids]

        # Save results for the subset hierarchy
        ca_data.save_results(
            subset_results,
            agg_version,
            subset_hierarchy,
            scenario,
            measure,
        )

        if pop_df is not None:
            subset_pop = pop_df.loc[subset_location_ids].reset_index()
            ca_data.save_population(subset_pop, agg_version, subset_hierarchy)

hierarchy_task(agg_version: str, hierarchy: str, agg_measure: str, agg_scenario: str, population_model_dir: str, output_dir: str, *, progress_bar: bool) -> None

Collate block-level datasets into measure-level datasets.

This command collates results for a specific hierarchy, measure, and scenario.

Source code in src/climate_data/aggregate/hierarchy.py
@click.command()
@clio.with_agg_version()
@clio.with_hierarchy()
@clio.with_agg_measure()
@clio.with_agg_scenario()
@clio.with_input_directory("population-model", cdc.POPULATION_MODEL_ROOT)
@clio.with_output_directory(cdc.AGGREGATE_ROOT)
@clio.with_progress_bar()
def hierarchy_task(
    agg_version: str,
    hierarchy: str,
    agg_measure: str,
    agg_scenario: str,
    population_model_dir: str,
    output_dir: str,
    *,
    progress_bar: bool,
) -> None:
    """Collate block-level datasets into measure-level datasets.

    This command collates results for a specific hierarchy, measure, and scenario.
    """
    hierarchy_main(
        agg_version,
        hierarchy,
        agg_measure,
        agg_scenario,
        population_model_dir,
        output_dir,
        progress_bar=progress_bar,
    )

utils

aggregate_climate_to_hierarchy(data: pd.DataFrame, hierarchy: pd.DataFrame) -> pd.DataFrame

Create all aggregate climate values for a given hierarchy from most-detailed data.

Parameters

data The most-detailed climate data to aggregate. hierarchy The hierarchy to aggregate the data to.

Returns

pd.DataFrame The climate data with values for all levels of the hierarchy.

Source code in src/climate_data/aggregate/utils.py
def aggregate_climate_to_hierarchy(
    data: pd.DataFrame, hierarchy: pd.DataFrame
) -> pd.DataFrame:
    """Create all aggregate climate values for a given hierarchy from most-detailed data.

    Parameters
    ----------
    data
        The most-detailed climate data to aggregate.
    hierarchy
        The hierarchy to aggregate the data to.

    Returns
    -------
    pd.DataFrame
        The climate data with values for all levels of the hierarchy.
    """
    results = data.set_index("location_id").copy()

    # Most detailed locations can be at multiple levels of the hierarchy,
    # so we loop over all levels from most detailed to global, aggregating
    # level by level and appending the results to the data.
    for level in reversed(list(range(1, hierarchy.level.max() + 1))):
        level_mask = hierarchy.level == level
        parent_map = hierarchy.loc[level_mask].set_index("location_id").parent_id

        subset = results.loc[parent_map.index]
        subset["parent_id"] = parent_map

        parent_values = (
            subset.groupby(["year_id", "parent_id"])[["weighted_climate", "population"]]
            .sum()
            .reset_index()
            .rename(columns={"parent_id": "location_id"})
            .set_index("location_id")
        )
        results = pd.concat([results, parent_values])
    results = (
        results.reset_index()
        .sort_values(["location_id", "year_id"])
        .reset_index(drop=True)
    )
    results["value"] = results["weighted_climate"] / results["population"]
    return results

build_bounds_map(raster_template: rt.RasterArray, shape_values: list[tuple[Polygon | MultiPolygon, int]]) -> dict[int, tuple[slice, slice]]

Build a map of location IDs to buffered slices of the raster template.

Parameters

raster_template The raster template to build the bounds map for. shape_values A list of tuples where the first element is a shapely Polygon or MultiPolygon in the CRS of the raster template and the second element is the location ID of the shape.

Returns

dict[int, tuple[slice, slice]] A dictionary mapping location IDs to a tuple of slices representing the bounds of the location in the raster template. The slices are buffered by 10 pixels to ensure that the entire shape is included in the mask.

Source code in src/climate_data/aggregate/utils.py
def build_bounds_map(
    raster_template: rt.RasterArray,
    shape_values: list[tuple[Polygon | MultiPolygon, int]],
) -> dict[int, tuple[slice, slice]]:
    """Build a map of location IDs to buffered slices of the raster template.

    Parameters
    ----------
    raster_template
        The raster template to build the bounds map for.
    shape_values
        A list of tuples where the first element is a shapely Polygon or MultiPolygon
        in the CRS of the raster template and the second element is the location ID
        of the shape.

    Returns
    -------
    dict[int, tuple[slice, slice]]
        A dictionary mapping location IDs to a tuple of slices representing the bounds
        of the location in the raster template. The slices are buffered by 10 pixels
        to ensure that the entire shape is included in the mask.
    """
    # The tranform maps pixel coordinates to the CRS coordinates.
    # This mask is the inverse of that transform.
    to_pixel = ~raster_template.transform

    bounds_map = {}
    for shp, loc_id in shape_values:
        xmin, ymin, xmax, ymax = shp.bounds
        pxmin, pymin = to_pixel * (xmin, ymax)
        pixel_buffer = 10
        pxmin = max(0, int(pxmin) - pixel_buffer)
        pymin = max(0, int(pymin) - pixel_buffer)
        pxmax, pymax = to_pixel * (xmax, ymin)
        pxmax = min(raster_template.width, int(pxmax) + pixel_buffer)
        pymax = min(raster_template.height, int(pymax) + pixel_buffer)
        bounds_map[loc_id] = (slice(pymin, pymax), slice(pxmin, pxmax))

    return bounds_map

build_location_masks(hierarchy: str, block_key: str, pm_data: PopulationModelData) -> tuple[dict[str, slice], dict[int, tuple[slice, slice, npt.NDArray[np.bool_]]]]

Build location masks for each location in the hierarchy.

Parameters

hierarchy The name of the hierarchy to build location masks for. Must be one of of the keys of the HIERARCHY_MAP constant. pm_data PopulationModelData object to load the population model data.

Returns

tuple[dict[int, tuple[slice, slice]], npt.NDArray[np.uint32]] The first element is a dictionary mapping location IDs to a tuple of slices representing the bounds of the location in the mask. This is useful for subseting the mask and data arrays before processing as downstream operations scale with the number of pixels in the mask. The second element is the mask itself, a 2D array of uint32 values where each location ID is represented by a unique integer value.

Source code in src/climate_data/aggregate/utils.py
def build_location_masks(
    hierarchy: str,
    block_key: str,
    pm_data: PopulationModelData,
) -> tuple[dict[str, slice], dict[int, tuple[slice, slice, npt.NDArray[np.bool_]]]]:
    """Build location masks for each location in the hierarchy.

    Parameters
    ----------
    hierarchy
        The name of the hierarchy to build location masks for. Must be one of
        of the keys of the HIERARCHY_MAP constant.
    pm_data
        PopulationModelData object to load the population model data.

    Returns
    -------
    tuple[dict[int, tuple[slice, slice]], npt.NDArray[np.uint32]]
        The first element is a dictionary mapping location IDs to a tuple of
        slices representing the bounds of the location in the mask. This is useful
        for subseting the mask and data arrays before processing as downstream
        operations scale with the number of pixels in the mask. The second element
        is the mask itself, a 2D array of uint32 values where each location ID is
        represented by a unique integer value.
    """
    template = pm_data.load_results("2020q1", block_key)
    template_bbox = get_bbox(template, "EPSG:4326")
    bounds = template_bbox.bounds
    raking_shapes = pm_data.load_raking_shapes(hierarchy, bounds=bounds)
    raking_shapes = raking_shapes[raking_shapes.intersects(template_bbox)].to_crs(
        template.crs
    )

    # Get some bounds to subset the climate rasters with.
    lon_min, lat_min, lon_max, lat_max = bounds
    buffer = 0.1  # Degrees, this is the resolution of the climate raster
    lon_min, lon_max = max(-180, lon_min - buffer), min(180, lon_max + buffer)
    lat_min, lat_max = max(-90, lat_min - buffer), min(90, lat_max + buffer)
    climate_slice = {
        "longitude": slice(lon_min, lon_max),
        "latitude": slice(lat_max, lat_min),
    }

    shape_values = [
        (shape, loc_id)
        for loc_id, shape in raking_shapes.set_index("location_id")
        .geometry.to_dict()
        .items()
    ]
    bounds_map = build_bounds_map(template, shape_values)

    location_mask = np.zeros_like(template, dtype=np.uint32)
    location_mask = rasterize(
        shape_values,
        out=location_mask,
        transform=template.transform,
        merge_alg=MergeAlg.replace,
    )
    final_bounds_map = {
        location_id: (rows, cols, location_mask[rows, cols] == location_id)
        for location_id, (rows, cols) in bounds_map.items()
    }
    return climate_slice, final_bounds_map, location_mask

get_bbox(raster: rt.RasterArray, crs: str | None = None) -> shapely.Polygon

Get the bounding box of a raster array.

Parameters

raster The raster array to get the bounding box of. crs The CRS to return the bounding box in. If None, the bounding box is returned in the CRS of the raster.

Returns

shapely.Polybon The bounding box of the raster in the CRS specified by the crs parameter.

Source code in src/climate_data/aggregate/utils.py
def get_bbox(raster: rt.RasterArray, crs: str | None = None) -> shapely.Polygon:
    """Get the bounding box of a raster array.

    Parameters
    ----------
    raster
        The raster array to get the bounding box of.
    crs
        The CRS to return the bounding box in. If None, the bounding box
        is returned in the CRS of the raster.

    Returns
    -------
    shapely.Polybon
        The bounding box of the raster in the CRS specified by the crs parameter.
    """
    if raster.crs not in MAX_BOUNDS:
        msg = f"Unsupported CRS: {raster.crs}"
        raise ValueError(msg)

    xmin_clip, xmax_clip, ymin_clip, ymax_clip = MAX_BOUNDS[raster.crs]
    xmin, xmax, ymin, ymax = raster.bounds

    xmin = np.clip(xmin, xmin_clip, xmax_clip)
    xmax = np.clip(xmax, xmin_clip, xmax_clip)
    ymin = np.clip(ymin, ymin_clip, ymax_clip)
    ymax = np.clip(ymax, ymin_clip, ymax_clip)

    bbox = gpd.GeoSeries([shapely.box(xmin, ymin, xmax, ymax)], crs=raster.crs)
    out_bbox = bbox.to_crs(crs) if crs is not None else bbox.copy()

    # Check that our transformation didn't do something weird
    # (e.g. artificially clip the bounds or have the bounds extend over the
    # antimeridian)
    check_bbox = out_bbox.to_crs(raster.crs)
    area_change = (np.abs(bbox.area - check_bbox.area) / bbox.area).iloc[0]
    tolerance = 1e-6
    if area_change > tolerance:
        msg = f"Area change: {area_change}"
        raise ValueError(msg)

    return cast(shapely.Polygon, out_bbox.iloc[0])