消息中间件 在微服务架构中,服务之间的解耦和高效通信是关键,而消息中间件在其中扮演了不可或缺的角色。它允许不同服务通过异步消息的方式进行通信,既减少了服务间的直接依赖,也提高了系统的容错性与扩展性。 于此,本文将简单介绍两种常见的消息中间件技术——Kafka 和 RocketMQ,并分析它们在微服务架构中的使用场景与实现方式。 消息中间件是一种用于在分布式系统中传递消息的软件。它作为消息的中转站,确保消息能够在发送者和接收者之间高效且可靠地传递。 常见的消息中间件技术包括kafka、rabbitmq、rocketmq等。其在微服务架构中的主要应用包括了服务之间的异步通信、事件驱动架构中的事件流处理、提供消息的持久化存储,以防数据丢失、解耦生产者与消费者,使得它们不需要同时在线。
消息中间件
架构
优点
缺点
典型应用场景
Kafka
分布式流处理平台
- 高吞吐量,适合大数据场景 - 支持持久化,数据可靠性高 - 横向扩展性强 - 良好的日志管理功能
- 不适合低延迟场景 - 依赖 Zookeeper 进行集群管理 - 消息处理机制较为复杂
- 实时日志收集 - 数据流处理 - 大数据分析和监控系统
RabbitMQ
基于 AMQP 协议的消息代理
- 支持复杂的路由功能 - 提供消息确认机制,消息可靠性强 - 支持丰富的协议(如 AMQP、MQTT) - 低延迟,适合高并发场景
- 吞吐量相对较低 - 难以处理大规模消息流量
- 在线订票、秒杀等实时系统 - 金融交易系统 - 消息队列优先级和延时任务
RocketMQ
分布式、主从架构
- 支持分布式事务消息 - 高吞吐量,延迟低 - 支持顺序消息和延时消息 - 扩展性强
- 社区和生态相对较小 - 功能相对较少,缺乏一些高级功能
- 金融支付系统 - 秒杀、高并发订单系统 - 分布式事务处理
ActiveMQ
基于 JMS(Java 消息服务)
- 易于使用,兼容 JMS 标准 - 支持多种协议(如 STOMP、AMQP) - 丰富的消息传递模式(点对点、发布订阅等)
- 性能和扩展性不如 Kafka、RocketMQ - 延迟较高,不适合大规模并发
- 传统企业消息系统 - 轻量级的消息传递和集成 - 小型应用和系统集成
kafka 什么是kafka Kafka 是由 LinkedIn 开发并贡献给 Apache 基金会的分布式流处理平台。最初 Kafka 被设计为一种高吞吐量的消息队列,但它现在已经成为一个功能强大的流数据平台,能够处理实时数据流的发布和订阅,以及持久化存储。 Kafka 的架构由以下几部分组成:
组件
说明
Broker
消息的存储和转发节点,负责接收、存储和转发消息。Kafka 集群由多个 Broker 组成。
Producer
消息生产者,向 Broker 发送消息。每个消息都属于一个 Topic。
Consumer
消息消费者,从 Broker 中读取消息。消费者可以订阅一个或多个 Topic。
Zookeeper
用于管理 Kafka 集群的元数据和协调 Broker 的状态,Kafka 通过 Zookeeper 进行集群管理。
Topic
消息的分类,每个 Topic 下有多个分区(Partition)。
Partition
每个 Topic 可以有多个 Partition,消息在 Partition 中按照顺序存储,Partition 可以分布在不同的 Broker 上。
Consumer Group
消费者分组机制,Consumer Group 中的消费者共享消费任务,每条消息仅被其中一个消费者消费。
Kafka实践 docker安装kafka 对于较新版的kafka来说,可以考虑直接使用KRaft而非zookeeper。 于此采用docker compose,首先需根据实际情况创建一个docker-compose.yaml文件,例如:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 services: kafka: image: 'bitnami/kafka:latest' user: root ports: - '9092:9092' - '9093:9093' environment: - KAFKA_ENABLE_KRAFT =yes - KAFKA_CFG_PROCESS_ROLES =broker,controller - KAFKA_CFG_CONTROLLER_LISTENER_NAMES =CONTROLLER - KAFKA_CFG_LISTENERS =PLAINTEXT://:9092,CONTROLLER://:9093 - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP =CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT - KAFKA_CFG_ADVERTISED_LISTENERS =PLAINTEXT://192.168.1.186:9092 - KAFKA_CFG_NODE_ID =1 - KAFKA_BROKER_ID =1 - KAFKA_KRAFT_CLUSTER_ID =LelM2dIFQkiUFvXCEcqRWA - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS =1@192.168.1.186:9093 - ALLOW_PLAINTEXT_LISTENER =yes volumes: - /data/deploy/kafkaCluster/kraft:/bitnami/kafka:rw network_mode: bridge
然后执行docker-compose up即可启动kafka服务,启动后可以通过docker compose logs kafka查看日志。(对于不同的docker compose版本是有些许去别的,有些需要指定name。) 值得注意的是,上面的是基于kraft部署多节点的kafka集群的,以下介绍一种简单的kafka部署。参考https://blog.csdn.net/m0_51390969/article/details/140156693 首先是选择一组兼容性较好的kafka和zookeeper版本。
1 2 docker pull bitnami/kafka:3 .6 .1 docker pull bitnami/zookeeper:3 .8 .2
而后创建一个kafka网络,以确保kafka和zookeeper在同一个docker网络中运行,能够相互通信:
1 docker network create kafka
而后运行zookeeper:
1 docker run -d --name zookeeper --network kafka -e ALLOW_ANONYMOUS_LOGIN =yes bitnami/zookeeper:3.8.2
在此基础上运行kafka:
1 docker run -d --name kafka --network kafka -e KAFKA_BROKER_ID =1 -e KAFKA_ZOOKEEPER_CONNECT =zookeeper:2181 -e ALLOW_PLAINTEXT_LISTENER =yes -e KAFKA_ADVERTISED_LISTENERS =PLAINTEXT://localhost:9092 -p 9092:9092 bitnami/kafka:3.6.1
检查安装情况的方式可以参考上文提到的链接:
1 2 3 4 5 6 7 8 9 10 11 12 docker logs zookeeper docker logs kafka docker exec -it kafka /bin/bash kafka-topics.sh --list --bootstrap-server kafka:9092 kafka-topics.sh --create --topic test-topic --partitions 1 --replication-factor 1 --bootstrap-server kafka:9092 kafka-topics.sh --list --bootstrap-server kafka:9092 kafka-topics.sh --delete --topic test-topic --bootstrap-server kafka:9092
另外,还有个需要注意的东西,就是kafka的外部访问 比如kafka部署在192.168.1.1上,那么对于192.168.1.2上的服务,很容易发现会有问题,无法正常使用kafka。 要让kafka进行外网访问,需要修改kafka的配置文件server.properties。以docker部署kafka+zookeeper为例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 # 进入kafka的容器,将配置文件拷贝出来 docker cp kafka:/opt/bitnami/kafka/config/server .properties server .properties# 修改配置文件 vim server .properties# 修改如下内容 # 这样Kafka就会在所有网络接口上监听9092端口 listeners=PLAINTEXT:# 修改Kafka的advertised.listeners,将localhost改为宿主机的公共IP地址,这样Kafka就会将消息发送到宿主机的IP地址 advertised.listeners=PLAINTEXT:# 将修改后的配置文件拷贝回去 docker cp server .properties kafka:/opt/bitnami/kafka/config/server .properties# 或者直接映射文件也可以 # 最后重启kafka让配置文件失效 docker restart kafka
kafka的使用 在安装好kafka的基础上,于此介绍一下基于springboot对kafka的使用。 首先,依旧是先配置好合适的依赖:
1 2 3 4 <dependency > <groupId > org.springframework.kafka</groupId > <artifactId > spring-kafka</artifactId > </dependency >
在此基础上,添加对应的kafka配置:
1 spring.kafka.bootstrap-servers =localhost:9092
随后编写生产者:
1 2 3 4 5 6 7 8 9 10 11 @Service public class KafkaProducer { @Resource private KafkaTemplate <String ,String > kafkaTemplate; public String sendKafkaTest (String topic, String msg ) { kafkaTemplate.send (topic, msg); return "send topic: " + topic + "and msg is: " + msg; } }
以一个简单的接口调用测试:
1 2 3 4 5 6 7 8 9 10 11 12 13 @RestController public class KafkaController { @Resource private KafkaProducer kafkaProducer; @PostMapping ("/test" ) public String pushMsg (@RequestParam ("topic" ) String topic, @RequestParam ("message" ) String message) { return kafkaProducer .sendKafkaTest (topic, message); } }
随后创建一个生产者,对产生的内容进行消费:
1 2 3 4 5 6 7 @Component public class KafkaConsumer { @KafkaListener (topics = {"PushTopic" ,"test" },groupId = "123" ) public void consume (String message ){ System .out .println ("接收到消息:" +message); } }
通过postman进行简单的测试:
1 localhost:8080 /test ?topic=PushTopic&message ="test message"
如此,便实现了一个简单的由springboot整合的kafka。
RocketMq 什么是RocketMq RocketMQ 是由阿里巴巴开源的分布式消息中间件,起初是为电商场景设计的高可用、高可靠的消息队列系统。它特别适用于高并发、大数据量的场景,提供了更低的消息延迟和丰富的消息调度能力。 RocketMQ的架构由以下几个部分组成:
组件
说明
Broker
消息的存储和转发节点,分为 Master 和 Slave。Master 负责写入和读操作,Slave 负责数据备份。
Producer
消息生产者,向 Broker 发送消息。一个 Producer 可以将消息发送到多个 Topic。
Consumer
消息消费者,从 Broker 中读取消息。Consumer 可以订阅多个 Topic,并且可以分组以实现负载均衡。
NameServer
提供服务发现功能,消费者和生产者通过 NameServer 查找 Broker 地址。
Topic
消息的分类,每个 Topic 下可以有多个消息队列(Queue)。
Message Queue
每个 Topic 可以有多个消息队列,消息在队列中按照顺序存储,消费时可以保证顺序性。
Consumer Group
消费者分组机制,一个 Consumer Group 中的消费者共同消费 Group 下的消息,每条消息只被消费一次。
使用 RocketMQ 的优势主要在于其对于事务一致性的支持,但其并发量稍逊于百万吞吐的kafka,因此,Kafka 更适合大规模数据流处理,RocketMQ 则在低延迟和事务处理场景中表现出色。根据具体业务需求,选择合适的消息中间件,能够显著提升系统的可扩展性与稳定性。
RocketMQ实践 docker安装rocketMQ 对于rocketmq来说,部署需要nameserver和broker。采用docker部署是比较简单得方式,参考https://rocketmq.apache.org/zh/docs/quickStart/02quickstartWithDocker/ 首先拉取rocketmq镜像,以5.3.0为例:
1 docker pull apache/rocketmq:5 .3 .0
在此基础上创建容器共享网络,方便多个服务之间得相互通信。
1 docker network create rocketmq
而后启动NameServer:
1 2 3 4 docker run -d --name rmqnamesrv -p 9876 :9876 --network rocketmq apache/rocketmq:5 .3 .0 sh mqnamesrvdocker logs -f rmqnamesrv
1 2 3 4 OpenJDK 64 -Bit Server VM warning : Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release OpenJDK 64 -Bit Server VM warning : UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release . The Name Server boot success. serializeType=JSON , address 0.0 .0 .0 :9876 context canceled
NameServer 成功启动后,我们启动 Broker 和 Proxy.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 echo "brokerIP1=127.0.0.1" > broker.conf docker run -d \ --name rmqbroker \ --network rocketmq \ -p 10912 :10912 -p 10911 :10911 -p 10909 :10909 \ -p 8080 :8080 -p 8081 :8081 \ -e "NAMESRV_ADDR=rmqnamesrv:9876" \ -v ./broker.conf:/home/rocketmq/rocketmq-5.3 .0 /conf/broker.conf \ apache/rocketmq:5.3 .0 sh mqbroker --enable-proxy \ -c /home/rocketmq/rocketmq-5.3 .0 /conf/broker.conf docker exec -it rmqbroker bash -c "tail -n 10 /home/rocketmq/logs/rocketmqlogs/proxy.log"
1 2 3 4 5 6 7 8 9 2024 -09 -17 20 :12 :42 INFO main - ServiceProvider loaded no AccessValidator, using default org.apache.rocketmq.acl.plain.PlainAccessValidator2024 -09 -17 20 :12 :43 INFO main - grpc server has built. port: 8081 , tlsKeyPath: 1 , tlsCertPath: 32 , threadPool: 136314880 , queueCapacity: {}, boosLoop: {}, workerLoop: {}, maxInboundMessageSize: {}2024 -09 -17 20 :12 :43 INFO main - Server is running in TLS permissive mode2024 -09 -17 20 :12 :43 INFO main - Using OpenSSL provider2024 -09 -17 20 :12 :43 INFO main - SSLContext created for server2024 -09 -17 20 :12 :43 INFO main - The broker[9 cb3449f88cb, 127.0.0.1:10911 ] boot success. serializeType=JSON and name server is rmqnamesrv:9876 2024 -09 -17 20 :12 :43 INFO main - user specified name server address: rmqnamesrv:9876 2024 -09 -17 20 :12 :43 INFO main - grpc server start successfully.2024 -09 -17 20 :12 :43 INFO main - Tue Sep 17 12 :12 :43 UTC 2024 rocketmq-proxy startup successfully
于此,rocketmq就部署完成了。
RocketMQ的使用 以java为例,于此提供一个基于rocketmq消息中间件进行收发的示例: 具体内容可参见:https://github.com/gagaducko/learning_demos/tree/main/rocketmq-demo 首先,要引入rocketmq的依赖:
1 2 3 4 5 <dependency > <groupId > org.apache.rocketmq</groupId > <artifactId > rocketmq-spring-boot-starter</artifactId > <version > 2.2.2</version > </dependency >
而后在properties中设置rocketmq的相关设置:
1 2 3 4 5 6 7 8 9 rocketmq.consumer.pull-batch-size =10 rocketmq.name-server =192.168 .186.1 :9876 rocketmq.producer.group =rocketmq-msp_producer_grouprocketmq.producer.sendMessageTimeout =10000 rocketmq.producer.retryTimesWhenSendFailed =2 rocketmq.producer.retryTimesWhenSendAsyncFailed =2 rocketmq.producer.maxMessageSize =4096 rocketmq.producer.compressMessageBodyThreshold =4096 rocketmq.producer.retryNextServer =false
需要注意的是,在spring boot 3+,当使用RocketMQTemplate时,需要配置RocketMQTemplate bean,不配置会出现问题。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Configuration public class RocketMqConfig { @Value("${rocketmq.producer.group} " ) private String producerGroup; @Value("${rocketmq.name-server} " ) private String nameServer; @Bean("RocketMqTemplate" ) public RocketMQTemplate rocketMqTemplate(){ RocketMQTemplate rocketMqTemplate = new RocketMQTemplate(); DefaultMQProducer defaultMqProducer = new DefaultMQProducer(); defaultMqProducer.setProducerGroup(producerGroup); defaultMqProducer.setNamesrvAddr(nameServer); rocketMqTemplate.setProducer(defaultMqProducer); return rocketMqTemplate; } }
在此基础上,现在实现一个生产者。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 @Component public class RocketmqProducer { @Value ("${rocketmq.producer.group}" ) private String producerGroup; @Value ("${rocketmq.name-server}" ) private String nameServer; private DefaultMQProducer producer; @PostConstruct public void defaultMQProducer ( ) { producer= new DefaultMQProducer (producerGroup); producer.setNamesrvAddr (nameServer); producer.setVipChannelEnabled (false ); try { producer.start (); } catch (MQClientException e) { e.printStackTrace (); } } public String send (String topic, String tags, String body) throws InterruptedException , RemotingException , MQClientException , MQBrokerException , UnsupportedEncodingException { Message message = new Message (topic, tags, body.getBytes (RemotingHelper .DEFAULT_CHARSET )); StopWatch stop = new StopWatch (); stop.start (); SendResult result = producer.send (message); System .out .println ("发送响应:MsgId:" + result.getMsgId () + ",发送状态:" + result.getSendStatus ()); stop.stop (); return "{\"MsgId\":\"" +result.getMsgId ()+"\"}" ; } }
而后实现一个消费者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 @Component @Slf4j public class RocketmqConsumer implements CommandLineRunner { @Value ("${rocketmq.name-server}" ) private String nameServer; @Value ("${rocketmq.producer.group}" ) private String defaultProducerGroup; public void messageListener ( ){ DefaultMQPushConsumer consumer=new DefaultMQPushConsumer (defaultProducerGroup); consumer.setNamesrvAddr (nameServer); try { consumer.subscribe ("PushTopic" , "push" ); consumer.setConsumeFromWhere (ConsumeFromWhere .CONSUME_FROM_FIRST_OFFSET ); consumer.setConsumeMessageBatchMaxSize (1 ); consumer.registerMessageListener ((MessageListenerConcurrently ) (msgs, context) -> { for (Message msg :msgs){ System .out .println ("消费者接收到了消息:" + new String (msg.getBody ())); } return ConsumeConcurrentlyStatus .CONSUME_SUCCESS ; }); consumer.start (); } catch (Exception e) { e.printStackTrace (); } } @Override public void run (String ... args) throws Exception { this .messageListener (); } }
于此,以PushTopic为topic,push为tag作为一个例子的消息,做一个用于测试的接口:
1 2 3 4 5 6 7 8 9 10 11 @PostMapping ("/push" )public String pushMsg (@RequestParam ("topic" ) String topic, @RequestParam ("tag" ) String tag, @RequestParam ("message" ) String message ) { try { return rocketmqProducer.send (topic, tag, message); } catch (InterruptedException | RemotingException | MQClientException | MQBrokerException | UnsupportedEncodingException e) { e.printStackTrace (); return "ERROR" ; } }
项目启动后,调用接口localhost:8080/push?topic=PushTopic&tag=push&message=”test message6” 生产者产生消息并被消息者消费如下。 如此,便是一个简单的基于rocketmq的示例。