Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions src/Symfony/Component/Messenger/BatchMessageBusInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <[email protected]>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger;

use Symfony\Component\Messenger\Exception\ExceptionInterface;
use Symfony\Component\Messenger\Stamp\StampInterface;

/**
* A message bus that supports batch dispatching.
*
* @author Joppe De Cuyper <[email protected]>
*/
interface BatchMessageBusInterface extends MessageBusInterface
{
/**
* Dispatches multiple messages in a batch, allowing transports to optimize delivery.
*
* Each message passes through the full middleware chain individually.
* The actual transport send is deferred until all messages are processed,
* then executed as a single batch operation.
*
* @param object[]|Envelope[] $messages Messages or pre-wrapped envelopes
* @param StampInterface[] $stamps Stamps applied to all messages
*
* @return Envelope[] In same order as input, with TransportMessageIdStamp added
*
* @throws ExceptionInterface
*/
public function dispatchBatch(array $messages, array $stamps = []): array;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
use Doctrine\DBAL\Tools\DsnParser;
use PHPUnit\Framework\Attributes\RequiresPhpExtension;
use PHPUnit\Framework\TestCase;
use Symfony\Bridge\Doctrine\Middleware\Debug\DebugDataHolder;
use Symfony\Bridge\Doctrine\Middleware\Debug\Middleware;
use Symfony\Component\Messenger\Bridge\Doctrine\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\Connection;

Expand All @@ -25,6 +27,7 @@ class DoctrineIntegrationTest extends TestCase
{
private \Doctrine\DBAL\Connection $driverConnection;
private Connection $connection;
private DebugDataHolder $debugDataHolder;

protected function setUp(): void
{
Expand All @@ -33,6 +36,9 @@ protected function setUp(): void
$config = new Configuration();
$config->setSchemaManagerFactory(new DefaultSchemaManagerFactory());

$this->debugDataHolder = new DebugDataHolder();
$config->setMiddlewares([new Middleware($this->debugDataHolder, null)]);

$this->driverConnection = DriverManager::getConnection($params, $config);
$this->connection = new Connection([], $this->driverConnection);
}
Expand Down Expand Up @@ -197,6 +203,72 @@ public function testTheTransportIsSetupOnGet()
$this->assertEquals('the body', $envelope['body']);
}

public function testSendBatch()
{
// Setup connection, and reset the debug data holder to only capture the batch insert query
$this->connection->setup();
$this->debugDataHolder->reset();

$this->connection->sendBatch([
['body' => '{"message": "First"}', 'headers' => ['type' => DummyMessage::class], 'delay' => 0],
['body' => '{"message": "Second"}', 'headers' => ['type' => DummyMessage::class], 'delay' => 0],
['body' => '{"message": "Third"}', 'headers' => ['type' => DummyMessage::class], 'delay' => 0],
]);

$queries = $this->debugDataHolder->getData()['default'] ?? [];
$this->assertCount(1, $queries, 'Batch insert should execute exactly 1 SQL statement');
$this->assertStringStartsWith('INSERT INTO', $queries[0]['sql']);
$this->assertSame(3, $this->connection->getMessageCount());

$first = $this->connection->get();
$this->assertEquals('{"message": "First"}', $first['body']);
$this->connection->ack($first['id']);

$second = $this->connection->get();
$this->assertEquals('{"message": "Second"}', $second['body']);
$this->connection->ack($second['id']);

$third = $this->connection->get();
$this->assertEquals('{"message": "Third"}', $third['body']);
$this->connection->ack($third['id']);

$this->assertSame(0, $this->connection->getMessageCount());
}

public function testSendBatchWithDelay()
{
$this->connection->sendBatch([
['body' => '{"message": "Immediate"}', 'headers' => ['type' => DummyMessage::class], 'delay' => 0],
['body' => '{"message": "Delayed"}', 'headers' => ['type' => DummyMessage::class], 'delay' => 600000],
]);

// Only the immediate message should be available
$this->assertSame(1, $this->connection->getMessageCount());

$immediate = $this->connection->get();
$this->assertEquals('{"message": "Immediate"}', $immediate['body']);
}

public function testSendBatchEmpty()
{
$this->connection->setup();
$this->connection->sendBatch([]);

$this->assertSame(0, $this->connection->getMessageCount());
}

public function testSendBatchSingleMessage()
{
$this->connection->sendBatch([
['body' => '{"message": "Only one"}', 'headers' => ['type' => DummyMessage::class], 'delay' => 0],
]);

$this->assertSame(1, $this->connection->getMessageCount());

$message = $this->connection->get();
$this->assertEquals('{"message": "Only one"}', $message['body']);
}

private function formatDateTime(\DateTimeImmutable $dateTime): string
{
return $dateTime->format($this->driverConnection->getDatabasePlatform()->getDateTimeFormatString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,60 @@ public function testSendWithDelay()
$sender = new DoctrineSender($connection, $serializer);
$sender->send($envelope);
}

public function testSendBatchWithIdsReturned()
{
$envelope1 = new Envelope(new DummyMessage('First'));
$envelope2 = new Envelope(new DummyMessage('Second'));
$encoded1 = ['body' => 'body1', 'headers' => ['type' => DummyMessage::class]];
$encoded2 = ['body' => 'body2', 'headers' => ['type' => DummyMessage::class]];

$connection = $this->createMock(Connection::class);
$connection->expects($this->once())
->method('sendBatch')
->with([
['body' => 'body1', 'headers' => ['type' => DummyMessage::class], 'delay' => 0],
['body' => 'body2', 'headers' => ['type' => DummyMessage::class], 'delay' => 0],
])
->willReturn(['15', '16']);

$serializer = $this->createMock(SerializerInterface::class);
$serializer->method('encode')->willReturnCallback(fn ($envelope) => $envelope->getMessage()->getMessage() === 'First' ? $encoded1 : $encoded2);

$sender = new DoctrineSender($connection, $serializer);
$result = $sender->sendBatch([$envelope1, $envelope2]);

$this->assertCount(2, $result);

$stamp1 = $result[0]->last(TransportMessageIdStamp::class);
$this->assertNotNull($stamp1);
$this->assertSame('15', $stamp1->getId());

$stamp2 = $result[1]->last(TransportMessageIdStamp::class);
$this->assertNotNull($stamp2);
$this->assertSame('16', $stamp2->getId());
}

public function testSendBatchWithoutIdsReturned()
{
$envelope1 = new Envelope(new DummyMessage('First'));
$envelope2 = new Envelope(new DummyMessage('Second'));
$encoded1 = ['body' => 'body1', 'headers' => ['type' => DummyMessage::class]];
$encoded2 = ['body' => 'body2', 'headers' => ['type' => DummyMessage::class]];

$connection = $this->createMock(Connection::class);
$connection->expects($this->once())
->method('sendBatch')
->willReturn(null);

$serializer = $this->createMock(SerializerInterface::class);
$serializer->method('encode')->willReturnCallback(fn ($envelope) => $envelope->getMessage()->getMessage() === 'First' ? $encoded1 : $encoded2);

$sender = new DoctrineSender($connection, $serializer);
$result = $sender->sendBatch([$envelope1, $envelope2]);

$this->assertCount(2, $result);
$this->assertNull($result[0]->last(TransportMessageIdStamp::class));
$this->assertNull($result[1]->last(TransportMessageIdStamp::class));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,67 @@
]);
}

/**
* Sends multiple messages in a single INSERT query.
*
* @param list<array{body: string, headers: array, delay: int}> $messages
*
* @return list<string>|null The inserted IDs if supported by the platform (PostgreSQL), null otherwise
*
* @throws DBALException
*/
public function sendBatch(array $messages): ?array
{
if ([] === $messages) {
return [];
}

$now = new \DateTimeImmutable('UTC');
$queueName = $this->configuration['queue_name'];
$queryBuilder = $this->driverConnection->createQueryBuilder()
->insert($this->configuration['table_name'])
->values([
'body' => '?',
'headers' => '?',
'queue_name' => '?',
'created_at' => '?',
'available_at' => '?',
]);

$baseSql = $queryBuilder->getSQL();

$additionalPlaceholders = [];
$values = [];
$types = [];

foreach ($messages as $index => $message) {
$delay = $message['delay'];
$availableAt = $now->modify(\sprintf('%+d seconds', $delay / 1000));

if ($index > 0) {
$additionalPlaceholders[] = '(?, ?, ?, ?, ?)';
}

$values[] = $message['body'];
$values[] = json_encode($message['headers']);
$values[] = $queueName;
$values[] = $now;
$values[] = $availableAt;

$types[] = Types::STRING;
$types[] = Types::STRING;
$types[] = Types::STRING;
$types[] = Types::DATETIME_IMMUTABLE;
$types[] = Types::DATETIME_IMMUTABLE;
}

$sql = [] === $additionalPlaceholders
? $baseSql
: $baseSql.', '.implode(', ', $additionalPlaceholders);

return $this->executeBatchInsert($sql, $values, $types);
}

public function get(): ?array
{
if ($this->doMysqlCleanup && $this->driverConnection->getDatabasePlatform() instanceof AbstractMySQLPlatform) {
Expand Down Expand Up @@ -517,6 +578,38 @@
return $id;
}

/**
* @return list<string>|null The inserted IDs if supported by the platform (PostgreSQL), null otherwise

Check failure on line 582 in src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/Connection.php

View workflow job for this annotation

GitHub Actions / Psalm

InvalidReturnType

src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/Connection.php:582:16: InvalidReturnType: Not all code paths of Symfony\Component\Messenger\Bridge\Doctrine\Transport\Connection::executeBatchInsert end in a return statement, return type list<string>|null expected (see https://psalm.dev/011)

Check failure on line 582 in src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/Connection.php

View workflow job for this annotation

GitHub Actions / Psalm

InvalidReturnType

src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/Connection.php:582:16: InvalidReturnType: Not all code paths of Symfony\Component\Messenger\Bridge\Doctrine\Transport\Connection::executeBatchInsert end in a return statement, return type list<string>|null expected (see https://psalm.dev/011)
*/
private function executeBatchInsert(string $sql, array $parameters = [], array $types = []): ?array
{
$isPostgreSql = $this->driverConnection->getDatabasePlatform() instanceof PostgreSQLPlatform;

if ($isPostgreSql) {
$sql .= ' RETURNING id';
}

insert:
try {
if ($isPostgreSql) {
$ids = $this->driverConnection->fetchFirstColumn($sql, $parameters, $types);

return array_map(strval(...), $ids);
}

$this->driverConnection->executeStatement($sql, $parameters, $types);

return null;
} catch (TableNotFoundException $e) {
if (!$this->autoSetup) {
throw $e;
}

$this->setup();
goto insert;
}
}

private function getSchema(): Schema
{
$schema = new Schema([], [], $this->driverConnection->createSchemaManager()->createSchemaConfig());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Messenger\Transport\Sender\BatchSenderInterface;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;

/**
* @author Vincent Touzet <[email protected]>
*/
class DoctrineSender implements SenderInterface
class DoctrineSender implements BatchSenderInterface
{
private SerializerInterface $serializer;

Expand All @@ -50,4 +50,44 @@ public function send(Envelope $envelope): Envelope

return $envelope->with(new TransportMessageIdStamp($id));
}

public function getMaxBatchSize(): ?int
{
return null;
}

public function sendBatch(array $envelopes): array
{
if ([] === $envelopes) {
return [];
}

$messages = [];
foreach ($envelopes as $envelope) {
$encodedMessage = $this->serializer->encode($envelope);

/** @var DelayStamp|null $delayStamp */
$delayStamp = $envelope->last(DelayStamp::class);

$messages[] = [
'body' => $encodedMessage['body'],
'headers' => $encodedMessage['headers'] ?? [],
'delay' => null !== $delayStamp ? $delayStamp->getDelay() : 0,
];
}

try {
$ids = $this->connection->sendBatch($messages);
} catch (DBALException $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
}

if (null !== $ids) {
foreach ($envelopes as $index => $envelope) {
$envelopes[$index] = $envelope->with(new TransportMessageIdStamp($ids[$index]));
}
}

return $envelopes;
}
}
Loading
Loading