Note: this series of source-code analyses is based on XxlJob 2.3.0 (Gitee repository: https://gitee.com/funcy/xxl-job.git).
In the previous article we saw that once scheduling completes, JobTriggerPoolHelper.trigger(...) is called to fire the job. This article walks through the xxl-job trigger flow.
JobTriggerPoolHelper#trigger
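The most intuitive place to observe a job trigger is the admin console. Clicking "Execute once" there calls the /jobinfo/trigger endpoint, which is handled by JobInfoController#triggerJob. The code is as follows: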
@RequestMapping("/trigger")
@ResponseBody
public ReturnT<String> triggerJob(int id, String executorParam, String addressList) {
    if (executorParam == null) {
        executorParam = "";
    }
    JobTriggerPoolHelper.trigger(id, TriggerTypeEnum.MANUAL, -1, null,
            executorParam, addressList);
    return ReturnT.SUCCESS;
}
As with the scheduled trigger in the previous article, this ends up calling JobTriggerPoolHelper.trigger(...). Let's look at that method:
public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount,
        String executorShardingParam, String executorParam, String addressList) {
    helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam,
            executorParam, addressList);
}
It simply delegates to JobTriggerPoolHelper#addTrigger, so let's follow it there:
public void addTrigger(final int jobId,
        final TriggerTypeEnum triggerType,
        final int failRetryCount,
        final String executorShardingParam,
        final String executorParam,
        final String addressList) {
    // choose thread pool
    ThreadPoolExecutor triggerPool_ = fastTriggerPool;
    AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
    // more than 10 timeouts: use slowTriggerPool
    if (jobTimeoutCount != null && jobTimeoutCount.get() > 10) {
        triggerPool_ = slowTriggerPool;
    }
    // trigger
    triggerPool_.execute(new Runnable() {
        @Override
        public void run() {
            long start = System.currentTimeMillis();
            try {
                // do trigger
                XxlJobTrigger.trigger(jobId, triggerType, failRetryCount,
                        executorShardingParam, executorParam, addressList);
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            } finally {
                // check timeout-count-map
                // make sure the counts only cover the current minute
                long minTim_now = System.currentTimeMillis() / 60000;
                if (minTim != minTim_now) {
                    minTim = minTim_now;
                    jobTimeoutCountMap.clear();
                }
                // incr timeout-count-map
                // the timeout threshold is 500 ms; a trigger that takes longer counts as one timeout
                long cost = System.currentTimeMillis() - start;
                if (cost > 500) { // job-timeout threshold 500ms
                    AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(
                            jobId, new AtomicInteger(1));
                    if (timeoutCount != null) {
                        timeoutCount.incrementAndGet();
                    }
                }
            }
        }
    });
}
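The method is fairly simple, and the key parts are commented. It starts by choosing a trigger pool. The trigger helper keeps two thread pools:
- fastTriggerPool: used by default
- slowTriggerPool: used once a job has exceeded the 500 ms trigger timeout threshold more than 10 times within the current minute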
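The pool choice and the per-minute timeout counter can be isolated into a small, self-contained sketch. The class name TriggerPoolSelector, the string return values, and the explicit nowMs parameter are illustrative assumptions and not part of xxl-job; the real code returns a ThreadPoolExecutor and reads the system clock directly.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper that mirrors the fast/slow pool decision; not part of xxl-job. */
final class TriggerPoolSelector {
    static final long TIMEOUT_THRESHOLD_MS = 500; // a slower trigger counts as one timeout
    static final int SLOW_POOL_LIMIT = 10;        // more timeouts than this -> slow pool

    private final ConcurrentMap<Integer, AtomicInteger> timeoutCounts = new ConcurrentHashMap<>();
    private volatile long currentMinute = System.currentTimeMillis() / 60000;

    /** Returns "slow" or "fast" as a stand-in for the two thread pools. */
    String choose(int jobId) {
        AtomicInteger count = timeoutCounts.get(jobId);
        return (count != null && count.get() > SLOW_POOL_LIMIT) ? "slow" : "fast";
    }

    /** Records the cost of one trigger; all counters are cleared when the minute changes. */
    void record(int jobId, long costMs, long nowMs) {
        long minute = nowMs / 60000;
        if (minute != currentMinute) {
            currentMinute = minute;
            timeoutCounts.clear();
        }
        if (costMs > TIMEOUT_THRESHOLD_MS) {
            AtomicInteger previous = timeoutCounts.putIfAbsent(jobId, new AtomicInteger(1));
            if (previous != null) {
                previous.incrementAndGet();
            }
        }
    }

    public static void main(String[] args) {
        TriggerPoolSelector selector = new TriggerPoolSelector();
        long now = System.currentTimeMillis();
        for (int i = 0; i < 11; i++) {
            selector.record(7, 600, now); // 11 slow triggers for job 7
        }
        System.out.println(selector.choose(7)); // prints "slow"
        System.out.println(selector.choose(8)); // prints "fast"
    }
}
```

Because the counters are cleared whenever the minute changes, a job that was demoted to the slow pool returns to the fast pool once it stops timing out.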
Once the pool has been chosen for the job, the trigger itself is submitted to it:
// trigger
triggerPool_.execute(new Runnable() {
    @Override
    public void run() {
        try {
            // perform the trigger
            XxlJobTrigger.trigger(jobId, triggerType, failRetryCount,
                    executorShardingParam, executorParam, addressList);
        } catch (Exception e) {
            // much of the code is omitted here
            ...
        }
    }
});
The trigger runs on a pool thread and is handled by XxlJobTrigger.trigger(...).
XxlJobTrigger.trigger
Moving on to XxlJobTrigger.trigger(...), the code is as follows:
public static void trigger(int jobId,
        TriggerTypeEnum triggerType,
        int failRetryCount,
        String executorShardingParam,
        String executorParam,
        String addressList) {
    // 1. load the job data
    XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig()
            .getXxlJobInfoDao().loadById(jobId);
    if (jobInfo == null) {
        logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);
        return;
    }
    if (executorParam != null) {
        jobInfo.setExecutorParam(executorParam);
    }
    // 2. resolve the fail-retry count
    int finalFailRetryCount = failRetryCount >= 0 ? failRetryCount
            : jobInfo.getExecutorFailRetryCount();
    XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao()
            .load(jobInfo.getJobGroup());
    // 3. check whether target machines were specified
    if (addressList != null && addressList.trim().length() > 0) {
        group.setAddressType(1);
        group.setAddressList(addressList.trim());
    }
    // 4. sharding parameters
    int[] shardingParam = null;
    if (executorShardingParam != null) {
        String[] shardingArr = executorShardingParam.split("/");
        if (shardingArr.length == 2 && isNumeric(shardingArr[0])
                && isNumeric(shardingArr[1])) {
            shardingParam = new int[2];
            shardingParam[0] = Integer.valueOf(shardingArr[0]);
            shardingParam[1] = Integer.valueOf(shardingArr[1]);
        }
    }
    // 5. execute: broadcast mode versus the other modes
    if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == ExecutorRouteStrategyEnum.match(
                jobInfo.getExecutorRouteStrategy(), null)
            && group.getRegistryList() != null && !group.getRegistryList().isEmpty()
            && shardingParam == null) {
        // broadcast to each machine in turn
        for (int i = 0; i < group.getRegistryList().size(); i++) {
            processTrigger(group, jobInfo, finalFailRetryCount, triggerType,
                    i, group.getRegistryList().size());
        }
    } else {
        if (shardingParam == null) {
            shardingParam = new int[]{0, 1};
        }
        processTrigger(group, jobInfo, finalFailRetryCount, triggerType,
                shardingParam[0], shardingParam[1]);
    }
}
Let's go through the method step by step.
1. Load the job data
The method only receives a jobId, so it first has to load the job's full details:
XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig()
        .getXxlJobInfoDao().loadById(jobId);
This is just a simple SQL query:
SELECT <include refid="Base_Column_List" />
FROM xxl_job_info AS t
WHERE t.id = #{id}
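2. Resolve the fail-retry count
The fail-retry count is the maximum number of times a job is retried after a failed execution. A non-negative failRetryCount passed by the caller takes precedence; otherwise (for example, the -1 passed by triggerJob) the value comes from the job's configuration.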
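The precedence rule can be reduced to a single expression. The sketch below restates it in isolation; the class and method names are illustrative assumptions, not xxl-job APIs.

```java
/** Hypothetical restatement of the retry-count rule used in XxlJobTrigger.trigger. */
final class RetryCountSketch {
    /** A non-negative requested value wins; a negative one falls back to the job's configuration. */
    static int resolve(int requested, int configured) {
        return requested >= 0 ? requested : configured;
    }

    public static void main(String[] args) {
        System.out.println(resolve(-1, 3)); // prints 3: the -1 from a manual trigger defers to the job config
        System.out.println(resolve(0, 3));  // prints 0: an explicit value is used as-is
    }
}
```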
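3. Check whether target machines were specified
This parameter is supplied when a job is executed once: after clicking "Execute once", the dialog that appears lets you enter executor addresses. When addressList is non-empty, it replaces the group's address list and the group's address type is set to 1.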
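4. Sharding parameters
There are two sharding parameters:
- index: the index of the executor
- total: the total number of executors
They arrive as a single "index/total" string and identify which executor to use.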
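As a rough illustration of how an "index/total" string is turned into the two integers, here is a standalone sketch. ShardingParamParser and its isNumeric helper are stand-ins written for this example and only approximate the logic shown in XxlJobTrigger.trigger.

```java
/** Hypothetical parser for the "index/total" sharding string; not part of xxl-job. */
final class ShardingParamParser {
    /** Returns {index, total}, or null when the text is not of the form "index/total". */
    static int[] parse(String text) {
        if (text == null) {
            return null;
        }
        String[] parts = text.split("/");
        if (parts.length == 2 && isNumeric(parts[0]) && isNumeric(parts[1])) {
            return new int[] { Integer.parseInt(parts[0]), Integer.parseInt(parts[1]) };
        }
        return null;
    }

    /** Stand-in numeric check: true when the text parses as an int. */
    private static boolean isNumeric(String text) {
        try {
            Integer.parseInt(text);
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        int[] parsed = parse("1/3");
        System.out.println(parsed[0] + " of " + parsed[1]); // prints "1 of 3"
        System.out.println(parse("oops") == null);          // prints "true"
    }
}
```

A null result corresponds to the case where trigger(...) either broadcasts to every executor or falls back to the default {0, 1}.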
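5. Execute: broadcast mode versus the other modes
This step calls XxlJobTrigger#processTrigger to do the actual work. One concept needs introducing first: the routing strategy. When there are multiple executors, the routing strategy decides which executor runs the job; it can be set on the job edit page.
Routing strategies fall into two broad groups:
- Sharding broadcast: the execution request is sent to every executor, so the job runs on each of them
- Other strategies: every strategy other than sharding broadcast selects a single executor. The selection logic is implemented by subclasses of ExecutorRouter; the individual strategies are covered in detail later.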
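To make the difference between the two groups concrete, the sketch below computes which (index, total) pairs would be handed to processTrigger in each case. The class TriggerDispatchSketch and its plan method are hypothetical and only model the branching in XxlJobTrigger.trigger; no request is actually sent.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of the broadcast / single-executor branching; not part of xxl-job. */
final class TriggerDispatchSketch {
    /** Returns the (index, total) pairs that would each result in one processTrigger call. */
    static List<int[]> plan(boolean shardingBroadcast, List<String> registryList, int[] shardingParam) {
        List<int[]> calls = new ArrayList<>();
        boolean hasExecutors = registryList != null && !registryList.isEmpty();
        if (shardingBroadcast && hasExecutors && shardingParam == null) {
            // broadcast: one call per registered executor
            for (int i = 0; i < registryList.size(); i++) {
                calls.add(new int[] { i, registryList.size() });
            }
        } else {
            // every other case: a single call, defaulting to shard 0 of 1
            calls.add(shardingParam != null ? shardingParam : new int[] { 0, 1 });
        }
        return calls;
    }

    public static void main(String[] args) {
        List<String> executors = Arrays.asList("http://a:9999", "http://b:9999", "http://c:9999");
        System.out.println(plan(true, executors, null).size());  // prints 3
        System.out.println(plan(false, executors, null).size()); // prints 1
    }
}
```

Note that an explicit sharding parameter suppresses the fan-out even under the broadcast strategy, so only the named shard is triggered.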
XxlJobTrigger#processTrigger
XxlJobTrigger.trigger(...) calls processTrigger(...) to handle the trigger, so let's step into it:
private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo,
        int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total) {
    // 1. build the sharding parameter, depending on whether this is broadcast mode
    ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(
            jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION);
    ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(
            jobInfo.getExecutorRouteStrategy(), null);
    String shardingParam =
            (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum)
            ? String.valueOf(index).concat("/").concat(String.valueOf(total)) : null;
    // 2. save the execution log
    XxlJobLog jobLog = new XxlJobLog();
    jobLog.setJobGroup(jobInfo.getJobGroup());
    jobLog.setJobId(jobInfo.getId());
    jobLog.setTriggerTime(new Date());
    XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
    logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());
    // 3. initialize the trigger parameters
    TriggerParam triggerParam = new TriggerParam();
    triggerParam.setJobId(jobInfo.getId());
    triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
    triggerParam.setExecutorParams(jobInfo.getExecutorParam());
    triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
    triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
    triggerParam.setLogId(jobLog.getId());
    triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());
    triggerParam.setGlueType(jobInfo.getGlueType());
    triggerParam.setGlueSource(jobInfo.getGlueSource());
    triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
    triggerParam.setBroadcastIndex(index);
    triggerParam.setBroadcastTotal(total);
    // 4. resolve the executor address according to the routing strategy
    String address = null;
    ReturnT<String> routeAddressResult = null;
    if (group.getRegistryList() != null && !group.getRegistryList().isEmpty()) {
        // index and total only matter in broadcast mode
        if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
            if (index < group.getRegistryList().size()) {
                address = group.getRegistryList().get(index);
            } else {
                address = group.getRegistryList().get(0);
            }
        } else {
            // the routing strategy is only used to obtain the executor address
            routeAddressResult = executorRouteStrategyEnum.getRouter().route(
                    triggerParam, group.getRegistryList());
            if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
                address = routeAddressResult.getContent();
            }
        }
    } else {
        routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE,
                I18nUtil.getString("jobconf_trigger_address_empty"));
    }
    // 5. trigger
    ReturnT<String> triggerResult = null;
    if (address != null) {
        // run on the executor
        triggerResult = runExecutor(triggerParam, address);
    } else {
        triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
    }
    // 6. build the trigger message
    StringBuffer triggerMsgSb = new StringBuffer();
    // construction of the trigger message is omitted
    ...
    // 7. update the trigger log
    jobLog.setExecutorAddress(address);
    jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
    jobLog.setExecutorParam(jobInfo.getExecutorParam());
    jobLog.setExecutorShardingParam(shardingParam);
    jobLog.setExecutorFailRetryCount(finalFailRetryCount);
    //jobLog.setTriggerTime();
    jobLog.setTriggerCode(triggerResult.getCode());
    jobLog.setTriggerMsg(triggerMsgSb.toString());
    XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);
    logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
}
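The method is on the long side, but the key parts are commented and it is not hard to follow with the comments as a guide. In summary, it does the following:
- Builds the sharding parameter, depending on whether this is broadcast mode
- Saves the execution log
- Initializes the trigger parameters
- Resolves the executor address according to the routing strategy
- Performs the trigger
- Builds the trigger message
- Updates the trigger log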
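One point worth noting is how the executor is chosen. In broadcast mode the index argument selects the address directly from the registry list (falling back to the first address when index is out of range); otherwise the routing strategy configured in the UI makes the choice, as mentioned in the analysis of XxlJobTrigger.trigger.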
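The address choice can be sketched independently of the rest of processTrigger. In this hypothetical example a java.util.function.Function stands in for the ExecutorRouter implementation, and AddressResolverSketch is an illustrative name rather than an xxl-job class.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/** Hypothetical model of the address selection in processTrigger; not part of xxl-job. */
final class AddressResolverSketch {
    /**
     * Returns the executor address to call, or null when no executor is registered.
     * The router function is a stand-in for whichever routing strategy is configured.
     */
    static String resolve(boolean shardingBroadcast, int index, List<String> registryList,
                          Function<List<String>, String> router) {
        if (registryList == null || registryList.isEmpty()) {
            return null;
        }
        if (shardingBroadcast) {
            // broadcast: index picks the address, with a fallback to the first entry
            return index < registryList.size() ? registryList.get(index) : registryList.get(0);
        }
        return router.apply(registryList);
    }

    public static void main(String[] args) {
        List<String> executors = Arrays.asList("http://a:9999", "http://b:9999");
        Function<List<String>, String> lastRouter = list -> list.get(list.size() - 1);
        System.out.println(resolve(true, 1, executors, lastRouter));  // prints "http://b:9999"
        System.out.println(resolve(true, 5, executors, lastRouter));  // prints "http://a:9999"
        System.out.println(resolve(false, 0, executors, lastRouter)); // prints "http://b:9999"
    }
}
```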
This method calls runExecutor(...) to continue the trigger, so we need to go one level deeper.
XxlJobTrigger#runExecutor
XxlJobTrigger#processTrigger calls runExecutor(...) to perform the trigger. Stepping into that method:
public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address) {
    ReturnT<String> runResult = null;
    try {
        // send the run request to the executor
        ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
        runResult = executorBiz.run(triggerParam);
    } catch (Exception e) {
        logger.error(">>>>>>>>>>> xxl-job trigger error, please check if "
                + "the executor[{}] is running.", address, e);
        runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
    }
    // assemble the result message
    StringBuffer runResultSB = new StringBuffer(
            I18nUtil.getString("jobconf_trigger_run") + ":");
    runResultSB.append("<br>address:").append(address);
    runResultSB.append("<br>code:").append(runResult.getCode());
    runResultSB.append("<br>msg:").append(runResult.getMsg());
    runResult.setMsg(runResultSB.toString());
    return runResult;
}
This method calls ExecutorBiz.run(...). Going one step further:
public ReturnT<String> run(TriggerParam triggerParam) {
    return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout,
            triggerParam, String.class);
}
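Does this method look familiar? In the article "Communication between admin and executor" (《admin与executor通讯》) we covered how the admin talks to the executor, and run() is one of the operations introduced there.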
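For readers who want a feel for what such a call amounts to, the sketch below builds (but does not send) an HTTP POST to addressUrl + "run" using the JDK 11 java.net.http API. It is not how XxlJobRemotingUtil is implemented: the header name XXL-JOB-ACCESS-TOKEN, the JSON content type, the trailing-slash handling, and the hand-written JSON body are all assumptions made for illustration.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

/** Hypothetical request builder for the executor "run" endpoint; not part of xxl-job. */
final class RunRequestSketch {
    static HttpRequest build(String addressUrl, String accessToken, String jsonBody) {
        // assumption: the executor address should end with "/" before "run" is appended
        String base = addressUrl.endsWith("/") ? addressUrl : addressUrl + "/";
        HttpRequest.Builder builder = HttpRequest.newBuilder(URI.create(base + "run"))
                .header("Content-Type", "application/json;charset=UTF-8")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody, StandardCharsets.UTF_8));
        if (accessToken != null && !accessToken.trim().isEmpty()) {
            // assumption: the token travels in a request header with this name
            builder.header("XXL-JOB-ACCESS-TOKEN", accessToken);
        }
        return builder.build();
    }

    public static void main(String[] args) {
        // the body is a hand-written placeholder, not the real TriggerParam serialization
        HttpRequest request = build("http://127.0.0.1:9999", "token", "{\"jobId\":1}");
        System.out.println(request.method() + " " + request.uri());
    }
}
```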
Summary
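That completes the analysis of the trigger flow. Compared with the flows covered in earlier articles, this one is clear and straightforward. The method call chain is: JobTriggerPoolHelper.trigger → JobTriggerPoolHelper#addTrigger → XxlJobTrigger.trigger → XxlJobTrigger#processTrigger → XxlJobTrigger#runExecutor → ExecutorBiz.run.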
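Given the limits of the author's own knowledge, mistakes are inevitable, and corrections are welcome. Original writing takes effort: please contact the author for authorization before commercial reuse, and credit the source for non-commercial reuse.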