Skip to content

Commit 59d8706

Browse files
committed
WIP: Batch Messenger POC
1 parent 1aa580f commit 59d8706

File tree

8 files changed

+599
-7
lines changed

8 files changed

+599
-7
lines changed
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <[email protected]>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger;
13+
14+
use Symfony\Component\Messenger\Exception\ExceptionInterface;
15+
use Symfony\Component\Messenger\Stamp\StampInterface;
16+
17+
/**
18+
* A message bus that supports batch dispatching.
19+
*
20+
* @author Joppe De Cuyper <[email protected]>
21+
*/
22+
interface BatchMessageBusInterface extends MessageBusInterface
23+
{
24+
/**
25+
* Dispatches multiple messages in a batch, allowing transports to optimize delivery.
26+
*
27+
* Each message passes through the full middleware chain individually.
28+
* The actual transport send is deferred until all messages are processed,
29+
* then executed as a single batch operation.
30+
*
31+
* @param object[]|Envelope[] $messages Messages or pre-wrapped envelopes
32+
* @param StampInterface[] $stamps Stamps applied to all messages
33+
*
34+
* @return Envelope[] In same order as input, with TransportMessageIdStamp added
35+
*
36+
* @throws ExceptionInterface
37+
*/
38+
public function dispatchBatch(array $messages, array $stamps = []): array;
39+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <[email protected]>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Event;
13+
14+
use Symfony\Component\Messenger\Envelope;
15+
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
16+
17+
/**
18+
* Event dispatched after a batch of messages has been sent to transports.
19+
*
20+
* @author Joppe De Cuyper <[email protected]>
21+
*/
22+
final class BatchSentToTransportsEvent
23+
{
24+
/**
25+
* @param Envelope[] $envelopes
26+
* @param array<string, SenderInterface> $senders
27+
*/
28+
public function __construct(
29+
private string $batchId,
30+
private array $envelopes,
31+
private array $senders,
32+
) {
33+
}
34+
35+
public function getBatchId(): string
36+
{
37+
return $this->batchId;
38+
}
39+
40+
/**
41+
* @return Envelope[]
42+
*/
43+
public function getEnvelopes(): array
44+
{
45+
return $this->envelopes;
46+
}
47+
48+
/**
49+
* @return array<string, SenderInterface>
50+
*/
51+
public function getSenders(): array
52+
{
53+
return $this->senders;
54+
}
55+
}

src/Symfony/Component/Messenger/MessageBus.php

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,23 +11,31 @@
1111

1212
namespace Symfony\Component\Messenger;
1313

14+
use Psr\EventDispatcher\EventDispatcherInterface;
15+
use Psr\Log\LoggerAwareTrait;
1416
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
1517
use Symfony\Component\Messenger\Middleware\StackMiddleware;
18+
use Symfony\Component\Messenger\Stamp\BatchStamp;
19+
use Symfony\Component\Messenger\Transport\Sender\BatchCollector;
1620

1721
/**
1822
* @author Samuel Roze <[email protected]>
1923
* @author Matthias Noback <[email protected]>
2024
* @author Nicolas Grekas <[email protected]>
2125
*/
22-
class MessageBus implements MessageBusInterface
26+
class MessageBus implements BatchMessageBusInterface
2327
{
28+
use LoggerAwareTrait;
29+
2430
private \IteratorAggregate $middlewareAggregate;
2531

2632
/**
2733
* @param iterable<mixed, MiddlewareInterface> $middlewareHandlers
2834
*/
29-
public function __construct(iterable $middlewareHandlers = [])
30-
{
35+
public function __construct(
36+
iterable $middlewareHandlers = [],
37+
private ?EventDispatcherInterface $eventDispatcher = null,
38+
) {
3139
if ($middlewareHandlers instanceof \IteratorAggregate) {
3240
$this->middlewareAggregate = $middlewareHandlers;
3341
} elseif (\is_array($middlewareHandlers)) {
@@ -68,4 +76,28 @@ public function dispatch(object $message, array $stamps = []): Envelope
6876

6977
return $middlewareIterator->current()->handle($envelope, $stack);
7078
}
79+
80+
public function dispatchBatch(array $messages, array $stamps = []): array
81+
{
82+
if ([] === $messages) {
83+
return [];
84+
}
85+
86+
$collector = new BatchCollector($this->eventDispatcher);
87+
if (null !== $this->logger) {
88+
$collector->setLogger($this->logger);
89+
}
90+
91+
$messages = array_values($messages);
92+
$batchSize = \count($messages);
93+
94+
foreach ($messages as $index => $message) {
95+
$batchStamp = new BatchStamp($collector->getBatchId(), $index, $batchSize, $collector);
96+
97+
$envelope = $this->dispatch($message, [...$stamps, $batchStamp]);
98+
$collector->trackEnvelope($index, $envelope);
99+
}
100+
101+
return $collector->flush();
102+
}
71103
}

src/Symfony/Component/Messenger/Middleware/SendMessageMiddleware.php

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
use Symfony\Component\Messenger\Event\MessageSentToTransportsEvent;
1818
use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent;
1919
use Symfony\Component\Messenger\Exception\NoSenderForMessageException;
20+
use Symfony\Component\Messenger\Stamp\BatchStamp;
2021
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
2122
use Symfony\Component\Messenger\Stamp\SentStamp;
2223
use Symfony\Component\Messenger\Transport\Sender\SendersLocatorInterface;
@@ -43,6 +44,8 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
4344
];
4445

4546
$sender = null;
47+
$batchStamp = $envelope->last(BatchStamp::class);
48+
$collector = $batchStamp?->getCollector();
4649

4750
if ($envelope->all(ReceivedStamp::class)) {
4851
// it's a received message, do not send it back
@@ -57,12 +60,22 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
5760
$envelope = $event->getEnvelope();
5861
}
5962

60-
foreach ($senders as $alias => $sender) {
61-
$this->logger?->info('Sending message {class} with {alias} sender using {sender}', $context + ['alias' => $alias, 'sender' => $sender::class]);
62-
$envelope = $sender->send($envelope->with(new SentStamp($sender::class, \is_string($alias) ? $alias : null)));
63+
if ($collector) {
64+
// Batch mode: add to collector instead of sending immediately
65+
foreach ($senders as $alias => $sender) {
66+
$this->logger?->info('Batching message {class} with {alias} sender using {sender}', $context + ['alias' => $alias, 'sender' => $sender::class]);
67+
$envelope = $envelope->with(new SentStamp($sender::class, \is_string($alias) ? $alias : null));
68+
$collector->addPendingSend($alias, $sender, $envelope, $batchStamp->getBatchIndex());
69+
}
70+
} else {
71+
// Normal mode: send immediately
72+
foreach ($senders as $alias => $sender) {
73+
$this->logger?->info('Sending message {class} with {alias} sender using {sender}', $context + ['alias' => $alias, 'sender' => $sender::class]);
74+
$envelope = $sender->send($envelope->with(new SentStamp($sender::class, \is_string($alias) ? $alias : null)));
75+
}
6376
}
6477

65-
if (null !== $this->eventDispatcher && $senders) {
78+
if (null !== $this->eventDispatcher && $senders && !$collector) {
6679
$this->eventDispatcher->dispatch(new MessageSentToTransportsEvent($envelope, $senders));
6780
}
6881

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <[email protected]>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Stamp;
13+
14+
use Symfony\Component\Messenger\Transport\Sender\BatchCollector;
15+
16+
/**
17+
* Stamp identifying a message as part of a batch dispatch.
18+
*
19+
* This stamp can be used by event listeners to correlate messages
20+
* belonging to the same batch.
21+
*
22+
* @author Joppe De Cuyper <[email protected]>
23+
*/
24+
final class BatchStamp implements NonSendableStampInterface
25+
{
26+
public function __construct(
27+
private string $batchId,
28+
private int $batchIndex,
29+
private int $batchSize,
30+
private ?BatchCollector $collector = null,
31+
) {
32+
}
33+
34+
public function getBatchId(): string
35+
{
36+
return $this->batchId;
37+
}
38+
39+
public function getBatchIndex(): int
40+
{
41+
return $this->batchIndex;
42+
}
43+
44+
public function getBatchSize(): int
45+
{
46+
return $this->batchSize;
47+
}
48+
49+
/**
50+
* @internal
51+
*/
52+
public function getCollector(): ?BatchCollector
53+
{
54+
return $this->collector;
55+
}
56+
}

0 commit comments

Comments
 (0)