2525#include " butil/iobuf.h" // butil::IOBuf
2626#include " butil/raw_pack.h" // RawPacker RawUnpacker
2727#include " brpc/controller.h" // Controller
28+ #include " brpc/errno.pb.h"
2829#include " brpc/socket.h" // Socket
2930#include " brpc/server.h" // Server
3031#include " brpc/span.h"
@@ -216,7 +217,9 @@ void SendRpcResponse(int64_t correlation_id,
216217 if (Socket::Address (response_stream_id, &stream_ptr) == 0 ) {
217218 Stream* s = (Stream*)stream_ptr->conn ();
218219 s->FillSettings (meta.mutable_stream_settings ());
219- s->SetHostSocket (sock);
220+ if (s->SetHostSocket (sock) != 0 ) {
221+ LOG (WARNING) << " SetHostSocket failed" ;
222+ }
220223 } else {
221224 LOG (WARNING) << " Stream=" << response_stream_id
222225 << " was closed before sending response" ;
@@ -247,6 +250,25 @@ void SendRpcResponse(int64_t correlation_id,
247250 // Send rpc response over stream even if server side failed to create
248251 // stream for some reason.
249252 if (cntl->has_remote_stream ()){
253+ // We have to check sock->Failed() before calling SetConnected().
254+ // Otherwise stream->_host_socket may be nullptr and cause CHECK failures.
255+ if (sock->Failed ()) {
256+ LOG (WARNING) << " Fail to write into " << *sock;
257+ cntl->SetFailed (EFAILEDSOCKET, " Fail to write into %s" ,
258+ sock->description ().c_str ());
259+ if (stream_ptr) {
260+ ((Stream *)stream_ptr->conn ())->Close ();
261+ }
262+ return ;
263+ }
264+ // If we don't set connected here before send the response,
265+ // client-side may close the stream before server-side set connected.
266+ // This will cause missing on_closed message on the client-side.
267+ if (stream_ptr) {
268+ // Now it's ok the mark this server-side stream as connectted as all the
269+ // written user data would follower the RPC response.
270+ ((Stream*)stream_ptr->conn ())->SetConnected ();
271+ }
250272 // Send the response over stream to notify that this stream connection
251273 // is successfully built.
252274 // Response_stream can be INVALID_STREAM_ID when error occurs.
@@ -262,12 +284,6 @@ void SendRpcResponse(int64_t correlation_id,
262284 }
263285 return ;
264286 }
265-
266- if (stream_ptr) {
267- // Now it's ok the mark this server-side stream as connected as all the
268- // written user data would follower the RPC response.
269- ((Stream*)stream_ptr->conn ())->SetConnected ();
270- }
271287 } else {
272288 // Have the risk of unlimited pending responses, in which case, tell
273289 // users to set max_concurrency.
0 commit comments