dolphinscheduler 中依赖节点 实现天调度任务依赖周任务

背景

最近在使用dolphinscheduler构建数仓的任务流,有一个场景是天周期调度依赖周周期调度任务。
经过一番搜索后,发现dolphinscheduler有一个任务类型叫做依赖节点,但是ds的文档写的较为简略。
不得已翻看源码,最后发现不支持,需要修改源码支持,本文记录留念。

需求

天调度任务依赖周周期调度。

比如,有一个爬虫任务A,每周一10点运行。

后续任务B,每天运行,但是依赖任务A。

基础介绍

dolhpinscheduler简介

参照官网介绍,一句话减少的话,是大数据场景下的任务调度平台。

Apache DolphinScheduler 是一个分布式易扩展的可视化DAG工作流任务调度开源系统。适用于企业级场景,提供了一个可视化操作任务、工作流和全生命周期数据处理过程的解决方案。

版本

3.1.5

ds 基础相关组件概念介绍

  • 任务
    ds中运行的最小单元,意义如其名字,任务,主要包含任务的一些定义,路径,参数等。
    任务定义: 即任务定义。。。,TaskDefination 对应表 t_ds_task_defination
    任务实例: 即运行的任务记录,TaskInstance,对应表 t_ds_task_instance
  • 工作流
    可以理解为可调度的一组任务,包含了一组任务,任务关系,调度配置。
    工作流定义: ProcessDefination 对应表 t_ds_process_defination
    工作流实例: 即工作流运行记录,ProcessInstance对应 t_ds_process_instance

这里只要重点知道,
1、ds中,任务是不包含具体调度的,具体的调度由工作流维护。
2、一个工作流定义包含多个任务定义
3、一个工作流实例包含多个任务实例

dolphinscheduler中的依赖设计总结。

先说结论

ds中的依赖配置


1、ds中的依赖在任务定义中配置,分为两层,每层通过逻辑【且、或】进行连接,整体的一个结构可以理解为

1
2
3
4
5
6
7
list
逻辑
,list
逻辑
,dependentItem


通过这种设计,进行复杂的组合,如【A且(B或C)】这种依赖组合。

ds中的依赖选择



2、ds的时间周期有月、周、日、时,每个周期有不同的选择,几个典型日期为”today”、”last3Days”,”thisWeek”,”lastMonday”,当天,过去三天,本周,上周,当为范围时间时,会将周期拆解到细化度,
如last3Days,则是添加了3天。
比如传入的是”2023-07-03”
那么得到的是

  • 2023-07-01 00:00:00 2023-07-01 23:59:59
  • 2023-07-02 00:00:00 2023-07-02 23:59:59
  • 2023-07-03 00:00:00 2023-07-03 23:59:59
    最终的依赖选择如下表格所示
周期 具体依赖
本月
本月初
上月
上月末
本周
上周
上周一~上周日
今天
昨天
前2天~前7天
当前小时
前1小时~前24小时

ds 中的依赖类型


ds中的依赖分为分为两类,一对一依赖和一对多依赖。

  • 一对一依赖,指的是只依赖于任务的一个实例
  • 一对多依赖,指的是可以依赖于任务的多个实例。

并依托于这两种方式,实现了同周期依赖、跨周期依赖
可以实现天周期任务依赖周周期任务,也可以实现周周期任务依赖天周期任务。
如上图所示

  • 示例一为同周期依赖,天的示例
  • 示例二为同周期依赖,前三天的示例
  • 示例三为本周依赖示例
  • 示例4为天周期任务依赖周周期任务,上周一依赖示例。

ds中的依赖判断逻辑

3、ds的依赖判断逻辑主要分为3大步

  • 根据时间和工作流code,查询最后一次在业务时间内的工作流实例。
  • 根据最后一次工作流实例和依赖任务code,查询对应的任务id。
  • 根据运行结果,添加依赖逻辑,判断最后的依赖是否完成。

这里举个例子:

有这样两个任务
workFlow1.A和workFlow2.B
其中workFlow2.B依赖workFlow1.A,周期依赖为过去三天。
比如业务日期为“2023-07-03”这天

这么WorkFlow2.B只有当
workFlow1.2023-07-01,workFlow1.2023-07-02,workFlow1.2023-07-03这三个实例都运行后,且这三个工作流实例都包含A这个任务才会执行

dolphinschduelr中的设计缺陷

这里dolphinschduelr的设计有一个致命缺陷,本人已经验证。

因为ds的设计中,调度时间是挂载工作流上的。
而提供的补数功能,支持只运行部分任务。
那么就会导致,部分任务依赖丢失。

举例: 有两个工作流,workFlow1和workFlow2
workFlow1包含A,B两个任务
workFlow2包含一个依赖任务C,该任务依赖workFlow1.A这个任务。

做如下操作:
1、补数workFlow1.A在2023-07-02天的实例,运行workFlow2.C,可以顺利运行
2、补数workFlow1.B在2023-07-02天的实例,运行workFlow2.c,此时workFlow2.C任务会被卡住。

如下图所示。

是不是挺无语的,这是因为findLastProcessInterval获取到的是最后一个运行实例,而我们对workFlow1运行了两次,最后以此为操作2,而操作2中不包含任务A的运行实例,最后导致了运行workFlow2.C任务时失败。

你可以说dolphinscheduler在工作流的设计就是需要整个跑的,这样就能避免,但补数一个不相关的任务,导致另一个任务的依赖被无效,就挺无语的。

大家知道就好,避免踩坑。

dolphinscheduler中的依赖任务详解。

那么,ds中依赖具体的逻辑是怎么样的呢?到底支持哪些功能呢?
带着这些疑惑,一起来看代码,最后再来回答这两个问题。

DependentTaskProcessor


ds中,所有的任务实例都继承自BaseTaskProcessor这个抽象类,该类主要提供了submitTask(),runTask(),killTask()等一系列方法来生产或维护任务运行的实例。

本文的主角,依赖任务DependentTaskProcessor也不例外。

了解ds的依赖逻辑,重点逻辑全在这个DependentTaskProcessor这个类上,看懂了这个类,自然也就了解了ds的依赖逻辑。

DependentTaskProcessor.runTask()

第一站,先来看runTask()是最合适不过的了,了解任务如何运行,自然了解了逻辑

1
2
3
4
5
6
7
8
9
10
11
@Override
public boolean runTask() {
if (!allDependentItemFinished) {
allDependentItemFinished = allDependentTaskFinish();
}
if (allDependentItemFinished) {
getTaskDependResult();
endTask();
}
return true;
}

代码比较简单,主要就是三个方法
allDependentTaskFinish() 用于校验所有的依赖任务是否完成,不涉及依赖组合逻辑关系
getTaskDependResult() 用于获取所有的依赖任务结果,涉及依赖组合逻辑关系
endTask() 结束任务,根据任务依赖结果,将状态写入任务实例

allDependentTaskFinish()

首先来看 allDependentTaskFinish() 这个方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* judge all dependent tasks finish
*
* @return whether all dependent tasks finish
*/
private boolean allDependentTaskFinish() {
boolean finish = true;
for (DependentExecute dependentExecute : dependentTaskList) {
for (Map.Entry<String, DependResult> entry : dependentExecute.getDependResultMap().entrySet()) {
if (!dependResultMap.containsKey(entry.getKey())) {
dependResultMap.put(entry.getKey(), entry.getValue());
// save depend result to log
logger.info("dependent item complete, dependentKey: {}, result: {}, dependentDate: {}",
entry.getKey(), entry.getValue(), dependentDate);
}
}
if (!dependentExecute.finish(dependentDate)) {
finish = false;
}
}
return finish;
}

该方法遍历依赖任务dependentTaskList,将单个任务的依赖结果放入到这个依赖map中,进而判断任务是否完成。

这里看的有点蒙?主要是因为你不了解dependentExecute是什么?dependentTaskList是什么?dependentExecute.getDependResultMap()是什么?让我们结合操作界面,表结构来讲解,看了后自然就理解了。

dependentTaskList和dependentExecute是什么?

dependentTaskList是DependentTaskProcessor的一个属性,该属性是在initDependParameters()初始化依赖节点参数时创建的

1
this.dependentTaskList.add(new DependentExecute(taskModel.getDependItemList(), taskModel.getRelation()));

dependentTaskList就是DependentExecute的集合。
DependentExecute则是依赖的抽象,这里传入了taskModel.getDependItemList()进行构造对象,需要理解taskModel.getDependItemList是什么。

1
for (DependentTaskModel taskModel : dependentParameters.getDependTaskList()) {

taskModel 是dependentParameters.getDependTaskList(),而dependentParameters对应的则是t_ds_task_instance 这个表的 task_params这个参数。
来看个实例。

1
2
3
4
5
6
7
8
{
"localParams": [],
"resourceList": [],
"conditionResult": "null",
"dependence": "{\"relation\":\"AND\",\"dependTaskList\":[{\"relation\":\"AND\",\"dependItemList\":[{\"projectCode\":9803146682528,\"definitionCode\":10102284412192,\"depTaskCode\":10102281977888,\"cycle\":\"day\",\"dateValue\":\"today\",\"state\":null}]}]}",
"switchResult": "null",
"waitStartTimeout": null
}

看完后,来个对象和字段的对应关系
最重要的dependentItemList对应t_ds_task_instance.task_params.dependence.dependItemList
taskModel对应的则是 t_ds_task_instance.task_params.dependence

再来看下产品截面图进行对应加深印象

可以看到,ds在设计上主要包含两部分,依赖的任务选择,以及逻辑【且、或】的选择。
而逻辑上,设计了两层结构用于复杂逻辑的组合设计,比如【A且B且C】,【A且(B或C)】这样的逻辑。

回到代码,所以本质上,所谓的dependentItem对应的就是选择的一个依赖,外层循环对应的是taskModel,是个list,大概可以表示为list[逻辑,list[depentItem组成]],内层循环对应的则是list[depentItem]。

对应完了,再来回看allDependentTaskFinish方法,dependentExecute.finish(dependentDate),本质就是判断一个一个的所选依赖是否完成。

同时也理解了runTask方法中
allDependentTaskFinish() 用于校验所有的依赖任务是否完成,不涉及依赖组合逻辑关系
getTaskDependResult() 用于获取所有的依赖任务结果,涉及依赖组合逻辑关系
这两个的区别,一个判断只判断结果是否完成,一个结果和逻辑组合是否ok。

dependentExecute.finish(dependentDate)

了解了ds中判断完成的主逻辑,再来进入到细节,了解单个依赖的判断.

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* judge depend item finished
*
* @param currentTime current time
* @return boolean
*/
public boolean finish(Date currentTime) {
if (modelDependResult == DependResult.WAITING) {
modelDependResult = getModelDependResult(currentTime);
return false;
}
return true;
}

finished方法非常简单,跳getModelDependResult

DependentExecute.getModelDependResult

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* get model depend result
*
* @param currentTime current time
* @return DependResult
*/
public DependResult getModelDependResult(Date currentTime) {

List<DependResult> dependResultList = new ArrayList<>();

for (DependentItem dependentItem : dependItemList) {
DependResult dependResult = getDependResultForItem(dependentItem, currentTime);
if (dependResult != DependResult.WAITING) {
dependResultMap.put(dependentItem.getKey(), dependResult);
}
dependResultList.add(dependResult);
}
modelDependResult = DependentUtils.getDependResultForRelation(this.relation, dependResultList);
return modelDependResult;
}

也比较简单,遍历DependentExecute.dependItemList,上文中讲过,DependentExecute对应的是内层,即【list[item]】,逻辑,之后通过getDependResultForItem(dependentItem, currentTime)获取单个的运行结果,再结合依赖事项的逻辑关系,返回内层的依赖结果。

DependentExecute.getDependResultForItem

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* get dependent item result
*
* @param item item
* @param currentTime current time
* @return DependResult
*/
private DependResult getDependResultForItem(DependentItem item, Date currentTime) {
String key = item.getKey();
if (dependResultMap.containsKey(key)) {
return dependResultMap.get(key);
}
return getDependentResultForItem(item, currentTime);
}

getDependResultForItem很简单,跳getDependentResultForItem

DependentExecute.getDependentResultForItem

1
2
3
4
5
6
7
8
9
10
11
12
/**
* get dependent item for one dependent item
*
* @param dependentItem dependent item
* @param currentTime current time
* @return DependResult
*/
private DependResult getDependentResultForItem(DependentItem dependentItem, Date currentTime) {
List<DateInterval> dateIntervals =
DependentUtils.getDateIntervalList(currentTime, dependentItem.getDateValue());
return calculateResultForTasks(dependentItem, dateIntervals);
}

核心,重点,单个依赖的判断逻辑。
主要分为两步
1、根据传入的业务日期、依赖周期,计算依赖的DateInterval的集合
2、根据获取到的dateInterval的list,查询单个依赖的状态。

DependentUtils.getDateIntervalList(currentTime, dependentItem.getDateValue())

这个方法根据传入的业务日期、依赖周期,计算依赖的DateInterval的集合
要理解如何获取,需要理解两个属性:currentTime和dateValue以及返回DateInterval是什么

currentTime 是什么

在调度上是业务时间,业务时间这里就不展开了,懂得都懂。
对应的字段为t_ds_process_instance.schedule_time

dateValue是什么?



上的时间周期,周期可以选择月、周、日、时,对应的期限则可以根据不同的周期选择。

DateInterval 是什么?
1
2
3
4
5
6
/**
* date interval class
*/
public class DateInterval {
private Date startTime;
private Date endTime;

很简单,就两个属性,startTIme和endTime。表示一段时间间隔。
这个会在后面查询工作流实例的时候使用,需要业务时间在表示的时间范围内。

DependentUtils.getDateIntervalList(currentTime, dependentItem.getDateValue())

了解了业务日期、dateValue,dateInterval后,我们再来回到代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* get date interval list by business date and date value.
*
* @param businessDate business date
* @param dateValue date value
* @return date interval list by business date and date value.
*/
public static List<DateInterval> getDateIntervalList(Date businessDate, String dateValue) {
List<DateInterval> result = new ArrayList<>();
switch (dateValue) {
case "currentHour":
result = DependentDateUtils.getLastHoursInterval(businessDate, 0);
break;
case "last1Hour":
result = DependentDateUtils.getLastHoursInterval(businessDate, 1);
break;
......
case "last3Days":
result = DependentDateUtils.getLastDayInterval(businessDate, 3);
break;
default:
break;
}
return result;

这里因代码太长,就不写完了,比较简单,根据dateValue,跳转不同逻辑获取List

举几个例子
比如 “today”对应的时间周期选择为:天、当天
“last3Days” 对应的为:天,过去三天

其他以此类推,比较简单,这里我们重点来看下”today”、”last3Days”,”thisWeek”,”lastMonday”这四个来了解ds的依赖周期。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* get today day interval list
* @param businessDate businessDate
* @return DateInterval list
*/
public static List<DateInterval> getTodayInterval(Date businessDate) {

List<DateInterval> dateIntervals = new ArrayList<>();

Date beginTime = DateUtils.getStartOfDay(businessDate);
Date endTime = DateUtils.getEndOfDay(businessDate);
dateIntervals.add(new DateInterval(beginTime, endTime));
return dateIntervals;
}

today非常简单,比如传入”2023-07-03”,那么获取到的为”2023-07-03 00:00:00”到”2023-07-03 23:59:59”这个DateInterval.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* get last day interval list
* @param businessDate businessDate
* @param someDay someDay
* @return DateInterval list
*/
public static List<DateInterval> getLastDayInterval(Date businessDate, int someDay) {

List<DateInterval> dateIntervals = new ArrayList<>();
for (int index = someDay; index > 0; index--) {
Date lastDay = DateUtils.getSomeDay(businessDate, -index);

Date beginTime = DateUtils.getStartOfDay(lastDay);
Date endTime = DateUtils.getEndOfDay(lastDay);
dateIntervals.add(new DateInterval(beginTime, endTime));
}
return dateIntervals;
}

last3Days,则是添加了3天。
比如传入的是”2023-07-03”
那么得到的是

  • 2023-07-01 00:00:00 2023-07-01 23:59:59
  • 2023-07-02 00:00:00 2023-07-02 23:59:59
  • 2023-07-03 00:00:00 2023-07-03 23:59:59

这样一个列表。

1
2
3
4
5
6
7
8
9
/**
* get interval between monday to businessDate of this week
* @param businessDate businessDate
* @return DateInterval list
*/
public static List<DateInterval> getThisWeekInterval(Date businessDate) {
Date mondayThisWeek = DateUtils.getMonday(businessDate);
return getDateIntervalListBetweenTwoDates(mondayThisWeek, businessDate);
}

thisWeek获取的则是周一到今天截止的列表
比如”2023-07-03”为周一,获取到的是

  • 2023-07-03 00:00:00 2023-07-03 23:59:59

传入”2023-07-04”为周二,获取到的是

  • 2023-07-03 00:00:00 2023-07-03 23:59:59
  • 2023-07-04 00:00:00 2023-07-04 23:59:59
    以此类推
1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* get interval on the day of last week
* default set monday the first day of week
* @param businessDate businessDate
* @param dayOfWeek monday:1,tuesday:2,wednesday:3,thursday:4,friday:5,saturday:6,sunday:7
* @return DateInterval list
*/
public static List<DateInterval> getLastWeekOneDayInterval(Date businessDate, int dayOfWeek) {
Date mondayThisWeek = DateUtils.getMonday(businessDate);
Date sunday = DateUtils.getSomeDay(mondayThisWeek, - 1);
Date monday = DateUtils.getMonday(sunday);
Date destDay = DateUtils.getSomeDay(monday, dayOfWeek - 1);
return getDateIntervalListBetweenTwoDates(destDay, destDay);
}

lastMonday获取的则是本周一一天的数据
比如”2023-07-03”为周一,获取到的是

  • 2023-07-03 00:00:00 2023-07-03 23:59:59

传入”2023-07-04”为周二,获取到的是

  • 2023-07-03 00:00:00 2023-07-03 23:59:59
    以此类推

看到这里,相信大家其实有了自己的猜测,ds根据依赖时间周期,计算出具体的依赖时间,之后查询依赖时间范围内的任务流实例,查看是否有符合的依此来判断是否完成。

calculateResultForTasks

了解了getDateIntervalList,我们再来看下 calculateResultForTasks

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* calculate dependent result for one dependent item.
*
* @param dependentItem dependent item
* @param dateIntervals date intervals
* @return dateIntervals
*/
private DependResult calculateResultForTasks(DependentItem dependentItem,
List<DateInterval> dateIntervals) {

DependResult result = DependResult.FAILED;
for (DateInterval dateInterval : dateIntervals) {
ProcessInstance processInstance = findLastProcessInterval(dependentItem.getDefinitionCode(),
dateInterval);
if (processInstance == null) {
return DependResult.WAITING;
}
// need to check workflow for updates, so get all task and check the task state
if (dependentItem.getDepTaskCode() == Constants.DEPENDENT_ALL_TASK_CODE) {
result = dependResultByProcessInstance(processInstance);
} else {
result = getDependTaskResult(dependentItem.getDepTaskCode(), processInstance);
}
if (result != DependResult.SUCCESS) {
break;
}
}
return result;
}

1、遍历前文获取到的dateIntervals,也就是周期对应的具体业务日期。
2、根据definitionCode【工作流定义id】和dateInterval,查询最后一个工作流实例。
3、根据依赖任务code,在第二部获取到的任务实例中查询对应依赖的运行情况

终于来到了最后,这里具体的代码调整就补贴了,直接说最后对应的sql

findLastProcessInterval最后调用ProcessInstanceMapper.queryLastSchedulerProcess()方法,对应的sql为

1
2
3
4
5
6
7
8
select
<include refid="baseSql"/>
from t_ds_process_instance
where process_definition_code=#{processDefinitionCode}
<if test="startTime!=null and endTime != null ">
and schedule_time <![CDATA[ >= ]]> #{startTime} and schedule_time <![CDATA[ <= ]]> #{endTime}
</if>
order by end_time desc limit 1

即根据前文计算的dateInterval,查询schedule_time调度时间在这个周期内的,根据end_tiem倒序。

比如传入”2023-07-03”today,那么运行的则是 scheduler_time>=’2023-07-03 00:00:00’ and scheduler_time<=’2023-07-03 23:59:59’

最后再来看下getDependTaskResult方法,最后核心调用TaskIntanceMapper.findValidTaskListByProcessId方法,对应的sql为

1
2
3
4
5
6
7
select
<include refid="baseSql"/>
from t_ds_task_instance
WHERE process_instance_id = #{processInstanceId}
and flag = #{flag}
order by start_time desc

即在上一步获取到的工作流实例中,查看归属于该工作流的任务实例。

至此,所有的判断逻辑已经梳理完毕

dolphinscheduler 支持周周期任务依赖天任务

了解了ds依赖节点的源码,我们知道了ds对周周期任务依赖天周期任务只支持了上周,回到我们一开始的需求

天调度任务依赖周周期调度。
比如,有一个爬虫任务A,每周一10点运行。
后续任务B,每天运行,但是依赖任务A。

在ds中本周为周期依赖本周一。

因此源码改动非常简单。
1、DependentUtils.getDateIntervalList 增加”thisMonday选项”

1
2
3
4
5
6
public static List<DateInterval> getDateIntervalList(Date businessDate, String dateValue) {
case "thisMonday":
result = DependentDateUtils.getThisWeekInterval(businessDate,1);
break;


2、1、DependentDateUtils增减getThisWeekInterval

1
2
3
4
5
public static List<DateInterval> getThisWeekInterval(Date businessDate, int dayOfWeek) {
Date mondayThisWeek = DateUtils.getMonday(businessDate);
Date destDay = DateUtils.getSomeDay(mondayThisWeek, dayOfWeek - 1);
return getDateIntervalListBetweenTwoDates(destDay, destDay);
}

3、在前端项目中增加选项“本周一”,并传值”thisMonday”

总结

总体来说,dolphinscheduler很优秀,大部分的依赖场景基本都支持了,小部分的通过自己修改源码也可以实现,虽然比不上dataworks等成熟的软件,期待后续更加优秀吧。