Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions neurow/integration_test/message_brokering_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ defmodule Neurow.IntegrationTest.MessageBrokeringTest do

assert_headers(headers, [
{"access-control-allow-origin", "*"},
{"cache-control", "no-cache, no-store"},
{"cache-control", "no-cache, no-store, max-age=0, must-revalidate"},
{"connection", "close"},
{"content-type", "text/event-stream"},
{"transfer-encoding", "chunked"}
Expand Down Expand Up @@ -91,7 +91,7 @@ defmodule Neurow.IntegrationTest.MessageBrokeringTest do

assert_headers(headers, [
{"access-control-allow-origin", "*"},
{"cache-control", "no-cache, no-store"},
{"cache-control", "no-cache, no-store, max-age=0, must-revalidate"},
{"connection", "close"},
{"content-type", "text/event-stream"},
{"transfer-encoding", "chunked"}
Expand Down
12 changes: 6 additions & 6 deletions neurow/integration_test/message_history_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ defmodule Neurow.IntegrationTest.MessageHistoryTest do

assert_headers(headers, [
{"access-control-allow-origin", "*"},
{"cache-control", "no-cache, no-store"},
{"cache-control", "no-cache, no-store, max-age=0, must-revalidate"},
{"connection", "close"},
{"content-type", "text/event-stream"},
{"transfer-encoding", "chunked"}
Expand All @@ -70,9 +70,9 @@ defmodule Neurow.IntegrationTest.MessageHistoryTest do

assert_headers(headers, [
{"access-control-allow-origin", "*"},
{"cache-control", "no-cache, no-store"},
{"cache-control", "no-cache, no-store, max-age=0, must-revalidate"},
{"connection", "close"},
{"content-type", "text/event-stream"},
{"content-type", "text/event-stream"}
])

assert_receive %HTTPoison.AsyncChunk{chunk: body}
Expand Down Expand Up @@ -112,7 +112,7 @@ defmodule Neurow.IntegrationTest.MessageHistoryTest do

assert_headers(headers, [
{"access-control-allow-origin", "*"},
{"cache-control", "no-cache, no-store"},
{"cache-control", "no-cache, no-store, max-age=0, must-revalidate"},
{"connection", "close"},
{"content-type", "text/event-stream"},
{"transfer-encoding", "chunked"}
Expand All @@ -139,7 +139,7 @@ defmodule Neurow.IntegrationTest.MessageHistoryTest do

assert_headers(headers, [
{"access-control-allow-origin", "*"},
{"cache-control", "no-cache, no-store"},
{"cache-control", "no-cache, no-store, max-age=0, must-revalidate"},
{"connection", "close"},
{"content-type", "text/event-stream"},
{"transfer-encoding", "chunked"}
Expand Down Expand Up @@ -171,7 +171,7 @@ defmodule Neurow.IntegrationTest.MessageHistoryTest do

assert_headers(headers, [
{"access-control-allow-origin", "*"},
{"cache-control", "no-cache, no-store"},
{"cache-control", "no-cache, no-store, max-age=0, must-revalidate"},
{"connection", "close"},
{"content-type", "text/event-stream"},
{"transfer-encoding", "chunked"}
Expand Down
29 changes: 27 additions & 2 deletions neurow/integration_test/sse_livecycle_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ defmodule Neurow.IntegrationTest.SseLifecycleTest do

import SseHelper
import SseHelper.HttpSse
import JwtHelper

alias Neurow.IntegrationTest.TestCluster

Expand All @@ -25,7 +26,7 @@ defmodule Neurow.IntegrationTest.SseLifecycleTest do

assert_headers(headers, [
{"access-control-allow-origin", "*"},
{"cache-control", "no-cache, no-store"},
{"cache-control", "no-cache, no-store, max-age=0, must-revalidate"},
{"connection", "close"},
{"content-type", "text/event-stream"},
{"transfer-encoding", "chunked"}
Expand Down Expand Up @@ -59,7 +60,7 @@ defmodule Neurow.IntegrationTest.SseLifecycleTest do
headers,
[
{"access-control-allow-origin", "*"},
{"cache-control", "no-cache, no-store"},
{"cache-control", "no-cache, no-store, max-age=0, must-revalidate"},
{"connection", "close"},
{"content-type", "text/event-stream"},
{"transfer-encoding", "chunked"},
Expand Down Expand Up @@ -127,6 +128,30 @@ defmodule Neurow.IntegrationTest.SseLifecycleTest do
end
end

test "also supports GET HTTP requests for SSE subscription", %{
cluster_state: %{
public_api_ports: [first_public_port | _other_ports]
}
} do
headers =
[Authorization: "Bearer #{compute_jwt_token_in_req_header_public_api("test_topic")}"]

async_response = HTTPoison.get!(subscribe_url(first_public_port), headers, stream_to: self())

assert_receive %HTTPoison.AsyncStatus{code: 200}
assert_receive %HTTPoison.AsyncHeaders{headers: headers}

assert_headers(headers, [
{"access-control-allow-origin", "*"},
{"cache-control", "no-cache, no-store, max-age=0, must-revalidate"},
{"connection", "close"},
{"content-type", "text/event-stream"},
{"transfer-encoding", "chunked"}
])

:hackney.stop_async(async_response.id)
end

def override_timeout(timeout) do
{:ok, default_timeout} = Application.fetch_env(:neurow, :sse_timeout)
TestCluster.update_sse_timeout(timeout)
Expand Down
4 changes: 2 additions & 2 deletions neurow/lib/neurow/public_api/endpoint.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ defmodule Neurow.PublicApi.Endpoint do
context_path = Neurow.Configuration.public_api_context_path()

case {conn.method, conn.request_path} do
{"GET", ^context_path <> "/v1/subscribe"} ->
{method, ^context_path <> "/v1/subscribe"} when method in ["GET", "POST"] ->
subscribe(conn)

_ ->
Expand All @@ -45,7 +45,7 @@ defmodule Neurow.PublicApi.Endpoint do
conn
|> put_resp_header("content-type", "text/event-stream")
|> put_resp_header("access-control-allow-origin", "*")
|> put_resp_header("cache-control", "no-cache, no-store")
|> put_resp_header("cache-control", "no-cache, no-store, max-age=0, must-revalidate")
|> put_resp_header("connection", "close")
|> put_resp_header("x-sse-timeout", to_string(timeout_ms))
|> put_resp_header("x-sse-keepalive", to_string(keep_alive_ms))
Expand Down
55 changes: 39 additions & 16 deletions neurow/test/neurow/public_api/endpoint_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ defmodule Neurow.PublicApi.EndpointTest do
describe "authentication" do
test "denies access if no JWT token is provided" do
conn =
conn(:get, "/v1/subscribe")
conn(:post, "/v1/subscribe")

call(Neurow.PublicApi.Endpoint, conn, fn ->
assert_receive {:send_resp_status, 401}
Expand All @@ -33,7 +33,7 @@ defmodule Neurow.PublicApi.EndpointTest do

test "denies access if an invalid JWT token is provided" do
conn =
conn(:get, "/v1/subscribe")
conn(:post, "/v1/subscribe")
|> put_req_header("authorization", "Bearer bad_token")

call(Neurow.PublicApi.Endpoint, conn, fn ->
Expand All @@ -57,7 +57,7 @@ defmodule Neurow.PublicApi.EndpointTest do

test "allows access if a valid JWT token is provided" do
conn =
conn(:get, "/v1/subscribe")
conn(:post, "/v1/subscribe")
|> put_req_header(
"authorization",
"Bearer #{compute_jwt_token_in_req_header_public_api("foo56")}"
Expand All @@ -72,7 +72,7 @@ defmodule Neurow.PublicApi.EndpointTest do
describe "messaging" do
test "transmits messages for the subscribed topic" do
conn =
conn(:get, "/v1/subscribe")
conn(:post, "/v1/subscribe")
|> put_req_header(
"authorization",
"Bearer #{compute_jwt_token_in_req_header_public_api("test_topic1")}"
Expand Down Expand Up @@ -113,7 +113,7 @@ defmodule Neurow.PublicApi.EndpointTest do

test "returns a bad request error if the Last-Event_Id header is not an integer" do
conn =
conn(:get, "/v1/subscribe")
conn(:post, "/v1/subscribe")
|> put_req_header(
"authorization",
"Bearer #{compute_jwt_token_in_req_header_public_api("test_topic1")}"
Expand Down Expand Up @@ -152,7 +152,7 @@ defmodule Neurow.PublicApi.EndpointTest do
publish_message("test_issuer1-test_topic1", 8, "Message ID8")

conn =
conn(:get, "/v1/subscribe")
conn(:post, "/v1/subscribe")
|> put_req_header(
"authorization",
"Bearer #{compute_jwt_token_in_req_header_public_api("test_topic1")}"
Expand All @@ -169,7 +169,7 @@ defmodule Neurow.PublicApi.EndpointTest do
publish_message("test_issuer1-other_topic", 7, "This message is not expected")

conn =
conn(:get, "/v1/subscribe")
conn(:post, "/v1/subscribe")
|> put_req_header(
"authorization",
"Bearer #{compute_jwt_token_in_req_header_public_api("test_topic1")}"
Expand All @@ -194,7 +194,7 @@ defmodule Neurow.PublicApi.EndpointTest do
publish_message("test_issuer1-test_topic1", 8, "Message ID8")

conn =
conn(:get, "/v1/subscribe")
conn(:post, "/v1/subscribe")
|> put_req_header(
"authorization",
"Bearer #{compute_jwt_token_in_req_header_public_api("test_topic1")}"
Expand Down Expand Up @@ -239,7 +239,7 @@ defmodule Neurow.PublicApi.EndpointTest do
publish_message("test_issuer1-test_topic1", 8, "Message ID8")

conn =
conn(:get, "/v1/subscribe")
conn(:post, "/v1/subscribe")
|> put_req_header(
"authorization",
"Bearer #{compute_jwt_token_in_req_header_public_api("test_topic1")}"
Expand All @@ -263,7 +263,7 @@ defmodule Neurow.PublicApi.EndpointTest do
publish_message("test_issuer1-test_topic1", 8, "Message ID8")

conn =
conn(:get, "/v1/subscribe")
conn(:post, "/v1/subscribe")
|> put_req_header(
"authorization",
"Bearer #{compute_jwt_token_in_req_header_public_api("test_topic1")}"
Expand Down Expand Up @@ -301,7 +301,7 @@ defmodule Neurow.PublicApi.EndpointTest do
publish_message("test_issuer1-test_topic1", 8, "Message ID8")

conn =
conn(:get, "/v1/subscribe")
conn(:post, "/v1/subscribe")
|> put_req_header(
"authorization",
"Bearer #{compute_jwt_token_in_req_header_public_api("test_topic1")}"
Expand Down Expand Up @@ -341,7 +341,7 @@ defmodule Neurow.PublicApi.EndpointTest do
override_timeout(500)

conn =
conn(:get, "/v1/subscribe")
conn(:post, "/v1/subscribe")
|> put_req_header(
"authorization",
"Bearer #{compute_jwt_token_in_req_header_public_api("test_topic1")}"
Expand All @@ -357,7 +357,7 @@ defmodule Neurow.PublicApi.EndpointTest do
override_keepalive(500)

conn =
conn(:get, "/v1/subscribe")
conn(:post, "/v1/subscribe")
|> put_req_header(
"authorization",
"Bearer #{compute_jwt_token_in_req_header_public_api("test_topic1")}"
Expand All @@ -379,7 +379,7 @@ defmodule Neurow.PublicApi.EndpointTest do

test "the client is disconnected when the JWT token expires" do
conn =
conn(:get, "/v1/subscribe")
conn(:post, "/v1/subscribe")
|> put_req_header(
"authorization",
"Bearer #{compute_jwt_token_in_req_header_public_api("test_topic1", duration_s: 3)}"
Expand Down Expand Up @@ -466,7 +466,7 @@ defmodule Neurow.PublicApi.EndpointTest do

test "the authentication logic is applied to urls prefixed by the context path" do
conn =
conn(:get, "/v1/subscribe")
conn(:post, "/v1/subscribe")

call(Neurow.PublicApi.Endpoint, conn, fn ->
assert_receive {:send_resp_status, 401}
Expand All @@ -490,7 +490,7 @@ defmodule Neurow.PublicApi.EndpointTest do

test "The subscribe url is prefixed with the context path" do
conn =
conn(:get, "/context_path/v1/subscribe")
conn(:post, "/context_path/v1/subscribe")
|> put_req_header(
"authorization",
"Bearer #{compute_jwt_token_in_req_header_public_api("test_topic1")}"
Expand All @@ -511,6 +511,29 @@ defmodule Neurow.PublicApi.EndpointTest do
end
end

test "also support GET HTTP requests for SSE subscription" do
conn =
conn(:get, "/v1/subscribe")
|> put_req_header(
"authorization",
"Bearer #{compute_jwt_token_in_req_header_public_api("test_topic1")}"
)

call(Neurow.PublicApi.Endpoint, conn, fn ->
assert_receive {:send_chunked, 200}

publish_message("test_issuer1-test_topic1", 1234, "Message")

assert_receive {:chunk, first_event}

assert parse_sse_event(first_event) == %{
id: "1234",
event: "test-event",
data: "Message"
}
end)
end

defp override_timeout(timeout) do
default_timeout = Application.fetch_env!(:neurow, :sse_timeout)
Application.put_env(:neurow, :sse_timeout, timeout)
Expand Down
4 changes: 2 additions & 2 deletions neurow/test/sse_helper.exs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
defmodule SseHelper do
@moduledoc """
Provides Helper function to test SSE connections
Provides Helper functions to test SSE connections
- Functions at the root of the modules can be used both in unit and integration tests
- Functions in SseHelper.PlugSse help to test Plug endpoint in unit test,
- Functions in SSeHelper.HttpSse help to test Neurow though actual HTTP connections in integration tests
Expand Down Expand Up @@ -121,7 +121,7 @@ defmodule SseHelper do
[Authorization: "Bearer #{compute_jwt_token_in_req_header_public_api(topic)}"] ++
extra_headers

async_response = HTTPoison.get!(subscribe_url(port), headers, stream_to: self())
async_response = HTTPoison.post!(subscribe_url(port), "", headers, stream_to: self())
assert_fn.()
:hackney.stop_async(async_response.id)
end
Expand Down