diff --git a/AGENTS.md b/AGENTS.md
index 91c07b24..455c108c 100644
--- a/AGENTS.md
+++ b/AGENTS.md
@@ -6,7 +6,7 @@
- Exclude `.claude/worktrees/` from searches and edits; it contains stale worktrees that pollute results.
## Example Notebooks
-- `demos/*.ipynb` — top-level Water Data tour: `USGS_WaterData_Introduction_Examples.ipynb` is the entry point; `_ContinuousData_`, `_DailyStatistics_`, `_DiscreteSamples_`, `_ReferenceLists_` cover individual collections; `WaterData_demo.ipynb`, `peak_streamflow_trends.ipynb`, and `R Python Vignette equivalents.ipynb` are standalone walkthroughs.
+- `demos/*.ipynb` — top-level Water Data tour: `USGS_WaterData_Introduction_Examples.ipynb` is the entry point; `_ContinuousData_`, `_DailyStatistics_`, `_DiscreteSamples_`, `_ReferenceLists_` cover individual collections; `WaterData_demo.ipynb`, `peak_streamflow_trends.ipynb`, `USGS_WaterUse_Examples.ipynb` (NWDC water-use data via `wateruse.get_wateruse`), and `R Python Vignette equivalents.ipynb` are standalone walkthroughs.
- `demos/hydroshare/*.ipynb` — per-service HydroShare examples (NLDI, NWIS WaterUse, and Water Data DailyValues / GroundwaterLevels / Measurements / ParameterCodes / Peaks / Ratings / Samples / SiteInfo / SiteInventory / Statistics / UnitValues). Mirror these when adding examples for a new collection.
- `demos/nwqn_data_pull/` — non-notebook example: a lithops/Docker batch pipeline (`retrieve_nwqn_samples.py`, `retrieve_nwqn_streamflow.py`) with its own `README.md`.
- Any `Untitled*.ipynb`, `*_test.ipynb`, or notebooks not listed here are untracked local scratch; ignore them.
diff --git a/README.md b/README.md
index bf3a319d..f7ce3039 100644
--- a/README.md
+++ b/README.md
@@ -133,7 +133,7 @@ sites, metadata = ngwmn.get_sites(state='Wisconsin')
print(f"Found {len(sites)} NGWMN sites in Wisconsin")
-# Pull water levels from the first twenty sites over a time window.
+# Pull water levels from the first twenty sites over a time window.
water_levels, metadata = ngwmn.get_water_level(
monitoring_location_id=sites['monitoring_location_id'][:20],
datetime=['2022-01-01', '2024-01-01']
@@ -192,6 +192,31 @@ flowlines = nldi.get_flowlines(
print(f"Found {len(flowlines)} upstream tributaries within 50km")
```
+### Water Use (NWDC)
+
+Retrieve modeled water-use estimates from the National Water Availability
+Assessment Data Companion:
+
+```python
+from dataretrieval import wateruse
+
+# Monthly public-supply withdrawals for Rhode Island, split into
+# groundwater and surface-water sources (returns a DataFrame and metadata).
+df, metadata = wateruse.get_wateruse(
+ model='wu-public-supply-wd',
+ variable=['pswdtot', 'pswdgw', 'pswdsw'],
+ state='RI', # name/postal/FIPS; pass a list to fan out over several areas
+ start_date='2020-01',
+ time_resolution='monthly',
+)
+
+print(f"Retrieved {len(df)} records across {df['huc12_id'].nunique()} watersheds")
+
+# Aggregate the HUC12 grid to a statewide monthly total (million gallons/day)
+statewide = df.groupby('year_month')['pswdtot_mgd'].sum()
+print(statewide.head())
+```
+
## Available Data Services
### Modern USGS Water Data APIs (Recommended) — `dataretrieval.waterdata`
@@ -232,6 +257,13 @@ print(f"Found {len(flowlines)} upstream tributaries within 50km")
- `get_features`: Find monitoring sites, dams, and other features along the network
- `get_features_by_data_source`: Features from a specific data source
+### Water Use (NWDC)
+- **Public supply**: Modeled public-supply withdrawals and consumptive use
+- **Irrigation**: Modeled irrigation withdrawals and consumptive use
+- **Thermoelectric**: Modeled thermoelectric-power water use
+- **HUC12 estimates**: National coverage on a 12-digit hydrologic-unit grid,
+ summarizable to counties, states, or coarser hydrologic units
+
## More Examples
Explore additional examples in the
diff --git a/dataretrieval/__init__.py b/dataretrieval/__init__.py
index 97420f43..c9df1c45 100644
--- a/dataretrieval/__init__.py
+++ b/dataretrieval/__init__.py
@@ -11,7 +11,8 @@
df, meta = nwis.get_dv(sites="05427718")
Available service modules: ``waterdata``, ``wqp`` (Water Quality Portal),
-``nldi``, ``streamstats``, and the deprecated ``nwis``.
+``wateruse`` (NWDC water-use data), ``nldi``, ``streamstats``, and the
+deprecated ``nwis``.
``nldi`` requires geopandas (``pip install dataretrieval[nldi]``) and is
imported on demand: ``from dataretrieval import nldi``.
@@ -62,6 +63,7 @@
streamstats,
utils,
waterdata,
+ wateruse,
wqp,
)
@@ -72,6 +74,7 @@
"streamstats",
"utils",
"waterdata",
+ "wateruse",
"wqp",
# error taxonomy (canonical home: ``dataretrieval.exceptions``), re-exported
# so callers can ``except dataretrieval.DataRetrievalError``
diff --git a/dataretrieval/nwis.py b/dataretrieval/nwis.py
index 49b91a51..25e9cf8f 100644
--- a/dataretrieval/nwis.py
+++ b/dataretrieval/nwis.py
@@ -741,8 +741,17 @@ def get_pmcodes(**kwargs: Any) -> NoReturn:
def get_water_use(**kwargs: Any) -> NoReturn:
- """Defunct: no current replacement."""
- raise NameError("`nwis.get_water_use` is defunct.")
+ """Defunct: use ``dataretrieval.wateruse.get_wateruse`` instead.
+
+ The legacy NWIS water-use service has been retired. Modeled water-use
+ estimates are now served by the National Water Availability Assessment Data
+ Companion (NWDC); retrieve them with
+ :func:`dataretrieval.wateruse.get_wateruse`.
+ """
+ raise NameError(
+ "`nwis.get_water_use` is defunct; use "
+ "`dataretrieval.wateruse.get_wateruse` instead."
+ )
@_deprecated
diff --git a/dataretrieval/ogc/chunking.py b/dataretrieval/ogc/chunking.py
index b7fff335..00d2766c 100644
--- a/dataretrieval/ogc/chunking.py
+++ b/dataretrieval/ogc/chunking.py
@@ -69,15 +69,14 @@
import functools
import os
from collections.abc import Callable, Iterator
-from contextlib import contextmanager
-from contextvars import ContextVar, copy_context
+from contextvars import copy_context
from typing import Any, cast
import httpx
import pandas as pd
from anyio.from_thread import start_blocking_portal
-from dataretrieval.utils import HTTPX_DEFAULTS
+from dataretrieval.utils import HTTPX_DEFAULTS, Ambient
from . import progress as _progress
from .interruptions import (
@@ -106,9 +105,6 @@
_OGC_URL_BYTE_LIMIT = 8000
-# Response header USGS uses to advertise remaining hourly quota.
-_QUOTA_HEADER = "x-ratelimit-remaining"
-
# Fan-out concurrency cap, read at call time (not import) so test
# ``monkeypatch.setenv`` applies. Value grammar in :func:`_read_concurrency_env`;
# the concurrency model is in the module docstring.
@@ -152,38 +148,11 @@ def _read_concurrency_env() -> int | None:
return value
-# Shared per-call ``httpx.AsyncClient``, published via :func:`_publish`
-# during ``ChunkedCall._run`` so paginated-loop helpers (``_walk_pages``)
-# reuse the same connection pool across every sub-request. ``None``
-# outside a chunked call — paginated helpers then open their own
-# short-lived client.
-_chunked_client: ContextVar[httpx.AsyncClient | None] = ContextVar(
- "_chunked_client", default=None
-)
-
-
-@contextmanager
-def _publish(client: httpx.AsyncClient) -> Iterator[None]:
- """
- Publish ``client`` on the ``_chunked_client`` ContextVar so the
- paginated-loop helpers can borrow it via :func:`get_active_client`
- for the duration of the ``with`` block.
-
- Parameters
- ----------
- client : httpx.AsyncClient
- The client to publish.
-
- Yields
- ------
- None
- Yields once, for the duration of the bind.
- """
- token = _chunked_client.set(client)
- try:
- yield
- finally:
- _chunked_client.reset(token)
+# Shared per-call ``httpx.AsyncClient``, scoped via ``with _chunked_client(c):``
+# during ``ChunkedCall._run`` so paginated-loop helpers (``_walk_pages``) reuse
+# the same connection pool across every sub-request. ``None`` outside a chunked
+# call — paginated helpers then open their own short-lived client.
+_chunked_client: Ambient[httpx.AsyncClient | None] = Ambient("_chunked_client", None)
def get_active_client() -> httpx.AsyncClient | None:
@@ -197,8 +166,8 @@ def get_active_client() -> httpx.AsyncClient | None:
Returns
-------
httpx.AsyncClient or None
- The client published via :func:`_publish` if currently inside a
- :class:`ChunkedCall` run; ``None`` otherwise.
+ The client scoped via ``with _chunked_client(...)`` if currently inside
+ a :class:`ChunkedCall` run; ``None`` otherwise.
"""
return _chunked_client.get()
@@ -541,7 +510,7 @@ async def _run(self, max_concurrent: int | None) -> tuple[pd.DataFrame, Any]:
)
async with httpx.AsyncClient(limits=limits, **HTTPX_DEFAULTS) as client:
- with _publish(client):
+ with _chunked_client(client):
reporter = _progress.current()
if reporter is not None:
reporter.set_chunks(self.plan.total)
diff --git a/dataretrieval/ogc/engine.py b/dataretrieval/ogc/engine.py
index 2fe89440..ab4b8371 100644
--- a/dataretrieval/ogc/engine.py
+++ b/dataretrieval/ogc/engine.py
@@ -24,44 +24,42 @@
from __future__ import annotations
-import copy
import functools
import json
import logging
import numbers
-import os
import re
from collections.abc import (
AsyncIterator,
Awaitable,
Callable,
Iterable,
- Iterator,
Mapping,
)
-from contextlib import asynccontextmanager, contextmanager
-from contextvars import ContextVar
+from contextlib import asynccontextmanager
from dataclasses import dataclass, field
-from datetime import timedelta
from typing import Any, TypeVar, cast
import httpx
import pandas as pd
from anyio.from_thread import start_blocking_portal
-from dataretrieval import __version__
from dataretrieval.exceptions import DataRetrievalError
from dataretrieval.ogc import chunking
from dataretrieval.ogc import progress as _progress
-from dataretrieval.ogc.chunking import (
- _QUOTA_HEADER,
- get_active_client,
-)
+from dataretrieval.ogc.chunking import get_active_client
from dataretrieval.ogc.dates import _DATE_RANGE_PARAMS, _format_api_dates
from dataretrieval.ogc.errors import _paginated_failure_message, _raise_for_non_200
-from dataretrieval.ogc.planning import _safe_elapsed
+from dataretrieval.ogc.planning import _QUOTA_HEADER, _merge_response, _safe_elapsed
from dataretrieval.ogc.shaping import GEOPANDAS, _finalize_ogc, _get_resp_data
-from dataretrieval.utils import HTTPX_DEFAULTS, BaseMetadata, _get, _network_error
+from dataretrieval.utils import (
+ HTTPX_DEFAULTS,
+ Ambient,
+ BaseMetadata,
+ _default_headers,
+ _get,
+ _network_error,
+)
# Set up logger for this module
logger = logging.getLogger(__name__)
@@ -233,36 +231,14 @@ def _cql2_param(args: dict[str, Any]) -> str:
which roughly doubles how many monitoring-location ids fit in one
sub-request and so halves the chunk count for large id lists.
"""
- filters = []
- for key, values in args.items():
- filters.append({"op": "in", "args": [{"property": key}, values]})
-
- query = {"op": "and", "args": filters}
-
- return json.dumps(query, separators=(",", ":"))
-
-
-def _default_headers() -> dict[str, str]:
- """
- Generate default HTTP headers for API requests.
-
- Returns
- -------
- dict
- A dictionary containing default headers including 'Accept-Encoding',
- 'Accept', 'User-Agent', and 'lang'. If the environment variable
- 'API_USGS_PAT' is set, its value is included as the 'X-Api-Key' header.
- """
- headers = {
- "Accept-Encoding": "compress, gzip",
- "Accept": "application/json",
- "User-Agent": f"python-dataretrieval/{__version__}",
- "lang": "en-US",
+ query = {
+ "op": "and",
+ "args": [
+ {"op": "in", "args": [{"property": key}, values]}
+ for key, values in args.items()
+ ],
}
- token = os.getenv("API_USGS_PAT")
- if token:
- headers["X-Api-Key"] = token
- return headers
+ return json.dumps(query, separators=(",", ":"))
def _check_ogc_requests(endpoint: str, req_type: str = "queryables") -> dict[str, Any]:
@@ -294,7 +270,7 @@ def _check_ogc_requests(endpoint: str, req_type: str = "queryables") -> dict[str
"""
if req_type not in ("queryables", "schema"):
raise ValueError(f"req_type must be 'queryables' or 'schema', got {req_type!r}")
- url = f"{_ogc_base_url_var.get()}/collections/{endpoint}/{req_type}"
+ url = f"{_ogc_base_url.get()}/collections/{endpoint}/{req_type}"
resp = _get(url, headers=_default_headers(), **HTTPX_DEFAULTS)
_raise_for_non_200(resp)
# ``Response.json`` is typed ``Any``; the OGC queryables/schema endpoints
@@ -374,8 +350,8 @@ def _construct_api_requests(
-----
- Date/time parameters are automatically formatted to ISO8601.
"""
- service_url = f"{_ogc_base_url_var.get()}/collections/{service}/items"
- dialect = _dialect_var.get()
+ service_url = f"{_ogc_base_url.get()}/collections/{service}/items"
+ dialect = _dialect.get()
# Format date/time parameters to ISO8601 first — both routing paths need it.
for key in _DATE_RANGE_PARAMS:
@@ -473,7 +449,7 @@ def _construct_cql_request(
httpx.Request
A POST request with ``Content-Type: application/query-cql-json``.
"""
- service_url = f"{_ogc_base_url_var.get()}/collections/{service}/items"
+ service_url = f"{_ogc_base_url.get()}/collections/{service}/items"
params = _ogc_query_params(
{},
properties=properties,
@@ -605,108 +581,25 @@ async def _client_for(
yield new
-def _aggregate_paginated_response(
- initial: httpx.Response,
- last: httpx.Response,
- total_elapsed: timedelta,
-) -> httpx.Response:
- """
- Build a single response covering a paginated call.
-
- Returns a shallow copy of ``initial`` with ``.headers`` set to the
- LAST page's (so downstream sees current ``x-ratelimit-remaining``)
- and ``.elapsed`` set to total wall-clock. The canonical
- ``initial.url`` is preserved (it's the user's original query).
- Both ``initial`` and ``last`` are left unmutated, mirroring the
- convention of
- :func:`dataretrieval.ogc.planning._combine_chunk_responses`.
-
- Parameters
- ----------
- initial : httpx.Response
- First-page response (the canonical one for ``md.url``).
- last : httpx.Response
- Last-page response — supplies the headers to copy over.
- total_elapsed : datetime.timedelta
- Cumulative wall-clock across every page, including ``initial``.
-
- Returns
- -------
- httpx.Response
- A shallow copy of ``initial`` with ``.headers`` set to a fresh
- ``httpx.Headers`` and ``.elapsed`` set to the cumulative
- wall-clock. ``initial.headers`` / ``initial.elapsed`` are
- never mutated, so callers holding a pre-pagination reference
- still see the original first-page values.
- """
- final = copy.copy(initial)
- final.headers = httpx.Headers(last.headers)
- final.elapsed = total_elapsed
- return final
-
-
_Cursor = TypeVar("_Cursor")
-# Optional cap on the total rows a single paginated call accumulates before it
-# stops following ``next`` links. ``None`` (the default the data getters use)
-# means "no cap — fetch the whole series". Set via :func:`_row_cap` so the deep
-# ``_paginate`` loop can honor it without threading the value through the
-# generic chunker; this mirrors the ``_progress`` ambient-reporter pattern.
-_row_cap_var: ContextVar[int | None] = ContextVar("ogc_row_cap", default=None)
-
-
-@contextmanager
-def _row_cap(max_rows: int | None) -> Iterator[None]:
- """Cap the rows any :func:`_paginate` under this context will
- accumulate (``None`` = uncapped). Used by :func:`get_reference_table`
- to preview large tables without downloading every page."""
- token = _row_cap_var.set(max_rows)
- try:
- yield
- finally:
- _row_cap_var.reset(token)
-
+# Ambient per-call state the generic chunker would otherwise have to thread
+# through to the deep request builder / paginate loop. Each is read with
+# ``.get()`` and scoped with ``with _x(value):``; the defaults leave every
+# existing getter unaffected. (Mirrors the ``_progress`` ambient-reporter.)
-# OGC base URL for the active request. ``get_ogc_data`` sets it per call so the
-# shared request builder (:func:`_construct_api_requests`) can target either the
-# main Water Data API or the NGWMN sub-API without threading the value through
-# the generic chunker; this mirrors the ``_row_cap`` ambient pattern. The
-# default is the main API, so every existing getter is unaffected.
-_ogc_base_url_var: ContextVar[str] = ContextVar("ogc_base_url", default=OGC_API_URL)
+# Optional cap on the rows one paginated call accumulates before it stops
+# following ``next`` links (``None`` = uncapped). Set by :func:`get_reference_table`
+# to preview large tables without downloading every page.
+_row_cap: Ambient[int | None] = Ambient("ogc_row_cap", None)
+# OGC base URL the shared request builder (:func:`_construct_api_requests`)
+# targets — the main Water Data API or, for NGWMN collections, their own base.
+_ogc_base_url: Ambient[str] = Ambient("ogc_base_url", OGC_API_URL)
-@contextmanager
-def _ogc_base_url(base_url: str) -> Iterator[None]:
- """Point :func:`_construct_api_requests` (and the chunk planner that calls
- it) at ``base_url`` for the duration of the block. Used by
- :func:`get_ogc_data` to serve NGWMN collections from their own OGC base."""
- token = _ogc_base_url_var.set(base_url)
- try:
- yield
- finally:
- _ogc_base_url_var.reset(token)
-
-
-# Per-call OGC dialect (which services need POST/CQL2, which use date-only time
-# args). ``get_ogc_data`` sets it so the shared request builder
-# (:func:`_construct_api_requests`) can adapt to the active API without
-# threading the value through the generic chunker; this mirrors the
-# ``_ogc_base_url`` ambient pattern. The default is a plain OGC API.
-_dialect_var: ContextVar[OgcDialect] = ContextVar(
- "ogc_dialect", default=_DEFAULT_DIALECT
-)
-
-
-@contextmanager
-def _dialect(dialect: OgcDialect) -> Iterator[None]:
- """Make ``dialect`` the active :class:`OgcDialect` that
- :func:`_construct_api_requests` reads for CQL2-vs-GET routing and
- date-only formatting, for the duration of the block."""
- token = _dialect_var.set(dialect)
- try:
- yield
- finally:
- _dialect_var.reset(token)
+# Per-call OGC dialect the request builder reads for CQL2-vs-GET routing and
+# date-only formatting (default: a plain OGC API).
+_dialect: Ambient[OgcDialect] = Ambient("ogc_dialect", _DEFAULT_DIALECT)
async def _paginate(
@@ -715,6 +608,7 @@ async def _paginate(
parse_response: Callable[[httpx.Response], tuple[pd.DataFrame, _Cursor | None]],
follow_up: Callable[[_Cursor, httpx.AsyncClient], Awaitable[httpx.Response]],
client: httpx.AsyncClient | None = None,
+ raise_for_status: Callable[[httpx.Response], None] = _raise_for_non_200,
) -> tuple[pd.DataFrame, httpx.Response]:
"""
Drive a paginated request to completion over an
@@ -745,6 +639,10 @@ async def _paginate(
Caller-borrowed client. ``None`` (default) means use the
chunker's shared client (if inside a chunked call) or open
a temporary one.
+ raise_for_status : callable, optional
+ ``resp -> None``; raises the typed error for a non-OK response.
+ Defaults to :func:`_raise_for_non_200` (the OGC ``{code, description}``
+ envelope); wateruse passes its own to surface the NWDC ``detail``.
Returns
-------
@@ -775,9 +673,19 @@ async def _paginate(
"""
logger.debug("Requesting: %s", initial_req.url)
reporter = _progress.current()
+
+ def report_page(page: httpx.Response, frame: pd.DataFrame) -> None:
+ """Tick the ambient progress reporter (a no-op when unset) for one page."""
+ if reporter is not None:
+ reporter.set_rate_remaining(
+ page.headers.get(_QUOTA_HEADER),
+ limit=page.headers.get("x-ratelimit-limit"),
+ )
+ reporter.add_page(rows=len(frame))
+
async with _client_for(client) as sess:
resp = await sess.send(initial_req)
- _raise_for_non_200(resp)
+ raise_for_status(resp)
initial_response = resp
total_elapsed = _safe_elapsed(resp)
@@ -794,28 +702,25 @@ async def _paginate(
# Stop following ``next`` links once the optional row cap is reached
# (see :func:`_row_cap`); ``None`` means uncapped. The concatenation
# is sliced to the cap below so a final over-budget page can't exceed it.
- cap = _row_cap_var.get()
+ cap = _row_cap.get()
nrows = len(df)
- if reporter is not None:
- reporter.set_rate_remaining(
- resp.headers.get(_QUOTA_HEADER),
- limit=resp.headers.get("x-ratelimit-limit"),
- )
- reporter.add_page(rows=len(df))
- while cursor is not None and (cap is None or nrows < cap):
+ # Guard a non-advancing or cyclic cursor (a server bug that would
+ # otherwise loop forever). OGC's next-URLs are unique, so this never
+ # fires for them; the Link-header pagers (e.g. wateruse) rely on it.
+ seen: set[Any] = set()
+ report_page(resp, df)
+ while (
+ cursor is not None and cursor not in seen and (cap is None or nrows < cap)
+ ):
+ seen.add(cursor)
try:
resp = await follow_up(cursor, sess)
- _raise_for_non_200(resp)
+ raise_for_status(resp)
df, cursor = parse_response(resp)
dfs.append(df)
nrows += len(df)
total_elapsed += _safe_elapsed(resp)
- if reporter is not None:
- reporter.set_rate_remaining(
- resp.headers.get(_QUOTA_HEADER),
- limit=resp.headers.get("x-ratelimit-limit"),
- )
- reporter.add_page(rows=len(df))
+ report_page(resp, df)
except Exception as e: # noqa: BLE001
logger.warning(
"Request failed at cursor %r. Data download interrupted.",
@@ -823,12 +728,13 @@ async def _paginate(
)
raise DataRetrievalError(_paginated_failure_message(len(dfs), e)) from e
- # Aggregate headers / elapsed onto a COPY of the initial
- # response so the user's caller never sees an in-place
- # mutation of the response object they may have inspected
- # mid-pagination via a hook or test fixture.
- final_response = _aggregate_paginated_response(
- initial_response, resp, total_elapsed
+ # Fold the pages onto a COPY of the initial response so a caller that
+ # inspected it mid-pagination (a hook, a test fixture) never sees an
+ # in-place mutation. ``resp`` is the last page, whose headers carry the
+ # current ``x-ratelimit-remaining`` (monotonic, so the last page is the
+ # most depleted) — the same low-level merge the fan-out aggregation uses.
+ final_response = _merge_response(
+ initial_response, headers_from=resp, elapsed=total_elapsed
)
result = pd.concat(dfs, ignore_index=True)
if cap is not None:
@@ -1039,6 +945,7 @@ def _run_sync(
make_coro: Callable[[], Awaitable[tuple[pd.DataFrame, httpx.Response]]],
*,
service: str,
+ error_url: str | httpx.URL | None = None,
) -> tuple[pd.DataFrame, httpx.Response]:
"""Drive an async OGC fetch to completion from synchronous code.
@@ -1051,6 +958,11 @@ def _run_sync(
Shared by the non-chunked fetch paths; the chunked OGC getters
drive their own portal
inside :meth:`chunking.ChunkedCall.resume`.
+
+ A connection failure on the initial request is surfaced as a typed
+ ``NetworkError`` against ``error_url`` when given (callers that build their
+ own requests, e.g. ``wateruse``), else the request-builder base the caller
+ scoped via ``_ogc_base_url`` (the OGC / NGWMN getters).
"""
with _progress.progress_context(service=service):
with start_blocking_portal() as portal:
@@ -1064,9 +976,15 @@ def _run_sync(
except httpx.TransportError as exc:
# The initial-request connection failure ``_paginate`` lets
# through raw; mid-pagination failures are already typed.
- # Report the base URL actually targeted (NGWMN/sibling APIs
- # set their own via ``_ogc_base_url``), not a hardcoded host.
- raise _network_error(_ogc_base_url_var.get(), exc) from exc
+ # Report the base URL actually targeted: callers that build
+ # their own requests (``wateruse``) pass ``error_url``; the OGC
+ # getters leave it unset and fall back to the request-builder
+ # base they scoped via ``_ogc_base_url`` (NGWMN/sibling APIs set
+ # their own), not a hardcoded host.
+ raise _network_error(
+ error_url if error_url is not None else _ogc_base_url.get(),
+ exc,
+ ) from exc
# ``AGENCY-ID``: a hyphen-separated agency prefix and local id. The local id
@@ -1187,19 +1105,14 @@ def _check_monitoring_location_id(
if value is None:
return None
for item in (value,) if isinstance(value, str) else value:
- _check_id_format(item)
+ if not _MONITORING_LOCATION_ID_RE.fullmatch(item):
+ raise ValueError(
+ f"Invalid monitoring_location_id: {item!r}. "
+ f"Expected 'AGENCY-ID' format, e.g., 'USGS-01646500'."
+ )
return value
-def _check_id_format(value: str) -> None:
- """Raise ``ValueError`` if ``value`` is not in ``AGENCY-ID`` format."""
- if not _MONITORING_LOCATION_ID_RE.fullmatch(value):
- raise ValueError(
- f"Invalid monitoring_location_id: {value!r}. "
- f"Expected 'AGENCY-ID' format, e.g., 'USGS-01646500'."
- )
-
-
def _get_args(
local_vars: dict[str, Any],
exclude: set[str] | None = None,
diff --git a/dataretrieval/ogc/planning.py b/dataretrieval/ogc/planning.py
index 3e3c1ccf..68c337ae 100644
--- a/dataretrieval/ogc/planning.py
+++ b/dataretrieval/ogc/planning.py
@@ -563,6 +563,56 @@ def _combine_chunk_frames(frames: list[pd.DataFrame]) -> pd.DataFrame:
return combined
+# Response header USGS uses to advertise remaining hourly quota. Lives in this
+# base module so every layer (planning's ``_lowest_remaining``, the engine's
+# per-page progress) reads it from one place rather than hard-coding the string.
+_QUOTA_HEADER = "x-ratelimit-remaining"
+
+
+def _lowest_remaining(responses: list[httpx.Response]) -> httpx.Response:
+ """The response reporting the lowest ``x-ratelimit-remaining``.
+
+ The rate-limit counter decreases monotonically within a window, so the
+ smallest value any sub-request saw is the most-current "quota left after
+ this call" — the right thing to surface. Under concurrent fan-out the
+ last response *by index* need not be the one the server processed last, so
+ pick the minimum (falling back to the last response if none report it).
+ """
+ best: httpx.Response | None = None
+ best_remaining: int | None = None
+ for response in responses:
+ try:
+ remaining = int(response.headers[_QUOTA_HEADER])
+ except (KeyError, ValueError):
+ continue
+ if best_remaining is None or remaining < best_remaining:
+ best, best_remaining = response, remaining
+ return best if best is not None else responses[-1]
+
+
+def _merge_response(
+ base: httpx.Response,
+ *,
+ headers_from: httpx.Response,
+ elapsed: timedelta,
+ url: str | httpx.URL | None = None,
+) -> httpx.Response:
+ """Fold several responses into one: a shallow copy of ``base`` whose
+ ``.headers`` are rebuilt as a fresh ``httpx.Headers`` from ``headers_from``,
+ ``.elapsed`` set to ``elapsed``, and ``.url`` overridden when ``url`` is
+ given. ``base`` and ``headers_from`` are never mutated, and the fresh
+ ``httpx.Headers`` means downstream mutations don't back-propagate into any
+ underlying response — so callers may re-fold idempotently. This is the one
+ low-level merge behind both pagination (:func:`_paginate`) and the chunked /
+ fan-out aggregation (:func:`_combine_chunk_responses`)."""
+ merged = copy.copy(base)
+ merged.headers = httpx.Headers(headers_from.headers)
+ merged.elapsed = elapsed
+ if url is not None:
+ _set_response_url(merged, url)
+ return merged
+
+
def _combine_chunk_responses(
responses: list[httpx.Response], canonical_url: str | None
) -> httpx.Response:
@@ -570,8 +620,9 @@ def _combine_chunk_responses(
Fold per-sub-request responses into a single aggregated response.
For a multi-response input, returns a shallow copy of
- ``responses[0]`` with ``.headers`` set to the last response's (so
- ``x-ratelimit-remaining`` reflects current state), ``.elapsed`` set
+ ``responses[0]`` with ``.headers`` set to those of the most-depleted
+ response (lowest ``x-ratelimit-remaining`` — the quota actually left
+ after the fan-out; see :func:`_lowest_remaining`), ``.elapsed`` set
to total wall-clock across every response, and ``.url`` set to the
canonical original-query URL (when supplied) so ``BaseMetadata``
reflects the user's full request rather than the first chunk.
@@ -605,20 +656,15 @@ def _combine_chunk_responses(
if len(responses) == 1 and canonical_url is None:
return responses[0]
- # ``copy.copy`` lets repeated calls re-sum elapsed from scratch
- # rather than re-mutating ``responses[0]`` in place. The headers
- # dict is then rewrapped in a fresh ``httpx.Headers`` so the
- # aggregate's headers don't share identity with — or leak mutations
- # back into — any underlying response on ``ChunkedCall._chunks``.
- head = copy.copy(responses[0])
- if len(responses) > 1:
- head.headers = httpx.Headers(responses[-1].headers)
- head.elapsed = sum(
- (_safe_elapsed(r) for r in responses[1:]),
- start=_safe_elapsed(responses[0]),
- )
- else:
- head.headers = httpx.Headers(responses[0].headers)
- if canonical_url is not None:
- _set_response_url(head, canonical_url)
- return head
+ # Headers come from the most-depleted response (lowest quota left after a
+ # concurrent fan-out; ``_lowest_remaining`` returns the lone response as-is
+ # for a single-element list). ``_merge_response`` re-sums elapsed onto a
+ # fresh copy, so repeated calls (e.g. via ``ChunkedCall.partial_response``
+ # during resume) stay idempotent.
+ elapsed = sum((_safe_elapsed(r) for r in responses), start=timedelta())
+ return _merge_response(
+ responses[0],
+ headers_from=_lowest_remaining(responses),
+ elapsed=elapsed,
+ url=canonical_url,
+ )
diff --git a/dataretrieval/utils.py b/dataretrieval/utils.py
index e6cce009..8d28ee86 100644
--- a/dataretrieval/utils.py
+++ b/dataretrieval/utils.py
@@ -4,9 +4,12 @@
from __future__ import annotations
+import os
import warnings
-from collections.abc import Iterable
-from typing import Any
+from collections.abc import Callable, Iterable, Iterator
+from contextlib import contextmanager
+from contextvars import ContextVar
+from typing import Any, Generic, TypeVar
import httpx
import pandas as pd
@@ -28,6 +31,69 @@
"timeout": httpx.Timeout(60.0, connect=10.0),
}
+_T = TypeVar("_T")
+
+
+class Ambient(Generic[_T]):
+ """A :class:`~contextvars.ContextVar` paired with a scoping contextmanager.
+
+ Bundles the var and its set/reset-token dance into one object, so an ambient
+ value needs a single declaration instead of a ``var`` + setter-function pair.
+ Read the current value with :meth:`get`; set it for a ``with`` block by
+ *calling* the instance — the previous value is restored on exit (and can't
+ leak into a later call the way a hand-written ``try/finally`` can when its
+ ``reset`` is dropped)::
+
+ _base_url = Ambient("ogc_base_url", DEFAULT)
+ with _base_url(other): # scoped to the block
+ _base_url.get() # -> other
+ """
+
+ def __init__(self, name: str, default: _T) -> None:
+ self._var: ContextVar[_T] = ContextVar(name, default=default)
+
+ def get(self) -> _T:
+ """The current value — the default outside any active scope."""
+ return self._var.get()
+
+ @contextmanager
+ def __call__(self, value: _T) -> Iterator[None]:
+ """Set the value for the duration of the ``with`` block."""
+ token = self._var.set(value)
+ try:
+ yield
+ finally:
+ self._var.reset(token)
+
+
+def _default_headers() -> dict[str, str]:
+ """Build the default HTTP headers for a USGS web-API request.
+
+ Always sets a descriptive ``User-Agent`` plus ``Accept`` /
+ ``Accept-Encoding`` and ``lang``. If the ``API_USGS_PAT`` environment
+ variable is set, its value is added as the ``X-Api-Key`` header — a USGS
+ personal access token raises the request rate limit.
+
+ Shared by the OGC engine (:mod:`dataretrieval.ogc`), the Water Data getters
+ (:mod:`dataretrieval.waterdata`), and :mod:`dataretrieval.wateruse`, so the
+ request identity is consistent across every USGS API the package talks to.
+
+ Returns
+ -------
+ dict[str, str]
+ Headers suitable for an ``httpx`` request against a USGS API.
+ """
+ headers = {
+ "Accept-Encoding": "compress, gzip",
+ "Accept": "application/json",
+ "User-Agent": f"python-dataretrieval/{dataretrieval.__version__}",
+ "lang": "en-US",
+ }
+ token = os.getenv("API_USGS_PAT")
+ if token:
+ headers["X-Api-Key"] = token
+ return headers
+
def to_str(listlike: object, delimiter: str = ",") -> str | None:
"""Translates list-like objects into strings.
@@ -303,26 +369,38 @@ def _get(url: str | httpx.URL, **kwargs: Any) -> httpx.Response:
raise _network_error(url, exc) from exc
-def _raise_for_status(response: httpx.Response) -> None:
+def _raise_for_status(
+ response: httpx.Response,
+ *,
+ detail_from: Callable[[httpx.Response], str | None] | None = None,
+) -> None:
"""Raise the typed :class:`DataRetrievalError` for an HTTP error response;
return ``None`` on success.
- Shared by the legacy :func:`query` path (and ``streamstats``).
- Delegates the status-to-type mapping to
+ Shared by the legacy :func:`query` path (and ``streamstats`` /
+ ``wateruse``). Delegates the status-to-type mapping to
:func:`dataretrieval.exceptions.error_for_status`, except a too-long-URL
status (413 / 414): that gets the same actionable "split your query"
remediation as the client-side over-long-URL case below, rather than a bare
``HTTP 414`` (both still raise :class:`~dataretrieval.exceptions.URLTooLong`).
+
+ ``detail_from``, when given, is called *only on an error response* to pull an
+ API-specific detail string (e.g. a JSON error envelope's message) out of the
+ body; a truthy result is appended to the raised message. This lets callers
+ surface their API's error wording without re-implementing the status-to-type
+ mapping and message format.
"""
status = response.status_code
if status < 400:
return
if status in (413, 414):
raise _url_too_long_error(f"API response reason: {response.reason_phrase}")
- raise error_for_status(
- status,
- f"HTTP {status} {response.reason_phrase}".rstrip() + f" (URL: {response.url})",
- )
+ message = f"HTTP {status} {response.reason_phrase}".rstrip()
+ detail = detail_from(response) if detail_from is not None else None
+ if detail:
+ message += f": {detail}"
+ message += f" (URL: {response.url})"
+ raise error_for_status(status, message)
def query(
diff --git a/dataretrieval/waterdata/stats.py b/dataretrieval/waterdata/stats.py
index 72e5a884..5514bb50 100644
--- a/dataretrieval/waterdata/stats.py
+++ b/dataretrieval/waterdata/stats.py
@@ -18,7 +18,6 @@
from dataretrieval.ogc.engine import (
BASE_URL,
- _default_headers,
_paginate,
_run_sync,
)
@@ -27,7 +26,7 @@
_attach_coordinates,
_empty_feature_frame,
)
-from dataretrieval.utils import BaseMetadata
+from dataretrieval.utils import BaseMetadata, _default_headers
# ``_handle_nesting``'s geopandas branch calls ``gpd.GeoDataFrame.from_features``
# directly, so this module needs its own bound ``gpd`` name. Import it under the
diff --git a/dataretrieval/waterdata/utils.py b/dataretrieval/waterdata/utils.py
index 65f9ea2f..5cdbc320 100644
--- a/dataretrieval/waterdata/utils.py
+++ b/dataretrieval/waterdata/utils.py
@@ -35,12 +35,10 @@
OGC_API_URL,
OgcDialect,
_as_str_list,
- _check_id_format,
_check_monitoring_location_id,
_check_ogc_requests,
_construct_api_requests,
_construct_cql_request,
- _default_headers,
_next_req_url,
_normalize_str_iterable,
_paginate,
@@ -68,7 +66,7 @@
from dataretrieval.ogc.shaping import (
_finalize_ogc as _engine_finalize_ogc,
)
-from dataretrieval.utils import BaseMetadata
+from dataretrieval.utils import BaseMetadata, _default_headers
from dataretrieval.waterdata.types import (
PROFILE_LOOKUP,
PROFILES,
@@ -363,7 +361,6 @@ def wrapper(*args: Any, **kwargs: Any) -> _R:
"_accept_legacy_kwargs",
"_arrange_cols",
"_as_str_list",
- "_check_id_format",
"_check_monitoring_location_id",
"_check_ogc_requests",
"_check_profiles",
diff --git a/dataretrieval/wateruse.py b/dataretrieval/wateruse.py
new file mode 100644
index 00000000..635a97fb
--- /dev/null
+++ b/dataretrieval/wateruse.py
@@ -0,0 +1,415 @@
+"""Retrieve USGS water-use data from the National Water Availability
+Assessment Data Companion (NWDC).
+
+The NWDC web services provide national-scale, USGS-modeled water-use data that
+underlie the `USGS National Water Availability Assessment
+`_. Estimates are served on a HUC12
+(12-digit hydrologic unit) spatial grid and can be queried for any county,
+state, or hydrologic unit. This is the modern replacement for the defunct
+legacy NWIS water-use service (``nwis.get_water_use``).
+
+Unlike the main Water Data getters (:mod:`dataretrieval.waterdata`) and NGWMN
+(:mod:`dataretrieval.ngwmn`), the NWDC is a plain CSV REST service rather than
+an OGC API Features collection. This module supplies the NWDC-specific bits —
+request building, CSV parsing, the ``Link``-header cursor, and the ``{detail}``
+error envelope — but reuses the OGC engine's generic, API-agnostic pagination
+and sync-from-async plumbing (:func:`~dataretrieval.ogc.engine._paginate` and
+:func:`~dataretrieval.ogc.engine._run_sync`) rather than re-implementing it. It
+follows the same conventions: shared request headers
+(:func:`~dataretrieval.utils._default_headers`), the typed
+:class:`~dataretrieval.exceptions.DataRetrievalError` taxonomy, and a
+``(DataFrame, BaseMetadata)`` return.
+
+See https://api.water.usgs.gov/docs/nwaa-data/ for the API reference and
+https://water.usgs.gov/nwaa-data/ for the catalog of available models and
+variables.
+
+Examples
+--------
+.. code-block:: python
+
+ from dataretrieval import wateruse
+
+ # Monthly public-supply withdrawals for Rhode Island, 2020 onward.
+ df, md = wateruse.get_wateruse(
+ model="wu-public-supply-wd",
+ variable=["pswdtot", "pswdgw", "pswdsw"],
+ state="RI",
+ start_date="2020-01",
+ time_resolution="monthly",
+ )
+
+"""
+
+from __future__ import annotations
+
+import asyncio
+import io
+from collections.abc import Callable, Iterable
+from typing import Any
+
+import httpx
+import pandas as pd
+
+from dataretrieval.codes.states import to_state
+from dataretrieval.exceptions import DataRetrievalError
+from dataretrieval.ogc.engine import _paginate, _run_sync
+from dataretrieval.ogc.planning import _combine_chunk_frames, _combine_chunk_responses
+from dataretrieval.utils import (
+ HTTPX_DEFAULTS,
+ BaseMetadata,
+ _default_headers,
+ _raise_for_status,
+ to_str,
+)
+
+WATERUSE_URL = "https://api.water.usgs.gov/nwaa-data/data"
+
+#: Water-use models (categories) served by the NWDC. The catalog at
+#: https://water.usgs.gov/nwaa-data/ lists the variables available within each.
+MODELS = (
+ "wu-public-supply-wd", # public-supply withdrawals
+ "wu-public-supply-cu", # public-supply consumptive use
+ "wu-thermoelectric", # thermoelectric-power water use
+ "wu-irrigation-wd", # irrigation withdrawals
+ "wu-irrigation-cu", # irrigation consumptive use
+)
+
+#: Temporal resolutions: monthly, annual calendar year, annual water year.
+TIME_RESOLUTIONS = ("monthly", "annualcy", "annualwy")
+
+#: Maximum locations fetched concurrently when a list of state/county/huc
+#: selectors is fanned out (one request per location). Kept conservative
+#: because this module intentionally carries no request backoff/retry; the
+#: NWDC tolerates this level of concurrency without rate-limit errors (verified
+#: by stress test). Set ``wateruse.MAX_CONCURRENT_REQUESTS = 1`` for serial.
+MAX_CONCURRENT_REQUESTS = 4
+
+# Page responses carry the HUC12 identifier in this column; it must stay a
+# string so leading zeros (e.g. "010900020502") survive the round trip.
+_HUC12_COLUMN = "huc12_id"
+
+
+def get_wateruse(
+ model: str,
+ variable: str | Iterable[str] | None = None,
+ state: str | int | Iterable[str | int] | None = None,
+ county: str | Iterable[str] | None = None,
+ huc: str | Iterable[str] | None = None,
+ time_resolution: str | None = None,
+ start_date: str | None = None,
+ end_date: str | None = None,
+ intersection: str = "overlap",
+ limit: int = 600,
+ ssl_check: bool = True,
+) -> tuple[pd.DataFrame, BaseMetadata]:
+ """Get USGS water-use data from the NWDC web service.
+
+ Retrieves modeled water-use estimates from the USGS National Water
+ Availability Assessment Data Companion. The area is given as exactly one of
+ ``state``, ``county``, or ``huc``; results are always returned on a HUC12
+ grid, in a long (tidy) frame with one row per HUC12 and time step. Large
+ areas (e.g. a whole region or a populous state) are served across multiple
+ pages, which this function follows transparently and concatenates into one
+ frame.
+
+ Each selector also accepts a list of values. The NWDC queries one area per
+ request, so a list is fanned out into one request per value — up to
+ :data:`MAX_CONCURRENT_REQUESTS` in parallel — and the results are
+ concatenated in the order given.
+
+ Parameters
+ ----------
+ model : string
+ Water-use category to query. See :data:`MODELS` for the available
+ options (e.g. ``"wu-public-supply-wd"``). The full catalog of models
+ and their variables is at https://water.usgs.gov/nwaa-data/.
+ variable : string or iterable of strings, optional
+ One or more variable IDs within ``model`` (e.g. ``"pswdtot"`` for total
+ public-supply withdrawals, or ``["pswdgw", "pswdsw"]`` for the
+ groundwater and surface-water components). Multiple variables are
+ comma-joined into a single request. The service requires at least one
+ variable; omitting it returns a 400 listing the model's valid variable
+ IDs (surfaced as a :class:`~dataretrieval.exceptions.DataRetrievalError`).
+ state : string, int, or iterable, optional
+ One or more US states/territories to query. Each accepts a full name
+ (``"Wisconsin"``), a two-letter postal code (``"WI"``), or a two-digit
+ ANSI/FIPS code (``"55"`` or ``55``), mirroring
+ :func:`dataretrieval.ngwmn.get_sites`.
+ county : string or iterable, optional
+ One or more five-digit county FIPS codes — state FIPS + county FIPS,
+ e.g. ``"55025"`` for Dane County, Wisconsin.
+ huc : string or iterable, optional
+ One or more hydrologic unit codes. Each code's level is taken from its
+ length: a 2-digit code queries a HUC2 region, 8-digit a HUC8 subbasin,
+ 12-digit a single HUC12, and so on (even lengths 2-12, e.g. ``"04"``,
+ ``"07070005"``, ``"010900020502"``).
+
+ Provide exactly one of ``state``, ``county``, or ``huc`` (each may be a
+ single value or a list).
+ time_resolution : string, optional
+ Temporal resolution: ``"monthly"``, ``"annualcy"`` (annual, calendar
+ year), or ``"annualwy"`` (annual, water year). See
+ :data:`TIME_RESOLUTIONS`.
+ start_date : string, optional
+ Start of the query window, formatted ``"YYYY"`` for annual data or
+ ``"YYYY-MM"`` for monthly data.
+ end_date : string, optional
+ End of the query window, in the same format as ``start_date``.
+ intersection : string, optional
+ How to select HUC12s that straddle the queried-area boundary:
+ ``"overlap"`` (any overlap, the default) or ``"envelop"`` (fully
+ enclosed).
+ limit : int, optional
+ Maximum number of HUC12s returned per page. Queries spanning more than
+ ``limit`` HUC12s are split across pages and reassembled. Default 600.
+ ssl_check : bool, optional
+ If True (default), verify SSL certificates; set False to skip
+ verification (e.g. behind a TLS-intercepting proxy).
+
+ Returns
+ -------
+ df : ``pandas.DataFrame``
+ Water-use estimates in long form: a ``huc12_id`` column (string,
+ leading zeros preserved), a time column (``year_month`` for monthly
+ data or ``year`` for annual data), and one value column per requested
+ variable (suffixed with its unit, e.g. ``pswdtot_mgd`` for million
+ gallons per day).
+ md : :class:`dataretrieval.utils.BaseMetadata`
+ Metadata describing the request (URL, query time, response headers).
+
+ Raises
+ ------
+ ValueError
+ If not exactly one of ``state``, ``county``, or ``huc`` is given, or a
+ given selector is malformed (an unrecognized state, a county code that
+ is not five digits, or a HUC of invalid length).
+ DataRetrievalError
+ On an HTTP error response, the typed subclass for the status (see
+ :func:`dataretrieval.exceptions.error_for_status`); or
+ :class:`~dataretrieval.exceptions.NetworkError` on a connection-level
+ failure (timeout, DNS).
+
+ Examples
+ --------
+ .. doctest::
+ :skipif: True # network
+
+ >>> from dataretrieval import wateruse
+ >>> df, md = wateruse.get_wateruse(
+ ... model="wu-public-supply-wd",
+ ... variable=["pswdtot", "pswdgw", "pswdsw"],
+ ... state="RI",
+ ... start_date="2020-01",
+ ... time_resolution="monthly",
+ ... )
+
+ """
+ # The public parameters are idiomatic snake_case (consistent with
+ # ``waterdata.get_samples``); the NWDC service expects compact lowercase
+ # query names, so map to those here as the request is built.
+ base_params: dict[str, Any] = {
+ "format": "csv",
+ "model": model,
+ "variable": to_str(variable),
+ "timeres": time_resolution,
+ "startdate": start_date,
+ "enddate": end_date,
+ "intersection": intersection,
+ "limit": limit,
+ }
+ # Drop params the caller left unset; the service rejects empty values.
+ base_params = {k: v for k, v in base_params.items() if v is not None}
+
+ # The NWDC queries one location per request, so fan a multi-value selector
+ # out into one request per location, each paginated by the OGC engine's
+ # shared pager (``_paginate``), and concatenate the results.
+ headers = _default_headers()
+ requests = [
+ httpx.Request(
+ "GET",
+ WATERUSE_URL,
+ params={**base_params, "location": location},
+ headers=headers,
+ )
+ for location in _resolve_locations(state, county, huc)
+ ]
+ # ``_run_sync`` drives the async fan-out via an anyio portal, so it is safe
+ # even inside an already-running event loop (e.g. a Jupyter notebook).
+ # ``error_url`` is the host reported in any connection-error message (this
+ # module builds its own requests, so it has no OGC request-builder base).
+ df, response = _run_sync(
+ lambda: _fan_out(requests, headers, ssl_check),
+ service="wateruse",
+ error_url=WATERUSE_URL,
+ )
+ return df, BaseMetadata(response)
+
+
+# Valid HUC code lengths (digits) → the hydrologic-unit level they query.
+_HUC_LENGTHS = (2, 4, 6, 8, 10, 12)
+
+# Maps each selector to the NWDC ``location=:`` value(s) it produces.
+# A value may be a single code or a list; ``_as_list`` normalizes both (``state``
+# additionally normalizes to the two-letter postal code, and ``to_state`` may
+# itself return a scalar or list, which ``_as_list`` flattens the same way).
+# Since NWDC takes one location per request, a list value fans out — one request
+# per location (see :func:`_fan_out`).
+_LOCATION_BUILDERS: dict[str, Callable[[Any], list[str]]] = {
+ "state": lambda v: [f"stateCd:{c}" for c in _as_list(to_state(v, to="postal"))],
+ "county": lambda v: [f"countyCd:{_validate_county(c)}" for c in _as_list(v)],
+ "huc": lambda v: [f"huc{len(c)}:{c}" for c in map(_validate_huc, _as_list(v))],
+}
+
+
+def _resolve_locations(
+ state: str | int | Iterable[str | int] | None,
+ county: str | Iterable[str] | None,
+ huc: str | Iterable[str] | None,
+) -> list[str]:
+ """Build the NWDC ``location=:`` value(s) from the selectors.
+
+ Exactly one of ``state`` / ``county`` / ``huc`` must be given; each may be a
+ single value or a list. ``state`` is normalized to the two-letter postal
+ code ``stateCd`` requires; ``county`` is a five-digit FIPS code; and a
+ ``huc`` code's length selects its level (``huc2`` … ``huc12``). Returns one
+ location string per value — the caller issues one request per location.
+ """
+ selected = {
+ name: value
+ for name, value in (("state", state), ("county", county), ("huc", huc))
+ if value is not None
+ }
+ if len(selected) != 1:
+ raise ValueError(
+ "Specify exactly one of state, county, or huc "
+ f"(got: {', '.join(selected) or 'none'})."
+ )
+ [(name, value)] = selected.items()
+ locations = _LOCATION_BUILDERS[name](value)
+ if not locations:
+ raise ValueError(
+ "The chosen location selector is empty; pass at least one value."
+ )
+ return locations
+
+
+def _as_list(value: object) -> list[Any]:
+ """A scalar becomes a one-element list; any non-string iterable (list,
+ tuple, Series, ndarray, generator) is materialized to a list. A string is
+ treated as a scalar so it isn't exploded into characters."""
+ if isinstance(value, Iterable) and not isinstance(value, str):
+ return list(value)
+ return [value]
+
+
+def _validate_county(value: object) -> str:
+ """Validate and normalize a five-digit state+county FIPS code."""
+ code = str(value).strip()
+ if not (code.isdigit() and len(code) == 5):
+ raise ValueError(
+ "county must be a five-digit state+county FIPS code "
+ f"(e.g. '55025'), got {value!r}."
+ )
+ return code
+
+
+def _validate_huc(value: object) -> str:
+ """Validate a HUC code (even length 2-12 digits; level set by length)."""
+ code = str(value).strip()
+ if not (code.isdigit() and len(code) in _HUC_LENGTHS):
+ raise ValueError(
+ "huc must be a hydrologic unit code of even length 2-12 digits "
+ f"(e.g. '04', '07070005', '010900020502'), got {value!r}."
+ )
+ return code
+
+
+async def _fan_out(
+ requests: list[httpx.Request], headers: dict[str, str], ssl_check: bool
+) -> tuple[pd.DataFrame, httpx.Response]:
+ """Fetch every request (each paginated) concurrently over one shared client.
+
+ Each request is paginated by the engine's
+ :func:`~dataretrieval.ogc.engine._paginate` with NWDC strategies: parse a CSV
+ page and read its ``Link`` header cursor (``parse``), follow that cursor
+ (``follow``), and raise the typed error carrying the NWDC ``detail``
+ (``raise_for_status``). Concurrency is bounded by a semaphore at
+ :data:`MAX_CONCURRENT_REQUESTS`, and ``asyncio.gather`` preserves input
+ order, so the concatenation is deterministic. The shared
+ :class:`httpx.AsyncClient` keeps connections alive across pages and requests.
+ """
+
+ def parse(response: httpx.Response) -> tuple[pd.DataFrame, str | None]:
+ return _read_csv_page(response), _next_page_url(response)
+
+ async def follow(cursor: str, sess: httpx.AsyncClient) -> httpx.Response:
+ return await sess.get(cursor, headers=headers)
+
+ def raise_for_status(response: httpx.Response) -> None:
+ _raise_for_status(response, detail_from=_nwdc_error_detail)
+
+ async with httpx.AsyncClient(verify=ssl_check, **HTTPX_DEFAULTS) as client:
+ semaphore = asyncio.Semaphore(max(1, MAX_CONCURRENT_REQUESTS))
+
+ async def _one(request: httpx.Request) -> tuple[pd.DataFrame, httpx.Response]:
+ async with semaphore:
+ return await _paginate(
+ request,
+ parse_response=parse,
+ follow_up=follow,
+ client=client,
+ raise_for_status=raise_for_status,
+ )
+
+ results = await asyncio.gather(*(_one(req) for req in requests))
+
+ # Reuse the engine's combine helpers: drop empty frames and concat, and fold
+ # the per-location responses into one (lowest-remaining rate-limit headers +
+ # cumulative elapsed), keeping the first request's URL as the query identity.
+ frames = [frame for frame, _ in results]
+ responses = [resp for _, resp in results]
+ return _combine_chunk_frames(frames), _combine_chunk_responses(
+ responses, str(requests[0].url)
+ )
+
+
+def _read_csv_page(response: httpx.Response) -> pd.DataFrame:
+ """Parse one CSV page; ``huc12_id`` stays a string to keep leading zeros."""
+ try:
+ return pd.read_csv(io.BytesIO(response.content), dtype={_HUC12_COLUMN: str})
+ except pd.errors.EmptyDataError as exc:
+ # NWDC normally signals "no data" with a 400 (handled above) or rows of
+ # zeros, never an empty body — but keep the typed-error contract if it
+ # ever returns one rather than leaking a bare pandas exception.
+ raise DataRetrievalError(
+ f"NWDC returned an empty response body (URL: {response.url})."
+ ) from exc
+
+
+def _next_page_url(response: httpx.Response) -> str | None:
+ """Return the absolute URL of the next page, or None if this is the last.
+
+ Reads the standard ``Link: <...>; rel="next"`` header (parsed by httpx into
+ ``response.links``). A next link served against the bare ``water.usgs.gov``
+ host is normalized to the public ``api.water.usgs.gov`` gateway so the
+ follow-up request reaches the API.
+ """
+ url = response.links.get("next", {}).get("url")
+ if not url:
+ return None
+ return url.replace("https://water.usgs.gov", "https://api.water.usgs.gov", 1)
+
+
+def _nwdc_error_detail(response: httpx.Response) -> str | None:
+ """Pull the ``detail`` message out of an NWDC JSON error envelope, if any.
+
+ The NWDC reports errors as ``{"detail": "Invalid model name: ..."}``. Passed
+ to :func:`~dataretrieval.utils._raise_for_status` as ``detail_from`` so the
+ service's wording surfaces in the typed error message.
+ """
+ try:
+ body = response.json()
+ except ValueError:
+ return None
+ return body.get("detail") if isinstance(body, dict) else None
diff --git a/demos/USGS_WaterUse_Examples.ipynb b/demos/USGS_WaterUse_Examples.ipynb
new file mode 100644
index 00000000..0017f048
--- /dev/null
+++ b/demos/USGS_WaterUse_Examples.ipynb
@@ -0,0 +1,228 @@
+{
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "# USGS Water-Use Data (NWDC)\n",
+ "\n",
+ "The USGS [National Water Availability Assessment Data Companion\n",
+ "(NWDC)](https://water.usgs.gov/nwaa-data/) serves national-scale, *modeled*\n",
+ "water-use estimates that underlie the USGS National Water Availability\n",
+ "Assessment. Estimates are produced on a **HUC12** (12-digit hydrologic unit)\n",
+ "grid and can be queried for any county, state, or hydrologic unit. They are\n",
+ "the modern replacement for the retired legacy NWIS water-use service.\n",
+ "\n",
+ "`dataretrieval` exposes the service through a single function,\n",
+ "`wateruse.get_wateruse`, which returns a tidy `pandas.DataFrame` plus a\n",
+ "metadata object. Available **models** (categories) include:\n",
+ "\n",
+ "| model | description |\n",
+ "| --- | --- |\n",
+ "| `wu-public-supply-wd` | public-supply withdrawals |\n",
+ "| `wu-public-supply-cu` | public-supply consumptive use |\n",
+ "| `wu-irrigation-wd` | irrigation withdrawals |\n",
+ "| `wu-irrigation-cu` | irrigation consumptive use |\n",
+ "| `wu-thermoelectric` | thermoelectric-power water use |\n",
+ "\n",
+ "Each model exposes its own set of **variables**; see the\n",
+ "[NWDC data catalog](https://water.usgs.gov/nwaa-data/) for the full list.\n",
+ "\n",
+ "## A motivating question\n",
+ "\n",
+ "> *Where does Wisconsin's public water supply come from — groundwater or\n",
+ "> surface water — and when through the year is demand highest?*\n",
+ "\n",
+ "Answering it is a typical water-use workflow: pull a state's monthly\n",
+ "withdrawals split by source, aggregate the HUC12 grid to a statewide time\n",
+ "series, and look at the seasonal cycle and the groundwater/surface-water\n",
+ "split."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "%matplotlib inline\n",
+ "\n",
+ "import matplotlib.pyplot as plt\n",
+ "import pandas as pd\n",
+ "\n",
+ "from dataretrieval import wateruse"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## 1. Retrieve the data\n",
+ "\n",
+ "We request three variables for the public-supply-withdrawals model:\n",
+ "`pswdtot` (total), `pswdgw` (groundwater source), and `pswdsw` (surface-water\n",
+ "source), for every HUC12 in Wisconsin, monthly, for calendar year 2020.\n",
+ "\n",
+ "Wisconsin spans more HUC12s than fit on a single page, so the service returns\n",
+ "the result across several pages — `get_wateruse` follows the pagination links\n",
+ "and stitches the pages into one frame for you."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "df, md = wateruse.get_wateruse(\n",
+ " model=\"wu-public-supply-wd\",\n",
+ " variable=[\"pswdtot\", \"pswdgw\", \"pswdsw\"],\n",
+ " state=\"WI\",\n",
+ " start_date=\"2020-01\",\n",
+ " end_date=\"2020-12\",\n",
+ " time_resolution=\"monthly\",\n",
+ ")\n",
+ "\n",
+ "print(f\"{len(df):,} rows across {df['huc12_id'].nunique():,} HUC12 watersheds\")\n",
+ "df.head()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "The frame is in **long (tidy) form**: one row per HUC12 and month. The\n",
+ "`huc12_id` column is kept as a string so leading zeros are preserved, and each\n",
+ "value column carries its unit — here `_mgd`, million gallons per day."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## 2. Aggregate to a statewide monthly series\n",
+ "\n",
+ "Summing across every HUC12 gives the statewide withdrawal for each month."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "statewide = df.groupby(\"year_month\")[[\"pswdtot_mgd\", \"pswdgw_mgd\", \"pswdsw_mgd\"]].sum()\n",
+ "statewide.index = pd.to_datetime(statewide.index)\n",
+ "statewide"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## 3. The seasonal demand cycle\n",
+ "\n",
+ "Public-supply withdrawals peak in summer, when outdoor use (lawns, gardens,\n",
+ "recreation) adds to baseline indoor demand."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "fig, ax = plt.subplots(figsize=(9, 4))\n",
+ "ax.plot(statewide.index, statewide[\"pswdtot_mgd\"], marker=\"o\", color=\"#1f77b4\")\n",
+ "ax.set_title(\"Wisconsin public-supply withdrawals, 2020\")\n",
+ "ax.set_ylabel(\"Withdrawal (million gallons/day)\")\n",
+ "ax.set_xlabel(\"Month\")\n",
+ "ax.grid(True, alpha=0.3)\n",
+ "fig.tight_layout()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## 4. Groundwater vs. surface water\n",
+ "\n",
+ "Because total withdrawal is the sum of its groundwater and surface-water\n",
+ "components, a stacked area shows how each source contributes through the year."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "gw_share = statewide[\"pswdgw_mgd\"].sum() / statewide[\"pswdtot_mgd\"].sum()\n",
+ "print(f\"Groundwater supplies {gw_share:.0%} of Wisconsin's public water supply\")\n",
+ "\n",
+ "fig, ax = plt.subplots(figsize=(9, 4))\n",
+ "ax.stackplot(\n",
+ " statewide.index,\n",
+ " statewide[\"pswdgw_mgd\"],\n",
+ " statewide[\"pswdsw_mgd\"],\n",
+ " labels=[\"Groundwater\", \"Surface water\"],\n",
+ " colors=[\"#8c6d31\", \"#1f77b4\"],\n",
+ " alpha=0.85,\n",
+ ")\n",
+ "ax.set_title(\"Source of Wisconsin public-supply withdrawals, 2020\")\n",
+ "ax.set_ylabel(\"Withdrawal (million gallons/day)\")\n",
+ "ax.set_xlabel(\"Month\")\n",
+ "ax.legend(loc=\"upper left\")\n",
+ "fig.tight_layout()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Notes\n",
+ "\n",
+ "- **Spatial resolution.** Estimates are always returned on the HUC12 grid,\n",
+ " regardless of the area you query. Joining `huc12_id` to a HUC12\n",
+ " boundary layer (for example with `geopandas`) lets you map the results.\n",
+ "- **Pick one selector.** Use exactly one of `state`, `county` (5-digit FIPS),\n",
+ " or `huc`. `state` accepts a name, postal code, or FIPS; a `huc` code's length\n",
+ " sets its level (`huc=\"04\"` → HUC2, `huc=\"07070005\"` → HUC8). Each selector\n",
+ " also accepts a list (e.g. `state=[\"WI\", \"MN\"]`), fanned out into one request\n",
+ " per area and concatenated.\n",
+ "- **Large areas paginate.** A query spanning more than `limit` HUC12s (600 by\n",
+ " default) is split across pages; `get_wateruse` follows the links and returns\n",
+ " a single concatenated frame. Whole-region queries (e.g. `huc=\"04\"`)\n",
+ " may return many pages and take longer.\n",
+ "- **Annual data.** Set `time_resolution=\"annualcy\"` (calendar year) or `\"annualwy\"`\n",
+ " (water year) and use four-digit years for `start_date`/`end_date`; the time\n",
+ " column comes back as `year` instead of `year_month`.\n",
+ "- **Other models.** Swap `model` and `variable` to retrieve irrigation\n",
+ " (`wu-irrigation-wd`) or thermoelectric (`wu-thermoelectric`) water use. See\n",
+ " the [NWDC data catalog](https://water.usgs.gov/nwaa-data/) for the variables\n",
+ " available in each."
+ ]
+ }
+ ],
+ "metadata": {
+ "kernelspec": {
+ "display_name": "Python 3",
+ "language": "python",
+ "name": "python3"
+ },
+ "language_info": {
+ "codemirror_mode": {
+ "name": "ipython",
+ "version": 3
+ },
+ "file_extension": ".py",
+ "mimetype": "text/x-python",
+ "name": "python",
+ "nbconvert_exporter": "python",
+ "pygments_lexer": "ipython3",
+ "version": "3.12.10"
+ }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 4
+}
diff --git a/docs/source/reference/index.rst b/docs/source/reference/index.rst
index 0d5b9b6d..13c44963 100644
--- a/docs/source/reference/index.rst
+++ b/docs/source/reference/index.rst
@@ -14,4 +14,5 @@ API reference
streamstats
utils
waterdata
+ wateruse
wqp
diff --git a/docs/source/reference/wateruse.rst b/docs/source/reference/wateruse.rst
new file mode 100644
index 00000000..db4e4962
--- /dev/null
+++ b/docs/source/reference/wateruse.rst
@@ -0,0 +1,7 @@
+.. _wateruse:
+
+dataretrieval.wateruse
+----------------------
+
+.. automodule:: dataretrieval.wateruse
+ :members:
diff --git a/tests/waterdata_chunking_test.py b/tests/waterdata_chunking_test.py
index ca3be951..5a946946 100644
--- a/tests/waterdata_chunking_test.py
+++ b/tests/waterdata_chunking_test.py
@@ -40,7 +40,6 @@
from dataretrieval.ogc import chunking as _chunking
from dataretrieval.ogc import retry as _retry_mod
from dataretrieval.ogc.chunking import (
- _QUOTA_HEADER,
ChunkedCall,
_chunked_client,
get_active_client,
@@ -55,6 +54,7 @@
_LIST_SEP,
_NEVER_CHUNK,
_OR_SEP,
+ _QUOTA_HEADER,
ChunkPlan,
_combine_chunk_frames,
_combine_chunk_responses,
@@ -1040,7 +1040,8 @@ def test_combine_chunk_responses_returns_independent_headers():
)
head = _combine_chunk_responses([r0, r1], canonical_url=None)
- # Aggregate carries the last chunk's headers...
+ # Aggregate carries a chunk's headers (here the last, as the fallback when
+ # neither reports a rate limit)...
assert head.headers["X-Foo"] == "1"
# ...but mutating the aggregate must not back-propagate.
head.headers["X-Trace-Id"] = "abc"
@@ -1048,6 +1049,24 @@ def test_combine_chunk_responses_returns_independent_headers():
assert "X-Trace-Id" not in r0.headers
+def test_combine_chunk_responses_surfaces_lowest_remaining():
+ """``x-ratelimit-remaining`` reports the LOWEST any sub-request saw — the
+ quota actually left after the fan-out — not the last-by-index, which under
+ concurrency need not be the response the server processed last."""
+ r0 = mock.Mock(
+ elapsed=datetime.timedelta(seconds=0.1),
+ headers={"x-ratelimit-remaining": "5"}, # lowest, but first by index
+ url="u0",
+ )
+ r1 = mock.Mock(
+ elapsed=datetime.timedelta(seconds=0.2),
+ headers={"x-ratelimit-remaining": "99"}, # last by index, but higher
+ url="u1",
+ )
+ head = _combine_chunk_responses([r0, r1], canonical_url=None)
+ assert head.headers["x-ratelimit-remaining"] == "5"
+
+
def test_paginate_terminates_on_empty_string_cursor():
"""``_paginate``'s loop predicate is ``while cursor is not None``.
Parse-response wrappers in ``_walk_pages`` / ``stats.get_data``
diff --git a/tests/waterdata_test.py b/tests/waterdata_test.py
index 06289ce4..12727fc6 100644
--- a/tests/waterdata_test.py
+++ b/tests/waterdata_test.py
@@ -1002,8 +1002,8 @@ def test_get_daily_malformed_id_raises(self):
def test_per_item_format_check_in_list(self):
"""The AGENCY-ID format check runs on EVERY element of an
iterable, not just the first. Regression guard against a
- future ``_check_id_format`` loop that bails after one valid
- item or only checks the head."""
+ future ``_check_monitoring_location_id`` loop that bails after one
+ valid item or only checks the head."""
with pytest.raises(ValueError, match="Invalid monitoring_location_id"):
_check_monitoring_location_id(["USGS-01646500", "badformat"])
diff --git a/tests/waterdata_utils_test.py b/tests/waterdata_utils_test.py
index 4d568d1f..b6fd2984 100644
--- a/tests/waterdata_utils_test.py
+++ b/tests/waterdata_utils_test.py
@@ -379,7 +379,7 @@ def test_next_req_url_stops_when_no_features():
def test_walk_pages_does_not_mutate_initial_response():
"""The aggregated response returned from ``_walk_pages`` is built
- via ``_aggregate_paginated_response``, which returns a fresh copy.
+ via ``_merge_response``, which returns a fresh copy.
Any caller that inspected ``initial_response.headers`` /
``.elapsed`` before pagination completed (a Session response hook,
a logging middleware) must continue to see the original first-page
diff --git a/tests/wateruse_test.py b/tests/wateruse_test.py
new file mode 100644
index 00000000..031aad74
--- /dev/null
+++ b/tests/wateruse_test.py
@@ -0,0 +1,416 @@
+"""Offline tests for :mod:`dataretrieval.wateruse`.
+
+All HTTP is mocked with ``pytest-httpx``; no live calls (per AGENTS.md).
+"""
+
+import re
+from urllib.parse import parse_qs, urlsplit
+
+import httpx
+import pandas as pd
+import pytest
+
+import dataretrieval
+from dataretrieval import wateruse
+from dataretrieval.utils import BaseMetadata
+from dataretrieval.wateruse import _next_page_url, _resolve_locations, get_wateruse
+
+# Match the NWDC endpoint regardless of query string, so assertions can drill
+# into the captured params without coupling registration to param order.
+WU_RE = re.compile(r"^https://api\.water\.usgs\.gov/nwaa-data/data(\?.*)?$")
+
+# A single-page monthly CSV: two HUC12s (one with a leading zero), three months.
+_CSV_PAGE = """\
+huc12_id,year_month,pswdgw_mgd,pswdsw_mgd,pswdtot_mgd
+010900020502,2020-01,0.0,0.8313625,0.8313625
+010900020502,2020-02,0.0,0.8977986,0.8977986
+180600060101,2020-01,1.5,0.5,2.0
+"""
+
+# Two pages used for pagination tests; each page is its own CSV (own header).
+_CSV_P1 = """\
+huc12_id,year_month,pswdtot_mgd
+010900020502,2020-01,0.8313625
+010900020503,2020-01,0.0
+"""
+_CSV_P2 = """\
+huc12_id,year_month,pswdtot_mgd
+010900020504,2020-01,1.25
+"""
+
+
+def test_get_wateruse_single_page(httpx_mock):
+ """Happy path: CSV parsed to a long frame; returns (df, BaseMetadata)."""
+ httpx_mock.add_response(method="GET", url=WU_RE, text=_CSV_PAGE)
+
+ df, md = get_wateruse(
+ model="wu-public-supply-wd",
+ variable=["pswdtot", "pswdgw", "pswdsw"],
+ state="RI",
+ start_date="2020-01",
+ time_resolution="monthly",
+ )
+
+ assert isinstance(df, pd.DataFrame)
+ assert isinstance(md, BaseMetadata)
+ assert list(df.columns) == [
+ "huc12_id",
+ "year_month",
+ "pswdgw_mgd",
+ "pswdsw_mgd",
+ "pswdtot_mgd",
+ ]
+ assert len(df) == 3
+
+
+def test_huc12_id_kept_as_string_with_leading_zero(httpx_mock):
+ """The HUC12 identifier must not be coerced to int (leading zeros matter)."""
+ httpx_mock.add_response(method="GET", url=WU_RE, text=_CSV_PAGE)
+
+ df, _ = get_wateruse(model="wu-public-supply-wd", state="RI")
+
+ # String-typed (object or the pandas StringDtype, depending on version),
+ # never coerced to int — the leading zero must survive.
+ assert pd.api.types.is_string_dtype(df["huc12_id"])
+ assert df["huc12_id"].iloc[0] == "010900020502"
+
+
+def test_variables_are_comma_joined(httpx_mock):
+ """A list of variables is sent as one comma-joined query parameter."""
+ httpx_mock.add_response(method="GET", url=WU_RE, text=_CSV_PAGE)
+
+ get_wateruse(
+ model="wu-public-supply-wd",
+ variable=["pswdtot", "pswdgw", "pswdsw"],
+ state="RI",
+ )
+
+ qs = parse_qs(urlsplit(str(httpx_mock.get_requests()[0].url)).query)
+ assert qs["variable"] == ["pswdtot,pswdgw,pswdsw"]
+ assert qs["format"] == ["csv"]
+
+
+def test_unset_params_are_dropped(httpx_mock):
+ """Params left as None are omitted (the service rejects empty values)."""
+ httpx_mock.add_response(method="GET", url=WU_RE, text=_CSV_PAGE)
+
+ get_wateruse(model="wu-public-supply-wd", state="RI")
+
+ qs = parse_qs(urlsplit(str(httpx_mock.get_requests()[0].url)).query)
+ assert "enddate" not in qs
+ assert "variable" not in qs
+ assert "timeres" not in qs
+ # Defaulted params are still present.
+ assert qs["intersection"] == ["overlap"]
+ assert qs["limit"] == ["600"]
+
+
+def test_snake_case_date_params_map_to_nwdc_wire_names(httpx_mock):
+ """The public snake_case params (``start_date`` / ``end_date`` /
+ ``time_resolution``) are sent under the NWDC's compact wire names
+ (``startdate`` / ``enddate`` / ``timeres``)."""
+ httpx_mock.add_response(method="GET", url=WU_RE, text=_CSV_PAGE)
+
+ get_wateruse(
+ model="wu-public-supply-wd",
+ state="RI",
+ start_date="2020-01",
+ end_date="2020-12",
+ time_resolution="monthly",
+ )
+
+ qs = parse_qs(urlsplit(str(httpx_mock.get_requests()[0].url)).query)
+ assert qs["startdate"] == ["2020-01"]
+ assert qs["enddate"] == ["2020-12"]
+ assert qs["timeres"] == ["monthly"]
+
+
+def test_pagination_follows_link_header_and_concatenates(httpx_mock):
+ """Pages are followed via the ``rel="next"`` Link header and concatenated."""
+ httpx_mock.add_response(
+ method="GET",
+ url=WU_RE,
+ text=_CSV_P1,
+ headers={
+ "link": (
+ "; rel="next"'
+ )
+ },
+ )
+ httpx_mock.add_response(method="GET", url=WU_RE, text=_CSV_P2)
+
+ df, _ = get_wateruse(model="wu-public-supply-wd", state="RI")
+
+ # 2 rows from page 1 + 1 row from page 2, reindexed.
+ assert len(df) == 3
+ assert df["huc12_id"].tolist() == [
+ "010900020502",
+ "010900020503",
+ "010900020504",
+ ]
+ assert list(df.index) == [0, 1, 2]
+ assert len(httpx_mock.get_requests()) == 2
+ # The second request carries the Link's ``skip`` offset, not the originals.
+ second_qs = parse_qs(urlsplit(str(httpx_mock.get_requests()[1].url)).query)
+ assert second_qs["skip"] == ["2"]
+
+
+def test_pagination_rewrites_bare_host(httpx_mock):
+ """A next link on the bare ``water.usgs.gov`` host is routed to the API."""
+ httpx_mock.add_response(
+ method="GET",
+ url=WU_RE,
+ text=_CSV_P1,
+ headers={
+ "link": (
+ "; rel="next"'
+ )
+ },
+ )
+ httpx_mock.add_response(method="GET", url=WU_RE, text=_CSV_P2)
+
+ get_wateruse(model="wu-public-supply-wd", state="RI")
+
+ second = httpx_mock.get_requests()[1]
+ assert second.url.host == "api.water.usgs.gov"
+
+
+def test_http_error_raises_typed_exception_with_detail(httpx_mock):
+ """A 4xx response surfaces as a typed error carrying the NWDC ``detail``."""
+ httpx_mock.add_response(
+ method="GET",
+ url=WU_RE,
+ status_code=400,
+ json={"detail": "Invalid model name: bad-model"},
+ )
+
+ with pytest.raises(dataretrieval.DataRetrievalError, match="Invalid model name"):
+ get_wateruse(model="bad-model", state="RI")
+
+
+def test_empty_response_body_raises_typed_error(httpx_mock):
+ """An empty 200 body becomes a typed error, not a bare pandas EmptyDataError."""
+ httpx_mock.add_response(method="GET", url=WU_RE, text="")
+
+ with pytest.raises(dataretrieval.DataRetrievalError, match="empty response"):
+ get_wateruse(model="wu-public-supply-wd", state="RI")
+
+
+def test_cyclic_next_link_terminates(httpx_mock):
+ """A non-advancing/cyclic ``next`` cursor must not loop forever."""
+ # Page 1 points to a "next" URL; page 2 points back to that SAME URL.
+ cyclic = (
+ "; rel="next"'
+ )
+ httpx_mock.add_response(
+ method="GET", url=WU_RE, text=_CSV_P1, headers={"link": cyclic}
+ )
+ httpx_mock.add_response(
+ method="GET", url=WU_RE, text=_CSV_P2, headers={"link": cyclic}
+ )
+
+ df, _ = get_wateruse(model="wu-public-supply-wd", state="RI")
+
+ # Fetches page 1 + the cyclic page once, then breaks on the repeat — it must
+ # return (not hang) with the two pages collected.
+ assert len(df) == 3
+ assert len(httpx_mock.get_requests()) == 2
+
+
+def test_uses_shared_default_headers(httpx_mock):
+ """Requests carry the shared dataretrieval User-Agent (per _default_headers)."""
+ httpx_mock.add_response(method="GET", url=WU_RE, text=_CSV_PAGE)
+
+ get_wateruse(model="wu-public-supply-wd", state="RI")
+
+ sent = httpx_mock.get_requests()[0]
+ assert sent.headers["User-Agent"].startswith("python-dataretrieval/")
+
+
+def test_state_selector_builds_location_query(httpx_mock):
+ """``state=`` is resolved to the wire ``location=stateCd:`` param."""
+ httpx_mock.add_response(method="GET", url=WU_RE, text=_CSV_PAGE)
+
+ get_wateruse(model="wu-public-supply-wd", state="Rhode Island")
+
+ qs = parse_qs(urlsplit(str(httpx_mock.get_requests()[0].url)).query)
+ assert qs["location"] == ["stateCd:RI"]
+
+
+def test_multiple_states_fan_out_preserves_input_order(httpx_mock):
+ """A list selector fans out one request per location and concatenates the
+ results in the order given — even though the requests run concurrently and
+ may reach the server out of order. Each location is routed to its own
+ response so attribution is deterministic regardless of arrival order."""
+ httpx_mock.add_response(
+ method="GET", url=re.compile(r".*location=stateCd%3ARI.*"), text=_CSV_P1
+ )
+ httpx_mock.add_response(
+ method="GET", url=re.compile(r".*location=stateCd%3AWI.*"), text=_CSV_P2
+ )
+
+ df, _ = get_wateruse(model="wu-public-supply-wd", state=["RI", "Wisconsin"])
+
+ # RI's rows (_CSV_P1) precede WI's (_CSV_P2) regardless of which request the
+ # thread pool dispatched first.
+ assert df["huc12_id"].tolist() == [
+ "010900020502",
+ "010900020503",
+ "010900020504",
+ ]
+ reqs = httpx_mock.get_requests()
+ assert len(reqs) == 2
+ assert {parse_qs(urlsplit(str(r.url)).query)["location"][0] for r in reqs} == {
+ "stateCd:RI",
+ "stateCd:WI",
+ }
+
+
+def test_fan_out_is_serial_when_concurrency_is_one(httpx_mock, monkeypatch):
+ """``MAX_CONCURRENT_REQUESTS = 1`` still fans out correctly (serial path)."""
+ monkeypatch.setattr(wateruse, "MAX_CONCURRENT_REQUESTS", 1)
+ httpx_mock.add_response(
+ method="GET", url=re.compile(r".*location=stateCd%3ARI.*"), text=_CSV_P1
+ )
+ httpx_mock.add_response(
+ method="GET", url=re.compile(r".*location=stateCd%3AWI.*"), text=_CSV_P2
+ )
+
+ df, _ = get_wateruse(model="wu-public-supply-wd", state=["RI", "WI"])
+
+ assert len(df) == 3
+ assert len(httpx_mock.get_requests()) == 2
+
+
+def test_fan_out_surfaces_final_rate_limit_header(httpx_mock):
+ """``md.header`` reports the lowest (latest) remaining quota across the fan-out,
+ not the first request's value."""
+ httpx_mock.add_response(
+ method="GET",
+ url=re.compile(r".*location=stateCd%3ARI.*"),
+ text=_CSV_P1,
+ headers={"x-ratelimit-remaining": "900"},
+ )
+ httpx_mock.add_response(
+ method="GET",
+ url=re.compile(r".*location=stateCd%3AWI.*"),
+ text=_CSV_P2,
+ headers={"x-ratelimit-remaining": "850"},
+ )
+
+ _, md = get_wateruse(model="wu-public-supply-wd", state=["RI", "WI"])
+
+ assert md.header["x-ratelimit-remaining"] == "850"
+
+
+# (response aggregation now reuses ogc.planning._combine_chunk_responses; the
+# integration test above pins the rate-limit-header behavior end-to-end.)
+
+
+# --- _resolve_locations unit tests (no HTTP) -------------------------------
+
+
+def test_resolve_locations_state_accepts_name_postal_fips():
+ # All three encodings normalize to the two-letter postal code stateCd wants.
+ assert _resolve_locations("Rhode Island", None, None) == ["stateCd:RI"]
+ assert _resolve_locations("ri", None, None) == ["stateCd:RI"]
+ assert _resolve_locations("44", None, None) == ["stateCd:RI"]
+ assert _resolve_locations(44, None, None) == ["stateCd:RI"]
+
+
+def test_resolve_locations_county_five_digit_fips():
+ assert _resolve_locations(None, "55025", None) == ["countyCd:55025"]
+
+
+@pytest.mark.parametrize(
+ "code,expected",
+ [
+ ("04", "huc2:04"),
+ ("0109", "huc4:0109"),
+ ("07070005", "huc8:07070005"),
+ ("010900020502", "huc12:010900020502"),
+ ],
+)
+def test_resolve_locations_huc_level_from_length(code, expected):
+ assert _resolve_locations(None, None, code) == [expected]
+
+
+def test_resolve_locations_accepts_lists():
+ assert _resolve_locations(["RI", "Wisconsin"], None, None) == [
+ "stateCd:RI",
+ "stateCd:WI",
+ ]
+ assert _resolve_locations(None, ["55025", "55021"], None) == [
+ "countyCd:55025",
+ "countyCd:55021",
+ ]
+ assert _resolve_locations(None, None, ["04", "070700"]) == [
+ "huc2:04",
+ "huc6:070700",
+ ]
+
+
+def test_resolve_locations_requires_exactly_one():
+ with pytest.raises(ValueError, match="exactly one"):
+ _resolve_locations(None, None, None)
+ with pytest.raises(ValueError, match="exactly one"):
+ _resolve_locations("RI", "55025", None)
+
+
+def test_resolve_locations_empty_list_rejected():
+ with pytest.raises(ValueError, match="empty"):
+ _resolve_locations([], None, None)
+
+
+def test_resolve_locations_rejects_malformed_selectors():
+ with pytest.raises(ValueError): # unrecognized state
+ _resolve_locations("Atlantis", None, None)
+ with pytest.raises(ValueError, match="five-digit"): # county not 5 digits
+ _resolve_locations(None, "025", None)
+ with pytest.raises(ValueError, match="hydrologic unit"): # odd-length huc
+ _resolve_locations(None, None, "123")
+
+
+# --- _next_page_url unit tests (no HTTP) -----------------------------------
+
+
+def test_next_page_url_none_when_no_link():
+ resp = httpx.Response(200, text="")
+ assert _next_page_url(resp) is None
+
+
+def test_next_page_url_none_when_link_has_no_next():
+ resp = httpx.Response(
+ 200,
+ text="",
+ headers={"link": '; rel="prev"'},
+ )
+ assert _next_page_url(resp) is None
+
+
+def test_next_page_url_rewrites_bare_host():
+ resp = httpx.Response(
+ 200,
+ text="",
+ headers={
+ "link": '; rel="next"'
+ },
+ )
+ assert _next_page_url(resp) == (
+ "https://api.water.usgs.gov/nwaa-data/data?skip=600"
+ )
+
+
+def test_next_page_url_leaves_api_host_untouched():
+ url = "https://api.water.usgs.gov/nwaa-data/data?skip=600"
+ resp = httpx.Response(200, text="", headers={"link": f'<{url}>; rel="next"'})
+ # Must not double-prefix into ``api.api.water.usgs.gov``.
+ assert _next_page_url(resp) == url
+
+
+def test_module_exposes_catalog_constants():
+ assert "wu-public-supply-wd" in wateruse.MODELS
+ assert set(wateruse.TIME_RESOLUTIONS) == {"monthly", "annualcy", "annualwy"}