
Producing and Consuming Data in Kafka (Flink - Case 2)

Latest reply: Jan 11, 2020 00:54:40

This post describes the process of producing and consuming data in Kafka. See the details below.


1.1.1 Case 2: Producing and Consuming Data in Kafka

1.1.1.1 Scenarios

Applicable Versions

FusionInsight HD V100R002C70 and FusionInsight HD V100R002C80

Scenario Description

Assume that a Flink service receives one message per second.

Develop a Flink application that, based on the service requirements, adds a specific prefix to each message in real time.

Data Planning

Data of the Flink example project is saved in the Kafka component. A user with Kafka permissions can send data to and receive data from Kafka.

1.        Ensure that the cluster components, including HDFS, YARN, Flink, and Kafka, have been installed.

2.        Create a topic.

a.        Configure the user permission for creating topics on the server.

Change the value of allow.everyone.if.no.acl.found, the Broker configuration item of Kafka, to true, as shown in Figure 1-1. Then restart the Kafka service for the configuration to take effect.

Figure 1-1 Setting the user permission for creating topics


 

b.        Use the Linux CLI to create a topic. Before running the related commands, run the kinit command (for example, kinit flinkuser) for human-machine authentication.

Note:

flinkuser needs to be created in advance and must have the permission to create Kafka topics. For details, see section "Preparing the Developer Account" in the FusionInsight HD Product Documentation (Security Mode).

The command for creating a topic is as follows:

bin/kafka-topics.sh --create --zookeeper {zkQuorum}/kafka  --partitions {partitionNum} --replication-factor {replicationNum} --topic {Topic}

Table 1-1 Parameters used in the command

Parameter          Description

{zkQuorum}         ZooKeeper cluster information, in the IP address:port format

{partitionNum}     Number of partitions of the topic

{replicationNum}   Number of data copies of each partition of the topic

{Topic}            Topic name

 

For example, run the following command on the Kafka client. Assume that the IP address:port values of the ZooKeeper cluster are 10.96.101.32:24002, 10.96.101.251:24002, 10.96.101.177:24002, and 10.91.8.160:24002, and that the topic name is topic1.

bin/kafka-topics.sh --create --zookeeper 10.96.101.32:24002,10.96.101.251:24002,10.96.101.177:24002,10.91.8.160:24002/kafka --partitions 5 --replication-factor 1 --topic topic1
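The {zkQuorum} value in this command is a comma-separated list of IP address:port entries, and a typo in one entry only surfaces as a connection timeout later. As a quick sanity check, the expected shape can be validated with a small helper; the class below is a hypothetical illustration, not part of the FusionInsight client:

```java
import java.util.regex.Pattern;

public class ZkQuorumCheck {
    // One "IPv4:port" entry, e.g. 10.96.101.32:24002.
    private static final Pattern ENTRY =
        Pattern.compile("\\d{1,3}(\\.\\d{1,3}){3}:\\d{1,5}");

    /** Returns true if every comma-separated entry looks like IP:port. */
    public static boolean isValidQuorum(String quorum) {
        if (quorum == null || quorum.isEmpty()) {
            return false;
        }
        for (String entry : quorum.split(",")) {
            if (!ENTRY.matcher(entry.trim()).matches()) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        String quorum = "10.96.101.32:24002,10.96.101.251:24002,"
            + "10.96.101.177:24002,10.91.8.160:24002";
        System.out.println(isValidQuorum(quorum));          // true
        System.out.println(isValidQuorum("10.96.101.32"));  // false: port missing
    }
}
```

This only checks the textual shape; it does not confirm that the ZooKeeper nodes are reachable.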

3.        Perform security authentication.

Kerberos authentication, SSL encryption authentication, or Kerberos+SSL authentication can be used.

l  Kerberos authentication configuration

1.        Configuration on the client

In the flink-conf.yaml configuration file, add the Kerberos authentication settings (that is, add KafkaClient to the contexts item) as follows:

security.kerberos.login.keytab: /home/demo/flink/release/flink-1.2.1/keytab/admin.keytab
security.kerberos.login.principal: admin
security.kerberos.login.contexts: Client,KafkaClient
security.kerberos.login.use-ticket-cache: false

2.        Running parameters

Running parameters about the SASL_PLAINTEXT protocol are as follows:

--topic topic1 --bootstrap.servers 10.96.101.32:21007 --security.protocol SASL_PLAINTEXT  --sasl.kerberos.service.name kafka

//10.96.101.32:21007 indicates the IP address:port number of the Kafka server.
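Kafka does not parse these flags itself: in the example code later in this post, Flink's ParameterTool.fromArgs turns each --key value pair into an entry of a java.util.Properties object, which paraTool.getProperties() then hands to the Kafka connector. The following is a simplified re-implementation of that parsing for illustration only (the real ParameterTool handles more cases):

```java
import java.util.Properties;

public class ArgsToProperties {
    /** Turns "--key value" pairs into Properties, roughly as ParameterTool.fromArgs does. */
    public static Properties fromArgs(String[] args) {
        Properties props = new Properties();
        for (int i = 0; i + 1 < args.length; i += 2) {
            String key = args[i];
            // Strip the leading "--" or "-" from the flag name.
            if (key.startsWith("--")) {
                key = key.substring(2);
            } else if (key.startsWith("-")) {
                key = key.substring(1);
            }
            props.setProperty(key, args[i + 1]);
        }
        return props;
    }

    public static void main(String[] args) {
        String[] runArgs = {"--topic", "topic1",
            "--bootstrap.servers", "10.96.101.32:21007",
            "--security.protocol", "SASL_PLAINTEXT",
            "--sasl.kerberos.service.name", "kafka"};
        Properties props = fromArgs(runArgs);
        System.out.println(props.getProperty("security.protocol")); // SASL_PLAINTEXT
    }
}
```

This is why the flag names in the running parameters must match Kafka client property names exactly: they are passed through verbatim.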

l  SSL encryption configuration

1.        Configuration on the server

Set ssl.mode.enable to true, as shown in Figure 1-2.

Figure 1-2 Configuration on the server


 

2.        Configuration on the client

l  Log in to FusionInsight Manager and choose Services > Kafka. On the displayed Service Status page, click Download Client to download the Kafka client.

Figure 1-3 Configuration on the client


 

l  Use the ca.crt certificate in the root directory of the client to create a truststore file for the client.

Run the following command:

keytool -noprompt -import -alias myservercert -file ca.crt -keystore truststore.jks


l  Running parameters

Run the following command with the related parameters. (Ensure that the value of the ssl.truststore.password parameter is the same as the password entered when you created the truststore file.)

--topic topic1 --bootstrap.servers 10.96.101.32:21008 --security.protocol SSL --ssl.truststore.location /home/zgd/software/FusionInsight_VXXXRXXXCXXUXX_Kafka_ClientConfig/truststore.jks --ssl.truststore.password huawei

//10.96.101.32:21008 indicates the IP address:port number of the Kafka server, and VXXXRXXXCXXUXX indicates the FusionInsight version number.
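The ssl.truststore.password value is only verified when the JKS file is opened: loading a truststore with the wrong password fails with an IOException. The sketch below creates a throwaway JKS store and reloads it to illustrate why the running parameter must match the keytool password; the file name and passwords are placeholders, not values from a real cluster:

```java
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.security.KeyStore;

public class TruststoreCheck {
    /** Returns true if the JKS file at path opens with the given password. */
    public static boolean passwordMatches(String path, char[] password) throws Exception {
        KeyStore ks = KeyStore.getInstance("JKS");
        try (FileInputStream in = new FileInputStream(path)) {
            ks.load(in, password);
            return true;
        } catch (IOException e) {
            // A wrong password surfaces as an IOException on load.
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        // Create an empty JKS store protected by a placeholder password.
        char[] password = "huawei".toCharArray();
        KeyStore ks = KeyStore.getInstance("JKS");
        ks.load(null, password);
        try (FileOutputStream out = new FileOutputStream("truststore.jks")) {
            ks.store(out, password);
        }
        System.out.println(passwordMatches("truststore.jks", password));              // true
        System.out.println(passwordMatches("truststore.jks", "wrong".toCharArray())); // false
    }
}
```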

l  Configuration in Kerberos+SSL mode

After completing the preceding Kerberos and SSL configurations on the client and server, change the port number and protocol type in the running parameters to enable the Kerberos+SSL mode.

--topic topic1 --bootstrap.servers 10.96.101.32:21009 --security.protocol SASL_SSL  --sasl.kerberos.service.name kafka --ssl.truststore.location /home/zgd/software/FusionInsight_VXXXRXXXCXXUXX_Kafka_ClientConfig/truststore.jks --ssl.truststore.password huawei

//10.96.101.32:21009 indicates the IP address:port number of the Kafka server, and VXXXRXXXCXXUXX indicates the FusionInsight version number.

1.1.1.2 Development Idea

1.        Start the Flink Kafka producer to send data to Kafka.

2.        Start the Flink Kafka consumer to receive data from Kafka. Ensure that the consumer and the producer use the same topics.

3.        Add a prefix to the data content and print the data.
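Step 3 is a pure string transformation; in the example code below it is implemented inside a Flink MapFunction. Stripped of the streaming machinery, the transformation is just the following (the class and method names here are illustrative, not from the example project):

```java
public class PrefixFn {
    /** Adds the prefix used by the example consumer, plus a trailing line separator. */
    public static String addPrefix(String s) {
        return "Flink says " + s + System.getProperty("line.separator");
    }

    public static void main(String[] args) {
        // Mirrors what the consumer prints for two generated elements.
        System.out.print(addPrefix("element-0"));
        System.out.print(addPrefix("element-1"));
    }
}
```

Keeping the transformation this small makes it easy to unit-test separately from the Kafka source and sink.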

1.1.1.3 Example Code Description

Java Example Code

l  Function description

In the Flink application, this code invokes the flink-connector-kafka module's API to produce and consume data.

l  Example code

If you need to use FusionInsight Kafka in security mode, obtain the kafka-client-0.11.x.x.jar file from the FusionInsight client directory before development.

The following uses the logic code of Kafka consumer and Kafka producer as an example.

For details about the complete code, see com.huawei.bigdata.flink.examples.WriteIntoKafka and com.huawei.bigdata.flink.examples.ReadFromKafka.

//Kafka producer code
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class WriteIntoKafka {

    public static void main(String[] args) throws Exception {
        //Print the command reference for flink run.
        System.out.println("use command as: ");
        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +
            " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21005");
        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +
            " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka");
        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +
            " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21008 --security.protocol SSL --ssl.truststore.location /home/truststore.jks --ssl.truststore.password huawei");
        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +
            " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21009 --security.protocol SASL_SSL --sasl.kerberos.service.name kafka --ssl.truststore.location /home/truststore.jks --ssl.truststore.password huawei");
        System.out.println("******************************************************************************************");
        System.out.println("<topic> is the kafka topic name");
        System.out.println("<bootstrap.servers> is the ip:port list of brokers");
        System.out.println("******************************************************************************************");

        //Construct the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //Set the concurrency.
        env.setParallelism(1);
        //Resolve the running parameters.
        ParameterTool paraTool = ParameterTool.fromArgs(args);
        //Construct a workflow and write the data created from self-defined sources to Kafka.
        DataStream<String> messageStream = env.addSource(new SimpleStringGenerator());
        messageStream.addSink(new FlinkKafkaProducer010<>(paraTool.get("topic"),
            new SimpleStringSchema(), paraTool.getProperties()));
        //Invoke execute to trigger the execution.
        env.execute();
    }

    //Customize the source and create a message every second.
    public static class SimpleStringGenerator implements SourceFunction<String> {
        private static final long serialVersionUID = 2174904787118597072L;
        boolean running = true;
        long i = 0;

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            while (running) {
                ctx.collect("element-" + (i++));
                Thread.sleep(1000);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}

//Kafka consumer code
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class ReadFromKafka {

    public static void main(String[] args) throws Exception {
        //Print the command reference for flink run.
        System.out.println("use command as: ");
        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka" +
            " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21005");
        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka" +
            " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka");
        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka" +
            " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21008 --security.protocol SSL --ssl.truststore.location /home/truststore.jks --ssl.truststore.password huawei");
        System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka" +
            " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21009 --security.protocol SASL_SSL --sasl.kerberos.service.name kafka --ssl.truststore.location /home/truststore.jks --ssl.truststore.password huawei");
        System.out.println("******************************************************************************************");
        System.out.println("<topic> is the kafka topic name");
        System.out.println("<bootstrap.servers> is the ip:port list of brokers");
        System.out.println("******************************************************************************************");

        //Construct the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //Set the concurrency.
        env.setParallelism(1);
        //Resolve the running parameters.
        ParameterTool paraTool = ParameterTool.fromArgs(args);
        //Construct a workflow, read data from Kafka, and print the data in a new line.
        DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer010<>(paraTool.get("topic"),
            new SimpleStringSchema(), paraTool.getProperties()));
        messageStream.rebalance().map(new MapFunction<String, String>() {
            @Override
            public String map(String s) throws Exception {
                return "Flink says " + s + System.getProperty("line.separator");
            }
        }).print();
        //Invoke execute to trigger the execution.
        env.execute();
    }
}

Scala Example Code

l  Function description

In the Flink application, this code invokes the flink-connector-kafka module's API to produce and consume data.

l  Example code

If you need to use FusionInsight Kafka in security mode, obtain the kafka-client-0.11.x.x.jar file from the FusionInsight client directory before development.

The following uses the logic code of Kafka consumer and Kafka producer as an example.

For details about the complete code, see com.huawei.bigdata.flink.examples.WriteIntoKafka and com.huawei.bigdata.flink.examples.ReadFromKafka.

//Kafka producer code
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010, FlinkKafkaProducer010}
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object WriteIntoKafka {

  def main(args: Array[String]) {
    //Print the command reference for flink run.
    System.out.println("use command as: ")
    System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.WriteIntoKafka" +
      " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21005")
    System.out.println("******************************************************************************************")
    System.out.println("<topic> is the kafka topic name")
    System.out.println("<bootstrap.servers> is the ip:port list of brokers")
    System.out.println("******************************************************************************************")

    //Construct the execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //Set the concurrency.
    env.setParallelism(1)
    //Resolve the running parameters.
    val paraTool = ParameterTool.fromArgs(args)
    //Construct a workflow and write the data created from self-defined sources to Kafka.
    val messageStream: DataStream[String] = env.addSource(new SimpleStringGenerator)
    messageStream.addSink(new FlinkKafkaProducer010(
      paraTool.get("topic"), new SimpleStringSchema, paraTool.getProperties))
    //Invoke execute to trigger the execution.
    env.execute
  }
}

//Customize the source and create a message every second.
class SimpleStringGenerator extends SourceFunction[String] {
  var running = true
  var i = 0

  override def run(ctx: SourceContext[String]) {
    while (running) {
      ctx.collect("element-" + i)
      i += 1
      Thread.sleep(1000)
    }
  }

  override def cancel() {
    running = false
  }
}

//Kafka consumer code
object ReadFromKafka {

  def main(args: Array[String]) {
    //Print the command reference for flink run.
    System.out.println("use command as: ")
    System.out.println("./bin/flink run --class com.huawei.bigdata.flink.examples.ReadFromKafka" +
      " /opt/test.jar --topic topic-test -bootstrap.servers 10.91.8.218:21005")
    System.out.println("******************************************************************************************")
    System.out.println("<topic> is the kafka topic name")
    System.out.println("<bootstrap.servers> is the ip:port list of brokers")
    System.out.println("******************************************************************************************")

    //Construct the execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //Set the concurrency.
    env.setParallelism(1)
    //Resolve the running parameters.
    val paraTool = ParameterTool.fromArgs(args)
    //Construct a workflow, read data from Kafka, and print the data in a new line.
    val messageStream = env.addSource(new FlinkKafkaConsumer010(
      paraTool.get("topic"), new SimpleStringSchema, paraTool.getProperties))
    messageStream
      .map(s => "Flink says " + s + System.getProperty("line.separator")).print()
    //Invoke execute to trigger the execution.
    env.execute()
  }
}

1.1.1.4 Obtaining Example Code

Using the FusionInsight Client

Decompress the FusionInsight client installation package and obtain the example projects, which are saved in the Flink sub-folder of the FusionInsight_Services_ClientConfig folder.

l  In security mode, obtain FlinkKafkaJavaExample and FlinkKafkaScalaExample from the flink-examples-security directory.

l  In non-security mode, obtain FlinkKafkaJavaExample and FlinkKafkaScalaExample from the flink-examples-normal directory.

Using the Maven Project

Download the code from the Huawei DevCloud website to the local computer. Huawei DevCloud URL: https://codehub-cn-south-1.devcloud.huaweicloud.com/codehub/7076065/home

l  Security mode

          components/flink/flink-examples-maven-security/FlinkKafkaJavaExample

          components/flink/flink-examples-maven-security/FlinkKafkaScalaExample

l  Non-security mode

          components/flink/flink-examples-maven/FlinkKafkaJavaExample

          components/flink/flink-examples-maven/FlinkKafkaScalaExample

1.1.1.5 Debugging the Application

1.1.1.5.1 Compiling and Running the Application
1.1.1.5.2 Viewing the Debugging Result

Scenarios

After a Flink application finishes running, you can view the running result, or use the Apache Flink Dashboard to view the application running status.

Procedure

l  View the running result of the Flink application.

If you want to check the execution result, view the Stdout log of TaskManager on the Apache Flink Dashboard.

If the execution result is exported to a file or another location specified by Flink, view the result in that file or location. The following uses checkpoints, pipelines, and the join between configuration tables and streams as examples.

          View checkpoint results and files

Either of the following methods can be used to view the checkpoint files:

l  If the checkpoint snapshot information is saved in HDFS, run the hdfs dfs -ls hdfs://hacluster/flink-checkpoint/ command to view the checkpoint files.

l  If the checkpoint snapshot information is saved in a local file, log in to each node to view the checkpoint files.

          View pipeline results

The pipeline results are stored in the taskmanager.out file of Flink. If you want to view pipeline results, log in to the Apache Flink Dashboard, click the task manager label, and click out.

          View the join result of the configuration table and streams

The join results are stored in the taskmanager.out file of Flink. If you want to view them, log in to the Apache Flink Dashboard, click the task manager label, and click out.

          View the Stream SQL join result

The result is saved in the taskmanager.out file of Flink. You can view the result by clicking out under the task manager label on the web UI of Flink.

l  Use Apache Flink Dashboard to view the running status of the Flink application.

The Apache Flink Dashboard includes the Overview, Running Jobs, Completed Jobs, Task Managers, Job Manager, and Logout pages.

In the YARN web UI, find the desired Flink application and click ApplicationMaster in the last column of its row to switch to the Apache Flink Dashboard.

To view the print results of the program execution, find the corresponding Task Manager and check the log information under its Stdout tab.

l  View Flink logs.

Either of the following methods can be used to obtain Flink logs:

          Log in to the Apache Flink Dashboard and view logs of TaskManagers and JobManager.

          Log in to the YARN web UI to view logs about JobManager and GC.

In the YARN web UI window, find the desired Flink application and click its ID. On the displayed page, click Logs in the Logs column.

 
