Loading...

上篇主要是了解了Java四种定时任务的实现,重点在于 Spring Task的使用和策略差异。
传送门:

而本篇主要是分析**xy-core下的定时任务说明**以及后续的**基于Quartz实现可配置化的定时任务**。

定时任务启动说明

定时任务采用 Spring Task实现,默认情况下通过 java -jar xxxx.jar命令是不会运行定时任务的。
只有在启动时添加 --cli参数定时任务才能运行,可以在cli中指定运行哪些任务,例:--cli="类名1.方法名1,类名2.方法名2,..."
没有指定时,默认运行所有任务。
(原因是权安哥重写的Spring框架中编写了此逻辑,下文有分析)

例:

@Component public class TestSchedule { private Logger logger = LoggerFactory.getLogger(TestSchedule.class); @Scheduled(fixedRate = 17 * 1000) protected void cron1() { logger.info("run cron1"); } @Scheduled(fixedRate = 11 * 1000) protected void cron2() { logger.info("run cron2"); } }
  • --cli 将会执行cron1和cron2中的代码
  • --cli="TestSchedule.cron1" 将只执行cron1中的代码

实现分析

TaskScheduleEnableAspect

/** * 用于控制定时任务的执行,通过在启动参数中添加--cli配置来确定执行哪些任务 */ @Lazy(false) @Scope(ConfigurableBeanFactory.SCOPE_SINGLETON) @Component @Aspect public class TaskScheduleEnableAspect implements EnvironmentAware, PriorityOrdered { private Logger logger = LoggerFactory.getLogger(TaskScheduleEnableAspect.class); //是否开始了定时任务,只有在启动参数中添加了--cli后才会变为true private boolean enableTask = false; //开启的任务名称 key:类名,value:类中方法名称集合, map为空,表示开始所有 private Map<String, Set<String>> taskMethodMap = new HashMap<>(); @Override public void setEnvironment(Environment environment) { String cli = environment.getProperty("cli"); if(cli == null) { return; } enableTask = true; if((cli = cli.trim()).isEmpty()) { return; } try { String[] cliMethods = cli.split(","); for(String cliMethod : cliMethods) { String[] methods = cliMethod.split("\\."); String className = methods[0].trim(); String methodName = methods[1].trim(); taskMethodMap.compute(className, (k, v) -> { if(v == null) { v = new HashSet<>(); } v.add(methodName); return v; }); } } catch (Exception e) { logger.error("脚本格式[" + cli + "]存在错误", e); } } @Around("@annotation(org.springframework.scheduling.annotation.Scheduled)") private void executeTask(ProceedingJoinPoint joinPoint) { //如果没有开始定时任务 if(!enableTask) { return; } //顺便把requestId也重置了,避免每次执行的时候RequestId都一样 SoaBaseParams soaBaseParams = SoaBaseParams.fromThread(); soaBaseParams.setDistinctRequestId(Identities.uuid2()); soaBaseParams.initSerialSequence("0.0"); soaBaseParams.init(); Signature signature = joinPoint.getSignature(); String className = joinPoint.getTarget().getClass().getSimpleName(); MethodSignature methodSignature = (MethodSignature)signature; String methodName = methodSignature.getMethod().getName(); //为空时运行所有任务 if(!taskMethodMap.isEmpty() && MethodSignature.class.isAssignableFrom(signature.getClass())) { if(!isTaskEnable(className, methodName)) { return; } } try { Object[] args = joinPoint.getArgs(); joinPoint.proceed(args); } catch (Throwable t) { logger.error("定时任务调用失败", t); } } /** * 判断是否可以执行任务 true:是; false:否; * @param className * @param methodName * @return */ private boolean isTaskEnable(String className, String methodName) { Set<String> methods = taskMethodMap.get(className); if(methods != null && methods.contains(methodName)) { return true; } return false; } @Override public int getOrder() { return Ordered.HIGHEST_PRECEDENCE; } }

TaskSingleAspect

/** * 集群环境下的单任务处理 * @author cqa 2019/2/18 6:53 PM */ @Lazy(false) @Scope(ConfigurableBeanFactory.SCOPE_SINGLETON) @Component @Aspect public class TaskSingleAspect { private Logger logger = LoggerFactory.getLogger(TaskSingleAspect.class); private static final String TASK_LOCK_KEY = "task:lock:%s:%s"; @Autowired(required = false) @Qualifier("lockStringRedisTemplate") private StringRedisTemplate lockStringRedisTemplate; @Around("@annotation(org.springframework.scheduling.annotation.Scheduled)") private void executeTask(ProceedingJoinPoint joinPoint) { Signature signature = joinPoint.getSignature(); String className = joinPoint.getTarget().getClass().getSimpleName(); MethodSignature methodSignature = (MethodSignature)signature; String methodName = methodSignature.getMethod().getName(); try { Object[] args = joinPoint.getArgs(); SingleTask singleTask = methodSignature.getMethod().getAnnotation(SingleTask.class); if(singleTask != null) { //加锁失败,不执行 if(!lock(className, methodName, singleTask.value())) { logger.debug("\"{}.{}\"任务已经在运行中", className, methodName); return; } //加锁成功,执行 joinPoint.proceed(args); //执行完,释放锁 unlock(className, methodName); return; } joinPoint.proceed(args); } catch (Throwable t) { logger.error("定时任务调用失败", t); } } /** * 判断任务是否已经在运行中,是返回true; 否则返回false; * @param className * @param methodName * @return */ private boolean lock(String className, String methodName, int lockSecond) { Objects.requireNonNull(lockStringRedisTemplate, "未指定分布式锁reids配置,请先添加名称为\"lock\"的redis配置"); long timeMills = System.currentTimeMillis(); String key = String.format(TASK_LOCK_KEY, className, methodName); Boolean isLock = lockStringRedisTemplate.opsForValue().setIfAbsent(key, String.valueOf(timeMills), lockSecond, TimeUnit.SECONDS); if(Boolean.TRUE.equals(isLock)) { return true; } //为空,可能是因为上一个任务刚刚结束,此时缓存里没有任何东西,下一个任务进来时将直接运行。因此这里直接返回false,加锁失败。 String lastTimeMills = lockStringRedisTemplate.opsForValue().get(key); if(StringUtils.isBlank(lastTimeMills)) { return false; } if(timeMills - Long.valueOf(lastTimeMills) > lockSecond * 1000) { String curTimeMills = lockStringRedisTemplate.opsForValue().getAndSet(key, String.valueOf(timeMills)); if(StringUtils.equals(lastTimeMills, curTimeMills)) { return true; } } return false; } private void unlock(String className, String methodName) { String key = String.format(TASK_LOCK_KEY, className, methodName); lockStringRedisTemplate.delete(key); } }

@SingleTask注解

  • 作用在定时任务方法上,用于标记一个方法在集群环境下只能执行一次,利用redis的分布式锁来实现集群环境的任务互斥,因此需要配置一个名称为"lock"的redis连接
  • 注解参数 value用于设定最长等待时间,超过该时间还是不能获取到分布式锁,仍然运行程序,单位为秒

实现分析

@Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) public @interface SingleTask { /** * 最长等待时间,超过该时间还是不能获取到分布式锁,仍然运行程序,单位为秒<br/> * 这在任务完成后,释放锁失败的情况下会很有用 */ int value() default 30; }
Last modification:August 21, 2022
喵ฅฅ