Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 33 additions & 4 deletions src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
use async_trait::async_trait;
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use reqwest::header::{HeaderName, IF_MATCH, IF_NONE_MATCH};
use reqwest::header::{HeaderName, IF_MATCH, IF_NONE_MATCH, IF_UNMODIFIED_SINCE};
use reqwest::{Method, StatusCode};
use std::{sync::Arc, time::Duration};
use url::Url;
Expand All @@ -44,9 +44,9 @@ use crate::multipart::{MultipartStore, PartId};
use crate::signer::Signer;
use crate::util::STRICT_ENCODE_SET;
use crate::{
Error, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, ObjectMeta,
ObjectStore, Path, PutMode, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
UploadPart,
DeleteOptions, Error, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload,
ObjectMeta, ObjectStore, Path, PutMode, PutMultipartOptions, PutOptions, PutPayload, PutResult,
Result, UploadPart,
};

static TAGS_HEADER: HeaderName = HeaderName::from_static("x-amz-tagging");
Expand Down Expand Up @@ -255,6 +255,33 @@ impl ObjectStore for AmazonS3 {
Ok(())
}

async fn delete_opts(&self, location: &Path, opts: DeleteOptions) -> Result<()> {
let request = self.client.request(Method::DELETE, location);

// Add conditional headers if specified
let request = if let Some(if_match) = &opts.if_match {
request.header(&IF_MATCH, if_match)
} else {
request
};

let request = if let Some(if_unmodified_since) = opts.if_unmodified_since {
request.header(&IF_UNMODIFIED_SINCE, &if_unmodified_since.to_rfc2822())
} else {
request
};

// AWS S3 supports versioned deletes
let request = if let Some(version) = &opts.version {
request.query(&[("versionId", version)])
} else {
request
};

request.with_extensions(opts.extensions).send().await?;
Ok(())
}

fn delete_stream<'a>(
&'a self,
locations: BoxStream<'a, Result<Path>>,
Expand Down Expand Up @@ -611,6 +638,8 @@ mod tests {
}
if test_conditional_put {
put_opts(&integration, true).await;
delete_opts(&integration, true).await;
delete_opts_race_condition(&integration, true).await;
}

// run integration test with unsigned payload enabled
Expand Down
57 changes: 54 additions & 3 deletions src/azure/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,19 @@ use crate::list::{PaginatedListOptions, PaginatedListResult};
use crate::multipart::PartId;
use crate::util::{deserialize_rfc1123, GetRange};
use crate::{
Attribute, Attributes, ClientOptions, GetOptions, ListResult, ObjectMeta, Path, PutMode,
PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, RetryConfig, TagSet,
Attribute, Attributes, ClientOptions, DeleteOptions, GetOptions, ListResult, ObjectMeta, Path,
PutMode, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, RetryConfig, TagSet,
};
use async_trait::async_trait;
use base64::prelude::{BASE64_STANDARD, BASE64_STANDARD_NO_PAD};
use base64::Engine;
use bytes::{Buf, Bytes};
use chrono::{DateTime, Utc};
use http::{
header::{HeaderMap, HeaderValue, CONTENT_LENGTH, CONTENT_TYPE, IF_MATCH, IF_NONE_MATCH},
header::{
HeaderMap, HeaderValue, CONTENT_LENGTH, CONTENT_TYPE, IF_MATCH, IF_NONE_MATCH,
IF_UNMODIFIED_SINCE,
},
HeaderName, Method,
};
use rand::Rng as _;
Expand Down Expand Up @@ -655,6 +658,54 @@ impl AzureClient {
Ok(())
}

/// Make an Azure Delete request with conditional options
pub(crate) async fn delete_request_with_opts(
&self,
path: &Path,
opts: DeleteOptions,
) -> Result<()> {
let credential = self.get_credential().await?;
let url = self.config.path_url(path);

let sensitive = credential
.as_deref()
.map(|c| c.sensitive_request())
.unwrap_or_default();

let mut builder = self
.client
.delete(url.as_str())
.header(&DELETE_SNAPSHOTS, "include");

// Add conditional headers if specified
if let Some(if_match) = &opts.if_match {
builder = builder.header(IF_MATCH, if_match);
}

if let Some(if_unmodified_since) = opts.if_unmodified_since {
builder = builder.header(IF_UNMODIFIED_SINCE, if_unmodified_since.to_rfc2822());
}

// Azure supports versioned deletes via x-ms-version-id header
if let Some(version) = &opts.version {
builder = builder.header("x-ms-version-id", version);
}

builder
.extensions(opts.extensions)
.with_azure_authorization(&credential, &self.config.account)
.retryable(&self.config.retry_config)
.sensitive(sensitive)
.send()
.await
.map_err(|source| {
let path = path.as_ref().into();
Error::DeleteRequest { source, path }
})?;

Ok(())
}

fn build_bulk_delete_body(
&self,
boundary: &str,
Expand Down
12 changes: 10 additions & 2 deletions src/azure/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ use crate::{
multipart::{MultipartStore, PartId},
path::Path,
signer::Signer,
GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, ObjectMeta, ObjectStore,
PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, UploadPart,
DeleteOptions, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, ObjectMeta,
ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, UploadPart,
};
use async_trait::async_trait;
use futures::stream::{BoxStream, StreamExt, TryStreamExt};
Expand Down Expand Up @@ -120,6 +120,12 @@ impl ObjectStore for MicrosoftAzure {
self.client.delete_request(location, &()).await
}

async fn delete_opts(&self, location: &Path, opts: DeleteOptions) -> Result<()> {
// Azure delete_request accepts a query parameter, but we need to handle DeleteOptions differently
// We'll need to update the client to have a specific method for conditional deletes
self.client.delete_request_with_opts(location, opts).await
}

fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
self.client.list(prefix)
}
Expand Down Expand Up @@ -324,6 +330,8 @@ mod tests {
copy_if_not_exists(&integration).await;
stream_get(&integration).await;
put_opts(&integration, true).await;
delete_opts(&integration, true).await;
delete_opts_race_condition(&integration, true).await;
multipart(&integration, &integration).await;
multipart_race_condition(&integration, false).await;
multipart_out_of_order(&integration).await;
Expand Down
8 changes: 6 additions & 2 deletions src/chunked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ use futures::StreamExt;

use crate::path::Path;
use crate::{
GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
PutMultipartOptions, PutOptions, PutResult,
DeleteOptions, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload,
ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, PutResult,
};
use crate::{PutPayload, Result};

Expand Down Expand Up @@ -150,6 +150,10 @@ impl ObjectStore for ChunkedStore {
self.inner.delete(location).await
}

async fn delete_opts(&self, location: &Path, opts: DeleteOptions) -> Result<()> {
self.inner.delete_opts(location, opts).await
}

fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
self.inner.list(prefix)
}
Expand Down
32 changes: 30 additions & 2 deletions src/gcp/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ use crate::multipart::PartId;
use crate::path::Path;
use crate::util::hex_encode;
use crate::{
Attribute, Attributes, ClientOptions, GetOptions, MultipartId, PutMode, PutMultipartOptions,
PutOptions, PutPayload, PutResult, Result, RetryConfig,
Attribute, Attributes, ClientOptions, DeleteOptions, GetOptions, MultipartId, PutMode,
PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, RetryConfig,
};
use async_trait::async_trait;
use base64::prelude::BASE64_STANDARD;
Expand Down Expand Up @@ -563,6 +563,34 @@ impl GoogleCloudStorageClient {
Ok(())
}

pub(crate) async fn delete_request_with_opts(
&self,
path: &Path,
opts: DeleteOptions,
) -> Result<()> {
let mut request = self.request(Method::DELETE, path);

// Add conditional headers if specified
if let Some(if_match) = &opts.if_match {
request = request.header(&HeaderName::from_static("if-match"), if_match);
}

if let Some(if_unmodified_since) = opts.if_unmodified_since {
request = request.header(
&HeaderName::from_static("if-unmodified-since"),
&if_unmodified_since.to_rfc2822(),
);
}

// GCS supports versioned deletes via generation parameter
if let Some(version) = &opts.version {
request = request.query(&[("generation", version)]);
}

request.with_extensions(opts.extensions).send().await?;
Ok(())
}

/// Perform a copy request <https://cloud.google.com/storage/docs/xml-api/put-object-copy>
pub(crate) async fn copy_request(
&self,
Expand Down
12 changes: 9 additions & 3 deletions src/gcp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ use crate::client::CredentialProvider;
use crate::gcp::credential::GCSAuthorizer;
use crate::signer::Signer;
use crate::{
multipart::PartId, path::Path, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload,
ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
UploadPart,
multipart::PartId, path::Path, DeleteOptions, GetOptions, GetResult, ListResult, MultipartId,
MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult,
Result, UploadPart,
};
use async_trait::async_trait;
use client::GoogleCloudStorageClient;
Expand Down Expand Up @@ -184,6 +184,10 @@ impl ObjectStore for GoogleCloudStorage {
self.client.delete_request(location).await
}

async fn delete_opts(&self, location: &Path, opts: DeleteOptions) -> Result<()> {
self.client.delete_request_with_opts(location, opts).await
}

fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
self.client.list(prefix)
}
Expand Down Expand Up @@ -315,6 +319,8 @@ mod test {
// Fake GCS server doesn't currently honor preconditions
get_opts(&integration).await;
put_opts(&integration, true).await;
delete_opts(&integration, true).await;
delete_opts_race_condition(&integration, true).await;
// Fake GCS server doesn't currently support attributes
put_get_attributes(&integration).await;
}
Expand Down
11 changes: 8 additions & 3 deletions src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ use crate::client::{http_connector, HttpConnector};
use crate::http::client::Client;
use crate::path::Path;
use crate::{
ClientConfigKey, ClientOptions, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta,
ObjectStore, PutMode, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
RetryConfig,
ClientConfigKey, ClientOptions, DeleteOptions, GetOptions, GetResult, ListResult,
MultipartUpload, ObjectMeta, ObjectStore, PutMode, PutMultipartOptions, PutOptions, PutPayload,
PutResult, Result, RetryConfig,
};

mod client;
Expand Down Expand Up @@ -136,6 +136,11 @@ impl ObjectStore for HttpStore {
self.client.delete(location).await
}

async fn delete_opts(&self, _location: &Path, _opts: DeleteOptions) -> Result<()> {
// HTTP/WebDAV protocol doesn't support conditional deletes
Err(crate::Error::NotImplemented)
}

fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
let prefix_len = prefix.map(|p| p.as_ref().len()).unwrap_or_default();
let prefix = prefix.cloned();
Expand Down
Loading
Loading