>
> Could you please let us know if you see anything wrong when using
> `execution.checkpointing.snapshot-compression: true` since for us this
> seems to have solved the multiple S3 reads issue.
>
When something is working it's never wrong. The question is why it has been
resolved.
Do you still have state.storage.fs.memory-threshold set to 500kb? State
compression may reduce the state size below this threshold, which would
explain why that works.
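
A minimal sketch of that reasoning, not Flink code. It assumes that state
smaller than the threshold is kept inline as bytes (the `ByteStreamStateHandle`
in your compressed-state log) and that anything else goes to a separate file.
The class and method names are hypothetical, the JDK `Deflater` only stands in
for the Snappy compression Flink uses, and the 600 KB payload is made up:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.DeflaterOutputStream;

final class MemoryThresholdSketch {

    enum Placement { INLINE_BYTES, SEPARATE_FILE }

    // Assumption: state smaller than the threshold is stored inline,
    // anything else is written to its own file.
    static Placement placementFor(long stateSizeBytes, long thresholdBytes) {
        return stateSizeBytes < thresholdBytes
                ? Placement.INLINE_BYTES
                : Placement.SEPARATE_FILE;
    }

    // Hypothetical payload: a repeating byte pattern standing in for many
    // similar topic-partition offset entries.
    static byte[] repetitiveState(int sizeBytes) {
        byte[] state = new byte[sizeBytes];
        for (int i = 0; i < state.length; i++) {
            state[i] = (byte) (i % 45);
        }
        return state;
    }

    // JDK Deflater, used only as a stand-in for Snappy.
    static byte[] compress(byte[] raw) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DeflaterOutputStream out = new DeflaterOutputStream(buffer)) {
            out.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    public static void main(String[] args) {
        long threshold = 500L * 1024; // 500kb, as in the reported configuration
        byte[] raw = repetitiveState(600 * 1024);
        byte[] compressed = compress(raw);
        System.out.println("raw:        " + raw.length + " bytes -> "
                + placementFor(raw.length, threshold));
        System.out.println("compressed: " + compressed.length + " bytes -> "
                + placementFor(compressed.length, threshold));
    }
}
```

If this is what is happening, the restore reads from memory only because the
compressed state happens to fit under the threshold, and it would stop
working once the state grows past it again.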

> For uncompressed state could you please let us know how the change from
> your PR eliminates the multiple calls to S3. It is not very clear to us.
>
Copy from the PR:
Flink state restore from S3 is super slow because skip function is
consuming ~15 seconds for ~6Mb of data.
...
In this PR, skip is going to be called only in case of compression, because
otherwise the stream is seekable.
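
To illustrate the difference between the two ways of reaching an offset, here
is a rough sketch. It is not the code from the PR: the class and helper names
are hypothetical, an in-memory `byte[]` stands in for a seekable stream, and a
JDK `InflaterInputStream` stands in for a compressed stream that can only move
forward.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

final class SeekVsSkipSketch {

    // Seekable source: position directly at the offset, nothing is consumed
    // on the way there.
    static int readIntBySeek(byte[] data, int offset) {
        return ByteBuffer.wrap(data).getInt(offset);
    }

    // Forward-only source: every byte before the offset has to be read and
    // discarded before the value can be decoded.
    static int readIntBySkip(InputStream in, long offset) {
        try {
            long remaining = offset;
            while (remaining > 0) {
                long skipped = in.skip(remaining);
                if (skipped <= 0) {
                    // skip() may return 0, so fall back to reading one byte.
                    if (in.read() < 0) {
                        throw new EOFException("offset beyond end of stream");
                    }
                    skipped = 1;
                }
                remaining -= skipped;
            }
            return new DataInputStream(in).readInt();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Sample data: the ints 0..count-1 in big-endian order.
    static byte[] sampleInts(int count) {
        ByteBuffer buffer = ByteBuffer.allocate(count * Integer.BYTES);
        for (int i = 0; i < count; i++) {
            buffer.putInt(i);
        }
        return buffer.array();
    }

    // Compresses the data and returns a decompressing, forward-only view of it.
    static InputStream compressedView(byte[] raw) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (DeflaterOutputStream out = new DeflaterOutputStream(sink)) {
            out.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new InflaterInputStream(new ByteArrayInputStream(sink.toByteArray()));
    }

    public static void main(String[] args) {
        byte[] raw = sampleInts(10);
        int offset = 3 * Integer.BYTES;
        System.out.println(readIntBySeek(raw, offset));
        System.out.println(readIntBySkip(compressedView(raw), offset));
    }
}
```

Both calls return the same value. The difference is only in how much data has
to be consumed to get there.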

G

On Tue, Oct 15, 2024 at 4:30 PM William Wallace <theanonymous31...@gmail.com>
wrote:

> Thank you for the recommendation and the help.
>
> Could you please let us know if you see anything wrong when using
> `execution.checkpointing.snapshot-compression: true` since for us this
> seems to have solved the multiple S3 reads issue.
>
> In debug we see:
> `in.delegate =
> ClosingFSDataInputStream(org.apache.flink.fs.s3presto.common.HadoopDataInputStream)`
>
> and
> `in.compressionDelegate = SnappyFramedInputStream`
> and in the logs a file is retrieved only once per subtask
> ```
> DEBUG com.amazonaws.request                                        [] -
> Sending Request: GET 
> https://.../savepoints/flink-compression/.../savepoint-...
> Range: bytes=0-9223372036854775806.
> ```
>
> For uncompressed state could you please let us know how the change from
> your PR eliminates the multiple calls to S3. It is not very clear to us.
>
> Thank you.
>
> On Tue, Oct 15, 2024 at 1:42 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
> wrote:
>
>> My recommendation is to cherry-pick this PR [1] on top of your Flink
>> distro when possible.
>> Additionally turn off state compression. These should do the trick...
>>
>> [1] https://github.com/apache/flink/pull/25509
>>
>> G
>>
>>
>> On Tue, Oct 15, 2024 at 1:03 PM William Wallace <
>> theanonymous31...@gmail.com> wrote:
>>
>>> Thank you Gabor for your reply.
>>>
>>> I'm sharing below more findings for both uncompressed and compressed
>>> state with the hope it helps. I'm looking forward to your thoughts.
>>>
>>> 1. uncompressed state - observe the `stateHandle=RelativeFileStateHandle`
>>> ```
>>> org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation [] -
>>> Finished restoring from state handle:
>>> KeyGroupsSavepointStateHandle{groupRangeOffsets=KeyGroupRangeOffsets{keyGroupRange=KeyGroupRange{startKeyGroup=0,
>>> endKeyGroup=31}}, stateHandle=RelativeFileStateHandle State:
>>> s3p://.../savepoints/flink-no-compression/.../savepoint-.../12345678-...,
>>> 12345678-... [... bytes]}.
>>>  ```
>>>
>>> `FSDataInputStream in.delegate` in
>>> `org.apache.flink.runtime.state.OperatorStateRestoreOperation#deserializeOperatorStateValues`
>>> is an instance of
>>> `ClosingFSDataInputStream(org.apache.flink.fs.s3presto.common.HadoopDataInputStream)`.
>>> For every `offset: offsets = metaInfo.getOffsets()` we end up doing an
>>> actual partial file read, which in our case adds up to millions of reads
>>> because of high job parallelism (subtasks), and the job can't recover.
>>>
>>> 2. compressed state - observe the `stateHandle=ByteStreamStateHandle`
>>> ```
>>> org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation [] -
>>> Finished restoring from state handle:
>>> KeyGroupsSavepointStateHandle{groupRangeOffsets=KeyGroupRangeOffsets{keyGroupRange=KeyGroupRange{startKeyGroup=0,
>>> endKeyGroup=31}},
>>> stateHandle=ByteStreamStateHandle{handleName='(s3p:.../savepoints/flink-compression/.../savepoint-.../12345678-...',
>>> dataBytes=...}}.
>>> ```
>>> `FSDataInputStream in.delegate` in
>>> `org.apache.flink.runtime.state.OperatorStateRestoreOperation#deserializeOperatorStateValues`
>>> is an instance of `ByteStreamStateHandle(ByteStateHandleInputStream)`.
>>> This means that for every `offset: offsets = metaInfo.getOffsets()` we
>>> end up doing a read from a `byte[]`, which is faster.
>>>
>>> At this point I don't understand how not doing the `skip` operation in
>>> case of uncompressed state can work, since skip is required for the partial
>>> reads, and I apologise if I'm wrong, I don't have the same level of
>>> understanding as you have.
>>>
>>> What we considered doing was to find a way to actually cache the file as
>>> a byte[] and do the reads from memory ... but it seems the state
>>> compression is doing the same. We are in the process of testing state
>>> compression under production volumes ... can't say how that will actually
>>> work for us.
>>>
>>> Thank you again for looking into this. I'm looking forward to your
>>> thoughts. Please let me know if I missed or misunderstood something. Please
>>> let us know your recommendation.
>>>
>>> On Tue, Oct 15, 2024 at 8:35 AM Gabor Somogyi <gabor.g.somo...@gmail.com>
>>> wrote:
>>>
>>>> Hi William,
>>>>
>>>> It's a bit of an old question but I think now we know why this is happening.
>>>> Please see [1] for further details.
>>>> It's an important requirement to use uncompressed state because even
>>>> with the fix compressed state is still problematic.
>>>>
>>>> We've already tested the PR with load but if you can report back it
>>>> would be helpful.
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLINK-36530
>>>>
>>>> BR,
>>>> G
>>>>
>>>>
>>>> On Fri, Aug 16, 2024 at 11:25 AM William Wallace <
>>>> theanonymous31...@gmail.com> wrote:
>>>>
>>>>> Context
>>>>>
>>>>> We have recently upgraded from Flink 1.13.6 to Flink 1.19. We consume
>>>>> data from ~ 40k Kafka topic partitions in some environments. We are using
>>>>> aligned checkpoints. We set state.storage.fs.memory-threshold: 500kb.
>>>>>
>>>>> Problem
>>>>>
>>>>> At the point when the state for operator using
>>>>> topic-partition-offset-states doesn’t fit in the
>>>>> state.storage.fs.memory-threshold, we end up with a proportionally high
>>>>> number of reads for the checkpoint and savepoint files for each of the
>>>>> topic partition offsets.
>>>>>
>>>>> For example when we have:
>>>>>
>>>>> {code}
>>>>>
>>>>> [14-Aug-2024 11:39:12.392 UTC] DEBUG
>>>>> org.apache.flink.runtime.state.TaskStateManagerImpl          [] - Operator
>>>>> 8992e27ae82755cac12dd37f518df782 has remote state
>>>>> SubtaskState{operatorStateFromBackend=StateObjectCollection{[OperatorStateHandle{stateNameToPartitionOffsets={
>>>>> SourceReaderState=StateMetaInfo{offsets=[234, 279, 324, 369, 414,
>>>>> 459, 504, 549 …(offsets is a list 40k elements)
>>>>>
>>>>> {code}
>>>>>
>>>>> For each of the metadata offsets we will have S3 reads for
>>>>> checkpoint/savepoint. The Flink job fails to resume from checkpoint.
>>>>> With debug logs, we see hundreds of thousands of AWS GET calls for the same
>>>>> checkpoint file, with different offsets. These AWS calls take such a long
>>>>> time that our application fails to start; the job crashes, starts the same
>>>>> reads again, and crashes again.
>>>>>
>>>>>
>>>>> We will have:
>>>>>
>>>>> {code}
>>>>>
>>>>> [14-Aug-2024 09:32:49.218 UTC] DEBUG com.amazonaws.request
>>>>>                             [] - Sending Request: GET
>>>>> https://s3-bucket-path/checkpoints/b2db0146e6afa2dabf138730580cc257/chk-370/45fd3560-8be5-4ca4-a7e9-8fe260140c18
>>>>> Headers: (amz-sdk-invocation-id:, Content-Type: application/octet-stream,
>>>>> Range: bytes=234-9223372036854775806, User-Agent:  ,
>>>>> cfg/retry-mode/legacy, presto, )
>>>>>
>>>>> [14-Aug-2024 11:39:12.476 UTC] DEBUG com.amazonaws.request
>>>>>                             [] - Sending Request: GET
>>>>> https://s3-bucket-path/savepoint-72ef07-0cbc29a9e0cd/a432a1ba-2275-4c14-abb3-c027cf1342e9
>>>>> Headers: (amz-sdk-invocation-id:, Content-Type: application/octet-stream,
>>>>> Range: bytes=234-9223372036854775806, User-Agent: ,
>>>>> cfg/retry-mode/legacy, presto, )
>>>>>
>>>>> [14-Aug-2024 09:32:49.286 UTC] DEBUG com.amazonaws.request
>>>>>                             [] - Sending Request: GET
>>>>> https://s3-bucket-path/checkpoints/b2db0146e6afa2dabf138730580cc257/chk-370/45fd3560-8be5-4ca4-a7e9-8fe260140c18
>>>>> Headers: (amz-sdk-invocation-id:, Content-Type: application/octet-stream,
>>>>> Range: bytes=279-9223372036854775806, User-Agent: , cfg/retry-mode/legacy,
>>>>> presto, )
>>>>>
>>>>> [14-Aug-2024 11:39:12.530 UTC] DEBUG com.amazonaws.request
>>>>>                             [] - Sending Request: GET
>>>>> https://s3-bucket-path/savepoint-72ef07-0cbc29a9e0cd/a432a1ba-2275-4c14-abb3-c027cf1342e9
>>>>> Headers: (amz-sdk-invocation-id:, Content-Type: application/octet-stream,
>>>>> Range: bytes=279-9223372036854775806, User-Agent: , cfg/retry-mode/legacy,
>>>>> presto, )
>>>>>
>>>>> {code}
>>>>>
>>>>> Code which does the multiple reads was isolated to:
>>>>>
>>>>> {code}
>>>>>
>>>>>
>>>>> org.apache.flink.runtime.state.OperatorStateRestoreOperation#deserializeOperatorStateValues
>>>>>
>>>>>     private <S> void deserializeOperatorStateValues(
>>>>>             PartitionableListState<S> stateListForName,
>>>>>             FSDataInputStream in,
>>>>>             OperatorStateHandle.StateMetaInfo metaInfo)
>>>>>             throws IOException {
>>>>>         if (null != metaInfo) {
>>>>>             long[] offsets = metaInfo.getOffsets();
>>>>>             if (null != offsets) {
>>>>>                 DataInputView div = new DataInputViewStreamWrapper(in);
>>>>>                 TypeSerializer<S> serializer =
>>>>>                         stateListForName.getStateMetaInfo().getPartitionStateSerializer();
>>>>>                 for (long offset : offsets) {
>>>>>                     in.seek(offset);
>>>>>                     stateListForName.add(serializer.deserialize(div));
>>>>>                 }
>>>>>             }
>>>>>         }
>>>>>     }
>>>>>
>>>>> {code}
>>>>>
>>>>> Questions:
>>>>>
>>>>>    1. Please review the behaviour above and advise whether it is
>>>>>    expected.
>>>>>
>>>>>    2. The reads for the topic-partition-offset-states are similar to:
>>>>>
>>>>> {code}
>>>>>
>>>>> Sending Request: GET
>>>>> https://s3-bucket-path/savepoint-72ef07-0cbc29a9e0cd/a432a1ba-2275-4c14-abb3-c027cf1342e9
>>>>> Headers: (amz-sdk-invocation-id:, Content-Type: application/octet-stream,
>>>>> Range: bytes=234-9223372036854775806
>>>>>
>>>>> Sending Request: GET
>>>>> https://s3-bucket-path/savepoint-72ef07-0cbc29a9e0cd/a432a1ba-2275-4c14-abb3-c027cf1342e9
>>>>> Headers: (amz-sdk-invocation-id:, Content-Type: application/octet-stream,
>>>>> Range: bytes=279-9223372036854775806
>>>>>
>>>>> And so on
>>>>>
>>>>> {code}
>>>>>
>>>>> Could this behaviour be optimised in Flink to reduce the number of
>>>>> reads and avoid reading the same data multiple times? Although the range
>>>>> start changes slightly across calls, the range end is always the same (max
>>>>> long), so the entire file is retrieved each time. Do we need to retrieve
>>>>> each offset individually, or could this be optimised in Flink to make only
>>>>> one call and then use the data accordingly?
>>>>>
>>>>>    3. Currently we set state.storage.fs.memory-threshold: 700kb to avoid
>>>>>    the problem but we expect the number of topic partitions to increase,
>>>>>    requiring a further increase for this value. What are our options once
>>>>>    we reach the 1MB limit?
>>>>>
>>>>>
>>>>> Thank you.
>>>>>
>>>>
