Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 5 additions & 38 deletions obstore/python/obstore/_buffered.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ from contextlib import AbstractAsyncContextManager, AbstractContextManager

from ._attributes import Attributes
from ._bytes import Bytes
from ._list import ObjectMeta
from ._store import ObjectStore

if sys.version_info >= (3, 11):
Expand All @@ -16,16 +15,12 @@ if sys.version_info >= (3, 12):
else:
from typing_extensions import Buffer

if sys.version_info >= (3, 13):
from warnings import deprecated
else:
from typing_extensions import deprecated

def open_reader(
store: ObjectStore,
path: str,
*,
buffer_size: int = 1024 * 1024,
size: int | None = None,
) -> ReadableFile:
"""Open a readable file object from the specified location.

Expand All @@ -35,6 +30,9 @@ def open_reader(

Keyword Args:
buffer_size: The minimum number of bytes to read in a single request. Up to `buffer_size` bytes will be buffered in memory.
size: Optional byte size of the object. When provided, skips the HEAD request used to fetch the file size. Useful for callers that already know the size from external metadata.

The caller is responsible for accuracy: if `size` is larger than the actual object, reads fail with a range error at read time; if it is smaller, reads are silently truncated at `size` bytes. Defaults to `None`, in which case the size is fetched with a HEAD request.

Returns:
ReadableFile
Expand All @@ -46,6 +44,7 @@ async def open_reader_async(
path: str,
*,
buffer_size: int = 1024 * 1024,
size: int | None = None,
) -> AsyncReadableFile:
"""Call `open_reader` asynchronously, returning a readable file object with asynchronous operations.

Expand Down Expand Up @@ -92,22 +91,6 @@ class ReadableFile:
This is currently a no-op.
"""

@property
@deprecated(
"`ReadableFile.meta` is deprecated and will be removed in a future release. "
"Use the `head` or `head_async` methods directly if you need object metadata.",
)
def meta(self) -> ObjectMeta:
"""Access the metadata of the underlying file.

!!! warning "Deprecated"

This attribute is deprecated and will be removed in a future
release. Use the [`head`][obstore.head] or
[`head_async`][obstore.head_async] methods directly if you need
object metadata.
"""

def read(self, size: int | None = None, /) -> Bytes:
"""Read up to `size` bytes from the object and return them.

Expand Down Expand Up @@ -186,22 +169,6 @@ class AsyncReadableFile:
This is currently a no-op.
"""

@property
@deprecated(
"`AsyncReadableFile.meta` is deprecated and will be removed in a future release. "
"Use the `head` or `head_async` methods directly if you need object metadata.",
)
def meta(self) -> ObjectMeta:
"""Access the metadata of the underlying file.

!!! warning "Deprecated"

This attribute is deprecated and will be removed in a future
release. Use the [`head`][obstore.head] or
[`head_async`][obstore.head_async] methods directly if you need
object metadata.
"""

async def read(self, size: int | None = None, /) -> Bytes:
"""Read up to `size` bytes from the object and return them.

Expand Down
7 changes: 6 additions & 1 deletion obstore/python/obstore/fsspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -681,7 +681,12 @@ def __init__( # noqa: PLR0913

if self.mode == "rb":
buffer_size = 1024 * 1024 if buffer_size is None else buffer_size
self._reader = open_reader(store, path, buffer_size=buffer_size)
self._reader = open_reader(
store,
path,
buffer_size=buffer_size,
size=self.size,
)
elif self.mode == "wb":
buffer_size = 10 * 1024 * 1024 if buffer_size is None else buffer_size
self._writer = open_writer(
Expand Down
60 changes: 31 additions & 29 deletions obstore/src/buffered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::sync::Arc;
use bytes::Bytes;
use object_store::buffered::{BufReader, BufWriter};
use object_store::{ObjectMeta, ObjectStore, ObjectStoreExt};
use pyo3::exceptions::{PyDeprecationWarning, PyIOError, PyStopAsyncIteration, PyStopIteration};
use pyo3::exceptions::{PyIOError, PyStopAsyncIteration, PyStopIteration};
use pyo3::prelude::*;
use pyo3::types::PyString;
use pyo3::{intern, IntoPyObjectExt};
Expand All @@ -15,62 +15,75 @@ use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, Line
use tokio::sync::Mutex;

use crate::attributes::PyAttributes;
use crate::list::PyObjectMeta;
use crate::tags::PyTagSet;

#[pyfunction]
#[pyo3(signature = (store, path, *, buffer_size=1024 * 1024))]
#[pyo3(signature = (store, path, *, buffer_size=1024 * 1024, size=None))]
pub(crate) fn open_reader(
py: Python,
store: PyObjectStore,
path: PyPath,
buffer_size: usize,
size: Option<u64>,
) -> PyObjectStoreResult<PyReadableFile> {
let store = store.into_inner();
let runtime = get_runtime();
let (reader, meta) = py.detach(|| runtime.block_on(create_reader(store, path, buffer_size)))?;
Ok(PyReadableFile::new(reader, meta, false))
let (reader, resolved_size) =
py.detach(|| runtime.block_on(create_reader(store, path, buffer_size, size)))?;
Ok(PyReadableFile::new(reader, resolved_size, false))
}

#[pyfunction]
#[pyo3(signature = (store, path, *, buffer_size=1024 * 1024))]
#[pyo3(signature = (store, path, *, buffer_size=1024 * 1024, size=None))]
pub(crate) fn open_reader_async(
py: Python,
store: PyObjectStore,
path: PyPath,
buffer_size: usize,
size: Option<u64>,
) -> PyResult<Bound<PyAny>> {
let store = store.into_inner();
future_into_py(py, async move {
let (reader, meta) = create_reader(store, path, buffer_size).await?;
Ok(PyReadableFile::new(reader, meta, true))
let (reader, resolved_size) = create_reader(store, path, buffer_size, size).await?;
Ok(PyReadableFile::new(reader, resolved_size, true))
})
}

async fn create_reader(
store: Arc<dyn ObjectStore>,
path: PyPath,
capacity: usize,
) -> PyObjectStoreResult<(BufReader, ObjectMeta)> {
let meta = store
.head(path.as_ref())
.await
.map_err(PyObjectStoreError::ObjectStoreError)?;
Ok((BufReader::with_capacity(store, &meta, capacity), meta))
size: Option<u64>,
) -> PyObjectStoreResult<(BufReader, u64)> {
let meta = match size {
Some(size) => ObjectMeta {
location: path.as_ref().clone(),
last_modified: Default::default(),
size,
e_tag: None,
version: None,
},
None => store
.head(path.as_ref())
.await
.map_err(PyObjectStoreError::ObjectStoreError)?,
};
let size = meta.size;
Ok((BufReader::with_capacity(store, &meta, capacity), size))
}

#[pyclass(name = "ReadableFile", frozen)]
pub(crate) struct PyReadableFile {
reader: Arc<Mutex<BufReader>>,
meta: ObjectMeta,
size: u64,
r#async: bool,
}

impl PyReadableFile {
fn new(reader: BufReader, meta: ObjectMeta, r#async: bool) -> Self {
fn new(reader: BufReader, size: u64, r#async: bool) -> Self {
Self {
reader: Arc::new(Mutex::new(reader)),
meta,
size,
r#async,
}
}
Expand All @@ -91,17 +104,6 @@ impl PyReadableFile {
// `Option<Arc<Mutex<BufReader>>>`.
fn close(&self) {}

#[getter]
fn meta(&self, py: Python) -> PyResult<PyObjectMeta> {
let warnings_mod = py.import(intern!(py, "warnings"))?;
let warning = PyDeprecationWarning::new_err(
"The `meta` attribute is deprecated and will be removed in a future release. \
Use the `head` or `head_async` methods directly if you need object metadata.",
);
warnings_mod.call_method1(intern!(py, "warn"), (warning,))?;
Ok(self.meta.clone().into())
}

#[pyo3(signature = (size = None, /))]
fn read<'py>(&'py self, py: Python<'py>, size: Option<usize>) -> PyResult<Bound<'py, PyAny>> {
let reader = self.reader.clone();
Expand Down Expand Up @@ -179,7 +181,7 @@ impl PyReadableFile {

#[getter]
fn size(&self) -> u64 {
self.meta.size
self.size
}

fn tell<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
Expand Down
70 changes: 55 additions & 15 deletions tests/test_buffered.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,28 +114,68 @@ async def test_read_past_eof_async():
assert memoryview(expected) == memoryview(buffer)


def test_readable_file_meta_emits_deprecation_warning():
def test_open_reader_size_hint_sync():
store = MemoryStore()
data = b"x" * 1000
path = "sized.bin"
obs.put(store, path, b"x" * 100)

file = obs.open_reader(store, path)
with pytest.warns(DeprecationWarning, match="`meta` attribute is deprecated"):
meta = file.meta
obs.put(store, path, data)

assert meta["size"] == 100
assert meta["path"] == path
file = obs.open_reader(store, path, size=len(data))
assert file.size == len(data)
assert memoryview(data) == memoryview(file.read())


@pytest.mark.asyncio
async def test_async_readable_file_meta_emits_deprecation_warning():
async def test_open_reader_size_hint_async():
store = MemoryStore()
data = b"x" * 1000
path = "sized.bin"
await obs.put_async(store, path, b"x" * 100)
await obs.put_async(store, path, data)

file = await obs.open_reader_async(store, path)
with pytest.warns(DeprecationWarning, match="`meta` attribute is deprecated"):
meta = file.meta
file = await obs.open_reader_async(store, path, size=len(data))
assert file.size == len(data)
assert memoryview(data) == memoryview(await file.read())


def test_open_reader_size_hint_larger_than_actual_errors_on_read():
    """An oversized `size` hint is accepted on open but fails once data is read.

    The object holds 1000 bytes while the reader is told it holds 5000; the
    reader reports the hinted size and the subsequent read raises an `OSError`
    whose message mentions "range".
    """
    object_path = "sized.bin"
    payload = b"x" * 1000
    memory_store = MemoryStore()
    obs.put(memory_store, object_path, payload)

    reader = obs.open_reader(memory_store, object_path, size=5000)
    # The hint is reported back verbatim, not the stored object's length.
    assert reader.size == 5000
    with pytest.raises(OSError, match="range"):
        reader.read()


def test_open_reader_size_hint_smaller_than_actual_truncates():
    """An undersized `size` hint makes a full read stop at the hinted length.

    The object holds 1000 bytes while the reader is told it holds 500; the
    reader reports 500 and `read()` yields only the first 500 bytes.
    """
    object_path = "sized.bin"
    payload = b"x" * 1000
    memory_store = MemoryStore()
    obs.put(memory_store, object_path, payload)

    reader = obs.open_reader(memory_store, object_path, size=500)
    assert reader.size == 500
    # Compare through memoryview so the returned buffer object is matched
    # byte-for-byte against the expected prefix.
    expected_prefix = payload[:500]
    assert memoryview(expected_prefix) == memoryview(reader.read())


def test_open_reader_size_hint_zero_byte_file():
store = MemoryStore()
path = "empty.bin"
obs.put(store, path, b"")

assert meta["size"] == 100
assert meta["path"] == path
file = obs.open_reader(store, path, size=0)
assert file.size == 0
assert memoryview(b"") == memoryview(file.read())


def test_open_reader_no_longer_exposes_meta():
    """A reader returned by `open_reader` has no `meta` attribute."""
    object_path = "sized.bin"
    payload = b"x" * 1000
    memory_store = MemoryStore()
    obs.put(memory_store, object_path, payload)

    reader = obs.open_reader(memory_store, object_path)
    has_meta = hasattr(reader, "meta")
    assert not has_meta
17 changes: 17 additions & 0 deletions tests/test_fsspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,23 @@ async def test_info_synthesizes_directory_for_trailing_slash_query():
assert mock_construct.call_count == 0


def test_buffered_file_forwards_size_to_open_reader():
    """A `size` passed to `_open` reaches the underlying obstore reader.

    A 1000-byte local file is opened with `size=500`; both the fsspec file
    object and its `_reader` must report 500, and a full read must return
    exactly 500 bytes.
    """
    register("file")
    filesystem: FsspecStore = fsspec.filesystem("file", asynchronous=False)

    with TemporaryDirectory() as workdir:
        target = Path(workdir) / "sized.bin"
        target.write_bytes(b"x" * 1000)

        opened = filesystem._open(str(target), mode="rb", size=500)

        # The hint must be visible on the wrapper and on the wrapped reader.
        assert opened.size == 500
        assert opened._reader.size == 500

        contents = opened.read()
        assert len(contents) == 500


def test_construct_store_cache_diff_bucket_name(
minio_bucket: tuple[S3Config, ClientConfig],
):
Expand Down
Loading