Hi Mate,

Thanks for the deep dive! I've had a slight look at the code and it makes
sense why you and William is not seeing slowness with compressed state.
Tomorrow I'll do some tests and come back with the results...

@William Wallace <theanonymous31...@gmail.com> I think the restore should
work without the memory-threshold setting with compression.
When compression is off then my PR is going to be the cure🙂

G


On Thu, Oct 17, 2024 at 8:06 PM Mate Czagany <czmat...@gmail.com> wrote:

> Hi William,
>
> I think your findings are correct, I could easily reproduce the issue with
> snapshot-compression set to false, but I was unable to with
> snapshot-compression set to true.
>
> When using compressed state, the available() call will return the number
> of bytes in the Snappy internal buffer that has not been decompressed yet
> [1] [2]. It is safe to skip bytes here, as this will not cause the next
> call to seek() to seek backwards.
>
> When not using compressed state, the available() call will return the
> number of bytes buffered by the underlying BufferedInputStream which
> buffers data from the filesystem, e.g. S3. In my tests the buffer size was
> 4096, and if Flink read e.g. 50 bytes of data, the result of available()
> was then 4046. Skipping 4046 bytes (or any number of bytes) meant that for
> the next seek() call the buffer had to seek backwards, and for S3 that
> meant closing and re-opening the stream, resulting in a new GET request for
> each element in the list state.
>
> I think Gabor's PR [3] is the right solution, I can't think of any
> situation where we would have to skip any bytes in the stream when not
> using compressed state. I also think that compressed state is not affected
> by this.
>
> [1]
> https://github.com/xerial/snappy-java/blob/9f8c3cf74223ed0a8a834134be9c917b9f10ceb5/src/main/java/org/xerial/snappy/SnappyFramedInputStream.java#L313
> [2]
> https://github.com/xerial/snappy-java/blob/9f8c3cf74223ed0a8a834134be9c917b9f10ceb5/src/main/java/org/xerial/snappy/SnappyFramedInputStream.java#L566
> [3] https://github.com/apache/flink/pull/25509
>
> Best,
> Mate
>
> William Wallace <theanonymous31...@gmail.com> ezt írta (időpont: 2024.
> okt. 17., Cs, 19:23):
>
>> Hi G,
>>
>> We did a test today using
>> ```
>> execution.checkpointing.snapshot-compression: true
>> state.storage.fs.memory-threshold: 500kb
>> ```
>> across 6 jobs with different parallelism and volume load.
>> I will use one as an example - 70 slots - I had 70 files of 670kb
>> corresponding to the subtask state containing the KafkaSource operator. In
>> total compressed savepoint size was around 360MB, 201 files, biggest was
>> ~11MB. Job restored ok from checkpoint and savepoint not seeing the
>> millions of reads we were observing before. (forced stop/run few times)
>>
>> We decided to let this soak for some time since our checkpoints can reach
>> more than 10GB (uncompressed).
>>
>> Let me know if you have any updates. I will let you know if I observe
>> anything else.
>> Please let us know if you have any new findings.
>>
>> Thank you.
>>
>> On Tue, Oct 15, 2024 at 4:38 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
>> wrote:
>>
>>> Could you please let us know if you see anything wrong when using
>>>> `execution.checkpointing.snapshot-compression: true` since for us this
>>>> seems to have solved the multiple S3 reads issue.
>>>>
>>> When something is working it's never wrong. The question is why is has
>>> been resolved.
>>> Are you still having state.storage.fs.memory-threshold set to 500Kb?
>>> State compression may reduce the state under this threshold which would
>>> make that work.
>>>
>>> For uncompressed state could you please let us know how the change from
>>>> your PR eliminates the multiple calls to S3. Is not very clear to us.
>>>>
>>> Copy from the PR:
>>> Flink state restore from S3 is super slow because skip function is
>>> consuming ~15 seconds for ~6Mb of data.
>>> ...
>>> In this PR the skip going to be called only in case of compression
>>> because otherwise a stream is seekable.
>>>
>>> G
>>>
>>> On Tue, Oct 15, 2024 at 4:30 PM William Wallace <
>>> theanonymous31...@gmail.com> wrote:
>>>
>>>> Thank you for the recommendation and the help.
>>>>
>>>> Could you please let us know if you see anything wrong when using
>>>> `execution.checkpointing.snapshot-compression: true` since for us this
>>>> seems to have solved the multiple S3 reads issue.
>>>>
>>> In debug we see:
>>>> `in.delegate =
>>>> ClosingFSDataInputStream(org.apache.flink.fs.s3presto.common.HadoopDataInputStream)`
>>>>
>>>> and
>>>> `in.compressionDelegate = SnappyFramedInputStream`
>>>> and  in the logs a file is retrieved only once per subtask
>>>> ```
>>>> DEBUG com.amazonaws.request                                        [] -
>>>> Sending Request: GET 
>>>> https://.../savepoints/flink-compression/.../savepoint-...
>>>> Range: bytes=0-9223372036854775806.
>>>> ```
>>>>
>>>> For uncompressed state could you please let us know how the change from
>>>> your PR eliminates the multiple calls to S3. Is not very clear to us.
>>>>
>>> Thank you.
>>>>
>>>> On Tue, Oct 15, 2024 at 1:42 PM Gabor Somogyi <
>>>> gabor.g.somo...@gmail.com> wrote:
>>>>
>>>>> My recommendation is to cherry-pick this PR [1] at top of your Flink
>>>>> distro when possible.
>>>>> Additionally turn off state compression. These should do the trick...
>>>>>
>>>>> [1] https://github.com/apache/flink/pull/25509
>>>>>
>>>>> G
>>>>>
>>>>>
>>>>> On Tue, Oct 15, 2024 at 1:03 PM William Wallace <
>>>>> theanonymous31...@gmail.com> wrote:
>>>>>
>>>>>> Thank you Gabor for your reply.
>>>>>>
>>>>>> I'm sharing below more findings for both uncompressed and compressed
>>>>>> state with the hope it helps. I'm looking further to your thoughts.
>>>>>>
>>>>>> 1. uncompressed state - observe the
>>>>>> `stateHandle=RelativeFileStateHandle`
>>>>>> ```
>>>>>> org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation
>>>>>> [] - Finished restoring from state handle:
>>>>>> KeyGroupsSavepointStateHandle{groupRangeOffsets=KeyGroupRangeOffsets{keyGroupRange=KeyGroupRange{startKeyGroup=0,
>>>>>> endKeyGroup=31}}, stateHandle=RelativeFileStateHandle State:
>>>>>> s3p://.../savepoints/flink-no-compression/.../savepoint-.../12345678-...,
>>>>>> 12345678-... [... bytes]}.
>>>>>>  ```
>>>>>>
>>>>>> `FSDataInputStream in.delegate` in
>>>>>> `org.apache.flink.runtime.state.OperatorStateRestoreOperation#deserializeOperatorStateValues`
>>>>>> is an instance of
>>>>>> `ClosingFSDataInputStream(org.apache.flink.fs.s3presto.common.HadoopDataInputStream`.
>>>>>> For every `offset: offsets = metaInfo.getOffsets()` we end up doing an
>>>>>> actual partial file read which in our case ends in order of millions
>>>>>> because of high job parallelism (subtasks) and job can't recover.
>>>>>>
>>>>>> 2. compressed state - observe the stateHandle=ByteStreamStateHandle
>>>>>> ```
>>>>>> org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation
>>>>>> [] - Finished restoring from state handle:
>>>>>> KeyGroupsSavepointStateHandle{groupRangeOffsets=KeyGroupRangeOffsets{keyGroupRange=KeyGroupRange{startKeyGroup=0,
>>>>>> endKeyGroup=31}},
>>>>>> stateHandle=ByteStreamStateHandle{handleName='(s3p:.../savepoints/flink-compression/.../savepoint-.../12345678-...',
>>>>>> dataBytes=...}}.
>>>>>> ```
>>>>>> `FSDataInputStream in.delegate` in
>>>>>> `org.apache.flink.runtime.state.OperatorStateRestoreOperation#deserializeOperatorStateValues`
>>>>>> is an instance if `ByteStreamStateHandle(ByteStateHandleInputStream)
>>>>>> This means that for every `offset: offsets = metaInfo.getOffsets()`
>>>>>> we end up doing a read from a `byte[]` which are faster.
>>>>>>
>>>>>> At this point I don't understand how not doing the `skip` operation
>>>>>> in case of uncompressed state can work, since skip is required for the
>>>>>> partial reads, and I apologise if I'm wrong, I don't have the same level 
>>>>>> of
>>>>>> understanding as you have.
>>>>>>
>>>>>> What we considered doing was to find a way to actually cache the file
>>>>>> as a byte[] and do the reads from memory ... but it seems the state
>>>>>> compression is doing the same. We are in the process of testing state
>>>>>> compression under production volumes ... can't say how that will actually
>>>>>> work for us.
>>>>>>
>>>>>> Thank you again for looking into this. I'm looking forward for your
>>>>>> thoughts. Please let me know if I missed or misunderstood something. 
>>>>>> Please
>>>>>> let us know your recommendation.
>>>>>>
>>>>>> On Tue, Oct 15, 2024 at 8:35 AM Gabor Somogyi <
>>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi William,
>>>>>>>
>>>>>>> It's a bit old question but I think now we know why this is
>>>>>>> happening. Please see [1] for further details.
>>>>>>> It's an important requirement to use uncompressed state because even
>>>>>>> with the fix compressed state is still problematic.
>>>>>>>
>>>>>>> We've already tested the PR with load but if you can report back it
>>>>>>> would be helpful.
>>>>>>>
>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-36530
>>>>>>>
>>>>>>> BR,
>>>>>>> G
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Aug 16, 2024 at 11:25 AM William Wallace <
>>>>>>> theanonymous31...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Context
>>>>>>>>
>>>>>>>> We have recently upgraded from Flink 1.13.6 to Flink 1.19. We
>>>>>>>> consume data from ~ 40k Kafka topic partitions in some environments. 
>>>>>>>> We are
>>>>>>>> using aligned checkpoints. We set state.storage.fs.memory-threshold: 
>>>>>>>> 500kb.
>>>>>>>>
>>>>>>>> Problem
>>>>>>>>
>>>>>>>> At the point when the state for operator using
>>>>>>>> topic-partition-offset-states doesn’t fit in the
>>>>>>>> state.storage.fs.memory-threshold, we end up with a proportionally high
>>>>>>>> number of reads for the checkpoint and savepoint files for each of the
>>>>>>>> topic partition offsets.
>>>>>>>>
>>>>>>>> For example when we have:
>>>>>>>>
>>>>>>>> {code}
>>>>>>>>
>>>>>>>> [14-Aug-2024 11:39:12.392 UTC] DEBUG
>>>>>>>> org.apache.flink.runtime.state.TaskStateManagerImpl          [] - 
>>>>>>>> Operator
>>>>>>>> 8992e27ae82755cac12dd37f518df782 has remote state
>>>>>>>> SubtaskState{operatorStateFromBackend=StateObjectCollection{[OperatorStateHandle{stateNameToPartitionOffsets={
>>>>>>>> SourceReaderState=StateMetaInfo{offsets=[234, 279, 324, 369, 414,
>>>>>>>> 459, 504, 549 …(offsets is a list 40k elements)
>>>>>>>>
>>>>>>>> {code}
>>>>>>>>
>>>>>>>> For each of the metadata offsets we will have S3 reads for
>>>>>>>> checkpoint/savepoint. The Flink job fails to resume from
>>>>>>>> checkpoint. With debug logs, we see hundred of thousands of AWS GET 
>>>>>>>> calls
>>>>>>>> for the same checkpoint file, with different offsets. These AWS calls 
>>>>>>>> take
>>>>>>>> such a long time, that our application fails to start and job crashes 
>>>>>>>> and
>>>>>>>> starts same reads again and crashes again.
>>>>>>>>
>>>>>>>>
>>>>>>>> We will have:
>>>>>>>>
>>>>>>>> {code}
>>>>>>>>
>>>>>>>> [14-Aug-2024 09:32:49.218 UTC] DEBUG com.amazonaws.request
>>>>>>>>                               [] - Sending Request: GET
>>>>>>>> https://s3-bucket-path/checkpoints/b2db0146e6afa2dabf138730580cc257/chk-370/45fd3560-8be5-4ca4-a7e9-8fe260140c18
>>>>>>>> Headers: (amz-sdk-invocation-id:, Content-Type: 
>>>>>>>> application/octet-stream,
>>>>>>>> Range: bytes=234-9223372036854775806, User-Agent:  ,
>>>>>>>> cfg/retry-mode/legacy, presto, )
>>>>>>>>
>>>>>>>> [14-Aug-2024 11:39:12.476 UTC] DEBUG com.amazonaws.request
>>>>>>>>                               [] - Sending Request: GET
>>>>>>>> https://s3-bucket-path/savepoint-72ef07-0cbc29a9e0cd/a432a1ba-2275-4c14-abb3-c027cf1342e9
>>>>>>>> Headers: (amz-sdk-invocation-id:, Content-Type: 
>>>>>>>> application/octet-stream,
>>>>>>>> Range: bytes=234-9223372036854775806, User-Agent: ,
>>>>>>>> cfg/retry-mode/legacy, presto, )
>>>>>>>>
>>>>>>>> [14-Aug-2024 09:32:49.286 UTC] DEBUG com.amazonaws.request
>>>>>>>>                               [] - Sending Request: GET
>>>>>>>> https://s3-bucket-path/checkpoints/b2db0146e6afa2dabf138730580cc257/chk-370/45fd3560-8be5-4ca4-a7e9-8fe260140c18
>>>>>>>> Headers: (amz-sdk-invocation-id:, Content-Type: 
>>>>>>>> application/octet-stream,
>>>>>>>> Range: bytes=279-9223372036854775806, User-Agent: , 
>>>>>>>> cfg/retry-mode/legacy,
>>>>>>>> presto, )
>>>>>>>>
>>>>>>>> [14-Aug-2024 11:39:12.530 UTC] DEBUG com.amazonaws.request
>>>>>>>>                               [] - Sending Request: GET
>>>>>>>> https://s3-bucket-path/savepoint-72ef07-0cbc29a9e0cd/a432a1ba-2275-4c14-abb3-c027cf1342e9
>>>>>>>> Headers: (amz-sdk-invocation-id:, Content-Type: 
>>>>>>>> application/octet-stream,
>>>>>>>> Range: bytes=279-9223372036854775806, User-Agent: , 
>>>>>>>> cfg/retry-mode/legacy,
>>>>>>>> presto, )
>>>>>>>>
>>>>>>>> {code}
>>>>>>>>
>>>>>>>> Code which does the multiple reads was isolated to:
>>>>>>>>
>>>>>>>> {code}
>>>>>>>>
>>>>>>>>
>>>>>>>> org.apache.flink.runtime.state.OperatorStateRestoreOperation#deserializeOperatorStateValues
>>>>>>>>
>>>>>>>>  private <S> void deserializeOperatorStateValues(
>>>>>>>>
>>>>>>>>             PartitionableListState<S> stateListForName,
>>>>>>>>
>>>>>>>>             FSDataInputStream in,
>>>>>>>>
>>>>>>>>             OperatorStateHandle.StateMetaInfo metaInfo)
>>>>>>>>
>>>>>>>>             throws IOException {
>>>>>>>>
>>>>>>>>         if (null != metaInfo) {
>>>>>>>>
>>>>>>>>             long[] offsets = metaInfo.getOffsets();
>>>>>>>>
>>>>>>>>             if (null != offsets) {
>>>>>>>>
>>>>>>>>                 DataInputView div = new
>>>>>>>> DataInputViewStreamWrapper(in);
>>>>>>>>
>>>>>>>>                 TypeSerializer<S> serializer =
>>>>>>>>
>>>>>>>>
>>>>>>>>                         
>>>>>>>> stateListForName.getStateMetaInfo().getPartitionStateSerializer();
>>>>>>>>
>>>>>>>>                 for (long offset : offsets) {
>>>>>>>>
>>>>>>>>                     in.seek(offset);
>>>>>>>>
>>>>>>>>
>>>>>>>>                     stateListForName.add(serializer.deserialize(div));
>>>>>>>>
>>>>>>>>                 }
>>>>>>>>
>>>>>>>>             }
>>>>>>>>
>>>>>>>>         }
>>>>>>>>
>>>>>>>> {code}
>>>>>>>>
>>>>>>>> Questions:
>>>>>>>>
>>>>>>>>    1.
>>>>>>>>
>>>>>>>>    Please review the behaviour from above and advise if this is
>>>>>>>>    expected?
>>>>>>>>    2.
>>>>>>>>
>>>>>>>>    The reads for the topic-partition-offset-states are similar to:
>>>>>>>>
>>>>>>>> {code}
>>>>>>>>
>>>>>>>> Sending Request: GET
>>>>>>>> https://s3-bucket-path/savepoint-72ef07-0cbc29a9e0cd/a432a1ba-2275-4c14-abb3-c027cf1342e9
>>>>>>>> Headers: (amz-sdk-invocation-id:, Content-Type: 
>>>>>>>> application/octet-stream,
>>>>>>>> Range: bytes=234-9223372036854775806
>>>>>>>>
>>>>>>>> Sending Request: GET
>>>>>>>> https://s3-bucket-path/savepoint-72ef07-0cbc29a9e0cd/a432a1ba-2275-4c14-abb3-c027cf1342e9
>>>>>>>> Headers: (amz-sdk-invocation-id:, Content-Type: 
>>>>>>>> application/octet-stream,
>>>>>>>> Range: bytes=279-9223372036854775806
>>>>>>>>
>>>>>>>> And so on
>>>>>>>>
>>>>>>>> {code}
>>>>>>>>
>>>>>>>> Could this behaviour be optimised in Flink to reduce the number of
>>>>>>>> reads and avoid reading the same data multiple times. Although the 
>>>>>>>> range
>>>>>>>> start is changing slightly across calls, the range end is the same (max
>>>>>>>> long int) entire file being retrieved. Do we need to retrieve each 
>>>>>>>> offset
>>>>>>>> individually or this could be optimised in Flink to only have one call 
>>>>>>>> then
>>>>>>>> then use the data accordingly.
>>>>>>>>
>>>>>>>>    3.
>>>>>>>>
>>>>>>>>    Currently we set state.storage.fs.memory-threshold: 700kb to
>>>>>>>>    avoid the problem but we expect the number of topic partitions to 
>>>>>>>> increase,
>>>>>>>>    requiring a further increase for this value. What are our options 
>>>>>>>> once we
>>>>>>>>    reach the 1MB limit?
>>>>>>>>
>>>>>>>>
>>>>>>>> Thank you.
>>>>>>>>
>>>>>>>

Reply via email to