pixel_main

build_bounds_map(raster_template: rt.RasterArray, shape_values: list[tuple[Polygon | MultiPolygon, int]]) -> dict[int, tuple[slice, slice]]

Build a map of location IDs to buffered slices of the raster template.

Parameters

raster_template
    The raster template to build the bounds map for.
shape_values
    A list of tuples where the first element is a shapely Polygon or MultiPolygon in the CRS of the raster template and the second element is the location ID of the shape.

Returns

dict[int, tuple[slice, slice]]
    A dictionary mapping location IDs to a tuple of slices representing the bounds of the location in the raster template. The slices are buffered by 10 pixels to ensure that the entire shape is included in the mask.

Source code in src/idd_forecast_mbp/map_to_admin_2/pixel_main.py
def build_bounds_map(
    raster_template: rt.RasterArray,
    shape_values: list[tuple[Polygon | MultiPolygon, int]],
) -> dict[int, tuple[slice, slice]]:
    """Build a map of location IDs to buffered slices of the raster template.

    Parameters
    ----------
    raster_template
        The raster template to build the bounds map for.
    shape_values
        A list of tuples where the first element is a shapely Polygon or MultiPolygon
        in the CRS of the raster template and the second element is the location ID
        of the shape.

    Returns
    -------
    dict[int, tuple[slice, slice]]
        A dictionary mapping location IDs to a tuple of slices representing the bounds
        of the location in the raster template. The slices are buffered by 10 pixels
        to ensure that the entire shape is included in the mask.
    """
    # The transform maps pixel coordinates to CRS coordinates;
    # to_pixel is its inverse, mapping CRS coordinates back to pixel coordinates.
    to_pixel = ~raster_template.transform

    bounds_map = {}
    for shp, loc_id in shape_values:
        xmin, ymin, xmax, ymax = shp.bounds
        pxmin, pymin = to_pixel * (xmin, ymax)
        pixel_buffer = 10
        pxmin = max(0, int(pxmin) - pixel_buffer)
        pymin = max(0, int(pymin) - pixel_buffer)
        pxmax, pymax = to_pixel * (xmax, ymin)
        pxmax = min(raster_template.width, int(pxmax) + pixel_buffer)
        pymax = min(raster_template.height, int(pymax) + pixel_buffer)
        bounds_map[loc_id] = (slice(pymin, pymax), slice(pxmin, pxmax))

    return bounds_map
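
A minimal usage sketch combining this with the other helpers in this module. The hierarchy name, the raster_template variable, and the geometry/location_id column names are assumptions for illustration, not part of this API:

# Sketch only: pair each shape with its location ID and precompute the
# buffered pixel windows for later per-location masking.
bbox = get_bbox(raster_template)
shapes = load_raking_shapes("lsae_1209", bounds=bbox.bounds)
shape_values = list(zip(shapes.geometry, shapes.location_id))

bounds_map = build_bounds_map(raster_template, shape_values)
row_slice, col_slice = bounds_map[shapes.location_id.iloc[0]]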

get_bbox(raster: rt.RasterArray, crs: str | None = None) -> shapely.Polygon

Get the bounding box of a raster array.

Parameters

raster
    The raster array to get the bounding box of.
crs
    The CRS to return the bounding box in. If None, the bounding box is returned in the CRS of the raster.

Returns

shapely.Polygon
    The bounding box of the raster in the CRS specified by the crs parameter.

Source code in src/idd_forecast_mbp/map_to_admin_2/pixel_main.py
def get_bbox(raster: rt.RasterArray, crs: str | None = None) -> shapely.Polygon:
    """Get the bounding box of a raster array.

    Parameters
    ----------
    raster
        The raster array to get the bounding box of.
    crs
        The CRS to return the bounding box in. If None, the bounding box
        is returned in the CRS of the raster.

    Returns
    -------
    shapely.Polygon
        The bounding box of the raster in the CRS specified by the crs parameter.
    """
    if raster.crs not in MAX_BOUNDS:
        msg = f"Unsupported CRS: {raster.crs}"
        raise ValueError(msg)

    xmin_clip, xmax_clip, ymin_clip, ymax_clip = MAX_BOUNDS[raster.crs]
    xmin, xmax, ymin, ymax = raster.bounds

    xmin = np.clip(xmin, xmin_clip, xmax_clip)
    xmax = np.clip(xmax, xmin_clip, xmax_clip)
    ymin = np.clip(ymin, ymin_clip, ymax_clip)
    ymax = np.clip(ymax, ymin_clip, ymax_clip)

    bbox = gpd.GeoSeries([shapely.box(xmin, ymin, xmax, ymax)], crs=raster.crs)
    out_bbox = bbox.to_crs(crs) if crs is not None else bbox.copy()

    # Check that our transformation didn't do something weird
    # (e.g. artificially clip the bounds or have the bounds extend over the
    # antimeridian)
    check_bbox = out_bbox.to_crs(raster.crs)
    area_change = (np.abs(bbox.area - check_bbox.area) / bbox.area).iloc[0]
    tolerance = 1e-6
    if area_change > tolerance:
        msg = f"Area change: {area_change}"
        raise ValueError(msg)

    return cast(shapely.Polygon, out_bbox.iloc[0])
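
A minimal usage sketch (the raster variable is a placeholder). The returned polygon's .bounds attribute yields the (xmin, ymin, xmax, ymax) tuple expected by load_raking_shapes:

# Sketch only: reproject the raster footprint to WGS84 and reuse its bounds
# to spatially filter a shape load. `raster` is a placeholder.
bbox = get_bbox(raster, crs="EPSG:4326")
xmin, ymin, xmax, ymax = bbox.bounds
shapes = load_raking_shapes("gbd_2021", bounds=(xmin, ymin, xmax, ymax))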

load_raking_shapes(full_aggregation_hierarchy: str, bounds: tuple[float, float, float, float]) -> gpd.GeoDataFrame

Load shapes for a full aggregation hierarchy within given bounds.

Parameters

full_aggregation_hierarchy
    The full aggregation hierarchy to load (e.g. "gbd_2021")
bounds
    The bounds to load (xmin, ymin, xmax, ymax)

Returns

gpd.GeoDataFrame
    The shapes for the given hierarchy and bounds

Source code in src/idd_forecast_mbp/map_to_admin_2/pixel_main.py
def load_raking_shapes(
    full_aggregation_hierarchy: str,
    bounds: tuple[float, float, float, float],
) -> gpd.GeoDataFrame:
    """Load shapes for a full aggregation hierarchy within given bounds.

    Parameters
    ----------
    full_aggregation_hierarchy
        The full aggregation hierarchy to load (e.g. "gbd_2021")
    bounds
        The bounds to load (xmin, ymin, xmax, ymax)

    Returns
    -------
    gpd.GeoDataFrame
        The shapes for the given hierarchy and bounds
    """
    root = Path("/mnt/team/rapidresponse/pub/population-model/admin-inputs/raking")
    if full_aggregation_hierarchy in ["gbd_2021", "gbd_2023"]:
        shape_path = root / f"shapes_{full_aggregation_hierarchy}.parquet"
        gdf = gpd.read_parquet(shape_path, bbox=bounds)

        # We're using population data here instead of a hierarchy because
        # the populations include extra locations we've supplemented that aren't
        # modeled in GBD (e.g. locations with zero population or places that
        # GBD models using population scalars from WPP)
        pop_path = root / f"population_{full_aggregation_hierarchy}.parquet"
        pop = pd.read_parquet(pop_path)

        keep_cols = ["location_id", "location_name", "most_detailed", "parent_id"]
        keep_mask = (
            (pop.year_id == pop.year_id.max())  # Year doesn't matter
            & (pop.most_detailed == 1)
        )
        out = gdf.merge(pop.loc[keep_mask, keep_cols], on="location_id", how="left")
    elif full_aggregation_hierarchy in ["lsae_1209", "lsae_1285"]:
        # This is only a2 geoms, so already most detailed
        shape_path = (
            root
            / "gbd-inputs"
            / f"shapes_{full_aggregation_hierarchy}_a2.parquet"
        )
        out = gpd.read_parquet(shape_path, bbox=bounds)
    else:
        msg = f"Unknown pixel hierarchy: {full_aggregation_hierarchy}"
        raise ValueError(msg)
    return out
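
A minimal usage sketch (the bounding box values are illustrative only):

# Sketch only: load shapes for a lon/lat window covering roughly Africa.
bounds = (-20.0, -35.0, 55.0, 40.0)  # xmin, ymin, xmax, ymax (illustrative)
shapes = load_raking_shapes("gbd_2023", bounds=bounds)
print(shapes[["location_id", "location_name"]].head())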

to_raster(ds: xr.DataArray, no_data_value: float | int, lat_col: str = 'lat', lon_col: str = 'lon', crs: str = 'EPSG:4326') -> rt.RasterArray

Convert an xarray DataArray to a RasterArray.

Parameters

ds
    The xarray DataArray to convert.
no_data_value
    The value to use for missing data. This should be consistent with the dtype of the data.
lat_col
    The name of the latitude coordinate in the dataset.
lon_col
    The name of the longitude coordinate in the dataset.
crs
    The coordinate reference system of the data.

Returns

rt.RasterArray
    The RasterArray representation of the input data.

Source code in src/idd_forecast_mbp/map_to_admin_2/pixel_main.py
def to_raster(
    ds: xr.DataArray,
    no_data_value: float | int,
    lat_col: str = "lat",
    lon_col: str = "lon",
    crs: str = "EPSG:4326",
) -> rt.RasterArray:
    """Convert an xarray DataArray to a RasterArray.

    Parameters
    ----------
    ds
        The xarray DataArray to convert.
    no_data_value
        The value to use for missing data. This should be consistent with the dtype of the data.
    lat_col
        The name of the latitude coordinate in the dataset.
    lon_col
        The name of the longitude coordinate in the dataset.
    crs
        The coordinate reference system of the data.

    Returns
    -------
    rt.RasterArray
        The RasterArray representation of the input data.
    """
    lat, lon = ds[lat_col].data, ds[lon_col].data

    dlat = (lat[1:] - lat[:-1]).mean()
    dlon = (lon[1:] - lon[:-1]).mean()

    transform = Affine(
        a=dlon,
        b=0.0,
        c=lon[0],
        d=0.0,
        e=-dlat,
        f=lat[-1],
    )
    return rt.RasterArray(
        data=ds.data[::-1],
        transform=transform,
        crs=crs,
        no_data_value=no_data_value,
    )
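
A minimal usage sketch (the file name, variable name, and year selection are placeholders; np.nan is only an appropriate no-data value for floating-point data):

import numpy as np
import xarray as xr

# Sketch only: convert one time slice of a lat/lon DataArray to a RasterArray
# in WGS84. The dataset path and selection are placeholders.
da = xr.open_dataset("climate_variable.nc")["value"].sel(year=2022)
raster = to_raster(da, no_data_value=np.nan, lat_col="lat", lon_col="lon", crs="EPSG:4326")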