1.1.1 Case 1: Policing Surveillance
1.1.1.1 Scenarios
Applicable Versions
FusionInsight HD V100R002C70 and FusionInsight HD V100R002C80
Scenario Description
The surveillance system of a public security project records suspect information in advance and monitors hotel accommodation records and Internet cafe access records in real time. If a suspect's track is found, the system reports it immediately. In this example, suspect tracks are saved to HBase, and the system queries HBase to simulate alarm generation.
Data Planning
Data source design:
- Hotel accommodation: Name, ID number, age, gender, hotel address, check-in time, check-out time, and companion
- Internet access in Internet cafes: Name, ID number, age, gender, Internet cafe address, Internet access date, and duration
- Checkpoint identity verification: Name, ID number, age, gender, checkpoint location, verification time, and travel mode (self-driving, taking vehicles, or walking)
Query:
- Operators can query the information in real time.
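All three record types are comma-separated, with the ID number as the second field. The following minimal Scala sketch (the case class and field names are illustrative, not part of the example code) shows how one hotel record breaks into fields:

```scala
// Hypothetical field layout for a hotel record, inferred from the design above.
case class HotelStay(name: String, id: String, age: Int, sex: String,
                     hotel: String, checkIn: String, checkOut: String, companion: String)

object ParseSketch {
  // Parse one comma-separated hotel record into the case class.
  def parse(line: String): HotelStay = {
    val f = line.split(",")
    HotelStay(f(0), f(1), f(2).toInt, f(3), f(4), f(5), f(6), f(7))
  }

  def main(args: Array[String]): Unit = {
    val stay = parse("Zhang San,41541,23,male,Home Inn,2018-03-02,2018-03-21,Zhang Fei")
    // The ID number (second field) later serves as the join key and HBase row key.
    println(stay.id)
  }
}
```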
Preset Data
- Hotel accommodation information: Use commas (,) to separate the fields in a record. Each line contains only one record. The records are saved to the HotelData.txt file. The file path must be the same as the value of hotelDataPath in the ParamsConf.properties file.
Zhang San,41541,23,male,Home Inn,2018-03-02,2018-03-21,Zhang Fei
Li Si,45641,21,male,Hotel Roma,2018-03-06,2018-03-12,Qi Luzhong
Liu Tao,21525,23,female,Home Inn,2018-02-02,2018-02-21,Qiao Fei
- Internet access information: Use commas (,) to separate the fields in a record. Each line contains only one record. The records are saved to the NetData.txt file. The file path must be the same as the value of netDataPath in the ParamsConf.properties file.
Ouyang Xiao,32641,23,male,M9 Internet Cafe,2018-03-22,5
Liu Peng,59684,23,male,Fu Shang Internet Cafe,2018-03-25,4
Li Jing,41563,22,female,Mangrove Internet Cafe,2018-04-02,3
- Checkpoint information: Use commas (,) to separate the fields in a record. Each line contains only one record. The records are saved to the CheckData.txt file. The file path must be the same as the value of checkDataPath in the ParamsConf.properties file.
Liu Yang,55632,23,male,subway station,2018-03-22,walking
Sun Jian,84554,23,male,highway toll station,2018-03-25,self-driving
Lin Yuner,41151,22,female,airport security check gate,2018-04-02,taking vehicles
- Surveillance information: Use commas (,) to separate each part in a record. Each line contains only one record. The records are loaded to the controldata table of Hive.
548485,2018-01-02,ON,2018-01-02,2018-02-05,41541,Lin Tao
- Modify the user authentication information in the ParamsConf.properties file.
The values of userKeytabPath, krb5ConfPath, and userPrincipal must be the same as those applied for during security authentication.
- Replace the cluster configuration files, including core-site.xml, hbase-site.xml, hdfs-site.xml, and hive-site.xml, in the resource directory.
- Example of the ParamsConf.properties file
The following is the ParamsConf.properties configuration file used in the example code.
#checkPoint=/user/test001/checkPoint
groupId=example-group1
topics=hotel,net,check
brokers = 187.5.89.47:21005,187.5.89.66:21005,187.5.89.12:21005
hotelDataPath=/user/test001/HotelData.txt
netDataPath = /user/test001/NetData.txt
checkDataPath=/user/test001/CheckData.txt
zk.quorum=187.5.89.12:24002,187.5.89.66:24002,187.5.89.47:24002
zk.port=24002
# Security authentication
# user name
userPrincipal=test001
# keytab file path
userKeytabPath=/opt/bigdataClient/user.keytab
# krb5 file path
krb5ConfPath=/opt/bigdataClient/krb5.conf
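The topics entry holds the three comma-separated topic names that the producer splits at startup. The following self-contained sketch shows that parsing step, with an in-memory Properties object (and the helper topicsOf, which is illustrative) standing in for the real file:

```scala
import java.util.Properties

object ConfSketch {
  // Load the topic names the way DataKafkaProducer does, from a Properties
  // object standing in for ParamsConf.properties.
  def topicsOf(props: Properties): Array[String] =
    props.getProperty("topics").split(",")

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("topics", "hotel,net,check")
    // Destructure the three topics, as the example code does with split(",").
    val Array(hotelTopic, netTopic, gateTopic) = topicsOf(props)
    println(s"$hotelTopic/$netTopic/$gateTopic")
  }
}
```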
1.1.1.2 Development Idea
Figure 1-1 Development idea
1. Import the hotel accommodation, Internet cafe access, and checkpoint identity verification information to Kafka.
2. Use Spark Streaming to perform micro-batch processing on the Kafka data, filter personnel information based on the Hive data, and insert the records that meet the surveillance conditions into HBase.
3. Implement real-time query based on HBase data.
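Step 2 hinges on a join keyed by ID number: a streamed record is kept only if its ID appears in the Hive surveillance data. A minimal sketch of that matching logic with plain Scala collections standing in for the RDD join (the helper matchSuspects and the sample values are illustrative):

```scala
object JoinSketch {
  // Keep only stream records whose ID number appears on the watch list,
  // mirroring the join keyed by ID number in step 2.
  def matchSuspects(watchList: Map[String, String],
                    stream: Seq[(String, String)]): Seq[(String, String)] =
    stream.filter { case (id, _) => watchList.contains(id) }

  def main(args: Array[String]): Unit = {
    // Simulated Hive controldata, keyed by ID number.
    val watchList = Map("41541" -> "548485,2018-01-02,ON")
    // Simulated incoming records, keyed by ID number.
    val stream = Seq(("41541", "Zhang San,Home Inn"), ("99999", "Nobody,Nowhere"))
    println(matchSuspects(watchList, stream).map(_._1).mkString(","))
  }
}
```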
1.1.1.3 Example Code Description
Function Description
The sample code consists of the following classes:
- com.huawei.hbase.client.DataKafkaProducer
Read preconfigured HDFS data and insert the data into the Kafka message queue.
- com.huawei.hbase.client.SparkStreamingToHbase
Process Kafka message flows in real time, clean data, and save the cleaned data to HBase.
- com.huawei.hbase.client.HbaseQueryByID
Simulate HBase query.
Example Code
Read preconfigured data from HDFS and insert the data into the Kafka queue.
The following code snippet is only an example. For details, see com.huawei.hbase.client.DataKafkaProducer.
package com.huawei.hbase.client
import java.util.Properties
import java.util.logging.Logger
import com.huawei.hadoop.security.LoginUtil
import org.apache.hadoop.conf.Configuration
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.{SparkConf, SparkContext}
object DataKafkaProducer {
def main(args: Array[String]): Unit = {
val log = Logger.getLogger("test")
val propertie = new Properties()
val in = this.getClass.getClassLoader().getResourceAsStream("ParamsConf.properties")
propertie.load(in)
// Security authentication
val userPrincipal = propertie.getProperty("userPrincipal")
val userKeytabPath = propertie.getProperty("userKeytabPath")
val krb5ConfPath = propertie.getProperty("krb5ConfPath")
val hadoopConf: Configuration = new Configuration()
LoginUtil.login(userPrincipal, userKeytabPath, krb5ConfPath, hadoopConf)
log.info("Kerberos login succeeded")
val conf = new SparkConf().setAppName("kafka-producer")
val sc = new SparkContext(conf)
// Define the topics.
log.info("Begin to load topics")
val topics = propertie.getProperty("topics")
val hotelTopic = (topics.split(","))(0)
val netTopic = (topics.split(","))(1)
val gateTopic = (topics.split(","))(2)
log.info("Begin to load brokers")
val brokers = propertie.getProperty("brokers")
// Configure the Kafka producer.
propertie.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokers)
propertie.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")
propertie.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")
// Load the input file paths.
log.info("Begin to load file paths")
val hotelInputPath = propertie.getProperty("hotelDataPath")
val netInputPath = propertie.getProperty("netDataPath")
val checkInputPath = propertie.getProperty("checkDataPath")
// Read the input files from HDFS into RDDs.
val hotelFile = sc.textFile(hotelInputPath)
val netFile = sc.textFile(netInputPath)
val checkFile = sc.textFile(checkInputPath)
// Define the information source as the key so that data can be extracted from the corresponding topic.
hotelFile.foreachPartition(lines => {
val producer = new KafkaProducer[String,String](propertie)
while (lines.hasNext){
var line = lines.next()
println(line)
println("Start sending--hotel--data")
val message = new ProducerRecord[String,String](hotelTopic,"hotel",line)
producer.send(message)
println("Sending succeeded")
}
producer.close()
})
netFile.foreachPartition(lines => {
val producer = new KafkaProducer[String,String](propertie)
while (lines.hasNext){
var line = lines.next()
println(line)
println("Start sending--netBar--data")
val message = new ProducerRecord[String,String](netTopic,"netBar",line)
producer.send(message)
println("Sending succeeded")
}
producer.close()
})
checkFile.foreachPartition(lines => {
val producer = new KafkaProducer[String,String](propertie)
while (lines.hasNext){
var line = lines.next()
println(line)
println("Start sending--check--data")
val message = new ProducerRecord[String,String](gateTopic,"check",line)
producer.send(message)
println("Sending succeeded")
}
producer.close()
})
sc.stop()
}
}
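Note that the code above constructs one KafkaProducer inside each foreachPartition call, so the non-serializable client is created on the executor rather than shipped from the driver. A local sketch of this per-partition pattern, with a hypothetical FakeProducer standing in for the Kafka client and plain sequences standing in for RDD partitions:

```scala
object PartitionSketch {
  // Stand-in for a KafkaProducer: records what was "sent".
  class FakeProducer {
    val sent = scala.collection.mutable.Buffer[String]()
    def send(line: String): Unit = sent += line
    def close(): Unit = ()
  }

  // One producer instance per partition, as in DataKafkaProducer's foreachPartition.
  def sendAll(partitions: Seq[Seq[String]]): Int = {
    var total = 0
    partitions.foreach { lines =>
      val producer = new FakeProducer // created once per partition
      lines.foreach(producer.send)
      producer.close()
      total += producer.sent.size
    }
    total
  }

  def main(args: Array[String]): Unit =
    println(sendAll(Seq(Seq("a", "b"), Seq("c"))))
}
```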
Consume Kafka messages, clean data, and save the cleaned data to HBase.
The following code snippet is only an example. For details, see com.huawei.hbase.client.SparkStreamingToHbase.
val ssc = new StreamingContext(sparkConf,Seconds(2))
// Set the checkpoint directory.
ssc.checkpoint(checkPoint)
val brokers = property.getProperty("brokers")
val kafkaParams = Map[String,String]("bootstrap.servers" -> brokers
,"value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
"key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
"group.id" -> groupId)
val topic = property.getProperty("topics").split(",").toSet
val locationStrategy = LocationStrategies.PreferConsistent
val consumerStrategies = ConsumerStrategies.Subscribe[String, String](topic, kafkaParams)
val lines = KafkaUtils.createDirectStream[String, String](ssc, locationStrategy, consumerStrategies)
// Obtain the information about the suspects.
val hiveContext = SparkSession.builder().appName("gg").enableHiveSupport().getOrCreate()
import hiveContext.implicits._
hiveContext.sql("use test001")
val controlDataSet = hiveContext.sql("SELECT * FROM controldata")
val pp = controlDataSet.map(x => (x(5).toString,(x(0).toString,x(1).toString,x(2).toString,x(3).toString,x(4).toString,x(6).toString))).rdd
//val pp = controlDataSet.toString().split(",")
// Check the two types of data. If the data meets the surveillance conditions, save the data to HBase.
lines.foreachRDD(rdd => {
val key = rdd.map( x => x.key())
// Filter the records whose key (set by the producer) marks hotel data.
val hotelRdd = rdd.filter(x => "hotel".equals(x.key()))
if (!hotelRdd.isEmpty()){
// Map each record into (ID number, remaining fields) pairs.
val value = hotelRdd.map(v => v.value().split(","))
val hrdd = value.map(h => (h(1),(h(0),h(2),h(3),h(4),h(5),h(6),h(7))))
// Join the surveillance data and suspects. Filter out the unmatched data.
val joinData = (hrdd.join(pp)).map(x =>
(x._2._1._1,x._1,x._2._1._2,x._2._1._3,
x._2._1._4,x._2._1._5,x._2._1._6,x._2._1._7))
// Traverse the joined data and write the data into HBase.
joinData.foreachPartition( j=> {
val logger = Logger.getLogger("event-alarm")
val myConf = HBaseConfiguration.create()
val tableName = "ControlInfo"
val zkQuorun = property.getProperty("zk.quorum")
val zkPort = property.getProperty("zk.port")
myConf.set("hbase.zookeeper.quorum",zkQuorun)
myConf.set("hbase.zookeeper.property.clientPort",zkPort)
myConf.set("hbase.defaults.for.version.skip","true")
val myTable = new HTable(myConf,TableName.valueOf(tableName))
myTable.setAutoFlush(false,false)
myTable.setWriteBufferSize(3*1024*1024)
while (j.hasNext){
var s = j.next()
val p = new Put(Bytes.toBytes(s._2))
p.add("hotel".getBytes, "name".getBytes, Bytes.toBytes(s._1))
p.add("hotel".getBytes, "identId".getBytes, Bytes.toBytes(s._2))
p.add("hotel".getBytes, "age".getBytes, Bytes.toBytes(s._3))
p.add("hotel".getBytes, "sex".getBytes, Bytes.toBytes(s._4))
p.add("hotel".getBytes, "hotelAddr".getBytes, Bytes.toBytes(s._5))
p.add("hotel".getBytes, "startTime".getBytes, Bytes.toBytes(s._6))
p.add("hotel".getBytes, "leaveTime".getBytes, Bytes.toBytes(s._7))
p.add("hotel".getBytes, "togetherPer".getBytes, Bytes.toBytes(s._8))
myTable.put(p)
}
myTable.flushCommits()
myTable.close()
})
}
// Filter the records whose key marks Internet cafe data.
val netRdd = rdd.filter(x => "netBar".equals(x.key()))
if (!netRdd.isEmpty()){
val value = netRdd.map(v => v.value().split(","))
val hrdd = value.map(h => (h(1),(h(0),h(2),h(3),h(4),h(5),h(6))))
val joinData = (hrdd.join(pp)).map(x =>
(x._2._1._1,x._1,x._2._1._2,x._2._1._3,
x._2._1._4,x._2._1._5,x._2._1._6))
joinData.foreachPartition( j=> {
val logger = Logger.getLogger("event-alarm")
val myConf = HBaseConfiguration.create()
val tableName = "ControlInfo"
val zkQuorun = property.getProperty("zk.quorum")
val zkPort = property.getProperty("zk.port")
myConf.set("hbase.zookeeper.quorum",zkQuorun)
myConf.set("hbase.zookeeper.property.clientPort",zkPort)
myConf.set("hbase.defaults.for.version.skip","true")
val myTable = new HTable(myConf,TableName.valueOf(tableName))
myTable.setAutoFlush(false,false)
myTable.setWriteBufferSize(3*1024*1024)
while (j.hasNext){
var s = j.next()
val p = new Put(Bytes.toBytes(s._2))
p.add("netBar".getBytes, "name".getBytes, Bytes.toBytes(s._1))
p.add("netBar".getBytes, "identId".getBytes, Bytes.toBytes(s._2))
p.add("netBar".getBytes, "age".getBytes, Bytes.toBytes(s._3))
p.add("netBar".getBytes, "sex".getBytes, Bytes.toBytes(s._4))
p.add("netBar".getBytes, "netBarAddr".getBytes, Bytes.toBytes(s._5))
p.add("netBar".getBytes, "netDate".getBytes, Bytes.toBytes(s._6))
p.add("netBar".getBytes, "netTime".getBytes, Bytes.toBytes(s._7))
myTable.put(p)
}
myTable.flushCommits()
myTable.close()
})
}
// Filter the records whose key marks checkpoint data.
val checkRdd = rdd.filter(x => "check".equals(x.key()))
if (!checkRdd.isEmpty()){
val value = checkRdd.map(v => v.value().split(","))
val hrdd = value.map(h => (h(1),(h(0),h(2),h(3),h(4),h(5),h(6))))
val joinData = (hrdd.join(pp)).map(x =>
(x._2._1._1,x._1,x._2._1._2,x._2._1._3,
x._2._1._4,x._2._1._5,x._2._1._6))
joinData.foreachPartition( j=> {
val logger = Logger.getLogger("event-alarm")
val myConf = HBaseConfiguration.create()
val tableName = "ControlInfo"
val zkQuorun = property.getProperty("zk.quorum")
val zkPort = property.getProperty("zk.port")
myConf.set("hbase.zookeeper.quorum",zkQuorun)
myConf.set("hbase.zookeeper.property.clientPort",zkPort)
myConf.set("hbase.defaults.for.version.skip","true")
val myTable = new HTable(myConf,TableName.valueOf(tableName))
myTable.setAutoFlush(false,false)
myTable.setWriteBufferSize(3*1024*1024)
while (j.hasNext){
var s = j.next()
val p = new Put(Bytes.toBytes(s._2))
p.add("gateCheck".getBytes, "name".getBytes, Bytes.toBytes(s._1))
p.add("gateCheck".getBytes, "identId".getBytes, Bytes.toBytes(s._2))
p.add("gateCheck".getBytes, "age".getBytes, Bytes.toBytes(s._3))
p.add("gateCheck".getBytes, "sex".getBytes, Bytes.toBytes(s._4))
p.add("gateCheck".getBytes, "checkLocation".getBytes, Bytes.toBytes(s._5))
p.add("gateCheck".getBytes, "checkTime".getBytes, Bytes.toBytes(s._6))
p.add("gateCheck".getBytes, "outType".getBytes, Bytes.toBytes(s._7))
myTable.put(p)
}
myTable.flushCommits()
myTable.close()
})
}
})
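Because every record is written with the ID number as the HBase row key, HbaseQueryByID can retrieve a suspect's track with a single Get by row key. A minimal sketch of that lookup, with an in-memory map standing in for the ControlInfo table (the sample row and qualifiers mirror the hotel column family above; queryById is illustrative):

```scala
object QuerySketch {
  // Stand-in for the ControlInfo table: row key -> ("family:qualifier" -> value).
  val table: Map[String, Map[String, String]] = Map(
    "41541" -> Map("hotel:name" -> "Zhang San", "hotel:hotelAddr" -> "Home Inn"))

  // HbaseQueryByID issues a Get by row key; simulate it with a map lookup.
  def queryById(id: String): Option[Map[String, String]] = table.get(id)

  def main(args: Array[String]): Unit =
    println(queryById("41541").flatMap(_.get("hotel:name")).getOrElse("not found"))
}
```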
1.1.1.4 Obtaining Example Codes
Maven Project Mode
Download the code under solutions/realtimeProcessing/RealTimeProcessingScala from the Huawei DevCloud website to the local computer. Huawei DevCloud URL: https://codehub-cn-south-1.devcloud.huaweicloud.com/codehub/7076065/home
1.1.1.5 Application Commissioning
Procedure
Package the com.huawei.hbase.client.DataKafkaProducer and com.huawei.hbase.client.SparkStreamingToHbase main classes and upload the packages to the cluster. Because this is streaming computing, run the SparkStreamingToHbase main class first so that it listens for data, and then start the producer. After the producer successfully sends data, the streaming job receives the monitored data and processes it.
This section is used only for the preceding example. For details about how to commission Spark tasks, see the commissioning methods of the Spark component.
