From 8eed3ee12cb65462e740174d3bc2b08ee20f7a25 Mon Sep 17 00:00:00 2001 From: Alexandre Chouippe Date: Wed, 12 Feb 2025 16:27:33 +0100 Subject: [PATCH 1/2] Both support GET and POST HTTP requests for SSE subscription --- .../integration_test/sse_livecycle_test.exs | 25 +++++++++ neurow/lib/neurow/public_api/endpoint.ex | 2 +- .../test/neurow/public_api/endpoint_test.exs | 55 +++++++++++++------ neurow/test/sse_helper.exs | 4 +- 4 files changed, 67 insertions(+), 19 deletions(-) diff --git a/neurow/integration_test/sse_livecycle_test.exs b/neurow/integration_test/sse_livecycle_test.exs index 54a7cc1..003eba2 100644 --- a/neurow/integration_test/sse_livecycle_test.exs +++ b/neurow/integration_test/sse_livecycle_test.exs @@ -3,6 +3,7 @@ defmodule Neurow.IntegrationTest.SseLifecycleTest do import SseHelper import SseHelper.HttpSse + import JwtHelper alias Neurow.IntegrationTest.TestCluster @@ -127,6 +128,30 @@ defmodule Neurow.IntegrationTest.SseLifecycleTest do end end + test "also supports GET HTTP requests for SSE subscription", %{ + cluster_state: %{ + public_api_ports: [first_public_port | _other_ports] + } + } do + headers = + [Authorization: "Bearer #{compute_jwt_token_in_req_header_public_api("test_topic")}"] + + async_response = HTTPoison.get!(subscribe_url(first_public_port), headers, stream_to: self()) + + assert_receive %HTTPoison.AsyncStatus{code: 200} + assert_receive %HTTPoison.AsyncHeaders{headers: headers} + + assert_headers(headers, [ + {"access-control-allow-origin", "*"}, + {"cache-control", "no-cache, no-store"}, + {"connection", "close"}, + {"content-type", "text/event-stream"}, + {"transfer-encoding", "chunked"} + ]) + + :hackney.stop_async(async_response.id) + end + def override_timeout(timeout) do {:ok, default_timeout} = Application.fetch_env(:neurow, :sse_timeout) TestCluster.update_sse_timeout(timeout) diff --git a/neurow/lib/neurow/public_api/endpoint.ex b/neurow/lib/neurow/public_api/endpoint.ex index ba2dbbc..37566d7 100644 --- a/neurow/lib/neurow/public_api/endpoint.ex +++ b/neurow/lib/neurow/public_api/endpoint.ex @@ -25,7 +25,7 @@ defmodule Neurow.PublicApi.Endpoint do context_path = Neurow.Configuration.public_api_context_path() case {conn.method, conn.request_path} do - {"GET", ^context_path <> "/v1/subscribe"} -> + {method, ^context_path <> "/v1/subscribe"} when method in ["GET", "POST"] -> subscribe(conn) _ -> diff --git a/neurow/test/neurow/public_api/endpoint_test.exs b/neurow/test/neurow/public_api/endpoint_test.exs index d54076a..39292ce 100644 --- a/neurow/test/neurow/public_api/endpoint_test.exs +++ b/neurow/test/neurow/public_api/endpoint_test.exs @@ -9,7 +9,7 @@ defmodule Neurow.PublicApi.EndpointTest do describe "authentication" do test "denies access if no JWT token is provided" do conn = - conn(:get, "/v1/subscribe") + conn(:post, "/v1/subscribe") call(Neurow.PublicApi.Endpoint, conn, fn -> assert_receive {:send_resp_status, 401} @@ -33,7 +33,7 @@ defmodule Neurow.PublicApi.EndpointTest do test "denies access if an invalid JWT token is provided" do conn = - conn(:get, "/v1/subscribe") + conn(:post, "/v1/subscribe") |> put_req_header("authorization", "Bearer bad_token") call(Neurow.PublicApi.Endpoint, conn, fn -> @@ -57,7 +57,7 @@ defmodule Neurow.PublicApi.EndpointTest do test "allows access if a valid JWT token is provided" do conn = - conn(:get, "/v1/subscribe") + conn(:post, "/v1/subscribe") |> put_req_header( "authorization", "Bearer #{compute_jwt_token_in_req_header_public_api("foo56")}" @@ -72,7 +72,7 @@ defmodule Neurow.PublicApi.EndpointTest do describe "messaging" do test "transmits messages for the subscribed topic" do conn = - conn(:get, "/v1/subscribe") + conn(:post, "/v1/subscribe") |> put_req_header( "authorization", "Bearer #{compute_jwt_token_in_req_header_public_api("test_topic1")}" @@ -113,7 +113,7 @@ defmodule Neurow.PublicApi.EndpointTest do test "returns a bad request error if the Last-Event_Id header is not an integer" do conn = - conn(:get, "/v1/subscribe") + conn(:post, "/v1/subscribe") |> put_req_header( "authorization", "Bearer #{compute_jwt_token_in_req_header_public_api("test_topic1")}" @@ -152,7 +152,7 @@ defmodule Neurow.PublicApi.EndpointTest do publish_message("test_issuer1-test_topic1", 8, "Message ID8") conn = - conn(:get, "/v1/subscribe") + conn(:post, "/v1/subscribe") |> put_req_header( "authorization", "Bearer #{compute_jwt_token_in_req_header_public_api("test_topic1")}" @@ -169,7 +169,7 @@ defmodule Neurow.PublicApi.EndpointTest do publish_message("test_issuer1-other_topic", 7, "This message is not expected") conn = - conn(:get, "/v1/subscribe") + conn(:post, "/v1/subscribe") |> put_req_header( "authorization", "Bearer #{compute_jwt_token_in_req_header_public_api("test_topic1")}" @@ -194,7 +194,7 @@ defmodule Neurow.PublicApi.EndpointTest do publish_message("test_issuer1-test_topic1", 8, "Message ID8") conn = - conn(:get, "/v1/subscribe") + conn(:post, "/v1/subscribe") |> put_req_header( "authorization", "Bearer #{compute_jwt_token_in_req_header_public_api("test_topic1")}" @@ -239,7 +239,7 @@ defmodule Neurow.PublicApi.EndpointTest do publish_message("test_issuer1-test_topic1", 8, "Message ID8") conn = - conn(:get, "/v1/subscribe") + conn(:post, "/v1/subscribe") |> put_req_header( "authorization", "Bearer #{compute_jwt_token_in_req_header_public_api("test_topic1")}" @@ -263,7 +263,7 @@ defmodule Neurow.PublicApi.EndpointTest do publish_message("test_issuer1-test_topic1", 8, "Message ID8") conn = - conn(:get, "/v1/subscribe") + conn(:post, "/v1/subscribe") |> put_req_header( "authorization", "Bearer #{compute_jwt_token_in_req_header_public_api("test_topic1")}" @@ -301,7 +301,7 @@ defmodule Neurow.PublicApi.EndpointTest do publish_message("test_issuer1-test_topic1", 8, "Message ID8") conn = - conn(:get, "/v1/subscribe") + conn(:post, "/v1/subscribe") |> put_req_header( "authorization", "Bearer #{compute_jwt_token_in_req_header_public_api("test_topic1")}" @@ -341,7 +341,7 @@ defmodule Neurow.PublicApi.EndpointTest do override_timeout(500) conn = - conn(:get, "/v1/subscribe") + conn(:post, "/v1/subscribe") |> put_req_header( "authorization", "Bearer #{compute_jwt_token_in_req_header_public_api("test_topic1")}" @@ -357,7 +357,7 @@ defmodule Neurow.PublicApi.EndpointTest do override_keepalive(500) conn = - conn(:get, "/v1/subscribe") + conn(:post, "/v1/subscribe") |> put_req_header( "authorization", "Bearer #{compute_jwt_token_in_req_header_public_api("test_topic1")}" @@ -379,7 +379,7 @@ defmodule Neurow.PublicApi.EndpointTest do test "the client is disconnected when the JWT token expires" do conn = - conn(:get, "/v1/subscribe") + conn(:post, "/v1/subscribe") |> put_req_header( "authorization", "Bearer #{compute_jwt_token_in_req_header_public_api("test_topic1", duration_s: 3)}" @@ -466,7 +466,7 @@ defmodule Neurow.PublicApi.EndpointTest do test "the authentication logic is applied to urls prefixed by the context path" do conn = - conn(:get, "/v1/subscribe") + conn(:post, "/v1/subscribe") call(Neurow.PublicApi.Endpoint, conn, fn -> assert_receive {:send_resp_status, 401} @@ -490,7 +490,7 @@ defmodule Neurow.PublicApi.EndpointTest do test "The subscribe url is prefixed with the context path" do conn = - conn(:get, "/context_path/v1/subscribe") + conn(:post, "/context_path/v1/subscribe") |> put_req_header( "authorization", "Bearer #{compute_jwt_token_in_req_header_public_api("test_topic1")}" @@ -511,6 +511,29 @@ defmodule Neurow.PublicApi.EndpointTest do end end + test "also support GET HTTP requests for SSE subscription" do + conn = + conn(:get, "/v1/subscribe") + |> put_req_header( + "authorization", + "Bearer #{compute_jwt_token_in_req_header_public_api("test_topic1")}" + ) + + call(Neurow.PublicApi.Endpoint, conn, fn -> + assert_receive {:send_chunked, 200} + + publish_message("test_issuer1-test_topic1", 1234, "Message") + + assert_receive {:chunk, first_event} + + assert parse_sse_event(first_event) == %{ + id: "1234", + event: "test-event", + data: "Message" + } + end) + end + defp override_timeout(timeout) do default_timeout = Application.fetch_env!(:neurow, :sse_timeout) Application.put_env(:neurow, :sse_timeout, timeout) diff --git a/neurow/test/sse_helper.exs b/neurow/test/sse_helper.exs index f39e3c1..a42eecc 100644 --- a/neurow/test/sse_helper.exs +++ b/neurow/test/sse_helper.exs @@ -1,6 +1,6 @@ defmodule SseHelper do @moduledoc """ - Provides Helper function to test SSE connections + Provides Helper functions to test SSE connections - Functions at the root of the modules can be used both in unit and integration tests - Functions in SseHelper.PlugSse help to test Plug endpoint in unit test, - Functions in SSeHelper.HttpSse help to test Neurow though actual HTTP connections in integration tests @@ -121,7 +121,7 @@ defmodule SseHelper do [Authorization: "Bearer #{compute_jwt_token_in_req_header_public_api(topic)}"] ++ extra_headers - async_response = HTTPoison.get!(subscribe_url(port), headers, stream_to: self()) + async_response = HTTPoison.post!(subscribe_url(port), "", headers, stream_to: self()) assert_fn.() :hackney.stop_async(async_response.id) end From 880ee3f69c1db8baf730d147e35424aad471992c Mon Sep 17 00:00:00 2001 From: Alexandre Chouippe Date: Wed, 12 Feb 2025 16:39:28 +0100 Subject: [PATCH 2/2] Add extra caching directive to prevent browser from automatically refetching after being idle --- neurow/integration_test/message_brokering_test.exs | 4 ++-- neurow/integration_test/message_history_test.exs | 12 ++++++------ neurow/integration_test/sse_livecycle_test.exs | 6 +++--- neurow/lib/neurow/public_api/endpoint.ex | 2 +- 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/neurow/integration_test/message_brokering_test.exs b/neurow/integration_test/message_brokering_test.exs index ab5ab06..355abf1 100644 --- a/neurow/integration_test/message_brokering_test.exs +++ b/neurow/integration_test/message_brokering_test.exs @@ -34,7 +34,7 @@ defmodule Neurow.IntegrationTest.MessageBrokeringTest do assert_headers(headers, [ {"access-control-allow-origin", "*"}, - {"cache-control", "no-cache, no-store"}, + {"cache-control", "no-cache, no-store, max-age=0, must-revalidate"}, {"connection", "close"}, {"content-type", "text/event-stream"}, {"transfer-encoding", "chunked"} @@ -91,7 +91,7 @@ defmodule Neurow.IntegrationTest.MessageBrokeringTest do assert_headers(headers, [ {"access-control-allow-origin", "*"}, - {"cache-control", "no-cache, no-store"}, + {"cache-control", "no-cache, no-store, max-age=0, must-revalidate"}, {"connection", "close"}, {"content-type", "text/event-stream"}, {"transfer-encoding", "chunked"} diff --git a/neurow/integration_test/message_history_test.exs b/neurow/integration_test/message_history_test.exs index 915836d..e6ef7fe 100644 --- a/neurow/integration_test/message_history_test.exs +++ b/neurow/integration_test/message_history_test.exs @@ -46,7 +46,7 @@ defmodule Neurow.IntegrationTest.MessageHistoryTest do assert_headers(headers, [ {"access-control-allow-origin", "*"}, - {"cache-control", "no-cache, no-store"}, + {"cache-control", "no-cache, no-store, max-age=0, must-revalidate"}, {"connection", "close"}, {"content-type", "text/event-stream"}, {"transfer-encoding", "chunked"} @@ -70,9 +70,9 @@ defmodule Neurow.IntegrationTest.MessageHistoryTest do assert_headers(headers, [ {"access-control-allow-origin", "*"}, - {"cache-control", "no-cache, no-store"}, + {"cache-control", "no-cache, no-store, max-age=0, must-revalidate"}, {"connection", "close"}, - {"content-type", "text/event-stream"}, + {"content-type", "text/event-stream"} ]) assert_receive %HTTPoison.AsyncChunk{chunk: body} @@ -112,7 +112,7 @@ defmodule Neurow.IntegrationTest.MessageHistoryTest do assert_headers(headers, [ {"access-control-allow-origin", "*"}, - {"cache-control", "no-cache, no-store"}, + {"cache-control", "no-cache, no-store, max-age=0, must-revalidate"}, {"connection", "close"}, {"content-type", "text/event-stream"}, {"transfer-encoding", "chunked"} @@ -139,7 +139,7 @@ defmodule Neurow.IntegrationTest.MessageHistoryTest do assert_headers(headers, [ {"access-control-allow-origin", "*"}, - {"cache-control", "no-cache, no-store"}, + {"cache-control", "no-cache, no-store, max-age=0, must-revalidate"}, {"connection", "close"}, {"content-type", "text/event-stream"}, {"transfer-encoding", "chunked"} @@ -171,7 +171,7 @@ defmodule Neurow.IntegrationTest.MessageHistoryTest do assert_headers(headers, [ {"access-control-allow-origin", "*"}, - {"cache-control", "no-cache, no-store"}, + {"cache-control", "no-cache, no-store, max-age=0, must-revalidate"}, {"connection", "close"}, {"content-type", "text/event-stream"}, {"transfer-encoding", "chunked"} diff --git a/neurow/integration_test/sse_livecycle_test.exs b/neurow/integration_test/sse_livecycle_test.exs index 003eba2..6253e4a 100644 --- a/neurow/integration_test/sse_livecycle_test.exs +++ b/neurow/integration_test/sse_livecycle_test.exs @@ -26,7 +26,7 @@ defmodule Neurow.IntegrationTest.SseLifecycleTest do assert_headers(headers, [ {"access-control-allow-origin", "*"}, - {"cache-control", "no-cache, no-store"}, + {"cache-control", "no-cache, no-store, max-age=0, must-revalidate"}, {"connection", "close"}, {"content-type", "text/event-stream"}, {"transfer-encoding", "chunked"} @@ -60,7 +60,7 @@ defmodule Neurow.IntegrationTest.SseLifecycleTest do headers, [ {"access-control-allow-origin", "*"}, - {"cache-control", "no-cache, no-store"}, + {"cache-control", "no-cache, no-store, max-age=0, must-revalidate"}, {"connection", "close"}, {"content-type", "text/event-stream"}, {"transfer-encoding", "chunked"}, @@ -143,7 +143,7 @@ defmodule Neurow.IntegrationTest.SseLifecycleTest do assert_headers(headers, [ {"access-control-allow-origin", "*"}, - {"cache-control", "no-cache, no-store"}, + {"cache-control", "no-cache, no-store, max-age=0, must-revalidate"}, {"connection", "close"}, {"content-type", "text/event-stream"}, {"transfer-encoding", "chunked"} diff --git a/neurow/lib/neurow/public_api/endpoint.ex b/neurow/lib/neurow/public_api/endpoint.ex index 37566d7..93d5c86 100644 --- a/neurow/lib/neurow/public_api/endpoint.ex +++ b/neurow/lib/neurow/public_api/endpoint.ex @@ -45,7 +45,7 @@ defmodule Neurow.PublicApi.Endpoint do conn |> put_resp_header("content-type", "text/event-stream") |> put_resp_header("access-control-allow-origin", "*") - |> put_resp_header("cache-control", "no-cache, no-store") + |> put_resp_header("cache-control", "no-cache, no-store, max-age=0, must-revalidate") |> put_resp_header("connection", "close") |> put_resp_header("x-sse-timeout", to_string(timeout_ms)) |> put_resp_header("x-sse-keepalive", to_string(keep_alive_ms))