Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 10 additions & 41 deletions dataretrieval/ogc/chunking.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,14 @@
import functools
import os
from collections.abc import Callable, Iterator
from contextlib import contextmanager
from contextvars import ContextVar, copy_context
from contextvars import copy_context
from typing import Any, cast

import httpx
import pandas as pd
from anyio.from_thread import start_blocking_portal

from dataretrieval.utils import HTTPX_DEFAULTS
from dataretrieval.utils import HTTPX_DEFAULTS, Ambient

from . import progress as _progress
from .interruptions import (
Expand Down Expand Up @@ -106,9 +105,6 @@
_OGC_URL_BYTE_LIMIT = 8000


# Response header USGS uses to advertise remaining hourly quota.
_QUOTA_HEADER = "x-ratelimit-remaining"

# Fan-out concurrency cap, read at call time (not import) so test
# ``monkeypatch.setenv`` applies. Value grammar in :func:`_read_concurrency_env`;
# the concurrency model is in the module docstring.
Expand Down Expand Up @@ -152,38 +148,11 @@ def _read_concurrency_env() -> int | None:
return value


# Shared per-call ``httpx.AsyncClient``, published via :func:`_publish`
# during ``ChunkedCall._run`` so paginated-loop helpers (``_walk_pages``)
# reuse the same connection pool across every sub-request. ``None``
# outside a chunked call — paginated helpers then open their own
# short-lived client.
_chunked_client: ContextVar[httpx.AsyncClient | None] = ContextVar(
"_chunked_client", default=None
)


@contextmanager
def _publish(client: httpx.AsyncClient) -> Iterator[None]:
"""
Publish ``client`` on the ``_chunked_client`` ContextVar so the
paginated-loop helpers can borrow it via :func:`get_active_client`
for the duration of the ``with`` block.

Parameters
----------
client : httpx.AsyncClient
The client to publish.

Yields
------
None
Yields once, for the duration of the bind.
"""
token = _chunked_client.set(client)
try:
yield
finally:
_chunked_client.reset(token)
# Shared per-call ``httpx.AsyncClient``, scoped via ``with _chunked_client(c):``
# during ``ChunkedCall._run`` so paginated-loop helpers (``_walk_pages``) reuse
# the same connection pool across every sub-request. ``None`` outside a chunked
# call — paginated helpers then open their own short-lived client.
_chunked_client: Ambient[httpx.AsyncClient | None] = Ambient("_chunked_client", None)


def get_active_client() -> httpx.AsyncClient | None:
Expand All @@ -197,8 +166,8 @@ def get_active_client() -> httpx.AsyncClient | None:
Returns
-------
httpx.AsyncClient or None
The client published via :func:`_publish` if currently inside a
:class:`ChunkedCall` run; ``None`` otherwise.
The client scoped via ``with _chunked_client(...)`` if currently inside
a :class:`ChunkedCall` run; ``None`` otherwise.
"""
return _chunked_client.get()

Expand Down Expand Up @@ -541,7 +510,7 @@ async def _run(self, max_concurrent: int | None) -> tuple[pd.DataFrame, Any]:
)

async with httpx.AsyncClient(limits=limits, **HTTPX_DEFAULTS) as client:
with _publish(client):
with _chunked_client(client):
reporter = _progress.current()
if reporter is not None:
reporter.set_chunks(self.plan.total)
Expand Down
Loading