
Flink: Case 4: Job Pipeline Application

Latest reply: Aug 24, 2018 01:33:24

1 Case 4: Job Pipeline Application

1.1 Scenario Description

Applicable Versions

FusionInsight HD V100R002C70 and FusionInsight HD V100R002C80

Scenario Description

In this example, the publisher job creates 10,000 pieces of data per second. The NettySink operator sends the data from the publisher job to the downstream jobs. The other two jobs act as subscribers, each subscribing to the data independently.

Data Planning

1.         The publisher job uses a customized source operator to create about 10,000 pieces of data per second.

2.         The data contains two attributes: an integer and a string.
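For illustration only, the generated record can be modeled as a plain Java pair whose string payload is what actually travels over the wire (the class below is a hypothetical stand-in for the Flink Tuple2 used in the example jobs):

```java
// Hypothetical plain-Java model of the generated record: an int index and a string payload.
class RecordPair {
    final int index;
    final String content;

    RecordPair(int index, String content) {
        this.index = index;
        this.content = content;
    }

    // Serialize only the string payload, mirroring what the publisher's map() does later.
    byte[] toBytes() {
        return content.getBytes(java.nio.charset.StandardCharsets.UTF_8);
    }
}
```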

3.         Configuration files

- nettyconnector.registerserver.topic.storage: specifies the path (on a third-party server) where the IP address, port number, and concurrency information of NettySink is stored. This parameter is mandatory.

nettyconnector.registerserver.topic.storage: /flink/nettyconnector

- nettyconnector.sinkserver.port.range: specifies the range of port numbers used by NettySink. This parameter is mandatory.

nettyconnector.sinkserver.port.range: 28444-28943
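The value is a simple "min-max" string; a minimal sketch of parsing it (the PortRange helper is ours, not part of the connector):

```java
// Hypothetical helper that parses the "min-max" form used by
// nettyconnector.sinkserver.port.range.
class PortRange {
    final int min;
    final int max;

    PortRange(String range) {
        String[] parts = range.split("-");
        this.min = Integer.parseInt(parts[0]);
        this.max = Integer.parseInt(parts[1]);
        if (min > max) {
            throw new IllegalArgumentException("Invalid port range: " + range);
        }
    }

    // Number of ports available to NettySink instances.
    int size() {
        return max - min + 1;
    }
}
```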

- nettyconnector.ssl.enabled: specifies whether to enable SSL encryption between NettySink and NettySource. The default value is false.

nettyconnector.ssl.enabled: true

- nettyconnector.sinkserver.subnet: specifies the network domain (subnet) of NettySink.

nettyconnector.sinkserver.subnet: 10.162.0.0/16
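The subnet is given in CIDR notation; the following is a standalone sketch of the containment test such a setting implies (our own illustration, not the connector's implementation):

```java
// Hypothetical CIDR containment check for IPv4 addresses.
class CidrMatcher {
    // Return true if the IPv4 address falls inside the a.b.c.d/len subnet.
    static boolean inSubnet(String address, String cidr) {
        String[] parts = cidr.split("/");
        int prefixLen = Integer.parseInt(parts[1]);
        int net = toInt(parts[0]);
        int addr = toInt(address);
        // A /16 prefix yields the mask 0xFFFF0000, and so on.
        int mask = prefixLen == 0 ? 0 : -1 << (32 - prefixLen);
        return (net & mask) == (addr & mask);
    }

    // Parse a dotted-quad IPv4 string into a 32-bit integer.
    private static int toInt(String ip) {
        String[] octets = ip.split("\\.");
        int value = 0;
        for (String octet : octets) {
            value = (value << 8) | Integer.parseInt(octet);
        }
        return value;
    }
}
```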

4.         Security authentication

- The SASL authentication of ZooKeeper depends on the HA configuration in the flink-conf.yaml file. For details, see Service Operation Guide > Flink Configuration Management in the FusionInsight HD Product Documentation.

- The SSL configurations, such as the keystore, truststore, and their passwords, are inherited from the flink-conf.yaml file. For details, see Service Operation Guide > Encrypted Transmission in the FusionInsight HD Product Documentation.

5.         API description

- RegisterServerHandler

RegisterServerHandler saves information such as the IP address, port number, and concurrency of NettySink so that NettySource can connect to it. It provides the following interfaces:

public interface RegisterServerHandler {
    /**
     * Start the registration server.
     * @param configuration Flink Configuration object
     */
    void start(Configuration configuration) throws Exception;

    /**
     * Create a topic node (directory) on the registration server.
     * @param topic Name of the topic node
     */
    void createTopicNode(String topic) throws Exception;

    /**
     * Register information to a topic node (directory).
     * @param topic Topic to register with
     * @param registerRecord Information to be registered
     */
    void register(String topic, RegisterRecord registerRecord) throws Exception;

    /**
     * Delete a topic node.
     * @param topic Topic to be deleted
     */
    void deleteTopicNode(String topic) throws Exception;

    /**
     * Deregister the registration information.
     * @param topic Topic where the registration information is located
     * @param recordId ID of the registration information to be deregistered
     */
    void unregister(String topic, int recordId) throws Exception;

    /**
     * Query registration information.
     * @param topic Topic where the information is located
     * @param recordId ID of the information to query
     */
    RegisterRecord query(String topic, int recordId) throws Exception;

    /**
     * Check whether a topic exists.
     * @param topic Topic to check
     */
    Boolean isExist(String topic) throws Exception;

    /**
     * Shut down the RegisterServerHandler.
     */
    void shutdown() throws Exception;
}

In addition to the preceding interfaces, Flink provides ZookeeperRegisterServerHandler, a built-in implementation that uses ZooKeeper as the registration server.

- NettySink operator

class NettySink(String name,
                String topic,
                RegisterServerHandler registerServerHandler,
                int numberOfSubscribedJobs)

- name: name of the NettySink operator.

- topic: topic under which the NettySink publishes its data. The topic of each NettySink (excluding instances created for concurrency) must be unique; otherwise, subscriptions may get mixed up and data transmission may be abnormal.

- registerServerHandler: handle of the registration server.

- numberOfSubscribedJobs: number of jobs that subscribe to this NettySink. The NettySink starts sending data only after all subscribers have connected to it.
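The numberOfSubscribedJobs parameter thus acts as a start barrier: data is sent only once that many subscriber jobs have connected. A simplified, standalone sketch of this gating pattern (our own illustration, not the connector's actual code):

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical gate that holds back sending until all expected subscribers connect.
class SubscriberGate {
    private final CountDownLatch latch;

    SubscriberGate(int numberOfSubscribedJobs) {
        this.latch = new CountDownLatch(numberOfSubscribedJobs);
    }

    // Called once per subscriber connection.
    void onSubscriberConnected() {
        latch.countDown();
    }

    // Data may be sent only after this returns true.
    boolean readyToSend() {
        return latch.getCount() == 0;
    }
}
```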

- NettySource operator

class NettySource(String name,
                  String topic,
                  RegisterServerHandler registerServerHandler)

- name: name of the NettySource operator. The name of each NettySource (excluding instances created for concurrency) must be unique; otherwise, its connections to NettySink may conflict.

- topic: topic of the subscribed NettySink.

- registerServerHandler: handle of the registration server.

Note

The concurrency of NettySource must be the same as that of the corresponding NettySink; otherwise, the connection cannot be established.

1.2 Development Idea

1.         There are three jobs. One serves as a publisher, and the other two serve as subscribers.

2.         The publisher converts the created data into byte[] and sends it to the subscribers.

3.         After receiving the byte[], the subscribers convert the data back into strings and print sampled records.
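The conversion in steps 2 and 3 is a plain byte-array round trip. In standalone Java (with the charset made explicit here, whereas the example jobs use the platform default):

```java
import java.nio.charset.StandardCharsets;

// Illustrative round trip between the string payload and the byte[] sent over the pipeline.
class ByteRoundTrip {
    // Publisher side: string payload to bytes.
    static byte[] encode(String content) {
        return content.getBytes(StandardCharsets.UTF_8);
    }

    // Subscriber side: bytes back to a string.
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }
}
```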

1.3 Example Code Description

Java Example Code

The following shows the main logic code as an example.

For details about the complete code, see:

- com.huawei.bigdata.flink.examples.UserSource

- com.huawei.bigdata.flink.examples.TestPipeline_NettySink

- com.huawei.bigdata.flink.examples.TestPipeline_NettySource1

- com.huawei.bigdata.flink.examples.TestPipeline_NettySource2

1.         The publisher customizes source operators to create data.

package com.huawei.bigdata.flink.examples;  
 
import org.apache.flink.api.java.tuple.Tuple2;  
import org.apache.flink.configuration.Configuration;  
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;  
 
import java.io.Serializable;  
 
public class UserSource extends RichParallelSourceFunction<Tuple2<Integer, String>> implements Serializable {  
 
    private volatile boolean isRunning = true;  
 
    public void open(Configuration configuration) throws Exception {  
        super.open(configuration);  
 
    }  
    /**
     * Data generation function, which creates 10,000 pieces of data per second.
     */
    public void run(SourceContext<Tuple2<Integer, String>> ctx) throws Exception {  
 
        while(isRunning) {  
            for (int i = 0; i < 10000; i++) {  
                ctx.collect(Tuple2.of(i, "hello-" + i));  
            }  
            Thread.sleep(1000);  
        }  
    }  
 
    public void close() {  
        isRunning = false;  
    }  
 
    public void cancel() {  
        isRunning = false;  
    }  
}

2.         Code for the publisher

package com.huawei.bigdata.flink.examples;  
 
import org.apache.flink.api.common.functions.MapFunction;  
import org.apache.flink.api.java.tuple.Tuple2;  
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;  
import org.apache.flink.streaming.connectors.netty.sink.NettySink;  
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler;  
 
public class TestPipeline_NettySink {  
 
    public static void main(String[] args) throws Exception{  
 
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the concurrency of the publisher to 2.
        env.setParallelism(2);
 
//Create a ZookeeperRegisterServerHandler.
        ZookeeperRegisterServerHandler zkRegisterServerHandler = new ZookeeperRegisterServerHandler();  
//Add a customized source operator. 
        env.addSource(new UserSource())  
                .keyBy(0)  
                .map(new MapFunction<Tuple2<Integer, String>, byte[]>() {
                    // Convert the data to be sent into a byte array.
                    @Override
                    public byte[] map(Tuple2<Integer, String> integerStringTuple2) throws Exception {
                        return integerStringTuple2.f1.getBytes();
                    }
                }).addSink(new NettySink("NettySink-1", "TOPIC-2", zkRegisterServerHandler, 2)); // Use NettySink to transmit the data.
 
        env.execute();  
 
    }  
}

3.         Code for the first subscriber

package com.huawei.bigdata.flink.examples;  
 
import org.apache.flink.api.common.functions.MapFunction;  
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;  
import org.apache.flink.streaming.connectors.netty.source.NettySource;  
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler;  
 
public class TestPipeline_NettySource1 {  
 
    public static void main(String[] args) throws Exception{  
 
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the concurrency of the subscriber to 2.
        env.setParallelism(2);
 
//Create a ZookeeperRegisterServerHandler.
        ZookeeperRegisterServerHandler zkRegisterServerHandler = new ZookeeperRegisterServerHandler(); 
//Add a NettySource operator to receive messages from the publisher.
        env.addSource(new NettySource("NettySource-1", "TOPIC-2", zkRegisterServerHandler))  
                .map(new MapFunction<byte[], String>() {
                    // Convert the received byte arrays into character strings.
                    @Override
                    public String map(byte[] b) {
                        return new String(b);
                    }
                }).print();  
 
        env.execute();  
    }  
}

4.         Code for the second subscriber

package com.huawei.bigdata.flink.examples;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.netty.source.NettySource;
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler;

public class TestPipeline_NettySource2 {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the concurrency of the subscriber to 2.
        env.setParallelism(2);

        // Create a ZookeeperRegisterServerHandler.
        ZookeeperRegisterServerHandler zkRegisterServerHandler = new ZookeeperRegisterServerHandler();

        // Add a NettySource operator to receive data from the publisher.
        env.addSource(new NettySource("NettySource-2", "TOPIC-2", zkRegisterServerHandler))
                .map(new MapFunction<byte[], String>() {
                    // Convert the received byte arrays into character strings.
                    @Override
                    public String map(byte[] b) {
                        return new String(b);
                    }
                }).print();

        env.execute();
    }
}

Scala Example Code

The following shows the main logic code as an example.

For details about the complete code, see:

- com.huawei.bigdata.flink.examples.UserSource

- com.huawei.bigdata.flink.examples.TestPipeline_NettySink

- com.huawei.bigdata.flink.examples.TestPipeline_NettySource1

- com.huawei.bigdata.flink.examples.TestPipeline_NettySource2

1.         Code defining the data structure of the messages to be sent:

package com.huawei.bigdata.flink.examples  
 
case class Inforamtion(index: Int, content: String) {  
 
  def this() = this(0, "")  
}

2.         The publisher customizes source operators to create data.

package com.huawei.bigdata.flink.examples  
 
import org.apache.flink.configuration.Configuration  
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction  
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext  
 
class UserSource extends RichParallelSourceFunction[Inforamtion] with Serializable{  
 
  var isRunning = true  
 
  override def open(parameters: Configuration): Unit = {  
    super.open(parameters)  
     
  }   
 
//Create 10,000 pieces of data per second.
  override def run(sourceContext: SourceContext[Inforamtion]) = {  
 
    while (isRunning) {  
      for (i <- 0 until 10000) {  
        sourceContext.collect(Inforamtion(i, "hello-" + i));  
 
      }  
      Thread.sleep(1000)  
    }  
  }  
 
  override def close(): Unit = super.close()  
 
  override def cancel() = {  
    isRunning = false  
  }  
}

3.         Code for the publisher

package com.huawei.bigdata.flink.examples  
 
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment  
import org.apache.flink.streaming.connectors.netty.sink.NettySink  
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler  
import org.apache.flink.streaming.api.scala._  
 
object TestPipeline_NettySink {  
 
  def main(args: Array[String]): Unit = {  
 
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Set the concurrency of the publisher to 2.
    env.setParallelism(2)
    // Use ZooKeeper as the registration server.
    val zkRegisterServerHandler = new ZookeeperRegisterServerHandler
    // Add a user-defined source operator to create data.
    env.addSource(new UserSource)
      .keyBy(0).map(x => x.content.getBytes) // Convert the data to be sent into a byte array.
      .addSink(new NettySink("NettySink-1", "TOPIC-2", zkRegisterServerHandler, 2)) // Add the NettySink operator to send data.
 
    env.execute()  
  }  
}

4.         Code for the first subscriber

package com.huawei.bigdata.flink.examples   
 
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment  
import org.apache.flink.streaming.connectors.netty.source.NettySource  
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler  
import org.apache.flink.streaming.api.scala._  
 
import scala.util.Random  
 
 
object TestPipeline_NettySource1 {  
 
  def main(args: Array[String]): Unit = {  
 
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Set the concurrency of the subscriber to 2.
    env.setParallelism(2)
    // Use ZooKeeper as the registration server.
    val zkRegisterServerHandler = new ZookeeperRegisterServerHandler
    // Add a NettySource operator to receive data from the publisher.
    env.addSource(new NettySource("NettySource-1", "TOPIC-2", zkRegisterServerHandler))
      .map(x => (1, new String(x))) // Convert the received byte array into a string.
      .filter(x => {  
        Random.nextInt(50000) == 10
      })  
      .print  
 
    env.execute()   
  }  
}
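The filter in both subscribers keeps a record only when Random.nextInt(50000) returns 10, that is, with probability 1/50,000; at roughly 10,000 records per second, this prints about one record every five seconds per parallel instance. A seeded, standalone sketch of the same sampling predicate (our own illustration):

```java
import java.util.Random;

// Illustrative sampler matching the subscribers' filter condition.
class Sampler {
    private final Random random;

    Sampler(long seed) {
        this.random = new Random(seed);
    }

    // Keep roughly 1 in 50,000 records, as the subscribers' filter does.
    boolean keep() {
        return random.nextInt(50000) == 10;
    }
}
```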

5.         Code for the second subscriber

package com.huawei.bigdata.flink.examples

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.netty.source.NettySource
import org.apache.flink.streaming.connectors.netty.utils.ZookeeperRegisterServerHandler
import org.apache.flink.streaming.api.scala._

import scala.util.Random

object TestPipeline_NettySource2 {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Set the concurrency of the subscriber to 2.
    env.setParallelism(2)

    // Use ZooKeeper as the registration server.
    val zkRegisterServerHandler = new ZookeeperRegisterServerHandler

    // Add a NettySource operator to receive data.
    env.addSource(new NettySource("NettySource-2", "TOPIC-2", zkRegisterServerHandler))
      .map(x => (2, new String(x))) // Convert the received byte array into a string.
      .filter(x => {
        Random.nextInt(50000) == 10
      })
      .print()

    env.execute()
  }
}

1.4 Obtaining Example Code

Using the FusionInsight Client

Decompress the FusionInsight client installation package to obtain the example projects, which are stored in the Flink subfolder of the FusionInsight_Services_ClientConfig folder.

- In security mode, obtain FlinkPipelineJavaExample and FlinkPipelineScalaExample from the flink-examples-security directory.

- In non-security mode, obtain FlinkPipelineJavaExample and FlinkPipelineScalaExample from the flink-examples-normal directory.

Using the Maven Project

Download the code from the Huawei DevCloud website to the local computer. Huawei DevCloud URL: https://codehub-cn-south-1.devcloud.huaweicloud.com/codehub/7076065/home

- Security mode

  - components/flink/flink-examples-maven-security/FlinkPipelineJavaExample

  - components/flink/flink-examples-maven-security/FlinkPipelineScalaExample

- Non-security mode

  - components/spark2x/flink-examples-maven/FlinkPipelineJavaExample

  - components/spark2x/flink-examples-maven/FlinkPipelineScalaExample

1.5 Debugging the Application

1.5.1 Compiling and Running the Application

1.5.2 Viewing the Debugging Result

Scenarios

After a Flink application completes running, you can view the running result, or use Apache Flink Dashboard to view application running status.

Procedure

- View the running result of the Flink application.

To check the execution result, view the Stdout log of the TaskManager on the Apache Flink Dashboard.

If the execution result is exported to a file or another location specified by Flink, view the result in that file or location. The following uses the checkpoint, pipeline, and join between a configuration table and a stream as examples.

- View checkpoint results and files.

Either of the following methods can be used to view the checkpoint files:

- If the checkpoint snapshot information is saved in HDFS, run the hdfs dfs -ls hdfs://hacluster/flink-checkpoint/ command to view the checkpoint files.

- If the checkpoint snapshot information is saved in a local file, log in to each node to view the checkpoint files.

- View pipeline results.

The pipeline results are stored in the taskmanager.out file of Flink. To view the results, log in to the Apache Flink Dashboard, click the Task Managers tab, and then click out.

- View the join result of a configuration table and a stream.

The join results are stored in the taskmanager.out file of Flink. To view the results, log in to the Flink web UI, click the Task Managers tab, and then click out.

- View the Stream SQL join result.

The result is saved in the taskmanager.out file of Flink. You can view it by clicking out under the Task Managers tab on the Flink web UI.

- Use the Apache Flink Dashboard to view the running status of the Flink application.

The Apache Flink Dashboard mainly includes the Overview, Running Jobs, Completed Jobs, Task Managers, and Job Manager pages.

In the YARN web UI, find the desired Flink application and click the ApplicationMaster link in the last column to switch to the Apache Flink Dashboard.

To view the printed results of the program, find the corresponding TaskManager and check its Stdout log.

- View Flink logs.

Either of the following methods can be used to obtain Flink logs:

- Log in to the Apache Flink Dashboard and view the logs of the TaskManagers and JobManager.

- Log in to the YARN web UI and view the JobManager and GC logs: find the desired Flink application, click its application ID, and on the displayed page, click Logs in the Logs column.

 

