1 Case 4: Job Pipeline Application
1.1 Scenario Description
Applicable Versions
FusionInsight HD V100R002C70 and FusionInsight HD V100R002C80
Scenario Description
In this example, the publisher job creates 10,000 pieces of data per second. The NettySink operator sends the data from the publisher job to the downstream jobs. The other two jobs serve as subscribers, and each subscribes to the data separately.
Data Planning
1. The publisher job uses customized operators to create about 10,000 pieces of data per second.
2. The data contains two attributes, which are of integer and string types.
3. Configuration files
− nettyconnector.registerserver.topic.storage: specifies the path (on a third-party server) where the IP address, port number, and concurrency information of NettySink is stored. This parameter is mandatory.
nettyconnector.registerserver.topic.storage: /flink/nettyconnector
− nettyconnector.sinkserver.port.range: specifies the range of port numbers used by NettySink. This parameter is mandatory.
nettyconnector.sinkserver.port.range: 28444-28943
− nettyconnector.ssl.enabled: specifies whether to enable SSL encryption between NettySink and NettySource. The default value is false.
nettyconnector.ssl.enabled: true
− nettyconnector.sinkserver.subnet: specifies the network domain of NettySink.
nettyconnector.sinkserver.subnet: 10.162.0.0/16
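Taken together, the four entries above might look as follows in the connector configuration file (the values are those from this data plan; adjust them to your cluster):

```yaml
# NettyConnector settings (values from the data planning above)
nettyconnector.registerserver.topic.storage: /flink/nettyconnector
nettyconnector.sinkserver.port.range: 28444-28943
nettyconnector.ssl.enabled: true
nettyconnector.sinkserver.subnet: 10.162.0.0/16
```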
4. Security authentication
− The SASL authentication of ZooKeeper depends on the HA configurations in the flink-conf.yaml file. For details, see Service Operation Guide > Flink Configuration Management in the FusionInsight HD Product Documentation.
− SSL configurations, such as the keystore, truststore, keystore password, and truststore password, are inherited from the flink-conf.yaml file. For details, see Service Operation Guide > Encrypted Transmission in the FusionInsight HD Product Documentation.
5. API description
− RegisterServerHandler
RegisterServerHandler saves information such as the IP addresses, port numbers, and concurrency of NettySink so that NettySource can connect to it. It provides the following interfaces for users:
public interface RegisterServerHandler {
    /**
     * Start the registration server.
     * @param configuration Flink Configuration object
     */
    void start(Configuration configuration) throws Exception;
    /**
     * Create a topic node (directory) on the registration server.
     * @param topic name of the topic node
     */
    void createTopicNode(String topic) throws Exception;
    /**
     * Register information to a topic node (directory).
     * @param topic directory to register with
     * @param registerRecord information to be registered
     */
    void register(String topic, RegisterRecord registerRecord) throws Exception;
    /**
     * Delete a topic node.
     * @param topic topic to be deleted
     */
    void deleteTopicNode(String topic) throws Exception;
    /**
     * Deregister the registration information.
     * @param topic topic where the registration information is located
     * @param recordId ID of the registration information to be deregistered
     */
    void unregister(String topic, int recordId) throws Exception;
    /**
     * Query registration information.
     * @param topic topic where the information is located
     * @param recordId ID of the information to query
     */
    RegisterRecord query(String topic, int recordId) throws Exception;
    /**
     * Check whether a topic exists.
     * @param topic topic to check
     */
    Boolean isExist(String topic) throws Exception;
    /**
     * Shut down the RegisterServerHandler.
     */
    void shutdown() throws Exception;
}
In addition to the preceding interface, Flink provides a built-in implementation: ZookeeperRegisterServerHandler.
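The interface contract above can be illustrated with a minimal in-memory sketch. This is not product code: the class name is hypothetical, RegisterRecord is replaced by a plain String, and the start/shutdown lifecycle and the Flink Configuration type are omitted so that the sketch stands alone.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative in-memory registration handler mirroring the
// RegisterServerHandler contract. RegisterRecord is simplified
// to a String so the sketch compiles without Flink.
class InMemoryRegisterHandler {
    private final Map<String, Map<Integer, String>> topics = new ConcurrentHashMap<>();

    // Create a topic node; a no-op if it already exists.
    void createTopicNode(String topic) {
        topics.putIfAbsent(topic, new ConcurrentHashMap<>());
    }

    // Register a record under a topic, creating the topic node if needed.
    void register(String topic, int recordId, String record) {
        topics.computeIfAbsent(topic, t -> new ConcurrentHashMap<>()).put(recordId, record);
    }

    // Return the record for recordId, or null if absent.
    String query(String topic, int recordId) {
        Map<Integer, String> records = topics.get(topic);
        return records == null ? null : records.get(recordId);
    }

    // Remove a single registration record.
    void unregister(String topic, int recordId) {
        Map<Integer, String> records = topics.get(topic);
        if (records != null) {
            records.remove(recordId);
        }
    }

    // Check whether a topic node exists.
    boolean isExist(String topic) {
        return topics.containsKey(topic);
    }

    // Delete the topic node and all records under it.
    void deleteTopicNode(String topic) {
        topics.remove(topic);
    }
}
```

A real implementation such as the ZooKeeper-based one would persist this state on an external server so that NettySource instances in other jobs can look it up.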
− NettySink operator
Class NettySink(String name, String topic, RegisterServerHandler registerServerHandler, int numberOfSubscribedJobs)
l name: name of the NettySink
l topic: topic under which the NettySink publishes data. The topic of each NettySink (excluding those used for concurrency) must be unique. Otherwise, subscriptions may be disordered and data transmission may be abnormal.
l registerServerHandler: handle of the registration server
l numberOfSubscribedJobs: number of jobs that subscribe to the NettySink. The NettySink sends data only after all subscribers are connected to it.
− NettySource operator
Class NettySource(String name, String topic, RegisterServerHandler registerServerHandler)
l name: name of the NettySource. The name of each NettySource (excluding those used for concurrency) must be unique. Otherwise, connections with NettySink may conflict.
l topic: topic of the subscribed NettySink
l registerServerHandler: handle of the registration server
The concurrency of NettySource must be the same as that of NettySink. Otherwise, the connection cannot be created.
1.2 Development Idea
1. There are three jobs. One serves as a publisher, and the other two serve as subscribers.
2. The publisher converts the created data into byte[] and sends it to the subscribers.
3. After receiving the byte[], the subscribers transform the data into strings and print sampled data.
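The byte[] handoff in steps 2 and 3 works reliably only if the publisher and subscribers agree on the string encoding. The example code below calls getBytes() and new String(b) with the platform default charset; a safer variant (a hypothetical helper, not part of the sample code) pins UTF-8 explicitly:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical codec that pins UTF-8 so publisher and subscribers
// decode bytes identically regardless of each JVM's default charset.
class PayloadCodec {
    static byte[] encode(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    static String decode(byte[] b) {
        return new String(b, StandardCharsets.UTF_8);
    }
}
```

Such a helper could be called from the map functions on both sides of the pipeline.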
1.3 Example Code Description
Java Example Code
The following uses the logic code as an example.
For details about the complete code, see:
l com.huawei.bigdata.flink.examples.UserSource
l com.huawei.bigdata.flink.examples.TestPipeline_NettySink
l com.huawei.bigdata.flink.examples.TestPipeline_NettySource1
l com.huawei.bigdata.flink.examples.TestPipeline_NettySource2
1. The publisher customizes source operators to create data.
package com.huawei.bigdata.flink.examples;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.io.Serializable;

public class UserSource extends RichParallelSourceFunction<Tuple2<Integer, String>> implements Serializable {

    private boolean isRunning = true;

    @Override
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
    }

    /**
     * Data generation function, which creates 10,000 pieces of data per second.
     */
    @Override
    public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception {
        while (isRunning) {
            for (int i = 0; i < 10000; i++) {
                ctx.collect(Tuple2.of(i, "hello-" + i));
            }
            Thread.sleep(1000);
        }
    }

    @Override
    public void close() {
        isRunning = false;
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
2. Code for the publisher
package com.huawei.bigdata.flink.examples;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.netty.sink.NettySink;
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler;

public class TestPipeline_NettySink {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the concurrency of the publisher to 2.
        env.setParallelism(2);
        // Create a ZookeeperRegisterServerHandler.
        ZookeeperRegisterServerHandler zkRegisterServerHandler = new ZookeeperRegisterServerHandler();
        // Add a customized source operator.
        env.addSource(new UserSource())
            .keyBy(0)
            .map(new MapFunction<Tuple2<Integer, String>, byte[]>() {
                // Convert the data to be sent into a byte array.
                @Override
                public byte[] map(Tuple2<Integer, String> integerStringTuple2) throws Exception {
                    return integerStringTuple2.f1.getBytes();
                }
            })
            // Use NettySink to transmit the data.
            .addSink(new NettySink("NettySink-1", "TOPIC-2", zkRegisterServerHandler, 2));
        env.execute();
    }
}
3. Code for the first subscriber
package com.huawei.bigdata.flink.examples;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.netty.source.NettySource;
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler;

public class TestPipeline_NettySource1 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the concurrency of the subscriber to 2.
        env.setParallelism(2);
        // Create a ZookeeperRegisterServerHandler.
        ZookeeperRegisterServerHandler zkRegisterServerHandler = new ZookeeperRegisterServerHandler();
        // Add a NettySource operator to receive messages from the publisher.
        env.addSource(new NettySource("NettySource-1", "TOPIC-2", zkRegisterServerHandler))
            .map(new MapFunction<byte[], String>() {
                // Convert the received byte array into a character string.
                @Override
                public String map(byte[] b) {
                    return new String(b);
                }
            })
            .print();
        env.execute();
    }
}
4. Code for the second subscriber
package com.huawei.bigdata.flink.examples;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.netty.source.NettySource;
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler;
public class TestPipeline_NettySource2 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//Set the concurrency of the subscriber to 2.
env.setParallelism(2);
//Create a ZookeeperRegisterServerHandler.
ZookeeperRegisterServerHandler zkRegisterServerHandler = new ZookeeperRegisterServerHandler();
//Add a NettySource operator to receive data from the publisher.
env.addSource(new NettySource("NettySource-2", "TOPIC-2", zkRegisterServerHandler))
.map(new MapFunction<byte[], String>() {
//Convert the received byte arrays into character strings.
@Override
public String map(byte[] b) {
return new String(b);
}
}).print();
env.execute();
}
}
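The Scala subscribers below sample the stream with a random filter (Random.nextInt(50000) == 10), whereas the Java subscribers above print every record. If deterministic sampling is preferred, a counter-based predicate such as the following hypothetical sketch could back a Flink FilterFunction; the class name and sampling rate are illustrative only.

```java
// Hypothetical deterministic sampler: keeps one record out of every n.
// A Flink FilterFunction<byte[]> could delegate its filter() call to accept().
class EveryNthSampler {
    private final int n;
    private long count = 0;

    EveryNthSampler(int n) {
        this.n = n;
    }

    // Returns true for records 0, n, 2n, ... in arrival order.
    boolean accept() {
        return count++ % n == 0;
    }
}
```

Unlike the random filter, this variant produces a predictable output volume, which can make debugging throughput easier.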
Scala Example Code
The following uses the logic code as an example.
For details about the complete code, see:
l com.huawei.bigdata.flink.examples.UserSource
l com.huawei.bigdata.flink.examples.TestPipeline_NettySink
l com.huawei.bigdata.flink.examples.TestPipeline_NettySource1
l com.huawei.bigdata.flink.examples.TestPipeline_NettySource2
1. Definition of the data class for the messages to be sent:
package com.huawei.bigdata.flink.examples
case class Inforamtion(index: Int, content: String) {
def this() = this(0, "")
}
2. The publisher customizes source operators to create data.
package com.huawei.bigdata.flink.examples

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

class UserSource extends RichParallelSourceFunction[Inforamtion] with Serializable {

  var isRunning = true

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
  }

  // Create 10,000 pieces of data per second.
  override def run(sourceContext: SourceContext[Inforamtion]) = {
    while (isRunning) {
      for (i <- 0 until 10000) {
        sourceContext.collect(Inforamtion(i, "hello-" + i))
      }
      Thread.sleep(1000)
    }
  }

  override def close(): Unit = super.close()

  override def cancel() = {
    isRunning = false
  }
}
3. Code for the publisher
package com.huawei.bigdata.flink.examples

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.netty.sink.NettySink
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler
import org.apache.flink.streaming.api.scala._

object TestPipeline_NettySink {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Set the concurrency of the publisher to 2.
    env.setParallelism(2)
    // Use ZooKeeper as the registration server.
    val zkRegisterServerHandler = new ZookeeperRegisterServerHandler
    // Add a user-defined source operator to create data.
    env.addSource(new UserSource)
      // Convert the data to be sent into a byte array.
      .keyBy(0).map(x => x.content.getBytes)
      // Add the NettySink operator to send data.
      .addSink(new NettySink("NettySink-1", "TOPIC-2", zkRegisterServerHandler, 2))
    env.execute()
  }
}
4. Code for the first subscriber
package com.huawei.bigdata.flink.examples

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.netty.source.NettySource
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler
import org.apache.flink.streaming.api.scala._

import scala.util.Random

object TestPipeline_NettySource1 {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Set the concurrency of the subscriber to 2.
    env.setParallelism(2)
    // Use ZooKeeper as the registration server.
    val zkRegisterServerHandler = new ZookeeperRegisterServerHandler
    // Add a NettySource operator to receive data from the publisher.
    env.addSource(new NettySource("NettySource-1", "TOPIC-2", zkRegisterServerHandler))
      // Convert the received byte array into a character string.
      .map(x => (1, new String(x)))
      // Sample the data.
      .filter(x => {
        Random.nextInt(50000) == 10
      })
      .print
    env.execute()
  }
}
5. Code for the second subscriber
package com.huawei.bigdata.flink.examples
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.netty.source.NettySource
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler
import org.apache.flink.streaming.api.scala._
import scala.util.Random
object TestPipeline_NettySource2 {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//Set the concurrency of the subscriber to 2.
env.setParallelism(2)
//Use ZooKeeper as the registration server.
val zkRegisterServerHandler = new ZookeeperRegisterServerHandler
//Add a NettySource operator to receive data.
env.addSource(new NettySource("NettySource-2", "TOPIC-2", zkRegisterServerHandler))
.map(x=>(2, new String(x)))//Convert the received byte array into character strings.
.filter(x=>{
Random.nextInt(50000) == 10
})
.print()
env.execute()
}
}
1.4 Obtaining Example Code
Using the FusionInsight Client
Decompress the FusionInsight client installation package and obtain the example projects, which are saved in the Flink subfolder of the FusionInsight_Services_ClientConfig folder.
l In security mode, obtain FlinkPipelineJavaExample and FlinkPipelineScalaExample from the flink-examples-security directory.
l In non-security mode, obtain FlinkPipelineJavaExample and FlinkPipelineScalaExample from the flink-examples-normal directory.
Using the Maven Project
Download the code from the Huawei DevCloud website to the local computer. Huawei DevCloud URL: https://codehub-cn-south-1.devcloud.huaweicloud.com/codehub/7076065/home
l Security mode
− components/flink/flink-examples-maven-security/FlinkPipelineJavaExample
− components/flink/flink-examples-maven-security/FlinkPipelineScalaExample
l Non-security mode
− components/spark2x/flink-examples-maven/FlinkPipelineJavaExample
− components/spark2x/flink-examples-maven/FlinkPipelineScalaExample
1.5 Debugging the Application
1.5.1 Compiling and Running the Application
1.5.2 Viewing the Debugging Result
Scenarios
After a Flink application finishes running, you can view the running result, or use the Apache Flink Dashboard to view the application running status.
Procedure
l View the running result of the Flink application.
If you want to check the execution result, view the Stdout log of TaskManager on the Apache Flink Dashboard.
If the execution result is exported to a file or another location specified by Flink, view the result from that file or location. The following uses checkpoints, the pipeline, and the join between configuration tables and streams as examples.
− View checkpoint results and files
Either of the following methods can be used to view the checkpoint files:
l If the checkpoint snapshot information is saved in HDFS, run the hdfs dfs -ls hdfs://hacluster/flink-checkpoint/ command to view the checkpoint files.
l If the checkpoint snapshot information is saved in a local file, log in to each node to view the checkpoint files.
− View pipeline results
The pipeline results are stored in the taskmanager.out file of Flink. If you want to view pipeline results, log in to the Apache Flink Dashboard, click the task manager label, and click out.
− View the join result of the configuration table and streams
The join results are stored in the taskmanager.out file of Flink. To view them, log in to the Apache Flink Dashboard, click the task manager label, and click out.
− View the Stream SQL join result
The result is saved in the taskmanager.out file of Flink. You can view the result by clicking out under the task manager label on the web UI of Flink.
l Use Apache Flink Dashboard to view the running status of the Flink application.
The Apache Flink Dashboard includes the Overview, Running Jobs, Completed Jobs, Task Managers, Job Manager, and Logout pages.
In the YARN web UI, find the desired Flink application. Click ApplicationMaster in the last column of the application to switch to the Apache Flink Dashboard.
To view the print results of the program execution, find the corresponding Task Manager and check the log information under its Stdout tag.
l View Flink logs.
Both of the following methods can be used to obtain Flink logs:
− Log in to the Apache Flink Dashboard and view logs of TaskManagers and JobManager.
− Log in to the YARN web UI to view the JobManager and GC logs.
In the YARN web UI window, find the desired Flink application and click its application ID. On the displayed page, click Logs in the Logs column.
