Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.17.4"
version = "6.17.5"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
29 changes: 16 additions & 13 deletions src/lib/replication/repl_dev/raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1423,13 +1423,6 @@ void RaftReplDev::handle_commit(repl_req_ptr_t rreq, bool recovery) {
m_listener->on_commit(rreq->lsn(), rreq->header(), rreq->key(), {rreq->local_blkid()}, rreq);
}

if (!recovery) {
auto prev_lsn = m_commit_upto_lsn.exchange(rreq->lsn());
RD_DBG_ASSERT_GT(rreq->lsn(), prev_lsn,
"Out of order commit of lsns, it is not expected in RaftReplDev. cur_lsns={}, prev_lsns={}",
rreq->lsn(), prev_lsn);
}

// Remove the request from repl_key map only after the listener operation is completed.
// This prevents unnecessary block allocation in the following scenario:
// 1. The follower processes a commit for LSN 100 and remove rreq from rep_key map before listener commit
Expand All @@ -1440,6 +1433,13 @@ void RaftReplDev::handle_commit(repl_req_ptr_t rreq, bool recovery) {
m_repl_key_req_map.erase(rreq->rkey());
// Remove the request from lsn map.
m_state_machine->unlink_lsn_to_req(rreq->lsn(), rreq);

if (!recovery) {
auto prev_lsn = m_commit_upto_lsn.exchange(rreq->lsn());
RD_DBG_ASSERT_GT(rreq->lsn(), prev_lsn,
"Out of order commit of lsns, it is not expected in RaftReplDev. cur_lsns={}, prev_lsns={}",
rreq->lsn(), prev_lsn);
}
if (!rreq->is_proposer()) rreq->clear();
}

Expand Down Expand Up @@ -1878,18 +1878,21 @@ nuraft::cb_func::ReturnCode RaftReplDev::raft_event(nuraft::cb_func::Type type,
m_commit_upto_lsn.load(), raft_req->get_commit_idx());

auto reqs = sisl::VectorPool< repl_req_ptr_t >::alloc();
auto last_commit_lsn = uint64_cast(get_last_commit_lsn());
auto local_start_lsn = uint64_cast(m_data_journal->start_index());
auto local_last_lsn = uint64_cast(m_data_journal->next_slot() - 1);
// set term=0 when there is no log entry, which will happen when SM first boot or after Baseline Re-sync
auto local_last_term = local_start_lsn > local_last_lsn ? 0 : m_data_journal->term_at(local_last_lsn);
for (unsigned long i = 0; i < entries.size(); i++) {
auto& entry = entries[i];
auto lsn = start_lsn + i;
auto term = entry->get_term();
if (entry->get_val_type() != nuraft::log_val_type::app_log) { continue; }
if (entry->get_buf_ptr()->size() == 0) { continue; }
// skipping localize for already committed log(dup), they anyway will be discard
// by nuraft before append_log.
if (lsn <= last_commit_lsn) {
RD_LOGT(NO_TRACE_ID, "Raft channel: term {}, lsn {}, skipping dup, last_commit_lsn {}", term, lsn,
last_commit_lsn);
// skipping localize for already appended log with the same term(dup), they anyway will be discard
// or rollback by nuraft before append_log, this way can avoid dup-append for committing logs
if (lsn <= local_last_lsn && term == local_last_term) {
RD_LOGT(NO_TRACE_ID, "Raft channel: term {}, lsn {}, skipping dup, local_last_lsn {}, local_last_term {}",
term, lsn, local_last_lsn, local_last_term);
continue;
}
// Those LSNs already in logstore but not yet committed, will be dedup here,
Expand Down
9 changes: 8 additions & 1 deletion src/tests/test_common/raft_repl_test_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,14 @@ class RaftReplDevTestBase : public testing::Test {
data_size == nullptr ? std::abs(std::lround(num_blks_gen(g_re))) * block_size : *data_size;
this->generate_writes(size, block_size, db);
});
if (wait_for_commit) { g_helper->runner().execute().get(); }
if (wait_for_commit) {
g_helper->runner().execute().get();
// wait for related rreqs being removed from map. this way to avoid rreqs reused in this case:
// 1. follower committing rreq
// 2. follower received a duplicated append log entries from leader, then get the rreq from map
// 3. follower finished commit, clear rreq, then the append thread hold an empty rreq.
std::this_thread::sleep_for(std::chrono::seconds{1});
}
break;
} else {
LOGINFO("{} entries were written on the leader_uuid={} my_uuid={}", num_entries,
Expand Down
Loading