
Spark: Case 3: Example of Connecting Spark Streaming to Kafka 0.8


1.1.1 Case 3: Example of Connecting Spark Streaming to Kafka 0.8

1.1.1.1 Scenario

Applicable Versions

FusionInsight HD V100R002C70, FusionInsight HD V100R002C80

Scenario

Develop a Spark application to perform the following operations on logs that record the time netizens spend on online shopping on a weekend.

l   Collect statistics on female netizens who continuously dwell on online shopping for over half an hour.

l   The first column in the log file records names, the second column records the gender, and the third column records the online shopping duration in minutes. The three columns are separated by commas (,).

log1.txt: logs collected on Saturday

LiuYang,female,20  
YuanJing,male,10  
GuoYijun,male,5  
CaiXuyu,female,50  
Liyuan,male,20  
FangBo,female,50  
LiuYang,female,20  
YuanJing,male,10  
GuoYijun,male,50  
CaiXuyu,female,50  
FangBo,female,60

log2.txt: logs collected on Sunday

LiuYang,female,20  
YuanJing,male,10  
CaiXuyu,female,50  
FangBo,female,50  
GuoYijun,male,5  
CaiXuyu,female,50  
Liyuan,male,20  
CaiXuyu,female,50  
FangBo,female,50  
LiuYang,female,20  
YuanJing,male,10  
FangBo,female,50  
GuoYijun,male,50  
CaiXuyu,female,50  
FangBo,female,60

Data Planning

The data of the Spark Streaming sample project is stored in Kafka. A user with Kafka permissions is required.

Step 1      Create two text files, input_data1.txt and input_data2.txt, on the local computer, and copy the content of log1.txt into input_data1.txt and that of log2.txt into input_data2.txt.

Step 2      Create the /home/data directory on the client installation node and upload the preceding two files to it.

Step 3      Set the allow.everyone.if.no.acl.found parameter of Kafka Broker to true.

Step 4      Start the Kafka Producer to send data to Kafka.

java -cp {ClassPath} com.huawei.bigdata.spark.examples.StreamingExampleProducer {BrokerList} {Topic}
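For reference, a concrete invocation might look like the following. The classpath, broker address and port, and topic name are placeholders; replace them with the jar built from the sample project and the broker list and topic used in your environment.

java -cp /opt/female/SparkStreamingExample.jar:/opt/client/Spark/spark/lib/* com.huawei.bigdata.spark.examples.StreamingExampleProducer 192.168.0.11:21005 mytopic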

----End

1.1.1.2 Development Guidelines

Collect statistics from the log files on female netizens who spend more than half an hour online shopping on the weekend.

The operation is performed in three steps:

l   Receive data from Kafka and generate the corresponding DStream.

l   Filter the data to keep the online time records of female netizens.

l   Filter data about netizens whose consecutive online duration exceeds the threshold, and obtain the results.

1.1.1.3 Sample Code Description

1.1.1.3.1 Java Code Example

Function

Collect statistics on female netizens who continuously dwell on online shopping for over half an hour. Print statistics or write them to Kafka.

Spark Streaming Write To Print Sample Code

The following code snippets are used as an example. For complete code, see com.huawei.bigdata.spark.examples.FemaleInfoCollectionPrint.

// Parameter parsing:

// <checkPointDir> is the checkpoint directory.

// <batchTime> is the interval for processing Spark Streaming in batches.

// <topics> is the Kafka topic to subscribe to. Use commas (,) to separate multiple topics.

// <brokers> is the Kafka address for obtaining metadata.

public class FemaleInfoCollectionPrint {

    public static void main(String[] args) throws Exception {

        String checkPointDir = args[0];

        String batchTime = args[1];

        String topics = args[2];

        String brokers = args[3]; 

        Duration batchDuration = Durations.seconds(Integer.parseInt(batchTime));

        SparkConf conf = new SparkConf().setAppName("DataSightStreamingExample");

        JavaStreamingContext jssc = new JavaStreamingContext(conf, batchDuration);

// Set the CheckPoint directory of Spark Streaming.

        jssc.checkpoint(checkPointDir);

// Assemble the Kafka topic list.

        HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));

      HashMap<String, String> kafkaParams = new HashMap<String, String>();

        kafkaParams.put("metadata.broker.list", brokers);

// Create kafka stream using brokers and topics.

// 1. Receive data from Kafka and generate the corresponding DStream.

      JavaDStream<String> lines = KafkaUtils.createDirectStream(jssc,String.class,String.class,

                StringDecoder.class,  StringDecoder.class, kafkaParams, topicsSet).map(

                new Function<Tuple2<String, String>, String>() {

                    public String call(Tuple2<String, String> tuple2) {

                        return tuple2._2();

                    }

                }

        );

// 2. Obtain the field attribute of each row.

        JavaDStream<Tuple3<String, String, Integer>> records = lines.map(

                new Function<String, Tuple3<String, String, Integer>>() {

                    public Tuple3<String, String, Integer> call(String line) throws Exception {

                        String[] elems = line.split(",");

                        return new Tuple3<String, String, Integer>(elems[0], elems[1], Integer.parseInt(elems[2]));

                    }

                }

        );

// 3. Filter the Internet access time data of female netizens.

        JavaDStream<Tuple2<String, Integer>> femaleRecords = records.filter(new Function<Tuple3<String, String, Integer>, Boolean>() {

            public Boolean call(Tuple3<String, String, Integer> line) throws Exception {

                if (line._2().equals("female")) {

                    return true;

                } else {

                    return false;

                }

            }

        }).map(new Function<Tuple3<String, String, Integer>, Tuple2<String, Integer>>() {

            public Tuple2<String, Integer> call(Tuple3<String, String, Integer> stringStringIntegerTuple3) throws Exception {

                return new Tuple2<String, Integer>(stringStringIntegerTuple3._1(), stringStringIntegerTuple3._3());

            }

        });

        JavaDStream<Tuple2<String, Integer>> upTimeUser = femaleRecords.filter(new Function<Tuple2<String, Integer>, Boolean>() {

            public Boolean call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {

                if (stringIntegerTuple2._2() > 30) {

                    return true;

                } else {

                    return false;

                }

            }

        });

// 4. Filter the users whose online duration exceeds the threshold and obtain the result.

        upTimeUser.print();

// 5. Start the Spark Streaming system.

        jssc.start();

        jssc.awaitTermination();

    }
}

Spark Streaming Write To Kafka Sample Code

The following code snippets are used as an example. For complete code, see com.huawei.bigdata.spark.examples.FemaleInfoCollectionKafka.

note

After Spark is upgraded, the new API createDirectStream is recommended. The old API createStream still exists, but its performance and stability are poor. You are advised not to use it to develop applications.

// Parameter parsing:

// <checkPointDir> is the checkpoint directory.

// <batchTime> is the interval for processing Spark Streaming in batches.

// <windowTime> is the time window of the statistics, in seconds.

// <topics> is the Kafka topic to subscribe to. Use commas (,) to separate multiple topics.

// <brokers> is the Kafka address for obtaining metadata.

public class FemaleInfoCollectionKafka {

    public static void main(String[] args) throws Exception {

        String checkPointDir = args[0];

        String batchTime = args[1];

        final String windowTime = args[2];

        String topics = args[3];

        String brokers = args[4]; 

        Duration batchDuration = Durations.seconds(Integer.parseInt(batchTime));

        Duration windowDuration = Durations.seconds(Integer.parseInt(windowTime));

        SparkConf conf = new SparkConf().setAppName("DataSightStreamingExample");

        JavaStreamingContext jssc = new JavaStreamingContext(conf, batchDuration);

// Set the CheckPoint directory of Spark Streaming. This is mandatory because window operations are used.

        jssc.checkpoint(checkPointDir);

// Assemble the Kafka topic list.

        HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));

        HashMap<String, String> kafkaParams = new HashMap<String, String>();

        kafkaParams.put("metadata.broker.list", brokers);

// Create kafka stream using brokers and topics.

// 1. Receive data from Kafka and generate the corresponding DStream.

        JavaDStream<String> lines = KafkaUtils.createDirectStream(jssc,String.class,String.class,

                StringDecoder.class,  StringDecoder.class, kafkaParams, topicsSet).map(

                new Function<Tuple2<String, String>, String>() {

                    @Override

                    public String call(Tuple2<String, String> tuple2) {

                        return tuple2._2();

                    }

                }

        );

// 2. Obtain the field attribute of each row.

        JavaDStream<Tuple3<String, String, Integer>> records = lines.map(

                new Function<String, Tuple3<String, String, Integer>>() {

                    @Override

                    public Tuple3<String, String, Integer> call(String line) throws Exception {

                        String[] elems = line.split(",");

                        return new Tuple3<String, String, Integer>(elems[0], elems[1], Integer.parseInt(elems[2]));

                    }

                }

        );

// 3. Filter the Internet access time data of female netizens.

        JavaDStream<Tuple2<String, Integer>> femaleRecords = records.filter(new Function<Tuple3<String, String, Integer>, Boolean>() {

            public Boolean call(Tuple3<String, String, Integer> line) throws Exception {

                if (line._2().equals("female")) {

                    return true;

                } else {

                    return false;

                }

            }

        }).map(new Function<Tuple3<String, String, Integer>, Tuple2<String, Integer>>() {

            public Tuple2<String, Integer> call(Tuple3<String, String, Integer> stringStringIntegerTuple3) throws Exception {

                return new Tuple2<String, Integer>(stringStringIntegerTuple3._1(), stringStringIntegerTuple3._3());

            }

        });

// 4. Summarize the Internet access time of each female in a time window.

        JavaPairDStream<String, Integer> aggregateRecords = JavaPairDStream.fromJavaDStream(femaleRecords)

                .reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {

                    public Integer call(Integer integer, Integer integer2) throws Exception {

                        return integer + integer2;

                    }

                }, new Function2<Integer, Integer, Integer>() {

                    public Integer call(Integer integer, Integer integer2) throws Exception {

                        return integer - integer2;

                    }

                }, windowDuration, batchDuration);

// 5. Filter the users whose online duration exceeds the threshold.

        JavaDStream<Tuple2<String, Integer>> upTimeUser = aggregateRecords.filter(new Function<Tuple2<String, Integer>, Boolean>() {

            public Boolean call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {

                if (stringIntegerTuple2._2() > 0.9 * Integer.parseInt(windowTime)) {

                    return true;

                } else {

                    return false;

                }

            }

        }).toJavaDStream();

// 6. Configure the Kafka producer properties.

        Properties producerConf = new Properties();

        producerConf.put("serializer.class", "kafka.serializer.DefaultEncoder");

        producerConf.put("key.serializer.class", "kafka.serializer.StringEncoder");

        producerConf.put("metadata.broker.list", brokers);

        producerConf.put("request.required.acks", "1");

// 7. Send the result as messages to Kafka. The message topic is "default", and each message is sent to a random partition.

        JavaDStreamKafkaWriterFactory.fromJavaDStream(upTimeUser).writeToKafka(producerConf,

                new ProcessingFunc());

// 8. Start the Spark Streaming system.

        jssc.start();

        jssc.awaitTermination();

    }

    static class ProcessingFunc implements Function<Tuple2<String, Integer>, KeyedMessage<String, byte[]>> {

        public KeyedMessage<String, byte[]> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {

            String words = stringIntegerTuple2._1() + "," + stringIntegerTuple2._2().toString();

            return new KeyedMessage<String, byte[]>("default", null, words.getBytes());

        }

    }

}
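To check the messages that the application writes to the "default" topic, one option is the console consumer shipped with the Kafka client. The following is a sketch only; the ZooKeeper address, port, and chroot path are placeholders for your cluster.

kafka-console-consumer.sh --zookeeper 192.168.0.11:24002/kafka --topic default --from-beginning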

1.1.1.3.2 Scala Code Example

Function

Collect statistics on female netizens who continuously dwell on online shopping for over half an hour. Print statistics or write them to Kafka.

Spark Streaming Write To Print Sample Code

The following code snippets are used as an example. For complete code, see com.huawei.bigdata.spark.examples.FemaleInfoCollectionPrint.

// Parameter parsing:

// <checkPointDir> is the checkpoint directory.

// <batchTime> is the interval for processing Spark Streaming in batches.

// <topics> is the Kafka topic to subscribe to. Use commas (,) to separate multiple topics.

// <brokers> is the Kafka address for obtaining metadata.

    val Array(checkPointDir, batchTime, topics, brokers) = args

    val batchDuration = Seconds(batchTime.toInt)

// Set up the Spark Streaming startup environment.

    val sparkConf = new SparkConf()

    sparkConf.setAppName("DataSightStreamingExample")

    val ssc = new StreamingContext(sparkConf, batchDuration)

// Set the CheckPoint directory of Spark Streaming.

    ssc.checkpoint(checkPointDir)

// Assemble the Kafka topic list.

     val topicsSet = topics.split(",").toSet

// Create kafka stream using brokers and topics.

// 1. Receive data from Kafka and generate the corresponding DStream.

    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)

    val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](

      ssc, kafkaParams, topicsSet).map(_._2)

// 2. Obtain the field attribute of each row.
    val records = lines.map(getRecord)

// 3. Filter the data of female netizens' Internet access time.

    val femaleRecords = records.filter(_._2 == "female")

      .map(x => (x._1, x._3))

// 4. Filter the users whose online duration exceeds the threshold and obtain the result.
    val upTimeUser = femaleRecords.filter(_._2 > 30)
    upTimeUser.print()

// 5. Start the Spark Streaming system.
    ssc.start()
    ssc.awaitTermination()

The preceding code uses the following function:

// Function for obtaining the fields of a record.

  def getRecord(line: String): (String, String, Int) = {

    val elems = line.split(",")

    val name = elems(0)

    val sexy = elems(1)

    val time = elems(2).toInt

    (name, sexy, time)

  }

Spark Streaming Write To Kafka Sample Code

The following code snippets are used as an example. For complete code, see com.huawei.bigdata.spark.examples.FemaleInfoCollectionKafka.

note

After Spark is upgraded, the new API createDirectStream is recommended. The old API createStream still exists, but its performance and stability are poor. You are advised not to use it to develop applications.

// Parameter parsing:

// <checkPointDir> is the checkpoint directory.

// <batchTime> is the interval for processing Spark Streaming in batches.

// <windowTime> is the time window of the statistics, in seconds.

// <topics> is the Kafka topic to subscribe to. Use commas (,) to separate multiple topics.

// <brokers> is the Kafka address for obtaining metadata.

    val Array(checkPointDir, batchTime, windowTime, topics, brokers) = args

    val batchDuration = Seconds(batchTime.toInt)

    val windowDuration = Seconds(windowTime.toInt)

// Set up the Spark Streaming startup environment.

    val sparkConf = new SparkConf()

    sparkConf.setAppName("DataSightStreamingExample")

    val ssc = new StreamingContext(sparkConf, batchDuration)

// Set the CheckPoint directory of Spark Streaming. This is mandatory because window operations are used.

    ssc.checkpoint(checkPointDir)

// Assemble the Kafka topic list.

    val topicsSet = topics.split(",").toSet

// Create kafka stream using brokers and topics.

// 1. Receive data from Kafka and generate the corresponding DStream.

    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)

    val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](

      ssc, kafkaParams, topicsSet).map(_._2)

// 2. Obtain the field attribute of each row.

    val records = lines.map(getRecord)

// 3. Filter the data of female netizens' Internet access time.

    val femaleRecords = records.filter(_._2 == "female")

      .map(x=>(x._1, x._3))

// 4. Summarize the Internet access time of each female in a time window.

    val aggregateRecords = femaleRecords

      .reduceByKeyAndWindow(_ + _, _ - _, windowDuration)

// 5. Filter the users whose online duration exceeds the threshold.

    val upTimeUser = aggregateRecords

      .filter(_._2 > 0.9 * windowTime.toInt)

// 6. Configure the Kafka producer properties.

    val producerConf = new Properties()

    producerConf.put("serializer.class","kafka.serializer.DefaultEncoder")

    producerConf.put("key.serializer.class","kafka.serializer.StringEncoder")

    producerConf.put("metadata.broker.list", brokers)

    producerConf.put("request.required.acks", "1")

// 7. Send the result as messages to Kafka. The message topic is "default", and each message is sent to a random partition.

    upTimeUser.writeToKafka(producerConf, (x: (String, Long)) => {

      val t = x._1 + "," + x._2.toString

      new KeyedMessage[String, Array[Byte]]("default", null, t.getBytes())

    })

// 8. Start the Spark Streaming system.

    ssc.start()

    ssc.awaitTermination()

The preceding code uses the following function:

// Function for obtaining the fields of a record.

  def getRecord(line: String): (String, String, Int) = {

    val elems = line.split(",")

    val name = elems(0)

    val sexy = elems(1)

    val time = elems(2).toInt

    (name, sexy, time)

  }

1.1.1.4 Obtaining Sample Code

Using the FusionInsight Client

Obtain the sample project from the sampleCode directory under the Spark directory in the FusionInsight_Services_ClientConfig package extracted from the client.

Security mode: SparkStreamingJavaExample and SparkStreamingScalaExample in the spark-examples-security directory

Non-security mode: SparkStreamingJavaExample and SparkStreamingScalaExample in the spark-examples-normal directory

Using the Maven Project

Log in to Huawei DevCloud (https://codehub-cn-south-1.devcloud.huaweicloud.com/codehub/7076065/home) and download the code to your local PC.

Security mode:

components/spark/spark-examples-security/SparkJavaExample

components/spark/spark-examples-security/SparkScalaExample

Non-security mode:

components/spark/spark-examples-normal/SparkJavaExample

components/spark/spark-examples-normal/SparkScalaExample

1.1.1.5 Application Commissioning

1.1.1.5.1 Compiling and Running the Application

Scenario

After the program code is developed, you can upload the code to the Linux client for running. The running procedures of applications developed in Scala or Java are the same.

note

l  The Spark application can run only in the Linux environment, not in the Windows environment.

l  A Spark application developed in Python does not need to be built into a jar through Artifacts. You only need to copy the sample project to the machine where it will run.

l  Ensure that the Python version installed on the workers is consistent with that on the driver; otherwise, the following error is reported: "Python in worker has different version %s than that in driver %s." One way to pin the interpreter is shown in the sketch after this list.
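A minimal sketch of pinning the interpreter, assuming a bash shell on the client and that /usr/bin/python2.7 (a placeholder path) is installed on every node, is to point both the driver and the executors at the same Python executable before submitting. In security mode, also add the spark.yarn.keytab and spark.yarn.principal options described in the Procedure below.

export PYSPARK_PYTHON=/usr/bin/python2.7
bin/spark-submit --master yarn-client --conf spark.executorEnv.PYSPARK_PYTHON=/usr/bin/python2.7 /opt/female/SparkPythonExample/collectFemaleInfo.py <inputPath>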

Procedure

Step 1      In IntelliJ IDEA, configure the Artifacts information of the project before creating the jar.

1.         On the main page of IDEA, choose File > Project Structure... to open the Project Structure page.

2.         On the Project Structure page, select Artifacts, click + and choose Jar > From modules with dependencies....

Figure 1-1 Adding the Artifacts


3.         Select the corresponding module. The module corresponding to the Java sample projects is CollectFemaleInfo. Click OK.

Figure 1-2 Create Jar from Modules


4.         Configure the name, type, and output directory of the jar based on the actual conditions.

Figure 1-3 Configuring the basic information


5.         Right-click CollectFemaleInfo, choose Put into Output Root, and click Apply.

Figure 1-4 Put into Output Root


6.         Click OK.

Step 2      Create the jar.

1.         On the main page of the IDEA, choose Build > Build Artifacts....

Figure 1-5 Build Artifacts


2.         On the displayed menu, choose CollectFemaleInfo > Build to create a jar.

Figure 1-6 Build


3.         If the following information is displayed in the event log, the jar is created successfully. You can obtain the jar from the directory configured in Step 1.4.

21:25:43 Compilation completed successfully in 36 sec

Step 3      Copy the jar created in Step 2 to the Spark running environment (Spark client), for example, /opt/hadoopclient/Spark, to run the Spark application.

 

Notice

When a Spark task is running, it is prohibited to restart the HDFS service or restart all DataNode instances. Otherwise, the Spark task may fail, resulting in JobHistory data loss.

l   Run the sample projects of Spark Core (including Scala and Java).

Go to the Spark client directory and run the bin/spark-submit script to execute the code.

<inputPath> indicates the input directory in the HDFS.

bin/spark-submit --class com.huawei.bigdata.spark.examples.FemaleInfoCollection --master yarn-client /opt/female/FemaleInfoCollection.jar <inputPath>

l   Run the sample projects of Spark SQL (Java and Scala).

Go to the Spark client directory and run the bin/spark-submit script to execute the code.

<inputPath> indicates the input directory in the HDFS.

bin/spark-submit --class com.huawei.bigdata.spark.examples.FemaleInfoCollection --master yarn-client /opt/female/FemaleInfoCollection.jar <inputPath>

l   Run the sample projects of Spark Streaming (Java and Scala).

Go to the Spark client directory and run the bin/spark-submit script to execute the code.

note

The location of the Spark Streaming Kafka dependency package on the client is different from that of the other dependency packages: the Spark Streaming Kafka dependency package is in $SPARK_HOME/lib/streamingClient, whereas the other dependency packages are in $SPARK_HOME/lib. When running an application, add the --jars option to the spark-submit command to specify the path of the Spark Streaming Kafka dependency package. The following is an example:

--jars $SPARK_HOME/lib/streamingClient/kafka-clients-0.8.2.1.jar,$SPARK_HOME/lib/streamingClient/kafka_2.10-0.8.2.1.jar,$SPARK_HOME/lib/streamingClient/spark-streaming-kafka_2.10-1.5.1.jar

Example code for Spark Streaming Write To Print is as follows:

bin/spark-submit --master yarn-client --jars $SPARK_HOME/lib/streamingClient/kafka-clients-0.8.2.1.jar,$SPARK_HOME/lib/streamingClient/kafka_2.10-0.8.2.1.jar,$SPARK_HOME/lib/streamingClient/spark-streaming-kafka_2.10-1.5.1.jar --class com.huawei.bigdata.spark.examples.FemaleInfoCollectionPrint /opt/female/FemaleInfoCollectionPrint.jar <checkPointDir> <batchTime> <topics> <brokers>

Example code for Spark Streaming Write To Kafka is as follows:

bin/spark-submit --master yarn-client --jars $SPARK_HOME/lib/streamingClient/kafka-clients-0.8.2.1.jar,$SPARK_HOME/lib/streamingClient/kafka_2.10-0.8.2.1.jar,$SPARK_HOME/lib/streamingClient/spark-streaming-kafka_2.10-1.5.1.jar --class com.huawei.bigdata.spark.examples.FemaleInfoCollectionKafka /opt/female/FemaleInfoCollectionKafka.jar <checkPointDir> <batchTime> <windowTime> <topics> <brokers>

l   Run the sample projects of Accessing the Spark SQL Through JDBC (Java and Scala).

Go to the Spark client directory and run the java -cp command to execute the code.

java -cp $SPARK_HOME/lib/*:$SPARK_HOME/conf:/opt/female/ThriftServerQueriesTest.jar com.huawei.bigdata.spark.examples.ThriftServerQueriesTest $SPARK_HOME/conf/hive-site.xml $SPARK_HOME/conf/spark-defaults.conf

note

In the preceding command line, you can choose the minimal runtime dependency package based on the sample projects. For details of the runtime dependency packages, see References.

l   Run the Spark on HBase sample application (Java and Scala).

a.         Verify that the configuration options in the Spark client configuration file spark-defaults.conf are correctly configured.

When running the Spark on HBase sample application, set the configuration option spark.hbase.obtainToken.enabled in the Spark client configuration file spark-defaults.conf to true and set spark.inputFormat.cache.enabled to false. (The default value of spark.hbase.obtainToken.enabled is false. Changing it to true does not affect existing services. If you want to uninstall the HBase service, change the value back to false first.) Example spark-defaults.conf entries are shown after the run command below.

Table 1-1 Parameters

Parameter: spark.hbase.obtainToken.enabled
Description: Indicates whether to enable the function of obtaining the HBase token.
Default value: false

Parameter: spark.inputFormat.cache.enabled
Description: Indicates whether to cache the InputFormat that maps to HadoopRDD. If set to true, tasks in the same Executor use the same InputFormat object; in this case, the InputFormat must be thread-safe. If caching the InputFormat is not required, set this parameter to false.
Default value: true

 

b.         Go to the Spark client directory and run the bin/spark-submit script to execute the code.

Run sample applications in the sequence: TableCreation > TableInputData > TableOutputData.

When running the TableInputData sample application, specify <inputPath>. <inputPath> indicates the input path in the HDFS.

bin/spark-submit --class com.huawei.bigdata.spark.examples.TableInputData --master yarn-client /opt/female/TableInputData.jar <inputPath>
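For reference, the settings described in a correspond to entries in spark-defaults.conf like the following (the file uses space-separated key-value pairs):

spark.hbase.obtainToken.enabled true
spark.inputFormat.cache.enabled false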

l   Run the Spark HBase to HBase sample application (Scala and Java).

Go to the Spark client directory and run the bin/spark-submit script to execute the code.

bin/spark-submit --class com.huawei.bigdata.spark.examples.SparkHbasetoHbase --master yarn-client /opt/female/FemaleInfoCollection.jar

l   Run the Spark Hive to HBase sample application (Scala and Java).

Go to the Spark client directory and run the bin/spark-submit script to execute the code.

bin/spark-submit --class com.huawei.bigdata.spark.examples.SparkHivetoHbase --master yarn-client /opt/female/FemaleInfoCollection.jar

l   Run the Spark Streaming Kafka to HBase sample application (Scala and Java).

Go to the Spark client directory and run the bin/spark-submit script to execute the code.

When running the sample application, specify <checkPointDir>, <topic>, and <brokerList>. <checkPointDir> indicates the directory where the application result is backed up, <topic> indicates the topic read from Kafka, and <brokerList> indicates the IP address of the Kafka server.

note

On the client, the directory of Spark Streaming Kafka dependency package is different from the directory of other dependency packages. For example, the directory of another dependency package is $SPARK_HOME/lib and the directory of a Spark Streaming Kafka dependency package is $SPARK_HOME/lib/streamingClient. Therefore, when running the application, add the configuration option in the spark-submit command to specify the directory for the Spark Streaming Kafka dependency package, for example, --jars $SPARK_HOME/lib/streamingClient/kafka-clients-0.8.2.1.jar,$SPARK_HOME/lib/streamingClient/kafka_2.10-0.8.2.1.jar,$SPARK_HOME/lib/streamingClient/spark-streaming-kafka_2.10-1.5.1.jar.

Example code of Spark Streaming To HBase

bin/spark-submit --master yarn-client --jars $SPARK_HOME/lib/streamingClient/kafka-clients-0.8.2.1.jar,$SPARK_HOME/lib/streamingClient/kafka_2.10-0.8.2.1.jar,$SPARK_HOME/lib/streamingClient/spark-streaming-kafka_2.10-1.5.1.jar --class com.huawei.bigdata.spark.examples.streaming.SparkOnStreamingToHbase /opt/female/FemaleInfoCollectionPrint.jar <checkPointDir> <topic> <brokerList>

l   Submit the application developed in Python.

Go to the Spark client directory and run the bin/spark-submit script to execute the code.

<inputPath> indicates the input directory in the HDFS.

note

Because the sample code does not contain authentication information, specify it by configuring the spark.yarn.keytab and spark.yarn.principal parameters when running the application.

bin/spark-submit --master yarn-client --conf spark.yarn.keytab=/opt/FIclient/user.keytab --conf spark.yarn.principal=sparkuser /opt/female/SparkPythonExample/collectFemaleInfo.py <inputPath>

----End

References

The runtime dependency packages for the sample projects of Accessing the Spark SQL Through JDBC (Java and Scala) are as follows:

l   The sample projects of Accessing the Spark SQL Through JDBC (Scala):

           avro-1.7.7.jar

           commons-collections-3.2.2.jar

           commons-configuration-1.6.jar

           commons-io-2.4.jar

           commons-lang-2.6.jar

           commons-logging-1.1.3.jar

           guava-12.0.1.jar

           hadoop-auth-2.7.2.jar

           hadoop-common-2.7.2.jar

           hadoop-mapreduce-client-core-2.7.2.jar

           hive-exec-1.2.1.spark.jar

           hive-jdbc-1.2.1.spark.jar

           hive-metastore-1.2.1.spark.jar

           hive-service-1.2.1.spark.jar

           httpclient-4.5.2.jar

           httpcore-4.4.4.jar

           libthrift-0.9.3.jar

           log4j-1.2.17.jar

           slf4j-api-1.7.10.jar

           zookeeper-3.5.1.jar

           scala-library-2.10.4.jar

l   The sample projects of Accessing the Spark SQL Through JDBC (Java):

           commons-collections-3.2.2.jar

           commons-configuration-1.6.jar

           commons-io-2.4.jar

           commons-lang-2.6.jar

           commons-logging-1.1.3.jar

           guava-2.0.1.jar

           hadoop-auth-2.7.2.jar

           hadoop-common-2.7.2.jar

           hadoop-mapreduce-client-core-2.7.2.jar

           hive-exec-1.2.1.spark.jar

           hive-jdbc-1.2.1.spark.jar

           hive-metastore-1.2.1.spark.jar

           hive-service-1.2.1.spark.jar

           httpclient-4.5.2.jar

           httpcore-4.4.4.jar

           libthrift-0.9.3.jar

           log4j-1.2.17.jar

           slf4j-api-1.7.10.jar

           zookeeper-3.5.1.jar

1.1.1.5.2 Checking the Commissioning Result

Scenario

After a Spark application is run, you can check the running result through one of the following methods:

l   Viewing the command output.

l   Logging in to the Spark WebUI.

l   Viewing Spark logs.

Procedure

l   Check the operating result data of the Spark application.

The data storage directory and format are specified by users in the Spark application. You can obtain the data in the specified file.

l   Check the status of the Spark application.

Spark provides the following two web UIs:

           The Spark UI displays the status of applications being executed.

The Spark UI consists of the Spark Jobs, Spark Stages, Storage, Environment, and Executors parts. For a Streaming application, a Spark Streaming part is also displayed.

To access the UI: on the YARN web UI, find the corresponding Spark application and click ApplicationMaster in the last column of the application information to access the Spark UI.

           The History Server UI displays the status of all Spark applications.

The History Server UI displays information such as the application ID, application name, start time, end time, execution time, and user to whom the application belongs. After the application ID is clicked, the Spark UI of the application is displayed.

l   View Spark logs to learn application running conditions.

Spark logs offer immediate visibility into application running conditions, and you can adjust applications based on the logs. For log details, see the Spark section of the Log Description in the Administrator Guide.

 

