Skip to content

extract

Climate Data Extraction

This module contains pipelines for extracting climate data from various sources.

cmip6

CMIP6 Data Extraction

extract_cmip6(cmip6_source: list[str], cmip6_experiment: list[str], cmip6_variable: list[str], output_dir: str, queue: str, overwrite: bool) -> None

Extract CMIP6 data.

Extracts CMIP6 data for the given source, experiment, and variable. We use the table at https://www.nature.com/articles/s41597-023-02549-6/tables/3 to determine which CMIP6 source_ids to include. See ClimateData.load_koppen_geiger_model_inclusion to load and examine this table. The extraction criteria do not completely capture the model inclusion criteria, as they do not account for the year range available in the data. That determination is made when we process the data in later steps.

Source code in src/climate_data/extract/cmip6.py
@click.command()
@clio.with_cmip6_source(allow_all=True)
@clio.with_cmip6_experiment(allow_all=True)
@clio.with_cmip6_variable(allow_all=True)
@clio.with_output_directory(cdc.MODEL_ROOT)
@clio.with_queue()
@clio.with_overwrite()
def extract_cmip6(
    cmip6_source: list[str],
    cmip6_experiment: list[str],
    cmip6_variable: list[str],
    output_dir: str,
    queue: str,
    overwrite: bool,
) -> None:
    """Extract CMIP6 data.

    Extracts CMIP6 data for the given source, experiment, and variable. We use
    the table at https://www.nature.com/articles/s41597-023-02549-6/tables/3 to
    determine which CMIP6 source_ids to include. See
    `ClimateData.load_koppen_geiger_model_inclusion` to load and examine this
    table. The extraction criteria do not completely capture the model
    inclusion criteria, as they do not account for the year range available in
    the data. That determination is made when we process the data in later
    steps.
    """
    # Arguments that vary from task to task.
    node_args = {
        "cmip6-source": cmip6_source,
        "cmip6-experiment": cmip6_experiment,
        "cmip6-variable": cmip6_variable,
    }

    # Arguments shared by every task. "overwrite" is only present when the
    # flag was requested, and then carries no value.
    # NOTE(review): presumably a None value is rendered as a bare CLI flag by
    # the runner -- confirm against jobmon.run_parallel.
    task_args: dict[str, str | None] = {"output-dir": output_dir}
    if overwrite:
        task_args["overwrite"] = None

    task_resources = {
        "queue": queue,
        "cores": 1,
        "memory": "10G",
        "runtime": "3000m",
        "project": "proj_rapidresponse",
    }

    jobmon.run_parallel(
        runner="cdtask",
        task_name="extract cmip6",
        node_args=node_args,
        task_args=task_args,
        task_resources=task_resources,
        max_attempts=1,
        concurrency_limit=50,
    )

load_cmip_data(zarr_path: str) -> xr.Dataset

Loads a CMIP6 dataset from a zarr path.

Source code in src/climate_data/extract/cmip6.py
def load_cmip_data(zarr_path: str) -> xr.Dataset:
    """Open a CMIP6 zarr store and return it without bounds/height variables.

    The store is accessed through an anonymous ``gcsfs`` filesystem and opened
    with consolidated metadata.

    Args:
        zarr_path: Path of the zarr store, as understood by
            ``GCSFileSystem.get_mapper``.

    Returns:
        The opened dataset with the auxiliary variables listed below removed.
        Variables that are not present in the store are silently skipped.
    """
    unwanted_vars = [
        "lat_bnds",
        "lon_bnds",
        "time_bnds",
        "height",
        "time_bounds",
        "bnds",
    ]

    anonymous_fs = gcsfs.GCSFileSystem(token="anon")  # noqa: S106
    store = anonymous_fs.get_mapper(zarr_path)
    dataset = xr.open_zarr(store, consolidated=True).drop_vars(
        unwanted_vars,
        errors="ignore",
    )
    return dataset  # type: ignore[no-any-return]

elevation

extract_elevation(model_name: str, output_dir: str, queue: str) -> None

Download elevation data from Open Topography.

Source code in src/climate_data/extract/elevation.py
@click.command()
@click.option(
    # The option name must map onto the ``model_name`` parameter below; click
    # derives the keyword it passes to the callback from the option name.
    "--model-name",
    required=True,
    type=click.Choice(ELEVATION_MODELS),
    help="Name of the elevation model to download.",
)
@clio.with_output_directory(cdc.MODEL_ROOT)
@clio.with_queue()
def extract_elevation(
    model_name: str,
    output_dir: str,
    queue: str,
) -> None:
    """Download elevation data from Open Topography.

    This command is currently disabled: it always raises
    ``NotImplementedError`` before any work is scheduled.
    """
    # The guard is unconditional, so nothing below it currently executes.
    invalid = True
    if invalid:
        msg = "Downloaded using aws cli, this implementation is not valid"
        raise NotImplementedError(msg)

    # Tile origins stepping across the globe in FETCH_SIZE-degree increments.
    lat_starts = list(range(-90, 90, FETCH_SIZE))
    lon_starts = list(range(-180, 180, FETCH_SIZE))

    # NOTE(review): presumably run_parallel launches one task per combination
    # of the node_args values -- confirm against jobmon.run_parallel.
    jobmon.run_parallel(
        runner="cdtask",
        task_name="extract elevation",
        node_args={
            "model-name": [model_name],
            "lat-start": lat_starts,
            "lon-start": lon_starts,
        },
        task_args={
            "output-dir": output_dir,
        },
        task_resources={
            "queue": queue,
            "cores": 1,
            "memory": "10G",
            "runtime": "240m",
            "project": "proj_rapidresponse",
        },
    )

extract_elevation_task(model_name: str, lat_start: int, lon_start: int, output_dir: str) -> None

Download elevation data from Open Topography.

Source code in src/climate_data/extract/elevation.py
@click.command()
@click.option(
    "--model-name",
    type=click.Choice(ELEVATION_MODELS),
    required=True,
    help="Name of the elevation model to download.",
)
@click.option(
    "--lat-start",
    type=int,
    required=True,
    help="Latitude of the top-left corner of the tile.",
)
@click.option(
    "--lon-start",
    type=int,
    required=True,
    help="Longitude of the top-left corner of the tile.",
)
@clio.with_output_directory(cdc.MODEL_ROOT)
def extract_elevation_task(
    model_name: str,
    lat_start: int,
    lon_start: int,
    output_dir: str,
) -> None:
    """Download elevation data from Open Topography."""
    # This entry point is disabled: the flag below is always true, so the
    # command raises before delegating to extract_elevation_main.
    implementation_is_invalid = True
    if implementation_is_invalid:
        error_message = "Downloaded using aws cli, this implementation is not valid"
        raise NotImplementedError(error_message)

    # Only reached if the guard above is ever turned off.
    extract_elevation_main(model_name, lat_start, lon_start, output_dir)

era5

ERA5 Data Extraction