Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/Attributes/Autodiscovery/AppliesToChildState.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,9 @@ public function discoverState(Event $event, StateManager $manager): State

return $manager->load($parent->{$this->id}, $this->state_type);
}

public function propertyName(): ?string
{
return $this->id;
}
}
5 changes: 5 additions & 0 deletions src/Attributes/Autodiscovery/AppliesToSingletonState.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,9 @@ public function discoverState(Event $event, StateManager $manager): State
{
return $manager->singleton($this->state_type);
}

public function propertyName(): ?string
{
return null;
}
}
5 changes: 5 additions & 0 deletions src/Attributes/Autodiscovery/AppliesToState.php
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,9 @@ protected function getStateIdProperty(Event $event): string

throw new InvalidArgumentException("No ID property provided AppliesToState for {$this->state_type}");
}

public function propertyName(): string
{
return $this->id;
}
}
2 changes: 2 additions & 0 deletions src/Attributes/Autodiscovery/StateDiscoveryAttribute.php
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,6 @@ protected function inferAliasFromVariableName(string $name)
? Str::beforeLast($name, '_id')
: Str::beforeLast($name, 'Id');
}

abstract public function propertyName(): ?string;
}
5 changes: 5 additions & 0 deletions src/Attributes/Autodiscovery/StateId.php
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,9 @@ public function discoverState(Event $event, StateManager $manager): State|array

return array_map(fn ($id) => $manager->load($id, $this->state_type), Arr::wrap($id));
}

public function propertyName(): string
{
return $this->property->getName();
}
}
33 changes: 33 additions & 0 deletions src/Attributes/Hooks/DeferFor.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
<?php

namespace Thunk\Verbs\Attributes\Hooks;

use Attribute;
use Thunk\Verbs\Lifecycle\Hook;

#[Attribute(Attribute::TARGET_METHOD)]
class DeferFor implements HookAttribute
{
public const EVENT_CLASS = 'event_class_name';

/**
* This attribute is used to defer the handling of a hook until
* after data is committed/replayed. You can use this attribute
* to prevent duplicate writes by ensuring that the hook is only
* handled once for a given set of state properties.
*
* @param string|string[] $property_name The state property name(s) to be unique by
* @param string $name Defaults to the event's class name
* @param bool $replay_only Only defer for replayed events
*/
public function __construct(
public string|array|null $property_name,
public string $name = self::EVENT_CLASS,
public bool $replay_only = false,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potentially this should default to true. Event::commit() will default to returning values, and only replays will benefit initially.

) {}

public function applyToHook(Hook $hook): void
{
$hook->deferred_attribute = $this;
}
}
27 changes: 26 additions & 1 deletion src/Facades/Verbs.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,41 @@
use Thunk\Verbs\Contracts\BrokersEvents;
use Thunk\Verbs\Event;
use Thunk\Verbs\Lifecycle\Phase;
use Thunk\Verbs\State;
use Thunk\Verbs\Testing\BrokerFake;
use Thunk\Verbs\Testing\EventStoreFake;

/**
* Commits all outstanding events
*
* @method static bool commit()
*
* Determines if verbs is currently replaying events.
* @method static bool isReplaying()
*
* Executes the given callback only if not replaying events.
* @method static void unlessReplaying(callable $callback)
*
* Defers the execution of a callback. It will only get called once per unique constraint.
* @method static void defer(State|string|iterable|null $unique_by, callable $callback, string $name = 'Default')
*
* @param State|string|int|iterable|null $unique_by The uniqueness constraint for the deferred callback. It can be a State, string or array combination of both
* @param callable $callback The callback to be executed
* @param string $name Optional name identifier for the deferred callback, defaults to 'Default'. It's a secondary constraint
*
* Fires an event through the event store.
*
* @method static Event fire(Event $event)
*
* @param Event $event The event object to be fired
*
* @method static void createMetadataUsing(callable $callback)
* @method static void commitImmediately(bool $commit_immediately = true)
*
* @param callable $callback The callback function that generates metadata
* @return Event The fired event instance
*
* Sets a callback to create metadata for events.
*
* @method static EventStoreFake assertCommitted(string|Closure $event, Closure|int|null $callback = null)
* @method static EventStoreFake assertNotCommitted(string|Closure $event, ?Closure $callback = null)
* @method static EventStoreFake assertNothingCommitted()
Expand Down
2 changes: 2 additions & 0 deletions src/Lifecycle/Broker.php
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public function commit(): bool
foreach ($events as $event) {
$this->metadata->setLastResults($event, $this->dispatcher->handle($event));
}
app(DeferredWriteQueue::class)->flush();

return $this->commit();
}
Expand Down Expand Up @@ -105,6 +106,7 @@ public function replay(?callable $beforeEach = null, ?callable $afterEach = null
}
});
} finally {
app(DeferredWriteQueue::class)->flush();
$this->states->writeSnapshots();
$this->states->prune();
$this->states->setReplaying(false);
Expand Down
7 changes: 7 additions & 0 deletions src/Lifecycle/BrokerConvenienceMethods.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
use Thunk\Verbs\Exceptions\EventNotAuthorized;
use Thunk\Verbs\Exceptions\EventNotValid;
use Thunk\Verbs\Facades\Id;
use Thunk\Verbs\State;
use Thunk\Verbs\Support\IdManager;
use Thunk\Verbs\Support\Wormhole;

Expand Down Expand Up @@ -74,6 +75,12 @@ public function unlessReplaying(callable $callback)
}
}

public function defer(State|string|iterable|null $unique_by, callable $callback, string $name = 'default'): void
{
$states = is_iterable($unique_by) ? $unique_by : [$unique_by];
app(DeferredWriteQueue::class)->addCallback($states, $callback, $name);
}

public function realNow(): CarbonInterface
{
return app(Wormhole::class)->realNow();
Expand Down
89 changes: 89 additions & 0 deletions src/Lifecycle/DeferredWriteQueue.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
<?php

namespace Thunk\Verbs\Lifecycle;

use Thunk\Verbs\Attributes\Hooks\DeferFor;
use Thunk\Verbs\Event;
use Thunk\Verbs\State;
use Thunk\Verbs\Support\EventStateRegistry;
use Thunk\Verbs\Support\StateCollection;
use Thunk\Verbs\Support\Wormhole;

class DeferredWriteQueue
{
private array $callbacks = [];

private int $count = 0;

public function addHook(Event $event, DeferFor $deferred, callable $callback): void
{
/** @var string[] $propertyNames */
$propertyNames = is_array($deferred->property_name) ? $deferred->property_name : [$deferred->property_name];

$uniqueByKey = '';
$states = new StateCollection;
foreach ($propertyNames as $property) {
if ($property === null) {
$uniqueByKey .= 'null';

continue;
}

$states = $states->merge(app(EventStateRegistry::class)->statesForProperty($event, $property));
}

$uniqueByKey .= $states->map(fn (State $state) => $state->id)->implode('|');

$name = $deferred->name === DeferFor::EVENT_CLASS ? get_class($event) : $deferred->name;

$this->callbacks[$name][$uniqueByKey][$this->count++] = [$event, $callback, true];
}

/**
* @param iterable<State|string|null> $states
*/
public function addCallback(iterable $states, callable $callback, string $name): void
{
$id = '';
foreach ($states as $state) {
if ($state === null) {
$id .= 'null';

continue;
}
if (is_string($state)) {
$id .= $state;

continue;
}
if ($state instanceof State) {
$id .= $state->id;

continue;
}
throw new \InvalidArgumentException('Invalid state type');
}

unset($this->callbacks[$name][$id]);
$this->callbacks[$name][$id][$this->count++] = [null, $callback, false];
}

public function flush(): void
{
foreach ($this->callbacks as $namedCallbacks) {
foreach ($namedCallbacks as $stateGroup) {
$lastCallback = end($stateGroup);
[$event, $callback, $isEventCallback] = $lastCallback;

if ($isEventCallback) {
app(Wormhole::class)->warp($event, $callback);
} else {
$callback();
}
}
}

$this->callbacks = [];
$this->count = 0;
}
}
16 changes: 14 additions & 2 deletions src/Lifecycle/Hook.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use ReflectionMethod;
use RuntimeException;
use SplObjectStorage;
use Thunk\Verbs\Attributes\Hooks\DeferFor;
use Thunk\Verbs\Event;
use Thunk\Verbs\Support\DependencyResolver;
use Thunk\Verbs\Support\Reflector;
Expand Down Expand Up @@ -47,6 +48,7 @@ public function __construct(
public array $states = [],
public SplObjectStorage $phases = new SplObjectStorage,
public ?string $name = null,
public ?DeferFor $deferred_attribute = null,
) {}

public function forcePhases(Phase ...$phases): static
Expand Down Expand Up @@ -98,7 +100,12 @@ public function fired(Container $container, Event $event): void
public function handle(Container $container, Event $event): mixed
{
if ($this->runsInPhase(Phase::Handle)) {
return $this->execute($container, $event);
$callable = fn () => $this->execute($container, $event);
if ($this->deferred_attribute && ! $this->deferred_attribute->replay_only) {
app(DeferredWriteQueue::class)->addHook($event, $this->deferred_attribute, $callable);
} else {
return $this->execute($container, $event);
}
}

return null;
Expand All @@ -107,7 +114,12 @@ public function handle(Container $container, Event $event): mixed
public function replay(Container $container, Event $event): void
{
if ($this->runsInPhase(Phase::Replay)) {
app(Wormhole::class)->warp($event, fn () => $this->execute($container, $event));
$callable = fn () => $this->execute($container, $event);
if ($this->deferred_attribute) {
app(DeferredWriteQueue::class)->addHook($event, $this->deferred_attribute, $callable);
} else {
app(Wormhole::class)->warp($event, $callable);
}
}
}

Expand Down
12 changes: 12 additions & 0 deletions src/Support/EventStateRegistry.php
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,18 @@ public function getStates(Event $event): StateCollection
return $discovered;
}

public function statesForProperty(Event $event, string $property): StateCollection
{
$attributes = $this->getAttributes($event);
$property = $attributes
->firstOrFail(fn (StateDiscoveryAttribute $attribute) => $attribute->propertyName() === $property);

$states = new StateCollection;
$this->discoverAndPushState($property, $event, $states);

return $states;
}

/** @return Collection<string, State> */
protected function discoverAndPushState(StateDiscoveryAttribute $attribute, Event $target, StateCollection $discovered): Collection
{
Expand Down
2 changes: 2 additions & 0 deletions src/VerbsServiceProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
use Thunk\Verbs\Contracts\StoresSnapshots;
use Thunk\Verbs\Lifecycle\AutoCommitManager;
use Thunk\Verbs\Lifecycle\Broker;
use Thunk\Verbs\Lifecycle\DeferredWriteQueue;
use Thunk\Verbs\Lifecycle\Dispatcher;
use Thunk\Verbs\Lifecycle\EventStore;
use Thunk\Verbs\Lifecycle\MetadataManager;
Expand Down Expand Up @@ -66,6 +67,7 @@ public function packageRegistered()
$this->app->scoped(EventQueue::class);
$this->app->scoped(EventStateRegistry::class);
$this->app->singleton(MetadataManager::class);
$this->app->singleton(DeferredWriteQueue::class);

$this->app->scoped(StateManager::class, function (Container $app) {
return new StateManager(
Expand Down
Loading