Spark Streaming Reads Data from Kafka and Writes Data to HBase (Spark: Case 8)

Latest reply: Oct 10, 2019 02:11:30

This post describes how a Spark Streaming application reads data from Kafka and writes the data to HBase. Details follow.


1.1.1 Case 8: Spark Streaming Reads Data from Kafka and Writes Data to HBase.

1.1.1.1 Scenario

Applicable Versions

FusionInsight HD V100R002C70, FusionInsight HD V100R002C80

Scenario

Assume that, in a service, Kafka receives the consumption records of five users every 30 seconds. The HBase table table1 stores the users' historical consumption information.

table1 contains 10 records, one for each of the users whose user names are 1 to 10. The initial historical consumption amount of every user is 0 CNY.

Based on the service requirements, a Spark application must be developed to implement the following function:

Calculate the consumption information of each user in real time. That is, total consumption amount of a user = consumption amount of the user (Kafka data) + historical consumption amount of the user (value in table1). The result is written back to table1.
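The update rule above can be sketched in plain Java. This is only an illustration and not part of the sample project: the class name ConsumptionTotalSketch, the helper methods, and the in-memory map that stands in for table1 are all assumptions made for the example, and the amounts are made up.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the update rule; a HashMap stands in for HBase table1.
final class ConsumptionTotalSketch {

    // Total consumption = amount in the incoming Kafka record + amount already stored.
    static int updatedTotal(int historyAmount, int incomingAmount) {
        return historyAmount + incomingAmount;
    }

    // Builds the initial state described in the scenario: users "1" to "10", each with 0 CNY.
    static Map<String, Integer> initialTable() {
        Map<String, Integer> table = new HashMap<>();
        for (int user = 1; user <= 10; user++) {
            table.put(String.valueOf(user), 0);
        }
        return table;
    }

    public static void main(String[] args) {
        Map<String, Integer> table1 = initialTable();
        // User "3" spends 25 CNY and then 10 CNY (made-up amounts).
        table1.put("3", updatedTotal(table1.get("3"), 25));
        table1.put("3", updatedTotal(table1.get("3"), 10));
        System.out.println("user 3 total = " + table1.get("3")); // prints "user 3 total = 35"
    }
}
```

In the real application the stored value lives in the cf:cid column of table1, so each update is a read followed by a write to HBase rather than a map operation.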

Data Planning

                              Step 1     Create an HBase table and insert data.

Ensure that the JDBCServer is started. On the Spark client, use the Beeline tool to create the table1 table.

1.        Run the following command to create an HBase table named table1:

create table table1
(
  key string,
  cid string
)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties("hbase.columns.mapping" = ":key,cf:cid")
tblproperties("hbase.table.name" = "table1");

2.        Run the following commands in the HBase shell to insert data into table1:

put 'table1', '1', 'cf:cid', '0'

put 'table1', '2', 'cf:cid', '0'

put 'table1', '3', 'cf:cid', '0'

put 'table1', '4', 'cf:cid', '0'

put 'table1', '5', 'cf:cid', '0'

put 'table1', '6', 'cf:cid', '0'

put 'table1', '7', 'cf:cid', '0'

put 'table1', '8', 'cf:cid', '0'

put 'table1', '9', 'cf:cid', '0'

put 'table1', '10', 'cf:cid', '0'

                              Step 2     Send the data of the Spark Streaming sample project to the Kafka component (a user with Kafka permissions is required).

1.        Ensure that the cluster is installed, including the HDFS, Yarn, Spark, and Kafka services.

2.        Change the value of the Kafka Broker configuration parameter allow.everyone.if.no.acl.found to true.

3.        Create a topic.

{zkQuorum} indicates ZooKeeper cluster information in the IP:port format.

$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper {zkQuorum}/kafka --replication-factor 1 --partitions 3 --topic {Topic}

4.        Start the Producer of Kafka to send data to Kafka.

{ClassPath} indicates the path for storing the JAR file of the project. The path is specified by users. Follow instructions in section "Compiling and Running the Application" to export the JAR file.

java -cp $SPARK_HOME/lib/*:$SPARK_HOME/lib/streamingClient/*:{ClassPath} com.huawei.bigdata.spark.examples.streaming.StreamingExampleProducer {BrokerList} {Topic}

----End

1.1.1.2 Development Guidelines

1.        Receive data from Kafka and generate the corresponding DStream.

2.        Filter and analyze data.

3.        Find the corresponding record in the HBase table.

4.        Calculate the result and write the result to the HBase table.
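The four steps above can be sketched without a cluster by using plain Java collections. This is a hedged illustration only: the class GuidelineStepsSketch and its methods are hypothetical, a HashMap stands in for table1, and a List stands in for one batch of Kafka message values. Like the sample code in this post, it treats each message value as both the row key and the amount to add.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of the development guidelines; not the sample project code.
final class GuidelineStepsSketch {

    // Step 2: keep only messages that look like a numeric user name.
    static List<String> filterValid(List<String> messages) {
        List<String> valid = new ArrayList<>();
        for (String message : messages) {
            String trimmed = message.trim();
            if (trimmed.matches("\\d{1,9}")) {
                valid.add(trimmed);
            }
        }
        return valid;
    }

    // Steps 3 and 4: read the old values first, then write all new values in one batch.
    static void applyBatch(Map<String, Integer> table, List<String> rowKeys) {
        Map<String, Integer> puts = new HashMap<>();
        for (String row : rowKeys) {
            Integer oldValue = table.get(row);                    // step 3: find the record
            if (oldValue != null) {                               // rows missing from the table are skipped
                puts.put(row, oldValue + Integer.parseInt(row));  // step 4: calculate the result
            }
        }
        table.putAll(puts);                                       // step 4: write the result back
    }

    public static void main(String[] args) {
        Map<String, Integer> table1 = new HashMap<>();
        for (int user = 1; user <= 10; user++) {
            table1.put(String.valueOf(user), 0);
        }
        // Step 1 stand-in: one batch of message values received from Kafka.
        List<String> batch = Arrays.asList("3", " 7 ", "abc", "42");
        applyBatch(table1, filterValid(batch));
        System.out.println(table1.get("3") + " " + table1.get("7")); // prints "3 7"
    }
}
```

Because all reads happen before any write, a row key that appears twice in the same batch is updated only once in this sketch.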

1.1.1.3 Sample Code Description

1.1.1.3.1 Java Code Example

Function

In a Spark application, Spark Streaming invokes the Kafka interface to obtain data, analyzes the data, finds the corresponding records in the HBase table, and writes the updated records back to the HBase table.

Sample Code

The following code snippets are used as an example. For complete code, see com.huawei.bigdata.spark.examples.SparkOnStreamingToHbase.

/**

 * Run the Spark Streaming task: use each Kafka message value as a row key to read data from the HBase table table1, perform the calculation, and write the updated data back to table1.

 */

public class SparkOnStreamingToHbase {

  public static void main(String[] args) throws Exception {

    if (args.length < 3) {

      printUsage();

    }

    String checkPointDir = args[0];

    String topics = args[1];

    final String brokers = args[2];

    Duration batchDuration = Durations.seconds(5);

    SparkConf sparkConf = new SparkConf().setAppName("SparkOnStreamingToHbase");

    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, batchDuration);

    // Set the checkpoint directory of Spark Streaming.

    if (!"nocp".equals(checkPointDir)) {

      jssc.checkpoint(checkPointDir);

    }

    final String columnFamily = "cf";

    HashMap<String, String> kafkaParams = new HashMap<String, String>();

    kafkaParams.put("metadata.broker.list", brokers);

    String[] topicArr = topics.split(",");

    Set<String> topicSet = new HashSet<String>(Arrays.asList(topicArr));

    // Create a Kafka direct stream using the brokers and topics.

    // Receive data from Kafka and generate the corresponding DStream.

    JavaDStream<String> lines = KafkaUtils.createDirectStream(jssc, String.class, String.class,

      StringDecoder.class, StringDecoder.class, kafkaParams, topicSet).map(

      new Function<Tuple2<String, String>, String>() {

        public String call(Tuple2<String, String> tuple2) {

          // tuple2._1() is the key of the message, and tuple2._2() is the value of the message.

          return tuple2._2();

        }

      }

    );

    lines.foreachRDD(

      new Function<JavaRDD<String>, Void>() {

        public Void call(JavaRDD<String> rdd) throws Exception {

          rdd.foreachPartition(

            new VoidFunction<Iterator<String>>() {

              public void call(Iterator<String> iterator) throws Exception {

                hBaseWriter(iterator, columnFamily);

              }

            }

          );

          return null;

        }

      }

    );

    jssc.start();

    jssc.awaitTermination();

  }

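  /**

   * Write data to HBase. This method runs on the executor.

   * @param iterator messages of one partition; each message is used as a row key

   * @param columnFamily HBase column family that holds the cid column

   */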

  private static void hBaseWriter(Iterator<String> iterator, String columnFamily) throws IOException {

    Configuration conf = HBaseConfiguration.create();

    Connection connection = null;

    Table table = null;

    try {

      connection = ConnectionFactory.createConnection(conf);

      table = connection.getTable(TableName.valueOf("table1"));

      List<Get> rowList = new ArrayList<Get>();

      while (iterator.hasNext()) {

        Get get = new Get(iterator.next().getBytes());

        rowList.add(get);

      }

      // Obtain data from table1.

      Result[] resultDataBuffer = table.get(rowList);

      // Configure data of table1.

      List<Put> putList = new ArrayList<Put>();

      for (int i = 0; i < resultDataBuffer.length; i++) {

        String row = new String(rowList.get(i).getRow());

        Result resultData = resultDataBuffer[i];

        if (!resultData.isEmpty()) {

          // Obtain the old value based on the column family and column.

          String aCid = Bytes.toString(resultData.getValue(columnFamily.getBytes(), "cid".getBytes()));

          Put put = new Put(Bytes.toBytes(row));

          // Calculate the result.

          int resultValue = Integer.valueOf(row) + Integer.valueOf(aCid);

          put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("cid"), Bytes.toBytes(String.valueOf(resultValue)));

          putList.add(put);

        }

      }

      if (putList.size() > 0) {

        table.put(putList);

      }

    } catch (IOException e) {

      e.printStackTrace();

    } finally {

      if (table != null) {

        try {

          table.close();

        } catch (IOException e) {

          e.printStackTrace();

        }

      }

      if (connection != null) {

        try {

          // Close the HBase connection.

          connection.close();

        } catch (IOException e) {

          e.printStackTrace();

        }

      }

    }

  }

  private static void printUsage() {

    System.out.println("Usage: {checkPointDir} {topic} {brokerList}");

    System.exit(1);

  }

}
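The argument handling at the top of main can also be isolated into a small helper, which makes the three positional parameters easier to check before the streaming context is created. The following is a minimal sketch under stated assumptions: the class StreamingArgsSketch is hypothetical and not part of the sample project, and it throws an exception instead of calling System.exit so that it can be exercised in isolation.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical helper that mirrors the {checkPointDir} {topic} {brokerList} arguments.
final class StreamingArgsSketch {
    final String checkPointDir; // null means checkpointing is disabled ("nocp")
    final Set<String> topics;   // topics split on commas, without blanks or duplicates
    final String brokers;       // passed unchanged as metadata.broker.list

    private StreamingArgsSketch(String checkPointDir, Set<String> topics, String brokers) {
        this.checkPointDir = checkPointDir;
        this.topics = topics;
        this.brokers = brokers;
    }

    static StreamingArgsSketch parse(String[] args) {
        if (args.length < 3) {
            throw new IllegalArgumentException("Usage: {checkPointDir} {topic} {brokerList}");
        }
        Set<String> topics = new LinkedHashSet<>();
        for (String topic : args[1].split(",")) {
            String trimmed = topic.trim();
            if (!trimmed.isEmpty()) {
                topics.add(trimmed);
            }
        }
        if (topics.isEmpty()) {
            throw new IllegalArgumentException("At least one topic is required");
        }
        String checkPointDir = "nocp".equals(args[0]) ? null : args[0];
        return new StreamingArgsSketch(checkPointDir, topics, args[2]);
    }

    public static void main(String[] args) {
        // Made-up values for illustration only.
        StreamingArgsSketch parsed = parse(new String[] {"nocp", "topicA, topicB", "host1:9092"});
        System.out.println(parsed.topics + " checkpointing=" + (parsed.checkPointDir != null));
    }
}
```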

1.1.1.3.2 Scala Code Example

Function

In a Spark application, Spark Streaming invokes the Kafka interface to obtain data, analyzes the data, finds the corresponding records in the HBase table, and writes the updated records back to the HBase table.

Sample Code

The following code snippets are used as an example. For complete code, see com.huawei.bigdata.spark.examples.SparkOnStreamingToHbase.

/**

  * Run the Spark Streaming task: use each Kafka message value as a row key to read data from the HBase table table1, perform the calculation, and write the updated data back to table1.

  */

object SparkOnStreamingToHbase {

  def main(args: Array[String]) {

    if (args.length < 3) {

      printUsage

    }

    val Array(checkPointDir, topics, brokers) = args

    val sparkConf = new SparkConf().setAppName("SparkOnStreamingToHbase")

    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Set the checkpoint directory of Spark Streaming.

    if (!"nocp".equals(checkPointDir)) {

      ssc.checkpoint(checkPointDir)

    }

    val columnFamily = "cf"

    val kafkaParams = Map[String, String](

      "metadata.broker.list" -> brokers

    )

    val topicArr = topics.split(",")

    val topicSet = topicArr.toSet

    // map(_._1) is the key of the message, and map(_._2) is the value of the message.

    val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet).map(_._2)

    lines.foreachRDD(rdd => {

      // The code for each partition runs on the executor.

      rdd.foreachPartition(iterator => hBaseWriter(iterator, columnFamily))

    })

    ssc.start()

    ssc.awaitTermination()

  }

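  /**

   * Write data to HBase. This method runs on the executor.

   * @param iterator messages of one partition; each message is used as a row key

   * @param columnFamily HBase column family that holds the cid column

   */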

  def hBaseWriter(iterator: Iterator[String], columnFamily: String): Unit = {

    val conf = HBaseConfiguration.create()

    var table: Table = null

    var connection: Connection = null

    try {

      connection = ConnectionFactory.createConnection(conf)

      table = connection.getTable(TableName.valueOf("table1"))

      val iteratorArray = iterator.toArray

      val rowList = new util.ArrayList[Get]()

      for (row <- iteratorArray) {

        val get = new Get(row.getBytes)

        rowList.add(get)

      }

      // Obtain table1 data.

      val resultDataBuffer = table.get(rowList)

      // Set the table1 data.

      val putList = new util.ArrayList[Put]()

      for (i <- 0 until iteratorArray.size) {

        val row = iteratorArray(i)

        val resultData = resultDataBuffer(i)

        if (!resultData.isEmpty) {

          // Obtain the old value based on the column family and column.

          val aCid = Bytes.toString(resultData.getValue(columnFamily.getBytes, "cid".getBytes))

          val put = new Put(Bytes.toBytes(row))

          // Calculate the result.

          val resultValue = row.toInt + aCid.toInt

          put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes("cid"), Bytes.toBytes(resultValue.toString))

          putList.add(put)

        }

      }

      if (putList.size() > 0) {

        table.put(putList)

      }

    } catch {

      case e: IOException =>

        e.printStackTrace()

    } finally {

      if (table != null) {

        try {

          table.close()

        } catch {

          case e: IOException =>

            e.printStackTrace()

        }

      }

      if (connection != null) {

        try {

          // Close the HBase connection.

          connection.close()

        } catch {

          case e: IOException =>

            e.printStackTrace()

        }

      }

    }

  }

  private def printUsage {

    System.out.println("Usage: {checkPointDir} {topic} {brokerList}")

    System.exit(1)

  }

}

1.1.1.4 Obtaining Sample Code

Using the FusionInsight Client

Obtain the sample project from the sampleCode directory in the Spark2x directory of the FusionInsight_Services_ClientConfig file extracted from the client.

Security mode: SparkStreamingKafka010JavaExample and SparkStreamingKafka010ScalaExample in the spark-examples-security directory

Non-security mode: SparkStreamingKafka010JavaExample and SparkStreamingKafka010ScalaExample in the spark-examples-normal directory

Using the Maven Project

Log in to Huawei DevCloud (https://codehub-cn-south-1.devcloud.huaweicloud.com/codehub/7076065/home) and download the code under the following directories to your local PC.

Security mode:

components/spark/spark-examples-security/SparkJavaExample

components/spark/spark-examples-security/SparkScalaExample

Non-security mode:

components/spark/spark-examples-normal/SparkJavaExample

components/spark/spark-examples-normal/SparkScalaExample

1.1.1.5 Application Commissioning

1.1.1.5.1 Compiling and Running the Application

Scenario

After the program code is developed, you can upload it to the Linux client and run it there. The running procedures of applications developed in Scala and in Java are the same.

Note

•  The Spark application can run only in the Linux environment, not in the Windows environment.

•  A Spark application developed in Python does not need to be built into a jar artifact. You just need to copy the sample projects to the compiler.

•  Ensure that the Python version installed on the worker is the same as that on the driver. Otherwise, the following error is reported: "Python in worker has different version %s than that in driver %s."

Procedure

                              Step 1     In IntelliJ IDEA, configure the Artifacts information of the project before the jar is created.

1.        On the main page of IDEA, choose File > Project Structure... to enter the Project Structure page.

2.        On the Project Structure page, select Artifacts, click + and choose Jar > From modules with dependencies....

Figure 1-1 Adding the Artifacts

3.        Select the corresponding module. The module corresponding to the Java sample projects is CollectFemaleInfo. Click OK.

Figure 1-2 Create Jar from Modules

4.        Configure the name, type and output directory of the Jar based on the actual condition.

Figure 1-3 Configuring the basic information

5.        Right-click CollectFemaleInfo, choose Put into Output Root, and click Apply.

Figure 1-4 Put into Output Root

6.        Click OK.

                              Step 2     Create the jar.

1.        On the main page of IDEA, choose Build > Build Artifacts....

Figure 1-5 Build Artifacts

2.        On the displayed menu, choose CollectFemaleInfo > Build to create a jar.

Figure 1-6 Build

3.        If the following information is displayed in the event log, the jar is created successfully. You can obtain the jar from the directory configured in Step 1.4.

21:25:43 Compilation completed successfully in 36 sec

                              Step 3     Copy the jar created in Step 2 to the Spark running environment (Spark client), such as /opt/hadoopclient/Spark, to run the Spark application.

 

Notice

When a Spark task is running, it is prohibited to restart the HDFS service or restart all DataNode instances. Otherwise, the Spark task may fail, resulting in JobHistory data loss.

•  Run the sample projects of Spark Core (including Scala and Java).

Access the Spark client directory and run the bin/spark-submit script to run the code.

<inputPath> indicates the input directory in the HDFS.

bin/spark-submit --class com.huawei.bigdata.spark.examples.FemaleInfoCollection --master yarn-client /opt/female/FemaleInfoCollection.jar <inputPath>

•  Run the sample projects of Spark SQL (Java and Scala).

Access the Spark client directory and run the bin/spark-submit script to run the code.

<inputPath> indicates the input directory in the HDFS.

bin/spark-submit --class com.huawei.bigdata.spark.examples.FemaleInfoCollection --master yarn-client /opt/female/FemaleInfoCollection.jar <inputPath>

•  Run the sample projects of Spark Streaming (Java and Scala).

Access the Spark client directory and run the bin/spark-submit script to run the code.

Note

On the client, the Spark Streaming Kafka dependency packages are stored in a different location from the other dependency packages: the Spark Streaming Kafka dependency packages are in $SPARK_HOME/lib/streamingClient, whereas the other dependency packages are in $SPARK_HOME/lib. Therefore, when running an application, you must add the --jars option to the spark-submit command to specify the path of the Spark Streaming Kafka dependency packages. The following is an example:

--jars $SPARK_HOME/lib/streamingClient/kafka-clients-0.8.2.1.jar,$SPARK_HOME/lib/streamingClient/kafka_2.10-0.8.2.1.jar,$SPARK_HOME/lib/streamingClient/spark-streaming-kafka_2.10-1.5.1.jar

The example command for Spark Streaming Write To Print is as follows:

bin/spark-submit --master yarn-client --jars $SPARK_HOME/lib/streamingClient/kafka-clients-0.8.2.1.jar,$SPARK_HOME/lib/streamingClient/kafka_2.10-0.8.2.1.jar,$SPARK_HOME/lib/streamingClient/spark-streaming-kafka_2.10-1.5.1.jar --class com.huawei.bigdata.spark.examples.FemaleInfoCollectionPrint /opt/female/FemaleInfoCollectionPrint.jar <checkPointDir> <batchTime> <topics> <brokers>

The example command for Spark Streaming Write To Kafka is as follows:

bin/spark-submit --master yarn-client --jars $SPARK_HOME/lib/streamingClient/kafka-clients-0.8.2.1.jar,$SPARK_HOME/lib/streamingClient/kafka_2.10-0.8.2.1.jar,$SPARK_HOME/lib/streamingClient/spark-streaming-kafka_2.10-1.5.1.jar --class com.huawei.bigdata.spark.examples.FemaleInfoCollectionKafka /opt/female/FemaleInfoCollectionKafka.jar <checkPointDir> <batchTime> <windowTime> <topics> <brokers>

•  Run the sample projects of Accessing the Spark SQL Through JDBC (Java and Scala).

Access the Spark client directory and run the java -cp command to run the code.

java -cp $SPARK_HOME/lib/*:$SPARK_HOME/conf:/opt/female/ThriftServerQueriesTest.jar com.huawei.bigdata.spark.examples.ThriftServerQueriesTest $SPARK_HOME/conf/hive-site.xml $SPARK_HOME/conf/spark-defaults.conf

Note

In the preceding command line, you can choose the minimal set of runtime dependency packages based on the sample project. For details about the runtime dependency packages, see References.

•  Run the Spark on HBase sample application (Java and Scala).

a.        Verify that the configuration options in the Spark client configuration file spark-defaults.conf are correctly configured.

When running the Spark on HBase sample application, set the configuration option spark.hbase.obtainToken.enabled in the Spark client configuration file spark-defaults.conf to true. (The default value is false. Changing the value to true does not affect existing services. If you want to uninstall the HBase service, change the value back to false first.) Also set the configuration option spark.inputFormat.cache.enabled to false.

Table 1-1 Parameters

Parameter: spark.hbase.obtainToken.enabled

Description: Indicates whether to enable the function of obtaining the HBase token.

Default value: false

Parameter: spark.inputFormat.cache.enabled

Description: Indicates whether to cache the InputFormat that maps to HadoopRDD. If the parameter is set to true, the tasks of the same Executor use the same InputFormat object. In this case, the InputFormat must be thread-safe. If caching the InputFormat is not required, set the parameter to false.

Default value: true
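Before submitting the application, the two settings can be checked programmatically. The sketch below is an assumption-laden illustration, not a FusionInsight tool: the class SparkDefaultsCheckSketch is hypothetical, and it relies on java.util.Properties accepting whitespace between key and value, which matches the usual "key value" layout of spark-defaults.conf. The fallback values are the defaults listed in Table 1-1.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical check of the two Spark on HBase settings described above.
final class SparkDefaultsCheckSketch {

    // Parses the text of a spark-defaults.conf file; "#" lines are treated as comments.
    static Properties parse(String confText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(confText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Returns true only if token retrieval is enabled and InputFormat caching is disabled.
    static boolean isReadyForHBase(String confText) {
        Properties props = parse(confText);
        boolean obtainToken = Boolean.parseBoolean(
                props.getProperty("spark.hbase.obtainToken.enabled", "false").trim());
        boolean cacheInputFormat = Boolean.parseBoolean(
                props.getProperty("spark.inputFormat.cache.enabled", "true").trim());
        return obtainToken && !cacheInputFormat;
    }

    public static void main(String[] args) {
        String conf = "spark.hbase.obtainToken.enabled true\n"
                + "spark.inputFormat.cache.enabled false\n";
        System.out.println(isReadyForHBase(conf)); // prints "true"
    }
}
```

In practice the text would be read from the spark-defaults.conf file in the Spark client configuration directory rather than from a string literal.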

 

b.        Access the Spark client directory and run the bin/spark-submit script to run the code.

Run the sample applications in the following sequence: TableCreation > TableInputData > TableOutputData.

When the TableInputData sample application is run, <inputPath> must be specified. <inputPath> indicates the input path in the HDFS.

bin/spark-submit --class com.huawei.bigdata.spark.examples.TableInputData --master yarn-client /opt/female/TableInputData.jar <inputPath>

•  Run the Spark HBase to HBase sample application (Scala and Java).

Access the Spark client directory and run the bin/spark-submit script to run the code.

bin/spark-submit --class com.huawei.bigdata.spark.examples.SparkHbasetoHbase --master yarn-client /opt/female/FemaleInfoCollection.jar

•  Run the Spark Hive to HBase sample application (Scala and Java).

Access the Spark client directory and run the bin/spark-submit script to run the code.

bin/spark-submit --class com.huawei.bigdata.spark.examples.SparkHivetoHbase --master yarn-client /opt/female/FemaleInfoCollection.jar

•  Run the Spark Streaming Kafka to HBase sample application (Scala and Java).

Access the Spark client directory and run the bin/spark-submit script to run the code.

When the sample application is run, specify <checkPointDir>, <topic>, and <brokerList>. <checkPointDir> indicates the directory where the application result is backed up, <topic> indicates the topic that is read from Kafka, and <brokerList> indicates the IP address of the Kafka server.

Note

On the client, the directory of the Spark Streaming Kafka dependency packages is different from the directory of the other dependency packages. For example, the directory of the other dependency packages is $SPARK_HOME/lib, and the directory of the Spark Streaming Kafka dependency packages is $SPARK_HOME/lib/streamingClient. Therefore, when running the application, add the --jars option to the spark-submit command to specify the Spark Streaming Kafka dependency packages, for example, --jars $SPARK_HOME/lib/streamingClient/kafka-clients-0.8.2.1.jar,$SPARK_HOME/lib/streamingClient/kafka_2.10-0.8.2.1.jar,$SPARK_HOME/lib/streamingClient/spark-streaming-kafka_2.10-1.5.1.jar.

The example command for Spark Streaming To HBase is as follows:

bin/spark-submit --master yarn-client --jars $SPARK_HOME/lib/streamingClient/kafka-clients-0.8.2.1.jar,$SPARK_HOME/lib/streamingClient/kafka_2.10-0.8.2.1.jar,$SPARK_HOME/lib/streamingClient/spark-streaming-kafka_2.10-1.5.1.jar --class com.huawei.bigdata.spark.examples.streaming.SparkOnStreamingToHbase /opt/female/FemaleInfoCollectionPrint.jar <checkPointDir> <topic> <brokerList>
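Because the --jars value is one long comma-separated string, missing or extra spaces easily break the command. One way to avoid that is to assemble the argument list in code, as in the following sketch. The class SubmitCommandSketch and its build method are hypothetical; the jar file names and option order are taken from the example command above, and sparkHome is assumed to be the value of $SPARK_HOME.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper that assembles the spark-submit arguments for a streaming application.
final class SubmitCommandSketch {

    static List<String> build(String sparkHome, String mainClass, String appJar, String... appArgs) {
        String dir = sparkHome + "/lib/streamingClient/";
        // The --jars value must be a single comma-separated token without spaces.
        String jars = String.join(",",
                dir + "kafka-clients-0.8.2.1.jar",
                dir + "kafka_2.10-0.8.2.1.jar",
                dir + "spark-streaming-kafka_2.10-1.5.1.jar");
        List<String> command = new ArrayList<>(Arrays.asList(
                "bin/spark-submit",
                "--master", "yarn-client",
                "--jars", jars,
                "--class", mainClass,
                appJar));
        command.addAll(Arrays.asList(appArgs)); // <checkPointDir> <topic> <brokerList>
        return command;
    }

    public static void main(String[] args) {
        // The checkpoint directory, topic, and broker values are made up for illustration.
        List<String> command = build(
                "/opt/hadoopclient/Spark",
                "com.huawei.bigdata.spark.examples.streaming.SparkOnStreamingToHbase",
                "/opt/female/FemaleInfoCollectionPrint.jar",
                "nocp", "topic1", "host1:9092");
        System.out.println(String.join(" ", command));
    }
}
```

The resulting list can be passed to java.lang.ProcessBuilder from the Spark client directory, or simply printed and pasted into a shell.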

•  Submit the application developed in Python.

Access the Spark client directory and run the bin/spark-submit script to run the code.

<inputPath> indicates the input directory in the HDFS.

Note

Because the sample code does not contain authentication information, specify the authentication information by configuring spark.yarn.keytab and spark.yarn.principal when the application is run.

bin/spark-submit --master yarn-client --conf spark.yarn.keytab=/opt/FIclient/user.keytab --conf spark.yarn.principal=sparkuser /opt/female/SparkPythonExample/collectFemaleInfo.py <inputPath>

----End

References

The runtime dependency packages for the sample projects of Accessing the Spark SQL Through JDBC (Java and Scala) are as follows:

•  The sample projects of Accessing the Spark SQL Through JDBC (Scala):

          avro-1.7.7.jar

          commons-collections-3.2.2.jar

          commons-configuration-1.6.jar

          commons-io-2.4.jar

          commons-lang-2.6.jar

          commons-logging-1.1.3.jar

          guava-12.0.1.jar

          hadoop-auth-2.7.2.jar

          hadoop-common-2.7.2.jar

          hadoop-mapreduce-client-core-2.7.2.jar

          hive-exec-1.2.1.spark.jar

          hive-jdbc-1.2.1.spark.jar

          hive-metastore-1.2.1.spark.jar

          hive-service-1.2.1.spark.jar

          httpclient-4.5.2.jar

          httpcore-4.4.4.jar

          libthrift-0.9.3.jar

          log4j-1.2.17.jar

          slf4j-api-1.7.10.jar

          zookeeper-3.5.1.jar

          scala-library-2.10.4.jar

•  The sample projects of Accessing the Spark SQL Through JDBC (Java):

          commons-collections-3.2.2.jar

          commons-configuration-1.6.jar

          commons-io-2.4.jar

          commons-lang-2.6.jar

          commons-logging-1.1.3.jar

          guava-12.0.1.jar

          hadoop-auth-2.7.2.jar

          hadoop-common-2.7.2.jar

          hadoop-mapreduce-client-core-2.7.2.jar

          hive-exec-1.2.1.spark.jar

          hive-jdbc-1.2.1.spark.jar

          hive-metastore-1.2.1.spark.jar

          hive-service-1.2.1.spark.jar

          httpclient-4.5.2.jar

          httpcore-4.4.4.jar

          libthrift-0.9.3.jar

          log4j-1.2.17.jar

          slf4j-api-1.7.10.jar

          zookeeper-3.5.1.jar

1.1.1.5.2 Checking the Commissioning Result

Scenario

After a Spark application is run, you can check the running result through one of the following methods:

•  Viewing the command output.

•  Logging in to the Spark WebUI.

•  Viewing Spark logs.

Procedure

•  Check the running result data of the Spark application.

The data storage directory and format are specified by users in the Spark application. You can obtain the data from the specified file.

•  Check the status of the Spark application.

Spark provides the following two web UIs:

          The Spark UI displays the status of applications being executed.

The Spark UI contains the Spark Jobs, Spark Stages, Storage, Environment, and Executors parts. In addition, the Spark Streaming part is displayed for Streaming applications.

To access the Spark UI, find the corresponding Spark application on the web UI of YARN and click ApplicationMaster in the last column of the application information.

          The History Server UI displays the status of all Spark applications.

The History Server UI displays information such as the application ID, application name, start time, end time, execution time, and the user to whom the application belongs. After you click an application ID, the Spark UI of that application is displayed.

•  View Spark logs to learn how the application is running.

Spark logs offer immediate visibility into the running status of an application, and you can adjust the application based on them. For details about the logs, see the Spark part of Log Description in the Administrator Guide.



chz
Created Oct 12, 2018 01:32:25


songminwang
Admin Created 5 days ago

Very useful!