
map_to_admin_2

01_prep_maps

batch_process_all_covariates(output_dir=None, skip_existing=True)

Process all covariates in the COVARIATE_DICT and convert them to netCDF.

Parameters:

output_dir : str or Path, optional
    Directory to save output files. If None, uses the default directory.
skip_existing : bool, default=True
    If True, skip processing covariates that already have output files.

Returns:

dict: Dictionary with covariate names as keys and output paths as values
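
Example (a minimal usage sketch; the output directory is a hypothetical path):

results = batch_process_all_covariates(
    output_dir="/tmp/covariate_netcdfs",  # hypothetical output location
    skip_existing=True,
)
failed = [name for name, path in results.items() if path is None]
print(f"{len(results) - len(failed)} of {len(results)} covariates converted")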

Source code in src/idd_forecast_mbp/map_to_admin_2/01_prep_maps.py
def batch_process_all_covariates(output_dir=None, skip_existing=True):
    """
    Process all covariates in the COVARIATE_DICT and convert them to netCDF.

    Parameters:
    -----------
    output_dir : str or Path, optional
        Directory to save output files. If None, uses default directory.
    skip_existing : bool, default=True
        If True, skip processing covariates that already have output files.

    Returns:
    --------
    dict: Dictionary with covariate names as keys and output paths as values
    """
    if output_dir is None:
        output_dir = OUTPUT_PATH
    else:
        output_dir = Path(output_dir)

    os.makedirs(output_dir, exist_ok=True)

    results = {}

    for covariate_key in COVARIATE_DICT.keys():
        print(f"\nProcessing {covariate_key}...")
        covariate_dict = parse_yaml_dictionary(covariate_key)
        covariate_name = covariate_dict['covariate_name']

        output_path = output_dir / f"{covariate_name}.nc"

        # Skip if output file already exists and skip_existing is True
        if skip_existing and os.path.exists(output_path):
            print(f"Output file {output_path} already exists. Skipping.")
            results[covariate_name] = str(output_path)
            continue

        try:
            synoptic = covariate_dict['synoptic']
            years = covariate_dict['years']

            if synoptic or len(years) == 1:
                # Process as synoptic variable
                results[covariate_name] = process_synoptic_variable(covariate_key, output_dir)
            else:
                # Process as multi-year variable
                results[covariate_name] = process_multiyear_variable(covariate_key, output_dir)
        except Exception as e:
            print(f"Error processing {covariate_name}: {str(e)}")
            results[covariate_name] = None

    # Display summary
    print("\n==== Processing Summary ====")
    success_count = sum(1 for path in results.values() if path is not None)
    print(f"Successfully processed {success_count} out of {len(results)} covariates")

    return results

create_multiyear_netcdf(input_path_template, years, output_path, variable_name)

Create a multi-year netCDF file from individual GeoTIFF files.

Parameters:

input_path_template : str
    Template path with {year} to be replaced for each year
years : list of int
    List of years to process
output_path : str
    Path to save the combined netCDF file
variable_name : str
    Name of the variable in the output file

Returns:

xr.Dataset: The created dataset with time dimension
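
Example (a hypothetical template and year range; {year} is substituted per file):

ds = create_multiyear_netcdf(
    input_path_template="/data/covariates/precip_{year}.tif",  # hypothetical input
    years=list(range(2000, 2005)),
    output_path="/tmp/precip_2000_2004.nc",                    # hypothetical output
    variable_name="precipitation",
)
if ds is not None:
    print(ds.time.values)  # integer years present in the combined file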

Source code in src/idd_forecast_mbp/map_to_admin_2/01_prep_maps.py
def create_multiyear_netcdf(input_path_template, years, output_path, variable_name):
    """
    Create a multi-year netCDF file from individual GeoTIFF files.

    Parameters:
    -----------
    input_path_template : str
        Template path with {year} to be replaced for each year
    years : list of int
        List of years to process
    output_path : str
        Path to save the combined netCDF file
    variable_name : str
        Name of the variable in the output file

    Returns:
    --------
    xr.Dataset: The created dataset with time dimension
    """
    first_file = True
    combined_ds = None

    for year in years:
        input_path = input_path_template.format(year=year)

        # Check if file exists
        if not os.path.exists(input_path):
            print(f"Warning: File {input_path} does not exist. Skipping.")
            continue

        # Create a temporary dataset
        temp_path = f"/tmp/{os.path.basename(input_path)}.nc"
        temp_ds = geotiff_to_netcdf(input_path, temp_path, variable_name, year=year)

        # For the first valid file, initialize the combined dataset
        if first_file:
            combined_ds = temp_ds
            first_file = False
        else:
            # Concatenate along time dimension
            combined_ds = xr.concat([combined_ds, temp_ds], dim="time")

        # Remove temporary file
        os.remove(temp_path)

    # Sort by time
    if combined_ds is not None:
        combined_ds = combined_ds.sortby("time")

        # Define encoding for the combined file
        encoding = {
            "value": {"zlib": True, "complevel": 5, "dtype": "float32"},
            "lon": {"dtype": "float32", "zlib": True, "complevel": 5},
            "lat": {"dtype": "float32", "zlib": True, "complevel": 5},
            "time": {"dtype": "int32"}
        }

        # Update time attributes to ensure it's treated as simple year integers
        combined_ds.time.attrs = {
            "long_name": "year",
            "standard_name": "year",
            "units": "year"
        }

        # Save the combined dataset
        combined_ds.to_netcdf(output_path, encoding=encoding)

        return combined_ds
    else:
        print("No valid data files found.")
        return None

geotiff_to_netcdf(input_path, output_path, variable_name, resolution=None, year=None, encoding=None, global_extent=True, fill_value=0.0)

Convert a GeoTIFF file to netCDF format.

Parameters:

input_path : str
    Path to the input GeoTIFF file
output_path : str
    Path to save the output netCDF file
variable_name : str
    Name of the variable to use in the netCDF file
resolution : float, optional
    Resolution of the raster in degrees. If None, it is calculated from the transform.
year : int, optional
    Year for the data (for the time dimension)
encoding : dict, optional
    Encoding options for xarray.to_netcdf()
global_extent : bool, default=True
    If True, extend the raster to global lat/lon coverage (-180 to 180, -90 to 90)
fill_value : float, default=0.0
    Value to fill extended areas with

Returns:

xr.Dataset: The created dataset
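
Example (hypothetical paths; passing year adds a length-1 integer time dimension):

ds = geotiff_to_netcdf(
    input_path="/data/covariates/precip_2020.tif",  # hypothetical input
    output_path="/tmp/precip_2020.nc",              # hypothetical output
    variable_name="precipitation",
    year=2020,
    global_extent=True,  # pad to -180..180 / -90..90, filling new cells with fill_value
)
print(ds["value"].dims)  # ('time', 'lat', 'lon')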

Source code in src/idd_forecast_mbp/map_to_admin_2/01_prep_maps.py
def geotiff_to_netcdf(input_path, output_path, variable_name, resolution=None, year=None, encoding=None,
                      global_extent=True, fill_value=0.0):
    """
    Convert a GeoTIFF file to netCDF format.

    Parameters:
    -----------
    input_path : str
        Path to the input GeoTIFF file
    output_path : str
        Path to save the output netCDF file
    variable_name : str
        Name of the variable to use in the netCDF file
    resolution : float, optional
        Resolution of the raster in degrees. If None, will be calculated from the transform.
    year : int, optional
        Year for the data (for time dimension)
    encoding : dict, optional
        Encoding options for xarray.to_netcdf()
    global_extent : bool, default=True
        If True, extend the raster to global lat/lon coverage (-180 to 180, -90 to 90)
    fill_value : float, default=0.0
        Value to fill extended areas with

    Returns:
    --------
    xr.Dataset: The created dataset
    """
    # Create output directory if it doesn't exist
    os.makedirs(os.path.dirname(output_path), exist_ok=True)

    # Open the GeoTIFF file
    with rio.open(input_path) as src:
        # Read the data and transform information
        data = src.read(1)  # Read first band
        height, width = data.shape

        # Handle no-data values
        if src.nodata is not None:
            data = np.where(data == src.nodata, np.nan, data)

        # Get geospatial information
        transform = src.transform

        # Use provided resolution or calculate it from transform
        if resolution is None:
            x_res = transform[0]  # Width of a pixel
            y_res = abs(transform[4])  # Height of a pixel
        else:
            x_res = y_res = resolution  # Use the square pixels resolution from YAML

        # Calculate bounds correctly with full pixel coverage
        xmin = transform[2]  # Left edge
        ymax = transform[5]  # Top edge

        # Use resolution to calculate exact right and bottom edges
        xmax = xmin + (width * x_res)  # Right edge (including full pixel width)
        ymin = ymax - (height * y_res)  # Bottom edge (including full pixel height)

        if global_extent:
            # Define global extents
            global_xmin, global_xmax = -180.0, 180.0
            global_ymin, global_ymax = -90.0, 90.0

            # Calculate number of pixels needed for global coverage
            global_width = int(np.ceil((global_xmax - global_xmin) / x_res))
            global_height = int(np.ceil((global_ymax - global_ymin) / y_res))

            # Create empty global array filled with fill_value
            global_data = np.full((global_height, global_width), fill_value, dtype=data.dtype)

            # Calculate indices to place original data
            x_start = int(round((xmin - global_xmin) / x_res))
            y_start = int(round((global_ymax - ymax) / y_res))

            # Ensure indices are within bounds
            x_start = max(0, x_start)
            y_start = max(0, y_start)

            # Place the original data into the global grid
            x_end = min(x_start + width, global_width)
            y_end = min(y_start + height, global_height)

            # Calculate how much of the original data we can fit
            orig_x_slice = slice(0, x_end - x_start)
            orig_y_slice = slice(0, y_end - y_start)

            # Place the data
            global_data[y_start:y_end, x_start:x_end] = data[orig_y_slice, orig_x_slice]

            # Use this data for the rest of the function
            data = global_data
            xmin, xmax = global_xmin, global_xmax
            ymin, ymax = global_ymin, global_ymax
            width, height = global_width, global_height

        # Create coordinate arrays for pixel centers (not edges)
        lons = np.linspace(xmin + x_res/2, xmax - x_res/2, width)
        lats = np.linspace(ymax - y_res/2, ymin + y_res/2, height)

        # Create xarray dataset
        if year is not None:
            # Create a dataset with time dimension using integer year
            time_val = np.array([int(year)])  # Ensure it's an integer in an array
            ds = xr.Dataset(
                {"value": (["time", "lat", "lon"], data[np.newaxis, :, :])},
                coords={
                    "lon": lons,
                    "lat": lats,
                    "time": time_val
                }
            )

            # Add time coordinate attributes indicating it's just a year, not a timestamp
            ds.time.attrs = {
                "long_name": "year",
                "standard_name": "year",
                "units": "year"
            }
        else:
            # Create a dataset without time dimension (synoptic)
            ds = xr.Dataset(
                {"value": (["lat", "lon"], data)},
                coords={
                    "lon": lons,
                    "lat": lats
                }
            )

        # Add variable attributes
        ds["value"].attrs = {
            "long_name": variable_name.replace("_", " "),
            "units": src.meta.get("units", "unknown"),
            "resolution": x_res  # Add resolution to metadata
        }

        # Add coordinate attributes
        ds.lon.attrs = {
            "long_name": "longitude",
            "standard_name": "longitude",
            "units": "degrees_east",
            "axis": "X",
            "resolution": x_res
        }

        ds.lat.attrs = {
            "long_name": "latitude",
            "standard_name": "latitude",
            "units": "degrees_north",
            "axis": "Y",
            "resolution": y_res
        }

        # Add global attributes
        ds.attrs = {
            "title": f"{variable_name} data",
            "source": f"Converted from GeoTIFF: {os.path.basename(input_path)}",
            "created": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "Conventions": "CF-1.8",
            "history": f"Created from {input_path}",
            "pixel_size_degrees": x_res,
            "pixel_registration": "center"
        }

        # Use default encoding if none provided
        if encoding is None:
            encoding = {
                "value": {"zlib": True, "complevel": 5, "dtype": "float32"},
                "lon": {"dtype": "float32", "zlib": True, "complevel": 5},
                "lat": {"dtype": "float32", "zlib": True, "complevel": 5}
            }
            if year is not None:
                # Ensure time is stored as a simple integer with no date units
                encoding["time"] = {"dtype": "int32"}

        # Save the dataset to netCDF
        ds.to_netcdf(output_path, encoding=encoding)

        # Set proper permissions
        set_file_permissions(output_path)

        return ds

main()

Main function to run when script is executed directly.

Source code in src/idd_forecast_mbp/map_to_admin_2/01_prep_maps.py
def main():
    """Main function to run when script is executed directly."""
    results = batch_process_all_covariates(skip_existing=False)

    # Print successful conversions
    print("\nSuccessfully converted covariates:")
    for covariate, path in results.items():
        if path:
            print(f"- {covariate}: {path}")

    # Print failed conversions
    print("\nFailed conversions:")
    for covariate, path in results.items():
        if not path:
            print(f"- {covariate}")

process_multiyear_variable(covariate_key, output_dir)

Process a multi-year variable and convert to a single netCDF with time dimension.

Parameters:

covariate_key : str
    Key of the covariate in the COVARIATE_DICT
output_dir : str or Path
    Directory to save the output netCDF file

Returns:

str: Path to the output netCDF file
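
Example (the covariate key and directory are hypothetical; the key must exist in the COVARIATE_DICT YAML):

nc_path = process_multiyear_variable(
    covariate_key="precipitation",        # hypothetical key
    output_dir="/tmp/covariate_netcdfs",  # hypothetical directory
)
print(nc_path)  # .../<covariate_name>.nc, or None if no input files were found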

Source code in src/idd_forecast_mbp/map_to_admin_2/01_prep_maps.py
def process_multiyear_variable(covariate_key, output_dir):
    """
    Process a multi-year variable and convert to a single netCDF with time dimension.

    Parameters:
    -----------
    covariate_key : str
        Key of the covariate in the COVARIATE_DICT
    output_dir : str or Path
        Directory to save the output netCDF file

    Returns:
    --------
    str: Path to the output netCDF file
    """
    # Get dictionary from YAML
    covariate_dict = parse_yaml_dictionary(covariate_key)
    covariate_name = covariate_dict.get('covariate_name', covariate_key)
    path_template = covariate_dict['path']
    years = covariate_dict['years']
    resolution = covariate_dict.get('covariate_resolution', None)

    if not years:
        print(f"No years specified for {covariate_name}. Cannot process as multi-year variable.")
        return None

    # Create output directory if it doesn't exist
    if isinstance(output_dir, str):
        output_dir = Path(output_dir)
    os.makedirs(output_dir, exist_ok=True)

    # Define output path
    output_path = output_dir / f"{covariate_name}.nc"

    # Update create_multiyear_netcdf to pass resolution to geotiff_to_netcdf
    # This requires modifying the create_multiyear_netcdf function

    # For now, pass resolution directly to each geotiff_to_netcdf call
    first_file = True
    combined_ds = None

    for year in years:
        input_path = path_template.format(year=year)

        # Check if file exists
        if not os.path.exists(input_path):
            print(f"Warning: File {input_path} does not exist. Skipping.")
            continue

        # Create a temporary dataset
        temp_path = f"/tmp/{os.path.basename(input_path)}.nc"
        temp_ds = geotiff_to_netcdf(
            input_path=input_path, 
            output_path=temp_path, 
            variable_name=covariate_name, 
            resolution=resolution,
            year=year
        )

        # For the first valid file, initialize the combined dataset
        if first_file:
            combined_ds = temp_ds
            first_file = False
        else:
            # Concatenate along time dimension
            combined_ds = xr.concat([combined_ds, temp_ds], dim="time")

        # Remove temporary file
        os.remove(temp_path)

    if combined_ds is not None:
        # Sort by time
        combined_ds = combined_ds.sortby("time")

        # Define encoding for the combined file
        encoding = {
            "value": {"zlib": True, "complevel": 5, "dtype": "float32"},
            "lon": {"dtype": "float32", "zlib": True, "complevel": 5},
            "lat": {"dtype": "float32", "zlib": True, "complevel": 5},
            "time": {"dtype": "int32"}
        }

        # Save the combined dataset
        combined_ds.to_netcdf(output_path, encoding=encoding)

        # Set proper permissions
        set_file_permissions(output_path)

    return str(output_path) if combined_ds is not None else None

process_synoptic_variable(covariate_key, output_dir)

Process a synoptic (time-invariant) variable and convert to netCDF.

Parameters:

covariate_key : str
    Key of the covariate in the COVARIATE_DICT
output_dir : str or Path
    Directory to save the output netCDF file

Returns:

str: Path to the output netCDF file
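
Example (hypothetical key for a time-invariant covariate such as elevation):

nc_path = process_synoptic_variable(
    covariate_key="elevation",            # hypothetical key
    output_dir="/tmp/covariate_netcdfs",  # hypothetical directory
)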

Source code in src/idd_forecast_mbp/map_to_admin_2/01_prep_maps.py
def process_synoptic_variable(covariate_key, output_dir):
    """
    Process a synoptic (time-invariant) variable and convert to netCDF.

    Parameters:
    -----------
    covariate_key : str
        Key of the covariate in the COVARIATE_DICT
    output_dir : str or Path
        Directory to save the output netCDF file

    Returns:
    --------
    str: Path to the output netCDF file
    """
    # Get dictionary from YAML
    covariate_dict = parse_yaml_dictionary(covariate_key)
    covariate_name = covariate_dict.get('covariate_name', covariate_key)
    path = covariate_dict['path']
    resolution = covariate_dict.get('covariate_resolution', None)

    # Create output directory if it doesn't exist
    if isinstance(output_dir, str):
        output_dir = Path(output_dir)
    os.makedirs(output_dir, exist_ok=True)

    # Define output path
    output_path = output_dir / f"{covariate_name}.nc"

    # Convert to netCDF with resolution from YAML
    geotiff_to_netcdf(
        input_path=path,
        output_path=str(output_path),
        variable_name=covariate_name,
        resolution=resolution
    )

    return str(output_path)

set_file_permissions(file_path)

Set 775 permissions (rwxrwxr-x) on the specified file.

Source code in src/idd_forecast_mbp/map_to_admin_2/01_prep_maps.py
def set_file_permissions(file_path):
    """Set 775 permissions (rwxrwxr-x) on the specified file."""
    try:
        os.chmod(file_path, 0o775)  # 0o775 is octal for 775 permissions
    except Exception as e:
        print(f"Warning: Could not set permissions on {file_path}: {str(e)}")

pixel_hierarchy

aggregate_climate_to_hierarchy(data: pd.DataFrame, hierarchy: pd.DataFrame) -> pd.DataFrame

Create all aggregate climate values for a given hierarchy from most-detailed data.

Parameters

data
    The most-detailed climate data to aggregate.
hierarchy
    The hierarchy to aggregate the data to.

Returns

pd.DataFrame
    The climate data with values for all levels of the hierarchy.
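
Example (a toy sketch; column names are inferred from the function body, with most-detailed locations 3 and 4 rolling up to parent 2, which rolls up to the root 1):

import pandas as pd

hierarchy = pd.DataFrame({
    "location_id": [1, 2, 3, 4],
    "parent_id":   [1, 1, 2, 2],
    "level":       [0, 1, 2, 2],
})
data = pd.DataFrame({
    "location_id":      [3, 4],
    "year_id":          [2020, 2020],
    "weighted_climate": [10.0, 30.0],
    "population":       [100.0, 300.0],
})
agg = aggregate_climate_to_hierarchy(data, hierarchy)
# agg keeps the rows for 3 and 4 and adds rows for locations 2 and 1
# with summed weighted_climate and population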

Source code in src/idd_forecast_mbp/map_to_admin_2/pixel_hierarchy.py
def aggregate_climate_to_hierarchy(
    data: pd.DataFrame, hierarchy: pd.DataFrame
) -> pd.DataFrame:
    """Create all aggregate climate values for a given hierarchy from most-detailed data.

    Parameters
    ----------
    data
        The most-detailed climate data to aggregate.
    hierarchy
        The hierarchy to aggregate the data to.

    Returns
    -------
    pd.DataFrame
        The climate data with values for all levels of the hierarchy.
    """
    results = data.set_index("location_id").copy()

    # Most detailed locations can be at multiple levels of the hierarchy,
    # so we loop over all levels from most detailed to global, aggregating
    # level by level and appending the results to the data.
    for level in reversed(list(range(1, hierarchy.level.max() + 1))):
        level_mask = hierarchy.level == level
        parent_map = hierarchy.loc[level_mask].set_index("location_id").parent_id

        # For every location in the parent map, check whether it is in the results.
        # For those that are, proceed with the aggregation.
        # For those that aren't, make sure their parent is in the results; if not, raise an error.
        absent_parent_map = parent_map.index.difference(results.index)
        if len(absent_parent_map) > 0:
            msg = f"Some parent locations are not in the results: {absent_parent_map}"
            # Check to see if the parent of each location id that is missing is in the results
            parent_of_absent = parent_map.loc[absent_parent_map]
            unique_parent_ids = parent_of_absent.unique()
            # Check to see if the unique_parent_ids are in the results
            missing_parents = unique_parent_ids[~np.isin(unique_parent_ids, results.index)]
            if len(missing_parents) > 0:
                msg = f"Some parent locations are not in the results: {missing_parents}"
                raise ValueError(msg)

        present_parent_map = parent_map.loc[parent_map.index.isin(results.index)]
        # Continue aggregation only on the present locations
        subset = results.loc[present_parent_map.index]
        subset["parent_id"] = present_parent_map

        parent_values = (
            subset.groupby(["year_id", "parent_id"])[["weighted_climate", "population"]]
            .sum()
            .reset_index()
            .rename(columns={"parent_id": "location_id"})
            .set_index("location_id")
        )
        results = pd.concat([results, parent_values])
    results = (
        results.reset_index()
        .sort_values(["location_id", "year_id"])
    )
    # Recompute the per-capita value from the aggregated totals for all locations
    results["value"] = results.weighted_climate / results.population
    return results

load_subset_hierarchy(subset_hierarchy: str) -> pd.DataFrame

Load a subset location hierarchy.

The subset hierarchy might be equal to the full aggregation hierarchy, but it might also be a subset of the full aggregation hierarchy. These hierarchies are used to provide different views of aggregated climate data.

Parameters

subset_hierarchy
    The administrative hierarchy to load (e.g. "gbd_2021")

Returns

pd.DataFrame
    The hierarchy data with parent-child relationships
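
Example (one of the allowed hierarchy names; the parquet path is fixed inside the function):

hierarchy = load_subset_hierarchy("gbd_2021")
# columns are assumed to include location_id and parent_id
print(hierarchy[["location_id", "parent_id"]].head())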

Source code in src/idd_forecast_mbp/map_to_admin_2/pixel_hierarchy.py
def load_subset_hierarchy(subset_hierarchy: str) -> pd.DataFrame:
    """Load a subset location hierarchy.

    The subset hierarchy might be equal to the full aggregation hierarchy,
    but it might also be a subset of the full aggregation hierarchy.
    These hierarchies are used to provide different views of aggregated
    climate data.

    Parameters
    ----------
    subset_hierarchy
        The administrative hierarchy to load (e.g. "gbd_2021")

    Returns
    -------
    pd.DataFrame
        The hierarchy data with parent-child relationships
    """
    root = Path("/mnt/team/rapidresponse/pub/population-model/admin-inputs/raking")
    allowed_hierarchies = ["gbd_2021", "fhs_2021", "lsae_1209", "lsae_1285"]
    if subset_hierarchy not in allowed_hierarchies:
        msg = f"Unknown admin hierarchy: {subset_hierarchy}"
        raise ValueError(msg)
    path = root / "gbd-inputs" / f"hierarchy_{subset_hierarchy}.parquet"
    return pd.read_parquet(path)

post_process(df: pd.DataFrame, pop_df: pd.DataFrame) -> pd.DataFrame

Rename column "000" to {summary_covariate}_per_capita, merge in population, and create {summary_covariate} as {summary_covariate}_per_capita * population.

Source code in src/idd_forecast_mbp/map_to_admin_2/pixel_hierarchy.py
def post_process(df: pd.DataFrame, pop_df: pd.DataFrame) -> pd.DataFrame: # Fix this for other summary_variable/variable/etc
    """
    Rename column "000" to {summary_covariate}_per_capita
    Merge in population
    Create {summary_covariate} = {summary_covariate}_per_capita * population
    """

    # Rename 000 to people_flood_days_per_capita
    df = df.rename(columns={"000": f"{summary_covariate}_per_capita"})

    # Merge in population
    full_df = df.merge(
        pop_df,
        on=["location_id", "year_id"],
        how="left",
    )
    # assert all location_ids and years combinations are present
    assert df.shape[0] == full_df.shape[0]
    assert df.location_id.nunique() == full_df.location_id.nunique()
    assert df.year_id.nunique() == full_df.year_id.nunique()

    # Create {summary_covariate}
    full_df[summary_covariate] = (
        full_df[f"{summary_covariate}_per_capita"] * full_df["population"]
    ).astype(np.float32)

    return full_df

pixel_main

build_bounds_map(raster_template: rt.RasterArray, shape_values: list[tuple[Polygon | MultiPolygon, int]]) -> dict[int, tuple[slice, slice]]

Build a map of location IDs to buffered slices of the raster template.

Parameters

raster_template
    The raster template to build the bounds map for.
shape_values
    A list of tuples where the first element is a shapely Polygon or MultiPolygon in the CRS of the raster template and the second element is the location ID of the shape.

Returns

dict[int, tuple[slice, slice]]
    A dictionary mapping location IDs to a tuple of slices representing the bounds of the location in the raster template. The slices are buffered by 10 pixels to ensure that the entire shape is included in the mask.
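
Example (a hypothetical sketch; raster_template is assumed to be an rt.RasterArray loaded elsewhere, and the geometries are stand-ins for admin-2 shapes in its CRS):

import shapely

shape_values = [
    (shapely.box(10.0, 40.0, 12.0, 42.0), 101),  # hypothetical shape and location ID
    (shapely.box(12.0, 40.0, 14.0, 42.0), 102),
]
bounds_map = build_bounds_map(raster_template, shape_values)
rows, cols = bounds_map[101]  # buffered row and column slices into the template grid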

Source code in src/idd_forecast_mbp/map_to_admin_2/pixel_main.py
def build_bounds_map(
    raster_template: rt.RasterArray,
    shape_values: list[tuple[Polygon | MultiPolygon, int]],
) -> dict[int, tuple[slice, slice]]:
    """Build a map of location IDs to buffered slices of the raster template.

    Parameters
    ----------
    raster_template
        The raster template to build the bounds map for.
    shape_values
        A list of tuples where the first element is a shapely Polygon or MultiPolygon
        in the CRS of the raster template and the second element is the location ID
        of the shape.

    Returns
    -------
    dict[int, tuple[slice, slice]]
        A dictionary mapping location IDs to a tuple of slices representing the bounds
        of the location in the raster template. The slices are buffered by 10 pixels
        to ensure that the entire shape is included in the mask.
    """
    # The transform maps pixel coordinates to CRS coordinates;
    # its inverse maps CRS coordinates back to pixel coordinates.
    to_pixel = ~raster_template.transform

    bounds_map = {}
    for shp, loc_id in shape_values:
        xmin, ymin, xmax, ymax = shp.bounds
        pxmin, pymin = to_pixel * (xmin, ymax)
        pixel_buffer = 10
        pxmin = max(0, int(pxmin) - pixel_buffer)
        pymin = max(0, int(pymin) - pixel_buffer)
        pxmax, pymax = to_pixel * (xmax, ymin)
        pxmax = min(raster_template.width, int(pxmax) + pixel_buffer)
        pymax = min(raster_template.height, int(pymax) + pixel_buffer)
        bounds_map[loc_id] = (slice(pymin, pymax), slice(pxmin, pxmax))

    return bounds_map

get_bbox(raster: rt.RasterArray, crs: str | None = None) -> shapely.Polygon

Get the bounding box of a raster array.

Parameters

raster
    The raster array to get the bounding box of.
crs
    The CRS to return the bounding box in. If None, the bounding box is returned in the CRS of the raster.

Returns

shapely.Polygon
    The bounding box of the raster in the CRS specified by the crs parameter.
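
Example (assumes raster is an rt.RasterArray whose CRS appears in MAX_BOUNDS):

bbox = get_bbox(raster, crs="EPSG:4326")  # reproject the bounding box to WGS84
xmin, ymin, xmax, ymax = bbox.bounds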

Source code in src/idd_forecast_mbp/map_to_admin_2/pixel_main.py
def get_bbox(raster: rt.RasterArray, crs: str | None = None) -> shapely.Polygon:
    """Get the bounding box of a raster array.

    Parameters
    ----------
    raster
        The raster array to get the bounding box of.
    crs
        The CRS to return the bounding box in. If None, the bounding box
        is returned in the CRS of the raster.

    Returns
    -------
    shapely.Polygon
        The bounding box of the raster in the CRS specified by the crs parameter.
    """
    if raster.crs not in MAX_BOUNDS:
        msg = f"Unsupported CRS: {raster.crs}"
        raise ValueError(msg)

    xmin_clip, xmax_clip, ymin_clip, ymax_clip = MAX_BOUNDS[raster.crs]
    xmin, xmax, ymin, ymax = raster.bounds

    xmin = np.clip(xmin, xmin_clip, xmax_clip)
    xmax = np.clip(xmax, xmin_clip, xmax_clip)
    ymin = np.clip(ymin, ymin_clip, ymax_clip)
    ymax = np.clip(ymax, ymin_clip, ymax_clip)

    bbox = gpd.GeoSeries([shapely.box(xmin, ymin, xmax, ymax)], crs=raster.crs)
    out_bbox = bbox.to_crs(crs) if crs is not None else bbox.copy()

    # Check that our transformation didn't do something weird
    # (e.g. artificially clip the bounds or have the bounds extend over the
    # antimeridian)
    check_bbox = out_bbox.to_crs(raster.crs)
    area_change = (np.abs(bbox.area - check_bbox.area) / bbox.area).iloc[0]
    tolerance = 1e-6
    if area_change > tolerance:
        msg = f"Area change: {area_change}"
        raise ValueError(msg)

    return cast(shapely.Polygon, out_bbox.iloc[0])

load_raking_shapes(full_aggregation_hierarchy: str, bounds: tuple[float, float, float, float]) -> gpd.GeoDataFrame

Load shapes for a full aggregation hierarchy within given bounds.

Parameters

full_aggregation_hierarchy
    The full aggregation hierarchy to load (e.g. "gbd_2021")
bounds
    The bounds to load (xmin, ymin, xmax, ymax)

Returns

gpd.GeoDataFrame
    The shapes for the given hierarchy and bounds
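
Example (the bounds values here are hypothetical and given as (xmin, ymin, xmax, ymax)):

shapes = load_raking_shapes("gbd_2021", bounds=(-20.0, -35.0, 55.0, 40.0))
print(len(shapes), shapes.crs)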

Source code in src/idd_forecast_mbp/map_to_admin_2/pixel_main.py
def load_raking_shapes(full_aggregation_hierarchy: str, bounds: tuple[float, float, float, float]
) -> gpd.GeoDataFrame:
    """Load shapes for a full aggregation hierarchy within given bounds.

    Parameters
    ----------
    full_aggregation_hierarchy
        The full aggregation hierarchy to load (e.g. "gbd_2021")
    bounds
        The bounds to load (xmin, ymin, xmax, ymax)

    Returns
    -------
    gpd.GeoDataFrame
        The shapes for the given hierarchy and bounds
    """
    root = Path("/mnt/team/rapidresponse/pub/population-model/admin-inputs/raking")
    if full_aggregation_hierarchy in ["gbd_2021", "gbd_2023"]:
        shape_path = root / f"shapes_{full_aggregation_hierarchy}.parquet"
        gdf = gpd.read_parquet(shape_path, bbox=bounds)

        # We're using population data here instead of a hierarchy because the
        # populations include extra locations we've supplemented that aren't
        # modeled in GBD (e.g. locations with zero population, or places GBD
        # models using population scalars from WPP)
        pop_path = (
            root / f"population_{full_aggregation_hierarchy}.parquet"
        )
        pop = pd.read_parquet(pop_path)

        keep_cols = ["location_id", "location_name", "most_detailed", "parent_id"]
        keep_mask = (
            (pop.year_id == pop.year_id.max())  # Year doesn't matter
            & (pop.most_detailed == 1)
        )
        out = gdf.merge(pop.loc[keep_mask, keep_cols], on="location_id", how="left")
    elif full_aggregation_hierarchy in ["lsae_1209", "lsae_1285"]:
        # This is only a2 geoms, so already most detailed
        shape_path = (
            root
            / "gbd-inputs"
            / f"shapes_{full_aggregation_hierarchy}_a2.parquet"
        )
        out = gpd.read_parquet(shape_path, bbox=bounds)
    else:
        msg = f"Unknown pixel hierarchy: {full_aggregation_hierarchy}"
        raise ValueError(msg)
    return out

to_raster(ds: xr.DataArray, no_data_value: float | int, lat_col: str = 'lat', lon_col: str = 'lon', crs: str = 'EPSG:4326') -> rt.RasterArray

Convert an xarray DataArray to a RasterArray.

Parameters

ds
    The xarray DataArray to convert.
no_data_value
    The value to use for missing data. This should be consistent with the dtype of the data.
lat_col
    The name of the latitude coordinate in the dataset.
lon_col
    The name of the longitude coordinate in the dataset.
crs
    The coordinate reference system of the data.

Returns

rt.RasterArray
    The RasterArray representation of the input data.
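
Example (a toy DataArray on a regular 1-degree lat/lon grid, using the default coordinate names):

import numpy as np
import xarray as xr

da = xr.DataArray(
    np.random.rand(180, 360).astype("float32"),
    coords={"lat": np.arange(-89.5, 90.0, 1.0), "lon": np.arange(-179.5, 180.0, 1.0)},
    dims=["lat", "lon"],
)
raster = to_raster(da, no_data_value=np.nan)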

Source code in src/idd_forecast_mbp/map_to_admin_2/pixel_main.py
def to_raster(
    ds: xr.DataArray,
    no_data_value: float | int,
    lat_col: str = "lat",
    lon_col: str = "lon",
    crs: str = "EPSG:4326",
) -> rt.RasterArray:
    """Convert an xarray DataArray to a RasterArray.

    Parameters
    ----------
    ds
        The xarray DataArray to convert.
    no_data_value
        The value to use for missing data. This should be consistent with the dtype of the data.
    lat_col
        The name of the latitude coordinate in the dataset.
    lon_col
        The name of the longitude coordinate in the dataset.
    crs
        The coordinate reference system of the data.

    Returns
    -------
    rt.RasterArray
        The RasterArray representation of the input data.
    """
    lat, lon = ds[lat_col].data, ds[lon_col].data

    dlat = (lat[1:] - lat[:-1]).mean()
    dlon = (lon[1:] - lon[:-1]).mean()

    transform = Affine(
        a=dlon,
        b=0.0,
        c=lon[0],
        d=0.0,
        e=-dlat,
        f=lat[-1],
    )
    return rt.RasterArray(
        data=ds.data[::-1],
        transform=transform,
        crs=crs,
        no_data_value=no_data_value,
    )