[ 
https://issues.apache.org/jira/browse/FLINK-36530?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Somogyi updated FLINK-36530:
----------------------------------
    Description: 
FLINK-34063 fixed an important issue with compressed state but introduced 
extremely slow state recovery from S3 for both uncompressed and compressed list 
states.

Short statement: restoring a ~6 MB list state generated by 
{code:java}
org.apache.flink.connector.file.sink.compactor.operator.CompactCoordinator{code}
takes ~62 hours.

Detailed analysis:
During file sink compaction, CompactCoordinator (running with parallelism 1) 
collects the list of files that need to be compacted and writes it into 
operator state. In the problematic scenario the list size was ~15k entries.
OperatorStateRestoreOperation.deserializeOperatorStateValues gets an offset for 
each and every list entry and does essentially the following:
{code:java}
// One seek and one deserialize per list entry (~15k entries in this case);
// 'div' is the DataInputView wrapper around 'in' (see the sketch below).
for (long offset : offsets) {
    in.seek(offset);
    stateListForName.add(serializer.deserialize(div));
}{code}
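For context, here is a simplified sketch of how these streams are wired up in 
OperatorStateRestoreOperation (variable names follow the snippet above; this is 
not the verbatim source):
{code:java}
// Sketch: 'in' decorates the raw state handle stream and 'div' wraps 'in',
// so every in.seek(offset) above goes through CompressibleFSDataInputStream.seek.
FSDataInputStream stateHandleIn = stateHandle.openInputStream();
CompressibleFSDataInputStream in =
        new CompressibleFSDataInputStream(stateHandleIn, compressionDecorator);
DataInputViewStreamWrapper div = new DataInputViewStreamWrapper(in);
{code}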
CompressibleFSDataInputStream.seek (added by FLINK-34063) contains the 
following code:
{code:java}
// Before seeking the underlying stream, drain whatever the decompressing
// delegate still has buffered so stale bytes are not read after the seek.
final int available = compressingDelegate.available();
if (available > 0) {
    if (available != compressingDelegate.skip(available)) {
        throw new IOException("Unable to skip buffered data.");
    }
}
{code}
There are two problems with this code:
 * The skip operation is not needed for uncompressed state
 * skip takes ~15 seconds per call for the ~6 MB state in case of S3, and with 
~15k entries (one seek per entry) that adds up to 15,000 × 15 s ≈ 62 hours of 
restore time

We've already addressed the first issue with a simple if condition, but the 
second is definitely a harder one. Until the latter is resolved, I would say 
that compressed state is not a good choice together with S3 and list state 
restore.
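
For reference, a minimal sketch of the if-condition fix (assuming a 
'compressed' flag derived from the StreamCompressionDecorator at construction 
time; this is not the verbatim patch):
{code:java}
// Sketch only: skip the drain entirely when the state is not compressed,
// so uncompressed restores never pay for it. The 'compressed' flag is
// assumed to be set from the StreamCompressionDecorator in the constructor.
@Override
public void seek(long desired) throws IOException {
    if (compressed) {
        final int available = compressingDelegate.available();
        if (available > 0) {
            if (available != compressingDelegate.skip(available)) {
                throw new IOException("Unable to skip buffered data.");
            }
        }
    }
    delegate.seek(desired);
}
{code}
This only removes the unnecessary work for uncompressed state; the per-seek 
skip cost on S3 for compressed state remains.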

Steps to reproduce:
 * Create a list operator state with several thousand entries (see the sketch 
below)
 * Write it to S3
 * Try to restore it with Flink
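
A minimal reproduction sketch, assuming Flink 1.x APIs and the S3 filesystem 
plugin on the classpath (class, job, and bucket names are illustrative):
{code:java}
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class ListStateRestoreRepro {

    /** Operator that snapshots a list operator state with ~15k entries. */
    public static class LargeListStateFunction extends RichFlatMapFunction<Long, Long>
            implements CheckpointedFunction {

        private transient ListState<String> fileList;

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            fileList = context.getOperatorStateStore()
                    .getListState(new ListStateDescriptor<>("files", Types.STRING));
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            // ~15k entries, roughly matching the problematic scenario
            List<String> entries = new ArrayList<>();
            for (int i = 0; i < 15_000; i++) {
                entries.add("s3://bucket/in-progress/part-file-" + i);
            }
            fileList.update(entries);
        }

        @Override
        public void flatMap(Long value, Collector<Long> out) {
            out.collect(value);
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10_000);
        // Checkpoints go to S3; snapshot compression exercises the slow seek path.
        env.getCheckpointConfig().setCheckpointStorage("s3://bucket/checkpoints");
        env.getConfig().setUseSnapshotCompression(true);

        env.fromSequence(0, Long.MAX_VALUE)
                .flatMap(new LargeListStateFunction())
                .setParallelism(1)
                .print();

        env.execute("FLINK-36530 repro");
        // Take a checkpoint/savepoint, then cancel and restore the job from it:
        // the restore of the ~15k-entry list state is where the slowdown shows up.
    }
}
{code}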


> Not able to restore list state from S3
> --------------------------------------
>
>                 Key: FLINK-36530
>                 URL: https://issues.apache.org/jira/browse/FLINK-36530
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 2.0.0, 1.18.1, 1.20.0, 1.19.1
>            Reporter: Gabor Somogyi
>            Assignee: Gabor Somogyi
>            Priority: Blocker
>              Labels: pull-request-available


