data

Climate Data Management

This module provides classes for managing the climate data used in the project. They include methods for loading and saving data and for locating the directories where data is stored. Centralizing the path structure keeps data storage consistent and organized, and makes the layout easier to update and maintain as needed.

This module generally does not load or process data itself. The exception is metadata, which is typically loaded and cached on disk.

The main classes are:

- PopulationModelData: Handles data from the gridded population modeling pipeline. This includes population estimates and projections as well as the location hierarchies for the population data. This class provides read-only access to the data.
- ClimateData: Handles gridded climate data from the climate downscaling pipeline. This includes climate data for different scenarios and measures. This class provides read and write access to the data.
- ClimateAggregateData: Handles the output data structure for climate aggregates. This includes raw results at the block level, final results at the measure level, and versioned results for different pipeline versions. This class provides read and write access to the data.

ClimateAggregateData

Manages the output data structure for climate aggregates.

This class manages the file organization and paths for:

1. Reading and writing raw results at block level
2. Reading and writing final results at measure and scenario level
3. Versioning of results
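As a rough illustration of the layout this class manages, the sketch below mirrors the path-building logic with plain `pathlib` functions instead of importing the class. The root directory and the version, hierarchy, block key, scenario, and measure values are made up for the example.

```python
from pathlib import Path


def raw_results_path(
    root: Path, version: str, hierarchy: str, block_key: str, draw: str
) -> Path:
    # Block-level output:
    # <root>/<version>/raw-results/<hierarchy>/<block_key>/<draw>.parquet
    return root / version / "raw-results" / hierarchy / block_key / f"{draw}.parquet"


def results_path(
    root: Path, version: str, hierarchy: str, scenario: str, measure: str
) -> Path:
    # Final output: <root>/<version>/results/<hierarchy>/<measure>_<scenario>.parquet
    return root / version / "results" / hierarchy / f"{measure}_{scenario}.parquet"


# All values below are hypothetical and only illustrate the directory shape.
root = Path("aggregates")
raw = raw_results_path(root, "v1", "example_hierarchy", "block_0001", "000")
final = results_path(root, "v1", "example_hierarchy", "ssp245", "mean_temperature")

print(raw.as_posix())
print(final.as_posix())
```

Each version gets its own subtree, so results from different pipeline versions never share a directory.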

Source code in src/climate_data/data.py
class ClimateAggregateData:
    """Manages the output data structure for climate aggregates.

    This class manages the file organization and paths for:
    1. Reading and writing raw results at block level
    2. Reading and writing final results at measure and scenario level
    3. Versioning of results
    """

    def __init__(
        self,
        root: str | Path = cdc.AGGREGATE_ROOT,
    ) -> None:
        """Initialize the climate aggregate data manager.

        Parameters
        ----------
        root
            Path to the model root directory
        """
        self._root = Path(root)
        self._create_model_root()

    def _create_model_root(self) -> None:
        """Create the model root directory and logs directory."""
        mkdir(self.root, exist_ok=True)
        mkdir(self.logs, exist_ok=True)

    @property
    def root(self) -> Path:
        """Get the root directory for model data."""
        return self._root

    @property
    def logs(self) -> Path:
        """Get the directory for log files."""
        return self.root / "logs"

    def log_dir(self, step_name: str) -> Path:
        """Get the directory for logs from a specific pipeline step.

        Parameters
        ----------
        step_name
            The name of the pipeline step

        Returns
        -------
        Path
            The directory for step-specific logs
        """
        return self.logs / step_name

    def version_root(self, version: str) -> Path:
        """Get the root directory for a specific version.

        Parameters
        ----------
        version
            The version identifier

        Returns
        -------
        Path
            The directory for version-specific data
        """
        return self.root / version

    def raw_results_root(self, version: str) -> Path:
        """Get the directory for raw results (block-level).

        Parameters
        ----------
        version
            The version identifier

        Returns
        -------
        Path
            The directory for raw results
        """
        return self.version_root(version) / "raw-results"

    def raw_results_path(
        self, version: str, hierarchy: str, block_key: str, draw: str
    ) -> Path:
        """Get the path to raw results for a specific hierarchy, block, and draw.

        Parameters
        ----------
        version
            The version identifier
        hierarchy
            The location hierarchy
        block_key
            The block key
        draw
            The draw of the climate data (e.g. "000")

        Returns
        -------
        Path
            The path to the raw results file
        """
        root = self.raw_results_root(version)
        return root / hierarchy / block_key / f"{draw}.parquet"

    def save_raw_results(
        self,
        df: pd.DataFrame,
        version: str,
        hierarchy: str,
        block_key: str,
        draw: str,
    ) -> None:
        """Save raw results for a specific hierarchy, block, and draw.

        Parameters
        ----------
        df
            The results to save
        version
            The version identifier
        hierarchy
            The location hierarchy
        block_key
            The block key
        draw
            The draw of the climate data to save (e.g. "000")
        """
        path = self.raw_results_path(version, hierarchy, block_key, draw)
        mkdir(path.parent, exist_ok=True, parents=True)
        save_parquet(df, path)

    def load_raw_results(
        self,
        version: str,
        hierarchy: str,
        block_key: str,
        draw: str,
        measure: str | None = None,
        scenario: str | None = None,
    ) -> pd.DataFrame:
        """Load raw results for a specific hierarchy, block, and draw.

        Parameters
        ----------
        version
            The version identifier
        hierarchy
            The location hierarchy
        block_key
            The block key
        draw
            The draw of the climate data to load (e.g. "000")
        measure
            If provided, filter results to only include this measure
        scenario
            If provided, filter results to only include this scenario

        Returns
        -------
        pd.DataFrame
            The raw results
        """
        path = self.raw_results_path(version, hierarchy, block_key, draw)

        # Build row filters for pd.read_parquet
        filters = []
        if measure is not None:
            filters.append(("measure", "==", measure))
        if scenario is not None:
            filters.append(("scenario", "==", scenario))

        # An empty filter list can be rejected by the parquet engine, so pass
        # None when no filter was requested.
        return pd.read_parquet(path, filters=filters or None)

    def results_root(self, version: str) -> Path:
        """Get the directory for final results (measure-level).

        Parameters
        ----------
        version
            The version identifier

        Returns
        -------
        Path
            The directory for final results
        """
        return self.version_root(version) / "results"

    def population_path(self, version: str, hierarchy: str) -> Path:
        """Get the path to population data for a specific hierarchy.

        Parameters
        ----------
        version
            The version identifier
        hierarchy
            The location hierarchy

        Returns
        -------
        Path
            The path to the population data file
        """
        return self.results_root(version) / hierarchy / "population.parquet"

    def save_population(self, df: pd.DataFrame, version: str, hierarchy: str) -> None:
        """Save population data for a specific hierarchy.

        Parameters
        ----------
        df
            The population data to save
        version
            The version identifier
        hierarchy
            The location hierarchy
        """
        path = self.population_path(version, hierarchy)
        mkdir(path.parent, exist_ok=True, parents=True)
        save_parquet(df, path)

    def load_population(
        self, version: str, hierarchy: str, location_id: int | None = None
    ) -> pd.DataFrame:
        """Load population data for a specific hierarchy and optionally location.

        Parameters
        ----------
        version
            The version identifier
        hierarchy
            The location hierarchy
        location_id
            If provided, load only data for this location

        Returns
        -------
        pd.DataFrame
            The population data
        """
        path = self.population_path(version, hierarchy)
        if location_id is not None:
            filters = [("location_id", "==", location_id)]
            return pd.read_parquet(path, filters=filters)
        return pd.read_parquet(path)

    def results_path(
        self, version: str, hierarchy: str, scenario: str, measure: str
    ) -> Path:
        """Get the path to final results for a specific scenario and measure.

        Parameters
        ----------
        version
            The version identifier
        hierarchy
            The location hierarchy
        scenario
            The climate scenario
        measure
            The climate measure

        Returns
        -------
        Path
            The path to the results file
        """
        return self.results_root(version) / hierarchy / f"{measure}_{scenario}.parquet"

    def save_results(
        self,
        df: pd.DataFrame,
        version: str,
        hierarchy: str,
        scenario: str,
        measure: str,
    ) -> None:
        """Save final results for a specific scenario and measure.

        Parameters
        ----------
        df
            The results to save
        version
            The version identifier
        hierarchy
            The location hierarchy
        scenario
            The climate scenario
        measure
            The climate measure
        """
        path = self.results_path(version, hierarchy, scenario, measure)
        mkdir(path.parent, exist_ok=True, parents=True)
        save_parquet(df, path)

    def load_results(
        self,
        version: str,
        hierarchy: str,
        scenario: str,
        measure: str,
        location_id: int | None = None,
    ) -> pd.DataFrame:
        """Load final results for a specific scenario and measure.

        Parameters
        ----------
        version
            The version identifier
        hierarchy
            The location hierarchy
        scenario
            The climate scenario
        measure
            The climate measure
        location_id
            If provided, load only data for this location

        Returns
        -------
        pd.DataFrame
            The results
        """
        path = self.results_path(version, hierarchy, scenario, measure)
        if location_id is not None:
            filters = [("location_id", "==", location_id)]
            return pd.read_parquet(path, filters=filters)
        return pd.read_parquet(path)

    def diagnostics_root(self, version: str, hierarchy: str) -> Path:
        """Get the path to the diagnostics directory.

        Parameters
        ----------
        version
            The version identifier
        hierarchy
            The location hierarchy

        Returns
        -------
        Path
            The path to the diagnostics directory
        """
        return self.version_root(version) / "diagnostics" / hierarchy

    def grid_plots_pages_root(self, version: str, hierarchy: str) -> Path:
        """Get the directory for per-location grid plot pages."""
        return self.diagnostics_root(version, hierarchy) / "grid_plots_pages"

    def grid_plots_page_path(
        self, version: str, hierarchy: str, location_id: int
    ) -> Path:
        """Get the PDF path for one location's grid plot page.

        Creates the parent directory if it does not exist.
        """
        path = self.grid_plots_pages_root(version, hierarchy) / f"{location_id}.pdf"
        mkdir(path.parent, exist_ok=True, parents=True)
        return path

    def grid_plots_path(self, version: str, hierarchy: str) -> Path:
        """Get the path to the grid plots PDF for a hierarchy.

        Creates the parent directory if it does not exist.
        """
        path = self.diagnostics_root(version, hierarchy) / f"grid_plots_{hierarchy}.pdf"
        mkdir(path.parent, exist_ok=True, parents=True)
        return path

logs: Path property

Get the directory for log files.

root: Path property

Get the root directory for model data.

__init__(root: str | Path = cdc.AGGREGATE_ROOT) -> None

Initialize the climate aggregate data manager.

Parameters

- root: Path to the model root directory

Source code in src/climate_data/data.py
def __init__(
    self,
    root: str | Path = cdc.AGGREGATE_ROOT,
) -> None:
    """Initialize the climate aggregate data manager.

    Parameters
    ----------
    root
        Path to the model root directory
    """
    self._root = Path(root)
    self._create_model_root()

_create_model_root() -> None

Create the model root directory and logs directory.

Source code in src/climate_data/data.py
def _create_model_root(self) -> None:
    """Create the model root directory and logs directory."""
    mkdir(self.root, exist_ok=True)
    mkdir(self.logs, exist_ok=True)

diagnostics_root(version: str, hierarchy: str) -> Path

Get the path to the diagnostics directory.

Parameters

- version: The version identifier
- hierarchy: The location hierarchy

Returns

Path The path to the diagnostics directory

Source code in src/climate_data/data.py
def diagnostics_root(self, version: str, hierarchy: str) -> Path:
    """Get the path to the diagnostics directory.

    Parameters
    ----------
    version
        The version identifier
    hierarchy
        The location hierarchy

    Returns
    -------
    Path
        The path to the diagnostics directory
    """
    return self.version_root(version) / "diagnostics" / hierarchy

load_population(version: str, hierarchy: str, location_id: int | None = None) -> pd.DataFrame

Load population data for a specific hierarchy and optionally location.

Parameters

- version: The version identifier
- hierarchy: The location hierarchy
- location_id: If provided, load only data for this location

Returns

pd.DataFrame The population data

Source code in src/climate_data/data.py
def load_population(
    self, version: str, hierarchy: str, location_id: int | None = None
) -> pd.DataFrame:
    """Load population data for a specific hierarchy and optionally location.

    Parameters
    ----------
    version
        The version identifier
    hierarchy
        The location hierarchy
    location_id
        If provided, load only data for this location

    Returns
    -------
    pd.DataFrame
        The population data
    """
    path = self.population_path(version, hierarchy)
    if location_id is not None:
        filters = [("location_id", "==", location_id)]
        return pd.read_parquet(path, filters=filters)
    return pd.read_parquet(path)

load_raw_results(version: str, hierarchy: str, block_key: str, draw: str, measure: str | None = None, scenario: str | None = None) -> pd.DataFrame

Load raw results for a specific hierarchy, block, and draw.

Parameters

- version: The version identifier
- hierarchy: The location hierarchy
- block_key: The block key
- draw: The draw of the climate data to load (e.g. "000")
- measure: If provided, filter results to only include this measure
- scenario: If provided, filter results to only include this scenario

Returns

pd.DataFrame The raw results
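The optional `measure` and `scenario` arguments translate into row filters expressed as `(column, operator, value)` tuples. The sketch below isolates that step as a standalone helper; `build_filters` is a hypothetical name, not part of the module, and returning `None` when nothing was requested is a defensive choice made here because some parquet engines may reject an empty filter list.

```python
from typing import Optional


def build_filters(
    measure: Optional[str] = None,
    scenario: Optional[str] = None,
) -> Optional[list[tuple[str, str, str]]]:
    """Build (column, op, value) row filters, or None if no filter applies."""
    filters = []
    if measure is not None:
        filters.append(("measure", "==", measure))
    if scenario is not None:
        filters.append(("scenario", "==", scenario))
    # Collapse the empty list to None so the reader sees "no filtering".
    return filters or None


# Example values are made up for illustration.
no_filter = build_filters()
one_filter = build_filters(measure="mean_temperature")
both_filters = build_filters(measure="mean_temperature", scenario="ssp245")

print(no_filter)
print(one_filter)
print(both_filters)
```

The result can be passed as the `filters` argument of `pd.read_parquet`, where the tuples in the list are combined with a logical AND.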

Source code in src/climate_data/data.py
def load_raw_results(
    self,
    version: str,
    hierarchy: str,
    block_key: str,
    draw: str,
    measure: str | None = None,
    scenario: str | None = None,
) -> pd.DataFrame:
    """Load raw results for a specific hierarchy, block, and draw.

    Parameters
    ----------
    version
        The version identifier
    hierarchy
        The location hierarchy
    block_key
        The block key
    draw
        The draw of the climate data to load (e.g. "000")
    measure
        If provided, filter results to only include this measure
    scenario
        If provided, filter results to only include this scenario

    Returns
    -------
    pd.DataFrame
        The raw results
    """
    path = self.raw_results_path(version, hierarchy, block_key, draw)

    # Build row filters for pd.read_parquet
    filters = []
    if measure is not None:
        filters.append(("measure", "==", measure))
    if scenario is not None:
        filters.append(("scenario", "==", scenario))

    # An empty filter list can be rejected by the parquet engine, so pass
    # None when no filter was requested.
    return pd.read_parquet(path, filters=filters or None)

load_results(version: str, hierarchy: str, scenario: str, measure: str, location_id: int | None = None) -> pd.DataFrame

Load final results for a specific scenario and measure.

Parameters

- version: The version identifier
- hierarchy: The location hierarchy
- scenario: The climate scenario
- measure: The climate measure
- location_id: If provided, load only data for this location

Returns

pd.DataFrame The results

Source code in src/climate_data/data.py
def load_results(
    self,
    version: str,
    hierarchy: str,
    scenario: str,
    measure: str,
    location_id: int | None = None,
) -> pd.DataFrame:
    """Load final results for a specific scenario and measure.

    Parameters
    ----------
    version
        The version identifier
    hierarchy
        The location hierarchy
    scenario
        The climate scenario
    measure
        The climate measure
    location_id
        If provided, load only data for this location

    Returns
    -------
    pd.DataFrame
        The results
    """
    path = self.results_path(version, hierarchy, scenario, measure)
    if location_id is not None:
        filters = [("location_id", "==", location_id)]
        return pd.read_parquet(path, filters=filters)
    return pd.read_parquet(path)

log_dir(step_name: str) -> Path

Get the directory for logs from a specific pipeline step.

Parameters

- step_name: The name of the pipeline step

Returns

Path The directory for step-specific logs

Source code in src/climate_data/data.py
def log_dir(self, step_name: str) -> Path:
    """Get the directory for logs from a specific pipeline step.

    Parameters
    ----------
    step_name
        The name of the pipeline step

    Returns
    -------
    Path
        The directory for step-specific logs
    """
    return self.logs / step_name

population_path(version: str, hierarchy: str) -> Path

Get the path to population data for a specific hierarchy.

Parameters

- version: The version identifier
- hierarchy: The location hierarchy

Returns

Path The path to the population data file

Source code in src/climate_data/data.py
def population_path(self, version: str, hierarchy: str) -> Path:
    """Get the path to population data for a specific hierarchy.

    Parameters
    ----------
    version
        The version identifier
    hierarchy
        The location hierarchy

    Returns
    -------
    Path
        The path to the population data file
    """
    return self.results_root(version) / hierarchy / "population.parquet"

raw_results_path(version: str, hierarchy: str, block_key: str, draw: str) -> Path

Get the path to raw results for a specific hierarchy, block, and draw.

Parameters

- version: The version identifier
- hierarchy: The location hierarchy
- block_key: The block key
- draw: The draw of the climate data (e.g. "000")

Returns

Path The path to the raw results file

Source code in src/climate_data/data.py
def raw_results_path(
    self, version: str, hierarchy: str, block_key: str, draw: str
) -> Path:
    """Get the path to raw results for a specific hierarchy, block, and draw.

    Parameters
    ----------
    version
        The version identifier
    hierarchy
        The location hierarchy
    block_key
        The block key
    draw
        The draw of the climate data (e.g. "000")

    Returns
    -------
    Path
        The path to the raw results file
    """
    root = self.raw_results_root(version)
    return root / hierarchy / block_key / f"{draw}.parquet"

raw_results_root(version: str) -> Path

Get the directory for raw results (block-level).

Parameters

- version: The version identifier

Returns

Path The directory for raw results

Source code in src/climate_data/data.py
def raw_results_root(self, version: str) -> Path:
    """Get the directory for raw results (block-level).

    Parameters
    ----------
    version
        The version identifier

    Returns
    -------
    Path
        The directory for raw results
    """
    return self.version_root(version) / "raw-results"

results_path(version: str, hierarchy: str, scenario: str, measure: str) -> Path

Get the path to final results for a specific scenario and measure.

Parameters

- version: The version identifier
- hierarchy: The location hierarchy
- scenario: The climate scenario
- measure: The climate measure

Returns

Path The path to the results file

Source code in src/climate_data/data.py
def results_path(
    self, version: str, hierarchy: str, scenario: str, measure: str
) -> Path:
    """Get the path to final results for a specific scenario and measure.

    Parameters
    ----------
    version
        The version identifier
    hierarchy
        The location hierarchy
    scenario
        The climate scenario
    measure
        The climate measure

    Returns
    -------
    Path
        The path to the results file
    """
    return self.results_root(version) / hierarchy / f"{measure}_{scenario}.parquet"

results_root(version: str) -> Path

Get the directory for final results (measure-level).

Parameters

- version: The version identifier

Returns

Path The directory for final results

Source code in src/climate_data/data.py
def results_root(self, version: str) -> Path:
    """Get the directory for final results (measure-level).

    Parameters
    ----------
    version
        The version identifier

    Returns
    -------
    Path
        The directory for final results
    """
    return self.version_root(version) / "results"

save_population(df: pd.DataFrame, version: str, hierarchy: str) -> None

Save population data for a specific hierarchy.

Parameters

- df: The population data to save
- version: The version identifier
- hierarchy: The location hierarchy

Source code in src/climate_data/data.py
def save_population(self, df: pd.DataFrame, version: str, hierarchy: str) -> None:
    """Save population data for a specific hierarchy.

    Parameters
    ----------
    df
        The population data to save
    version
        The version identifier
    hierarchy
        The location hierarchy
    """
    path = self.population_path(version, hierarchy)
    mkdir(path.parent, exist_ok=True, parents=True)
    save_parquet(df, path)

save_raw_results(df: pd.DataFrame, version: str, hierarchy: str, block_key: str, draw: str) -> None

Save raw results for a specific hierarchy, block, and draw.

Parameters

- df: The results to save
- version: The version identifier
- hierarchy: The location hierarchy
- block_key: The block key
- draw: The draw of the climate data to save (e.g. "000")

Source code in src/climate_data/data.py
def save_raw_results(
    self,
    df: pd.DataFrame,
    version: str,
    hierarchy: str,
    block_key: str,
    draw: str,
) -> None:
    """Save raw results for a specific hierarchy, block, and draw.

    Parameters
    ----------
    df
        The results to save
    version
        The version identifier
    hierarchy
        The location hierarchy
    block_key
        The block key
    draw
        The draw of the climate data to save (e.g. "000")
    """
    path = self.raw_results_path(version, hierarchy, block_key, draw)
    mkdir(path.parent, exist_ok=True, parents=True)
    save_parquet(df, path)

save_results(df: pd.DataFrame, version: str, hierarchy: str, scenario: str, measure: str) -> None

Save final results for a specific scenario and measure.

Parameters

- df: The results to save
- version: The version identifier
- hierarchy: The location hierarchy
- scenario: The climate scenario
- measure: The climate measure

Source code in src/climate_data/data.py
def save_results(
    self,
    df: pd.DataFrame,
    version: str,
    hierarchy: str,
    scenario: str,
    measure: str,
) -> None:
    """Save final results for a specific scenario and measure.

    Parameters
    ----------
    df
        The results to save
    version
        The version identifier
    hierarchy
        The location hierarchy
    scenario
        The climate scenario
    measure
        The climate measure
    """
    path = self.results_path(version, hierarchy, scenario, measure)
    mkdir(path.parent, exist_ok=True, parents=True)
    save_parquet(df, path)

version_root(version: str) -> Path

Get the root directory for a specific version.

Parameters

- version: The version identifier

Returns

Path The directory for version-specific data

Source code in src/climate_data/data.py
def version_root(self, version: str) -> Path:
    """Get the root directory for a specific version.

    Parameters
    ----------
    version
        The version identifier

    Returns
    -------
    Path
        The directory for version-specific data
    """
    return self.root / version

ClimateData

Class for managing the climate data used in the project.
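The class takes a keyword-only `read_only` flag: writable instances create their directory tree on construction, and save methods raise `ValueError` on read-only instances. The sketch below shows that guard pattern in isolation. `GuardedStore` and `save_text` are hypothetical stand-ins written for this example, not part of the module.

```python
import tempfile
from pathlib import Path


class GuardedStore:
    """Hypothetical stand-in for a data manager with a read-only switch."""

    def __init__(self, root: Path, *, read_only: bool = False) -> None:
        self._root = Path(root)
        self._read_only = read_only
        if not read_only:
            # Only writable instances create directories on construction.
            self._root.mkdir(parents=True, exist_ok=True)

    def save_text(self, name: str, text: str) -> Path:
        if self._read_only:
            msg = "Cannot save to read-only data"
            raise ValueError(msg)
        path = self._root / name
        path.write_text(text)
        return path


with tempfile.TemporaryDirectory() as tmp:
    # A writable instance creates its root and can save.
    writable = GuardedStore(Path(tmp) / "model")
    saved_text = writable.save_text("note.txt", "hello").read_text()

    # A read-only instance on the same root refuses to write.
    reader = GuardedStore(Path(tmp) / "model", read_only=True)
    try:
        reader.save_text("note.txt", "overwrite")
        blocked = False
    except ValueError:
        blocked = True

print(saved_text, blocked)
```

Constructing a read-only instance touches nothing on disk, which makes it safe to use from processes that should only consume results.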

Source code in src/climate_data/data.py
class ClimateData:
    """Class for managing the climate data used in the project."""

    def __init__(
        self,
        root: str | Path = cdc.MODEL_ROOT,
        *,
        read_only: bool = False,
    ) -> None:
        self._root = Path(root)
        self._credentials_root = self._root / "credentials"
        self._read_only = read_only
        if not read_only:
            self._create_model_root()

    def _create_model_root(self) -> None:
        mkdir(self.root, exist_ok=True)
        mkdir(self.credentials_root, exist_ok=True)

        mkdir(self.extracted_data, exist_ok=True)
        mkdir(self.extracted_era5, exist_ok=True)
        mkdir(self.extracted_cmip6, exist_ok=True)
        mkdir(self.ncei_climate_stations, exist_ok=True)
        mkdir(self.open_topography_elevation, exist_ok=True)
        mkdir(self.rub_local_climate_zones, exist_ok=True)

        mkdir(self.downscale_model, exist_ok=True)
        mkdir(self.predictors, exist_ok=True)
        mkdir(self.training_data, exist_ok=True)

        mkdir(self.results, exist_ok=True)
        mkdir(self.results_metadata, exist_ok=True)
        mkdir(self.daily_results, exist_ok=True)
        mkdir(self.raw_daily_results, exist_ok=True)
        mkdir(self.annual_results, exist_ok=True)
        mkdir(self.raw_annual_results, exist_ok=True)

    @property
    def root(self) -> Path:
        return self._root

    @property
    def credentials_root(self) -> Path:
        return self._credentials_root

    ##################
    # Extracted data #
    ##################

    @property
    def extracted_data(self) -> Path:
        return self.root / "extracted_data"

    @property
    def extracted_era5(self) -> Path:
        return self.extracted_data / "era5"

    def extracted_era5_path(
        self, dataset: str, variable: str, year: int | str, month: str
    ) -> Path:
        return self.extracted_era5 / f"{dataset}_{variable}_{year}_{month}.nc"

    @property
    def extracted_cmip6(self) -> Path:
        return self.extracted_data / "cmip6"

    def load_koppen_geiger_model_inclusion(
        self, *, return_full_criteria: bool = False
    ) -> pd.DataFrame:
        meta_path = self.extracted_cmip6 / "koppen_geiger_model_inclusion.parquet"

        if not meta_path.exists():
            df = pd.read_html(
                "https://www.nature.com/articles/s41597-023-02549-6/tables/3"
            )[0]
            df.columns = [  # type: ignore[assignment]
                "source_id",
                "member_count",
                "mean_trend",
                "std_dev_trend",
                "transient_climate_response",
                "equilibrium_climate_sensitivity",
                "included_raw",
            ]
            df["included"] = df["included_raw"].apply({"Yes": True, "No": False}.get)
            save_parquet(df, meta_path)

        df = pd.read_parquet(meta_path)
        if return_full_criteria:
            return df
        return df[["source_id", "included"]]

    def load_cmip6_metadata(self) -> pd.DataFrame:
        meta_path = self.extracted_cmip6 / "cmip6-metadata.parquet"

        if not meta_path.exists():
            external_path = "https://storage.googleapis.com/cmip6/cmip6-zarr-consolidated-stores.csv"
            meta = pd.read_csv(external_path)
            save_parquet(meta, meta_path)

        return pd.read_parquet(meta_path)

    def extracted_cmip6_path(
        self,
        variable: str,
        experiment: str,
        gcm_member: str,
    ) -> Path:
        return self.extracted_cmip6 / f"{variable}_{experiment}_{gcm_member}.nc"

    def get_gcms(
        self,
        source_variables: Collection[str],
    ) -> list[str]:
        # Convert to a list so any Collection (tuple, set, ...) selects columns
        inclusion_meta = self.load_scenario_inclusion_metadata()[
            list(source_variables)
        ]
        inclusion_meta = inclusion_meta[inclusion_meta.all(axis=1)]
        return [
            f"{model}_{variant}" for model, variant in inclusion_meta.index.tolist()
        ]

    @property
    def ncei_climate_stations(self) -> Path:
        return self.extracted_data / "ncei_climate_stations"

    def save_ncei_climate_stations(self, df: pd.DataFrame, year: int | str) -> None:
        if self._read_only:
            msg = "Cannot save NCEI climate stations to read-only data"
            raise ValueError(msg)
        path = self.ncei_climate_stations / f"{year}.parquet"
        save_parquet(df, path)

    def load_ncei_climate_stations(self, year: int | str) -> pd.DataFrame:
        return pd.read_parquet(self.ncei_climate_stations / f"{year}.parquet")

    @property
    def open_topography_elevation(self) -> Path:
        return self.extracted_data / "open_topography_elevation"

    @property
    def rub_local_climate_zones(self) -> Path:
        return self.extracted_data / "rub_local_climate_zones"

    ###################
    # Downscale model #
    ###################

    @property
    def downscale_model(self) -> Path:
        return self.root / "downscale_model"

    @property
    def predictors(self) -> Path:
        return self.downscale_model / "predictors"

    def save_predictor(
        self,
        predictor: rt.RasterArray,
        name: str,
        lat_start: int,
        lon_start: int,
    ) -> None:
        if self._read_only:
            msg = "Cannot save predictors to read-only data"
            raise ValueError(msg)
        path = self.predictors / f"{name}_{lat_start}_{lon_start}.tif"
        save_raster(predictor, path)

    def load_predictor(self, name: str) -> rt.RasterArray:
        paths = list(self.predictors.glob(f"{name}_*.tif"))
        return rt.load_mf_raster(paths)

    @property
    def training_data(self) -> Path:
        return self.downscale_model / "training_data"

    def save_training_data(self, df: pd.DataFrame, year: int | str) -> None:
        if self._read_only:
            msg = "Cannot save training data to read-only data"
            raise ValueError(msg)
        path = self.training_data / f"{year}.parquet"
        save_parquet(df, path)

    def load_training_data(self, year: int | str) -> pd.DataFrame:
        return pd.read_parquet(self.training_data / f"{year}.parquet")

    ###########
    # Results #
    ###########

    @property
    def results(self) -> Path:
        return self.root / "results"

    @property
    def results_metadata(self) -> Path:
        return self.results / "metadata"

    def save_scenario_metadata(self, df: pd.DataFrame) -> None:
        if self._read_only:
            msg = "Cannot save scenario metadata to read-only data"
            raise ValueError(msg)
        path = self.results_metadata / "scenario_metadata.parquet"
        save_parquet(df, path)

    def load_scenario_metadata(self) -> pd.DataFrame:
        path = self.results_metadata / "scenario_metadata.parquet"
        return pd.read_parquet(path)

    def save_scenario_inclusion_metadata(self, df: pd.DataFrame) -> None:
        if self._read_only:
            msg = "Cannot save scenario inclusion metadata to read-only data"
            raise ValueError(msg)
        # Need to save to our scripts directory for doc building
        scripts_root = Path(__file__).parent.parent.parent / "scripts"
        for root_dir in [self.results_metadata, scripts_root]:
            path = root_dir / "scenario_inclusion_metadata.parquet"
            save_parquet(df, path)

    def load_scenario_inclusion_metadata(self) -> pd.DataFrame:
        path = self.results_metadata / "scenario_inclusion_metadata.parquet"
        return pd.read_parquet(path)

    @property
    def daily_results(self) -> Path:
        return self.results / "daily"

    @property
    def raw_daily_results(self) -> Path:
        # return self.daily_results / "raw"
        return cdc.AGGREGATE_ROOT / "erf-scratch"

    def raw_daily_results_path(
        self,
        scenario: str,
        variable: str,
        year: int | str,
        gcm_member: str,
    ) -> Path:
        return self.raw_daily_results / scenario / variable / f"{year}_{gcm_member}.nc"

    def save_raw_daily_results(
        self,
        results_ds: xr.Dataset,
        scenario: str,
        variable: str,
        year: int | str,
        gcm_member: str,
        encoding_kwargs: dict[str, Any],
    ) -> None:
        if self._read_only:
            msg = "Cannot save raw daily results to read-only data"
            raise ValueError(msg)
        path = self.raw_daily_results_path(scenario, variable, year, gcm_member)
        mkdir(path.parent, exist_ok=True, parents=True)
        save_xarray(results_ds, path, encoding_kwargs)

    def load_raw_daily_results(
        self,
        scenario: str,
        variable: str,
        year: int | str,
        gcm_member: str,
    ) -> xr.Dataset:
        path = self.raw_daily_results_path(scenario, variable, year, gcm_member)
        return xr.open_dataset(path)

    def daily_results_path(
        self,
        scenario: str,
        variable: str,
        year: int | str,
    ) -> Path:
        return self.daily_results / scenario / variable / f"{year}.nc"

    def save_daily_results(
        self,
        results_ds: xr.Dataset,
        scenario: str,
        variable: str,
        year: int | str,
        encoding_kwargs: dict[str, Any],
    ) -> None:
        if self._read_only:
            msg = "Cannot save daily results to read-only data"
            raise ValueError(msg)
        path = self.daily_results_path(scenario, variable, year)
        mkdir(path.parent, exist_ok=True, parents=True)
        save_xarray(results_ds, path, encoding_kwargs)

    def load_daily_results(
        self,
        scenario: str,
        variable: str,
        year: int | str,
    ) -> xr.Dataset:
        results_path = self.daily_results_path(scenario, variable, year)
        return xr.open_dataset(results_path)

    @property
    def annual_results(self) -> Path:
        return self.results / "annual"

    @property
    def raw_annual_results(self) -> Path:
        return self.annual_results / "raw"

    def raw_annual_results_path(
        self,
        scenario: str,
        variable: str,
        year: int | str,
        gcm_member: str,
    ) -> Path:
        return self.raw_annual_results / scenario / variable / f"{year}_{gcm_member}.nc"

    def save_raw_annual_results(
        self,
        results_ds: xr.Dataset,
        scenario: str,
        variable: str,
        year: int | str,
        gcm_member: str,
        encoding_kwargs: dict[str, Any],
    ) -> None:
        if self._read_only:
            msg = "Cannot save raw annual results to read-only data"
            raise ValueError(msg)
        path = self.raw_annual_results_path(scenario, variable, year, gcm_member)
        mkdir(path.parent, exist_ok=True, parents=True)
        save_xarray(results_ds, path, encoding_kwargs)

    @property
    def compiled_annual_results(self) -> Path:
        return self.raw_annual_results / "compiled"

    def compiled_annual_results_path(
        self,
        scenario: str,
        variable: str,
        gcm_member: str,
    ) -> Path:
        return self.compiled_annual_results / scenario / variable / f"{gcm_member}.nc"

    def list_gcm_members(self, scenario: str, variable: str) -> list[str]:
        return [
            p.stem
            for p in self.compiled_annual_results_path(
                scenario, variable, ""
            ).parent.glob("*.nc")
        ]

    def save_compiled_annual_results(
        self,
        results_ds: xr.Dataset,
        scenario: str,
        variable: str,
        gcm_member: str,
        encoding_kwargs: dict[str, Any],
    ) -> None:
        if self._read_only:
            msg = "Cannot save compiled annual results to read-only data"
            raise ValueError(msg)
        path = self.compiled_annual_results_path(scenario, variable, gcm_member)
        mkdir(path.parent, exist_ok=True, parents=True)
        save_xarray(results_ds, path, encoding_kwargs)

    def load_compiled_annual_results(
        self,
        scenario: str,
        variable: str,
        gcm_member: str,
    ) -> xr.Dataset:
        path = self.compiled_annual_results_path(scenario, variable, gcm_member)
        return xr.open_dataset(path)

    def annual_results_path(
        self,
        scenario: str,
        variable: str,
        draw: int | str,
    ) -> Path:
        return self.annual_results / scenario / variable / f"{draw:0>3}.nc"

    def link_annual_draw(
        self,
        draw: int | str,
        scenario: str,
        variable: str,
        gcm_member: str,
    ) -> None:
        if self._read_only:
            msg = "Cannot link annual draw to read-only data"
            raise ValueError(msg)
        source_path = self.compiled_annual_results_path(scenario, variable, gcm_member)
        dest_path = self.annual_results_path(scenario, variable, draw)
        mkdir(dest_path.parent, exist_ok=True, parents=True)
        # is_symlink() also catches dangling links, for which exists() is False
        if dest_path.is_symlink() or dest_path.exists():
            dest_path.unlink()
        dest_path.symlink_to(source_path)

    def draw_results_path(self, scenario: str, measure: str, draw: str) -> Path:
        """Get the path to annual results for a specific scenario, measure, and draw.

        Parameters
        ----------
        scenario
            The climate scenario (e.g. "ssp126")
        measure
            The climate measure (e.g. "mean_temperature")
        draw
            The draw of the climate data to load (e.g. "000")

        Returns
        -------
        Path
            The path to the results file
        """
        return self.annual_results / scenario / measure / f"{draw}.nc"

    def load_draw_results(self, scenario: str, measure: str, draw: str) -> xr.Dataset:
        """Load annual climate results for a specific scenario, measure, and draw.

        Parameters
        ----------
        scenario
            The climate scenario (e.g. "ssp126")
        measure
            The climate measure (e.g. "mean_temperature")
        draw
            The draw of the climate data to load (e.g. "000")

        Returns
        -------
        xr.Dataset
            The climate data in xarray format
        """
        path = self.annual_results_path(scenario, measure, draw)
        ds = xr.open_dataset(path, decode_coords="all")
        ds = ds.rio.write_crs("EPSG:4326")
        return ds
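
The read-only guard and the relinking step used by `link_annual_draw` can be sketched with the standard library alone. This is a minimal illustration under stated assumptions, not the project's implementation: `link_draw`, its `read_only` flag, the `demo` helper, and the temporary directory layout are hypothetical stand-ins for the class's methods and `root`.

```python
from __future__ import annotations

import tempfile
from pathlib import Path


def link_draw(source_path: Path, dest_path: Path, *, read_only: bool = False) -> None:
    """Point dest_path at source_path, replacing any existing link (hypothetical)."""
    if read_only:
        msg = "Cannot link annual draw to read-only data"
        raise ValueError(msg)
    dest_path.parent.mkdir(parents=True, exist_ok=True)
    # is_symlink() also catches dangling links, which exists() alone would miss.
    if dest_path.is_symlink() or dest_path.exists():
        dest_path.unlink()
    dest_path.symlink_to(source_path)


def demo() -> tuple[Path, Path]:
    """Link draw 000 to one compiled file, then relink it to another."""
    root = Path(tempfile.mkdtemp())
    compiled = root / "annual" / "raw" / "compiled" / "ssp126" / "mean_temperature"
    compiled.mkdir(parents=True)
    first = compiled / "gcm_a.nc"  # placeholder files, not real NetCDF
    second = compiled / "gcm_b.nc"
    first.write_bytes(b"a")
    second.write_bytes(b"b")
    draw = root / "annual" / "ssp126" / "mean_temperature" / "000.nc"
    link_draw(first, draw)
    link_draw(second, draw)  # relinking replaces the earlier target
    return draw, second
```

Because each draw is a symlink into the compiled results, reassigning a draw to a different GCM member only rewrites the link and never copies data.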

draw_results_path(scenario: str, measure: str, draw: str) -> Path

Get the path to annual results for a specific scenario, measure, and draw.

Parameters

scenario The climate scenario (e.g. "ssp126")
measure The climate measure (e.g. "mean_temperature")
draw The draw of the climate data to load (e.g. "000")

Returns

Path The path to the results file

Source code in src/climate_data/data.py
def draw_results_path(self, scenario: str, measure: str, draw: str) -> Path:
    """Get the path to annual results for a specific scenario, measure, and draw.

    Parameters
    ----------
    scenario
        The climate scenario (e.g. "ssp126")
    measure
        The climate measure (e.g. "mean_temperature")
    draw
        The draw of the climate data to load (e.g. "000")

    Returns
    -------
    Path
        The path to the results file
    """
    return self.annual_results / scenario / measure / f"{draw}.nc"
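
Note that `draw_results_path` interpolates `draw` exactly as given, whereas `annual_results_path` (which `load_draw_results` calls) zero-pads it to three characters with the format spec `0>3`. A minimal sketch of the difference, assuming a hypothetical `annual_root` in place of `self.annual_results`; the two function names are illustrative only:

```python
from __future__ import annotations

from pathlib import PurePosixPath

annual_root = PurePosixPath("/data/results/annual")  # hypothetical root


def padded_draw_path(scenario: str, measure: str, draw: int | str) -> PurePosixPath:
    """Mirror annual_results_path: left-pad the draw with zeros to width 3."""
    return annual_root / scenario / measure / f"{draw:0>3}.nc"


def unpadded_draw_path(scenario: str, measure: str, draw: str) -> PurePosixPath:
    """Mirror draw_results_path: use the draw string as-is."""
    return annual_root / scenario / measure / f"{draw}.nc"
```

The two agree only when `draw` is already a three-character string such as "000", so passing pre-padded draw strings keeps both methods pointing at the same file.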

load_draw_results(scenario: str, measure: str, draw: str) -> xr.Dataset

Load annual climate results for a specific scenario, measure, and draw.

Parameters

scenario The climate scenario (e.g. "ssp126")
measure The climate measure (e.g. "mean_temperature")
draw The draw of the climate data to load (e.g. "000")

Returns

xr.Dataset The climate data in xarray format

Source code in src/climate_data/data.py
def load_draw_results(self, scenario: str, measure: str, draw: str) -> xr.Dataset:
    """Load annual climate results for a specific scenario, measure, and draw.

    Parameters
    ----------
    scenario
        The climate scenario (e.g. "ssp126")
    measure
        The climate measure (e.g. "mean_temperature")
    draw
        The draw of the climate data to load (e.g. "000")

    Returns
    -------
    xr.Dataset
        The climate data in xarray format
    """
    path = self.annual_results_path(scenario, measure, draw)
    ds = xr.open_dataset(path, decode_coords="all")
    ds = ds.rio.write_crs("EPSG:4326")
    return ds

PopulationModelData

Handles population data and location hierarchies.

This class manages:
1. Population projections at different time points
2. Location hierarchies (GBD, LSAE, etc.)
3. Spatial data for aggregation

The population data is used as weights when aggregating climate data to different location hierarchies.

Source code in src/climate_data/data.py
class PopulationModelData:
    """Handles population data and location hierarchies.

    This class manages:
    1. Population projections at different time points
    2. Location hierarchies (GBD, LSAE, etc.)
    3. Spatial data for aggregation

    The population data is used as weights when aggregating climate data
    to different location hierarchies.
    """

    def __init__(
        self,
        root: str | Path = cdc.POPULATION_MODEL_ROOT,
    ) -> None:
        """Initialize the population model data manager.

        Parameters
        ----------
        root : str | Path
            Path to the population model root directory
        """
        self._root = Path(root)

    @property
    def root(self) -> Path:
        """Get the root directory for population model data."""
        return self._root

    @property
    def results(self) -> Path:
        """Get the directory containing current model results."""
        return Path(self.root, "results") / "current"

    @property
    def model_spec_path(self) -> Path:
        """Get the path to the model specification file."""
        return self.results / "specification.yaml"

    def load_model_spec(self) -> dict[str, Any]:
        """Load the model specification file.

        Returns
        -------
        dict
            The model specification containing paths and parameters
        """
        return cast(dict[str, Any], yaml.safe_load(self.model_spec_path.read_text()))

    def load_modeling_frame(self) -> gpd.GeoDataFrame:
        """Load the modeling frame containing spatial information.

        The modeling frame is a subdivision of the world into equal-area blocks.
        Each block is assigned a unique key that is used to parallelize
        pipeline steps in both population modeling and in this pipeline's
        aggregation step.

        Returns
        -------
        gpd.GeoDataFrame
            The modeling frame with spatial information and block keys
        """
        model_spec = self.load_model_spec()
        raw_root = Path(model_spec["output_root"])
        model_frame_path = raw_root.parent.parent / "modeling_frame.parquet"
        return gpd.read_parquet(model_frame_path)

    def load_results(self, time_point: str, block_key: str) -> rt.RasterArray:
        """Load population results for a specific time point and block.

        Parameters
        ----------
        time_point
            The time point to load (e.g. "2020q1")
        block_key
            The block key to load (e.g. "B-0021X-0003Y")

        Returns
        -------
        rt.RasterArray
            The population raster data
        """
        model_spec = self.load_model_spec()
        raw_root = Path(model_spec["output_root"])
        path = raw_root / "raked_predictions" / time_point / f"{block_key}.tif"
        return rt.load_raster(path)

    @property
    def raking_data(self) -> Path:
        """Get the directory containing data used to rake the population estimates.

        Raking enforces admin-level consistency between gridded population data
        and GBD/FHS population estimates. We'll use these same hierarchies to
        aggregate the climate data.

        """
        return self.root / "admin-inputs" / "raking"

    def load_raking_shapes(
        self, full_aggregation_hierarchy: str, bounds: tuple[float, float, float, float]
    ) -> gpd.GeoDataFrame:
        """Load shapes for a full aggregation hierarchy within given bounds.

        Parameters
        ----------
        full_aggregation_hierarchy
            The full aggregation hierarchy to load (e.g. "gbd_2021")
        bounds
            The bounds to load (xmin, ymin, xmax, ymax)

        Returns
        -------
        gpd.GeoDataFrame
            The shapes for the given hierarchy and bounds
        """
        if full_aggregation_hierarchy == "gbd_2021":
            shape_path = (
                self.raking_data / f"shapes_{full_aggregation_hierarchy}.parquet"
            )
            gdf = gpd.read_parquet(shape_path, bbox=bounds)

            # We're using population data here instead of a hierarchy because
            # the populations include extra locations we've supplemented that
            # aren't modeled in GBD (e.g. locations with zero population or
            # places that GBD models using population scalars from WPP).
            pop_path = (
                self.raking_data / f"population_{full_aggregation_hierarchy}.parquet"
            )
            pop = pd.read_parquet(pop_path)

            keep_cols = ["location_id", "location_name", "most_detailed", "parent_id"]
            keep_mask = (
                (pop.year_id == pop.year_id.max())  # Year doesn't matter
                & (pop.most_detailed == 1)
            )
            out = gdf.merge(pop.loc[keep_mask, keep_cols], on="location_id", how="left")
        elif full_aggregation_hierarchy in ["lsae_1209", "lsae_1285"]:
            # This is only a2 geoms, so already most detailed
            shape_path = (
                self.raking_data
                / "gbd-inputs"
                / f"shapes_{full_aggregation_hierarchy}_a2.parquet"
            )
            out = gpd.read_parquet(shape_path, bbox=bounds)
        else:
            msg = f"Unknown pixel hierarchy: {full_aggregation_hierarchy}"
            raise ValueError(msg)
        return out

    def load_raking_populations(self, hierarchy: str) -> pd.DataFrame:
        path = self.raking_data / f"population_{hierarchy}.parquet"
        return pd.read_parquet(path)

    def load_lsae_mapping_shapes(self, admin_level: int) -> gpd.GeoDataFrame:
        """Load the LSAE mapping shapes for a given admin level.

        Parameters
        ----------
        admin_level
            The admin level to load (0, 1, or 2)

        Returns
        -------
        gpd.GeoDataFrame
            The LSAE mapping shapes for the given admin level
        """
        assert admin_level in [0, 1, 2]
        path = f"/home/j/WORK/11_geospatial/admin_shapefiles/current/lbd_standard_admin_{admin_level}_simplified.shp"
        gdf = (
            gpd.read_file(path)
            .rename(columns={"loc_id": "location_id"})
            .loc[:, ["location_id", "geometry"]]
        )
        return gdf

    def load_subset_hierarchy(self, subset_hierarchy: str) -> pd.DataFrame:
        """Load a subset location hierarchy.

        The subset hierarchy might be equal to the full aggregation hierarchy,
        but it might also be a subset of the full aggregation hierarchy.
        These hierarchies are used to provide different views of aggregated
        climate data.

        Parameters
        ----------
        subset_hierarchy
            The administrative hierarchy to load (e.g. "gbd_2021")

        Returns
        -------
        pd.DataFrame
            The hierarchy data with parent-child relationships
        """
        allowed_hierarchies = ["gbd_2021", "fhs_2021", "lsae_1209", "lsae_1285"]
        if subset_hierarchy not in allowed_hierarchies:
            msg = f"Unknown admin hierarchy: {subset_hierarchy}"
            raise ValueError(msg)
        path = self.raking_data / "gbd-inputs" / f"hierarchy_{subset_hierarchy}.parquet"
        hierarchy_df = pd.read_parquet(path)
        if subset_hierarchy == "gbd_2021":
            to_drop_parents = [
                ## FROM POPULATION MODEL RAKING DATA PREP
                # Drop UK UTLAs from these regions
                4618,
                4619,
                4620,
                4621,
                4622,
                4623,
                4624,
                4625,
                4626,
                # Drop the India urban/rural splits from these states
                4841,
                4842,
                4843,
                4844,
                4846,
                4849,
                4850,
                4851,
                4852,
                4853,
                4854,
                4855,
                4856,
                4857,
                4859,
                4860,
                4861,
                4862,
                4863,
                4864,
                4865,
                4867,
                4868,
                4869,
                4870,
                4871,
                4872,
                4873,
                4874,
                4875,
                44538,
                # Drop the Maori/non-Maori split from New Zealand
                72,
            ]
            # Copy so the most_detailed assignment below does not write to a view
            hierarchy_df = hierarchy_df.loc[
                ~hierarchy_df["parent_id"].isin(to_drop_parents)
            ].copy()
            hierarchy_df.loc[
                hierarchy_df["location_id"].isin(to_drop_parents), "most_detailed"
            ] = 1

        return hierarchy_df

model_spec_path: Path property

Get the path to the model specification file.

raking_data: Path property

Get the directory containing data used to rake the population estimates.

Raking enforces admin-level consistency between gridded population data and GBD/FHS population estimates. We'll use these same hierarchies to aggregate the climate data.

results: Path property

Get the directory containing current model results.

root: Path property

Get the root directory for population model data.

__init__(root: str | Path = cdc.POPULATION_MODEL_ROOT) -> None

Initialize the population model data manager.

Parameters

root : str | Path Path to the population model root directory

Source code in src/climate_data/data.py
def __init__(
    self,
    root: str | Path = cdc.POPULATION_MODEL_ROOT,
) -> None:
    """Initialize the population model data manager.

    Parameters
    ----------
    root : str | Path
        Path to the population model root directory
    """
    self._root = Path(root)

load_lsae_mapping_shapes(admin_level: int) -> gpd.GeoDataFrame

Load the LSAE mapping shapes for a given admin level.

Parameters

admin_level The admin level to load (0, 1, or 2)

Returns

gpd.GeoDataFrame The LSAE mapping shapes for the given admin level

Source code in src/climate_data/data.py
def load_lsae_mapping_shapes(self, admin_level: int) -> gpd.GeoDataFrame:
    """Load the LSAE mapping shapes for a given admin level.

    Parameters
    ----------
    admin_level
        The admin level to load (0, 1, or 2)

    Returns
    -------
    gpd.GeoDataFrame
        The LSAE mapping shapes for the given admin level
    """
    assert admin_level in [0, 1, 2]
    path = f"/home/j/WORK/11_geospatial/admin_shapefiles/current/lbd_standard_admin_{admin_level}_simplified.shp"
    gdf = (
        gpd.read_file(path)
        .rename(columns={"loc_id": "location_id"})
        .loc[:, ["location_id", "geometry"]]
    )
    return gdf
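
Because `assert` statements are stripped when Python runs with `-O`, an explicit check is a common alternative for validating `admin_level`. A minimal sketch, assuming a hypothetical helper named `lsae_shapefile_path` that only builds the path and does not read the file:

```python
def lsae_shapefile_path(admin_level: int) -> str:
    """Build the shapefile path, rejecting unsupported admin levels (hypothetical)."""
    if admin_level not in (0, 1, 2):
        msg = f"admin_level must be 0, 1, or 2; got {admin_level!r}"
        raise ValueError(msg)
    return (
        "/home/j/WORK/11_geospatial/admin_shapefiles/current/"
        f"lbd_standard_admin_{admin_level}_simplified.shp"
    )
```

This variant raises `ValueError`, matching the error type the other loaders in this module use for unknown hierarchies.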

load_model_spec() -> dict[str, Any]

Load the model specification file.

Returns

dict The model specification containing paths and parameters

Source code in src/climate_data/data.py
def load_model_spec(self) -> dict[str, Any]:
    """Load the model specification file.

    Returns
    -------
    dict
        The model specification containing paths and parameters
    """
    return cast(dict[str, Any], yaml.safe_load(self.model_spec_path.read_text()))

load_modeling_frame() -> gpd.GeoDataFrame

Load the modeling frame containing spatial information.

The modeling frame is a subdivision of the world into equal-area blocks. Each block is assigned a unique key that is used to parallelize pipeline steps in both population modeling and in this pipeline's aggregation step.

Returns

gpd.GeoDataFrame The modeling frame with spatial information and block keys

Source code in src/climate_data/data.py
def load_modeling_frame(self) -> gpd.GeoDataFrame:
    """Load the modeling frame containing spatial information.

    The modeling frame is a subdivision of the world into equal-area blocks.
    Each block is assigned a unique key that is used to parallelize
    pipeline steps in both population modeling and in this pipeline's
    aggregation step.

    Returns
    -------
    gpd.GeoDataFrame
        The modeling frame with spatial information and block keys
    """
    model_spec = self.load_model_spec()
    raw_root = Path(model_spec["output_root"])
    model_frame_path = raw_root.parent.parent / "modeling_frame.parquet"
    return gpd.read_parquet(model_frame_path)
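
Both `load_modeling_frame` and `load_results` derive their paths from the `output_root` entry of the model specification. A minimal sketch of that derivation, assuming a made-up `output_root` value; the real layout is whatever the population model's `specification.yaml` records:

```python
from pathlib import PurePosixPath

# Hypothetical specification contents; only the "output_root" key comes from the source.
model_spec = {"output_root": "/pop_model/models/2025_01_01/run_003"}

raw_root = PurePosixPath(model_spec["output_root"])

# The modeling frame sits two directory levels above the run's output root.
modeling_frame_path = raw_root.parent.parent / "modeling_frame.parquet"

# Population rasters are stored per time point and per block key.
block_raster_path = raw_root / "raked_predictions" / "2020q1" / "B-0021X-0003Y.tif"
```

The modeling frame is shared across runs that live under the same grandparent directory, while the rasters are specific to one run.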

load_raking_shapes(full_aggregation_hierarchy: str, bounds: tuple[float, float, float, float]) -> gpd.GeoDataFrame

Load shapes for a full aggregation hierarchy within given bounds.

Parameters

full_aggregation_hierarchy The full aggregation hierarchy to load (e.g. "gbd_2021")
bounds The bounds to load (xmin, ymin, xmax, ymax)

Returns

gpd.GeoDataFrame The shapes for the given hierarchy and bounds

Source code in src/climate_data/data.py
def load_raking_shapes(
    self, full_aggregation_hierarchy: str, bounds: tuple[float, float, float, float]
) -> gpd.GeoDataFrame:
    """Load shapes for a full aggregation hierarchy within given bounds.

    Parameters
    ----------
    full_aggregation_hierarchy
        The full aggregation hierarchy to load (e.g. "gbd_2021")
    bounds
        The bounds to load (xmin, ymin, xmax, ymax)

    Returns
    -------
    gpd.GeoDataFrame
        The shapes for the given hierarchy and bounds
    """
    if full_aggregation_hierarchy == "gbd_2021":
        shape_path = (
            self.raking_data / f"shapes_{full_aggregation_hierarchy}.parquet"
        )
        gdf = gpd.read_parquet(shape_path, bbox=bounds)

        # We're using population data here instead of a hierarchy because
        # the populations include extra locations we've supplemented that
        # aren't modeled in GBD (e.g. locations with zero population or
        # places that GBD models using population scalars from WPP).
        pop_path = (
            self.raking_data / f"population_{full_aggregation_hierarchy}.parquet"
        )
        pop = pd.read_parquet(pop_path)

        keep_cols = ["location_id", "location_name", "most_detailed", "parent_id"]
        keep_mask = (
            (pop.year_id == pop.year_id.max())  # Year doesn't matter
            & (pop.most_detailed == 1)
        )
        out = gdf.merge(pop.loc[keep_mask, keep_cols], on="location_id", how="left")
    elif full_aggregation_hierarchy in ["lsae_1209", "lsae_1285"]:
        # This is only a2 geoms, so already most detailed
        shape_path = (
            self.raking_data
            / "gbd-inputs"
            / f"shapes_{full_aggregation_hierarchy}_a2.parquet"
        )
        out = gpd.read_parquet(shape_path, bbox=bounds)
    else:
        msg = f"Unknown pixel hierarchy: {full_aggregation_hierarchy}"
        raise ValueError(msg)
    return out
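
For `gbd_2021`, the population table is filtered to a single year and to most-detailed locations before the merge, which leaves one metadata row per location, so the left join cannot duplicate shapes. A pure-Python sketch of that logic, using plain dictionaries in place of the GeoDataFrame and DataFrame; the function name and all sample rows are hypothetical:

```python
from __future__ import annotations

KEEP_COLS = ["location_id", "location_name", "most_detailed", "parent_id"]


def attach_location_metadata(shapes: list[dict], population: list[dict]) -> list[dict]:
    """Left-join most-detailed population metadata onto shapes (stand-in for merge)."""
    latest_year = max(row["year_id"] for row in population)
    # One row per location: latest year only, most-detailed locations only.
    lookup = {
        row["location_id"]: {col: row[col] for col in KEEP_COLS}
        for row in population
        if row["year_id"] == latest_year and row["most_detailed"] == 1
    }
    empty = {col: None for col in KEEP_COLS if col != "location_id"}
    # how="left": every shape is kept; unmatched shapes get missing metadata.
    return [{**shape, **lookup.get(shape["location_id"], empty)} for shape in shapes]


shapes = [
    {"location_id": 1, "geometry": "POLYGON A"},
    {"location_id": 2, "geometry": "POLYGON B"},
]
population = [
    {"location_id": 1, "location_name": "A", "most_detailed": 1, "parent_id": 10, "year_id": 2020},
    {"location_id": 1, "location_name": "A", "most_detailed": 1, "parent_id": 10, "year_id": 2021},
    {"location_id": 10, "location_name": "Parent", "most_detailed": 0, "parent_id": 0, "year_id": 2021},
]
merged = attach_location_metadata(shapes, population)
```

Any year would serve for the filter, since only the location metadata is kept and not the population values.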

load_results(time_point: str, block_key: str) -> rt.RasterArray

Load population results for a specific time point and block.

Parameters

time_point The time point to load (e.g. "2020q1")
block_key The block key to load (e.g. "B-0021X-0003Y")

Returns

rt.RasterArray The population raster data

Source code in src/climate_data/data.py
def load_results(self, time_point: str, block_key: str) -> rt.RasterArray:
    """Load population results for a specific time point and block.

    Parameters
    ----------
    time_point
        The time point to load (e.g. "2020q1")
    block_key
        The block key to load (e.g. "B-0021X-0003Y")

    Returns
    -------
    rt.RasterArray
        The population raster data
    """
    model_spec = self.load_model_spec()
    raw_root = Path(model_spec["output_root"])
    path = raw_root / "raked_predictions" / time_point / f"{block_key}.tif"
    return rt.load_raster(path)

load_subset_hierarchy(subset_hierarchy: str) -> pd.DataFrame

Load a subset location hierarchy.

The subset hierarchy might be equal to the full aggregation hierarchy, but it might also be a subset of the full aggregation hierarchy. These hierarchies are used to provide different views of aggregated climate data.

Parameters

subset_hierarchy The administrative hierarchy to load (e.g. "gbd_2021")

Returns

pd.DataFrame The hierarchy data with parent-child relationships

Source code in src/climate_data/data.py
def load_subset_hierarchy(self, subset_hierarchy: str) -> pd.DataFrame:
    """Load a subset location hierarchy.

    The subset hierarchy might be equal to the full aggregation hierarchy,
    but it might also be a subset of the full aggregation hierarchy.
    These hierarchies are used to provide different views of aggregated
    climate data.

    Parameters
    ----------
    subset_hierarchy
        The administrative hierarchy to load (e.g. "gbd_2021")

    Returns
    -------
    pd.DataFrame
        The hierarchy data with parent-child relationships
    """
    allowed_hierarchies = ["gbd_2021", "fhs_2021", "lsae_1209", "lsae_1285"]
    if subset_hierarchy not in allowed_hierarchies:
        msg = f"Unknown admin hierarchy: {subset_hierarchy}"
        raise ValueError(msg)
    path = self.raking_data / "gbd-inputs" / f"hierarchy_{subset_hierarchy}.parquet"
    hierarchy_df = pd.read_parquet(path)
    if subset_hierarchy == "gbd_2021":
        to_drop_parents = [
            ## FROM POPULATION MODEL RAKING DATA PREP
            # Drop UK UTLAs from these regions
            4618,
            4619,
            4620,
            4621,
            4622,
            4623,
            4624,
            4625,
            4626,
            # Drop the India urban/rural splits from these states
            4841,
            4842,
            4843,
            4844,
            4846,
            4849,
            4850,
            4851,
            4852,
            4853,
            4854,
            4855,
            4856,
            4857,
            4859,
            4860,
            4861,
            4862,
            4863,
            4864,
            4865,
            4867,
            4868,
            4869,
            4870,
            4871,
            4872,
            4873,
            4874,
            4875,
            44538,
            # Drop the Maori/non-Maori split from New Zealand
            72,
        ]
        # Copy so the most_detailed assignment below does not write to a view
        hierarchy_df = hierarchy_df.loc[
            ~hierarchy_df["parent_id"].isin(to_drop_parents)
        ].copy()
        hierarchy_df.loc[
            hierarchy_df["location_id"].isin(to_drop_parents), "most_detailed"
        ] = 1

    return hierarchy_df
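
The `gbd_2021` pruning step does two things: it drops every row whose parent is in `to_drop_parents`, then marks those parents as most detailed, since they no longer have children. A pure-Python sketch of the same logic, using lists of dictionaries in place of the DataFrame; `prune_hierarchy` and the location IDs are hypothetical:

```python
from __future__ import annotations


def prune_hierarchy(rows: list[dict], to_drop_parents: list[int]) -> list[dict]:
    """Drop children of the given parents and promote the parents to leaves."""
    drop = set(to_drop_parents)
    # Copy each kept row so the caller's input is left untouched.
    kept = [dict(row) for row in rows if row["parent_id"] not in drop]
    for row in kept:
        if row["location_id"] in drop:
            row["most_detailed"] = 1
    return kept


rows = [
    {"location_id": 100, "parent_id": 1, "most_detailed": 0},  # parent to collapse
    {"location_id": 101, "parent_id": 100, "most_detailed": 1},
    {"location_id": 102, "parent_id": 100, "most_detailed": 1},
    {"location_id": 200, "parent_id": 1, "most_detailed": 1},
]
pruned = prune_hierarchy(rows, [100])
```

After pruning, location 100 stands in for its former children, so the hierarchy still has a most-detailed level that covers the same area.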

save_parquet(df: pd.DataFrame, output_path: str | Path) -> None

Save a pandas DataFrame to a Parquet file with standard parameters.

Parameters

df The DataFrame to save.
output_path The path to save the DataFrame to.

Source code in src/climate_data/data.py
def save_parquet(
    df: pd.DataFrame,
    output_path: str | Path,
) -> None:
    """Save a pandas DataFrame to a Parquet file with standard parameters.

    Parameters
    ----------
    df
        The DataFrame to save.
    output_path
        The path to save the DataFrame to.
    """
    touch(output_path, clobber=True)
    df.to_parquet(output_path)

save_raster(raster: rt.RasterArray, output_path: str | Path, num_cores: int = 1, **kwargs: Any) -> None

Save a raster to a file with standard parameters.

Parameters

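- raster: The raster to save.
- output_path: The path to save the raster to.
- num_cores: The number of cores to use for compression.
- **kwargs: Additional keyword arguments passed to the raster writer. They override the defaults set in this function.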

Source code in src/climate_data/data.py
def save_raster(
    raster: rt.RasterArray,
    output_path: str | Path,
    num_cores: int = 1,
    **kwargs: Any,
) -> None:
    """Save a raster to a file with standard parameters.

    Parameters
    ----------
    raster
        The raster to save.
    output_path
        The path to save the raster to.
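    num_cores
        The number of cores to use for compression.
    **kwargs
        Additional keyword arguments passed to the raster writer. They override
        the defaults set in this function.
    """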
    save_params = {
        "tiled": True,
        "blockxsize": 512,
        "blockysize": 512,
        "compress": "ZSTD",
        "predictor": 2,  # horizontal differencing
        "num_threads": num_cores,
        "bigtiff": "yes",
        **kwargs,
    }
    touch(output_path, clobber=True)
    raster.to_file(output_path, **save_params)
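
Because `**kwargs` is unpacked last in `save_params`, any key a caller passes replaces the default with the same name, and new keys are simply added. A small sketch of that merge on its own, with no raster library involved; `build_save_params` is a hypothetical helper that only reproduces the dictionary construction:

```python
from typing import Any, Dict


def build_save_params(num_cores: int = 1, **kwargs: Any) -> Dict[str, Any]:
    """Reproduce the defaults-then-overrides merge used by save_raster."""
    return {
        "tiled": True,
        "blockxsize": 512,
        "blockysize": 512,
        "compress": "ZSTD",
        "predictor": 2,  # horizontal differencing
        "num_threads": num_cores,
        "bigtiff": "yes",
        **kwargs,  # unpacked last, so caller values win on key collisions
    }


plain_params = build_save_params(num_cores=4)
# The extra keys that save_raster_to_cog forwards.
cog_params = build_save_params(
    num_cores=4, driver="COG", overview_resampling="nearest"
)
# A caller overriding one of the defaults.
lzw_params = build_save_params(compress="LZW")
```

This is how `save_raster_to_cog` reuses `save_raster`: it adds `driver` and `overview_resampling` while keeping the tiling and compression defaults.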

save_raster_to_cog(raster: rt.RasterArray, output_path: str | Path, num_cores: int = 1, resampling: str = 'nearest') -> None

Save a raster to a COG file.

A COG is a Cloud Optimized GeoTIFF, a GeoTIFF laid out for efficient access from cloud storage. This function writes the raster with the COG driver and builds overviews with the specified resampling method.

Parameters

- raster: The raster to save.
- output_path: The path to save the raster to.
- num_cores: The number of cores to use for compression.
- resampling: The resampling method to use when building the overviews.

Source code in src/climate_data/data.py
def save_raster_to_cog(
    raster: rt.RasterArray,
    output_path: str | Path,
    num_cores: int = 1,
    resampling: str = "nearest",
) -> None:
    """Save a raster to a COG file.

    A COG is a Cloud Optimized GeoTIFF, a GeoTIFF laid out for efficient access from
    cloud storage. This function writes the raster with the COG driver and builds
    overviews with the specified resampling method.

    Parameters
    ----------
    raster
        The raster to save.
    output_path
        The path to save the raster to.
    num_cores
        The number of cores to use for compression.
    resampling
        The resampling method to use when building the overviews.
    """
    cog_save_params = {
        "driver": "COG",
        "overview_resampling": resampling,
    }
    save_raster(raster, output_path, num_cores, **cog_save_params)

save_xarray(ds: xr.Dataset, output_path: str | Path, encoding_kwargs: dict[str, Any]) -> None

Save an xarray dataset to a NetCDF file with standard encoding parameters.

Parameters

- ds: The dataset to save.
- output_path: The path to save the dataset to.
- encoding_kwargs: Encoding settings merged over the defaults (int16, fill value -32767, zlib level 1) and applied to the `value` variable.

Source code in src/climate_data/data.py
def save_xarray(
    ds: xr.Dataset,
    output_path: str | Path,
    encoding_kwargs: dict[str, Any],
) -> None:
    """Save an xarray dataset to a NetCDF file with standard encoding parameters.

    Parameters
    ----------
    ds
        The dataset to save.
    output_path
        The path to save the dataset to.
    encoding_kwargs
        Encoding settings merged over the defaults and applied to the `value`
        variable.
    """
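    touch(output_path, clobber=True)
    # Defaults: 16-bit integers with a fill value and light zlib compression.
    encoding = {
        "dtype": "int16",
        "_FillValue": -32767,
        "zlib": True,
        "complevel": 1,
    }
    # Caller-supplied settings override the defaults.
    encoding.update(encoding_kwargs)
    # The encoding is applied only to the variable named "value".
    ds.to_netcdf(output_path, encoding={"value": encoding})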
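
The defaults store `value` as `int16`, so floating-point data needs a `scale_factor` and `add_offset` to survive the cast. Whether callers in this project pass them through `encoding_kwargs` is an assumption; the sketch below only shows how such values could be derived under the CF packing convention (`unpacked = packed * scale_factor + add_offset`), keeping packed values clear of the `-32767` fill value. `packing_encoding`, `pack`, and `unpack` are hypothetical helper names.

```python
INT16_MAX = 32767
FILL_VALUE = -32767  # default _FillValue in save_xarray
PACKED_MIN = FILL_VALUE + 1  # smallest packed value that is not the fill value


def packing_encoding(vmin: float, vmax: float) -> dict:
    """Return scale_factor/add_offset mapping [vmin, vmax] onto int16."""
    scale_factor = (vmax - vmin) / (INT16_MAX - PACKED_MIN)
    add_offset = vmin - PACKED_MIN * scale_factor
    return {"scale_factor": scale_factor, "add_offset": add_offset}


def pack(value: float, enc: dict) -> int:
    """Convert a physical value to its packed integer."""
    return round((value - enc["add_offset"]) / enc["scale_factor"])


def unpack(packed: int, enc: dict) -> float:
    """Convert a packed integer back to a physical value."""
    return packed * enc["scale_factor"] + enc["add_offset"]


# Mirror the merge in save_xarray: defaults first, caller settings on top.
encoding = {"dtype": "int16", "_FillValue": FILL_VALUE, "zlib": True, "complevel": 1}
encoding.update(packing_encoding(vmin=-50.0, vmax=50.0))
```

With this scheme the round-trip error is at most half of `scale_factor`, so the value range chosen for packing determines the precision of the stored data.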