Hello, everyone!
This post shares three ways for SparkSQL to read data from MySQL.
Test version: FusionInsight HD 8.0.2
Example description:
SparkSQL reads MySQL data into a DataFrame.
The following examples are written in Scala:
Method 1:
package com.huawei.bigdata.spark.demo

import java.util.Properties

import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Method 1: read MySQL through `DataFrameReader.jdbc(url, table, properties)`.
 *
 * Prints the `person` table, then the result of joining `person` with `score`.
 * Replace the host placeholder and the credentials with real values before running.
 */
object TestReadMysql1 {

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("TestMySQL1")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    try {
      // Both reads share one URL so they always target the same database;
      // previously the second read hard-coded a different host.
      val url = "jdbc:mysql://172.X.X.X:3306/sparktest"

      val properties = new Properties()
      properties.setProperty("user", "root")
      properties.setProperty("password", "yourpassword")

      // Read a whole table.
      val person: DataFrame = spark.read.jdbc(url, "person", properties)
      person.show()

      // Read the result of a query: the "table" argument is a parenthesized
      // subquery followed by an alias (T).
      val personWithScore =
        "(select person.id,person.name,person.age,score.score from person,score where person.id = score.id) T"
      spark.read.jdbc(url, personWithScore, properties).show()
    } finally {
      // Release the session even when a read fails.
      spark.close()
    }
  }
}
Method 2:
package com.huawei.bigdata.spark.demo

import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Method 2: read MySQL through `format("jdbc")` with all connection
 * settings supplied at once as an options map.
 *
 * Prints the schema and the rows of the `person` table.
 * Replace the host placeholder and the credentials with real values before running.
 */
object TestReadMysql2 {

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("TestMySQL2")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    try {
      // Plain key -> value pairs; the stray `elems =` (a pasted IDE parameter
      // hint) is removed because a named argument followed by positional
      // arguments does not compile.
      val jdbcOptions: Map[String, String] = Map(
        "url" -> "jdbc:mysql://172.X.X.X:3306/sparktest",
        "driver" -> "com.mysql.cj.jdbc.Driver",
        "user" -> "root",
        "password" -> "yourpassword",
        "dbtable" -> "person"
      )

      // Named after the table actually read ("person").
      val person: DataFrame = spark.read.format("jdbc").options(jdbcOptions).load()
      person.printSchema()
      person.show()
    } finally {
      // Release the session even when the read fails.
      spark.close()
    }
  }
}
Method 3:
package com.huawei.bigdata.spark.demo

import org.apache.spark.sql.{DataFrame, DataFrameReader, SparkSession}

/**
 * Method 3: read MySQL through `format("jdbc")`, setting each connection
 * option individually on the reader before calling `load()`.
 *
 * Prints the rows of the `person` table.
 * Replace the host placeholder and the credentials with real values before running.
 */
object TestReadMysql3 {

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("TestMySQL3")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    try {
      val reader: DataFrameReader = spark.read.format("jdbc")
        // Database name corrected from "sparktes" to match the other examples.
        .option("url", "jdbc:mysql://172.X.X.X:3306/sparktest")
        // Same driver class as Method 2; it matches the Connector/J 6.x jar
        // passed via --jars in run.sh. With Connector/J 5.x use
        // "com.mysql.jdbc.Driver" instead.
        .option("driver", "com.mysql.cj.jdbc.Driver")
        .option("user", "root")
        .option("password", "yourpassword")
        .option("dbtable", "person")

      val source2: DataFrame = reader.load()
      source2.show()
    } finally {
      // Release the session even when the read fails.
      spark.close()
    }
  }
}
Command Reference for Submitting a Task
1. Run `mvn package` on the sample code in the attachment to build the spark-mysql-demo-1.0-SNAPSHOT.jar package.
2. Create a test directory on the client node, for example, /opt/haosuwei, and upload spark-mysql-demo-1.0-SNAPSHOT.jar to the directory, together with the MySQL driver jar that run.sh passes via --jars (mysql-connector-java-6.0.6.jar).
3. Download the keytab file user.keytab of the authenticating user (for example, developuser) from the FusionInsight Manager page and upload it to the same directory.
4. Create the run.sh script in the same directory with the following content:
#!/bin/bash
# Submit the TestReadMysql2 example to YARN.

# Load the client environment; the user name and keytab path are passed
# to bigdata_env as arguments.
source /opt/client/bigdata_env developuser ./user.keytab

# Client-mode variant, kept for reference:
#spark-submit --master yarn --deploy-mode client --jars ./mysql-connector-java-6.0.6.jar --class com.huawei.bigdata.spark.demo.TestReadMysql2 spark-mysql-demo-1.0-SNAPSHOT.jar

# Cluster-mode submission; --jars ships the MySQL JDBC driver with the job.
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --jars ./mysql-connector-java-6.0.6.jar \
  --class com.huawei.bigdata.spark.demo.TestReadMysql2 \
  spark-mysql-demo-1.0-SNAPSHOT.jar
That's all, thanks!