
Flink:Case 3: Application for Implementing the Asynchronous Checkpoint Mechanism

Latest reply: Aug 3, 2018 01:44:07

1.1.1 Case 3: Application for Implementing the Asynchronous Checkpoint Mechanism

1.1.1.1 Scenarios

Applicable Versions

FusionInsight HD V100R002C70 and FusionInsight HD V100R002C80

Scenario Description

Assume that you want to count the data volume in a window covering the preceding 4 seconds, at an interval of 1 second, and achieve strict state consistency: when the application recovers from a failure, the state of all operators is the same as before the failure.

Data Planning

1.         A self-defined source operator creates about 10,000 pieces of data per second.

2.         Each created data piece is a 4-tuple of Long, String, String, and Integer.

3.         The statistics are printed on the terminals.

4.         The printed data is of the Long type.

1.1.1.2 Development Idea

1.         The source operator sends 10,000 pieces of data to the window operator per second.

2.         The window operator calculates the data volume of the preceding 4 seconds at an interval of 1 second.

3.         The statistics are printed on the terminal every second. For details, see section "Viewing the Debugging Result."

4.         A checkpoint is triggered at an interval of 6 seconds, and the checkpoint results are saved in HDFS.
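Given the data planning above, each 4-second window should contain roughly 4 s × 10,000 records/s = 40,000 records once the pipeline reaches a steady state. The following standalone sketch checks that arithmetic (class name and window boundaries are illustrative, not part of the example project):

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowEstimate {
    public static void main(String[] args) {
        long rate = 10_000L;   // records per second, from the data planning
        long sizeMs = 4_000L;  // window size: 4 seconds (the 1-second slide only shifts the window start)

        // Generate one timestamp per record over 10 seconds of simulated time.
        List<Long> timestamps = new ArrayList<>();
        for (long sec = 0; sec < 10; sec++) {
            for (long i = 0; i < rate; i++) {
                timestamps.add(sec * 1_000L);  // all records of one burst share the same second
            }
        }

        // Count the records falling into one window [start, start + sizeMs).
        long start = 4_000L;  // a window fully inside the generated range
        long count = timestamps.stream()
                .filter(t -> t >= start && t < start + sizeMs)
                .count();
        System.out.println(count);
    }
}
```

Running this prints the expected steady-state per-window count of 40000.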

1.1.1.3 Example Code Description

Java Example Code

Assume that you want to count the data volume in a window covering the preceding 4 seconds, at an interval of 1 second, and achieve strict state consistency.

1.         Snapshot data

The snapshot data is used to save the number of data pieces recorded by operators during creation of snapshots.

import java.io.Serializable;

//As a part of the snapshot, this class saves the user-defined status.
public class UDFState implements Serializable {
    private long count;

    //Initialize the user-defined status.
    public UDFState() {
        count = 0L;
    }

    //Set the user-defined status.
    public void setState(long count) {
        this.count = count;
    }

    //Obtain the user-defined status.
    public long getState() {
        return this.count;
    }
}
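Because UDFState implements Serializable, Flink can persist it as part of a snapshot using ordinary Java serialization. The following standalone round-trip check (a sketch with an inlined copy of the class, not part of the example project) shows that the counter value survives serialization:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class UDFStateDemo {
    //Same shape as UDFState in the example: a serializable counter holder.
    static class UDFState implements Serializable {
        private long count = 0L;
        public void setState(long count) { this.count = count; }
        public long getState() { return count; }
    }

    public static void main(String[] args) throws Exception {
        UDFState state = new UDFState();
        state.setState(12345L);

        //Round-trip through Java serialization, as happens when a snapshot is persisted.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        new ObjectOutputStream(bytes).writeObject(state);
        UDFState restored = (UDFState) new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray())).readObject();

        System.out.println(restored.getState());  //the counter survives the round trip
    }
}
```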

2.         Data source with checkpoints

Code of the source operator. The operator repeatedly sends 10,000 pieces of data and then pauses for one second. When a snapshot is created, the number of sent data pieces is recorded in UDFState. When the snapshot is used for restoration, the number of sent data pieces recorded in UDFState is read and assigned to the count variable.

import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

//This class is a source operator with a checkpoint.
public class SEventSourceWithChk extends RichSourceFunction<Tuple4<Long, String, String, Integer>> implements ListCheckpointed<UDFState> {
    private Long count = 0L;
    private boolean isRunning = true;
    private String alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321";

    //The key logic of the operator is to inject 10,000 tuples to the workflow per second.
    public void run(SourceContext<Tuple4<Long, String, String, Integer>> ctx) throws Exception {
        Random random = new Random();
        while (isRunning) {
            for (int i = 0; i < 10000; i++) {
                ctx.collect(Tuple4.of(random.nextLong(), "hello-" + count, alphabet, 1));
                count++;
            }
            Thread.sleep(1000);
        }
    }

    //Invoked when the task is canceled.
    public void cancel() {
        isRunning = false;
    }

    //Create a customized snapshot.
    public List<UDFState> snapshotState(long checkpointId, long timestamp) throws Exception {
        UDFState udfState = new UDFState();
        List<UDFState> listState = new ArrayList<UDFState>();
        udfState.setState(count);
        listState.add(udfState);
        return listState;
    }

    //Restore data from the customized snapshot.
    public void restoreState(List<UDFState> list) throws Exception {
        UDFState udfState = list.get(0);
        count = udfState.getState();
    }
}
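The snapshotState/restoreState pair implements the ListCheckpointed contract: Flink calls snapshotState when it takes a checkpoint and restoreState when the task is redeployed after a failure. The cycle can be sketched without Flink; the CountingSource class below is a hypothetical stand-in for SEventSourceWithChk that keeps only the state-handling part:

```java
import java.util.ArrayList;
import java.util.List;

public class CheckpointCycleDemo {
    //Hypothetical stand-in for the source operator: only the checkpointed counter.
    static class CountingSource {
        long count = 0L;

        //Mirrors snapshotState: wrap the current count in a one-element list.
        List<Long> snapshot() {
            List<Long> state = new ArrayList<>();
            state.add(count);
            return state;
        }

        //Mirrors restoreState: read the count back from the snapshot.
        void restore(List<Long> state) {
            count = state.get(0);
        }
    }

    public static void main(String[] args) {
        CountingSource source = new CountingSource();
        source.count = 30_000L;              //3 seconds' worth of emitted records
        List<Long> checkpoint = source.snapshot();

        source.count = 45_000L;              //progress made after the checkpoint...
        //...is lost on failure: a fresh instance restores from the checkpoint.
        CountingSource recovered = new CountingSource();
        recovered.restore(checkpoint);
        System.out.println(recovered.count); //resumes from the checkpointed value
    }
}
```

On recovery the source resumes from the checkpointed count (30000 here), which is what gives the application its exactly-once state guarantee.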

3.         Definition of the window with a checkpoint

This code is about the window operator and is used to calculate the number of tuples in the window.

import org.apache.flink.api.java.tuple.Tuple; 
import org.apache.flink.api.java.tuple.Tuple4; 
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; 
import org.apache.flink.streaming.api.functions.windowing.WindowFunction; 
import org.apache.flink.streaming.api.windowing.windows.TimeWindow; 
import org.apache.flink.util.Collector; 
  
import java.util.ArrayList; 
import java.util.List; 
  
//This class is a window operator with a checkpoint.
public class WindowStatisticWithChk implements WindowFunction<Tuple4<Long, String, String, Integer>, Long, Tuple, TimeWindow>, ListCheckpointed<UDFState> {
    private Long total = 0L;

    //Define the window operator implementation logic to calculate the number of tuples in the window.
    public void apply(Tuple key, TimeWindow window, Iterable<Tuple4<Long, String, String, Integer>> input,
                      Collector<Long> out) throws Exception {
        long count = 0L;
        for (Tuple4<Long, String, String, Integer> event : input) {
            count++;
        }
        total += count;
        out.collect(count);
    }

    //Create a customized snapshot.
    public List<UDFState> snapshotState(long checkpointId, long timestamp) {
        List<UDFState> listState = new ArrayList<UDFState>();
        UDFState udfState = new UDFState();
        udfState.setState(total);
        listState.add(udfState);
        return listState;
    }

    //Restore the status from the customized snapshot.
    public void restoreState(List<UDFState> list) throws Exception {
        UDFState udfState = list.get(0);
        total = udfState.getState();
    }
}
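With SlidingProcessingTimeWindows.of(Time.seconds(4), Time.seconds(1)), each element is assigned to size/slide = 4 overlapping windows, which is why the per-window counts overlap. The assignment rule can be sketched with the standard sliding-window arithmetic (an illustrative standalone class, not Flink's internal code):

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingAssignmentDemo {
    //Return the start timestamps of all sliding windows that contain `timestamp`.
    static List<Long> assignWindows(long timestamp, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        //Latest window starting at or before the element.
        long lastStart = timestamp - (timestamp % slideMs);
        for (long start = lastStart; start > timestamp - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        //An element at t = 5,500 ms with a 4 s window sliding every 1 s
        //belongs to the four windows starting at 5000, 4000, 3000, and 2000 ms.
        System.out.println(assignWindows(5_500L, 4_000L, 1_000L));
    }
}
```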

4.         Application code

This code defines the StreamGraph that implements the service. The processing time is used as the timestamp for triggering the window.

import org.apache.flink.runtime.state.filesystem.FsStateBackend; 
import org.apache.flink.streaming.api.CheckpointingMode; 
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; 
import org.apache.flink.streaming.api.windowing.time.Time; 
  
public class FlinkProcessingTimeAPIChkMain {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //Set related parameters and enable the checkpoint function.
        env.setStateBackend(new FsStateBackend("hdfs://hacluster/flink-checkpoint/checkpoint/"));
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointInterval(6000);

        //Define the application logic.
        env.addSource(new SEventSourceWithChk())
           .keyBy(0)
           .window(SlidingProcessingTimeWindows.of(Time.seconds(4), Time.seconds(1)))
           .apply(new WindowStatisticWithChk())
           .print();

        env.execute();
    }
}

Scala Example Code

Assume that you want to count the data volume in a window covering the preceding 4 seconds, at an interval of 1 second, and achieve strict state consistency.

1.         Formats of sent data

case class SEvent(id: Long, name: String, info: String, count: Int)

2.         Snapshot data

The snapshot data is used to save the number of data pieces recorded by operators during creation of snapshots.

//User-defined status
class UDFState extends Serializable {
    private var count = 0L

    //Set the user-defined status.
    def setState(s: Long) = count = s

    //Obtain the user-defined status.
    def getState = count
}

3.         Data source with checkpoints

Code of the source operator. The operator repeatedly sends 10,000 pieces of data and then pauses for one second. When a snapshot is created, the number of sent data pieces is recorded in UDFState. When the snapshot is used for restoration, the number of sent data pieces recorded in UDFState is read and assigned to the count variable.

import java.util  
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed  
import org.apache.flink.streaming.api.functions.source.RichSourceFunction  
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext  
  
//This class is a source operator with a checkpoint.
class SEventSourceWithChk extends RichSourceFunction[SEvent] with ListCheckpointed[UDFState]{  
    private var count = 0L  
    private var isRunning = true  
    private val alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321"  
  
    //The logic of the source operator is to inject 10,000 tuples to the workflow per second.
    override def run(sourceContext: SourceContext[SEvent]): Unit = {
        while (isRunning) {
            for (i <- 0 until 10000) {
                sourceContext.collect(SEvent(1, "hello-" + count, alphabet, 1))
                count += 1L
            }
            Thread.sleep(1000)
        }
    }

    //Invoked when the task is canceled.
    override def cancel(): Unit = {
        isRunning = false
    }
  
     override def close(): Unit = super.close()  
  
     //Create a snapshot 
     override def snapshotState(l: Long, l1: Long): util.List[UDFState] = {  
         val udfList: util.ArrayList[UDFState] = new util.ArrayList[UDFState]  
         val udfState = new UDFState  
         udfState.setState(count)  
         udfList.add(udfState)  
         udfList  
     }  
  
     //Obtain the status from the snapshot. 
     override def restoreState(list: util.List[UDFState]): Unit = {  
         val udfState = list.get(0)  
         count = udfState.getState  
     }  
}

4.         Definition of the window with a checkpoint

This code is about the window operator and is used to calculate the number of tuples in the window.

import java.util  
import org.apache.flink.api.java.tuple.Tuple  
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed  
import org.apache.flink.streaming.api.scala.function.WindowFunction  
import org.apache.flink.streaming.api.windowing.windows.TimeWindow   
import org.apache.flink.util.Collector  
  
//This class is a window operator with a checkpoint. 
class WindowStatisticWithChk extends WindowFunction[SEvent, Long, Tuple, TimeWindow] with ListCheckpointed[UDFState]{  
    private var total = 0L  
      
    //Define the window operator implementation logic to calculate the number of tuples in the window.
    override def apply(key: Tuple, window: TimeWindow, input: Iterable[SEvent], out: Collector[Long]): Unit = {
        var count = 0L
        for (event <- input) {
            count += 1L
        }
        total += count
        out.collect(count)
    }
  
     //Create a snapshot for the customized status.
     override def snapshotState(l: Long, l1: Long): util.List[UDFState] = {  
         val udfList: util.ArrayList[UDFState] = new util.ArrayList[UDFState]  
         val udfState = new UDFState  
         udfState.setState(total)  
         udfList.add(udfState)  
         udfList  
     }  
  
    //Restore the status from the customized snapshot.
    override def restoreState(list: util.List[UDFState]): Unit = {  
        val udfState = list.get(0)  
        total = udfState.getState  
    }  
}

5.         Application code

This code defines the StreamGraph that implements the service. The event time is used as the timestamp for triggering the window.

import com.huawei.rt.flink.core.{SEvent, SEventSourceWithChk, WindowStatisticWithChk}  
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend  
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks  
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}  
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment  
import org.apache.flink.streaming.api.watermark.Watermark  
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows  
import org.apache.flink.streaming.api.windowing.time.Time  
import org.apache.flink.api.scala._  
import org.apache.flink.runtime.state.filesystem.FsStateBackend  
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup  
  
object FlinkEventTimeAPIChkMain {  
    def main(args: Array[String]): Unit ={  
        val env = StreamExecutionEnvironment.getExecutionEnvironment  
        env.setStateBackend(new FsStateBackend("hdfs://hacluster/flink-checkpoint/checkpoint/"))  
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)  
        env.getConfig.setAutoWatermarkInterval(2000)  
        env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)  
        env.getCheckpointConfig.setCheckpointInterval(6000)  
  
        //Define the application logic.
        env.addSource(new SEventSourceWithChk)  
           .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[SEvent] {  
               //Set watermark.
                override def getCurrentWatermark: Watermark = {  
                    new Watermark(System.currentTimeMillis())  
                }  
               //Add a timestamp to each tuple.
                override def extractTimestamp(t: SEvent, l: Long): Long = {  
                     System.currentTimeMillis()  
                }  
           })  
          .keyBy(0)  
          .window(SlidingEventTimeWindows.of(Time.seconds(4), Time.seconds(1)))  
          .apply(new WindowStatisticWithChk)  
          .print()  
       env.execute()  
    }  
}
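In the Scala application the watermark is simply the current processing time, so an event-time window [start, start + 4 s) fires as soon as the watermark passes its end timestamp. A minimal sketch of that firing decision (illustrative, not Flink's trigger implementation):

```java
public class WatermarkFiringDemo {
    //An event-time window fires once the watermark reaches or passes the window end.
    static boolean shouldFire(long windowEnd, long watermark) {
        return watermark >= windowEnd;
    }

    public static void main(String[] args) {
        long windowStart = 10_000L;
        long windowEnd = windowStart + 4_000L;  //4-second window

        System.out.println(shouldFire(windowEnd, 13_999L));  //watermark still inside the window
        System.out.println(shouldFire(windowEnd, 14_000L));  //watermark has reached the window end
    }
}
```

Because both the element timestamps and the watermark come from System.currentTimeMillis(), this event-time job behaves much like the processing-time Java version.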

1.1.1.4 Obtaining Example Code

Using the FusionInsight Client

Decompress the FusionInsight client installation package and obtain the example files from the Flink sub-folder of the FusionInsight_Services_ClientConfig folder.

l   In security mode, obtain FlinkCheckpointJavaExample and FlinkCheckpointScalaExample from the flink-examples-security directory.

l   In non-security mode, obtain FlinkCheckpointJavaExample and FlinkCheckpointScalaExample from the flink-examples-normal directory.

Using the Maven Project

Download the code from the Huawei DevCloud website to the local computer. Huawei DevCloud URL: https://codehub-cn-south-1.devcloud.huaweicloud.com/codehub/7076065/home

l   Security mode

           components/flink/flink-examples-maven-security/FlinkCheckpointJavaExample

           components/flink/flink-examples-maven-security/FlinkCheckpointScalaExample

l   Non-security mode

           components/spark2x/flink-examples-maven/FlinkCheckpointJavaExample

           components/spark2x/flink-examples-maven/FlinkCheckpointScalaExample

1.1.1.5 Debugging the Application

1.1.1.5.1 Compiling and Running the Application
1.1.1.5.2 Viewing the Debugging Result

Scenarios

After a Flink application finishes running, you can view the running result, or use the Apache Flink Dashboard to check the application running status.

Procedure

l   View the running result of the Flink application.

If you want to check the execution result, view the Stdout log of TaskManager on the Apache Flink Dashboard.

If the execution result is exported to a file or a location specified by Flink, view the result from the exported file or the location. The checkpoint, pipeline, and join between configuration tables and streams are used as examples.

           View checkpoint results and files


Either of the following methods can be used to view the checkpoint files:

l   If the checkpoint snapshot information is saved in the HDFS, run the hdfs dfs -ls hdfs://hacluster/flink-checkpoint/ command to view checkpoint files.

l   If the checkpoint snapshot information is saved to a local file, log in to each node to view checkpoint files.

           View pipeline results

The pipeline results are stored in the taskmanager.out file of Flink. If you want to view pipeline results, log in to the Apache Flink Dashboard, click the task manager label, and click out.

           View the JOIN result of the configuration table and streams

The JOIN results are stored in the taskmanager.out file of Flink. If you want to view the JOIN results, log in to the Flink web UI, click the task manager label, and click out.

           View the Stream SQL Join result

The result is saved in the taskmanager.out file of Flink. You can view the result by clicking the out button under the task manager label on the web UI of Flink.

l   Use Apache Flink Dashboard to view the running status of the Flink application.

The Apache Flink Dashboard mainly includes the Overview, Running Jobs, Completed Jobs, Task Managers, Job Manager, and Logout pages.

In the YARN web UI, find the desired Flink application and click ApplicationMaster in the last column to switch to the Apache Flink Dashboard.

To view the print results of the program execution, find the corresponding Task Manager and check the log information under its Stdout tab.

l   View Flink logs.

Both of the following methods can be used to obtain Flink logs:

           Log in to the Apache Flink Dashboard and view logs of TaskManagers and JobManager.

           Log in to the YARN web UI to view logs about JobManager and GC.

On the YARN web UI, find the desired Flink application and click its ID. On the displayed page, click Logs in the Logs column.

 

This post was last edited by chz at 2018-08-03 06:45.
