Skip to content

Commit e47748a

Browse files
xds: ClusterResolverLoadBalancer handle update for both resolved addresses and errors via ResolutionResult (v1.71.x backport) (#12005)
Backport of #11997 to v1.71.x. ------------ Fixes #11995.
1 parent 1435323 commit e47748a

File tree

2 files changed

+138
-79
lines changed

2 files changed

+138
-79
lines changed

xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java

Lines changed: 70 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import io.grpc.NameResolver;
3333
import io.grpc.NameResolver.ResolutionResult;
3434
import io.grpc.Status;
35+
import io.grpc.StatusOr;
3536
import io.grpc.SynchronizationContext;
3637
import io.grpc.SynchronizationContext.ScheduledHandle;
3738
import io.grpc.internal.BackoffPolicy;
@@ -615,79 +616,84 @@ private class NameResolverListener extends NameResolver.Listener2 {
615616

616617
@Override
617618
public void onResult(final ResolutionResult resolutionResult) {
618-
class NameResolved implements Runnable {
619-
@Override
620-
public void run() {
621-
if (shutdown) {
622-
return;
623-
}
624-
backoffPolicy = null; // reset backoff sequence if succeeded
625-
// Arbitrary priority notation for all DNS-resolved endpoints.
626-
String priorityName = priorityName(name, 0); // value doesn't matter
627-
List<EquivalentAddressGroup> addresses = new ArrayList<>();
628-
for (EquivalentAddressGroup eag : resolutionResult.getAddresses()) {
629-
// No weight attribute is attached, all endpoint-level LB policy should be able
630-
// to handle such it.
631-
String localityName = localityName(LOGICAL_DNS_CLUSTER_LOCALITY);
632-
Attributes attr = eag.getAttributes().toBuilder()
633-
.set(XdsAttributes.ATTR_LOCALITY, LOGICAL_DNS_CLUSTER_LOCALITY)
634-
.set(XdsAttributes.ATTR_LOCALITY_NAME, localityName)
635-
.set(XdsAttributes.ATTR_ADDRESS_NAME, dnsHostName)
636-
.build();
637-
eag = new EquivalentAddressGroup(eag.getAddresses(), attr);
638-
eag = AddressFilter.setPathFilter(eag, Arrays.asList(priorityName, localityName));
639-
addresses.add(eag);
640-
}
641-
PriorityChildConfig priorityChildConfig = generateDnsBasedPriorityChildConfig(
642-
name, lrsServerInfo, maxConcurrentRequests, tlsContext, filterMetadata,
643-
lbRegistry, Collections.<DropOverload>emptyList());
644-
status = Status.OK;
645-
resolved = true;
646-
result = new ClusterResolutionResult(addresses, priorityName, priorityChildConfig);
647-
handleEndpointResourceUpdate();
619+
syncContext.execute(() -> onResult2(resolutionResult));
620+
}
621+
622+
@Override
623+
public Status onResult2(final ResolutionResult resolutionResult) {
624+
if (shutdown) {
625+
return Status.OK;
626+
}
627+
// Arbitrary priority notation for all DNS-resolved endpoints.
628+
String priorityName = priorityName(name, 0); // value doesn't matter
629+
List<EquivalentAddressGroup> addresses = new ArrayList<>();
630+
StatusOr<List<EquivalentAddressGroup>> addressesOrError =
631+
resolutionResult.getAddressesOrError();
632+
if (addressesOrError.hasValue()) {
633+
backoffPolicy = null; // reset backoff sequence if succeeded
634+
for (EquivalentAddressGroup eag : resolutionResult.getAddresses()) {
635+
// No weight attribute is attached, all endpoint-level LB policy should be able
636+
// to handle such it.
637+
String localityName = localityName(LOGICAL_DNS_CLUSTER_LOCALITY);
638+
Attributes attr = eag.getAttributes().toBuilder()
639+
.set(XdsAttributes.ATTR_LOCALITY, LOGICAL_DNS_CLUSTER_LOCALITY)
640+
.set(XdsAttributes.ATTR_LOCALITY_NAME, localityName)
641+
.set(XdsAttributes.ATTR_ADDRESS_NAME, dnsHostName)
642+
.build();
643+
eag = new EquivalentAddressGroup(eag.getAddresses(), attr);
644+
eag = AddressFilter.setPathFilter(eag, Arrays.asList(priorityName, localityName));
645+
addresses.add(eag);
648646
}
647+
PriorityChildConfig priorityChildConfig = generateDnsBasedPriorityChildConfig(
648+
name, lrsServerInfo, maxConcurrentRequests, tlsContext, filterMetadata,
649+
lbRegistry, Collections.<DropOverload>emptyList());
650+
status = Status.OK;
651+
resolved = true;
652+
result = new ClusterResolutionResult(addresses, priorityName, priorityChildConfig);
653+
handleEndpointResourceUpdate();
654+
return Status.OK;
655+
} else {
656+
handleErrorInSyncContext(addressesOrError.getStatus());
657+
return addressesOrError.getStatus();
649658
}
650-
651-
syncContext.execute(new NameResolved());
652659
}
653660

654661
@Override
655662
public void onError(final Status error) {
656-
syncContext.execute(new Runnable() {
657-
@Override
658-
public void run() {
659-
if (shutdown) {
660-
return;
661-
}
662-
status = error;
663-
// NameResolver.Listener API cannot distinguish between address-not-found and
664-
// transient errors. If the error occurs in the first resolution, treat it as
665-
// address not found. Otherwise, either there is previously resolved addresses
666-
// previously encountered error, propagate the error to downstream/upstream and
667-
// let downstream/upstream handle it.
668-
if (!resolved) {
669-
resolved = true;
670-
handleEndpointResourceUpdate();
671-
} else {
672-
handleEndpointResolutionError();
673-
}
674-
if (scheduledRefresh != null && scheduledRefresh.isPending()) {
675-
return;
676-
}
677-
if (backoffPolicy == null) {
678-
backoffPolicy = backoffPolicyProvider.get();
679-
}
680-
long delayNanos = backoffPolicy.nextBackoffNanos();
681-
logger.log(XdsLogLevel.DEBUG,
663+
syncContext.execute(() -> handleErrorInSyncContext(error));
664+
}
665+
666+
private void handleErrorInSyncContext(final Status error) {
667+
if (shutdown) {
668+
return;
669+
}
670+
status = error;
671+
// NameResolver.Listener API cannot distinguish between address-not-found and
672+
// transient errors. If the error occurs in the first resolution, treat it as
673+
// address not found. Otherwise, either there is previously resolved addresses
674+
// previously encountered error, propagate the error to downstream/upstream and
675+
// let downstream/upstream handle it.
676+
if (!resolved) {
677+
resolved = true;
678+
handleEndpointResourceUpdate();
679+
} else {
680+
handleEndpointResolutionError();
681+
}
682+
if (scheduledRefresh != null && scheduledRefresh.isPending()) {
683+
return;
684+
}
685+
if (backoffPolicy == null) {
686+
backoffPolicy = backoffPolicyProvider.get();
687+
}
688+
long delayNanos = backoffPolicy.nextBackoffNanos();
689+
logger.log(XdsLogLevel.DEBUG,
682690
"Logical DNS resolver for cluster {0} encountered name resolution "
683-
+ "error: {1}, scheduling DNS resolution backoff for {2} ns",
691+
+ "error: {1}, scheduling DNS resolution backoff for {2} ns",
684692
name, error, delayNanos);
685-
scheduledRefresh =
693+
scheduledRefresh =
686694
syncContext.schedule(
687-
new DelayedNameResolverRefresh(), delayNanos, TimeUnit.NANOSECONDS,
688-
timeService);
689-
}
690-
});
695+
new DelayedNameResolverRefresh(), delayNanos, TimeUnit.NANOSECONDS,
696+
timeService);
691697
}
692698
}
693699
}

xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java

Lines changed: 68 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,7 @@ public XdsClient returnObject(Object object) {
198198
private ArgumentCaptor<SubchannelPicker> pickerCaptor;
199199
private int xdsClientRefs;
200200
private ClusterResolverLoadBalancer loadBalancer;
201+
private NameResolverProvider fakeNameResolverProvider;
201202

202203
@Before
203204
public void setUp() throws URISyntaxException {
@@ -214,7 +215,8 @@ public void setUp() throws URISyntaxException {
214215
.setServiceConfigParser(mock(ServiceConfigParser.class))
215216
.setChannelLogger(mock(ChannelLogger.class))
216217
.build();
217-
nsRegistry.register(new FakeNameResolverProvider());
218+
fakeNameResolverProvider = new FakeNameResolverProvider(false);
219+
nsRegistry.register(fakeNameResolverProvider);
218220
when(helper.getNameResolverRegistry()).thenReturn(nsRegistry);
219221
when(helper.getNameResolverArgs()).thenReturn(args);
220222
when(helper.getSynchronizationContext()).thenReturn(syncContext);
@@ -715,6 +717,17 @@ public void handleEdsResource_noHealthyEndpoint() {
715717

716718
@Test
717719
public void onlyLogicalDnsCluster_endpointsResolved() {
720+
do_onlyLogicalDnsCluster_endpointsResolved();
721+
}
722+
723+
@Test
724+
public void oldListenerCallback_onlyLogicalDnsCluster_endpointsResolved() {
725+
nsRegistry.deregister(fakeNameResolverProvider);
726+
nsRegistry.register(new FakeNameResolverProvider(true));
727+
do_onlyLogicalDnsCluster_endpointsResolved();
728+
}
729+
730+
void do_onlyLogicalDnsCluster_endpointsResolved() {
718731
ClusterResolverConfig config = new ClusterResolverConfig(
719732
Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin);
720733
deliverLbConfig(config);
@@ -743,7 +756,6 @@ public void onlyLogicalDnsCluster_endpointsResolved() {
743756
.get(XdsAttributes.ATTR_ADDRESS_NAME)).isEqualTo(DNS_HOST_NAME);
744757
assertThat(childBalancer.addresses.get(1).getAttributes()
745758
.get(XdsAttributes.ATTR_ADDRESS_NAME)).isEqualTo(DNS_HOST_NAME);
746-
747759
}
748760

749761
@Test
@@ -763,37 +775,48 @@ public void onlyLogicalDnsCluster_handleRefreshNameResolution() {
763775
}
764776

765777
@Test
766-
public void onlyLogicalDnsCluster_resolutionError_backoffAndRefresh() {
778+
public void resolutionError_backoffAndRefresh() {
779+
do_onlyLogicalDnsCluster_resolutionError_backoffAndRefresh();
780+
}
781+
782+
@Test
783+
public void oldListenerCallback_resolutionError_backoffAndRefresh() {
784+
nsRegistry.deregister(fakeNameResolverProvider);
785+
nsRegistry.register(new FakeNameResolverProvider(true));
786+
do_onlyLogicalDnsCluster_resolutionError_backoffAndRefresh();
787+
}
788+
789+
void do_onlyLogicalDnsCluster_resolutionError_backoffAndRefresh() {
767790
InOrder inOrder = Mockito.inOrder(helper, backoffPolicyProvider,
768-
backoffPolicy1, backoffPolicy2);
791+
backoffPolicy1, backoffPolicy2);
769792
ClusterResolverConfig config = new ClusterResolverConfig(
770-
Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin);
793+
Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin);
771794
deliverLbConfig(config);
772795
FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME);
773796
assertThat(childBalancers).isEmpty();
774797
Status error = Status.UNAVAILABLE.withDescription("cannot reach DNS server");
775798
resolver.deliverError(error);
776799
inOrder.verify(helper).updateBalancingState(
777-
eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture());
800+
eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture());
778801
assertPicker(pickerCaptor.getValue(), error, null);
779802
assertThat(resolver.refreshCount).isEqualTo(0);
780803
inOrder.verify(backoffPolicyProvider).get();
781804
inOrder.verify(backoffPolicy1).nextBackoffNanos();
782805
assertThat(fakeClock.getPendingTasks()).hasSize(1);
783806
assertThat(Iterables.getOnlyElement(fakeClock.getPendingTasks()).getDelay(TimeUnit.SECONDS))
784-
.isEqualTo(1L);
807+
.isEqualTo(1L);
785808
fakeClock.forwardTime(1L, TimeUnit.SECONDS);
786809
assertThat(resolver.refreshCount).isEqualTo(1);
787810

788811
error = Status.UNKNOWN.withDescription("I am lost");
789812
resolver.deliverError(error);
790813
inOrder.verify(helper).updateBalancingState(
791-
eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture());
814+
eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture());
792815
inOrder.verify(backoffPolicy1).nextBackoffNanos();
793816
assertPicker(pickerCaptor.getValue(), error, null);
794817
assertThat(fakeClock.getPendingTasks()).hasSize(1);
795818
assertThat(Iterables.getOnlyElement(fakeClock.getPendingTasks()).getDelay(TimeUnit.SECONDS))
796-
.isEqualTo(10L);
819+
.isEqualTo(10L);
797820
fakeClock.forwardTime(10L, TimeUnit.SECONDS);
798821
assertThat(resolver.refreshCount).isEqualTo(2);
799822

@@ -803,7 +826,7 @@ public void onlyLogicalDnsCluster_resolutionError_backoffAndRefresh() {
803826
resolver.deliverEndpointAddresses(Arrays.asList(endpoint1, endpoint2));
804827
assertThat(childBalancers).hasSize(1);
805828
assertAddressesEqual(Arrays.asList(endpoint1, endpoint2),
806-
Iterables.getOnlyElement(childBalancers).addresses);
829+
Iterables.getOnlyElement(childBalancers).addresses);
807830

808831
assertThat(fakeClock.getPendingTasks()).isEmpty();
809832
inOrder.verifyNoMoreInteractions();
@@ -1204,10 +1227,18 @@ void deliverError(Status error) {
12041227
}
12051228

12061229
private class FakeNameResolverProvider extends NameResolverProvider {
1230+
private final boolean useOldListenerCallback;
1231+
1232+
private FakeNameResolverProvider(boolean useOldListenerCallback) {
1233+
this.useOldListenerCallback = useOldListenerCallback;
1234+
}
1235+
12071236
@Override
12081237
public NameResolver newNameResolver(URI targetUri, NameResolver.Args args) {
12091238
assertThat(targetUri.getScheme()).isEqualTo("dns");
1210-
FakeNameResolver resolver = new FakeNameResolver(targetUri);
1239+
FakeNameResolver resolver = useOldListenerCallback
1240+
? new FakeNameResolverUsingOldListenerCallback(targetUri)
1241+
: new FakeNameResolver(targetUri);
12111242
resolvers.add(resolver);
12121243
return resolver;
12131244
}
@@ -1228,9 +1259,10 @@ protected int priority() {
12281259
}
12291260
}
12301261

1262+
12311263
private class FakeNameResolver extends NameResolver {
12321264
private final URI targetUri;
1233-
private Listener2 listener;
1265+
protected Listener2 listener;
12341266
private int refreshCount;
12351267

12361268
private FakeNameResolver(URI targetUri) {
@@ -1257,12 +1289,33 @@ public void shutdown() {
12571289
resolvers.remove(this);
12581290
}
12591291

1260-
private void deliverEndpointAddresses(List<EquivalentAddressGroup> addresses) {
1292+
protected void deliverEndpointAddresses(List<EquivalentAddressGroup> addresses) {
1293+
syncContext.execute(() -> {
1294+
Status ret = listener.onResult2(ResolutionResult.newBuilder()
1295+
.setAddressesOrError(StatusOr.fromValue(addresses)).build());
1296+
assertThat(ret.getCode()).isEqualTo(Status.Code.OK);
1297+
});
1298+
}
1299+
1300+
protected void deliverError(Status error) {
1301+
syncContext.execute(() -> listener.onResult2(ResolutionResult.newBuilder()
1302+
.setAddressesOrError(StatusOr.fromStatus(error)).build()));
1303+
}
1304+
}
1305+
1306+
private class FakeNameResolverUsingOldListenerCallback extends FakeNameResolver {
1307+
private FakeNameResolverUsingOldListenerCallback(URI targetUri) {
1308+
super(targetUri);
1309+
}
1310+
1311+
@Override
1312+
protected void deliverEndpointAddresses(List<EquivalentAddressGroup> addresses) {
12611313
listener.onResult(ResolutionResult.newBuilder()
1262-
.setAddressesOrError(StatusOr.fromValue(addresses)).build());
1314+
.setAddressesOrError(StatusOr.fromValue(addresses)).build());
12631315
}
12641316

1265-
private void deliverError(Status error) {
1317+
@Override
1318+
protected void deliverError(Status error) {
12661319
listener.onError(error);
12671320
}
12681321
}

0 commit comments

Comments
 (0)