Magento 2: how to implement a consumer queue

If you've ever worked with Magento 2, you know that some tasks can slow down your store. Whether it's sending e-mails, processing orders, or syncing data with external systems, these operations can create bottlenecks. That's where consumer queues come in: they let you handle such tasks in the background, keeping your store responsive for customers.

In this article, we'll break down how to implement a consumer queue in Magento 2 step by step. No fluff, just practical code examples and clear explanations. Let's dive in!

What is a consumer queue in Magento 2?

A consumer queue is a way to process tasks asynchronously. Instead of executing time-consuming operations immediately (which could slow down your store), Magento adds them to a queue and processes them in the background using consumers. This improves performance and keeps the shopping experience smooth.

Common use cases for consumer queues include:

  • Sending transactional e-mails
  • Updating product stock
  • Syncing data with ERP/CRM systems
  • Generating reports
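
The idea of deferring work to a queue can be sketched without any framework. The snippet below is only an illustration of the publish-now, process-later pattern; the InMemoryQueue class and its methods are assumptions made for this example and are not part of Magento's API.

```php
<?php
declare(strict_types=1);

// Framework-free illustration: NOT Magento's API, just the idea behind it.
final class InMemoryQueue
{
    private SplQueue $messages;

    public function __construct()
    {
        $this->messages = new SplQueue();
    }

    // Publishing only stores the message, so the caller returns immediately.
    public function publish(string $message): void
    {
        $this->messages->enqueue($message);
    }

    // A consumer later drains the queue and runs the slow work.
    public function consume(callable $handler): int
    {
        $processed = 0;
        while (!$this->messages->isEmpty()) {
            $handler($this->messages->dequeue());
            $processed++;
        }
        return $processed;
    }
}

$queue = new InMemoryQueue();
$queue->publish('send order e-mail');
$queue->publish('sync stock');

// Collect what the consumer handled, in the order it was published.
$handled = [];
$count = $queue->consume(function (string $message) use (&$handled): void {
    $handled[] = $message;
});
```

In a real store the queue lives in the database or in RabbitMQ and the consumer runs in a separate process, but the division of labor is the same.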

Setting up a basic consumer queue

Let's create a simple example where we queue a task that logs a message. Here's how to do it:

1. Create a communication configuration

First, create etc/communication.xml in your module:

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd">
    <topic name="magefine.log.message" request="string">
        <handler name="magefine.log.message.handler" type="Magefine\QueueExample\Consumer\LogMessage" method="process"/>
    </topic>
</config>

2. Create the consumer class

Now, create the consumer class at Consumer/LogMessage.php:

<?php
namespace Magefine\QueueExample\Consumer;

class LogMessage
{
    protected $logger;

    public function __construct(\Psr\Log\LoggerInterface $logger)
    {
        $this->logger = $logger;
    }

    public function process($message)
    {
        $this->logger->info('Queue message received: ' . $message);
        return true;
    }
}

3. Publish a message to the queue

To test our queue, let's publish a message from a controller:

<?php
namespace Magefine\QueueExample\Controller\Test;

use Magento\Framework\App\Action\Action;
use Magento\Framework\App\Action\Context;
use Magento\Framework\MessageQueue\PublisherInterface;

class Index extends Action
{
    protected $publisher;

    public function __construct(
        Context $context,
        PublisherInterface $publisher
    ) {
        parent::__construct($context);
        $this->publisher = $publisher;
    }

    public function execute()
    {
        $message = 'Hello from Magefine!';
        $this->publisher->publish('magefine.log.message', $message);
        $this->messageManager->addSuccessMessage('Message added to queue!');
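        // Redirect back to the referring page; redirecting to '*/*/' would
        // resolve to this same index action and loop.
        return $this->resultRedirectFactory->create()
            ->setUrl($this->_redirect->getRefererUrl());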
    }
}

4. Configure the queue in the deployment configuration

Add this to your app/etc/env.php:

'queue' => [
    'consumers_wait_for_messages' => 1,
    'amqp' => [
        'host' => 'localhost',
        'port' => '5672',
        'user' => 'guest',
        'password' => 'guest',
        'virtualhost' => '/'
    ]
],

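5. Start the consumer

Run this command to start processing messages. The argument is a consumer name, and Magento only recognizes consumers declared in a module's etc/queue_consumer.xml, so this step assumes one is declared there under the name used below: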

php bin/magento queue:consumers:start magefine.log.message.handler

Advanced queue configuration

Now that we've seen a basic example, let's explore some more advanced configurations.

Using different connection types

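Magento's queue configuration distinguishes the following:

  1. DB - A connection that uses the database (simplest to set up)
  2. AMQP - A connection that uses RabbitMQ (better performance)
  3. Topology - Not a connection type but the exchange and binding configuration (etc/queue_topology.xml), used for complex routing scenarios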

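To configure a DB connection, add this to app/etc/env.php. Keep in mind that the connection a given consumer actually uses is selected by the connection attribute (for example connection="db") of its declaration in etc/queue_consumer.xml: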

'queue' => [
    'consumers_wait_for_messages' => 1,
    'db' => [
        'connection' => 'default'
    ]
],

Bulk operations

To process multiple messages efficiently, use bulk operations. First, update your communication.xml:

<topic name="magefine.bulk.operation" request="string[]">
    <handler name="magefine.bulk.handler" type="Magefine\QueueExample\Consumer\BulkOperation" method="process"/>
</topic>

Then create your bulk consumer:

<?php
namespace Magefine\QueueExample\Consumer;

class BulkOperation
{
    protected $logger;

    public function __construct(\Psr\Log\LoggerInterface $logger)
    {
        $this->logger = $logger;
    }

    public function process(array $messages)
    {
        foreach ($messages as $message) {
            $this->logger->info('Bulk message: ' . $message);
        }
        return true;
    }
}
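
When the list of items is long, it usually helps to publish it in bounded batches rather than as one huge array. The following is a minimal sketch of that idea; the toBatches helper and the batch size are assumptions for illustration, and the publish call appears only in a comment because it needs a running Magento instance.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: split a long list of payloads into fixed-size batches,
 * so each published message carries a bounded array.
 *
 * @param string[] $payloads
 * @return string[][]
 */
function toBatches(array $payloads, int $batchSize): array
{
    if ($batchSize < 1) {
        throw new \InvalidArgumentException('Batch size must be at least 1.');
    }
    // array_chunk reindexes each batch from 0 by default.
    return array_chunk($payloads, $batchSize);
}

$batches = toBatches(['sku-1', 'sku-2', 'sku-3', 'sku-4', 'sku-5'], 2);

// In a Magento module, each batch would then be passed to
// $publisher->publish('magefine.bulk.operation', $batch).
```

Smaller batches mean a failure affects fewer items, at the cost of more messages on the queue.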

Error handling and retries

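The --max-messages option is sometimes mistaken for retry logic. It does not retry failed messages; it limits how many messages a consumer processes before it exits, which lets cron or a process manager restart the consumer at regular intervals: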

php bin/magento queue:consumers:start magefine.log.message.handler --max-messages=100
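
The effect of --max-messages can be pictured with a small framework-free sketch: process at most N messages, then stop and leave the rest for the next run. The consumeUpTo function and the in-memory array are assumptions made for illustration; this is not Magento's consumer code.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper (not part of Magento): process at most $maxMessages
 * items from $pending, then stop, leaving the rest for the next run.
 *
 * @param string[] $pending messages waiting in the queue (modified in place)
 * @return int number of messages processed in this run
 */
function consumeUpTo(array &$pending, callable $handler, int $maxMessages): int
{
    $processed = 0;
    while ($processed < $maxMessages && $pending !== []) {
        $handler(array_shift($pending));
        $processed++;
    }
    return $processed;
}

$pending = ['a', 'b', 'c', 'd', 'e'];
$seen = [];
$collect = function (string $message) use (&$seen): void {
    $seen[] = $message;
};

// Two runs with a limit of 3: the first takes three messages, the second the rest.
$firstRun = consumeUpTo($pending, $collect, 3);
$secondRun = consumeUpTo($pending, $collect, 3);
```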

For more sophisticated error handling, modify your consumer:

public function process($message)
{
    try {
        // Process the message
        $this->logger->info('Processing: ' . $message);
        
        // Simulate an error
        if ($message === 'error') {
            throw new \Exception('Simulated error');
        }
        
        return true;
    } catch (\Exception $e) {
        $this->logger->error('Error processing message: ' . $e->getMessage());
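        // Caution: for asynchronous topics the framework's default consumer
        // does not use this return value; it rejects a message only when an
        // exception escapes the handler, so rethrow if you need that.
        return false;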
    }
}
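
If you want failed messages to be retried a bounded number of times, that logic has to be written explicitly. Here is a minimal, framework-free sketch of the pattern; processWithRetries, the attempt budget, and the "failed" list are hypothetical names chosen for this example and do not correspond to Magento classes.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical retry loop (illustrative only, not Magento's consumer code).
 * Each message is tried up to $maxAttempts times; after that it is moved
 * to a "failed" list instead of being retried forever.
 *
 * @param string[] $messages
 * @return array{done: string[], failed: string[]}
 */
function processWithRetries(array $messages, callable $handler, int $maxAttempts): array
{
    $done = [];
    $failed = [];
    foreach ($messages as $message) {
        for ($attempt = 1; $attempt <= $maxAttempts; $attempt++) {
            try {
                $handler($message, $attempt);
                $done[] = $message;
                continue 2; // success: move on to the next message
            } catch (\Throwable $e) {
                // Swallow the error and retry until the budget is used up.
            }
        }
        $failed[] = $message;
    }
    return ['done' => $done, 'failed' => $failed];
}

// Example handler: 'flaky' succeeds on its second attempt, 'error' never does.
$handler = function (string $message, int $attempt): void {
    if ($message === 'error' || ($message === 'flaky' && $attempt < 2)) {
        throw new \RuntimeException("Cannot process {$message}");
    }
};

$result = processWithRetries(['ok', 'flaky', 'error'], $handler, 3);
```

Capping the number of attempts matters: a message that can never succeed would otherwise block or flood the queue.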

Monitoring and managing queues

To check the status of your queues, you can use these commands:

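  • List all consumers: php bin/magento queue:consumers:list
  • View queue status: Magento core has no dedicated status command; for the DB connection, inspect the queue_message_status table, and for RabbitMQ, run rabbitmqctl list_queues name messages consumers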
  • Process a specific number of messages: php bin/magento queue:consumers:start [consumer_name] --max-messages=50

For RabbitMQ, you can use its management interface (usually at http://localhost:15672) to monitor queues.

Best practices for queue implementation

Here are some tips to get the most out of your consumer queues:

  1. Keep consumers focused: Each consumer should handle one specific type of task
  2. Monitor performance: Watch your queue processing times and adjust as needed
  3. Handle failures gracefully: Implement proper error handling and retry logic
  4. Use appropriate connection types: For high-volume queues, prefer AMQP over DB
  5. Scale consumers: Run multiple consumer processes for busy queues
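
For the "monitor performance" tip, one simple starting point is to time each handler call and log the duration. The sketch below assumes a hypothetical timed wrapper and uses a short sleep as a stand-in for real work; in a consumer you would log the measured value with your logger.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical wrapper: runs $handler and reports how long it took.
 *
 * @return array{result: mixed, seconds: float}
 */
function timed(callable $handler, ...$args): array
{
    $start = hrtime(true);                      // monotonic clock, nanoseconds
    $result = $handler(...$args);
    $seconds = (hrtime(true) - $start) / 1e9;   // convert to seconds
    return ['result' => $result, 'seconds' => $seconds];
}

$measurement = timed(function (int $orderId): string {
    usleep(10000); // stand-in for a slow export call (about 10 ms)
    return "exported {$orderId}";
}, 42);
```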

Real-world example: order export queue

Let's implement a practical example where we export orders to an external system via a queue.

1. Create the communication config

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd">
    <topic name="magefine.export.order" request="int">
        <handler name="magefine.export.order.handler" type="Magefine\QueueExample\Consumer\ExportOrder" method="process"/>
    </topic>
</config>

2. Create the export consumer

<?php
namespace Magefine\QueueExample\Consumer;

use Magento\Sales\Api\OrderRepositoryInterface;

class ExportOrder
{
    protected $logger;
    protected $orderRepository;

    public function __construct(
        \Psr\Log\LoggerInterface $logger,
        OrderRepositoryInterface $orderRepository
    ) {
        $this->logger = $logger;
        $this->orderRepository = $orderRepository;
    }

    public function process($orderId)
    {
        try {
            $order = $this->orderRepository->get($orderId);
            
            // Simulate export to external system
            $this->logger->info("Exporting order #{$order->getIncrementId()}");
            
            // Here you would typically call your API or export logic
            sleep(2); // Simulate API call delay
            
            $this->logger->info("Successfully exported order #{$order->getIncrementId()}");
            return true;
            
        } catch (\Exception $e) {
            $this->logger->error("Error exporting order #{$orderId}: " . $e->getMessage());
            return false;
        }
    }
}

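3. Trigger the export from order placement

Create an observer for the sales_order_place_after event and register it in your module's etc/events.xml. Because this event is dispatched before the order is saved, the order may not have an entity ID yet; if getId() comes back empty in your setup, an event fired after the save, such as sales_order_save_commit_after, is a safer trigger: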

<?php
namespace Magefine\QueueExample\Observer;

use Magento\Framework\Event\Observer;
use Magento\Framework\Event\ObserverInterface;
use Magento\Framework\MessageQueue\PublisherInterface;

class QueueOrderExport implements ObserverInterface
{
    protected $publisher;

    public function __construct(PublisherInterface $publisher)
    {
        $this->publisher = $publisher;
    }

    public function execute(Observer $observer)
    {
        $order = $observer->getEvent()->getOrder();
        $this->publisher->publish('magefine.export.order', $order->getId());
    }
}

Troubleshooting common problems

Here are solutions to some common problems you might encounter:

Consumer not processing messages:

  • Check if the consumer is running: ps aux | grep consumer
  • Verify queue configuration in app/etc/env.php
  • Check for errors in var/log/system.log and var/log/exception.log

Messages stuck in queue:

  • Restart the consumer process
  • Check for deadlocks in the database
  • Verify that your consumer returns true on success and does not let an unexpected exception escape

Performance problems:

  • Switch from DB to AMQP connection for better performance
  • Increase the number of consumer processes
  • Optimize your consumer code

Conclusion

Implementing consumer queues in Magento 2 can significantly improve your store's performance by offloading time-consuming tasks to background processes. We've covered basic setup, more advanced configuration, and a real-world example.

Remember to:

  • Start simple with the DB connection and move to AMQP as needed
  • Implement proper error handling in your consumers
  • Monitor your queues and scale consumers as your store grows

With these techniques, you'll keep your Magento store responsive while handling complex background operations efficiently. Happy queuing!