Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 55 additions & 33 deletions src/core/ext/transport/binder/wire_format/wire_reader_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -248,47 +248,58 @@ absl::Status WireReaderImpl::ProcessStreamingTransaction(
transaction_code_t code, ReadableParcel* parcel) {
bool need_to_send_ack = false;
int64_t num_bytes = 0;
// Indicates which callbacks should be cancelled. It will be initialized as
// the flags the in-coming transaction carries, and when a particular
// callback is completed, the corresponding bit in cancellation_flag will be
// set to 0 so that we won't cancel it afterward.
int cancellation_flags = 0;
// The queue saves the actions needed to be done "WITHOUT" `mu_`.
// It prevents deadlock against wire writer issues.
std::queue<std::function<void()>> deferred_func_queue;
absl::Status tx_process_result;

{
grpc_core::MutexLock lock(&mu_);
if (!connected_) {
return absl::InvalidArgumentError("Transports not connected yet");
}

// Indicate which callbacks should be cancelled. It will be initialized as
// the flags the in-coming transaction carries, and when a particular
// callback is completed, the corresponding bit in cancellation_flag will be
// set to 0 so that we won't cancel it afterward.
int cancellation_flags = 0;
tx_process_result =
ProcessStreamingTransactionImpl(code, parcel, &cancellation_flags);
if (!tx_process_result.ok()) {
gpr_log(GPR_ERROR, "Failed to process streaming transaction: %s",
tx_process_result.ToString().c_str());
// Something went wrong when receiving transaction. Cancel failed
// requests.
if (cancellation_flags & kFlagPrefix) {
gpr_log(GPR_INFO, "cancelling initial metadata");
transport_stream_receiver_->NotifyRecvInitialMetadata(
code, tx_process_result);
}
if (cancellation_flags & kFlagMessageData) {
gpr_log(GPR_INFO, "cancelling message data");
transport_stream_receiver_->NotifyRecvMessage(code, tx_process_result);
}
if (cancellation_flags & kFlagSuffix) {
gpr_log(GPR_INFO, "cancelling trailing metadata");
transport_stream_receiver_->NotifyRecvTrailingMetadata(
code, tx_process_result, 0);
}
}
tx_process_result = ProcessStreamingTransactionImpl(
code, parcel, &cancellation_flags, deferred_func_queue);
if ((num_incoming_bytes_ - num_acknowledged_bytes_) >=
kFlowControlAckBytes) {
need_to_send_ack = true;
num_bytes = num_incoming_bytes_;
num_acknowledged_bytes_ = num_incoming_bytes_;
}
}
// Executes all actions in the queue.
while (!deferred_func_queue.empty()) {
const auto& func = deferred_func_queue.front();
func();
deferred_func_queue.pop();
}

if (!tx_process_result.ok()) {
gpr_log(GPR_ERROR, "Failed to process streaming transaction: %s",
tx_process_result.ToString().c_str());
// Something went wrong when receiving transaction. Cancel failed requests.
if (cancellation_flags & kFlagPrefix) {
gpr_log(GPR_INFO, "cancelling initial metadata");
transport_stream_receiver_->NotifyRecvInitialMetadata(code,
tx_process_result);
}
if (cancellation_flags & kFlagMessageData) {
gpr_log(GPR_INFO, "cancelling message data");
transport_stream_receiver_->NotifyRecvMessage(code, tx_process_result);
}
if (cancellation_flags & kFlagSuffix) {
gpr_log(GPR_INFO, "cancelling trailing metadata");
transport_stream_receiver_->NotifyRecvTrailingMetadata(
code, tx_process_result, 0);
}
}

if (need_to_send_ack) {
if (!wire_writer_ready_notification_.WaitForNotificationWithTimeout(
absl::Seconds(5))) {
Expand All @@ -310,7 +321,8 @@ absl::Status WireReaderImpl::ProcessStreamingTransaction(
}

absl::Status WireReaderImpl::ProcessStreamingTransactionImpl(
transaction_code_t code, ReadableParcel* parcel, int* cancellation_flags) {
transaction_code_t code, ReadableParcel* parcel, int* cancellation_flags,
std::queue<std::function<void()>>& deferred_func_queue) {
GPR_ASSERT(cancellation_flags);
num_incoming_bytes_ += parcel->GetDataSize();
gpr_log(GPR_INFO, "Total incoming bytes: %" PRId64, num_incoming_bytes_);
Expand Down Expand Up @@ -380,8 +392,12 @@ absl::Status WireReaderImpl::ProcessStreamingTransactionImpl(
"binder.authority");
}
}
transport_stream_receiver_->NotifyRecvInitialMetadata(
code, *initial_metadata_or_error);
deferred_func_queue.emplace(
[this, code,
initial_metadata_or_error = std::move(initial_metadata_or_error)]() {
this->transport_stream_receiver_->NotifyRecvInitialMetadata(
code, initial_metadata_or_error);
});
*cancellation_flags &= ~kFlagPrefix;
}
if (flags & kFlagMessageData) {
Expand All @@ -396,7 +412,9 @@ absl::Status WireReaderImpl::ProcessStreamingTransactionImpl(
if ((flags & kFlagMessageDataIsPartial) == 0) {
std::string s = std::move(message_buffer_[code]);
message_buffer_.erase(code);
transport_stream_receiver_->NotifyRecvMessage(code, std::move(s));
deferred_func_queue.emplace([this, code, s = std::move(s)]() {
this->transport_stream_receiver_->NotifyRecvMessage(code, std::move(s));
});
}
*cancellation_flags &= ~kFlagMessageData;
}
Expand All @@ -416,8 +434,12 @@ absl::Status WireReaderImpl::ProcessStreamingTransactionImpl(
}
trailing_metadata = *trailing_metadata_or_error;
}
transport_stream_receiver_->NotifyRecvTrailingMetadata(
code, std::move(trailing_metadata), status);
deferred_func_queue.emplace(
[this, code, trailing_metadata = std::move(trailing_metadata),
status]() {
this->transport_stream_receiver_->NotifyRecvTrailingMetadata(
code, std::move(trailing_metadata), status);
});
*cancellation_flags &= ~kFlagSuffix;
}
return absl::OkStatus();
Expand Down
10 changes: 7 additions & 3 deletions src/core/ext/transport/binder/wire_format/wire_reader_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

#include <grpc/support/port_platform.h>

#include <functional>
#include <memory>
#include <queue>
#include <utility>

#include "absl/container/flat_hash_map.h"
Expand Down Expand Up @@ -99,9 +101,11 @@ class WireReaderImpl : public WireReader {
private:
absl::Status ProcessStreamingTransaction(transaction_code_t code,
ReadableParcel* parcel);
absl::Status ProcessStreamingTransactionImpl(transaction_code_t code,
ReadableParcel* parcel,
int* cancellation_flags)
absl::Status ProcessStreamingTransactionImpl(
transaction_code_t code, ReadableParcel* parcel, int* cancellation_flags,
// The queue saves the actions needed to be done "WITHOUT" `mu_`.
// It prevents deadlock against wire writer issues.
std::queue<std::function<void()>>& deferred_func_queue)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);

std::shared_ptr<TransportStreamReceiver> transport_stream_receiver_;
Expand Down