一、Yii2 消费者断连自动重连(增强版代码)

核心优化:网络波动/ RabbitMQ 重启后,自动重连队列,无需手动重启程序,兼容直接使用 php-amqplib 和 yii2-queue 扩展两种场景。

方案1:直接使用 php-amqplib(独立脚本,console 控制器)

<?php
namespace app\console\controllers;

use yii\console\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
use PhpAmqpLib\Exception\AMQPChannelClosedException;

class RabbitmqConsumerController extends Controller
{
    // RabbitMQ 配置
    private $config = [
        'host' => 'localhost',
        'port' => 5672,
        'user' => 'guest',
        'password' => 'guest',
        'queue' => 'test_queue',
        'retry_delay' => 5, // 断连后重试间隔(秒)
    ];

    public function actionListen()
    {
        echo "消费者启动,开始监听队列({$this->config['queue']})...\n";
        echo "断连会自动重试,按 Ctrl+C 退出\n";

        while (true) { // 外层重连循环
            $connection = null;
            $channel = null;

            try {
                // 1. 建立连接
                $connection = new AMQPStreamConnection(
                    $this->config['host'],
                    $this->config['port'],
                    $this->config['user'],
                    $this->config['password']
                );
                $channel = $connection->channel();

                // 2. 声明队列(与生产者一致,持久化)
                $channel->queue_declare(
                    $this->config['queue'],
                    false, // passive
                    true,  // durable
                    false, // exclusive
                    false  // auto_delete
                );

                // 3. 配置 QoS(每次推1条,防止过载)
                $channel->basic_qos(null, 1, null);

                // 4. 消息处理回调
                $callback = function ($msg) {
                    try {
                        // 业务逻辑:处理消息(替换成你的实际需求)
                        echo date('Y-m-d H:i:s') . " 收到消息:{$msg->body}\n";
                        // 模拟处理耗时(可选)
                        // sleep(1);
                        $msg->ack(); // 处理成功,手动确认
                    } catch (\Exception $e) {
                        echo date('Y-m-d H:i:s') . " 消息处理失败:{$e->getMessage()}\n";
                        $msg->nack(false, false); // 处理失败,不重新入队(可根据需求调整为 true 重新入队)
                    }
                };

                // 5. 注册订阅,开始监听
                $channel->basic_consume(
                    $this->config['queue'],
                    '', // consumer_tag
                    false, // no_local
                    false, // no_ack(关闭自动确认)
                    false, // exclusive
                    false, // nowait
                    $callback
                );

                // 6. 阻塞监听(有消息触发回调,无消息阻塞,不耗CPU)
                while ($channel->is_consuming()) {
                    $channel->wait();
                }

            } catch (AMQPConnectionClosedException | AMQPChannelClosedException $e) {
                // 连接/通道断开,触发重连
                echo date('Y-m-d H:i:s') . " 连接断开:{$e->getMessage()},{$this->config['retry_delay']}秒后重试...\n";
            } catch (\Exception $e) {
                // 其他异常
                echo date('Y-m-d H:i:s') . " 未知错误:{$e->getMessage()},{$this->config['retry_delay']}秒后重试...\n";
            } finally {
                // 关闭连接(防止资源泄露)
                if ($channel) {
                    try {
                        $channel->close();
                    } catch (\Exception $e) {}
                }
                if ($connection) {
                    try {
                        $connection->close();
                    } catch (\Exception $e) {}
                }

                // 重试间隔(断连后休眠,避免频繁重试)
                sleep($this->config['retry_delay']);
            }
        }
    }
}

方案2:基于 yii2-queue 扩展(简化版,利用框架封装)

yii2-queue 已内置基础重连机制,只需配置重试参数,配合后台运行即可:

  1. 修改 config/main.php 队列配置:

    'components' => [
     'queue' => [
         'class' => \yii\queue\amqp\Queue::class,
         'host' => 'localhost',
         'port' => 5672,
         'user' => 'guest',
         'password' => 'guest',
         'queueName' => 'test_queue',
         'driver' => \yii\queue\amqp\ drivers\PhpAmqpLib::class,
         'attempts' => 3, // 消息处理失败重试次数
         'retryDelay' => 5, // 重试间隔(秒)
     ],
    ],
  2. 消息处理类(app\jobs\TestJob.php):

    <?php
    namespace app\jobs;
    
    use yii\queue\JobInterface;
    use yii\base\BaseObject;
    
    class TestJob extends BaseObject implements JobInterface
    {
     public $message; // 接收的消息内容
    
     public function execute($queue)
     {
         // 业务逻辑:处理消息
         echo date('Y-m-d H:i:s') . " 处理消息:{$this->message}\n";
     }
    }
  3. 生产者发送消息(控制器中):

    Yii::$app->queue->push(new \app\jobs\TestJob([
     'message' => 'Yii2-queue 发送的消息',
    ]));

二、Linux 后台运行完整脚本(确保长期稳定)

方案1:直接使用 php-amqplib 时的后台运行脚本

  1. 创建启动脚本(/usr/local/rabbitmq-consumer/start.sh):

    #!/bin/bash
    # 定义变量
    CONSOLE_PATH="/var/www/your-yii2-project/yii" # Yii2 控制台入口文件路径
    CONTROLLER="rabbitmq-consumer/listen" # 控制器+动作
    LOG_FILE="/var/log/rabbitmq-consumer.log" # 日志文件路径
    PID_FILE="/var/run/rabbitmq-consumer.pid" # PID文件(用于停止进程)
    
    # 检查是否已运行
    if [ -f "$PID_FILE" ]; then
     PID=$(cat "$PID_FILE")
     if ps -p $PID > /dev/null; then
         echo "消费者已在运行(PID: $PID)"
         exit 1
     else
         # PID文件存在但进程不存在,删除PID文件
         rm -f "$PID_FILE"
     fi
    fi
    
    # 启动消费者,后台运行,日志写入文件
    nohup php $CONSOLE_PATH $CONTROLLER >> $LOG_FILE 2>&1 &
    # 记录PID
    echo $! > "$PID_FILE"
    echo "消费者启动成功(PID: $(cat $PID_FILE)),日志路径:$LOG_FILE"
  2. 创建停止脚本(/usr/local/rabbitmq-consumer/stop.sh):

    #!/bin/bash
    PID_FILE="/var/run/rabbitmq-consumer.pid"
    
    if [ ! -f "$PID_FILE" ]; then
     echo "消费者未运行"
     exit 1
    fi
    
    PID=$(cat "$PID_FILE")
    # 停止进程
    kill -9 $PID
    # 删除PID文件
    rm -f "$PID_FILE"
    echo "消费者已停止(PID: $PID)"
  3. 脚本授权与使用:

    # 授权脚本执行权限
    chmod +x /usr/local/rabbitmq-consumer/start.sh
    chmod +x /usr/local/rabbitmq-consumer/stop.sh
    
    # 启动消费者
    /usr/local/rabbitmq-consumer/start.sh
    
    # 停止消费者(如需)
    /usr/local/rabbitmq-consumer/stop.sh
    
    # 查看日志
    tail -f /var/log/rabbitmq-consumer.log

方案2:使用 yii2-queue 扩展时的后台运行脚本

  1. 创建启动脚本(/usr/local/yii2-queue/start.sh):

    #!/bin/bash
    CONSOLE_PATH="/var/www/your-yii2-project/yii"
    COMMAND="queue/listen"
    LOG_FILE="/var/log/yii2-queue.log"
    PID_FILE="/var/run/yii2-queue.pid"
    
    # 检查是否已运行
    if [ -f "$PID_FILE" ]; then
     PID=$(cat "$PID_FILE")
     if ps -p $PID > /dev/null; then
         echo "yii2-queue 已在运行(PID: $PID)"
         exit 1
     else
         rm -f "$PID_FILE"
     fi
    fi
    
    # 启动队列监听
    nohup php $CONSOLE_PATH $COMMAND >> $LOG_FILE 2>&1 &
    echo $! > "$PID_FILE"
    echo "yii2-queue 启动成功(PID: $(cat $PID_FILE)),日志路径:$LOG_FILE"
  2. 停止脚本(/usr/local/yii2-queue/stop.sh):

    #!/bin/bash
    PID_FILE="/var/run/yii2-queue.pid"
    
    if [ ! -f "$PID_FILE" ]; then
     echo "yii2-queue 未运行"
     exit 1
    fi
    
    PID=$(cat "$PID_FILE")
    kill -9 $PID
    rm -f "$PID_FILE"
    echo "yii2-queue 已停止(PID: $PID)"
  3. 授权与使用:

    chmod +x /usr/local/yii2-queue/start.sh
    chmod +x /usr/local/yii2-queue/stop.sh
    
    # 启动
    /usr/local/yii2-queue/start.sh
    
    # 查看日志
    tail -f /var/log/yii2-queue.log

关键说明

  1. 断连重连:外层 while (true) 循环处理重连,断连后休眠 5 秒重试,避免频繁占用资源。
  2. 后台运行:nohup 命令让程序脱离终端,即使关闭 SSH 连接也能继续运行,日志写入文件方便排查问题。
  3. 无需手动加休眠:$channel->wait() 是阻塞等待,无消息时不耗 CPU,比手动 sleep 更高效。

标签: none

添加新评论