RabbitMQ Work Queue with Symfony

Add to .env

RABBITMQ_HOST=localhost
RABBITMQ_PORT=5672
RABBITMQ_USERNAME=test
RABBITMQ_PASSWORD=test
RABBITMQ_QUEUE_NAME=task_queue

Add to services.yaml (under the `services:` key)

    App\Command\WorkerCommand:
        arguments: ['%env(RABBITMQ_HOST)%', '%env(RABBITMQ_PORT)%', '%env(RABBITMQ_USERNAME)%', '%env(RABBITMQ_PASSWORD)%', '%env(RABBITMQ_QUEUE_NAME)%']

    App\Command\PublisherCommand:
        arguments: ['%env(RABBITMQ_HOST)%', '%env(RABBITMQ_PORT)%', '%env(RABBITMQ_USERNAME)%', '%env(RABBITMQ_PASSWORD)%', '%env(RABBITMQ_QUEUE_NAME)%']

PublisherCommand.php

<?php

namespace App\Command;

use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;

/**
 * Console command that endlessly publishes persistent "sleep" jobs to a
 * durable RabbitMQ queue, one job per second, until the process is stopped.
 */
class PublisherCommand extends Command
{
    protected static $defaultName = 'publisher';

    /** @var mixed Broker host, exactly as passed to the constructor. */
    private $host;

    /** @var mixed Broker port, exactly as passed to the constructor. */
    private $port;

    /** @var mixed Broker user name, exactly as passed to the constructor. */
    private $username;

    /** @var mixed Broker password, exactly as passed to the constructor. */
    private $password;

    /** @var \PhpAmqpLib\Connection\AMQPStreamConnection|null Null until the first call to getChannel(). */
    private $connection;

    /** @var \PhpAmqpLib\Channel\AMQPChannel|null Null until the first call to getChannel(). */
    private $channel;

    /** @var mixed Name of the queue the jobs are published to. */
    private $queueName;

    /**
     * PublisherCommand constructor.
     *
     * Only stores the settings; the broker connection is opened lazily by
     * getChannel(), so that merely constructing the command (which the console
     * may do just to list the available commands) does not require a reachable
     * broker.
     *
     * @param $host
     * @param $port
     * @param $username
     * @param $password
     * @param $queueName
     */
    public function __construct($host, $port, $username, $password, $queueName)
    {
        parent::__construct();

        $this->host = $host;
        $this->port = $port;
        $this->username = $username;
        $this->password = $password;
        $this->queueName = $queueName;
    }

    protected function configure()
    {
        $this
            ->setDescription('Command publisher for testing purposes')
            ->addArgument('arg1', InputArgument::OPTIONAL, 'Argument description')
            ->addOption('option1', null, InputOption::VALUE_NONE, 'Option description');
    }

    /**
     * Declares the queue and then publishes one job per second, forever.
     *
     * The publishing loop has no exit condition, so this method only returns
     * when an \ErrorException is caught; any other exception propagates to the
     * caller.
     *
     * @param InputInterface  $input
     * @param OutputInterface $output
     *
     * @return int Exit code: 1 when an \ErrorException was caught.
     */
    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $io = new SymfonyStyle($input, $output);

        $io->comment('START publisher');

        try {
            $channel = $this->getChannel();

            $io->note('Creating messages. To stop press CTRL+C');

            // Create the queue if it does not already exist.
            $channel->queue_declare(
                $this->queueName,
                false, // passive
                true,  // durable
                false, // exclusive
                false, // auto_delete
                false  // nowait
            );

            $jobId = 0;
            while (true) {
                $job = [
                    'id' => $jobId,
                    'task' => 'sleep',
                    'sleep_period' => rand(0, 3),
                ];

                $msg = new \PhpAmqpLib\Message\AMQPMessage(
                    json_encode($job, JSON_UNESCAPED_SLASHES),
                    ['delivery_mode' => 2] // make message persistent
                );

                $channel->basic_publish($msg, '', $this->queueName);

                // Report the id that was actually put into the message, then advance.
                $io->comment('Job created ' . $jobId);
                $jobId++;

                sleep(1);
            }
        } catch (\ErrorException $error) {
            $io->error($error->getMessage());

            return 1;
        }
    }

    /**
     * Closes the channel and the connection, if they were ever opened.
     *
     * @throws \Exception
     */
    public function __destruct()
    {
        if ($this->channel !== null) {
            $this->channel->close();
        }

        if ($this->connection !== null) {
            $this->connection->close();
        }
    }

    /**
     * Returns the AMQP channel, opening the connection on first use.
     *
     * @return \PhpAmqpLib\Channel\AMQPChannel
     */
    private function getChannel()
    {
        if ($this->channel === null) {
            $this->connection = new \PhpAmqpLib\Connection\AMQPStreamConnection(
                $this->host,
                $this->port,
                $this->username,
                $this->password
            );
            $this->channel = $this->connection->channel();
        }

        return $this->channel;
    }
}

WorkerCommand.php

<?php

namespace App\Command;

use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;

/**
 * Console command that consumes "sleep" jobs from a durable RabbitMQ queue,
 * handling one unacknowledged message at a time.
 */
class WorkerCommand extends Command
{
    protected static $defaultName = 'worker';

    /** @var mixed Broker host, exactly as passed to the constructor. */
    private $host;

    /** @var mixed Broker port, exactly as passed to the constructor. */
    private $port;

    /** @var mixed Broker user name, exactly as passed to the constructor. */
    private $username;

    /** @var mixed Broker password, exactly as passed to the constructor. */
    private $password;

    /** @var \PhpAmqpLib\Connection\AMQPStreamConnection|null Null until the first call to getChannel(). */
    private $connection;

    /** @var \PhpAmqpLib\Channel\AMQPChannel|null Null until the first call to getChannel(). */
    private $channel;

    /** @var mixed Name of the queue the jobs are consumed from. */
    private $queueName;

    /**
     * WorkerCommand constructor.
     *
     * Only stores the settings; the broker connection is opened lazily by
     * getChannel(), so that merely constructing the command (which the console
     * may do just to list the available commands) does not require a reachable
     * broker.
     *
     * @param $host
     * @param $port
     * @param $username
     * @param $password
     * @param $queueName
     */
    public function __construct($host, $port, $username, $password, $queueName)
    {
        parent::__construct();

        $this->host = $host;
        $this->port = $port;
        $this->username = $username;
        $this->password = $password;
        $this->queueName = $queueName;
    }

    protected function configure()
    {
        $this
            ->setDescription('Command worker for testing purposes')
            ->addArgument('arg1', InputArgument::OPTIONAL, 'Argument description')
            ->addOption('option1', null, InputOption::VALUE_NONE, 'Option description');
    }

    /**
     * Declares the queue, registers a consumer and processes messages for as
     * long as the channel has registered callbacks.
     *
     * @param InputInterface $input
     * @param OutputInterface $output
     * @return int Exit code: 0 once the consume loop ends, 1 when an \ErrorException was caught.
     * @throws \Exception
     */
    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $io = new SymfonyStyle($input, $output);

        $io->comment('START worker');

        try {
            $channel = $this->getChannel();

            $io->note('Waiting for messages. To exit press CTRL+C');

            // Create the queue if it does not already exist.
            $channel->queue_declare(
                $this->queueName,
                false, // passive
                true,  // durable
                false, // exclusive
                false, // auto_delete
                false  // nowait
            );

            $callback = function ($msg) use ($output) {
                // OUTPUT_RAW: the body is arbitrary data and must not be parsed for console formatting tags.
                $output->writeln(' [x] Received ' . $msg->body, OutputInterface::OUTPUT_RAW);

                $job = json_decode($msg->body, true);

                // A payload that is not a JSON object, or lacks 'sleep_period', is treated as a
                // zero-second job; negative values are clamped to zero before calling sleep().
                $sleepPeriod = 0;
                if (is_array($job) && isset($job['sleep_period'])) {
                    $sleepPeriod = max(0, (int) $job['sleep_period']);
                }
                sleep($sleepPeriod);

                $output->writeln(' [x] Done');

                // Acknowledge only after the work is finished.
                $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
            };

            // Prefetch count of 1: at most one unacknowledged message at a time.
            $channel->basic_qos(null, 1, null);

            $channel->basic_consume(
                $this->queueName,
                '',    // consumer_tag
                false, // no_local
                false, // no_ack: messages are acknowledged explicitly in the callback
                false, // exclusive
                false, // nowait
                $callback
            );

            while (count($channel->callbacks)) {
                $channel->wait();
            }

            $io->success('DONE!!!');

            return 0;
        } catch (\ErrorException $error) {
            $io->error($error->getMessage());

            return 1;
        }
    }

    /**
     * Closes the channel and the connection, if they were ever opened.
     *
     * @throws \Exception
     */
    public function __destruct()
    {
        if ($this->channel !== null) {
            $this->channel->close();
        }

        if ($this->connection !== null) {
            $this->connection->close();
        }
    }

    /**
     * Returns the AMQP channel, opening the connection on first use.
     *
     * @return \PhpAmqpLib\Channel\AMQPChannel
     */
    private function getChannel()
    {
        if ($this->channel === null) {
            $this->connection = new \PhpAmqpLib\Connection\AMQPStreamConnection(
                $this->host,
                $this->port,
                $this->username,
                $this->password
            );
            $this->channel = $this->connection->channel();
        }

        return $this->channel;
    }
}

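Now you can fill the queue by running the publisher command (`php bin/console publisher`), process the jobs by running one or more workers (`php bin/console worker`), and check the current status of the queue in the RabbitMQ management UI at http://localhost:15672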

Leave a Reply

Your email address will not be published. Required fields are marked *