Consumer的负载均衡

news/2023/11/30 9:21:59

        想要提高Consumer的处理速度,可以启动多个Consumer并发处理,这个时候就涉及如何在多个Consumer之间负载均衡的问题,接下来结合源码分析Consumer的负载均衡实现。

要做负载均衡,必须知道一些全局信息,也就是一个ConsumerGroup里到底有多少个Consumer,知道了全局信息,才可以根据某种算法来分配,比如简单地平均分到各个Consumer。在RocketMQ中,负载均衡或者消息分配是在Consumer端代码中完成的,Consumer从Broker处获得全局信息,然后自己做负载均衡,只处理分给自己的那部分消息。

1.DefaultMQPushConsumer的负载均衡

DefaultMQPushConsumer的负载均衡过程不需要使用者操心,客户端程序会自动处理,每个DefaultMQPushConsumer启动后,会马上会触发一个doRebalance动作;而且在同一个ConsumerGroup里加入新的DefaultMQPush-Consumer时,各个Consumer都会被触发doRebalance动作。

如图7-2所示,具体的负载均衡算法有五种,默认用的是第一种AllocateMessageQueueAveragely。负载均衡的结果与Topic的Message Queue数量,以及ConsumerGroup里的Consumer的数量有关。负载均衡的分配粒度只到Message Queue,把Topic下的所有Message Queue分配到不同的Consumer中,所以Message Queue和Consumer的数量关系,或者整除关系影响负载均衡结果。

图7-2 RocketMQ客户端负载均衡策略

以AllocateMessageQueueAveragely策略为例,如果创建Topic的时候,把Message Queue数设为3,当Consumer数量为2的时候,有一个Consumer需要处理Topic三分之二的消息,另一个处理三分之一的消息;当Consumer数量为4的时候,有一个Consumer无法收到消息,其他3个Consumer各处理Topic三分之一的消息。可见Message Queue数量设置过小不利于做负载均衡,通常情况下,应把一个Topic的Message Queue数设置为16。

2.DefaultMQPullConsumer的负载均衡

Pull Consumer可以看到所有的Message Queue,而且从哪个Message Queue读取消息,读消息时的Offset都由使用者控制,使用者可以实现任何特殊方式的负载均衡。

DefaultMQPullConsumer有两个辅助方法可以帮助实现负载均衡,一个是registerMessageQueueListener函数,如代码清单7-5所示。

代码清单7-5 registerMessageQueueListener

Consumer.registerMessageQueueListener("TOPICNAME", new MessageQueue-Listener() {
public void MessageQueueChanged(String Topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) }

 

registerMessageQueueListener函数在有新的Consumer加入或退出时被触发。另一个辅助工具是MQPullConsumerScheduleService类,使用这个Class类似使用DefaultMQPushConsumer,但是它把Pull消息的主动性留给了使用者,如代码清单7-6所示。

代码清单7-6 使用MQPullConsumerScheduleService示例

public class PullConsumerServiceTest {
    public static void main(String[] args) throws MQClientException {
        final MQPullConsumerScheduleService scheduleService = new MQPull-ConsumerScheduleService("PullConsumerService1");
        scheduleService.getDefaultMQPullConsumer().setNamesrvAddr("localh-ost:9876");
        scheduleService.setMessageModel(MessageModel.CLUSTERING );
        scheduleService.registerPullTaskCallback("testPullConsumer", new PullTaskCallback() {
            public void doPullTask(MessageQueue mq, PullTaskContext context) {
                MQPullConsumer Consumer = context.getPullConsumer();
                try {
                    long Offset = Consumer.fetchConsumeOffset(mq, false);
                    if (Offset < 0)
                        Offset = 0;
                    PullResult pullResult = Consumer.pull(mq, "*", Offset, 32);
                    System.out.printf("%s%n", Offset + "\t" + mq + "\t" + pullResult);
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            break;
                        case NO_MATCHED_MSG:
                            break;
                        case NO_NEW_MSG:
                        case OFFSET_ILLEGAL:
                            break;
                        default:
                            break;
                    }
                    Consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
                    context.setPullNextDelayTimeMillis(1000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        scheduleService.start();
    }
}

 

然后我们看一看在MQPullConsumerScheduleService类的实现里,实现负载均衡的代码,如代码清单7-7所示。

代码清单7-7 MQPullConsumerScheduleService的负载均衡实现

class MessageQueueListenerImpl implements MessageQueueListener {
    @Override
    public void MessageQueueChanged(String Topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
        MessageModel MessageModel =
            MQPullConsumerScheduleService.this.defaultMQPullConsumer.getMessageModel();
        switch (MessageModel) {
            case BROADCASTING:
                MQPullConsumerScheduleService.this.putTask(Topic, mqAll);
                break;
            case CLUSTERING :
                MQPullConsumerScheduleService.this.putTask(Topic, mqDivided);
                break;
            default:
                break;
        }
    }
}

 

从源码中可以看出,用户通过更改MessageQueueListenerImpl的实现来做自己的负载均衡策略。


https://www.xjx100.cn/news/3091667.html

相关文章

安全+Linux!IBM新一代大型机Z14全新发布

导读本周&#xff0c;以“架构 人机同行”为主题的IBM Systems创行者高峰论坛在北京召开&#xff0c;IBM全球及大中华区硬件系统部负责人&#xff0c;金融、医疗、制造等领域的企业、合作伙伴共与这一年度盛会&#xff0c;探讨认知时代下的基础架构技术趋势及IBM硬件系统业务的…

KT142C语音芯片客户反馈电脑端的配置文件,打开都正常,但是拷贝到KT142C内部就乱码

KT142C语音芯片客户反馈电脑端的配置文件&#xff0c;打开都正常&#xff0c;但是拷贝到KT142C内部就乱码 首先解释一下原理&#xff0c;KT142C内置的330Kbyte空间可供用户下载&#xff0c;实际上拿出程序部分的空间 作为声音存储介质的&#xff0c;也就是说&#xff0c;代码空…

VAD监测(一)

麦克风的采样率是16000&#xff0c;代表一秒钟采集16000个数据点 我们每次拿1024个采样点作为一个buffer&#xff0c;buffer是一个b类型&#xff0c;也就是字节类型。 这一个buffer的长度不一定是1024&#xff0c;取决于每个采样点的采样点的位深度&#xff0c;如果音频数据是…

如果文件已经存在与git本地库中,配置gitignore能否将其从git库中删除

想把项目的前后台代码放到同一个git仓库管理&#xff0c;由于未设置.gitignore&#xff0c;就使用vscode做stage操作&#xff08;相当于git add . 命令 其中【.】点表示全部文件&#xff09;&#xff0c;观察将要入库的文件发现&#xff0c;node_modules、target、.idea、log等…

MatrixOne完成与麒麟信安、欧拉的兼容互认

近日&#xff0c;超融合异构云原生数据库MatrixOne企业版软件V1.0完成了与欧拉开源操作系统&#xff08;openEuler简称“欧拉”&#xff09;、麒麟信安操作系统系列产品和虚拟化平台的相互兼容认证&#xff0c;通过了欧拉兼容性测评&#xff0c;获得了《openEuler技术测评证书》…

基于MS16F3211芯片的触摸控制灯的状态变化和亮度控制(11.20)

1.判断长按短按 u8 Mode_state_flag 0; u32 buttonPressTime 0; u8 longPressflag 0; u8 shortPressflag 0; // 普通认证 执行此处函数 void T_Key0_Func(void) {if (TKey_Signal.oneBit.b0 1){buttonPressTime;}if ((TKey_Signal.oneBit.b0 1) && (Pre_TKey_Re…

B站短视频如何去水印?一键解析下载B站视频!

在浏览B站视频时&#xff0c;我们有时会遇到带有水印的场景。这些水印可能会干扰我们对视频内容的观看体验&#xff0c;特别是在全屏观看时。此外&#xff0c;当我们想要保存或分享这些视频时&#xff0c;水印也会成为一种障碍。因此&#xff0c;去除水印的需求就变得非常迫切。…

计算机类编程课学生编写的代码应该如何管理

管理学生编写的代码是一个重要的任务&#xff0c;以下是几种常见的方式来有效地管理学生的代码&#xff1a; 版本控制系统&#xff1a;使用版本控制系统&#xff08;如Git&#xff09;来管理学生的代码。每个学生都可以在自己的分支上进行开发&#xff0c;并通过提交请求&#…