0%

引言

指标体系建设,是目前比较流行的一个概念,不管是以指标引导公司业务开展,还是面试,做数仓的基本绕不开这一话题,本文以本人浅薄的理解,尝试解答次问题。

什么是指标体系建设?

从字面理解,指标体系就是一组指标的集合,因此需要理解指标和体系。

什么是指标?

指标(Indicator)是用于衡量、评估或展示某个特定现象、过程、项目或系统性能的量化或定性的标准。是一种抽象定义的数值,用于衡量业务的量化标准。

指标的主要特点包括:

  • 可量化:指标通常可以用数字来表示,这使得它们易于比较和分析。
  • 相关性:指标应与所衡量的目标或问题直接相关,能够提供有价值的信息。
  • 可比较性:指标应该能够在不同时间点、不同群体或不同情境下进行比较。
  • 可操作性:好的指标应该能够指导行动,帮助决策者理解何时以及如何采取行动以改善状况。
  • 时效性:指标应提供及时的信息,以便决策者能够快速响应变化。
  • 简洁性:有效的指标应该简洁明了,避免复杂和冗余,以便用户容易理解。

在构建指标时,重要的是确保它们是有意义的、可实现的,并且能够真实反映业务或研究的核心需求。常见的指标类型包括关键绩效指标(KPIs)、业务指标(BIs)、运营指标(OIs)等,它们可以是财务的、客户满意度的、生产效率的、市场表现的等等。

什么是指标体系?

指标体系是一个由一系列相互关联的指标组成的系统,是用于衡量业务发展状况的集合。旨在提供一个全面的视角,帮助决策者理解业务状况、识别趋势、发现问题并制定策略。

指标体系通常包括以下几个关键特点:

  • 目标导向:指标体系与组织的战略目标和业务目标紧密相关,确保所选指标能够支持这些目标的实现。
  • 层次结构:指标体系往往具有层次性,从宏观的战略层面到微观的执行层面,不同层级的指标相互关联,共同构成一个完整的评估框架。
  • 可量化:指标应该是可量化的,这意味着它们可以通过数值或数据来衡量,以便进行比较和分析。
  • 动态性:随着业务环境和战略的变化,指标体系需要不断地进行调整和优化,以保持其相关性和有效性。
  • 透明度:指标体系应该是透明的,确保所有相关人员都能理解指标的含义和重要性,以及如何使用这些指标来指导行动。
  • 可操作性:指标不仅需要能够反映业务状况,还应该能够指导实际的业务操作,帮助团队采取行动以改善绩效。

构建一个有效的指标体系对于任何组织来说都是至关重要的,因为它可以帮助确保资源的合理分配,提高效率,促进持续改进,并最终实现组织的战略目标。

指标体系的价值是什么

指标体系的价值主要体现在以下几个方面:

  • 全面支持决策:指标体系提供了量化的数据支持,帮助管理层和决策者更准确地理解业务状况,从而做出更加理性和科学的决策。通过分析关键指标,决策者可以洞察业务趋势,预测未来发展方向,并据此调整战略。

  • 指导业务运营:指标体系中的细分指标能够反映用户行为和业务流程的实时数据,为运营团队提供即时反馈。这有助于运营人员了解产品或服务的表现,及时调整策略,优化用户体验,提高运营效率。

  • 驱动用户增长:通过分析用户行为数据,指标体系可以帮助企业深入理解用户需求和偏好,从而设计更有效的用户获取、留存和转化策略。这有助于企业发现新的增长点,实现用户基数的扩大和用户价值的提升。

  • 统一统计口径:在企业内部,不同部门和团队可能会使用不同的数据标准和定义。一个统一的指标体系可以确保数据的一致性和可比性,避免因统计口径不一致而导致的误解和决策失误。

  • 提高分析效率:一个结构化的指标体系可以减少重复的数据分析工作,提高分析效率。通过标准化的指标,分析师可以更快地定位问题,进行深入分析,并提出改进建议。

  • 促进数据驱动文化:指标体系的建立和应用有助于培养企业内部的数据驱动文化,鼓励员工基于数据进行工作,从而提高整体的业务执行效率和创新能力。

  • 支持持续改进:指标体系不仅用于监控当前表现,还可以用于追踪改进措施的效果。通过持续监控关键指标,企业可以评估策略的有效性,并根据反馈进行调整,实现持续的业务优化。

总之,一个有效的指标体系是企业实现数据驱动决策、提升运营效率、促进业务增长的关键工具。它能够帮助企业更好地理解市场和用户,优化资源分配,提高竞争力。

指标体系举例


通过逐层拆解,实现战略到执行的闭环,通过对数据的量化监控、诊断和预测,实现“怎么样”、“为什么”、以及“怎么干”

为什么要进行指标体系建设


经营环境变化,企业需要转向高质量发展,而数字化经营是驱动企业高质量发展的必由之路。


通过数字化经营体系建设,可以实现如下价值:

  • 看数更准更丰富
  • 业务智能化重塑
  • 数据创造新业务

市面上的指标体系建设有哪些

指标体系建设为今年来新出概念,暂无权威,可以参考互联网大厂的建设思路。

如何进行指标体系建设

想要建设好指标体系,一般需要一个优秀的指标平台,如果没有指标平台,可以以文档暂时代替,本书暂时假设有指标平台。

建设指标体系,主要分为三步:

  • 指标体系设计
  • 指标平台建设
  • 指标平台运营

指标体系设计

设计的目标

  1. 映射业务逻辑经营。
  2. 统一指标口径。
  3. 牵引企业数据底座建设。

好的指标体系标准

  1. 业务好理解,具备实际的业务意义。
  2. 各指标不是孤立存在,相互间存在业务关联构成指标体系,系统化反应业务逻辑。
  3. 不只包含结果指标,同时含有过程指标,从而指导业务执行。
  4. 设计的指标可开发,落地。

指标体系的设计流程

  1. 北极星指标设计:北极星指标是最特殊的指标,是公司唯一重要的指标,指引组织成员超同一目标努力。

  2. 指标拆解:将北极星指标层层拆解为更容易落地的指标,本质上是对目标的拆解,即把战略目标按照一定的业务逻辑拆解到更小的业务目标。

  3. 过程指标设计:基于战略目标层层拆解下来的业务目标,制定相应的业务策略,并设计衡量业务策略有效性的过程指标,实现战略目标落地到具体业务策略上。
    一般的方法模型为OSM模型。O代表业务目标(Object),S代表实现目标的业务策略(strategy),M代表衡量业务策略是否有效的指标(Measure)。

  4. 全局框架设计:把公司所有指标分门别类组织管理起来。

指标平台建设

  1. 实现指标资产的统一管理。
  2. 实现灵活高效的指标开发。
  3. 支撑高性能的指标查询。
  4. 支持丰富的指标应用。

指标平台运营


指标平台运营有两方面:第一,在企业内部推广更多的指标应用和用户,让指标平台用起来,持续产生业务价值;第二,根据指标应用需求不断迭代指标体系,同时制定和执行指标管理的制度和流程,保证指标数据质量。

参考文章

最近读了一系列指标建设体系,加深了本人对市面上指标体系建设这一概念的理解。

阅读文章

阅读总结

总的来说,市面上的指标体系建设一共有两个概念,这两个概念指向的东西完全不同

  • 一个是偏产品运营分析的,主要用于拆解业务,解决我们要什么指标,这些指标怎么反应业务,帮助业务解决问题。常用的方法论为osm模型,具体场景模型为ujm模型,aarrr海盗模型,最终体现为报表、看板、仪表盘等分析产品。

  • 还一种就是系统化的,主要用于规范化、流程化建设数据指标,解决的问题是:怎么找指标?怎么建指标?怎么管理指标?常用的方法论为onedata指标拆解,维度建模。最终体现为各种数据平台前后台产品,数据地图,数据血缘,指标管理等。

阅读笔记

背景

最近在使用dolphinscheduler构建数仓的任务流,有一个场景是天周期调度依赖周周期调度任务。
经过一番搜索后,发现dolphinscheduler有一个任务类型叫做依赖节点,但是ds的文档写的较为简略。
不得已翻看源码,最后发现不支持,需要修改源码支持,本文记录留念。

需求

天调度任务依赖周周期调度。

比如,有一个爬虫任务A,每周一10点运行。

后续任务B,每天运行,但是依赖任务A。

基础介绍

dolhpinscheduler简介

参照官网介绍,一句话减少的话,是大数据场景下的任务调度平台。

Apache DolphinScheduler 是一个分布式易扩展的可视化DAG工作流任务调度开源系统。适用于企业级场景,提供了一个可视化操作任务、工作流和全生命周期数据处理过程的解决方案。

版本

3.1.5

ds 基础相关组件概念介绍

  • 任务
    ds中运行的最小单元,意义如其名字,任务,主要包含任务的一些定义,路径,参数等。
    任务定义: 即任务定义。。。,TaskDefination 对应表 t_ds_task_defination
    任务实例: 即运行的任务记录,TaskInstance,对应表 t_ds_task_instance
  • 工作流
    可以理解为可调度的一组任务,包含了一组任务,任务关系,调度配置。
    工作流定义: ProcessDefination 对应表 t_ds_process_defination
    工作流实例: 即工作流运行记录,ProcessInstance对应 t_ds_process_instance

这里只要重点知道,
1、ds中,任务是不包含具体调度的,具体的调度由工作流维护。
2、一个工作流定义包含多个任务定义
3、一个工作流实例包含多个任务实例

dolphinscheduler中的依赖设计总结。

先说结论

ds中的依赖配置


1、ds中的依赖在任务定义中配置,分为两层,每层通过逻辑【且、或】进行连接,整体的一个结构可以理解为

1
2
3
4
5
6
7
list
逻辑
,list
逻辑
,dependentItem


通过这种设计,进行复杂的组合,如【A且(B或C)】这种依赖组合。

ds中的依赖选择



2、ds的时间周期有月、周、日、时,每个周期有不同的选择,几个典型日期为”today”、”last3Days”,”thisWeek”,”lastMonday”,当天,过去三天,本周,上周,当为范围时间时,会将周期拆解到细化度,
如last3Days,则是添加了3天。
比如传入的是”2023-07-03”
那么得到的是

  • 2023-07-01 00:00:00 2023-07-01 23:59:59
  • 2023-07-02 00:00:00 2023-07-02 23:59:59
  • 2023-07-03 00:00:00 2023-07-03 23:59:59
    最终的依赖选择如下表格所示
周期 具体依赖
本月
本月初
上月
上月末
本周
上周
上周一~上周日
今天
昨天
前2天~前7天
当前小时
前1小时~前24小时

ds 中的依赖类型


ds中的依赖分为分为两类,一对一依赖和一对多依赖。

  • 一对一依赖,指的是只依赖于任务的一个实例
  • 一对多依赖,指的是可以依赖于任务的多个实例。

并依托于这两种方式,实现了同周期依赖、跨周期依赖
可以实现天周期任务依赖周周期任务,也可以实现周周期任务依赖天周期任务。
如上图所示

  • 示例一为同周期依赖,天的示例
  • 示例二为同周期依赖,前三天的示例
  • 示例三为本周依赖示例
  • 示例4为天周期任务依赖周周期任务,上周一依赖示例。

ds中的依赖判断逻辑

3、ds的依赖判断逻辑主要分为3大步

  • 根据时间和工作流code,查询最后一次在业务时间内的工作流实例。
  • 根据最后一次工作流实例和依赖任务code,查询对应的任务id。
  • 根据运行结果,添加依赖逻辑,判断最后的依赖是否完成。

这里举个例子:

有这样两个任务
workFlow1.A和workFlow2.B
其中workFlow2.B依赖workFlow1.A,周期依赖为过去三天。
比如业务日期为“2023-07-03”这天

这么WorkFlow2.B只有当
workFlow1.2023-07-01,workFlow1.2023-07-02,workFlow1.2023-07-03这三个实例都运行后,且这三个工作流实例都包含A这个任务才会执行

dolphinschduelr中的设计缺陷

这里dolphinschduelr的设计有一个致命缺陷,本人已经验证。

因为ds的设计中,调度时间是挂载工作流上的。
而提供的补数功能,支持只运行部分任务。
那么就会导致,部分任务依赖丢失。

举例: 有两个工作流,workFlow1和workFlow2
workFlow1包含A,B两个任务
workFlow2包含一个依赖任务C,该任务依赖workFlow1.A这个任务。

做如下操作:
1、补数workFlow1.A在2023-07-02天的实例,运行workFlow2.C,可以顺利运行
2、补数workFlow1.B在2023-07-02天的实例,运行workFlow2.c,此时workFlow2.C任务会被卡住。

如下图所示。

是不是挺无语的,这是因为findLastProcessInterval获取到的是最后一个运行实例,而我们对workFlow1运行了两次,最后以此为操作2,而操作2中不包含任务A的运行实例,最后导致了运行workFlow2.C任务时失败。

你可以说dolphinscheduler在工作流的设计就是需要整个跑的,这样就能避免,但补数一个不相关的任务,导致另一个任务的依赖被无效,就挺无语的。

大家知道就好,避免踩坑。

dolphinscheduler中的依赖任务详解。

那么,ds中依赖具体的逻辑是怎么样的呢?到底支持哪些功能呢?
带着这些疑惑,一起来看代码,最后再来回答这两个问题。

DependentTaskProcessor


ds中,所有的任务实例都继承自BaseTaskProcessor这个抽象类,该类主要提供了submitTask(),runTask(),killTask()等一系列方法来生产或维护任务运行的实例。

本文的主角,依赖任务DependentTaskProcessor也不例外。

了解ds的依赖逻辑,重点逻辑全在这个DependentTaskProcessor这个类上,看懂了这个类,自然也就了解了ds的依赖逻辑。

DependentTaskProcessor.runTask()

第一站,先来看runTask()是最合适不过的了,了解任务如何运行,自然了解了逻辑

1
2
3
4
5
6
7
8
9
10
11
@Override
public boolean runTask() {
if (!allDependentItemFinished) {
allDependentItemFinished = allDependentTaskFinish();
}
if (allDependentItemFinished) {
getTaskDependResult();
endTask();
}
return true;
}

代码比较简单,主要就是三个方法
allDependentTaskFinish() 用于校验所有的依赖任务是否完成,不涉及依赖组合逻辑关系
getTaskDependResult() 用于获取所有的依赖任务结果,涉及依赖组合逻辑关系
endTask() 结束任务,根据任务依赖结果,将状态写入任务实例

allDependentTaskFinish()

首先来看 allDependentTaskFinish() 这个方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* judge all dependent tasks finish
*
* @return whether all dependent tasks finish
*/
private boolean allDependentTaskFinish() {
boolean finish = true;
for (DependentExecute dependentExecute : dependentTaskList) {
for (Map.Entry<String, DependResult> entry : dependentExecute.getDependResultMap().entrySet()) {
if (!dependResultMap.containsKey(entry.getKey())) {
dependResultMap.put(entry.getKey(), entry.getValue());
// save depend result to log
logger.info("dependent item complete, dependentKey: {}, result: {}, dependentDate: {}",
entry.getKey(), entry.getValue(), dependentDate);
}
}
if (!dependentExecute.finish(dependentDate)) {
finish = false;
}
}
return finish;
}

该方法遍历依赖任务dependentTaskList,将单个任务的依赖结果放入到这个依赖map中,进而判断任务是否完成。

这里看的有点蒙?主要是因为你不了解dependentExecute是什么?dependentTaskList是什么?dependentExecute.getDependResultMap()是什么?让我们结合操作界面,表结构来讲解,看了后自然就理解了。

dependentTaskList和dependentExecute是什么?

dependentTaskList是DependentTaskProcessor的一个属性,该属性是在initDependParameters()初始化依赖节点参数时创建的

1
this.dependentTaskList.add(new DependentExecute(taskModel.getDependItemList(), taskModel.getRelation()));

dependentTaskList就是DependentExecute的集合。
DependentExecute则是依赖的抽象,这里传入了taskModel.getDependItemList()进行构造对象,需要理解taskModel.getDependItemList是什么。

1
for (DependentTaskModel taskModel : dependentParameters.getDependTaskList()) {

taskModel 是dependentParameters.getDependTaskList(),而dependentParameters对应的则是t_ds_task_instance 这个表的 task_params这个参数。
来看个实例。

1
2
3
4
5
6
7
8
{
"localParams": [],
"resourceList": [],
"conditionResult": "null",
"dependence": "{\"relation\":\"AND\",\"dependTaskList\":[{\"relation\":\"AND\",\"dependItemList\":[{\"projectCode\":9803146682528,\"definitionCode\":10102284412192,\"depTaskCode\":10102281977888,\"cycle\":\"day\",\"dateValue\":\"today\",\"state\":null}]}]}",
"switchResult": "null",
"waitStartTimeout": null
}

看完后,来个对象和字段的对应关系
最重要的dependentItemList对应t_ds_task_instance.task_params.dependence.dependItemList
taskModel对应的则是 t_ds_task_instance.task_params.dependence

再来看下产品截面图进行对应加深印象

可以看到,ds在设计上主要包含两部分,依赖的任务选择,以及逻辑【且、或】的选择。
而逻辑上,设计了两层结构用于复杂逻辑的组合设计,比如【A且B且C】,【A且(B或C)】这样的逻辑。

回到代码,所以本质上,所谓的dependentItem对应的就是选择的一个依赖,外层循环对应的是taskModel,是个list,大概可以表示为list[逻辑,list[depentItem组成]],内层循环对应的则是list[depentItem]。

对应完了,再来回看allDependentTaskFinish方法,dependentExecute.finish(dependentDate),本质就是判断一个一个的所选依赖是否完成。

同时也理解了runTask方法中
allDependentTaskFinish() 用于校验所有的依赖任务是否完成,不涉及依赖组合逻辑关系
getTaskDependResult() 用于获取所有的依赖任务结果,涉及依赖组合逻辑关系
这两个的区别,一个判断只判断结果是否完成,一个结果和逻辑组合是否ok。

dependentExecute.finish(dependentDate)

了解了ds中判断完成的主逻辑,再来进入到细节,了解单个依赖的判断.

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* judge depend item finished
*
* @param currentTime current time
* @return boolean
*/
public boolean finish(Date currentTime) {
if (modelDependResult == DependResult.WAITING) {
modelDependResult = getModelDependResult(currentTime);
return false;
}
return true;
}

finished方法非常简单,跳getModelDependResult

DependentExecute.getModelDependResult

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* get model depend result
*
* @param currentTime current time
* @return DependResult
*/
public DependResult getModelDependResult(Date currentTime) {

List<DependResult> dependResultList = new ArrayList<>();

for (DependentItem dependentItem : dependItemList) {
DependResult dependResult = getDependResultForItem(dependentItem, currentTime);
if (dependResult != DependResult.WAITING) {
dependResultMap.put(dependentItem.getKey(), dependResult);
}
dependResultList.add(dependResult);
}
modelDependResult = DependentUtils.getDependResultForRelation(this.relation, dependResultList);
return modelDependResult;
}

也比较简单,遍历DependentExecute.dependItemList,上文中讲过,DependentExecute对应的是内层,即【list[item]】,逻辑,之后通过getDependResultForItem(dependentItem, currentTime)获取单个的运行结果,再结合依赖事项的逻辑关系,返回内层的依赖结果。

DependentExecute.getDependResultForItem

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* get dependent item result
*
* @param item item
* @param currentTime current time
* @return DependResult
*/
private DependResult getDependResultForItem(DependentItem item, Date currentTime) {
String key = item.getKey();
if (dependResultMap.containsKey(key)) {
return dependResultMap.get(key);
}
return getDependentResultForItem(item, currentTime);
}

getDependResultForItem很简单,跳getDependentResultForItem

DependentExecute.getDependentResultForItem

1
2
3
4
5
6
7
8
9
10
11
12
/**
* get dependent item for one dependent item
*
* @param dependentItem dependent item
* @param currentTime current time
* @return DependResult
*/
private DependResult getDependentResultForItem(DependentItem dependentItem, Date currentTime) {
List<DateInterval> dateIntervals =
DependentUtils.getDateIntervalList(currentTime, dependentItem.getDateValue());
return calculateResultForTasks(dependentItem, dateIntervals);
}

核心,重点,单个依赖的判断逻辑。
主要分为两步
1、根据传入的业务日期、依赖周期,计算依赖的DateInterval的集合
2、根据获取到的dateInterval的list,查询单个依赖的状态。

DependentUtils.getDateIntervalList(currentTime, dependentItem.getDateValue())

这个方法根据传入的业务日期、依赖周期,计算依赖的DateInterval的集合
要理解如何获取,需要理解两个属性:currentTime和dateValue以及返回DateInterval是什么

currentTime 是什么

在调度上是业务时间,业务时间这里就不展开了,懂得都懂。
对应的字段为t_ds_process_instance.schedule_time

dateValue是什么?



上的时间周期,周期可以选择月、周、日、时,对应的期限则可以根据不同的周期选择。

DateInterval 是什么?
1
2
3
4
5
6
/**
* date interval class
*/
public class DateInterval {
private Date startTime;
private Date endTime;

很简单,就两个属性,startTIme和endTime。表示一段时间间隔。
这个会在后面查询工作流实例的时候使用,需要业务时间在表示的时间范围内。

DependentUtils.getDateIntervalList(currentTime, dependentItem.getDateValue())

了解了业务日期、dateValue,dateInterval后,我们再来回到代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* get date interval list by business date and date value.
*
* @param businessDate business date
* @param dateValue date value
* @return date interval list by business date and date value.
*/
public static List<DateInterval> getDateIntervalList(Date businessDate, String dateValue) {
List<DateInterval> result = new ArrayList<>();
switch (dateValue) {
case "currentHour":
result = DependentDateUtils.getLastHoursInterval(businessDate, 0);
break;
case "last1Hour":
result = DependentDateUtils.getLastHoursInterval(businessDate, 1);
break;
......
case "last3Days":
result = DependentDateUtils.getLastDayInterval(businessDate, 3);
break;
default:
break;
}
return result;

这里因代码太长,就不写完了,比较简单,根据dateValue,跳转不同逻辑获取List

举几个例子
比如 “today”对应的时间周期选择为:天、当天
“last3Days” 对应的为:天,过去三天

其他以此类推,比较简单,这里我们重点来看下”today”、”last3Days”,”thisWeek”,”lastMonday”这四个来了解ds的依赖周期。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* get today day interval list
* @param businessDate businessDate
* @return DateInterval list
*/
public static List<DateInterval> getTodayInterval(Date businessDate) {

List<DateInterval> dateIntervals = new ArrayList<>();

Date beginTime = DateUtils.getStartOfDay(businessDate);
Date endTime = DateUtils.getEndOfDay(businessDate);
dateIntervals.add(new DateInterval(beginTime, endTime));
return dateIntervals;
}

today非常简单,比如传入”2023-07-03”,那么获取到的为”2023-07-03 00:00:00”到”2023-07-03 23:59:59”这个DateInterval.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* get last day interval list
* @param businessDate businessDate
* @param someDay someDay
* @return DateInterval list
*/
public static List<DateInterval> getLastDayInterval(Date businessDate, int someDay) {

List<DateInterval> dateIntervals = new ArrayList<>();
for (int index = someDay; index > 0; index--) {
Date lastDay = DateUtils.getSomeDay(businessDate, -index);

Date beginTime = DateUtils.getStartOfDay(lastDay);
Date endTime = DateUtils.getEndOfDay(lastDay);
dateIntervals.add(new DateInterval(beginTime, endTime));
}
return dateIntervals;
}

last3Days,则是添加了3天。
比如传入的是”2023-07-03”
那么得到的是

  • 2023-07-01 00:00:00 2023-07-01 23:59:59
  • 2023-07-02 00:00:00 2023-07-02 23:59:59
  • 2023-07-03 00:00:00 2023-07-03 23:59:59

这样一个列表。

1
2
3
4
5
6
7
8
9
/**
* get interval between monday to businessDate of this week
* @param businessDate businessDate
* @return DateInterval list
*/
public static List<DateInterval> getThisWeekInterval(Date businessDate) {
Date mondayThisWeek = DateUtils.getMonday(businessDate);
return getDateIntervalListBetweenTwoDates(mondayThisWeek, businessDate);
}

thisWeek获取的则是周一到今天截止的列表
比如”2023-07-03”为周一,获取到的是

  • 2023-07-03 00:00:00 2023-07-03 23:59:59

传入”2023-07-04”为周二,获取到的是

  • 2023-07-03 00:00:00 2023-07-03 23:59:59
  • 2023-07-04 00:00:00 2023-07-04 23:59:59
    以此类推
1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* get interval on the day of last week
* default set monday the first day of week
* @param businessDate businessDate
* @param dayOfWeek monday:1,tuesday:2,wednesday:3,thursday:4,friday:5,saturday:6,sunday:7
* @return DateInterval list
*/
public static List<DateInterval> getLastWeekOneDayInterval(Date businessDate, int dayOfWeek) {
Date mondayThisWeek = DateUtils.getMonday(businessDate);
Date sunday = DateUtils.getSomeDay(mondayThisWeek, - 1);
Date monday = DateUtils.getMonday(sunday);
Date destDay = DateUtils.getSomeDay(monday, dayOfWeek - 1);
return getDateIntervalListBetweenTwoDates(destDay, destDay);
}

lastMonday获取的则是本周一一天的数据
比如”2023-07-03”为周一,获取到的是

  • 2023-07-03 00:00:00 2023-07-03 23:59:59

传入”2023-07-04”为周二,获取到的是

  • 2023-07-03 00:00:00 2023-07-03 23:59:59
    以此类推

看到这里,相信大家其实有了自己的猜测,ds根据依赖时间周期,计算出具体的依赖时间,之后查询依赖时间范围内的任务流实例,查看是否有符合的依此来判断是否完成。

calculateResultForTasks

了解了getDateIntervalList,我们再来看下 calculateResultForTasks

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* calculate dependent result for one dependent item.
*
* @param dependentItem dependent item
* @param dateIntervals date intervals
* @return dateIntervals
*/
private DependResult calculateResultForTasks(DependentItem dependentItem,
List<DateInterval> dateIntervals) {

DependResult result = DependResult.FAILED;
for (DateInterval dateInterval : dateIntervals) {
ProcessInstance processInstance = findLastProcessInterval(dependentItem.getDefinitionCode(),
dateInterval);
if (processInstance == null) {
return DependResult.WAITING;
}
// need to check workflow for updates, so get all task and check the task state
if (dependentItem.getDepTaskCode() == Constants.DEPENDENT_ALL_TASK_CODE) {
result = dependResultByProcessInstance(processInstance);
} else {
result = getDependTaskResult(dependentItem.getDepTaskCode(), processInstance);
}
if (result != DependResult.SUCCESS) {
break;
}
}
return result;
}

1、遍历前文获取到的dateIntervals,也就是周期对应的具体业务日期。
2、根据definitionCode【工作流定义id】和dateInterval,查询最后一个工作流实例。
3、根据依赖任务code,在第二部获取到的任务实例中查询对应依赖的运行情况

终于来到了最后,这里具体的代码调整就补贴了,直接说最后对应的sql

findLastProcessInterval最后调用ProcessInstanceMapper.queryLastSchedulerProcess()方法,对应的sql为

1
2
3
4
5
6
7
8
select
<include refid="baseSql"/>
from t_ds_process_instance
where process_definition_code=#{processDefinitionCode}
<if test="startTime!=null and endTime != null ">
and schedule_time <![CDATA[ >= ]]> #{startTime} and schedule_time <![CDATA[ <= ]]> #{endTime}
</if>
order by end_time desc limit 1

即根据前文计算的dateInterval,查询schedule_time调度时间在这个周期内的,根据end_tiem倒序。

比如传入”2023-07-03”today,那么运行的则是 scheduler_time>=’2023-07-03 00:00:00’ and scheduler_time<=’2023-07-03 23:59:59’

最后再来看下getDependTaskResult方法,最后核心调用TaskIntanceMapper.findValidTaskListByProcessId方法,对应的sql为

1
2
3
4
5
6
7
select
<include refid="baseSql"/>
from t_ds_task_instance
WHERE process_instance_id = #{processInstanceId}
and flag = #{flag}
order by start_time desc

即在上一步获取到的工作流实例中,查看归属于该工作流的任务实例。

至此,所有的判断逻辑已经梳理完毕

dolphinscheduler 支持周周期任务依赖天任务

了解了ds依赖节点的源码,我们知道了ds对周周期任务依赖天周期任务只支持了上周,回到我们一开始的需求

天调度任务依赖周周期调度。
比如,有一个爬虫任务A,每周一10点运行。
后续任务B,每天运行,但是依赖任务A。

在ds中本周为周期依赖本周一。

因此源码改动非常简单。
1、DependentUtils.getDateIntervalList 增加”thisMonday选项”

1
2
3
4
5
6
public static List<DateInterval> getDateIntervalList(Date businessDate, String dateValue) {
case "thisMonday":
result = DependentDateUtils.getThisWeekInterval(businessDate,1);
break;


2、1、DependentDateUtils增减getThisWeekInterval

1
2
3
4
5
public static List<DateInterval> getThisWeekInterval(Date businessDate, int dayOfWeek) {
Date mondayThisWeek = DateUtils.getMonday(businessDate);
Date destDay = DateUtils.getSomeDay(mondayThisWeek, dayOfWeek - 1);
return getDateIntervalListBetweenTwoDates(destDay, destDay);
}

3、在前端项目中增加选项“本周一”,并传值”thisMonday”

总结

总体来说,dolphinscheduler很优秀,大部分的依赖场景基本都支持了,小部分的通过自己修改源码也可以实现,虽然比不上dataworks等成熟的软件,期待后续更加优秀吧。

引言

本文为本人参考资料及自己日常经验总结的数据倾斜相关的知识,主要是利用输出查漏补缺,加深自己的印象。文章脑图如下

数据倾斜的定义

Data skew refers to a phenomenon in distributed systems where the distribution of data across nodes or partitions is uneven. This can result in some nodes having a much larger amount of data to process than others, leading to unequal distribution of workload, longer processing times and reduced performance.
数据偏斜是指分布式系统中数据在节点或分区间分布不均的现象。这可能导致一些节点处理的数据量比其他节点多得多,从而导致工作负载分配不均,处理时间增加以及性能下降。
– by chatGPT

这是以上chatGPT对于数据倾斜的解释,解释的还是非常到位的。

大数据体系下的分布式系统核心理念是分治。

举例来说,如果使用一台服务器处理3T数据,那么耗时是3小时。

那么将3T数据,拆分成3份,没份1T,交由3台服务器去处理,那么理想情况下耗时为1小时,效率大大提高,这便是分布式。

数据倾斜的原因

shuffle操作

了解数据倾斜的原因前,我们需要了解下shuffle操作,这在mapreduce计算框架及spark中都涉及的操作。
//TODO 完善 shuffle概念

了解了shuffle后,我们可以将数据倾斜的原因归为3类

  • map端倾斜
  • 双端倾斜(join倾斜)
  • reduce段倾斜

map端倾斜

map端倾斜的原因是主要就是上游的文件大小不均匀,存在特别极大的文件。

比如有以下五个文件

文件名 文件大小
file1 100M
file2 100M
file3 100M
file4 100M
file5 1G

假如起的是5个map任务,那么file5对应的那个map任务理论上耗时是其他4个的10倍。

该类的解决方法比较简单。

如果文件是可分割的,那么设定单个文件读入的上限,以便将单个文件切割成多个任务。

或者直接混洗数据,将数据重新分布,方便后续使用。

join倾斜

reduce倾斜

数据倾斜的现象

数据倾斜的解决方案

参考

./change-scala-version.sh 2.12

mvn -Pyarn -Phadoop-3.3 -Dhadoop.version=3.3.1 -Phive -Phive-thriftserver -DskipTests clean package -Dmaven.test.skip=true

mvn -Pyarn -Phadoop-3.3 -Dhadoop.version=3.3.1 -Phive -Phive-thriftserver -DskipTests clean package

mvn -e -X -Pscala-2.12 -Pyarn -Phadoop-3.3 -Dhadoop.version=3.3.1 -Phive -Phive-thriftserver -DskipTests clean package

mvn -e -X -Pscala-2.12 -Pyarn -Phadoop-3 -Dhadoop.version=3.3.1 -Phive -Phive-thriftserver -DskipTests clean package

mvn -pl sql/catalyst -e -X -Pyarn -Phadoop-3.3 -Dhadoop.version=3.3.1 -Phive -Phive-thriftserver -DskipTests clean package

mvn -pl common/tags -e -X -Pyarn -Phadoop-3.3 -Dhadoop.version=3.3.1 -Phive -Phive-thriftserver -DskipTests install

[INFO] Spark Project Parent POM ……………………… SUCCESS [ 16.689 s]
[INFO] Spark Project Tags …………………………… SUCCESS [ 1.551 s]
[INFO] Spark Project Sketch …………………………. SUCCESS [ 1.460 s]
[INFO] Spark Project Local DB ……………………….. SUCCESS [ 2.367 s]
[INFO] Spark Project Networking ……………………… SUCCESS [ 2.333 s]
[INFO] Spark Project Shuffle Streaming Service ………… FAILURE [ 3.062 s]
[INFO] Spark Project Unsafe …………………………. SKIPPED
[INFO] Spark Project Launcher ……………………….. SKIPPED
[INFO] Spark Project Core …………………………… SKIPPED
[INFO] Spark Project ML Local Library ………………… SKIPPED
[INFO] Spark Project GraphX …………………………. SKIPPED
[INFO] Spark Project Streaming ………………………. SKIPPED
[INFO] Spark Project Catalyst ……………………….. SKIPPED
[INFO] Spark Project SQL ……………………………. SKIPPED
[INFO] Spark Project ML Library ……………………… SKIPPED
[INFO] Spark Project Tools ………………………….. SKIPPED
[INFO] Spark Project Hive …………………………… SKIPPED
[INFO] Spark Project REPL …………………………… SKIPPED
[INFO] Spark Project Assembly ……………………….. SKIPPED
[INFO] Kafka 0.10+ Token Provider for Streaming ……….. SKIPPED
[INFO] Spark Integration for Kafka 0.10 ………………. SKIPPED
[INFO] Kafka 0.10+ Source for Structured Streaming …….. SKIPPED
[INFO] Spark Project Examples ……………………….. SKIPPED
[INFO] Ivan Spark Project Examples …………………… SKIPPED
[INFO] Spark Integration for Kafka 0.10 Assembly ………. SKIPPED
[INFO] Spark Avro ………………………………….. SKIPPED

mvn -rf :spark-catalyst_2.12 -e -X -Pyarn -Phadoop-3.3 -Dhadoop.version=3.3.1 -Phive -Phive-thriftserver -DskipTests clean package

[ERROR] mvn -rf :spark-catalyst_2.12

mvn dependency:get -Dartifact=org.eclipse.m2e:lifecycle-mapping:1.0.0:jar:sources

mvn dependency:get -Dartifact=org.apache.maven.plugins:maven-downloader-plugin:1.0:jar:sources

1) org.apache.maven.plugins:maven-downloader-plugin:jar:1.0

[ERROR] 2) org.eclipse.m2e:lifecycle-mapping:jar:1.0.0

mvn -e -X idea:module

引言

最近打算给自己的量化交易实现全自动,交易的api本人是通过easytrader这个开源项目实现的,这个项目通过自动化控制客户端进而实现交易的程序控制。

但比较可以的是,easytrader在21年3月14日后这个开源软件就不再更新了,而且本人的券商账户使用的是东方财富,该项目尚未支持。

作为一个程序员,项目不支持,当然是自己实现了。由于东方财富支持web端交易,因此可以通过selenium 实现web端的自动化,其他关于下单、撤单、持仓、当日成交等自动化本人均已顺利实现,然而东方财富的登录有验证码,自动登录暂时遇到困难。

东方财富的验证码样例如下:

由于数字倾斜和横线干扰,直接使用tesseract进行识别准确率低。

本文就是记录解决这一问题的方案及过程,烂笔头胜过好记性吗。

解决方案

问题已经抛出,现在轮到说解决方案了。
验证码破解,可以说是爬虫技术中比较平常遇到的问题了,上网搜了下,个人理解的解决方案有两种

  1. 找到第三方现成的识别技术进行识别
  2. 自己训练识别

方案的比较

第三方现成技术

关于第三方现成的识别,其识别技术叫做OCR (Optical Character Recognition) 是一种技术,用于从图像或扫描的文档中识别文本。它通过分析图像中的字符形状和比例,将其转换为可编辑的电子文本。

这里本人不是此方向的,感兴趣的可以自己搜索,常见的比如百度的通用文字识别

第三方现成技术其有点自然是省心省力,直接调用api即可。
缺点吗,费钱,同时只支持通用场景,如果是非常特殊的可能无法支持。

自己训练识别

这里,本人直接使用的是tesseract识别验证码,而tesseract自带模型训练,同时还有jTessBoxEditor辅助训练工作,因此就直接使用了该模型训练。

至于其他的模型训练,大家可以自行搜索。

自己训练的有点自然是定制化,可以识别各种场景,准确率高。
缺点则是耗时,毕竟要自行训练,自己准备数据,同时对于技术存在要求,新手小白可能无法实现。

比较结论

作为一穷二白的程序员,咱除了没钱,时间有的是,生命在于折腾,因此本人选择自己训练。

具体操作

好了,来到了具体操作环节,模型训练的基本流程为:
环境准备–>测试数据准备–>模型训练–>模型使用。

简单来说就是使用tesseract自带的模型训练,同时使用jTessBoxEditor可视化调整错误数据训练验证码识别,最后进行识别,下面就开始。

工具基础介绍

tesseract

Tesseract 是一个开源的OCR(光学字符识别)引擎,用于从图像和PDF文件中识别文本。它是Google开发的,是目前功能最强大的OCR引擎之一,支持多种语言。它可以运行在多种操作系统上,并且可以被集成到各种应用程序中,以实现文本识别功能。

详细信息大家可以上Tesseract的github的readme进行查看

jTessBoxEditor

jTessBoxEditor 是一个开源的OCR工具,用于创建和编辑Tesseract字符识别引擎的语言训练数据。它允许用户创建语言模型,以便Tesseract引擎能够识别特定语言的文本。它还提供了一种方便的图形界面,可以编辑语言模型的数据,如字符定义和字符图像。

详细信息可以上jTessBoxEditor的github进行查看

环境准备

  • python 3.7.9
  • tesseract 5.3.0
    jTessBoxEdit自带了tesseract,直接使用自带的,否则可能存在不兼容异常的情况。
  • jTessBoxEditor 2.4.0
    下载地址: https://github.com/nguyenq/jTessBoxEditor/releases
    环境变量啥的大家自己配置

测试数据准备

验证码下载

本次的目标是东方财富的登录验证码,测试数据咱也不需要多,100张即可,因此也就不自动化了,直接登录网站 https://jywg.18.cn/ 手动下载100张验证码。

本人存入的目录为org_imgs,如下图。

验证码的降噪处理

验证码的颜色,干扰线等特征会增加训练复杂度,因此需要将这些特征去除

本人只是简单的将颜色去除,具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from PIL import Image
import os
path = os.path.dirname(__file__)
origin_path = path + '/captcha/eastmoney/org_imgs/'
new_path = path + '/captcha/eastmoney/clean_imgs/' #用来存放处理好的图片

#从100张图片中提取出字符样本
for image in os.listdir(origin_path)[:100]:

im = Image.open(origin_path+image)
#根据颜色,提取出每一个字符,重新放置到一个新建的白色背景image对象上。每个image只放一个字符。
char_dict = {}
im2 = Image.new('RGB', im.size, (255, 255, 255))
for x in range(im.size[0]): # 宽
for y in range(im.size[1]): # 高
point_color=im.getpixel((x, y)) # 获取rgb 像素
color_sum=point_color[0]+point_color[1]+point_color[2] # 计算总像素,255,255,255为白色,即背景色,合越小越深
if color_sum<=600: # 简单粗暴,小于600的认为是数字的言责
im2.putpixel((x, y), (0, 0, 0)) # 重新写入黑色。
else:
im2.putpixel((x, y), (255, 255, 255))
im2.save(new_path+ image.replace('jpg','tif')) # 直接将jpg转化为为tif
print('成功处理图片{}'.format(image))

处理好的图片效果如下所示。

合并数据准备

  1. 打开jTessBoxEditor,双击train.bat即可,界面如下:

  2. 选择【tools】->【merge tiff】,打开合并成tiff界面,文件类型根据自己图片类型选择,本人为tif,文件按住shift合并选择或crl健多个选择,选择需要合并的原始图片,本人为所有。

  3. 点击打开,进入保存选项,文件名有具体格式,具体为[lang].[fontname].exp[num].tif,其中

  • lang表示语言名称;指的是使用tesseract指定的语言,由于我们的语言是自己训练的,因此命名需要我们取。
  • fontname表示字体名称;
  • num表示序号

本人的保存文件命名为
ivan.eastmoney.exp0.tif

点击保存即可,此时即可生成 ivan.eastmoeny.exp0.tif。

  1. 生成tif图片的box文件。
    输入如下命令,生成tif对应的box文件。
    1
    tesseract ivan.eastmoeny.exp0.tif ivan.eastmoeny.exp0 batch.nochop makebox
    该命令会生成box文件,本人生成的文件为 ivan.eastmoeny.exp0.box
    box文件和tif是一对的,可以简单理解成标注了tif图片位置和解析结果的文件,命名也需要一致,且在同一个文件夹下。

命令的结果如下

enptyPage 表示没有解析结果,这是因为tesseract默认使用的psm为3,表示的是全自动页面分割,但没有 OSD。

这里没有结果方便后面我们重构box位置,如果想要有结果,可以加上 –psm 6 参数,解析文本。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Page segmentation modes:
0 Orientation and script detection (OSD) only.
1 Automatic page segmentation with OSD.
2 Automatic page segmentation, but no OSD, or OCR. (not implemented)
3 Fully automatic page segmentation, but no OSD. (Default)
4 Assume a single column of text of variable sizes.
5 Assume a single uniform block of vertically aligned text.
6 Assume a single uniform block of text.
7 Treat the image as a single text line.
8 Treat the image as a single word.
9 Treat the image as a single word in a circle.
10 Treat the image as a single character.
11 Sparse text. Find as much text as possible in no particular order.
12 Sparse text with OSD.
13 Raw line. Treat the image as a single text line,
bypassing hacks that are Tesseract-specific.

测试数据调整

打开之前的jTessBoxEditor软件,选择【box editor】->【open】,然后选择直接合并好的tif文件。

打开后的一个图片如下所示,此时完全没有识别,同时可以看到一共有98个图片。

我们点击insert插入四个box。

然后通过选中左边的1,2,3,4编号box,再通过右边的x,y调整box将box覆盖到我们需要识别的数字上,最后在char中填入正确的结果,选择完成后点击保存按钮。

之后重新打开tif文件,此时点击下一页,会发现box已经选好,只需要填入正确结果即可,此时会有一些工作量,将98个图片正确结果都填入,最后保存。

这样,我们的测试数据就准备好了。

模型训练

  1. 生成lstmf文件
    这一步,我们通过TIF图像文件和box盒子文件生成进行LSTM训练所需的lstmf文件,使用到的命令如下所示:

    1
    tesseract ivan.eastmoney.exp0.tif ivan.eastmoney -l eng --psm 6 lstm.train

    运行结果如下

    运行之后,我们的文件夹下会生成一个名为ivan.eastmoney.exp0.lstmf的文件。

  2. 提取语言的LSTM文件

  3. 我们接着从tesseract_best(链接:https://github.com/tesseract-ocr/tessdata_best)下载相应语言的traineddata文件。
    在前面几步,我们选用的语言是英文,所以在这里选择eng.traineddata文件。
    下载好之后,我们需要从中提取中它的LSTM文件,使用的命令如下所示:

    1
    combine_tessdata -e eng.traineddata eng.lstm

    运行上述命令,我们的文件夹下会生成一个名为eng.lstm的文件。

  4. 新增 eng.training_files.txt文件。
    新建eng.training_files.txt的文本文件,在里面填入第1步生成的lstmf文件的绝对路径。

  5. 训练
    在完成了上述步骤之后,我们基本上可以开始LSTM的训练了。使用下面的命令就可以开始训练了:

    1
    2
    3
    4
    5
    6
    7
    lstmtraining \
    --model_output="output\output" \
    --continue_from="eng.lstm" \
    --train_listfile="eng.training_files.txt" \
    --traineddata="eng.traineddata" \
    --debug_interval -1 \
    --max_iterations 4000

    各个参数具体的含义,可以参考Tesseract官方对于如何进行训练的说明(链接:https://tesseract-ocr.github.io/tessdoc/)

    运行训练不到10分钟就完成了(具体的训练时间要视训练集的大小和训练次数决定)。待Tesseract训练完成之后,在output文件夹下会有很多checkpoint记录文件。

  6. 生成 traindata
    们接着使用命令把这些文件和之前的eng.traineddata合成为新的traineddata文件,使用命令如下:

    1
    2
    3
    4
    5
    lstmtraining \
    --stop_training \
    --continue_from="output\output_checkpoint" \
    --traineddata="eng.traineddata" \
    --model_output="output\ivan.traineddata"


    这一步过后,就会生成我们梦寐以求的triandata.

模型使用。

将生成的traindata文件,本人的为ivan.traineddata放到tesseract安装目录下的tessdata目录下

重新下载一个验证码进行测试,本次的验证码如下

之后使用命令进行验证

1
tessseract --psm 6 yzm_test.jpg stdout -l ivan

其中

  • –psm 6 表示 tesseract以文本块进行解析
  • stdout 表示在控制台输出
  • -l ivan 表示解析的语言为ivan,即之前放入tessdata虾的ivan.traineddata文件

最终结果如下

可以看到,结果正确

遇到问题

Class->NumConfigs == this->fontset_table_.at(Class->font_set_id).size():Error:Assert failed:in file

这个问题查了半天也没搞定,最后将tessract换成jTessBoxEditor自带的tesseract就可以了,估计是啥兼容的问题。

使用 box.train 无效

网上很多教程是使用box.train的,如这篇用jTessBoxEditor训练tesseract模型,本人测试下来无效,估计是训练方式的问题,反正使用lstm训练有效,就不纠结了。

总结

前前后后花了整整3天搞定了验证码识别,从一开始什么都不知道,到后面不断查看文章,操作,查看文档,到最后对tesseract,jTessBoxEdotr是什么及如何操作有所了解,感谢互联网,感谢开源软件,让我这么一个小白都可以搞定验证码的识别。

当人,本人只是从应用层面有所了解,对tesseract整体的设计、源码、lstm模型了解是不深入的,此处就不卷了,能用就行了。

总的来说,使用tesseract训练数据定制化识别一些简单的文字及数字是一个可能及方便的方案。

参考资料

真实场景下的tesseract神经网络训练识别图片验证码

问题

之前本人一直认为多事务事实表和累积快照事实表差不多,甚至是一样的,最近再次读了<**大数据之路**>这本书,发现是本人肤浅了,是本人以前对事实表的理解不到位。

为什么会觉得多事务事实表和累积快照事实表差不多?

首先问下,觉得这两个表差不多,合理吗?丢人吗?
我的回答是,非常的合理。
因为从一些特征上来说,这两个表非常相似。
多事务事实表和累积快照事实表有以下相同的特点

  • 包含多事务,因此包含了多个业务时间,业务度量
  • 事实表的粒度选择,两者往往相同
  • 事实表的度量选择,累积快照基本会包含多事务事实表。
  • 从表结构上来看,两者甚至可以是一样的。

以书上的例子,最典型的交易,包含下单->支付->确认收货时间这三个节点来说。

多事务事实表和累积快照事实表设计的表结构是一模一样的,如下所示

日期 子订单id 下单时间 支付时间 确认收货时间 相关事实

连表结构都一样了,觉得这两个表没啥区别也就没啥好奇怪的了。

那么,多事务事实表和累积快照事实表的区别到底是什么?

答案揭晓,最核心的一点是
事务事实表记录事务发生时的状态,对于实体的某一实例不再更新
累积事实表则对实体的某一实例定期更新
简单来说,就是多事务事实表只会新增记录,不会修改记录,而累积事实表则是修改记录。

举个例子

还是以上文的交易,包含下单->支付->确认收货时间这三个节点来说。
表结构如下

日期 子订单id 下单时间 支付时间 确认收货时间 相关事实

假设有一笔订单,1号下单,2号支付,3号确认收货。

1号下单

此时,多事务事实表会生成已一条记录,如下所示

日期 子订单id 下单时间 支付时间 确认收货时间 相关事实
2023-01-01 order1 2023-01-01 13:41:41 NULL NULL

累积快照事实表也会生成一条记录,如下所示

日期 子订单id 下单时间 支付时间 确认收货时间 相关事实
2023-01-01 order1 2023-01-01 13:41:41 NULL NULL

似乎没啥区别,不要急,继续往下看

2号支付

此时,多事务事实表会再生成已一条记录,order1总计会变成2条记录,如下所示

日期 子订单id 下单时间 支付时间 确认收货时间 相关事实
2023-01-01 order1 2023-01-01 13:41:41 NULL NULL
2023-01-02 order1 2023-01-01 13:41:41 2023-01-02 09:15:15 NULL

累积快照事实表则不会生成,而是在原先的记录上修改,如下所示

日期 子订单id 下单时间 支付时间 确认收货时间 相关事实
2023-01-02 order1 2023-01-01 13:41:41 2023-01-02 09:15:15 NULL

好像有那么点区别了,再往下看

3号确认收货

此时,多事务事实表会再生成已一条记录,order1总计会变成3条记录,如下所示

日期 子订单id 下单时间 支付时间 确认收货时间 相关事实
2023-01-01 order1 2023-01-01 13:41:41 NULL NULL
2023-01-02 order1 2023-01-01 13:41:41 2023-01-02 09:15:15 NULL
2023-01-03 order1 2023-01-01 13:41:41 2023-01-02 09:15:15 2023-01-03 15:15:15

累积快照事实表则继续在原先的记录上修改,不会生成记录,如下所示

日期 子订单id 下单时间 支付时间 确认收货时间 相关事实
2023-01-03 order1 2023-01-01 13:41:41 2023-01-02 09:15:15 2023-01-03 15:15:15

总结

多事务事实表和累积快照事实表都包含多个业务过程

从产生角度来说
多事务事实表的选择,往往是因为多业务过程之间有高度相似性,同时拥有相同的粒度和维度,从成本和下游使用易用性考虑从而建设,关注的仍然是事务的本身。
而累计快照事实表的选择,往往研究的是事件之间的时间间隔。

从表现特点来说
事务事实表记录事务发生时的状态,对于实体的某一实例不再更新
累积事实表则对实体的某一实例定期更新

背景

处于降本增效的良好目标,最近公司在将数仓平台从自建的cdh底层往阿里云上的dataworks迁。

在迁移sql脚本时,踩了很多坑,发现阿里云的maxcomputer的sql语法和hive及spark的有很多区别,本文用于汇总所有踩坑记录

硬件&版本

  • maxcomputer
    2.0引擎,hive兼容模式
  • spark sql
    spark 2.3
  • hive sql
    hive 1.2

阿里云中关于maxcomputer和hive的语法差异文档

参考 与其他SQL语法的差异

支持标准差异

hive

在hive的官方文档Home中,对于sql的支持标准如下。

Hive provides standard SQL functionality, including many of the later SQL:2003, SQL:2011, and SQL:2016 features for analytics.

Hive 提供标准 SQL 功能,包括许多后来的 SQL:2003、SQL:2011 和 SQL:2016 分析功能。

该支持的标准sql功能都支持了,还支持了很多2003、2011、2016中的分析功能

maxcomputer

在阿里云官方文档SQL概述中,有这样一段话

MaxCompute SQL采用的是类似于SQL的语法。它的语法是标准语法ANSI SQL92的一个子集,并有自己的扩展。

虽然说的比较委婉,意思就是标准语法大部分我都支持了,但是有一小部分我魔改了,另外实现了很多独创的牛逼功能。

总结

  • spark sql支持的标准文档暂未找到
  • 从规范性上来说,hive更胜一筹

1、having 差异

差异点

spark sql 支持窗口函数后带having

hive和maxcomputer 的having语法不支持,只支持 在 group 和 distinct 后使用

举例

1
2
3
4
5
6
with tmp as (
select 1 as col
)
select count(col) over(partition by 1) as col_cnt
from tmp
having col_cnt>=1

以上sql在spark sql中可以运行

在hive中会得到以下错误:

在maxcomputer中会提示错误,错误如下

1
FAILED: ODPS-0130071:[4,8] Semantic analysis exception - window function cannot be used in HAVING clause

解决方案

在语句中使用子查询,将having替换为where

1
2
3
4
5
6
7
8
9
with tmp as (
select 1 as col
)
select *
from
(select count(col) over(partition by 1) as col_cnt
from tmp
) a
where col_cnt>=1

2、maxcomputer cross join 超过一定条数后,依然会提示笛卡尔积风险

差异点

spark sql,hive可以使用 cross join语法来表示笛卡尔积关联

maxcomputer 的cross join,在条数超过一定数据量后,会提示笛卡尔积风险

举例

1
2
3
4
5
6
7
8
9
10
with a as (
select 'abc' as a
)
,b as (
select 'bdd' as b
)
select a.*
,b.*
from a
cross join b

以上sql在hive和spark中都可以正常运行
但是在maxcomputer中会提示错误,错误如下

1
FAILED: ODPS-0130252:[10,1] Cartesian product is not allowed - cartesian product is not allowed without mapjoin

意思就是笛卡尔积在没有指定mapjoin的场景下不被允许。
在阿里云官方文档JOIN也有如下一段话告知不支持cross join

这里就不知道阿里云是出于性能还是其他考虑了,违反了sql标准不支持cross join,道理上来说cross join就是显示申明笛卡尔积关联,完全可以支持。

替换方案

参考官方文档,有两种方案
1、对于小表,可以使用sql MAPJOIN HINT语法申明mapjoin,实现cross join

1
2
3
4
5
6
7
8
9
10
11
with a as (
select 'abc' as a
)
,b as (
select 'bdd' as b
)
select /*+ mapjoin(b) */
a.*
,b.*
from a
cross join b

2、如果数据量较大,则只能显示的在查询子表中申明常量列,再使用cross join 和on实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
with a as (
select 'abc' as a
)
,b as (
select 'bdd' as b
)
select
a.*
,b.*
from
(select *
,1 as col
from a
)a
cross join
(select *
,1 as col
from b
)b
on a.col=b.col

3、不等值join 差异

差异点

  1. spark 支持不等值join语法
  2. hive 2.2.0版本之前不支持不等值语法
  3. 2.2.0及以后支持不等值join语法
  4. maxcomputer不支持不等值语法

举例

测试sql

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
with table_a as (
select 1 as id_a
,'testa' as value_a
)
,table_b as (
select 2 as id_b
,'testb' as value_b
)
select table_a.id_a
,table_a.value_a
,table_b.id_b
,table_b.value_b
from table_a
left join table_b
on table_a.id_a<table_b.id_b

sql说明
该sql准备了两张表table_a和table_b用于连接测试

使用left join on语法,但是关联关系使用的是 < 不等值关联符号

maxcomputer会报异常:

1
FAILED: ODPS-0130071:[15,4] Semantic analysis exception - expect equality expression (i.e., only use '=' and 'AND') for join condition without mapjoin hint

提示的是期望join的是等值表达式

hive1.2.1运行结果

1
Error while compiling statement: FAILED: SemanticException [Error 10017]: line 15:3 Both left and right aliases encountered in JOIN 'id_b'

提示的是在join中遇到左右别名

不得不说,hive的错误信息有点云里雾里,其实就是不等值join造成的。

hive2.2.3运行结果

hive 2.2.0+版本顺利得到正确结果

spark运行结果

spark2.3也顺利得到结果

替换方案

针对不等值join的替换方案有两种

1、针对小表,使用mapjoin,避免join操作

针对小表,使用mapjoin,避免join操作
maxcomputer中的mapjoin hint语法为: /*+ mapjoin(<table_name>) */ ,详情请查看mapjoin hint

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
with table_a as (
select 1 as id_a
,'testa' as value_a
)
,table_b as (
select 2 as id_b
,'testb' as value_b
)
select /*+ mapjoin(table_b) */
table_a.id_a
,table_a.value_a
,table_b.id_b
,table_b.value_b
from table_a
left join table_b
on table_a.id_a<table_b.id_b

可以看到,使用mapjoin hint语法后,sql在maxcomputer中运行正确,顺利拿到了预期结果

由于mapjoin避免shuffle,性能较好,再可以的情况下,优先使用方案1

2、将on的不等值关联语句放入where语句中

inner join的实现方式较为简单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
with table_a as (
select 1 as id_a
,'testa' as value_a
,1 as join_col
)
,table_b as (
select 2 as id_b
,'testb' as value_b
,1 as join_col
)
select
table_a.id_a
,table_a.value_a
,table_b.id_b
,table_b.value_b
from table_a
left join table_b
on table_a.join_col=table_b.join_col
where table_a.id_a<table_b.id_b

可以看到,将<判断语句放入where后,sql在maxcomputer运行正确,顺利拿到了预期结果

left join的实现方式非常复杂,不到万不得已不建议使用此方案,建议优先使用 map join hint

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
with table_a as (
select 1 as id_a
,'testa' as value_a
,1 as join_col
)
,table_b as (
select 2 as id_b
,'testb' as value_b
,1 as join_col
)
-- 能关联上的部分
,join_part as (
select
table_a.id_a
,table_a.value_a
,table_b.id_b
,table_b.value_b
from table_a
inner join table_b
on table_a.join_col=table_b.join_col
where table_a.id_a<table_b.id_b
)
-- 以自己为主表,left join能关联上的部分,实现 left join不等值效果
select table_a.id_a
,table_a.value_a
,join_part.id_b
,join_part.value_b
from table_a
left join join_part
on table_a.id_a=join_part.id_a

4、array_contains 差异

差异点

spark的array_contains支持类型的隐式转换

hive和maxcomputer array_contains不支持,只支持同类型使用

举例

测试sql

1
select array_contains(split("1,2,3,4",","),1)

sql说明

该sql首先使用split一个字符串获取一个array对象用于测试,之后使用array_contains函数进行判断

split后的array对象为一个string数组,而判断被包含的数字【1】为一个int 对象

maxcomputer运行结果

maxcomputer会报异常:

1
FAILED: ODPS-0130071:[1,44] Semantic analysis exception - invalid type INT of argument 2 for function array_contains, expect STRING, implicit conversion is not allowed 

提示的是array_contains第二个参数期望的是string,但是传入的是int,隐式类型转换不支持

hive运行结果


hive会报错:

1
Error while compiling statement: FAILED: SemanticException [Error 10016]: line 1:43 Argument type mismatch '1': "string" expected at function ARRAY_CONTAINS, but "int" is found  

提示的是array_contains函数期望的是string,但是传入的是int,类型不匹配

spark运行结果

spark能顺利产出结果,结果为true,那么为什么spark可以成功呢?

大概率是spark智能的将1从int转换为了string类型,使得类型得以匹配,通过explain查看物理执行计划来验证

在上图标红的地方可以看到,spark在物理执行计划层面,将int的1隐式的转换为了string类型,验证了我们一开始的猜想。

替换方案

既然知道了在hive和maxcomputer中是类型不匹配导致的array_contains函数报错,那么只需要显示的将类型进行转换即可

1
select array_contains(split("1,2,3,4",","),cast(1 as string))

5、 concat_ws 差异

差异点

spark的concat_ws会支持类型的隐式转换

hive和maxcomputer concat_ws不支持,只支持同类型使用

举例

测试sql

1
select concat_ws(",",array(1,2,3))

sql说明

该sql首先使用array函数构建一个array<int>对象用于测试,之后使用concat_ws函数进行array的拼接

spark 运行结果正确

spark为何可以运行正确?是优化器做得好,有隐式转换?

通过查看物理执行计划,并非如此,那么大概率是spark的concat_ws函数做了处理。

先来看一下concat_ws的源码。

1
2
3
4
5
6
7
8
9
10
11
/**
* Concatenates multiple input string columns together into a single string column,
* using the given separator.
*
* @group string_funcs
* @since 1.5.0
*/
@scala.annotation.varargs
def concat_ws(sep: String, exprs: Column*): Column = withExpr {
ConcatWs(Literal.create(sep, StringType) +: exprs.map(_.expr))
}

自spark1.5版本开始就支持concat_ws函数了。
可以看到传入的array<int> 在代码里被抽象成了Column[]数组。
该函数通过ConcatWs类处理传入Column[]的每条记录,再通过Literal.create创建回数据返回

接着再来看ConcatWs类

1
2
3
4
5
6
7
8
9
10
override def eval(input: InternalRow): Any = {
val flatInputs = children.flatMap { child =>
child.eval(input) match {
case s: UTF8String => Iterator(s)
case arr: ArrayData => arr.toArray[UTF8String](StringType)
case null => Iterator(null.asInstanceOf[UTF8String])
}
}
UTF8String.concatWs(flatInputs.head, flatInputs.tail : _*)
}

可以看到,当传入的是array时,走的是 case arr: ArrayData => arr.toArrayUTF8String ,将原本的array<>转换为array

toArray方法如下:

1
2
3
4
5
6
7
8
9
10
11
def toArray[T: ClassTag](elementType: DataType): Array[T] = {
val size = numElements()
val accessor = InternalRow.getAccessor(elementType)
val values = new Array[T](size)
var i = 0
while (i < size) {
values(i) = accessor(this, i).asInstanceOf[T]
i += 1
}
values
}

将数据转换为传入类型的array,而传入类型为StringType,这就是spark的concat_ws函数能处理array<int>的原因,函数内部将array<int>转换为了array<string>进行处理。

maxcomputer运行结果


maxcomputer会报异常:

1
FAILED: ODPS-0130121:[1,22] Invalid argument type - invalid type ARRAY<INT> of argument 2 for function concat_ws, expect ARRAY<STRING>

提示的是concat_ws只支持ARRAY<STRING>格式,不支持array<int>,这个在阿里云的官方文档函数CONCAT_WS中有说明

hive运行结果


hive会报错:

1
[42000][10016] Error while compiling statement: FAILED: SemanticException [Error 10016]: Line 1:21 Argument type mismatch '3': Argument 2 of function CONCAT_WS must be "string or array<string>", but "array<int>" was found.

提示的是concat_ws传入的array必须是array<string>,不能是其他类型

替换方案

maxcomputer 替换方案

有两种,一种为使用cast函数,将array<int> 显示转换为array,然后使用 concat_ws函数

1
select array_contains(split("1,2,3,4",","),cast(1 as string))

一种为使用阿里提供的增强函数:array_join函数

1
select array_join(array(1,2,3),",")

本人推荐使用第二种

hive 替换方案

当array类型不为string时,目前hive没有函数可以支持此类需求,一个较复杂的方式为将array 数据 explode 列转行后,再讲每行数据的int转为string,再使用collect_list行转列,这样就得到了array,可以使用concat_ws函数了。

1
2
3
4
5
6
7
8
select concat_ws(',',collect_list(col_str)) as rs --行转列,并使用concat_ws
from
(select cast(col as string) as col_str -- int 转为 string
from
(select explode(array(1,2,3)) as col -- 列转行
) t1
) a

上述方案太绕,万不得已的情况下,也只能如此了。
另一种方法,可以使用自定义udf解决。

前言

我们知道,namenode作为hdfs的元数据管理节点,其将所有的元数据都存储在fsimage中,本文结合实际目录文件,了解namenode的元数据到底是什么

fsimage元数据存储在哪?

namenode的image存储目录在hdfs-site.xml中进行配置,配置参数为

1
2
dfs.namenode.name.dir
默认值: file://${hadoop.tmp.dir}/dfs/name

其中hadoop.tmp.dir的配置在core-site.xml中进行配置,默认值为

1
/tmp/hadoop-${user.name}

也就是说如果你不就想配置,namenode的默认目录为

1
/tmp/hadoop-${user.name}/dfs/name

而seconarynamneode,用于做checkpoint合并image和editslog,他的工作配置目录为

1
2
dfs.namenode.checkpoint.dir
默认值: file://${hadoop.tmp.dir}/dfs/namesecondary

这些对应的配置功能可以在官方默认配置文档中找到

元数据文件有哪些?

进入到fsimage目录下,通过ls -l命令查看,如下图所示

可以看到,文件分为以下6类

  1. edits_0000xxxx-0000xxxx
    此类文件为已完结历史edits日志文件,这类文件的命名规则为edits_19位txid,未满19位的txid用0补齐
  2. edits_inprogress_0000xxxx
    该文件为当前正在写入的edits日志文件,规则和edits类似,只是中间多了ingrogress表示正在写入
  3. fsimage_0000xxx
    该文件为镜像文件,命名规则为fsimage_19位txid,其中txid表示该镜像合并到的最新edits日志。
    镜像文件一般为2个。
  4. fsiamge_000xxx.md5
    该文件为镜像文件对应的md5码,用于校验镜像文件是否一致。
  5. seen_txid
    该文件存储了最新的txid,新的edits日志文件命名都在此txid上+1
  6. VERSION
    该文件存储的是namenode的一些版本信息

元数据的文件内容长什么样?

日志文件edits_0000xxxx-0000xxxx

直接通过cat或vi命令查看日志文件的话,会发现edits文件有一堆乱码

想要查看edits文件,需要使用到 hdfs oev命令,该命令为hadoop官方提供的解析工具

常用的一个命令为

1
2
3
hdfs oev -i edits_0000000000000157832-0000000000000158267 -o test_edits.xml
# -i 为输入edits文件地址
# -o 为输出的xml地址

此时,我们再来看已经被解析为xml的test_edits.xml文件,就可以看到如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<EDITS>
<EDITS_VERSION>-66</EDITS_VERSION>
<RECORD>
<OPCODE>OP_START_LOG_SEGMENT</OPCODE>
<DATA>
<TXID>157832</TXID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_MKDIR</OPCODE>
<DATA>
<TXID>157833</TXID>
<LENGTH>0</LENGTH>
<INODEID>42861</INODEID>
<PATH>/tmp/hive/hdfs/76876549-32f3-4f95-90e4-54e8846432d4</PATH>
<TIMESTAMP>1663027209019</TIMESTAMP>
<PERMISSION_STATUS>
<USERNAME>hdfs</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>448</MODE>
</PERMISSION_STATUS>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_MKDIR</OPCODE>
<DATA>
<TXID>157834</TXID>
<LENGTH>0</LENGTH>
<INODEID>42862</INODEID>
<PATH>/tmp/hive/hdfs/76876549-32f3-4f95-90e4-54e8846432d4/_tmp_space.db</PATH>
<TIMESTAMP>1663027209046</TIMESTAMP>
<PERMISSION_STATUS>
<USERNAME>hdfs</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>448</MODE>
</PERMISSION_STATUS>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_DELETE</OPCODE>
<DATA>
<TXID>157835</TXID>
<LENGTH>0</LENGTH>
<PATH>/user/hive/warehouse/stock.db/finance_indicator</PATH>
<TIMESTAMP>1663027213944</TIMESTAMP>
<RPC_CLIENTID>fa9f4009-c253-4109-a73e-6dfae064020f</RPC_CLIENTID>
<RPC_CALLID>14</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_MKDIR</OPCODE>
<DATA>
<TXID>157836</TXID>
<LENGTH>0</LENGTH>
<INODEID>42863</INODEID>
<PATH>/user/hive/warehouse/stock.db/finance_indicator</PATH>
<TIMESTAMP>1663027213948</TIMESTAMP>
<PERMISSION_STATUS>
<USERNAME>hdfs</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>493</MODE>
</PERMISSION_STATUS>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_SET_ACL</OPCODE>
<DATA>
<TXID>157837</TXID>
<SRC>/user/hive/warehouse/stock.db/finance_indicator</SRC>
<ENTRY>
<SCOPE>ACCESS</SCOPE>
<TYPE>USER</TYPE>
<PERM>rwx</PERM>
</ENTRY>
<ENTRY>
<SCOPE>ACCESS</SCOPE>
<TYPE>GROUP</TYPE>
<PERM>r-x</PERM>
</ENTRY>
<ENTRY>
<SCOPE>ACCESS</SCOPE>
<TYPE>OTHER</TYPE>
<PERM>r-x</PERM>
</ENTRY>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_DELETE</OPCODE>
<DATA>
<TXID>157838</TXID>
<LENGTH>0</LENGTH>
<PATH>/user/hive/warehouse/stock.db/stock_price_none</PATH>
<TIMESTAMP>1663027214022</TIMESTAMP>
<RPC_CLIENTID>fa9f4009-c253-4109-a73e-6dfae064020f</RPC_CLIENTID>
<RPC_CALLID>22</RPC_CALLID>
</DATA>
</RECORD>

可以看到,edits日志是由一条一条record组成的,record的oprecode一共有53种,具体可以查看org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes 这个枚举类,这里就不再展开了

具体的操作内容,则可以查看 org.apache.hadoop.hdfs.server.namenode.FSEditLogOp,这个类里的静态子类封装了edits的各种操作抽象,xml种对应的属性在里面都可以找到,这里也不再展开。

edits_inprogress_0000xxxx

文件内容同 日志文件edits_0000xxxx-0000xxxx

fsimage_0000xxx

直接通过cat或vi命令查看image镜像文件的话,会发现image文件有一堆乱码

想要查看image文件,需要使用到 hdfs oiv命令,该命令为hadoop官方提供的镜像解析工具

常用的一个命令为

1
2
3
4
hdfs oiv -i fsimage_0000000000000161098 -o test_image.xml -p XML
# -i 为输入image文件地址
# -o 为输出的xml地址
# -p 表示使用什么格式进行处理

解析结束后,我们再来看已经被解析为xml的test_image.xml文件,就可以看到如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
<?xml version="1.0"?>
<fsimage>
<version>
<layoutVersion>-66</layoutVersion>
<onDiskVersion>1</onDiskVersion>
<oivRevision>a3b9c37a397ad4188041dd80621bdeefc46885f2</oivRevision>
</version>
<NameSection>
<namespaceId>1614349331</namespaceId>
<genstampV1>1000</genstampV1>
<genstampV2>13399</genstampV2>
<genstampV1Limit>0</genstampV1Limit>
<lastAllocatedBlockId>1073754220</lastAllocatedBlockId>
<txid>161098</txid>
</NameSection>
<ErasureCodingSection>
<erasureCodingPolicy>
<policyId>1</policyId>
<policyName>RS-6-3-1024k</policyName>
<cellSize>1048576</cellSize>
<policyState>DISABLED</policyState>
<ecSchema>
<codecName>rs</codecName>
<dataUnits>6</dataUnits>
<parityUnits>3</parityUnits>
</ecSchema>
</erasureCodingPolicy>
<erasureCodingPolicy>
<policyId>2</policyId>
<policyName>RS-3-2-1024k</policyName>
<cellSize>1048576</cellSize>
<policyState>DISABLED</policyState>
<ecSchema>
<codecName>rs</codecName>
<dataUnits>3</dataUnits>
<parityUnits>2</parityUnits>
</ecSchema>
</erasureCodingPolicy>
<erasureCodingPolicy>
<policyId>3</policyId>
<policyName>RS-LEGACY-6-3-1024k</policyName>
<cellSize>1048576</cellSize>
<policyState>DISABLED</policyState>
<ecSchema>
<codecName>rs-legacy</codecName>
<dataUnits>6</dataUnits>
<parityUnits>3</parityUnits>
</ecSchema>
</erasureCodingPolicy>
<erasureCodingPolicy>
<policyId>4</policyId>
<policyName>XOR-2-1-1024k</policyName>
<cellSize>1048576</cellSize>
<policyState>DISABLED</policyState>
<ecSchema>
<codecName>xor</codecName>
<dataUnits>2</dataUnits>
<parityUnits>1</parityUnits>
</ecSchema>
</erasureCodingPolicy>
<erasureCodingPolicy>
<policyId>5</policyId>
<policyName>RS-10-4-1024k</policyName>
<cellSize>1048576</cellSize>
<policyState>DISABLED</policyState>
<ecSchema>
<codecName>rs</codecName>
<dataUnits>10</dataUnits>
<parityUnits>4</parityUnits>
</ecSchema>
</erasureCodingPolicy>
</ErasureCodingSection>
<INodeSection>
<lastInodeId>43596</lastInodeId>
<numInodes>1572</numInodes>
<inode>
<id>16385</id>
<type>DIRECTORY</type>
<name></name>
<mtime>1659683329779</mtime>
<permission>hdfs:supergroup:0755</permission>
<nsquota>9223372036854775807</nsquota>
<dsquota>-1</dsquota>
</inode>
<inode>
<id>16386</id>
<type>DIRECTORY</type>
<name>tmp</name>
<mtime>1659770965040</mtime>
<permission>hdfs:supergroup:0733</permission>
<nsquota>-1</nsquota>
<dsquota>-1</dsquota>
</inode>
<inode>
<id>16387</id>
<type>DIRECTORY</type>
<name>hive</name>
<mtime>1659772693233</mtime>
<permission>hdfs:supergroup:0733</permission>
<nsquota>-1</nsquota>
<dsquota>-1</dsquota>
</inode>
<inode>
<id>16388</id>
<type>DIRECTORY</type>
<name>hdfs</name>
<mtime>1663029189271</mtime>
<permission>hdfs:supergroup:0700</permission>
<nsquota>-1</nsquota>
<dsquota>-1</dsquota>
</inode>
<inode>
<id>16391</id>
<type>DIRECTORY</type>
<name>hive</name>
<mtime>1662261600810</mtime>
<permission>hive:supergroup:0700</permission>
<nsquota>-1</nsquota>
<dsquota>-1</dsquota>
</inode>
<inode>
<id>16396</id>
<type>DIRECTORY</type>
<name>user</name>
<mtime>1659683329779</mtime>
<permission>hdfs:supergroup:0755</permission>
<nsquota>-1</nsquota>
<dsquota>-1</dsquota>
</inode>
<inode>
<id>16397</id>
<type>DIRECTORY</type>
<name>hive</name>
<mtime>1659683329779</mtime>
<permission>hdfs:supergroup:0755</permission>
<nsquota>-1</nsquota>
<dsquota>-1</dsquota>
</inode>
<inode>
<id>16398</id>
<type>DIRECTORY</type>
<name>warehouse</name>
<mtime>1660102421896</mtime>
<permission>hdfs:supergroup:0755</permission>
<nsquota>-1</nsquota>
<dsquota>-1</dsquota>
</inode>
<inode>
<id>16399</id>
<type>DIRECTORY</type>
<name>stock.db</name>
<mtime>1663029031404</mtime>
<permission>hdfs:supergroup:0755</permission>
<nsquota>-1</nsquota>
<dsquota>-1</dsquota>
</inode>
<inode>
<id>16405</id>
<type>DIRECTORY</type>
<name>stk_balance_sheet</name>
<mtime>1661338700499</mtime>
<permission>hdfs:supergroup:0755</permission>
<nsquota>-1</nsquota>
<dsquota>-1</dsquota>
</inode>
<inode>
<id>16495</id>
<type>DIRECTORY</type>
<name>valuation__7c02ecf9_3e09_44c0_8121_7eb3e7c1d079</name>
<mtime>1660718214011</mtime>
<permission>hdfs:supergroup:0755</permission>
<nsquota>-1</nsquota>
<dsquota>-1</dsquota>
</inode>
<inode>
<id>16646</id>
<type>DIRECTORY</type>
<name>valuation__4ef5e657_b1b9_4dfb_bb2f_d04e25a4902a</name>
<mtime>1660718214009</mtime>
<permission>hdfs:supergroup:0755</permission>
<nsquota>-1</nsquota>
<dsquota>-1</dsquota>
</inode>
<inode>
<id>16650</id>
<type>DIRECTORY</type>
<name>valuation__940421e3_2ba0_4c20_b8d3_e01066c21156</name>
<mtime>1660718214012</mtime>
<permission>hdfs:supergroup:0755</permission>
<nsquota>-1</nsquota>
<dsquota>-1</dsquota>
</inode>
<inode>
<id>16678</id>
<type>DIRECTORY</type>
<name>46d8d97e-87df-4dff-baa4-c83024cfbce5</name>
<mtime>1659770965946</mtime>
<permission>hdfs:supergroup:0700</permission>
<nsquota>-1</nsquota>
<dsquota>-1</dsquota>
</inode>
<inode>
<id>16679</id>
<type>DIRECTORY</type>
<name>_tmp_space.db</name>
<mtime>1659770910781</mtime>
<permission>hdfs:supergroup:0700</permission>
<nsquota>-1</nsquota>
<dsquota>-1</dsquota>
</inode>
<inode>
<id>16743</id>
<type>DIRECTORY</type>
<name>hadoop-yarn</name>
<mtime>1659770965040</mtime>
<permission>hdfs:supergroup:0700</permission>
<nsquota>-1</nsquota>
<dsquota>-1</dsquota>
</inode>
<inode>
<id>32524</id>
<type>FILE</type>
<name>job_1646050356665_0363.summary</name>
<replication>1</replication>
<mtime>1661397832374</mtime>
<atime>1661397831960</atime>
<preferredBlockSize>134217728</preferredBlockSize>
<permission>hdfs:supergroup:0770</permission>
<blocks>
<block>
<id>1073747785</id>
<genstamp>6961</genstamp>
<numBytes>482</numBytes>
</block>
</blocks>
<storagePolicyId>0</storagePolicyId>
</inode>

可以看到,image的xml有一下几个特征

  1. 总的被fsimage语句块包围

  2. 里面子的有

    • version
    • NameSection
    • ErasureCodingSection
    • INodeSection
  3. version描述的一些版本信息

    1
    2
    3
    4
    5
    <version>
    <layoutVersion>-66</layoutVersion>
    <onDiskVersion>1</onDiskVersion>
    <oivRevision>a3b9c37a397ad4188041dd80621bdeefc46885f2</oivRevision>
    </version>
  4. NameSection 描述的是命名空间的一些信息

    1
    2
    3
    4
    5
    6
    7
    8
       <NameSection>
    <namespaceId>1614349331</namespaceId>
    <genstampV1>1000</genstampV1>
    <genstampV2>13399</genstampV2>
    <genstampV1Limit>0</genstampV1Limit>
    <lastAllocatedBlockId>1073754220</lastAllocatedBlockId>
    <txid>161098</txid>
    </NameSection>
  5. ErasureCodingSection

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    <ErasureCodingSection>
    <erasureCodingPolicy>
    <policyId>1</policyId>
    <policyName>RS-6-3-1024k</policyName>
    <cellSize>1048576</cellSize>
    <policyState>DISABLED</policyState>
    <ecSchema>
    <codecName>rs</codecName>
    <dataUnits>6</dataUnits>
    <parityUnits>3</parityUnits>
    </ecSchema>
    </erasureCodingPolicy>
    <erasureCodingPolicy>
    <policyId>2</policyId>
    <policyName>RS-3-2-1024k</policyName>
    <cellSize>1048576</cellSize>
    <policyState>DISABLED</policyState>
    <ecSchema>
    <codecName>rs</codecName>
    <dataUnits>3</dataUnits>
    <parityUnits>2</parityUnits>
    </ecSchema>
    </erasureCodingPolicy>

    ErasureCodingSection 由一个一个erasureCodingPolicy片段组成,如果了解Erasure Code技术的话,就会明白这是一种纠删码,具体可以查看此篇纠删码Erasure Coding (分布式存储系统)
    常见的纠删码技术有阵列纠删码(Array Code: RAID5、RAID6等)、RS(Reed-Solomon)里德-所罗门类纠删码和LDPC(LowDensity Parity Check Code)低密度奇偶校验纠删码。 LDPC码目前主要用于通信、视频和音频编码等领域。

    看到image中的

    1
    <policyName>RS-3-2-1024k</policyName>

    就能猜测,hdfs中使用的是RS(Reed-Solomon)里德-所罗门类纠删码,感兴趣的可以自行搜索扩展。

  6. INodeSection

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    <INodeSection>
    <lastInodeId>43596</lastInodeId>
    <numInodes>1572</numInodes>
    <inode>
    <id>16385</id>
    <type>DIRECTORY</type>
    <name></name>
    <mtime>1659683329779</mtime>
    <permission>hdfs:supergroup:0755</permission>
    <nsquota>9223372036854775807</nsquota>
    <dsquota>-1</dsquota>
    </inode>
    <inode>
    <id>32524</id>
    <type>FILE</type>
    <name>job_1646050356665_0363.summary</name>
    <replication>1</replication>
    <mtime>1661397832374</mtime>
    <atime>1661397831960</atime>
    <preferredBlockSize>134217728</preferredBlockSize>
    <permission>hdfs:supergroup:0770</permission>
    <blocks>
    <block>
    <id>1073747785</id>
    <genstamp>6961</genstamp>
    <numBytes>482</numBytes>
    </block>
    </blocks>
    <storagePolicyId>0</storagePolicyId>
    </inode>

    INodeSection 由一段一段inode组成,是image中内容最大部分,除了上述的几个片段,image中剩下的内容全部都是inode。

    Node类在Namenode中代表了一个树状结构即Namespace,表示的是目录和文件的抽象。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    <inode>
    <id>16386</id>
    <type>DIRECTORY</type>
    <name>tmp</name>
    <mtime>1659770965040</mtime>
    <permission>hdfs:supergroup:0733</permission>
    <nsquota>-1</nsquota>
    <dsquota>-1</dsquota>
    </inode>

    以上述为例,表示的tmp目录的信息,具体可以hadoop的org.apache.hadoop.hdfs.server.namenode.INode

fsiamge_000xxx.md5

该文件为镜像文件对应的md5码,用于校验镜像文件是否一致。

seen_txid

该文件存储了最新的txid,新的edits日志文件命名都在此txid上+1

可以看到seen_txid里的文字为161099
而edists_inprogress的结尾也是161099,保存的是最新的txid事务id

VERSION

该文件存储的是namenode的一些版本信息

参考文章

Hadoop分布式文件系统:架构和设计
纠删码Erasure Coding (分布式存储系统)

背景

最近自己搭建了一套hadoop 3.3.1的集群用于自己的股票指标计算,发现secondary namenode虽然把edits日志合并到了最新的image,但是历史的edits日志并不会删除,导致在元数据目录ls查看文件时,有很多edits文件。

从上图可以看到,当前时间日期为10.8号,但是8.18的edits日志依然没有删除.

通过统计文件数,可以看到一共有10009个文件,扣除VERSION,seen_txid等7个非edits文件,一共有10002个日志文件。

而edits太多,会影响到hadoop的启动。
根据网上相关的文章,在合并完后edits应该会删除,这一现象和了解到的只是相悖,本文为了了解edits啥时候会删除。

元数据各文件详解

从上图的元数据目录查看,可以看到文件分为以下6类

  • edits_000xxxx-000xxxx
  • edits_inprogress_000xxxx
  • fsimage_000xxx
  • fsimage_000xxx.md5
  • seen.txid
  • VERSION

那么,每种文件的作用和功能是什么呢?

具体可以参考本人之前文章: hdfs 中元数据 fsimage,edits详解

edits 文件什么时候删除?

那么,edits文件具体什么时候删除了,这时候就要从源码入手了。

edit log 删除源码探究

1、已知线索: dfs.namenode.name.dir

我们知道,写edits日志需要只要目标文件的目录,而edits日志的存储目录是由hdfs-site.xml文件中的

1
2
dfs.namenode.name.dir
默认值: file://${hadoop.tmp.dir}/dfs/name

进行指定的,那么这个配置就可以作为线索进行搜索,在hadoop源码中进行搜索dfs.namenode.name.dir

可以看到在HdfsClientConfigKeys类中,使用了变量DFS_NAMENODE_NAME_DIR_KEY来引用dfs.namenode.name.dir

2、查看使用DFS_NAMENODE_NAME_DIR_KEY的代码

使用idea的 find usage功能,来查看 DFS_NAMENODE_NAME_DIR_KEY 被哪些类使用

可以看到 在 DfsConfigKeys 中的变量 .DFS_NAMENODE_NAME_DIR_KEY 引用了HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_NAME_DIR_KEY

3、查看使用DfsConfigKeys.DFS_NAMENODE_NAME_DIR_KEY的代码

使用idea的 find usage功能,来查看 DFS_NAMENODE_NAME_DIR_KEY 被哪些类使用

可以看到 在 FSNamesystem类中,提供了一个静态方法

1
2
3
public static Collection<URI> getNamespaceDirs(Configuration conf) {
return getStorageDirs(conf, DFS_NAMENODE_NAME_DIR_KEY);
}

返回元数据的目录。

4、查看使用getNamespaceDirs的代码

继续使用idea的 find usage功能,来查看 getNamespaceDirs 被哪些类使用

可以看到 在 FSNamesystem类中,有一个方法getNamespaceEditsDirs使用了,而这个方法名称看着就像是我们要找的,通过阅读方法注释,了解到这个方法是返回一个排序的日志目录list用于写入。

5、查看使用getNamespaceEditsDirs的代码

继续使用idea的 find usage功能,来查看 getNamespaceEditsDirs 被哪些类使用

可以看到 在 FSImage 类中,构造方法使用了该方法

继续点击this

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* Construct the FSImage. Set the default checkpoint directories.
*
* Setup storage and initialize the edit log.
*
* @param conf Configuration
* @param imageDirs Directories the image can be stored in.
* @param editsDirs Directories the editlog can be stored in.
* @throws IOException if directories are invalid.
*/
protected FSImage(Configuration conf,
Collection<URI> imageDirs,
List<URI> editsDirs)
throws IOException {
this.conf = conf;

storage = new NNStorage(conf, imageDirs, editsDirs);
if(conf.getBoolean(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_KEY,
DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_DEFAULT)) {
storage.setRestoreFailedStorage(true);
}

this.editLog = FSEditLog.newInstance(conf, storage, editsDirs);
archivalManager = new NNStorageRetentionManager(conf, storage, editLog);
FSImageFormatProtobuf.initParallelLoad(conf);
}

可以看到this.editLog = FSEditLog.newInstance(conf, storage, editsDirs);使用传入的editsDirs构建了FSEditLog对象。

FSEditLog,文件系统的edit log,这不正是我们想要删除的文件对应的对象吗?

FSImage,文件系统的镜像,包含了FSEditLog。

想要找删除的方法有极大的概率就包含在这两个类中。

6、查看FSEditLog类

1
2
3
4
5
6
7
/**
* FSEditLog maintains a log of the namespace modifications.
*
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class FSEditLog implements LogsPurgeable {

看类先看注解,FSEditLog的注解为,FSEditLog 维护命名空间修改的日志。

再看继承和接口,只有一个接口,LogsPurgeable

1
2
3
4
5
/**
* Interface used to abstract over classes which manage edit logs that may need
* to be purged.
*/
interface LogsPurgeable {

该接口是对可能需要被清除的edit log日志的管理的抽象。
说人话就是用于清除edit log日志,这不就是本次要找的删除文件操作码。

查看LogsPurgeable

改接口非常简单,之定义了两个方法。
重点来看 purgeLogsOlderThan

1
2
3
4
5
6
7
8
/**
* Remove all edit logs with transaction IDs lower than the given transaction
* ID.
*
* @param minTxIdToKeep the lowest transaction ID that should be retained
* @throws IOException in the event of error
*/
public void purgeLogsOlderThan(long minTxIdToKeep) throws IOException;

清除所有比传入事务txid低的edit logs 文件。

可以基本确认这就是删除edit log文件的入口了。

接着查看该方法的实现类有哪些。

由于我们要删除的是文件,重点关注 FileJournalManager

7、查看FileJournalManager

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* Purges the unnecessary edits and edits_inprogress files.
*
* Edits files that are ending before the minTxIdToKeep are purged.
* Edits in progress files that are starting before minTxIdToKeep are purged.
* Edits in progress files that are marked as empty, trash, corrupted or
* stale by file extension and starting before minTxIdToKeep are purged.
* Edits in progress files that are after minTxIdToKeep, but before the
* current edits in progress files are marked as stale for clarity.
*
* In case file removal or rename is failing a warning is logged, but that
* does not fail the operation.
*
* @param minTxIdToKeep the lowest transaction ID that should be retained
* @throws IOException if listing the storage directory fails.
*/
@Override
public void purgeLogsOlderThan(long minTxIdToKeep)
throws IOException {
LOG.info("Purging logs older than " + minTxIdToKeep);
//获取目录下的所有文件数组
File[] files = FileUtil.listFiles(sd.getCurrentDir());
//从数组中找出属于edit的文件
List<EditLogFile> editLogs = matchEditLogs(files, true);
//同步锁
synchronized (this) {
//循环遍历
for (EditLogFile log : editLogs) {
//如果日志文件的第一个事务txid及最后一个事务txid都比最小保存事务txid小,那么清除该日志
if (log.getFirstTxId() < minTxIdToKeep &&
log.getLastTxId() < minTxIdToKeep) {
//清除日志
purger.purgeLog(log);
} else if (isStaleInProgressLog(minTxIdToKeep, log)) {
//不然标记为稳定的日志。
purger.markStale(log);
}
}
}
}

该方法比较简单易懂,继续点击查看purger.purgeLog

1
2
3
4
5
6
7
8
9
10
11
12
13
14
static class DeletionStoragePurger implements StoragePurger {
@Override
public void purgeLog(EditLogFile log) {
LOG.info("Purging old edit log {}", log);
deleteOrWarn(log.getFile());
}

private static void deleteOrWarn(File file) {
if (!file.delete()) {
// It's OK if we fail to delete something -- we'll catch it
// next time we swing through this directory.
LOG.warn("Could not delete {}", file);
}
}

非常简单,通过获取EditLogFile的file对象,最后调用file.delete()实现文件删除,如果无法删除,那么告警。

至此,底层的文件删除我们已经找到了,但是上游最小事务txid是如何获取的还未知道。

回到FSEditLog.purgeLogsOlderThan 方法中,通过idea的find useage方法查看该类被哪些类使用


大部分方法都是purgeLogsOlderThan调用purgeLogsOlderThan,只有 NNStorageRetentionManager.purgeLogsOlderThan不是。

8、查看NNStorageRetentionManager

1
2
3
4
5
6
7
8
9
10
/**
* The NNStorageRetentionManager is responsible for inspecting the storage
* directories of the NN and enforcing a retention policy on checkpoints
* and edit logs.
*
* It delegates the actual removal of files to a StoragePurger
* implementation, which might delete the files or instead copy them to
* a filer or HDFS for later analysis.
*/
public class NNStorageRetentionManager {

惯例先看注解

NNStorageRetentionManager 负责检查 NN 的存储目录并对检查点和编辑日志执行保留策略。 它将文件的实际删除委托给 StoragePurger 实现,该实现可能会删除文件或将它们复制到文件管理器或 HDFS 以供以后分析。

日志执行保留策略,看来快到终点了。

查看purgeOldStorage方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
void purgeOldStorage(NameNodeFile nnf) throws IOException {
FSImageTransactionalStorageInspector inspector =
new FSImageTransactionalStorageInspector(EnumSet.of(nnf));
storage.inspectStorageDirs(inspector);

long minImageTxId = getImageTxIdToRetain(inspector);
purgeCheckpointsOlderThan(inspector, minImageTxId);

if (nnf == NameNodeFile.IMAGE_ROLLBACK) {
// do not purge edits for IMAGE_ROLLBACK.
return;
}

// If fsimage_N is the image we want to keep, then we need to keep
// all txns > N. We can remove anything < N+1, since fsimage_N
// reflects the state up to and including N. However, we also
// provide a "cushion" of older txns that we keep, which is
// handy for HA, where a remote node may not have as many
// new images.
//
// First, determine the target number of extra transactions to retain based
// on the configured amount.
long minimumRequiredTxId = minImageTxId + 1;
//最小的日志事务id为: 最小需要保留的事务txid减去需要额外保留的事务id,其中minimumRequiredTxId为检查点镜像文件的最后一条事务id,本质上就是保留numExtraEditsToRetain条事务。
long purgeLogsFrom = Math.max(0, minimumRequiredTxId - numExtraEditsToRetain);

//edit log的文件输入流
ArrayList<EditLogInputStream> editLogs = new ArrayList<EditLogInputStream>();
//填充
purgeableLogs.selectInputStreams(editLogs, purgeLogsFrom, false, false);
//排序,优先比较第一条事务txid,然后比较最后一条事务txid
Collections.sort(editLogs, new Comparator<EditLogInputStream>() {
@Override
public int compare(EditLogInputStream a, EditLogInputStream b) {
return ComparisonChain.start()
.compare(a.getFirstTxId(), b.getFirstTxId())
.compare(a.getLastTxId(), b.getLastTxId())
.result();
}
});

// Remove from consideration any edit logs that are in fact required.
//如果edit log文件的第一个事务txid比最小需要保留的事务txid大,那么该edit log需要保留,从待editLogs list中移除。
while (editLogs.size() > 0 &&
editLogs.get(editLogs.size() - 1).getFirstTxId() >= minimumRequiredTxId) {
editLogs.remove(editLogs.size() - 1);
}

// Next, adjust the number of transactions to retain if doing so would mean
// keeping too many segments around.
//如果editLogs list的条数比需要保留的最大edits文件数多,那么能保留的最小事务id purgeLogsFrom 需要扩大,将purgeLogsFrom 置为该日志的文件事务txid
//如果了解前因后果的话,需要清除的日志由两个参数控制,需要保留的事务数量及需要保留的日志文件数,两个是且的关系,只有两个条件都满足的日志才会保留,有一个不满足就会删除,这里其实就是当日志文件数大于需要保留的日志文件数时,多余的日志文件数需要清除。
while (editLogs.size() > maxExtraEditsSegmentsToRetain) {
purgeLogsFrom = editLogs.get(0).getLastTxId() + 1;
editLogs.remove(0);
}

// Finally, ensure that we're not trying to purge any transactions that we
// actually need.
//最后确认下,本次清除的事务id必须比最低要求低,不然抛出异常。
if (purgeLogsFrom > minimumRequiredTxId) {
throw new AssertionError("Should not purge more edits than required to "
+ "restore: " + purgeLogsFrom + " should be <= "
+ minimumRequiredTxId);
}
//调用方法清除,这个前文已经将了
purgeableLogs.purgeLogsOlderThan(purgeLogsFrom);
}

该方法比较长,本人都在关键部位注释。
方法基本就是根据一些逻辑计算出需要保留的最小事务txid,最后调用purgeableLogs.purgeLogsOlderThan清除小于该事务的log文件,如何清除的上文已经讲解。

那么计算逻辑是什么呢?有两点

  1. 保留 numExtraEditsToRetain 条事务
  2. 保留 maxExtraEditsSegmentsToRetain 个edit log日志文件
    两个策略独立运行,只要有一个条件不满足,日志就会被删除。

至此,我们已经知道了edit log日志文件的删除逻辑。

9、查看numExtraEditsToRetain和maxExtraEditsSegmentsToRetain对应的配置参数

最后,查看源码,获得numExtraEditsToRetain和maxExtraEditsSegmentsToRetain对应的配置参数。

numExtraEditsToRetain对应的参数为 dfs.namenode.num.extra.edits.retained

1
2
3
4
this.numExtraEditsToRetain = conf.getLong(
DFSConfigKeys.DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_KEY,
DFSConfigKeys.DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_DEFAULT);
public static final String DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_KEY = "dfs.namenode.num.extra.edits.retained";

maxExtraEditsSegmentsToRetain对应的参数为 dfs.namenode.max.extra.edits.segments.retained

1
2
3
4
this.maxExtraEditsSegmentsToRetain = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_MAX_EXTRA_EDITS_SEGMENTS_RETAINED_KEY,
DFSConfigKeys.DFS_NAMENODE_MAX_EXTRA_EDITS_SEGMENTS_RETAINED_DEFAULT);
public static final String DFS_NAMENODE_MAX_EXTRA_EDITS_SEGMENTS_RETAINED_KEY = "dfs.namenode.max.extra.edits.segments.retained";

在查看hadoop的官方hdfs-default.xml文档获得相关配置的解释。

name value description 翻译
dfs.namenode.num.extra.edits.retained 1000000 The number of extra transactions which should be retained beyond what is minimally necessary for a NN restart. It does not translate directly to file’s age, or the number of files kept, but to the number of transactions (here “edits” means transactions). One edit file may contain several transactions (edits). During checkpoint, NameNode will identify the total number of edits to retain as extra by checking the latest checkpoint transaction value, subtracted by the value of this property. Then, it scans edits files to identify the older ones that don’t include the computed range of retained transactions that are to be kept around, and purges them subsequently. The retainment can be useful for audit purposes or for an HA setup where a remote Standby Node may have been offline for some time and need to have a longer backlog of retained edits in order to start again. Typically each edit is on the order of a few hundred bytes, so the default of 1 million edits should be on the order of hundreds of MBs or low GBs. NOTE: Fewer extra edits may be retained than value specified for this setting if doing so would mean that more segments would be retained than the number configured by dfs.namenode.max.extra.edits.segments.retained. 应该保留的额外事务的数量超出了 NN 重新启动所需的最低限度。它不会直接转换为文件的年龄或保存的文件数量,而是转换为事务的数量(这里“编辑”是指事务)。一个编辑文件可能包含多个事务(编辑)。在检查点期间,NameNode 将通过检查最新的检查点事务值减去此属性的值来确定要保留的编辑总数。然后,它扫描编辑文件以识别不包括计算范围的保留交易的旧文件,并随后清除它们。保留对于审计目的或 HA 设置很有用,其中远程备用节点可能已离线一段时间并且需要保留更长的保留编辑积压才能重新开始。通常,每次编辑大约为几百字节,因此默认的 100 万次编辑应该是数百 MB 或低 GB 的数量级。注意:如果这样做意味着保留的段数多于 dfs.namenode.max.extra.edits.segments.retained 配置的数量,则保留的额外编辑可能少于为此设置指定的值。
dfs.namenode.max.extra.edits.segments.retained 10000 The maximum number of extra edit log segments which should be retained beyond what is minimally necessary for a NN restart. When used in conjunction with dfs.namenode.num.extra.edits.retained, this configuration property serves to cap the number of extra edits files to a reasonable value. 应该保留的额外编辑日志段的最大数量超出了 NN 重新启动所需的最低限度。 当与 dfs.namenode.num.extra.edits.retained 一起使用时,此配置属性用于将额外编辑文件的数量限制为一个合理的值

可以看到,文档和我们源码看到的一致,至此,已完全弄清楚hadoop的元数据是如何删除edit log日志的。

log是 10002个?

通过上节的源码及配置参数,我们再来解释下元数据目录下edit log为什么是10002个。


首先,namenode元数据目录下的文件数一共是10009个


扣除VERSION,edits_ingrogress等非历史edits log文件7个,还剩余10002个日志。

由于本人的hadoop集群事务较少,因此dfs.namenode.num.extra.edits.retained这个1000000保留事务条件未触发,该条件不会进行删除。

dfs.namenode.max.extra.edits.segments.retained 这个10000个保留日志文件数条件达到,因此会进行日志删除。

但是10002个好像比参数的10000个多2个?

注意看下镜像文件

  • fsimage_0000000000000257178

  • fsimage_0000000000000257180
    可以看到镜像文件的保留策略为2个,第一个镜像的合并截止事务为0000000000000257178,也就是上文源码说的最小事务id为0000000000000257178,也就是说 0000000000000257178 这个文件不在五行中。

  • edits_0000000000000254548-0000000000000257178

  • edits_0000000000000257179-0000000000000257180
    属于必须得日志文件,不能删除,否则无法根据日志还原镜像。

修改保留策略

1、修改hdfs-site.xml,添加

1
2
3
4
5
<property>
<name>dfs.namenode.max.extra.edits.segments.retained</name>
<value>100</value>
</property>
修改为100个

2、动态刷新hdfs配置。

1
hdfs dfsadmin  hdfs://nn1:9000 -refreshSuperUserGroupsConfiguration

目的

此页面用于收集平时看到的数仓相关的知识文章

概念

数据仓库、数据湖、数据中台一文读懂

数仓规范

数仓规范详解

数仓建设

主题域文档

指标建设

画像建设

数据建模

数据治理

数据湖

其他

为什么说第三代指标平台的本质是做“轻”数仓
为什么又造了个新词 Data Warebase:我看到了 AI 时代数据平台应当的样子