注:本系列源码分析基于XxlJob 2.3.0,gitee仓库链接:https://gitee.com/funcy/xxl-job.git.
本文来分析执行器注册到admin
的流程,也就是EmbedServer#startRegistry
方法。
有了前面两篇文章的铺垫,我们终于可以分析executor
的注册流程了,注册流程在executor
启动流程中的位置如下:
我们直接进入ExecutorRegistryThread#start
的方法:
public void start(final String appname, final String address){
// 判断参数的合法性
if (appname==null || appname.trim().length()==0) {
logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, "
+ "appname is null.");
return;
}
// 如果没有admin服务,就不注册了
if (XxlJobExecutor.getAdminBizList() == null) {
logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, "
+ "adminAddresses is null.");
return;
}
registryThread = new Thread(new Runnable() {
@Override
public void run() {
...
}
});
registryThread.setDaemon(true);
registryThread.setName("xxl-job, executor ExecutorRegistryThread");
registryThread.start();
}
这个方法主要的功能就是创建并启动了一个名为registryThread
的线程,线程中执行的操作就是executor
的注册操作,其run()
方法如下:
run()
方法的功能还是挺简单的,主要做了两件事:
- 执行注册操作
- 执行摘除操作
接下来我们就来具体分析这两个操作。
执行注册操作
run()
方法一开始,就是一段while
循环:
while
循环里,先是执行了注册操作,然后就是休眠30s,由于是在while
循环中,休眠结束后,又会再次执行注册操作,反复进行下去。那么它会在何时停止注册呢?
从while
的条件来看,就是toStop
为true
时注册操作才停止,而后往下执行executor
摘除操作,改变toStop
的代码如下:
public void toStop() {
// 改变 toStop 的值
toStop = true;
// 如果执行该方法时,线程正好处于休眠中,调用 interrupt() 打断休眠
if (registryThread != null) {
registryThread.interrupt();
try {
registryThread.join();
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
}
}
这个方法是不是看着很眼熟,在前面介绍admin
启动流程时,已经介绍过xxl-job
中线程关闭的套路,这里也是同样的配方。
再进一步探索toStop()
的调用关系:
发现最终来自于XxlJobSpringExecutor#destroy
方法:
public class XxlJobSpringExecutor extends XxlJobExecutor
implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean {
@Override
public void destroy() {
super.destroy();
}
...
}
关于XxlJobSpringExecutor#destroy
方法,在分析执行器启动流程时,他是由DisposableBean
接口提供,在当前spring bean
(也就是XxlJobSpringExecutor
)销毁时执行,这也可以说,在项目停止时toStop()
方法会调用到。
接着我们来看看注册操作,代码如下:
RegistryParam registryParam = new RegistryParam(
RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
try {
// 注册
ReturnT<String> registryResult = adminBiz.registry(registryParam);
if (registryResult!=null && ReturnT.SUCCESS_CODE
== registryResult.getCode()) {
registryResult = ReturnT.SUCCESS;
logger.debug(">>>>>>>>>>> xxl-job registry success, "
+ "registryParam:{}, registryResult:{}",
new Object[]{registryParam, registryResult});
// 其中之一注册成功即可,成功就break
break;
} else {
logger.info(">>>>>>>>>>> xxl-job registry fail, "
+ "registryParam:{}, registryResult:{}",
new Object[]{registryParam, registryResult});
}
} catch (Exception e) {
logger.info(">>>>>>>>>>> xxl-job registry error, "
+ "registryParam:{}", registryParam, e);
}
}
这块代码还是很清晰的,先是遍历XxlJobExecutor.getAdminBizList()
,逐一进行注册,只要其中之一注册成功了,之后的adminBiz
就不会再注册了,也就是每次注册时,只要有一个adminBiz
注册成功就表示该executor
注册成功了。
执行注册的方法为adminBiz.registry(...)
,也就是AdminBizClient#registry
,它的代码如下:
@Override
public ReturnT<String> registry(RegistryParam registryParam) {
return XxlJobRemotingUtil.postBody(addressUrl + "api/registry",
accessToken, timeout, registryParam, String.class);
}
这是一个http请求,请求到admin
来处理。
上一篇文章中,我们已经详细分析了executor
与admin
之间的通讯方式,这里我们直接来看admin
的处理方法,进入AdminBizImpl#registry
方法:
@Override
public ReturnT<String> registry(RegistryParam registryParam) {
return JobRegistryHelper.getInstance().registry(registryParam);
}
继续查看JobRegistryHelper#registry
方法:
public ReturnT<String> registry(RegistryParam registryParam) {
// valid
if (!StringUtils.hasText(registryParam.getRegistryGroup())
|| !StringUtils.hasText(registryParam.getRegistryKey())
|| !StringUtils.hasText(registryParam.getRegistryValue())) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");
}
// async execute
registryOrRemoveThreadPool.execute(new Runnable() {
@Override
public void run() {
int ret = XxlJobAdminConfig.getAdminConfig()
.getXxlJobRegistryDao()
// 更新执行器
.registryUpdate(registryParam.getRegistryGroup(),
registryParam.getRegistryKey(),
registryParam.getRegistryValue(), new Date());
if (ret < 1) {
XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao()
// 更新失败了,就注册执行器
.registrySave(registryParam.getRegistryGroup(),
registryParam.getRegistryKey(),
registryParam.getRegistryValue(), new Date());
// 这里是个空方法,没执行任何内容
freshGroupRegistryInfo(registryParam);
}
}
});
return ReturnT.SUCCESS;
}
在《admin
启动流程》一文中,我们在分析JobRegistryHelper#start
方法时,介绍到该方法创建了两个线程池:
registryOrRemoveThreadPool
:注册或摘除executor
registryMonitorThread
:executor
有效监测,及时去除挂了的executor
注册操作就是在registryOrRemoveThreadPool
中处理的,注册操作主要是进行两个:
-
registryUpdate(...)
:更新executor
注册信息,sql 如下:UPDATE xxl_job_registry SET `update_time` = #{updateTime} WHERE `registry_group` = #{registryGroup} AND `registry_key` = #{registryKey} AND `registry_value` = #{registryValue}
-
registrySave(...)
:保存executor
注册信息INSERT INTO xxl_job_registry( `registry_group` , `registry_key` , `registry_value`, `update_time`) VALUES( #{registryGroup} , #{registryKey} , #{registryValue}, #{updateTime})
executor
注册成功后,xxl_job_registry
表中就有最新的executor
的注册记录了。
对于xxl_job_registry
表中的记录,JobRegistryHelper
中的 registryMonitorThread
线程会持续监测executor
的注册信息,及时摘除长时未更新注册时间的executor
,这一点在《admin
启动流程》一文中已分析过了,这里就不展开了。
执行摘除操作
executor
停止时,ExecutorRegistryThread#toStop
方法会执行,进而registryThread#run
开始executor
的摘除操作,相关代码如下:
public void run() {
while (!toStop) {
// 省略注册操作
...
}
// 跳出上面的 while 循环(即toStop=true,也就是服务停止)时才执行下面的代码
// registry remove
try {
RegistryParam registryParam = new RegistryParam(
RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
try {
// 执行删除操作
ReturnT<String> registryResult = adminBiz
.registryRemove(registryParam);
if (registryResult!=null && ReturnT.SUCCESS_CODE
== registryResult.getCode()) {
registryResult = ReturnT.SUCCESS;
logger.info(">>>>>>>>>>> xxl-job registry-remove success, "
+ "registryParam:{}, registryResult:{}",
new Object[]{registryParam, registryResult});
// 其中之一注册成功即可,成功就break
break;
} else {
logger.info(">>>>>>>>>>> xxl-job registry-remove fail, "
+ "registryParam:{}, registryResult:{}",
new Object[]{registryParam, registryResult});
}
} catch (Exception e) {
if (!toStop) {
logger.info(">>>>>>>>>>> xxl-job registry-remove error, "
+ "registryParam:{}", registryParam, e);
}
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
logger.info(">>>>>>>>>>> xxl-job, executor registry thread destory.");
}
同注册操作一样,执行操作也是先遍历 XxlJobExecutor.getAdminBizList()
,逐一进行摘除操作。同样地,只要有一个adminBiz.registryRemove(...)
上执行成功了,就表示该executor
摘除成功了,对后续的adminBiz
就不再执行摘除操作了。
执行摘除操作的方法是adminBiz.registryRemove
,也就是AdminBizClient#registryRemove
,代码如下:
public ReturnT<String> registryRemove(RegistryParam registryParam) {
return XxlJobRemotingUtil.postBody(addressUrl + "api/registryRemove",
accessToken, timeout, registryParam, String.class);
}
这个方法中只要是进行一个http请求,将数据发往admin
,然后由admin
处理摘除操作。对于executor
与admin
之间的通讯,在前面的文章已经分析过了,这里直接来看看admin
是如何处理executor
摘除的,进入JobRegistryHelper#registryRemove
方法:
public ReturnT<String> registryRemove(RegistryParam registryParam) {
// valid
if (!StringUtils.hasText(registryParam.getRegistryGroup())
|| !StringUtils.hasText(registryParam.getRegistryKey())
|| !StringUtils.hasText(registryParam.getRegistryValue())) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");
}
// async execute
registryOrRemoveThreadPool.execute(new Runnable() {
@Override
public void run() {
int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao()
// 删除执行器
.registryDelete(registryParam.getRegistryGroup(),
registryParam.getRegistryKey(),
registryParam.getRegistryValue());
if (ret > 0) {
// 这里是个空方法,没执行任何内容
freshGroupRegistryInfo(registryParam);
}
}
});
return ReturnT.SUCCESS;
}
以上方法最关键的就是XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryDelete
方法了,摘除服务器的操作就是在这里进行的,这个方法执行的的sql如下:
DELETE FROM xxl_job_registry
WHERE registry_group = #{registryGroup}
AND registry_key = #{registryKey}
AND registry_value = #{registryValue}
从这里可以看到,所谓的executor
摘除操作,就是把executor
注册信息从xxl_job_registry
表中删除,这样任务就不再调度到这个executor
了。
关于执行器注册流程的介绍就到这里了,下一篇继续探索xxl-job
其他流程。
限于作者个人水平,文中难免有错误之处,欢迎指正!原创不易,商业转载请联系作者获得授权,非商业转载请注明出处。