My recommendation is to cherry-pick this PR [1] on top of your Flink distro
when possible.
Additionally, turn off state compression. These should do the trick...
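
For reference, snapshot compression can be disabled explicitly either in the
Flink configuration (execution.checkpointing.snapshot-compression: false, if I
remember the key correctly) or programmatically; a minimal sketch, assuming the
standard ExecutionConfig API:

```
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DisableSnapshotCompression {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Explicitly turn off state/snapshot compression.
        env.getConfig().setUseSnapshotCompression(false);
        // ... build and execute the job as usual ...
    }
}
```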

[1] https://github.com/apache/flink/pull/25509

G


On Tue, Oct 15, 2024 at 1:03 PM William Wallace <theanonymous31...@gmail.com>
wrote:

> Thank you Gabor for your reply.
>
> I'm sharing more findings below for both uncompressed and compressed state
> in the hope that they help. I'm looking forward to your thoughts.
>
> 1. uncompressed state - observe the `stateHandle=RelativeFileStateHandle`
> ```
> org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation [] -
> Finished restoring from state handle:
> KeyGroupsSavepointStateHandle{groupRangeOffsets=KeyGroupRangeOffsets{keyGroupRange=KeyGroupRange{startKeyGroup=0,
> endKeyGroup=31}}, stateHandle=RelativeFileStateHandle State:
> s3p://.../savepoints/flink-no-compression/.../savepoint-.../12345678-...,
> 12345678-... [... bytes]}.
>  ```
>
> `FSDataInputStream in.delegate` in
> `org.apache.flink.runtime.state.OperatorStateRestoreOperation#deserializeOperatorStateValues`
> is an instance of
> `ClosingFSDataInputStream(org.apache.flink.fs.s3presto.common.HadoopDataInputStream)`.
> For every `offset` in `offsets = metaInfo.getOffsets()` we end up doing an
> actual partial file read, which in our case adds up to reads in the order of
> millions because of the high job parallelism (subtasks), and the job can't
> recover.
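>
> As a back-of-the-envelope illustration (the parallelism figure below is an
> assumption for illustration, not a measured value):
>
> ```
> // Illustrative only: rough count of ranged S3 GETs during a restore.
> public class RestoreReadCount {
>     public static void main(String[] args) {
>         long offsetsPerSubtask = 40_000L; // ~size of metaInfo.getOffsets() in our logs
>         long subtasks = 100L;             // assumed parallelism, purely for illustration
>         System.out.println(offsetsPerSubtask * subtasks); // ~4,000,000 partial reads
>     }
> }
> ```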
>
> 2. compressed state - observe the `stateHandle=ByteStreamStateHandle`
> ```
> org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation [] -
> Finished restoring from state handle:
> KeyGroupsSavepointStateHandle{groupRangeOffsets=KeyGroupRangeOffsets{keyGroupRange=KeyGroupRange{startKeyGroup=0,
> endKeyGroup=31}},
> stateHandle=ByteStreamStateHandle{handleName='(s3p:.../savepoints/flink-compression/.../savepoint-.../12345678-...',
> dataBytes=...}}.
> ```
> `FSDataInputStream in.delegate` in
> `org.apache.flink.runtime.state.OperatorStateRestoreOperation#deserializeOperatorStateValues`
> is an instance of `ByteStreamStateHandle(ByteStateHandleInputStream)`.
> This means that for every `offset` in `offsets = metaInfo.getOffsets()` we
> end up doing a read from a `byte[]`, which is much faster.
>
> At this point I don't understand how avoiding the `skip` operation can work
> in the uncompressed-state case, since `skip` is required for the partial
> reads. I apologise if I'm wrong; I don't have the same level of
> understanding as you have.
>
> What we considered doing was to find a way to cache the whole file as a
> `byte[]` and do the reads from memory ... but it seems state compression
> ends up doing the same. We are in the process of testing state compression
> under production volumes ... we can't say yet how that will work for us.
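>
> A minimal sketch of what we had in mind (not actual Flink code; it assumes
> Flink's `DataInputDeserializer`, reads the whole state file into memory
> once, and then "seeks" in the in-memory copy instead of on S3):
>
> ```
> import java.io.IOException;
> import java.io.InputStream;
> import java.util.List;
> import org.apache.flink.api.common.typeutils.TypeSerializer;
> import org.apache.flink.core.memory.DataInputDeserializer;
>
> public class InMemoryOffsetReads {
>     // Read the remote state file once, then deserialize every element from
>     // the in-memory copy instead of issuing one ranged GET per offset.
>     // Assumes the file fits in memory and the offsets fit in an int.
>     static <S> void deserializeFromMemory(
>             InputStream in, long[] offsets, TypeSerializer<S> serializer, List<S> out)
>             throws IOException {
>         byte[] data = in.readAllBytes(); // single full read of the file
>         DataInputDeserializer div = new DataInputDeserializer(data);
>         for (long offset : offsets) {
>             div.setPosition((int) offset); // in-memory "seek"
>             out.add(serializer.deserialize(div));
>         }
>     }
> }
> ```
>
> This trades memory for a single sequential read, which seems to be
> essentially what the compressed-state / `ByteStreamStateHandle` path
> already gives us.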
>
> Thank you again for looking into this. I'm looking forward to your
> thoughts. Please let me know if I missed or misunderstood something. Please
> let us know your recommendation.
>
> On Tue, Oct 15, 2024 at 8:35 AM Gabor Somogyi <gabor.g.somo...@gmail.com>
> wrote:
>
>> Hi William,
>>
>> It's a bit of an old question, but I think we now know why this is
>> happening. Please see [1] for further details.
>> It's important to use uncompressed state, because even with the fix,
>> compressed state is still problematic.
>>
>> We've already tested the PR under load, but if you can report back it
>> would be helpful.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-36530
>>
>> BR,
>> G
>>
>>
>> On Fri, Aug 16, 2024 at 11:25 AM William Wallace <
>> theanonymous31...@gmail.com> wrote:
>>
>>> Context
>>>
>>> We have recently upgraded from Flink 1.13.6 to Flink 1.19. We consume
>>> data from ~ 40k Kafka topic partitions in some environments. We are using
>>> aligned checkpoints. We set state.storage.fs.memory-threshold: 500kb.
>>>
>>> Problem
>>>
>>> When the state of the operator using topic-partition-offset-states no
>>> longer fits within state.storage.fs.memory-threshold, we end up with a
>>> proportionally high number of reads of the checkpoint and savepoint files,
>>> one for each of the topic partition offsets.
>>>
>>> For example when we have:
>>>
>>> {code}
>>>
>>> [14-Aug-2024 11:39:12.392 UTC] DEBUG
>>> org.apache.flink.runtime.state.TaskStateManagerImpl          [] - Operator
>>> 8992e27ae82755cac12dd37f518df782 has remote state
>>> SubtaskState{operatorStateFromBackend=StateObjectCollection{[OperatorStateHandle{stateNameToPartitionOffsets={
>>> SourceReaderState=StateMetaInfo{offsets=[234, 279, 324, 369, 414, 459,
>>> 504, 549 …(offsets is a list 40k elements)
>>>
>>> {code}
>>>
>>> For each of the metadata offsets we get a separate S3 read of the
>>> checkpoint/savepoint file, and the Flink job fails to resume from the
>>> checkpoint. With debug logs, we see hundreds of thousands of AWS GET calls
>>> for the same checkpoint file, each with a different offset. These AWS calls
>>> take so long that the application fails to start; the job crashes, starts
>>> the same reads again, and crashes again.
>>>
>>>
>>> We see requests like:
>>>
>>> {code}
>>>
>>> [14-Aug-2024 09:32:49.218 UTC] DEBUG com.amazonaws.request
>>>                           [] - Sending Request: GET
>>> https://s3-bucket-path/checkpoints/b2db0146e6afa2dabf138730580cc257/chk-370/45fd3560-8be5-4ca4-a7e9-8fe260140c18
>>> Headers: (amz-sdk-invocation-id:, Content-Type: application/octet-stream,
>>> Range: bytes=234-9223372036854775806, User-Agent:  ,
>>> cfg/retry-mode/legacy, presto, )
>>>
>>> [14-Aug-2024 11:39:12.476 UTC] DEBUG com.amazonaws.request
>>>                           [] - Sending Request: GET
>>> https://s3-bucket-path/savepoint-72ef07-0cbc29a9e0cd/a432a1ba-2275-4c14-abb3-c027cf1342e9
>>> Headers: (amz-sdk-invocation-id:, Content-Type: application/octet-stream,
>>> Range: bytes=234-9223372036854775806, User-Agent: ,
>>> cfg/retry-mode/legacy, presto, )
>>>
>>> [14-Aug-2024 09:32:49.286 UTC] DEBUG com.amazonaws.request
>>>                           [] - Sending Request: GET
>>> https://s3-bucket-path/checkpoints/b2db0146e6afa2dabf138730580cc257/chk-370/45fd3560-8be5-4ca4-a7e9-8fe260140c18
>>> Headers: (amz-sdk-invocation-id:, Content-Type: application/octet-stream,
>>> Range: bytes=279-9223372036854775806, User-Agent: , cfg/retry-mode/legacy,
>>> presto, )
>>>
>>> [14-Aug-2024 11:39:12.530 UTC] DEBUG com.amazonaws.request
>>>                           [] - Sending Request: GET
>>> https://s3-bucket-path/savepoint-72ef07-0cbc29a9e0cd/a432a1ba-2275-4c14-abb3-c027cf1342e9
>>> Headers: (amz-sdk-invocation-id:, Content-Type: application/octet-stream,
>>> Range: bytes=279-9223372036854775806, User-Agent: , cfg/retry-mode/legacy,
>>> presto, )
>>>
>>> {code}
>>>
>>> The code that does the multiple reads was isolated to:
>>>
>>> {code}
>>> org.apache.flink.runtime.state.OperatorStateRestoreOperation#deserializeOperatorStateValues
>>>
>>> private <S> void deserializeOperatorStateValues(
>>>         PartitionableListState<S> stateListForName,
>>>         FSDataInputStream in,
>>>         OperatorStateHandle.StateMetaInfo metaInfo)
>>>         throws IOException {
>>>     if (null != metaInfo) {
>>>         long[] offsets = metaInfo.getOffsets();
>>>         if (null != offsets) {
>>>             DataInputView div = new DataInputViewStreamWrapper(in);
>>>             TypeSerializer<S> serializer =
>>>                     stateListForName.getStateMetaInfo().getPartitionStateSerializer();
>>>             for (long offset : offsets) {
>>>                 // With an S3-backed stream, each seek results in a new
>>>                 // ranged GET (see the request logs above).
>>>                 in.seek(offset);
>>>                 stateListForName.add(serializer.deserialize(div));
>>>             }
>>>         }
>>>     }
>>> }
>>> {code}
>>>
>>> Questions:
>>>
>>>    1. Please review the behaviour above and advise whether this is
>>>    expected.
>>>
>>>    2. The reads for the topic-partition-offset-states are similar to:
>>>
>>> {code}
>>>
>>> Sending Request: GET
>>> https://s3-bucket-path/savepoint-72ef07-0cbc29a9e0cd/a432a1ba-2275-4c14-abb3-c027cf1342e9
>>> Headers: (amz-sdk-invocation-id:, Content-Type: application/octet-stream,
>>> Range: bytes=234-9223372036854775806
>>>
>>> Sending Request: GET
>>> https://s3-bucket-path/savepoint-72ef07-0cbc29a9e0cd/a432a1ba-2275-4c14-abb3-c027cf1342e9
>>> Headers: (amz-sdk-invocation-id:, Content-Type: application/octet-stream,
>>> Range: bytes=279-9223372036854775806
>>>
>>> And so on
>>>
>>> {code}
>>>
>>> Could this behaviour be optimised in Flink to reduce the number of reads
>>> and avoid reading the same data multiple times? Although the range start
>>> changes slightly across calls, the range end is always the same (max long),
>>> so effectively the rest of the file is requested each time. Do we need to
>>> retrieve each offset individually, or could this be optimised in Flink to
>>> make only one call and then use the data accordingly?
>>>
>>>    3. Currently we set state.storage.fs.memory-threshold: 700kb to avoid
>>>    the problem, but we expect the number of topic partitions to increase,
>>>    requiring a further increase of this value. What are our options once
>>>    we reach the 1MB limit?
>>>
>>>
>>> Thank you.
>>>
>>
