Loading... 上篇主要是了解了Java四种定时任务的实现,重点在于 `Spring Task`的使用和策略差异。 传送门: <div class="preview"> <div class="post-inser post box-shadow-wrap-normal"> <a href="http://www.tangsong.fun/index.php/Quartz01.html" target="_blank" class="post_inser_a no-external-link no-underline-link"> <div class="inner-image bg" style="background-image: url(https://blog-picture01.oss-cn-shenzhen.aliyuncs.com/img/20210222111254.PNG);background-size: cover;"></div> <div class="inner-content" > <p class="inser-title">[Quartz]Java四种定时任务</p> <div class="inster-summary text-muted"> Java定时任务的实现方式,一般是以下四种:Timer、ScheduledExecutorService、Spri... </div> </div> </a> <!-- .inner-content #####--> </div> <!-- .post-inser ####--> </div> 而本篇主要是分析**xy-core下的定时任务说明**以及后续的**基于Quartz实现可配置化的定时任务**。 <!--more--> ### 定时任务启动说明 定时任务采用 `Spring Task`实现,默认情况下通过 `java -jar xxxx.jar`命令是不会运行定时任务的。 只有在启动时添加 `--cli`参数定时任务才能运行,可以在cli中指定运行哪些任务,例:`--cli="类名1.方法名1,类名2.方法名2,..."`。 没有指定时,默认运行所有任务。 (原因是权安哥重写的Spring框架中编写了此逻辑,下文有分析) 例: ```java @Component public class TestSchedule { private Logger logger = LoggerFactory.getLogger(TestSchedule.class); @Scheduled(fixedRate = 17 * 1000) protected void cron1() { logger.info("run cron1"); } @Scheduled(fixedRate = 11 * 1000) protected void cron2() { logger.info("run cron2"); } } ``` * `--cli` 将会执行cron1和cron2中的代码 * `--cli="TestSchedule.cron1"` 将只执行cron1中的代码 #### 实现分析 **TaskScheduleEnableAspect** ```java /** * 用于控制定时任务的执行,通过在启动参数中添加--cli配置来确定执行哪些任务 */ @Lazy(false) @Scope(ConfigurableBeanFactory.SCOPE_SINGLETON) @Component @Aspect public class TaskScheduleEnableAspect implements EnvironmentAware, PriorityOrdered { private Logger logger = LoggerFactory.getLogger(TaskScheduleEnableAspect.class); //是否开始了定时任务,只有在启动参数中添加了--cli后才会变为true private boolean enableTask = false; //开启的任务名称 key:类名,value:类中方法名称集合, map为空,表示开始所有 private Map<String, Set<String>> taskMethodMap = new HashMap<>(); @Override public void setEnvironment(Environment environment) { String cli = environment.getProperty("cli"); if(cli == null) { return; } enableTask = true; if((cli = cli.trim()).isEmpty()) { return; } try { String[] cliMethods = cli.split(","); for(String cliMethod : cliMethods) { String[] methods = cliMethod.split("\\."); String className = methods[0].trim(); String methodName = methods[1].trim(); taskMethodMap.compute(className, (k, v) -> { if(v == null) { v = new HashSet<>(); } v.add(methodName); return v; }); } } catch (Exception e) { logger.error("脚本格式[" + cli + "]存在错误", e); } } @Around("@annotation(org.springframework.scheduling.annotation.Scheduled)") private void executeTask(ProceedingJoinPoint joinPoint) { //如果没有开始定时任务 if(!enableTask) { return; } //顺便把requestId也重置了,避免每次执行的时候RequestId都一样 SoaBaseParams soaBaseParams = SoaBaseParams.fromThread(); soaBaseParams.setDistinctRequestId(Identities.uuid2()); soaBaseParams.initSerialSequence("0.0"); soaBaseParams.init(); Signature signature = joinPoint.getSignature(); String className = joinPoint.getTarget().getClass().getSimpleName(); MethodSignature methodSignature = (MethodSignature)signature; String methodName = methodSignature.getMethod().getName(); //为空时运行所有任务 if(!taskMethodMap.isEmpty() && MethodSignature.class.isAssignableFrom(signature.getClass())) { if(!isTaskEnable(className, methodName)) { return; } } try { Object[] args = joinPoint.getArgs(); joinPoint.proceed(args); } catch (Throwable t) { logger.error("定时任务调用失败", t); } } /** * 判断是否可以执行任务 true:是; false:否; * @param className * @param methodName * @return */ private boolean isTaskEnable(String className, String methodName) { Set<String> methods = taskMethodMap.get(className); if(methods != null && methods.contains(methodName)) { return true; } return false; } @Override public int getOrder() { return Ordered.HIGHEST_PRECEDENCE; } } ``` **TaskSingleAspect** ```java /** * 集群环境下的单任务处理 * @author cqa 2019/2/18 6:53 PM */ @Lazy(false) @Scope(ConfigurableBeanFactory.SCOPE_SINGLETON) @Component @Aspect public class TaskSingleAspect { private Logger logger = LoggerFactory.getLogger(TaskSingleAspect.class); private static final String TASK_LOCK_KEY = "task:lock:%s:%s"; @Autowired(required = false) @Qualifier("lockStringRedisTemplate") private StringRedisTemplate lockStringRedisTemplate; @Around("@annotation(org.springframework.scheduling.annotation.Scheduled)") private void executeTask(ProceedingJoinPoint joinPoint) { Signature signature = joinPoint.getSignature(); String className = joinPoint.getTarget().getClass().getSimpleName(); MethodSignature methodSignature = (MethodSignature)signature; String methodName = methodSignature.getMethod().getName(); try { Object[] args = joinPoint.getArgs(); SingleTask singleTask = methodSignature.getMethod().getAnnotation(SingleTask.class); if(singleTask != null) { //加锁失败,不执行 if(!lock(className, methodName, singleTask.value())) { logger.debug("\"{}.{}\"任务已经在运行中", className, methodName); return; } //加锁成功,执行 joinPoint.proceed(args); //执行完,释放锁 unlock(className, methodName); return; } joinPoint.proceed(args); } catch (Throwable t) { logger.error("定时任务调用失败", t); } } /** * 判断任务是否已经在运行中,是返回true; 否则返回false; * @param className * @param methodName * @return */ private boolean lock(String className, String methodName, int lockSecond) { Objects.requireNonNull(lockStringRedisTemplate, "未指定分布式锁reids配置,请先添加名称为\"lock\"的redis配置"); long timeMills = System.currentTimeMillis(); String key = String.format(TASK_LOCK_KEY, className, methodName); Boolean isLock = lockStringRedisTemplate.opsForValue().setIfAbsent(key, String.valueOf(timeMills), lockSecond, TimeUnit.SECONDS); if(Boolean.TRUE.equals(isLock)) { return true; } //为空,可能是因为上一个任务刚刚结束,此时缓存里没有任何东西,下一个任务进来时将直接运行。因此这里直接返回false,加锁失败。 String lastTimeMills = lockStringRedisTemplate.opsForValue().get(key); if(StringUtils.isBlank(lastTimeMills)) { return false; } if(timeMills - Long.valueOf(lastTimeMills) > lockSecond * 1000) { String curTimeMills = lockStringRedisTemplate.opsForValue().getAndSet(key, String.valueOf(timeMills)); if(StringUtils.equals(lastTimeMills, curTimeMills)) { return true; } } return false; } private void unlock(String className, String methodName) { String key = String.format(TASK_LOCK_KEY, className, methodName); lockStringRedisTemplate.delete(key); } } ``` ### `@SingleTask`注解 * 作用在定时任务方法上,用于标记一个方法在集群环境下只能执行一次,利用redis的分布式锁来实现集群环境的任务互斥,因此需要配置一个名称为"lock"的redis连接 * 注解参数 `value`用于设定最长等待时间,超过该时间还是不能获取到分布式锁,仍然运行程序,单位为秒 #### 实现分析 ```java @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) public @interface SingleTask { /** * 最长等待时间,超过该时间还是不能获取到分布式锁,仍然运行程序,单位为秒<br/> * 这在任务完成后,释放锁失败的情况下会很有用 */ int value() default 30; } ``` Last modification:August 21, 2022 © Allow specification reprint Like 0 喵ฅฅ