|
| 1 | +package sdktests |
| 2 | + |
| 3 | +import ( |
| 4 | + "net/http" |
| 5 | + "time" |
| 6 | + |
| 7 | + "github.com/launchdarkly/go-test-helpers/v2/httphelpers" |
| 8 | + m "github.com/launchdarkly/go-test-helpers/v2/matchers" |
| 9 | + "github.com/launchdarkly/sdk-test-harness/v2/framework/harness" |
| 10 | + "github.com/launchdarkly/sdk-test-harness/v2/framework/ldtest" |
| 11 | + "github.com/launchdarkly/sdk-test-harness/v2/mockld" |
| 12 | + |
| 13 | + "github.com/launchdarkly/go-sdk-common/v3/ldcontext" |
| 14 | + "github.com/launchdarkly/go-sdk-common/v3/ldvalue" |
| 15 | + |
| 16 | + "github.com/stretchr/testify/require" |
| 17 | +) |
| 18 | + |
| 19 | +var ( |
| 20 | + initialValue = ldvalue.String("initial value") //nolint:gochecknoglobals |
| 21 | + updatedValue = ldvalue.String("updated value") //nolint:gochecknoglobals |
| 22 | + |
| 23 | + newInitialValue = ldvalue.String("new initial value") //nolint:gochecknoglobals |
| 24 | + |
| 25 | + defaultValue = ldvalue.String("default value") //nolint:gochecknoglobals |
| 26 | +) |
| 27 | + |
| 28 | +func (c CommonStreamingTests) FDv2(t *ldtest.T) { |
| 29 | + t.Run("reconnection state management", c.StateTransitions) |
| 30 | + t.Run( |
| 31 | + "updates are not complete until payload transferred is sent", |
| 32 | + c.UpdatesAreNotCompleteUntilPayloadTransferredIsSent) |
| 33 | +} |
| 34 | + |
| 35 | +func (c CommonStreamingTests) StateTransitions(t *ldtest.T) { |
| 36 | + t.Run("initializes from an empty state", c.InitializeFromEmptyState) |
| 37 | + t.Run("saves previously known state", c.SavesPreviouslyKnownState) |
| 38 | + t.Run("replaces previously known state", c.ReplacesPreviouslyKnownState) |
| 39 | + t.Run("updates previously known state", c.UpdatesPreviouslyKnownState) |
| 40 | +} |
| 41 | + |
| 42 | +func (c CommonStreamingTests) InitializeFromEmptyState(t *ldtest.T) { |
| 43 | + streamEndpoint := makeSequentialStreamHandler(t, c.makeSDKDataWithFlag("flag-key", 1, initialValue)) |
| 44 | + t.Defer(streamEndpoint.Close) |
| 45 | + client := NewSDKClient(t, WithStreamingConfig(baseStreamConfig(streamEndpoint))) |
| 46 | + |
| 47 | + expectedEvaluations := map[string]ldvalue.Value{"flag-key": initialValue} |
| 48 | + validatePayloadReceived(t, streamEndpoint, client, "", expectedEvaluations) |
| 49 | +} |
| 50 | + |
| 51 | +func (c CommonStreamingTests) SavesPreviouslyKnownState(t *ldtest.T) { |
| 52 | + dataBefore := c.makeSDKDataWithFlag("flag-key", 1, initialValue) |
| 53 | + dataAfter := mockld.NewServerSDKDataBuilder().IntentCode("xfer-none").IntentReason("up-to-date").Build() |
| 54 | + streamEndpoint := makeSequentialStreamHandler(t, dataBefore, dataAfter) |
| 55 | + t.Defer(streamEndpoint.Close) |
| 56 | + client := NewSDKClient(t, WithStreamingConfig(baseStreamConfig(streamEndpoint))) |
| 57 | + |
| 58 | + expectedEvaluations := map[string]ldvalue.Value{"flag-key": initialValue} |
| 59 | + request := validatePayloadReceived(t, streamEndpoint, client, "", expectedEvaluations) |
| 60 | + request.Cancel() // Drop the stream and allow the SDK to reconnect |
| 61 | + |
| 62 | + validatePayloadReceived(t, streamEndpoint, client, "initial", expectedEvaluations) |
| 63 | +} |
| 64 | + |
| 65 | +func (c CommonStreamingTests) ReplacesPreviouslyKnownState(t *ldtest.T) { |
| 66 | + dataBefore := c.makeSDKDataWithFlag("flag-key", 1, initialValue) |
| 67 | + dataAfter := mockld.NewServerSDKDataBuilder(). |
| 68 | + IntentCode("xfer-full"). |
| 69 | + IntentReason("cant-catchup"). |
| 70 | + Flag(c.makeServerSideFlag("new-flag-key", 1, ldvalue.String("replacement value"))). |
| 71 | + Build() |
| 72 | + streamEndpoint := makeSequentialStreamHandler(t, dataBefore, dataAfter) |
| 73 | + t.Defer(streamEndpoint.Close) |
| 74 | + client := NewSDKClient(t, WithStreamingConfig(baseStreamConfig(streamEndpoint))) |
| 75 | + |
| 76 | + expectedEvaluations := map[string]ldvalue.Value{"flag-key": initialValue, "new-flag-key": defaultValue} |
| 77 | + request := validatePayloadReceived(t, streamEndpoint, client, "", expectedEvaluations) |
| 78 | + request.Cancel() // Drop the stream and allow the SDK to reconnect |
| 79 | + |
| 80 | + expectedEvaluations = map[string]ldvalue.Value{ |
| 81 | + "flag-key": defaultValue, |
| 82 | + "new-flag-key": ldvalue.String("replacement value")} |
| 83 | + validatePayloadReceived(t, streamEndpoint, client, "initial", expectedEvaluations) |
| 84 | +} |
| 85 | + |
| 86 | +func (c CommonStreamingTests) UpdatesPreviouslyKnownState(t *ldtest.T) { |
| 87 | + dataBefore := c.makeSDKDataWithFlag("flag-key", 1, initialValue) |
| 88 | + dataAfter := mockld.NewServerSDKDataBuilder(). |
| 89 | + IntentCode("xfer-changes"). |
| 90 | + IntentReason("stale"). |
| 91 | + Flag(c.makeServerSideFlag("flag-key", 2, updatedValue)). |
| 92 | + Flag(c.makeServerSideFlag("new-flag-key", 1, newInitialValue)). |
| 93 | + Build() |
| 94 | + streamEndpoint := makeSequentialStreamHandler(t, dataBefore, dataAfter) |
| 95 | + t.Defer(streamEndpoint.Close) |
| 96 | + client := NewSDKClient(t, WithStreamingConfig(baseStreamConfig(streamEndpoint))) |
| 97 | + |
| 98 | + expectedEvaluations := map[string]ldvalue.Value{"flag-key": initialValue, "new-flag-key": defaultValue} |
| 99 | + request := validatePayloadReceived(t, streamEndpoint, client, "", expectedEvaluations) |
| 100 | + request.Cancel() // Drop the stream and allow the SDK to reconnect |
| 101 | + |
| 102 | + expectedEvaluations = map[string]ldvalue.Value{"flag-key": updatedValue, "new-flag-key": newInitialValue} |
| 103 | + validatePayloadReceived(t, streamEndpoint, client, "initial", expectedEvaluations) |
| 104 | +} |
| 105 | + |
| 106 | +func (c CommonStreamingTests) UpdatesAreNotCompleteUntilPayloadTransferredIsSent(t *ldtest.T) { |
| 107 | + dataBefore := c.makeSDKDataWithFlag("flag-key", 1, initialValue) |
| 108 | + stream := NewSDKDataSourceWithoutEndpoint(t, dataBefore) |
| 109 | + streamEndpoint := requireContext(t).harness.NewMockEndpoint(stream.Handler(), t.DebugLogger(), |
| 110 | + harness.MockEndpointDescription("streaming service")) |
| 111 | + t.Defer(streamEndpoint.Close) |
| 112 | + client := NewSDKClient(t, WithStreamingConfig(baseStreamConfig(streamEndpoint))) |
| 113 | + |
| 114 | + _, err := streamEndpoint.AwaitConnection(time.Second) |
| 115 | + require.NoError(t, err) |
| 116 | + |
| 117 | + context := ldcontext.New("context-key") |
| 118 | + flagKeyValue := basicEvaluateFlag(t, client, "flag-key", context, defaultValue) |
| 119 | + m.In(t).Assert(flagKeyValue, m.JSONEqual(initialValue)) |
| 120 | + |
| 121 | + stream.streamingService.PushUpdate("flag", "flag-key", 2, c.makeFlagData("flag-key", 2, updatedValue)) |
| 122 | + stream.streamingService.PushUpdate("flag", "new-flag-key", 1, c.makeFlagData("new-flag-key", 1, newInitialValue)) |
| 123 | + |
| 124 | + require.Never( |
| 125 | + t, |
| 126 | + checkForUpdatedValue(t, client, "flag-key", context, initialValue, updatedValue, defaultValue), |
| 127 | + time.Millisecond*100, |
| 128 | + time.Millisecond*20, |
| 129 | + "flag value was updated, but it should not have been", |
| 130 | + ) |
| 131 | + |
| 132 | + require.Never( |
| 133 | + t, |
| 134 | + checkForUpdatedValue(t, client, "new-flag-key", context, defaultValue, newInitialValue, defaultValue), |
| 135 | + time.Millisecond*100, |
| 136 | + time.Millisecond*20, |
| 137 | + "flag value was updated, but it should not have been", |
| 138 | + ) |
| 139 | + |
| 140 | + stream.streamingService.PushPayloadTransferred("updated", 2) |
| 141 | + |
| 142 | + pollUntilFlagValueUpdated(t, client, "flag-key", context, initialValue, updatedValue, defaultValue) |
| 143 | + pollUntilFlagValueUpdated(t, client, "new-flag-key", context, defaultValue, newInitialValue, defaultValue) |
| 144 | +} |
| 145 | + |
| 146 | +func makeSequentialStreamHandler(t *ldtest.T, dataSources ...mockld.SDKData) *harness.MockEndpoint { |
| 147 | + handlers := make([]http.Handler, len(dataSources)) |
| 148 | + |
| 149 | + for i, data := range dataSources { |
| 150 | + stream := NewSDKDataSourceWithoutEndpoint(t, data) |
| 151 | + handlers[i] = stream.Handler() |
| 152 | + } |
| 153 | + |
| 154 | + handler := httphelpers.SequentialHandler(handlers[0], handlers[1:]...) |
| 155 | + |
| 156 | + return requireContext(t).harness.NewMockEndpoint(handler, t.DebugLogger(), |
| 157 | + harness.MockEndpointDescription("streaming service")) |
| 158 | +} |
| 159 | + |
| 160 | +func validatePayloadReceived(t *ldtest.T, |
| 161 | + streamEndpoint *harness.MockEndpoint, client *SDKClient, |
| 162 | + state string, evaluations map[string]ldvalue.Value) harness.IncomingRequestInfo { |
| 163 | + request, err := streamEndpoint.AwaitConnection(time.Second) |
| 164 | + require.NoError(t, err) |
| 165 | + |
| 166 | + m.In(t).Assert(request.URL.Query().Get("basis"), m.Equal(state)) |
| 167 | + |
| 168 | + context := ldcontext.New("context-key") |
| 169 | + for flagKey, expectedValue := range evaluations { |
| 170 | + actualValue := basicEvaluateFlag(t, client, flagKey, context, defaultValue) |
| 171 | + m.In(t).Assert(actualValue, m.JSONEqual(expectedValue)) |
| 172 | + } |
| 173 | + |
| 174 | + return request |
| 175 | +} |
0 commit comments