Hi William,

It's a bit of an old question, but I think we now know why this is happening. Please see [1] for further details. It is an important requirement to use uncompressed state, because even with the fix compressed state is still problematic.
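In case it helps, here is a minimal sketch of one way to keep snapshot compression disabled for the job, via ExecutionConfig; the class name and the surrounding setup are just placeholders:

{code}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UncompressedStateSetup {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Snapshot compression is off by default; making the choice explicit keeps the
        // job on uncompressed operator state, per the recommendation above.
        env.getConfig().setUseSnapshotCompression(false);

        // ... define sources, operators and sinks as usual, then:
        // env.execute("my-job");
    }
}
{code}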
We've already tested the PR under load, but it would be helpful if you could report back.

[1] https://issues.apache.org/jira/browse/FLINK-36530

BR,
G

On Fri, Aug 16, 2024 at 11:25 AM William Wallace <theanonymous31...@gmail.com> wrote:

> Context
>
> We recently upgraded from Flink 1.13.6 to Flink 1.19. In some environments we consume data from ~40k Kafka topic partitions. We are using aligned checkpoints and have set state.storage.fs.memory-threshold: 500kb.
>
> Problem
>
> Once the state of the operator using topic-partition-offset-states no longer fits within state.storage.fs.memory-threshold, we end up with a proportionally high number of reads of the checkpoint and savepoint files, one per topic partition offset.
>
> For example, when we have:
>
> {code}
> [14-Aug-2024 11:39:12.392 UTC] DEBUG org.apache.flink.runtime.state.TaskStateManagerImpl [] - Operator 8992e27ae82755cac12dd37f518df782 has remote state SubtaskState{operatorStateFromBackend=StateObjectCollection{[OperatorStateHandle{stateNameToPartitionOffsets={SourceReaderState=StateMetaInfo{offsets=[234, 279, 324, 369, 414, 459, 504, 549 …(offsets is a list of 40k elements)
> {code}
>
> then for each of the metadata offsets there is an S3 read of the checkpoint/savepoint file, and the Flink job fails to resume from the checkpoint. With debug logs we see hundreds of thousands of AWS GET calls for the same checkpoint file, each with a different offset. These AWS calls take so long that our application fails to start; the job crashes, starts the same reads again, and crashes again.
>
> We will have:
>
> {code}
> [14-Aug-2024 09:32:49.218 UTC] DEBUG com.amazonaws.request [] - Sending Request: GET https://s3-bucket-path/checkpoints/b2db0146e6afa2dabf138730580cc257/chk-370/45fd3560-8be5-4ca4-a7e9-8fe260140c18 Headers: (amz-sdk-invocation-id:, Content-Type: application/octet-stream, Range: bytes=234-9223372036854775806, User-Agent: , cfg/retry-mode/legacy, presto, )
>
> [14-Aug-2024 11:39:12.476 UTC] DEBUG com.amazonaws.request [] - Sending Request: GET https://s3-bucket-path/savepoint-72ef07-0cbc29a9e0cd/a432a1ba-2275-4c14-abb3-c027cf1342e9 Headers: (amz-sdk-invocation-id:, Content-Type: application/octet-stream, Range: bytes=234-9223372036854775806, User-Agent: , cfg/retry-mode/legacy, presto, )
>
> [14-Aug-2024 09:32:49.286 UTC] DEBUG com.amazonaws.request [] - Sending Request: GET https://s3-bucket-path/checkpoints/b2db0146e6afa2dabf138730580cc257/chk-370/45fd3560-8be5-4ca4-a7e9-8fe260140c18 Headers: (amz-sdk-invocation-id:, Content-Type: application/octet-stream, Range: bytes=279-9223372036854775806, User-Agent: , cfg/retry-mode/legacy, presto, )
>
> [14-Aug-2024 11:39:12.530 UTC] DEBUG com.amazonaws.request [] - Sending Request: GET https://s3-bucket-path/savepoint-72ef07-0cbc29a9e0cd/a432a1ba-2275-4c14-abb3-c027cf1342e9 Headers: (amz-sdk-invocation-id:, Content-Type: application/octet-stream, Range: bytes=279-9223372036854775806, User-Agent: , cfg/retry-mode/legacy, presto, )
> {code}
>
> The code that performs the multiple reads was isolated to org.apache.flink.runtime.state.OperatorStateRestoreOperation#deserializeOperatorStateValues:
>
> {code}
> private <S> void deserializeOperatorStateValues(
>         PartitionableListState<S> stateListForName,
>         FSDataInputStream in,
>         OperatorStateHandle.StateMetaInfo metaInfo)
>         throws IOException {
>
>     if (null != metaInfo) {
>         long[] offsets = metaInfo.getOffsets();
>         if (null != offsets) {
>             DataInputView div = new DataInputViewStreamWrapper(in);
>             TypeSerializer<S> serializer =
>                     stateListForName.getStateMetaInfo().getPartitionStateSerializer();
>             for (long offset : offsets) {
>                 in.seek(offset);
>                 stateListForName.add(serializer.deserialize(div));
>             }
>         }
>     }
> }
> {code}
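For illustration only, here is a minimal sketch of the kind of single-read alternative asked about in question 2 below: seek to the first offset once, buffer the covering range, and deserialize each element from memory. This is not the Flink implementation; the method name is hypothetical, and it assumes the offsets are sorted in ascending order, all point into the same file, and the covering range is small enough to hold in memory. It reuses the same Flink types as the method quoted above.

{code}
// Hypothetical sketch, not Flink code: one ranged read instead of one read per offset.
// Additional import needed: java.io.ByteArrayInputStream.
private <S> void deserializeOperatorStateValuesWithSingleRead(
        PartitionableListState<S> stateListForName,
        FSDataInputStream in,
        OperatorStateHandle.StateMetaInfo metaInfo)
        throws IOException {

    if (metaInfo == null || metaInfo.getOffsets() == null || metaInfo.getOffsets().length == 0) {
        return;
    }
    long[] offsets = metaInfo.getOffsets();
    long start = offsets[0];

    in.seek(start);                      // single seek, so at most one ranged request
    byte[] buffer = in.readAllBytes();   // Java 9+; buffers the rest of the state handle

    TypeSerializer<S> serializer =
            stateListForName.getStateMetaInfo().getPartitionStateSerializer();

    for (long offset : offsets) {
        int relative = (int) (offset - start);
        DataInputView div =
                new DataInputViewStreamWrapper(
                        new ByteArrayInputStream(buffer, relative, buffer.length - relative));
        stateListForName.add(serializer.deserialize(div));
    }
}
{code}

Whether something like this is safe in general depends on how the operator state is laid out in the file, so treat it only as a sketch of the idea behind the question.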
> Questions:
>
> 1. Please review the behaviour described above and advise whether it is expected.
>
> 2. The reads for the topic-partition-offset-states look like:
>
> {code}
> Sending Request: GET https://s3-bucket-path/savepoint-72ef07-0cbc29a9e0cd/a432a1ba-2275-4c14-abb3-c027cf1342e9 Headers: (amz-sdk-invocation-id:, Content-Type: application/octet-stream, Range: bytes=234-9223372036854775806
>
> Sending Request: GET https://s3-bucket-path/savepoint-72ef07-0cbc29a9e0cd/a432a1ba-2275-4c14-abb3-c027cf1342e9 Headers: (amz-sdk-invocation-id:, Content-Type: application/octet-stream, Range: bytes=279-9223372036854775806
>
> And so on
> {code}
>
> Could this behaviour be optimised in Flink to reduce the number of reads and avoid reading the same data multiple times? Although the range start changes slightly from call to call, the range end is always the same (effectively max long), so the entire remainder of the file is requested each time. Do we need to retrieve each offset individually, or could Flink make a single call and then use the data accordingly?
>
> 3. Currently we set state.storage.fs.memory-threshold: 700kb to avoid the problem, but we expect the number of topic partitions to increase, which will require raising this value further. What are our options once we reach the 1MB limit?
>
> Thank you.
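For completeness, here is a minimal sketch of how the threshold from question 3 can be set when building the environment. The key name and values are taken from the thread, the class name is just a placeholder, and the 1 MB cap mentioned in the comment is the limit question 3 refers to:

{code}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MemoryThresholdSetup {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // State chunks below this size are stored inline in the checkpoint metadata
        // instead of in separate files. The thread uses 500kb and later 700kb; the
        // option's maximum is 1 MB.
        conf.setString("state.storage.fs.memory-threshold", "700kb");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);

        // ... define and execute the job as usual
    }
}
{code}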