diff --git a/python/pipeline.py b/python/pipeline.py
new file mode 100644
index 00000000..3d5b9fdf
--- /dev/null
+++ b/python/pipeline.py
@@ -0,0 +1,95 @@
+"""Wrappers to automate the entire pipeline.
+
+Following Andy's very helpful `excel` file, this manages
+a reproduction of all steps of the debiasing pipeline.
+
+Download Data
+-------------
+
+The `download_ftp` function from `ceda_ftp_download.py` can
+be used (with a registered account user name and password)
+to download two datasets from the Centre for Environmental
+Data Analysis (CEDA), saved to `ClimateData/Raw`:
+
+- [`HadUK-Grid`](https://catalogue.ceda.ac.uk/uuid/e6822428e4124c5986b689a37fda10bc)
+    - a 1km gridded climate observation dataset, designed to supersede the `UKCP09` gridded observations
+    - For further details see the [Met Office](https://www.metoffice.gov.uk/research/climate/maps-and-data/data/haduk-grid)
+    - Saved to `Raw/HadsUKgrid/`
+- [`UKCP`](https://catalogue.ceda.ac.uk/uuid/f9b6b55dfa174386a05efae2f0f76141) UK Climate Projections at 2.2 km
+    - a 2.2km projection grid
+    - Saved to `Raw/UKCP2.2/`
+
+Reproject UKCP
+--------------
+
+The `bash/reproject_one.sh` script copies and reprojects `UKCP2.2` via `gdalwarp` to `Raw/Reprojected_infill`:
+
+```bash
+gdalwarp -t_srs 'EPSG:27700' -tr 2200 2200 -r near -overwrite $f "${fn%.nc}.tif"  # Reproject the file
+```
+
+*New step*: project `UKCP` to 360/365 days.
+
+Relevant `xarray` utilities:
+
+- [`convert_calendar`](https://docs.xarray.dev/en/stable/generated/xarray.Dataset.convert_calendar.html)
+- [`interp_calendar`](https://docs.xarray.dev/en/stable/generated/xarray.Dataset.interp_calendar.html)
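+
+A minimal sketch of this step via `convert_calendar`, assuming `ukcp_data`
+is a `UKCP2.2` `xarray.Dataset` on a 360-day calendar (the variable names
+here are illustrative, not part of the codebase):
+
+```python
+# 360-day -> standard calendar; align_on="year" is required
+# when converting to or from a 360-day calendar
+ukcp_standard = ukcp_data.convert_calendar(
+    "standard", dim="time", align_on="year"
+)
+```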
+
+Resample HadUK-Grid
+-------------------
+
+Previous approach:
+
+- `resampling_hads.py`
+
+New approach:
+
+- `resampling.py`
+- check `x_grid` and `y_grid` interpolation (see the call sketch after this list)
+```python
+# the dataset to be resampled must have dimensions named
+# projection_x_coordinate and projection_y_coordinate
+resampled = data_360[[variable]].interp(
+    projection_x_coordinate=x_grid,
+    projection_y_coordinate=y_grid,
+    method="linear",
+)
+```
+- [ ] Refactor to facilitate testing
+- [ ] Ensure HADs still works
+- [ ] Add function for UKCP
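+
+Each call of `resampling.resample_hadukgrid` processes one file and takes a
+single `list` or `tuple` of arguments matching `ResamplingArgs` in
+`resampling.py`: `(input_file, x_grid, y_grid, output_directory)`. A minimal
+call sketch, assuming `x_grid` and `y_grid` have already been read from the
+baseline grid file (the paths are illustrative):
+
+```python
+from resampling import resample_hadukgrid
+
+# one task: (input netCDF file, x grid values, y grid values, output folder)
+result: int = resample_hadukgrid(
+    ("Raw/HadsUKgrid/tasmax_example.nc", x_grid, y_grid, "Processed/HadsUKgrid")
+)
+```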
+
+Cropping
+--------
+
+
+Pre-processing
+--------------
+
+- Originally used `debiasing.pre_processing.py`
+
+New approach:
+
+- Refactor `debiasing.debias-wrapper`
+
+Debiasing
+---------
+
+- `python`
+    - Originally used `debiasing.pre_processing.py`
+    - Refactor `debiasing.debias-wrapper`
+- `R`
+"""
+from pathlib import Path
+from typing import Final
+
+import ceda_ftp_download
+import data_loader
+import resampling
+from osgeo.gdal import Warp
+
+REPROJECTION_SHELL_SCRIPT: Final[Path] = Path("../bash/reproject_one.sh")
+REPROJECTION_WRAPPER_SHELL_SCRIPT: Final[Path] = Path("../bash/reproject_all.sh")
diff --git a/python/resampling.py b/python/resampling.py
index 9a455d96..00a9ddde 100644
--- a/python/resampling.py
+++ b/python/resampling.py
@@ -5,10 +5,14 @@
 """
 
 import argparse
-import glob
 import multiprocessing
 import os
+from dataclasses import dataclass
+from glob import glob
+from pathlib import Path
+from typing import Callable, Iterable
 
+import numpy as np
 import pandas as pd
 import xarray as xr  # requires rioxarray extension
 from tqdm import tqdm
@@ -17,13 +21,16 @@
 ChangeDayType = set[tuple[int, int]]
 
 MONTH_DAY_DROP: DropDayType = {(1, 31), (4, 1), (6, 1), (8, 1), (10, 1), (12, 1)}
-"""A `set` of tuples of month and day numbers for `enforce_date_dropping`."""
+"""A `set` of tuples of month and day numbers for `enforce_date_changes`."""
 
 MONTH_DAY_ADD: ChangeDayType = {(1, 31), (4, 1), (6, 1), (8, 1), (10, 1)}
 """CONSIDER ADDING DAYS to 360."""
 
+ResamplingArgs = tuple[os.PathLike, np.ndarray, np.ndarray, os.PathLike]
+ResamplingCallable = Callable[[list | tuple], int]
 
-def enforce_date_dropping(
+
+def enforce_date_changes(
     raw_data: xr.Dataset,
     converted_data: xr.Dataset,
     month_day_drop: DropDayType = MONTH_DAY_DROP,
@@ -54,14 +61,14 @@
     --------
     >>> test_4_days_xarray: xr.DataArray = getfixture(
     ...     'xarray_spatial_4_days')
-    >>> enforce_date_dropping(
+    >>> enforce_date_changes(
     ...     test_4_days_xarray,
     ...     test_4_days_xarray)['time'].coords
     Coordinates:
       * time     (time) object 1980-11-30 1980-12-02 1980-12-03 1980-12-04
     >>> test_4_years_xarray: xr.DataArray = getfixture(
     ...     'xarray_spatial_4_years')
-    >>> ts_4_years: xr.DataArray = enforce_date_dropping(
+    >>> ts_4_years: xr.DataArray = enforce_date_changes(
     ...     test_4_years_xarray, test_4_years_xarray)
     >>> ts_4_years
@@ -104,7 +111,8 @@
     return converted_data
 
 
-def resample_hadukgrid(x: list) -> int:
+def resample_hadukgrid(x: list | tuple) -> int:
     """Resample UKHADs data to match UKCP18 spatially and temporally.
 
     Results are saved to the output directory.
@@ -146,23 +154,29 @@
     data = xr.open_dataset(file, decode_coords="all")
 
-    # convert to 360 day calendar.
-    data_360 = data.convert_calendar(
-        dim="time", calendar="360_day", align_on="year"
-    )
-    # apply correction if leap year
-    if data.time.dt.is_leap_year.any():
-        data_360 = enforce_date_dropping(data, data_360)
 
     # the dataset to be resample must have dimensions named projection_x_coordinate and projection_y_coordinate .
-    resampled = data_360[[variable]].interp(
+    resampled = data[[variable]].interp(
         projection_x_coordinate=x_grid,
         projection_y_coordinate=y_grid,
         method="linear",
     )
 
     # make sure we keep the original CRS
-    resampled.rio.write_crs(data_360.rio.crs, inplace=True)
+    resampled.rio.write_crs(data.rio.crs, inplace=True)
 
     # save resampled file
     resampled.to_netcdf(os.path.join(output_dir, output_name))
@@ -172,6 +186,102 @@
     return 0
 
 
+@dataclass
+class HADsUKResampleManager:
+    input: os.PathLike | None
+    output: os.PathLike
+    grid_data: os.PathLike | None
+    grid: xr.Dataset | None = None
+    input_nc_files: Iterable[os.PathLike] | None = None
+    cpus: int | None = None
+    resampling_func: ResamplingCallable = resample_hadukgrid
+
+    def __len__(self) -> int:
+        """Return the length of `self.input_nc_files`."""
+        return len(self.input_nc_files) if self.input_nc_files else 0
+
+    def set_input_nc_files(self, new_input_path: os.PathLike | None = None) -> None:
+        """Replace `self.input` and process `self.input_nc_files`."""
+        if new_input_path:
+            self.input = new_input_path
+        if not self.input_nc_files:
+            self.input_nc_files = tuple(glob(f"{self.input}/*.nc", recursive=True))
+
+    def set_grid_x_y(self, new_grid_data: os.PathLike | None = None) -> None:
+        """Replace `self.grid_data` and set `self.x` and `self.y` from it."""
+        if new_grid_data:
+            self.grid_data = new_grid_data
+        if not self.grid:
+            self.grid = xr.open_dataset(self.grid_data)
+        try:
+            # must have dimensions named projection_x_coordinate and projection_y_coordinate
+            self.x: np.ndarray = self.grid["projection_x_coordinate"][:].values
+            self.y: np.ndarray = self.grid["projection_y_coordinate"][:].values
+        except Exception as e:
+            print(f"Grid file: {self.grid_data} produced errors: {e}")
+
+    def __post_init__(self) -> None:
+        """Generate related attributes."""
+        self.set_grid_x_y()
+        self.set_input_nc_files()
+        Path(self.output).mkdir(parents=True, exist_ok=True)
+        self.total_cpus: int | None = os.cpu_count()
+        if not self.cpus:
+            self.cpus = 1 if not self.total_cpus else self.total_cpus
+
+    @property
+    def resample_args(self) -> list[ResamplingArgs]:
+        """Return args to pass to `self.resampling_func`."""
+        if not self.input_nc_files:
+            self.set_input_nc_files()
+        if getattr(self, "x", None) is None or getattr(self, "y", None) is None:
+            self.set_grid_x_y()
+        return [(f, self.x, self.y, self.output) for f in self.input_nc_files]
+
+    def resample_multiprocessing(self) -> list[int]:
+        """Run `self.resampling_func` via `multiprocessing`."""
+        with multiprocessing.Pool(processes=self.cpus) as pool:
+            self.results = list(
+                tqdm(
+                    pool.imap_unordered(self.resampling_func, self.resample_args),
+                    total=len(self),
+                )
+            )
+        return self.results
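+
+# A minimal usage sketch of the manager; the paths below are illustrative,
+# not project defaults:
+#
+# >>> manager = HADsUKResampleManager(
+# ...     input="Raw/HadsUKgrid",
+# ...     output="Processed/HadsUKgrid",
+# ...     grid_data="Raw/UKCP2.2/grid_example.nc",
+# ... )
+# >>> results: list[int] = manager.resample_multiprocessing()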
+
+
 if __name__ == "__main__":
     """
     Script to resample UKHADs data from the command line
@@ -200,28 +310,34 @@
         default=".",
         type=str,
     )
     parser_args = parser.parse_args()
 
-    # reading baseline grid to resample files to
-    grid = xr.open_dataset(parser_args.grid_data)
-
-    try:
-        # must have dimensions named projection_x_coordinate and projection_y_coordinate
-        x = grid["projection_x_coordinate"][:].values
-        y = grid["projection_y_coordinate"][:].values
-    except Exception as e:
-        print(f"Grid file: {parser_args.grid_data} produced errors: {e}")
-
-    # If output file do not exist create it
-    if not os.path.exists(parser_args.output):
-        os.makedirs(parser_args.output)
-
-    # find all nc files in input directory
-    files = glob.glob(f"{parser_args.input}/*.nc", recursive=True)
-    N = len(files)
-
-    args = [[f, x, y, parser_args.output] for f in files]
-
-    with multiprocessing.Pool(processes=os.cpu_count() - 1) as pool:
-        res = list(tqdm(pool.imap_unordered(resample_hadukgrid, args), total=N))
+    hads_run_manager = HADsUKResampleManager(
+        input=parser_args.input,
+        grid_data=parser_args.grid_data,
+        output=parser_args.output,
+    )
+    hads_run_manager.resample_multiprocessing()