Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/ci-4.x.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ jobs:
strategy:
matrix:
os: [ubuntu-latest]
profile: [PostgreSQL-9,PostgreSQL-10,PostgreSQL-11,MySQL-8.0,MySQL-5.6,MySQL-5.7,MariaDB-10.4,MSSQL-2017-latest,MSSQL-2019-latest,DB2-11.5,Oracle-23,SQL-templates]
# profile: [PostgreSQL-9,PostgreSQL-10,PostgreSQL-11,MySQL-8.0,MySQL-5.6,MySQL-5.7,MariaDB-10.4,MSSQL-2017-latest,MSSQL-2019-latest,DB2-11.5,Oracle-23,SQL-templates]
# Removed MSSQL that fails in CI docker
profile: [PostgreSQL-9,PostgreSQL-10,PostgreSQL-11,MySQL-8.0,MySQL-5.6,MySQL-5.7,MariaDB-10.4,DB2-11.5,Oracle-23,SQL-templates]
jdk: [8, 17]
exclude:
- profile: Oracle-23
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import io.vertx.core.net.SocketAddress;
import io.vertx.core.net.impl.NetSocketInternal;
import io.vertx.core.spi.metrics.ClientMetrics;
import io.vertx.core.spi.metrics.VertxMetrics;
import io.vertx.db2client.DB2ConnectOptions;
import io.vertx.sqlclient.SqlConnectOptions;
import io.vertx.sqlclient.SqlConnection;
Expand Down Expand Up @@ -55,8 +54,7 @@ protected Future<Connection> doConnectInternal(SqlConnectOptions options, Contex
int pipeliningLimit = db2Options.getPipeliningLimit();
NetClient netClient = netClient(options);
return netClient.connect(server).flatMap(so -> {
VertxMetrics vertxMetrics = vertx.metricsSPI();
ClientMetrics metrics = vertxMetrics != null ? vertxMetrics.createClientMetrics(db2Options.getSocketAddress(), "sql", db2Options.getMetricsName()) : null;
ClientMetrics metrics = clientMetricsProvider != null ? clientMetricsProvider.metricsFor(options) : null;
DB2SocketConnection conn = new DB2SocketConnection((NetSocketInternal) so, metrics, db2Options, cachePreparedStatements,
preparedStatementCacheSize, preparedStatementCacheSqlFilter, pipeliningLimit, context);
conn.init();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ public Pool newPool(Vertx vertx, Supplier<? extends Future<? extends SqlConnectO

private PoolImpl newPoolImpl(VertxInternal vertx, Supplier<? extends Future<? extends SqlConnectOptions>> databases, PoolOptions options, CloseFuture closeFuture) {
boolean pipelinedPool = options instanceof Db2PoolOptions && ((Db2PoolOptions) options).isPipelined();
PoolImpl pool = new PoolImpl(vertx, this, pipelinedPool, options, null, null, closeFuture);
ConnectionFactory factory = createConnectionFactory(vertx, databases);
PoolImpl pool = new PoolImpl(vertx, this, pipelinedPool, options, factory.metricsProvider(), null, null, closeFuture);
pool.connectionProvider(factory::connect);
pool.init();
closeFuture.add(factory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import io.vertx.core.net.*;
import io.vertx.core.net.impl.NetSocketInternal;
import io.vertx.core.spi.metrics.ClientMetrics;
import io.vertx.core.spi.metrics.VertxMetrics;
import io.vertx.mssqlclient.MSSQLConnectOptions;
import io.vertx.sqlclient.SqlConnectOptions;
import io.vertx.sqlclient.SqlConnection;
Expand Down Expand Up @@ -73,8 +72,7 @@ private Future<Connection> connectOrRedirect(MSSQLConnectOptions options, Contex
}

private MSSQLSocketConnection createSocketConnection(NetSocket so, MSSQLConnectOptions options, ContextInternal context) {
VertxMetrics vertxMetrics = vertx.metricsSPI();
ClientMetrics metrics = vertxMetrics != null ? vertxMetrics.createClientMetrics(options.getSocketAddress(), "sql", options.getMetricsName()) : null;
ClientMetrics metrics = clientMetricsProvider != null ? clientMetricsProvider.metricsFor(options) : null;
MSSQLSocketConnection conn = new MSSQLSocketConnection((NetSocketInternal) so, metrics, options, false, 0, sql -> true, 1, context);
conn.init();
return conn;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ public Pool newPool(Vertx vertx, Supplier<? extends Future<? extends SqlConnectO
}

private PoolImpl newPoolImpl(VertxInternal vertx, Supplier<? extends Future<? extends SqlConnectOptions>> databases, PoolOptions options, CloseFuture closeFuture) {
PoolImpl pool = new PoolImpl(vertx, this, false, options, null, null, closeFuture);
ConnectionFactory factory = createConnectionFactory(vertx, databases);
PoolImpl pool = new PoolImpl(vertx, this, false, options, factory.metricsProvider(), null, null, closeFuture);
pool.connectionProvider(context -> factory.connect(context, databases.get()));
pool.init();
closeFuture.add(factory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@

package io.vertx.mssqlclient.tck;

import io.vertx.core.Vertx;
import io.vertx.mssqlclient.MSSQLBuilder;
import io.vertx.mssqlclient.junit.MSSQLRule;
import io.vertx.sqlclient.ClientBuilder;
import io.vertx.sqlclient.Pool;
import io.vertx.sqlclient.tck.MetricsTestBase;
import org.junit.ClassRule;
Expand All @@ -24,8 +24,8 @@ public class MSSQLMetricsTest extends MetricsTestBase {
public static MSSQLRule rule = MSSQLRule.SHARED_INSTANCE;

@Override
protected Pool createPool(Vertx vertx) {
return MSSQLBuilder.pool(builder -> builder.connectingTo(rule.options()).using(vertx));
protected ClientBuilder<Pool> poolBuilder() {
return MSSQLBuilder.pool().connectingTo(rule.options());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import io.vertx.core.net.TrustOptions;
import io.vertx.core.net.impl.NetSocketInternal;
import io.vertx.core.spi.metrics.ClientMetrics;
import io.vertx.core.spi.metrics.VertxMetrics;
import io.vertx.mysqlclient.MySQLAuthenticationPlugin;
import io.vertx.mysqlclient.MySQLConnectOptions;
import io.vertx.mysqlclient.SslMode;
Expand Down Expand Up @@ -127,8 +126,7 @@ private Future<Connection> doConnect(MySQLConnectOptions options, SslMode sslMod
MySQLAuthenticationPlugin authenticationPlugin = options.getAuthenticationPlugin();
Future<NetSocket> fut = netClient(new NetClientOptions(options).setSsl(false)).connect(server);
return fut.flatMap(so -> {
VertxMetrics vertxMetrics = vertx.metricsSPI();
ClientMetrics metrics = vertxMetrics != null ? vertxMetrics.createClientMetrics(options.getSocketAddress(), "sql", options.getMetricsName()) : null;
ClientMetrics metrics = clientMetricsProvider != null ? clientMetricsProvider.metricsFor(options) : null;
MySQLSocketConnection conn = new MySQLSocketConnection((NetSocketInternal) so, metrics, options, cachePreparedStatements, preparedStatementCacheMaxSize, preparedStatementCacheSqlFilter, pipeliningLimit, context);
conn.init();
return Future.future(promise -> conn.sendStartupMessage(username, password, database, collation, serverRsaPublicKey, properties, sslMode, initialCapabilitiesFlags, charsetEncoding, authenticationPlugin, promise));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ public Pool newPool(Vertx vertx, Supplier<? extends Future<? extends SqlConnectO

private PoolImpl newPoolImpl(VertxInternal vertx, Supplier<? extends Future<? extends SqlConnectOptions>> databases, PoolOptions options, CloseFuture closeFuture) {
boolean pipelinedPool = options instanceof MySQLPoolOptions && ((MySQLPoolOptions) options).isPipelined();
PoolImpl pool = new PoolImpl(vertx, this, pipelinedPool, options, null, null, closeFuture);
ConnectionFactory factory = createConnectionFactory(vertx, databases);
PoolImpl pool = new PoolImpl(vertx, this, pipelinedPool, options, factory.metricsProvider(), null, null, closeFuture);
pool.connectionProvider(context -> factory.connect(context, databases.get()));
pool.init();
closeFuture.add(factory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
import io.vertx.oracleclient.OracleConnectOptions;
import io.vertx.sqlclient.SqlConnectOptions;
import io.vertx.sqlclient.SqlConnection;
import io.vertx.sqlclient.impl.SingletonSupplier;
import io.vertx.sqlclient.impl.metrics.ClientMetricsProvider;
import io.vertx.sqlclient.impl.metrics.DynamicClientMetricsProvider;
import io.vertx.sqlclient.impl.metrics.SingleServerClientMetricsProvider;
import io.vertx.sqlclient.spi.ConnectionFactory;
import oracle.jdbc.OracleConnection;
import oracle.jdbc.datasource.OracleDataSource;
Expand All @@ -36,15 +40,39 @@ public class OracleConnectionFactory implements ConnectionFactory {

private final Supplier<? extends Future<? extends SqlConnectOptions>> options;
private final Map<JsonObject, OracleDataSource> datasources;
private final ClientMetricsProvider clientMetricsProvider;

public OracleConnectionFactory(VertxInternal vertx, Supplier<? extends Future<? extends SqlConnectOptions>> options) {
VertxMetrics metrics = vertx.metricsSPI();
ClientMetricsProvider clientMetricsProvider;
if (metrics != null) {
if (options instanceof SingletonSupplier) {
SqlConnectOptions option = (SqlConnectOptions) ((SingletonSupplier) options).unwrap();
ClientMetrics<?, ?, ?, ?> clientMetrics = metrics.createClientMetrics(option.getSocketAddress(), "sql", option.getMetricsName());
clientMetricsProvider = new SingleServerClientMetricsProvider(clientMetrics);
} else {
clientMetricsProvider = new DynamicClientMetricsProvider(metrics);
}
} else {
clientMetricsProvider = null;
}
this.clientMetricsProvider = clientMetricsProvider;
this.options = options;
this.datasources = new HashMap<>();
}

@Override
public ClientMetricsProvider metricsProvider() {
return clientMetricsProvider;
}

@Override
public void close(Promise<Void> promise) {
promise.complete();
if (clientMetricsProvider != null) {
clientMetricsProvider.close(promise);
} else {
promise.complete();
}
}

@Override
Expand All @@ -68,8 +96,7 @@ private OracleDataSource getDatasource(SqlConnectOptions options) {
@Override
public Future<SqlConnection> connect(Context context, SqlConnectOptions options) {
OracleDataSource datasource = getDatasource(options);
VertxMetrics vertxMetrics = ((VertxInternal)context.owner()).metricsSPI();
ClientMetrics metrics = vertxMetrics != null ? vertxMetrics.createClientMetrics(options.getSocketAddress(), "sql", options.getMetricsName()) : null;
ClientMetrics metrics = clientMetricsProvider != null ? clientMetricsProvider.metricsFor(options) : null;
ContextInternal ctx = (ContextInternal) context;
return executeBlocking(context, () -> {
OracleConnection orac = datasource.createConnectionBuilder().build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ public Pool newPool(Vertx vertx, Supplier<? extends Future<? extends SqlConnectO
private PoolImpl newPoolImpl(VertxInternal vertx, Supplier<? extends Future<? extends SqlConnectOptions>> databases, PoolOptions options, CloseFuture closeFuture) {
Function<Connection, Future<Void>> afterAcquire = conn -> ((OracleJdbcConnection) conn).afterAcquire();
Function<Connection, Future<Void>> beforeRecycle = conn -> ((OracleJdbcConnection) conn).beforeRecycle();
PoolImpl pool = new PoolImpl(vertx, this, false, options, afterAcquire, beforeRecycle, closeFuture);
ConnectionFactory factory = createConnectionFactory(vertx, databases);
PoolImpl pool = new PoolImpl(vertx, this, false, options, factory.metricsProvider(), afterAcquire, beforeRecycle, closeFuture);
pool.connectionProvider(context -> factory.connect(context, databases.get()));
pool.init();
closeFuture.add(factory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@

package io.vertx.oracleclient.test.tck;

import io.vertx.core.Vertx;
import io.vertx.ext.unit.TestContext;
import io.vertx.oracleclient.OracleBuilder;
import io.vertx.oracleclient.test.junit.OracleRule;
import io.vertx.sqlclient.ClientBuilder;
import io.vertx.sqlclient.Pool;
import io.vertx.sqlclient.tck.MetricsTestBase;
import org.junit.ClassRule;
Expand All @@ -27,10 +27,8 @@ public class OracleMetricsTest extends MetricsTestBase {
public static OracleRule rule = OracleRule.SHARED_INSTANCE;

@Override
protected Pool createPool(Vertx vertx) {
return OracleBuilder.pool(builder -> builder
.connectingTo(rule.options())
.using(vertx));
protected ClientBuilder<Pool> poolBuilder() {
return OracleBuilder.pool().connectingTo(rule.options());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,7 @@ private PgSocketConnection newSocketConnection(ContextInternal context, NetSocke
Predicate<String> preparedStatementCacheSqlFilter = options.getPreparedStatementCacheSqlFilter();
int pipeliningLimit = options.getPipeliningLimit();
boolean useLayer7Proxy = options.getUseLayer7Proxy();
VertxMetrics vertxMetrics = vertx.metricsSPI();
ClientMetrics metrics = vertxMetrics != null ? vertxMetrics.createClientMetrics(options.getSocketAddress(), "sql", options.getMetricsName()) : null;
PgSocketConnection conn = new PgSocketConnection(socket, metrics, options, cachePreparedStatements, preparedStatementCacheMaxSize, preparedStatementCacheSqlFilter, pipeliningLimit, useLayer7Proxy, context);
return conn;
ClientMetrics metrics = clientMetricsProvider != null ? clientMetricsProvider.metricsFor(options) : null;
return new PgSocketConnection(socket, metrics, options, cachePreparedStatements, preparedStatementCacheMaxSize, preparedStatementCacheSqlFilter, pipeliningLimit, useLayer7Proxy, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ public Pool newPool(Vertx vertx, Supplier<? extends Future<? extends SqlConnectO

private PoolImpl newPoolImpl(VertxInternal vertx, Supplier<? extends Future<? extends SqlConnectOptions>> databases, PoolOptions options, CloseFuture closeFuture) {
boolean pipelinedPool = options instanceof PgPoolOptions && ((PgPoolOptions) options).isPipelined();
PoolImpl pool = new PoolImpl(vertx, this, pipelinedPool, options, null, null, closeFuture);
ConnectionFactory factory = createConnectionFactory(vertx, databases);
PoolImpl pool = new PoolImpl(vertx, this, pipelinedPool, options, factory.metricsProvider(), null, null, closeFuture);
pool.connectionProvider(context -> factory.connect(context)); // BEWARE!!!!
pool.init();
closeFuture.add(factory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@

package io.vertx.pgclient;

import io.vertx.core.Vertx;
import io.vertx.pgclient.junit.ContainerPgRule;
import io.vertx.sqlclient.ClientBuilder;
import io.vertx.sqlclient.Pool;
import io.vertx.sqlclient.tck.MetricsTestBase;
import org.junit.ClassRule;
Expand All @@ -23,8 +23,8 @@ public class PgMetricsTest extends MetricsTestBase {
public static ContainerPgRule rule = new ContainerPgRule();

@Override
protected Pool createPool(Vertx vertx) {
return PgBuilder.pool().connectingTo(rule.options()).using(vertx).build();
protected ClientBuilder<Pool> poolBuilder() {
return PgBuilder.pool().connectingTo(rule.options());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,13 @@
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetClientOptions;
import io.vertx.core.net.impl.NetClientBuilder;
import io.vertx.core.spi.metrics.ClientMetrics;
import io.vertx.core.spi.metrics.VertxMetrics;
import io.vertx.sqlclient.SqlConnectOptions;
import io.vertx.sqlclient.SqlConnection;
import io.vertx.sqlclient.impl.metrics.ClientMetricsProvider;
import io.vertx.sqlclient.impl.metrics.DynamicClientMetricsProvider;
import io.vertx.sqlclient.impl.metrics.SingleServerClientMetricsProvider;
import io.vertx.sqlclient.spi.ConnectionFactory;

import java.util.HashMap;
Expand All @@ -39,14 +44,30 @@ public abstract class ConnectionFactoryBase implements ConnectionFactory {
protected final VertxInternal vertx;
private final Map<JsonObject, NetClient> clients;
protected final Supplier<? extends Future<? extends SqlConnectOptions>> options;
protected final ClientMetricsProvider clientMetricsProvider;

// close hook
protected final CloseFuture clientCloseFuture = new CloseFuture();

protected ConnectionFactoryBase(VertxInternal vertx, Supplier<? extends Future<? extends SqlConnectOptions>> options) {
VertxMetrics metrics = vertx.metricsSPI();
ClientMetricsProvider clientMetricsProvider;
if (metrics != null) {
if (options instanceof SingletonSupplier) {
SqlConnectOptions option = (SqlConnectOptions) ((SingletonSupplier) options).unwrap();
ClientMetrics<?, ?, ?, ?> clientMetrics = metrics.createClientMetrics(option.getSocketAddress(), "sql", option.getMetricsName());
clientMetricsProvider = new SingleServerClientMetricsProvider(clientMetrics);
} else {
clientMetricsProvider = new DynamicClientMetricsProvider(metrics);
}
clientCloseFuture.add(clientMetricsProvider);
} else {
clientMetricsProvider = null;
}
this.vertx = vertx;
this.options = options;
this.clients = new HashMap<>();
this.clientMetricsProvider = clientMetricsProvider;
}

private NetClient createNetClient(NetClientOptions options) {
Expand Down Expand Up @@ -121,4 +142,8 @@ private void doConnectWithRetry(SqlConnectOptions options, PromiseInternal<Conne
*/
protected abstract Future<Connection> doConnectInternal(SqlConnectOptions options, ContextInternal context);

@Override
public ClientMetricsProvider metricsProvider() {
return clientMetricsProvider;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.sqlclient.*;
import io.vertx.sqlclient.impl.command.CommandBase;
import io.vertx.sqlclient.impl.metrics.ClientMetricsProvider;
import io.vertx.sqlclient.impl.pool.SqlConnectionPool;
import io.vertx.sqlclient.spi.Driver;

Expand Down Expand Up @@ -56,6 +57,7 @@ public PoolImpl(VertxInternal vertx,
Driver driver,
boolean pipelined,
PoolOptions poolOptions,
ClientMetricsProvider clientMetricsProvider,
Function<Connection, Future<Void>> afterAcquire,
Function<Connection, Future<Void>> beforeRecycle,
CloseFuture closeFuture) {
Expand All @@ -69,8 +71,8 @@ public PoolImpl(VertxInternal vertx,
this.pipelined = pipelined;
this.vertx = vertx;
this.pool = new SqlConnectionPool(ctx -> connectionProvider.apply(ctx), () -> connectionInitializer,
afterAcquire, beforeRecycle, vertx, idleTimeout, maxLifetime, poolOptions.getMaxSize(), pipelined,
poolOptions.getMaxWaitQueueSize(), poolOptions.getEventLoopSize());
clientMetricsProvider, afterAcquire, beforeRecycle, vertx, idleTimeout, maxLifetime, poolOptions.getMaxSize(),
pipelined, poolOptions.getMaxWaitQueueSize(), poolOptions.getEventLoopSize());
this.closeFuture = closeFuture;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright (c) 2011-2022 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.sqlclient.impl.metrics;

import io.vertx.core.Closeable;
import io.vertx.core.Promise;
import io.vertx.core.spi.metrics.ClientMetrics;
import io.vertx.sqlclient.SqlConnectOptions;

/**
* Provides client metrics for a connection
*/
public interface ClientMetricsProvider extends Closeable {

ClientMetrics<?, ?, ?, ?> metricsFor(SqlConnectOptions options);

@Override
default void close(Promise<Void> completion) {
try {
close();
} catch (Exception e) {
completion.fail(e);
return;
}
completion.complete();
}

void close();

}
Loading