[ https://issues.apache.org/jira/browse/FLINK-36530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17890860#comment-17890860 ]
Gabor Somogyi edited comment on FLINK-36530 at 10/18/24 1:45 PM:
-----------------------------------------------------------------

[{{94c3b86}}|https://github.com/apache/flink/commit/94c3b86a368d545a0aa3ff6b5c42f6f8ec3e11de] on master
[{{60cba35}}|https://github.com/apache/flink/commit/60cba350d7638592ea771dc7cf512798e6248886] on release-1.20
[{{21f79d1}}|https://github.com/apache/flink/commit/21f79d1e0a6a8dbadff8cad1e7785610572b191f] on release-1.19
[{{a38396f}}|https://github.com/apache/flink/commit/a38396fbceaa88992103b79feff71acd7b83e54b] on release-1.18


was (Author: gaborgsomogyi):
    [{{94c3b86}}|https://github.com/apache/flink/commit/94c3b86a368d545a0aa3ff6b5c42f6f8ec3e11de] on master

> Not able to restore list state from S3
> --------------------------------------
>
>                 Key: FLINK-36530
>                 URL: https://issues.apache.org/jira/browse/FLINK-36530
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 2.0.0, 1.18.2, 1.20.0, 1.19.1
>            Reporter: Gabor Somogyi
>            Assignee: Gabor Somogyi
>            Priority: Blocker
>              Labels: pull-request-available
>
> FLINK-34063 fixed an important issue with compressed state but introduced
> extremely slow state recovery for both uncompressed and compressed list states
> from S3.
> Short statement: restoring a ~6 MB list state generated from
> {code:java}
> org.apache.flink.connector.file.sink.compactor.operator.CompactCoordinator{code}
> takes ~62 hours.
> Detailed analysis:
> During file sink compaction, CompactCoordinator (with parallelism 1)
> collects the list of files that need to be compacted and writes them into
> the state. In the problematic scenario the list size was ~15k entries.
> OperatorStateRestoreOperation.deserializeOperatorStateValues gets an offset
> for each and every list entry and does basically the following:
> {code:java}
> for (long offset : offsets) {
>     in.seek(offset);
>     stateListForName.add(serializer.deserialize(div));
> }
> {code}
> FLINK-34063 added the following code to CompressibleFSDataInputStream.seek:
> {code:java}
> final int available = compressingDelegate.available();
> if (available > 0) {
>     if (available != compressingDelegate.skip(available)) {
>         throw new IOException("Unable to skip buffered data.");
>     }
> }
> {code}
> There are 2 problems with this code:
> * The skip operation is not needed for uncompressed state
> * skip takes ~15 seconds for ~6 MB in case of S3 (with one seek per entry
> and ~15k entries, this adds up to ~62 hours of restore time)
> We've already addressed the first issue with a simple if condition, but the
> second is definitely a harder one. Until the latter is resolved, I would
> say that compressed state is not a good choice together with S3 and list
> state restore.
> Steps to reproduce:
> * Create a list operator state with several thousand entries
> * Store it in S3
> * Try to restore it from Flink
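
To make the cost described above concrete, here is a minimal, self-contained sketch. It is not the Flink code and not the committed fix: SeekSkipSketch, CountingSeekableStream, restore and estimatedHours are hypothetical names, the in-memory stream only stands in for the S3-backed delegate, and guarding the skip with a "compressed" flag is an assumption about what the "simple if condition" looks like. It counts how many bytes seek() drains during a per-entry restore loop and redoes the restore-time arithmetic from the figures in the description (~15k entries, ~15 seconds per skip).

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

/** Hypothetical sketch; none of these names exist in Flink. */
final class SeekSkipSketch {

    /** In-memory stand-in for a seekable state stream that counts drained bytes. */
    static final class CountingSeekableStream {
        private final byte[] data;
        private final boolean compressed;
        private InputStream delegate;
        long skippedBytes = 0;

        CountingSeekableStream(byte[] data, boolean compressed) {
            this.data = data;
            this.compressed = compressed;
            this.delegate = new ByteArrayInputStream(data);
        }

        void seek(int offset) throws IOException {
            // Assumed guard: only drain the delegate when the state is compressed.
            if (compressed) {
                final int available = delegate.available();
                if (available > 0) {
                    if (available != delegate.skip(available)) {
                        throw new IOException("Unable to skip buffered data.");
                    }
                    skippedBytes += available;
                }
            }
            // Reposition by reopening the in-memory data at the requested offset.
            delegate = new ByteArrayInputStream(data, offset, data.length - offset);
        }

        int read() throws IOException {
            return delegate.read();
        }
    }

    /** Mimics the per-entry restore loop: one seek plus one read per offset. */
    static long restore(byte[] data, int[] offsets, boolean compressed) {
        try {
            CountingSeekableStream in = new CountingSeekableStream(data, compressed);
            for (int offset : offsets) {
                in.seek(offset);
                in.read(); // stands in for serializer.deserialize(...)
            }
            return in.skippedBytes;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Restore time in hours if every seek costs secondsPerSeek. */
    static double estimatedHours(long entries, double secondsPerSeek) {
        return entries * secondsPerSeek / 3600.0;
    }
}
```

With a 100-byte array and offsets {0, 10, 20}, restore(...) drains 288 bytes when compressed is true and 0 bytes when it is false, which is the effect the guard is meant to have for uncompressed state. estimatedHours(15_000, 15.0) gives 62.5, matching the ~62 hours reported above. For compressed state the drain still happens on every seek, so the sketch does not address the second, harder problem.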