
Solution: Interactive Query - Case 1

Latest reply: Aug 3, 2018 04:00:54

1.1.1 Case 1: Financial Data Analysis

1.1.1.1 Scenarios

Applicable Versions

FusionInsight HD V100R002C70 and FusionInsight HD V100R002C80

Scenario Description

In a public security project, FusionInsight obtains fund flow information of bank accounts, analyzes transaction data to identify the accounts with abnormal fund flows, and further determines whether money laundering exists.

Data Planning

Data source design:

• Bank transaction information: Bank account, account name, ID number, transaction date, transaction amount, transaction type (online or offline banking transfer), target account, target account name, target account ID number, and remarks

Theme library design:

• Account information library: Bank account, account name, and ID number

• Transaction information library: Bank account, transaction date, transaction amount, transaction type, target account, and remarks

• Money-laundering suspicion library: Bank account and identification date

Query:

Operators can execute SQL statements to query information, for example, the number of transactions, the transaction amounts, and the counterparties of an account in a specified period.
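As a self-contained sketch of such a query (the sample records, field layout, and object name below are illustrative, not data from the case), the following Scala snippet computes the number of transactions, the total amount, and the counterparties of one account in a given period:

```scala
// Hypothetical in-memory sketch of the period query described above.
object PeriodQueryDemo {
  // (account, date, amount, targetAccount)
  val trades = Seq(
    ("1112", "2018-12-03", 500, "154564654"),
    ("1112", "2018-12-20", 300, "987654321"),
    ("1112", "2019-01-05", 800, "154564654"),
    ("2223", "2018-12-10", 100, "1112")
  )

  def main(args: Array[String]): Unit = {
    val (account, from, to) = ("1112", "2018-12-01", "2018-12-31")
    // ISO dates compare correctly as plain strings.
    val inPeriod = trades.filter(t => t._1 == account && t._2 >= from && t._2 <= to)
    val count = inPeriod.size                 // number of transactions
    val total = inPeriod.map(_._3).sum        // total amount
    val targets = inPeriod.map(_._4).distinct // transaction counterparties
    println(s"count=$count total=$total targets=${targets.mkString(",")}")
  }
}
```

In the case itself the same question is answered by a SQL query over the tradeInfo table in Elk rather than an in-memory filter.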

Preset Data

• Bank transaction information

Import the bank account, account name, ID number, transaction date, transaction amount, transaction type (online or offline banking transfer), target account, target account name, target account ID number, and remarks to HDFS.

The following is an example; the fields are separated by pipe characters (|):

1112|jack|23265656|2018-12-03|500|InterBank|154564654|dsas|4856445445|bei

Add the prepared data to the /qlz/test2/part-00000 file in HDFS. The file path must be the same as the value of inputPath in the Fproducer.properties file.
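The pipe-delimited format can be sanity-checked with a plain split before wiring the pipeline together; a minimal sketch (the helper object and its names are illustrative):

```scala
// Minimal sketch: split one pipe-delimited transaction record into fields.
object RecordParseDemo {
  def parse(line: String): Array[String] = line.split("\\|")

  def main(args: Array[String]): Unit = {
    val record = "1112|jack|23265656|2018-12-03|500|InterBank|154564654|dsas|4856445445|bei"
    val fields = parse(record)
    require(fields.length == 10, s"expected 10 fields, got ${fields.length}")
    // Field 0 is the bank account, field 4 the amount, field 6 the target account.
    println(s"account=${fields(0)} amount=${fields(4)} target=${fields(6)}")
  }
}
```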

• Modify the user authentication information in the Fproducer.properties file.

The values of userKeytabPath, krb5ConfPath, and userPrincipal must be the same as those applied for during security authentication.

userName and password are used to log in to Elk for security authentication.

Note: In this example, you must have operation permissions on Kafka, HDFS, Elk, and Spark.

• Replace the cluster configuration files, including core-site.xml, hbase-site.xml, hdfs-site.xml, and hive-site.xml, in the resource folder.

• Example of the Fproducer.properties file

The following is the Fproducer.properties configuration file used in the example code.

# Security authentication file path. Download authentication files from FusionInsight Manager. For details, see the corresponding operation guide.
userKeytabPath = /opt/qlzClient/qlz_keytab/user.keytab
krb5ConfPath = /opt/qlzClient/qlz_keytab/krb5.conf
userPrincipal = qlz
# Checkpoint file path
chekPath = /qlz/chekpoint
# Kafka configuration information
topic = test002
brokers = 187.5.89.47:21005,187.5.89.12:21005,187.5.89.66:21005,187.5.88.163:21005
groupId = example-group1
# Original data path
inputPath = /qlz/test2/part-00000
# Transaction record file path
tradeInfoPath = /qlz/tradeInfo
# Account information path
accountInfoPath = /qlz/accountInfo
# Money-laundering account information path
launderPath = /qlz/launderAccount
# Elk connection information
userName = jack
password = Qlz0607@
file = /qlz/test

1.1.1.2 Development Idea

Figure 1-1 Development idea


1. Import bank transaction data to Kafka in real time.

2. Use Spark Streaming to perform micro-batch processing on the Kafka data and save the cleaned data to Elk.

3. Perform SQL analysis based on the data in Elk to obtain accounts suspected of money laundering and import them to the theme library.

1.1.1.3 Example Code Description

Function Description

The code consists of the following classes:

• main.scala.com.zr.financialElk.DataProducer

Reads the preconfigured HDFS data and inserts it into the Kafka message queue.

• main.scala.com.zr.financialElk.KafkaStreaming

Processes the Kafka message stream in real time, cleans the data, and saves the cleaned data to Elk.

• main.scala.com.zr.financialElk.LaunderMoney

Processes the data in Elk, identifies the accounts suspected of money laundering, and imports them into the theme library.

Example Code

Read preconfigured data from HDFS and insert the data into the Kafka queue.

The following code snippet is only an example. For details, see main.scala.com.zr.financialElk.DataProducer.

// Read the configuration file.
val propertie = new Properties()
val in = this.getClass.getClassLoader().getResourceAsStream("Fproducer.properties")
propertie.load(in)
// Perform security authentication.
val userPrincipal = propertie.getProperty("userPrincipal")
val userKeytabPath = propertie.getProperty("userKeytabPath")
val krb5ConfPath = propertie.getProperty("krb5ConfPath")
val hadoopConf: Configuration = new Configuration()
LoginUtil.login(userPrincipal, userKeytabPath, krb5ConfPath, hadoopConf)
// Load Spark configurations.
val conf = new SparkConf().setAppName("write to kafka").setMaster("local[1]")
val sc = new SparkContext(conf)
val chekPath = propertie.getProperty("chekPath")
println(chekPath)
// Kafka topic to write to
val topic = propertie.getProperty("topic")
// Configure Kafka cluster information.
val brokers = propertie.getProperty("brokers")
val property = new Properties()
property.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
property.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
property.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
// Original data file path
val inputPath = propertie.getProperty("inputPath")
println("Original data: " + inputPath)
// Read the original data.
val file = sc.textFile(inputPath)
file.foreachPartition((partitions: Iterator[String]) => {
  // Create one producer per partition.
  val producer = new KafkaProducer[String, String](property)
  partitions.foreach(x => {
    val message = new ProducerRecord[String, String](topic, x)
    // The producer writes the message to Kafka.
    try {
      producer.send(message)
    } catch {
      case e: Exception => e.printStackTrace()
    }
  })
  producer.close()
})

Consume Kafka messages, clean data, and save the cleaned data to Elk.

The following code snippet is only an example. For details, see main.scala.com.zr.financialElk.KafkaStreaming.

// Load configurations.
val propertie = new Properties()
val in = this.getClass.getClassLoader().getResourceAsStream("Fproducer.properties")
propertie.load(in)
// Perform security authentication.
val userPrincipal = propertie.getProperty("userPrincipal")
val userKeytabPath = propertie.getProperty("userKeytabPath")
val krb5ConfPath = propertie.getProperty("krb5ConfPath")
val hadoopConf: Configuration = new Configuration()
LoginUtil.login(userPrincipal, userKeytabPath, krb5ConfPath, hadoopConf)
// Username and password used for the Elk connection
val userName = propertie.getProperty("userName")
val pw = propertie.getProperty("password")
// Create an Elk connection.
val conn = ElkConnection.getConnection(userName, pw)
// Load Spark configurations.
val conf = new SparkConf().setMaster("local[2]").setAppName("kafka")
val ssc = new StreamingContext(conf, Seconds(2))
ssc.checkpoint(propertie.getProperty("chekPath"))
// Kafka topics
val topics = propertie.getProperty("topic").split("\\|").toSet
// Configure the Kafka cluster.
val groupId = propertie.getProperty("groupId")
val brokers = propertie.getProperty("brokers")
val kafkaParams = Map[String, String](
  "bootstrap.servers" -> brokers,
  "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
  "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
  "group.id" -> groupId
)
/**
 * Record layout:
 * bankCount, name, IDCard, tradeDate, tradeAmount,
 * tradeType (online or offline banking transfer),
 * desBankCount, desName, desIDCard, remark
 */
// Create the tradeInfo table.
val sqlText1 = "CREATE TABLE tradeInfo(bankCount VARCHAR(128),name VARCHAR(128)," +
  "IDCard VARCHAR(128),tradeDate VARCHAR(128),tradeAmount INTEGER,tradeType VARCHAR(32)," +
  "desBankCount VARCHAR(128),desName VARCHAR(128),desIDCard VARCHAR(128),remark VARCHAR(128));"
ElkConnection.createTable(conn, sqlText1)
// Create the accountInfo table.
val sqlText2 = "CREATE TABLE accountInfo(bankCount VARCHAR(128),name VARCHAR(128),IDCard VARCHAR(128));"
ElkConnection.createTable(conn, sqlText2)
val locationStrategy = LocationStrategies.PreferConsistent
val consumerStrategies = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
// Obtain Kafka data.
val kafkaStream = KafkaUtils.createDirectStream[String, String](ssc, locationStrategy, consumerStrategies)
kafkaStream.map(_.value()).foreachRDD(rdd => {
  if (!rdd.isEmpty()) {
    // Split each record into its ten fields and keep only well-formed records.
    val records = rdd.map(_.split("\\|")).filter(_.length == 10)
    val tradeInfo = records.map(x => (x(0), x(1), x(2), x(3), x(4), x(5), x(6), x(7), x(8), x(9)))
    // Both the source account and the target account contribute to the account library.
    val accountInfo1 = records.map(x => (x(0), x(1), x(2)))
    val accountInfo2 = records.map(x => (x(6), x(7), x(8)))
    val accountInfo = accountInfo1.union(accountInfo2)
    // Collect to the driver so the Elk connection is used in a single process.
    tradeInfo.collect().foreach(x => {
      var pst: PreparedStatement = null
      try { // Add data.
        pst = conn.prepareStatement("INSERT INTO tradeInfo VALUES (?,?,?,?,?,?,?,?,?,?)")
        pst.setString(1, x._1)
        pst.setString(2, x._2)
        pst.setString(3, x._3)
        pst.setString(4, x._4)
        pst.setInt(5, x._5.toInt)
        pst.setString(6, x._6)
        pst.setString(7, x._7)
        pst.setString(8, x._8)
        pst.setString(9, x._9)
        pst.setString(10, x._10)
        pst.addBatch()
        // Perform batch insertion.
        pst.executeBatch()
        pst.close()
      } catch {
        case e: SQLException =>
          if (pst != null) try pst.close() catch {
            case e1: SQLException => e1.printStackTrace()
          }
          e.printStackTrace()
      }
    })
    accountInfo.collect().foreach(x => {
      var pst: PreparedStatement = null
      try { // Add data.
        pst = conn.prepareStatement("INSERT INTO accountInfo VALUES (?,?,?)")
        pst.setString(1, x._1)
        pst.setString(2, x._2)
        pst.setString(3, x._3)
        pst.addBatch()
        // Perform batch insertion.
        pst.executeBatch()
        pst.close()
      } catch {
        case e: SQLException =>
          if (pst != null) try pst.close() catch {
            case e1: SQLException => e1.printStackTrace()
          }
          e.printStackTrace()
      }
    })
  }
})
ssc.start()
ssc.awaitTermination()

Process data in Elk, obtain the accounts suspected of money laundering, and import them to the theme library.

The following code snippet is only an example. For details, see main.scala.com.zr.financialElk.LaunderMoney.

// Load configurations.
val propertie = new Properties()
val in = this.getClass.getClassLoader().getResourceAsStream("Fproducer.properties")
propertie.load(in)
// Perform security authentication.
val userPrincipal = propertie.getProperty("userPrincipal")
val userKeytabPath = propertie.getProperty("userKeytabPath")
val krb5ConfPath = propertie.getProperty("krb5ConfPath")
val hadoopConf: Configuration = new Configuration()
LoginUtil.login(userPrincipal, userKeytabPath, krb5ConfPath, hadoopConf)
// Elk username and password
val userName = propertie.getProperty("userName")
val pw = propertie.getProperty("password")
// Create an Elk connection.
val conn = ElkConnection.getConnection(userName, pw)
var pst: PreparedStatement = null
val sql1 = "select bankCount from tradeInfo group by bankCount"
pst = conn.prepareStatement(sql1)
val rs = pst.executeQuery()
// Identify the accounts that are suspected of money laundering:
// an account is flagged when its outgoing transfers return to it
// through a chain of at most four hops.
var launderAccount: List[String] = List()
while (rs.next()) {
  val account = rs.getString(1)
  val pst2 = conn.prepareStatement(
    s"select desBankCount from tradeInfo where bankCount='$account' group by desBankCount")
  val rs2 = pst2.executeQuery()
  while (rs2.next()) {
    val account2 = rs2.getString(1)
    if (!account2.equals(account)) {
      val pst3 = conn.prepareStatement(
        s"select desBankCount from tradeInfo where bankCount='$account2' group by desBankCount")
      val rs3 = pst3.executeQuery()
      while (rs3.next()) {
        val account3 = rs3.getString(1)
        if (account3.equals(account)) {
          // The money returns after two hops.
          launderAccount = account :: launderAccount
        } else {
          val pst4 = conn.prepareStatement(
            s"select desBankCount from tradeInfo where bankCount='$account3' group by desBankCount")
          val rs4 = pst4.executeQuery()
          while (rs4.next()) {
            val account4 = rs4.getString(1)
            if (account4.equals(account)) {
              // The money returns after three hops.
              launderAccount = account :: launderAccount
            } else {
              val pst5 = conn.prepareStatement(
                s"select desBankCount from tradeInfo where bankCount='$account4' group by desBankCount")
              val rs5 = pst5.executeQuery()
              while (rs5.next()) {
                val account5 = rs5.getString(1)
                if (account5.equals(account)) {
                  // The money returns after four hops.
                  launderAccount = account :: launderAccount
                }
              }
              pst5.close()
            }
          }
          pst4.close()
        }
      }
      pst3.close()
    }
  }
  pst2.close()
}
val time = new SimpleDateFormat("yyyy-MM-dd").format(new Date())
// Create the money-laundering suspicion table.
val sqlText = "CREATE TABLE launderInfo(bankCount VARCHAR(128),recognize VARCHAR(128));"
ElkConnection.createTable(conn, sqlText)
launderAccount.distinct.foreach(x => {
  try { // Generate the prepared statement.
    pst = conn.prepareStatement("INSERT INTO launderInfo VALUES (?,?)")
    pst.setString(1, x)
    pst.setString(2, time)
    pst.addBatch()
    // Perform batch insertion.
    pst.executeBatch()
    pst.close()
  } catch {
    case e: SQLException =>
      if (pst != null) try pst.close() catch {
        case e1: SQLException => e1.printStackTrace()
      }
      e.printStackTrace()
  }
})
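The nested queries above unroll a bounded-depth cycle check: an account is flagged when its transfers flow back to it within at most four hops. Over an in-memory transfer graph (hypothetical data and names, a sketch rather than the case's implementation), the same idea can be written recursively:

```scala
// Sketch: flag accounts whose transfers return to them within maxHops hops.
object CycleCheckDemo {
  // account -> set of target accounts it transferred to (hypothetical data)
  val transfers: Map[String, Set[String]] = Map(
    "A" -> Set("B"),
    "B" -> Set("C"),
    "C" -> Set("A"), // A -> B -> C -> A forms a cycle
    "D" -> Set("E")
  )

  // Does any transfer chain of length <= maxHops lead from current back to start?
  def returnsTo(start: String, current: String, hops: Int, maxHops: Int): Boolean =
    hops <= maxHops && transfers.getOrElse(current, Set.empty).exists(next =>
      next == start || returnsTo(start, next, hops + 1, maxHops))

  def suspicious(maxHops: Int): Set[String] =
    transfers.keySet.filter(a => returnsTo(a, a, 1, maxHops))

  def main(args: Array[String]): Unit =
    println(suspicious(4).toList.sorted.mkString(",")) // prints A,B,C
}
```

The recursion terminates because the hop counter bounds the search depth, matching the four levels of nested SQL queries in the example code.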

1.1.1.4 Obtaining Example Code

Maven Project Mode

Download the code under solutions/interactiveQuery/offineProcessfinancialElk from the Huawei DevCloud website to the local computer. Huawei DevCloud URL: https://codehub-cn-south-1.devcloud.huaweicloud.com/codehub/7076065/home

1.1.1.5 Application Commissioning

Procedure

1. Package DataProducer, KafkaStreaming, and LaunderMoney into three separate JAR files and upload them to the server where the Spark client is installed.

2. After the upload is complete, execute the following Spark tasks in sequence: KafkaStreaming, DataProducer, and LaunderMoney.

3. After the tasks are executed, the results are saved in the accountInfo table in Elk.

Note: This section applies only to the preceding example. For details about how to commission Spark tasks, see the commissioning methods of the Spark component.

This post was last edited by chz at 2018-08-03 06:32.
