
We recently upgraded from Flink 1.13.6 to Flink 1.19. We consume data
from ~40k Kafka topic partitions in some environments. We are using
aligned checkpoints. We set 500kb.


At the point when the state for operator using
topic-partition-offset-states doesn’t fit in the, we end up with a proportionally high
number of reads for the checkpoint and savepoint files for each of the
topic partition offsets.

For example when we have:


[14-Aug-2024 11:39:12.392 UTC] DEBUG
org.apache.flink.runtime.state.TaskStateManagerImpl          [] - Operator
8992e27ae82755cac12dd37f518df782 has remote state
SourceReaderState=StateMetaInfo{offsets=[234, 279, 324, 369, 414, 459, 504,
549 …(offsets is a list 40k elements)


For each of the metadata offsets we see an S3 read of the
checkpoint/savepoint file, and the Flink job fails to resume from the
checkpoint. With debug logs enabled, we see hundreds of thousands of AWS GET
calls for the same checkpoint file, each with a different offset. These calls
take so long that our application fails to start; the job crashes, restarts,
repeats the same reads, and crashes again.

We will have:


[14-Aug-2024 09:32:49.218 UTC] DEBUG com.amazonaws.request
                      [] - Sending Request: GET
Headers: (amz-sdk-invocation-id:, Content-Type: application/octet-stream,
Range: bytes=234-9223372036854775806, User-Agent:  , cfg/retry-mode/legacy,
presto, )

[14-Aug-2024 11:39:12.476 UTC] DEBUG com.amazonaws.request
                      [] - Sending Request: GET
Headers: (amz-sdk-invocation-id:, Content-Type: application/octet-stream,
Range: bytes=234-9223372036854775806, User-Agent: , cfg/retry-mode/legacy,
presto, )

[14-Aug-2024 09:32:49.286 UTC] DEBUG com.amazonaws.request
                      [] - Sending Request: GET
Headers: (amz-sdk-invocation-id:, Content-Type: application/octet-stream,
Range: bytes=279-9223372036854775806, User-Agent: , cfg/retry-mode/legacy,
presto, )

[14-Aug-2024 11:39:12.530 UTC] DEBUG com.amazonaws.request
                      [] - Sending Request: GET
Headers: (amz-sdk-invocation-id:, Content-Type: application/octet-stream,
Range: bytes=279-9223372036854775806, User-Agent: , cfg/retry-mode/legacy,
presto, )


We isolated the code that performs the multiple reads to:

    private <S> void deserializeOperatorStateValues(
            PartitionableListState<S> stateListForName,
            FSDataInputStream in,
            OperatorStateHandle.StateMetaInfo metaInfo)
            throws IOException {

        if (null != metaInfo) {
            long[] offsets = metaInfo.getOffsets();
            if (null != offsets) {
                DataInputView div = new DataInputViewStreamWrapper(in);
                TypeSerializer<S> serializer =
                        stateListForName.getStateMetaInfo().getPartitionStateSerializer();
                for (long offset : offsets) {
                    // Each seek on the remote stream results in a new ranged read.
                    in.seek(offset);
                    stateListForName.add(serializer.deserialize(div));
                }
            }
        }
    }

Please review the behaviour described above and advise whether it is expected.

The reads for the topic-partition-offset-states are similar to:


Sending Request: GET
Headers: (amz-sdk-invocation-id:, Content-Type: application/octet-stream,
Range: bytes=234-9223372036854775806

Sending Request: GET
Headers: (amz-sdk-invocation-id:, Content-Type: application/octet-stream,
Range: bytes=279-9223372036854775806

And so on


Could this behaviour be optimised in Flink to reduce the number of reads
and avoid reading the same data multiple times? Although the range start
changes slightly across calls, the range end is always the same (close to
the maximum long value), so each call effectively requests the rest of the
file. Do we need to retrieve each offset individually, or could Flink issue
a single call and then use the data accordingly?
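
To illustrate the kind of optimisation we have in mind, here is a minimal, self-contained sketch. It is not Flink code: `CountingStore` is a hypothetical stand-in for S3 that only counts ranged reads, and the values are assumed to be strings written with `writeUTF` rather than Flink's real serializer. It compares one ranged read per offset with a single read that is then indexed in memory.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SingleReadRestoreSketch {

    /** Hypothetical stand-in for a remote object store; counts ranged reads. */
    static final class CountingStore {
        private final byte[] blob;
        int rangedReads = 0;

        CountingStore(byte[] blob) {
            this.blob = blob;
        }

        /** Simulates "Range: bytes=start-": returns everything from start to the end. */
        byte[] readFrom(long start) {
            rangedReads++;
            return Arrays.copyOfRange(blob, (int) start, blob.length);
        }
    }

    /** Writes the values back to back and records the start offset of each one. */
    static byte[] writeBlob(List<String> values, long[] offsetsOut) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            for (int i = 0; i < values.size(); i++) {
                offsetsOut[i] = out.size(); // bytes written so far
                out.writeUTF(values.get(i));
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Mirrors the observed behaviour: one ranged read per offset. */
    static List<String> restorePerOffset(CountingStore store, long[] offsets) {
        List<String> result = new ArrayList<>();
        for (long offset : offsets) {
            byte[] tail = store.readFrom(offset);
            result.add(readOne(tail, 0));
        }
        return result;
    }

    /** Alternative: one ranged read from the smallest offset, then index in memory. */
    static List<String> restoreSingleRead(CountingStore store, long[] offsets) {
        List<String> result = new ArrayList<>();
        if (offsets.length == 0) {
            return result;
        }
        long base = Arrays.stream(offsets).min().getAsLong();
        byte[] buffer = store.readFrom(base);
        for (long offset : offsets) {
            result.add(readOne(buffer, (int) (offset - base)));
        }
        return result;
    }

    /** Decodes a single value starting at the given position in the buffer. */
    private static String readOne(byte[] buffer, int position) {
        try (DataInputStream in = new DataInputStream(
                new ByteArrayInputStream(buffer, position, buffer.length - position))) {
            return in.readUTF();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With N offsets, `restorePerOffset` issues N ranged reads, while `restoreSingleRead` issues one and returns the same values. The single-read variant holds the whole remainder of the file in memory, so a real implementation would probably prefer a bounded buffer or sequential streaming.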


Currently we set 700kb to avoid the problem, but we expect the number of
topic partitions to increase, requiring a further increase of this value.
What are our options once we reach the 1MB limit?

Thank you.
