Hello everyone, I am V. In real business scenarios, Spark tasks usually run into OOM (Out of Memory) problems because the task processes too much data, resources are allocated unreasonably, or the code contains performance bottlenecks. Depending on the business scenario and the root cause, the problem can be optimized and resolved from the following angles.
I. Analysis of business scenarios and possible OOM causes
- Excessive data volume:
  - Business scenario: Processing massive datasets (e.g., hundreds of millions of rows of log data or tens of terabytes of data), where the task performs large-scale aggregation, sorting, joins, and other operations.
  - OOM cause: The data does not fit entirely in memory and spills over; this is especially common during shuffle or join operations, where the data volume can balloon.
- Data skew:
  - Business scenario: The processed data is unevenly distributed (e.g., far too much data for a particular user or product), creating computational or memory bottlenecks on some nodes.
  - OOM cause: Because a few nodes have to process a disproportionate amount of data, their tasks exceed the available memory, while other nodes remain lightly loaded.
- Unreasonable resource allocation:
  - Business scenario: Resource allocation is too low, so a single task does not get enough memory, CPU, and other resources.
  - OOM cause: The Executor memory is set too small, or too much data is cached, leaving insufficient memory.
- Excessive caching or poor memory usage in the code:
  - Business scenario: Frequent use of cache() or persist(), or unnecessary operations on data structures, leads to excessive memory consumption.
  - OOM cause: Cached data is not released in time, so memory usage keeps growing.
II. Solutions to the OOM problem
1. Adjusting the memory and CPU resources of the Executor
Allocate resources appropriately so that each Executor has enough memory to process its share of the data.
- Increase Executor memory:
  In Spark, the Executor executes tasks on cluster nodes, and the default memory of each Executor may not be enough to handle large datasets. Increasing the Executor memory can mitigate OOM issues.
  --executor-memory 8G
  The --executor-memory option sets the memory of each Executor, in this example to 8 GB. If the data volume is large, a larger value can be set as appropriate.
- Adjust off-heap memory:
  Spark also uses a portion of off-heap memory. If many off-heap operations are involved, off-heap memory can be increased with the following configuration:
  --conf spark.memory.offHeap.enabled=true
  --conf spark.memory.offHeap.size=4G
- Adjust the number of CPU cores per Executor:
  Allocating more CPU cores to each Executor speeds up task processing and prevents tasks from holding memory for long periods.
  --executor-cores 4
  The --executor-cores option sets the number of cores used by each Executor; for example, setting it to 4 increases concurrent computing power.
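As a rough sketch of how these resource settings fit together, the snippet below builds a SparkSession with the executor memory, core count, and off-heap settings discussed above. The application name and values are placeholders; in practice the same settings are usually passed to spark-submit as the flags shown earlier, and they only take effect if applied before the SparkContext starts.

```scala
import org.apache.spark.sql.SparkSession

// Sketch: resource settings from this section, applied when the session is created.
// Values are examples only; tune them to your cluster and data volume.
val spark = SparkSession.builder()
  .appName("oom-tuning-example")                    // placeholder app name
  .config("spark.executor.memory", "8g")            // --executor-memory 8G
  .config("spark.executor.cores", "4")              // --executor-cores 4
  .config("spark.memory.offHeap.enabled", "true")   // enable off-heap memory
  .config("spark.memory.offHeap.size", "4g")        // off-heap memory size
  .getOrCreate()
```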
2. Adjusting memory management strategies
Spark's memory management strategy involves the following key parameters, whose optimal configuration can help reduce OOM issues.
- Adjust memory management ratios:
  Spark 1.6 and above uses a unified memory management model, and memory usage can be optimized by tuning the following parameters:
  --conf spark.memory.fraction=0.8
  --conf spark.memory.storageFraction=0.5
  - spark.memory.fraction: Controls the total fraction of heap used for storage and execution memory; it defaults to 0.6 and can be raised as appropriate.
  - spark.memory.storageFraction: Determines the share of that unified region reserved for storage memory. If more execution memory is needed, this value can be lowered appropriately.
- Reduce the storage footprint of cached data:
  - Clear the cache in a timely manner: Call unpersist() on data that is no longer needed to clean up the cache and free memory.
  rdd.unpersist()
  - Adjust the cache level: When caching, use StorageLevel.DISK_ONLY or StorageLevel.MEMORY_AND_DISK to reduce the memory footprint.
  rdd.persist(StorageLevel.MEMORY_AND_DISK)
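To tie the memory-management parameters and the caching advice together, here is a minimal sketch; the input path, variable names, and values are made up for illustration. It raises spark.memory.fraction, caches an intermediate result at MEMORY_AND_DISK, and releases it as soon as it is no longer needed.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

val spark = SparkSession.builder()
  .appName("cache-tuning-example")                 // placeholder app name
  .config("spark.memory.fraction", "0.8")          // more heap for storage + execution
  .config("spark.memory.storageFraction", "0.5")   // split between storage and execution
  .getOrCreate()

// Hypothetical input path; cache an intermediate result that is reused several times.
val logs = spark.read.textFile("/data/logs")       // placeholder path
val errors = logs.filter(_.contains("ERROR")).persist(StorageLevel.MEMORY_AND_DISK)

val errorCount     = errors.count()                // first use of the cached data
val distinctErrors = errors.distinct().count()     // second use

errors.unpersist()                                 // release the cache once it is no longer needed
```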
3. Data partitioning and operation optimization
Shuffle, join, groupBy, and similar operations in Spark tasks typically cause significant memory consumption; the following optimizations mitigate the OOM risk these operations pose.
- Adjust the number of partitions:
  - For large-scale operations such as join and shuffle, the number of partitions is crucial. Too few partitions can leave individual partitions with too much data, which in turn causes memory overflow.
  rdd.repartition(200)
  Or explicitly specify the number of partitions when performing certain operations:
  rdd.reduceByKey(_ + _, numPartitions = 200)
  - The usual rule of thumb is to set the number of partitions to a multiple of the total Executor core count (e.g., 2-4 partitions per core); see the sketch below.
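As a rough illustration of that rule of thumb (the executor count, core count, and multiplier below are example values, not recommendations), the target partition count can be derived from the cluster size:

```scala
// Back-of-the-envelope version of the "2-4 partitions per core" guideline.
// numExecutors, coresPerExecutor, and partitionsPerCore are example values.
val numExecutors      = 10
val coresPerExecutor  = 4
val partitionsPerCore = 3   // somewhere in the 2-4 range
val targetPartitions  = numExecutors * coresPerExecutor * partitionsPerCore  // 120

// `rdd` stands for the pair RDD from the examples above.
val repartitioned = rdd.repartition(targetPartitions)
```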
- Avoid excessively wide dependencies:
  Wide dependencies (e.g., groupByKey) cause memory pressure during the shuffle, especially with large amounts of data, and should be avoided as much as possible. Replacing groupByKey with reduceByKey and similar operations that pre-aggregate on the map side reduces memory consumption:
  rdd.reduceByKey(_ + _)
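To make the difference concrete, here is a small sketch (the pair data and `sc`, the SparkContext, are assumed for illustration): groupByKey ships every value across the network before summing, while reduceByKey combines values within each partition first.

```scala
// `sc` is the SparkContext (e.g., spark.sparkContext); the data is made up.
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("b", 4)))

// Wide-dependency anti-pattern: all values for a key are shuffled, then summed.
val viaGroup = pairs.groupByKey().mapValues(_.sum)

// Preferred: values are pre-aggregated within each partition before the shuffle.
val viaReduce = pairs.reduceByKey(_ + _)
```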
- Avoid data skew:
  If the data is skewed and a few nodes have to process very large amounts of data, OOM happens easily. Common solutions are:
  - Random key splitting (salting): Add a random prefix or suffix to the keys to break up hot keys and avoid overloading individual nodes; a fuller two-stage aggregation sketch follows this snippet.
  rdd.map(x => ((x._1 + new Random().nextInt(10)), x._2))
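The salting trick only helps if the salted keys are aggregated in two stages: first aggregate on the salted key, then strip the salt and aggregate again. A minimal sketch of the full pattern, assuming `rdd` is a pair RDD of (String, Int) and a salt factor of 10:

```scala
import scala.util.Random

// Stage 0: salt the keys with a random suffix so hot keys spread across partitions.
val salted = rdd.map { case (key, value) =>
  ((key + "_" + Random.nextInt(10)), value)   // salt factor 10 is an example
}

// Stage 1: partial aggregation on the salted keys (spreads the load across nodes).
val partiallyAggregated = salted.reduceByKey(_ + _)

// Stage 2: strip the salt and aggregate again to get the final per-key result.
val result = partiallyAggregated
  .map { case (saltedKey, value) => (saltedKey.substring(0, saltedKey.lastIndexOf("_")), value) }
  .reduceByKey(_ + _)
```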
  - Broadcast join: In a join operation, if one table is small, broadcast variables can be used to send the small table to every node, reducing data transfer and memory usage:
  val broadcastVar = sc.broadcast(smallTable)
  largeTable.mapPartitions { partition =>
    val small = broadcastVar.value
    partition.map(largeRow => ...)
  }
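For DataFrame-based jobs, the same idea can be expressed with the broadcast hint from org.apache.spark.sql.functions; the table names and join key below are placeholders:

```scala
import org.apache.spark.sql.functions.broadcast

// Hypothetical DataFrames: largeDF is big, smallDF comfortably fits in executor memory.
val largeDF = spark.table("events")         // placeholder table name
val smallDF = spark.table("dim_products")   // placeholder table name

// The broadcast hint ships smallDF to every executor, avoiding a shuffle of largeDF.
val joined = largeDF.join(broadcast(smallDF), Seq("product_id"))  // placeholder join key
```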
4. Tuning Spark's parallelism and Shuffle mechanism
Spark's shuffle operations (such as groupByKey and join) can require large amounts of data to be transferred between nodes. If the parallelism is set too low, a single node can easily end up processing too much data, which triggers OOM.
- Increase parallelism:
  --conf spark.sql.shuffle.partitions=200
  Or set it explicitly in the code:
  spark.conf.set("spark.sql.shuffle.partitions", "200")
  - The default value of spark.sql.shuffle.partitions (200) may be too small for large jobs; adjusting it to match the data size reduces the load on a single node.
- Adjust the shuffle merge mechanism:
  Spark 3.0 introduced Adaptive Query Execution (AQE), which can dynamically adjust the number of shuffle partitions at execution time so that no single partition ends up with too much data:
  --conf spark.sql.adaptive.enabled=true
  --conf spark.sql.adaptive.advisoryPartitionSizeInBytes=64M
  AQE avoids OOM by automatically adjusting the number of shuffle partitions based on runtime statistics of the job.
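As a sketch of how these shuffle and AQE settings could be applied when the session is created (values are illustrative, and the same keys can be passed as --conf flags to spark-submit as shown above):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("aqe-example")                                            // placeholder app name
  .config("spark.sql.shuffle.partitions", "200")                     // baseline shuffle parallelism
  .config("spark.sql.adaptive.enabled", "true")                      // turn on AQE (Spark 3.0+)
  .config("spark.sql.adaptive.coalescePartitions.enabled", "true")   // let AQE merge small shuffle partitions
  .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64m")  // target partition size after coalescing
  .getOrCreate()
```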
III. Summary
OOM problems in Spark tasks are usually caused by excessive data volume, data skew, unreasonable resource allocation, and similar issues. Depending on the business scenario, the following measures can be taken:
- Allocate memory and CPU sensibly: Increase the Executor memory and core count, and configure the memory management parameters wisely.
- Adjust partition counts and optimize operations: Reduce memory usage by tuning the number of partitions and avoiding wide dependencies.
- Handle data skew: Avoid skew with random key splitting (salting), broadcasting small tables, and similar techniques.
- Use caching wisely: Reduce unnecessary cache() and persist() calls and release cached data promptly.
Well, that's all for today. These optimization methods can be combined to effectively solve OOM problems in Spark tasks. Follow "Brother V Loves Programming", and may your code run smoothly without costing you any hair.