IM聊天系统为什么需要做消息幂等?如何使用Redis以及Lua脚本做消息幂等【第12期】

news/2024/2/21 10:10:29

0前言

消息收发模型
在这里插入图片描述

在这里插入图片描述
第一张图是一个时序图,第二张图是一个标清楚步骤的流程图,更加清晰。消息的插入环节主要在2步。save部分。主要也是对这个部分就行消息幂等的操作。

前情提要:使用Redis发布 token 以及lua脚本来共同完成消息的幂等

目前已经写的文章有。并且有对应视频版本。
git项目地址 【IM即时通信系统(企聊聊)】点击可跳转
sprinboot单体项目升级成springcloud项目 【第一期】
前端项目技术选型以及页面展示【第二期】
分布式权限 shiro + jwt + redis【第三期】
给为服务添加运维模块 统一管理【第四期】
微服务数据库模块【第五期】
netty与mq在项目中的使用(第六期)】
分布式websocket即时通信(IM)系统构建指南【第七期】
分布式websocket即时通信(IM)系统保证消息可靠性【第八期】
分布式websocket IM聊天系统相关问题问答【第九期】
什么?websocket也有权限!这个应该怎么做?【第十期】
分布式ID是什么,以美团Leaf为例改造融入自己项目【第十一期】

1.我开源项目IM重复的原因

  • IM系统中有三个常见的指标。消息可靠(不丢消息)。就是消息不能重复(不重复)。保证消息的时序性(不乱序)。这三个指标非常重要。
  • 消息可靠主要通过报文协议等操作来完成。前面视频有一期讲过报文协议。目前主要采取上述方式去保证消息的可靠性。然后再保证消息可靠性的过程中,有一些需要重试的操作。可能会导致数据库多次插入。需要我们来保证一下消息的幂等。通俗的讲就是保证消息的不重复。
1.客户端会重复的发送消息

客户端是一个timer的机制。客户端a发送给b消息的时候,在0.5秒没有收到b的ack的时候会重发消息,重发三次还没有收到ack视为重发失败
//使用timer机制 检测队列里面是否存在ack,如果存在,则超时重发以及限制次数

伪代码如下。用户在线并且不是重试消息的时候,添加到队列里面。

 if (res.params.online == true && res.params.isretry == "false") {state.queue.offer(state.tempSendMsg);////使用timer机制 检测队列里面是否存在ack,如果存在,则超时重发以及限制次数const result = await retry(fetchDataFn, 3, 1000, res.params.msgid);//三次之后消息还没有发送成功 提示消息发送失败if (result == false) {Toast("消息发送失败,请重新发送");}} else {console.log("【IM日志】 接受消息者没有登录或者是重试消息 ");}

进行重试的js代码

//重试的一个方法
export function retry(fn, maxRetry, timeout,msg) {return new Promise(async (resolve, reject) => {let retryCount = 0;let timer;const run = async () => {try {const result = await fn(msg);resolve(result);} catch (err) {if (retryCount < maxRetry) {retryCount++;clearTimeout(timer);timer = setTimeout(run, timeout);} else {reject(err);}}};timer = setTimeout(run, timeout);});
}
2.mq出现超时等的重试机制

参考上述逻辑图,消息落库的时候异步分发到了mq上面。rocketmq有超时重试机制,会自动重试。导致消息被多次消费。(明天补充个图片例子)

2.如何解决的幂等

为什么要解决幂等,什么情况下出现幂等(明天写);
使用redis做的幂等。redis做幂等其实有两种思路。

一种思路是我目前正在使用的防重 Token 令牌思路。另一种是下游传递唯一请求编号。主要说明防重token令牌的思路。其实差别就是一个redis里面的键被删除了。另一个没有删除。
防重token令牌
在这里插入图片描述
下游传递唯一请求编号如下
在这里插入图片描述

当客户端请求分布式id的时候将其存入redis。也就是获取一个唯一id。当进行消费消息的时候。先判断唯一id在不在。在的话删除redis中的唯一id并且进行业务操作。不再的话就不能进行业务操作来实现的幂等。
流程代码如下所示:

1.获取token以及存储token到redis中;
在loginUser 用户中心服务中

    @RequestMapping(value = "/api/segment/get/{key}")public GenericResponse getSegmentId(@PathVariable("key") String key) {String leafno = get(key, segmentService.getId(key));SetOperations<String, String> opsForSet = stringRedisTemplate.opsForSet();Long add = opsForSet.add(RedisPrefix.LEAF_PERFIX, leafno);//往集合添加元素/*** 设置一个10分钟的有效期*/
//        stringRedisTemplate.expire(RedisPrefix.LEAF_PERFIX,600, TimeUnit.SECONDS);return GenericResponse.response(ServiceError.NORMAL,leafno );}

我们使用了美团的分布式id来生成分布式id。
2.前台发送消息的时候携带上唯一id

const sendMsg2 = async () => {const { content, toUser } = state;const no = await getLeaf();let data = {// 1代表着私聊的意思type: 1,params: {msgid: no.content,toMessageId: toUser.openid,message: content,fileType: 0,isretry: false,},};if (state.current == 2) {data = {type: 9,params: {toMessageId: state.groupId,message: content,fileType: 0,},};}console.log(data);state.tempSendMsg = data;state.socketServe.send(data);state.recesiveAllMsg.push({type: "self",content: content,});state.content = "";};

这个是发送消息的操作
const no = await getLeaf();这行代码请求后端接口。然后构造消息体。
3.聊天服务(Netty)收到前台消息后 mq异步发送消息

 public void sendMessage(String topic ,ChannelHandlerContext ctx, String message, String toUser, String state, Boolean type, String msgid,String token) {MqMessage messageMQ = new MqMessage();messageMQ.setFromId(SessionUtils.getUser(ctx.channel()).getOpenid());messageMQ.setToId(toUser);messageMQ.setType(state);messageMQ.setInfoContent(message);messageMQ.setTime(new DateTime().toString());messageMQ.setState(type);messageMQ.setMsgid(msgid);messageMQ.setToken(token);messageDispatchService.sendForSave(topic,messageMQ);}

发送给保存的主题
4.业务模块(frist)消费消息

 @Overridepublic void onMessage(String o) {String mqmsg =o;log.info("RocketMqConsumerService=====消费消息:"+mqmsg);//消息内容MqMessage message1 = JSON.parseObject(mqmsg, MqMessage.class);try {ChatDto chatDto = new ChatDto();chatDto.setContent(message1.getInfoContent());chatDto.setToOpenid(message1.getToId());chatDto.setGroup(message1.getState());//将msgid存储进去,方便后续进行updatechatDto.setMsgId(message1.getMsgid());SetOperations<String, String> opsForSet = stringRedisTemplate.opsForSet();
//            Boolean member = opsForSet.isMember(RedisPrefix.LEAF_PERFIX, message1.getMsgid());if( executeOperation(message1.getMsgid())){
//                Long remove = opsForSet.remove(RedisPrefix.LEAF_PERFIX, message1.getMsgid());//删除元if (message1.getState() !=null){if(message1.getType().equals("onLine")){/*** 用户在线需要去推送一下*/yanUserChatService.saveChat(message1.getFromId(),chatDto,1);SendRequest send = buildSendRequest(message1);//设置过滤应该有的tokenRoseFeignConfig.token.set(message1.getToken());nettyMqFeign.send(send);}else {/*** 离线消息直接落库就链路就结束了*/yanUserChatService.saveChat(message1.getFromId(),chatDto,0);}}}}catch (Exception e){//失败的话需要把redis的这个消息还回去.SetOperations<String, String> opsForSet = stringRedisTemplate.opsForSet();Long add = opsForSet.add(RedisPrefix.LEAF_PERFIX,  message1.getMsgid());//往集合添加元素log.error("consumeMsg 消费mq消息失败.",e);// 处理失败,抛出异常,消息会根据重试策略稍后重新消费throw new RuntimeException("处理消息时发生错误,消息将被重新消费。");}}

lua表达式
目前使用redis的类型是set,键是yan_leaf

    /*** 幂等的方法,判断list存不存在。存在的话直接删除,下次进来就不存在了。* @param token* @return*/public boolean executeOperation(String token) {// Lua脚本String script = "if redis.call('sismember', KEYS[1], ARGV[1]) == 1 then return redis.call('srem', KEYS[1], ARGV[1]) else return 0 end";DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(script, Long.class);// 执行Lua脚本Long result = stringRedisTemplate.execute(redisScript, Collections.singletonList(RedisPrefix.LEAF_PERFIX), token);// 根据Lua脚本执行结果判断操作是否执行return result != null && result > 0;}

通过这个lua防止并发请求进来导致幂等失败


https://www.xjx100.cn/news/3270984.html

相关文章

使用深度学习进行“序列到序列”回归

目录 下载数据 准备训练数据 定义网络架构 训练网络 测试网络 此示例说明如何使用深度学习预测发动机的剩余使用寿命 (RUL)。 要训练深度神经网络以根据时间序列数据或序列数据预测数值,可以使用长短期记忆 (LSTM) 网络。 此示例使用 [1] 中所述的涡轮风扇发动机…

23种设计模式之抽象工厂模式

目录 什么是抽象工厂模式 基本结构 基本实现步骤 实现代码&#xff08;有注释&#xff09; 应用场景 简单工厂、工厂方法、抽象工厂的区别 什么是抽象工厂模式 抽象工厂模式也是一种创建型设计模式&#xff0c;提供了一系列相关或相互依赖对象的接口&#xff0c;而无需…

GO 的 Web 开发系列(五)—— 使用 Swagger 生成一份好看的接口文档

经过前面的文章&#xff0c;已经完成了 Web 系统基础功能的搭建&#xff0c;也实现了 API 接口、HTML 模板渲染等功能。接下来要做的就是使用 Swagger 工具&#xff0c;为这些 Api 接口生成一份好看的接口文档。 一、写注释 注释是 Swagger 的灵魂&#xff0c;Swagger 是通过…

SQLyog安装配置(注册码)连接MySQL

下载资源 博主给你打包好了安装包&#xff0c;在网盘里&#xff0c;只有几Mb&#xff0c;防止你下载到钓鱼软件 快说谢谢博主&#xff08;然后心甘情愿的点个赞~&#x1f60a;&#xff09; SQLyog.zip 安装流程 ①下载好压缩包后并解压 ②打开文件夹&#xff0c;双击安装包 ③…

使用R语言建立回归模型并分割训练集和测试集

通过简单的回归实例&#xff0c;可以说明数据分割为训练集和测试集的必要性。以下先建立示例数据: set.seed(123) #设置随机种子 x <- rnorm(100, 2, 1) # 生成100个正态分布的随机数&#xff0c;均值为2&#xff0c;标准差为1 y exp(x) rnorm(5, 0, 2) # 生成一个新的变…

Redis中缓存问题

缓存预热 Redis缓存预热是一项关键任务&#xff0c;可帮助提升应用程序的性能和响应速度。在高流量的应用程序中&#xff0c;Redis缓存预热可以加速数据查询和读取&#xff0c;从而改善用户体验。本文将介绍一种快速、稳定的Redis缓存预热方案&#xff0c;并提供相应代码实现。…

Go+:一种简单而强大的编程语言

Go是一种简单而强大的编程语言&#xff0c;它是在Go语言之上构建的&#xff0c;旨在提供更加强大、灵活和易于使用的编程体验。Go与Go语言共享大部分语法和语义&#xff0c;因此Go开发人员可以很快上手Go&#xff0c;同时也可以使用Go来编写更加简洁和高效的代码。在本文中&…

设计模式-策略模式 Strategy

策略模式 1) 原理和实现1、策略的定义2、策略的创建3、策略的使用 该模式最常见的应用场景是&#xff0c;利用它来避免冗长的 if-else 或 switch 分支判断。不过&#xff0c;它的作用还不止如此。它也可以像模板模式那样&#xff0c;提供框架的扩展点等等。 1) 原理和实现 策…