Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/rmq/rmqa/rmqa_rabbitcontext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,19 @@ bsl::shared_ptr<rmqa::VHost> RabbitContext::createVHostConnection(
.managedPtr()));
}

bsl::shared_ptr<rmqa::VHost> RabbitContext::createVHostConnection(
const bsl::string& userDefinedName,
const bsl::shared_ptr<rmqt::Endpoint>& endpoint,
const bsl::shared_ptr<rmqt::Credentials>& credentials,
const rmqt::ErrorCallback& errorCallback)
{
return bsl::shared_ptr<VHost>(
new VHost(d_impl
->createVHostConnection(
userDefinedName, endpoint, credentials, errorCallback)
.managedPtr()));
}

bsl::shared_ptr<rmqa::VHost>
RabbitContext::createVHostConnection(const bsl::string& userDefinedName,
const rmqt::VHostInfo& vhostInfo)
Expand Down
6 changes: 6 additions & 0 deletions src/rmq/rmqa/rmqa_rabbitcontext.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ class RabbitContext {
const bsl::shared_ptr<rmqt::Endpoint>& endpoint,
const bsl::shared_ptr<rmqt::Credentials>& credentials);

bsl::shared_ptr<VHost>
createVHostConnection(const bsl::string& userDefinedName,
const bsl::shared_ptr<rmqt::Endpoint>& endpoint,
const bsl::shared_ptr<rmqt::Credentials>& credentials,
const rmqt::ErrorCallback& errorCallback);

/// \brief Connect to a RabbitMQ broker
///
/// \param vhostInfo identifies the broker to connect to & authentication
Expand Down
26 changes: 24 additions & 2 deletions src/rmq/rmqa/rmqa_rabbitcontextimpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,6 @@ RabbitContextImpl::RabbitContextImpl(
d_eventLoop->resolver(
options.shuffleConnectionEndpoints().value_or(false)),
d_eventLoop->timerFactory(),
d_onError,
metricPublisher,
d_connectionMonitor,
options.clientProperties(),
Expand Down Expand Up @@ -281,6 +280,29 @@ bsl::shared_ptr<rmqp::Connection> RabbitContextImpl::createVHostConnection(
bdlf::PlaceHolders::_1));
}

bsl::shared_ptr<rmqp::Connection> RabbitContextImpl::createVHostConnection(
const bsl::string& userDefinedName,
const bsl::shared_ptr<rmqt::Endpoint>& endpoint,
const bsl::shared_ptr<rmqt::Credentials>& credentials,
const rmqt::ErrorCallback& errorCallback)
{
// Override the error callback provided in RabbitContext
// by a per vhost error callback
d_onError = bdlf::BindUtil::bind(&handleErrorCbOnEventLoop,
bsl::ref(d_threadPool),
errorCallback,
bdlf::PlaceHolders::_1,
bdlf::PlaceHolders::_2);

return bsl::make_shared<rmqa::VHostImpl>(
bdlf::BindUtil::bind(&RabbitContextImpl::createNewConnection,
this,
userDefinedName,
endpoint,
credentials,
bdlf::PlaceHolders::_1));
}

bsl::shared_ptr<rmqp::Connection>
RabbitContextImpl::createVHostConnection(const bsl::string& userDefinedName,
const rmqt::VHostInfo& vhostInfo)
Expand Down Expand Up @@ -333,7 +355,7 @@ rmqt::Future<rmqp::Connection> RabbitContextImpl::createNewConnection(
}

bsl::shared_ptr<rmqamqp::Connection> amqpConn =
d_connectionFactory->create(endpoint, credentials, name);
d_connectionFactory->create(endpoint, credentials, d_onError, name);

// The cancel function is given `amqpConn` which is what keeps it alive
// until the shared_ptr<rmqamqp::Connection> is retrieved in
Expand Down
6 changes: 6 additions & 0 deletions src/rmq/rmqa/rmqa_rabbitcontextimpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ class RabbitContextImpl : public rmqp::RabbitContext {
const bsl::shared_ptr<rmqt::Credentials>& credentials)
BSLS_KEYWORD_OVERRIDE;

bsl::shared_ptr<rmqp::Connection> createVHostConnection(
const bsl::string& userDefinedName,
const bsl::shared_ptr<rmqt::Endpoint>& endpoint,
const bsl::shared_ptr<rmqt::Credentials>& credentials,
const rmqt::ErrorCallback& errorCallback) BSLS_KEYWORD_OVERRIDE;

bsl::shared_ptr<rmqp::Connection> createVHostConnection(
const bsl::string& userDefinedName,
const rmqt::VHostInfo& endpoint) BSLS_KEYWORD_OVERRIDE;
Expand Down
14 changes: 7 additions & 7 deletions src/rmq/rmqamqp/rmqamqp_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -983,13 +983,11 @@ bsl::string Connection::connectionDebugName() const
Connection::Factory::Factory(
const bsl::shared_ptr<rmqio::Resolver>& resolver,
const bsl::shared_ptr<rmqio::TimerFactory>& timerFactory,
const rmqt::ErrorCallback& errorCb,
const bsl::shared_ptr<rmqp::MetricPublisher>& metricPublisher,
const bsl::shared_ptr<ConnectionMonitor>& connectionMonitor,
const rmqt::FieldTable& clientProperties,
const bsl::optional<bsls::TimeInterval>& connectionErrorThreshold)
: d_errorCb(errorCb)
, d_clientProperties(clientProperties)
: d_clientProperties(clientProperties)
, d_metricPublisher(metricPublisher)
, d_resolver(resolver)
, d_timerFactory(timerFactory)
Expand All @@ -1001,11 +999,12 @@ Connection::Factory::Factory(
bsl::shared_ptr<Connection> Connection::Factory::create(
const bsl::shared_ptr<rmqt::Endpoint>& endpoint,
const bsl::shared_ptr<rmqt::Credentials>& credentials,
const rmqt::ErrorCallback& errorCallback,
const bsl::string& name)
{
bsl::shared_ptr<rmqamqp::Connection> result(new rmqamqp::Connection(
d_resolver,
newRetryHandler(),
newRetryHandler(errorCallback),
newHeartBeatManager(),
d_timerFactory,
newChannelFactory(),
Expand All @@ -1020,18 +1019,19 @@ bsl::shared_ptr<Connection> Connection::Factory::create(
return result;
}

bsl::shared_ptr<rmqio::RetryHandler> Connection::Factory::newRetryHandler()
bsl::shared_ptr<rmqio::RetryHandler>
Connection::Factory::newRetryHandler(const rmqt::ErrorCallback& errorCallback)
{
return d_connectionErrorThreshold
? bsl::shared_ptr<rmqio::RetryHandler>(
bsl::make_shared<rmqio::ConnectionRetryHandler>(
d_timerFactory,
d_errorCb,
errorCallback,
bsl::make_shared<rmqio::BackoffLevelRetryStrategy>(),
*d_connectionErrorThreshold))
: bsl::make_shared<rmqio::RetryHandler>(
d_timerFactory,
d_errorCb,
errorCallback,
bsl::make_shared<rmqio::BackoffLevelRetryStrategy>());
}

Expand Down
6 changes: 3 additions & 3 deletions src/rmq/rmqamqp/rmqamqp_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,6 @@ class Connection::Factory {

Factory(const bsl::shared_ptr<rmqio::Resolver>& resolver,
const bsl::shared_ptr<rmqio::TimerFactory>& timerFactory,
const rmqt::ErrorCallback& errorCb,
const bsl::shared_ptr<rmqp::MetricPublisher>& metricPublisher,
const bsl::shared_ptr<ConnectionMonitor>& connectionMonitor,
const rmqt::FieldTable& clientProperties,
Expand All @@ -321,18 +320,19 @@ class Connection::Factory {
virtual bsl::shared_ptr<Connection>
create(const bsl::shared_ptr<rmqt::Endpoint>& endpoint,
const bsl::shared_ptr<rmqt::Credentials>& credentials,
const rmqt::ErrorCallback& errorCallback,
const bsl::string& name = "");

protected:
virtual bsl::shared_ptr<rmqio::RetryHandler> newRetryHandler();
virtual bsl::shared_ptr<rmqio::RetryHandler>
newRetryHandler(const rmqt::ErrorCallback& errorCallback);
virtual bsl::shared_ptr<rmqamqp::HeartbeatManager> newHeartBeatManager();
virtual bsl::shared_ptr<rmqamqp::ChannelFactory> newChannelFactory();

private:
Factory(const Factory&) BSLS_KEYWORD_DELETED;
Factory& operator=(const Factory&) BSLS_KEYWORD_DELETED;

const rmqt::ErrorCallback d_errorCb;
const rmqt::FieldTable d_clientProperties;
const bsl::shared_ptr<rmqp::MetricPublisher> d_metricPublisher;
const bsl::shared_ptr<rmqio::Resolver> d_resolver;
Expand Down
6 changes: 6 additions & 0 deletions src/rmq/rmqp/rmqp_rabbitcontext.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ class RabbitContext {
const bsl::shared_ptr<rmqt::Endpoint>& endpoint,
const bsl::shared_ptr<rmqt::Credentials>& credentials) = 0;

virtual bsl::shared_ptr<Connection>
createVHostConnection(const bsl::string& userDefinedName,
const bsl::shared_ptr<rmqt::Endpoint>& endpoint,
const bsl::shared_ptr<rmqt::Credentials>& credentials,
const rmqt::ErrorCallback& errorCallback) = 0;

/// \brief Connect to a RabbitMQ broker
///
/// \param vhostInfo identifies the broker to connect to & authentication
Expand Down
7 changes: 7 additions & 0 deletions src/rmqtestmocks/rmqtestmocks_mockrabbitcontext.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,13 @@ class MockRabbitContext : public rmqp::RabbitContext {
const bsl::shared_ptr<rmqt::Endpoint>& endpoint,
const bsl::shared_ptr<rmqt::Credentials>& credentials));

MOCK_METHOD4(createVHostConnection,
bsl::shared_ptr<rmqp::Connection>(
const bsl::string& userDefinedName,
const bsl::shared_ptr<rmqt::Endpoint>& endpoint,
const bsl::shared_ptr<rmqt::Credentials>& credentials,
const rmqt::ErrorCallback& errorCallback));

MOCK_METHOD2(
createVHostConnection,
bsl::shared_ptr<rmqp::Connection>(const bsl::string& userDefinedName,
Expand Down
10 changes: 3 additions & 7 deletions src/tests/rmqamqp/rmqamqp_connection.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -322,15 +322,13 @@ class ConnectionFactory : public rmqamqp::Connection::Factory {
ConnectionFactory(
const bsl::shared_ptr<rmqio::Resolver>& resolver,
const bsl::shared_ptr<rmqio::TimerFactory>& timerFactory,
const rmqt::ErrorCallback& errorCb,
const bsl::shared_ptr<rmqp::MetricPublisher>& metricPublisher,
const rmqt::FieldTable& clientProperties,
const bsl::shared_ptr<rmqio::RetryHandler>& retryHandler,
const bsl::shared_ptr<rmqamqp::HeartbeatManager>& hbManager,
const bsl::shared_ptr<rmqamqp::ChannelFactory>& channelFactory)
: rmqamqp::Connection::Factory(resolver,
timerFactory,
errorCb,
metricPublisher,
bsl::make_shared<MockConnectionMonitor>(),
clientProperties,
Expand All @@ -340,7 +338,8 @@ class ConnectionFactory : public rmqamqp::Connection::Factory {
, d_channelFactory(channelFactory)
{
}
bsl::shared_ptr<rmqio::RetryHandler> newRetryHandler() BSLS_KEYWORD_OVERRIDE
bsl::shared_ptr<rmqio::RetryHandler> newRetryHandler(
const rmqt::ErrorCallback& errorCallback) BSLS_KEYWORD_OVERRIDE
{
return d_retryHandler;
}
Expand Down Expand Up @@ -406,7 +405,6 @@ class ConnectionTests : public ::testing::Test {
, d_clientProperties(generateDefaultClientProperties())
, d_factory(bsl::make_shared<ConnectionFactory>(d_resolver,
d_timerFactory,
d_errorCallback,
d_metricPublisher,
d_clientProperties,
d_retryHandler,
Expand Down Expand Up @@ -636,7 +634,7 @@ class ConnectionTests : public ::testing::Test {
createAndStartConnection(const bsl::string& name = "test-connection")
{
bsl::shared_ptr<rmqamqp::Connection> conn =
d_factory->create(d_endpoint, d_credentials, name);
d_factory->create(d_endpoint, d_credentials, d_errorCallback, name);
conn->startFirstConnection(d_onConnectCb);

return conn;
Expand Down Expand Up @@ -683,7 +681,6 @@ TEST_F(ConnectionTests, ClientProperties)
rmqt::FieldValue(bsl::string("BAR")); // Add one more
d_factory = bsl::make_shared<ConnectionFactory>(d_resolver,
d_timerFactory,
d_errorCallback,
d_metricPublisher,
overriddenClientProperties,
d_retryHandler,
Expand Down Expand Up @@ -721,7 +718,6 @@ TEST_F(ConnectionTests, ClientPropertiesCantOverrideReservedOnes)
bsl::string("Should get overriden by library")); // Add one more
d_factory = bsl::make_shared<ConnectionFactory>(d_resolver,
d_timerFactory,
d_errorCallback,
d_metricPublisher,
overriddenClientProperties,
d_retryHandler,
Expand Down
Loading