rabbit MQ的延迟队列处理模型示例(基于SpringBoot)

news/2024/6/23 19:56:46

在这里插入图片描述

说明:
生产者P 往交换机X(type=direct)会发送两种消息:一、routingKey=XA的消息(消息存活周期10s),被队列QA队列绑定入列;一、routingKey=XB的消息(消息存活周期40s),被队列Q B队列绑定入列。QA、QB两个队列消息在失活(变成死信消息)以routingKey=YD发送到交换机Y(type=direct)。队列QD用routingKey绑定交换机Y消息入列。消费者监听处理QD的消息。
这个设计模型达到了消息从生产者到消费者延迟10s、40s不等的延迟队列处理。

这里用SpringBoot maven:<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

在封装工具类中 其中【交换机】【队列】【绑定器】 可直接使用工具类,这里对案例图所用到组件器声明注解出来。
在这里插入图片描述

框内的组件和关系 可以在SpringBoot配置类中做出如下的组件声明与关系绑定:

package com.esint.configs;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** TTL延迟队列配置文件类**/
@Configuration
public class TtlQueueConfig {////普通交换机的名称 Xpublic static final String X_EXCHANGE = "X";//死信交换机名称 Ypublic static final String Y_DEAD_LETTER_EXCHANGE = "Y";//普通队列QA QBpublic static final String QUEUE_A = "QA";public static final String QUEUE_B = "QB";//死信队列名称QDpublic static final String DEAD_LETTER_QUEUE = "QD";////声明X_EXCHANGE@Bean("xExchange")public DirectExchange xExchange(){return new DirectExchange(X_EXCHANGE);}//声明死信交换Y_DEAD_LETTER_EXCHANGE@Bean("yExchange")public DirectExchange yExchange(){return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);}//声明队列 QA@Bean("queueA")public Queue queueA(){Map<String, Object> arguments = new HashMap<>(3);//设置死信交换机arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//设置死信RoutingKey (死信后充当了消费者的发送路由)arguments.put("x-dead-letter-routing-key","YD");//消息过期时间arguments.put("x-message-ttl",10000);return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();}//声明队列 QB@Bean("queueB")public Queue queueB(){Map<String, Object> arguments = new HashMap<>(3);//设置死信交换机arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//设置死信RoutingKey (死信后充当了消费者的发送路由)arguments.put("x-dead-letter-routing-key","YD");//消息过期时间arguments.put("x-message-ttl",40000);return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();}//声明死信队列QD@Bean("queueD")public Queue queueD(){return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();}//捆绑//绑定队列QA与交换机X_EXCHANGE@Beanpublic Binding queueABingXExchange(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueA).to(xExchange).with("XA");}//绑定队列QB与交换机X_EXCHANGE@Beanpublic Binding queueBBingXExchange(@Qualifier("queueB") Queue queueB,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueB).to(xExchange).with("XB");}//绑定队列QD与交换机Y_Exchange@Beanpublic Binding queueDBingYExchange(@Qualifier("queueD") Queue queueD,@Qualifier("yExchange")DirectExchange yExchange){return BindingBuilder.bind(queueD).to(yExchange).with("YD");}
}
生产者与交换机X:这里方便测试 我们把生产者放在一个Controller逻辑里
package com.esint.controller;//发送延迟消息import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.Date;@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMesController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/senMsg/{message}")public void sendMes(@PathVariable String message){log.info("当前时间:{},发送一条消息给两个TTL队列:{}",new Date().toString(),message);rabbitTemplate.convertAndSend("X","XA","消息来自ttl为10s的队列:"+message);rabbitTemplate.convertAndSend("X","XB","消息来自ttl为40s的队列:"+message);}
}
消费者与死信队列创建一个监听者示例:
package com.esint.consumer;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Date;/*** 队列TTL消费者*/@Slf4j
@Component
public class DeadLetterQueueConsumer {//接受消息@RabbitListener(queues = "QD")public void receiveD(Message message, Channel channel) throws Exception{String msg = new String(message.getBody());log.info("当前时间:{},收到私信队列的消息:{}",new Date().toString(),msg);}
}

rabbitmq的配置文件:

spring:rabbitmq:host: *.*.*.*port: 5672username: guestpassword: guest
接下来可以启动SpringBoot: 启动后,配置方法类会把交换机/队列/绑定器初始化配置

队列:
在这里插入图片描述

交换机:
在这里插入图片描述
点开详细后,也能考到他们之间的绑定关系:

在这里插入图片描述

在这里插入图片描述

消息发布测试:

生产者发送消息:

浏览器:
http://127.0.0.1:19092/ttl/senMsg/nice

通过生产者发送:nice

当前时间:Tue Nov 21 14:50:05 CST 2023,发送一条消息给两个TTL队列:nice

消费者在10s后和40秒分别收到了消息:
在这里插入图片描述


https://www.xjx100.cn/news/3091635.html

相关文章

Unity - 实现模型动态伸长缩短,贴图不变形(材质球参数篇)

思路为修改模型材质球的Tiling参数&#xff0c;根据与自身localScale的值得到合适的比例&#xff0c;再修改Tiling值 var mat target.transform.GetComponent<Renderer>().material; var oriValue mat.mainTextureScale;//沿着Y轴伸缩 oriValue.y 1 * target.transfo…

centos更换yum源

1. 安装wget yum install -y wget 2. 备份配置文件 mv /etc/yum.repos.d/CentOS-Base.repo /etc/yum.repos.d/CentOS-Base.repo.bak 3. 下载华为yum源 wget -O /etc/yum.repos.d/CentOS-Base.repo https://repo.huaweicloud.com/repository/conf/CentOS-7-reg.repo 4. 清…

Oracle-分析函数(累计求和,排序等)

在Oracle中分析函数又称为开窗函数 分为以下两类&#xff1a; 第一类&#xff1a;是聚合分析函数&#xff0c;主要包含&#xff08;sum&#xff0c;count&#xff0c;AVG、MAX、MIN等&#xff09;&#xff0c;主要是对内部分组的数值按照要求内部聚合处理&#xff1b; 第二类&a…

【python学习】基础篇-常用函数-sorted() 对可迭代对象进行排序

sorted()函数是Python中的内置函数&#xff0c;用于对可迭代对象进行排序操作。 它会返回一个新的已排序的列表&#xff0c;而不会修改原始的可迭代对象。 sorted()函数的基本语法如下&#xff1a; sorted(iterable, keyNone, reverseFalse)参数说明&#xff1a; iterable:表…

python盘符及路径获取

python盘符及路径获取&#xff1a; 参考&#xff1a; https://pythonjishu.com/rpszkrzoctrpkgq/ https://blog.51cto.com/u_16213346/7272017 https://blog.csdn.net/Java_ZZZZZ/article/details/130846038 http://www.mobiletrain.org/about/BBS/142777.html https://pythonj…

美国 地区 苹果 Apple ID获取

1、先进入苹果Apple ID官网&#xff0c;然后选择“创建你的Apple ID”&#xff01;&#xff0c;注册一个新的Apple ID。 2、Apple ID使用qq.com多数会报 “目前无法完成你的请求” &#xff0c;得登录&#xff0c;注册一个网易邮箱&#xff0c;或者hao123邮箱大全&#xff0c;选…

ADB安装及使用介绍

一、ADB简介 1、什么是adb ADB 全称为 Android Debug Bridge&#xff0c;起到调试桥的作用&#xff0c;是一个客户端-服务器端程序。其中客户端是用来操作的电脑&#xff0c;服务端是 Android 设备。 ADB 也是 Android SDK 中的一个工具&#xff0c;可以直接操作管理 Android …

A股风格因子看板 (2023.11 第11期)

该因子看板跟踪A股风格因子&#xff0c;该因子主要解释沪深两市的市场收益、刻画市场风格趋势的系列风格因子&#xff0c;用以分析市场风格切换、组合风格暴露等。 今日为该因子跟踪第11期&#xff0c;指数组合数据截止日2023-10-31&#xff0c;要点如下 近1年A股风格因子收益走…