diff --git a/src/aws/builder.rs b/src/aws/builder.rs
index 6e6f8e2f..b40b0d0a 100644
--- a/src/aws/builder.rs
+++ b/src/aws/builder.rs
@@ -42,6 +42,10 @@ use url::Url;
 /// Default metadata endpoint
 static DEFAULT_METADATA_ENDPOINT: &str = "http://169.254.169.254";
 
+/// AWS S3 does not support copy operations larger than 5 GiB in a single request. See [Copy
+/// Object](https://docs.aws.amazon.com/AmazonS3/latest/userguide/copy-object.html) for reference.
+const MAX_SINGLE_REQUEST_COPY_SIZE: u64 = 5 * 1024 * 1024 * 1024;
+
 /// A specialized `Error` for object store-related errors
 #[derive(Debug, thiserror::Error)]
 enum Error {
@@ -186,6 +190,10 @@ pub struct AmazonS3Builder {
     request_payer: ConfigValue<bool>,
     /// The [`HttpConnector`] to use
     http_connector: Option<Arc<dyn HttpConnector>>,
+    /// Threshold (bytes) above which copy uses multipart copy. If not set, defaults to 5 GiB.
+    multipart_copy_threshold: Option<ConfigValue<u64>>,
+    /// Preferred multipart copy part size (bytes). If not set, defaults to 5 GiB.
+    multipart_copy_part_size: Option<ConfigValue<u64>>,
 }
 
 /// Configuration keys for [`AmazonS3Builder`]
@@ -396,6 +404,10 @@ pub enum AmazonS3ConfigKey {
 
     /// Encryption options
     Encryption(S3EncryptionConfigKey),
+    /// Threshold (bytes) to switch to multipart copy
+    MultipartCopyThreshold,
+    /// Preferred multipart copy part size (bytes)
+    MultipartCopyPartSize,
 }
 
 impl AsRef<str> for AmazonS3ConfigKey {
@@ -428,6 +440,8 @@ impl AsRef<str> for AmazonS3ConfigKey {
             Self::RequestPayer => "aws_request_payer",
             Self::Client(opt) => opt.as_ref(),
             Self::Encryption(opt) => opt.as_ref(),
+            Self::MultipartCopyThreshold => "aws_multipart_copy_threshold",
+            Self::MultipartCopyPartSize => "aws_multipart_copy_part_size",
         }
     }
 }
@@ -466,6 +480,12 @@ impl FromStr for AmazonS3ConfigKey {
             "aws_conditional_put" | "conditional_put" => Ok(Self::ConditionalPut),
             "aws_disable_tagging" | "disable_tagging" => Ok(Self::DisableTagging),
             "aws_request_payer" | "request_payer" => Ok(Self::RequestPayer),
+            "aws_multipart_copy_threshold" | "multipart_copy_threshold" => {
+                Ok(Self::MultipartCopyThreshold)
+            }
+            "aws_multipart_copy_part_size" | "multipart_copy_part_size" => {
+                Ok(Self::MultipartCopyPartSize)
+            }
             // Backwards compatibility
             "aws_allow_http" => Ok(Self::Client(ClientConfigKey::AllowHttp)),
             "aws_server_side_encryption" => Ok(Self::Encryption(
@@ -631,6 +651,12 @@ impl AmazonS3Builder {
                     self.encryption_customer_key_base64 = Some(value.into())
                 }
             },
+            AmazonS3ConfigKey::MultipartCopyThreshold => {
+                self.multipart_copy_threshold = Some(ConfigValue::Deferred(value.into()))
+            }
+            AmazonS3ConfigKey::MultipartCopyPartSize => {
+                self.multipart_copy_part_size = Some(ConfigValue::Deferred(value.into()))
+            }
         };
         self
     }
@@ -698,6 +724,14 @@ impl AmazonS3Builder {
                     self.encryption_customer_key_base64.clone()
                 }
             },
+            AmazonS3ConfigKey::MultipartCopyThreshold => self
+                .multipart_copy_threshold
+                .as_ref()
+                .map(|x| x.to_string()),
+            AmazonS3ConfigKey::MultipartCopyPartSize => self
+                .multipart_copy_part_size
+                .as_ref()
+                .map(|x| x.to_string()),
         }
     }
@@ -990,6 +1024,18 @@ impl AmazonS3Builder {
         self
     }
 
+    /// Set the threshold (bytes) above which copy uses multipart copy
+    pub fn with_multipart_copy_threshold(mut self, threshold_bytes: u64) -> Self {
+        self.multipart_copy_threshold = Some(ConfigValue::Parsed(threshold_bytes));
+        self
+    }
+
+    /// Set the preferred multipart copy part size (bytes)
+    pub fn with_multipart_copy_part_size(mut self, part_size_bytes: u64) -> Self {
+        self.multipart_copy_part_size = Some(ConfigValue::Parsed(part_size_bytes));
+        self
+    }
+
     /// Create a [`AmazonS3`] instance from the provided values,
     /// consuming `self`.
     pub fn build(mut self) -> Result<AmazonS3> {
@@ -1147,6 +1193,17 @@ impl AmazonS3Builder {
             S3EncryptionHeaders::default()
         };
 
+        let multipart_copy_threshold = self
+            .multipart_copy_threshold
+            .map(|val| val.get())
+            .transpose()?
+            .unwrap_or(MAX_SINGLE_REQUEST_COPY_SIZE);
+        let multipart_copy_part_size = self
+            .multipart_copy_part_size
+            .map(|val| val.get())
+            .transpose()?
+            .unwrap_or(MAX_SINGLE_REQUEST_COPY_SIZE);
+
         let config = S3Config {
             region,
             endpoint: self.endpoint,
@@ -1164,6 +1221,8 @@ impl AmazonS3Builder {
             conditional_put: self.conditional_put.get()?,
             encryption_headers,
             request_payer: self.request_payer.get()?,
+            multipart_copy_threshold,
+            multipart_copy_part_size,
         };
 
         let http_client = http.connect(&config.client_options)?;
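The builder changes above expose the two new knobs both as typed setters and as string config keys. A minimal usage sketch (the bucket name and the 1 GiB part size are illustrative, and this assumes a surrounding function returning `object_store::Result`):

    use object_store::aws::AmazonS3Builder;

    // Copy objects larger than 5 GiB via multipart copy, in 1 GiB parts.
    let s3 = AmazonS3Builder::from_env()
        .with_bucket_name("example-bucket")
        .with_multipart_copy_threshold(5 * 1024 * 1024 * 1024)
        .with_multipart_copy_part_size(1024 * 1024 * 1024)
        .build()?;
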
diff --git a/src/aws/client.rs b/src/aws/client.rs
index 4edb977f..89091f8b 100644
--- a/src/aws/client.rs
+++ b/src/aws/client.rs
@@ -138,6 +138,7 @@ impl From<Error> for crate::Error {
 pub(crate) enum PutPartPayload<'a> {
     Part(PutPayload),
     Copy(&'a Path),
+    CopyRange(&'a Path, std::ops::Range<u64>),
 }
 
 impl Default for PutPartPayload<'_> {
@@ -208,6 +209,10 @@ pub(crate) struct S3Config {
     pub conditional_put: S3ConditionalPut,
     pub request_payer: bool,
     pub(super) encryption_headers: S3EncryptionHeaders,
+    /// Threshold in bytes above which copy will use multipart copy
+    pub multipart_copy_threshold: u64,
+    /// Preferred multipart copy part size in bytes
+    pub multipart_copy_part_size: u64,
 }
 
 impl S3Config {
@@ -676,7 +681,10 @@ impl S3Client {
         part_idx: usize,
         data: PutPartPayload<'_>,
     ) -> Result<PartId> {
-        let is_copy = matches!(data, PutPartPayload::Copy(_));
+        let is_copy = matches!(
+            data,
+            PutPartPayload::Copy(_) | PutPartPayload::CopyRange(_, _)
+        );
         let part = (part_idx + 1).to_string();
 
         let mut request = self
@@ -690,6 +698,18 @@ impl S3Client {
                 "x-amz-copy-source",
                 &format!("{}/{}", self.config.bucket, encode_path(path)),
             ),
+            PutPartPayload::CopyRange(path, range) => {
+                // AWS expects an inclusive end offset in the copy range header
+                let start = range.start;
+                let end_inclusive = range.end.saturating_sub(1);
+                let range_value = format!("bytes={}-{}", start, end_inclusive);
+                request
+                    .header(
+                        "x-amz-copy-source",
+                        &format!("{}/{}", self.config.bucket, encode_path(path)),
+                    )
+                    .header("x-amz-copy-source-range", &range_value)
+            }
         };
 
         if self
@@ -701,24 +721,24 @@ impl S3Client {
             // If SSE-C is used, we must include the encryption headers in every upload request.
             request = request.with_encryption_headers();
         }
+
         let (parts, body) = request.send().await?.into_parts();
-        let checksum_sha256 = parts
-            .headers
-            .get(SHA256_CHECKSUM)
-            .and_then(|v| v.to_str().ok())
-            .map(|v| v.to_string());
-
-        let e_tag = match is_copy {
-            false => get_etag(&parts.headers).map_err(|source| Error::Metadata { source })?,
-            true => {
-                let response = body
-                    .bytes()
-                    .await
-                    .map_err(|source| Error::CreateMultipartResponseBody { source })?;
-                let response: CopyPartResult = quick_xml::de::from_reader(response.reader())
-                    .map_err(|source| Error::InvalidMultipartResponse { source })?;
-                response.e_tag
-            }
+        let (e_tag, checksum_sha256) = if is_copy {
+            let response = body
+                .bytes()
+                .await
+                .map_err(|source| Error::CreateMultipartResponseBody { source })?;
+            let response: CopyPartResult = quick_xml::de::from_reader(response.reader())
+                .map_err(|source| Error::InvalidMultipartResponse { source })?;
+            (response.e_tag, response.checksum_sha256)
+        } else {
+            let e_tag = get_etag(&parts.headers).map_err(|source| Error::Metadata { source })?;
+            let checksum_sha256 = parts
+                .headers
+                .get(SHA256_CHECKSUM)
+                .and_then(|v| v.to_str().ok())
+                .map(|v| v.to_string());
+            (e_tag, checksum_sha256)
         };
 
         let content_id = if self.config.checksum == Some(Checksum::SHA256) {
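The `CopyRange` arm above converts a half-open Rust `Range` into the inclusive offsets that `x-amz-copy-source-range` expects, hence the `saturating_sub(1)`. A standalone sketch of the same conversion (`copy_source_range` is a hypothetical helper, not part of the patch):

    // Half-open range -> inclusive S3 header value, mirroring the arm above.
    fn copy_source_range(range: std::ops::Range<u64>) -> String {
        format!("bytes={}-{}", range.start, range.end.saturating_sub(1))
    }

    // The first 5 GiB of a source object:
    assert_eq!(copy_source_range(0..5_368_709_120), "bytes=0-5368709119");
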
diff --git a/src/aws/mod.rs b/src/aws/mod.rs
index 4abf3748..ea00c637 100644
--- a/src/aws/mod.rs
+++ b/src/aws/mod.rs
@@ -103,6 +103,63 @@ impl AmazonS3 {
     fn path_url(&self, path: &Path) -> String {
         self.client.config.path_url(path)
     }
+
+    /// Perform a multipart copy operation.
+    ///
+    /// If the multipart upload fails, a best effort attempt is made to clean it up. It's the
+    /// caller's responsibility to add a lifecycle rule if guaranteed cleanup is required, as we
+    /// cannot protect against an ill-timed process crash.
+    async fn copy_multipart(
+        &self,
+        from: &Path,
+        to: &Path,
+        size: u64,
+        mode: CompleteMultipartMode,
+    ) -> Result<()> {
+        let upload_id = self
+            .client
+            .create_multipart(to, PutMultipartOptions::default())
+            .await?;
+
+        // S3 requires a minimum of 5 MiB per part (except the final part) and at most 10,000 parts
+        let part_size = self.client.config.multipart_copy_part_size;
+
+        let mut parts = Vec::new();
+        let mut offset: u64 = 0;
+        let mut idx: usize = 0;
+        let res = async {
+            while offset < size {
+                let end = if size - offset <= part_size {
+                    size
+                } else {
+                    offset + part_size
+                };
+                let payload = if offset == 0 && end == size {
+                    PutPartPayload::Copy(from)
+                } else {
+                    PutPartPayload::CopyRange(from, offset..end)
+                };
+                let part = self.client.put_part(to, &upload_id, idx, payload).await?;
+                parts.push(part);
+                idx += 1;
+                offset = end;
+            }
+            self.client
+                .complete_multipart(to, &upload_id, parts, mode)
+                .await
+                .map(|_| ())
+        }
+        .await;
+
+        // If the multipart upload failed, make a best effort attempt to
+        // clean it up. It's the caller's responsibility to add a
+        // lifecycle rule if guaranteed cleanup is required, as we
+        // cannot protect against an ill-timed process crash.
+        if res.is_err() {
+            let _ = self.client.abort_multipart(to, &upload_id).await;
+        }
+        res
+    }
 }
 
 #[async_trait]
@@ -316,11 +373,28 @@ impl ObjectStore for AmazonS3 {
     }
 
     async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
-        self.client
-            .copy_request(from, to)
-            .idempotent(true)
-            .send()
-            .await?;
+        // Determine the source size to decide between a single CopyObject and a multipart copy
+        let head_meta = self
+            .client
+            .get_opts(
+                from,
+                GetOptions {
+                    head: true,
+                    ..Default::default()
+                },
+            )
+            .await?
+            .meta;
+        if head_meta.size <= self.client.config.multipart_copy_threshold {
+            self.client
+                .copy_request(from, to)
+                .idempotent(true)
+                .send()
+                .await?;
+        } else {
+            self.copy_multipart(from, to, head_meta.size, CompleteMultipartMode::Overwrite)
+                .await?;
+        }
         Ok(())
     }
 
@@ -329,45 +403,27 @@ impl ObjectStore for AmazonS3 {
             Some(S3CopyIfNotExists::Header(k, v)) => (k, v, StatusCode::PRECONDITION_FAILED),
             Some(S3CopyIfNotExists::HeaderWithStatus(k, v, status)) => (k, v, *status),
             Some(S3CopyIfNotExists::Multipart) => {
-                let upload_id = self
+                let head_meta = self
                     .client
-                    .create_multipart(to, PutMultipartOptions::default())
-                    .await?;
-
-                let res = async {
-                    let part_id = self
-                        .client
-                        .put_part(to, &upload_id, 0, PutPartPayload::Copy(from))
-                        .await?;
-                    match self
-                        .client
-                        .complete_multipart(
-                            to,
-                            &upload_id,
-                            vec![part_id],
-                            CompleteMultipartMode::Create,
-                        )
-                        .await
-                    {
-                        Err(e @ Error::Precondition { .. }) => Err(Error::AlreadyExists {
+                    .get_opts(
+                        from,
+                        GetOptions {
+                            head: true,
+                            ..Default::default()
+                        },
+                    )
+                    .await?
+                    .meta;
+                return self
+                    .copy_multipart(from, to, head_meta.size, CompleteMultipartMode::Create)
+                    .await
+                    .map_err(|err| match err {
+                        Error::Precondition { .. } => Error::AlreadyExists {
                             path: to.to_string(),
-                            source: Box::new(e),
-                        }),
-                        Ok(_) => Ok(()),
-                        Err(e) => Err(e),
-                    }
-                }
-                .await;
-
-                // If the multipart upload failed, make a best effort attempt to
-                // clean it up. It's the caller's responsibility to add a
-                // lifecycle rule if guaranteed cleanup is required, as we
-                // cannot protect against an ill-timed process crash.
-                if res.is_err() {
-                    let _ = self.client.abort_multipart(to, &upload_id).await;
-                }
-
-                return res;
+                            source: Box::new(err),
+                        },
+                        other => other,
+                    });
             }
             #[allow(deprecated)]
             Some(S3CopyIfNotExists::Dynamo(lock)) => {
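The loop in `copy_multipart` above walks the source in `part_size` steps and lets the final part run short. A self-contained sketch of the same splitting rule (`split` is a hypothetical helper for illustration only):

    use std::ops::Range;

    fn split(size: u64, part_size: u64) -> Vec<Range<u64>> {
        let mut parts = Vec::new();
        let mut offset = 0;
        while offset < size {
            // The final part covers whatever remains (it may be shorter than part_size).
            let end = if size - offset <= part_size { size } else { offset + part_size };
            parts.push(offset..end);
            offset = end;
        }
        parts
    }

    // A 12 GiB source with 5 GiB parts copies as three parts, the last one 2 GiB.
    const GIB: u64 = 1024 * 1024 * 1024;
    assert_eq!(split(12 * GIB, 5 * GIB), vec![0..5 * GIB, 5 * GIB..10 * GIB, 10 * GIB..12 * GIB]);
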
diff --git a/src/client/s3.rs b/src/client/s3.rs
index a2221fbb..a1b113e0 100644
--- a/src/client/s3.rs
+++ b/src/client/s3.rs
@@ -98,6 +98,8 @@ pub(crate) struct InitiateMultipartUploadResult {
 pub(crate) struct CopyPartResult {
     #[serde(rename = "ETag")]
     pub e_tag: String,
+    #[serde(default, rename = "ChecksumSHA256")]
+    pub checksum_sha256: Option<String>,
 }
 
 #[derive(Debug, Serialize)]
diff --git a/src/config.rs b/src/config.rs
index 29a389d4..b042e209 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -112,6 +112,15 @@ impl Parse for u32 {
     }
 }
 
+impl Parse for u64 {
+    fn parse(v: &str) -> Result<Self> {
+        Self::from_str(v).map_err(|_| Error::Generic {
+            store: "Config",
+            source: format!("failed to parse \"{v}\" as u64").into(),
+        })
+    }
+}
+
 impl Parse for HeaderValue {
     fn parse(v: &str) -> Result<Self> {
         Self::from_str(v).map_err(|_| Error::Generic {
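Together with the builder plumbing, the `Parse for u64` impl lets the new keys flow through string-based configuration; deferred values are validated when `build()` runs. A sketch (the 5 GiB value is illustrative; assumes a surrounding function returning `object_store::Result`):

    use object_store::aws::{AmazonS3Builder, AmazonS3ConfigKey};

    let s3 = AmazonS3Builder::from_env()
        .with_config(AmazonS3ConfigKey::MultipartCopyThreshold, "5368709120") // 5 GiB
        .build()?; // a non-numeric value would surface here as the Generic error above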