Add to .env
RABBITMQ_HOST=localhost RABBITMQ_PORT=5672 RABBITMQ_USERNAME=test RABBITMQ_PASSWORD=test RABBITMQ_QUEUE_NAME=task_queue
Add to services.yaml
App\Command\WorkerCommand:
arguments: ['%env(RABBITMQ_HOST)%', '%env(RABBITMQ_PORT)%', '%env(RABBITMQ_USERNAME)%', '%env(RABBITMQ_PASSWORD)%', '%env(RABBITMQ_QUEUE_NAME)%']
App\Command\PublisherCommand:
arguments: ['%env(RABBITMQ_HOST)%', '%env(RABBITMQ_PORT)%', '%env(RABBITMQ_USERNAME)%', '%env(RABBITMQ_PASSWORD)%', '%env(RABBITMQ_QUEUE_NAME)%']
PublisherCommand.php
namespace App\Command;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
class PublisherCommand extends Command
{
protected static $defaultName = 'publisher';
/** @var \PhpAmqpLib\Connection\AMQPStreamConnection */
private $connection;
/**
* @var \PhpAmqpLib\Channel\AMQPChannel
*/
private $channel;
private $queueName;
/**
* WorkerCommand constructor.
* @param $host
* @param $port
* @param $username
* @param $password
* @param $queueName
*/
public function __construct($host, $port, $username, $password, $queueName)
{
parent::__construct();
$this->connection = new \PhpAmqpLib\Connection\AMQPStreamConnection($host, $port, $username, $password);
$this->channel = $this->connection->channel();
$this->queueName = $queueName;
}
protected function configure()
{
$this
->setDescription('Command publisher for testing purposes')
->addArgument('arg1', InputArgument::OPTIONAL, 'Argument description')
->addOption('option1', null, InputOption::VALUE_NONE, 'Option description');
}
/**
* @param InputInterface $input
* @param OutputInterface $output
*
* @return int|void|null
*/
protected function execute(InputInterface $input, OutputInterface $output)
{
$io = new SymfonyStyle($input, $output);
$io->comment('START publisher');
try {
$io->note('Creating messages. To stop press CTRL+C');
# Create the queue if it does not already exist.
$this->channel->queue_declare(
$queue = $this->queueName,
$passive = false,
$durable = true,
$exclusive = false,
$auto_delete = false,
$nowait = false,
$arguments = null,
$ticket = null
);
$job_id = 0;
while (true) {
$jobArray = array(
'id' => $job_id++,
'task' => 'sleep',
'sleep_period' => rand(0, 3)
);
$msg = new \PhpAmqpLib\Message\AMQPMessage(
json_encode($jobArray, JSON_UNESCAPED_SLASHES),
array('delivery_mode' => 2) # make message persistent
);
$this->channel->basic_publish($msg, '', $this->queueName);
$io->comment('Job created '. $job_id);
sleep(1);
}
$io->success('DONE!!!');
} catch (\ErrorException $error) {
$io->error($error->getMessage());
}
}
/**
* @throws \Exception
*/
public function __destruct()
{
$this->channel->close();
$this->connection->close();
}
}
WorkerCommand.php
namespace App\Command;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
class WorkerCommand extends Command
{
protected static $defaultName = 'worker';
/** @var \PhpAmqpLib\Connection\AMQPStreamConnection */
private $connection;
/**
* @var \PhpAmqpLib\Channel\AMQPChannel
*/
private $channel;
private $queueName;
/**
* WorkerCommand constructor.
* @param $host
* @param $port
* @param $username
* @param $password
* @param $queueName
*/
public function __construct($host, $port, $username, $password, $queueName)
{
parent::__construct();
$this->connection = new \PhpAmqpLib\Connection\AMQPStreamConnection($host, $port, $username, $password);
$this->channel = $this->connection->channel();
$this->queueName = $queueName;
}
protected function configure()
{
$this
->setDescription('Command worker for testing purposes')
->addArgument('arg1', InputArgument::OPTIONAL, 'Argument description')
->addOption('option1', null, InputOption::VALUE_NONE, 'Option description');
}
/**
* @param InputInterface $input
* @param OutputInterface $output
* @return int|null|void
* @throws \Exception
*/
protected function execute(InputInterface $input, OutputInterface $output)
{
$io = new SymfonyStyle($input, $output);
$io->comment('START worker');
try {
$io->note('Waiting for messages. To exit press CTRL+C');
# Create the queue if it does not already exist.
$this->channel->queue_declare(
$queue = $this->queueName,
$passive = false,
$durable = true,
$exclusive = false,
$auto_delete = false,
$nowait = false,
$arguments = null,
$ticket = null
);
$callback = function ($msg) {
echo " [x] Received ", $msg->body, "\n";
$job = json_decode($msg->body,true);
sleep($job['sleep_period']);
echo " [x] Done", "\n";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
$this->channel->basic_qos(null, 1, null);
$this->channel->basic_consume(
$queue = $this->queueName,
$consumer_tag = '',
$no_local = false,
$no_ack = false,
$exclusive = false,
$nowait = false,
$callback
);
while (count($this->channel->callbacks)) {
$this->channel->wait();
}
$io->success('DONE!!!');
} catch (\ErrorException $error) {
$io->error($error->getMessage());
}
}
/**
* @throws \Exception
*/
public function __destruct()
{
$this->channel->close();
$this->connection->close();
}
}
Now you can fill the queue with publisher and check the current status of the queue at http://localhost:15672