目录
- 背景
- 数据库设计
- 代码实现
- 总结
背景
假设我们有一个在线学习平台,管理员需要定期生成销售报告,包括课程销售情况和用户购买情况等重要数据。然而,由于数据量较大,生成报告可能需要较长时间,并且可能会占用大量系统资源,从而影响用户的使用体验。为了解决这个问题,我们考虑采用异步导出的方案。
异步导出的工作原理是将导出操作放在一个异步任务中执行,而不是立即在用户发起导出请求后执行导出操作。这样一来,用户无需等待导出任务完成,就可以继续进行其他操作,而系统则在后台完成导出任务。
这种方案有以下优点:
- 提高系统响应速度: 用户发起导出请求后,系统可以立即响应而不必等待导出任务完成,从而提高了系统的响应速度。
- 改善用户体验: 用户无需等待导出任务完成,可以继续使用系统进行其他操作,这有助于提升用户体验。
- 降低系统负载: 将耗时的导出操作放在异步任务中执行,可以避免阻塞系统资源,从而降低系统的负载,确保其他用户js的操作不受影响。
异步导出在许多需要处理大量数据或耗时操作的场景中都非常有用,可以有效提升系统的性能和用户体验。
数据库设计
首先我们需要设计一个保存导出任务的表,需要记录流转状态、操作人、任务参数,后续任务的创建、导出完成/失败都需要操作这张表
CREATE TABLE `t_export_task` ( `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键', `task_id` varchar(50) NOT NULL COMMENT '任务id', `task_type` tinyint(4) NOT NULL COMMENT '任务类型', `task_param` varchar(1000) NOT NULL COMMENT '任务参数', `status` tinyint(3) NOT NULL DEFAULT 0 COMMENT '状态 0-处理中 1-成功 -1失败', `file_url` varchar(500) DEFAULT NULL COMMENT '文件url', `remark` varchar(200) DEFAULT NULL COMMENT '备注', `create_user_id` int(11) NOT NULL COMMENT '操作人id', `create_user_name` varchar(50) NOT NULL COMMENT '操作人名称', `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', PRIMARY KEY (`id`) ) ENGINE=InnoDB COMMENT='导出任务记录';
代码实现
导出工具类 :
负责提交导出任务、取消任务以及上传导出文件到OSS服务器等功能。
- 导出任务线程池: 通过
ExecutorService
线程池来执行导出任务,并确保线程池的单例化,防止重复创建,提高性能。 - 导出类型对应的任务: 使用
Map<Integer, CompletableFuture<String>>
来保存正在运行的导出任务,以便后续取消任务或跟踪任务状态。 - 提交导出任务: 提交导出任务时,先初始化线程池,然后使用
CompletableFuture.supplyAsync()
方法执行异步任务,并在异步任务中生成导出文件,然后上传到OSS服务器,最后返回导出文件的URL。 - 取消任务: 取消任务时,从保存的任务映射中获取对应的
CompletableFuture
实例,并调用cancel()
方法取消任务。 - 上传文件到OSS服务器:
uploadToOSS()
方法负责实际的文件上传逻辑,将导出的文件上传到OSS服务器,并返回文件的URL
@Slf4j @Component @RequiredArgsConstructor(onConstructor_ = @Autowired) public class ExportTaskUtil { /** * 导出任务线程池 */ private static volatile ExecutorService executorService; /** * 导出类型对应的任务 */ private final Map<Integer, CompletableFuture<String>> runningTasks =javascript Maps.newConcurrentMap(); private final ExportTaskHandlerFactory exportTaskHandlerFactory; private final FileUploadService fileUploadService; /** * @description 提交导出任务 * @author youmu * @date 2024/1/26 17:58 * @param exportTask 导出任务 */ public CompletableFuture<String> submit(ExportTask exportTask) { // 初始化线程池 initThreadPool(); CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{ File exportFile = null; // 获取handler ExportTaskHandler handler = exportTaskHandlerFactory.getHandler(exportTask.getTaskType()); // 生成文件 try { exportFile = handler.generateExportFile(exportTask.getTaskParam()); if (exportFile == null) { throw new BizException(CodeEnum.NOT_FOUND, "导出文件为空"); } // 上传文件到OSS服务器,获取文件URL return uploadToOSS(exportFile); } catch (BizException e) { throw e; } catch (Exception e) { throw new RuntimeException(e); } finally { if (exportFile != null) { FileUtil.del(exportFile); } } },exejscutorService); runningTasks.put(exportTask.getTaskType(), future); return future; } private static void initThreadPool() { if (executorService == null) { synchronized (ExportTaskUtil.class) { if (executorService == null) { executorService = ThreadUtil.newFixedExecutor(4, "asyncExport", false);; } } } } /** * @description 取消任务 * @author youmu * @date 2024/1/26 19:04 * @param exportTask 任务 */ public void cancel(ExportTask exportTask) { CompletableFuture<String> future = runningTasks.get(exportTask.getTaskType()); if (future != null && !future.isDone()) { future.cancel(true); } } /** * @description 上传文件到OSS服务器 * @author youmu * @date 2024/1/29 16:56 */ private String uploadToOSS(File exportFile) { // 实现文件上传逻辑,返回文件URL return fileUploadService.uploadFileBySize(exportFile,"export/" + exportFile.getName()); } }
导出任务处理的工厂类以及相关的接口和枚举定义
导出任务采用来工厂+策略的设计模式,工厂模式将对象的创建逻辑封装到工厂类中,策略模式将不同的行为封装到不同的策略类中,使得代码具有良好的可扩展性、灵活性和可维护性。
- ExportTaskHandlerFactory: 这是一个工厂类,用于根据导出任务类型获取对应的任务处理器。在初始化时,它会将所有实现了
ExportTaskHandler
接口的处理器注入进来,并根据任务类型建立起映射关系。 - ExportTaskHandler 接口: 这是一个导出任务处理器的接口,定义了生成导出文件和获取导出任务类型的方法,具体的导出任务处理器需要实现该接口。
- ExportTaskTypeEnum 枚举: 这是一个枚举类型,定义了导出任务的类型,包括了任务类型的代码和描述信息。
- OrderExportHandler 类: 这是一个具体的导出任务处理器的实现类,用于处理订单导出任务。它实现了
ExportTaskHandler
接口,根据具体业务逻辑生成导出文件,并提供了获取任务类型的方法。
/** * @description ExportTaskHandler 工厂类 * @author youmu * @date 2024/1/26 18:04 */ @Slf4j @Component public class ExportTaskHandlerFactory { private final Map<Integer, ExportTaskHandler> handlerMap = Maps.newHashMap(); @Autowired public ExportTaskHandlerFactory(List<ExportTaskHandler> handlers) { for (ExportTaskHandler taskHandler : handlers) { handlerMap.put(taskHandler.getExportType().getCode(), taskHandler); } } public ExportTaskHandler getHandler(Integer exportType) { return handlerMap.get(exportType); } } /** * @description ExportTaskHandler * @author youmu * @date 2024/1/26 18:06 */ public interface ExportTaskHandler { File generateExportFile编程客栈(String param) throws Exception; ExportTaskTypeEnum getExportType(); } /** * @description 导出任务类型 * @author youmu * @date 2024/1/29 11:01 */ @AllArgsConstructor @Getter public enum ExportTaskTypeEnum implements IEnum<Integer, String> { CROWD_PACKAGE(1, "人群包"), ; private final Integer code; private final String message; } public class OrderExportHandler implements ExportTaskHandler{ @Override public File generateExportFile(String param) throws Exception { return null; } @Override public ExportTaskTypeEnum getExportType() { return null; } }
业务调用
导出任务的门面类 ExportTaskFacade
,它提供了一系列方法来提交、取消、重试导出任务,并提供了查询导出任务的分页接口。
- 提交任务(submitTask): 提交导出任务时,根据是否传入
taskId
参数来判断是新建任务还是更新任务。如果是新建任务,则创建一个新的ExportTask
实例并保存到数据库中,然后调用DOSubmit
方法提交任务;如果是更新任务,则更新任务的状态为正在处理,并调用doSubmit
方法提交任务。 - 任务提交处理(doSubmit): 使用
exportTaskUtil.submit(exportTask)
提交异步导出任务,并定义了任务完成后的处理逻辑。如果任务执行成功,则更新任务状态为成功,并设置文件的URL;如果任务执行失败,则记录失败日志,并更新任务状态为失败,同时记录异常信息。 - 取消任务(cancelTask): 根据传入的
taskId
获取对应的导出任务,然后调用exportTaskUtil.cancel(exportTask)
取消任务。 - 重试任务(retryTask): 根据传入的
taskId
获取对应的导出任务,先取消任务以防止异常情况,然后重新提交任务。
@Slf4j @Component @RequiredArgsConstructor(onConstructor_ = @Autowired) public class ExportTaskFacade { private final ExportTaskService exportTaskService; private final UserService userService; private final ExportTaskUtil exportTaskUtil; public void submitTask(Integer exportType, String param) { submitTask(null,exportType,param); } public void cancelTask(Long taskId) { ExportTask exportTask = exportTaskService.getById(taskId); AssertUtils.notNull(exportTask, new BizException(CodeEnum.NOT_FOUND,"导出任务不存在")); exportTaskUtil.cancel(exportTask); } public void retryTask(Long taskId) { ExportTask exportTask = exportTaskService.getById(taskId); AssertUtils.notNull(exportTask, new BizException(CodeEnum.NOT_FOUND,"导出任务不存在")); // 取消任务,防止异常情况还在执行 exportTaskUtil.cancel(exportTask); // 提交任务 submitTask(taskId,exportTask.getTaskType(),exportTask.getTaskParam()); } private void submitTask(Long taskId, Integer exportType, String param) { ExportTask exportTask; if(taskId == null) { // 保存导出任务 exportTask = new ExportTask(); Integer userId = AuthInfoHolder.getUserId(); exportTask.setTaskId(CodeGenUtil.genCode(GenCodeTypeEnum.DL)); exportTask.setCreateUserId(userId); exportTask.setCreateUserName(userService.findById(userId).getUserName()); exportTask.setTaskType(exportType); exportTask.setTaskParam(param); exportTaskService.save(exportTask); } else { // 更新导出任务 exportTask = exportTaskService.getById(taskId); exportTaskService.lambdaUpdate() .eq(ExportTask::getId, exportTask.getId()) .set(ExportTask::getStatus, ExportStatusEnum.PROCESSING.getCode()) .update(); } doSubmit(exportTask); } private void doSubmit(ExportTask exportTask) { exportTaskUtil.submit(exportTask).thenAccept(url->{ exportTaskService.lambdaUpdate() .eq(ExportTask::getId, exportTask.getId()) .set(ExportTask::getStatus, ExportStatusEnum.SUCCESS.getCode()) .update(); }).exceptionally(ex->{ log.error("[导出任务]执行失败,{}", exportTask.getTaskId(),ex); exportTaskService.lambdaUpdate() .eq(ExportTask::getId, exportTask.getId()) .set(ExportTask::getStatus, ExportStatusEnum.FAILURE.getCode()) .set(ExportTask::getRemark, ex instanceof BizException ? ex.getMessage() : "未知异常") .update(); return null; }); } public Page<ExportTaskVO> findPage(ExportTaskRequest request) { Page<ExportTask> page = exportTaskService.findPage(request); List<ExportTaskVO> voList = ConverterUtil.toVO(ExportTaskConverter.class, page.getRecords()); Page<ExportTaskVO> pageVO = new Page<>(); pageVO.setTotal(page.getTotal()); pageVO.setSize(page.getSize()); pageVO.setCurrent(page.getCurrent()); pageVO.setPages(page.getPages()); pageVO.setRecords(voList); return pageVO; } }
流程图
总结
过以上实践,我们成功实现了一个轻量级的异步导出方案,具有以下优点:
- 使用线程池管理异步任务,确保了任务的并发执行和资源的合理利用。
- 采用 CompletableFuture 实现异步导出和回调更新,简化了异步任务的编写和管理。
- 使用工厂模式和策略模式实现导出任务处理器,使得系统具有良好的可扩展性和灵活性。
然而,这种方案也存在一些缺点:
- 资源管理不足: 如果异步导出任务的并发量过大,而线程池的资源配置不足,则可能导致任务排队等待执行,影响任务的实时性和响应速度。
- 任务执行效率低下: 如果导出任务的处理时间过长,且线程池的工作线程数量有限,则可能导致任务执行效率低下,无法及时完成任务,影响系统的整体性能。
- 可靠性不高,无法保证任务一定会被执行或执行成功,特别是在系统故障或异常情况下。
针对这些缺点,可以考虑以下优化方案:
- 合理调整线程池配置: 根据系统的实际负载情况和性能需求,合理配置线程池的大小和工作线程数量,确保资源的有效利用和任务的及时执行。
- 优化任务处理逻辑: 对任务的处理逻辑进行优化,尽量减少任务的执行时间和资源消耗,提高任务的执行效率和响应速度。
- 引入异步消息处理机制: 使用消息队列或事件驱动模型来实现任务的异步处理,进一步解耦任务提交和任务执行过程,提高系统的可扩展性和灵活性。
- 引入定时任务调度器: 使用定时任务调度器(如 xxl-job)来定期扫描和重试执行异常任务。当任务执行时间超过一定阈值(如2小时)或者任务执行异常时,自动触发重试机制,保证任务的及时执行。
- 增加任务监控和告警机制: 实时监控任务的执行情况,当发现任务执行异常或超时时编程客栈,及时发送告警通知,以便运维人员及时处理和修复。
通过以上优化方案,可以提高异步导出方案的可靠性和稳定性,确保任务能够及时执行并完成,同时降低了系统的维护成本和风险。
以上为个人经验,希望能给大家一个参考,也希望大家多多支持编程客栈(www.devze.com)。
精彩评论