diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md index fea20e5fe8b..1509ccddd38 100644 --- a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md +++ b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md @@ -1315,6 +1315,14 @@ of servers -- that is, when deploying clusters of servers. leader election. If you want to test multiple servers on a single machine, then different ports can be used for each server. +* *hostProvider.dnsSrvRefreshIntervalMs* : + (Java system property: **zookeeper.hostProvider.dnsSrvRefreshIntervalMs**) + **New in 3.10.0:** + The refresh interval in milliseconds for DNS SRV record lookups when using DnsSrvHostProvider. + This property controls how frequently the DNS SRV records are queried to update the server list. + A value of 0 disables periodic refresh. + + The default value is 60000 (60 seconds). Since ZooKeeper 3.6.0 it is possible to specify **multiple addresses** for each diff --git a/zookeeper-server/pom.xml b/zookeeper-server/pom.xml index b2ecf1902cd..789608619de 100644 --- a/zookeeper-server/pom.xml +++ b/zookeeper-server/pom.xml @@ -190,6 +190,11 @@ commons-io compile + + dnsjava + dnsjava + 3.5.1 + diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/client/ConnectionType.java b/zookeeper-server/src/main/java/org/apache/zookeeper/client/ConnectionType.java new file mode 100644 index 00000000000..60ceac9869e --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/client/ConnectionType.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.client; + +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Represents the type of connection resolution used by ZooKeeper clients. + * This is an internal enum used for connection string and provider validation. + */ +@InterfaceAudience.Private +public enum ConnectionType { + /** + * Traditional host:port static server list. + */ + HOST_PORT("Host:Port"), + + /** + * DNS SRV record-based service discovery. + */ + DNS_SRV("DNS SRV"); + + private final String name; + + ConnectionType(final String name) { + this.name = name; + } + + /** + * Returns the name for this connection type. + * @return the name + */ + public String getName() { + return name; + } +} diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/client/DnsSrvHostProvider.java b/zookeeper-server/src/main/java/org/apache/zookeeper/client/DnsSrvHostProvider.java new file mode 100644 index 00000000000..a3cb4499377 --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/client/DnsSrvHostProvider.java @@ -0,0 +1,365 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.client; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.zookeeper.common.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.xbill.DNS.Lookup; +import org.xbill.DNS.Record; +import org.xbill.DNS.SRVRecord; +import org.xbill.DNS.Type; + +/** + * DNS SRV-based HostProvider that dynamically resolves host port names from DNS SRV records. + * + *

This implementation periodically refreshes the server list by querying DNS SRV records + * and uses HostConnectionManager for all connection management and reconfiguration logic.

+ * + * + *

Two-Phase Update Strategy:

+ * + */ +@InterfaceAudience.Public +public final class DnsSrvHostProvider implements HostProvider { + + public interface DnsResolver { + /** + * Performs a DNS SRV lookup for the specified name. + * + * @param dnsName the DNS name to look up + * @return array of SRV records, empty array if no records found + * @throws IOException if DNS lookup encounters an error + */ + SRVRecord[] lookupSrvRecords(String dnsName) throws IOException; + } + + private static final Logger LOG = LoggerFactory.getLogger(DnsSrvHostProvider.class); + + private final String dnsName; + private final DnsResolver dnsResolver; + private final long refreshIntervalMs; + private final Timer dnsRefreshTimer; + private HostConnectionManager connectionManager; + + // Track the current connected host for accurate load balancing decisions + private final AtomicReference currentConnectedHost = new AtomicReference<>(); + // Track the previous server list to detect changes + private volatile List previousServerList; + private volatile List latestServerList; + private final AtomicBoolean serverListChanged = new AtomicBoolean(false); + + /** + * Constructs a DnsSrvHostProvider with the given DNS name. + * + * @param dnsName the DNS name to query for SRV records + * @throws IllegalArgumentException if dnsName is null or empty or invalid + */ + public DnsSrvHostProvider(final String dnsName) { + this(dnsName, System.currentTimeMillis() ^ dnsName.hashCode()); + } + + /** + * Constructs a DnsSrvHostProvider with deterministic randomness seed + * + * @param dnsName the DNS name to query for SRV records + * @param randomnessSeed seed for randomization + * @throws IllegalArgumentException if dnsName is null or empty or invalid + */ + DnsSrvHostProvider(final String dnsName, final long randomnessSeed) { + this(dnsName, randomnessSeed, new DefaultDnsResolver()); + } + + /** + * Constructs a DnsSrvHostProvider with the given DNS name, randomization seed and DNS resolver + * + * @param dnsName the DNS name to query for SRV records + * @param randomnessSeed seed for randomization + * @param dnsResolver custom DNS resolver + * @throws IllegalArgumentException if dnsName is null or empty or invalid + */ + public DnsSrvHostProvider(final String dnsName, final long randomnessSeed, final DnsResolver dnsResolver) { + if (StringUtils.isBlank(dnsName)) { + throw new IllegalArgumentException("DNS name cannot be null or empty"); + } + + this.dnsName = dnsName; + this.dnsResolver = dnsResolver; + this.refreshIntervalMs = validateRefreshInterval(); + this.dnsRefreshTimer = initializeDnsRefreshTimer(); + + init(randomnessSeed); + } + + @Override + public int size() { + return connectionManager.size(); + } + + @Override + public InetSocketAddress next(long spinDelay) { + applyPendingServerListUpdate(); + return connectionManager.next(spinDelay); + } + + @Override + public void onConnected() { + currentConnectedHost.set(connectionManager.getServerAtCurrentIndex()); + connectionManager.onConnected(); + } + + @Override + public boolean updateServerList(Collection serverAddresses, InetSocketAddress currentHost) { + return connectionManager.updateServerList(serverAddresses, currentHost); + } + + @Override + public void close() { + if (dnsRefreshTimer != null) { + dnsRefreshTimer.cancel(); + } + } + + @Override + public ConnectionType getSupportedConnectionType() { + return ConnectionType.DNS_SRV; + } + + + /** + * Validates and returns the refresh interval + * + * @return the validated refresh interval in milliseconds + */ + private long validateRefreshInterval() { + final long defaultInterval = ZKClientConfig.DNS_SRV_REFRESH_INTERVAL_MS_DEFAULT; + + final long interval = Long.getLong(ZKClientConfig.DNS_SRV_REFRESH_INTERVAL_MS, defaultInterval); + if (interval < 0) { + LOG.warn("Invalid DNS SRV refresh interval {}, using default {}", interval, defaultInterval); + return defaultInterval; + } + if (interval == 0) { + LOG.info("DNS SRV refresh disabled (interval = 0) for {}", dnsName); + } + return interval; + } + + + /** + * Initializes the DNS refresh timer if refresh interval is greater than 0. + * + * @return the initialized Timer or null if refresh interval is 0 or negative + */ + private Timer initializeDnsRefreshTimer() { + if (refreshIntervalMs > 0) { + return new Timer("DnsSrvRefresh-" + dnsName, true); + } + return null; + } + + /** + * Initializes the DNS SRV host provider. + * + * @param randomnessSeed seed for randomization + * @throws IllegalArgumentException if no SRV records are found or initialization fails + */ + private void init(final long randomnessSeed) { + try { + final List serverAddresses = queryDnsSrvRecords(); + if (serverAddresses.isEmpty()) { + throw new IllegalArgumentException("No SRV records found for DNS name: " + dnsName); + } + + this.connectionManager = new HostConnectionManager(serverAddresses, randomnessSeed); + this.previousServerList = new ArrayList<>(serverAddresses); + this.latestServerList = new ArrayList<>(serverAddresses); + + setupBackgroundDnsRefreshTimer(); + + LOG.info("DnsSrvHostProvider initialized with {} servers from DNS name: {} with refresh interval: {} ms", + serverAddresses.size(), dnsName, refreshIntervalMs); + } catch (final Exception e) { + LOG.error("Failed to initialize DnsSrvHostProvider for DNS name: {}", dnsName, e); + throw new IllegalArgumentException("Failed to initialize DnsSrvHostProvider for DNS name: " + dnsName, e); + } + } + + /** + * Queries DNS SRV records and returns a list of server addresses. + * + * @return list of server addresses from SRV records, empty list on failure + */ + private List queryDnsSrvRecords() { + final List addresses = new ArrayList<>(); + + try { + final SRVRecord[] srvRecords = dnsResolver.lookupSrvRecords(dnsName); + if (srvRecords.length == 0) { + LOG.warn("No SRV records found for DNS name: {}", dnsName); + return addresses; + } + + for (final SRVRecord srvRecord : srvRecords) { + try { + final InetSocketAddress address = createAddressFromSrvRecord(srvRecord); + if (address != null) { + addresses.add(address); + } + } catch (final Exception e) { + LOG.warn("Failed to create address from SRV record {}: {}", srvRecord, e.getMessage()); + } + } + } catch (final Exception e) { + LOG.error("DNS SRV lookup failed for {}", dnsName, e); + } + return addresses; + } + + /** + * Creates an InetSocketAddress from an SRV record. + * + * @param srvRecord the SRV record to process + * @return InetSocketAddress or null if invalid + */ + private InetSocketAddress createAddressFromSrvRecord(final SRVRecord srvRecord) { + if (srvRecord == null) { + return null; + } + + try { + final String target = srvRecord.getTarget().toString(true); + final int port = srvRecord.getPort(); + + if (port <= 0 || port > 65535) { + LOG.warn("Invalid port {} in SRV record for target {}", port, target); + return null; + } + + if (StringUtils.isBlank(target)) { + LOG.warn("Empty or blank target in SRV record"); + return null; + } + + return new InetSocketAddress(target, port); + } catch (final Exception e) { + LOG.warn("Failed to create InetSocketAddress from SRV record {}: {}", srvRecord, e.getMessage()); + return null; + } + } + + /** + * Sets up the background DNS refresh timer task if needed. + */ + private void setupBackgroundDnsRefreshTimer() { + if (dnsRefreshTimer != null) { + dnsRefreshTimer.scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + refreshServerListInBackground(); + } + }, + refreshIntervalMs, + refreshIntervalMs); + } + } + + /** + * Performs background DNS refresh to detect server list changes without blocking connection attempts. + * + *

Important: This method only detects changes; the actual server list update + * is deferred until the next connection attempt via {@link #applyPendingServerListUpdate()}.

+ */ + private void refreshServerListInBackground() { + try { + // Refresh the server list + final List newAddresses = queryDnsSrvRecords(); + if (newAddresses.isEmpty()) { + LOG.warn("DNS SRV lookup returned no records for {}, will retry on next refresh", dnsName); + return; + } + + // Check if server list has changed + if (!Objects.equals(previousServerList, newAddresses)) { + latestServerList = new ArrayList<>(newAddresses); + serverListChanged.set(true); + LOG.info("New server list detected from DNS SRV: {} servers", newAddresses.size()); + } + } catch (final Exception e) { + LOG.warn("Failed to refresh server list from DNS SRV records for {}: {}", dnsName, e.getMessage()); + } + } + + /** + * Apply pending server list updates when connection is actually needed. + * It is called from next() to ensure server list changes are applied with proper connection context. + */ + private synchronized void applyPendingServerListUpdate() { + if (serverListChanged.get() && latestServerList != null) { + try { + final boolean needReconnect = connectionManager.updateServerList(latestServerList, currentConnectedHost.get()); + + previousServerList = new ArrayList<>(latestServerList); + serverListChanged.set(false); + + LOG.info("Applied server list update during connection attempt. servers size: {}, need reconnection: {}", + latestServerList.size(), needReconnect); + } catch (final Exception e) { + LOG.error("Failed to apply server list update", e); + } + } + } + + /** + * Default implementation of DnsResolver that uses the real DNS system. + */ + private static class DefaultDnsResolver implements DnsResolver { + @Override + public SRVRecord[] lookupSrvRecords(final String name) throws IOException { + final Lookup lookup = new Lookup(name, Type.SRV); + final Record[] records = lookup.run(); + + if (lookup.getResult() != Lookup.SUCCESSFUL) { + final String errorMsg = lookup.getErrorString(); + throw new IOException("DNS SRV lookup failed for " + name + ": " + errorMsg); + } + + if (records == null) { + return new SRVRecord[0]; + } + return Arrays.stream(records).map(SRVRecord.class::cast).toArray(SRVRecord[]::new); + } + } +} + diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/client/HostConnectionManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/client/HostConnectionManager.java new file mode 100644 index 00000000000..1c9c5db5ced --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/client/HostConnectionManager.java @@ -0,0 +1,412 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.client; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Manages ZooKeeper server connections with round-robin selection and load balancing during + * cluster reconfiguration. + * + *

Key Features:

+ *
    + *
  • Round-robin server selection with IP address resolution
  • + *
  • Migrates clients to balance load when servers are added or removed
  • + *
  • Probabilistic client migration during server list updates
  • + *
+ * + *

Reconfiguration Behavior:

+ * When server list changes, enters "reconfigMode" and calculates migration probabilities + * to balance load between old and new servers. + * + *

See ZOOKEEPER-1355 + * for more details.

+ */ +@InterfaceAudience.Private +public final class HostConnectionManager { + + /** + * Interface for address resolution to support testing and different resolution strategies. + */ + public interface Resolver { + InetAddress[] getAllByName(String name) throws UnknownHostException; + } + + private static final Logger LOG = LoggerFactory.getLogger(HostConnectionManager.class); + + private ZKClientConfig clientConfig; + private List serverAddresses; + private Random sourceOfRandomness; + private int lastIndex = -1; + private int currentIndex = -1; + + /** + * The following fields are used to migrate clients during reconfiguration + */ + private boolean reconfigMode = false; + private final List oldServers = new ArrayList<>(5); + private final List newServers = new ArrayList<>(5); + private int currentIndexOld = -1; + private int currentIndexNew = -1; + private float pOld, pNew; + + private final Resolver resolver; + + /** + * Constructs a HostConnectionManager with default resolver. + * + * @param serverAddresses + * possibly unresolved ZooKeeper server addresses + * @param randomnessSeed + * a seed used to initialize sourceOfRandomness + * @throws IllegalArgumentException + * if serverAddresses is empty + */ + public HostConnectionManager(Collection serverAddresses, long randomnessSeed) { + this(serverAddresses, randomnessSeed, InetAddress::getAllByName, null); + } + + /** + * Constructs a HostConnectionManager with custom resolver. + * + * @param serverAddresses + * possibly unresolved ZooKeeper server addresses + * @param randomnessSeed + * a seed used to initialize sourceOfRandomness + * @param resolver + * custom resolver implementation + * @param clientConfig + * ZooKeeper client configuration + * @throws IllegalArgumentException + * if serverAddresses is empty + */ + public HostConnectionManager(Collection serverAddresses, + long randomnessSeed, + Resolver resolver, + ZKClientConfig clientConfig) { + this.clientConfig = clientConfig == null ? new ZKClientConfig() : clientConfig; + this.sourceOfRandomness = new Random(randomnessSeed); + this.resolver = resolver; + + if (serverAddresses.isEmpty()) { + throw new IllegalArgumentException("A HostProvider may not be empty!"); + } + + this.serverAddresses = shuffle(serverAddresses); + currentIndex = -1; + lastIndex = -1; + + LOG.info("HostConnectionManager initialized with {} servers", serverAddresses.size()); + } + + /** + * Update the list of servers. This returns true if changing connections is necessary for load-balancing, false + * otherwise. Changing connections is necessary if one of the following holds: + * a) the host to which this client is currently connected is not in serverAddresses. + * Otherwise (if currentHost is in the new list serverAddresses): + * b) the number of servers in the cluster is increasing - in this case the load on currentHost should decrease, + * which means that SOME of the clients connected to it will migrate to the new servers. The decision whether + * this client migrates or not (i.e., whether true or false is returned) is probabilistic so that the expected + * number of clients connected to each server is the same. + * + * If true is returned, the function sets pOld and pNew that correspond to the probability to migrate to one of the + * new servers in serverAddresses or one of the old servers (migrating to one of the old servers is done only + * if our client's currentHost is not in serverAddresses). See nextHostInReconfigMode for the selection logic. + * + * See ZOOKEEPER-1355 + * for the protocol and its evaluation. + * + * @param serverAddresses + * new host list + * @param currentHost + * the host to which this client is currently connected + * @return true if changing connections is necessary for load-balancing, false otherwise + */ + synchronized boolean updateServerList(Collection serverAddresses, InetSocketAddress currentHost) { + List shuffledList = shuffle(serverAddresses); + if (shuffledList.isEmpty()) { + throw new IllegalArgumentException("The server list may not be empty!"); + } + + int oldSize = this.serverAddresses.size(); + int newSize = shuffledList.size(); + + // Check if client's current server is in the new list of servers + boolean myServerInNewConfig = false; + InetSocketAddress myServer = currentHost; + + // choose "current" server according to the client rebalancing algorithm + if (reconfigMode) { + myServer = next(0); + } + + // if the client is not currently connected to any server + if (myServer == null) { + // reconfigMode = false (next shouldn't return null). + if (lastIndex >= 0) { + // take the last server to which we were connected + myServer = this.serverAddresses.get(lastIndex); + } else { + // take the first server on the list + myServer = this.serverAddresses.get(0); + } + } + + for (InetSocketAddress addr : shuffledList) { + if (addr.getPort() == myServer.getPort() + && ((addr.getAddress() != null + && myServer.getAddress() != null + && addr.getAddress().equals(myServer.getAddress())) + || addr.getHostString().equals(myServer.getHostString()))) { + myServerInNewConfig = true; + break; + } + } + + reconfigMode = true; + + newServers.clear(); + oldServers.clear(); + // Divide the new servers into oldServers that were in the previous list + // and newServers that were not in the previous list + for (InetSocketAddress address : shuffledList) { + if (this.serverAddresses.contains(address)) { + oldServers.add(address); + } else { + newServers.add(address); + } + } + + int numOld = oldServers.size(); + int numNew = newServers.size(); + + // number of servers increased + if (numOld + numNew > this.serverAddresses.size()) { + if (myServerInNewConfig) { + // my server is in new config, but load should be decreased. + // Need to decide if this client + // is moving to one of the new servers + if (sourceOfRandomness.nextFloat() <= (1 - ((float) this.serverAddresses.size()) / (numOld + numNew))) { + pNew = 1; + pOld = 0; + } else { + // do nothing special - stay with the current server + reconfigMode = false; + } + } else { + // my server is not in new config, and load on old servers must + // be decreased, so connect to + // one of the new servers + pNew = 1; + pOld = 0; + } + } else { // number of servers stayed the same or decreased + if (myServerInNewConfig) { + // my server is in new config, and load should be increased, so + // stay with this server and do nothing special + reconfigMode = false; + } else { + pOld = ((float) (numOld * (this.serverAddresses.size() - (numOld + numNew)))) + / ((numOld + numNew) * (this.serverAddresses.size() - numOld)); + pNew = 1 - pOld; + } + } + + if (!reconfigMode) { + currentIndex = shuffledList.indexOf(getServerAtCurrentIndex()); + } else { + currentIndex = -1; + } + this.serverAddresses = shuffledList; + currentIndexOld = -1; + currentIndexNew = -1; + lastIndex = currentIndex; + + LOG.info("Server list updated: oldSize={}, newSize={}, reconfigMode={}", oldSize, newSize, reconfigMode); + + return reconfigMode; + } + + /** + * Get the next server to connect to. + * + * @param spinDelay milliseconds to wait if all hosts have been tried once + * @return the next server address to connect to + */ + InetSocketAddress next(long spinDelay) { + boolean needToSleep = false; + InetSocketAddress addr; + + synchronized (this) { + if (reconfigMode) { + addr = nextHostInReconfigMode(); + if (addr != null) { + currentIndex = serverAddresses.indexOf(addr); + return resolve(addr); + } + //tried all servers and couldn't connect + reconfigMode = false; + needToSleep = (spinDelay > 0); + } + ++currentIndex; + if (currentIndex == serverAddresses.size()) { + currentIndex = 0; + } + addr = serverAddresses.get(currentIndex); + needToSleep = needToSleep || (currentIndex == lastIndex && spinDelay > 0); + if (lastIndex == -1) { + // We don't want to sleep on the first ever connect attempt. + lastIndex = 0; + } + } + + if (needToSleep) { + try { + Thread.sleep(spinDelay); + } catch (InterruptedException e) { + LOG.warn("Unexpected exception", e); + } + } + + return resolve(addr); + } + + /** + * Notify that a connection has been established successfully. + */ + synchronized void onConnected() { + lastIndex = currentIndex; + reconfigMode = false; + } + + /** + * Get the server at the specified index. + * + * @param i the index + * @return the server address at the index, or null if index is out of bounds + */ + synchronized InetSocketAddress getServerAtIndex(int i) { + if (i < 0 || i >= serverAddresses.size()) { + return null; + } + return serverAddresses.get(i); + } + + /** + * Get the server at the current index. + * + * @return the server address at the current index + */ + synchronized InetSocketAddress getServerAtCurrentIndex() { + return getServerAtIndex(currentIndex); + } + + /** + * Get the number of servers in the list. + * + * @return the number of servers + */ + synchronized int size() { + return serverAddresses.size(); + } + + /** + * Resolves an InetSocketAddress to a concrete address. + * + * @param address + * the address to resolve + * @return resolved address or original address if resolution fails + */ + private InetSocketAddress resolve(InetSocketAddress address) { + try { + String curHostString = address.getHostString(); + List resolvedAddresses = new ArrayList<>(Arrays.asList(this.resolver.getAllByName(curHostString))); + if (resolvedAddresses.isEmpty()) { + return address; + } + if (clientConfig.isShuffleDnsResponseEnabled()) { + Collections.shuffle(resolvedAddresses); + } + return new InetSocketAddress(resolvedAddresses.get(0), address.getPort()); + } catch (UnknownHostException e) { + LOG.error("Unable to resolve address: {}", address, e); + return address; + } + } + + /** + * Shuffles the server addresses using the internal random source. + * + * @param serverAddresses + * collection of server addresses to shuffle + * @return shuffled list of server addresses + */ + private List shuffle(Collection serverAddresses) { + List tmpList = new ArrayList<>(serverAddresses.size()); + tmpList.addAll(serverAddresses); + Collections.shuffle(tmpList, sourceOfRandomness); + return tmpList; + } + + /** + * Get the next server to connect to, when in "reconfigMode", which means that + * you've just updated the server list, and now trying to find some server to connect to. + * Once onConnected() is called, reconfigMode is set to false. Similarly, if we tried to connect + * to all servers in new config and failed, reconfigMode is set to false. + * + * While in reconfigMode, we should connect to a server in newServers with probability pNew and to servers in + * oldServers with probability pOld (which is just 1-pNew). If we tried out all servers in either oldServers + * or newServers we continue to try servers from the other set, regardless of pNew or pOld. If we tried all servers + * we give up and go back to the normal round robin mode + * + * When called, this should be protected by synchronized(this) + */ + private InetSocketAddress nextHostInReconfigMode() { + boolean takeNew = (sourceOfRandomness.nextFloat() <= pNew); + + // take one of the new servers if it is possible (there are still such + // servers we didn't try), + // and either the probability tells us to connect to one of the new + // servers or if we already + // tried all the old servers + if (((currentIndexNew + 1) < newServers.size()) && (takeNew || (currentIndexOld + 1) >= oldServers.size())) { + ++currentIndexNew; + return newServers.get(currentIndexNew); + } + + // start taking old servers + if ((currentIndexOld + 1) < oldServers.size()) { + ++currentIndexOld; + return oldServers.get(currentIndexOld); + } + + return null; + } +} diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/client/HostProvider.java b/zookeeper-server/src/main/java/org/apache/zookeeper/client/HostProvider.java index 73a102f1a8c..f8ceffbdc40 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/client/HostProvider.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/client/HostProvider.java @@ -43,7 +43,7 @@ * * A HostProvider that prefers nearby hosts. */ @InterfaceAudience.Public -public interface HostProvider { +public interface HostProvider extends AutoCloseable { int size(); @@ -72,4 +72,21 @@ public interface HostProvider { */ boolean updateServerList(Collection serverAddresses, InetSocketAddress currentHost); + /** + * Returns the connection type that this HostProvider supports. + * Default implementation returns HOST_PORT for backward compatibility. + * + * @return the ConnectionType supported by this provider + */ + default ConnectionType getSupportedConnectionType() { + return ConnectionType.HOST_PORT; + } + + /** + * Close the HostProvider and release any resources. + * Default implementation does nothing for backward compatibility. + */ + @Override + default void close() { + } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/client/StaticHostProvider.java b/zookeeper-server/src/main/java/org/apache/zookeeper/client/StaticHostProvider.java index e07754c3558..e5805475d63 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/client/StaticHostProvider.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/client/StaticHostProvider.java @@ -21,15 +21,8 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Random; import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Most simple HostProvider, resolves on every next() call. @@ -38,6 +31,7 @@ * present across the stack like in JVM, OS level, hardware, etc. The best we could do here is to get the most recent * address from the underlying system which is considered up-to-date. * + * It uses HostConnectionManager for all connection management and reconfiguration logic. */ @InterfaceAudience.Public public final class StaticHostProvider implements HostProvider { @@ -48,32 +42,7 @@ public interface Resolver { } - private static final Logger LOG = LoggerFactory.getLogger(StaticHostProvider.class); - - private ZKClientConfig clientConfig = null; - - private List serverAddresses = new ArrayList<>(5); - - private Random sourceOfRandomness; - private int lastIndex = -1; - - private int currentIndex = -1; - - /** - * The following fields are used to migrate clients during reconfiguration - */ - private boolean reconfigMode = false; - - private final List oldServers = new ArrayList<>(5); - - private final List newServers = new ArrayList<>(5); - - private int currentIndexOld = -1; - private int currentIndexNew = -1; - - private float pOld, pNew; - - private Resolver resolver; + private final HostConnectionManager connectionManager; /** * Constructs a SimpleHostSet. @@ -84,12 +53,7 @@ public interface Resolver { * if serverAddresses is empty or resolves to an empty list */ public StaticHostProvider(Collection serverAddresses) { - init(serverAddresses, System.currentTimeMillis() ^ this.hashCode(), new Resolver() { - @Override - public InetAddress[] getAllByName(String name) throws UnknownHostException { - return InetAddress.getAllByName(name); - } - }); + connectionManager = new HostConnectionManager(serverAddresses, System.currentTimeMillis() ^ this.hashCode()); } /** @@ -104,12 +68,10 @@ public InetAddress[] getAllByName(String name) throws UnknownHostException { * ZooKeeper client configuration */ public StaticHostProvider(Collection serverAddresses, ZKClientConfig clientConfig) { - init(serverAddresses, System.currentTimeMillis() ^ this.hashCode(), new Resolver() { - @Override - public InetAddress[] getAllByName(String name) throws UnknownHostException { - return InetAddress.getAllByName(name); - } - }, clientConfig); + connectionManager = new HostConnectionManager(serverAddresses, + System.currentTimeMillis() ^ this.hashCode(), + InetAddress::getAllByName, + clientConfig); } /** @@ -124,7 +86,10 @@ public InetAddress[] getAllByName(String name) throws UnknownHostException { * custom resolver implementation */ public StaticHostProvider(Collection serverAddresses, Resolver resolver) { - init(serverAddresses, System.currentTimeMillis() ^ this.hashCode(), resolver); + this.connectionManager = new HostConnectionManager(serverAddresses, + System.currentTimeMillis() ^ this.hashCode(), + resolver::getAllByName, + null); } /** @@ -133,274 +98,43 @@ public StaticHostProvider(Collection serverAddresses, Resolve * * @param serverAddresses * possibly unresolved ZooKeeper server addresses - * @param randomnessSeed a seed used to initialize sourceOfRandomnes + * @param randomnessSeed a seed used to initialize sourceOfRandomness * @throws IllegalArgumentException * if serverAddresses is empty or resolves to an empty list */ public StaticHostProvider(Collection serverAddresses, long randomnessSeed) { - init(serverAddresses, randomnessSeed, new Resolver() { - @Override - public InetAddress[] getAllByName(String name) throws UnknownHostException { - return InetAddress.getAllByName(name); - } - }); + this.connectionManager = new HostConnectionManager(serverAddresses, + randomnessSeed, + InetAddress::getAllByName, + null); } - private void init(Collection serverAddresses, long randomnessSeed, Resolver resolver) { - init(serverAddresses, randomnessSeed, resolver, null); - } - - private void init(Collection serverAddresses, long randomnessSeed, Resolver resolver, - ZKClientConfig clientConfig) { - this.clientConfig = clientConfig == null ? new ZKClientConfig() : clientConfig; - this.sourceOfRandomness = new Random(randomnessSeed); - this.resolver = resolver; - if (serverAddresses.isEmpty()) { - throw new IllegalArgumentException("A HostProvider may not be empty!"); - } - this.serverAddresses = shuffle(serverAddresses); - currentIndex = -1; - lastIndex = -1; - } - - private InetSocketAddress resolve(InetSocketAddress address) { - try { - String curHostString = address.getHostString(); - List resolvedAddresses = new ArrayList<>(Arrays.asList(this.resolver.getAllByName(curHostString))); - if (resolvedAddresses.isEmpty()) { - return address; - } - if (clientConfig.isShuffleDnsResponseEnabled()) { - Collections.shuffle(resolvedAddresses); - } - return new InetSocketAddress(resolvedAddresses.get(0), address.getPort()); - } catch (UnknownHostException e) { - LOG.error("Unable to resolve address: {}", address.toString(), e); - return address; - } - } - - private List shuffle(Collection serverAddresses) { - List tmpList = new ArrayList<>(serverAddresses.size()); - tmpList.addAll(serverAddresses); - Collections.shuffle(tmpList, sourceOfRandomness); - return tmpList; - } - - /** - * Update the list of servers. This returns true if changing connections is necessary for load-balancing, false - * otherwise. Changing connections is necessary if one of the following holds: - * a) the host to which this client is currently connected is not in serverAddresses. - * Otherwise (if currentHost is in the new list serverAddresses): - * b) the number of servers in the cluster is increasing - in this case the load on currentHost should decrease, - * which means that SOME of the clients connected to it will migrate to the new servers. The decision whether - * this client migrates or not (i.e., whether true or false is returned) is probabilistic so that the expected - * number of clients connected to each server is the same. - * - * If true is returned, the function sets pOld and pNew that correspond to the probability to migrate to ones of the - * new servers in serverAddresses or one of the old servers (migrating to one of the old servers is done only - * if our client's currentHost is not in serverAddresses). See nextHostInReconfigMode for the selection logic. - * - * See ZOOKEEPER-1355 - * for the protocol and its evaluation, and StaticHostProviderTest for the tests that illustrate how load balancing - * works with this policy. - * - * @param serverAddresses new host list - * @param currentHost the host to which this client is currently connected - * @return true if changing connections is necessary for load-balancing, false otherwise - */ @Override - public synchronized boolean updateServerList( - Collection serverAddresses, - InetSocketAddress currentHost) { - List shuffledList = shuffle(serverAddresses); - if (shuffledList.isEmpty()) { - throw new IllegalArgumentException("A HostProvider may not be empty!"); - } - // Check if client's current server is in the new list of servers - boolean myServerInNewConfig = false; - - InetSocketAddress myServer = currentHost; - - // choose "current" server according to the client rebalancing algorithm - if (reconfigMode) { - myServer = next(0); - } - - // if the client is not currently connected to any server - if (myServer == null) { - // reconfigMode = false (next shouldn't return null). - if (lastIndex >= 0) { - // take the last server to which we were connected - myServer = this.serverAddresses.get(lastIndex); - } else { - // take the first server on the list - myServer = this.serverAddresses.get(0); - } - } - - for (InetSocketAddress addr : shuffledList) { - if (addr.getPort() == myServer.getPort() - && ((addr.getAddress() != null - && myServer.getAddress() != null - && addr.getAddress().equals(myServer.getAddress())) - || addr.getHostString().equals(myServer.getHostString()))) { - myServerInNewConfig = true; - break; - } - } - - reconfigMode = true; - - newServers.clear(); - oldServers.clear(); - // Divide the new servers into oldServers that were in the previous list - // and newServers that were not in the previous list - for (InetSocketAddress address : shuffledList) { - if (this.serverAddresses.contains(address)) { - oldServers.add(address); - } else { - newServers.add(address); - } - } - - int numOld = oldServers.size(); - int numNew = newServers.size(); - - // number of servers increased - if (numOld + numNew > this.serverAddresses.size()) { - if (myServerInNewConfig) { - // my server is in new config, but load should be decreased. - // Need to decide if this client - // is moving to one of the new servers - if (sourceOfRandomness.nextFloat() <= (1 - ((float) this.serverAddresses.size()) / (numOld + numNew))) { - pNew = 1; - pOld = 0; - } else { - // do nothing special - stay with the current server - reconfigMode = false; - } - } else { - // my server is not in new config, and load on old servers must - // be decreased, so connect to - // one of the new servers - pNew = 1; - pOld = 0; - } - } else { // number of servers stayed the same or decreased - if (myServerInNewConfig) { - // my server is in new config, and load should be increased, so - // stay with this server and do nothing special - reconfigMode = false; - } else { - pOld = ((float) (numOld * (this.serverAddresses.size() - (numOld + numNew)))) - / ((numOld + numNew) * (this.serverAddresses.size() - numOld)); - pNew = 1 - pOld; - } - } - - if (!reconfigMode) { - currentIndex = shuffledList.indexOf(getServerAtCurrentIndex()); - } else { - currentIndex = -1; - } - this.serverAddresses = shuffledList; - currentIndexOld = -1; - currentIndexNew = -1; - lastIndex = currentIndex; - return reconfigMode; - } - - public synchronized InetSocketAddress getServerAtIndex(int i) { - if (i < 0 || i >= serverAddresses.size()) { - return null; - } - return serverAddresses.get(i); + public boolean updateServerList(Collection serverAddresses, InetSocketAddress currentHost) { + return connectionManager.updateServerList(serverAddresses, currentHost); } - public synchronized InetSocketAddress getServerAtCurrentIndex() { - return getServerAtIndex(currentIndex); + @Override + public int size() { + return connectionManager.size(); } - public synchronized int size() { - return serverAddresses.size(); + @Override + public InetSocketAddress next(long spinDelay) { + return connectionManager.next(spinDelay); } - /** - * Get the next server to connect to, when in "reconfigMode", which means that - * you've just updated the server list, and now trying to find some server to connect to. - * Once onConnected() is called, reconfigMode is set to false. Similarly, if we tried to connect - * to all servers in new config and failed, reconfigMode is set to false. - * - * While in reconfigMode, we should connect to a server in newServers with probability pNew and to servers in - * oldServers with probability pOld (which is just 1-pNew). If we tried out all servers in either oldServers - * or newServers we continue to try servers from the other set, regardless of pNew or pOld. If we tried all servers - * we give up and go back to the normal round robin mode - * - * When called, this should be protected by synchronized(this) - */ - private InetSocketAddress nextHostInReconfigMode() { - boolean takeNew = (sourceOfRandomness.nextFloat() <= pNew); - - // take one of the new servers if it is possible (there are still such - // servers we didn't try), - // and either the probability tells us to connect to one of the new - // servers or if we already - // tried all the old servers - if (((currentIndexNew + 1) < newServers.size()) && (takeNew || (currentIndexOld + 1) >= oldServers.size())) { - ++currentIndexNew; - return newServers.get(currentIndexNew); - } - - // start taking old servers - if ((currentIndexOld + 1) < oldServers.size()) { - ++currentIndexOld; - return oldServers.get(currentIndexOld); - } - - return null; + @Override + public void onConnected() { + connectionManager.onConnected(); } - public InetSocketAddress next(long spinDelay) { - boolean needToSleep = false; - InetSocketAddress addr; - - synchronized (this) { - if (reconfigMode) { - addr = nextHostInReconfigMode(); - if (addr != null) { - currentIndex = serverAddresses.indexOf(addr); - return resolve(addr); - } - //tried all servers and couldn't connect - reconfigMode = false; - needToSleep = (spinDelay > 0); - } - ++currentIndex; - if (currentIndex == serverAddresses.size()) { - currentIndex = 0; - } - addr = serverAddresses.get(currentIndex); - needToSleep = needToSleep || (currentIndex == lastIndex && spinDelay > 0); - if (lastIndex == -1) { - // We don't want to sleep on the first ever connect attempt. - lastIndex = 0; - } - } - if (needToSleep) { - try { - Thread.sleep(spinDelay); - } catch (InterruptedException e) { - LOG.warn("Unexpected exception", e); - } - } - return resolve(addr); + public InetSocketAddress getServerAtIndex(int i) { + return connectionManager.getServerAtIndex(i); } - public synchronized void onConnected() { - lastIndex = currentIndex; - reconfigMode = false; + public InetSocketAddress getServerAtCurrentIndex() { + return connectionManager.getServerAtCurrentIndex(); } - } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/client/ZKClientConfig.java b/zookeeper-server/src/main/java/org/apache/zookeeper/client/ZKClientConfig.java index 5472838bdef..adc9de6ff91 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/client/ZKClientConfig.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/client/ZKClientConfig.java @@ -56,6 +56,7 @@ public class ZKClientConfig extends ZKConfig { public static final int CLIENT_MAX_PACKET_LENGTH_DEFAULT = 0xfffff; /* 1 MB */ public static final String ZOOKEEPER_REQUEST_TIMEOUT = "zookeeper.request.timeout"; public static final String ZOOKEEPER_SERVER_PRINCIPAL = "zookeeper.server.principal"; + /** * Feature is disabled by default. */ @@ -70,6 +71,14 @@ public class ZKClientConfig extends ZKConfig { public static final String ZOOKEEPER_SHUFFLE_DNS_RESPONSE = "zookeeper.shuffleDnsResponse"; public static final boolean ZOOKEEPER_SHUFFLE_DNS_RESPONSE_DEFAULT = false; + /** + * DNS SRV refresh interval in milliseconds for DnsSrvHostProvider. + * Controls how frequently DNS SRV records are queried to update the server list. + * A value of 0 disables periodic refresh. + */ + public static final String DNS_SRV_REFRESH_INTERVAL_MS = "zookeeper.hostProvider.dnsSrvRefreshIntervalMs"; + public static final long DNS_SRV_REFRESH_INTERVAL_MS_DEFAULT = 60000; + public ZKClientConfig() { super(); initFromJavaSystemProperties(); @@ -130,6 +139,7 @@ protected void handleBackwardCompatibility() { setProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET, System.getProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET)); setProperty(SECURE_CLIENT, System.getProperty(SECURE_CLIENT)); setProperty(ZK_SASL_CLIENT_ALLOW_REVERSE_DNS, System.getProperty(ZK_SASL_CLIENT_ALLOW_REVERSE_DNS)); + setProperty(DNS_SRV_REFRESH_INTERVAL_MS, System.getProperty(DNS_SRV_REFRESH_INTERVAL_MS)); } /** diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/client/DnsSrvHostProviderTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/client/DnsSrvHostProviderTest.java new file mode 100644 index 00000000000..478ce9f5764 --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/client/DnsSrvHostProviderTest.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.client; + + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import java.net.InetSocketAddress; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.xbill.DNS.Name; +import org.xbill.DNS.SRVRecord; + +public class DnsSrvHostProviderTest { + + private static final String TEST_DNS_NAME = "_zookeeper._tcp.example.com."; + private static final long TEST_SEED = 12345L; + + private DnsSrvHostProvider.DnsResolver mockDnsResolver; + + @BeforeEach + public void setUp() { + mockDnsResolver = mock(DnsSrvHostProvider.DnsResolver.class); + } + + @AfterEach + public void tearDown() { + System.clearProperty(ZKClientConfig.DNS_SRV_REFRESH_INTERVAL_MS); + } + + @Test + public void testBasic() throws Exception { + final SRVRecord[] srvRecords = createMockSrvRecords(); + when(mockDnsResolver.lookupSrvRecords(TEST_DNS_NAME)).thenReturn(srvRecords); + + try (final DnsSrvHostProvider hostProvider = new DnsSrvHostProvider(TEST_DNS_NAME, TEST_SEED, mockDnsResolver)) { + assertEquals(3, hostProvider.size()); + assertNotNull(hostProvider.next(0)); + } + } + + @Test + public void testServerIteration() throws Exception { + final SRVRecord[] srvRecords = createMockSrvRecords(); + when(mockDnsResolver.lookupSrvRecords(TEST_DNS_NAME)).thenReturn(srvRecords); + + try (final DnsSrvHostProvider hostProvider = new DnsSrvHostProvider(TEST_DNS_NAME, TEST_SEED, mockDnsResolver)) { + final InetSocketAddress addr1 = hostProvider.next(0); + final InetSocketAddress addr2 = hostProvider.next(0); + final InetSocketAddress addr3 = hostProvider.next(0); + + assertNotNull(addr1); + assertNotNull(addr2); + assertNotNull(addr3); + + // cycle back to first server + final InetSocketAddress addr4 = hostProvider.next(0); + assertNotNull(addr4); + } + } + + @Test + public void testEmptyDnsName() { + assertThrows(IllegalArgumentException.class, + () -> new DnsSrvHostProvider("", TEST_SEED, mockDnsResolver)); + + assertThrows(IllegalArgumentException.class, + () -> new DnsSrvHostProvider(null, TEST_SEED, mockDnsResolver)); + + assertThrows(IllegalArgumentException.class, + () -> new DnsSrvHostProvider(" ", TEST_SEED, mockDnsResolver)); + } + + @Test + public void testNoSrvRecords() throws Exception { + when(mockDnsResolver.lookupSrvRecords(TEST_DNS_NAME)).thenReturn(new SRVRecord[0]); + + assertThrows(IllegalArgumentException.class, + () -> new DnsSrvHostProvider(TEST_DNS_NAME, TEST_SEED, mockDnsResolver)); + } + + @Test + public void testDnsLookupFailure() throws Exception { + when(mockDnsResolver.lookupSrvRecords(TEST_DNS_NAME)) + .thenThrow(new java.io.IOException("DNS lookup failed")); + + assertThrows(IllegalArgumentException.class, + () -> new DnsSrvHostProvider(TEST_DNS_NAME, TEST_SEED, mockDnsResolver)); + } + + @Test + public void testInvalidPortFiltering() throws Exception { + // Create SRV record with invalid port (0) + final SRVRecord invalidPortRecord = createMockSrvRecord("server1.example.com.", 0); + final SRVRecord[] srvRecords = new SRVRecord[]{invalidPortRecord}; + + when(mockDnsResolver.lookupSrvRecords(TEST_DNS_NAME)).thenReturn(srvRecords); + + assertThrows(IllegalArgumentException.class, + () -> new DnsSrvHostProvider(TEST_DNS_NAME, TEST_SEED, mockDnsResolver)); + } + + @Test + public void testTrailingDotRemoval() throws Exception { + final SRVRecord recordWithDot = createMockSrvRecord("server1.example.com.", 2181); + final SRVRecord[] srvRecords = new SRVRecord[]{recordWithDot}; + + when(mockDnsResolver.lookupSrvRecords(TEST_DNS_NAME)).thenReturn(srvRecords); + + try (final DnsSrvHostProvider hostProvider = new DnsSrvHostProvider(TEST_DNS_NAME, TEST_SEED, mockDnsResolver)) { + assertEquals(1, hostProvider.size()); + final InetSocketAddress addr = hostProvider.next(0); + + // validate trailing dot is removed + assertEquals("server1.example.com", addr.getHostString()); + } + } + + @Test + public void testRefreshIntervalZeroDisablesPeriodicRefresh() throws Exception { + // Set system property to disable refresh + System.setProperty(ZKClientConfig.DNS_SRV_REFRESH_INTERVAL_MS, "0"); + + final SRVRecord[] srvRecords = createMockSrvRecords(); + when(mockDnsResolver.lookupSrvRecords(TEST_DNS_NAME)).thenReturn(srvRecords); + + try (final DnsSrvHostProvider hostProvider = new DnsSrvHostProvider(TEST_DNS_NAME, TEST_SEED, mockDnsResolver)) { + // Verify initial setup works + assertEquals(3, hostProvider.size()); + + // Wait to ensure no background refresh occurs + Thread.sleep(100); + + // Verify DNS resolver was only called once during initialization (no periodic refresh) + verify(mockDnsResolver, times(1)).lookupSrvRecords(TEST_DNS_NAME); + + // Verify host provider still works normally + assertNotNull(hostProvider.next(0)); + + // Test multiple next() calls to ensure functionality is not affected + for (int i = 0; i < 5; i++) { + assertNotNull(hostProvider.next(0)); + } + + // Verify no additional DNS calls were made + verify(mockDnsResolver, times(1)).lookupSrvRecords(TEST_DNS_NAME); + } + } + + @Test + public void testRefreshIntervalNegativeUsesDefault() throws Exception { + // Set system property to negative value (should use default) + System.setProperty(ZKClientConfig.DNS_SRV_REFRESH_INTERVAL_MS, "-1"); + + final SRVRecord[] srvRecords = createMockSrvRecords(); + when(mockDnsResolver.lookupSrvRecords(TEST_DNS_NAME)).thenReturn(srvRecords); + + try (final DnsSrvHostProvider hostProvider = new DnsSrvHostProvider(TEST_DNS_NAME, TEST_SEED, mockDnsResolver)) { + // Verify initial setup works (negative value should be handled gracefully) + assertEquals(3, hostProvider.size()); + assertNotNull(hostProvider.next(0)); + } + } + + private SRVRecord[] createMockSrvRecords() { + return new SRVRecord[]{ + createMockSrvRecord("server1.example.com.", 2181), + createMockSrvRecord("server2.example.com.", 2181), + createMockSrvRecord("server3.example.com.", 2181) + }; + } + + private SRVRecord createMockSrvRecord(final String target, int port) { + try { + Name targetName = Name.fromString(target); + Name serviceName = Name.fromString(TEST_DNS_NAME); + return new SRVRecord(serviceName, 1, 300, 1, 1, port, targetName); + } catch (Exception e) { + throw new RuntimeException("Failed to create mock SRV record", e); + } + } +}