Description
Describe the bug
This error means that the sliding window says sending is allowed, but the RDMA SQ (Send Queue) has no free slot for a new request.
The sliding window represents how much more the sender may send. Currently, the window is updated when a recv CQE carrying imm data (i.e. carrying acks) is polled and processed.
The RDMA SQ is a ring buffer holding requests that have been posted but not yet completed; its pointer is advanced when a send CQE is polled.
Here we can see that the SQ and the sliding window are updated at different times.
Under normal logic, if a recv CQE with imm is being processed, the RDMA Sends previously issued by the sender must already have completed, so their send CQEs should have been generated before that recv CQE.
As illustrated below:
This is the ordering brpc expects: the SQ is advanced before the sliding window is updated.
But when this problem occurs, the actual sequence is:
The sender issues a large number of RDMA Sends, filling the SQ.
The sender then polls a recv CQE with imm (and at that moment there is necessarily no send CQE in the same batch; otherwise, since brpc polls 32 CQEs at a time, the SQ pointer would also be advanced), and enters the processing logic that updates the sliding window.
After the window is updated, some thread sees a non-zero window and tries to send, but the SQ has no free slot for the request, so the error is raised.
Why is there no send CQE before the recv CQE with imm?
By brpc's logic, imm data means the receiver has received the data (i.e. the RDMA Send has completed) and has sent an ack back to the sender.
At that point the recv CQE with imm has already been generated on the sender, so why has the completed RDMA Send not generated a send CQE yet?
Checking the IB spec: when different WQs share one CQ, the order in which their CQEs are generated is not guaranteed; ordering is only guaranteed among CQEs of the same WQ.
So once brpc's SQ and RQ share a CQ, there is no guarantee that a send CQE is generated before the sender sees the recv CQE with imm.
See the following diagram:
Therefore I believe this problem is caused by two factors together:
1. The sliding window and the RDMA SQ are updated at different times;
2. The order in which CQEs are generated is not deterministic.
To Reproduce
Triggered by a production workload; we could not reproduce it with a local test program. However, whenever the error appeared in the logs, the workload was sending large amounts of data.
Expected behavior
What we expect is that the sliding window is updated in step with the RDMA SQ.
We have implemented a patch that guarantees the sliding window is updated only after the RDMA SQ has been advanced (i.e. the window is updated only once the SQ is known to have free space).
We added _remote_acks to record the number of acks from the receiver. Since ordering is guaranteed within a single WQ, we use the _sq_to_update queue to record the number of unsignaled WRs accumulated per signaled WR, and the _sq_update_flag queue to record the send CQEs that have been polled.
When processing a recv CQE with imm, we check whether a send CQE has been polled before it; if not, we only accumulate this round's acks into _remote_acks and do not update the window.
Analysis:
This patch does not affect the normal path (send CQE generated first, recv CQE second); in that case behavior is unchanged under the patch.
When the recv CQE is generated before the send CQE, the patch forces the window update to happen after the send CQE, guaranteeing the SQ cannot overflow.
diff --git a/src/brpc/rdma/rdma_endpoint.cpp b/src/brpc/rdma/rdma_endpoint.cpp
index 1d502a98..dac82bb7 100644
--- a/src/brpc/rdma/rdma_endpoint.cpp
+++ b/src/brpc/rdma/rdma_endpoint.cpp
@@ -101,7 +114,7 @@ static const uint32_t ACK_MSG_RDMA_OK = 0x1;
static butil::Mutex* g_rdma_resource_mutex = NULL;
static RdmaResource* g_rdma_resource_list = NULL;
-
+static butil::atomic<uint64_t> g_wr_id(0);
struct HelloMessage {
void Serialize(void* data) const;
void Deserialize(void* data);
@@ -191,7 +204,11 @@ RdmaEndpoint::RdmaEndpoint(Socket* s)
, _remote_window_capacity(0)
, _window_size(0)
, _new_rq_wrs(0)
+ , _remote_acks(0)
+ , _m_sq_unsignaled(0)
{
+ LOG(INFO) << "_remote_acks: " << _remote_acks.load();
+ LOG(INFO) << "_m_sq_unsignaled: " << _m_sq_unsignaled;
if (_sq_size < MIN_QP_SIZE) {
_sq_size = MIN_QP_SIZE;
}
@@ -208,6 +225,11 @@ RdmaEndpoint::RdmaEndpoint(Socket* s)
}
RdmaEndpoint::~RdmaEndpoint() {
+ LOG(INFO) << _window_size << " " << _remote_acks << " " << _sq_update_flag.size() << " " << _sq_to_update.size() << " " << _sq_unsignaled << " " << _m_sq_unsignaled;
+ while(_sq_to_update.empty() == false) {
+ LOG(INFO) << _sq_to_update.front();
+ _sq_to_update.pop();
+ }
Reset();
bthread::butex_destroy(_read_butex);
}
@@ -231,6 +253,8 @@ void RdmaEndpoint::Reset() {
_new_rq_wrs = 0;
_sq_sent = 0;
_rq_received = 0;
+ _remote_acks.store(0, butil::memory_order_relaxed);
+ _m_sq_unsignaled.store(0, butil::memory_order_relaxed);
}
void RdmaConnect::StartConnect(const Socket* socket,
@@ -878,15 +902,36 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) {
}
ibv_send_wr* bad = NULL;
- int err = ibv_post_send(_resource->qp, &wr, &bad);
- if (err != 0) {
- // We use other way to guarantee the Send Queue is not full.
- // So we just consider this error as an unrecoverable error.
- LOG(WARNING) << "Fail to ibv_post_send: " << berror(err)
+
+ wr.wr_id = g_wr_id.fetch_add(1, butil::memory_order_relaxed);
+ {
+ BAIDU_SCOPED_LOCK(_m_sq_mutex);
+ int err = ibv_post_send(_resource->qp, &wr, &bad);
+ if (err != 0) {
+ // We use other way to guarantee the Send Queue is not full.
+ // So we just consider this error as an unrecoverable error.
+ LOG(WARNING) << "Fail to ibv_post_send: " << berror(err)
<< ", window=" << window
<< ", sq_current=" << _sq_current;
errno = err;
return -1;
+ }
+ _m_sq_unsignaled.fetch_add(1, butil::memory_order_release);
+ uint16_t cur_unsignaled = 0;
+ if(wr.send_flags & IBV_SEND_SIGNALED) {
+ cur_unsignaled = _m_sq_unsignaled.exchange(0, butil::memory_order_acquire);
+ }
+ if(cur_unsignaled != 0) {
+ BAIDU_SCOPED_LOCK(_sq_update_mutex);
+ _sq_to_update.push(cur_unsignaled);
+ //LOG(INFO) << "send signaled before: " << cur_unsignaled;
+ }
}
++_sq_current;
@@ -924,13 +969,26 @@ int RdmaEndpoint::SendImm(uint32_t imm) {
wr.send_flags |= IBV_SEND_SIGNALED;
ibv_send_wr* bad = NULL;
- int err = ibv_post_send(_resource->qp, &wr, &bad);
- if (err != 0) {
- // We use other way to guarantee the Send Queue is not full.
- // So we just consider this error as an unrecoverable error.
- LOG(WARNING) << "Fail to ibv_post_send: " << berror(err);
- return -1;
+ wr.wr_id = g_wr_id.fetch_add(1, butil::memory_order_relaxed);
+ uint16_t cur_unsignaled = 0;
+ {
+ BAIDU_SCOPED_LOCK(_m_sq_mutex);
+ int err = ibv_post_send(_resource->qp, &wr, &bad);
+ if (err != 0) {
+ // We use other way to guarantee the Send Queue is not full.
+ // So we just consider this error as an unrecoverable error.
+ LOG(WARNING) << "Fail to ibv_post_send: " << berror(err);
+ return -1;
+ }
+ cur_unsignaled = _m_sq_unsignaled.exchange(0, butil::memory_order_relaxed);
+ {
+ BAIDU_SCOPED_LOCK(_sq_update_mutex);
+ _sq_to_update.push(cur_unsignaled);
+ //LOG(INFO) << "wr id: " << wr.wr_id << " SendImm before: " << cur_unsignaled;
+ }
+
}
+
return 0;
}
@@ -938,8 +996,11 @@ ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) {
bool zerocopy = FLAGS_rdma_recv_zerocopy;
switch (wc.opcode) {
case IBV_WC_SEND: { // send completion
- // Do nothing
- break;
+ {
+ BAIDU_SCOPED_LOCK(_sq_flag_mutex);
+ _sq_update_flag.push(true);
+ }
+ break;
}
case IBV_WC_RECV: { // recv completion
// Please note that only the first wc.byte_len bytes is valid
@@ -959,24 +1020,66 @@ ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) {
}
if (wc.imm_data > 0) {
// Clear sbuf here because we ignore event wakeup for send completions
- uint32_t acks = butil::NetToHost32(wc.imm_data);
- uint32_t num = acks;
- while (num > 0) {
- _sbuf[_sq_sent++].clear();
- if (_sq_sent == _sq_size - RESERVED_WR_NUM) {
- _sq_sent = 0;
- }
- --num;
- }
- butil::subtle::MemoryBarrier();
// Update window
- uint32_t wnd_thresh = _local_window_capacity / 8;
- if (_window_size.fetch_add(acks, butil::memory_order_relaxed) >= wnd_thresh
- || acks >= wnd_thresh) {
+ //uint32_t wnd_thresh = _local_window_capacity / 8;
+ //if (_window_size.fetch_add(acks, butil::memory_order_relaxed) >= wnd_thresh
+ // || acks >= wnd_thresh) {
// Do not wake up writing thread right after _window_size > 0.
// Otherwise the writing thread may switch to background too quickly.
- _socket->WakeAsEpollOut();
+ // _socket->WakeAsEpollOut();
+ //}
+ uint32_t acks = butil::NetToHost32(wc.imm_data);
+ //LOG(INFO) << "acks: " << acks;
+ _remote_acks.fetch_add(acks, butil::memory_order_relaxed);
+ {
+ BAIDU_SCOPED_LOCK(_sq_update_mutex);
+ while(_sq_to_update.empty() == false && _remote_acks.load() >= _sq_to_update.front()) {
+ {
+ BAIDU_SCOPED_LOCK(_sq_flag_mutex);
+ if(_sq_update_flag.empty() == true) {
+ break;
+ }
+ _sq_update_flag.pop();
+ }
+ uint32_t wnd_to_update = _sq_to_update.front();
+ _sq_to_update.pop();
+ _remote_acks.fetch_sub(wnd_to_update, butil::memory_order_relaxed);
+
+ //LOG(INFO) << wnd_to_update << " " << _remote_acks << " " << _sq_update_flag.size() << " " << _sq_to_update.size();
+ uint32_t num = wnd_to_update;
+ while(num > 0) {
+ _sbuf[_sq_sent++].clear();
+ if (_sq_sent == _sq_size - RESERVED_WR_NUM) {
+ _sq_sent = 0;
+ }
+ --num;
+ }
+ butil::subtle::MemoryBarrier();
+ uint32_t wnd_thresh = _local_window_capacity / 8;
+ if (_window_size.fetch_add(wnd_to_update, butil::memory_order_relaxed) >= wnd_thresh
+ || acks >= wnd_thresh) {
+ // Do not wake up writing thread right after _window_size > 0.
+ // Otherwise the writing thread may switch to background too quickly.
+ _socket->WakeAsEpollOut();
+ }
+ }
}
}
diff --git a/src/brpc/rdma/rdma_endpoint.h b/src/brpc/rdma/rdma_endpoint.h
index de7cd5f6..114eb682 100644
--- a/src/brpc/rdma/rdma_endpoint.h
+++ b/src/brpc/rdma/rdma_endpoint.h
@@ -31,7 +31,7 @@
#include "butil/containers/mpsc_queue.h"
#include "brpc/socket.h"
-
+#include <queue>
namespace brpc {
class Socket;
namespace rdma {
@@ -262,6 +262,13 @@ private:
// The number of new WRs posted in the local Recv Queue
butil::atomic<uint16_t> _new_rq_wrs;
+ butil::atomic<int> _remote_acks;
+ butil::Mutex _m_sq_mutex;
+ butil::Mutex _sq_update_mutex;
+ butil::Mutex _sq_flag_mutex;
+ butil::atomic<uint16_t> _m_sq_unsignaled;
+ std::queue<uint16_t> _sq_to_update;
+ std::queue<bool> _sq_update_flag;
// butex for inform read events on TCP fd during handshake
butil::atomic<int> *_read_butex;
Versions
OS:
Compiler:
brpc:
protobuf:
Additional context/screenshots