RabbitMQ 支持分布式部署(核心是 集群 + 镜像队列/联邦队列),默认单节点是单机,需手动配置实现分布式高可用和负载均衡。

关键分布式能力:

  • 集群:多节点共享队列元数据,实现负载分担;
  • 镜像队列:队列数据同步到多个节点,避免单点故障;
  • 联邦队列/ shovel:跨机房、跨集群同步消息,支持广域分布式。

一、RabbitMQ 分布式集群(3节点最简配置)

前提:3台Linux服务器(示例IP:192.168.1.101/102/103),已安装相同版本RabbitMQ+Erlang

核心目标:搭建“1主2从”集群,镜像队列同步数据,实现高可用

步骤1:配置主机名与免密登录(所有节点执行)

  1. 编辑 /etc/hosts,添加3节点映射:

    192.168.1.101 rabbit1
    192.168.1.102 rabbit2
    192.168.1.103 rabbit3
  2. 免密登录(让节点间可无密码通信):

    # 在rabbit1生成密钥(一路回车)
    ssh-keygen -t rsa
    # 复制密钥到另外两台节点
    ssh-copy-id root@rabbit2
    ssh-copy-id root@rabbit3

步骤2:同步Erlang Cookie(集群通信核心)

RabbitMQ集群依赖Erlang Cookie认证,所有节点必须用相同Cookie:

  1. 在rabbit1(主节点)查看Cookie:

    cat /var/lib/rabbitmq/.erlang.cookie
  2. 在rabbit2/rabbit3(从节点)覆盖Cookie(先停止RabbitMQ):

    rabbitmqctl stop_app
    echo "主节点的Cookie内容" > /var/lib/rabbitmq/.erlang.cookie
    chmod 400 /var/lib/rabbitmq/.erlang.cookie # 权限必须是400

步骤3:组建集群

  1. 启动所有节点的RabbitMQ服务:

    rabbitmq-server start -detached # 后台启动
  2. 在rabbit2/rabbit3执行(加入rabbit1集群):

    # 停止应用(保留节点)
    rabbitmqctl stop_app
    # 重置节点(首次加入需执行)
    rabbitmqctl reset
    # 加入集群(--ram 表示内存节点,--disc 表示磁盘节点,主节点默认磁盘)
    rabbitmqctl join_cluster --ram rabbit@rabbit1
    # 启动应用
    rabbitmqctl start_app
  3. 验证集群状态(任意节点执行):

    rabbitmqctl cluster_status
    # 输出包含3个节点信息,说明集群搭建成功

步骤4:配置镜像队列(数据同步,高可用)

默认集群仅共享元数据,队列数据只存主节点,需配置镜像策略让数据同步到所有节点:

  1. 执行镜像策略命令(任意节点):

    # 策略名:ha-all,匹配所有队列(^$),同步到所有节点(ha-mode=all)
    rabbitmqctl set_policy ha-all "^$" '{"ha-mode":"all", "ha-sync-mode":"automatic"}'
  2. 验证:创建队列后,在任意节点执行 rabbitmqctl list_queues,所有节点都会显示该队列,主节点故障后,从节点自动接管。

二、Yii2 对接分布式RabbitMQ(适配集群)

方案1:直接使用 php-amqplib(手动指定集群节点)

<?php
namespace app\console\controllers;

use yii\console\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class RabbitmqClusterController extends Controller
{
    public function actionListen()
    {
        // 集群节点列表(可填所有节点,客户端自动负载均衡)
        $nodes = [
            ['host' => 'rabbit1', 'port' => 5672],
            ['host' => 'rabbit2', 'port' => 5672],
            ['host' => 'rabbit3', 'port' => 5672],
        ];

        $connection = null;
        $retryDelay = 5;

        while (true) {
            try {
                // 随机选择一个节点连接(简单负载均衡)
                $node = $nodes[array_rand($nodes)];
                $connection = new AMQPStreamConnection(
                    $node['host'],
                    $node['port'],
                    'guest', // 集群统一账号(需在主节点创建)
                    'guest'
                );

                $channel = $connection->channel();
                $channel->queue_declare('test_queue', false, true, false, false);
                $channel->basic_qos(null, 1, null);

                $callback = function ($msg) {
                    echo date('Y-m-d H:i:s') . " 集群节点接收消息:{$msg->body}\n";
                    $msg->ack();
                };

                $channel->basic_consume('test_queue', '', false, false, false, false, $callback);
                while ($channel->is_consuming()) {
                    $channel->wait();
                }

            } catch (\Exception $e) {
                echo date('Y-m-d H:i:s') . " 连接失败:{$e->getMessage()},{$retryDelay}秒后重试...\n";
                if ($connection) {
                    try {
                        $connection->close();
                    } catch (\Exception $e) {}
                }
                sleep($retryDelay);
            }
        }
    }
}

方案2:使用 yii2-queue 扩展(配置集群节点)

  1. 修改 config/main.php 队列配置:

    'components' => [
     'queue' => [
         'class' => \yii\queue\amqp\Queue::class,
         'host' => 'rabbit1', // 主节点(客户端会自动发现其他节点)
         'port' => 5672,
         'user' => 'guest',
         'password' => 'guest',
         'queueName' => 'test_queue',
         'driver' => \yii\queue\amqp\drivers\PhpAmqpLib::class,
         'options' => [
             'connection_timeout' => 3,
             'read_write_timeout' => 60,
         ],
         // 集群节点列表(可选,用于故障转移)
         'nodes' => [
             ['host' => 'rabbit2', 'port' => 5672],
             ['host' => 'rabbit3', 'port' => 5672],
         ],
     ],
    ],
  2. 生产者/消费者使用方式与单机版一致,扩展会自动处理节点故障转移。

关键说明

  1. 集群高可用:主节点故障后,从节点自动接管队列,消息不丢失(依赖镜像队列);
  2. 负载均衡:生产者/消费者连接集群时,可随机选择节点,分摊压力;
  3. 账号同步:集群中只需在主节点创建账号,会自动同步到所有从节点。

RabbitMQ分布式集群验证(3步核心验证,覆盖高可用+数据一致性)

一、基础集群状态验证(确认节点已加入)

  1. 任意节点执行命令,查看集群节点列表:

    rabbitmqctl cluster_status
  2. 正常输出:包含 rabbit@rabbit1rabbit@rabbit2rabbit@rabbit3 三个节点,且 running_nodes 均在线。
  3. 浏览器访问 RabbitMQ 管理界面(主节点IP:15672),左侧「Admin → Cluster」,可见3个节点状态均为「running」。

二、镜像队列数据同步验证(确认数据不丢失)

  1. 生产者发消息:用Yii2代码或命令行向 test_queue 发送10条测试消息:

    # 命令行示例(需安装rabbitmqadmin工具)
    rabbitmqadmin publish exchange=amq.default routing_key=test_queue payload="test_msg_1"
    # 循环发送10条(Linux)
    for i in {1..10}; do rabbitmqadmin publish exchange=amq.default routing_key=test_queue payload="test_msg_$i"; done
  2. 从节点查队列:在 rabbit2/rabbit3 执行命令,确认能看到消息数:

    rabbitmqctl list_queues name messages_ready
    # 输出应包含 test_queue  10(消息数一致)

三、高可用故障转移验证(确认主节点挂掉仍可用)

  1. 手动停掉主节点(rabbit1):

    rabbitmqctl stop_app # 仅停止应用,节点未退出(模拟故障)
    # 或彻底停止服务:rabbitmq-server stop
  2. 验证从节点接管:

    • 在 rabbit2 执行 rabbitmqctl list_queuestest_queue 仍存在,消息数不变;
    • 用Yii2消费者在 rabbit2 监听,能正常接收之前发送的10条消息;
    • 新发送消息到集群,从节点能正常接收并处理。
  3. 恢复主节点:重启 rabbit1 后,执行 rabbitmqctl start_app,节点会自动重新加入集群,同步最新数据。

关键判断标准

  • 集群状态:故障后仍有节点在线,队列不消失;
  • 消息完整性:故障前后发送的消息无丢失、不重复;
  • 服务连续性:消费者无需修改配置,能正常连接存活节点消费。

随机连接这里有问题,要是随机到的那个节点停止服务了,会导致连接不上。

核心解决方案:用 集群地址列表+重连机制 替代单点随机,失败自动切换存活节点,不占用CPU。

关键实现(PHP示例,适配RabbitMQ)

// 1. 配置集群所有节点地址(而非单个随机节点)
$clusterNodes = [
    'amqp://user:pass@node1:5672',
    'amqp://user:pass@node2:5672',
    'amqp://user:pass@node3:5672'
];

// 2. 循环重试连接,失败自动换节点
$connection = null;
$maxRetries = 5; // 最大重试次数
$retryDelay = 2; // 重试间隔(秒,阻塞不占CPU)

for ($i = 0; $i < $maxRetries; $i++) {
    $randomNode = $clusterNodes[array_rand($clusterNodes)]; // 随机选一个节点尝试
    try {
        $connection = new AMQPConnection($randomNode);
        $connection->connect();
        break; // 连接成功退出循环
    } catch (AMQPConnectionException $e) {
        sleep($retryDelay); // 阻塞等待,不耗CPU
        continue; // 重试下一个节点
    }
}

核心保障(避免连接失败)

  1. 不单独随机单点,而是随机+集群列表重试,某节点挂了自动切其他存活节点;
  2. sleep() 实现阻塞等待,期间不占用CPU资源;
  3. 配合RabbitMQ集群高可用(镜像队列),即使连接的节点切换,消息也不丢失。

需要我帮你整合 Yii2框架的完整连接代码,或添加 连接失败日志告警 功能吗?

标签: none

添加新评论