Magento 2: how to implement a consumer queue

If you've ever worked with Magento 2, you know that some tasks can slow down your store's performance. Whether it's sending emails, processing orders, or syncing data with external systems, these operations can create bottlenecks. That's where consumer queues come in! They help you handle background tasks efficiently, keeping your store responsive for customers.

In this post, we'll break down how to implement a consumer queue in Magento 2 step by step. No fluff, just practical code examples and clear explanations. Let's dive in!

What is a consumer queue in Magento 2?

A consumer queue is a way to process tasks asynchronously. Instead of executing time-consuming operations immediately (which could slow down your store), Magento adds them to a queue and processes them in the background using consumers. This improves performance and ensures a smooth shopping experience.
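
To make the idea concrete, here is a toy sketch in plain PHP. It is not Magento's API: InMemoryQueue, the demo.email topic, and the message values are all made up for illustration. It only shows the split between publishing (cheap, done during the request) and consuming (slow, done later by a worker).

```php
<?php
declare(strict_types=1);

// Toy in-memory queue: a stand-in for the broker, not Magento's API.
final class InMemoryQueue
{
    /** @var array<string, SplQueue> messages grouped by topic */
    private array $topics = [];

    // Publishing only stores the message; no work happens here.
    public function publish(string $topic, string $message): void
    {
        $this->topics[$topic] ??= new SplQueue();
        $this->topics[$topic]->enqueue($message);
    }

    // A consumer drains one topic and passes each message to its handler.
    public function consume(string $topic, callable $handler): int
    {
        $processed = 0;
        $queue = $this->topics[$topic] ?? new SplQueue();
        while (!$queue->isEmpty()) {
            $handler($queue->dequeue());
            $processed++;
        }
        return $processed;
    }
}

$queue = new InMemoryQueue();
$log = [];

// The "request" side returns right after publishing.
$queue->publish('demo.email', 'order-1001');
$queue->publish('demo.email', 'order-1002');

// Later, a separate worker would do the slow part.
$count = $queue->consume('demo.email', function (string $message) use (&$log): void {
    $log[] = "sent email for {$message}";
});
```

In Magento the same roles are played by the publisher, the broker (MySQL or RabbitMQ), and the consumer process.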

Common use cases for consumer queues include:

  • Sending transactional emails
  • Updating product stock
  • Syncing data with ERP/CRM systems
  • Generating reports

Setting up a basic consumer queue

Let's create a simple example where we'll queue a task to log a message. Here's how to do it:

1. Create a communication configuration

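First, create etc/communication.xml in your module. It declares the topic name, the type of the message payload (the request attribute), and the class and method that will handle each message: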

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd">
    <topic name="magefine.log.message" request="string">
        <handler name="magefine.log.message.handler" type="Magefine\QueueExample\Consumer\LogMessage" method="process"/>
    </topic>
</config>

2. Create the consumer class

Now, create the consumer class at Consumer/LogMessage.php:

<?php
namespace Magefine\QueueExample\Consumer;

class LogMessage
{
    protected $logger;

    public function __construct(\Psr\Log\LoggerInterface $logger)
    {
        $this->logger = $logger;
    }

    public function process($message)
    {
        $this->logger->info('Queue message received: ' . $message);
        return true;
    }
}

3. Publish a message to the queue

To test our queue, let's publish a message from a frontend controller (this assumes the module already registers a route in etc/frontend/routes.xml):

<?php
namespace Magefine\QueueExample\Controller\Test;

use Magento\Framework\App\Action\Action;
use Magento\Framework\App\Action\Context;
use Magento\Framework\MessageQueue\PublisherInterface;

class Index extends Action
{
    protected $publisher;

    public function __construct(
        Context $context,
        PublisherInterface $publisher
    ) {
        parent::__construct($context);
        $this->publisher = $publisher;
    }

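    public function execute()
    {
        $message = 'Hello from Magefine!';
        $this->publisher->publish('magefine.log.message', $message);
        $this->messageManager->addSuccessMessage('Message added to queue!');
        // Redirect to the home page. Redirecting to '*/*/' would resolve back to
        // this same action and publish a new message on every round trip.
        return $this->_redirect('');
    }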
}

4. Configure the queue in deployment

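Add this to the array returned by app/etc/env.php. It points Magento at a local RabbitMQ broker. guest/guest is RabbitMQ's default account and is only accepted from localhost, so use dedicated credentials anywhere else: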

'queue' => [
    'consumers_wait_for_messages' => 1,
    'amqp' => [
        'host' => 'localhost',
        'port' => '5672',
        'user' => 'guest',
        'password' => 'guest',
        'virtualhost' => '/'
    ]
],

5. Start the consumer

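Run this command to start processing messages. The argument is a consumer name as declared in your module's etc/queue_consumer.xml (php bin/magento queue:consumers:list shows the registered names), not the handler name from communication.xml, so the name below only works if a consumer is declared under it: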

php bin/magento queue:consumers:start magefine.log.message.handler
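
The start command looks consumers up by the name declared in etc/queue_consumer.xml, and a message only reaches a consumer if its topic is bound to a queue. The steps above do not show those files, so here is a minimal sketch of what they might look like for the AMQP connection from step 4. The queue name magefine.log.message and the binding id are our own choices, the consumer is named magefine.log.message.handler so that the command above resolves, and magento is the exchange Magento uses for AMQP by default.

etc/queue_consumer.xml:

```xml
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/consumer.xsd">
    <!-- name is what you pass to queue:consumers:start -->
    <consumer name="magefine.log.message.handler" queue="magefine.log.message" connection="amqp" handler="Magefine\QueueExample\Consumer\LogMessage::process"/>
</config>
```

etc/queue_topology.xml:

```xml
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/topology.xsd">
    <exchange name="magento" type="topic" connection="amqp">
        <!-- route the topic to the queue the consumer reads from -->
        <binding id="magefineLogMessageBinding" topic="magefine.log.message" destinationType="queue" destination="magefine.log.message"/>
    </exchange>
</config>
```

etc/queue_publisher.xml:

```xml
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/publisher.xsd">
    <publisher topic="magefine.log.message">
        <connection name="amqp" exchange="magento"/>
    </publisher>
</config>
```

After adding the files, run php bin/magento setup:upgrade so the topology is installed, then check that the consumer shows up in php bin/magento queue:consumers:list.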

Advanced queue configuration

Now that we've seen a basic example, let's explore some more advanced configurations.

Using different connection types

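Magento ships with two queue connection types:

  1. DB - Stores messages in MySQL tables (simplest to set up, no extra service)
  2. AMQP - Uses RabbitMQ (better throughput, needs a running broker)

Topology is not a third connection type. It is the description of exchanges, bindings, and queues in etc/queue_topology.xml, and it applies to whichever connection you choose.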

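The DB connection needs no broker credentials. You select it per topic with connection="db" in etc/queue_publisher.xml, etc/queue_consumer.xml, and etc/queue_topology.xml. On recent 2.4 releases (2.4.5 and later, as far as the documentation indicates) you can also make it the default for every queue in app/etc/env.php:

'queue' => [
    'consumers_wait_for_messages' => 1,
    'default_connection' => 'db'
],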

Bulk operations

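To cut per-message overhead, you can pack several values into one message by declaring an array request type. (This is simple batching; Magento's Bulk API in Magento_AsynchronousOperations is a separate, heavier mechanism.) First, add a topic to your communication.xml: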

<topic name="magefine.bulk.operation" request="string[]">
    <handler name="magefine.bulk.handler" type="Magefine\QueueExample\Consumer\BulkOperation" method="process"/>
</topic>

Then create your bulk consumer:

<?php
namespace Magefine\QueueExample\Consumer;

class BulkOperation
{
    protected $logger;

    public function __construct(\Psr\Log\LoggerInterface $logger)
    {
        $this->logger = $logger;
    }

    public function process(array $messages)
    {
        foreach ($messages as $message) {
            $this->logger->info('Bulk message: ' . $message);
        }
        return true;
    }
}
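
On the publishing side, a long list usually needs to be cut into batches so that no single message grows too large. Here is a minimal sketch in plain PHP; publishInBatches is a hypothetical helper, and the $publish callback stands in for a call such as PublisherInterface::publish('magefine.bulk.operation', $batch).

```php
<?php
declare(strict_types=1);

/**
 * Split $items into batches of $batchSize and hand each batch to $publish.
 * Returns the number of batches published.
 */
function publishInBatches(array $items, int $batchSize, callable $publish): int
{
    if ($batchSize < 1) {
        throw new InvalidArgumentException('Batch size must be at least 1.');
    }
    // array_chunk reindexes each batch, so every message is a plain list.
    $batches = array_chunk($items, $batchSize);
    foreach ($batches as $batch) {
        $publish($batch);
    }
    return count($batches);
}

// Usage: collect the batches instead of sending them to a real queue.
$published = [];
$batchCount = publishInBatches(
    ['sku-1', 'sku-2', 'sku-3', 'sku-4', 'sku-5'],
    2,
    function (array $batch) use (&$published): void {
        $published[] = $batch;
    }
);
```

The right batch size depends on how long one batch takes to process; smaller batches mean less work is lost when a single message fails.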

Error handling and retries

The --max-messages option does not retry anything. It caps how many messages the consumer handles before it exits, which is useful for recycling long-running PHP processes under cron or a supervisor:

php bin/magento queue:consumers:start magefine.log.message.handler --max-messages=100

To deal with failures inside the handler itself, wrap the work in a try/catch:

public function process($message)
{
    try {
        // Process the message
        $this->logger->info('Processing: ' . $message);
        
        // Simulate an error
        if ($message === 'error') {
            throw new \Exception('Simulated error');
        }
        
        return true;
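    } catch (\Exception $e) {
        $this->logger->error('Error processing message: ' . $e->getMessage());
        // The stock consumer ignores the handler's return value for asynchronous
        // topics, so returning false does not requeue anything. Rethrow to have
        // the message rejected instead of acknowledged.
        throw $e;
    }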
}
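
If a failure is likely to be transient (a timeout, a brief API outage), one option is to retry inside the handler before giving up. Here is a minimal sketch in plain PHP; runWithRetry is a hypothetical helper, not a Magento API, and the $wait callback stands in for sleep() or a backoff of your choice.

```php
<?php
declare(strict_types=1);

/**
 * Run $work up to $maxAttempts times, calling $wait($attempt) after each
 * failed attempt. Rethrows the last exception if every attempt fails.
 */
function runWithRetry(callable $work, int $maxAttempts, callable $wait)
{
    if ($maxAttempts < 1) {
        throw new InvalidArgumentException('At least one attempt is required.');
    }
    for ($attempt = 1; ; $attempt++) {
        try {
            return $work($attempt);
        } catch (Exception $e) {
            if ($attempt >= $maxAttempts) {
                throw $e; // out of attempts: let the caller see the failure
            }
            $wait($attempt);
        }
    }
}

// Usage: the work fails twice, then succeeds on the third attempt.
$waits = [];
$result = runWithRetry(
    function (int $attempt): string {
        if ($attempt < 3) {
            throw new RuntimeException("attempt {$attempt} failed");
        }
        return "ok on attempt {$attempt}";
    },
    5,
    function (int $attempt) use (&$waits): void {
        $waits[] = $attempt; // a real handler might sleep() here
    }
);
```

Keep the attempt count low: while a handler is retrying, the consumer is not picking up other messages.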

Monitoring and managing queues

To check the status of your queues, you can use these commands:

  • List all consumers: php bin/magento queue:consumers:list
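  • View queue status: Magento has no dedicated status command as far as we know; for DB queues, inspect the queue_message_status table, and for RabbitMQ, run rabbitmqctl list_queues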
  • Process a specific number of messages: php bin/magento queue:consumers:start [consumer_name] --max-messages=50

For RabbitMQ, you can use its management interface (usually at http://localhost:15672) to monitor queues.

Best practices for queue implementation

Here are some tips to get the most out of your consumer queues:

  1. Keep consumers focused: Each consumer should handle one specific type of task
  2. Monitor performance: Watch your queue processing times and adjust as needed
  3. Handle failures gracefully: Implement proper error handling and retry logic
  4. Use appropriate connection types: For high-volume queues, prefer AMQP over DB
  5. Scale consumers: Run multiple consumer processes for busy queues
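
For tips 2 and 5, you do not have to start every consumer by hand: Magento's cron can launch them for you. Here is a sketch of the relevant app/etc/env.php fragment, assuming Magento cron is installed and running, and assuming the consumer name used in this tutorial. The numbers are placeholders, not recommendations.

```php
// Fragment of the array returned by app/etc/env.php.
'cron_consumers_runner' => [
    'cron_run' => true,       // let cron start the consumers listed below
    'max_messages' => 1000,   // each consumer exits after this many messages
    'consumers' => [
        'magefine.log.message.handler', // assumed consumer name from this tutorial
    ],
    // Number of parallel processes per consumer. Documented for 2.4.4 and
    // later; verify against your version before relying on it.
    'multiple_processes' => [
        'magefine.log.message.handler' => 2,
    ],
],
```

If you prefer a process supervisor over cron, set cron_run to false and run queue:consumers:start under the supervisor instead.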

Real-world example: order export queue

Let's implement a practical example where we export orders to an external system via queue.

1. Create the communication config

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd">
    <topic name="magefine.export.order" request="int">
        <handler name="magefine.export.order.handler" type="Magefine\QueueExample\Consumer\ExportOrder" method="process"/>
    </topic>
</config>

2. Create the export consumer

<?php
namespace Magefine\QueueExample\Consumer;

use Magento\Sales\Api\OrderRepositoryInterface;

class ExportOrder
{
    protected $logger;
    protected $orderRepository;

    public function __construct(
        \Psr\Log\LoggerInterface $logger,
        OrderRepositoryInterface $orderRepository
    ) {
        $this->logger = $logger;
        $this->orderRepository = $orderRepository;
    }

    public function process($orderId)
    {
        try {
            $order = $this->orderRepository->get($orderId);
            
            // Simulate export to external system
            $this->logger->info("Exporting order #{$order->getIncrementId()}");
            
            // Here you would typically call your API or export logic
            sleep(2); // Simulate API call delay
            
            $this->logger->info("Successfully exported order #{$order->getIncrementId()}");
            return true;
            
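        } catch (\Exception $e) {
            $this->logger->error("Error exporting order #{$orderId}: " . $e->getMessage());
            // Rethrow so the queue framework sees the failure. Swallowing it here
            // would let the message be acknowledged as if the export had succeeded.
            throw $e;
        }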
    }
}
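
The "export logic" placeholder above would normally build a request body from the order before calling the external API. Here is a minimal sketch in plain PHP; buildExportPayload and every field name in it are hypothetical, since the target system's format is not specified here.

```php
<?php
declare(strict_types=1);

/**
 * Build a JSON body for a hypothetical ERP endpoint.
 * The field names are illustrative only.
 */
function buildExportPayload(string $incrementId, float $grandTotal, string $currency, array $items): string
{
    $payload = [
        'order_number' => $incrementId,
        // Send money as a fixed two-decimal string to avoid float noise.
        'total' => number_format($grandTotal, 2, '.', ''),
        'currency' => $currency,
        'lines' => array_map(
            static function (array $item): array {
                return ['sku' => (string) $item['sku'], 'qty' => (float) $item['qty']];
            },
            $items
        ),
    ];

    // JSON_THROW_ON_ERROR turns encoding problems into exceptions.
    return json_encode($payload, JSON_THROW_ON_ERROR);
}

// Usage with made-up order data.
$json = buildExportPayload('000000042', 59.9, 'EUR', [['sku' => 'MUG-01', 'qty' => 2]]);
$decoded = json_decode($json, true);
```

In the consumer, the arguments would come from the loaded order, for example $order->getIncrementId() and $order->getGrandTotal().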

3. Trigger the export from order placement

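Create an observer and register it in etc/events.xml. Note that sales_order_place_after fires inside Order::place(), before a new order is first saved, so $order->getId() can still be empty at that point. An event dispatched after the save, such as checkout_submit_all_after, is a safer trigger: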

<?php
namespace Magefine\QueueExample\Observer;

use Magento\Framework\Event\Observer;
use Magento\Framework\Event\ObserverInterface;
use Magento\Framework\MessageQueue\PublisherInterface;

class QueueOrderExport implements ObserverInterface
{
    protected $publisher;

    public function __construct(PublisherInterface $publisher)
    {
        $this->publisher = $publisher;
    }

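    public function execute(Observer $observer)
    {
        $order = $observer->getEvent()->getOrder();
        // The topic declares request="int", and getId() usually returns a string,
        // so cast before publishing to satisfy the message type check.
        $this->publisher->publish('magefine.export.order', (int) $order->getId());
    }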
}

Troubleshooting common issues

Here are solutions to some common problems you might encounter:

Consumer not processing messages:

  • Check if the consumer is running: ps aux | grep consumer
  • Verify queue configuration in app/etc/env.php
  • Check for errors in var/log/system.log and var/log/exception.log

Messages stuck in queue:

  • Restart the consumer process
  • Check for deadlocks in the database
  • Verify that your handler finishes without throwing or hanging; an uncaught exception causes the message to be rejected

Performance issues:

  • Switch from DB to AMQP connection for better performance
  • Increase the number of consumer processes
  • Optimize your consumer code

Conclusion

Implementing consumer queues in Magento 2 can significantly improve your store's performance by offloading time-consuming tasks to background processes. We've covered everything from basic setup to advanced configurations and real-world examples.

Remember to:

  • Start simple with the DB connection and move to AMQP as needed
  • Implement proper error handling in your consumers
  • Monitor your queues and scale consumers as your store grows

With these techniques, you'll keep your Magento store responsive while handling complex background operations efficiently. Happy queuing!