0%

问题描述

在自己的服务器上安装了一堆yum后,再次ssh远程登录时,服务器提示:

1
2
3
-bash: warning: setlocale: LC_ALL: cannot change locale (en_US.UTF-8)
-bash: warning: setlocale: LC_ALL: cannot change locale (en_US.UTF-8)
/bin/sh: warning: setlocale: LC_ALL: cannot change locale (en_US.UTF-8)

解决方案

1、重新生产对应的locale文件

1
localedef -v -c -i en_US -f UTF-8 en_US.UTF-8

2、查看系统当前支持的locale定义

1
locale -a 

为什么要用线程池?

池化技术想必大家已经屡见不鲜了,线程池、数据库连接池、Http 连接池等等都是对这个思想的应用。池化技术的思想主要是为了减少每次获取资源的消耗,提高对资源的利用率。

线程池提供了一种限制和管理资源(包括执行一个任务)的方式。 每个线程池还维护一些基本统计信息,例如已完成任务的数量。

这里借用《Java 并发编程的艺术》提到的来说一下使用线程池的好处:

降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。

提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。

提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

synchronized 关键字和 volatile 关键字的区别

synchronized 关键字和 volatile 关键字是两个互补的存在,而不是对立的存在!

  1. volatile 关键字是线程同步的轻量级实现,所以 volatile 性能肯定比synchronized关键字要好 。但是 volatile 关键字只能用于变量而 synchronized 关键字可以修饰方法以及代码块 。

  2. volatile 关键字能保证数据的可见性,但不能保证数据的原子性。synchronized 关键字两者都能保证。

  3. volatile关键字主要用于解决变量在多个线程之间的可见性,而 synchronized 关键字解决的是多个线程之间访问资源的同步性。

[toc]

约定规则

Apache Spark有哪些常见的稳定版本,Spark1.6.0的数字分别代表什么意思?

常见的大的稳定版本有Spark 1.3,Spark1.6, Spark 2.0 ,Spark1.6.0的数字含义

  • 第一个数字:1
    major version : 代表大版本更新,一般都会有一些 api 的变化,以及大的优化或是一些结构的改变;
  • 第二个数字:6
    minor version : 代表小版本更新,一般会新加 api,或者是对当前的 api 就行优化,或者是其他内容的更新,比如说 WEB UI 的更新等等;
  • 第三个数字:0
    patch version , 代表修复当前小版本存在的一些 bug,基本不会有任何 api 的改变和功能更新;记得有一个大神曾经说过,如果要切换 spark 版本的话,最好选 patch version 非 0 的版本,因为一般类似于 1.2.0, … 1.6.0 这样的版本是属于大更新的,有可能会有一些隐藏的 bug 或是不稳定性存在,所以最好选择 1.2.1, … 1.6.1 这样的版本。

spark在1.6之前使用的是Akka进行通信,1.6及以后是基于netty。
现阶段的Flink是基于Akka+Netty

基本概念

整体概念

你是怎么理解Spark,它的特点是什么?

Spark是一个基于内存的,用于大规模数据处理(离线计算、实时计算、快速查询(交互式查询))的统一分析引擎。

它内部的组成模块,包含

  • SparkCore,
  • SparkSQL,
  • Spark Streaming,
  • SparkMLlib,
  • SparkGraghx等。

具体的,

  • SparkCore:Spark 的基础组件,提供了任务调度、内存管理和错误恢复等功能。它还定义了 RDD(Resilient Distributed Datasets)数据结构,用于在集群上进行分布式计算。
  • SparkSQL:用于处理结构化数据的组件,支持使用 SQL 查询数据。它提供了 DataFrame 和 Dataset 两个 API,可以方便地进行数据处理和分析。适合处理大规模的结构化数据。
  • Spark Streaming:用于实时数据处理的组件,可以将实时数据流划分为小批次进行处理。它支持各种数据源,如 Kafka、Flume 和 HDFS,并提供了窗口操作和状态管理等功能。适合实时数据分析和流式处理。
  • SparkMLlib:用于机器学习的组件,提供了常见的机器学习算法和工具。它支持分类、回归、聚类和推荐等任务,并提供了特征提取、模型评估和模型调优等功能。适合大规模的机器学习任务。
  • SparkGraghx等:用于图计算的组件,提供了图结构的抽象和常见的图算法。它支持图的构建、遍历和计算,并提供了图分析和图挖掘等功能。适合社交网络分析和图计算任务。

Spark的主要特点包括:

  • 快:Spark计算速度是MapReduce计算速度的10-100倍,Spark使用内存计算技术,以及基于弹性分布式数据集(RDD)的计算模型,可以在内存中对数据进行高效处理,从而比传统的基于磁盘的计算系统更快速。
  • 容错性:Spark可以在节点故障时重新计算丢失的数据,从而避免了数据丢失的问题,保证了任务的可靠性。
  • 多语言支持:Spark提供了多种编程语言API,包括Java、Scala、Python和R等,使得开发者可以使用自己熟悉的语言进行数据处理任务。
  • 数据处理能力:Spark可以处理各种类型的数据,包括结构化数据、半结构化数据和非结构化数据等,并且支持各种数据源的读写操作,如HDFS、Hive、MySQL等。
  • 可扩展性:Spark可以在大规模集群上运行,支持自动分区和并行化处理,从而可以处理PB级别的数据。

总的来说,Spark具有高效的性能、容错性、多语言支持、强大的数据处理能力和良好的可扩展性,适用于各种大规模数据处理任务,如机器学习、图像处理、数据挖掘、日志分析等。

有了mapreduce,为什么还要spark?

mapreduce在设计上存在缺陷,主要为

  • 缺少对迭代的支持
  • 中间数据输出到硬盘存储,产生了较高的延迟

Spark 解决了 Hadoop 的哪些问题?

  1. MR:抽象层次低,需要使用手工代码来完成程序编写,使用上难以上手;

Spark:Spark 采用 RDD 计算模型,简单容易上手。

  1. MR:只提供 map 和 reduce 两个操作,表达能力欠缺;

Spark:Spark 采用更加丰富的算子模型,包括 map、flatmap、groupbykey、reducebykey 等;

  1. MR:一个 job 只能包含 map 和 reduce 两个阶段,复杂的任务需要包含很多个 job,这些 job 之间的管理以来需要开发者自己进行管理;

Spark:Spark 中一个 job 可以包含多个转换操作,在调度时可以生成多个 stage,而且如果多个 map 操作的分区不变,是可以放在同一个 task 里面去执行;

  1. MR:中间结果存放在 hdfs 中;

Spark:Spark 的中间结果一般存在内存中,只有当内存不够了,才会存入本地磁盘,而不是 hdfs;

  1. MR:只有等到所有的 map task 执行完毕后才能执行 reduce task;

Spark:Spark 中分区相同的转换构成流水线在一个 task 中执行,分区不同的需要进行 shuffle 操作,被划分成不同的 stage 需要等待前面的 stage 执行完才能执行。

  1. MR:只适合 batch 批处理,时延高,对于交互式处理和实时处理支持不够;

Spark:Spark streaming 可以将流拆成时间间隔的 batch 进行处理,实时计算。

Hadoop 和 Spark 使用场景?

Hadoop/MapReduce 和 Spark 最适合的都是做离线型的数据分析,但 Hadoop 特别适合是单次分析的数据量“很大”的情景,而 Spark 则适用于数据量不是很大的情景。

一般情况下,对于中小互联网和企业级的大数据应用而言,单次分析的数量都不会“很大”,因此可以优先考虑使用 Spark。

业务通常认为 Spark 更适用于机器学习之类的“迭代式”应用,80GB 的压缩数据(解压后超过 200GB),10 个节点的集群规模,跑类似“sum+group-by”的应用,MapReduce 花了 5 分钟,而 spark 只需要 2 分钟。

Hadoop 和 Spark 的相同点和不同点?

  1. 计算模型抽象不同
    Hadoop 底层使用 MapReduce 计算架构,只有 map 和 reduce 两种操作,表达能力比较欠缺,而且在 MR 过程中会重复的读写 hdfs,造成大量的磁盘 io 读写操作,所以适合高时延环境下批处理计算的应用;

Spark 是基于内存的分布式计算架构,提供更加丰富的数据集操作类型,主要分成转化操作和行动操作,包括 map、reduce、filter、flatmap、groupbykey、reducebykey、union 和 join 等,数据分析更加快速,所以适合低时延环境下计算的应用;

spark 与 hadoop 最大的区别在于迭代式计算模型。基于 mapreduce 框架的 Hadoop 主要分为 map 和 reduce 两个阶段,两个阶段完了就结束了,所以在一个 job 里面能做的处理很有限;spark 计算模型是基于内存的迭代式计算模型,可以分为 n 个阶段,根据用户编写的 RDD 算子和程序,在处理完一个阶段后可以继续往下处理很多个阶段,而不只是两个阶段。所以 spark 相较于 mapreduce,计算模型更加灵活,可以提供更强大的功能。

  1. 资源抽象不同
    Spark Task的启动时间快。spark是executor中启动线程的方式Spark采用fork线程的方式,而Hadoop是container中创建新的进程 的方式。
  2. 缓存机制不同
    Spark的缓存机制比HDFS的缓存机制高效。

通常来说,Spark 与 MapReduce 相比,Spark 运行效率更高。请说明效率更高来源于 Spark 内置的哪些机制?

  1. 数据模型优势,rdd的抽象比mapreduce更优秀
    spark是迭代式模型,而mapreduce是批处理模型。

    spark的 DAG、map之间以pipeline方式运行,无需刷磁盘。RDD抽象出一个被分区、不可变、且能并行操作的数据集;从HDFS读取的需要计算的数据,在经过处理后的中间结果会作为RDD单元缓存到内存当中,并可以作为下一次计算的输入信息。最终Spark只需要读取和写入一次HDFS,这样就避免了Hadoop MapReduce的大IO操作。

    MR最大缺点是采用非循环式的数据流模型,编程不够灵活,仅支持map和reduce两种操作。当一个计算逻辑复杂的时候,需要写多个MR任务运行【并且这些MR任务生成的结果在下一个MR任务使用时需要将数据持久化到磁盘才行,这就不可避免的进行遭遇大量磁盘IO影响效率】。

  2. spark可以通过缓存共享rdd,dataframe等,mapreduce只能通过hdfs缓存,效率相差较多。
    比如一个复杂逻辑中 ,一个map-reduce产生的结果A,如果在后续的map-reduce过程中需要反复用到,spark可以把A缓存到内存中,这样后续的map-reduce过程就只需要从内存中读取A即可,也会加快速度

  3. 容错机制 Linage。
    Spark容错性高,它通过弹性分布式数据集RDD来实现高效容错,RDD是一组分布式的存储在 节点内存中的只读性的数据集,这些集合石弹性的,某一部分丢失或者出错,可以通过整个数据集的计算流程的血缘关系来实现重建,mapreduce的容错只能重新计算

  4. spark 支持checkpoint,遇错可快速恢复。

  5. 资源模型不同
    spark是多线程模型,每个worker节点运行一个或多个executor服务,每个task作为线程运行在executor中,task间可共享资源,
    而MR是多进程模型,任务调度(频繁申请、释放资源)和启动开销大,不适合低延迟类型作业

重点部分就是 DAG 和 Lingae

Spark 与 MapReduce 的 Shuffle 的区别?

1)从 high-level 的角度来看,两者并没有大的差别。都是将 mapper(Spark 里是 ShuffleMapTask)的输出进行 partition,不同的 partition 送到不同的 reducer(Spark 里 reducer 可能是下一个 stage 里的 ShuffleMapTask,也可能是 ResultTask)。Reducer 以内存作缓冲区,边 shuffle 边 aggregate 数据,等到数据 aggregate 好以后进行 reduce() (Spark 里可能是后续的一系列操作)。 

2)从 low-level 的角度来看,两者差别不小。Hadoop MapReduce 是 sort-based,进入 combine() 和 reduce() 的 records 必须先 sort。这样的好处在于 combine/reduce() 可以处理大规模的数据,因为其输入数据可以通过外排得到(mapper 对每段数据先做排序,reducer 的 shuffle 对排好序的每段数据做归并)。
目前的 Spark 默认选择的是 hash-based,通常使用 HashMap 来对 shuffle 来的数据进行 aggregate,不会对数据进行提前排序。如果用户需要经过排序的数据,那么需要自己调用类似 sortByKey() 的操作;如果你是Spark 1.1的用户,可以将spark.shuffle.manager设置为sort,则会对数据进行排序。在Spark 1.2中,sort将作为默认的Shuffle实现。 

3)从实现角度来看,两者也有不少差别。Hadoop MapReduce 将处理流程划分出明显的几个阶段:map(), spill, merge, shuffle, sort, reduce() 等。每个阶段各司其职,可以按照过程式的编程思想来逐一实现每个阶段的功能。在 Spark 中,没有这样功能明确的阶段,只有不同的 stage 和一系列的 transformation(),所以 spill, merge, aggregate 等操作需要蕴含在 transformation() 中。如果我们将 map 端划分数据、持久化数据的过程称为 shuffle write,而将 reducer 读入数据、aggregate 数据的过程称为 shuffle read。那么在 Spark 中,问题就变为怎么在 job 的逻辑或者物理执行图中加入 shuffle write 和 shuffle read的处理逻辑?以及两个处理逻辑应该怎么高效实现?Shuffle write由于不要求数据有序,shuffle write 的任务很简单:将数据 partition 好,并持久化。之所以要持久化,一方面是要减少内存存储空间压力,另一方面也是为了 fault-tolerance。

Spaek 程序执行,有时候默认为什么会产生很多 task,怎么修改默认 task 执行个数?

  1. 输入数据有很多 task,尤其是有很多小文件的时候,有多少个输入
    block 就会有多少个 task 启动;

  2. spark 中有 partition 的概念,每个 partition 都会对应一个 task,task 越多,在处理大规模数据的时候,就会越有效率。不过 task 并不是越多越好,如果平时测试,或者数据量没有那么大,则没有必要 task 数量太多。

  3. 参数可以通过 spark_home/conf/spark-default.conf 配置文件设置:

针对 spark sql 的 task 数量:spark.sql.shuffle.partitions=50

非 spark sql 程序设置生效:spark.default.parallelism=10

spark的有几种部署模式,每种模式特点?

  • 1)Local 本地模式  
    Spark不一定非要跑在hadoop集群,可以在本地,起多个线程的方式来指定。将Spark应用以多线程的方式直接运行在本地,一般都是为了方便调试,本地模式分三类  

    • local:只启动一个executor  
    • local[k]:启动k个executor  
    • local[ * ]:启动跟cpu数目相同的 executor
  • 2)standalone模式  
    分布式部署集群,构建一个基于Mster+Slaves的资源调度集群,Spark任务提交给Master运行。是Spark自身的一个调度系统,自带完整的服务,资源管理和任务监控,这个模式也是其他模式的基础。

  • 3)Spark on yarn模式  
    分布式部署集群,资源和任务监控交给yarn管理,但是目前仅支持粗粒度资源分配方式,包含cluster和client运行模式,cluster适合生产,driver运行在集群子节点,具有容错功能,client适合调试,dirver运行在客户端。

  • 4)Spark On Mesos模式。 
    官方推荐这种模式(当然,原因之一是血缘关系)。正是由于Spark开发之初就考虑到支持Mesos,因此,目前而言,Spark运行在Mesos上会比运行在YARN上更加灵活,更加自然。

    用户可选择两种调度模式之一运行自己的应用程序:

    • (1)粗粒度模式(Coarse-grained Mode):每个应用程序的运行环境由一个Dirver和若干个Executor组成,其中,每个Executor占用若干资源,内部可运行多个Task(对应多少个“slot”)。

应用程序的各个任务正式运行之前,需要将运行环境中的资源全部申请好,且运行过程中要一直占用这些资源,即使不用,最后程序运行结束后,回收这些资源。

  • (2)细粒度模式(Fine-grained Mode):鉴于粗粒度模式会造成大量资源浪费,Spark On Mesos还提供了另外一种调度模式:细粒度模式,这种模式类似于现在的云计算,思想是按需分配。

rdd

你是如何理解Spark中(RDD)的概念?它的作用是什么?

  • 概念
    RDD是弹性分布式数据集,是Spark中最基本的数据抽象,代表一个不可变、可分区、里面的元素可并行计算 的集合。
  • 作用
    提供了一个抽象的数据模型,将具体的应用逻辑表达为一系列转换操作(函数)。另外不同RDD之间的转换操作之间还可以形成依赖关系,进而实现管道化,从而避免了中间结果的存储,大大降低了数据复制、磁盘IO和序列化开销,并且还提供了更多的API(map/reduec/filter/groupBy…)
  • 特点
    • 容错性:RDD具有容错性,因为它会自动将数据划分成多个分区,并在集群中的多个节点上进行复制,从而实现数据的高可靠性和容错性。
    • 数据共享:RDD允许多个并行操作共享相同的数据集合,以便在不同的计算步骤中复用数据,从而避免了重复的IO操作,提高了计算效率。
    • 优化计算:RDD通过支持多个转换操作和行动操作,允许进行复杂的计算和数据分析,同时也支持对计算过程进行优化,以便最大限度地减少计算成本。
    • 血统跟踪:RDD通过记录其前一个RDD的依赖关系,构建了一个有向无环图(DAG)来跟踪其数据处理流程,从而允许Spark在节点故障时重新计算丢失的分区,实现了弹性计算。
    • 血缘是指RDD之间的依赖关系,这种依赖关系可以通过DAG(有向无环图)来表示。每个RDD都会记录其父RDD的引用和产生该RDD的转换操作,这样,如果某个RDD的分区丢失或出现故障,Spark可以根据血统信息重新计算该RDD的丢失分区,实现了弹性计算。因此,RDD的血统跟踪是Spark实现容错性的重要机制。

RDD的弹性表现在哪几点?

1)自动的进行内存和磁盘的存储切换; 
2)基于Lineage的高效容错;
3)task如果失败会自动进行特定次数的重试;
4)stage如果失败会自动进行特定次数的重试,而且只会计算失败的分片;
5)checkpoint和persist,数据计算之后持久化缓存;
6)数据调度弹性,DAG TASK调度和资源无关;
7)数据分片的高度弹性。

Spark 中的 RDD 机制理解吗?

rdd 分布式弹性数据集,简单的理解成一种数据结构,是 spark 框架上的通用货币。所有算子都是基于 rdd 来执行的,不同的场景会有不同的 rdd 实现类,但是都可以进行互相转换。rdd 执行过程中会形成 dag 图,然后形成 lineage 保证容错性等。从物理的角度来看 rdd 存储的是 block 和 node 之间的映射。

RDD 是 spark 提供的核心抽象,全称为弹性分布式数据集。

RDD 在逻辑上是一个 hdfs 文件,在抽象上是一种元素集合,包含了数据。它是被分区的,分为多个分区,每个分区分布在集群中的不同结点上,从而让 RDD 中的数据可以被并行操作(分布式数据集)

比如有个 RDD 有 90W 数据,3 个 partition,则每个分区上有 30W 数据。RDD 通常通过 Hadoop 上的文件,即 HDFS 或者 HIVE 表来创建,还可以通过应用程序中的集合来创建;RDD 最重要的特性就是容错性,可以自动从节点失败中恢复过来。即如果某个结点上的 RDD partition 因为节点故障,导致数据丢失,那么 RDD 可以通过自己的数据来源重新计算该 partition。这一切对使用者都是透明的。

RDD 的数据默认存放在内存中,但是当内存资源不足时,spark 会自动将 RDD 数据写入磁盘。比如某结点内存只能处理 20W 数据,那么这 20W 数据就会放入内存中计算,剩下 10W 放到磁盘中。RDD 的弹性体现在于 RDD 上自动进行内存和磁盘之间权衡和切换的机制。

RDD有哪些缺陷?

  1. 不支持细粒度的写和更新操作,Spark写数据是粗粒度的,所谓粗粒度,就是批量写入数据,目的是为了提高效率。但是Spark读数据是细粒度的,也就是说可以一条条的读。

  2. 不支持增量迭代计算,如果对Flink熟悉,可以说下Flink支持增量迭代计算。

RDD 中 reduceBykey 与 groupByKey 哪个性能好,为什么?

reduceByKey:reduceByKey 会在结果发送至 reducer 之前会对每个 mapper 在本地进行 merge,有点类似于在 MapReduce 中的 combiner。这样做的好处在于,在 map 端进行一次 reduce 之后,数据量会大幅度减小,从而减小传输,保证 reduce 端能够更快的进行结果计算。

groupByKey:groupByKey 会对每一个 RDD 中的 value 值进行聚合形成一个序列(Iterator),此操作发生在 reduce 端,所以势必会将所有的数据通过网络进行传输,造成不必要的浪费。同时如果数据量十分大,可能还会造成 OutOfMemoryError。

所以在进行大量数据的 reduce 操作时候建议使用 reduceByKey。不仅可以提高速度,还可以防止使用 groupByKey 造成的内存溢出问题。

如何区分 RDD 的宽窄依赖?

窄依赖:父 RDD 的一个分区只会被子 RDD 的一个分区依赖;

宽依赖:父 RDD 的一个分区会被子 RDD 的多个分区依赖(涉及到 shuffle)。

介绍一下 cogroup rdd 实现原理,你在什么场景下用过这个 rdd?

cogroup:对多个(2~4)RDD 中的 KV 元素,每个 RDD 中相同 key 中的元素分别聚合成一个集合。

与 reduceByKey 不同的是:reduceByKey 针对一个 RDD中相同的 key 进行合并。而 cogroup 针对多个 RDD中相同的 key 的元素进行合并。

cogroup 的函数实现:这个实现根据要进行合并的两个 RDD 操作,生成一个 CoGroupedRDD 的实例,这个 RDD 的返回结果是把相同的 key 中两个 RDD 分别进行合并操作,最后返回的 RDD 的 value 是一个 Pair 的实例,这个实例包含两个 Iterable 的值,第一个值表示的是 RDD1 中相同 KEY 的值,第二个值表示的是 RDD2 中相同 key 的值。

由于做 cogroup 的操作,需要通过 partitioner 进行重新分区的操作,因此,执行这个流程时,需要执行一次 shuffle 的操作(如果要进行合并的两个 RDD 的都已经是 shuffle 后的 rdd,同时他们对应的 partitioner 相同时,就不需要执行 shuffle)。

场景:表关联查询或者处理重复的 key。

RDD 持久化原理?

spark 非常重要的一个功能特性就是可以将 RDD 持久化在内存中。

调用 cache()和 persist()方法即可。cache()和 persist()的区别在于,cache()是 persist()的一种简化方式,cache()的底层就是调用 persist()的无参版本 persist(MEMORY_ONLY),将数据持久化到内存中。

如果需要从内存中清除缓存,可以使用 unpersist()方法。RDD 持久化是可以手动选择不同的策略的。在调用 persist()时传入对应的 StorageLevel 即可。

依赖&DAG

DAG 是什么?

DAG(Directed Acyclic Graph 有向无环图)指的是数据转换执行的过程,有方向,无闭环(其实就是 RDD 执行的流程);

原始的 RDD 通过一系列的转换操作就形成了 DAG 有向无环图,任务执行时,可以按照 DAG 的描述,执行真正的计算(数据被操作的一个过程)。

DAG 中为什么要划分 Stage?

并行计算。

一个复杂的业务逻辑如果有 shuffle,那么就意味着前面阶段产生结果后,才能执行下一个阶段,即下一个阶段的计算要依赖上一个阶段的数据。那么我们按照 shuffle 进行划分(也就是按照宽依赖就行划分),就可以将一个 DAG 划分成多个 Stage/阶段,在同一个 Stage 中,会有多个算子操作,可以形成一个 pipeline 流水线,流水线内的多个平行的分区可以并行执行。

如何划分 DAG 的 stage?

对于窄依赖,partition 的转换处理在 stage 中完成计算,不划分(将窄依赖尽量放在在同一个 stage 中,可以实现流水线计算)。

对于宽依赖,由于有 shuffle 的存在,只能在父 RDD 处理完成后,才能开始接下来的计算,也就是说需要要划分 stage。

DAG 划分为 Stage 的算法了解吗?

核心算法:回溯算法

从后往前回溯/反向解析,遇到窄依赖加入本 Stage,遇见宽依赖进行 Stage 切分。

Spark 内核会从触发 Action 操作的那个 RDD 开始从后往前推,首先会为最后一个 RDD 创建一个 Stage,然后继续倒推,如果发现对某个 RDD 是宽依赖,那么就会将宽依赖的那个 RDD 创建一个新的 Stage,那个 RDD 就是新的 Stage 的最后一个 RDD。
然后依次类推,继续倒推,根据窄依赖或者宽依赖进行 Stage 的划分,直到所有的 RDD 全部遍历完成为止。

具体划分算法请参考:AMP 实验室发表的论文
《Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing》
http://xueshu.baidu.com/usercenter/paper/show?paperid=b33564e60f0a7e7a1889a9da10963461&site=xueshu_se

简述Spark的宽窄依赖,以及Spark如何划分stage,每个stage又根据什么决定task个数?

  • 窄依赖:父RDD的一个分区只会被子RDD的一个分区依赖
  • 宽依赖:父RDD的一个分区会被子RDD的多个分区依赖(涉及到shuffle)

那Stage是如何划分的呢?

  • 根据RDD之间的依赖关系的不同将Job划分成不同的Stage,遇到一个宽依赖则划分一个Stage。

每个stage又根据什么决定task个数?

  • Stage是一个TaskSet,将Stage根据分区数划分成一个个的Task。

这里为了方便大家理解,贴上一张过程图

Spark中的宽依赖和窄依赖是指RDD之间的依赖关系类型。在Spark中,每个RDD都有一个或多个父RDD和一个或多个子RDD,RDD之间的依赖关系分为宽依赖和窄依赖两种类型:

  • 窄依赖(Narrow Dependency):指一个RDD的每个分区只依赖于父RDD的一个或多个分区,父RDD的每个分区最多只被一个子RDD的分区使用。窄依赖的特点是数据局部性高,可以在同一个节点上完成计算,从而提高计算效率。
  • 宽依赖(Wide Dependency):指一个RDD的一个或多个分区依赖于父RDD的多个分区,或者父RDD的同一个分区被多个子RDD的分区使用。宽依赖的特点是数据局部性较低,需要进行数据的洗牌操作(Shuffle),从而增加了计算成本和网络传输开销。

在Spark中,每个宽依赖和窄依赖之间的转换都会形成一个Stage,每个Stage包含一组具有相同依赖关系的Task。一个Stage中的Task个数由多个因素决定,包括可用的CPU核心数、可用内存大小、数据分区数等。

具体来说,Spark会将RDD划分成多个分区,并在每个分区上执行一个Task,以便实现并行计算。Task的个数通常等于RDD的分区数,这样可以确保所有Task都具有相同的计算量,并且可以在不同的节点上并行执行。

在Spark中,Stage划分的基本原则是:

  • 如果两个RDD之间存在宽依赖,那么它们就属于不同的Stage。这是因为宽依赖需要进行Shuffle操作,需要将数据从多个节点收集到一个节点上进行计算,这会产生较大的网络开销和计算成本。因此,将宽依赖放在不同的Stage中可以提高计算效率。
  • 而对于窄依赖,Spark会尽量将它们放在同一个Stage中,以便在同一个节点上执行计算,从而提高计算效率。

为什么要设计宽窄依赖?

  1. 对于窄依赖:

窄依赖的多个分区可以并行计算;

窄依赖的一个分区的数据如果丢失只需要重新计算对应的分区的数据就可以了。

  1. 对于宽依赖:

划分 Stage(阶段)的依据:对于宽依赖,必须等到上一阶段计算完成才能计算下一阶段。

算子

列举Spark常用的transformation和action算子,有哪些算子会导致Shuffle?

Spark中常用的transformation算子有:

  • map:对RDD中的每个元素应用一个函数,返回一个新的RDD。
  • filter:对RDD中的每个元素应用一个谓词函数,返回一个包含满足谓词的元素的新RDD。
  • flatMap:类似于map,但是每个输入元素可以映射到多个输出元素,返回一个新的RDD。
  • groupByKey:将具有相同key的元素进行分组,返回一个(key, values)的Tuple,其中values是一个迭代器。
  • reduceByKey:将具有相同key的元素进行分组,并将每个key对应的values应用一个reduce函数,返回一个(key, reduced value)的Tuple。
  • join:对两个具有相同key的RDD进行join操作,返回一个新的RDD。

常用的action算子有:

  • count:返回RDD中元素的个数。
  • collect:将RDD中的所有元素收集到Driver节点上,并返回一个数组。
  • first:返回RDD中的第一个元素。
  • take:返回RDD中前n个元素。
  • reduce:对RDD中的元素应用一个reduce函数,返回一个单个值。

上述算子中,groupByKey、reduceByKey、join等算子会导致Shuffle操作,因为它们需要将具有相同key的元素进行分组,而这些元素通常分布在不同的节点上。Shuffle操作会将数据从不同的节点收集到一个节点上进行计算,因此需要消耗大量的网络和计算资源。

join()和cogroup():这两个算子需要将具有相同键的元素进行连接操作,也需要进行Shuffle操作。
sortByKey():这个算子需要对RDD中的元素进行排序,因此需要进行Shuffle操作。
repartition()和coalesce():这两个算子都需要对RDD进行重新分区操作,需要进行Shuffle操作。

reduceByKey与groupByKey的区别,哪一种更具优势?

  • reduceByKey:按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[k,v]。
  • groupByKey:按照key进行分组,直接进行shuffle

所以,在实际开发过程中,reduceByKey比groupByKey,更建议使用。但是需要注意是否会影响业务逻辑。

Repartition和Coalesce 的关系与区别,能简单说说吗?

  • 1)关系:
    两者都是用来改变RDD的partition数量的,repartition底层调用的就是coalesce方法:coalesce(numPartitions, shuffle = true)
  • 2)区别:
    repartition一定会发生shuffle,coalesce 根据传入的参数来判断是否发生shuffle。
    一般情况下增大rdd的partition数量使用repartition,减少partition数量时使用coalesce。

cache&persists

简述下Spark中的缓存(cache和persist)与checkpoint机制,并指出两者的区别和联系

关于Spark缓存和检查点的区别,大致可以从这3个角度去回答:

  • 位置
    Persist 和 Cache将数据保存在内存,Checkpoint将数据保存在HDFS
  • 生命周期
    Persist 和 Cache 程序结束后会被清除或手动调用unpersist方法,Checkpoint永久存储不会被删除。
  • RDD依赖关系
    Persist 和 Cache,不会丢掉RDD间的依赖链/依赖关系,CheckPoint会斩断依赖链。

Checkpoint 和持久化机制的区别?

最主要的区别在于持久化只是将数据保存在 BlockManager 中,但是 RDD 的 lineage(血缘关系,依赖关系)是不变的。但是 checkpoint 执行完之后,rdd 已经没有之前所谓的依赖 rdd 了,而只有一个强行为其设置的 checkpointRDD,checkpoint 之后 rdd 的 lineage 就改变了。

持久化的数据丢失的可能性更大,因为节点的故障会导致磁盘、内存的数据丢失。但是 checkpoint 的数据通常是保存在高可用的文件系统中,比如 HDFS 中,所以数据丢失可能性比较低

Checkpoint 检查点机制?

应用场景:当 spark 应用程序特别复杂,从初始的 RDD 开始到最后整个应用程序完成有很多的步骤,而且整个应用运行时间特别长,这种情况下就比较适合使用 checkpoint 功能。

原因:对于特别复杂的 Spark 应用,会出现某个反复使用的 RDD,即使之前持久化过但由于节点的故障导致数据丢失了,没有容错机制,所以需要重新计算一次数据。

Checkpoint 首先会调用 SparkContext 的 setCheckPointDIR()方法,设置一个容错的文件系统的目录,比如说 HDFS;然后对 RDD 调用 checkpoint()方法。之后在 RDD 所处的 job 运行结束之后,会启动一个单独的 job,来将 checkpoint 过的 RDD 数据写入之前设置的文件系统,进行高可用、容错的类持久化操作。

检查点机制是我们在 spark streaming 中用来保障容错性的主要机制,它可以使 spark streaming 阶段性的把应用数据存储到诸如 HDFS 等可靠存储系统中,以供恢复时使用。具体来说基于以下两个目的服务:

  1. 控制发生失败时需要重算的状态数。Spark streaming 可以通过转化图的谱系图来重算状态,检查点机制则可以控制需要在转化图中回溯多远。

  2. 提供驱动器程序容错。如果流计算应用中的驱动器程序崩溃了,你可以重启驱动器程序并让驱动器程序从检查点恢复,这样 spark streaming 就可以读取之前运行的程序处理数据的进度,并从那里继续。

broadcast 广播&累加器

简述Spark中共享变量(广播变量和累加器)的基本原理与用途

关于Spark中的广播变量和累加器的基本原理和用途,

  • 累加器(accumulator)是Spark中提供的一种分布式的变量机制,其原理类似于mapreduce,即分布式的改变,然后聚合这些改变。累加器的一个常见用途是在调试时对作业执行过程中的事件进行计数。
  • 广播变量是在每个机器上缓存一份,不可变,只读的,相同的变量,该节点每个任务都能访问,起到节省资源和优化的作用。它通常用来高效分发较大的对象。

数据存储

Spark 中数据的位置是被谁管理的?

每个数据分片都对应具体物理位置,数据的位置是被blockManager管理,无论数据是在磁盘,内存还是 tacyan,都是由 blockManager 管理。

Spark的数据本地性有哪几种?

spark的数据本地性有3种

  1. process_local是指读取缓存的本地的数据
  2. node_local指读取本地节点硬盘数据
  3. any指读取非本地节点数据

通常读取数据process_local>node_local>any,尽量使数据以process_local或node_local方式读取,其中process_local还和cache有关,如果rdd,如果rdd经常用的话将该rdd cache到内存中,由于cache是lazy的,必须通过action算子触发

spark streaming

Spark Streaming 以及基本工作原理?

Spark streaming 是 spark core API 的一种扩展,可以用于进行大规模、高吞吐量、容错的实时数据流的处理。

它支持从多种数据源读取数据,比如 Kafka、Flume、Twitter 和 TCP Socket,并且能够使用算子比如 map、reduce、join 和 window 等来处理数据,处理后的数据可以保存到文件系统、数据库等存储中。

Spark streaming 内部的基本工作原理是:接受实时输入数据流,然后将数据拆分成 batch,比如每收集一秒的数据封装成一个 batch,然后将每个 batch 交给 spark 的计算引擎进行处理,最后会生产处一个结果数据流,其中的数据也是一个一个的 batch 组成的。其中,一个batchInterval累加读取到的数据对应一个RDD的数据

如何实现Spark Streaming读取Flume中的数据?

可以这样说:

  • 前期经过技术调研,查看官网相关资料,发现sparkStreaming整合flume有2种模式,一种是拉模式,一种是推模式,然后在简单的聊聊这2种模式的特点,以及如何部署实现,需要做哪些事情,最后对比两种模式的特点,选择那种模式更好。

  • 推模式:Flume将数据Push推给Spark Streaming

  • 拉模式:Spark Streaming从flume 中Poll拉取数据

Spark Streaming 整合 Kafka 的两种模式?

  1. receiver 方式:将数据拉取到 executor 中做操作,若数据量大,内存存储不下,可以通过 WAL,设置了本地存储,保证数据不丢失,然后使用 Kafka 高级 API 通过 zk 来维护偏移量,保证消费数据。receiver 消费的数据偏移量是在 zk 获取的,此方式效率低,容易出现数据丢失。
  • receiver 方式的容错性:在默认的配置下,这种方式可能会因为底层的失败而丢失数据。如果要启用高可靠机制,让数据零丢失,就必须启用 Spark Streaming 的预写日志机制(Write Ahead Log,WAL)。该机制会同步地将接收到的 Kafka 数据写入分布式文件系统(比如 HDFS)上的预写日志中。所以,即使底层节点出现了失败,也可以使用预写日志中的数据进行恢复。

  • Kafka 中的 topic 的 partition,与 Spark 中的 RDD 的 partition 是没有关系的。在 1、KafkaUtils.createStream()中,提高 partition 的数量,只会增加 Receiver 方式中读取 partition 的线程的数量。不会增加 Spark 处理数据的并行度。 可以创建多个 Kafka 输入 DStream,使用不同的 consumer group 和 topic,来通过多个 receiver 并行接收数据。

  1. 基于 Direct 方式:使用 Kafka 底层 Api,其消费者直接连接 kafka 的分区上,因为 createDirectStream 创建的 DirectKafkaInputDStream 每个 batch 所对应的 RDD 的分区与 kafka 分区一一对应,但是需要自己维护偏移量,即用即取,不会给内存造成太大的压力,效率高。
  • 优点:简化并行读取:如果要读取多个 partition,不需要创建多个输入 DStream 然后对它们进行 union 操作。Spark 会创建跟 Kafka partition 一样多的 RDD partition,并且会并行从 Kafka 中读取数据。所以在 Kafka partition 和 RDD partition 之间,有一个一对一的映射关系。

  • 高性能:如果要保证零数据丢失,在基于 receiver 的方式中,需要开启 WAL 机制。这种方式其实效率低下,因为数据实际上被复制了两份,Kafka 自己本身就有高可靠的机制,会对数据复制一份,而这里又会复制一份到 WAL 中。而基于 direct 的方式,不依赖 Receiver,不需要开启 WAL 机制,只要 Kafka 中作了数据的复制,那么就可以通过 Kafka 的副本进行恢复。

  1. receiver 与和 direct 的比较:
  • 基于 receiver 的方式,是使用 Kafka 的高阶 API 来在 ZooKeeper 中保存消费过的 offset 的。这是消费 Kafka 数据的传统方式。这种方式配合着 WAL 机制可以保证数据零丢失的高可靠性,但是却无法保证数据被处理一次且仅一次,可能会处理两次。因为 Spark 和 ZooKeeper 之间可能是不同步的。

  • 基于 direct 的方式,使用 Kafka 的低阶 API,Spark Streaming 自己就负责追踪消费的 offset,并保存在 checkpoint 中。Spark 自己一定是同步的,因此可以保证数据是消费一次且仅消费一次。

  • Receiver 方式是通过 zookeeper 来连接 kafka 队列,Direct 方式是直接连接到 kafka 的节点上获取数据。

DStream 以及基本工作原理?

DStream:Discretized Stream,离散流,Spark Streaming提供的一种高级抽象,代表了一个持续不断的数据流;
DStream可以通过输入数据源来创建,比如Kafka、Flume,也可以通过对其他DStream应用高阶函数来创建,比如map、reduce、join、window;

DStream的内部,其实是一系列持续不断产生的RDD,RDD是Spark Core的核心抽象,即,不可变的,分布式的数据集;
DStream中的每个RDD都包含了一个时间段内的数据;
以下图为例,0-1这段时间的数据累积构成了RDD@time1,1-2这段时间的数据累积构成了RDD@time2,。。。

DStream 是 spark streaming 提供的一种高级抽象,代表了一个持续不断的数据流。

DStream 可以通过输入数据源来创建,比如 Kafka、flume 等,也可以通过其他 DStream 的高阶函数来创建,比如 map、reduce、join 和 window 等。

DStream 内部其实不断产生 RDD,每个 RDD 包含了一个时间段的数据。

Spark streaming 一定是有一个输入的 DStream 接收数据,按照时间划分成一个一个的 batch,并转化为一个 RDD,RDD 的数据是分散在各个子节点的 partition 中。

spark sql

你用 Spark Sql 处理的时候, 处理过程中用的 DataFrame 还是直接写的 Sql?为什么?

这个问题的宗旨是问你 spark sql 中 dataframe 和 sql 的区别,从执行原理、操作方便程度和自定义程度来分析
这个问题。

Spark SQL 是如何将数据写到 Hive 表的?

  • 方式一:是利用 Spark RDD 的 API 将数据写入 hdfs 形成 hdfs 文件,之后再将 hdfs 文件和 hive 表做加载映射。

  • 方式二:利用 Spark SQL 将获取的数据 RDD 转换成 DataFrame,再将 DataFrame 写成缓存表,最后利用 Spark SQL 直接插入 hive 表中。而对于利用 Spark SQL 写 hive 表官方有两种常见的 API,第一种是利用 JavaBean 做映射,第二种是利用 StructType 创建 Schema 做映射。

Spark SQL 执行的流程?

这个问题如果深挖还挺复杂的,这里简单介绍下总体流程:

  1. parser:基于 antlr 框架对 sql 解析,生成抽象语法树。

  2. 变量替换:通过正则表达式找出符合规则的字符串,替换成系统缓存环境的变量

SQLConf 中的spark.sql.variable.substitute,默认是可用的;参考SparkSqlParser

  1. parser:将 antlr 的 tree 转成 spark catalyst 的 LogicPlan,也就是 未解析的逻辑计划;详细参考AstBuild, ParseDriver

  2. analyzer:通过分析器,结合 catalog,把 logical plan 和实际的数据绑定起来,将 未解析的逻辑计划 生成 逻辑计划;详细参考QureyExecution

  3. 缓存替换:通过 CacheManager,替换有相同结果的 logical plan(逻辑计划)

  4. logical plan 优化,基于规则的优化;优化规则参考 Optimizer,优化执行器 RuleExecutor

  5. 生成 spark plan,也就是物理计划;参考QueryPlanner和SparkStrategies

  6. spark plan 准备阶段

  7. 构造 RDD 执行,涉及 spark 的 wholeStageCodegenExec 机制,基于 janino 框架生成 java 代码并编译

架构组件

Spark 有哪些组件?

  1. master:管理集群和节点,不参与计算。

  2. worker:计算节点,进程本身不参与计算,和 master 汇报。

  3. Driver:运行程序的 main 方法,创建 spark context 对象。

  4. spark context:控制整个 application 的生命周期,包括 dagsheduler 和 task scheduler 等组件。

  5. client:用户提交程序的入口。

Spark Master 使用 Zookeeper 进行 HA,有哪些源数据保存到 Zookeeper 里面?

spark 通过这个参数 spark.deploy.zookeeper.dir 指定 master 元数据在 zookeeper 中保存的位置,包括 Worker,Driver 和 Application 以及 Executors。standby 节点要从 zk 中,获得元数据信息,恢复集群运行状态,才能对外继续提供服务,作业提交资源申请等,在恢复前是不能接受请求的。

注:Master 切换需要注意 2 点:
1、在 Master 切换的过程中,所有的已经在运行的程序皆正常运行!
因为 Spark Application 在运行前就已经通过 Cluster Manager 获得了计算资源,所以在运行时 Job 本身的
调度和处理和 Master 是没有任何关系。
2、在 Master 的切换过程中唯一的影响是不能提交新的 Job:一方面不能够提交新的应用程序给集群,
因为只有 Active Master 才能接受新的程序的提交请求;另外一方面,已经运行的程序中也不能够因
Action 操作触发新的 Job 的提交请求。

HA

Spark Master HA 主从切换过程不会影响到集群已有作业的运行,为什么?

不会的。

因为程序在运行之前,已经申请过资源了,driver 和 Executors 通讯,不需要和 master 进行通讯的。

Spark 主备切换机制原理知道吗?

Master 实际上可以配置两个,Spark 原生的 standalone 模式是支持 Master 主备切换的。当 Active Master 节点挂掉以后,我们可以将 Standby Master 切换为 Active Master。

Spark Master 主备切换可以基于两种机制,一种是基于文件系统的,一种是基于 ZooKeeper 的。

基于文件系统的主备切换机制,需要在 Active Master 挂掉之后手动切换到 Standby Master 上;

而基于 Zookeeper 的主备切换机制,可以实现自动切换 Master。

Spark 如何保证宕机迅速恢复?

  1. 适当增加 spark standby master

  2. 编写 shell 脚本,定期检测 master 状态,出现宕机后对 master 进行重启操作

流程

Spark 的运行流程?


具体运行流程如下:

  1. SparkContext 向资源管理器注册并向资源管理器申请运行 Executor

  2. 资源管理器分配 Executor,然后资源管理器启动 Executor

  3. Executor 发送心跳至资源管理器

  4. SparkContext 构建 DAG 有向无环图

  5. 将 DAG 分解成 Stage(TaskSet)

  6. 把 Stage 发送给 TaskScheduler

  7. Executor 向 SparkContext 申请 Task

  8. TaskScheduler 将 Task 发送给 Executor 运行

  9. 同时 SparkContext 将应用程序代码发放给 Executor

  10. Task 在 Executor 上运行,运行完毕释放所有资源

调优

数据倾斜

对于 Spark 中的数据倾斜问题你有什么好的方案?

  1. 前提是定位数据倾斜,是 OOM 了,还是任务执行缓慢,看日志,看 WebUI

  2. 解决方法,有多个方面:

  • 避免不必要的 shuffle,如使用广播小表的方式,将 reduce-side-join 提升为 map-side-join
  • 分拆发生数据倾斜的记录,分成几个部分进行,然后合并 join 后的结果
  • 改变并行度,可能并行度太少了,导致个别 task 数据压力大
  • 两阶段聚合,先局部聚合,再全局聚合
  • 自定义 paritioner,分散 key 的分布,使其更加均匀

数据倾斜的产生和解决办法?

数据倾斜以为着某一个或者某几个 partition 的数据特别大,导致这几个 partition 上的计算需要耗费相当长的时间。

在 spark 中同一个应用程序划分成多个 stage,这些 stage 之间是串行执行的,而一个 stage 里面的多个 task 是可以并行执行,task 数目由 partition 数目决定,如果一个 partition 的数目特别大,那么导致这个 task 执行时间很长,导致接下来的 stage 无法执行,从而导致整个 job 执行变慢。

避免数据倾斜,一般是要选用合适的 key,或者自己定义相关的 partitioner,通过加盐或者哈希值来拆分这些 key,从而将这些数据分散到不同的 partition 去执行。

如下算子会导致 shuffle 操作,是导致数据倾斜可能发生的关键点所在:groupByKey;reduceByKey;aggregaByKey;join;cogroup;

当Spark涉及到数据库的操作时,如何减少Spark运行中的数据库连接数?

嗯,有点“调优”的味道,感觉真正的“风暴”即将到来,这道题还是很好回答的,我们只需要减少连接数据库的次数即可。

  • 使用foreachPartition代替foreach,在foreachPartition内获取数据库的连接。

能介绍下你所知道和使用过的Spark调优吗?

  • 资源参数调优

    1
    2
    3
    4
    5
    6
    7
    8
    9
    num-executors:设置Spark作业总共要用多少个Executor进程来执行

    executor-memory:设置每个Executor进程的内存

    executor-cores:设置每个Executor进程的CPU core数量

    driver-memory:设置Driver进程的内存

    spark.default.parallelism:设置每个stage的默认task数量
  • 开发调优

    • 避免创建重复的RDD
    • 尽可能复用同一个RDD
    • 对多次使用的RDD进行持久化
    • 尽量避免使用shuffle类算子
    • 使用map-side预聚合的shuffle操作
    • 使用高性能的算子
      1
      2
      3
      4
      5
      6
      7
      8
      9
      ①使用reduceByKey/aggregateByKey替代groupByKey

      ②使用mapPartitions替代普通map

      ③使用foreachPartitions替代foreach

      ④使用filter之后进行coalesce操作

      ⑤使用repartitionAndSortWithinPartitions替代repartition与sort类操作
    • 广播大变量
      在算子函数中使用到外部变量时,默认情况下,Spark会将该变量复制多个副本,通过网络传输到task中,此时每个task都有一个变量副本。如果变量本身比较大的话(比如100M,甚至1G),那么大量的变量副本在网络中传输的性能开销,以及在各个节点的Executor中占用过多内存导致的频繁GC(垃圾回收),都会极大地影响性能。
    • 使用Kryo优化序列化性能
    • 优化数据结构
      在可能以及合适的情况下,使用占用内存较少的数据结构,但是前提是要保证代码的可维护性。

在实际开发的时候是如何保证数据不丢失的?

可以这样说:

  • flume那边采用的channel是将数据落地到磁盘中,保证数据源端安全性(可以在补充一下,flume在这里的channel可以设置为memory内存中,提高数据接收处理的效率,但是由于数据在内存中,安全机制保证不了,故选择channel为磁盘存储。整个流程运行有一点的延迟性)

  • sparkStreaming通过拉模式整合的时候,使用了FlumeUtils这样一个类,该类是需要依赖一个额外的jar包(spark-streaming-flume_2.10)

  • 要想保证数据不丢失,数据的准确性,可以在构建StreamingConext的时候,利用StreamingContext.getOrCreate(checkpoint, creatingFunc: () => StreamingContext)来创建一个StreamingContext,使用StreamingContext.getOrCreate来创建StreamingContext对象,传入的第一个参数是checkpoint的存放目录,第二参数是生成StreamingContext对象的用户自定义函数。如果checkpoint的存放目录存在,则从这个目录中生成StreamingContext对象;如果不存在,才会调用第二个函数来生成新的StreamingContext对象。在creatingFunc函数中,除了生成一个新的StreamingContext操作,还需要完成各种操作,然后调用ssc.checkpoint(checkpointDirectory)来初始化checkpoint功能,最后再返回StreamingContext对象。

这样,在StreamingContext.getOrCreate之后,就可以直接调用start()函数来启动(或者是从中断点继续运行)流式应用了。如果有其他在启动或继续运行都要做的工作,可以在start()调用前执行。

介绍一下 join 操作优化经验?

join 其实常见的就分为两类: map-side join 和 reduce-side join。

当大表和小表 join 时,用 map-side join 能显著提高效率。

将多份数据进行关联是数据处理过程中非常普遍的用法,不过在分布式计算系统中,这个问题往往会变的非常麻烦,因为框架提供的 join 操作一般会将所有数据根据 key 发送到所有的 reduce 分区中去,也就是 shuffle 的过程。造成大量的网络以及磁盘 IO 消耗,运行效率极其低下,这个过程一般被称为 reduce-side-join。

如果其中有张表较小的话,我们则可以自己实现在 map 端实现数据关联,跳过大量数据进行 shuffle 的过程,运行时间得到大量缩短,根据不同数据可能会有几倍到数十倍的性能提升。

在大数据量的情况下,join 是一中非常昂贵的操作,需要在 join 之前应尽可能的先缩小数据量。

对于缩小数据量,有以下几条建议:

  • 若两个 RDD 都有重复的 key,join 操作会使得数据量会急剧的扩大。所有,最好先使用 distinct 或者 combineByKey 操作来减少 key 空间或者用 cogroup 来处理重复的 key,而不是产生所有的交叉结果。在 combine 时,进行机智的分区,可以避免第二次 shuffle。
  1. 如果只在一个 RDD 出现,那你将在无意中丢失你的数据。所以使用外连接会更加安全,这样你就能确保左边的 RDD 或者右边的 RDD 的数据完整性,在 join 之后再过滤数据。

  2. 如果我们容易得到 RDD 的可以的有用的子集合,那么我们可以先用 filter 或者 reduce,如何在再用 join。

Spark 中的 OOM 问题?

  1. map 类型的算子执行中内存溢出如 flatMap,mapPatitions
  • 原因:map 端过程产生大量对象导致内存溢出:这种溢出的原因是在单个 map 中产生了大量的对象导致的针对这种问题。
  1. 解决方案:
  • 增加堆内内存。
  • 在不增加内存的情况下,可以减少每个 Task 处理数据量,使每个 Task 产生大量的对象时,Executor 的内存也能够装得下。具体做法可以在会产生大量对象的 map 操作之前调用 repartition 方法,分区成更小的块传入 map。
  1. shuffle 后内存溢出如 join,reduceByKey,repartition。
  • shuffle 内存溢出的情况可以说都是 shuffle 后,单个文件过大导致的。在 shuffle 的使用,需要传入一个 partitioner,大部分 Spark 中的 shuffle 操作,默认的 partitioner 都是 HashPatitioner,默认值是父 RDD 中最大的分区数.这个参数 spark.default.parallelism 只对 HashPartitioner 有效.如果是别的 partitioner 导致的 shuffle 内存溢出就需要重写 partitioner 代码了.
  1. driver 内存溢出
  • 用户在 Dirver 端口生成大对象,比如创建了一个大的集合数据结构。解决方案:将大对象转换成 Executor 端加载,比如调用 sc.textfile 或者评估大对象占用的内存,增加 dirver 端的内存

  • 从 Executor 端收集数据(collect)回 Dirver 端,建议将 driver 端对 collect 回来的数据所作的操作,转换成 executor 端 rdd 操作。

编程

如何使用Spark实现TopN的获取(描述思路或使用伪代码)?

使用Spark实现TopN的一般思路是先使用MapReduce或者Spark计算出各个数据的得分(或者其他排序依据),然后再对这些得分进行排序,最后取出前N个得分最高的数据。其中,排序的过程是需要进行全局操作的,会产生Shuffle操作,因此在性能上需要考虑。
以下是一种使用Spark进行TopN操作的伪代码:

  • 读取数据并将数据转换为RDD格式 rdd = sc.textFile(“path/to/data”).map(parse_data)
  • 计算每个数据的得分 scores_rdd = rdd.map(lambda data: (data, compute_score(data)))
  • 对得分进行排序 sorted_scores_rdd = scores_rdd.sortBy(lambda score: score[1], ascending=False)
  • 取出前N个得分最高的数据 topN_rdd = sorted_scores_rdd.take(N)

其中,parse_data函数用于将原始数据解析成程序中需要的格式,compute_score函数用于计算数据的得分。在第二步和第三步中,需要根据实际情况选择合适的算子,如map()、reduceByKey()、sortBy()等。

参考文章

背景

我们经常会调整hadoop集群的参数,然后希望不重启集群,在线更新配置,然后查看配置是否生效。

1.hadoop hdfs 刷新配置

1
bin/hdfs dfsadmin -refreshSuperUserGroupsConfiguration

2.yarn 刷新配置

1
bin/yarn rmadmin -refreshSuperUserGroupsConfiguration

3.hadoop HA高可用刷新配置

如果集群配置了HA,需要在为主备namenode(node000和node001)同时加载这两个属性(只加载一个不行):

1
2
3
bin/hadoop dfsadmin -fs hdfs://node000:8020 –refreshSuperUserGroupsConfiguration

bin/hadoop dfsadmin -fs hdfs://node001:8020 –refreshSuperUserGroupsConfiguration

4.hadoop 查看配置是否生效

1
hdfs getconf -confkey  dfs.namenode.acls.enabled

5.刷新节点 同样可以 刷新配置

1
hadoop dfsadmin -refreshNodes

6.刷新配置后可以在界面查看配置

hdfs:http://172.16.10.10:50070/conf
yarn:http://172.16.10.10:8088/conf

Spark 为什么比 mapreduce 快?

1、任务模型的优化

  • mapreduce框架中,一个程序只能拥有一个map一个reduce的过程,如果运算逻辑很复杂,一个map+一个reduce是表述不出来的,可能就需要多个map-reduce的过程;mapreduce框架想要做到这个事情,就需要把第一个map-reduce过程产生的结果,写入HDFS,然后由第二个map-reduce过程去hdfs读取后计算,完成后又将结果写入HDFS,再交由第三个map-reduce过程去计算! 重点!!!–这样一来,一个复杂的运算,在mapreduce框架中可能就会发生很多次写入并读取HDFS的操作,而读写HDFS是很慢的事情

  • spark框架,采用的是以rdd为核心,dag为调度,把上面的mapreduce-mapreduce-mapreduce的过程,连续执行,不需要反复落地到HDFS,这样就会比mapreduce快很多啦

2、spark支持在内存中缓存结果
比如一个复杂逻辑中 ,一个map-reduce产生的结果A,如果在后续的map-reduce过程中需要反复用到,spark可以把A缓存到内存中,这样后续的map-reduce过程就只需要从内存中读取A即可,也会加快速度

  1. 资源模型不同
    spark是多线程模型,每个worker节点运行一个或多个executor服务,每个task作为线程运行在executor中,task间可共享资源,
    而MR是多进程模型,任务调度(频繁申请、释放资源)和启动开销大,不适合低延迟类型作业

spark 对比 mapreduce的优势有哪些

  • 计算模型优势,spark的核心技术是弹性分布式数据集(Resilient Distributed Datasets),提供了比 MapReduce 丰富的模型,可以快速在内存中对数据集 进行多次迭代,来支持复杂的数据挖掘算法和图形计算算法。。
  • Spark 和 Hadoop 的根本差异是多个作业之间的数据通信问题 : Spark 多个作业之间数据 通信是基于内存,而 Hadoop 是基于磁盘。
  • Spark Task的启动时间快。Spark采用fork线程的方式,而Hadoop采用创建新的进程 的方式。
  • Spark只有在shuffle的时候将数据写入磁盘,而Hadoop中多个MR作业之间的数据交 互都要依赖于磁盘交互
  • Spark的缓存机制比HDFS的缓存机制高效。

spark 是什么?

spark的官方是这样定义的

Apache Spark™ is a multi-language engine for executing data engineering, data science, and machine learning on single-node machines or clusters.

翻译过来就是 spark是 在单节点机器或集群上执行数据工程、数据科学和机器学习 的多语言引擎。

简单一句话就是Spark 是一种基于内存的快速、通用、可扩展的大数据分析计算引擎。

spark 有哪些组件?

  • master:管理集群和节点,不参与计算。
  • worker:计算节点,进程本身不参与计算, 和 master 汇报。
  • Driver:运行程序的 main 方法,创建 spark context 对象。
  • spark context:控制整个 application 的生命周期,包括 dagsheduler 和 task scheduler 等组件。
  • client:用户提交程序的入口。

Driver

Spark 驱动器节点,用于执行 Spark 任务中的 main 方法,负责实际代码的执行工作。 Driver 在 Spark 作业执行时主要负责:
➢ 将用户程序转化为作业(job)
➢ 在Executor之间调度任务(task) SchedulerBackend
➢ 跟踪Executor的执行情况
➢ 通过UI展示查询运行情况
实际上,我们无法准确地描述 Driver 的定义,因为在整个的编程过程中没有看到任何有关 Driver 的字眼。所以简单理解,所谓的 Driver 就是驱使整个应用运行起来的程序,也称之为 Driver 类。

Executor

Spark Executor 是集群中工作节点(Worker)中的一个 JVM 进程,负责在 Spark 作业中运行具体任务(Task),任务彼此之间相互独立。Spark 应用启动时,Executor 节点被同时启动,并且始终伴随着整个 Spark 应用的生命周期而存在。如果有 Executor 节点发生了 故障或崩溃,Spark 应用也可以继续执行,会将出错节点上的任务调度到其他 Executor 节点 上继续运行。
Executor 有两个核心功能:
➢ 负责运行组成Spark应用的任务,并将结果返回给驱动器进程
➢ 它们通过自身的块管理器(Block Manager)为用户程序中要求缓存的 RDD 提供内存式存储。RDD 是直接缓存在 Executor 进程内的,因此任务可以在运行时充分利用缓存 数据加速运算。

Master & Worker

Spark 集群的独立部署环境中,不需要依赖其他的资源调度框架,自身就实现了资源调 度的功能,所以环境中还有其他两个核心组件:Master 和 Worker,这里的 Master 是一个进 程,主要负责资源的调度和分配,并进行集群的监控等职责,类似于 Yarn 环境中的 RM, 而 Worker 呢,也是进程,一个 Worker 运行在集群中的一台服务器上,由 Master 分配资源对 数据进行并行的处理和计算,类似于 Yarn 环境中 NM。

ApplicationMaster

Hadoop 用户向 YARN 集群提交应用程序时,提交程序中应该包含 ApplicationMaster,用 于向资源调度器申请执行任务的资源容器 Container,运行用户自己的程序任务 job,监控整 个任务的执行,跟踪整个任务的状态,处理任务失败等异常情况。
说的简单点就是,ResourceManager(资源)和 Driver(计算)之间的解耦合靠的就是 ApplicationMaster。

spark的一些核心概念

什么是 RDD

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行 计算的集合。
➢ 弹性
⚫ 存储的弹性:内存与磁盘的自动切换;
⚫ 容错的弹性:数据丢失可以自动恢复;
⚫ 计算的弹性:计算出错重试机制;
⚫ 分片的弹性:可根据需要重新分片。
➢ 分布式:数据存储在大数据集群不同节点上
➢ 数据集:RDD封装了计算逻辑,并不保存数据
➢ 数据抽象:RDD是一个抽象类,需要子类具体实现
➢ 不可变:RDD封装了计算逻辑,是不可以改变的,想要改变,只能产生新的RDD,在新的 RDD 里面封装计算逻辑
➢ 可分区、并行计算

Spark 工作机制

用户在 client 端提交作业后,会由 Driver 运行 main 方法并创建 spark context 上下文。执行 add 算子,形成 dag 图输入 dagscheduler,按照 add 之间的依赖关系划分 stage 输入 task scheduler。task scheduler 会将 stage 划分为 task set 分发到各个节点的 executor 中执行。

简单说一下 hadoop 和 spark 的 shuffle 相同和差异?

1)从 high-level 的角度来看,两者并没有大的差别。 都是将 mapper (Spark 里是 ShuffleMapTask)的输出进行 partition,不同的 partition 送 到不同的 reducer(Spark 里 reducer 可能是下一个 stage 里的 ShuffleMapTask,也可能是 ResultTask)。Reducer 以内存作缓冲区,边 shuffle 边 aggregate 数据,等到数据 aggregate 好以后进行 reduce() (Spark 里可能是后续的一系列操作)。

2)从 low-level 的角度来看,两者差别不小。 Hadoop MapReduce 是 sort-based,进入 combine() 和 reduce() 的 records 必须先 sort。这样的好 处在于 combine/reduce() 可以处理大规模的数据,因为其输入数据可以通过 外排得到(mapper 对每段数据先做排序,reducer 的 shuffle 对排好序的每 段数据做归并)。目前的 Spark 默认选择的是 hash-based,通常使用 HashMap 来对 shuffle 来的数据进行 aggregate,不会对数据进行提前排序。
如果用户需要经过排序的数据,那么需要自己调用类似 sortByKey() 的操作; 如果你是 Spark 1.1 的用户,可以将 spark.shuffle.manager 设置为 sort,则 会对数据进行排序。在 Spark 1.2 中,sort 将作为默认的 Shuffle 实现。

3)从实现角度来看,两者也有不少差别。 Hadoop MapReduce 将处理流 程划分出明显的几个阶段:map(), spill, merge, shuffle, sort, reduce() 等。 每个阶段各司其职,可以按照过程式的编程思想来逐一实现每个阶段的功能。 在 Spark 中,没有这样功能明确的阶段,只有不同的 stage 和一系列的 transformation(),所以 spill, merge, aggregate 等操作需要蕴含在 transformation() 中。
如果我们将 map 端划分数据、持久化数据的过程称为 shuffle write,而将 reducer 读入数据、aggregate 数据的过程称为 shuffle read。那么在 Spark 中,问题就变为怎么在 job 的逻辑或者物理执行图中加入 shuffle write 和 shuffle read 的处理逻辑?以及两个处理逻辑应该怎么高效实现?
Shuffle write 由于不要求数据有序,shuffle write 的任务很简单:将数据 partition 好,并持久化。之所以要持久化,一方面是要减少内存存储空间压力, 另一方面也是为了 fault-tolerance。

spark 工作机制

1 构建 Application 的运行环境,Driver 创建一个 SparkContext
2 SparkContext 向资源管理器(Standalone、Mesos、Yarn)申请 Executor 资源,资源管理器启动 StandaloneExecutorbackend(Executor) 3 Executor 向 SparkContext 申请 Task 4 SparkContext 将应用程序分发给 Executor 5 SparkContext 就建成 DAG 图,DAGScheduler 将 DAG 图解析 成 Stage,每个 Stage 有多个 task,形成 taskset 发送给 task Scheduler,由 task Scheduler 将 Task 发送给 Executor 运行 6 Task 在 Executor 上运行, 运行完释放所有资源

spark 的优化怎么做?

spark 调优比较复杂,但是大体可以分为三个方面来进行
1)平台层面的调优:防止不必要的 jar 包分发,提高数据的本地性,选择高 效的存储格式如 parquet
2)应用程序层面的调优:过滤操作符的优化降低过多小任务,降低单条记录 的资源开销,处理数据倾斜,复用 RDD 进行缓存,作业并行化执行等等
3)JVM 层面的调优:设置合适的资源量,设置合理的 JVM,启用高效的序 列化方法如 kyro,增大 off head 内存等等

数据本地性是在哪个环节确定的?

具体的 task 运行在那他机器上,dag 划分 stage 的时候确定的

RDD 的弹性表现在哪几点?

1)自动的进行内存和磁盘的存储切换;
2)基于 Lineage 的高效容错;
3)task 如果失败会自动进行特定次数的重试;
4)stage 如果失败会自动进行特定次数的重试,而且只会计算失败的分片;
5)checkpoint 和 persist,数据计算之后持久化缓存;
6)数据调度弹性,DAG TASK 调度和资源无关;
7)数据分片的高度弹性。

RDD 有哪些缺陷?

1)不支持细粒度的写和更新操作(如网络爬虫),spark 写数据是粗粒度的。 所谓粗粒度,就是批量写入数据,为了提高效率。但是读数据是细粒度的也就 是说可以一条条的读。
2)不支持增量迭代计算,Flink 支持

Spark 的数据本地性有哪几种?

Spark 中的数据本地性有三种:
1)PROCESS_LOCAL 是指读取缓存在本地节点的数据
2)NODE_LOCAL 是指读取本地节点硬盘数据
3)ANY 是指读取非本地节点数据
通常读取数据 PROCESS_LOCAL>NODE_LOCAL>ANY,尽量使数据以
PROCESS_LOCAL 或 NODE_LOCAL 方式读取。其中 PROCESS_LOCAL 还和 cache 有关,如果 RDD 经常用的话将该 RDD cache 到内存中,注意,由于 cache 是 lazy 的,所以必须通过一个 action 的触发,才能真正的将该 RDD cache 到内存中。

Spark 为什么要持久化,一般什么场景下要进行 persist 操作?

为什么要进行持久化?
spark 所有复杂一点的算法都会有 persist 身影,spark 默认数据放在内存, spark 很多内容都是放在内存的,非常适合高速迭代,1000 个步骤只有第一个 输入数据,中间不产生临时数据,但分布式系统风险很高,所以容易出错,就 要容错,rdd 出错或者分片可以根据血统算出来,如果没有对父 rdd 进行 persist 或者 cache 的化,就需要重头做。 以下场景会使用 persist
1)某个步骤计算非常耗时,需要进行 persist 持久化
2)计算链条非常长,重新恢复要算很多步骤,很好使,persist
3)checkpoint 所在的 rdd 要持久化 persist。checkpoint 前,要持久化,
写个 rdd.cache 或者 rdd.persist,将结果保存起来,再写 checkpoint 操作, 这样执行起来会非常快,不需要重新计算 rdd 链条了。checkpoint 之前一定 会进行 persist。
4)shuffle 之后要 persist,shuffle 要进性网络传输,风险很大,数据丢失重 来,恢复代价很大
5)shuffle 之前进行 persist,框架默认将数据持久化到磁盘,这个是框架自 动做的。

介绍一下 join 操作优化经验?

join 其实常见的就分为两类: map-side join 和 reduce-side join。当大表 和小表 join 时,用 map-side join 能显著提高效率。将多份数据进行关联是数
据处理过程中非常普遍的用法,不过在分布式计算系统中,这个问题往往会变 的非常麻烦,因为框架提供的 join 操作一般会将所有数据根据 key 发送到所 有的 reduce 分区中去,也就是 shuffle 的过程。造成大量的网络以及磁盘 IO 消耗,运行效率极其低下,这个过程一般被称为 reduce-side-join。如果其中 有张表较小的话,我们则可以自己实现在 map 端实现数据关联,跳过大量数 据进行 shuffle 的过程,运行时间得到大量缩短,根据不同数据可能会有几倍 到数十倍的性能提升。
备注:这个题目面试中非常非常大概率见到,务必搜索相关资料掌握,这里
抛砖引玉。

描述 Yarn 执行一个任务的过程?

1)客户端 client 向 ResouceManager 提交 Application, ResouceManager 接受 Application 并根据集群资源状况选取一个 node 来启 动 Application 的任务调度器 driver(ApplicationMaster)。
2)ResouceManager 找到那个 node,命令其该 node 上的 nodeManager 来启动一个新的 JVM 进程运行程序的 driver(ApplicationMaster)部分, driver(ApplicationMaster)启动时会首先向 ResourceManager 注册,说 明由自己来负责当前程序的运行。
3)driver(ApplicationMaster)开始下载相关 jar 包等各种资源,基于下载 的 jar 等信息决定向 ResourceManager 申请具体的资源内容。
4)ResouceManager 接受到 driver(ApplicationMaster)提出的申请后, 会最大化的满足 资源分配请求,并发送资源的元数据信息给 driver (ApplicationMaster)。
5)driver(ApplicationMaster)收到发过来的资源元数据信息后会根据元 数据信息发指令给具体机器上的 NodeManager,让其启动具体的 container。
6)NodeManager 收到 driver 发来的指令,启动 container,container 启 动后必须向 driver(ApplicationMaster)注册。
7)driver(ApplicationMaster)收到 container 的注册,开始进行任务的 调度和计算,直到 任务完成。
注意:如果 ResourceManager 第一次没有能够满足 driver (ApplicationMaster)的资源请求 ,后续发现有空闲的资源,会主动向 driver(ApplicationMaster)发送可用资源的元数据信息以提供更多的资源用 于当前程序的运行。

Spark on Yarn 模式有哪些优点?

1)与其他计算框架共享集群资源(Spark 框架与 MapReduce 框架同时运行, 如果不用 Yarn 进行资源分配,MapReduce 分到的内存资源会很少,效率低 下);资源按需分配,进而提高集群资源利用等。
2)相较于 Spark 自带的 Standalone 模式,Yarn 的资源分配更加细致。
3)Application 部署简化,例如 Spark,Storm 等多种框架的应用由客户端 提交后,由 Yarn 负责资源的管理和调度,利用 Container 作为资源隔离的单 位,以它为单位去使用内存,cpu 等。
4)Yarn 通过队列的方式,管理同时运行在 Yarn 集群中的多个服务,可根据 不同类型的应用程序负载情况,调整对应的资源使用量,实现资源弹性管理。

谈谈你对 container 的理解?

1)Container 作为资源分配和调度的基本单位,其中封装了的资源如内存, CPU,磁盘,网络带宽等。 目前 yarn 仅仅封装内存和 CPU
2)Container 由 ApplicationMaster 向 ResourceManager 申请的,由 ResouceManager 中的资源调度器异步分配给 ApplicationMaster
3)Container 的运行是由 ApplicationMaster 向资源所在的 NodeManager 发起的,Container 运行时需提供内部执行的任务命令

Spark 使用 parquet 文件存储格式能带来哪些好处?

1)如果说 HDFS 是大数据时代分布式文件系统首选标准,那么 parquet 则 是整个大数据时代文件存储格式实时首选标准。
2)速度更快:从使用 spark sql 操作普通文件 CSV 和 parquet 文件速度对 比上看,绝大多数情况会比使用 csv 等普通文件速度提升 10 倍左右,在一些 普通文件系统无法在 spark 上成功运行的情况下,使用 parquet 很多时候可以 成功运行。
3)parquet 的压缩技术非常稳定出色,在 spark sql 中对压缩技术的处理可 能无法正常的完成工作(例如会导致 lost task,lost executor)但是此时如果 使用 parquet 就可以正常的完成。
4)极大的减少磁盘 I/o,通常情况下能够减少 75%的存储空间,由此可以极大 的减少 spark sql 处理数据的时候的数据输入内容,尤其是在 spark1.6x 中有 个下推过滤器在一些情况下可以极大的减少磁盘的 IO 和内存的占用,(下推 过滤器)。
5)spark 1.6x parquet 方式极大的提升了扫描的吞吐量,极大提高了数据的 查找速度 spark1.6 和 spark1.5x 相比而言,提升了大约 1 倍的速度,在 spark1.6X 中,操作 parquet 时候 cpu 也进行了极大的优化,有效的降低了 cpu 消耗。
6)采用 parquet 可以极大的优化 spark 的调度和执行。我们测试 spark 如 果用 parquet 可以有效的减少 stage 的执行消耗,同时可以优化执行路径

介绍 parition 和 block 有什么关联关系?

1)hdfs 中的 block 是分布式存储的最小单元,等分,可设置冗余,这样设计 有一部分磁盘空间的浪费,但是整齐的 block 大小,便于快速找到、读取对应 的内容;
2)Spark 中的 partion 是弹性分布式数据集 RDD 的最小单元,RDD 是由分 布在各个节点上的 partion 组成的。partion 是指的 spark 在计算过程中,生 成的数据在计算空间内最小单元,同一份数据(RDD)的 partion 大小不一, 数量不定,是根据 application 里的算子和最初读入的数据分块数量决定;
3)block 位于存储空间、partion 位于计算空间,block 的大小是固定的、 partion 大小是不固定的,是从 2 个不同的角度去看数据。

Spark 应用程序的执行过程是什么?

1)构建 Spark Application 的运行环境(启动 SparkContext), SparkContext 向资源管理器(可以是 Standalone、Mesos 或 YARN)注册 并申请运行 Executor 资源;
2)资源管理器分配 Executor 资源并启动 StandaloneExecutorBackend, Executor 运行情况将随着心跳发送到资源管理器上;
3)SparkContext 构建成 DAG 图,将 DAG 图分解成 Stage,并把 Taskset 发送给 Task Scheduler。Executor 向 SparkContext 申请 Task,Task Scheduler 将 Task 发放给 Executor 运行同时 SparkContext 将应用程序代码 发放给 Executor;
4)Task 在 Executor 上运行,运行完毕释放所有资源。

不需要排序的 hash shuffle 是否一定比需要排序的 sort shuffle 速 度快?

不一定,当数据规模小,Hash shuffle 快于 Sorted Shuffle 数据规模大的时 候;当数据量大,sorted Shuffle 会比 Hash shuffle 快很多,因为数量大的有 很多小文件,不均匀,甚至出现数据倾斜,消耗内存大,1.x 之前 spark 使用 hash,适合处理中小规模,1.x 之后,增加了 Sorted shuffle,Spark 更能胜 任大规模处理了。

Sort-based shuffle 的缺陷?

1)如果 mapper 中 task 的数量过大,依旧会产生很多小文件,此时在 shuffle 传递数据的过程中 reducer 段,reduce 会需要同时大量的记录进行反 序列化,导致大量的内存消耗和 GC 的巨大负担,造成系统缓慢甚至崩溃。
2)如果需要在分片内也进行排序,此时需要进行 mapper 段和 reducer 段的 两次排序。

spark.storage.memoryFraction 参数的含义,实际生产中如何调优?

1)用于设置 RDD 持久化数据在 Executor 内存中能占的比例,默认是 0.6,, 默认 Executor 60%的内存,可以用来保存持久化的 RDD 数据。根据你选择的 不同的持久化策略,如果内存不够时,可能数据就不会持久化,或者数据会写 入磁盘;
2)如果持久化操作比较多,可以提高 spark.storage.memoryFraction 参数, 使得更多的持久化数据保存在内存中,提高数据的读取性能,如果 shuffle 的 操作比较多,有很多的数据读写操作到 JVM 中,那么应该调小一点,节约出更 多的内存给 JVM,避免过多的 JVM gc 发生。在 web ui 中观察如果发现 gc 时间很长,可以设置 spark.storage.memoryFraction 更小一点。

转载

本文转自 全网最全数仓规范

规范该怎么落地?


1、 规范制定

从 0 到 1,从无到有,这个环节应该有 Leader 或架构师,充分考虑公司实际情况,参考行业标准或约定俗成的规范,综合统一制定。

也可以将规范拆分后交由各个部分核心开发人员编写, Leader 或架构师统一整合。比如我们之前的团队就是,模型设计师负责模型设计规范,ETL 工程师负责 ETL 开发规范,BI 开发人员制定前端开发规范,部署上线规范直接采用项目上已有的即可。

总体上,初稿应该尽量保证规范的完整性和各个部分间的兼容性。

2、规范讨论

初稿完成后,难免有考虑不周的情况,这时候最好有 Leader 牵头,组织部分核心成员(人数不易太多,三五个即可。人多容易造成混乱、决策困难、没有人提意见造成 Leader 一言堂等等问题。)进一步完善各个细节,纠正初稿的不足。

多人共同完善的规范,理论上来讲不会有什么大问题了。

3、规范推行

定稿后,规范已经具备了全面推广的条件,可以下发所有团队成员。

可以通过群聊天,也可以通过正式回邮件的方式,当然为了引起大家的重视,可以专门组会宣讲。

分发宣讲后进入执行阶段,所有人必须严格遵守,如有违犯给予警告,严重的给予惩罚,屡劝不改的取消年终调级调薪等。

为了确保规范的贯彻落实,除了通过以上两点引起全员重视外,还需要组织、制度、流程上的多方面保障。

数据模型应该有统一归口,比如数据架构师,架构师定期检查模型是否合理合规。

组织数据开发人员,定期 Review 每个人的代码,但不必针对个人更不要上纲上线,目的是通过对比和讨论让大家明白什么样的才是好代码,最终使“写好代码”成为基本素养。没有条件的话就有 Leader 负责定期检查,有问题的私下指出来帮助组员逐渐规范。

入职新人,熟读规范后,还应该安排专人指导,是合规性检查的重点关注对象。

讲到这里,大家有没有看出来一个问题?

规范的执行监督,上边提到的,更多是依靠制度流程以及相关人的自觉性,制度流程又依赖于人。这会带来如下几个问题:

短期坚持还好,但长期的专注很难。

有时候人忙起来了,快速产出和规范该选哪个?代码 Review 还要不要做?新建的表要不要找数据架构师审核?

数据建模最好是有专门的人或者小团队去做,其他人使用,这往往会影响整体效率,所以通常都是谁用谁建,但撒出去后再想靠人去检查合规性,真的就太难了。

有条件的最好引入相应的工具加强监管。

比如,我们有指标体系元数据、有词根库元数据、有建表的元数据、有 ETL 流程的元数据等等。

那我们是否可以开发部分报表或其它页面,通过 UI 辅助人去检查,或者通过校验元数据的方法去监管(比如备注是否为空、字段或表命名里的词根是否都在词根库里存在、表或页面等用到的指标是否都存在于指标体系、数据血缘中是否存在闭环或者孤立的节点)。

哈哈,讲了这么多,了解过数据治理的彭友们会不会感觉很熟悉?数仓建设的一开始就需要考虑这些的,最好的管理在于治未病。

5、规范完善

发行稿,从大面上应该不会有啥问题,但细节上可能会有考虑不周的情况,在宣讲阶段、执行阶段遇到问题阻碍的时候,应该根据实际情况对规范做出调整,唯有经过实践检验才能愈发完善,相信经过一段时间的持续实践,规范会成为组织文化的一部分,进而降低沟通成本、提高开发效率、保证交付质量,从而实现团队和个人的双赢。

数仓规范有哪些


这里,我们把数仓规范,一共分为四大类:设计规范、流程规范、质量管理规范、安全规范。

设计规范,又划分为四部分:数据模型设计、命名规范、指标体系设计、词根库。

流程规范,主要是从数仓管理的角度,对数仓场景下的各种流程进行约束。核心流程一共提炼出来五类:需求提交、模型设计、ETL开发、前端开发、上线流程。

质量管控规范,之所以单独列出来,是因为数据质量,跟模型设计一样,对数仓建设的成败关系极大。试想下,一个数据质量都无法保证的数据仓库,有谁会用? 数据质量规范,主要是从数据流动的角度分为三类:源端管控、数仓管理、应用管控。

安全规范,随着国家、社会、企业对数据的越来越重视,另一方面随着互联网的普及使得个人隐私变的越来越难以保证,数据泄露时有发生。数据安全对于数据仓库的重要程度急速提升,所以安全规范被单列了出来。从大的层面上安全规范分为三类:网络安全、账号安全、数据安全。

设计规范

数据模型设计

横向分层

说明

分层设计是数据架构设计的产出之一,在模型设计环节做为强制规范遵守。

分层规范
ODS

贴源层,原始数据不做变化或者仅做最简单的补全后存入。
数据域划分,依据是数据源。

DWD

对数据源做清洗、转换、补全、编码转换后加载到明细数据层。
数据域划分,依据参考下边的纵向分域。

DWS

汇总数据层+主题宽表。
数据域划分,依据参考下边的纵向分域。

ADS

应用层,面向最终应用。

主题域划分,依据是最终应用。生命周期也与应用同步。

层次调用规范

禁止反向调用
ODS 只能被 DWD 调用。
DWD 可以被 DWS 和 ADS 调用。
DWS 只能被 ADS 调用。
数据应用可以调用 DWD、DWS、ADS,但建议优先考虑使用汇总度高的数据。
ODS->DWD->DWS>ADS
ODS->DWD->ADS

纵向分域

定义

主题域通常是联系较为紧密的数据主题的集合,方便寻找和使用数据。

基本原则

高内聚、低耦合。
数量不能太多。建议不超过十个。
必须保持稳定。既能涵盖当 前所有的业务需求,又能在新业务进入时无影响地被包含进已有的数据域中或扩展新的数据域。
需要结合团队和业务的实际情况,比如业务是否稳定、团队成员建模水平等。
适度的抽象。太低不好适应变化,太高不易于理解使用。

分类

数据/业务主题域
依据业务流程划分,实现相对容易。
分析主体域
面向分析场景,实现较难,对业务理解、抽象能力等要求高。

划分依据

按照业务或业务过程划分:比如一个靠销售广告位置的门户网站主题域可能会有广告域,客户域等,而广告域可能就会有广告的库存,销售分析、内部投放分析等主题。

根据需求方划分:比如需求方为财务部,就可以设定对应的财务主题域,而财务主题域里面可能就会有员工工资分析,投资回报比分析等主题。

按照功能或应用划分:比如微信中的朋友圈数据域、群聊数据域等,而朋友圈数据域可能就会有用户动态信息主题、广告主题等。

按照部门划分:比如可能会有运营域、技术域等,运营域中可能会有工资支出分析、活动宣传效果分析等主题。

基本原则

高内聚和低耦合
核心模型与扩展模型分离
公共处理逻辑下沉及单一
成本与性能平衡
数据可回滚
一致性
命名清晰、可理解

附加字段

维表:创建时间、更新时间
事实表:ETL 日期、更新时间

其它要求

表、字段的备注信息,必须言简意赅,在描述清楚的前提下尽量简洁。
字段类型的约束:比如字符串用 String,数值用 Int,年月日都用 String 比如 yyyyMMdd 等。

命名规范

统一规范

采用蛇形命名法,即采用一个下划线分隔词根。
优先使用词根中已有关键字(数仓标准配置中的词根管理),定期 Review 新增命名的不合理性。
禁止采用非标准的缩写。
命名一律采用小写,只能以字母开头。
命名不宜过长。

专有规范


  • 分层-分域-分词根-分时间周期
    正式表,所在层级名称+数据域+表描述+时间周期或加载策略,如增量、快照、拉链/小时、日、周、月、季、年
    中间表,对应正式表+_mid+阿拉伯数字
    临时表,z+创建者姓名检查+表名
  • 视图
    参照表命名规范+_v
  • 字段
    优先从词根中取,多次出现的要增加到词根库
  • 任务
    与目标表名相同
  • 指标
    原子指标
    业务修饰词 + 词根
    衍生指标
    原子指标+时间周期(可选)
    派生指标
    一个原子指标+多个修饰词(可选)+时间周期

代码设计规范

脚本是否有备注、复杂计算逻辑是否有注释释。
任务是否支持多次重跑而输出不变,不能有 insert into 语句。
分区表是否使用分区键过滤并且有有效裁剪。
外连接的过逑条件是否使用正确,例如在左连接的 where 语句存在右表的过滤条件。
关联小表,是否使用/*+ map join * /。
不允许引用别的计算任务临时表。
原则上不允许存在一个任务更新多个目标表。
是否存在笛卡尔积。
禁止在代码里面使用 drop、create、rename 等 DDL 语句。
使用动态分区时,有没有检查分区键值为 NULL 的情况。
对于重要的任务 DQC 质量监控规则是否配置,严禁裸奔。
代码中有没有进行适当的规避数据倾斜语句。

指标体系建设

  • 指标层级划分方式
    • 按分析主题
      • 一级分类
      • 二级分类
    • 按业务过程
      • 一级分类
      • 二级分类
      • 三级分类
  • 指标定义
    • 内容
      • 所属分类
      • 指标类别
      • 名称
      • 描述
      • 口径/算法
      • 计量单位
      • 适用维度
    • 原则
      • 唯一性
      • 可扩展
      • 易理解
    • 类别
      • 原子指标(某一业务事件行为下的度量,不可再拆分的指标) 例如:订单金额
      • 衍生指标(对原子指标进行四则运算)
      • 派生指标(统计周期+统计粒度+业务限定+原子指标)例如:最近一天+新创建的+订单个数(阿里大数据之路对于派生指标的定义:派生指标=原子指标+时间周期修饰词+其它修饰词。唯一归属于某一个原子指标,继承原子指标的数据域)
    • 说明:网上对于指标分类说法不统一,大家知道咋回事儿就行了。搜了一下阿里的大数据之路,没有衍生指标的概念。说法一:衍生指标=派生指标。那么用我上边派生指标的定义即可。说法二:衍生指标是对原子指标进行四则运算得到的。那么衍生指标就是原子指标增加减少几个修饰词或者时间周期扩大缩小后得到的。所以感觉衍生指标有点鸡肋搞不好就变成原子/派生指标了。
  • 指标管理流程
    • 指标新增申请
    • 初审:明确指标口径,检查指标库是否包含
    • 二审:审核指标定义需要的各项元素是否准确完备
    • 入指标库

词根库

  • 定义
    • 把可能会多次用到的短语,集中命名,保证全局范围内的命名含义一致性。
  • 内容
    • 所属分类
    • 名称
    • 英文简称
    • 数据类型
    • 备注
  • 分类
    • 普通词根:描述事物的最小单元体,如:交易-trade。
    • 专有词根:具备约定成俗或行业专属的描述体,如:美元-USD。
  • 公共字段
    • 公共字段=词根组合+其它关键词
    • 公共字段放入词根库不太严谨,但字段命名时候可以直接取用,降低了命名不一致的风险,所以工具化不太完善的公司推荐这样使用。

数仓中的10大陷阱

在《数据仓库工具箱》一书中,提到了数据仓库设计中的十大陷阱。在着手进行数据仓库项目之前,可以先了解这10个常见陷阱,避免在工作中的弯路。

陷阱10

过于迷恋技术和数据,而忘了重点是业务需求和目标

数仓归根结底是要解决业务问题的,对于技术人员,各种高大上的数据架构和层出不穷的新技术通常会比去了解用户需求更具有吸引力。 但是,世界上不存在完美的技术架构,只要是能够满足当下及未来可见的业务需求即可,合适才是最好的。应当把时间投入到理解和梳理业务上,这样才能构建出相对合理的数据模型,从而提高模型的复用性,及时响应业务需求。

陷阱9

没有或找不到一个有影响力的、精通业务、明白事理的高级管理人员作为数仓建设的发起人

说到这点,内心深有体会。之前入职的一家公司,规模也不小,在C轮拿到了阿里几亿的战略投资。我入职后负责从0到1的搭建,心痛的是,全公司竟然找不到一个精通业务还动数据的管理人员,在我多次邮件中申请,才又招了一位高级管理人员。可是毕竟人刚上岗,对业务又不是很熟悉,各个部门都不是很配合,可想而知数仓建设的推进多么痛苦……

数仓建设是多部门合作的结果,只有这样才能够真正的实现数据赋能业务。所以没有高层的支持和重视,数仓的建设将会很难推进。缺乏远见,热情,支持以及公司的资金投资,注定会走向失败。

陷阱8

将项目处理为一个巨大的持续多年的项目,而不是追求更容易管理的、虽然仍然具有挑战性的迭代开发工作

这是一个经常出现的陷阱,试图建设一个庞大的,无所不包的系统,通常是不可取的。似乎只要建设一个“巨型无比“的系统就可以完成任何工作,解决任何问题一样,其实结果往往会适得其反。更糟的是,管理这些项目的人往往没有与业务进行足够详细的协商,从而开发有用的产品。一言以蔽之,挂历上的模特,中看不中用。

陷阱7

分配大量的精力去构建规范化数据结构,在最终呈现数据之前,用尽所有预算

这个陷阱不像其他陷阱一样重要,当然公司有资金,可以随便玩。在Kimball方法论中,对维度模型进行更改所带来的业务风险要比更改源事务数据库小得多,所以应该留出足够的资源来构建它们,但是很少有中小型企业在资源上进行投资以创建完全一致的事实和维度表,更不用说OLAP数据立方体了,所以再多的理论也解决不了实际的问题,先跑起来最重要,不管姿势是否优雅。

陷阱6

将主要精力投入到后端操作型性能和易开发性,而没有重点考虑前端查询的性能和易用性

为用户提供易于阅读的数据展示形式并具有良好的查询性能是优先考虑。

目前我们正在着手做用户路径查询的功能模块,由于前端设计sql过于复杂,性能一直在优化,直到最近上了kudu,体验感好了许多。

陷阱5

存在于应用层的可查询数据设计得过于复杂,应该用过简化解决方案开发出更适合需要的产品

通常,大多数业务用户都希望简化数据表示方式。此外,对这些数据的访问应限于尽可能少入口。提高获取数据的易用性,会大大提升数仓的价值。

其实,这点就引入了数据湖的概念,将数据异构同质化,数据湖的概念会专篇细讲。同时也能发现,无论多么高大上的概念都是基于实际开发中需要解决的问题,脱离了实际,一切都是空谈。

陷阱4

烟囱式开发,不考虑使用可共享的、一致性维度将数据模型联系在一起

开发中常提到维度的一致性,当维度在整个数据仓库中不一致时,就是典型的烟囱式开发。其实,我们使用的维度在本质上是相同的,但是由于数据来自于不同的业务源,并会被随意更新。

典型的例子是“时间”维度,在维模型不一致的情况下,最终用户通常完全不知道为什么一个报表中的数据可能与其他地方生成的报表有显着差异。一种好的做法是将数据模型与主数据管理解决方案联系在一起,该解决方案包含可以在整个数据仓库中普遍使用的参考数据。

陷阱3

只将汇总数据加载到数据仓库中

在事务数据库和数据仓库之间创建的每个ETL过程中,必须保证要有至少一份原子数据存储到数仓中,方便溯源,即将数据同步一份放在准备区(ODS层)

陷阱2

臆想业务、业务需求及分析,其涉及的数据及支持技术都是静态的

我们常提到面向企业级的数据模型,什么意思呢?就是尽量不要开发仅限于某个特定业务需求和分析的数据模型,因为业务在不断地发生变化。一个差劲的模型设计通常是开发了重复的数据模型以及命名约定不一致的。

在设计一个“完美”的事实表、维表与规范化程度之间取得平衡并不是一件容易的事情,但是开发出可伸缩的以适应业务发展的数据模型是非常重要的。这就对业务的理解能力要求很高了。

陷阱1

数据仓库是否成功直接取决于业务人员。如果用户不买账,那么所有的工作都是徒劳

这个是很致命的陷阱,如果从一开始都没有得到业务和高层的重视和认可,那么数仓项目多半是会夭折。从用户的角度出发,如果用户对数仓的数据存在怀疑,对易用性存在吐槽,或者根本点讲未将数据仓库系统当成他们制定决策的基础,那么根本就不会去使用它,结局只会game over。

数仓中的一些常见坏现象有哪些

  • 接到了一个需求,不知道该从那张表出数,表A貌似可以,表B好像也行。问了同事甲,他说他每次都是从C表出的。对着三张表探索了好久,发现谁跟谁都对不上,算了吧,我从源头再算一次吧,结果又变出来一张表D。

  • 数据库里几千张表,好像我用到的也就那么十几张,其它的都是干啥用的呢,问了一圈没有人知道,删掉吧?更没有人敢动。

  • 有个流程报错了,领导让我去看一下,点进去后,屎一样的代码完全看不懂,另外,找了好久死活找不到上游依赖。

  • 有位同事要离职,他负责的那部分内容,换了一个人接手,累死累活好多天依然捋不出个所以然,一气之下又走了一个人。

由于以上种种问题,造成数仓团队的整体开发效率、产出质量、工作幸福感、数仓维护成本等等越来越差。随着人员流动,通常受累的往往是那些任劳任怨、对公司忠诚的员工。

相信做过数据开发的人,多多少少都会有过上边提到的部分苦恼。我觉得问题的根源通常在于没有规范或者规范没有得到贯彻。

OLAP

OLAP和OLTP不同的是,表中单条记录本身并不是查询所关心的,比较典型的特点包括有聚合类算子、涉及多表Join,查询所用谓语/条件没有索引。由于这些操作都非常耗计算资源,而且数据仓库相比数据库在数据量上大很多,因此,OLAP类查询经常表现为cpu-bound而不是io-bound。

按照建模类型划分:

MOLAP

这应该算最传统的数仓了,1993年olap概念提出来时,指的就是MOLAP数仓,M即表示多维。大多数MOLAP产品均对原始数据进行预计算得到用户可能需要的所有结果,将其存储到优化过的多维数组中,也就是常听到的 数据立方体

由于所有可能结果均已计算出来并持久化存储,查询时无需进行复杂计算,且以数组形式可以进行高效的免索引数据访问,因此用户发起的查询均能够稳定地快速响应。这些结果集是高度结构化的,可以进行压缩/编码来减少存储占用空间。

但高性能并不是没有代价的。首先,MOLAP需要进行预计算,这会花去很多时间。如果每次写入增量数据后均要进行全量预计算,显然是低效率的,因此支持仅对增量数据进行迭代计算非常重要。其次,如果业务发生需求变更,需要进行预定模型之外新的查询操作,现有的MOLAP实例就无能为力了,只能重新进行建模和预计算。

在开源软件中,由eBay开发并贡献给Apache基金会的Kylin即属于这类OLAP引擎,支持在百亿规模的数据集上进行亚秒级查询。

下图是官方对Kylin的描述。

kylin

代表

  • Kylin是完全的预计算引擎,通过枚举所有维度的组合,建立各种Cube进行提前聚合,以HBase为基础的OLAP引擎。
  • Druid则是轻量级的提前聚合(roll-up),同时根据倒排索引以及bitmap提高查询效率的时间序列数据和存储引擎。

优点

  • Kylin
  1. 支持数据规模超大(HBase)
  2. 易用性强,支持标准SQL
  3. 性能很高,查询速度很快
  • Druid
  1. 支持的数据规模大(本地存储+DeepStorage–HDFS)
  2. 性能高,列存压缩,预聚合加上倒排索引以及位图索引,秒级查询
  3. 实时性高,可以通过kafka实时导入数据

缺点

  • Kylin
  1. 灵活性较弱,不支持adhoc查询;且没有二级索引,过滤时性能一般;不支持join以及对数据的更新。
  2. 处理方式复杂,需要定义Cube预计算;当维度超过20个时,存储可能会爆炸式增长;且无法查询明细数据了;维护复杂。
  3. 实时性很差,很多时候只能查询前一天或几个小时前的数据。
  • Druid
  1. 灵活性适中,虽然维度之间随意组合,但不支持adhoc查询,不能自由组合查询,且丢失了明细数据。
  2. 易用性较差,不支持join,不支持更新,sql支持很弱(有些插件类似于pinot的PQL语言),只能JSON格式查询;对于去重操作不能精准去重。
  3. 处理方式复杂,需要流处理引擎将数据join成宽表,维护相对复杂;对内存要求较高。

场景

  • Kylin:适合对实时数据需求不高,但响应时间较高的查询,且维度较多,需求较为固定的特定查询;而不适合实时性要求高的adhoc类查询。
  • Druid:数据量大,对实时性要求高且响应时间短,以及维度较少且需求固定的简单聚合类查询(sum,count,TopN),多以Storm和Flink组合进行预处理;而不适合需要join、update和支持SQL和窗口函数等复杂的adhoc查询;不适合用于SQL复杂数据分析的场景。

ROLAP

与MOLAP相反,ROLAP无需预计算,直接在构成多维数据模型的事实表和维度表上进行计算。R即表示关系型(Relational)。显然,这种方式相比MOLAP更具可扩展性,增量数据导入后,无需进行重新计算,用户有新的查询需求时只需写好正确的SQL语句既能完成获取所需的结果。

但ROLAP的不足也很明显,尤其是在数据体量巨大的场景下,用户提交SQL后,获取查询结果所需的时间无法准确预知,可能秒回,也可能需要花费数十分钟甚至数小时。本质上,ROLAP是把MOLAP预计算所需的时间分摊到了用户的每次查询上,肯定会影响用户的查询体验。

当然ROLAP的性能是否能够接受,取决于用户查询的SQL类型,数据规模以及用户对性能的预期。对于相对简单的SQL,比如TPCH中的Query响应时间较快。但如果是复杂SQL,比如TPC-DS中的数据分析和挖掘类的Query,可能需要数分钟。

相比MOLAP,ROLAP的使用门槛更低,在完成星型或雪花型模型的构建,创建对应schema的事实表和维度表并导入数据后,用户只需会写出符合需求的SQL,就可以得到想要的结果。相比创建 数据立方体,显然更加方便。

目前生产环境使用较多的开源ROLAP主要可以分为2大类,一个是宽表模型,另一个是多表组合模型(就是前述的星型或雪花型)。

宽表类型

宽表模型能够提供比多表组合模型更好的查询性能,不足的是支持的SQL操作类型比较有限,比如对Join等复杂操作支持较弱或不支持。

目前该类OLAP系统包括DruidClickHouse等,两者各有优势,Druid支持更大的数据规模,具备一定的预聚合能力,通过倒排索引和位图索引进一步优化查询性能,在广告分析场景、监控报警等时序类应用均有广泛使用;ClickHouse部署架构简单,易用,保存明细数据,依托其向量化查询、减枝等优化能力,具备强劲的查询性能。两者均具备较高的数据实时性,在互联网企业均有广泛使用。

除了上面介绍的Druid和ClickHouse外,ElasticSearch和Solar也可以归为宽表模型。但其系统设计架构有较大不同,这两个一般称为搜索引擎,通过倒排索引,应用Scatter-Gather计算模型提高查询性能。对于搜索类的查询效果较好,但当数据量较大或进行扫描聚合类查询时,查询性能会有较大影响。

代表

  • ClickHouse是个列存数据库,保存原始明细数据,通过MergeTree使得数据存储本地化来提高性能。 是个单机版超高性能的数据库

优点

  1. 性能高,列存压缩比高,通过索引实现秒级响应
  2. 实时性强,支持kafka导入
  3. 处理方式简单,无需预处理,保存明细数据

缺点

  1. 数据规模一般
  2. 灵活性差,不支持任意的adhoc查询,join的支持不好。
  3. 易用性较弱,SQL语法不标准,不支持窗口函数等;维护成本高

多表组合模型

采用星型或雪花型建模是最通用的一种ROLAP系统,常见的包括GreenPlumPrestoImpala等,他们均基于MPP架构,采用该模型和架构的系统具有支持的数据量大、扩展性较好、灵活易用和支持的SQL类型多样等优点。

相比其他类型ROLAP和MOLAP,该类系统性能不具有优势,实时性较一般。通用系统往往比专用系统更难实现和进行优化,这是因为通用系统需要考虑的场景更多,支持的查询类型更丰富。而专用系统只需要针对所服务的某个特定场景进行优化即可,相对复杂度会有所降低。

对于ROLAP系统,尤其是星型或雪花型的系统,如果能够尽可能得缩短响应时间非常重要,这将是该系统的核心竞争力。

代表

  • PrestoImpala以及Spark SQL等利用关系模型来处理OLAP查询,通过并发来提高查询性能。同时三者是有很多相似点。我日常工作中,接触最多也就是这三兄弟和一个大哥(Hive)。Hive就不多谈了,是基于MR最基础的OLAP引擎,也是对于大数据量的分析支持最好得。

优点

  1. 支持的计算数据规模大(非存储引擎)
  2. 灵活性高,随意查询数据
  3. 易用性强,支持标准SQL以及多表join和窗口函数
  4. 处理方式简单,无需预处理,全部后处理,没有冗余数据

缺点

  1. 性能较差,当查询复杂度高且数据量大时,可能分钟级别的响应。同时其不是存储引擎,因此没有本地存储,当join时shuffle开销大,性能差
    举例:SparkSql为例子,其只是计算引擎,导致需要从外部加载数据,从而数据的实时性得不到保证;多表join的时候性能也很难得到秒级的响应。
  2. 实时性较差,不支持数据的实时导入,偏离线处理。 如果需要实时数据,经常的做法是Presto或者Impala和Kudu的结合,解决了Kudu的磁盘存储问题,实时性能也不会太差。

HOLAP

MOLAP和ROLAP各有优缺点,而且是互斥的。如果能够将两者的优点进行互补,那么是个更好的选择。而HOLAP的出现就是这个目的,H表示混合型(Hybrid),这个想法很朴素直接。对于查询频繁而稳定但又耗时的那些SQL,通过预计算来提速;对于较快的查询、发生次数较少或新的查询需求,像ROLAP一样直接通过SQL操作事实表和维度表。

目前似乎没有开源的OLAP系统属于这个类型。相信未来HOLAP可能会得到进一步发展,并获得更大规模的使用。