diff --git a/.gitignore b/.gitignore index 1e8e66cc..42d81e53 100644 --- a/.gitignore +++ b/.gitignore @@ -27,3 +27,9 @@ paper # Ignore IDE project files .idea/ .vscode +AGENTS.md +uv.lock + +tmp +test_ba +data diff --git a/atlite/convert.py b/atlite/convert.py index 96a2974e..246717d8 100644 --- a/atlite/convert.py +++ b/atlite/convert.py @@ -983,66 +983,91 @@ def runoff( def hydro( cutout, plants, - hydrobasins, + module="auto", + time=None, + hydrobasins=None, flowspeed=1, weight_with_height=False, show_progress=False, **kwargs, ): """ - Compute inflow time-series for `plants` by aggregating over catchment - basins from `hydrobasins` + Get inflow time-series for `plants` by either extracting the discharge time series for + the nearest grid points or by computing runoff-based inflow time series Parameters ---------- plants : pd.DataFrame Run-of-river plants or dams with lon, lat columns. + module : str + The method to compute hydro time series. "auto" will prefere discharge but fall back to runoff-based computation + "glofas" will use discharge directly, "era5" will use runoff-based computation + time : pd.DatetimeIndex, optional + Time index to interpolate the plant inflow onto. Only relevant for + discharge-based computation. Defaults to the cutout's own time index. hydrobasins : str|gpd.GeoDataFrame - Filename or GeoDataFrame of one level of the HydroBASINS dataset. + Filename or GeoDataFrame of one level of the HydroBASINS dataset. Only required + for runoff-based computation. flowspeed : float Average speed of water flows to estimate the water travel time from - basin to plant (default: 1 m/s). + basin to plant (default: 1 m/s). Only relevant for runoff-based computation. weight_with_height : bool Whether surface runoff should be weighted by potential height (probably - better for coarser resolution). + better for coarser resolution). Only relevant for runoff-based computation. show_progress : bool - Whether to display progressbars. - - References - ---------- - [1] Liu, Hailiang, et al. "A validated high-resolution hydro power - time-series model for energy systems analysis." arXiv preprint - arXiv:1901.08476 (2019). - - [2] Lehner, B., Grill G. (2013): Global river hydrography and network - routing: baseline data and new approaches to study the world’s large river - systems. Hydrological Processes, 27(15): 2171–2186. Data is available at - www.hydrosheds.org. - + Whether to display progressbars. Only relevant for runoff-based computation. + **kwargs + Additional arguments for runoff-based computation. Only relevant for runoff-based computation. """ - basins = hydrom.determine_basins(plants, hydrobasins, show_progress=show_progress) - - matrix = cutout.indicatormatrix(basins.shapes) - # compute the average surface runoff in each basin - # Fix NaN and Inf values to 0.0 to avoid numerical issues - matrix_normalized = np.nan_to_num( - matrix / matrix.sum(axis=1), nan=0.0, posinf=0.0, neginf=0.0 - ) - runoff = cutout.runoff( - matrix=matrix_normalized, - index=basins.shapes.index, - weight_with_height=weight_with_height, - show_progress=show_progress, - **kwargs, - ) - # The hydrological parameters are in units of "m of water per day" and so - # they should be multiplied by 1000 and the basin area to convert to m3 - # d-1 = m3 h-1 / 24 - runoff *= xr.DataArray(basins.shapes.to_crs(dict(proj="cea")).area) - - return hydrom.shift_and_aggregate_runoff_for_plants( - basins, runoff, flowspeed, show_progress - ) + if module.lower() == "auto": + # Check if discharge data is available in cutout, otherwise use runoff + if "discharge" in cutout.data.data_vars: + return hydrom._hydro_from_discharge( + cutout, + plants, + time=time, + ) + if hydrobasins is None or "runoff" not in cutout.data.data_vars: + raise ValueError( + "For runoff-based hydro time series, hydrobasins and runoff data must be provided." + ) + return hydrom._hydro_from_runoff( + cutout, + plants, + hydrobasins, + flowspeed=flowspeed, + weight_with_height=weight_with_height, + show_progress=show_progress, + **kwargs, + ) + elif module.lower() == "glofas": + # Check if discharge data is available in cutout, otherwise raise error + if "discharge" not in cutout.data.data_vars: + raise ValueError( + "For GloFAS-based hydro time series, the cutout must include discharge data." + ) + return hydrom._hydro_from_discharge( + cutout, + plants, + time=time, + ) + elif module.lower() == "era5": + # Check if hydrobasins is provided, otherwise raise error + if hydrobasins is None: + raise ValueError( + "For ERA5-based hydro time series, the hydrobasins dataset must be provided." + ) + return hydrom._hydro_from_runoff( + cutout, + plants, + hydrobasins, + flowspeed=flowspeed, + weight_with_height=weight_with_height, + show_progress=show_progress, + **kwargs, + ) + else: + raise ValueError(f'Unknown hydro module option "{module}".') def convert_line_rating( diff --git a/atlite/datasets/__init__.py b/atlite/datasets/__init__.py index 045c59d8..52784f18 100644 --- a/atlite/datasets/__init__.py +++ b/atlite/datasets/__init__.py @@ -6,6 +6,6 @@ atlite datasets. """ -from atlite.datasets import era5, gebco, sarah +from atlite.datasets import era5, gebco, glofas, sarah -modules = {"era5": era5, "sarah": sarah, "gebco": gebco} +modules = {"era5": era5, "sarah": sarah, "gebco": gebco, "glofas": glofas} diff --git a/atlite/datasets/cds_helper.py b/atlite/datasets/cds_helper.py new file mode 100644 index 00000000..88abbb52 --- /dev/null +++ b/atlite/datasets/cds_helper.py @@ -0,0 +1,132 @@ +# SPDX-FileCopyrightText: Contributors to atlite +# +# SPDX-License-Identifier: MIT +""" +Shared helpers for datasets downloaded via the Climate Data Store (CDS). + +Used by both the era5 and glofas dataset modules. +""" + +import logging +import os +import weakref +from pathlib import Path + +import xarray as xr + +logger = logging.getLogger(__name__) + + +def noisy_unlink(path): + """ + Delete file at given path. + """ + logger.debug(f"Deleting file {path}") + try: + os.unlink(path) + except PermissionError: + logger.error(f"Unable to delete file {path}, as it is still in use.") + + +def _area(coords): + # North, West, South, East. Default: global + x0, x1 = coords["x"].min().item(), coords["x"].max().item() + y0, y1 = coords["y"].min().item(), coords["y"].max().item() + return [y1, x0, y0, x1] + + +def sanitize_chunks(chunks, **dim_mapping): + dim_mapping = dict(time="valid_time", x="longitude", y="latitude") | dim_mapping + if not isinstance(chunks, dict): + # preserve "auto" or None + return chunks + + return { + extname: chunks[intname] + for intname, extname in dim_mapping.items() + if intname in chunks + } + + +def add_finalizer(ds: xr.Dataset, target: str | Path): + logger.debug(f"Adding finalizer for {target}") + weakref.finalize(ds._close.__self__.ds, noisy_unlink, target) + + +def open_with_grib_conventions( + grib_file: str | Path, chunks=None, tmpdir: str | Path | None = None +) -> xr.Dataset: + """ + Convert a grib file downloaded from the CDS to netcdf conventions locally. + + The function does the same thing as the CDS backend does, but locally. + This is needed, as the grib file is the recommended download file type for CDS, with conversion to netcdf locally. + The routine is a reduced version based on the documentation here: + https://confluence.ecmwf.int/display/CKB/GRIB+to+netCDF+conversion+on+new+CDS+and+ADS+systems#GRIBtonetCDFconversiononnewCDSandADSsystems-jupiternotebook + + Parameters + ---------- + grib_file : str | Path + Path to the grib file to be converted. + chunks + Chunks + tmpdir : Path, optional + If None adds a finalizer to the dataset object + + Returns + ------- + xr.Dataset + """ + # + # Open grib file as dataset + # Options to open different datasets into a datasets of consistent hypercubes which are compatible netCDF + # There are options that might be relevant for e.g. for wave model data, that have been removed here + # to keep the code cleaner and shorter + ds = xr.open_dataset( + grib_file, + engine="cfgrib", + time_dims=["valid_time"], + ignore_keys=["edition"], + coords_as_attributes=[ + "surface", + "depthBelowLandLayer", + "entireAtmosphere", + "heightAboveGround", + "meanSea", + ], + chunks=sanitize_chunks(chunks), + ) + if tmpdir is None: + add_finalizer(ds, grib_file) + + def safely_expand_dims(dataset: xr.Dataset, expand_dims: list[str]) -> xr.Dataset: + """ + Expand dimensions in an xarray dataset, ensuring that the new dimensions are not already in the dataset + and that the order of dimensions is preserved. + """ + dims_required = [ + c for c in dataset.coords if c in expand_dims + list(dataset.dims) + ] + dims_missing = [ + (c, i) for i, c in enumerate(dims_required) if c not in dataset.dims + ] + dataset = dataset.expand_dims( + dim=[x[0] for x in dims_missing], axis=[x[1] for x in dims_missing] + ) + return dataset + + logger.debug("Converting grib file to netcdf format") + # Variables and dimensions to rename if they exist in the dataset + rename_vars = { + "time": "forecast_reference_time", + "step": "forecast_period", + "isobaricInhPa": "pressure_level", + "hybrid": "model_level", + } + rename_vars = {k: v for k, v in rename_vars.items() if k in ds} + ds = ds.rename(rename_vars) + + # safely expand dimensions in an xarray dataset to ensure that data for the new dimensions are in the dataset + ds = safely_expand_dims(ds, ["valid_time", "pressure_level", "model_level"]) + + return ds diff --git a/atlite/datasets/era5.py b/atlite/datasets/era5.py index c143ff32..abde524a 100644 --- a/atlite/datasets/era5.py +++ b/atlite/datasets/era5.py @@ -11,7 +11,6 @@ import logging import os import warnings -import weakref from pathlib import Path from tempfile import mkstemp @@ -24,6 +23,12 @@ from dask.utils import SerializableLock from numpy import atleast_1d +from atlite.datasets.cds_helper import ( + _area, + add_finalizer, + open_with_grib_conventions, + sanitize_chunks, +) from atlite.gis import maybe_swap_spatial_dims from atlite.pv.solar_position import SolarPosition @@ -256,13 +261,6 @@ def get_data_height(retrieval_params): return ds -def _area(coords): - # North, West, South, East. Default: global - x0, x1 = coords["x"].min().item(), coords["x"].max().item() - y0, y1 = coords["y"].min().item(), coords["y"].max().item() - return [y1, x0, y0, x1] - - def retrieval_times(coords, static=False, monthly_requests=False): """ Get list of retrieval cdsapi arguments for time dimension in coordinates. @@ -320,115 +318,6 @@ def retrieval_times(coords, static=False, monthly_requests=False): return times -def noisy_unlink(path): - """ - Delete file at given path. - """ - logger.debug(f"Deleting file {path}") - try: - os.unlink(path) - except PermissionError: - logger.error(f"Unable to delete file {path}, as it is still in use.") - - -def add_finalizer(ds: xr.Dataset, target: str | Path): - logger.debug(f"Adding finalizer for {target}") - weakref.finalize(ds._close.__self__.ds, noisy_unlink, target) - - -def sanitize_chunks(chunks, **dim_mapping): - dim_mapping = dict(time="valid_time", x="longitude", y="latitude") | dim_mapping - if not isinstance(chunks, dict): - # preserve "auto" or None - return chunks - - return { - extname: chunks[intname] - for intname, extname in dim_mapping.items() - if intname in chunks - } - - -def open_with_grib_conventions( - grib_file: str | Path, chunks=None, tmpdir: str | Path | None = None -) -> xr.Dataset: - """ - Convert grib file of ERA5 data from the CDS to netcdf file. - - The function does the same thing as the CDS backend does, but locally. - This is needed, as the grib file is the recommended download file type for CDS, with conversion to netcdf locally. - The routine is a reduced version based on the documentation here: - https://confluence.ecmwf.int/display/CKB/GRIB+to+netCDF+conversion+on+new+CDS+and+ADS+systems#GRIBtonetCDFconversiononnewCDSandADSsystems-jupiternotebook - - Parameters - ---------- - grib_file : str | Path - Path to the grib file to be converted. - chunks - Chunks - tmpdir : Path, optional - If None adds a finalizer to the dataset object - - Returns - ------- - xr.Dataset - """ - # - # Open grib file as dataset - # Options to open different datasets into a datasets of consistent hypercubes which are compatible netCDF - # There are options that might be relevant for e.g. for wave model data, that have been removed here - # to keep the code cleaner and shorter - ds = xr.open_dataset( - grib_file, - engine="cfgrib", - time_dims=["valid_time"], - ignore_keys=["edition"], - # extra_coords={"expver": "valid_time"}, - coords_as_attributes=[ - "surface", - "depthBelowLandLayer", - "entireAtmosphere", - "heightAboveGround", - "meanSea", - ], - chunks=sanitize_chunks(chunks), - ) - if tmpdir is None: - add_finalizer(ds, grib_file) - - def safely_expand_dims(dataset: xr.Dataset, expand_dims: list[str]) -> xr.Dataset: - """ - Expand dimensions in an xarray dataset, ensuring that the new dimensions are not already in the dataset - and that the order of dimensions is preserved. - """ - dims_required = [ - c for c in dataset.coords if c in expand_dims + list(dataset.dims) - ] - dims_missing = [ - (c, i) for i, c in enumerate(dims_required) if c not in dataset.dims - ] - dataset = dataset.expand_dims( - dim=[x[0] for x in dims_missing], axis=[x[1] for x in dims_missing] - ) - return dataset - - logger.debug("Converting grib file to netcdf format") - # Variables and dimensions to rename if they exist in the dataset - rename_vars = { - "time": "forecast_reference_time", - "step": "forecast_period", - "isobaricInhPa": "pressure_level", - "hybrid": "model_level", - } - rename_vars = {k: v for k, v in rename_vars.items() if k in ds} - ds = ds.rename(rename_vars) - - # safely expand dimensions in an xarray dataset to ensure that data for the new dimensions are in the dataset - ds = safely_expand_dims(ds, ["valid_time", "pressure_level", "model_level"]) - - return ds - - def retrieve_data( product: str, chunks: dict[str, int] | None = None, diff --git a/atlite/datasets/glofas.py b/atlite/datasets/glofas.py new file mode 100644 index 00000000..94a8c850 --- /dev/null +++ b/atlite/datasets/glofas.py @@ -0,0 +1,314 @@ +# SPDX-FileCopyrightText: Contributors to atlite +# +# SPDX-License-Identifier: MIT +""" +Module for downloading and curating data from ECMWFs GLOFAS dataset (via CDS). + +For further reference see +https://ewds.climate.copernicus.eu/datasets/cems-glofas-historical?tab=overview +""" + +import logging +import os +import zipfile +from pathlib import Path +from tempfile import mkstemp + +import cdsapi +import numpy as np +import xarray as xr +from dask import compute, delayed +from dask.utils import SerializableLock +from numpy import atleast_1d + +from atlite.datasets.cds_helper import ( + _area, + add_finalizer, + open_with_grib_conventions, + sanitize_chunks, +) +from atlite.gis import maybe_swap_spatial_dims + +# Null context for running a with statements wihout any context +try: + from contextlib import nullcontext +except ImportError: + # for Python verions < 3.7: + import contextlib + + @contextlib.contextmanager + def nullcontext(): + yield + + +logger = logging.getLogger(__name__) + +# Model and CRS Settings +crs = 4326 + +dataset = "cems-glofas-historical" + +features = {"discharge": ["discharge"]} + + +def _rename_and_clean_coords(ds, add_lon_lat=True): + """ + Rename 'longitude' and 'latitude' columns to 'x' and 'y' and fix roundings. + + Optionally (add_lon_lat, default:True) preserves latitude and + longitude columns as 'lat' and 'lon'. + """ + ds = ds.rename( + { + "longitude": "x", + "latitude": "y", + "valid_time": "time", + "dis24": "discharge", + } + ) + # round coords since cds coords are float64 which would lead to mismatches + ds = ds.assign_coords( + x=np.round(ds.x.astype(float), 5), y=np.round(ds.y.astype(float), 5) + ) + ds = maybe_swap_spatial_dims(ds) + if add_lon_lat: + ds = ds.assign_coords(lon=ds.coords["x"], lat=ds.coords["y"]) + ds = ds.drop_vars(["expver", "number"], errors="ignore") + + return ds + + +def retrieve_data( + product: str, + chunks: dict[str, int] | None = None, + tmpdir: str | Path | None = None, + lock: SerializableLock | None = None, + **updates, +) -> xr.Dataset: + """ + Download data like Glofas from the Climate Data Store (CDS). + + If you want to track the state of your request go to + https://cds-beta.climate.copernicus.eu/requests?tab=all + + Parameters + ---------- + product : str + Product name, e.g. 'cems-glofas-historical'. + chunks : dict, optional + Chunking for xarray dataset, e.g. {'time': 1, 'x': 100, 'y': 100}. + Default is None. + tmpdir : str, optional + Directory where the downloaded data is temporarily stored. + Default is None, which uses the system's temporary directory. + lock : dask.utils.SerializableLock, optional + Lock for thread-safe file writing. Default is None. + updates : dict + Additional parameters for the request. + Must include 'year', 'month', and 'variable'. + Can include e.g. 'data_format'. + + Returns + ------- + xarray.Dataset + Dataset with the retrieved variables. + + Examples + -------- + >>> ds = retrieve_data( + ... product='cems-glofas-historical', + ... chunks={'time': 1, 'x': 100, 'y': 100}, + ... tmpdir='/tmp', + ... lock=None, + ... hyear='2020', + ... hmonth='01', + ... variable=['river_discharge_in_the_last_24_hours'], + ... data_format='grib' + ... ) + """ + request = { + "system_version": ["version_4_0"], + "hydrological_model": ["lisflood"], + "product_type": ["consolidated"], + "variable": ["river_discharge_in_the_last_24_hours"], + "data_format": "grib", + "download_format": "zip", + } + request.update(updates) + + assert {"hyear", "hmonth", "variable"}.issubset(request), ( + "Need to specify at least 'variable', 'hyear' and 'hmonth'" + ) + + logger.debug(f"Requesting {product} with API request: {request}") + # Url needs to be set manually here, overrides url from .cdsapirc (for use with multiple modules) + client = cdsapi.Client( + info_callback=logger.debug, + debug=logging.DEBUG >= logging.root.level, + url="https://ewds.climate.copernicus.eu/api", + ) + result = client.retrieve(product, request) + + if lock is None: + lock = nullcontext() + + suffix = f".{request['data_format']}" # .netcdf or .grib + with lock: + fd, target = mkstemp(suffix=suffix, dir=tmpdir) + os.close(fd) + + # Inform user about data being downloaded as "* variable (year-month)" + timestr = f"{request['hyear']}-{request['hmonth']}" + variables = atleast_1d(request["variable"]) + varstr = "\n\t".join([f"{v} ({timestr})" for v in variables]) + logger.info(f"CDS: Downloading variables\n\t{varstr}\n") + result.download(target) + + # Extract data if downloaded as zip file + if request.get("download_format") == "zip": + with zipfile.ZipFile(target, "r") as zip_ref: + zip_ref.extractall(Path(tmpdir) / Path(target).stem) + os.unlink(target) # delete the zip file after extraction + target = Path(tmpdir) / Path(Path(target).stem) / Path("data" + suffix) + + # Convert from grib to netcdf locally, same conversion as in CDS backend + if request["data_format"] == "grib": + ds = open_with_grib_conventions(target, chunks=chunks, tmpdir=tmpdir) + else: + ds = xr.open_dataset(target, chunks=sanitize_chunks(chunks)) + if tmpdir is None: + add_finalizer(target) + return ds + + +def retrieval_times(coords, static=False, monthly_requests=False): + """ + Get list of retrieval cdsapi arguments for time dimension in coordinates. + + If static is False, this function creates a query for each month and year + in the time axis in coords. This ensures not running into size query limits + of the cdsapi even with very (spatially) large cutouts. + If static is True, the function return only one set of parameters + for the very first time point. + + Parameters + ---------- + coords : atlite.Cutout.coords + static : bool, optional + monthly_requests : bool, optional + If True, the data is requested on a monthly basis. This is useful for + large cutouts, where the data is requested in smaller chunks. The + default is False + + Returns + ------- + list of dicts witht retrieval arguments + + """ + time = coords["time"].to_index() + if static: + return { + "hyear": time[0].strftime("%Y"), + "hmonth": time[0].strftime("%m"), + "hday": time[0].strftime("%d"), + } + + # Prepare request for all months and years + times = [] + for year in time.year.unique(): + t = time[time.year == year] + if monthly_requests: + for month in t.month.unique(): + query = { + "hyear": str(year), + "hmonth": list(t[t.month == month].strftime("%m").unique()), + "hday": list(t[t.month == month].strftime("%d").unique()), + } + times.append(query) + else: + query = { + "hyear": str(year), + "hmonth": list(t.strftime("%m").unique()), + "hday": list(t.strftime("%d").unique()), + } + times.append(query) + return times + + +def get_data( + cutout, + feature, + tmpdir="tmp", + lock=None, + data_format="grib", + monthly_requests=False, + concurrent_requests=False, + **creation_parameters, +): + """ + Retrieve data from ECMWFs GLOFAS dataset (via CDS). + + This front-end function downloads data for a specific feature and formats + it to match the given Cutout. + + Parameters + ---------- + cutout : atlite.Cutout + feature : str + Name of the feature data to retrieve. Must be in + `atlite.datasets.glofas.features` + tmpdir : str/Path + Directory where the temporary netcdf files are stored. + data_format : str, optional + The format of the data to be downloaded. Can be either 'grib' or 'netcdf', + 'grib' highly recommended because CDSAPI limits request size for netcdf. + concurrent_requests : bool, optional + If True, the monthly data requests are posted concurrently. + Only has an effect if `monthly_requests` is True. + **creation_parameters : + Additional keyword arguments: + - 'sanitize' (default True): sets sanitization of the data on or off. + + Returns + ------- + xarray.Dataset + Dataset of dask arrays of the retrieved variables at the native GLOFAS + resolution (daily values on the native grid). + + """ + coords = cutout.coords + + sanitize = creation_parameters.get("sanitize", True) + + retrieval_params = { + "product": "cems-glofas-historical", + "area": _area(coords), + "chunks": cutout.chunks, + "tmpdir": tmpdir, + "lock": lock, + "data_format": data_format, + } + + def retrieve_once(time): + ds = retrieve_data( + variable=["river_discharge_in_the_last_24_hours"], + **retrieval_params, + **time, + ) + ds = _rename_and_clean_coords(ds) + # dis24 is timestamped at the end of its 24h window; shift to the flow day + ds = ds.assign_coords(time=ds["time"] - np.timedelta64(1, "D")) + if sanitize: + ds["discharge"] = ds["discharge"].clip(min=0.0).fillna(0.0) + return ds + + time_chunks = retrieval_times(coords, monthly_requests=monthly_requests) + if concurrent_requests: + delayed_datasets = [delayed(retrieve_once)(chunk) for chunk in time_chunks] + datasets = list(compute(*delayed_datasets)) + else: + datasets = list(map(retrieve_once, time_chunks)) + + # Keep discharge at its native GLOFAS resolution. Interpolation needs to happen + # later to avoid interpolating the whole grid here. + return xr.concat(datasets, dim="time").sortby("time") diff --git a/atlite/hydro.py b/atlite/hydro.py index df08aacc..29de8409 100644 --- a/atlite/hydro.py +++ b/atlite/hydro.py @@ -102,3 +102,103 @@ def shift_and_aggregate_runoff_for_plants( inflow_plant += runoff.sel(hid=b).roll(time=nhours.at[b]) return inflow + + +def _hydro_from_runoff( + cutout, + plants, + hydrobasins, + flowspeed=1, + weight_with_height=False, + show_progress=False, + **kwargs, +): + """ + Compute inflow time-series for `plants` by aggregating over catchment + basins from `hydrobasins` (ERA5 runoff-based computation). + + Parameters + ---------- + plants : pd.DataFrame + Run-of-river plants or dams with lon, lat columns. + hydrobasins : str|gpd.GeoDataFrame + Filename or GeoDataFrame of one level of the HydroBASINS dataset. + flowspeed : float + Average speed of water flows to estimate the water travel time from + basin to plant (default: 1 m/s). + weight_with_height : bool + Whether surface runoff should be weighted by potential height (probably + better for coarser resolution). + show_progress : bool + Whether to display progressbars. + + References + ---------- + [1] Liu, Hailiang, et al. "A validated high-resolution hydro power + time-series model for energy systems analysis." arXiv preprint + arXiv:1901.08476 (2019). + + [2] Lehner, B., Grill G. (2013): Global river hydrography and network + routing: baseline data and new approaches to study the world’s large river + systems. Hydrological Processes, 27(15): 2171–2186. Data is available at + www.hydrosheds.org. + + """ + basins = determine_basins(plants, hydrobasins, show_progress=show_progress) + + matrix = cutout.indicatormatrix(basins.shapes) + # compute the average surface runoff in each basin + # Fix NaN and Inf values to 0.0 to avoid numerical issues + matrix_normalized = np.nan_to_num( + matrix / matrix.sum(axis=1), nan=0.0, posinf=0.0, neginf=0.0 + ) + runoff = cutout.runoff( + matrix=matrix_normalized, + index=basins.shapes.index, + weight_with_height=weight_with_height, + show_progress=show_progress, + **kwargs, + ) + # The hydrological parameters are in units of "m of water per day" and so + # they should be multiplied by 1000 and the basin area to convert to m3 + # d-1 = m3 h-1 / 24 + runoff *= xr.DataArray(basins.shapes.to_crs(dict(proj="cea")).area) + + return shift_and_aggregate_runoff_for_plants( + basins, runoff, flowspeed, show_progress + ) + + +def _hydro_from_discharge( + cutout, + plants, + time=None, +): + """ + Get inflow time-series for `plants` from GLOFAS discharge by snapping each + plant to the nearest grid cell that holds data and interpolating onto the + target time index. + + Parameters + ---------- + plants : pd.DataFrame + Run-of-river plants or dams with lon, lat columns. + time : pd.DatetimeIndex, optional + Time index to interpolate the plant inflow onto. Defaults to the cutout's + own time index. + """ + if time is None: + time = cutout.coords["time"] + discharge = cutout.data.discharge + # snap plants to GLOFAS cells with data (cutout grid points may be all-NaN) + present = discharge.isel(time=0).notnull() + discharge = discharge.isel( + x=np.flatnonzero(present.any("y").values), + y=np.flatnonzero(present.any("x").values), + ) + x = xr.DataArray(plants["lon"].values, dims="plant", coords={"plant": plants.index}) + y = xr.DataArray(plants["lat"].values, dims="plant", coords={"plant": plants.index}) + inflow = discharge.sel(x=x, y=y, method="nearest").compute() + inflow = inflow.dropna("time", how="all").interp(time=time) + inflow = inflow.ffill("time").bfill("time") + return inflow.transpose("plant", "time")