Symfony Messenger: Messages and Handlers

Symfony introduced the Messenger component in 4.1. Libraries such as prooph and SimpleBus can be used for the same purpose, but having a simple default package in Symfony might help more developers get into the concepts of CQRS and Event Sourcing.

In a series of two blog posts, I’m going to look at how we can use the Messenger component to solve a simple problem, without applying CQRS or Event Sourcing. The goal is to give a first impression of message buses and how we can use them in any existing application.

In this article, we are going to look at the basics of messages and handlers. Then next week we will take a deeper look at the queues and workers with RabbitMQ.

Starting without a message bus

Let’s first look at how we would solve our problem without a message bus. For this, we start with a Symfony skeleton and the framework extra bundle (annotations recipe).

composer create-project symfony/website-skeleton my-project
composer require annotations

We will also be using assertions.

composer require beberlei/assert

As an example, we are going to create a simple REST API.

The story is simple: the backend of our customer sends us new purchase orders. They are unable to send them in batches, so they need to make one POST call for each purchase order. Their business is booming, so there will be a lot of new purchase orders. We only need to check that the submitted data is valid before responding; after that we have an hour to process the purchase through our system.

Our team does not have any experience with message buses or asynchronous messages and thus creates the following solution:

final class PurchaseOrderController extends Controller
{
    use PayloadFromRequestTrait;
    
    /**
     * @Route("purchaseOrder", name="api_post_purchase_order", methods={"POST"})
     * @param Request $request
     * @return JsonResponse
     */
    public function postPurchaseOrder(Request $request): JsonResponse
    {
        $payload = $this->getPayloadFromRequest($request);
        $customerId = $payload['customerId'] ?? '';
        $title = $payload['title'] ?? '';

        try {
            Assertion::notEmpty($customerId);
            Assertion::notEmpty($title);
            
            // Todo: Create Purchase order
            var_dump($payload);
        } catch (AssertionFailedException $e) {
            return JsonResponse::create(['message' => $e->getMessage()], Response::HTTP_BAD_REQUEST);
        } catch (\Exception $e) {
            return JsonResponse::create(['message' => $e->getMessage()], Response::HTTP_INTERNAL_SERVER_ERROR);
        }

        return JsonResponse::create(null, Response::HTTP_ACCEPTED);
    }
}

Note that we dump the payload instead of making an actual call to a service that creates the purchase order. This is just an example; you should call your real service here instead.

Let’s order some new shoes.

POST /api/purchaseOrder HTTP/1.1
Host: 192.168.99.100:8080
Content-Type: application/json

{
    "customerId": "c8c68ad0-1f8b-44e7-97a8-e53916db6e60",
    "title": "New shoes"
}

As you can see, the response contains a dump of our payload.

/application/src/Presentation/Api/Rest/Controller/PurchaseOrderController.php:32:
array (size=2)
  'customerId' => string 'c8c68ad0-1f8b-44e7-97a8-e53916db6e60' (length=36)
  'title' => string 'New shoes' (length=9)

The problem we face now is that every time someone posts data, the caller needs to wait until the process is finished. If our server gets hammered with a lot of purchase orders, everything will slow to a crawl and some requests might time out. This solution is not scalable.

We can create scalability by having the ability to do this work asynchronously. The caller provides its data, we validate it, and then send it right to a message queue. We can then potentially also offload it to one or multiple servers fine-tuned and scalable for this kind of work.
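To make the idea concrete, here is a minimal sketch in plain PHP, without Symfony. The SplQueue is only a stand-in for a real message queue, and the function names acceptPurchaseOrder and processQueue are made up for this illustration. The request phase validates and enqueues; the slow work happens in a separate phase.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for a real message queue such as RabbitMQ.
$queue = new SplQueue();

/**
 * Request phase: validate the payload and enqueue it, nothing more.
 * Returns the HTTP status code the caller would receive.
 */
function acceptPurchaseOrder(SplQueue $queue, array $payload): int
{
    if (empty($payload['customerId']) || empty($payload['title'])) {
        return 400; // invalid data: reject immediately
    }

    $queue->enqueue($payload);

    return 202; // accepted, the real work happens later
}

/**
 * Worker phase: runs separately and drains whatever is waiting.
 * Returns the titles of the orders it processed.
 */
function processQueue(SplQueue $queue): array
{
    $processed = [];
    while (!$queue->isEmpty()) {
        $order = $queue->dequeue();
        $processed[] = $order['title']; // placeholder for the slow business logic
    }

    return $processed;
}

$statusOk = acceptPurchaseOrder($queue, [
    'customerId' => 'c8c68ad0-1f8b-44e7-97a8-e53916db6e60',
    'title' => 'New shoes',
]);
$statusBad = acceptPurchaseOrder($queue, ['title' => 'No customer']);

// Later, possibly on another machine in a real setup:
$processed = processQueue($queue);
```

In this toy version both phases run in the same PHP process, so nothing is truly asynchronous. The point is only the split: the caller gets its answer as soon as the data is validated and stored.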

In this part, we are not going to look at the asynchronous part yet, but we will introduce a message bus to make this kind of scaling possible in the future. You will also see some of the advantages of using message buses in your system.

What is Symfony Messenger?

The idea behind the Messenger component is to send or receive messages to or from the same or different applications. These messages can also be sent or received through message queues and external systems. This allows you to create more decoupled code.

It all starts with a message. A message can be any serialisable PHP object, preferably an immutable one.

To be able to send messages, we also need something that we call a message bus. This is the bus you use to dispatch your messages to a receiver.

Once we have sent a message through a bus, we need a message handler. This is a class that handles a message: it executes your business logic when a message arrives.

Next, we have transports. Transports allow you to send or receive messages through a third-party system, so messages can travel between different systems. This makes it possible to use microservices or to have dedicated servers handling certain work. You can offload image processing to a dedicated, optimised microservice or send emails with your decoupled mail server.

Finally, we also need a worker. A worker consumes the messages it receives from transports. This allows us to asynchronously handle messages that are stored in a queue.

We will look at the message, the message bus and the message handler in this article, and go over transports and workers in the next.
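To see how a message, a bus and a handler relate to each other, here is a toy version in plain PHP. This is not how Symfony implements its bus; the names SendWelcomeEmail, SendWelcomeEmailHandler and ToyMessageBus are invented for this sketch.

```php
<?php
declare(strict_types=1);

// Toy message: a plain immutable object (hypothetical name).
final class SendWelcomeEmail
{
    private $address;

    public function __construct(string $address)
    {
        $this->address = $address;
    }

    public function getAddress(): string
    {
        return $this->address;
    }
}

// Toy handler: an invokable object that holds the business logic.
final class SendWelcomeEmailHandler
{
    public $sentTo = [];

    public function __invoke(SendWelcomeEmail $message): void
    {
        // A real handler would send the email; we only record the address.
        $this->sentTo[] = $message->getAddress();
    }
}

// Toy bus: looks up the handler by message class and calls it.
final class ToyMessageBus
{
    private $handlers = [];

    public function register(string $messageClass, callable $handler): void
    {
        $this->handlers[$messageClass] = $handler;
    }

    public function dispatch($message): void
    {
        $class = get_class($message);
        if (!isset($this->handlers[$class])) {
            throw new RuntimeException(sprintf('No handler for message "%s".', $class));
        }

        ($this->handlers[$class])($message);
    }
}

$handler = new SendWelcomeEmailHandler();
$bus = new ToyMessageBus();
$bus->register(SendWelcomeEmail::class, $handler);
$bus->dispatch(new SendWelcomeEmail('jane@example.com'));
```

The code that dispatches only knows the bus and the message. It never references the handler, and that is where the decoupling comes from.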

Creating and dispatching a message

Before we can make a message bus, we need to install the Symfony Messenger component.

composer require symfony/messenger

First, we have to create the message that we will send through the message bus to our handler. This handler will then execute the business logic.

<?php
namespace App\Core\Domain\Message;

final class CreateSalesOrder
{
    /**
     * @var string
     */
    private $customerId;

    /**
     * @var string
     */
    private $title;
    
    public function __construct(string $customerId, string $title)
    {
        $this->customerId = $customerId;
        $this->title = $title;
    }
    
    public function getCustomerId(): string
    {
        return $this->customerId;
    }
    
    public function getTitle(): string
    {
        return $this->title;
    }
}
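The message is a plain value object without setters, so it cannot change after it has been created. As a quick check of the "serialisable" requirement, the sketch below sends a compact copy of the class through PHP's native serialize() and unserialize(). This is only a stand-in: how a real transport serialises messages depends on its configuration. The namespace is left out so the snippet runs on its own.

```php
<?php
declare(strict_types=1);

// Compact copy of the message class from above (namespace omitted).
final class CreateSalesOrder
{
    private $customerId;
    private $title;

    public function __construct(string $customerId, string $title)
    {
        $this->customerId = $customerId;
        $this->title = $title;
    }

    public function getCustomerId(): string
    {
        return $this->customerId;
    }

    public function getTitle(): string
    {
        return $this->title;
    }
}

$message = new CreateSalesOrder('c8c68ad0-1f8b-44e7-97a8-e53916db6e60', 'New shoes');

// Round trip: turn the object into a string and back again,
// which is roughly what has to happen when a message crosses a queue.
$restored = unserialize(serialize($message));
```

The restored object is a different instance with the same data, which is all a handler on the other side of a queue needs.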

Good, we have a message now. Let us dispatch it to our message bus by injecting the MessageBusInterface in our controller.

/**
 * @var MessageBusInterface
 */
private $messageBus;

public function __construct(MessageBusInterface $messageBus)
{
    $this->messageBus = $messageBus;
}

And then dispatch a newly created message through the message bus.

public function postPurchaseOrder(Request $request): JsonResponse
{
    $payload = $this->getPayloadFromRequest($request);
    $customerId = $payload['customerId'] ?? '';
    $title = $payload['title'] ?? '';

    try {
        Assertion::notEmpty($customerId);
        Assertion::notEmpty($title);

        $this->messageBus->dispatch(new CreateSalesOrder($customerId, $title));
    } catch (AssertionFailedException $e) {
        return JsonResponse::create(['message' => $e->getMessage()], Response::HTTP_BAD_REQUEST);
    } catch (\Exception $e) {
        return JsonResponse::create(['message' => $e->getMessage()], Response::HTTP_INTERNAL_SERVER_ERROR);
    }

    return JsonResponse::create(null, Response::HTTP_ACCEPTED);
}

It’s as simple as dispatching events. (An event is also a message. We could use a message bus instead of an Event Dispatcher for events.)

If you execute the POST request as we did earlier, you will see that we get an exception telling us that we don’t have any handler for our message.

We have sent a message to the bus, but nobody is assigned to handle it, so nothing can happen with our message. It’s a good thing Symfony tells us about this.

Handling a message

Handling a message is part of the application layer. You can compare a message handler with an application service: it acts on a request from the domain and does the necessary work, such as getting data from a repository, creating your domain objects or sending a simple email.

<?php
namespace App\Core\Application\MessageHandler;

use App\Core\Domain\Message\CreateSalesOrder;

final class CreateSalesOrderHandler
{
    public function __invoke(CreateSalesOrder $createSalesOrder)
    {
        // Todo: Create Purchase order
        var_dump($createSalesOrder);
    }
}

In our example, we just execute our var_dump like we did before.
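In a real application the var_dump would be replaced by calls to your own services. Here is a sketch of what that could look like, assuming a hypothetical SalesOrderRepository interface with an in-memory implementation. Neither is part of the example project; they are only here to show a handler with an injected dependency.

```php
<?php
declare(strict_types=1);

// Compact copy of the message class (namespace omitted so the snippet runs on its own).
final class CreateSalesOrder
{
    private $customerId;
    private $title;

    public function __construct(string $customerId, string $title)
    {
        $this->customerId = $customerId;
        $this->title = $title;
    }

    public function getCustomerId(): string
    {
        return $this->customerId;
    }

    public function getTitle(): string
    {
        return $this->title;
    }
}

// Hypothetical repository contract, invented for this sketch.
interface SalesOrderRepository
{
    public function add(string $customerId, string $title): void;
}

// Hypothetical in-memory implementation, handy for tests.
final class InMemorySalesOrderRepository implements SalesOrderRepository
{
    private $orders = [];

    public function add(string $customerId, string $title): void
    {
        $this->orders[] = ['customerId' => $customerId, 'title' => $title];
    }

    public function all(): array
    {
        return $this->orders;
    }
}

final class CreateSalesOrderHandler
{
    private $repository;

    public function __construct(SalesOrderRepository $repository)
    {
        $this->repository = $repository;
    }

    public function __invoke(CreateSalesOrder $createSalesOrder): void
    {
        // The handler only translates the message into a call on the repository.
        $this->repository->add(
            $createSalesOrder->getCustomerId(),
            $createSalesOrder->getTitle()
        );
    }
}

// The handler is a callable object, so we can invoke it directly without a bus.
$repository = new InMemorySalesOrderRepository();
$handler = new CreateSalesOrderHandler($repository);
$handler(new CreateSalesOrder('c8c68ad0-1f8b-44e7-97a8-e53916db6e60', 'New shoes'));
```

Because the handler is just an invokable class, it can be tested like this without booting the framework or a message bus.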

But now you might wonder: how does our message bus know that it needs to use this handler for the CreateSalesOrder message?

We need to tag our message handler to let the message bus know where to look for its handlers.

App\Core\Application\MessageHandler\:
    resource: '../src/Core/Application/MessageHandler'
    tags: ['messenger.message_handler']

Here messenger.message_handler is the tag that the Messenger component looks for when it collects the handlers for a message bus. We can define multiple message buses in messenger.yaml. This allows us to have separate message buses for events or for sending messages to different microservices.

We tag all classes inside the App\Core\Application\MessageHandler namespace to be used as handlers. Symfony automatically detects which message should go to which handler, based on the type hint of the argument of the __invoke method.
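The detection relies on PHP's reflection API. The sketch below shows the idea in plain PHP. It is a simplification, not Symfony's actual code, and the function name guessHandledMessage is invented for this illustration. The message class is an empty stand-in so the snippet runs on its own.

```php
<?php
declare(strict_types=1);

// Empty stand-in for the real message class.
final class CreateSalesOrder
{
}

final class CreateSalesOrderHandler
{
    public function __invoke(CreateSalesOrder $createSalesOrder): void
    {
    }
}

/**
 * Returns the class name that is type-hinted on the first argument of __invoke().
 */
function guessHandledMessage(string $handlerClass): string
{
    $method = new ReflectionMethod($handlerClass, '__invoke');
    $parameters = $method->getParameters();

    if (count($parameters) === 0 || $parameters[0]->getType() === null) {
        throw new LogicException(sprintf('Cannot guess the message handled by "%s".', $handlerClass));
    }

    return $parameters[0]->getType()->getName();
}

$handled = guessHandledMessage(CreateSalesOrderHandler::class);
```

This also explains why the type hint on __invoke matters: without it there is nothing to derive the message class from.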

If you now POST to purchaseOrder again, you will see the var_dump. You can find the complete example, including a Docker setup, here.

But you might now wonder: isn’t this still synchronous? Am I not still waiting for the handler to handle the message before I get a response?

Well, you are correct. We don’t have any queue set up yet, so the messages get handled synchronously. But what we have done is decouple our code. We can dispatch this message to a bus and then handle it somewhere else. By using transports and storing these messages in a queue, we can create an asynchronous process. We will look at that next week.