Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions lib/reactive_commons/api/handler_registry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ defmodule HandlerRegistry do
do: conf
|> Conf.add_listener(:notification_event_listeners, path, handler)

def discard_event(path),
do: Conf.new()
|> discard_event(path)
def discard_event(conf = %Conf{}, path),
do: conf
|> Conf.remove_listener(path)

def commit_config(conf = %Conf{}) do
ListenerController.configure(conf)
end
Expand Down
7 changes: 6 additions & 1 deletion lib/reactive_commons/api/handlers_config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ defmodule HandlersConfig do
query_listeners: %{},
event_listeners: %{},
notification_event_listeners: %{},
command_listeners: %{}
command_listeners: %{},
discarded_events: %{}
]

def new() do
Expand All @@ -15,4 +16,8 @@ defmodule HandlersConfig do
Map.update!(conf, type, fn listeners -> Map.put(listeners, path, handler) end)
end

def remove_listener(conf = %__MODULE__{}, path) do
Map.update!(conf, :discarded_events, fn discarded -> Map.put(discarded, path, true) end)
end

end
2 changes: 2 additions & 0 deletions lib/reactive_commons/listeners/command_listener.ex
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,6 @@ defmodule CommandListener do
:ok = AMQP.Queue.bind(chan, command_queue_name, direct_exchange_name, routing_key: command_queue_name)
end

def drop_topology(conn), do: delete_queue(conn, MessageContext.command_queue_name())

end
10 changes: 10 additions & 0 deletions lib/reactive_commons/listeners/event_executor.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
defmodule EventExecutor do
use GenericExecutor, type: :event
require Logger

def get_handler_path(%{meta: %{routing_key: routing_key}}, _), do: routing_key

def when_no_handler(event_name) do
if Map.has_key?(MessageContext.handlers().discarded_events, event_name) do
{:ok, fn message -> Logger.info("Discarding event #{inspect(message)}") end}
else
{:error, :no_handler_for, @message_type, event_name}
end
end

end

6 changes: 6 additions & 0 deletions lib/reactive_commons/listeners/event_listener.ex
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,13 @@ defmodule EventListener do
for {event_name, _handler} <- :ets.tab2list(@handlers_table) do
:ok = AMQP.Queue.bind(chan, event_queue_name, events_exchange_name, routing_key: event_name)
end
# Remove bindings for explicit discarded events
for event_name <- Map.keys(MessageContext.handlers().discarded_events) do
:ok = AMQP.Queue.unbind(chan, event_queue_name, events_exchange_name, routing_key: event_name)
end
:ok
end

def drop_topology(conn), do: delete_queue(conn, MessageContext.event_queue_name())

end
30 changes: 25 additions & 5 deletions lib/reactive_commons/listeners/generic_listener.ex
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,15 @@ defmodule GenericListener do

@impl true
def init([]) do
IO.puts("########### STARTING #{@kind} LISTENER #############")
Logger.info("########### STARTING #{@kind} LISTENER #############")
if should_listen() do
:ok = ConnectionsHolder.get_connection_async(__MODULE__)
:ok = create_ets(@handlers_table)
{:ok, struct(__MODULE__, initial_state())}
else
IO.puts("########### #{@kind} LISTENER SKIPPED #############")
:ignore
Logger.info("########### #{@kind} LISTENER SKIPPED #############")
:ok = ConnectionsHolder.get_connection_async(__MODULE__)
{:ok, :stop}
end
end

Expand All @@ -54,10 +55,19 @@ defmodule GenericListener do
:ok = create_topology(chan)
:ok = AMQP.Basic.qos(chan, prefetch_count: prefetch_count)
{:ok, consumer_tag} = AMQP.Basic.consume(chan, queue_name)
IO.puts("########### #{@kind} LISTENER STARTED #############")
Logger.info("########### #{@kind} LISTENER STARTED #############")
{:noreply, %{state | chan: chan, consumer_tag: consumer_tag, conn: conn}}
end

@impl true
def handle_info({:connected, conn}, state = :stop) do
{:ok, chan} = AMQP.Channel.open(conn)
if drop_topology(chan) == :ok do
Logger.info("########### TOPOLOGY DROPPED FOR #{@kind} #############")
end
{:stop, :normal, state}
end

def handle_info({:basic_consume_ok, %{consumer_tag: consumer_tag}}, state = %{queue_name: queue}) do
Logger.info("#{@kind} listener registered with consumer #{inspect(consumer_tag)} for queue #{queue}")
{:noreply, %{state | consumer_tag: consumer_tag}}
Expand Down Expand Up @@ -89,6 +99,8 @@ defmodule GenericListener do
spawn_link(@executor, :handle_message, [message_to_handle])
end

def drop_topology(_conn), do: :noop

def get_handlers(), do: %{}

defp save_handlers(handlers) do
Expand All @@ -101,7 +113,7 @@ defmodule GenericListener do
GenServer.cast(__MODULE__, {:save_handlers, get_handlers()})
end

defoverridable consume: 3, get_handlers: 0
defoverridable consume: 3, get_handlers: 0, drop_topology: 1

end
end
Expand All @@ -114,6 +126,14 @@ defmodule GenericListener do
{:ok, _} = AMQP.Queue.declare(chan, origin_queue <> ".DLQ", durable: true, arguments: args)
end

def delete_queue(conn, origin_queue) do
if MessageContext.with_dlq_retry() do
{:ok, _details} = AMQP.Queue.delete(conn, origin_queue <> ".DLQ")
end
{:ok, _details} = AMQP.Queue.delete(conn, origin_queue)
:ok
end

def get_correlation_id(props = %{headers: _headers}) do
get_header_value(props, "x-correlation-id")
end
Expand Down
2 changes: 2 additions & 0 deletions lib/reactive_commons/listeners/query_listener.ex
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,6 @@ defmodule QueryListener do
:ok = AMQP.Queue.bind(chan, query_queue_name, direct_exchange_name, routing_key: query_queue_name)
end

def drop_topology(conn), do: delete_queue(conn, MessageContext.query_queue_name())

end
9 changes: 7 additions & 2 deletions lib/reactive_commons/messaging/generic_executor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ defmodule GenericExecutor do
try do
event = decode(msg)
handler_path = get_handler_path(msg, event)
[{_path, handler_fn}] = :ets.lookup(table, handler_path)
{:ok, handler_fn} = case :ets.lookup(table, handler_path) do
[{_path, handler_fn}] -> {:ok, handler_fn}
[] -> when_no_handler(handler_path)
end
handler_result = handler_fn.(event)
on_post_process(handler_result, msg)
report_to_telemetry(@message_type, handler_path, calc_duration(t0), :success)
Expand Down Expand Up @@ -64,9 +67,11 @@ defmodule GenericExecutor do
Poison.decode!(payload)
end

def when_no_handler(path), do: {:error, :no_handler_for, @message_type, path}

def on_post_process(_, _), do: :noop

defoverridable decode: 1, on_post_process: 2
defoverridable decode: 1, on_post_process: 2, when_no_handler: 1

end
end
Expand Down
10 changes: 6 additions & 4 deletions lib/reactive_commons/runtime/listener_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ defmodule ListenerController do
def handle_call({:configure_handlers, conf = %HandlersConfig{}}, _from, state) do
Logger.info "Configuring handlers and starting listeners"
MessageContext.save_handlers_config(conf)
DynamicSupervisor.start_child(ListenerController.Supervisor, QueryListener)
DynamicSupervisor.start_child(ListenerController.Supervisor, EventListener)
DynamicSupervisor.start_child(ListenerController.Supervisor, NotificationEventListener)
DynamicSupervisor.start_child(ListenerController.Supervisor, CommandListener)
DynamicSupervisor.start_child(ListenerController.Supervisor, transient(QueryListener))
DynamicSupervisor.start_child(ListenerController.Supervisor, transient(EventListener))
DynamicSupervisor.start_child(ListenerController.Supervisor, transient(NotificationEventListener))
DynamicSupervisor.start_child(ListenerController.Supervisor, transient(CommandListener))
{:reply, :ok, state}
end

defp transient(module), do: Supervisor.child_spec({module, []}, restart: :transient)

end
4 changes: 2 additions & 2 deletions lib/reactive_commons/runtime/message_runtime.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ defmodule MessageRuntime do
{MessageContext, config},
{ReplyRouter, []},
{ConnectionsHolder, []},
{ReplyListener, []},
Supervisor.child_spec({ReplyListener, []}, restart: :transient),
{MessageSender, []},
{ListenerController, []},
]
Supervisor.init(children, strategy: :rest_for_one)
end

end
end
8 changes: 4 additions & 4 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ defmodule ReactiveCommons.MixProject do
defp deps do
[
# {:dep_from_hexpm, "~> 0.3.0"},
{:poison, "~> 4.0"},
{:amqp, "~> 1.4"},
{:poison, "~> 5.0"},
{:amqp, "~> 3.1"},
{:uuid, "~> 1.1"},
{:ex_doc, ">= 0.0.0", only: :dev, runtime: false},
{:telemetry, "~> 0.4.2"},
{:mock, "~> 0.3.0", only: :test}
{:telemetry, "~> 1.0"},
{:mock, "~> 0.3", only: :test}
# {:dep_from_git, git: "https://github.com/elixir-lang/my_dep.git", tag: "0.1.0"}
]
end
Expand Down
20 changes: 9 additions & 11 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
%{
"amqp": {:hex, :amqp, "1.6.0", "736d976f53780bcee72ccc50f7eb36e3d5881354480b1e34ceac84e3bbd954e9", [:mix], [{:amqp_client, "~> 3.8.0", [hex: :amqp_client, repo: "hexpm", optional: false]}], "hexpm", "b8168919ea1f1571a68349421e6501d1db7f559c4db3b8324ae90dbd3e2c1c4a"},
"amqp_client": {:hex, :amqp_client, "3.8.21", "607edc90e44a2dda6d605139b12f338589c1aa322e6002929d5a7a13ed73a172", [:make, :rebar3], [{:rabbit_common, "3.8.21", [hex: :rabbit_common, repo: "hexpm", optional: false]}], "hexpm", "5ee3ca42b58cd56c1867543929c4dedd2ed77f8bc91321cfdd221271fc2718f2"},
"amqp": {:hex, :amqp, "3.1.1", "a96ee272d196dfd1bf4ffc15dc7dcf900004d928dbdc6f5fcb80e6b0da03927c", [:mix], [{:amqp_client, "~> 3.9.1", [hex: :amqp_client, repo: "hexpm", optional: false]}], "hexpm", "ee7ca576351b4629b6be0701db8c085e203e242c577c59f344be56ef5a262056"},
"amqp_client": {:hex, :amqp_client, "3.9.11", "4ebe8040be3ee195e42bb483d37cd64faf3c306201dc22a3f5cce2a91a9e562e", [:make, :rebar3], [{:rabbit_common, "3.9.11", [hex: :rabbit_common, repo: "hexpm", optional: false]}], "hexpm", "cdd74bc8e9d5e8610975009dcae1293bdf7198ee6d8315a1ffb5055467010520"},
"credentials_obfuscation": {:hex, :credentials_obfuscation, "2.4.0", "9fb57683b84899ca3546b384e59ab5d3054a9f334eba50d74c82cd0ae82dd6ca", [:rebar3], [], "hexpm", "d28a89830e30698b075de9a4dbe683a20685c6bed1e3b7df744a0c06e6ff200a"},
"earmark_parser": {:hex, :earmark_parser, "1.4.17", "6f3c7e94170377ba45241d394389e800fb15adc5de51d0a3cd52ae766aafd63f", [:mix], [], "hexpm", "f93ac89c9feca61c165b264b5837bf82344d13bebc634cd575cb711e2e342023"},
"ex_doc": {:hex, :ex_doc, "0.25.5", "ac3c5425a80b4b7c4dfecdf51fa9c23a44877124dd8ca34ee45ff608b1c6deb9", [:mix], [{:earmark_parser, "~> 1.4.0", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "688cfa538cdc146bc4291607764a7f1fcfa4cce8009ecd62de03b27197528350"},
"goldrush": {:hex, :goldrush, "0.1.9", "f06e5d5f1277da5c413e84d5a2924174182fb108dabb39d5ec548b27424cd106", [:rebar3], [], "hexpm", "99cb4128cffcb3227581e5d4d803d5413fa643f4eb96523f77d9e6937d994ceb"},
"earmark_parser": {:hex, :earmark_parser, "1.4.20", "89970db71b11b6b89759ce16807e857df154f8df3e807b2920a8c39834a9e5cf", [:mix], [], "hexpm", "1eb0d2dabeeeff200e0d17dc3048a6045aab271f73ebb82e416464832eb57bdd"},
"ex_doc": {:hex, :ex_doc, "0.28.2", "e031c7d1a9fc40959da7bf89e2dc269ddc5de631f9bd0e326cbddf7d8085a9da", [:mix], [{:earmark_parser, "~> 1.4.19", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "51ee866993ffbd0e41c084a7677c570d0fc50cb85c6b5e76f8d936d9587fa719"},
"jsx": {:hex, :jsx, "3.1.0", "d12516baa0bb23a59bb35dccaf02a1bd08243fcbb9efe24f2d9d056ccff71268", [:rebar3], [], "hexpm", "0c5cc8fdc11b53cc25cf65ac6705ad39e54ecc56d1c22e4adb8f5a53fb9427f3"},
"lager": {:hex, :lager, "3.9.2", "4cab289120eb24964e3886bd22323cb5fefe4510c076992a23ad18cf85413d8c", [:rebar3], [{:goldrush, "0.1.9", [hex: :goldrush, repo: "hexpm", optional: false]}], "hexpm", "7f904d9e87a8cb7e66156ed31768d1c8e26eba1d54f4bc85b1aa4ac1f6340c28"},
"makeup": {:hex, :makeup, "1.0.5", "d5a830bc42c9800ce07dd97fa94669dfb93d3bf5fcf6ea7a0c67b2e0e4a7f26c", [:mix], [{:nimble_parsec, "~> 0.5 or ~> 1.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cfa158c02d3f5c0c665d0af11512fed3fba0144cf1aadee0f2ce17747fba2ca9"},
"makeup": {:hex, :makeup, "1.1.0", "6b67c8bc2882a6b6a445859952a602afc1a41c2e08379ca057c0f525366fc3ca", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "0a45ed501f4a8897f580eabf99a2e5234ea3e75a4373c8a52824f6e873be57a6"},
"makeup_elixir": {:hex, :makeup_elixir, "0.15.2", "dc72dfe17eb240552857465cc00cce390960d9a0c055c4ccd38b70629227e97c", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.1", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "fd23ae48d09b32eff49d4ced2b43c9f086d402ee4fd4fcb2d7fad97fa8823e75"},
"makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"},
"meck": {:hex, :meck, "0.9.2", "85ccbab053f1db86c7ca240e9fc718170ee5bda03810a6292b5306bf31bae5f5", [:rebar3], [], "hexpm", "81344f561357dc40a8344afa53767c32669153355b626ea9fcbc8da6b3045826"},
"mock": {:hex, :mock, "0.3.7", "75b3bbf1466d7e486ea2052a73c6e062c6256fb429d6797999ab02fa32f29e03", [:mix], [{:meck, "~> 0.9.2", [hex: :meck, repo: "hexpm", optional: false]}], "hexpm", "4da49a4609e41fd99b7836945c26f373623ea968cfb6282742bcb94440cf7e5c"},
"nimble_parsec": {:hex, :nimble_parsec, "1.1.0", "3a6fca1550363552e54c216debb6a9e95bd8d32348938e13de5eda962c0d7f89", [:mix], [], "hexpm", "08eb32d66b706e913ff748f11694b17981c0b04a33ef470e33e11b3d3ac8f54b"},
"poison": {:hex, :poison, "4.0.1", "bcb755a16fac91cad79bfe9fc3585bb07b9331e50cfe3420a24bcc2d735709ae", [:mix], [], "hexpm", "ba8836feea4b394bb718a161fc59a288fe0109b5006d6bdf97b6badfcf6f0f25"},
"rabbit_common": {:hex, :rabbit_common, "3.8.21", "97ecdc566d6be82ad8dd05b9661d3915853ecd6536de92f6221682fa69e9ae4b", [:make, :rebar3], [{:credentials_obfuscation, "2.4.0", [hex: :credentials_obfuscation, repo: "hexpm", optional: false]}, {:jsx, "3.1.0", [hex: :jsx, repo: "hexpm", optional: false]}, {:lager, "3.9.2", [hex: :lager, repo: "hexpm", optional: false]}, {:recon, "2.5.1", [hex: :recon, repo: "hexpm", optional: false]}], "hexpm", "a4f895ec7571aa14064b361aafc3934d9f26846ef2c677ebcc5b25cb8b011385"},
"nimble_parsec": {:hex, :nimble_parsec, "1.2.3", "244836e6e3f1200c7f30cb56733fd808744eca61fd182f731eac4af635cc6d0b", [:mix], [], "hexpm", "c8d789e39b9131acf7b99291e93dae60ab48ef14a7ee9d58c6964f59efb570b0"},
"poison": {:hex, :poison, "5.0.0", "d2b54589ab4157bbb82ec2050757779bfed724463a544b6e20d79855a9e43b24", [:mix], [{:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "11dc6117c501b80c62a7594f941d043982a1bd05a1184280c0d9166eb4d8d3fc"},
"rabbit_common": {:hex, :rabbit_common, "3.9.11", "25df900b1aec7357c90253cc4528b43c5ff064f27c8c627707b747ae986ebf77", [:make, :rebar3], [{:credentials_obfuscation, "2.4.0", [hex: :credentials_obfuscation, repo: "hexpm", optional: false]}, {:jsx, "3.1.0", [hex: :jsx, repo: "hexpm", optional: false]}, {:recon, "2.5.1", [hex: :recon, repo: "hexpm", optional: false]}], "hexpm", "1bcac63760a0bf0e55d7d3c2ff36ed2310e0b560bd110a5a2d602d76d9c08e1a"},
"recon": {:hex, :recon, "2.5.1", "430ffa60685ac1efdfb1fe4c97b8767c92d0d92e6e7c3e8621559ba77598678a", [:mix, :rebar3], [], "hexpm", "5721c6b6d50122d8f68cccac712caa1231f97894bab779eff5ff0f886cb44648"},
"telemetry": {:hex, :telemetry, "0.4.3", "a06428a514bdbc63293cd9a6263aad00ddeb66f608163bdec7c8995784080818", [:rebar3], [], "hexpm", "eb72b8365ffda5bed68a620d1da88525e326cb82a75ee61354fc24b844768041"},
"telemetry": {:hex, :telemetry, "1.0.0", "0f453a102cdf13d506b7c0ab158324c337c41f1cc7548f0bc0e130bbf0ae9452", [:rebar3], [], "hexpm", "73bc09fa59b4a0284efb4624335583c528e07ec9ae76aca96ea0673850aec57a"},
"uuid": {:hex, :uuid, "1.1.8", "e22fc04499de0de3ed1116b770c7737779f226ceefa0badb3592e64d5cfb4eb9", [:mix], [], "hexpm", "c790593b4c3b601f5dc2378baae7efaf5b3d73c4c6456ba85759905be792f2ac"},
}
2 changes: 1 addition & 1 deletion samples/query-client/mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ defmodule QueryServer.MixProject do
[
# {:dep_from_hexpm, "~> 0.3.0"},
{:reactive_commons, path: "../..", override: true},
{:plug_cowboy, "~> 2.2"},
{:plug_cowboy, "~> 2.5"},
# {:dep_from_git, git: "https://github.com/elixir-lang/my_dep.git", tag: "0.1.0"}
]
end
Expand Down
Loading