refactor: re-write resampling.py and add python/pipeline.py
spool committed Feb 29, 2024
1 parent e1a8dce commit f555d55
Showing 2 changed files with 249 additions and 38 deletions.
95 changes: 95 additions & 0 deletions python/pipeline.py
@@ -0,0 +1,95 @@
"""Wrappers to automate the entire pipeline.
Following Andy's very helpful `excel` file, this manages
a reproduction of all steps of the debiasing pipeline.
Download Data
-------------
The `download_ftp` function from `ceda_ftp_download.py` can
be used (with registered account user name and password),
to download two datasets from the Centre for Environmental
Data Analysis (CEDA)
- Saved to `ClimateData/Raw`
- [`HadUK-Grid`](https://catalogue.ceda.ac.uk/uuid/e6822428e4124c5986b689a37fda10bc)
  - a 1 km climate observation grid designed to supersede `UKCP`
- For further details see [Met Office](https://www.metoffice.gov.uk/research/climate/maps-and-data/data/haduk-grid)
- Saved to `Raw/HadsUKgrid/`
- [`UKCP`](https://catalogue.ceda.ac.uk/uuid/f9b6b55dfa174386a05efae2f0f76141) UK Climate Projections at 2.2 km
- a 2.2km projection grid
- Saved to `Raw/UKCP2.2/`
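
A hypothetical call (the actual `download_ftp` signature is defined in
`ceda_ftp_download.py`; the parameter names below are illustrative
assumptions, not a confirmed API):

```python
from ceda_ftp_download import download_ftp

# Hypothetical sketch: parameter names are illustrative; credentials
# are for a registered CEDA account.
download_ftp(
    username="my-ceda-user",
    password="my-ceda-password",
)
```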
Reproject UKCP
--------------
The `bash/reproject_one.sh` script copies and reprojects `UKCP2.2` via
`gdalwarp` into `Raw/Reprojected_infill`:
```bash
gdalwarp -t_srs 'EPSG:27700' -tr 2200 2200 -r near -overwrite $f "${fn%.nc}.tif" # Reproject the file
```
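
A Python equivalent is possible via `osgeo.gdal.Warp` (imported by this
module); a minimal sketch with illustrative file names:

```python
from osgeo.gdal import Warp

# Sketch mirroring the shell call above; file names are illustrative.
Warp(
    "output.tif",          # destination
    "input.nc",            # source
    dstSRS="EPSG:27700",   # British National Grid
    xRes=2200,
    yRes=2200,             # 2.2 km resolution
    resampleAlg="near",    # nearest-neighbour resampling
)
```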
*New step*: convert UKCP to 360/365-day calendars.
Relevant `xarray` utilities (a conversion sketch follows this list):
- [`convert_calendar`](https://docs.xarray.dev/en/stable/generated/xarray.Dataset.convert_calendar.html)
- [`interp_calendar`](https://docs.xarray.dev/en/stable/generated/xarray.Dataset.interp_calendar.html)
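
A minimal conversion sketch, assuming `ds` is an opened UKCP
`xarray.Dataset` with a `time` dimension (path illustrative):

```python
import xarray as xr

ds = xr.open_dataset("ukcp_example.nc")  # illustrative path

# Convert to a 360-day calendar, aligning dates on the year.
ds_360 = ds.convert_calendar("360_day", align_on="year")

# Or interpolate onto another dataset's 365/366-day time coordinate:
# ds_365 = ds.interp_calendar(target_dataset.time)
```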
Resample HadUK-Grid
-------------------
Previous approach:
- `resampling_hads.py`
New approach:
- `resampling.py`
- check `x_grid` and `y_grid` interpolation (grid loading is sketched
  after the code block below)
```python
# The dataset to be resampled must have dimensions named
# projection_x_coordinate and projection_y_coordinate.
resampled = data_360[[variable]].interp(
projection_x_coordinate=x_grid,
projection_y_coordinate=y_grid,
method="linear",
)
```
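
The `x_grid` and `y_grid` arrays are read from the baseline grid file;
a minimal sketch (path illustrative):

```python
import xarray as xr

grid = xr.open_dataset("Raw/UKCP2.2/grid_example.nc")  # illustrative path
x_grid = grid["projection_x_coordinate"][:].values
y_grid = grid["projection_y_coordinate"][:].values
```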
- [ ] Refactor to facilitate testing
- [ ] Ensure HADs still works
- [ ] Add function for UKCP
Cropping
--------
Pre-processing
--------------
- Originally used `debiasing.pre_processing.py`
New approach:
- Refactor `debiasing.debias-wrapper`
Debiasing
---------
- `python`
- Originally used `debiasing.pre_processing.py`
- Refactor `debiasing.debias-wrapper`
- `R`
"""
from pathlib import Path
from typing import Final

import ceda_ftp_download
import data_loader
import resampling
from osgeo.gdal import Warp

REPROJECTION_SHELL_SCRIPT: Final[Path] = Path("../bash/reproject_one.sh")
REPROJECTION_WRAPPER_SHELL_SCRIPT: Final[Path] = Path("../bash/reproject_all.sh")
192 changes: 154 additions & 38 deletions python/resampling.py
@@ -5,10 +5,14 @@
"""

import argparse
import glob
import multiprocessing
import os
from dataclasses import dataclass
from glob import glob
from pathlib import Path
from typing import Callable, Iterable

import numpy as np
import pandas as pd
import xarray as xr # requires rioxarray extension
from tqdm import tqdm
@@ -17,13 +21,16 @@
ChangeDayType = set[tuple[int, int]]

MONTH_DAY_DROP: DropDayType = {(1, 31), (4, 1), (6, 1), (8, 1), (10, 1), (12, 1)}
"""A `set` of tuples of month and day numbers for `enforce_date_dropping`."""
"""A `set` of tuples of month and day numbers for `enforce_date_changes`."""

MONTH_DAY_ADD: ChangeDayType = {(1, 31), (4, 1), (6, 1), (8, 1), (10, 1)}
"""CONSIDER ADDING DAYS to 360."""

ResamplingArgs = tuple[os.PathLike, np.ndarray, np.ndarray, os.PathLike]
ResamplingCallable = Callable[[list | tuple], int]

def enforce_date_dropping(

def enforce_date_changes(
raw_data: xr.Dataset,
converted_data: xr.Dataset,
month_day_drop: DropDayType = MONTH_DAY_DROP,
@@ -54,14 +61,14 @@ def enforce_date_dropping(
--------
>>> test_4_days_xarray: xr.DataArray = getfixture(
... 'xarray_spatial_4_days')
>>> enforce_date_dropping(
>>> enforce_date_changes(
... test_4_days_xarray,
... test_4_days_xarray)['time'].coords
Coordinates:
* time (time) object 1980-11-30 1980-12-02 1980-12-03 1980-12-04
>>> test_4_years_xarray: xr.DataArray = getfixture(
... 'xarray_spatial_4_years')
>>> ts_4_years: xr.DataArray = enforce_date_dropping(
>>> ts_4_years: xr.DataArray = enforce_date_changes(
... test_4_years_xarray, test_4_years_xarray)
>>> ts_4_years
<xarray.DataArray (time: 1437, space: 3)>
@@ -104,7 +111,8 @@ def enforce_date_dropping(
return converted_data


def resample_hadukgrid(x: list) -> int:
# def resample_hadukgrid(x: list) -> int:
def resample_hadukgrid(x: list | tuple) -> int:
"""Resample UKHADs data to match UKCP18 spatially and temporally.
Results are saved to the output directory.
@@ -146,23 +154,29 @@ def resample_hadukgrid(x: list) -> int:

data = xr.open_dataset(file, decode_coords="all")

# convert to 360 day calendar.
data_360 = data.convert_calendar(
dim="time", calendar="360_day", align_on="year"
)
# apply correction if leap year
if data.time.dt.is_leap_year.any():
data_360 = enforce_date_dropping(data, data_360)
# # convert to 360 day calendar.
# data_360 = data.convert_calendar(
# dim="time", calendar="360_day", align_on="year"
# )
# # apply correction if leap year
# if data.time.dt.is_leap_year.any():
# data_360 = enforce_date_changes(data, data_360)

    # the dataset to be resampled must have dimensions named projection_x_coordinate and projection_y_coordinate
resampled = data_360[[variable]].interp(
# resampled = data_360[[variable]].interp(
# projection_x_coordinate=x_grid,
# projection_y_coordinate=y_grid,
# method="linear",
# )
resampled = data[[variable]].interp(
projection_x_coordinate=x_grid,
projection_y_coordinate=y_grid,
method="linear",
)

# make sure we keep the original CRS
resampled.rio.write_crs(data_360.rio.crs, inplace=True)
# resampled.rio.write_crs(data_360.rio.crs, inplace=True)
resampled.rio.write_crs(data.rio.crs, inplace=True)

# save resampled file
resampled.to_netcdf(os.path.join(output_dir, output_name))
@@ -172,6 +186,102 @@
return 0
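
# Example (sketch): `resample_hadukgrid` unpacks its single argument as
# (input file path, x grid, y grid, output directory), matching the
# `ResamplingArgs` alias above; paths below are illustrative:
#
#     resample_hadukgrid(
#         (Path("Raw/HadsUKgrid/tasmax_example.nc"), x_grid, y_grid, Path("out"))
#     )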


@dataclass
class HADsUKResampleManager:
input: os.PathLike | None
output: os.PathLike
grid_data: os.PathLike | None
grid: xr.Dataset | None = None
input_nc_files: Iterable[os.PathLike] | None = None
cpus: int | None = None
resampling_func: ResamplingCallable = resample_hadukgrid

def __len__(self) -> int:
"""Return the length of `self.input_nc_files`."""
return len(self.input_nc_files) if self.input_nc_files else 0

def set_input_nc_files(self, new_input_path: os.PathLike | None = None) -> None:
"""Replace `self.input` and process `self.input_nc_files`."""
if new_input_path:
self.input = new_input_path
if not self.input_nc_files:
            self.input_nc_files = tuple(
                glob(f"{self.input}/*.nc", recursive=True)
            )

    def set_grid_x_y(self, new_grid_data: os.PathLike | None = None) -> None:
        """Replace `self.grid_data` and set `self.grid`, `self.x` and `self.y`."""
        if new_grid_data:
            self.grid_data = new_grid_data
        if not self.grid:
            self.grid = xr.open_dataset(self.grid_data)
        try:
            # must have dimensions named projection_x_coordinate and projection_y_coordinate
            self.x: np.ndarray = self.grid["projection_x_coordinate"][:].values
            self.y: np.ndarray = self.grid["projection_y_coordinate"][:].values
        except Exception as e:
            print(f"Grid file: {self.grid_data} produced errors: {e}")

def __post_init__(self) -> None:
"""Generate related attributes."""
self.set_grid_x_y()
self.set_input_nc_files()
Path(self.output).mkdir(parents=True, exist_ok=True)
self.total_cpus: int | None = os.cpu_count()
if not self.cpus:
self.cpus = 1 if not self.total_cpus else self.total_cpus

@property
def resample_args(self) -> list[ResamplingArgs]:
"""Return args to pass to `self.resample`."""
if not self.input_nc_files:
self.set_input_nc_files()
        if getattr(self, "x", None) is None or getattr(self, "y", None) is None:
            self.set_grid_x_y()
        return [(f, self.x, self.y, self.output) for f in self.input_nc_files]

def resample_multiprocessing(self) -> list[int]:
"""Run `self.resampling_func` via `multiprocessing`."""

with multiprocessing.Pool(processes=self.cpus) as pool:
self.results = list(
tqdm(
pool.imap_unordered(self.resampling_func, self.resample_args),
total=len(self),
)
)
return self.results

# if not os.path.exists(parser_args.output):
# os.makedirs(parser_args.output)

# def process_grid_data(self) -> None:
# """Process `grid_data` attribute."""
# self.grid = xr.open_dataset(self.grid_data)

#
# grid = xr.open_dataset(parser_args.grid_data)
#
# try:
# # must have dimensions named projection_x_coordinate and projection_y_coordinate
# x = grid["projection_x_coordinate"][:].values
# y = grid["projection_y_coordinate"][:].values
# except Exception as e:
# print(f"Grid file: {parser_args.grid_data} produced errors: {e}")
#
# # If output file do not exist create it
# if not os.path.exists(parser_args.output):
# os.makedirs(parser_args.output)
#
# # find all nc files in input directory
# files = glob.glob(f"{parser_args.input}/*.nc", recursive=True)
# N = len(files)
#
# args = [[f, x, y, parser_args.output] for f in files]
#
# with multiprocessing.Pool(processes=os.cpu_count() - 1) as pool:
# res = list(tqdm(pool.imap_unordered(resample_hadukgrid, args), total=N))


if __name__ == "__main__":
"""
Script to resample UKHADs data from the command line
@@ -200,28 +310,34 @@ def resample_hadukgrid(x: list) -> int:
default=".",
type=str,
)

    parser_args = parser.parse_args()
    hads_run_manager = HADsUKResampleManager(
        input=parser_args.input,
        grid_data=parser_args.grid_data,
        output=parser_args.output,
    )
    # run the resampling over all `.nc` files found in the input path
    res = hads_run_manager.resample_multiprocessing()

# reading baseline grid to resample files to
grid = xr.open_dataset(parser_args.grid_data)

try:
# must have dimensions named projection_x_coordinate and projection_y_coordinate
x = grid["projection_x_coordinate"][:].values
y = grid["projection_y_coordinate"][:].values
except Exception as e:
print(f"Grid file: {parser_args.grid_data} produced errors: {e}")

# If output file do not exist create it
if not os.path.exists(parser_args.output):
os.makedirs(parser_args.output)

# find all nc files in input directory
files = glob.glob(f"{parser_args.input}/*.nc", recursive=True)
N = len(files)

args = [[f, x, y, parser_args.output] for f in files]

with multiprocessing.Pool(processes=os.cpu_count() - 1) as pool:
res = list(tqdm(pool.imap_unordered(resample_hadukgrid, args), total=N))
# parser_args = parser.parse_args()
#
# # reading baseline grid to resample files to
# grid = xr.open_dataset(parser_args.grid_data)
#
# try:
# # must have dimensions named projection_x_coordinate and projection_y_coordinate
# x = grid["projection_x_coordinate"][:].values
# y = grid["projection_y_coordinate"][:].values
# except Exception as e:
# print(f"Grid file: {parser_args.grid_data} produced errors: {e}")
#
# # If output file do not exist create it
# if not os.path.exists(parser_args.output):
# os.makedirs(parser_args.output)
#
# # find all nc files in input directory
# files = glob.glob(f"{parser_args.input}/*.nc", recursive=True)
# N = len(files)
#
# args = [[f, x, y, parser_args.output] for f in files]
#
# with multiprocessing.Pool(processes=os.cpu_count() - 1) as pool:
# res = list(tqdm(pool.imap_unordered(resample_hadukgrid, args), total=N))
