Skip to content

Commit bb827cc

Browse files
committed
add pending signaled wrs checking
1 parent 0708333 commit bb827cc

File tree

2 files changed

+43
-4
lines changed

2 files changed

+43
-4
lines changed

src/brpc/rdma/rdma_endpoint.cpp

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
#include <gflags/gflags.h>
2121
#include "butil/fd_utility.h"
22+
#include "butil/fast_rand.h"
2223
#include "butil/logging.h" // CHECK, LOG
2324
#include "butil/sys_byteorder.h" // HostToNet,NetToHost
2425
#include "bthread/bthread.h"
@@ -191,6 +192,7 @@ RdmaEndpoint::RdmaEndpoint(Socket* s)
191192
, _remote_window_capacity(0)
192193
, _window_size(0)
193194
, _new_rq_wrs(0)
195+
, _pending_signaled_wrs(0)
194196
{
195197
if (_sq_size < MIN_QP_SIZE) {
196198
_sq_size = MIN_QP_SIZE;
@@ -875,6 +877,7 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) {
875877
// http::www.rdmamojo.com/2014/06/30/working-unsignaled-completions/
876878
wr.send_flags |= IBV_SEND_SIGNALED;
877879
_sq_unsignaled = 0;
880+
_pending_signaled_wrs.fetch_add(1, butil::memory_order_relaxed);
878881
}
879882

880883
ibv_send_wr* bad = NULL;
@@ -904,6 +907,35 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) {
904907
return total_len;
905908
}
906909

910+
struct WrIdGen {
911+
bool init;
912+
uint32_t seq;
913+
uint64_t current_random;
914+
butil::FastRandSeed seed;
915+
};
916+
917+
static __thread WrIdGen tls_wr_id_gen = { false, 0, 0, { { 0, 0 } } };
918+
919+
inline uint64_t UpdateTLSRandom64(IdGen* g) {
920+
if (!g->init) {
921+
g->init = true;
922+
init_fast_rand_seed(&g->seed);
923+
}
924+
const uint64_t val = fast_rand(&g->seed);
925+
g->current_random = val;
926+
return val;
927+
}
928+
929+
inline uint64_t GenerateWrId() {
930+
// 0 is an invalid Id
931+
IdGen* g = &tls_wr_id_gen;
932+
if (g->seq == 0) {
933+
UpdateTLSRandom64(g);
934+
g->seq = 1;
935+
}
936+
return (g->current_random & 0xFFFFFFFF00000000ULL) | g->seq++;
937+
}
938+
907939
int RdmaEndpoint::SendAck(int num) {
908940
if (_new_rq_wrs.fetch_add(num, butil::memory_order_relaxed) > _remote_window_capacity / 2) {
909941
return SendImm(_new_rq_wrs.exchange(0, butil::memory_order_relaxed));
@@ -918,6 +950,7 @@ int RdmaEndpoint::SendImm(uint32_t imm) {
918950

919951
ibv_send_wr wr;
920952
memset(&wr, 0, sizeof(wr));
953+
wr.wr_id = GenerateWrId();
921954
wr.opcode = IBV_WR_SEND_WITH_IMM;
922955
wr.imm_data = butil::HostToNet32(imm);
923956
wr.send_flags |= IBV_SEND_SOLICITED;
@@ -938,7 +971,9 @@ ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) {
938971
bool zerocopy = FLAGS_rdma_recv_zerocopy;
939972
switch (wc.opcode) {
940973
case IBV_WC_SEND: { // send completion
941-
// Do nothing
974+
if (wc.wr_id == 0) {
975+
_pending_signaled_wrs.fetch_sub(1, butil::memory_order_relaxed);
976+
}
942977
break;
943978
}
944979
case IBV_WC_RECV: { // recv completion
@@ -972,10 +1007,12 @@ ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) {
9721007

9731008
// Update window
9741009
uint32_t wnd_thresh = _local_window_capacity / 8;
975-
if (_window_size.fetch_add(acks, butil::memory_order_relaxed) >= wnd_thresh
976-
|| acks >= wnd_thresh) {
1010+
if (_pending_signaled_wrs.load(butil::memory_order_relaxed) <= 1 &&
1011+
(_window_size.fetch_add(acks, butil::memory_order_relaxed) >= wnd_thresh ||
1012+
acks >= wnd_thresh)) {
9771013
// Do not wake up writing thread right after _window_size > 0.
978-
// Otherwise the writing thread may switch to background too quickly.
1014+
// Otherwise the writing thread may switch to background too
1015+
// quickly.
9791016
_socket->WakeAsEpollOut();
9801017
}
9811018
}

src/brpc/rdma/rdma_endpoint.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,8 @@ friend class brpc::Socket;
261261
butil::atomic<uint16_t> _window_size;
262262
// The number of new WRs posted in the local Recv Queue
263263
butil::atomic<uint16_t> _new_rq_wrs;
264+
// The number of pending signaled send WRs
265+
butil::atomic<uint16_t> _pending_signaled_wrs;
264266

265267
// butex for inform read events on TCP fd during handshake
266268
butil::atomic<int> *_read_butex;

0 commit comments

Comments
 (0)