Table of Contents
- Spring Boot Third-Party Integration in Practice: Redis, MyBatis, and Kafka
- 1. The Design Philosophy of Spring Boot Starters
- The Core Idea: Convention over Configuration
- Demystifying the Auto-Configuration Mechanism
- 2. Redis Integration in Practice
- Dependencies and Auto-Configuration
- Redis Configuration Class
- Redis Operations in Practice
- Redis Health Checks and Monitoring
- 3. MyBatis Integration in Practice
- MyBatis Dependencies
- Data-Layer Design
- Service Layer Implementation
- MyBatis Configuration Tuning
- 4. Kafka Integration in Practice
- Kafka Dependencies
- Kafka Configuration Class
- Message Producer Implementation
- Message Consumer Implementation
- A Mock Email Service
- Kafka Health Checks and Monitoring
- 5. Summary and Further Reading
- Integration Takeaways
- Performance Tuning Tips
Spring Boot Third-Party Integration in Practice: Redis, MyBatis, and Kafka
1. The Design Philosophy of Spring Boot Starters
The Core Idea: Convention over Configuration
Traditional integration vs. Spring Boot Starters:
| Aspect | Traditional Approach | Spring Boot Starter | Benefit |
|---|---|---|---|
| Dependency management | Versions maintained by hand; prone to conflicts | Starters aggregate dependencies with aligned versions | Far fewer version conflicts; safer upgrades |
| Configuration complexity | Large amounts of XML or Java configuration | Auto-configuration plus sensible defaults (convention over configuration) | Much less configuration code; faster development |
| Startup behavior | Configuration loaded by hand; slower startup | Conditional auto-configuration loads modules on demand | Faster startup; lower resource usage |
| Maintenance cost | Configuration scattered and inconsistent across projects | Standardized Starter configuration and dependency management | Easier maintenance; smoother team collaboration |
Demystifying the Auto-Configuration Mechanism
Spring Boot's auto-configuration flow:
graph TB
A[Application main class] --> B[@SpringBootApplication]
B --> C[@EnableAutoConfiguration]
C --> D[spring.factories]
D --> E[Auto-configuration classes]
E --> F[Conditional annotation checks]
F --> G[Bean creation]
G --> H[Integration complete]
style E fill:#bbdefb,stroke:#333
style F fill:#c8e6c9,stroke:#333
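How does Spring Boot find auto-configuration classes in the first place? Each starter jar ships a registration file. Up to Spring Boot 2.6 this is META-INF/spring.factories; from 2.7 onward the preferred file is META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports. A minimal sketch of both (the configuration class name here is illustrative, not from the original article):
# META-INF/spring.factories (Spring Boot 2.6 and earlier)
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.example.autoconfigure.RedisAutoConfiguration

# META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports (2.7+)
com.example.autoconfigure.RedisAutoConfiguration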
An example of how conditional annotations work:
@Configuration
@ConditionalOnClass(RedisTemplate.class) // active only when the class is on the classpath
@ConditionalOnProperty(prefix = "spring.redis", name = "enabled", matchIfMissing = true)
@EnableConfigurationProperties(RedisProperties.class) // bind external configuration
public class RedisAutoConfiguration {
    @Bean
    @ConditionalOnMissingBean // created only if the container has no such bean yet
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);
        return template;
    }
}
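Because of @ConditionalOnMissingBean, an application can replace the auto-configured bean simply by declaring its own: the user-defined bean wins and the auto-configuration backs off. A minimal sketch:
@Configuration
public class CustomRedisConfig {
    // This bean takes precedence; the auto-configured redisTemplate() is skipped.
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);
        template.setKeySerializer(new StringRedisSerializer()); // e.g. human-readable keys
        return template;
    }
}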
2. Redis Integration in Practice
Dependencies and Auto-Configuration
Maven dependencies:
<!-- pom.xml -->
<dependencies>
    <!-- Redis Starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <!-- Connection pool -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-pool2</artifactId>
    </dependency>
    <!-- JSON serialization -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
</dependencies>
application.yml configuration:
# application.yml
spring:
  redis:
    # Connection settings
    host: localhost
    port: 6379
    password: 123456
    database: 0
    timeout: 2000ms
    # Connection pool settings
    lettuce:
      pool:
        max-active: 20    # maximum number of connections
        max-idle: 10      # maximum idle connections
        min-idle: 5       # minimum idle connections
        max-wait: 1000ms  # maximum wait when acquiring a connection
    # Cluster settings (optional)
    # cluster:
    #   nodes:
    #     - 192.168.1.101:6379
    #     - 192.168.1.102:6379
Redis Configuration Class
A custom Redis configuration:
@Configuration
@Slf4j
public class RedisConfig {
    @Autowired
    private RedisConnectionFactory redisConnectionFactory;

    /**
     * Custom RedisTemplate that replaces the default (JDK) serialization.
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate() {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(redisConnectionFactory);
        // Use Jackson2JsonRedisSerializer instead of the default JDK serializer
        Jackson2JsonRedisSerializer<Object> serializer =
                new Jackson2JsonRedisSerializer<>(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        // LaissezFaireSubTypeValidator (com.fasterxml.jackson.databind.jsontype.impl)
        // permits the polymorphic type info Jackson embeds for NON_FINAL types
        objectMapper.activateDefaultTyping(
                LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL);
        serializer.setObjectMapper(objectMapper);
        // Serialization rules for keys and values
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(serializer);
        template.setHashKeySerializer(new StringRedisSerializer());
        template.setHashValueSerializer(serializer);
        template.afterPropertiesSet();
        log.info("RedisTemplate configured");
        return template;
    }

    /**
     * Cache manager configuration.
     */
    @Bean
    public RedisCacheManager cacheManager(RedisConnectionFactory factory) {
        RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
                .entryTtl(Duration.ofMinutes(30))  // default TTL of 30 minutes
                .disableCachingNullValues()        // do not cache null values
                .serializeKeysWith(RedisSerializationContext.SerializationPair
                        .fromSerializer(new StringRedisSerializer()))
                .serializeValuesWith(RedisSerializationContext.SerializationPair
                        .fromSerializer(new GenericJackson2JsonRedisSerializer()));
        return RedisCacheManager.builder(factory)
                .cacheDefaults(config)
                .build();
    }
}
Redis Operations in Practice
A basic data-operations service:
@Service
@Slf4j
public class RedisOperationService {
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * String operations
     */
    public void stringOperations() {
        // 1. Set values
        redisTemplate.opsForValue().set("user:1001:name", "Zhang San");
        redisTemplate.opsForValue().set("user:1001:age", 25, Duration.ofMinutes(30));
        // 2. Get values
        String name = (String) redisTemplate.opsForValue().get("user:1001:name");
        Integer age = (Integer) redisTemplate.opsForValue().get("user:1001:age");
        log.info("User - name: {}, age: {}", name, age);
        // 3. Atomic increment
        Long increment = redisTemplate.opsForValue().increment("counter", 1);
        log.info("Counter value: {}", increment);
    }

    /**
     * Hash operations
     */
    public void hashOperations() {
        String key = "user:1001:profile";
        // 1. Set hash fields
        Map<String, Object> userMap = new HashMap<>();
        userMap.put("name", "Li Si");
        userMap.put("age", 30);
        userMap.put("email", "lisi@example.com");
        redisTemplate.opsForHash().putAll(key, userMap);
        // 2. Get a single field
        String name = (String) redisTemplate.opsForHash().get(key, "name");
        log.info("Hash field name: {}", name);
        // 3. Get all fields
        Map<Object, Object> entries = redisTemplate.opsForHash().entries(key);
        log.info("Full profile: {}", entries);
    }

    /**
     * List operations
     */
    public void listOperations() {
        String key = "recent:users";
        // 1. Push from the left
        redisTemplate.opsForList().leftPush(key, "user1");
        redisTemplate.opsForList().leftPush(key, "user2");
        redisTemplate.opsForList().leftPush(key, "user3");
        // 2. Read a range
        List<Object> recentUsers = redisTemplate.opsForList().range(key, 0, 2);
        log.info("Recently active users: {}", recentUsers);
    }

    /**
     * Set operations
     */
    public void setOperations() {
        String key = "user:tags:1001";
        // 1. Add members
        redisTemplate.opsForSet().add(key, "vip", "active", "new");
        // 2. Membership test
        Boolean isVip = redisTemplate.opsForSet().isMember(key, "vip");
        log.info("Is VIP: {}", isVip);
        // 3. Read all members
        Set<Object> tags = redisTemplate.opsForSet().members(key);
        log.info("User tags: {}", tags);
    }
}
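Besides plain data operations, Redis is frequently used for simple distributed locks via SET ... NX with an expiry, which the summary table at the end of this article alludes to. A minimal sketch, not part of the original service (key names and TTLs are illustrative; the check-then-delete release below is not atomic, so production code should release via a Lua script or use Redisson):
@Service
public class RedisLockService {
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    /** Try to acquire a lock; the value identifies the owner so only the owner releases it. */
    public boolean tryLock(String lockKey, String ownerId, Duration ttl) {
        Boolean acquired = stringRedisTemplate.opsForValue()
                .setIfAbsent(lockKey, ownerId, ttl); // SET key value NX PX <ttl>
        return Boolean.TRUE.equals(acquired);
    }

    /** Best-effort release: delete only if we still own the lock. */
    public void unlock(String lockKey, String ownerId) {
        String current = stringRedisTemplate.opsForValue().get(lockKey);
        if (ownerId.equals(current)) {
            stringRedisTemplate.delete(lockKey);
        }
    }
}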
Cache annotations in practice:
@Service
@Slf4j
public class UserService {
    /**
     * Cache a user
     */
    @Cacheable(value = "users", key = "#userId", unless = "#result == null")
    public User getUserById(Long userId) {
        log.info("Loading user from database: {}", userId);
        // Simulated database lookup
        return findUserFromDB(userId);
    }

    /**
     * Update the cache
     */
    @CachePut(value = "users", key = "#user.id")
    public User updateUser(User user) {
        log.info("Updating user: {}", user.getId());
        // Simulated database update
        return updateUserInDB(user);
    }

    /**
     * Evict from the cache
     */
    @CacheEvict(value = "users", key = "#userId")
    public void deleteUser(Long userId) {
        log.info("Deleting user: {}", userId);
        // Simulated database delete
        deleteUserFromDB(userId);
    }

    /**
     * Composite caching behavior
     */
    @Caching(
        cacheable = {
            @Cacheable(value = "user_detail", key = "#userId")
        },
        put = {
            @CachePut(value = "user_profile", key = "#userId")
        }
    )
    public User getUserDetail(Long userId) {
        log.info("Loading user detail: {}", userId);
        return findUserDetailFromDB(userId);
    }

    // Simulated database operations
    private User findUserFromDB(Long userId) {
        // A real project would query the database here
        return new User(userId, "user" + userId, "user" + userId + "@example.com");
    }

    private User updateUserInDB(User user) {
        return user;
    }

    private void deleteUserFromDB(Long userId) {
        // delete operation
    }

    private User findUserDetailFromDB(Long userId) {
        return new User(userId, "detail-user" + userId, "detail" + userId + "@example.com");
    }

    @Data
    @AllArgsConstructor
    public static class User {
        private Long id;
        private String name;
        private String email;
    }
}
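One easy-to-miss prerequisite: @Cacheable, @CachePut, and @CacheEvict only take effect once Spring's annotation-driven caching is switched on, typically with @EnableCaching on a configuration class. A minimal sketch:
@SpringBootApplication
@EnableCaching // activates Spring's annotation-driven cache support
public class DemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}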
Redis Health Checks and Monitoring
A Redis health indicator:
@Component
public class RedisHealthIndicator implements HealthIndicator {
    @Autowired
    private RedisConnectionFactory redisConnectionFactory;

    @Override
    public Health health() {
        try {
            // Probe the Redis connection
            RedisConnection connection = redisConnectionFactory.getConnection();
            try {
                String result = connection.ping();
                if ("PONG".equals(result)) {
                    return Health.up()
                            .withDetail("server", "redis")
                            .withDetail("version", getRedisVersion(connection))
                            .withDetail("status", "connected")
                            .build();
                } else {
                    return Health.down()
                            .withDetail("error", "Unexpected ping response: " + result)
                            .build();
                }
            } finally {
                connection.close();
            }
        } catch (Exception e) {
            return Health.down(e)
                    .withDetail("error", "Redis connection failed: " + e.getMessage())
                    .build();
        }
    }

    private String getRedisVersion(RedisConnection connection) {
        try {
            Properties info = connection.info("server");
            return info.getProperty("redis_version");
        } catch (Exception e) {
            return "unknown";
        }
    }
}
3. MyBatis Integration in Practice
MyBatis Dependencies
Maven dependencies:
<!-- pom.xml -->
<dependencies>
    <!-- MyBatis Starter -->
    <dependency>
        <groupId>org.mybatis.spring.boot</groupId>
        <artifactId>mybatis-spring-boot-starter</artifactId>
        <version>2.2.2</version>
    </dependency>
    <!-- MySQL driver -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <scope>runtime</scope>
    </dependency>
    <!-- Connection pool (already a transitive dependency of the starter; listed for clarity) -->
    <dependency>
        <groupId>com.zaxxer</groupId>
        <artifactId>HikariCP</artifactId>
    </dependency>
</dependencies>
Database configuration:
# application.yml
spring:
  datasource:
    # Data source
    url: jdbc:mysql://localhost:3306/spring_boot_demo?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai
    username: root
    password: 123456
    driver-class-name: com.mysql.cj.jdbc.Driver
    # HikariCP connection pool
    hikari:
      maximum-pool-size: 20
      minimum-idle: 5
      connection-timeout: 30000
      idle-timeout: 300000
      max-lifetime: 1200000
# MyBatis configuration
mybatis:
  # Location of mapper XML files
  mapper-locations: classpath:mapper/*.xml
  # Package to scan for type aliases
  type-aliases-package: com.example.entity
  configuration:
    # Map snake_case columns to camelCase properties
    map-underscore-to-camel-case: true
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
# The global settings below are MyBatis-Plus options; they require
# mybatis-plus-boot-starter and live under the mybatis-plus prefix
mybatis-plus:
  global-config:
    db-config:
      id-type: auto
      logic-delete-field: deleted  # logical-delete column
      logic-delete-value: 1        # value meaning "deleted"
      logic-not-delete-value: 0    # value meaning "not deleted"
Data-Layer Design
Entity class:
@Data
@NoArgsConstructor
@AllArgsConstructor
@TableName("user") // MyBatis-Plus annotation; omit with plain MyBatis
public class User {
    @TableId(type = IdType.AUTO) // auto-increment primary key
    private Long id;
    private String name;
    private String email;
    private Integer age;
    @TableField(fill = FieldFill.INSERT) // filled automatically on insert
    private LocalDateTime createTime;
    @TableField(fill = FieldFill.INSERT_UPDATE) // filled on insert and update
    private LocalDateTime updateTime;
    @TableLogic // logical-delete flag
    private Integer deleted;
}
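For reference, a table definition matching this entity and the SQL used below might look like the following. This is a sketch inferred from the entity fields and the duplicate-email check in the service layer, not a schema from the original article:
CREATE TABLE user (
    id          BIGINT AUTO_INCREMENT PRIMARY KEY,
    name        VARCHAR(64)  NOT NULL,
    email       VARCHAR(128) NOT NULL,
    age         INT,
    create_time DATETIME     NOT NULL,
    update_time DATETIME     NOT NULL,
    deleted     TINYINT      NOT NULL DEFAULT 0,  -- logical-delete flag
    UNIQUE KEY uk_email (email)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;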
Mapper interface:
@Mapper
@Repository
public interface UserMapper {
    /**
     * Find a user by ID
     */
    @Select("SELECT * FROM user WHERE id = #{id} AND deleted = 0")
    User selectById(Long id);

    /**
     * List all users
     */
    @Select("SELECT * FROM user WHERE deleted = 0 ORDER BY create_time DESC")
    List<User> selectAll();

    /**
     * Insert a user
     */
    @Insert("INSERT INTO user(name, email, age, create_time, update_time) " +
            "VALUES(#{name}, #{email}, #{age}, NOW(), NOW())")
    @Options(useGeneratedKeys = true, keyProperty = "id")
    int insert(User user);

    /**
     * Update a user
     */
    @Update("UPDATE user SET name=#{name}, email=#{email}, age=#{age}, " +
            "update_time=NOW() WHERE id=#{id} AND deleted = 0")
    int update(User user);

    /**
     * Logically delete a user
     */
    @Update("UPDATE user SET deleted=1, update_time=NOW() WHERE id=#{id}")
    int deleteById(Long id);

    /**
     * Find a user by email
     */
    @Select("SELECT * FROM user WHERE email = #{email} AND deleted = 0")
    User selectByEmail(String email);

    /**
     * Page through users
     */
    @Select("SELECT * FROM user WHERE deleted = 0 ORDER BY create_time DESC " +
            "LIMIT #{offset}, #{pageSize}")
    List<User> selectByPage(@Param("offset") int offset,
                            @Param("pageSize") int pageSize);

    // The following methods are implemented in the XML mapper below
    List<User> selectByCondition(Map<String, Object> params);

    int batchInsert(List<User> users);

    int countUsers();
}
XML mapper:
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.example.mapper.UserMapper">
    <!-- Custom result mapping -->
    <resultMap id="UserResultMap" type="User">
        <id column="id" property="id" />
        <result column="name" property="name" />
        <result column="email" property="email" />
        <result column="age" property="age" />
        <result column="create_time" property="createTime" />
        <result column="update_time" property="updateTime" />
        <result column="deleted" property="deleted" />
    </resultMap>
    <!-- Dynamic query by condition; note that "<" must be escaped as &lt; inside XML -->
    <select id="selectByCondition" parameterType="map" resultMap="UserResultMap">
        SELECT * FROM user
        WHERE deleted = 0
        <if test="name != null and name != ''">
            AND name LIKE CONCAT('%', #{name}, '%')
        </if>
        <if test="minAge != null">
            AND age &gt;= #{minAge}
        </if>
        <if test="maxAge != null">
            AND age &lt;= #{maxAge}
        </if>
        <if test="email != null and email != ''">
            AND email = #{email}
        </if>
        ORDER BY create_time DESC
    </select>
    <!-- Batch insert -->
    <insert id="batchInsert" parameterType="list">
        INSERT INTO user (name, email, age, create_time, update_time)
        VALUES
        <foreach collection="list" item="user" separator=",">
            (#{user.name}, #{user.email}, #{user.age}, NOW(), NOW())
        </foreach>
    </insert>
    <!-- Count users -->
    <select id="countUsers" resultType="int">
        SELECT COUNT(*) FROM user WHERE deleted = 0
    </select>
</mapper>
Service Layer Implementation
Business service class:
@Service
@Slf4j
@Transactional
public class UserService {
    @Autowired
    private UserMapper userMapper;

    /**
     * Find a user by ID
     */
    public User getUserById(Long id) {
        log.info("Fetching user: {}", id);
        User user = userMapper.selectById(id);
        if (user == null) {
            throw new RuntimeException("User not found");
        }
        return user;
    }

    /**
     * List all users
     */
    public List<User> getAllUsers() {
        log.info("Fetching all users");
        return userMapper.selectAll();
    }

    /**
     * Create a user
     */
    public User createUser(User user) {
        log.info("Creating user: {}", user.getName());
        // Reject duplicate email addresses
        User existingUser = userMapper.selectByEmail(user.getEmail());
        if (existingUser != null) {
            throw new RuntimeException("Email already in use");
        }
        int result = userMapper.insert(user);
        if (result > 0) {
            return user;
        } else {
            throw new RuntimeException("Failed to create user");
        }
    }

    /**
     * Update a user
     */
    public User updateUser(User user) {
        log.info("Updating user: {}", user.getId());
        // Make sure the user exists
        User existingUser = userMapper.selectById(user.getId());
        if (existingUser == null) {
            throw new RuntimeException("User not found");
        }
        int result = userMapper.update(user);
        if (result > 0) {
            return getUserById(user.getId());
        } else {
            throw new RuntimeException("Failed to update user");
        }
    }

    /**
     * Delete a user (logical delete)
     */
    public void deleteUser(Long id) {
        log.info("Deleting user: {}", id);
        User existingUser = userMapper.selectById(id);
        if (existingUser == null) {
            throw new RuntimeException("User not found");
        }
        int result = userMapper.deleteById(id);
        if (result == 0) {
            throw new RuntimeException("Failed to delete user");
        }
    }

    /**
     * Page through users
     */
    public PageResult<User> getUsersByPage(int pageNum, int pageSize) {
        log.info("Paging users: page={}, size={}", pageNum, pageSize);
        int offset = (pageNum - 1) * pageSize;
        List<User> users = userMapper.selectByPage(offset, pageSize);
        int total = userMapper.countUsers();
        return new PageResult<>(users, total, pageNum, pageSize);
    }

    /**
     * Query users by condition
     */
    public List<User> getUsersByCondition(String name, Integer minAge, Integer maxAge, String email) {
        log.info("Querying users: name={}, minAge={}, maxAge={}, email={}",
                name, minAge, maxAge, email);
        Map<String, Object> params = new HashMap<>();
        params.put("name", name);
        params.put("minAge", minAge);
        params.put("maxAge", maxAge);
        params.put("email", email);
        return userMapper.selectByCondition(params);
    }

    /**
     * Paged-result wrapper
     */
    @Data
    @AllArgsConstructor
    public static class PageResult<T> {
        private List<T> data;
        private long total;
        private int pageNum;
        private int pageSize;
        private int totalPages;

        public PageResult(List<T> data, long total, int pageNum, int pageSize) {
            this.data = data;
            this.total = total;
            this.pageNum = pageNum;
            this.pageSize = pageSize;
            this.totalPages = (int) Math.ceil((double) total / pageSize);
        }
    }
}
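To exercise this service over HTTP, a thin controller is enough. A minimal sketch (not in the original article; the request paths are illustrative):
@RestController
@RequestMapping("/api/users")
public class UserController {
    @Autowired
    private UserService userService;

    @GetMapping("/{id}")
    public User get(@PathVariable Long id) {
        return userService.getUserById(id);
    }

    @PostMapping
    public User create(@RequestBody User user) {
        return userService.createUser(user);
    }

    @GetMapping
    public UserService.PageResult<User> page(@RequestParam(defaultValue = "1") int pageNum,
                                             @RequestParam(defaultValue = "10") int pageSize) {
        return userService.getUsersByPage(pageNum, pageSize);
    }
}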
MyBatis Configuration Tuning
MyBatis configuration class:
@Configuration
@MapperScan("com.example.mapper") // scan for Mapper interfaces
@Slf4j
public class MyBatisConfig {
    /**
     * Customize the global MyBatis Configuration
     */
    @Bean
    @ConfigurationProperties(prefix = "mybatis.configuration")
    public org.apache.ibatis.session.Configuration globalConfiguration() {
        org.apache.ibatis.session.Configuration configuration =
                new org.apache.ibatis.session.Configuration();
        configuration.setMapUnderscoreToCamelCase(true);
        configuration.setLogImpl(StdOutImpl.class);
        configuration.setCacheEnabled(true);
        return configuration;
    }

    /**
     * Pagination plugin (requires the mybatis-plus-boot-starter dependency)
     */
    @Bean
    public MybatisPlusInterceptor mybatisPlusInterceptor() {
        MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
        // Pagination
        PaginationInnerInterceptor paginationInterceptor = new PaginationInnerInterceptor();
        paginationInterceptor.setMaxLimit(1000L); // maximum rows per page
        paginationInterceptor.setOverflow(true);  // fall back to the first page past the last page
        interceptor.addInnerInterceptor(paginationInterceptor);
        // Optimistic locking (optional)
        OptimisticLockerInnerInterceptor optimisticLockerInterceptor =
                new OptimisticLockerInnerInterceptor();
        interceptor.addInnerInterceptor(optimisticLockerInterceptor);
        log.info("MyBatis interceptors configured");
        return interceptor;
    }

    /**
     * Meta-object handler (automatic field filling, a MyBatis-Plus feature)
     */
    @Bean
    public MetaObjectHandler metaObjectHandler() {
        return new MetaObjectHandler() {
            @Override
            public void insertFill(MetaObject metaObject) {
                this.strictInsertFill(metaObject, "createTime", LocalDateTime.class, LocalDateTime.now());
                this.strictInsertFill(metaObject, "updateTime", LocalDateTime.class, LocalDateTime.now());
            }

            @Override
            public void updateFill(MetaObject metaObject) {
                this.strictUpdateFill(metaObject, "updateTime", LocalDateTime.class, LocalDateTime.now());
            }
        };
    }
}
4. Kafka Integration in Practice
Kafka Dependencies
Maven dependencies:
<!-- pom.xml -->
<dependencies>
    <!-- Kafka Starter -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <!-- JSON support -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
</dependencies>
Kafka configuration:
# application.yml
spring:
  kafka:
    # Broker addresses
    bootstrap-servers: localhost:9092
    # Producer settings
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      acks: all                # wait for all in-sync replicas
      retries: 3               # retry count
      batch-size: 16384        # batch size in bytes
      buffer-memory: 33554432  # send buffer in bytes
      properties:
        linger.ms: 10          # small delay to allow batching
    # Consumer settings
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      group-id: spring-boot-demo-group  # consumer group
      auto-offset-reset: earliest       # offset reset policy
      enable-auto-commit: false         # commit offsets manually
      properties:
        spring.json.trusted.packages: "*"  # trust all packages for deserialization
    # Listener settings
    listener:
      ack-mode: manual  # manual offset commits
      concurrency: 3    # number of concurrent consumers
Kafka Configuration Class
Advanced Kafka configuration:
@Configuration
@Slf4j
@EnableKafka
public class KafkaConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    /**
     * Producer factory
     */
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        return new DefaultKafkaProducerFactory<>(props);
    }

    /**
     * Kafka template
     */
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        KafkaTemplate<String, Object> template = new KafkaTemplate<>(producerFactory());
        template.setProducerListener(new ProducerListener<String, Object>() {
            @Override
            public void onSuccess(ProducerRecord<String, Object> record, RecordMetadata metadata) {
                log.info("Message sent: topic={}, partition={}, offset={}",
                        record.topic(), metadata.partition(), metadata.offset());
            }

            @Override
            public void onError(ProducerRecord<String, Object> record,
                                RecordMetadata metadata, Exception exception) {
                log.error("Message failed: topic={}, key={}", record.topic(), record.key(), exception);
            }
        });
        return template;
    }

    /**
     * Consumer factory
     */
    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "spring-boot-demo-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        return new DefaultKafkaConsumerFactory<>(props);
    }

    /**
     * Listener container factory
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3); // number of concurrent consumers
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        // Error handling; setErrorHandler was deprecated in spring-kafka 2.8 and removed
        // in 3.x in favor of setCommonErrorHandler (see the error-handler sketch after
        // the consumer implementation below)
        factory.setErrorHandler((thrownException, data) -> {
            log.error("Failed to consume message: {}", data, thrownException);
        });
        return factory;
    }
}
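The topics used below can be created at application startup by declaring NewTopic beans, which Spring's auto-configured KafkaAdmin picks up. A minimal sketch (partition and replica counts are illustrative):
@Configuration
public class KafkaTopicConfig {
    // KafkaAdmin creates these topics on startup if they do not already exist
    @Bean
    public NewTopic userRegistrationTopic() {
        return TopicBuilder.name("user-registration")
                .partitions(3)
                .replicas(1)
                .build();
    }

    @Bean
    public NewTopic orderCreationTopic() {
        return TopicBuilder.name("order-creation")
                .partitions(3)
                .replicas(1)
                .build();
    }
}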
Message Producer Implementation
A message-producing service:
@Service
@Slf4j
public class KafkaProducerService {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * Publish a user-registration event
     */
    public void sendUserRegistration(User user) {
        String topic = "user-registration";
        String key = "user_" + user.getId();
        UserRegistrationEvent event = new UserRegistrationEvent(
                user.getId(), user.getName(), user.getEmail(), LocalDateTime.now());
        // Send the message (send() returns a CompletableFuture in spring-kafka 3.x;
        // 2.x returns a ListenableFuture with addCallback instead)
        CompletableFuture<SendResult<String, Object>> future =
                kafkaTemplate.send(topic, key, event);
        // Handle the result asynchronously
        future.whenComplete((result, exception) -> {
            if (exception != null) {
                log.error("Failed to publish registration event: userId={}", user.getId(), exception);
            } else {
                log.info("Registration event published: userId={}, offset={}",
                        user.getId(), result.getRecordMetadata().offset());
            }
        });
    }

    /**
     * Publish an order-creation event
     */
    public void sendOrderCreation(Order order) {
        String topic = "order-creation";
        String key = "order_" + order.getId();
        // Ordering guarantee: using the order ID as the key routes all messages
        // for the same order to the same partition
        kafkaTemplate.send(topic, key, order)
                .whenComplete((result, exception) -> {
                    if (exception != null) {
                        log.error("Failed to publish order event: orderId={}", order.getId(), exception);
                    } else {
                        log.info("Order event published: orderId={}", order.getId());
                    }
                });
    }

    /**
     * Send a message inside a Kafka transaction
     * (requires a transactional producer; see the configuration note after this class)
     */
    @Transactional
    public void sendTransactionalMessage(String topic, String key, Object message) {
        kafkaTemplate.executeInTransaction(operations -> {
            operations.send(topic, key, message);
            // Database operations could go here so that they commit or roll back
            // together with the message
            return null;
        });
    }

    /**
     * User-registration event
     */
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class UserRegistrationEvent {
        private Long userId;
        private String username;
        private String email;
        private LocalDateTime registerTime;
    }

    /**
     * Order payload
     */
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        private Long id;
        private Long userId;
        private BigDecimal amount;
        private LocalDateTime createTime;
    }
}
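A caveat on the transactional send above: executeInTransaction() only works when the producer is configured for transactions, which in Spring Boot means setting a transaction ID prefix. A minimal sketch (the prefix value is illustrative):
spring:
  kafka:
    producer:
      # enables the transactional producer; executeInTransaction() throws without it
      transaction-id-prefix: tx-demo-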
Message Consumer Implementation
A message-consuming service:
@Service
@Slf4j
public class KafkaConsumerService {
    @Autowired
    private EmailService emailService;
    @Autowired
    private UserService userService;

    /**
     * Consume user-registration events
     */
    @KafkaListener(topics = "user-registration", groupId = "user-service")
    public void consumeUserRegistration(
            @Payload KafkaProducerService.UserRegistrationEvent event,
            @Header(KafkaHeaders.RECEIVED_KEY) String key,
            @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
            Acknowledgment ack) {
        log.info("Registration event received: key={}, partition={}, event={}", key, partition, event);
        try {
            // 1. Send a welcome email
            emailService.sendWelcomeEmail(event.getEmail(), event.getUsername());
            // 2. Initialize the user's reward points (initUserPoints is assumed to
            //    exist on UserService; it is not shown in this article)
            userService.initUserPoints(event.getUserId());
            // 3. Log completion
            log.info("Registration processed: userId={}", event.getUserId());
            // 4. Commit the offset manually
            ack.acknowledge();
        } catch (Exception e) {
            log.error("Failed to process registration: userId={}", event.getUserId(), e);
            // Decide per business rules whether to retry
        }
    }

    /**
     * Consume order-creation events
     */
    @KafkaListener(topics = "order-creation", groupId = "order-service")
    public void consumeOrderCreation(
            @Payload KafkaProducerService.Order order,
            @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp,
            Acknowledgment ack) {
        log.info("Order event received: orderId={}, amount={}, time={}",
                order.getId(), order.getAmount(),
                Instant.ofEpochMilli(timestamp).atZone(ZoneId.systemDefault()).toLocalDateTime());
        try {
            // 1. Inventory check
            checkInventory(order);
            // 2. Risk-control check
            riskControlCheck(order);
            // 3. Send order confirmation
            sendOrderConfirmation(order);
            log.info("Order processed: orderId={}", order.getId());
            ack.acknowledge();
        } catch (Exception e) {
            log.error("Failed to process order: orderId={}", order.getId(), e);
            // Choose a strategy based on the exception type
            if (e instanceof InventoryException) { // a custom business exception, assumed to exist
                // Out of stock: hand off for manual handling
                handleInventoryShortage(order, e);
            } else {
                // Anything else: rethrow so the container can retry
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Batch consumption (requires a batch-enabled container factory,
     * e.g. spring.kafka.listener.type: batch)
     */
    @KafkaListener(topics = "batch-processing", groupId = "batch-service")
    public void consumeBatchMessages(
            List<ConsumerRecord<String, Object>> records,
            Acknowledgment ack) {
        log.info("Batch received, size: {}", records.size());
        try {
            for (ConsumerRecord<String, Object> record : records) {
                processSingleRecord(record);
            }
            // Commit offsets for the whole batch at once
            ack.acknowledge();
        } catch (Exception e) {
            log.error("Batch processing failed", e);
            // Retry logic can be added here as business rules require
        }
    }

    // Simulated business methods
    private void checkInventory(KafkaProducerService.Order order) {
        log.info("Checking inventory: orderId={}", order.getId());
    }

    private void riskControlCheck(KafkaProducerService.Order order) {
        log.info("Risk-control check: orderId={}", order.getId());
    }

    private void sendOrderConfirmation(KafkaProducerService.Order order) {
        log.info("Sending order confirmation: orderId={}", order.getId());
    }

    private void handleInventoryShortage(KafkaProducerService.Order order, Exception e) {
        log.warn("Handling inventory shortage: orderId={}", order.getId());
    }

    private void processSingleRecord(ConsumerRecord<String, Object> record) {
        log.info("Processing record: key={}, value={}", record.key(), record.value());
    }
}
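For records that keep failing, spring-kafka 2.8+ offers DefaultErrorHandler combined with DeadLetterPublishingRecoverer: the record is retried a few times and then published to a "<topic>.DLT" dead-letter topic. A minimal sketch of wiring it in (retry interval and count are illustrative):
@Bean
public DefaultErrorHandler kafkaErrorHandler(KafkaTemplate<String, Object> template) {
    // After the retries are exhausted, publish the failed record to "<topic>.DLT"
    DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
    // Retry up to 3 more times, 1 second apart
    return new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 3));
}
// Then, in the container factory: factory.setCommonErrorHandler(kafkaErrorHandler);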
A Mock Email Service
Email service implementation:
@Service
@Slf4j
public class EmailService {
    /**
     * Send a welcome email
     */
    public void sendWelcomeEmail(String email, String username) {
        log.info("Sending welcome email: to={}, username={}", email, username);
        // Simulate sending
        try {
            Thread.sleep(100); // simulated network latency
            log.info("Welcome email sent: {}", email);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Email sending interrupted", e);
        }
    }

    /**
     * Send an order-confirmation email
     */
    public void sendOrderConfirmation(String email, String orderInfo) {
        log.info("Sending order confirmation: to={}, order={}", email, orderInfo);
        // Simulate sending
        try {
            Thread.sleep(150);
            log.info("Order confirmation sent: {}", email);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Email sending interrupted", e);
        }
    }
}
Kafka Health Checks and Monitoring
A Kafka health indicator:
@Component
public class KafkaHealthIndicator implements HealthIndicator {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    @Override
    public Health health() {
        try {
            // Probe the Kafka connection with a test message
            // (send() returns a future of SendResult, not of RecordMetadata)
            Future<SendResult<String, Object>> future =
                    kafkaTemplate.send("health-check", "test-key", "test-value");
            // Bound the wait
            try {
                RecordMetadata metadata = future.get(5, TimeUnit.SECONDS).getRecordMetadata();
                return Health.up()
                        .withDetail("cluster", "kafka")
                        .withDetail("topic", metadata.topic())
                        .withDetail("partition", metadata.partition())
                        .withDetail("status", "connected")
                        .build();
            } catch (TimeoutException e) {
                return Health.down()
                        .withDetail("error", "Kafka connection timeout")
                        .build();
            }
        } catch (Exception e) {
            return Health.down(e)
                    .withDetail("error", "Kafka health check failed: " + e.getMessage())
                    .build();
        }
    }
}
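Sending probe messages pollutes a topic on every health poll. A less intrusive alternative checks cluster metadata through the Kafka AdminClient; a minimal sketch under that assumption (the 5-second timeout is illustrative):
@Component
public class KafkaClusterHealthIndicator implements HealthIndicator {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Override
    public Health health() {
        Map<String, Object> conf =
                Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        try (AdminClient admin = AdminClient.create(conf)) {
            // describeCluster() touches only metadata; no messages are produced
            int nodeCount = admin.describeCluster().nodes().get(5, TimeUnit.SECONDS).size();
            return Health.up().withDetail("nodes", nodeCount).build();
        } catch (Exception e) {
            return Health.down(e).build();
        }
    }
}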
5. Summary and Further Reading
Integration Takeaways
The three integrations at a glance:
| Component | Core Starter | Key Configuration | Main Use Cases | Performance Levers |
|---|---|---|---|---|
| Redis | spring-boot-starter-data-redis | Connection pool, serialization (JDK/JSON), timeouts | Caching, distributed locks, session storage | Lettuce with pooled connections; JSON serialization (GenericJackson2JsonRedisSerializer); pipelining/batch operations for throughput |
| MyBatis | mybatis-spring-boot-starter | Data source, mapper scanning, SqlSession management | Persistence, ORM mapping | Second-level cache with a sensible refresh policy; pagination plugins (PageHelper / MyBatis-Plus) to avoid full scans; SQL and index tuning |
| Kafka | spring-kafka | Producer/consumer parameters, topic configuration | Asynchronous messaging, event-driven architecture | Batched sends (batch.size, linger.ms); concurrent consumers; tuned acks, retries, and idempotence for reliability |
Performance Tuning Tips
Redis tuning:
# Production Redis tuning
spring:
  redis:
    timeout: 1000ms
    lettuce:
      pool:
        max-active: 50
        max-idle: 20
        min-idle: 10
MyBatis tuning:
// Enable the second-level cache
@CacheNamespace
public interface UserMapper {
    // mapper methods
}

// Use batch operations
@Insert("<script>INSERT INTO user (...) VALUES " +
        "<foreach collection='list' item='item' separator=','>(...)</foreach></script>")
void batchInsert(List<User> users);
Kafka tuning:
spring:
  kafka:
    producer:
      batch-size: 32768         # larger batches
      compression-type: snappy  # enable compression
      properties:
        linger.ms: 20           # allow slightly more latency for batching
    consumer:
      fetch-max-wait: 500       # maximum fetch wait
      fetch-min-size: 1024      # minimum fetch size in bytes