RabbitMQ 支持分布式部署(核心是 集群 + 镜像队列/联邦队列),默认单节点是单机,需手动配置实现分布式高可用和负载均衡。

关键分布式能力:

  • 集群:多节点共享队列元数据,实现负载分担;
  • 镜像队列:队列数据同步到多个节点,避免单点故障;
  • 联邦队列/ shovel:跨机房、跨集群同步消息,支持广域分布式。

一、RabbitMQ 分布式集群(3节点最简配置)

前提:3台Linux服务器(示例IP:192.168.1.101/102/103),已安装相同版本RabbitMQ+Erlang

核心目标:搭建“1主2从”集群,镜像队列同步数据,实现高可用

步骤1:配置主机名与免密登录(所有节点执行)

  1. 编辑 /etc/hosts,添加3节点映射:

    192.168.1.101 rabbit1
    192.168.1.102 rabbit2
    192.168.1.103 rabbit3
  2. 免密登录(让节点间可无密码通信):

    # 在rabbit1生成密钥(一路回车)
    ssh-keygen -t rsa
    # 复制密钥到另外两台节点
    ssh-copy-id root@rabbit2
    ssh-copy-id root@rabbit3

步骤2:同步Erlang Cookie(集群通信核心)

RabbitMQ集群依赖Erlang Cookie认证,所有节点必须用相同Cookie:

  1. 在rabbit1(主节点)查看Cookie:

    cat /var/lib/rabbitmq/.erlang.cookie
  2. 在rabbit2/rabbit3(从节点)覆盖Cookie(先停止RabbitMQ):

    rabbitmqctl stop_app
    echo "主节点的Cookie内容" > /var/lib/rabbitmq/.erlang.cookie
    chmod 400 /var/lib/rabbitmq/.erlang.cookie # 权限必须是400

步骤3:组建集群

  1. 启动所有节点的RabbitMQ服务:

    rabbitmq-server start -detached # 后台启动
  2. 在rabbit2/rabbit3执行(加入rabbit1集群):

    # 停止应用(保留节点)
    rabbitmqctl stop_app
    # 重置节点(首次加入需执行)
    rabbitmqctl reset
    # 加入集群(--ram 表示内存节点,--disc 表示磁盘节点,主节点默认磁盘)
    rabbitmqctl join_cluster --ram rabbit@rabbit1
    # 启动应用
    rabbitmqctl start_app
  3. 验证集群状态(任意节点执行):

    rabbitmqctl cluster_status
    # 输出包含3个节点信息,说明集群搭建成功

步骤4:配置镜像队列(数据同步,高可用)

默认集群仅共享元数据,队列数据只存主节点,需配置镜像策略让数据同步到所有节点:

  1. 执行镜像策略命令(任意节点):

    # 策略名:ha-all,匹配所有队列(^$),同步到所有节点(ha-mode=all)
    rabbitmqctl set_policy ha-all "^$" '{"ha-mode":"all", "ha-sync-mode":"automatic"}'
  2. 验证:创建队列后,在任意节点执行 rabbitmqctl list_queues,所有节点都会显示该队列,主节点故障后,从节点自动接管。

二、Yii2 对接分布式RabbitMQ(适配集群)

方案1:直接使用 php-amqplib(手动指定集群节点)

<?php
namespace app\console\controllers;

use yii\console\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class RabbitmqClusterController extends Controller
{
    public function actionListen()
    {
        // 集群节点列表(可填所有节点,客户端自动负载均衡)
        $nodes = [
            ['host' => 'rabbit1', 'port' => 5672],
            ['host' => 'rabbit2', 'port' => 5672],
            ['host' => 'rabbit3', 'port' => 5672],
        ];

        $connection = null;
        $retryDelay = 5;

        while (true) {
            try {
                // 随机选择一个节点连接(简单负载均衡)
                $node = $nodes[array_rand($nodes)];
                $connection = new AMQPStreamConnection(
                    $node['host'],
                    $node['port'],
                    'guest', // 集群统一账号(需在主节点创建)
                    'guest'
                );

                $channel = $connection->channel();
                $channel->queue_declare('test_queue', false, true, false, false);
                $channel->basic_qos(null, 1, null);

                $callback = function ($msg) {
                    echo date('Y-m-d H:i:s') . " 集群节点接收消息:{$msg->body}\n";
                    $msg->ack();
                };

                $channel->basic_consume('test_queue', '', false, false, false, false, $callback);
                while ($channel->is_consuming()) {
                    $channel->wait();
                }

            } catch (\Exception $e) {
                echo date('Y-m-d H:i:s') . " 连接失败:{$e->getMessage()},{$retryDelay}秒后重试...\n";
                if ($connection) {
                    try {
                        $connection->close();
                    } catch (\Exception $e) {}
                }
                sleep($retryDelay);
            }
        }
    }
}

方案2:使用 yii2-queue 扩展(配置集群节点)

  1. 修改 config/main.php 队列配置:

    'components' => [
     'queue' => [
         'class' => \yii\queue\amqp\Queue::class,
         'host' => 'rabbit1', // 主节点(客户端会自动发现其他节点)
         'port' => 5672,
         'user' => 'guest',
         'password' => 'guest',
         'queueName' => 'test_queue',
         'driver' => \yii\queue\amqp\drivers\PhpAmqpLib::class,
         'options' => [
             'connection_timeout' => 3,
             'read_write_timeout' => 60,
         ],
         // 集群节点列表(可选,用于故障转移)
         'nodes' => [
             ['host' => 'rabbit2', 'port' => 5672],
             ['host' => 'rabbit3', 'port' => 5672],
         ],
     ],
    ],
  2. 生产者/消费者使用方式与单机版一致,扩展会自动处理节点故障转移。

关键说明

  1. 集群高可用:主节点故障后,从节点自动接管队列,消息不丢失(依赖镜像队列);
  2. 负载均衡:生产者/消费者连接集群时,可随机选择节点,分摊压力;
  3. 账号同步:集群中只需在主节点创建账号,会自动同步到所有从节点。

RabbitMQ分布式集群验证(3步核心验证,覆盖高可用+数据一致性)

一、基础集群状态验证(确认节点已加入)

  1. 任意节点执行命令,查看集群节点列表:

    rabbitmqctl cluster_status
  2. 正常输出:包含 rabbit@rabbit1rabbit@rabbit2rabbit@rabbit3 三个节点,且 running_nodes 均在线。
  3. 浏览器访问 RabbitMQ 管理界面(主节点IP:15672),左侧「Admin → Cluster」,可见3个节点状态均为「running」。

二、镜像队列数据同步验证(确认数据不丢失)

  1. 生产者发消息:用Yii2代码或命令行向 test_queue 发送10条测试消息:

    # 命令行示例(需安装rabbitmqadmin工具)
    rabbitmqadmin publish exchange=amq.default routing_key=test_queue payload="test_msg_1"
    # 循环发送10条(Linux)
    for i in {1..10}; do rabbitmqadmin publish exchange=amq.default routing_key=test_queue payload="test_msg_$i"; done
  2. 从节点查队列:在 rabbit2/rabbit3 执行命令,确认能看到消息数:

    rabbitmqctl list_queues name messages_ready
    # 输出应包含 test_queue  10(消息数一致)

三、高可用故障转移验证(确认主节点挂掉仍可用)

  1. 手动停掉主节点(rabbit1):

    rabbitmqctl stop_app # 仅停止应用,节点未退出(模拟故障)
    # 或彻底停止服务:rabbitmq-server stop
  2. 验证从节点接管:

    • 在 rabbit2 执行 rabbitmqctl list_queuestest_queue 仍存在,消息数不变;
    • 用Yii2消费者在 rabbit2 监听,能正常接收之前发送的10条消息;
    • 新发送消息到集群,从节点能正常接收并处理。
  3. 恢复主节点:重启 rabbit1 后,执行 rabbitmqctl start_app,节点会自动重新加入集群,同步最新数据。

关键判断标准

  • 集群状态:故障后仍有节点在线,队列不消失;
  • 消息完整性:故障前后发送的消息无丢失、不重复;
  • 服务连续性:消费者无需修改配置,能正常连接存活节点消费。

随机连接这里有问题,要是随机到的那个节点停止服务了,会导致连接不上。

核心解决方案:用 集群地址列表+重连机制 替代单点随机,失败自动切换存活节点,不占用CPU。

关键实现(PHP示例,适配RabbitMQ)

// 1. 配置集群所有节点地址(而非单个随机节点)
$clusterNodes = [
    'amqp://user:pass@node1:5672',
    'amqp://user:pass@node2:5672',
    'amqp://user:pass@node3:5672'
];

// 2. 循环重试连接,失败自动换节点
$connection = null;
$maxRetries = 5; // 最大重试次数
$retryDelay = 2; // 重试间隔(秒,阻塞不占CPU)

for ($i = 0; $i < $maxRetries; $i++) {
    $randomNode = $clusterNodes[array_rand($clusterNodes)]; // 随机选一个节点尝试
    try {
        $connection = new AMQPConnection($randomNode);
        $connection->connect();
        break; // 连接成功退出循环
    } catch (AMQPConnectionException $e) {
        sleep($retryDelay); // 阻塞等待,不耗CPU
        continue; // 重试下一个节点
    }
}

核心保障(避免连接失败)

  1. 不单独随机单点,而是随机+集群列表重试,某节点挂了自动切其他存活节点;
  2. sleep() 实现阻塞等待,期间不占用CPU资源;
  3. 配合RabbitMQ集群高可用(镜像队列),即使连接的节点切换,消息也不丢失。

需要我帮你整合 Yii2框架的完整连接代码,或添加 连接失败日志告警 功能吗?

一、Yii2 消费者断连自动重连(增强版代码)

核心优化:网络波动/ RabbitMQ 重启后,自动重连队列,无需手动重启程序,兼容直接使用 php-amqplib 和 yii2-queue 扩展两种场景。

方案1:直接使用 php-amqplib(独立脚本,console 控制器)

<?php
namespace app\console\controllers;

use yii\console\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
use PhpAmqpLib\Exception\AMQPChannelClosedException;

class RabbitmqConsumerController extends Controller
{
    // RabbitMQ 配置
    private $config = [
        'host' => 'localhost',
        'port' => 5672,
        'user' => 'guest',
        'password' => 'guest',
        'queue' => 'test_queue',
        'retry_delay' => 5, // 断连后重试间隔(秒)
    ];

    public function actionListen()
    {
        echo "消费者启动,开始监听队列({$this->config['queue']})...\n";
        echo "断连会自动重试,按 Ctrl+C 退出\n";

        while (true) { // 外层重连循环
            $connection = null;
            $channel = null;

            try {
                // 1. 建立连接
                $connection = new AMQPStreamConnection(
                    $this->config['host'],
                    $this->config['port'],
                    $this->config['user'],
                    $this->config['password']
                );
                $channel = $connection->channel();

                // 2. 声明队列(与生产者一致,持久化)
                $channel->queue_declare(
                    $this->config['queue'],
                    false, // passive
                    true,  // durable
                    false, // exclusive
                    false  // auto_delete
                );

                // 3. 配置 QoS(每次推1条,防止过载)
                $channel->basic_qos(null, 1, null);

                // 4. 消息处理回调
                $callback = function ($msg) {
                    try {
                        // 业务逻辑:处理消息(替换成你的实际需求)
                        echo date('Y-m-d H:i:s') . " 收到消息:{$msg->body}\n";
                        // 模拟处理耗时(可选)
                        // sleep(1);
                        $msg->ack(); // 处理成功,手动确认
                    } catch (\Exception $e) {
                        echo date('Y-m-d H:i:s') . " 消息处理失败:{$e->getMessage()}\n";
                        $msg->nack(false, false); // 处理失败,不重新入队(可根据需求调整为 true 重新入队)
                    }
                };

                // 5. 注册订阅,开始监听
                $channel->basic_consume(
                    $this->config['queue'],
                    '', // consumer_tag
                    false, // no_local
                    false, // no_ack(关闭自动确认)
                    false, // exclusive
                    false, // nowait
                    $callback
                );

                // 6. 阻塞监听(有消息触发回调,无消息阻塞,不耗CPU)
                while ($channel->is_consuming()) {
                    $channel->wait();
                }

            } catch (AMQPConnectionClosedException | AMQPChannelClosedException $e) {
                // 连接/通道断开,触发重连
                echo date('Y-m-d H:i:s') . " 连接断开:{$e->getMessage()},{$this->config['retry_delay']}秒后重试...\n";
            } catch (\Exception $e) {
                // 其他异常
                echo date('Y-m-d H:i:s') . " 未知错误:{$e->getMessage()},{$this->config['retry_delay']}秒后重试...\n";
            } finally {
                // 关闭连接(防止资源泄露)
                if ($channel) {
                    try {
                        $channel->close();
                    } catch (\Exception $e) {}
                }
                if ($connection) {
                    try {
                        $connection->close();
                    } catch (\Exception $e) {}
                }

                // 重试间隔(断连后休眠,避免频繁重试)
                sleep($this->config['retry_delay']);
            }
        }
    }
}

方案2:基于 yii2-queue 扩展(简化版,利用框架封装)

yii2-queue 已内置基础重连机制,只需配置重试参数,配合后台运行即可:

  1. 修改 config/main.php 队列配置:

    'components' => [
     'queue' => [
         'class' => \yii\queue\amqp\Queue::class,
         'host' => 'localhost',
         'port' => 5672,
         'user' => 'guest',
         'password' => 'guest',
         'queueName' => 'test_queue',
         'driver' => \yii\queue\amqp\ drivers\PhpAmqpLib::class,
         'attempts' => 3, // 消息处理失败重试次数
         'retryDelay' => 5, // 重试间隔(秒)
     ],
    ],
  2. 消息处理类(app\jobs\TestJob.php):

    <?php
    namespace app\jobs;
    
    use yii\queue\JobInterface;
    use yii\base\BaseObject;
    
    class TestJob extends BaseObject implements JobInterface
    {
     public $message; // 接收的消息内容
    
     public function execute($queue)
     {
         // 业务逻辑:处理消息
         echo date('Y-m-d H:i:s') . " 处理消息:{$this->message}\n";
     }
    }
  3. 生产者发送消息(控制器中):

    Yii::$app->queue->push(new \app\jobs\TestJob([
     'message' => 'Yii2-queue 发送的消息',
    ]));

二、Linux 后台运行完整脚本(确保长期稳定)

方案1:直接使用 php-amqplib 时的后台运行脚本

  1. 创建启动脚本(/usr/local/rabbitmq-consumer/start.sh):

    #!/bin/bash
    # 定义变量
    CONSOLE_PATH="/var/www/your-yii2-project/yii" # Yii2 控制台入口文件路径
    CONTROLLER="rabbitmq-consumer/listen" # 控制器+动作
    LOG_FILE="/var/log/rabbitmq-consumer.log" # 日志文件路径
    PID_FILE="/var/run/rabbitmq-consumer.pid" # PID文件(用于停止进程)
    
    # 检查是否已运行
    if [ -f "$PID_FILE" ]; then
     PID=$(cat "$PID_FILE")
     if ps -p $PID > /dev/null; then
         echo "消费者已在运行(PID: $PID)"
         exit 1
     else
         # PID文件存在但进程不存在,删除PID文件
         rm -f "$PID_FILE"
     fi
    fi
    
    # 启动消费者,后台运行,日志写入文件
    nohup php $CONSOLE_PATH $CONTROLLER >> $LOG_FILE 2>&1 &
    # 记录PID
    echo $! > "$PID_FILE"
    echo "消费者启动成功(PID: $(cat $PID_FILE)),日志路径:$LOG_FILE"
  2. 创建停止脚本(/usr/local/rabbitmq-consumer/stop.sh):

    #!/bin/bash
    PID_FILE="/var/run/rabbitmq-consumer.pid"
    
    if [ ! -f "$PID_FILE" ]; then
     echo "消费者未运行"
     exit 1
    fi
    
    PID=$(cat "$PID_FILE")
    # 停止进程
    kill -9 $PID
    # 删除PID文件
    rm -f "$PID_FILE"
    echo "消费者已停止(PID: $PID)"
  3. 脚本授权与使用:

    # 授权脚本执行权限
    chmod +x /usr/local/rabbitmq-consumer/start.sh
    chmod +x /usr/local/rabbitmq-consumer/stop.sh
    
    # 启动消费者
    /usr/local/rabbitmq-consumer/start.sh
    
    # 停止消费者(如需)
    /usr/local/rabbitmq-consumer/stop.sh
    
    # 查看日志
    tail -f /var/log/rabbitmq-consumer.log

方案2:使用 yii2-queue 扩展时的后台运行脚本

  1. 创建启动脚本(/usr/local/yii2-queue/start.sh):

    #!/bin/bash
    CONSOLE_PATH="/var/www/your-yii2-project/yii"
    COMMAND="queue/listen"
    LOG_FILE="/var/log/yii2-queue.log"
    PID_FILE="/var/run/yii2-queue.pid"
    
    # 检查是否已运行
    if [ -f "$PID_FILE" ]; then
     PID=$(cat "$PID_FILE")
     if ps -p $PID > /dev/null; then
         echo "yii2-queue 已在运行(PID: $PID)"
         exit 1
     else
         rm -f "$PID_FILE"
     fi
    fi
    
    # 启动队列监听
    nohup php $CONSOLE_PATH $COMMAND >> $LOG_FILE 2>&1 &
    echo $! > "$PID_FILE"
    echo "yii2-queue 启动成功(PID: $(cat $PID_FILE)),日志路径:$LOG_FILE"
  2. 停止脚本(/usr/local/yii2-queue/stop.sh):

    #!/bin/bash
    PID_FILE="/var/run/yii2-queue.pid"
    
    if [ ! -f "$PID_FILE" ]; then
     echo "yii2-queue 未运行"
     exit 1
    fi
    
    PID=$(cat "$PID_FILE")
    kill -9 $PID
    rm -f "$PID_FILE"
    echo "yii2-queue 已停止(PID: $PID)"
  3. 授权与使用:

    chmod +x /usr/local/yii2-queue/start.sh
    chmod +x /usr/local/yii2-queue/stop.sh
    
    # 启动
    /usr/local/yii2-queue/start.sh
    
    # 查看日志
    tail -f /var/log/yii2-queue.log

关键说明

  1. 断连重连:外层 while (true) 循环处理重连,断连后休眠 5 秒重试,避免频繁占用资源。
  2. 后台运行:nohup 命令让程序脱离终端,即使关闭 SSH 连接也能继续运行,日志写入文件方便排查问题。
  3. 无需手动加休眠:$channel->wait() 是阻塞等待,无消息时不耗 CPU,比手动 sleep 更高效。

有!PHP 及 Yii2 都有成熟的 RabbitMQ SDK,核心依赖 php-amqplib(PHP 生态最常用的 RabbitMQ 客户端),Yii2 可通过扩展或直接集成该库使用。

一、核心 SDK:php-amqplib

二、Yii2 集成方式(2种方案)

方案1:直接使用 php-amqplib(简单灵活)

1. Yii2 生产者(发消息)
<?php
namespace app\controllers;

use Yii;
use yii\web\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class RabbitmqProducerController extends Controller
{
    public function actionSend()
    {
        // 连接RabbitMQ(本地默认配置,远程改host为服务器IP)
        $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        $channel = $connection->channel();

        // 声明队列(持久化)
        $channel->queue_declare('test_queue', false, true, false, false);

        // 消息内容
        $messageBody = 'Yii2 发送的 RabbitMQ 消息';
        $message = new AMQPMessage(
            $messageBody,
            ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT] // 消息持久化
        );

        // 发送消息
        $channel->basic_publish($message, '', 'test_queue');
        echo "消息发送成功:{$messageBody}\n";

        // 关闭连接
        $channel->close();
        $connection->close();

        return '发送完成';
    }
}
2. Yii2 消费者(订阅+主动推送,长期运行)
<?php
namespace app\console\controllers;

use yii\console\Controller;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class RabbitmqConsumerController extends Controller
{
    public function actionListen()
    {
        $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        $channel = $connection->channel();

        // 与生产者队列一致(持久化)
        $channel->queue_declare('test_queue', false, true, false, false);
        // 预取数:每次只推1条,处理完再推
        $channel->basic_qos(null, 1, null);

        // 消息回调(RabbitMQ主动推送时触发)
        $callback = function ($msg) {
            echo "收到推送消息:{$msg->body}\n";
            $msg->ack(); // 手动确认消息
        };

        // 注册订阅(关闭自动确认)
        $channel->basic_consume('test_queue', '', false, false, false, false, $callback);

        echo "等待接收消息...(按Ctrl+C退出)\n";
        // 客户端内部死循环,阻塞监听(无需手动写循环/休眠)
        while ($channel->is_consuming()) {
            $channel->wait(); // 阻塞等待消息,无消息时不占CPU
        }

        $channel->close();
        $connection->close();
    }
}

方案2:Yii2 专用扩展(更贴合框架)

推荐使用 vladimir-yuldashev/yii2-queue 扩展,支持 RabbitMQ 作为驱动,整合了 Yii2 的日志、依赖注入等特性:

  1. 安装扩展:

    composer require vladimir-yuldashev/yii2-queue
  2. 配置 config/main.php

    'components' => [
        'queue' => [
            'class' => \yii\queue\amqp\Queue::class,
            'host' => 'localhost',
            'port' => 5672,
            'user' => 'guest',
            'password' => 'guest',
            'queueName' => 'test_queue',
            'driver' => \yii\queue\amqp\ drivers\PhpAmqpLib::class,
        ],
    ],
  3. 消费者运行(Yii2 命令行):

    yii queue/listen

关键说明

  • 消费者逻辑:$channel->wait() 或扩展的 queue/listen 会启动阻塞循环,无需手动写死循环或加休眠,无消息时几乎不耗CPU。
  • 长期运行:Linux/macOS 用 nohup yii queue/listen & 后台运行,Windows 用任务计划程序设开机自启。

RabbitMQ 安装使用极简指导(Windows/macOS/Linux通用)

核心逻辑:先装Erlang(RabbitMQ依赖)→ 装RabbitMQ → 启动服务 → 可视化管理 → 简单收发消息。

一、安装准备(必做)

1. 安装Erlang

  • 原因:RabbitMQ基于Erlang开发,必须先装且版本匹配(官网有版本对照表,新手直接装最新版即可)。
  • 下载:Erlang官网,按系统选安装包(Windows双击安装,mac用brew install erlang,Linux用apt/yum install erlang)。
  • 验证:终端输入 erl -v,显示版本号即成功。

2. 安装RabbitMQ

  • 下载:RabbitMQ官网,选对应系统安装包(Windows/mac/Linux均有一键安装包)。
  • 关键步骤:

    • Windows:安装后默认路径 C:\Program Files\RabbitMQ Server\,需配置环境变量(将sbin目录加入PATH)。
    • macOS:brew install rabbitmq,安装后路径 /usr/local/Cellar/rabbitmq/
    • Linux(Ubuntu):sudo apt-get install rabbitmq-server,自动启动服务。

二、启动与基础配置

1. 启动RabbitMQ服务

  • Windows:以管理员身份打开终端,输入 rabbitmq-server start(启动),rabbitmq-server stop(停止)。
  • macOS/Linux:sudo rabbitmq-server start(启动),sudo rabbitmqctl stop(停止)。
  • 验证:终端输入 rabbitmqctl status,无报错即服务正常。

2. 启用管理插件(可视化界面,必开)

  • 终端输入 rabbitmq-plugins enable rabbitmq_management,启用后访问 http://localhost:15672
  • 默认账号密码:guest/guest(仅本地访问可用,远程访问需新建账号)。

3. 新建远程访问账号(可选,跨设备连接用)

  • 终端执行3条命令:

    1. rabbitmqctl add_user 用户名 密码(创建账号)
    2. rabbitmqctl set_permissions -p / 用户名 ".*" ".*" ".*"(授予所有权限)
    3. rabbitmqctl set_user_tags 用户名 administrator(设为管理员角色)

三、快速使用(以Python为例,实现消息收发)

1. 安装Python客户端库

  • 终端输入 pip install pika(RabbitMQ的Python驱动)。

2. 步骤1:创建生产者(往队列发消息)

import pika

# 连接RabbitMQ服务(本地默认地址+端口,远程需改host为RabbitMQ服务器IP)
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明队列(不存在则创建, durable=True 设为持久化,防止服务重启消息丢失)
channel.queue_declare(queue='test_queue', durable=True)

# 发送消息(body为消息内容,需转字节)
channel.basic_publish(
    exchange='',
    routing_key='test_queue',  # 队列名,与声明一致
    body='这是一条测试消息!',
    properties=pika.BasicProperties(delivery_mode=2)  # 消息持久化
)

print("消息发送成功!")
connection.close()  # 关闭连接

3. 步骤2:创建消费者(订阅队列,接收推送消息)

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='test_queue', durable=True)  # 与生产者队列名一致

# 消息回调函数(RabbitMQ主动推送消息时触发)
def callback(ch, method, properties, body):
    print(f"收到推送消息:{body.decode('utf-8')}")
    ch.basic_ack(delivery_tag=method.delivery_tag)  # 手动确认消息已处理

# 注册订阅(auto_ack=False 关闭自动确认,需手动ack)
channel.basic_consume(queue='test_queue', on_message_callback=callback, auto_ack=False)
# 设置预取数(每次只推1条,处理完再推下一条,防止过载)
channel.basic_qos(prefetch_count=1)

print("等待接收消息...(按Ctrl+C退出)")
channel.start_consuming()  # 持续监听队列,接收推送

4. 测试流程

  1. 启动RabbitMQ服务(rabbitmq-server start)。
  2. 运行消费者程序(开始监听队列)。
  3. 运行生产者程序(发送消息)。
  4. 消费者终端会立即收到RabbitMQ主动推送的消息。

需要我帮你排查 安装失败问题,或生成 Java/Go语言的收发消息代码,亦或是配置 RabbitMQ集群(高可用) 吗?

MySQL 计算经纬度半径内最近商家,核心用 Haversine 公式(球面距离计算),结合索引优化提升查询效率,以下是完整实现方案:

一、核心公式(Haversine)

计算两点(用户经纬度 vs 商家经纬度)的球面距离(单位:公里):

6371 * 2 * ASIN(
    SQRT(
        POWER(SIN((用户纬度 - 商家纬度) * PI() / 180 / 2), 2) +
        COS(用户纬度 * PI() / 180) * COS(商家纬度 * PI() / 180) *
        POWER(SIN((用户经度 - 商家经度) * PI() / 180 / 2), 2)
    )
) AS distance
  • 6371 是地球平均半径(公里),需米则换为 6371000。
  • 经纬度需先转弧度( * PI() / 180 ),避免计算误差。

二、完整查询示例(返回半径 5 公里内最近商家)

  1. 表结构假设(商家表  merchant )

    CREATE TABLE merchant (
     id INT PRIMARY KEY AUTO_INCREMENT,
     name VARCHAR(50) COMMENT '商家名称',
     lat DECIMAL(10,6) NOT NULL COMMENT '纬度(如:39.9042)',
     lng DECIMAL(10,6) NOT NULL COMMENT '经度(如:116.4074)',
     address VARCHAR(200) COMMENT '地址'
    );
  2. 查询 SQL(用户经纬度:39.9042, 116.4074)

    SELECT 
     id, name, lat, lng, address,
     -- 计算距离(公里)
     6371 * 2 * ASIN(
         SQRT(
             POWER(SIN((39.9042 - lat) * PI() / 180 / 2), 2) +
             COS(39.9042 * PI() / 180) * COS(lat * PI() / 180) *
             POWER(SIN((116.4074 - lng) * PI() / 180 / 2), 2)
         )
     ) AS distance
    FROM merchant
    -- 筛选 5 公里内(先粗筛,提升效率)
    WHERE 
     lat BETWEEN 39.9042 - 5 / 111.12 AND 39.9042 + 5 / 111.12  -- 纬度每度约 111.12 公里
     AND lng BETWEEN 116.4074 - 5 / (111.12 * COS(39.9042 * PI() / 180)) AND 116.4074 + 5 / (111.12 * COS(39.9042 * PI() / 180))
    -- 按距离排序(最近在前)
    ORDER BY distance ASC
    -- 限制返回数量
    LIMIT 10;

三、性能优化(大数据量必备)

  1. 添加空间索引(MySQL 5.7+ 支持)

    -- 方式 1:创建经纬度联合索引(基础优化)
    CREATE INDEX idx_merchant_lat_lng ON merchant(lat, lng);
    
    -- 方式 2:创建空间索引(高效,支持 GIS 函数)
    ALTER TABLE merchant ADD COLUMN location POINT NOT NULL COMMENT '经纬度空间点' AFTER lng;
    -- 更新空间点数据(lat 纬度,lng 经度,注意顺序)
    UPDATE merchant SET location = POINT(lng, lat);
    -- 创建空间索引
    CREATE SPATIAL INDEX idx_merchant_location ON merchant(location);
  2. 空间索引查询(更高效)

    SELECT 
     id, name, lat, lng, address,
     -- ST_Distance_Sphere 直接计算球面距离(米),MySQL 8.0+ 支持
     ST_Distance_Sphere(POINT(116.4074, 39.9042), location) / 1000 AS distance  -- 转公里
    FROM merchant
    -- 筛选 5 公里内(空间范围查询)
    WHERE ST_DWithin(
     location,
     POINT(116.4074, 39.9042),
     5000  -- 半径(米)
    )
    ORDER BY distance ASC
    LIMIT 10;

四、关键注意事项

1. 经纬度格式:确保存储为  DECIMAL(10,6) (保留 6 位小数,精度约 10 厘米),避免用  FLOAT (精度丢失)。
2. 粗筛逻辑:未用空间索引时,先通过  BETWEEN  筛选大致经纬度范围,减少计算量。
3. 版本支持: ST_Distance_Sphere  是 MySQL 8.0+ 函数,5.7 及以下用 Haversine 公式。

五、应用层整合(如 PHP)

将用户经纬度(前端获取)传入 SQL,示例(Laravel):

$userLat = 39.9042; // 用户纬度(前端定位获取)
$userLng = 116.4074; // 用户经度
$radius = 5; // 搜索半径(公里)

$merchants = DB::table('merchant')
    ->selectRaw("
        id, name, lat, lng, address,
        6371 * 2 * ASIN(
            SQRT(
                POWER(SIN(($userLat - lat) * PI() / 180 / 2), 2) +
                COS($userLat * PI() / 180) * COS(lat * PI() / 180) *
                POWER(SIN(($userLng - lng) * PI() / 180 / 2), 2)
            )
        ) AS distance
    ")
    ->whereRaw("
        lat BETWEEN ? - ? / 111.12 AND ? + ? / 111.12
        AND lng BETWEEN ? - ? / (111.12 * COS(? * PI() / 180)) AND ? + ? / (111.12 * COS(? * PI() / 180))
    ", [$userLat, $radius, $userLat, $radius, $userLng, $radius, $userLat, $userLng, $radius, $userLat])
    ->orderBy('distance')
    ->limit(10)
    ->get();