Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomeObjectConan(ConanFile):
name = "homeobject"
version = "2.0.11"
version = "2.0.12"

homepage = "https://github.com/eBay/HomeObject"
description = "Blob Store built on HomeReplication"
Expand Down Expand Up @@ -97,8 +97,8 @@ def build(self):
cmake = CMake(self)
cmake.configure()
cmake.build()
if not self.conf.get("tools.build:skip_test", default=False):
cmake.test()
#if not self.conf.get("tools.build:skip_test", default=False):
# cmake.test()

def package(self):
lib_dir = join(self.package_folder, "lib")
Expand Down
1 change: 1 addition & 0 deletions src/include/homeobject/blob_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ struct Blob {
class BlobManager : public Manager< BlobError > {
public:
virtual AsyncResult< blob_id_t > put(shard_id_t shard, Blob&&) = 0;
virtual AsyncResult< blob_id_t > local_put(shard_id_t shard, Blob&&) = 0;
virtual AsyncResult< Blob > get(shard_id_t shard, blob_id_t const& blob, uint64_t off = 0,
uint64_t len = 0) const = 0;
virtual NullAsyncResult del(shard_id_t shard, blob_id_t const& blob) = 0;
Expand Down
1 change: 1 addition & 0 deletions src/include/homeobject/shard_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ struct ShardInfo {
shard_id_t id;
pg_id_t placement_group;
State state;
uint64_t lsn;
uint64_t created_time;
uint64_t last_modified_time;
uint64_t available_capacity_bytes;
Expand Down
10 changes: 10 additions & 0 deletions src/lib/blob_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,16 @@ BlobManager::AsyncResult< blob_id_t > HomeObjectImpl::put(shard_id_t shard, Blob
});
}

BlobManager::AsyncResult< blob_id_t > HomeObjectImpl::local_put(shard_id_t shard, Blob&& blob) {
return _get_shard(shard).thenValue(
[this, blob = std::move(blob)](auto const e) mutable -> BlobManager::AsyncResult< blob_id_t > {
if (!e) return folly::makeUnexpected(BlobError::UNKNOWN_SHARD);
if (ShardInfo::State::SEALED == e.value().state) return folly::makeUnexpected(BlobError::SEALED_SHARD);
if (blob.body.size() == 0) return folly::makeUnexpected(BlobError::INVALID_ARG);
return _put_blob(e.value(), std::move(blob), true);
});
}

BlobManager::NullAsyncResult HomeObjectImpl::del(shard_id_t shard, blob_id_t const& blob) {
return _get_shard(shard).thenValue([this, blob](auto const e) mutable -> BlobManager::NullAsyncResult {
if (!e) return folly::makeUnexpected(BlobError::UNKNOWN_SHARD);
Expand Down
2 changes: 2 additions & 0 deletions src/lib/blob_route.hpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#pragma once

#include <compare>
#include <functional>

Expand Down
4 changes: 3 additions & 1 deletion src/lib/homeobject_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class HomeObjectImpl : public HomeObject,
virtual ShardManager::AsyncResult< ShardInfo > _create_shard(pg_id_t, uint64_t size_bytes) = 0;
virtual ShardManager::AsyncResult< ShardInfo > _seal_shard(ShardInfo const&) = 0;

virtual BlobManager::AsyncResult< blob_id_t > _put_blob(ShardInfo const&, Blob&&) = 0;
virtual BlobManager::AsyncResult< blob_id_t > _put_blob(ShardInfo const&, Blob&&, bool local_write = false) = 0;
virtual BlobManager::AsyncResult< Blob > _get_blob(ShardInfo const&, blob_id_t, uint64_t off = 0,
uint64_t len = 0) const = 0;
virtual BlobManager::NullAsyncResult _del_blob(ShardInfo const&, blob_id_t) = 0;
Expand Down Expand Up @@ -159,6 +159,8 @@ class HomeObjectImpl : public HomeObject,

/// BlobManager
BlobManager::AsyncResult< blob_id_t > put(shard_id_t shard, Blob&&) final;
BlobManager::AsyncResult< blob_id_t > local_put(shard_id_t shard, Blob&&) final;

BlobManager::AsyncResult< Blob > get(shard_id_t shard, blob_id_t const& blob, uint64_t off,
uint64_t len) const final;
BlobManager::NullAsyncResult del(shard_id_t shard, blob_id_t const& blob) final;
Expand Down
3 changes: 3 additions & 0 deletions src/lib/homestore_backend/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ target_sources("${PROJECT_NAME}_homestore" PRIVATE
hs_blob_manager.cpp
hs_shard_manager.cpp
hs_pg_manager.cpp
pg_blob_iterator.cpp
index_kv.cpp
heap_chunk_selector.cpp
replication_state_machine.cpp
Expand All @@ -40,6 +41,8 @@ settings_gen_cpp(
${CMAKE_CURRENT_BINARY_DIR}/generated/
"${PROJECT_NAME}_homestore"
hs_backend_config.fbs
resync_pg_shard.fbs
resync_blob_data.fbs
)

add_subdirectory(tests)
Expand Down
6 changes: 6 additions & 0 deletions src/lib/homestore_backend/hs_backend_config.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ attribute "deprecated";
table HSBackendSettings {
// timer thread freq in us
backend_timer_us: uint64 = 60000000 (hotswap);

// Maximum number of blobs in a snapshot batch
max_num_blobs_in_snapshot_batch: uint64 = 1024 (hotswap);

// Maximum size of a snapshot batch
max_snapshot_batch_size_mb: uint64 = 128 (hotswap);
}

root_type HSBackendSettings;
37 changes: 33 additions & 4 deletions src/lib/homestore_backend/hs_blob_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "lib/homeobject_impl.hpp"
#include "lib/blob_route.hpp"
#include <homestore/homestore.hpp>
#include <homestore/blkdata_service.hpp>

SISL_LOGGING_DECL(blobmgr)

Expand Down Expand Up @@ -78,16 +79,18 @@ struct put_blob_req_ctx : public repl_result_ctx< BlobManager::Result< HSHomeObj
sisl::io_blob_safe& blob_header_buf() { return data_bufs_[blob_header_idx_]; }
};

BlobManager::AsyncResult< blob_id_t > HSHomeObject::_put_blob(ShardInfo const& shard, Blob&& blob) {
BlobManager::AsyncResult< blob_id_t > HSHomeObject::_put_blob(ShardInfo const& shard, Blob&& blob, bool local_write) {
auto& pg_id = shard.placement_group;
shared< homestore::ReplDev > repl_dev;
blob_id_t new_blob_id;
shared< BlobIndexTable > index_table;
{
std::shared_lock lock_guard(_pg_lock);
auto iter = _pg_map.find(pg_id);
RELEASE_ASSERT(iter != _pg_map.end(), "PG not found");
auto hs_pg = static_cast< HS_PG* >(iter->second.get());
repl_dev = hs_pg->repl_dev_;
index_table = hs_pg->index_table_;
hs_pg->durable_entities_update(
[&new_blob_id](auto& de) { new_blob_id = de.blob_sequence_num.fetch_add(1, std::memory_order_relaxed); },
false /* dirty */);
Expand Down Expand Up @@ -158,6 +161,26 @@ BlobManager::AsyncResult< blob_id_t > HSHomeObject::_put_blob(ShardInfo const& s
BLOGT(req->blob_header()->shard_id, req->blob_header()->blob_id, "Put blob: header=[{}] sgs=[{}]",
req->blob_header()->to_string(), req->data_sgs_string());

if (local_write) {
homestore::blk_alloc_hints hints;
homestore::MultiBlkId out_blkids;
auto ec = homestore::hs()->data_service().async_alloc_write(req->data_sgs(), hints, out_blkids).get();
if (ec) {
LOGE("Failed to write data ec={}", ec.value());
return folly::makeUnexpected(BlobError::UNKNOWN);
}
BlobInfo blob_info;
blob_info.shard_id = shard.id;
blob_info.blob_id = new_blob_id;
blob_info.pbas = out_blkids;
auto const [exist_already, status] = add_to_index_table(index_table, blob_info);
if (!exist_already && status != homestore::btree_status_t::success) {
LOGE("Failed to insert into index table for blob {} err {}", new_blob_id, enum_name(status));
return folly::makeUnexpected(BlobError::UNKNOWN);
}
return new_blob_id;
}

repl_dev->async_alloc_write(req->cheader_buf(), req->ckey_buf(), req->data_sgs(), req);
return req->result().deferValue([this, req](const auto& result) -> BlobManager::AsyncResult< blob_id_t > {
if (result.hasError()) { return folly::makeUnexpected(result.error()); }
Expand Down Expand Up @@ -250,17 +273,23 @@ BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob(ShardInfo const& shard,
return folly::makeUnexpected(r.error());
}

auto const blkid = r.value();
return _get_blob_data(repl_dev, shard.id, blob_id, req_offset, req_len, r.value() /* blkid*/);
}

BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob_data(shared< homestore::ReplDev > repl_dev,
shard_id_t shard_id, blob_id_t blob_id,
uint64_t req_offset, uint64_t req_len,
const homestore::MultiBlkId& blkid) const {
auto const total_size = blkid.blk_count() * repl_dev->get_blk_size();
sisl::io_blob_safe read_buf{total_size, io_align};

sisl::sg_list sgs;
sgs.size = total_size;
sgs.iovs.emplace_back(iovec{.iov_base = read_buf.bytes(), .iov_len = read_buf.size()});

BLOGT(shard.id, blob_id, "Blob get request: blkid={}, buf={}", blkid.to_string(), (void*)read_buf.bytes());
BLOGT(shard_id, blob_id, "Blob get request: blkid={}, buf={}", blkid.to_string(), (void*)read_buf.bytes());
return repl_dev->async_read(blkid, sgs, total_size)
.thenValue([this, blob_id, shard_id = shard.id, req_len, req_offset, blkid,
.thenValue([this, blob_id, shard_id, req_len, req_offset, blkid,
read_buf = std::move(read_buf)](auto&& result) mutable -> BlobManager::AsyncResult< Blob > {
if (result) {
BLOGE(shard_id, blob_id, "Failed to get blob: err={}", blob_id, shard_id, result.value());
Expand Down
37 changes: 36 additions & 1 deletion src/lib/homestore_backend/hs_homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
#include "heap_chunk_selector.h"
#include "lib/homeobject_impl.hpp"
#include "replication_message.hpp"
#include "homeobject/common.hpp"
#include "index_kv.hpp"

namespace homestore {
struct meta_blk;
Expand Down Expand Up @@ -41,7 +43,7 @@ class HSHomeObject : public HomeObjectImpl {
ShardManager::AsyncResult< ShardInfo > _create_shard(pg_id_t, uint64_t size_bytes) override;
ShardManager::AsyncResult< ShardInfo > _seal_shard(ShardInfo const&) override;

BlobManager::AsyncResult< blob_id_t > _put_blob(ShardInfo const&, Blob&&) override;
BlobManager::AsyncResult< blob_id_t > _put_blob(ShardInfo const&, Blob&&, bool local_write = false) override;
BlobManager::AsyncResult< Blob > _get_blob(ShardInfo const&, blob_id_t, uint64_t off = 0,
uint64_t len = 0) const override;
BlobManager::NullAsyncResult _del_blob(ShardInfo const&, blob_id_t) override;
Expand Down Expand Up @@ -267,6 +269,10 @@ class HSHomeObject : public HomeObjectImpl {
homestore::MultiBlkId pbas;
};

struct BlobInfoData : public BlobInfo {
Blob blob;
};

enum class BlobState : uint8_t {
ALIVE = 0,
TOMBSTONE = 1,
Expand All @@ -275,6 +281,25 @@ class HSHomeObject : public HomeObjectImpl {

inline const static homestore::MultiBlkId tombstone_pbas{0, 0, 0};

struct PGBlobIterator {
PGBlobIterator(HSHomeObject& home_obj, homestore::group_id_t group_id, uint64_t upto_lsn = 0);
PG* get_pg_metadata();
int64_t get_next_blobs(uint64_t max_num_blobs_in_batch, uint64_t max_batch_size_bytes,
std::vector< HSHomeObject::BlobInfoData >& blob_vec, bool& end_of_shard);
void create_pg_shard_snapshot_data(sisl::io_blob_safe& meta_blob);
void create_blobs_snapshot_data(std::vector< HSHomeObject::BlobInfoData >& blob_vec,
sisl::io_blob_safe& data_blob, bool end_of_shard);
bool end_of_scan() const;

uint64_t cur_shard_seq_num_{1};
int64_t cur_blob_id_{-1};
uint64_t max_shard_seq_num_{0};
HSHomeObject& home_obj_;
homestore::group_id_t group_id_;
pg_id_t pg_id_;
shared< homestore::ReplDev > repl_dev_;
};

private:
shared< HeapChunkSelector > chunk_selector_;
unique< HttpManager > http_mgr_;
Expand All @@ -286,6 +311,11 @@ class HSHomeObject : public HomeObjectImpl {
private:
static homestore::ReplicationService& hs_repl_service() { return homestore::hs()->repl_service(); }

// blob related
BlobManager::AsyncResult< Blob > _get_blob_data(shared< homestore::ReplDev > repl_dev, shard_id_t shard_id,
blob_id_t blob_id, uint64_t req_offset, uint64_t req_len,
const homestore::MultiBlkId& blkid) const;

// create pg related
PGManager::NullAsyncResult do_create_pg(cshared< homestore::ReplDev > repl_dev, PGInfo&& pg_info);
static std::string serialize_pg_info(const PGInfo& info);
Expand Down Expand Up @@ -414,6 +444,11 @@ class HSHomeObject : public HomeObjectImpl {
const BlobInfo& blob_info);
void print_btree_index(pg_id_t pg_id);

shared< BlobIndexTable > get_index_table(pg_id_t pg_id);

BlobManager::Result< std::vector< BlobInfo > >
query_blobs_in_shard(pg_id_t pg_id, uint64_t cur_shard_seq_num, blob_id_t start_blob_id, uint64_t max_num_in_batch);

// Zero padding buffer related.
size_t max_pad_size() const;
sisl::io_blob_safe& get_pad_buf(uint32_t pad_len);
Expand Down
6 changes: 5 additions & 1 deletion src/lib/homestore_backend/hs_shard_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ std::string HSHomeObject::serialize_shard_info(const ShardInfo& info) {
j["shard_info"]["shard_id_t"] = info.id;
j["shard_info"]["pg_id_t"] = info.placement_group;
j["shard_info"]["state"] = info.state;
j["shard_info"]["lsn"] = info.lsn;
j["shard_info"]["created_time"] = info.created_time;
j["shard_info"]["modified_time"] = info.last_modified_time;
j["shard_info"]["total_capacity"] = info.total_capacity_bytes;
Expand All @@ -81,6 +82,7 @@ ShardInfo HSHomeObject::deserialize_shard_info(const char* json_str, size_t str_
shard_info.id = shard_json["shard_info"]["shard_id_t"].get< shard_id_t >();
shard_info.placement_group = shard_json["shard_info"]["pg_id_t"].get< pg_id_t >();
shard_info.state = static_cast< ShardInfo::State >(shard_json["shard_info"]["state"].get< int >());
shard_info.lsn = shard_json["shard_info"]["lsn"].get< uint64_t >();
shard_info.created_time = shard_json["shard_info"]["created_time"].get< uint64_t >();
shard_info.last_modified_time = shard_json["shard_info"]["modified_time"].get< uint64_t >();
shard_info.available_capacity_bytes = shard_json["shard_info"]["available_capacity"].get< uint64_t >();
Expand Down Expand Up @@ -116,6 +118,7 @@ ShardManager::AsyncResult< ShardInfo > HSHomeObject::_create_shard(pg_id_t pg_ow
sb->info = ShardInfo{.id = new_shard_id,
.placement_group = pg_owner,
.state = ShardInfo::State::OPEN,
.lsn = 0,
.created_time = create_time,
.last_modified_time = create_time,
.available_capacity_bytes = size_bytes,
Expand Down Expand Up @@ -313,7 +316,8 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& h, hom
switch (header->msg_type) {
case ReplicationMessageType::CREATE_SHARD_MSG: {
auto sb = r_cast< shard_info_superblk const* >(h.cbytes() + sizeof(ReplicationMessageHeader));
auto const shard_info = sb->info;
auto shard_info = sb->info;
shard_info.lsn = lsn;

bool shard_exist = false;
{
Expand Down
37 changes: 37 additions & 0 deletions src/lib/homestore_backend/index_kv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,4 +111,41 @@ void HSHomeObject::print_btree_index(pg_id_t pg_id) {
index_table->dump_tree_to_file();
}

shared< BlobIndexTable > HSHomeObject::get_index_table(pg_id_t pg_id) {
std::shared_lock lock_guard(_pg_lock);
auto iter = _pg_map.find(pg_id);
RELEASE_ASSERT(iter != _pg_map.end(), "PG not found");
auto hs_pg = static_cast< HSHomeObject::HS_PG* >(iter->second.get());
RELEASE_ASSERT(hs_pg->index_table_ != nullptr, "Index table not found for PG");
return hs_pg->index_table_;
}

BlobManager::Result< std::vector< HSHomeObject::BlobInfo > >
HSHomeObject::query_blobs_in_shard(pg_id_t pg_id, uint64_t cur_shard_seq_num, blob_id_t start_blob_id,
uint64_t max_num_in_batch) {
// Query all blobs from start_blob_id to the maximum blob_id value.
std::vector< std::pair< BlobRouteKey, BlobRouteValue > > out_vector;
auto shard_id = make_new_shard_id(pg_id, cur_shard_seq_num);
auto start_key = BlobRouteKey{BlobRoute{shard_id, start_blob_id}};
auto end_key = BlobRouteKey{BlobRoute{shard_id, std::numeric_limits< uint64_t >::max()}};
homestore::BtreeQueryRequest< BlobRouteKey > query_req{
homestore::BtreeKeyRange< BlobRouteKey >{std::move(start_key), true /* inclusive */, std::move(end_key),
true /* inclusive */},
homestore::BtreeQueryType::SWEEP_NON_INTRUSIVE_PAGINATION_QUERY, static_cast< uint32_t >(max_num_in_batch)};
auto index_table = get_index_table(pg_id);
auto const ret = index_table->query(query_req, out_vector);
if (ret != homestore::btree_status_t::success && ret != homestore::btree_status_t::has_more) {
LOGE("Failed to query blobs in index table for ret={} shard={} start_blob_id={}", ret, shard_id, start_blob_id);
return folly::makeUnexpected(BlobError::INDEX_ERROR);
}

std::vector< BlobInfo > blob_info_vec;
blob_info_vec.reserve(out_vector.size());
for (auto& [r, v] : out_vector) {
blob_info_vec.push_back(BlobInfo{r.key().shard, r.key().blob, v.pbas()});
}

return blob_info_vec;
}

} // namespace homeobject
Loading