Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/rmq/rmqa/rmqa_connectionimpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ BALL_LOG_SET_NAMESPACE_CATEGORY("RMQA.CONNECTIONIMPL")
bsl::shared_ptr<rmqio::RetryHandler>
createRetryHandler(const bsl::shared_ptr<rmqio::TimerFactory>& timerFactory,
const rmqt::ErrorCallback& errorCb,
const rmqt::SuccessCallback& successCb,
const rmqt::Tunables& tunables)
{
if (tunables.find("IIR") != tunables.end()) {
Expand All @@ -58,12 +59,14 @@ createRetryHandler(const bsl::shared_ptr<rmqio::TimerFactory>& timerFactory,
return bsl::make_shared<rmqio::RetryHandler>(
timerFactory,
errorCb,
successCb,
bsl::make_shared<rmqio::BackoffLevelRetryStrategy>(
bsl::numeric_limits<unsigned int>::max()));
}
return bsl::make_shared<rmqio::RetryHandler>(
timerFactory,
errorCb,
successCb,
bsl::make_shared<rmqio::BackoffLevelRetryStrategy>());
}

Expand Down Expand Up @@ -155,6 +158,7 @@ ConnectionImpl::ConnectionImpl(
rmqio::EventLoop& loop,
bdlmt::ThreadPool& threadPool,
const rmqt::ErrorCallback& errorCb,
const rmqt::SuccessCallback& successCb,
const bsl::shared_ptr<rmqt::Endpoint>& endpoint,
const rmqt::Tunables& tunables,
const bsl::shared_ptr<rmqa::ConsumerImpl::Factory>& consumerFactory,
Expand All @@ -164,6 +168,7 @@ ConnectionImpl::ConnectionImpl(
, d_threadPool(threadPool)
, d_eventLoop(loop)
, d_onError(errorCb)
, d_onSuccess(successCb)
, d_endpoint(endpoint)
, d_consumerFactory(consumerFactory)
, d_producerFactory(producerFactory)
Expand Down Expand Up @@ -208,6 +213,7 @@ bsl::shared_ptr<rmqa::ConnectionImpl> ConnectionImpl::make(
rmqio::EventLoop& eventLoop,
bdlmt::ThreadPool& threadPool,
const rmqt::ErrorCallback& errorCb,
const rmqt::SuccessCallback& successCb,
const bsl::shared_ptr<rmqt::Endpoint>& endpoint,
const rmqt::Tunables& tunables,
const bsl::shared_ptr<rmqa::ConsumerImpl::Factory>& consumerFactory,
Expand All @@ -218,6 +224,7 @@ bsl::shared_ptr<rmqa::ConnectionImpl> ConnectionImpl::make(
eventLoop,
threadPool,
errorCb,
successCb,
endpoint,
tunables,
consumerFactory,
Expand Down Expand Up @@ -290,6 +297,7 @@ ConnectionImpl::createProducerAsync(const rmqt::Topology& topology,
exchange,
createRetryHandler(d_eventLoop.timerFactory(),
bsl::ref(d_onError),
bsl::ref(d_onSuccess),
d_tunables)))));

return sendChannelFuture.then<rmqp::Producer>(
Expand Down Expand Up @@ -332,6 +340,7 @@ rmqt::Future<rmqp::Consumer> ConnectionImpl::createConsumerAsync(
consumerConfig,
createRetryHandler(d_eventLoop.timerFactory(),
bsl::ref(d_onError),
bsl::ref(d_onSuccess),
d_tunables),
ackQueue))));

Expand Down
3 changes: 3 additions & 0 deletions src/rmq/rmqa/rmqa_connectionimpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class ConnectionImpl : public rmqp::Connection,
rmqio::EventLoop& eventLoop,
bdlmt::ThreadPool& threadpool,
const rmqt::ErrorCallback& errorCb,
const rmqt::SuccessCallback& successCb,
const bsl::shared_ptr<rmqt::Endpoint>& endpoint,
const rmqt::Tunables& tunables,
const bsl::shared_ptr<rmqa::ConsumerImpl::Factory>& consumerFactory,
Expand Down Expand Up @@ -97,6 +98,7 @@ class ConnectionImpl : public rmqp::Connection,
rmqio::EventLoop& loop,
bdlmt::ThreadPool& threadPool,
const rmqt::ErrorCallback& errorCb,
const rmqt::SuccessCallback& successCb,
const bsl::shared_ptr<rmqt::Endpoint>& endpoint,
const rmqt::Tunables& tunables,
const bsl::shared_ptr<rmqa::ConsumerImpl::Factory>& consumerFactory,
Expand All @@ -112,6 +114,7 @@ class ConnectionImpl : public rmqp::Connection,
bdlmt::ThreadPool& d_threadPool;
rmqio::EventLoop& d_eventLoop;
rmqt::ErrorCallback d_onError;
rmqt::SuccessCallback d_onSuccess;
bsl::shared_ptr<rmqt::Endpoint> d_endpoint;
///< Held for logging
bsl::shared_ptr<rmqa::ConsumerImpl::Factory> d_consumerFactory;
Expand Down
16 changes: 16 additions & 0 deletions src/rmq/rmqa/rmqa_rabbitcontextimpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,15 @@ void handleErrorCbOnEventLoop(bdlmt::ThreadPool* threadPool,
}
}

void handleSuccessCbOnEventLoop(bdlmt::ThreadPool* threadPool,
const rmqt::SuccessCallback& successCb)
{
if (successCb) {
threadPool->enqueueJob(
bdlf::BindUtil::bind(successCb));
}
}

void startFirstConnection(
const bsl::weak_ptr<rmqamqp::Connection>& weakConn,
const rmqamqp::Connection::ConnectedCallback& callback)
Expand All @@ -88,6 +97,7 @@ void initiateConnection(
rmqio::EventLoop& eventLoop,
bdlmt::ThreadPool& threadPool,
const rmqt::ErrorCallback& errorCb,
const rmqt::SuccessCallback& successCb,
const bsl::shared_ptr<rmqt::Endpoint>& endpoint,
const rmqt::Tunables& tunables,
const bsl::shared_ptr<rmqa::ConsumerImpl::Factory>& consumerFactory,
Expand All @@ -110,6 +120,7 @@ void initiateConnection(
eventLoop,
threadPool,
errorCb,
successCb,
endpoint,
tunables,
consumerFactory,
Expand Down Expand Up @@ -151,6 +162,9 @@ RabbitContextImpl::RabbitContextImpl(
options.errorCallback(),
bdlf::PlaceHolders::_1,
bdlf::PlaceHolders::_2))
, d_onSuccess(bdlf::BindUtil::bind(&handleSuccessCbOnEventLoop,
bsl::ref(d_threadPool),
options.successCallback()))
, d_connectionMonitor(
bsl::make_shared<ConnectionMonitor>(options.messageProcessingTimeout()))
, d_connectionFactory()
Expand All @@ -169,6 +183,7 @@ RabbitContextImpl::RabbitContextImpl(
options.shuffleConnectionEndpoints().value_or(false)),
d_eventLoop->timerFactory(),
d_onError,
d_onSuccess,
metricPublisher,
d_connectionMonitor,
options.clientProperties(),
Expand Down Expand Up @@ -362,6 +377,7 @@ rmqt::Future<rmqp::Connection> RabbitContextImpl::createNewConnection(
bsl::ref(*d_eventLoop),
bsl::ref(*d_threadPool),
d_onError,
d_onSuccess,
endpoint,
d_tunables,
consumerFactory,
Expand Down
1 change: 1 addition & 0 deletions src/rmq/rmqa/rmqa_rabbitcontextimpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ class RabbitContextImpl : public rmqp::RabbitContext {
bdlmt::ThreadPool* d_threadPool;
bslma::ManagedPtr<bdlmt::ThreadPool> d_hostedThreadPool;
rmqt::ErrorCallback d_onError;
rmqt::SuccessCallback d_onSuccess;
bsl::shared_ptr<ConnectionMonitor> d_connectionMonitor;
bslma::ManagedPtr<rmqamqp::Connection::Factory> d_connectionFactory;
rmqt::Tunables d_tunables;
Expand Down
7 changes: 7 additions & 0 deletions src/rmq/rmqa/rmqa_rabbitcontextoptions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,13 @@ RabbitContextOptions::setErrorCallback(const rmqt::ErrorCallback& errorCallback)
return *this;
}

RabbitContextOptions&
RabbitContextOptions::setSuccessCallback(const rmqt::SuccessCallback& successCallback)
{
d_onSuccess = successCallback;
return *this;
}

RabbitContextOptions& RabbitContextOptions::setMetricPublisher(
const bsl::shared_ptr<rmqp::MetricPublisher>& metricPublisher)
{
Expand Down
7 changes: 7 additions & 0 deletions src/rmq/rmqa/rmqa_rabbitcontextoptions.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ class RabbitContextOptions {
RabbitContextOptions&
setErrorCallback(const rmqt::ErrorCallback& errorCallback);

/// \param successCallback function will be called when channel or
/// connection is restored
RabbitContextOptions& setSuccessCallback(const rmqt::SuccessCallback& successCallback);

/// \param name name of client property to set
/// \param value value of client property
/// NOTE: The following properties are set by default and can be
Expand Down Expand Up @@ -144,6 +148,8 @@ class RabbitContextOptions {

const rmqt::ErrorCallback& errorCallback() const { return d_onError; }

const rmqt::SuccessCallback& successCallback() const { return d_onSuccess; }

const rmqt::FieldTable& clientProperties() const
{
return d_clientProperties;
Expand Down Expand Up @@ -184,6 +190,7 @@ class RabbitContextOptions {
static const int DEFAULT_MESSAGE_PROCESSING_TIMEOUT = 60;
bdlmt::ThreadPool* d_threadpool;
rmqt::ErrorCallback d_onError;
rmqt::SuccessCallback d_onSuccess;
bsl::shared_ptr<rmqp::MetricPublisher> d_metricPublisher;
rmqt::FieldTable d_clientProperties;
bsls::TimeInterval d_messageProcessingTimeout;
Expand Down
4 changes: 4 additions & 0 deletions src/rmq/rmqamqp/rmqamqp_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -984,11 +984,13 @@ Connection::Factory::Factory(
const bsl::shared_ptr<rmqio::Resolver>& resolver,
const bsl::shared_ptr<rmqio::TimerFactory>& timerFactory,
const rmqt::ErrorCallback& errorCb,
const rmqt::SuccessCallback& successCb,
const bsl::shared_ptr<rmqp::MetricPublisher>& metricPublisher,
const bsl::shared_ptr<ConnectionMonitor>& connectionMonitor,
const rmqt::FieldTable& clientProperties,
const bsl::optional<bsls::TimeInterval>& connectionErrorThreshold)
: d_errorCb(errorCb)
, d_successCb(successCb)
, d_clientProperties(clientProperties)
, d_metricPublisher(metricPublisher)
, d_resolver(resolver)
Expand Down Expand Up @@ -1027,11 +1029,13 @@ bsl::shared_ptr<rmqio::RetryHandler> Connection::Factory::newRetryHandler()
bsl::make_shared<rmqio::ConnectionRetryHandler>(
d_timerFactory,
d_errorCb,
d_successCb,
bsl::make_shared<rmqio::BackoffLevelRetryStrategy>(),
*d_connectionErrorThreshold))
: bsl::make_shared<rmqio::RetryHandler>(
d_timerFactory,
d_errorCb,
d_successCb,
bsl::make_shared<rmqio::BackoffLevelRetryStrategy>());
}

Expand Down
2 changes: 2 additions & 0 deletions src/rmq/rmqamqp/rmqamqp_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ class Connection::Factory {
Factory(const bsl::shared_ptr<rmqio::Resolver>& resolver,
const bsl::shared_ptr<rmqio::TimerFactory>& timerFactory,
const rmqt::ErrorCallback& errorCb,
const rmqt::SuccessCallback& successCb,
const bsl::shared_ptr<rmqp::MetricPublisher>& metricPublisher,
const bsl::shared_ptr<ConnectionMonitor>& connectionMonitor,
const rmqt::FieldTable& clientProperties,
Expand All @@ -333,6 +334,7 @@ class Connection::Factory {
Factory& operator=(const Factory&) BSLS_KEYWORD_DELETED;

const rmqt::ErrorCallback d_errorCb;
const rmqt::SuccessCallback d_successCb;
const rmqt::FieldTable d_clientProperties;
const bsl::shared_ptr<rmqp::MetricPublisher> d_metricPublisher;
const bsl::shared_ptr<rmqio::Resolver> d_resolver;
Expand Down
4 changes: 3 additions & 1 deletion src/rmq/rmqio/rmqio_connectionretryhandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@ BALL_LOG_SET_NAMESPACE_CATEGORY("RMQIO.CONNECTIONRETRYHANDLER")
ConnectionRetryHandler::ConnectionRetryHandler(
const bsl::shared_ptr<TimerFactory>& timerFactory,
const rmqt::ErrorCallback& errorCb,
const rmqt::SuccessCallback& successCb,
const bsl::shared_ptr<RetryStrategy>& retryStrategy,
const bsls::TimeInterval& errorThreshold)
: RetryHandler(timerFactory, errorCb, retryStrategy)
: RetryHandler(timerFactory, errorCb, successCb, retryStrategy)
, d_errorThreshold(errorThreshold)
, d_errorSince()

Expand All @@ -57,6 +58,7 @@ void ConnectionRetryHandler::success()
{
RetryHandler::success();
d_errorSince.reset();
successCallback()();
}

void ConnectionRetryHandler::evaluateErrorThreshold()
Expand Down
1 change: 1 addition & 0 deletions src/rmq/rmqio/rmqio_connectionretryhandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class ConnectionRetryHandler : public RetryHandler {
/// there has been no success within
ConnectionRetryHandler(const bsl::shared_ptr<TimerFactory>& timerFactory,
const rmqt::ErrorCallback& errorCb,
const rmqt::SuccessCallback& successCb,
const bsl::shared_ptr<RetryStrategy>& retryStrategy,
const bsls::TimeInterval& errorThreshold);

Expand Down
2 changes: 2 additions & 0 deletions src/rmq/rmqio/rmqio_retryhandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ BALL_LOG_SET_NAMESPACE_CATEGORY("RMQIO.RETRYHANDLER")

RetryHandler::RetryHandler(const bsl::shared_ptr<TimerFactory>& timerFactory,
const rmqt::ErrorCallback& errorCb,
const rmqt::SuccessCallback& successCb,
const bsl::shared_ptr<RetryStrategy>& retryStrategy)
: d_sleepTimer(timerFactory->createWithCallback(
bdlf::BindUtil::bind(&RetryHandler::handleRetry,
Expand All @@ -40,6 +41,7 @@ RetryHandler::RetryHandler(const bsl::shared_ptr<TimerFactory>& timerFactory,
, d_retryStrategy(retryStrategy)
, d_retryCallback()
, d_onError(errorCb)
, d_onSuccess(successCb)
{
}

Expand Down
7 changes: 7 additions & 0 deletions src/rmq/rmqio/rmqio_retryhandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class RetryHandler {
/// \param retryStrategy the retry strategy to use
RetryHandler(const bsl::shared_ptr<TimerFactory>& timerFactory,
const rmqt::ErrorCallback& errorCb,
const rmqt::SuccessCallback& successCb,
const bsl::shared_ptr<RetryStrategy>& retryStrategy);

virtual ~RetryHandler() {}
Expand All @@ -58,6 +59,11 @@ class RetryHandler {
return d_onError;
}

virtual const rmqt::SuccessCallback& successCallback() const
{
return d_onSuccess;
}

private:
RetryHandler(const RetryHandler&) BSLS_KEYWORD_DELETED;
RetryHandler& operator=(const RetryHandler&) BSLS_KEYWORD_DELETED;
Expand All @@ -70,6 +76,7 @@ class RetryHandler {
bsl::shared_ptr<RetryStrategy> d_retryStrategy;
RetryCallback d_retryCallback;
rmqt::ErrorCallback d_onError;
rmqt::SuccessCallback d_onSuccess;
};

} // namespace rmqio
Expand Down
4 changes: 4 additions & 0 deletions src/rmq/rmqt/rmqt_result.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ typedef bsl::function<void(const bsl::string& errorText, int errorCode)>
/// The normal execution of program will not be stopped.
/// The callback will only bubble up the error details to the client.

typedef bsl::function<void()> SuccessCallback;
/// SuccessCallback function will be called on client thread,
/// whenever channel or connection will be restored.

} // namespace rmqt
} // namespace BloombergLP

Expand Down
9 changes: 9 additions & 0 deletions src/tests/rmqa/rmqa_connectionimpl.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ class ConnectionTests : public ::testing::Test {
NiceMock<rmqtestutil::MockEventLoop> d_eventLoop;
bsl::shared_ptr<rmqamqp::HeartbeatManager> d_hb;
rmqt::ErrorCallback d_onError;
rmqt::SuccessCallback d_onSuccess;
bsl::shared_ptr<rmqio::RetryHandler> d_retryHandler;
bsl::shared_ptr<rmqamqp::ChannelFactory> d_channelFactory;
bsl::shared_ptr<rmqtestutil::MockMetricPublisher> d_metricPublisher;
Expand All @@ -140,9 +141,11 @@ class ConnectionTests : public ::testing::Test {
, d_eventLoop(d_timerFactory)
, d_hb(new rmqamqp::HeartbeatManagerImpl(d_timerFactory))
, d_onError()
, d_onSuccess()
, d_retryHandler(bsl::make_shared<rmqio::RetryHandler>(
d_timerFactory,
d_onError,
d_onSuccess,
bsl::make_shared<rmqio::BackoffLevelRetryStrategy>()))
, d_channelFactory(bsl::make_shared<rmqamqp::ChannelFactory>())
, d_metricPublisher(bsl::make_shared<rmqtestutil::MockMetricPublisher>())
Expand Down Expand Up @@ -200,6 +203,7 @@ TEST_F(ConnectionTests, BreathingTest)
d_eventLoop,
d_threadPool,
d_onError,
d_onSuccess,
d_endpoint,
d_tunables,
d_consumerFactory,
Expand All @@ -218,6 +222,7 @@ TEST_F(ConnectionTests, CreateSyncSuccess)
d_eventLoop,
d_threadPool,
d_onError,
d_onSuccess,
d_endpoint,
d_tunables,
d_consumerFactory,
Expand Down Expand Up @@ -254,6 +259,7 @@ TEST_F(ConnectionTests, CreateConsumer)
d_eventLoop,
d_threadPool,
d_onError,
d_onSuccess,
d_endpoint,
d_tunables,
d_consumerFactory,
Expand All @@ -277,6 +283,7 @@ TEST_F(ConnectionTests, CreateProducer)
d_eventLoop,
d_threadPool,
d_onError,
d_onSuccess,
d_endpoint,
d_tunables,
d_consumerFactory,
Expand All @@ -298,6 +305,7 @@ TEST_F(ConnectionTests, CloseCreatesTimerAndInvokesClose)
d_eventLoop,
d_threadPool,
d_onError,
d_onSuccess,
d_endpoint,
d_tunables,
d_consumerFactory,
Expand Down Expand Up @@ -326,6 +334,7 @@ TEST_F(ConnectionTests, GracefulCloseHitsTimeoutAndSuccessfulCloseRace)
d_eventLoop,
d_threadPool,
d_onError,
d_onSuccess,
d_endpoint,
d_tunables,
d_consumerFactory,
Expand Down
Loading