
Solution: Real-Time Retrieval - Case 1

Latest reply: Aug 17, 2018 02:01:23

1.1.1 Case 1: Public Security Service

1.1.1.1 Scenarios

Applicable Versions

FusionInsight HD V100R002C70 and FusionInsight HD V100R002C80

Scenario Description

In a public security project, FusionInsight collects hotel accommodation, Internet cafe access, and checkpoint identity verification records in real time and indexes the key fields, so that operators can quickly retrieve information about target personnel.

Data Planning

Data source design:

•   Hotel accommodation: Name, ID number, age, gender, hotel address, check-in time, check-out time, and companion

•   Internet access in Internet cafes: Name, ID number, age, gender, Internet cafe address, Internet access date, and duration

•   Checkpoint identity verification: Name, ID number, age, gender, checkpoint location, verification time, and travel mode (self-driving, taking vehicles, or walking)

Query:

•   Operators can quickly search for target personnel information.

Preset Data

•   Use hotel accommodation as an example: The data is separated by pipe characters (|).

Zhang San|41541|23|male|Home Inn|2018-03-02|2018-03-21|Zhang Fei

Save the prepared data to HDFS at inputPath = /qlz/hotel.

•   Modify the user authentication information in the p***.properties file.

The values of userKeytabPath, krb5ConfPath, and userPrincipal must be the same as those applied for during security authentication.

•   Replace the cluster configuration files, including core-site.xml, hbase-site.xml, hdfs-site.xml, and hive-site.xml, in the resource directory.

•   Example of the p***.properties file

The following is configuration file p***.properties in the example code.

# Security authentication file path  
userKeytabPath = /opt/qlzClient/qlz_keytab/user.keytab  
krb5ConfPath = /opt/qlzClient/qlz_keytab/krb5.conf  
userPrincipal = qlz  
# Checkpoint file path  
chekPath = /qlz/chekpoint  
# Kafka configuration information  
topic=test002  
brokers = 187.5.89.47:21005,187.5.89.12:21005,187.5.89.66:21005,187.5.88.163:21005  
groupId = example-group1  
# Original data path  
inputPath = /qlz/hotel  
  
# HBase table name  
tableName=POLICE_INFO  
queryCondition = id  
port = 24002  
zkquorum = 187.5.89.12:24002,187.5.89.66:24002,187.5.89.47:24002    
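The file above uses the plain Java properties format, so it can be loaded with java.util.Properties. The following is a minimal sketch, with a fragment of the configuration inlined as a string instead of being read from the p***.properties path:

```scala
import java.io.StringReader
import java.util.Properties

// Load a fragment of the configuration shown above.
val props = new Properties()
props.load(new StringReader(
  "topic=test002\n" +
  "brokers=187.5.89.47:21005,187.5.89.12:21005,187.5.89.66:21005,187.5.88.163:21005\n" +
  "groupId=example-group1\n"))

// The Kafka brokers property is a comma-separated list.
val brokers = props.getProperty("brokers").split(",")
println(props.getProperty("topic")) // test002
println(brokers.length)             // 4
```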

1.1.1.2 Development Idea

Figure 1-1 Development idea


1. Import the hotel accommodation, Internet cafe access, and checkpoint identity verification information into Kafka in real time.

2. Use Spark Streaming to perform micro-batch processing on the Kafka data and save the cleaned data to HBase.

3. Create indexes for the key fields of the HBase records.
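The cleaning in step 2 boils down to splitting each pipe-delimited record and using the ID number as the HBase row key. A minimal standalone sketch of that parsing (field order follows the hotel sample in Preset Data; note that the pipe must be escaped because split takes a regular expression):

```scala
// Parse one pipe-delimited hotel record (see Preset Data).
// "|" is a regex metacharacter, so it is escaped in split.
val record = "Zhang San|41541|23|male|Home Inn|2018-03-02|2018-03-21|Zhang Fei"
val fields = record.split("\\|")
val rowKey = fields(1) // ID number, used as the HBase row key
println(rowKey)    // 41541
println(fields(0)) // Zhang San
```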

1.1.1.3 Example Code Description

Function Description

The code is organized into the following classes:

•   main.scala.com.zr.p***.DataProducer

Read preconfigured HDFS data and insert the data into the Kafka message queue.

•   main.scala.com.zr.p***.KafkaStreaming

Process Kafka message streams in real time, clean the data, and save the cleaned data to HBase and Solr.

•   main.scala.com.zr.p***.SolrSearch and main.scala.com.zr.p***.HbaseQuery

Simulate HBase queries.
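The query path is two hops: Solr returns the row keys (ID numbers) that match a search condition, and HBase is then read directly by row key. The real code uses CloudSolrClient and the HBase client; purely to illustrate the flow, the same two-step lookup is simulated below with in-memory maps (all names and data here are illustrative, not from the project):

```scala
// Simulated Solr index: indexed field value -> matching HBase row keys.
val solrIndexByName = Map("Zhang San" -> Seq("41541"))
// Simulated HBase table: row key (ID number) -> column -> value.
val hbaseTable = Map(
  "41541" -> Map("Basic:name" -> "Zhang San", "OtherInfo:hotelAddr" -> "Home Inn"))

// Step 1: query the index for row keys; step 2: fetch the rows by key.
def query(name: String): Seq[Map[String, String]] =
  solrIndexByName.getOrElse(name, Seq.empty).flatMap(hbaseTable.get)

println(query("Zhang San").head("OtherInfo:hotelAddr")) // Home Inn
```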

Example Code

Read preconfigured data from HDFS and insert the data into the Kafka queue.

The following code snippet is only an example. For details, see main.scala.com.zr.p***.DataProducer.

    // Original data file path
    val inputPath = propertie.getProperty("inputPath")
    // Derive the Kafka message key from the data source type.
    val key =
      if (inputPath.contains("hotel")) "hotel"
      else if (inputPath.contains("internet")) "internet"
      else "bayonet"
    val file = sc.textFile(inputPath)
    file.foreach(x => println("file content: " + x))
    file.foreachPartition((partitions: Iterator[String]) => {
      // Create one producer per partition.
      val producer = new KafkaProducer[String, String](property)
      partitions.foreach(x => {
        val message = new ProducerRecord[String, String](topic.toString, key, x)
        // The producer writes the message to Kafka.
        try {
          producer.send(message)
          print("producer working completed")
        } catch {
          case e: Exception => print("send failed: " + e.getMessage)
        }
      })
      producer.close()
    })

Consume Kafka messages, clean data, and save the cleaned data to HBase and Solr.

The following code snippet is only an example. For details, see main.scala.com.zr.p***.KafkaStreaming.

    kafkaStream.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        rdd.map(x => (x.key(), x.value())).map(s => {
          // Fields are pipe-delimited: name|ID|age|sex|address|... (see Preset Data).
          val split = s._2.split("\\|")
          // Use the ID number as the HBase row key.
          val put = new Put(Bytes.toBytes(split(1)))
          // Add the common basic data to HBase.
          put.addColumn(Bytes.toBytes("Basic"), Bytes.toBytes("name"), Bytes.toBytes(split(0)))
          put.addColumn(Bytes.toBytes("Basic"), Bytes.toBytes("age"), Bytes.toBytes(split(2)))
          put.addColumn(Bytes.toBytes("Basic"), Bytes.toBytes("sex"), Bytes.toBytes(split(3)))
          // Add indexes (name, address, time, and ID) to Solr.
          SolrSearch.addDocs(cloudSolrClient, split(0), split(4), split(5), split(1))
          if (s._1.equals("hotel")) {
            // Add the hotel-specific data to HBase.
            put.addColumn(Bytes.toBytes("OtherInfo"), Bytes.toBytes("hotelAddr"), Bytes.toBytes(split(4)))
            put.addColumn(Bytes.toBytes("OtherInfo"), Bytes.toBytes("checkInTime"), Bytes.toBytes(split(5)))
            put.addColumn(Bytes.toBytes("OtherInfo"), Bytes.toBytes("checkOutTime"), Bytes.toBytes(split(6)))
            put.addColumn(Bytes.toBytes("OtherInfo"), Bytes.toBytes("acquaintance"), Bytes.toBytes(split(7)))
          } else if (s._1.equals("internet")) {
            // Add the Internet-cafe-specific data to HBase.
            put.addColumn(Bytes.toBytes("OtherInfo"), Bytes.toBytes("barAddr"), Bytes.toBytes(split(4)))
            put.addColumn(Bytes.toBytes("OtherInfo"), Bytes.toBytes("internetDate"), Bytes.toBytes(split(5)))
            put.addColumn(Bytes.toBytes("OtherInfo"), Bytes.toBytes("timeSpent"), Bytes.toBytes(split(6)))
          } else {
            // Add the checkpoint-specific data to HBase.
            put.addColumn(Bytes.toBytes("OtherInfo"), Bytes.toBytes("bayonetAddr"), Bytes.toBytes(split(4)))
            put.addColumn(Bytes.toBytes("OtherInfo"), Bytes.toBytes("checkDate"), Bytes.toBytes(split(5)))
            put.addColumn(Bytes.toBytes("OtherInfo"), Bytes.toBytes("tripType"), Bytes.toBytes(split(6)))
          }
          (new ImmutableBytesWritable, put)
        }).saveAsHadoopDataset(jobConf)
      }
    })

1.1.1.4 Obtaining Example Codes

Maven Project Mode

Download the code under solutions/realtimeRetrieval/RealtimeRetrieval from the Huawei DevCloud website to the local computer. Huawei DevCloud URL: https://codehub-cn-south-1.devcloud.huaweicloud.com/codehub/7076065/home

1.1.1.5 Application Commissioning

Procedure

1. Package the code into two JAR files: one whose main class is DataProducer and the other whose main class is KafkaStreaming. Upload both files to the cluster.

Note

When packaging the producer code, include the kafka-clients-0.10.0.1 dependency package. When packaging the KafkaStreaming code, include the kafka-clients-0.10.0.1 and kafka_2.11-0.10.0.1 dependency packages. Otherwise, a "Class Not Found" error is reported.

2. Upload solr-solrj-6.2.0.jar and noggit-0.6.jar to a folder in the cluster, for example, /opt/spark.

3. Run the KafkaStreaming code first, and then run the DataProducer code.

Note

These steps apply only to the preceding example. For details about how to commission Spark tasks, see the commissioning methods of the Spark component.

 

