59 changes: 59 additions & 0 deletions src/aws/builder.rs
@@ -42,6 +42,10 @@ use url::Url;
/// Default metadata endpoint
static DEFAULT_METADATA_ENDPOINT: &str = "http://169.254.169.254";

/// AWS S3 does not support copy operations larger than 5 GiB in a single request. See [Copy
/// Object](https://docs.aws.amazon.com/AmazonS3/latest/userguide/copy-object.html) for reference.
const MAX_SINGLE_REQUEST_COPY_SIZE: u64 = 5 * 1024 * 1024 * 1024;

/// A specialized `Error` for object store-related errors
#[derive(Debug, thiserror::Error)]
enum Error {
@@ -186,6 +190,10 @@ pub struct AmazonS3Builder {
request_payer: ConfigValue<bool>,
/// The [`HttpConnector`] to use
http_connector: Option<Arc<dyn HttpConnector>>,
/// Threshold (bytes) above which copy uses multipart copy. If not set, defaults to 5 GiB.
multipart_copy_threshold: Option<ConfigValue<u64>>,
/// Preferred multipart copy part size (bytes). If not set, defaults to 5 GiB.
multipart_copy_part_size: Option<ConfigValue<u64>>,
}

/// Configuration keys for [`AmazonS3Builder`]
@@ -396,6 +404,10 @@ pub enum AmazonS3ConfigKey {

/// Encryption options
Encryption(S3EncryptionConfigKey),
/// Threshold (bytes) to switch to multipart copy
MultipartCopyThreshold,
/// Preferred multipart copy part size (bytes)
MultipartCopyPartSize,
}

impl AsRef<str> for AmazonS3ConfigKey {
@@ -428,6 +440,8 @@ impl AsRef<str> for AmazonS3ConfigKey {
Self::RequestPayer => "aws_request_payer",
Self::Client(opt) => opt.as_ref(),
Self::Encryption(opt) => opt.as_ref(),
Self::MultipartCopyThreshold => "aws_multipart_copy_threshold",
Self::MultipartCopyPartSize => "aws_multipart_copy_part_size",
}
}
}
@@ -466,6 +480,12 @@ impl FromStr for AmazonS3ConfigKey {
"aws_conditional_put" | "conditional_put" => Ok(Self::ConditionalPut),
"aws_disable_tagging" | "disable_tagging" => Ok(Self::DisableTagging),
"aws_request_payer" | "request_payer" => Ok(Self::RequestPayer),
"aws_multipart_copy_threshold" | "multipart_copy_threshold" => {
Ok(Self::MultipartCopyThreshold)
}
"aws_multipart_copy_part_size" | "multipart_copy_part_size" => {
Ok(Self::MultipartCopyPartSize)
}
// Backwards compatibility
"aws_allow_http" => Ok(Self::Client(ClientConfigKey::AllowHttp)),
"aws_server_side_encryption" => Ok(Self::Encryption(
@@ -631,6 +651,12 @@ impl AmazonS3Builder {
self.encryption_customer_key_base64 = Some(value.into())
}
},
AmazonS3ConfigKey::MultipartCopyThreshold => {
self.multipart_copy_threshold = Some(ConfigValue::Deferred(value.into()))
}
AmazonS3ConfigKey::MultipartCopyPartSize => {
self.multipart_copy_part_size = Some(ConfigValue::Deferred(value.into()))
}
};
self
}
@@ -698,6 +724,14 @@ impl AmazonS3Builder {
self.encryption_customer_key_base64.clone()
}
},
AmazonS3ConfigKey::MultipartCopyThreshold => self
.multipart_copy_threshold
.as_ref()
.map(|x| x.to_string()),
AmazonS3ConfigKey::MultipartCopyPartSize => self
.multipart_copy_part_size
.as_ref()
.map(|x| x.to_string()),
}
}

@@ -990,6 +1024,18 @@ impl AmazonS3Builder {
self
}

/// Set threshold (bytes) above which copy uses multipart copy
pub fn with_multipart_copy_threshold(mut self, threshold_bytes: u64) -> Self {
self.multipart_copy_threshold = Some(ConfigValue::Parsed(threshold_bytes));
self
}

/// Set preferred multipart copy part size (bytes)
pub fn with_multipart_copy_part_size(mut self, part_size_bytes: u64) -> Self {
self.multipart_copy_part_size = Some(ConfigValue::Parsed(part_size_bytes));
self
}

/// Create a [`AmazonS3`] instance from the provided values,
/// consuming `self`.
pub fn build(mut self) -> Result<AmazonS3> {
@@ -1147,6 +1193,17 @@ impl AmazonS3Builder {
S3EncryptionHeaders::default()
};

let multipart_copy_threshold = self
.multipart_copy_threshold
.map(|val| val.get())
.transpose()?
.unwrap_or(MAX_SINGLE_REQUEST_COPY_SIZE);
let multipart_copy_part_size = self
.multipart_copy_part_size
.map(|val| val.get())
.transpose()?
.unwrap_or(MAX_SINGLE_REQUEST_COPY_SIZE);
Comment on lines +1196 to +1205

Reviewer:

Clamp to 5 GiB because that's the documented maximum?

Contributor Author:

I think if someone wants to push it over 5 GB, they should be able to. There are many "s3-compatible" object stores that might not share the same limitations.

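For illustration, a minimal sketch of wiring both knobs against an S3-compatible store (the endpoint, bucket name, and 50 GiB figure are hypothetical, not taken from this PR):

```rust
use object_store::aws::{AmazonS3, AmazonS3Builder};

fn build_store() -> object_store::Result<AmazonS3> {
    AmazonS3Builder::new()
        // Hypothetical S3-compatible endpoint with a higher copy limit.
        .with_endpoint("https://s3-compatible.example.com")
        .with_bucket_name("my-bucket")
        // Sources up to 50 GiB are copied with a single CopyObject request...
        .with_multipart_copy_threshold(50 * 1024 * 1024 * 1024)
        // ...and larger sources are copied in 5 GiB UploadPartCopy parts.
        .with_multipart_copy_part_size(5 * 1024 * 1024 * 1024)
        .build()
}
```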

let config = S3Config {
region,
endpoint: self.endpoint,
@@ -1164,6 +1221,8 @@
conditional_put: self.conditional_put.get()?,
encryption_headers,
request_payer: self.request_payer.get()?,
multipart_copy_threshold,
multipart_copy_part_size,
};

let http_client = http.connect(&config.client_options)?;
56 changes: 38 additions & 18 deletions src/aws/client.rs
@@ -138,6 +138,7 @@ impl From<Error> for crate::Error {
pub(crate) enum PutPartPayload<'a> {
Part(PutPayload),
Copy(&'a Path),
CopyRange(&'a Path, std::ops::Range<u64>),
}

impl Default for PutPartPayload<'_> {
@@ -208,6 +209,10 @@ pub(crate) struct S3Config {
pub conditional_put: S3ConditionalPut,
pub request_payer: bool,
pub(super) encryption_headers: S3EncryptionHeaders,
/// Threshold in bytes above which copy will use multipart copy
pub multipart_copy_threshold: u64,
/// Preferred multipart copy part size in bytes (defaults to 5 GiB)
pub multipart_copy_part_size: u64,
}

impl S3Config {
@@ -676,7 +681,10 @@ impl S3Client {
part_idx: usize,
data: PutPartPayload<'_>,
) -> Result<PartId> {
let is_copy = matches!(data, PutPartPayload::Copy(_));
let is_copy = matches!(
data,
PutPartPayload::Copy(_) | PutPartPayload::CopyRange(_, _)
);
let part = (part_idx + 1).to_string();

let mut request = self
@@ -690,6 +698,18 @@
"x-amz-copy-source",
&format!("{}/{}", self.config.bucket, encode_path(path)),
),
PutPartPayload::CopyRange(path, range) => {
// AWS expects inclusive end for copy range header
let start = range.start;
let end_inclusive = range.end.saturating_sub(1);
let range_value = format!("bytes={}-{}", start, end_inclusive);
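// e.g. the half-open range 0..5368709120 is sent as
// "bytes=0-5368709119", matching UploadPartCopy's inclusive convention.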
request
.header(
"x-amz-copy-source",
&format!("{}/{}", self.config.bucket, encode_path(path)),
)
.header("x-amz-copy-source-range", &range_value)
}
};

if self
@@ -701,24 +721,24 @@ impl S3Client {
// If SSE-C is used, we must include the encryption headers in every upload request.
request = request.with_encryption_headers();
}

let (parts, body) = request.send().await?.into_parts();
let checksum_sha256 = parts
.headers
.get(SHA256_CHECKSUM)
.and_then(|v| v.to_str().ok())
.map(|v| v.to_string());

let e_tag = match is_copy {
false => get_etag(&parts.headers).map_err(|source| Error::Metadata { source })?,
true => {
let response = body
.bytes()
.await
.map_err(|source| Error::CreateMultipartResponseBody { source })?;
let response: CopyPartResult = quick_xml::de::from_reader(response.reader())
.map_err(|source| Error::InvalidMultipartResponse { source })?;
response.e_tag
}
let (e_tag, checksum_sha256) = if is_copy {
let response = body
.bytes()
.await
.map_err(|source| Error::CreateMultipartResponseBody { source })?;
let response: CopyPartResult = quick_xml::de::from_reader(response.reader())
.map_err(|source| Error::InvalidMultipartResponse { source })?;
(response.e_tag, response.checksum_sha256)
} else {
let e_tag = get_etag(&parts.headers).map_err(|source| Error::Metadata { source })?;
let checksum_sha256 = parts
.headers
.get(SHA256_CHECKSUM)
.and_then(|v| v.to_str().ok())
.map(|v| v.to_string());
(e_tag, checksum_sha256)
};

let content_id = if self.config.checksum == Some(Checksum::SHA256) {
140 changes: 98 additions & 42 deletions src/aws/mod.rs
@@ -103,6 +103,63 @@ impl AmazonS3 {
fn path_url(&self, path: &Path) -> String {
self.client.config.path_url(path)
}

/// Perform a multipart copy operation.
///
/// If the multipart upload fails, make a best effort attempt to clean it up. It's the caller's
/// responsibility to add a lifecycle rule if guaranteed cleanup is required, as we cannot
/// protect against an ill-timed process crash.
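/// (For instance, an S3 lifecycle rule with `AbortIncompleteMultipartUpload`
/// and a `DaysAfterInitiation` window reaps uploads orphaned by a crash.)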
async fn copy_multipart(
&self,
from: &Path,
to: &Path,
size: u64,
mode: CompleteMultipartMode,
) -> Result<()> {
let upload_id = self
.client
.create_multipart(to, PutMultipartOptions::default())
.await?;

// S3 requires minimum 5 MiB per part (except final) and max 10,000 parts
let part_size = self.client.config.multipart_copy_part_size;
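// e.g. with the default 5 GiB part size, a 12 GiB source is copied as
// three parts of 5 GiB, 5 GiB, and 2 GiB; only the last part may be short.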

let mut parts = Vec::new();
let mut offset: u64 = 0;
let mut idx: usize = 0;
let res = async {
while offset < size {
let end = if size - offset <= part_size {
size
} else {
offset + part_size
};
let payload = if offset == 0 && end == size {
PutPartPayload::Copy(from)
} else {
PutPartPayload::CopyRange(from, offset..end)
};
let part = self.client.put_part(to, &upload_id, idx, payload).await?;
parts.push(part);
idx += 1;
offset = end;
}
self.client
.complete_multipart(to, &upload_id, parts, mode)
.await
.map(|_| ())
}
.await;

// If the multipart upload failed, make a best effort attempt to
// clean it up. It's the caller's responsibility to add a
// lifecycle rule if guaranteed cleanup is required, as we
// cannot protect against an ill-timed process crash.
if res.is_err() {
let _ = self.client.abort_multipart(to, &upload_id).await;
}
res
}
}

#[async_trait]
Expand Down Expand Up @@ -316,11 +373,28 @@ impl ObjectStore for AmazonS3 {
}

async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
self.client
.copy_request(from, to)
.idempotent(true)
.send()
.await?;
// Determine source size to decide between single CopyObject and multipart copy
let head_meta = self
.client
.get_opts(
from,
GetOptions {
head: true,
..Default::default()
},
)
.await?
.meta;
Comment on lines +376 to +387

Reviewer:

Should this first try to use CopyObject and then fall back if it fails due to size?

Contributor Author:

Good point - let me see if that's straightforward.

Contributor Author:

The issue with that approach is that on error, AWS does not respond with anything more specific than "InvalidRequest":

<Error><Code>InvalidRequest</Code><Message>The specified copy source is larger than the maximum allowable size for a copy source: 5368709120</Message><RequestId>8550KAYYHRYF33SM</RequestId><HostId>R7zaiPWt96z/yQm2PtDT+pyFmYF76YCBcW0AeukdrXpS4qlSuO1nmXTFI4Ak2YcHMsBoymw33j4=</HostId></Error>

So there's not really a stable API for determining that the request is invalid because of the size of the source.

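To illustrate the fragility: a CopyObject-first fallback would have to sniff the human-readable message, since the body carries no machine-readable cause beyond `Code=InvalidRequest`. A sketch of that rejected approach (function name and matching strategy are hypothetical):

```rust
/// Hypothetical check a fallback would need: substring-match the
/// unstructured error text, which AWS does not guarantee to be stable.
fn copy_source_too_large(error_body: &str) -> bool {
    error_body.contains("<Code>InvalidRequest</Code>")
        && error_body.contains("larger than the maximum allowable size for a copy source")
}
```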
if head_meta.size <= self.client.config.multipart_copy_threshold {
self.client
.copy_request(from, to)
.idempotent(true)
.send()
.await?;
} else {
self.copy_multipart(from, to, head_meta.size, CompleteMultipartMode::Overwrite)
.await?;
}
Ok(())
}

@@ -329,45 +403,27 @@
Some(S3CopyIfNotExists::Header(k, v)) => (k, v, StatusCode::PRECONDITION_FAILED),
Some(S3CopyIfNotExists::HeaderWithStatus(k, v, status)) => (k, v, *status),
Some(S3CopyIfNotExists::Multipart) => {
let upload_id = self
let head_meta = self
.client
.create_multipart(to, PutMultipartOptions::default())
.await?;

let res = async {
let part_id = self
.client
.put_part(to, &upload_id, 0, PutPartPayload::Copy(from))
.await?;
match self
.client
.complete_multipart(
to,
&upload_id,
vec![part_id],
CompleteMultipartMode::Create,
)
.await
{
Err(e @ Error::Precondition { .. }) => Err(Error::AlreadyExists {
.get_opts(
from,
GetOptions {
head: true,
..Default::default()
},
)
.await?
.meta;
return self
.copy_multipart(from, to, head_meta.size, CompleteMultipartMode::Create)
.await
.map_err(|err| match err {
Error::Precondition { .. } => Error::AlreadyExists {
path: to.to_string(),
source: Box::new(e),
}),
Ok(_) => Ok(()),
Err(e) => Err(e),
}
}
.await;

// If the multipart upload failed, make a best effort attempt to
// clean it up. It's the caller's responsibility to add a
// lifecycle rule if guaranteed cleanup is required, as we
// cannot protect against an ill-timed process crash.
if res.is_err() {
let _ = self.client.abort_multipart(to, &upload_id).await;
}

return res;
source: Box::new(err),
},
other => other,
});
}
#[allow(deprecated)]
Some(S3CopyIfNotExists::Dynamo(lock)) => {
2 changes: 2 additions & 0 deletions src/client/s3.rs
@@ -98,6 +98,8 @@ pub(crate) struct InitiateMultipartUploadResult {
pub(crate) struct CopyPartResult {
#[serde(rename = "ETag")]
pub e_tag: String,
#[serde(default, rename = "ChecksumSHA256")]
pub checksum_sha256: Option<String>,
}
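// A successful UploadPartCopy response body looks like (values illustrative):
//   <CopyPartResult><ETag>"etag"</ETag><ChecksumSHA256>b64==</ChecksumSHA256></CopyPartResult>
// ChecksumSHA256 only appears when checksums are enabled, hence serde(default).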

#[derive(Debug, Serialize)]