Changes from all commits (23 commits)
6aa3f93
chore: Remove deprecated code
das-Abroxas Jan 9, 2026
a58260c
feat: Implement checksum evaluation utils #225
das-Abroxas Jan 9, 2026
abbfaa5
refactor: Integrate hashes into protobuf Object to Dataproxy Object c…
das-Abroxas Jan 9, 2026
7be625c
fix: Mapping in get_hashes(&self) of Object
das-Abroxas Jan 9, 2026
99bdb48
refactor: Only use API conform hashes for communication with server
das-Abroxas Jan 15, 2026
6ca4e4e
feat: Extend UploadPart struct to optionally store checksums #225
das-Abroxas Jan 15, 2026
a265c84
feat: Upsert local hashes #225
das-Abroxas Jan 15, 2026
f954c17
feat: Extend ChecksumHandler for empty objects
das-Abroxas Jan 15, 2026
d16d79c
feat: Integrate ChecksumHandler in put_object operation #225
das-Abroxas Jan 15, 2026
8eb90da
refactor: 0 byte object handling
das-Abroxas Jan 15, 2026
df15c0d
feat: Integrate ChecksumHandler in upload_part operation #225
das-Abroxas Jan 15, 2026
4460e08
feat: Integrate ChecksumHandler in complete_multipart_upload operatio…
das-Abroxas Jan 15, 2026
6ce748e
feat: Add checksum type to output #225
das-Abroxas Jan 15, 2026
6528243
chore: Cleanup, cargo clippy fixes and cargo fmt
das-Abroxas Jan 15, 2026
95aa8b4
feat: Add checksum type and mode to ChecksumHandler
das-Abroxas Jan 20, 2026
5649efc
refactor: ChecksumHandler initialization
das-Abroxas Jan 20, 2026
d759756
feat: Add checksums to head_object and get_object operations #225
das-Abroxas Jan 20, 2026
dc69f5e
chore: Add hex <--> base64 conversion functions
das-Abroxas Jan 20, 2026
8b80195
refactor: All checksums are stored base64 encoded in ChecksumHandler
das-Abroxas Jan 20, 2026
566717c
feat: Add checksum validation to put_object and upload_part operation…
das-Abroxas Jan 20, 2026
8938f95
chore: Cleanup, cargo clippy fixes and cargo fmt
das-Abroxas Jan 20, 2026
c6ab877
fix: Incorrect assert in test_checksum_handler_operations()
das-Abroxas Jan 20, 2026
cab8bf3
test: Add simple tests for hex_to_base64 and base64_to_hex
das-Abroxas Jan 20, 2026
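
Commit dc69f5e adds hex <--> base64 conversion functions and cab8bf3 adds simple tests for `hex_to_base64` and `base64_to_hex`, but their bodies are not visible in this diff. Below is a minimal sketch of what such helpers can look like, assuming the `hex` and `base64` crates; only the function names are taken from the commits, everything else is an assumption. The conversion matters because the hashing transformers emit hex digests while S3 `x-amz-checksum-*` values are base64 encoded.

```rust
use anyhow::Result;
use base64::{engine::general_purpose::STANDARD, Engine as _};

/// Re-encode a lowercase hex digest (as emitted by the hashing transformers)
/// into the base64 form used in S3 `x-amz-checksum-*` headers.
fn hex_to_base64(hex_str: &str) -> Result<String> {
    Ok(STANDARD.encode(hex::decode(hex_str)?))
}

/// Decode a base64 checksum back into its lowercase hex representation.
fn base64_to_hex(b64_str: &str) -> Result<String> {
    Ok(hex::encode(STANDARD.decode(b64_str)?))
}

fn main() -> Result<()> {
    let digest = "deadbeef";
    let b64 = hex_to_base64(digest)?;
    assert_eq!(base64_to_hex(&b64)?, digest);
    println!("{digest} <-> {b64}");
    Ok(())
}
```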
37 changes: 36 additions & 1 deletion components/data_proxy/src/caching/cache.rs
@@ -5,6 +5,7 @@
use crate::database::persistence::{delete_parts_by_object_id, delete_parts_by_upload_id};
use crate::replication::replication_handler::ReplicationMessage;
use crate::s3_frontend::data_handler::DataHandler;
use crate::s3_frontend::utils::checksum::ChecksumHandler;
use crate::structs::{
AccessKeyPermissions, Bundle, DbPermissionLevel, LocationBinding, ObjectType, TypedId,
UploadPart, User,
@@ -208,7 +209,7 @@
}
}

pub async fn get_cache(&self) -> Result<Arc<Cache>> {

[GitHub Actions / clippy warning at components/data_proxy/src/caching/cache.rs:212: methods `get_cache`, `handle_temp_locations`, `get_paths_of_id`, and `delete_bundle` are never used (`dead_code`)]
self.self_arc
.read()
.await
@@ -385,6 +386,7 @@
backend,
temp_location,
None,
ChecksumHandler::new(),
)
.await
{
@@ -831,14 +833,19 @@
}

#[tracing::instrument(level = "trace", skip(self, object))]
pub async fn upsert_object(&self, object: Object) -> Result<()> {
pub async fn upsert_object(&self, mut object: Object) -> Result<()> {
trace!(?object, "upserting object");

if let Ok((rwlock_object, _)) = self.get_resource(&object.id) {
let cache_object = rwlock_object.read().await;
if *cache_object == object {
return Ok(());
}

// Merge checksums already cached for this object into the incoming object's hashes
let mut proxy_hashes = cache_object.hashes.clone();
proxy_hashes.extend(object.hashes.clone());
object.hashes = proxy_hashes;
}

if let Some(persistence) = self.persistence.read().await.as_ref() {
@@ -921,6 +928,32 @@
Ok(())
}

#[tracing::instrument(level = "trace", skip(self, object_id, hashes))]
pub async fn update_object_hashes(
&self,
object_id: DieselUlid,
hashes: HashMap<String, String>,
) -> Result<()> {
if let Ok((rwlock_object, _)) = self.get_resource(&object_id) {
// Update hashes in cache object
let mut cache_object = rwlock_object.write().await;
cache_object.hashes.extend(hashes);

// Write update to persistence if available
if let Some(persistence) = self.persistence.read().await.as_ref() {
let mut client = persistence.get_client().await?;
let transaction = client.transaction().await?;
let transaction_client = transaction.client();
cache_object.upsert(transaction_client).await?;
transaction.commit().await?;
}
} else {
bail!("Object with id {} not found", object_id);
}

Ok(())
}

#[tracing::instrument(level = "trace", skip(self))]
pub async fn delete_object(&self, id: DieselUlid) -> Result<()> {
// Remove object and location from database
@@ -1315,6 +1348,7 @@
part_number: u64,
raw_size: u64,
final_size: u64,
checksums: HashMap<String, String>,
) -> Result<()> {
let part = UploadPart {
id: DieselUlid::generate(),
@@ -1323,6 +1357,7 @@
object_id,
upload_id: upload_id.clone(),
raw_size,
checksums: Some(checksums),
};
if let Some(persistence) = self.persistence.read().await.as_ref() {
part.upsert(persistence.get_client().await?.client())
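
The `upsert_object` change above merges checksums the proxy has already cached into the incoming object before it is persisted, and `update_object_hashes` extends an existing cache entry in place. A small self-contained illustration of the `HashMap::extend` merge semantics used there: on a key collision the incoming value wins, while proxy-only checksums survive.

```rust
use std::collections::HashMap;

// Illustration of the merge in `upsert_object`: the cached hashes are cloned
// first and then extended with the incoming ones, so on a key collision the
// incoming (server-side) value wins while proxy-only checksums are preserved.
fn merge_hashes(
    cached: &HashMap<String, String>,
    incoming: &HashMap<String, String>,
) -> HashMap<String, String> {
    let mut merged = cached.clone();
    merged.extend(incoming.clone());
    merged
}

fn main() {
    let cached = HashMap::from([
        ("sha256".to_string(), "old".to_string()),
        ("crc32c".to_string(), "proxy-only".to_string()),
    ]);
    let incoming = HashMap::from([("sha256".to_string(), "new".to_string())]);

    let merged = merge_hashes(&cached, &incoming);
    assert_eq!(merged["sha256"], "new");
    assert_eq!(merged["crc32c"], "proxy-only");
}
```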
7 changes: 4 additions & 3 deletions components/data_proxy/src/caching/grpc_query_handler.rs
@@ -688,7 +688,7 @@ impl GrpcQueryHandler {
let mut req = Request::new(FinishObjectStagingRequest {
object_id: server_object.id.to_string(),
content_len,
hashes: proxy_object.get_hashes(), // Hashes stay the same
hashes: proxy_object.get_api_safe_hashes()?, // Hashes stay the same
completed_parts: vec![],
upload_id: "".to_string(), // Upload id only needed in requests from users
});
@@ -711,8 +711,9 @@
anyhow!("Object missing in FinishObjectResponse")
})?;

// Id of location record should be set to Dataproxy Object id but is set to Server Object id... the fuck?
let object = DPObject::try_from(response)?;
// Convert Object from response to Dataproxy internal representation and merge hashes
let mut object = DPObject::try_from(response)?;
object.hashes.extend(proxy_object.hashes);

// Persist Object and Location in cache/database
self.cache.upsert_object(object.clone()).await?;
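
`get_api_safe_hashes()` replaces `get_hashes()` when finishing the object on the server (commit 99bdb48, "Only use API conform hashes for communication with server"), but its implementation is not part of this excerpt. A hypothetical sketch of what it could do, presumably as a method on the proxy's `Object` struct: forward only algorithms the Aruna API models and keep proxy-internal CRCs local. Signature and filtering rule are assumptions.

```rust
use anyhow::Result;
use aruna_rust_api::api::storage::models::v2::{Hash, Hashalgorithm};
use std::collections::HashMap;

// Hypothetical sketch, not the actual implementation in this PR.
fn get_api_safe_hashes(hashes: &HashMap<String, String>) -> Result<Vec<Hash>> {
    let mut api_hashes = Vec::new();
    for (alg, value) in hashes {
        let alg = match alg.as_str() {
            "sha256" => Hashalgorithm::Sha256,
            "md5" => Hashalgorithm::Md5,
            // crc32, crc32c, crc64nvme, ... have no API representation
            _ => continue,
        };
        api_hashes.push(Hash {
            alg: alg.into(),
            hash: value.clone(),
        });
    }
    // Result only mirrors the fallible call site above
    Ok(api_hashes)
}
```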
133 changes: 104 additions & 29 deletions components/data_proxy/src/s3_frontend/data_handler.rs
@@ -1,6 +1,9 @@
use crate::caching::cache::Cache;
use crate::data_backends::storage_backend::StorageBackend;
use crate::s3_frontend::utils::buffered_s3_sink::BufferedS3Sink;
use crate::s3_frontend::utils::checksum::{ChecksumHandler, IntegrityChecksum};
use crate::s3_frontend::utils::{
buffered_s3_sink::BufferedS3Sink, crc_transformer::CrcTransformer,
};
use crate::structs::Object;
use crate::structs::ObjectLocation;
use crate::structs::VersionVariant;
@@ -9,6 +12,8 @@ use anyhow::Result;
use aruna_rust_api::api::storage::models::v2::Hash;
use aruna_rust_api::api::storage::models::v2::Hashalgorithm;
use aruna_rust_api::api::storage::models::v2::Status;
use crc_fast::CrcAlgorithm;
use crc_fast::Digest as CrcDigest;
use diesel_ulid::DieselUlid;
use md5::{Digest, Md5};
use pithos_lib::streamreadwrite::GenericStreamReadWriter;
@@ -21,6 +26,7 @@ use pithos_lib::transformers::pithos_comp_enc::PithosTransformer;
use pithos_lib::transformers::size_probe::SizeProbe;
use pithos_lib::transformers::zstd_comp::ZstdEnc;
use pithos_lib::transformers::zstd_decomp::ZstdDec;
use s3s::dto::{ChecksumType, CompleteMultipartUploadOutput, ETag};
use sha2::Sha256;
use std::fmt::Debug;
use std::sync::Arc;
@@ -45,7 +51,8 @@ impl DataHandler {
backend: Arc<Box<dyn StorageBackend>>,
before_location: ObjectLocation,
path_level: Option<[Option<(DieselUlid, String)>; 4]>,
) -> Result<()> {
mut checksum_handler: ChecksumHandler,
) -> Result<CompleteMultipartUploadOutput> {
let token = if let Some(handler) = cache.auth.read().await.as_ref() {
let Some(created_by) = object.created_by else {
error!("No created_by found");
@@ -157,17 +164,38 @@
asr = asr.add_transformer(ZstdDec::new());
}

let (uncompressed_probe, uncompressed_stream) = SizeProbe::new();
// [Optional] Checksums
let (crc32_t, crc32_rx) = CrcTransformer::new_with_backchannel(
CrcDigest::new(CrcAlgorithm::Crc32IsoHdlc),
false,
);
let (crc32c_t, crc32c_rx) = CrcTransformer::new_with_backchannel(
CrcDigest::new(CrcAlgorithm::Crc32Iscsi),
false,
);
let (crc64nvme_t, crc64nvme_rx) = CrcTransformer::new_with_backchannel(
CrcDigest::new(CrcAlgorithm::Crc64Nvme),
false,
);

asr = asr.add_transformer(uncompressed_probe);
asr = asr.add_transformer(crc32_t);
asr = asr.add_transformer(crc32c_t);
asr = asr.add_transformer(crc64nvme_t);

let (sha_transformer, sha_recv) =
// Hashes
let (sha_t, sha_rx) =
HashingTransformer::new_with_backchannel(Sha256::new(), "sha256".to_string());
let (md5_transformer, md5_recv) =
let (md5_t, md5_rx) =
HashingTransformer::new_with_backchannel(Md5::new(), "md5".to_string());
//TODO: HashingTransformer sends hash to Footer which does not support SHA1 ...
//let (sha_transformer, sha_recv) = HashingTransformer::new_with_backchannel(Sha1::new(), "sha1".to_string());

asr = asr.add_transformer(sha_t);
asr = asr.add_transformer(md5_t);

asr = asr.add_transformer(sha_transformer);
asr = asr.add_transformer(md5_transformer);
// Log uncompressed size
let (uncompressed_probe, uncompressed_stream) = SizeProbe::new();
asr = asr.add_transformer(uncompressed_probe);

if new_location_clone.is_pithos() {
tx.send(pithos_lib::helpers::notifications::Message::FileContext(
@@ -205,19 +233,34 @@
e
})?;

Ok::<(u64, u64, String, String, String), anyhow::Error>((
// Fetch hashes
for (key, rx, hex_to_b64) in [
("sha256", sha_rx, true),
("md5", md5_rx, true),
("crc32", crc32_rx, false),
("crc32c", crc32c_rx, false),
("crc64nvme", crc64nvme_rx, false),
] {
checksum_handler.add_calculated_checksum(
key,
rx.try_recv().inspect_err(|&e| {
error!(error = ?e, msg = e.to_string());
})?,
hex_to_b64,
)?;
}

Ok::<(u64, u64, ChecksumHandler, String), anyhow::Error>((
disk_size_stream.try_recv().inspect_err(|&e| {
error!(error = ?e, msg = e.to_string());
})?,
uncompressed_stream.try_recv().inspect_err(|&e| {
error!(error = ?e, msg = e.to_string());
})?,
sha_recv.try_recv().inspect_err(|&e| {
error!(error = ?e, msg = e.to_string());
})?,
md5_recv.try_recv().inspect_err(|&e| {
error!(error = ?e, msg = e.to_string());
})?,
checksum_handler,
final_sha_recv.try_recv().inspect_err(|&e| {
error!(error = ?e, msg = e.to_string());
})?,
@@ -242,7 +285,7 @@
}
}

let (before_size, after_size, sha, md5, final_sha) = aswr_handle
let (before_size, after_size, checksum_handler, final_sha) = aswr_handle
.await
.map_err(|e| {
error!(error = ?e, msg = e.to_string());
@@ -259,20 +302,32 @@

debug!(new_location = ?new_location, "Finished finalizing location");

let hashes = vec![
Hash {
alg: Hashalgorithm::Sha256.into(),
hash: sha,
},
Hash {
alg: Hashalgorithm::Md5.into(),
hash: md5,
},
];
// Persist the calculated hashes to the cache/database right away
cache
.update_object_hashes(
object.id,
checksum_handler.get_calculated_checksums().clone(),
)
.await?;

if let Some(handler) = cache.aruna_client.read().await.as_ref() {
// Set id of new location to object id to satisfy FK constraint
// TODO: Update hashes etc.
// Send SHA256 and MD5 hashes to Aruna server
let hashes = vec![
Hash {
alg: Hashalgorithm::Sha256.into(),
hash: checksum_handler
.get_checksum_by_key("sha256", true)?
.ok_or_else(|| anyhow!("SHA256 not found."))?
.clone(),
},
Hash {
alg: Hashalgorithm::Md5.into(),
hash: checksum_handler
.get_checksum_by_key("md5", true)?
.ok_or_else(|| anyhow!("MD5 not found."))?
.clone(),
},
];

handler
.set_object_hashes(&object.id, hashes, &token)
@@ -287,10 +342,30 @@
.to_string();
backend.delete_object(before_location).await?;

// Remove parts from database
cache.delete_parts_by_upload_id(upload_id).await?;
}

Ok(())
// Create basic response with ETag
let mut output = CompleteMultipartUploadOutput {
e_tag: Some(ETag::Strong(format!("-{}", object.id))),
..Default::default()
};

// Add required checksum to response
if let Some(required) = &checksum_handler.required_checksum {
let checksum = checksum_handler.get_calculated_checksum();
match required {
IntegrityChecksum::CRC32(_) => output.checksum_crc32 = checksum,
IntegrityChecksum::CRC32C(_) => output.checksum_crc32c = checksum,
IntegrityChecksum::CRC64NVME(_) => output.checksum_crc64nvme = checksum,
IntegrityChecksum::_SHA1(_) => output.checksum_sha1 = checksum,
IntegrityChecksum::SHA256(_) => output.checksum_sha256 = checksum,
}
output.checksum_type = Some(ChecksumType::from_static(ChecksumType::FULL_OBJECT));
}

Ok(output)
}
}

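
The `ChecksumHandler` threaded through `data_handler.rs` is defined in `src/s3_frontend/utils/checksum.rs`, which this diff view does not include. The sketch below is reconstructed only from the call sites above (`new`, `add_calculated_checksum`, `get_calculated_checksums`, `get_checksum_by_key`, `required_checksum`, `get_calculated_checksum`); field names, the internal base64 storage (commit 8b80195) and the conversion rules are assumptions, not the actual implementation.

```rust
use anyhow::Result;
use base64::{engine::general_purpose::STANDARD, Engine as _};
use std::collections::HashMap;

/// Checksum algorithm the client asked the proxy to verify, with its expected
/// value (variant set mirrors the match in complete_multipart above).
pub enum IntegrityChecksum {
    CRC32(String),
    CRC32C(String),
    CRC64NVME(String),
    _SHA1(String),
    SHA256(String),
}

/// Sketch: all calculated checksums are kept base64 encoded internally.
#[derive(Default)]
pub struct ChecksumHandler {
    pub required_checksum: Option<IntegrityChecksum>,
    calculated: HashMap<String, String>,
}

impl ChecksumHandler {
    pub fn new() -> Self {
        Self::default()
    }

    /// Store a checksum under `key`; `hex_to_b64` re-encodes digests that the
    /// hashing transformers emit as hex (sha256, md5), while the CRC
    /// backchannels apparently already deliver base64.
    pub fn add_calculated_checksum(
        &mut self,
        key: &str,
        value: String,
        hex_to_b64: bool,
    ) -> Result<()> {
        let value = if hex_to_b64 {
            STANDARD.encode(hex::decode(value)?)
        } else {
            value
        };
        self.calculated.insert(key.to_string(), value);
        Ok(())
    }

    pub fn get_calculated_checksums(&self) -> &HashMap<String, String> {
        &self.calculated
    }

    /// Look up a single checksum; `as_hex` converts the stored base64 back to
    /// hex, e.g. for the Aruna API which expects hex-encoded hashes.
    pub fn get_checksum_by_key(&self, key: &str, as_hex: bool) -> Result<Option<String>> {
        self.calculated
            .get(key)
            .map(|v| -> Result<String> {
                if as_hex {
                    Ok(hex::encode(STANDARD.decode(v)?))
                } else {
                    Ok(v.clone())
                }
            })
            .transpose()
    }

    /// Value for the checksum algorithm the client requested, if any.
    pub fn get_calculated_checksum(&self) -> Option<String> {
        let key = match self.required_checksum.as_ref()? {
            IntegrityChecksum::CRC32(_) => "crc32",
            IntegrityChecksum::CRC32C(_) => "crc32c",
            IntegrityChecksum::CRC64NVME(_) => "crc64nvme",
            IntegrityChecksum::_SHA1(_) => "sha1",
            IntegrityChecksum::SHA256(_) => "sha256",
        };
        self.calculated.get(key).cloned()
    }
}
```

Keeping one canonical base64 form internally would let the S3 output path use the stored value directly, while the Aruna gRPC path converts back to hex on demand, which matches the `get_checksum_by_key("sha256", true)` call above.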
29 changes: 1 addition & 28 deletions components/data_proxy/src/s3_frontend/s3server.rs
@@ -190,7 +190,6 @@ impl Service<Request<Incoming>> for WrappingService {
.map_err(|_| s3_error!(InternalError, "Failed to add CORS header"));
final_response.await
})
//final_response.boxed()
}
}

Expand All @@ -201,37 +200,11 @@ impl AsRef<S3Service> for WrappingService {
}
}

/*
impl WrappingService {
#[tracing::instrument(level = "trace", skip(self))]
#[must_use]
pub fn into_make_service(self) -> MakeService<Self> {
MakeService(self)
}
}

#[derive(Clone)]
pub struct MakeService<S>(S);

impl<T, S: Clone> Service<T> for MakeService<S> {
type Response = S;

type Error = Infallible;

type Future = Ready<Result<Self::Response, Self::Error>>;

#[tracing::instrument(level = "trace", skip(self))]
fn call(&self, _: T) -> Self::Future {
ready(Ok(self.0.clone()))
}
}
*/

impl WrappingService {
async fn get_cors_headers(
&self,
origin_exception: bool,
req: &hyper::Request<hyper::body::Incoming>,
req: &Request<Incoming>,
) -> Result<hyper::HeaderMap, S3Error> {
// Return all * if origin exception matches
if origin_exception {