diff --git a/src/azure/mod.rs b/src/azure/mod.rs
index e6b9b9c2..e059939f 100644
--- a/src/azure/mod.rs
+++ b/src/azure/mod.rs
@@ -127,11 +127,20 @@ impl ObjectStore for MicrosoftAzure {
         prefix: Option<&Path>,
         offset: &Path,
     ) -> BoxStream<'static, Result<ObjectMeta>> {
-        if self.client.config().is_emulator {
-            // Azurite doesn't support the startFrom query parameter,
+        let disable_start_from = self.client.config().is_emulator
+            || self
+                .client
+                .config()
+                .service
+                .host_str()
+                .is_some_and(|h| h.ends_with(".fabric.microsoft.com"));
+
+        if disable_start_from {
+            // Azurite and OneLake don't support the startFrom query parameter,
             // fall back to client-side filtering
             //
             // See https://github.com/Azure/Azurite/issues/2619#issuecomment-3660701055
+            // See https://github.com/apache/arrow-rs-object-store/issues/695
             let offset = offset.clone();
             self.list(prefix)
                 .try_filter(move |f| futures_util::future::ready(f.location > offset))
@@ -410,6 +419,134 @@ mod tests {
         assert_eq!(data, loaded);
     }
 
+    /// Verifies that `list_with_offset` works against OneLake (Fabric) endpoints.
+    ///
+    /// OneLake silently ignores the `startFrom` query parameter when using
+    /// friendly-name URLs (e.g. `.../MyWorkspace/lakehouse.Lakehouse/...`),
+    /// returning 200 OK with zero results.
+    /// GUID-based URLs handle `startFrom` correctly.
+    ///
+    /// Set these env vars before running:
+    /// - `AZURE_STORAGE_TOKEN`: bearer token (e.g. from `az account get-access-token`)
+    /// - `ONELAKE_URL`: full OneLake URL with friendly names, e.g.
+    ///   `https://onelake.blob.fabric.microsoft.com/<workspace>/<lakehouse>.Lakehouse/`
+    ///
+    /// See <https://github.com/apache/arrow-rs-object-store/issues/695>
+    #[ignore = "Used for manual testing against a real OneLake endpoint."]
+    #[tokio::test]
+    async fn test_onelake_list_with_offset() {
+        let url = std::env::var("ONELAKE_URL").unwrap();
+        let token = std::env::var("AZURE_STORAGE_TOKEN").unwrap();
+
+        let store = MicrosoftAzureBuilder::new()
+            .with_url(&url)
+            .with_config(AzureConfigKey::Token, token)
+            .build()
+            .unwrap();
+
+        // Derive a writable path prefix from the URL
+        // (skip workspace segment which becomes the container)
+        let parsed: Url = url.parse().unwrap();
+        let mut segments = parsed.path_segments().unwrap();
+        let _workspace = segments.next().unwrap();
+        let base: String = segments.collect::<Vec<_>>().join("/");
+        let test_dir = format!("{base}/test_onelake_offset");
+
+        // Create test files with predictable ordering
+        let prefix = Path::from(test_dir.as_str());
+        let files: Vec<Path> = (b'a'..=b'e')
+            .map(|c| Path::from(format!("{test_dir}/file_{}.txt", c as char)))
+            .collect();
+        let data = Bytes::from("test data");
+        for file in &files {
+            store.put(file, data.clone().into()).await.unwrap();
+        }
+
+        // Test 1: Offset at file_b → should return c, d, e (not b)
+        let offset = Path::from(format!("{test_dir}/file_b.txt"));
+        let result: Vec<Path> = store
+            .list_with_offset(Some(&prefix), &offset)
+            .map_ok(|m| m.location)
+            .try_collect()
+            .await
+            .unwrap();
+        assert!(
+            !result.contains(&offset),
+            "offset file_b should be excluded, got: {result:?}"
+        );
+        assert_eq!(
+            result.len(),
+            3,
+            "expected c/d/e after file_b, got: {result:?}"
+        );
+
+        // Test 2: Offset at file_a → should return b, c, d, e
+        let offset = Path::from(format!("{test_dir}/file_a.txt"));
+        let result: Vec<Path> = store
+            .list_with_offset(Some(&prefix), &offset)
+            .map_ok(|m| m.location)
+            .try_collect()
+            .await
+            .unwrap();
+        assert!(
+            !result.contains(&offset),
+            "offset file_a should be excluded"
+        );
+        assert_eq!(
+            result.len(),
+            4,
+            "expected b/c/d/e after file_a, got: {result:?}"
+        );
+
+        // Test 3: Offset at file_e (last) → should return empty
+        let offset = Path::from(format!("{test_dir}/file_e.txt"));
+        let result: Vec<Path> = store
+            .list_with_offset(Some(&prefix), &offset)
+            .map_ok(|m| m.location)
+            .try_collect()
+            .await
+            .unwrap();
+        assert!(
+            result.is_empty(),
+            "offset at last file should return empty, got: {result:?}"
+        );
+
+        // Test 4: Offset before all files → should return all 5
+        let offset = Path::from(format!("{test_dir}/file"));
+        let result: Vec<Path> = store
+            .list_with_offset(Some(&prefix), &offset)
+            .map_ok(|m| m.location)
+            .try_collect()
+            .await
+            .unwrap();
+        assert_eq!(
+            result.len(),
+            5,
+            "offset before all files should return all, got: {result:?}"
+        );
+
+        // Test 5: Every returned entry is strictly greater than offset
+        let offset = Path::from(format!("{test_dir}/file_c.txt"));
+        let result: Vec<ObjectMeta> = store
+            .list_with_offset(Some(&prefix), &offset)
+            .try_collect()
+            .await
+            .unwrap();
+        for meta in &result {
+            assert!(
+                meta.location > offset,
+                "entry {} should be > offset {}",
+                meta.location,
+                offset
+            );
+        }
+
+        // Cleanup
+        for file in &files {
+            let _ = store.delete(file).await;
+        }
+    }
+
     #[test]
     fn azure_test_config_get_value() {
         let azure_client_id = "object_store:fake_access_key_id".to_string();