diff --git a/Cargo.toml b/Cargo.toml
index 81cdf66d..cf33c8c6 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -65,6 +65,12 @@ serde_urlencoded = { version = "0.7", optional = true }
 tokio = { version = "1.29.0", features = ["sync", "macros", "rt", "time", "io-util"], optional = true }
 tracing = { version = "0.1", optional = true }
 
+[target.'cfg(target_family="unix")'.dependencies]
+libc = { version = "0.2", optional = true }
+
+[target.'cfg(target_family="windows")'.dependencies]
+windows-sys = { version = "0.61.2", default-features = false, features = ["Win32_Foundation"], optional = true }
+
 [target.'cfg(target_family="unix")'.dev-dependencies]
 nix = { version = "0.31.1", features = ["fs"] }
 
@@ -77,7 +83,7 @@ futures-channel = {version = "0.3", features = ["sink"]}
 default = ["fs"]
 cloud = ["serde", "serde_json", "quick-xml", "hyper", "reqwest", "reqwest/stream", "chrono/serde", "base64", "rand", "ring", "http-body-util", "form_urlencoded", "serde_urlencoded", "tokio"]
 azure = ["cloud", "httparse"]
-fs = ["walkdir", "tokio"]
+fs = ["walkdir", "tokio", "libc", "windows-sys"]
 gcp = ["cloud", "rustls-pki-types"]
 aws = ["cloud", "md-5"]
 http = ["cloud"]
diff --git a/src/local.rs b/src/local.rs
index 45f35f3b..3eb03851 100644
--- a/src/local.rs
+++ b/src/local.rs
@@ -137,6 +137,37 @@ impl From<Error> for super::Error {
     }
 }
 
+/// Explicitly close a file, checking for errors that would be silently ignored by Rust's `File::drop()`.
+///
+/// On network filesystems (e.g. NFS), `close()` can fail and indicate data loss.
+fn close_file(file: File) -> std::result::Result<(), io::Error> {
+    #[cfg(target_family = "unix")]
+    {
+        use std::os::unix::io::IntoRawFd;
+        let fd = file.into_raw_fd();
+        // SAFETY: `fd` is a valid, owned file descriptor obtained from `into_raw_fd()`.
+        match unsafe { libc::close(fd) } {
+            0 => Ok(()),
+            _ => Err(io::Error::last_os_error()),
+        }
+    }
+    #[cfg(target_family = "windows")]
+    {
+        use std::os::windows::io::IntoRawHandle;
+        let handle = file.into_raw_handle();
+        // SAFETY: `handle` is a valid, owned handle obtained from `into_raw_handle()`.
+        match unsafe { windows_sys::Win32::Foundation::CloseHandle(handle as _) } {
+            0 => Err(io::Error::last_os_error()),
+            _ => Ok(()),
+        }
+    }
+    #[cfg(not(any(target_family = "unix", target_family = "windows")))]
+    {
+        drop(file);
+        Ok(())
+    }
+}
+
 /// Local filesystem storage providing an [`ObjectStore`] interface to files on
 /// local disk. Can optionally be created with a directory prefix
 ///
@@ -202,6 +233,7 @@ pub struct LocalFileSystem {
     config: Arc<Config>,
     // if you want to delete empty directories when deleting files
     automatic_cleanup: bool,
+    sync_on_close: bool,
 }
 
 #[derive(Debug)]
@@ -229,6 +261,7 @@ impl LocalFileSystem {
                 root: Url::parse("file:///").unwrap(),
             }),
             automatic_cleanup: false,
+            sync_on_close: false,
         }
     }
 
@@ -247,6 +280,7 @@ impl LocalFileSystem {
                 root: absolute_path_to_url(path)?,
             }),
             automatic_cleanup: false,
+            sync_on_close: false,
         })
     }
 
@@ -260,6 +294,17 @@ impl LocalFileSystem {
         self.automatic_cleanup = automatic_cleanup;
         self
     }
+
+    /// Calls [`File::sync_all`] before closing files written by this store.
+    ///
+    /// This provides stronger durability guarantees at the cost of performance,
+    /// and is particularly important on network filesystems.
+    ///
+    /// Default: `false`
+    pub fn with_sync_on_close(mut self, sync: bool) -> Self {
+        self.sync_on_close = sync;
+        self
+    }
 }
 
 impl Config {
@@ -348,6 +393,7 @@ impl ObjectStore for LocalFileSystem {
         }
 
         let path = self.path_to_filesystem(location)?;
+        let sync_on_close = self.sync_on_close;
         maybe_spawn_blocking(move || {
             let (mut file, staging_path) = new_staged_upload(&path)?;
             let mut e_tag = None;
@@ -359,16 +405,31 @@ impl ObjectStore for LocalFileSystem {
                         path: path.to_string_lossy().to_string(),
                     })?;
                     e_tag = Some(get_etag(&metadata));
+                    let synced = if sync_on_close {
+                        file.sync_all()
+                    } else {
+                        Ok(())
+                    };
+                    // Explicitly close the file, checking for errors that would be silently ignored by drop.
+                    // On network filesystems (e.g. NFS), close can fail and indicate data loss.
+                    //
+                    // This also ensures the file is closed before rename, which is required by some FUSE
+                    // filesystems (e.g. Blobfuse) to trigger the upload operation.
+                    //
+                    // The close runs even when the sync failed, so the handle is released before the
+                    // staged file is removed below. A sync error takes precedence over a close error.
+                    let closed = close_file(file);
+                    synced.and(closed).map_err(|source| {
+                        // Returning early here would otherwise leave the staged file on disk,
+                        // so remove it (best effort) before propagating the error.
+                        let _ = std::fs::remove_file(&staging_path);
+                        Error::UnableToCopyDataToFile { source }
+                    })?;
                     match opts.mode {
-                        PutMode::Overwrite => {
-                            // For some fuse types of file systems, the file must be closed first
-                            // to trigger the upload operation, and then renamed, such as Blobfuse
-                            std::mem::drop(file);
-                            match std::fs::rename(&staging_path, &path) {
-                                Ok(_) => None,
-                                Err(source) => Some(Error::UnableToRenameFile { source }),
-                            }
-                        }
+                        PutMode::Overwrite => match std::fs::rename(&staging_path, &path) {
+                            Ok(_) => None,
+                            Err(source) => Some(Error::UnableToRenameFile { source }),
+                        },
                         PutMode::Create => match std::fs::hard_link(&staging_path, &path) {
                             Ok(_) => {
                                 let _ = std::fs::remove_file(&staging_path); // Attempt to cleanup
@@ -415,7 +476,12 @@ impl ObjectStore for LocalFileSystem {
         let dest = self.path_to_filesystem(location)?;
 
         let (file, src) = new_staged_upload(&dest)?;
-        Ok(Box::new(LocalUpload::new(src, dest, file)))
+        Ok(Box::new(LocalUpload::new(
+            src,
+            dest,
+            file,
+            self.sync_on_close,
+        )))
     }
 
     async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
@@ -840,15 +906,17 @@ struct LocalUpload {
 #[derive(Debug)]
 struct UploadState {
     dest: PathBuf,
-    file: Mutex<File>,
+    file: Mutex<Option<File>>,
+    sync_on_close: bool,
 }
 
 impl LocalUpload {
-    pub(crate) fn new(src: PathBuf, dest: PathBuf, file: File) -> Self {
+    pub(crate) fn new(src: PathBuf, dest: PathBuf, file: File, sync_on_close: bool) -> Self {
         Self {
             state: Arc::new(UploadState {
                 dest,
-                file: Mutex::new(file),
+                file: Mutex::new(Some(file)),
+                sync_on_close,
             }),
             src: Some(src),
             offset: 0,
@@ -864,7 +932,8 @@ impl MultipartUpload for LocalUpload {
 
         let s = Arc::clone(&self.state);
         maybe_spawn_blocking(move || {
-            let mut file = s.file.lock();
+            let mut guard = s.file.lock();
+            let file = guard.as_mut().ok_or(Error::Aborted)?;
             file.seek(SeekFrom::Start(offset)).map_err(|source| {
                 let path = s.dest.clone();
                 Error::Seek { source, path }
@@ -884,14 +953,29 @@ impl MultipartUpload for LocalUpload {
         let s = Arc::clone(&self.state);
         maybe_spawn_blocking(move || {
             // Ensure no inflight writes
-            let file = s.file.lock();
-            std::fs::rename(&src, &s.dest)
-                .map_err(|source| Error::UnableToRenameFile { source })?;
+            let mut guard = s.file.lock();
+            let file = guard.take().ok_or(Error::Aborted)?;
+
+            if s.sync_on_close {
+                file.sync_all()
+                    .map_err(|source| Error::UnableToCopyDataToFile { source })?;
+            }
+
             let metadata = file.metadata().map_err(|e| Error::Metadata {
                 source: e.into(),
                 path: src.to_string_lossy().to_string(),
             })?;
 
+            // Explicitly close the file, checking for errors that would be silently ignored by drop.
+            // On network filesystems (e.g. NFS), close can fail and indicate data loss.
+            //
+            // This also ensures the file is closed before rename, which is required by some FUSE
+            // filesystems (e.g. Blobfuse) to trigger the upload operation.
+            close_file(file).map_err(|source| Error::UnableToCopyDataToFile { source })?;
+
+            std::fs::rename(&src, &s.dest)
+                .map_err(|source| Error::UnableToRenameFile { source })?;
+
             Ok(PutResult {
                 e_tag: Some(get_etag(&metadata)),
                 version: None,
@@ -1282,6 +1366,26 @@ mod tests {
         put_opts(&integration, false).await;
     }
 
+    #[tokio::test]
+    #[cfg(target_family = "unix")]
+    async fn file_test_sync_on_close() {
+        let root = TempDir::new().unwrap();
+        let integration = LocalFileSystem::new_with_prefix(root.path())
+            .unwrap()
+            .with_sync_on_close(true);
+
+        put_get_delete_list(&integration).await;
+        list_with_offset_exclusivity(&integration).await;
+        get_opts(&integration).await;
+        list_uses_directories_correctly(&integration).await;
+        list_with_delimiter(&integration).await;
+        rename_and_copy(&integration).await;
+        copy_if_not_exists(&integration).await;
+        copy_rename_nonexistent_object(&integration).await;
+        stream_get(&integration).await;
+        put_opts(&integration, false).await;
+    }
+
     #[test]
     #[cfg(target_family = "unix")]
     fn test_non_tokio() {
@@ -1913,4 +2017,19 @@ mod unix_test {
 
         spawned.await.unwrap();
     }
+
+    #[test]
+    fn test_close_file_detects_error() {
+        use std::os::unix::io::AsRawFd;
+
+        let file = tempfile::tempfile().unwrap();
+        let fd = file.as_raw_fd();
+        // Close the fd behind Rust's back so close_file will get EBADF
+        // NOTE(review): if another thread opens a file between the two closes it may be
+        // handed this fd number, and `close_file` would then close that descriptor instead.
+        // Consider isolating this test (e.g. in its own process) if it proves flaky.
+        assert_eq!(unsafe { libc::close(fd) }, 0);
+        let err = super::close_file(file).unwrap_err();
+        assert_eq!(err.raw_os_error(), Some(libc::EBADF));
+    }
 }