1.1.1 Case 1: Public Security Service
1.1.1.1 Scenarios
Applicable Versions
FusionInsight HD V100R002C70 and FusionInsight HD V100R002C80
Scenario Description
In a public security project, FusionInsight imports hotel accommodation, Internet cafe access, and checkpoint identity verification records in real time, cleans and stores the data, and builds indexes on key fields so that operators can quickly retrieve the information of target personnel.
Data Planning
Data source design:
- Hotel accommodation: Name, ID number, age, gender, hotel address, check-in time, check-out time, and companion
- Internet access in Internet cafes: Name, ID number, age, gender, Internet cafe address, Internet access date, and duration
- Checkpoint identity verification: Name, ID number, age, gender, checkpoint location, verification time, and travel mode (self-driving, taking vehicles, or walking)
Query:
- Operators can quickly search for target personnel information.
Preset Data
- Use hotel accommodation as an example: the data is separated by pipe characters (|).
Zhang San|41541|23|male|Home Inn|2018-03-02|2018-03-21|Zhang Fei
Save the prepared data to HDFS at the path specified by inputPath, for example, /qlz/hotel.
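A quick way to sanity-check the record layout is to split the sample line the same way the example code does. This standalone sketch is illustrative only and is not part of the project code:

```scala
object ParseHotelRecord {
  def main(args: Array[String]): Unit = {
    // Sample record from the Preset Data section, separated by pipe characters.
    val record = "Zhang San|41541|23|male|Home Inn|2018-03-02|2018-03-21|Zhang Fei"
    // "|" is a regex metacharacter, so it must be escaped when splitting.
    val fields = record.split("\\|")
    // Field order: name, ID number, age, gender, hotel address,
    // check-in time, check-out time, companion.
    println("name = " + fields(0) + ", id = " + fields(1) +
      ", companion = " + fields(7))
  }
}
```

The ID number (fields(1)) is what the streaming code later uses as the HBase row key.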
- Modify the user authentication information in the p***.properties file.
The values of userKeytabPath, krb5ConfPath, and userPrincipal must be the same as those applied for during security authentication.
- Replace the cluster configuration files, including core-site.xml, hbase-site.xml, hdfs-site.xml, and hive-site.xml, in the resource directory.
- Example of the p***.properties file
The following is the p***.properties configuration file used in the example code.
# Security authentication file path
userKeytabPath = /opt/qlzClient/qlz_keytab/user.keytab
krb5ConfPath = /opt/qlzClient/qlz_keytab/krb5.conf
userPrincipal = qlz
# Checkpoint file path
chekPath = /qlz/chekpoint
# Kafka configuration information
topic=test002
brokers = 187.5.89.47:21005,187.5.89.12:21005,187.5.89.66:21005,187.5.88.163:21005
groupId = example-group1
# Original data path
inputPath = /qlz/hotel
# HBase table name
tableName=POLICE_INFO
queryCondition = id
port = 24002
zkquorum = 187.5.89.12:24002,187.5.89.66:24002,187.5.89.47:24002
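The example code reads these settings with java.util.Properties. A minimal sketch of the loading logic follows; it loads from an in-memory string so it is self-contained, whereas the project reads the file from a local path, and the object name is illustrative:

```scala
import java.io.StringReader
import java.util.Properties

object LoadConfig {
  def main(args: Array[String]): Unit = {
    // In the real project, replace the string with a FileInputStream
    // on the p***.properties file.
    val text =
      """topic=test002
        |brokers=187.5.89.47:21005,187.5.89.12:21005,187.5.89.66:21005,187.5.88.163:21005
        |groupId=example-group1
        |inputPath=/qlz/hotel
        |tableName=POLICE_INFO
        |""".stripMargin
    val props = new Properties()
    props.load(new StringReader(text))
    // The broker list is comma-separated, as expected by the Kafka producer.
    val brokers = props.getProperty("brokers").split(",")
    println("topic = " + props.getProperty("topic") +
      ", broker count = " + brokers.length)
  }
}
```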
1.1.1.2 Development Idea
Figure 1-1 Development idea
1. Import the hotel accommodation, Internet cafe access, and checkpoint identity verification information to Kafka in real time.
2. Use Spark Streaming to perform micro-batch processing on the Kafka data and save the cleaned data to HBase.
3. Create indexes in Solr for the key fields of the HBase data.
1.1.1.3 Example Code Description
Function Description
The example code consists of the following classes:
- main.scala.com.zr.p***.DataProducer
Reads the preconfigured HDFS data and inserts it into the Kafka message queue.
- main.scala.com.zr.p***.KafkaStreaming
Processes Kafka message flows in real time, cleans the data, and saves the cleaned data to HBase and Solr.
- main.scala.com.zr.p***.SolrSearch and main.scala.com.zr.p***.HbaseQuery
Simulate Solr and HBase queries.
Example Code
Read preconfigured data from HDFS and insert the data into the Kafka queue.
The following code snippet is only an example. For details, see main.scala.com.zr.p***.DataProducer.
// Original data file path
val inputPath = propertie.getProperty("inputPath")
// Determine the Kafka message key from the data source path.
val key =
  if (inputPath.contains("hotel")) "hotel"
  else if (inputPath.contains("internet")) "internet"
  else "bayonet"
val file = sc.textFile(inputPath)
file.foreach(x => println("file content: " + x))
file.foreachPartition((partitions: Iterator[String]) => {
  // Create one producer per partition; KafkaProducer is not serializable.
  val producer = new KafkaProducer[String, String](property)
  partitions.foreach(x => {
    val message = new ProducerRecord[String, String](topic.toString, key, x.toString)
    // The producer writes the message to Kafka.
    try {
      producer.send(message)
      println("producer working completed")
    } catch {
      case e: Exception => println("Failed to send message: " + e.getMessage)
    }
  })
  producer.close()
})
Consume Kafka messages, clean data, and save the cleaned data to HBase and Solr.
The following code snippet is only an example. For details, see main.scala.com.zr.p***.KafkaStreaming.
kafkaStream.foreachRDD(rdd => {
if (!rdd.isEmpty()) {
rdd.map(x => (x.key(), x.value())).map(s => {
val split = s._2.split("\\|")
val put = new Put(Bytes.toBytes(split(1)))
// Add the common fields to HBase. The field order follows the preset data:
// name, ID number, age, gender, then the source-specific fields.
put.addColumn(Bytes.toBytes("Basic"), Bytes.toBytes("name"), Bytes.toBytes(split(0)))
put.addColumn(Bytes.toBytes("Basic"), Bytes.toBytes("age"), Bytes.toBytes(split(2)))
put.addColumn(Bytes.toBytes("Basic"), Bytes.toBytes("sex"), Bytes.toBytes(split(3)))
if (s._1.equals("hotel")) {
// Add indexes (name, address, time, and ID).
SolrSearch.addDocs(cloudSolrClient, split(0), split(4), split(5), split(1))
// Add data to HBase.
put.addColumn(Bytes.toBytes("OtherInfo"), Bytes.toBytes("hotelAddr"), Bytes.toBytes(split(4)))
put.addColumn(Bytes.toBytes("OtherInfo"), Bytes.toBytes("checkInTime"), Bytes.toBytes(split(5)))
put.addColumn(Bytes.toBytes("OtherInfo"), Bytes.toBytes("checkOutTime"), Bytes.toBytes(split(6)))
put.addColumn(Bytes.toBytes("OtherInfo"), Bytes.toBytes("acquaintance"), Bytes.toBytes(split(7)))
(new ImmutableBytesWritable, put)
} else if (s._1.equals("internet")) {
// Add indexes (name, address, time, and ID).
SolrSearch.addDocs(cloudSolrClient, split(0), split(4), split(5), split(1))
// Add data to HBase.
put.addColumn(Bytes.toBytes("OtherInfo"), Bytes.toBytes("barAddr"), Bytes.toBytes(split(4)))
put.addColumn(Bytes.toBytes("OtherInfo"), Bytes.toBytes("internetDate"), Bytes.toBytes(split(5)))
put.addColumn(Bytes.toBytes("OtherInfo"), Bytes.toBytes("timeSpent"), Bytes.toBytes(split(6)))
(new ImmutableBytesWritable, put)
} else {
// Add indexes (name, address, time, and ID).
SolrSearch.addDocs(cloudSolrClient, split(0), split(4), split(5), split(1))
// Add data to HBase.
put.addColumn(Bytes.toBytes("OtherInfo"), Bytes.toBytes("bayonetAddr"), Bytes.toBytes(split(4)))
put.addColumn(Bytes.toBytes("OtherInfo"), Bytes.toBytes("checkDate"), Bytes.toBytes(split(5)))
put.addColumn(Bytes.toBytes("OtherInfo"), Bytes.toBytes("tripType"), Bytes.toBytes(split(6)))
(new ImmutableBytesWritable, put)
}
}).saveAsHadoopDataset(jobConf)
}
})
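The cleaning step above is essentially a pure mapping from a raw pipe-separated record to a row key plus a set of columns. The sketch below isolates that mapping so it can be checked against the sample data; the function name and the Map representation are illustrative only, and field positions follow the sample line in the Preset Data section:

```scala
object CleanRecord {
  // Map a raw record to (rowKey, columns). The HBase row key is the ID number.
  def toColumns(key: String, record: String): (String, Map[String, String]) = {
    val f = record.split("\\|")
    // Common fields shared by all three data sources.
    val basic = Map("name" -> f(0), "age" -> f(2), "sex" -> f(3))
    // Source-specific fields, selected by the Kafka message key.
    val other = key match {
      case "hotel" =>
        Map("hotelAddr" -> f(4), "checkInTime" -> f(5),
          "checkOutTime" -> f(6), "acquaintance" -> f(7))
      case "internet" =>
        Map("barAddr" -> f(4), "internetDate" -> f(5), "timeSpent" -> f(6))
      case _ =>
        Map("bayonetAddr" -> f(4), "checkDate" -> f(5), "tripType" -> f(6))
    }
    (f(1), basic ++ other)
  }

  def main(args: Array[String]): Unit = {
    val (rowKey, cols) =
      toColumns("hotel", "Zhang San|41541|23|male|Home Inn|2018-03-02|2018-03-21|Zhang Fei")
    println("rowKey = " + rowKey + ", hotelAddr = " + cols("hotelAddr"))
  }
}
```

Keeping the mapping pure like this makes it easy to unit-test independently of Kafka, HBase, and Solr.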
1.1.1.4 Obtaining Example Codes
Maven Project Mode
Download the code under solutions/realtimeRetrieval/RealtimeRetrieval from the Huawei DevCloud website to the local computer. Huawei DevCloud URL: https://codehub-cn-south-1.devcloud.huaweicloud.com/codehub/7076065/home
1.1.1.5 Application Commissioning
Procedure
1. Pack the code into two JAR files: one whose main class is DataProducer and the other whose main class is KafkaStreaming. Upload the files to the cluster.
When packing the DataProducer code, pack the kafka-clients-0.10.0.1 dependency package together with it. When packing the KafkaStreaming code, pack the kafka-clients-0.10.0.1 and kafka_2.11-0.10.0.1 dependency packages together with it. Otherwise, a "Class Not Found" error is reported.
2. Upload solr-solrj-6.2.0.jar and noggit-0.6.jar to a folder in the cluster, for example, /opt/spark.
3. Run the KafkaStreaming code first, and then run the DataProducer code.
This procedure applies only to the preceding example. For details about how to commission Spark tasks, see the commissioning methods of the Spark component.
