1.1.1 Case 1: Financial Data Analysis
1.1.1.1 Scenarios
Applicable Versions
FusionInsight HD V100R002C70 and FusionInsight HD V100R002C80
Scenario Description
In a public security project, FusionInsight obtains fund flow information of bank accounts, analyzes transaction data to identify the accounts with abnormal fund flows, and further determines whether money laundering exists.
Data Planning
Data source design:
- Bank transaction information: Bank account, account name, ID number, transaction date, transaction amount, transaction type (online or offline banking transfer), target account, target account name, target account ID number, and remarks
Theme library design:
- Account information library: Bank account, account name, and ID number
- Transaction information library: Bank account, transaction date, transaction amount, transaction type, target account, and remarks
- Money-laundering suspicion library: Bank account and identification date
Query:
Operators can execute SQL statements to query information, for example, the number of transactions, the transaction amounts, and the counterparty accounts of a given account in a specified period.
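As an illustration of such a query, the following sketch computes the number of transactions, the total amount, and the counterparty accounts of one account in a date range over in-memory records. The `Tx` record type is hypothetical and only mirrors the data source design above; it is not part of the example project.

```scala
// Hypothetical in-memory transaction record: (bank account, trade date, amount, target account).
case class Tx(account: String, date: String, amount: Int, target: String)

// Summarize one account's activity in [from, to]. Dates are yyyy-MM-dd strings,
// so lexicographic comparison matches chronological order.
def summarize(txs: Seq[Tx], account: String, from: String, to: String): (Int, Int, Set[String]) = {
  val hits = txs.filter(t => t.account == account && t.date >= from && t.date <= to)
  (hits.size, hits.map(_.amount).sum, hits.map(_.target).toSet)
}
```

In a real deployment this filtering and aggregation would be done by the SQL engine; the sketch only shows what the query computes.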
Preset Data
- Bank transaction information
Import the bank account, account name, ID number, transaction date, transaction amount, transaction type (online or offline banking transfer), target account, target account name, target account ID number, and remarks to HDFS.
The following is an example; fields are separated by pipe characters (|):
1112|jack|23265656|2018-12-03|500|InterBank|154564654|dsas|4856445445|bei
Add the prepared data to the /qlz/test2/part-00000 file in HDFS. The file path must be the same as the value of inputPath in the Fproducer.properties file.
- Modify the user authentication information in the Fproducer.properties file.
The values of userKeytabPath, krb5ConfPath, and userPrincipal must be the same as those applied for during security authentication.
userName and password are used to log in to Elk for security authentication.
In this example, you must have the operation permissions on Kafka, HDFS, Elk, and Spark.
- Replace the cluster configuration files, including core-site.xml, hbase-site.xml, hdfs-site.xml, and hive-site.xml, in the resource folder.
- Example of the Fproducer.properties file
The following is the Fproducer.properties configuration file used in the example code.
# Security authentication file path. Download the authentication files from
# FusionInsight Manager. For details, see the corresponding operation guide.
userKeytabPath = /opt/qlzClient/qlz_keytab/user.keytab
krb5ConfPath = /opt/qlzClient/qlz_keytab/krb5.conf
userPrincipal = qlz
# Checkpoint file path
chekPath = /qlz/chekpoint
# Kafka configuration information
topic=test002
brokers = 187.5.89.47:21005,187.5.89.12:21005,187.5.89.66:21005,187.5.88.163:21005
groupId = example-group1
# Original data path
inputPath = /qlz/test2/part-00000
# Transaction record file path
tradeInfoPath = /qlz/tradeInfo
# Account information path
accountInfoPath = /qlz/accountInfo
# Money-laundering account information path
launderPath = /qlz/launderAccount
# Connect to Elk.
userName = jack
password=Qlz0607@
file =/qlz/test
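The example code reads this file from the classpath with getResourceAsStream; the loading pattern itself is plain java.util.Properties. The sketch below writes a minimal copy of the file to a temporary location so it is self-contained; the keys shown are taken from the file above.

```scala
import java.io.{File, FileInputStream, FileWriter}
import java.util.Properties

// Write a minimal Fproducer.properties to a temporary file and load it the
// same way the example code does (java.util.Properties over an input stream).
val tmp = File.createTempFile("Fproducer", ".properties")
val writer = new FileWriter(tmp)
writer.write("topic=test002\nchekPath=/qlz/chekpoint\ngroupId=example-group1\n")
writer.close()

val props = new Properties()
val in = new FileInputStream(tmp)
props.load(in)
in.close()

// Keys are returned exactly as spelled in the file (including the "chekPath" key).
val topic = props.getProperty("topic")
val groupId = props.getProperty("groupId")
```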
1.1.1.2 Development Idea
Figure 1-1 Development idea
1. Import bank transaction data to Kafka in real time.
2. Use Spark Streaming to perform micro-batch processing on the Kafka data and save the cleaned data to Elk.
3. Perform SQL analysis based on the data in Elk to obtain accounts suspected of money laundering and import them to the theme library.
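Step 2's micro-batch model can be illustrated without Spark: the stream is consumed as small batches, and each batch is cleaned and stored independently. This is a self-contained sketch only; the batch size of 2 stands in for the 2-second batch interval used in the example code, and the 2-field cleaning rule is a toy stand-in for the 10-field rule.

```scala
// Incoming lines, as they would arrive on the Kafka topic.
val incoming = List("a|1", "bad", "b|2", "c|3")

// Stand-in for Elk: cleaned rows accumulate here batch by batch.
val store = scala.collection.mutable.ListBuffer[Array[String]]()

// Process the stream as micro-batches of 2 records each.
incoming.grouped(2).foreach { batch =>
  val cleaned = batch.map(_.split("\\|")).filter(_.length == 2) // toy cleaning rule
  store ++= cleaned
}
```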
1.1.1.3 Example Code Description
Function Description
The example code consists of the following classes:
- main.scala.com.zr.financialElk.DataProducer
Reads the preset HDFS data and inserts it into the Kafka message queue.
- main.scala.com.zr.financialElk.KafkaStreaming
Processes the Kafka message stream in real time, cleans the data, and saves the cleaned data to Elk.
- main.scala.com.zr.financialElk.LaunderMoney
Processes the data in Elk, identifies the accounts suspected of money laundering, and imports them to the theme library.
Example Code
Read preconfigured data from HDFS and insert the data into the Kafka queue.
The following code snippet is only an example. For details, see main.scala.com.zr.financialElk.DataProducer.
// Read the configuration file.
val propertie = new Properties()
val in = this.getClass.getClassLoader().getResourceAsStream("Fproducer.properties")
propertie.load(in)
// Perform security authentication.
val userPrincipal = propertie.getProperty("userPrincipal")
val userKeytabPath = propertie.getProperty("userKeytabPath")
val krb5ConfPath = propertie.getProperty("krb5ConfPath")
val hadoopConf: Configuration = new Configuration()
LoginUtil.login(userPrincipal, userKeytabPath, krb5ConfPath, hadoopConf)
// Load Spark configurations.
val conf = new SparkConf().setAppName("write to kafka").setMaster("local[1]")
val sc = new SparkContext(conf)
val chekPath = propertie.getProperty("chekPath")
println(chekPath)
// Create a topic.
val topic = propertie.getProperty("topic")
// Configure Kafka cluster information.
val brokers = propertie.getProperty("brokers")
val property = new Properties()
property.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
property.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
property.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
// Original data file path
val inputPath = propertie.getProperty("inputPath")
println("Original data" + inputPath)
// Read the original data.
val file = sc.textFile(inputPath)
file.foreachPartition((partisions: Iterator[String]) => {
// Create a producer.
val producer = new KafkaProducer[String, String](property)
partisions.foreach(x => {
val message = new ProducerRecord[String, String](topic.toString, x.toString)
// The producer writes messages to Kafka.
try {
producer.send(message)
println("producer working completed")
}
catch {
case e: Exception =>
println("Failed to send the message to Kafka: " + e.getMessage)
}
})
producer.close()
})
}
Consume Kafka messages, clean data, and save the cleaned data to Elk.
The following code snippet is only an example. For details, see main.scala.com.zr.financialElk.KafkaStreaming.
// Load configurations.
val propertie = new Properties()
val in = this.getClass.getClassLoader().getResourceAsStream("Fproducer.properties")
propertie.load(in)
// Perform security authentication.
val userPrincipal = propertie.getProperty("userPrincipal")
val userKeytabPath = propertie.getProperty("userKeytabPath")
val krb5ConfPath = propertie.getProperty("krb5ConfPath")
val hadoopConf: Configuration = new Configuration()
LoginUtil.login(userPrincipal, userKeytabPath, krb5ConfPath, hadoopConf)
// Username and password used for Elk connection
val userName = propertie.getProperty("userName")
val pw = propertie.getProperty("password")
// Create an Elk connection.
val conn = ElkConnection.getConnection(userName, pw)
// Load Spark configurations.
val conf = new SparkConf().setMaster("local[2]").setAppName("kafka")
val sc = new SparkContext(conf)
// Reuse the existing SparkContext; creating a second context from the same conf would fail.
val ssc = new StreamingContext(sc, Seconds(2))
ssc.checkpoint(propertie.getProperty("chekPath"))
// Kafka topics
val topics = propertie.getProperty("topic").split("\\|").toSet
// Configure the Kafka cluster.
val groupId = propertie.getProperty("groupId")
val brokers = propertie.getProperty("brokers")
val kafkaParams = Map[String, String](
"bootstrap.servers" -> brokers,
"value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
"key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
"group.id" -> groupId
)
/**
* bankCount, name, IDCard, tradeDate, tradeAmount
* tradeType (online or offline banking transfer)
* desBankCount, desName, desIDCard, remark
*/
// Create the tradeInfo table.
val sqlText1 = "CREATE TABLE tradeInfo(bankCount VARCHAR(128),name VARCHAR(128)," +
"IDCard VARCHAR(128),tradeDate VARCHAR(128),tradeAmount INTEGER,tradeType VARCHAR(32)," +
"desBankCount VARCHAR(128),desName VARCHAR(128),desIDCard VARCHAR(128),remark VARCHAR(128));"
ElkConnection.createTable(conn, sqlText1)
// Create the accountInfo table.
val sqlText2 = "CREATE TABLE accountInfo(bankCount VARCHAR(128),name VARCHAR(128),IDCard VARCHAR(128));"
ElkConnection.createTable(conn, sqlText2)
val locationStrategy = LocationStrategies.PreferConsistent
val consumerStrategies = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
// Obtain Kafka data.
val kafkaStream = KafkaUtils.createDirectStream[String, String](ssc, locationStrategy, consumerStrategies)
kafkaStream.map(_.value()).foreachRDD(rdd => {
if (!rdd.isEmpty()) {
// Split each record into fields and keep only well-formed 10-field records.
val fields = rdd.map(_.split("\\|")).filter(_.length == 10)
val tradeInfo = fields.map(x => (x(0), x(1), x(2), x(3), x(4), x(5), x(6), x(7), x(8), x(9)))
val accountInfo1 = fields.map(x => (x(0), x(1), x(2)))
val accountInfo2 = fields.map(x => (x(6), x(7), x(8)))
val accountInfo = accountInfo1.union(accountInfo2)
tradeInfo.foreach(x => {
var pst: PreparedStatement = null
try { // Add data.
pst = conn.prepareStatement("INSERT INTO tradeInfo VALUES (?,?,?,?,?,?,?,?,?,?)")
pst.setString(1, x._1.toString)
pst.setString(2, x._2.toString)
pst.setString(3, x._3.toString)
pst.setString(4, x._4.toString)
pst.setInt(5, x._5.toInt)
pst.setString(6, x._6.toString)
pst.setString(7, x._7.toString)
pst.setString(8, x._8.toString)
pst.setString(9, x._9.toString)
pst.setString(10, x._10.toString)
pst.addBatch()
// Perform batch insertion.
pst.executeBatch()
pst.close
} catch {
case e: SQLException =>
if (pst != null) try
pst.close
catch {
case e1: SQLException =>
e1.printStackTrace()
}
e.printStackTrace()
}
})
accountInfo.foreach(x => {
var pst: PreparedStatement = null
try { // Add data.
pst = conn.prepareStatement("INSERT INTO accountInfo VALUES (?,?,?)")
pst.setString(1, x._1.toString)
pst.setString(2, x._2.toString)
pst.setString(3, x._3.toString)
pst.addBatch()
// Perform batch insertion.
pst.executeBatch()
pst.close
} catch {
case e: SQLException =>
if (pst != null) try
pst.close
catch {
case e1: SQLException =>
e1.printStackTrace()
}
e.printStackTrace()
}
})
}
})
ssc.start()
ssc.awaitTermination()
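The cleaning step above keeps only well-formed ten-field records and extracts account triples from both ends of each transfer. The same logic on plain Scala collections, independent of Spark and Kafka:

```scala
val records = List(
  "1112|jack|23265656|2018-12-03|500|InterBank|154564654|dsas|4856445445|bei",
  "corrupted record")

// Keep only records that split into exactly 10 fields.
val fields = records.map(_.split("\\|")).filter(_.length == 10)
// Source-account triple: (bankCount, name, IDCard).
val accountInfo1 = fields.map(x => (x(0), x(1), x(2)))
// Target-account triple: (desBankCount, desName, desIDCard).
val accountInfo2 = fields.map(x => (x(6), x(7), x(8)))
// Both ends of every transfer belong in the account information library.
val accountInfo = accountInfo1 ++ accountInfo2
```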
Process data in Elk, obtain the accounts suspected of money laundering, and import them to the theme library.
The following code snippet is only an example. For details, see main.scala.com.zr.financialElk.LaunderMoney.
// Load configurations.
val propertie = new Properties()
val in = this.getClass.getClassLoader().getResourceAsStream("Fproducer.properties")
propertie.load(in)
// Perform security authentication.
val userPrincipal = propertie.getProperty("userPrincipal")
val userKeytabPath = propertie.getProperty("userKeytabPath")
val krb5ConfPath = propertie.getProperty("krb5ConfPath")
val hadoopConf: Configuration = new Configuration()
LoginUtil.login(userPrincipal, userKeytabPath, krb5ConfPath, hadoopConf)
// Elk username and password
val userName = propertie.getProperty("userName")
val pw = propertie.getProperty("password")
// Create an Elk connection.
val conn = ElkConnection.getConnection(userName, pw)
var pst: PreparedStatement = null
val sql1 = "select bankCount from tradeInfo group by bankCount"
pst = conn.prepareStatement(sql1)
val rs = pst.executeQuery()
// Identify the accounts that are suspected of money laundering.
val launderAcount = scala.collection.mutable.ListBuffer[String]()
while (rs.next()) {
val account = rs.getString(1)
val sql2 = s"select desBankCount from tradeInfo where bankCount='$account' group by desBankCount"
pst = conn.prepareStatement(sql2)
val rs2 = pst.executeQuery()
while (rs2.next()) {
val account2 = rs2.getString(1)
if (!account2.equals(account)) {
val sql3 = s"select desBankCount from tradeInfo where bankCount='$account2' group by desBankCount"
pst = conn.prepareStatement(sql3)
val rs3 = pst.executeQuery()
while (rs3.next()) {
val account3 = rs3.getString(1)
if (account3.equals(account)) {
launderAcount += account
} else {
val sql4 = s"select desBankCount from tradeInfo where bankCount='$account3' group by desBankCount"
pst = conn.prepareStatement(sql4)
val rs4 = pst.executeQuery()
while (rs4.next()) {
val account4 = rs4.getString(1)
if (account4.equals(account)) {
launderAcount += account
} else {
val sql5 = s"select desBankCount from tradeInfo where bankCount='$account4' group by desBankCount"
pst = conn.prepareStatement(sql5)
val rs5 = pst.executeQuery()
while (rs5.next()) {
val account5 = rs5.getString(1)
if (account5.equals(account)) {
launderAcount += account
}
}
}
}
}
}
}
}
}
val sqlText = "CREATE TABLE launderAccount(bankCount VARCHAR(128),recognize VARCHAR(128));"
ElkConnection.createTable(conn, sqlText)
launderAcount.foreach(x => {
try { // Generate the prepared statement.
val time = new SimpleDateFormat("yyyy-MM-dd").format(new Date())
pst = conn.prepareStatement("INSERT INTO launderAccount VALUES (?,?)")
pst.setString(1, x.toString)
pst.setString(2, time)
pst.addBatch()
// Perform batch insertion.
pst.executeBatch()
pst.close
} catch {
case e: SQLException =>
if (pst != null) try
pst.close
catch {
case e1: SQLException =>
e1.printStackTrace()
}
e.printStackTrace()
}
})
}
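The nested loops in LaunderMoney search for transfer chains that return to the starting account within a few hops (A → B → ... → A). The same idea can be expressed as a bounded depth-first search over an in-memory transfer map. This is a sketch with hypothetical data, not the project code; `findCycleAccounts` and its parameters are illustrative names.

```scala
// Transfer graph: source account -> accounts it has transferred to.
def findCycleAccounts(transfers: Map[String, Set[String]], maxHops: Int = 5): Set[String] = {
  // Does any chain of at most `hops` transfers starting at `current` return to `start`?
  def returns(start: String, current: String, hops: Int): Boolean =
    hops > 0 && transfers.getOrElse(current, Set.empty).exists { next =>
      next == start || returns(start, next, hops - 1)
    }
  // Flag every account that sits on a transfer cycle back to itself.
  transfers.keySet.filter(a => returns(a, a, maxHops))
}
```

The hop bound plays the same role as the fixed nesting depth in the example code: chains longer than the bound are not followed.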
1.1.1.4 Obtaining Example Codes
Maven Project Mode
Download the code under solutions/interactiveQuery/offineProcessfinancialElk from the Huawei DevCloud website to the local computer. Huawei DevCloud URL: https://codehub-cn-south-1.devcloud.huaweicloud.com/codehub/7076065/home
1.1.1.5 Application Commissioning
Procedure
1. Pack DataProducer, KafkaStreaming, and LaunderMoney into three JAR files, respectively. Upload the files to the server where the Spark client is installed.
2. After the upload is complete, execute the following Spark tasks in sequence: KafkaStreaming, DataProducer, and LaunderMoney.
3. After the tasks are complete, view the results in the accountInfo table of Elk.
These steps apply only to the preceding example. For details about how to commission Spark tasks, see the commissioning guide of the Spark component.
This post was last edited by chz at 2018-08-03 06:32.
