Java 技术探秘

人如果没有了梦想,跟闲鱼有什么区别?


  • 首页

  • 源码分析

  • 日常记录

  • 学习笔记

  • 解决方案

  • 关于

  • Search
close

XxlJob09:路由策略

Published at: 2022-06-28   |   Reading: 1442 words ~7min
注:本系列源码分析基于XxlJob 2.3.0,gitee仓库链接:https://gitee.com/funcy/xxl-job.git. 在前面配置xxl-job任务时,提到过xxl-job的路由策略,本文将来分析这些策略。 在任务的编辑页面,可以配置路由策略: 所谓的路由策略,是指在多个executor存在时,选择哪一个executor来执行任务的策略。xxl-job支持的路由策略有如下几种: 第一个:选择第一个executor 最后一个:选择最后一个executor 轮询:依次选择executor 随机:随机选择一个executor 一致性hash:使用一致性hash算法来选择executor 最不经常使用:选择最不经常使用的executor 最近最久未使用:选择最久未使用的executor 故障转移:判断当前选择的executor是否可用,若不可用,则选择另一个executor 忙碌转移:判断当前选择的executor是否处于忙碌状态,若处于,则选择另一executor 分片广播:每一个executor都会执行任务 路由策略的选择 路由策略的选择在XxlJobTrigger#processTrigger,相关代码如下: // 获取路由策略枚举类,jobInfo.getExecutorRouteStrategy() 来自于数据库 ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); .... // 路由策略的处理,区分分片广播与其他策略 if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) { if (index < group.getRegistryList().size()) { address = group.getRegistryList().get(index); } else { address = group.getRegistryList().get(0); } } else { // 执行策略,仅仅只是为了得到执行器的地址 routeAddressResult = executorRouteStrategyEnum.getRouter() .route(triggerParam, group.getRegistryList()); if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) { address = routeAddressResult.getContent(); } } 在任务的配置时,任务的路由策略会保存在数据库中,在XxlJobTrigger#processTrigger方法中,会把路由策略拿出来以便获取路由策略的枚举类,jobInfo.getExecutorRouteStrategy()就是数据库中保存的策略。
Read More »

XxlJob08:任务执行流程(三)之执行器揭秘

Published at: 2022-06-21   |   Reading: 2778 words ~14min
注:本系列源码分析基于XxlJob 2.3.0,gitee仓库链接:https://gitee.com/funcy/xxl-job.git. 在上一篇文章中,要执行的任务终于通过ExecutorBiz.run(...)发往了executor,由此开始了executor上任务的执行操作。关于admin到executor之间的通讯,在《admin与executor通讯》一文中已详细分析过了,这里我们直接看executor进程中ExecutorBiz.run(...)方法的操作。 ExecutorBizImpl#run 获取 jobHandler 进入 ExecutorBizImpl#run 方法,内容如下: public ReturnT<String> run(TriggerParam triggerParam) { // load old:jobHandler + jobThread JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId()); IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null; String removeOldReason = null; // 先分析这么多,剩下的接下来再分析 ... 方法一开始,就从XxlJobExecutor中获取JobThread,然后从JobThread中获取IJobHandler。这简单的3行代码,一下子出现了3个类,下面一一来分析。 XxlJobExecutor.loadJobThread 关于XxlJobExecutor,从名称上来看,就是处理xxlJob任务执行逻辑的,在《执行器启动流程》一文中对该类的部分功能也做过分析,这里我们直接来看loadJobThread(...)相关方法的内容: private static ConcurrentMap<Integer, JobThread> jobThreadRepository = new ConcurrentHashMap<Integer, JobThread>(); public static JobThread loadJobThread(int jobId){ JobThread jobThread = jobThreadRepository.get(jobId); return jobThread; } 在XxlJobExecutor中,有一个ConcurrentMap结构,key是jobId,value是jobThread,loadJobThread(xxx)方法就是从这个ConcurrentMap中获取jobThread的操作了。 关于JobThread放入jobThreadRepository中的操作,我们下面会分析到。 IJobHandler IJobHandler是一个接口,其中定义了3个方法: /** * job handler * * @author xuxueli 2015-12-19 19:06:38 */ public abstract class IJobHandler { /** * 处理任务的执行,当executor收到调度请求时被调用 * execute handler, invoked when executor receives a scheduling request * * @throws Exception */ public abstract void execute() throws Exception; /*@Deprecated public abstract ReturnT<String> execute(String param) throws Exception;*/ /** * 初始化 handler,当 JobThread 初始化时被调用 * init handler, invoked when JobThread init */ public void init() throws Exception { // do something } /** * 销毁 handler,当 JobThread 销毁时被调用 * destroy handler, invoked when JobThread destroy */ public void destroy() throws Exception { // do something } } 每个方法的作用代码中已给出了注释,这个IJobHandler就是执行任务的组件了,它有3个子类:
Read More »

XxlJob07:任务执行流程(二)之触发器揭秘

Published at: 2022-06-14   |   Reading: 737 words ~4min
注:本系列源码分析基于XxlJob 2.3.0,gitee仓库链接:https://gitee.com/funcy/xxl-job.git. 上一篇文章中,任务调度完成后,最终会调用JobTriggerPoolHelper.trigger(...)方法进行触发操作,本文将来分析xxl-job任务触发流程。 JobTriggerPoolHelper#trigger 关于任务的触发,直观感受应该来自于管理后台界面: 点击执行一次后,调用的接口是/jobinfo/trigger,方法是JobInfoController#triggerJob代码如下: @RequestMapping("/trigger") @ResponseBody public ReturnT<String> triggerJob(int id, String executorParam, String addressList) { if (executorParam == null) { executorParam = ""; } JobTriggerPoolHelper.trigger(id, TriggerTypeEnum.MANUAL, -1, null, executorParam, addressList); return ReturnT.SUCCESS; } 同上一文触发操作一样,最终调用的也是JobTriggerPoolHelper.trigger(...)方法. 继续,看看JobTriggerPoolHelper.trigger(...)方法的内容: public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList) { helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList); } 仅仅只是调用了JobTriggerPoolHelper#addTrigger方法,那我们继续下去,看看这个方法: public void addTrigger(final int jobId, final TriggerTypeEnum triggerType, final int failRetryCount, final String executorShardingParam, final String executorParam, final String addressList) { // choose thread pool ThreadPoolExecutor triggerPool_ = fastTriggerPool; AtomicInteger jobTimeoutCount = jobTimeoutCountMap.
Read More »

XxlJob06:任务执行流程(一)之调度器揭密

Published at: 2022-06-06   |   Reading: 1428 words ~7min
注:本系列源码分析基于XxlJob 2.3.0,gitee仓库链接:https://gitee.com/funcy/xxl-job.git. 从本文开始,我们将用三篇文章来介绍xxl-job最核心的功能——xxl-job任务执行流程。xxl-job任务的执行包含3个组件:调度器、触发器、执行器,本文将重点介绍调度器的流程。 xxl-job调度器位于admin进程,主要功能为精准获取将要执行的任务,处理调度器的方法为JobScheduleHelper#start,在《admin启动流程》一文中也提到过该方法,不过当时并未展开讨论,在本文中将详细分析该方法。 JobScheduleHelper#start 代码如下: public void start(){ // schedule thread scheduleThread = new Thread(new Runnable() { public void run() { // 省略run()方法的内容,下面再展开 ... } }); scheduleThread.setDaemon(true); scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread"); scheduleThread.start(); // ring thread ringThread = new Thread(new Runnable() { public void run() { // 省略run()方法的内容,下面再展开 ... } }); ringThread.setDaemon(true); ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread"); ringThread.start(); } 可以看到,这个方法启动了两个线程: scheduleThread:调度线程,用来获取需要执行的任务 ringThread:时间轮线程,用来精准控制任务的执行时间点 任务的调度就是依靠以上两个线程来完成的,我们继续。 scheduleThread 我们先来分析scheduleThread,查看其 run()方法: @Override public void run() { // 休眠,表示线程启动5s后才去执行任务 try { TimeUnit.
Read More »

XxlJob05:执行器注册流程

Published at: 2022-05-28   |   Reading: 571 words ~3min
注:本系列源码分析基于XxlJob 2.3.0,gitee仓库链接:https://gitee.com/funcy/xxl-job.git. 本文来分析执行器注册到admin的流程,也就是EmbedServer#startRegistry方法。 有了前面两篇文章的铺垫,我们终于可以分析executor的注册流程了,注册流程在executor启动流程中的位置如下: 我们直接进入ExecutorRegistryThread#start的方法: public void start(final String appname, final String address){ // 判断参数的合法性 if (appname==null || appname.trim().length()==0) { logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, " + "appname is null."); return; } // 如果没有admin服务,就不注册了 if (XxlJobExecutor.getAdminBizList() == null) { logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, " + "adminAddresses is null."); return; } registryThread = new Thread(new Runnable() { @Override public void run() { ... } }); registryThread.setDaemon(true); registryThread.setName("xxl-job, executor ExecutorRegistryThread"); registryThread.
Read More »

XxlJob04:admin与executor通讯

Published at: 2022-05-20   |   Reading: 598 words ~3min
注:本系列源码分析基于XxlJob 2.3.0,gitee仓库链接:https://gitee.com/funcy/xxl-job.git. 本文将分析执行器(executor)与admin之间的通讯。 在xxl-job中,executor与admin并不是相互独立工作的,他们之前会通过网络通讯相互协作完成任务的调度流程,比如,executor启动时,需要把自己的地址信息(ip:端口)信息告知admin;任务调度时,admin会把任务信息发送给executor,在executor上真正执行任务。这两类通讯中,前者是executor到admin的通讯,后者是admin到executor的通讯,本文接下来就分析这两类通讯。 executor到admin 处理类:AdminBiz admin对executor提供的服务定义在com.xxl.job.core.biz.AdminBiz接口中,内容如下: public interface AdminBiz { /** * callback * * @param callbackParamList * @return */ public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList); /** * registry * * @param registryParam * @return */ public ReturnT<String> registry(RegistryParam registryParam); /** * registry remove * * @param registryParam * @return */ public ReturnT<String> registryRemove(RegistryParam registryParam); } 这是一个接口,可以看到它共有3个方法: callback:处理任务回调 registry:处理执行器(executor)的注册,即将executor注册到admin registryRemove:处理执行器(executor)的下线,即将executor从admin中删除 AdminBiz有2个实现类: com.xxl.job.core.biz.client.AdminBizClient:从名称上看,它是一个客户端类,位于executor进程中,内容如下: public class AdminBizClient implements AdminBiz { // 省略属性及构造方法 .
Read More »

XxlJob03:执行器启动流程

Published at: 2022-05-14   |   Reading: 1864 words ~9min
注:本系列源码分析基于XxlJob 2.3.0,gitee仓库链接:https://gitee.com/funcy/xxl-job.git. 本文将分析xxl-job执行器(executor)的启动流程。 执行器接入流程 在分析xxl-job执行器启动流程之前,我们先来看下如何让自己的项目变成xxl-job执行器,这里以springboot框架集成为例,示例项目为xxl-job-executor-sample-springboot。 1. 引入xxl-job-core包 要使用xxl-job的功能,第一步当然是引入其依赖包了,xxl-job-core的GAV坐标如下: <!-- xxl-job-core --> <dependency> <groupId>com.xuxueli</groupId> <artifactId>xxl-job-core</artifactId> <version>${latest.version}</version> </dependency> 2. 新增配置类 这块是为了在项目中引入xxl-job的配置,配置类为 com.xxl.job.executor.core.config.XxlJobConfig: @Configuration public class XxlJobConfig { private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class); @Value("${xxl.job.admin.addresses}") private String adminAddresses; @Value("${xxl.job.accessToken}") private String accessToken; @Value("${xxl.job.executor.appname}") private String appname; @Value("${xxl.job.executor.address}") private String address; @Value("${xxl.job.executor.ip}") private String ip; @Value("${xxl.job.executor.port}") private int port; @Value("${xxl.job.executor.logpath}") private String logPath; @Value("${xxl.job.executor.logretentiondays}") private int logRetentionDays; @Bean public XxlJobSpringExecutor xxlJobExecutor() { logger.info(">>>>>>>>>>> xxl-job config init.
Read More »

XxlJob02:admin启动流程

Published at: 2022-05-08   |   Reading: 1778 words ~9min
注:本系列源码分析基于XxlJob 2.3.0,gitee仓库链接:https://gitee.com/funcy/xxl-job.git. 本文我们将分析xxl-job-admin的启动流程。xxl-job-admin 是一个springboot项目,直接启动com.xxl.job.admin.XxlJobAdminApplication就可以了,但是在启动过程中,xxl-job相关功能是如何初始化的呢? spring 配置类: XxlJobAdminConfig 经过本人的一番探索,发现xxl-job 相关组件的启动类为com.xxl.job.admin.core.conf.XxlJobAdminConfig: @Component public class XxlJobAdminConfig implements InitializingBean, DisposableBean { private XxlJobScheduler xxlJobScheduler; @Override public void afterPropertiesSet() throws Exception { adminConfig = this; xxlJobScheduler = new XxlJobScheduler(); xxlJobScheduler.init(); } @Override public void destroy() throws Exception { xxlJobScheduler.destroy(); } ... // 省略了属性以及setter、getter方法 } 关于这个类,几点说明如下: 该类被@Component标记,表明这是个spring bean,享有spring bean的生命同期; 该类实现了InitializingBean与DisposableBean两个接口,提供了spring bean在初始化及销毁阶段的一些操作,对应的两个方法如下: afterPropertiesSet:来自InitializingBean接口,用来处理bean在初始化时的一些操作 destroy:来自DisposableBean接口,用来处理bean在销毁时的一些操作 而实际上,XxlJobAdminConfig的afterPropertiesSet与destroy两个方法就是xxl-job启动与关闭的关键所在,而这两个方法都是调用的xxlJobScheduler的方法,接下来我们就来分析XxlJobScheduler#init与XxlJobScheduler#destroy方法。 这两个方法代码如下: 两个方法对比下可以看到,init方法正序启动了一系列组件,而destroy方法逆序关闭了一系列组件,正所谓“先启动的后关闭”。他们启动或关闭的组件如下: JobTriggerPool:任务的触发线程,用来把需要执行的任务提交到执行器 JobRegistry:任务执行器注册监听,用来监听执行器注册操作,及时移除无效的执行器 JobFailMonitor:失败任务监听,用来监听失败任务,发送告警,对于重试次数大于0的失败任务会发再次触发执行 JobComplete:任务完成监听,用来监听任务是否完成,把长时间处于运行中的任务标记为失败 JobLogReport:任务报表,用来汇总任务的整体执行情况,也是管理后台“运行报表”菜单的数据来源 JobSchedule:任务调度,用来获取接下来要执行的任务,将这些任务提交给触发线程 接下来我们就来看下这些组件的实现。 触发线程池:JobTriggerPoolHelper 触发线程池的处理类为JobTriggerPoolHelper,我们来看看它的启动方法JobTriggerPoolHelper.toStart():
Read More »

XxlJob01:环境准备

Published at: 2022-05-04   |   Reading: 122 words ~1min
关于定时任务,想必大家都不会陌生,简单的定时任务有jdk提供的定时任务、spring提供的定时任务,稍微复杂一些有quartz定时任务,分布式任务有xxl-job、elastic-job。由于项目中使用的定时任务是xxl-job,因此本系列将从源码的角度来分析xxl-job定时任务的设计与实现。 关于xxl-job的操作,xxl-job社区贴心地提供了操作文档:https://www.xuxueli.com/xxl-job/,文档不长,一遍看下来xxl-job的操作基本就会了,这块就不过多叙述了。 1. 获取xxl-job源码 既然是源码分析,当然要先获取源码了,xxl-job的github地址为https://github.com/xuxueli/xxl-job。一般来说,直接frok这个仓库就可以了,不过由于国内网络的原因,github 的访问并不稳定,比如checkout到一半网络断开、push超时等。为了规避这些问题,建议使用国内的git仓库——gitee,操作方式如下: 1.1 创建新的gitee仓库 登录个人的gitee,点击“+”号,选择“从github/gitlab导入仓库”: 1.2 导入xxl-job仓库 复制xxl-job的github地址,点击导入: 等待一会,就得到了自己的xxl-job的gitee地址了,本人得到的地址为https://gitee.com/funcy/xxl-job,并且gitee上也提供一个按钮,可以很方便地同步github上的代码: 如果有一天,你发现作者在github提交了大把代码,想及时同步到自己的gitee仓库,只要点击这个按钮就可以了,不过这样可能会覆盖自己的代码,可以通过新建分支来规避。 1.3 创建新分支 接下来,就可以基于该仓库自由操作了,继续,选择一个自己准备放项目的目录,然后clone: git clone https://gitee.com/funcy/xxl-job 接下来,需要基于 tag 创建分支(新分支命为2.3.0.LEARN) git tag git checkout 2.3.0 git checkout -b 2.3.0.LEARN 2.3.0 git push -u origin 2.3.0.LEARN 后续我们所有的分析操作就都在2.3.0.LEARN上进行了。 关于创建分支一些小疑问: 为什么要创建新分支? 主要是防止点击同步时,代码被覆盖。如果直接在master分支,或其他在github中已有的分支上进行修改,点击同步后,所作的修改都会被github上的分支覆盖,而在gitee上新建的分支则不会被覆盖 为什么要基于tag创建新分支而不是master? 一般来说,master上的代码是最新稳定版本的代码,因此会不断变化,而tag上的代码,表示打tag那一刻的代码,之后不再变化,这样就保证了同一个tag上的代码,无论什么时候都是一样的。 事实上,了解了git的tag概念后,会发现tag上的代码checkout下来后,无法再怎么修改都不能提交,即不会被修改;相反地,master或其他分支,修改后可以再提交。由于tag上的代码是不变的,为了能提交代码,必须要基于tag上的代码创建新分支 为什么选择的tag版本是2.3.0而不是其他? 因为当前最新版的xxl-job是2.3.0,当然要分析最新版的代码了。 2. xxl-job 项目结构 拿到代码后,我们来看下xxl-job的项目结构: xxl-job-admin:管理后台,提供任务管理界面,同时也是任务的调度器、触发器,本系列文章中简称为admin。 xxl-job-core:xxl-job的核心组件,同时也是xxl-job对外提供的jar包,提供了任务执行相关逻辑。 xxl-job-executor-samples:执行器示例,在本系列文章中,执行器是指集成了xxl-job任务执行功能的项目,也称为executor xxl-job-executor-sample-frameless:不集成任何框架的示例 xxl-job-executor-sample-springboot:集成了 springboot 框架的示例 关于该项目的更多细节,可以参考官方文档https://www.xuxueli.com/xxl-job/,这里截取官方的架构图: 建议先了解上图中xxl-job的组件,后面的源码分析会一步步揭开这些组件工作的原理。 3. 启动xxl-job 大致了解xxl-job的结构后,接下来我们在idea中启动xxl-job,这样也是方便我们后面调试。 3.1 执行sql脚本 xxl-job的sql脚本位于xxl-job/doc/db目录下,文件名为tables_xxl_job.sql 准备一个mysql数据库,直接执行该脚本即可。需要注意的是,如果数据库中已存在名为xxl_job数据库,执行该脚本会覆盖原来的库,需要注意下。 执行成功后,数据库中会出现如下表: 各表的功能如下: xxl_job_user: 用户信息,用来验证登录到管理后台的用户是否合法 xxl_job_info: 任务信息表,用来配置需要执行的任务 xxl_job_log: 日志表,用来记录任务的执行日志 xxl_job_log_report: 任务执行报表,用来汇总、统计任务的执行情况 xxl_job_logglue: gule模式任务表,主要记录glue模式任务的执行代码 xxl_job_registry: 执行器的注册记录表,每一个执行器都保存一条注册记录 xxl_job_group: 执行器的注册汇总表,多个执行器汇总成一条记录 xxl_job_lock: 锁表,xxl-job用来实现分布式锁的表 3.
Read More »
1

Funcy

9 Blogs
4 Categories
24 Tags
RSS
Gitee 掘金
© 2009 - 2024 Java 技术探秘
Powered by - Hugo v0.127.0
粤ICP备2024268097号-1
0%