SpringBoot3.4.0整合消息中间件Kafka和RabbitMQ
SpringBoot3.4.0整合消息中间件Kafka和RabbitMQ

上一篇文章基于Docker完成了SpringBoot3.4.0和Apache Pulsar4.0.1整合并实现了消息的发送和订阅,今天基于SpringBoot实现整合Kafka和RabbitMQ并实现发送订阅消息功能。Docker Desktop 4.36.0的安装流程见基于Docker-SpringBoot3.4.0集成Apache Pulsar4.0.1实现消息发布和订阅
完整代码在文章最后,如果觉得本篇文章对你有用,记得点赞、关注、收藏哦。你的支持是我持续更新的动力!
SpringBoot3专栏软件环境
- JDK17.0.12
- SpringBoot3.4.0
- Kafka3.9.0
- RabbitMQ4.0
- Docker Desktop4.36.0 for Windows
- IDEA2024.2.0.2
- Win10专业版
我们先看本篇文章对应的项目结构,请看下图

1 基于Docker安装RabbitMQ
1.1 安装和启动RabbitMQ
# latest RabbitMQ 4.0.x
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4.0-management


2 RabbitMQ整合Springboot3.4.0
2.1 pom.xml依赖配置
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>cn.itbeien</groupId>
<artifactId>springboot3-labs-master</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
<artifactId>springboot-rabbitmq</artifactId>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
</project>
2.2 代码实现
2.2.1 配置信息
# application.properties
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
2.2.2 生产者代码
package cn.itbeien.rabbitmq.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;
/**
* @author itbeien
* 项目网站:https://www.itbeien.cn
* 公众号:贝恩聊架构
* 全网同名,欢迎小伙伴们关注
* Copyright© 2024 itbeien
*/
@Service
@Slf4j
public class ProducerService {
private final AmqpAdmin amqpAdmin;
private final AmqpTemplate amqpTemplate;
@Bean
public Queue itbeienQueue() {
return new Queue("itbeienQueue");
}
public ProducerService(AmqpAdmin amqpAdmin ,AmqpTemplate amqpTemplate) {
this.amqpAdmin = amqpAdmin;
this.amqpTemplate = amqpTemplate;
}
public void sendMsg(){
Message message = new Message("hello itbeien".getBytes());
this.amqpTemplate.send("itbeienQueue",message);
log.info("生产者发布消息...");
}
}
2.2.3 消费者代码
package cn.itbeien.rabbitmq.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
/**
* @author itbeien
* 项目网站:https://www.itbeien.cn
* 公众号:贝恩聊架构
* 全网同名,欢迎小伙伴们关注
* Copyright© 2024 itbeien
*/
@Service
@Slf4j
public class ConsumerService {
@RabbitHandler
public void process(@Payload String foo) {
log.info(foo);
}
@RabbitListener(queues = "itbeienQueue")
public void processMessage(String content) {
log.info("消费者消费消息:{}",content);
}
}
2.3 单元测试
2.3.1 单元测试代码
package cn.itbeien.rabbitmq.test;
import cn.itbeien.rabbitmq.service.ProducerService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
/**
* @author itbeien
* 项目网站:https://www.itbeien.cn
* 公众号:贝恩聊架构
* 全网同名,欢迎小伙伴们关注
* Copyright© 2024 itbeien
*/
@SpringBootTest
public class RabbitMQApp {
@Autowired
private ProducerService producerService;
@Test
public void test() throws Exception{
producerService.sendMsg();
Thread.sleep(10000);
}
}
2.3.2 单元测试结果
结合RabbitMQ完成消息的生产和消费,请看下图

3 基于Docker安装 Kafka
3.1 安装和启动Kafka
# 获取docker镜像
docker pull apache/kafka:3.9.0
# 启动运行kafka
docker run -p 9092:9092 apache/kafka:3.9.0


4 Kafka整合SpringBoot3.4.0
4.1 pom.xml依赖配置
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>cn.itbeien</groupId>
<artifactId>springboot3-labs-master</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
<artifactId>springboot-kafka</artifactId>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>
</project>
4.2 代码实现
4.2.1 配置信息
#application.properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=itbeienGroup
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=cn.itbeien.kafka.vo
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
4.2.2 生产者代码
package cn.itbeien.kafka.service;
import cn.itbeien.kafka.vo.SampleMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
/**
* @author itbeien
* 项目网站:https://www.itbeien.cn
* 公众号:贝恩聊架构
* 全网同名,欢迎小伙伴们关注
* Copyright© 2024 itbeien
*/
@Service
@Slf4j
public class ProducerService {
private final KafkaTemplate<Object, SampleMessage> kafkaTemplate;
ProducerService(KafkaTemplate<Object, SampleMessage> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void send(SampleMessage message) {
this.kafkaTemplate.send("itbeienTopic", message);
log.info("Sent sample message [" + message + "]");
}
}
4.2.3 消费者代码
package cn.itbeien.kafka.service;
import cn.itbeien.kafka.vo.SampleMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* @author itbeien
* 项目网站:https://www.itbeien.cn
* 公众号:贝恩聊架构
* 全网同名,欢迎小伙伴们关注
* Copyright© 2024 itbeien
*/
@Service
@Slf4j
public class ConsumerService {
private final List<SampleMessage> messages = new CopyOnWriteArrayList<>();
@KafkaListener(topics = "itbeienTopic")
void processMessage(SampleMessage message) {
this.messages.add(message);
log.info("Received sample message [" + message + "]");
}
public List<SampleMessage> getMessages() {
return this.messages;
}
}
4.3 单元测试
4.3.1 单元测试代码
package cn.itbeien.kafka.test;
import cn.itbeien.kafka.service.ProducerService;
import cn.itbeien.kafka.vo.SampleMessage;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
/**
* @author itbeien
* 项目网站:https://www.itbeien.cn
* 公众号:贝恩聊架构
* 全网同名,欢迎小伙伴们关注
* Copyright© 2024 itbeien
*/
@SpringBootTest
public class KafkaApp {
@Autowired
private ProducerService producerService;
@Test
public void test() throws Exception{
SampleMessage sampleMessage = new SampleMessage(1,"hello itbeien");
producerService.send(sampleMessage);
Thread.sleep(5000);
}
}
4.3.2 单元测试结果
结合kafka完成消息的生产和消费,请看下图

以上就是 SpringBoot整合RabbitMQ、Kafka并基于Docker完成消息生产和消息消费的整体流程,完整代码在文章最后获取!
欢迎大家关注我的项目实战内容itbeien.cn,一起学习一起进步,在项目和业务中理解各种技术。

欢迎沟通交流技术和支付业务,一起探讨聚合支付/预付卡系统业务、技术、系统架构、微服务、容器化。并结合聚合支付系统深入技术框架/微服务原理及分布式事务原理。加入我的知识星球吧

SpringBoot3专栏
01SpringBoot3专栏-SpringBoot3.4.0整合Mybatis-plus和Mybatis
02SpringBoot3.4.0结合Mybatis-plus实现动态数据源
03mapstruct对象映射在Springboot3中这样用就对了
04RocketMQ5.3.1集成SpringBoot3.4.0就这样简单
05SpringBoot3.4.0整合Redisson实现分布式锁
06MySQL增量数据同步利器Canal1.1.7环境搭建流程
07SpringBoot3.4.0集成Canal1.1.7实现MySQL实时同步数据到Redis
08基于Docker-SpringBoot3.4.0集成Apache Pulsar4.0.1实现消息发布和订阅
跟着我学微服务系列
01跟着我学微服务,什么是微服务?微服务有哪些主流解决方案?
05SpringCloudAlibaba之图文搞懂微服务核心组件在企业级支付系统中的应用
06JDK17+SpringBoot3.4.0+Netty4.1.115搭建企业级支付系统POS网关
07JDK17+SpringCloud2023.0.3搭建企业级支付系统-预付卡支付交易微服务
08JDK17+Dubbo3.3.2搭建企业级支付系统-预付卡支付交易微服务
09JDK17+SpringBoot3.3.6+Netty4.1.115实现企业级支付系统POS网关签到功能
贝恩聊架构-项目实战地址
5 源码地址
贝恩聊架构-SpringBoot3专栏系列文章、资料和源代码会同步到以下地址,代码和资料每周都会同步更新
该仓库地址主要用于贝恩聊架构-SpringBoot3专栏、基于企业级支付系统,学习微服务整体技术栈
