Skip to content

Commit 9062ff5

Browse files
Merge pull request #22 from jbcr/issue_17_failed_message_never_retried_suite
[issue #17] fix retry and faillure queue
2 parents 26ab698 + 2927652 commit 9062ff5

31 files changed

+1192
-5
lines changed

.github/workflows/quality.yaml

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
# This workflow uses actions that are not certified by GitHub.
2+
# They are provided by a third-party and are governed by
3+
# separate terms of service, privacy policy, and support
4+
# documentation.
5+
6+
name: Quality
7+
8+
on:
9+
push:
10+
branches: [ "2.x", "issue_17_failed_message_never_retried" ]
11+
pull_request:
12+
branches: [ "2.x" ]
13+
14+
permissions:
15+
contents: read
16+
17+
jobs:
18+
symfony-tests:
19+
strategy:
20+
fail-fast: false
21+
matrix:
22+
php: ['8.0', '8.1', '8.2', '8.3']
23+
runs-on: ubuntu-latest
24+
25+
services:
26+
mariadb:
27+
image: mariadb:10.11
28+
ports:
29+
- 3306:3306
30+
env:
31+
MYSQL_USER: user
32+
MYSQL_PASSWORD: nopassword
33+
MYSQL_DATABASE: app_test
34+
MYSQL_ROOT_PASSWORD: nopassword
35+
options: --health-cmd="mysqladmin ping" --health-interval=5s --health-timeout=2s --health-retries=3
36+
37+
38+
env:
39+
DATABASE_URL: mysql://root:[email protected]:3306/app?serverVersion=mariadb-10.11.2&charset=utf8mb4
40+
APP_ENV: test
41+
steps:
42+
# To automatically get bug fixes and new Php versions for shivammathur/setup-php,
43+
# change this to (see https://github.com/shivammathur/setup-php#bookmark-versioning):
44+
# uses: shivammathur/setup-php@v2
45+
- uses: shivammathur/setup-php@v2
46+
with:
47+
php-version: ${{ matrix.php }}
48+
coverage: xdebug
49+
50+
- uses: actions/checkout@v4
51+
# - name: Copy .env.test.local
52+
# run: php -r "file_exists('.env.test.local') || copy('.env.test', '.env.test.local');"
53+
- name: Cache Composer packages
54+
id: composer-cache
55+
uses: actions/cache@v4
56+
with:
57+
path: vendor
58+
key: ${{ runner.os }}-php${{ matrix.php }}-${{ hashFiles('**/composer.lock') }}
59+
restore-keys: |
60+
${{ runner.os }}-php${{ matrix.php }}-
61+
- name: Install Dependencies
62+
run: composer install -q --no-ansi --no-interaction --no-scripts --no-progress --prefer-dist
63+
- name: Create Database
64+
run: |
65+
cd tests/Application
66+
bin/console doctrine:migration:migrate -n
67+
- name: Execute tests (Unit and Feature tests) via PHPUnit
68+
run: vendor/bin/phpunit --process-isolation

composer.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,12 @@
4141
"symfony/dotenv": "^5.4|^6.0",
4242
"symfony/runtime": "^5.4|^6.0",
4343
"symfony/messenger": "^5.4|^6.0",
44+
"symfony/doctrine-messenger": "^5.4|^6.0",
4445
"symfony/monolog-bundle": "^3.10",
4546
"doctrine/doctrine-bundle": "^2.12",
4647
"doctrine/doctrine-migrations-bundle": "^3.3",
47-
"doctrine/orm": "^2.19"
48+
"doctrine/orm": "^2.19",
49+
"phpunit/phpunit": "^9.6"
4850
},
4951
"suggest": {
5052
"ext-win32service": "On Windows only, install this extension to run PHP Service on Windows Service Manager"

lib/DependencyInjection/MessengerPass.php

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88
use Symfony\Component\DependencyInjection\ContainerBuilder;
99
use Symfony\Component\DependencyInjection\Exception\OutOfBoundsException;
1010
use Symfony\Component\DependencyInjection\Reference;
11+
use Win32ServiceBundle\MessengerSubscriber\ResetServicesListener;
12+
use Win32ServiceBundle\MessengerSubscriber\SendFailedMessageForRetryListener;
13+
use Win32ServiceBundle\MessengerSubscriber\SendFailedMessageToFailureTransportListener;
1114

1215
final class MessengerPass implements CompilerPassInterface
1316
{
@@ -16,6 +19,13 @@ final class MessengerPass implements CompilerPassInterface
1619
private string $win32ServiceRunnerTag = TagRunnerCompilerPass::WIN32SERVICE_RUNNER_TAG.'.messenger';
1720

1821
public function process(ContainerBuilder $container): void
22+
{
23+
$this->processService($container);
24+
$this->processRetryConfig($container);
25+
$this->processFailledConfig($container);
26+
}
27+
28+
private function processService(ContainerBuilder $container): void
1929
{
2030
$busIds = [];
2131
foreach ($container->findTaggedServiceIds($this->busTag) as $busId => $tags) {
@@ -42,6 +52,7 @@ public function process(ContainerBuilder $container): void
4252
$serviceRunnerDefinition = $container->getDefinition($win32ServiceId);
4353

4454
$serviceRunnerDefinition->replaceArgument(1, new Reference('messenger.routable_message_bus'));
55+
$serviceRunnerDefinition->replaceArgument(7, new Reference(ResetServicesListener::class));
4556

4657
$serviceRunnerDefinition->replaceArgument(6, array_values($receiverNames));
4758
try {
@@ -51,4 +62,34 @@ public function process(ContainerBuilder $container): void
5162
}
5263
}
5364
}
65+
66+
private function processFailledConfig(ContainerBuilder $container): void
67+
{
68+
if (
69+
$container->hasDefinition('messenger.failure.send_failed_message_to_failure_transport_listener') === false
70+
|| $container->hasDefinition(SendFailedMessageToFailureTransportListener::class) === false
71+
) {
72+
return;
73+
}
74+
75+
$serviceSF = $container->findDefinition('messenger.failure.send_failed_message_to_failure_transport_listener');
76+
77+
$serviceWin32 = $container->findDefinition(SendFailedMessageToFailureTransportListener::class);
78+
$serviceWin32->replaceArgument('$failureSenders', $serviceSF->getArgument(0));
79+
}
80+
81+
private function processRetryConfig(ContainerBuilder $container): void
82+
{
83+
if (
84+
$container->hasDefinition('messenger.retry.send_failed_message_for_retry_listener') === false
85+
|| $container->hasDefinition(SendFailedMessageForRetryListener::class) === false
86+
) {
87+
return;
88+
}
89+
90+
$serviceSF = $container->findDefinition('messenger.retry.send_failed_message_for_retry_listener');
91+
92+
$serviceWin32 = $container->findDefinition(SendFailedMessageForRetryListener::class);
93+
$serviceWin32->replaceArgument('$sendersLocator', $serviceSF->getArgument(0));
94+
}
5495
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Win32ServiceBundle\Event;
6+
7+
use Win32ServiceBundle\Model\MessengerServiceRunner;
8+
9+
final class MessengerWorkerStoppedEvent
10+
{
11+
private MessengerServiceRunner $messengerServiceRunner;
12+
13+
public function __construct(MessengerServiceRunner $messengerServiceRunner)
14+
{
15+
$this->messengerServiceRunner = $messengerServiceRunner;
16+
}
17+
18+
public function getMessengerServiceRunner(): MessengerServiceRunner
19+
{
20+
return $this->messengerServiceRunner;
21+
}
22+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Win32ServiceBundle\MessengerSubscriber;
6+
7+
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
8+
use Symfony\Component\Messenger\Stamp\ErrorDetailsStamp;
9+
use Win32ServiceBundle\Event\MessengerWorkerMessageFailedEvent;
10+
11+
final class AddErrorDetailsStampListener implements EventSubscriberInterface
12+
{
13+
public function onMessageFailed(MessengerWorkerMessageFailedEvent $event): void
14+
{
15+
$stamp = ErrorDetailsStamp::create($event->getThrowable());
16+
$previousStamp = $event->getEnvelope()->last(ErrorDetailsStamp::class);
17+
18+
// Do not append duplicate information
19+
if ($previousStamp === null || !$previousStamp->equals($stamp)) {
20+
$event->addStamps($stamp);
21+
}
22+
}
23+
24+
public static function getSubscribedEvents(): array
25+
{
26+
return [
27+
// must have higher priority than SendFailedMessageForRetryListener
28+
MessengerWorkerMessageFailedEvent::class => ['onMessageFailed', 200],
29+
];
30+
}
31+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Win32ServiceBundle\MessengerSubscriber;
6+
7+
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
8+
use Symfony\Component\HttpKernel\DependencyInjection\ServicesResetter;
9+
use Win32ServiceBundle\Event\MessengerWorkerRunningEvent;
10+
use Win32ServiceBundle\Event\MessengerWorkerStoppedEvent;
11+
12+
/**
13+
* @author Grégoire Pineau <[email protected]>
14+
*/
15+
class ResetServicesListener implements EventSubscriberInterface
16+
{
17+
private ServicesResetter $servicesResetter;
18+
19+
public function __construct(ServicesResetter $servicesResetter)
20+
{
21+
$this->servicesResetter = $servicesResetter;
22+
}
23+
24+
public function resetServices(MessengerWorkerRunningEvent $event): void
25+
{
26+
if (!$event->isWorkerIdle()) {
27+
$this->servicesResetter->reset();
28+
}
29+
}
30+
31+
public function resetServicesAtStop(MessengerWorkerStoppedEvent $event): void
32+
{
33+
$this->servicesResetter->reset();
34+
}
35+
36+
public static function getSubscribedEvents(): array
37+
{
38+
return [
39+
MessengerWorkerRunningEvent::class => ['resetServices', -1024],
40+
MessengerWorkerStoppedEvent::class => ['resetServicesAtStop', -1024],
41+
];
42+
}
43+
}

0 commit comments

Comments
 (0)