Skip to main content

RocketMQ集群推送消息


集群推送消息实现

RocketMQ部署

Mac M1直接使用Docker的RocketMQ会有问题,因为没有arm版本的,需要自己编排容器镜像docker-compose.yml

version: "3.0"
services:
  namesrv:
    image: candice0630/rocketmq:5.0.0-alpine
    container_name: rocketmqNameServer
    #restart: always
    volumes:
      #挂载路径,冒号左边为服务器本地路径,冒号右边为容器内部路径
      - /Users/houyunfei/tools/docker-volumes/rocketmq/nameServer/logs:/root/logs
      - /Users/houyunfei/tools/docker-volumes/rocketmq/nameServer/store:/root/store
      # - /Users/houyunfei/tools/docker-volumes/rocketmq/mq/bin/runserver.sh:/home/rocketmq/rocketmq-5.0.0/bin/runserver.sh
      # - /Users/houyunfei/tools/docker-volumes/rocketmq/mq/bin/runbroker.sh:/home/rocketmq/rocketmq-5.0.0/bin/runbroker.sh
      # - /Users/houyunfei/tools/docker-volumes/rocketmq/mq/bin/tools.sh:/home/rocketmq/rocketmq-5.0.0/bin/tools.sh

    environment:
      - MEM_XMS=500m
      - MEM_XMX=500m
      - MEM_XMN=256m #MAX_POSSIBLE_HEAP: 100000000
    command:
      #服务启动
      sh mqnamesrv
    #platform: linux/amd64
    ports:
      - "9876:9876"

  rocketmqBroker:
    image: candice0630/rocketmq:5.0.0-alpine
    container_name: rocketmqBroker
    #restart: always
    volumes:
      #挂载路径,冒号左边为服务器本地路径,冒号右边为容器内部路径
      - /Users/houyunfei/tools/docker-volumes/rocketmq/broker/logs:/root/logs
      - /Users/houyunfei/tools/docker-volumes/rocketmq/broker/store:/root/store
      - /Users/houyunfei/tools/docker-volumes/rocketmq/broker.conf:/home/rocketmq/rocketmq-5.0.0/conf/broker.conf
      # - /Users/houyunfei/tools/docker-volumes/rocketmq/mq/bin/runserver.sh:/home/rocketmq/rocketmq-5.0.0/bin/runserver.sh
      # - /Users/houyunfei/tools/docker-volumes/rocketmq/mq/bin/runbroker.sh:/home/rocketmq/rocketmq-5.0.0/bin/runbroker.sh
      # - /Users/houyunfei/tools/docker-volumes/rocketmq/mq/bin/tools.sh:/home/rocketmq/rocketmq-5.0.0/bin/tools.sh

    depends_on:
      - namesrv

    environment:
      - MEM_XMS=500m
      - MEM_XMX=500m
      - MEM_XMN=256m
      - NAMESRV_ADDR:物理ip地址:9876
      - BROKER_ID=0
      - BROKER_ROLE=ASYNC_MASTER
      - FLUSH_DISK_TYPE=ASYNC_FLUSH #MAX_POSSIBLE_HEAP: 200000000
    command:
      # 服务启动
      sh mqbroker -n namesrv:9876 -c /home/rocketmq/rocketmq-5.0.0/conf/broker.conf
    #platform: linux/amd64
    ports:
      - "10909:10909"
      - "10911:10911"
      - "10912:10912"

  rocketmqConsole:
    image: candice0630/rocketmq-console-ng:2.0
    container_name: rocketmqConsole
    depends_on:
      - namesrv
    environment:
      JAVA_OPTS: "-Drocketmq.namesrv.addr=namesrv:9876 -Drocketmq.config.isVIPChannel=false -Dcom.rocketmq.sendMessageWithVIPChannel=false"
    #platform: linux/amd64
    ports:
      - 19876:8080

挂在目录需要设置为自己的,也就是代码中左侧目录部分,还需要配置一个配置文件

# 所属集群名称,如果节点较多可以配置多个
brokerClusterName = DefaultCluster
#broker名称,master和slave使用相同的名称,表明他们的主从关系
brokerName = broker-a
#0表示Master,大于0表示不同的slave
brokerId = 0
#表示几点做消息删除动作,默认是凌晨4点
deleteWhen = 04
#在磁盘上保留消息的时长,单位是小时
fileReservedTime = 48
#有三个值:SYNC_MASTER,ASYNC_MASTER,SLAVE;同步和异步表示Master和Slave之间同步数据的机制;
brokerRole = ASYNC_MASTER
#是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭 !!!这里仔细看是 false,false,false
autoCreateTopicEnable=true
#刷盘策略,取值为:ASYNC_FLUSH,SYNC_FLUSH表示同步刷盘和异步刷盘;SYNC_FLUSH消息写入磁盘后才返回成功状态,ASYNC_FLUSH不需要;
flushDiskType = ASYNC_FLUSH
#  设置broker节点所在服务器的ip地址、物理ip,不能用127.0.0.1、localhost、docker内网ip
brokerIP1 = 192.168.111.182
# 磁盘使用达到95%之后,生产者再写入消息会报错 CODE: 14 DESC: service not available now, maybe disk full
diskMaxUsedSpaceRatio=95

上面的代码中,倒数第三行中的ip需要设置为自己本地的ip,否则可能会失败。具体可以ifconfig查看

然后启动容器:

docker-compose up -d 

image-20240529142734139

如果想要停止:该命令会停止并删除由 docker-compose up 命令启动的容器(只会关闭当前目录启动的,也就是docker-compose.yml文件所在目录)。

docker-compose down

RocketMQ推送消息

我们之前发送消息的逻辑中,发送消息的逻辑实在监听器里:

/**
 * 发送消息
 */
@Override
@Transactional
public Long  sendMsg(ChatMessageReq request, Long uid) {
    check(request, uid);
    // todo 保存消息
    AbstractMsgHandler<?> msgHandler = MsgHandlerFactory.getStrategyNoNull(request.getMsgType());
    Long msgId = msgHandler.checkAndSaveMsg(request, uid);
    // 发布消息发送事件
    applicationEventPublisher.publishEvent(new MessageSendEvent(this, msgId));
    return msgId;
}

接着我们看监听器里的实现:

@TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT, classes = MessageSendEvent.class, fallbackExecution = true)
public void messageRoute(MessageSendEvent event) {
    Long msgId = event.getMsgId();
    // todo RocketMQ 进行消息路由
    mqProducer.sendSecureMsg(MQConstant.SEND_MSG_TOPIC, new MsgSendMessageDTO(msgId), msgId);
}

注意这里phase = TransactionPhase.BEFORE_COMMIT表示是在事务提交之前执行。

我们使用RocketMQ进行消息路由,发送到SEND_MSG_TOPIC节点

下面是MsgSendConsumer

/**
 * Description: 发送消息更新房间收信箱,并同步给房间成员信箱
 */
@RocketMQMessageListener(consumerGroup = MQConstant.SEND_MSG_GROUP, topic = MQConstant.SEND_MSG_TOPIC)
@Component
public class MsgSendConsumer implements RocketMQListener<MsgSendMessageDTO> {
    @Override
    public void onMessage(MsgSendMessageDTO dto) {
        Message message = messageDao.getById(dto.getMsgId());
        Room room = roomCache.get(message.getRoomId());
        ChatMessageResp msgResp = chatService.getMsgResp(message, null);
        //所有房间更新房间最新消息
        roomService.refreshActiveTime(room.getId(), message.getId(), message.getCreateTime());
        roomCache.delete(room.getId());
        if (room.isHotRoom()) {//热门群聊推送所有在线的人
            //更新热门群聊时间-redis
            hotRoomCache.refreshActiveTime(room.getId(), message.getCreateTime());
            //推送所有人
            pushService.sendPushMsg(WSAdapter.buildMsgSend(msgResp));
        } else {
            List<Long> memberUidList = new ArrayList<>();
            if (Objects.equals(room.getType(), RoomTypeEnum.GROUP.getType())) {//普通群聊推送所有群成员
                memberUidList = groupMemberCache.getMemberUidList(room.getId());
            } else if (Objects.equals(room.getType(), RoomTypeEnum.FRIEND.getType())) {//单聊对象
                //对单人推送
                RoomFriend roomFriend = roomFriendService.getByRoomId(room.getId());
                memberUidList = Arrays.asList(roomFriend.getUid1(), roomFriend.getUid2());
            }
            //更新所有群成员的会话时间
            contactService.refreshOrCreateActiveTime(room.getId(), memberUidList, message.getId(), message.getCreateTime());
            //推送房间成员
            pushService.sendPushMsg(WSAdapter.buildMsgSend(msgResp), memberUidList);
        }
    }
}

这个是一个 RocketMQ 消息监听器,用于处理消息发送更新房间收信箱并同步给房间成员收信箱的任务。以下是对代码的解析:

  1. 该类 MsgSendConsumer 实现了 RocketMQListener 接口,监听 MQConstant.SEND_MSG_TOPIC 主题下的消息。
  2. 调用 roomService.refreshActiveTime 方法更新房间最新消息时间。目的是让左侧的消息列表进行排序
  3. 判断该房间是否为热门群聊:
    • 如果是热门群聊,则调用 hotRoomCache.refreshActiveTime 方法更新热门群聊最近活跃时间,并调用 pushService.sendPushMsg 方法推送消息给所有在线用户。
    • 如果不是热门群聊,则根据房间类型获取相应的成员 uid 列表,调用 contactService.refreshOrCreateActiveTime 方法更新成员最近活跃时间,并调用 pushService.sendPushMsg 方法推送消息给房间成员。

接下来我们去监听这个topic的消息,并通过WebSocketService推送给前端

@RocketMQMessageListener(topic = MQConstant.PUSH_TOPIC, consumerGroup = MQConstant.PUSH_GROUP, messageModel = MessageModel.BROADCASTING)
@Component
public class PushConsumer implements RocketMQListener<PushMessageDTO> {
    @Autowired
    private WebSocketService webSocketService;

    @Override
    public void onMessage(PushMessageDTO message) {
        WSPushTypeEnum wsPushTypeEnum = WSPushTypeEnum.of(message.getPushType());
        switch (wsPushTypeEnum) {
            case USER:
                message.getUidList().forEach(uid -> {
                    webSocketService.sendToUid(message.getWsBaseMsg(), uid);
                });
                break;
            case ALL:
                webSocketService.sendToAllOnline(message.getWsBaseMsg(), null);
                break;
        }
    }
}

这个代码是一个 RocketMQ 消息监听器,用于处理推送消息的任务。以下是对代码的解析:

  1. 该类 PushConsumer 实现了 RocketMQListener 接口,监听 MQConstant.PUSH_TOPIC 主题下的消息。注意这里要将messageModel = MessageModel.BROADCASTING,因为原本的模式是CLUSTERING
  2. 根据不同的推送类型,执行不同的推送逻辑:
    • 如果是 USER 类型,则遍历 uidList 中的用户 ID,调用 webSocketService.sendToUid 方法分别推送消息给每个用户。
    • 如果是 ALL 类型,则调用 webSocketService.sendToAllOnline 方法推送消息给所有在线用户。