Symfony Messenger: Queues and Workers

In the previous article, we were introduced to the Symfony Messenger component. We created a simple implementation of a message bus. Our message was sent through the bus and directly handled by an assigned handler. Today we are going to take a look at queues and workers. Our message bus will send the message to a queue.  Then our worker will pull the message from the queue so it be can be handled by a handler.

AMQP

If we want to use a queue, we need to use an AMQP broker. AMQP stands for advanced message queue protocol. We need to install a plugin trough pecl on our PHP installation to allow us to use this protocol.

&& apt-get -y install gcc make autoconf libc-dev pkg-config \
&& apt-get -y install libssl-dev \
&& apt-get -y install librabbitmq-dev \
&& apt-get -y install php7.2-dev \
&& pecl install amqp \

Adding these commands to our PHP-fpm Dockerfile and then rebuilding it allows us to use the AMQP protocol to connect with an AMQP broker.

But don’t forget to add extension=amqp.so to your php ini file.

We also need to add a package to our project to use AMQP. This is the recipe amqp-pack. This will allow Symfony Messenger to connect with an AMQP message broker.

composer require symfony/amqp-pack

RabbitMQ

RabbitMQ is an AMQP message broker. It’s one of the most widely used open source message brokers. There are different alternatives and you can use whatever message broker you want. I have chosen RabbitMQ because its open source and can be easily set up with docker.

Setting up RabbitMQ

We will be starting from the previous example found here. In there you can find a docker setup. We will be needing to expand our docker compose to also have RabbitMQ.

In docker-compose.yml you need to add a RabbitMQ image. I use the one from bitnami. You can find it here.

rabbitmq:
  image: bitnami/rabbitmq:latest
  container_name: rabbitmq
  volumes:
  - ./rabbitmq-persistence:/bitnami
  ports:
    - 15672:15672

Now we just need to up this container and we are ready to use it.

Configuring RabbitMQ with Symfony Messenger

We need to configure an AMQP transport. For this, we can use a DSN from our environment variables. A DSN is a Data Source Name. You can compare it with your connection string for your database, but in this case, it’s used for connecting to a message broker, RabbitMQ in our case.

framework:
    messenger:
        transports:
            # Uncomment the following line to enable a transport named "amqp"
            amqp: '%env(MESSENGER_TRANSPORT_DSN)%'

        routing:
            # Route your messages to the transports
            'App\Core\Domain\Message\CreateSalesOrder': amqp

We have also configured that our CreateSalesOrder message will use the amqp transport. This allows us to route different messages to different transports. If a message does not have any transport configured, it will follow the normal flow as seen in the previous article.

And at last, don’t forget to set up your environment variable.

MESSENGER_TRANSPORT_DSN=amqp://guest:guest@rabbitmq:5672/%2f/messages

By default, RabbitMQ has a guest user with guest password. The hostname is RabbitMQ (or an IP if not using docker). The default port is 5672. But this will not work with docker. We will need to create an account.

Go to your RabbitMQ Management panel (http://ip:15672/). Login with username user and password bitnami. Got to Admin and create a new user. After the user is created you need to set permissions to access the virtual hosts for that user.

Now you can use that newly created user in your DNS.

Testing the transport

If you now POST to purchaseOrder you won’t see the var_dump result. This is because the message is now sent to the queue, and has not been executed yet. If you would open your RabbitMQ management panel you will see that there is one message in the queue.

Running the worker

Now we can use a worker. We just have to run php bin/console messenger-consume-messages. This will consume all the messages from the queue and execute them one by one. You will now see your var_dump result again.

In production, you would keep this worker running in the background as a long-running task.

Conclusion

You now took the first steps of working with Messages, Handlers, Transports and Workers. You can create truly asynchronous jobs now and use simple message busses. This is only the beginning and there is a lot more to explore in the realm of message buses and asynchronous processes. I hope to further explore these things together without in the future. And as always, think before you code!