1.1.1 Case 11: Optimizing SQL and DataFrame
1.1.1.1 Optimizing the Spark SQL Join Function
Scenario
When two tables are joined in Spark SQL, the Broadcast feature (see Using Broadcast Variables) can be used to broadcast the small table to each node, converting the join into a non-shuffle operation and improving task execution efficiency.
The join operation described here refers only to the inner join operation.
Procedure
Perform the following steps to join tables in Spark SQL. Assume that tables A and B both have a name column and are to be joined on it.
1. Estimate the table size.
Estimate the table size based on the size of loaded data.
Alternatively, you can check the table size in the storage directory of the Hive database. The path to the Hive database is specified in the Spark configuration file hive-site.xml; by default, it is /user/hive/warehouse. In a Spark service multi-instance deployment, each instance has its own database path, for example, /user/hive1/warehouse.
<property>
  <name>hive.metastore.warehouse.dir</name>
  <value>${test.warehouse.dir}</value>
  <description></description>
</property>
Run the following hadoop command to check the table size in its HDFS directory. For example, to check the size of table A:
hadoop fs -du -s -h ${test.warehouse.dir}/a
To perform the broadcast operation, ensure that at least one table is not empty.
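To compare the result of the hadoop fs -du -s -h command against the broadcast threshold, the human-readable size must first be converted to bytes. The following Python sketch is illustrative only (the sample size string is hypothetical; the parsing helper is not part of any Spark or Hadoop API):

```python
# Convert a human-readable size string (as printed by `hadoop fs -du -s -h`)
# into bytes, so it can be compared with spark.sql.autoBroadcastJoinThreshold.
UNITS = {"": 1, "K": 1024, "M": 1024**2, "G": 1024**3, "T": 1024**4}

def size_to_bytes(size: str) -> int:
    """Parse strings such as '48.0 M' or '512' into a byte count."""
    parts = size.split()
    value = float(parts[0])
    unit = parts[1][0].upper() if len(parts) > 1 else ""
    return int(value * UNITS[unit])

# Example: a hypothetical `-du -s -h` result of '48.0 M' for table A
table_a_bytes = size_to_bytes("48.0 M")
threshold = 67108864  # 64 MB, the broadcast threshold mentioned in this section

print(table_a_bytes < threshold)  # prints True: table A is small enough to broadcast
```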
2. Configure the threshold for automatic broadcasting.
The threshold for determining whether a table is broadcast is 67108864 (that is, 64 MB) in Spark. If either table is smaller than 64 MB, skip this step.
Table 1-1 lists the threshold for automatic broadcasting.
Table 1-1 Parameters
| Parameter | Default Value | Description |
| --- | --- | --- |
| spark.sql.autoBroadcastJoinThreshold | 10485760 | Specifies the maximum size, in bytes, of a table that is broadcast to all nodes when two tables are joined. If a table is smaller than this value, it is broadcast. If this parameter is set to -1, broadcasting is disabled. For details, visit https://spark.apache.org/docs/latest/sql-programming-guide.html. |
Methods for configuring the threshold for automatic broadcasting:
− In the spark-defaults.conf file, set the value of spark.sql.autoBroadcastJoinThreshold. Set <size> as required; the value must be greater than the size of at least one of the tables.
spark.sql.autoBroadcastJoinThreshold = <size>
− Run the SET command to set the threshold. Before joining the tables, run the following command:
SET spark.sql.autoBroadcastJoinThreshold=<size>
Set <size> as required; the value must be greater than the size of at least one of the tables.
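The planner's decision described above can be expressed as a small illustrative function. This is an assumption-laden sketch mirroring the documented behavior of spark.sql.autoBroadcastJoinThreshold, not Spark's internal code:

```python
def will_broadcast(size_a: int, size_b: int, threshold: int = 10485760) -> bool:
    """Return True if at least one table falls below
    spark.sql.autoBroadcastJoinThreshold (default 10485760 bytes),
    in which case Spark broadcasts that table instead of shuffling.
    A threshold of -1 disables broadcasting entirely."""
    if threshold == -1:
        return False
    return min(size_a, size_b) < threshold

# A 5 MB table joined with a 200 MB table: the small side is broadcast
print(will_broadcast(5_000_000, 200_000_000))  # prints True
# With broadcasting disabled, the join falls back to a shuffle
print(will_broadcast(5_000_000, 200_000_000, threshold=-1))  # prints False
```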
3. (Optional) In the following scenarios, run the Analyze command (ANALYZE TABLE tableName COMPUTE STATISTICS noscan;) to update table metadata before performing the broadcast operation:
− The table to be broadcast is a newly created partitioned table whose file format is not Parquet.
− The table to be broadcast is a partitioned table whose data has just been updated.
4. Join the two tables.
Ensure that at least one of the tables is smaller than the threshold.
If both tables A and B are smaller than the threshold and table A is smaller than table B, run the following command:
SELECT A.name FROM B JOIN A ON A.name = B.name;
If table B is smaller than table A, run the following command:
SELECT A.name FROM A JOIN B ON A.name = B.name;
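The choice between the two statements above depends only on which table is smaller: the smaller table goes on the right-hand side of the JOIN. A hypothetical helper illustrating the selection (not a Spark API):

```python
def join_query(size_a: int, size_b: int) -> str:
    """Build the join statement so that the smaller table is on the
    right-hand side of the JOIN, matching the ordering recommended above.
    Illustrative helper only; sizes are in bytes."""
    if size_a < size_b:
        return "SELECT A.name FROM B JOIN A ON A.name = B.name;"
    return "SELECT A.name FROM A JOIN B ON A.name = B.name;"

# Table A (10 MB) is smaller than table B (200 MB), so A is the right side
print(join_query(10_000_000, 200_000_000))
```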
5. Release the Driver memory using the Executor broadcast.
The default BroadcastJoin collects the contents of the small tables to the Driver, so the Driver memory must be enlarged. The required memory can be estimated as spark.sql.autoBroadcastJoinThreshold x the number of broadcast tables x 2.
If there are too many broadcast tasks, the Driver may exit due to an out-of-memory (OOM) error. In this case, enable Executor broadcasting by setting spark.sql.bigdata.useExecutorBroadcast to true to relieve Driver memory pressure.
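The memory formula above is simple arithmetic; for example, a 64 MB threshold and three broadcast tables require roughly 384 MB of extra Driver memory:

```python
def driver_memory_for_broadcast(threshold_bytes: int, num_broadcast_tables: int) -> int:
    """Estimate the extra Driver memory needed by the default broadcast join,
    per the formula above: threshold x number of broadcast tables x 2."""
    return threshold_bytes * num_broadcast_tables * 2

# 64 MB threshold, 3 broadcast tables -> 402653184 bytes
print(driver_memory_for_broadcast(67108864, 3) // 1024**2)  # prints 384 (MB)
```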
Parameters
| Parameter | Description | Default Value |
| --- | --- | --- |
| spark.sql.bigdata.useExecutorBroadcast | If set to true, Executor broadcast is used: table data is stored on the Executors instead of the Driver, relieving Spark Driver memory. | true |
References
A task is terminated if a timeout occurs while a small table is being computed for broadcast.
By default, BroadcastJoin allows only 5 minutes for small-table computation. If this limit is exceeded, the query fails with a timeout, yet the broadcast task computing the small table keeps running, wasting resources.
To resolve this issue, you can use either of the following methods:
− Increase the value of spark.sql.broadcastTimeout to extend the timeout duration.
− Reduce the value of spark.sql.autoBroadcastJoinThreshold to disable the BroadcastJoin optimization.
1.1.1.2 Improving Spark SQL Calculation Performance Under Data Skew
Scenario
When multiple tables are joined in Spark SQL, a severe skew of the join key may cause imbalanced data across hash buckets. As a result, the few tasks that receive a large amount of data run slowly and degrade overall performance, while the tasks with little data leave their CPUs idle, wasting CPU resources.
The following configurations balance such tasks by handling skewed keys via broadcast, raising CPU usage and improving performance.
Data without skew is still divided into buckets using the original hash method.
Restrictions:
1. The skewed join keys in the two tables must differ. For example, in A join B on A.name = B.name, if name = "zhangsan" is skewed in A, then name = "zhangsan" must not be skewed in B; other keys, such as name = "lisi", may be skewed in B.
2. Only the join between two tables is supported.
3. Between the TableScan (TS) and the Join, only TS, Filter (FIL), and Select (SEL) operators are supported.
Procedure
Adjust the following parameters in the spark-defaults.conf file on the client:
Table 1-2 Parameters
| Parameter | Description | Default Value |
| --- | --- | --- |
| spark.sql.planner.skewJoin | Whether to enable data skew optimization. true enables the optimization. When it is enabled, skewed join keys are identified based on spark.sql.planner.skewJoin.threshold, and the skewed data is handled via broadcast to avoid data skew, raise CPU usage, and improve performance. | false |
| spark.sql.planner.skewJoin.threshold | Threshold for skew join optimization. If a key has more records than this value, it is treated as a skewed join key and handled via broadcast. | 100000 |
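The detection that spark.sql.planner.skewJoin.threshold controls can be pictured with a toy example: count the records per join key and flag keys whose count exceeds the threshold. This is an illustrative sketch only; Spark performs this detection internally during planning:

```python
from collections import Counter

def skew_keys(join_keys, threshold: int = 100000):
    """Return the join keys whose record count exceeds the skew threshold
    (the role of spark.sql.planner.skewJoin.threshold). Keys returned here
    would be handled via broadcast; the rest use the original hash method."""
    counts = Counter(join_keys)
    return {k for k, n in counts.items() if n > threshold}

# Toy data with a deliberately low threshold: "zhangsan" dominates the keys
keys = ["zhangsan"] * 50 + ["lisi"] * 3 + ["wangwu"] * 2
print(skew_keys(keys, threshold=10))  # prints {'zhangsan'}
```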
1.1.1.3 Optimizing Spark SQL Performance in the Small File Scenario
Scenario
Spark SQL may generate many small files whose sizes are much smaller than the HDFS block size. Each small file maps to one default partition, and therefore one task, in Spark. In a scenario with many small files, Spark generates a large number of tasks. If the SQL statement includes a shuffle, the number of hash buckets increases sharply, severely affecting Spark SQL performance.
In the small file scenario, you can use the following configurations to manually specify the split size of each task, avoiding the generation of a large number of tasks and improving performance.
If the SQL statement does not include a shuffle, these configurations do not significantly improve performance.
Procedure
Adjust the following parameters in the spark-defaults.conf file on the client:
Table 1-3 Parameters
| Parameter | Description | Default Value |
| --- | --- | --- |
| spark.sql.small.file.combine | Whether to enable small file optimization. true enables the optimization, which avoids generating a large number of small tasks. | false |
| spark.sql.small.file.split.size | Expected split size of a single task after small files are combined. Unit: bytes | 256000000 |
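The effect of combining on task count is simple arithmetic: without combining, each file becomes one task; with combining, the task count is roughly the total input size divided by spark.sql.small.file.split.size. An illustrative estimate (not Spark's exact split-planning logic):

```python
import math

def task_count(file_sizes_bytes, split_size: int = 256000000) -> int:
    """Estimate the task count after small-file combining: tasks are formed
    from splits of roughly `split_size` bytes over the total input size.
    Without combining, each file would map to its own task instead."""
    total = sum(file_sizes_bytes)
    return max(1, math.ceil(total / split_size))

# 10,000 small files of 1 MB each: 10,000 tasks without combining,
# but only a handful once ~256 MB splits are formed
files = [1_000_000] * 10_000
print(len(files), task_count(files))  # prints 10000 40
```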
1.1.1.4 Optimizing the INSERT...SELECT Operation
Scenario
The INSERT...SELECT operation needs to be optimized if any of the following conditions is true:
− Many small files need to be queried.
− A few large files need to be queried.
− The INSERT...SELECT operation is performed by a non-spark user in beeline/JDBCServer mode.
Procedure
Optimize the INSERT...SELECT operation as follows:
− If the table to be created is a Hive table, set its storage format to Parquet. This enables INSERT...SELECT statements to run faster.
− Perform the INSERT...SELECT operation as the spark-sql user, or as the spark user in beeline/JDBCServer mode. This avoids repeatedly changing the file owner, accelerating the execution of INSERT...SELECT statements.
In beeline/JDBCServer mode, the executor user is the same as the driver user. The driver user is the spark user because the driver is part of the JDBCServer service, which is started by the spark user. If the beeline user is not the spark user, the file owner must be changed to the beeline user (the actual user) because the executor is unaware of the beeline user.
− If many small files need to be queried, set spark.sql.small.file.combine to true. This combines the small files generated by map operations into fewer files and reduces file-renaming work, ultimately enabling INSERT...SELECT statements to run faster.
− If a few large files need to be queried, set spark.sql.small.file.combine to true and set spark.sql.small.file.split.size to an appropriate value. On the one hand, the small files generated by map operations are combined into fewer files before being renamed; on the other hand, the file size is kept under control. Both effects accelerate the execution of INSERT...SELECT statements.
The preceding optimizations are not a one-size-fits-all solution. In the following scenarios, it still takes long to perform the INSERT...SELECT operation:
− The dynamic partitioned table contains many partitions.
− Many large files need to be queried.
Even if combination of small files is enabled, many files will be generated, slowing down execution of INSERT...SELECT statements.