Magento 2: how to implement a consumer queue

If you've ever worked with Magento 2, you know that some tasks can slow down your store's performance. Whether it's sending emails, processing orders, or syncing data with external systems, these operations can create bottlenecks. That's where consumer queues come in! They help you handle background tasks efficiently, keeping your store responsive for customers.
In this post, we'll break down how to implement a consumer queue in Magento 2 step by step. No fluff, just practical code examples and clear explanations. Let's dive in!
What is a consumer queue in Magento 2?
A consumer queue lets Magento process tasks asynchronously. Instead of running a time-consuming operation during the request (which would slow the response), a publisher places a message on a queue, and a separate consumer process picks it up and handles it in the background. The customer-facing request returns without waiting for the work to finish.
Common use cases for consumer queues include:
- Sending transactional emails
- Updating product stock
- Syncing data with ERP/CRM systems
- Generating reports
Setting up a basic consumer queue
Let's create a simple example where we'll queue a task to log a message. Here's how to do it:
1. Create a communication configuration
First, create etc/communication.xml in your module:
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd">
    <topic name="magefine.log.message" request="string">
        <handler name="magefine.log.message.handler" type="Magefine\QueueExample\Consumer\LogMessage" method="process"/>
    </topic>
</config>
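communication.xml only declares the topic and its payload type. To route that topic to an actual queue, a module normally also ships etc/queue_topology.xml. The following is a minimal sketch, assuming the AMQP connection and Magento's default `magento` exchange; the queue name `magefine.log.message` and the binding id are illustrative choices, not part of the original example:

```xml
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/topology.xsd">
    <!-- Assumption: AMQP connection with the default "magento" topic exchange -->
    <exchange name="magento" type="topic" connection="amqp">
        <!-- Binding id and destination queue name are hypothetical; pick your own -->
        <binding id="magefineLogMessageBinding"
                 topic="magefine.log.message"
                 destinationType="queue"
                 destination="magefine.log.message"/>
    </exchange>
</config>
```

Reusing the topic name as the queue name keeps the mapping easy to follow, but the two are independent: several topics can be bound to one queue.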
2. Create the consumer class
Now, create the consumer class at Consumer/LogMessage.php:
<?php
namespace Magefine\QueueExample\Consumer;

class LogMessage
{
    protected $logger;

    public function __construct(\Psr\Log\LoggerInterface $logger)
    {
        $this->logger = $logger;
    }

    public function process($message)
    {
        $this->logger->info('Queue message received: ' . $message);
        return true;
    }
}
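The class above is not yet a consumer that `bin/magento` can start: consumers are declared in etc/queue_consumer.xml, and the name given there is what `queue:consumers:start` expects. A minimal sketch, assuming the AMQP connection and a queue named `magefine.log.message` (the queue name is an assumption; the consumer name simply reuses the handler name from communication.xml so that the start command used in this post matches):

```xml
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/consumer.xsd">
    <!-- name: what you pass to queue:consumers:start -->
    <!-- queue: assumed queue name; it must match the queue your topic is bound to -->
    <!-- handler: Class::method that receives each message -->
    <consumer name="magefine.log.message.handler"
              queue="magefine.log.message"
              connection="amqp"
              handler="Magefine\QueueExample\Consumer\LogMessage::process"/>
</config>
```

After adding or changing this file, run `php bin/magento setup:upgrade` so the new consumer and queue are registered, then confirm the name appears in `php bin/magento queue:consumers:list`.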
3. Publish a message to the queue
To test our queue, let's publish a message from a controller:
<?php
namespace Magefine\QueueExample\Controller\Test;

use Magento\Framework\App\Action\Action;
use Magento\Framework\App\Action\Context;
use Magento\Framework\MessageQueue\PublisherInterface;

class Index extends Action
{
    protected $publisher;

    public function __construct(
        Context $context,
        PublisherInterface $publisher
    ) {
        parent::__construct($context);
        $this->publisher = $publisher;
    }

    public function execute()
    {
        $message = 'Hello from Magefine!';
        // Hand the message to the queue; the consumer logs it later
        $this->publisher->publish('magefine.log.message', $message);
        $this->messageManager->addSuccessMessage('Message added to queue!');
        // Redirect to the store's base URL. A '*/*/' path would resolve back
        // to this same action (test/index) and publish again on every redirect.
        return $this->resultRedirectFactory->create()->setUrl($this->_url->getBaseUrl());
    }
}
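`PublisherInterface::publish()` looks up where to send a topic from the module's publisher configuration. If the defaults suit you, this file can usually be left out, but declaring it makes the connection explicit. A sketch of etc/queue_publisher.xml, assuming the AMQP connection and the default `magento` exchange (both are assumptions about your setup):

```xml
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/publisher.xsd">
    <publisher topic="magefine.log.message">
        <!-- Assumption: messages for this topic go to RabbitMQ via the "magento" exchange -->
        <connection name="amqp" exchange="magento"/>
    </publisher>
</config>
```

The connection named here has to agree with the one used by the consumer and the exchange for this topic; otherwise messages are published to one backend while the consumer listens on another.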
4. Configure the queue in deployment
Add this to your app/etc/env.php:
'queue' => [
    'consumers_wait_for_messages' => 1,
    'amqp' => [
        'host' => 'localhost',
        'port' => '5672',
        'user' => 'guest',
        'password' => 'guest',
        'virtualhost' => '/'
    ]
],
5. Start the consumer
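Run this command to start processing messages. The argument is a consumer name, which Magento reads from etc/queue_consumer.xml rather than from the handler name in communication.xml, so your module must declare a consumer with this name: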
php bin/magento queue:consumers:start magefine.log.message.handler
Advanced queue configuration
Now that we've seen a basic example, let's explore some more advanced configurations.
Using different connection types
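Magento ships with two long-standing types of queue connections:
- DB - Uses the database (simplest to set up)
- AMQP - Uses RabbitMQ (better performance)
Topology is not a connection type. It is the configuration in etc/queue_topology.xml that binds topics to queues, and it applies to either connection.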
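The DB connection needs no broker credentials. You select it per queue with connection="db" in queue_consumer.xml, queue_publisher.xml, and queue_topology.xml. Recent Magento 2.4 releases also read a default_connection value from app/etc/env.php for queues that do not name a connection; check the documentation for your version before relying on it:
'queue' => [
    'consumers_wait_for_messages' => 1,
    'default_connection' => 'db'
],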
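For a single queue, the switch to the database happens in the module's XML. As a sketch, this is how the log-message consumer could be declared for the DB connection in etc/queue_consumer.xml; the queue name `magefine.log.message` is an assumption, and the consumer name reuses the one from the start command earlier in this post:

```xml
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/consumer.xsd">
    <!-- connection="db" stores messages in MySQL instead of RabbitMQ -->
    <consumer name="magefine.log.message.handler"
              queue="magefine.log.message"
              connection="db"
              handler="Magefine\QueueExample\Consumer\LogMessage::process"/>
</config>
```

The publisher and exchange declarations for the same topic need connection="db" as well. Core modules that use the database typically name their exchange `magento-db`, which is a reasonable convention to follow.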
Bulk operations
To handle several items in a single message, declare a topic whose payload is an array (note the [] suffix on the request type). Add it to your communication.xml:
<topic name="magefine.bulk.operation" request="string[]">
    <handler name="magefine.bulk.handler" type="Magefine\QueueExample\Consumer\BulkOperation" method="process"/>
</topic>
Then create your bulk consumer:
<?php
namespace Magefine\QueueExample\Consumer;

class BulkOperation
{
    protected $logger;

    public function __construct(\Psr\Log\LoggerInterface $logger)
    {
        $this->logger = $logger;
    }

    public function process(array $messages)
    {
        foreach ($messages as $message) {
            $this->logger->info('Bulk message: ' . $message);
        }
        return true;
    }
}
Error handling and retries
The --max-messages option does not add retry logic. It makes the consumer exit after handling that many messages, which is useful when cron or a process manager restarts consumers regularly:
php bin/magento queue:consumers:start magefine.log.message.handler --max-messages=100
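For more control, catch the exception in your consumer, log it, and rethrow it. In current Magento 2.4 releases the stock consumer reacts to an exception from the handler by rejecting the message; it does not use the handler's return value to requeue anything, and a rejected message is not retried automatically. Any retry policy (for example, republishing with an attempt counter) is yours to add:
public function process($message)
{
    try {
        // Process the message
        $this->logger->info('Processing: ' . $message);
        // Simulate an error
        if ($message === 'error') {
            throw new \Exception('Simulated error');
        }
        return true;
    } catch (\Exception $e) {
        $this->logger->error('Error processing message: ' . $e->getMessage());
        // Rethrow so the framework rejects the message instead of
        // acknowledging it; returning false would not requeue it
        throw $e;
    }
}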
Monitoring and managing queues
To check the status of your queues, you can use these commands:
- List all consumers:
php bin/magento queue:consumers:list
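- View queue status: core Magento does not appear to provide a queue:consumers:status command, so check at the storage layer. With the DB connection, query the queue_message_status table; with RabbitMQ, run:
rabbitmqctl list_queues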
- Process a specific number of messages:
php bin/magento queue:consumers:start [consumer_name] --max-messages=50
For RabbitMQ, you can use its management interface (usually at http://localhost:15672) to monitor queues.
Best practices for queue implementation
Here are some tips to get the most out of your consumer queues:
- Keep consumers focused: Each consumer should handle one specific type of task
- Monitor performance: Watch your queue processing times and adjust as needed
- Handle failures gracefully: Implement proper error handling and retry logic
- Use appropriate connection types: For high-volume queues, prefer AMQP over DB
- Scale consumers: Run multiple consumer processes for busy queues
Real-world example: order export queue
Let's implement a practical example where we export orders to an external system via queue.
1. Create the communication config
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd">
    <topic name="magefine.export.order" request="int">
        <handler name="magefine.export.order.handler" type="Magefine\QueueExample\Consumer\ExportOrder" method="process"/>
    </topic>
</config>
2. Create the export consumer
<?php
namespace Magefine\QueueExample\Consumer;

use Magento\Sales\Api\OrderRepositoryInterface;

class ExportOrder
{
    protected $logger;
    protected $orderRepository;

    public function __construct(
        \Psr\Log\LoggerInterface $logger,
        OrderRepositoryInterface $orderRepository
    ) {
        $this->logger = $logger;
        $this->orderRepository = $orderRepository;
    }

    public function process($orderId)
    {
        try {
            $order = $this->orderRepository->get($orderId);
            // Simulate export to external system
            $this->logger->info("Exporting order #{$order->getIncrementId()}");
            // Here you would typically call your API or export logic
            sleep(2); // Simulate API call delay
            $this->logger->info("Successfully exported order #{$order->getIncrementId()}");
            return true;
        } catch (\Exception $e) {
            $this->logger->error("Error exporting order #{$orderId}: " . $e->getMessage());
            // Rethrow so the framework marks the message as failed;
            // returning false would not trigger a retry
            throw $e;
        }
    }
}
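As with any consumer, the export handler has to be declared before it can be started from the command line. A sketch of the matching entry in etc/queue_consumer.xml, assuming the AMQP connection; the consumer name, the queue name `magefine.export.order`, and the maxMessages value are illustrative assumptions:

```xml
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/consumer.xsd">
    <!-- maxMessages: the consumer exits after this many messages, so a -->
    <!-- supervisor or cron can restart it with fresh memory (value is arbitrary) -->
    <consumer name="magefine.export.order.handler"
              queue="magefine.export.order"
              connection="amqp"
              maxMessages="100"
              handler="Magefine\QueueExample\Consumer\ExportOrder::process"/>
</config>
```

With this in place, `php bin/magento queue:consumers:start magefine.export.order.handler` would start the export worker, provided the topic is also bound to the `magefine.export.order` queue in the module's topology.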
3. Trigger the export from order placement
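Create an observer for an order event. The sales_order_place_after event fires inside Order::place(), before a new order is first saved, so $order->getId() is usually still empty at that point. The checkout_submit_all_after event, which fires once the order has been saved, is a safer trigger:
<?php
namespace Magefine\QueueExample\Observer;

use Magento\Framework\Event\Observer;
use Magento\Framework\Event\ObserverInterface;
use Magento\Framework\MessageQueue\PublisherInterface;

class QueueOrderExport implements ObserverInterface
{
    protected $publisher;

    public function __construct(PublisherInterface $publisher)
    {
        $this->publisher = $publisher;
    }

    public function execute(Observer $observer)
    {
        $order = $observer->getEvent()->getOrder();
        // Skip events that carry no saved order (no entity ID yet)
        if (!$order || !$order->getId()) {
            return;
        }
        // The topic is declared with request="int", so cast the ID
        $this->publisher->publish('magefine.export.order', (int) $order->getId());
    }
}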
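The observer only runs once it is registered in etc/events.xml. The sketch below uses checkout_submit_all_after, an assumption chosen because the order has already been saved when it fires; sales_order_place_after is dispatched before the first save, so the order ID would typically be empty there. The observer name is an arbitrary identifier:

```xml
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:Event/etc/events.xsd">
    <!-- Assumption: this event carries the saved order in its "order" data key -->
    <event name="checkout_submit_all_after">
        <!-- Observer name is hypothetical; it only needs to be unique -->
        <observer name="magefine_queue_order_export"
                  instance="Magefine\QueueExample\Observer\QueueOrderExport"/>
    </event>
</config>
```

Placing the file in etc/ registers the observer for every area; moving it to etc/frontend/ would limit it to storefront checkouts.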
Troubleshooting common issues
Here are solutions to some common problems you might encounter:
Consumer not processing messages:
- Check if the consumer is running:
ps aux | grep consumer
- Verify queue configuration in app/etc/env.php
- Check for errors in var/log/system.log and var/log/exception.log
Messages stuck in queue:
- Restart the consumer process
- Check for deadlocks in the database
- Verify that your handler completes without throwing an exception, since an exception causes the message to be rejected
Performance issues:
- Switch from DB to AMQP connection for better performance
- Increase the number of consumer processes
- Optimize your consumer code
Conclusion
Implementing consumer queues in Magento 2 can significantly improve your store's performance by offloading time-consuming tasks to background processes. We've covered everything from basic setup to advanced configurations and real-world examples.
Remember to:
- Start simple with the DB connection and move to AMQP as needed
- Implement proper error handling in your consumers
- Monitor your queues and scale consumers as your store grows
With these techniques, you'll keep your Magento store responsive while handling complex background operations efficiently. Happy queuing!