# 消息队列
特指分布式消息队列
# 消息队列基础知识
# 消息队列的定义
可以把消息队列看作是一个存放消息的容器,当我们需要使用消息的时候,直接从容器中取出消息供自己使用即可。由于队列 Queue 是一种先进先出的数据结构,所以消费消息时也按照顺序来消费。
消息队列是一种 Pub/Sub 模型(发布 / 订阅模型),参与消息传递的双方称为生产者和消费者,生产者负责发送消息,消费者负责处理消息。
我们知道操作系统中的进程通信的一种很重要的方式就是消息队列。我们这里提到的消息队列稍微有点区别,更多指的是各个服务以及系统内部各个组件 / 模块之前的通信,属于一种中间件。
简单来说:中间件是一类服务于应用软件的软件,应用软件是为用户服务的,用户不会接触或者使用到中间件。
随着分布式和微服务系统的发展,消息队列在系统设计中有了更大的发挥空间,使用消息队列可以降低系统耦合性、实现任务异步、有效地进行流量削峰,是分布式和微服务系统中重要的组件之一。
# 消息队列的作用
通常来说,使用消息队列能为我们的系统带来下面三点好处:
- 通过异步处理提高系统性能(减少响应所需时间)
- 削峰 / 限流
- 降低系统耦合性
- 实现分布式事务
如果在面试的时候你被面试官问到这个问题的话,一般情况是你在你的简历上涉及到消息队列这方面的内容,这个时候推荐结合自己的项目来回答。
# 作用 1:通过异步处理提高系统性能(减少响应所需时间)
服务端将用户的请求数据存储到消息队列后,立即返回结果。随后,系统再对消息进行消费。
因为用户请求数据写入消息队列之后就立即返回给用户了,但是请求数据在后续的业务校验、写数据库等操作中可能失败。因此,使用消息队列进行异步处理之后,需要适当修改业务流程进行配合,比如用户在提交订单之后,订单数据写入消息队列,不能立即返回用户订单提交成功,需要在消息队列的订单消费者进程真正处理完该订单之后,甚至出库后,再通过电子邮件或短信通知用户订单成功,以免交易纠纷。这就类似我们平时手机订火车票和电影票。
# 作用 2:削峰 / 限流
先将短时间高并发产生的事务消息存储在消息队列中,然后后端服务再慢慢根据自己的能力去消费这些消息,这样就避免大量事务直接把后端服务冲垮。
举例:在电子商务一些秒杀、促销活动中,合理使用消息队列可以有效抵御促销活动刚开始大量订单涌入对系统的冲击。如下图所示:
# 作用 3:降低系统的耦合性
使用消息队列还可以降低系统耦合性。我们知道如果模块之间不存在直接调用,那么新增模块或者修改模块就对其他模块影响较小,这样系统的可扩展性无疑更好一些。还是直接上图吧:
生产者(客户端)发送消息到消息队列中去,接受者(服务端)处理消息,需要消费的系统直接去消息队列获取消息进行消费即可,而不需要和其他系统有耦合,这显然也提高了系统的扩展性。
** 消息队列使用 Pub/Sub 模式(发布 / 订阅模式)工作,生产者发布消息,一个或多个消费者订阅消息。** 从上图可以看到生产者和消费者之间没有直接耦合,
- 生产者将消息发送至分布式消息队列即结束对消息的处理
- 消费者从分布式消息队列获取该消息后进行后续处理,并不需要知道该消息从何而来。
对新增业务,只要对该类消息感兴趣,即可订阅该消息,对原有系统和业务没有任何影响,从而实现网站业务的可扩展性设计。
消费者对消息进行过滤、处理、包装后,构造成一个新的消息类型,将消息继续发送出去,等待其他消费者订阅该消息。因此基于事件(消息对象)驱动的业务架构可以是一系列流程。
另外,为了避免 “消息队列服务器” 宕机造成消息丢失,会将成功发送到消息队列的消息存储在 “生产者服务器” 上,等消息真正被 “消费者服务器” 处理后才删除消息。在消息队列服务器宕机后,生产者服务器会选择分布式消息队列服务器集群中的其他服务器发布消息。
备注:不要认为消息队列只能利用发布 - 订阅模式工作,只不过在解耦这个特定业务环境下是使用发布 - 订阅模式的。除了发布 - 订阅模式,还有点对点订阅模式(一个消息只有一个消费者),我们比较常用的是发布 - 订阅模式。
# 作用 4:实现分布式事务
分布式事务的解决方案之一就是 MQ 事务。
RocketMQ、Kafka、Pulsar、QMQ 都提供了事务相关的功能。事务允许事件流应用将生产、处理、消费消息的整个过程定义为一个原子操作。
详细介绍可以查看 分布式事务详解 (付费) 这篇文章。
# 消息队列存在的问题
系统可用性降低:在加入 MQ 之前,你不用考虑消息丢失、 MQ 挂掉等等的情况,但是引入 MQ 之后你就需要去考虑了!
系统复杂性提高:加入 MQ 之后,你需要保证消息没有被重复消费、处理消息丢失的情况、保证消息传递的顺序性等等问题!
一致性问题:消息队列可以实现异步,确实可以提高系统响应速度。但是,万一消息的真正消费者并没有正确消费消息怎么办?这样就会导致数据不一致的情况了!
# JMS 和 AMQP
# JMS 定义
JMS(JAVA Message Service)是 Java 的消息服务,其 API 是一个消息服务的标准 / 规范。JMS 的客户端之间可以通过 JMS 服务进行异步的消息传输。它允许应用程序组件基于 JavaEE 平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。
JMS 定义了五种不同消息的正文格式以及调用的消息类型,允许你发送并接收一些不同形式的数据:
StreamMessage
:Java 原始值的数据流MapMessage
:一套名称 - 值对TextMessage
:一个字符串对象ObjectMessage
:一个序列化的 Java 对象BytesMessage
:一个字节的数据流
# JMS 两种消息模型
# 点到点模型(P2P)
使用队列(Queue)作为消息通信载体,满足生产者与消费者模式,一条消息只能被一个消费者使用,未被消费的消息在队列中保留直到被消费或超时。比如:生产者发送 100 条消息的话,两个消费者来消费,一般情况下两个消费者会按照消息发送的顺序各自消费一半(也就是你一个我一个的消费)。
# 发布 / 订阅模型(Pub/Sub)
使用主题(Topic)作为消息通信载体,类似于广播模式;发布者发布一条消息,该消息通过主题传递给所有的订阅者。
# AMQP 定义
AMQP,即 Advanced Message Queuing Protocol
,一个提供统一消息服务的应用层标准高级消息队列协议(二进制应用层协议),是应用层协议的一个开放标准,为面向消息的中间件设计,兼容 JMS。基于此协议的客户端与消息中间件可传递消息,并不受客户端 / 中间件产品,不同的开发语言等条件的限制。
RabbitMQ 就是基于 AMQP 协议实现的。
# JMS vs AMQP
JMS | AMQP | |
---|---|---|
定义 | Java API | 协议 |
跨语言 | 否 | 是 |
跨平台 | 否 | 是 |
支持消息类型 | 提供两种消息模型:①P2P;②Pub/sub | 提供了五种消息模型:①direct exchange;②fanout exchange;③topic change;④headers exchange;⑤system exchange。本质来讲,后四种和 JMS 的 pub/sub 模型没有太大差别,仅是在路由机制上做了更详细的划分; |
支持消息类型 | 多种消息类型 | byte [](二进制) |
总结:
- AMQP 为消息定义了线路层的协议,而 JMS 所定义的是 API 规范。在 Java 体系中,多个 client 均可以通过 JMS 进行交互,不需要应用修改代码,但是其对跨平台的支持较差。而 AMQP 天然具有跨平台、跨语言特性。
- JMS 支持
TextMessage
、MapMessage
等复杂的消息类型;而 AMQP 仅支持byte[]
消息类型(复杂的类型可序列化后发送)。 - 由于 Exchange 提供的路由算法,AMQP 可以提供多样化的路由方式来传递消息到消息队列,而 JMS 仅支持 P2P 和 Pub/Sub 方式两种。
# RPC 和消息队列的区别
RPC 和消息队列都是分布式微服务系统中重要的组件之一,下面我们来简单对比一下两者:
- 用途:
- RPC:主要用来解决两个服务的远程通信问题,不需要了解底层网络的通信机制。通过 RPC 可以帮助我们远程调用某个服务的方法,这个过程就像调用本地方法一样简单。
- 消息队列:主要用来降低系统耦合性、实现任务异步、流量削峰。
- 通信方式:
- RPC:双向直接网络通讯
- 消息队列:单向引入中间载体的网络通讯
- 架构:
- RPC:不需要存储消息,因为是双向直接网络通讯
- 消息队列:需要把消息存储起来
- 请求处理的时效性:
- 通过 RPC 发出的调用一般会立即被处理
- 存放在消息队列中的消息并不一定会立即被处理
RPC 和消息队列本质上是网络通讯的两种不同的实现机制,两者的用途不同,万不可将两者混为一谈。
# 消息队列选型
# 常见的消息队列
# Kafka
Kafka 官网:http://kafka.apache.org/
Kafka 更新记录(可以直观看到项目是否还在维护):https://kafka.apache.org/downloads
Kafka 是 LinkedIn 开源的一个分布式流式处理平台,已经成为 Apache 顶级项目,早期被用来用于处理海量的日志,后面才慢慢发展成了一款功能全面的高性能消息队列。
流式处理平台具有三个关键功能:
- 消息队列:发布和订阅消息流。
- 持久方式存储消息流:Kafka 会把消息持久化到磁盘,有效地避免了消息丢失的风险。
- 流式处理平台:在消息发布的时候进行处理,Kafka 提供了一个完整的流式处理类库。
Kafka 是一个分布式系统,由通过高性能 TCP 网络协议进行通信的服务器和客户端组成,可以部署在在本地和云环境中的裸机硬件、虚拟机和容器上。
在 Kafka 2.8 之前,Kafka 最被大家诟病的就是其重度依赖于 Zookeeper 做元数据管理和集群的高可用。在 Kafka 2.8 之后,引入了基于 Raft 协议的 KRaft 模式,不再依赖 Zookeeper,大大简化了 Kafka 的架构。
提示一下:如果要使用 KRaft 模式的话,建议选择较高版本的 Kafka,因为这个功能还在持续完善优化中。Kafka 3.3.1 版本是第一个将 KRaft(Kafka Raft)共识协议标记为生产就绪的版本。
# RocketMQ
RocketMQ 官网:https://rocketmq.apache.org/ (文档很详细,推荐阅读)
RocketMQ 更新记录(可以直观看到项目是否还在维护):https://github.com/apache/rocketmq/releases
RocketMQ 是阿里开源的一款云原生 “消息、事件、流” 实时数据处理平台,借鉴了 Kafka,已经成为 Apache 顶级项目。
RocketMQ 的核心特性(摘自 RocketMQ 官网):
- 云原生:生与云,长与云,无限弹性扩缩,K8s 友好
- 高吞吐:万亿级吞吐保证,同时满足微服务与大数据场景。
- 流处理:提供轻量、高扩展、高性能和丰富功能的流计算引擎。
- 金融级:金融级的稳定性,广泛用于交易核心链路。
- 架构极简:零外部依赖,Shared-nothing 架构。
- 生态友好:无缝对接微服务、实时计算、数据湖等周边生态。
根据官网介绍:
Apache RocketMQ 自诞生以来,因其架构简单、业务功能丰富、具备极强可扩展性等特点被众多企业开发者以及云厂商广泛采用。历经十余年的大规模场景打磨,RocketMQ 已经成为业内共识的金融级可靠业务消息首选方案,被广泛应用于互联网、大数据、移动互联网、物联网等领域的业务场景。
# RabbitMQ
RabbitMQ 官网:https://www.rabbitmq.com/ 。
RabbitMQ 更新记录(可以直观看到项目是否还在维护):https://www.rabbitmq.com/news.html
RabbitMQ 是采用 Erlang 语言实现 AMQP (Advanced Message Queuing Protocol,高级消息队列协议)的消息中间件,它最初起源于金融系统,用于在分布式系统中存储转发消息。
RabbitMQ 发展到今天,被越来越多的人认可,这和它在易用性、扩展性、可靠性和高可用性等方面的卓著表现是分不开的。RabbitMQ 的具体特点可以概括为以下几点:
- 可靠性:RabbitMQ 使用一些机制来保证消息的可靠性,如持久化、传输确认、发布确认等。
- 灵活的路由:在消息进入队列之前,通过交换器来路由消息。对于典型的路由功能,RabbitMQ 己经提供了一些内置的交换器来实现。针对更复杂的路由功能,可以将多个交换器绑定在一起,也可以通过插件机制来实现自己的交换器。这个后面会在我们讲 RabbitMQ 核心概念的时候详细介绍到。
- 扩展性:多个 RabbitMQ 节点可以组成一个集群,也可以根据实际业务情况动态地扩展集群中节点。
- 高可用性:队列可以在集群中的机器上设置镜像,使得在部分节点出现问题的情况下队列仍然可用。
- 支持多种协议:RabbitMQ 除了原生支持 AMQP 协议,还支持 STOMP、MQTT 等多种消息中间件协议。
- 多语言客户端:RabbitMQ 几乎支持所有常用语言,比如 Java、Python、Ruby、PHP、C#、JavaScript 等。
- 易用的管理界面:RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息、集群中的节点等。在安装 RabbitMQ 的时候会介绍到,安装好 RabbitMQ 就自带管理界面。
- 插件机制:RabbitMQ 提供了许多插件,以实现从多方面进行扩展,当然也可以编写自己的插件。感觉这个有点类似 Dubbo 的 SPI 机制。
# ActiveMQ
目前已经被淘汰,不推荐使用,不建议学习。
# 对比
参考《Java 工程师面试突击第 1 季 - 中华石杉老师》
对比方向 | 概要 |
---|---|
吞吐量 | 万级的 ActiveMQ 和 RabbitMQ 的吞吐量(ActiveMQ 的性能最差)要比十万级甚至是百万级的 RocketMQ 和 Kafka 低一个数量级。 |
可用性 | 都可以实现高可用。ActiveMQ 和 RabbitMQ 都是基于主从架构实现高可用性。RocketMQ 基于分布式架构。 Kafka 也是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用 |
时效性 | RabbitMQ 基于 Erlang 开发,所以并发能力很强,性能极其好,延时很低,达到微秒级,其他几个都是 ms 级。 |
功能支持 | Pulsar 的功能更全面,支持多租户、多种消费模式和持久性模式等功能,是下一代云原生分布式消息流平台。 |
消息丢失 | ActiveMQ 和 RabbitMQ 丢失的可能性非常低, Kafka、RocketMQ 和 Pulsar 理论上可以做到 0 丢失。 |
总结:
- ActiveMQ 的社区算是比较成熟,但是较目前来说,ActiveMQ 的性能比较差,而且版本迭代很慢,不推荐使用,已经被淘汰了。
- RabbitMQ 在吞吐量方面虽然稍逊于 Kafka、RocketMQ 和 Pulsar,但是由于它基于 Erlang 开发,所以并发能力很强,性能极其好,延时很低,达到微秒级。但是也因为 RabbitMQ 基于 Erlang 开发,所以国内很少有公司有实力做 Erlang 源码级别的研究和定制。如果业务场景对并发量要求不是太高(十万级、百万级),那这几种消息队列中,RabbitMQ 或许是你的首选。
- RocketMQ 和 Pulsar 支持强一致性,对消息一致性要求比较高的场景可以使用。
- RocketMQ 阿里出品,Java 系开源项目,源代码我们可以直接阅读,然后可以定制自己公司的 MQ,并且 RocketMQ 有阿里巴巴的实际业务场景的实战考验。
- Kafka 仅仅提供较少的核心功能,但是提供超高的吞吐量,ms 级的延迟,极高的可用性以及可靠性,而且分布式可以任意扩展。同时 Kafka 最好是支撑较少的 topic 数量即可,保证其超高吞吐量。Kafka 唯一的一点劣势是有可能消息重复消费,那么对数据准确性会造成极其轻微的影响,在大数据领域中以及日志采集中,这点轻微影响可以忽略这个特性天然适合大数据实时计算以及日志收集。如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。
# Kafka
# 基础
# 功能、应用场景
Kafka 是一个分布式流式处理平台,流平台具有三个关键功能:
- 消息队列:发布和订阅消息流,这个功能类似于消息队列,这也是 Kafka 也被归类为消息队列的原因。
- 容错的持久方式存储记录消息流:Kafka 会把消息持久化到磁盘,有效避免了消息丢失的风险。
- **流式处理平台:** 在消息发布的时候进行处理,Kafka 提供了一个完整的流式处理类库。
Kafka 主要有两大应用场景:
- 消息队列:建立实时流数据管道,以可靠地在系统或应用程序之间获取数据。
- **数据处理:** 构建实时的流数据处理程序来转换或处理数据流。
# 相比其他 MQ 的优势
Kafka 相比其他消息队列主要的优势如下:
- 极致的性能:基于 Scala 和 Java 语言开发,设计中大量使用了批量处理和异步的思想,最高每秒可以处理千万级别的消息。
- 生态系统兼容性无可匹敌:Kafka 与周边生态系统的兼容性是最好的没有之一,尤其在大数据和流计算领域。
# 消息模型
题外话:早期的
JMS
和AMQP
属于消息服务领域权威组织所做的相关的标准,但是这些标准的进化跟不上消息队列的演进速度,这些标准实际上已经属于废弃状态。所以,可能存在的情况是:不同的消息队列都有自己的一套消息模型。
# 队列模型(P2P)
早期的消息模型
使用队列(Queue)作为消息通信载体,满足生产者与消费者模式,一条消息只能被一个消费者使用,未被消费的消息在队列中保留直到被消费或超时。比如:我们生产者发送 100 条消息的话,两个消费者来消费,一般情况下两个消费者会按照消息发送的顺序各自消费一半(也就是你一个我一个的消费。)
队列模型存在的问题:无法将生产者产生的消息分发给多个消费者,并保证每个消费者都能接收到完整的消息内容。这种情况,队列模型就不好解决了。很多比较杠精的人就说:我们可以为每个消费者创建一个单独的队列,让生产者发送多份。这是一种非常愚蠢的做法,浪费资源不说,还违背了使用消息队列的目的。
# 发布 - 订阅模型(Pub-Sub)
Kafka 采用的消息模型,为了解决队列模型存在的问题
发布 - 订阅模型(Pub-Sub) 使用主题(Topic)作为消息通信载体,类似于广播模式。发布者发布一条消息,该消息通过主题传递给所有的订阅者,在一条消息广播之后才订阅的用户则是收不到该条消息的。
在发布 - 订阅模型中,如果只有一个订阅者,那它和队列模型就基本是一样的了。所以说,发布 - 订阅模型在功能层面上是可以兼容队列模型的。
RocketMQ 的消息模型和 Kafka 基本是完全一样的。唯一的区别是 Kafka 中没有队列这个概念,与之对应的是 Partition(分区)。
# 核心概念
# Producer、Comsumer、Broker、Topic、Partition
Kafka 将生产者发布的消息发送到 **Topic(主题)** 中,需要这些消息的消费者可以订阅这些 Topic,如下图所示:
上面这张图也为我们引出了,Kafka 比较重要的几个概念:
Producer(生产者): 产生消息的一方。
Consumer(消费者): 消费消息的一方。
Broker(代理): 可以看作是一个独立的 Kafka 实例。多个 Kafka Broker 组成一个 Kafka Cluster(集群)。每个 Broker 中又包含:
Topic(主题): Producer 将消息发送到特定的主题,Consumer 通过订阅特定的 Topic (主题) 来消费消息。
Partition(分区): 属于 Topic 的一部分。一个 Topic 可以有多个 Partition ,并且 **同一 Topic 下的 Partition 可以分布在不同的 Broker 上,表明一个 Topic 可以横跨多个 Broker**。
# 多副本机制(Replica)
类似于 Redis 中的主从复制(Replica)
Kafka 为分区(Partition)引入了多副本(Replica)机制。Partition 中的多个副本之间会有一个叫做 leader
的家伙,其他副本称为 follower
。发送的消息先被发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进行同步。
生产者和消费者只与 leader 副本交互。可以理解为其他副本只是 leader 副本的拷贝,它们的存在只是为了保证消息存储的安全性。当 leader 副本发生故障时会从 follower 中选举出一个 leader,前提是该 follower 和 leader 同步程度一致。
Kafka 多副本(Replica)机制的好处如下:
- 负载均衡:Kafka 通过给特定 Topic 指定多个 Partition,而各个 Partition 可以分布在不同的 Broker 上,这样便能提供比较好的并发能力。
- 数据容灾能力:Partition 可以指定对应的 Replica 数,极大地提高了消息存储的安全性与容灾能力,不过也相应地增加了所需要的存储空间。
# Kafka 与 Zookeeper 的关系
# Zookeeper 在 Kafka 中的作用
下图就是我的本地 Zookeeper,它成功和我本地的 Kafka 关联上(以下文件夹结构借助 idea 插件 Zookeeper tool 实现)。
ZooKeeper 主要为 Kafka 提供元数据的管理的功能。
从图中我们可以看出,Zookeeper 主要为 Kafka 做了下面这些事情:
- Broker 注册:在 Zookeeper 上会有一个专门用来记录 Broker 服务器列表的节点。每个 Broker 在启动时,都会到 Zookeeper 上进行注册,即到
/brokers/ids
下创建属于自己的节点。每个 Broker 就会将自己的 IP 地址和端口等信息记录到该节点中去 - Topic 注册:在 Kafka 中,同一个 Topic 的消息会被分成多个 Partition,并将其分布在多个 Broker 上。这些 Partition 信息及与 Broker 的对应关系也都是由 Zookeeper 在维护。比如我创建了一个名字为 my-topic 的 Topic 并且它有两个 Partition ,对应到 zookeeper 中会创建这些文件夹:
/brokers/topics/my-topic/Partitions/0
、/brokers/topics/my-topic/Partitions/1
- 负载均衡:上面也说过了 Kafka 通过给特定 Topic 指定多个 Partition,而各个 Partition 可以分布在不同的 Broker 上,这样便能提供比较好的并发能力。对于同一个 Topic 的不同 Partition,Kafka 会尽力将这些 Partition 分布到不同的 Broker 服务器上。
- 当 Provider 产生消息时,会尽量投递到不同 Broker 的 Partition 里面。
- 当 Consumer 消费消息时,Zookeeper 会可以根据当前的 Partition 数量以及 Consumer 数量来实现动态负载均衡。
- ……
# Kafka 不再依赖于 Zookeeper
在 Kafka 2.8 之前,Kafka 最被大家诟病的就是其重度依赖于 Zookeeper。在 Kafka 2.8 之后,引入了基于 Raft 协议的 KRaft 模式,不再依赖 Zookeeper,大大简化了 Kafka 的架构,让你可以以一种轻量级的方式来使用 Kafka。
不过,要提示一下:如果要使用 KRaft 模式的话,建议选择较高版本的 Kafka,因为这个功能还在持续完善优化中。Kafka 3.3.1 版本是第一个将 KRaft(Kafka Raft)共识协议标记为生产就绪的版本。
![](https://oss.javaguide.cn/github/javaguide/high-performance/message-queue/kafka3.3.1-kraft- production-ready.png)
# 消费顺序、消息丢失、重复消费
# 如何保证消息的消费顺序?
我们在使用消息队列的过程中经常有业务场景需要严格保证消息的消费顺序,比如我们同时发了 2 个消息,这 2 个消息对应的操作分别对应的数据库操作是:
- 更改用户会员等级。
- 根据会员等级计算订单价格。
假如这两条消息的消费顺序不一样造成的最终结果就会截然不同。
我们知道 Kafka 中 Partition (分区) 是真正保存消息的地方,我们发送的消息都被放在了这里。而 Partition (分区) 又存在于 Topic (主题) 概念中,并且我们可以给特定 Topic 指定多个 Partition。
每次添加消息到 Partition (分区) 的时候都会采用 **尾加法**,如上图所示。 Kafka 只能为我们保证 Partition(分区)中的消息有序,通过 **offset(偏移量)** 实现。
消息在被追加到 Partition (分区) 的时候都会分配一个特定的偏移量(offset)。
所以,我们就有一种很简单的保证消息消费顺序的方法:1 个 Topic 只对应一个 Partition。这样当然可以解决问题,但是破坏了 Kafka 的设计初衷。
Kafka 中发送 1 条消息的时候,可以指定 topic、partition、key、data(数据)4 个参数。如果你发送消息的时候指定了 Partition 的话,所有消息都会被发送到指定的 Partition。并且,同一个 key 的消息可以保证只发送到同一个 partition,这个我们可以采用表 / 对象的 id 来作为 key 。
总结一下,对于如何保证 Kafka 中消息消费的顺序,有了下面两种方法:
- 1 个 Topic 只对应一个 Partition。
- (推荐)发送消息的时候指定 key/Partition。
当然不仅仅只有上面两种方法,上面两种方法是我觉得比较好理解的。
# 如何保证消息不丢失?
# 情况 1:Producer 丢失消息
Producer 调用 send
方法发送消息之后,消息可能因为网络问题并没有发送过去。所以,我们不能默认在调用 send
方法发送消息之后消息发送成功了。
为了确定消息是发送成功,我们要判断消息发送的结果。但是要注意的是 Kafka 中 Producer 使用 send
方法发送消息实际上是异步的操作,可以通过 get()
方法获取调用结果,但是这样也让它变为了同步操作,示例代码如下:
详细代码见这篇文章:Kafka 系列第三篇!10 分钟学会如何在 Spring Boot 程序中使用 Kafka 作为消息队列?
SendResult<String, Object> sendResult = kafkaTemplate.send(topic, o).get(); | |
if (sendResult.getRecordMetadata() != null) { | |
logger.info("生产者成功发送消息到" + sendResult.getProducerRecord().topic() + "-> " + sendRe | |
sult.getProducerRecord().value().toString()); | |
} |
但是一般不推荐这么做!可以借助 ListenableFuture
为 Producer 的 send
方法添加回调函数的形式,示例代码如下:
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, o); | |
future.addCallback(result -> logger.info("生产者成功发送消息到topic:{} partition:{}的消息", result.getRecordMetadata().topic(), result.getRecordMetadata().partition()), | |
ex -> logger.error("生产者发送消息失败,原因:{}", ex.getMessage())); |
如果消息发送失败的话,我们检查失败的原因之后重新发送即可!
这里推荐为 Producer 的
retries
(重试次数)设置一个比较合理的值,一般是 3 ,但是为了保证消息不丢失的话一般会设置比较大一点。设置完成之后,当出现网络问题之后能够自动重试消息发送,避免消息丢失。另外,建议还要设置重试间隔,因为间隔太小的话重试的效果就不明显了,网络波动一次你 3 次一下子就重试完了。
# 情况 2:Consumer 丢失消息
我们知道消息在被追加到 Partition (分区) 的时候都会分配一个特定的 offset(偏移量)。offset 表示 Consumer 当前消费到的 Partition (分区) 的所在位置。Kafka 通过偏移量(offset)可以保证消息在分区内的顺序性。
当 Consumer 拉取到了 Partition 的某个消息之后,Consumer 会自动提交了 offset 。自动提交的话会有一个问题,试想一下,当 Consumer 刚拿到这个消息准备进行真正消费的时候,突然挂掉了,消息实际上并没有被消费,但是 offset 却被自动提交了。
解决办法也比较粗暴,我们 ** 关闭自动提交 offset,每次在真正消费完消息之后再手动提交 offset **。 但是,细心的朋友一定会发现,这样会带来消息被重新消费的问题。比如你刚刚消费完消息之后,还没提交 offset,结果自己挂掉了,那么这个消息理论上就会被消费两次。
# 情况 3:Kafka 丢失消息
我们知道 Kafka 为 Partition 引入了多副本(Replica)机制。Partition 中的多个副本之间会有一个叫做 leader 的家伙,其他副本称为 follower。我们发送的消息会被发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进行同步。Producer 和 Comsumer 只与 leader 副本交互。可以理解为其他 follower 副本只是 leader 副本的拷贝,它们的存在只是为了保证消息存储的安全性。
试想一种情况:假如 leader 副本所在的 broker 突然挂掉,但是 leader 的数据还有一些没有被 follower 副本的同步的话,就会造成消息丢失。
设置
acks = all
:acks 是 Kafka Producer 很重要的一个参数,其默认值为 1,代表我们的消息被 leader 副本接收之后就算被成功发送。配置acks = all
表示只有所有 ISR 列表的副本全部收到消息时,Producer 才会接收到来自服务器的响应。这种模式是最高级别、最安全的,可以确保不止一个 Broker 接收到了消息,但副作用是延迟会很高。Partition 中的所有副本统称为
AR
(Assigned Repllicas)。所有与 leader 副本保持一定程度同步的副本(包括 Leader 本身)组成ISR
(In-Sync Replicas),ISR 集合是 AR 集合中的一个子集。设置
replication.factor >= 3
:为了保证 leader 副本能有 follower 副本能同步消息,可以 **确保每个 Partition 至少有 3 个副本**。虽然造成了数据冗余,但是带来了数据的安全性。设置
min.insync.replicas > 1
:代表消息至少要被写入到 2 个副本才算是被成功发送。其默认值为 1 ,在实际生产中应尽量避免。确保
replication.factor > min.insync.replicas
:为了保证整个 Kafka 服务的高可用性。设想一下假如两者相等的话,只要是有一个副本挂掉,整个 Partition 就无法正常工作了。这明显违反高可用性!一般推荐设置成replication.factor = min.insync.replicas + 1
。设置
unclean.leader.election.enable = false
:Kafka 从 0.11.0.0 版本开始,
unclean.leader.election.enable
参数的默认值改为 false我们最开始也说了我们发送的消息会被发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进行同步,多个 follower 副本之间的消息同步情况不一样。当我们配置了 unclean.leader.election.enable = false 的话,当 leader 副本发生故障时,就不会从同步程度达不到要求的 follower 副本中选择出 leader,降低了消息丢失的可能性。
# 如何保证消息不被重复消费?
kafka 出现消息重复消费的原因:
根本原因:Consumer 已经消费了消息,但没有成功提交 offset
直接原因:Consumer 由于处理业务时间长或者网络链接等原因,让 Kafka 认为服务假死,触发了 Partition
rebalance
。
解决方案:
- Consumer 消费消息服务做幂等校验,比如 Redis 的 set、MySQL 的主键等天然的幂等功能,这种方法最有效。
- 将
enable.auto.commit
参数设置为 false,关闭 offset 的自动提交,开发者在代码中手动提交 offset。那么这里会有个问题:什么时候提交 offset 合适?- 处理完消息再提交:依旧有消息重复消费的风险,和自动提交一样
- 拉取到消息即提交:会有消息丢失的风险。允许消息延时的场景,一般会采用这种方式。然后,通过定时任务在业务不繁忙(比如凌晨)的时候做数据兜底。
# 重试机制
在 Kafka 如何保证消息不丢失中提到了 Kafka 的重试机制。
网上关于 Spring Kafka 的默认重试机制文章很多,但大多都是过时的,和实际运行结果完全不一样。以下是根据 spring-kafka-2.9.3 源码重新梳理一下。
# 消费失败后会先重试,多次重试失败后会跳过
Producer 代码:
for (int i = 0; i < 10; i++) {
kafkaTemplate.send(KafkaConst.TEST_TOPIC, String.valueOf(i))
}
Comsumer 代码:
@KafkaListener(topics = {KafkaConst.TEST_TOPIC},groupId = "apple")
private void customer(String message) throws InterruptedException {
log.info("kafka customer:{}",message);
Integer n = Integer.parseInt(message);
if (n%5==0){ // 当n为0和5时抛出异常
throw new RuntimeException();
}
}
在默认配置下,当消费异常会进行重试,重试多次后会跳过当前消息,继续进行后续消息的消费,不会一直卡在当前消息。
下面是一段消费的日志,可以看出当 test-0@95
重试多次后会被跳过。
2023-08-10 12:03:32.918 DEBUG 9700 --- [ntainer#0-0-C-1] o.s.kafka.listener.DefaultErrorHandler : Skipping seek of: test-0@95 | |
2023-08-10 12:03:32.918 TRACE 9700 --- [ntainer#0-0-C-1] o.s.kafka.listener.DefaultErrorHandler : Seeking: test-0 to: 96 | |
2023-08-10 12:03:32.918 INFO 9700 --- [ntainer#0-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-apple-1, groupId=apple] Seeking to offset 96 for partition test-0 |
# 默认重试 10 次,时间间隔为 0
源码略了
总结:Kafka Comsumer 在默认配置下会进行最多 10 次重试,每次重试的时间间隔为 0,即立即重试。如果在 10 次重试后仍然无法成功消费消息,则不再进行重试,该消息将被视为消费失败。
# 自定义重试次数、时间间隔
从上面的代码可以知道,默认错误处理器的重试次数以及时间间隔是由 FixedBackOff
控制的, FixedBackOff
是 DefaultErrorHandler
初始化时默认的。
所以自定义重试次数以及时间间隔,只需要在 DefaultErrorHandler
初始化的时候传入自定义的 FixedBackOff
即可。
重新实现一个 KafkaListenerContainerFactory
,调用 setCommonErrorHandler
设置新的自定义的错误处理器就可以实现。
@Bean | |
public KafkaListenerContainerFactory kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) { | |
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); | |
// 自定义重试时间间隔以及次数 | |
FixedBackOff fixedBackOff = new FixedBackOff(1000, 5); | |
factory.setCommonErrorHandler(new DefaultErrorHandler(fixedBackOff)); | |
factory.setConsumerFactory(consumerFactory); | |
return factory; | |
} |
# 重试失败后,如何告警?
重试失败后逻辑需要手动实现,以下是一个简单的例子。继承 DefaultErrorHandler
并重写 handleRemaining
函数,加上自定义的告警等操作。
@Slf4j
public class DelErrorHandler extends DefaultErrorHandler {
public DelErrorHandler(FixedBackOff backOff) {
super(null,backOff);
}
@Override
public void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
super.handleRemaining(thrownException, records, consumer, container);
log.info("重试多次失败");
// 自定义操作
}
}
DefaultErrorHandler
只是默认的一个错误处理器,Spring Kafka 还提供了 CommonErrorHandler
接口。手动实现 CommonErrorHandler
就可以实现更多的自定义操作,有很高的灵活性。例如根据不同的错误类型,实现不同的重试逻辑以及业务逻辑等。
# 重试最大次数失败后,如何再次处理该消息?
当达到最大重试次数后,消息数据会被直接跳过,继续向后进行。当代码修复后,如何重新消费这些重试失败的消息数据呢?
死信队列(Dead Letter Queue,简称 DLQ) 是消息中间件中的一种特殊队列。它主要 **用于处理无法被 Comsumer 正确处理的消息**,通常是因为消息格式错误、处理失败、消费超时等情况导致的消息被 "丢弃" 或 "死亡" 的情况。
当消息进入队列后,消费者会尝试处理它。如果处理失败,或者超过一定的重试次数仍无法被成功处理,消息可以发送到死信队列中,而不是被永久性地丢弃。在死信队列中,可以进一步分析、处理这些无法正常消费的消息,以便定位问题、修复错误,并采取适当的措施。
# 重试注解 @RetryableTopic
这是 Spring Kafka 中的一个注解,用于配置某个 Topic 支持消息重试,更推荐使用这个注解来完成重试。
// 重试 5 次,重试间隔 100 毫秒,最大间隔 1 秒
@RetryableTopic(
attempts = "5",
backoff = @Backoff(delay = 100, maxDelay = 1000)
)
@KafkaListener(topics = {KafkaConst.TEST_TOPIC}, groupId = "apple")
private void customer(String message) {
log.info("kafka customer:{}", message);
Integer n = Integer.parseInt(message);
if (n % 5 == 0) {
throw new RuntimeException();
}
System.out.println(n);
}
当达到最大重试次数后,如果仍然无法成功处理消息,消息会被发送到对应的死信队列中。对于死信队列的处理,既可以用 @DltHandler
处理,也可以使用 @KafkaListener
重新消费。
# 【面试题】kafka 消费组有什么特性?
Kafka 消费组:是 Kafka 提供的可扩展、具有容错性的 Comsumer 机制。同一个消费组的多个 Comsumer 能分布到多个物理机器上加速消费,同时也可以实现负载均衡。
当某个 Comsumer 宕机时,其他 Comsumer 可以继续消费该 Partition 的数据,从而保证了数据的可靠性和持久性。
因此,Kafka 消费组有以下特性:
- 同一个消费组内的所有 Comsumer 共享一个公共 ID。
- 消费组内的每个 Comsumer 都有自己的 offset (偏移量),用于记录已经消费过的消息位置。
- 消费组内的每个 Comsumer 可以处理 Partition 中的所有消息。
# 【面试题】kafka 多 topic 为什么有性能问题?
kafka 多 topic 可能会导致性能问题的原因:
- Partition 数量增加:每个 topic 都会被分成多个 Partition,当有大量的 topic 存在时, Partition 数量也会相应增加。这会导致 Kafka 需要处理更多的 Partition,增加了系统的负载和资源消耗。
- 网络开销增加:Kafka 的多 topic 会导致更多的网络通信开销。当一个 Producer 同时向多个 topic 发送消息时,需要建立多个网络连接,并且需要维护多个网络通信的状态。这增加了网络开销和延迟。
- 资源竞争:多个 topic 同时进行读写操作时,会引起资源的竞争。例如,多个 Comsumer 同时消费不同的 topic,会导致 Comsumer 之间的竞争和争夺资源的情况,从而影响系统的性能。
- 数据分布不均衡:当有大量的 topic 存在时,数据的分布可能会不均衡。某些 topic 可能会有更多的数据量,而某些 topic 可能会有较少的数据量。这会导致某些 broker 负载过重,而其他 broker 负载较轻,从而影响整个系统的性能。
为了解决这些性能问题,可以采取以下措施:
- 合并 topic:如果有多个 topic 的数据量较小,可以考虑将它们合并为一个 topic,减少分区数量和网络开销。
- 增加资源:增加 Kafka 集群的 broker 数量和 C omsumer 数量,以便更好地处理多 topic 的负载。
- 均衡数据分布:通过调整 Partition 分配策略,使得数据在各个 broker 上均衡分布,避免负载不均的情况。
- 使用分区策略:根据业务需求,合理地选择分区策略,避免数据集中在某个 Partition 或某个 broker 上。
- 优化网络通信:通过合理的网络配置和优化,减少网络开销和延迟,提高系统的性能。
# RocketMQ
RocketMQ 是一款功能强大的分布式消息系统,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。
RocketMQ 源码地址:https://github.com/apache/rocketmq
RocketMQ 官方网站:https://rocketmq.apache.org
文章描述 RocketMQ 相关概念和知识,如无特别声明,均是 Apache RocketMQ 4.x
版本。
🔥SpringBoot Ladder:从零到一学习 SpringBoot 各种组件框架实战的项目,让 Demo 变得简单。咱们文章中的 RocketMQ 示例也在这个项目。
# 使用场景
# 异步解耦
最常见的一个场景是用户注册后,需要发送注册邮件和短信通知,以告知用户注册成功。传统的做法有以下两种:
# 串行方式
串行方式下的注册流程如下图所示。
数据流动如下所述:
- 您在注册页面填写账号和密码并提交注册信息,这些注册信息首先会被写入注册系统。
- 注册信息写入注册系统成功后,再发送请求至邮件通知系统。邮件通知系统收到请求后向用户发送邮件通知。
- 邮件通知系统接收注册系统请求后再向下游的短信通知系统发送请求。短信通知系统收到请求后向用户发送短信通知。
以上三个任务全部完成后,才返回注册结果到客户端,用户才能使用账号登录。
假设每个任务耗时分别为 50ms,则用户需要在注册页面等待总共 150ms 才能登录。
# 并行方式
并行方式下的注册流程如下图所示。
数据流动如下所述:
- 用户在注册页面填写账号和密码并提交注册信息,这些注册信息首先会被写入注册系统。
- 注册信息写入注册系统成功后,再同时发送请求至邮件和短信通知系统。邮件和短信通知系统收到请求后分别向用户发送邮件和短信通知。
以上两个任务全部完成后,才返回注册结果到客户端,用户才能使用账号登录。
假设每个任务耗时分别为 50ms,其中,邮件和短信通知并行完成,则用户需要在注册页面等待总共 100ms 才能登录。
# 异步解耦
对于用户来说,注册功能实际只需要注册系统存储用户的账户信息后,该用户便可以登录,后续的注册短信和邮件不是即时需要关注的步骤。
对于注册系统而言,发送注册成功的短信和邮件通知并不一定要绑定在一起同步完成,所以实际当数据写入注册系统后,注册系统就可以把其他的操作放入对应的 RocketMQ 中然后马上返回用户结果,由 RocketMQ 异步地进行这些操作。
数据流动如下所述:
- 用户在注册页面填写账号和密码并提交注册信息,这些注册信息首先会被写入注册系统。
- 注册信息写入注册系统成功后,再发送消息至 RocketMQ。RocketMQ 会马上返回响应给注册系统,注册完成。用户可立即登录。
- 下游的邮件和短信通知系统订阅 RocketMQ 的此类注册请求消息,即可向用户发送邮件和短信通知,完成所有的注册流程。
用户只需在注册页面等待注册数据写入注册系统和 RocketMQ 的时间,即等待 55ms 即可登录。
# 流量削峰
流量削峰也是 RocketMQ 的常用场景,一般在秒杀或团队抢购活动中使用广泛。
在秒杀或团队抢购活动中,由于用户请求量较大,导致流量暴增,秒杀的应用在处理如此大量的访问流量后,下游的通知系统无法承载海量的调用量,甚至会导致系统崩溃等问题而发生漏通知的情况。为解决这些问题,可在应用和下游通知系统之间加入 RocketMQ。
秒杀处理流程如下所述:
- 用户发起海量秒杀请求到秒杀业务处理系统。
- 秒杀处理系统按照秒杀处理逻辑将满足秒杀条件的请求发送 RocketMQ。
- 下游的通知系统订阅 RocketMQ 的秒杀相关消息,再将秒杀成功的消息发送到相应用户。
- 用户收到秒杀成功的通知。
# 顺序消息
详细请见 “消息类型 - 顺序消息” 小节
顺序消息是 RocketMQ 提供的一种对消息发送、消费顺序有严格要求的消息。
# 分布式模缓存同步
双十一大促时,各个分会场会有琳琅满目的商品,每件商品的价格都会实时变化。使用缓存技术也无法满足对商品价格的访问需求,缓存服务器网卡满载。访问较多次商品价格查询影响会场页面的打开速度。
此时需要提供一种广播机制,一条消息本来只可以被集群的一台机器消费,如果使用 RocketMQ 的广播消费模式,那么这条消息会被所有节点消费一次,相当于把价格信息同步到需要的每台机器上,取代缓存的作用。
# 分布式定时 / 延时调度
详细请见 “消息类型 - 定时消息” 小节
RocketMQ 提供精确度到秒级的分布式定时消息能力(5.0 架构后),可广泛应用于订单超时中心处理、分布式延时调度系统等场景。
# 消息类型
# 普通消息
普通消息一般应用于微服务解耦、事件驱动、数据集成等场景,这些场景大多数要求数据传输通道具有可靠的传输能力,且对消息的处理时机、处理顺序没有特别要求。
以在线的电商交易场景为例,上游订单系统将用户下单支付这一业务事件封装成独立的普通消息并发送至 RocketMQ 服务端,下游按需从服务端订阅消息,并按照本地消费逻辑处理下游任务。每个消息之间都是相互独立的,且不需要产生关联。
另外还有日志系统,以离线的日志收集场景为例,通过埋点组件收集前端应用的相关操作日志,并转发到 RocketMQ 。
普通消息的生命周期:
- 初始化(Initialized):消息被 Producer 构建并完成初始化,待发送到服务端的状态。
- 待消费(Ready):消息被发送到服务端,对 Consumer 可见,等待消费的状态。
- 消费中(Inflight):消息被 Consumer 获取,并按照其本地的业务逻辑进行处理的过程。此时服务端会等待 Consumer 完成消费并提交消费结果,如果一定时间后没有收到 Consumer 的响应,RocketMQ 会对消息进行重试处理。
- 消费提交(Acked):Consumer 完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。RocketMQ 默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,Consumer 仍然可以回溯消息,重新消费。
- 消息删除(Deleted):RocketMQ 按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。
# 定时消息
在分布式定时调度触发、任务超时处理等场景,需要实现精准、可靠的定时事件触发。使用 RocketMQ 的定时消息可以简化定时调度任务的开发逻辑,实现高性能、可扩展、高可靠的定时触发能力。
定时消息仅支持在 MessageType 为 Delay 的 Topic 内使用,即定时消息只能发送至类型为定时消息的 Topic 中,发送的消息的类型必须和 Topic 的类型一致。
基于定时消息的超时任务处理具备如下优势:
- 定时精度高、开发门槛低:消息定时时间不存在阶梯间隔,可以轻松实现任意精度事件触发,无需业务去重。
- 高性能、可扩展:传统的定时实现方案较为复杂,需要进行数据库扫描,容易遇到性能瓶颈的问题,RocketMQ 可以基于定时消息特性完成事件驱动,实现百万级消息 TPS 能力。
定时消息的生命周期:
- 初始化(Initialized):消息被生产者构建并完成初始化,待发送到服务端的状态。
- 定时中(In timing):消息被发送到服务端,和普通消息不同的是,服务端不会直接构建消息索引,而是会将定时消息单独存储在定时存储系统中,等待定时时刻到达。
- 待消费(Ready):定时时刻到达后,服务端将定时消息重新写入普通存储引擎,对下游消费者可见,等待消费者消费的状态。
- 消费中(Inflight):消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,RocketMQ 会对消息进行重试处理。
- 消费提交(Acked):消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。RocketMQ 默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。
- 消息删除(Deleted):Apache RocketMQ 按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。
定时消息的实现逻辑:先经过定时存储等待触发,定时时间到达后才会被投递给 Consumer。因此,如果将大量定时消息的定时时间设置为同一时刻,则到达该时刻后会有大量消息同时需要被处理,会造成系统压力过大,导致消息分发延迟,影响定时精度。
# 顺序消息
顺序消息是 RocketMQ 提供的一种对消息发送、消费顺序有严格要求的消息。顺序消息仅支持在 MessageType 为 FIFO 的 Topic 内使用,对于一个指定的 Topic,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。
顺序消息分为分区顺序消息和全局顺序消息。
- 分区顺序消息:对于指定的一个 Topic,所有消息根据 Sharding Key 进行区块分区,同一个 Partition 内的消息按照严格的先进先出(FIFO)原则进行发布和消费。同一 Partition 内的消息保证顺序,不同 Partition 之间的消息顺序不做要求。
- 适用场景:适用于性能要求高,以 Sharding Key 作为分区字段,在同一个区块中严格地按照先进先出(FIFO)原则进行消息发布和消费的场景。
- 示例
- 用户注册需要发送验证码,以用户 ID 作为 Sharding Key,那么同一个用户发送的消息都会按照发布的先后顺序来消费。
- 电商的订单创建,以订单 ID 作为 Sharding Key,那么同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息都会按照发布的先后顺序来消费。
- 全局顺序消息:对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序来发布和消费。
- 适用场景:适用于性能要求不高,所有的消息严格按照 FIFO 原则来发布和消费的场景。
- 示例:在证券处理中,以人民币兑换美元为 Topic,在价格相同的情况下,先出价者优先处理,则可以按照 FIFO 的方式发布和消费全局顺序消息。
全局顺序消息实际上是一种特殊的分区顺序消息,即 Topic 中只有一个分区,因此全局顺序和分区顺序的实现原理相同。因为分区顺序消息有多个分区,所以分区顺序消息比全局顺序消息的并发度和性能更高。
和普通消息发送相比,顺序消息发送必须要设置消息组(推荐实现 MessageQueueSelector 的方式,见下文)。要保证消息的顺序性需要单一 Producer 串行发送。
单线程使用 MessageListenerConcurrently 可以顺序消费,多线程环境下使用 MessageListenerOrderly 才能顺序消费。
# 事务消息
# 基础概念
RocketMQ 消息模型:在一个
Topic
中配置多个Queue
,并且每个Queue
维护每个Consumer组
的offset
(消费位置) 实现了 主题模式 / 发布订阅模式。
# Topic 主题
Topic 是消息传输和存储的顶层容器,用于标识同一类业务逻辑的消息。Topic 的作用主要如下:
- 定义数据的分类隔离:在 Apache RocketMQ 的方案设计中,建议将不同业务类型的数据拆分到不同的 Topic 中管理,实现存储的隔离性和订阅隔离性。
- 定义数据的身份和权限:Apache RocketMQ 的消息本身是匿名无身份的,同一分类的消息使用相同的 Topic 来做身份识别和权限管理。
# Queue 队列
类似于 **Kafka 中的 Partition(分区)** 这一概念?
Queue 是消息存储和传输的实际容器,也是消息的最小存储单元。每个 Topic 都是由多个 Queue 组成的,以此实现 Queue 数量的水平拆分和 Queue 内部的流式存储。
# Message 消息
Message 是最小数据传输单元。Producer 将业务数据的负载和拓展属性包装成 Message 发送到 Apache RocketMQ 服务端,服务端按照相关语义将 Message 投递到消费端进行消费。
# Producer 生产者
发布消息的角色。Producer 通过 MQ 的负载均衡模块选择相应的 Broker 集群队列进行消息投递,投递的过程支持快速失败和重试。
# Consumer 消费者
消息消费的角色。
- 支持以 ** 推(push)、拉(pull)** 两种模式对消息进行消费。
- 同时也支持集群方式和广播方式的消费。
- 提供实时消息订阅机制,可以满足大多数用户的需求。
# Broker 代理服务器
Broker 负责消息的存储、投递和查询,并保证服务高可用。其实 Broker 就是 MQ 服务器,Producer 生产消息到 Broker,Consumer 从 Broker 拉取并消费消息。
Broker、Topic、Queue 的关系:
- 一个 Topic 中存在多个 Queue
- 一个 Topic 分布在多个 Broker 上,而一个 Broker 可以配置多个 Topic,即 Topic 和 Broker 之间是多对多的关系
如果某个 Topic 消息量很大,应该给它多配置几个 Queue (提高并发能力),并且尽量多分布在不同 Broker 上,以减轻某个 Broker 的压力。
Broker 集群遵从 Master-Slave 架构 :
- Broker 分为 Master 与 Slave
- 一个 Master 可以对应多个 Slave,但是一个 Slave 只能对应一个 Master
- Master 与 Slave 的对应关系通过指定相同的 BrokerName,不同的 BrokerId 来定义,BrokerId 为 0 表示 Master,非 0 表示 Slave
- Master 也可以部署多个
# NameServer 名字服务器
为 Broker 和 Producer、Consumer 解耦,类似于 SpringCloud 中的 Eureka / Nacos
NameServer 是一个简单的 Broker 路由注册中心,支持 Broker 的注册与发现,主要提供两个功能:
- Broker 管理:
- NameServer 接受并保存 Broker 所提交的注册信息,作为 Broker 路由表的基本数据
- 提供心跳检测机制,检查 Broker 是否还存活
- 路由信息管理:
- 每个 NameServer 保存了关于 Broker 集群的整个路由信息、用于客户端查询的 Queue 信息
- Producer 和 Consumer 通过 NameServer 中的 Broker 路由表就可以知道整个 Broker 集群的路由信息,从而和对应的 Broker 进行消息的投递和消费(Producer 和 Consumer 定期会向 NameServer 查询相关的 Broker 信息)
NameServer 通常会有多个实例部署,各实例间相互不进行信息通讯。Broker 向每一台 NameServer 注册自己的路由信息,所以每一个 NameServer 实例上面都保存一份完整的路由信息。当某个 NameServer 因某种原因下线了,客户端仍然可以向其它 NameServer 获取路由信息。
上图中的四个角色实际上都需要做集群。
官方架构图
Broker
做了集群,并且还进行了主从部署:由于消息分布在各个Broker
上,一旦某个Broker
宕机,则该Broker
上的消息读写都会受到影响。所以RocketMQ
提供了master/slave
的结构,salve
定时从master
同步数据 (同步刷盘或者异步刷盘),如果master
宕机,则slave
提供消费服务,但是不能写入消息 (后面我还会提到哦)。- 为了保证高可用
HA
,NameServer
也做了集群部署,但它是去中心化的:意味着NameServer
没有 master 节点,在RocketMQ
中是通过 单个 Broker 和所有 NameServer 保持长连接 ,并且Broker
会定期向所有Nameserver
发送心跳,其中包含了自身的Topic
配置信息,这个步骤就对应图中的Routing Info
。- 在
Producer
需要向Broker
发送消息的时候,需要先从NameServer
获取关于Broker
的路由信息,然后通过 轮询 的方式向每个 Queue 中生产数据,以达到负载均衡的效果。Comsumer
通过NameServer
获取所有Broker
的路由信息后,向Broker
发送Pull
请求来获取消息数据。Consumer
可以以两种模式启动 —— 广播(Broadcast)和集群(Cluster):
- 广播模式下,一条消息会发送给 同一个消费组中的所有消费者
- 集群模式下消息只会发送给一个消费者
# 部署模型小结
每个 Broker 与 NameServer 集群中的所有节点建立长连接,定时将 Topic 信息注册到所有 NameServer。
Producer 与 NameServer 集群中的其中一个节点建立长连接,定期从 NameServer 获取 Topic/Broker 路由信息。并向提供 Topic 服务的 Broker Master 建立长连接,且定时向 Broker Master 发送心跳。Producer 完全无状态。
Consumer 与 NameServer 集群中的其中一个节点建立长连接,定期从 NameServer 获取 Topic/Broker 路由信息,并向提供 Topic 服务的 Broker Master、Broker Slave 建立长连接,且定时向 Broker Master、Broker Slave 发送心跳。
Consumer 既可以从 Broker Master 订阅消息,也可以从 Broker Slave 订阅消息。
# 工作原理
# 1、启动 NameServer
启动 NameServer。NameServer 启动后监听端口,等待 Broker、Producer、Consumer 连接,相当于一个路由控制中心。
# 2、启动 Broker
启动 Broker。与所有 NameServer 保持长连接,定时发送心跳包(包含当前 Broker 信息以及存储的所有 Topic 信息)。注册成功后,NameServer 集群中就有 Topic 跟 Broker 的映射关系。
# 3、创建 Topic
创建 Topic 时需要指定该 Topic 要存储在哪些 Broker 上,也可以在发送消息时自动创建 Topic。
# 4、Producer 发送消息
Producer 发送消息。启动时先跟 NameServer 集群中的其中一台建立长连接,并从 NameServer 中查询当前发送的 Topic 存在于哪些 Broker 上,轮询从对应 Broker 上的 Queue 列表中选择一个 Queue,然后与该 Queue 所在的 Broker 建立长连接,从而向 Broker 发消息。
# 5、Consumer 接收消息
Consumer 接受消息。跟其中一台 NameServer 建立长连接,获取当前订阅 Topic 存在于哪些 Broker 上,然后直接跟 Broker 建立连接通道,然后开始消费消息。
# 如何正确发送消息
# 不建议单一进程创建大量 Producer
Apache RocketMQ 的 Producer 和 Topic 是多对多的关系,支持同一个 Producer 向多个 Topic 发送消息。对于 Producer 的创建和初始化,建议遵循够用即可、最大化复用原则,如果有需要发送消息到多个 Topic 的场景,无需为每个 Topic 都创建一个 Producer。
# 不建议频繁创建和销毁 Producer
Apache RocketMQ 的 Producer 是可以重复利用的底层资源,类似数据库的连接池。因此不需要在每次发送消息时动态创建 Producer,且在发送结束后销毁 Producer。这样频繁的创建销毁会在服务端产生大量短连接请求,严重影响系统性能。
正确示例:
Producer p = ProducerBuilder.build(); | |
for (int i =0;i<n;i++){ | |
Message m= MessageBuilder.build(); | |
p.send(m); | |
} | |
p.shutdown(); |
# Consumer 分类
- PushConsumer
- SimpleConsumer
- PullConsumer
# Producer 和 Consumer 分组
# Producer 分组
RocketMQ 服务端 5.x 版本开始,Producer 是匿名的,无需管理 Producer 分组(ProducerGroup)。
对于历史版本服务端 3.x 和 4.x 版本,已经使用的 ProducerGroup 可以废弃无需再设置,且不会对当前业务产生影响。
# Consumer 分组
Consumer 分组是多个消费行为一致的 Consumer 的负载均衡分组。Consumer 分组不是具体实体而是一个逻辑资源。通过 Consumer 分组实现消费性能的水平扩展以及高可用容灾。
Consumer 分组中的订阅关系、投递顺序性、消费重试策略是一致的。
- 订阅关系:Apache RocketMQ 以 Consumer 分组的粒度管理订阅关系,实现订阅关系的管理和追溯。
- 投递顺序性:Apache RocketMQ 的服务端将消息投递给 Consumer 消费时,支持顺序投递和并发投递,投递方式在 Consumer 分组中统一配置。
- 消费重试策略:Consumer 消费消息失败时的重试策略,包括重试次数、死信队列设置等。
RocketMQ 服务端 5.x 版本:上述 Consumer 的消费行为从关联的 Consumer 分组中统一获取,因此同一分组内所有 Consumer 的消费行为必然是一致的,客户端无需关注。
RocketMQ 服务端 3.x/4.x 历史版本:上述消费逻辑由消费者客户端接口定义,因此,您需要自己在消费者客户端设置时保证同一分组下的消费者的消费行为一致。[来自官方网站]
# 如何解决顺序消费、重复消费?
其实 RocketMQ
的架构基本和 Kafka
类似,只不过:
- RocketMQ 的注册中心是
NameServer
,而 Kafka 的是Zookeeper
- RocketMQ 的 **Queue(队列)** 相当于是 Kafka 的 Partition(分区)
# 顺序消费
可以参考 “消息类型 - 顺序消息” 小节
RocketMQ
在 Topic 上是无序的、它只有在 Queue 层面才保证有序:
- 普通顺序(分区顺序消息):对于一个 Topic,同一 Queue 内的消息保证顺序,不同 Queue 之间的消息顺序不做要求。
- 严格顺序(全局顺序消息):对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序来发布和消费。
严格顺序的代价巨大,而 MQ 一般能容忍短暂的乱序,所以推荐使用普通顺序模式!
在 Producer 生产消息的时候,会轮询同一 Topic 的不同 Queue 来发送消息。那么如果此时我有几个消息分别是同一个订单的创建、支付、发货,在轮询的策略下这三个消息会被发送到不同的 Queue,那么此时就无法使用 RocketMQ
的队列有序特性来保证消息有序性了。
解决方法很简单,只需要将同一语义下的消息放入同一个队列(比如这里是同一个订单),那我们就可以使用 Hash 取模法 来保证同一个订单在同一个队列中就行了。
RocketMQ 实现了两种 Queue 选择算法:
轮询算法
- 向消息指定的 Topic 所在 Queue 中依次发送消息,保证消息均匀分布
- 是 RocketMQ 默认队列选择算法
最小投递延迟算法
每次消息投递的时候统计消息投递的延迟,优先选择消息延时小的 Queue,导致消息分布不均匀,按照如下设置即可。
producer.setSendLatencyFaultEnable(true);
继承
MessageQueueSelector
实现自定义的选择算法SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// 从 mqs 中选择一个队列,可以根据 msg 特点选择
return null;
}
}, new Object());
# 特殊情况
# 发送异常
选择 Queue 后会与 Broker 建立连接,通过网络请求将消息发送到 Broker 上,如果 Broker 挂了或者网络波动发送消息超时,此时 RocketMQ 会进行重试。
重新选择其他 Broker 中的 Queue 进行发送,默认重试两次,可以手动设置。
producer.setRetryTimesWhenSendFailed(5); |
# 消息过大
消息超过 4k 时 RocketMQ 会将消息压缩后再发送到 Broker 上,减少网络资源的占用。
# 重复消费
需求:有一个订单的处理积分的系统 FrancisQ,每当来一个消息的时候它就负责为创建这个订单的用户的积分加上相应的数值。可是有一次,消息队列发送给订单系统 FrancisQ 的订单信息,其要求是给 FrancisQ 的积分加上 500。但是积分系统在收到 FrancisQ 的订单信息处理完成之后返回给消息队列处理成功的信息的时候出现了网络波动 (当然还有很多种情况,比如 Broker 意外重启等等),这条回应没有发送成功。那么,消息队列没收到积分系统的回应会不会尝试重发这个消息?问题就来了,我再发这个消息,万一它又给 FrancisQ 的账户加上 500 积分怎么办呢?
解决方法:让 Consumer 实现幂等校验!即对同一个消息的处理结果,执行多少次都不变。
幂等操作的特点:其执行任意多次所产生的影响,均与执行一次的影响相同。
那么如何给业务实现幂等呢?这个还是需要结合具体的业务的。
- 可以使用 **写入
Redis
** 来保证,因为Redis
的key
和value
就是天然支持幂等的。 - 可以使用 **数据库插入法**,基于数据库的唯一键来保证重复数据不会被插入多条。
需要根据特定场景使用特定的解决方案,你要知道你的消息消费是否是完全不可重复消费还是可以忍受重复消费的,然后再选择强校验和弱校验的方式。毕竟在 CS 领域还是很少有技术银弹的说法。
而在互联网领域,幂等不仅仅适用于 **消息队列的重复消费问题,也同样适用于在其他场景中来解决重复请求或者重复调用的问题**:
- 比如将 HTTP 服务设计成幂等的解决前端或者 APP 重复提交表单数据的问题
- 可以将一个微服务设计成幂等的,解决
RPC
框架自动重试导致的重复调用问题。
# 如何实现分布式事务?
事务指的是要么都执行,要么都不执行。在分布式架构中,很多服务是部署在不同系统之间的,那么如何实现分布式事务呢?常见的分布式事务实现有:
- 2PC(两阶段提交):优点是简单,缺点是同步阻塞、中心化问题、数据不一致、太过保守...
- TCC(Try Confirm/Cancel):也是 2PC 的一种。
- 事务消息(half 半消息机制):RocketMQ 支持这种类型的消息。
这三种实现都有特定的使用场景和各自的局限,并不完美。
RocketMQ 中使用的是事务消息(half 半消息机制) + 事务反查机制来解决分布式事务问题的,可以对照着图进行理解。
在第 1 步发送的 half 消息,它的意思是在事务提交之前,对于 Consumer 来说这个消息是不可见的。
那么,如何做到写入消息但是对 Consumer 不可见呢?RocketMQ 事务消息的做法是:
- 如果消息是 half 消息,将备份原消息的 Topic 与消息消费队列
- 然后改变 Topic 为 RMQ_SYS_TRANS_HALF_TOPIC,由于 Consumer 未订阅该 Topic,故无法消费 half 类型的消息。
- 然后 RocketMQ 会开启一个定时任务,从该 Topic 中拉取消息进行消费,根据 Producer 组获取一个服务提供者,发送回查事务状态请求,根据事务状态来决定是提交或回滚消息。
如果没有从第 5 步开始的事务反查机制,如果出现网路波动第 4 步没有发送成功,这样就会产生 MQ 不知道是不是需要给 Consumer 消费的问题。在 RocketMQ
中就是使用的上述的事务反查来解决的,而在 Kafka
中通常是直接抛出一个异常让用户来自行解决。
你还需要注意的是, MQ Server
指向系统 B 的操作已经和系统 A 不相关了,也就是说在消息队列中的分布式事务是:本地事务和存储消息到消息队列才是同一个事务。这样也就产生了事务的最终一致性,因为整个过程是异步的,每个系统只要保证它自己那一部分的事务就行了。
# 如何解决消息堆积问题?
在上面我们提到了消息队列的一个很重要的功能 —— 削峰。那么如果这个峰值太大了导致消息堆积在队列中怎么办呢?
其实这个问题可以将它广义化,因为产生消息堆积的根源其实就只有两个:
Producer 生产太快:限流降级,或者增加 Consumer 实例以水平扩展消费能力(同时还需要增加每个 Topic 的 Queue 数量)。
别忘了在
RocketMQ
中,一个 Queue 只会被一个 Consumer 消费 ,如果你仅仅是增加 Consumer 实例就会出现我一开始给你画的架构图的那种情况:Consumer 消费太慢:先检查 Consumer 是否出现了大量的消费错误,或者打印日志查看是否有哪一个线程卡死,导致了锁资源不释放等问题。
# 回溯消费
回溯消费是指 Consumer
已经消费成功的消息,由于业务上需求需要重新消费。在 RocketMQ
中, Broker
在向 Consumer
投递成功消息后,消息仍然需要保留。并且重新消费一般是按照时间维度,例如由于 Consumer
系统故障,恢复后需要重新消费 1 小时前的数据,那么 Broker
要提供一种机制,可以按照时间维度来回退消费进度。 RocketMQ
支持按照时间回溯消费,时间维度精确到毫秒。
# 如何保证高性能读写?
可参考 “Java IO - IO 模型 - NIO” 小节,略了。
- 传统 IO 方式:即
read + write
,整个过程会发生 4 次上下文切换和 4 次数据的拷贝,这在高并发场景下会严重影响读写性能,故引入了零拷贝技术。- 用户调用 read () 方法,开始读取数据,此时发生一次上下文从用户态到内核态的切换,也就是图示的切换 1
- 将磁盘数据通过 DMA 拷贝到内核缓存区
- 将内核缓存区的数据拷贝到用户缓冲区,这样用户,也就是我们写的代码就能拿到文件的数据
- read () 方法返回,此时就会从内核态切换到用户态,也就是图示的切换 2
- 当我们拿到数据之后,就可以调用 write () 方法,此时上下文会从用户态切换到内核态,即图示切换 3
- CPU 将用户缓冲区的数据拷贝到 Socket 缓冲区
- 将 Socket 缓冲区数据拷贝至网卡
- write () 方法返回,上下文重新从内核态切换到用户态,即图示切换 4
- 零拷贝技术
mmap
:sendfile
:
# 刷盘机制
# 同步刷盘、异步刷盘
在单个节点层面
同步刷盘:需要等待一个刷盘成功的 ACK
,对消息可靠性来说是一种不错的保障,但是性能上会有较大影响,一般适用于金融等特定业务场景。
异步刷盘:开启一个线程去异步地执行刷盘操作。消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了 MQ
的性能和吞吐量,一般适用于如发验证码等对于消息保证要求不太高的业务场景。
一般地,异步刷盘只有在 Broker
意外宕机的时候会丢失部分数据,你可以设置 Broker
的参数 FlushDiskType
来调整你的刷盘策略 (ASYNC_FLUSH 或者 SYNC_FLUSH)。
# 同步复制、异步复制
在 Broker 主从模式下,master 返回消息给客户端时是否需要同步 slave
同步复制:也叫 “同步双写”,即只有消息同步双写到主从节点上时,才返回写入成功。
异步复制:消息写入主节点之后,直接返回写入成功。
然而,很多事情是没有完美的方案的,就比如我们进行消息写入的节点越多就更能保证消息的可靠性,但是随之的性能也会下降,所以需要程序员根据特定业务场景去选择适应的主从复制方案。
异步复制不会像异步刷盘那样影响消息的可靠性,因为两者是不同的概念,对于消息可靠性是通过不同的刷盘策略保证的,而像异步同步复制策略仅仅是影响到了可用性。为什么呢?其主要原因是 RocketMQ
是不支持自动主从切换的,当 master 节点挂掉之后,Producer 就不能再给这个 master 节点生产消息了。
比如这个时候采用异步复制的方式,在主节点还未发送完需要同步的消息的时候主节点挂掉了,这个时候从节点就少了一部分消息。但是此时生产者无法再给主节点生产消息了,消费者可以自动切换到从节点进行消费 (仅仅是消费),所以在主节点挂掉的时间只会产生主从结点短暂的消息不一致的情况,降低了可用性,而当主节点重启之后,从节点那部分未来得及复制的消息还会继续复制。
在单主从架构中,如果一个主节点挂掉了,那么也就意味着整个系统不能再生产了。那么这个可用性的问题能否解决呢?一个主从不行那就多个主从的呗,别忘了在我们最初的架构图中,每个 Topic
是分布在不同 Broker
中的。
但是这种复制方式同样也会带来一个问题,那就是无法保证 严格顺序 。在上文中我们提到了如何保证的消息顺序性是通过将一个语义的消息发送在同一个队列中,使用 Topic
下的队列来保证顺序性的。如果此时我们主节点 A 负责的是订单 A 的一系列语义消息,然后它挂了,这样其他节点是无法代替主节点 A 的,如果我们任意节点都可以存入任何消息,那就没有顺序性可言了。
而在 RocketMQ
中采用了 Dledger
解决这个问题。他要求在写入消息的时候,要求至少消息复制到半数以上的节点之后,才给客⼾端返回写⼊成功,并且它是⽀持通过选举来动态切换主节点的。这里我就不展开说明了,读者可以自己去了解。
也不是说
Dledger
是个完美的方案,至少在Dledger
选举过程中是无法提供服务的,而且他必须要使用三个节点或以上,如果多数节点同时挂掉他也是无法保证可用性的,而且要求消息复制半数以上节点的效率和直接异步复制还是有一定的差距的。
# 存储机制
# 动手发一条消息
# 1、启动 RocketMQ
安装 NameServer
docker run -d -p 9876:9876 --name rmqnamesrv foxiswho/rocketmq:server-4.5.1 |
安装 Broker
1)新建配置目录。
如果是 Windows 需要替换为 Windows 的电脑路径,和 Linux 还是有点差异。
mkdir -p ${HOME}/docker/software/rocketmq/conf |
2)新建配置文件 broker.conf。
brokerClusterName = DefaultCluster | |
brokerName = broker-a | |
brokerId = 0 | |
deleteWhen = 04 | |
fileReservedTime = 48 | |
brokerRole = ASYNC_MASTER | |
flushDiskType = ASYNC_FLUSH | |
# 此处为本地 ip, 如果部署服务器,需要填写服务器外网 ip | |
brokerIP1 = xx.xx.xx.xx |
3)创建容器。
docker run -d \ | |
-p 10911:10911 \ | |
-p 10909:10909 \ | |
--name rmqbroker \ | |
--link rmqnamesrv:namesrv \ | |
-v ${HOME}/docker/software/rocketmq/conf/broker.conf:/etc/rocketmq/broker.conf \ | |
-e "NAMESRV_ADDR=namesrv:9876" \ | |
-e "JAVA_OPTS=-Duser.home=/opt" \ | |
-e "JAVA_OPT_EXT=-server -Xms512m -Xmx512m" \ | |
foxiswho/rocketmq:broker-4.5.1 |
安装 RocketMQ 控制台
docker pull pangliang/rocketmq-console-ng | |
docker run -d \ | |
--link rmqnamesrv:namesrv \ | |
-e "JAVA_OPTS=-Drocketmq.config.namesrvAddr=namesrv:9876 -Drocketmq.config.isVIPChannel=false" \ | |
--name rmqconsole \ | |
-p 8088:8080 \ | |
-t pangliang/rocketmq-console-ng |
运行成功,稍等几秒启动时间,浏览器输入 localhost:8088
查看控制台。
# 2、发送普通消息
下述完整 Demo 详情查看 springboot-ladder/mq-rocketmq-4x 项目模块。
# 2.1、引入 RocketMQ 依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>
# 2.2、启动自动装配
因为咱们 Demo 中使用的是 SpringBoot3,RocketMQ 最新版本 2.2.3 没有适配 SpringBoot3,所以需要手动搞定自动装配。
如果 SpringBoot2 版本,就不需要执行这一步。
resources 目录下创建 META-INF/spring 目录,并创建 org.springframework.boot.autoconfigure.AutoConfiguration.imports
文件。
# RocketMQ 2.2.3 version does not adapt to SpringBoot3 | |
org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration |
# 2.3、Producer
配置文件中引入 RocketMQ 相关配置定义,比如连接 NameServer 地址等。
server: | |
port: 6060 | |
rocketmq: | |
name-server: 127.0.0.1:9876 # NameServer 地址 | |
producer: | |
group: rocketmq-4x-service_common-message-execute_pg # 全局发送者组定义 |
定义消息生产者,通过 RocketMQTemplate
向 RocketMQ 发送普通常规消息。
import cn.hutool.core.util.StrUtil; | |
import com.alibaba.fastjson.JSON; | |
import com.nageoffer.springbootladder.rocketmq4x.event.GeneralMessageEvent; | |
import lombok.RequiredArgsConstructor; | |
import lombok.extern.slf4j.Slf4j; | |
import org.apache.rocketmq.client.producer.SendResult; | |
import org.apache.rocketmq.common.message.MessageConst; | |
import org.apache.rocketmq.spring.core.RocketMQTemplate; | |
import org.springframework.messaging.Message; | |
import org.springframework.messaging.support.MessageBuilder; | |
import org.springframework.stereotype.Component; | |
/** | |
* 普通消息发送者 | |
* | |
* @公众号:马丁玩编程,回复:加群,添加马哥微信(备注:ladder)获取更多项目资料 | |
*/ | |
@Slf4j | |
@Component | |
@RequiredArgsConstructor | |
public class GeneralMessageDemoProduce { | |
private final RocketMQTemplate rocketMQTemplate; | |
/** | |
* 发送普通消息 | |
* | |
* @param topic 消息发送主题,用于标识同一类业务逻辑的消息 | |
* @param tag 消息的过滤标签,消费者可通过 Tag 对消息进行过滤,仅接收指定标签的消息。 | |
* @param keys 消息索引键,可根据关键字精确查找某条消息 | |
* @param messageSendEvent 普通消息发送事件,自定义对象,最终都会序列化为字符串 | |
* @return 消息发送 RocketMQ 返回结果 | |
*/ | |
public SendResult sendMessage(String topic, String tag, String keys, GeneralMessageEvent messageSendEvent) { | |
SendResult sendResult; | |
try { | |
StringBuilder destinationBuilder = StrUtil.builder().append(topic); | |
if (StrUtil.isNotBlank(tag)) { | |
destinationBuilder.append(":").append(tag); | |
} | |
Message<?> message = MessageBuilder | |
.withPayload(messageSendEvent) | |
.setHeader(MessageConst.PROPERTY_KEYS, keys) | |
.setHeader(MessageConst.PROPERTY_TAGS, tag) | |
.build(); | |
sendResult = rocketMQTemplate.syncSend( | |
destinationBuilder.toString(), | |
message, | |
2000L | |
); | |
log.info("[普通消息] 消息发送结果:{},消息ID:{},消息Keys:{}", sendResult.getSendStatus(), sendResult.getMsgId(), keys); | |
} catch (Throwable ex) { | |
log.error("[普通消息] 消息发送失败,消息体:{}", JSON.toJSONString(messageSendEvent), ex); | |
throw ex; | |
} | |
return sendResult; | |
} | |
} |
# 2.4、Consumer
定义消息消费者,从 RocketMQ Broker 拉取对应 Topic Tag 的消息列表。
import com.alibaba.fastjson.JSON; | |
import com.nageoffer.springbootladder.rocketmq4x.event.GeneralMessageEvent; | |
import lombok.RequiredArgsConstructor; | |
import lombok.extern.slf4j.Slf4j; | |
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; | |
import org.apache.rocketmq.spring.core.RocketMQListener; | |
import org.springframework.stereotype.Component; | |
/** | |
* 普通消息消费者 | |
* | |
* @公众号:马丁玩编程,回复:加群,添加马哥微信(备注:ladder)获取更多项目资料 | |
*/ | |
@Slf4j | |
@Component | |
@RequiredArgsConstructor | |
@RocketMQMessageListener( | |
topic = "rocketmq-demo_common-message_topic", | |
selectorExpression = "general", | |
consumerGroup = "rocketmq-demo_general-message_cg" | |
) | |
public class GeneralMessageDemoConsume implements RocketMQListener<GeneralMessageEvent> { | |
@Override | |
public void onMessage(GeneralMessageEvent message) { | |
log.info("接到到RocketMQ消息,消息体:{}", JSON.toJSONString(message)); | |
} | |
} |
# 2.5、发送一条消息
定义消息发送程序,这里为了避免类过多,直接写在 SpringBoot 的启动程序里。发送普通消息的方法返回值就是发送 RocketMQ Broker 返回的状态码,成功的话就是 SEND_OK
。
import com.nageoffer.springbootladder.rocketmq4x.event.GeneralMessageEvent; | |
import com.nageoffer.springbootladder.rocketmq4x.produce.GeneralMessageDemoProduce; | |
import io.swagger.v3.oas.annotations.Operation; | |
import io.swagger.v3.oas.annotations.tags.Tag; | |
import lombok.RequiredArgsConstructor; | |
import org.apache.rocketmq.client.producer.SendResult; | |
import org.springframework.boot.SpringApplication; | |
import org.springframework.boot.autoconfigure.SpringBootApplication; | |
import org.springframework.web.bind.annotation.PostMapping; | |
import org.springframework.web.bind.annotation.RestController; | |
import java.util.UUID; | |
@RestController | |
@RequiredArgsConstructor | |
@SpringBootApplication | |
@Tag(name = "RocketMQ发送示例", description = "RocketMQ发送示例启动器") | |
public class RocketMQDemoApplication { | |
private final GeneralMessageDemoProduce generalMessageDemoProduce; | |
@PostMapping("/test/send/general-message") | |
@Operation(summary = "发送RocketMQ普通消息") | |
public String sendGeneralMessage() { | |
String keys = UUID.randomUUID().toString(); | |
GeneralMessageEvent generalMessageEvent = GeneralMessageEvent.builder() | |
.body("消息具体内容,可以是自定义对象,最终都会序列化为字符串。如果是取消订单,这里应该是订单ID或者相关联的信息") | |
.keys(keys) | |
.build(); | |
SendResult sendResult = generalMessageDemoProduce.sendMessage( | |
"rocketmq-demo_common-message_topic", | |
"general", | |
keys, | |
generalMessageEvent | |
); | |
return sendResult.getSendStatus().name(); | |
} | |
public static void main(String[] args) { | |
SpringApplication.run(RocketMQDemoApplication.class, args); | |
} | |
} |
项目中引入了 Swagger3,通过界面 UI 发送一条消息测试效果。访问 http://127.0.0.1:6060/swagger-ui/index.html
,调用定义的发送 RocketMQ 普通消息方法。
点击 Execute 执行方法调用。
通过方法调用得知,返回数据为成功。
也能看到 RocketMQ 对应的生产者和消费者对应日志。
2023-09-24T17:38:57.457+08:00 INFO 48437 --- [nio-6060-exec-6] c.n.s.r.p.GeneralMessageDemoProduce : [普通消息] 消息发送结果:SEND_OK,消息ID:7F000001BD35251A69D77A3BC5280002,消息Keys:7a60c853-08dc-46cd-a647-398d45b54966 | |
2023-09-24T17:38:57.459+08:00 INFO 48437 --- [al-message_cg_3] c.n.s.r.c.GeneralMessageDemoConsume : 接到RocketMQ消息,消息体:{"body":"消息具体内容,可以是自定义对象,最终都会序列化为字符串。如果是取消订单,这里应该是订单ID或者相关联的信息","keys":"7a60c853-08dc-46cd-a647-398d45b54966"} |
# 3、扩展框架 SpringCloud Stream
Spring Cloud Stream 是一个用于构建基于消息的微服务应用框架。它基于 SpringBoot 来创建具有生产级别的单机 Spring 应用,并且使用 Spring Integration
与 Broker 进行连接。
Spring Cloud Stream 提供了消息中间件配置的统一抽象,推出了 publish-subscribe、consumer groups、partition 这些统一的概念。
Spring Cloud Stream 内部有两个概念:Binder 和 Binding。
Binder
:跟外部消息中间件集成的组件,用来创建 Binding,各消息中间件都有自己的 Binder 实现。
比如 Kafka
的实现 KafkaMessageChannelBinder
, RabbitMQ
的实现 RabbitMessageChannelBinder
以及 RocketMQ
的实现 RocketMQMessageChannelBinder
。
Binding
:包括 Input Binding 和 Output Binding。
Binding 在消息中间件与应用程序提供的 Provider 和 Consumer 之间提供了一个桥梁,实现了开发者只需使用应用程序的 Provider 或 Consumer 生产或消费数据即可,屏蔽了开发者与底层消息中间件的接触。
下图是 Spring Cloud Stream 的架构设计。
SpringCloud Stream RocketMQ 不是咱们本次介绍的重点,所以只是抛砖引玉,大家需要了解详情参考:RocketMQ Example
# 部署架构
# 本地部署
# 单组节点单副本模式
这种方式风险较大,因为 Broker 只有一个节点,一旦 Broker 重启或者宕机时,会导致整个服务不可用。不建议线上环境使用,可以用于本地测试。
# 多组节点(集群)单副本模式
一个集群内全部部署 Master 角色,不部署 Slave 副本,例如 2 个 Master 或者 3 个 Master,这种模式的优缺点如下:
- 优点:配置简单,单个 Master 宕机或重启维护对应用无影响,在磁盘配置为 RAID10 时,即使机器宕机不可恢复情况下,由于 RAID10 磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高;
- 缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。
# 生产部署
# 多节点(集群)多副本模式:异步复制
每个 Master 配置一个 Slave,有多组 Master-Slave,HA 采用异步复制方式,主备有短暂消息延迟(毫秒级),这种模式的优缺点如下:
- 优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,同时 Master 宕机后,消费者仍然可以从 Slave 消费,而且此过程对应用透明,不需要人工干预,性能同多 Master 模式几乎一样;
- 缺点:Master 宕机,磁盘损坏情况下会丢失少量消息。
# 多节点(集群)多副本模式:同步双写
每个 Master 配置一个 Slave,有多对 Master-Slave,HA 采用同步双写方式,即只有主备都写成功,才向应用返回成功,这种模式的优缺点如下:
- 优点:数据与服务都无单点故障,Master 宕机情况下,消息无延迟,服务可用性与数据可用性都非常高;
- 缺点:性能比异步复制模式略低(大约低 10% 左右),发送单个消息的 RT 会略高,且目前版本在主节点宕机后,备机不能自动切换为主机。