Non-blocking php client for pgmq. See the extension installation guide.
composer require thesis/pgmqSince you most likely expect exactly-once semantics from a database-based queue, all requests — sending or processing business logic with message acknowledgments — must be transactional.
And the transaction object is short-lived: it cannot be used after rollback() or commit(), so it cannot be made a dependency.
That's why all the API is built on functions that take Amp\Postgres\PostgresLink as their first parameter, which can be either a transaction object or just a connection.
And only the consumer accepts Amp\Postgres\PostgresConnection, because it itself opens transactions for reading and acknowledging messages transactionally.
- Create queue
- Create unlogged queue
- Create partitioned queue
- List queues
- List queue metrics
- List queue metadata
- Drop queue
- Purge queue
- Send message
- Send message with relative delay
- Send message with absolute delay
- Send batch
- Send batch with relative delay
- Send batch with absolute delay
- Read message
- Read batch
- Pop message
- Read batch with poll
- Set visibility timeout
- Archive message
- Archive batch
- Delete message
- Delete batch
- Enable notify insert
- Disable notify insert
- Consume messages
<?php
use Thesis\Pgmq;
use Amp\Postgres;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
$queue = Pgmq\createQueue($pg, 'events');<?php
use Thesis\Pgmq;
use Amp\Postgres;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
$queue = Pgmq\createUnloggedQueue($pg, 'events');<?php
use Thesis\Pgmq;
use Amp\Postgres;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
$queue = Pgmq\createPartitionedQueue(
pg: $pg,
queue: 'events',
partitionInterval: 10000,
retentionInterval: 100000,
);<?php
use Thesis\Pgmq;
use Amp\Postgres;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
foreach (Pgmq\listQueues($pg) as $queue) {
$md = $queue->metadata();
var_dump($md);
}<?php
use Thesis\Pgmq;
use Amp\Postgres;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
foreach (Pgmq\metrics($pg) as $metrics) {
var_dump($metrics);
}<?php
use Thesis\Pgmq;
use Amp\Postgres;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
foreach (Pgmq\listQueueMetadata($pg) as $md) {
var_dump($md);
}<?php
use Thesis\Pgmq;
use Amp\Postgres;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
$queue = Pgmq\createQueue($pg, 'events');
$queue->drop();<?php
use Thesis\Pgmq;
use Amp\Postgres;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
$queue = Pgmq\createQueue($pg, 'events');
var_dump($queue->purge());<?php
use Thesis\Pgmq;
use Amp\Postgres;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
$queue = Pgmq\createQueue($pg, 'events');
$messageId = $queue->send(new Pgmq\SendMessage('{"id": 1}', '{"x-header": "x-value"}'));<?php
use Thesis\Pgmq;
use Amp\Postgres;
use Thesis\Time\TimeSpan;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
$queue = Pgmq\createQueue($pg, 'events');
$messageId = $queue->send(
new Pgmq\SendMessage('{"id": 1}', '{"x-header": "x-value"}'),
TimeSpan::fromSeconds(5),
);<?php
use Thesis\Pgmq;
use Amp\Postgres;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
$queue = Pgmq\createQueue($pg, 'events');
$messageId = $queue->send(
new Pgmq\SendMessage('{"id": 1}', '{"x-header": "x-value"}'),
new \DateTimeImmutable('+5 seconds'),
);<?php
use Thesis\Pgmq;
use Amp\Postgres;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
$queue = Pgmq\createQueue($pg, 'events');
$messageIds = $queue->sendBatch([
new Pgmq\SendMessage('{"id": 1}', '{"x-header": "x-value"}'),
new Pgmq\SendMessage('{"id": 2}', '{"x-header": "x-value"}'),
]);<?php
use Thesis\Pgmq;
use Amp\Postgres;
use Thesis\Time\TimeSpan;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
$queue = Pgmq\createQueue($pg, 'events');
$messageIds = $queue->sendBatch(
[
new Pgmq\SendMessage('{"id": 1}', '{"x-header": "x-value"}'),
new Pgmq\SendMessage('{"id": 2}', '{"x-header": "x-value"}'),
],
TimeSpan::fromSeconds(5),
);<?php
use Thesis\Pgmq;
use Amp\Postgres;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
$queue = Pgmq\createQueue($pg, 'events');
$messageIds = $queue->sendBatch(
[
new Pgmq\SendMessage('{"id": 1}', '{"x-header": "x-value"}'),
new Pgmq\SendMessage('{"id": 2}', '{"x-header": "x-value"}'),
],
new \DateTimeImmutable('+5 seconds'),
);<?php
use Thesis\Pgmq;
use Amp\Postgres;
use Thesis\Time\TimeSpan;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
$queue = Pgmq\createQueue($pg, 'events');
$message = $queue->read(TimeSpan::fromSeconds(20));<?php
use Thesis\Pgmq;
use Amp\Postgres;
use Thesis\Time\TimeSpan;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
$queue = Pgmq\createQueue($pg, 'events');
$message = $queue->readBatch(10, TimeSpan::fromSeconds(20));<?php
use Thesis\Pgmq;
use Amp\Postgres;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
$queue = Pgmq\createQueue($pg, 'events');
$message = $queue->pop();<?php
use Thesis\Pgmq;
use Amp\Postgres;
use Thesis\Time\TimeSpan;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
$queue = Pgmq\createQueue($pg, 'events');
$messages = $queue->readPoll(
batch: 10,
maxPoll: TimeSpan::fromSeconds(5),
pollInterval: TimeSpan::fromMilliseconds(250),
);<?php
use Thesis\Pgmq;
use Amp\Postgres;
use Thesis\Time\TimeSpan;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
$queue = Pgmq\createQueue($pg, 'events');
$message = $queue->read();
if ($message !== null) {
// handle the message
$queue->setVisibilityTimeout($message->id, TimeSpan::fromSeconds(10));
}<?php
use Thesis\Pgmq;
use Amp\Postgres;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
$queue = Pgmq\createQueue($pg, 'events');
$message = $queue->read();
if ($message !== null) {
$queue->archive($message->id);
}<?php
use Thesis\Pgmq;
use Amp\Postgres;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
$queue = Pgmq\createQueue($pg, 'events');
$messages = [...$queue->readBatch(5)];
if ($messages !== []) {
$queue->archiveBatch(array_map(
static fn(Pgmq\Message $message): int => $messages->id),
$messages,
);
}<?php
use Thesis\Pgmq;
use Amp\Postgres;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
$queue = Pgmq\createQueue($pg, 'events');
$message = $queue->read();
if ($message !== null) {
$queue->delete($message->id);
}<?php
use Thesis\Pgmq;
use Amp\Postgres;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
$queue = Pgmq\createQueue($pg, 'events');
$messages = [...$queue->readBatch(5)];
if ($messages !== []) {
$queue->deleteBatch(array_map(
static fn(Pgmq\Message $message): int => $messages->id),
$messages,
);
}<?php
use Thesis\Pgmq;
use Amp\Postgres;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
$queue = Pgmq\createQueue($pg, 'events');
$channel = $queue->enableNotifyInsert(); // postgres channel to listen is returned<?php
use Thesis\Pgmq;
use Amp\Postgres;
$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));
$queue = Pgmq\createQueue($pg, 'events');
$queue->disableNotifyInsert();This functionality is not a standard feature of the pgmq extension, but is provided by the library as an add-on for reliable and correct processing of message batches from the queue, with the ability to ack, nack (with delay) and archive (term) messages from the queue.
- First of all, create the extension if it doesn't exist yet:
<?php
declare(strict_types=1);
use Thesis\Pgmq;
Pgmq\createExtension($pg);- Then create a queue:
<?php
declare(strict_types=1);
use Thesis\Pgmq;
Pgmq\createExtension($pg);
Pgmq\createQueue($pg, 'events');- Next, create the consumer object:
<?php
declare(strict_types=1);
use Thesis\Pgmq;
Pgmq\createExtension($pg);
Pgmq\createQueue($pg, 'events');
$consumer = Pgmq\createConsumer($pg);- Now we can proceed to configure the queue consumer handler:
<?php
declare(strict_types=1);
use Thesis\Pgmq;
Pgmq\createExtension($pg);
Pgmq\createQueue($pg, 'events');
$consumer = Pgmq\createConsumer($pg);
$context = $consumer->consume(
static function (array $messages, Pgmq\ConsumeController $ctrl): void {
var_dump($messages);
$ctrl->ack($messages);
},
new Pgmq\ConsumeConfig(
queue: 'events',
),
);Through Pgmq\ConsumeConfig you can configure:
- the
batchsize of received messages; - the message visibility timeout;
- enable monitoring for queue inserts via the LISTEN/NOTIFY mechanism;
- and set the polling interval.
At least one of these settings — listenForInserts or pollTimeout — must be specified.
Through the Pgmq\ConsumeController, you can:
- ack messages, causing them to be deleted from the queue;
- nack messages with a delay, setting a visibility timeout for them;
- terminate processing (when a message can no longer be retried), resulting in them being archived;
- stop the consumer.
Since receiving messages and acking/nacking them occur within the same transaction, for your own database queries you must use the ConsumeController::$tx object to ensure exactly-once semantics for message processing.
<?php
declare(strict_types=1);
use Thesis\Pgmq;
Pgmq\createExtension($pg);
Pgmq\createQueue($pg, 'events');
$consumer = Pgmq\createConsumer($pg);
$context = $consumer->consume(
static function (array $messages, Pgmq\ConsumeController $ctrl): void {
$ctrl->tx->execute('...some business logic');
$ctrl->ack($messages);
},
new Pgmq\ConsumeConfig(
queue: 'events',
),
);Using ConsumeContext, you can gracefully stop the consumer, waiting for the current batch to finish processing.
<?php
declare(strict_types=1);
use Thesis\Pgmq;
use function Amp\trapSignal;
Pgmq\createExtension($pg);
Pgmq\createQueue($pg, 'events');
$consumer = Pgmq\createConsumer($pg);
$context = $consumer->consume(
static function (array $messages, Pgmq\ConsumeController $ctrl): void {
$ctrl->tx->execute('...some business logic');
$ctrl->ack($messages);
},
new Pgmq\ConsumeConfig(
queue: 'events',
),
);
trapSignal([\SIGINT, \SIGTERM])
$context->stop();
$context->awaitCompletion();Or stop all current consumers using $consumer->stop():
<?php
declare(strict_types=1);
use Thesis\Pgmq;
use function Amp\trapSignal;
Pgmq\createExtension($pg);
Pgmq\createQueue($pg, 'events');
$consumer = Pgmq\createConsumer($pg);
$context = $consumer->consume(...);
trapSignal([\SIGINT, \SIGTERM])
$consumer->stop();
$context->awaitCompletion();\
The MIT License (MIT). Please see License File for more information.