注:本系列源码分析基于XxlJob 2.3.0,gitee仓库链接:https://gitee.com/funcy/xxl-job.git.
从本文开始,我们将用三篇文章来介绍xxl-job
最核心的功能——xxl-job
任务执行流程。xxl-job
任务的执行包含3个组件:调度器、触发器、执行器,本文将重点介绍调度器的流程。
xxl-job
调度器位于admin
进程,主要功能为精准获取将要执行的任务,处理调度器的方法为JobScheduleHelper#start
,在《admin启动流程》一文中也提到过该方法,不过当时并未展开讨论,在本文中将详细分析该方法。
JobScheduleHelper#start
代码如下:
public void start(){
// schedule thread
scheduleThread = new Thread(new Runnable() {
public void run() {
// 省略run()方法的内容,下面再展开
...
}
});
scheduleThread.setDaemon(true);
scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
scheduleThread.start();
// ring thread
ringThread = new Thread(new Runnable() {
public void run() {
// 省略run()方法的内容,下面再展开
...
}
});
ringThread.setDaemon(true);
ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
ringThread.start();
}
可以看到,这个方法启动了两个线程:
scheduleThread
:调度线程,用来获取需要执行的任务ringThread
:时间轮线程,用来精准控制任务的执行时间点
任务的调度就是依靠以上两个线程来完成的,我们继续。
scheduleThread
我们先来分析scheduleThread
,查看其 run()
方法:
@Override
public void run() {
// 休眠,表示线程启动5s后才去执行任务
try {
TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
} catch (InterruptedException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
logger.info(">>>>>>>>> init xxl-job admin scheduler success.");
// pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms,
// qps = 1000/50 = 20)
int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax()
+ XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
while (!scheduleThreadToStop) {
// Scan Job
long start = System.currentTimeMillis();
Connection conn = null;
Boolean connAutoCommit = null;
PreparedStatement preparedStatement = null;
boolean preReadSuc = true;
try {
conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
connAutoCommit = conn.getAutoCommit();
// 调整数据库连接的自动提交方式,为false表示手动提交,后面会看到 conn.commit() 的调用
conn.setAutoCommit(false);
// sql 语句后加 for update,表示获取数据库的写锁(排他锁)
preparedStatement = conn.prepareStatement(
"select * from xxl_job_lock where lock_name = 'schedule_lock' for update");
preparedStatement.execute();
// 能执行到这里,就表示获得了锁
// tx start
// 1、pre read
long nowTime = System.currentTimeMillis();
// 查出未来5s内需要执行的任务
List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig()
.getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
if (scheduleList!=null && scheduleList.size()>0) {
// 2、push time-ring
for (XxlJobInfo jobInfo: scheduleList) {
// time-ring jump
// 处理过期任务:执行触发时间超过了5s还没执行
if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
// 2.1、trigger-expire > 5s:pass && make next-trigger-time
logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = "
+ jobInfo.getId());
// 1、misfire match
// 处理过期的任务,处理策略有 FIRE_ONCE_NOW(立即执行1次)
// 与 DO_NOTHING(什么也不做),默认是 DO_NOTHING
MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(
jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
// FIRE_ONCE_NOW 》 trigger
// 触发过期的任务,这里的 trigger(...) 就是触发任务的方法了
JobTriggerPoolHelper.trigger(jobInfo.getId(),
TriggerTypeEnum.MISFIRE, -1, null, null, null);
logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger :"
+ " jobId = " + jobInfo.getId() );
}
// 2、fresh next
// 更新最近一次执行时间以及下次执行时间,这里只是设置值,数据库的更新操作在下面执行
refreshNextValidTime(jobInfo, new Date());
} else if (nowTime > jobInfo.getTriggerNextTime()) {
// 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time
// 处理立即执行的任务:过了执行时间,但执行时间与当前时间相差不足5s,立即执行1次
// 1、trigger
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON,
-1, null, null, null);
logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = "
+ jobInfo.getId() );
// 2、fresh next
refreshNextValidTime(jobInfo, new Date());
// next-trigger-time in 5s, pre-read again
// 执行完之后,还需要再次执行,加入时间轮
if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS
> jobInfo.getTriggerNextTime()) {
// 1、make ring second
// 执行时间的秒数,比如任务的执行时间为 11:02:05,这里得到的就是秒数 5
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
// 2、push time ring
pushTimeRing(ringSecond, jobInfo.getId());
// 3、fresh next
refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
}
} else {
// 未来5秒内执行的任务,加入时间轮
// 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time
// 1、make ring second
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
// 2、push time ring
pushTimeRing(ringSecond, jobInfo.getId());
// 3、fresh next
refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
}
}
// 3、update trigger info
for (XxlJobInfo jobInfo: scheduleList) {
XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
}
} else {
preReadSuc = false;
}
// tx stop
} catch (Exception e) {
if (!scheduleThreadToStop) {
logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);
}
} finally {
// commit
if (conn != null) {
try {
conn.commit();
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
try {
conn.setAutoCommit(connAutoCommit);
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
try {
// 在使用数据库连接池时,close()方法表示将连接归还到连接池中
conn.close();
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
// close PreparedStatement
if (null != preparedStatement) {
try {
preparedStatement.close();
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
}
long cost = System.currentTimeMillis()-start;
// Wait seconds, align second
if (cost < 1000) { // scan-overtime, not wait
try {
// pre-read period: success > scan each second; fail > skip this period;
// 加载到了接下来的任务,就休眠1s,否则休眠5s
TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS)
- System.currentTimeMillis()%1000);
} catch (InterruptedException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
}
logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
}
方法有点长,但实际并不复杂,对照着代码中的注释,大体上应该也能明白这个方法做了什么,不明白也没关系,接下来我们对这个方法具体分析。
1. 休眠,对齐执行时间
// 休眠,表示线程启动5s后才去执行任务
try {
TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
} catch (InterruptedException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
logger.info(">>>>>>>>> init xxl-job admin scheduler success.");
之所以不是一启动就去获取要执行的任务,主要是为了等待准备工作的完成(如触发线程的启动)。在设置休眠时间时,xxl-job
是这么处理的:
5000 - System.currentTimeMillis()%1000
前面的5000
表示5s这没问题,后面的System.currentTimeMillis()%1000
是几个意思呢?
我们都知道,System.currentTimeMillis()
得到的是一个精确到毫秒的时间戳,将其与1000
求余,得到的就是当时时刻的毫秒数了。
那么5000 - 当前时刻的毫秒数
究竟有什么意义呢?我们想一想,假设当前时间是2022-05-12 12:02:12.324
,当前时刻的毫秒数
是324
,如果直接休眠5s
,休眠后的时间是2022-05-12 12:02:17.324
,而如果休眠时间是5000 - 324 = 4676
毫秒呢?这样休眠后的时间就是2022-05-12 12:02:17.000
,可以看到,这样就凑成了整秒数。
这样我们就明白了,休眠时间设置为5000 - System.currentTimeMillis()%1000
,是为了整秒执行线程里的内容。
2. 每次查询的任务数
int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax()
+ XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
这个值设置的是每次获取的任务数,其实就是限制每次从数据库中获取的记录数,值为线程池的总线程数的20倍,之所以有getTriggerPoolFastMax()
与getTriggerPoolSlowMax()
,是因为xxl-job
会根据任务触发时间,将任务分为“触发慢的任务”与“触发快的任务”,分别对应一个线程池来处理,这块是xxl-job
触发流程的内容,我们在下一篇文章中再详细叙述。
getTriggerPoolFastMax()
与getTriggerPoolSlowMax()
的值是多少呢?在application.properties
中这样的配置:
## xxl-job, triggerpool max size
xxl.job.triggerpool.fast.max=200
xxl.job.triggerpool.slow.max=100
这就是用来配置这两个线程池的线程数的,这样算下来,preReadCount = (200 + 100)* 20 = 6000
,即每次查询从数据库中最多可获取6000个待执行的任务。
3. 在 while 循环中获取需要执行的任务
接下来的代码都在一个while
循环体中,代码如下:
while (!scheduleThreadToStop) {
long start = System.currentTimeMillis();
// 获取需要执行的任务
...
long cost = System.currentTimeMillis()-start;
if (cost < 1000) {
try {
// 加载到了接下来的任务,就休眠1s,否则休眠5s
// 加载到了任务时,preReadSuc为ture
TimeUnit.MILLISECONDS.sleep((preReadSuc ? 1000 : PRE_READ_MS)
- System.currentTimeMillis() % 1000);
} catch (InterruptedException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
}
为了能清晰看到代码的脉络,以上代码省略了任务获取逻辑。
对于代码中的while (!scheduleThreadToStop)
,想必小伙伴已经很熟悉了,xxl-job
中需要一直执行的线程都是这么个套路,scheduleThreadToStop
的值又会是一个stop
方法中被修改,这块我们就不展示了,有兴趣的小伙伴可自行查看JobScheduleHelper#toStop
。
while
循环体中的代码,代码运行到最后,依然是进行休眠操作,这里需要重点说明下他的休眠处理。
在休眠前,会先判断cost < 1000
,这里的cost
是“获取待执行任务”这个操作消耗的时间,当消耗的时间小于1000毫秒时才进入休眠操作。为何大于等于1000时,不用休眠呢?
xxl-job
任务调度的最小精度是秒,因此理想情况下,调度线程应该每秒去数据库中获取下将要执行的任务,对于cost < 1000
的情况就应该休眠下等到下一秒再去执行;对于cost >= 1000
,已经过了1秒了,不用再休眠,直接执行就可以了。
再来看看休眠时间的处理:
(preReadSuc ? 1000 : PRE_READ_MS) - System.currentTimeMillis() % 1000
对上述代码中出现的两个变量做下说明:
preReadSuc
:当从数据库中获取到任务时,该值会设置成true
,否则为false
PRE_READ_MS
:常量,值为 5000
因此,从以上表达式中可以看出,当数据库中存在待执行的任务时,就休眠1s,否则就休眠5s.为何是5s呢?因为从数据库中获取任务时,会获取在未来5s内执行的任务,如果未获取到任务,就表示未来5s内都没有需要执行的任务,直接休眠5s就可以了。
对于减去System.currentTimeMillis() % 1000
的操作,同前面讲述的一样,也是为了得到整秒数,就不多说了。
4. 分布式锁的实现
在任务的获取前,需要处理加锁操作,这点也好理解,在分布式时代,可能有多个admin
实例在协作运行,如果不加分布式锁,在同一时刻,会有多个admin
实例同时执行调度操作,最终造成任务的重复执行,这样明显不是我们所期望的,因此加锁是有必要的,那么分布式锁该如何加呢?
xxl-job
加分布式锁的方案是使用数据库的读锁,相关代码如下:
try {
conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
connAutoCommit = conn.getAutoCommit();
// 调整数据库连接的自动提交方式,为false表示手动提交,后面会看到 conn.commit() 的调用
conn.setAutoCommit(false);
// sql 查询语句后加 for update,表示获取数据库的写锁(排他锁)
preparedStatement = conn.prepareStatement(
"select * from xxl_job_lock where lock_name = 'schedule_lock' for update");
preparedStatement.execute();
// 能执行到这里,就表示获得了锁
// 省略执行的内容
...
} finally {
// commit
if (conn != null) {
try {
conn.commit();
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
try {
// 还原 autoCommit 值
conn.setAutoCommit(connAutoCommit);
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
try {
// 在使用数据库连接池时,close()方法表示将连接归还到连接池中
conn.close();
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
// close PreparedStatement
if (null != preparedStatement) {
try {
preparedStatement.close();
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
}
对代码中重要的地方都作了注释,使用数据库实现排他锁的关键就在于如下:
select * from xxl_job_lock where lock_name = 'schedule_lock' for update
对一个select
语句加上for update
,就表示获取数据库的写锁,写锁是排他锁,当一个连接获取到写锁后,其他连接就只能等待写锁的释放了,这样也就实现了分布式锁。
关于以上代码获取连接、关闭自动提交、手动执行sql、重围自动提交状态、关闭连接等等操作,为何不使用spring-tx
提供的功能呢?使用spring编程式事务或声明式事务(@Transactional
注解)就能完成代码以上一大堆代码的功能了。
在获取锁时,如果一个连接一直无法获取锁,是不是会一直等待呢?在mysql中,获取锁是有超时时间的,达到超时时间后,会报错:
报完错之后,while
循环还在继续,又进入下一轮的锁等待,直到获得锁或到达超时时间。因此,对于数据库实现的分布式锁来说,在多个线程竞争锁的情况下,至少有以下几个不足:
- 无法立即知道锁获取结果,也无法指定超时时间,成功与否完全取决于数据库,等待时间也是,自主性太差
- 获取不到锁的数据库连接会一直等待锁,这会造成数据库的压力会比较大
5. 任务的获取
继续,获取到锁后,接着就是从数据库查找要执行的任务了:
// 查出未来5s内需要执行的任务
List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig()
.getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
查找需要执行的任务的方法为XxlJobInfoDao#scheduleJobQuery
,参数中的PRE_READ_MS
值为5000,preReadCount
值为6000,执行的sql如下:
SELECT <include refid="Base_Column_List" />
FROM xxl_job_info AS t
WHERE t.trigger_status = 1
and t.trigger_next_time <![CDATA[ <= ]]> #{maxNextTime}
ORDER BY id ASC
LIMIT #{pagesize}
注意到and t.trigger_next_time <= #{maxNextTime}
,这样就得到的是下一次执行时间小于maxNextTime
的任务,也包含了过期任务(即到了执行时间但但没执行的任务)。
6. 任务的处理
得到待执行的任务后,接下来就是对这些任务进行触发操作了。在xxl-job
中,待执行的任务分为3类:
- 过期时间超时5s的任务:例如,当前时间是
2022-05-12 12:00:00
,任务A下一次的执行应该在2022-05-12 11:59:50
,这就是到点但任务未执行的情况,而执行时间与当前时间相差超过了5s. - 过期时间在5s内:例如,当前时间是
2022-05-12 12:00:00
,任务B下一次的执行应该在2022-05-12 11:59:56
,这也是到点任务未执行的情况,不过执行时间与当前时间相差在5s内. - 在未来5s内要执行的任务:例如,当前时间是
2022-05-12 12:00:00
,任务A下一次的执行应该在2022-05-12 12:00:03
,该任务是未来5s内将要执行的.
由于查询时指定了PRE_READ_MS
值为5000,因此得到的任务中不会出现在未来执行但与当前时间超过5s的任务。
对于以上3类任务,xxl-job
分别做了不同的处理,我们继续往下分析。
1. 过期时间超过5s的任务
对于过期时间超过5s的任务,xxl-job
处理的代码如下:
// 处理过期任务:执行触发时间超过了5s还没执行
if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = "
+ jobInfo.getId());
// 处理过期的任务,处理策略有 FIRE_ONCE_NOW(立即执行1次)
// 与 DO_NOTHING(什么也不做),默认是 DO_NOTHING
MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(
jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
// FIRE_ONCE_NOW 》 trigger
// 触发过期的任务,这里的 trigger(...) 就是触发任务的方法了
JobTriggerPoolHelper.trigger(jobInfo.getId(),
TriggerTypeEnum.MISFIRE, -1, null, null, null);
logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger :"
+ " jobId = " + jobInfo.getId() );
}
// 更新最近一次执行时间以及下次执行时间,这里只是设置值,数据库的更新操作在下面执行
refreshNextValidTime(jobInfo, new Date());
}
代码中,会先判断过期处理策略,对于FIRE_ONCE_NOW
策略,会调用JobTriggerPoolHelper.trigger(...)
方法,否则就什么也不做,之前就调用refreshNextValidTime(...)
设置该任务下一次执行时间。
在xxl-job
中,调度过期策略有两种:
- FIRE_ONCE_NOW:立即执行一次
- DO_NOTHING:什么也不做,默认策略
该策略可以在任务编辑界面设置:
代码中出现的JobTriggerPoolHelper.trigger(...)
方法,就是任务的触发方法,也是任务调度的另一组件——触发器。关于它的内容我们在下一篇文章中再详细展开,这里知道它是负责将任务提交到executor
的组件就行了。
代码的最后调用了refreshNextValidTime(...)
方法,我们来看看这个方法做了什么:
private void refreshNextValidTime(XxlJobInfo jobInfo, Date fromTime) throws Exception {
// 下一次执行的时间
Date nextValidTime = generateNextValidTime(jobInfo, fromTime);
if (nextValidTime != null) {
jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());
jobInfo.setTriggerNextTime(nextValidTime.getTime());
} else {
jobInfo.setTriggerStatus(0);
jobInfo.setTriggerLastTime(0);
jobInfo.setTriggerNextTime(0);
logger.warn(">>>>>>>>>>> xxl-job, refreshNextValidTime fail for job: jobId={}, "
+ "scheduleType={}, scheduleConf={}",
jobInfo.getId(), jobInfo.getScheduleType(), jobInfo.getScheduleConf());
}
}
从代码来看,以上代码就是得到任务下一次的执行时间,然后进行一个赋值操作,generateNextValidTime(jobInfo, fromTime)
就是计算下一次执行时间的。注意到该方法仅是做了一个赋值,真正更新到数据库的操作还在接下来的代码中。
2. 过期5s内的任务
继续看来看过期时间在5s内的任务,代码如下:
else if (nowTime > jobInfo.getTriggerNextTime()) {
// 处理立即执行的任务:过了执行时间,但执行时间与当前时间相差不足5s,立即执行1次
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON,
-1, null, null, null);
logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = "
+ jobInfo.getId() );
refreshNextValidTime(jobInfo, new Date());
// 执行完之后,还需要再次执行,加入时间轮
if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS
> jobInfo.getTriggerNextTime()) {
// 执行时间的秒数,比如任务的执行时间为 11:02:05,这里得到的就是秒数 5
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
pushTimeRing(ringSecond, jobInfo.getId());
refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
}
}
对于过期时间在5s内的任务,xxl-job
的做法是立即触发一次,触发完成后,设置任务的下一次执行时间。处理完这两步操作后,如果这个任务在未来5s内还要再次执行,那要如何呢?这里分了3步:
- 计算执行时间的秒数,也就是
jobInfo.getTriggerNextTime()/1000)%60
,比如执行时间是2022-05-12 12:00:03
,这样计算得到的秒数为3
- 根据秒数,将任务id放到时间轮指定的位置,也就是
pushTimeRing(...)
方法 - 刷新任务下一次的执行时间,也就是
refreshNextValidTime(...)
方法
实际上,在下一小节中,对未到执行时间的任务
的处理,也是按以上3步操作来进行的。注意到以上有个方法pushTimeRing(...)
,关于该方法的操作,在下面的篇幅中再展开。
3. 未到执行时间的任务
最后就是处理未到执行时间的任务了,代码如下:
else {
// 未来5秒内执行的任务,加入时间轮
// 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time
// 1、make ring second
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
// 2、push time ring
pushTimeRing(ringSecond, jobInfo.getId());
// 3、fresh next
refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
}
处理方式与过期5s内的任务
中对未来要执行的任务处理一模一样,这里就不多说了。
7. 时间轮处理
对于需要在未来执行的任务,xxl-job
的做法是调用pushTimeRing(...)
方法,接下来我们就来看看这个方法做了什么,进入 JobScheduleHelper#pushTimeRing
:
private void pushTimeRing(int ringSecond, int jobId){
// push async ring
List<Integer> ringItemData = ringData.get(ringSecond);
if (ringItemData == null) {
ringItemData = new ArrayList<Integer>();
ringData.put(ringSecond, ringItemData);
}
ringItemData.add(jobId);
logger.debug(">>>>>>>>>>> xxl-job, schedule push time-ring : "
+ ringSecond + " = " + Arrays.asList(ringItemData) );
}
对于时间轮,结构如下:
就一个秒盘一样,拥有 0-59
60个索引位,每个索引位都是一个List<Integer>
结构,存放的是未来要执行任务的jobId
。在调用pushTimeRing(...)
前,会先通过jobInfo.getTriggerNextTime()/1000)%60
计算执行时间的秒数,这个秒数就是时间轮的索引位。
将任务添加到时间轮后,任务的获取操作就算完成了,那么放入时间轮中的任务又在什么时候拿出来交给触发器呢?这个就是ringThread
线程的任务了,在后面会继续深入。
8. 更新任务下一次执行时间
对于每类任务,在代码的最后,都会调用refreshNextValidTime(...)
方法,前面已经提到过该方法是用来设置任务下一次执行任务的,但并未更新到数据库中。在处理完3类任务后,接下来就是数据库的更新操作了,代码如下:
for (XxlJobInfo jobInfo: scheduleList) {
XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
}
这里才是真正地进行数据库的操作,更新任务下一次执行时间的方法为XxlJobInfoDao#scheduleUpdate
,执行的sql如下:
UPDATE xxl_job_info
SET
trigger_last_time = #{triggerLastTime},
trigger_next_time = #{triggerNextTime},
trigger_status = #{triggerStatus}
WHERE id = #{id}
介绍完调度线程执行的流程后,我们对该线程的执行流程梳理如下:
- 获取锁
- 加载过期的任务以及即将执行的任务
- 过期的任务:过期时间超过5s,按配置的规则处理(立即执行1次、什么也不做)
- 立即执行的任务:过期时间在5s内,立即执行1次;对于在未来5s内还需要执行的任务,放入时间轮,待到时触发
- 即将执行的任务:放入时间轮,待到时触发
- 更新任务下一次执行时间
- 释放锁、休眠操作
ringThread
我们再来看看在JobScheduleHelper#start
中启动的另一个线程:ringThread
。在介绍scheduleThread
线程的流程时,对于未来5s内需要执行的线程,xxl-job
是放了一个名为时间轮
的结构中,而ringThread
所做的工作就是从时间轮
中获取到点任务,将其交给触发器的,直接进入ringThread
的run()
方法:
public void run() {
while (!ringThreadToStop) {
// align second
try {
TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
} catch (InterruptedException e) {
if (!ringThreadToStop) {
logger.error(e.getMessage(), e);
}
}
try {
// second data
List<Integer> ringItemData = new ArrayList<>();
int nowSecond = Calendar.getInstance().get(Calendar.SECOND);
// 这里的i仅取值 0、1,从准确性上讲,应该仅处理i=0刻度的任务就可以了,
// 这里作者为了避免处理耗时太长,跨过刻度,因此向前校验一个刻度,即i=1
for (int i = 0; i < 2; i++) {
// 注意刻度的计算,为了防止出现负数,需要加60再取余,跟循环队列的处理类似
List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
if (tmpData != null) {
ringItemData.addAll(tmpData);
}
}
// ring trigger
logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = "
+ Arrays.asList(ringItemData) );
if (ringItemData.size() > 0) {
// do trigger
// 获取了需要处理的任务,在这里进行触发操作
for (int jobId: ringItemData) {
// do trigger
JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
}
// clear
ringItemData.clear();
}
} catch (Exception e) {
if (!ringThreadToStop) {
logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
}
这个方法的内容还是比较简单的,对照着代码中的注释,基本上能明白其操作流程了,这里我们简单地分析下。
整个方法的主要操作是在while (!ringThreadToStop)
循环中,关于while
循环的套路以及ringThreadToStop
值的更新,前面介绍得已经够多了,这里就不多说了。
代码一开始,就是一段休眠操作,休眠的时间为1000 - System.currentTimeMillis() % 1000
毫秒,这是一个对齐整秒数的操作,在分析scheduleThread
线程的流程时已经详细分析过这种操作,这里就不多分析了。
接下来就是从时间轮中获取jobId
列表的操作,key
为当前时间的秒数,需要注意的是,这里取了两个秒数:(nowSecond+60-0)%60
、(nowSecond+60-1)%60
,本来只取(nowSecond+60-0)%60
对应的jobId列表就可以了,这里作者为了避免处理耗时太长,跨过刻度,因此向前多取了一个刻度。
获取到jobId列表后,接着就是调用JobTriggerPoolHelper.trigger(...)
进行触发操作,到这里任务的调度就算是完成了。
本小节的最后,我们也来总结下该线程的执行流程:
- 从时间轮中获取当前秒数对应的
jobIdList
- 遍历
jobIdList
,对每一个jobId
分别进行触发操作
总结
本文重点分析了xxl-job
的调度器执行流程,调度器由两个线程相互协作完成:
scheduleThread
:定时从数据库中获取待执行的任务,并将这些任务放入时间轮中ringThread
:定时从时间轮中获取将要执行的任务,进行触发操作
详细流程如下:
限于作者个人水平,文中难免有错误之处,欢迎指正!原创不易,商业转载请联系作者获得授权,非商业转载请注明出处。