0%

目的

此页面用于收集平时看到的数仓相关的知识文章

概念

数据仓库、数据湖、数据中台一文读懂

数仓规范

数仓规范详解

数仓建设

主题域文档

指标建设

画像建设

数据建模

数据治理

数据湖

数据中台

其他

为什么说第三代指标平台的本质是做“轻”数仓
为什么又造了个新词 Data Warebase:我看到了 AI 时代数据平台应当的样子

前言

在职场中,PPT 往往是向上沟通的刚需,作为码农,这部分能力非常欠缺,长期焦虑,一直期望有一款ai产品能满足我的需求。
近期终于找到这个宝藏,他就是 Gemini 的 Canvas 功能,在 PPT 生成领域已经从“概念”进化到了“生产力阶段”。
记录实测下来的全自动闭环工作流,希望能帮大家从繁琐的排版中解脱出来。

实战流程

1. 结构化输入:从文档到大纲

AI 生成 PPT 的质量,80% 取决于你投喂的素材。

  • 操作要领: 上传你的业务文档或原始数据,要求 Gemini 按照“逻辑严密、层次清晰”的原则生成大纲。
  • 专业指令: “请深度解析这份报告,提取核心结论与风险控制建议,并将其转化为一份 12 页的 PPT 逻辑架构。”

2. 激活 Canvas 创作模式

当大纲确认无误后,直接启动 Gemini 的内置 Slides 渲染功能。

  1. 点击toolbar,点击 Canvas 模式
    alt text
  2. 复制刚才生成的ppt大概,输入,并要求gemini生成ppt
    alt text
  3. 如果生成了html,则输入生成slides
  4. 最后,就会生成ppt了,点击右上角导出幻灯片,即可。
    alt text

3. 跨平台流转:云端存储与找回

生成后的文件无需手动保存,它会自动同步。

  • 关键路径: 点击“Export to Slides”后,文件会自动进入你的 Google Drive 根目录。如果生成过程中网页报错,别急着重来,去云硬盘里翻翻,通常惊喜(文件)已经在那儿了。
  • 示意图:
    alt text

4. 本地化导出与兼容性处理

最后一步,回归我们熟悉的 PowerPoint 格式:File -> Download -> .pptx

  • 示意图: Google Slides 导出菜单操作路径截图

核心避坑技巧(全是实操泪水)

1. 下载失败的终极解决方案

下载后如果发现无法下载,通常是由于云端字体在本地缺失。

  • 专家心得: 我在实践中发现,直接让 Gemini “全稿采用微软雅黑(Microsoft YaHei)” 是最省心的方案。

2. 定制视觉风格:让 PPT 更有“高级感”

不需要自己调色,把审美需求写进 Prompt 里:

  • 配色方案: 严格提取香槟金 (#E5D8BC) 作为点缀色,背景采用纯白,配合深炭黑文字,打造极简商务风。
  • 视觉元素: 为核心指标和风险预警点配置对应的线性 SVG 图标,提升页面精致度。

3. 多版本迭代逻辑

如果某页排版不尽如人意,可以利用 Gemini 的生成多样性,要求它针对某一特定页面生成多个 Layout(布局)版本,你只需做那个“最后拍板”的决策者。

4. 个别的ppt rpompt

1
2
3
1、色彩方案:严格提取了您提供的图片中的香槟/米金色 (#E5D8BC) 作为主基调,搭配深炭黑提升商务稳重感,背景采用纯白以确保投影时的极致清晰度。
2、字体方案:全稿采用微软雅黑,在各种显示设备上都能保持极佳的可读性。
3、定制图标:每一页的关键能力和风险点都配置了定制化的 SVG 线性图标,提升了视觉精致度。

结语

AI时代,善用工具,从此告别ppt焦虑

关于主题域划分的疑问

主题间应该尽量不交叉使用,但是在轻度汇总层以上,非常容易交叉。如电商,会有用户主题、交易主题,那么,用户交易应该属于哪个?

结论

个人观点,主题域之间的交叉使用无法避免,对于用户交易这样粒度的汇总表,放到以实体为主的用户主题中,不在交易主题中体现,交易主题中体现的是最细粒度的轻度汇总。

即:
1、事实明细层、维度层按照业务过程、实体概念等划分,如用户基础信息,交易表,活动表。
2、轻度汇总,及多个维度的中高度汇总,按照业务过程划分,如用户商品汇总表。
3、高度到单一实体的汇总,按照实体概念汇总,如用户活动表。
4、ads层则按照数据集市概念进行划分,如xx专题分析、财务部等。

什么是主题划分

inmon的主题

inmon中的主题,是面向企业数据模型中的企业主题域来划分的,其本质就是高层模型中的实体,用于后续划分表归属的。

在inmon的《数据仓库》一书中,对于主题的定义是这样的。

传统的操作型系统是围绕公司的功能性应用进行组织的。对一个保险公司来说,应用问题可能是汽车保险、健康保险、人寿保险与意外伤亡保险。公司的主要主题域可能是顾客、保险单、保险费与索赔。
数据仓库面向在高层的企业数据模型中已定义好的企业主题域。典型的主题域有:顾客、产品、交易或活动、政策、索赔、账目。
在数据仓库中,每一个主要主题域都是以一组相关的表来具体实现的。一个主题域可能由10个,100个或更多的相互联系的物理表组层。例如,一个顾客主题域可能由顾客基本信息表,顾客活动表组层。

而所谓的高层模型,指的是inmon的建模三步骤:数据建模分为三个层次:高层模型(称为实体关系图,或ERD),中间层建模(称为数据项集或DIS)、底层建模(称为物理模型)。

kimball 中的主题

kimball在《数据仓库工具箱》一书中,并没有提及主题这一概念。

为什么要主题划分

chatGpt中的回答是这样的

主题划分在数据仓库设计和建设过程中是非常重要的,主要基于以下几个原因:
1.提高查询效率和性能:通过将数据按照主题进行划分,可以将相关的数据存储在一起,减少查询时需要扫描的数据量,从而提高查询效率和性能。
2.简化数据管理和维护:主题划分可以帮助数据仓库管理员更轻松地管理和维护数据。每个主题都有明确的边界和责任人,使得数据管理和维护变得更加清晰和可控。
3.提高数据质量和一致性:主题划分有助于提高数据的质量和一致性。相同主题下的数据通常遵循相同的数据标准和规范,可以更容易地进行数据清洗、整合和转换,从而提高数据的一致性和准确性。
4.支持业务分析和决策:主题划分使得数据更加符合业务逻辑和业务需求,有助于支持业务分析和决策。通过针对不同主题进行分析,可以更好地理解业务的运作情况,发现潜在的业务机会和问题。
5.降低数据耦合性:通过将数据按照主题进行划分,可以降低数据之间的耦合性,使得数据更加灵活和可扩展。不同主题之间的数据可以相对独立地进行处理和管理,有利于系统的扩展和维护。

个人理解
1.主题是对所有数据业务的抽象归类,类似字典,便于业务理解使用,可以更好的支持业务分析和决策。
2. 按主题划分后,开发人员更好维护数据,避免数据的重复开发,提高一致性。

怎么做主题划分

1.按照业务系统划分
因为大部分企业都已经经历过了信息化建设或者正处于信息化建设当中,企业各种业务系统都已经部署完成,财务部门有财务系统、销售部门有销售系统、生产部门有生产系统、供应链部门有供应链系统……

这些不同的业务系统,因为只会储存对应业务流程中产生的数据,下级数据主题都互相紧贴,是天然的主题域,业务系统有几种,就可以划分为几种主题域。

如 生产主题域、财务主题域、人力主题域

2.按照需求划分
很多时候,企业需要长期对某个方向进行分析,因为这个长期分析的过程涉及到各种主题,会对数据进行细分、归纳,在这个过程中,就由需求诞生了主题域。

就拿销售分析来说,这个分析过程会涉及到的对象有产品、仓库、经销商、顾客等,其中每一个分析对象就是一个数仓主题,而包含归纳这些主题的销售分析就成为了一个相应的主题域。

3.按照功能划分
在现代社会,软件是每个加入互联网的网民都会使用到的东西,这些由企业开发的软件拥有着不同的功能模块,比如说社交软件中就会有聊天、朋友圈、群聊、发送文件等功能。

从这些功能中选一个模块,聊天模块会涉及到数据仓库中的用户主题、图片主题、文字主题等,所以聊天模块也能被归纳为聊天主题域。

4.按照部门划分
现代企业都有着不同的业务部门,这些部门也会形成各种不同的主题域,比如说销售域、生产域、财务域等,而这些主题域也是由不同的数据主题组成的。

与分层有啥关系?

数仓分层是从存储,从管理角度对数据进行组织,是纵向空间上的划分。
主题域划分是从使用,从业务的角度对数据进行组织,是横向业务上的划分。

1、ods层,不参与划分
2、事实明细层、维度层按照业务过程、实体概念等划分,如用户基础信息,交易表,活动表。
3、轻度汇总,及多个维度的中高度汇总,按照业务过程划分,如用户商品汇总表。
4、高度到单一实体的汇总,按照实体概念汇总,如用户活动表。
5、ads层则按照数据集市概念进行划分,如xx专题分析、财务部等。

参考

问题

连续登录问题,是实际分析需求和面试中常见的问题。

其实,连续登录问题常规来说有两种场景,其难度完全不一样。

第一种是指定连续登录天数,求满足的用户
第二种则是求用户连续登录了多少天。

本文只是简单探讨。

语法

本文使用的spark sql的语法,如有差异,自行替换。

表结构&数据

明细就直接忽略了,直接以用户登录日期粒度的表结构为主,实际需求中可以自行聚合到这个粒度。
表结构&数据

user_id login_date
1 ‘2023-01-01’
1 ‘2023-01-02’
1 ‘2023-01-03’
1 ‘2023-01-04’
1 ‘2023-01-11’
1 ‘2023-01-12’
1 ‘2023-01-15’
2 ‘2023-01-15’
3 ‘2023-01-01’
3 ‘2023-01-02’
3 ‘2023-01-03’
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
with user_log as (
select col1 as user_id, col2 as login_date
from values
(1, '2023-01-01')
,(1, '2023-01-02')
,(1, '2023-01-03')
,(1, '2023-01-04')
,(1, '2023-01-11')
,(1, '2023-01-12')
,(1, '2023-01-15')
,(2, '2023-01-15')
,(3, '2023-01-01')
,(3, '2023-01-02')
,(3, '2023-01-03')
)
select * from user_log

场景

场景1 给定具体日期,且指定连续登录天数

例如: 求在2023-01-03号那天连续登录3天的用户。

这种场景是最简单的,直接聚合+having

1
2
3
4
5
6
7
select user_id
,count(*) as cnt
from user_log
where login_date between date_sub('2023-01-03',2) and '2023-01-03'
group by user_id
having cnt>=3
order by user_id

结果:

user_id cnt
1 3
1 3

场景2,不给定具体日期,但指定连续登录天数

例如:求用户在哪几天连续登录2天。

这部分需要用到窗口函数了,同时由于日期间隔为1,可以使用range特性

1
2
3
4
select *
, count(*) over (partition by user_id order by datediff(login_date, '2000-01-01') range between 1 preceding and current row ) as continue_cnt
from user_log
order by user_id,login_date

结果如下

user_id login_date continue_cnt
1 2023-01-01 1
1 2023-01-02 2
1 2023-01-03 2
1 2023-01-04 2
1 2023-01-11 1
1 2023-01-12 2
1 2023-01-15 1
2 2023-01-15 1
3 2023-01-01 1
3 2023-01-02 2
3 2023-01-03 2

结果没有过滤,是为了方便理解,大家自行加上continue_cnt=2即可。

场景3,既不给定具体业务日期,也不指定连续登录天数。

这种场景是最难的,最耗性能的,但也是实用性最高的。

row_number 解法

说实话,这个解法本人没有想出来,是网上搜到的,使用row_number进行排序,利用日期差特性,如果是连续的,那么实际的差值日期是一样的特性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
,login_date_grp as (
select *
,date_sub(login_date,login_date_rn) as continue_date_grp --连续登录日期分组
from
(
select *
,row_number() over (partition by user_id order by login_date) as login_date_rn
from user_log
order by user_id,login_date
) a
)
select user_id
,login_date
,row_number() over(partition by user_id,continue_date_grp order by login_date) as continue_login_day_cnt --连续登录天数
,login_date_rn --用户登录日期排序
,continue_date_grp --用户连续登录日期分组
from login_date_grp
order by user_id
,login_date

结果如下

user_id login_date continue_login_days_cnt login_date_rn continue_date_grp
1 2023-01-01 1 1 2022-12-31
1 2023-01-02 2 2 2022-12-31
1 2023-01-03 3 3 2022-12-31
1 2023-01-04 4 4 2022-12-31
1 2023-01-11 1 5 2023-01-06
1 2023-01-12 2 6 2023-01-06
1 2023-01-15 1 7 2023-01-08
2 2023-01-15 1 1 2023-01-14
3 2023-01-01 1 1 2022-12-31
3 2023-01-02 2 2 2022-12-31
3 2023-01-03 3 3 2022-12-31

可以看到,结果是符合我们预期的,这时候只要外面再套上一层过滤条件即可。

看的稍微有点蒙?这里稍微解释一下,这里巧妙的利用了等差数组的特性。
比如有如下一个用户连续5天登录的数据,同时我们对同一用户按照日期进行row num排序,再求每一行和’2023-01-01’这一天的差值日期,然后再对这两列求差值,会看到如下结果

用户id 登录日期 日期排序 和’2022-12-31’的间隔天数 日期排序和’2023-01-01’的间隔天数的差值
1 2023-01-01 1 1 0
1 2023-01-02 2 2 0
1 2023-01-03 3 3 0
1 2023-01-04 4 4 0
1 2023-01-05 5 5 0

我们可以观察到,如果日期是连续的,那么本身和’2022-12-31’的间隔天数这一列就是+1的规则,而row num也是自然+1的,差值永远是1

此时,如果删除2023-01-03这一天的登录记录,那么结果就会变为

用户id 登录日期 日期排序 和’2022-12-31’的间隔天数 日期排序和’2022-12-31’的间隔天数的差值
1 2023-01-01 1 1 0
1 2023-01-02 2 2 0
1 2023-01-04 3 4 1
1 2023-01-05 4 5 1
可以看到,随着2023-01-03这条记录被删除,在2023-01-04这一天,间隔从0变为了1,而2023-01-05这天还是连续登录的,间隔仍然是1.

此时就能发现规律了,如果是连续登录的,那么间隔差值就不会变,这里其实可以用数学公式证明,这里就补展开了。

所以,为什么代码里用的是date_sub作为分组?其实我们要求的是差值,而日期本身就是连续的数字,date_sub(日期,rn) 是不是就是可以看成差值。 还节约了计算资源。

这题的解法,本人认为还是有很多精巧设计在里面的,一般确实很难想到。

使用日期维表获取上一次未登陆日期

这个是本人想到的,可以利用日期维表,补全没有登录的天数,然后利用窗口函数求出上一次未登陆日期,再利用datediff求出差值即连续登录天数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
dim_date as (
select t1.*
,date_add('2022-12-31',row_num) as dt_ymd
from
(
select row_number() over(order by m) as row_num
from
(select split(repeat(",",15),",") as multi) a
LATERAL VIEW explode(multi) tmpTable as m
) t1
)
,user_full_date as (
select t1.user_id
,t2.dt_ymd
from
(select distinct user_id from user_log) t1
cross join dim_date t2
)
,last_unlogin as (
select a.user_id
,a.dt_ymd
,b.user_id as user_id2
,b.login_date
,last_value(if(b.login_date is not null,null,a.dt_ymd),true) over(partition by a.user_id order by a.dt_ymd rows between unbounded preceding and current row ) as last_unlogin_date
from user_full_date a
left join user_log b
on a.dt_ymd=b.login_date
and a.user_id=b.user_id
)
select user_id
,login_date
,date_diff(login_date,nvl(last_unlogin_date,'2022-12-31')) as continue_days
,last_unlogin_date
from last_unlogin
where user_id2 is not null
order by user_id,dt_ymd


结果如下

user_id login_date continue_days last_unlogin_date
1 2023-01-01 1 null
1 2023-01-02 2 null
1 2023-01-03 3 null
1 2023-01-04 4 null
1 2023-01-11 1 2023-01-10
1 2023-01-12 2 2023-01-10
1 2023-01-15 1 2023-01-14
2 2023-01-15 1 2023-01-14
3 2023-01-01 1 null
3 2023-01-02 2 null
3 2023-01-03 3 null

可以看到结果符合预期。

总结

一直以为自己sql水平还是不错的,原来以前一直没怎么处理过连续登录问题,面试被问了还是懵逼了一下。该总结的还得总结一下。

总的来说,连续登录还是比较难的,尤其是考虑性能的情况下,使用row_number利用等差数据确实是一个很精妙的解法。

数据集市定义

数据集市是一个结构概念,它是企业级数据仓库的一个子集,主要面向部门级业务,并且只面向某个特定的主题。

有两种类型的数据集市——独立型和从属型

数据集市的应用

数据集市是数仓之上更聚焦的业务主题合集,更偏向于应对业务数据快速高效应用的需求,一般用于商业智能系统中探索式和交互式数据分析应用。

数据集市的一般架构

如何建设数据集市

  • 自上而下建设
  • 自下而上建设

数据集市和数据仓库的区别

类目 数据仓库 数据集市
数据来源 生产系统外部数据等 数据仓库或生产系统等外部数据
规模范围 企业级 部门或工作组级
主题 以企业为主题 以部门或特殊分析为主题
数据粒度 最细粒度 较粗的粒度

分层诞生的背景

我们可以想象这样一个场景。
第一天,小A要写一个任务,获取每日的成交金额,然后制作成一张报表给老板看,于是小A就蹭蹭蹭的写了一个脚本,从数据抽取,到sum的逻辑加工,然后使用报表查询。完美收工

第二天,小A又接到一个任务,这一次是运营那边需要每日的交易金额,展现的活动页面上,小A想,上次取过,直接用上次的表拿来就用,完美收工。

第三天,小A的第一个业务方说,我每天的成交金额不要浙江省的,小A想改那个脚本,但是运营也在用,犯了难,只能cv大法了。

第四天,新同事小B接到一个任务,要取上海市的每日成交金额,最近跌的厉害,小B看了下小A写表,没法用啊,于是蹭蹭蹭自己从抽取,到sum加工,到where过滤,到报表查询重新写了一遍。

看出什么问题了没,任务管理混乱,没有复用,重复开发,应用方需要花费大量的精力在底层细节上。

而分层便是基于任务功能做出的层级划分,分工开发,提高效率。

什么是分层

分层是以解耦为核心,基于任务功能做出的分工和层级划分。每一层分工计算,提高效率。

分层的好处

  • 清晰数据结构: 每一层都有其作用域,方便理解定位和使用
  • 统一数据口径:提供统一的数据出口。
  • 减少重复开发: 通过中间层,减少重复计算。
  • 把复杂问题简单化: 将一个复杂任务拆解为多个步骤,处理单一功能,方便理解、排查和恢复。
  • 屏蔽原始数据异常: 屏蔽业务影响,通过下层改动对上层无感知。
  • 业务应用方从细节解放

传统的分层

这里就不细说了,ods,cdm,ads

什么是数据模型?

数据模型就是数据的组织和存储方法,它强调从业务、数据存取和使用角度合理存储数据。

怎么理解?

  • 把数据看做图书馆的书,希望书分门别类方案,如何分门别类便是我们的模型
  • 把数据看做城市建筑,希望城市建筑布局合理,布局方式便是我们的模型
  • 把数据看做纸质资料,希望资料按类别、年份分不同文件夹放好,这也是模型

数据模型的好处

  • 性能:良好的数据模型能帮助我们快速查询所需的数据,减少IO
  • 成本:良好的数据模型能极大的减少不必要的数据冗余,也能实现计算结果的复用,减少存储和计算成本。
  • 效率:能改善用户使用数据的体验,提高使用效率。
  • 质量:减少数据统计口径的不一致性,减少计算错误的可能性。

有最好的模型吗

没有,只有最合适的模型,模型是在业务,存储和使用者三方的总和考虑,同时也是性能,成本,效率,质量之间的平衡考虑。

常见的数据模型有哪些?

ER模型

ER模型,实体关系模型(Entity-Relationship),将事务抽象为数据实体和关系。

er模型符合三范式。

谁用er模型

wiliam H。inmon 在building the data warehouse一书中使用

数据仓库3NF和OLTP系统中的3NF区别

数据仓库的3NF是站在企业角下面向主题的抽象,而不是针对某个具体的业务流程的实体对象关系的抽取。

ER模型的特点

  • 需要全面了解企业业务和数据
  • 实施周期非常长
  • 对建模人员要求高

ER如何建模的

出发点是整合数据,将各个系统中的数据以整个企业角度按主题进行相似性组合和合并,并进行一致性处理

建模步骤

  • 高层模型:(称为实体关系图,或ERD)高度抽象,描述主题和主题的关系,描述总体概括
    主要由集成范围决定哪些实体属于模型范围而哪些不是。
    将不同的用户的观点整合。
    产出主要主题或实体。
  • 中层模型:(数据项集或DIS)细化主题的数据项
    主要数据分组
    二级数据分组
    连接器
    数据的类型
  • 物理模型:表的合并、分区等设计

举个例子

还是以最常见的商城为例
在业务系统中,我们的系统会分为:

  • trade 交易
  • pay 支付
  • crm crm系统
  • fin财务系统

而我们会根据公司重新组织,分成

  • 交易主题
  • 流量主题
  • 销售主题
  • 财务主题
  • 营销主题
  • 采购主题
  • 供应商主题
  • 门店主题

第一步是高层维度,即各个主题的er关系。
比如交易主题和销售、供应商、门店、采购、财务这些都有关系。
而采购和供应商有关系,和销售则没有关系
第二部 则是中层模型
比如在销售主题内,销售人员和出勤、订单、访问、电话等都有管理,形成不同的中层数据项
第三步 则是物理模型
最终落地如何存储,分表还是合并,是否分区等。

维度建模

维度建模是palph kimball所倡导的,在 the data warehouse toolkit一书中提及。

其基于分析决策需求须发构建模型,典型的代表为星形模型和雪花模型。

如何建模

  • 选择决策的业务过程
  • 选择粒度 维度的组合
  • 识别维度 维度属性,用于分析时进行的分组和筛选
  • 选择事实 确定衡量的指标

举个例子

分析过去一星期浙江的哪个商品卖的最好

选择业务过程

  • 选择决策的业务过程
    卖的最好,其排序是根据下单金额,也就是下单这一业务过程。
  • 选择粒度
    根据需求,过去一星期为业务时间,也就是下单时间,浙江为地域,理解成门店开在浙江的,商品卖的好,商品。
    因此粒度为:时间,门店-地域,商品
  • 识别维度 维度属性
    这里属性较少,极简情况下为日期、星期(第n周)、省份,商品id,商品名称
  • 选择事实
    下单金额。

参考

  • 大数据之路
  • building the datawarehouse
  • the data warehouse toolkit: the definitive guid to dimensional modeling

引言

在上一篇spark源码阅读-spark启动流程中,了解到了spark提交命令 spark-submit shell端的运行情况。

举例来说

1
2
3
4
5
6
7
8
9
10
/opt/soft/spark/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--num-executors 1 \
--executor-memory 800M \
--driver-memory 700M \
--executor-cores 2 \
--class org.apache.spark.examples.JavaWordCount \
/opt/soft/spark/examples/jars/spark-examples_2.12-3.4.0-SNAPSHOT.jar \
file:/opt/soft/spark/examples/src/main/resources/kv1.txt

上述shell是像spark提交自带的wordcount程序,那么经过shell端解析,最终获取到的执行命令为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/opt/jdk/bin/java \
-cp /opt/soft/spark/conf/:/opt/soft/spark/jars/*:/opt/soft/hadoop-3.3.1/etc/hadoop/ \
-XX:+IgnoreUnrecognizedVMOptions \
--add-opens=java.base/java.lang=ALL-UNNAMED \
--add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED \
--add-opens=java.base/java.io=ALL-UNNAMED \
--add-opens=java.base/java.net=ALL-UNNAMED \
--add-opens=java.base/java.nio=ALL-UNNAMED \
--add-opens=java.base/java.util=ALL-UNNAMED \
--add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED \
--add-opens=java.base/sun.nio.ch=ALL-UNNAMED \
--add-opens=java.base/sun.nio.cs=ALL-UNNAMED \
--add-opens=java.base/sun.security.action=ALL-UNNAMED \
--add-opens=java.base/sun.util.calendar=ALL-UNNAMED \
--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED \
-Djdk.reflect.useDirectMethodHandle=false \
org.apache.spark.deploy.SparkSubmit \
--master yarn \
--deploy-mode cluster \
--conf spark.driver.memory=700M \
--class org.apache.spark.examples.JavaWordCount --num-executors 1 \
--executor-memory 800M \
--executor-cores 2 \
/opt/soft/spark/examples/jars/spark-examples_2.12-3.4.0-SNAPSHOT.jar \
file:/opt/soft/spark/examples/src/main/resources/kv1.txt

可以看到,最后运行的是org.apache.spark.deploy.SparkSubmit这个类,并将一开始的传入的参数继续传入到这个类里面,本文就简单了解一下sparkSubmit这个类到底做了什么。

spark-submit的运行流程

整体概览


上图即spark-submit后运行的过程,该进程主要涉及以下几个类。

  • SparkSubmit Object java main入口
  • SparkSubit Class 启动 Spark 应用程序的主要网关。
    该程序处理并设置Spark相关的依赖项的类路径,并抽象了一层,以支持在不同集群管理器上运行和部署模式上。
  • SparkApplication spark应用程序的入口点,本人用的yarn cluster模式,因此是YarnClusterApplication
  • Client 客户端,用于和yarn交互。如创建app,或kill。

详细执行

如上图所示

1. 通过java命令,启动了一个jvm,该jvm类的入口类为org.apache.spark.deploy.SparkSubmit

2. 进入main,该方法比较简单,主要为new SparkSubmit,并调用doSumit方法

1
2
3
4
5
6
7
8
9
10
val submit = new SparkSubmit() {
self =>
override protected def parseArguments(args: Array[String]): SparkSubmitArguments
override protected def logInfo(msg: => String): Unit = printMessage(msg)
override protected def logWarning(msg: => String): Unit
override protected def logError(msg: => String): Unit
override def doSubmit(args: Array[String]): Unit
......
}
submit.doSubmit(args)

3. SparkSubmit类中,参数不断进行传递doSumit->submit->runMain

最后在runMain方法中通过反射创建了YarnClusterApplication的实例,并调用start方法

1
2
3
4
5
6
7
8
9
10
// doSubmit->submit, doSumit 调用 submit方法,走的是case SparkSubmitAction.SUBMIT
def doSubmit(args: Array[String]): Unit = {
......
appArgs.action match {
case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
case SparkSubmitAction.KILL => kill(appArgs)
case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
case SparkSubmitAction.PRINT_VERSION => printVersion()
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
//submit->doRunMain->runMain submit方法中直接调用函数doRunMain,doRunMain判断是否需要代理用户,之后就调用runMain方法
private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
def doRunMain(): Unit = {
if (args.proxyUser != null) {
......
} else {
runMain(args, uninitLog)
}
}

// In standalone cluster mode, there are two submission gateways:
// (1) The traditional RPC gateway using o.a.s.deploy.Client as a wrapper
// (2) The new REST-based gateway introduced in Spark 1.3
// The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
// to use the legacy gateway if the master endpoint turns out to be not a REST server.
if (args.isStandaloneCluster && args.useRest) {
try {
logInfo("Running Spark using the REST application submission protocol.")
doRunMain()
} catch {
// Fail over to use the legacy submission gateway
case e: SubmitRestConnectionException =>
logWarning(s"Master endpoint ${args.master} was not a REST server. " +
"Falling back to legacy submission gateway instead.")
args.useRest = false
submit(args, false)
}
// In all other modes, just run the main class as prepared
} else {
doRunMain()
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//最后,在runMain方法中,通过反射创建了YarnClusterApplication,并调用start方法。关于SparkApplication,参见下文解释。
private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
......
//这里走的是第一个分支,通过反射创建
val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
} else {
new JavaMainApplication(mainClass)
}
......
try {
app.start(childArgs.toArray, sparkConf)
} catch {
case t: Throwable =>
throw findCause(t)
} finally {
......
}
}

4. YarnClusterApplication.start方法new了Client,并继续调用run方法

其中Client类里初始化了yarn的各种连接
并且此时将待运行程序的参数org.apache.spark.examples.JavaWordCount等参数通过new ClientArguments(args)传递给了Client这个类,这些参数在后续构建yarn container时会进行使用。

1
2
3
4
5
6
7
8
9
override def start(args: Array[String], conf: SparkConf): Unit = {
// SparkSubmit would use yarn cache to distribute files & jars in yarn mode,
// so remove them from sparkConf here for yarn mode.
conf.remove(JARS)
conf.remove(FILES)
conf.remove(ARCHIVES)

new Client(new ClientArguments(args), conf, null).run()
}

5. Client.run调用submitApplication方法

最后在submitApplication方法中,通过和yarn的交互,最后通过yarnClient.submitApplication(appContext)提交了application给yarn

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
//run调用 submitApplication
def run(): Unit = {
submitApplication()
......
}
//submitApplication方法提交yarn application
def submitApplication(): Unit = {
ResourceRequestHelper.validateResources(sparkConf)

try {
launcherBackend.connect()
yarnClient.init(hadoopConf)
yarnClient.start()

if (log.isDebugEnabled) {
logDebug("Requesting a new application from cluster with %d NodeManagers"
.format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))
}

// Get a new application from our RM
val newApp = yarnClient.createApplication()
val newAppResponse = newApp.getNewApplicationResponse()
this.appId = newAppResponse.getApplicationId()

// The app staging dir based on the STAGING_DIR configuration if configured
// otherwise based on the users home directory.
// scalastyle:off FileSystemGet
val appStagingBaseDir = sparkConf.get(STAGING_DIR)
.map { new Path(_, UserGroupInformation.getCurrentUser.getShortUserName) }
.getOrElse(FileSystem.get(hadoopConf).getHomeDirectory())
stagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))
// scalastyle:on FileSystemGet

new CallerContext("CLIENT", sparkConf.get(APP_CALLER_CONTEXT),
Option(appId.toString)).setCurrentContext()

// Verify whether the cluster has enough resources for our AM
verifyClusterResources(newAppResponse)

// Set up the appropriate contexts to launch our AM
val containerContext = createContainerLaunchContext()
val appContext = createApplicationSubmissionContext(newApp, containerContext)

// Finally, submit and monitor the application
logInfo(s"Submitting application $appId to ResourceManager")
yarnClient.submitApplication(appContext)
launcherBackend.setAppId(appId.toString)
reportLauncherState(SparkAppHandle.State.SUBMITTED)
} catch {
case e: Throwable =>
if (stagingDirPath != null) {
cleanupStagingDir()
}
throw e
}
}

6. 至此,本地jvm进程就结束了,剩下的yarn会启动driver进程

其他补充

SparkApplication

1
2
3
4
5
6
7
8
/**
* Entry point for a Spark application. Implementations must provide a no-argument constructor.
*/
private[spark] trait SparkApplication {

def start(args: Array[String], conf: SparkConf): Unit

}

SparkApplication是一个接口,只有一个方法,start,就是启动。通过该接口的实现,实现了不同资源的启动。具体有5个实现。

  • TestSparkApplication 测试类
  • ClientApp 以client模式启动,即driver启动在本地。
  • JavaMainApplication 直接用java main启动,这个暂时不是很清楚
  • RestSubmissionClientApp 通过rest接口启动
  • YarnClusterApplication 对接yarn资源启动,即driver启动在yarn容器上

提交的yarn的application长什么样?执行的是什么程序命令?

提交到yarn的application执行的命令在appContext.proto.amContainerSpec.command里。

其中,

  • appContext为ApplicationSubmissionContext类,具体实现为ApplicationSubmissionContextPBImpl
  • proto 为 ApplicationSubmissionContextProto
  • amContainerSpec 为 ContainerLaunchContextProto
  • command 为 LazyStringList

最终的命令如下所示:

可以看到,最终启动的java org.apache.spark.deploy.yarn.ApplicationMaster 这个类,同时将wordCount的启动命令 -- class org.apache.spark.examples.JavaWordCount --jar....等参数传递进去,等待进一步处理。

那么,appContext.proto.amContainerSpec.command是何时构建的呢?

基础流程是YarnClusterApplication->Client->ContainerLaunchContext->ApplicationSubmissionContext 这样传递的。

回到上文第四步,4. YarnClusterApplication.start方法new了Client,并继续调用run方法,在该方法中,YarnClusterApplication将args传递给了Client对象。

再回到上文第5步,5-clientrun调用submitapplication方法,在该方法的submitapplication,调用了createContainerLaunchContext方法构建ContainerLaunchContext,参数就是在这时候传递的。

1
2
3
4
5
6
7
8
9
10
11
def submitApplication(): Unit = {
ResourceRequestHelper.validateResources(sparkConf)
......
try {
// Set up the appropriate contexts to launch our AM
val containerContext = createContainerLaunchContext()
val appContext = createApplicationSubmissionContext(newApp, containerContext)
......
}
.....
}

再来看createContainerLaunchContext的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
private def createContainerLaunchContext(): ContainerLaunchContext = {
......
// 将 --class参数提取成 userClass常量
val userClass =
if (isClusterMode) {
Seq("--class", YarnSparkHadoopUtil.escapeForShell(args.userClass))
} else {
Nil
}
val userJar =
if (args.userJar != null) {
Seq("--jar", args.userJar)
} else {
Nil
}
......
// 将 userClass 和其他参数合并成am的参数汇总
val amArgs =
Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++ userArgs ++
Seq("--properties-file",
buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, SPARK_CONF_FILE)) ++
Seq("--dist-cache-conf",
buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, DIST_CACHE_CONF_FILE))

//根据amArgs参数拼装最后执行的命令
// Command for the ApplicationMaster
val commands = prefixEnv ++
Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
javaOpts ++ amArgs ++
Seq(
"1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
"2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")

// TODO: it would be nicer to just make sure there are no null commands here
val printableCommands = commands.map(s => if (s == null) "null" else s).toList
//将命令传递给amContainer并返回。
amContainer.setCommands(printableCommands.asJava)
......
}

最后,通过

1
val appContext = createApplicationSubmissionContext(newApp, containerContext)

将container传递给ApplicationSubmissionContext

在通过

1
yarnClient.submitApplication(appContext)

提交app,这样,app就知道了启动后应该运行什么。

application是如何获取到代码的?

spark-submit的时候,jvm是本地的,jar就在本地,因此可以执行,然后提交后在yarn上运行是异地的,yarn上的application是如何拿到代码的呢?

答:
jar路径需要全局可访问,hdfs就是一个比较好的选择。

总结

SparkSubmit类,如它的名字,提交spark任务,经过SparkSubmit类,最终将之前编写好的spark程序(如wordCount)提交到了运行环境上(yarn或spark集群或k8s等),后续就到了真正的运行流程了。

引言

在上两篇

终于进入了yarn端

最后拿到的一个命令如下

1
2
3
4
5
6
7
/bin/bash -c /opt/jdk/bin/java -server
-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED
--add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false -Xmx700m -Djava.io.tmpdir=/data/storage/hadoop/tmp/hadoop-hdfs/nm-local-dir/usercache/hdfs/appcache/application_1675490517176_0054/container_1675490517176_0054_01_000001/tmp -Dspark.yarn.app.container.log.dir=/opt/soft/hadoop-3.3.1/logs/userlogs/application_1675490517176_0054/container_1675490517176_0054_01_000001
org.apache.spark.deploy.yarn.ApplicationMaster
--class 'org.apache.spark.examples.JavaWordCount' --jar file:/opt/soft/spark_3.4/examples/jars/spark-examples_2.12-3.4.0-SNAPSHOT.jar
--arg 'file:/opt/soft/spark/examples/src/main/resources/kv1.txt'
--properties-file /data/storage/hadoop/tmp/hadoop-hdfs/nm-local-dir/usercache/hdfs/appcache/application_1675490517176_0054/container_1675490517176_0054_01_000001/__spark_conf__/__spark_conf__.properties --dist-cache-conf /data/storage/hadoop/tmp/hadoop-hdfs/nm-local-dir/usercache/hdfs/appcache/application_1675490517176_0054/container_1675490517176_0054_01_000001/__spark_conf__/__spark_dist_cache__.properties 1> /opt/soft/hadoop-3.3.1/logs/userlogs/application_1675490517176_0054/container_1675490517176_0054_01_000001/stdout 2> /opt/soft/hadoop-3.3.1/logs/userlogs/application_1675490517176_0054/container_1675490517176_0054_01_000001/stderr

以上便是我们的入口了,本文开始正式了解spark的运行流程。

spark的启动流程

整体概览

先来看一下运行的整体流程,记住一下流程,这就是我们看代码的导航地图。

spark的启动流程分为以下几个步骤

  1. SparkContext 向资源管理器注册并向资源管理器申请运行 Executor
  2. 资源管理器分配 Executor,然后资源管理器启动 Executor
  3. Executor 发送心跳至资源管理器
  4. SparkContext 构建 DAG 有向无环图
  5. 将 DAG 分解成 Stage(TaskSet)
  6. 把 Stage 发送给 TaskScheduler
  7. Executor 向 SparkContext 申请 Task
  8. TaskScheduler 将 Task 发送给 Executor 运行
  9. 同时 SparkContext 将应用程序代码发放给 Executor
  10. Task 在 Executor 上运行,运行完毕释放所有资源

Spark Application Master的启动流程

启动流程概览

driver 端debug命令

1
/opt/soft/spark/bin/spark-submit --driver-java-options "-Xdebug -Xrunjdwp:transport=dt_socket,address=5105,server=y,suspend=y" --master yarn --deploy-mode cluster  --num-executors 1 --executor-memory 800M --driver-memory 700M --executor-cores 2  --class org.apache.spark.examples.JavaWordCount  /opt/soft/spark/examples/jars/spark-examples_2.12-3.4.0-SNAPSHOT.jar file:/opt/soft/spark/examples/src/main/resources/kv1.txt

启动流程详细过程

1. yarn container 里命令启动jvm 运行ApplicationMaster.main

在上文spark源码阅读-spark启动流程 2 spark-submit 代码端执行情况一文中,最终在yarn端container启动的主要命令为:

1
2
3
4
bash -c /opt/jdk/bin/java -server
org.apache.spark.deploy.yarn.ApplicationMaster
--class 'org.apache.spark.examples.JavaWordCount'
--jar file:/opt/soft/spark_3.4/examples/jars/spark-examples_2.12-3.4.0-SNAPSHOT.jar

所以入口还是shell命令,该shell 命令为container内创建,使用java启动了一个jvm,jvm的入口为org.apache.spark.deploy.yarn.ApplicationMaster 这个类的main。

此时查看spark相关的进程,主要有两个。

进程号为27778的进程为yarn container启动的shell,该命令使用bash -c 启动了一个子进程,子进程运行真正的程序,即进程号为27788的程序。

查看27778的父进程,可以看到进程号为27776的进程,该进程为default_container_executor.sh

即我们像yarn提交的application 程序,yarn在其中一个container中已shell的命令进行运行。

2. main里 new ApplicationMaster类并调用run方法

ApplicationMaster的main方法比较简单,主要就是new了一个ApplicationMaster类,并调用run方法

1
2
3
4
5
6
7
8
9
def main(args: Array[String]): Unit = {
......
master = new ApplicationMaster(amArgs, sparkConf, yarnConf)
......
ugi.doAs(new PrivilegedExceptionAction[Unit]() {
override def run(): Unit = System.exit(master.run())
})
}

3. ApplicationMaster的运行

run

调用run方法,该函数为applicationMaster运行的主流程。其代码比较长,但去除一些配置,上下文装配外,其主流程比较简单,根据模式,决定是runDriver还是runExecutorLauncher,本人的路径为runDriver

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
final def run(): Int = {
try {
......
if (isClusterMode) {
runDriver()
} else {
runExecutorLauncher()
}
} catch {
......
} finally {
......
}
exitCode
}

4. runDriver

runDriver的源码如下,启动了一个线程运行我们的spark主程序,即提交的JavaWordCount的main类。

之后等待userClassThread初始化完成spark context,并设置相关的环境变量,完成后,

  • 通过registerAm注册applicationMaster
  • 通过createAllocator启动executor
  • 通过userClassThread.join()等待userClassThread运行完成,之后退出程序。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    private def runDriver(): Unit = {
    ......
    //启动用户的application线程,即JavaWordCount的main类
    userClassThread = startUserApplication()

    // This a bit hacky, but we need to wait until the spark.driver.port property has
    // been set by the Thread executing the user class.
    logInfo("Waiting for spark context initialization...")
    ......
    try {
    val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
    Duration(totalWaitTime, TimeUnit.MILLISECONDS))
    if (sc != null) {
    ......
    registerAM(host, port, userConf, sc.ui.map(_.webUrl), appAttemptId)
    ......
    createAllocator(driverRef, userConf, rpcEnv, appAttemptId, distCacheConf)
    } else {
    ......
    }
    resumeDriver()
    userClassThread.join()
    } catch {
    ......
    } finally {
    resumeDriver()
    }
    }

5. startUserApplication

startUserApplication 的源码如下,比较简单,就创建了一个线程并启动,该线程通过反射执行我们传入的class,即JavaWordCount

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private def startUserApplication(): Thread = {
......
val mainMethod = userClassLoader.loadClass(args.userClass)
.getMethod("main", classOf[Array[String]])

val userThread = new Thread {
override def run(): Unit = {
try {
if (!Modifier.isStatic(mainMethod.getModifiers)) {
logError(s"Could not find static main method in object ${args.userClass}")
finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_EXCEPTION_USER_CLASS)
} else {
mainMethod.invoke(null, userArgs.toArray)
finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
logDebug("Done running user class")
}
} catch {
.....
} finally {
......
}
}
}
userThread.setContextClassLoader(userClassLoader)
userThread.setName("Driver")
userThread.start()
userThread
}

6.userThtread: JavaWordCount.main

初始化的核心就是这段创建SparkSession,在getOrCreate方法中会创建sparkContext

1
2
3
4
SparkSession spark = SparkSession
.builder()
.appName("JavaWordCount")
.getOrCreate();

7. sparkSession.getOrCreate

当SparkContext未初始化时,通过SparkContext.getOrCreate获取。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
  def getOrCreate(): SparkSession = synchronized {
......
// Global synchronization so we will only set the default session once.
SparkSession.synchronized {
......
val sparkContext = userSuppliedContext.getOrElse {
......
SparkContext.getOrCreate(sparkConf)
// Do not update `SparkConf` for existing `SparkContext`, as it's shared by all sessions.
}
......
}

return session
}
}

8. SparkContext.getOrCreate

该方法比较简单,主要用于获取或实例化 SparkContext ,并将其注册为单例对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def getOrCreate(config: SparkConf): SparkContext = {
// Synchronize to ensure that multiple create requests don't trigger an exception
// from assertNoOtherContextIsRunning within setActiveContext
SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
if (activeContext.get() == null) {
setActiveContext(new SparkContext(config))
} else {
if (config.getAll.nonEmpty) {
logWarning("Using an existing SparkContext; some configuration may not take effect.")
}
}
activeContext.get()
}
}

9.new SparkContext(config)

该方法其实为SparkContext的初始化,SparkContext的属性较多,我们只关注启动流程上关注的信息,即设调用了YarnClusterScheduler.postStartHook()

在SparContext.class内部,有一大段代码块,该段代码块会被编译进构造器中,也就是在new class时候会运行,整个方法见识创建了一个YarnClusterScheduler,然后在启动完成后调用postStartHook方法。

1
2
3
4
5
6
// Create and start the scheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master)
_schedulerBackend = sched
_taskScheduler = ts
// Post init
_taskScheduler.postStartHook()

10 为什么是YarnClusterScheduler.postStartHook()?

还记得上文第四部4-rundriver方法中,启动完userThread后,main线程会等待SparkContext初始化完成,那么如何判断的呢,是通过多线程间的promise和futhur获取的。

1
2
3
4
5
val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
Duration(totalWaitTime, TimeUnit.MILLISECONDS))
if (sc != null) {
......
}

而sparkContextPromise的对象放入是通过什么方法呢,是sparkContextInitialized

1
2
3
4
5
6
7
8
private def sparkContextInitialized(sc: SparkContext) = {
sparkContextPromise.synchronized {
// Notify runDriver function that SparkContext is available
sparkContextPromise.success(sc)
// Pause the user class thread in order to make proper initialization in runDriver function.
sparkContextPromise.wait()
}
}

而sparkContextInitialized又被ApplicationMaster.sparkContextInitialized这个静态方法调用。

1
2
3
private[spark] def sparkContextInitialized(sc: SparkContext): Unit = {
master.sparkContextInitialized(sc)
}

而ApplicationMaster.sparkContextInitialized又被YarnClusterScheduler类中的postStartHook方法调用,YarnClusterScheduler继承自YarnScheule基础自TaskScheuleImpl,本质上就是任务调度暴露了hook接口,方便后续使用。

1
2
3
4
5
override def postStartHook(): Unit = {
ApplicationMaster.sparkContextInitialized(sc)
super.postStartHook()
logInfo("YarnClusterScheduler.postStartHook done")
}

所有,整体流程就是userThread在完成SparkContext的初始化后,告知schedule启动完成,schedule将告知ApplicationMaster启动完成,ApplicationMaster接受后,继续之前阻塞的程序。

11.registerAM

这一步就是注册applicationMaster,其实也是driver了,很简单,直接调用client.register

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private def registerAM(
host: String,
port: Int,
_sparkConf: SparkConf,
uiAddress: Option[String],
appAttempt: ApplicationAttemptId): Unit = {
val appId = appAttempt.getApplicationId().toString()
val attemptId = appAttempt.getAttemptId().toString()
val historyAddress = ApplicationMaster
.getHistoryServerAddress(_sparkConf, yarnConf, appId, attemptId)

client.register(host, port, yarnConf, _sparkConf, uiAddress, historyAddress)
registered = true
}

YarnClient.register

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def register(
driverHost: String,
driverPort: Int,
conf: YarnConfiguration,
sparkConf: SparkConf,
uiAddress: Option[String],
uiHistoryAddress: String): Unit = {
amClient = AMRMClient.createAMRMClient()
amClient.init(conf)
amClient.start()
this.uiHistoryAddress = uiHistoryAddress

val trackingUrl = uiAddress.getOrElse {
if (sparkConf.get(ALLOW_HISTORY_SERVER_TRACKING_URL)) uiHistoryAddress else ""
}

logInfo("Registering the ApplicationMaster")
synchronized {
amClient.registerApplicationMaster(driverHost, driverPort, trackingUrl)
registered = true
}
}

这一段都是和yarn相关的,就不展开了

12 createAllocator

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
private def createAllocator(
driverRef: RpcEndpointRef,
_sparkConf: SparkConf,
rpcEnv: RpcEnv,
appAttemptId: ApplicationAttemptId,
distCacheConf: SparkConf): Unit = {
.....
val appId = appAttemptId.getApplicationId().toString()
val driverUrl = RpcEndpointAddress(driverRef.address.host, driverRef.address.port,
CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
val localResources = prepareLocalResources(distCacheConf)

// Before we initialize the allocator, let's log the information about how executors will
// be run up front, to avoid printing this out for every single executor being launched.
// Use placeholders for information that changes such as executor IDs.
logInfo {
val executorMemory = _sparkConf.get(EXECUTOR_MEMORY).toInt
val executorCores = _sparkConf.get(EXECUTOR_CORES)
val dummyRunner = new ExecutorRunnable(None, yarnConf, _sparkConf, driverUrl, "<executorId>",
"<hostname>", executorMemory, executorCores, appId, securityMgr, localResources,
ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
dummyRunner.launchContextDebugInfo()
}

allocator = client.createAllocator(
yarnConf,
_sparkConf,
appAttemptId,
driverUrl,
driverRef,
securityMgr,
localResources)

// Initialize the AM endpoint *after* the allocator has been initialized. This ensures
// that when the driver sends an initial executor request (e.g. after an AM restart),
// the allocator is ready to service requests.
rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverRef))

allocator.allocateResources()
val ms = MetricsSystem.createMetricsSystem(MetricsSystemInstances.APPLICATION_MASTER, sparkConf)
val prefix = _sparkConf.get(YARN_METRICS_NAMESPACE).getOrElse(appId)
ms.registerSource(new ApplicationMasterSource(prefix, allocator))
// do not register static sources in this case as per SPARK-25277
ms.start(false)
metricsSystem = Some(ms)
reporterThread = launchReporterThread()
}

executor也启动完毕,这里也就不展开了。

发现

最近偶然间,发现一个以前spark sql不知道的写法,窗口函数的窗口是可以起别名的。

本文就作为笔记罗列下窗口别名的写法。

哪里看?

要了解最官方的语法及解释,自然是看官方的解释了。
spark的官方为 Window Functions
hive的官方为LanguageManual WindowingAndAnalytics

本文以spark sql语法为准

窗口别名的基本语法

1
2
3
4
5
6
7
8
select 
window_function [ nulls_option ] OVER window_name

from xxx

window window_name as ( [ { PARTITION | DISTRIBUTE } BY partition_col_name = partition_col_val ( [ , ... ] ) ]
{ ORDER | SORT } BY expression [ ASC | DESC ] [ NULLS { FIRST | LAST } ] [ , ... ]
[ window_frame ] )

举例来说

1
2
3
4
5
6
7
8
9
10
with tmp as (
select '张三' as name,100 as score
union all
select '张三' as name,80 as score
)
select *
,lead(score) over w as lead_score
from tmp
window w as (partition by name order by score)
order by score

此时结果为

name score lead_score
张三 80 100
张三 100 null

有什么作用?

当我们同一个窗口,要取的字段比较多时,窗口的partition by xx order by xxx就会重复编写,此时,如要要修改窗口,那么就会要改很多地方,而使用别名,则只需修改一个地方,同时,代码也会简洁很多。

还是举个例子,比如有如下的订单表

id 用户名 商品 支付时间 购买数量 支付金额
1 张三 奶粉 2023-01-01 13:53:52 2 120
2 张三 尿不湿 2023-01-02 14:14:14 3 200
需求是加上上一次同一个用户购买的商品名称,支付时间,购买金额用于分析比较。

那么,正常的一个sql写法为

1
2
3
4
5
6
7
8
9
10
11
with item_order as (
select 1 as id,'张三' as name,'奶粉' as item_name,'2023-01-01 13:53:52' as pay_time,2 as item_cnt,120 as pay_amt
union all
select 2 as id,'张三' as name,'尿不湿' as item_name,'2023-01-02 14:14:14' as pay_time,3 as item_cnt,200 as pay_amt
)
select *
,lag(item_name) over (partition by name order by pay_time) as lag_item_name
,lag(pay_time) over (partition by name order by pay_time) as lag_pay_time
,lag(pay_amt) over (partition by name order by pay_time) as lag_pay_amt
from item_order
order by name,pay_time

查看上述sql,会发现写了三次 over (partition by name order by pay_time)

而采用窗口别名复用的方法,sql可以变为如下

1
2
3
4
5
6
7
8
9
10
11
12
13
with item_order as (
select 1 as id,'张三' as name,'奶粉' as item_name,'2023-01-01 13:53:52' as pay_time,2 as item_cnt,120 as pay_amt
union all
select 2 as id,'张三' as name,'尿不湿' as item_name,'2023-01-02 14:14:14' as pay_time,3 as item_cnt,200 as pay_amt
)
select *
,lag(item_name) over name_window as lag_item_name
,lag(pay_time) over name_window as lag_pay_time
,lag(pay_amt) over name_window as lag_pay_amt
from item_order
window name_window as (partition by name order by pay_time)
order by name,pay_time

可以看到对窗口name_window进行了复用,此时只需修改 name_window 一个地方就可以了。

参考资料