composer require thesis/sync-onceuse Amp\TimeoutCancellation;
use Thesis\Amqp\Channel;
use Thesis\Amqp\Client;
use Thesis\Amqp\Message;
use Thesis\Sync\Once;
final readonly class AmqpTransport
{
/**
* @var Once<Channel>
*/
private Once $publishChannel;
public function __construct(
private Client $client,
) {
$this->publishChannel = new Once(
// make sure to use static closures to avoid circular references
function: static fn (): Channel => $client->channel(),
isAlive: static fn (Channel $channel): bool => !$channel->isClosed(),
);
}
public function publish(Message $message): void
{
$this
->publishChannel
->await(new TimeoutCancellation(10))
->publish($message);
}
}