Ivan Torres created FLINK-39308:
-----------------------------------
Summary: Skip empty file-merging operator state snapshots for
empty operator list state
Key: FLINK-39308
URL: https://issues.apache.org/jira/browse/FLINK-39308
Project: Flink
Issue Type: Bug
Components: Runtime / Checkpointing
Affects Versions: 2.2.0, 1.20.3
Reporter: Ivan Torres
*Problem:*
When operator list state is registered but empty,
DefaultOperatorStateBackendSnapshotStrategy still falls through to the normal
snapshot path. For file-merging checkpoints this can materialize segment-backed
operator state handles even though the corresponding operator state has zero
offsets.
During restore, OperatorStateRestoreOperation opens those tiny segment-backed
handles and reads their metadata. On object stores, this adds avoidable
range-read and open overhead, especially when many tasks register empty
operator state.
*Root cause:*
DefaultOperatorStateBackendSnapshotStrategy.asyncSnapshot() takes the empty
fast path only when there are no registered operator states and no registered
broadcast states. It does not cover the common case where operator list states
are registered but all of them are empty.
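The gap described above can be illustrated with a minimal, self-contained sketch. It is not the Flink source: the class CurrentFastPathSketch, the method takesEmptyFastPath, and the use of plain maps in place of Flink's registered-state collections are all assumptions made for illustration.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the existing fast-path check; not Flink code. */
final class CurrentFastPathSketch {

    /**
     * Models the condition described in the root cause: the fast path is
     * taken only when nothing at all is registered.
     */
    static boolean takesEmptyFastPath(
            Map<String, List<String>> registeredListStates,
            Map<String, Map<String, String>> registeredBroadcastStates) {
        return registeredListStates.isEmpty() && registeredBroadcastStates.isEmpty();
    }

    public static void main(String[] args) {
        // One list state is registered, but it holds no elements.
        Map<String, List<String>> listStates = Map.of("offsets", List.of());
        Map<String, Map<String, String>> broadcastStates = Map.of();

        // The registered-but-empty case does not qualify for the fast path.
        System.out.println(takesEmptyFastPath(listStates, broadcastStates));
    }
}
```

In this model a single registered, empty list state is enough to send the snapshot down the normal path.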
*Proposed change:*
If there are no broadcast states and every registered operator list state is
empty, return the same empty snapshot result used by the existing fully-empty
fast path:
* SnapshotResult.empty() for non-file-merging checkpoints
* EmptyFileMergingOperatorStreamStateHandle.create(...) for file-merging
checkpoints
This keeps non-empty state unchanged and avoids creating segment-backed files
for empty operator state.
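A rough sketch of the proposed decision follows, again using plain collections rather than Flink types. The class ProposedFastPathSketch, the Result enum, and the method names are hypothetical; the actual patch may structure the check differently, and "no broadcast states" is modeled here as an empty map of registered broadcast states.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical model of the proposed check; names are illustrative, not Flink APIs. */
final class ProposedFastPathSketch {

    /** Which kind of result the snapshot would produce in this model. */
    enum Result { EMPTY, EMPTY_FILE_MERGING_HANDLE, NORMAL_SNAPSHOT }

    /** True when every registered list state has no elements (vacuously true if none are registered). */
    static boolean allListStatesEmpty(Map<String, List<String>> registeredListStates) {
        return registeredListStates.values().stream().allMatch(List::isEmpty);
    }

    /** Picks the empty result when there are no broadcast states and all list states are empty. */
    static Result choose(
            Map<String, List<String>> registeredListStates,
            Map<String, Map<String, String>> registeredBroadcastStates,
            boolean fileMergingCheckpoint) {
        if (registeredBroadcastStates.isEmpty() && allListStatesEmpty(registeredListStates)) {
            return fileMergingCheckpoint ? Result.EMPTY_FILE_MERGING_HANDLE : Result.EMPTY;
        }
        // Any non-empty list state or any broadcast state keeps the existing path.
        return Result.NORMAL_SNAPSHOT;
    }

    public static void main(String[] args) {
        Map<String, List<String>> emptyRegistered = Map.of("offsets", List.of());
        Map<String, List<String>> nonEmpty = Map.of("offsets", List.of("a"));
        Map<String, Map<String, String>> noBroadcast = Map.of();

        System.out.println(choose(emptyRegistered, noBroadcast, true));
        System.out.println(choose(emptyRegistered, noBroadcast, false));
        System.out.println(choose(nonEmpty, noBroadcast, true));
    }
}
```

Because allMatch is vacuously true on an empty collection, the existing fully-empty case falls out of the same condition in this sketch, and non-empty state still reaches the normal snapshot path.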
*Expected outcome:*
* Empty registered operator list state no longer produces tiny file-merging
segments.
* Restore skips opening zero-partition file-merging handles.
* Object-store-backed restores improve when many empty operator states are
present.
* No change in restored contents/order for non-empty operator state.
*Validation:*
* Added focused runtime tests for empty registered operator state snapshots
and file-merging restore.
* Ran OperatorStateBackendTest, OperatorStateRestoreOperationTest, and
SharedStateRegistryTest successfully.
* In a production-shaped Ceph/S3A lab benchmark derived from real checkpoint
metadata, this reduced segment-backed operator handles from 192 to 11 and
reduced task deploy-to-running time from 13.15s to 9.50s (~27.7%).
*References:*
* flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackendSnapshotStrategy.java
* flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java
* flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/EmptyFileMergingOperatorStreamStateHandle.java
* flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java