spark 八股文

[toc]

约定规则

Apache Spark有哪些常见的稳定版本,Spark1.6.0的数字分别代表什么意思?

常见的大的稳定版本有Spark 1.3,Spark1.6, Spark 2.0 ,Spark1.6.0的数字含义

  • 第一个数字:1
    major version : 代表大版本更新,一般都会有一些 api 的变化,以及大的优化或是一些结构的改变;
  • 第二个数字:6
    minor version : 代表小版本更新,一般会新加 api,或者是对当前的 api 就行优化,或者是其他内容的更新,比如说 WEB UI 的更新等等;
  • 第三个数字:0
    patch version , 代表修复当前小版本存在的一些 bug,基本不会有任何 api 的改变和功能更新;记得有一个大神曾经说过,如果要切换 spark 版本的话,最好选 patch version 非 0 的版本,因为一般类似于 1.2.0, … 1.6.0 这样的版本是属于大更新的,有可能会有一些隐藏的 bug 或是不稳定性存在,所以最好选择 1.2.1, … 1.6.1 这样的版本。

spark在1.6之前使用的是Akka进行通信,1.6及以后是基于netty。
现阶段的Flink是基于Akka+Netty

基本概念

整体概念

你是怎么理解Spark,它的特点是什么?

Spark是一个基于内存的,用于大规模数据处理(离线计算、实时计算、快速查询(交互式查询))的统一分析引擎。

它内部的组成模块,包含

  • SparkCore,
  • SparkSQL,
  • Spark Streaming,
  • SparkMLlib,
  • SparkGraghx等。

具体的,

  • SparkCore:Spark 的基础组件,提供了任务调度、内存管理和错误恢复等功能。它还定义了 RDD(Resilient Distributed Datasets)数据结构,用于在集群上进行分布式计算。
  • SparkSQL:用于处理结构化数据的组件,支持使用 SQL 查询数据。它提供了 DataFrame 和 Dataset 两个 API,可以方便地进行数据处理和分析。适合处理大规模的结构化数据。
  • Spark Streaming:用于实时数据处理的组件,可以将实时数据流划分为小批次进行处理。它支持各种数据源,如 Kafka、Flume 和 HDFS,并提供了窗口操作和状态管理等功能。适合实时数据分析和流式处理。
  • SparkMLlib:用于机器学习的组件,提供了常见的机器学习算法和工具。它支持分类、回归、聚类和推荐等任务,并提供了特征提取、模型评估和模型调优等功能。适合大规模的机器学习任务。
  • SparkGraghx等:用于图计算的组件,提供了图结构的抽象和常见的图算法。它支持图的构建、遍历和计算,并提供了图分析和图挖掘等功能。适合社交网络分析和图计算任务。

Spark的主要特点包括:

  • 快:Spark计算速度是MapReduce计算速度的10-100倍,Spark使用内存计算技术,以及基于弹性分布式数据集(RDD)的计算模型,可以在内存中对数据进行高效处理,从而比传统的基于磁盘的计算系统更快速。
  • 容错性:Spark可以在节点故障时重新计算丢失的数据,从而避免了数据丢失的问题,保证了任务的可靠性。
  • 多语言支持:Spark提供了多种编程语言API,包括Java、Scala、Python和R等,使得开发者可以使用自己熟悉的语言进行数据处理任务。
  • 数据处理能力:Spark可以处理各种类型的数据,包括结构化数据、半结构化数据和非结构化数据等,并且支持各种数据源的读写操作,如HDFS、Hive、MySQL等。
  • 可扩展性:Spark可以在大规模集群上运行,支持自动分区和并行化处理,从而可以处理PB级别的数据。

总的来说,Spark具有高效的性能、容错性、多语言支持、强大的数据处理能力和良好的可扩展性,适用于各种大规模数据处理任务,如机器学习、图像处理、数据挖掘、日志分析等。

有了mapreduce,为什么还要spark?

mapreduce在设计上存在缺陷,主要为

  • 缺少对迭代的支持
  • 中间数据输出到硬盘存储,产生了较高的延迟

Spark 解决了 Hadoop 的哪些问题?

  1. MR:抽象层次低,需要使用手工代码来完成程序编写,使用上难以上手;

Spark:Spark 采用 RDD 计算模型,简单容易上手。

  1. MR:只提供 map 和 reduce 两个操作,表达能力欠缺;

Spark:Spark 采用更加丰富的算子模型,包括 map、flatmap、groupbykey、reducebykey 等;

  1. MR:一个 job 只能包含 map 和 reduce 两个阶段,复杂的任务需要包含很多个 job,这些 job 之间的管理以来需要开发者自己进行管理;

Spark:Spark 中一个 job 可以包含多个转换操作,在调度时可以生成多个 stage,而且如果多个 map 操作的分区不变,是可以放在同一个 task 里面去执行;

  1. MR:中间结果存放在 hdfs 中;

Spark:Spark 的中间结果一般存在内存中,只有当内存不够了,才会存入本地磁盘,而不是 hdfs;

  1. MR:只有等到所有的 map task 执行完毕后才能执行 reduce task;

Spark:Spark 中分区相同的转换构成流水线在一个 task 中执行,分区不同的需要进行 shuffle 操作,被划分成不同的 stage 需要等待前面的 stage 执行完才能执行。

  1. MR:只适合 batch 批处理,时延高,对于交互式处理和实时处理支持不够;

Spark:Spark streaming 可以将流拆成时间间隔的 batch 进行处理,实时计算。

Hadoop 和 Spark 使用场景?

Hadoop/MapReduce 和 Spark 最适合的都是做离线型的数据分析,但 Hadoop 特别适合是单次分析的数据量“很大”的情景,而 Spark 则适用于数据量不是很大的情景。

一般情况下,对于中小互联网和企业级的大数据应用而言,单次分析的数量都不会“很大”,因此可以优先考虑使用 Spark。

业务通常认为 Spark 更适用于机器学习之类的“迭代式”应用,80GB 的压缩数据(解压后超过 200GB),10 个节点的集群规模,跑类似“sum+group-by”的应用,MapReduce 花了 5 分钟,而 spark 只需要 2 分钟。

Hadoop 和 Spark 的相同点和不同点?

  1. 计算模型抽象不同
    Hadoop 底层使用 MapReduce 计算架构,只有 map 和 reduce 两种操作,表达能力比较欠缺,而且在 MR 过程中会重复的读写 hdfs,造成大量的磁盘 io 读写操作,所以适合高时延环境下批处理计算的应用;

Spark 是基于内存的分布式计算架构,提供更加丰富的数据集操作类型,主要分成转化操作和行动操作,包括 map、reduce、filter、flatmap、groupbykey、reducebykey、union 和 join 等,数据分析更加快速,所以适合低时延环境下计算的应用;

spark 与 hadoop 最大的区别在于迭代式计算模型。基于 mapreduce 框架的 Hadoop 主要分为 map 和 reduce 两个阶段,两个阶段完了就结束了,所以在一个 job 里面能做的处理很有限;spark 计算模型是基于内存的迭代式计算模型,可以分为 n 个阶段,根据用户编写的 RDD 算子和程序,在处理完一个阶段后可以继续往下处理很多个阶段,而不只是两个阶段。所以 spark 相较于 mapreduce,计算模型更加灵活,可以提供更强大的功能。

  1. 资源抽象不同
    Spark Task的启动时间快。spark是executor中启动线程的方式Spark采用fork线程的方式,而Hadoop是container中创建新的进程 的方式。
  2. 缓存机制不同
    Spark的缓存机制比HDFS的缓存机制高效。

通常来说,Spark 与 MapReduce 相比,Spark 运行效率更高。请说明效率更高来源于 Spark 内置的哪些机制?

  1. 数据模型优势,rdd的抽象比mapreduce更优秀
    spark是迭代式模型,而mapreduce是批处理模型。

    spark的 DAG、map之间以pipeline方式运行,无需刷磁盘。RDD抽象出一个被分区、不可变、且能并行操作的数据集;从HDFS读取的需要计算的数据,在经过处理后的中间结果会作为RDD单元缓存到内存当中,并可以作为下一次计算的输入信息。最终Spark只需要读取和写入一次HDFS,这样就避免了Hadoop MapReduce的大IO操作。

    MR最大缺点是采用非循环式的数据流模型,编程不够灵活,仅支持map和reduce两种操作。当一个计算逻辑复杂的时候,需要写多个MR任务运行【并且这些MR任务生成的结果在下一个MR任务使用时需要将数据持久化到磁盘才行,这就不可避免的进行遭遇大量磁盘IO影响效率】。

  2. spark可以通过缓存共享rdd,dataframe等,mapreduce只能通过hdfs缓存,效率相差较多。
    比如一个复杂逻辑中 ,一个map-reduce产生的结果A,如果在后续的map-reduce过程中需要反复用到,spark可以把A缓存到内存中,这样后续的map-reduce过程就只需要从内存中读取A即可,也会加快速度

  3. 容错机制 Linage。
    Spark容错性高,它通过弹性分布式数据集RDD来实现高效容错,RDD是一组分布式的存储在 节点内存中的只读性的数据集,这些集合石弹性的,某一部分丢失或者出错,可以通过整个数据集的计算流程的血缘关系来实现重建,mapreduce的容错只能重新计算

  4. spark 支持checkpoint,遇错可快速恢复。

  5. 资源模型不同
    spark是多线程模型,每个worker节点运行一个或多个executor服务,每个task作为线程运行在executor中,task间可共享资源,
    而MR是多进程模型,任务调度(频繁申请、释放资源)和启动开销大,不适合低延迟类型作业

重点部分就是 DAG 和 Lingae

Spark 与 MapReduce 的 Shuffle 的区别?

1)从 high-level 的角度来看,两者并没有大的差别。都是将 mapper(Spark 里是 ShuffleMapTask)的输出进行 partition,不同的 partition 送到不同的 reducer(Spark 里 reducer 可能是下一个 stage 里的 ShuffleMapTask,也可能是 ResultTask)。Reducer 以内存作缓冲区,边 shuffle 边 aggregate 数据,等到数据 aggregate 好以后进行 reduce() (Spark 里可能是后续的一系列操作)。 

2)从 low-level 的角度来看,两者差别不小。Hadoop MapReduce 是 sort-based,进入 combine() 和 reduce() 的 records 必须先 sort。这样的好处在于 combine/reduce() 可以处理大规模的数据,因为其输入数据可以通过外排得到(mapper 对每段数据先做排序,reducer 的 shuffle 对排好序的每段数据做归并)。
目前的 Spark 默认选择的是 hash-based,通常使用 HashMap 来对 shuffle 来的数据进行 aggregate,不会对数据进行提前排序。如果用户需要经过排序的数据,那么需要自己调用类似 sortByKey() 的操作;如果你是Spark 1.1的用户,可以将spark.shuffle.manager设置为sort,则会对数据进行排序。在Spark 1.2中,sort将作为默认的Shuffle实现。 

3)从实现角度来看,两者也有不少差别。Hadoop MapReduce 将处理流程划分出明显的几个阶段:map(), spill, merge, shuffle, sort, reduce() 等。每个阶段各司其职,可以按照过程式的编程思想来逐一实现每个阶段的功能。在 Spark 中,没有这样功能明确的阶段,只有不同的 stage 和一系列的 transformation(),所以 spill, merge, aggregate 等操作需要蕴含在 transformation() 中。如果我们将 map 端划分数据、持久化数据的过程称为 shuffle write,而将 reducer 读入数据、aggregate 数据的过程称为 shuffle read。那么在 Spark 中,问题就变为怎么在 job 的逻辑或者物理执行图中加入 shuffle write 和 shuffle read的处理逻辑?以及两个处理逻辑应该怎么高效实现?Shuffle write由于不要求数据有序,shuffle write 的任务很简单:将数据 partition 好,并持久化。之所以要持久化,一方面是要减少内存存储空间压力,另一方面也是为了 fault-tolerance。

Spaek 程序执行,有时候默认为什么会产生很多 task,怎么修改默认 task 执行个数?

  1. 输入数据有很多 task,尤其是有很多小文件的时候,有多少个输入
    block 就会有多少个 task 启动;

  2. spark 中有 partition 的概念,每个 partition 都会对应一个 task,task 越多,在处理大规模数据的时候,就会越有效率。不过 task 并不是越多越好,如果平时测试,或者数据量没有那么大,则没有必要 task 数量太多。

  3. 参数可以通过 spark_home/conf/spark-default.conf 配置文件设置:

针对 spark sql 的 task 数量:spark.sql.shuffle.partitions=50

非 spark sql 程序设置生效:spark.default.parallelism=10

spark的有几种部署模式,每种模式特点?

  • 1)Local 本地模式  
    Spark不一定非要跑在hadoop集群,可以在本地,起多个线程的方式来指定。将Spark应用以多线程的方式直接运行在本地,一般都是为了方便调试,本地模式分三类  

    • local:只启动一个executor  
    • local[k]:启动k个executor  
    • local[ * ]:启动跟cpu数目相同的 executor
  • 2)standalone模式  
    分布式部署集群,构建一个基于Mster+Slaves的资源调度集群,Spark任务提交给Master运行。是Spark自身的一个调度系统,自带完整的服务,资源管理和任务监控,这个模式也是其他模式的基础。

  • 3)Spark on yarn模式  
    分布式部署集群,资源和任务监控交给yarn管理,但是目前仅支持粗粒度资源分配方式,包含cluster和client运行模式,cluster适合生产,driver运行在集群子节点,具有容错功能,client适合调试,dirver运行在客户端。

  • 4)Spark On Mesos模式。 
    官方推荐这种模式(当然,原因之一是血缘关系)。正是由于Spark开发之初就考虑到支持Mesos,因此,目前而言,Spark运行在Mesos上会比运行在YARN上更加灵活,更加自然。

    用户可选择两种调度模式之一运行自己的应用程序:

    • (1)粗粒度模式(Coarse-grained Mode):每个应用程序的运行环境由一个Dirver和若干个Executor组成,其中,每个Executor占用若干资源,内部可运行多个Task(对应多少个“slot”)。

应用程序的各个任务正式运行之前,需要将运行环境中的资源全部申请好,且运行过程中要一直占用这些资源,即使不用,最后程序运行结束后,回收这些资源。

  • (2)细粒度模式(Fine-grained Mode):鉴于粗粒度模式会造成大量资源浪费,Spark On Mesos还提供了另外一种调度模式:细粒度模式,这种模式类似于现在的云计算,思想是按需分配。

rdd

你是如何理解Spark中(RDD)的概念?它的作用是什么?

  • 概念
    RDD是弹性分布式数据集,是Spark中最基本的数据抽象,代表一个不可变、可分区、里面的元素可并行计算 的集合。
  • 作用
    提供了一个抽象的数据模型,将具体的应用逻辑表达为一系列转换操作(函数)。另外不同RDD之间的转换操作之间还可以形成依赖关系,进而实现管道化,从而避免了中间结果的存储,大大降低了数据复制、磁盘IO和序列化开销,并且还提供了更多的API(map/reduec/filter/groupBy…)
  • 特点
    • 容错性:RDD具有容错性,因为它会自动将数据划分成多个分区,并在集群中的多个节点上进行复制,从而实现数据的高可靠性和容错性。
    • 数据共享:RDD允许多个并行操作共享相同的数据集合,以便在不同的计算步骤中复用数据,从而避免了重复的IO操作,提高了计算效率。
    • 优化计算:RDD通过支持多个转换操作和行动操作,允许进行复杂的计算和数据分析,同时也支持对计算过程进行优化,以便最大限度地减少计算成本。
    • 血统跟踪:RDD通过记录其前一个RDD的依赖关系,构建了一个有向无环图(DAG)来跟踪其数据处理流程,从而允许Spark在节点故障时重新计算丢失的分区,实现了弹性计算。
    • 血缘是指RDD之间的依赖关系,这种依赖关系可以通过DAG(有向无环图)来表示。每个RDD都会记录其父RDD的引用和产生该RDD的转换操作,这样,如果某个RDD的分区丢失或出现故障,Spark可以根据血统信息重新计算该RDD的丢失分区,实现了弹性计算。因此,RDD的血统跟踪是Spark实现容错性的重要机制。

RDD的弹性表现在哪几点?

1)自动的进行内存和磁盘的存储切换; 
2)基于Lineage的高效容错;
3)task如果失败会自动进行特定次数的重试;
4)stage如果失败会自动进行特定次数的重试,而且只会计算失败的分片;
5)checkpoint和persist,数据计算之后持久化缓存;
6)数据调度弹性,DAG TASK调度和资源无关;
7)数据分片的高度弹性。

Spark 中的 RDD 机制理解吗?

rdd 分布式弹性数据集,简单的理解成一种数据结构,是 spark 框架上的通用货币。所有算子都是基于 rdd 来执行的,不同的场景会有不同的 rdd 实现类,但是都可以进行互相转换。rdd 执行过程中会形成 dag 图,然后形成 lineage 保证容错性等。从物理的角度来看 rdd 存储的是 block 和 node 之间的映射。

RDD 是 spark 提供的核心抽象,全称为弹性分布式数据集。

RDD 在逻辑上是一个 hdfs 文件,在抽象上是一种元素集合,包含了数据。它是被分区的,分为多个分区,每个分区分布在集群中的不同结点上,从而让 RDD 中的数据可以被并行操作(分布式数据集)

比如有个 RDD 有 90W 数据,3 个 partition,则每个分区上有 30W 数据。RDD 通常通过 Hadoop 上的文件,即 HDFS 或者 HIVE 表来创建,还可以通过应用程序中的集合来创建;RDD 最重要的特性就是容错性,可以自动从节点失败中恢复过来。即如果某个结点上的 RDD partition 因为节点故障,导致数据丢失,那么 RDD 可以通过自己的数据来源重新计算该 partition。这一切对使用者都是透明的。

RDD 的数据默认存放在内存中,但是当内存资源不足时,spark 会自动将 RDD 数据写入磁盘。比如某结点内存只能处理 20W 数据,那么这 20W 数据就会放入内存中计算,剩下 10W 放到磁盘中。RDD 的弹性体现在于 RDD 上自动进行内存和磁盘之间权衡和切换的机制。

RDD有哪些缺陷?

  1. 不支持细粒度的写和更新操作,Spark写数据是粗粒度的,所谓粗粒度,就是批量写入数据,目的是为了提高效率。但是Spark读数据是细粒度的,也就是说可以一条条的读。

  2. 不支持增量迭代计算,如果对Flink熟悉,可以说下Flink支持增量迭代计算。

RDD 中 reduceBykey 与 groupByKey 哪个性能好,为什么?

reduceByKey:reduceByKey 会在结果发送至 reducer 之前会对每个 mapper 在本地进行 merge,有点类似于在 MapReduce 中的 combiner。这样做的好处在于,在 map 端进行一次 reduce 之后,数据量会大幅度减小,从而减小传输,保证 reduce 端能够更快的进行结果计算。

groupByKey:groupByKey 会对每一个 RDD 中的 value 值进行聚合形成一个序列(Iterator),此操作发生在 reduce 端,所以势必会将所有的数据通过网络进行传输,造成不必要的浪费。同时如果数据量十分大,可能还会造成 OutOfMemoryError。

所以在进行大量数据的 reduce 操作时候建议使用 reduceByKey。不仅可以提高速度,还可以防止使用 groupByKey 造成的内存溢出问题。

如何区分 RDD 的宽窄依赖?

窄依赖:父 RDD 的一个分区只会被子 RDD 的一个分区依赖;

宽依赖:父 RDD 的一个分区会被子 RDD 的多个分区依赖(涉及到 shuffle)。

介绍一下 cogroup rdd 实现原理,你在什么场景下用过这个 rdd?

cogroup:对多个(2~4)RDD 中的 KV 元素,每个 RDD 中相同 key 中的元素分别聚合成一个集合。

与 reduceByKey 不同的是:reduceByKey 针对一个 RDD中相同的 key 进行合并。而 cogroup 针对多个 RDD中相同的 key 的元素进行合并。

cogroup 的函数实现:这个实现根据要进行合并的两个 RDD 操作,生成一个 CoGroupedRDD 的实例,这个 RDD 的返回结果是把相同的 key 中两个 RDD 分别进行合并操作,最后返回的 RDD 的 value 是一个 Pair 的实例,这个实例包含两个 Iterable 的值,第一个值表示的是 RDD1 中相同 KEY 的值,第二个值表示的是 RDD2 中相同 key 的值。

由于做 cogroup 的操作,需要通过 partitioner 进行重新分区的操作,因此,执行这个流程时,需要执行一次 shuffle 的操作(如果要进行合并的两个 RDD 的都已经是 shuffle 后的 rdd,同时他们对应的 partitioner 相同时,就不需要执行 shuffle)。

场景:表关联查询或者处理重复的 key。

RDD 持久化原理?

spark 非常重要的一个功能特性就是可以将 RDD 持久化在内存中。

调用 cache()和 persist()方法即可。cache()和 persist()的区别在于,cache()是 persist()的一种简化方式,cache()的底层就是调用 persist()的无参版本 persist(MEMORY_ONLY),将数据持久化到内存中。

如果需要从内存中清除缓存,可以使用 unpersist()方法。RDD 持久化是可以手动选择不同的策略的。在调用 persist()时传入对应的 StorageLevel 即可。

依赖&DAG

DAG 是什么?

DAG(Directed Acyclic Graph 有向无环图)指的是数据转换执行的过程,有方向,无闭环(其实就是 RDD 执行的流程);

原始的 RDD 通过一系列的转换操作就形成了 DAG 有向无环图,任务执行时,可以按照 DAG 的描述,执行真正的计算(数据被操作的一个过程)。

DAG 中为什么要划分 Stage?

并行计算。

一个复杂的业务逻辑如果有 shuffle,那么就意味着前面阶段产生结果后,才能执行下一个阶段,即下一个阶段的计算要依赖上一个阶段的数据。那么我们按照 shuffle 进行划分(也就是按照宽依赖就行划分),就可以将一个 DAG 划分成多个 Stage/阶段,在同一个 Stage 中,会有多个算子操作,可以形成一个 pipeline 流水线,流水线内的多个平行的分区可以并行执行。

如何划分 DAG 的 stage?

对于窄依赖,partition 的转换处理在 stage 中完成计算,不划分(将窄依赖尽量放在在同一个 stage 中,可以实现流水线计算)。

对于宽依赖,由于有 shuffle 的存在,只能在父 RDD 处理完成后,才能开始接下来的计算,也就是说需要要划分 stage。

DAG 划分为 Stage 的算法了解吗?

核心算法:回溯算法

从后往前回溯/反向解析,遇到窄依赖加入本 Stage,遇见宽依赖进行 Stage 切分。

Spark 内核会从触发 Action 操作的那个 RDD 开始从后往前推,首先会为最后一个 RDD 创建一个 Stage,然后继续倒推,如果发现对某个 RDD 是宽依赖,那么就会将宽依赖的那个 RDD 创建一个新的 Stage,那个 RDD 就是新的 Stage 的最后一个 RDD。
然后依次类推,继续倒推,根据窄依赖或者宽依赖进行 Stage 的划分,直到所有的 RDD 全部遍历完成为止。

具体划分算法请参考:AMP 实验室发表的论文
《Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing》
http://xueshu.baidu.com/usercenter/paper/show?paperid=b33564e60f0a7e7a1889a9da10963461&site=xueshu_se

简述Spark的宽窄依赖,以及Spark如何划分stage,每个stage又根据什么决定task个数?

  • 窄依赖:父RDD的一个分区只会被子RDD的一个分区依赖
  • 宽依赖:父RDD的一个分区会被子RDD的多个分区依赖(涉及到shuffle)

那Stage是如何划分的呢?

  • 根据RDD之间的依赖关系的不同将Job划分成不同的Stage,遇到一个宽依赖则划分一个Stage。

每个stage又根据什么决定task个数?

  • Stage是一个TaskSet,将Stage根据分区数划分成一个个的Task。

这里为了方便大家理解,贴上一张过程图

Spark中的宽依赖和窄依赖是指RDD之间的依赖关系类型。在Spark中,每个RDD都有一个或多个父RDD和一个或多个子RDD,RDD之间的依赖关系分为宽依赖和窄依赖两种类型:

  • 窄依赖(Narrow Dependency):指一个RDD的每个分区只依赖于父RDD的一个或多个分区,父RDD的每个分区最多只被一个子RDD的分区使用。窄依赖的特点是数据局部性高,可以在同一个节点上完成计算,从而提高计算效率。
  • 宽依赖(Wide Dependency):指一个RDD的一个或多个分区依赖于父RDD的多个分区,或者父RDD的同一个分区被多个子RDD的分区使用。宽依赖的特点是数据局部性较低,需要进行数据的洗牌操作(Shuffle),从而增加了计算成本和网络传输开销。

在Spark中,每个宽依赖和窄依赖之间的转换都会形成一个Stage,每个Stage包含一组具有相同依赖关系的Task。一个Stage中的Task个数由多个因素决定,包括可用的CPU核心数、可用内存大小、数据分区数等。

具体来说,Spark会将RDD划分成多个分区,并在每个分区上执行一个Task,以便实现并行计算。Task的个数通常等于RDD的分区数,这样可以确保所有Task都具有相同的计算量,并且可以在不同的节点上并行执行。

在Spark中,Stage划分的基本原则是:

  • 如果两个RDD之间存在宽依赖,那么它们就属于不同的Stage。这是因为宽依赖需要进行Shuffle操作,需要将数据从多个节点收集到一个节点上进行计算,这会产生较大的网络开销和计算成本。因此,将宽依赖放在不同的Stage中可以提高计算效率。
  • 而对于窄依赖,Spark会尽量将它们放在同一个Stage中,以便在同一个节点上执行计算,从而提高计算效率。

为什么要设计宽窄依赖?

  1. 对于窄依赖:

窄依赖的多个分区可以并行计算;

窄依赖的一个分区的数据如果丢失只需要重新计算对应的分区的数据就可以了。

  1. 对于宽依赖:

划分 Stage(阶段)的依据:对于宽依赖,必须等到上一阶段计算完成才能计算下一阶段。

算子

列举Spark常用的transformation和action算子,有哪些算子会导致Shuffle?

Spark中常用的transformation算子有:

  • map:对RDD中的每个元素应用一个函数,返回一个新的RDD。
  • filter:对RDD中的每个元素应用一个谓词函数,返回一个包含满足谓词的元素的新RDD。
  • flatMap:类似于map,但是每个输入元素可以映射到多个输出元素,返回一个新的RDD。
  • groupByKey:将具有相同key的元素进行分组,返回一个(key, values)的Tuple,其中values是一个迭代器。
  • reduceByKey:将具有相同key的元素进行分组,并将每个key对应的values应用一个reduce函数,返回一个(key, reduced value)的Tuple。
  • join:对两个具有相同key的RDD进行join操作,返回一个新的RDD。

常用的action算子有:

  • count:返回RDD中元素的个数。
  • collect:将RDD中的所有元素收集到Driver节点上,并返回一个数组。
  • first:返回RDD中的第一个元素。
  • take:返回RDD中前n个元素。
  • reduce:对RDD中的元素应用一个reduce函数,返回一个单个值。

上述算子中,groupByKey、reduceByKey、join等算子会导致Shuffle操作,因为它们需要将具有相同key的元素进行分组,而这些元素通常分布在不同的节点上。Shuffle操作会将数据从不同的节点收集到一个节点上进行计算,因此需要消耗大量的网络和计算资源。

join()和cogroup():这两个算子需要将具有相同键的元素进行连接操作,也需要进行Shuffle操作。
sortByKey():这个算子需要对RDD中的元素进行排序,因此需要进行Shuffle操作。
repartition()和coalesce():这两个算子都需要对RDD进行重新分区操作,需要进行Shuffle操作。

reduceByKey与groupByKey的区别,哪一种更具优势?

  • reduceByKey:按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[k,v]。
  • groupByKey:按照key进行分组,直接进行shuffle

所以,在实际开发过程中,reduceByKey比groupByKey,更建议使用。但是需要注意是否会影响业务逻辑。

Repartition和Coalesce 的关系与区别,能简单说说吗?

  • 1)关系:
    两者都是用来改变RDD的partition数量的,repartition底层调用的就是coalesce方法:coalesce(numPartitions, shuffle = true)
  • 2)区别:
    repartition一定会发生shuffle,coalesce 根据传入的参数来判断是否发生shuffle。
    一般情况下增大rdd的partition数量使用repartition,减少partition数量时使用coalesce。

cache&persists

简述下Spark中的缓存(cache和persist)与checkpoint机制,并指出两者的区别和联系

关于Spark缓存和检查点的区别,大致可以从这3个角度去回答:

  • 位置
    Persist 和 Cache将数据保存在内存,Checkpoint将数据保存在HDFS
  • 生命周期
    Persist 和 Cache 程序结束后会被清除或手动调用unpersist方法,Checkpoint永久存储不会被删除。
  • RDD依赖关系
    Persist 和 Cache,不会丢掉RDD间的依赖链/依赖关系,CheckPoint会斩断依赖链。

Checkpoint 和持久化机制的区别?

最主要的区别在于持久化只是将数据保存在 BlockManager 中,但是 RDD 的 lineage(血缘关系,依赖关系)是不变的。但是 checkpoint 执行完之后,rdd 已经没有之前所谓的依赖 rdd 了,而只有一个强行为其设置的 checkpointRDD,checkpoint 之后 rdd 的 lineage 就改变了。

持久化的数据丢失的可能性更大,因为节点的故障会导致磁盘、内存的数据丢失。但是 checkpoint 的数据通常是保存在高可用的文件系统中,比如 HDFS 中,所以数据丢失可能性比较低

Checkpoint 检查点机制?

应用场景:当 spark 应用程序特别复杂,从初始的 RDD 开始到最后整个应用程序完成有很多的步骤,而且整个应用运行时间特别长,这种情况下就比较适合使用 checkpoint 功能。

原因:对于特别复杂的 Spark 应用,会出现某个反复使用的 RDD,即使之前持久化过但由于节点的故障导致数据丢失了,没有容错机制,所以需要重新计算一次数据。

Checkpoint 首先会调用 SparkContext 的 setCheckPointDIR()方法,设置一个容错的文件系统的目录,比如说 HDFS;然后对 RDD 调用 checkpoint()方法。之后在 RDD 所处的 job 运行结束之后,会启动一个单独的 job,来将 checkpoint 过的 RDD 数据写入之前设置的文件系统,进行高可用、容错的类持久化操作。

检查点机制是我们在 spark streaming 中用来保障容错性的主要机制,它可以使 spark streaming 阶段性的把应用数据存储到诸如 HDFS 等可靠存储系统中,以供恢复时使用。具体来说基于以下两个目的服务:

  1. 控制发生失败时需要重算的状态数。Spark streaming 可以通过转化图的谱系图来重算状态,检查点机制则可以控制需要在转化图中回溯多远。

  2. 提供驱动器程序容错。如果流计算应用中的驱动器程序崩溃了,你可以重启驱动器程序并让驱动器程序从检查点恢复,这样 spark streaming 就可以读取之前运行的程序处理数据的进度,并从那里继续。

broadcast 广播&累加器

简述Spark中共享变量(广播变量和累加器)的基本原理与用途

关于Spark中的广播变量和累加器的基本原理和用途,

  • 累加器(accumulator)是Spark中提供的一种分布式的变量机制,其原理类似于mapreduce,即分布式的改变,然后聚合这些改变。累加器的一个常见用途是在调试时对作业执行过程中的事件进行计数。
  • 广播变量是在每个机器上缓存一份,不可变,只读的,相同的变量,该节点每个任务都能访问,起到节省资源和优化的作用。它通常用来高效分发较大的对象。

数据存储

Spark 中数据的位置是被谁管理的?

每个数据分片都对应具体物理位置,数据的位置是被blockManager管理,无论数据是在磁盘,内存还是 tacyan,都是由 blockManager 管理。

Spark的数据本地性有哪几种?

spark的数据本地性有3种

  1. process_local是指读取缓存的本地的数据
  2. node_local指读取本地节点硬盘数据
  3. any指读取非本地节点数据

通常读取数据process_local>node_local>any,尽量使数据以process_local或node_local方式读取,其中process_local还和cache有关,如果rdd,如果rdd经常用的话将该rdd cache到内存中,由于cache是lazy的,必须通过action算子触发

spark streaming

Spark Streaming 以及基本工作原理?

Spark streaming 是 spark core API 的一种扩展,可以用于进行大规模、高吞吐量、容错的实时数据流的处理。

它支持从多种数据源读取数据,比如 Kafka、Flume、Twitter 和 TCP Socket,并且能够使用算子比如 map、reduce、join 和 window 等来处理数据,处理后的数据可以保存到文件系统、数据库等存储中。

Spark streaming 内部的基本工作原理是:接受实时输入数据流,然后将数据拆分成 batch,比如每收集一秒的数据封装成一个 batch,然后将每个 batch 交给 spark 的计算引擎进行处理,最后会生产处一个结果数据流,其中的数据也是一个一个的 batch 组成的。其中,一个batchInterval累加读取到的数据对应一个RDD的数据

如何实现Spark Streaming读取Flume中的数据?

可以这样说:

  • 前期经过技术调研,查看官网相关资料,发现sparkStreaming整合flume有2种模式,一种是拉模式,一种是推模式,然后在简单的聊聊这2种模式的特点,以及如何部署实现,需要做哪些事情,最后对比两种模式的特点,选择那种模式更好。

  • 推模式:Flume将数据Push推给Spark Streaming

  • 拉模式:Spark Streaming从flume 中Poll拉取数据

Spark Streaming 整合 Kafka 的两种模式?

  1. receiver 方式:将数据拉取到 executor 中做操作,若数据量大,内存存储不下,可以通过 WAL,设置了本地存储,保证数据不丢失,然后使用 Kafka 高级 API 通过 zk 来维护偏移量,保证消费数据。receiver 消费的数据偏移量是在 zk 获取的,此方式效率低,容易出现数据丢失。
  • receiver 方式的容错性:在默认的配置下,这种方式可能会因为底层的失败而丢失数据。如果要启用高可靠机制,让数据零丢失,就必须启用 Spark Streaming 的预写日志机制(Write Ahead Log,WAL)。该机制会同步地将接收到的 Kafka 数据写入分布式文件系统(比如 HDFS)上的预写日志中。所以,即使底层节点出现了失败,也可以使用预写日志中的数据进行恢复。

  • Kafka 中的 topic 的 partition,与 Spark 中的 RDD 的 partition 是没有关系的。在 1、KafkaUtils.createStream()中,提高 partition 的数量,只会增加 Receiver 方式中读取 partition 的线程的数量。不会增加 Spark 处理数据的并行度。 可以创建多个 Kafka 输入 DStream,使用不同的 consumer group 和 topic,来通过多个 receiver 并行接收数据。

  1. 基于 Direct 方式:使用 Kafka 底层 Api,其消费者直接连接 kafka 的分区上,因为 createDirectStream 创建的 DirectKafkaInputDStream 每个 batch 所对应的 RDD 的分区与 kafka 分区一一对应,但是需要自己维护偏移量,即用即取,不会给内存造成太大的压力,效率高。
  • 优点:简化并行读取:如果要读取多个 partition,不需要创建多个输入 DStream 然后对它们进行 union 操作。Spark 会创建跟 Kafka partition 一样多的 RDD partition,并且会并行从 Kafka 中读取数据。所以在 Kafka partition 和 RDD partition 之间,有一个一对一的映射关系。

  • 高性能:如果要保证零数据丢失,在基于 receiver 的方式中,需要开启 WAL 机制。这种方式其实效率低下,因为数据实际上被复制了两份,Kafka 自己本身就有高可靠的机制,会对数据复制一份,而这里又会复制一份到 WAL 中。而基于 direct 的方式,不依赖 Receiver,不需要开启 WAL 机制,只要 Kafka 中作了数据的复制,那么就可以通过 Kafka 的副本进行恢复。

  1. receiver 与和 direct 的比较:
  • 基于 receiver 的方式,是使用 Kafka 的高阶 API 来在 ZooKeeper 中保存消费过的 offset 的。这是消费 Kafka 数据的传统方式。这种方式配合着 WAL 机制可以保证数据零丢失的高可靠性,但是却无法保证数据被处理一次且仅一次,可能会处理两次。因为 Spark 和 ZooKeeper 之间可能是不同步的。

  • 基于 direct 的方式,使用 Kafka 的低阶 API,Spark Streaming 自己就负责追踪消费的 offset,并保存在 checkpoint 中。Spark 自己一定是同步的,因此可以保证数据是消费一次且仅消费一次。

  • Receiver 方式是通过 zookeeper 来连接 kafka 队列,Direct 方式是直接连接到 kafka 的节点上获取数据。

DStream 以及基本工作原理?

DStream:Discretized Stream,离散流,Spark Streaming提供的一种高级抽象,代表了一个持续不断的数据流;
DStream可以通过输入数据源来创建,比如Kafka、Flume,也可以通过对其他DStream应用高阶函数来创建,比如map、reduce、join、window;

DStream的内部,其实是一系列持续不断产生的RDD,RDD是Spark Core的核心抽象,即,不可变的,分布式的数据集;
DStream中的每个RDD都包含了一个时间段内的数据;
以下图为例,0-1这段时间的数据累积构成了RDD@time1,1-2这段时间的数据累积构成了RDD@time2,。。。

DStream 是 spark streaming 提供的一种高级抽象,代表了一个持续不断的数据流。

DStream 可以通过输入数据源来创建,比如 Kafka、flume 等,也可以通过其他 DStream 的高阶函数来创建,比如 map、reduce、join 和 window 等。

DStream 内部其实不断产生 RDD,每个 RDD 包含了一个时间段的数据。

Spark streaming 一定是有一个输入的 DStream 接收数据,按照时间划分成一个一个的 batch,并转化为一个 RDD,RDD 的数据是分散在各个子节点的 partition 中。

spark sql

你用 Spark Sql 处理的时候, 处理过程中用的 DataFrame 还是直接写的 Sql?为什么?

这个问题的宗旨是问你 spark sql 中 dataframe 和 sql 的区别,从执行原理、操作方便程度和自定义程度来分析
这个问题。

Spark SQL 是如何将数据写到 Hive 表的?

  • 方式一:是利用 Spark RDD 的 API 将数据写入 hdfs 形成 hdfs 文件,之后再将 hdfs 文件和 hive 表做加载映射。

  • 方式二:利用 Spark SQL 将获取的数据 RDD 转换成 DataFrame,再将 DataFrame 写成缓存表,最后利用 Spark SQL 直接插入 hive 表中。而对于利用 Spark SQL 写 hive 表官方有两种常见的 API,第一种是利用 JavaBean 做映射,第二种是利用 StructType 创建 Schema 做映射。

Spark SQL 执行的流程?

这个问题如果深挖还挺复杂的,这里简单介绍下总体流程:

  1. parser:基于 antlr 框架对 sql 解析,生成抽象语法树。

  2. 变量替换:通过正则表达式找出符合规则的字符串,替换成系统缓存环境的变量

SQLConf 中的spark.sql.variable.substitute,默认是可用的;参考SparkSqlParser

  1. parser:将 antlr 的 tree 转成 spark catalyst 的 LogicPlan,也就是 未解析的逻辑计划;详细参考AstBuild, ParseDriver

  2. analyzer:通过分析器,结合 catalog,把 logical plan 和实际的数据绑定起来,将 未解析的逻辑计划 生成 逻辑计划;详细参考QureyExecution

  3. 缓存替换:通过 CacheManager,替换有相同结果的 logical plan(逻辑计划)

  4. logical plan 优化,基于规则的优化;优化规则参考 Optimizer,优化执行器 RuleExecutor

  5. 生成 spark plan,也就是物理计划;参考QueryPlanner和SparkStrategies

  6. spark plan 准备阶段

  7. 构造 RDD 执行,涉及 spark 的 wholeStageCodegenExec 机制,基于 janino 框架生成 java 代码并编译

架构组件

Spark 有哪些组件?

  1. master:管理集群和节点,不参与计算。

  2. worker:计算节点,进程本身不参与计算,和 master 汇报。

  3. Driver:运行程序的 main 方法,创建 spark context 对象。

  4. spark context:控制整个 application 的生命周期,包括 dagsheduler 和 task scheduler 等组件。

  5. client:用户提交程序的入口。

Spark Master 使用 Zookeeper 进行 HA,有哪些源数据保存到 Zookeeper 里面?

spark 通过这个参数 spark.deploy.zookeeper.dir 指定 master 元数据在 zookeeper 中保存的位置,包括 Worker,Driver 和 Application 以及 Executors。standby 节点要从 zk 中,获得元数据信息,恢复集群运行状态,才能对外继续提供服务,作业提交资源申请等,在恢复前是不能接受请求的。

注:Master 切换需要注意 2 点:
1、在 Master 切换的过程中,所有的已经在运行的程序皆正常运行!
因为 Spark Application 在运行前就已经通过 Cluster Manager 获得了计算资源,所以在运行时 Job 本身的
调度和处理和 Master 是没有任何关系。
2、在 Master 的切换过程中唯一的影响是不能提交新的 Job:一方面不能够提交新的应用程序给集群,
因为只有 Active Master 才能接受新的程序的提交请求;另外一方面,已经运行的程序中也不能够因
Action 操作触发新的 Job 的提交请求。

HA

Spark Master HA 主从切换过程不会影响到集群已有作业的运行,为什么?

不会的。

因为程序在运行之前,已经申请过资源了,driver 和 Executors 通讯,不需要和 master 进行通讯的。

Spark 主备切换机制原理知道吗?

Master 实际上可以配置两个,Spark 原生的 standalone 模式是支持 Master 主备切换的。当 Active Master 节点挂掉以后,我们可以将 Standby Master 切换为 Active Master。

Spark Master 主备切换可以基于两种机制,一种是基于文件系统的,一种是基于 ZooKeeper 的。

基于文件系统的主备切换机制,需要在 Active Master 挂掉之后手动切换到 Standby Master 上;

而基于 Zookeeper 的主备切换机制,可以实现自动切换 Master。

Spark 如何保证宕机迅速恢复?

  1. 适当增加 spark standby master

  2. 编写 shell 脚本,定期检测 master 状态,出现宕机后对 master 进行重启操作

流程

Spark 的运行流程?


具体运行流程如下:

  1. SparkContext 向资源管理器注册并向资源管理器申请运行 Executor

  2. 资源管理器分配 Executor,然后资源管理器启动 Executor

  3. Executor 发送心跳至资源管理器

  4. SparkContext 构建 DAG 有向无环图

  5. 将 DAG 分解成 Stage(TaskSet)

  6. 把 Stage 发送给 TaskScheduler

  7. Executor 向 SparkContext 申请 Task

  8. TaskScheduler 将 Task 发送给 Executor 运行

  9. 同时 SparkContext 将应用程序代码发放给 Executor

  10. Task 在 Executor 上运行,运行完毕释放所有资源

调优

数据倾斜

对于 Spark 中的数据倾斜问题你有什么好的方案?

  1. 前提是定位数据倾斜,是 OOM 了,还是任务执行缓慢,看日志,看 WebUI

  2. 解决方法,有多个方面:

  • 避免不必要的 shuffle,如使用广播小表的方式,将 reduce-side-join 提升为 map-side-join
  • 分拆发生数据倾斜的记录,分成几个部分进行,然后合并 join 后的结果
  • 改变并行度,可能并行度太少了,导致个别 task 数据压力大
  • 两阶段聚合,先局部聚合,再全局聚合
  • 自定义 paritioner,分散 key 的分布,使其更加均匀

数据倾斜的产生和解决办法?

数据倾斜以为着某一个或者某几个 partition 的数据特别大,导致这几个 partition 上的计算需要耗费相当长的时间。

在 spark 中同一个应用程序划分成多个 stage,这些 stage 之间是串行执行的,而一个 stage 里面的多个 task 是可以并行执行,task 数目由 partition 数目决定,如果一个 partition 的数目特别大,那么导致这个 task 执行时间很长,导致接下来的 stage 无法执行,从而导致整个 job 执行变慢。

避免数据倾斜,一般是要选用合适的 key,或者自己定义相关的 partitioner,通过加盐或者哈希值来拆分这些 key,从而将这些数据分散到不同的 partition 去执行。

如下算子会导致 shuffle 操作,是导致数据倾斜可能发生的关键点所在:groupByKey;reduceByKey;aggregaByKey;join;cogroup;

当Spark涉及到数据库的操作时,如何减少Spark运行中的数据库连接数?

嗯,有点“调优”的味道,感觉真正的“风暴”即将到来,这道题还是很好回答的,我们只需要减少连接数据库的次数即可。

  • 使用foreachPartition代替foreach,在foreachPartition内获取数据库的连接。

能介绍下你所知道和使用过的Spark调优吗?

  • 资源参数调优

    1
    2
    3
    4
    5
    6
    7
    8
    9
    num-executors:设置Spark作业总共要用多少个Executor进程来执行

    executor-memory:设置每个Executor进程的内存

    executor-cores:设置每个Executor进程的CPU core数量

    driver-memory:设置Driver进程的内存

    spark.default.parallelism:设置每个stage的默认task数量
  • 开发调优

    • 避免创建重复的RDD
    • 尽可能复用同一个RDD
    • 对多次使用的RDD进行持久化
    • 尽量避免使用shuffle类算子
    • 使用map-side预聚合的shuffle操作
    • 使用高性能的算子
      1
      2
      3
      4
      5
      6
      7
      8
      9
      ①使用reduceByKey/aggregateByKey替代groupByKey

      ②使用mapPartitions替代普通map

      ③使用foreachPartitions替代foreach

      ④使用filter之后进行coalesce操作

      ⑤使用repartitionAndSortWithinPartitions替代repartition与sort类操作
    • 广播大变量
      在算子函数中使用到外部变量时,默认情况下,Spark会将该变量复制多个副本,通过网络传输到task中,此时每个task都有一个变量副本。如果变量本身比较大的话(比如100M,甚至1G),那么大量的变量副本在网络中传输的性能开销,以及在各个节点的Executor中占用过多内存导致的频繁GC(垃圾回收),都会极大地影响性能。
    • 使用Kryo优化序列化性能
    • 优化数据结构
      在可能以及合适的情况下,使用占用内存较少的数据结构,但是前提是要保证代码的可维护性。

在实际开发的时候是如何保证数据不丢失的?

可以这样说:

  • flume那边采用的channel是将数据落地到磁盘中,保证数据源端安全性(可以在补充一下,flume在这里的channel可以设置为memory内存中,提高数据接收处理的效率,但是由于数据在内存中,安全机制保证不了,故选择channel为磁盘存储。整个流程运行有一点的延迟性)

  • sparkStreaming通过拉模式整合的时候,使用了FlumeUtils这样一个类,该类是需要依赖一个额外的jar包(spark-streaming-flume_2.10)

  • 要想保证数据不丢失,数据的准确性,可以在构建StreamingConext的时候,利用StreamingContext.getOrCreate(checkpoint, creatingFunc: () => StreamingContext)来创建一个StreamingContext,使用StreamingContext.getOrCreate来创建StreamingContext对象,传入的第一个参数是checkpoint的存放目录,第二参数是生成StreamingContext对象的用户自定义函数。如果checkpoint的存放目录存在,则从这个目录中生成StreamingContext对象;如果不存在,才会调用第二个函数来生成新的StreamingContext对象。在creatingFunc函数中,除了生成一个新的StreamingContext操作,还需要完成各种操作,然后调用ssc.checkpoint(checkpointDirectory)来初始化checkpoint功能,最后再返回StreamingContext对象。

这样,在StreamingContext.getOrCreate之后,就可以直接调用start()函数来启动(或者是从中断点继续运行)流式应用了。如果有其他在启动或继续运行都要做的工作,可以在start()调用前执行。

介绍一下 join 操作优化经验?

join 其实常见的就分为两类: map-side join 和 reduce-side join。

当大表和小表 join 时,用 map-side join 能显著提高效率。

将多份数据进行关联是数据处理过程中非常普遍的用法,不过在分布式计算系统中,这个问题往往会变的非常麻烦,因为框架提供的 join 操作一般会将所有数据根据 key 发送到所有的 reduce 分区中去,也就是 shuffle 的过程。造成大量的网络以及磁盘 IO 消耗,运行效率极其低下,这个过程一般被称为 reduce-side-join。

如果其中有张表较小的话,我们则可以自己实现在 map 端实现数据关联,跳过大量数据进行 shuffle 的过程,运行时间得到大量缩短,根据不同数据可能会有几倍到数十倍的性能提升。

在大数据量的情况下,join 是一中非常昂贵的操作,需要在 join 之前应尽可能的先缩小数据量。

对于缩小数据量,有以下几条建议:

  • 若两个 RDD 都有重复的 key,join 操作会使得数据量会急剧的扩大。所有,最好先使用 distinct 或者 combineByKey 操作来减少 key 空间或者用 cogroup 来处理重复的 key,而不是产生所有的交叉结果。在 combine 时,进行机智的分区,可以避免第二次 shuffle。
  1. 如果只在一个 RDD 出现,那你将在无意中丢失你的数据。所以使用外连接会更加安全,这样你就能确保左边的 RDD 或者右边的 RDD 的数据完整性,在 join 之后再过滤数据。

  2. 如果我们容易得到 RDD 的可以的有用的子集合,那么我们可以先用 filter 或者 reduce,如何在再用 join。

Spark 中的 OOM 问题?

  1. map 类型的算子执行中内存溢出如 flatMap,mapPatitions
  • 原因:map 端过程产生大量对象导致内存溢出:这种溢出的原因是在单个 map 中产生了大量的对象导致的针对这种问题。
  1. 解决方案:
  • 增加堆内内存。
  • 在不增加内存的情况下,可以减少每个 Task 处理数据量,使每个 Task 产生大量的对象时,Executor 的内存也能够装得下。具体做法可以在会产生大量对象的 map 操作之前调用 repartition 方法,分区成更小的块传入 map。
  1. shuffle 后内存溢出如 join,reduceByKey,repartition。
  • shuffle 内存溢出的情况可以说都是 shuffle 后,单个文件过大导致的。在 shuffle 的使用,需要传入一个 partitioner,大部分 Spark 中的 shuffle 操作,默认的 partitioner 都是 HashPatitioner,默认值是父 RDD 中最大的分区数.这个参数 spark.default.parallelism 只对 HashPartitioner 有效.如果是别的 partitioner 导致的 shuffle 内存溢出就需要重写 partitioner 代码了.
  1. driver 内存溢出
  • 用户在 Dirver 端口生成大对象,比如创建了一个大的集合数据结构。解决方案:将大对象转换成 Executor 端加载,比如调用 sc.textfile 或者评估大对象占用的内存,增加 dirver 端的内存

  • 从 Executor 端收集数据(collect)回 Dirver 端,建议将 driver 端对 collect 回来的数据所作的操作,转换成 executor 端 rdd 操作。

编程

如何使用Spark实现TopN的获取(描述思路或使用伪代码)?

使用Spark实现TopN的一般思路是先使用MapReduce或者Spark计算出各个数据的得分(或者其他排序依据),然后再对这些得分进行排序,最后取出前N个得分最高的数据。其中,排序的过程是需要进行全局操作的,会产生Shuffle操作,因此在性能上需要考虑。
以下是一种使用Spark进行TopN操作的伪代码:

  • 读取数据并将数据转换为RDD格式 rdd = sc.textFile(“path/to/data”).map(parse_data)
  • 计算每个数据的得分 scores_rdd = rdd.map(lambda data: (data, compute_score(data)))
  • 对得分进行排序 sorted_scores_rdd = scores_rdd.sortBy(lambda score: score[1], ascending=False)
  • 取出前N个得分最高的数据 topN_rdd = sorted_scores_rdd.take(N)

其中,parse_data函数用于将原始数据解析成程序中需要的格式,compute_score函数用于计算数据的得分。在第二步和第三步中,需要根据实际情况选择合适的算子,如map()、reduceByKey()、sortBy()等。

参考文章