
Solution: Real-Time Processing - Case 1

Latest reply: Aug 10, 2018 02:04:33

1.1.1 Case 1: Policing Surveillance

1.1.1.1 Scenarios

Applicable Versions

FusionInsight HD V100R002C70 and FusionInsight HD V100R002C80

Scenario Description

The surveillance system of a public security project records suspect information in advance and monitors hotel check-in records and Internet cafe access records in real time. When a suspect's track is found, the system reports it immediately. In this example, suspect tracks are saved to HBase, and the system queries HBase to simulate alarm generation.

Data Planning

Data source design:

- Hotel accommodation: Name, ID number, age, gender, hotel address, check-in time, check-out time, and companion

- Internet access in Internet cafes: Name, ID number, age, gender, Internet cafe address, Internet access date, and duration

- Checkpoint identity verification: Name, ID number, age, gender, checkpoint location, verification time, and travel mode (self-driving, taking vehicles, or walking)

Query:

- Operators can query the information in real time.
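The record layouts above can be sketched as Scala case classes with a comma-split parser. This is an illustrative sketch only; the class and field names are assumptions, not part of the example project.

```scala
// Illustrative models of the three record types; names are assumptions.
case class HotelRecord(name: String, id: String, age: Int, sex: String,
                       hotelAddr: String, checkIn: String, checkOut: String,
                       companion: String)

case class NetBarRecord(name: String, id: String, age: Int, sex: String,
                        netBarAddr: String, date: String, hours: Int)

case class CheckRecord(name: String, id: String, age: Int, sex: String,
                       location: String, time: String, travelMode: String)

object RecordParser {
  // Each line is one comma-separated record.
  def parseHotel(line: String): HotelRecord = {
    val f = line.split(",")
    HotelRecord(f(0), f(1), f(2).toInt, f(3), f(4), f(5), f(6), f(7))
  }
}
```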

Preset Data

- Hotel accommodation information: Use commas (,) to separate the fields in a record, one record per line. The records are saved to the HotelData.txt file. The file path must be the same as the value of hotelDataPath in the ParamsConf.properties file.

Zhang San,41541,23,male,Home Inn,2018-03-02,2018-03-21,Zhang Fei

Li Si,45641,21,male,Hotel Roma,2018-03-06,2018-03-12,Qi Luzhong

Liu Tao,21525,23,female,Home Inn,2018-02-02,2018-02-21,Qiao Fei

- Internet access information: Use commas (,) to separate the fields in a record, one record per line. The records are saved to the NetData.txt file. The file path must be the same as the value of netDataPath in the ParamsConf.properties file.

Ouyang Xiao,32641,23,male,M9 Internet Cafe,2018-03-22,5

Liu Peng,59684,23,male,Fu Shang Internet Cafe,2018-03-25,4

Li Jing,41563,22,female,Mangrove Internet Cafe,2018-04-02,3

- Checkpoint information: Use commas (,) to separate the fields in a record, one record per line. The records are saved to the CheckData.txt file. The file path must be the same as the value of checkDataPath in the ParamsConf.properties file.

Liu Yang,55632,23,male,subway station,2018-03-22,walking

Sun Jian,84554,23,male,highway toll station,2018-03-25,self-driving

Lin Yuner,41151,22,female,airport security check gate,2018-04-02,taking vehicles

- Surveillance information: Use commas (,) to separate the fields in a record, one record per line. The records are loaded into the controldata table of Hive.

548485,2018-01-02,ON,2018-01-02,2018-02-05,41541,Lin Tao

- Modify the user authentication information in the ParamsConf.properties file.

The values of userKeytabPath, krb5ConfPath, and userPrincipal must be the same as those applied for during security authentication.

- Replace the cluster configuration files (core-site.xml, hbase-site.xml, hdfs-site.xml, and hive-site.xml) in the resource directory.

- Example of the ParamsConf.properties file

The following is the ParamsConf.properties configuration file used in the example code.

#checkPoint=/user/test001/checkPoint
groupId=example-group1
topics=hotel,net,check
brokers=187.5.89.47:21005,187.5.89.66:21005,187.5.89.12:21005
hotelDataPath=/user/test001/HotelData.txt
netDataPath=/user/test001/NetData.txt
checkDataPath=/user/test001/CheckData.txt
zk.quorum=187.5.89.12:24002,187.5.89.66:24002,187.5.89.47:24002
zk.port=24002
# Security authentication
# User name
userPrincipal=test001
# Keytab file path
userKeytabPath=/opt/bigdataClient/user.keytab
# krb5 file path
krb5ConfPath=/opt/bigdataClient/krb5.conf
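The example code loads this file from the classpath with java.util.Properties and then splits the comma-separated values in code. A minimal sketch of that consumption (using inline content instead of a classpath resource):

```scala
import java.io.StringReader
import java.util.Properties

// Inline stand-in for the ParamsConf.properties classpath resource.
val conf =
  """topics=hotel,net,check
    |brokers=187.5.89.47:21005,187.5.89.66:21005,187.5.89.12:21005
    |hotelDataPath=/user/test001/HotelData.txt
    |""".stripMargin

val props = new Properties()
props.load(new StringReader(conf))

// "topics" holds three Kafka topic names in a fixed order:
// hotel accommodation, Internet cafe access, checkpoint verification.
val Array(hotelTopic, netTopic, gateTopic) = props.getProperty("topics").split(",")
```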

1.1.1.2 Development Idea

Figure 1-1 Development idea


 

1. Import the hotel accommodation, Internet cafe access, and checkpoint identity verification information to Kafka.

2. Use Spark Streaming to perform micro-batch processing on the Kafka data, filter personnel records against the Hive surveillance data, and insert the records that meet the conditions into HBase.

3. Implement real-time query based on the HBase data.
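The filtering in step 2 reduces to an inner join on the ID number between the incoming records and the Hive surveillance table. The same logic in plain Scala collections (a sketch without Spark; the sample values come from the preset data above):

```scala
// Surveillance list keyed by ID number (simplified from the Hive controldata table).
val watched = Map("41541" -> "Lin Tao")

// Incoming hotel records as (idNumber, fullRecord) pairs.
val hotelRecords = Seq(
  ("41541", "Zhang San,41541,23,male,Home Inn,2018-03-02,2018-03-21,Zhang Fei"),
  ("45641", "Li Si,45641,21,male,Hotel Roma,2018-03-06,2018-03-12,Qi Luzhong")
)

// Keep only records whose ID appears in the surveillance list --
// the Spark Streaming job does the same with RDD.join.
val hits = hotelRecords.filter { case (id, _) => watched.contains(id) }
```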

1.1.1.3 Example Code Description

Function Description

The code consists of the following classes:

- com.huawei.hbase.client.DataKafkaProducer

Reads the preset data from HDFS and inserts it into the Kafka message queue.

- com.huawei.hbase.client.SparkStreamingToHbase

Processes the Kafka message streams in real time, cleans the data, and saves the cleaned data to HBase.

- com.huawei.hbase.client.HbaseQueryByID

Simulates an HBase query.

Example Code

Read preconfigured data from HDFS and insert the data into the Kafka queue.

The following code snippet is only an example. For details, see com.huawei.hbase.client.DataKafkaProducer.

package com.huawei.hbase.client

import java.util.Properties
import java.util.logging.Logger

import com.huawei.hadoop.security.LoginUtil
import org.apache.hadoop.conf.Configuration
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.{SparkConf, SparkContext}

object DataKafkaProducer {

  def main(args: Array[String]): Unit = {
    val log = Logger.getLogger("DataKafkaProducer")
    val properties = new Properties()
    val in = this.getClass.getClassLoader.getResourceAsStream("ParamsConf.properties")
    properties.load(in)

    // Security authentication.
    val userPrincipal = properties.getProperty("userPrincipal")
    val userKeytabPath = properties.getProperty("userKeytabPath")
    val krb5ConfPath = properties.getProperty("krb5ConfPath")
    val hadoopConf: Configuration = new Configuration()
    LoginUtil.login(userPrincipal, userKeytabPath, krb5ConfPath, hadoopConf)
    log.info("Kerberos authentication succeeded")

    val conf = new SparkConf().setAppName("kafka-producer")
    val sc = new SparkContext(conf)

    // Load the topics: one per data source, in a fixed order.
    val topics = properties.getProperty("topics").split(",")
    val hotelTopic = topics(0)
    val netTopic = topics(1)
    val gateTopic = topics(2)
    val brokers = properties.getProperty("brokers")

    // Configure the Kafka producer.
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")

    // Load the input file paths.
    val hotelInputPath = properties.getProperty("hotelDataPath")
    val netInputPath = properties.getProperty("netDataPath")
    val checkInputPath = properties.getProperty("checkDataPath")

    // Read each input file into an RDD.
    val hotelFile = sc.textFile(hotelInputPath)
    val netFile = sc.textFile(netInputPath)
    val checkFile = sc.textFile(checkInputPath)

    // Use the information source as the record key so that consumers
    // can tell the data types apart.
    hotelFile.foreachPartition { lines =>
      val producer = new KafkaProducer[String, String](properties)
      while (lines.hasNext) {
        producer.send(new ProducerRecord[String, String](hotelTopic, "hotel", lines.next()))
      }
      producer.close()
    }

    netFile.foreachPartition { lines =>
      val producer = new KafkaProducer[String, String](properties)
      while (lines.hasNext) {
        producer.send(new ProducerRecord[String, String](netTopic, "netBar", lines.next()))
      }
      producer.close()
    }

    checkFile.foreachPartition { lines =>
      val producer = new KafkaProducer[String, String](properties)
      while (lines.hasNext) {
        producer.send(new ProducerRecord[String, String](gateTopic, "check", lines.next()))
      }
      producer.close()
    }

    sc.stop()
  }
}

Consume Kafka messages, clean data, and save the cleaned data to HBase.

The following code snippet is only an example. For details, see com.huawei.hbase.client.SparkStreamingToHbase.

    val ssc = new StreamingContext(sparkConf, Seconds(2))
    // Load the checkpoint directory.
    ssc.checkpoint(checkPoint)

    val brokers = property.getProperty("brokers")
    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> brokers,
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "group.id" -> groupId)
    val topics = property.getProperty("topics").split(",").toSet
    val locationStrategy = LocationStrategies.PreferConsistent
    val consumerStrategies = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    val lines = KafkaUtils.createDirectStream[String, String](ssc, locationStrategy, consumerStrategies)

    // Obtain the information about the suspects from Hive.
    val hiveContext = SparkSession.builder().appName("SparkStreamingToHbase").enableHiveSupport().getOrCreate()
    import hiveContext.implicits._
    hiveContext.sql("use test001")
    val controlDataSet = hiveContext.sql("SELECT * FROM controldata")
    // Key the surveillance records by ID number (column 5) for the join.
    val pp = controlDataSet.map(x => (x(5).toString,
      (x(0).toString, x(1).toString, x(2).toString, x(3).toString, x(4).toString, x(6).toString))).rdd

    // Write one partition of matched records to HBase, into the given column family.
    // Each record is the original comma-split field array; field 1 is the ID number,
    // which is also used as the row key.
    def writeToHBase(family: String, qualifiers: Array[String])(rows: Iterator[Array[String]]): Unit = {
      val myConf = HBaseConfiguration.create()
      myConf.set("hbase.zookeeper.quorum", property.getProperty("zk.quorum"))
      myConf.set("hbase.zookeeper.property.clientPort", property.getProperty("zk.port"))
      myConf.set("hbase.defaults.for.version.skip", "true")
      val myTable = new HTable(myConf, TableName.valueOf("ControlInfo"))
      myTable.setAutoFlush(false, false)
      myTable.setWriteBufferSize(3 * 1024 * 1024)
      while (rows.hasNext) {
        val s = rows.next()
        val p = new Put(Bytes.toBytes(s(1)))
        for (i <- qualifiers.indices) {
          p.add(family.getBytes, qualifiers(i).getBytes, Bytes.toBytes(s(i)))
        }
        myTable.put(p)
      }
      myTable.flushCommits()
      myTable.close()
    }

    // Check each record type. If a record matches the surveillance list, save it to HBase.
    lines.foreachRDD { rdd =>
      // The producer set each record's key to its information source, so the key
      // identifies the data type. Join with the surveillance data on the ID number
      // and drop unmatched records.
      def matched(key: String) =
        rdd.filter(r => key.equals(r.key()))
          .map(r => r.value().split(","))
          .map(h => (h(1), h)) // key by ID number
          .join(pp)
          .map(_._2._1)

      matched("hotel").foreachPartition(writeToHBase("hotel",
        Array("name", "identId", "age", "sex", "hotelAddr", "startTime", "leaveTime", "togetherPer")))
      matched("netBar").foreachPartition(writeToHBase("netBar",
        Array("name", "identId", "age", "sex", "netBarAddr", "netDate", "netTime")))
      matched("check").foreachPartition(writeToHBase("gateCheck",
        Array("name", "identId", "age", "sex", "checkLocation", "checkTime", "outType")))
    }
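The query class (com.huawei.hbase.client.HbaseQueryByID) is not listed above. Because the streaming job uses the ID number as the HBase row key, a query is a single Get. The following is only a sketch of what such a lookup might look like, assuming the same ControlInfo table and ZooKeeper settings; it is not the project's actual code.

```scala
import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Get, HTable}
import org.apache.hadoop.hbase.util.Bytes

// Hypothetical helper: look up all recorded tracks for one ID number.
def queryById(id: String, zkQuorum: String, zkPort: String): Unit = {
  val conf = HBaseConfiguration.create()
  conf.set("hbase.zookeeper.quorum", zkQuorum)
  conf.set("hbase.zookeeper.property.clientPort", zkPort)
  val table = new HTable(conf, TableName.valueOf("ControlInfo"))
  try {
    // The row key is the ID number, so one Get returns every track
    // (hotel, netBar, and gateCheck column families) for that suspect.
    val result = table.get(new Get(Bytes.toBytes(id)))
    for (cell <- result.rawCells()) {
      println(s"${Bytes.toString(CellUtil.cloneFamily(cell))}:" +
        s"${Bytes.toString(CellUtil.cloneQualifier(cell))} = " +
        s"${Bytes.toString(CellUtil.cloneValue(cell))}")
    }
  } finally {
    table.close()
  }
}
```

This sketch requires an HBase cluster and the client configuration files described above, so it cannot run standalone.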

1.1.1.4 Obtaining the Example Code

Maven Project Mode

Download the code under solutions/realtimeProcessing/RealTimeProcessingScala from the Huawei DevCloud website to the local computer. Huawei DevCloud URL: https://codehub-cn-south-1.devcloud.huaweicloud.com/codehub/7076065/home

1.1.1.5 Application Commissioning

Procedure

Package the com.huawei.hbase.client.DataKafkaProducer and com.huawei.hbase.client.SparkStreamingToHbase main classes and upload the packages to the cluster. Because this is streaming computing, run the streaming main class first so that it is listening for data, and then start the producer. When the producer sends data successfully, the streaming job receives and processes it.

Note: This section applies only to the preceding example. For details about how to commission Spark tasks, see the commissioning methods of the Spark component.

 

