Article / 文章中心

实现一个任务调度系统,看这篇就够了

发布时间:2022-01-28 点击数:1331

阅览一篇「守时使命结构选型」的文章时,一位网友的留言到了我:

我看过那么多所谓的教程,大部分都是教“怎么运用东西”的,没有多少是教“怎么制造东西”的,能教“怎么拷贝东西”的都现已是百里挑一,我国 软件行业,缺的是真正能够“制造东西”的程序员,而绝对不缺那些“运用东西”的程序员! ...... ”这个业界最不需求的便是“会运用XX东西的工程师”,而是“有创造力的软件工程师”!业界所有的饭碗,实质便是“有创造力的软件工程师”供给出来的啊!

写这篇文章,想和大家从头到脚说说使命调度,期望大家读完之后,能够了解完成一个使命调度体系的中心逻辑。

1 Quartz

Quartz是一款Java开源使命调度结构,也是许多Java工程师触摸使命调度的起点。

下图显现了使命调度的全体流程:

Quartz的中心是三个组件。

  • 使命:Job 用于表明被调度的使命;
  • 触发器:Trigger 界说调度时刻的元素,即依照什么时刻规矩去履行使命。一个Job能够被多个Trigger相关,可是一个Trigger 只能相关一个Job;
  • 调度器 :工厂类创立Scheduler,依据触发器界说的时刻规矩调度使命。

上图代码中Quartz 的JobStore是 RAMJobStore,Trigger 和 Job 存储在内存中。

履行使命调度的中心类是 QuartzSchedulerThread 。

  1. 调度线程从JobStore中获取需求履行的的触发器列表,并修正触发器的状况;
  2. Fire触发器,修正触发器信息(下次履行触发器的时刻,以及触发器状况),并存储起来。
  3. 最终创立详细的履行使命目标,经过worker线程池履行使命。

接下来再聊聊 Quartz 的集群布置计划。

Quartz的集群布置计划,需求针对不同的数据库类型(MySQL , ORACLE) 在数据库实例上创立Quartz表,JobStore是: JobStoreSupport 。

这种计划是分布式的,没有担任会集办理的节点,而是利用数据库行级锁的方式来完成集群环境下的并发操控。

scheduler实例在集群方式下首要获取{0}LOCKS表中的行锁,Mysql 获取行锁的句子:

{0}会替换为装备文件默许装备的QRTZ_。sched_name为运用集群的实例名,lock_name便是行级锁名。Quartz主要有两个行级锁触发器拜访锁 (TRIGGER_ACCESS) 和 状况拜访锁(STATE_ACCESS)。

这个架构处理了使命的分布式调度问题,同一个使命只能有一个节点运转,其他节点将不履行使命,当碰到许多短使命时,各个节点频频的竞赛数据库锁,节点越多功用就会越差。

2 分布式锁方式

Quartz的集群方式能够水平扩展,也能够分布式调度,但需求事务方在数据库中增加对应的表,有必定的强侵入性。

有不少研发同学为了防止这种侵入性,也探究出分布式锁方式

事务场景:电商项目,用户下单后一段时刻没有付款,体系就会在超时后封闭该订单。

一般咱们会做一个守时使命每两分钟来检查前半小时的订单,将没有付款的订单列表查询出来,然后对订单中的商品进行库存的恢复,然后将该订单设置为无效。

咱们运用Spring Schedule的方式做一个守时使命。

@Scheduled(cron = "0 */2 * * * ? ") public void doTask() {
   log.info("守时使命发动"); //履行封闭订单的操作 orderService.closeExpireUnpayOrders();
   log.info("守时使命完毕");
 }

在单服务器运转正常,考虑到高可用,事务量激增,架构会演进成集群方式,在同一时刻有多个服务履行一个守时使命,有可能会导致事务紊乱。

处理计划是在使命履行的时分,运用Redis 分布式锁来处理这类问题。

@Scheduled(cron = "0 */2 * * * ? ") public void doTask() {
    log.info("守时使命发动");
    String lockName = "closeExpireUnpayOrdersLock";
    RedisLock redisLock = redisClient.getLock(lockName); //尝试加锁,最多等候3秒,上锁今后5分钟主动解锁 boolean locked = redisLock.tryLock(3, 300, TimeUnit.SECONDS); if(!locked){
        log.info("没有获得分布式锁:{}" , lockName); return;
    } try{ //履行封闭订单的操作 orderService.closeExpireUnpayOrders();
    } finally {
       redisLock.unlock();
    }
    log.info("守时使命完毕");
}

Redis的读写功用极好,分布式锁也比Quartz数据库行级锁更轻量级。当然Redis锁也能够替换成Zookeeper锁,也是同样的机制。

在小型项目中,运用:守时使命结构(Quartz/Spring Schedule)和 分布式锁(redis/zookeeper)有不错的效果。

可是呢?咱们能够发现这种组合有两个问题:

  1. 守时使命在分布式场景下有空跑的状况,并且使命也无法做到分片;
  2. 要想手艺触发使命,必须增加额外的代码才能完结。

3 ElasticJob-Lite 结构

ElasticJob-Lite 定位为轻量级无中心化处理计划,运用 jar 的方式供给分布式使命的和谐服务。
官网架构图

运用内部界说使命类,完成SimpleJob接口,编写自己使命的实际事务流程即可。

public class MyElasticJob implements SimpleJob {
    @Override public void execute(ShardingContext context) { switch (context.getShardingItem()) { case 0: // do something by sharding item 0 break; case 1: // do something by sharding item 1 break; case 2: // do something by sharding item 2 break; // case n: ... }
    }
}

举例:运用A有五个使命需求履行,分别是A,B,C,D,E。使命E需求分红四个子使命,运用布置在两台机器上。

运用A在发动后, 5个使命经过 Zookeeper 和谐后被分配到两台机器上,经过Quartz Scheduler 分开履行不同的使命。

ElasticJob 从实质上来讲 ,底层使命调度仍是经过 Quartz ,比较Redis分布式锁 或者 Quartz 分布式布置 ,它的优势在于能够依靠 Zookeeper 这个大杀器 ,将使命经过负载均衡算法分配给运用内的 Quartz Scheduler容器。

从运用者的角度来讲,是十分简略易用的。但从架构来看,调度器和履行器仍然在同一个运用方JVM内,并且容器在发动后,仍然需求做负载均衡。运用假设频频的重启,不断的去选主,对分片做负载均衡,这些都是相对比较的操作。

另外,ElasticJob 的操控台是比较粗糙的,经过读取注册中心数据展现作业状况,更新注册中心数据修正全局使命装备。

4 中心化流派

中心化的原理是:把调度和使命履行,隔离成两个部分:调度中心和履行器。调度中心模块只需求担任使命调度属性,触发调度指令。履行器接收调度指令,去履行详细的事务逻辑,并且两者都能够进行分布式扩容。

4.1 MQ方式

先谈谈我在艺龙促销团队触摸的第一种中心化架构。

调度中心依靠Quartz集群方式,当使命调度时分,发送音讯到RabbitMQ 。事务运用收到使命音讯后,消费使命信息。

这种模型充分利用了MQ解耦的特性,调度中心发送使命,运用方作为履行器的人物,接收使命并履行。

但这种规划强依靠音讯队列,可扩展性和功用,体系负载都和音讯队列有极大的相关。这种架构规划需求架构师对音讯队列十分熟悉。

4.2 XXL-JOB

XXL-JOB 是一个分布式使命调度平台,其中心规划目标是开发敏捷、学习简略、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。

xxl-job 2.3.0架构图

咱们重点剖析下架构图 :

▍ 网络通讯 server-worker 模型

调度中心和履行器 两个模块之间通讯是 server-worker 方式。调度中心本身便是一个SpringBoot 工程,发动会监听8080端口。

履行器发动后,会发动内置服务( EmbedServer )监听9994端口。这样两边都能够给对方发送指令。

那调度中心怎么知道履行器的地址信息呢 ?上图中,履行器会守时发送注册指令 ,这样调度中心就能够获取在线的履行器列表。

经过履行器列表,就能够依据使命装备的路由策略挑选节点履行使命。常见的路由策略有如下三种:

  • 随机节点履行:挑选集群中一个可用的履行节点履行调度使命。适用场景:离线订单结算。

  • 广播履行:在集群中所有的履行节点分发调度使命并履行。适用场景:批量更新运用本地缓存。

  • 分片履行:依照用户自界说分片逻辑进行拆分,分发到集群中不同节点并行履行,提升资源利用效率。适用场景:海量日志统计。

▍ 调度器

调度器是使命调度体系里边十分中心的组件。XXL-JOB 的早期版别是依靠Quartz。

但在v2.1.0版别中彻底去掉了Quartz的依靠,原来需求创立的 Quartz表也替换成了自研的表。

中心的调度类是:JobTriggerPoolHelper 。调用start办法后,会发动两个线程:scheduleThread 和 ringThread 。

首要 scheduleThread 会守时从数据库加载需求调度的使命,这里从实质上仍是根据数据库行锁确保一同只有一个调度中心节点触发使命调度。

Connection conn = XxlJobAdminConfig.getAdminConfig()
                  .getDataSource().getConnection();
connAutoCommit = conn.getAutoCommit();
conn.setAutoCommit(false);
preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update");
preparedStatement.execute();
# 触发使命调度 (伪代码) for (XxlJobInfo jobInfo: scheduleList) { // 省掉代码 }
# 事务提交
conn.commit();

调度线程会依据使命的「下次触发时刻」,采取不同的动作:

已过期的使命需求马上履行的,直接放入线程池中触发履行 ,五秒内需求履行的使命放到 ringData 目标里。

ringThread 发动后,守时从 ringData 目标里获取需求履行的使命列表 ,放入到线程池中触发履行。


5 自研在巨人的肩膀上

2018年,我有一段自研使命调度体系的经历。

背景是:兼容技能团队自研的RPC结构,技能团队不需求修正代码,RPC注解办法能够托管在使命调度体系中,直接当做一个使命来履行。

自研过程中,研读了XXL-JOB 源码,一同从阿里云分布式使命调度 SchedulerX 吸取了许多养分。

SchedulerX 1.0 架构图

  • Schedulerx-console 是使命调度的操控台,用于创立、办理守时使命。担任数据的创立、修正和查询。在产品内部与 schedulerx-server 交互。
  • Schedulerx-server 是使命调度的服务端,是 Scheduler的中心组件。担任客户端使命的调度触发以及使命履行状况的监测。
  • Schedulerx-client 是使命调度的客户端。每个接入客户端的运用进程便是一个的 Worker。 Worker 担任与 schedulerx-server 建立通信,让 schedulerx-server发现客户端的机器。 并向schedulerx-server注册当前运用地点的分组,这样 schedulerx-server 才能向客户端守时触发使命。

咱们模仿了SchedulerX的模块,架构规划如下图:

我挑选了 RocketMQ 源码的通讯模块 remoting 作为自研调度体系的通讯结构。根据如下两点:

  1. 我对业界大名鼎鼎的 Dubbo不熟悉,而remoting我现已做了多个轮子,我信任自己能够搞定;
  2. 在阅览 SchedulerX 1.0 client 源码中,发现 SchedulerX 的通讯结构和RocketMQ Remoting许多地方都很类似。它的源码里有现成的工程完成,彻底便是一个瑰宝。

我将 RocketMQ remoting 模块去掉名字服务代码,做了必定程度的定制。

在RocketMQ的remoting里,服务端选用 Processor 方式。

调度中心需求注册两个处理器:回调效果处理器CallBackProcessor和心跳处理器HeartBeatProcessor 。履行器需求注册触发使命处理器TriggerTaskProcessor 。

public void registerProcessor( int requestCode,
             NettyRequestProcessor processor,
             ExecutorService executor);

处理器的接口:

public interface NettyRequestProcessor { RemotingCommand processRequest(
                 ChannelHandlerContext ctx,
                 RemotingCommand request) throws Exception; boolean rejectRequest();
}

关于通讯结构来讲,我并不需求关注通讯细节,只需求完成处理器接口即可。

以触发使命处理器TriggerTaskProcessor举例:

搞定网络通讯后,调度器怎么规划 ?终究我仍是挑选了Quartz 集群方式。主要是根据以下几点原因:

  1. 调衡量不大的状况下 ,Quartz 集群方式满足安稳,并且能够兼容原来的XXL-JOB使命;
  2. 运用时刻轮的话,本身没有满足的实践经验,忧虑出问题。 另外,怎么让使命经过不同的调度服务(schedule-server)触发, 需求有一个和谐器。于是想到Zookeeper。但这样的话,又引进了新的组件。
  3. 研发周期不能太长,想快点出效果。

自研版的调度服务花费一个半月上线了。体系运转十分安稳,研发团队接入也很顺利。 调衡量也不大 ,四个月一共挨近4000万到5000万之间的调衡量。

坦率的讲,自研版的瓶颈,我的脑海里经常能看到。 数据量大,我能够搞定分库分表,但 Quartz 集群根据行级锁的方式 ,注定上限不会太高。

为了免除心中的困惑,我写一个轮子DEMO看看可否work:

  1. 去掉外置的注册中心,调度服务(schedule-server)办理会话;
  2. 引进zookeeper,经过zk和谐调度服务。可是HA机制很粗糙,相当于一个使命调度服务运转,另一个服务standby;
  3. Quartz 替换成时刻轮 (参阅Dubbo里的时刻轮源码)。

这个Demo版别在开发环境能够运转,但有许多细节需求优化,仅仅是个玩具,并没有时机运转到出产环境。

最近读阿里云的一篇文章《怎么经过使命调度完成百万规矩报警》,SchedulerX2.0 高可用架构见下图:

文章说到:

每个运用都会做三备份,经过 zk 抢锁,一主两备,如果某台 Server 挂了,会进行 failover,由其他 Server 接收调度使命。

这次自研使命调度体系从架构来讲,并不杂乱,完成了XXL-JOB的中心功用,也兼容了技能团队的RPC结构,但并没有完成工作流以及mapreduce分片。

SchedulerX 在升级到2.0之后根据全新的Akka 架构,这种架构声称完成高功用工作流引擎,完成进程间通信,减少网络通讯代码。

在我调研的开源使命调度体系中,PowerJob也是根据Akka 架构,一同也完成了工作流和MapReduce履行方式。

我对PowerJob十分感兴趣,也会在学习实践后输出相关文章,敬请期待。

6 技能选型

首要咱们将使命调度开源产品和商业产品 SchedulerX 放在一同,生成一张对照表:

Quartz 和 ElasticJob从实质上仍是归于结构的层面。

中心化产品从架构上来讲愈加清晰,调度层面更灵活,能够支撑更杂乱的调度(mapreduce动态分片,工作流)。

XXL-JOB 从产品层面现已做到极简,开箱即用,调度方式能够满足大部分研发团队的需求。简略易用 + 能打,所以十分受大家欢迎。

其实每个技能团队的技能储备不尽相同,面对的场景也不一样,所以技能选型并不能一概而论。

不管是运用哪种技能,在编写使命事务代码时,仍是需求注意两点:

  • 幂等。当使命被重复履行的时分,或者分布式锁失效的时分,程序仍然能够输出正确的效果;
  • 使命不跑了,千万别惊慌。检查调度日志,JVM层面运用Jstack指令检查仓库,网络通讯要增加超时时刻 ,一般能处理大部分问题。

7 写到最终

2015年其实是十分有趣的一年。ElasticJob 和 XXL-JOB 这两种不同流派的使命调度项目都开源了。

在 XXL-JOB 源码里,至今还保留着许雪里教师在开源我国的一条动态截图:

刚写的使命调度结构 ,Web动态办理使命,实时收效,热乎的。没有意外的话,明天正午推送到git.osc上去。哈哈,下楼炒个面加个荷包蛋庆祝下。

看到这个截图,内心深处居然会有一种共情,嘴角不自禁的上扬。

我又想起:2016年,ElasticJob的作者张亮教师开源了sharding-jdbc 。我在github上创立了一个私有项目,参阅sharding-jdbc的源码,自己完成分库分表的功用。第一个类名叫:ShardingDataSource,时刻定格在 2016/3/29。

我不知道怎么界说“有创造力的软件工程师”,但我信任:一个有好奇心,努力学习,乐于分享,乐意去协助他人的工程师,运气肯定不会太差。


觉得对您有协助的话,请给作者一个「点赞」和「保藏」,咱们下期见。