My recommendation is to cherry-pick this PR [1] on top of your Flink distro when possible. Additionally, turn off state compression. These two should do the trick...

[1] https://github.com/apache/flink/pull/25509
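For illustration, a minimal sketch of switching snapshot (state) compression off, assuming a standard DataStream job; the equivalent config-file key should be `execution.checkpointing.snapshot-compression` (verify against your Flink version):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DisableSnapshotCompression {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Snapshot compression is toggled on the ExecutionConfig; false is also the
        // default, so this line mainly makes the intent explicit.
        env.getConfig().setUseSnapshotCompression(false);

        // ... build the rest of the pipeline and call env.execute() as usual ...
    }
}
```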
G

On Tue, Oct 15, 2024 at 1:03 PM William Wallace <theanonymous31...@gmail.com> wrote:

> Thank you Gabor for your reply.
>
> I'm sharing more findings below for both uncompressed and compressed
> state, in the hope that it helps. I'm looking forward to your thoughts.
>
> 1. Uncompressed state - observe the `stateHandle=RelativeFileStateHandle`:
>
> ```
> org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation [] -
> Finished restoring from state handle:
> KeyGroupsSavepointStateHandle{groupRangeOffsets=KeyGroupRangeOffsets{keyGroupRange=KeyGroupRange{startKeyGroup=0,
> endKeyGroup=31}}, stateHandle=RelativeFileStateHandle State:
> s3p://.../savepoints/flink-no-compression/.../savepoint-.../12345678-...,
> 12345678-... [... bytes]}.
> ```
>
> `FSDataInputStream in.delegate` in
> `org.apache.flink.runtime.state.OperatorStateRestoreOperation#deserializeOperatorStateValues`
> is an instance of
> `ClosingFSDataInputStream(org.apache.flink.fs.s3presto.common.HadoopDataInputStream)`.
> For every `offset: offsets = metaInfo.getOffsets()` we end up doing an
> actual partial file read, which in our case is on the order of millions of
> reads because of high job parallelism (subtasks), and the job can't
> recover.
>
> 2. Compressed state - observe the `stateHandle=ByteStreamStateHandle`:
>
> ```
> org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation [] -
> Finished restoring from state handle:
> KeyGroupsSavepointStateHandle{groupRangeOffsets=KeyGroupRangeOffsets{keyGroupRange=KeyGroupRange{startKeyGroup=0,
> endKeyGroup=31}},
> stateHandle=ByteStreamStateHandle{handleName='(s3p:.../savepoints/flink-compression/.../savepoint-.../12345678-...',
> dataBytes=...}}.
> ```
>
> `FSDataInputStream in.delegate` in
> `org.apache.flink.runtime.state.OperatorStateRestoreOperation#deserializeOperatorStateValues`
> is an instance of `ByteStreamStateHandle(ByteStateHandleInputStream)`.
> This means that for every `offset: offsets = metaInfo.getOffsets()` we end
> up doing a read from a `byte[]`, which is faster.
>
> At this point I don't understand how not doing the `skip` operation in the
> case of uncompressed state can work, since skip is required for the
> partial reads. I apologise if I'm wrong; I don't have the same level of
> understanding as you have.
>
> What we considered doing was to find a way to actually cache the file as a
> byte[] and do the reads from memory ... but it seems state compression is
> doing the same. We are in the process of testing state compression under
> production volumes ... we can't say yet how that will actually work for us.
>
> Thank you again for looking into this. I'm looking forward to your
> thoughts. Please let me know if I missed or misunderstood something, and
> please let us know your recommendation.
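As an illustration of the caching idea described above, here is a minimal, hypothetical sketch using plain JDK classes (none of these names are Flink APIs): download the state file once into a byte[], then serve every per-offset read from the in-memory copy, so a seek becomes an index move instead of a ranged S3 GET.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

public class InMemoryOffsetReads {

    // One sequential remote read instead of one ranged GET per offset.
    static byte[] downloadOnce(InputStream remoteStateFile) throws IOException {
        return remoteStateFile.readAllBytes();
    }

    // Every per-offset read now happens against the buffered copy.
    static void readAtOffsets(byte[] stateBytes, long[] offsets) throws IOException {
        for (long offset : offsets) {
            ByteArrayInputStream in = new ByteArrayInputStream(stateBytes);
            if (in.skip(offset) != offset) { // cheap: no network call involved
                throw new IOException("Offset " + offset + " is beyond the buffered state");
            }
            DataInputStream element = new DataInputStream(in);
            // ... deserialize one state element from 'element' here ...
        }
    }
}
```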
> On Tue, Oct 15, 2024 at 8:35 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>
>> Hi William,
>>
>> It's a bit of an old question, but I think we now know why this is
>> happening. Please see [1] for further details. It's an important
>> requirement to use uncompressed state, because even with the fix,
>> compressed state is still problematic.
>>
>> We've already tested the PR under load, but if you can report back it
>> would be helpful.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-36530
>>
>> BR,
>> G
>>
>> On Fri, Aug 16, 2024 at 11:25 AM William Wallace <theanonymous31...@gmail.com> wrote:
>>
>>> Context
>>>
>>> We have recently upgraded from Flink 1.13.6 to Flink 1.19. We consume
>>> data from ~40k Kafka topic partitions in some environments. We are using
>>> aligned checkpoints. We set state.storage.fs.memory-threshold: 500kb.
>>>
>>> Problem
>>>
>>> When the state for an operator using topic-partition-offset-states does
>>> not fit within state.storage.fs.memory-threshold, we end up with a
>>> proportionally high number of reads of the checkpoint and savepoint
>>> files, one for each of the topic partition offsets.
>>>
>>> For example, when we have:
>>>
>>> {code}
>>> [14-Aug-2024 11:39:12.392 UTC] DEBUG org.apache.flink.runtime.state.TaskStateManagerImpl [] - Operator 8992e27ae82755cac12dd37f518df782 has remote state SubtaskState{operatorStateFromBackend=StateObjectCollection{[OperatorStateHandle{stateNameToPartitionOffsets={SourceReaderState=StateMetaInfo{offsets=[234, 279, 324, 369, 414, 459, 504, 549 ... (offsets is a list of 40k elements)
>>> {code}
>>>
>>> we get S3 reads for the checkpoint/savepoint for each of the metadata
>>> offsets, and the Flink job fails to resume from the checkpoint. With
>>> debug logs we see hundreds of thousands of AWS GET calls for the same
>>> checkpoint file, each with a different offset. These AWS calls take so
>>> long that our application fails to start; the job crashes, starts the
>>> same reads again, and crashes again.
>>>
>>> We will have:
>>>
>>> {code}
>>> [14-Aug-2024 09:32:49.218 UTC] DEBUG com.amazonaws.request [] - Sending Request: GET https://s3-bucket-path/checkpoints/b2db0146e6afa2dabf138730580cc257/chk-370/45fd3560-8be5-4ca4-a7e9-8fe260140c18 Headers: (amz-sdk-invocation-id:, Content-Type: application/octet-stream, Range: bytes=234-9223372036854775806, User-Agent: , cfg/retry-mode/legacy, presto, )
>>>
>>> [14-Aug-2024 11:39:12.476 UTC] DEBUG com.amazonaws.request [] - Sending Request: GET https://s3-bucket-path/savepoint-72ef07-0cbc29a9e0cd/a432a1ba-2275-4c14-abb3-c027cf1342e9 Headers: (amz-sdk-invocation-id:, Content-Type: application/octet-stream, Range: bytes=234-9223372036854775806, User-Agent: , cfg/retry-mode/legacy, presto, )
>>>
>>> [14-Aug-2024 09:32:49.286 UTC] DEBUG com.amazonaws.request [] - Sending Request: GET https://s3-bucket-path/checkpoints/b2db0146e6afa2dabf138730580cc257/chk-370/45fd3560-8be5-4ca4-a7e9-8fe260140c18 Headers: (amz-sdk-invocation-id:, Content-Type: application/octet-stream, Range: bytes=279-9223372036854775806, User-Agent: , cfg/retry-mode/legacy, presto, )
>>>
>>> [14-Aug-2024 11:39:12.530 UTC] DEBUG com.amazonaws.request [] - Sending Request: GET https://s3-bucket-path/savepoint-72ef07-0cbc29a9e0cd/a432a1ba-2275-4c14-abb3-c027cf1342e9 Headers: (amz-sdk-invocation-id:, Content-Type: application/octet-stream, Range: bytes=279-9223372036854775806, User-Agent: , cfg/retry-mode/legacy, presto, )
>>> {code}
>>>
>>> The code which does the multiple reads was isolated to:
>>>
>>> {code}
>>> org.apache.flink.runtime.state.OperatorStateRestoreOperation#deserializeOperatorStateValues
>>>
>>> private <S> void deserializeOperatorStateValues(
>>>         PartitionableListState<S> stateListForName,
>>>         FSDataInputStream in,
>>>         OperatorStateHandle.StateMetaInfo metaInfo)
>>>         throws IOException {
>>>     if (null != metaInfo) {
>>>         long[] offsets = metaInfo.getOffsets();
>>>         if (null != offsets) {
>>>             DataInputView div = new DataInputViewStreamWrapper(in);
>>>             TypeSerializer<S> serializer =
>>>                     stateListForName.getStateMetaInfo().getPartitionStateSerializer();
>>>             for (long offset : offsets) {
>>>                 in.seek(offset);
>>>                 stateListForName.add(serializer.deserialize(div));
>>>             }
>>>         }
>>>     }
>>> }
>>> {code}
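A rough back-of-the-envelope shows why the loop above cannot finish at this scale when each in.seek(offset) on the S3-backed stream results in its own ranged GET, as the request logs above indicate; the latency figure below is an assumption for illustration, not a measurement from this thread.

```java
public class RestoreLatencyEstimate {

    public static void main(String[] args) {
        long offsetsPerHandle = 40_000L;    // from the thread: the offsets list has ~40k entries
        double assumedGetLatencyMs = 50.0;  // assumed round trip per ranged S3 GET

        double seconds = offsetsPerHandle * assumedGetLatencyMs / 1000.0;

        // ~2000 s of pure request latency, before retries or deserialization,
        // which is consistent with restores timing out and the job crash-looping.
        System.out.println(seconds + " s spent on ranged GETs alone");
    }
}
```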
>>> Questions:
>>>
>>> 1. Please review the behaviour described above and advise whether it is
>>> expected.
>>>
>>> 2. The reads for the topic-partition-offset-states are similar to:
>>>
>>> {code}
>>> Sending Request: GET https://s3-bucket-path/savepoint-72ef07-0cbc29a9e0cd/a432a1ba-2275-4c14-abb3-c027cf1342e9 Headers: (amz-sdk-invocation-id:, Content-Type: application/octet-stream, Range: bytes=234-9223372036854775806
>>>
>>> Sending Request: GET https://s3-bucket-path/savepoint-72ef07-0cbc29a9e0cd/a432a1ba-2275-4c14-abb3-c027cf1342e9 Headers: (amz-sdk-invocation-id:, Content-Type: application/octet-stream, Range: bytes=279-9223372036854775806
>>>
>>> And so on
>>> {code}
>>>
>>> Could this behaviour be optimised in Flink to reduce the number of reads
>>> and avoid reading the same data multiple times? Although the range start
>>> changes slightly across calls, the range end is always the same (max
>>> long), so effectively the entire file is retrieved each time. Do we need
>>> to retrieve each offset individually, or could Flink be optimised to
>>> make only one call and then use the data accordingly?
>>>
>>> 3. Currently we set state.storage.fs.memory-threshold: 700kb to avoid
>>> the problem, but we expect the number of topic partitions to increase,
>>> requiring a further increase of this value. What are our options once we
>>> reach the 1MB limit?
>>>
>>> Thank you.
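For reference, a minimal sketch of how the threshold mentioned in question 3 can also be set programmatically; the key string and the 700kb value are the ones quoted in the thread, and the 1MB limit the question refers to still applies.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MemoryThresholdSetup {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Same option the thread sets in flink-conf.yaml: operator state smaller
        // than this threshold is kept inline in the checkpoint metadata instead of
        // being written as a separate file that must be read back with ranged GETs.
        conf.setString("state.storage.fs.memory-threshold", "700kb");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... build and execute the job ...
    }
}
```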