Add to .env
RABBITMQ_HOST=localhost
RABBITMQ_PORT=5672
RABBITMQ_USERNAME=test
RABBITMQ_PASSWORD=test
RABBITMQ_QUEUE_NAME=task_queue
Add to services.yaml
App\Command\WorkerCommand:
    arguments: ['%env(RABBITMQ_HOST)%', '%env(RABBITMQ_PORT)%', '%env(RABBITMQ_USERNAME)%', '%env(RABBITMQ_PASSWORD)%', '%env(RABBITMQ_QUEUE_NAME)%']

App\Command\PublisherCommand:
    arguments: ['%env(RABBITMQ_HOST)%', '%env(RABBITMQ_PORT)%', '%env(RABBITMQ_USERNAME)%', '%env(RABBITMQ_PASSWORD)%', '%env(RABBITMQ_QUEUE_NAME)%']
PublisherCommand.php
namespace App\Command;

use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;

/**
 * Console command that publishes one test job per second to a RabbitMQ queue.
 *
 * Each job is a JSON object with the keys `id`, `task` and `sleep_period`.
 */
class PublisherCommand extends Command
{
    protected static $defaultName = 'publisher';

    /** @var \PhpAmqpLib\Connection\AMQPStreamConnection|null Opened lazily by connect(). */
    private $connection;

    /** @var \PhpAmqpLib\Channel\AMQPChannel|null Opened lazily by connect(). */
    private $channel;

    /** @var string */
    private $host;

    /** @var string|int */
    private $port;

    /** @var string */
    private $username;

    /** @var string */
    private $password;

    /** @var string */
    private $queueName;

    /**
     * PublisherCommand constructor.
     *
     * Only stores the broker settings. The connection itself is opened when
     * the command is executed, so instantiating the command (for example when
     * the console application lists its commands) performs no network I/O and
     * cannot fail because the broker is unreachable.
     *
     * @param string     $host      Broker host name.
     * @param string|int $port      Broker port.
     * @param string     $username  Broker user.
     * @param string     $password  Broker password.
     * @param string     $queueName Queue the jobs are published to.
     */
    public function __construct($host, $port, $username, $password, $queueName)
    {
        parent::__construct();

        $this->host = $host;
        $this->port = $port;
        $this->username = $username;
        $this->password = $password;
        $this->queueName = $queueName;
    }

    protected function configure()
    {
        $this
            ->setDescription('Command publisher for testing purposes')
            ->addArgument('arg1', InputArgument::OPTIONAL, 'Argument description')
            ->addOption('option1', null, InputOption::VALUE_NONE, 'Option description');
    }

    /**
     * Declares the queue and publishes jobs until the process is interrupted.
     *
     * The publishing loop has no normal exit; the method only returns when an
     * exception is thrown, in which case the error is reported and a non-zero
     * exit code is returned.
     *
     * @param InputInterface  $input
     * @param OutputInterface $output
     *
     * @return int Exit code (1 on failure).
     */
    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $io = new SymfonyStyle($input, $output);
        $io->comment('START publisher');

        try {
            $this->connect();
            $io->note('Creating messages. To stop press CTRL+C');

            // Create the queue if it does not already exist:
            // passive = false, durable = true, exclusive = false, auto_delete = false.
            $this->channel->queue_declare($this->queueName, false, true, false, false);

            $jobId = 0;
            while (true) {
                $job = [
                    'id' => $jobId,
                    'task' => 'sleep',
                    'sleep_period' => rand(0, 3),
                ];

                $msg = new \PhpAmqpLib\Message\AMQPMessage(
                    json_encode($job, JSON_UNESCAPED_SLASHES),
                    ['delivery_mode' => 2] // 2 = persistent message
                );

                $this->channel->basic_publish($msg, '', $this->queueName);

                // Report the id that was actually sent, then advance the counter.
                $io->comment('Job created ' . $jobId);
                $jobId++;

                sleep(1);
            }
        } catch (\Exception $error) {
            // php-amqplib failures are not \ErrorException instances, so the
            // generic base class is caught in order to report them as well.
            $io->error($error->getMessage());

            return 1;
        }
    }

    /**
     * Opens the AMQP connection and channel on first use.
     *
     * @throws \Exception When the broker cannot be reached.
     */
    private function connect()
    {
        if ($this->connection !== null) {
            return;
        }

        $this->connection = new \PhpAmqpLib\Connection\AMQPStreamConnection(
            $this->host,
            $this->port,
            $this->username,
            $this->password
        );
        $this->channel = $this->connection->channel();
    }

    /**
     * Closes the channel and connection if they were ever opened.
     *
     * @throws \Exception
     */
    public function __destruct()
    {
        if ($this->channel !== null) {
            $this->channel->close();
        }

        if ($this->connection !== null) {
            $this->connection->close();
        }
    }
}
WorkerCommand.php
namespace App\Command;

use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;

/**
 * Console command that consumes jobs from a RabbitMQ queue, one at a time.
 *
 * Each job is expected to be a JSON object with a numeric `sleep_period`
 * key; the worker sleeps for that many seconds and then acknowledges the
 * message.
 */
class WorkerCommand extends Command
{
    protected static $defaultName = 'worker';

    /** @var \PhpAmqpLib\Connection\AMQPStreamConnection|null Opened lazily by connect(). */
    private $connection;

    /** @var \PhpAmqpLib\Channel\AMQPChannel|null Opened lazily by connect(). */
    private $channel;

    /** @var string */
    private $host;

    /** @var string|int */
    private $port;

    /** @var string */
    private $username;

    /** @var string */
    private $password;

    /** @var string */
    private $queueName;

    /**
     * WorkerCommand constructor.
     *
     * Only stores the broker settings. The connection itself is opened when
     * the command is executed, so instantiating the command (for example when
     * the console application lists its commands) performs no network I/O and
     * cannot fail because the broker is unreachable.
     *
     * @param string     $host      Broker host name.
     * @param string|int $port      Broker port.
     * @param string     $username  Broker user.
     * @param string     $password  Broker password.
     * @param string     $queueName Queue the jobs are consumed from.
     */
    public function __construct($host, $port, $username, $password, $queueName)
    {
        parent::__construct();

        $this->host = $host;
        $this->port = $port;
        $this->username = $username;
        $this->password = $password;
        $this->queueName = $queueName;
    }

    protected function configure()
    {
        $this
            ->setDescription('Command worker for testing purposes')
            ->addArgument('arg1', InputArgument::OPTIONAL, 'Argument description')
            ->addOption('option1', null, InputOption::VALUE_NONE, 'Option description');
    }

    /**
     * Declares the queue, registers the consumer and processes messages
     * until no consumer callback remains on the channel.
     *
     * @param InputInterface  $input
     * @param OutputInterface $output
     *
     * @return int Exit code (0 on success, 1 on failure).
     */
    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $io = new SymfonyStyle($input, $output);
        $io->comment('START worker');

        try {
            $this->connect();
            $io->note('Waiting for messages. To exit press CTRL+C');

            // Create the queue if it does not already exist:
            // passive = false, durable = true, exclusive = false, auto_delete = false.
            $this->channel->queue_declare($this->queueName, false, true, false, false);

            $callback = function ($msg) use ($io) {
                // Raw output: the body is external data and must not be
                // interpreted as console formatting tags.
                $io->writeln(' [x] Received ' . $msg->body, OutputInterface::OUTPUT_RAW);

                $channel = $msg->delivery_info['channel'];
                $deliveryTag = $msg->delivery_info['delivery_tag'];
                $job = json_decode($msg->body, true);

                if (!is_array($job) || !isset($job['sleep_period']) || !is_numeric($job['sleep_period'])) {
                    // Malformed payload: drop it without requeueing so it is
                    // not redelivered to this consumer forever.
                    $io->writeln(' [x] Invalid job payload, discarding');
                    $channel->basic_reject($deliveryTag, false);

                    return;
                }

                // Clamp to zero: sleep() rejects negative durations.
                sleep(max(0, (int) $job['sleep_period']));
                $io->writeln(' [x] Done');

                $channel->basic_ack($deliveryTag);
            };

            // Hand this worker at most one unacknowledged message at a time.
            $this->channel->basic_qos(null, 1, null);

            // consumer_tag = '', no_local = false, no_ack = false (manual acks),
            // exclusive = false, nowait = false.
            $this->channel->basic_consume($this->queueName, '', false, false, false, false, $callback);

            while (count($this->channel->callbacks)) {
                $this->channel->wait();
            }

            $io->success('DONE!!!');

            return 0;
        } catch (\Exception $error) {
            // php-amqplib failures are not \ErrorException instances, so the
            // generic base class is caught in order to report them as well.
            $io->error($error->getMessage());

            return 1;
        }
    }

    /**
     * Opens the AMQP connection and channel on first use.
     *
     * @throws \Exception When the broker cannot be reached.
     */
    private function connect()
    {
        if ($this->connection !== null) {
            return;
        }

        $this->connection = new \PhpAmqpLib\Connection\AMQPStreamConnection(
            $this->host,
            $this->port,
            $this->username,
            $this->password
        );
        $this->channel = $this->connection->channel();
    }

    /**
     * Closes the channel and connection if they were ever opened.
     *
     * @throws \Exception
     */
    public function __destruct()
    {
        if ($this->channel !== null) {
            $this->channel->close();
        }

        if ($this->connection !== null) {
            $this->connection->close();
        }
    }
}
Now you can fill the queue by running the publisher command, and check the current status of the queue in the RabbitMQ management UI at http://localhost:15672