From d9e8c7e425c948856688068acf511de24cdb503c Mon Sep 17 00:00:00 2001 From: thodson-usgs Date: Wed, 24 Jun 2026 10:57:56 -0500 Subject: [PATCH] feat(wateruse): add water-use module for the USGS NWDC API MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add `dataretrieval.wateruse` for USGS National Water Availability Assessment Data Companion (NWDC) water-use estimates — modeled on a HUC12 grid and queryable by state, county, or hydrologic unit. This is the modern replacement for the defunct legacy NWIS water-use service (`nwis.get_water_use` now points callers here). from dataretrieval import wateruse df, md = wateruse.get_wateruse( model="wu-public-supply-wd", variable=["pswdtot", "pswdgw", "pswdsw"], state="RI", start_date="2020-01", time_resolution="monthly", ) The NWDC is a plain CSV REST service, not an OGC API Features collection, so the module supplies the NWDC-specific pieces (CSV parsing, the RFC 8288 Link-header pagination cursor, the `{detail}` error envelope, and state/county/huc location builders) but reuses the OGC engine's generic transport rather than re-implementing it: the shared pager (`_paginate`), the Jupyter-safe anyio sync bridge (`_run_sync`), response/frame aggregation, and `_default_headers`. It keeps the package conventions where they fit — a `(DataFrame, BaseMetadata)` return, the typed `DataRetrievalError` taxonomy (surfacing the NWDC `detail`), `API_USGS_PAT` token support, idiomatic snake_case params, and `state` / `county` / `huc` selectors that each accept a value or a list (a list fans out one concurrent request per location). Large areas paginate transparently. A `FutureWarning` flags the module as experimental, since the NWDC service is new and still changing. 
Extracting the reusable engine seams also de-duplicated the engine itself (~-66 LOC, behavior-preserving): `planning._merge_response` now backs both pagination and fan-out aggregation; a generic `utils.Ambient[T]` contextvar-with-scope helper collapses the per-call ambients; and `x-ratelimit-remaining` now reports the lowest value any concurrent sub-request saw (the quota actually left after a fan-out), fixing a latent inaccuracy in the OGC chunker too. Includes offline pytest-httpx coverage, a reference page, a README example, and a demo notebook. Co-Authored-By: Claude Opus 4.8 Claude-Session: https://claude.ai/code/session_01Sjb14HkwuCydKSKMsaXsgd --- AGENTS.md | 2 +- README.md | 34 ++- dataretrieval/__init__.py | 5 +- dataretrieval/nwis.py | 13 +- dataretrieval/ogc/chunking.py | 51 +--- dataretrieval/ogc/engine.py | 269 +++++++------------ dataretrieval/ogc/planning.py | 84 ++++-- dataretrieval/utils.py | 96 ++++++- dataretrieval/waterdata/stats.py | 3 +- dataretrieval/waterdata/utils.py | 5 +- dataretrieval/wateruse.py | 415 ++++++++++++++++++++++++++++ demos/USGS_WaterUse_Examples.ipynb | 228 ++++++++++++++++ docs/source/reference/index.rst | 1 + docs/source/reference/wateruse.rst | 7 + tests/waterdata_chunking_test.py | 23 +- tests/waterdata_test.py | 4 +- tests/waterdata_utils_test.py | 2 +- tests/wateruse_test.py | 416 +++++++++++++++++++++++++++++ 18 files changed, 1395 insertions(+), 263 deletions(-) create mode 100644 dataretrieval/wateruse.py create mode 100644 demos/USGS_WaterUse_Examples.ipynb create mode 100644 docs/source/reference/wateruse.rst create mode 100644 tests/wateruse_test.py diff --git a/AGENTS.md b/AGENTS.md index 91c07b24..455c108c 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -6,7 +6,7 @@ - Exclude `.claude/worktrees/` from searches and edits; it contains stale worktrees that pollute results. 
## Example Notebooks -- `demos/*.ipynb` — top-level Water Data tour: `USGS_WaterData_Introduction_Examples.ipynb` is the entry point; `_ContinuousData_`, `_DailyStatistics_`, `_DiscreteSamples_`, `_ReferenceLists_` cover individual collections; `WaterData_demo.ipynb`, `peak_streamflow_trends.ipynb`, and `R Python Vignette equivalents.ipynb` are standalone walkthroughs. +- `demos/*.ipynb` — top-level Water Data tour: `USGS_WaterData_Introduction_Examples.ipynb` is the entry point; `_ContinuousData_`, `_DailyStatistics_`, `_DiscreteSamples_`, `_ReferenceLists_` cover individual collections; `WaterData_demo.ipynb`, `peak_streamflow_trends.ipynb`, `USGS_WaterUse_Examples.ipynb` (NWDC water-use data via `wateruse.get_wateruse`), and `R Python Vignette equivalents.ipynb` are standalone walkthroughs. - `demos/hydroshare/*.ipynb` — per-service HydroShare examples (NLDI, NWIS WaterUse, and Water Data DailyValues / GroundwaterLevels / Measurements / ParameterCodes / Peaks / Ratings / Samples / SiteInfo / SiteInventory / Statistics / UnitValues). Mirror these when adding examples for a new collection. - `demos/nwqn_data_pull/` — non-notebook example: a lithops/Docker batch pipeline (`retrieve_nwqn_samples.py`, `retrieve_nwqn_streamflow.py`) with its own `README.md`. - Any `Untitled*.ipynb`, `*_test.ipynb`, or notebooks not listed here are untracked local scratch; ignore them. diff --git a/README.md b/README.md index bf3a319d..f7ce3039 100644 --- a/README.md +++ b/README.md @@ -133,7 +133,7 @@ sites, metadata = ngwmn.get_sites(state='Wisconsin') print(f"Found {len(sites)} NGWMN sites in Wisconsin") -# Pull water levels from the first twenty sites over a time window. +# Pull water levels from the first twenty sites over a time window. 
water_levels, metadata = ngwmn.get_water_level( monitoring_location_id=sites['monitoring_location_id'][:20], datetime=['2022-01-01', '2024-01-01'] @@ -192,6 +192,31 @@ flowlines = nldi.get_flowlines( print(f"Found {len(flowlines)} upstream tributaries within 50km") ``` +### Water Use (NWDC) + +Retrieve modeled water-use estimates from the National Water Availability +Assessment Data Companion: + +```python +from dataretrieval import wateruse + +# Monthly public-supply withdrawals for Rhode Island, split into +# groundwater and surface-water sources (returns a DataFrame and metadata). +df, metadata = wateruse.get_wateruse( + model='wu-public-supply-wd', + variable=['pswdtot', 'pswdgw', 'pswdsw'], + state='RI', # name/postal/FIPS; pass a list to fan out over several areas + start_date='2020-01', + time_resolution='monthly', +) + +print(f"Retrieved {len(df)} records across {df['huc12_id'].nunique()} watersheds") + +# Aggregate the HUC12 grid to a statewide monthly total (million gallons/day) +statewide = df.groupby('year_month')['pswdtot_mgd'].sum() +print(statewide.head()) +``` + ## Available Data Services ### Modern USGS Water Data APIs (Recommended) — `dataretrieval.waterdata` @@ -232,6 +257,13 @@ print(f"Found {len(flowlines)} upstream tributaries within 50km") - `get_features`: Find monitoring sites, dams, and other features along the network - `get_features_by_data_source`: Features from a specific data source +### Water Use (NWDC) +- **Public supply**: Modeled public-supply withdrawals and consumptive use +- **Irrigation**: Modeled irrigation withdrawals and consumptive use +- **Thermoelectric**: Modeled thermoelectric-power water use +- **HUC12 estimates**: National coverage on a 12-digit hydrologic-unit grid, + summarizable to counties, states, or coarser hydrologic units + ## More Examples Explore additional examples in the diff --git a/dataretrieval/__init__.py b/dataretrieval/__init__.py index 97420f43..c9df1c45 100644 --- a/dataretrieval/__init__.py +++ 
b/dataretrieval/__init__.py @@ -11,7 +11,8 @@ df, meta = nwis.get_dv(sites="05427718") Available service modules: ``waterdata``, ``wqp`` (Water Quality Portal), -``nldi``, ``streamstats``, and the deprecated ``nwis``. +``wateruse`` (NWDC water-use data), ``nldi``, ``streamstats``, and the +deprecated ``nwis``. ``nldi`` requires geopandas (``pip install dataretrieval[nldi]``) and is imported on demand: ``from dataretrieval import nldi``. @@ -62,6 +63,7 @@ streamstats, utils, waterdata, + wateruse, wqp, ) @@ -72,6 +74,7 @@ "streamstats", "utils", "waterdata", + "wateruse", "wqp", # error taxonomy (canonical home: ``dataretrieval.exceptions``), re-exported # so callers can ``except dataretrieval.DataRetrievalError`` diff --git a/dataretrieval/nwis.py b/dataretrieval/nwis.py index 49b91a51..25e9cf8f 100644 --- a/dataretrieval/nwis.py +++ b/dataretrieval/nwis.py @@ -741,8 +741,17 @@ def get_pmcodes(**kwargs: Any) -> NoReturn: def get_water_use(**kwargs: Any) -> NoReturn: - """Defunct: no current replacement.""" - raise NameError("`nwis.get_water_use` is defunct.") + """Defunct: use ``dataretrieval.wateruse.get_wateruse`` instead. + + The legacy NWIS water-use service has been retired. Modeled water-use + estimates are now served by the National Water Availability Assessment Data + Companion (NWDC); retrieve them with + :func:`dataretrieval.wateruse.get_wateruse`. + """ + raise NameError( + "`nwis.get_water_use` is defunct; use " + "`dataretrieval.wateruse.get_wateruse` instead." 
+ ) @_deprecated diff --git a/dataretrieval/ogc/chunking.py b/dataretrieval/ogc/chunking.py index b7fff335..00d2766c 100644 --- a/dataretrieval/ogc/chunking.py +++ b/dataretrieval/ogc/chunking.py @@ -69,15 +69,14 @@ import functools import os from collections.abc import Callable, Iterator -from contextlib import contextmanager -from contextvars import ContextVar, copy_context +from contextvars import copy_context from typing import Any, cast import httpx import pandas as pd from anyio.from_thread import start_blocking_portal -from dataretrieval.utils import HTTPX_DEFAULTS +from dataretrieval.utils import HTTPX_DEFAULTS, Ambient from . import progress as _progress from .interruptions import ( @@ -106,9 +105,6 @@ _OGC_URL_BYTE_LIMIT = 8000 -# Response header USGS uses to advertise remaining hourly quota. -_QUOTA_HEADER = "x-ratelimit-remaining" - # Fan-out concurrency cap, read at call time (not import) so test # ``monkeypatch.setenv`` applies. Value grammar in :func:`_read_concurrency_env`; # the concurrency model is in the module docstring. @@ -152,38 +148,11 @@ def _read_concurrency_env() -> int | None: return value -# Shared per-call ``httpx.AsyncClient``, published via :func:`_publish` -# during ``ChunkedCall._run`` so paginated-loop helpers (``_walk_pages``) -# reuse the same connection pool across every sub-request. ``None`` -# outside a chunked call — paginated helpers then open their own -# short-lived client. -_chunked_client: ContextVar[httpx.AsyncClient | None] = ContextVar( - "_chunked_client", default=None -) - - -@contextmanager -def _publish(client: httpx.AsyncClient) -> Iterator[None]: - """ - Publish ``client`` on the ``_chunked_client`` ContextVar so the - paginated-loop helpers can borrow it via :func:`get_active_client` - for the duration of the ``with`` block. - - Parameters - ---------- - client : httpx.AsyncClient - The client to publish. - - Yields - ------ - None - Yields once, for the duration of the bind. 
- """ - token = _chunked_client.set(client) - try: - yield - finally: - _chunked_client.reset(token) +# Shared per-call ``httpx.AsyncClient``, scoped via ``with _chunked_client(c):`` +# during ``ChunkedCall._run`` so paginated-loop helpers (``_walk_pages``) reuse +# the same connection pool across every sub-request. ``None`` outside a chunked +# call — paginated helpers then open their own short-lived client. +_chunked_client: Ambient[httpx.AsyncClient | None] = Ambient("_chunked_client", None) def get_active_client() -> httpx.AsyncClient | None: @@ -197,8 +166,8 @@ def get_active_client() -> httpx.AsyncClient | None: Returns ------- httpx.AsyncClient or None - The client published via :func:`_publish` if currently inside a - :class:`ChunkedCall` run; ``None`` otherwise. + The client scoped via ``with _chunked_client(...)`` if currently inside + a :class:`ChunkedCall` run; ``None`` otherwise. """ return _chunked_client.get() @@ -541,7 +510,7 @@ async def _run(self, max_concurrent: int | None) -> tuple[pd.DataFrame, Any]: ) async with httpx.AsyncClient(limits=limits, **HTTPX_DEFAULTS) as client: - with _publish(client): + with _chunked_client(client): reporter = _progress.current() if reporter is not None: reporter.set_chunks(self.plan.total) diff --git a/dataretrieval/ogc/engine.py b/dataretrieval/ogc/engine.py index 2fe89440..ab4b8371 100644 --- a/dataretrieval/ogc/engine.py +++ b/dataretrieval/ogc/engine.py @@ -24,44 +24,42 @@ from __future__ import annotations -import copy import functools import json import logging import numbers -import os import re from collections.abc import ( AsyncIterator, Awaitable, Callable, Iterable, - Iterator, Mapping, ) -from contextlib import asynccontextmanager, contextmanager -from contextvars import ContextVar +from contextlib import asynccontextmanager from dataclasses import dataclass, field -from datetime import timedelta from typing import Any, TypeVar, cast import httpx import pandas as pd from anyio.from_thread import 
start_blocking_portal -from dataretrieval import __version__ from dataretrieval.exceptions import DataRetrievalError from dataretrieval.ogc import chunking from dataretrieval.ogc import progress as _progress -from dataretrieval.ogc.chunking import ( - _QUOTA_HEADER, - get_active_client, -) +from dataretrieval.ogc.chunking import get_active_client from dataretrieval.ogc.dates import _DATE_RANGE_PARAMS, _format_api_dates from dataretrieval.ogc.errors import _paginated_failure_message, _raise_for_non_200 -from dataretrieval.ogc.planning import _safe_elapsed +from dataretrieval.ogc.planning import _QUOTA_HEADER, _merge_response, _safe_elapsed from dataretrieval.ogc.shaping import GEOPANDAS, _finalize_ogc, _get_resp_data -from dataretrieval.utils import HTTPX_DEFAULTS, BaseMetadata, _get, _network_error +from dataretrieval.utils import ( + HTTPX_DEFAULTS, + Ambient, + BaseMetadata, + _default_headers, + _get, + _network_error, +) # Set up logger for this module logger = logging.getLogger(__name__) @@ -233,36 +231,14 @@ def _cql2_param(args: dict[str, Any]) -> str: which roughly doubles how many monitoring-location ids fit in one sub-request and so halves the chunk count for large id lists. """ - filters = [] - for key, values in args.items(): - filters.append({"op": "in", "args": [{"property": key}, values]}) - - query = {"op": "and", "args": filters} - - return json.dumps(query, separators=(",", ":")) - - -def _default_headers() -> dict[str, str]: - """ - Generate default HTTP headers for API requests. - - Returns - ------- - dict - A dictionary containing default headers including 'Accept-Encoding', - 'Accept', 'User-Agent', and 'lang'. If the environment variable - 'API_USGS_PAT' is set, its value is included as the 'X-Api-Key' header. 
- """ - headers = { - "Accept-Encoding": "compress, gzip", - "Accept": "application/json", - "User-Agent": f"python-dataretrieval/{__version__}", - "lang": "en-US", + query = { + "op": "and", + "args": [ + {"op": "in", "args": [{"property": key}, values]} + for key, values in args.items() + ], } - token = os.getenv("API_USGS_PAT") - if token: - headers["X-Api-Key"] = token - return headers + return json.dumps(query, separators=(",", ":")) def _check_ogc_requests(endpoint: str, req_type: str = "queryables") -> dict[str, Any]: @@ -294,7 +270,7 @@ def _check_ogc_requests(endpoint: str, req_type: str = "queryables") -> dict[str """ if req_type not in ("queryables", "schema"): raise ValueError(f"req_type must be 'queryables' or 'schema', got {req_type!r}") - url = f"{_ogc_base_url_var.get()}/collections/{endpoint}/{req_type}" + url = f"{_ogc_base_url.get()}/collections/{endpoint}/{req_type}" resp = _get(url, headers=_default_headers(), **HTTPX_DEFAULTS) _raise_for_non_200(resp) # ``Response.json`` is typed ``Any``; the OGC queryables/schema endpoints @@ -374,8 +350,8 @@ def _construct_api_requests( ----- - Date/time parameters are automatically formatted to ISO8601. """ - service_url = f"{_ogc_base_url_var.get()}/collections/{service}/items" - dialect = _dialect_var.get() + service_url = f"{_ogc_base_url.get()}/collections/{service}/items" + dialect = _dialect.get() # Format date/time parameters to ISO8601 first — both routing paths need it. for key in _DATE_RANGE_PARAMS: @@ -473,7 +449,7 @@ def _construct_cql_request( httpx.Request A POST request with ``Content-Type: application/query-cql-json``. 
""" - service_url = f"{_ogc_base_url_var.get()}/collections/{service}/items" + service_url = f"{_ogc_base_url.get()}/collections/{service}/items" params = _ogc_query_params( {}, properties=properties, @@ -605,108 +581,25 @@ async def _client_for( yield new -def _aggregate_paginated_response( - initial: httpx.Response, - last: httpx.Response, - total_elapsed: timedelta, -) -> httpx.Response: - """ - Build a single response covering a paginated call. - - Returns a shallow copy of ``initial`` with ``.headers`` set to the - LAST page's (so downstream sees current ``x-ratelimit-remaining``) - and ``.elapsed`` set to total wall-clock. The canonical - ``initial.url`` is preserved (it's the user's original query). - Both ``initial`` and ``last`` are left unmutated, mirroring the - convention of - :func:`dataretrieval.ogc.planning._combine_chunk_responses`. - - Parameters - ---------- - initial : httpx.Response - First-page response (the canonical one for ``md.url``). - last : httpx.Response - Last-page response — supplies the headers to copy over. - total_elapsed : datetime.timedelta - Cumulative wall-clock across every page, including ``initial``. - - Returns - ------- - httpx.Response - A shallow copy of ``initial`` with ``.headers`` set to a fresh - ``httpx.Headers`` and ``.elapsed`` set to the cumulative - wall-clock. ``initial.headers`` / ``initial.elapsed`` are - never mutated, so callers holding a pre-pagination reference - still see the original first-page values. - """ - final = copy.copy(initial) - final.headers = httpx.Headers(last.headers) - final.elapsed = total_elapsed - return final - - _Cursor = TypeVar("_Cursor") -# Optional cap on the total rows a single paginated call accumulates before it -# stops following ``next`` links. ``None`` (the default the data getters use) -# means "no cap — fetch the whole series". 
Set via :func:`_row_cap` so the deep -# ``_paginate`` loop can honor it without threading the value through the -# generic chunker; this mirrors the ``_progress`` ambient-reporter pattern. -_row_cap_var: ContextVar[int | None] = ContextVar("ogc_row_cap", default=None) - - -@contextmanager -def _row_cap(max_rows: int | None) -> Iterator[None]: - """Cap the rows any :func:`_paginate` under this context will - accumulate (``None`` = uncapped). Used by :func:`get_reference_table` - to preview large tables without downloading every page.""" - token = _row_cap_var.set(max_rows) - try: - yield - finally: - _row_cap_var.reset(token) - +# Ambient per-call state the generic chunker would otherwise have to thread +# through to the deep request builder / paginate loop. Each is read with +# ``.get()`` and scoped with ``with _x(value):``; the defaults leave every +# existing getter unaffected. (Mirrors the ``_progress`` ambient-reporter.) -# OGC base URL for the active request. ``get_ogc_data`` sets it per call so the -# shared request builder (:func:`_construct_api_requests`) can target either the -# main Water Data API or the NGWMN sub-API without threading the value through -# the generic chunker; this mirrors the ``_row_cap`` ambient pattern. The -# default is the main API, so every existing getter is unaffected. -_ogc_base_url_var: ContextVar[str] = ContextVar("ogc_base_url", default=OGC_API_URL) +# Optional cap on the rows one paginated call accumulates before it stops +# following ``next`` links (``None`` = uncapped). Set by :func:`get_reference_table` +# to preview large tables without downloading every page. +_row_cap: Ambient[int | None] = Ambient("ogc_row_cap", None) +# OGC base URL the shared request builder (:func:`_construct_api_requests`) +# targets — the main Water Data API or, for NGWMN collections, their own base. 
+_ogc_base_url: Ambient[str] = Ambient("ogc_base_url", OGC_API_URL) -@contextmanager -def _ogc_base_url(base_url: str) -> Iterator[None]: - """Point :func:`_construct_api_requests` (and the chunk planner that calls - it) at ``base_url`` for the duration of the block. Used by - :func:`get_ogc_data` to serve NGWMN collections from their own OGC base.""" - token = _ogc_base_url_var.set(base_url) - try: - yield - finally: - _ogc_base_url_var.reset(token) - - -# Per-call OGC dialect (which services need POST/CQL2, which use date-only time -# args). ``get_ogc_data`` sets it so the shared request builder -# (:func:`_construct_api_requests`) can adapt to the active API without -# threading the value through the generic chunker; this mirrors the -# ``_ogc_base_url`` ambient pattern. The default is a plain OGC API. -_dialect_var: ContextVar[OgcDialect] = ContextVar( - "ogc_dialect", default=_DEFAULT_DIALECT -) - - -@contextmanager -def _dialect(dialect: OgcDialect) -> Iterator[None]: - """Make ``dialect`` the active :class:`OgcDialect` that - :func:`_construct_api_requests` reads for CQL2-vs-GET routing and - date-only formatting, for the duration of the block.""" - token = _dialect_var.set(dialect) - try: - yield - finally: - _dialect_var.reset(token) +# Per-call OGC dialect the request builder reads for CQL2-vs-GET routing and +# date-only formatting (default: a plain OGC API). +_dialect: Ambient[OgcDialect] = Ambient("ogc_dialect", _DEFAULT_DIALECT) async def _paginate( @@ -715,6 +608,7 @@ async def _paginate( parse_response: Callable[[httpx.Response], tuple[pd.DataFrame, _Cursor | None]], follow_up: Callable[[_Cursor, httpx.AsyncClient], Awaitable[httpx.Response]], client: httpx.AsyncClient | None = None, + raise_for_status: Callable[[httpx.Response], None] = _raise_for_non_200, ) -> tuple[pd.DataFrame, httpx.Response]: """ Drive a paginated request to completion over an @@ -745,6 +639,10 @@ async def _paginate( Caller-borrowed client. 
``None`` (default) means use the chunker's shared client (if inside a chunked call) or open a temporary one. + raise_for_status : callable, optional + ``resp -> None``; raises the typed error for a non-OK response. + Defaults to :func:`_raise_for_non_200` (the OGC ``{code, description}`` + envelope); wateruse passes its own to surface the NWDC ``detail``. Returns ------- @@ -775,9 +673,19 @@ async def _paginate( """ logger.debug("Requesting: %s", initial_req.url) reporter = _progress.current() + + def report_page(page: httpx.Response, frame: pd.DataFrame) -> None: + """Tick the ambient progress reporter (a no-op when unset) for one page.""" + if reporter is not None: + reporter.set_rate_remaining( + page.headers.get(_QUOTA_HEADER), + limit=page.headers.get("x-ratelimit-limit"), + ) + reporter.add_page(rows=len(frame)) + async with _client_for(client) as sess: resp = await sess.send(initial_req) - _raise_for_non_200(resp) + raise_for_status(resp) initial_response = resp total_elapsed = _safe_elapsed(resp) @@ -794,28 +702,25 @@ async def _paginate( # Stop following ``next`` links once the optional row cap is reached # (see :func:`_row_cap`); ``None`` means uncapped. The concatenation # is sliced to the cap below so a final over-budget page can't exceed it. - cap = _row_cap_var.get() + cap = _row_cap.get() nrows = len(df) - if reporter is not None: - reporter.set_rate_remaining( - resp.headers.get(_QUOTA_HEADER), - limit=resp.headers.get("x-ratelimit-limit"), - ) - reporter.add_page(rows=len(df)) - while cursor is not None and (cap is None or nrows < cap): + # Guard a non-advancing or cyclic cursor (a server bug that would + # otherwise loop forever). OGC's next-URLs are unique, so this never + # fires for them; the Link-header pagers (e.g. wateruse) rely on it. 
+ seen: set[Any] = set() + report_page(resp, df) + while ( + cursor is not None and cursor not in seen and (cap is None or nrows < cap) + ): + seen.add(cursor) try: resp = await follow_up(cursor, sess) - _raise_for_non_200(resp) + raise_for_status(resp) df, cursor = parse_response(resp) dfs.append(df) nrows += len(df) total_elapsed += _safe_elapsed(resp) - if reporter is not None: - reporter.set_rate_remaining( - resp.headers.get(_QUOTA_HEADER), - limit=resp.headers.get("x-ratelimit-limit"), - ) - reporter.add_page(rows=len(df)) + report_page(resp, df) except Exception as e: # noqa: BLE001 logger.warning( "Request failed at cursor %r. Data download interrupted.", @@ -823,12 +728,13 @@ async def _paginate( ) raise DataRetrievalError(_paginated_failure_message(len(dfs), e)) from e - # Aggregate headers / elapsed onto a COPY of the initial - # response so the user's caller never sees an in-place - # mutation of the response object they may have inspected - # mid-pagination via a hook or test fixture. - final_response = _aggregate_paginated_response( - initial_response, resp, total_elapsed + # Fold the pages onto a COPY of the initial response so a caller that + # inspected it mid-pagination (a hook, a test fixture) never sees an + # in-place mutation. ``resp`` is the last page, whose headers carry the + # current ``x-ratelimit-remaining`` (monotonic, so the last page is the + # most depleted) — the same low-level merge the fan-out aggregation uses. + final_response = _merge_response( + initial_response, headers_from=resp, elapsed=total_elapsed ) result = pd.concat(dfs, ignore_index=True) if cap is not None: @@ -1039,6 +945,7 @@ def _run_sync( make_coro: Callable[[], Awaitable[tuple[pd.DataFrame, httpx.Response]]], *, service: str, + error_url: str | httpx.URL | None = None, ) -> tuple[pd.DataFrame, httpx.Response]: """Drive an async OGC fetch to completion from synchronous code. 
@@ -1051,6 +958,11 @@ def _run_sync( Shared by the non-chunked fetch paths; the chunked OGC getters drive their own portal inside :meth:`chunking.ChunkedCall.resume`. + + A connection failure on the initial request is surfaced as a typed + ``NetworkError`` against ``error_url`` when given (callers that build their + own requests, e.g. ``wateruse``), else the request-builder base the caller + scoped via ``_ogc_base_url`` (the OGC / NGWMN getters). """ with _progress.progress_context(service=service): with start_blocking_portal() as portal: @@ -1064,9 +976,15 @@ def _run_sync( except httpx.TransportError as exc: # The initial-request connection failure ``_paginate`` lets # through raw; mid-pagination failures are already typed. - # Report the base URL actually targeted (NGWMN/sibling APIs - # set their own via ``_ogc_base_url``), not a hardcoded host. - raise _network_error(_ogc_base_url_var.get(), exc) from exc + # Report the base URL actually targeted: callers that build + # their own requests (``wateruse``) pass ``error_url``; the OGC + # getters leave it unset and fall back to the request-builder + # base they scoped via ``_ogc_base_url`` (NGWMN/sibling APIs set + # their own), not a hardcoded host. + raise _network_error( + error_url if error_url is not None else _ogc_base_url.get(), + exc, + ) from exc # ``AGENCY-ID``: a hyphen-separated agency prefix and local id. The local id @@ -1187,19 +1105,14 @@ def _check_monitoring_location_id( if value is None: return None for item in (value,) if isinstance(value, str) else value: - _check_id_format(item) + if not _MONITORING_LOCATION_ID_RE.fullmatch(item): + raise ValueError( + f"Invalid monitoring_location_id: {item!r}. " + f"Expected 'AGENCY-ID' format, e.g., 'USGS-01646500'." 
+ ) return value -def _check_id_format(value: str) -> None: - """Raise ``ValueError`` if ``value`` is not in ``AGENCY-ID`` format.""" - if not _MONITORING_LOCATION_ID_RE.fullmatch(value): - raise ValueError( - f"Invalid monitoring_location_id: {value!r}. " - f"Expected 'AGENCY-ID' format, e.g., 'USGS-01646500'." - ) - - def _get_args( local_vars: dict[str, Any], exclude: set[str] | None = None, diff --git a/dataretrieval/ogc/planning.py b/dataretrieval/ogc/planning.py index 3e3c1ccf..68c337ae 100644 --- a/dataretrieval/ogc/planning.py +++ b/dataretrieval/ogc/planning.py @@ -563,6 +563,56 @@ def _combine_chunk_frames(frames: list[pd.DataFrame]) -> pd.DataFrame: return combined +# Response header USGS uses to advertise remaining hourly quota. Lives in this +# base module so every layer (planning's ``_lowest_remaining``, the engine's +# per-page progress) reads it from one place rather than hard-coding the string. +_QUOTA_HEADER = "x-ratelimit-remaining" + + +def _lowest_remaining(responses: list[httpx.Response]) -> httpx.Response: + """The response reporting the lowest ``x-ratelimit-remaining``. + + The rate-limit counter decreases monotonically within a window, so the + smallest value any sub-request saw is the most-current "quota left after + this call" — the right thing to surface. Under concurrent fan-out the + last response *by index* need not be the one the server processed last, so + pick the minimum (falling back to the last response if none report it). 
+ """ + best: httpx.Response | None = None + best_remaining: int | None = None + for response in responses: + try: + remaining = int(response.headers[_QUOTA_HEADER]) + except (KeyError, ValueError): + continue + if best_remaining is None or remaining < best_remaining: + best, best_remaining = response, remaining + return best if best is not None else responses[-1] + + +def _merge_response( + base: httpx.Response, + *, + headers_from: httpx.Response, + elapsed: timedelta, + url: str | httpx.URL | None = None, +) -> httpx.Response: + """Fold several responses into one: a shallow copy of ``base`` whose + ``.headers`` are rebuilt as a fresh ``httpx.Headers`` from ``headers_from``, + ``.elapsed`` set to ``elapsed``, and ``.url`` overridden when ``url`` is + given. ``base`` and ``headers_from`` are never mutated, and the fresh + ``httpx.Headers`` means downstream mutations don't back-propagate into any + underlying response — so callers may re-fold idempotently. This is the one + low-level merge behind both pagination (:func:`_paginate`) and the chunked / + fan-out aggregation (:func:`_combine_chunk_responses`).""" + merged = copy.copy(base) + merged.headers = httpx.Headers(headers_from.headers) + merged.elapsed = elapsed + if url is not None: + _set_response_url(merged, url) + return merged + + def _combine_chunk_responses( responses: list[httpx.Response], canonical_url: str | None ) -> httpx.Response: @@ -570,8 +620,9 @@ def _combine_chunk_responses( Fold per-sub-request responses into a single aggregated response. 
For a multi-response input, returns a shallow copy of - ``responses[0]`` with ``.headers`` set to the last response's (so - ``x-ratelimit-remaining`` reflects current state), ``.elapsed`` set + ``responses[0]`` with ``.headers`` set to those of the most-depleted + response (lowest ``x-ratelimit-remaining`` — the quota actually left + after the fan-out; see :func:`_lowest_remaining`), ``.elapsed`` set to total wall-clock across every response, and ``.url`` set to the canonical original-query URL (when supplied) so ``BaseMetadata`` reflects the user's full request rather than the first chunk. @@ -605,20 +656,15 @@ def _combine_chunk_responses( if len(responses) == 1 and canonical_url is None: return responses[0] - # ``copy.copy`` lets repeated calls re-sum elapsed from scratch - # rather than re-mutating ``responses[0]`` in place. The headers - # dict is then rewrapped in a fresh ``httpx.Headers`` so the - # aggregate's headers don't share identity with — or leak mutations - # back into — any underlying response on ``ChunkedCall._chunks``. - head = copy.copy(responses[0]) - if len(responses) > 1: - head.headers = httpx.Headers(responses[-1].headers) - head.elapsed = sum( - (_safe_elapsed(r) for r in responses[1:]), - start=_safe_elapsed(responses[0]), - ) - else: - head.headers = httpx.Headers(responses[0].headers) - if canonical_url is not None: - _set_response_url(head, canonical_url) - return head + # Headers come from the most-depleted response (lowest quota left after a + # concurrent fan-out; ``_lowest_remaining`` returns the lone response as-is + # for a single-element list). ``_merge_response`` re-sums elapsed onto a + # fresh copy, so repeated calls (e.g. via ``ChunkedCall.partial_response`` + # during resume) stay idempotent. 
+ elapsed = sum((_safe_elapsed(r) for r in responses), start=timedelta()) + return _merge_response( + responses[0], + headers_from=_lowest_remaining(responses), + elapsed=elapsed, + url=canonical_url, + ) diff --git a/dataretrieval/utils.py b/dataretrieval/utils.py index e6cce009..8d28ee86 100644 --- a/dataretrieval/utils.py +++ b/dataretrieval/utils.py @@ -4,9 +4,12 @@ from __future__ import annotations +import os import warnings -from collections.abc import Iterable -from typing import Any +from collections.abc import Callable, Iterable, Iterator +from contextlib import contextmanager +from contextvars import ContextVar +from typing import Any, Generic, TypeVar import httpx import pandas as pd @@ -28,6 +31,69 @@ "timeout": httpx.Timeout(60.0, connect=10.0), } +_T = TypeVar("_T") + + +class Ambient(Generic[_T]): + """A :class:`~contextvars.ContextVar` paired with a scoping contextmanager. + + Bundles the var and its set/reset-token dance into one object, so an ambient + value needs a single declaration instead of a ``var`` + setter-function pair. 
+ Read the current value with :meth:`get`; set it for a ``with`` block by + *calling* the instance — the previous value is restored on exit (and can't + leak into a later call the way a hand-written ``try/finally`` can when its + ``reset`` is dropped):: + + _base_url = Ambient("ogc_base_url", DEFAULT) + with _base_url(other): # scoped to the block + _base_url.get() # -> other + """ + + def __init__(self, name: str, default: _T) -> None: + self._var: ContextVar[_T] = ContextVar(name, default=default) + + def get(self) -> _T: + """The current value — the default outside any active scope.""" + return self._var.get() + + @contextmanager + def __call__(self, value: _T) -> Iterator[None]: + """Set the value for the duration of the ``with`` block.""" + token = self._var.set(value) + try: + yield + finally: + self._var.reset(token) + + +def _default_headers() -> dict[str, str]: + """Build the default HTTP headers for a USGS web-API request. + + Always sets a descriptive ``User-Agent`` plus ``Accept`` / + ``Accept-Encoding`` and ``lang``. If the ``API_USGS_PAT`` environment + variable is set, its value is added as the ``X-Api-Key`` header — a USGS + personal access token raises the request rate limit. + + Shared by the OGC engine (:mod:`dataretrieval.ogc`), the Water Data getters + (:mod:`dataretrieval.waterdata`), and :mod:`dataretrieval.wateruse`, so the + request identity is consistent across every USGS API the package talks to. + + Returns + ------- + dict[str, str] + Headers suitable for an ``httpx`` request against a USGS API. + """ + headers = { + "Accept-Encoding": "compress, gzip", + "Accept": "application/json", + "User-Agent": f"python-dataretrieval/{dataretrieval.__version__}", + "lang": "en-US", + } + token = os.getenv("API_USGS_PAT") + if token: + headers["X-Api-Key"] = token + return headers + def to_str(listlike: object, delimiter: str = ",") -> str | None: """Translates list-like objects into strings. 
@@ -303,26 +369,38 @@ def _get(url: str | httpx.URL, **kwargs: Any) -> httpx.Response: raise _network_error(url, exc) from exc -def _raise_for_status(response: httpx.Response) -> None: +def _raise_for_status( + response: httpx.Response, + *, + detail_from: Callable[[httpx.Response], str | None] | None = None, +) -> None: """Raise the typed :class:`DataRetrievalError` for an HTTP error response; return ``None`` on success. - Shared by the legacy :func:`query` path (and ``streamstats``). - Delegates the status-to-type mapping to + Shared by the legacy :func:`query` path (and ``streamstats`` / + ``wateruse``). Delegates the status-to-type mapping to :func:`dataretrieval.exceptions.error_for_status`, except a too-long-URL status (413 / 414): that gets the same actionable "split your query" remediation as the client-side over-long-URL case below, rather than a bare ``HTTP 414`` (both still raise :class:`~dataretrieval.exceptions.URLTooLong`). + + ``detail_from``, when given, is called *only on an error response* to pull an + API-specific detail string (e.g. a JSON error envelope's message) out of the + body; a truthy result is appended to the raised message. This lets callers + surface their API's error wording without re-implementing the status-to-type + mapping and message format. 
""" status = response.status_code if status < 400: return if status in (413, 414): raise _url_too_long_error(f"API response reason: {response.reason_phrase}") - raise error_for_status( - status, - f"HTTP {status} {response.reason_phrase}".rstrip() + f" (URL: {response.url})", - ) + message = f"HTTP {status} {response.reason_phrase}".rstrip() + detail = detail_from(response) if detail_from is not None else None + if detail: + message += f": {detail}" + message += f" (URL: {response.url})" + raise error_for_status(status, message) def query( diff --git a/dataretrieval/waterdata/stats.py b/dataretrieval/waterdata/stats.py index 72e5a884..5514bb50 100644 --- a/dataretrieval/waterdata/stats.py +++ b/dataretrieval/waterdata/stats.py @@ -18,7 +18,6 @@ from dataretrieval.ogc.engine import ( BASE_URL, - _default_headers, _paginate, _run_sync, ) @@ -27,7 +26,7 @@ _attach_coordinates, _empty_feature_frame, ) -from dataretrieval.utils import BaseMetadata +from dataretrieval.utils import BaseMetadata, _default_headers # ``_handle_nesting``'s geopandas branch calls ``gpd.GeoDataFrame.from_features`` # directly, so this module needs its own bound ``gpd`` name. 
Import it under the diff --git a/dataretrieval/waterdata/utils.py b/dataretrieval/waterdata/utils.py index 65f9ea2f..5cdbc320 100644 --- a/dataretrieval/waterdata/utils.py +++ b/dataretrieval/waterdata/utils.py @@ -35,12 +35,10 @@ OGC_API_URL, OgcDialect, _as_str_list, - _check_id_format, _check_monitoring_location_id, _check_ogc_requests, _construct_api_requests, _construct_cql_request, - _default_headers, _next_req_url, _normalize_str_iterable, _paginate, @@ -68,7 +66,7 @@ from dataretrieval.ogc.shaping import ( _finalize_ogc as _engine_finalize_ogc, ) -from dataretrieval.utils import BaseMetadata +from dataretrieval.utils import BaseMetadata, _default_headers from dataretrieval.waterdata.types import ( PROFILE_LOOKUP, PROFILES, @@ -363,7 +361,6 @@ def wrapper(*args: Any, **kwargs: Any) -> _R: "_accept_legacy_kwargs", "_arrange_cols", "_as_str_list", - "_check_id_format", "_check_monitoring_location_id", "_check_ogc_requests", "_check_profiles", diff --git a/dataretrieval/wateruse.py b/dataretrieval/wateruse.py new file mode 100644 index 00000000..635a97fb --- /dev/null +++ b/dataretrieval/wateruse.py @@ -0,0 +1,415 @@ +"""Retrieve USGS water-use data from the National Water Availability +Assessment Data Companion (NWDC). + +The NWDC web services provide national-scale, USGS-modeled water-use data that +underlie the `USGS National Water Availability Assessment +`_. Estimates are served on a HUC12 +(12-digit hydrologic unit) spatial grid and can be queried for any county, +state, or hydrologic unit. This is the modern replacement for the defunct +legacy NWIS water-use service (``nwis.get_water_use``). + +Unlike the main Water Data getters (:mod:`dataretrieval.waterdata`) and NGWMN +(:mod:`dataretrieval.ngwmn`), the NWDC is a plain CSV REST service rather than +an OGC API Features collection. 
This module supplies the NWDC-specific bits — +request building, CSV parsing, the ``Link``-header cursor, and the ``{detail}`` +error envelope — but reuses the OGC engine's generic, API-agnostic pagination +and sync-from-async plumbing (:func:`~dataretrieval.ogc.engine._paginate` and +:func:`~dataretrieval.ogc.engine._run_sync`) rather than re-implementing it. It +follows the same conventions: shared request headers +(:func:`~dataretrieval.utils._default_headers`), the typed +:class:`~dataretrieval.exceptions.DataRetrievalError` taxonomy, and a +``(DataFrame, BaseMetadata)`` return. + +See https://api.water.usgs.gov/docs/nwaa-data/ for the API reference and +https://water.usgs.gov/nwaa-data/ for the catalog of available models and +variables. + +Examples +-------- +.. code-block:: python + + from dataretrieval import wateruse + + # Monthly public-supply withdrawals for Rhode Island, 2020 onward. + df, md = wateruse.get_wateruse( + model="wu-public-supply-wd", + variable=["pswdtot", "pswdgw", "pswdsw"], + state="RI", + start_date="2020-01", + time_resolution="monthly", + ) + +""" + +from __future__ import annotations + +import asyncio +import io +from collections.abc import Callable, Iterable +from typing import Any + +import httpx +import pandas as pd + +from dataretrieval.codes.states import to_state +from dataretrieval.exceptions import DataRetrievalError +from dataretrieval.ogc.engine import _paginate, _run_sync +from dataretrieval.ogc.planning import _combine_chunk_frames, _combine_chunk_responses +from dataretrieval.utils import ( + HTTPX_DEFAULTS, + BaseMetadata, + _default_headers, + _raise_for_status, + to_str, +) + +WATERUSE_URL = "https://api.water.usgs.gov/nwaa-data/data" + +#: Water-use models (categories) served by the NWDC. The catalog at +#: https://water.usgs.gov/nwaa-data/ lists the variables available within each. 
+MODELS = ( + "wu-public-supply-wd", # public-supply withdrawals + "wu-public-supply-cu", # public-supply consumptive use + "wu-thermoelectric", # thermoelectric-power water use + "wu-irrigation-wd", # irrigation withdrawals + "wu-irrigation-cu", # irrigation consumptive use +) + +#: Temporal resolutions: monthly, annual calendar year, annual water year. +TIME_RESOLUTIONS = ("monthly", "annualcy", "annualwy") + +#: Maximum locations fetched concurrently when a list of state/county/huc +#: selectors is fanned out (one request per location). Kept conservative +#: because this module intentionally carries no request backoff/retry; the +#: NWDC tolerates this level of concurrency without rate-limit errors (verified +#: by stress test). Set ``wateruse.MAX_CONCURRENT_REQUESTS = 1`` for serial. +MAX_CONCURRENT_REQUESTS = 4 + +# Page responses carry the HUC12 identifier in this column; it must stay a +# string so leading zeros (e.g. "010900020502") survive the round trip. +_HUC12_COLUMN = "huc12_id" + + +def get_wateruse( + model: str, + variable: str | Iterable[str] | None = None, + state: str | int | Iterable[str | int] | None = None, + county: str | Iterable[str] | None = None, + huc: str | Iterable[str] | None = None, + time_resolution: str | None = None, + start_date: str | None = None, + end_date: str | None = None, + intersection: str = "overlap", + limit: int = 600, + ssl_check: bool = True, +) -> tuple[pd.DataFrame, BaseMetadata]: + """Get USGS water-use data from the NWDC web service. + + Retrieves modeled water-use estimates from the USGS National Water + Availability Assessment Data Companion. The area is given as exactly one of + ``state``, ``county``, or ``huc``; results are always returned on a HUC12 + grid, in a long (tidy) frame with one row per HUC12 and time step. Large + areas (e.g. a whole region or a populous state) are served across multiple + pages, which this function follows transparently and concatenates into one + frame. 
+ + Each selector also accepts a list of values. The NWDC queries one area per + request, so a list is fanned out into one request per value — up to + :data:`MAX_CONCURRENT_REQUESTS` in parallel — and the results are + concatenated in the order given. + + Parameters + ---------- + model : string + Water-use category to query. See :data:`MODELS` for the available + options (e.g. ``"wu-public-supply-wd"``). The full catalog of models + and their variables is at https://water.usgs.gov/nwaa-data/. + variable : string or iterable of strings, optional + One or more variable IDs within ``model`` (e.g. ``"pswdtot"`` for total + public-supply withdrawals, or ``["pswdgw", "pswdsw"]`` for the + groundwater and surface-water components). Multiple variables are + comma-joined into a single request. The service requires at least one + variable; omitting it returns a 400 listing the model's valid variable + IDs (surfaced as a :class:`~dataretrieval.exceptions.DataRetrievalError`). + state : string, int, or iterable, optional + One or more US states/territories to query. Each accepts a full name + (``"Wisconsin"``), a two-letter postal code (``"WI"``), or a two-digit + ANSI/FIPS code (``"55"`` or ``55``), mirroring + :func:`dataretrieval.ngwmn.get_sites`. + county : string or iterable, optional + One or more five-digit county FIPS codes — state FIPS + county FIPS, + e.g. ``"55025"`` for Dane County, Wisconsin. + huc : string or iterable, optional + One or more hydrologic unit codes. Each code's level is taken from its + length: a 2-digit code queries a HUC2 region, 8-digit a HUC8 subbasin, + 12-digit a single HUC12, and so on (even lengths 2-12, e.g. ``"04"``, + ``"07070005"``, ``"010900020502"``). + + Provide exactly one of ``state``, ``county``, or ``huc`` (each may be a + single value or a list). + time_resolution : string, optional + Temporal resolution: ``"monthly"``, ``"annualcy"`` (annual, calendar + year), or ``"annualwy"`` (annual, water year). 
See + :data:`TIME_RESOLUTIONS`. + start_date : string, optional + Start of the query window, formatted ``"YYYY"`` for annual data or + ``"YYYY-MM"`` for monthly data. + end_date : string, optional + End of the query window, in the same format as ``start_date``. + intersection : string, optional + How to select HUC12s that straddle the queried-area boundary: + ``"overlap"`` (any overlap, the default) or ``"envelop"`` (fully + enclosed). + limit : int, optional + Maximum number of HUC12s returned per page. Queries spanning more than + ``limit`` HUC12s are split across pages and reassembled. Default 600. + ssl_check : bool, optional + If True (default), verify SSL certificates; set False to skip + verification (e.g. behind a TLS-intercepting proxy). + + Returns + ------- + df : ``pandas.DataFrame`` + Water-use estimates in long form: a ``huc12_id`` column (string, + leading zeros preserved), a time column (``year_month`` for monthly + data or ``year`` for annual data), and one value column per requested + variable (suffixed with its unit, e.g. ``pswdtot_mgd`` for million + gallons per day). + md : :class:`dataretrieval.utils.BaseMetadata` + Metadata describing the request (URL, query time, response headers). + + Raises + ------ + ValueError + If not exactly one of ``state``, ``county``, or ``huc`` is given, or a + given selector is malformed (an unrecognized state, a county code that + is not five digits, or a HUC of invalid length). + DataRetrievalError + On an HTTP error response, the typed subclass for the status (see + :func:`dataretrieval.exceptions.error_for_status`); or + :class:`~dataretrieval.exceptions.NetworkError` on a connection-level + failure (timeout, DNS). + + Examples + -------- + .. doctest:: + :skipif: True # network + + >>> from dataretrieval import wateruse + >>> df, md = wateruse.get_wateruse( + ... model="wu-public-supply-wd", + ... variable=["pswdtot", "pswdgw", "pswdsw"], + ... state="RI", + ... start_date="2020-01", + ... 
time_resolution="monthly", + ... ) + + """ + # The public parameters are idiomatic snake_case (consistent with + # ``waterdata.get_samples``); the NWDC service expects compact lowercase + # query names, so map to those here as the request is built. + base_params: dict[str, Any] = { + "format": "csv", + "model": model, + "variable": to_str(variable), + "timeres": time_resolution, + "startdate": start_date, + "enddate": end_date, + "intersection": intersection, + "limit": limit, + } + # Drop params the caller left unset; the service rejects empty values. + base_params = {k: v for k, v in base_params.items() if v is not None} + + # The NWDC queries one location per request, so fan a multi-value selector + # out into one request per location, each paginated by the OGC engine's + # shared pager (``_paginate``), and concatenate the results. + headers = _default_headers() + requests = [ + httpx.Request( + "GET", + WATERUSE_URL, + params={**base_params, "location": location}, + headers=headers, + ) + for location in _resolve_locations(state, county, huc) + ] + # ``_run_sync`` drives the async fan-out via an anyio portal, so it is safe + # even inside an already-running event loop (e.g. a Jupyter notebook). + # ``error_url`` is the host reported in any connection-error message (this + # module builds its own requests, so it has no OGC request-builder base). + df, response = _run_sync( + lambda: _fan_out(requests, headers, ssl_check), + service="wateruse", + error_url=WATERUSE_URL, + ) + return df, BaseMetadata(response) + + +# Valid HUC code lengths (digits) → the hydrologic-unit level they query. +_HUC_LENGTHS = (2, 4, 6, 8, 10, 12) + +# Maps each selector to the NWDC ``location=<type>:<code>`` value(s) it produces. +# A value may be a single code or a list; ``_as_list`` normalizes both (``state`` +# additionally normalizes to the two-letter postal code, and ``to_state`` may +# itself return a scalar or list, which ``_as_list`` flattens the same way).
+# Since NWDC takes one location per request, a list value fans out — one request +# per location (see :func:`_fan_out`). +_LOCATION_BUILDERS: dict[str, Callable[[Any], list[str]]] = { + "state": lambda v: [f"stateCd:{c}" for c in _as_list(to_state(v, to="postal"))], + "county": lambda v: [f"countyCd:{_validate_county(c)}" for c in _as_list(v)], + "huc": lambda v: [f"huc{len(c)}:{c}" for c in map(_validate_huc, _as_list(v))], +} + + +def _resolve_locations( + state: str | int | Iterable[str | int] | None, + county: str | Iterable[str] | None, + huc: str | Iterable[str] | None, +) -> list[str]: + """Build the NWDC ``location=<type>:<code>`` value(s) from the selectors. + + Exactly one of ``state`` / ``county`` / ``huc`` must be given; each may be a + single value or a list. ``state`` is normalized to the two-letter postal + code ``stateCd`` requires; ``county`` is a five-digit FIPS code; and a + ``huc`` code's length selects its level (``huc2`` … ``huc12``). Returns one + location string per value — the caller issues one request per location. + """ + selected = { + name: value + for name, value in (("state", state), ("county", county), ("huc", huc)) + if value is not None + } + if len(selected) != 1: + raise ValueError( + "Specify exactly one of state, county, or huc " + f"(got: {', '.join(selected) or 'none'})." + ) + [(name, value)] = selected.items() + locations = _LOCATION_BUILDERS[name](value) + if not locations: + raise ValueError( + "The chosen location selector is empty; pass at least one value." + ) + return locations + + +def _as_list(value: object) -> list[Any]: + """A scalar becomes a one-element list; any non-string iterable (list, + tuple, Series, ndarray, generator) is materialized to a list.
A string is + treated as a scalar so it isn't exploded into characters.""" + if isinstance(value, Iterable) and not isinstance(value, str): + return list(value) + return [value] + + +def _validate_county(value: object) -> str: + """Validate and normalize a five-digit state+county FIPS code.""" + code = str(value).strip() + if not (code.isdigit() and len(code) == 5): + raise ValueError( + "county must be a five-digit state+county FIPS code " + f"(e.g. '55025'), got {value!r}." + ) + return code + + +def _validate_huc(value: object) -> str: + """Validate a HUC code (even length 2-12 digits; level set by length).""" + code = str(value).strip() + if not (code.isdigit() and len(code) in _HUC_LENGTHS): + raise ValueError( + "huc must be a hydrologic unit code of even length 2-12 digits " + f"(e.g. '04', '07070005', '010900020502'), got {value!r}." + ) + return code + + +async def _fan_out( + requests: list[httpx.Request], headers: dict[str, str], ssl_check: bool +) -> tuple[pd.DataFrame, httpx.Response]: + """Fetch every request (each paginated) concurrently over one shared client. + + Each request is paginated by the engine's + :func:`~dataretrieval.ogc.engine._paginate` with NWDC strategies: parse a CSV + page and read its ``Link`` header cursor (``parse``), follow that cursor + (``follow``), and raise the typed error carrying the NWDC ``detail`` + (``raise_for_status``). Concurrency is bounded by a semaphore at + :data:`MAX_CONCURRENT_REQUESTS`, and ``asyncio.gather`` preserves input + order, so the concatenation is deterministic. The shared + :class:`httpx.AsyncClient` keeps connections alive across pages and requests. 
+ """ + + def parse(response: httpx.Response) -> tuple[pd.DataFrame, str | None]: + return _read_csv_page(response), _next_page_url(response) + + async def follow(cursor: str, sess: httpx.AsyncClient) -> httpx.Response: + return await sess.get(cursor, headers=headers) + + def raise_for_status(response: httpx.Response) -> None: + _raise_for_status(response, detail_from=_nwdc_error_detail) + + async with httpx.AsyncClient(verify=ssl_check, **HTTPX_DEFAULTS) as client: + semaphore = asyncio.Semaphore(max(1, MAX_CONCURRENT_REQUESTS)) + + async def _one(request: httpx.Request) -> tuple[pd.DataFrame, httpx.Response]: + async with semaphore: + return await _paginate( + request, + parse_response=parse, + follow_up=follow, + client=client, + raise_for_status=raise_for_status, + ) + + results = await asyncio.gather(*(_one(req) for req in requests)) + + # Reuse the engine's combine helpers: drop empty frames and concat, and fold + # the per-location responses into one (lowest-remaining rate-limit headers + + # cumulative elapsed), keeping the first request's URL as the query identity. + frames = [frame for frame, _ in results] + responses = [resp for _, resp in results] + return _combine_chunk_frames(frames), _combine_chunk_responses( + responses, str(requests[0].url) + ) + + +def _read_csv_page(response: httpx.Response) -> pd.DataFrame: + """Parse one CSV page; ``huc12_id`` stays a string to keep leading zeros.""" + try: + return pd.read_csv(io.BytesIO(response.content), dtype={_HUC12_COLUMN: str}) + except pd.errors.EmptyDataError as exc: + # NWDC normally signals "no data" with a 400 (handled above) or rows of + # zeros, never an empty body — but keep the typed-error contract if it + # ever returns one rather than leaking a bare pandas exception. + raise DataRetrievalError( + f"NWDC returned an empty response body (URL: {response.url})." 
+ ) from exc + + +def _next_page_url(response: httpx.Response) -> str | None: + """Return the absolute URL of the next page, or None if this is the last. + + Reads the standard ``Link: <...>; rel="next"`` header (parsed by httpx into + ``response.links``). A next link served against the bare ``water.usgs.gov`` + host is normalized to the public ``api.water.usgs.gov`` gateway so the + follow-up request reaches the API. + """ + url = response.links.get("next", {}).get("url") + if not url: + return None + return url.replace("https://water.usgs.gov", "https://api.water.usgs.gov", 1) + + +def _nwdc_error_detail(response: httpx.Response) -> str | None: + """Pull the ``detail`` message out of an NWDC JSON error envelope, if any. + + The NWDC reports errors as ``{"detail": "Invalid model name: ..."}``. Passed + to :func:`~dataretrieval.utils._raise_for_status` as ``detail_from`` so the + service's wording surfaces in the typed error message. + """ + try: + body = response.json() + except ValueError: + return None + return body.get("detail") if isinstance(body, dict) else None diff --git a/demos/USGS_WaterUse_Examples.ipynb b/demos/USGS_WaterUse_Examples.ipynb new file mode 100644 index 00000000..0017f048 --- /dev/null +++ b/demos/USGS_WaterUse_Examples.ipynb @@ -0,0 +1,228 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# USGS Water-Use Data (NWDC)\n", + "\n", + "The USGS [National Water Availability Assessment Data Companion\n", + "(NWDC)](https://water.usgs.gov/nwaa-data/) serves national-scale, *modeled*\n", + "water-use estimates that underlie the USGS National Water Availability\n", + "Assessment. Estimates are produced on a **HUC12** (12-digit hydrologic unit)\n", + "grid and can be queried for any county, state, or hydrologic unit. 
They are\n", + "the modern replacement for the retired legacy NWIS water-use service.\n", + "\n", + "`dataretrieval` exposes the service through a single function,\n", + "`wateruse.get_wateruse`, which returns a tidy `pandas.DataFrame` plus a\n", + "metadata object. Available **models** (categories) include:\n", + "\n", + "| model | description |\n", + "| --- | --- |\n", + "| `wu-public-supply-wd` | public-supply withdrawals |\n", + "| `wu-public-supply-cu` | public-supply consumptive use |\n", + "| `wu-irrigation-wd` | irrigation withdrawals |\n", + "| `wu-irrigation-cu` | irrigation consumptive use |\n", + "| `wu-thermoelectric` | thermoelectric-power water use |\n", + "\n", + "Each model exposes its own set of **variables**; see the\n", + "[NWDC data catalog](https://water.usgs.gov/nwaa-data/) for the full list.\n", + "\n", + "## A motivating question\n", + "\n", + "> *Where does Wisconsin's public water supply come from — groundwater or\n", + "> surface water — and when through the year is demand highest?*\n", + "\n", + "Answering it is a typical water-use workflow: pull a state's monthly\n", + "withdrawals split by source, aggregate the HUC12 grid to a statewide time\n", + "series, and look at the seasonal cycle and the groundwater/surface-water\n", + "split." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%matplotlib inline\n", + "\n", + "import matplotlib.pyplot as plt\n", + "import pandas as pd\n", + "\n", + "from dataretrieval import wateruse" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 1. 
Retrieve the data\n", + "\n", + "We request three variables for the public-supply-withdrawals model:\n", + "`pswdtot` (total), `pswdgw` (groundwater source), and `pswdsw` (surface-water\n", + "source), for every HUC12 in Wisconsin, monthly, for calendar year 2020.\n", + "\n", + "Wisconsin spans more HUC12s than fit on a single page, so the service returns\n", + "the result across several pages — `get_wateruse` follows the pagination links\n", + "and stitches the pages into one frame for you." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "df, md = wateruse.get_wateruse(\n", + " model=\"wu-public-supply-wd\",\n", + " variable=[\"pswdtot\", \"pswdgw\", \"pswdsw\"],\n", + " state=\"WI\",\n", + " start_date=\"2020-01\",\n", + " end_date=\"2020-12\",\n", + " time_resolution=\"monthly\",\n", + ")\n", + "\n", + "print(f\"{len(df):,} rows across {df['huc12_id'].nunique():,} HUC12 watersheds\")\n", + "df.head()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The frame is in **long (tidy) form**: one row per HUC12 and month. The\n", + "`huc12_id` column is kept as a string so leading zeros are preserved, and each\n", + "value column carries its unit — here `_mgd`, million gallons per day." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 2. Aggregate to a statewide monthly series\n", + "\n", + "Summing across every HUC12 gives the statewide withdrawal for each month." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "statewide = df.groupby(\"year_month\")[[\"pswdtot_mgd\", \"pswdgw_mgd\", \"pswdsw_mgd\"]].sum()\n", + "statewide.index = pd.to_datetime(statewide.index)\n", + "statewide" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 3. 
The seasonal demand cycle\n", + "\n", + "Public-supply withdrawals peak in summer, when outdoor use (lawns, gardens,\n", + "recreation) adds to baseline indoor demand." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "fig, ax = plt.subplots(figsize=(9, 4))\n", + "ax.plot(statewide.index, statewide[\"pswdtot_mgd\"], marker=\"o\", color=\"#1f77b4\")\n", + "ax.set_title(\"Wisconsin public-supply withdrawals, 2020\")\n", + "ax.set_ylabel(\"Withdrawal (million gallons/day)\")\n", + "ax.set_xlabel(\"Month\")\n", + "ax.grid(True, alpha=0.3)\n", + "fig.tight_layout()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 4. Groundwater vs. surface water\n", + "\n", + "Because total withdrawal is the sum of its groundwater and surface-water\n", + "components, a stacked area shows how each source contributes through the year." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "gw_share = statewide[\"pswdgw_mgd\"].sum() / statewide[\"pswdtot_mgd\"].sum()\n", + "print(f\"Groundwater supplies {gw_share:.0%} of Wisconsin's public water supply\")\n", + "\n", + "fig, ax = plt.subplots(figsize=(9, 4))\n", + "ax.stackplot(\n", + " statewide.index,\n", + " statewide[\"pswdgw_mgd\"],\n", + " statewide[\"pswdsw_mgd\"],\n", + " labels=[\"Groundwater\", \"Surface water\"],\n", + " colors=[\"#8c6d31\", \"#1f77b4\"],\n", + " alpha=0.85,\n", + ")\n", + "ax.set_title(\"Source of Wisconsin public-supply withdrawals, 2020\")\n", + "ax.set_ylabel(\"Withdrawal (million gallons/day)\")\n", + "ax.set_xlabel(\"Month\")\n", + "ax.legend(loc=\"upper left\")\n", + "fig.tight_layout()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Notes\n", + "\n", + "- **Spatial resolution.** Estimates are always returned on the HUC12 grid,\n", + " regardless of the area you query. 
Joining `huc12_id` to a HUC12\n", + " boundary layer (for example with `geopandas`) lets you map the results.\n", + "- **Pick one selector.** Use exactly one of `state`, `county` (5-digit FIPS),\n", + " or `huc`. `state` accepts a name, postal code, or FIPS; a `huc` code's length\n", + " sets its level (`huc=\"04\"` → HUC2, `huc=\"07070005\"` → HUC8). Each selector\n", + " also accepts a list (e.g. `state=[\"WI\", \"MN\"]`), fanned out into one request\n", + " per area and concatenated.\n", + "- **Large areas paginate.** A query spanning more than `limit` HUC12s (600 by\n", + " default) is split across pages; `get_wateruse` follows the links and returns\n", + " a single concatenated frame. Whole-region queries (e.g. `huc=\"04\"`)\n", + " may return many pages and take longer.\n", + "- **Annual data.** Set `time_resolution=\"annualcy\"` (calendar year) or `\"annualwy\"`\n", + " (water year) and use four-digit years for `start_date`/`end_date`; the time\n", + " column comes back as `year` instead of `year_month`.\n", + "- **Other models.** Swap `model` and `variable` to retrieve irrigation\n", + " (`wu-irrigation-wd`) or thermoelectric (`wu-thermoelectric`) water use. See\n", + " the [NWDC data catalog](https://water.usgs.gov/nwaa-data/) for the variables\n", + " available in each." 
+ ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.10" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/docs/source/reference/index.rst b/docs/source/reference/index.rst index 0d5b9b6d..13c44963 100644 --- a/docs/source/reference/index.rst +++ b/docs/source/reference/index.rst @@ -14,4 +14,5 @@ API reference streamstats utils waterdata + wateruse wqp diff --git a/docs/source/reference/wateruse.rst b/docs/source/reference/wateruse.rst new file mode 100644 index 00000000..db4e4962 --- /dev/null +++ b/docs/source/reference/wateruse.rst @@ -0,0 +1,7 @@ +.. _wateruse: + +dataretrieval.wateruse +---------------------- + +.. automodule:: dataretrieval.wateruse + :members: diff --git a/tests/waterdata_chunking_test.py b/tests/waterdata_chunking_test.py index ca3be951..5a946946 100644 --- a/tests/waterdata_chunking_test.py +++ b/tests/waterdata_chunking_test.py @@ -40,7 +40,6 @@ from dataretrieval.ogc import chunking as _chunking from dataretrieval.ogc import retry as _retry_mod from dataretrieval.ogc.chunking import ( - _QUOTA_HEADER, ChunkedCall, _chunked_client, get_active_client, @@ -55,6 +54,7 @@ _LIST_SEP, _NEVER_CHUNK, _OR_SEP, + _QUOTA_HEADER, ChunkPlan, _combine_chunk_frames, _combine_chunk_responses, @@ -1040,7 +1040,8 @@ def test_combine_chunk_responses_returns_independent_headers(): ) head = _combine_chunk_responses([r0, r1], canonical_url=None) - # Aggregate carries the last chunk's headers... + # Aggregate carries a chunk's headers (here the last, as the fallback when + # neither reports a rate limit)... assert head.headers["X-Foo"] == "1" # ...but mutating the aggregate must not back-propagate. 
head.headers["X-Trace-Id"] = "abc" @@ -1048,6 +1049,24 @@ def test_combine_chunk_responses_returns_independent_headers(): assert "X-Trace-Id" not in r0.headers +def test_combine_chunk_responses_surfaces_lowest_remaining(): + """``x-ratelimit-remaining`` reports the LOWEST any sub-request saw — the + quota actually left after the fan-out — not the last-by-index, which under + concurrency need not be the response the server processed last.""" + r0 = mock.Mock( + elapsed=datetime.timedelta(seconds=0.1), + headers={"x-ratelimit-remaining": "5"}, # lowest, but first by index + url="u0", + ) + r1 = mock.Mock( + elapsed=datetime.timedelta(seconds=0.2), + headers={"x-ratelimit-remaining": "99"}, # last by index, but higher + url="u1", + ) + head = _combine_chunk_responses([r0, r1], canonical_url=None) + assert head.headers["x-ratelimit-remaining"] == "5" + + def test_paginate_terminates_on_empty_string_cursor(): """``_paginate``'s loop predicate is ``while cursor is not None``. Parse-response wrappers in ``_walk_pages`` / ``stats.get_data`` diff --git a/tests/waterdata_test.py b/tests/waterdata_test.py index 06289ce4..12727fc6 100644 --- a/tests/waterdata_test.py +++ b/tests/waterdata_test.py @@ -1002,8 +1002,8 @@ def test_get_daily_malformed_id_raises(self): def test_per_item_format_check_in_list(self): """The AGENCY-ID format check runs on EVERY element of an iterable, not just the first. 
Regression guard against a - future ``_check_id_format`` loop that bails after one valid - item or only checks the head.""" + future ``_check_monitoring_location_id`` loop that bails after one + valid item or only checks the head.""" with pytest.raises(ValueError, match="Invalid monitoring_location_id"): _check_monitoring_location_id(["USGS-01646500", "badformat"]) diff --git a/tests/waterdata_utils_test.py b/tests/waterdata_utils_test.py index 4d568d1f..b6fd2984 100644 --- a/tests/waterdata_utils_test.py +++ b/tests/waterdata_utils_test.py @@ -379,7 +379,7 @@ def test_next_req_url_stops_when_no_features(): def test_walk_pages_does_not_mutate_initial_response(): """The aggregated response returned from ``_walk_pages`` is built - via ``_aggregate_paginated_response``, which returns a fresh copy. + via ``_merge_response``, which returns a fresh copy. Any caller that inspected ``initial_response.headers`` / ``.elapsed`` before pagination completed (a Session response hook, a logging middleware) must continue to see the original first-page diff --git a/tests/wateruse_test.py b/tests/wateruse_test.py new file mode 100644 index 00000000..031aad74 --- /dev/null +++ b/tests/wateruse_test.py @@ -0,0 +1,416 @@ +"""Offline tests for :mod:`dataretrieval.wateruse`. + +All HTTP is mocked with ``pytest-httpx``; no live calls (per AGENTS.md). +""" + +import re +from urllib.parse import parse_qs, urlsplit + +import httpx +import pandas as pd +import pytest + +import dataretrieval +from dataretrieval import wateruse +from dataretrieval.utils import BaseMetadata +from dataretrieval.wateruse import _next_page_url, _resolve_locations, get_wateruse + +# Match the NWDC endpoint regardless of query string, so assertions can drill +# into the captured params without coupling registration to param order. +WU_RE = re.compile(r"^https://api\.water\.usgs\.gov/nwaa-data/data(\?.*)?$") + +# A single-page monthly CSV: two HUC12s (one with a leading zero), three months. 
def test_get_wateruse_single_page(httpx_mock):
    """Happy path: one CSV page comes back as ``(DataFrame, BaseMetadata)``.

    Checks the return types, that the CSV header order is preserved as the
    column order, and that every data row of the page is present.
    """
    httpx_mock.add_response(method="GET", url=WU_RE, text=_CSV_PAGE)

    query = {
        "model": "wu-public-supply-wd",
        "variable": ["pswdtot", "pswdgw", "pswdsw"],
        "state": "RI",
        "start_date": "2020-01",
        "time_resolution": "monthly",
    }
    df, md = get_wateruse(**query)

    expected_columns = [
        "huc12_id",
        "year_month",
        "pswdgw_mgd",
        "pswdsw_mgd",
        "pswdtot_mgd",
    ]
    assert isinstance(df, pd.DataFrame)
    assert isinstance(md, BaseMetadata)
    assert list(df.columns) == expected_columns
    assert len(df) == 3
def test_variables_are_comma_joined(httpx_mock):
    """A list passed as ``variable`` reaches the wire as a single
    comma-joined ``variable`` query parameter, alongside ``format=csv``."""
    httpx_mock.add_response(method="GET", url=WU_RE, text=_CSV_PAGE)

    get_wateruse(
        model="wu-public-supply-wd",
        variable=["pswdtot", "pswdgw", "pswdsw"],
        state="RI",
    )

    # Decode the query string of the one request that was sent.
    first_request = httpx_mock.get_requests()[0]
    sent = parse_qs(urlsplit(str(first_request.url)).query)

    assert sent["variable"] == ["pswdtot,pswdgw,pswdsw"]
    assert sent["format"] == ["csv"]
def test_snake_case_date_params_map_to_nwdc_wire_names(httpx_mock):
    """``start_date`` / ``end_date`` / ``time_resolution`` are public
    snake_case names; on the wire they travel as the NWDC's compact
    ``startdate`` / ``enddate`` / ``timeres``."""
    httpx_mock.add_response(method="GET", url=WU_RE, text=_CSV_PAGE)

    get_wateruse(
        model="wu-public-supply-wd",
        state="RI",
        start_date="2020-01",
        end_date="2020-12",
        time_resolution="monthly",
    )

    first_request = httpx_mock.get_requests()[0]
    sent = parse_qs(urlsplit(str(first_request.url)).query)

    # Wire name -> the value passed under the matching snake_case keyword.
    expected_on_wire = {
        "startdate": "2020-01",
        "enddate": "2020-12",
        "timeres": "monthly",
    }
    for wire_name, value in expected_on_wire.items():
        assert sent[wire_name] == [value]
def test_pagination_rewrites_bare_host(httpx_mock):
    """A next link on the bare ``water.usgs.gov`` host is routed to the API.

    Page 1 advertises its successor with a ``Link`` header whose target is on
    ``water.usgs.gov``; the follow-up request must go to
    ``api.water.usgs.gov`` instead.
    """
    # The link target must be wrapped in angle brackets (RFC 8288 link-value
    # syntax). Only the host matters to this test; the query string is
    # illustrative.
    bare_host_next = (
        "<https://water.usgs.gov/nwaa-data/data?"
        'model=wu-public-supply-wd&location=stateCd%3ARI&skip=2>; rel="next"'
    )
    httpx_mock.add_response(
        method="GET",
        url=WU_RE,
        text=_CSV_P1,
        headers={"link": bare_host_next},
    )
    # WU_RE only matches the API host, so this second response is served only
    # if the pager rewrote the bare host before following the link.
    httpx_mock.add_response(method="GET", url=WU_RE, text=_CSV_P2)

    get_wateruse(model="wu-public-supply-wd", state="RI")

    second = httpx_mock.get_requests()[1]
    assert second.url.host == "api.water.usgs.gov"
def test_state_selector_builds_location_query(httpx_mock):
    """A ``state=`` selector given as a full state name is sent on the wire
    as a ``location`` parameter in ``stateCd:`` form with the postal code
    (``"Rhode Island"`` -> ``location=stateCd:RI``)."""
    httpx_mock.add_response(method="GET", url=WU_RE, text=_CSV_PAGE)

    get_wateruse(model="wu-public-supply-wd", state="Rhode Island")

    first_request = httpx_mock.get_requests()[0]
    sent = parse_qs(urlsplit(str(first_request.url)).query)
    assert sent["location"] == ["stateCd:RI"]
def test_fan_out_surfaces_final_rate_limit_header(httpx_mock):
    """After a two-state fan-out, ``md.header`` carries the lowest
    ``x-ratelimit-remaining`` seen across the sub-requests (``"850"``), not
    the value returned to the first location (``"900"``)."""
    # One canned response per location, matched on its encoded ``location``
    # query parameter: (URL pattern, CSV body, remaining quota).
    canned = [
        (re.compile(r".*location=stateCd%3ARI.*"), _CSV_P1, "900"),
        (re.compile(r".*location=stateCd%3AWI.*"), _CSV_P2, "850"),
    ]
    for url_pattern, body, remaining in canned:
        httpx_mock.add_response(
            method="GET",
            url=url_pattern,
            text=body,
            headers={"x-ratelimit-remaining": remaining},
        )

    _, md = get_wateruse(model="wu-public-supply-wd", state=["RI", "WI"])

    assert md.header["x-ratelimit-remaining"] == "850"
def test_resolve_locations_accepts_lists():
    """Each selector (state, county, huc) may be a list; the result holds one
    location string per element, in the order given."""
    # ((state, county, huc) arguments, expected location strings)
    cases = [
        ((["RI", "Wisconsin"], None, None), ["stateCd:RI", "stateCd:WI"]),
        (
            (None, ["55025", "55021"], None),
            ["countyCd:55025", "countyCd:55021"],
        ),
        ((None, None, ["04", "070700"]), ["huc2:04", "huc6:070700"]),
    ]
    for selectors, expected in cases:
        assert _resolve_locations(*selectors) == expected
def test_next_page_url_rewrites_bare_host():
    """A ``rel="next"`` link on the bare ``water.usgs.gov`` host comes back
    from ``_next_page_url`` on ``api.water.usgs.gov``, with the path and
    query unchanged."""
    # The link target must be wrapped in angle brackets (RFC 8288 link-value
    # syntax): ``<URL>; rel="next"``.
    resp = httpx.Response(
        200,
        text="",
        headers={
            "link": '<https://water.usgs.gov/nwaa-data/data?skip=600>; rel="next"'
        },
    )
    assert _next_page_url(resp) == (
        "https://api.water.usgs.gov/nwaa-data/data?skip=600"
    )