Memory管理
MemoryAllocator(Spark Tungsten)
off-heap => org.apache.spark.unsafe.memory.UnsafeMemoryAllocator: spark.unsafe.offHeap=true, default false in-heap => org.apache.spark.unsafe.memory.HeapMemoryAllocatorMemoryManager(Spark Tungsten)
executor level => org.apache.spark.unsafe.memory.ExecutorMemoryManager task/thread level => org.apache.spark.unsafe.memory.TaskMemoryManager StaticMemoryManager(<1.5) UnifiedMemoryManager(>=1.6), 主要管理user memory和spark memoryJava Heap Spark System Reserved Memory -> 300M
spark.testing.reservedMemory
Spark inHeap/offHeap Memory:
spark.memory.fraction
User Memory -> (("Java Heap"–"Reserved Memory")*(1.0–spark.memory.fraction), default("Java Heap"–300MB)*0.25) Spark Memory -> (("Java Heap"–"Reserved Memory")*spark.memory.fraction, default("Java Heap"–300MB)*0.75) Storage Memory: cached data + “broadcast” variables + temporary space serialized data Execution Memory: storing the objects required during the execution of Spark tasks
CPU管理
spark(yarn)集群: spark master node + spark worker node * nspark(yarn)管理进程(JVM): yarn ResourceManager + yarn HistoryServer + yarn NodeManager * n spark(yarn) application进程(JVM): spark driver(yarn client) + Application Master + Container/Executor * nspark(standalone)集群: spark master node + spark worker node * nspark(standalone)管理进程(JVM): spark master + spark worker * n spark(standalone) application进程(JVM): spark driver + spark executors
每个application(多个JVM)可以包含一个或者多个job(可以认为一个job就是一次action)
每个job会分为多个stage,各个stage按顺序执行(stage的划分主要依据rdd之间的依赖关系) 每个stage由多个task并行或者顺序执行组成 对于并行的task, 一般一rdd对应多个partion, 一个partion对应需要一个task,即一个partition一个线程使用partitioner的操作 combineByKey aggregateByKey groupByKey reduceByKey cogroup join leftOuterJoin rightOuterJoin fullOuterJoin
单个job需要的executor(jvm进程)个数
spark.executor.instances 每个spark节点可以起一个或多个executor(jvm进程) 执行的并行度:Executor数目 * 每个Executor核数 每个executor拥有固定的核数(jvm线程)以及固定大小的堆(Spark User Memory) spark.executor.cores: 核数, 虚拟核(非cpu核心数) spark.executor.memory: 堆大小, 默认大小512m,代号sparkheap spark.storage.memoryFraction: block cache + broadcasts + task results, 默认0.6 spark.storage.safetyFraction:默认值0.8 sparkheap*spark.storage.memoryFraction*spark.storage.safetyFraction spark.shuffle.memoryFraction: shuffles + joins + sorts + aggregations, 频繁IO需要的buffer, 默认0.2 spark.shuffle.safetyFraction:默认值0.8 sparkheap*spark.shuffle.memoryFraction*spark.shuffle.safetyFractionspark.storage.unrollFraction:默认值0.2, 序列化和反序列化
spark.storage.safetyFraction:默认值0.9 sparkheap *spark.storage.memoryFraction*spark.storage.unrollFraction*spark.storage.safetyFraction每个task可用的内存通过这个公式计算:spark.executor.memory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction) / spark.executor.cores 。
(防止OOM, 可以使用spark.shuffle.safetyFraction)硬盘和IO管理
(只讨论HDFS作为输入)
Spark读取一个HDFS文件,并指定partition分区数 一个HDFS文件由多个block(每个block可以认为是一个本地文件)组成 Spark将同一个HDFS文件的若干个Block合并成一个InputSplit(输入分片),Spark依据这些输入分片生成具体的读取task(即一个InputSplit对应一个读task) 一个InputSplit或者读task的结果是生成目标rdd的一个partition 一个读task会被分配到某一个executor中,每个executor的虚拟核只会执行一个task 一个executor的每个线程只能执行一个task(即task并发度最多是Executor数*Executor虚拟核数) Map阶段一个rdd的partition数目不变 Reduce阶段一个rdd的partition数目不确定,依赖具体算子,某些不变,某些变少,某些可配置