diff --git a/src/rmq/rmqa/rmqa_connectionimpl.cpp b/src/rmq/rmqa/rmqa_connectionimpl.cpp index e663c01..ba3a6e5 100644 --- a/src/rmq/rmqa/rmqa_connectionimpl.cpp +++ b/src/rmq/rmqa/rmqa_connectionimpl.cpp @@ -50,6 +50,7 @@ BALL_LOG_SET_NAMESPACE_CATEGORY("RMQA.CONNECTIONIMPL") bsl::shared_ptr createRetryHandler(const bsl::shared_ptr& timerFactory, const rmqt::ErrorCallback& errorCb, + const rmqt::SuccessCallback& successCb, const rmqt::Tunables& tunables) { if (tunables.find("IIR") != tunables.end()) { @@ -58,12 +59,14 @@ createRetryHandler(const bsl::shared_ptr& timerFactory, return bsl::make_shared( timerFactory, errorCb, + successCb, bsl::make_shared( bsl::numeric_limits::max())); } return bsl::make_shared( timerFactory, errorCb, + successCb, bsl::make_shared()); } @@ -155,6 +158,7 @@ ConnectionImpl::ConnectionImpl( rmqio::EventLoop& loop, bdlmt::ThreadPool& threadPool, const rmqt::ErrorCallback& errorCb, + const rmqt::SuccessCallback& successCb, const bsl::shared_ptr& endpoint, const rmqt::Tunables& tunables, const bsl::shared_ptr& consumerFactory, @@ -164,6 +168,7 @@ ConnectionImpl::ConnectionImpl( , d_threadPool(threadPool) , d_eventLoop(loop) , d_onError(errorCb) +, d_onSuccess(successCb) , d_endpoint(endpoint) , d_consumerFactory(consumerFactory) , d_producerFactory(producerFactory) @@ -208,6 +213,7 @@ bsl::shared_ptr ConnectionImpl::make( rmqio::EventLoop& eventLoop, bdlmt::ThreadPool& threadPool, const rmqt::ErrorCallback& errorCb, + const rmqt::SuccessCallback& successCb, const bsl::shared_ptr& endpoint, const rmqt::Tunables& tunables, const bsl::shared_ptr& consumerFactory, @@ -218,6 +224,7 @@ bsl::shared_ptr ConnectionImpl::make( eventLoop, threadPool, errorCb, + successCb, endpoint, tunables, consumerFactory, @@ -290,6 +297,7 @@ ConnectionImpl::createProducerAsync(const rmqt::Topology& topology, exchange, createRetryHandler(d_eventLoop.timerFactory(), bsl::ref(d_onError), + bsl::ref(d_onSuccess), d_tunables))))); return sendChannelFuture.then( @@ -332,6 +340,7 @@ rmqt::Future ConnectionImpl::createConsumerAsync( consumerConfig, createRetryHandler(d_eventLoop.timerFactory(), bsl::ref(d_onError), + bsl::ref(d_onSuccess), d_tunables), ackQueue)))); diff --git a/src/rmq/rmqa/rmqa_connectionimpl.h b/src/rmq/rmqa/rmqa_connectionimpl.h index b389006..a8cbdaa 100644 --- a/src/rmq/rmqa/rmqa_connectionimpl.h +++ b/src/rmq/rmqa/rmqa_connectionimpl.h @@ -60,6 +60,7 @@ class ConnectionImpl : public rmqp::Connection, rmqio::EventLoop& eventLoop, bdlmt::ThreadPool& threadpool, const rmqt::ErrorCallback& errorCb, + const rmqt::SuccessCallback& successCb, const bsl::shared_ptr& endpoint, const rmqt::Tunables& tunables, const bsl::shared_ptr& consumerFactory, @@ -97,6 +98,7 @@ class ConnectionImpl : public rmqp::Connection, rmqio::EventLoop& loop, bdlmt::ThreadPool& threadPool, const rmqt::ErrorCallback& errorCb, + const rmqt::SuccessCallback& successCb, const bsl::shared_ptr& endpoint, const rmqt::Tunables& tunables, const bsl::shared_ptr& consumerFactory, @@ -112,6 +114,7 @@ class ConnectionImpl : public rmqp::Connection, bdlmt::ThreadPool& d_threadPool; rmqio::EventLoop& d_eventLoop; rmqt::ErrorCallback d_onError; + rmqt::SuccessCallback d_onSuccess; bsl::shared_ptr d_endpoint; ///< Held for logging bsl::shared_ptr d_consumerFactory; diff --git a/src/rmq/rmqa/rmqa_rabbitcontextimpl.cpp b/src/rmq/rmqa/rmqa_rabbitcontextimpl.cpp index aa918dd..f55b07e 100644 --- a/src/rmq/rmqa/rmqa_rabbitcontextimpl.cpp +++ b/src/rmq/rmqa/rmqa_rabbitcontextimpl.cpp @@ -68,6 +68,15 @@ void handleErrorCbOnEventLoop(bdlmt::ThreadPool* threadPool, } } +void handleSuccessCbOnEventLoop(bdlmt::ThreadPool* threadPool, + const rmqt::SuccessCallback& successCb) +{ + if (successCb) { + threadPool->enqueueJob( + bdlf::BindUtil::bind(successCb)); + } +} + void startFirstConnection( const bsl::weak_ptr& weakConn, const rmqamqp::Connection::ConnectedCallback& callback) @@ -88,6 +97,7 @@ void initiateConnection( rmqio::EventLoop& eventLoop, bdlmt::ThreadPool& threadPool, const rmqt::ErrorCallback& errorCb, + const rmqt::SuccessCallback& successCb, const bsl::shared_ptr& endpoint, const rmqt::Tunables& tunables, const bsl::shared_ptr& consumerFactory, @@ -110,6 +120,7 @@ void initiateConnection( eventLoop, threadPool, errorCb, + successCb, endpoint, tunables, consumerFactory, @@ -151,6 +162,9 @@ RabbitContextImpl::RabbitContextImpl( options.errorCallback(), bdlf::PlaceHolders::_1, bdlf::PlaceHolders::_2)) +, d_onSuccess(bdlf::BindUtil::bind(&handleSuccessCbOnEventLoop, + bsl::ref(d_threadPool), + options.successCallback())) , d_connectionMonitor( bsl::make_shared(options.messageProcessingTimeout())) , d_connectionFactory() @@ -169,6 +183,7 @@ RabbitContextImpl::RabbitContextImpl( options.shuffleConnectionEndpoints().value_or(false)), d_eventLoop->timerFactory(), d_onError, + d_onSuccess, metricPublisher, d_connectionMonitor, options.clientProperties(), @@ -362,6 +377,7 @@ rmqt::Future RabbitContextImpl::createNewConnection( bsl::ref(*d_eventLoop), bsl::ref(*d_threadPool), d_onError, + d_onSuccess, endpoint, d_tunables, consumerFactory, diff --git a/src/rmq/rmqa/rmqa_rabbitcontextimpl.h b/src/rmq/rmqa/rmqa_rabbitcontextimpl.h index f9271be..80ffe2f 100644 --- a/src/rmq/rmqa/rmqa_rabbitcontextimpl.h +++ b/src/rmq/rmqa/rmqa_rabbitcontextimpl.h @@ -75,6 +75,7 @@ class RabbitContextImpl : public rmqp::RabbitContext { bdlmt::ThreadPool* d_threadPool; bslma::ManagedPtr d_hostedThreadPool; rmqt::ErrorCallback d_onError; + rmqt::SuccessCallback d_onSuccess; bsl::shared_ptr d_connectionMonitor; bslma::ManagedPtr d_connectionFactory; rmqt::Tunables d_tunables; diff --git a/src/rmq/rmqa/rmqa_rabbitcontextoptions.cpp b/src/rmq/rmqa/rmqa_rabbitcontextoptions.cpp index 457da3e..8ac6dc5 100644 --- a/src/rmq/rmqa/rmqa_rabbitcontextoptions.cpp +++ b/src/rmq/rmqa/rmqa_rabbitcontextoptions.cpp @@ -85,6 +85,13 @@ RabbitContextOptions::setErrorCallback(const rmqt::ErrorCallback& errorCallback) return *this; } +RabbitContextOptions& +RabbitContextOptions::setSuccessCallback(const rmqt::SuccessCallback& successCallback) +{ + d_onSuccess = successCallback; + return *this; +} + RabbitContextOptions& RabbitContextOptions::setMetricPublisher( const bsl::shared_ptr& metricPublisher) { diff --git a/src/rmq/rmqa/rmqa_rabbitcontextoptions.h b/src/rmq/rmqa/rmqa_rabbitcontextoptions.h index 0001d2d..3854254 100644 --- a/src/rmq/rmqa/rmqa_rabbitcontextoptions.h +++ b/src/rmq/rmqa/rmqa_rabbitcontextoptions.h @@ -78,6 +78,10 @@ class RabbitContextOptions { RabbitContextOptions& setErrorCallback(const rmqt::ErrorCallback& errorCallback); + /// \param successCallback function will be called when channel or + /// connection is restored + RabbitContextOptions& setSuccessCallback(const rmqt::SuccessCallback& successCallback); + /// \param name name of client property to set /// \param value value of client property /// NOTE: The following properties are set by default and can be @@ -144,6 +148,8 @@ class RabbitContextOptions { const rmqt::ErrorCallback& errorCallback() const { return d_onError; } + const rmqt::SuccessCallback& successCallback() const { return d_onSuccess; } + const rmqt::FieldTable& clientProperties() const { return d_clientProperties; @@ -184,6 +190,7 @@ class RabbitContextOptions { static const int DEFAULT_MESSAGE_PROCESSING_TIMEOUT = 60; bdlmt::ThreadPool* d_threadpool; rmqt::ErrorCallback d_onError; + rmqt::SuccessCallback d_onSuccess; bsl::shared_ptr d_metricPublisher; rmqt::FieldTable d_clientProperties; bsls::TimeInterval d_messageProcessingTimeout; diff --git a/src/rmq/rmqamqp/rmqamqp_connection.cpp b/src/rmq/rmqamqp/rmqamqp_connection.cpp index b13efcd..8adafed 100644 --- a/src/rmq/rmqamqp/rmqamqp_connection.cpp +++ b/src/rmq/rmqamqp/rmqamqp_connection.cpp @@ -984,11 +984,13 @@ Connection::Factory::Factory( const bsl::shared_ptr& resolver, const bsl::shared_ptr& timerFactory, const rmqt::ErrorCallback& errorCb, + const rmqt::SuccessCallback& successCb, const bsl::shared_ptr& metricPublisher, const bsl::shared_ptr& connectionMonitor, const rmqt::FieldTable& clientProperties, const bsl::optional& connectionErrorThreshold) : d_errorCb(errorCb) +, d_successCb(successCb) , d_clientProperties(clientProperties) , d_metricPublisher(metricPublisher) , d_resolver(resolver) @@ -1027,11 +1029,13 @@ bsl::shared_ptr Connection::Factory::newRetryHandler() bsl::make_shared( d_timerFactory, d_errorCb, + d_successCb, bsl::make_shared(), *d_connectionErrorThreshold)) : bsl::make_shared( d_timerFactory, d_errorCb, + d_successCb, bsl::make_shared()); } diff --git a/src/rmq/rmqamqp/rmqamqp_connection.h b/src/rmq/rmqamqp/rmqamqp_connection.h index 49d84e6..7c1411b 100644 --- a/src/rmq/rmqamqp/rmqamqp_connection.h +++ b/src/rmq/rmqamqp/rmqamqp_connection.h @@ -311,6 +311,7 @@ class Connection::Factory { Factory(const bsl::shared_ptr& resolver, const bsl::shared_ptr& timerFactory, const rmqt::ErrorCallback& errorCb, + const rmqt::SuccessCallback& successCb, const bsl::shared_ptr& metricPublisher, const bsl::shared_ptr& connectionMonitor, const rmqt::FieldTable& clientProperties, @@ -333,6 +334,7 @@ class Connection::Factory { Factory& operator=(const Factory&) BSLS_KEYWORD_DELETED; const rmqt::ErrorCallback d_errorCb; + const rmqt::SuccessCallback d_successCb; const rmqt::FieldTable d_clientProperties; const bsl::shared_ptr d_metricPublisher; const bsl::shared_ptr d_resolver; diff --git a/src/rmq/rmqio/rmqio_connectionretryhandler.cpp b/src/rmq/rmqio/rmqio_connectionretryhandler.cpp index 8fa15c9..9be4891 100644 --- a/src/rmq/rmqio/rmqio_connectionretryhandler.cpp +++ b/src/rmq/rmqio/rmqio_connectionretryhandler.cpp @@ -38,9 +38,10 @@ BALL_LOG_SET_NAMESPACE_CATEGORY("RMQIO.CONNECTIONRETRYHANDLER") ConnectionRetryHandler::ConnectionRetryHandler( const bsl::shared_ptr& timerFactory, const rmqt::ErrorCallback& errorCb, + const rmqt::SuccessCallback& successCb, const bsl::shared_ptr& retryStrategy, const bsls::TimeInterval& errorThreshold) -: RetryHandler(timerFactory, errorCb, retryStrategy) +: RetryHandler(timerFactory, errorCb, successCb, retryStrategy) , d_errorThreshold(errorThreshold) , d_errorSince() @@ -57,6 +58,7 @@ void ConnectionRetryHandler::success() { RetryHandler::success(); d_errorSince.reset(); + successCallback()(); } void ConnectionRetryHandler::evaluateErrorThreshold() diff --git a/src/rmq/rmqio/rmqio_connectionretryhandler.h b/src/rmq/rmqio/rmqio_connectionretryhandler.h index 64d8aaf..6d16d98 100644 --- a/src/rmq/rmqio/rmqio_connectionretryhandler.h +++ b/src/rmq/rmqio/rmqio_connectionretryhandler.h @@ -42,6 +42,7 @@ class ConnectionRetryHandler : public RetryHandler { /// there has been no success within ConnectionRetryHandler(const bsl::shared_ptr& timerFactory, const rmqt::ErrorCallback& errorCb, + const rmqt::SuccessCallback& successCb, const bsl::shared_ptr& retryStrategy, const bsls::TimeInterval& errorThreshold); diff --git a/src/rmq/rmqio/rmqio_retryhandler.cpp b/src/rmq/rmqio/rmqio_retryhandler.cpp index e685092..1757f28 100644 --- a/src/rmq/rmqio/rmqio_retryhandler.cpp +++ b/src/rmq/rmqio/rmqio_retryhandler.cpp @@ -32,6 +32,7 @@ BALL_LOG_SET_NAMESPACE_CATEGORY("RMQIO.RETRYHANDLER") RetryHandler::RetryHandler(const bsl::shared_ptr& timerFactory, const rmqt::ErrorCallback& errorCb, + const rmqt::SuccessCallback& successCb, const bsl::shared_ptr& retryStrategy) : d_sleepTimer(timerFactory->createWithCallback( bdlf::BindUtil::bind(&RetryHandler::handleRetry, @@ -40,6 +41,7 @@ RetryHandler::RetryHandler(const bsl::shared_ptr& timerFactory, , d_retryStrategy(retryStrategy) , d_retryCallback() , d_onError(errorCb) +, d_onSuccess(successCb) { } diff --git a/src/rmq/rmqio/rmqio_retryhandler.h b/src/rmq/rmqio/rmqio_retryhandler.h index cda03fd..87d180f 100644 --- a/src/rmq/rmqio/rmqio_retryhandler.h +++ b/src/rmq/rmqio/rmqio_retryhandler.h @@ -42,6 +42,7 @@ class RetryHandler { /// \param retryStrategy the retry strategy to use RetryHandler(const bsl::shared_ptr& timerFactory, const rmqt::ErrorCallback& errorCb, + const rmqt::SuccessCallback& successCb, const bsl::shared_ptr& retryStrategy); virtual ~RetryHandler() {} @@ -58,6 +59,11 @@ class RetryHandler { return d_onError; } + virtual const rmqt::SuccessCallback& successCallback() const + { + return d_onSuccess; + } + private: RetryHandler(const RetryHandler&) BSLS_KEYWORD_DELETED; RetryHandler& operator=(const RetryHandler&) BSLS_KEYWORD_DELETED; @@ -70,6 +76,7 @@ class RetryHandler { bsl::shared_ptr d_retryStrategy; RetryCallback d_retryCallback; rmqt::ErrorCallback d_onError; + rmqt::SuccessCallback d_onSuccess; }; } // namespace rmqio diff --git a/src/rmq/rmqt/rmqt_result.h b/src/rmq/rmqt/rmqt_result.h index 1c70017..531be87 100644 --- a/src/rmq/rmqt/rmqt_result.h +++ b/src/rmq/rmqt/rmqt_result.h @@ -167,6 +167,10 @@ typedef bsl::function /// The normal execution of program will not be stopped. /// The callback will only bubble up the error details to the client. +typedef bsl::function SuccessCallback; +/// SuccessCallback function will be called on client thread, +/// whenever channel or connection will be restored. + } // namespace rmqt } // namespace BloombergLP diff --git a/src/tests/rmqa/rmqa_connectionimpl.t.cpp b/src/tests/rmqa/rmqa_connectionimpl.t.cpp index 901be13..71cb630 100644 --- a/src/tests/rmqa/rmqa_connectionimpl.t.cpp +++ b/src/tests/rmqa/rmqa_connectionimpl.t.cpp @@ -116,6 +116,7 @@ class ConnectionTests : public ::testing::Test { NiceMock d_eventLoop; bsl::shared_ptr d_hb; rmqt::ErrorCallback d_onError; + rmqt::SuccessCallback d_onSuccess; bsl::shared_ptr d_retryHandler; bsl::shared_ptr d_channelFactory; bsl::shared_ptr d_metricPublisher; @@ -140,9 +141,11 @@ class ConnectionTests : public ::testing::Test { , d_eventLoop(d_timerFactory) , d_hb(new rmqamqp::HeartbeatManagerImpl(d_timerFactory)) , d_onError() + , d_onSuccess() , d_retryHandler(bsl::make_shared( d_timerFactory, d_onError, + d_onSuccess, bsl::make_shared())) , d_channelFactory(bsl::make_shared()) , d_metricPublisher(bsl::make_shared()) @@ -200,6 +203,7 @@ TEST_F(ConnectionTests, BreathingTest) d_eventLoop, d_threadPool, d_onError, + d_onSuccess, d_endpoint, d_tunables, d_consumerFactory, @@ -218,6 +222,7 @@ TEST_F(ConnectionTests, CreateSyncSuccess) d_eventLoop, d_threadPool, d_onError, + d_onSuccess, d_endpoint, d_tunables, d_consumerFactory, @@ -254,6 +259,7 @@ TEST_F(ConnectionTests, CreateConsumer) d_eventLoop, d_threadPool, d_onError, + d_onSuccess, d_endpoint, d_tunables, d_consumerFactory, @@ -277,6 +283,7 @@ TEST_F(ConnectionTests, CreateProducer) d_eventLoop, d_threadPool, d_onError, + d_onSuccess, d_endpoint, d_tunables, d_consumerFactory, @@ -298,6 +305,7 @@ TEST_F(ConnectionTests, CloseCreatesTimerAndInvokesClose) d_eventLoop, d_threadPool, d_onError, + d_onSuccess, d_endpoint, d_tunables, d_consumerFactory, @@ -326,6 +334,7 @@ TEST_F(ConnectionTests, GracefulCloseHitsTimeoutAndSuccessfulCloseRace) d_eventLoop, d_threadPool, d_onError, + d_onSuccess, d_endpoint, d_tunables, d_consumerFactory, diff --git a/src/tests/rmqamqp/rmqamqp_connection.t.cpp b/src/tests/rmqamqp/rmqamqp_connection.t.cpp index 97621f5..c01ad8d 100644 --- a/src/tests/rmqamqp/rmqamqp_connection.t.cpp +++ b/src/tests/rmqamqp/rmqamqp_connection.t.cpp @@ -65,6 +65,8 @@ const char* TEST_VHOST = "vhostname"; void noOpOnError(const bsl::string&, int) {} +void noOpOnSuccess() {} + void noopCloseHandler() {} void gracefulCloseHandler(bool& invoked) { invoked = true; } @@ -325,6 +327,7 @@ class ConnectionFactory : public rmqamqp::Connection::Factory { const bsl::shared_ptr& resolver, const bsl::shared_ptr& timerFactory, const rmqt::ErrorCallback& errorCb, + const rmqt::SuccessCallback& successCb, const bsl::shared_ptr& metricPublisher, const rmqt::FieldTable& clientProperties, const bsl::shared_ptr& retryHandler, @@ -333,6 +336,7 @@ class ConnectionFactory : public rmqamqp::Connection::Factory { : rmqamqp::Connection::Factory(resolver, timerFactory, errorCb, + successCb, metricPublisher, bsl::make_shared(), clientProperties, @@ -363,6 +367,7 @@ class ConnectionTests : public ::testing::Test { public: ReplayFrame d_replayFrame; rmqt::ErrorCallback d_errorCallback; + rmqt::SuccessCallback d_successCallback; bsl::shared_ptr d_resolver; bsl::shared_ptr d_retryHandler; bsl::shared_ptr d_timerFactory; @@ -393,6 +398,7 @@ class ConnectionTests : public ::testing::Test { ConnectionTests() : d_replayFrame() , d_errorCallback(noOpOnError) + , d_successCallback(noOpOnSuccess) , d_resolver(bsl::make_shared()) , d_retryHandler(bsl::make_shared()) , d_timerFactory(bsl::make_shared()) @@ -409,6 +415,7 @@ class ConnectionTests : public ::testing::Test { , d_factory(bsl::make_shared(d_resolver, d_timerFactory, d_errorCallback, + d_successCallback, d_metricPublisher, d_clientProperties, d_retryHandler, @@ -686,6 +693,7 @@ TEST_F(ConnectionTests, ClientProperties) d_factory = bsl::make_shared(d_resolver, d_timerFactory, d_errorCallback, + d_successCallback, d_metricPublisher, overriddenClientProperties, d_retryHandler, @@ -724,6 +732,7 @@ TEST_F(ConnectionTests, ClientPropertiesCantOverrideReservedOnes) d_factory = bsl::make_shared(d_resolver, d_timerFactory, d_errorCallback, + d_successCallback, d_metricPublisher, overriddenClientProperties, d_retryHandler, diff --git a/src/tests/rmqio/rmqio_connectionretryhandler.t.cpp b/src/tests/rmqio/rmqio_connectionretryhandler.t.cpp index 4395e78..c663d35 100644 --- a/src/tests/rmqio/rmqio_connectionretryhandler.t.cpp +++ b/src/tests/rmqio/rmqio_connectionretryhandler.t.cpp @@ -56,7 +56,9 @@ class ConnectionRetryHandlerTests : public ::testing::Test { d_retryStrategy; StrictMock > d_onErrorCallback; + StrictMock> d_onSuccessCallback; rmqt::ErrorCallback d_onError; + rmqt::SuccessCallback d_onSuccess; bdlt::CurrentTime::CurrentTimeCallback d_oldTimeCb; ConnectionRetryHandlerTests() @@ -68,6 +70,9 @@ class ConnectionRetryHandlerTests : public ::testing::Test { &d_onErrorCallback, bdlf::PlaceHolders::_1, bdlf::PlaceHolders::_2)) + , d_onSuccess(bdlf::BindUtil::bind( + &testing::MockFunction::Call, + &d_onSuccessCallback)) , d_oldTimeCb(bdlt::CurrentTime::setCurrentTimeCallback(fixedTimeCb<0, 0>)) { } @@ -90,13 +95,13 @@ class ConnectionRetryHandlerTests : public ::testing::Test { TEST_F(ConnectionRetryHandlerTests, Breathing) { ConnectionRetryHandler ConnectionRetryHandler( - d_timerFactory, d_onError, d_retryStrategy, bsls::TimeInterval(2)); + d_timerFactory, d_onError, d_onSuccess, d_retryStrategy, bsls::TimeInterval(2)); } TEST_F(ConnectionRetryHandlerTests, RetryWithoutWait) { ConnectionRetryHandler ConnectionRetryHandler( - d_timerFactory, d_onError, d_retryStrategy, bsls::TimeInterval(2)); + d_timerFactory, d_onError, d_onSuccess, d_retryStrategy, bsls::TimeInterval(2)); int numRetries = 0; retryExpectations(); @@ -114,6 +119,7 @@ TEST_F(ConnectionRetryHandlerTests, MultipleRetry) ConnectionRetryHandler ConnectionRetryHandler( d_timerFactory, d_onError, + d_onSuccess, d_retryStrategy, bsls::TimeInterval(2)); /* 3 */ int numRetries = 0; @@ -138,6 +144,7 @@ TEST_F(ConnectionRetryHandlerTests, MultipleRetryWithWait) ConnectionRetryHandler ConnectionRetryHandler( d_timerFactory, d_onError, + d_onSuccess, d_retryStrategy, bsls::TimeInterval(2)); /* 1,20 */ int numRetries = 0; @@ -164,6 +171,7 @@ TEST_F(ConnectionRetryHandlerTests, NoPrematureRetry) ConnectionRetryHandler ConnectionRetryHandler( d_timerFactory, d_onError, + d_onSuccess, d_retryStrategy, bsls::TimeInterval(2)); /* 1,20 */ int numRetries = 0; @@ -191,6 +199,7 @@ TEST_F(ConnectionRetryHandlerTests, MultipleRetryWithWaitLimit) ConnectionRetryHandler ConnectionRetryHandler( d_timerFactory, d_onError, + d_onSuccess, d_retryStrategy, bsls::TimeInterval(2)); /* 1,20, 21 */ int numRetries = 0; @@ -228,7 +237,7 @@ TEST_F(ConnectionRetryHandlerTests, RetryIsNotCalledAfterBeingDestroyed) { ConnectionRetryHandler ConnectionRetryHandler( - d_timerFactory, d_onError, d_retryStrategy, bsls::TimeInterval(2)); + d_timerFactory, d_onError, d_onSuccess, d_retryStrategy, bsls::TimeInterval(2)); retryExpectations(); ConnectionRetryHandler.retry(rmqtestutil::CallCount(&numRetries)); @@ -253,7 +262,7 @@ TEST_F(ConnectionRetryHandlerTests, { ConnectionRetryHandler ConnectionRetryHandler( - d_timerFactory, d_onError, d_retryStrategy, bsls::TimeInterval(2)); + d_timerFactory, d_onError, d_onSuccess, d_retryStrategy, bsls::TimeInterval(2)); retryExpectations(); ConnectionRetryHandler.retry(rmqtestutil::CallCount(&numRetries)); @@ -281,7 +290,7 @@ TEST_F(ConnectionRetryHandlerTests, int numRetries = 0; ConnectionRetryHandler ConnectionRetryHandler( - d_timerFactory, d_onError, d_retryStrategy, bsls::TimeInterval(2)); + d_timerFactory, d_onError, d_onSuccess, d_retryStrategy, bsls::TimeInterval(2)); retryExpectations(); ConnectionRetryHandler.retry(rmqtestutil::CallCount(&numRetries)); diff --git a/src/tests/rmqio/rmqio_retryhandler.t.cpp b/src/tests/rmqio/rmqio_retryhandler.t.cpp index 719ce63..11463a9 100644 --- a/src/tests/rmqio/rmqio_retryhandler.t.cpp +++ b/src/tests/rmqio/rmqio_retryhandler.t.cpp @@ -56,7 +56,9 @@ class RetryHandlerTests : public ::testing::Test { d_retryStrategy; StrictMock > d_onErrorCallback; + StrictMock> d_onSuccessCallback; rmqt::ErrorCallback d_onError; + rmqt::SuccessCallback d_onSuccess; bdlt::CurrentTime::CurrentTimeCallback d_oldTimeCb; RetryHandlerTests() @@ -68,6 +70,9 @@ class RetryHandlerTests : public ::testing::Test { &d_onErrorCallback, bdlf::PlaceHolders::_1, bdlf::PlaceHolders::_2)) + , d_onSuccess(bdlf::BindUtil::bind( + &testing::MockFunction::Call, + &d_onSuccessCallback)) , d_oldTimeCb(bdlt::CurrentTime::setCurrentTimeCallback(fixedTimeCb<0, 0>)) { } @@ -89,12 +94,12 @@ class RetryHandlerTests : public ::testing::Test { TEST_F(RetryHandlerTests, Breathing) { - RetryHandler retryHandler(d_timerFactory, d_onError, d_retryStrategy); + RetryHandler retryHandler(d_timerFactory, d_onError, d_onSuccess, d_retryStrategy); } TEST_F(RetryHandlerTests, RetryWithoutWait) { - RetryHandler retryHandler(d_timerFactory, d_onError, d_retryStrategy); + RetryHandler retryHandler(d_timerFactory, d_onError, d_onSuccess, d_retryStrategy); int numRetries = 0; retryExpectations(); @@ -110,7 +115,7 @@ TEST_F(RetryHandlerTests, MultipleRetry) { RetryHandler retryHandler( - d_timerFactory, d_onError, d_retryStrategy); /* 3 */ + d_timerFactory, d_onError, d_onSuccess, d_retryStrategy); /* 3 */ int numRetries = 0; retryExpectations(); @@ -131,7 +136,7 @@ TEST_F(RetryHandlerTests, MultipleRetry) TEST_F(RetryHandlerTests, MultipleRetryWithWait) { RetryHandler retryHandler( - d_timerFactory, d_onError, d_retryStrategy); /* 1,20 */ + d_timerFactory, d_onError, d_onSuccess, d_retryStrategy); /* 1,20 */ int numRetries = 0; retryExpectations(); @@ -154,7 +159,7 @@ TEST_F(RetryHandlerTests, MultipleRetryWithWait) TEST_F(RetryHandlerTests, NoPrematureRetry) { RetryHandler retryHandler( - d_timerFactory, d_onError, d_retryStrategy); /* 1,20 */ + d_timerFactory, d_onError, d_onSuccess, d_retryStrategy); /* 1,20 */ int numRetries = 0; retryExpectations(); @@ -178,7 +183,7 @@ TEST_F(RetryHandlerTests, NoPrematureRetry) TEST_F(RetryHandlerTests, MultipleRetryWithWaitLimit) { RetryHandler retryHandler( - d_timerFactory, d_onError, d_retryStrategy); /* 1,20, 21 */ + d_timerFactory, d_onError, d_onSuccess, d_retryStrategy); /* 1,20, 21 */ int numRetries = 0; retryExpectations(); @@ -213,7 +218,7 @@ TEST_F(RetryHandlerTests, RetryIsNotCalledAfterBeingDestroyed) int numRetries = 0; { - RetryHandler retryHandler(d_timerFactory, d_onError, d_retryStrategy); + RetryHandler retryHandler(d_timerFactory, d_onError, d_onSuccess, d_retryStrategy); retryExpectations(); retryHandler.retry(rmqtestutil::CallCount(&numRetries)); diff --git a/src/tests/rmqtestutil/rmqtestutil_mockretryhandler.t.h b/src/tests/rmqtestutil/rmqtestutil_mockretryhandler.t.h index 334dce0..76a352d 100644 --- a/src/tests/rmqtestutil/rmqtestutil_mockretryhandler.t.h +++ b/src/tests/rmqtestutil/rmqtestutil_mockretryhandler.t.h @@ -34,6 +34,7 @@ class MockRetryHandler : public rmqio::RetryHandler { explicit MockRetryHandler() : rmqio::RetryHandler(bsl::make_shared(), rmqt::ErrorCallback(), + rmqt::SuccessCallback(), bsl::make_shared()) { }