1.1.1 Case 3: Application for Implementing the Asynchronous Checkpoint Mechanism
1.1.1.1 Scenarios
Applicable Versions
FusionInsight HD V100R002C70 and FusionInsight HD V100R002C80
Scenario Description
Assume that you want to collect the data volume in a window covering the preceding 4 seconds, at an interval of 1 second, and that the status must be strictly consistent. That is, when the application recovers from a failure, all operators have the same status.
Data Planning
1. A user-defined source operator creates about 10,000 pieces of data per second.
2. Each piece of created data is a four-element tuple of Long, String, String, and Integer.
3. The statistics are printed on the terminals.
4. The printed data is of the Long type.
1.1.1.2 Development Idea
1. The source operator sends 10,000 pieces of data to the window operator every second.
2. The window operator calculates the data volume of preceding 4 seconds at the interval of 1 second.
3. The statistics are printed on the terminal every second. For details, see "Viewing the Debugging Result."
4. The checkpoint is triggered at an interval of 6 seconds, and the checkpoint results are saved to HDFS.
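The windowing arithmetic in steps 1 and 2 can be sketched in plain Java (a minimal illustration without any Flink dependency; the class and method names are hypothetical, and the numbers mirror the data plan above):

```java
public class WindowCountSketch {
    //Count the events whose timestamps fall inside [windowEnd - size, windowEnd).
    static long countInWindow(long[] eventTimesMs, long windowEndMs, long sizeMs) {
        long count = 0;
        for (long t : eventTimesMs) {
            if (t >= windowEndMs - sizeMs && t < windowEndMs) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        //Simulate 10 seconds of emission: 10,000 events per second.
        long[] times = new long[100000];
        for (int i = 0; i < times.length; i++) {
            times[i] = (i / 10000) * 1000L;  //all events of second k stamped at k*1000 ms
        }
        //A 4-second window ending at t = 5 s covers seconds 1 to 4, i.e. 40,000 events.
        System.out.println(countInWindow(times, 5000L, 4000L)); //prints 40000
    }
}
```

With a steady 10,000 events per second, every 4-second window therefore reports about 40,000, which is the value the example application prints each second.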
1.1.1.3 Example Code Description
Java Example Code
Assume that you want to collect the data volume in a window covering the preceding 4 seconds, at an interval of 1 second, and achieve strict consistency of the status.
1. Snapshot data
The snapshot data is used to save the number of data pieces recorded by operators during creation of snapshots.
import java.io.Serializable;

//As a part of the snapshot, this class saves the user-defined status.
public class UDFState implements Serializable {
    private long count;

    //Initialize the user-defined status.
    public UDFState() {
        count = 0L;
    }

    //Set the user-defined status.
    public void setState(long count) {
        this.count = count;
    }

    //Obtain the user-defined status.
    public long getState() {
        return this.count;
    }
}
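Because UDFState travels inside a checkpoint, it must survive Java serialization. The following sketch (plain Java, no Flink; StateSerializationCheck and its nested stand-in class are illustrative, not part of the example project) performs the write/read round trip that a checkpoint implies:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class StateSerializationCheck {
    //Local stand-in equivalent to the UDFState class in the example.
    static class UDFState implements Serializable {
        private long count;
        public void setState(long count) { this.count = count; }
        public long getState() { return this.count; }
    }

    //Serialize and deserialize, as a checkpoint write followed by a restore would.
    static UDFState roundTrip(UDFState in) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new ObjectOutputStream(bos).writeObject(in);
        ObjectInputStream ois =
            new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()));
        return (UDFState) ois.readObject();
    }

    public static void main(String[] args) throws Exception {
        UDFState s = new UDFState();
        s.setState(12345L);
        System.out.println(roundTrip(s).getState()); //prints 12345
    }
}
```

If the state class failed to implement Serializable, the writeObject call would throw NotSerializableException, which is exactly the failure mode to avoid in snapshot data.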
2. Data source with checkpoints
Code of the source operator. The operator sends 10,000 pieces of data and then pauses for one second, repeatedly. When a snapshot is created, the number of sent data pieces is recorded in UDFState. When the snapshot is used for restoration, the number of sent data pieces recorded in UDFState is read and assigned to the count variable.
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

//This class is a source operator with a checkpoint.
public class SEventSourceWithChk extends RichSourceFunction<Tuple4<Long, String, String, Integer>>
        implements ListCheckpointed<UDFState> {
    private Long count = 0L;
    private boolean isRunning = true;
    private String alphabet =
        "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321";

    //The key logic of the operator is to inject 10,000 tuples into the workflow per second.
    public void run(SourceContext<Tuple4<Long, String, String, Integer>> ctx) throws Exception {
        Random random = new Random();
        while (isRunning) {
            for (int i = 0; i < 10000; i++) {
                ctx.collect(Tuple4.of(random.nextLong(), "hello-" + count, alphabet, 1));
                count++;
            }
            Thread.sleep(1000);
        }
    }

    //Invoked when the task is canceled.
    public void cancel() {
        isRunning = false;
    }

    //Create a customized snapshot.
    public List<UDFState> snapshotState(long l, long ll) throws Exception {
        UDFState udfState = new UDFState();
        List<UDFState> listState = new ArrayList<UDFState>();
        udfState.setState(count);
        listState.add(udfState);
        return listState;
    }

    //Restore data from the customized snapshot.
    public void restoreState(List<UDFState> list) throws Exception {
        UDFState udfState = list.get(0);
        count = udfState.getState();
    }
}
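The snapshotState/restoreState contract used above can be exercised in isolation. This sketch (plain Java, no Flink dependency; the class and helper names are hypothetical) mimics how the operator wraps its count into a one-element state list at checkpoint time and reads it back on recovery:

```java
import java.util.ArrayList;
import java.util.List;

public class SnapshotRoundTrip {
    //Simplified state holder, mirroring the UDFState class in the example.
    static class State implements java.io.Serializable {
        private long count;
        void setState(long c) { count = c; }
        long getState() { return count; }
    }

    //Mimics snapshotState(): wrap the current count in a one-element list.
    static List<State> snapshot(long count) {
        State s = new State();
        s.setState(count);
        List<State> list = new ArrayList<>();
        list.add(s);
        return list;
    }

    //Mimics restoreState(): read the count back from the first list element.
    static long restore(List<State> list) {
        return list.get(0).getState();
    }

    public static void main(String[] args) {
        long count = 40000L;                 //e.g. 4 seconds of emission at 10,000/s
        List<State> snap = snapshot(count);  //taken at checkpoint time
        long recovered = restore(snap);      //applied after a failure
        System.out.println(recovered);       //prints 40000
    }
}
```

In the real operator, Flink calls snapshotState on the checkpoint barrier and restoreState on recovery; the list form exists because state may be redistributed across parallel instances.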
3. Definition of the window with a checkpoint
This code is about the window operator and is used to calculate the number of tuples in the window.
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

//This class is a window operator with a checkpoint.
public class WindowStatisticWithChk implements
        WindowFunction<Tuple4<Long, String, String, Integer>, Long, Tuple, TimeWindow>,
        ListCheckpointed<UDFState> {
    private Long total = 0L;

    //Define the window operator implementation logic to calculate the number of tuples in the window.
    public void apply(Tuple key, TimeWindow window,
                      Iterable<Tuple4<Long, String, String, Integer>> input,
                      Collector<Long> out) throws Exception {
        long count = 0L;
        for (Tuple4<Long, String, String, Integer> event : input) {
            count++;
        }
        total += count;
        out.collect(count);
    }

    //Create a customized snapshot.
    public List<UDFState> snapshotState(long l, long ll) {
        List<UDFState> listState = new ArrayList<UDFState>();
        UDFState udfState = new UDFState();
        udfState.setState(total);
        listState.add(udfState);
        return listState;
    }

    //Restore the status from the customized snapshot.
    public void restoreState(List<UDFState> list) throws Exception {
        UDFState udfState = list.get(0);
        total = udfState.getState();
    }
}
4. Application code
The code is about the definition of StreamGraph and is used to implement services. The processing time is used as the timestamp for triggering the window.
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class FlinkProcessingTimeAPIChkMain {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //Set related parameters and enable the checkpoint function.
        env.setStateBackend(new FsStateBackend("hdfs://hacluster/flink-checkpoint/checkpoint/"));
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointInterval(6000);

        //Define the application logic.
        env.addSource(new SEventSourceWithChk())
            .keyBy(0)
            .window(SlidingProcessingTimeWindows.of(Time.seconds(4), Time.seconds(1)))
            .apply(new WindowStatisticWithChk())
            .print();
        env.execute();
    }
}
Scala Example Code
Assume that you want to collect the data volume in a window covering the preceding 4 seconds, at an interval of 1 second, and achieve strict consistency of the status.
1. Formats of sent data
case class SEvent(id: Long, name: String, info: String, count: Int)
2. Snapshot data
The snapshot data is used to save the number of data pieces recorded by operators during creation of snapshots.
//User-defined status
class UDFState extends Serializable{
private var count = 0L
//Set the user-defined status.
def setState(s: Long) = count = s
//Obtain the user-defined status.
def getState = count
}
3. Data source with checkpoints
Code of the source operator. The operator sends 10,000 pieces of data and then pauses for one second, repeatedly. When a snapshot is created, the number of sent data pieces is recorded in UDFState. When the snapshot is used for restoration, the number of sent data pieces recorded in UDFState is read and assigned to the count variable.
import java.util

import org.apache.flink.streaming.api.checkpoint.ListCheckpointed
import org.apache.flink.streaming.api.functions.source.RichSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

//This class is a source operator with a checkpoint.
class SEventSourceWithChk extends RichSourceFunction[SEvent] with ListCheckpointed[UDFState] {
  private var count = 0L
  private var isRunning = true
  private val alphabet =
    "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321"

  //The logic of the source operator is to inject 10,000 tuples into the workflow per second.
  override def run(sourceContext: SourceContext[SEvent]): Unit = {
    while (isRunning) {
      for (i <- 0 until 10000) {
        sourceContext.collect(SEvent(1, "hello-" + count, alphabet, 1))
        count += 1L
      }
      Thread.sleep(1000)
    }
  }

  //Invoked when the task is canceled.
  override def cancel(): Unit = {
    isRunning = false
  }

  override def close(): Unit = super.close()

  //Create a snapshot.
  override def snapshotState(l: Long, l1: Long): util.List[UDFState] = {
    val udfList: util.ArrayList[UDFState] = new util.ArrayList[UDFState]
    val udfState = new UDFState
    udfState.setState(count)
    udfList.add(udfState)
    udfList
  }

  //Obtain the status from the snapshot.
  override def restoreState(list: util.List[UDFState]): Unit = {
    val udfState = list.get(0)
    count = udfState.getState
  }
}
4. Definition of the window with a checkpoint
This code is about the window operator and is used to calculate the number of tuples in the window.
import java.util

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

//This class is a window operator with a checkpoint.
class WindowStatisticWithChk extends WindowFunction[SEvent, Long, Tuple, TimeWindow]
  with ListCheckpointed[UDFState] {
  private var total = 0L

  //Define the window operator implementation logic to calculate the number of tuples in the window.
  override def apply(key: Tuple, window: TimeWindow, input: Iterable[SEvent],
                     out: Collector[Long]): Unit = {
    var count = 0L
    for (event <- input) {
      count += 1L
    }
    total += count
    out.collect(count)
  }

  //Create a snapshot for the customized status.
  override def snapshotState(l: Long, l1: Long): util.List[UDFState] = {
    val udfList: util.ArrayList[UDFState] = new util.ArrayList[UDFState]
    val udfState = new UDFState
    udfState.setState(total)
    udfList.add(udfState)
    udfList
  }

  //Restore the status from the customized snapshot.
  override def restoreState(list: util.List[UDFState]): Unit = {
    val udfState = list.get(0)
    total = udfState.getState
  }
}
5. Application code
The code is about the definition of StreamGraph and is used to implement services. The event time is used as the timestamp for triggering the window.
import com.huawei.rt.flink.core.{SEvent, SEventSourceWithChk, WindowStatisticWithChk}
import org.apache.flink.api.scala._
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}

object FlinkEventTimeAPIChkMain {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStateBackend(new FsStateBackend("hdfs://hacluster/flink-checkpoint/checkpoint/"))
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.getConfig.setAutoWatermarkInterval(2000)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.getCheckpointConfig.setCheckpointInterval(6000)

    //Define the application logic.
    env.addSource(new SEventSourceWithChk)
      .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[SEvent] {
        //Set the watermark.
        override def getCurrentWatermark: Watermark = {
          new Watermark(System.currentTimeMillis())
        }
        //Add a timestamp to each tuple.
        override def extractTimestamp(t: SEvent, l: Long): Long = {
          System.currentTimeMillis()
        }
      })
      .keyBy(0)
      .window(SlidingEventTimeWindows.of(Time.seconds(4), Time.seconds(1)))
      .apply(new WindowStatisticWithChk)
      .print()
    env.execute()
  }
}
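The event-time trigger configured above can be illustrated without Flink: each element is assigned to size/slide sliding windows, and a window [start, end) may fire once the watermark reaches end. A minimal sketch under these assumptions (plain Java; the class and method names are hypothetical):

```java
import java.util.ArrayList;
import java.util.List;

public class WatermarkSketch {
    //A window covers [start, end) in milliseconds; it may fire once watermark >= end.
    static boolean windowReady(long windowEnd, long watermark) {
        return watermark >= windowEnd;
    }

    //Sliding windows of the given size, advancing every slide ms, that contain timestamp ts.
    static List<long[]> assignWindows(long ts, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = ts - ts % slide;            //latest window start covering ts
        for (long start = lastStart; start > ts - size; start -= slide) {
            windows.add(new long[]{start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        //An event at t = 5500 ms falls into 4 sliding windows of size 4 s, slide 1 s.
        List<long[]> ws = assignWindows(5500L, 4000L, 1000L);
        System.out.println(ws.size());                 //prints 4
        //The earliest of them, [2000, 6000), fires only once the watermark reaches 6000.
        System.out.println(windowReady(6000L, 5999L)); //prints false
        System.out.println(windowReady(6000L, 6000L)); //prints true
    }
}
```

Because the example's watermark is simply the current system time, every window fires promptly; with real event times, a lagging watermark would hold windows open until late data can no longer arrive.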
1.1.1.4 Obtaining Example Code
Using the FusionInsight Client
Decompress the FusionInsight client installation package and obtain the example files, which are saved in the Flink sub-folder of the FusionInsight_Services_ClientConfig folder.
- In security mode, obtain FlinkCheckpointJavaExample and FlinkCheckpointScalaExample from the flink-examples-security directory.
- In non-security mode, obtain FlinkCheckpointJavaExample and FlinkCheckpointScalaExample from the flink-examples-normal directory.
Using the Maven Project
Download the code from the Huawei DevCloud website to the local computer. Huawei DevCloud URL: https://codehub-cn-south-1.devcloud.huaweicloud.com/codehub/7076065/home
- Security mode
− components/flink/flink-examples-maven-security/FlinkCheckpointJavaExample
− components/flink/flink-examples-maven-security/FlinkCheckpointScalaExample
- Non-security mode
− components/spark2x/flink-examples-maven/FlinkCheckpointJavaExample
− components/spark2x/flink-examples-maven/FlinkCheckpointScalaExample
1.1.1.5 Debugging the Application
1.1.1.5.1 Compiling and Running the Application
1.1.1.5.2 Viewing the Debugging Result
Scenarios
After a Flink application completes running, you can view the running result, or use the Apache Flink Dashboard to view the application running status.
Procedure
- View the running result of the Flink application.
If you want to check the execution result, view the Stdout log of the TaskManager on the Apache Flink Dashboard.
If the execution result is exported to a file or another location specified by Flink, view the result from that file or location. The checkpoint, pipeline, and join between configuration tables and streams are used as examples.
− View checkpoint results and files
Either of the following methods can be used to view the checkpoint files:
- If the checkpoint snapshot information is saved in HDFS, run the hdfs dfs -ls hdfs://hacluster/flink-checkpoint/ command to view the checkpoint files.
- If the checkpoint snapshot information is saved in a local file, log in to each node to view the checkpoint files.
− View pipeline results
The pipeline results are stored in the taskmanager.out file of Flink. If you want to view pipeline results, log in to the Apache Flink Dashboard, click the task manager label, and click out.
− View the join result of a configuration table and a stream
The join results are stored in the taskmanager.out file of Flink. If you want to view the join results, log in to the Flink web UI, click the task manager label, and click out.
− View the Stream SQL join result
The result is saved in the taskmanager.out file of Flink. You can view the result by clicking the out button under the task manager label on the web UI of Flink.
- Use the Apache Flink Dashboard to view the running status of the Flink application.
The Apache Flink Dashboard mainly includes the Overview, Running Jobs, Completed Jobs, Task Managers, Job Manager, and Logout pages.
In the YARN web UI, find the desired Flink application and click ApplicationMaster in the last column of the application to switch to the Apache Flink Dashboard.
To view the print results of the program execution, find the corresponding Task Manager and check the log information under its Stdout tab.
- View Flink logs.
Both of the following methods can be used to obtain Flink logs:
− Log in to the Apache Flink Dashboard and view the logs of the TaskManagers and the JobManager.
− Log in to the YARN web UI and view the JobManager and GC logs.
In the YARN web UI, find the desired Flink application and click its ID. On the displayed page, click Logs in the Logs column.
This post was last edited by chz at 2018-08-03 06:45.

