The Neon MQ Connector is an MQ interface for microservices.
By default, this package will use ovos-config
to read default configuration. In general, configuration should be passed to the
MQConnector object at init.
A global configuration for the MQ Connector may be specified at ~/.config/neon/mq_config.json. This configuration file
may contain the following keys:
server: The hostname or IP address of the MQ server to connect to. If left blank, this defaults to"localhost"port: The port used by the MQ server. If left blank, this defaults to5672users: A mapping of service names to credentials. Note that not all users will have permissions required to access each service.
{
"server": "localhost",
"port": 5672,
"users": {
"<service_name>": {
"username": "<username>",
"password": "<password>"
}
}
}The MQConnector class should be extended by a class providing some specific service.
Service classes will specify the following parameters.
service_name: Name of the service, used to identify credentials in configurationvhost: Virtual host to connect to; messages are all constrained to this namespace.consumers: Dict of names toConsumerThreadobjects. AConsumerThreadwill accept a connection to a particularconnection, aqueue, and acallback_funcconnection: MQ connection to thevhostspecified above.queue: Queue to monitor within thevhost. Avhostmay handle multiple queues.callback_func: Function to call when a message arrives in thequeue
A callback function should have the following signature:
def handle_api_input(self,
channel: pika.channel.Channel,
method: pika.spec.Basic.Return,
properties: pika.spec.BasicProperties,
body: bytes):
"""
Handles input requests from MQ to Neon API
:param channel: MQ channel object (pika.channel.Channel)
:param method: MQ return method (pika.spec.Basic.Return)
:param properties: MQ properties (pika.spec.BasicProperties)
:param body: request body (bytes)
"""Generally, body should be decoded into a dict, and that dict should contain message_id. The message_id should
be included in the body of any response to associate the response to the request.
A response may be sent via:
channel.queue_declare(queue='<queue>')
channel.basic_publish(exchange='',
routing_key='<queue>',
body=<data>,
properties=pika.BasicProperties(expiration='1000')
)Where <queue> is the queue to which the response will be published, and data is a bytes response (generally a base64-encoded dict).
By default, async-based consumers handling based on pika.SelectConnection will
be used
There are a few methods to disable use of async consumers/subscribers.
-
To disable async consumers for a particular class/object, set the class-attribute
async_consumers_enabledtoFalse:from neon_mq_connector import MQConnector class MQConnectorChild(MQConnector): async_consumers_enabled = False
-
To disable the use of async consumers at runtime, set the
MQ_ASYNC_CONSUMERSenvvar toFalseexport MQ_ASYNC_CONSUMERS=false