diff --git a/Cargo.toml b/Cargo.toml index 6a189223..8138c820 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -65,6 +65,12 @@ serde_urlencoded = { version = "0.7", optional = true } tokio = { version = "1.29.0", features = ["sync", "macros", "rt", "time", "io-util"], optional = true } tracing = { version = "0.1", optional = true } +[target.'cfg(target_family="unix")'.dependencies] +nix = { version = "0.31.1", default-features = false, optional = true } + +[target.'cfg(target_family="windows")'.dependencies] +windows-sys = { version = "0.61.2", default-features = false, features = ["Win32_Foundation"], optional = true } + [target.'cfg(target_family="unix")'.dev-dependencies] nix = { version = "0.31.1", features = ["fs"] } @@ -77,7 +83,7 @@ futures-channel = {version = "0.3", features = ["sink"]} default = ["fs"] cloud = ["serde", "serde_json", "quick-xml", "hyper", "reqwest", "reqwest/stream", "chrono/serde", "base64", "rand", "ring", "http-body-util", "form_urlencoded", "serde_urlencoded", "tokio"] azure = ["cloud", "httparse"] -fs = ["walkdir", "tokio"] +fs = ["walkdir", "tokio", "nix", "windows-sys"] gcp = ["cloud", "rustls-pki-types"] aws = ["cloud", "md-5"] http = ["cloud"] diff --git a/src/local.rs b/src/local.rs index a12a775a..2710711f 100644 --- a/src/local.rs +++ b/src/local.rs @@ -114,6 +114,9 @@ pub(crate) enum Error { #[error("Filenames containing trailing '/#\\d+/' are not supported: {}", path)] InvalidPath { path: String }, + #[error("Unable to sync data to file {}: {}", path.display(), source)] + UnableToSyncFile { source: io::Error, path: PathBuf }, + #[error("Upload aborted")] Aborted, } @@ -137,6 +140,32 @@ impl From for super::Error { } } +/// Explicitly close a file, checking for errors that would be silently ignored by Rust's `File::drop()`. +/// +/// On network filesystems (e.g. NFS), `close()` can fail and indicate data loss. +fn close_file(file: File) -> std::result::Result<(), io::Error> { + #[cfg(target_family = "unix")] + { + nix::unistd::close(file).map_err(|e| e.into()) + } + #[cfg(target_family = "windows")] + { + use std::os::windows::io::IntoRawHandle; + + let handle = file.into_raw_handle(); + // SAFETY: `handle` is a valid, owned handle obtained from `into_raw_handle()`. + match unsafe { windows_sys::Win32::Foundation::CloseHandle(handle) } { + 0 => Err(io::Error::last_os_error()), + _ => Ok(()), + } + } + #[cfg(not(any(target_family = "unix", target_family = "windows")))] + { + drop(file); + Ok(()) + } +} + /// Local filesystem storage providing an [`ObjectStore`] interface to files on /// local disk. Can optionally be created with a directory prefix /// @@ -202,6 +231,8 @@ pub struct LocalFileSystem { config: Arc, // if you want to delete empty directories when deleting files automatic_cleanup: bool, + // if true, call fsync on files and directories after writes + fsync: bool, } #[derive(Debug)] @@ -229,6 +260,7 @@ impl LocalFileSystem { root: Url::parse("file:///").unwrap(), }), automatic_cleanup: false, + fsync: false, } } @@ -247,6 +279,7 @@ impl LocalFileSystem { root: absolute_path_to_url(path)?, }), automatic_cleanup: false, + fsync: false, }) } @@ -260,6 +293,20 @@ impl LocalFileSystem { self.automatic_cleanup = automatic_cleanup; self } + + /// Enable fsync after writes to ensure durability + /// + /// When enabled, [`LocalFileSystem`] will call [`File::sync_all`] on written files + /// and fsync parent directories after write operations ([`put_opts`](ObjectStore::put_opts), + /// [`copy_opts`](ObjectStore::copy_opts), [`rename_opts`](ObjectStore::rename_opts), + /// and multipart upload completion), ensuring that when an operation returns success, + /// both the file contents and the directory entries are durable on stable storage. + /// + /// This is disabled by default. + pub fn with_fsync(mut self, fsync: bool) -> Self { + self.fsync = fsync; + self + } } impl Config { @@ -348,8 +395,9 @@ impl ObjectStore for LocalFileSystem { } let path = self.path_to_filesystem(location)?; + let fsync = self.fsync; maybe_spawn_blocking(move || { - let (mut file, staging_path) = new_staged_upload(&path)?; + let (mut file, staging_path) = new_staged_upload(&path, fsync)?; let mut e_tag = None; let err = match payload.iter().try_for_each(|x| file.write_all(x)) { @@ -359,19 +407,34 @@ impl ObjectStore for LocalFileSystem { path: path.to_string_lossy().to_string(), })?; e_tag = Some(get_etag(&metadata)); + if fsync { + file.sync_all().map_err(|source| Error::UnableToSyncFile { + source, + path: staging_path.clone(), + })?; + } + // Explicitly close the file, checking for errors that would be silently ignored by drop. + // On network filesystems (e.g. NFS), close can fail and indicate data loss. + // + // This also ensures the file is closed before rename, which is required by some FUSE + // filesystems (e.g. Blobfuse) to trigger the upload operation. + close_file(file).map_err(|source| Error::UnableToCopyDataToFile { source })?; match opts.mode { - PutMode::Overwrite => { - // For some fuse types of file systems, the file must be closed first - // to trigger the upload operation, and then renamed, such as Blobfuse - std::mem::drop(file); - match std::fs::rename(&staging_path, &path) { - Ok(_) => None, - Err(source) => Some(Error::UnableToRenameFile { source }), + PutMode::Overwrite => match std::fs::rename(&staging_path, &path) { + Ok(_) => { + if fsync { + fsync_parent_dir(&path)?; + } + None } - } + Err(source) => Some(Error::UnableToRenameFile { source }), + }, PutMode::Create => match std::fs::hard_link(&staging_path, &path) { Ok(_) => { let _ = std::fs::remove_file(&staging_path); // Attempt to cleanup + if fsync { + fsync_parent_dir(&path)?; + } None } Err(source) => match source.kind() { @@ -414,8 +477,8 @@ impl ObjectStore for LocalFileSystem { } let dest = self.path_to_filesystem(location)?; - let (file, src) = new_staged_upload(&dest)?; - Ok(Box::new(LocalUpload::new(src, dest, file))) + let (file, src) = new_staged_upload(&dest, self.fsync)?; + Ok(Box::new(LocalUpload::new(src, dest, file, self.fsync))) } async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { @@ -549,6 +612,7 @@ impl ObjectStore for LocalFileSystem { let from = self.path_to_filesystem(from)?; let to = self.path_to_filesystem(to)?; + let fsync = self.fsync; match mode { CopyMode::Overwrite => { @@ -564,15 +628,25 @@ impl ObjectStore for LocalFileSystem { let staged = staged_upload_path(&to, &id.to_string()); match std::fs::hard_link(&from, &staged) { Ok(_) => { - return std::fs::rename(&staged, &to).map_err(|source| { - let _ = std::fs::remove_file(&staged); // Attempt to clean up - Error::UnableToCopyFile { from, to, source }.into() - }); + match std::fs::rename(&staged, &to) { + Ok(_) => { + if fsync { + fsync_parent_dir(&to)?; + } + return Ok(()); + } + Err(source) => { + let _ = std::fs::remove_file(&staged); // Attempt to clean up + return Err( + Error::UnableToCopyFile { from, to, source }.into() + ); + } + } } Err(source) => match source.kind() { ErrorKind::AlreadyExists => id += 1, ErrorKind::NotFound => match from.exists() { - true => create_parent_dirs(&to, source)?, + true => create_parent_dirs(&to, source, fsync)?, false => { return Err(Error::NotFound { path: from, source }.into()); } @@ -590,7 +664,12 @@ impl ObjectStore for LocalFileSystem { maybe_spawn_blocking(move || { loop { match std::fs::hard_link(&from, &to) { - Ok(_) => return Ok(()), + Ok(_) => { + if fsync { + fsync_parent_dir(&to)?; + } + return Ok(()); + } Err(source) => match source.kind() { ErrorKind::AlreadyExists => { return Err(Error::AlreadyExists { @@ -600,7 +679,7 @@ impl ObjectStore for LocalFileSystem { .into()); } ErrorKind::NotFound => match from.exists() { - true => create_parent_dirs(&to, source)?, + true => create_parent_dirs(&to, source, fsync)?, false => { return Err(Error::NotFound { path: from, source }.into()); } @@ -628,13 +707,22 @@ impl ObjectStore for LocalFileSystem { RenameTargetMode::Overwrite => { let from = self.path_to_filesystem(from)?; let to = self.path_to_filesystem(to)?; + let fsync = self.fsync; maybe_spawn_blocking(move || { loop { match std::fs::rename(&from, &to) { - Ok(_) => return Ok(()), + Ok(_) => { + if fsync { + fsync_parent_dir(&to)?; + if from.parent() != to.parent() { + fsync_parent_dir(&from)?; + } + } + return Ok(()); + } Err(source) => match source.kind() { ErrorKind::NotFound => match from.exists() { - true => create_parent_dirs(&to, source)?, + true => create_parent_dirs(&to, source, fsync)?, false => { return Err(Error::NotFound { path: from, source }.into()); } @@ -786,23 +874,69 @@ impl LocalFileSystem { } /// Creates the parent directories of `path` or returns an error based on `source` if no parent -fn create_parent_dirs(path: &std::path::Path, source: io::Error) -> Result<()> { +/// +/// When `fsync` is true, fsyncs each newly created directory and the first pre-existing +/// ancestor to ensure the new directory entries are durable. +fn create_parent_dirs(path: &std::path::Path, source: io::Error, fsync: bool) -> Result<()> { let parent = path.parent().ok_or_else(|| { let path = path.to_path_buf(); Error::UnableToCreateFile { path, source } })?; - std::fs::create_dir_all(parent).map_err(|source| { - let path = parent.into(); - Error::UnableToCreateDir { source, path } + if fsync { + let mut first_existing = parent; + while !first_existing.exists() { + first_existing = first_existing.parent().unwrap_or(first_existing); + } + + std::fs::create_dir_all(parent).map_err(|source| { + let path = parent.into(); + Error::UnableToCreateDir { source, path } + })?; + + let mut dir = parent; + loop { + fsync_dir(dir)?; + if dir == first_existing { + break; + } + dir = match dir.parent() { + Some(p) => p, + None => break, + }; + } + } else { + std::fs::create_dir_all(parent).map_err(|source| { + let path = parent.into(); + Error::UnableToCreateDir { source, path } + })?; + } + Ok(()) +} + +/// Fsyncs a directory to ensure its entries are durable +fn fsync_dir(dir_path: &std::path::Path) -> Result<()> { + let dir = File::open(dir_path).map_err(|source| Error::UnableToOpenFile { + source, + path: dir_path.into(), + })?; + dir.sync_all().map_err(|source| Error::UnableToSyncFile { + source, + path: dir_path.into(), })?; Ok(()) } +/// Fsyncs the parent directory of `path` to ensure directory entry durability +fn fsync_parent_dir(path: &std::path::Path) -> Result<()> { + let parent = path.parent().unwrap_or(path); + fsync_dir(parent) +} + /// Generates a unique file path `{base}#{suffix}`, returning the opened `File` and `path` /// /// Creates any directories if necessary -fn new_staged_upload(base: &std::path::Path) -> Result<(File, PathBuf)> { +fn new_staged_upload(base: &std::path::Path, fsync: bool) -> Result<(File, PathBuf)> { let mut multipart_id = 1; loop { let suffix = multipart_id.to_string(); @@ -812,7 +946,7 @@ fn new_staged_upload(base: &std::path::Path) -> Result<(File, PathBuf)> { Ok(f) => return Ok((f, path)), Err(source) => match source.kind() { ErrorKind::AlreadyExists => multipart_id += 1, - ErrorKind::NotFound => create_parent_dirs(&path, source)?, + ErrorKind::NotFound => create_parent_dirs(&path, source, fsync)?, _ => return Err(Error::UnableToOpenFile { source, path }.into()), }, } @@ -835,23 +969,26 @@ struct LocalUpload { src: Option, /// The next offset to write into the file offset: u64, + /// Whether to fsync on complete + fsync: bool, } #[derive(Debug)] struct UploadState { dest: PathBuf, - file: Mutex, + file: Mutex>, } impl LocalUpload { - pub(crate) fn new(src: PathBuf, dest: PathBuf, file: File) -> Self { + pub(crate) fn new(src: PathBuf, dest: PathBuf, file: File, fsync: bool) -> Self { Self { state: Arc::new(UploadState { dest, - file: Mutex::new(file), + file: Mutex::new(Some(file)), }), src: Some(src), offset: 0, + fsync, } } } @@ -864,7 +1001,8 @@ impl MultipartUpload for LocalUpload { let s = Arc::clone(&self.state); maybe_spawn_blocking(move || { - let mut file = s.file.lock(); + let mut guard = s.file.lock(); + let file = guard.as_mut().ok_or(Error::Aborted)?; file.seek(SeekFrom::Start(offset)).map_err(|source| { let path = s.dest.clone(); Error::Seek { source, path } @@ -882,16 +1020,38 @@ impl MultipartUpload for LocalUpload { async fn complete(&mut self) -> Result { let src = self.src.take().ok_or(Error::Aborted)?; let s = Arc::clone(&self.state); + let fsync = self.fsync; maybe_spawn_blocking(move || { // Ensure no inflight writes - let file = s.file.lock(); - std::fs::rename(&src, &s.dest) - .map_err(|source| Error::UnableToRenameFile { source })?; + let mut guard = s.file.lock(); + let file = guard.take().ok_or(Error::Aborted)?; + + if fsync { + file.sync_all().map_err(|source| Error::UnableToSyncFile { + source, + path: src.clone(), + })?; + } + let metadata = file.metadata().map_err(|e| Error::Metadata { source: e.into(), path: src.to_string_lossy().to_string(), })?; + // Explicitly close the file, checking for errors that would be silently ignored by drop. + // On network filesystems (e.g. NFS), close can fail and indicate data loss. + // + // This also ensures the file is closed before rename, which is required by some FUSE + // filesystems (e.g. Blobfuse) to trigger the upload operation. + close_file(file).map_err(|source| Error::UnableToCopyDataToFile { source })?; + + std::fs::rename(&src, &s.dest) + .map_err(|source| Error::UnableToRenameFile { source })?; + + if fsync { + fsync_parent_dir(&s.dest)?; + } + Ok(PutResult { e_tag: Some(get_etag(&metadata)), version: None, @@ -1847,6 +2007,47 @@ mod tests { integration.delete(&location).await.unwrap(); assert!(fs::read_dir(root.path()).unwrap().count() == 0); } + + #[test] + #[cfg(target_family = "unix")] + fn test_close_file_detects_error_unix() { + use std::os::fd::FromRawFd; + use std::os::unix::io::AsRawFd; + + let file = tempfile::tempfile().unwrap(); + + // Close and reclaim a File from the now-invalid fd + let file = { + let fd = file.as_raw_fd(); + super::close_file(file).unwrap(); + unsafe { std::fs::File::from_raw_fd(fd) } + }; + + let err = super::close_file(file).unwrap_err(); + assert_eq!(err.raw_os_error(), Some(nix::libc::EBADF), "got: {err:?}"); + } + + #[test] + #[cfg(target_family = "windows")] + fn test_close_file_detects_error_windows() { + use std::os::windows::io::{AsRawHandle, FromRawHandle}; + + let file = tempfile::tempfile().unwrap(); + + // Close and reclaim a File from the now-invalid handle + let file = { + let handle = file.as_raw_handle(); + super::close_file(file).unwrap(); + unsafe { std::fs::File::from_raw_handle(handle) } + }; + + let err = super::close_file(file).unwrap_err(); + assert_eq!( + err.raw_os_error(), + Some(windows_sys::Win32::Foundation::ERROR_INVALID_HANDLE as i32), + "got: {err:?}" + ); + } } #[cfg(not(target_arch = "wasm32"))]