diff --git a/src/Attributes/Autodiscovery/AppliesToChildState.php b/src/Attributes/Autodiscovery/AppliesToChildState.php index 7333ec07..8a0a15c0 100644 --- a/src/Attributes/Autodiscovery/AppliesToChildState.php +++ b/src/Attributes/Autodiscovery/AppliesToChildState.php @@ -37,4 +37,9 @@ public function discoverState(Event $event, StateManager $manager): State return $manager->load($parent->{$this->id}, $this->state_type); } + + public function propertyName(): ?string + { + return $this->id; + } } diff --git a/src/Attributes/Autodiscovery/AppliesToSingletonState.php b/src/Attributes/Autodiscovery/AppliesToSingletonState.php index 3d931074..cfa43154 100644 --- a/src/Attributes/Autodiscovery/AppliesToSingletonState.php +++ b/src/Attributes/Autodiscovery/AppliesToSingletonState.php @@ -24,4 +24,9 @@ public function discoverState(Event $event, StateManager $manager): State { return $manager->singleton($this->state_type); } + + public function propertyName(): ?string + { + return null; + } } diff --git a/src/Attributes/Autodiscovery/AppliesToState.php b/src/Attributes/Autodiscovery/AppliesToState.php index 32dd227e..3d8d926a 100644 --- a/src/Attributes/Autodiscovery/AppliesToState.php +++ b/src/Attributes/Autodiscovery/AppliesToState.php @@ -66,4 +66,9 @@ protected function getStateIdProperty(Event $event): string throw new InvalidArgumentException("No ID property provided AppliesToState for {$this->state_type}"); } + + public function propertyName(): string + { + return $this->id; + } } diff --git a/src/Attributes/Autodiscovery/StateDiscoveryAttribute.php b/src/Attributes/Autodiscovery/StateDiscoveryAttribute.php index 385d97ba..b9a017f8 100644 --- a/src/Attributes/Autodiscovery/StateDiscoveryAttribute.php +++ b/src/Attributes/Autodiscovery/StateDiscoveryAttribute.php @@ -55,4 +55,6 @@ protected function inferAliasFromVariableName(string $name) ? Str::beforeLast($name, '_id') : Str::beforeLast($name, 'Id'); } + + abstract public function propertyName(): ?string; } diff --git a/src/Attributes/Autodiscovery/StateId.php b/src/Attributes/Autodiscovery/StateId.php index 6366241d..2296d185 100644 --- a/src/Attributes/Autodiscovery/StateId.php +++ b/src/Attributes/Autodiscovery/StateId.php @@ -53,4 +53,9 @@ public function discoverState(Event $event, StateManager $manager): State|array return array_map(fn ($id) => $manager->load($id, $this->state_type), Arr::wrap($id)); } + + public function propertyName(): string + { + return $this->property->getName(); + } } diff --git a/src/Attributes/Hooks/DeferFor.php b/src/Attributes/Hooks/DeferFor.php new file mode 100644 index 00000000..750d07dd --- /dev/null +++ b/src/Attributes/Hooks/DeferFor.php @@ -0,0 +1,33 @@ +deferred_attribute = $this; + } +} diff --git a/src/Facades/Verbs.php b/src/Facades/Verbs.php index dce514c4..79205833 100644 --- a/src/Facades/Verbs.php +++ b/src/Facades/Verbs.php @@ -8,16 +8,41 @@ use Thunk\Verbs\Contracts\BrokersEvents; use Thunk\Verbs\Event; use Thunk\Verbs\Lifecycle\Phase; +use Thunk\Verbs\State; use Thunk\Verbs\Testing\BrokerFake; use Thunk\Verbs\Testing\EventStoreFake; /** + * Commits all outstanding events + * * @method static bool commit() + * + * Determines if verbs is currently replaying events. * @method static bool isReplaying() + * + * Executes the given callback only if not replaying events. * @method static void unlessReplaying(callable $callback) + * + * Defers the execution of a callback. It will only get called once per unique constraint. + * @method static void defer(State|string|iterable|null $unique_by, callable $callback, string $name = 'Default') + * + * @param State|string|int|iterable|null $unique_by The uniqueness constraint for the deferred callback. It can be a State, string or array combination of both + * @param callable $callback The callback to be executed + * @param string $name Optional name identifier for the deferred callback, defaults to 'Default'. It's a secondary constraint + * + * Fires an event through the event store. + * * @method static Event fire(Event $event) + * + * @param Event $event The event object to be fired + * * @method static void createMetadataUsing(callable $callback) - * @method static void commitImmediately(bool $commit_immediately = true) + * + * @param callable $callback The callback function that generates metadata + * @return Event The fired event instance + * + * Sets a callback to create metadata for events. + * * @method static EventStoreFake assertCommitted(string|Closure $event, Closure|int|null $callback = null) * @method static EventStoreFake assertNotCommitted(string|Closure $event, ?Closure $callback = null) * @method static EventStoreFake assertNothingCommitted() diff --git a/src/Lifecycle/Broker.php b/src/Lifecycle/Broker.php index 0fdaaa28..afa5848c 100644 --- a/src/Lifecycle/Broker.php +++ b/src/Lifecycle/Broker.php @@ -71,6 +71,7 @@ public function commit(): bool foreach ($events as $event) { $this->metadata->setLastResults($event, $this->dispatcher->handle($event)); } + app(DeferredWriteQueue::class)->flush(); return $this->commit(); } @@ -105,6 +106,7 @@ public function replay(?callable $beforeEach = null, ?callable $afterEach = null } }); } finally { + app(DeferredWriteQueue::class)->flush(); $this->states->writeSnapshots(); $this->states->prune(); $this->states->setReplaying(false); diff --git a/src/Lifecycle/BrokerConvenienceMethods.php b/src/Lifecycle/BrokerConvenienceMethods.php index aab881b6..d7cb1b9a 100644 --- a/src/Lifecycle/BrokerConvenienceMethods.php +++ b/src/Lifecycle/BrokerConvenienceMethods.php @@ -11,6 +11,7 @@ use Thunk\Verbs\Exceptions\EventNotAuthorized; use Thunk\Verbs\Exceptions\EventNotValid; use Thunk\Verbs\Facades\Id; +use Thunk\Verbs\State; use Thunk\Verbs\Support\IdManager; use Thunk\Verbs\Support\Wormhole; @@ -74,6 +75,12 @@ public function unlessReplaying(callable $callback) } } + public function defer(State|string|iterable|null $unique_by, callable $callback, string $name = 'default'): void + { + $states = is_iterable($unique_by) ? $unique_by : [$unique_by]; + app(DeferredWriteQueue::class)->addCallback($states, $callback, $name); + } + public function realNow(): CarbonInterface { return app(Wormhole::class)->realNow(); diff --git a/src/Lifecycle/DeferredWriteQueue.php b/src/Lifecycle/DeferredWriteQueue.php new file mode 100644 index 00000000..bedacbd6 --- /dev/null +++ b/src/Lifecycle/DeferredWriteQueue.php @@ -0,0 +1,89 @@ +property_name) ? $deferred->property_name : [$deferred->property_name]; + + $uniqueByKey = ''; + $states = new StateCollection; + foreach ($propertyNames as $property) { + if ($property === null) { + $uniqueByKey .= 'null'; + + continue; + } + + $states = $states->merge(app(EventStateRegistry::class)->statesForProperty($event, $property)); + } + + $uniqueByKey .= $states->map(fn (State $state) => $state->id)->implode('|'); + + $name = $deferred->name === DeferFor::EVENT_CLASS ? get_class($event) : $deferred->name; + + $this->callbacks[$name][$uniqueByKey][$this->count++] = [$event, $callback, true]; + } + + /** + * @param iterable $states + */ + public function addCallback(iterable $states, callable $callback, string $name): void + { + $id = ''; + foreach ($states as $state) { + if ($state === null) { + $id .= 'null'; + + continue; + } + if (is_string($state)) { + $id .= $state; + + continue; + } + if ($state instanceof State) { + $id .= $state->id; + + continue; + } + throw new \InvalidArgumentException('Invalid state type'); + } + + unset($this->callbacks[$name][$id]); + $this->callbacks[$name][$id][$this->count++] = [null, $callback, false]; + } + + public function flush(): void + { + foreach ($this->callbacks as $namedCallbacks) { + foreach ($namedCallbacks as $stateGroup) { + $lastCallback = end($stateGroup); + [$event, $callback, $isEventCallback] = $lastCallback; + + if ($isEventCallback) { + app(Wormhole::class)->warp($event, $callback); + } else { + $callback(); + } + } + } + + $this->callbacks = []; + $this->count = 0; + } +} diff --git a/src/Lifecycle/Hook.php b/src/Lifecycle/Hook.php index 6d79c4f8..8768cfad 100644 --- a/src/Lifecycle/Hook.php +++ b/src/Lifecycle/Hook.php @@ -7,6 +7,7 @@ use ReflectionMethod; use RuntimeException; use SplObjectStorage; +use Thunk\Verbs\Attributes\Hooks\DeferFor; use Thunk\Verbs\Event; use Thunk\Verbs\Support\DependencyResolver; use Thunk\Verbs\Support\Reflector; @@ -47,6 +48,7 @@ public function __construct( public array $states = [], public SplObjectStorage $phases = new SplObjectStorage, public ?string $name = null, + public ?DeferFor $deferred_attribute = null, ) {} public function forcePhases(Phase ...$phases): static @@ -98,7 +100,12 @@ public function fired(Container $container, Event $event): void public function handle(Container $container, Event $event): mixed { if ($this->runsInPhase(Phase::Handle)) { - return $this->execute($container, $event); + $callable = fn () => $this->execute($container, $event); + if ($this->deferred_attribute && ! $this->deferred_attribute->replay_only) { + app(DeferredWriteQueue::class)->addHook($event, $this->deferred_attribute, $callable); + } else { + return $this->execute($container, $event); + } } return null; @@ -107,7 +114,12 @@ public function handle(Container $container, Event $event): mixed public function replay(Container $container, Event $event): void { if ($this->runsInPhase(Phase::Replay)) { - app(Wormhole::class)->warp($event, fn () => $this->execute($container, $event)); + $callable = fn () => $this->execute($container, $event); + if ($this->deferred_attribute) { + app(DeferredWriteQueue::class)->addHook($event, $this->deferred_attribute, $callable); + } else { + app(Wormhole::class)->warp($event, $callable); + } } } diff --git a/src/Support/EventStateRegistry.php b/src/Support/EventStateRegistry.php index 3e3eabf4..62893bbe 100644 --- a/src/Support/EventStateRegistry.php +++ b/src/Support/EventStateRegistry.php @@ -48,6 +48,18 @@ public function getStates(Event $event): StateCollection return $discovered; } + public function statesForProperty(Event $event, string $property): StateCollection + { + $attributes = $this->getAttributes($event); + $property = $attributes + ->firstOrFail(fn (StateDiscoveryAttribute $attribute) => $attribute->propertyName() === $property); + + $states = new StateCollection; + $this->discoverAndPushState($property, $event, $states); + + return $states; + } + /** @return Collection */ protected function discoverAndPushState(StateDiscoveryAttribute $attribute, Event $target, StateCollection $discovered): Collection { diff --git a/src/VerbsServiceProvider.php b/src/VerbsServiceProvider.php index 125af4a1..baea4dd1 100644 --- a/src/VerbsServiceProvider.php +++ b/src/VerbsServiceProvider.php @@ -25,6 +25,7 @@ use Thunk\Verbs\Contracts\StoresSnapshots; use Thunk\Verbs\Lifecycle\AutoCommitManager; use Thunk\Verbs\Lifecycle\Broker; +use Thunk\Verbs\Lifecycle\DeferredWriteQueue; use Thunk\Verbs\Lifecycle\Dispatcher; use Thunk\Verbs\Lifecycle\EventStore; use Thunk\Verbs\Lifecycle\MetadataManager; @@ -66,6 +67,7 @@ public function packageRegistered() $this->app->scoped(EventQueue::class); $this->app->scoped(EventStateRegistry::class); $this->app->singleton(MetadataManager::class); + $this->app->singleton(DeferredWriteQueue::class); $this->app->scoped(StateManager::class, function (Container $app) { return new StateManager( diff --git a/tests/Feature/DeferredWritesTest.php b/tests/Feature/DeferredWritesTest.php new file mode 100644 index 00000000..80e7109a --- /dev/null +++ b/tests/Feature/DeferredWritesTest.php @@ -0,0 +1,192 @@ +toBe(2); + + $GLOBALS['handle_count'] = 0; + + $this->artisan(ReplayCommand::class); + + expect($GLOBALS['handle_count'])->toBe(2); +}); + +it('prevents duplicate writes automatically using the StateId attribute', function () { + $state1_id = Id::make(); + $state2_id = Id::make(); + + // State 1 + LatestHandleTestEvent::fire(state_id: $state1_id); + LatestHandleTestEvent::fire(state_id: $state1_id); + AnotherLatestHandleTestEvent::fire(state_id: $state1_id); + AnotherLatestHandleTestEvent::fire(state_id: $state1_id); + + // State 2 + LatestHandleTestEvent::fire(state_id: $state2_id); + LatestHandleTestEvent::fire(state_id: $state2_id); + AnotherLatestHandleTestEvent::fire(state_id: $state2_id); + AnotherLatestHandleTestEvent::fire(state_id: $state2_id); + + Verbs::commit(); + + expect($GLOBALS['handle_count'])->toBe(4); + + $GLOBALS['handle_count'] = 0; + + $this->artisan(ReplayCommand::class); + + expect($GLOBALS['handle_count'])->toBe(4); +}); + +it('prevents duplicate writes automatically using a specific name', function () { + $state1_id = Id::make(); + $state2_id = Id::make(); + + NamedHandleTestEvent::fire(state_id: $state2_id); // 5 + AnotherNamedHandleTestEvent::fire(state_id: $state2_id); // 7 + + Verbs::commit(); + + expect($GLOBALS['handle_count'])->toBe(1); + + $GLOBALS['handle_count'] = 0; + + $this->artisan(ReplayCommand::class); + + expect($GLOBALS['handle_count'])->toBe(1); +}); + +it('can receive handle data when replay_only is set', function () { + $this->assertTrue(CommitOnlyTestEvent::commit()); + $this->assertTrue(CommitOnlyTestEvent::commit()); + + expect($GLOBALS['handle_count'])->toBe(2); + + $GLOBALS['handle_count'] = 0; + + $this->artisan(ReplayCommand::class); + + expect($GLOBALS['handle_count'])->toBe(1); +}); + +it('only runs callbacks once', function () { + NamedHandleTestEvent::fire(); + + Verbs::defer(null, function () { + $GLOBALS['handle_count']++; + }); + + Verbs::defer(null, function () { + $GLOBALS['handle_count']++; + }); + + Verbs::defer(null, function () { + $GLOBALS['handle_count']++; + }, 'another'); + + Verbs::defer(null, function () { + $GLOBALS['handle_count']++; + }, 'another'); + + $state = LatestHandleTestState::load(snowflake_id()); + $state2 = LatestHandleTestState::load(snowflake_id()); + + Verbs::defer($state, function () { + $GLOBALS['handle_count']++; + }, 'another'); + + Verbs::defer($state, function () { + $GLOBALS['handle_count']++; + }, 'another'); + + Verbs::defer([$state, $state2], function () { + $GLOBALS['handle_count']++; + }, 'another'); + + Verbs::commit(); + + expect($GLOBALS['handle_count'])->toBe(5); +}); + +class LatestHandleTestEvent extends Event +{ + #[StateId(LatestHandleTestState::class)] + public int $state_id; + + #[DeferFor('state_id')] + public function handle(): void + { + $GLOBALS['handle_count']++; + } +} + +#[AppliesToState(LatestHandleTestState::class, 'state_id')] +class AnotherLatestHandleTestEvent extends Event +{ + public int $state_id; + + #[DeferFor('state_id')] + public function handle(): void + { + $GLOBALS['handle_count']++; + } +} + +class NamedHandleTestEvent extends Event +{ + #[DeferFor(null, name: 'named')] + public function handle(): void + { + $GLOBALS['handle_count']++; + } +} + +class AnotherNamedHandleTestEvent extends Event +{ + #[DeferFor(null, name: 'named')] + public function handle(): void + { + $GLOBALS['handle_count']++; + } +} + +class CommitOnlyTestEvent extends Event +{ + #[DeferFor(null, replay_only: true)] + public function handle(): bool + { + $GLOBALS['handle_count']++; + + return true; + } +} + +class LatestHandleTestState extends State +{ + public $test = 'test'; +}