Skip to content

Commit cadd0a8

Browse files
committed
[refactor] Simplify connection creation by removing database parameter
1 parent 906cfc5 commit cadd0a8

File tree

3 files changed

+6
-9
lines changed

3 files changed

+6
-9
lines changed

flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/ClickHouseShardInputFormat.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,7 @@ public void open(InputSplit split) {
109109
// PreparedStatement.
110110
String shardUrl = shardMap.get((Integer) shardIds[i]);
111111
ClickHouseConnection connection =
112-
connectionProvider.createAndStoreShardConnection(
113-
shardUrl, engineFullSchema.getDatabase());
112+
connectionProvider.createAndStoreShardConnection(shardUrl);
114113
String query =
115114
getQuery(engineFullSchema.getTable(), engineFullSchema.getDatabase());
116115
PreparedStatement statement = connection.prepareStatement(query);

flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/ClickHouseShardOutputFormat.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,7 @@ protected ClickHouseShardOutputFormat(
9090
public void open(int taskNumber, int numTasks) throws IOException {
9191
try {
9292
Map<Integer, ClickHouseConnection> connectionMap =
93-
connectionProvider.createShardConnections(
94-
clusterSpec, shardTableSchema.getDatabase());
93+
connectionProvider.createShardConnections(clusterSpec);
9594
for (Map.Entry<Integer, ClickHouseConnection> connectionEntry :
9695
connectionMap.entrySet()) {
9796
ClickHouseExecutor executor =

flink-connector-clickhouse/src/main/java/org/apache/flink/connector/clickhouse/internal/connection/ClickHouseConnectionProvider.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -75,21 +75,20 @@ public synchronized ClickHouseConnection getOrCreateConnection() throws SQLExcep
7575
}
7676

7777
public synchronized Map<Integer, ClickHouseConnection> createShardConnections(
78-
ClusterSpec clusterSpec, String defaultDatabase) throws SQLException {
78+
ClusterSpec clusterSpec) throws SQLException {
7979
Map<Integer, ClickHouseConnection> connectionMap = new HashMap<>();
8080
String urlSuffix = options.getUrlSuffix();
8181
for (ShardSpec shardSpec : clusterSpec.getShards()) {
8282
String shardUrl = shardSpec.getJdbcUrls() + urlSuffix;
83-
ClickHouseConnection connection =
84-
createAndStoreShardConnection(shardUrl, defaultDatabase);
83+
ClickHouseConnection connection = createAndStoreShardConnection(shardUrl);
8584
connectionMap.put(shardSpec.getNum(), connection);
8685
}
8786

8887
return connectionMap;
8988
}
9089

91-
public synchronized ClickHouseConnection createAndStoreShardConnection(
92-
String url, String database) throws SQLException {
90+
public synchronized ClickHouseConnection createAndStoreShardConnection(String url)
91+
throws SQLException {
9392
if (shardConnections == null) {
9493
shardConnections = new ArrayList<>();
9594
}

0 commit comments

Comments
 (0)