1 Case 10: Spark Core Optimization
1.1 Data Serialization
Scenario
Spark supports the following types of serialization:
- JavaSerializer
- KryoSerializer
Data serialization affects Spark application performance. For certain data formats, KryoSerializer offers up to 10x higher performance than JavaSerializer. For simple types such as Int, the optimization effect is negligible.
KryoSerializer depends on the Chill library. Not all Java Serializable objects are supported by KryoSerializer; such classes must be registered manually.
Serialization involves task serialization and data serialization. Only JavaSerializer can be used for Spark task serialization, whereas both JavaSerializer and KryoSerializer can be used for data serialization.
Procedure
When the Spark program is running, a large amount of data needs to be serialized during the shuffle and RDD cache procedures. By default, JavaSerializer is used. You can also configure KryoSerializer as the data serializer to improve serialization performance.
Add the following code to enable KryoSerializer:
- Implement the class registrator and manually register the classes.
package com.etl.common;

import com.esotericsoftware.kryo.Kryo;
import org.apache.spark.serializer.KryoRegistrator;

public class DemoRegistrator implements KryoRegistrator
{
    @Override
    public void registerClasses(Kryo kryo)
    {
        // Register the custom classes. Class examples are given below.
        kryo.register(AggrateKey.class);
        kryo.register(AggrateValue.class);
    }
}
You can configure spark.kryo.registrationRequired on the Spark client to control whether registration with Kryo is required. If it is set to true, Kryo throws an exception when an unregistered class is serialized, so users must register each class manually. If it is set to false (the default), Kryo writes the unregistered class name along with each object, which incurs significant performance overhead. Therefore, setting the parameter to true delivers better performance than false.
- Configure KryoSerializer as the data serializer and set the class registrator.
val conf = new SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.registrator", "com.etl.common.DemoRegistrator")
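The same two settings can also be placed in the client's spark-defaults.conf file instead of being set in code; this is a sketch that reuses the example registrator class above, with spark.kryo.registrationRequired added as discussed:

```
spark.serializer                 org.apache.spark.serializer.KryoSerializer
spark.kryo.registrator           com.etl.common.DemoRegistrator
spark.kryo.registrationRequired  true
```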
1.2 Optimizing Memory Configuration
Scenario
Spark is a memory-based computing framework. If memory is insufficient during computing, Spark execution efficiency is adversely affected. You can determine whether memory becomes the performance bottleneck by monitoring garbage collection (GC) and evaluating the resilient distributed dataset (RDD) size in memory, and then take performance optimization measures.
To monitor GC of node processes, add the -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps parameters to spark.driver.extraJavaOptions and spark.executor.extraJavaOptions in the client configuration file conf/spark-defaults.conf. If "Full GC" is frequently reported, GC needs to be optimized. Cache the RDD and query the RDD size in the log. If a large value is found, change the RDD storage level.
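For example, the GC monitoring options named above can be set in conf/spark-defaults.conf as follows (if these parameters already carry values, the options should be appended to the existing strings rather than replacing them):

```
spark.driver.extraJavaOptions    -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
spark.executor.extraJavaOptions  -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
```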
Procedure
- To optimize GC, adjust the ratio of the young generation to the tenured generation by adding the -XX:NewRatio parameter to spark.driver.extraJavaOptions and spark.executor.extraJavaOptions in the client configuration file conf/spark-defaults.conf. For example, with -XX:NewRatio=2, the young generation accounts for 1/3 of the heap and the tenured generation for 2/3.
- Optimize the RDD data structure when compiling Spark programs.
− Use primitive-type arrays instead of collections, for example, by using the fastutil library.
− Avoid nested structures.
− Avoid using String for keys.
- Serialize the RDDs when developing Spark programs.
By default, data is not serialized when RDDs are cached. You can set the storage level to serialize the RDDs and minimize memory usage. For example:
testRDD.persist(StorageLevel.MEMORY_ONLY_SER)
1.3 Setting the DOP
Scenario
The degree of parallelism (DOP) specifies the number of tasks to be executed concurrently. It determines the number of data blocks after the shuffle operation. Adjust the DOP to improve the processing capability of the system.
Query the CPU and memory usage. If the tasks and data are not evenly distributed among nodes, increase the DOP. Generally, set the DOP to two to three times the total number of CPU cores in the cluster.
Procedure
Configure the DOP parameter using one of the following methods based on the actual memory, CPU, data, and application logic conditions:
- Configure the DOP parameter in the operation function that generates the shuffle. This method has the highest priority.
testRDD.groupByKey(24)
- Configure the DOP using spark.default.parallelism. This method has a lower priority than the preceding one.
val conf = new SparkConf()
conf.set("spark.default.parallelism", "24")
- Configure the value of spark.default.parallelism in the $SPARK_HOME/conf/spark-defaults.conf file. This method has the lowest priority.
spark.default.parallelism 24
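As a quick sanity check of the sizing rule above, the recommended DOP can be computed from the cluster dimensions. The executor and core counts below are hypothetical:

```java
public class DopEstimator {
    // Recommended DOP: roughly two to three times the total number of
    // CPU cores in the cluster (factor = 2 or 3).
    static int recommendedDop(int executors, int coresPerExecutor, int factor) {
        return executors * coresPerExecutor * factor;
    }

    public static void main(String[] args) {
        // Hypothetical cluster: 10 Executors with 4 cores each.
        System.out.println(recommendedDop(10, 4, 3)); // prints 120
    }
}
```

The resulting value is what would be passed to groupByKey(...) or spark.default.parallelism as shown above.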
1.4 Using Broadcast Variables
Scenario
Broadcast distributes data sets to each node, allowing data to be obtained locally when a Spark task needs it. If broadcast is not used, the data set must be serialized and sent with every task that requires it, which is time-consuming and increases the task size.
1. If a data set will be used by each slice of a task, broadcast the data set to each node.
2. When small and large tables need to be joined, broadcast the small table to each node. This eliminates the shuffle operation, turning the join into a local operation.
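To illustrate why broadcasting a small table removes the shuffle, the sketch below simulates a map-side join with plain Java collections (no Spark API; the table contents are made up):

```java
import java.util.*;

public class BroadcastJoinSketch {
    // The small table is shipped to every node as a hash map, so each
    // partition of the big table joins locally, with no shuffle.
    static List<String> joinPartition(Map<Integer, String> small, List<int[]> bigPartition) {
        List<String> joined = new ArrayList<>();
        for (int[] row : bigPartition) {
            String name = small.get(row[0]); // O(1) local lookup
            if (name != null) joined.add(name + ":" + row[1]);
        }
        return joined;
    }

    public static void main(String[] args) {
        // Hypothetical small table (id -> name) that would be broadcast.
        Map<Integer, String> small = new HashMap<>();
        small.put(1, "alice");
        small.put(2, "bob");
        // One hypothetical partition of the big table: (id, amount) rows.
        List<int[]> big = Arrays.asList(new int[]{1, 100}, new int[]{2, 200}, new int[]{3, 300});
        System.out.println(joinPartition(small, big)); // prints [alice:100, bob:200]
    }
}
```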
Procedure
Add the following code to broadcast the testArr data to each node:
def main(args: Array[String]) {
  ...
  val testArr: Array[Long] = new Array[Long](200)
  val testBroadcast: Broadcast[Array[Long]] = sc.broadcast(testArr)
  val resultRdd: RDD[Long] = inputRdd.map(input => handleData(testBroadcast, input))
  ...
}

def handleData(broadcast: Broadcast[Array[Long]], input: String) {
  val value = broadcast.value
  ...
}
1.5 Using the External Shuffle Service to Improve Performance
Scenario
When applications that include a shuffle process run on Spark, an Executor not only runs tasks but also writes shuffle data and serves shuffle data to other Executors. If an Executor is heavily loaded and garbage collection (GC) occurs on it, the Executor cannot serve shuffle data to other Executors, which affects task running.
The external shuffle service is a long-running auxiliary service in the NodeManager. It takes over serving shuffle data to reduce the pressure on Executors, so that when GC occurs on an Executor, tasks on other Executors are not affected.
Procedure
1. Log in to FusionInsight Manager.
2. Choose Services > Spark > Service Configuration. Set Type to All.
3. Choose SparkResource > Default and modify the parameters in the following table.
Table 1-1 Parameter list

| Parameter | Default Value | Modification Result |
| --- | --- | --- |
| spark.shuffle.service.enabled | false | true |
4. Restart the Spark service for the configuration to take effect.
To use the External Shuffle Service function on the Spark client, you need to download and install the Spark client again. For details, see section Installing a Client.
1.6 Dynamically Scheduling Resources on YARN
Scenario
Resources are an important factor affecting Spark execution efficiency. When a long-running service (such as JDBCServer) holds multiple Executors without tasks while other applications lack resources, resources are wasted and improperly scheduled.
Dynamic resource scheduling increases or decreases the number of Executors based on the task load, which keeps the Spark system healthier.
Procedure
1. Configure the external shuffle service.
2. In the spark-defaults.conf file, add the spark.dynamicAllocation.enabled configuration item and set it to true. The default value is false.
3. Table 1-2 lists optional configuration items.
Table 1-2 Parameters for dynamic resource scheduling

| Configuration Item | Description | Default Value |
| --- | --- | --- |
| spark.dynamicAllocation.minExecutors | Minimum number of Executors | 0 |
| spark.dynamicAllocation.initialExecutors | Initial number of Executors | spark.dynamicAllocation.minExecutors |
| spark.dynamicAllocation.maxExecutors | Maximum number of Executors | 2048 |
| spark.dynamicAllocation.schedulerBacklogTimeout | First timeout period of scheduling | 1s |
| spark.dynamicAllocation.sustainedSchedulerBacklogTimeout | Later timeout period of scheduling | spark.dynamicAllocation.schedulerBacklogTimeout |
| spark.dynamicAllocation.executorIdleTimeout | Idle timeout period of the Executor | 60s |
| spark.dynamicAllocation.cachedExecutorIdleTimeout | Idle timeout period of an Executor that contains cached blocks | Integer.MAX_VALUE |
- The External Shuffle Service must be configured before the dynamic resource scheduling function is used. Otherwise, shuffle files are lost when an Executor is killed.
- If spark.executor.instances or --num-executors specifies the number of Executors, dynamic resource allocation does not take effect even if it is enabled.
- When dynamic resource allocation is enabled, a task may occasionally be allocated to an Executor that is about to be removed (this case cannot be completely avoided), causing the task to fail. A job fails only if the same task fails four times (configurable through spark.task.maxFailures), so a job seldom fails because a task was allocated to such an Executor. The probability of job failure can be further reduced by increasing spark.task.maxFailures.
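Putting steps 1 and 2 together with the table above, a minimal spark-defaults.conf fragment for dynamic resource scheduling might look as follows; the executor bounds are illustrative values, not recommendations:

```
spark.shuffle.service.enabled         true
spark.dynamicAllocation.enabled       true
spark.dynamicAllocation.minExecutors  0
spark.dynamicAllocation.maxExecutors  50
```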
1.7 Configuring the Process Memory
Scenario
There are three processes in Spark on YARN mode: Driver, ApplicationMaster, and Executor. The Driver and Executors handle the scheduling and running of tasks, while the ApplicationMaster handles the start and stop of containers.
Therefore, the configuration of the Driver and Executors is very important for running Spark applications. You can optimize cluster performance as follows.
Procedure
Step 1 Configure the Driver Memory.
The Driver schedules tasks and communicates with the Executors and the ApplicationMaster. Increase the Driver memory when the number and parallelism of tasks increase.
You can configure the memory based on the number of tasks.
- Configure spark.driver.memory in spark-defaults.conf or SPARK_DRIVER_MEMORY in spark-env.sh appropriately.
- Add the --driver-memory MEM parameter to configure the memory when using the spark-submit command.
Step 2 Configure the number of the Executors.
One core in an Executor can run one task at a time. Therefore, more tasks can be processed concurrently if you increase the number of Executors. Increase the number of Executors to improve efficiency if resources are sufficient.
- Configure spark.executor.instances in spark-defaults.conf or SPARK_EXECUTOR_INSTANCES in spark-env.sh appropriately. You can also optimize the system by configuring the dynamic resource scheduling function. For details, see Dynamically Scheduling Resources on YARN.
- Add the --num-executors NUM parameter to configure the number of Executors when using the spark-submit command.
Step 3 Configure the number of the Executor cores.
Multiple cores in an Executor can run multiple tasks at the same time, so more tasks can be processed concurrently. However, the balance between memory and the number of cores must be considered, because the Executor memory is shared by all of its cores.
- Configure spark.executor.cores in spark-defaults.conf or SPARK_EXECUTOR_CORES in spark-env.sh appropriately.
- Add the --executor-cores NUM parameter to configure the number of cores when using the spark-submit command.
Step 4 Configure the Executor memory.
The Executor memory is used for task communication and execution. Increase the memory for a large task that needs more resources, and reduce it to raise the concurrency level for small tasks that run fast.
- Configure spark.executor.memory in spark-defaults.conf or SPARK_EXECUTOR_MEMORY in spark-env.sh appropriately.
- Add the --executor-memory MEM parameter to configure the memory when using the spark-submit command.
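Taken together, Steps 1 to 4 map to four entries in spark-defaults.conf; the values below are purely illustrative and must be sized to the actual workload:

```
spark.driver.memory       4G
spark.executor.instances  125
spark.executor.cores      2
spark.executor.memory     6G
```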
----End
Example
- During a Spark wordcount calculation with 1.6 TB of data and 250 Executors, the operation failed under the default configuration with Futures timed out and OOM errors.
Although each wordcount task is small and runs fast, the data volume is large and there are many tasks, so the objects on the Driver side become huge when many tasks run. Moreover, because each Executor communicates with the Driver whenever a task finishes, insufficient memory caused disconnections between processes.
The application runs successfully when the Driver memory is set to 4 GB.
- Many errors, such as Executor Lost, still occurred under the default configuration when running the TPC-DS test on JDBCServer. All tasks run successfully when the Driver memory is set to 30 GB, the number of Executor cores to 2, and the number of Executors to 125.
1.8 Designing the Directed Acyclic Graph (DAG)
Scenario
Optimal program structure helps increase execution efficiency. During application programming, avoid shuffle operations and combine narrow-dependency operations.
Procedure
This topic describes how to design the DAG using the following example:
- Data format: time passing through the toll station, vehicle license plate number, toll station number...
- Logic: two vehicles are determined to be traveling together if the following conditions are met:
− The two vehicles pass through the same toll stations in the same sequence.
− The time difference between the two vehicles passing through the same toll station is below the specified value.
This example can be implemented in two ways.
Figure 1-1 Implementation logic 1
Logic of implementation 1:
1. Collect information about the toll stations passed by each vehicle based on the vehicle license plate number and sort the toll stations. The following data is obtained:
vehicle license plate number 1, [(time, toll station 3), (time, toll station 2), (time, toll station 4), (time, toll station 5)]
2. Determine the sequence in which the vehicle passed through.
(toll station 3, (vehicle license plate number 1, time, 1st toll station))
(toll station 2, (vehicle license plate number 1, time, 2nd toll station))
(toll station 4, (vehicle license plate number 1, time, 3rd toll station))
(toll station 5, (vehicle license plate number 1, time, 4th toll station))
3. Aggregate data by toll station.
toll station 1, [(vehicle license plate number 1, time, 1st toll station), (vehicle license plate number 2, time, 5th toll station), (vehicle license plate number 3, time, 2nd toll station)]
4. Determine whether the time difference that two vehicles passed through the same toll station is below the specified value. If yes, fetch information about the two vehicles.
(vehicle license plate number 1, vehicle license plate number 2),(1st toll station, 5th toll station)
(vehicle license plate number 1, vehicle license plate number 3),(1st toll station, 2nd toll station)
5. Aggregate data based on the vehicle license plate numbers that passed through the same toll stations.
(vehicle license plate number 1, vehicle license plate number 2), [(1st toll station, 5th toll station), (2nd toll station, 6th toll station), (1st toll station, 7th toll station), (3rd toll station, 8th toll station)]
6. If the two vehicles pass through the same toll stations in sequence, for example, toll stations 3, 4, 5 are the first, second, and third toll station passed by vehicle 1 and the 6th, 7th, and 8th toll station passed by vehicle 2, and the number of toll stations meets the specified requirements, the two vehicles are determined to be traveling together.
The logic of implementation 1 has the following disadvantages:
- The logic is complex.
- Too many shuffle operations affect performance.
Figure 1-2 Implementation logic 2
Logic of implementation 2:
1. Collect information about the toll stations passed by each vehicle based on the vehicle license plate number and sort the toll stations. The following data is obtained:
vehicle license plate number 1, [(time, toll station 3), (time, toll station 2), (time, toll station 4), (time, toll station 5)]
2. Based on the number of toll stations that must be passed by peer vehicles (it is 3 in this example), divide the toll station sequence as follows:
toll station 3->toll station 2->toll station 4, (vehicle license plate number 1, [time passing through toll station 3, time passing through toll station 2, time passing through toll station 4])
toll station 2->toll station 4->toll station 5, (vehicle license plate number 1, [time passing through toll station 2, time passing through toll station 4, time passing through toll station 5])
3. Aggregate the vehicles that passed through toll stations in the same sequence.
toll station 3->toll station 2->toll station 4, [(vehicle license plate number 1, [time passing through toll station 3, time passing through toll station 2, time passing through toll station 4]), (vehicle license plate number 2, [time passing through toll station 3, time passing through toll station 2, time passing through toll station 4]), (vehicle license plate number 3, [time passing through toll station 3, time passing through toll station 2, time passing through toll station 4])]
4. Determine whether the time difference between these vehicles passing through the same toll station is below the specified value. If yes, the vehicles are determined to be traveling together.
The logic of implementation 2 has the following advantages:
- The logic is simplified.
- One groupByKey is eliminated, that is, one less shuffle operation is performed, which helps improve performance.
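The core of implementation 2, dividing each vehicle's sorted toll-station sequence into windows of the required length, can be sketched without the Spark API as follows (the station numbers are taken from the example above):

```java
import java.util.*;

public class PeerVehicleSketch {
    // Slide a window of n stations over a vehicle's sorted station
    // sequence; each window becomes a grouping key shared by all
    // vehicles that passed those stations in the same order.
    static List<String> windows(List<String> stations, int n) {
        List<String> keys = new ArrayList<>();
        for (int i = 0; i + n <= stations.size(); i++) {
            keys.add(String.join("->", stations.subList(i, i + n)));
        }
        return keys;
    }

    public static void main(String[] args) {
        // Stations passed by vehicle 1 in the example, already sorted by time.
        List<String> route = Arrays.asList("3", "2", "4", "5");
        System.out.println(windows(route, 3)); // prints [3->2->4, 2->4->5]
    }
}
```

Vehicles sharing a window key are then compared only on their passing times, which is what removes one groupByKey from the job.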
1.9 Experience
Use mapPartitions to calculate data by partition.
If the per-record overhead is high, for example:
rdd.map{x => val conn = getDBConn; conn.write(x.toString); conn.close}
use mapPartitions so that the overhead is paid once per partition instead of once per record:
rdd.mapPartitions { records =>
  val conn = getDBConn
  for (item <- records) conn.write(item.toString)
  conn.close
  ...
}
Use mapPartitions to operate data flexibly. For example, to calculate the TopN of a large data set when N is small, mapPartitions can be used to calculate the TopN of each partition and then sort only the combined per-partition results. Compared with sorting the full data set for the TopN, this method is far more efficient.
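The per-partition TopN idea can be sketched with plain Java collections (no Spark API; the partition contents and N are made up):

```java
import java.util.*;
import java.util.stream.*;

public class PartitionTopN {
    // Top N of one list, largest first.
    static List<Integer> topN(List<Integer> xs, int n) {
        return xs.stream().sorted(Comparator.reverseOrder()).limit(n).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Three hypothetical partitions of a larger data set.
        List<List<Integer>> partitions = Arrays.asList(
            Arrays.asList(5, 1, 9), Arrays.asList(7, 3, 8), Arrays.asList(2, 6, 4));
        int n = 2;
        // Per-partition top N (what mapPartitions would compute)...
        List<Integer> candidates = partitions.stream()
            .flatMap(p -> topN(p, n).stream()).collect(Collectors.toList());
        // ...then a final merge over the small candidate list only.
        System.out.println(topN(candidates, n)); // prints [9, 8]
    }
}
```

Only 2 records per partition are sorted in the final step, rather than all 9.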
Use coalesce to adjust the number of partitions.
The coalesce operator can be used in two ways, controlled by its shuffle flag:
coalesce(numPartitions: Int, shuffle: Boolean = false)
When shuffle is set to true, the function is equivalent to repartition(numPartitions: Int): partitions are recreated using a shuffle. When shuffle is set to false, partitions of the parent resilient distributed dataset (RDD) are calculated in the same task. In this case, if numPartitions is larger than the number of partitions of the parent RDD, the partitions are not recreated.
Choose the coalesce operator in the following scenarios:
- If the previous operation involves a large number of filters, use coalesce to minimize the number of zero-loaded tasks. In coalesce(numPartitions, false), the value of numPartitions is smaller than the number of partitions of the parent RDD.
- Use coalesce when the number of input partitions is too large to execute efficiently.
- Use coalesce when the program is suspended in the shuffle operation because there are too many tasks or the Linux resources are limited. In this case, use coalesce(numPartitions, true) to recreate the partitions.
Configure a localDir for each disk.
During the Spark shuffle procedure, data is written to local disks. The performance bottleneck of Spark is shuffle, and the bottleneck of shuffle is I/O. To improve I/O performance, configure multiple disks for concurrent data writing. If a node is mounted with multiple disks, configure one Spark local directory on each disk. This effectively distributes shuffle files across locations and improves disk I/O efficiency. Configuring multiple directories on a single disk does not improve performance.
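For example, assuming a node with three disks mounted at the hypothetical paths /data1, /data2, and /data3, one local directory per disk could be configured as:

```
spark.local.dir  /data1/sparklocal,/data2/sparklocal,/data3/sparklocal
```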
Collect small data sets.
The collect operation does not apply to a large data volume.
When the collect operation is performed, the Executor data is sent to the Driver. Before performing this operation, ensure that the Driver memory is sufficient; otherwise, the Driver process may encounter an OutOfMemory error. If the data volume is unknown, perform the saveAsTextFile operation to write the data to HDFS instead. If the data volume is known and the Driver has sufficient memory, perform the collect operation.
Use reduceByKey.
reduceByKey performs local aggregation on the map side, which smooths the shuffle procedure. Shuffle operations such as groupByKey do not aggregate on the map side. Therefore, use reduceByKey whenever possible and avoid groupByKey().map(x => (x._1, x._2.size)).
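The map-side aggregation that makes reduceByKey cheaper can be sketched without the Spark API (the word list is made up):

```java
import java.util.*;

public class MapSideCombineSketch {
    // Sketch of the local aggregation reduceByKey performs before the
    // shuffle: each partition combines its own (word, 1) pairs, so only
    // one record per key leaves the partition.
    static Map<String, Integer> localCombine(List<String> words) {
        Map<String, Integer> counts = new HashMap<>();
        for (String w : words) counts.merge(w, 1, Integer::sum);
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical partition of words: groupByKey would shuffle all
        // five records, while reduceByKey shuffles only the two combined ones.
        Map<String, Integer> combined = localCombine(Arrays.asList("a", "b", "a", "a", "b"));
        System.out.println(combined.get("a") + "," + combined.get("b")); // prints 3,2
    }
}
```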
Broadcast a map instead of an array.
If a table lookup is required for each record of the data transmitted from the Driver side, broadcast the data as a Set/Map rather than as an array/Iterator. The lookup speed of a Set/Map is approximately O(1), while that of an Iterator is O(n).
Avoid data skew.
If data skew occurs (the data volume of certain keys is extremely large), the execution time of tasks is inconsistent even without garbage collection (GC).
- Redefine the keys. Use keys of smaller granularity to optimize the task size.
- Modify the degree of parallelism (DOP).
Optimize the data structure.
- Store data by column so that only the required columns are scanned when data is read.
- When using the Hash Shuffle, set spark.shuffle.consolidateFiles to true to combine the intermediate shuffle files, minimize the number of shuffle files and file I/O operations, and improve performance. The number of final files equals the number of reduce tasks.
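A sketch of the corresponding spark-defaults.conf entry (meaningful only when the Hash Shuffle is in use, as noted above):

```
spark.shuffle.consolidateFiles  true
```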


