注:本系列源码分析基于XxlJob 2.3.0,gitee仓库链接:https://gitee.com/funcy/xxl-job.git.
在前面配置xxl-job
任务时,提到过xxl-job
的路由策略,本文将来分析这些策略。
在任务的编辑页面,可以配置路由策略:
所谓的路由策略,是指在多个executor
存在时,选择哪一个executor
来执行任务的策略。xxl-job
支持的路由策略有如下几种:
- 第一个:选择第一个
executor
- 最后一个:选择最后一个
executor
- 轮询:依次选择
executor
- 随机:随机选择一个
executor
- 一致性hash:使用一致性hash算法来选择
executor
- 最不经常使用:选择最不经常使用的
executor
- 最近最久未使用:选择最久未使用的
executor
- 故障转移:判断当前选择的
executor
是否可用,若不可用,则选择另一个executor
- 忙碌转移:判断当前选择的
executor
是否处于忙碌状态,若处于,则选择另一executor
- 分片广播:每一个
executor
都会执行任务
路由策略的选择
路由策略的选择在XxlJobTrigger#processTrigger
,相关代码如下:
// 获取路由策略枚举类,jobInfo.getExecutorRouteStrategy() 来自于数据库
ExecutorRouteStrategyEnum executorRouteStrategyEnum =
ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);
....
// 路由策略的处理,区分分片广播与其他策略
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
if (index < group.getRegistryList().size()) {
address = group.getRegistryList().get(index);
} else {
address = group.getRegistryList().get(0);
}
} else {
// 执行策略,仅仅只是为了得到执行器的地址
routeAddressResult = executorRouteStrategyEnum.getRouter()
.route(triggerParam, group.getRegistryList());
if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
address = routeAddressResult.getContent();
}
}
在任务的配置时,任务的路由策略会保存在数据库中,在XxlJobTrigger#processTrigger
方法中,会把路由策略拿出来以便获取路由策略的枚举类,jobInfo.getExecutorRouteStrategy()
就是数据库中保存的策略。
路由枚举类ExecutorRouteStrategyEnum
的代码如下:
public enum ExecutorRouteStrategyEnum {
/** 第一个 */
FIRST(I18nUtil.getString("jobconf_route_first"), new ExecutorRouteFirst()),
/** 最后一个 */
LAST(I18nUtil.getString("jobconf_route_last"), new ExecutorRouteLast()),
/** 轮询 */
ROUND(I18nUtil.getString("jobconf_route_round"), new ExecutorRouteRound()),
/** 随机 */
RANDOM(I18nUtil.getString("jobconf_route_random"), new ExecutorRouteRandom()),
/** 一致性hash */
CONSISTENT_HASH(I18nUtil.getString("jobconf_route_consistenthash"),
new ExecutorRouteConsistentHash()),
/** 最不经常使用 */
LEAST_FREQUENTLY_USED(I18nUtil.getString("jobconf_route_lfu"), new ExecutorRouteLFU()),
/** 最近最久未使用 */
LEAST_RECENTLY_USED(I18nUtil.getString("jobconf_route_lru"), new ExecutorRouteLRU()),
/** 故障转移 */
FAILOVER(I18nUtil.getString("jobconf_route_failover"), new ExecutorRouteFailover()),
/** 忙碌转移 */
BUSYOVER(I18nUtil.getString("jobconf_route_busyover"), new ExecutorRouteBusyover()),
/** 分片广播 */
SHARDING_BROADCAST(I18nUtil.getString("jobconf_route_shard"), null);
...
public static ExecutorRouteStrategyEnum match(String name,
ExecutorRouteStrategyEnum defaultItem){
if (name != null) {
for (ExecutorRouteStrategyEnum item: ExecutorRouteStrategyEnum.values()) {
if (item.name().equals(name)) {
return item;
}
}
}
return defaultItem;
}
}
ExecutorRouteStrategyEnum
的枚举值中包含了该策略的处理类,获取到ExecutorRouteStrategyEnum
时,同时也就获取到了该策略的处理类了。
策略处理的接口为ExecutorRouter
,代码如下:
public abstract class ExecutorRouter {
protected static Logger logger = LoggerFactory.getLogger(ExecutorRouter.class);
/**
* route address
*
* @param addressList
* @return ReturnT.content=address
*/
public abstract ReturnT<String> route(TriggerParam triggerParam,
List<String> addressList);
}
接口中只有一个方法route(xxx)
,它的返回值就是要执行任务的executor
的地址。以上10种策略,除了分片广播外,其他每种策略都有一个对应的实现类:
接下来我们就来逐一分析这些策略.
分片广播
之所以把分片广播策略放在最前面,是因为没有一个策略类是用来处理分片广播策略的,它的实现是xxl-job
对它做了额外处理。介绍xxl-job
对分片广播策略的额外处理前,需要先回顾下任务的触发流程,直接上图:
以上流程是《任务执行流程(二)之触发器揭秘》中介绍的内容,跟随着触发流程,我们到XxlJobTrigger#trigger
方法:
public static void trigger(int jobId,
TriggerTypeEnum triggerType,
int failRetryCount,
String executorShardingParam,
String executorParam,
String addressList) {
...
// 额外处理分片广播策略,这一行是关键
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==
ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
&& group.getRegistryList()!=null && !group.getRegistryList().isEmpty()
&& shardingParam==null) {
// 依次广播到每台机器
for (int i = 0; i < group.getRegistryList().size(); i++) {
processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i,
group.getRegistryList().size());
}
}
...
以上方法删减了其他代码,重点保留了分片广播的处理,如果是分片广播策略,就遍历executor
列表,对每个executor
都执行processTrigger
方法。
继续,进入XxlJobTrigger#processTrigger
方法:
private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo,
int finalFailRetryCount,
TriggerTypeEnum triggerType,
int index, int total){
....
// 路由策略的处理,区分分片广播与其他策略
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
if (index < group.getRegistryList().size()) {
address = group.getRegistryList().get(index);
} else {
address = group.getRegistryList().get(0);
}
}
...
}
方法中的index
,就是XxlJobTrigger#trigger
传入的executor
列表的下标id,对分片广播策略来说,这个index
就是执行任务的executor
了。
第一个
处理第一个
路由策略的类是ExecutorRouteFirst
,代码如下:
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
return new ReturnT<String>(addressList.get(0));
}
这里就是获取exceutor
地址列表中的第一个exceutor
的地址。
最后一个
处理最后一个
路由策略的类是ExecutorRouteFirst
,代码如下:
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
return new ReturnT<String>(addressList.get(addressList.size()-1));
}
与第一个
策略相对应,这里就是获取exceutor
地址列表中的最后一个exceutor
的地址。
轮询
处理轮询
策略的类是ExecutorRouteRound
,代码如下:
/** 执行次数,key 为 jobId,value 为次数 */
private static ConcurrentMap<Integer, AtomicInteger> routeCountEachJob
= new ConcurrentHashMap<>();
private static long CACHE_VALID_TIME = 0;
/**
* 计数操作
*
* 如果首次执行或任务执行次数大于1000000了,那么执行次数重置为100以内的整数
* 否则,每次请求时,count次数都加1
*/
private static int count(int jobId) {
// cache clear
if (System.currentTimeMillis() > CACHE_VALID_TIME) {
routeCountEachJob.clear();
CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24;
}
AtomicInteger count = routeCountEachJob.get(jobId);
if (count == null || count.get() > 1000000) {
// 初始化时主动Random一次,缓解首次压力
count = new AtomicInteger(new Random().nextInt(100));
} else {
// count++
count.addAndGet(1);
}
routeCountEachJob.put(jobId, count);
return count.get();
}
@Override
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
// 求余,计算executor的索引
String address = addressList.get(
count(triggerParam.getJobId())%addressList.size());
return new ReturnT<String>(address);
}
在 ExecutorRouteRound
中,有一个ConcurrentMap
,用来统计任务的执行次数(key
为 jobId
,value
为次数)。
ExecutorRouteRound
中,count(xxx)
方法的核心逻辑就两句话:如果首次执行或任务执行次数大于1000000了,那么执行次数重置为100以内的整数;否则,每次请求时,count次数都加1。
最后来看看route(xxx)
方法,这个方法最核心的代码就一行:
count(triggerParam.getJobId())%addressList.size()
即获取任务总的执行次数,然后对executor
总实例数进行求余操作,得到的结果就是executor
列表的下标索引了,再调用addressList.get(xxx)
就能得到具体的executor
地址了。
随机
处理随机策略的类是ExecutorRouteRandom
,代码如下:
private static Random localRandom = new Random();
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
String address = addressList.get(localRandom.nextInt(addressList.size()));
return new ReturnT<String>(address);
}
这个实现很简单,就是随机生成一个0~executor实例总数
之间的整数,这个整数就是执行任务的executor
了。
一致性hash
接着我们来介绍一个有意思的策略:一致性hash。关于一致性算法的相关介绍,这里就不过多介绍了,我们重点来看xxl-job
关于它的实现,进入ExecutorRouteConsistentHash
:
/** 虚拟节点的数量 */
private static int VIRTUAL_NODE_NUM = 100;
/**
* 处理address的获取
*/
public String hashJob(int jobId, List<String> addressList) {
// 构建 addressRing
TreeMap<Long, String> addressRing = new TreeMap<Long, String>();
for (String address: addressList) {
for (int i = 0; i < VIRTUAL_NODE_NUM; i++) {
long addressHash = hash("SHARD-" + address + "-NODE-" + i);
addressRing.put(addressHash, address);
}
}
// 根据 jobId 的 hash 值
long jobHash = hash(String.valueOf(jobId));
SortedMap<Long, String> lastRing = addressRing.tailMap(jobHash);
if (!lastRing.isEmpty()) {
return lastRing.get(lastRing.firstKey());
}
return addressRing.firstEntry().getValue();
}
方法的一开始,使用了TreeMap
来保存节点,key
是节点生成的hash值,value
是address
。
我们都知道TreeMap
是一个有序的结构,它会根据key
指定的排序规则来排序,这里的key
是Long
类型,因此会按从小到的顺序排序。
从for (int i = 0; i < VIRTUAL_NODE_NUM; i++)
来看,它会为每个address
生成100个虚拟节点(VIRTUAL_NODE_NUM
值为100),节点生成的规则为hash("SHARD-" + address + "-NODE-" + i)
,其中i
取值0~99。规则中的hash(xxxx)
方法是ExecutorRouteConsistentHash
的私有方法,它的作用就是将传入字符串进行md5
计算,然后将得到的值截取前4个Byte转成Long类型。
举例说明,比如现在executor
的address
为a1
、a2
、a3
、a4
,经过上述虚拟化计算后,得到的TreeMap
如下:
TreeMap
中的key
是Long
类型且有序递增,value
就是打散后的a1
、a2
、a3
、a4
了。需要注意的是,上述key
是为了说明而设置的,真实计算得到的值可能并不是如此。
整个TreeMap
中的元素最多400,不排除存在key
相同从而导致覆盖的情况,比如hash("a1-NODE-10")
与 hash("a4-NODE-32")
得到的结果都是47,这样后放入的value
会覆盖先放入的。
得到这个TreeMap
后,接着就是根据jobId
得到对应的address
了,这里同样也对jobId
进行hash
求值,然后拿jobId
的哈希值去切分TreeMap
:
SortedMap<Long, String> lastRing = addressRing.tailMap(jobHash);
假设jobId
的hash
值是30,以上图为例,tailMap
的操作后得到的结果如下:
对于一致性hash
算法来说,得到的lastRing
的首个元素的value
值就是求得的address
了。回到我们示例中,32
对应的a2
就是一致性hash
算法计算得到的地址。
如果jobId
的hash
值是164,大于163了,大于key
的最大值163,此时得到的lastRing
里什么元素也没有,应该如何呢?此时直接取TreeMap
中的第一个元素就可以了。回到我们示例中,即14
对应的a3
就是一致性hash
算法计算得到的地址。
最不经常使用
处理随机策略的类是ExecutorRouteLFU
,
/**
* 保存任务的执行次数
* jobLfuMap 内容:<任务id, <executor地址, 任务在该executor上执行的次数>>
*/
private static ConcurrentMap<Integer, HashMap<String, Integer>> jobLfuMap
= new ConcurrentHashMap<Integer, HashMap<String, Integer>>();
/** 任务执行次数的缓存时间 */
private static long CACHE_VALID_TIME = 0;
/**
* 计算使用哪个 executor
*/
public String route(int jobId, List<String> addressList) {
// 1. 如果缓存到期,清除 jobLfuMap
if (System.currentTimeMillis() > CACHE_VALID_TIME) {
jobLfuMap.clear();
// 缓存一天
CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24;
}
// 2. 创建 lfuItemMap
HashMap<String, Integer> lfuItemMap = jobLfuMap.get(jobId);
if (lfuItemMap == null) {
lfuItemMap = new HashMap<String, Integer>();
jobLfuMap.putIfAbsent(jobId, lfuItemMap);
}
// 3. 初始化 lfuItemMap
for (String address: addressList) {
if (!lfuItemMap.containsKey(address) || lfuItemMap.get(address) >1000000 ) {
// 初始化时主动Random一次,缓解首次压力
lfuItemMap.put(address, new Random().nextInt(addressList.size()));
}
}
// 4. 去重无用的地址
List<String> delKeys = new ArrayList<>();
for (String existKey: lfuItemMap.keySet()) {
if (!addressList.contains(existKey)) {
delKeys.add(existKey);
}
}
if (delKeys.size() > 0) {
for (String delKey: delKeys) {
lfuItemMap.remove(delKey);
}
}
// 5. 对执行次数排序
List<Map.Entry<String, Integer>> lfuItemList
= new ArrayList<Map.Entry<String, Integer>>(lfuItemMap.entrySet());
Collections.sort(lfuItemList, new Comparator<Map.Entry<String, Integer>>() {
@Override
public int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) {
return o1.getValue().compareTo(o2.getValue());
}
});
// 6. 获取执行次数最少的address
Map.Entry<String, Integer> addressItem = lfuItemList.get(0);
String minAddress = addressItem.getKey();
addressItem.setValue(addressItem.getValue() + 1);
return addressItem.getKey();
}
ExecutorRouteLFU
中有两个属性:
-
jobLfuMap
:类型是ConcurrentMap
,它保存的key-value
为<任务id, <executor地址, 任务在该executor上执行的次数>>
,嵌套型的ConcurrentMap
,用表格示意大概是这样: -
CACHE_VALID_TIME
:类型是long
,记录了缓存过期时间
计算使用哪个executor
的关键在于route(int, List<String>)
,下面我们来逐一分析它的功能。
1. 清除 jobLfuMap
每次执行方法时,都会判断缓存是否到期,如果到期,就清理jobLfuMap
,相当于重新统计任务执行次数。目前缓存时间是1000*60*60*24
毫秒,也就是1天。
2. 创建 lfuItemMap
如果jobId
对应的lfuItemMap
不存在,就新建一个,并放入jobLfuMap
中。lfuItemMap
的key
是address
,value
是任务在该executor
上执行的次数。
值得一提的,为了避免覆盖,代码中使用的是putIfAbsent(...)
方法:
lfuItemMap = new HashMap<String, Integer>();
jobLfuMap.putIfAbsent(jobId, lfuItemMap);
关于putIfAbsent(...)
方法,ConcurrentHasmMap
的注释如下:
该方法的作用是,仅在指定的key
不存在进才放入map
中,这点想必大家都知道,而它的返回值就有意思了:在原来有值的情况下,返回原来的值,否则返回null
。
实际上这里直接使用put(...)
方法就可以了,在《执行器揭秘》一文的分析中可以得知,xxl-job
传为每个jobId
创建一个jobThread
,即同一个jobId
产生的任务由同一个jobThread
来执行,不会产生并发问题。
3. 初始化 lfuItemMap
初始化方式就比较简单了,对于不在jobItemMap
中的address
,放入其中并且指定一个随机数值。
4. 去重无用的地址
如果运行途中有executor
实例挂了,那executor
的地址就不会出现在addressList
中了,此时需要把这些非存活状态的address
从jobItemMap
中剔除出去。关于Map
的遍历-删除
操作,这些都是java基础内容,这里就不多说了。
结合3、4步,这里以一个实例来说明下操作流程:
假设 jobItemMap
中存在address
为:
现阶段存活的addressList
为:
即 a1 跟 a2 挂了,新增了 a5, a6 两个实例
经过第3步后,jobItemMap
中存在address
为:
即新增了 a5, a6 两个实例
经过第4步后,jobItemMap
中存在address
为:
即剔除了 a1, a2 两个非存活状态的实例
这样之后,jobItemMap
中的address
就都是存活的executor
实例了。
5. 对执行次数排序
xxl-job
在这块处理的比较简单,流程如下:
- 使用
lfuItemMap.entrySet()
方法,得到lfuItemMap
的entrySet
- 使用
new ArrayList(entrySet)
的方式,将第1步得到的entrySet
转成List
类型,List
存放的元素为Map.Entry
,这个key
就是address
,value
就是该address
执行任务的次数 - 使用
Collections.sort
对第2步得到的List
排序,排序对象为Map.Entry
的value
,规则为Integer
类型由小到大
6. 获取执行次数最少的address
在第5步排序完成之后,在lfuItemList
中的第一个元素就是执行次数最小的executor
了(即最不经常使用的executor
),直接使用lfuItemList.get(0)
获取就行了。
获取后,就表示该executor
执行1次任务了,别忘了执行次数+1
的操作:
addressItem.setValue(addressItem.getValue() + 1);
最近最久未使用
处理最近最久未使用的类为ExecutorRouteLRU
,代码如下:
/**
* 保存任务的执行地址
* jobLRUMap 内容:<任务id, <executor地址, executor地址>>
*/
private static ConcurrentMap<Integer, LinkedHashMap<String, String>> jobLRUMap
= new ConcurrentHashMap<Integer, LinkedHashMap<String, String>>();
/** 任务执行次数的缓存时间 */
private static long CACHE_VALID_TIME = 0;
/**
* 计算使用哪个 executor
*/
public String route(int jobId, List<String> addressList) {
// 1. 清理缓存
if (System.currentTimeMillis() > CACHE_VALID_TIME) {
jobLRUMap.clear();
CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24;
}
// 2. 创建 lruItem
LinkedHashMap<String, String> lruItem = jobLRUMap.get(jobId);
if (lruItem == null) {
lruItem = new LinkedHashMap<String, String>(16, 0.75f, true);
jobLRUMap.putIfAbsent(jobId, lruItem);
}
// 3. 初始化 lruItem
for (String address: addressList) {
if (!lruItem.containsKey(address)) {
lruItem.put(address, address);
}
}
// 4. 移除无用的地址
List<String> delKeys = new ArrayList<>();
for (String existKey: lruItem.keySet()) {
if (!addressList.contains(existKey)) {
delKeys.add(existKey);
}
}
if (delKeys.size() > 0) {
for (String delKey: delKeys) {
lruItem.remove(delKey);
}
}
// 5. 获取最近最久未使用的地址
String eldestKey = lruItem.entrySet().iterator().next().getKey();
// 6. 通过 get(...) 方法访问元素
String eldestValue = lruItem.get(eldestKey);
return eldestValue;
}
ExecutorRouteLRU
中有两个属性:
-
jobLRUMap
:类型是ConcurrentMap
,它保存的key-value
为<任务id, <executor地址, executor地址>>
,嵌套型的Map
,内部嵌套了一个LinkedHashMap
,实现LRU
的关键就是这个LinkedHashMap
了。整个结构用表格示意大概是这样: -
CACHE_VALID_TIME
:类型是long
,记录了缓存过期时间
计算使用哪个executor
的关键在于route(int, List<String>)
,它的步骤如下:
- 清理缓存
- 创建 lruItem
- 初始化 lruItem
- 移除无用的地址
- 获取最近最久未使用的地址
- 通过 get(…) 方法访问元素
从代码形式上看,ExecutorRouteLRU
的route(int, List<String>)
方法与ExecutorRouteLFU
的route(int, List<String>)
方法非常相似,在本小节我们重点关注功能的实现。
最近最久未使用(LRU)的实现关键在于LinkedHashMap
,注意它的实例化:
lruItem = new LinkedHashMap<String, String>(16, 0.75f, true);
该构造方法的注释如下:
重点关键accessOrder
的值:true 表示访问顺序排序(get/put时排序),false表示插入顺序排序。
与HashMap
不同,在LinkedHashMap
中,除了数组+链表
(或数组+红黑树
)来保存元素外,还有一个双向链表来维护元素的顺序:
代码节选自 java.util.LinkedHashMap
/**
* 双向链表的结点
*/
static class Entry<K,V> extends HashMap.Node<K,V> {
Entry<K,V> before, after;
Entry(int hash, K key, V value, Node<K,V> next) {
super(hash, key, value, next);
}
}
/** 双向链表的头节点 */
transient LinkedHashMap.Entry<K,V> head;
/** 双向链表的尾节点 */
transient LinkedHashMap.Entry<K,V> tail;
查看代码可以发现,在LinkedHashMap
的put/get
操作时,都会维护这个链表(在put
操作时,维护链表的操作在newNode(...)
方法中)。正是由于这个链表的存在,LinkedHashMap
才能保证元素有序。
代码中传入的是true
,表示按访问顺序排序,即调用完get/put
后,即访问过的元素就会被移到链表的尾部,双向链表的第一个元素就是最久未使用的地址了。
有了LinkedHashMap
的一些了解后,再来看route(int, List<String>)
就比较清晰了,实际上前面的4步:
- 清理缓存
- 创建 lruItem
- 初始化 lruItem
- 移除无用的地址
都是在构建包含有效address
的LinkedHashMap
,此时的LinkedHashMap
中首个元素就是最久未使用的address
了,接着就是取首个元素操作:
String eldestKey = lruItem.entrySet().iterator().next().getKey();
在 LinkedHashMap
中,key
与value
都是address
,这里得到的eldestKey
就是最久未使用的address
了。不过到这里还没完,还要把这个address
移到LinkedHashMap
的最后位置,下次再获取首个元素时就又是最久未使用的address
了,这就是get
的操作:
String eldestValue = lruItem.get(eldestKey);
总的来说,最近最久未使用(LRU)的实现利用了LinkedHashMap
的几个特点:
- 构造方法中
accessOrder
传true
表示按访问元素排序 accessOrder=true
的情况下,访问过的元素就会被移到链表的尾部,即首个元素就是最久未使用的元素get
的操作就是访问操作
故障转移
处理故障转移的类是ExecutorRouteFailover
,代码如下:
@Override
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
StringBuffer beatResultSB = new StringBuffer();
for (String address : addressList) {
// beat
ReturnT<String> beatResult = null;
try {
ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
// 判断是否存活
beatResult = executorBiz.beat();
} catch (Exception e) {
logger.error(e.getMessage(), e);
beatResult = new ReturnT<String>(ReturnT.FAIL_CODE, ""+e );
}
// 结果信息
beatResultSB.append( (beatResultSB.length()>0)?"<br><br>":"")
.append(I18nUtil.getString("jobconf_beat") + ":")
.append("<br>address:").append(address)
.append("<br>code:").append(beatResult.getCode())
.append("<br>msg:").append(beatResult.getMsg());
// executor 存活,return
if (beatResult.getCode() == ReturnT.SUCCESS_CODE) {
beatResult.setMsg(beatResultSB.toString());
beatResult.setContent(address);
return beatResult;
}
}
return new ReturnT<String>(ReturnT.FAIL_CODE, beatResultSB.toString());
}
整个方法的流程如下:
- 遍历所有的
executor
- 对每一个
executor
,判断是否存活,如果存活就返回该executor
,结束;否则就继续判断下一个executor
是否存活 - 如果最终没有存活的
executor
,返回失败
方法流程比较简单,重点在于如果判断executor
是否存活,代码中调用的是executorBiz.beat()
,跟进去就到ExecutorBizClient#beat
:
@Override
public ReturnT<String> beat() {
return XxlJobRemotingUtil.postBody(addressUrl+"beat",
accessToken, timeout, "", String.class);
}
注意到路由策略的执行都是在admin
实例,在ExecutorBizClient#beat
方法中会发送http请求到executor
来探测该executor
是否存活,admin
到executor
的通讯流程在《admin与executor通讯》一文中已经分析过了,这里我们进入executor
中对应的处理方法ExecutorBizImpl#beat
:
@Override
public ReturnT<String> beat() {
return ReturnT.SUCCESS;
}
方法中只是返回了一个成功标识,除此之外并无其他逻辑。
忙碌转移
处理忙碌转移策略的类是ExecutorRouteBusyover
,方法如下:
public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
StringBuffer idleBeatResultSB = new StringBuffer();
for (String address : addressList) {
// beat
ReturnT<String> idleBeatResult = null;
try {
ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
// 判断当前executor是否忙碌
idleBeatResult = executorBiz.idleBeat(new IdleBeatParam(triggerParam.getJobId()));
} catch (Exception e) {
logger.error(e.getMessage(), e);
idleBeatResult = new ReturnT<String>(ReturnT.FAIL_CODE, ""+e );
}
// 处理参数
idleBeatResultSB.append( (idleBeatResultSB.length()>0)?"<br><br>":"")
.append(I18nUtil.getString("jobconf_idleBeat") + ":")
.append("<br>address:").append(address)
.append("<br>code:").append(idleBeatResult.getCode())
.append("<br>msg:").append(idleBeatResult.getMsg());
// 返回成功标识,表示当前executor不忙碌,return
if (idleBeatResult.getCode() == ReturnT.SUCCESS_CODE) {
idleBeatResult.setMsg(idleBeatResultSB.toString());
idleBeatResult.setContent(address);
return idleBeatResult;
}
}
return new ReturnT<String>(ReturnT.FAIL_CODE, idleBeatResultSB.toString());
}
忙碌转移
策略的代码形式与故障转移
基本一样,处理流程如下:
整个方法的流程如下:
- 遍历所有的
executor
- 对每一个
executor
,判断是否忙碌,如果不忙碌就返回该executor
,结束;否则就继续判断下一个executor
是否存活 - 如果最终没有空闲的
executor
,返回失败
检测executor
忙碌的方法为executorBiz.idleBeat(xxx)
,跟进该方法就到了ExecutorBizClient#idleBeat
:
@Override
public ReturnT<String> idleBeat(IdleBeatParam idleBeatParam){
return XxlJobRemotingUtil.postBody(addressUrl+"idleBeat", accessToken, timeout,
idleBeatParam, String.class);
}
通故障转移中的ExecutorBizClient#beat
一样,该方法也是通过http请求到executor
上来检测当前executor
是否处于空闲中。废话不多说,直接进入executor
实例上对应的处理方法ExecutorBizImpl#idleBeat
:
@Override
public ReturnT<String> idleBeat(IdleBeatParam idleBeatParam) {
boolean isRunningOrHasQueue = false;
JobThread jobThread = XxlJobExecutor.loadJobThread(idleBeatParam.getJobId());
// 忙碌条件
if (jobThread != null && jobThread.isRunningOrHasQueue()) {
isRunningOrHasQueue = true;
}
if (isRunningOrHasQueue) {
return new ReturnT<String>(ReturnT.FAIL_CODE,
"job thread is running or has trigger queue.");
}
return ReturnT.SUCCESS;
}
代码中出现了JobThread
这个类,在《执行器揭秘》一文中,对JobThread
已经分析过,这里就不多作介绍了。
代码中,判断是否忙碌的处理为:
jobThread != null && jobThread.isRunningOrHasQueue()
对jobThread != null
条件,在jobThread
不为null
时,可能并没有任务在执行,jobThread
只是在等待任务的到来;因此还需要进一步判断,且看jobThread.isRunningOrHasQueue()
方法:
public boolean isRunningOrHasQueue() {
return running || triggerQueue.size()>0;
}
这个方法非常之简单,只是判断了running
的值与triggerQueue
的长度,两个值在哪里更改的呢?
running
我们来看看JobThread
的 run()
方法:
/** 是否执行任务的标识 */
private boolean running = false;
@Override
public void run() {
// init
try {
handler.init();
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
// execute
while(!toStop){
running = false;
idleTimes++;
TriggerParam triggerParam = null;
try {
triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
if (triggerParam!=null) {
// 任务在执行,变更 running 的值
running = true;
...
}
}
...
}
}
从代码来看,executor
在执行该任务时,running
就会变成true
,表明此时不空闲了。
triggerQueue
关于triggerQueue
,在《执行器揭秘》一文中已经分析过,这里总结如下:
- 在
executor
接收到执行该任务的请求时,并不是直接执行该任务,只是将任务放入jobThread
中的triggerQueue
中,等待jobThread
执行。 - 如果
triggerQueue
的长度大于0,表示jobThread
来不及执行而导致任务堆积了,此时表明executor
处于忙碌中。
总结
路由策略,即选择使用哪个executor
来执行任务的策略。本文分析了xxl-job
中10大路由策略:
- 分片广播
- 第一个
- 最后一个
- 轮询
- 随机
- 一致性hash
- 最不经常使用
- 最近最久未使用
- 故障转移
- 忙碌转移
从代码的角度来分析了这些路由策略的实现过程,了解这些策略背后的实现原理后,在我们平时使用的xxl-job
过程中,可以更好地选择合适的策略。
限于作者个人水平,文中难免有错误之处,欢迎指正!原创不易,商业转载请联系作者获得授权,非商业转载请注明出处。