Skip to content

Commit 77f653d

Browse files
feat: Adding environmentID support to SSEHandler (#21)
1 parent 0cde92a commit 77f653d

File tree

4 files changed

+106
-7
lines changed

4 files changed

+106
-7
lines changed

httphelpers/handlers_sse.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,23 @@ func SSEHandler(initialEvent *SSEEvent) (http.Handler, SSEStreamControl) {
106106
return handler, &sseStreamControlImpl{streamControl}
107107
}
108108

109+
// SSEHandlerWithEnvironmentID creates an HTTP handler that streams Server-Sent Events data.
110+
//
111+
// The behavior is exactly the same as SSEHandler except environmentID will be returned in
112+
// the response header X-Ld-Envid.
113+
func SSEHandlerWithEnvironmentID(initialEvent *SSEEvent, environmentID string) (http.Handler, SSEStreamControl) {
114+
var initialData []byte
115+
if initialEvent != nil {
116+
initialData = initialEvent.Bytes()
117+
}
118+
handler, streamControl := ChunkedStreamingHandler(
119+
initialData,
120+
"text/event-stream; charset=utf-8",
121+
ChunkedStreamingHandlerOptionEnvironmentID(environmentID),
122+
)
123+
return handler, &sseStreamControlImpl{streamControl}
124+
}
125+
109126
func (s *sseStreamControlImpl) Enqueue(event SSEEvent) {
110127
s.streamControl.Enqueue(event.Bytes())
111128
}

httphelpers/handlers_sse_test.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,3 +50,29 @@ data: data3
5050
`, string(data))
5151
})
5252
}
53+
54+
func TestSSEHandlerWithEnvironmentID(t *testing.T) {
55+
initialEvent := SSEEvent{"id1", "event1", "data1", 0}
56+
handler, stream := SSEHandlerWithEnvironmentID(&initialEvent, "env-id")
57+
defer stream.Close()
58+
59+
WithServer(handler, func(server *httptest.Server) {
60+
resp1, err := http.DefaultClient.Get(server.URL)
61+
require.NoError(t, err)
62+
defer resp1.Body.Close()
63+
64+
assert.Equal(t, 200, resp1.StatusCode)
65+
assert.Equal(t, "text/event-stream; charset=utf-8", resp1.Header.Get("Content-Type"))
66+
assert.Equal(t, "env-id", resp1.Header.Get("X-Ld-Envid"))
67+
68+
stream.EndAll()
69+
70+
data, err := io.ReadAll(resp1.Body)
71+
assert.NoError(t, err)
72+
assert.Equal(t, `id: id1
73+
event: event1
74+
data: data1
75+
76+
`, string(data))
77+
})
78+
}

httphelpers/handlers_streaming.go

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,25 @@ type StreamControl interface {
2525
Close() error
2626
}
2727

28+
// ChunkedStreamingHandlerOption is a common interface for optional configuration parameters that
29+
// can be used in creating a ChunkedStreamingHandler.
30+
type ChunkedStreamingHandlerOption interface {
31+
apply(h *chunkedStreamingHandlerImpl)
32+
}
33+
34+
type environmentIDChunkedStreamingHandlerOption string
35+
36+
func (o environmentIDChunkedStreamingHandlerOption) apply(h *chunkedStreamingHandlerImpl) {
37+
h.environmentID = string(o)
38+
}
39+
40+
// ChunkedStreamingHandlerOptionEnvironmentID returns an option that sets the environment ID
41+
// for a ChunkedStreamingHandler when the handler is created. The environment ID will be
42+
// returned in the response header X-Ld-Envid.
43+
func ChunkedStreamingHandlerOptionEnvironmentID(environmentID string) ChunkedStreamingHandlerOption {
44+
return environmentIDChunkedStreamingHandlerOption(environmentID)
45+
}
46+
2847
// ChunkedStreamingHandler creates an HTTP handler that streams arbitrary data using chunked encoding.
2948
//
3049
// The initialData parameter, if not nil, specifies a starting chunk that should always be sent to any
@@ -52,21 +71,29 @@ type StreamControl interface {
5271
// }
5372
// }
5473
// }()
55-
func ChunkedStreamingHandler(initialChunk []byte, contentType string) (http.Handler, StreamControl) {
74+
func ChunkedStreamingHandler(
75+
initialChunk []byte,
76+
contentType string,
77+
options ...ChunkedStreamingHandlerOption,
78+
) (http.Handler, StreamControl) {
5679
sh := &chunkedStreamingHandlerImpl{
5780
initialChunk: initialChunk,
5881
contentType: contentType,
5982
}
83+
for _, o := range options {
84+
o.apply(sh)
85+
}
6086
return sh, sh
6187
}
6288

6389
type chunkedStreamingHandlerImpl struct {
64-
initialChunk []byte
65-
contentType string
66-
queued [][]byte
67-
channels []chan []byte
68-
closed bool
69-
lock sync.Mutex
90+
initialChunk []byte
91+
contentType string
92+
queued [][]byte
93+
channels []chan []byte
94+
closed bool
95+
lock sync.Mutex
96+
environmentID string
7097
}
7198

7299
func (s *chunkedStreamingHandlerImpl) Enqueue(data []byte) {
@@ -173,6 +200,9 @@ func (s *chunkedStreamingHandlerImpl) ServeHTTP(w http.ResponseWriter, r *http.R
173200
h := w.Header()
174201
h.Set("Content-Type", s.contentType)
175202
h.Set("Cache-Control", "no-cache, no-store, must-revalidate")
203+
if len(s.environmentID) > 0 {
204+
h.Set("X-Ld-Envid", s.environmentID)
205+
}
176206

177207
if s.initialChunk != nil {
178208
_, _ = w.Write(s.initialChunk)

httphelpers/handlers_streaming_test.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,3 +123,29 @@ func TestChunkedStreamingHandlerClose(t *testing.T) {
123123
assert.Equal(t, 500, resp2.StatusCode)
124124
})
125125
}
126+
127+
func TestChunkedStreamingHandlerWithEnvironmentID(t *testing.T) {
128+
initialData := []byte("hello")
129+
handler, stream := ChunkedStreamingHandler(
130+
initialData,
131+
"text/plain",
132+
ChunkedStreamingHandlerOptionEnvironmentID("env-id"),
133+
)
134+
defer stream.Close()
135+
136+
WithServer(handler, func(server *httptest.Server) {
137+
resp1, err := http.DefaultClient.Get(server.URL)
138+
require.NoError(t, err)
139+
defer resp1.Body.Close()
140+
141+
assert.Equal(t, 200, resp1.StatusCode)
142+
assert.Equal(t, "text/plain", resp1.Header.Get("Content-Type"))
143+
assert.Equal(t, "env-id", resp1.Header.Get("X-Ld-Envid"))
144+
145+
stream.EndAll()
146+
147+
data, err := io.ReadAll(resp1.Body)
148+
require.NoError(t, err)
149+
assert.Equal(t, "hello", string(data))
150+
})
151+
}

0 commit comments

Comments
 (0)