From a3e27849b113c577570e537f630833ff73689cd5 Mon Sep 17 00:00:00 2001 From: Ankur Goyal Date: Mon, 7 Jul 2025 19:51:58 -0700 Subject: [PATCH 1/3] add conditional deletes --- src/aws/mod.rs | 35 +++++++- src/azure/client.rs | 57 ++++++++++++- src/azure/mod.rs | 10 ++- src/chunked.rs | 8 +- src/gcp/client.rs | 32 ++++++- src/gcp/mod.rs | 10 ++- src/http/mod.rs | 11 ++- src/lib.rs | 182 +++++++++++++++++++++++++++++++++++++++- src/limit.rs | 11 ++- src/local.rs | 22 ++++- src/memory.rs | 29 ++++++- src/prefix.rs | 7 +- src/throttle.rs | 11 ++- tests/get_range_file.rs | 4 + 14 files changed, 395 insertions(+), 34 deletions(-) diff --git a/src/aws/mod.rs b/src/aws/mod.rs index 8dac2bd7..5ef14bb6 100644 --- a/src/aws/mod.rs +++ b/src/aws/mod.rs @@ -31,7 +31,7 @@ use async_trait::async_trait; use futures::stream::BoxStream; use futures::{StreamExt, TryStreamExt}; -use reqwest::header::{HeaderName, IF_MATCH, IF_NONE_MATCH}; +use reqwest::header::{HeaderName, IF_MATCH, IF_NONE_MATCH, IF_UNMODIFIED_SINCE}; use reqwest::{Method, StatusCode}; use std::{sync::Arc, time::Duration}; use url::Url; @@ -44,9 +44,9 @@ use crate::multipart::{MultipartStore, PartId}; use crate::signer::Signer; use crate::util::STRICT_ENCODE_SET; use crate::{ - Error, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, ObjectMeta, - ObjectStore, Path, PutMode, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, - UploadPart, + DeleteOptions, Error, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, + ObjectMeta, ObjectStore, Path, PutMode, PutMultipartOptions, PutOptions, PutPayload, PutResult, + Result, UploadPart, }; static TAGS_HEADER: HeaderName = HeaderName::from_static("x-amz-tagging"); @@ -255,6 +255,33 @@ impl ObjectStore for AmazonS3 { Ok(()) } + async fn delete_opts(&self, location: &Path, opts: DeleteOptions) -> Result<()> { + let request = self.client.request(Method::DELETE, location); + + // Add conditional headers if specified + let request = if let 
Some(if_match) = &opts.if_match { + request.header(&IF_MATCH, if_match) + } else { + request + }; + + let request = if let Some(date) = opts.if_unmodified_since { + let date = date.format("%a, %d %h %Y %T GMT").to_string(); // IMF-fixdate (RFC 7231) + request.header(&IF_UNMODIFIED_SINCE, &date) + } else { + request + }; + + // AWS S3 supports versioned deletes + let request = match &opts.version { + Some(version) => request.query(&[("versionId", version)]), + None => request, + }; + + request.with_extensions(opts.extensions).send().await?; + Ok(()) + } + fn delete_stream<'a>( &'a self, locations: BoxStream<'a, Result>, diff --git a/src/azure/client.rs b/src/azure/client.rs index c7440a07..1e42feaf 100644 --- a/src/azure/client.rs +++ b/src/azure/client.rs @@ -28,8 +28,8 @@ use crate::list::{PaginatedListOptions, PaginatedListResult}; use crate::multipart::PartId; use crate::util::{deserialize_rfc1123, GetRange}; use crate::{ - Attribute, Attributes, ClientOptions, GetOptions, ListResult, ObjectMeta, Path, PutMode, - PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, RetryConfig, TagSet, + Attribute, Attributes, ClientOptions, DeleteOptions, GetOptions, ListResult, ObjectMeta, Path, + PutMode, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, RetryConfig, TagSet, }; use async_trait::async_trait; use base64::prelude::{BASE64_STANDARD, BASE64_STANDARD_NO_PAD}; @@ -37,7 +37,10 @@ use base64::Engine; use bytes::{Buf, Bytes}; use chrono::{DateTime, Utc}; use http::{ - header::{HeaderMap, HeaderValue, CONTENT_LENGTH, CONTENT_TYPE, IF_MATCH, IF_NONE_MATCH}, + header::{ + HeaderMap, HeaderValue, CONTENT_LENGTH, CONTENT_TYPE, IF_MATCH, IF_NONE_MATCH, + IF_UNMODIFIED_SINCE, + }, HeaderName, Method, }; use rand::Rng as _; @@ -655,6 +658,54 @@ impl AzureClient { Ok(()) } + /// Make an Azure Delete request with conditional options + pub(crate) async fn delete_request_with_opts( + &self, + path: &Path, + opts: DeleteOptions, + ) -> Result<()> { + let credential = self.get_credential().await?; +
let url = self.config.path_url(path); + + let sensitive = credential + .as_deref() + .map(|c| c.sensitive_request()) + .unwrap_or_default(); + + let mut builder = self + .client + .delete(url.as_str()) + .header(&DELETE_SNAPSHOTS, "include"); + + // Add conditional headers if specified + if let Some(if_match) = &opts.if_match { + builder = builder.header(IF_MATCH, if_match); + } + if let Some(date) = opts.if_unmodified_since { + let date = date.format("%a, %d %h %Y %T GMT").to_string(); // IMF-fixdate (RFC 7231) + builder = builder.header(IF_UNMODIFIED_SINCE, &date); + } + + // Azure selects the blob version to delete via the `versionid` query parameter + if let Some(version) = &opts.version { + builder = builder.query(&[("versionid", version)]); + } + + builder + .extensions(opts.extensions) + .with_azure_authorization(&credential, &self.config.account) + .retryable(&self.config.retry_config) + .sensitive(sensitive) + .send() + .await + .map_err(|source| { + let path = path.as_ref().into(); + Error::DeleteRequest { source, path } + })?; + + Ok(()) + } + fn build_bulk_delete_body( &self, boundary: &str, diff --git a/src/azure/mod.rs b/src/azure/mod.rs index f65bf9f3..ea25f38e 100644 --- a/src/azure/mod.rs +++ b/src/azure/mod.rs @@ -26,8 +26,8 @@ use crate::{ multipart::{MultipartStore, PartId}, path::Path, signer::Signer, - GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, ObjectMeta, ObjectStore, - PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, UploadPart, + DeleteOptions, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, ObjectMeta, + ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, UploadPart, }; use async_trait::async_trait; use futures::stream::{BoxStream, StreamExt, TryStreamExt}; @@ -120,6 +120,12 @@ impl ObjectStore for MicrosoftAzure { self.client.delete_request(location, &()).await } + async fn delete_opts(&self, location: &Path, opts: DeleteOptions) -> Result<()> { + // Azure delete_request accepts a query parameter, but we need to
handle DeleteOptions differently + // We'll need to update the client to have a specific method for conditional deletes + self.client.delete_request_with_opts(location, opts).await + } + fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result> { self.client.list(prefix) } diff --git a/src/chunked.rs b/src/chunked.rs index 8af3b2c4..7cf31100 100644 --- a/src/chunked.rs +++ b/src/chunked.rs @@ -28,8 +28,8 @@ use futures::StreamExt; use crate::path::Path; use crate::{ - GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta, ObjectStore, - PutMultipartOptions, PutOptions, PutResult, + DeleteOptions, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, + ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, PutResult, }; use crate::{PutPayload, Result}; @@ -150,6 +150,10 @@ impl ObjectStore for ChunkedStore { self.inner.delete(location).await } + async fn delete_opts(&self, location: &Path, opts: DeleteOptions) -> Result<()> { + self.inner.delete_opts(location, opts).await + } + fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result> { self.inner.list(prefix) } diff --git a/src/gcp/client.rs b/src/gcp/client.rs index a988cc45..cd680841 100644 --- a/src/gcp/client.rs +++ b/src/gcp/client.rs @@ -32,8 +32,8 @@ use crate::multipart::PartId; use crate::path::Path; use crate::util::hex_encode; use crate::{ - Attribute, Attributes, ClientOptions, GetOptions, MultipartId, PutMode, PutMultipartOptions, - PutOptions, PutPayload, PutResult, Result, RetryConfig, + Attribute, Attributes, ClientOptions, DeleteOptions, GetOptions, MultipartId, PutMode, + PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, RetryConfig, }; use async_trait::async_trait; use base64::prelude::BASE64_STANDARD; @@ -563,6 +563,34 @@ impl GoogleCloudStorageClient { Ok(()) } + pub(crate) async fn delete_request_with_opts( + &self, + path: &Path, + opts: DeleteOptions, + ) -> Result<()> { + let mut request = 
self.request(Method::DELETE, path); + + // Add conditional headers if specified; dates are sent as IMF-fixdate (RFC 7231) + if let Some(if_match) = &opts.if_match { + request = request.header(&HeaderName::from_static("if-match"), if_match); + } + + if let Some(if_unmodified_since) = opts.if_unmodified_since { + request = request.header( + &HeaderName::from_static("if-unmodified-since"), + &if_unmodified_since.format("%a, %d %h %Y %T GMT").to_string(), + ); + } + + // GCS supports versioned deletes via generation parameter + if let Some(version) = &opts.version { + request = request.query(&[("generation", version)]); + } + + request.with_extensions(opts.extensions).send().await?; + Ok(()) + } + /// Perform a copy request pub(crate) async fn copy_request( &self, diff --git a/src/gcp/mod.rs b/src/gcp/mod.rs index 442b24fe..39dc2081 100644 --- a/src/gcp/mod.rs +++ b/src/gcp/mod.rs @@ -41,9 +41,9 @@ use crate::client::CredentialProvider; use crate::gcp::credential::GCSAuthorizer; use crate::signer::Signer; use crate::{ - multipart::PartId, path::Path, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, - ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, - UploadPart, + multipart::PartId, path::Path, DeleteOptions, GetOptions, GetResult, ListResult, MultipartId, + MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, PutPayload, + PutResult, Result, UploadPart, }; use async_trait::async_trait; use client::GoogleCloudStorageClient; @@ -184,6 +184,10 @@ impl ObjectStore for GoogleCloudStorage { self.client.delete_request(location).await } + async fn delete_opts(&self, location: &Path, opts: DeleteOptions) -> Result<()> { + self.client.delete_request_with_opts(location, opts).await + } + fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result> { self.client.list(prefix) } diff --git a/src/http/mod.rs b/src/http/mod.rs index 8581f923..e9406aa5 100644 --- a/src/http/mod.rs +++ b/src/http/mod.rs @@ -45,9 +45,9 @@ use crate::client::{http_connector, HttpConnector};
use crate::http::client::Client; use crate::path::Path; use crate::{ - ClientConfigKey, ClientOptions, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, - ObjectStore, PutMode, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, - RetryConfig, + ClientConfigKey, ClientOptions, DeleteOptions, GetOptions, GetResult, ListResult, + MultipartUpload, ObjectMeta, ObjectStore, PutMode, PutMultipartOptions, PutOptions, PutPayload, + PutResult, Result, RetryConfig, }; mod client; @@ -136,6 +136,17 @@ impl ObjectStore for HttpStore { self.client.delete(location).await } + async fn delete_opts(&self, location: &Path, opts: DeleteOptions) -> Result<()> { + // Conditional and versioned deletes are not implemented for this store; reject + // them instead of silently performing an unconditional delete. + let conditional = opts.if_match.is_some() || opts.if_unmodified_since.is_some(); + if conditional || opts.version.is_some() { + return Err(crate::Error::NotImplemented); + } + // No options set: identical to a plain `delete`. + self.client.delete(location).await + } + fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result> { let prefix_len = prefix.map(|p| p.as_ref().len()).unwrap_or_default(); let prefix = prefix.cloned(); diff --git a/src/lib.rs b/src/lib.rs index 06edd33c..b9ecc28d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -46,8 +46,8 @@ //! 2. Production quality, leading this crate to be used in large //! scale production systems, such as [crates.io] and [InfluxDB IOx] //! -//! 3. Support for advanced functionality, including atomic, conditional reads -//! and writes, vectored IO, bulk deletion, and more... +//! 3. Support for advanced functionality, including atomic, conditional reads, +//! writes, and deletes, vectored IO, bulk deletion, and more... //! //! 4. Stable and predictable governance via the [Apache Arrow] project //! @@ -486,6 +486,45 @@ //! [Apache Iceberg]: https://iceberg.apache.org/ //! [Delta Lake]: https://delta.io/ //! +//! # Conditional Delete +//! +//! Similar to conditional put operations, conditional deletes allow you to delete objects +//! only when certain preconditions are met. This is useful for preventing the deletion of +//! objects that have been recently modified, implementing garbage collection with safety +//!
checks, or coordinating deletions in distributed systems. +//! +//! ``` +//! # use object_store::{Error, ObjectStore, DeleteOptions}; +//! # use std::sync::Arc; +//! # use object_store::memory::InMemory; +//! # use object_store::path::Path; +//! # fn get_object_store() -> Arc { +//! # Arc::new(InMemory::new()) +//! # } +//! # async fn conditional_delete() -> Result<(), Box> { +//! let store = get_object_store(); +//! let path = Path::from("archived-data.parquet"); +//! +//! // Get metadata for the object we want to delete +//! let meta = store.head(&path).await?; +//! +//! // Only delete if the object hasn't been modified (using ETag) +//! let delete_opts = DeleteOptions { +//! if_match: meta.e_tag, +//! ..Default::default() +//! }; +//! +//! match store.delete_opts(&path, delete_opts).await { +//! Ok(_) => println!("Object safely deleted"), +//! Err(Error::Precondition { .. }) => { +//! println!("Object was modified, skipping deletion"); +//! } +//! Err(e) => return Err(e.into()), +//! } +//! # Ok(()) +//! # } +//! ``` +//! //! # TLS Certificates //! //! Stores that use HTTPS/TLS (this is true for most cloud stores) can choose the source of their [CA] @@ -671,7 +710,33 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static { } /// Delete the object at the specified location. - async fn delete(&self, location: &Path) -> Result<()>; + async fn delete(&self, location: &Path) -> Result<()> { + self.delete_opts(location, DeleteOptions::default()).await + } + + /// Delete the object at the specified location with options. + /// + /// This method provides conditional delete capabilities through the [`DeleteOptions`] parameter. + /// It allows for atomic delete operations based on preconditions such as ETag matching or + /// modification time checks. 
+ /// + /// # Example + /// + /// ```no_run + /// # use object_store::{ObjectStore, DeleteOptions}; + /// # use object_store::path::Path; + /// # async fn example(store: &dyn ObjectStore) -> Result<(), Box> { + /// // Only delete if the object hasn't changed (matches the ETag) + /// let meta = store.head(&Path::from("file.txt")).await?; + /// let opts = DeleteOptions { + /// if_match: meta.e_tag, + /// ..Default::default() + /// }; + /// store.delete_opts(&Path::from("file.txt"), opts).await?; + /// # Ok(()) + /// # } + /// ``` + async fn delete_opts(&self, location: &Path, opts: DeleteOptions) -> Result<()>; /// Delete all the objects at the specified locations /// @@ -856,6 +921,10 @@ macro_rules! as_ref_impl { self.as_ref().delete(location).await } + async fn delete_opts(&self, location: &Path, opts: DeleteOptions) -> Result<()> { + self.as_ref().delete_opts(location, opts).await + } + fn delete_stream<'a>( &'a self, locations: BoxStream<'a, Result>, @@ -1234,6 +1303,72 @@ impl From for PutOptions { } } +/// Options for a delete request +#[derive(Debug, Clone, Default)] +pub struct DeleteOptions { + /// Delete will succeed if the `ObjectMeta::e_tag` matches + /// otherwise returning [`Error::Precondition`] + /// + /// See + /// + /// Examples: + /// + /// ```text + /// If-Match: "xyzzy" + /// If-Match: "xyzzy", "r2d2xxxx", "c3piozzzz" + /// If-Match: * + /// ``` + pub if_match: Option, + /// Delete will succeed if the object has not been modified since + /// otherwise returning [`Error::Precondition`] + /// + /// Some stores may only return `Precondition` for exact + /// timestamp matches, instead of for any timestamp greater than or equal. + /// + /// + pub if_unmodified_since: Option>, + /// Request a particular object version be deleted + /// + /// Cloud providers often support object versioning, allowing multiple versions of the same object + /// to exist within the same bucket. This option allows specifying a specific version to delete. 
+ /// If not specified, the latest version is typically deleted (though behavior may vary by provider). + pub version: Option, + /// Implementation-specific extensions. Intended for use by [`ObjectStore`] implementations + /// that need to pass context-specific information (like tracing spans) via trait methods. + /// + /// These extensions are ignored entirely by backends offered through this crate. + pub extensions: ::http::Extensions, +} + +impl DeleteOptions { + /// Returns an error if the preconditions on this request are not satisfied + pub fn check_preconditions(&self, meta: &ObjectMeta) -> Result<()> { + // The use of the invalid etag "*" means no ETag is equivalent to never matching + let etag = meta.e_tag.as_deref().unwrap_or("*"); + let last_modified = meta.last_modified; + + if let Some(m) = &self.if_match { + if m != "*" && m.split(',').map(str::trim).all(|x| x != etag) { + return Err(Error::Precondition { + path: meta.location.to_string(), + source: format!("{etag} does not match {m}").into(), + }); + } + } + + if let Some(date) = self.if_unmodified_since { + if last_modified > date { + return Err(Error::Precondition { + path: meta.location.to_string(), + source: format!("{date} < {last_modified}").into(), + }); + } + } + + Ok(()) + } +} + // See . 
#[doc(hidden)] #[deprecated(note = "Use PutMultipartOptions", since = "0.13.0")] @@ -1449,6 +1584,47 @@ mod tests { } pub(crate) use maybe_skip_integration; + #[tokio::test] + async fn test_delete_opts_precondition() { + use crate::memory::InMemory; + + let store = InMemory::new(); + let path = Path::from("test.txt"); + let data = Bytes::from("test data"); + + // Put an object + let result = store.put(&path, data.into()).await.unwrap(); + let etag = result.e_tag.unwrap(); + + // Test delete with matching ETag - should succeed + let opts = DeleteOptions { + if_match: Some(etag.clone()), + ..Default::default() + }; + store.delete_opts(&path, opts).await.unwrap(); + + // Verify object is deleted + let err = store.get(&path).await.unwrap_err(); + assert!(matches!(err, Error::NotFound { .. })); + + // Put object again + store + .put(&path, Bytes::from("test data 2").into()) + .await + .unwrap(); + + // Test delete with non-matching ETag - should fail + let opts = DeleteOptions { + if_match: Some("wrong-etag".to_string()), + ..Default::default() + }; + let err = store.delete_opts(&path, opts).await.unwrap_err(); + assert!(matches!(err, Error::Precondition { .. })); + + // Verify object still exists + store.get(&path).await.unwrap(); + } + /// Test that the returned stream does not borrow the lifetime of Path fn list_store<'a>( store: &'a dyn ObjectStore, diff --git a/src/limit.rs b/src/limit.rs index 85714967..debf4d17 100644 --- a/src/limit.rs +++ b/src/limit.rs @@ -18,9 +18,9 @@ //! 
An object store that limits the maximum concurrency of the wrapped implementation use crate::{ - BoxStream, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta, - ObjectStore, Path, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, StreamExt, - UploadPart, + BoxStream, DeleteOptions, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, + ObjectMeta, ObjectStore, Path, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, + StreamExt, UploadPart, }; use async_trait::async_trait; use bytes::Bytes; @@ -137,6 +137,11 @@ impl ObjectStore for LimitStore { self.inner.delete(location).await } + async fn delete_opts(&self, location: &Path, opts: DeleteOptions) -> Result<()> { + let _permit = self.semaphore.acquire().await.unwrap(); + self.inner.delete_opts(location, opts).await + } + fn delete_stream<'a>( &'a self, locations: BoxStream<'a, Result>, diff --git a/src/local.rs b/src/local.rs index 3404bc89..672e599a 100644 --- a/src/local.rs +++ b/src/local.rs @@ -37,9 +37,9 @@ use crate::{ maybe_spawn_blocking, path::{absolute_path_to_url, Path}, util::InvalidGetRange, - Attributes, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta, - ObjectStore, PutMode, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, - UploadPart, + Attributes, DeleteOptions, GetOptions, GetResult, GetResultPayload, ListResult, + MultipartUpload, ObjectMeta, ObjectStore, PutMode, PutMultipartOptions, PutOptions, PutPayload, + PutResult, Result, UploadPart, }; /// A specialized `Error` for filesystem object store-related errors @@ -483,6 +483,22 @@ impl ObjectStore for LocalFileSystem { .await } + async fn delete_opts(&self, location: &Path, opts: DeleteOptions) -> Result<()> { + // First check if we need to validate preconditions + if opts.if_match.is_some() || opts.if_unmodified_since.is_some() { + let meta = self.head(location).await?; + opts.check_preconditions(&meta)?; + } + + // 
Version-specific delete is not supported in local filesystem + if opts.version.is_some() { + return Err(crate::Error::NotImplemented); + } + + // Perform the delete + self.delete(location).await + } + fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result> { self.list_with_maybe_offset(prefix, None) } diff --git a/src/memory.rs b/src/memory.rs index e15c2465..cc33cbfc 100644 --- a/src/memory.rs +++ b/src/memory.rs @@ -29,9 +29,9 @@ use parking_lot::RwLock; use crate::multipart::{MultipartStore, PartId}; use crate::util::InvalidGetRange; use crate::{ - path::Path, Attributes, GetRange, GetResult, GetResultPayload, ListResult, MultipartId, - MultipartUpload, ObjectMeta, ObjectStore, PutMode, PutMultipartOptions, PutOptions, PutResult, - Result, UpdateVersion, UploadPart, + path::Path, Attributes, DeleteOptions, GetRange, GetResult, GetResultPayload, ListResult, + MultipartId, MultipartUpload, ObjectMeta, ObjectStore, PutMode, PutMultipartOptions, + PutOptions, PutResult, Result, UpdateVersion, UploadPart, }; use crate::{GetOptions, PutPayload}; @@ -311,6 +311,29 @@ impl ObjectStore for InMemory { Ok(()) } + async fn delete_opts(&self, location: &Path, opts: DeleteOptions) -> Result<()> { + // First check if object exists for conditional deletes + if opts.if_match.is_some() || opts.if_unmodified_since.is_some() { + let entry = self.entry(location)?; + let meta = ObjectMeta { + location: location.clone(), + e_tag: Some(entry.e_tag.to_string()), + last_modified: entry.last_modified, + size: entry.data.len() as u64, + version: None, + }; + opts.check_preconditions(&meta)?; + } + + // Version-specific delete is not supported in memory store + if opts.version.is_some() { + return Err(crate::Error::NotImplemented); + } + + self.storage.write().map.remove(location); + Ok(()) + } + fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result> { let root = Path::default(); let prefix = prefix.unwrap_or(&root); diff --git a/src/prefix.rs b/src/prefix.rs index 
e5a917aa..284b42cf 100644 --- a/src/prefix.rs +++ b/src/prefix.rs @@ -22,7 +22,7 @@ use std::ops::Range; use crate::path::Path; use crate::{ - GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, + DeleteOptions, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, }; @@ -158,6 +158,11 @@ impl ObjectStore for PrefixStore { self.inner.delete(&full_path).await } + async fn delete_opts(&self, location: &Path, opts: DeleteOptions) -> Result<()> { + let full_path = self.full_path(location); + self.inner.delete_opts(&full_path, opts).await + } + fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result> { let prefix = self.full_path(prefix.unwrap_or(&Path::default())); let s = self.inner.list(Some(&prefix)); diff --git a/src/throttle.rs b/src/throttle.rs index dec642a7..78260085 100644 --- a/src/throttle.rs +++ b/src/throttle.rs @@ -22,8 +22,9 @@ use std::{convert::TryInto, sync::Arc}; use crate::multipart::{MultipartStore, PartId}; use crate::{ - path::Path, GetResult, GetResultPayload, ListResult, MultipartId, MultipartUpload, ObjectMeta, - ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, + path::Path, DeleteOptions, GetResult, GetResultPayload, ListResult, MultipartId, + MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, PutPayload, + PutResult, Result, }; use crate::{GetOptions, UploadPart}; use async_trait::async_trait; @@ -237,6 +238,12 @@ impl ObjectStore for ThrottledStore { self.inner.delete(location).await } + async fn delete_opts(&self, location: &Path, opts: DeleteOptions) -> Result<()> { + sleep(self.config().wait_delete_per_call).await; + + self.inner.delete_opts(location, opts).await + } + fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result> { let stream = self.inner.list(prefix); let config = Arc::clone(&self.config); diff --git a/tests/get_range_file.rs 
b/tests/get_range_file.rs index d5ac8e39..aadaa78a 100644 --- a/tests/get_range_file.rs +++ b/tests/get_range_file.rs @@ -62,6 +62,10 @@ impl ObjectStore for MyStore { todo!() } + async fn delete_opts(&self, _: &Path, _: DeleteOptions) -> Result<()> { + todo!() + } + fn list(&self, _: Option<&Path>) -> BoxStream<'static, Result> { todo!() } From 9bd4f006fe2e9b3befce18e3fc88bb1e95ff8eeb Mon Sep 17 00:00:00 2001 From: Ankur Goyal Date: Tue, 8 Jul 2025 21:19:23 -0700 Subject: [PATCH 2/3] add more tests --- src/aws/mod.rs | 2 + src/azure/mod.rs | 2 + src/gcp/mod.rs | 6 +- src/integration.rs | 286 +++++++++++++++++++++++++++++++++++++++++++++ src/local.rs | 1 + src/memory.rs | 2 + 6 files changed, 297 insertions(+), 2 deletions(-) diff --git a/src/aws/mod.rs b/src/aws/mod.rs index 5ef14bb6..2102794a 100644 --- a/src/aws/mod.rs +++ b/src/aws/mod.rs @@ -638,6 +638,8 @@ mod tests { } if test_conditional_put { put_opts(&integration, true).await; + delete_opts(&integration, true).await; + delete_opts_race_condition(&integration, true).await; } // run integration test with unsigned payload enabled diff --git a/src/azure/mod.rs b/src/azure/mod.rs index ea25f38e..f578e2a4 100644 --- a/src/azure/mod.rs +++ b/src/azure/mod.rs @@ -330,6 +330,8 @@ mod tests { copy_if_not_exists(&integration).await; stream_get(&integration).await; put_opts(&integration, true).await; + delete_opts(&integration, true).await; + delete_opts_race_condition(&integration, true).await; multipart(&integration, &integration).await; multipart_race_condition(&integration, false).await; multipart_out_of_order(&integration).await; diff --git a/src/gcp/mod.rs b/src/gcp/mod.rs index 39dc2081..fbf5dc1a 100644 --- a/src/gcp/mod.rs +++ b/src/gcp/mod.rs @@ -42,8 +42,8 @@ use crate::gcp::credential::GCSAuthorizer; use crate::signer::Signer; use crate::{ multipart::PartId, path::Path, DeleteOptions, GetOptions, GetResult, ListResult, MultipartId, - MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOptions, 
PutOptions, PutPayload, - PutResult, Result, UploadPart, + MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, PutPayload, + PutResult, Result, UploadPart, }; use async_trait::async_trait; use client::GoogleCloudStorageClient; @@ -319,6 +319,8 @@ mod test { // Fake GCS server doesn't currently honor preconditions get_opts(&integration).await; put_opts(&integration, true).await; + delete_opts(&integration, true).await; + delete_opts_race_condition(&integration, true).await; // Fake GCS server doesn't currently support attributes put_get_attributes(&integration).await; } diff --git a/src/integration.rs b/src/integration.rs index 49b7be57..40cec139 100644 --- a/src/integration.rs +++ b/src/integration.rs @@ -1313,3 +1313,289 @@ pub async fn list_paginated(storage: &dyn ObjectStore, list: &dyn PaginatedListS ); assert!(ret.page_token.is_none()); } +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +use crate::DeleteOptions; +use chrono::{Duration, Utc}; + +/// Tests conditional deletes +pub async fn delete_opts(storage: &dyn ObjectStore, supports_conditional: bool) { + let rng = rng(); + let suffix = String::from_utf8(rng.sample_iter(Alphanumeric).take(32).collect()).unwrap(); + + // Test 1: Delete with matching ETag + let path = Path::from(format!("delete_opts_etag_{suffix}")); + let result = storage.put(&path, "test data".into()).await.unwrap(); + let etag = result.e_tag.clone(); + + if supports_conditional && etag.is_some() { + // Delete with correct ETag should succeed + let opts = DeleteOptions { + if_match: etag.clone(), + ..Default::default() + }; + storage.delete_opts(&path, opts).await.unwrap(); + + // Verify object was deleted + let err = storage.get(&path).await.unwrap_err(); + assert!(matches!(err, Error::NotFound { .. }), "{err}"); + } else if !supports_conditional { + // Should return NotImplemented for stores that don't support it + let opts = DeleteOptions { + if_match: Some("some-etag".to_string()), + ..Default::default() + }; + let err = storage.delete_opts(&path, opts).await.unwrap_err(); + assert!(matches!(err, Error::NotImplemented), "{err}"); + + // Clean up + storage.delete(&path).await.unwrap(); + return; + } + + // Test 2: Delete with non-matching ETag should fail + let path = Path::from(format!("delete_opts_etag_fail_{suffix}")); + let result = storage.put(&path, "test data".into()).await.unwrap(); + + if supports_conditional && result.e_tag.is_some() { + let opts = DeleteOptions { + if_match: Some("wrong-etag".to_string()), + ..Default::default() + }; + let err = storage.delete_opts(&path, opts).await.unwrap_err(); + assert!(matches!(err, Error::Precondition { .. 
}), "{err}"); + + // Verify object still exists + let data = storage.get(&path).await.unwrap().bytes().await.unwrap(); + assert_eq!(data.as_ref(), b"test data"); + + // Clean up + storage.delete(&path).await.unwrap(); + } + + // Test 3: Delete with if_unmodified_since + let path = Path::from(format!("delete_opts_time_{suffix}")); + let _result = storage.put(&path, "test data".into()).await.unwrap(); + let meta = storage.head(&path).await.unwrap(); + + if supports_conditional { + // Delete with future timestamp should succeed + let opts = DeleteOptions { + if_unmodified_since: Some(meta.last_modified + Duration::hours(1)), + ..Default::default() + }; + storage.delete_opts(&path, opts).await.unwrap(); + + // Verify object was deleted + let err = storage.get(&path).await.unwrap_err(); + assert!(matches!(err, Error::NotFound { .. }), "{err}"); + } + + // Test 4: Delete with past if_unmodified_since should fail + let path = Path::from(format!("delete_opts_time_fail_{suffix}")); + storage.put(&path, "test data".into()).await.unwrap(); + let meta = storage.head(&path).await.unwrap(); + + if supports_conditional { + let opts = DeleteOptions { + if_unmodified_since: Some(meta.last_modified - Duration::hours(1)), + ..Default::default() + }; + let err = storage.delete_opts(&path, opts).await.unwrap_err(); + assert!(matches!(err, Error::Precondition { .. 
}), "{err}"); + + // Verify object still exists + let data = storage.get(&path).await.unwrap().bytes().await.unwrap(); + assert_eq!(data.as_ref(), b"test data"); + + // Clean up + storage.delete(&path).await.unwrap(); + } + + // Test 5: Delete with multiple conditions (ETag takes precedence) + let path = Path::from(format!("delete_opts_multi_{suffix}")); + let result = storage.put(&path, "test data".into()).await.unwrap(); + let meta = storage.head(&path).await.unwrap(); + + if supports_conditional && result.e_tag.is_some() { + // Both conditions satisfied - should succeed + let opts = DeleteOptions { + if_match: result.e_tag.clone(), + if_unmodified_since: Some(meta.last_modified + Duration::hours(1)), + ..Default::default() + }; + storage.delete_opts(&path, opts).await.unwrap(); + + // Verify object was deleted + let err = storage.get(&path).await.unwrap_err(); + assert!(matches!(err, Error::NotFound { .. }), "{err}"); + + // Test with ETag match and time match - should succeed + storage.put(&path, "test data 2".into()).await.unwrap(); + let result2 = storage.put(&path, "test data 3".into()).await.unwrap(); + let opts = DeleteOptions { + if_match: result2.e_tag.clone(), + if_unmodified_since: Some(Utc::now() + Duration::hours(1)), // Future time + ..Default::default() + }; + storage.delete_opts(&path, opts).await.unwrap(); + } + + // Test 6: Delete non-existent object with conditions + let path = Path::from(format!("delete_opts_notfound_{suffix}")); + if supports_conditional { + let opts = DeleteOptions { + if_match: Some("some-etag".to_string()), + ..Default::default() + }; + // Behavior varies by store - some return NotFound, others may return success + let result = storage.delete_opts(&path, opts).await; + if let Err(err) = result { + // Most stores should return NotFound + assert!( + matches!(err, Error::NotFound { .. }) || matches!(err, Error::Precondition { .. 
}), + "{err}" + ); + } + } + + // Test 7: Version-specific delete (if supported) + let path = Path::from(format!("delete_opts_version_{suffix}")); + storage.put(&path, "version 1".into()).await.unwrap(); + + // Most stores don't support versioning in tests, so we just verify it doesn't crash + let opts = DeleteOptions { + version: Some("v123".to_string()), + ..Default::default() + }; + let result = storage.delete_opts(&path, opts).await; + match result { + Ok(_) => { + // Some stores might ignore version + } + Err(Error::NotImplemented) => { + // Expected for stores that don't support versioning + } + Err(Error::NotFound { .. }) => { + // Also acceptable - version doesn't exist + } + Err(err) => { + // Other errors might indicate version mismatch + assert!( + matches!(err, Error::Precondition { .. }) || matches!(err, Error::Generic { .. }), + "Unexpected error: {err}" + ); + } + } + + // Clean up + let _ = storage.delete(&path).await; + + // Test 8: Wildcard ETag matching + let path = Path::from(format!("delete_opts_wildcard_{suffix}")); + storage.put(&path, "test data".into()).await.unwrap(); + + if supports_conditional { + // "*" should match any existing object + let opts = DeleteOptions { + if_match: Some("*".to_string()), + ..Default::default() + }; + storage.delete_opts(&path, opts).await.unwrap(); + + // Verify object was deleted + let err = storage.get(&path).await.unwrap_err(); + assert!(matches!(err, Error::NotFound { .. 
}), "{err}"); + } + + // Test 9: Multiple ETags (comma-separated) + let path = Path::from(format!("delete_opts_multi_etag_{suffix}")); + let result = storage.put(&path, "test data".into()).await.unwrap(); + + if supports_conditional && result.e_tag.is_some() { + let etag = result.e_tag.unwrap(); + // Include correct ETag among others + let opts = DeleteOptions { + if_match: Some(format!("\"wrong1\", {}, \"wrong2\"", etag)), + ..Default::default() + }; + storage.delete_opts(&path, opts).await.unwrap(); + + // Verify object was deleted + let err = storage.get(&path).await.unwrap_err(); + assert!(matches!(err, Error::NotFound { .. }), "{err}"); + } +} + +/// Test concurrent conditional deletes (race conditions) +pub async fn delete_opts_race_condition(storage: &dyn ObjectStore, supports_conditional: bool) { + if !supports_conditional { + return; + } + + use futures::stream::{FuturesUnordered, StreamExt}; + + let rng = rng(); + let suffix = String::from_utf8(rng.sample_iter(Alphanumeric).take(32).collect()).unwrap(); + let path = Path::from(format!("delete_race_{suffix}")); + + // Create object + let result = storage.put(&path, "test data".into()).await.unwrap(); + + if let Some(etag) = result.e_tag { + const NUM_WORKERS: usize = 5; + + // Multiple workers try to delete with same ETag + let mut futures: FuturesUnordered<_> = (0..NUM_WORKERS) + .map(|_| { + let opts = DeleteOptions { + if_match: Some(etag.clone()), + ..Default::default() + }; + storage.delete_opts(&path, opts) + }) + .collect(); + + let mut success_count = 0; + let mut precondition_count = 0; + let mut not_found_count = 0; + + while let Some(result) = futures.next().await { + match result { + Ok(_) => success_count += 1, + Err(Error::Precondition { .. }) => precondition_count += 1, + Err(Error::NotFound { .. 
}) => not_found_count += 1, + Err(err) => panic!("Unexpected error: {err}"), + } + } + + // Exactly one should succeed + assert_eq!(success_count, 1, "Exactly one delete should succeed"); + + // Others should fail with either Precondition or NotFound + assert_eq!( + precondition_count + not_found_count, + NUM_WORKERS - 1, + "Other deletes should fail" + ); + + // Verify object is actually deleted + let err = storage.get(&path).await.unwrap_err(); + assert!(matches!(err, Error::NotFound { .. })); + } +} diff --git a/src/local.rs b/src/local.rs index 672e599a..c22cb88b 100644 --- a/src/local.rs +++ b/src/local.rs @@ -1125,6 +1125,7 @@ mod tests { copy_rename_nonexistent_object(&integration).await; stream_get(&integration).await; put_opts(&integration, false).await; + delete_opts(&integration, true).await; } #[test] diff --git a/src/memory.rs b/src/memory.rs index cc33cbfc..1c7997a1 100644 --- a/src/memory.rs +++ b/src/memory.rs @@ -575,6 +575,8 @@ mod tests { copy_if_not_exists(&integration).await; stream_get(&integration).await; put_opts(&integration, true).await; + delete_opts(&integration, true).await; + delete_opts_race_condition(&integration, true).await; multipart(&integration, &integration).await; put_get_attributes(&integration).await; } From 07ea904f567a0287746bccedee079c391e24675e Mon Sep 17 00:00:00 2001 From: Ankur Goyal Date: Tue, 8 Jul 2025 22:14:53 -0700 Subject: [PATCH 3/3] clippy --- src/azure/client.rs | 2 +- src/gcp/mod.rs | 2 +- src/integration.rs | 1 + 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/azure/client.rs b/src/azure/client.rs index 1e42feaf..327fbffe 100644 --- a/src/azure/client.rs +++ b/src/azure/client.rs @@ -683,7 +683,7 @@ impl AzureClient { } if let Some(if_unmodified_since) = opts.if_unmodified_since { - builder = builder.header(IF_UNMODIFIED_SINCE, &if_unmodified_since.to_rfc2822()); + builder = builder.header(IF_UNMODIFIED_SINCE, if_unmodified_since.to_rfc2822()); } // Azure supports versioned deletes via 
x-ms-version-id header diff --git a/src/gcp/mod.rs b/src/gcp/mod.rs index fbf5dc1a..15d76624 100644 --- a/src/gcp/mod.rs +++ b/src/gcp/mod.rs @@ -42,7 +42,7 @@ use crate::gcp::credential::GCSAuthorizer; use crate::signer::Signer; use crate::{ multipart::PartId, path::Path, DeleteOptions, GetOptions, GetResult, ListResult, MultipartId, - MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayload, PutResult, + MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, UploadPart, }; use async_trait::async_trait; diff --git a/src/integration.rs b/src/integration.rs index 40cec139..db054930 100644 --- a/src/integration.rs +++ b/src/integration.rs @@ -34,6 +34,7 @@ use crate::{ use bytes::Bytes; use futures::stream::FuturesUnordered; use futures::{StreamExt, TryStreamExt}; +use rand::distr::Alphanumeric; use rand::{rng, Rng}; use std::collections::HashSet;