diff --git a/.github/workflows/ci-4.x.yml b/.github/workflows/ci-4.x.yml index 4fb719c2a2..ba1e500435 100644 --- a/.github/workflows/ci-4.x.yml +++ b/.github/workflows/ci-4.x.yml @@ -16,7 +16,9 @@ jobs: strategy: matrix: os: [ubuntu-latest] - profile: [PostgreSQL-9,PostgreSQL-10,PostgreSQL-11,MySQL-8.0,MySQL-5.6,MySQL-5.7,MariaDB-10.4,MSSQL-2017-latest,MSSQL-2019-latest,DB2-11.5,Oracle-23,SQL-templates] + # profile: [PostgreSQL-9,PostgreSQL-10,PostgreSQL-11,MySQL-8.0,MySQL-5.6,MySQL-5.7,MariaDB-10.4,MSSQL-2017-latest,MSSQL-2019-latest,DB2-11.5,Oracle-23,SQL-templates] + # Removed MSSQL that fails in CI docker + profile: [PostgreSQL-9,PostgreSQL-10,PostgreSQL-11,MySQL-8.0,MySQL-5.6,MySQL-5.7,MariaDB-10.4,DB2-11.5,Oracle-23,SQL-templates] jdk: [8, 17] exclude: - profile: Oracle-23 diff --git a/vertx-db2-client/src/main/java/io/vertx/db2client/impl/DB2ConnectionFactory.java b/vertx-db2-client/src/main/java/io/vertx/db2client/impl/DB2ConnectionFactory.java index 00c4675431..2cbf621668 100644 --- a/vertx-db2-client/src/main/java/io/vertx/db2client/impl/DB2ConnectionFactory.java +++ b/vertx-db2-client/src/main/java/io/vertx/db2client/impl/DB2ConnectionFactory.java @@ -24,7 +24,6 @@ import io.vertx.core.net.SocketAddress; import io.vertx.core.net.impl.NetSocketInternal; import io.vertx.core.spi.metrics.ClientMetrics; -import io.vertx.core.spi.metrics.VertxMetrics; import io.vertx.db2client.DB2ConnectOptions; import io.vertx.sqlclient.SqlConnectOptions; import io.vertx.sqlclient.SqlConnection; @@ -55,8 +54,7 @@ protected Future doConnectInternal(SqlConnectOptions options, Contex int pipeliningLimit = db2Options.getPipeliningLimit(); NetClient netClient = netClient(options); return netClient.connect(server).flatMap(so -> { - VertxMetrics vertxMetrics = vertx.metricsSPI(); - ClientMetrics metrics = vertxMetrics != null ? vertxMetrics.createClientMetrics(db2Options.getSocketAddress(), "sql", db2Options.getMetricsName()) : null; + ClientMetrics metrics = clientMetricsProvider != null ? clientMetricsProvider.metricsFor(options) : null; DB2SocketConnection conn = new DB2SocketConnection((NetSocketInternal) so, metrics, db2Options, cachePreparedStatements, preparedStatementCacheSize, preparedStatementCacheSqlFilter, pipeliningLimit, context); conn.init(); diff --git a/vertx-db2-client/src/main/java/io/vertx/db2client/spi/DB2Driver.java b/vertx-db2-client/src/main/java/io/vertx/db2client/spi/DB2Driver.java index 0c6178f2f4..c8d68fc9ee 100644 --- a/vertx-db2-client/src/main/java/io/vertx/db2client/spi/DB2Driver.java +++ b/vertx-db2-client/src/main/java/io/vertx/db2client/spi/DB2Driver.java @@ -55,8 +55,8 @@ public Pool newPool(Vertx vertx, Supplier> databases, PoolOptions options, CloseFuture closeFuture) { boolean pipelinedPool = options instanceof Db2PoolOptions && ((Db2PoolOptions) options).isPipelined(); - PoolImpl pool = new PoolImpl(vertx, this, pipelinedPool, options, null, null, closeFuture); ConnectionFactory factory = createConnectionFactory(vertx, databases); + PoolImpl pool = new PoolImpl(vertx, this, pipelinedPool, options, factory.metricsProvider(), null, null, closeFuture); pool.connectionProvider(factory::connect); pool.init(); closeFuture.add(factory); diff --git a/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/MSSQLConnectionFactory.java b/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/MSSQLConnectionFactory.java index 1c73641d20..7b66f97a89 100644 --- a/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/MSSQLConnectionFactory.java +++ b/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/impl/MSSQLConnectionFactory.java @@ -19,7 +19,6 @@ import io.vertx.core.net.*; import io.vertx.core.net.impl.NetSocketInternal; import io.vertx.core.spi.metrics.ClientMetrics; -import io.vertx.core.spi.metrics.VertxMetrics; import io.vertx.mssqlclient.MSSQLConnectOptions; import io.vertx.sqlclient.SqlConnectOptions; import io.vertx.sqlclient.SqlConnection; @@ -73,8 +72,7 @@ private Future connectOrRedirect(MSSQLConnectOptions options, Contex } private MSSQLSocketConnection createSocketConnection(NetSocket so, MSSQLConnectOptions options, ContextInternal context) { - VertxMetrics vertxMetrics = vertx.metricsSPI(); - ClientMetrics metrics = vertxMetrics != null ? vertxMetrics.createClientMetrics(options.getSocketAddress(), "sql", options.getMetricsName()) : null; + ClientMetrics metrics = clientMetricsProvider != null ? clientMetricsProvider.metricsFor(options) : null; MSSQLSocketConnection conn = new MSSQLSocketConnection((NetSocketInternal) so, metrics, options, false, 0, sql -> true, 1, context); conn.init(); return conn; diff --git a/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/spi/MSSQLDriver.java b/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/spi/MSSQLDriver.java index 1b367ac82e..f437fcf01f 100644 --- a/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/spi/MSSQLDriver.java +++ b/vertx-mssql-client/src/main/java/io/vertx/mssqlclient/spi/MSSQLDriver.java @@ -57,8 +57,8 @@ public Pool newPool(Vertx vertx, Supplier> databases, PoolOptions options, CloseFuture closeFuture) { - PoolImpl pool = new PoolImpl(vertx, this, false, options, null, null, closeFuture); ConnectionFactory factory = createConnectionFactory(vertx, databases); + PoolImpl pool = new PoolImpl(vertx, this, false, options, factory.metricsProvider(), null, null, closeFuture); pool.connectionProvider(context -> factory.connect(context, databases.get())); pool.init(); closeFuture.add(factory); diff --git a/vertx-mssql-client/src/test/java/io/vertx/mssqlclient/tck/MSSQLMetricsTest.java b/vertx-mssql-client/src/test/java/io/vertx/mssqlclient/tck/MSSQLMetricsTest.java index c8be77ff40..4d2bd3c8e2 100644 --- a/vertx-mssql-client/src/test/java/io/vertx/mssqlclient/tck/MSSQLMetricsTest.java +++ b/vertx-mssql-client/src/test/java/io/vertx/mssqlclient/tck/MSSQLMetricsTest.java @@ -11,9 +11,9 @@ package io.vertx.mssqlclient.tck; -import io.vertx.core.Vertx; import io.vertx.mssqlclient.MSSQLBuilder; import io.vertx.mssqlclient.junit.MSSQLRule; +import io.vertx.sqlclient.ClientBuilder; import io.vertx.sqlclient.Pool; import io.vertx.sqlclient.tck.MetricsTestBase; import org.junit.ClassRule; @@ -24,8 +24,8 @@ public class MSSQLMetricsTest extends MetricsTestBase { public static MSSQLRule rule = MSSQLRule.SHARED_INSTANCE; @Override - protected Pool createPool(Vertx vertx) { - return MSSQLBuilder.pool(builder -> builder.connectingTo(rule.options()).using(vertx)); + protected ClientBuilder poolBuilder() { + return MSSQLBuilder.pool().connectingTo(rule.options()); } @Override diff --git a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/MySQLConnectionFactory.java b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/MySQLConnectionFactory.java index 6cd17abf1f..dc5951f7fa 100644 --- a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/MySQLConnectionFactory.java +++ b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/impl/MySQLConnectionFactory.java @@ -23,7 +23,6 @@ import io.vertx.core.net.TrustOptions; import io.vertx.core.net.impl.NetSocketInternal; import io.vertx.core.spi.metrics.ClientMetrics; -import io.vertx.core.spi.metrics.VertxMetrics; import io.vertx.mysqlclient.MySQLAuthenticationPlugin; import io.vertx.mysqlclient.MySQLConnectOptions; import io.vertx.mysqlclient.SslMode; @@ -127,8 +126,7 @@ private Future doConnect(MySQLConnectOptions options, SslMode sslMod MySQLAuthenticationPlugin authenticationPlugin = options.getAuthenticationPlugin(); Future fut = netClient(new NetClientOptions(options).setSsl(false)).connect(server); return fut.flatMap(so -> { - VertxMetrics vertxMetrics = vertx.metricsSPI(); - ClientMetrics metrics = vertxMetrics != null ? vertxMetrics.createClientMetrics(options.getSocketAddress(), "sql", options.getMetricsName()) : null; + ClientMetrics metrics = clientMetricsProvider != null ? clientMetricsProvider.metricsFor(options) : null; MySQLSocketConnection conn = new MySQLSocketConnection((NetSocketInternal) so, metrics, options, cachePreparedStatements, preparedStatementCacheMaxSize, preparedStatementCacheSqlFilter, pipeliningLimit, context); conn.init(); return Future.future(promise -> conn.sendStartupMessage(username, password, database, collation, serverRsaPublicKey, properties, sslMode, initialCapabilitiesFlags, charsetEncoding, authenticationPlugin, promise)); diff --git a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/spi/MySQLDriver.java b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/spi/MySQLDriver.java index 82abbf1cbb..6feba16afc 100644 --- a/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/spi/MySQLDriver.java +++ b/vertx-mysql-client/src/main/java/io/vertx/mysqlclient/spi/MySQLDriver.java @@ -55,8 +55,8 @@ public Pool newPool(Vertx vertx, Supplier> databases, PoolOptions options, CloseFuture closeFuture) { boolean pipelinedPool = options instanceof MySQLPoolOptions && ((MySQLPoolOptions) options).isPipelined(); - PoolImpl pool = new PoolImpl(vertx, this, pipelinedPool, options, null, null, closeFuture); ConnectionFactory factory = createConnectionFactory(vertx, databases); + PoolImpl pool = new PoolImpl(vertx, this, pipelinedPool, options, factory.metricsProvider(), null, null, closeFuture); pool.connectionProvider(context -> factory.connect(context, databases.get())); pool.init(); closeFuture.add(factory); diff --git a/vertx-oracle-client/src/main/java/io/vertx/oracleclient/impl/OracleConnectionFactory.java b/vertx-oracle-client/src/main/java/io/vertx/oracleclient/impl/OracleConnectionFactory.java index 924f7412c9..547038ad31 100644 --- a/vertx-oracle-client/src/main/java/io/vertx/oracleclient/impl/OracleConnectionFactory.java +++ b/vertx-oracle-client/src/main/java/io/vertx/oracleclient/impl/OracleConnectionFactory.java @@ -21,6 +21,10 @@ import io.vertx.oracleclient.OracleConnectOptions; import io.vertx.sqlclient.SqlConnectOptions; import io.vertx.sqlclient.SqlConnection; +import io.vertx.sqlclient.impl.SingletonSupplier; +import io.vertx.sqlclient.impl.metrics.ClientMetricsProvider; +import io.vertx.sqlclient.impl.metrics.DynamicClientMetricsProvider; +import io.vertx.sqlclient.impl.metrics.SingleServerClientMetricsProvider; import io.vertx.sqlclient.spi.ConnectionFactory; import oracle.jdbc.OracleConnection; import oracle.jdbc.datasource.OracleDataSource; @@ -36,15 +40,39 @@ public class OracleConnectionFactory implements ConnectionFactory { private final Supplier> options; private final Map datasources; + private final ClientMetricsProvider clientMetricsProvider; public OracleConnectionFactory(VertxInternal vertx, Supplier> options) { + VertxMetrics metrics = vertx.metricsSPI(); + ClientMetricsProvider clientMetricsProvider; + if (metrics != null) { + if (options instanceof SingletonSupplier) { + SqlConnectOptions option = (SqlConnectOptions) ((SingletonSupplier) options).unwrap(); + ClientMetrics clientMetrics = metrics.createClientMetrics(option.getSocketAddress(), "sql", option.getMetricsName()); + clientMetricsProvider = new SingleServerClientMetricsProvider(clientMetrics); + } else { + clientMetricsProvider = new DynamicClientMetricsProvider(metrics); + } + } else { + clientMetricsProvider = null; + } + this.clientMetricsProvider = clientMetricsProvider; this.options = options; this.datasources = new HashMap<>(); } + @Override + public ClientMetricsProvider metricsProvider() { + return clientMetricsProvider; + } + @Override public void close(Promise promise) { - promise.complete(); + if (clientMetricsProvider != null) { + clientMetricsProvider.close(promise); + } else { + promise.complete(); + } } @Override @@ -68,8 +96,7 @@ private OracleDataSource getDatasource(SqlConnectOptions options) { @Override public Future connect(Context context, SqlConnectOptions options) { OracleDataSource datasource = getDatasource(options); - VertxMetrics vertxMetrics = ((VertxInternal)context.owner()).metricsSPI(); - ClientMetrics metrics = vertxMetrics != null ? vertxMetrics.createClientMetrics(options.getSocketAddress(), "sql", options.getMetricsName()) : null; + ClientMetrics metrics = clientMetricsProvider != null ? clientMetricsProvider.metricsFor(options) : null; ContextInternal ctx = (ContextInternal) context; return executeBlocking(context, () -> { OracleConnection orac = datasource.createConnectionBuilder().build(); diff --git a/vertx-oracle-client/src/main/java/io/vertx/oracleclient/spi/OracleDriver.java b/vertx-oracle-client/src/main/java/io/vertx/oracleclient/spi/OracleDriver.java index 3992428672..7ff3fff590 100644 --- a/vertx-oracle-client/src/main/java/io/vertx/oracleclient/spi/OracleDriver.java +++ b/vertx-oracle-client/src/main/java/io/vertx/oracleclient/spi/OracleDriver.java @@ -52,8 +52,8 @@ public Pool newPool(Vertx vertx, Supplier> databases, PoolOptions options, CloseFuture closeFuture) { Function> afterAcquire = conn -> ((OracleJdbcConnection) conn).afterAcquire(); Function> beforeRecycle = conn -> ((OracleJdbcConnection) conn).beforeRecycle(); - PoolImpl pool = new PoolImpl(vertx, this, false, options, afterAcquire, beforeRecycle, closeFuture); ConnectionFactory factory = createConnectionFactory(vertx, databases); + PoolImpl pool = new PoolImpl(vertx, this, false, options, factory.metricsProvider(), afterAcquire, beforeRecycle, closeFuture); pool.connectionProvider(context -> factory.connect(context, databases.get())); pool.init(); closeFuture.add(factory); diff --git a/vertx-oracle-client/src/test/java/io/vertx/oracleclient/test/tck/OracleMetricsTest.java b/vertx-oracle-client/src/test/java/io/vertx/oracleclient/test/tck/OracleMetricsTest.java index 702181755a..389385203c 100644 --- a/vertx-oracle-client/src/test/java/io/vertx/oracleclient/test/tck/OracleMetricsTest.java +++ b/vertx-oracle-client/src/test/java/io/vertx/oracleclient/test/tck/OracleMetricsTest.java @@ -11,10 +11,10 @@ package io.vertx.oracleclient.test.tck; -import io.vertx.core.Vertx; import io.vertx.ext.unit.TestContext; import io.vertx.oracleclient.OracleBuilder; import io.vertx.oracleclient.test.junit.OracleRule; +import io.vertx.sqlclient.ClientBuilder; import io.vertx.sqlclient.Pool; import io.vertx.sqlclient.tck.MetricsTestBase; import org.junit.ClassRule; @@ -27,10 +27,8 @@ public class OracleMetricsTest extends MetricsTestBase { public static OracleRule rule = OracleRule.SHARED_INSTANCE; @Override - protected Pool createPool(Vertx vertx) { - return OracleBuilder.pool(builder -> builder - .connectingTo(rule.options()) - .using(vertx)); + protected ClientBuilder poolBuilder() { + return OracleBuilder.pool().connectingTo(rule.options()); } @Override diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionFactory.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionFactory.java index f98c09f4cb..a00c9eaa07 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionFactory.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/impl/PgConnectionFactory.java @@ -179,9 +179,7 @@ private PgSocketConnection newSocketConnection(ContextInternal context, NetSocke Predicate preparedStatementCacheSqlFilter = options.getPreparedStatementCacheSqlFilter(); int pipeliningLimit = options.getPipeliningLimit(); boolean useLayer7Proxy = options.getUseLayer7Proxy(); - VertxMetrics vertxMetrics = vertx.metricsSPI(); - ClientMetrics metrics = vertxMetrics != null ? vertxMetrics.createClientMetrics(options.getSocketAddress(), "sql", options.getMetricsName()) : null; - PgSocketConnection conn = new PgSocketConnection(socket, metrics, options, cachePreparedStatements, preparedStatementCacheMaxSize, preparedStatementCacheSqlFilter, pipeliningLimit, useLayer7Proxy, context); - return conn; + ClientMetrics metrics = clientMetricsProvider != null ? clientMetricsProvider.metricsFor(options) : null; + return new PgSocketConnection(socket, metrics, options, cachePreparedStatements, preparedStatementCacheMaxSize, preparedStatementCacheSqlFilter, pipeliningLimit, useLayer7Proxy, context); } } diff --git a/vertx-pg-client/src/main/java/io/vertx/pgclient/spi/PgDriver.java b/vertx-pg-client/src/main/java/io/vertx/pgclient/spi/PgDriver.java index 1e3b2de9bd..92d76c9b34 100644 --- a/vertx-pg-client/src/main/java/io/vertx/pgclient/spi/PgDriver.java +++ b/vertx-pg-client/src/main/java/io/vertx/pgclient/spi/PgDriver.java @@ -42,8 +42,8 @@ public Pool newPool(Vertx vertx, Supplier> databases, PoolOptions options, CloseFuture closeFuture) { boolean pipelinedPool = options instanceof PgPoolOptions && ((PgPoolOptions) options).isPipelined(); - PoolImpl pool = new PoolImpl(vertx, this, pipelinedPool, options, null, null, closeFuture); ConnectionFactory factory = createConnectionFactory(vertx, databases); + PoolImpl pool = new PoolImpl(vertx, this, pipelinedPool, options, factory.metricsProvider(), null, null, closeFuture); pool.connectionProvider(context -> factory.connect(context)); // BEWARE!!!! pool.init(); closeFuture.add(factory); diff --git a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgMetricsTest.java b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgMetricsTest.java index 644025b13a..4d009cdc5d 100644 --- a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgMetricsTest.java +++ b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgMetricsTest.java @@ -11,8 +11,8 @@ package io.vertx.pgclient; -import io.vertx.core.Vertx; import io.vertx.pgclient.junit.ContainerPgRule; +import io.vertx.sqlclient.ClientBuilder; import io.vertx.sqlclient.Pool; import io.vertx.sqlclient.tck.MetricsTestBase; import org.junit.ClassRule; @@ -23,8 +23,8 @@ public class PgMetricsTest extends MetricsTestBase { public static ContainerPgRule rule = new ContainerPgRule(); @Override - protected Pool createPool(Vertx vertx) { - return PgBuilder.pool().connectingTo(rule.options()).using(vertx).build(); + protected ClientBuilder poolBuilder() { + return PgBuilder.pool().connectingTo(rule.options()); } @Override diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/ConnectionFactoryBase.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/ConnectionFactoryBase.java index 0f1e1df77d..095855ee30 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/ConnectionFactoryBase.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/ConnectionFactoryBase.java @@ -21,8 +21,13 @@ import io.vertx.core.net.NetClient; import io.vertx.core.net.NetClientOptions; import io.vertx.core.net.impl.NetClientBuilder; +import io.vertx.core.spi.metrics.ClientMetrics; +import io.vertx.core.spi.metrics.VertxMetrics; import io.vertx.sqlclient.SqlConnectOptions; import io.vertx.sqlclient.SqlConnection; +import io.vertx.sqlclient.impl.metrics.ClientMetricsProvider; +import io.vertx.sqlclient.impl.metrics.DynamicClientMetricsProvider; +import io.vertx.sqlclient.impl.metrics.SingleServerClientMetricsProvider; import io.vertx.sqlclient.spi.ConnectionFactory; import java.util.HashMap; @@ -39,14 +44,30 @@ public abstract class ConnectionFactoryBase implements ConnectionFactory { protected final VertxInternal vertx; private final Map clients; protected final Supplier> options; + protected final ClientMetricsProvider clientMetricsProvider; // close hook protected final CloseFuture clientCloseFuture = new CloseFuture(); protected ConnectionFactoryBase(VertxInternal vertx, Supplier> options) { + VertxMetrics metrics = vertx.metricsSPI(); + ClientMetricsProvider clientMetricsProvider; + if (metrics != null) { + if (options instanceof SingletonSupplier) { + SqlConnectOptions option = (SqlConnectOptions) ((SingletonSupplier) options).unwrap(); + ClientMetrics clientMetrics = metrics.createClientMetrics(option.getSocketAddress(), "sql", option.getMetricsName()); + clientMetricsProvider = new SingleServerClientMetricsProvider(clientMetrics); + } else { + clientMetricsProvider = new DynamicClientMetricsProvider(metrics); + } + clientCloseFuture.add(clientMetricsProvider); + } else { + clientMetricsProvider = null; + } this.vertx = vertx; this.options = options; this.clients = new HashMap<>(); + this.clientMetricsProvider = clientMetricsProvider; } private NetClient createNetClient(NetClientOptions options) { @@ -121,4 +142,8 @@ private void doConnectWithRetry(SqlConnectOptions options, PromiseInternal doConnectInternal(SqlConnectOptions options, ContextInternal context); + @Override + public ClientMetricsProvider metricsProvider() { + return clientMetricsProvider; + } } diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolImpl.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolImpl.java index 7249132979..e957b03d7c 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolImpl.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolImpl.java @@ -25,6 +25,7 @@ import io.vertx.core.impl.future.PromiseInternal; import io.vertx.sqlclient.*; import io.vertx.sqlclient.impl.command.CommandBase; +import io.vertx.sqlclient.impl.metrics.ClientMetricsProvider; import io.vertx.sqlclient.impl.pool.SqlConnectionPool; import io.vertx.sqlclient.spi.Driver; @@ -56,6 +57,7 @@ public PoolImpl(VertxInternal vertx, Driver driver, boolean pipelined, PoolOptions poolOptions, + ClientMetricsProvider clientMetricsProvider, Function> afterAcquire, Function> beforeRecycle, CloseFuture closeFuture) { @@ -69,8 +71,8 @@ public PoolImpl(VertxInternal vertx, this.pipelined = pipelined; this.vertx = vertx; this.pool = new SqlConnectionPool(ctx -> connectionProvider.apply(ctx), () -> connectionInitializer, - afterAcquire, beforeRecycle, vertx, idleTimeout, maxLifetime, poolOptions.getMaxSize(), pipelined, - poolOptions.getMaxWaitQueueSize(), poolOptions.getEventLoopSize()); + clientMetricsProvider, afterAcquire, beforeRecycle, vertx, idleTimeout, maxLifetime, poolOptions.getMaxSize(), + pipelined, poolOptions.getMaxWaitQueueSize(), poolOptions.getEventLoopSize()); this.closeFuture = closeFuture; } diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/metrics/ClientMetricsProvider.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/metrics/ClientMetricsProvider.java new file mode 100644 index 0000000000..1e38110d9b --- /dev/null +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/metrics/ClientMetricsProvider.java @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2011-2022 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package io.vertx.sqlclient.impl.metrics; + +import io.vertx.core.Closeable; +import io.vertx.core.Promise; +import io.vertx.core.spi.metrics.ClientMetrics; +import io.vertx.sqlclient.SqlConnectOptions; + +/** + * Provides client metrics for a connection + */ +public interface ClientMetricsProvider extends Closeable { + + ClientMetrics metricsFor(SqlConnectOptions options); + + @Override + default void close(Promise completion) { + try { + close(); + } catch (Exception e) { + completion.fail(e); + return; + } + completion.complete(); + } + + void close(); + +} diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/metrics/ClientMetricsWrapper.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/metrics/ClientMetricsWrapper.java new file mode 100644 index 0000000000..a7e313f306 --- /dev/null +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/metrics/ClientMetricsWrapper.java @@ -0,0 +1,104 @@ +/* + * Copyright (c) 2011-2022 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package io.vertx.sqlclient.impl.metrics; + +import io.vertx.core.impl.logging.Logger; +import io.vertx.core.impl.logging.LoggerFactory; +import io.vertx.core.spi.metrics.ClientMetrics; + +/** + * Wraps an {@link #actual} wrapper, metrics operations are delegated, error are logged using the {@link ClientMetricsProvider} name. + */ +public class ClientMetricsWrapper implements ClientMetrics { + + private static final Logger logger = LoggerFactory.getLogger(ClientMetricsProvider.class); + + protected final ClientMetrics actual; + + public ClientMetricsWrapper(ClientMetrics actual) { + this.actual = actual; + } + + @Override + public T enqueueRequest() { + try { + return actual.enqueueRequest(); + } catch (Exception e) { + logger.error("Metrics failure", e); + } + return null; + } + @Override + public void dequeueRequest(T taskMetric) { + try { + actual.dequeueRequest(taskMetric); + } catch (Exception e) { + logger.error("Metrics failure", e); + } + } + @Override + public M requestBegin(String uri, Req request) { + try { + return actual.requestBegin(uri, request); + } catch (Exception e) { + logger.error("Metrics failure", e); + } + return null; + } + @Override + public void requestEnd(M requestMetric) { + try { + actual.requestEnd(requestMetric); + } catch (Exception e) { + logger.error("Metrics failure", e); + } + } + @Override + public void requestEnd(M requestMetric, long bytesWritten) { + try { + actual.requestEnd(requestMetric, bytesWritten); + } catch (Exception e) { + logger.error("Metrics failure", e); + } + } + @Override + public void responseBegin(M requestMetric, Resp response) { + try { + actual.responseBegin(requestMetric, response); + } catch (Exception e) { + logger.error("Metrics failure", e); + } + } + @Override + public void requestReset(M requestMetric) { + try { + actual.requestReset(requestMetric); + } catch (Exception e) { + logger.error("Metrics failure", e); + } + } + @Override + public void responseEnd(M requestMetric) { + try { + actual.responseEnd(requestMetric); + } catch (Exception e) { + logger.error("Metrics failure", e); + } + } + @Override + public void responseEnd(M requestMetric, long bytesRead) { + try { + actual.responseEnd(requestMetric, bytesRead); + } catch (Exception e) { + logger.error("Metrics failure", e); + } + } +} diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/metrics/DynamicClientMetricsProvider.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/metrics/DynamicClientMetricsProvider.java new file mode 100644 index 0000000000..51444f99c6 --- /dev/null +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/metrics/DynamicClientMetricsProvider.java @@ -0,0 +1,104 @@ +/* + * Copyright (c) 2011-2022 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package io.vertx.sqlclient.impl.metrics; + +import io.vertx.core.net.SocketAddress; +import io.vertx.core.spi.metrics.ClientMetrics; +import io.vertx.core.spi.metrics.VertxMetrics; +import io.vertx.sqlclient.SqlConnectOptions; + +import java.util.*; + +/** + * A metrics provider that is not coupled to a single database. + */ +public final class DynamicClientMetricsProvider implements ClientMetricsProvider { + + private final Map> metricsMap = new HashMap<>(); + private final VertxMetrics metrics; + private boolean closed; + + public DynamicClientMetricsProvider(VertxMetrics metrics) { + this.metrics = metrics; + } + + public ClientMetrics metricsFor(SqlConnectOptions options) { + SocketAddress address = options.getSocketAddress(); + synchronized (this) { + if (closed) { + return null; + } + Wrapper wrapper = metricsMap.get(address); + if (wrapper == null) { + ClientMetrics actual = metrics.createClientMetrics(address, "sql", options.getMetricsName()); + wrapper = new Wrapper<>(address, this, actual); + metricsMap.put(address, wrapper); + } + return wrapper; + } + } + + @Override + public void close() { + List> toClose = new ArrayList<>(); + synchronized (this) { + if (closed) { + return; + } + closed = true; + for (Wrapper wrapper : metricsMap.values()) { + ClientMetrics actual = wrapper.doClose(); + if (actual != null) { + toClose.add(actual); + } + } + metricsMap.clear(); + } + for (ClientMetrics metrics : toClose) { + metrics.close(); + } + } + + private static class Wrapper extends ClientMetricsWrapper { + + private final SocketAddress socketAddress; + private final DynamicClientMetricsProvider provider; + private int refCount; + + public Wrapper(SocketAddress socketAddress, DynamicClientMetricsProvider provider, ClientMetrics actualMetrics) { + super(actualMetrics); + this.socketAddress = socketAddress; + this.provider = provider; + this.refCount = 1; + } + + // Call under sync + ClientMetrics doClose() { + if (refCount > 0) { + refCount = 0; + return actual; + } else { + return null; + } + } + + @Override + public void close() { + synchronized (provider) { + if (--refCount > 0) { + return; + } + provider.metricsMap.remove(socketAddress); + } + actual.close(); + } + } +} diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/metrics/SingleServerClientMetricsProvider.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/metrics/SingleServerClientMetricsProvider.java new file mode 100644 index 0000000000..4478a0b287 --- /dev/null +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/metrics/SingleServerClientMetricsProvider.java @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2011-2022 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package io.vertx.sqlclient.impl.metrics; + +import io.vertx.core.spi.metrics.ClientMetrics; +import io.vertx.sqlclient.SqlConnectOptions; + +/** + * A metrics provider for a single database for which we can reasonably record queue metrics. + */ +public final class SingleServerClientMetricsProvider implements ClientMetricsProvider { + + private final UncloseableClientMetrics metrics; + + public SingleServerClientMetricsProvider(ClientMetrics metrics) { + this.metrics = new UncloseableClientMetrics<>(metrics); + } + + public ClientMetrics metrics() { + return metrics; + } + + @Override + public ClientMetrics metricsFor(SqlConnectOptions options) { + return metrics; + } + + @Override + public void close() { + metrics.actual.close(); + } + + static class UncloseableClientMetrics extends ClientMetricsWrapper { + public UncloseableClientMetrics(ClientMetrics actual) { + super(actual); + } + } +} diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java index 98bbbde714..27095df05d 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java @@ -26,6 +26,8 @@ import io.vertx.sqlclient.impl.SqlConnectionBase; import io.vertx.sqlclient.impl.command.CommandBase; import io.vertx.sqlclient.impl.command.QueryCommandBase; +import io.vertx.sqlclient.impl.metrics.ClientMetricsProvider; +import io.vertx.sqlclient.impl.metrics.SingleServerClientMetricsProvider; import io.vertx.sqlclient.impl.tracing.QueryReporter; import io.vertx.sqlclient.spi.ConnectionFactory; import io.vertx.sqlclient.spi.DatabaseMetadata; @@ -48,6 +50,7 @@ public class SqlConnectionPool { private final VertxInternal vertx; private final ConnectionPool pool; private final Supplier> hook; + private final ClientMetrics clientMetrics; private final Function> afterAcquire; private final Function> beforeRecycle; private final boolean pipelined; @@ -57,6 +60,7 @@ public class SqlConnectionPool { public SqlConnectionPool(Function> connectionProvider, Supplier> hook, + ClientMetricsProvider clientMetricsProvider, Function> afterAcquire, Function> beforeRecycle, VertxInternal vertx, @@ -79,6 +83,7 @@ public SqlConnectionPool(Function> connectionProv this.maxLifetime = maxLifetime; this.maxSize = maxSize; this.hook = hook; + this.clientMetrics = clientMetricsProvider instanceof SingleServerClientMetricsProvider ? ((SingleServerClientMetricsProvider)clientMetricsProvider).metrics() : null; this.connectionProvider = connectionProvider; this.afterAcquire = afterAcquire; this.beforeRecycle = beforeRecycle; @@ -160,9 +165,11 @@ public void evict() { public Future execute(ContextInternal context, CommandBase cmd) { Promise> p = context.promise(); + Object queueMetric = enqueueMetric(); pool.acquire(context, 0, p); return p.future().compose(lease -> { PooledConnection pooled = lease.get(); + dequeueMetric(queueMetric); Connection conn = pooled.conn; Future future; if (afterAcquire != null) { @@ -181,8 +188,14 @@ public Future execute(ContextInternal context, CommandBase cmd) { public void acquire(ContextInternal context, long timeout, Handler> handler) { class PoolRequest implements PoolWaiter.Listener, Handler>> { + + private final Object queueMetric; private long timerID = -1L; + public PoolRequest(Object queueMetric) { + this.queueMetric = queueMetric; + } + @Override public void handle(AsyncResult> ar) { if (timerID != -1L) { @@ -211,6 +224,7 @@ private void handle(Lease lease) { PooledConnection pooled = lease.get(); pooled.lease = lease; handler.handle(Future.succeededFuture(pooled)); + dequeueMetric(queueMetric); } @Override @@ -235,10 +249,32 @@ public void onConnect(PoolWaiter waiter) { onEnqueue(waiter); } } - PoolRequest request = new PoolRequest(); + Object queueMetric = enqueueMetric(); + PoolRequest request = new PoolRequest(queueMetric); pool.acquire(context, request, 0, request); } + private Object enqueueMetric() { + if (clientMetrics != null) { + try { + return clientMetrics.enqueueRequest(); + } catch (Exception e) { + // Log + } + } + return null; + } + + private void dequeueMetric(Object metric) { + if (clientMetrics != null) { + try { + clientMetrics.dequeueRequest(metric); + } catch (Exception e) { + // Log + } + } + } + public Future close() { Promise promise = vertx.promise(); pool.close(ar1 -> { diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/spi/ConnectionFactory.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/spi/ConnectionFactory.java index d0b52d0d10..e7816963c3 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/spi/ConnectionFactory.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/spi/ConnectionFactory.java @@ -4,6 +4,7 @@ import io.vertx.core.impl.ContextInternal; import io.vertx.sqlclient.SqlConnectOptions; import io.vertx.sqlclient.SqlConnection; +import io.vertx.sqlclient.impl.metrics.ClientMetricsProvider; /** * A connection factory, can be obtained from {@link Driver#createConnectionFactory} @@ -42,4 +43,6 @@ default Future connect(Context context, Future connect(Context context, SqlConnectOptions options); + ClientMetricsProvider metricsProvider(); + } diff --git a/vertx-sql-client/src/test/java/io/vertx/sqlclient/tck/MetricsTestBase.java b/vertx-sql-client/src/test/java/io/vertx/sqlclient/tck/MetricsTestBase.java index da13d67236..4743fc816d 100644 --- a/vertx-sql-client/src/test/java/io/vertx/sqlclient/tck/MetricsTestBase.java +++ b/vertx-sql-client/src/test/java/io/vertx/sqlclient/tck/MetricsTestBase.java @@ -21,15 +21,15 @@ import io.vertx.ext.unit.Async; import io.vertx.ext.unit.TestContext; import io.vertx.ext.unit.junit.VertxUnitRunner; -import io.vertx.sqlclient.Pool; -import io.vertx.sqlclient.SqlConnection; -import io.vertx.sqlclient.Tuple; +import io.vertx.sqlclient.*; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -66,7 +66,15 @@ protected Pool getPool() { return pool; } - protected abstract Pool createPool(Vertx vertx); + protected abstract ClientBuilder poolBuilder(); + + protected Pool createPool(Vertx vertx) { + return createPool(vertx, new PoolOptions()); + } + + protected Pool createPool(Vertx vertx, PoolOptions options) { + return poolBuilder().with(options).using(vertx).build(); + } protected abstract String statement(String... parts); @@ -90,6 +98,43 @@ public void close() { } } + @Test + public void testQueuing(TestContext ctx) throws Exception { + AtomicInteger queueSize = new AtomicInteger(); + List enqueueMetrics = Collections.synchronizedList(new ArrayList<>()); + List dequeueMetrics = Collections.synchronizedList(new ArrayList<>()); + metrics = new ClientMetrics() { + @Override + public Object enqueueRequest() { + Object metric = new Object(); + enqueueMetrics.add(metric); + queueSize.incrementAndGet(); + return metric; + } + @Override + public void dequeueRequest(Object taskMetric) { + dequeueMetrics.add(taskMetric); + queueSize.decrementAndGet(); + } + }; + Pool pool = createPool(vertx, new PoolOptions().setMaxSize(1)); + SqlConnection conn = pool.getConnection().toCompletionStage().toCompletableFuture().get(20, TimeUnit.SECONDS); + int num = 16; + List> futures = new ArrayList<>(); + for (int i = 0;i < num;i++) { + futures.add(pool.query("SELECT * FROM immutable WHERE id=1").execute()); + } + long now = System.currentTimeMillis(); + while (queueSize.get() != num) { + ctx.assertTrue(System.currentTimeMillis() - now < 20_000); + Thread.sleep(100); + } + conn.close(); + Future.join(futures).toCompletionStage().toCompletableFuture().get(20, TimeUnit.SECONDS); + ctx.assertEquals(0, queueSize.get()); + ctx.assertEquals(enqueueMetrics, dequeueMetrics); + } + @Test public void testSimpleQuery(TestContext ctx) { Function> fn = conn -> conn.query("SELECT * FROM immutable WHERE id=1").execute();