第四章-生产者-顺序延迟批量消息发送

news/2024/4/26 22:12:22

4.1 顺序消息

先引用一段官网对顺序消息的介绍

顺序消息是一种对消息发送和消费顺序有严格要求的消息。

对于一个指定的Topic,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。在 Apache RocketMQ 中支持分区顺序消息,如下图所示。我们可以按照某一个标准对消息进行分区(比如图中的ShardingKey),同一个ShardingKey的消息会被分配到同一个队列中,并按照顺序被消费。

需要注意的是 RocketMQ 消息的顺序性分为两部分,生产顺序性和消费顺序性。只有同时满足了生产顺序性和消费顺序性才能达到上述的FIFO效果。

生产顺序性: RocketMQ 通过生产者和服务端的协议保障单个生产者串行地发送消息,并按序存储和持久化。如需保证消息生产的顺序性,则必须满足以下条件:

  • 单一生产者: 消息生产的顺序性仅支持单一生产者,不同生产者分布在不同的系统,即使设置相同的分区键,不同生产者之间产生的消息也无法判定其先后顺序。
  • 串行发送:生产者客户端支持多线程安全访问,但如果生产者使用多线程并行发送,则不同线程间产生的消息将无法判定其先后顺序。

满足以上条件的生产者,将顺序消息发送至服务端后,会保证设置了同一分区键的消息,按照发送顺序存储在同一队列中。服务端顺序存储逻辑如下:

在这里插入图片描述

顺序消息的应用场景也非常广泛,在有序事件处理、撮合交易、数据实时增量同步等场景下,异构系统间需要维持强一致的状态同步,上游的事件变更需要按照顺序传递到下游进行处理。

例如创建订单的场景,需要保证同一个订单的生成、付款和发货,这三个操作被顺序执行。如果是普通消息,订单A的消息可能会被轮询发送到不同的队列中,不同队列的消息将无法保持顺序,而顺序消息发送时将ShardingKey相同(同一订单号)的消息序路由到一个逻辑队列中。
在这里插入图片描述

再从一段示例代码开始:

public class Producer {public static void main(String[] args) throws UnsupportedEncodingException {try {DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");producer.start();String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};for (int i = 0; i < 100; i++) {int orderId = i % 10;Message msg =new Message("TopicTest", tags[i % tags.length], "KEY" + i,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer id = (Integer) arg;int index = id % mqs.size();return mqs.get(index);}}, orderId);System.out.printf("%s%n", sendResult);}producer.shutdown();} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {e.printStackTrace();}}
}

这里的区别主要是调用了SendResult send(Message msg, MessageQueueSelector selector, Object arg)方法,MessageQueueSelector 是队列选择器,arg 是一个 Java Object 对象,可以传入作为消息发送分区的分类标准。

提示

MessageQueueSelector的接口如下:

public interface MessageQueueSelector {MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}

其中 mqs 是可以发送的队列,msg是消息,arg是上述send接口中传入的Object对象,返回的是该消息需要发送到的队列。上述例子里,是以orderId作为分区分类标准,对所有队列个数取余,来对将相同orderId的消息发送到同一个队列中。

生产环境中建议选择最细粒度的分区键进行拆分,例如,将订单ID、用户ID作为分区键关键字,可实现同一终端用户的消息按照顺序处理,不同用户的消息无需保证顺序。

调用链:

DefaultMQProducer.send(Message msg, MessageQueueSelector selector, Object arg)

->DefaultMQProducerImpl.send(Message msg, MessageQueueSelector selector, Object arg)

->DefaultMQProducerImpl.sendSelectImpl(Message msg,MessageQueueSelector selector,Object arg,
final CommunicationMode communicationMode,
final SendCallback sendCallback, final long timeout
)

->DefaultMQProducerImpl.sendKernelImpl(final Message msg,
final MessageQueue mq,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final long timeout)

sendSelectImpl

private SendResult sendSelectImpl(Message msg,MessageQueueSelector selector,Object arg,final CommunicationMode communicationMode,final SendCallback sendCallback, final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {long beginStartTime = System.currentTimeMillis();this.makeSureStateOK();Validators.checkMessage(msg, this.defaultMQProducer);// 本地缓存中没有,尝试从 name server 远程拉取TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());if (topicPublishInfo != null && topicPublishInfo.ok()) { // 判断状态MessageQueue mq = null;try {// 拿到该topic发布的所有消息队列List<MessageQueue> messageQueueList =mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());// 这里要克隆 msg,原因是 msg要传入到用户定义的select方法中,防止被篡改Message userMessage = MessageAccessor.cloneMessage(msg);// 去除命名空间,回归topicString userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());// 设置成无命名空间的topicuserMessage.setTopic(userTopic);// 调用 selector.select,根据用户定义的规则选择合适的 mq,然后再恢复topic为带命名空间的mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));} catch (Throwable e) {throw new MQClientException("select message queue throwed exception.", e);}long costTime = System.currentTimeMillis() - beginStartTime;if (timeout < costTime) {throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");}if (mq != null) {// 选择好mq后,开始发送消息,具体详情看章节`2.1.2.2`return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);} else { // 没有选择到 mq,就抛出异常throw new MQClientException("select message queue return null.", null);}}throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
}

顺序消息的一致性

如果一个Broker掉线,那么此时队列总数是否会发化?

如果发生变化,那么同一个 ShardingKey 的消息就会发送到不同的队列上,造成乱序。如果不发生变化,那消息将会发送到掉线Broker的队列上,必然是失败的。因此 Apache RocketMQ 提供了两种模式,如果要保证严格顺序而不是可用性,创建 Topic 是要指定 -o 参数(–order)为true,表示顺序消息:

$ sh bin/mqadmin updateTopic -c DefaultCluster -t TopicTest -o true -n 127.0.0.1:9876
create topic to 127.0.0.1:10911 success.
TopicConfig [topicName=TopicTest, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=true, attributes=null]

其次要保证NameServer中的配置 orderMessageEnablereturnOrderTopicConfigToBroker 必须是 true。如果上述任意一个条件不满足,则是保证可用性而不是严格顺序。关于这块内容的源码解析,涉及到name server模块,具体讲到时再解释。

4.2 延迟消息

先来引用官方的一段描述

延时消息介绍

延迟消息发送是指消息发送到Apache RocketMQ后,并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费。

在分布式定时调度触发、任务超时处理等场景,需要实现精准、可靠的延时事件触发。使用 RocketMQ 的延时消息可以简化定时调度任务的开发逻辑,实现高性能、可扩展、高可靠的定时触发能力。

在这里插入图片描述

延时消息约束

Apache RocketMQ 一共支持18个等级的延迟投递,具体时间如下:

投递等级(delay level)延迟时间投递等级(delay level)延迟时间
11s106min
25s117min
310s128min
430s139min
51min1410min
62min1520min
73min1630min
84min171h
95min182h

再看示例 代码

public class ScheduledMessageProducer {public static void main(String[] args) throws Exception {// Instantiate a producer to send scheduled messagesDefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");// Launch producerproducer.start();int totalMessagesToSend = 100;for (int i = 0; i < totalMessagesToSend; i++) {Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());// This message will be delivered to consumer 10 seconds later.message.setDelayTimeLevel(3);// 发送消息,详情看章节`2.1.2.2`producer.send(message);}// Shutdown producer after use.producer.shutdown();}}

提示

这里最重要的是message中设置延迟等级,例子中设置的等级是3,也就是发送者发送后,10s后消费者才能收到消息。

延时消息的实现逻辑需要先经过定时存储等待触发,延时时间到达后才会被投递给消费者。因此,如果将大量延时消息的定时时间设置为同一时刻,则到达该时刻后会有大量消息同时需要被处理,会造成系统压力过大,导致消息分发延迟,影响定时精度。

所谓延迟消息,到底是对谁延迟了,实际上是对消费者的延迟,也就是延迟投喂到消费者,这就需要在讲Broker和消费者时才会涉及,对生产者来说,与发送普通消息没什么区别,这一点,从示例中也可以看出。

另外,批量消息的发送,直接参考章节2.12.2,除了把消息封装成批量的,其他没有不同,所以就不赘述了。至于事务消息,生产中实际使用情况极少,也不讲了,有兴趣的读者自行研究吧,那么至此,生产者相关的知识点就讲完了,还是比较简单的,接下来要讲的是Broker,这个是重点。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.cpky.cn/p/11117.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈,一经查实,立即删除!

相关文章

35.基于SpringBoot + Vue实现的前后端分离-在线考试系统(项目 + 论文)

项目介绍 本站是一个B/S模式系统&#xff0c;采用SpringBoot Vue框架&#xff0c;MYSQL数据库设计开发&#xff0c;充分保证系统的稳定性。系统具有界面清晰、操作简单&#xff0c;功能齐全的特点&#xff0c;使得基于SpringBoot Vue技术的在线考试系统设计与实现管理工作系统…

【项目管理——时间管理】【自用笔记】

1 项目时间管理&#xff08;进度管理&#xff09;概述 过程&#xff1a;&#xff08;2—6&#xff09;为规划过程组&#xff0c;7为监控过程组 题目定义&#xff1a;项目时间管理又称为进度管理&#xff0c;是指确保项目按时完成所需的过程。目标&#xff1a;时间管理的主要目标…

InstructGPT的流程介绍

1. Step1&#xff1a;SFT&#xff0c;Supervised Fine-Tuning&#xff0c;有监督微调。顾名思义&#xff0c;它是在有监督&#xff08;有标注&#xff09;数据上微调训练得到的。这里的监督数据其实就是输入Prompt&#xff0c;输出相应的回复&#xff0c;只不过这里的回复是人工…

PCB板在线自动激光打标机:高效、精准的电路板标识利器

PCB板在线自动激光打标机是一种高度自动化的设备&#xff0c;专为PCB&#xff08;印刷电路板&#xff09;板的在线镭雕需求而设计。这种设备结合了激光技术和自动化控制&#xff0c;使得在PCB板上进行高精度、高效率的镭雕成为可能。 ​ PCB板在线自动激光打标机主要由控制系统…

Mora: Enabling Generalist Video Generation via A Multi-Agent Framework

目录 论文地址&#xff1a;Mora: Enabling Generalist Video Generation viaA Multi-Agent Framework github地址&#xff1a;https://github.com/lichao-sun/Mora 一、摘要 &#xff08;1&#xff09;Mora 的主要特点&#xff1a; &#xff08;2&#xff09;Mora的应用场景…

代码随想录算法训练营三刷day35 |贪心 之 860.柠檬水找零 406.根据身高重建队列 452. 用最少数量的箭引爆气球

三刷day35 860.柠檬水找零406.根据身高重建队列452. 用最少数量的箭引爆气球 860.柠檬水找零 题目链接 解题思路&#xff1a; 局部最优&#xff1a;遇到账单20&#xff0c;优先消耗美元10&#xff0c;完成本次找零。全局最优&#xff1a;完成全部账单的找零。 代码如下&#x…