0%

关于主题域划分的疑问

主题间应该尽量不交叉使用,但是在轻度汇总层以上,非常容易交叉。如电商,会有用户主题、交易主题,那么,用户交易应该属于哪个?

结论

个人观点,主题域之间的交叉使用无法避免,对于用户交易这样粒度的汇总表,放到以实体为主的用户主题中,不在交易主题中体现,交易主题中体现的是最细粒度的轻度汇总。

即:
1、事实明细层、维度层按照业务过程、实体概念等划分,如用户基础信息,交易表,活动表。
2、轻度汇总,及多个维度的中高度汇总,按照业务过程划分,如用户商品汇总表。
3、高度到单一实体的汇总,按照实体概念汇总,如用户活动表。
4、ads层则按照数据集市概念进行划分,如xx专题分析、财务部等。

什么是主题划分

inmon的主题

inmon中的主题,是面向企业数据模型中的企业主题域来划分的,其本质就是高层模型中的实体,用于后续划分表归属的。

在inmon的《数据仓库》一书中,对于主题的定义是这样的。

传统的操作型系统是围绕公司的功能性应用进行组织的。对一个保险公司来说,应用问题可能是汽车保险、健康保险、人寿保险与意外伤亡保险。公司的主要主题域可能是顾客、保险单、保险费与索赔。
数据仓库面向在高层的企业数据模型中已定义好的企业主题域。典型的主题域有:顾客、产品、交易或活动、政策、索赔、账目。
在数据仓库中,每一个主要主题域都是以一组相关的表来具体实现的。一个主题域可能由10个,100个或更多的相互联系的物理表组层。例如,一个顾客主题域可能由顾客基本信息表,顾客活动表组层。

而所谓的高层模型,指的是inmon的建模三步骤:数据建模分为三个层次:高层模型(称为实体关系图,或ERD),中间层建模(称为数据项集或DIS)、底层建模(称为物理模型)。

kimball 中的主题

kimball在《数据仓库工具箱》一书中,并没有提及主题这一概念。

为什么要主题划分

chatGpt中的回答是这样的

主题划分在数据仓库设计和建设过程中是非常重要的,主要基于以下几个原因:
1.提高查询效率和性能:通过将数据按照主题进行划分,可以将相关的数据存储在一起,减少查询时需要扫描的数据量,从而提高查询效率和性能。
2.简化数据管理和维护:主题划分可以帮助数据仓库管理员更轻松地管理和维护数据。每个主题都有明确的边界和责任人,使得数据管理和维护变得更加清晰和可控。
3.提高数据质量和一致性:主题划分有助于提高数据的质量和一致性。相同主题下的数据通常遵循相同的数据标准和规范,可以更容易地进行数据清洗、整合和转换,从而提高数据的一致性和准确性。
4.支持业务分析和决策:主题划分使得数据更加符合业务逻辑和业务需求,有助于支持业务分析和决策。通过针对不同主题进行分析,可以更好地理解业务的运作情况,发现潜在的业务机会和问题。
5.降低数据耦合性:通过将数据按照主题进行划分,可以降低数据之间的耦合性,使得数据更加灵活和可扩展。不同主题之间的数据可以相对独立地进行处理和管理,有利于系统的扩展和维护。

个人理解
1.主题是对所有数据业务的抽象归类,类似字典,便于业务理解使用,可以更好的支持业务分析和决策。
2. 按主题划分后,开发人员更好维护数据,避免数据的重复开发,提高一致性。

怎么做主题划分

1.按照业务系统划分
因为大部分企业都已经经历过了信息化建设或者正处于信息化建设当中,企业各种业务系统都已经部署完成,财务部门有财务系统、销售部门有销售系统、生产部门有生产系统、供应链部门有供应链系统……

这些不同的业务系统,因为只会储存对应业务流程中产生的数据,下级数据主题都互相紧贴,是天然的主题域,业务系统有几种,就可以划分为几种主题域。

如 生产主题域、财务主题域、人力主题域

2.按照需求划分
很多时候,企业需要长期对某个方向进行分析,因为这个长期分析的过程涉及到各种主题,会对数据进行细分、归纳,在这个过程中,就由需求诞生了主题域。

就拿销售分析来说,这个分析过程会涉及到的对象有产品、仓库、经销商、顾客等,其中每一个分析对象就是一个数仓主题,而包含归纳这些主题的销售分析就成为了一个相应的主题域。

3.按照功能划分
在现代社会,软件是每个加入互联网的网民都会使用到的东西,这些由企业开发的软件拥有着不同的功能模块,比如说社交软件中就会有聊天、朋友圈、群聊、发送文件等功能。

从这些功能中选一个模块,聊天模块会涉及到数据仓库中的用户主题、图片主题、文字主题等,所以聊天模块也能被归纳为聊天主题域。

4.按照部门划分
现代企业都有着不同的业务部门,这些部门也会形成各种不同的主题域,比如说销售域、生产域、财务域等,而这些主题域也是由不同的数据主题组成的。

与分层有啥关系?

数仓分层是从存储,从管理角度对数据进行组织,是纵向空间上的划分。
主题域划分是从使用,从业务的角度对数据进行组织,是横向业务上的划分。

1、ods层,不参与划分
2、事实明细层、维度层按照业务过程、实体概念等划分,如用户基础信息,交易表,活动表。
3、轻度汇总,及多个维度的中高度汇总,按照业务过程划分,如用户商品汇总表。
4、高度到单一实体的汇总,按照实体概念汇总,如用户活动表。
5、ads层则按照数据集市概念进行划分,如xx专题分析、财务部等。

参考

问题

连续登录问题,是实际分析需求和面试中常见的问题。

其实,连续登录问题常规来说有两种场景,其难度完全不一样。

第一种是指定连续登录天数,求满足的用户
第二种则是求用户连续登录了多少天。

本文只是简单探讨。

语法

本文使用的spark sql的语法,如有差异,自行替换。

表结构&数据

明细就直接忽略了,直接以用户登录日期粒度的表结构为主,实际需求中可以自行聚合到这个粒度。
表结构&数据

user_id login_date
1 ‘2023-01-01’
1 ‘2023-01-02’
1 ‘2023-01-03’
1 ‘2023-01-04’
1 ‘2023-01-11’
1 ‘2023-01-12’
1 ‘2023-01-15’
2 ‘2023-01-15’
3 ‘2023-01-01’
3 ‘2023-01-02’
3 ‘2023-01-03’
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
with user_log as (
select col1 as user_id, col2 as login_date
from values
(1, '2023-01-01')
,(1, '2023-01-02')
,(1, '2023-01-03')
,(1, '2023-01-04')
,(1, '2023-01-11')
,(1, '2023-01-12')
,(1, '2023-01-15')
,(2, '2023-01-15')
,(3, '2023-01-01')
,(3, '2023-01-02')
,(3, '2023-01-03')
)
select * from user_log

场景

场景1 给定具体日期,且指定连续登录天数

例如: 求在2023-01-03号那天连续登录3天的用户。

这种场景是最简单的,直接聚合+having

1
2
3
4
5
6
7
select user_id
,count(*) as cnt
from user_log
where login_date between date_sub('2023-01-03',2) and '2023-01-03'
group by user_id
having cnt>=3
order by user_id

结果:

user_id cnt
1 3
1 3

场景2,不给定具体日期,但指定连续登录天数

例如:求用户在哪几天连续登录2天。

这部分需要用到窗口函数了,同时由于日期间隔为1,可以使用range特性

1
2
3
4
select *
, count(*) over (partition by user_id order by datediff(login_date, '2000-01-01') range between 1 preceding and current row ) as continue_cnt
from user_log
order by user_id,login_date

结果如下

user_id login_date continue_cnt
1 2023-01-01 1
1 2023-01-02 2
1 2023-01-03 2
1 2023-01-04 2
1 2023-01-11 1
1 2023-01-12 2
1 2023-01-15 1
2 2023-01-15 1
3 2023-01-01 1
3 2023-01-02 2
3 2023-01-03 2

结果没有过滤,是为了方便理解,大家自行加上continue_cnt=2即可。

场景3,既不给定具体业务日期,也不指定连续登录天数。

这种场景是最难的,最耗性能的,但也是实用性最高的。

row_number 解法

说实话,这个解法本人没有想出来,是网上搜到的,使用row_number进行排序,利用日期差特性,如果是连续的,那么实际的差值日期是一样的特性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
,login_date_grp as (
select *
,date_sub(login_date,login_date_rn) as continue_date_grp --连续登录日期分组
from
(
select *
,row_number() over (partition by user_id order by login_date) as login_date_rn
from user_log
order by user_id,login_date
) a
)
select user_id
,login_date
,row_number() over(partition by user_id,continue_date_grp order by login_date) as continue_login_day_cnt --连续登录天数
,login_date_rn --用户登录日期排序
,continue_date_grp --用户连续登录日期分组
from login_date_grp
order by user_id
,login_date

结果如下

user_id login_date continue_login_days_cnt login_date_rn continue_date_grp
1 2023-01-01 1 1 2022-12-31
1 2023-01-02 2 2 2022-12-31
1 2023-01-03 3 3 2022-12-31
1 2023-01-04 4 4 2022-12-31
1 2023-01-11 1 5 2023-01-06
1 2023-01-12 2 6 2023-01-06
1 2023-01-15 1 7 2023-01-08
2 2023-01-15 1 1 2023-01-14
3 2023-01-01 1 1 2022-12-31
3 2023-01-02 2 2 2022-12-31
3 2023-01-03 3 3 2022-12-31

可以看到,结果是符合我们预期的,这时候只要外面再套上一层过滤条件即可。

看的稍微有点蒙?这里稍微解释一下,这里巧妙的利用了等差数组的特性。
比如有如下一个用户连续5天登录的数据,同时我们对同一用户按照日期进行row num排序,再求每一行和’2023-01-01’这一天的差值日期,然后再对这两列求差值,会看到如下结果

用户id 登录日期 日期排序 和’2022-12-31’的间隔天数 日期排序和’2023-01-01’的间隔天数的差值
1 2023-01-01 1 1 0
1 2023-01-02 2 2 0
1 2023-01-03 3 3 0
1 2023-01-04 4 4 0
1 2023-01-05 5 5 0

我们可以观察到,如果日期是连续的,那么本身和’2022-12-31’的间隔天数这一列就是+1的规则,而row num也是自然+1的,差值永远是1

此时,如果删除2023-01-03这一天的登录记录,那么结果就会变为

用户id 登录日期 日期排序 和’2022-12-31’的间隔天数 日期排序和’2022-12-31’的间隔天数的差值
1 2023-01-01 1 1 0
1 2023-01-02 2 2 0
1 2023-01-04 3 4 1
1 2023-01-05 4 5 1
可以看到,随着2023-01-03这条记录被删除,在2023-01-04这一天,间隔从0变为了1,而2023-01-05这天还是连续登录的,间隔仍然是1.

此时就能发现规律了,如果是连续登录的,那么间隔差值就不会变,这里其实可以用数学公式证明,这里就补展开了。

所以,为什么代码里用的是date_sub作为分组?其实我们要求的是差值,而日期本身就是连续的数字,date_sub(日期,rn) 是不是就是可以看成差值。 还节约了计算资源。

这题的解法,本人认为还是有很多精巧设计在里面的,一般确实很难想到。

使用日期维表获取上一次未登陆日期

这个是本人想到的,可以利用日期维表,补全没有登录的天数,然后利用窗口函数求出上一次未登陆日期,再利用datediff求出差值即连续登录天数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
dim_date as (
select t1.*
,date_add('2022-12-31',row_num) as dt_ymd
from
(
select row_number() over(order by m) as row_num
from
(select split(repeat(",",15),",") as multi) a
LATERAL VIEW explode(multi) tmpTable as m
) t1
)
,user_full_date as (
select t1.user_id
,t2.dt_ymd
from
(select distinct user_id from user_log) t1
cross join dim_date t2
)
,last_unlogin as (
select a.user_id
,a.dt_ymd
,b.user_id as user_id2
,b.login_date
,last_value(if(b.login_date is not null,null,a.dt_ymd),true) over(partition by a.user_id order by a.dt_ymd rows between unbounded preceding and current row ) as last_unlogin_date
from user_full_date a
left join user_log b
on a.dt_ymd=b.login_date
and a.user_id=b.user_id
)
select user_id
,login_date
,date_diff(login_date,nvl(last_unlogin_date,'2022-12-31')) as continue_days
,last_unlogin_date
from last_unlogin
where user_id2 is not null
order by user_id,dt_ymd


结果如下

user_id login_date continue_days last_unlogin_date
1 2023-01-01 1 null
1 2023-01-02 2 null
1 2023-01-03 3 null
1 2023-01-04 4 null
1 2023-01-11 1 2023-01-10
1 2023-01-12 2 2023-01-10
1 2023-01-15 1 2023-01-14
2 2023-01-15 1 2023-01-14
3 2023-01-01 1 null
3 2023-01-02 2 null
3 2023-01-03 3 null

可以看到结果符合预期。

总结

一直以为自己sql水平还是不错的,原来以前一直没怎么处理过连续登录问题,面试被问了还是懵逼了一下。该总结的还得总结一下。

总的来说,连续登录还是比较难的,尤其是考虑性能的情况下,使用row_number利用等差数据确实是一个很精妙的解法。

数据集市定义

数据集市是一个结构概念,它是企业级数据仓库的一个子集,主要面向部门级业务,并且只面向某个特定的主题。

有两种类型的数据集市——独立型和从属型

数据集市的应用

数据集市是数仓之上更聚焦的业务主题合集,更偏向于应对业务数据快速高效应用的需求,一般用于商业智能系统中探索式和交互式数据分析应用。

数据集市的一般架构

如何建设数据集市

  • 自上而下建设
  • 自下而上建设

数据集市和数据仓库的区别

类目 数据仓库 数据集市
数据来源 生产系统外部数据等 数据仓库或生产系统等外部数据
规模范围 企业级 部门或工作组级
主题 以企业为主题 以部门或特殊分析为主题
数据粒度 最细粒度 较粗的粒度

分层诞生的背景

我们可以想象这样一个场景。
第一天,小A要写一个任务,获取每日的成交金额,然后制作成一张报表给老板看,于是小A就蹭蹭蹭的写了一个脚本,从数据抽取,到sum的逻辑加工,然后使用报表查询。完美收工

第二天,小A又接到一个任务,这一次是运营那边需要每日的交易金额,展现的活动页面上,小A想,上次取过,直接用上次的表拿来就用,完美收工。

第三天,小A的第一个业务方说,我每天的成交金额不要浙江省的,小A想改那个脚本,但是运营也在用,犯了难,只能cv大法了。

第四天,新同事小B接到一个任务,要取上海市的每日成交金额,最近跌的厉害,小B看了下小A写表,没法用啊,于是蹭蹭蹭自己从抽取,到sum加工,到where过滤,到报表查询重新写了一遍。

看出什么问题了没,任务管理混乱,没有复用,重复开发,应用方需要花费大量的精力在底层细节上。

而分层便是基于任务功能做出的层级划分,分工开发,提高效率。

什么是分层

分层是以解耦为核心,基于任务功能做出的分工和层级划分。每一层分工计算,提高效率。

分层的好处

  • 清晰数据结构: 每一层都有其作用域,方便理解定位和使用
  • 统一数据口径:提供统一的数据出口。
  • 减少重复开发: 通过中间层,减少重复计算。
  • 把复杂问题简单化: 将一个复杂任务拆解为多个步骤,处理单一功能,方便理解、排查和恢复。
  • 屏蔽原始数据异常: 屏蔽业务影响,通过下层改动对上层无感知。
  • 业务应用方从细节解放

传统的分层

这里就不细说了,ods,cdm,ads

什么是数据模型?

数据模型就是数据的组织和存储方法,它强调从业务、数据存取和使用角度合理存储数据。

怎么理解?

  • 把数据看做图书馆的书,希望书分门别类方案,如何分门别类便是我们的模型
  • 把数据看做城市建筑,希望城市建筑布局合理,布局方式便是我们的模型
  • 把数据看做纸质资料,希望资料按类别、年份分不同文件夹放好,这也是模型

数据模型的好处

  • 性能:良好的数据模型能帮助我们快速查询所需的数据,减少IO
  • 成本:良好的数据模型能极大的减少不必要的数据冗余,也能实现计算结果的复用,减少存储和计算成本。
  • 效率:能改善用户使用数据的体验,提高使用效率。
  • 质量:减少数据统计口径的不一致性,减少计算错误的可能性。

有最好的模型吗

没有,只有最合适的模型,模型是在业务,存储和使用者三方的总和考虑,同时也是性能,成本,效率,质量之间的平衡考虑。

常见的数据模型有哪些?

ER模型

ER模型,实体关系模型(Entity-Relationship),将事务抽象为数据实体和关系。

er模型符合三范式。

谁用er模型

wiliam H。inmon 在building the data warehouse一书中使用

数据仓库3NF和OLTP系统中的3NF区别

数据仓库的3NF是站在企业角下面向主题的抽象,而不是针对某个具体的业务流程的实体对象关系的抽取。

ER模型的特点

  • 需要全面了解企业业务和数据
  • 实施周期非常长
  • 对建模人员要求高

ER如何建模的

出发点是整合数据,将各个系统中的数据以整个企业角度按主题进行相似性组合和合并,并进行一致性处理

建模步骤

  • 高层模型:(称为实体关系图,或ERD)高度抽象,描述主题和主题的关系,描述总体概括
    主要由集成范围决定哪些实体属于模型范围而哪些不是。
    将不同的用户的观点整合。
    产出主要主题或实体。
  • 中层模型:(数据项集或DIS)细化主题的数据项
    主要数据分组
    二级数据分组
    连接器
    数据的类型
  • 物理模型:表的合并、分区等设计

举个例子

还是以最常见的商城为例
在业务系统中,我们的系统会分为:

  • trade 交易
  • pay 支付
  • crm crm系统
  • fin财务系统

而我们会根据公司重新组织,分成

  • 交易主题
  • 流量主题
  • 销售主题
  • 财务主题
  • 营销主题
  • 采购主题
  • 供应商主题
  • 门店主题

第一步是高层维度,即各个主题的er关系。
比如交易主题和销售、供应商、门店、采购、财务这些都有关系。
而采购和供应商有关系,和销售则没有关系
第二部 则是中层模型
比如在销售主题内,销售人员和出勤、订单、访问、电话等都有管理,形成不同的中层数据项
第三步 则是物理模型
最终落地如何存储,分表还是合并,是否分区等。

维度建模

维度建模是palph kimball所倡导的,在 the data warehouse toolkit一书中提及。

其基于分析决策需求须发构建模型,典型的代表为星形模型和雪花模型。

如何建模

  • 选择决策的业务过程
  • 选择粒度 维度的组合
  • 识别维度 维度属性,用于分析时进行的分组和筛选
  • 选择事实 确定衡量的指标

举个例子

分析过去一星期浙江的哪个商品卖的最好

选择业务过程

  • 选择决策的业务过程
    卖的最好,其排序是根据下单金额,也就是下单这一业务过程。
  • 选择粒度
    根据需求,过去一星期为业务时间,也就是下单时间,浙江为地域,理解成门店开在浙江的,商品卖的好,商品。
    因此粒度为:时间,门店-地域,商品
  • 识别维度 维度属性
    这里属性较少,极简情况下为日期、星期(第n周)、省份,商品id,商品名称
  • 选择事实
    下单金额。

参考

  • 大数据之路
  • building the datawarehouse
  • the data warehouse toolkit: the definitive guid to dimensional modeling

引言

在上一篇spark源码阅读-spark启动流程中,了解到了spark提交命令 spark-submit shell端的运行情况。

举例来说

1
2
3
4
5
6
7
8
9
10
/opt/soft/spark/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--num-executors 1 \
--executor-memory 800M \
--driver-memory 700M \
--executor-cores 2 \
--class org.apache.spark.examples.JavaWordCount \
/opt/soft/spark/examples/jars/spark-examples_2.12-3.4.0-SNAPSHOT.jar \
file:/opt/soft/spark/examples/src/main/resources/kv1.txt

上述shell是像spark提交自带的wordcount程序,那么经过shell端解析,最终获取到的执行命令为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/opt/jdk/bin/java \
-cp /opt/soft/spark/conf/:/opt/soft/spark/jars/*:/opt/soft/hadoop-3.3.1/etc/hadoop/ \
-XX:+IgnoreUnrecognizedVMOptions \
--add-opens=java.base/java.lang=ALL-UNNAMED \
--add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED \
--add-opens=java.base/java.io=ALL-UNNAMED \
--add-opens=java.base/java.net=ALL-UNNAMED \
--add-opens=java.base/java.nio=ALL-UNNAMED \
--add-opens=java.base/java.util=ALL-UNNAMED \
--add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED \
--add-opens=java.base/sun.nio.ch=ALL-UNNAMED \
--add-opens=java.base/sun.nio.cs=ALL-UNNAMED \
--add-opens=java.base/sun.security.action=ALL-UNNAMED \
--add-opens=java.base/sun.util.calendar=ALL-UNNAMED \
--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED \
-Djdk.reflect.useDirectMethodHandle=false \
org.apache.spark.deploy.SparkSubmit \
--master yarn \
--deploy-mode cluster \
--conf spark.driver.memory=700M \
--class org.apache.spark.examples.JavaWordCount --num-executors 1 \
--executor-memory 800M \
--executor-cores 2 \
/opt/soft/spark/examples/jars/spark-examples_2.12-3.4.0-SNAPSHOT.jar \
file:/opt/soft/spark/examples/src/main/resources/kv1.txt

可以看到,最后运行的是org.apache.spark.deploy.SparkSubmit这个类,并将一开始的传入的参数继续传入到这个类里面,本文就简单了解一下sparkSubmit这个类到底做了什么。

spark-submit的运行流程

整体概览


上图即spark-submit后运行的过程,该进程主要涉及以下几个类。

  • SparkSubmit Object java main入口
  • SparkSubit Class 启动 Spark 应用程序的主要网关。
    该程序处理并设置Spark相关的依赖项的类路径,并抽象了一层,以支持在不同集群管理器上运行和部署模式上。
  • SparkApplication spark应用程序的入口点,本人用的yarn cluster模式,因此是YarnClusterApplication
  • Client 客户端,用于和yarn交互。如创建app,或kill。

详细执行

如上图所示

1. 通过java命令,启动了一个jvm,该jvm类的入口类为org.apache.spark.deploy.SparkSubmit

2. 进入main,该方法比较简单,主要为new SparkSubmit,并调用doSumit方法

1
2
3
4
5
6
7
8
9
10
val submit = new SparkSubmit() {
self =>
override protected def parseArguments(args: Array[String]): SparkSubmitArguments
override protected def logInfo(msg: => String): Unit = printMessage(msg)
override protected def logWarning(msg: => String): Unit
override protected def logError(msg: => String): Unit
override def doSubmit(args: Array[String]): Unit
......
}
submit.doSubmit(args)

3. SparkSubmit类中,参数不断进行传递doSumit->submit->runMain

最后在runMain方法中通过反射创建了YarnClusterApplication的实例,并调用start方法

1
2
3
4
5
6
7
8
9
10
// doSubmit->submit, doSumit 调用 submit方法,走的是case SparkSubmitAction.SUBMIT
def doSubmit(args: Array[String]): Unit = {
......
appArgs.action match {
case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
case SparkSubmitAction.KILL => kill(appArgs)
case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
case SparkSubmitAction.PRINT_VERSION => printVersion()
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
//submit->doRunMain->runMain submit方法中直接调用函数doRunMain,doRunMain判断是否需要代理用户,之后就调用runMain方法
private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
def doRunMain(): Unit = {
if (args.proxyUser != null) {
......
} else {
runMain(args, uninitLog)
}
}

// In standalone cluster mode, there are two submission gateways:
// (1) The traditional RPC gateway using o.a.s.deploy.Client as a wrapper
// (2) The new REST-based gateway introduced in Spark 1.3
// The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
// to use the legacy gateway if the master endpoint turns out to be not a REST server.
if (args.isStandaloneCluster && args.useRest) {
try {
logInfo("Running Spark using the REST application submission protocol.")
doRunMain()
} catch {
// Fail over to use the legacy submission gateway
case e: SubmitRestConnectionException =>
logWarning(s"Master endpoint ${args.master} was not a REST server. " +
"Falling back to legacy submission gateway instead.")
args.useRest = false
submit(args, false)
}
// In all other modes, just run the main class as prepared
} else {
doRunMain()
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//最后,在runMain方法中,通过反射创建了YarnClusterApplication,并调用start方法。关于SparkApplication,参见下文解释。
private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
......
//这里走的是第一个分支,通过反射创建
val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
} else {
new JavaMainApplication(mainClass)
}
......
try {
app.start(childArgs.toArray, sparkConf)
} catch {
case t: Throwable =>
throw findCause(t)
} finally {
......
}
}

4. YarnClusterApplication.start方法new了Client,并继续调用run方法

其中Client类里初始化了yarn的各种连接
并且此时将待运行程序的参数org.apache.spark.examples.JavaWordCount等参数通过new ClientArguments(args)传递给了Client这个类,这些参数在后续构建yarn container时会进行使用。

1
2
3
4
5
6
7
8
9
override def start(args: Array[String], conf: SparkConf): Unit = {
// SparkSubmit would use yarn cache to distribute files & jars in yarn mode,
// so remove them from sparkConf here for yarn mode.
conf.remove(JARS)
conf.remove(FILES)
conf.remove(ARCHIVES)

new Client(new ClientArguments(args), conf, null).run()
}

5. Client.run调用submitApplication方法

最后在submitApplication方法中,通过和yarn的交互,最后通过yarnClient.submitApplication(appContext)提交了application给yarn

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
//run调用 submitApplication
def run(): Unit = {
submitApplication()
......
}
//submitApplication方法提交yarn application
def submitApplication(): Unit = {
ResourceRequestHelper.validateResources(sparkConf)

try {
launcherBackend.connect()
yarnClient.init(hadoopConf)
yarnClient.start()

if (log.isDebugEnabled) {
logDebug("Requesting a new application from cluster with %d NodeManagers"
.format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))
}

// Get a new application from our RM
val newApp = yarnClient.createApplication()
val newAppResponse = newApp.getNewApplicationResponse()
this.appId = newAppResponse.getApplicationId()

// The app staging dir based on the STAGING_DIR configuration if configured
// otherwise based on the users home directory.
// scalastyle:off FileSystemGet
val appStagingBaseDir = sparkConf.get(STAGING_DIR)
.map { new Path(_, UserGroupInformation.getCurrentUser.getShortUserName) }
.getOrElse(FileSystem.get(hadoopConf).getHomeDirectory())
stagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))
// scalastyle:on FileSystemGet

new CallerContext("CLIENT", sparkConf.get(APP_CALLER_CONTEXT),
Option(appId.toString)).setCurrentContext()

// Verify whether the cluster has enough resources for our AM
verifyClusterResources(newAppResponse)

// Set up the appropriate contexts to launch our AM
val containerContext = createContainerLaunchContext()
val appContext = createApplicationSubmissionContext(newApp, containerContext)

// Finally, submit and monitor the application
logInfo(s"Submitting application $appId to ResourceManager")
yarnClient.submitApplication(appContext)
launcherBackend.setAppId(appId.toString)
reportLauncherState(SparkAppHandle.State.SUBMITTED)
} catch {
case e: Throwable =>
if (stagingDirPath != null) {
cleanupStagingDir()
}
throw e
}
}

6. 至此,本地jvm进程就结束了,剩下的yarn会启动driver进程

其他补充

SparkApplication

1
2
3
4
5
6
7
8
/**
* Entry point for a Spark application. Implementations must provide a no-argument constructor.
*/
private[spark] trait SparkApplication {

def start(args: Array[String], conf: SparkConf): Unit

}

SparkApplication是一个接口,只有一个方法,start,就是启动。通过该接口的实现,实现了不同资源的启动。具体有5个实现。

  • TestSparkApplication 测试类
  • ClientApp 以client模式启动,即driver启动在本地。
  • JavaMainApplication 直接用java main启动,这个暂时不是很清楚
  • RestSubmissionClientApp 通过rest接口启动
  • YarnClusterApplication 对接yarn资源启动,即driver启动在yarn容器上

提交的yarn的application长什么样?执行的是什么程序命令?

提交到yarn的application执行的命令在appContext.proto.amContainerSpec.command里。

其中,

  • appContext为ApplicationSubmissionContext类,具体实现为ApplicationSubmissionContextPBImpl
  • proto 为 ApplicationSubmissionContextProto
  • amContainerSpec 为 ContainerLaunchContextProto
  • command 为 LazyStringList

最终的命令如下所示:

可以看到,最终启动的java org.apache.spark.deploy.yarn.ApplicationMaster 这个类,同时将wordCount的启动命令 -- class org.apache.spark.examples.JavaWordCount --jar....等参数传递进去,等待进一步处理。

那么,appContext.proto.amContainerSpec.command是何时构建的呢?

基础流程是YarnClusterApplication->Client->ContainerLaunchContext->ApplicationSubmissionContext 这样传递的。

回到上文第四步,4. YarnClusterApplication.start方法new了Client,并继续调用run方法,在该方法中,YarnClusterApplication将args传递给了Client对象。

再回到上文第5步,5-clientrun调用submitapplication方法,在该方法的submitapplication,调用了createContainerLaunchContext方法构建ContainerLaunchContext,参数就是在这时候传递的。

1
2
3
4
5
6
7
8
9
10
11
def submitApplication(): Unit = {
ResourceRequestHelper.validateResources(sparkConf)
......
try {
// Set up the appropriate contexts to launch our AM
val containerContext = createContainerLaunchContext()
val appContext = createApplicationSubmissionContext(newApp, containerContext)
......
}
.....
}

再来看createContainerLaunchContext的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
private def createContainerLaunchContext(): ContainerLaunchContext = {
......
// 将 --class参数提取成 userClass常量
val userClass =
if (isClusterMode) {
Seq("--class", YarnSparkHadoopUtil.escapeForShell(args.userClass))
} else {
Nil
}
val userJar =
if (args.userJar != null) {
Seq("--jar", args.userJar)
} else {
Nil
}
......
// 将 userClass 和其他参数合并成am的参数汇总
val amArgs =
Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++ userArgs ++
Seq("--properties-file",
buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, SPARK_CONF_FILE)) ++
Seq("--dist-cache-conf",
buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, DIST_CACHE_CONF_FILE))

//根据amArgs参数拼装最后执行的命令
// Command for the ApplicationMaster
val commands = prefixEnv ++
Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
javaOpts ++ amArgs ++
Seq(
"1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
"2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")

// TODO: it would be nicer to just make sure there are no null commands here
val printableCommands = commands.map(s => if (s == null) "null" else s).toList
//将命令传递给amContainer并返回。
amContainer.setCommands(printableCommands.asJava)
......
}

最后,通过

1
val appContext = createApplicationSubmissionContext(newApp, containerContext)

将container传递给ApplicationSubmissionContext

在通过

1
yarnClient.submitApplication(appContext)

提交app,这样,app就知道了启动后应该运行什么。

application是如何获取到代码的?

spark-submit的时候,jvm是本地的,jar就在本地,因此可以执行,然后提交后在yarn上运行是异地的,yarn上的application是如何拿到代码的呢?

答:
jar路径需要全局可访问,hdfs就是一个比较好的选择。

总结

SparkSubmit类,如它的名字,提交spark任务,经过SparkSubmit类,最终将之前编写好的spark程序(如wordCount)提交到了运行环境上(yarn或spark集群或k8s等),后续就到了真正的运行流程了。

引言

在上两篇

终于进入了yarn端

最后拿到的一个命令如下

1
2
3
4
5
6
7
/bin/bash -c /opt/jdk/bin/java -server
-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED
--add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false -Xmx700m -Djava.io.tmpdir=/data/storage/hadoop/tmp/hadoop-hdfs/nm-local-dir/usercache/hdfs/appcache/application_1675490517176_0054/container_1675490517176_0054_01_000001/tmp -Dspark.yarn.app.container.log.dir=/opt/soft/hadoop-3.3.1/logs/userlogs/application_1675490517176_0054/container_1675490517176_0054_01_000001
org.apache.spark.deploy.yarn.ApplicationMaster
--class 'org.apache.spark.examples.JavaWordCount' --jar file:/opt/soft/spark_3.4/examples/jars/spark-examples_2.12-3.4.0-SNAPSHOT.jar
--arg 'file:/opt/soft/spark/examples/src/main/resources/kv1.txt'
--properties-file /data/storage/hadoop/tmp/hadoop-hdfs/nm-local-dir/usercache/hdfs/appcache/application_1675490517176_0054/container_1675490517176_0054_01_000001/__spark_conf__/__spark_conf__.properties --dist-cache-conf /data/storage/hadoop/tmp/hadoop-hdfs/nm-local-dir/usercache/hdfs/appcache/application_1675490517176_0054/container_1675490517176_0054_01_000001/__spark_conf__/__spark_dist_cache__.properties 1> /opt/soft/hadoop-3.3.1/logs/userlogs/application_1675490517176_0054/container_1675490517176_0054_01_000001/stdout 2> /opt/soft/hadoop-3.3.1/logs/userlogs/application_1675490517176_0054/container_1675490517176_0054_01_000001/stderr

以上便是我们的入口了,本文开始正式了解spark的运行流程。

spark的启动流程

整体概览

先来看一下运行的整体流程,记住一下流程,这就是我们看代码的导航地图。

spark的启动流程分为以下几个步骤

  1. SparkContext 向资源管理器注册并向资源管理器申请运行 Executor
  2. 资源管理器分配 Executor,然后资源管理器启动 Executor
  3. Executor 发送心跳至资源管理器
  4. SparkContext 构建 DAG 有向无环图
  5. 将 DAG 分解成 Stage(TaskSet)
  6. 把 Stage 发送给 TaskScheduler
  7. Executor 向 SparkContext 申请 Task
  8. TaskScheduler 将 Task 发送给 Executor 运行
  9. 同时 SparkContext 将应用程序代码发放给 Executor
  10. Task 在 Executor 上运行,运行完毕释放所有资源

Spark Application Master的启动流程

启动流程概览

driver 端debug命令

1
/opt/soft/spark/bin/spark-submit --driver-java-options "-Xdebug -Xrunjdwp:transport=dt_socket,address=5105,server=y,suspend=y" --master yarn --deploy-mode cluster  --num-executors 1 --executor-memory 800M --driver-memory 700M --executor-cores 2  --class org.apache.spark.examples.JavaWordCount  /opt/soft/spark/examples/jars/spark-examples_2.12-3.4.0-SNAPSHOT.jar file:/opt/soft/spark/examples/src/main/resources/kv1.txt

启动流程详细过程

1. yarn container 里命令启动jvm 运行ApplicationMaster.main

在上文spark源码阅读-spark启动流程 2 spark-submit 代码端执行情况一文中,最终在yarn端container启动的主要命令为:

1
2
3
4
bash -c /opt/jdk/bin/java -server
org.apache.spark.deploy.yarn.ApplicationMaster
--class 'org.apache.spark.examples.JavaWordCount'
--jar file:/opt/soft/spark_3.4/examples/jars/spark-examples_2.12-3.4.0-SNAPSHOT.jar

所以入口还是shell命令,该shell 命令为container内创建,使用java启动了一个jvm,jvm的入口为org.apache.spark.deploy.yarn.ApplicationMaster 这个类的main。

此时查看spark相关的进程,主要有两个。

进程号为27778的进程为yarn container启动的shell,该命令使用bash -c 启动了一个子进程,子进程运行真正的程序,即进程号为27788的程序。

查看27778的父进程,可以看到进程号为27776的进程,该进程为default_container_executor.sh

即我们像yarn提交的application 程序,yarn在其中一个container中已shell的命令进行运行。

2. main里 new ApplicationMaster类并调用run方法

ApplicationMaster的main方法比较简单,主要就是new了一个ApplicationMaster类,并调用run方法

1
2
3
4
5
6
7
8
9
def main(args: Array[String]): Unit = {
......
master = new ApplicationMaster(amArgs, sparkConf, yarnConf)
......
ugi.doAs(new PrivilegedExceptionAction[Unit]() {
override def run(): Unit = System.exit(master.run())
})
}

3. ApplicationMaster的运行

run

调用run方法,该函数为applicationMaster运行的主流程。其代码比较长,但去除一些配置,上下文装配外,其主流程比较简单,根据模式,决定是runDriver还是runExecutorLauncher,本人的路径为runDriver

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
final def run(): Int = {
try {
......
if (isClusterMode) {
runDriver()
} else {
runExecutorLauncher()
}
} catch {
......
} finally {
......
}
exitCode
}

4. runDriver

runDriver的源码如下,启动了一个线程运行我们的spark主程序,即提交的JavaWordCount的main类。

之后等待userClassThread初始化完成spark context,并设置相关的环境变量,完成后,

  • 通过registerAm注册applicationMaster
  • 通过createAllocator启动executor
  • 通过userClassThread.join()等待userClassThread运行完成,之后退出程序。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    private def runDriver(): Unit = {
    ......
    //启动用户的application线程,即JavaWordCount的main类
    userClassThread = startUserApplication()

    // This a bit hacky, but we need to wait until the spark.driver.port property has
    // been set by the Thread executing the user class.
    logInfo("Waiting for spark context initialization...")
    ......
    try {
    val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
    Duration(totalWaitTime, TimeUnit.MILLISECONDS))
    if (sc != null) {
    ......
    registerAM(host, port, userConf, sc.ui.map(_.webUrl), appAttemptId)
    ......
    createAllocator(driverRef, userConf, rpcEnv, appAttemptId, distCacheConf)
    } else {
    ......
    }
    resumeDriver()
    userClassThread.join()
    } catch {
    ......
    } finally {
    resumeDriver()
    }
    }

5. startUserApplication

startUserApplication 的源码如下,比较简单,就创建了一个线程并启动,该线程通过反射执行我们传入的class,即JavaWordCount

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private def startUserApplication(): Thread = {
......
val mainMethod = userClassLoader.loadClass(args.userClass)
.getMethod("main", classOf[Array[String]])

val userThread = new Thread {
override def run(): Unit = {
try {
if (!Modifier.isStatic(mainMethod.getModifiers)) {
logError(s"Could not find static main method in object ${args.userClass}")
finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_EXCEPTION_USER_CLASS)
} else {
mainMethod.invoke(null, userArgs.toArray)
finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
logDebug("Done running user class")
}
} catch {
.....
} finally {
......
}
}
}
userThread.setContextClassLoader(userClassLoader)
userThread.setName("Driver")
userThread.start()
userThread
}

6.userThtread: JavaWordCount.main

初始化的核心就是这段创建SparkSession,在getOrCreate方法中会创建sparkContext

1
2
3
4
SparkSession spark = SparkSession
.builder()
.appName("JavaWordCount")
.getOrCreate();

7. sparkSession.getOrCreate

当SparkContext未初始化时,通过SparkContext.getOrCreate获取。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
  def getOrCreate(): SparkSession = synchronized {
......
// Global synchronization so we will only set the default session once.
SparkSession.synchronized {
......
val sparkContext = userSuppliedContext.getOrElse {
......
SparkContext.getOrCreate(sparkConf)
// Do not update `SparkConf` for existing `SparkContext`, as it's shared by all sessions.
}
......
}

return session
}
}

8. SparkContext.getOrCreate

该方法比较简单,主要用于获取或实例化 SparkContext ,并将其注册为单例对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def getOrCreate(config: SparkConf): SparkContext = {
// Synchronize to ensure that multiple create requests don't trigger an exception
// from assertNoOtherContextIsRunning within setActiveContext
SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
if (activeContext.get() == null) {
setActiveContext(new SparkContext(config))
} else {
if (config.getAll.nonEmpty) {
logWarning("Using an existing SparkContext; some configuration may not take effect.")
}
}
activeContext.get()
}
}

9.new SparkContext(config)

该方法其实为SparkContext的初始化,SparkContext的属性较多,我们只关注启动流程上关注的信息,即设调用了YarnClusterScheduler.postStartHook()

在SparContext.class内部,有一大段代码块,该段代码块会被编译进构造器中,也就是在new class时候会运行,整个方法见识创建了一个YarnClusterScheduler,然后在启动完成后调用postStartHook方法。

1
2
3
4
5
6
// Create and start the scheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master)
_schedulerBackend = sched
_taskScheduler = ts
// Post init
_taskScheduler.postStartHook()

10 为什么是YarnClusterScheduler.postStartHook()?

还记得上文第四部4-rundriver方法中,启动完userThread后,main线程会等待SparkContext初始化完成,那么如何判断的呢,是通过多线程间的promise和futhur获取的。

1
2
3
4
5
val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
Duration(totalWaitTime, TimeUnit.MILLISECONDS))
if (sc != null) {
......
}

而sparkContextPromise的对象放入是通过什么方法呢,是sparkContextInitialized

1
2
3
4
5
6
7
8
private def sparkContextInitialized(sc: SparkContext) = {
sparkContextPromise.synchronized {
// Notify runDriver function that SparkContext is available
sparkContextPromise.success(sc)
// Pause the user class thread in order to make proper initialization in runDriver function.
sparkContextPromise.wait()
}
}

而sparkContextInitialized又被ApplicationMaster.sparkContextInitialized这个静态方法调用。

1
2
3
private[spark] def sparkContextInitialized(sc: SparkContext): Unit = {
master.sparkContextInitialized(sc)
}

而ApplicationMaster.sparkContextInitialized又被YarnClusterScheduler类中的postStartHook方法调用,YarnClusterScheduler继承自YarnScheule基础自TaskScheuleImpl,本质上就是任务调度暴露了hook接口,方便后续使用。

1
2
3
4
5
override def postStartHook(): Unit = {
ApplicationMaster.sparkContextInitialized(sc)
super.postStartHook()
logInfo("YarnClusterScheduler.postStartHook done")
}

所有,整体流程就是userThread在完成SparkContext的初始化后,告知schedule启动完成,schedule将告知ApplicationMaster启动完成,ApplicationMaster接受后,继续之前阻塞的程序。

11.registerAM

这一步就是注册applicationMaster,其实也是driver了,很简单,直接调用client.register

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private def registerAM(
host: String,
port: Int,
_sparkConf: SparkConf,
uiAddress: Option[String],
appAttempt: ApplicationAttemptId): Unit = {
val appId = appAttempt.getApplicationId().toString()
val attemptId = appAttempt.getAttemptId().toString()
val historyAddress = ApplicationMaster
.getHistoryServerAddress(_sparkConf, yarnConf, appId, attemptId)

client.register(host, port, yarnConf, _sparkConf, uiAddress, historyAddress)
registered = true
}

YarnClient.register

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def register(
driverHost: String,
driverPort: Int,
conf: YarnConfiguration,
sparkConf: SparkConf,
uiAddress: Option[String],
uiHistoryAddress: String): Unit = {
amClient = AMRMClient.createAMRMClient()
amClient.init(conf)
amClient.start()
this.uiHistoryAddress = uiHistoryAddress

val trackingUrl = uiAddress.getOrElse {
if (sparkConf.get(ALLOW_HISTORY_SERVER_TRACKING_URL)) uiHistoryAddress else ""
}

logInfo("Registering the ApplicationMaster")
synchronized {
amClient.registerApplicationMaster(driverHost, driverPort, trackingUrl)
registered = true
}
}

这一段都是和yarn相关的,就不展开了

12 createAllocator

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
private def createAllocator(
driverRef: RpcEndpointRef,
_sparkConf: SparkConf,
rpcEnv: RpcEnv,
appAttemptId: ApplicationAttemptId,
distCacheConf: SparkConf): Unit = {
.....
val appId = appAttemptId.getApplicationId().toString()
val driverUrl = RpcEndpointAddress(driverRef.address.host, driverRef.address.port,
CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
val localResources = prepareLocalResources(distCacheConf)

// Before we initialize the allocator, let's log the information about how executors will
// be run up front, to avoid printing this out for every single executor being launched.
// Use placeholders for information that changes such as executor IDs.
logInfo {
val executorMemory = _sparkConf.get(EXECUTOR_MEMORY).toInt
val executorCores = _sparkConf.get(EXECUTOR_CORES)
val dummyRunner = new ExecutorRunnable(None, yarnConf, _sparkConf, driverUrl, "<executorId>",
"<hostname>", executorMemory, executorCores, appId, securityMgr, localResources,
ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
dummyRunner.launchContextDebugInfo()
}

allocator = client.createAllocator(
yarnConf,
_sparkConf,
appAttemptId,
driverUrl,
driverRef,
securityMgr,
localResources)

// Initialize the AM endpoint *after* the allocator has been initialized. This ensures
// that when the driver sends an initial executor request (e.g. after an AM restart),
// the allocator is ready to service requests.
rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverRef))

allocator.allocateResources()
val ms = MetricsSystem.createMetricsSystem(MetricsSystemInstances.APPLICATION_MASTER, sparkConf)
val prefix = _sparkConf.get(YARN_METRICS_NAMESPACE).getOrElse(appId)
ms.registerSource(new ApplicationMasterSource(prefix, allocator))
// do not register static sources in this case as per SPARK-25277
ms.start(false)
metricsSystem = Some(ms)
reporterThread = launchReporterThread()
}

executor也启动完毕,这里也就不展开了。

发现

最近偶然间,发现一个以前spark sql不知道的写法,窗口函数的窗口是可以起别名的。

本文就作为笔记罗列下窗口别名的写法。

哪里看?

要了解最官方的语法及解释,自然是看官方的解释了。
spark的官方为 Window Functions
hive的官方为LanguageManual WindowingAndAnalytics

本文以spark sql语法为准

窗口别名的基本语法

1
2
3
4
5
6
7
8
select 
window_function [ nulls_option ] OVER window_name

from xxx

window window_name as ( [ { PARTITION | DISTRIBUTE } BY partition_col_name = partition_col_val ( [ , ... ] ) ]
{ ORDER | SORT } BY expression [ ASC | DESC ] [ NULLS { FIRST | LAST } ] [ , ... ]
[ window_frame ] )

举例来说

1
2
3
4
5
6
7
8
9
10
with tmp as (
select '张三' as name,100 as score
union all
select '张三' as name,80 as score
)
select *
,lead(score) over w as lead_score
from tmp
window w as (partition by name order by score)
order by score

此时结果为

name score lead_score
张三 80 100
张三 100 null

有什么作用?

当我们同一个窗口,要取的字段比较多时,窗口的partition by xx order by xxx就会重复编写,此时,如要要修改窗口,那么就会要改很多地方,而使用别名,则只需修改一个地方,同时,代码也会简洁很多。

还是举个例子,比如有如下的订单表

id 用户名 商品 支付时间 购买数量 支付金额
1 张三 奶粉 2023-01-01 13:53:52 2 120
2 张三 尿不湿 2023-01-02 14:14:14 3 200
需求是加上上一次同一个用户购买的商品名称,支付时间,购买金额用于分析比较。

那么,正常的一个sql写法为

1
2
3
4
5
6
7
8
9
10
11
with item_order as (
select 1 as id,'张三' as name,'奶粉' as item_name,'2023-01-01 13:53:52' as pay_time,2 as item_cnt,120 as pay_amt
union all
select 2 as id,'张三' as name,'尿不湿' as item_name,'2023-01-02 14:14:14' as pay_time,3 as item_cnt,200 as pay_amt
)
select *
,lag(item_name) over (partition by name order by pay_time) as lag_item_name
,lag(pay_time) over (partition by name order by pay_time) as lag_pay_time
,lag(pay_amt) over (partition by name order by pay_time) as lag_pay_amt
from item_order
order by name,pay_time

查看上述sql,会发现写了三次 over (partition by name order by pay_time)

而采用窗口别名复用的方法,sql可以变为如下

1
2
3
4
5
6
7
8
9
10
11
12
13
with item_order as (
select 1 as id,'张三' as name,'奶粉' as item_name,'2023-01-01 13:53:52' as pay_time,2 as item_cnt,120 as pay_amt
union all
select 2 as id,'张三' as name,'尿不湿' as item_name,'2023-01-02 14:14:14' as pay_time,3 as item_cnt,200 as pay_amt
)
select *
,lag(item_name) over name_window as lag_item_name
,lag(pay_time) over name_window as lag_pay_time
,lag(pay_amt) over name_window as lag_pay_amt
from item_order
window name_window as (partition by name order by pay_time)
order by name,pay_time

可以看到对窗口name_window进行了复用,此时只需修改 name_window 一个地方就可以了。

参考资料

数据仓库定义

数据仓库,按照传统的定义,数据仓库是一个面向主题的(Subject Oriented)、集成的(Integrate)、稳定的(Non -Volatile)、反映历史变化(Time Variant),用来支持管理人员决策的数据集合。

举例来说,运营用户进行营销活动,分析最后的一个营销效果,一个基本的流程是,将业务数据同步到数据仓库,数据仓库每天将加工好的数据存入数仓或其他olap引擎,业务上再使用这部分数据进行营销效果分析等操作。

在上述例子中,营销反应了面向主题,数据同步反应了集成的,每日加工反应了历史变化,存储或其他olap引擎,反应了持久的这一特征。

之所以诞生专门的数据仓库,一方面是数据有集成的这一需求,另一方面则是传统的数据库oltp的系统设计无法支持大数据量的历史数据查询,因此olap等系统应运而生。

面向主题

操作型数据库的数据组织面向事务处理任务,各个业务系统之间各自分离,而数据仓库中的数据是按照一定的主题域进行组织。

主题是一个抽象的概念,是数据归类的标准,是指用户使用数据仓库进行决策时所关心的重点方面,一个主题通常与多个操作型信息系统相关。每一个主题基本对应一个宏观的分析领域。

例如,我们公司数据仓库的主题:用户

用户数据来源:从pc端登录、移动端登录、微信小程序端登录等几个不同端的业务系统数据库中抽取的数据整理而成。这些用户信息有可能是一致的,也可能是不一致的,这些信息需要统一整合才能完整体现用户。

集成

面向事务处理的操作型数据库通常与某些特定的应用相关,数据库之间相互独立,并且往往是异构的。而数据仓库中的数据是在对原有分散的数据库数据抽取、清理的基础上经过系统加工、汇总和整理得到的,必须消除源数据中的不一致性,以保证数据仓库内的信息是关于整个企业的一致的全局信息。

具体如下:

  1. 数据进入数据仓库后、使用之前,必须经过加工与集成。
  2. 对不同的数据源进行统一数据结构和编码。统一原始数据中的不一致之处,如字段的同名异义,异名同义,单位不统一,字长不一致等。
  3. 将原始数据结构做一个从面向应用到面向主题的大转变。

持久的

操作型数据库中的数据通常实时更新,数据根据需要及时发生变化。数据仓库的数据主要供企业决策分析之用,所涉及的数据操作主要是数据查询,一旦某个数据进入数据仓库以后,一般情况下将被长期保留,也就是数据仓库中一般有大量的查询操作,但修改和删除操作很少,通常只需要定期的加载、刷新。

数据仓库中包括了大量的历史数据。

数据经集成进入数据仓库后是极少或根本不更新的。Hadoop的hdfs设计理念也是如此,不建议手动删改,完美契合。

反映历史变化

操作型数据库主要关心当前某一个时间段内的数据,而数据仓库中的数据通常包含历史信息,系统记录了企业从过去某一时点(如开始应用数据仓库的时点)到目前的各个阶段的信息,通过这些信息,可以对企业的发展历程和未来趋势做出定量分析和预测。

企业数据仓库的建设,是以现有企业业务系统和大量业务数据的积累为基础。数据仓库不是静态的概念,只有把信息及时交给需要这些信息的使用者,供他们做出改善其业务经营的决策,信息才能发挥作用,信息才有意义。而把信息加以整理归纳和重组,并及时提供给相应的管理决策人员,是数据仓库的根本任务。因此,从产业界的角度看,数据仓库建设是一个工程,是一个过程。

数据仓库内的数据时限一般在5-10年以上,甚至永不删除,这些数据的键码都包含时间项,标明数据的历史时期,方便做时间趋势分析,或者溯源。

当下在我们公司,充值订单类数据是永久保存的,登录数据保存五年,埋点数据一年左右,因为过了时效性也就没有可分析价值了。

总结讲,数据仓库,即为企业数据的模型沉淀,为了能更快的发展大数据应用,提供可靠的模型来快速迭代。

数据仓库主要是用来解决什么问题?

诞生的背景

面对当今竞争日趋激烈与瞬息万变的市场,各级管理人员迫切需要根据企业的现状和历史数据做出判断和决策。

因此,各级管理人员希望能够从企业信息系统中获取有效的、一致的决策支持信息,及时准确地把握市场变化的脉博,做出正确有效的判断和抉择。

也就是说,数据处理的重点应该从传统的业务处理扩展到在线分析处理,并从中得到面向各种主题的统计信息和决策支持信息。

随着企业事务处理系统的运行和建立,数据量越来越大,企业数据源越来越多。这种需求就比以往任何时候都更加迫切,也更加难于实现。

我们以简单的商城为例,一开始,搭建商城,数据处理的重点为业务数据,运行一段时间了,想要知道每天交易额多少,数据量少的时候直接sum订单表。之后,随着业务越来越好,数据量越来越大,sum订单表越来越慢,越来越无法支撑汇总数据,知道之后业务爆发,分库分表后,从数据库拿到gmv几乎已经成了一件不可能的事情。

数据仓库技术就是针对上述问题而产生的一种技术解决方案,它是基于大规模数据库的决策支持系统环境的核心。

解决了什么问题?

大的方面,将数据处理的重点扩展到在线分析处理,解决了获取全面决策信息难得问题,降低了获取信息的代价,提高了准确率,为信息处理提供了理想的基础。

具体的

  • 集成的问题
  • 持久的问题
  • 反应历史变化的问题
  • 面向主题的问题

现象

最近在远程debug spark程序方便自己理解源码时,在一处启动了多线程的地方,死活进不去另一个线程的debug断点,但是程序确认是执行了的,代码如下:

在第二个和第三个断点处无法进入。

上网搜索后,原来是idea 的默认debug模式问题,默认是不开启多线程的.

修复

1.右键debug断点或点击左下角打开查看断点页面

2.在debug修改选项中,将suspend选择为thread,同时为了以后方便,设置为make default。

idea debug的suspend的all 和thread区别

简单来说,all就是,所有的线程来了,哪个断点先运行到就进行中断并debug,此时就阻塞了一个线程,那么剩下的线程就不会被阻塞,会直接运行,不会进入debug。

thread则是线程级别阻塞,每个线程独立阻塞进入debug,需要手动让每个线程都运行。

总结

这个小坑,用了这么久idea确实不知道,看来自己多线程的开发还是太少了。

默认情况下,打debug肯定是期望进入的,所以默认thread适用于大部分场景。

不知道idea处于什么考虑默认设置为all,大概率是本人尚未领悟该场景