
Spark: Case 11: Optimizing SQL and DataFrame

Latest reply: Oct 19, 2018 03:33:58

1.1.1 Case 11: Optimizing SQL and DataFrame

1.1.1.1 Optimizing the Spark SQL Join Function

Scenario

When two tables are joined in Spark SQL, the Broadcast feature (see Using Broadcast Variables) can be used to broadcast the small table to each node, converting the join into a non-shuffle operation and improving task execution efficiency.

note

The join operation mentioned here refers only to the inner join operation.
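Depending on the Spark version, the broadcast can also be requested explicitly with a join hint in the SQL statement. A minimal sketch, assuming table A is the small table:

```sql
-- Ask the planner to broadcast the small table A
-- (BROADCAST/MAPJOIN hint support varies by Spark version)
SELECT /*+ BROADCAST(A) */ A.name
FROM B JOIN A ON A.name = B.name;
```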

Procedure

Perform the following steps to join tables in Spark SQL. Assume that tables A and B both have a name column, and that the two tables are to be joined on it.

1.         Estimate the table size.

Estimate the table size based on the size of loaded data.

Alternatively, you can check the table size in the storage directory of the Hive database. The path to the Hive database is specified in the Spark configuration file hive-site.xml. By default, the path is /user/hive/warehouse. In a Spark multi-instance deployment, each instance has its own database path, for example, /user/hive1/warehouse.

<property> 
 <name>hive.metastore.warehouse.dir</name> 
  <value>${test.warehouse.dir}</value> 
  <description></description> 
</property>

Run the following hadoop command to check the table size in the HDFS directory. For example, to check the size of table A:

hadoop fs -du -s -h ${test.warehouse.dir}/a

note

To perform the broadcast operation, ensure that at least one table is not empty.

2.         Configure the threshold for automatic broadcasting.

In this Spark version, the default threshold for determining whether to broadcast a table is 67108864 bytes (64 MB). If either table is smaller than 64 MB, skip this step.

Table 1-1 lists the threshold for automatic broadcasting.

Table 1-1 Parameters

Parameter: spark.sql.autoBroadcastJoinThreshold
Default Value: 10485760
Description: Specifies the maximum size, in bytes, of a table that is broadcast when two tables are joined. If a table is smaller than this value, it is broadcast. If this parameter is set to -1, broadcasting is disabled.

For details, visit https://spark.apache.org/docs/latest/sql-programming-guide.html.

 

Methods for configuring the threshold for automatic broadcasting:

•   In the spark-defaults.conf file, set spark.sql.autoBroadcastJoinThreshold. Set <size> as required; the value must be greater than the size of at least one of the two tables.

spark.sql.autoBroadcastJoinThreshold = <size>

•   Set the threshold in the SQL session. Before joining the tables, run the following command:

SET spark.sql.autoBroadcastJoinThreshold=<size>;

Set <size> as required; the value must be greater than the size of at least one of the two tables.

3.         (Optional) In the following scenarios, run the Analyze command (ANALYZE TABLE tableName COMPUTE STATISTICS noscan;) to update table metadata before performing the broadcast operation:

•          The table to be broadcast is a newly created partitioned table whose file format is not Parquet.

•          The table to be broadcast is a partitioned table that has just been updated.
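For example, if table A is a newly created non-Parquet partitioned table, update its statistics before the join:

```sql
-- Refresh table-level statistics without scanning the data files
ANALYZE TABLE a COMPUTE STATISTICS noscan;
```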

4.         Join the two tables.

At least one of the tables must be smaller than the threshold.

If both tables are smaller than the threshold and table A is smaller than table B, run the following command:

SELECT A.name FROM B JOIN A ON A.name = B.name;

If table B is smaller than table A, run the following command:

SELECT A.name FROM A JOIN B ON A.name = B.name;

5.         Release Driver memory by using Executor broadcast.

By default, BroadcastJoin collects the contents of the small tables to the Driver, so the Driver memory must be enlarged accordingly. The required memory is approximately spark.sql.autoBroadcastJoinThreshold × the number of broadcast tables × 2. For example, with a 64 MB threshold and two broadcast tables, reserve about 64 MB × 2 × 2 = 256 MB of Driver memory.

If broadcast tasks are overloaded, the Driver may exit with an out-of-memory (OOM) error. In this case, enable Executor broadcast by setting spark.sql.bigdata.useExecutorBroadcast to true, which releases Driver memory.

Parameters

Parameter: spark.sql.bigdata.useExecutorBroadcast
Description: If this parameter is set to true, Executor broadcast is used: the table data is stored on the Executors instead of the Driver, releasing Spark Driver memory.
Default Value: true

 

References

A query is terminated if calculation of the small table times out.

By default, BroadcastJoin allows only 5 minutes for small table calculation; if this time is exceeded, the query times out. However, the broadcast task computing the small table continues to run, wasting resources.

To resolve this issue, you can use either of the following methods:

•   Increase the value of spark.sql.broadcastTimeout to extend the timeout duration.

•   Reduce the value of spark.sql.autoBroadcastJoinThreshold to disable the BroadcastJoin optimization.
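For example, to extend the timeout to 10 minutes (spark.sql.broadcastTimeout is specified in seconds, and its default of 300 corresponds to the 5 minutes mentioned above), run the following before the join:

```sql
-- Allow up to 600 seconds for the small table broadcast
SET spark.sql.broadcastTimeout=600;
```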

1.1.1.2 Improving Spark SQL Calculation Performance Under Data Skew

Scenario

When multiple tables are joined in Spark SQL, a severe skew of the join key may cause imbalanced data among hash buckets. As a result, the few tasks holding large amounts of data run slowly and degrade overall performance, while the tasks with little data finish early and leave their CPUs idle, wasting CPU resources.

The following configurations balance the tasks by handling skewed keys with broadcasting, raising CPU usage and improving performance.

note

Data without skew is still divided into buckets using the original method.

Restrictions:

1.         The skewed join keys in the two tables must differ. For example, in A JOIN B ON A.name = B.name, if name = "zhangsan" is skewed in table A, then name = "zhangsan" must not be skewed in table B, although other values such as name = "lisi" may be.

2.         Only joins between two tables are supported.

3.         Only TableScan (TS), Filter (FIL), and Select (SEL) operators are supported on the path from TableScan to Join.

Procedure

Adjust the following parameters in the spark-defaults.conf file on the client:

Table 1-2 Parameters

Parameter: spark.sql.planner.skewJoin
Description: Determines whether to enable data skew optimization. true enables the optimization. When enabled, skewed join keys are identified based on spark.sql.planner.skewJoin.threshold and handled with broadcasting to avoid data skew, raise CPU usage, and improve performance.
Default Value: false

Parameter: spark.sql.planner.skewJoin.threshold
Description: Threshold for skew join optimization. If the number of records for a key exceeds this value, the key is treated as a skewed join key and handled with broadcasting.
Default Value: 100000
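For example, to enable the optimization in spark-defaults.conf (the threshold value here is only illustrative; tune it to your data):

```properties
# Identify skewed join keys and handle them with broadcasting
spark.sql.planner.skewJoin = true
# A key is considered skewed when it has more records than this
spark.sql.planner.skewJoin.threshold = 100000
```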

 

1.1.1.3 Optimizing Spark SQL Performance in the Small File Scenario

Scenario

Spark SQL jobs can generate many small files whose sizes are much smaller than the HDFS block size. Each small file maps to a partition in Spark, that is, a task. In a scenario with many small files, Spark therefore generates a large number of tasks; if the SQL statement includes a shuffle, the number of hash buckets increases sharply, severely degrading Spark SQL performance.

In the small file scenario, you can manually specify the split size of each task using the following configurations to avoid generating a large number of tasks and to improve performance.

note

If the SQL statement does not include a shuffle, these configurations do not significantly improve performance.

Procedure

Adjust the following parameters in the spark-defaults.conf file on the client:

Table 1-3 Parameters

Parameter: spark.sql.small.file.combine
Description: Determines whether to enable small file optimization. true enables the optimization, avoiding the generation of a large number of small tasks.
Default Value: false

Parameter: spark.sql.small.file.split.size
Description: Specifies the expected split size of a single task after small files are combined. Unit: byte.
Default Value: 256000000
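For example, to combine small files into splits of about 256 MB, add the following to spark-defaults.conf:

```properties
# Combine small files so that each task reads a larger split
spark.sql.small.file.combine = true
# Expected split size per task after combining, in bytes (256000000 bytes is about 256 MB)
spark.sql.small.file.split.size = 256000000
```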

 

1.1.1.4 Optimizing the INSERT...SELECT Operation

Scenario

The INSERT...SELECT operation needs to be optimized if any of the following conditions is true:

•   Many small files need to be queried.

•   A few large files need to be queried.

•   The INSERT...SELECT operation is performed by a non-spark user in beeline/JDBCServer mode.

Procedure

Optimize the INSERT...SELECT operation as follows:

•   If the table to be created is a Hive table, set its storage format to Parquet. This enables INSERT...SELECT statements to run faster.

•   Perform the INSERT...SELECT operation as the spark-sql user, or as the spark user in beeline/JDBCServer mode. This avoids repeatedly changing the file owner, accelerating execution of INSERT...SELECT statements.

note

In beeline/JDBCServer mode, the executor user is the same as the driver user. The driver user is the spark user because the driver is part of the JDBCServer service, which is started by the spark user. If the beeline user is not the spark user, the file owner must be changed to the beeline user (the actual user), because the executor is unaware of the beeline user.

•   If many small files need to be queried, set spark.sql.small.file.combine to true. This combines the small files generated by map operations into fewer files and accelerates file renaming, ultimately enabling INSERT...SELECT statements to run faster.

•   If a few large files need to be queried, set spark.sql.small.file.combine to true and set spark.sql.small.file.split.size to an appropriate value. The small files generated by map operations are then combined into fewer files before being renamed, and the file size is kept under control, ultimately accelerating execution of INSERT...SELECT statements.
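As a sketch of the first optimization, with hypothetical tables target_table and source_table:

```sql
-- Store the target Hive table as Parquet so INSERT...SELECT runs faster
CREATE TABLE target_table (name STRING) STORED AS PARQUET;

INSERT INTO TABLE target_table
SELECT name FROM source_table;
```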

note

The preceding optimizations are not a one-size-fits-all solution. In the following scenarios, the INSERT...SELECT operation still takes a long time:

•  The dynamic partitioned table contains many partitions.

•  Many large files need to be queried.

In these cases, many files are generated even if small file combination is enabled, slowing down execution of INSERT...SELECT statements.

 

