
HCIA-Big Data | Fault Tolerance of Flink

Latest reply: Jun 1, 2022 07:01:33

Hello, everyone!

Flink is a stateful analysis engine by default. However, if a task fails during processing, the state it holds in memory is lost. If the intermediate computing state is not stored, the task must be restarted and all the data recomputed. If the intermediate state is stored, the task can be restored to that state and continue execution. For this, Flink uses the checkpoint mechanism.

Introduction to Checkpoint

How does Flink ensure exactly-once processing? It uses a feature called checkpoints to reset the system to a correct state in the event of a failure. Flink's state storage depends on the checkpoint mechanism: checkpoints periodically create distributed snapshots that back up the state in the program.

Checkpoint Mechanism

Apache Flink offers a lightweight fault tolerance mechanism based on distributed checkpoints. A checkpoint is an automatic, asynchronous snapshot of task/operator state. Flink injects checkpoint barriers into the input stream at intervals and uses the barriers to assign the records between two barriers to the corresponding checkpoint. When an application error occurs, the states of all operators can be restored from the previous snapshot to ensure data consistency.

For applications with small state, these snapshots are very lightweight and can be drawn frequently without much impact on performance. During checkpointing, the state is stored in a configurable place (such as the JobManager node or HDFS).

Checkpoint Configuration

1. Enable Checkpointing 

By default, checkpointing is disabled. To enable checkpointing, call enableCheckpointing(n), where n is the checkpoint interval in milliseconds. 

env.enableCheckpointing(1000); // Enable checkpointing and set the checkpoint interval to 1000 ms. If the state is large, you are advised to increase the value.

2. Exactly-once or at-least-once processing 

Exactly-once ensures end-to-end data consistency and prevents data loss and duplicates, but it reduces Flink's performance.

At-least-once applies to scenarios that have high requirements on latency and throughput but low requirements on data consistency. 

Exactly-once is used by default. You can use the setCheckpointingMode() method to set the semantic mode. 
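A minimal sketch of switching the mode (Java DataStream API; assumes `env` is a `StreamExecutionEnvironment` with checkpointing already enabled):

```java
import org.apache.flink.streaming.api.CheckpointingMode;

// EXACTLY_ONCE is the default. AT_LEAST_ONCE lowers latency but may
// replay records (producing duplicates) after a failure.
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
```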


3. Checkpoint timeout 

Specifies the timeout period of checkpoint execution. Once the threshold is reached, Flink aborts the checkpoint in progress and regards it as timed out.

This can be set using the setCheckpointTimeout method. The default value is 10 minutes.
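For example (the 60-second value is illustrative; `env` is the execution environment):

```java
// Abort a checkpoint if it has not completed within 60 s
// (the default is 10 minutes).
env.getCheckpointConfig().setCheckpointTimeout(60000);
```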


4. Minimum time between checkpoints 

When checkpoints frequently take longer than the base interval, for example because state grew larger than planned, the system is constantly taking checkpoints. Too many computing resources are then tied up in checkpointing, affecting overall application performance. To prevent such a situation, applications can define a minimum duration between two checkpoints:
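A sketch of this setting (the 500 ms value is illustrative):

```java
// Leave at least 500 ms between the end of one checkpoint and the
// start of the next, regardless of the configured interval.
// Note: setting a minimum pause implies at most one concurrent checkpoint.
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
```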


5. Number of concurrent checkpoints 

Specifies the number of checkpoints that can be executed at the same time. By default, the system will not trigger another checkpoint while one is still in progress. If you configure multiple checkpoints, the system can trigger multiple checkpoints at the same time. 
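A sketch of raising this limit (the value 2 is illustrative):

```java
// Allow up to two checkpoints to be in flight at the same time
// (the default is 1). This cannot be combined with a minimum
// pause between checkpoints.
env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
```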


6. Externalized checkpoints 

You can configure periodic checkpoints to be persisted externally. Externalized checkpoints write their meta data out to persistent storage and are not automatically cleaned up when the job fails. This way, you will have a checkpoint to resume from if your job fails. 
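A sketch of enabling externalized checkpoints:

```java
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;

// Keep the checkpoint on external storage when the job is cancelled;
// use DELETE_ON_CANCELLATION to clean it up automatically instead.
env.getCheckpointConfig().enableExternalizedCheckpoints(
        ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
```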


Introduction to savepoint

A checkpoint can be retained on an external medium when a job is canceled. Flink also provides savepoints, another mechanism for restoring job data.


Similar to checkpoints, savepoints save state to external media. If a job fails, its state can be restored from there.

What are the differences between savepoints and checkpoints? 

1. Triggering and management: Checkpoints are automatically triggered and managed by Flink, while savepoints are manually triggered and managed by users. 

2. Function: Checkpoints allow fast recovery when tasks encounter exceptions, including network jitter or timeout. On the other hand, savepoints enable scheduled backup and allow you to stop-and-resume jobs, such as modifying code or adjusting concurrency. 

3. Features: Checkpoints are lightweight and can implement automatic recovery from job failures and are deleted by default after job completion. Savepoints, on the other hand, are persistent and saved in a standard format. They allow code or configuration changes. To resume a job from a savepoint, you need to manually specify a path. 

How Do I Restore Job Data?


Savepoints are a special implementation of checkpoints; the underlying layer uses the checkpoint mechanism. A savepoint is triggered manually by running a command, and the result is saved to a specified storage path. This allows the system to be restored to its original computing state during upgrade and maintenance, while end-to-end exactly-once semantics is preserved.
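As a sketch of the commands involved (the job ID, savepoint directory, savepoint name, and JAR file below are placeholders):

```shell
# Trigger a savepoint for a running job:
bin/flink savepoint <jobId> hdfs:///flink/savepoints

# Later, resume a (possibly modified) job from that savepoint:
bin/flink run -s hdfs:///flink/savepoints/savepoint-xxxx myJob.jar
```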

For details, see The difference between checkpoint and savepoint.

State Storage 

When checkpointing is enabled, the persistence method and location depend on the selected State Backend.

By default, state is kept in the memory of the TaskManager, and the storage location of checkpoints depends on the State Backend configuration. Flink has the following built-in out-of-the-box State Backends:

1.  MemoryStateBackend: memory-based.

2.  FsStateBackend: file-system-based; the file system can be a local file system or HDFS.

3.  RocksDBStateBackend: RocksDB-based; RocksDB serves as the storage medium.

If no State Backend is configured, MemoryStateBackend is used by default.
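A sketch of selecting a backend explicitly (the HDFS path is an example):

```java
import org.apache.flink.runtime.state.filesystem.FsStateBackend;

// Store checkpoints on HDFS instead of in JobManager memory;
// the second argument enables asynchronous snapshots.
env.setStateBackend(
        new FsStateBackend("hdfs://namenode:9000/flink/checkpoints", true));
```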


MemoryStateBackend: new MemoryStateBackend(int maxStateSize, boolean asynchronousSnapshots). The constructor sets the maximum state size and selects whether to take asynchronous snapshots. The state is stored in the memory of the TaskManager (execution) node. Because memory capacity is limited, the default value of maxStateSize for a single State is 5 MB, and maxStateSize must be less than or equal to akka.framesize (10 MB by default). Since the JobManager memory stores the checkpoints, a checkpoint cannot be larger than the JobManager's memory. Recommended scenarios: local testing, and jobs such as ETL that hold little state and for which a JobManager failure is unlikely or has little impact. MemoryStateBackend is not recommended in production scenarios.


FsStateBackend: new FsStateBackend(URI checkpointDataUri, boolean asynchronousSnapshots). The constructor takes a file path and determines whether snapshots are asynchronous. FsStateBackend also holds in-flight state in the memory of the TaskManager but, unlike MemoryStateBackend, it has no 5 MB size limit. In terms of capacity, the state on a single TaskManager cannot exceed that TaskManager's memory, and the total state size cannot exceed the capacity of the configured file system. Recommended scenarios: jobs with large state, such as minute-level window aggregations and joins, and jobs requiring high-availability setups.


RocksDBStateBackend: new RocksDBStateBackend(URI checkpointDataUri, boolean enableIncrementalCheckpointing). RocksDB is a key-value store. As in other key-value storage systems, the state is first put into memory; when memory is about to run out, the state is written to disk. Note that RocksDBStateBackend does not support synchronous checkpoints; the synchronous snapshot option is not included in the constructor. However, RocksDBStateBackend is currently the only backend that supports incremental checkpoints, which lets users write only incremental state changes instead of all the state each time. Checkpoints are stored in an external file system (a local file system or HDFS). The state size of a single TaskManager is limited by the total size of its memory and disk, the maximum size of a single key is 2 GB, and the total size cannot exceed the capacity of the configured file system. Recommended scenarios: jobs with very large state, for example day-level window aggregations, jobs requiring high-availability setups, and jobs that do not require high read/write performance.
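A sketch of configuring this backend (the HDFS path is an example; the flink-statebackend-rocksdb dependency is required, and the constructor declares IOException):

```java
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;

// Store state in RocksDB with incremental checkpoints enabled
// (second constructor argument).
env.setStateBackend(
        new RocksDBStateBackend("hdfs://namenode:9000/flink/checkpoints", true));
```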

Summary of Flink-related posts

[FI Components] Basic principle of Flink

[FI Components] HA Solution Description for Flink

[FI Components] Flink Component Log Introduction

Flink Basic Concept

Flink DDL statements

Flink SQL - Basic concepts

Flink SQL - Keywords

Flink SQL - Data type conversion

Flink SQL - DDL statements

Flink SQL - DML statements

Flink SQL - Query statements

Flink SQL - Create a data view

Flink SQL - Window functions

Flink SQL - Built-in functions

Flink SQL - UDX

Flink SQL - Built-in functions: Aggregate functions

Flink SQL - Built-in functions: CONCAT_AGG

Flink SQL - Built-in functions: Aggregate functions - COUNT

Flink SQL - Built-in functions: Aggregate functions - FIRST_VALUE

Flink SQL - Built-in functions: Aggregate functions - LAST_VALUE

Flink SQL - Built-in functions: Aggregate functions - MAX

Flink SQL - Built-in functions: Aggregate functions - MIN

Flink SQL - Built-in functions: Aggregate functions - SUM

Flink SQL - Built-in functions: Aggregate functions - VAR_POP

Flink SQL - Built-in functions: Aggregate functions - STDDEV_POP

That's all, thanks!

Created May 6, 2022 13:55:32

Good share

Created May 7, 2022 03:55:25

Interesting reading.

Moderator Author Created May 7, 2022 09:42:34

Apache Flink has a failure tolerance mechanism that allows data streaming applications to consistently recover their state. Even if there are failures, the technique ensures that the program's state will finally reflect every record from the data stream exactly once.

Created May 31, 2022 15:23:02

Thanks for sharing

Created Jun 1, 2022 07:01:33

It is helpful information, thanks for sharing


