This section describes how a Flink application produces data to and consumes data from Kafka. See the details below.
1.1.1 Case 2: Producing and Consuming Data in Kafka
1.1.1.1 Scenarios
Applicable Versions
FusionInsight HD V100R002C70 and FusionInsight HD V100R002C80
Scenario Description
Assume that a Flink service receives one message per second.
Develop a Flink application that can create messages with specific prefixes in real time based on certain service requirements.
Data Planning
Data of the Flink example project is saved in the Kafka component. A user with Kafka permissions can send data to and receive data from Kafka.
1. Ensure that the cluster components, including HDFS, Yarn, Flink, and Kafka, have been installed.
2. Create a topic.
a. Configure the user permission for creating topics on the server.
Change the value of allow.everyone.if.no.acl.found, the Broker configuration item of Kafka, to true, as shown in Figure 1-1. Then, restart the Kafka service to make the configuration take effect.
Figure 1-1 Setting the user permission for creating topics
b. Create a topic in the Linux CLI. Before running the related commands, run the kinit command (for example, kinit flinkuser) for man-machine authentication.
flinkuser needs to be created by the user and must have the permission to create Kafka topics. For details, see section "Preparing the Developer Account" in the FusionInsight HD Product Documentation (Security Mode).
The command for creating a topic is as follows:
bin/kafka-topics.sh --create --zookeeper {zkQuorum}/kafka --partitions {partitionNum} --replication-factor {replicationNum} --topic {Topic}
Table 1-1 Parameters used in the command

Parameter | Description
{zkQuorum} | ZooKeeper cluster information, in the IP address:port format
{partitionNum} | Number of partitions of the topic
{replicationNum} | Number of data replicas of each partition of the topic
{Topic} | Topic name
For example, run the following command on the Kafka client. Assume that the IP address:port pairs of the ZooKeeper cluster are 10.96.101.32:24002, 10.96.101.251:24002, 10.96.101.177:24002, and 10.91.8.160:24002, and the topic name is topic1.
bin/kafka-topics.sh --create --zookeeper 10.96.101.32:24002,10.96.101.251:24002,10.96.101.177:24002,10.91.8.160:24002/kafka --partitions 5 --replication-factor 1 --topic topic1
3. Perform security authentication.
Kerberos authentication, SSL encryption, or Kerberos+SSL authentication can be used.
- Kerberos authentication configuration
1. Configuration on the client
In the flink-conf.yaml configuration file, add the configurations related to Kerberos authentication (that is, add KafkaClient to the contexts item) as follows:
security.kerberos.login.keytab: /home/demo/flink/release/flink-1.2.1/keytab/admin.keytab
security.kerberos.login.principal: admin
security.kerberos.login.contexts: Client,KafkaClient
security.kerberos.login.use-ticket-cache: false
2. Running parameters
The running parameters for the SASL_PLAINTEXT protocol are as follows:
--topic topic1 --bootstrap.servers 10.96.101.32:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka
//10.96.101.32:21007 indicates the IP address:port number of the Kafka server.
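In the example code later in this section, these --key value running parameters are turned into Kafka client properties by ParameterTool.fromArgs. As a rough illustration, the following plain-Java sketch mimics that mapping; the class name ArgsToProps and its helper are hypothetical and are not Flink APIs.

```java
import java.util.Properties;

public class ArgsToProps {
    // Turn "--key value" pairs into properties, roughly what the example
    // code's ParameterTool.fromArgs call produces for the Kafka client.
    static Properties fromArgs(String[] args) {
        Properties props = new Properties();
        for (int i = 0; i + 1 < args.length; i += 2) {
            if (args[i].startsWith("--")) {
                props.setProperty(args[i].substring(2), args[i + 1]);
            }
        }
        return props;
    }

    public static void main(String[] args) {
        // The SASL_PLAINTEXT running parameters from the text above.
        String[] runArgs = {"--topic", "topic1",
                "--bootstrap.servers", "10.96.101.32:21007",
                "--security.protocol", "SASL_PLAINTEXT",
                "--sasl.kerberos.service.name", "kafka"};
        Properties props = fromArgs(runArgs);
        System.out.println(props.getProperty("security.protocol"));
    }
}
```

The resulting Properties object is what the example code hands to the Flink Kafka connector.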
- SSL encryption configuration
1. Configuration on the server
Set ssl.mode.enable to true, as shown in Figure 1-2.
Figure 1-2 Configuration on the server
2. Configuration on the client
- Log in to FusionInsight Manager, choose Services > Kafka, and click Download Client on the displayed Service Status page to download the Kafka client.
Figure 1-3 Configuration on the client
- Use the ca.crt certificate in the root directory of the client to create a truststore file for the client.
Run the following command:
keytool -noprompt -import -alias myservercert -file ca.crt -keystore truststore.jks
The command prompts you to set a keystore password and then reports that the certificate has been added to the keystore.
- Running parameters
Run the commands with the following parameters. (Ensure that the value of the ssl.truststore.password parameter is the same as the password entered when you created the truststore file.)
--topic topic1 --bootstrap.servers 10.96.101.32:21008 --security.protocol SSL --ssl.truststore.location /home/zgd/software/FusionInsight_VXXXRXXXCXXUXX_Kafka_ClientConfig/truststore.jks --ssl.truststore.password huawei
//10.96.101.32:21008 indicates the IP address:port number of the Kafka server, and VXXXRXXXCXXUXX indicates the version number of FusionInsight.
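The requirement that ssl.truststore.password match the keytool password can be illustrated with the JDK KeyStore API, which performs the same password-based integrity check that Kafka runs against ssl.truststore.password at connect time. This is a sketch only; the file name and password below are placeholders, not values from the cluster.

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.security.KeyStore;

public class TruststoreCheck {
    public static void main(String[] args) throws Exception {
        char[] password = "changeit".toCharArray();  // placeholder password
        File store = File.createTempFile("truststore", ".jks");

        // Create an empty JKS store, as keytool does before importing ca.crt.
        KeyStore ks = KeyStore.getInstance("JKS");
        ks.load(null, password);
        try (FileOutputStream out = new FileOutputStream(store)) {
            ks.store(out, password);
        }

        // Reloading succeeds only with the same password; a wrong
        // ssl.truststore.password fails at exactly this step.
        KeyStore reloaded = KeyStore.getInstance("JKS");
        try (FileInputStream in = new FileInputStream(store)) {
            reloaded.load(in, password);
        }
        System.out.println("truststore verified, entries: " + reloaded.size());
        store.delete();
    }
}
```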
- Configuration in Kerberos+SSL mode
After completing the preceding Kerberos and SSL configurations on the clients and servers, change the port numbers and protocol types in the running parameters to enable the Kerberos+SSL mode.
--topic topic1 --bootstrap.servers 10.96.101.32:21009 --security.protocol SASL_SSL --sasl.kerberos.service.name kafka --ssl.truststore.location /home/zgd/software/FusionInsight_VXXXRXXXCXXUXX_Kafka_ClientConfig/truststore.jks --ssl.truststore.password huawei
//10.96.101.32:21009 indicates the IP address:port number of the Kafka server, and VXXXRXXXCXXUXX indicates the version number of FusionInsight.
1.1.1.2 Development Idea
1. Start the Flink Kafka producer to send data to Kafka.
2. Start the Flink Kafka consumer to receive data from Kafka. Ensure that the topics of the Kafka consumer are consistent with those of the Kafka producer.
3. Add the prefix to the data content and print the data.
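Stripped of Flink and Kafka, the three steps above reduce to generating messages, passing them along, and prefixing them. The following plain-Java sketch shows the intended data flow; the class and method names are illustrative only and are not part of the example project.

```java
import java.util.ArrayList;
import java.util.List;

public class DevelopmentIdeaSketch {
    // Step 1: the producer side creates "element-i" messages.
    static List<String> produce(int count) {
        List<String> messages = new ArrayList<>();
        for (long i = 0; i < count; i++) {
            messages.add("element-" + i);
        }
        return messages;
    }

    // Step 3: the consumer side adds a prefix to each received message.
    static String consume(String message) {
        return "Flink says " + message;
    }

    public static void main(String[] args) {
        // Step 2: in the real application, the messages travel through a
        // shared Kafka topic between produce and consume.
        for (String m : produce(3)) {
            System.out.println(consume(m));
        }
    }
}
```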
1.1.1.3 Example Code Description
Java Example Code
- Function description
In the Flink application, this code invokes the flink-connector-kafka module's API to produce and consume data.
- Example code
To use Kafka of FusionInsight in security mode, obtain the kafka-client-0.11.x.x.jar file from the FusionInsight client directory before development.
The following uses the logic code of Kafka consumer and Kafka producer as an example.
For details about the complete code, see com.huawei.bigdata.flink.examples.WriteIntoKafka and com.huawei.bigdata.flink.examples.ReadFromKafka.
//Kafka producer code
public class WriteIntoKafka {
    public static void main(String[] args) throws Exception {
        //Print the command reference for flink run.
        System.out.println("use command as: ");
        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +
                " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21005");
        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +
                " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21007" +
                " --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka");
        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +
                " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21008" +
                " --security.protocol SSL --ssl.truststore.location /home/truststore.jks" +
                " --ssl.truststore.password huawei");
        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +
                " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21009" +
                " --security.protocol SASL_SSL --sasl.kerberos.service.name kafka" +
                " --ssl.truststore.location /home/truststore.jks --ssl.truststore.password huawei");
        System.out.println("******************************************************************************************");
        System.out.println("<topic> is the kafka topic name");
        System.out.println("<bootstrap.servers> is the ip:port list of brokers");
        System.out.println("******************************************************************************************");
        //Construct the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //Set the concurrency.
        env.setParallelism(1);
        //Resolve the running parameters.
        ParameterTool paraTool = ParameterTool.fromArgs(args);
        //Construct a workflow and write the data created from self-defined sources to Kafka.
        DataStream<String> messageStream = env.addSource(new SimpleStringGenerator());
        messageStream.addSink(new FlinkKafkaProducer010<>(paraTool.get("topic"),
                new SimpleStringSchema(), paraTool.getProperties()));
        //Invoke execute to trigger the execution.
        env.execute();
    }

    //Customize the sources and create a message every second.
    public static class SimpleStringGenerator implements SourceFunction<String> {
        private static final long serialVersionUID = 2174904787118597072L;
        boolean running = true;
        long i = 0;

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            while (running) {
                ctx.collect("element-" + (i++));
                Thread.sleep(1000);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}
//Kafka consumer code
public class ReadFromKafka {
    public static void main(String[] args) throws Exception {
        //Print the command reference for flink run.
        System.out.println("use command as: ");
        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka" +
                " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21005");
        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka" +
                " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21007" +
                " --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka");
        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka" +
                " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21008" +
                " --security.protocol SSL --ssl.truststore.location /home/truststore.jks" +
                " --ssl.truststore.password huawei");
        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka" +
                " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21009" +
                " --security.protocol SASL_SSL --sasl.kerberos.service.name kafka" +
                " --ssl.truststore.location /home/truststore.jks --ssl.truststore.password huawei");
        System.out.println("******************************************************************************************");
        System.out.println("<topic> is the kafka topic name");
        System.out.println("<bootstrap.servers> is the ip:port list of brokers");
        System.out.println("******************************************************************************************");
        //Construct the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //Set the concurrency.
        env.setParallelism(1);
        //Resolve the running parameters.
        ParameterTool paraTool = ParameterTool.fromArgs(args);
        //Construct a workflow, read data from Kafka, and print the data in a new line.
        DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer010<>(paraTool.get("topic"),
                new SimpleStringSchema(), paraTool.getProperties()));
        messageStream.rebalance().map(new MapFunction<String, String>() {
            @Override
            public String map(String s) throws Exception {
                return "Flink says " + s + System.getProperty("line.separator");
            }
        }).print();
        //Invoke execute to trigger the execution.
        env.execute();
    }
}
Scala Example Code
- Function description
In the Flink application, this code invokes the flink-connector-kafka module's API to produce and consume data.
- Example code
To use Kafka of FusionInsight in security mode, obtain the kafka-client-0.11.x.x.jar file from the FusionInsight client directory before development.
The following uses the logic code of Kafka consumer and Kafka producer as an example.
For details about the complete code, see com.huawei.bigdata.flink.examples.WriteIntoKafka and com.huawei.bigdata.flink.examples.ReadFromKafka.
//Kafka producer code
object WriteIntoKafka {
  def main(args: Array[String]) {
    //Print the command reference for flink run.
    System.out.println("use command as: ")
    System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +
      " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21005")
    System.out.println("******************************************************************************************")
    System.out.println("<topic> is the kafka topic name")
    System.out.println("<bootstrap.servers> is the ip:port list of brokers")
    System.out.println("******************************************************************************************")
    //Construct the execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //Set the concurrency.
    env.setParallelism(1)
    //Resolve the running parameters.
    val paraTool = ParameterTool.fromArgs(args)
    //Construct a workflow and write the data created from self-defined sources to Kafka.
    val messageStream: DataStream[String] = env.addSource(new SimpleStringGenerator)
    messageStream.addSink(new FlinkKafkaProducer010(paraTool.get("topic"),
      new SimpleStringSchema, paraTool.getProperties))
    //Invoke execute to trigger the execution.
    env.execute
  }
}

//Customize the sources and create a message every second.
class SimpleStringGenerator extends SourceFunction[String] {
  var running = true
  var i = 0

  override def run(ctx: SourceContext[String]) {
    while (running) {
      ctx.collect("element-" + i)
      i += 1
      Thread.sleep(1000)
    }
  }

  override def cancel() {
    running = false
  }
}

//Kafka consumer code
object ReadFromKafka {
  def main(args: Array[String]) {
    //Print the command reference for flink run.
    System.out.println("use command as: ")
    System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka" +
      " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21005")
    System.out.println("******************************************************************************************")
    System.out.println("<topic> is the kafka topic name")
    System.out.println("<bootstrap.servers> is the ip:port list of brokers")
    System.out.println("******************************************************************************************")
    //Construct the execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //Set the concurrency.
    env.setParallelism(1)
    //Resolve the running parameters.
    val paraTool = ParameterTool.fromArgs(args)
    //Construct a workflow, read data from Kafka, and print the data in a new line.
    val messageStream = env.addSource(new FlinkKafkaConsumer010(paraTool.get("topic"),
      new SimpleStringSchema, paraTool.getProperties))
    messageStream.map(s => "Flink says " + s + System.getProperty("line.separator")).print()
    //Invoke execute to trigger the execution.
    env.execute()
  }
}
1.1.1.4 Obtaining Example Code
Using the FusionInsight Client
Decompress the FusionInsight client installation package to obtain the examples, which are saved in the Flink sub-folder of the FusionInsight_Services_ClientConfig folder.
- In security mode, obtain FlinkKafkaJavaExample and FlinkKafkaScalaExample from the flink-examples-security directory.
- In non-security mode, obtain FlinkKafkaJavaExample and FlinkKafkaScalaExample from the flink-examples-normal directory.
Using the Maven Project
Download the code from the Huawei DevCloud website to the local computer. Huawei DevCloud URL: https://codehub-cn-south-1.devcloud.huaweicloud.com/codehub/7076065/home
- Security mode
− components/flink/flink-examples-maven-security/FlinkKafkaJavaExample
− components/flink/flink-examples-maven-security/FlinkKafkaScalaExample
- Non-security mode
− components/flink/flink-examples-maven/FlinkKafkaJavaExample
− components/flink/flink-examples-maven/FlinkKafkaScalaExample
1.1.1.5 Debugging the Application
1.1.1.5.1 Compiling and Running the Application
1.1.1.5.2 Viewing the Debugging Result
Scenarios
After a Flink application completes running, you can view the running result, or use the Apache Flink Dashboard to view the application running status.
Procedure
- View the running result of the Flink application.
To check the execution result, view the Stdout log of the TaskManager on the Apache Flink Dashboard.
If the execution result is exported to a file or another location specified by Flink, view the result in that file or location. The following uses checkpoint, pipeline, and the join between configuration tables and streams as examples.
− View checkpoint results and files
Either of the following methods can be used to view the checkpoint files:
- If the checkpoint snapshot information is saved in HDFS, run the hdfs dfs -ls hdfs://hacluster/flink-checkpoint/ command to view the checkpoint files.
- If the checkpoint snapshot information is saved in a local file, log in to each node to view the checkpoint files.
− View pipeline results
The pipeline results are stored in the taskmanager.out file of Flink. To view them, log in to the Apache Flink Dashboard, click the Task Managers tab, and click out.
− View the join result of the configuration table and streams
The join results are stored in the taskmanager.out file of Flink. To view them, log in to the Flink web UI, click the Task Managers tab, and click out.
− View the Stream SQL Join result
The result is saved in the taskmanager.out file of Flink. You can view the result by clicking out under the Task Managers tab on the web UI of Flink.
- Use the Apache Flink Dashboard to view the running status of the Flink application.
The Apache Flink Dashboard includes the Overview, Running Jobs, Completed Jobs, Task Managers, Job Manager, and Logout pages.
On the YARN web UI, find the desired Flink application and click the ApplicationMaster link in the last column of the application to switch to the Apache Flink Dashboard.
To view the print results of the program execution, find the corresponding Task Manager and view the log information under its Stdout tab.
- View Flink logs.
Either of the following methods can be used to obtain Flink logs:
− Log in to the Apache Flink Dashboard and view the logs of the TaskManagers and the JobManager.
− Log in to the YARN web UI to view the JobManager and GC logs.
On the YARN web UI, find the desired Flink application and click its ID. On the displayed page, click Logs in the Logs column.