Skip to content
1 change: 1 addition & 0 deletions include/pulsar/MessageId.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ class PULSAR_PUBLIC MessageId {
friend class MultiTopicsConsumerImpl;
friend class UnAckedMessageTrackerEnabled;
friend class BatchAcknowledgementTracker;
friend struct OpSendMsg;
friend class PulsarWrapper;
friend class PulsarFriend;
friend class NegativeAcksTracker;
Expand Down
22 changes: 11 additions & 11 deletions lib/BatchMessageContainer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,24 @@
#include <stdexcept>

#include "LogUtils.h"
#include "OpSendMsg.h"

DECLARE_LOG_OBJECT()

namespace pulsar {

BatchMessageContainer::BatchMessageContainer(const ProducerImpl& producer)
: BatchMessageContainerBase(producer) {}
: BatchMessageContainerBase(producer), batch_(producerConfig_.getBatchingMaxMessages()) {}

BatchMessageContainer::~BatchMessageContainer() {
LOG_DEBUG(*this << " destructed");
LOG_DEBUG("[numberOfBatchesSent = " << numberOfBatchesSent_
<< "] [averageBatchSize_ = " << averageBatchSize_ << "]");
}

bool BatchMessageContainer::add(const Message& msg, const SendCallback& callback) {
bool BatchMessageContainer::add(const Message& msg, SendCallback&& callback) {
LOG_DEBUG("Before add: " << *this << " [message = " << msg << "]");
batch_.add(msg, callback);
batch_.add(msg, std::move(callback));
updateStats(msg);
LOG_DEBUG("After add: " << *this);
return isFull();
Expand All @@ -52,14 +53,13 @@ void BatchMessageContainer::clear() {
LOG_DEBUG(*this << " clear() called");
}

Result BatchMessageContainer::createOpSendMsg(OpSendMsg& opSendMsg,
const FlushCallback& flushCallback) const {
return createOpSendMsgHelper(opSendMsg, flushCallback, batch_);
}

std::vector<Result> BatchMessageContainer::createOpSendMsgs(std::vector<OpSendMsg>& opSendMsgs,
const FlushCallback& flushCallback) const {
throw std::runtime_error("createOpSendMsgs is not supported for BatchMessageContainer");
std::unique_ptr<OpSendMsg> BatchMessageContainer::createOpSendMsg(const FlushCallback& flushCallback) {
auto op = createOpSendMsgHelper(batch_);
if (flushCallback) {
op->addTrackerCallback(flushCallback);
}
clear();
return op;
}

void BatchMessageContainer::serialize(std::ostream& os) const {
Expand Down
15 changes: 6 additions & 9 deletions lib/BatchMessageContainer.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,25 +39,22 @@ class BatchMessageContainer : public BatchMessageContainerBase {

~BatchMessageContainer();

size_t getNumBatches() const override { return 1; }
bool hasMultiOpSendMsgs() const override { return false; }

bool isFirstMessageToAdd(const Message& msg) const override { return batch_.empty(); }

bool add(const Message& msg, const SendCallback& callback) override;

void clear() override;

Result createOpSendMsg(OpSendMsg& opSendMsg, const FlushCallback& flushCallback) const override;

std::vector<Result> createOpSendMsgs(std::vector<OpSendMsg>& opSendMsgs,
const FlushCallback& flushCallback) const override;
bool add(const Message& msg, SendCallback&& callback) override;

void serialize(std::ostream& os) const override;

std::unique_ptr<OpSendMsg> createOpSendMsg(const FlushCallback& flushCallback) override;

private:
MessageAndCallbackBatch batch_;
size_t numberOfBatchesSent_ = 0;
double averageBatchSize_ = 0;

void clear() override;
};

} // namespace pulsar
Expand Down
80 changes: 5 additions & 75 deletions lib/BatchMessageContainerBase.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,10 @@
*/
#include "BatchMessageContainerBase.h"

#include "ClientConnection.h"
#include "CompressionCodec.h"
#include "MessageAndCallbackBatch.h"
#include "MessageCrypto.h"
#include "MessageImpl.h"
#include "OpSendMsg.h"
#include "ProducerImpl.h"
#include "PulsarApi.pb.h"
#include "SharedBuffer.h"

namespace pulsar {
Expand All @@ -37,78 +33,12 @@ BatchMessageContainerBase::BatchMessageContainerBase(const ProducerImpl& produce
producerId_(producer.producerId_),
msgCryptoWeakPtr_(producer.msgCrypto_) {}

Result BatchMessageContainerBase::createOpSendMsgHelper(OpSendMsg& opSendMsg,
const FlushCallback& flushCallback,
const MessageAndCallbackBatch& batch) const {
opSendMsg.sendCallback_ = batch.createSendCallback();
opSendMsg.messagesCount_ = batch.messagesCount();
opSendMsg.messagesSize_ = batch.messagesSize();
BatchMessageContainerBase::~BatchMessageContainerBase() {}

if (flushCallback) {
auto sendCallback = opSendMsg.sendCallback_;
opSendMsg.sendCallback_ = [sendCallback, flushCallback](Result result, const MessageId& id) {
sendCallback(result, id);
flushCallback(result);
};
}

if (batch.empty()) {
return ResultOperationNotSupported;
}

MessageImplPtr impl = batch.msgImpl();
impl->metadata.set_num_messages_in_batch(batch.size());
auto compressionType = producerConfig_.getCompressionType();
if (compressionType != CompressionNone) {
impl->metadata.set_compression(static_cast<proto::CompressionType>(compressionType));
impl->metadata.set_uncompressed_size(impl->payload.readableBytes());
}
impl->payload = CompressionCodecProvider::getCodec(compressionType).encode(impl->payload);

auto msgCrypto = msgCryptoWeakPtr_.lock();
if (msgCrypto && producerConfig_.isEncryptionEnabled()) {
SharedBuffer encryptedPayload;
if (!msgCrypto->encrypt(producerConfig_.getEncryptionKeys(), producerConfig_.getCryptoKeyReader(),
impl->metadata, impl->payload, encryptedPayload)) {
return ResultCryptoError;
}
impl->payload = encryptedPayload;
}

if (impl->payload.readableBytes() > ClientConnection::getMaxMessageSize()) {
return ResultMessageTooBig;
}

opSendMsg.metadata_ = impl->metadata;
opSendMsg.payload_ = impl->payload;
opSendMsg.sequenceId_ = impl->metadata.sequence_id();
opSendMsg.producerId_ = producerId_;
opSendMsg.timeout_ = TimeUtils::now() + milliseconds(producerConfig_.getSendTimeout());

return ResultOk;
}

void BatchMessageContainerBase::processAndClear(
std::function<void(Result, const OpSendMsg&)> opSendMsgCallback, FlushCallback flushCallback) {
if (isEmpty()) {
if (flushCallback) {
// do nothing, flushCallback complete until the lastOpSend complete
}
} else {
const auto numBatches = getNumBatches();
if (numBatches == 1) {
OpSendMsg opSendMsg;
Result result = createOpSendMsg(opSendMsg, flushCallback);
opSendMsgCallback(result, opSendMsg);
} else if (numBatches > 1) {
std::vector<OpSendMsg> opSendMsgs;
std::vector<Result> results = createOpSendMsgs(opSendMsgs, flushCallback);
for (size_t i = 0; i < results.size(); i++) {
opSendMsgCallback(results[i], opSendMsgs[i]);
}
} // else numBatches is 0, do nothing
}
clear();
std::unique_ptr<OpSendMsg> BatchMessageContainerBase::createOpSendMsgHelper(
MessageAndCallbackBatch& batch) const {
auto crypto = msgCryptoWeakPtr_.lock();
return batch.createOpSendMsg(producerId_, producerConfig_, crypto.get());
}

} // namespace pulsar
52 changes: 14 additions & 38 deletions lib/BatchMessageContainerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

#include <boost/noncopyable.hpp>
#include <memory>
#include <stdexcept>
#include <vector>

namespace pulsar {
Expand All @@ -44,14 +45,9 @@ class BatchMessageContainerBase : public boost::noncopyable {
public:
BatchMessageContainerBase(const ProducerImpl& producer);

virtual ~BatchMessageContainerBase() {}
virtual ~BatchMessageContainerBase();

/**
* Get number of batches in the batch message container
*
* @return number of batches
*/
virtual size_t getNumBatches() const = 0;
virtual bool hasMultiOpSendMsgs() const = 0;

/**
* Check the message will be the 1st message to be added to the batch
Expand All @@ -71,34 +67,16 @@ class BatchMessageContainerBase : public boost::noncopyable {
* @param callback message send callback
* @return true if the batch is full, otherwise false
*/
virtual bool add(const Message& msg, const SendCallback& callback) = 0;
virtual bool add(const Message& msg, SendCallback&& callback) = 0;

/**
* Clear the batch message container
*/
virtual void clear() = 0;

/**
* Create a OpSendMsg object to send
*
* @param opSendMsg the OpSendMsg object to create
* @param flushCallback the callback to trigger after the OpSendMsg was completed
* @return ResultOk if create successfully
* @note OpSendMsg's sendCallback_ must be set even if it failed
*/
virtual Result createOpSendMsg(OpSendMsg& opSendMsg,
const FlushCallback& flushCallback = nullptr) const = 0;
virtual std::unique_ptr<OpSendMsg> createOpSendMsg(const FlushCallback& flushCallback = nullptr) {
throw std::runtime_error("createOpSendMsg is not supported");
}

/**
* Create a OpSendMsg list to send
*
* @param opSendMsgList the OpSendMsg list to create
* @param flushCallback the callback to trigger after the OpSendMsg was completed
* @return all create results of `opSendMsgs`, ResultOk means create successfully
* @note OpSendMsg's sendCallback_ must be set even if it failed
*/
virtual std::vector<Result> createOpSendMsgs(std::vector<OpSendMsg>& opSendMsgs,
const FlushCallback& flushCallback = nullptr) const = 0;
virtual std::vector<std::unique_ptr<OpSendMsg>> createOpSendMsgs(
const FlushCallback& flushCallback = nullptr) {
throw std::runtime_error("createOpSendMsgs is not supported");
}

/**
* Serialize into a std::ostream for logging
Expand All @@ -110,9 +88,6 @@ class BatchMessageContainerBase : public boost::noncopyable {
bool hasEnoughSpace(const Message& msg) const noexcept;
bool isEmpty() const noexcept;

void processAndClear(std::function<void(Result, const OpSendMsg&)> opSendMsgCallback,
FlushCallback flushCallback);

protected:
// references to ProducerImpl's fields
const std::shared_ptr<std::string> topicName_;
Expand All @@ -134,8 +109,9 @@ class BatchMessageContainerBase : public boost::noncopyable {
void updateStats(const Message& msg);
void resetStats();

Result createOpSendMsgHelper(OpSendMsg& opSendMsg, const FlushCallback& flushCallback,
const MessageAndCallbackBatch& batch) const;
std::unique_ptr<OpSendMsg> createOpSendMsgHelper(MessageAndCallbackBatch& flushCallback) const;

virtual void clear() = 0;
};

inline bool BatchMessageContainerBase::hasEnoughSpace(const Message& msg) const noexcept {
Expand Down
56 changes: 24 additions & 32 deletions lib/BatchMessageKeyBasedContainer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,15 @@ bool BatchMessageKeyBasedContainer::isFirstMessageToAdd(const Message& msg) cons
}
}

bool BatchMessageKeyBasedContainer::add(const Message& msg, const SendCallback& callback) {
bool BatchMessageKeyBasedContainer::add(const Message& msg, SendCallback&& callback) {
LOG_DEBUG("Before add: " << *this << " [message = " << msg << "]");
batches_[getKey(msg)].add(msg, callback);
const auto key = getKey(msg);
auto it = batches_.find(getKey(msg));
if (it == batches_.end()) {
// Do not preallocate for key based batching in case there are many keys
it = batches_.emplace(key, 1).first;
}
it->second.add(msg, std::move(callback));
updateStats(msg);
LOG_DEBUG("After add: " << *this);
return isFull();
Expand All @@ -72,38 +78,24 @@ void BatchMessageKeyBasedContainer::clear() {
LOG_DEBUG(*this << " clear() called");
}

Result BatchMessageKeyBasedContainer::createOpSendMsg(OpSendMsg& opSendMsg,
const FlushCallback& flushCallback) const {
if (batches_.size() < 1) {
return ResultOperationNotSupported;
}
return createOpSendMsgHelper(opSendMsg, flushCallback, batches_.begin()->second);
}

std::vector<Result> BatchMessageKeyBasedContainer::createOpSendMsgs(
std::vector<OpSendMsg>& opSendMsgs, const FlushCallback& flushCallback) const {
// Sorted the batches by sequence id
std::vector<const MessageAndCallbackBatch*> sortedBatches;
for (const auto& kv : batches_) {
sortedBatches.emplace_back(&kv.second);
}
std::sort(sortedBatches.begin(), sortedBatches.end(),
[](const MessageAndCallbackBatch* lhs, const MessageAndCallbackBatch* rhs) {
return lhs->sequenceId() < rhs->sequenceId();
});

size_t numBatches = sortedBatches.size();
opSendMsgs.resize(numBatches);

std::vector<Result> results(numBatches);
for (size_t i = 0; i + 1 < numBatches; i++) {
results[i] = createOpSendMsgHelper(opSendMsgs[i], nullptr, *sortedBatches[i]);
std::vector<std::unique_ptr<OpSendMsg>> BatchMessageKeyBasedContainer::createOpSendMsgs(
const FlushCallback& flushCallback) {
// Store raw pointers to use std::sort
std::vector<OpSendMsg*> rawOpSendMsgs;
for (auto& kv : batches_) {
rawOpSendMsgs.emplace_back(createOpSendMsgHelper(kv.second).release());
}
if (numBatches > 0) {
// Add flush callback to the last batch
results.back() = createOpSendMsgHelper(opSendMsgs.back(), flushCallback, *sortedBatches.back());
std::sort(rawOpSendMsgs.begin(), rawOpSendMsgs.end(), [](const OpSendMsg* lhs, const OpSendMsg* rhs) {
return lhs->sendArgs->sequenceId < rhs->sendArgs->sequenceId;
});
rawOpSendMsgs.back()->addTrackerCallback(flushCallback);

std::vector<std::unique_ptr<OpSendMsg>> opSendMsgs{rawOpSendMsgs.size()};
for (size_t i = 0; i < opSendMsgs.size(); i++) {
opSendMsgs[i].reset(rawOpSendMsgs[i]);
}
return results;
clear();
return opSendMsgs;
}

void BatchMessageKeyBasedContainer::serialize(std::ostream& os) const {
Expand Down
14 changes: 4 additions & 10 deletions lib/BatchMessageKeyBasedContainer.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,13 @@ class BatchMessageKeyBasedContainer : public BatchMessageContainerBase {

~BatchMessageKeyBasedContainer();

size_t getNumBatches() const override { return batches_.size(); }
bool hasMultiOpSendMsgs() const override { return true; }

bool isFirstMessageToAdd(const Message& msg) const override;

bool add(const Message& msg, const SendCallback& callback) override;
bool add(const Message& msg, SendCallback&& callback) override;

void clear() override;

Result createOpSendMsg(OpSendMsg& opSendMsg, const FlushCallback& flushCallback) const override;

std::vector<Result> createOpSendMsgs(std::vector<OpSendMsg>& opSendMsgs,
const FlushCallback& flushCallback) const override;
std::vector<std::unique_ptr<OpSendMsg>> createOpSendMsgs(const FlushCallback& flushCallback) override;

void serialize(std::ostream& os) const override;

Expand All @@ -53,8 +48,7 @@ class BatchMessageKeyBasedContainer : public BatchMessageContainerBase {
size_t numberOfBatchesSent_ = 0;
double averageBatchSize_ = 0;

Result createOpSendMsg(OpSendMsg& opSendMsg, const FlushCallback& flushCallback,
MessageAndCallbackBatch& batch) const;
void clear() override;
};

} // namespace pulsar
Expand Down
Loading