Add to .env
RABBITMQ_HOST=localhost |
|
RABBITMQ_PORT=5672 |
|
RABBITMQ_USERNAME=test |
|
RABBITMQ_PASSWORD=test |
|
RABBITMQ_QUEUE_NAME=task_queue |
Add to services.yaml
App\Command\WorkerCommand: |
|
arguments: ['%env(RABBITMQ_HOST)%', '%env(RABBITMQ_PORT)%', '%env(RABBITMQ_USERNAME)%', '%env(RABBITMQ_PASSWORD)%', '%env(RABBITMQ_QUEUE_NAME)%'] |
|
|
|
App\Command\PublisherCommand: |
|
arguments: ['%env(RABBITMQ_HOST)%', '%env(RABBITMQ_PORT)%', '%env(RABBITMQ_USERNAME)%', '%env(RABBITMQ_PASSWORD)%', '%env(RABBITMQ_QUEUE_NAME)%'] |
PublisherCommand.php
namespace App\Command; |
|
|
|
use Symfony\Component\Console\Command\Command; |
|
use Symfony\Component\Console\Input\InputArgument; |
|
use Symfony\Component\Console\Input\InputInterface; |
|
use Symfony\Component\Console\Input\InputOption; |
|
use Symfony\Component\Console\Output\OutputInterface; |
|
use Symfony\Component\Console\Style\SymfonyStyle; |
|
|
|
class PublisherCommand extends Command |
|
{ |
|
protected static $defaultName = 'publisher'; |
|
|
|
/** @var \PhpAmqpLib\Connection\AMQPStreamConnection */ |
|
private $connection; |
|
|
|
/** |
|
* @var \PhpAmqpLib\Channel\AMQPChannel |
|
*/ |
|
private $channel; |
|
|
|
private $queueName; |
|
|
|
/** |
|
* WorkerCommand constructor. |
|
* @param $host |
|
* @param $port |
|
* @param $username |
|
* @param $password |
|
* @param $queueName |
|
*/ |
|
public function __construct($host, $port, $username, $password, $queueName) |
|
{ |
|
parent::__construct(); |
|
$this->connection = new \PhpAmqpLib\Connection\AMQPStreamConnection($host, $port, $username, $password); |
|
|
|
$this->channel = $this->connection->channel(); |
|
$this->queueName = $queueName; |
|
} |
|
|
|
protected function configure() |
|
{ |
|
$this |
|
->setDescription('Command publisher for testing purposes') |
|
->addArgument('arg1', InputArgument::OPTIONAL, 'Argument description') |
|
->addOption('option1', null, InputOption::VALUE_NONE, 'Option description'); |
|
} |
|
|
|
/** |
|
* @param InputInterface $input |
|
* @param OutputInterface $output |
|
* |
|
* @return int|void|null |
|
*/ |
|
protected function execute(InputInterface $input, OutputInterface $output) |
|
{ |
|
$io = new SymfonyStyle($input, $output); |
|
|
|
$io->comment('START publisher'); |
|
|
|
try { |
|
$io->note('Creating messages. To stop press CTRL+C'); |
|
|
|
# Create the queue if it does not already exist. |
|
$this->channel->queue_declare( |
|
$queue = $this->queueName, |
|
$passive = false, |
|
$durable = true, |
|
$exclusive = false, |
|
$auto_delete = false, |
|
$nowait = false, |
|
$arguments = null, |
|
$ticket = null |
|
); |
|
|
|
$job_id = 0; |
|
while (true) { |
|
$jobArray = array( |
|
'id' => $job_id++, |
|
'task' => 'sleep', |
|
'sleep_period' => rand(0, 3) |
|
); |
|
|
|
$msg = new \PhpAmqpLib\Message\AMQPMessage( |
|
json_encode($jobArray, JSON_UNESCAPED_SLASHES), |
|
array('delivery_mode' => 2) # make message persistent |
|
); |
|
|
|
$this->channel->basic_publish($msg, '', $this->queueName); |
|
$io->comment('Job created '. $job_id); |
|
sleep(1); |
|
} |
|
|
|
$io->success('DONE!!!'); |
|
} catch (\ErrorException $error) { |
|
$io->error($error->getMessage()); |
|
} |
|
} |
|
|
|
/** |
|
* @throws \Exception |
|
*/ |
|
public function __destruct() |
|
{ |
|
$this->channel->close(); |
|
$this->connection->close(); |
|
} |
|
|
|
} |
WorkerCommand.php
namespace App\Command; |
|
|
|
use Symfony\Component\Console\Command\Command; |
|
use Symfony\Component\Console\Input\InputArgument; |
|
use Symfony\Component\Console\Input\InputInterface; |
|
use Symfony\Component\Console\Input\InputOption; |
|
use Symfony\Component\Console\Output\OutputInterface; |
|
use Symfony\Component\Console\Style\SymfonyStyle; |
|
|
|
class WorkerCommand extends Command |
|
{ |
|
protected static $defaultName = 'worker'; |
|
|
|
/** @var \PhpAmqpLib\Connection\AMQPStreamConnection */ |
|
private $connection; |
|
|
|
/** |
|
* @var \PhpAmqpLib\Channel\AMQPChannel |
|
*/ |
|
private $channel; |
|
|
|
private $queueName; |
|
|
|
/** |
|
* WorkerCommand constructor. |
|
* @param $host |
|
* @param $port |
|
* @param $username |
|
* @param $password |
|
* @param $queueName |
|
*/ |
|
public function __construct($host, $port, $username, $password, $queueName) |
|
{ |
|
parent::__construct(); |
|
$this->connection = new \PhpAmqpLib\Connection\AMQPStreamConnection($host, $port, $username, $password); |
|
|
|
$this->channel = $this->connection->channel(); |
|
$this->queueName = $queueName; |
|
} |
|
|
|
protected function configure() |
|
{ |
|
$this |
|
->setDescription('Command worker for testing purposes') |
|
->addArgument('arg1', InputArgument::OPTIONAL, 'Argument description') |
|
->addOption('option1', null, InputOption::VALUE_NONE, 'Option description'); |
|
} |
|
|
|
/** |
|
* @param InputInterface $input |
|
* @param OutputInterface $output |
|
* @return int|null|void |
|
* @throws \Exception |
|
*/ |
|
protected function execute(InputInterface $input, OutputInterface $output) |
|
{ |
|
$io = new SymfonyStyle($input, $output); |
|
|
|
$io->comment('START worker'); |
|
|
|
try { |
|
$io->note('Waiting for messages. To exit press CTRL+C'); |
|
|
|
# Create the queue if it does not already exist. |
|
$this->channel->queue_declare( |
|
$queue = $this->queueName, |
|
$passive = false, |
|
$durable = true, |
|
$exclusive = false, |
|
$auto_delete = false, |
|
$nowait = false, |
|
$arguments = null, |
|
$ticket = null |
|
); |
|
|
|
$callback = function ($msg) { |
|
echo " [x] Received ", $msg->body, "\n"; |
|
$job = json_decode($msg->body,true); |
|
sleep($job['sleep_period']); |
|
echo " [x] Done", "\n"; |
|
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); |
|
}; |
|
|
|
$this->channel->basic_qos(null, 1, null); |
|
|
|
$this->channel->basic_consume( |
|
$queue = $this->queueName, |
|
$consumer_tag = '', |
|
$no_local = false, |
|
$no_ack = false, |
|
$exclusive = false, |
|
$nowait = false, |
|
$callback |
|
); |
|
|
|
while (count($this->channel->callbacks)) { |
|
$this->channel->wait(); |
|
} |
|
|
|
$io->success('DONE!!!'); |
|
} catch (\ErrorException $error) { |
|
$io->error($error->getMessage()); |
|
} |
|
} |
|
|
|
/** |
|
* @throws \Exception |
|
*/ |
|
public function __destruct() |
|
{ |
|
$this->channel->close(); |
|
$this->connection->close(); |
|
} |
|
|
|
|
|
} |
Now you can fill the queue with publisher and check the current status of the queue at http://localhost:15672