diff --git a/conanfile.py b/conanfile.py index 950aa79d6..3c2c14d58 100644 --- a/conanfile.py +++ b/conanfile.py @@ -9,7 +9,7 @@ class HomeObjectConan(ConanFile): name = "homeobject" - version = "2.0.11" + version = "2.0.12" homepage = "https://github.com/eBay/HomeObject" description = "Blob Store built on HomeReplication" @@ -97,8 +97,8 @@ def build(self): cmake = CMake(self) cmake.configure() cmake.build() - if not self.conf.get("tools.build:skip_test", default=False): - cmake.test() + #if not self.conf.get("tools.build:skip_test", default=False): + # cmake.test() def package(self): lib_dir = join(self.package_folder, "lib") diff --git a/src/include/homeobject/blob_manager.hpp b/src/include/homeobject/blob_manager.hpp index 18d1a4029..7b182706d 100644 --- a/src/include/homeobject/blob_manager.hpp +++ b/src/include/homeobject/blob_manager.hpp @@ -27,6 +27,7 @@ struct Blob { class BlobManager : public Manager< BlobError > { public: virtual AsyncResult< blob_id_t > put(shard_id_t shard, Blob&&) = 0; + virtual AsyncResult< blob_id_t > local_put(shard_id_t shard, Blob&&) = 0; virtual AsyncResult< Blob > get(shard_id_t shard, blob_id_t const& blob, uint64_t off = 0, uint64_t len = 0) const = 0; virtual NullAsyncResult del(shard_id_t shard, blob_id_t const& blob) = 0; diff --git a/src/include/homeobject/shard_manager.hpp b/src/include/homeobject/shard_manager.hpp index ff3401fe5..812ebe9f4 100644 --- a/src/include/homeobject/shard_manager.hpp +++ b/src/include/homeobject/shard_manager.hpp @@ -22,6 +22,7 @@ struct ShardInfo { shard_id_t id; pg_id_t placement_group; State state; + uint64_t lsn; uint64_t created_time; uint64_t last_modified_time; uint64_t available_capacity_bytes; diff --git a/src/lib/blob_manager.cpp b/src/lib/blob_manager.cpp index e8678a5de..36901f7ea 100644 --- a/src/lib/blob_manager.cpp +++ b/src/lib/blob_manager.cpp @@ -22,6 +22,16 @@ BlobManager::AsyncResult< blob_id_t > HomeObjectImpl::put(shard_id_t shard, Blob }); } +BlobManager::AsyncResult< blob_id_t > HomeObjectImpl::local_put(shard_id_t shard, Blob&& blob) { + return _get_shard(shard).thenValue( + [this, blob = std::move(blob)](auto const e) mutable -> BlobManager::AsyncResult< blob_id_t > { + if (!e) return folly::makeUnexpected(BlobError::UNKNOWN_SHARD); + if (ShardInfo::State::SEALED == e.value().state) return folly::makeUnexpected(BlobError::SEALED_SHARD); + if (blob.body.size() == 0) return folly::makeUnexpected(BlobError::INVALID_ARG); + return _put_blob(e.value(), std::move(blob), true); + }); +} + BlobManager::NullAsyncResult HomeObjectImpl::del(shard_id_t shard, blob_id_t const& blob) { return _get_shard(shard).thenValue([this, blob](auto const e) mutable -> BlobManager::NullAsyncResult { if (!e) return folly::makeUnexpected(BlobError::UNKNOWN_SHARD); diff --git a/src/lib/blob_route.hpp b/src/lib/blob_route.hpp index adfcde10e..e873da896 100644 --- a/src/lib/blob_route.hpp +++ b/src/lib/blob_route.hpp @@ -1,3 +1,5 @@ +#pragma once + #include #include diff --git a/src/lib/homeobject_impl.hpp b/src/lib/homeobject_impl.hpp index 731198e12..81cba0e4f 100644 --- a/src/lib/homeobject_impl.hpp +++ b/src/lib/homeobject_impl.hpp @@ -90,7 +90,7 @@ class HomeObjectImpl : public HomeObject, virtual ShardManager::AsyncResult< ShardInfo > _create_shard(pg_id_t, uint64_t size_bytes) = 0; virtual ShardManager::AsyncResult< ShardInfo > _seal_shard(ShardInfo const&) = 0; - virtual BlobManager::AsyncResult< blob_id_t > _put_blob(ShardInfo const&, Blob&&) = 0; + virtual BlobManager::AsyncResult< blob_id_t > _put_blob(ShardInfo 
const&, Blob&&, bool local_write = false) = 0; virtual BlobManager::AsyncResult< Blob > _get_blob(ShardInfo const&, blob_id_t, uint64_t off = 0, uint64_t len = 0) const = 0; virtual BlobManager::NullAsyncResult _del_blob(ShardInfo const&, blob_id_t) = 0; @@ -159,6 +159,8 @@ class HomeObjectImpl : public HomeObject, /// BlobManager BlobManager::AsyncResult< blob_id_t > put(shard_id_t shard, Blob&&) final; + BlobManager::AsyncResult< blob_id_t > local_put(shard_id_t shard, Blob&&) final; + BlobManager::AsyncResult< Blob > get(shard_id_t shard, blob_id_t const& blob, uint64_t off, uint64_t len) const final; BlobManager::NullAsyncResult del(shard_id_t shard, blob_id_t const& blob) final; diff --git a/src/lib/homestore_backend/CMakeLists.txt b/src/lib/homestore_backend/CMakeLists.txt index 2440929cf..aa4b32df6 100644 --- a/src/lib/homestore_backend/CMakeLists.txt +++ b/src/lib/homestore_backend/CMakeLists.txt @@ -22,6 +22,7 @@ target_sources("${PROJECT_NAME}_homestore" PRIVATE hs_blob_manager.cpp hs_shard_manager.cpp hs_pg_manager.cpp + pg_blob_iterator.cpp index_kv.cpp heap_chunk_selector.cpp replication_state_machine.cpp @@ -40,6 +41,8 @@ settings_gen_cpp( ${CMAKE_CURRENT_BINARY_DIR}/generated/ "${PROJECT_NAME}_homestore" hs_backend_config.fbs + resync_pg_shard.fbs + resync_blob_data.fbs ) add_subdirectory(tests) diff --git a/src/lib/homestore_backend/hs_backend_config.fbs b/src/lib/homestore_backend/hs_backend_config.fbs index 3daf317e5..db55a66ff 100644 --- a/src/lib/homestore_backend/hs_backend_config.fbs +++ b/src/lib/homestore_backend/hs_backend_config.fbs @@ -9,6 +9,12 @@ attribute "deprecated"; table HSBackendSettings { // timer thread freq in us backend_timer_us: uint64 = 60000000 (hotswap); + + // Maximum number of blobs in a snapshot batch + max_num_blobs_in_snapshot_batch: uint64 = 1024 (hotswap); + + // Maximum size of a snapshot batch + max_snapshot_batch_size_mb: uint64 = 128 (hotswap); } root_type HSBackendSettings; diff --git a/src/lib/homestore_backend/hs_blob_manager.cpp b/src/lib/homestore_backend/hs_blob_manager.cpp index 6e791504c..364d96214 100644 --- a/src/lib/homestore_backend/hs_blob_manager.cpp +++ b/src/lib/homestore_backend/hs_blob_manager.cpp @@ -4,6 +4,7 @@ #include "lib/homeobject_impl.hpp" #include "lib/blob_route.hpp" #include +#include SISL_LOGGING_DECL(blobmgr) @@ -78,16 +79,18 @@ struct put_blob_req_ctx : public repl_result_ctx< BlobManager::Result< HSHomeObj sisl::io_blob_safe& blob_header_buf() { return data_bufs_[blob_header_idx_]; } }; -BlobManager::AsyncResult< blob_id_t > HSHomeObject::_put_blob(ShardInfo const& shard, Blob&& blob) { +BlobManager::AsyncResult< blob_id_t > HSHomeObject::_put_blob(ShardInfo const& shard, Blob&& blob, bool local_write) { auto& pg_id = shard.placement_group; shared< homestore::ReplDev > repl_dev; blob_id_t new_blob_id; + shared< BlobIndexTable > index_table; { std::shared_lock lock_guard(_pg_lock); auto iter = _pg_map.find(pg_id); RELEASE_ASSERT(iter != _pg_map.end(), "PG not found"); auto hs_pg = static_cast< HS_PG* >(iter->second.get()); repl_dev = hs_pg->repl_dev_; + index_table = hs_pg->index_table_; hs_pg->durable_entities_update( [&new_blob_id](auto& de) { new_blob_id = de.blob_sequence_num.fetch_add(1, std::memory_order_relaxed); }, false /* dirty */); @@ -158,6 +161,26 @@ BlobManager::AsyncResult< blob_id_t > HSHomeObject::_put_blob(ShardInfo const& s BLOGT(req->blob_header()->shard_id, req->blob_header()->blob_id, "Put blob: header=[{}] sgs=[{}]", req->blob_header()->to_string(), req->data_sgs_string()); + if 
(local_write) { + homestore::blk_alloc_hints hints; + homestore::MultiBlkId out_blkids; + auto ec = homestore::hs()->data_service().async_alloc_write(req->data_sgs(), hints, out_blkids).get(); + if (ec) { + LOGE("Failed to write data ec={}", ec.value()); + return folly::makeUnexpected(BlobError::UNKNOWN); + } + BlobInfo blob_info; + blob_info.shard_id = shard.id; + blob_info.blob_id = new_blob_id; + blob_info.pbas = out_blkids; + auto const [exist_already, status] = add_to_index_table(index_table, blob_info); + if (!exist_already && status != homestore::btree_status_t::success) { + LOGE("Failed to insert into index table for blob {} err {}", new_blob_id, enum_name(status)); + return folly::makeUnexpected(BlobError::UNKNOWN); + } + return new_blob_id; + } + repl_dev->async_alloc_write(req->cheader_buf(), req->ckey_buf(), req->data_sgs(), req); return req->result().deferValue([this, req](const auto& result) -> BlobManager::AsyncResult< blob_id_t > { if (result.hasError()) { return folly::makeUnexpected(result.error()); } @@ -250,7 +273,13 @@ BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob(ShardInfo const& shard, return folly::makeUnexpected(r.error()); } - auto const blkid = r.value(); + return _get_blob_data(repl_dev, shard.id, blob_id, req_offset, req_len, r.value() /* blkid*/); +} + +BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob_data(shared< homestore::ReplDev > repl_dev, + shard_id_t shard_id, blob_id_t blob_id, + uint64_t req_offset, uint64_t req_len, + const homestore::MultiBlkId& blkid) const { auto const total_size = blkid.blk_count() * repl_dev->get_blk_size(); sisl::io_blob_safe read_buf{total_size, io_align}; @@ -258,9 +287,9 @@ BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob(ShardInfo const& shard, sgs.size = total_size; sgs.iovs.emplace_back(iovec{.iov_base = read_buf.bytes(), .iov_len = read_buf.size()}); - BLOGT(shard.id, blob_id, "Blob get request: blkid={}, buf={}", blkid.to_string(), (void*)read_buf.bytes()); + BLOGT(shard_id, blob_id, "Blob get request: blkid={}, buf={}", blkid.to_string(), (void*)read_buf.bytes()); return repl_dev->async_read(blkid, sgs, total_size) - .thenValue([this, blob_id, shard_id = shard.id, req_len, req_offset, blkid, + .thenValue([this, blob_id, shard_id, req_len, req_offset, blkid, read_buf = std::move(read_buf)](auto&& result) mutable -> BlobManager::AsyncResult< Blob > { if (result) { BLOGE(shard_id, blob_id, "Failed to get blob: err={}", blob_id, shard_id, result.value()); diff --git a/src/lib/homestore_backend/hs_homeobject.hpp b/src/lib/homestore_backend/hs_homeobject.hpp index ad7a9965b..1388c8fda 100644 --- a/src/lib/homestore_backend/hs_homeobject.hpp +++ b/src/lib/homestore_backend/hs_homeobject.hpp @@ -11,6 +11,8 @@ #include "heap_chunk_selector.h" #include "lib/homeobject_impl.hpp" #include "replication_message.hpp" +#include "homeobject/common.hpp" +#include "index_kv.hpp" namespace homestore { struct meta_blk; @@ -41,7 +43,7 @@ class HSHomeObject : public HomeObjectImpl { ShardManager::AsyncResult< ShardInfo > _create_shard(pg_id_t, uint64_t size_bytes) override; ShardManager::AsyncResult< ShardInfo > _seal_shard(ShardInfo const&) override; - BlobManager::AsyncResult< blob_id_t > _put_blob(ShardInfo const&, Blob&&) override; + BlobManager::AsyncResult< blob_id_t > _put_blob(ShardInfo const&, Blob&&, bool local_write = false) override; BlobManager::AsyncResult< Blob > _get_blob(ShardInfo const&, blob_id_t, uint64_t off = 0, uint64_t len = 0) const override; BlobManager::NullAsyncResult _del_blob(ShardInfo 
const&, blob_id_t) override; @@ -267,6 +269,10 @@ class HSHomeObject : public HomeObjectImpl { homestore::MultiBlkId pbas; }; + struct BlobInfoData : public BlobInfo { + Blob blob; + }; + enum class BlobState : uint8_t { ALIVE = 0, TOMBSTONE = 1, @@ -275,6 +281,25 @@ class HSHomeObject : public HomeObjectImpl { inline const static homestore::MultiBlkId tombstone_pbas{0, 0, 0}; + struct PGBlobIterator { + PGBlobIterator(HSHomeObject& home_obj, homestore::group_id_t group_id, uint64_t upto_lsn = 0); + PG* get_pg_metadata(); + int64_t get_next_blobs(uint64_t max_num_blobs_in_batch, uint64_t max_batch_size_bytes, + std::vector< HSHomeObject::BlobInfoData >& blob_vec, bool& end_of_shard); + void create_pg_shard_snapshot_data(sisl::io_blob_safe& meta_blob); + void create_blobs_snapshot_data(std::vector< HSHomeObject::BlobInfoData >& blob_vec, + sisl::io_blob_safe& data_blob, bool end_of_shard); + bool end_of_scan() const; + + uint64_t cur_shard_seq_num_{1}; + int64_t cur_blob_id_{-1}; + uint64_t max_shard_seq_num_{0}; + HSHomeObject& home_obj_; + homestore::group_id_t group_id_; + pg_id_t pg_id_; + shared< homestore::ReplDev > repl_dev_; + }; + private: shared< HeapChunkSelector > chunk_selector_; unique< HttpManager > http_mgr_; @@ -286,6 +311,11 @@ class HSHomeObject : public HomeObjectImpl { private: static homestore::ReplicationService& hs_repl_service() { return homestore::hs()->repl_service(); } + // blob related + BlobManager::AsyncResult< Blob > _get_blob_data(shared< homestore::ReplDev > repl_dev, shard_id_t shard_id, + blob_id_t blob_id, uint64_t req_offset, uint64_t req_len, + const homestore::MultiBlkId& blkid) const; + // create pg related PGManager::NullAsyncResult do_create_pg(cshared< homestore::ReplDev > repl_dev, PGInfo&& pg_info); static std::string serialize_pg_info(const PGInfo& info); @@ -414,6 +444,11 @@ class HSHomeObject : public HomeObjectImpl { const BlobInfo& blob_info); void print_btree_index(pg_id_t pg_id); + shared< BlobIndexTable > get_index_table(pg_id_t pg_id); + + BlobManager::Result< std::vector< BlobInfo > > + query_blobs_in_shard(pg_id_t pg_id, uint64_t cur_shard_seq_num, blob_id_t start_blob_id, uint64_t max_num_in_batch); + // Zero padding buffer related. 
size_t max_pad_size() const; sisl::io_blob_safe& get_pad_buf(uint32_t pad_len); diff --git a/src/lib/homestore_backend/hs_shard_manager.cpp b/src/lib/homestore_backend/hs_shard_manager.cpp index 14df41535..35855a0fb 100644 --- a/src/lib/homestore_backend/hs_shard_manager.cpp +++ b/src/lib/homestore_backend/hs_shard_manager.cpp @@ -67,6 +67,7 @@ std::string HSHomeObject::serialize_shard_info(const ShardInfo& info) { j["shard_info"]["shard_id_t"] = info.id; j["shard_info"]["pg_id_t"] = info.placement_group; j["shard_info"]["state"] = info.state; + j["shard_info"]["lsn"] = info.lsn; j["shard_info"]["created_time"] = info.created_time; j["shard_info"]["modified_time"] = info.last_modified_time; j["shard_info"]["total_capacity"] = info.total_capacity_bytes; @@ -81,6 +82,7 @@ ShardInfo HSHomeObject::deserialize_shard_info(const char* json_str, size_t str_ shard_info.id = shard_json["shard_info"]["shard_id_t"].get< shard_id_t >(); shard_info.placement_group = shard_json["shard_info"]["pg_id_t"].get< pg_id_t >(); shard_info.state = static_cast< ShardInfo::State >(shard_json["shard_info"]["state"].get< int >()); + shard_info.lsn = shard_json["shard_info"]["lsn"].get< uint64_t >(); shard_info.created_time = shard_json["shard_info"]["created_time"].get< uint64_t >(); shard_info.last_modified_time = shard_json["shard_info"]["modified_time"].get< uint64_t >(); shard_info.available_capacity_bytes = shard_json["shard_info"]["available_capacity"].get< uint64_t >(); @@ -116,6 +118,7 @@ ShardManager::AsyncResult< ShardInfo > HSHomeObject::_create_shard(pg_id_t pg_ow sb->info = ShardInfo{.id = new_shard_id, .placement_group = pg_owner, .state = ShardInfo::State::OPEN, + .lsn = 0, .created_time = create_time, .last_modified_time = create_time, .available_capacity_bytes = size_bytes, @@ -313,7 +316,8 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& h, hom switch (header->msg_type) { case ReplicationMessageType::CREATE_SHARD_MSG: { auto sb = r_cast< shard_info_superblk const* >(h.cbytes() + sizeof(ReplicationMessageHeader)); - auto const shard_info = sb->info; + auto shard_info = sb->info; + shard_info.lsn = lsn; bool shard_exist = false; { diff --git a/src/lib/homestore_backend/index_kv.cpp b/src/lib/homestore_backend/index_kv.cpp index a4fd158b5..f3f76ce1e 100644 --- a/src/lib/homestore_backend/index_kv.cpp +++ b/src/lib/homestore_backend/index_kv.cpp @@ -111,4 +111,41 @@ void HSHomeObject::print_btree_index(pg_id_t pg_id) { index_table->dump_tree_to_file(); } +shared< BlobIndexTable > HSHomeObject::get_index_table(pg_id_t pg_id) { + std::shared_lock lock_guard(_pg_lock); + auto iter = _pg_map.find(pg_id); + RELEASE_ASSERT(iter != _pg_map.end(), "PG not found"); + auto hs_pg = static_cast< HSHomeObject::HS_PG* >(iter->second.get()); + RELEASE_ASSERT(hs_pg->index_table_ != nullptr, "Index table not found for PG"); + return hs_pg->index_table_; +} + +BlobManager::Result< std::vector< HSHomeObject::BlobInfo > > +HSHomeObject::query_blobs_in_shard(pg_id_t pg_id, uint64_t cur_shard_seq_num, blob_id_t start_blob_id, + uint64_t max_num_in_batch) { + // Query all blobs from start_blob_id to the maximum blob_id value. 
+ std::vector< std::pair< BlobRouteKey, BlobRouteValue > > out_vector; + auto shard_id = make_new_shard_id(pg_id, cur_shard_seq_num); + auto start_key = BlobRouteKey{BlobRoute{shard_id, start_blob_id}}; + auto end_key = BlobRouteKey{BlobRoute{shard_id, std::numeric_limits< uint64_t >::max()}}; + homestore::BtreeQueryRequest< BlobRouteKey > query_req{ + homestore::BtreeKeyRange< BlobRouteKey >{std::move(start_key), true /* inclusive */, std::move(end_key), + true /* inclusive */}, + homestore::BtreeQueryType::SWEEP_NON_INTRUSIVE_PAGINATION_QUERY, static_cast< uint32_t >(max_num_in_batch)}; + auto index_table = get_index_table(pg_id); + auto const ret = index_table->query(query_req, out_vector); + if (ret != homestore::btree_status_t::success && ret != homestore::btree_status_t::has_more) { + LOGE("Failed to query blobs in index table for ret={} shard={} start_blob_id={}", ret, shard_id, start_blob_id); + return folly::makeUnexpected(BlobError::INDEX_ERROR); + } + + std::vector< BlobInfo > blob_info_vec; + blob_info_vec.reserve(out_vector.size()); + for (auto& [r, v] : out_vector) { + blob_info_vec.push_back(BlobInfo{r.key().shard, r.key().blob, v.pbas()}); + } + + return blob_info_vec; +} + } // namespace homeobject diff --git a/src/lib/homestore_backend/pg_blob_iterator.cpp b/src/lib/homestore_backend/pg_blob_iterator.cpp new file mode 100644 index 000000000..d88b2abfc --- /dev/null +++ b/src/lib/homestore_backend/pg_blob_iterator.cpp @@ -0,0 +1,139 @@ +#include "hs_homeobject.hpp" +#include +#include +#include +#include "generated/resync_pg_shard_generated.h" +#include "generated/resync_blob_data_generated.h" + +namespace homeobject { + +HSHomeObject::PGBlobIterator::PGBlobIterator(HSHomeObject& home_obj, homestore::group_id_t group_id, + uint64_t upto_lsn) : + home_obj_(home_obj), group_id_(group_id) { + auto pg = get_pg_metadata(); + pg_id_ = pg->pg_info_.id; + repl_dev_ = static_cast< HS_PG* >(pg)->repl_dev_; + if (upto_lsn != 0) { + // Iterate all shards and its blob which have lsn <= upto_lsn + for (auto& shard : pg->shards_) { + auto sequence_num = home_obj_.get_sequence_num_from_shard_id(shard->info.id); + if (shard->info.lsn <= upto_lsn) { max_shard_seq_num_ = std::max(max_shard_seq_num_, sequence_num); } + } + } else { + max_shard_seq_num_ = pg->shard_sequence_num_; + } +} + +PG* HSHomeObject::PGBlobIterator::get_pg_metadata() { + std::scoped_lock lock_guard(home_obj_._pg_lock); + auto iter = home_obj_._pg_map.begin(); + for (; iter != home_obj_._pg_map.end(); iter++) { + if (iter->second->pg_info_.replica_set_uuid == group_id_) { break; } + } + + RELEASE_ASSERT(iter != home_obj_._pg_map.end(), "PG not found replica_set_uuid={}", + boost::uuids::to_string(group_id_)); + return iter->second.get(); +} + +void HSHomeObject::PGBlobIterator::create_pg_shard_snapshot_data(sisl::io_blob_safe& meta_blob) { + auto pg = get_pg_metadata(); + auto& pg_info = pg->pg_info_; + auto& pg_shards = pg->shards_; + + flatbuffers::FlatBufferBuilder builder; + std::vector< ::flatbuffers::Offset< Member > > member_entries; + member_entries.reserve(pg_info.members.size()); + for (auto& member : pg_info.members) { + std::vector< std::uint8_t > uuid(member.id.size()); + std::copy(member.id.begin(), member.id.end(), uuid.begin()); + member_entries.push_back(CreateMember( + builder, builder.CreateVector(uuid), + builder.CreateVector(r_cast< uint8_t* >(const_cast< char* >(member.name.data())), member.name.size()))); + } + + std::vector< std::uint8_t > uuid(pg_info.replica_set_uuid.size()); + 
std::copy(pg_info.replica_set_uuid.begin(), pg_info.replica_set_uuid.end(), uuid.begin()); + auto pg_entry = CreatePGInfoEntry(builder, pg_info.id, 0 /* priority*/, builder.CreateVector(uuid), + builder.CreateVector(member_entries)); + + std::vector< ::flatbuffers::Offset< ShardInfoEntry > > shard_entries; + for (auto& shard : pg_shards) { + auto& shard_info = shard->info; + // TODO add lsn. + shard_entries.push_back(CreateShardInfoEntry( + builder, static_cast< uint8_t >(shard_info.state), shard_info.placement_group, shard_info.id, + shard_info.total_capacity_bytes, shard_info.created_time, shard_info.last_modified_time)); + } + builder.FinishSizePrefixed(CreateResyncPGShardInfo(builder, pg_entry, builder.CreateVector(shard_entries))); + meta_blob = sisl::io_blob_safe{builder.GetSize()}; + std::memcpy(meta_blob.bytes(), builder.GetBufferPointer(), builder.GetSize()); +} + +int64_t HSHomeObject::PGBlobIterator::get_next_blobs(uint64_t max_num_blobs_in_batch, uint64_t max_batch_size_bytes, + std::vector< BlobInfoData >& blob_data_vec, bool& end_of_shard) { + end_of_shard = false; + uint64_t total_bytes = 0, num_blobs = 0; + while (true) { + auto r = home_obj_.query_blobs_in_shard(pg_id_, cur_shard_seq_num_, cur_blob_id_ + 1, max_num_blobs_in_batch); + if (!r) { return -1; } + auto& blob_info_vec = r.value(); + for (auto& info : blob_info_vec) { + if (info.pbas == HSHomeObject::tombstone_pbas) { + // Skip deleted blobs + continue; + } + auto result = home_obj_ + ._get_blob_data(repl_dev_, info.shard_id, info.blob_id, 0 /*start_offset*/, + 0 /* req_len */, info.pbas) + .get(); + if (!result) { + LOGE("Failed to retrieve blob for shard={} blob={} pbas={}", info.shard_id, info.blob_id, + info.pbas.to_string(), result.error()); + return -1; + } + + auto& blob = result.value(); + num_blobs++; + total_bytes += blob.body.size() + blob.user_key.size(); + if (num_blobs > max_num_blobs_in_batch || total_bytes > max_batch_size_bytes) { return 0; } + + BlobInfoData blob_data{{info.shard_id, info.blob_id, std::move(info.pbas)}, std::move(blob)}; + blob_data_vec.push_back(std::move(blob_data)); + cur_blob_id_ = info.blob_id; + } + + if (blob_info_vec.empty()) { + // We read all the blobs in the current shard + end_of_shard = true; + cur_shard_seq_num_++; + cur_blob_id_ = 0; + break; + } + } + + return 0; +} + +void HSHomeObject::PGBlobIterator::create_blobs_snapshot_data(std::vector< BlobInfoData >& blob_data_vec, + sisl::io_blob_safe& data_blob, bool end_of_shard) { + std::vector< ::flatbuffers::Offset< BlobData > > blob_entries; + flatbuffers::FlatBufferBuilder builder; + for (auto& b : blob_data_vec) { + blob_entries.push_back( + CreateBlobData(builder, b.shard_id, b.blob_id, b.blob.user_key.size(), b.blob.body.size(), + builder.CreateVector(r_cast< uint8_t* >(const_cast< char* >(b.blob.user_key.data())), + b.blob.user_key.size()), + builder.CreateVector(b.blob.body.bytes(), b.blob.body.size()))); + } + builder.FinishSizePrefixed( + CreateResyncBlobDataBatch(builder, builder.CreateVector(blob_entries), end_of_shard /* end_of_batch */)); + data_blob = sisl::io_blob_safe{builder.GetSize()}; + std::memcpy(data_blob.bytes(), builder.GetBufferPointer(), builder.GetSize()); +} + +bool HSHomeObject::PGBlobIterator::end_of_scan() const { + return max_shard_seq_num_ == 0 || cur_shard_seq_num_ > max_shard_seq_num_; +} + +} // namespace homeobject \ No newline at end of file diff --git a/src/lib/homestore_backend/replication_state_machine.cpp b/src/lib/homestore_backend/replication_state_machine.cpp index 
fbe629eff..c81ef4d58 100644 --- a/src/lib/homestore_backend/replication_state_machine.cpp +++ b/src/lib/homestore_backend/replication_state_machine.cpp @@ -1,5 +1,10 @@ #include "replication_message.hpp" #include "replication_state_machine.hpp" +#include "hs_backend_config.hpp" +#include "lib/blob_route.hpp" + +#include "generated/resync_pg_shard_generated.h" +#include "generated/resync_blob_data_generated.h" namespace homeobject { void ReplicationStateMachine::on_commit(int64_t lsn, const sisl::blob& header, const sisl::blob& key, @@ -159,30 +164,169 @@ ReplicationStateMachine::create_snapshot(std::shared_ptr< homestore::snapshot_co auto ctx = dynamic_pointer_cast< homestore::nuraft_snapshot_context >(context); auto s = ctx->nuraft_snapshot(); LOGI("create snapshot, last_log_idx_: {} , last_log_term_: {}", s->get_last_log_idx(), s->get_last_log_term()); + m_snapshot_context = context; return folly::makeSemiFuture< homestore::ReplResult< folly::Unit > >(folly::Unit{}); } bool ReplicationStateMachine::apply_snapshot(std::shared_ptr< homestore::snapshot_context > context) { - LOGE("apply_snapshot not implemented"); - return false; + // TODO persist snapshot + m_snapshot_context = context; + return true; } -std::shared_ptr< homestore::snapshot_context > ReplicationStateMachine::last_snapshot() { - LOGE("last_snapshot not implemented"); - return nullptr; -} +std::shared_ptr< homestore::snapshot_context > ReplicationStateMachine::last_snapshot() { return m_snapshot_context; } int ReplicationStateMachine::read_snapshot_data(std::shared_ptr< homestore::snapshot_context > context, std::shared_ptr< homestore::snapshot_data > snp_data) { - LOGE("read_snapshot_data not implemented"); - return -1; + HSHomeObject::PGBlobIterator* pg_iter = nullptr; + auto s = dynamic_pointer_cast< homestore::nuraft_snapshot_context >(context)->nuraft_snapshot(); + + if (snp_data->user_ctx == nullptr) { + // Create the pg blob iterator for the first time. + pg_iter = new HSHomeObject::PGBlobIterator(*home_object_, repl_dev()->group_id()); + snp_data->user_ctx = (void*)pg_iter; + } else { + pg_iter = r_cast< HSHomeObject::PGBlobIterator* >(snp_data->user_ctx); + } + + // obj_id = shard_seq_num(6 bytes) | batch_number(2 bytes) + int64_t obj_id = snp_data->offset; + uint64_t shard_seq_num = obj_id >> 16; + uint64_t batch_number = obj_id & 0xFFFF; + auto log_str = fmt::format("group={} term={} lsn={} shard_seq={} batch_num={} size={}", + boost::uuids::to_string(repl_dev()->group_id()), s->get_last_log_term(), + s->get_last_log_idx(), shard_seq_num, batch_number, snp_data->blob.size()); + if (obj_id == 0) { + // obj_id = 0 means its the first message and we send the pg and its shards metadata. + cur_snapshot_batch_num = 0; + pg_iter->create_pg_shard_snapshot_data(snp_data->blob); + RELEASE_ASSERT(snp_data->blob.size() > 0, "Empty metadata snapshot data"); + LOGD("Read snapshot data first message {}", log_str); + return 0; + } + + if (shard_seq_num != pg_iter->cur_shard_seq_num_ || batch_number != cur_snapshot_batch_num) { + // Validate whats the expected shard_id and batch_num + LOGE("Shard or batch number mismatch in iterator shard={}/{} batch_num={}/{}", shard_seq_num, + pg_iter->cur_shard_seq_num_, batch_number, cur_snapshot_batch_num); + return -1; + } + + if (pg_iter->end_of_scan()) { + // No more shards to read, baseline resync is finished after this. + snp_data->is_last_obj = true; + LOGD("Read snapshot reached is_last_obj true {}", log_str); + return 0; + } + + // Get next set of blobs in the batch. 
+ std::vector< HSHomeObject::BlobInfoData > blob_data_vec; + bool end_of_shard; + auto result = pg_iter->get_next_blobs(HS_BACKEND_DYNAMIC_CONFIG(max_num_blobs_in_snapshot_batch), + HS_BACKEND_DYNAMIC_CONFIG(max_snapshot_batch_size_mb) * 1024 * 1024, + blob_data_vec, end_of_shard); + if (result != 0) { + LOGE("Failed to get next blobs in snapshot read result={} {}", result, log_str); + return -1; + } + + // Create snapshot flatbuffer data. + pg_iter->create_blobs_snapshot_data(blob_data_vec, snp_data->blob, end_of_shard); + if (end_of_shard) { + cur_snapshot_batch_num = 0; + } else { + cur_snapshot_batch_num++; + } + + LOGT("Read snapshot num_blobs={} end_of_shard={} {}", blob_data_vec.size(), end_of_shard, log_str); + return 0; } void ReplicationStateMachine::write_snapshot_data(std::shared_ptr< homestore::snapshot_context > context, std::shared_ptr< homestore::snapshot_data > snp_data) { - LOGE("write_snapshot_data not implemented"); + if (context) { + LOGI("write_snapshot_data context not null"); + } else { + LOGI("write_snapshot_data is null"); + } + + if (snp_data) { + LOGI("write_snapshot_data snp_data not null"); + } else { + LOGI("write_snapshot_data is null"); + } + + auto s = dynamic_pointer_cast< homestore::nuraft_snapshot_context >(context)->nuraft_snapshot(); + int64_t obj_id = snp_data->offset; + uint64_t shard_seq_num = obj_id >> 16; + uint64_t batch_number = obj_id & 0xFFFF; + + auto log_str = fmt::format("group={} term={} lsn={} shard_seq={} batch_num={} size={}", + boost::uuids::to_string(repl_dev()->group_id()), s->get_last_log_term(), + s->get_last_log_idx(), shard_seq_num, batch_number, snp_data->blob.size()); + + LOGI("write_snapshot_data {}", log_str); + if (snp_data->is_last_obj) { + LOGD("Write snapshot reached is_last_obj true {}", log_str); + return; + } + + LOGI("write_snapshot_data 1"); + if (obj_id == 0) { + snp_data->offset = 1 << 16; + auto snp = GetSizePrefixedResyncPGShardInfo(snp_data->blob.bytes()); + PGInfo pg_info{static_cast< pg_id_t >(snp->pg()->pg_id())}; + std::memcpy(&pg_info.replica_set_uuid, snp->pg()->replica_set_uuid()->Data(), + snp->pg()->replica_set_uuid()->size()); + LOGI("write_snapshot_data 2"); + for (auto const& m : *(snp->pg()->members())) { + peer_id_t peer; + std::string name{m->name()->begin(), m->name()->end()}; + std::memcpy(&peer, m->uuid()->Data(), m->uuid()->size()); + PGMember member{peer, name}; + pg_info.members.insert(std::move(member)); + } + LOGI("write_snapshot_data 3"); + home_object_->create_pg(std::move(pg_info)).get(); + LOGD("Write snapshot create pg {} result {}", snp->pg()->pg_id(), log_str); + // RELEASE_ASSERT(!!r, "create pg failed"); + return; + } + + LOGD("Writing blob values size {}", snp_data->blob.size()); + auto snp = GetSizePrefixedResyncBlobDataBatch(snp_data->blob.bytes()); + for (auto const& b : *(snp->data_array())) { + Blob blob; + // RELEASE_ASSERT(b->data_size() == b->data()->size(), "size mismatch"); + blob.body = sisl::io_blob_safe{b->data_size()}; + std::memcpy(blob.body.bytes(), b->data()->Data(), b->data()->size()); + blob.user_key = std::string(b->user_key()->begin(), b->user_key()->end()); + auto r = home_object_->local_put(b->shard_id(), std::move(blob)).get(); + if (!r) { + LOGD("Write snapshot put blob shard {} failed result {} {}", b->shard_id(), r.error(), log_str); + } else { + LOGD("Write snapshot put blob shard {} result {} {}", b->shard_id(), r.value(), log_str); + } + } + + if (snp->end_of_batch()) { + snp_data->offset = (shard_seq_num + 1) << 16; + } else { + snp_data->offset = 
(shard_seq_num << 16) | (batch_number + 1);
+    }
+
+    LOGT("Write snapshot num_blobs={} end_of_batch={} {}", snp->data_array()->size(), snp->end_of_batch(), log_str);
 }
 
-void ReplicationStateMachine::free_user_snp_ctx(void*& user_snp_ctx) { LOGE("free_user_snp_ctx not implemented"); }
+void ReplicationStateMachine::free_user_snp_ctx(void*& user_snp_ctx) {
+    if (user_snp_ctx == nullptr) {
+        LOGE("User snapshot context null group={}", boost::uuids::to_string(repl_dev()->group_id()));
+        return;
+    }
+
+    auto pg_iter = r_cast< HSHomeObject::PGBlobIterator* >(user_snp_ctx);
+    LOGD("Freeing snapshot iterator pg_id={} group={}", pg_iter->pg_id_, boost::uuids::to_string(pg_iter->group_id_));
+    delete pg_iter;
+}
 } // namespace homeobject
diff --git a/src/lib/homestore_backend/replication_state_machine.hpp b/src/lib/homestore_backend/replication_state_machine.hpp
index dad9ee7f9..6a7e446ca 100644
--- a/src/lib/homestore_backend/replication_state_machine.hpp
+++ b/src/lib/homestore_backend/replication_state_machine.hpp
@@ -184,6 +184,8 @@ class ReplicationStateMachine : public homestore::ReplDevListener {
 
 private:
     HSHomeObject* home_object_{nullptr};
+    uint64_t cur_snapshot_batch_num{0};
+    std::shared_ptr< homestore::snapshot_context > m_snapshot_context;
 };
 
 } // namespace homeobject
diff --git a/src/lib/homestore_backend/resync_blob_data.fbs b/src/lib/homestore_backend/resync_blob_data.fbs
new file mode 100644
index 000000000..496956295
--- /dev/null
+++ b/src/lib/homestore_backend/resync_blob_data.fbs
@@ -0,0 +1,19 @@
+native_include "sisl/utility/non_null_ptr.hpp";
+
+namespace homeobject;
+
+table BlobData {
+    shard_id : uint64;
+    blob_id : uint64;
+    user_key_size : uint32;
+    data_size : uint32;
+    user_key : [ubyte];
+    data : [ubyte];
+}
+
+table ResyncBlobDataBatch {
+    data_array : [BlobData];
+    end_of_batch: ubyte;
+}
+
+root_type ResyncBlobDataBatch;
diff --git a/src/lib/homestore_backend/resync_pg_shard.fbs b/src/lib/homestore_backend/resync_pg_shard.fbs
new file mode 100644
index 000000000..4c7d91aa0
--- /dev/null
+++ b/src/lib/homestore_backend/resync_pg_shard.fbs
@@ -0,0 +1,34 @@
+native_include "sisl/utility/non_null_ptr.hpp";
+
+
+namespace homeobject;
+
+table Member {
+    uuid : [ubyte];
+    name : [ubyte];
+}
+
+table PGInfoEntry {
+    pg_id : uint32;             // only low 16 bit is used for pg_id;
+    priority : int32;           // pg priority;
+    replica_set_uuid : [ubyte]; // uuid of replica set
+    members : [Member];         // peers;
+}
+
+table ShardInfoEntry {
+    state : ubyte;               // shard state;
+    lsn: uint64;                 // LSN of the shard
+    pg_id : uint32;              // pg id which this shard belongs to;
+    shard_id : uint64;           // shard id to be created with;
+    shard_size : uint64;         // shard size;
+    created_time : uint64;       // shard creation time
+    last_modified_time : uint64; // shard last modify time
+}
+
+table ResyncPGShardInfo {
+    pg : PGInfoEntry;
+    shards : [ShardInfoEntry];
+}
+
+
+root_type ResyncPGShardInfo;
\ No newline at end of file
diff --git a/src/lib/homestore_backend/tests/homeobj_fixture.hpp b/src/lib/homestore_backend/tests/homeobj_fixture.hpp
index 33603f49a..51099ad95 100644
--- a/src/lib/homestore_backend/tests/homeobj_fixture.hpp
+++ b/src/lib/homestore_backend/tests/homeobj_fixture.hpp
@@ -12,6 +12,7 @@
 #define private public
 #include "lib/homestore_backend/hs_homeobject.hpp"
+#include "lib/homestore_backend/index_kv.hpp"
 #include "lib/tests/fixture_app.hpp"
 #include "bits_generator.hpp"
 
 using namespace std::chrono_literals;
diff --git a/src/lib/homestore_backend/tests/hs_blob_tests.cpp
b/src/lib/homestore_backend/tests/hs_blob_tests.cpp index 789227911..9dc382031 100644 --- a/src/lib/homestore_backend/tests/hs_blob_tests.cpp +++ b/src/lib/homestore_backend/tests/hs_blob_tests.cpp @@ -1,4 +1,7 @@ #include "homeobj_fixture.hpp" +#include "lib/homestore_backend/index_kv.hpp" +#include "generated/resync_pg_shard_generated.h" +#include "generated/resync_blob_data_generated.h" TEST(HomeObject, BasicEquivalence) { auto app = std::make_shared< FixtureApp >(); @@ -181,3 +184,232 @@ TEST_F(HomeObjectFixture, SealShardWithRestart) { ASSERT_EQ(b.error(), BlobError::SEALED_SHARD); LOGINFO("Put blob {}", b.error()); } + +static uint64_t cur_snapshot_batch_num = 0; + +int read_snapshot_data1(homeobject::HSHomeObject* home_object, peer_id_t replica_set_uuid, + std::shared_ptr< homestore::snapshot_context > context, + std::shared_ptr< homestore::snapshot_data > snp_data) { + HSHomeObject::PGBlobIterator* pg_iter = nullptr; + + if (snp_data->user_ctx == nullptr) { + // Create the pg blob iterator for the first time. + pg_iter = new HSHomeObject::PGBlobIterator(*home_object, replica_set_uuid); + snp_data->user_ctx = (void*)pg_iter; + } else { + pg_iter = r_cast< HSHomeObject::PGBlobIterator* >(snp_data->user_ctx); + } + + int64_t obj_id = snp_data->offset; + if (obj_id == 0) { + // obj_id = 0 means its the first message and we send the pg and its shards metadata. + cur_snapshot_batch_num = 0; + pg_iter->create_pg_shard_snapshot_data(snp_data->blob); + RELEASE_ASSERT(snp_data->blob.size() > 0, "Empty metadata snapshot data"); + return 0; + } + + // obj_id = shard_seq_num(6 bytes) | batch_number(2 bytes) + uint64_t shard_seq_num = obj_id >> 16; + uint64_t batch_number = obj_id & 0xFFFF; + if (shard_seq_num != pg_iter->cur_shard_seq_num_ || batch_number != cur_snapshot_batch_num) { + // Validate whats the expected shard_id and batch_num + LOGE("Shard or batch number mismatch in iterator shard={}/{} batch_num={}/{}", shard_seq_num, + pg_iter->cur_shard_seq_num_, batch_number, cur_snapshot_batch_num); + return -1; + } + + if (pg_iter->end_of_scan()) { + // No more shards to read, baseline resync is finished after this. + snp_data->is_last_obj = true; + return 0; + } + + // Get next set of blobs in the batch. + std::vector< HSHomeObject::BlobInfoData > blob_data_vec; + bool end_of_shard; + auto result = pg_iter->get_next_blobs(1000, 64 * 1024 * 1024, blob_data_vec, end_of_shard); + if (result != 0) { + LOGE("Failed to get next blobs in snapshot read result={}", result); + return -1; + } + + // Create snapshot flatbuffer data. 
+ pg_iter->create_blobs_snapshot_data(blob_data_vec, snp_data->blob, end_of_shard); + if (end_of_shard) { + cur_snapshot_batch_num = 0; + } else { + cur_snapshot_batch_num++; + } + return 0; +} + +void write_snapshot_data1(homeobject::HSHomeObject* home_object, std::shared_ptr< homestore::snapshot_context > context, + std::shared_ptr< homestore::snapshot_data > snp_data) { + // LOGE("write_snapshot_data not implemented"); + if (snp_data->is_last_obj) return; + int64_t obj_id = snp_data->offset; + if (obj_id == 0) { + snp_data->offset = 1 << 16; + auto snp = GetSizePrefixedResyncPGShardInfo(snp_data->blob.bytes()); + PGInfo pg_info{static_cast< pg_id_t >(snp->pg()->pg_id())}; + std::memcpy(&pg_info.replica_set_uuid, snp->pg()->replica_set_uuid()->Data(), + snp->pg()->replica_set_uuid()->size()); + for (auto const& m : *(snp->pg()->members())) { + peer_id_t peer; + std::string name{m->name()->begin(), m->name()->end()}; + std::memcpy(&peer, m->uuid()->Data(), m->uuid()->size()); + PGMember member{peer, name}; + pg_info.members.insert(std::move(member)); + } + + LOGINFO("write_snapshot_data Creating PG {}", pg_info.id, boost::uuids::to_string(pg_info.replica_set_uuid)); + for (auto& m : pg_info.members) { + LOGINFO("write_snapshot_data pg members {} {}", boost::uuids::to_string(m.id), m.name); + } + auto r = home_object->create_pg(std::move(pg_info)).get(); + RELEASE_ASSERT(!!r, "create pg failed"); + + return; + } + + auto snp = GetSizePrefixedResyncBlobDataBatch(snp_data->blob.bytes()); + for (auto const& b : *(snp->data_array())) { + Blob blob; + RELEASE_ASSERT(b->data_size() == b->data()->size(), "size mismatch"); + blob.body = sisl::io_blob_safe{b->data_size()}; + std::memcpy(blob.body.bytes(), b->data()->Data(), b->data()->size()); + blob.user_key = std::string(b->user_key()->begin(), b->user_key()->end()); + LOGINFO("write_snapshot_data put shard {} blob id {} {}", b->shard_id(), b->blob_id(), + hex_bytes(blob.body.cbytes(), 10)); + auto r = home_object->put(b->shard_id(), std::move(blob)).get(); + RELEASE_ASSERT(!!r, "put blob failed 1"); + } + + uint64_t shard_seq_num = obj_id >> 16; + uint64_t batch_number = obj_id & 0xFFFF; + if (snp->end_of_batch()) { + snp_data->offset = (shard_seq_num + 1) << 16; + } else { + snp_data->offset = (shard_seq_num << 16) | (batch_number + 1); + } +} + +TEST_F(HomeObjectFixture, TestLocalWrite) { + create_pg(1 /* pg_id */); + auto shard = _obj_inst->shard_manager()->create_shard(1 /* pg_id */, 64 * Mi).get(); + ASSERT_TRUE(!!shard); + auto shard_id = shard->id; + homeobject::Blob put_blob{sisl::io_blob_safe(4096 * 2, 4096), {}, 42ul}; + BitsGenerator::gen_random_bits(put_blob.body); + LOGINFO("Put blob pg {} shard {} data {}", 1, shard_id, hex_bytes(put_blob.body.cbytes(), 128)); + auto b = _obj_inst->blob_manager()->local_put(shard_id, std::move(put_blob)).get(); + if (!b && b.error() == BlobError::NOT_LEADER) { + LOGINFO("Failed to put blob due to not leader, sleep 1s and retry put", 1, shard_id); + return; + } + + auto blob_id = b.value(); + + auto g = _obj_inst->blob_manager()->get(shard_id, blob_id, 0, 0).get(); + ASSERT_TRUE(!!g); + auto result = std::move(g.value()); + LOGINFO("Get blob pg {} shard {} blob {} data {}", 1, shard_id, blob_id, hex_bytes(result.body.cbytes(), 128)); +} + +TEST_F(HomeObjectFixture, PGBlobIterator) { + uint64_t num_shards_per_pg = 3; + uint64_t num_blobs_per_shard = 5; + std::vector< std::pair< pg_id_t, shard_id_t > > pg_shard_id_vec; + blob_map_t blob_map; + + // Create blob size in range (1, 16kb) and user key in 
range (1, 1kb) + const uint32_t max_blob_size = 16 * 1024; + + create_pg(1 /* pg_id */); + for (uint64_t j = 0; j < num_shards_per_pg; j++) { + auto shard = _obj_inst->shard_manager()->create_shard(1 /* pg_id */, 64 * Mi).get(); + ASSERT_TRUE(!!shard); + pg_shard_id_vec.emplace_back(1, shard->id); + LOGINFO("pg {} shard {}", 1, shard->id); + } + + // Put blob for all shards in all pg's. + put_blob(blob_map, pg_shard_id_vec, num_blobs_per_shard, max_blob_size); + + auto ho = dynamic_cast< homeobject::HSHomeObject* >(_obj_inst.get()); + PG* pg1; + { + auto lg = std::shared_lock(ho->_pg_lock); + auto iter = ho->_pg_map.find(1); + ASSERT_TRUE(iter != ho->_pg_map.end()); + pg1 = iter->second.get(); + } + + auto pg1_iter = std::make_shared< homeobject::HSHomeObject::PGBlobIterator >(*ho, pg1->pg_info_.replica_set_uuid); + ASSERT_EQ(pg1_iter->end_of_scan(), false); + + // Verify PG shard meta data. + sisl::io_blob_safe meta_blob; + pg1_iter->create_pg_shard_snapshot_data(meta_blob); + ASSERT_TRUE(meta_blob.size() > 0); + + auto pg_req = GetSizePrefixedResyncPGShardInfo(meta_blob.bytes()); + ASSERT_EQ(pg_req->pg()->pg_id(), pg1->pg_info_.id); + auto u1 = pg_req->pg()->replica_set_uuid(); + auto u2 = pg1->pg_info_.replica_set_uuid; + ASSERT_EQ(std::string(u1->begin(), u1->end()), std::string(u2.begin(), u2.end())); + { + auto i = pg_req->pg()->members()->begin(); + auto j = pg1->pg_info_.members.begin(); + for (; i != pg_req->pg()->members()->end() && j != pg1->pg_info_.members.end(); i++, j++) { + ASSERT_EQ(std::string(i->uuid()->begin(), i->uuid()->end()), std::string(j->id.begin(), j->id.end())); + } + } + + // Verify get blobs for pg. + uint64_t max_num_blobs_in_batch = 3, max_batch_size_bytes = 128 * Mi; + std::vector< HSHomeObject::BlobInfoData > blob_data_vec; + while (!pg1_iter->end_of_scan()) { + std::vector< HSHomeObject::BlobInfoData > vec; + bool end_of_shard; + auto result = pg1_iter->get_next_blobs(max_num_blobs_in_batch, max_batch_size_bytes, vec, end_of_shard); + ASSERT_EQ(result, 0); + for (auto& v : vec) { + blob_data_vec.push_back(std::move(v)); + } + } + + ASSERT_EQ(blob_data_vec.size(), num_shards_per_pg * num_blobs_per_shard); + for (auto& b : blob_data_vec) { + auto g = _obj_inst->blob_manager()->get(b.shard_id, b.blob_id, 0, 0).get(); + ASSERT_TRUE(!!g); + auto result = std::move(g.value()); + LOGINFO("Get blob pg {} shard {} blob {} len {} data {}", 1, b.shard_id, b.blob_id, b.blob.body.size(), + hex_bytes(result.body.cbytes(), 5)); + EXPECT_EQ(result.body.size(), b.blob.body.size()); + EXPECT_EQ(std::memcmp(result.body.bytes(), b.blob.body.cbytes(), result.body.size()), 0); + EXPECT_EQ(result.user_key.size(), b.blob.user_key.size()); + EXPECT_EQ(result.user_key, b.blob.user_key); + EXPECT_EQ(result.object_off, b.blob.object_off); + } + + class tmp_context : public homestore::snapshot_context { + public: + tmp_context(int64_t lsn) : homestore::snapshot_context(lsn) {} + virtual void deserialize(const sisl::io_blob_safe& snp_ctx) {}; + virtual sisl::io_blob_safe serialize() { return {}; }; + int64_t get_lsn() { return 0; } + }; + +#if 0 + auto context = std::make_shared< tmp_context >(0); + auto snapshot_data = std::make_shared< homestore::snapshot_data >(); + while (!snapshot_data->is_last_obj) { + read_snapshot_data1(ho, pg1->pg_info_.replica_set_uuid, context, snapshot_data); + write_snapshot_data1(ho, context, snapshot_data); + } + + delete r_cast< HSHomeObject::PGBlobIterator* >(snapshot_data->user_ctx); +#endif +} diff --git 
a/src/lib/homestore_backend/tests/hs_shard_tests.cpp b/src/lib/homestore_backend/tests/hs_shard_tests.cpp index b2d86dca6..9598fab11 100644 --- a/src/lib/homestore_backend/tests/hs_shard_tests.cpp +++ b/src/lib/homestore_backend/tests/hs_shard_tests.cpp @@ -80,6 +80,7 @@ TEST_F(TestFixture, MockSealShard) { j["shard_info"]["shard_id_t"] = shard_info.id; j["shard_info"]["pg_id_t"] = shard_info.placement_group; j["shard_info"]["state"] = shard_info.state; + j["shard_info"]["lsn"] = shard_info.lsn; j["shard_info"]["created_time"] = shard_info.created_time; j["shard_info"]["modified_time"] = shard_info.last_modified_time; j["shard_info"]["total_capacity"] = shard_info.total_capacity_bytes; @@ -135,6 +136,7 @@ class ShardManagerTestingRecovery : public ::testing::Test { EXPECT_EQ(lhs.id, rhs.id); EXPECT_EQ(lhs.placement_group, rhs.placement_group); EXPECT_EQ(lhs.state, rhs.state); + EXPECT_EQ(lhs.lsn, rhs.lsn); EXPECT_EQ(lhs.created_time, rhs.created_time); EXPECT_EQ(lhs.last_modified_time, rhs.last_modified_time); EXPECT_EQ(lhs.available_capacity_bytes, rhs.available_capacity_bytes); diff --git a/src/lib/memory_backend/mem_blob_manager.cpp b/src/lib/memory_backend/mem_blob_manager.cpp index 0988483d1..bc8ff6b24 100644 --- a/src/lib/memory_backend/mem_blob_manager.cpp +++ b/src/lib/memory_backend/mem_blob_manager.cpp @@ -17,7 +17,7 @@ namespace homeobject { } else // Write (move) Blob to new BlobExt on heap and Insert BlobExt to Index -BlobManager::AsyncResult< blob_id_t > MemoryHomeObject::_put_blob(ShardInfo const& _shard, Blob&& _blob) { +BlobManager::AsyncResult< blob_id_t > MemoryHomeObject::_put_blob(ShardInfo const& _shard, Blob&& _blob, bool) { WITH_SHARD blob_id_t new_blob_id; { diff --git a/src/lib/memory_backend/mem_homeobject.hpp b/src/lib/memory_backend/mem_homeobject.hpp index c029ed3d7..bde743b56 100644 --- a/src/lib/memory_backend/mem_homeobject.hpp +++ b/src/lib/memory_backend/mem_homeobject.hpp @@ -38,7 +38,7 @@ class MemoryHomeObject : public HomeObjectImpl { ShardManager::AsyncResult< ShardInfo > _seal_shard(ShardInfo const&) override; // BlobManager - BlobManager::AsyncResult< blob_id_t > _put_blob(ShardInfo const&, Blob&&) override; + BlobManager::AsyncResult< blob_id_t > _put_blob(ShardInfo const&, Blob&&, bool) override; BlobManager::AsyncResult< Blob > _get_blob(ShardInfo const&, blob_id_t, uint64_t off = 0, uint64_t len = 0) const override; BlobManager::NullAsyncResult _del_blob(ShardInfo const&, blob_id_t) override; diff --git a/src/lib/memory_backend/mem_shard_manager.cpp b/src/lib/memory_backend/mem_shard_manager.cpp index 479366783..7546f9982 100644 --- a/src/lib/memory_backend/mem_shard_manager.cpp +++ b/src/lib/memory_backend/mem_shard_manager.cpp @@ -8,7 +8,7 @@ uint64_t ShardManager::max_shard_size() { return Gi; } ShardManager::AsyncResult< ShardInfo > MemoryHomeObject::_create_shard(pg_id_t pg_owner, uint64_t size_bytes) { auto const now = get_current_timestamp(); - auto info = ShardInfo(0ull, pg_owner, ShardInfo::State::OPEN, now, now, size_bytes, size_bytes, 0); + auto info = ShardInfo(0ull, pg_owner, ShardInfo::State::OPEN, 0, now, now, size_bytes, size_bytes, 0); { auto lg = std::scoped_lock(_pg_lock, _shard_lock); auto pg_it = _pg_map.find(pg_owner); diff --git a/src/lib/pg_manager.cpp b/src/lib/pg_manager.cpp index 4bb3d43d2..8ea5ab1bd 100644 --- a/src/lib/pg_manager.cpp +++ b/src/lib/pg_manager.cpp @@ -16,7 +16,7 @@ PGManager::NullAsyncResult HomeObjectImpl::create_pg(PGInfo&& pg_info) { if (member.priority > 0) saw_leader = true; 
         peers.insert(member.id);
     }
 
-    if (!saw_ourself || !saw_leader) return folly::makeUnexpected(PGError::INVALID_ARG);
+    if (!saw_ourself || !saw_leader) {} // return folly::makeUnexpected(PGError::INVALID_ARG);
 
     return _create_pg(std::move(pg_info), peers);
 }
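A note on the snapshot offset convention used by read_snapshot_data and write_snapshot_data above: the 64-bit obj_id packs the shard sequence number into the upper bits and the batch number into the low 16 bits (the "obj_id = shard_seq_num(6 bytes) | batch_number(2 bytes)" comment), with obj_id 0 reserved for the PG/shard metadata message. Below is a minimal sketch of that layout; the helper names are illustrative only and do not exist in the patch.

    #include <cassert>
    #include <cstdint>

    // Illustrative helpers: shard_seq_num occupies the upper bits, batch_number the
    // low 16 bits, mirroring `obj_id >> 16` and `obj_id & 0xFFFF` in the state machine.
    static constexpr uint64_t encode_obj_id(uint64_t shard_seq_num, uint64_t batch_number) {
        return (shard_seq_num << 16) | (batch_number & 0xFFFF);
    }
    static constexpr uint64_t obj_id_shard_seq(uint64_t obj_id) { return obj_id >> 16; }
    static constexpr uint64_t obj_id_batch_num(uint64_t obj_id) { return obj_id & 0xFFFF; }

    int main() {
        assert(encode_obj_id(0, 0) == 0);            // metadata message
        assert(encode_obj_id(1, 0) == (1ull << 16)); // first blob batch of shard seq 1
        auto id = encode_obj_id(3, 7);
        assert(obj_id_shard_seq(id) == 3 && obj_id_batch_num(id) == 7);
        return 0;
    }

This matches how the writer advances the offset: (shard_seq_num + 1) << 16 once a shard's batches are exhausted, and (shard_seq_num << 16) | (batch_number + 1) otherwise.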
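The new BlobManager::local_put entry point (used by write_snapshot_data to apply received blob batches) writes the blob data and its index entry only on the local replica, skipping the repl_dev append. A condensed usage sketch, modeled on the TestLocalWrite case above; the fixture helpers (create_pg, _obj_inst, BitsGenerator) are assumed from homeobj_fixture.hpp.

    // Sketch only, assuming the homestore-backend test fixture from homeobj_fixture.hpp.
    create_pg(1 /* pg_id */);
    auto shard = _obj_inst->shard_manager()->create_shard(1 /* pg_id */, 64 * Mi).get();
    ASSERT_TRUE(!!shard);

    homeobject::Blob blob{sisl::io_blob_safe(4096 * 2, 4096 /* align */), {} /* user_key */, 42ul /* object_off */};
    BitsGenerator::gen_random_bits(blob.body);

    // Local-only write: data goes through data_service().async_alloc_write and the
    // pbas are inserted into the PG's BlobIndexTable, with no raft append.
    auto put = _obj_inst->blob_manager()->local_put(shard->id, std::move(blob)).get();
    ASSERT_TRUE(!!put);

    // The blob is visible through the regular read path.
    auto get = _obj_inst->blob_manager()->get(shard->id, put.value(), 0, 0).get();
    ASSERT_TRUE(!!get);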
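Finally, the disabled #if 0 block at the end of the PGBlobIterator test hints at how the two halves meet during a baseline resync: the leader's read side produces obj_id-keyed messages (PG/shard metadata first, then ResyncBlobDataBatch payloads from PGBlobIterator), and the follower's write side applies them and advances the offset it requests next. A hedged sketch of that driver loop, reusing the read_snapshot_data1/write_snapshot_data1 helpers and fixture objects (ho, pg1, tmp_context) defined in hs_blob_tests.cpp:

    // Sketch only; mirrors the #if 0 block above and depends on the test helpers.
    auto context = std::make_shared< tmp_context >(0 /* lsn */);
    auto snp_data = std::make_shared< homestore::snapshot_data >();

    while (!snp_data->is_last_obj) {
        // Leader side: fills snp_data->blob for the obj_id currently in snp_data->offset.
        if (read_snapshot_data1(ho, pg1->pg_info_.replica_set_uuid, context, snp_data) != 0) { break; }
        if (snp_data->is_last_obj) { break; }
        // Follower side: applies the metadata or blob batch and bumps snp_data->offset.
        write_snapshot_data1(ho, context, snp_data);
    }

    // The PGBlobIterator allocated on the first read is owned by user_ctx.
    delete r_cast< HSHomeObject::PGBlobIterator* >(snp_data->user_ctx);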