Hi William and Mate,

I've just tested compressed and uncompressed restore with the same amount
of data and the results match yours:
- The compressed restore operation performs well because Snappy skips data
only within a single block, which is relatively small
- The uncompressed restore operation performs well after applying my PR [1]

Once it's merged, we can say both compressed and uncompressed
state are safe to use.
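
To double check on your own setup: the per-offset GET requests are easy to spot
when AWS request logging is set to DEBUG. A minimal sketch, assuming the log4j2
properties format that Flink ships with and the com.amazonaws.request logger
name that shows up in the logs quoted below (the exact config file depends on
your deployment):
```
logger.awsrequest.name = com.amazonaws.request
logger.awsrequest.level = DEBUG
```
A healthy restore shows only a handful of GETs per state file, while the broken
case shows one ranged GET per list-state offset.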

Thanks to everybody for the effort to sort this out!

[1] https://github.com/apache/flink/pull/25509

G


On Thu, Oct 17, 2024 at 11:06 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
wrote:

> Hi Mate,
>
> Thanks for the deep dive! I've had a quick look at the code and it makes
> sense why you and William are not seeing slowness with compressed state.
> Tomorrow I'll do some tests and come back with the results...
>
> @William Wallace <theanonymous31...@gmail.com> I think the restore should
> work with compression even without the memory-threshold setting.
> When compression is off, my PR is going to be the cure 🙂
>
> G
>
>
> On Thu, Oct 17, 2024 at 8:06 PM Mate Czagany <czmat...@gmail.com> wrote:
>
>> Hi William,
>>
>> I think your findings are correct: I could easily reproduce the issue
>> with snapshot-compression set to false, but I was unable to with
>> snapshot-compression set to true.
>>
>> When using compressed state, the available() call will return the number
>> of bytes in the Snappy internal buffer that have not been decompressed yet
>> [1] [2]. It is safe to skip bytes here, as this will not cause the next
>> call to seek() to seek backwards.
>>
>> When not using compressed state, the available() call will return the
>> number of bytes buffered by the underlying BufferedInputStream, which
>> buffers data from the filesystem, e.g. S3. In my tests the buffer size was
>> 4096, and if Flink read e.g. 50 bytes of data, the result of available()
>> was then 4046. Skipping those 4046 bytes (or any number of bytes) meant
>> that the next seek() call had to seek backwards, and for S3 that
>> meant closing and re-opening the stream, resulting in a new GET request for
>> each element in the list state.
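>>
>> To make that concrete, my mental model of the problematic restore path looks
>> roughly like this (a hedged sketch, not the actual Flink code; `in` and
>> `offsets` as in the restore method quoted further down the thread):
>> ```
>> // pre-fix behaviour, illustrative only: the remaining buffered bytes are
>> // skipped before the next element is seeked
>> for (long offset : offsets) {
>>     int available = in.available();  // uncompressed: remainder of the 4096-byte buffer, e.g. 4046
>>     if (available > 0) {
>>         in.skip(available);          // moves the position well past the next offset ...
>>     }
>>     in.seek(offset);                 // ... so this seek goes backwards; on S3 the stream is
>>                                      // closed and re-opened, i.e. one GET per list element
>> }
>> ```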
>>
>> I think Gabor's PR [3] is the right solution; I can't think of any
>> situation where we would have to skip any bytes in the stream when not
>> using compressed state. I also think that compressed state is not affected
>> by this.
>>
>> [1]
>> https://github.com/xerial/snappy-java/blob/9f8c3cf74223ed0a8a834134be9c917b9f10ceb5/src/main/java/org/xerial/snappy/SnappyFramedInputStream.java#L313
>> [2]
>> https://github.com/xerial/snappy-java/blob/9f8c3cf74223ed0a8a834134be9c917b9f10ceb5/src/main/java/org/xerial/snappy/SnappyFramedInputStream.java#L566
>> [3] https://github.com/apache/flink/pull/25509
>>
>> Best,
>> Mate
>>
>> William Wallace <theanonymous31...@gmail.com> wrote (on Thu, Oct 17, 2024,
>> at 19:23):
>>
>>> Hi G,
>>>
>>> We did a test today using
>>> ```
>>> execution.checkpointing.snapshot-compression: true
>>> state.storage.fs.memory-threshold: 500kb
>>> ```
>>> across 6 jobs with different parallelism and volume load.
>>> I will use one as an example: 70 slots, with 70 files of 670kb
>>> corresponding to the subtask state containing the KafkaSource operator. In
>>> total the compressed savepoint size was around 360MB across 201 files, the
>>> biggest being ~11MB. The job restored OK from both checkpoint and savepoint,
>>> without the millions of reads we were observing before (forced stop/run a
>>> few times).
>>>
>>> We decided to let this soak for some time since our checkpoints can
>>> reach more than 10GB (uncompressed).
>>>
>>> Let me know if you have any updates or new findings; I will let you know if
>>> I observe anything else.
>>>
>>> Thank you.
>>>
>>> On Tue, Oct 15, 2024 at 4:38 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
>>> wrote:
>>>
>>>> Could you please let us know if you see anything wrong when using
>>>>> `execution.checkpointing.snapshot-compression: true` since for us this
>>>>> seems to have solved the multiple S3 reads issue.
>>>>>
>>>> When something is working it's never wrong. The question is why it has
>>>> been resolved.
>>>> Do you still have state.storage.fs.memory-threshold set to 500kb?
>>>> State compression may reduce the state below this threshold, which would
>>>> explain why it works.
>>>>
>>>> For uncompressed state could you please let us know how the change from
>>>>> your PR eliminates the multiple calls to S3. It is not very clear to us.
>>>>>
>>>> Copy from the PR:
>>>> Flink state restore from S3 is super slow because the skip function is
>>>> consuming ~15 seconds for ~6Mb of data.
>>>> ...
>>>> In this PR the skip is going to be called only in case of compression,
>>>> because otherwise the stream is seekable.
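>>>>
>>>> To illustrate the idea (just a sketch of the shape of the change, not the
>>>> exact diff, reusing the delegate/compressionDelegate names from your debug
>>>> output; `compressed` and `desired` are placeholder names):
>>>> ```
>>>> if (compressed) {
>>>>     // a compressed stream is not seekable, so the bytes Snappy has already
>>>>     // buffered still have to be skipped before seeking the underlying stream
>>>>     int available = compressionDelegate.available();
>>>>     if (available > 0) {
>>>>         compressionDelegate.skip(available);
>>>>     }
>>>> }
>>>> // an uncompressed stream is seekable, so seek directly; the position never
>>>> // jumps past the target offset and the S3 stream is not re-opened
>>>> delegate.seek(desired);
>>>> ```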
>>>>
>>>> G
>>>>
>>>> On Tue, Oct 15, 2024 at 4:30 PM William Wallace <
>>>> theanonymous31...@gmail.com> wrote:
>>>>
>>>>> Thank you for the recommendation and the help.
>>>>>
>>>>> Could you please let us know if you see anything wrong when using
>>>>> `execution.checkpointing.snapshot-compression: true` since for us this
>>>>> seems to have solved the multiple S3 reads issue.
>>>>>
>>>> In debug we see:
>>>>> `in.delegate =
>>>>> ClosingFSDataInputStream(org.apache.flink.fs.s3presto.common.HadoopDataInputStream)`
>>>>>
>>>>> and
>>>>> `in.compressionDelegate = SnappyFramedInputStream`
>>>>> and in the logs a file is retrieved only once per subtask
>>>>> ```
>>>>> DEBUG com.amazonaws.request                                        []
>>>>> - Sending Request: GET 
>>>>> https://.../savepoints/flink-compression/.../savepoint-...
>>>>> Range: bytes=0-9223372036854775806.
>>>>> ```
>>>>>
>>>>> For uncompressed state could you please let us know how the change
>>>>> from your PR eliminates the multiple calls to S3. It is not very clear to us.
>>>>>
>>>> Thank you.
>>>>>
>>>>> On Tue, Oct 15, 2024 at 1:42 PM Gabor Somogyi <
>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>
>>>>>> My recommendation is to cherry-pick this PR [1] on top of your Flink
>>>>>> distro when possible.
>>>>>> Additionally, turn off state compression. These should do the trick...
>>>>>>
>>>>>> [1] https://github.com/apache/flink/pull/25509
>>>>>>
>>>>>> G
>>>>>>
>>>>>>
>>>>>> On Tue, Oct 15, 2024 at 1:03 PM William Wallace <
>>>>>> theanonymous31...@gmail.com> wrote:
>>>>>>
>>>>>>> Thank you Gabor for your reply.
>>>>>>>
>>>>>>> I'm sharing more findings below for both uncompressed and compressed
>>>>>>> state in the hope it helps. I'm looking forward to your thoughts.
>>>>>>>
>>>>>>> 1. uncompressed state - observe the
>>>>>>> `stateHandle=RelativeFileStateHandle`
>>>>>>> ```
>>>>>>> org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation
>>>>>>> [] - Finished restoring from state handle:
>>>>>>> KeyGroupsSavepointStateHandle{groupRangeOffsets=KeyGroupRangeOffsets{keyGroupRange=KeyGroupRange{startKeyGroup=0,
>>>>>>> endKeyGroup=31}}, stateHandle=RelativeFileStateHandle State:
>>>>>>> s3p://.../savepoints/flink-no-compression/.../savepoint-.../12345678-...,
>>>>>>> 12345678-... [... bytes]}.
>>>>>>>  ```
>>>>>>>
>>>>>>> `FSDataInputStream in.delegate` in
>>>>>>> `org.apache.flink.runtime.state.OperatorStateRestoreOperation#deserializeOperatorStateValues`
>>>>>>> is an instance of
>>>>>>> `ClosingFSDataInputStream(org.apache.flink.fs.s3presto.common.HadoopDataInputStream)`.
>>>>>>> For every `offset: offsets = metaInfo.getOffsets()` we end up doing an
>>>>>>> actual partial file read, which in our case ends up in the order of millions
>>>>>>> of reads because of the high job parallelism (subtasks), and the job can't
>>>>>>> recover.
>>>>>>>
>>>>>>> 2. compressed state - observe the stateHandle=ByteStreamStateHandle
>>>>>>> ```
>>>>>>> org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation
>>>>>>> [] - Finished restoring from state handle:
>>>>>>> KeyGroupsSavepointStateHandle{groupRangeOffsets=KeyGroupRangeOffsets{keyGroupRange=KeyGroupRange{startKeyGroup=0,
>>>>>>> endKeyGroup=31}},
>>>>>>> stateHandle=ByteStreamStateHandle{handleName='(s3p:.../savepoints/flink-compression/.../savepoint-.../12345678-...',
>>>>>>> dataBytes=...}}.
>>>>>>> ```
>>>>>>> `FSDataInputStream in.delegate` in
>>>>>>> `org.apache.flink.runtime.state.OperatorStateRestoreOperation#deserializeOperatorStateValues`
>>>>>>> is an instance of `ByteStreamStateHandle(ByteStateHandleInputStream)`.
>>>>>>> This means that for every `offset: offsets = metaInfo.getOffsets()`
>>>>>>> we end up doing a read from a `byte[]`, which is faster.
>>>>>>>
>>>>>>> At this point I don't understand how not doing the `skip` operation
>>>>>>> in the case of uncompressed state can work, since skip is required for the
>>>>>>> partial reads. I apologise if I'm wrong; I don't have the same level of
>>>>>>> understanding as you have.
>>>>>>>
>>>>>>> What we considered doing was to find a way to actually cache the
>>>>>>> file as a byte[] and do the reads from memory ... but it seems state
>>>>>>> compression is effectively doing the same. We are in the process of testing
>>>>>>> state compression under production volumes ... we can't say yet how that
>>>>>>> will actually work for us.
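>>>>>>>
>>>>>>> For reference, the byte[] idea we had in mind was roughly the following
>>>>>>> (only a sketch, assuming the offsets are absolute positions from the start
>>>>>>> of the stream; names as in the restore code quoted further down, plus
>>>>>>> java.io.ByteArrayInputStream and Flink's DataInputViewStreamWrapper):
>>>>>>> ```
>>>>>>> // read the remote state file once, then serve every offset from memory
>>>>>>> byte[] cached = in.readAllBytes();
>>>>>>> ByteArrayInputStream memIn = new ByteArrayInputStream(cached);
>>>>>>> DataInputView div = new DataInputViewStreamWrapper(memIn);
>>>>>>> for (long offset : metaInfo.getOffsets()) {
>>>>>>>     memIn.reset();       // back to position 0 (mark defaults to 0)
>>>>>>>     memIn.skip(offset);  // "seek" within the in-memory copy, no S3 round trip
>>>>>>>     stateListForName.add(serializer.deserialize(div));
>>>>>>> }
>>>>>>> ```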
>>>>>>>
>>>>>>> Thank you again for looking into this. I'm looking forward to your
>>>>>>> thoughts. Please let me know if I missed or misunderstood something, and
>>>>>>> let us know your recommendation.
>>>>>>>
>>>>>>> On Tue, Oct 15, 2024 at 8:35 AM Gabor Somogyi <
>>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi William,
>>>>>>>>
>>>>>>>> It's a bit of an old question, but I think now we know why this is
>>>>>>>> happening. Please see [1] for further details.
>>>>>>>> It's an important requirement to use uncompressed state because,
>>>>>>>> even with the fix, compressed state is still problematic.
>>>>>>>>
>>>>>>>> We've already tested the PR under load, but if you can report back it
>>>>>>>> would be helpful.
>>>>>>>>
>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-36530
>>>>>>>>
>>>>>>>> BR,
>>>>>>>> G
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Aug 16, 2024 at 11:25 AM William Wallace <
>>>>>>>> theanonymous31...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Context
>>>>>>>>>
>>>>>>>>> We have recently upgraded from Flink 1.13.6 to Flink 1.19. We
>>>>>>>>> consume data from ~ 40k Kafka topic partitions in some environments. 
>>>>>>>>> We are
>>>>>>>>> using aligned checkpoints. We set state.storage.fs.memory-threshold: 
>>>>>>>>> 500kb.
>>>>>>>>>
>>>>>>>>> Problem
>>>>>>>>>
>>>>>>>>> At the point when the state for the operator using
>>>>>>>>> topic-partition-offset-states doesn't fit within
>>>>>>>>> state.storage.fs.memory-threshold, we end up with a proportionally high
>>>>>>>>> number of reads of the checkpoint and savepoint files, one for each of the
>>>>>>>>> topic partition offsets.
>>>>>>>>>
>>>>>>>>> For example when we have:
>>>>>>>>>
>>>>>>>>> {code}
>>>>>>>>>
>>>>>>>>> [14-Aug-2024 11:39:12.392 UTC] DEBUG
>>>>>>>>> org.apache.flink.runtime.state.TaskStateManagerImpl          [] - 
>>>>>>>>> Operator
>>>>>>>>> 8992e27ae82755cac12dd37f518df782 has remote state
>>>>>>>>> SubtaskState{operatorStateFromBackend=StateObjectCollection{[OperatorStateHandle{stateNameToPartitionOffsets={
>>>>>>>>> SourceReaderState=StateMetaInfo{offsets=[234, 279, 324, 369, 414,
>>>>>>>>> 459, 504, 549 …(offsets is a list of 40k elements)
>>>>>>>>>
>>>>>>>>> {code}
>>>>>>>>>
>>>>>>>>> For each of the metadata offsets we will have S3 reads for the
>>>>>>>>> checkpoint/savepoint. The Flink job fails to resume from the
>>>>>>>>> checkpoint. With debug logs, we see hundreds of thousands of AWS GET calls
>>>>>>>>> for the same checkpoint file, with different offsets. These AWS calls take
>>>>>>>>> so long that our application fails to start; the job crashes, starts the
>>>>>>>>> same reads again, and crashes again.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> We will have:
>>>>>>>>>
>>>>>>>>> {code}
>>>>>>>>>
>>>>>>>>> [14-Aug-2024 09:32:49.218 UTC] DEBUG com.amazonaws.request
>>>>>>>>>                                 [] - Sending Request: GET
>>>>>>>>> https://s3-bucket-path/checkpoints/b2db0146e6afa2dabf138730580cc257/chk-370/45fd3560-8be5-4ca4-a7e9-8fe260140c18
>>>>>>>>> Headers: (amz-sdk-invocation-id:, Content-Type: 
>>>>>>>>> application/octet-stream,
>>>>>>>>> Range: bytes=234-9223372036854775806, User-Agent:  ,
>>>>>>>>> cfg/retry-mode/legacy, presto, )
>>>>>>>>>
>>>>>>>>> [14-Aug-2024 11:39:12.476 UTC] DEBUG com.amazonaws.request
>>>>>>>>>                                 [] - Sending Request: GET
>>>>>>>>> https://s3-bucket-path/savepoint-72ef07-0cbc29a9e0cd/a432a1ba-2275-4c14-abb3-c027cf1342e9
>>>>>>>>> Headers: (amz-sdk-invocation-id:, Content-Type: 
>>>>>>>>> application/octet-stream,
>>>>>>>>> Range: bytes=234-9223372036854775806, User-Agent: ,
>>>>>>>>> cfg/retry-mode/legacy, presto, )
>>>>>>>>>
>>>>>>>>> [14-Aug-2024 09:32:49.286 UTC] DEBUG com.amazonaws.request
>>>>>>>>>                                 [] - Sending Request: GET
>>>>>>>>> https://s3-bucket-path/checkpoints/b2db0146e6afa2dabf138730580cc257/chk-370/45fd3560-8be5-4ca4-a7e9-8fe260140c18
>>>>>>>>> Headers: (amz-sdk-invocation-id:, Content-Type: 
>>>>>>>>> application/octet-stream,
>>>>>>>>> Range: bytes=279-9223372036854775806, User-Agent: , 
>>>>>>>>> cfg/retry-mode/legacy,
>>>>>>>>> presto, )
>>>>>>>>>
>>>>>>>>> [14-Aug-2024 11:39:12.530 UTC] DEBUG com.amazonaws.request
>>>>>>>>>                                 [] - Sending Request: GET
>>>>>>>>> https://s3-bucket-path/savepoint-72ef07-0cbc29a9e0cd/a432a1ba-2275-4c14-abb3-c027cf1342e9
>>>>>>>>> Headers: (amz-sdk-invocation-id:, Content-Type: 
>>>>>>>>> application/octet-stream,
>>>>>>>>> Range: bytes=279-9223372036854775806, User-Agent: , 
>>>>>>>>> cfg/retry-mode/legacy,
>>>>>>>>> presto, )
>>>>>>>>>
>>>>>>>>> {code}
>>>>>>>>>
>>>>>>>>> Code which does the multiple reads was isolated to:
>>>>>>>>>
>>>>>>>>> {code}
>>>>>>>>> org.apache.flink.runtime.state.OperatorStateRestoreOperation#deserializeOperatorStateValues
>>>>>>>>>
>>>>>>>>> private <S> void deserializeOperatorStateValues(
>>>>>>>>>         PartitionableListState<S> stateListForName,
>>>>>>>>>         FSDataInputStream in,
>>>>>>>>>         OperatorStateHandle.StateMetaInfo metaInfo)
>>>>>>>>>         throws IOException {
>>>>>>>>>     if (null != metaInfo) {
>>>>>>>>>         long[] offsets = metaInfo.getOffsets();
>>>>>>>>>         if (null != offsets) {
>>>>>>>>>             DataInputView div = new DataInputViewStreamWrapper(in);
>>>>>>>>>             TypeSerializer<S> serializer =
>>>>>>>>>                     stateListForName.getStateMetaInfo().getPartitionStateSerializer();
>>>>>>>>>             for (long offset : offsets) {
>>>>>>>>>                 // one seek per element of the list state; in our case each
>>>>>>>>>                 // seek translated into a ranged S3 GET for the same file
>>>>>>>>>                 in.seek(offset);
>>>>>>>>>                 stateListForName.add(serializer.deserialize(div));
>>>>>>>>>             }
>>>>>>>>>         }
>>>>>>>>>     }
>>>>>>>>> }
>>>>>>>>> {code}
>>>>>>>>>
>>>>>>>>> Questions:
>>>>>>>>>
>>>>>>>>>    1. Please review the behaviour above and advise whether this is
>>>>>>>>>    expected.
>>>>>>>>>
>>>>>>>>>    2. The reads for the topic-partition-offset-states are similar to:
>>>>>>>>>
>>>>>>>>> {code}
>>>>>>>>>
>>>>>>>>> Sending Request: GET
>>>>>>>>> https://s3-bucket-path/savepoint-72ef07-0cbc29a9e0cd/a432a1ba-2275-4c14-abb3-c027cf1342e9
>>>>>>>>> Headers: (amz-sdk-invocation-id:, Content-Type: 
>>>>>>>>> application/octet-stream,
>>>>>>>>> Range: bytes=234-9223372036854775806
>>>>>>>>>
>>>>>>>>> Sending Request: GET
>>>>>>>>> https://s3-bucket-path/savepoint-72ef07-0cbc29a9e0cd/a432a1ba-2275-4c14-abb3-c027cf1342e9
>>>>>>>>> Headers: (amz-sdk-invocation-id:, Content-Type: 
>>>>>>>>> application/octet-stream,
>>>>>>>>> Range: bytes=279-9223372036854775806
>>>>>>>>>
>>>>>>>>> And so on
>>>>>>>>>
>>>>>>>>> {code}
>>>>>>>>>
>>>>>>>>> Could this behaviour be optimised in Flink to reduce the number of
>>>>>>>>> reads and avoid reading the same data multiple times? Although the range
>>>>>>>>> start changes slightly across calls, the range end is the same (max long
>>>>>>>>> int), so effectively the entire file is being retrieved each time. Do we
>>>>>>>>> need to retrieve each offset individually, or could this be optimised in
>>>>>>>>> Flink to make only one call and then use the data accordingly?
>>>>>>>>>
>>>>>>>>>    3. Currently we set state.storage.fs.memory-threshold: 700kb to
>>>>>>>>>    avoid the problem, but we expect the number of topic partitions to
>>>>>>>>>    increase, requiring a further increase of this value. What are our
>>>>>>>>>    options once we reach the 1MB limit?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thank you.
>>>>>>>>>
>>>>>>>>
