Skip to content

Commit 6af37a0

Browse files
authored
Merge pull request #247 from patchlevel/command-bus-config
add command bus service
2 parents 8a0778d + 478ef55 commit 6af37a0

File tree

5 files changed

+141
-24
lines changed

5 files changed

+141
-24
lines changed

docs/pages/configuration.md

Lines changed: 11 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -417,20 +417,8 @@ patchlevel_event_sourcing:
417417

418418
## Command Bus
419419

420-
You can also enable and register our handlers in symfony messenger.
421-
These handlers allow you to use your aggregates as command handlers.
422-
423-
```yaml
424-
patchlevel_event_sourcing:
425-
aggregate_handlers: ~
426-
```
427-
!!! note
428-
429-
You can find out more about the command bus integration [here](https://event-sourcing.patchlevel.io/latest/command_bus/).
430-
431-
Default the handlers will be registered for all buses.
432-
You can also specify a specific bus.
433-
Before you have to define the bus in the messenger configuration.
420+
You can enable the command bus integration to use your aggregates as command handlers.
421+
For this bundle we provide only a symfony messenger integration, so you have to define the bus in the messenger configuration.
434422

435423
```yaml
436424
framework:
@@ -439,17 +427,19 @@ framework:
439427
buses:
440428
command.bus: ~
441429
```
442-
!!! tip
443-
444-
This is also useful if you have a event bus and a command bus.
445-
446-
And then you can specify the bus in the configuration.
430+
After this, you need to define the command bus in the event sourcing configuration.
431+
This will automatically register the aggregate handlers for the symfony messenger,
432+
so you can handle commands with your aggregates.
447433

448434
```yaml
449435
patchlevel_event_sourcing:
450-
aggregate_handlers:
451-
bus: command.bus
436+
command_bus:
437+
service: command.bus
452438
```
439+
!!! note
440+
441+
You can find out more about the command bus and the aggregate handlers [here](https://event-sourcing.patchlevel.io/latest/command_bus/).
442+
453443
## Event Bus
454444

455445
You can enable the event bus to listen for events and messages synchronously.
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Patchlevel\EventSourcingBundle\CommandBus;
6+
7+
use Patchlevel\EventSourcing\CommandBus\CommandBus;
8+
use Symfony\Component\Messenger\MessageBusInterface;
9+
10+
final class SymfonyCommandBus implements CommandBus
11+
{
12+
public function __construct(
13+
private readonly MessageBusInterface $messageBus,
14+
) {
15+
}
16+
17+
public function dispatch(object $command): void
18+
{
19+
$this->messageBus->dispatch($command);
20+
}
21+
}

src/DependencyInjection/Configuration.php

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
/**
1111
* @psalm-type Config = array{
1212
* event_bus: array{enabled: bool, type: string, service: string},
13+
* command_bus: array{enabled: bool, service: string},
1314
* subscription: array{
1415
* store: array{type: string, service: string|null},
1516
* retry_strategy: array{base_delay: int, delay_factor: int, max_attempts: int},
@@ -238,12 +239,26 @@ public function getConfigTreeBuilder(): TreeBuilder
238239
->end()
239240
->end()
240241

242+
->arrayNode('command_bus')
243+
->canBeEnabled()
244+
->addDefaultsIfNotSet()
245+
->children()
246+
->scalarNode('service')->isRequired()->end()
247+
->booleanNode('register_aggregate_handlers')->defaultTrue()->end()
248+
->end()
249+
->end()
250+
241251
->arrayNode('aggregate_handlers')
242252
->canBeEnabled()
243253
->addDefaultsIfNotSet()
244254
->children()
245255
->scalarNode('bus')->defaultNull()->end()
246256
->end()
257+
->setDeprecated(
258+
'patchlevel/event-sourcing-bundle',
259+
'3.9',
260+
'The "%node%" option is deprecated and will be removed in 4.0. Use "patchlevel_event_sourcing.command_bus" instead.'
261+
)
247262
->end()
248263

249264
->end();

src/DependencyInjection/PatchlevelEventSourcingExtension.php

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
use Patchlevel\EventSourcing\Attribute\Subscriber;
2222
use Patchlevel\EventSourcing\Clock\FrozenClock;
2323
use Patchlevel\EventSourcing\Clock\SystemClock;
24+
use Patchlevel\EventSourcing\CommandBus\CommandBus;
2425
use Patchlevel\EventSourcing\Console\Command\DatabaseCreateCommand;
2526
use Patchlevel\EventSourcing\Console\Command\DatabaseDropCommand;
2627
use Patchlevel\EventSourcing\Console\Command\DebugCommand;
@@ -107,6 +108,7 @@
107108
use Patchlevel\EventSourcing\Subscription\Subscriber\SubscriberHelper;
108109
use Patchlevel\EventSourcingBundle\Attribute\AsListener;
109110
use Patchlevel\EventSourcingBundle\Command\StoreMigrateCommand;
111+
use Patchlevel\EventSourcingBundle\CommandBus\SymfonyCommandBus;
110112
use Patchlevel\EventSourcingBundle\DataCollector\EventSourcingCollector;
111113
use Patchlevel\EventSourcingBundle\DataCollector\MessageCollectorEventBus;
112114
use Patchlevel\EventSourcingBundle\Doctrine\DbalConnectionFactory;
@@ -158,7 +160,7 @@ public function load(array $configs, ContainerBuilder $container): void
158160
$this->configureUpcaster($container);
159161
$this->configureSerializer($config, $container);
160162
$this->configureMessageDecorator($container);
161-
$this->configureAggregateHandlers($config, $container);
163+
$this->configureCommandBus($config, $container);
162164
$this->configureEventBus($config, $container);
163165
$this->configureConnection($config, $container);
164166
$this->configureStore($config, $container);
@@ -218,13 +220,36 @@ private function configureSerializer(array $config, ContainerBuilder $container)
218220
}
219221

220222
/** @param Config $config */
221-
private function configureAggregateHandlers(array $config, ContainerBuilder $container): void
223+
private function configureCommandBus(array $config, ContainerBuilder $container): void
222224
{
225+
if ($config['command_bus']['enabled'] && $config['aggregate_handlers']['enabled']) {
226+
throw new InvalidArgumentException('Remove legacy aggregate_handlers configuration when using command_bus');
227+
}
228+
229+
if ($config['command_bus']['enabled']) {
230+
$container->register(SymfonyCommandBus::class)
231+
->setArguments([
232+
new Reference($config['command_bus']['service']),
233+
]);
234+
235+
$container->setAlias(CommandBus::class, SymfonyCommandBus::class);
236+
237+
$container->setParameter(
238+
'patchlevel_event_sourcing.aggregate_handlers.bus',
239+
$config['command_bus']['service'],
240+
);
241+
242+
return;
243+
}
244+
223245
if (!$config['aggregate_handlers']['enabled']) {
224246
return;
225247
}
226248

227-
$container->setParameter('patchlevel_event_sourcing.aggregate_handlers.bus', $config['aggregate_handlers']['bus']);
249+
$container->setParameter(
250+
'patchlevel_event_sourcing.aggregate_handlers.bus',
251+
$config['aggregate_handlers']['bus'],
252+
);
228253
}
229254

230255
/** @param Config $config */

tests/Unit/PatchlevelEventSourcingBundleTest.php

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
use Patchlevel\EventSourcing\Aggregate\CustomId;
1313
use Patchlevel\EventSourcing\Clock\FrozenClock;
1414
use Patchlevel\EventSourcing\Clock\SystemClock;
15+
use Patchlevel\EventSourcing\CommandBus\CommandBus;
1516
use Patchlevel\EventSourcing\CommandBus\Handler\CreateAggregateHandler;
1617
use Patchlevel\EventSourcing\Console\Command\DatabaseCreateCommand;
1718
use Patchlevel\EventSourcing\Console\Command\DatabaseDropCommand;
@@ -70,6 +71,7 @@
7071
use Patchlevel\EventSourcing\Subscription\Store\SubscriptionStore;
7172
use Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessorRepository;
7273
use Patchlevel\EventSourcingBundle\Command\StoreMigrateCommand;
74+
use Patchlevel\EventSourcingBundle\CommandBus\SymfonyCommandBus;
7375
use Patchlevel\EventSourcingBundle\DependencyInjection\PatchlevelEventSourcingExtension;
7476
use Patchlevel\EventSourcingBundle\EventBus\SymfonyEventBus;
7577
use Patchlevel\EventSourcingBundle\PatchlevelEventSourcingBundle;
@@ -547,6 +549,69 @@ public function testCommandHandler(): void
547549
self::assertEquals('command.bus', $tag['bus']);
548550
}
549551

552+
public function testCommandBusAndLegacyConfigurationNotAllowed(): void
553+
{
554+
$container = new ContainerBuilder();
555+
556+
$this->expectException(InvalidArgumentException::class);
557+
$this->expectExceptionMessage('Remove legacy aggregate_handlers configuration when using command_bus');
558+
559+
$this->compileContainer(
560+
$container,
561+
[
562+
'patchlevel_event_sourcing' => [
563+
'connection' => [
564+
'service' => 'doctrine.dbal.eventstore_connection',
565+
],
566+
'aggregates' => [__DIR__ . '/../Fixtures'],
567+
'aggregate_handlers' => [
568+
'bus' => 'command.bus',
569+
],
570+
'command_bus' => [
571+
'service' => 'command.bus',
572+
],
573+
],
574+
]
575+
);
576+
}
577+
578+
public function testCommandBus(): void
579+
{
580+
$container = new ContainerBuilder();
581+
582+
$this->compileContainer(
583+
$container,
584+
[
585+
'patchlevel_event_sourcing' => [
586+
'connection' => [
587+
'service' => 'doctrine.dbal.eventstore_connection',
588+
],
589+
'aggregates' => [__DIR__ . '/../Fixtures'],
590+
'command_bus' => [
591+
'service' => 'command.bus',
592+
],
593+
],
594+
]
595+
);
596+
597+
$handler = $container->get('event_sourcing.handler.profile.create');
598+
599+
self::assertInstanceOf(CreateAggregateHandler::class, $handler);
600+
601+
$handler(new CreateProfile(CustomId::fromString('1')));
602+
603+
$definition = $container->getDefinition('event_sourcing.handler.profile.create');
604+
$tags = $definition->getTag('messenger.message_handler');
605+
606+
self::assertCount(1, $tags);
607+
608+
$tag = $tags[0];
609+
610+
self::assertEquals(CreateProfile::class, $tag['handles']);
611+
self::assertEquals('command.bus', $tag['bus']);
612+
self::assertInstanceOf(SymfonyCommandBus::class, $container->get(CommandBus::class));
613+
}
614+
550615
public function testSnapshotStore(): void
551616
{
552617
$container = new ContainerBuilder();
@@ -1222,6 +1287,7 @@ private function compileContainer(ContainerBuilder $container, array $config): v
12221287

12231288
$container->set('doctrine.dbal.eventstore_connection', $this->prophesize(Connection::class)->reveal());
12241289
$container->set('event.bus', $this->prophesize(MessageBusInterface::class)->reveal());
1290+
$container->set('command.bus', $this->prophesize(MessageBusInterface::class)->reveal());
12251291
$container->set('cache.default', $this->prophesize(CacheItemPoolInterface::class)->reveal());
12261292
$container->set('event_dispatcher', $this->prophesize(EventDispatcherInterface::class)->reveal());
12271293
$container->set('services_resetter', $this->prophesize(ServicesResetter::class)->reveal());

0 commit comments

Comments
 (0)