This section describes how to access Spark SQL programs through JDBC, covering the scenario, data planning, sample code, and application commissioning.
1.1.1 Case 4: Accessing Spark SQL Programs Using JDBC
1.1.1.1 Scenario
Applicable Versions
FusionInsight HD V100R002C70, FusionInsight HD V100R002C80
Scenario
You can customize JDBCServer clients and use JDBC connections to create, load, query, and delete tables.
Data Planning
Step 1 Ensure that the JDBCServer service is started in the multi-active instance mode and at least one instance provides services for external systems. Create the /home/data file on each available JDBCServer node. The content of the file is as follows:
M***da,32
Karlie,23
Candice,27
Step 2 Ensure that the user who starts JDBCServer has permissions to read and write the file.
Step 3 Ensure that the hive-site.xml file exists in the classpath of the client and set the required parameters based on the actual cluster situation. For details about JDBCServer parameters, see "JDBCServer Interface". A quick programmatic check of the classpath requirement is sketched after this procedure.
----End
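For reference, the following is a minimal sketch (not part of the sample project) that checks whether hive-site.xml is visible on the client classpath before the JDBC URL is built:
// Hypothetical check: confirm hive-site.xml can be found on the classpath.
java.net.URL hiveSite = Thread.currentThread().getContextClassLoader().getResource("hive-site.xml");
if (hiveSite == null) {
    System.err.println("hive-site.xml is not on the classpath; add the client configuration directory before connecting.");
} else {
    System.out.println("Using hive-site.xml from: " + hiveSite);
}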
1.1.1.2 Development Guidelines
1. Create the child table in the default database.
2. Load the data in the /home/data file into the child table.
3. Query data in the child table.
4. Delete the child table.
1.1.1.3 Sample Code Description
1.1.1.3.1 Java Code Example
Function
The JDBC API of the user-defined client is used to submit a data analysis task and return the results.
Sample Code
Step 1 Define the SQL statements. Each SQL statement must be a single statement and cannot contain a semicolon (;). Example:
ArrayList<String> sqlList = new ArrayList<String>();
sqlList.add("CREATE TABLE CHILD (NAME STRING, AGE INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','");
sqlList.add("LOAD DATA LOCAL INPATH '/home/data' INTO TABLE CHILD");
sqlList.add("SELECT * FROM child");
sqlList.add("DROP TABLE child");
executeSql(url, sqlList);
The data file in the sample project must be placed in the home directory of the host where the JDBCServer is located.
Step 2 Build the JDBC URL.
String securityConfig = ";saslQop=auth-conf;auth=KERBEROS;principal=spark/hadoop.hadoop.com@HADOOP.COM" + ";";
Configuration config = new Configuration();
config.addResource(new Path(args[0]));
String zkUrl = config.get("spark.deploy.zookeeper.url");
String zkNamespace = null;
zkNamespace = fileInfo.getProperty("spark.thriftserver.zookeeper.namespace");
if (zkNamespace != null) {
    // Delete redundant characters from the configuration item.
    zkNamespace = zkNamespace.substring(1);
}
StringBuilder sb = new StringBuilder("jdbc:hive2://"
        + zkUrl
        + ";serviceDiscoveryMode=zooKeeper;zooKeeperNamespace="
        + zkNamespace
        + securityConfig);
String url = sb.toString();
After Kerberos authentication succeeds, the credential is valid for one day by default. If the client needs to connect to the JDBCServer after the validity period expires, it must be authenticated again; otherwise, the connection fails. Therefore, if a connection must remain usable over a long period, add the user.principal and user.keytab authentication information to the URL so that authentication succeeds each time the connection is established. For example, add the following information to the URL: user.principal=sparkuser;user.keytab=/opt/client/user.keytab.
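For example, a sketch of a security configuration string that carries the keytab information; the principal name and keytab path are the illustrative values from this note and must be adjusted to the actual cluster:
// Sketch only: embed user.principal and user.keytab in the URL so that every
// new connection can authenticate on its own (values are examples only).
String securityConfig = ";saslQop=auth-conf;auth=KERBEROS"
        + ";principal=spark/hadoop.hadoop.com@HADOOP.COM"
        + ";user.principal=sparkuser"
        + ";user.keytab=/opt/client/user.keytab"
        + ";";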
Step 3 Load the Hive JDBC driver.
Class.forName("org.apache.hive.jdbc.HiveDriver").newInstance();
Step 4 Obtain the JDBC connection, execute the HiveQL statement, return the queried column name and results to the console, and close the JDBC connection.
You can also use the configuration item spark.deploy.zookeeper.url in the configuration file to replace zk.quorum in the connection string.
If the network is congested, configure a timeout interval for the connection between the client and the JDBCServer to prevent the client from hanging indefinitely while waiting for the server to return a result. The configuration method is as follows:
Before calling the DriverManager.getConnection method to obtain the JDBC connection, call DriverManager.setLoginTimeout(n) to configure the timeout interval. n indicates the time to wait for the server to return a result; the unit is seconds, the type is int, and the default value is 0 (indicating that the call never times out).
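For example, a minimal sketch of setting the login timeout before obtaining the connection; the 30-second value is illustrative:
// Wait at most 30 seconds for the server to respond while establishing the connection.
DriverManager.setLoginTimeout(30);
Connection connection = DriverManager.getConnection(url);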
static void executeSql(String url, ArrayList<String> sqls) throws ClassNotFoundException, SQLException {
    try {
        Class.forName("org.apache.hive.jdbc.HiveDriver").newInstance();
    } catch (Exception e) {
        e.printStackTrace();
    }
    Connection connection = null;
    PreparedStatement statement = null;
    try {
        connection = DriverManager.getConnection(url);
        for (int i = 0; i < sqls.size(); i++) {
            String sql = sqls.get(i);
            System.out.println("---- Begin executing sql: " + sql + " ----");
            statement = connection.prepareStatement(sql);
            ResultSet result = statement.executeQuery();
            ResultSetMetaData resultMetaData = result.getMetaData();
            Integer colNum = resultMetaData.getColumnCount();
            for (int j = 1; j <= colNum; j++) {
                System.out.println(resultMetaData.getColumnLabel(j) + "\t");
            }
            System.out.println();
            while (result.next()) {
                for (int j = 1; j <= colNum; j++) {
                    System.out.println(result.getString(j) + "\t");
                }
                System.out.println();
            }
            System.out.println("---- Done executing sql: " + sql + " ----");
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        if (null != statement) {
            statement.close();
        }
        if (null != connection) {
            connection.close();
        }
    }
}
----End
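For orientation, the following is a minimal, hypothetical entry point that wires the preceding steps together. It is a sketch, not the sample project's own main method: it assumes args[0] is the XML configuration file containing spark.deploy.zookeeper.url and args[1] is a properties file containing spark.thriftserver.zookeeper.namespace, and it requires java.util.ArrayList, java.util.Properties, java.io.FileInputStream, org.apache.hadoop.conf.Configuration, and org.apache.hadoop.fs.Path.
public static void main(String[] args) throws Exception {
    // Step 1: define the SQL statements (no trailing semicolons).
    ArrayList<String> sqlList = new ArrayList<String>();
    sqlList.add("CREATE TABLE CHILD (NAME STRING, AGE INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','");
    sqlList.add("LOAD DATA LOCAL INPATH '/home/data' INTO TABLE CHILD");
    sqlList.add("SELECT * FROM child");
    sqlList.add("DROP TABLE child");

    // Step 2: build the JDBC URL (argument layout assumed as described above).
    Properties fileInfo = new Properties();
    try (FileInputStream in = new FileInputStream(args[1])) {
        fileInfo.load(in);
    }
    Configuration config = new Configuration();
    config.addResource(new Path(args[0]));
    String zkUrl = config.get("spark.deploy.zookeeper.url");
    String zkNamespace = fileInfo.getProperty("spark.thriftserver.zookeeper.namespace");
    if (zkNamespace != null) {
        // Delete the redundant leading character, as in Step 2.
        zkNamespace = zkNamespace.substring(1);
    }
    String securityConfig = ";saslQop=auth-conf;auth=KERBEROS;principal=spark/hadoop.hadoop.com@HADOOP.COM;";
    String url = "jdbc:hive2://" + zkUrl
            + ";serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=" + zkNamespace
            + securityConfig;

    // Steps 3 and 4: load the driver, execute the statements, and close the connection.
    executeSql(url, sqlList);
}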
1.1.1.3.2 Scala Code Example
Function
The JDBC API of the user-defined client is used to submit a data analysis task and return the results.
Sample Code
Step 1 Define the SQL statements. Each SQL statement must be a single statement and cannot contain a semicolon (;). Example:
val sqlList = new ArrayBuffer[String]
sqlList += "CREATE TABLE CHILD (NAME STRING, AGE INT) " +
  "ROW FORMAT DELIMITED FIELDS TERMINATED BY ','"
sqlList += "LOAD DATA LOCAL INPATH '/home/data' INTO TABLE CHILD"
sqlList += "SELECT * FROM child"
sqlList += "DROP TABLE child"
The data file in the sample project must be placed in the home directory of the host where the JDBCServer is located.
Step 2 Build the JDBC URL.
val securityConfig: String = ";saslQop=auth-conf;auth=KERBEROS;principal=spark/hadoop.hadoop.com@HADOOP.COM" + ";"
val config: Configuration = new Configuration
config.addResource(new Path(args(0)))
val zkUrl: String = config.get("spark.deploy.zookeeper.url")
var zkNamespace: String = null
zkNamespace = fileInfo.getProperty("spark.thriftserver.zookeeper.namespace")
// Delete redundant characters from the configuration item.
if (zkNamespace != null) zkNamespace = zkNamespace.substring(1)
val sb = new StringBuilder("jdbc:hive2://"
  + zkUrl
  + ";serviceDiscoveryMode=zooKeeper;zooKeeperNamespace="
  + zkNamespace
  + securityConfig)
val url = sb.toString()
After Kerberos authentication succeeds, the credential is valid for one day by default. If the client needs to connect to the JDBCServer after the validity period expires, it must be authenticated again; otherwise, the connection fails. Therefore, if a connection must remain usable over a long period, add the user.principal and user.keytab authentication information to the URL so that authentication succeeds each time the connection is established. For example, add the following information to the URL: user.principal=sparkuser;user.keytab=/opt/client/user.keytab.
Step 3 Load the Hive JDBC driver, obtain the JDBC connection, execute the HQL, return the queried column name and result to the console, and close the JDBC connection.
You can also use the configuration item spark.deploy.zookeeper.url in the configuration file to replace zk.quorum in the connection string.
If the network is congested, configure a timeout interval for the connection between the client and the JDBCServer to prevent the client from hanging indefinitely while waiting for the server to return a result. The configuration method is as follows:
Before calling the DriverManager.getConnection method to obtain the JDBC connection, call DriverManager.setLoginTimeout(n) to configure the timeout interval. n indicates the time to wait for the server to return a result; the unit is seconds, the type is int, and the default value is 0 (indicating that the call never times out).
def executeSql(url: String, sqls: Array[String]): Unit = {
  // Load the Hive JDBC driver.
  Class.forName("org.apache.hive.jdbc.HiveDriver").newInstance()

  var connection: Connection = null
  var statement: PreparedStatement = null
  try {
    connection = DriverManager.getConnection(url)
    for (sql <- sqls) {
      println(s"---- Begin executing sql: $sql ----")
      statement = connection.prepareStatement(sql)
      val result = statement.executeQuery()
      val resultMetaData = result.getMetaData
      val colNum = resultMetaData.getColumnCount
      for (i <- 1 to colNum) {
        print(resultMetaData.getColumnLabel(i) + "\t")
      }
      println()
      while (result.next()) {
        for (i <- 1 to colNum) {
          print(result.getString(i) + "\t")
        }
        println()
      }
      println(s"---- Done executing sql: $sql ----")
    }
  } finally {
    if (null != statement) {
      statement.close()
    }
    if (null != connection) {
      connection.close()
    }
  }
}
----End
1.1.1.4 Obtaining Sample Code
Using the FusionInsight Client
Obtain the sample project from the sampleCode directory in the Spark directory of the FusionInsight_Services_ClientConfig file extracted from the client.
Security mode: SparkThriftServerJavaExample and SparkThriftServerScalaExample in the spark-examples-security directory
Non-security mode: SparkThriftServerJavaExample and SparkThriftServerScalaExample in the spark-examples-normal directory
Using the Maven Project
Log in to Huawei DevCloud (https://codehub-cn-south-1.devcloud.huaweicloud.com/codehub/7076065/home) and download the code to the local PC.
Security mode:
components/spark/spark-examples-security/SparkJavaExample
components/spark/spark-examples-security/SparkScalaExample
Non-security mode:
components/spark/spark-examples-normal/SparkJavaExample
components/spark/spark-examples-normal/SparkScalaExample
1.1.1.5 Application Commissioning
1.1.1.5.1 Compiling and Running the Application
Scenario
After the program code is developed, you can upload the code to the Linux client for running. The running procedures of applications developed in Scala or Java are the same.
- The Spark application can run only in the Linux environment, not in the Windows environment.
- A Spark application developed in Python does not need to be built into a jar through Artifacts. You only need to copy the sample project to the compiler.
- Ensure that the Python version installed on the worker is consistent with that on the driver; otherwise, the following error is reported: "Python in worker has different version %s than that in driver %s."
Procedure
Step 1 In IntelliJ IDEA, configure the Artifacts information of the project before creating the jar.
1. On the main page of IDEA, choose File > Project Structure... to open the Project Structure page.
2. On the Project Structure page, select Artifacts, click + and choose Jar > From modules with dependencies....
Figure 1-1 Adding the Artifacts
3. Select the corresponding module. The module corresponding to the Java sample projects is CollectFemaleInfo. Click OK.
Figure 1-2 Create Jar from Modules
4. Configure the name, type and output directory of the Jar based on the actual condition.
Figure 1-3 Configuring the basic information
5. Right-click CollectFemaleInfo, choose Put into Output Root, and click Apply.
Figure 1-4 Put into Output Root
6. Click OK.
Step 2 Create the jar.
1. On the main page of IDEA, choose Build > Build Artifacts....
Figure 1-5 Build Artifacts
2. On the displayed menu, choose CollectFemaleInfo > Build to create the jar.
Figure 1-6 Build
3. If the following information is displayed in the event log, the jar is created successfully. You can obtain the jar from the directory configured in Step 1.4.
21:25:43 Compilation completed successfully in 36 sec
Step 3 Copy the jar created in Step 2 to the Spark running environment (Spark client), for example, /opt/hadoopclient/Spark, and run the Spark application.
Do not restart the HDFS service or restart all DataNode instances while a Spark task is running. Otherwise, the Spark task may fail, resulting in JobHistory data loss.
- Run the sample projects of Spark Core (including Scala and Java).
Access the Spark client directory and run the bin/spark-submit script to execute the code.
<inputPath> indicates the input directory in the HDFS.
bin/spark-submit --class com.huawei.bigdata.spark.examples.FemaleInfoCollection --master yarn-client /opt/female/FemaleInfoCollection.jar <inputPath>
- Run the sample projects of Spark SQL (Java and Scala).
Access the Spark client directory and run the bin/spark-submit script to execute the code.
<inputPath> indicates the input directory in the HDFS.
bin/spark-submit --class com.huawei.bigdata.spark.examples.FemaleInfoCollection --master yarn-client /opt/female/FemaleInfoCollection.jar <inputPath>
- Run the sample projects of Spark Streaming (Java and Scala).
Access the Spark client directory and run the bin/spark-submit script to execute the code.
The location of Spark Streaming Kafka dependency package on the client is different from the location of other dependency packages. For example, the path to the Spark Streaming Kafka dependency package is $SPARK_HOME/lib/streamingClient, whereas the path to other dependency packages is $SPARK_HOME/lib. When running an application, you must add the configuration option to the spark-submit command to specify the path of Spark Streaming Kafka dependency package. The following is an example path:
--jars $SPARK_HOME/lib/streamingClient/kafka-clients-0.8.2.1.jar,$SPARK_HOME/lib/streamingClient/kafka_2.10-0.8.2.1.jar,$SPARK_HOME/lib/streamingClient/spark-streaming-kafka_2.10-1.5.1.jar
Example code for Spark Streaming Write To Print is as follows:
bin/spark-submit --master yarn-client --jars $SPARK_HOME/lib/streamingClient/kafka-clients-0.8.2.1.jar,$SPARK_HOME/lib/streamingClient/kafka_2.10-0.8.2.1.jar,$SPARK_HOME/lib/streamingClient/spark-streaming-kafka_2.10-1.5.1.jar --class com.huawei.bigdata.spark.examples.FemaleInfoCollectionPrint /opt/female/FemaleInfoCollectionPrint.jar <checkPointDir> <batchTime> <topics> <brokers>
Example code for Spark Streaming Write To Kafka is as follows:
bin/spark-submit --master yarn-client --jars $SPARK_HOME/lib/streamingClient/kafka-clients-0.8.2.1.jar,$SPARK_HOME/lib/streamingClient/kafka_2.10-0.8.2.1.jar,$SPARK_HOME/lib/streamingClient/spark-streaming-kafka_2.10-1.5.1.jar --class com.huawei.bigdata.spark.examples.FemaleInfoCollectionKafka /opt/female/FemaleInfoCollectionKafka.jar <checkPointDir> <batchTime> <windowTime> <topics> <brokers>
- Run the sample projects of Accessing the Spark SQL Through JDBC (Java and Scala).
Access the Spark client directory and run the java -cp command to execute the code.
java -cp $SPARK_HOME/lib/*:$SPARK_HOME/conf:/opt/female/ThriftServerQueriesTest.jar com.huawei.bigdata.spark.examples.ThriftServerQueriesTest $SPARK_HOME/conf/hive-site.xml $SPARK_HOME/conf/spark-defaults.conf
In the preceding command line, you can choose the minimal runtime dependency package based on the sample projects. For details of the runtime dependency packages, see References.
- Run the Spark on HBase sample application (Java and Scala).
a. Verify that the configuration options in the Spark client configuration file spark-defaults.conf are correctly set.
When running the Spark on HBase sample application, set the configuration option spark.hbase.obtainToken.enabled in the Spark client configuration file spark-defaults.conf to true (the default value is false; changing the value to true does not affect existing services, but if you want to uninstall the HBase service, change the value back to false first), and set the configuration option spark.inputFormat.cache.enabled to false. Example entries are shown after the table below.
Table 1-1 Parameters
Parameter | Description | Default Value
spark.hbase.obtainToken.enabled | Whether to enable the function of obtaining the HBase token. | false
spark.inputFormat.cache.enabled | Whether to cache the InputFormat that maps to HadoopRDD. If set to true, tasks of the same Executor use the same InputFormat object; in this case, the InputFormat must be thread-safe. If caching the InputFormat is not required, set the parameter to false. | true
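For example, the corresponding entries in spark-defaults.conf might look like the following; this is a sketch using the whitespace-separated key-value format of spark-defaults.conf, with the values required by this sample:
spark.hbase.obtainToken.enabled true
spark.inputFormat.cache.enabled false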
b. Access the Spark client directory and run the bin/spark-submit script to execute the code.
Run the sample applications in the following sequence: TableCreation > TableInputData > TableOutputData.
When the TableInputData sample application is running, <inputPath> needs to be specified. <inputPath> indicates the input path in the HDFS.
bin/spark-submit --class com.huawei.bigdata.spark.examples.TableInputData --master yarn-client /opt/female/TableInputData.jar <inputPath>
- Run the Spark HBase to HBase sample application (Scala and Java).
Access the Spark client directory and run the bin/spark-submit script to execute the code.
bin/spark-submit --class com.huawei.bigdata.spark.examples.SparkHbasetoHbase --master yarn-client /opt/female/FemaleInfoCollection.jar
- Run the Spark Hive to HBase sample application (Scala and Java).
Access the Spark client directory and run the bin/spark-submit script to execute the code.
bin/spark-submit --class com.huawei.bigdata.spark.examples.SparkHivetoHbase --master yarn-client /opt/female/FemaleInfoCollection.jar
- Run the Spark Streaming Kafka to HBase sample application (Scala and Java).
Access the Spark client directory and run the bin/spark-submit script to execute the code.
When the sample application is running, specify <checkPointDir>, <topic>, and <brokerList>. <checkPointDir> indicates the directory where the application result is backed up, <topic> indicates the topic read from Kafka, and <brokerList> indicates the IP address of the Kafka server.
On the client, the directory of the Spark Streaming Kafka dependency package is different from that of other dependency packages. For example, the directory of other dependency packages is $SPARK_HOME/lib, and the directory of the Spark Streaming Kafka dependency package is $SPARK_HOME/lib/streamingClient. Therefore, when running the application, add the configuration option in the spark-submit command to specify the directory of the Spark Streaming Kafka dependency package, for example, --jars $SPARK_HOME/lib/streamingClient/kafka-clients-0.8.2.1.jar,$SPARK_HOME/lib/streamingClient/kafka_2.10-0.8.2.1.jar,$SPARK_HOME/lib/streamingClient/spark-streaming-kafka_2.10-1.5.1.jar.
Example code for Spark Streaming To HBase is as follows:
bin/spark-submit --master yarn-client --jars $SPARK_HOME/lib/streamingClient/kafka-clients-0.8.2.1.jar,$SPARK_HOME/lib/streamingClient/kafka_2.10-0.8.2.1.jar,$SPARK_HOME/lib/streamingClient/spark-streaming-kafka_2.10-1.5.1.jar --class com.huawei.bigdata.spark.examples.streaming.SparkOnStreamingToHbase /opt/female/FemaleInfoCollectionPrint.jar <checkPointDir> <topic> <brokerList>
- Submit the application developed in Python.
Access the Spark client directory and run the bin/spark-submit script to execute the code.
<inputPath> indicates the input directory in the HDFS.
Because the sample code does not contain authentication information, specify the authentication information by configuring spark.yarn.keytab and spark.yarn.principal when running the application.
bin/spark-submit --master yarn-client --conf spark.yarn.keytab=/opt/FIclient/user.keytab --conf spark.yarn.principal=sparkuser /opt/female/SparkPythonExample/collectFemaleInfo.py <inputPath>
----End
References
The runtime dependency packages for the sample projects of Accessing the Spark SQL Through JDBC (Java and Scala) are as follows:
- The sample projects of Accessing the Spark SQL Through JDBC (Scala):
− avro-1.7.7.jar
− commons-collections-3.2.2.jar
− commons-configuration-1.6.jar
− commons-io-2.4.jar
− commons-lang-2.6.jar
− commons-logging-1.1.3.jar
− guava-12.0.1.jar
− hadoop-auth-2.7.2.jar
− hadoop-common-2.7.2.jar
− hadoop-mapreduce-client-core-2.7.2.jar
− hive-exec-1.2.1.spark.jar
− hive-jdbc-1.2.1.spark.jar
− hive-metastore-1.2.1.spark.jar
− hive-service-1.2.1.spark.jar
− httpclient-4.5.2.jar
− httpcore-4.4.4.jar
− libthrift-0.9.3.jar
− log4j-1.2.17.jar
− slf4j-api-1.7.10.jar
− zookeeper-3.5.1.jar
− scala-library-2.10.4.jar
- The sample projects of Accessing the Spark SQL Through JDBC (Java):
− commons-collections-3.2.2.jar
− commons-configuration-1.6.jar
− commons-io-2.4.jar
− commons-lang-2.6.jar
− commons-logging-1.1.3.jar
− guava-2.0.1.jar
− hadoop-auth-2.7.2.jar
− hadoop-common-2.7.2.jar
− hadoop-mapreduce-client-core-2.7.2.jar
− hive-exec-1.2.1.spark.jar
− hive-jdbc-1.2.1.spark.jar
− hive-metastore-1.2.1.spark.jar
− hive-service-1.2.1.spark.jar
− httpclient-4.5.2.jar
− httpcore-4.4.4.jar
− libthrift-0.9.3.jar
− log4j-1.2.17.jar
− slf4j-api-1.7.10.jar
− zookeeper-3.5.1.jar
1.1.1.5.2 Checking the Commissioning Result
Scenario
After a Spark application is run, you can check the running result through one of the following methods:
- Viewing the command output.
- Logging in to the Spark Web UI.
- Viewing Spark logs.
Procedure
- Check the operating result data of the Spark application.
The data storage directory and format are specified by the user in the Spark application. You can obtain the data from the specified file.
- Check the status of the Spark application.
Spark provides the following two Web UIs:
− The Spark UI displays the status of applications that are being executed.
The Spark UI contains the Spark Jobs, Spark Stages, Storage, Environment, and Executors parts. Besides these parts, Spark Streaming is displayed for a Streaming application.
To access the interface: on the Web UI of YARN, find the corresponding Spark application and click ApplicationMaster in the final column of the application information to open the Spark UI.
− The History Server UI displays the status of all Spark applications.
The History Server UI displays information such as the application ID, application name, start time, end time, execution time, and user to whom the application belongs. After the application ID is clicked, the Spark UI of the application is displayed.
- View Spark logs to learn the application running status.
Spark logs offer immediate visibility into how an application is running, and you can adjust the application based on the logs. For log details, see the Spark section in the Log Description of the Administrator Guide.