互联网技术 / 互联网资讯 · 2024年4月6日 0

RocketMQ生产者的用法有哪些?

前言

消息队列RocketMQ版是阿里云基于Apache RocketMQ构建的低延迟、高并发、高可用、高可靠的分布式消息中间件。

看过我之前几篇文章的应该都大概队消息队列有个概念了,都明白了,那这个消息从何而来呢?

所谓黄河之水天上来,大自然间每一个事物都不是平白无故来的吧?怎么来的?它母亲生产的;香奈儿怎么来的?机器加原料生产的;就连平时吃的大米,也是有出处的;咱们是怎么来的,咱们当然是伟大的母亲生产下来的了

顺便感谢一下伟大的母亲,周日记得给她打个电话哦

下面进入主题,这是分割线

消息队列RocketMQ版既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。下面列举了一些特点

消息查询:消息队列RocketMQ版提供了三种消息查询的方式,分别是按MeSSage ID、MeSSage Key以及topic查询 查询消息轨迹:通过消息轨迹,能清晰定位消息从生产者发出,经由消息队列RocketMQ版服务端,投递给消息消费者的完整链路,方便定位排查问题 集群消费和广播消费:当使用集群消费模式时,消息队列RocketMQ版认为任意一条消息只需要被消费者集群内的任意一个消费者处理即可;当使用广播消费模式时,消息队列RocketMQ版会将每条消息推送给消费者集群内所有注册过的消费者,保证消息至少被每台机器消费一次 重置消费位点:根据时间或位点重置消费进度,允许用户进行消息回溯或者丢弃堆积消息 死信队列:将无法正常消费的消息储存到特殊的死信队列供后续处理 全球信息路由:用于全球不同地域之间的消息同步,保证地域之间的数据一致性

客户端,其实很容易理解了,我们可以把RocketMQ理解成一个消息服务,既然是一个服务,我们就需要调用这个服务,那么调用这个服务的时候,这个消息从哪里来,这个就是要根据业务场景来定了,所以啊,消息的生产者ProdUCeR属于一个客户端;消息产生了,总不能一直放着吧,总要有人处理掉这些消息吧,这也是业务决定的,所以消息的消费者consuMeR也是属于客户端。

下面啊,大鱼就带着大家一起来看看这客户端的用处

生产者ProdUCeR

生产者ProdUCeR,顾名思义,就是负责生产消息的,此时大家应该脑子有很多问号才对,比如ProdUCeR发消息发到哪里了,流程是怎么样的,发的消息都是什么类型的等等这些,这些问题搞懂了的话,ProdUCeR这个客户端基本就搞定了

鱼鱼教大家一个小技巧,学习一个东西,先搞懂大体流程,再拆分而细攻之,最后再统筹理解,这样效果会很好,独家秘方

接下来我从消息是如何发送的(负载均衡、容错机制)、消息发给谁和存储到哪里、消息的类型三方面来介绍ProdUCeR

1、消息是如何发送的?

首先,消息总不能产生了哪里也不去吧,那产生这个消息就没有任何意义了,所以这个消息总要发送到一个地方去,接力传递,看下面这个图

RocketMQ生产者的用法有哪些?

ProdUCeR会首先从本地缓存中获取到指定的topic,如果找到就直接根据这个topic发送产生的消息,缓存大家都明白啊,就是为了优化速度,减少网络传输。

没有的话,就要去NaMeSeRveR获取最新的topic列表(这个是BRokeR启动的时候注册到NaMeSeRveR上的),通过一定的策略选择一个MeSSageQueue队列,获取这个Mq所在的BRokeR地址,也是先从本地缓存中获取,如果获取不到则请求NaMeSeRveR获取(NaMeSeRveR中也同样注册了BRokeR地址和topic的映射关系),进行发送消息

发送失败的话,会有重试机制,默认是重试三次

其实保存这么多,既能减少和NaMeSeRveR之间的网络传输,又能减小NaMeSeRveR的压力,NaMeSeRveR本身就是属于轻量级的设计,这样也有利于减轻NaMeSeRveR的压力,NaMeSeRveR我也会单独写一篇来介绍

负载均衡

我们知道消息发送的时候会首先选择一个对应的topic,每个topic会对应多个MeSSageQueue,这样就有一个问题,发消息的时候要是做不到雨露均沾,可能就会有的队列多,有的队列少这样的问题,就会造成资源的浪费

RocketMQ采用了朴素的方式,没错,就是轮询,高端的食材往往只需要最朴素的烹饪方式~

生产者通过轮询某个 topic 下的所有 MeSSageQueue 的方式来实现发送方的负载均衡,简单来说就是人人都有份,如下图:

RocketMQ生产者的用法有哪些?

通过这种方式,可以将一个 topic 的消息分散到多个 MeSSageQueue 上,进而分散到多个 BRokeR 上。

发送消息的容错机制:

ProdUCeR 作为发送消息的一方,有3种容错机制:

本地缓存:把从 NaMeSeveR 获取的信息缓存到本地,以防 NaMeSeveR 宕机 不可用BRokeR集合:ProdUCeR有一个 BRokeR 的容错机制,开关sendLatencyFAultEnable可以开启,RocketMq内部会维护一个故障BRokeR的HasHMap,把一定延迟级别的BRokeR放入这个Map,下次选择BRokeR的时候,就会规避不可用的BRokeR。 重试:ProdUCeR发送消息时,有一个重试机制,默认重试3次。死信队列 ConsuMeR消费重试超过指定次数,进入死信队列

通过这种方式,可以将一个 topic 的消息分散到多个 MeSSageQueue 上,进而分散到多个 BRokeR 上。

2、消息发给谁和存储在哪里?

ProdUCeR连接NaMeSeveR

ProdUCeR 通过 NaMeSeveR 获取指定 topic 的 BRokeR 路由信息,并在本地保存一份缓存数据,比如一个topic有哪些 MeSSageQueue,MeSSageQueue 在哪几台 BRokeR 上,BRokeR 的IP.poRt等等。ProdUCeR 发送消息只发到 MasteR BRokeR上,Slave 通过主从同步获取数据。

那么 ProdUCe 是怎么连接NaMeSeveR 的呢

连接:单个生产者者和一台 NaMeseRveR 保持长连接,定时查询topic配置信息,如果该naMeseRveR挂掉,生产者会自动连接下一个naMeseRveR,直到有可用连接为止,并能自动重连。 轮询时间:默认情况下,生产者每隔30秒从naMeseRveR获取所有topic的最新队列情况,这意味着某个bRokeR如果宕机,生产者最多要30秒才能感知,在此期间,发往该bRokeR的消息发送失败。该时间由DeFAultMQProdUCeR的pollNaMeSeRveRInteval参数决定,可手动配置。 心跳:与naMeseRveR没有心跳

ProdUCeR连接BRokeR

连接:生产者 跟 topic 涉及的所有BRokeR 保持长连接。 心跳:默认情况下,生产者每隔30秒向所有bRokeR发送心跳。bRokeR每隔10秒钟(此时间无法更改),扫描所有还存活的连接,若某个连接2分钟内(当前时间与最后更新时间差值超过2分钟,此时间无法更改)没有发送心跳数据,则关闭连接

PRodUCeR连接上BRokeR之后,消息会通过轮询的方式发送到BRokeR上,并且存储在BRokeR中的CoMMITLog中,这里面存储的是原始消息,还有一个ConsuMeQueue用于存储投递到某一个queue的消息的位置信息。当然,消息队列会持久化到磁盘中的,不影响内存,当然也会定期清理消息。

那消费完的消息去了哪里呢?什么时候清理物理消息文件呢?还有这样设计的好处呢?

这些我们都留在下下一篇中,也就是BRokeR篇,让你透彻了解BRokeR这个大脑是如何助力RocketMQ支持这么高的吞吐量的

总之啊,这个问题值得大家深入研究一下,如果再面试的时候,你不仅能说出RocketMQ的用处,你还能说出它的存储原理和寻址原理,那面试官就爱上你了。此时你再拿出王炸,就是解决各种实际问题的能力,比如如何处理重复消息啊、如何保证消息的顺序性啊、在分布式系统中如何保证分布式事务啊

面试官当场给你发oFFeR,say:How MUCh Money do you expect to woRk foR US ?

3、消息的种类

RocketMQ种的消息种类大致可以分为四种:普通消息、定时和延时消息、顺序消息、事务消息四种类型,这是重点!

简单介绍下四种类型

普通消息:消息队列RocketMQ版中无特性的消息,区别于有特性的定时和延时消息、顺序消息和事务消息。 定时和延时消息:允许消息生产者对指定消息进行定时(延时)投递,最长支持40天。 顺序消息:允许消息消费者按照消息发送的顺序对消息进行消费。 事务消息:实现类似X或Open XA的分布事务功能,以达到事务最终一致性状态。

消息队列RocketMQ提供的四种消息类型所对应的topic不能混用,例如,创建的普通消息的topic只能用于收发普通消息,不能用于收发其他类型的消息;同理,事务消息的topic也只能收发事务消息,不能用于收发其他类型的消息,以此类推

普通消息

普通消息:消息队列RocketMQ中无特性的消息,区别于有特性的定时和延时消息、顺序消息和事务消息

普通消息以三种发送方式:同步Sync发送、异步Async发送和单向Oneway发送

同步发送

同步,消息发送方发出一条消息后,会在收到服务端返回响应之后才发下一条消息的通讯方式

RocketMQ生产者的用法有哪些?

异步发送

异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式

消息队列RocketMQ版的异步发送,需要实现异步发送回调接口(SendCallback)。消息发送方在发送了一条消息后,不需要等待服务端响应即可发送第二条消息。发送方通过回调接口接收服务端响应,并处理响应结果

一般用于对时间较敏感的业务场景

RocketMQ生产者的用法有哪些?

单向发送

发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别

应用于对可靠性要求并不高的场景,比如日志收集

RocketMQ生产者的用法有哪些?

定时和延时消息