diff --git a/components/data_proxy/src/caching/cache.rs b/components/data_proxy/src/caching/cache.rs
index b7937397..e741b0f7 100644
--- a/components/data_proxy/src/caching/cache.rs
+++ b/components/data_proxy/src/caching/cache.rs
@@ -5,6 +5,7 @@ use crate::data_backends::storage_backend::StorageBackend;
 use crate::database::persistence::{delete_parts_by_object_id, delete_parts_by_upload_id};
 use crate::replication::replication_handler::ReplicationMessage;
 use crate::s3_frontend::data_handler::DataHandler;
+use crate::s3_frontend::utils::checksum::ChecksumHandler;
 use crate::structs::{
     AccessKeyPermissions, Bundle, DbPermissionLevel, LocationBinding, ObjectType, TypedId,
     UploadPart, User,
@@ -385,6 +386,7 @@ impl Cache {
                 backend,
                 temp_location,
                 None,
+                ChecksumHandler::new(),
             )
             .await
             {
@@ -831,7 +833,7 @@ impl Cache {
     }

     #[tracing::instrument(level = "trace", skip(self, object))]
-    pub async fn upsert_object(&self, object: Object) -> Result<()> {
+    pub async fn upsert_object(&self, mut object: Object) -> Result<()> {
         trace!(?object, "upserting object");

         if let Ok((rwlock_object, _)) = self.get_resource(&object.id) {
@@ -839,6 +841,11 @@ impl Cache {
             if *cache_object == object {
                 return Ok(());
             }
+
+            // Merge checksums already known to the proxy into the incoming object's hashes
+            let mut proxy_hashes = cache_object.hashes.clone();
+            proxy_hashes.extend(object.hashes.clone());
+            object.hashes = proxy_hashes;
         }

         if let Some(persistence) = self.persistence.read().await.as_ref() {
@@ -921,6 +928,32 @@ impl Cache {
         Ok(())
     }

+    #[tracing::instrument(level = "trace", skip(self, object_id, hashes))]
+    pub async fn update_object_hashes(
+        &self,
+        object_id: DieselUlid,
+        hashes: HashMap<String, String>,
+    ) -> Result<()> {
+        if let Ok((rwlock_object, _)) = self.get_resource(&object_id) {
+            // Update hashes in cache object
+            let mut cache_object = rwlock_object.write().await;
+            cache_object.hashes.extend(hashes);
+
+            // Write update to persistence if available
+            if let Some(persistence) = self.persistence.read().await.as_ref() {
+                let mut client = persistence.get_client().await?;
+                let transaction = client.transaction().await?;
+                let transaction_client = transaction.client();
+                cache_object.upsert(transaction_client).await?;
+                transaction.commit().await?;
+            }
+        } else {
+            bail!("Object with id {} not found", object_id);
+        }
+
+        Ok(())
+    }
+
     #[tracing::instrument(level = "trace", skip(self))]
     pub async fn delete_object(&self, id: DieselUlid) -> Result<()> {
         // Remove object and location from database
@@ -1315,6 +1348,7 @@ impl Cache {
         part_number: u64,
         raw_size: u64,
         final_size: u64,
+        checksums: HashMap<String, String>,
     ) -> Result<()> {
         let part = UploadPart {
             id: DieselUlid::generate(),
@@ -1323,6 +1357,7 @@
             object_id,
             upload_id: upload_id.clone(),
             raw_size,
+            checksums: Some(checksums),
         };
         if let Some(persistence) = self.persistence.read().await.as_ref() {
             part.upsert(persistence.get_client().await?.client())
diff --git a/components/data_proxy/src/caching/grpc_query_handler.rs b/components/data_proxy/src/caching/grpc_query_handler.rs
index f9c536b0..b53eea8f 100644
--- a/components/data_proxy/src/caching/grpc_query_handler.rs
+++ b/components/data_proxy/src/caching/grpc_query_handler.rs
@@ -688,7 +688,7 @@ impl GrpcQueryHandler {
         let mut req = Request::new(FinishObjectStagingRequest {
             object_id: server_object.id.to_string(),
             content_len,
-            hashes: proxy_object.get_hashes(), // Hashes stay the same
+            hashes: proxy_object.get_api_safe_hashes()?, // Only MD5/SHA256, hex-encoded for the API
             completed_parts: vec![],
             upload_id: "".to_string(), // Upload id only needed in requests from users
         });
@@ -711,8 +711,9 @@ impl GrpcQueryHandler {
             anyhow!("Object missing in FinishObjectResponse")
         })?;

-        // Id of location record should be set to Dataproxy Object id but is set to Server Object id... the fuck?
-        let object = DPObject::try_from(response)?;
+        // Convert the Object from the response into the Dataproxy internal representation and merge hashes
+        let mut object = DPObject::try_from(response)?;
+        object.hashes.extend(proxy_object.hashes);

         // Persist Object and Location in cache/database
         self.cache.upsert_object(object.clone()).await?;
diff --git a/components/data_proxy/src/s3_frontend/data_handler.rs b/components/data_proxy/src/s3_frontend/data_handler.rs
index f735d611..5094a6e3 100644
--- a/components/data_proxy/src/s3_frontend/data_handler.rs
+++ b/components/data_proxy/src/s3_frontend/data_handler.rs
@@ -1,6 +1,9 @@
 use crate::caching::cache::Cache;
 use crate::data_backends::storage_backend::StorageBackend;
-use crate::s3_frontend::utils::buffered_s3_sink::BufferedS3Sink;
+use crate::s3_frontend::utils::checksum::{ChecksumHandler, IntegrityChecksum};
+use crate::s3_frontend::utils::{
+    buffered_s3_sink::BufferedS3Sink, crc_transformer::CrcTransformer,
+};
 use crate::structs::Object;
 use crate::structs::ObjectLocation;
 use crate::structs::VersionVariant;
@@ -9,6 +12,9 @@ use anyhow::Result;
+use anyhow::anyhow;
 use aruna_rust_api::api::storage::models::v2::Hash;
 use aruna_rust_api::api::storage::models::v2::Hashalgorithm;
 use aruna_rust_api::api::storage::models::v2::Status;
+use crc_fast::CrcAlgorithm;
+use crc_fast::Digest as CrcDigest;
 use diesel_ulid::DieselUlid;
 use md5::{Digest, Md5};
 use pithos_lib::streamreadwrite::GenericStreamReadWriter;
@@ -21,6 +26,7 @@ use pithos_lib::transformers::pithos_comp_enc::PithosTransformer;
 use pithos_lib::transformers::size_probe::SizeProbe;
 use pithos_lib::transformers::zstd_comp::ZstdEnc;
 use pithos_lib::transformers::zstd_decomp::ZstdDec;
+use s3s::dto::{ChecksumType, CompleteMultipartUploadOutput, ETag};
 use sha2::Sha256;
 use std::fmt::Debug;
 use std::sync::Arc;
@@ -45,7 +51,8 @@ impl DataHandler {
         backend: Arc<Box<dyn StorageBackend>>,
         before_location: ObjectLocation,
         path_level: Option<[Option<(DieselUlid, String)>; 4]>,
-    ) -> Result<()> {
+        mut checksum_handler: ChecksumHandler,
+    ) -> Result<CompleteMultipartUploadOutput> {
         let token = if let Some(handler) = cache.auth.read().await.as_ref() {
             let Some(created_by) = object.created_by else {
                 error!("No created_by found");
@@ -157,17 +164,38 @@ impl DataHandler {
             asr = asr.add_transformer(ZstdDec::new());
         }

-        let (uncompressed_probe, uncompressed_stream) = SizeProbe::new();
+        // [Optional] Checksums
+        let (crc32_t, crc32_rx) = CrcTransformer::new_with_backchannel(
+            CrcDigest::new(CrcAlgorithm::Crc32IsoHdlc),
+            false,
+        );
+        let (crc32c_t, crc32c_rx) = CrcTransformer::new_with_backchannel(
+            CrcDigest::new(CrcAlgorithm::Crc32Iscsi),
+            false,
+        );
+        let (crc64nvme_t, crc64nvme_rx) = CrcTransformer::new_with_backchannel(
+            CrcDigest::new(CrcAlgorithm::Crc64Nvme),
+            false,
+        );

-        asr = asr.add_transformer(uncompressed_probe);
+        asr = asr.add_transformer(crc32_t);
+        asr = asr.add_transformer(crc32c_t);
+        asr = asr.add_transformer(crc64nvme_t);

-        let (sha_transformer, sha_recv) =
+        // Hashes
+        let (sha_t, sha_rx) =
             HashingTransformer::new_with_backchannel(Sha256::new(), "sha256".to_string());
-        let (md5_transformer, md5_recv) =
+        let (md5_t, md5_rx) =
             HashingTransformer::new_with_backchannel(Md5::new(), "md5".to_string());
+        //TODO: HashingTransformer sends hash to Footer which does not support SHA1 ...
+        //let (sha_transformer, sha_recv) = HashingTransformer::new_with_backchannel(Sha1::new(), "sha1".to_string());
+
+        asr = asr.add_transformer(sha_t);
+        asr = asr.add_transformer(md5_t);

-        asr = asr.add_transformer(sha_transformer);
-        asr = asr.add_transformer(md5_transformer);
+        // Log uncompressed size
+        let (uncompressed_probe, uncompressed_stream) = SizeProbe::new();
+        asr = asr.add_transformer(uncompressed_probe);

         if new_location_clone.is_pithos() {
             tx.send(pithos_lib::helpers::notifications::Message::FileContext(
@@ -205,19 +233,31 @@ impl DataHandler {
                 e
             })?;

-            Ok::<(u64, u64, String, String, String), anyhow::Error>((
+            // Fetch hashes
+            for (key, rx, hex_to_b64) in [
+                ("sha256", sha_rx, true),
+                ("md5", md5_rx, true),
+                ("crc32", crc32_rx, false),
+                ("crc32c", crc32c_rx, false),
+                ("crc64nvme", crc64nvme_rx, false),
+            ] {
+                checksum_handler.add_calculated_checksum(
+                    key,
+                    rx.try_recv().inspect_err(|&e| {
+                        error!(error = ?e, msg = e.to_string());
+                    })?,
+                    hex_to_b64,
+                )?;
+            }
+
+            Ok::<(u64, u64, ChecksumHandler, String), anyhow::Error>((
                 disk_size_stream.try_recv().inspect_err(|&e| {
                     error!(error = ?e, msg = e.to_string());
                 })?,
                 uncompressed_stream.try_recv().inspect_err(|&e| {
                     error!(error = ?e, msg = e.to_string());
                 })?,
-                sha_recv.try_recv().inspect_err(|&e| {
-                    error!(error = ?e, msg = e.to_string());
-                })?,
-                md5_recv.try_recv().inspect_err(|&e| {
-                    error!(error = ?e, msg = e.to_string());
-                })?,
+                checksum_handler,
                 final_sha_recv.try_recv().inspect_err(|&e| {
                     error!(error = ?e, msg = e.to_string());
                 })?,
@@ -242,7 +285,7 @@ impl DataHandler {
         }
     }

-        let (before_size, after_size, sha, md5, final_sha) = aswr_handle
+        let (before_size, after_size, checksum_handler, final_sha) = aswr_handle
             .await
             .map_err(|e| {
                 error!(error = ?e, msg = e.to_string());
@@ -259,20 +302,32 @@ impl DataHandler {

         debug!(new_location = ?new_location, "Finished finalizing location");

-        let hashes = vec![
-            Hash {
-                alg: Hashalgorithm::Sha256.into(),
-                hash: sha,
-            },
-            Hash {
-                alg: Hashalgorithm::Md5.into(),
-                hash: md5,
-            },
-        ];
+        // Store the calculated hashes in cache/database right away
+        cache
+            .update_object_hashes(
+                object.id,
+                checksum_handler.get_calculated_checksums().clone(),
+            )
+            .await?;

         if let Some(handler) = cache.aruna_client.read().await.as_ref() {
-            // Set id of new location to object id to satisfy FK constraint
-            // TODO: Update hashes etc.
+            // Send SHA256 and MD5 hashes to Aruna server
+            let hashes = vec![
+                Hash {
+                    alg: Hashalgorithm::Sha256.into(),
+                    hash: checksum_handler
+                        .get_checksum_by_key("sha256", true)?
+                        .ok_or_else(|| anyhow!("SHA256 not found."))?
+                        .clone(),
+                },
+                Hash {
+                    alg: Hashalgorithm::Md5.into(),
+                    hash: checksum_handler
+                        .get_checksum_by_key("md5", true)?
+                        .ok_or_else(|| anyhow!("MD5 not found."))?
+ .clone(), + }, + ]; handler .set_object_hashes(&object.id, hashes, &token) @@ -287,10 +342,30 @@ impl DataHandler { .to_string(); backend.delete_object(before_location).await?; + // Remove parts from database cache.delete_parts_by_upload_id(upload_id).await?; } - Ok(()) + // Create basic response with Etag + let mut output = CompleteMultipartUploadOutput { + e_tag: Some(ETag::Strong(format!("-{}", object.id))), + ..Default::default() + }; + + // Add required checksum to response + if let Some(required) = &checksum_handler.required_checksum { + let checksum = checksum_handler.get_calculated_checksum(); + match required { + IntegrityChecksum::CRC32(_) => output.checksum_crc32 = checksum, + IntegrityChecksum::CRC32C(_) => output.checksum_crc32c = checksum, + IntegrityChecksum::CRC64NVME(_) => output.checksum_crc64nvme = checksum, + IntegrityChecksum::_SHA1(_) => output.checksum_sha1 = checksum, + IntegrityChecksum::SHA256(_) => output.checksum_sha256 = checksum, + } + output.checksum_type = Some(ChecksumType::from_static(ChecksumType::FULL_OBJECT)); + } + + Ok(output) } } diff --git a/components/data_proxy/src/s3_frontend/s3server.rs b/components/data_proxy/src/s3_frontend/s3server.rs index 921147f9..2de73ffe 100644 --- a/components/data_proxy/src/s3_frontend/s3server.rs +++ b/components/data_proxy/src/s3_frontend/s3server.rs @@ -190,7 +190,6 @@ impl Service> for WrappingService { .map_err(|_| s3_error!(InternalError, "Failed to add CORS header")); final_response.await }) - //final_response.boxed() } } @@ -201,37 +200,11 @@ impl AsRef for WrappingService { } } -/* -impl WrappingService { - #[tracing::instrument(level = "trace", skip(self))] - #[must_use] - pub fn into_make_service(self) -> MakeService { - MakeService(self) - } -} - -#[derive(Clone)] -pub struct MakeService(S); - -impl Service for MakeService { - type Response = S; - - type Error = Infallible; - - type Future = Ready>; - - #[tracing::instrument(level = "trace", skip(self))] - fn call(&self, _: T) -> Self::Future { - ready(Ok(self.0.clone())) - } -} -*/ - impl WrappingService { async fn get_cors_headers( &self, origin_exception: bool, - req: &hyper::Request, + req: &Request, ) -> Result { // Return all * if origin exception matches if origin_exception { diff --git a/components/data_proxy/src/s3_frontend/s3service.rs b/components/data_proxy/src/s3_frontend/s3service.rs index 7b3fc5ed..36e44fdd 100644 --- a/components/data_proxy/src/s3_frontend/s3service.rs +++ b/components/data_proxy/src/s3_frontend/s3service.rs @@ -4,6 +4,8 @@ use super::utils::ranges::calculate_ranges; use crate::bundler::bundle_helper::get_bundle; use crate::caching::cache::Cache; use crate::data_backends::storage_backend::StorageBackend; +use crate::s3_frontend::utils::checksum::{ChecksumHandler, IntegrityChecksum}; +use crate::s3_frontend::utils::crc_transformer::CrcTransformer; use crate::s3_frontend::utils::list_objects::list_response; use crate::structs::CheckAccessResult; use crate::structs::NewOrExistingObject; @@ -17,17 +19,15 @@ use aruna_rust_api::api::storage::models::v2::Hash; use aruna_rust_api::api::storage::models::v2::Hashalgorithm; use aruna_rust_api::api::storage::models::v2::Status; use base64::engine::general_purpose; -use base64::prelude::BASE64_STANDARD; use base64::Engine; use bytes::BufMut; use bytes::BytesMut; -use futures_util::TryStreamExt; +use crc_fast::{CrcAlgorithm, Digest as CrcDigest}; +use futures_util::{FutureExt, TryStreamExt}; use http::HeaderName; use http::HeaderValue; use md5::{Digest, Md5}; -use 
pithos_lib::helpers::footer_parser::Footer;
-use pithos_lib::helpers::footer_parser::FooterParser;
-use pithos_lib::helpers::footer_parser::FooterParserState;
+use pithos_lib::helpers::footer_parser::{Footer, FooterParser, FooterParserState};
 use pithos_lib::helpers::notifications::Message as PithosMessage;
 use pithos_lib::streamreadwrite::GenericStreamReadWriter;
 use pithos_lib::transformer::ReadWriter;
@@ -43,24 +43,14 @@ use pithos_lib::transformers::zstd_comp::ZstdEnc;
 use pithos_lib::transformers::zstd_decomp::ZstdDec;
 use s3s::dto::*;
 use s3s::s3_error;
-use s3s::S3Error;
-use s3s::S3Request;
-use s3s::S3Response;
-use s3s::S3Result;
-use s3s::S3;
+use s3s::{S3Error, S3Request, S3Response, S3Result, S3};
 use sha2::Sha256;
-use std::collections::HashMap;
-use std::collections::HashSet;
+use std::collections::{HashMap, HashSet};
 use std::fmt::Debug;
 use std::str::FromStr;
 use std::sync::Arc;
 use tokio::pin;
-use tracing::debug;
-use tracing::error;
-use tracing::info_span;
-use tracing::trace;
-use tracing::warn;
-use tracing::Instrument;
+use tracing::{debug, error, info_span, trace, warn, Instrument};

 pub struct ArunaS3Service {
     pub backend: Arc<Box<dyn StorageBackend>>,
@@ -97,22 +87,36 @@ impl ArunaS3Service {
         &self,
         object: ProxyObject,
         headers: Option<HashMap<String, String>>,
+        checksum_handler: ChecksumHandler,
     ) -> S3Result<S3Response<PutObjectOutput>> {
-        let md5 = Md5::new().finalize();
-        let sha256 = Sha256::new().finalize();
-
-        let output = PutObjectOutput {
-            e_tag: Some(ETag::Strong(format!("{md5:x}"))),
-            checksum_sha256: Some(BASE64_STANDARD.encode(sha256).to_string()),
+        // Base output
+        let mut output = PutObjectOutput {
+            e_tag: Some(ETag::Strong(format!("{:x}", Md5::new().finalize()))),
             version_id: Some(object.id.to_string()),
+            size: Some(0),
             ..Default::default()
         };

+        // Add optionally required checksum
+        if let Some(required) = checksum_handler.required_checksum {
+            let empty_checksum = required.get_empty_checksum();
+            match required {
+                IntegrityChecksum::CRC32(_) => output.checksum_crc32 = Some(empty_checksum),
+                IntegrityChecksum::CRC32C(_) => output.checksum_crc32c = Some(empty_checksum),
+                IntegrityChecksum::CRC64NVME(_) => output.checksum_crc64nvme = Some(empty_checksum),
+                IntegrityChecksum::_SHA1(_) => output.checksum_sha1 = Some(empty_checksum),
+                IntegrityChecksum::SHA256(_) => output.checksum_sha256 = Some(empty_checksum),
+            }
+            output.checksum_type = Some(ChecksumType::from_static(ChecksumType::FULL_OBJECT));
+        }
+
+        // Insert temp resource into cache
         self.cache.insert_temp_resource(object).await.map_err(|_| {
             error!(error = "Unable to temporarily cache empty object");
             s3_error!(InternalError, "Unable to temporarily cache empty object")
         })?;

+        // Add other provided headers to response
         let mut resp = S3Response::new(output);
         if let Some(headers) = headers {
             for (k, v) in headers {
@@ -124,6 +128,7 @@ impl ArunaS3Service {
                 );
             }
         }
+        debug!(?resp);

         Ok(resp)
     }
@@ -150,6 +155,9 @@ impl S3 for ArunaS3Service {
             s3_error!(UnexpectedContent, "Missing data context")
         })?;

+        // Init checksum handler
+        let checksum_handler = ChecksumHandler::try_from(&req.headers)?;
+
         let impersonating_token =
             user_state.sign_impersonating_token(self.cache.auth.read().await.as_ref());
@@ -231,11 +239,6 @@ impl S3 for ArunaS3Service {
             s3_error!(InternalError, "Unable to finish upload")
         })?;

-        let response = CompleteMultipartUploadOutput {
-            e_tag: Some(ETag::Strong(format!("-{}", object.id))),
-            ..Default::default()
-        };
-
         old_location.disk_content_len = disk_size as i64;
         old_location.raw_content_len = cumulative_size as i64;
@@ -260,13 +263,31 @@ impl S3 for ArunaS3Service {
             }
         }

-        tokio::spawn(DataHandler::finalize_location(
-            object,
-            self.cache.clone(),
-            self.backend.clone(),
-            old_location,
-            Some(objects_state.try_slice()?),
-        ));
+        let object_clone = object.clone();
+        let cache_clone = self.cache.clone();
+        let backend_clone = self.backend.clone();
+        let path_level = objects_state.try_slice()?;
+        let future = async move {
+            let response = DataHandler::finalize_location(
+                object_clone,
+                cache_clone,
+                backend_clone,
+                old_location,
+                Some(path_level),
+                checksum_handler,
+            )
+            .await
+            .map_err(|e| s3_error!(InternalError, "{e}"))?;
+
+            Ok::<CompleteMultipartUploadOutput, S3Error>(response)
+        }
+        .boxed();
+
+        let response = CompleteMultipartUploadOutput {
+            e_tag: Some(ETag::Strong(format!("-{}", object.id))),
+            future: Some(future),
+            ..Default::default()
+        };

         debug!(?response);
         let mut resp = S3Response::new(response);
@@ -626,6 +647,9 @@ impl S3 for ArunaS3Service {
             s3_error!(InternalError, "No context found")
         })?;

+        // Init checksum handler
+        let checksum_handler = ChecksumHandler::from_headers(&req.headers)?;
+
         let ObjectsState::Regular { states, location } = objects_state else {
             let (levels, name) = match objects_state {
                 ObjectsState::Bundle { bundle, .. } => (
@@ -868,7 +892,7 @@ impl S3 for ArunaS3Service {
             .first()
             .and_then(|mime_guess| ContentType::from_str(mime_guess.as_ref()).ok());

-        let output = GetObjectOutput {
+        let mut output = GetObjectOutput {
             body,
             accept_ranges,
             content_range,
@@ -880,6 +904,19 @@ impl S3 for ArunaS3Service {
             content_disposition: Some(format!(r#"attachment;filename="{}""#, object.name)),
             ..Default::default()
         };
+        // Optionally add checksums (if available) to output
+        if checksum_handler.checksum_mode {
+            for (alg, hash) in object.hashes.iter() {
+                match alg.as_str() {
+                    "crc32" => output.checksum_crc32 = Some(hash.clone()),
+                    "crc32c" => output.checksum_crc32c = Some(hash.clone()),
+                    "crc64nvme" => output.checksum_crc64nvme = Some(hash.clone()),
+                    "sha1" => output.checksum_sha1 = Some(hash.clone()),
+                    "sha256" => output.checksum_sha256 = Some(hash.clone()),
+                    _ => {}
+                }
+            }
+        }

         debug!(?output);
         let mut resp = S3Response::new(output);
@@ -915,6 +952,9 @@ impl S3 for ArunaS3Service {
             s3_error!(InternalError, "No context found")
         })?;

+        // Init checksum handler
+        let checksum_handler = ChecksumHandler::from_headers(&req.headers)?;
+
        if let ObjectsState::Bundle { bundle, ..
} = objects_state { return Ok(S3Response::new(HeadObjectOutput { content_length: None, @@ -941,7 +981,7 @@ impl S3 for ArunaS3Service { .first() .and_then(|mime_guess| ContentType::from_str(mime_guess.as_ref()).ok()); - let output = HeadObjectOutput { + let mut output = HeadObjectOutput { content_length: Some(content_len), last_modified: Some( time::OffsetDateTime::from_unix_timestamp((object.id.timestamp() / 1000) as i64) @@ -956,10 +996,23 @@ impl S3 for ArunaS3Service { content_type: mime, ..Default::default() }; - debug!(?output); debug!(?headers); + // Optionally add checksums (if available) to output + if checksum_handler.checksum_mode { + for (alg, hash) in object.hashes { + match alg.as_str() { + "crc32" => output.checksum_crc32 = Some(hash), + "crc32c" => output.checksum_crc32c = Some(hash), + "crc64nvme" => output.checksum_crc64nvme = Some(hash), + "sha1" => output.checksum_sha1 = Some(hash), + "sha256" => output.checksum_sha256 = Some(hash), + _ => {} + } + } + } + let mut resp = S3Response::new(output); if let Some(headers) = headers { for (k, v) in headers { @@ -1696,6 +1749,9 @@ impl S3 for ArunaS3Service { s3_error!(UnexpectedContent, "Missing data context") })?; + // Init checksum handler + let mut checksum_handler = ChecksumHandler::from_headers(&req.headers)?; + let impersonating_token = user_state.sign_impersonating_token(self.cache.auth.read().await.as_ref()); @@ -1747,7 +1803,17 @@ impl S3 for ArunaS3Service { trace!(?new_object); match req.input.content_length { Some(0) => { - return self.put_empty_object(new_object, headers).await; + // Create temp object with full key as name (as normal S3 would do) and project (bucket) as parent + new_object.name = req.input.key; + new_object.parents = Some(HashSet::from_iter([TypedRelation::Project( + states + .get_project() + .ok_or_else(|| s3_error!(NoSuchBucket, "Bucket does not exist"))? 
+                    .id,
+            )]));
+            return self
+                .put_empty_object(new_object, headers, checksum_handler)
+                .await;
         }
         None => {
             error!("Missing content-length");
@@ -1769,11 +1835,17 @@ impl S3 for ArunaS3Service {
         trace!("Initialized data location");

         // Initialize hashing transformers
-        let (initial_sha_trans, initial_sha_recv) =
+        let (initial_size_trans, initial_size_rx) = SizeProbe::new();
+        let (initial_sha_trans, initial_sha_rx) =
             HashingTransformer::new_with_backchannel(Sha256::new(), "sha256".to_string());
-        let (initial_md5_trans, initial_md5_recv) =
+        let (md5_trans, md5_rx) =
             HashingTransformer::new_with_backchannel(Md5::new(), "md5".to_string());
-        let (initial_size_trans, initial_size_recv) = SizeProbe::new();
+        let (crc32_t, crc32_rx) =
+            CrcTransformer::new_with_backchannel(CrcDigest::new(CrcAlgorithm::Crc32IsoHdlc), false);
+        let (crc32c_t, crc32c_rx) =
+            CrcTransformer::new_with_backchannel(CrcDigest::new(CrcAlgorithm::Crc32Iscsi), false);
+        let (crc64nvme_t, crc64nvme_rx) =
+            CrcTransformer::new_with_backchannel(CrcDigest::new(CrcAlgorithm::Crc64Nvme), false);
         let (final_sha_trans, final_sha_recv) =
             HashingTransformer::new_with_backchannel(Sha256::new(), "sha256".to_string());
         let (final_size_trans, final_size_recv) = SizeProbe::new();
@@ -1781,7 +1853,6 @@ impl S3 for ArunaS3Service {
         match req.input.body {
             Some(data) => {
                 let (tx, rx) = async_channel::bounded(10);
-
                 let mut awr = GenericStreamReadWriter::new_with_sink(
                     data,
                     BufferedS3Sink::new(
@@ -1801,8 +1872,11 @@ impl S3 for ArunaS3Service {
                     s3_error!(InternalError, "Internal notifier error")
                 })?;

+                awr = awr.add_transformer(crc32_t);
+                awr = awr.add_transformer(crc32c_t);
+                awr = awr.add_transformer(crc64nvme_t);
                 awr = awr.add_transformer(initial_sha_trans);
-                awr = awr.add_transformer(initial_md5_trans);
+                awr = awr.add_transformer(md5_trans);
                 awr = awr.add_transformer(initial_size_trans);

                 if location.is_compressed() && !location.is_pithos() {
@@ -1898,19 +1972,33 @@ impl S3 for ArunaS3Service {

                 // Fetch calculated hashes
                 trace!("fetching hashes");
-                let md5_initial = Some(initial_md5_recv.try_recv().map_err(|_| {
-                    error!(error = "Unable to md5 hash initial data");
-                    s3_error!(InternalError, "Unable to md5 hash initial data")
-                })?);
-                let sha_initial = Some(initial_sha_recv.try_recv().map_err(|_| {
-                    error!(error = "Unable to sha hash initial data");
-                    s3_error!(InternalError, "Unable to sha hash initial data")
-                })?);
+                for (key, rx, hex_to_b64) in [
+                    ("sha256", initial_sha_rx, true),
+                    ("md5", md5_rx, true),
+                    ("crc32", crc32_rx, false),
+                    ("crc32c", crc32c_rx, false),
+                    ("crc64nvme", crc64nvme_rx, false),
+                ] {
+                    checksum_handler.add_calculated_checksum(
+                        key,
+                        rx.try_recv().map_err(|_| {
+                            error!("Unable to fetch {key} from channel");
+                            s3_error!(InternalError, "Unable to compute {key} hash of initial data")
+                        })?,
+                        hex_to_b64,
+                    )?;
+                }
+
+                // Validate optionally provided checksum
+                if !checksum_handler.validate_checksum() {
+                    return Err(s3_error!(BadDigest, "Checksum validation failed"));
+                }
+
                 let sha_final: String = final_sha_recv.try_recv().map_err(|_| {
                     error!(error = "Unable to sha hash final data");
                     s3_error!(InternalError, "Unable to sha hash final data")
                 })?;
-                let initial_size: u64 = initial_size_recv.try_recv().map_err(|_| {
+                let initial_size: u64 = initial_size_rx.try_recv().map_err(|_| {
                     error!(error = "Unable to get size");
                     s3_error!(InternalError, "Unable to get size")
                 })?;
@@ -1919,57 +2007,38 @@ impl S3 for ArunaS3Service {
                     s3_error!(InternalError, "Unable to get size")
                 })?;

-                new_object.hashes = vec![
-                    (
-                        "MD5".to_string(),
-                        md5_initial.clone().ok_or_else(|| {
-                            error!(error = "Unable to get md5 hash initial data");
-                            s3_error!(InternalError, "Unable to get md5 hash initial data")
-                        })?,
-                    ),
-                    (
-                        "SHA256".to_string(),
-                        sha_initial.clone().ok_or_else(|| {
-                            error!(error = "Unable to get sha hash initial data");
-                            s3_error!(InternalError, "Unable to get sha hash initial data")
-                        })?,
-                    ),
-                ]
-                .into_iter()
-                .collect::<HashMap<String, String>>();
+                location.raw_content_len = initial_size as i64;
+                location.disk_content_len = final_size as i64;
+                location.disk_hash = Some(sha_final.clone());

-                let hashes = vec![
+                let proto_hashes = vec![
                     Hash {
-                        alg: Hashalgorithm::Sha256.into(),
-                        hash: sha_initial.clone().ok_or_else(|| {
-                            error!(error = "Unable to get sha hash initial data");
-                            s3_error!(InternalError, "Unable to get sha hash initial data")
-                        })?,
+                        alg: Hashalgorithm::Md5.into(),
+                        hash: checksum_handler
+                            .get_checksum_by_key("md5", true)?
+                            .ok_or_else(|| {
+                                error!(error = "Unable to get md5 hash initial data");
+                                s3_error!(InternalError, "Unable to get md5 hash initial data")
+                            })?,
                     },
                     Hash {
-                        alg: Hashalgorithm::Md5.into(),
-                        hash: md5_initial.clone().ok_or_else(|| {
-                            error!(error = "Unable to get md5 hash initial data");
-                            s3_error!(InternalError, "Unable to get md5 hash initial data")
-                        })?,
+                        alg: Hashalgorithm::Sha256.into(),
+                        hash: checksum_handler
+                            .get_checksum_by_key("sha256", true)?
+                            .ok_or_else(|| {
+                                error!(error = "Unable to get sha hash initial data");
+                                s3_error!(InternalError, "Unable to get sha hash initial data")
+                            })?,
                     },
                 ];

-                location.raw_content_len = initial_size as i64;
-                location.disk_content_len = final_size as i64;
-                location.disk_hash = Some(sha_final.clone());
-
                 trace!("finishing object");
+                new_object.hashes = checksum_handler.get_calculated_checksums().clone();

                 if let Some(handler) = self.cache.aruna_client.read().await.as_ref() {
                     if let Some(token) = &impersonating_token {
                         if was_init {
                             new_object = handler
-                                .finish_object(
-                                    new_object.id,
-                                    location.raw_content_len,
-                                    hashes.clone(),
-                                    token,
-                                )
+                                .finish_object(new_object.id, location.raw_content_len, proto_hashes, token)
                                 .await
                                 .map_err(|_| {
                                     error!(error = "Unable to finish object");
@@ -1995,10 +2064,21 @@ impl S3 for ArunaS3Service {
                     s3_error!(InternalError, "Unable to add location with binding")
                 })?;

-        let output = PutObjectOutput {
+        let mut output = PutObjectOutput {
             e_tag: Some(ETag::Strong(format!("-{}", new_object.id))),
             ..Default::default()
         };
+        if let Some(required) = &checksum_handler.required_checksum {
+            let checksum = checksum_handler.get_calculated_checksum();
+            match required {
+                IntegrityChecksum::CRC32(_) => output.checksum_crc32 = checksum,
+                IntegrityChecksum::CRC32C(_) => output.checksum_crc32c = checksum,
+                IntegrityChecksum::CRC64NVME(_) => output.checksum_crc64nvme = checksum,
+                IntegrityChecksum::_SHA1(_) => output.checksum_sha1 = checksum,
+                IntegrityChecksum::SHA256(_) => output.checksum_sha256 = checksum,
+            }
+            output.checksum_type = Some(ChecksumType::from_static(ChecksumType::FULL_OBJECT));
+        }

         debug!(?output);
         let mut resp = S3Response::new(output);
@@ -2061,6 +2141,10 @@ impl S3 for ArunaS3Service {
             s3_error!(NoSuchKey, "Object not found")
         })?;

+        // Init checksum handler
+        let mut checksum_handler = ChecksumHandler::from_headers(&req.headers)?;
+
+        // Upload part
         let etag = match req.input.body {
             Some(data) => {
                 trace!("streaming data to backend");
@@ -2077,11 +2161,35 @@

                 let mut awr = GenericStreamReadWriter::new_with_sink(data, sink);

+                // [Optional] Checksums
+                let (sha_t, sha_rx) =
+                    HashingTransformer::new_with_backchannel(Sha256::new(), "sha256".to_string());
+                let (md5_t, md5_rx) =
+                    HashingTransformer::new_with_backchannel(Md5::new(), "md5".to_string());
+                let (crc32_t, crc32_rx) = CrcTransformer::new_with_backchannel(
+                    CrcDigest::new(CrcAlgorithm::Crc32IsoHdlc),
+                    false,
+                );
+                let (crc32c_t, crc32c_rx) = CrcTransformer::new_with_backchannel(
+                    CrcDigest::new(CrcAlgorithm::Crc32Iscsi),
+                    false,
+                );
+                let (crc64nvme_t, crc64nvme_rx) = CrcTransformer::new_with_backchannel(
+                    CrcDigest::new(CrcAlgorithm::Crc64Nvme),
+                    false,
+                );
+
+                awr = awr.add_transformer(crc32_t);
+                awr = awr.add_transformer(crc32c_t);
+                awr = awr.add_transformer(crc64nvme_t);
+                awr = awr.add_transformer(sha_t);
+                awr = awr.add_transformer(md5_t);
+
+                // Size probe before encryption
                 let (before_probe, before_receiver) = SizeProbe::new();
                 awr = awr.add_transformer(before_probe);
-                let (after_probe, after_receiver) = SizeProbe::new();
-
+                // Encrypt if encryption key is available
                 if let Some(enc_key) = &location.get_encryption_key() {
                     trace!("adding chacha20 encryption");
                     awr = awr.add_transformer(ChaCha20Enc::new_with_fixed(*enc_key).map_err(
@@ -2092,6 +2200,8 @@ impl S3 for ArunaS3Service {
                     )?);
                 }

+                // Size probe after encryption
+                let (after_probe, after_receiver) = SizeProbe::new();
                 awr = awr.add_transformer(after_probe);

                 awr.process().await.map_err(|_| {
@@ -2109,6 +2219,29 @@ impl S3 for ArunaS3Service {
                     s3_error!(InternalError, "Unable to get size")
                 })?;

+                // Fetch hashes
+                for (key, rx, hex_to_b64) in [
+                    ("md5", md5_rx, true),
+                    ("sha256", sha_rx, true),
+                    ("crc32", crc32_rx, false),
+                    ("crc32c", crc32c_rx, false),
+                    ("crc64nvme", crc64nvme_rx, false),
+                ] {
+                    checksum_handler.add_calculated_checksum(
+                        key,
+                        rx.try_recv().map_err(|_| {
+                            error!("Unable to fetch {key} from channel");
+                            s3_error!(InternalError, "Unable to compute {key} hash of part data")
+                        })?,
+                        hex_to_b64,
+                    )?;
+                }
+
+                // Validate optionally provided checksum
+                if !checksum_handler.validate_checksum() {
+                    return Err(s3_error!(BadDigest, "Checksum validation failed"));
+                }
+
                 self.cache
                     .create_multipart_upload(
                         location.upload_id.ok_or_else(|| {
@@ -2119,6 +2252,7 @@ impl S3 for ArunaS3Service {
                         req.input.part_number as u64,
                         before_size,
                         after_size,
+                        checksum_handler.get_calculated_checksums().clone(),
                     )
                     .await
                     .map_err(|_| {
@@ -2143,10 +2277,21 @@ impl S3 for ArunaS3Service {
         };

         // Create basic response output
-        let output = UploadPartOutput {
+        let mut output = UploadPartOutput {
             e_tag: Some(ETag::Strong(format!("-{etag}"))),
             ..Default::default()
         };
+        // Add required checksum to response
+        if let Some(required) = &checksum_handler.required_checksum {
+            let checksum = checksum_handler.get_calculated_checksum();
+            match required {
+                IntegrityChecksum::CRC32(_) => output.checksum_crc32 = checksum,
+                IntegrityChecksum::CRC32C(_) => output.checksum_crc32c = checksum,
+                IntegrityChecksum::CRC64NVME(_) => output.checksum_crc64nvme = checksum,
+                IntegrityChecksum::_SHA1(_) => output.checksum_sha1 = checksum,
+                IntegrityChecksum::SHA256(_) => output.checksum_sha256 = checksum,
+            }
+        }

         debug!(?output);
         let mut resp = S3Response::new(output);
diff --git a/components/data_proxy/src/s3_frontend/utils/checksum.rs b/components/data_proxy/src/s3_frontend/utils/checksum.rs
new file mode 100644
index 00000000..4079e323
--- /dev/null
+++ b/components/data_proxy/src/s3_frontend/utils/checksum.rs
@@ -0,0 +1,555 @@
+use aws_sdk_s3::types::ChecksumType;
+use base64::prelude::BASE64_STANDARD;
+use base64::Engine;
+use http::{HeaderMap, HeaderValue};
+use s3s::{s3_error, S3Error};
+use std::collections::HashMap;
+use std::fmt::Formatter;
+
+pub const CRC32_HEADER: &str = "x-amz-checksum-crc32";
+pub const CRC32C_HEADER: &str = "x-amz-checksum-crc32c";
+pub const CRC64NVME_HEADER: &str = "x-amz-checksum-crc64nvme";
+pub const SHA1_HEADER: &str = "x-amz-checksum-sha1";
+pub const SHA256_HEADER: &str = "x-amz-checksum-sha256";
+
+pub const CRC32_EMPTY: &str = "AAAAAA==";
+pub const CRC32C_EMPTY: &str = "AAAAAA==";
+pub const CRC64NVME_EMPTY: &str = "AAAAAAAAAAA=";
+pub const SHA1_EMPTY: &str = "2jmj7l5rSw0yVb/vlWAYkK/YBwk=";
+pub const SHA256_EMPTY: &str = "47DEQpj8HBSa+/TImW+5JCeuQeRkm5NMpJWZG3hSuFU=";
+
+#[derive(Clone, Debug)]
+pub enum IntegrityChecksum {
+    CRC32(Option<String>),
+    CRC32C(Option<String>),
+    CRC64NVME(Option<String>),
+    _SHA1(Option<String>), // For the sake of completeness. Not supported.
+    SHA256(Option<String>),
+}
+
+impl std::fmt::Display for IntegrityChecksum {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        match self {
+            IntegrityChecksum::CRC32(_) => write!(f, "crc32"),
+            IntegrityChecksum::CRC32C(_) => write!(f, "crc32c"),
+            IntegrityChecksum::CRC64NVME(_) => write!(f, "crc64nvme"),
+            IntegrityChecksum::_SHA1(_) => write!(f, "sha1"),
+            IntegrityChecksum::SHA256(_) => write!(f, "sha256"),
+        }
+    }
+}
+
+impl TryFrom<&HeaderValue> for IntegrityChecksum {
+    type Error = S3Error;
+
+    #[tracing::instrument(level = "trace", skip(value))]
+    fn try_from(value: &HeaderValue) -> Result<Self, Self::Error> {
+        match header_value_to_str(value)? {
+            "CRC32" => Ok(IntegrityChecksum::CRC32(None)),
+            "CRC32C" => Ok(IntegrityChecksum::CRC32C(None)),
+            "CRC64NVME" => Ok(IntegrityChecksum::CRC64NVME(None)),
+            "SHA256" => Ok(IntegrityChecksum::SHA256(None)),
+            "SHA1" => Err(s3_error!(
+                NotImplemented,
+                "Sha1 checksum integrity validation is not supported"
+            )),
+            _ => Err(s3_error!(InvalidArgument, "invalid checksum algorithm")),
+        }
+    }
+}
+
+impl TryFrom<(&str, Option<&HeaderValue>)> for IntegrityChecksum {
+    type Error = S3Error;
+
+    fn try_from(value: (&str, Option<&HeaderValue>)) -> Result<Self, Self::Error> {
+        let provided_checksum = match value.1 {
+            None => None,
+            Some(val) => Some(header_value_to_str(val)?.to_string()),
+        };
+
+        match value.0 {
+            "CRC32" => Ok(IntegrityChecksum::CRC32(provided_checksum)),
+            "CRC32C" => Ok(IntegrityChecksum::CRC32C(provided_checksum)),
+            "CRC64NVME" => Ok(IntegrityChecksum::CRC64NVME(provided_checksum)),
+            "SHA256" => Ok(IntegrityChecksum::SHA256(provided_checksum)),
+            "SHA1" => Err(s3_error!(
+                NotImplemented,
+                "Sha1 checksum integrity validation is not supported"
+            )),
+            _ => Err(s3_error!(InvalidArgument, "invalid checksum algorithm")),
+        }
+    }
+}
+
+impl TryFrom<&HeaderMap> for ChecksumHandler {
+    type Error = S3Error;
+
+    #[tracing::instrument(level = "trace", skip(value))]
+    fn try_from(value: &HeaderMap) -> Result<Self, Self::Error> {
+        // Init default ChecksumHandler
+        let mut handler = Self::default();
+
+        // Set checksum type
+        if let Some(header_value) = value.get("x-amz-checksum-type") {
+            let checksum_type = header_value_to_str(header_value)?;
+            handler.checksum_type = Some(ChecksumType::from(checksum_type));
+        }
+
+        // Set checksum mode
+        handler.checksum_mode = value.contains_key("x-amz-checksum-mode");
+
+        // Eval required checksum
+        handler.required_checksum = eval_required_checksum(value)?;
+
+        Ok(handler)
+    }
+}
+
+impl IntegrityChecksum {
+    /// Get the checksum header name for this algorithm
+    pub fn checksum_header_name(&self) -> &str {
+        match self {
+            Self::CRC32(_) => CRC32_HEADER,
+            Self::CRC32C(_) => CRC32C_HEADER,
+            Self::CRC64NVME(_) => CRC64NVME_HEADER,
+            Self::_SHA1(_) => SHA1_HEADER,
+            Self::SHA256(_) => SHA256_HEADER,
+        }
+    }
+
+    pub fn set_validation_checksum(&mut self, checksum: String) {
+        match self {
+            IntegrityChecksum::CRC32(ref mut val) => *val = Some(checksum),
+            IntegrityChecksum::CRC32C(ref mut val) => *val = Some(checksum),
+            IntegrityChecksum::CRC64NVME(ref mut val) => *val = Some(checksum),
+            IntegrityChecksum::_SHA1(ref mut val) => *val = Some(checksum),
+            IntegrityChecksum::SHA256(ref mut val) => *val = Some(checksum),
+        }
+    }
+
+    pub fn get_empty_checksum(&self) -> String {
+        match self {
+            IntegrityChecksum::CRC32(_) => CRC32_EMPTY,
+            IntegrityChecksum::CRC32C(_) => CRC32C_EMPTY,
+            IntegrityChecksum::CRC64NVME(_) => CRC64NVME_EMPTY,
+            IntegrityChecksum::_SHA1(_) => SHA1_EMPTY,
+            IntegrityChecksum::SHA256(_) => SHA256_EMPTY,
+        }
+        .to_string()
+    }
+}
+
+pub fn eval_required_checksum(
+    headers: &HeaderMap,
+) -> Result<Option<IntegrityChecksum>, S3Error> {
+    // Check if "x-amz-sdk-checksum-algorithm" is present
+    if let Some(header_value) = headers.get("x-amz-sdk-checksum-algorithm") {
+        let mut req_checksum = IntegrityChecksum::try_from(header_value)?;
+
+        // Enforce that the corresponding "x-amz-checksum-*" header or "x-amz-trailer" is present
+        let checksum_header = req_checksum.checksum_header_name();
+        if let Some(validation_checksum) = headers.get(checksum_header) {
+            let checksum_str = header_value_to_str(validation_checksum)?.to_string();
+            req_checksum.set_validation_checksum(checksum_str);
+        } else if let Some(trailer) = headers.get("x-amz-trailer") {
+            let trailer_header: Vec<&str> = header_value_to_str(trailer)?
+                .split(',')
+                .map(|t| t.trim())
+                .collect();
+
+            if !trailer_header.contains(&checksum_header) {
+                return Err(s3_error!(
+                    MissingSecurityHeader,
+                    "Integrity validation is missing required header"
+                ));
+            }
+        }
+
+        return Ok(Some(req_checksum));
+    }
+
+    // Else check if any standalone "x-amz-checksum-*" header is present
+    let checksum = detect_algorithm_from_headers(headers)?;
+    Ok(checksum)
+}
+
+/// Detect algorithm from individual checksum headers
+fn detect_algorithm_from_headers(
+    headers: &HeaderMap,
+) -> Result<Option<IntegrityChecksum>, S3Error> {
+    let (algo, header) = if headers.contains_key(CRC32_HEADER) {
+        ("CRC32", CRC32_HEADER)
+    } else if headers.contains_key(CRC32C_HEADER) {
+        ("CRC32C", CRC32C_HEADER)
+    } else if headers.contains_key(CRC64NVME_HEADER) {
+        ("CRC64NVME", CRC64NVME_HEADER)
+    } else if headers.contains_key(SHA1_HEADER) {
+        ("SHA1", SHA1_HEADER)
+    } else if headers.contains_key(SHA256_HEADER) {
+        ("SHA256", SHA256_HEADER)
+    } else {
+        return Ok(None);
+    };
+
+    Ok(Some(IntegrityChecksum::try_from((
+        algo,
+        headers.get(header),
+    ))?))
+}
+
+#[derive(Clone, Debug, Default)]
+pub struct ChecksumHandler {
+    pub required_checksum: Option<IntegrityChecksum>,
+    pub checksum_type: Option<ChecksumType>,
+    pub checksum_mode: bool,
+    pub calculated_checksums: HashMap<String, String>,
+}
+
+impl ChecksumHandler {
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    pub fn _new_with_checksum(required_checksum: Option<IntegrityChecksum>) -> Self {
+        ChecksumHandler {
+            required_checksum,
+            checksum_type: None,
+            checksum_mode: false,
+            calculated_checksums: HashMap::new(),
+        }
+    }
+
+    pub fn from_headers(headers: &HeaderMap) -> Result<Self, S3Error> {
+        Self::try_from(headers)
+    }
+
+    fn get_validation_checksum(&self) -> &Option<String> {
+        if let Some(checksum) = &self.required_checksum {
+            match checksum {
+                IntegrityChecksum::CRC32(val)
+                | IntegrityChecksum::CRC32C(val)
+                | IntegrityChecksum::CRC64NVME(val)
+                | IntegrityChecksum::_SHA1(val)
+                | IntegrityChecksum::SHA256(val) => val,
+            }
+        } else {
+            &None
+        }
+    }
+
+    pub fn add_calculated_checksum(
+        &mut self,
+        key: impl Into<String>,
+        checksum: String,
+        to_b64: bool,
+    ) -> Result<(), S3Error> {
+        self.calculated_checksums.insert(
+            key.into(),
+            if to_b64 {
+                hex_to_base64(&checksum)?
+            } else {
+                checksum
+            },
+        );
+        Ok(())
+    }
+
+    pub fn get_calculated_checksum(&self) -> Option<String> {
+        self.required_checksum.as_ref().and_then(|required| {
+            self.calculated_checksums
+                .get(&required.to_string())
+                .cloned()
+        })
+    }
+
+    pub fn get_calculated_checksums(&self) -> &HashMap<String, String> {
+        &self.calculated_checksums
+    }
+
+    pub fn get_checksum_by_key(&self, key: &str, to_hex: bool) -> Result<Option<String>, S3Error> {
+        self.calculated_checksums
+            .get(key)
+            .map(|hash| {
+                if to_hex {
+                    base64_to_hex(hash)
+                } else {
+                    Ok(hash.clone())
+                }
+            })
+            .transpose()
+    }
+
+    pub fn validate_checksum(&self) -> bool {
+        if let Some(required_checksum) = &self.required_checksum {
+            return match self.get_validation_checksum() {
+                None => true, // No checksum provided for validation
+                Some(validation_checksum) => {
+                    if let Some(calculated_checksum) = self
+                        .calculated_checksums
+                        .get(&required_checksum.to_string())
+                    {
+                        validation_checksum.eq(calculated_checksum)
+                    } else {
+                        false
+                    }
+                }
+            };
+        }
+        // If no checksum was provided, validation always succeeds
+        true
+    }
+}
+
+pub fn header_value_to_str(value: &HeaderValue) -> Result<&str, S3Error> {
+    value
+        .to_str()
+        .map_err(|_| s3_error!(InvalidArgument, "invalid header value"))
+}
+
+pub fn hex_to_base64(value: &str) -> Result<String, S3Error> {
+    Ok(BASE64_STANDARD.encode(
+        hex::decode(value)
+            .map_err(|_| s3_error!(InternalError, "Hex to base64 conversion failed"))?,
+    ))
+}
+
+pub fn base64_to_hex(value: &str) -> Result<String, S3Error> {
+    Ok(hex::encode(BASE64_STANDARD.decode(value).map_err(
+        |_| s3_error!(InternalError, "Base64 to hex conversion failed"),
+    )?))
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use http::{HeaderMap, HeaderValue};
+
+    #[test]
+    fn test_display_variants() {
+        assert_eq!(IntegrityChecksum::CRC32(None).to_string(), "crc32");
+        assert_eq!(IntegrityChecksum::CRC32C(None).to_string(), "crc32c");
+        assert_eq!(IntegrityChecksum::CRC64NVME(None).to_string(), "crc64nvme");
+        assert_eq!(IntegrityChecksum::_SHA1(None).to_string(), "sha1");
+        assert_eq!(IntegrityChecksum::SHA256(None).to_string(), "sha256");
+
+        assert_eq!(
+            IntegrityChecksum::CRC32(Some("lorem".into())).to_string(),
+            "crc32"
+        );
+        assert_eq!(
+            IntegrityChecksum::CRC32C(Some("lorem".into())).to_string(),
+            "crc32c"
+        );
+        assert_eq!(
+            IntegrityChecksum::CRC64NVME(Some("lorem".into())).to_string(),
+            "crc64nvme"
+        );
+        assert_eq!(
+            IntegrityChecksum::_SHA1(Some("lorem".into())).to_string(),
+            "sha1"
+        );
+        assert_eq!(
+            IntegrityChecksum::SHA256(Some("lorem".into())).to_string(),
+            "sha256"
+        );
+    }
+
+    #[test]
+    fn test_try_from_headervalue_ok_and_err() {
+        let hv_crc32 = HeaderValue::from_static("CRC32");
+        let res = IntegrityChecksum::try_from(&hv_crc32);
+        assert!(res.is_ok());
+        match res.unwrap() {
+            IntegrityChecksum::CRC32(val) => assert!(val.is_none()),
+            _ => panic!("Expected CRC32 variant"),
+        }
+
+        let hv_invalid = HeaderValue::from_static("INVALID");
+        let res_invalid = IntegrityChecksum::try_from(&hv_invalid);
+        assert!(res_invalid.is_err());
+
+        // SHA1 is explicitly not implemented
+        let hv_sha1 = HeaderValue::from_static("SHA1");
+        let res_sha1 = IntegrityChecksum::try_from(&hv_sha1);
+        assert!(res_sha1.is_err());
+    }
+
+    #[test]
+    fn
test_try_from_tuple_ok_and_err() { + let v = ("CRC32", Some(&HeaderValue::from_static("abcd"))); + let res = IntegrityChecksum::try_from(v); + assert!(res.is_ok()); + match res.unwrap() { + IntegrityChecksum::CRC32(val) => assert_eq!(val, Some("abcd".to_string())), + _ => panic!("Expected CRC32 variant"), + } + + let v_none = ("CRC32", None); + let res_none = IntegrityChecksum::try_from(v_none); + assert!(res_none.is_ok()); + match res_none.unwrap() { + IntegrityChecksum::CRC32(val) => assert!(val.is_none()), + _ => panic!("Expected CRC32 variant"), + } + + let v_invalid = ("NOT_AN_ALGO", None); + assert!(IntegrityChecksum::try_from(v_invalid).is_err()); + } + + #[test] + fn test_checksum_header_name_and_set_validation() { + let mut ic = IntegrityChecksum::CRC32(None); + assert_eq!(ic.checksum_header_name(), CRC32_HEADER); + ic.set_validation_checksum("val1".to_string()); + match ic { + IntegrityChecksum::CRC32(Some(v)) => assert_eq!(v, "val1"), + _ => panic!("Expected CRC32(Some)"), + } + + let mut ic2 = IntegrityChecksum::SHA256(None); + assert_eq!(ic2.checksum_header_name(), SHA256_HEADER); + ic2.set_validation_checksum("val2".to_string()); + match ic2 { + IntegrityChecksum::SHA256(Some(v)) => assert_eq!(v, "val2"), + _ => panic!("Expected SHA256(Some)"), + } + } + + #[test] + fn test_eval_required_checksum_sdk_with_header() { + let mut headers = HeaderMap::new(); + headers.insert( + "x-amz-sdk-checksum-algorithm", + HeaderValue::from_static("CRC32"), + ); + headers.insert(CRC32_HEADER, HeaderValue::from_static("mycrc")); + + let res = eval_required_checksum(&headers).expect("should parse checksum"); + assert!(res.is_some()); + match res.unwrap() { + IntegrityChecksum::CRC32(val) => assert_eq!(val, Some("mycrc".to_string())), + _ => panic!("Expected CRC32"), + } + } + + #[test] + fn test_eval_required_checksum_sdk_with_trailer() { + let mut headers = HeaderMap::new(); + headers.insert( + "x-amz-sdk-checksum-algorithm", + HeaderValue::from_static("CRC32"), + ); + headers.insert("x-amz-trailer", HeaderValue::from_static(CRC32_HEADER)); + + let res = eval_required_checksum(&headers).expect("should parse checksum from trailer"); + assert!(res.is_some()); + match res.unwrap() { + IntegrityChecksum::CRC32(val) => assert!(val.is_none()), + _ => panic!("Expected CRC32"), + } + } + + #[test] + fn test_eval_required_checksum_sdk_missing_trailer_or_header() { + let mut headers = HeaderMap::new(); + headers.insert( + "x-amz-sdk-checksum-algorithm", + HeaderValue::from_static("CRC32"), + ); + headers.insert( + "x-amz-trailer", + HeaderValue::from_static("some-other-header"), + ); + + let res = eval_required_checksum(&headers); + assert!(res.is_err()); + } + + #[test] + fn test_detect_algorithm_from_headers_via_eval() { + let mut headers = HeaderMap::new(); + headers.insert(CRC32_HEADER, HeaderValue::from_static("detected")); + let res = eval_required_checksum(&headers).expect("should detect algorithm"); + assert!(res.is_some()); + match res.unwrap() { + IntegrityChecksum::CRC32(val) => assert_eq!(val, Some("detected".to_string())), + _ => panic!("Expected CRC32"), + } + } + + #[test] + fn test_checksum_handler_operations() { + // No required checksum + let handler = ChecksumHandler::new(); + assert!(handler.get_validation_checksum().is_none()); + assert!(handler.get_calculated_checksum().is_none()); + assert!(handler.validate_checksum()); // no required checksum => true + + // With required checksum + let required = IntegrityChecksum::CRC32(None); + let mut handler = 
ChecksumHandler::_new_with_checksum(Some(required)); + // set validation checksum inside the handler + if let Some(ic) = &mut handler.required_checksum { + ic.set_validation_checksum("expected".to_string()); + } + assert_eq!( + handler.get_validation_checksum(), + &Some("expected".to_string()) + ); + + // Validation should fail + assert!(!handler.validate_checksum()); + + // add calculated checksum and retrieve + handler + .add_calculated_checksum("crc32", "expected".to_string(), false) + .unwrap(); + assert_eq!( + handler.get_calculated_checksum(), + Some("expected".to_string()) + ); + assert_eq!( + handler.get_checksum_by_key("crc32", false).unwrap(), + Some("expected".to_string()) + ); + + // Validation should now succeed + assert!(handler.validate_checksum()); + } + + #[test] + fn test_header_value_to_str() { + let hv = HeaderValue::from_static("somestring"); + let s = header_value_to_str(&hv).expect("should convert"); + assert_eq!(s, "somestring"); + + let invalid_hv = + HeaderValue::from_bytes(&[115, 195, 182, 109, 195, 169, 115, 116, 114, 105, 110, 103]) + .unwrap(); + let res = header_value_to_str(&invalid_hv); + assert!(res.is_err()); + } + + #[test] + fn test_hex_and_base64_conversion_success_and_roundtrip() { + // Use hex "abcd" => bytes [0xAB, 0xCD] => base64 "q80=" + let hex = "abcd"; + let b64 = hex_to_base64(hex).expect("should convert hex to base64"); + assert_eq!(b64, "q80="); + + // Convert back + let hex_back = base64_to_hex(&b64).expect("should convert base64 to hex"); + assert_eq!(hex_back, "abcd"); + } + + #[test] + fn test_hex_and_base64_conversion_errors() { + // Invalid hex should return an error + let bad_hex = "zzzz"; + assert!(hex_to_base64(bad_hex).is_err()); + + // Invalid base64 should return an error + let bad_b64 = "!!!notbase64"; + assert!(base64_to_hex(bad_b64).is_err()); + } +} diff --git a/components/data_proxy/src/s3_frontend/utils/mod.rs b/components/data_proxy/src/s3_frontend/utils/mod.rs index 68db7f63..ada647f0 100644 --- a/components/data_proxy/src/s3_frontend/utils/mod.rs +++ b/components/data_proxy/src/s3_frontend/utils/mod.rs @@ -1,4 +1,5 @@ pub mod buffered_s3_sink; +pub mod checksum; pub mod crc_transformer; pub mod debug_transformer; pub mod list_objects; diff --git a/components/data_proxy/src/structs.rs b/components/data_proxy/src/structs.rs index 501568c5..5337cba6 100644 --- a/components/data_proxy/src/structs.rs +++ b/components/data_proxy/src/structs.rs @@ -2,7 +2,6 @@ use anyhow::Result; use anyhow::{anyhow, bail}; use aruna_rust_api::api::storage::models::v2::generic_resource::Resource; use aruna_rust_api::api::storage::models::v2::permission::ResourceId; -use aruna_rust_api::api::storage::models::v2::Pubkey; use aruna_rust_api::api::storage::models::v2::{ relation::Relation, DataClass, InternalRelationVariant, KeyValue, Object as GrpcObject, PermissionLevel, Project, RelationDirection, Status, User as GrpcUser, @@ -10,6 +9,7 @@ use aruna_rust_api::api::storage::models::v2::{ use aruna_rust_api::api::storage::models::v2::{Collection, DataEndpoint}; use aruna_rust_api::api::storage::models::v2::{Dataset, ResourceVariant}; use aruna_rust_api::api::storage::models::v2::{Hash, Permission}; +use aruna_rust_api::api::storage::models::v2::{Hashalgorithm, Pubkey}; use aruna_rust_api::api::storage::services::v2::create_collection_request; use aruna_rust_api::api::storage::services::v2::create_dataset_request; use aruna_rust_api::api::storage::services::v2::create_object_request; @@ -36,6 +36,7 @@ use tracing::{debug, error}; use 
crate::auth::auth::AuthHandler;
 use crate::helpers::IntoOption;
+use crate::s3_frontend::utils::checksum::{base64_to_hex, hex_to_base64};
 use crate::CONFIG;

 /* ----- Constants ----- */
@@ -999,6 +1000,23 @@ impl TryFrom<GrpcObject> for Object {
         let outbounds = outbound.into_option();
         let versions = version.into_option();

+        // Convert hashes to proxy internal format
+        let mut hashes = HashMap::new();
+        for h in value.hashes.iter() {
+            let b64_hash = hex_to_base64(&h.hash)?;
+            match Hashalgorithm::try_from(h.alg)? {
+                Hashalgorithm::Md5 => {
+                    hashes.insert("md5".to_string(), b64_hash);
+                }
+                Hashalgorithm::Sha256 => {
+                    hashes.insert("sha256".to_string(), b64_hash);
+                }
+                _ => {
+                    // Ignore unspecified hashes
+                }
+            }
+        }
+
         Ok(Object {
             id: DieselUlid::from_str(&value.id)?,
             name: value.name.to_string(),
@@ -1007,7 +1025,7 @@
             object_status: value.status(),
             data_class: value.data_class(),
             object_type: ObjectType::Object,
-            hashes: HashMap::default(),
+            hashes,
             metadata_license: value.metadata_license_tag,
             data_license: value.data_license_tag,
             dynamic: value.dynamic,
@@ -1150,12 +1168,12 @@ impl Object {
         self.hashes
             .iter()
             .map(|(k, v)| {
-                let alg = if k == "MD5" {
-                    2
-                } else if k == "SHA256" {
-                    1
+                let alg = if k == "md5" {
+                    Hashalgorithm::Md5 as i32
+                } else if k == "sha256" {
+                    Hashalgorithm::Sha256 as i32
                 } else {
-                    0
+                    Hashalgorithm::Unspecified as i32
                 };

                 Hash {
@@ -1166,6 +1184,26 @@
             .collect()
     }

+    #[tracing::instrument(level = "trace", skip(self))]
+    pub fn get_api_safe_hashes(&self) -> Result<Vec<Hash>> {
+        self.hashes
+            .iter()
+            .filter(|(k, _)| *k == "md5" || *k == "sha256")
+            .map(|(k, v)| {
+                let alg = if k == "md5" {
+                    Hashalgorithm::Md5
+                } else {
+                    Hashalgorithm::Sha256
+                };
+
+                Ok(Hash {
+                    alg: alg as i32,
+                    hash: base64_to_hex(v)?,
+                })
+            })
+            .collect()
+    }
+
     #[tracing::instrument(level = "trace", skip(self, ep_id))]
     pub fn is_partial_sync(&self, ep_id: &DieselUlid) -> bool {
         self.endpoints
@@ -1955,7 +1993,7 @@ impl From<&Object> for TypedId {
     }
 }

-#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)]
+#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
 pub struct UploadPart {
     pub id: DieselUlid,
     pub object_id: DieselUlid,
@@ -1963,6 +2001,7 @@ pub struct UploadPart {
     pub part_number: u64,
     pub raw_size: u64,
     pub size: u64,
+    pub checksums: Option<HashMap<String, String>>,
 }

 impl From<UploadPart> for Part {
@@ -1979,6 +2018,94 @@
 #[cfg(test)]
 mod tests {
+    use crate::database::database::Database;
+    use crate::database::persistence::WithGenericBytes;
+    use crate::structs::UploadPart;
+    use diesel_ulid::DieselUlid;
+    use std::collections::HashMap;
+
     #[test]
     fn test_resource_strings_cmp() {}
+
+    #[test]
+    fn upload_part_marshalling() {
+        // Create a test instance of UploadPart with checksums
+        let mut checksums = HashMap::new();
+        checksums.insert("md5".to_string(), "abc123".to_string());
+        checksums.insert("sha256".to_string(), "def456".to_string());
+        checksums.insert("crc32".to_string(), "hC5Uvw==".to_string());
+        checksums.insert("crc32c".to_string(), "p+pydw==".to_string());
+        checksums.insert("crc64nvme".to_string(), "N4vzZ9TJUr8=".to_string());
+
+        let mut original = UploadPart {
+            id: DieselUlid::generate(),
+            object_id: DieselUlid::generate(),
+            upload_id: "upload-123".to_string(),
+            part_number: 1,
+            raw_size: 1024,
+            size: 512,
+            checksums: Some(checksums),
+        };
+
+        // Serialize / Deserialize
+        let json = serde_json::to_string(&original).expect("Failed to serialize");
+        dbg!(&json);
+        let deserialized: UploadPart = serde_json::from_str(&json).expect("Failed to deserialize");
+
+        assert_eq!(original, deserialized);
+
+        // Set checksums to None and try again
+        original.checksums = None;
+
+        // Serialize / Deserialize
+        let json = serde_json::to_string(&original).expect("Failed to serialize");
+        dbg!(&json);
+        let deserialized: UploadPart = serde_json::from_str(&json).expect("Failed to deserialize");
+
+        assert_eq!(original, deserialized);
+    }
+
+    #[tokio::test]
+    async fn upload_part_persistence() {
+        // Create a test instance of UploadPart with checksums
+        let mut checksums = HashMap::new();
+        checksums.insert("md5".to_string(), "abc123".to_string());
+        checksums.insert("sha256".to_string(), "def456".to_string());
+        checksums.insert("crc32".to_string(), "hC5Uvw==".to_string());
+        checksums.insert("crc32c".to_string(), "p+pydw==".to_string());
+        checksums.insert("crc64nvme".to_string(), "N4vzZ9TJUr8=".to_string());
+
+        let mut original = UploadPart {
+            id: DieselUlid::generate(),
+            object_id: DieselUlid::generate(),
+            upload_id: "upload-123".to_string(),
+            part_number: 1,
+            raw_size: 1024,
+            size: 512,
+            checksums: Some(checksums),
+        };
+
+        let db = Database::new().await.unwrap();
+        let client = db.get_client().await.unwrap();
+
+        original.upsert(&client).await.unwrap();
+
+        let fetched = UploadPart::get_opt(&original.id, &client)
+            .await
+            .unwrap()
+            .unwrap();
+
+        assert_eq!(original, fetched);
+
+        // Remove checksums, upsert in database and re-fetch
+        original.checksums = None;
+        original.upsert(&client).await.unwrap();
+
+        let fetched = UploadPart::get_opt(&original.id, &client)
+            .await
+            .unwrap()
+            .unwrap();
+
+        assert_eq!(original, fetched);
+    }
 }
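
Reviewer note: a minimal usage sketch of the ChecksumHandler flow introduced in checksum.rs above. The `example` wrapper and the CRC32C value "q80=" are illustrative stand-ins (not a real CRC computation); only API from this diff is used.

    use http::{HeaderMap, HeaderValue};

    use crate::s3_frontend::utils::checksum::ChecksumHandler;

    fn example() -> Result<(), s3s::S3Error> {
        // Client declares the algorithm and sends the expected checksum header
        let mut headers = HeaderMap::new();
        headers.insert(
            "x-amz-sdk-checksum-algorithm",
            HeaderValue::from_static("CRC32C"),
        );
        headers.insert("x-amz-checksum-crc32c", HeaderValue::from_static("q80="));

        // eval_required_checksum selects CRC32C and stores "q80=" for validation
        let mut handler = ChecksumHandler::from_headers(&headers)?;

        // The proxy records what the stream transformers produced;
        // CRC values arrive base64-encoded, so no hex-to-base64 conversion
        handler.add_calculated_checksum("crc32c", "q80=".to_string(), false)?;

        // Calculated value matches the provided one => the upload is accepted
        assert!(handler.validate_checksum());
        assert_eq!(handler.get_calculated_checksum(), Some("q80=".to_string()));
        Ok(())
    }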