Hi William and Mate, I've just tested compressed and uncompressed restore with the same amount of data, and the results match yours:
- Compressed restore performs well because Snappy skips data only within a single block, which is relatively small
- Uncompressed restore performs well after applying my PR [1]
Once it's merged, we can say both compressed and uncompressed state are safe to use. Thanks, everybody, for the efforts to sort this out! [1] https://github.com/apache/flink/pull/25509 G On Thu, Oct 17, 2024 at 11:06 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote: > Hi Mate, > > Thanks for the deep dive! I've had a quick look at the code and it makes > sense why you and William are not seeing slowness with compressed state. > Tomorrow I'll do some tests and come back with the results... > > @William Wallace <theanonymous31...@gmail.com> I think the restore should > work without the memory-threshold setting with compression. > When compression is off, my PR is going to be the cure 🙂 > > G > > > On Thu, Oct 17, 2024 at 8:06 PM Mate Czagany <czmat...@gmail.com> wrote: > >> Hi William, >> >> I think your findings are correct; I could easily reproduce the issue >> with snapshot-compression set to false, but I was unable to with >> snapshot-compression set to true. >> >> When using compressed state, the available() call will return the number >> of bytes in the Snappy internal buffer that has not been decompressed yet >> [1] [2]. It is safe to skip bytes here, as this will not cause the next >> call to seek() to seek backwards. >> >> When not using compressed state, the available() call will return the >> number of bytes buffered by the underlying BufferedInputStream, which >> buffers data from the filesystem, e.g. S3. In my tests the buffer size was >> 4096, and if Flink read e.g. 50 bytes of data, the result of available() >> was then 4046. Skipping 4046 bytes (or any number of bytes) meant that for >> the next seek() call the buffer had to seek backwards, and for S3 that >> meant closing and re-opening the stream, resulting in a new GET request for >> each element in the list state.
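To make the mechanism Mate describes concrete, here is a small Java simulation of the restore loop running over a buffered remote stream. It is a simplified model under stated assumptions, not Flink, Hadoop or Presto code, and all class and method names are hypothetical. It assumes a 4096-byte client buffer, an in-buffer seek shortcut that is only taken while unread bytes remain in the buffer, and one new GET for every backward seek on the remote stream. Elements are 45 bytes apart, like the offsets 234, 279, 324, ... in the original report.

```java
/**
 * Simplified model (not Flink/Hadoop/Presto code) of the restore loop in
 * deserializeOperatorStateValues running on top of a buffered remote stream.
 */
public class BackwardSeekDemo {

    /** Hypothetical buffered stream over a remote object; counts simulated GET requests. */
    static final class SimulatedRemoteStream {
        private final byte[] object; // contents of the remote file
        private final byte[] buf;    // client-side read buffer
        private long remotePos;      // remote cursor = end of the buffered window
        private int bufPos;          // next unread index in buf
        private int bufCount;        // number of valid bytes in buf
        int getRequests = 1;         // opening the stream is the first GET

        SimulatedRemoteStream(byte[] object, int bufferSize) {
            this.object = object;
            this.buf = new byte[bufferSize];
        }

        private void fill() {
            int n = (int) Math.min(buf.length, object.length - remotePos);
            System.arraycopy(object, (int) remotePos, buf, 0, n);
            remotePos += n;
            bufPos = 0;
            bufCount = n;
        }

        int read() {
            if (bufPos == bufCount) {
                if (remotePos >= object.length) {
                    return -1;
                }
                fill(); // sequential refill on the open connection, no new GET
            }
            return buf[bufPos++] & 0xFF;
        }

        /** Bytes still unread in the client buffer. */
        int available() {
            return bufCount - bufPos;
        }

        /** Skips within the buffer only. */
        long skip(long n) {
            int k = (int) Math.min(n, available());
            bufPos += k;
            return k;
        }

        void seek(long pos) {
            long bufStart = remotePos - bufCount;
            // Assumption: the cheap in-buffer seek is only tried while unread bytes remain.
            if (bufPos != bufCount && pos >= bufStart && pos < remotePos) {
                bufPos = (int) (pos - bufStart);
                return;
            }
            bufPos = 0;
            bufCount = 0;
            if (pos < remotePos) {
                getRequests++; // backward seek: close and re-open, i.e. a new GET
            }
            remotePos = pos;   // forward seeks reuse the open connection in this model
        }
    }

    /** Seeks to each offset, reads one fixed-size element, optionally drains available(). */
    static int restore(byte[] file, long[] offsets, int elementSize, boolean skipAvailable) {
        SimulatedRemoteStream in = new SimulatedRemoteStream(file, 4096);
        for (long offset : offsets) {
            in.seek(offset);
            for (int i = 0; i < elementSize; i++) {
                in.read(); // stands in for serializer.deserialize(div)
            }
            if (skipAvailable) {
                in.skip(in.available());
            }
        }
        return in.getRequests;
    }

    public static void main(String[] args) {
        int elementSize = 45; // spacing of the offsets 234, 279, 324, ... in the report
        int count = 1000;
        long[] offsets = new long[count];
        for (int i = 0; i < count; i++) {
            offsets[i] = 234L + (long) i * elementSize;
        }
        byte[] file = new byte[234 + count * elementSize];
        System.out.println("GETs with skip(available()): " + restore(file, offsets, elementSize, true));
        System.out.println("GETs without skip:           " + restore(file, offsets, elementSize, false));
    }
}
```

Under these assumptions the program reports 1000 GETs when every element is followed by skip(available()) and 1 GET without the skip: one request per list element versus a single sequential read. The model only covers draining everything available(), which is the case Mate measured (4096 - 50 = 4046 bytes).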
>> >> I think Gabor's PR [3] is the right solution; I can't think of any >> situation where we would have to skip any bytes in the stream when not >> using compressed state. I also think that compressed state is not affected >> by this. >> >> [1] >> https://github.com/xerial/snappy-java/blob/9f8c3cf74223ed0a8a834134be9c917b9f10ceb5/src/main/java/org/xerial/snappy/SnappyFramedInputStream.java#L313 >> [2] >> https://github.com/xerial/snappy-java/blob/9f8c3cf74223ed0a8a834134be9c917b9f10ceb5/src/main/java/org/xerial/snappy/SnappyFramedInputStream.java#L566 >> [3] https://github.com/apache/flink/pull/25509 >> >> Best, >> Mate >> >> William Wallace <theanonymous31...@gmail.com> wrote (Thu, Oct 17, >> 2024, 19:23): >> >>> Hi G, >>> >>> We did a test today using >>> ``` >>> execution.checkpointing.snapshot-compression: true >>> state.storage.fs.memory-threshold: 500kb >>> ``` >>> across 6 jobs with different parallelism and volume load. >>> I will use one as an example: with 70 slots, I had 70 files of 670kb >>> corresponding to the subtask state containing the KafkaSource operator. In >>> total, the compressed savepoint size was around 360MB in 201 files; the biggest was >>> ~11MB. The job restored OK from both checkpoint and savepoint, without the >>> millions of reads we were observing before (we forced stop/run a few times). >>> >>> We decided to let this soak for some time, since our checkpoints can >>> reach more than 10GB (uncompressed). >>> >>> Let me know if you have any updates. I will let you know if I observe >>> anything else. >>> Please let us know if you have any new findings. >>> >>> Thank you. >>> >>> On Tue, Oct 15, 2024 at 4:38 PM Gabor Somogyi <gabor.g.somo...@gmail.com> >>> wrote: >>> >>>> Could you please let us know if you see anything wrong when using >>>>> `execution.checkpointing.snapshot-compression: true`, since for us this >>>>> seems to have solved the multiple S3 reads issue. >>>>> >>>> When something is working, it's never wrong.
The question is why it has >>>> been resolved. >>>> Do you still have state.storage.fs.memory-threshold set to 500Kb? >>>> State compression may reduce the state below this threshold, which would >>>> make it work. >>>> >>>> For uncompressed state, could you please let us know how the change from >>>>> your PR eliminates the multiple calls to S3? It is not very clear to us. >>>>> >>>> Copy from the PR: >>>> Flink state restore from S3 is super slow because the skip function is >>>> consuming ~15 seconds for ~6Mb of data. >>>> ... >>>> In this PR, skip is going to be called only in the case of compression, >>>> because otherwise the stream is seekable. >>>> >>>> G >>>> >>>> On Tue, Oct 15, 2024 at 4:30 PM William Wallace < >>>> theanonymous31...@gmail.com> wrote: >>>> >>>>> Thank you for the recommendation and the help. >>>>> >>>>> Could you please let us know if you see anything wrong when using >>>>> `execution.checkpointing.snapshot-compression: true`, since for us this >>>>> seems to have solved the multiple S3 reads issue. >>>>> >>>> In debug we see: >>>>> `in.delegate = >>>>> ClosingFSDataInputStream(org.apache.flink.fs.s3presto.common.HadoopDataInputStream)` >>>>> >>>>> and >>>>> `in.compressionDelegate = SnappyFramedInputStream` >>>>> and in the logs a file is retrieved only once per subtask: >>>>> ``` >>>>> DEBUG com.amazonaws.request [] >>>>> - Sending Request: GET >>>>> https://.../savepoints/flink-compression/.../savepoint-... >>>>> Range: bytes=0-9223372036854775806. >>>>> ``` >>>>> >>>>> For uncompressed state, could you please let us know how the change >>>>> from your PR eliminates the multiple calls to S3? It is not very clear to us. >>>>> >>>> Thank you. >>>>> >>>>> On Tue, Oct 15, 2024 at 1:42 PM Gabor Somogyi < >>>>> gabor.g.somo...@gmail.com> wrote: >>>>> >>>>>> My recommendation is to cherry-pick this PR [1] on top of your Flink >>>>>> distro when possible. >>>>>> Additionally, turn off state compression. These should do the trick...
>>>>>> >>>>>> [1] https://github.com/apache/flink/pull/25509 >>>>>> >>>>>> G >>>>>> >>>>>> >>>>>> On Tue, Oct 15, 2024 at 1:03 PM William Wallace < >>>>>> theanonymous31...@gmail.com> wrote: >>>>>> >>>>>>> Thank you, Gabor, for your reply. >>>>>>> >>>>>>> I'm sharing more findings below for both uncompressed and compressed >>>>>>> state, in the hope that they help. I'm looking forward to your thoughts. >>>>>>> >>>>>>> 1. uncompressed state - observe the >>>>>>> `stateHandle=RelativeFileStateHandle` >>>>>>> ``` >>>>>>> org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation >>>>>>> [] - Finished restoring from state handle: >>>>>>> KeyGroupsSavepointStateHandle{groupRangeOffsets=KeyGroupRangeOffsets{keyGroupRange=KeyGroupRange{startKeyGroup=0, >>>>>>> endKeyGroup=31}}, stateHandle=RelativeFileStateHandle State: >>>>>>> s3p://.../savepoints/flink-no-compression/.../savepoint-.../12345678-..., >>>>>>> 12345678-... [... bytes]}. >>>>>>> ``` >>>>>>> >>>>>>> `FSDataInputStream in.delegate` in >>>>>>> `org.apache.flink.runtime.state.OperatorStateRestoreOperation#deserializeOperatorStateValues` >>>>>>> is an instance of >>>>>>> `ClosingFSDataInputStream(org.apache.flink.fs.s3presto.common.HadoopDataInputStream)`. >>>>>>> For every `offset: offsets = metaInfo.getOffsets()` we end up doing an >>>>>>> actual partial file read, which in our case adds up to the order of millions >>>>>>> because of high job parallelism (subtasks), and the job can't recover. >>>>>>> >>>>>>> 2. compressed state - observe the `stateHandle=ByteStreamStateHandle` >>>>>>> ``` >>>>>>> org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation >>>>>>> [] - Finished restoring from state handle: >>>>>>> KeyGroupsSavepointStateHandle{groupRangeOffsets=KeyGroupRangeOffsets{keyGroupRange=KeyGroupRange{startKeyGroup=0, >>>>>>> endKeyGroup=31}}, >>>>>>> stateHandle=ByteStreamStateHandle{handleName='(s3p:.../savepoints/flink-compression/.../savepoint-.../12345678-...', >>>>>>> dataBytes=...}}.
>>>>>>> ``` >>>>>>> `FSDataInputStream in.delegate` in >>>>>>> `org.apache.flink.runtime.state.OperatorStateRestoreOperation#deserializeOperatorStateValues` >>>>>>> is an instance of `ByteStreamStateHandle(ByteStateHandleInputStream)`. >>>>>>> This means that for every `offset: offsets = metaInfo.getOffsets()` >>>>>>> we end up doing a read from a `byte[]`, which is faster. >>>>>>> >>>>>>> At this point I don't understand how not doing the `skip` operation >>>>>>> in the case of uncompressed state can work, since skip is required for the >>>>>>> partial reads. I apologise if I'm wrong; I don't have the same >>>>>>> level of understanding as you. >>>>>>> >>>>>>> What we considered doing was to find a way to actually cache the >>>>>>> file as a byte[] and do the reads from memory ... but it seems the state >>>>>>> compression is doing the same. We are in the process of testing state >>>>>>> compression under production volumes ... we can't say yet how that will >>>>>>> actually work for us. >>>>>>> >>>>>>> Thank you again for looking into this. I'm looking forward to your >>>>>>> thoughts. Please let me know if I missed or misunderstood something. >>>>>>> Please let us know your recommendation. >>>>>>> >>>>>>> On Tue, Oct 15, 2024 at 8:35 AM Gabor Somogyi < >>>>>>> gabor.g.somo...@gmail.com> wrote: >>>>>>> >>>>>>>> Hi William, >>>>>>>> >>>>>>>> It's a bit of an old question, but I think we now know why this is >>>>>>>> happening. Please see [1] for further details. >>>>>>>> It's an important requirement to use uncompressed state because, >>>>>>>> even with the fix, compressed state is still problematic. >>>>>>>> >>>>>>>> We've already tested the PR under load, but if you can report back, it >>>>>>>> would be helpful.
>>>>>>>> >>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-36530 >>>>>>>> >>>>>>>> BR, >>>>>>>> G >>>>>>>> >>>>>>>> >>>>>>>> On Fri, Aug 16, 2024 at 11:25 AM William Wallace < >>>>>>>> theanonymous31...@gmail.com> wrote: >>>>>>>> >>>>>>>>> Context >>>>>>>>> >>>>>>>>> We have recently upgraded from Flink 1.13.6 to Flink 1.19. We >>>>>>>>> consume data from ~40k Kafka topic partitions in some environments. >>>>>>>>> We are using aligned checkpoints. We set state.storage.fs.memory-threshold: >>>>>>>>> 500kb. >>>>>>>>> >>>>>>>>> Problem >>>>>>>>> >>>>>>>>> When the state for the operator using >>>>>>>>> topic-partition-offset-states doesn’t fit within >>>>>>>>> state.storage.fs.memory-threshold, we end up with a proportionally high >>>>>>>>> number of reads of the checkpoint and savepoint files, one for each of the >>>>>>>>> topic partition offsets. >>>>>>>>> >>>>>>>>> For example, when we have: >>>>>>>>> >>>>>>>>> {code} >>>>>>>>> >>>>>>>>> [14-Aug-2024 11:39:12.392 UTC] DEBUG >>>>>>>>> org.apache.flink.runtime.state.TaskStateManagerImpl [] - >>>>>>>>> Operator >>>>>>>>> 8992e27ae82755cac12dd37f518df782 has remote state >>>>>>>>> SubtaskState{operatorStateFromBackend=StateObjectCollection{[OperatorStateHandle{stateNameToPartitionOffsets={ >>>>>>>>> SourceReaderState=StateMetaInfo{offsets=[234, 279, 324, 369, 414, >>>>>>>>> 459, 504, 549 … (offsets is a list of 40k elements) >>>>>>>>> >>>>>>>>> {code} >>>>>>>>> >>>>>>>>> For each of the metadata offsets, there is an S3 read of the >>>>>>>>> checkpoint/savepoint. The Flink job fails to resume from the >>>>>>>>> checkpoint. With debug logs, we see hundreds of thousands of AWS GET >>>>>>>>> calls for the same checkpoint file, with different offsets. These AWS calls >>>>>>>>> take so long that our application fails to start; the job crashes, >>>>>>>>> starts the same reads again, and crashes again.
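The per-offset GET calls described above could, in principle, be replaced by one sequential read: fetch the file once and position each element in memory, which is essentially the byte[] caching idea William raises in a later message (quoted above). The sketch below is plain Java, not a Flink API, and the element layout (a writeUTF topic-partition name followed by a writeLong offset) is a hypothetical stand-in for the real serializer. The trade-off is that the whole file must fit on the heap, and int indexing limits it to under 2 GiB.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

/** Sketch: fetch a state file once, then "seek" to each element offset in memory. */
public class InMemoryOffsetRestore {

    /**
     * Reads the whole stream in one sequential pass, then decodes one element per offset.
     * Hypothetical element layout: writeUTF(topicPartition) followed by writeLong(kafkaOffset).
     */
    static List<String> restore(InputStream stateFile, long[] offsets) throws IOException {
        byte[] bytes = stateFile.readAllBytes(); // a single sequential read of the remote object
        List<String> elements = new ArrayList<>(offsets.length);
        for (long offset : offsets) {
            int start = Math.toIntExact(offset);
            // Positioning is just picking a start index in the array: no further remote I/O.
            DataInputStream div =
                    new DataInputStream(new ByteArrayInputStream(bytes, start, bytes.length - start));
            String topicPartition = div.readUTF();
            long kafkaOffset = div.readLong();
            elements.add(topicPartition + "@" + kafkaOffset);
        }
        return elements;
    }

    public static void main(String[] args) throws IOException {
        // Build a small fake state file and remember where each element starts.
        ByteArrayOutputStream file = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(file);
        long[] offsets = new long[3];
        for (int i = 0; i < offsets.length; i++) {
            offsets[i] = out.size();
            out.writeUTF("topic-" + i);
            out.writeLong(1000L + i);
        }
        out.flush();

        List<String> restored = restore(new ByteArrayInputStream(file.toByteArray()), offsets);
        System.out.println(restored); // [topic-0@1000, topic-1@1001, topic-2@1002]
    }
}
```

Because positioning is only an array index, offsets can be visited in any order without extra I/O; the cost moves from request count to heap usage.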
>>>>>>>>> >>>>>>>>> >>>>>>>>> We will have: >>>>>>>>> >>>>>>>>> {code} >>>>>>>>> >>>>>>>>> [14-Aug-2024 09:32:49.218 UTC] DEBUG com.amazonaws.request >>>>>>>>> [] - Sending Request: GET >>>>>>>>> https://s3-bucket-path/checkpoints/b2db0146e6afa2dabf138730580cc257/chk-370/45fd3560-8be5-4ca4-a7e9-8fe260140c18 >>>>>>>>> Headers: (amz-sdk-invocation-id:, Content-Type: >>>>>>>>> application/octet-stream, >>>>>>>>> Range: bytes=234-9223372036854775806, User-Agent: , >>>>>>>>> cfg/retry-mode/legacy, presto, ) >>>>>>>>> >>>>>>>>> [14-Aug-2024 11:39:12.476 UTC] DEBUG com.amazonaws.request >>>>>>>>> [] - Sending Request: GET >>>>>>>>> https://s3-bucket-path/savepoint-72ef07-0cbc29a9e0cd/a432a1ba-2275-4c14-abb3-c027cf1342e9 >>>>>>>>> Headers: (amz-sdk-invocation-id:, Content-Type: >>>>>>>>> application/octet-stream, >>>>>>>>> Range: bytes=234-9223372036854775806, User-Agent: , >>>>>>>>> cfg/retry-mode/legacy, presto, ) >>>>>>>>> >>>>>>>>> [14-Aug-2024 09:32:49.286 UTC] DEBUG com.amazonaws.request >>>>>>>>> [] - Sending Request: GET >>>>>>>>> https://s3-bucket-path/checkpoints/b2db0146e6afa2dabf138730580cc257/chk-370/45fd3560-8be5-4ca4-a7e9-8fe260140c18 >>>>>>>>> Headers: (amz-sdk-invocation-id:, Content-Type: >>>>>>>>> application/octet-stream, >>>>>>>>> Range: bytes=279-9223372036854775806, User-Agent: , >>>>>>>>> cfg/retry-mode/legacy, >>>>>>>>> presto, ) >>>>>>>>> >>>>>>>>> [14-Aug-2024 11:39:12.530 UTC] DEBUG com.amazonaws.request >>>>>>>>> [] - Sending Request: GET >>>>>>>>> https://s3-bucket-path/savepoint-72ef07-0cbc29a9e0cd/a432a1ba-2275-4c14-abb3-c027cf1342e9 >>>>>>>>> Headers: (amz-sdk-invocation-id:, Content-Type: >>>>>>>>> application/octet-stream, >>>>>>>>> Range: bytes=279-9223372036854775806, User-Agent: , >>>>>>>>> cfg/retry-mode/legacy, >>>>>>>>> presto, ) >>>>>>>>> >>>>>>>>> {code} >>>>>>>>> >>>>>>>>> Code which does the multiple reads was isolated to: >>>>>>>>> >>>>>>>>> {code} >>>>>>>>> >>>>>>>>> >>>>>>>>> 
org.apache.flink.runtime.state.OperatorStateRestoreOperation#deserializeOperatorStateValues >>>>>>>>> >>>>>>>>> private <S> void deserializeOperatorStateValues( >>>>>>>>> PartitionableListState<S> stateListForName, >>>>>>>>> FSDataInputStream in, >>>>>>>>> OperatorStateHandle.StateMetaInfo metaInfo) >>>>>>>>> throws IOException { >>>>>>>>> if (null != metaInfo) { >>>>>>>>> long[] offsets = metaInfo.getOffsets(); >>>>>>>>> if (null != offsets) { >>>>>>>>> DataInputView div = new DataInputViewStreamWrapper(in); >>>>>>>>> TypeSerializer<S> serializer = >>>>>>>>> stateListForName.getStateMetaInfo().getPartitionStateSerializer(); >>>>>>>>> for (long offset : offsets) { >>>>>>>>> in.seek(offset); >>>>>>>>> stateListForName.add(serializer.deserialize(div)); >>>>>>>>> } >>>>>>>>> } >>>>>>>>> } >>>>>>>>> } >>>>>>>>> >>>>>>>>> {code} >>>>>>>>> >>>>>>>>> Questions: >>>>>>>>> >>>>>>>>> 1. Please review the behaviour above and advise whether this is expected. >>>>>>>>> 2.
>>>>>>>>> >>>>>>>>> The reads for the topic-partition-offset-states are similar to: >>>>>>>>> >>>>>>>>> {code} >>>>>>>>> >>>>>>>>> Sending Request: GET >>>>>>>>> https://s3-bucket-path/savepoint-72ef07-0cbc29a9e0cd/a432a1ba-2275-4c14-abb3-c027cf1342e9 >>>>>>>>> Headers: (amz-sdk-invocation-id:, Content-Type: >>>>>>>>> application/octet-stream, >>>>>>>>> Range: bytes=234-9223372036854775806 >>>>>>>>> >>>>>>>>> Sending Request: GET >>>>>>>>> https://s3-bucket-path/savepoint-72ef07-0cbc29a9e0cd/a432a1ba-2275-4c14-abb3-c027cf1342e9 >>>>>>>>> Headers: (amz-sdk-invocation-id:, Content-Type: >>>>>>>>> application/octet-stream, >>>>>>>>> Range: bytes=279-9223372036854775806 >>>>>>>>> >>>>>>>>> And so on >>>>>>>>> >>>>>>>>> {code} >>>>>>>>> >>>>>>>>> Could this behaviour be optimised in Flink to reduce the number of >>>>>>>>> reads and avoid reading the same data multiple times? Although the >>>>>>>>> range start changes slightly across calls, the range end is always the same >>>>>>>>> (max long int), so effectively the entire file is retrieved on every call. Do we need to retrieve each >>>>>>>>> offset individually, or could this be optimised in Flink to make only one >>>>>>>>> call and then use the data accordingly? >>>>>>>>> >>>>>>>>> 3. Currently we set state.storage.fs.memory-threshold: 700kb to >>>>>>>>> avoid the problem, but we expect the number of topic partitions to >>>>>>>>> increase, requiring a further increase of this value. What are our options >>>>>>>>> once we reach the 1MB limit? >>>>>>>>> >>>>>>>>> >>>>>>>>> Thank you. >>>>>>>>> >>>>>>>>
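On question 2: the range end in these headers, 9223372036854775806, is Long.MAX_VALUE - 1, so every request is open-ended and asks for everything from the element's offset to the end of the object (HTTP range semantics treat a last-byte position beyond the object length as the end of the object). The small Java check below, with a hypothetical parseRange helper, confirms that arithmetic for the two logged headers. How many bytes are actually transferred per request depends on when the client closes the stream, which the logs do not show.

```java
/** Parses the Range headers seen in the debug logs and checks the open-ended end value. */
public class RangeHeaderCheck {

    /** Parses "bytes=<first>-<last>" into {first, last}; hypothetical helper for illustration. */
    static long[] parseRange(String header) {
        String spec = header.substring(header.indexOf('=') + 1);
        int dash = spec.indexOf('-');
        long first = Long.parseLong(spec.substring(0, dash));
        long last = Long.parseLong(spec.substring(dash + 1));
        return new long[] {first, last};
    }

    public static void main(String[] args) {
        String[] headers = {"bytes=234-9223372036854775806", "bytes=279-9223372036854775806"};
        for (String header : headers) {
            long[] range = parseRange(header);
            // Every logged request starts at a different element offset but ends at Long.MAX_VALUE - 1.
            System.out.println("first=" + range[0] + ", last=" + range[1]
                    + ", last == Long.MAX_VALUE - 1: " + (range[1] == Long.MAX_VALUE - 1));
        }
    }
}
```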