开发者

Spring动态管理定时任务之ThreadPoolTaskScheduler解读

开发者 https://www.devze.com 2022-12-29 10:30 出处:网络 作者: DayDayUp丶
目录Spring动态管理定时任务ThreadPoolTaskScheduler实现思路ThreadPoolTaskScheduler 定时任务实现总结Spring动态管理定时任务ThreadPoolTaskScheduler
目录
  • Spring动态管理定时任务ThreadPoolTaskScheduler
    • 实现思路
  • ThreadPoolTaskScheduler 定时任务实现
    • 总结

      Spring动态管理定时任务ThreadPoolTaskScheduler

      Spring任务调度核心类ThreadPoolTaskScheduler,API文档解释如下:

      Implementation of Spring's TaskScheduler interface, wrapping a native Java.util.concurrent.ScheduledThreadPoolExecutor.

      Spring的TaskScheduler接口的实现,包装了一个本地java.util.concurrent.ScheduledThreadPoolExecutor。

      实现思路

      注入调度类bean,初始化一个ConcurrentHashMap容器,用来保存多个定时任务的状态,每一个任务的运行状态被封装在ScheduledFuture中,借此类可取消对应的定时任务。

      import java.time.LocalDateTime;
      import java.util.Map;
      import java.util.Set;
      import java.util.concurrent.ConcurrentHashMap;
      import java.util.concurrent.ScheduledFuture;
       
      import javax.annotation.Resource;
       
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.context.annotation.Bean;
      import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
      import org.springframework.scheduling.support.CronTrigger;
      import org.springframework.web.bind.annotation.PathVariable;
      import org.springframework.web.bind.annotation.RequestMapping;
      import org.springframework.web.bind.annotation.RestController;
       
      import com.cjia.spidercommon.model.SpiderJob;
      import com.cjia.spiderjob.mapper.SpiderJobMapper;
       
      import lombok.Data;
      import lombok.extern.slf4j.Slf4j;
       
      /**
       * 用来管理(启动、停止、新增、删除、更新编辑、查看运行状态)定时任务(增量任务)
       */
      @Slf4j
      @RestController
      @RequestMapping("spiderJob/cron")
      public class CronJobController extends SpiderJobController {
       
          @Autowired
          private ThreadPoolTaskScheduler threadPoolTaskSchedphpuler;
       
          private Map<Integer, ScheduledFuture<?>> futureMap = new ConcurrentHashMap<>();
       
          @Resource
          private SpiderJobMapper spiderJobMapper;
       
          @Bean
          public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
              return new ThreadPoolTaskScheduler();
          }
       
          /**
           * 启动单个定时任务
           */
          @RequestMapping("/start/{jobId}")
          public String start(@PathVariable Integer jobId) {
              SpiderJob job = spiderJobMapper.selectById(jobId);
              if (job == null) {
                  log.warn("任务[{}]已不存在,无法启动!", jobId);
                  return "任务[" + jobId + "]已不存在,无法启动!";
              }
              int enable = job.getEnable();
              if (enable == 0) {
                  log.warn("任务[{}]已被禁用,无法启动!", jobId);
                  return "任务[" + jobId + "]已被禁用,无法启动!";
              }
              // 检测该任务是否已在运行调度中
              if (futureMap.get(jobId) != null) {
                  log.warn("任务[{}]已在调度运行,无法重复启动!", jobId);
                  return "任务[" + jobId + "]已在调度运行,无法重复启动!";
              }
              String cron = job.getCron();
              // TODO check cron
              ScheduledFuture<?> future = threadPoolTaskScheduler.schedule(new MyRunnable(job), new CronTrigger(cron));
              log.info("任务[{}]已被启动!", jobId);
              futureMap.put(jobId, future);
              return "任务[" + jobId + "]已被启动!";
          }
       
          /**
           * 批量启动定时任务
           */
          @RequestMapping("/startBATch/{jobIds}")
          public String startBatch(@PathVariable String jobIds) {
              // TODO jobIds valid
              String[] jobIdsArr = jobIds.split(",");
              StringBuffer sb = new StringBuffer();
              for (String jobId : jobIdsArr) {
                  String result = start(Integer.valueOf(jobId));
                  sb.append(result).append("<br>");
              }
              return sb.toString();
          }
       
          /**
           * 停止单个定时任务
           */
          @RequestMapping("/stop/{jobId}")
          public String stop(@PathVariable Integer jobId) {
              // 检测该任务是否已在运行调度中
              ScheduledFuture<?> future = futureMap.get(jobId);
              if (future == null) {
                  log.warn("任务[{}]已不在调度中,无法停止!", jobId);
                  return "任务[" + jobId + "]已不在调度中,无法停止!";
              } else {
                  future.cancel(true);
                  futureMap.remove(jobId);
                  log.info("任务[{}]已被停止!", jobId);
                  return "任务[" + jobId + "]已被停止!";
              }
          }
       
          /**
           * 批量停止定时任务
           */
          @RequestMapping("/stopBatch/{jobIds}")
          public String stopBatch(@PathVariable String jobIds) {
              // TODO jobIds valid
              String[] jobIdsArr = jobIds.split(",");
              StringBuffer sb = new StringBuffer();
              for (String jobId : jobIdsArr) {
                  String result = stop(Integer.valueOf(jobId));
                  sb.append(result).append("<br>");
              }
              return sb.toString();
          }
       
          /**
           * 查看当前时刻调度中的定时任务
           */
          @RequestMapping("/status")
          public String getAllStatus() {
              Set<Integer> runningKeys = futureMap.keySet();
              return "当前正在调度的任务列表:" + runningKeys.toString();
          }
       
          @Data
          private class MyRunnable implements Runnable {
              private SpiderJob job;
       
              public MyRhttp://www.devze.comunnable(SpiderJob job) {
                  this.job = job;
              }
       
              @Override
              public void run() {
                  log.info("运行定时任务[{}: {}] at {}!", job.getId(), job.getBizName(), LocalDateTime.now());
                  executeIncrementJob(job.getBizName());
              }
          }
       
      }

      ThreadPoolTaskScheduler 定时任务实现

      org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler定时任务调度线程池

      CREATE TABLE `sys_job` (
        `id` bigint(20) NOT NULL COMMENT '任务key',
        `job_name` varchar(64) NOT NULL COMMENT '任务名称',
        `bean_class` varchar(128) NOT NULL COMMENT '类路径',
        `cron_expression` varchar(64) NOT NULL COMMENT 'cron表达式',
        `status` tinyint(1) NOT NULL COMMENT '状态值 @JobStatusEnum 详见具体枚举类',
        `is_deleted` tinyint(1) DEFAULT '0' COMMENT '删除标识 1是 0否',
        `create_time` datetime DEFAULT NULL,
        `update_time` datetime DEFAULT NULL,
        PRIMARY KEY (`id`) USING BTREE
      ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
      
      @Configuration
      @Slf4j
      public class SchedulingConfigure {
      
          @Bean
          public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
              log.info("开始创建定时任务调度线程池");
              ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
              threadPoolTaskScheduler.setPoolSize(20);
              threadPoolTaskScheduler.setThreadNamePrefix("schedule-task-");
              threadPoolTaskScheduler.setWaitForTasksToCompleteOnShutdown(true);
              threadPoolTaskScheduler.setAwaitTerminationSeconds(60);
              log.info("创建定时任务调度线程池完成!");
              return threadPoolTaskScheduler;
          }
      }
      public enum JobStatusEnum {
      
          /**
           * 未加入调度器
           */
          NOT_SCHEDULE(0, "未加入调度器"),
      
          /**
           * 加入调度器,但未运行
           */
          SCHEDULED_BUT_NOT_RUNNING(1, "加入调度器,但未运行"),
      
          /**
           * 从调度器中已删除
           */
          DELETED(2, "从调度器中已删除"),
          ;
      
          private Integer status;
      
          private String detail;
      
          JobStatusEnum(Integer status, String detail) {
              this.status = status;
              this.detail = detail;
          }
      
          public Integer getStatus() {
              return status;
          }
      
          public void setStatus(Integer status) {
              this.status = status;
          }
      
          public String getDetail() {
              return detail;
          }
      
          public void setDetail(String detail) {
              this.detail = detail;
          }
      }
      
      @Component
      @Slf4j
      public class ScheduledJobService {
      
          private final ReentrantLock lock = new ReentrantLock();
      
          @Autowired
          private ThreadPoolTaskScheduler threadPoolTaskScheduler;
          @Autowired
          private SysJobService jobService;
          @Autowired
          private SpringBeanUtils springBeanUtils;
      
          /**
           * 已经加入调度器的任务map
           */
          private final ConcurrentHashMap<Long, ScheduledFuture<?>> scheduledFutureMap = new ConcurrentHashMap<>();
      
          /**
           * 初始化启动任务
           *
           * @param sysJobs 数据库任务集合
           */
          public void initAllJob(List<SysJob> sysJobs) {
              if (CollectionUtils.isEmpty(sysJobs)) {
                  return;
              }
              for (SysJob sysJob : sysJobs) {
                  if (JobStatusEnum.NOT_SCHEDULE.getStatus().equals(sysJob.getStatus())
                          || JobStatusEnum.DELETED.getStatus().equals(sysJob.getStatus())
                          || this.isScheduled(sysJob.getId())) {
                      // 任务初始化状态或已删除或已加载到调度器中
                      continue;
                  }
                  // 将任务加入调度器
                  this.DOScheduleJob(sysJob);
              }
          }
      
          /**
           * 启动任务
           *
           * @param jobId job主键id
           */
          public void start(Long jobId) {
              log.info("启动任务:-> jobId_{}", jobId);
              // 加入调度器
              schedule(jobId);
              log.info("启动任务结束:-> jobId_{}", jobId);
              // 更新任务状态
              jobService.updateJobStatus(jobId, JobStatusEnum.SCHEDULED_BUT_NOT_RUNNING.getStatus());
          }
      
          /**
           * 停止任务
           *
           * @param jobId job主键id
           */
          public void stop(Long jobId) {
              log.info("停止任务:-> jobId_{}", jobId);
              // 取消任务
              cancel(jobId);
              log.info(http://www.devze.com"停止任务结束:-> jobId_{}", jobId);
              // 更新表中任务状态为已停止
              jobService.updateJobStatus(jobId, JobStatusEnum.NOT_SCHEDULE.getStatus());
          }
      
          /**
           * 移除任务
           *
           * @paramjavascript jobId job主键id
           */
          public void remove(Long jobId) {
              log.info("移除任务:-> jobId_{}", jobId);
              // 取消任务
              cancel(jobId);
              log.info("移除任务结束:-> jobId_{}", jobId);
              // 更新表中任务状态为已删除
              jobService.updateJobStatus(jobId, JobStatusEnum.DELETED.getStatus());
          }
      
          /**
           * 取消
           *
           * @param jobId 工作id
           */
          private void cancel(Long jobId) {
              // 任务是否存在
              if (scheduledFutureMap.containsKey(jobId)) {
                  ScheduledFuture<?> scheduledFuture = scheduledFutureMap.get(jobId);
                  if (!scheduledFuture.isCancelled()) {
                      // 取消调度
                      scheduledFuture.cancel(true);
                  }
              }
          }
      
          private void schedule(Long jobId) {
              // 添加锁,只允许单个线程访问,防止任务启动多次
              lock.lock();
              try {
                  if (isScheduled(jobId)) {
                      log.error("任务jobId_{}已经加入调度器,无需重复操作", jobId);
                      return;
                  }
                  // 通过jobKey查询jobBean对象
                  SysJob sysJob = jobService.getById(jobId);
                  // 启动定时任务
                  doScheduleJob(sysJob);
              } finally {
                  // 释放锁资源
                  lock.unlock();
              }
          }
      
          /**
           * 执行启动任务
           *
           * @param sysJob 任务实体类对象
           */
          private void doScheduleJob(SysJob sysJob) {
              Long jobId = sysJob.getId();
              String beanClass = sysJob.getBeanClass();
              String jobName = sysJob.getJobName();
              String cron = sysJob.getCronExpression();
              // 从Spring中获取目标的job业务实现类
              ScheduledJob scheduledJob = parseFrom(beanClass);
              if (scheduledJob == null) {
                  return;
              }
              scheduledJob.setJobId(jobId);
              scheduledJob.setJobName(jobName);
      
              ScheduledFuture<?> scheduledFuture = threadPoolTaskScheduler.schedule(scheduledJob,
                      triggerContext -> {
                          CronTrigger cronTrigger = new CronTrigger(cron);
                          return cronTrigger.nextExecutionTime(triggerContext);
                      });
      
              log.info("任务加入调度器 -> jobId:{},jobName:{}", jobId, jobName);
      
              // 将启动的任务放入map
              assert scheduledFuture != null;
              scheduledFutureMap.put(jobId, scheduledFuture);
          }
      
          /**
           * 任务是否已经进入调度器
           *
           * @param jobId 任务主键key
           * @return {@link Boolean}
           */
          private Boolean isScheduled(Long jobId) {
              if (scheduledFutureMap.containsKey(jobId)) {
                  return !scheduledFutureMap.get(jobId).isCancelled();
              }
              return false;
          }
      
          private ScheduledJob parseFrom(String beanClass) {
              try {
                  Class<?> clazz = Class.forName(beanClass);
                  return (ScheduledJob) springBeanUtils.getBean(clazz);
              } catch (ClassNotFoundException e) {
                  e.printStackTrace();
              }
              return null;
          }
      }
      
      @Component
      public class SpringBeanUtils implements ApplicationContextAware {
      
          private static ApplicationContext applicationContext;
      
          @Override
          public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
              SpringBeanUtils.applicationContext = applicationContext;
          }
      
          /**
           * 获取applicationContext
           */
          public static ApplicationContext getApplicationContext() {
              return applicationContext;
          }
      
          /**
           * 通过name获取 开发者_Python培训Bean.
           */
          public  Object getBean(String name) {
              return getApplicationContext().getBean(name);
          }
      
          /**
           * 通过class获取Bean.
           */
          public  <T> T getBean(Class<T> clazz) {
              return getApplicationContext().getBean(clazz);
          }
      
          /**
           * 通过name,以及Clazz返回指定的Bean
           */
          public  <T> T getBean(String name, Class<T> clazz) {
              return getApplicationContext().getBean(name, clazz);
          }
      }
      
      @Data
      public abstract class ScheduledJob implements Runnable {
          
          /**
          * 任务主键id
          */
          private Long jobId;
          
          /**
          * 任务名
          */
          private String jobName;
      }
      
      
      @Component
      public class SchedulerTestDemo extends ScheduledJob {
      
          @Override
          public void run() {
              System.out.println("我是定时任务要执行的类..");
              System.out.println(SchedulerTestDemo.class.getName() + ":" + LocalDateTime.now());
          }
      
      }
      
      /**
       * 项目启动时,将数据库中job定时任务加载
       */
      @Component
      public class GrapeApplicationListener {
      
          private final ScheduledJobService scheduledJobService;
          private final ISysJobService sysJobService;
      
          public GrapeApplicationListener(ISysJobService sysJobService, ScheduledJobService scheduledJobService) {
              this.sysJobService = sysJobService;
              this.scheduledJobService = scheduledJobService;
          }
      
          @PostConstruct
          public void initStartJob() {
          	// 初始化job
              scheduledJobService.initAllJob(sysJobService.list());
          }
      }
      
      @SpringBootApplication(scanBasePackages = {"com.example.grape"})
      @MapperScan("com.example.grape.dao.mapper")
      @EnableScheduling
      public class GrapeApplication {
      
          public static void main(String[] args) {
              SpringApplication.run(GrapeApplication.class, argsandroid);
          }
      
      }
      

      总结

      以上为个人经验,希望能给大家一个参考,也希望大家多多支持我们。

      0

      精彩评论

      暂无评论...
      验证码 换一张
      取 消