Hi G,

We did a test today using
```
execution.checkpointing.snapshot-compression: true
state.storage.fs.memory-threshold: 500kb
```
across 6 jobs with different parallelism and volume load. I will use one as an example: with 70 slots I had 70 files of ~670kb each, corresponding to the subtask state containing the KafkaSource operator. In total the compressed savepoint size was around 360MB across 201 files, the biggest being ~11MB. The job restored fine from both checkpoint and savepoint (we forced stop/run a few times), and we no longer see the millions of reads we were observing before.
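For reference, the two settings above can also be applied programmatically. This is a minimal sketch, assuming Flink 1.19's `Configuration`/`ExecutionConfig` APIs; these options normally live in flink-conf.yaml, and whether the threshold is honoured when set on the job `Configuration` depends on the deployment mode:
```
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SnapshotCompressionTestJob {
    public static void main(String[] args) throws Exception {
        // Equivalent of state.storage.fs.memory-threshold: 500kb
        Configuration conf = new Configuration();
        conf.setString("state.storage.fs.memory-threshold", "500kb");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);

        // Equivalent of execution.checkpointing.snapshot-compression: true
        env.getConfig().setUseSnapshotCompression(true);

        // ... sources, transformations, sinks as usual ...
        env.execute("snapshot-compression-test");
    }
}
```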
We decided to let this soak for some time, since our checkpoints can reach more than 10GB (uncompressed). I will let you know if I observe anything else; please let us know if you have any updates or new findings.

Thank you.

On Tue, Oct 15, 2024 at 4:38 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

>> Could you please let us know if you see anything wrong when using
>> `execution.checkpointing.snapshot-compression: true`, since for us this
>> seems to have solved the multiple S3 reads issue.
>
> When something is working it's never wrong. The question is why it has been resolved.
> Do you still have state.storage.fs.memory-threshold set to 500kb? State compression
> may reduce the state below this threshold, which would make that work.
>
>> For uncompressed state, could you please let us know how the change from
>> your PR eliminates the multiple calls to S3? It is not very clear to us.
>
> Copy from the PR:
> Flink state restore from S3 is super slow because the skip function is consuming
> ~15 seconds for ~6Mb of data.
> ...
> In this PR, skip is going to be called only in the case of compression, because
> otherwise the stream is seekable.
>
> G
>
> On Tue, Oct 15, 2024 at 4:30 PM William Wallace <theanonymous31...@gmail.com> wrote:
>
>> Thank you for the recommendation and the help.
>>
>> Could you please let us know if you see anything wrong when using
>> `execution.checkpointing.snapshot-compression: true`, since for us this
>> seems to have solved the multiple S3 reads issue.
>>
>> In debug we see:
>> `in.delegate = ClosingFSDataInputStream(org.apache.flink.fs.s3presto.common.HadoopDataInputStream)`
>> and
>> `in.compressionDelegate = SnappyFramedInputStream`
>> and in the logs a file is retrieved only once per subtask:
>> ```
>> DEBUG com.amazonaws.request [] - Sending Request: GET
>> https://.../savepoints/flink-compression/.../savepoint-...
>> Range: bytes=0-9223372036854775806.
>> ```
>>
>> For uncompressed state, could you please let us know how the change from
>> your PR eliminates the multiple calls to S3? It is not very clear to us.
>>
>> Thank you.
>>
>> On Tue, Oct 15, 2024 at 1:42 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>
>>> My recommendation is to cherry-pick this PR [1] on top of your Flink distro
>>> when possible. Additionally, turn off state compression. These should do the trick...
>>>
>>> [1] https://github.com/apache/flink/pull/25509
>>>
>>> G
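To make the seek-vs-skip point from the PR quote above a bit more concrete: when state is uncompressed the restore code can position the seekable file stream directly at each offset, while with compression the decompressing wrapper is not seekable and can only be skipped forward, which is the slow path against S3. A rough illustrative sketch (made-up names, not the actual change from PR 25509):
```
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import org.apache.flink.core.fs.FSDataInputStream;

final class PositioningSketch {

    /** Position the restore stream at targetOffset (illustrative only). */
    static void position(
            FSDataInputStream fileStream,        // seekable stream over the state file
            InputStream decompressingStream,     // non-seekable wrapper used with compression
            boolean compressionEnabled,
            long currentOffset,                  // logical position we are currently at
            long targetOffset) throws IOException {

        if (!compressionEnabled) {
            // Uncompressed state: the stream is seekable, so jump straight to the offset.
            fileStream.seek(targetOffset);
            return;
        }

        // Compressed state: the decompressing stream cannot seek, so skip forward
        // (the path the PR description measured at ~15 seconds for ~6Mb of data).
        long remaining = targetOffset - currentOffset;
        while (remaining > 0) {
            long skipped = decompressingStream.skip(remaining);
            if (skipped <= 0) {
                throw new EOFException("Could not skip to offset " + targetOffset);
            }
            remaining -= skipped;
        }
    }
}
```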
>>> On Tue, Oct 15, 2024 at 1:03 PM William Wallace <theanonymous31...@gmail.com> wrote:
>>>
>>>> Thank you Gabor for your reply.
>>>>
>>>> I'm sharing below more findings for both uncompressed and compressed state,
>>>> with the hope it helps. I'm looking forward to your thoughts.
>>>>
>>>> 1. uncompressed state - observe the `stateHandle=RelativeFileStateHandle`
>>>> ```
>>>> org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation [] - Finished restoring from state handle:
>>>> KeyGroupsSavepointStateHandle{groupRangeOffsets=KeyGroupRangeOffsets{keyGroupRange=KeyGroupRange{startKeyGroup=0,
>>>> endKeyGroup=31}}, stateHandle=RelativeFileStateHandle State:
>>>> s3p://.../savepoints/flink-no-compression/.../savepoint-.../12345678-..., 12345678-... [... bytes]}.
>>>> ```
>>>> `FSDataInputStream in.delegate` in
>>>> `org.apache.flink.runtime.state.OperatorStateRestoreOperation#deserializeOperatorStateValues`
>>>> is an instance of `ClosingFSDataInputStream(org.apache.flink.fs.s3presto.common.HadoopDataInputStream)`.
>>>> For every `offset` in `offsets = metaInfo.getOffsets()` we end up doing an actual
>>>> partial file read, which in our case adds up to the order of millions because of the
>>>> high job parallelism (subtasks), and the job can't recover.
>>>>
>>>> 2. compressed state - observe the `stateHandle=ByteStreamStateHandle`
>>>> ```
>>>> org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation [] - Finished restoring from state handle:
>>>> KeyGroupsSavepointStateHandle{groupRangeOffsets=KeyGroupRangeOffsets{keyGroupRange=KeyGroupRange{startKeyGroup=0,
>>>> endKeyGroup=31}}, stateHandle=ByteStreamStateHandle{handleName='(s3p:.../savepoints/flink-compression/.../savepoint-.../12345678-...', dataBytes=...}}.
>>>> ```
>>>> `FSDataInputStream in.delegate` in
>>>> `org.apache.flink.runtime.state.OperatorStateRestoreOperation#deserializeOperatorStateValues`
>>>> is an instance of `ByteStreamStateHandle(ByteStateHandleInputStream)`.
>>>> This means that for every `offset` in `offsets = metaInfo.getOffsets()` we end up
>>>> doing a read from a `byte[]`, which is faster.
>>>>
>>>> At this point I don't understand how not doing the `skip` operation in the case of
>>>> uncompressed state can work, since skip is required for the partial reads. I apologise
>>>> if I'm wrong; I don't have the same level of understanding as you have.
>>>>
>>>> What we considered doing was to find a way to cache the file as a `byte[]` and do the
>>>> reads from memory ... but it seems state compression is effectively doing the same.
>>>> We are in the process of testing state compression under production volumes ... can't
>>>> say yet how that will work for us.
>>>>
>>>> Thank you again for looking into this. I'm looking forward to your thoughts. Please
>>>> let me know if I missed or misunderstood something, and please let us know your
>>>> recommendation.
>>>>
>>>> On Tue, Oct 15, 2024 at 8:35 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>
>>>>> Hi William,
>>>>>
>>>>> It's a bit of an old question, but I think now we know why this is happening.
>>>>> Please see [1] for further details.
>>>>> It's an important requirement to use uncompressed state because, even with the fix,
>>>>> compressed state is still problematic.
>>>>>
>>>>> We've already tested the PR with load, but if you can report back it would be helpful.
>>>>>
>>>>> [1] https://issues.apache.org/jira/browse/FLINK-36530
>>>>>
>>>>> BR,
>>>>> G
>>>>>
>>>>> On Fri, Aug 16, 2024 at 11:25 AM William Wallace <theanonymous31...@gmail.com> wrote:
>>>>>
>>>>>> Context
>>>>>>
>>>>>> We have recently upgraded from Flink 1.13.6 to Flink 1.19. We consume data from
>>>>>> ~40k Kafka topic partitions in some environments. We are using aligned checkpoints.
>>>>>> We set state.storage.fs.memory-threshold: 500kb.
>>>>>>
>>>>>> Problem
>>>>>>
>>>>>> When the state of the operator using topic-partition-offset-states does not fit
>>>>>> within state.storage.fs.memory-threshold, we end up with a proportionally high
>>>>>> number of reads of the checkpoint and savepoint files, one for each of the topic
>>>>>> partition offsets.
>>>>>> For example, when we have:
>>>>>>
>>>>>> {code}
>>>>>> [14-Aug-2024 11:39:12.392 UTC] DEBUG org.apache.flink.runtime.state.TaskStateManagerImpl [] - Operator
>>>>>> 8992e27ae82755cac12dd37f518df782 has remote state
>>>>>> SubtaskState{operatorStateFromBackend=StateObjectCollection{[OperatorStateHandle{stateNameToPartitionOffsets={
>>>>>> SourceReaderState=StateMetaInfo{offsets=[234, 279, 324, 369, 414, 459, 504, 549 …(offsets is a list of 40k elements)
>>>>>> {code}
>>>>>>
>>>>>> For each of the metadata offsets we see an S3 read against the checkpoint/savepoint
>>>>>> file, and the Flink job fails to resume from the checkpoint. With debug logs we see
>>>>>> hundreds of thousands of AWS GET calls for the same checkpoint file, each with a
>>>>>> different offset. These calls take so long that the application fails to start; the
>>>>>> job crashes, starts the same reads again, and crashes again.
>>>>>>
>>>>>> We will have:
>>>>>>
>>>>>> {code}
>>>>>> [14-Aug-2024 09:32:49.218 UTC] DEBUG com.amazonaws.request [] - Sending Request: GET
>>>>>> https://s3-bucket-path/checkpoints/b2db0146e6afa2dabf138730580cc257/chk-370/45fd3560-8be5-4ca4-a7e9-8fe260140c18
>>>>>> Headers: (amz-sdk-invocation-id:, Content-Type: application/octet-stream,
>>>>>> Range: bytes=234-9223372036854775806, User-Agent: , cfg/retry-mode/legacy, presto, )
>>>>>>
>>>>>> [14-Aug-2024 11:39:12.476 UTC] DEBUG com.amazonaws.request [] - Sending Request: GET
>>>>>> https://s3-bucket-path/savepoint-72ef07-0cbc29a9e0cd/a432a1ba-2275-4c14-abb3-c027cf1342e9
>>>>>> Headers: (amz-sdk-invocation-id:, Content-Type: application/octet-stream,
>>>>>> Range: bytes=234-9223372036854775806, User-Agent: , cfg/retry-mode/legacy, presto, )
>>>>>>
>>>>>> [14-Aug-2024 09:32:49.286 UTC] DEBUG com.amazonaws.request [] - Sending Request: GET
>>>>>> https://s3-bucket-path/checkpoints/b2db0146e6afa2dabf138730580cc257/chk-370/45fd3560-8be5-4ca4-a7e9-8fe260140c18
>>>>>> Headers: (amz-sdk-invocation-id:, Content-Type: application/octet-stream,
>>>>>> Range: bytes=279-9223372036854775806, User-Agent: , cfg/retry-mode/legacy, presto, )
>>>>>>
>>>>>> [14-Aug-2024 11:39:12.530 UTC] DEBUG com.amazonaws.request [] - Sending Request: GET
>>>>>> https://s3-bucket-path/savepoint-72ef07-0cbc29a9e0cd/a432a1ba-2275-4c14-abb3-c027cf1342e9
>>>>>> Headers: (amz-sdk-invocation-id:, Content-Type: application/octet-stream,
>>>>>> Range: bytes=279-9223372036854775806, User-Agent: , cfg/retry-mode/legacy, presto, )
>>>>>> {code}
>>>>>>
>>>>>> The code which does the multiple reads was isolated to:
>>>>>>
>>>>>> {code}
>>>>>> org.apache.flink.runtime.state.OperatorStateRestoreOperation#deserializeOperatorStateValues
>>>>>>
>>>>>> private <S> void deserializeOperatorStateValues(
>>>>>>         PartitionableListState<S> stateListForName,
>>>>>>         FSDataInputStream in,
>>>>>>         OperatorStateHandle.StateMetaInfo metaInfo)
>>>>>>         throws IOException {
>>>>>>     if (null != metaInfo) {
>>>>>>         long[] offsets = metaInfo.getOffsets();
>>>>>>         if (null != offsets) {
>>>>>>             DataInputView div = new DataInputViewStreamWrapper(in);
>>>>>>             TypeSerializer<S> serializer =
>>>>>>                     stateListForName.getStateMetaInfo().getPartitionStateSerializer();
>>>>>>             for (long offset : offsets) {
>>>>>>                 in.seek(offset);
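>>>>>>                 // Each seek() above on the S3 (presto) input stream shows up in the debug
>>>>>>                 // logs as a separate ranged GET (Range: bytes=<offset>-9223372036854775806),
>>>>>>                 // so an offsets array of ~40k elements means ~40k GET requests per state file.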
>>>>>>                 stateListForName.add(serializer.deserialize(div));
>>>>>>             }
>>>>>>         }
>>>>>>     }
>>>>>> }
>>>>>> {code}
>>>>>>
>>>>>> Questions:
>>>>>>
>>>>>> 1. Please review the behaviour above and advise whether it is expected.
>>>>>>
>>>>>> 2. The reads for the topic-partition-offset-states are similar to:
>>>>>>
>>>>>> {code}
>>>>>> Sending Request: GET
>>>>>> https://s3-bucket-path/savepoint-72ef07-0cbc29a9e0cd/a432a1ba-2275-4c14-abb3-c027cf1342e9
>>>>>> Headers: (amz-sdk-invocation-id:, Content-Type: application/octet-stream,
>>>>>> Range: bytes=234-9223372036854775806
>>>>>>
>>>>>> Sending Request: GET
>>>>>> https://s3-bucket-path/savepoint-72ef07-0cbc29a9e0cd/a432a1ba-2275-4c14-abb3-c027cf1342e9
>>>>>> Headers: (amz-sdk-invocation-id:, Content-Type: application/octet-stream,
>>>>>> Range: bytes=279-9223372036854775806
>>>>>>
>>>>>> And so on
>>>>>> {code}
>>>>>>
>>>>>> Could this behaviour be optimised in Flink to reduce the number of reads and avoid
>>>>>> reading the same data multiple times? Although the range start changes slightly
>>>>>> across calls, the range end is always the same (max long), so effectively the rest
>>>>>> of the file is requested every time. Do we need to retrieve each offset individually,
>>>>>> or could Flink make a single call and then use the data accordingly?
>>>>>>
>>>>>> 3. Currently we set state.storage.fs.memory-threshold: 700kb to avoid the problem,
>>>>>> but we expect the number of topic partitions to increase, requiring a further
>>>>>> increase of this value. What are our options once we reach the 1MB limit?
>>>>>>
>>>>>> Thank you.
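For illustration, the "cache the file as a byte[] and read from memory" idea mentioned earlier in the thread (and implicitly in question 2 above) could look roughly like the sketch below. This is a hypothetical variant of deserializeOperatorStateValues, not the actual fix from FLINK-36530 / PR 25509; it reuses the types from the snippet above plus java.io.ByteArrayInputStream/ByteArrayOutputStream, and it assumes the offsets are absolute positions in a state file small enough to buffer in memory:
```
// Hypothetical sketch: read the remote state file once, then deserialize each
// partition from the in-memory copy instead of seeking on the S3 stream per offset.
private <S> void deserializeOperatorStateValuesBuffered(
        PartitionableListState<S> stateListForName,
        FSDataInputStream in,
        OperatorStateHandle.StateMetaInfo metaInfo)
        throws IOException {
    if (null == metaInfo || null == metaInfo.getOffsets()) {
        return;
    }

    // One sequential read of the whole file (a single GET instead of one per offset).
    in.seek(0); // assumes offsets are absolute positions within this file
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    byte[] chunk = new byte[64 * 1024];
    int n;
    while ((n = in.read(chunk)) != -1) {
        buffer.write(chunk, 0, n);
    }
    byte[] data = buffer.toByteArray(); // assumes the file fits in memory

    TypeSerializer<S> serializer =
            stateListForName.getStateMetaInfo().getPartitionStateSerializer();
    for (long offset : metaInfo.getOffsets()) {
        // In-memory "seek": open a stream positioned at the offset inside the buffered copy.
        DataInputView div =
                new DataInputViewStreamWrapper(
                        new ByteArrayInputStream(data, (int) offset, data.length - (int) offset));
        stateListForName.add(serializer.deserialize(div));
    }
}
```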