Spring Boot 提供了强大的异步处理能力,可以帮助开发者轻松实现非阻塞式编程,提高系统吞吐量和响应速度。
以下是实现异步处理的完整方案:
一、基于注解的异步方法
1. 启用异步支持
@Configuration @EnableAsync // 核心注解,启用异步处理 public class AsyncConfig { // 可选:自定义线程池 @Bean(name = "taskExecutor") public Executor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(5); // 核心线程数 executor.setMaxPoolSize(10); // 最大线程数 executor.setQueueCapacity(100); // 队列容量 executor.setThreadNamePrefix("Async-"); // 线程名前android缀 executor.setRpythonejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor; } }
2. 标记异步方法
@Service public class EmailService { // 使用默认线程池 @Async public CompletableFuture<String> sendEmailAsync(String to, String content) { // 模拟耗时操作 Thread.sleep(1000); System.out.println("邮件发送至: " + to + ",线程: " + Thread.currentThread().getName()); return CompletableFuture.completedFuture("success"); } // 使用指定线程池 @Async("taskExecutor") public void processInBackgjavascriptround(Order order) { // 后台处理逻辑 System.out.println("处理订单: " + order.getId()); } }
3. 调用异步方法
@RestController @RequestMapping("/api") public class NotificationController { @Autowired private EmailService emailService; @GetMapping("/send") public ResponseEntity<String> sendNotification() { // 非阻塞调用 CompletableFuture<String> future = emailService.sendEmailAsync("user@example.com", "Hello"); // 可以继续执行其他逻辑 System.out.println("请求已接收,邮件发送中..."); return ResponseEntity.ok("请求已处理"); } }
二、响应式异步处理
1. 添加WebFlux支持
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webflux</artifactId> </dependency>
2. 响应式控制器
@RestController @RequestMapping("/reactive") public class ReactiveController { @GetMapping("/data") public Mono<String> getData() { return Mono.fromCallable(() -> { // 模拟耗时操作 Thread.sleep(500); r编程eturn "响应式数据"; }).subscribeOn(Schedulers.boundedElastic()); // 指定异步调度器 } }
三、事件驱动的异步处理
1. 定义异步事件
public class OrderProcessedEvent extends ApplicationEvent { private final Order order; public OrderProcessedEvent(Object source, Order order) { super(source); this.order = order; } public Order getOrder() { return order; } }
2. 异步事件监听
@Component public class OrderEventListener { @Async @EventListener public void handleOrderEvent(OrderProcessedEvent event) { System.out.println("异步处理订单事件: " + event.getOrder().getId()); // 执行耗时操作 } }
3. 发布事件
@Service public class OrderService { private final ApplicationEventPublisher eventPublisher; public OrderServichttp://www.devze.come(ApplicationEventPublisher eventPublisher) { this.eventPublisher = eventPublisher; } public void processOrder(Order order) { // 业务逻辑... eventPublisher.publishEvent(new OrderProcessedEvent(this, order)); } }
四、异步RestTemplate
1. 配置AsyncRestTemplate
@Bean public AsyncRestTemplate asyncRestTemplate() { return new AsyncRestTemplate( new ConcurrentTaskExecutor(Executors.newFixedThreadPool(10)) ); }
2. 使用示例
@Service public class ApiService { @Autowired private AsyncRestTemplate asyncRestTemplate; public ListenableFuture<ResponseEntity<String>> fetchDataAsync(String url) { return asyncRestTemplate.getForEntity(url, String.class); } }
五、Spring WebClient (非阻塞HTTP客户端)
1. 配置WebClient
@Bean public WebClient webClient() { return WebClient.builder() .baseUrl("https://api.example.com") .clientConnector(new ReactorClientHttpConnector( HttpClient.create().followRedirect(true) )) .build(); }
2. 使用示例
@Service public class ApiConsumerService { @Autowired private WebClient webClient; public Mono<User> getUserAsync(String userId) { return webClient.get() .uri("/users/{id}", userId) .retrieve() .bodyToMono(User.class); } }
六、事务边界处理
1. 异步事务处理
@Transactional @Async public CompletableFuture<Void> processWithTransaction(Order order) { // 业务逻辑 orderRepository.save(order); return CompletableFuture.completedFuture(null); }
2. 事务事件监听
@Async @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) public void handleAfterCommit(OrderEvent event) { // 事务提交后异步执行 System.out.println("事务提交后处理: " + event.getOrderId()); }
七、异步异常处理
1. 自定义异常处理器
@Configuration public class AsyncExceptionConfig implements AsyncConfigurer { @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return (ex, method, params) -> { System.err.println("异步方法异常: " + method.getName()); ex.printStackTrace(); // 可以发送通知或记录日志 }; } }
2. 异常处理示例
@Async public CompletableFuture<String> riskyOperation() { if (Math.random() > 0.5) { throw new RuntimeException("随机错误"); } return CompletableFuture.completedFuture("成功"); } // 调用处处理异常 riskyOperation().exceptionally(ex -> { System.out.println("处理异常: " + ex.getMessage()); return "fallback"; });
八、性能监控与指标
1. 监控异步任务
@Async @Timed(value = "async.email", description = "邮件发送耗时") public void sendEmailWithMetrics(String to) { // 发送邮件逻辑 }
2. 线程池指标
@Bean @ExportMeterRegistry public ExecutorServiceMetrics executorServiceMetrics(MeterRegistry registry) { return new ExecutorServiceMetrics( Executors.newFixedThreadPool(5), "async.executor", Collections.emptyList() ); }
九、最佳实践
线程池配置:
- 根据业务类型隔离线程池(IO密集型 vs CPU密集型)
- 合理设置队列容量和拒绝策略
资源清理:
@PreDestroy public void cleanup() { ((ThreadPoolTaskExecutor) executor).shutdown(); }
上下文传递:
@Bean public Executor contextAwareExecutor() { return new DelegatingSecurityContextExecutorService( Executors.newFixedThreadPool(5) ); }
调试技巧:
- 设置线程名前缀便于日志追踪
- 使用MDC传递请求上下文
避免陷阱:
- 异步方法不能自调用
- 避免在异步方法中使用ThreadLocal
- 注意事务边界问题
十、完整示例
电商订单处理系统
@Slf4j @Service public class OrderProcessingService { @Async("orderTaskExecutor") @Transactional public CompletableFuture<OrderResult> processOrderAsync(Order order) { log.info("开始处理订单 {}", order.getId()); // 1. 扣减库存 inventoryService.reduceStock(order); // 2. 生成支付单 Payment payment = paymentService.createPayment(order); // 3. 异步通知 notificationService.sendOrderConfirmation(order); // 4. 记录审计日志 auditService.logOrderProcessed(order); return CompletableFuture.completedFuture( new OrderResult(order.getId(), "SUCCESS") ); } } @Configuration @EnableAsync public class AsyncConfig { @Bean(name = "orderTaskExecutor") public Executor orderTaskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10); executor.setMaxPoolSize(20); executor.setQueueCapacity(100); executor.setThreadNamePrefix("Order-Processor-"); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor; } @Bean(name = "notificationTaskExecutor") public Executor notificationTaskExecutor() { // 单独线程池处理通知 return new ThreadPoolTaskExecutor(); } }
Spring Boot的异步处理能力为构建高性能应用提供了强大支持,合理使用可以显著提升系统吞吐量。开发者应根据具体业务场景选择合适的异步模式,并注意线程安全、资源管理和异常处理等问题。
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持编程客栈(www.devze.com)。
精彩评论