Context

We recently upgraded from Flink 1.13.6 to Flink 1.19. In some environments we consume data from ~40k Kafka topic partitions. We use aligned checkpoints and set state.storage.fs.memory-threshold: 500kb.
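For completeness, a minimal sketch of the relevant settings. We actually set them in flink-conf.yaml; the checkpoint directory and the class name below are illustrative, only the threshold value is taken from our setup:

{code}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // State chunks smaller than this threshold are stored inline in the checkpoint
        // _metadata file; anything larger is written to separate files on the
        // checkpoint storage (S3 in our case).
        conf.setString("state.storage.fs.memory-threshold", "500kb");
        // Illustrative path, not our real bucket.
        conf.setString("state.checkpoints.dir", "s3://s3-bucket-path/checkpoints");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // Aligned, exactly-once checkpoints (the defaults), every 60s.
        env.enableCheckpointing(60_000L);
    }
}
{code}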
Problem

When the state of the operator using topic-partition-offset-states no longer fits within state.storage.fs.memory-threshold, restoring it triggers a proportionally high number of reads of the checkpoint and savepoint files, one per topic-partition offset. For example, when we have:

{code}
[14-Aug-2024 11:39:12.392 UTC] DEBUG org.apache.flink.runtime.state.TaskStateManagerImpl [] - Operator 8992e27ae82755cac12dd37f518df782 has remote state SubtaskState{operatorStateFromBackend=StateObjectCollection{[OperatorStateHandle{stateNameToPartitionOffsets={ SourceReaderState=StateMetaInfo{offsets=[234, 279, 324, 369, 414, 459, 504, 549 …(offsets is a list of 40k elements)
{code}

we end up with one S3 read of the checkpoint/savepoint file per metadata offset, and the Flink job fails to resume from the checkpoint. With debug logging enabled, we see hundreds of thousands of AWS GET calls for the same checkpoint file, each with a different offset. These calls take so long that the application fails to start; the job crashes, issues the same reads again, and crashes again. We see:

{code}
[14-Aug-2024 09:32:49.218 UTC] DEBUG com.amazonaws.request [] - Sending Request: GET https://s3-bucket-path/checkpoints/b2db0146e6afa2dabf138730580cc257/chk-370/45fd3560-8be5-4ca4-a7e9-8fe260140c18 Headers: (amz-sdk-invocation-id:, Content-Type: application/octet-stream, Range: bytes=234-9223372036854775806, User-Agent: , cfg/retry-mode/legacy, presto, )
[14-Aug-2024 11:39:12.476 UTC] DEBUG com.amazonaws.request [] - Sending Request: GET https://s3-bucket-path/savepoint-72ef07-0cbc29a9e0cd/a432a1ba-2275-4c14-abb3-c027cf1342e9 Headers: (amz-sdk-invocation-id:, Content-Type: application/octet-stream, Range: bytes=234-9223372036854775806, User-Agent: , cfg/retry-mode/legacy, presto, )
[14-Aug-2024 09:32:49.286 UTC] DEBUG com.amazonaws.request [] - Sending Request: GET https://s3-bucket-path/checkpoints/b2db0146e6afa2dabf138730580cc257/chk-370/45fd3560-8be5-4ca4-a7e9-8fe260140c18 Headers: (amz-sdk-invocation-id:, Content-Type: application/octet-stream, Range: bytes=279-9223372036854775806, User-Agent: , cfg/retry-mode/legacy, presto, )
[14-Aug-2024 11:39:12.530 UTC] DEBUG com.amazonaws.request [] - Sending Request: GET https://s3-bucket-path/savepoint-72ef07-0cbc29a9e0cd/a432a1ba-2275-4c14-abb3-c027cf1342e9 Headers: (amz-sdk-invocation-id:, Content-Type: application/octet-stream, Range: bytes=279-9223372036854775806, User-Agent: , cfg/retry-mode/legacy, presto, )
{code}

We isolated the code that performs the repeated reads to org.apache.flink.runtime.state.OperatorStateRestoreOperation#deserializeOperatorStateValues:

{code}
private <S> void deserializeOperatorStateValues(
        PartitionableListState<S> stateListForName,
        FSDataInputStream in,
        OperatorStateHandle.StateMetaInfo metaInfo)
        throws IOException {

    if (null != metaInfo) {
        long[] offsets = metaInfo.getOffsets();
        if (null != offsets) {
            DataInputView div = new DataInputViewStreamWrapper(in);
            TypeSerializer<S> serializer =
                    stateListForName.getStateMetaInfo().getPartitionStateSerializer();
            // One seek (and, on S3, one ranged GET) per stored element.
            for (long offset : offsets) {
                in.seek(offset);
                stateListForName.add(serializer.deserialize(div));
            }
        }
    }
}
{code}

Questions:

1. Please review the behaviour described above and advise whether it is expected.
2. The reads for the topic-partition-offset-states look like this:

{code}
Sending Request: GET https://s3-bucket-path/savepoint-72ef07-0cbc29a9e0cd/a432a1ba-2275-4c14-abb3-c027cf1342e9 Headers: (amz-sdk-invocation-id:, Content-Type: application/octet-stream, Range: bytes=234-9223372036854775806
Sending Request: GET https://s3-bucket-path/savepoint-72ef07-0cbc29a9e0cd/a432a1ba-2275-4c14-abb3-c027cf1342e9 Headers: (amz-sdk-invocation-id:, Content-Type: application/octet-stream, Range: bytes=279-9223372036854775806
...and so on
{code}

Could this behaviour be optimised in Flink to reduce the number of reads and avoid fetching the same data multiple times? Although the start of the range changes slightly from call to call, the range end is always the same (effectively the maximum long value), so the rest of the file is requested each time. Do we really need one request per offset, or could Flink issue a single read and then consume the data from it? A rough sketch of what we mean is included at the end of this description.

3. We currently set state.storage.fs.memory-threshold: 700kb to avoid the problem, but we expect the number of topic partitions to grow, which will require increasing this value further. What are our options once we reach the 1 MB limit for this setting?

Thank you.
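As mentioned in question 2, here is a rough sketch of the kind of change we have in mind in deserializeOperatorStateValues. It is only a sketch, under the assumption that the offsets of a state are ascending and the serialized elements are laid out back to back (which the contiguous offsets in our logs suggest); the seek is skipped whenever the stream is already at the expected position, so a single ranged GET could in principle serve many consecutive elements:

{code}
private <S> void deserializeOperatorStateValues(
        PartitionableListState<S> stateListForName,
        FSDataInputStream in,
        OperatorStateHandle.StateMetaInfo metaInfo)
        throws IOException {

    if (null != metaInfo) {
        long[] offsets = metaInfo.getOffsets();
        if (null != offsets) {
            DataInputView div = new DataInputViewStreamWrapper(in);
            TypeSerializer<S> serializer =
                    stateListForName.getStateMetaInfo().getPartitionStateSerializer();
            for (long offset : offsets) {
                // Only reposition the stream when it is not already at the expected
                // offset. If the previous element ended exactly where the next one
                // starts, keep reading from the already-open stream instead of
                // forcing the S3 input stream to re-open the object with a new
                // ranged GET.
                if (in.getPos() != offset) {
                    in.seek(offset);
                }
                stateListForName.add(serializer.deserialize(div));
            }
        }
    }
}
{code}

We realise the actual fix may need to be more involved (for example reading the whole range covered by the offsets into a buffer first, or wrapping the stream so that seeks within an already-fetched range do not re-open the object), but even a simple check like the one above would collapse the per-offset GETs in our case.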