diff --git a/CHANGES.md b/CHANGES.md index 7be4d0c..fa14434 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,4 +1,7 @@ # Change Log +# 1.2.0 +- Added new py-amqp based sensor using asyncio (3.8) for concurrency. Details on why: https://github.com/StackStorm/st2/discussions/5743 +- Added pip dependency for `amqp==5.0.6` (py-amqp) # 1.1.1 - Updated pip dependency to pika `1.3.x` to support python >= 3.7 diff --git a/README.md b/README.md index 2bef8d3..f7fa619 100644 --- a/README.md +++ b/README.md @@ -3,8 +3,8 @@ Pack which allows integration with [RabbitMQ](http://www.rabbitmq.com/). ## Configuration - -Configuration is required to use the RabbitMQ sensor. Copy the example configuration +### rabbitmq.RabbitMQQueueSensor Sensor +Configuration is required to use the `pika` based RabbitMQ sensor. Copy the example configuration in [rabbitmq.yaml.example](./rabbitmq.yaml.example) to `/opt/stackstorm/configs/rabbitmq.yaml` and edit as required. @@ -23,6 +23,7 @@ You can also use dynamic values from the datastore. See the You can specify multiple queues using this syntax: ```yaml +--- sensor_config: rabbitmq_queue_sensor: queues: @@ -33,6 +34,40 @@ sensor_config: - queue2 ``` +### rabbitmq.QueueWatcherAMQP Sensor +You must configure this sensor under the top-level configuration key `amqp_watcher_sensor_config`. This keeps the configuration backwards compatible with the pack's other sensors. + +The config below declares a simple 'classic' queue and a 'direct' exchange, with a binding whose routing key routes published messages to that queue. 
+ +```yaml +--- +amqp_watcher_sensor_config: + host: "rabbitmq.domain.com" + port: 5672 + username: "guest" + password: "guest" + queues: + - queue: "temp_queue" + type: "classic" + exchanges: + - exchange: "temp_exchange" + type: "direct" + bindings: + - routing_key: "temp.messages" + queue: "temp_queue" +``` +The sensor then monitors the declared queues for new messages and dispatches the trigger `rabbitmq.amqp_msg_rx` with the data: +```json +{"queue": "queue_name", "body": "message body"} +``` + +If the message body is a JSON-serialized string, it will be deserialized before being dispatched. + +#### Queues / Exchanges Config Parameters +The sensor passes the keys of `queues` and `exchanges` list items straight through via `**kwargs` when declaring and binding, so follow the [documentation of `py-amqp`](https://docs.celeryq.dev/projects/amqp/en/latest/reference/amqp.channel.html) for the `exchange_declare()`, `queue_declare()`, and `queue_bind()` methods. + +**Note:** The `exchange` param is not required in each binding; the sensor infers it from the config structure and passes it into `queue_bind()` explicitly. + ## Actions * ``list_exchanges`` - List available exchanges. @@ -50,11 +85,19 @@ The following action will publish a message to a remote RabbitMQ server with a s $ st2 run rabbitmq.publish_message host=localhost port=5673 virtual_host=sensu exchange=metrics exchange_type=topic username=sensu password=password message="foo.bar.baz 1 1436802746" ``` - ## Sensors * ``new_message`` - Sensor that triggers a rabbitmq.new_message with a payload containing the queue and the body +Configured in the pack config under the key `sensor_config`. + +This sensor uses Python's `threading` for concurrency via `pika`'s included methods. + This sensor should only be used with ``fanout`` and ``topic`` exchanges; that way it doesn't affect the behavior of the app, since messages will still be delivered to other consumers / subscribers.
If it's used with ``direct`` or ``headers`` exchanges, those messages won't be delivered to other consumers, so it will affect app behavior and potentially break it. +* `amqp_msg_rx` - Sensor that triggers rabbitmq.amqp_msg_rx with a payload containing the queue and the body + +Configured in the pack config under the key `amqp_watcher_sensor_config`. + +This sensor uses `asyncio` for concurrency and `py-amqp` to handle declaring the configured queues and exchanges on the remote RabbitMQ instance, and then consuming the messages that arrive. \ No newline at end of file diff --git a/config.schema.yaml b/config.schema.yaml index c2f4e6c..7a5e6cf 100644 --- a/config.schema.yaml +++ b/config.schema.yaml @@ -2,7 +2,7 @@ sensor_config: description: "RabbitMQ Sensor settings" type: "object" - required: true + required: false additionalProperties: true properties: host: @@ -41,3 +41,117 @@ sensor_config: - "json" - "pickle" required: false +amqp_watcher_sensor_config: + description: "The config for a py-amqp sensor that creates and monitors queues in RabbitMQ for messages" + required: false + additionalProperties: false + type: "object" + properties: + host: + description: "The RabbitMQ host to connect to" + type: "string" + required: true + port: + description: "Connection port for RabbitMQ (Default: 5672)" + type: "number" + username: + description: "Username for authenticating to RabbitMQ" + type: "string" + required: true + password: + description: "Password for authenticating to RabbitMQ" + type: "string" + required: true + secret: true + queues: + description: "A list of queues to be declared (created if missing)" + type: "array" + required: false + additionalProperties: false + items: + type: "object" + additionalProperties: false + required: true + properties: + queue: + description: "The name of the queue" + type: "string" + required: true + type: + description: "The type of the queue" + type: "string" + required: false + enum: + - "classic" + - "quorum" + - 
"stream" + passive: + description: "Configure this queue as passive?" + type: "boolean" + required: false + durable: + description: "Configure this queue as durable?" + type: "boolean" + required: false + exclusive: + description: "Configure this queue as exclusive?" + type: "boolean" + required: false + auto_delete: + description: "Configure this queue to auto delete?" + type: "boolean" + required: false + arguments: + description: "Additional arguments to be passed during Queue declare" + type: "object" + required: false + additionalProperties: true + exchanges: + description: "A list of exchanges to be declared (created if missing)" + type: "array" + required: false + additionalProperties: false + items: + type: "object" + additionalProperties: false + required: true + properties: + exchange: + description: "The name of the exchange" + type: "string" + required: true + type: + description: "The type of the exchange" + type: "string" + required: false + enum: + - "direct" + - "fanout" + - "headers" + - "topic" + passive: + description: "Configure this exchange as passive?" + type: "boolean" + required: false + durable: + description: "Configure this exchange as durable?" + type: "boolean" + required: false + auto_delete: + description: "Configure this exchange to auto delete?" + type: "boolean" + required: false + arguments: + description: "Additional arguments to be passed during Exchange declare" + type: "object" + required: false + additionalProperties: true + bindings: + description: "A list of bindings to be declared for this exchange" + type: "array" + required: false + additionalProperties: false + items: + type: "object" + required: true + additionalProperties: true diff --git a/pack.yaml b/pack.yaml index 8882577..5b42a6c 100644 --- a/pack.yaml +++ b/pack.yaml @@ -9,7 +9,7 @@ keywords: - aqmp - stomp - message broker -version: 1.1.1 +version: 1.2.0 python_versions: - "3" author: StackStorm, Inc. 
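The per-type queue parameters described by the schema above are overlaid onto built-in defaults before being handed to py-amqp's declare methods. A minimal standalone sketch of that merge (the defaults shown are an illustrative subset of what the sensor defines; `merge_queue_params` is a hypothetical helper, not the pack's API):

```python
from copy import deepcopy

# Illustrative per-type defaults (a subset of the sensor's defaults).
DEFAULTS: dict = {
    "quorum": {
        "arguments": {"x-queue-type": "quorum"},
        "auto_delete": False,
        "durable": True,
    },
}


def merge_queue_params(queue_type: str, **overrides) -> dict:
    """Overlay user config onto the per-type defaults, re-adding any
    default 'arguments' keys the user did not explicitly replace."""
    params = deepcopy(DEFAULTS[queue_type])
    default_args = deepcopy(params["arguments"])
    # Only keys that are valid declare parameters are kept.
    params.update({k: v for k, v in overrides.items() if k in params})
    params["arguments"].update(
        {k: v for k, v in default_args.items() if k not in params["arguments"]}
    )
    return params


# A custom 'arguments' dict still ends up carrying the default x-queue-type.
params = merge_queue_params("quorum", arguments={"x-max-length": 100})
```

This mirrors the merge the sensor performs before calling `queue_declare()`: unknown keys are dropped rather than passed through, and a user-supplied `arguments` dict is backfilled with the queue type's default arguments.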
diff --git a/rabbitmq.yaml.example b/rabbitmq.yaml.example index 2f7103e..ebf12cf 100644 --- a/rabbitmq.yaml.example +++ b/rabbitmq.yaml.example @@ -7,3 +7,17 @@ sensor_config: queues: - "queue1" deserialization_method: "json" +amqp_watcher_sensor_config: + host: "rabbitmq.domain.com" + port: 5672 + username: "guest" + password: "guest" + queues: + - queue: "temp_queue" + type: "classic" + exchanges: + - exchange: "temp_exchange" + type: "direct" + bindings: + - routing_key: "temp.messages" + queue: "temp_queue" diff --git a/requirements.txt b/requirements.txt index 5bb93bb..1ed7e2f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,2 @@ pika~=1.3.1 +amqp==5.0.6 diff --git a/sensors/queue_watcher_amqp.py b/sensors/queue_watcher_amqp.py new file mode 100644 index 0000000..dc002f9 --- /dev/null +++ b/sensors/queue_watcher_amqp.py @@ -0,0 +1,389 @@ +""" +Author Notes: +Why does this sensor exist, and why does it use asyncio rather than threading, multiprocessing, eventlet, etc.? + +Short Answer: +These concurrency methods all seem to suffer a similar issue: they do not work reliably when run +inside the st2sensorcontainer wrapper while also running in the official st2sensorcontainer Docker image. +However, in various other environments they work fine. 
+ +(Very) Long Answer: +https://github.com/StackStorm/st2/discussions/5743 +""" + +import asyncio +import json +from copy import deepcopy +from typing import Any + +import amqp +from st2reactor.sensor.base import Sensor + +SENSOR_CONFIG_KEY = "amqp_watcher_sensor_config" +DEFAULT_RABBITMQ_PORT = 5672 +DISAPTCH_TRIGGER_NAME = "rabbitmq.amqp_msg_rx" +DESERIALIZATION_FUNCTIONS: dict = {"json": json.loads} +RETRY_DELAY = 3 # seconds + +# Default Queue Parameters by Queue type +DEFAULT_QUEUE_PARAMS_CLASSIC: dict = { + "arguments": {"x-queue-type": "classic"}, + "auto_delete": True, + "durable": False, + "passive": False, + "exclusive": False, + "nowait": False, +} +DEFAULT_QUEUE_PARAMS_QUORUM: dict = { + "arguments": {"x-queue-type": "quorum"}, + "auto_delete": False, + "durable": True, + "passive": False, + "exclusive": False, + "nowait": False, +} +DEFAULT_QUEUE_PARAMS_STREAM: dict = { + "arguments": {"x-queue-type": "stream"}, + "auto_delete": False, + "durable": True, + "passive": False, + "exclusive": False, + "nowait": False, +} +DEFAULT_QUEUE_TYPE = "classic" +VALID_QUEUE_TYPES: dict = { + "classic": DEFAULT_QUEUE_PARAMS_CLASSIC, + "quorum": DEFAULT_QUEUE_PARAMS_QUORUM, + "stream": DEFAULT_QUEUE_PARAMS_STREAM, +} + +# Default Exchange Parameters by Exchange type +DEFAULT_EXCHANGE_PARAMS_DIRECT: dict = { + "arguments": {}, + "type": "direct", + "auto_delete": True, + "durable": False, + "passive": False, + "nowait": False, +} +DEFAULT_EXCHANGE_PARAMS_FANOUT: dict = { + "arguments": {}, + "type": "fanout", + "auto_delete": True, + "durable": False, + "passive": False, + "nowait": False, +} +DEFAULT_EXCHANGE_PARAMS_HEADERS: dict = { + "arguments": {}, + "type": "headers", + "auto_delete": True, + "durable": False, + "passive": False, + "nowait": False, +} +DEFAULT_EXCHANGE_PARAMS_TOPIC: dict = { + "arguments": {}, + "type": "topic", + "auto_delete": True, + "durable": False, + "passive": False, + "nowait": False, +} +DEFAULT_EXCHANGE_TYPE = "direct" 
+VALID_EXCHANGE_TYPES: dict = { + "direct": DEFAULT_EXCHANGE_PARAMS_DIRECT, + "fanout": DEFAULT_EXCHANGE_PARAMS_FANOUT, + "headers": DEFAULT_EXCHANGE_PARAMS_HEADERS, + "topic": DEFAULT_EXCHANGE_PARAMS_TOPIC, +} + + +class QueueWatcherAMQP(Sensor): + """Sensor to watch queues for messages and dispatch trigger + + Defined Queues and Exchanges will be 'declared' when sensor_wrapper runs setup() + + Documentation: + py-amqp: https://docs.celeryq.dev/projects/amqp/en/latest/reference/ + asyncio: https://docs.python.org/3.8/whatsnew/3.8.html#asyncio (python version is important) + """ + + def __init__(self, sensor_service, config=None) -> None: + super(QueueWatcherAMQP, self).__init__( + sensor_service=sensor_service, config=config + ) + self._logger = self.sensor_service.get_logger(name=self.__class__.__name__) + self._dispatch = self.sensor_service.dispatch + self.client = None + self.sensor_config: dict = dict() + + self.sensor_config = config[SENSOR_CONFIG_KEY] + self._logger.debug(f"Sensor config under key: {SENSOR_CONFIG_KEY} found") + + self.client = AMQPConnectionClient( + logger=self._logger, + callback=self._dispatch_trigger, + **self.sensor_config, + ) + + self._logger.debug("__init__() Complete") + + def setup(self): + self.client.setup() + self._logger.debug("setup() Complete") + + def run(self): + self._logger.debug("run(): Begin") + + while True: + asyncio.run(self.client.consume_messages()) + + # Sleep to prevent a runaway retry loop. A bare asyncio.sleep() call + # returns an un-awaited coroutine (a no-op), so run it to completion. + self._logger.debug(f"sleeping for {RETRY_DELAY}s") + asyncio.run(asyncio.sleep(RETRY_DELAY)) + + self._logger.debug("run(): Complete") + + def cleanup(self): + self.client.conn.close() + self._logger.debug("cleanup() Complete") + + def add_trigger(self, trigger): + pass + + def update_trigger(self, trigger): + pass + + def remove_trigger(self, trigger): + pass + + def _dispatch_trigger(self, queue, body): + """Pre-processing of data before dispatching trigger""" + self._logger.debug(f"Dispatching trigger for Queue: {queue}") + if 
isinstance(body, str): + try: + deserialize = DESERIALIZATION_FUNCTIONS["json"] + body = deserialize(body) + except Exception: + # Not valid JSON; leave the body as the raw string + pass + else: + body = str(body) + + payload = {"queue": queue, "body": body} + self._dispatch(trigger=DISAPTCH_TRIGGER_NAME, payload=payload) + + +class AMQPConnectionClient: + """A class for managing the connection and activity with the server + + Can be called independently of StackStorm for testing. + Recommended: source the same virtualenv as the pack + """ + + def __init__( + self, + logger, + callback: Any = None, + host: str = str(), + port: [int, None] = None, + username: str = str(), + password: str = str(), + queues: [list, None] = None, + exchanges: [list, None] = None, + ): + self._logger = logger + self.host: str = host + self.port: int = port if port else DEFAULT_RABBITMQ_PORT + self.username: str = username + self.password: str = password + self.queues: list = queues + self.exchanges: list = exchanges + + self.callback = callback + self.conn = None + self.src_bind: tuple = None + self.channels: dict = dict() + self.queue_channels: dict = dict() + self.ctag: str = str() + + self._setup_conn() + + def setup(self): + """Establish the connection, and declare Queues/Exchanges""" + self._connect() + + # Declare Queues + if self.queues: + for queue_cfg in self.queues: + self._declare_queue(**queue_cfg) + + # Declare Exchanges + if self.exchanges: + for exch_cfg in self.exchanges: + self._declare_exchange(**exch_cfg) + + # Create Bindings for this exchange (bindings are optional in config) + for bind in exch_cfg.get("bindings", []): + self._queue_bind(exchange=exch_cfg["exchange"], **bind) + + async def consume_messages(self): + """The 'run' method for the class + + Begins consuming messages using the channel created for the declared queue + Drains messages to the callback once consumed + + Uses asyncio coroutines to allow multiple queues to be watched simultaneously + + Why use async here? 
+ + amqp.Connection().drain_events() returns once a drain is complete, so running + it in a forever loop keeps the drain going indefinitely. A problem occurs, + however, because control is never released while waiting for an event to drain. + This results in only one queue ever being able to drain at a time, while all + others remain blocked (blindly) until the active queue has an event, drains, and + cycles to the next queue to wait for an event (or drain). + + For this reason async was chosen as a simple way to overcome this issue, as + well as the issue noted at the top of this file (the reason this sensor exists) + """ + for queue, ch_id in self.queue_channels.items(): + # Passing None or empty str to channel.basic_consume(): parameter 'consumer_tag' + # results in a new ctag being generated, returned, and assigned to self.ctag + # Ensures one ctag per instance of this class + self.ctag = self.channels[ch_id].basic_consume( + queue=queue, callback=self._msg_callback, consumer_tag=self.ctag + ) + + while True: + await self._drain_messages() + + async def _drain_messages(self): + """amqp.Connection().drain_events()""" + self.conn.drain_events() + + def _connect(self): + """Actually establish a socket to the server""" + host = f"{self.host}:{self.port}" + + self._logger.debug(f"Attempting to open connection to RabbitMQ @ {host}") + self.conn.connect() + self.src_bind = self.conn.sock.getsockname() + + local = f"{self.src_bind[0]}:{self.src_bind[1]}" + self._logger.debug( + f"Successfully connected to {host} from local binding {local}" + ) + self.channels = self.conn.channels + + def _setup_conn(self): + """Init amqp.Connection() and prepare the connection socket object""" + host = f"{self.host}:{self.port}" + self.conn = amqp.Connection( + host=host, userid=self.username, password=self.password + ) + self._logger.debug( + f"Connection object created for {host} with user: {self.username}" + ) + + def _open_channel(self, channel_id: [int, None] = None): + """Open a channel on 
this connection""" + # When None is passed to .channel(), the server chooses the next available id automatically + result = self.conn.channel(channel_id=channel_id) + self._logger.debug(f"Channel opened with ID: {result.channel_id}") + return result.channel_id + + def _declare_queue(self, queue: str, channel_id: [int, None] = None, **kwargs): + """Declare a queue based on the provided parameters + Note: **kwargs is transparently passing config keys from the pack config + """ + queue_type = ( + kwargs["type"] + if "type" in kwargs and kwargs["type"] in VALID_QUEUE_TYPES + else DEFAULT_QUEUE_TYPE + ) + + # Opens a channel if the provided channel_id doesn't exist, or was None + if not channel_id or not self.channels.get(channel_id, None): + channel_id = self._open_channel(channel_id=channel_id) + + # Copy the defaults, overwrite the default if the kwarg key is a valid param + # Extract default 'arguments', update custom params + # Re-Add default arguments that weren't overwritten by custom params + params = deepcopy(VALID_QUEUE_TYPES[queue_type]) + default_args = deepcopy(params["arguments"]) + params.update({k: v for k, v in kwargs.items() if k in params}) + params["arguments"].update( + {k: v for k, v in default_args.items() if k not in params["arguments"]} + ) + + self._logger.info(f"Declaring Queue: {queue}") + + self.channels[channel_id].queue_declare(queue=queue, **params) + self.queue_channels.update({queue: channel_id}) + + self._logger.debug(f"Declared queue: {queue} with params: {params}") + + def _declare_exchange(self, exchange: str, channel_id: int = 1, **kwargs): + """Declare an exchange based on the provided parameters + Note: **kwargs is transparently passing config keys from the pack config + """ + exchange_type = ( + kwargs["type"] + if "type" in kwargs and kwargs["type"] in VALID_EXCHANGE_TYPES + else DEFAULT_EXCHANGE_TYPE + ) + + # Copy the defaults, overwrite the default if the kwarg key is a valid param + params = 
deepcopy(VALID_EXCHANGE_TYPES[exchange_type]) + params.update({k: v for k, v in kwargs.items() if k in params}) + + self._logger.info(f"Declaring Exchange: {exchange}") + + self.channels[channel_id].exchange_declare(exchange=exchange, **params) + + self._logger.debug(f"Declared exchange: {exchange} with params: {params}") + + def _queue_bind( + self, + exchange: str, + queue: str, + routing_key: str, + arguments: dict = dict(), + channel_id: int = 1, + ): + """Bind a queue to an exchange based on provided parameters""" + self._logger.debug( + f"Binding queue: {queue} to exchange: {exchange} with routing key: {routing_key}" + ) + + self.channels[channel_id].queue_bind( + queue=queue, exchange=exchange, routing_key=routing_key, arguments=arguments + ) + + def _get_queue_by_ch_id(self, ch_id: int): + """Look up a queue name by a known channel_id""" + search = self.queue_channels + return list(search.keys())[list(search.values()).index(ch_id)] + + def _msg_callback(self, message): + """Pre-Processes message details before returning to the callback function + If no callback was provided, the message will print to console and log (for testing) + """ + body = message.body.decode("utf-8") + ctag = message.delivery_info["consumer_tag"] + ch_id = message.channel.channel_id + queue = self._get_queue_by_ch_id(ch_id) + + self._logger.info(f"Received message in {queue} for consumer {ctag}") + + if self.callback: + self.callback(queue, body) + else: + # Will never happen when called within st2sensorcontainer + log_msg = f"_msg_callback(): Message Received ({queue}) {body}" + print(log_msg) + self._logger.info(log_msg) + + self.channels[ch_id].basic_ack(message.delivery_tag) diff --git a/sensors/queue_watcher_amqp.yaml b/sensors/queue_watcher_amqp.yaml new file mode 100644 index 0000000..48bf33b --- /dev/null +++ b/sensors/queue_watcher_amqp.yaml @@ -0,0 +1,16 @@ +--- +class_name: "QueueWatcherAMQP" +entry_point: "queue_watcher_amqp.py" +description: "A py-amqp sensor that creates 
and monitors queues in RabbitMQ for messages" +trigger_types: + - name: "amqp_msg_rx" + description: "Trigger that fires when a new amqp message is received" + payload_schema: + type: "object" + properties: + queue: + type: "string" + body: + anyOf: + - type: "string" + - type: "object"
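

The `body` field in the trigger's payload_schema above is typed as either a string or an object because the sensor deserializes JSON string bodies before dispatching. A standalone sketch of that pre-dispatch shaping (the `build_payload` helper is illustrative, not part of the pack's API):

```python
import json


def build_payload(queue: str, body) -> dict:
    """Shape a trigger payload the way the sensor does: JSON string
    bodies are deserialized; non-string bodies are coerced to str."""
    if isinstance(body, str):
        try:
            body = json.loads(body)
        except Exception:
            pass  # not valid JSON; keep the raw string
    else:
        body = str(body)
    return {"queue": queue, "body": body}


payload = build_payload("temp_queue", '{"foo": "bar"}')
# JSON bodies arrive deserialized: {"queue": "temp_queue", "body": {"foo": "bar"}}
```

This is why a consumer of `rabbitmq.amqp_msg_rx` should be prepared to receive `body` as either a plain string or an already-parsed object.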