注:本系列源码分析基于XxlJob 2.3.0,gitee仓库链接:https://gitee.com/funcy/xxl-job.git.
本文将分析执行器(executor
)与admin
之间的通讯。
在xxl-job
中,executor
与admin
并不是相互独立工作的,他们之前会通过网络通讯相互协作完成任务的调度流程,比如,executor
启动时,需要把自己的地址信息(ip:端口)信息告知admin
;任务调度时,admin
会把任务信息发送给executor
,在executor
上真正执行任务。这两类通讯中,前者是executor
到admin
的通讯,后者是admin
到executor
的通讯,本文接下来就分析这两类通讯。
executor
到admin
处理类:AdminBiz
admin
对executor
提供的服务定义在com.xxl.job.core.biz.AdminBiz
接口中,内容如下:
public interface AdminBiz {
/**
* callback
*
* @param callbackParamList
* @return
*/
public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList);
/**
* registry
*
* @param registryParam
* @return
*/
public ReturnT<String> registry(RegistryParam registryParam);
/**
* registry remove
*
* @param registryParam
* @return
*/
public ReturnT<String> registryRemove(RegistryParam registryParam);
}
这是一个接口,可以看到它共有3个方法:
callback
:处理任务回调registry
:处理执行器(executor
)的注册,即将executor
注册到admin
registryRemove
:处理执行器(executor
)的下线,即将executor
从admin
中删除
AdminBiz
有2个实现类:
-
com.xxl.job.core.biz.client.AdminBizClient
:从名称上看,它是一个客户端类,位于executor
进程中,内容如下:public class AdminBizClient implements AdminBiz { // 省略属性及构造方法 ... @Override public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) { return XxlJobRemotingUtil.postBody(addressUrl+"api/callback", accessToken, timeout, callbackParamList, String.class); } @Override public ReturnT<String> registry(RegistryParam registryParam) { return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class); } @Override public ReturnT<String> registryRemove(RegistryParam registryParam) { return XxlJobRemotingUtil.postBody(addressUrl + "api/registryRemove", accessToken, timeout, registryParam, String.class); } }
这3个方法最终都调用了
XxlJobRemotingUtil#postBody
方法,而XxlJobRemotingUtil#postBody
方法就是用来发起http请求的,即executor
通过http协议将数据发送到admin
。 -
com.xxl.job.admin.service.impl.AdminBizImpl
:这个类就是具体的业务处理类了,它位于admin
进程中,代码如下:public class AdminBizImpl implements AdminBiz { @Override public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) { return JobCompleteHelper.getInstance().callback(callbackParamList); } @Override public ReturnT<String> registry(RegistryParam registryParam) { return JobRegistryHelper.getInstance().registry(registryParam); } @Override public ReturnT<String> registryRemove(RegistryParam registryParam) { return JobRegistryHelper.getInstance().registryRemove(registryParam); } }
从代码中可以看到,真正干活的是
JobCompleteHelper
与JobRegistryHelper
类,这两个类的start()
方法在介绍admin启动流程有介绍过,当时介绍的是启动,这里就是真正的使用了。关于这些方法的具体细节本文就不展开了,等到分析到具体功能时再详细阐述。
admin
请求入口
到了这里,我们就明白了executor
是通过http请求将数据发送到admin
的,那么admin
的请求入口又是在哪里呢?xxl-job-admin
是一个springboot项目,请求入口是通过springmvc
实现的,具体的的controller
为com.xxl.job.admin.controller.JobApiController
:
@Controller
@RequestMapping("/api")
public class JobApiController {
@Resource
private AdminBiz adminBiz;
/**
* api
*
* @param uri
* @param data
* @return
*/
@RequestMapping("/{uri}")
@ResponseBody
@PermissionLimit(limit=false)
public ReturnT<String> api(HttpServletRequest request, @PathVariable("uri") String uri,
@RequestBody(required = false) String data) {
// 省略校验代码
...
// 服务映射
if ("callback".equals(uri)) {
List<HandleCallbackParam> callbackParamList = GsonTool.fromJson(data,
List.class, HandleCallbackParam.class);
return adminBiz.callback(callbackParamList);
} else if ("registry".equals(uri)) {
RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
return adminBiz.registry(registryParam);
} else if ("registryRemove".equals(uri)) {
RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
return adminBiz.registryRemove(registryParam);
} else {
return new ReturnT<String>(ReturnT.FAIL_CODE,
"invalid request, uri-mapping("+ uri +") not found.");
}
}
}
它对外开放的入口是/api/{url}
,在JobApiController#api
方法中处理具体的请求路径,这些路径包括:
/api/callback
:处理方法是AdminBizImpl#callback
/api/registry
:处理方法是AdminBizImpl#registry
/api/registryRemove
:处理方法是AdminBizImpl#registryRemove
对于这3个方法的作用,xxl-job
文档已经说明得很清楚了:
想要了解的小伙伴可自动参考。
小结
最后以一幅图来总结下executor
与admin
之间的通讯流程:
admin
到executor
的通讯
处理类:ExecutorBiz
executor
对admin
提供的服务定义在com.xxl.job.core.biz.ExecutorBiz
接口中,内容如下:
public interface ExecutorBiz {
/**
* beat
* @return
*/
public ReturnT<String> beat();
/**
* idle beat
*
* @param idleBeatParam
* @return
*/
public ReturnT<String> idleBeat(IdleBeatParam idleBeatParam);
/**
* run
* @param triggerParam
* @return
*/
public ReturnT<String> run(TriggerParam triggerParam);
/**
* kill
* @param killParam
* @return
*/
public ReturnT<String> kill(KillParam killParam);
/**
* log
* @param logParam
* @return
*/
public ReturnT<LogResult> log(LogParam logParam);
}
对该接口内的方法说明如下:
beat()
:存活检测,该方法仅是返回了一个SUCCESS
,admin
可通过该方法的返回值判断executor
是否存活idleBeat()
:空闲检测,admin
可通过该方法来判断executor
是否处于空闲中run()
:任务执行方法,执行具体的任务kill()
:杀死正在执行中任务log()
:获取任务执行日志
ExecutorBiz
接口也有两个实现类:
-
com.xxl.job.core.biz.client.ExecutorBizClient
:请求发起类,位于admin
进程中,代码如下:public class ExecutorBizClient implements ExecutorBiz { // 省略构造方法及属性 ... @Override public ReturnT<String> beat() { return XxlJobRemotingUtil.postBody(addressUrl+"beat", accessToken, timeout, "", String.class); } @Override public ReturnT<String> idleBeat(IdleBeatParam idleBeatParam){ return XxlJobRemotingUtil.postBody(addressUrl+"idleBeat", accessToken, timeout, idleBeatParam, String.class); } @Override public ReturnT<String> run(TriggerParam triggerParam) { return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class); } @Override public ReturnT<String> kill(KillParam killParam) { return XxlJobRemotingUtil.postBody(addressUrl + "kill", accessToken, timeout, killParam, String.class); } @Override public ReturnT<LogResult> log(LogParam logParam) { return XxlJobRemotingUtil.postBody(addressUrl + "log", accessToken, timeout, logParam, LogResult.class); } }
同
AdminBizClient
一样,ExecutorBizClient
也是通过http协议请求executor
接口的,XxlJobRemotingUtil.postBody(...)
中出现的addressUrl
就是executor
的地址。 -
com.xxl.job.core.biz.impl.ExecutorBizImpl
:业务处理类,位于executor
进程中,在这个类中才真正处理executor
的各种操作。关于该类的具体内容,本文就不展开了,后面等到分析具体功能才详细阐述。
executor
请求入口
admin
通过springmvc
对executor
开放了请求入口,那executor
是不是同样以springmvc
对admin
开放入口呢?
在上一篇介绍《xxl-job执行器启动流程》中,我们知道在XxlJobExecutor#initEmbedServer
方法中启动了一个netty
服务,而这个netty
服务正是用来提供对admin
的请求入口的!
关于netty
的启动流程及相关知识的介绍并非本文重点,我们直接进入请求处理方法EmbedHttpServerHandler#channelRead0
:
@Override
protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg)
throws Exception {
// http属性的处理,如请求参数、uri、请求方法、accessToken等
String requestData = msg.content().toString(CharsetUtil.UTF_8);
String uri = msg.uri();
HttpMethod httpMethod = msg.method();
boolean keepAlive = HttpUtil.isKeepAlive(msg);
String accessTokenReq = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN);
// 在业务线程池中处理请求
bizThreadPool.execute(new Runnable() {
@Override
public void run() {
// 在这里处理请求
Object responseObj = process(httpMethod, uri, requestData, accessTokenReq);
// to json
String responseJson = GsonTool.toJson(responseObj);
// write response
writeResponse(ctx, keepAlive, responseJson);
}
});
}
继续,进入EmbedHttpServerHandler#process
方法:
private Object process(HttpMethod httpMethod, String uri, String requestData,
String accessTokenReq) {
// 省略参数校验
...
// 为每个uri分配特定的处理方法
try {
if ("/beat".equals(uri)) {
return executorBiz.beat();
} else if ("/idleBeat".equals(uri)) {
IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData,
IdleBeatParam.class);
return executorBiz.idleBeat(idleBeatParam);
} else if ("/run".equals(uri)) {
TriggerParam triggerParam = GsonTool.fromJson(requestData,
TriggerParam.class);
return executorBiz.run(triggerParam);
} else if ("/kill".equals(uri)) {
KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
return executorBiz.kill(killParam);
} else if ("/log".equals(uri)) {
LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
return executorBiz.log(logParam);
} else {
return new ReturnT<String>(ReturnT.FAIL_CODE,
"invalid request, uri-mapping("+ uri +") not found.");
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
return new ReturnT<String>(ReturnT.FAIL_CODE,
"request error:" + ThrowableUtil.toString(e));
}
}
可以看到,最近处理请求uri
的方法就是EmbedHttpServerHandler#process
了,而它最终调用的也是ExecutorBizImpl
的方法。
关于 executor
对外接口,xxl-job
也贴心地为我们提供了官方文档:
这里留个思考题:admin
与executor
之间的通讯,同样提供http请求入口,为何admin
使用的是springmvc
,而executor
使用的却是netty
呢?executor
能不能也使用springmvc
呢?
小结
最后以一幅图来总结下admin
与executor
之间的通讯流程:
限于作者个人水平,文中难免有错误之处,欢迎指正!原创不易,商业转载请联系作者获得授权,非商业转载请注明出处。