Skip to content

Commit f1adc5d

Browse files
feat(flagd): Adjust to disable-sync-metadata toggle in flagd (#1549)
Signed-off-by: Konvalinka <[email protected]>
1 parent 484b070 commit f1adc5d

File tree

8 files changed

+67
-38
lines changed

8 files changed

+67
-38
lines changed

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStore.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,12 @@
22

33
import static dev.openfeature.contrib.providers.flagd.resolver.common.Convert.convertProtobufMapToStructure;
44

5+
import com.google.protobuf.Struct;
56
import dev.openfeature.contrib.providers.flagd.resolver.process.model.FeatureFlag;
67
import dev.openfeature.contrib.providers.flagd.resolver.process.model.FlagParser;
78
import dev.openfeature.contrib.providers.flagd.resolver.process.model.ParsingResult;
89
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload;
910
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueueSource;
10-
import dev.openfeature.flagd.grpc.sync.Sync.GetMetadataResponse;
1111
import dev.openfeature.sdk.ImmutableStructure;
1212
import dev.openfeature.sdk.Structure;
1313
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
@@ -114,7 +114,7 @@ private void streamerListener(final QueueSource connector) throws InterruptedExc
114114
Map<String, FeatureFlag> flagMap = parsingResult.getFlags();
115115
Map<String, Object> flagSetMetadataMap = parsingResult.getFlagSetMetadata();
116116

117-
Structure metadata = parseSyncMetadata(payload.getMetadataResponse());
117+
Structure syncContext = parseSyncContext(payload.getSyncContext());
118118
writeLock.lock();
119119
try {
120120
changedFlagsKeys = getChangedFlagsKeys(flagMap);
@@ -126,7 +126,7 @@ private void streamerListener(final QueueSource connector) throws InterruptedExc
126126
writeLock.unlock();
127127
}
128128
if (!stateBlockingQueue.offer(
129-
new StorageStateChange(StorageState.OK, changedFlagsKeys, metadata))) {
129+
new StorageStateChange(StorageState.OK, changedFlagsKeys, syncContext))) {
130130
log.warn("Failed to convey OK status, queue is full");
131131
}
132132
} catch (Throwable e) {
@@ -150,11 +150,13 @@ private void streamerListener(final QueueSource connector) throws InterruptedExc
150150
log.info("Shutting down store stream listener");
151151
}
152152

153-
private Structure parseSyncMetadata(GetMetadataResponse metadataResponse) {
154-
try {
155-
return convertProtobufMapToStructure(metadataResponse.getMetadata().getFieldsMap());
156-
} catch (Exception exception) {
157-
log.error("Failed to parse metadataResponse, provider metadata may not be up-to-date");
153+
private Structure parseSyncContext(Struct syncContext) {
154+
if (syncContext != null) {
155+
try {
156+
return convertProtobufMapToStructure(syncContext.getFieldsMap());
157+
} catch (Exception exception) {
158+
log.error("Failed to parse metadataResponse, provider metadata may not be up-to-date");
159+
}
158160
}
159161
return new ImmutableStructure();
160162
}
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector;
22

3-
import dev.openfeature.flagd.grpc.sync.Sync.GetMetadataResponse;
3+
import com.google.protobuf.Struct;
44
import lombok.AllArgsConstructor;
55
import lombok.Getter;
66

@@ -10,9 +10,9 @@
1010
public class QueuePayload {
1111
private final QueuePayloadType type;
1212
private final String flagData;
13-
private final GetMetadataResponse metadataResponse;
13+
private final Struct syncContext;
1414

1515
public QueuePayload(QueuePayloadType type, String flagData) {
16-
this(type, flagData, GetMetadataResponse.getDefaultInstance());
16+
this(type, flagData, null);
1717
}
1818
}

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.sync;
22

3+
import com.google.protobuf.Struct;
34
import dev.openfeature.contrib.providers.flagd.FlagdOptions;
45
import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelConnector;
56
import dev.openfeature.contrib.providers.flagd.resolver.common.FlagdProviderEvent;
@@ -123,7 +124,7 @@ private void observeSyncStream() throws InterruptedException {
123124

124125
log.debug("Initializing sync stream request");
125126
final GetMetadataRequest.Builder metadataRequest = GetMetadataRequest.newBuilder();
126-
GetMetadataResponse metadataResponse = GetMetadataResponse.getDefaultInstance();
127+
GetMetadataResponse metadataResponse = null;
127128

128129
// create a context which exists to track and cancel the stream
129130
try (CancellableContext context = Context.current().withCancellation()) {
@@ -162,8 +163,7 @@ private void observeSyncStream() throws InterruptedException {
162163
log.debug("Exception in stream RPC, streamException {}, will restart", streamException);
163164
if (!outgoingQueue.offer(new QueuePayload(
164165
QueuePayloadType.ERROR,
165-
String.format("Error from stream: %s", streamException.getMessage()),
166-
metadataResponse))) {
166+
String.format("Error from stream: %s", streamException.getMessage())))) {
167167
log.error("Failed to convey ERROR status, queue is full");
168168
}
169169
break;
@@ -173,7 +173,14 @@ private void observeSyncStream() throws InterruptedException {
173173
final String data = flagsResponse.getFlagConfiguration();
174174
log.debug("Got stream response: {}", data);
175175

176-
if (!outgoingQueue.offer(new QueuePayload(QueuePayloadType.DATA, data, metadataResponse))) {
176+
Struct syncContext = null;
177+
if (flagsResponse.hasSyncContext()) {
178+
syncContext = flagsResponse.getSyncContext();
179+
} else if (metadataResponse != null) {
180+
syncContext = metadataResponse.getMetadata();
181+
}
182+
183+
if (!outgoingQueue.offer(new QueuePayload(QueuePayloadType.DATA, data, syncContext))) {
177184
log.error("Stream writing failed");
178185
}
179186
}

providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/ProviderSteps.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,11 @@ public void setupProvider(String providerType) throws InterruptedException {
120120
state.builder.port(container.getPort(State.resolverType));
121121
}
122122
break;
123-
default:
123+
case "syncpayload":
124+
flagdConfig = "sync-payload";
125+
state.builder.port(container.getPort(State.resolverType));
126+
break;
127+
case "stable":
124128
this.state.providerType = ProviderType.DEFAULT;
125129
if (State.resolverType == Config.Resolver.FILE) {
126130

@@ -134,6 +138,8 @@ public void setupProvider(String providerType) throws InterruptedException {
134138
state.builder.port(container.getPort(State.resolverType));
135139
}
136140
break;
141+
default:
142+
throw new IllegalStateException();
137143
}
138144
when().post("http://" + container.getLaunchpadUrl() + "/start?config={config}", flagdConfig)
139145
.then()

providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStoreTest.java

Lines changed: 6 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import dev.openfeature.contrib.providers.flagd.resolver.process.model.FlagParser;
1212
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload;
1313
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayloadType;
14-
import dev.openfeature.flagd.grpc.sync.Sync.GetMetadataResponse;
1514
import java.time.Duration;
1615
import java.util.HashSet;
1716
import java.util.Map;
@@ -35,10 +34,7 @@ void connectorHandling() throws Exception {
3534

3635
// OK for simple flag
3736
assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> {
38-
payload.offer(new QueuePayload(
39-
QueuePayloadType.DATA,
40-
getFlagsFromResource(VALID_SIMPLE),
41-
GetMetadataResponse.getDefaultInstance()));
37+
payload.offer(new QueuePayload(QueuePayloadType.DATA, getFlagsFromResource(VALID_SIMPLE)));
4238
});
4339

4440
assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> {
@@ -47,10 +43,7 @@ void connectorHandling() throws Exception {
4743

4844
// STALE for invalid flag
4945
assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> {
50-
payload.offer(new QueuePayload(
51-
QueuePayloadType.DATA,
52-
getFlagsFromResource(INVALID_FLAG),
53-
GetMetadataResponse.getDefaultInstance()));
46+
payload.offer(new QueuePayload(QueuePayloadType.DATA, getFlagsFromResource(INVALID_FLAG)));
5447
});
5548

5649
assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> {
@@ -59,8 +52,7 @@ void connectorHandling() throws Exception {
5952

6053
// OK again for next payload
6154
assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> {
62-
payload.offer(new QueuePayload(
63-
QueuePayloadType.DATA, getFlagsFromResource(VALID_LONG), GetMetadataResponse.getDefaultInstance()));
55+
payload.offer(new QueuePayload(QueuePayloadType.DATA, getFlagsFromResource(VALID_LONG)));
6456
});
6557

6658
assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> {
@@ -69,7 +61,7 @@ void connectorHandling() throws Exception {
6961

7062
// ERROR is propagated correctly
7163
assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> {
72-
payload.offer(new QueuePayload(QueuePayloadType.ERROR, null, GetMetadataResponse.getDefaultInstance()));
64+
payload.offer(new QueuePayload(QueuePayloadType.ERROR, null));
7365
});
7466

7567
assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> {
@@ -93,10 +85,7 @@ public void changedFlags() throws Exception {
9385
final BlockingQueue<StorageStateChange> storageStateDTOS = store.getStateQueue();
9486

9587
assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> {
96-
payload.offer(new QueuePayload(
97-
QueuePayloadType.DATA,
98-
getFlagsFromResource(VALID_SIMPLE),
99-
GetMetadataResponse.getDefaultInstance()));
88+
payload.offer(new QueuePayload(QueuePayloadType.DATA, getFlagsFromResource(VALID_SIMPLE)));
10089
});
10190
// flags changed for first time
10291
assertEquals(
@@ -105,8 +94,7 @@ public void changedFlags() throws Exception {
10594
storageStateDTOS.take().getChangedFlagsKeys());
10695

10796
assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> {
108-
payload.offer(new QueuePayload(
109-
QueuePayloadType.DATA, getFlagsFromResource(VALID_LONG), GetMetadataResponse.getDefaultInstance()));
97+
payload.offer(new QueuePayload(QueuePayloadType.DATA, getFlagsFromResource(VALID_LONG)));
11098
});
11199
Map<String, FeatureFlag> expectedChangedFlags =
112100
FlagParser.parseString(getFlagsFromResource(VALID_LONG), true).getFlags();

providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/MockConnector.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload;
44
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayloadType;
55
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueueSource;
6-
import dev.openfeature.flagd.grpc.sync.Sync.GetMetadataResponse;
76
import java.util.concurrent.BlockingQueue;
87
import lombok.extern.slf4j.Slf4j;
98

@@ -26,8 +25,7 @@ public BlockingQueue<QueuePayload> getStreamQueue() {
2625

2726
public void shutdown() {
2827
// Emit error mocking closed connection scenario
29-
if (!mockQueue.offer(new QueuePayload(
30-
QueuePayloadType.ERROR, "shutdown invoked", GetMetadataResponse.getDefaultInstance()))) {
28+
if (!mockQueue.offer(new QueuePayload(QueuePayloadType.ERROR, "shutdown invoked"))) {
3129
log.warn("Failed to offer shutdown status");
3230
}
3331
}

providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSourceTest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.sync;
22

33
import static org.junit.Assert.assertNotNull;
4+
import static org.junit.Assert.assertNull;
45
import static org.junit.Assert.assertTrue;
56
import static org.junit.jupiter.api.Assertions.assertEquals;
67
import static org.mockito.ArgumentMatchers.any;
@@ -12,6 +13,7 @@
1213
import static org.mockito.Mockito.verify;
1314
import static org.mockito.Mockito.when;
1415

16+
import com.google.protobuf.Struct;
1517
import dev.openfeature.contrib.providers.flagd.FlagdOptions;
1618
import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelConnector;
1719
import dev.openfeature.contrib.providers.flagd.resolver.common.QueueingStreamObserver;
@@ -73,6 +75,7 @@ void onNextEnqueuesDataPayload() throws Exception {
7375
BlockingQueue<QueuePayload> streamQueue = connector.getStreamQueue();
7476
QueuePayload payload = streamQueue.poll(1000, TimeUnit.MILLISECONDS);
7577
assertNotNull(payload);
78+
assertNotNull(payload.getSyncContext());
7679
assertEquals(QueuePayloadType.DATA, payload.getType());
7780
// should NOT have restarted the stream (1 call)
7881
verify(stub, times(1)).syncFlags(any(), any());
@@ -94,13 +97,38 @@ void onNextEnqueuesDataPayloadMetadataDisabled() throws Exception {
9497
BlockingQueue<QueuePayload> streamQueue = connector.getStreamQueue();
9598
QueuePayload payload = streamQueue.poll(1000, TimeUnit.MILLISECONDS);
9699
assertNotNull(payload);
100+
assertNull(payload.getSyncContext());
97101
assertEquals(QueuePayloadType.DATA, payload.getType());
98102
// should NOT have restarted the stream (1 call)
99103
verify(stub, times(1)).syncFlags(any(), any());
100104
// should NOT have called getMetadata
101105
verify(blockingStub, times(0)).getMetadata(any());
102106
}
103107

108+
@Test
109+
void onNextEnqueuesDataPayloadWithSyncContext() throws Exception {
110+
// disable GetMetadata call
111+
SyncStreamQueueSource connector =
112+
new SyncStreamQueueSource(FlagdOptions.builder().build(), mockConnector, stub, blockingStub);
113+
latch = new CountDownLatch(1);
114+
connector.init();
115+
latch.await();
116+
117+
// fire onNext (data) event
118+
Struct syncContext = Struct.newBuilder().build();
119+
observer.onNext(
120+
SyncFlagsResponse.newBuilder().setSyncContext(syncContext).build());
121+
122+
// should enqueue data payload
123+
BlockingQueue<QueuePayload> streamQueue = connector.getStreamQueue();
124+
QueuePayload payload = streamQueue.poll(1000, TimeUnit.MILLISECONDS);
125+
assertNotNull(payload);
126+
assertEquals(syncContext, payload.getSyncContext());
127+
assertEquals(QueuePayloadType.DATA, payload.getType());
128+
// should NOT have restarted the stream (1 call)
129+
verify(stub, times(1)).syncFlags(any(), any());
130+
}
131+
104132
@Test
105133
void onErrorEnqueuesDataPayload() throws Exception {
106134
SyncStreamQueueSource connector =

providers/flagd/test-harness

0 commit comments

Comments
 (0)