如今做哪些網(wǎng)站致富手機(jī)營銷推廣方案
基于php-amqplib/php-amqplib組件適配laravel框架的amqp封裝庫
支持便捷可配置的隊列工作模式?官網(wǎng)詳情
在此基礎(chǔ)上可支持延遲消息、死信隊列等機(jī)制。
環(huán)境要求:
PHP版本: ^7.3|^8.0
需要開啟的擴(kuò)展: socket
其他:
- 如果需要實現(xiàn)延遲任務(wù)需要安裝對應(yīng)版本的rabbitmq延遲插件,以rabbitmq3.9.0版本為例:
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.9.0/rabbitmq_delayed_message_exchange-3.9.0.ez
cp rabbitmq_delayed_message_exchange-3.9.0.ez /opt/rabbitmq/plugins/
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
用法:
第一步 安裝組件:
composer require sai97/laravel-amqp
第二步 發(fā)布服務(wù)以及配置:
php artisan vendor:publish --provider="Sai97\LaravelAmqp\AmqpQueueProviders"
執(zhí)行完后會分別在app/config目錄下生成amqp.php(amqp連接配置等)、app/QueueJob/DefaultQueueJob.php(默認(rèn)隊列任務(wù))
amqp.php
<?phpuse App\QueueJob\DefaultQueueJob;return ["connection" => ["default" => ["host" => env("AMQP_HOST", "127.0.0.1"),"port" => env("AMQP_PORT", 5672),"user" => env("AMQP_USER", "root"),"password" => env("AMQP_PASSWORD", "root")]],"event" => ["default" => DefaultQueueJob::class,]
];
connection為amqp連接配置,可根據(jù)自身業(yè)務(wù)去調(diào)整,完全對應(yīng)php-amqplib/php-amqplib相關(guān)配置項, event是隊列實例標(biāo)識,最好和connection用相同的key以便管理。
目前可支持相關(guān)接口項:
//獲取連接名稱public function getConnectName(): string;//獲取交換機(jī)名稱public function getExchangeName(): string;//獲取交換機(jī)類型public function getExchangeType(): string;//獲取隊列名稱public function getQueueName(): string;//獲取路由KEYpublic function getRoutingKey(): string;//獲取ContentTypepublic function getContentType(): string;//是否開啟死信模式public function isDeadLetter(): bool;//獲取死信交換機(jī)名稱public function getDeadLetterExchangeName(): string;//獲取死信路由KEYpublic function getDeadLetterRoutingKey(): string;//獲取死信隊列名稱public function getDeadLetterQueueName(): string;//是否開啟延遲任務(wù)public function isDelay(): bool;//獲取延遲任務(wù)過期時長public function getDelayTTL(): int;//獲取隊列附加參數(shù)public function getQueueArgs(): array;//獲取回調(diào)函數(shù)public function getCallback(): callable;//是否自動提交ACKpublic function isAutoAck(): bool;
當(dāng)然你也可以自定義隊列實例,只要繼承Sai97\LaravelAmqp\Queue基類即可,具體功能配置參數(shù)參考Sai97\LaravelAmqp\QueueInterface。
代碼示例:
生產(chǎn)者:
$message = "This is message...";
$amqpQueueServices = new AmqpQueueServices(QueueFactory::getInstance(DefaultQueueJob::class));
$amqpQueueServices->producer($message);
消費者:
利用laravel自帶的Command去定義一個RabbitMQWorker自定義命令行,僅需要定義一次,后續(xù)只需要更改amqp.php配置文件添加不同的隊列實例綁定關(guān)系即可,以下是RabbitMQWorker演示代碼:
<?phpnamespace App\Console\Commands;use Illuminate\Console\Command;
use Sai97\LaravelAmqp\AmqpQueueServices;
use Sai97\LaravelAmqp\QueueFactory;class RabbitMQWorker extends Command
{/*** The name and signature of the console command.** @var string*/protected $signature = 'rabbitmq:worker {event}';/*** The console command description.** @var string*/protected $description = 'rabbitmq worker 消費進(jìn)程';/*** Create a new command instance.** @return void*/public function __construct(){parent::__construct();}/*** Execute the console command.** @return mixed*/public function handle(){try {$event = $this->argument("event");$eventConfig = config("amqp.event");if (!isset($eventConfig[$event]) || empty($entity = $eventConfig[$event])) {return $this->error("未知的事件: {$event}");}$this->info("rabbitmq worker of event[{$event}] process start ...");$amqpQueueServices = new AmqpQueueServices(QueueFactory::getInstance($entity));$amqpQueueServices->consumer();} catch (\Throwable $throwable) {$event = $event ?? "";$this->error($throwable->getFile() . " [{$throwable->getLine()}]");return $this->error("rabbitmq worker of event[{$event}] process error:{$throwable->getMessage()}");}$this->info("rabbitmq worker of event[{$event}] process stop ...");}
}
完成RabbitMQWorker消費者命令后,我們只需執(zhí)行php artisan rabbitmq:worker default 完成監(jiān)聽,其中default是可變的,請根據(jù)的amqp.php配置中的隊列實例綁定標(biāo)識去輸入。
因為隊列的消費者都需要是守護(hù)進(jìn)程,所以我們可以依托supervisord進(jìn)程管理器去定義RabbitMQWorker消費者命令,這樣可以保證進(jìn)程可后臺允許以及重啟啟動等,以下是supervisord.conf配置文件示例:
[program:rabbitmq-worker-default]
#process_name=%(program_name)s_%(process_num)d
process_name=worker_%(process_num)d
numprocs=3
command=/usr/local/bin/php /app/www/laravel8/artisan rabbitmq:worker default
autostart=true
autorestart=true
startretries=3
priority=3
stdout_logfile=/var/log/rabbitmq-worker-default.log
redirect_stderr=true
搭配supervisord來進(jìn)行管理消費者進(jìn)程有許多便捷的方面:
- 如果需要新增一個隊列實例,只需要按照上述格式復(fù)制一個program,可以在不影響其他進(jìn)程的情況下進(jìn)程更新supervisord配置:
supervisorctl update
? ? 2. 通過配置numprocs參數(shù)來設(shè)定需要開啟多少個相同配置項的消費者worker,這在任務(wù)分發(fā)、并行處理等場景十分適用,大大提高消費者執(zhí)行效率。
這里不詳細(xì)敘述supervisord相關(guān)操作,具體可查看supervisord官方文檔。
參考鏈接:https://github.com/Z-Sai/laravel-amqp