Skip to main content

IM顶层设计


IM顶层设计

整体架构图

集群推送

http是无状态的,每次请求进行三次握手,然后发送消息获取响应,因此HTTP可以随意的做负载均衡

但是WebSocket是有状态的,当建立了 TCP链接之后,就会一直复用那个通道进行通信。

我们之前在用户登录的时候,保存到<uid,channel>的映射关系,但是是保存到一台JVM的内存里的,如果有集群,多台JVM运行,那么这种方式就失效了

Redis存储channel(❌)

用Redis中心化来 存储<uid,channel>的映射关系,使用的时候,再反序列化为channel使用

问题:channel是本地的socket连接,没法进行存储和 反序列化 ,它是有状态的

精准投放消息

我们本地依然要去维护<uid,channel>的映射关系

IM设计.svg

比如现在A想要给C发送一条消息,那么 就有以下步骤:

  1. A通过Websocket的 Channel将消息发送到了IM后台服务。
  2. IM服务将消息持久化 ,然后 通过Router,想要转发消息给C
  3. Router去Redis中找到C的服务器IP地址,Router会和所有的Websocket建立一条TCP链接,然后发送到对应的IP地址
  4. WebSocket收到A发送的消息后 ,通过本地的<uid,channel>的映射关系,然后找到C所在的Channel,调用channel,write将消息 发送给C

问题:

  1. 需要去频繁更新Redis维护用户和WebSocket的服务映射
  2. 连接数💥问题:websocket的瓶颈在于连接数,连接数满了,那么就会扩容WebSocket,使得可以支撑更多人同时在线,如果用户量非常大,那么就需要上千个WebSocket和 上千个Router

我们每个Router都会链接所有的WebSocket,总的连接数就是他们的乘积,1000*1000=1百万💥

链接爆炸.svg

这个问题也会出现在Dubbo中,如果Dubbo单实例集群达到1000以上,如何做?

分层路由思想:

分层路由.svg

我们在中间多加一层路由,就可以有效的减少非常多的连接数

  1. 消息发送开销:对于一个群聊,假如有1000个用户,那么就需要1000次推送,这个是必须的,但是我们的IM这边,通过Router进行转发,路由服务对每个用户都需要1000次的路由,每个用户都需要路由到每个WebSocket,需要把1条消息路由1000次,这部分是很大的开销
  2. 延迟叠加:即使使用多线程,也需要对所有的群成员进行消息的扇出(写扩散),会导致接收者接到的消息延迟叠加。
  3. 雪崩问题:如果WebSocket推送不及时,我们又是同步调用,那么WebSocket就会反向传导到Router服务 ,导致Router变慢 ,然后继续往回 传导到IM服务 ,导致全部变慢 ,导致所有的服务不可用,这个时候就需要MQ来做一个削峰填谷。

消息扇出:万人群聊,发一条消息,得在Router查一万次 这个用户在哪里,然后对一万个用户都进行一次消息的发送

集群广播

消息发送到IM服务后,IM服务会将消息写入消息队列,消息队列是一个广播消息,所有的WebSocket都去监听这个广播Topic,

所有的消息会推送到每一个WebSocket上,WebSocket拉取到消息后,它会来比对这个消息是否是自己需要的(会路由所有的WebSocket,WebSocket自己过滤)

对比之前的精准投递,如果一个群有一万个人,那么就会路由一万次,发送到这个WebSocket,再由WebSocket发送到具体用户

集群广播.svg

WebSocket消息过滤:

WebSocket消息过滤.svg

共享消息副本

在万人群聊中,大家收到的消息都是一样的,那么我们就可以共用一个消息体,在投递消息到MQ的时候,可以设置投递的消息为List<Long>uids,而不是拆分多次投递

投递公式:

精准投递:u*3份消息,u=群聊在线人数,3=消息传输的链路,固定值
集群广播:u+n份消息,u=群聊在线人数,n=websocket的集群数,不固定,但是极小。

缺点 :只适合大群聊,小群聊或者单聊占比多,就会失去优势

消息过滤优化

我们消息是被发送到WebSocket的地方进行UID的过滤,这时候就会有大量与本websocket无关的消息拉取过来,中间的网络IO就会 浪费了。

集群广播的最大问题就在于很多不属于我们websocket的消息推送,也会被我们读取,然后在本地过滤,这里浪费了网络IO,浪费了本地反序列化的CPU

解决办法:

  1. 将推送的uids放到header里面,拉取消息的时候,先在MQ的过滤器过滤消息,节省不必要的反序列化

    header长度有限,大群聊uid过多分批发送,或者群聊不过滤,考虑单聊场景

  2. 魔改RocketMQ,在broker里面过滤,节省 IO

百万直播间消息推送

问题背景:

  • 在抖音直播场景下,一个热门直播间可能同时有100万人在线,大量礼物和互动消息需要即时推送给所有观众。

方案设计:

  • 采用集群推送方式,将用户的 WebSocket 连接集中路由到固定的几百台服务器上,以减少扩散系数。
  • 对于超过1万人的热门直播间,进行升级处理:
    • 动态扩容 WebSocket 服务器集群至 50 台左右,并通过网关路由将用户连接重新分配到新集群。
    • 对于这种热门直播间,可以省略升级过程,直接分配到固定集群。
  • 消息推送采用广播方式,减少带宽开销:
    • 将同一直播间的所有消息(如点赞、弹幕等)广播到集群内的 WebSocket 服务器,再由这些服务器转发给对应的用户。
  • 对消息进行优先级隔离:
    • 将高优先级消息(如大礼物、主播发言)和低优先级消息(如普通弹幕)分开推送,避免相互影响。

其他优化:

  • 客户端和服务端都进行消息合并,减少实际推送量。
  • 依赖网关路由服务来确定消息推送的目标(普通直播间 vs 热门直播间)。

消息时序性

消息的顺序性:

每条消息发送来,服务端收到的消息是并发的,那么就可能出现发送方和接收方顺序不一致的情况

解决办法:我们可以让客户端加上时间戳,展示的时候,再根据时间戳进行排序

微信的设计:单聊消息 MsgSeq 字段的作用及说明:该字段在发送消息时由用户自行指定,该值可以重复,非后台生成,非全局唯一。与群聊消息的 MsgSeq 字段不同,群聊消息的 MsgSeq 由后台生成,每个群都维护一个 MsgSeq,从1开始严格递增。单聊消息历史记录对同一个会话的消息先以时间戳排序,同秒内的消息再以 MsgSeq 排序。

参考上面的设计,我们可以给消息设置一个本地的自增id,发送消息的时候带上。排序整体以服务器的时间为准,相同秒内的排序以自增id为准。

在群聊场景下,每个成员的客户端时间不一样 ,没法 统一排序 ,只能用服务端时间排序。

服务端排序:对于单表来说,可以按照主键ID进行排序,也可以按照时间戳,但是时间戳也会有精度问题,到毫秒左右足够了。

消息的唯一性:

游标翻页的时候,需要保证顺序性和唯一性,如果单纯用时间戳,毫秒内的消息没办法区分和排序,得额外有其他字段排序,不如统一用有序的ID作为游标

综上:

单聊可以用客户端seq来保证多条消息的顺序

用消息ID可以满足顺序性和唯一性要求

消息ID设计

我们希望消息的ID具有唯一,有序的特点

唯一可以用分布式ID实现,有序如何保证?

全局递增

单表:主键保证了递增

但是消息大了,就必须要分库分表,采用分布式ID,但是分布式ID是趋势递增,而不是单调递增

严格单调递增会有严重的单点竞争,难以实现

会话级别递增

我们想要递增就是为了满足消息的顺序性,那么只要满足一个群组里面消息ID唯一且有序就足够了。

如何保证会话级别递增?分库分表以会话ID分表,相同会话必定在同一表中。但是这种方案很难进行扩容

所以还是采用分布式ID保证会话级别的单调递增

收信箱递增

会话级别递增适合读扩散场景,所有人都去拉取消息列表,去会话的消息表拉取

收信箱的递增适合写扩散场景,所有人都有自己的收信箱,维护自己的时间线。收信箱的单调递增和uid相关

消息可靠性ACK

发出去的消息要保证能够被对方收到,主要有两个场景

  • 发送方发送消息到服务端,服务端入库成功返回ACK
  • 服务端推送消息到接收方,接收方返回ACK

发送可靠性保证

发送消息不能用TCP的ACK来保证,因为请求到达服务端的时候,TCP的ACK已经确认了,但是我们的业务逻辑中也可能失败,因此只有业务返回的ACK才可以保证可靠性

我们使用HTTP发送消息,通过返回的表示,判断消息是否发送成功

推送可靠性保证

服务端消息入库后,推送给接收方,要保证接收方可以收到消息,推送给每个人的ACK都需要确认,然后修改这条消息的状态持久化道数据库。

重试机制:如果信箱没有收到ACK,说明消息没有到达接收方,那么就会进行重试,可以使用定时任务来完成,但是如果用户不在线,不可能一直去推送,所以我们的定时任务只会去拉取内存中的ACK队列进行推送,并且只推送在线消息。如果内存队列ACK满了,那么我们可以使用LRU算法,淘汰老的消息

离线推送:对于不在线的用户,在下一次连接上的时候,再推送消息。

如果用户很长时间没上线,没有ACK的消息过多,一次性推送所有消息,容易产生问题,可以分批推送

万人群聊中,1条消息要写1万人的收信箱,写扩散系数💥,每条消息都要写一份消息id到每个人的信箱

消息重复-幂等性

最终一致性持久化重试幂等

在分布式场景下一般为了达到最终一致性,失败了我们也需要进行重试,重试就有幂等问题

发送幂等性设计

发送消息的时候,如果网络有问题,那么底层会帮我们重试,如何保证发送的幂等性,不重复发送?靠的是发送端生成一个唯一标识,后端根据这个唯一标识做幂等,但是我们后端只看1s内的去重,发送端也限制1s内不成功就放弃

接收幂等性设计

靠消息的ID来保证

会话级别的消息,判断会话id+消息id

推拉结合

推模式

服务端主动推送给前端,后台维护定时任务,定时推送未接收到ack的消息。

拉模式

短轮询和长轮询。前端主动询问后端是否有新消息。主要用在拉取历史消息列表

推拉结合

推主要是保证及时性。而拉主要是保证最终一致性

推模式(服务端主动推送)虽然能保证消息及时性,但需要处理推送失败的情况,服务端还需要启动定时任务确保接收方确认(ACK),整个方案比较复杂,对服务器也有较大消耗。

实际上推送失败概率并不高,如果客户端定期主动拉取消息,就可以达到最终一致性,无需服务端的复杂处理。

因此采用推拉结合的方式:

  • 推模式主要用于保证消息及时性
  • 拉模式主要用于保证消息最终一致性和可靠性
  1. 推送模式下,推送消息可以是无状态的,只是为了触发客户端拉取,实现很简单。

  2. 客户端拉取消息时,只需带上token即可,服务端查询未确认的消息并全部返回。

  3. 客户端接收到消息后可批量确认(ACK),服务端收到确认后将消息标记为已读。

  4. 为了优化查询性能,服务端可以建立"用户ID + 未确认"的联合索引。

  5. 通过推拉结合,可以实现无状态请求,减轻服务端压力,并提高客户端拉取效率。但依然没有解决大群聊下写扩散的问题。

多端同步-客户端游标

推拉结合的方案在多端同步的场景下会失效。因为在单端场景下,消息的状态只有一个ack(是否已读)标记。但在多端场景下,需要为每个端独立维护ack状态,变得复杂。

为了解决这个问题,提出了一种新的方案:

  • 服务端不再维护ack状态,而是由客户端维护自己的"游标"(已读到的最后一条消息的ID)
  • 客户端在拉取消息时,需要带上自己的游标
  • 服务端根据客户端的游标,查询出大于该游标的所有新消息返回

这种方案的关键是:

  • 消息ID需要是全局递增的,这样服务端只需维护一个游标
  • 如果消息ID是收信箱级别递增的,也可以实现类似效果,只是客户端需要维护每个会话的游标

这种基于客户端游标的方案可以很好地解决多端同步的问题,避免了服务端维护多端ack状态的复杂性。

由于我们的聊天室不需要保存历史消息,可以采用更简单的方案:登录每个端都加载最新的消息,后续通过拉取新消息的方式实现同步。

群聊设计

群聊设计,分为群聊和单聊(一对一)单聊也就是两个人在一个房间聊天,可以抽象出来

单聊群聊.svg

数据库设计:https://drawsql.app/teams/-328/diagrams/-6open in new window

image-20240521145455286

消息已读未读

我们想要消息有以下信息显示:多少人已读,未读、已读列表、未读列表

我们还是需要维护每个人对消息的ACK,每条消息都要投递到用户的收信箱,并且需要记录已读未读的状态,用做ACK统计

流程如下:

  1. 有人发了条消息,群消息入库,并插入到群成员A和B的收信箱,此时都是未读状态
  2. 推送服务通知A和B拉取新消息。
  3. A正好在线,拉取了新消息,拉到消息后,返回ACK。
  4. 推送服务标记A对新消息的ACK。
  5. ack的消息同时需要推送给消息的发送者,同步更新
  6. 发消息的人想看看消息的相关情况。数据库查询

收信箱写扩散💥

万人群聊,发送一条消息,就要写入到1万人的收信箱,每人发一条消息,就有1亿的消息量要存储

解决办法:用户收信箱不记录ACK,只记录阅读的最新时间线,这样只和群成员数有关,每个人一行阅读记录,然后在统计有多少人阅读的时候,只要去看这个人的阅读最新时间线是否>消息的发送时间

消息阅读推送

有人阅读消息后,消息阅读数+1,如何显示给用户?

最简单的办法,有人阅读就+1,然后推送给前端,但是这样频率太高了。可以采用服务端定时任务的方式。定时任务的间隔,就成为了合并的时间窗口。

实现的时候,我们只要去做最近一个频率的消息阅读量,不需要为每个消息都去统计已读未读数量。通过前端的定时请求,让消息的已读数更新,避免了性能的无效浪费

左侧会话列表

会话列表排序 :左侧的会话列表要按照发送消息的最新时间来进行排序,并且是倒叙,最新时间的要放在上面

消息未读数:会话列表要可以显示未读的消息数,这个可以直接按照自己最新读取消息的时间去查询,但是如果很多年没上线,消息数实在太多了,这个时候,我们可以加个限制,最多显示99条未读数。也就是在SQL语句的最后加个limit 100

热点群聊-读写扩散

万人群聊,每条消息都需要写入用户的收信箱,也就是会话表的active_time字段,更新了这个字段,最新消息的会话才会排在前面。

这个更新时间要记,不扩散写到用户的收信箱,而是单独写到热点信箱,用户读取的时候,综合读取:自己的信箱+热点信箱,将两个合并在一起

消息多类型设计

消息支持多种格式,主要用两个字段来 解决:

  • type:消息是什么类型
  • extra:消息详情

数据库设计

DROP TABLE IF EXISTS `room`;
CREATE TABLE `room` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
  `type` int(11) NOT NULL COMMENT '房间类型 1群聊 2单聊',
  `hot_flag` int(11) DEFAULT '0' COMMENT '是否全员展示 0否 1是',
  `active_time` datetime(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) COMMENT '群最后消息的更新时间(热点群不需要写扩散,只更新这里)',
  `last_msg_id` bigint(20) DEFAULT NULL COMMENT '会话中的最后一条消息id',
  `ext_json` json DEFAULT NULL COMMENT '额外信息(根据不同类型房间有不同存储的东西)',
  `create_time` datetime(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) COMMENT '创建时间',
  `update_time` datetime(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3) COMMENT '修改时间',
  PRIMARY KEY (`id`) USING BTREE,
  KEY `idx_create_time` (`create_time`) USING BTREE,
  KEY `idx_update_time` (`update_time`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='房间表';
DROP TABLE IF EXISTS `room_friend`;
CREATE TABLE `room_friend` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
  `room_id` bigint(20) NOT NULL COMMENT '房间id',
  `uid1` bigint(20) NOT NULL COMMENT 'uid1(更小的uid)',
  `uid2` bigint(20) NOT NULL COMMENT 'uid2(更大的uid)',
  `room_key` varchar(64) COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '房间key由两个uid拼接,先做排序uid1_uid2',
  `status` int(11) NOT NULL COMMENT '房间状态 0正常 1禁用(删好友了禁用)',
  `create_time` datetime(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) COMMENT '创建时间',
  `update_time` datetime(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3) COMMENT '修改时间',
  PRIMARY KEY (`id`) USING BTREE,
  UNIQUE KEY `room_key` (`room_key`) USING BTREE,
  KEY `idx_room_id` (`room_id`) USING BTREE,
  KEY `idx_create_time` (`create_time`) USING BTREE,
  KEY `idx_update_time` (`update_time`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='单聊房间表';
DROP TABLE IF EXISTS `room_group`;
CREATE TABLE `room_group` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
  `room_id` bigint(20) NOT NULL COMMENT '房间id',
  `name` varchar(16) COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '群名称',
  `avatar` varchar(256) COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '群头像',
  `ext_json` json DEFAULT NULL COMMENT '额外信息(根据不同类型房间有不同存储的东西)',
  `delete_status` int(1) NOT NULL DEFAULT '0' COMMENT '逻辑删除(0-正常,1-删除)',
  `create_time` datetime(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) COMMENT '创建时间',
  `update_time` datetime(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3) COMMENT '修改时间',
  PRIMARY KEY (`id`) USING BTREE,
  KEY `idx_room_id` (`room_id`) USING BTREE,
  KEY `idx_create_time` (`create_time`) USING BTREE,
  KEY `idx_update_time` (`update_time`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='群聊房间表';
DROP TABLE IF EXISTS `group_member`;
CREATE TABLE `group_member` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
  `group_id` bigint(20) NOT NULL COMMENT '群主id',
  `uid` bigint(20) NOT NULL COMMENT '成员uid',
  `role` int(11) NOT NULL COMMENT '成员角色 1群主 2管理员 3普通成员',
  `create_time` datetime(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) COMMENT '创建时间',
  `update_time` datetime(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3) COMMENT '修改时间',
  PRIMARY KEY (`id`) USING BTREE,
  KEY `idx_group_id_role` (`group_id`,`role`) USING BTREE,
  KEY `idx_create_time` (`create_time`) USING BTREE,
  KEY `idx_update_time` (`update_time`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='群成员表';
DROP TABLE IF EXISTS `contact`;
CREATE TABLE `contact` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id',
  `uid` bigint(20) NOT NULL COMMENT 'uid',
  `room_id` bigint(20) NOT NULL COMMENT '房间id',
  `read_time` datetime(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) COMMENT '阅读到的时间',
  `active_time` datetime(3) DEFAULT NULL COMMENT '会话内消息最后更新的时间(只有普通会话需要维护,全员会话不需要维护)',
  `last_msg_id` bigint(20) DEFAULT NULL COMMENT '会话最新消息id',
  `create_time` datetime(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) COMMENT '创建时间',
  `update_time` datetime(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3) COMMENT '修改时间',
  PRIMARY KEY (`id`) USING BTREE,
  UNIQUE KEY `uniq_uid_room_id` (`uid`,`room_id`) USING BTREE,
  KEY `idx_room_id_read_time` (`room_id`,`read_time`) USING BTREE,
  KEY `idx_create_time` (`create_time`) USING BTREE,
  KEY `idx_update_time` (`update_time`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='会话列表';
DROP TABLE IF EXISTS `message`;
CREATE TABLE `message`  (
  `id` bigint(20) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT 'id',
  `room_id` bigint(20) NOT NULL COMMENT '会话表id',
  `from_uid` bigint(20) NOT NULL COMMENT '消息发送者uid',
  `content` varchar(1024) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '消息内容',
  `reply_msg_id` bigint(20) NULL DEFAULT NULL COMMENT '回复的消息内容',
  `status` int(11) NOT NULL COMMENT '消息状态 0正常 1删除',
  `gap_count` int(11) NULL DEFAULT NULL COMMENT '与回复的消息间隔多少条',
  `type` int(11) NULL DEFAULT 1 COMMENT '消息类型 1正常文本 2.撤回消息',
  `extra` json DEFAULT NULL COMMENT '扩展信息',
  `create_time` datetime(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) COMMENT '创建时间',
  `update_time` datetime(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3) COMMENT '修改时间',
  PRIMARY KEY (`id`) USING BTREE,
  INDEX `idx_room_id`(`room_id`) USING BTREE,
  INDEX `idx_from_uid`(`from_uid`) USING BTREE,
  INDEX `idx_create_time`(`create_time`) USING BTREE,
  INDEX `idx_update_time`(`update_time`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_unicode_ci COMMENT = '消息表' ROW_FORMAT = Dynamic;
INSERT INTO `user` (`id`, `name`, `avatar`, `sex`, `open_id`, `last_opt_time`, `ip_info`, `item_id`, `status`, `create_time`, `update_time`) VALUES (1, '系统消息', 'http://mms1.baidu.com/it/u=1979830414,2984779047&fm=253&app=138&f=JPEG&fmt=auto&q=75?w=500&h=500', NULL, '0', '2023-07-01 11:58:24.605', NULL, NULL, 0, '2023-07-01 11:58:24.605', '2023-07-01 12:02:56.900');
insert INTO `room`(`id`,`type`,`hot_flag`) values (1,1,1);
insert INTO `room_group`(`id`,`room_id`,`name`,`avatar`) values (1,1,'抹茶全员群','https://mallchat.cn/assets/logo-e81cd252.jpeg');

image-20240521181913467