Hello, everyone!
In this article, we discuss time-to-live (TTL) for state and its use cases, and show how to use and configure it.
Temporary state
There are two main reasons why state should be kept only for a limited period of time. As a running example, assume a Flink application that ingests a stream of user login events and stores the last login time of each user, in order to improve the user's experience on the next visit.
Controlling the size of state
Keeping an ever-growing state at a manageable size is the main use case for state TTL. Often data only needs to be kept temporarily, for example while a user is in an active session. Once the session ends, the user's state is no longer needed, yet it still occupies storage space. Flink 1.8.0 introduces TTL-based cleanup of expired state so that such stale data can be removed automatically. Before that, developers had to take extra steps to delete unneeded state and free the storage space, a manual procedure that is both error-prone and inefficient. In the login example above, this manual cleanup is no longer necessary.
Meeting data confidentiality requirements
Suppose the data is time-sensitive, for example it must no longer be accessible after a certain period. State TTL can be used to enforce this.
Continuous cleanup of application state
State TTL was introduced in Apache Flink 1.6.0. It lets developers of stream processing applications configure state to expire and be cleaned up after a defined timeout (the time to live). Flink 1.8.0 extends the feature with continuous cleanup of old entries, according to the TTL setting, for both the RocksDB and the heap state backends (FsStateBackend and MemoryStateBackend).
In Flink's DataStream API, application state is defined by a state descriptor. State TTL is configured by passing a StateTtlConfig object to the state descriptor. In the login example, the descriptor holds the last login time of each user as a Long value.
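As a minimal sketch of this configuration, the class below builds a TTL configuration and attaches it to a `ValueStateDescriptor<Long>` for the last login time. It assumes the Flink 1.8-era API (`StateTtlConfig.newBuilder(Time)`) with `flink-core` on the classpath. The 7-day TTL, the state name `lastUserLogin`, the class name, and the chosen update type and visibility are illustrative assumptions, not values taken from this article.

```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

public class LastLoginTtlExample {
    public static void main(String[] args) {
        // Assumption: entries expire 7 days after they were created or last written.
        StateTtlConfig ttlConfig = StateTtlConfig
            .newBuilder(Time.days(7))
            // Reset the TTL timer on creation and on every write.
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            // Never hand expired values back to the application.
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .build();

        // Descriptor for the per-user "last login" timestamp (hypothetical state name).
        ValueStateDescriptor<Long> lastUserLogin =
            new ValueStateDescriptor<>("lastUserLogin", Long.class);

        // Attach the TTL configuration to the descriptor.
        lastUserLogin.enableTimeToLive(ttlConfig);
    }
}
```

In a real job the descriptor would typically be created in the `open()` method of a rich function on a keyed stream and passed to `getRuntimeContext().getState(...)`.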

Automatic removal of expired state in full snapshots
When a full snapshot is taken for a checkpoint or savepoint, Flink 1.6.0 already supports automatic removal of expired state. Note that this removal does not apply to incremental checkpoints. Removal of expired state in full snapshots must be enabled explicitly in the TTL configuration.
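Enabling this cleanup might look like the sketch below, which uses the `cleanupFullSnapshot()` option of the `StateTtlConfig` builder. As before, it assumes the Flink 1.8-era API with `flink-core` on the classpath; the 7-day TTL and the class name are illustrative assumptions.

```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

public class FullSnapshotCleanupExample {
    public static void main(String[] args) {
        StateTtlConfig ttlConfig = StateTtlConfig
            .newBuilder(Time.days(7)) // assumed TTL, not from the article
            // Drop expired entries whenever a full snapshot is taken.
            .cleanupFullSnapshot()
            .build();

        // Print the cleanup strategies configured on the resulting object.
        System.out.println(ttlConfig.getCleanupStrategies());
    }
}
```

With this option the stored snapshot becomes smaller, while the local state keeps its size until the operator restores its state from a snapshot.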
Incremental cleanup in heap state backends
This approach is specific to the heap state backends (FsStateBackend and MemoryStateBackend). The storage backend keeps a lazy global iterator over all state entries. Certain events, such as state access, trigger an incremental cleanup step. Each time a step is triggered, the iterator advances, and the entries it traverses are checked and removed if they have expired. Incremental cleanup is enabled in the TTL configuration.
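A configuration that enables incremental cleanup could be sketched as follows, using `cleanupIncrementally(int, boolean)` from the `StateTtlConfig` builder (available since Flink 1.8, `flink-core` on the classpath assumed). The 7-day TTL, the value `10`, and the class name are illustrative assumptions.

```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

public class IncrementalCleanupExample {
    public static void main(String[] args) {
        StateTtlConfig ttlConfig = StateTtlConfig
            .newBuilder(Time.days(7)) // assumed TTL, not from the article
            // Check 10 entries per cleanup step; do not run an extra step per record.
            .cleanupIncrementally(10, false)
            .build();

        // Print the cleanup strategies configured on the resulting object.
        System.out.println(ttlConfig.getCleanupStrategies());
    }
}
```

The first argument is the number of state entries checked in each cleanup step. The second controls whether a step also runs for every processed record, in addition to every state access.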

This approach has two caveats.
The first is that the time spent on incremental cleanup adds to the record processing latency.
The second should be negligible in practice, but is still worth mentioning: if no state is accessed or no records are processed, expired state is not removed.
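To make the mechanism and the second caveat concrete, here is a toy model in plain Java. It is not Flink's implementation: the class `IncrementalCleanupSketch`, its `write` method, the `TreeMap`-based cursor, and the explicit `now` timestamps are all assumptions made for illustration.

```java
import java.util.Map;
import java.util.TreeMap;

/** Toy model of incremental TTL cleanup. Not Flink's implementation. */
class IncrementalCleanupSketch {
    // key -> timestamp (ms) at which the entry expires
    private final TreeMap<String, Long> expiry = new TreeMap<>();
    private final long ttlMillis;
    private final int cleanupSize;
    // Last key visited by the lazy global iterator; null means "start over".
    private String cursor = null;

    IncrementalCleanupSketch(long ttlMillis, int cleanupSize) {
        this.ttlMillis = ttlMillis;
        this.cleanupSize = cleanupSize;
    }

    /** A state access: (re)writes the key, then triggers one cleanup step. */
    void write(String key, long now) {
        expiry.put(key, now + ttlMillis);
        cleanupStep(now);
    }

    /** Checks at most cleanupSize entries after the cursor and drops expired ones. */
    private void cleanupStep(long now) {
        for (int i = 0; i < cleanupSize && !expiry.isEmpty(); i++) {
            Map.Entry<String, Long> entry =
                (cursor == null) ? null : expiry.higherEntry(cursor);
            if (entry == null) {
                entry = expiry.firstEntry(); // first step, or wrap around at the end
            }
            cursor = entry.getKey();
            if (entry.getValue() <= now) {
                expiry.remove(cursor);
            }
        }
    }

    int size() {
        return expiry.size();
    }

    public static void main(String[] args) {
        IncrementalCleanupSketch state = new IncrementalCleanupSketch(100, 2);
        state.write("a", 0);
        state.write("b", 0);
        state.write("c", 0);
        // No matter how much time passes, nothing is removed until the next access.
        System.out.println(state.size()); // 3
        state.write("d", 1000); // checks d and a, removes a
        System.out.println(state.size()); // 3
        state.write("d", 1001); // checks b and c, removes both
        System.out.println(state.size()); // 1
    }
}
```

Each write pays for a bounded amount of cleanup work, which is the latency cost named in the first caveat, and entries that expired long ago stay in the map until some later access advances the cursor over them.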
That's all, thanks!



