上篇主要是了解了Java四种定时任务的实现,重点在于 Spring Task
的使用和策略差异。
传送门:
定时任务启动说明
定时任务采用 Spring Task
实现,默认情况下通过 java -jar xxxx.jar
命令是不会运行定时任务的。
只有在启动时添加 --cli
参数定时任务才能运行,可以在cli中指定运行哪些任务,例:--cli="类名1.方法名1,类名2.方法名2,..."
。
没有指定时,默认运行所有任务。
(原因是权安哥重写的Spring框架中编写了此逻辑,下文有分析)
例:
@Component
public class TestSchedule {
private Logger logger = LoggerFactory.getLogger(TestSchedule.class);
@Scheduled(fixedRate = 17 * 1000)
protected void cron1() {
logger.info("run cron1");
}
@Scheduled(fixedRate = 11 * 1000)
protected void cron2() {
logger.info("run cron2");
}
}
--cli
将会执行cron1和cron2中的代码--cli="TestSchedule.cron1"
将只执行cron1中的代码
实现分析
TaskScheduleEnableAspect
/**
* 用于控制定时任务的执行,通过在启动参数中添加--cli配置来确定执行哪些任务
*/
@Lazy(false)
@Scope(ConfigurableBeanFactory.SCOPE_SINGLETON)
@Component
@Aspect
public class TaskScheduleEnableAspect implements EnvironmentAware, PriorityOrdered {
private Logger logger = LoggerFactory.getLogger(TaskScheduleEnableAspect.class);
//是否开始了定时任务,只有在启动参数中添加了--cli后才会变为true
private boolean enableTask = false;
//开启的任务名称 key:类名,value:类中方法名称集合, map为空,表示开始所有
private Map<String, Set<String>> taskMethodMap = new HashMap<>();
@Override
public void setEnvironment(Environment environment) {
String cli = environment.getProperty("cli");
if(cli == null) {
return;
}
enableTask = true;
if((cli = cli.trim()).isEmpty()) {
return;
}
try {
String[] cliMethods = cli.split(",");
for(String cliMethod : cliMethods) {
String[] methods = cliMethod.split("\\.");
String className = methods[0].trim();
String methodName = methods[1].trim();
taskMethodMap.compute(className, (k, v) -> {
if(v == null) {
v = new HashSet<>();
}
v.add(methodName);
return v;
});
}
} catch (Exception e) {
logger.error("脚本格式[" + cli + "]存在错误", e);
}
}
@Around("@annotation(org.springframework.scheduling.annotation.Scheduled)")
private void executeTask(ProceedingJoinPoint joinPoint) {
//如果没有开始定时任务
if(!enableTask) {
return;
}
//顺便把requestId也重置了,避免每次执行的时候RequestId都一样
SoaBaseParams soaBaseParams = SoaBaseParams.fromThread();
soaBaseParams.setDistinctRequestId(Identities.uuid2());
soaBaseParams.initSerialSequence("0.0");
soaBaseParams.init();
Signature signature = joinPoint.getSignature();
String className = joinPoint.getTarget().getClass().getSimpleName();
MethodSignature methodSignature = (MethodSignature)signature;
String methodName = methodSignature.getMethod().getName();
//为空时运行所有任务
if(!taskMethodMap.isEmpty() && MethodSignature.class.isAssignableFrom(signature.getClass())) {
if(!isTaskEnable(className, methodName)) {
return;
}
}
try {
Object[] args = joinPoint.getArgs();
joinPoint.proceed(args);
} catch (Throwable t) {
logger.error("定时任务调用失败", t);
}
}
/**
* 判断是否可以执行任务 true:是; false:否;
* @param className
* @param methodName
* @return
*/
private boolean isTaskEnable(String className, String methodName) {
Set<String> methods = taskMethodMap.get(className);
if(methods != null && methods.contains(methodName)) {
return true;
}
return false;
}
@Override
public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE;
}
}
TaskSingleAspect
/**
* 集群环境下的单任务处理
* @author cqa 2019/2/18 6:53 PM
*/
@Lazy(false)
@Scope(ConfigurableBeanFactory.SCOPE_SINGLETON)
@Component
@Aspect
public class TaskSingleAspect {
private Logger logger = LoggerFactory.getLogger(TaskSingleAspect.class);
private static final String TASK_LOCK_KEY = "task:lock:%s:%s";
@Autowired(required = false)
@Qualifier("lockStringRedisTemplate")
private StringRedisTemplate lockStringRedisTemplate;
@Around("@annotation(org.springframework.scheduling.annotation.Scheduled)")
private void executeTask(ProceedingJoinPoint joinPoint) {
Signature signature = joinPoint.getSignature();
String className = joinPoint.getTarget().getClass().getSimpleName();
MethodSignature methodSignature = (MethodSignature)signature;
String methodName = methodSignature.getMethod().getName();
try {
Object[] args = joinPoint.getArgs();
SingleTask singleTask = methodSignature.getMethod().getAnnotation(SingleTask.class);
if(singleTask != null) {
//加锁失败,不执行
if(!lock(className, methodName, singleTask.value())) {
logger.debug("\"{}.{}\"任务已经在运行中", className, methodName);
return;
}
//加锁成功,执行
joinPoint.proceed(args);
//执行完,释放锁
unlock(className, methodName);
return;
}
joinPoint.proceed(args);
} catch (Throwable t) {
logger.error("定时任务调用失败", t);
}
}
/**
* 判断任务是否已经在运行中,是返回true; 否则返回false;
* @param className
* @param methodName
* @return
*/
private boolean lock(String className, String methodName, int lockSecond) {
Objects.requireNonNull(lockStringRedisTemplate, "未指定分布式锁reids配置,请先添加名称为\"lock\"的redis配置");
long timeMills = System.currentTimeMillis();
String key = String.format(TASK_LOCK_KEY, className, methodName);
Boolean isLock = lockStringRedisTemplate.opsForValue().setIfAbsent(key, String.valueOf(timeMills), lockSecond, TimeUnit.SECONDS);
if(Boolean.TRUE.equals(isLock)) {
return true;
}
//为空,可能是因为上一个任务刚刚结束,此时缓存里没有任何东西,下一个任务进来时将直接运行。因此这里直接返回false,加锁失败。
String lastTimeMills = lockStringRedisTemplate.opsForValue().get(key);
if(StringUtils.isBlank(lastTimeMills)) {
return false;
}
if(timeMills - Long.valueOf(lastTimeMills) > lockSecond * 1000) {
String curTimeMills = lockStringRedisTemplate.opsForValue().getAndSet(key, String.valueOf(timeMills));
if(StringUtils.equals(lastTimeMills, curTimeMills)) {
return true;
}
}
return false;
}
private void unlock(String className, String methodName) {
String key = String.format(TASK_LOCK_KEY, className, methodName);
lockStringRedisTemplate.delete(key);
}
}
@SingleTask
注解
- 作用在定时任务方法上,用于标记一个方法在集群环境下只能执行一次,利用redis的分布式锁来实现集群环境的任务互斥,因此需要配置一个名称为"lock"的redis连接
- 注解参数
value
用于设定最长等待时间,超过该时间还是不能获取到分布式锁,仍然运行程序,单位为秒
实现分析
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface SingleTask {
/**
* 最长等待时间,超过该时间还是不能获取到分布式锁,仍然运行程序,单位为秒<br/>
* 这在任务完成后,释放锁失败的情况下会很有用
*/
int value() default 30;
}