New in Symfony 8.1: Messenger Improvements

The Symfony Messenger component continues to evolve in Symfony 8.1 with
improvements across worker execution, transport behavior, serialization, and
failure handling.

Batch Fetching Messages


Contributed by Nicolas Grekas in #63662

In previous Symfony versions, workers fetched one message at a time from the
transport, requiring one network round-trip per message. Symfony 8.1 adds a
--fetch-size option to the messenger:consume command so workers can
request multiple messages in a single call:

$ php bin/console messenger:consume async --fetch-size=8

Transports use this hint when supported by their underlying protocol
(Amazon SQS up to 10 messages per ReceiveMessage call, Redis
XREADGROUP COUNT, Doctrine LIMIT, AMQP repeated basic_get).

Configurable Service Reset Interval


Contributed by Nicolas Grekas in #63666

By default, messenger:consume resets services after each message to avoid
leaking state between handlers. The --no-reset option disables resets
entirely, which improves performance but can cause memory or state issues in
long-running workers.

Symfony 8.1 introduces a middle ground by allowing an integer value for --no-reset.
Services are then reset every N messages instead of after every message:

# default: reset services after every message
$ php bin/console messenger:consume async

# reset services every 100 messages (new in 8.1)
$ php bin/console messenger:consume async --no-reset=100

Custom Serialized Type Name for Messages


Contributed by Grégoire Pineau in #63061

When applications exchange messages through a broker, they must agree on how
messages are encoded. Symfony’s Serializer transport currently stores the
PHP fully-qualified class name in a type header, but that value is often
not useful for non-Symfony consumers, or even Symfony applications using
different namespaces.

Symfony 8.1 adds a serializedTypeName argument to the #[AsMessage] attribute.
Its value is used as the type header instead of the FQCN:

namespace App\Crawler\Message;

use Symfony\Component\Messenger\Attribute\AsMessage;

#[AsMessage(serializedTypeName: 'crawler.vectorization_finished')]
final readonly class VectorizationFinished
{
    public function __construct(
        public string $crawlId,
    ) {
    }
}

Per-Message Priority on AMQP


Contributed by Valentin Nazarov in #41574

RabbitMQ supports message priorities through the priority property,
allowing higher-priority messages to overtake lower-priority ones within the
same queue. Symfony 8.1 exposes this through a new AmqpPriorityStamp for
the AMQP transport, similar to the existing BeanstalkdPriorityStamp:

use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpPriorityStamp;

$bus->dispatch($message, [new AmqpPriorityStamp(5)]);

Per-message priority support is intentionally limited to AMQP because
transports such as Redis, Doctrine, or SQS do not support native priorities.
For cross-transport priority routing, configure separate queues and consume
them in priority order:

$ php bin/console messenger:consume high_priority low_priority

Idle Timeout for Batch Handlers


Contributed by HypeMC in #63277

The BatchHandlerTrait lets a single handler process messages in batches
once the configured batch size is reached. However, in low-throughput scenarios,
batches can take a long time to fill, leaving messages buffered in memory.

Symfony 8.1 adds an optional getIdleTimeout() method to the trait. When the
worker remains idle for the configured duration, the current batch is flushed
even if it is not full:

use Symfony\Component\Messenger\Handler\BatchHandlerInterface;
use Symfony\Component\Messenger\Handler\BatchHandlerTrait;

class IndexProductsHandler implements BatchHandlerInterface
{
    use BatchHandlerTrait;

    // flush partial batches after 5 seconds of inactivity
    private function getIdleTimeout(): ?float
    {
        return 5.0;
    }

    private function shouldFlush(): bool
    {
        return 100 <= count($this->jobs);
    }

    // ...
}

Non-Blocking PostgreSQL LISTEN/NOTIFY


Contributed by d-ph in #47666

The Doctrine transport uses PostgreSQL’s LISTEN/NOTIFY feature to wake
workers as soon as new messages arrive instead of relying on polling.

Before Symfony 8.1, the blocking LISTEN call happened inside the transport
itself. When consuming from several PostgreSQL queues with priorities, the
worker could block on the first queue and never check the others.

Symfony 8.1 moves the blocking LISTEN/NOTIFY wait into a dedicated worker
subscriber triggered on the idle event. The worker loop now checks all
transports in priority order on every iteration and only blocks on NOTIFY
after all queues are found empty.

This change is fully transparent. Existing applications using the Doctrine
transport with PostgreSQL benefit automatically.
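
The polling order described above can be illustrated with a small, self-contained sketch. This is not the actual Worker code: the fetchNextInPriorityOrder() function and the in-memory arrays standing in for queues are hypothetical names invented for this illustration. It only shows the idea that every iteration scans all queues from highest to lowest priority, and that blocking on NOTIFY would happen only once every queue is empty:

```php
<?php

/**
 * Hypothetical helper (not a Symfony API): returns [queueName, message]
 * for the first non-empty queue, or null when every queue is empty.
 *
 * @param array<string, list<string>> $queues keyed by name, highest priority first
 */
function fetchNextInPriorityOrder(array &$queues): ?array
{
    foreach ($queues as $name => &$messages) {
        if ([] !== $messages) {
            return [$name, array_shift($messages)];
        }
    }
    unset($messages);

    return null;
}

// in-memory stand-ins for two PostgreSQL-backed queues
$queues = ['high' => [], 'low' => ['l1', 'l2']];
$handled = [];

while (null !== $next = fetchNextInPriorityOrder($queues)) {
    $handled[] = $next;

    // simulate a high-priority message arriving while "low" is being consumed
    if (1 === count($handled)) {
        $queues['high'][] = 'h1';
    }
}

// all queues are empty here: only at this point would the worker
// go idle and block waiting for a NOTIFY
```

In this sketch, the message added to the high queue is handled before the remaining low message, because the scan restarts from the top on every iteration.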

Decode Failures Routed Through Failure Handling


Contributed by Nicolas Grekas in #62888

Previously, when Messenger could not decode a message, for example because its class
no longer existed after a refactor, the transport raised a MessageDecodingFailedException
and silently discarded the message from the queue.

Symfony 8.1 now routes decode failures through the normal failure-handling
pipeline. The message remains acknowledgeable, is wrapped in a
MessageDecodingFailedException envelope, and goes through the configured
retry and failure transports like any other failed message.

A new DecodeFailedMessageMiddleware retries decoding on every retry
attempt. If a fix is deployed later, such as restoring a missing class or
updating a serializer, the message can decode successfully on a subsequent
attempt and continue through the bus with all original stamps preserved.

Listable Redis Receiver


Contributed by Mudassar Ali in #61462

The ListableReceiverInterface lets monitoring tools and bundles such as
zenstruck/messenger-monitor-bundle inspect pending messages without
consuming them. The Doctrine transport already implemented this interface;
Symfony 8.1 adds the same support to the Redis transport.

RedisReceiver now exposes all() and find() methods backed by
Redis Stream XRANGE commands:

// list all pending messages on the transport
$envelopes = $receiver->all();

// find a specific message by Redis stream ID
$envelope = $receiver->find('1700000000000-0');

Force Redis Cluster via DSN


Contributed by Alex Vlasov in #54866

In previous Symfony versions, configuring a Redis Cluster transport required
listing every cluster node in the DSN. This approach is fragile when cluster
nodes change and impossible when only a load-balanced endpoint is exposed.

Symfony 8.1 adds a redis_cluster=true option to the Redis transport DSN.
When enabled, the Redis client connects in cluster mode through a single
endpoint and relies on cluster discovery for the remaining nodes:

# before: enumerate every node in the cluster
MESSENGER_TRANSPORT_DSN=redis://redis-0:6379,redis://redis-1:6379

# after: a single endpoint with redis_cluster=true
MESSENGER_TRANSPORT_DSN=redis://redis-cluster:6379?redis_cluster=true

Delayed Quorum Queues for AMQP


Contributed by miquel-angel in #60298

The AMQP transport uses dedicated delay queues to defer message delivery.
With RabbitMQ quorum queues, these delay queues require an explicit
x-expires argument so the broker eventually deletes them, but the value
must outlive every delayed message still stored in the queue.

Symfony 8.1 rewrites the delay queue strategy for quorum queues to use one
queue per day, with an expiration set to 1 day + delay + 10 seconds.
Messages scheduled for the same calendar day now share a queue, and that queue
is guaranteed to survive long enough for the slowest message to be delivered.

This fixes the long-standing issue where delayed quorum queues could expire
before their messages became due.
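
As a worked example of the expiration formula, here is a minimal sketch of the arithmetic. The function and constant names are hypothetical and are not part of the AMQP bridge. It assumes that "delay" refers to the delay of the message being published and that the result is expressed in milliseconds, the unit RabbitMQ uses for x-expires:

```php
<?php

const SECONDS_PER_DAY = 86_400;
const SAFETY_MARGIN_SECONDS = 10;

/**
 * Hypothetical helper: computes "1 day + delay + 10 seconds" in milliseconds.
 */
function quorumDelayQueueExpiresMs(int $delayMs): int
{
    return (SECONDS_PER_DAY + SAFETY_MARGIN_SECONDS) * 1000 + $delayMs;
}

// a message delayed by one hour (3,600,000 ms)
$expires = quorumDelayQueueExpiresMs(3_600_000);
// $expires is 90,010,000 ms, i.e. 25 hours and 10 seconds
```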

Disable Default AMQP Queue Binding


Contributed by Max in #63346

When declaring an AMQP transport without explicitly listing queues,
messenger:setup-transports creates a default messages queue and binds
it to the configured exchange. This is useful for consumers but unnecessary
for write-only transports that only publish to an exchange.

Symfony 8.1 lets you disable default queue creation by setting queues to
[] or false:

framework:
    messenger:
        transports:
            # write-only transport: no queue is declared or bound
            outgoing_events:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    exchange:
                        name: 'EVENTS'
                        type: fanout
                    queues: false

            # ...

Release Deduplication Lock on Definitive Failure


Contributed by Ousama Ben Younes in #64070

Symfony 7.3 introduced the deduplication middleware, which acquires a lock
keyed by a DeduplicateStamp so duplicate dispatches of the same message
are skipped. The lock is released after the message is successfully handled,
or kept until expiration (300 seconds by default) while the message keeps failing.

In Symfony 8.1, when a message fails definitively, meaning the retry strategy
gives up and the message is moved to the failure transport, the deduplication
lock is released immediately. New dispatches using the same key can then
enter the queue right away instead of being skipped until the TTL expires.
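
The effect on later dispatches can be pictured with a toy in-memory model. This is only a sketch of the locking behavior described above, not the middleware's implementation: the ToyDeduplicationLocks class, its methods, and the import-42 key are hypothetical, and time is passed in explicitly to keep the example deterministic:

```php
<?php

// Toy model of TTL-based deduplication locks (hypothetical, for illustration only)
final class ToyDeduplicationLocks
{
    /** @var array<string, float> lock key => expiry time in seconds */
    private array $expiresAt = [];

    public function acquire(string $key, float $now, float $ttl = 300.0): bool
    {
        if (($this->expiresAt[$key] ?? 0.0) > $now) {
            return false; // lock still held: the dispatch is treated as a duplicate
        }

        $this->expiresAt[$key] = $now + $ttl;

        return true;
    }

    public function release(string $key): void
    {
        unset($this->expiresAt[$key]);
    }
}

$locks = new ToyDeduplicationLocks();

$first = $locks->acquire('import-42', now: 0.0);       // accepted
$duplicate = $locks->acquire('import-42', now: 10.0);  // skipped, lock is held

// the message fails definitively at t=10s: the lock is released right away
$locks->release('import-42');

$redispatch = $locks->acquire('import-42', now: 11.0); // accepted again
```

Without the release() call, the last dispatch in this model would be skipped until the 300-second TTL had elapsed.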

