PHP 或者 Yii2框架集成 RabbitMQ SDK (实现断线重连)
一、Yii2 消费者断连自动重连(增强版代码)
核心优化:网络波动/ RabbitMQ 重启后,自动重连队列,无需手动重启程序,兼容直接使用 php-amqplib 和 yii2-queue 扩展两种场景。
方案1:直接使用 php-amqplib(独立脚本,console 控制器)
<?php
namespace app\console\controllers;
use yii\console\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
use PhpAmqpLib\Exception\AMQPChannelClosedException;
class RabbitmqConsumerController extends Controller
{
// RabbitMQ 配置
private $config = [
'host' => 'localhost',
'port' => 5672,
'user' => 'guest',
'password' => 'guest',
'queue' => 'test_queue',
'retry_delay' => 5, // 断连后重试间隔(秒)
];
public function actionListen()
{
echo "消费者启动,开始监听队列({$this->config['queue']})...\n";
echo "断连会自动重试,按 Ctrl+C 退出\n";
while (true) { // 外层重连循环
$connection = null;
$channel = null;
try {
// 1. 建立连接
$connection = new AMQPStreamConnection(
$this->config['host'],
$this->config['port'],
$this->config['user'],
$this->config['password']
);
$channel = $connection->channel();
// 2. 声明队列(与生产者一致,持久化)
$channel->queue_declare(
$this->config['queue'],
false, // passive
true, // durable
false, // exclusive
false // auto_delete
);
// 3. 配置 QoS(每次推1条,防止过载)
$channel->basic_qos(null, 1, null);
// 4. 消息处理回调
$callback = function ($msg) {
try {
// 业务逻辑:处理消息(替换成你的实际需求)
echo date('Y-m-d H:i:s') . " 收到消息:{$msg->body}\n";
// 模拟处理耗时(可选)
// sleep(1);
$msg->ack(); // 处理成功,手动确认
} catch (\Exception $e) {
echo date('Y-m-d H:i:s') . " 消息处理失败:{$e->getMessage()}\n";
$msg->nack(false, false); // 处理失败,不重新入队(可根据需求调整为 true 重新入队)
}
};
// 5. 注册订阅,开始监听
$channel->basic_consume(
$this->config['queue'],
'', // consumer_tag
false, // no_local
false, // no_ack(关闭自动确认)
false, // exclusive
false, // nowait
$callback
);
// 6. 阻塞监听(有消息触发回调,无消息阻塞,不耗CPU)
while ($channel->is_consuming()) {
$channel->wait();
}
} catch (AMQPConnectionClosedException | AMQPChannelClosedException $e) {
// 连接/通道断开,触发重连
echo date('Y-m-d H:i:s') . " 连接断开:{$e->getMessage()},{$this->config['retry_delay']}秒后重试...\n";
} catch (\Exception $e) {
// 其他异常
echo date('Y-m-d H:i:s') . " 未知错误:{$e->getMessage()},{$this->config['retry_delay']}秒后重试...\n";
} finally {
// 关闭连接(防止资源泄露)
if ($channel) {
try {
$channel->close();
} catch (\Exception $e) {}
}
if ($connection) {
try {
$connection->close();
} catch (\Exception $e) {}
}
// 重试间隔(断连后休眠,避免频繁重试)
sleep($this->config['retry_delay']);
}
}
}
}方案2:基于 yii2-queue 扩展(简化版,利用框架封装)
yii2-queue 已内置基础重连机制,只需配置重试参数,配合后台运行即可:
修改
config/main.php队列配置:'components' => [ 'queue' => [ 'class' => \yii\queue\amqp\Queue::class, 'host' => 'localhost', 'port' => 5672, 'user' => 'guest', 'password' => 'guest', 'queueName' => 'test_queue', 'driver' => \yii\queue\amqp\ drivers\PhpAmqpLib::class, 'attempts' => 3, // 消息处理失败重试次数 'retryDelay' => 5, // 重试间隔(秒) ], ],消息处理类(
app\jobs\TestJob.php):<?php namespace app\jobs; use yii\queue\JobInterface; use yii\base\BaseObject; class TestJob extends BaseObject implements JobInterface { public $message; // 接收的消息内容 public function execute($queue) { // 业务逻辑:处理消息 echo date('Y-m-d H:i:s') . " 处理消息:{$this->message}\n"; } }生产者发送消息(控制器中):
Yii::$app->queue->push(new \app\jobs\TestJob([ 'message' => 'Yii2-queue 发送的消息', ]));
二、Linux 后台运行完整脚本(确保长期稳定)
方案1:直接使用 php-amqplib 时的后台运行脚本
创建启动脚本(
/usr/local/rabbitmq-consumer/start.sh):#!/bin/bash # 定义变量 CONSOLE_PATH="/var/www/your-yii2-project/yii" # Yii2 控制台入口文件路径 CONTROLLER="rabbitmq-consumer/listen" # 控制器+动作 LOG_FILE="/var/log/rabbitmq-consumer.log" # 日志文件路径 PID_FILE="/var/run/rabbitmq-consumer.pid" # PID文件(用于停止进程) # 检查是否已运行 if [ -f "$PID_FILE" ]; then PID=$(cat "$PID_FILE") if ps -p $PID > /dev/null; then echo "消费者已在运行(PID: $PID)" exit 1 else # PID文件存在但进程不存在,删除PID文件 rm -f "$PID_FILE" fi fi # 启动消费者,后台运行,日志写入文件 nohup php $CONSOLE_PATH $CONTROLLER >> $LOG_FILE 2>&1 & # 记录PID echo $! > "$PID_FILE" echo "消费者启动成功(PID: $(cat $PID_FILE)),日志路径:$LOG_FILE"创建停止脚本(
/usr/local/rabbitmq-consumer/stop.sh):#!/bin/bash PID_FILE="/var/run/rabbitmq-consumer.pid" if [ ! -f "$PID_FILE" ]; then echo "消费者未运行" exit 1 fi PID=$(cat "$PID_FILE") # 停止进程 kill -9 $PID # 删除PID文件 rm -f "$PID_FILE" echo "消费者已停止(PID: $PID)"脚本授权与使用:
# 授权脚本执行权限 chmod +x /usr/local/rabbitmq-consumer/start.sh chmod +x /usr/local/rabbitmq-consumer/stop.sh # 启动消费者 /usr/local/rabbitmq-consumer/start.sh # 停止消费者(如需) /usr/local/rabbitmq-consumer/stop.sh # 查看日志 tail -f /var/log/rabbitmq-consumer.log
方案2:使用 yii2-queue 扩展时的后台运行脚本
创建启动脚本(
/usr/local/yii2-queue/start.sh):#!/bin/bash CONSOLE_PATH="/var/www/your-yii2-project/yii" COMMAND="queue/listen" LOG_FILE="/var/log/yii2-queue.log" PID_FILE="/var/run/yii2-queue.pid" # 检查是否已运行 if [ -f "$PID_FILE" ]; then PID=$(cat "$PID_FILE") if ps -p $PID > /dev/null; then echo "yii2-queue 已在运行(PID: $PID)" exit 1 else rm -f "$PID_FILE" fi fi # 启动队列监听 nohup php $CONSOLE_PATH $COMMAND >> $LOG_FILE 2>&1 & echo $! > "$PID_FILE" echo "yii2-queue 启动成功(PID: $(cat $PID_FILE)),日志路径:$LOG_FILE"停止脚本(
/usr/local/yii2-queue/stop.sh):#!/bin/bash PID_FILE="/var/run/yii2-queue.pid" if [ ! -f "$PID_FILE" ]; then echo "yii2-queue 未运行" exit 1 fi PID=$(cat "$PID_FILE") kill -9 $PID rm -f "$PID_FILE" echo "yii2-queue 已停止(PID: $PID)"授权与使用:
chmod +x /usr/local/yii2-queue/start.sh chmod +x /usr/local/yii2-queue/stop.sh # 启动 /usr/local/yii2-queue/start.sh # 查看日志 tail -f /var/log/yii2-queue.log
关键说明
- 断连重连:外层
while (true)循环处理重连,断连后休眠 5 秒重试,避免频繁占用资源。 - 后台运行:
nohup命令让程序脱离终端,即使关闭 SSH 连接也能继续运行,日志写入文件方便排查问题。 - 无需手动加休眠:
$channel->wait()是阻塞等待,无消息时不耗 CPU,比手动sleep更高效。
版权属于:Joyber
本文链接:https://blog.qqvbc.com/default/1378.html
转载时须注明出处及本声明