Climate Data

This package contains modules for extracting, processing, harmonizing, and downscaling climate data. It sources historical climate data from the European Centre for Medium-Range Weather Forecasts (ECMWF) ERA5 dataset and future climate data from the Coupled Model Intercomparison Project Phase 6 (CMIP6).

cli

cdrun() -> None

Entry point for running climate downscale workflows.

Source code in src/climate_data/cli.py
@click.group()
def cdrun() -> None:
    """Entry point for running climate downscale workflows."""

cdtask() -> None

Entry point for running climate downscale tasks.

Source code in src/climate_data/cli.py
@click.group()
def cdtask() -> None:
    """Entry point for running climate downscale tasks."""

cli_options

Climate Data CLI Options

This module provides a set of CLI options for extracting climate data from the ERA5 and CMIP6 datasets. These options are used to specify the data to extract, such as the year, month, variable, and dataset. It also provides global variables representing the full space of valid values for these options.

with_year(years: Collection[str], *, allow_all: bool = False) -> Callable[[Callable[P, T]], Callable[P, T]]

Create a CLI option for selecting a year.

Source code in src/climate_data/cli_options.py
def with_year[**P, T](
    years: Collection[str],
    *,
    allow_all: bool = False,
) -> Callable[[Callable[P, T]], Callable[P, T]]:
    """Create a CLI option for selecting a year."""
    return with_choice(
        "year",
        "y",
        allow_all=allow_all,
        choices=years,
        help="Year to extract data for.",
        convert=allow_all,
    )
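
For example, a pipeline command could use with_year to expose a --year/-y option. A minimal sketch (the command name and year range are hypothetical, and with_choice is assumed to behave as defined in this module):

import click

from climate_data.cli_options import with_year

VALID_YEARS = [str(y) for y in range(1990, 2024)]  # hypothetical year range

@click.command()
@with_year(VALID_YEARS, allow_all=True)
def extract_era5(year: str) -> None:  # hypothetical command
    """Extract data for a single year (or all years)."""
    click.echo(f"Extracting data for year {year}")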

data

Climate Data Management

This module provides a class for managing the climate data used in the project, including methods for loading and saving data and for accessing the various directories where data is stored. This abstraction gives easy, consistent access to the data and ensures that all data is stored in an organized manner. It also centralizes path management, making the project's data layout easier to update and maintain.

This module generally does not load or process data itself, though some exceptions are made for metadata, which is generally loaded and cached on disk.

ClimateData

Class for managing the climate data used in the project.

Source code in src/climate_data/data.py
class ClimateData:
    """Class for managing the climate data used in the project."""

    def __init__(
        self, root: str | Path = cdc.MODEL_ROOT, *, create_root: bool = True
    ) -> None:
        self._root = Path(root)
        self._credentials_root = self._root / "credentials"
        if create_root:
            self._create_model_root()

    def _create_model_root(self) -> None:
        mkdir(self.root, exist_ok=True)
        mkdir(self.credentials_root, exist_ok=True)

        mkdir(self.extracted_data, exist_ok=True)
        mkdir(self.extracted_era5, exist_ok=True)
        mkdir(self.extracted_cmip6, exist_ok=True)
        mkdir(self.ncei_climate_stations, exist_ok=True)
        mkdir(self.open_topography_elevation, exist_ok=True)
        mkdir(self.rub_local_climate_zones, exist_ok=True)

        mkdir(self.downscale_model, exist_ok=True)
        mkdir(self.predictors, exist_ok=True)
        mkdir(self.training_data, exist_ok=True)

        mkdir(self.results, exist_ok=True)
        mkdir(self.results_metadata, exist_ok=True)
        mkdir(self.daily_results, exist_ok=True)
        mkdir(self.raw_daily_results, exist_ok=True)
        mkdir(self.annual_results, exist_ok=True)
        mkdir(self.raw_annual_results, exist_ok=True)

    @property
    def root(self) -> Path:
        return self._root

    @property
    def credentials_root(self) -> Path:
        return self._credentials_root

    ##################
    # Extracted data #
    ##################

    @property
    def extracted_data(self) -> Path:
        return self.root / "extracted_data"

    @property
    def extracted_era5(self) -> Path:
        return self.extracted_data / "era5"

    def extracted_era5_path(
        self, dataset: str, variable: str, year: int | str, month: str
    ) -> Path:
        return self.extracted_era5 / f"{dataset}_{variable}_{year}_{month}.nc"

    @property
    def extracted_cmip6(self) -> Path:
        return self.extracted_data / "cmip6"

    def load_koppen_geiger_model_inclusion(
        self, *, return_full_criteria: bool = False
    ) -> pd.DataFrame:
        meta_path = self.extracted_cmip6 / "koppen_geiger_model_inclusion.parquet"

        if not meta_path.exists():
            df = pd.read_html(
                "https://www.nature.com/articles/s41597-023-02549-6/tables/3"
            )[0]
            df.columns = [  # type: ignore[assignment]
                "source_id",
                "member_count",
                "mean_trend",
                "std_dev_trend",
                "transient_climate_response",
                "equilibrium_climate_sensitivity",
                "included_raw",
            ]
            df["included"] = df["included_raw"].apply({"Yes": True, "No": False}.get)
            save_parquet(df, meta_path)

        df = pd.read_parquet(meta_path)
        if return_full_criteria:
            return df
        return df[["source_id", "included"]]

    def load_cmip6_metadata(self) -> pd.DataFrame:
        meta_path = self.extracted_cmip6 / "cmip6-metadata.parquet"

        if not meta_path.exists():
            external_path = "https://storage.googleapis.com/cmip6/cmip6-zarr-consolidated-stores.csv"
            meta = pd.read_csv(external_path)
            save_parquet(meta, meta_path)

        return pd.read_parquet(meta_path)

    def extracted_cmip6_path(
        self,
        variable: str,
        experiment: str,
        gcm_member: str,
    ) -> Path:
        return self.extracted_cmip6 / f"{variable}_{experiment}_{gcm_member}.nc"

    def get_gcms(
        self,
        source_variables: Collection[str],
    ) -> list[str]:
        inclusion_meta = self.load_scenario_inclusion_metadata()[source_variables]
        inclusion_meta = inclusion_meta[inclusion_meta.all(axis=1)]
        return [
            f"{model}_{variant}" for model, variant in inclusion_meta.index.tolist()
        ]

    @property
    def ncei_climate_stations(self) -> Path:
        return self.extracted_data / "ncei_climate_stations"

    def save_ncei_climate_stations(self, df: pd.DataFrame, year: int | str) -> None:
        path = self.ncei_climate_stations / f"{year}.parquet"
        save_parquet(df, path)

    def load_ncei_climate_stations(self, year: int | str) -> pd.DataFrame:
        return pd.read_parquet(self.ncei_climate_stations / f"{year}.parquet")

    @property
    def open_topography_elevation(self) -> Path:
        return self.extracted_data / "open_topography_elevation"

    @property
    def rub_local_climate_zones(self) -> Path:
        return self.extracted_data / "rub_local_climate_zones"

    ###################
    # Downscale model #
    ###################

    @property
    def downscale_model(self) -> Path:
        return self.root / "downscale_model"

    @property
    def predictors(self) -> Path:
        return self.downscale_model / "predictors"

    def save_predictor(
        self,
        predictor: rt.RasterArray,
        name: str,
        lat_start: int,
        lon_start: int,
    ) -> None:
        path = self.predictors / f"{name}_{lat_start}_{lon_start}.tif"
        save_raster(predictor, path)

    def load_predictor(self, name: str) -> rt.RasterArray:
        paths = list(self.predictors.glob(f"{name}_*.tif"))
        return rt.load_mf_raster(paths)

    @property
    def training_data(self) -> Path:
        return self.downscale_model / "training_data"

    def save_training_data(self, df: pd.DataFrame, year: int | str) -> None:
        path = self.training_data / f"{year}.parquet"
        save_parquet(df, path)

    def load_training_data(self, year: int | str) -> pd.DataFrame:
        return pd.read_parquet(self.training_data / f"{year}.parquet")

    ###########
    # Results #
    ###########

    @property
    def results(self) -> Path:
        return self.root / "results"

    @property
    def results_metadata(self) -> Path:
        return self.results / "metadata"

    def save_scenario_metadata(self, df: pd.DataFrame) -> None:
        path = self.results_metadata / "scenario_metadata.parquet"
        save_parquet(df, path)

    def load_scenario_metadata(self) -> pd.DataFrame:
        path = self.results_metadata / "scenario_metadata.parquet"
        return pd.read_parquet(path)

    def save_scenario_inclusion_metadata(self, df: pd.DataFrame) -> None:
        # Need to save to our scripts directory for doc building
        scripts_root = Path(__file__).parent.parent.parent / "scripts"
        for root_dir in [self.results_metadata, scripts_root]:
            path = root_dir / "scenario_inclusion_metadata.parquet"
            save_parquet(df, path)

    def load_scenario_inclusion_metadata(self) -> pd.DataFrame:
        path = self.results_metadata / "scenario_inclusion_metadata.parquet"
        return pd.read_parquet(path)

    @property
    def daily_results(self) -> Path:
        return self.results / "daily"

    @property
    def raw_daily_results(self) -> Path:
        return self.daily_results / "raw"

    def raw_daily_results_path(
        self,
        scenario: str,
        variable: str,
        year: int | str,
        gcm_member: str,
    ) -> Path:
        return self.raw_daily_results / scenario / variable / f"{year}_{gcm_member}.nc"

    def save_raw_daily_results(
        self,
        results_ds: xr.Dataset,
        scenario: str,
        variable: str,
        year: int | str,
        gcm_member: str,
        encoding_kwargs: dict[str, Any],
    ) -> None:
        path = self.raw_daily_results_path(scenario, variable, year, gcm_member)
        mkdir(path.parent, exist_ok=True, parents=True)
        save_xarray(results_ds, path, encoding_kwargs)

    def daily_results_path(
        self,
        scenario: str,
        variable: str,
        year: int | str,
    ) -> Path:
        return self.daily_results / scenario / variable / f"{year}.nc"

    def save_daily_results(
        self,
        results_ds: xr.Dataset,
        scenario: str,
        variable: str,
        year: int | str,
        encoding_kwargs: dict[str, Any],
    ) -> None:
        path = self.daily_results_path(scenario, variable, year)
        mkdir(path.parent, exist_ok=True, parents=True)
        save_xarray(results_ds, path, encoding_kwargs)

    def load_daily_results(
        self,
        scenario: str,
        variable: str,
        year: int | str,
    ) -> xr.Dataset:
        results_path = self.daily_results_path(scenario, variable, year)
        return xr.open_dataset(results_path)

    @property
    def annual_results(self) -> Path:
        return self.results / "annual"

    @property
    def raw_annual_results(self) -> Path:
        return self.annual_results / "raw"

    def raw_annual_results_path(
        self,
        scenario: str,
        variable: str,
        year: int | str,
        gcm_member: str,
    ) -> Path:
        return self.raw_annual_results / scenario / variable / f"{year}_{gcm_member}.nc"

    def save_raw_annual_results(
        self,
        results_ds: xr.Dataset,
        scenario: str,
        variable: str,
        year: int | str,
        gcm_member: str,
        encoding_kwargs: dict[str, Any],
    ) -> None:
        path = self.raw_annual_results_path(scenario, variable, year, gcm_member)
        mkdir(path.parent, exist_ok=True, parents=True)
        save_xarray(results_ds, path, encoding_kwargs)

    @property
    def compiled_annual_results(self) -> Path:
        return self.raw_annual_results / "compiled"

    def compiled_annual_results_path(
        self,
        scenario: str,
        variable: str,
        gcm_member: str,
    ) -> Path:
        return self.compiled_annual_results / scenario / variable / f"{gcm_member}.nc"

    def save_compiled_annual_results(
        self,
        results_ds: xr.Dataset,
        scenario: str,
        variable: str,
        gcm_member: str,
    ) -> None:
        path = self.compiled_annual_results_path(scenario, variable, gcm_member)
        mkdir(path.parent, exist_ok=True, parents=True)
        touch(path, clobber=True)
        results_ds.to_netcdf(path)

    def annual_results_path(
        self,
        scenario: str,
        variable: str,
        draw: int | str,
    ) -> Path:
        return self.annual_results / scenario / variable / f"{draw:0>3}.nc"

    def link_annual_draw(
        self,
        draw: int | str,
        scenario: str,
        variable: str,
        gcm_member: str,
    ) -> None:
        source_path = self.compiled_annual_results_path(scenario, variable, gcm_member)
        dest_path = self.annual_results_path(scenario, variable, draw)
        mkdir(dest_path.parent, exist_ok=True, parents=True)
        if dest_path.exists():
            dest_path.unlink()
        dest_path.symlink_to(source_path)
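
Typical usage goes through the path helpers rather than hand-built paths. A minimal sketch (the root directory and argument values are hypothetical):

from climate_data.data import ClimateData

cd = ClimateData("/tmp/climate_model_root")  # hypothetical root

# Deterministic, centrally managed path construction:
era5_path = cd.extracted_era5_path(
    dataset="reanalysis-era5-land",  # hypothetical dataset name
    variable="2m_temperature",
    year=2020,
    month="01",
)

# Round-trip training data through the standard parquet helpers:
# cd.save_training_data(df, year=2020)
# df = cd.load_training_data(year=2020)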

save_parquet(df: pd.DataFrame, output_path: str | Path) -> None

Save a pandas DataFrame to a file with standard parameters.

Parameters

df
    The DataFrame to save.
output_path
    The path to save the DataFrame to.

Source code in src/climate_data/data.py
def save_parquet(
    df: pd.DataFrame,
    output_path: str | Path,
) -> None:
    """Save a pandas DataFrame to a file with standard parameters.

    Parameters
    ----------
    df
        The DataFrame to save.
    output_path
        The path to save the DataFrame to.
    """
    touch(output_path, clobber=True)
    df.to_parquet(output_path)

save_raster(raster: rt.RasterArray, output_path: str | Path, num_cores: int = 1, **kwargs: Any) -> None

Save a raster to a file with standard parameters.

Parameters

raster
    The raster to save.
output_path
    The path to save the raster to.
num_cores
    The number of cores to use for compression.

Source code in src/climate_data/data.py
def save_raster(
    raster: rt.RasterArray,
    output_path: str | Path,
    num_cores: int = 1,
    **kwargs: Any,
) -> None:
    """Save a raster to a file with standard parameters.

    Parameters
    ----------
    raster
        The raster to save.
    output_path
        The path to save the raster to.
    num_cores
        The number of cores to use for compression.
    """
    save_params = {
        "tiled": True,
        "blockxsize": 512,
        "blockysize": 512,
        "compress": "ZSTD",
        "predictor": 2,  # horizontal differencing
        "num_threads": num_cores,
        "bigtiff": "yes",
        **kwargs,
    }
    touch(output_path, clobber=True)
    raster.to_file(output_path, **save_params)

save_raster_to_cog(raster: rt.RasterArray, output_path: str | Path, num_cores: int = 1, resampling: str = 'nearest') -> None

Save a raster to a COG file.

A COG is a cloud-optimized GeoTIFF, a GeoTIFF whose internal layout is optimized for reading from cloud storage systems. This function saves the raster as a COG, building overviews with the specified resampling method.

Parameters

raster
    The raster to save.
output_path
    The path to save the raster to.
num_cores
    The number of cores to use for compression.
resampling
    The resampling method to use when building the overviews.

Source code in src/climate_data/data.py
def save_raster_to_cog(
    raster: rt.RasterArray,
    output_path: str | Path,
    num_cores: int = 1,
    resampling: str = "nearest",
) -> None:
    """Save a raster to a COG file.

    A COG is a cloud-optimized GeoTIFF, a GeoTIFF whose internal layout is optimized
    for reading from cloud storage systems. This function saves the raster as a COG,
    building overviews with the specified resampling method.

    Parameters
    ----------
    raster
        The raster to save.
    output_path
        The path to save the raster to.
    num_cores
        The number of cores to use for compression.
    resampling
        The resampling method to use when building the overviews.
    """
    cog_save_params = {
        "driver": "COG",
        "overview_resampling": resampling,
    }
    save_raster(raster, output_path, num_cores, **cog_save_params)

save_xarray(ds: xr.Dataset, output_path: str | Path, encoding_kwargs: dict[str, Any]) -> None

Save an xarray dataset to a file with standard parameters.

Parameters

ds
    The dataset to save.
output_path
    The path to save the dataset to.
encoding_kwargs
    The encoding parameters to use when saving the dataset.

Source code in src/climate_data/data.py
def save_xarray(
    ds: xr.Dataset,
    output_path: str | Path,
    encoding_kwargs: dict[str, Any],
) -> None:
    """Save an xarray dataset to a file with standard parameters.

    Parameters
    ----------
    ds
        The dataset to save.
    output_path
        The path to save the dataset to.
    encoding_kwargs
        The encoding parameters to use when saving the dataset.
    """
    touch(output_path, clobber=True)
    encoding = {
        "dtype": "int16",
        "_FillValue": -32767,
        "zlib": True,
        "complevel": 1,
    }
    encoding.update(encoding_kwargs)
    ds.to_netcdf(output_path, encoding={"value": encoding})
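
Because the default encoding packs values as int16 with a fill value, callers typically pass packing parameters through encoding_kwargs, and the dataset must hold its data in a "value" variable to match the encoding key above. A minimal sketch (the scale factor is an illustrative choice for data in [0, 1]):

import numpy as np
import xarray as xr

from climate_data.data import save_xarray

ds = xr.Dataset({"value": (("lat", "lon"), np.random.rand(10, 10))})
save_xarray(ds, "example.nc", encoding_kwargs={"scale_factor": 1e-4, "add_offset": 0.0})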

extract

Climate Data Extraction

This module contains pipelines for extracting climate data from various sources.

cmip6

CMIP6 Data Extraction

extract_cmip6(cmip6_source: list[str], cmip6_experiment: list[str], cmip6_variable: list[str], output_dir: str, queue: str, overwrite: bool) -> None

Extract CMIP6 data.

Extracts CMIP6 data for the given source, experiment, and variable. We use the table at https://www.nature.com/articles/s41597-023-02549-6/tables/3 to determine which CMIP6 source_ids to include. See ClimateData.load_koppen_geiger_model_inclusion to load and examine this table. The extraction criteria do not completely capture model inclusion criteria, as they do not account for the year range available in the data. That determination is made when we process the data in later steps.

Source code in src/climate_data/extract/cmip6.py
@click.command()
@clio.with_cmip6_source(allow_all=True)
@clio.with_cmip6_experiment(allow_all=True)
@clio.with_cmip6_variable(allow_all=True)
@clio.with_output_directory(cdc.MODEL_ROOT)
@clio.with_queue()
@clio.with_overwrite()
def extract_cmip6(
    cmip6_source: list[str],
    cmip6_experiment: list[str],
    cmip6_variable: list[str],
    output_dir: str,
    queue: str,
    overwrite: bool,
) -> None:
    """Extract CMIP6 data.

    Extracts CMIP6 data for the given source, experiment, and variable. We use
    the table at https://www.nature.com/articles/s41597-023-02549-6/tables/3 to
    determine which CMIP6 source_ids to include. See
    `ClimateData.load_koppen_geiger_model_inclusion` to load and examine this
    table. The extraction criteria do not completely capture model inclusion
    criteria, as they do not account for the year range available in the data.
    That determination is made when we process the data in later steps.
    """
    overwrite_arg = {"overwrite": None} if overwrite else {}

    jobmon.run_parallel(
        runner="cdtask",
        task_name="extract cmip6",
        node_args={
            "cmip6-source": cmip6_source,
            "cmip6-experiment": cmip6_experiment,
            "cmip6-variable": cmip6_variable,
        },
        task_args={
            "output-dir": output_dir,
            **overwrite_arg,
        },
        task_resources={
            "queue": queue,
            "cores": 1,
            "memory": "10G",
            "runtime": "3000m",
            "project": "proj_rapidresponse",
        },
        max_attempts=1,
        concurrency_limit=50,
    )

load_cmip_data(zarr_path: str) -> xr.Dataset

Loads a CMIP6 dataset from a zarr path.

Source code in src/climate_data/extract/cmip6.py
def load_cmip_data(zarr_path: str) -> xr.Dataset:
    """Loads a CMIP6 dataset from a zarr path."""
    gcs = gcsfs.GCSFileSystem(token="anon")  # noqa: S106
    mapper = gcs.get_mapper(zarr_path)
    ds = xr.open_zarr(mapper, consolidated=True)
    ds = ds.drop_vars(
        ["lat_bnds", "lon_bnds", "time_bnds", "height", "time_bounds", "bnds"],
        errors="ignore",
    )
    return ds  # type: ignore[no-any-return]
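
In practice the zarr path comes from the catalog loaded by ClimateData.load_cmip6_metadata, whose zstore column holds the GCS store path. A minimal sketch (the filter values are illustrative, and the column names assume the standard consolidated-stores catalog):

from climate_data.data import ClimateData

cd = ClimateData()
meta = cd.load_cmip6_metadata()
row = meta[
    (meta["source_id"] == "CanESM5")        # illustrative model
    & (meta["experiment_id"] == "ssp245")   # illustrative scenario
    & (meta["variable_id"] == "tas")
    & (meta["table_id"] == "day")
].iloc[0]
ds = load_cmip_data(row["zstore"])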

elevation

extract_elevation(model_name: str, output_dir: str, queue: str) -> None

Download elevation data from Open Topography.

Source code in src/climate_data/extract/elevation.py
@click.command()
@click.option(
    "--generate-name",
    required=True,
    type=click.Choice(ELEVATION_MODELS),
    help="Name of the elevation model to download.",
)
@clio.with_output_directory(cdc.MODEL_ROOT)
@clio.with_queue()
def extract_elevation(
    model_name: str,
    output_dir: str,
    queue: str,
) -> None:
    """Download elevation data from Open Topography."""
    invalid = True
    if invalid:
        msg = "Downloaded using aws cli, this implementation is not valid"
        raise NotImplementedError(msg)

    lat_starts = list(range(-90, 90, FETCH_SIZE))
    lon_starts = list(range(-180, 180, FETCH_SIZE))

    jobmon.run_parallel(
        runner="cdtask",
        task_name="extract elevation",
        node_args={
            "model-name": [model_name],
            "lat-start": lat_starts,
            "lon-start": lon_starts,
        },
        task_args={
            "output-dir": output_dir,
        },
        task_resources={
            "queue": queue,
            "cores": 1,
            "memory": "10G",
            "runtime": "240m",
            "project": "proj_rapidresponse",
        },
    )

extract_elevation_task(model_name: str, lat_start: int, lon_start: int, output_dir: str) -> None

Download elevation data from Open Topography.

Source code in src/climate_data/extract/elevation.py
@click.command()
@click.option(
    "--model-name",
    required=True,
    type=click.Choice(ELEVATION_MODELS),
    help="Name of the elevation model to download.",
)
@click.option(
    "--lat-start",
    required=True,
    type=int,
    help="Latitude of the top-left corner of the tile.",
)
@click.option(
    "--lon-start",
    required=True,
    type=int,
    help="Longitude of the top-left corner of the tile.",
)
@clio.with_output_directory(cdc.MODEL_ROOT)
def extract_elevation_task(
    model_name: str,
    lat_start: int,
    lon_start: int,
    output_dir: str,
) -> None:
    """Download elevation data from Open Topography."""
    invalid = True
    if invalid:
        msg = "Downloaded using aws cli, this implementation is not valid"
        raise NotImplementedError(msg)

    extract_elevation_main(model_name, lat_start, lon_start, output_dir)

era5

ERA5 Data Extraction

generate

utils

buck_vapor_pressure(temperature_c: xr.Dataset) -> xr.Dataset

Approximate vapor pressure of water.

https://en.wikipedia.org/wiki/Arden_Buck_equation
https://journals.ametsoc.org/view/journals/apme/20/12/1520-0450_1981_020_1527_nefcvp_2_0_co_2.xml

Parameters

temperature_c
    Temperature in Celsius

Returns

xr.Dataset
    Vapor pressure in hPa

Source code in src/climate_data/generate/utils.py
def buck_vapor_pressure(temperature_c: xr.Dataset) -> xr.Dataset:
    """Approximate vapor pressure of water.

    https://en.wikipedia.org/wiki/Arden_Buck_equation
    https://journals.ametsoc.org/view/journals/apme/20/12/1520-0450_1981_020_1527_nefcvp_2_0_co_2.xml

    Parameters
    ----------
    temperature_c
        Temperature in Celsius

    Returns
    -------
    xr.Dataset
        Vapor pressure in hPa
    """
    over_water = 6.1121 * np.exp(
        (18.678 - temperature_c / 234.5) * (temperature_c / (257.14 + temperature_c))
    )
    over_ice = 6.1115 * np.exp(
        (23.036 - temperature_c / 333.7) * (temperature_c / (279.82 + temperature_c))
    )
    vp = xr.where(temperature_c > 0, over_water, over_ice)  # type: ignore[no-untyped-call]
    return vp  # type: ignore[no-any-return]
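
As a sanity check, at 20 °C the over-water branch gives 6.1121 * exp((18.678 - 20/234.5) * (20/277.14)) ≈ 23.4 hPa, matching published saturation vapor pressure tables. A minimal usage sketch (the coordinates are illustrative):

import numpy as np
import xarray as xr

from climate_data.generate.utils import buck_vapor_pressure

temp_c = xr.Dataset({"value": (("x",), np.array([20.0, -10.0]))})
vp = buck_vapor_pressure(temp_c)  # ~23.4 hPa over water; ice branch used below 0 °C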

identity(ds: xr.Dataset) -> xr.Dataset

Identity transformation

Source code in src/climate_data/generate/utils.py
def identity(ds: xr.Dataset) -> xr.Dataset:
    """Identity transformation"""
    return ds

interpolate_to_target_latlon(ds: xr.Dataset, method: str = 'nearest', target_lon: xr.DataArray = cdc.TARGET_LONGITUDE, target_lat: xr.DataArray = cdc.TARGET_LATITUDE) -> xr.Dataset

Interpolate a dataset to a target latitude and longitude grid.

Parameters

ds
    Dataset to interpolate
method
    Interpolation method
target_lon
    Target longitude grid
target_lat
    Target latitude grid

Returns

xr.Dataset
    Interpolated dataset

Source code in src/climate_data/generate/utils.py
def interpolate_to_target_latlon(
    ds: xr.Dataset,
    method: str = "nearest",
    target_lon: xr.DataArray = cdc.TARGET_LONGITUDE,
    target_lat: xr.DataArray = cdc.TARGET_LATITUDE,
) -> xr.Dataset:
    """Interpolate a dataset to a target latitude and longitude grid.

    Parameters
    ----------
    ds
        Dataset to interpolate
    method
        Interpolation method
    target_lon
        Target longitude grid
    target_lat
        Target latitude grid

    Returns
    -------
    xr.Dataset
        Interpolated dataset
    """
    return (
        ds.interp(longitude=target_lon, latitude=target_lat, method=method)  # type: ignore[arg-type]
        .interpolate_na(dim="longitude", method="nearest", fill_value="extrapolate")
        .sortby("latitude")
        .interpolate_na(dim="latitude", method="nearest", fill_value="extrapolate")
        .sortby("latitude", ascending=False)
    )
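
A minimal sketch interpolating a coarse global grid onto a finer, explicitly supplied target grid (the grids are illustrative; by default the targets come from cdc.TARGET_LONGITUDE and cdc.TARGET_LATITUDE, and xarray's interp requires scipy):

import numpy as np
import xarray as xr

from climate_data.generate.utils import interpolate_to_target_latlon

coarse = xr.Dataset(
    {"value": (("latitude", "longitude"), np.random.rand(18, 36))},
    coords={
        "latitude": np.arange(85.0, -95.0, -10.0),
        "longitude": np.arange(-180.0, 180.0, 10.0),
    },
)
target_lat = xr.DataArray(np.arange(89.5, -90.5, -1.0), dims="latitude")
target_lon = xr.DataArray(np.arange(-179.5, 180.5, 1.0), dims="longitude")
fine = interpolate_to_target_latlon(
    coarse, method="nearest", target_lon=target_lon, target_lat=target_lat
)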

kelvin_to_celsius(temperature_k: xr.Dataset) -> xr.Dataset

Convert temperature from Kelvin to Celsius

Parameters

temperature_k
    Temperature in Kelvin

Returns

xr.Dataset
    Temperature in Celsius

Source code in src/climate_data/generate/utils.py
def kelvin_to_celsius(temperature_k: xr.Dataset) -> xr.Dataset:
    """Convert temperature from Kelvin to Celsius

    Parameters
    ----------
    temperature_k
        Temperature in Kelvin

    Returns
    -------
    xr.Dataset
        Temperature in Celsius
    """
    return temperature_k - 273.15

meter_to_millimeter(rainfall_m: xr.Dataset) -> xr.Dataset

Convert rainfall from meters to millimeters

Parameters

rainfall_m
    Rainfall in meters

Returns

xr.Dataset
    Rainfall in millimeters

Source code in src/climate_data/generate/utils.py
def meter_to_millimeter(rainfall_m: xr.Dataset) -> xr.Dataset:
    """Convert rainfall from meters to millimeters

    Parameters
    ----------
    rainfall_m
        Rainfall in meters

    Returns
    -------
    xr.Dataset
        Rainfall in millimeters
    """
    return 1000 * rainfall_m

precipitation_flux_to_rainfall(precipitation_flux: xr.Dataset) -> xr.Dataset

Convert precipitation flux to rainfall

Parameters

precipitation_flux
    Precipitation flux in kg m-2 s-1

Returns

xr.Dataset
    Rainfall in mm/day

Source code in src/climate_data/generate/utils.py
def precipitation_flux_to_rainfall(precipitation_flux: xr.Dataset) -> xr.Dataset:
    """Convert precipitation flux to rainfall

    Parameters
    ----------
    precipitation_flux
        Precipitation flux in kg m-2 s-1

    Returns
    -------
    xr.Dataset
        Rainfall in mm/day
    """
    seconds_per_day = 86400
    mm_per_kg_m2 = 1
    return seconds_per_day * mm_per_kg_m2 * precipitation_flux

rh_percent(temperature_c: xr.Dataset, dewpoint_temperature_c: xr.Dataset) -> xr.Dataset

Calculate relative humidity from temperature and dewpoint temperature.

Parameters

temperature_c
    Temperature in Celsius
dewpoint_temperature_c
    Dewpoint temperature in Celsius

Returns

xr.Dataset
    Relative humidity as a percentage

Source code in src/climate_data/generate/utils.py
def rh_percent(
    temperature_c: xr.Dataset, dewpoint_temperature_c: xr.Dataset
) -> xr.Dataset:
    """Calculate relative humidity from temperature and dewpoint temperature.

    Parameters
    ----------
    temperature_c
        Temperature in Celsius
    dewpoint_temperature_c
        Dewpoint temperature in Celsius

    Returns
    -------
    xr.Dataset
        Relative humidity as a percentage
    """
    # saturation vapour pressure
    svp = buck_vapor_pressure(temperature_c)
    # actual vapour pressure
    vp = buck_vapor_pressure(dewpoint_temperature_c)
    return 100 * vp / svp
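
As a spot check, air at 25 °C with a 20 °C dewpoint gives roughly 100 * 23.4 / 31.7 ≈ 74% relative humidity. A minimal usage sketch (the coordinates are illustrative):

import numpy as np
import xarray as xr

from climate_data.generate.utils import rh_percent

t = xr.Dataset({"value": (("x",), np.array([25.0]))})
td = xr.Dataset({"value": (("x",), np.array([20.0]))})
rh = rh_percent(t, td)  # ≈ 74%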

scale_wind_speed_height(wind_speed_10m: xr.Dataset) -> xr.Dataset

Scale wind speed from a height of 10 meters to a height of 2 meters.

Reference: Bröde et al. (2012) https://doi.org/10.1007/s00484-011-0454-1

Parameters

wind_speed_10m
    The 10m wind speed [m/s]. May be signed (i.e., a velocity component).

Returns

xr.Dataset
    The 2m wind speed [m/s]. May be signed (i.e., a velocity component).

Source code in src/climate_data/generate/utils.py
def scale_wind_speed_height(wind_speed_10m: xr.Dataset) -> xr.Dataset:
    """Scaling wind speed from a height of 10 meters to a height of 2 meters

    Reference: Bröde et al. (2012)
    https://doi.org/10.1007/s00484-011-0454-1

    Parameters
    ----------
    wind_speed_10m
        The 10m wind speed [m/s]. May be signed (i.e., a velocity component)

    Returns
    -------
    xr.Dataset
        The 2m wind speed [m/s]. May be signed (i.e., a velocity component)
    """
    scale_factor = np.log10(2 / 0.01) / np.log10(10 / 0.01)
    return scale_factor * wind_speed_10m  # type: ignore[no-any-return]
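
With the 0.01 m roughness length used here, the scale factor evaluates to log10(200) / log10(1000) ≈ 0.767, so 2m wind speeds are roughly three quarters of their 10m counterparts.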

vector_magnitude(x: xr.Dataset, y: xr.Dataset) -> xr.Dataset

Calculate the magnitude of a vector.

Source code in src/climate_data/generate/utils.py
def vector_magnitude(x: xr.Dataset, y: xr.Dataset) -> xr.Dataset:
    """Calculate the magnitude of a vector."""
    return np.sqrt(x**2 + y**2)  # type: ignore[return-value]

utils

Climate Data Utilities

Utility functions for working with climate data.

make_raster_template(x_min: int | float, y_min: int | float, stride: int | float, resolution: int | float, crs: str = 'EPSG:4326') -> rt.RasterArray

Create a raster template with the specified dimensions and resolution.

A raster template is a RasterArray with a specified extent, resolution, and CRS. The data values are initialized to zero. This function is useful for creating a template to use when resampling another raster to a common grid.

Parameters

x_min
    The minimum x-coordinate of the raster.
y_min
    The minimum y-coordinate of the raster.
stride
    The length of one side of the raster in the x and y directions, measured in the units of the provided coordinate reference system.
resolution
    The resolution of the raster in the units of the provided coordinate reference system.
crs
    The coordinate reference system of the generated raster.

Returns

rt.RasterArray
    A raster template with the specified dimensions and resolution.

Source code in src/climate_data/utils.py
def make_raster_template(
    x_min: int | float,
    y_min: int | float,
    stride: int | float,
    resolution: int | float,
    crs: str = "EPSG:4326",
) -> rt.RasterArray:
    """Create a raster template with the specified dimensions and resolution.

    A raster template is a RasterArray with a specified extent, resolution, and CRS. The data
    values are initialized to zero. This function is useful for creating a template to use
    when resampling another raster to a common grid.

    Parameters
    ----------
    x_min
        The minimum x-coordinate of the raster.
    y_min
        The minimum y-coordinate of the raster.
    stride
        The length of one side of the raster in the x and y directions measured in the units
        of the provided coordinate reference system.
    resolution
        The resolution of the raster in the units of the provided coordinate reference system.
    crs
        The coordinate reference system of the generated raster.

    Returns
    -------
    rt.RasterArray
        A raster template with the specified dimensions and resolution.
    """
    tolerance = 1e-12
    evenly_divides = (stride % resolution < tolerance) or (
        resolution - stride % resolution < tolerance
    )
    if not evenly_divides:
        msg = "Stride must be a multiple of resolution"
        raise ValueError(msg)

    transform = Affine(
        a=resolution,
        b=0,
        c=x_min,
        d=0,
        e=-resolution,
        f=y_min + stride,
    )

    n_pix = int(stride / resolution)

    data = np.zeros((n_pix, n_pix), dtype=np.int8)
    return rt.RasterArray(
        data,
        transform,
        crs=crs,
        no_data_value=-1,
    )
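
A minimal usage sketch creating a 10-degree tile at 0.1-degree resolution (the values are illustrative):

from climate_data.utils import make_raster_template

template = make_raster_template(x_min=-180, y_min=80, stride=10, resolution=0.1)
# A 100 x 100 RasterArray of zeros in EPSG:4326 whose lower-left corner sits at (-180, 80)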

to_raster(ds: xr.DataArray, no_data_value: float | int, lat_col: str = 'lat', lon_col: str = 'lon', crs: str = 'EPSG:4326') -> rt.RasterArray

Convert an xarray DataArray to a RasterArray.

Parameters

ds
    The xarray DataArray to convert.
no_data_value
    The value to use for missing data. This should be consistent with the dtype of the data.
lat_col
    The name of the latitude coordinate in the dataset.
lon_col
    The name of the longitude coordinate in the dataset.
crs
    The coordinate reference system of the data.

Returns

rt.RasterArray
    The RasterArray representation of the input data.

Source code in src/climate_data/utils.py
def to_raster(
    ds: xr.DataArray,
    no_data_value: float | int,
    lat_col: str = "lat",
    lon_col: str = "lon",
    crs: str = "EPSG:4326",
) -> rt.RasterArray:
    """Convert an xarray DataArray to a RasterArray.

    Parameters
    ----------
    ds
        The xarray DataArray to convert.
    no_data_value
        The value to use for missing data. This should be consistent with the dtype of the data.
    lat_col
        The name of the latitude coordinate in the dataset.
    lon_col
        The name of the longitude coordinate in the dataset.
    crs
        The coordinate reference system of the data.

    Returns
    -------
    rt.RasterArray
        The RasterArray representation of the input data.
    """
    lat, lon = ds[lat_col].data, ds[lon_col].data

    dlat = (lat[1:] - lat[:-1]).mean()
    dlon = (lon[1:] - lon[:-1]).mean()

    transform = Affine(
        a=dlon,
        b=0.0,
        c=lon[0],
        d=0.0,
        e=-dlat,
        f=lat[-1],
    )
    return rt.RasterArray(
        data=ds.data[::-1],
        transform=transform,
        crs=crs,
        no_data_value=no_data_value,
    )
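
A minimal usage sketch converting a small DataArray (the coordinates are illustrative; latitudes ascend, matching the [::-1] row flip above, which writes the raster north-up):

import numpy as np
import xarray as xr

from climate_data.utils import to_raster

da = xr.DataArray(
    np.arange(12, dtype=np.float32).reshape(3, 4),
    coords={"lat": [0.5, 1.5, 2.5], "lon": [10.5, 11.5, 12.5, 13.5]},
    dims=("lat", "lon"),
)
raster = to_raster(da, no_data_value=np.nan)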