Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ serde_urlencoded = { version = "0.7", optional = true }
tokio = { version = "1.29.0", features = ["sync", "macros", "rt", "time", "io-util"], optional = true }
tracing = { version = "0.1", optional = true }

[target.'cfg(target_family="unix")'.dependencies]
libc = { version = "0.2", optional = true }

[target.'cfg(target_family="windows")'.dependencies]
windows-sys = { version = "0.61.2", default-features = false, features = ["Win32_Foundation"], optional = true }

[target.'cfg(target_family="unix")'.dev-dependencies]
nix = { version = "0.31.1", features = ["fs"] }

Expand All @@ -77,7 +83,7 @@ futures-channel = {version = "0.3", features = ["sink"]}
default = ["fs"]
cloud = ["serde", "serde_json", "quick-xml", "hyper", "reqwest", "reqwest/stream", "chrono/serde", "base64", "rand", "ring", "http-body-util", "form_urlencoded", "serde_urlencoded", "tokio"]
azure = ["cloud", "httparse"]
fs = ["walkdir", "tokio"]
fs = ["walkdir", "tokio", "libc", "windows-sys"]
gcp = ["cloud", "rustls-pki-types"]
aws = ["cloud", "md-5"]
http = ["cloud"]
Expand Down
140 changes: 123 additions & 17 deletions src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,37 @@ impl From<Error> for super::Error {
}
}

/// Explicitly close a file, checking for errors that would be silently ignored by Rust's `File::drop()`.
///
/// On network filesystems (e.g. NFS), `close()` can fail and indicate data loss.
fn close_file(file: File) -> std::result::Result<(), io::Error> {
#[cfg(target_family = "unix")]
{
use std::os::unix::io::IntoRawFd;
let fd = file.into_raw_fd();
// SAFETY: `fd` is a valid, owned file descriptor obtained from `into_raw_fd()`.
match unsafe { libc::close(fd) } {
0 => Ok(()),
_ => Err(io::Error::last_os_error()),
}
}
#[cfg(target_family = "windows")]
{
use std::os::windows::io::IntoRawHandle;
let handle = file.into_raw_handle();
// SAFETY: `handle` is a valid, owned handle obtained from `into_raw_handle()`.
match unsafe { windows_sys::Win32::Foundation::CloseHandle(handle as _) } {
0 => Err(io::Error::last_os_error()),
_ => Ok(()),
}
}
#[cfg(not(any(target_family = "unix", target_family = "windows")))]
{
drop(file);
Ok(())
}
}

/// Local filesystem storage providing an [`ObjectStore`] interface to files on
/// local disk. Can optionally be created with a directory prefix
///
Expand Down Expand Up @@ -202,6 +233,7 @@ pub struct LocalFileSystem {
config: Arc<Config>,
// if you want to delete empty directories when deleting files
automatic_cleanup: bool,
sync_on_close: bool,
}

#[derive(Debug)]
Expand Down Expand Up @@ -229,6 +261,7 @@ impl LocalFileSystem {
root: Url::parse("file:///").unwrap(),
}),
automatic_cleanup: false,
sync_on_close: false,
}
}

Expand All @@ -247,6 +280,7 @@ impl LocalFileSystem {
root: absolute_path_to_url(path)?,
}),
automatic_cleanup: false,
sync_on_close: false,
})
}

Expand All @@ -260,6 +294,17 @@ impl LocalFileSystem {
self.automatic_cleanup = automatic_cleanup;
self
}

/// Calls [`File::sync_all`] before closing files written by this store.
///
/// This provides stronger durability guarantees at the cost of performance,
/// and is particularly important on network filesystems.
///
/// Default: `false`
pub fn with_sync_on_close(mut self, sync: bool) -> Self {
self.sync_on_close = sync;
self
}
}

impl Config {
Expand Down Expand Up @@ -348,6 +393,7 @@ impl ObjectStore for LocalFileSystem {
}

let path = self.path_to_filesystem(location)?;
let sync_on_close = self.sync_on_close;
maybe_spawn_blocking(move || {
let (mut file, staging_path) = new_staged_upload(&path)?;
let mut e_tag = None;
Expand All @@ -359,16 +405,21 @@ impl ObjectStore for LocalFileSystem {
path: path.to_string_lossy().to_string(),
})?;
e_tag = Some(get_etag(&metadata));
if sync_on_close {
file.sync_all()
.map_err(|source| Error::UnableToCopyDataToFile { source })?;
}
// Explicitly close the file, checking for errors that would be silently ignored by drop.
// On network filesystems (e.g. NFS), close can fail and indicate data loss.
//
// This also ensures the file is closed before rename, which is required by some FUSE
// filesystems (e.g. Blobfuse) to trigger the upload operation.
close_file(file).map_err(|source| Error::UnableToCopyDataToFile { source })?;
match opts.mode {
PutMode::Overwrite => {
// For some fuse types of file systems, the file must be closed first
// to trigger the upload operation, and then renamed, such as Blobfuse
std::mem::drop(file);
match std::fs::rename(&staging_path, &path) {
Ok(_) => None,
Err(source) => Some(Error::UnableToRenameFile { source }),
}
}
PutMode::Overwrite => match std::fs::rename(&staging_path, &path) {
Ok(_) => None,
Err(source) => Some(Error::UnableToRenameFile { source }),
},
PutMode::Create => match std::fs::hard_link(&staging_path, &path) {
Ok(_) => {
let _ = std::fs::remove_file(&staging_path); // Attempt to cleanup
Expand Down Expand Up @@ -415,7 +466,12 @@ impl ObjectStore for LocalFileSystem {

let dest = self.path_to_filesystem(location)?;
let (file, src) = new_staged_upload(&dest)?;
Ok(Box::new(LocalUpload::new(src, dest, file)))
Ok(Box::new(LocalUpload::new(
src,
dest,
file,
self.sync_on_close,
)))
}

async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
Expand Down Expand Up @@ -840,15 +896,17 @@ struct LocalUpload {
#[derive(Debug)]
struct UploadState {
dest: PathBuf,
file: Mutex<File>,
file: Mutex<Option<File>>,
sync_on_close: bool,
}

impl LocalUpload {
pub(crate) fn new(src: PathBuf, dest: PathBuf, file: File) -> Self {
pub(crate) fn new(src: PathBuf, dest: PathBuf, file: File, sync_on_close: bool) -> Self {
Self {
state: Arc::new(UploadState {
dest,
file: Mutex::new(file),
file: Mutex::new(Some(file)),
sync_on_close,
}),
src: Some(src),
offset: 0,
Expand All @@ -864,7 +922,8 @@ impl MultipartUpload for LocalUpload {

let s = Arc::clone(&self.state);
maybe_spawn_blocking(move || {
let mut file = s.file.lock();
let mut guard = s.file.lock();
let file = guard.as_mut().ok_or(Error::Aborted)?;
file.seek(SeekFrom::Start(offset)).map_err(|source| {
let path = s.dest.clone();
Error::Seek { source, path }
Expand All @@ -884,14 +943,29 @@ impl MultipartUpload for LocalUpload {
let s = Arc::clone(&self.state);
maybe_spawn_blocking(move || {
// Ensure no inflight writes
let file = s.file.lock();
std::fs::rename(&src, &s.dest)
.map_err(|source| Error::UnableToRenameFile { source })?;
let mut guard = s.file.lock();
let file = guard.take().ok_or(Error::Aborted)?;

if s.sync_on_close {
file.sync_all()
.map_err(|source| Error::UnableToCopyDataToFile { source })?;
}

let metadata = file.metadata().map_err(|e| Error::Metadata {
source: e.into(),
path: src.to_string_lossy().to_string(),
})?;

// Explicitly close the file, checking for errors that would be silently ignored by drop.
// On network filesystems (e.g. NFS), close can fail and indicate data loss.
//
// This also ensures the file is closed before rename, which is required by some FUSE
// filesystems (e.g. Blobfuse) to trigger the upload operation.
close_file(file).map_err(|source| Error::UnableToCopyDataToFile { source })?;

std::fs::rename(&src, &s.dest)
.map_err(|source| Error::UnableToRenameFile { source })?;

Ok(PutResult {
e_tag: Some(get_etag(&metadata)),
version: None,
Expand Down Expand Up @@ -1282,6 +1356,26 @@ mod tests {
put_opts(&integration, false).await;
}

#[tokio::test]
#[cfg(target_family = "unix")]
async fn file_test_sync_on_close() {
let root = TempDir::new().unwrap();
let integration = LocalFileSystem::new_with_prefix(root.path())
.unwrap()
.with_sync_on_close(true);

put_get_delete_list(&integration).await;
list_with_offset_exclusivity(&integration).await;
get_opts(&integration).await;
list_uses_directories_correctly(&integration).await;
list_with_delimiter(&integration).await;
rename_and_copy(&integration).await;
copy_if_not_exists(&integration).await;
copy_rename_nonexistent_object(&integration).await;
stream_get(&integration).await;
put_opts(&integration, false).await;
}

#[test]
#[cfg(target_family = "unix")]
fn test_non_tokio() {
Expand Down Expand Up @@ -1913,4 +2007,16 @@ mod unix_test {

spawned.await.unwrap();
}

#[test]
fn test_close_file_detects_error() {
use std::os::unix::io::AsRawFd;

let file = tempfile::tempfile().unwrap();
let fd = file.as_raw_fd();
// Close the fd behind Rust's back so close_file will get EBADF
assert_eq!(unsafe { libc::close(fd) }, 0);
let err = super::close_file(file).unwrap_err();
assert_eq!(err.raw_os_error(), Some(libc::EBADF));
}
}