Hi G,

We did a test today using
```
execution.checkpointing.snapshot-compression: true
state.storage.fs.memory-threshold: 500kb
```
across 6 jobs with different parallelism and volume load.
I will use one as an example - 70 slots - I had 70 files of 670kb
corresponding to the subtask state containing the KafkaSource operator. In
total compressed savepoint size was around 360MB, 201 files, biggest was
~11MB. Job restored ok from checkpoint and savepoint not seeing the
millions of reads we were observing before. (forced stop/run few times)

We decided to let this soak for some time since our checkpoints can reach
more than 10GB (uncompressed).

Let me know if you have any updates. I will let you know if I observe
anything else.
Please let us know if you have any new findings.

Thank you.

On Tue, Oct 15, 2024 at 4:38 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
wrote:

> Could you please let us know if you see anything wrong when using
>> `execution.checkpointing.snapshot-compression: true` since for us this
>> seems to have solved the multiple S3 reads issue.
>>
> When something is working it's never wrong. The question is why is has
> been resolved.
> Are you still having state.storage.fs.memory-threshold set to 500Kb? State
> compression may reduce the state under this threshold which would make that
> work.
>
> For uncompressed state could you please let us know how the change from
>> your PR eliminates the multiple calls to S3. Is not very clear to us.
>>
> Copy from the PR:
> Flink state restore from S3 is super slow because skip function is
> consuming ~15 seconds for ~6Mb of data.
> ...
> In this PR the skip going to be called only in case of compression because
> otherwise a stream is seekable.
>
> G
>
> On Tue, Oct 15, 2024 at 4:30 PM William Wallace <
> theanonymous31...@gmail.com> wrote:
>
>> Thank you for the recommendation and the help.
>>
>> Could you please let us know if you see anything wrong when using
>> `execution.checkpointing.snapshot-compression: true` since for us this
>> seems to have solved the multiple S3 reads issue.
>>
> In debug we see:
>> `in.delegate =
>> ClosingFSDataInputStream(org.apache.flink.fs.s3presto.common.HadoopDataInputStream)`
>>
>> and
>> `in.compressionDelegate = SnappyFramedInputStream`
>> and  in the logs a file is retrieved only once per subtask
>> ```
>> DEBUG com.amazonaws.request                                        [] -
>> Sending Request: GET 
>> https://.../savepoints/flink-compression/.../savepoint-...
>> Range: bytes=0-9223372036854775806.
>> ```
>>
>> For uncompressed state could you please let us know how the change from
>> your PR eliminates the multiple calls to S3. Is not very clear to us.
>>
> Thank you.
>>
>> On Tue, Oct 15, 2024 at 1:42 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
>> wrote:
>>
>>> My recommendation is to cherry-pick this PR [1] at top of your Flink
>>> distro when possible.
>>> Additionally turn off state compression. These should do the trick...
>>>
>>> [1] https://github.com/apache/flink/pull/25509
>>>
>>> G
>>>
>>>
>>> On Tue, Oct 15, 2024 at 1:03 PM William Wallace <
>>> theanonymous31...@gmail.com> wrote:
>>>
>>>> Thank you Gabor for your reply.
>>>>
>>>> I'm sharing below more findings for both uncompressed and compressed
>>>> state with the hope it helps. I'm looking further to your thoughts.
>>>>
>>>> 1. uncompressed state - observe the
>>>> `stateHandle=RelativeFileStateHandle`
>>>> ```
>>>> org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation []
>>>> - Finished restoring from state handle:
>>>> KeyGroupsSavepointStateHandle{groupRangeOffsets=KeyGroupRangeOffsets{keyGroupRange=KeyGroupRange{startKeyGroup=0,
>>>> endKeyGroup=31}}, stateHandle=RelativeFileStateHandle State:
>>>> s3p://.../savepoints/flink-no-compression/.../savepoint-.../12345678-...,
>>>> 12345678-... [... bytes]}.
>>>>  ```
>>>>
>>>> `FSDataInputStream in.delegate` in
>>>> `org.apache.flink.runtime.state.OperatorStateRestoreOperation#deserializeOperatorStateValues`
>>>> is an instance of
>>>> `ClosingFSDataInputStream(org.apache.flink.fs.s3presto.common.HadoopDataInputStream`.
>>>> For every `offset: offsets = metaInfo.getOffsets()` we end up doing an
>>>> actual partial file read which in our case ends in order of millions
>>>> because of high job parallelism (subtasks) and job can't recover.
>>>>
>>>> 2. compressed state - observe the stateHandle=ByteStreamStateHandle
>>>> ```
>>>> org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation []
>>>> - Finished restoring from state handle:
>>>> KeyGroupsSavepointStateHandle{groupRangeOffsets=KeyGroupRangeOffsets{keyGroupRange=KeyGroupRange{startKeyGroup=0,
>>>> endKeyGroup=31}},
>>>> stateHandle=ByteStreamStateHandle{handleName='(s3p:.../savepoints/flink-compression/.../savepoint-.../12345678-...',
>>>> dataBytes=...}}.
>>>> ```
>>>> `FSDataInputStream in.delegate` in
>>>> `org.apache.flink.runtime.state.OperatorStateRestoreOperation#deserializeOperatorStateValues`
>>>> is an instance if `ByteStreamStateHandle(ByteStateHandleInputStream)
>>>> This means that for every `offset: offsets = metaInfo.getOffsets()` we
>>>> end up doing a read from a `byte[]` which are faster.
>>>>
>>>> At this point I don't understand how not doing the `skip` operation in
>>>> case of uncompressed state can work, since skip is required for the partial
>>>> reads, and I apologise if I'm wrong, I don't have the same level of
>>>> understanding as you have.
>>>>
>>>> What we considered doing was to find a way to actually cache the file
>>>> as a byte[] and do the reads from memory ... but it seems the state
>>>> compression is doing the same. We are in the process of testing state
>>>> compression under production volumes ... can't say how that will actually
>>>> work for us.
>>>>
>>>> Thank you again for looking into this. I'm looking forward for your
>>>> thoughts. Please let me know if I missed or misunderstood something. Please
>>>> let us know your recommendation.
>>>>
>>>> On Tue, Oct 15, 2024 at 8:35 AM Gabor Somogyi <
>>>> gabor.g.somo...@gmail.com> wrote:
>>>>
>>>>> Hi William,
>>>>>
>>>>> It's a bit old question but I think now we know why this is happening.
>>>>> Please see [1] for further details.
>>>>> It's an important requirement to use uncompressed state because even
>>>>> with the fix compressed state is still problematic.
>>>>>
>>>>> We've already tested the PR with load but if you can report back it
>>>>> would be helpful.
>>>>>
>>>>> [1] https://issues.apache.org/jira/browse/FLINK-36530
>>>>>
>>>>> BR,
>>>>> G
>>>>>
>>>>>
>>>>> On Fri, Aug 16, 2024 at 11:25 AM William Wallace <
>>>>> theanonymous31...@gmail.com> wrote:
>>>>>
>>>>>> Context
>>>>>>
>>>>>> We have recently upgraded from Flink 1.13.6 to Flink 1.19. We consume
>>>>>> data from ~ 40k Kafka topic partitions in some environments. We are using
>>>>>> aligned checkpoints. We set state.storage.fs.memory-threshold: 500kb.
>>>>>>
>>>>>> Problem
>>>>>>
>>>>>> At the point when the state for operator using
>>>>>> topic-partition-offset-states doesn’t fit in the
>>>>>> state.storage.fs.memory-threshold, we end up with a proportionally high
>>>>>> number of reads for the checkpoint and savepoint files for each of the
>>>>>> topic partition offsets.
>>>>>>
>>>>>> For example when we have:
>>>>>>
>>>>>> {code}
>>>>>>
>>>>>> [14-Aug-2024 11:39:12.392 UTC] DEBUG
>>>>>> org.apache.flink.runtime.state.TaskStateManagerImpl          [] - 
>>>>>> Operator
>>>>>> 8992e27ae82755cac12dd37f518df782 has remote state
>>>>>> SubtaskState{operatorStateFromBackend=StateObjectCollection{[OperatorStateHandle{stateNameToPartitionOffsets={
>>>>>> SourceReaderState=StateMetaInfo{offsets=[234, 279, 324, 369, 414,
>>>>>> 459, 504, 549 …(offsets is a list 40k elements)
>>>>>>
>>>>>> {code}
>>>>>>
>>>>>> For each of the metadata offsets we will have S3 reads for
>>>>>> checkpoint/savepoint. The Flink job fails to resume from checkpoint.
>>>>>> With debug logs, we see hundred of thousands of AWS GET calls for the 
>>>>>> same
>>>>>> checkpoint file, with different offsets. These AWS calls take such a long
>>>>>> time, that our application fails to start and job crashes and starts same
>>>>>> reads again and crashes again.
>>>>>>
>>>>>>
>>>>>> We will have:
>>>>>>
>>>>>> {code}
>>>>>>
>>>>>> [14-Aug-2024 09:32:49.218 UTC] DEBUG com.amazonaws.request
>>>>>>                             [] - Sending Request: GET
>>>>>> https://s3-bucket-path/checkpoints/b2db0146e6afa2dabf138730580cc257/chk-370/45fd3560-8be5-4ca4-a7e9-8fe260140c18
>>>>>> Headers: (amz-sdk-invocation-id:, Content-Type: application/octet-stream,
>>>>>> Range: bytes=234-9223372036854775806, User-Agent:  ,
>>>>>> cfg/retry-mode/legacy, presto, )
>>>>>>
>>>>>> [14-Aug-2024 11:39:12.476 UTC] DEBUG com.amazonaws.request
>>>>>>                             [] - Sending Request: GET
>>>>>> https://s3-bucket-path/savepoint-72ef07-0cbc29a9e0cd/a432a1ba-2275-4c14-abb3-c027cf1342e9
>>>>>> Headers: (amz-sdk-invocation-id:, Content-Type: application/octet-stream,
>>>>>> Range: bytes=234-9223372036854775806, User-Agent: ,
>>>>>> cfg/retry-mode/legacy, presto, )
>>>>>>
>>>>>> [14-Aug-2024 09:32:49.286 UTC] DEBUG com.amazonaws.request
>>>>>>                             [] - Sending Request: GET
>>>>>> https://s3-bucket-path/checkpoints/b2db0146e6afa2dabf138730580cc257/chk-370/45fd3560-8be5-4ca4-a7e9-8fe260140c18
>>>>>> Headers: (amz-sdk-invocation-id:, Content-Type: application/octet-stream,
>>>>>> Range: bytes=279-9223372036854775806, User-Agent: , 
>>>>>> cfg/retry-mode/legacy,
>>>>>> presto, )
>>>>>>
>>>>>> [14-Aug-2024 11:39:12.530 UTC] DEBUG com.amazonaws.request
>>>>>>                             [] - Sending Request: GET
>>>>>> https://s3-bucket-path/savepoint-72ef07-0cbc29a9e0cd/a432a1ba-2275-4c14-abb3-c027cf1342e9
>>>>>> Headers: (amz-sdk-invocation-id:, Content-Type: application/octet-stream,
>>>>>> Range: bytes=279-9223372036854775806, User-Agent: , 
>>>>>> cfg/retry-mode/legacy,
>>>>>> presto, )
>>>>>>
>>>>>> {code}
>>>>>>
>>>>>> Code which does the multiple reads was isolated to:
>>>>>>
>>>>>> {code}
>>>>>>
>>>>>>
>>>>>> org.apache.flink.runtime.state.OperatorStateRestoreOperation#deserializeOperatorStateValues
>>>>>>
>>>>>>  private <S> void deserializeOperatorStateValues(
>>>>>>
>>>>>>             PartitionableListState<S> stateListForName,
>>>>>>
>>>>>>             FSDataInputStream in,
>>>>>>
>>>>>>             OperatorStateHandle.StateMetaInfo metaInfo)
>>>>>>
>>>>>>             throws IOException {
>>>>>>
>>>>>>         if (null != metaInfo) {
>>>>>>
>>>>>>             long[] offsets = metaInfo.getOffsets();
>>>>>>
>>>>>>             if (null != offsets) {
>>>>>>
>>>>>>                 DataInputView div = new
>>>>>> DataInputViewStreamWrapper(in);
>>>>>>
>>>>>>                 TypeSerializer<S> serializer =
>>>>>>
>>>>>>
>>>>>>                         
>>>>>> stateListForName.getStateMetaInfo().getPartitionStateSerializer();
>>>>>>
>>>>>>                 for (long offset : offsets) {
>>>>>>
>>>>>>                     in.seek(offset);
>>>>>>
>>>>>>                     stateListForName.add(serializer.deserialize(div));
>>>>>>
>>>>>>                 }
>>>>>>
>>>>>>             }
>>>>>>
>>>>>>         }
>>>>>>
>>>>>> {code}
>>>>>>
>>>>>> Questions:
>>>>>>
>>>>>>    1.
>>>>>>
>>>>>>    Please review the behaviour from above and advise if this is
>>>>>>    expected?
>>>>>>    2.
>>>>>>
>>>>>>    The reads for the topic-partition-offset-states are similar to:
>>>>>>
>>>>>> {code}
>>>>>>
>>>>>> Sending Request: GET
>>>>>> https://s3-bucket-path/savepoint-72ef07-0cbc29a9e0cd/a432a1ba-2275-4c14-abb3-c027cf1342e9
>>>>>> Headers: (amz-sdk-invocation-id:, Content-Type: application/octet-stream,
>>>>>> Range: bytes=234-9223372036854775806
>>>>>>
>>>>>> Sending Request: GET
>>>>>> https://s3-bucket-path/savepoint-72ef07-0cbc29a9e0cd/a432a1ba-2275-4c14-abb3-c027cf1342e9
>>>>>> Headers: (amz-sdk-invocation-id:, Content-Type: application/octet-stream,
>>>>>> Range: bytes=279-9223372036854775806
>>>>>>
>>>>>> And so on
>>>>>>
>>>>>> {code}
>>>>>>
>>>>>> Could this behaviour be optimised in Flink to reduce the number of
>>>>>> reads and avoid reading the same data multiple times. Although the range
>>>>>> start is changing slightly across calls, the range end is the same (max
>>>>>> long int) entire file being retrieved. Do we need to retrieve each offset
>>>>>> individually or this could be optimised in Flink to only have one call 
>>>>>> then
>>>>>> then use the data accordingly.
>>>>>>
>>>>>>    3.
>>>>>>
>>>>>>    Currently we set state.storage.fs.memory-threshold: 700kb to
>>>>>>    avoid the problem but we expect the number of topic partitions to 
>>>>>> increase,
>>>>>>    requiring a further increase for this value. What are our options 
>>>>>> once we
>>>>>>    reach the 1MB limit?
>>>>>>
>>>>>>
>>>>>> Thank you.
>>>>>>
>>>>>

Reply via email to