From 61810fa7840d71c978b8ae70cfd567ea78e59fa2 Mon Sep 17 00:00:00 2001 From: StuberSimon Date: Mon, 6 Apr 2026 11:06:43 +0200 Subject: [PATCH 01/10] add GLOFAS dataset module --- atlite/datasets/__init__.py | 4 +- atlite/datasets/glofas.py | 444 ++++++++++++++++++++++++++++++++++++ 2 files changed, 446 insertions(+), 2 deletions(-) create mode 100644 atlite/datasets/glofas.py diff --git a/atlite/datasets/__init__.py b/atlite/datasets/__init__.py index 045c59d8..52784f18 100644 --- a/atlite/datasets/__init__.py +++ b/atlite/datasets/__init__.py @@ -6,6 +6,6 @@ atlite datasets. """ -from atlite.datasets import era5, gebco, sarah +from atlite.datasets import era5, gebco, glofas, sarah -modules = {"era5": era5, "sarah": sarah, "gebco": gebco} +modules = {"era5": era5, "sarah": sarah, "gebco": gebco, "glofas": glofas} diff --git a/atlite/datasets/glofas.py b/atlite/datasets/glofas.py new file mode 100644 index 00000000..f0fd230c --- /dev/null +++ b/atlite/datasets/glofas.py @@ -0,0 +1,444 @@ +# SPDX-FileCopyrightText: Contributors to atlite +# +# SPDX-License-Identifier: MIT +""" +Module for downloading and curating data from ECMWFs GLOFAS dataset (via CDS). + +For further reference see +https://ewds.climate.copernicus.eu/datasets/cems-glofas-historical?tab=overview +""" + +import logging +import os +import weakref +import zipfile +from pathlib import Path +from tempfile import mkstemp + +import cdsapi +import numpy as np +import xarray as xr +from dask import compute, delayed +from dask.utils import SerializableLock +from numpy import atleast_1d + +from atlite.gis import maybe_swap_spatial_dims + +# Null context for running a with statements wihout any context +try: + from contextlib import nullcontext +except ImportError: + # for Python verions < 3.7: + import contextlib + + @contextlib.contextmanager + def nullcontext(): + yield + + +logger = logging.getLogger(__name__) + +# Model and CRS Settings +crs = 4326 + +dataset = "cems-glofas-historical" + +features = {"discharge": ["discharge"]} + + +def noisy_unlink(path): + """ + Delete file at given path. + """ + logger.debug(f"Deleting file {path}") + try: + os.unlink(path) + except PermissionError: + logger.error(f"Unable to delete file {path}, as it is still in use.") + + +def _area(coords): + # North, West, South, East. Default: global + x0, x1 = coords["x"].min().item(), coords["x"].max().item() + y0, y1 = coords["y"].min().item(), coords["y"].max().item() + return [y1, x0, y0, x1] + + +def sanitize_chunks(chunks, **dim_mapping): + dim_mapping = dict(time="valid_time", x="longitude", y="latitude") | dim_mapping + if not isinstance(chunks, dict): + # preserve "auto" or None + return chunks + + return { + extname: chunks[intname] + for intname, extname in dim_mapping.items() + if intname in chunks + } + + +def add_finalizer(ds: xr.Dataset, target: str | Path): + logger.debug(f"Adding finalizer for {target}") + weakref.finalize(ds._close.__self__.ds, noisy_unlink, target) + + +def _rename_and_clean_coords(ds, add_lon_lat=True): + """ + Rename 'longitude' and 'latitude' columns to 'x' and 'y' and fix roundings. + + Optionally (add_lon_lat, default:True) preserves latitude and + longitude columns as 'lat' and 'lon'. + """ + ds = ds.rename( + { + "longitude": "x", + "latitude": "y", + "valid_time": "time", + "dis24": "discharge", + } + ) + # round coords since cds coords are float32 which would lead to mismatches + ds = ds.assign_coords( + x=np.round(ds.x.astype(float), 5), y=np.round(ds.y.astype(float), 5) + ) + ds = maybe_swap_spatial_dims(ds) + if add_lon_lat: + ds = ds.assign_coords(lon=ds.coords["x"], lat=ds.coords["y"]) + ds = ds.drop_vars(["expver", "number"], errors="ignore") + + return ds + + +def open_with_grib_conventions( + grib_file: str | Path, chunks=None, tmpdir: str | Path | None = None +) -> xr.Dataset: + """ + Convert grib file of Glofas data from the CDS to netcdf file. + + The function does the same thing as the CDS backend does, but locally. + This is needed, as the grib file is the recommended download file type for CDS, with conversion to netcdf locally. + The routine is a reduced version based on the documentation here: + https://confluence.ecmwf.int/display/CKB/GRIB+to+netCDF+conversion+on+new+CDS+and+ADS+systems#GRIBtonetCDFconversiononnewCDSandADSsystems-jupiternotebook + + Parameters + ---------- + grib_file : str | Path + Path to the grib file to be converted. + chunks + Chunks + tmpdir : Path, optional + If None adds a finalizer to the dataset object + + Returns + ------- + xr.Dataset + """ + # + # Open grib file as dataset + # Options to open different datasets into a datasets of consistent hypercubes which are compatible netCDF + # There are options that might be relevant for e.g. for wave model data, that have been removed here + # to keep the code cleaner and shorter + ds = xr.open_dataset( + grib_file, + engine="cfgrib", + time_dims=["valid_time"], + ignore_keys=["edition"], + coords_as_attributes=[ + "surface", + "depthBelowLandLayer", + "entireAtmosphere", + "heightAboveGround", + "meanSea", + ], + chunks=sanitize_chunks(chunks), + ) + if tmpdir is None: + add_finalizer(ds, grib_file) + + def safely_expand_dims(dataset: xr.Dataset, expand_dims: list[str]) -> xr.Dataset: + """ + Expand dimensions in an xarray dataset, ensuring that the new dimensions are not already in the dataset + and that the order of dimensions is preserved. + """ + dims_required = [ + c for c in dataset.coords if c in expand_dims + list(dataset.dims) + ] + dims_missing = [ + (c, i) for i, c in enumerate(dims_required) if c not in dataset.dims + ] + dataset = dataset.expand_dims( + dim=[x[0] for x in dims_missing], axis=[x[1] for x in dims_missing] + ) + return dataset + + logger.debug("Converting grib file to netcdf format") + # Variables and dimensions to rename if they exist in the dataset + rename_vars = { + "time": "forecast_reference_time", + "step": "forecast_period", + "isobaricInhPa": "pressure_level", + "hybrid": "model_level", + } + rename_vars = {k: v for k, v in rename_vars.items() if k in ds} + ds = ds.rename(rename_vars) + + # safely expand dimensions in an xarray dataset to ensure that data for the new dimensions are in the dataset + ds = safely_expand_dims(ds, ["valid_time", "pressure_level", "model_level"]) + + return ds + + +def retrieve_data( + product: str, + chunks: dict[str, int] | None = None, + tmpdir: str | Path | None = None, + lock: SerializableLock | None = None, + **updates, +) -> xr.Dataset: + """ + Download data like Glofas from the Climate Data Store (CDS). + + If you want to track the state of your request go to + https://cds-beta.climate.copernicus.eu/requests?tab=all + + Parameters + ---------- + product : str + Product name, e.g. 'cems-glofas-historical'. + chunks : dict, optional + Chunking for xarray dataset, e.g. {'time': 1, 'x': 100, 'y': 100}. + Default is None. + tmpdir : str, optional + Directory where the downloaded data is temporarily stored. + Default is None, which uses the system's temporary directory. + lock : dask.utils.SerializableLock, optional + Lock for thread-safe file writing. Default is None. + updates : dict + Additional parameters for the request. + Must include 'year', 'month', and 'variable'. + Can include e.g. 'data_format'. + + Returns + ------- + xarray.Dataset + Dataset with the retrieved variables. + + Examples + -------- + >>> ds = retrieve_data( + ... product='cems-glofas-historical', + ... chunks={'time': 1, 'x': 100, 'y': 100}, + ... tmpdir='/tmp', + ... lock=None, + ... year='2020', + ... month='01', + ... variable=['river_discharge_in_the_last_24_hours'], + ... data_format='grib' + ... ) + """ + request = { + "system_version": ["version_4_0"], + "hydrological_model": ["lisflood"], + "product_type": ["consolidated"], + "variable": ["river_discharge_in_the_last_24_hours"], + "data_format": "grib", + "download_format": "zip", + } + request.update(updates) + + assert {"hyear", "hmonth", "variable"}.issubset(request), ( + "Need to specify at least 'variable', 'hyear' and 'hmonth'" + ) + + logger.debug(f"Requesting {product} with API request: {request}") + # Url needs to be set manually here, overrides url from .cdsapirc (for use with multiple modules) + client = cdsapi.Client( + info_callback=logger.debug, + debug=logging.DEBUG >= logging.root.level, + url="https://ewds.climate.copernicus.eu/api", + ) + result = client.retrieve(product, request) + + if lock is None: + lock = nullcontext() + + suffix = f".{request['data_format']}" # .netcdf or .grib + with lock: + fd, target = mkstemp(suffix=suffix, dir=tmpdir) + os.close(fd) + + # Inform user about data being downloaded as "* variable (year-month)" + timestr = f"{request['hyear']}-{request['hmonth']}" + variables = atleast_1d(request["variable"]) + varstr = "\n\t".join([f"{v} ({timestr})" for v in variables]) + logger.info(f"CDS: Downloading variables\n\t{varstr}\n") + result.download(target) + + # Extract data if downloaded as zip file + if request.get("download_format") == "zip": + with zipfile.ZipFile(target, "r") as zip_ref: + zip_ref.extractall(Path(tmpdir) / Path(target).stem) + os.unlink(target) # delete the zip file after extraction + target = Path(tmpdir) / Path(Path(target).stem) / Path("data" + suffix) + + # Convert from grib to netcdf locally, same conversion as in CDS backend + if request["data_format"] == "grib": + ds = open_with_grib_conventions(target, chunks=chunks, tmpdir=tmpdir) + else: + ds = xr.open_dataset(target, chunks=sanitize_chunks(chunks)) + if tmpdir is None: + add_finalizer(target) + return ds + + +def retrieval_times(coords, static=False, monthly_requests=False): + """ + Get list of retrieval cdsapi arguments for time dimension in coordinates. + + If static is False, this function creates a query for each month and year + in the time axis in coords. This ensures not running into size query limits + of the cdsapi even with very (spatially) large cutouts. + If static is True, the function return only one set of parameters + for the very first time point. + + Parameters + ---------- + coords : atlite.Cutout.coords + static : bool, optional + monthly_requests : bool, optional + If True, the data is requested on a monthly basis. This is useful for + large cutouts, where the data is requested in smaller chunks. The + default is False + + Returns + ------- + list of dicts witht retrieval arguments + + """ + time = coords["time"].to_index() + if static: + # Make sure that day and month are in two digit format + day = str(time[0].day) if time[0].day >= 10 else f"0{time[0].day}" + month = str(time[0].month) if time[0].month >= 10 else f"0{time[0].month}" + return { + "hyear": str(time[0].year), + "hmonth": month, + "hday": day, + "time": time[0].strftime("%H:00"), + } + + # Prepare request for all months and years + times = [] + for year in time.year.unique(): + t = time[time.year == year] + if monthly_requests: + for month in t.month.unique(): + # Make sure that day and month are in two digit format + days = list(t[t.month == month].day.unique()) + days_str = [str(d) if d >= 10 else f"0{d}" for d in days] + query = { + "hyear": str(year), + "hmonth": str(month) if month >= 10 else f"0{month}", + "hday": days_str, + } + times.append(query) + else: + # Make sure that day and month are in two digit format + days = list(t.day.unique()) + days_str = [str(d) if d >= 10 else f"0{d}" for d in days] + months = list(t.month.unique()) + months_str = [str(m) if m >= 10 else f"0{m}" for m in months] + query = {"hyear": str(year), "hmonth": months_str, "hday": days_str} + times.append(query) + return times + + +def get_data( + cutout, + feature, + tmpdir="tmp", + lock=None, + data_format="grib", + monthly_requests=False, + concurrent_requests=False, + **creation_parameters, +): + """ + Retrieve data from ECMWFs GLOFAS dataset (via CDS). + + This front-end function downloads data for a specific feature and formats + it to match the given Cutout. + + Parameters + ---------- + cutout : atlite.Cutout + feature : str + Name of the feature data to retrieve. Must be in + `atlite.datasets.glofas.features` + tmpdir : str/Path + Directory where the temporary netcdf files are stored. + data_format : str, optional + The format of the data to be downloaded. Can be either 'grib' or 'netcdf', + 'grib' highly recommended because CDSAPI limits request size for netcdf. + concurrent_requests : bool, optional + If True, the monthly data requests are posted concurrently. + Only has an effect if `monthly_requests` is True. + **creation_parameters : + Additional keyword arguments: + - 'sanitize' (default True): sets sanitization of the data on or off. + - 'time_fill_method' (default "nearest"): strategy used to align the + returned dataset to cutout time coordinates. Supported values are + xarray reindex methods (e.g. "nearest", "pad", "backfill") and + "interpolate" (or "interp") for time interpolation. + - 'time_interp_method' (default "linear"): interpolation method passed + to xarray when 'time_fill_method' is "interpolate". + + Returns + ------- + xarray.Dataset + Dataset of dask arrays of the retrieved variables. + + """ + coords = cutout.coords + + sanitize = creation_parameters.get("sanitize", True) + + retrieval_params = { + "product": "cems-glofas-historical", + "area": _area(coords), + "chunks": cutout.chunks, + "tmpdir": tmpdir, + "lock": lock, + "data_format": data_format, + } + + def retrieve_once(time): + ds = retrieve_data( + variable=["river_discharge_in_the_last_24_hours"], + **retrieval_params, + **time, + ) + ds = _rename_and_clean_coords(ds) + if sanitize: + ds["discharge"] = ds["discharge"].clip(min=0.0).fillna(0.0) + return ds + + time_chunks = retrieval_times(coords, monthly_requests=monthly_requests) + if concurrent_requests: + delayed_datasets = [delayed(retrieve_once)(chunk) for chunk in time_chunks] + datasets = list(compute(*delayed_datasets)) + else: + datasets = list(map(retrieve_once, time_chunks)) + + ds = xr.concat(datasets, dim="time").sortby("time") + fill_method = creation_parameters.get("time_fill_method", "interpolate") + if fill_method is None: + return ds.reindex(time=coords["time"]) + if isinstance(fill_method, str) and fill_method.lower() in { + "interpolate", + "interp", + }: + interp_method = creation_parameters.get("time_interp_method", "linear") + interpolated = ds.interp(time=coords["time"], method=interp_method) + return interpolated.bfill("time").ffill("time") + return ds.reindex(time=coords["time"], method=fill_method) From 5f3cfecb6b2f9bcb932ea62014ca6f402966b470 Mon Sep 17 00:00:00 2001 From: StuberSimon Date: Wed, 8 Apr 2026 15:25:36 +0200 Subject: [PATCH 02/10] split hydro functionality into runoff and discharge based --- .gitignore | 6 +++ atlite/convert.py | 109 +++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 113 insertions(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index 1e8e66cc..42d81e53 100644 --- a/.gitignore +++ b/.gitignore @@ -27,3 +27,9 @@ paper # Ignore IDE project files .idea/ .vscode +AGENTS.md +uv.lock + +tmp +test_ba +data diff --git a/atlite/convert.py b/atlite/convert.py index 96a2974e..92a8d652 100644 --- a/atlite/convert.py +++ b/atlite/convert.py @@ -979,8 +979,7 @@ def runoff( return result - -def hydro( +def _hydro_from_runoff( cutout, plants, hydrobasins, @@ -1044,6 +1043,112 @@ def hydro( basins, runoff, flowspeed, show_progress ) +def _hydro_from_discharge( + cutout, + plants, +): + """ + Get inflow time-series for `plants` by extracting the discharge time series for + the nearest grid points. + + Parameters + ---------- + plants : pd.DataFrame + Run-of-river plants or dams with lon, lat columns. + """ + print("<"*100) + print("Extracting discharge time series for nearest grid points to plants...") + print("<"*100) + discharge = cutout.data.discharge + inflow = xr.DataArray( + np.zeros((len(plants), discharge.indexes["time"].size)), + [("plant", plants.index), discharge.coords["time"]], + ) + for plant in plants.itertuples(): + # Extract the discharge time series for the nearest point + inflow.loc[dict(plant=plant.Index)] = discharge.sel( + x=plant.lon, y=plant.lat, method="nearest" + ) + return inflow + + +def hydro( + cutout, + plants, + module="auto", + hydrobasins=None, + flowspeed=1, + weight_with_height=False, + show_progress=False, + **kwargs, +): + """ + Get inflow time-series for `plants` by either extracting the discharge time series for + the nearest grid points or by computing runoff-based inflow time series + + Parameters + ---------- + plants : pd.DataFrame + Run-of-river plants or dams with lon, lat columns. + module : str + The method to compute hydro time series. "auto" will prefere discharge but fall back to runoff-based computation + "glofas" will use discharge directly, "era5" will use runoff-based computation + hydrobasins : str|gpd.GeoDataFrame + Filename or GeoDataFrame of one level of the HydroBASINS dataset. Only required + for runoff-based computation. + flowspeed : float + Average speed of water flows to estimate the water travel time from + basin to plant (default: 1 m/s). Only relevant for runoff-based computation. + weight_with_height : bool + Whether surface runoff should be weighted by potential height (probably + better for coarser resolution). Only relevant for runoff-based computation. + show_progress : bool + Whether to display progressbars. Only relevant for runoff-based computation. + **kwargs + Additional arguments for runoff-based computation. Only relevant for runoff-based computation. + """ + if module == "auto": + # Check if discharge data is available in cutout, otherwise use runoff + if "discharge" in cutout.available_features.values: + return _hydro_from_discharge( + cutout, + plants, + ) + if hydrobasins is None or "runoff" not in cutout.available_features.values: + raise ValueError("For runoff-based hydro time series, hydrobasins and runoff data must be provided.") + return _hydro_from_runoff( + cutout, + plants, + hydrobasins, + flowspeed=flowspeed, + weight_with_height=weight_with_height, + show_progress=show_progress, + **kwargs, + ) + elif module.lower() == "glofas": + # Check if discharge data is available in cutout, otherwise raise error + if "discharge" not in cutout.data_vars: + raise ValueError("For GloFAS-based hydro time series, the cutout must include discharge data.") + return _hydro_from_discharge( + cutout, + plants, + ) + elif module.lower() == "era5": + # Check if hydrobasins is provided, otherwise raise error + if hydrobasins is None: + raise ValueError("For ERA5-based hydro time series, the hydrobasins dataset must be provided.") + return _hydro_from_runoff( + cutout, + plants, + hydrobasins, + flowspeed=flowspeed, + weight_with_height=weight_with_height, + show_progress=show_progress, + **kwargs, + ) + else: + raise ValueError(f'Unknown hydro module option "{module}".') + def convert_line_rating( ds, psi, R, D=0.028, Ts=373, epsilon=0.6, alpha=0.6, per_unit=False From 33ef0bfdd08a77ef0f101f40c5934d3c7081247a Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 8 Apr 2026 13:40:51 +0000 Subject: [PATCH 03/10] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- atlite/convert.py | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/atlite/convert.py b/atlite/convert.py index 92a8d652..c2e2bc32 100644 --- a/atlite/convert.py +++ b/atlite/convert.py @@ -979,6 +979,7 @@ def runoff( return result + def _hydro_from_runoff( cutout, plants, @@ -1043,6 +1044,7 @@ def _hydro_from_runoff( basins, runoff, flowspeed, show_progress ) + def _hydro_from_discharge( cutout, plants, @@ -1056,9 +1058,9 @@ def _hydro_from_discharge( plants : pd.DataFrame Run-of-river plants or dams with lon, lat columns. """ - print("<"*100) + print("<" * 100) print("Extracting discharge time series for nearest grid points to plants...") - print("<"*100) + print("<" * 100) discharge = cutout.data.discharge inflow = xr.DataArray( np.zeros((len(plants), discharge.indexes["time"].size)), @@ -1070,7 +1072,7 @@ def _hydro_from_discharge( x=plant.lon, y=plant.lat, method="nearest" ) return inflow - + def hydro( cutout, @@ -1115,7 +1117,9 @@ def hydro( plants, ) if hydrobasins is None or "runoff" not in cutout.available_features.values: - raise ValueError("For runoff-based hydro time series, hydrobasins and runoff data must be provided.") + raise ValueError( + "For runoff-based hydro time series, hydrobasins and runoff data must be provided." + ) return _hydro_from_runoff( cutout, plants, @@ -1128,7 +1132,9 @@ def hydro( elif module.lower() == "glofas": # Check if discharge data is available in cutout, otherwise raise error if "discharge" not in cutout.data_vars: - raise ValueError("For GloFAS-based hydro time series, the cutout must include discharge data.") + raise ValueError( + "For GloFAS-based hydro time series, the cutout must include discharge data." + ) return _hydro_from_discharge( cutout, plants, @@ -1136,7 +1142,9 @@ def hydro( elif module.lower() == "era5": # Check if hydrobasins is provided, otherwise raise error if hydrobasins is None: - raise ValueError("For ERA5-based hydro time series, the hydrobasins dataset must be provided.") + raise ValueError( + "For ERA5-based hydro time series, the hydrobasins dataset must be provided." + ) return _hydro_from_runoff( cutout, plants, From 8d7b645e7fadeb783f4a8a4dac697e2958eb9f21 Mon Sep 17 00:00:00 2001 From: StuberSimon Date: Tue, 9 Jun 2026 09:25:08 +0200 Subject: [PATCH 04/10] remove debug prints from _hydro_from_discharge --- atlite/convert.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/atlite/convert.py b/atlite/convert.py index c2e2bc32..d73804ce 100644 --- a/atlite/convert.py +++ b/atlite/convert.py @@ -1058,9 +1058,6 @@ def _hydro_from_discharge( plants : pd.DataFrame Run-of-river plants or dams with lon, lat columns. """ - print("<" * 100) - print("Extracting discharge time series for nearest grid points to plants...") - print("<" * 100) discharge = cutout.data.discharge inflow = xr.DataArray( np.zeros((len(plants), discharge.indexes["time"].size)), From 000d840df3b7d5910f7761b104ad4f2cebccef94 Mon Sep 17 00:00:00 2001 From: StuberSimon Date: Tue, 9 Jun 2026 13:25:04 +0200 Subject: [PATCH 05/10] build glofas retrieval_times dates with strftime --- atlite/datasets/glofas.py | 28 ++++++++++------------------ 1 file changed, 10 insertions(+), 18 deletions(-) diff --git a/atlite/datasets/glofas.py b/atlite/datasets/glofas.py index f0fd230c..58e1ef81 100644 --- a/atlite/datasets/glofas.py +++ b/atlite/datasets/glofas.py @@ -317,14 +317,10 @@ def retrieval_times(coords, static=False, monthly_requests=False): """ time = coords["time"].to_index() if static: - # Make sure that day and month are in two digit format - day = str(time[0].day) if time[0].day >= 10 else f"0{time[0].day}" - month = str(time[0].month) if time[0].month >= 10 else f"0{time[0].month}" return { - "hyear": str(time[0].year), - "hmonth": month, - "hday": day, - "time": time[0].strftime("%H:00"), + "hyear": time[0].strftime("%Y"), + "hmonth": time[0].strftime("%m"), + "hday": time[0].strftime("%d"), } # Prepare request for all months and years @@ -333,22 +329,18 @@ def retrieval_times(coords, static=False, monthly_requests=False): t = time[time.year == year] if monthly_requests: for month in t.month.unique(): - # Make sure that day and month are in two digit format - days = list(t[t.month == month].day.unique()) - days_str = [str(d) if d >= 10 else f"0{d}" for d in days] query = { "hyear": str(year), - "hmonth": str(month) if month >= 10 else f"0{month}", - "hday": days_str, + "hmonth": list(t[t.month == month].strftime("%m").unique()), + "hday": list(t[t.month == month].strftime("%d").unique()), } times.append(query) else: - # Make sure that day and month are in two digit format - days = list(t.day.unique()) - days_str = [str(d) if d >= 10 else f"0{d}" for d in days] - months = list(t.month.unique()) - months_str = [str(m) if m >= 10 else f"0{m}" for m in months] - query = {"hyear": str(year), "hmonth": months_str, "hday": days_str} + query = { + "hyear": str(year), + "hmonth": list(t.strftime("%m").unique()), + "hday": list(t.strftime("%d").unique()), + } times.append(query) return times From 3fee4dff98650ef4ae17d83ee2c6b3d2348ddde7 Mon Sep 17 00:00:00 2001 From: StuberSimon Date: Sun, 14 Jun 2026 09:51:43 +0200 Subject: [PATCH 06/10] return glofas discharge at native resolution --- atlite/datasets/glofas.py | 38 +++++++++++++------------------------- 1 file changed, 13 insertions(+), 25 deletions(-) diff --git a/atlite/datasets/glofas.py b/atlite/datasets/glofas.py index 58e1ef81..2e67af3f 100644 --- a/atlite/datasets/glofas.py +++ b/atlite/datasets/glofas.py @@ -97,7 +97,7 @@ def _rename_and_clean_coords(ds, add_lon_lat=True): "dis24": "discharge", } ) - # round coords since cds coords are float32 which would lead to mismatches + # round coords since cds coords are float64 which would lead to mismatches ds = ds.assign_coords( x=np.round(ds.x.astype(float), 5), y=np.round(ds.y.astype(float), 5) ) @@ -230,8 +230,8 @@ def retrieve_data( ... chunks={'time': 1, 'x': 100, 'y': 100}, ... tmpdir='/tmp', ... lock=None, - ... year='2020', - ... month='01', + ... hyear='2020', + ... hmonth='01', ... variable=['river_discharge_in_the_last_24_hours'], ... data_format='grib' ... ) @@ -375,20 +375,15 @@ def get_data( concurrent_requests : bool, optional If True, the monthly data requests are posted concurrently. Only has an effect if `monthly_requests` is True. - **creation_parameters : - Additional keyword arguments: - - 'sanitize' (default True): sets sanitization of the data on or off. - - 'time_fill_method' (default "nearest"): strategy used to align the - returned dataset to cutout time coordinates. Supported values are - xarray reindex methods (e.g. "nearest", "pad", "backfill") and - "interpolate" (or "interp") for time interpolation. - - 'time_interp_method' (default "linear"): interpolation method passed - to xarray when 'time_fill_method' is "interpolate". + **creation_parameters : + Additional keyword arguments: + - 'sanitize' (default True): sets sanitization of the data on or off. Returns ------- xarray.Dataset - Dataset of dask arrays of the retrieved variables. + Dataset of dask arrays of the retrieved variables at the native GLOFAS + resolution (daily values on the native grid). """ coords = cutout.coords @@ -411,6 +406,8 @@ def retrieve_once(time): **time, ) ds = _rename_and_clean_coords(ds) + # dis24 is timestamped at the end of its 24h window; shift to the flow day + ds = ds.assign_coords(time=ds["time"] - np.timedelta64(1, "D")) if sanitize: ds["discharge"] = ds["discharge"].clip(min=0.0).fillna(0.0) return ds @@ -422,15 +419,6 @@ def retrieve_once(time): else: datasets = list(map(retrieve_once, time_chunks)) - ds = xr.concat(datasets, dim="time").sortby("time") - fill_method = creation_parameters.get("time_fill_method", "interpolate") - if fill_method is None: - return ds.reindex(time=coords["time"]) - if isinstance(fill_method, str) and fill_method.lower() in { - "interpolate", - "interp", - }: - interp_method = creation_parameters.get("time_interp_method", "linear") - interpolated = ds.interp(time=coords["time"], method=interp_method) - return interpolated.bfill("time").ffill("time") - return ds.reindex(time=coords["time"], method=fill_method) + # Keep discharge at its native GLOFAS resolution. Interpolation needs to happen + # later to avoid interpolating the whole grid here. + return xr.concat(datasets, dim="time").sortby("time") From e397aea7f04483e8137212702a63741fa38c0a83 Mon Sep 17 00:00:00 2001 From: StuberSimon Date: Sun, 14 Jun 2026 14:22:23 +0200 Subject: [PATCH 07/10] extract shared CDS helpers into cds_helper.py --- atlite/datasets/cds_helper.py | 132 ++++++++++++++++++++++++++++++++++ atlite/datasets/era5.py | 123 ++----------------------------- atlite/datasets/glofas.py | 122 ++----------------------------- 3 files changed, 144 insertions(+), 233 deletions(-) create mode 100644 atlite/datasets/cds_helper.py diff --git a/atlite/datasets/cds_helper.py b/atlite/datasets/cds_helper.py new file mode 100644 index 00000000..88abbb52 --- /dev/null +++ b/atlite/datasets/cds_helper.py @@ -0,0 +1,132 @@ +# SPDX-FileCopyrightText: Contributors to atlite +# +# SPDX-License-Identifier: MIT +""" +Shared helpers for datasets downloaded via the Climate Data Store (CDS). + +Used by both the era5 and glofas dataset modules. +""" + +import logging +import os +import weakref +from pathlib import Path + +import xarray as xr + +logger = logging.getLogger(__name__) + + +def noisy_unlink(path): + """ + Delete file at given path. + """ + logger.debug(f"Deleting file {path}") + try: + os.unlink(path) + except PermissionError: + logger.error(f"Unable to delete file {path}, as it is still in use.") + + +def _area(coords): + # North, West, South, East. Default: global + x0, x1 = coords["x"].min().item(), coords["x"].max().item() + y0, y1 = coords["y"].min().item(), coords["y"].max().item() + return [y1, x0, y0, x1] + + +def sanitize_chunks(chunks, **dim_mapping): + dim_mapping = dict(time="valid_time", x="longitude", y="latitude") | dim_mapping + if not isinstance(chunks, dict): + # preserve "auto" or None + return chunks + + return { + extname: chunks[intname] + for intname, extname in dim_mapping.items() + if intname in chunks + } + + +def add_finalizer(ds: xr.Dataset, target: str | Path): + logger.debug(f"Adding finalizer for {target}") + weakref.finalize(ds._close.__self__.ds, noisy_unlink, target) + + +def open_with_grib_conventions( + grib_file: str | Path, chunks=None, tmpdir: str | Path | None = None +) -> xr.Dataset: + """ + Convert a grib file downloaded from the CDS to netcdf conventions locally. + + The function does the same thing as the CDS backend does, but locally. + This is needed, as the grib file is the recommended download file type for CDS, with conversion to netcdf locally. + The routine is a reduced version based on the documentation here: + https://confluence.ecmwf.int/display/CKB/GRIB+to+netCDF+conversion+on+new+CDS+and+ADS+systems#GRIBtonetCDFconversiononnewCDSandADSsystems-jupiternotebook + + Parameters + ---------- + grib_file : str | Path + Path to the grib file to be converted. + chunks + Chunks + tmpdir : Path, optional + If None adds a finalizer to the dataset object + + Returns + ------- + xr.Dataset + """ + # + # Open grib file as dataset + # Options to open different datasets into a datasets of consistent hypercubes which are compatible netCDF + # There are options that might be relevant for e.g. for wave model data, that have been removed here + # to keep the code cleaner and shorter + ds = xr.open_dataset( + grib_file, + engine="cfgrib", + time_dims=["valid_time"], + ignore_keys=["edition"], + coords_as_attributes=[ + "surface", + "depthBelowLandLayer", + "entireAtmosphere", + "heightAboveGround", + "meanSea", + ], + chunks=sanitize_chunks(chunks), + ) + if tmpdir is None: + add_finalizer(ds, grib_file) + + def safely_expand_dims(dataset: xr.Dataset, expand_dims: list[str]) -> xr.Dataset: + """ + Expand dimensions in an xarray dataset, ensuring that the new dimensions are not already in the dataset + and that the order of dimensions is preserved. + """ + dims_required = [ + c for c in dataset.coords if c in expand_dims + list(dataset.dims) + ] + dims_missing = [ + (c, i) for i, c in enumerate(dims_required) if c not in dataset.dims + ] + dataset = dataset.expand_dims( + dim=[x[0] for x in dims_missing], axis=[x[1] for x in dims_missing] + ) + return dataset + + logger.debug("Converting grib file to netcdf format") + # Variables and dimensions to rename if they exist in the dataset + rename_vars = { + "time": "forecast_reference_time", + "step": "forecast_period", + "isobaricInhPa": "pressure_level", + "hybrid": "model_level", + } + rename_vars = {k: v for k, v in rename_vars.items() if k in ds} + ds = ds.rename(rename_vars) + + # safely expand dimensions in an xarray dataset to ensure that data for the new dimensions are in the dataset + ds = safely_expand_dims(ds, ["valid_time", "pressure_level", "model_level"]) + + return ds diff --git a/atlite/datasets/era5.py b/atlite/datasets/era5.py index c143ff32..abde524a 100644 --- a/atlite/datasets/era5.py +++ b/atlite/datasets/era5.py @@ -11,7 +11,6 @@ import logging import os import warnings -import weakref from pathlib import Path from tempfile import mkstemp @@ -24,6 +23,12 @@ from dask.utils import SerializableLock from numpy import atleast_1d +from atlite.datasets.cds_helper import ( + _area, + add_finalizer, + open_with_grib_conventions, + sanitize_chunks, +) from atlite.gis import maybe_swap_spatial_dims from atlite.pv.solar_position import SolarPosition @@ -256,13 +261,6 @@ def get_data_height(retrieval_params): return ds -def _area(coords): - # North, West, South, East. Default: global - x0, x1 = coords["x"].min().item(), coords["x"].max().item() - y0, y1 = coords["y"].min().item(), coords["y"].max().item() - return [y1, x0, y0, x1] - - def retrieval_times(coords, static=False, monthly_requests=False): """ Get list of retrieval cdsapi arguments for time dimension in coordinates. @@ -320,115 +318,6 @@ def retrieval_times(coords, static=False, monthly_requests=False): return times -def noisy_unlink(path): - """ - Delete file at given path. - """ - logger.debug(f"Deleting file {path}") - try: - os.unlink(path) - except PermissionError: - logger.error(f"Unable to delete file {path}, as it is still in use.") - - -def add_finalizer(ds: xr.Dataset, target: str | Path): - logger.debug(f"Adding finalizer for {target}") - weakref.finalize(ds._close.__self__.ds, noisy_unlink, target) - - -def sanitize_chunks(chunks, **dim_mapping): - dim_mapping = dict(time="valid_time", x="longitude", y="latitude") | dim_mapping - if not isinstance(chunks, dict): - # preserve "auto" or None - return chunks - - return { - extname: chunks[intname] - for intname, extname in dim_mapping.items() - if intname in chunks - } - - -def open_with_grib_conventions( - grib_file: str | Path, chunks=None, tmpdir: str | Path | None = None -) -> xr.Dataset: - """ - Convert grib file of ERA5 data from the CDS to netcdf file. - - The function does the same thing as the CDS backend does, but locally. - This is needed, as the grib file is the recommended download file type for CDS, with conversion to netcdf locally. - The routine is a reduced version based on the documentation here: - https://confluence.ecmwf.int/display/CKB/GRIB+to+netCDF+conversion+on+new+CDS+and+ADS+systems#GRIBtonetCDFconversiononnewCDSandADSsystems-jupiternotebook - - Parameters - ---------- - grib_file : str | Path - Path to the grib file to be converted. - chunks - Chunks - tmpdir : Path, optional - If None adds a finalizer to the dataset object - - Returns - ------- - xr.Dataset - """ - # - # Open grib file as dataset - # Options to open different datasets into a datasets of consistent hypercubes which are compatible netCDF - # There are options that might be relevant for e.g. for wave model data, that have been removed here - # to keep the code cleaner and shorter - ds = xr.open_dataset( - grib_file, - engine="cfgrib", - time_dims=["valid_time"], - ignore_keys=["edition"], - # extra_coords={"expver": "valid_time"}, - coords_as_attributes=[ - "surface", - "depthBelowLandLayer", - "entireAtmosphere", - "heightAboveGround", - "meanSea", - ], - chunks=sanitize_chunks(chunks), - ) - if tmpdir is None: - add_finalizer(ds, grib_file) - - def safely_expand_dims(dataset: xr.Dataset, expand_dims: list[str]) -> xr.Dataset: - """ - Expand dimensions in an xarray dataset, ensuring that the new dimensions are not already in the dataset - and that the order of dimensions is preserved. - """ - dims_required = [ - c for c in dataset.coords if c in expand_dims + list(dataset.dims) - ] - dims_missing = [ - (c, i) for i, c in enumerate(dims_required) if c not in dataset.dims - ] - dataset = dataset.expand_dims( - dim=[x[0] for x in dims_missing], axis=[x[1] for x in dims_missing] - ) - return dataset - - logger.debug("Converting grib file to netcdf format") - # Variables and dimensions to rename if they exist in the dataset - rename_vars = { - "time": "forecast_reference_time", - "step": "forecast_period", - "isobaricInhPa": "pressure_level", - "hybrid": "model_level", - } - rename_vars = {k: v for k, v in rename_vars.items() if k in ds} - ds = ds.rename(rename_vars) - - # safely expand dimensions in an xarray dataset to ensure that data for the new dimensions are in the dataset - ds = safely_expand_dims(ds, ["valid_time", "pressure_level", "model_level"]) - - return ds - - def retrieve_data( product: str, chunks: dict[str, int] | None = None, diff --git a/atlite/datasets/glofas.py b/atlite/datasets/glofas.py index 2e67af3f..94a8c850 100644 --- a/atlite/datasets/glofas.py +++ b/atlite/datasets/glofas.py @@ -10,7 +10,6 @@ import logging import os -import weakref import zipfile from pathlib import Path from tempfile import mkstemp @@ -22,6 +21,12 @@ from dask.utils import SerializableLock from numpy import atleast_1d +from atlite.datasets.cds_helper import ( + _area, + add_finalizer, + open_with_grib_conventions, + sanitize_chunks, +) from atlite.gis import maybe_swap_spatial_dims # Null context for running a with statements wihout any context @@ -46,42 +51,6 @@ def nullcontext(): features = {"discharge": ["discharge"]} -def noisy_unlink(path): - """ - Delete file at given path. - """ - logger.debug(f"Deleting file {path}") - try: - os.unlink(path) - except PermissionError: - logger.error(f"Unable to delete file {path}, as it is still in use.") - - -def _area(coords): - # North, West, South, East. Default: global - x0, x1 = coords["x"].min().item(), coords["x"].max().item() - y0, y1 = coords["y"].min().item(), coords["y"].max().item() - return [y1, x0, y0, x1] - - -def sanitize_chunks(chunks, **dim_mapping): - dim_mapping = dict(time="valid_time", x="longitude", y="latitude") | dim_mapping - if not isinstance(chunks, dict): - # preserve "auto" or None - return chunks - - return { - extname: chunks[intname] - for intname, extname in dim_mapping.items() - if intname in chunks - } - - -def add_finalizer(ds: xr.Dataset, target: str | Path): - logger.debug(f"Adding finalizer for {target}") - weakref.finalize(ds._close.__self__.ds, noisy_unlink, target) - - def _rename_and_clean_coords(ds, add_lon_lat=True): """ Rename 'longitude' and 'latitude' columns to 'x' and 'y' and fix roundings. @@ -109,85 +78,6 @@ def _rename_and_clean_coords(ds, add_lon_lat=True): return ds -def open_with_grib_conventions( - grib_file: str | Path, chunks=None, tmpdir: str | Path | None = None -) -> xr.Dataset: - """ - Convert grib file of Glofas data from the CDS to netcdf file. - - The function does the same thing as the CDS backend does, but locally. - This is needed, as the grib file is the recommended download file type for CDS, with conversion to netcdf locally. - The routine is a reduced version based on the documentation here: - https://confluence.ecmwf.int/display/CKB/GRIB+to+netCDF+conversion+on+new+CDS+and+ADS+systems#GRIBtonetCDFconversiononnewCDSandADSsystems-jupiternotebook - - Parameters - ---------- - grib_file : str | Path - Path to the grib file to be converted. - chunks - Chunks - tmpdir : Path, optional - If None adds a finalizer to the dataset object - - Returns - ------- - xr.Dataset - """ - # - # Open grib file as dataset - # Options to open different datasets into a datasets of consistent hypercubes which are compatible netCDF - # There are options that might be relevant for e.g. for wave model data, that have been removed here - # to keep the code cleaner and shorter - ds = xr.open_dataset( - grib_file, - engine="cfgrib", - time_dims=["valid_time"], - ignore_keys=["edition"], - coords_as_attributes=[ - "surface", - "depthBelowLandLayer", - "entireAtmosphere", - "heightAboveGround", - "meanSea", - ], - chunks=sanitize_chunks(chunks), - ) - if tmpdir is None: - add_finalizer(ds, grib_file) - - def safely_expand_dims(dataset: xr.Dataset, expand_dims: list[str]) -> xr.Dataset: - """ - Expand dimensions in an xarray dataset, ensuring that the new dimensions are not already in the dataset - and that the order of dimensions is preserved. - """ - dims_required = [ - c for c in dataset.coords if c in expand_dims + list(dataset.dims) - ] - dims_missing = [ - (c, i) for i, c in enumerate(dims_required) if c not in dataset.dims - ] - dataset = dataset.expand_dims( - dim=[x[0] for x in dims_missing], axis=[x[1] for x in dims_missing] - ) - return dataset - - logger.debug("Converting grib file to netcdf format") - # Variables and dimensions to rename if they exist in the dataset - rename_vars = { - "time": "forecast_reference_time", - "step": "forecast_period", - "isobaricInhPa": "pressure_level", - "hybrid": "model_level", - } - rename_vars = {k: v for k, v in rename_vars.items() if k in ds} - ds = ds.rename(rename_vars) - - # safely expand dimensions in an xarray dataset to ensure that data for the new dimensions are in the dataset - ds = safely_expand_dims(ds, ["valid_time", "pressure_level", "model_level"]) - - return ds - - def retrieve_data( product: str, chunks: dict[str, int] | None = None, From 5dcc17673a93b7675657ecd5dcecfb1411da6522 Mon Sep 17 00:00:00 2001 From: StuberSimon Date: Sun, 14 Jun 2026 20:39:45 +0200 Subject: [PATCH 08/10] snap plants to nearest cell and interpolate hydro discharge --- atlite/convert.py | 37 ++++++++++++++++++++++++++----------- 1 file changed, 26 insertions(+), 11 deletions(-) diff --git a/atlite/convert.py b/atlite/convert.py index d73804ce..ed52dc0c 100644 --- a/atlite/convert.py +++ b/atlite/convert.py @@ -1048,33 +1048,43 @@ def _hydro_from_runoff( def _hydro_from_discharge( cutout, plants, + time=None, ): """ - Get inflow time-series for `plants` by extracting the discharge time series for - the nearest grid points. + Get inflow time-series for `plants` from GLOFAS discharge by snapping each + plant to the nearest grid cell that holds data and interpolating onto the + target time index. Parameters ---------- plants : pd.DataFrame Run-of-river plants or dams with lon, lat columns. + time : pd.DatetimeIndex, optional + Time index to interpolate the plant inflow onto. Defaults to the cutout's + own time index. """ + if time is None: + time = cutout.coords["time"] discharge = cutout.data.discharge - inflow = xr.DataArray( - np.zeros((len(plants), discharge.indexes["time"].size)), - [("plant", plants.index), discharge.coords["time"]], + # snap plants to GLOFAS cells with data (cutout grid points may be all-NaN) + present = discharge.isel(time=0).notnull() + discharge = discharge.isel( + x=np.flatnonzero(present.any("y").values), + y=np.flatnonzero(present.any("x").values), ) - for plant in plants.itertuples(): - # Extract the discharge time series for the nearest point - inflow.loc[dict(plant=plant.Index)] = discharge.sel( - x=plant.lon, y=plant.lat, method="nearest" - ) - return inflow + x = xr.DataArray(plants["lon"].values, dims="plant", coords={"plant": plants.index}) + y = xr.DataArray(plants["lat"].values, dims="plant", coords={"plant": plants.index}) + inflow = discharge.sel(x=x, y=y, method="nearest").compute() + inflow = inflow.dropna("time", how="all").interp(time=time) + inflow = inflow.ffill("time").bfill("time") + return inflow.transpose("plant", "time") def hydro( cutout, plants, module="auto", + time=None, hydrobasins=None, flowspeed=1, weight_with_height=False, @@ -1092,6 +1102,9 @@ def hydro( module : str The method to compute hydro time series. "auto" will prefere discharge but fall back to runoff-based computation "glofas" will use discharge directly, "era5" will use runoff-based computation + time : pd.DatetimeIndex, optional + Time index to interpolate the plant inflow onto. Only relevant for + discharge-based computation. Defaults to the cutout's own time index. hydrobasins : str|gpd.GeoDataFrame Filename or GeoDataFrame of one level of the HydroBASINS dataset. Only required for runoff-based computation. @@ -1112,6 +1125,7 @@ def hydro( return _hydro_from_discharge( cutout, plants, + time=time, ) if hydrobasins is None or "runoff" not in cutout.available_features.values: raise ValueError( @@ -1135,6 +1149,7 @@ def hydro( return _hydro_from_discharge( cutout, plants, + time=time, ) elif module.lower() == "era5": # Check if hydrobasins is provided, otherwise raise error From 989c0de537d17e68c14c1827c99baf612d065d3b Mon Sep 17 00:00:00 2001 From: StuberSimon Date: Mon, 15 Jun 2026 14:51:44 +0200 Subject: [PATCH 09/10] use cutout.data.data_vars consistently for hydro data checks --- atlite/convert.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/atlite/convert.py b/atlite/convert.py index ed52dc0c..15186a04 100644 --- a/atlite/convert.py +++ b/atlite/convert.py @@ -1119,15 +1119,15 @@ def hydro( **kwargs Additional arguments for runoff-based computation. Only relevant for runoff-based computation. """ - if module == "auto": + if module.lower() == "auto": # Check if discharge data is available in cutout, otherwise use runoff - if "discharge" in cutout.available_features.values: + if "discharge" in cutout.data.data_vars: return _hydro_from_discharge( cutout, plants, time=time, ) - if hydrobasins is None or "runoff" not in cutout.available_features.values: + if hydrobasins is None or "runoff" not in cutout.data.data_vars: raise ValueError( "For runoff-based hydro time series, hydrobasins and runoff data must be provided." ) @@ -1142,7 +1142,7 @@ def hydro( ) elif module.lower() == "glofas": # Check if discharge data is available in cutout, otherwise raise error - if "discharge" not in cutout.data_vars: + if "discharge" not in cutout.data.data_vars: raise ValueError( "For GloFAS-based hydro time series, the cutout must include discharge data." ) From befe311b58550da05e21e38bc3475c9b886fc01e Mon Sep 17 00:00:00 2001 From: StuberSimon Date: Tue, 16 Jun 2026 19:16:10 +0200 Subject: [PATCH 10/10] move hydro inflow helpers from convert.py into hydro.py --- atlite/convert.py | 108 ++-------------------------------------------- atlite/hydro.py | 100 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 104 insertions(+), 104 deletions(-) diff --git a/atlite/convert.py b/atlite/convert.py index 15186a04..246717d8 100644 --- a/atlite/convert.py +++ b/atlite/convert.py @@ -980,106 +980,6 @@ def runoff( return result -def _hydro_from_runoff( - cutout, - plants, - hydrobasins, - flowspeed=1, - weight_with_height=False, - show_progress=False, - **kwargs, -): - """ - Compute inflow time-series for `plants` by aggregating over catchment - basins from `hydrobasins` - - Parameters - ---------- - plants : pd.DataFrame - Run-of-river plants or dams with lon, lat columns. - hydrobasins : str|gpd.GeoDataFrame - Filename or GeoDataFrame of one level of the HydroBASINS dataset. - flowspeed : float - Average speed of water flows to estimate the water travel time from - basin to plant (default: 1 m/s). - weight_with_height : bool - Whether surface runoff should be weighted by potential height (probably - better for coarser resolution). - show_progress : bool - Whether to display progressbars. - - References - ---------- - [1] Liu, Hailiang, et al. "A validated high-resolution hydro power - time-series model for energy systems analysis." arXiv preprint - arXiv:1901.08476 (2019). - - [2] Lehner, B., Grill G. (2013): Global river hydrography and network - routing: baseline data and new approaches to study the world’s large river - systems. Hydrological Processes, 27(15): 2171–2186. Data is available at - www.hydrosheds.org. - - """ - basins = hydrom.determine_basins(plants, hydrobasins, show_progress=show_progress) - - matrix = cutout.indicatormatrix(basins.shapes) - # compute the average surface runoff in each basin - # Fix NaN and Inf values to 0.0 to avoid numerical issues - matrix_normalized = np.nan_to_num( - matrix / matrix.sum(axis=1), nan=0.0, posinf=0.0, neginf=0.0 - ) - runoff = cutout.runoff( - matrix=matrix_normalized, - index=basins.shapes.index, - weight_with_height=weight_with_height, - show_progress=show_progress, - **kwargs, - ) - # The hydrological parameters are in units of "m of water per day" and so - # they should be multiplied by 1000 and the basin area to convert to m3 - # d-1 = m3 h-1 / 24 - runoff *= xr.DataArray(basins.shapes.to_crs(dict(proj="cea")).area) - - return hydrom.shift_and_aggregate_runoff_for_plants( - basins, runoff, flowspeed, show_progress - ) - - -def _hydro_from_discharge( - cutout, - plants, - time=None, -): - """ - Get inflow time-series for `plants` from GLOFAS discharge by snapping each - plant to the nearest grid cell that holds data and interpolating onto the - target time index. - - Parameters - ---------- - plants : pd.DataFrame - Run-of-river plants or dams with lon, lat columns. - time : pd.DatetimeIndex, optional - Time index to interpolate the plant inflow onto. Defaults to the cutout's - own time index. - """ - if time is None: - time = cutout.coords["time"] - discharge = cutout.data.discharge - # snap plants to GLOFAS cells with data (cutout grid points may be all-NaN) - present = discharge.isel(time=0).notnull() - discharge = discharge.isel( - x=np.flatnonzero(present.any("y").values), - y=np.flatnonzero(present.any("x").values), - ) - x = xr.DataArray(plants["lon"].values, dims="plant", coords={"plant": plants.index}) - y = xr.DataArray(plants["lat"].values, dims="plant", coords={"plant": plants.index}) - inflow = discharge.sel(x=x, y=y, method="nearest").compute() - inflow = inflow.dropna("time", how="all").interp(time=time) - inflow = inflow.ffill("time").bfill("time") - return inflow.transpose("plant", "time") - - def hydro( cutout, plants, @@ -1122,7 +1022,7 @@ def hydro( if module.lower() == "auto": # Check if discharge data is available in cutout, otherwise use runoff if "discharge" in cutout.data.data_vars: - return _hydro_from_discharge( + return hydrom._hydro_from_discharge( cutout, plants, time=time, @@ -1131,7 +1031,7 @@ def hydro( raise ValueError( "For runoff-based hydro time series, hydrobasins and runoff data must be provided." ) - return _hydro_from_runoff( + return hydrom._hydro_from_runoff( cutout, plants, hydrobasins, @@ -1146,7 +1046,7 @@ def hydro( raise ValueError( "For GloFAS-based hydro time series, the cutout must include discharge data." ) - return _hydro_from_discharge( + return hydrom._hydro_from_discharge( cutout, plants, time=time, @@ -1157,7 +1057,7 @@ def hydro( raise ValueError( "For ERA5-based hydro time series, the hydrobasins dataset must be provided." ) - return _hydro_from_runoff( + return hydrom._hydro_from_runoff( cutout, plants, hydrobasins, diff --git a/atlite/hydro.py b/atlite/hydro.py index df08aacc..29de8409 100644 --- a/atlite/hydro.py +++ b/atlite/hydro.py @@ -102,3 +102,103 @@ def shift_and_aggregate_runoff_for_plants( inflow_plant += runoff.sel(hid=b).roll(time=nhours.at[b]) return inflow + + +def _hydro_from_runoff( + cutout, + plants, + hydrobasins, + flowspeed=1, + weight_with_height=False, + show_progress=False, + **kwargs, +): + """ + Compute inflow time-series for `plants` by aggregating over catchment + basins from `hydrobasins` (ERA5 runoff-based computation). + + Parameters + ---------- + plants : pd.DataFrame + Run-of-river plants or dams with lon, lat columns. + hydrobasins : str|gpd.GeoDataFrame + Filename or GeoDataFrame of one level of the HydroBASINS dataset. + flowspeed : float + Average speed of water flows to estimate the water travel time from + basin to plant (default: 1 m/s). + weight_with_height : bool + Whether surface runoff should be weighted by potential height (probably + better for coarser resolution). + show_progress : bool + Whether to display progressbars. + + References + ---------- + [1] Liu, Hailiang, et al. "A validated high-resolution hydro power + time-series model for energy systems analysis." arXiv preprint + arXiv:1901.08476 (2019). + + [2] Lehner, B., Grill G. (2013): Global river hydrography and network + routing: baseline data and new approaches to study the world’s large river + systems. Hydrological Processes, 27(15): 2171–2186. Data is available at + www.hydrosheds.org. + + """ + basins = determine_basins(plants, hydrobasins, show_progress=show_progress) + + matrix = cutout.indicatormatrix(basins.shapes) + # compute the average surface runoff in each basin + # Fix NaN and Inf values to 0.0 to avoid numerical issues + matrix_normalized = np.nan_to_num( + matrix / matrix.sum(axis=1), nan=0.0, posinf=0.0, neginf=0.0 + ) + runoff = cutout.runoff( + matrix=matrix_normalized, + index=basins.shapes.index, + weight_with_height=weight_with_height, + show_progress=show_progress, + **kwargs, + ) + # The hydrological parameters are in units of "m of water per day" and so + # they should be multiplied by 1000 and the basin area to convert to m3 + # d-1 = m3 h-1 / 24 + runoff *= xr.DataArray(basins.shapes.to_crs(dict(proj="cea")).area) + + return shift_and_aggregate_runoff_for_plants( + basins, runoff, flowspeed, show_progress + ) + + +def _hydro_from_discharge( + cutout, + plants, + time=None, +): + """ + Get inflow time-series for `plants` from GLOFAS discharge by snapping each + plant to the nearest grid cell that holds data and interpolating onto the + target time index. + + Parameters + ---------- + plants : pd.DataFrame + Run-of-river plants or dams with lon, lat columns. + time : pd.DatetimeIndex, optional + Time index to interpolate the plant inflow onto. Defaults to the cutout's + own time index. + """ + if time is None: + time = cutout.coords["time"] + discharge = cutout.data.discharge + # snap plants to GLOFAS cells with data (cutout grid points may be all-NaN) + present = discharge.isel(time=0).notnull() + discharge = discharge.isel( + x=np.flatnonzero(present.any("y").values), + y=np.flatnonzero(present.any("x").values), + ) + x = xr.DataArray(plants["lon"].values, dims="plant", coords={"plant": plants.index}) + y = xr.DataArray(plants["lat"].values, dims="plant", coords={"plant": plants.index}) + inflow = discharge.sel(x=x, y=y, method="nearest").compute() + inflow = inflow.dropna("time", how="all").interp(time=time) + inflow = inflow.ffill("time").bfill("time") + return inflow.transpose("plant", "time")