RabbitMQ是分布式部署方法,以及PHP或者yii2-queue扩展适配集群
RabbitMQ 支持分布式部署(核心是 集群 + 镜像队列/联邦队列),默认单节点是单机,需手动配置实现分布式高可用和负载均衡。
关键分布式能力:
- 集群:多节点共享队列元数据,实现负载分担;
- 镜像队列:队列数据同步到多个节点,避免单点故障;
- 联邦队列/ shovel:跨机房、跨集群同步消息,支持广域分布式。
一、RabbitMQ 分布式集群(3节点最简配置)
前提:3台Linux服务器(示例IP:192.168.1.101/102/103),已安装相同版本RabbitMQ+Erlang
核心目标:搭建“1主2从”集群,镜像队列同步数据,实现高可用
步骤1:配置主机名与免密登录(所有节点执行)
编辑
/etc/hosts,添加3节点映射:192.168.1.101 rabbit1 192.168.1.102 rabbit2 192.168.1.103 rabbit3免密登录(让节点间可无密码通信):
# 在rabbit1生成密钥(一路回车) ssh-keygen -t rsa # 复制密钥到另外两台节点 ssh-copy-id root@rabbit2 ssh-copy-id root@rabbit3
步骤2:同步Erlang Cookie(集群通信核心)
RabbitMQ集群依赖Erlang Cookie认证,所有节点必须用相同Cookie:
在rabbit1(主节点)查看Cookie:
cat /var/lib/rabbitmq/.erlang.cookie在rabbit2/rabbit3(从节点)覆盖Cookie(先停止RabbitMQ):
rabbitmqctl stop_app echo "主节点的Cookie内容" > /var/lib/rabbitmq/.erlang.cookie chmod 400 /var/lib/rabbitmq/.erlang.cookie # 权限必须是400
步骤3:组建集群
启动所有节点的RabbitMQ服务:
rabbitmq-server start -detached # 后台启动在rabbit2/rabbit3执行(加入rabbit1集群):
# 停止应用(保留节点) rabbitmqctl stop_app # 重置节点(首次加入需执行) rabbitmqctl reset # 加入集群(--ram 表示内存节点,--disc 表示磁盘节点,主节点默认磁盘) rabbitmqctl join_cluster --ram rabbit@rabbit1 # 启动应用 rabbitmqctl start_app验证集群状态(任意节点执行):
rabbitmqctl cluster_status # 输出包含3个节点信息,说明集群搭建成功
步骤4:配置镜像队列(数据同步,高可用)
默认集群仅共享元数据,队列数据只存主节点,需配置镜像策略让数据同步到所有节点:
执行镜像策略命令(任意节点):
# 策略名:ha-all,匹配所有队列(^$),同步到所有节点(ha-mode=all) rabbitmqctl set_policy ha-all "^$" '{"ha-mode":"all", "ha-sync-mode":"automatic"}'- 验证:创建队列后,在任意节点执行
rabbitmqctl list_queues,所有节点都会显示该队列,主节点故障后,从节点自动接管。
二、Yii2 对接分布式RabbitMQ(适配集群)
方案1:直接使用 php-amqplib(手动指定集群节点)
<?php
namespace app\console\controllers;
use yii\console\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
class RabbitmqClusterController extends Controller
{
public function actionListen()
{
// 集群节点列表(可填所有节点,客户端自动负载均衡)
$nodes = [
['host' => 'rabbit1', 'port' => 5672],
['host' => 'rabbit2', 'port' => 5672],
['host' => 'rabbit3', 'port' => 5672],
];
$connection = null;
$retryDelay = 5;
while (true) {
try {
// 随机选择一个节点连接(简单负载均衡)
$node = $nodes[array_rand($nodes)];
$connection = new AMQPStreamConnection(
$node['host'],
$node['port'],
'guest', // 集群统一账号(需在主节点创建)
'guest'
);
$channel = $connection->channel();
$channel->queue_declare('test_queue', false, true, false, false);
$channel->basic_qos(null, 1, null);
$callback = function ($msg) {
echo date('Y-m-d H:i:s') . " 集群节点接收消息:{$msg->body}\n";
$msg->ack();
};
$channel->basic_consume('test_queue', '', false, false, false, false, $callback);
while ($channel->is_consuming()) {
$channel->wait();
}
} catch (\Exception $e) {
echo date('Y-m-d H:i:s') . " 连接失败:{$e->getMessage()},{$retryDelay}秒后重试...\n";
if ($connection) {
try {
$connection->close();
} catch (\Exception $e) {}
}
sleep($retryDelay);
}
}
}
}方案2:使用 yii2-queue 扩展(配置集群节点)
修改
config/main.php队列配置:'components' => [ 'queue' => [ 'class' => \yii\queue\amqp\Queue::class, 'host' => 'rabbit1', // 主节点(客户端会自动发现其他节点) 'port' => 5672, 'user' => 'guest', 'password' => 'guest', 'queueName' => 'test_queue', 'driver' => \yii\queue\amqp\drivers\PhpAmqpLib::class, 'options' => [ 'connection_timeout' => 3, 'read_write_timeout' => 60, ], // 集群节点列表(可选,用于故障转移) 'nodes' => [ ['host' => 'rabbit2', 'port' => 5672], ['host' => 'rabbit3', 'port' => 5672], ], ], ],- 生产者/消费者使用方式与单机版一致,扩展会自动处理节点故障转移。
关键说明
- 集群高可用:主节点故障后,从节点自动接管队列,消息不丢失(依赖镜像队列);
- 负载均衡:生产者/消费者连接集群时,可随机选择节点,分摊压力;
- 账号同步:集群中只需在主节点创建账号,会自动同步到所有从节点。
RabbitMQ分布式集群验证(3步核心验证,覆盖高可用+数据一致性)
一、基础集群状态验证(确认节点已加入)
任意节点执行命令,查看集群节点列表:
rabbitmqctl cluster_status- 正常输出:包含
rabbit@rabbit1、rabbit@rabbit2、rabbit@rabbit3三个节点,且running_nodes均在线。 - 浏览器访问 RabbitMQ 管理界面(主节点IP:15672),左侧「Admin → Cluster」,可见3个节点状态均为「running」。
二、镜像队列数据同步验证(确认数据不丢失)
生产者发消息:用Yii2代码或命令行向
test_queue发送10条测试消息:# 命令行示例(需安装rabbitmqadmin工具) rabbitmqadmin publish exchange=amq.default routing_key=test_queue payload="test_msg_1" # 循环发送10条(Linux) for i in {1..10}; do rabbitmqadmin publish exchange=amq.default routing_key=test_queue payload="test_msg_$i"; done从节点查队列:在 rabbit2/rabbit3 执行命令,确认能看到消息数:
rabbitmqctl list_queues name messages_ready # 输出应包含 test_queue 10(消息数一致)
三、高可用故障转移验证(确认主节点挂掉仍可用)
手动停掉主节点(rabbit1):
rabbitmqctl stop_app # 仅停止应用,节点未退出(模拟故障) # 或彻底停止服务:rabbitmq-server stop验证从节点接管:
- 在 rabbit2 执行
rabbitmqctl list_queues,test_queue仍存在,消息数不变; - 用Yii2消费者在 rabbit2 监听,能正常接收之前发送的10条消息;
- 新发送消息到集群,从节点能正常接收并处理。
- 在 rabbit2 执行
- 恢复主节点:重启 rabbit1 后,执行
rabbitmqctl start_app,节点会自动重新加入集群,同步最新数据。
关键判断标准
- 集群状态:故障后仍有节点在线,队列不消失;
- 消息完整性:故障前后发送的消息无丢失、不重复;
- 服务连续性:消费者无需修改配置,能正常连接存活节点消费。
随机连接这里有问题,要是随机到的那个节点停止服务了,会导致连接不上。
核心解决方案:用 集群地址列表+重连机制 替代单点随机,失败自动切换存活节点,不占用CPU。
关键实现(PHP示例,适配RabbitMQ)
// 1. 配置集群所有节点地址(而非单个随机节点)
$clusterNodes = [
'amqp://user:pass@node1:5672',
'amqp://user:pass@node2:5672',
'amqp://user:pass@node3:5672'
];
// 2. 循环重试连接,失败自动换节点
$connection = null;
$maxRetries = 5; // 最大重试次数
$retryDelay = 2; // 重试间隔(秒,阻塞不占CPU)
for ($i = 0; $i < $maxRetries; $i++) {
$randomNode = $clusterNodes[array_rand($clusterNodes)]; // 随机选一个节点尝试
try {
$connection = new AMQPConnection($randomNode);
$connection->connect();
break; // 连接成功退出循环
} catch (AMQPConnectionException $e) {
sleep($retryDelay); // 阻塞等待,不耗CPU
continue; // 重试下一个节点
}
}核心保障(避免连接失败)
- 不单独随机单点,而是随机+集群列表重试,某节点挂了自动切其他存活节点;
sleep()实现阻塞等待,期间不占用CPU资源;- 配合RabbitMQ集群高可用(镜像队列),即使连接的节点切换,消息也不丢失。
需要我帮你整合 Yii2框架的完整连接代码,或添加 连接失败日志告警 功能吗?