Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 20 additions & 25 deletions mockld/polling_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,39 +158,34 @@ func (p *PollingService) standardPollingHandler() http.Handler {
return nil
}

// QUESTION: How dynamic do we need to make this?
serverIntent := framework.ServerIntent{
Payloads: []framework.Payload{
{
ID: "payloadID",
Target: 1,
Code: "xfer-full",
Reason: "payload-missing",
},
},
}

payloadTransferred := framework.PayloadTransferred{
//nolint:godox
// TODO: Need to replace this with a valid state value
State: "state",
Version: 1,
}
events := make([]framework.PayloadEvent, 0, len(fdv2SdkData.events)+2)

events := make([]framework.PayloadEvent, 0, len(fdv2SdkData)+2)
events = append(events, framework.PayloadEvent{
Name: "server-intent",
EventData: serverIntent,
})
for _, obj := range fdv2SdkData {
Name: "server-intent",
EventData: framework.ServerIntent{
Payloads: []framework.Payload{
{
ID: "payloadID",
Target: 1,
Code: fdv2SdkData.intentCode,
Reason: fdv2SdkData.intentReason,
},
},
}})

for _, obj := range fdv2SdkData.events {
events = append(events, framework.PayloadEvent{
Name: "put-object",
EventData: obj,
})
}

events = append(events, framework.PayloadEvent{
Name: "payload-transferred",
EventData: payloadTransferred,
Name: "payload-transferred",
EventData: framework.PayloadTransferred{
State: fdv2SdkData.state,
Version: 1,
},
})

payload := framework.PollingPayload{
Expand Down
60 changes: 53 additions & 7 deletions mockld/sdk_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,22 @@ type SDKData interface {
Serialize() []byte
}

type FDv2SDKData []framework.BaseObject
type FDv2SDKData struct {
intentCode string
intentReason string
state string

events []framework.BaseObject
}

func NewFDv2SDKData(intentCode, intentReason, state string, events []framework.BaseObject) FDv2SDKData {
return FDv2SDKData{
intentCode: intentCode,
intentReason: intentReason,
state: state,
events: events,
}
}

func (f FDv2SDKData) Serialize() []byte {
return jsonhelpers.ToJSON(f)
Expand Down Expand Up @@ -85,7 +100,12 @@ func (d ServerSDKData) ConvertToFDv2SDKData(t *ldtest.T) FDv2SDKData {
}
}

return payloadObjects
return FDv2SDKData{
intentCode: "xfer-full",
intentReason: "initial",
state: "initial",
events: payloadObjects,
}
}

// ClientSDKData contains simulated LaunchDarkly environment data for a client-side SDK.
Expand All @@ -112,8 +132,7 @@ type ClientSDKFlagWithKey struct {
}

func EmptyServerSDKData() SDKData {
var data FDv2SDKData = make([]framework.BaseObject, 0)
return data
return NewFDv2SDKData("xfer-full", "payload-missing", "initial", make([]framework.BaseObject, 0))
}

func EmptyClientSDKData() SDKData {
Expand Down Expand Up @@ -194,14 +213,21 @@ func normalizeSegment(key string, data json.RawMessage) (json.RawMessage, error)
}

type ServerSDKDataBuilder struct {
flags map[string]json.RawMessage
segments map[string]json.RawMessage
flags map[string]json.RawMessage
segments map[string]json.RawMessage
intentCode string
intentReason string
state string
}

func NewServerSDKDataBuilder() *ServerSDKDataBuilder {
return &ServerSDKDataBuilder{
flags: make(map[string]json.RawMessage),
segments: make(map[string]json.RawMessage),

intentCode: "xfer-full",
intentReason: "payload-missing",
state: "initial",
}
}

Expand Down Expand Up @@ -242,7 +268,27 @@ func (b *ServerSDKDataBuilder) Build() FDv2SDKData {
})
}

return events
return FDv2SDKData{
intentCode: b.intentCode,
intentReason: b.intentReason,
state: b.state,
events: events,
}
}

func (b *ServerSDKDataBuilder) IntentCode(code string) *ServerSDKDataBuilder {
b.intentCode = code
return b
}

func (b *ServerSDKDataBuilder) IntentReason(reason string) *ServerSDKDataBuilder {
b.intentReason = reason
return b
}

func (b *ServerSDKDataBuilder) State(state string) *ServerSDKDataBuilder {
b.state = state
return b
}

func (b *ServerSDKDataBuilder) RawFlag(key string, data json.RawMessage) *ServerSDKDataBuilder {
Expand Down
42 changes: 17 additions & 25 deletions mockld/streaming_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,37 +125,26 @@ func (s *StreamingService) makeXferFull() []eventsource.Event {

fdv2SdkData, ok := s.initialData.(FDv2SDKData)
if !ok {
s.debugLogger.Println("poller cannot handle non-fdv2 sdk data at this time")
s.debugLogger.Println("streamer cannot handle non-fdv2 sdk data at this time")
return nil
}

// QUESTION: How dynamic do we need to bother making this?
serverIntent := framework.ServerIntent{
Payloads: []framework.Payload{
{
ID: "payloadID",
Target: 1,
Code: "xfer-full",
Reason: "payload-missing",
},
},
}

// QUESTION: How dynamic do we need to bother making this?
payloadTransferred := framework.PayloadTransferred{
//nolint:godox
// TODO: Need to replace this with a valid state value
State: "state",
Version: 1,
}

events := make([]eventsource.Event, 0, len(fdv2SdkData)+2)
events := make([]eventsource.Event, 0, len(fdv2SdkData.events)+2)
events = append(events, eventImpl{
name: "server-intent",
data: serverIntent,
data: framework.ServerIntent{
Payloads: []framework.Payload{
{
ID: "payloadID",
Target: 1,
Code: fdv2SdkData.intentCode,
Reason: fdv2SdkData.intentReason,
},
},
},
})

for _, obj := range fdv2SdkData {
for _, obj := range fdv2SdkData.events {
events = append(events, eventImpl{
name: "put-object",
data: obj,
Expand All @@ -164,7 +153,10 @@ func (s *StreamingService) makeXferFull() []eventsource.Event {

events = append(events, eventImpl{
name: "payload-transferred",
data: payloadTransferred,
data: framework.PayloadTransferred{
State: fdv2SdkData.state,
Version: 1,
},
})

s.lock.RUnlock()
Expand Down
175 changes: 175 additions & 0 deletions sdktests/common_tests_stream_fdv2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
package sdktests

import (
"net/http"
"time"

"github.com/launchdarkly/go-test-helpers/v2/httphelpers"
m "github.com/launchdarkly/go-test-helpers/v2/matchers"
"github.com/launchdarkly/sdk-test-harness/v2/framework/harness"
"github.com/launchdarkly/sdk-test-harness/v2/framework/ldtest"
"github.com/launchdarkly/sdk-test-harness/v2/mockld"

"github.com/launchdarkly/go-sdk-common/v3/ldcontext"
"github.com/launchdarkly/go-sdk-common/v3/ldvalue"

"github.com/stretchr/testify/require"
)

var (
initialValue = ldvalue.String("initial value") //nolint:gochecknoglobals
updatedValue = ldvalue.String("updated value") //nolint:gochecknoglobals

newInitialValue = ldvalue.String("new initial value") //nolint:gochecknoglobals

defaultValue = ldvalue.String("default value") //nolint:gochecknoglobals
)

func (c CommonStreamingTests) FDv2(t *ldtest.T) {
t.Run("reconnection state management", c.StateTransitions)
t.Run(
"updates are not complete until payload transferred is sent",
c.UpdatesAreNotCompleteUntilPayloadTransferredIsSent)
}

func (c CommonStreamingTests) StateTransitions(t *ldtest.T) {
t.Run("initializes from an empty state", c.InitializeFromEmptyState)
t.Run("saves previously known state", c.SavesPreviouslyKnownState)
t.Run("replaces previously known state", c.ReplacesPreviouslyKnownState)
t.Run("updates previously known state", c.UpdatesPreviouslyKnownState)
}

func (c CommonStreamingTests) InitializeFromEmptyState(t *ldtest.T) {
streamEndpoint := makeSequentialStreamHandler(t, c.makeSDKDataWithFlag("flag-key", 1, initialValue))
t.Defer(streamEndpoint.Close)
client := NewSDKClient(t, WithStreamingConfig(baseStreamConfig(streamEndpoint)))

expectedEvaluations := map[string]ldvalue.Value{"flag-key": initialValue}
validatePayloadReceived(t, streamEndpoint, client, "", expectedEvaluations)
}

func (c CommonStreamingTests) SavesPreviouslyKnownState(t *ldtest.T) {
dataBefore := c.makeSDKDataWithFlag("flag-key", 1, initialValue)
dataAfter := mockld.NewServerSDKDataBuilder().IntentCode("xfer-none").IntentReason("up-to-date").Build()
streamEndpoint := makeSequentialStreamHandler(t, dataBefore, dataAfter)
t.Defer(streamEndpoint.Close)
client := NewSDKClient(t, WithStreamingConfig(baseStreamConfig(streamEndpoint)))

expectedEvaluations := map[string]ldvalue.Value{"flag-key": initialValue}
request := validatePayloadReceived(t, streamEndpoint, client, "", expectedEvaluations)
request.Cancel() // Drop the stream and allow the SDK to reconnect

validatePayloadReceived(t, streamEndpoint, client, "initial", expectedEvaluations)
}

func (c CommonStreamingTests) ReplacesPreviouslyKnownState(t *ldtest.T) {
dataBefore := c.makeSDKDataWithFlag("flag-key", 1, initialValue)
dataAfter := mockld.NewServerSDKDataBuilder().
IntentCode("xfer-full").
IntentReason("cant-catchup").
Flag(c.makeServerSideFlag("new-flag-key", 1, ldvalue.String("replacement value"))).
Build()
streamEndpoint := makeSequentialStreamHandler(t, dataBefore, dataAfter)
t.Defer(streamEndpoint.Close)
client := NewSDKClient(t, WithStreamingConfig(baseStreamConfig(streamEndpoint)))

expectedEvaluations := map[string]ldvalue.Value{"flag-key": initialValue, "new-flag-key": defaultValue}
request := validatePayloadReceived(t, streamEndpoint, client, "", expectedEvaluations)
request.Cancel() // Drop the stream and allow the SDK to reconnect

expectedEvaluations = map[string]ldvalue.Value{
"flag-key": defaultValue,
"new-flag-key": ldvalue.String("replacement value")}
validatePayloadReceived(t, streamEndpoint, client, "initial", expectedEvaluations)
}

func (c CommonStreamingTests) UpdatesPreviouslyKnownState(t *ldtest.T) {
dataBefore := c.makeSDKDataWithFlag("flag-key", 1, initialValue)
dataAfter := mockld.NewServerSDKDataBuilder().
IntentCode("xfer-changes").
IntentReason("stale").
Flag(c.makeServerSideFlag("flag-key", 2, updatedValue)).
Flag(c.makeServerSideFlag("new-flag-key", 1, newInitialValue)).
Build()
streamEndpoint := makeSequentialStreamHandler(t, dataBefore, dataAfter)
t.Defer(streamEndpoint.Close)
client := NewSDKClient(t, WithStreamingConfig(baseStreamConfig(streamEndpoint)))

expectedEvaluations := map[string]ldvalue.Value{"flag-key": initialValue, "new-flag-key": defaultValue}
request := validatePayloadReceived(t, streamEndpoint, client, "", expectedEvaluations)
request.Cancel() // Drop the stream and allow the SDK to reconnect

expectedEvaluations = map[string]ldvalue.Value{"flag-key": updatedValue, "new-flag-key": newInitialValue}
validatePayloadReceived(t, streamEndpoint, client, "initial", expectedEvaluations)
}

func (c CommonStreamingTests) UpdatesAreNotCompleteUntilPayloadTransferredIsSent(t *ldtest.T) {
dataBefore := c.makeSDKDataWithFlag("flag-key", 1, initialValue)
stream := NewSDKDataSourceWithoutEndpoint(t, dataBefore)
streamEndpoint := requireContext(t).harness.NewMockEndpoint(stream.Handler(), t.DebugLogger(),
harness.MockEndpointDescription("streaming service"))
t.Defer(streamEndpoint.Close)
client := NewSDKClient(t, WithStreamingConfig(baseStreamConfig(streamEndpoint)))

_, err := streamEndpoint.AwaitConnection(time.Second)
require.NoError(t, err)

context := ldcontext.New("context-key")
flagKeyValue := basicEvaluateFlag(t, client, "flag-key", context, defaultValue)
m.In(t).Assert(flagKeyValue, m.JSONEqual(initialValue))

stream.streamingService.PushUpdate("flag", "flag-key", 2, c.makeFlagData("flag-key", 2, updatedValue))
stream.streamingService.PushUpdate("flag", "new-flag-key", 1, c.makeFlagData("new-flag-key", 1, newInitialValue))

require.Never(
t,
checkForUpdatedValue(t, client, "flag-key", context, initialValue, updatedValue, defaultValue),
time.Millisecond*100,
time.Millisecond*20,
"flag value was updated, but it should not have been",
)

require.Never(
t,
checkForUpdatedValue(t, client, "new-flag-key", context, defaultValue, newInitialValue, defaultValue),
time.Millisecond*100,
time.Millisecond*20,
"flag value was updated, but it should not have been",
)

stream.streamingService.PushPayloadTransferred("updated", 2)

pollUntilFlagValueUpdated(t, client, "flag-key", context, initialValue, updatedValue, defaultValue)
pollUntilFlagValueUpdated(t, client, "new-flag-key", context, defaultValue, newInitialValue, defaultValue)
}

func makeSequentialStreamHandler(t *ldtest.T, dataSources ...mockld.SDKData) *harness.MockEndpoint {
handlers := make([]http.Handler, len(dataSources))

for i, data := range dataSources {
stream := NewSDKDataSourceWithoutEndpoint(t, data)
handlers[i] = stream.Handler()
}

handler := httphelpers.SequentialHandler(handlers[0], handlers[1:]...)

return requireContext(t).harness.NewMockEndpoint(handler, t.DebugLogger(),
harness.MockEndpointDescription("streaming service"))
}

func validatePayloadReceived(t *ldtest.T,
streamEndpoint *harness.MockEndpoint, client *SDKClient,
state string, evaluations map[string]ldvalue.Value) harness.IncomingRequestInfo {
request, err := streamEndpoint.AwaitConnection(time.Second)
require.NoError(t, err)

m.In(t).Assert(request.URL.Query().Get("basis"), m.Equal(state))

context := ldcontext.New("context-key")
for flagKey, expectedValue := range evaluations {
actualValue := basicEvaluateFlag(t, client, flagKey, context, defaultValue)
m.In(t).Assert(actualValue, m.JSONEqual(expectedValue))
}

return request
}
Loading