diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md index c1e1684aea7..c239981adc2 100644 --- a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md +++ b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md @@ -1315,6 +1315,14 @@ of servers -- that is, when deploying clusters of servers. leader election. If you want to test multiple servers on a single machine, then different ports can be used for each server. +* *hostProvider.dnsSrvRefreshIntervalMs* : + (Java system property: **zookeeper.hostProvider.dnsSrvRefreshIntervalMs**) + **New in 3.10.0:** + The refresh interval in milliseconds for DNS SRV record lookups when using DnsSrvHostProvider. + This property controls how frequently the DNS SRV records are queried to update the server list. + A value of 0 disables periodic refresh. + + The default value is 60000 (60 seconds). Since ZooKeeper 3.6.0 it is possible to specify **multiple addresses** for each diff --git a/zookeeper-server/pom.xml b/zookeeper-server/pom.xml index b2ecf1902cd..789608619de 100644 --- a/zookeeper-server/pom.xml +++ b/zookeeper-server/pom.xml @@ -190,6 +190,11 @@ commons-io compile + + dnsjava + dnsjava + 3.5.1 + diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java index 7533e01a9d0..42b48d51cd7 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java @@ -45,7 +45,9 @@ import org.apache.zookeeper.Watcher.WatcherType; import org.apache.zookeeper.client.Chroot; import org.apache.zookeeper.client.ConnectStringParser; +import org.apache.zookeeper.client.ConnectionType; import org.apache.zookeeper.client.HostProvider; +import org.apache.zookeeper.client.HostProviderFactory; import org.apache.zookeeper.client.StaticHostProvider; import org.apache.zookeeper.client.ZKClientConfig; import org.apache.zookeeper.client.ZooKeeperBuilder; @@ -1142,9 +1144,10 @@ public ZooKeeper(ZooKeeperOptions options) throws IOException { if (options.getHostProvider() != null) { hostProvider = options.getHostProvider().apply(connectStringParser.getServerAddresses()); } else { - hostProvider = new StaticHostProvider(connectStringParser.getServerAddresses(), clientConfig); + hostProvider = HostProviderFactory.createHostProvider(connectString, clientConfig); } this.hostProvider = hostProvider; + validateHostProviderCompatibility(connectString, hostProvider); chroot = Chroot.ofNullable(connectStringParser.getChrootPath()); cnxn = createConnection( @@ -1332,6 +1335,11 @@ public synchronized void close() throws InterruptedException { LOG.debug("Ignoring unexpected exception during close", e); } + // Close the host provider to release any resources + if (hostProvider != null) { + hostProvider.close(); + } + LOG.info("Session: 0x{} closed", Long.toHexString(getSessionId())); } @@ -3202,4 +3210,26 @@ public synchronized List whoAmI() throws InterruptedException { return response.getClientInfo(); } + /** + * Validates compatibility between connectString and hostProvider. + * + * @param connectString the connection string provided by user + * @param hostProvider the host provider provided by user + * @throws IllegalArgumentException if incompatible combination is detected + */ + private static void validateHostProviderCompatibility(final String connectString, final HostProvider hostProvider) { + final ConnectionType connectStringType = ConnectStringParser.getConnectionType(connectString); + final ConnectionType supportedConnectStringType = hostProvider.getSupportedConnectionType(); + + if (connectStringType != supportedConnectStringType) { + final String hostProviderName = hostProvider.getClass().getSimpleName(); + + LOG.error("Connection string type {} is incompatible with host provider type {}: connectString={}, hostProvider={}", + connectStringType.getName(), supportedConnectStringType.getName(), connectString, hostProviderName); + throw new IllegalArgumentException( + String.format("Connection string type %s is incompatible with host provider type %s", + connectStringType.getName(), hostProviderName)); + } + } + } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/client/ConnectStringParser.java b/zookeeper-server/src/main/java/org/apache/zookeeper/client/ConnectStringParser.java index 5d292819622..66ee7070cf3 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/client/ConnectStringParser.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/client/ConnectStringParser.java @@ -24,6 +24,7 @@ import java.util.List; import org.apache.zookeeper.common.NetUtils; import org.apache.zookeeper.common.PathUtils; +import org.apache.zookeeper.common.StringUtils; /** * A parser for ZooKeeper Client connect strings. @@ -38,9 +39,9 @@ public final class ConnectStringParser { private static final int DEFAULT_PORT = 2181; + private static final String DNS_SRV_PREFIX = "dns-srv://"; - private final String chrootPath; - + private String chrootPath; private final ArrayList serverAddresses = new ArrayList<>(); /** @@ -49,27 +50,71 @@ public final class ConnectStringParser { * @throws IllegalArgumentException * for an invalid chroot path. */ - public ConnectStringParser(String connectString) { - // parse out chroot, if any - int off = connectString.indexOf('/'); - if (off >= 0) { - String chrootPath = connectString.substring(off); - // ignore "/" chroot spec, same as null - if (chrootPath.length() == 1) { - this.chrootPath = null; - } else { - PathUtils.validatePath(chrootPath); - this.chrootPath = chrootPath; - } - connectString = connectString.substring(0, off); + public ConnectStringParser(final String connectString) { + if (StringUtils.isBlank(connectString)) { + throw new IllegalArgumentException("Connect string cannot be null or empty"); + } + + if (connectString.startsWith(DNS_SRV_PREFIX)) { + parseDnsSrvFormat(connectString); } else { - this.chrootPath = null; + parseHostPortFormat(connectString); } + } - List hostsList = split(connectString, ","); + public String getChrootPath() { + return chrootPath; + } + + public ArrayList getServerAddresses() { + return serverAddresses; + } + + + /** + * Gets the connection type for the given connect string. + * + * @param connectString the connection string to analyze + * @return ConnectionType.DNS_SRV if it's a DNS SRV connect string, ConnectionType.HOST_PORT otherwise + */ + public static ConnectionType getConnectionType(final String connectString) { + if (connectString == null) { + throw new IllegalArgumentException("connect string cannot be null"); + } + return connectString.startsWith(DNS_SRV_PREFIX) + ? ConnectionType.DNS_SRV : ConnectionType.HOST_PORT; + } + + /** + * Parse DNS SRV connection string format: dns-srv://service.domain.com/chroot + * @throws IllegalArgumentException for an invalid chroot path. + */ + private void parseDnsSrvFormat(final String connectString) { + final String dnsName = connectString.substring(DNS_SRV_PREFIX.length()); + + final String[] parts = extractChrootPath(dnsName); + final String dnsServiceName = parts[0]; + + chrootPath = parts[1]; + // The DNS service name is stored as a placeholder address + // The actual resolution will be handled by DnsSrvHostProvider + serverAddresses.add(InetSocketAddress.createUnresolved(dnsServiceName, DEFAULT_PORT)); + } + + /** + * Parse host and port by splitting client connectString + * with support for IPv6 literals + * @throws IllegalArgumentException for an invalid chroot path. + */ + private void parseHostPortFormat(String connectString) { + final String[] parts = extractChrootPath(connectString); + final String serversPart = parts[0]; + chrootPath = parts[1]; + + final List hostsList = split(serversPart, ","); for (String host : hostsList) { int port = DEFAULT_PORT; - String[] hostAndPort = NetUtils.getIPV6HostAndPort(host); + final String[] hostAndPort = NetUtils.getIPV6HostAndPort(host); if (hostAndPort.length != 0) { host = hostAndPort[0]; if (hostAndPort.length == 2) { @@ -89,12 +134,30 @@ public ConnectStringParser(String connectString) { } } - public String getChrootPath() { - return chrootPath; - } + /** + * Extract chroot path from a connection string. + * + * @param connectionString the connection string that may contain a chroot path + * @return array where [0] is the server part (before chroot) and [1] is the chroot path (or null) + * @throws IllegalArgumentException for an invalid chroot path + */ + private String[] extractChrootPath(final String connectionString) { + String serverPart = connectionString; + String chrootPath = null; - public ArrayList getServerAddresses() { - return serverAddresses; + // parse out chroot, if any + final int chrootIndex = connectionString.indexOf('/'); + if (chrootIndex >= 0) { + chrootPath = connectionString.substring(chrootIndex); + // ignore "/" chroot spec, same as null + if (chrootPath.length() == 1) { + chrootPath = null; + } else { + PathUtils.validatePath(chrootPath); + } + serverPart = connectionString.substring(0, chrootIndex); + } + return new String[]{serverPart, chrootPath}; } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/client/ConnectionType.java b/zookeeper-server/src/main/java/org/apache/zookeeper/client/ConnectionType.java new file mode 100644 index 00000000000..60ceac9869e --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/client/ConnectionType.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.client; + +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Represents the type of connection resolution used by ZooKeeper clients. + * This is an internal enum used for connection string and provider validation. + */ +@InterfaceAudience.Private +public enum ConnectionType { + /** + * Traditional host:port static server list. + */ + HOST_PORT("Host:Port"), + + /** + * DNS SRV record-based service discovery. + */ + DNS_SRV("DNS SRV"); + + private final String name; + + ConnectionType(final String name) { + this.name = name; + } + + /** + * Returns the name for this connection type. + * @return the name + */ + public String getName() { + return name; + } +} diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/client/DnsSrvHostProvider.java b/zookeeper-server/src/main/java/org/apache/zookeeper/client/DnsSrvHostProvider.java new file mode 100644 index 00000000000..8444b8b838b --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/client/DnsSrvHostProvider.java @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.client; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.zookeeper.common.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.xbill.DNS.Lookup; +import org.xbill.DNS.Record; +import org.xbill.DNS.SRVRecord; +import org.xbill.DNS.Type; + +/** + * DNS SRV-based HostProvider that dynamically resolves host port names from DNS SRV records. + * + *

This implementation periodically refreshes the server list by querying DNS SRV records + * and uses HostConnectionManager for all connection management and reconfiguration logic.

+ * + * + *

Two-Phase Update Strategy:

+ *
    + *
  • Phase 1 (Background): Timer thread detects DNS changes without blocking connection request
  • + *
  • Phase 2 (Connection-time): Changes applied during actual connection attempts
  • + *
+ */ +@InterfaceAudience.Public +public final class DnsSrvHostProvider implements HostProvider { + + public interface DnsResolver { + /** + * Performs a DNS SRV lookup for the specified name. + * + * @param dnsName the DNS name to look up + * @return array of SRV records, empty array if no records found + * @throws IOException if DNS lookup encounters an error + */ + SRVRecord[] lookupSrvRecords(String dnsName) throws IOException; + } + + private static final Logger LOG = LoggerFactory.getLogger(DnsSrvHostProvider.class); + + private final String dnsName; + private final DnsResolver dnsResolver; + private final long refreshIntervalMs; + private final Timer dnsRefreshTimer; + private HostConnectionManager connectionManager; + + // Track the current connected host for accurate load balancing decisions + private final AtomicReference currentConnectedHost = new AtomicReference<>(); + // Track the previous server list to detect changes + private volatile List previousServerList; + private volatile List latestServerList; + private final AtomicBoolean serverListChanged = new AtomicBoolean(false); + + /** + * Constructs a DnsSrvHostProvider with the given DNS name + * + * @param dnsName the DNS name to query for SRV records + * @throws IllegalArgumentException if dnsName is null or empty or invalid + */ + public DnsSrvHostProvider(final String dnsName) { + this(dnsName, null); + } + + /** + * Constructs a DnsSrvHostProvider with the given DNS name and ZKClientConfig + * + * @param dnsName the DNS name to query for SRV records + * @param clientConfig ZooKeeper client configuration + * @throws IllegalArgumentException if dnsName is null or empty or invalid + */ + public DnsSrvHostProvider(final String dnsName, final ZKClientConfig clientConfig) { + this(dnsName, System.currentTimeMillis() ^ dnsName.hashCode(), clientConfig); + } + + /** + * Constructs a DnsSrvHostProvider with the given DNS name, randomness seed and ZKClientConfig + * + * @param dnsName the DNS name to query for SRV records + * @param randomnessSeed seed for randomization + * @param clientConfig ZooKeeper client configuration + * @throws IllegalArgumentException if dnsName is null or empty or invalid + */ + DnsSrvHostProvider(final String dnsName, final long randomnessSeed, final ZKClientConfig clientConfig) { + this(dnsName, randomnessSeed, new DefaultDnsResolver(), clientConfig); + } + + /** + * Constructs a DnsSrvHostProvider with the given DNS name, randomization seed, DNS resolver and ZKClientConfig + * + * @param dnsName the DNS name to query for SRV records + * @param randomnessSeed seed for randomization + * @param dnsResolver custom DNS resolver + * @param clientConfig ZooKeeper client configuration + * @throws IllegalArgumentException if dnsName is null or empty or invalid + */ + public DnsSrvHostProvider(final String dnsName, final long randomnessSeed, final DnsResolver dnsResolver, final ZKClientConfig clientConfig) { + if (StringUtils.isBlank(dnsName)) { + throw new IllegalArgumentException("DNS name cannot be null or empty"); + } + + this.dnsName = dnsName; + this.dnsResolver = dnsResolver; + this.refreshIntervalMs = validateRefreshInterval(); + this.dnsRefreshTimer = initializeDnsRefreshTimer(); + + init(randomnessSeed, clientConfig); + } + + @Override + public int size() { + return connectionManager.size(); + } + + @Override + public InetSocketAddress next(long spinDelay) { + applyPendingServerListUpdate(); + return connectionManager.next(spinDelay); + } + + @Override + public void onConnected() { + currentConnectedHost.set(connectionManager.getServerAtCurrentIndex()); + connectionManager.onConnected(); + } + + @Override + public boolean updateServerList(Collection serverAddresses, InetSocketAddress currentHost) { + return connectionManager.updateServerList(serverAddresses, currentHost); + } + + @Override + public void close() { + if (dnsRefreshTimer != null) { + dnsRefreshTimer.cancel(); + } + } + + @Override + public ConnectionType getSupportedConnectionType() { + return ConnectionType.DNS_SRV; + } + + + /** + * Validates and returns the refresh interval + * + * @return the validated refresh interval in milliseconds + */ + private long validateRefreshInterval() { + final long defaultInterval = ZKClientConfig.DNS_SRV_REFRESH_INTERVAL_MS_DEFAULT; + + final long interval = Long.getLong(ZKClientConfig.DNS_SRV_REFRESH_INTERVAL_MS, defaultInterval); + if (interval < 0) { + LOG.warn("Invalid DNS SRV refresh interval {}, using default {}", interval, defaultInterval); + return defaultInterval; + } + if (interval == 0) { + LOG.info("DNS SRV refresh disabled (interval = 0) for {}", dnsName); + } + return interval; + } + + + /** + * Initializes the DNS refresh timer if refresh interval is greater than 0. + * + * @return the initialized Timer or null if refresh interval is 0 or negative + */ + private Timer initializeDnsRefreshTimer() { + if (refreshIntervalMs > 0) { + return new Timer("DnsSrvRefresh-" + dnsName, true); + } + return null; + } + + /** + * Initializes the DNS SRV host provider. + * + * @param randomnessSeed seed for randomization + * @param clientConfig ZooKeeper client configuration + * @throws IllegalArgumentException if no SRV records are found or initialization fails + */ + private void init(final long randomnessSeed, final ZKClientConfig clientConfig) { + try { + final List serverAddresses = queryDnsSrvRecords(); + if (serverAddresses.isEmpty()) { + throw new IllegalArgumentException("No SRV records found for DNS name: " + dnsName); + } + + this.connectionManager = new HostConnectionManager(serverAddresses, randomnessSeed, clientConfig); + this.previousServerList = new ArrayList<>(serverAddresses); + this.latestServerList = new ArrayList<>(serverAddresses); + + setupBackgroundDnsRefreshTimer(); + + LOG.info("DnsSrvHostProvider initialized with {} servers from DNS name: {} with refresh interval: {} ms", + serverAddresses.size(), dnsName, refreshIntervalMs); + } catch (final Exception e) { + LOG.error("Failed to initialize DnsSrvHostProvider for DNS name: {}", dnsName, e); + throw new IllegalArgumentException("Failed to initialize DnsSrvHostProvider for DNS name: " + dnsName, e); + } + } + + /** + * Queries DNS SRV records and returns a list of server addresses. + * + * @return list of server addresses from SRV records, empty list on failure + */ + private List queryDnsSrvRecords() { + final List addresses = new ArrayList<>(); + + try { + final SRVRecord[] srvRecords = dnsResolver.lookupSrvRecords(dnsName); + if (srvRecords.length == 0) { + LOG.warn("No SRV records found for DNS name: {}", dnsName); + return addresses; + } + + for (final SRVRecord srvRecord : srvRecords) { + try { + final InetSocketAddress address = createAddressFromSrvRecord(srvRecord); + if (address != null) { + addresses.add(address); + } + } catch (final Exception e) { + LOG.warn("Failed to create address from SRV record {}: {}", srvRecord, e.getMessage()); + } + } + } catch (final Exception e) { + LOG.error("DNS SRV lookup failed for {}", dnsName, e); + } + return addresses; + } + + /** + * Creates an InetSocketAddress from an SRV record. + * + * @param srvRecord the SRV record to process + * @return InetSocketAddress or null if invalid + */ + private InetSocketAddress createAddressFromSrvRecord(final SRVRecord srvRecord) { + if (srvRecord == null) { + return null; + } + + try { + final String target = srvRecord.getTarget().toString(true); + final int port = srvRecord.getPort(); + + if (port <= 0 || port > 65535) { + LOG.warn("Invalid port {} in SRV record for target {}", port, target); + return null; + } + + if (StringUtils.isBlank(target)) { + LOG.warn("Empty or blank target in SRV record"); + return null; + } + + return new InetSocketAddress(target, port); + } catch (final Exception e) { + LOG.warn("Failed to create InetSocketAddress from SRV record {}: {}", srvRecord, e.getMessage()); + return null; + } + } + + /** + * Sets up the background DNS refresh timer task if needed. + */ + private void setupBackgroundDnsRefreshTimer() { + if (dnsRefreshTimer != null) { + dnsRefreshTimer.scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + refreshServerListInBackground(); + } + }, + refreshIntervalMs, + refreshIntervalMs); + } + } + + /** + * Performs background DNS refresh to detect server list changes without blocking connection attempts. + * + *

Important: This method only detects changes; the actual server list update + * is deferred until the next connection attempt via {@link #applyPendingServerListUpdate()}.

+ */ + private void refreshServerListInBackground() { + try { + // Refresh the server list + final List newAddresses = queryDnsSrvRecords(); + if (newAddresses.isEmpty()) { + LOG.warn("DNS SRV lookup returned no records for {}, will retry on next refresh", dnsName); + return; + } + + // Check if server list has changed + if (!Objects.equals(previousServerList, newAddresses)) { + latestServerList = new ArrayList<>(newAddresses); + serverListChanged.set(true); + LOG.info("New server list detected from DNS SRV: {} servers", newAddresses.size()); + } + } catch (final Exception e) { + LOG.warn("Failed to refresh server list from DNS SRV records for {}: {}", dnsName, e.getMessage()); + } + } + + /** + * Apply pending server list updates when connection is actually needed. + * It is called from next() to ensure server list changes are applied with proper connection context. + */ + private synchronized void applyPendingServerListUpdate() { + if (serverListChanged.get() && latestServerList != null) { + try { + final boolean needReconnect = connectionManager.updateServerList(latestServerList, currentConnectedHost.get()); + + previousServerList = new ArrayList<>(latestServerList); + serverListChanged.set(false); + + LOG.info("Applied server list update during connection attempt. servers size: {}, need reconnection: {}", + latestServerList.size(), needReconnect); + } catch (final Exception e) { + LOG.error("Failed to apply server list update", e); + } + } + } + + /** + * Default implementation of DnsResolver that uses the real DNS system. + */ + private static class DefaultDnsResolver implements DnsResolver { + @Override + public SRVRecord[] lookupSrvRecords(final String name) throws IOException { + final Lookup lookup = new Lookup(name, Type.SRV); + final Record[] records = lookup.run(); + + if (lookup.getResult() != Lookup.SUCCESSFUL) { + final String errorMsg = lookup.getErrorString(); + throw new IOException("DNS SRV lookup failed for " + name + ": " + errorMsg); + } + + if (records == null) { + return new SRVRecord[0]; + } + return Arrays.stream(records).map(SRVRecord.class::cast).toArray(SRVRecord[]::new); + } + } +} + diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/client/HostConnectionManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/client/HostConnectionManager.java new file mode 100644 index 00000000000..c71830832af --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/client/HostConnectionManager.java @@ -0,0 +1,413 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.client; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Manages ZooKeeper server connections with round-robin selection and load balancing during + * cluster reconfiguration. + * + *

Key Features:

+ *
    + *
  • Round-robin server selection with IP address resolution
  • + *
  • Migrates clients to balance load when servers are added or removed
  • + *
  • Probabilistic client migration during server list updates
  • + *
+ * + *

Reconfiguration Behavior:

+ * When server list changes, enters "reconfigMode" and calculates migration probabilities + * to balance load between old and new servers. + * + *

See ZOOKEEPER-1355 + * for more details.

+ */ +@InterfaceAudience.Private +public final class HostConnectionManager { + + /** + * Interface for address resolution to support testing and different resolution strategies. + */ + public interface Resolver { + InetAddress[] getAllByName(String name) throws UnknownHostException; + } + + private static final Logger LOG = LoggerFactory.getLogger(HostConnectionManager.class); + + private List serverAddresses; + private final ZKClientConfig clientConfig; + private final Resolver resolver; + private final Random sourceOfRandomness; + private int lastIndex; + private int currentIndex; + + /** + * The following fields are used to migrate clients during reconfiguration + */ + private boolean reconfigMode = false; + private final List oldServers = new ArrayList<>(5); + private final List newServers = new ArrayList<>(5); + private int currentIndexOld = -1; + private int currentIndexNew = -1; + private float pOld, pNew; + + /** + * Constructs a HostConnectionManager with default resolver. + * + * @param serverAddresses + * possibly unresolved ZooKeeper server addresses + * @param randomnessSeed + * a seed used to initialize sourceOfRandomness + * @param clientConfig + * ZooKeeper client configuration + * @throws IllegalArgumentException + * if serverAddresses is empty + */ + public HostConnectionManager(Collection serverAddresses, long randomnessSeed, ZKClientConfig clientConfig) { + this(serverAddresses, randomnessSeed, InetAddress::getAllByName, clientConfig); + } + + /** + * Constructs a HostConnectionManager with custom resolver. + * + * @param serverAddresses + * possibly unresolved ZooKeeper server addresses + * @param randomnessSeed + * a seed used to initialize sourceOfRandomness + * @param resolver + * custom resolver implementation + * @param clientConfig + * ZooKeeper client configuration + * @throws IllegalArgumentException + * if serverAddresses is empty + */ + public HostConnectionManager(Collection serverAddresses, + long randomnessSeed, + Resolver resolver, + ZKClientConfig clientConfig) { + this.clientConfig = clientConfig == null ? new ZKClientConfig() : clientConfig; + this.sourceOfRandomness = new Random(randomnessSeed); + this.resolver = resolver; + + if (serverAddresses.isEmpty()) { + throw new IllegalArgumentException("A HostProvider may not be empty!"); + } + + this.serverAddresses = shuffle(serverAddresses); + currentIndex = -1; + lastIndex = -1; + + LOG.info("HostConnectionManager initialized with {} servers", serverAddresses.size()); + } + + /** + * Update the list of servers. This returns true if changing connections is necessary for load-balancing, false + * otherwise. Changing connections is necessary if one of the following holds: + * a) the host to which this client is currently connected is not in serverAddresses. + * Otherwise (if currentHost is in the new list serverAddresses): + * b) the number of servers in the cluster is increasing - in this case the load on currentHost should decrease, + * which means that SOME of the clients connected to it will migrate to the new servers. The decision whether + * this client migrates or not (i.e., whether true or false is returned) is probabilistic so that the expected + * number of clients connected to each server is the same. + * + * If true is returned, the function sets pOld and pNew that correspond to the probability to migrate to one of the + * new servers in serverAddresses or one of the old servers (migrating to one of the old servers is done only + * if our client's currentHost is not in serverAddresses). See nextHostInReconfigMode for the selection logic. + * + * See ZOOKEEPER-1355 + * for the protocol and its evaluation. + * + * @param serverAddresses + * new host list + * @param currentHost + * the host to which this client is currently connected + * @return true if changing connections is necessary for load-balancing, false otherwise + */ + synchronized boolean updateServerList(Collection serverAddresses, InetSocketAddress currentHost) { + List shuffledList = shuffle(serverAddresses); + if (shuffledList.isEmpty()) { + throw new IllegalArgumentException("The server list may not be empty!"); + } + + int oldSize = this.serverAddresses.size(); + int newSize = shuffledList.size(); + + // Check if client's current server is in the new list of servers + boolean myServerInNewConfig = false; + InetSocketAddress myServer = currentHost; + + // choose "current" server according to the client rebalancing algorithm + if (reconfigMode) { + myServer = next(0); + } + + // if the client is not currently connected to any server + if (myServer == null) { + // reconfigMode = false (next shouldn't return null). + if (lastIndex >= 0) { + // take the last server to which we were connected + myServer = this.serverAddresses.get(lastIndex); + } else { + // take the first server on the list + myServer = this.serverAddresses.get(0); + } + } + + for (InetSocketAddress addr : shuffledList) { + if (addr.getPort() == myServer.getPort() + && ((addr.getAddress() != null + && myServer.getAddress() != null + && addr.getAddress().equals(myServer.getAddress())) + || addr.getHostString().equals(myServer.getHostString()))) { + myServerInNewConfig = true; + break; + } + } + + reconfigMode = true; + + newServers.clear(); + oldServers.clear(); + // Divide the new servers into oldServers that were in the previous list + // and newServers that were not in the previous list + for (InetSocketAddress address : shuffledList) { + if (this.serverAddresses.contains(address)) { + oldServers.add(address); + } else { + newServers.add(address); + } + } + + int numOld = oldServers.size(); + int numNew = newServers.size(); + + // number of servers increased + if (numOld + numNew > this.serverAddresses.size()) { + if (myServerInNewConfig) { + // my server is in new config, but load should be decreased. + // Need to decide if this client + // is moving to one of the new servers + if (sourceOfRandomness.nextFloat() <= (1 - ((float) this.serverAddresses.size()) / (numOld + numNew))) { + pNew = 1; + pOld = 0; + } else { + // do nothing special - stay with the current server + reconfigMode = false; + } + } else { + // my server is not in new config, and load on old servers must + // be decreased, so connect to + // one of the new servers + pNew = 1; + pOld = 0; + } + } else { // number of servers stayed the same or decreased + if (myServerInNewConfig) { + // my server is in new config, and load should be increased, so + // stay with this server and do nothing special + reconfigMode = false; + } else { + pOld = ((float) (numOld * (this.serverAddresses.size() - (numOld + numNew)))) + / ((numOld + numNew) * (this.serverAddresses.size() - numOld)); + pNew = 1 - pOld; + } + } + + if (!reconfigMode) { + currentIndex = shuffledList.indexOf(getServerAtCurrentIndex()); + } else { + currentIndex = -1; + } + this.serverAddresses = shuffledList; + currentIndexOld = -1; + currentIndexNew = -1; + lastIndex = currentIndex; + + LOG.info("Server list updated: oldSize={}, newSize={}, reconfigMode={}", oldSize, newSize, reconfigMode); + + return reconfigMode; + } + + /** + * Get the next server to connect to. + * + * @param spinDelay milliseconds to wait if all hosts have been tried once + * @return the next server address to connect to + */ + InetSocketAddress next(long spinDelay) { + boolean needToSleep = false; + InetSocketAddress addr; + + synchronized (this) { + if (reconfigMode) { + addr = nextHostInReconfigMode(); + if (addr != null) { + currentIndex = serverAddresses.indexOf(addr); + return resolve(addr); + } + //tried all servers and couldn't connect + reconfigMode = false; + needToSleep = (spinDelay > 0); + } + ++currentIndex; + if (currentIndex == serverAddresses.size()) { + currentIndex = 0; + } + addr = serverAddresses.get(currentIndex); + needToSleep = needToSleep || (currentIndex == lastIndex && spinDelay > 0); + if (lastIndex == -1) { + // We don't want to sleep on the first ever connect attempt. + lastIndex = 0; + } + } + + if (needToSleep) { + try { + Thread.sleep(spinDelay); + } catch (InterruptedException e) { + LOG.warn("Unexpected exception", e); + } + } + + return resolve(addr); + } + + /** + * Notify that a connection has been established successfully. + */ + synchronized void onConnected() { + lastIndex = currentIndex; + reconfigMode = false; + } + + /** + * Get the server at the specified index. + * + * @param i the index + * @return the server address at the index, or null if index is out of bounds + */ + synchronized InetSocketAddress getServerAtIndex(int i) { + if (i < 0 || i >= serverAddresses.size()) { + return null; + } + return serverAddresses.get(i); + } + + /** + * Get the server at the current index. + * + * @return the server address at the current index + */ + synchronized InetSocketAddress getServerAtCurrentIndex() { + return getServerAtIndex(currentIndex); + } + + /** + * Get the number of servers in the list. + * + * @return the number of servers + */ + synchronized int size() { + return serverAddresses.size(); + } + + /** + * Resolves an InetSocketAddress to a concrete address. + * + * @param address + * the address to resolve + * @return resolved address or original address if resolution fails + */ + private InetSocketAddress resolve(InetSocketAddress address) { + try { + String curHostString = address.getHostString(); + List resolvedAddresses = new ArrayList<>(Arrays.asList(this.resolver.getAllByName(curHostString))); + if (resolvedAddresses.isEmpty()) { + return address; + } + if (clientConfig.isShuffleDnsResponseEnabled()) { + Collections.shuffle(resolvedAddresses); + } + return new InetSocketAddress(resolvedAddresses.get(0), address.getPort()); + } catch (UnknownHostException e) { + LOG.error("Unable to resolve address: {}", address, e); + return address; + } + } + + /** + * Shuffles the server addresses using the internal random source. + * + * @param serverAddresses + * collection of server addresses to shuffle + * @return shuffled list of server addresses + */ + private List shuffle(Collection serverAddresses) { + List tmpList = new ArrayList<>(serverAddresses.size()); + tmpList.addAll(serverAddresses); + Collections.shuffle(tmpList, sourceOfRandomness); + return tmpList; + } + + /** + * Get the next server to connect to, when in "reconfigMode", which means that + * you've just updated the server list, and now trying to find some server to connect to. + * Once onConnected() is called, reconfigMode is set to false. Similarly, if we tried to connect + * to all servers in new config and failed, reconfigMode is set to false. + * + * While in reconfigMode, we should connect to a server in newServers with probability pNew and to servers in + * oldServers with probability pOld (which is just 1-pNew). If we tried out all servers in either oldServers + * or newServers we continue to try servers from the other set, regardless of pNew or pOld. If we tried all servers + * we give up and go back to the normal round robin mode + * + * When called, this should be protected by synchronized(this) + */ + private InetSocketAddress nextHostInReconfigMode() { + boolean takeNew = (sourceOfRandomness.nextFloat() <= pNew); + + // take one of the new servers if it is possible (there are still such + // servers we didn't try), + // and either the probability tells us to connect to one of the new + // servers or if we already + // tried all the old servers + if (((currentIndexNew + 1) < newServers.size()) && (takeNew || (currentIndexOld + 1) >= oldServers.size())) { + ++currentIndexNew; + return newServers.get(currentIndexNew); + } + + // start taking old servers + if ((currentIndexOld + 1) < oldServers.size()) { + ++currentIndexOld; + return oldServers.get(currentIndexOld); + } + + return null; + } +} diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/client/HostProvider.java b/zookeeper-server/src/main/java/org/apache/zookeeper/client/HostProvider.java index 73a102f1a8c..f8ceffbdc40 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/client/HostProvider.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/client/HostProvider.java @@ -43,7 +43,7 @@ * * A HostProvider that prefers nearby hosts. */ @InterfaceAudience.Public -public interface HostProvider { +public interface HostProvider extends AutoCloseable { int size(); @@ -72,4 +72,21 @@ public interface HostProvider { */ boolean updateServerList(Collection serverAddresses, InetSocketAddress currentHost); + /** + * Returns the connection type that this HostProvider supports. + * Default implementation returns HOST_PORT for backward compatibility. + * + * @return the ConnectionType supported by this provider + */ + default ConnectionType getSupportedConnectionType() { + return ConnectionType.HOST_PORT; + } + + /** + * Close the HostProvider and release any resources. + * Default implementation does nothing for backward compatibility. + */ + @Override + default void close() { + } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/client/HostProviderFactory.java b/zookeeper-server/src/main/java/org/apache/zookeeper/client/HostProviderFactory.java new file mode 100644 index 00000000000..47a027f16ac --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/client/HostProviderFactory.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.client; + +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.zookeeper.common.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Factory for creating appropriate HostProvider instances based on connection string format. + * This factory enables zero-code-change migration by automatically detecting the connection + * string format and creating the appropriate HostProvider implementation. + * + * Supported formats: + * - Host:Port: "host1:port1,host2:port2,host3:port3" (StaticHostProvider) + * - DNS SRV: "dns-srv://service.domain.com" (DnsSrvHostProvider) + * - Future formats can be easily added by extending the factory + */ +@InterfaceAudience.Public +public class HostProviderFactory { + + private static final Logger LOG = LoggerFactory.getLogger(HostProviderFactory.class); + + /** + * Creates a HostProvider based on the connection string format. + * + * @param connectString the connection string (host:port or DNS SRV format) + * @param clientConfig ZooKeeper client configuration + * + * @return appropriate HostProvider implementation + * @throws IllegalArgumentException if the connection string format is not supported + */ + public static HostProvider createHostProvider(final String connectString, final ZKClientConfig clientConfig) { + if (StringUtils.isBlank(connectString)) { + throw new IllegalArgumentException("Connection string cannot be null or empty"); + } + + final String trimmedConnectString = connectString.trim(); + final ConnectionType connectionType = ConnectStringParser.getConnectionType(trimmedConnectString); + if (connectionType == ConnectionType.DNS_SRV) { + LOG.info("Detected DNS SRV connection string format: {}", trimmedConnectString); + return createDnsSrvHostProvider(trimmedConnectString, clientConfig); + } + final ConnectStringParser parser = new ConnectStringParser(trimmedConnectString); + return new StaticHostProvider(parser.getServerAddresses(), clientConfig); + } + + private static DnsSrvHostProvider createDnsSrvHostProvider(final String connectString, final ZKClientConfig clientConfig) { + final ConnectStringParser parser = new ConnectStringParser(connectString); + + if (parser.getServerAddresses().isEmpty()) { + throw new IllegalArgumentException("No DNS service name found in connect string: " + connectString); + } + + final String dnsServiceName = parser.getServerAddresses().get(0).getHostString(); + return new DnsSrvHostProvider(dnsServiceName, clientConfig); + } +} diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/client/StaticHostProvider.java b/zookeeper-server/src/main/java/org/apache/zookeeper/client/StaticHostProvider.java index e07754c3558..df20922ab87 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/client/StaticHostProvider.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/client/StaticHostProvider.java @@ -21,15 +21,8 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Random; import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Most simple HostProvider, resolves on every next() call. @@ -38,6 +31,7 @@ * present across the stack like in JVM, OS level, hardware, etc. The best we could do here is to get the most recent * address from the underlying system which is considered up-to-date. * + * It uses HostConnectionManager for all connection management and reconfiguration logic. */ @InterfaceAudience.Public public final class StaticHostProvider implements HostProvider { @@ -48,32 +42,7 @@ public interface Resolver { } - private static final Logger LOG = LoggerFactory.getLogger(StaticHostProvider.class); - - private ZKClientConfig clientConfig = null; - - private List serverAddresses = new ArrayList<>(5); - - private Random sourceOfRandomness; - private int lastIndex = -1; - - private int currentIndex = -1; - - /** - * The following fields are used to migrate clients during reconfiguration - */ - private boolean reconfigMode = false; - - private final List oldServers = new ArrayList<>(5); - - private final List newServers = new ArrayList<>(5); - - private int currentIndexOld = -1; - private int currentIndexNew = -1; - - private float pOld, pNew; - - private Resolver resolver; + private final HostConnectionManager connectionManager; /** * Constructs a SimpleHostSet. @@ -84,12 +53,7 @@ public interface Resolver { * if serverAddresses is empty or resolves to an empty list */ public StaticHostProvider(Collection serverAddresses) { - init(serverAddresses, System.currentTimeMillis() ^ this.hashCode(), new Resolver() { - @Override - public InetAddress[] getAllByName(String name) throws UnknownHostException { - return InetAddress.getAllByName(name); - } - }); + connectionManager = new HostConnectionManager(serverAddresses, System.currentTimeMillis() ^ this.hashCode(), null); } /** @@ -104,12 +68,10 @@ public InetAddress[] getAllByName(String name) throws UnknownHostException { * ZooKeeper client configuration */ public StaticHostProvider(Collection serverAddresses, ZKClientConfig clientConfig) { - init(serverAddresses, System.currentTimeMillis() ^ this.hashCode(), new Resolver() { - @Override - public InetAddress[] getAllByName(String name) throws UnknownHostException { - return InetAddress.getAllByName(name); - } - }, clientConfig); + connectionManager = new HostConnectionManager(serverAddresses, + System.currentTimeMillis() ^ this.hashCode(), + InetAddress::getAllByName, + clientConfig); } /** @@ -124,7 +86,10 @@ public InetAddress[] getAllByName(String name) throws UnknownHostException { * custom resolver implementation */ public StaticHostProvider(Collection serverAddresses, Resolver resolver) { - init(serverAddresses, System.currentTimeMillis() ^ this.hashCode(), resolver); + this.connectionManager = new HostConnectionManager(serverAddresses, + System.currentTimeMillis() ^ this.hashCode(), + resolver::getAllByName, + null); } /** @@ -133,274 +98,43 @@ public StaticHostProvider(Collection serverAddresses, Resolve * * @param serverAddresses * possibly unresolved ZooKeeper server addresses - * @param randomnessSeed a seed used to initialize sourceOfRandomnes + * @param randomnessSeed a seed used to initialize sourceOfRandomness * @throws IllegalArgumentException * if serverAddresses is empty or resolves to an empty list */ public StaticHostProvider(Collection serverAddresses, long randomnessSeed) { - init(serverAddresses, randomnessSeed, new Resolver() { - @Override - public InetAddress[] getAllByName(String name) throws UnknownHostException { - return InetAddress.getAllByName(name); - } - }); + this.connectionManager = new HostConnectionManager(serverAddresses, + randomnessSeed, + InetAddress::getAllByName, + null); } - private void init(Collection serverAddresses, long randomnessSeed, Resolver resolver) { - init(serverAddresses, randomnessSeed, resolver, null); - } - - private void init(Collection serverAddresses, long randomnessSeed, Resolver resolver, - ZKClientConfig clientConfig) { - this.clientConfig = clientConfig == null ? new ZKClientConfig() : clientConfig; - this.sourceOfRandomness = new Random(randomnessSeed); - this.resolver = resolver; - if (serverAddresses.isEmpty()) { - throw new IllegalArgumentException("A HostProvider may not be empty!"); - } - this.serverAddresses = shuffle(serverAddresses); - currentIndex = -1; - lastIndex = -1; - } - - private InetSocketAddress resolve(InetSocketAddress address) { - try { - String curHostString = address.getHostString(); - List resolvedAddresses = new ArrayList<>(Arrays.asList(this.resolver.getAllByName(curHostString))); - if (resolvedAddresses.isEmpty()) { - return address; - } - if (clientConfig.isShuffleDnsResponseEnabled()) { - Collections.shuffle(resolvedAddresses); - } - return new InetSocketAddress(resolvedAddresses.get(0), address.getPort()); - } catch (UnknownHostException e) { - LOG.error("Unable to resolve address: {}", address.toString(), e); - return address; - } - } - - private List shuffle(Collection serverAddresses) { - List tmpList = new ArrayList<>(serverAddresses.size()); - tmpList.addAll(serverAddresses); - Collections.shuffle(tmpList, sourceOfRandomness); - return tmpList; - } - - /** - * Update the list of servers. This returns true if changing connections is necessary for load-balancing, false - * otherwise. Changing connections is necessary if one of the following holds: - * a) the host to which this client is currently connected is not in serverAddresses. - * Otherwise (if currentHost is in the new list serverAddresses): - * b) the number of servers in the cluster is increasing - in this case the load on currentHost should decrease, - * which means that SOME of the clients connected to it will migrate to the new servers. The decision whether - * this client migrates or not (i.e., whether true or false is returned) is probabilistic so that the expected - * number of clients connected to each server is the same. - * - * If true is returned, the function sets pOld and pNew that correspond to the probability to migrate to ones of the - * new servers in serverAddresses or one of the old servers (migrating to one of the old servers is done only - * if our client's currentHost is not in serverAddresses). See nextHostInReconfigMode for the selection logic. - * - * See ZOOKEEPER-1355 - * for the protocol and its evaluation, and StaticHostProviderTest for the tests that illustrate how load balancing - * works with this policy. - * - * @param serverAddresses new host list - * @param currentHost the host to which this client is currently connected - * @return true if changing connections is necessary for load-balancing, false otherwise - */ @Override - public synchronized boolean updateServerList( - Collection serverAddresses, - InetSocketAddress currentHost) { - List shuffledList = shuffle(serverAddresses); - if (shuffledList.isEmpty()) { - throw new IllegalArgumentException("A HostProvider may not be empty!"); - } - // Check if client's current server is in the new list of servers - boolean myServerInNewConfig = false; - - InetSocketAddress myServer = currentHost; - - // choose "current" server according to the client rebalancing algorithm - if (reconfigMode) { - myServer = next(0); - } - - // if the client is not currently connected to any server - if (myServer == null) { - // reconfigMode = false (next shouldn't return null). - if (lastIndex >= 0) { - // take the last server to which we were connected - myServer = this.serverAddresses.get(lastIndex); - } else { - // take the first server on the list - myServer = this.serverAddresses.get(0); - } - } - - for (InetSocketAddress addr : shuffledList) { - if (addr.getPort() == myServer.getPort() - && ((addr.getAddress() != null - && myServer.getAddress() != null - && addr.getAddress().equals(myServer.getAddress())) - || addr.getHostString().equals(myServer.getHostString()))) { - myServerInNewConfig = true; - break; - } - } - - reconfigMode = true; - - newServers.clear(); - oldServers.clear(); - // Divide the new servers into oldServers that were in the previous list - // and newServers that were not in the previous list - for (InetSocketAddress address : shuffledList) { - if (this.serverAddresses.contains(address)) { - oldServers.add(address); - } else { - newServers.add(address); - } - } - - int numOld = oldServers.size(); - int numNew = newServers.size(); - - // number of servers increased - if (numOld + numNew > this.serverAddresses.size()) { - if (myServerInNewConfig) { - // my server is in new config, but load should be decreased. - // Need to decide if this client - // is moving to one of the new servers - if (sourceOfRandomness.nextFloat() <= (1 - ((float) this.serverAddresses.size()) / (numOld + numNew))) { - pNew = 1; - pOld = 0; - } else { - // do nothing special - stay with the current server - reconfigMode = false; - } - } else { - // my server is not in new config, and load on old servers must - // be decreased, so connect to - // one of the new servers - pNew = 1; - pOld = 0; - } - } else { // number of servers stayed the same or decreased - if (myServerInNewConfig) { - // my server is in new config, and load should be increased, so - // stay with this server and do nothing special - reconfigMode = false; - } else { - pOld = ((float) (numOld * (this.serverAddresses.size() - (numOld + numNew)))) - / ((numOld + numNew) * (this.serverAddresses.size() - numOld)); - pNew = 1 - pOld; - } - } - - if (!reconfigMode) { - currentIndex = shuffledList.indexOf(getServerAtCurrentIndex()); - } else { - currentIndex = -1; - } - this.serverAddresses = shuffledList; - currentIndexOld = -1; - currentIndexNew = -1; - lastIndex = currentIndex; - return reconfigMode; - } - - public synchronized InetSocketAddress getServerAtIndex(int i) { - if (i < 0 || i >= serverAddresses.size()) { - return null; - } - return serverAddresses.get(i); + public boolean updateServerList(Collection serverAddresses, InetSocketAddress currentHost) { + return connectionManager.updateServerList(serverAddresses, currentHost); } - public synchronized InetSocketAddress getServerAtCurrentIndex() { - return getServerAtIndex(currentIndex); + @Override + public int size() { + return connectionManager.size(); } - public synchronized int size() { - return serverAddresses.size(); + @Override + public InetSocketAddress next(long spinDelay) { + return connectionManager.next(spinDelay); } - /** - * Get the next server to connect to, when in "reconfigMode", which means that - * you've just updated the server list, and now trying to find some server to connect to. - * Once onConnected() is called, reconfigMode is set to false. Similarly, if we tried to connect - * to all servers in new config and failed, reconfigMode is set to false. - * - * While in reconfigMode, we should connect to a server in newServers with probability pNew and to servers in - * oldServers with probability pOld (which is just 1-pNew). If we tried out all servers in either oldServers - * or newServers we continue to try servers from the other set, regardless of pNew or pOld. If we tried all servers - * we give up and go back to the normal round robin mode - * - * When called, this should be protected by synchronized(this) - */ - private InetSocketAddress nextHostInReconfigMode() { - boolean takeNew = (sourceOfRandomness.nextFloat() <= pNew); - - // take one of the new servers if it is possible (there are still such - // servers we didn't try), - // and either the probability tells us to connect to one of the new - // servers or if we already - // tried all the old servers - if (((currentIndexNew + 1) < newServers.size()) && (takeNew || (currentIndexOld + 1) >= oldServers.size())) { - ++currentIndexNew; - return newServers.get(currentIndexNew); - } - - // start taking old servers - if ((currentIndexOld + 1) < oldServers.size()) { - ++currentIndexOld; - return oldServers.get(currentIndexOld); - } - - return null; + @Override + public void onConnected() { + connectionManager.onConnected(); } - public InetSocketAddress next(long spinDelay) { - boolean needToSleep = false; - InetSocketAddress addr; - - synchronized (this) { - if (reconfigMode) { - addr = nextHostInReconfigMode(); - if (addr != null) { - currentIndex = serverAddresses.indexOf(addr); - return resolve(addr); - } - //tried all servers and couldn't connect - reconfigMode = false; - needToSleep = (spinDelay > 0); - } - ++currentIndex; - if (currentIndex == serverAddresses.size()) { - currentIndex = 0; - } - addr = serverAddresses.get(currentIndex); - needToSleep = needToSleep || (currentIndex == lastIndex && spinDelay > 0); - if (lastIndex == -1) { - // We don't want to sleep on the first ever connect attempt. - lastIndex = 0; - } - } - if (needToSleep) { - try { - Thread.sleep(spinDelay); - } catch (InterruptedException e) { - LOG.warn("Unexpected exception", e); - } - } - return resolve(addr); + public InetSocketAddress getServerAtIndex(int i) { + return connectionManager.getServerAtIndex(i); } - public synchronized void onConnected() { - lastIndex = currentIndex; - reconfigMode = false; + public InetSocketAddress getServerAtCurrentIndex() { + return connectionManager.getServerAtCurrentIndex(); } - } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/client/ZKClientConfig.java b/zookeeper-server/src/main/java/org/apache/zookeeper/client/ZKClientConfig.java index 5472838bdef..adc9de6ff91 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/client/ZKClientConfig.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/client/ZKClientConfig.java @@ -56,6 +56,7 @@ public class ZKClientConfig extends ZKConfig { public static final int CLIENT_MAX_PACKET_LENGTH_DEFAULT = 0xfffff; /* 1 MB */ public static final String ZOOKEEPER_REQUEST_TIMEOUT = "zookeeper.request.timeout"; public static final String ZOOKEEPER_SERVER_PRINCIPAL = "zookeeper.server.principal"; + /** * Feature is disabled by default. */ @@ -70,6 +71,14 @@ public class ZKClientConfig extends ZKConfig { public static final String ZOOKEEPER_SHUFFLE_DNS_RESPONSE = "zookeeper.shuffleDnsResponse"; public static final boolean ZOOKEEPER_SHUFFLE_DNS_RESPONSE_DEFAULT = false; + /** + * DNS SRV refresh interval in milliseconds for DnsSrvHostProvider. + * Controls how frequently DNS SRV records are queried to update the server list. + * A value of 0 disables periodic refresh. + */ + public static final String DNS_SRV_REFRESH_INTERVAL_MS = "zookeeper.hostProvider.dnsSrvRefreshIntervalMs"; + public static final long DNS_SRV_REFRESH_INTERVAL_MS_DEFAULT = 60000; + public ZKClientConfig() { super(); initFromJavaSystemProperties(); @@ -130,6 +139,7 @@ protected void handleBackwardCompatibility() { setProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET, System.getProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET)); setProperty(SECURE_CLIENT, System.getProperty(SECURE_CLIENT)); setProperty(ZK_SASL_CLIENT_ALLOW_REVERSE_DNS, System.getProperty(ZK_SASL_CLIENT_ALLOW_REVERSE_DNS)); + setProperty(DNS_SRV_REFRESH_INTERVAL_MS, System.getProperty(DNS_SRV_REFRESH_INTERVAL_MS)); } /** diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/HostProviderSelectionTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/HostProviderSelectionTest.java new file mode 100644 index 00000000000..ce6709ab6cd --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/HostProviderSelectionTest.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Collections; +import org.apache.zookeeper.client.DnsSrvHostProvider; +import org.apache.zookeeper.client.StaticHostProvider; +import org.apache.zookeeper.client.ZKClientConfig; +import org.apache.zookeeper.test.ClientBase; +import org.junit.jupiter.api.Test; +import org.xbill.DNS.Name; +import org.xbill.DNS.SRVRecord; + +public class HostProviderSelectionTest extends ZKTestCase { + private static final String TEST_DNS_NAME = "service.com"; + + @Test + public void testStaticHostProviderSelection() throws Exception { + final String[] staticFormats = { + "localhost:2181", + "zk1:2181,zk2:2181,zk3:2181", + "zk1:2181,zk2:2181/myapp", + "[::1]:2181,[2001:db8::1]:2181" + }; + + for (final String connectString : staticFormats) { + // Test without config + try (final ZooKeeper zk = new ZooKeeper(connectString, + ClientBase.CONNECTION_TIMEOUT, DummyWatcher.INSTANCE)) { + assertNotNull(zk); + } + + // Test with config + final ZKClientConfig config = new ZKClientConfig(); + try (final ZooKeeper zk = new ZooKeeper(connectString, + ClientBase.CONNECTION_TIMEOUT, DummyWatcher.INSTANCE, config)) { + assertNotNull(zk); + } + } + } + + @Test + public void testDnsSrvHostProviderSelection() { + final String[] dnsSrvFormats = { + "dns-srv://nonexistent.test.local", + "dns-srv://nonexistent.test.local/myapp" + }; + + for (final String connectString : dnsSrvFormats) { + // Test without config + final IllegalArgumentException exception1 = assertThrows(IllegalArgumentException.class, () -> + new ZooKeeper(connectString, ClientBase.CONNECTION_TIMEOUT, DummyWatcher.INSTANCE)); + validateDnsSrvError(exception1); + + // Test with config + final ZKClientConfig config = new ZKClientConfig(); + final IllegalArgumentException exception2 = assertThrows(IllegalArgumentException.class, () -> + new ZooKeeper(connectString, ClientBase.CONNECTION_TIMEOUT, DummyWatcher.INSTANCE, config)); + validateDnsSrvError(exception2); + } + } + + @Test + public void testProviderFormatMismatch() throws Exception { + final StaticHostProvider staticProvider = new StaticHostProvider( + Collections.singletonList(new InetSocketAddress("localhost", 2181))); + + final DnsSrvHostProvider dnsSrvProvider = new DnsSrvHostProvider(TEST_DNS_NAME, + System.currentTimeMillis(), new TestDnsResolver(), new ZKClientConfig()); + + // Test 1: DNS SRV format with StaticHostProvider should fail on mismatch + IllegalArgumentException dnsSrvWithStaticException = assertThrows(IllegalArgumentException.class, () -> + new ZooKeeper("dns-srv://service.com", ClientBase.CONNECTION_TIMEOUT, + DummyWatcher.INSTANCE, false, staticProvider)); + assertEquals("Connection string type DNS SRV is incompatible with host provider type StaticHostProvider", + dnsSrvWithStaticException.getMessage()); + + // Test 2: Host:port format with DnsSrvHostProvider should fail on mismatch + IllegalArgumentException hostPortWithDnsException = assertThrows(IllegalArgumentException.class, () -> + new ZooKeeper("localhost:2181", ClientBase.CONNECTION_TIMEOUT, + DummyWatcher.INSTANCE, false, dnsSrvProvider)); + + assertEquals("Connection string type Host:Port is incompatible with host provider type DnsSrvHostProvider", + hostPortWithDnsException.getMessage()); + + // Test 3: Host:port format with StaticHostProvider should work (compatible) + try (ZooKeeper zk = new ZooKeeper("localhost:2181", ClientBase.CONNECTION_TIMEOUT, + DummyWatcher.INSTANCE, false, staticProvider)) { + assertNotNull(zk); + } + + // Test 4: DNS SRV format with DnsSrvHostProvider should work (compatible) + try (final ZooKeeper zk = new ZooKeeper("dns-srv://" + TEST_DNS_NAME, ClientBase.CONNECTION_TIMEOUT, + DummyWatcher.INSTANCE, false, dnsSrvProvider)) { + assertNotNull(zk); + } + } + + @Test + public void testInvalidFormats() { + final String[] invalidFormats = { + "", + "dns-srv://" + }; + + for (final String connectString : invalidFormats) { + final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> + new ZooKeeper(connectString, ClientBase.CONNECTION_TIMEOUT, DummyWatcher.INSTANCE)); + assertTrue(exception.getMessage().contains("Connect string cannot be null or empty") + || exception.getMessage().contains("DNS name cannot be null or empty")); + } + } + + private void validateDnsSrvError(final IllegalArgumentException exception) { + final String message = exception.getMessage(); + assertTrue(message.contains("Failed to initialize DnsSrvHostProvider for DNS name:")); + } + + private static class TestDnsResolver implements DnsSrvHostProvider.DnsResolver { + @Override + public SRVRecord[] lookupSrvRecords(String dnsName) throws IOException { + return createMockSrvRecords(); + } + } + + private static SRVRecord[] createMockSrvRecords() { + try { + return new SRVRecord[] { + new SRVRecord(Name.fromString(TEST_DNS_NAME + "."), 1, 300, + 1, 1, 2181, Name.fromString("localhost.")) + }; + } catch (final Exception e) { + throw new RuntimeException("Failed to create mock SRV records", e); + } + } +} diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/client/DnsSrvHostProviderTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/client/DnsSrvHostProviderTest.java new file mode 100644 index 00000000000..af2988616cf --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/client/DnsSrvHostProviderTest.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.client; + + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import java.net.InetSocketAddress; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.xbill.DNS.Name; +import org.xbill.DNS.SRVRecord; + +public class DnsSrvHostProviderTest { + + private static final String TEST_DNS_NAME = "_zookeeper._tcp.example.com."; + private static final long TEST_SEED = 12345L; + + private DnsSrvHostProvider.DnsResolver mockDnsResolver; + + @BeforeEach + public void setUp() { + mockDnsResolver = mock(DnsSrvHostProvider.DnsResolver.class); + } + + @AfterEach + public void tearDown() { + System.clearProperty(ZKClientConfig.DNS_SRV_REFRESH_INTERVAL_MS); + } + + @Test + public void testBasic() throws Exception { + final SRVRecord[] srvRecords = createMockSrvRecords(); + when(mockDnsResolver.lookupSrvRecords(TEST_DNS_NAME)).thenReturn(srvRecords); + + try (final DnsSrvHostProvider hostProvider = new DnsSrvHostProvider(TEST_DNS_NAME, TEST_SEED, mockDnsResolver, null)) { + assertEquals(3, hostProvider.size()); + assertNotNull(hostProvider.next(0)); + } + } + + @Test + public void testServerIteration() throws Exception { + final SRVRecord[] srvRecords = createMockSrvRecords(); + when(mockDnsResolver.lookupSrvRecords(TEST_DNS_NAME)).thenReturn(srvRecords); + + try (final DnsSrvHostProvider hostProvider = new DnsSrvHostProvider(TEST_DNS_NAME, TEST_SEED, mockDnsResolver, null)) { + final InetSocketAddress addr1 = hostProvider.next(0); + final InetSocketAddress addr2 = hostProvider.next(0); + final InetSocketAddress addr3 = hostProvider.next(0); + + assertNotNull(addr1); + assertNotNull(addr2); + assertNotNull(addr3); + + // cycle back to first server + final InetSocketAddress addr4 = hostProvider.next(0); + assertNotNull(addr4); + } + } + + @Test + public void testEmptyDnsName() { + assertThrows(IllegalArgumentException.class, + () -> new DnsSrvHostProvider("", TEST_SEED, mockDnsResolver, null)); + + assertThrows(IllegalArgumentException.class, + () -> new DnsSrvHostProvider(null, TEST_SEED, mockDnsResolver, null)); + + assertThrows(IllegalArgumentException.class, + () -> new DnsSrvHostProvider(" ", TEST_SEED, mockDnsResolver, null)); + } + + @Test + public void testNoSrvRecords() throws Exception { + when(mockDnsResolver.lookupSrvRecords(TEST_DNS_NAME)).thenReturn(new SRVRecord[0]); + + assertThrows(IllegalArgumentException.class, + () -> new DnsSrvHostProvider(TEST_DNS_NAME, TEST_SEED, mockDnsResolver, null)); + } + + @Test + public void testDnsLookupFailure() throws Exception { + when(mockDnsResolver.lookupSrvRecords(TEST_DNS_NAME)) + .thenThrow(new java.io.IOException("DNS lookup failed")); + + assertThrows(IllegalArgumentException.class, + () -> new DnsSrvHostProvider(TEST_DNS_NAME, TEST_SEED, mockDnsResolver, null)); + } + + @Test + public void testInvalidPortFiltering() throws Exception { + // Create SRV record with invalid port (0) + final SRVRecord invalidPortRecord = createMockSrvRecord("server1.example.com.", 0); + final SRVRecord[] srvRecords = new SRVRecord[]{invalidPortRecord}; + + when(mockDnsResolver.lookupSrvRecords(TEST_DNS_NAME)).thenReturn(srvRecords); + + assertThrows(IllegalArgumentException.class, + () -> new DnsSrvHostProvider(TEST_DNS_NAME, TEST_SEED, mockDnsResolver, null)); + } + + @Test + public void testTrailingDotRemoval() throws Exception { + final SRVRecord recordWithDot = createMockSrvRecord("server1.example.com.", 2181); + final SRVRecord[] srvRecords = new SRVRecord[]{recordWithDot}; + + when(mockDnsResolver.lookupSrvRecords(TEST_DNS_NAME)).thenReturn(srvRecords); + + try (final DnsSrvHostProvider hostProvider = new DnsSrvHostProvider(TEST_DNS_NAME, TEST_SEED, mockDnsResolver, null)) { + assertEquals(1, hostProvider.size()); + final InetSocketAddress addr = hostProvider.next(0); + + // validate trailing dot is removed + assertEquals("server1.example.com", addr.getHostString()); + } + } + + @Test + public void testRefreshIntervalZeroDisablesPeriodicRefresh() throws Exception { + // Set system property to disable refresh + System.setProperty(ZKClientConfig.DNS_SRV_REFRESH_INTERVAL_MS, "0"); + + final SRVRecord[] srvRecords = createMockSrvRecords(); + when(mockDnsResolver.lookupSrvRecords(TEST_DNS_NAME)).thenReturn(srvRecords); + + try (final DnsSrvHostProvider hostProvider = new DnsSrvHostProvider(TEST_DNS_NAME, TEST_SEED, mockDnsResolver, null)) { + // Verify initial setup works + assertEquals(3, hostProvider.size()); + + // Wait to ensure no background refresh occurs + Thread.sleep(100); + + // Verify DNS resolver was only called once during initialization (no periodic refresh) + verify(mockDnsResolver, times(1)).lookupSrvRecords(TEST_DNS_NAME); + + // Verify host provider still works normally + assertNotNull(hostProvider.next(0)); + + // Test multiple next() calls to ensure functionality is not affected + for (int i = 0; i < 5; i++) { + assertNotNull(hostProvider.next(0)); + } + + // Verify no additional DNS calls were made + verify(mockDnsResolver, times(1)).lookupSrvRecords(TEST_DNS_NAME); + } + } + + @Test + public void testRefreshIntervalNegativeUsesDefault() throws Exception { + // Set system property to negative value (should use default) + System.setProperty(ZKClientConfig.DNS_SRV_REFRESH_INTERVAL_MS, "-1"); + + final SRVRecord[] srvRecords = createMockSrvRecords(); + when(mockDnsResolver.lookupSrvRecords(TEST_DNS_NAME)).thenReturn(srvRecords); + + try (final DnsSrvHostProvider hostProvider = new DnsSrvHostProvider(TEST_DNS_NAME, TEST_SEED, mockDnsResolver, null)) { + // Verify initial setup works (negative value should be handled gracefully) + assertEquals(3, hostProvider.size()); + assertNotNull(hostProvider.next(0)); + } + } + + private SRVRecord[] createMockSrvRecords() { + return new SRVRecord[]{ + createMockSrvRecord("server1.example.com.", 2181), + createMockSrvRecord("server2.example.com.", 2181), + createMockSrvRecord("server3.example.com.", 2181) + }; + } + + private SRVRecord createMockSrvRecord(final String target, int port) { + try { + Name targetName = Name.fromString(target); + Name serviceName = Name.fromString(TEST_DNS_NAME); + return new SRVRecord(serviceName, 1, 300, 1, 1, port, targetName); + } catch (Exception e) { + throw new RuntimeException("Failed to create mock SRV record", e); + } + } +} diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ConnectStringParserTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ConnectStringParserTest.java index fc30e69920f..d7d55a4b135 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ConnectStringParserTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ConnectStringParserTest.java @@ -19,6 +19,11 @@ package org.apache.zookeeper.test; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import java.util.HashMap; +import java.util.Map; import org.apache.zookeeper.ZKTestCase; import org.apache.zookeeper.client.ConnectStringParser; import org.junit.jupiter.api.Test; @@ -106,4 +111,89 @@ public void testParseIPV6ConnectionString() { assertEquals(2183, parser.getServerAddresses().get(2).getPort()); } + @Test + public void testDnsSrvFormatNoChroot() { + testDnsSrvFormat("dns-srv://zookeeper.myapp.com", "zookeeper.myapp.com", null); + } + + @Test + public void testDnsSrvFormatWithChroot() { + testDnsSrvFormat("dns-srv://zookeeper.myapp.com/myapp", "zookeeper.myapp.com", "/myapp"); + } + + @Test + public void testDnsSrvFormatWithNestedChroot() { + testDnsSrvFormat("dns-srv://zookeeper.shared.com/services/auth", "zookeeper.shared.com", "/services/auth"); + } + + @Test + public void testDnsSrvFormatWithRootChroot() { + testDnsSrvFormat("dns-srv://zookeeper.myapp.com/", "zookeeper.myapp.com", null); + } + + @Test + public void testDnsSrvFormatWithSubdomain() { + testDnsSrvFormat("dns-srv://zk.prod.myapp.com/production", "zk.prod.myapp.com", "/production"); + } + + @Test + public void testDnsSrvFormatInvalidChroot() { + final String connectString = "dns-srv://zookeeper.myapp.com/invalid/"; + assertThrows(IllegalArgumentException.class, () -> new ConnectStringParser(connectString)); + } + + @Test + public void testMixedFormatsComparison() { + final String hostPortFormat = "zk1:2181,zk2:2181/myapp"; + final String dnsSrvFormat = "dns-srv://zookeeper.myapp.com/myapp"; + + final ConnectStringParser hostPortParser = new ConnectStringParser(hostPortFormat); + final ConnectStringParser dnsSrvParser = new ConnectStringParser(dnsSrvFormat); + + // Both should have the same chroot path + assertEquals("/myapp", hostPortParser.getChrootPath()); + assertEquals("/myapp", dnsSrvParser.getChrootPath()); + + // Host:Port format should have multiple server addresses + assertEquals(2, hostPortParser.getServerAddresses().size()); + assertEquals("zk1", hostPortParser.getServerAddresses().get(0).getHostString()); + assertEquals("zk2", hostPortParser.getServerAddresses().get(1).getHostString()); + + // DNS SRV format should have single DNS service name + assertEquals(1, dnsSrvParser.getServerAddresses().size()); + assertEquals("zookeeper.myapp.com", dnsSrvParser.getServerAddresses().get(0).getHostString()); + } + + @Test + public void testBackwardCompatibility() { + final Map testCases = new HashMap<>(); + + testCases.put("localhost:2181", 1); + testCases.put("zk1:2181,zk2:2181,zk3:2181", 3); + testCases.put("zk1:2181,zk2:2181/myapp", 2); + testCases.put("[::1]:2181", 1); + testCases.put("[2001:db8::1]:2181,[2001:db8::2]:2181/test", 2); + + for (final Map.Entry testCase : testCases.entrySet()) { + final String connectString = testCase.getKey(); + final int expectedSize = testCase.getValue(); + + final ConnectStringParser parser = new ConnectStringParser(connectString); + assertNotNull(parser.getServerAddresses()); + assertEquals(expectedSize, parser.getServerAddresses().size()); + } + } + + private void testDnsSrvFormat(final String connectString, final String expectedHostName, final String expectedChrootPath) { + final ConnectStringParser parser = new ConnectStringParser(connectString); + + if (expectedChrootPath == null) { + assertNull(parser.getChrootPath()); + } else { + assertEquals(expectedChrootPath, parser.getChrootPath()); + } + assertEquals(1, parser.getServerAddresses().size()); + assertEquals(expectedHostName, parser.getServerAddresses().get(0).getHostString()); + assertEquals(2181, parser.getServerAddresses().get(0).getPort()); + } }