[ 
https://issues.apache.org/jira/browse/FLINK-34063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Morávek updated FLINK-34063:
----------------------------------
    Affects Version/s: 1.18.1

> When snapshot compression is enabled, rescaling of a source operator leads to 
> some splits getting lost
> ------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-34063
>                 URL: https://issues.apache.org/jira/browse/FLINK-34063
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.18.0, 1.18.1
>         Environment: Can be reproduced in any environment. The most important 
> thing is to enable snapshot compression.
>            Reporter: Ivan Burmistrov
>            Assignee: David Morávek
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.18.2
>
>         Attachments: image-2024-01-11-16-27-09-066.png, 
> image-2024-01-11-16-30-47-466.png
>
>
> h2. Backstory
> We've been experimenting with Autoscaling on Flink 1.18 and ran into a 
> pretty nasty bug.
> The symptoms on our production system were as follows. A while after 
> deploying a job with the autoscaler, it started accumulating Kafka lag, which 
> could only be observed via external lag measurement - from inside Flink 
> (measured by the
> {{_KafkaSourceReader_KafkaConsumer_records_lag_max_}} metric) the lag looked fine:
> !image-2024-01-11-16-27-09-066.png|width=887,height=263!
> After some digging, it turned out that the job had lost some Kafka partitions 
> - i.e. it stopped consuming from them and “forgot” about their existence. That’s 
> why everything looked fine from Flink’s perspective - the lag was growing on 
> partitions Flink no longer knew about.
> This was visible on a metric called “Assigned partitions” 
> (KafkaSourceReader_KafkaConsumer_assigned_partitions):
> !image-2024-01-11-16-30-47-466.png|width=1046,height=254!
> The chart shows that the job used to know about 20 partitions, and then 
> this number dropped to 16.
> The drop was quickly connected to the job’s scaling events - or, more 
> precisely, to the scaling of the source operator: with almost 100% 
> probability, any rescaling of the source operator led to partition loss.
> h2. Investigation
> We conducted an investigation. We use the latest Kubernetes operator and 
> deploy jobs in Native Kubernetes mode.
> The reproducing scenario we used for the investigation:
>  * Launch a job with source operator parallelism = 4 and DEBUG logging enabled
>  * Wait until it takes the first checkpoint
>  * Scale up the source operator to, say, 5 (no need to wait for autoscaling, 
> this can be done via the Flink UI)
>  * Wait until the next checkpoint is taken
>  * Scale down the source operator to 3
> These simple actions led, with almost 100% probability, to some partitions 
> getting lost.
> After that we downloaded all the logs and inspected them, and noticed these 
> strange records:
> {code:java}
> {"timestamp":1704415753166,"is_logging_enabled":"false","logger_id":"org.apache.flink.streaming.api.operators.AbstractStreamOperator","log_level":"INFO","message":"Restoring
>  state for 4 split(s) to reader.","service_name":"data-beaver"} 
> {"timestamp":1704415753166,"is_logging_enabled":"false","logger_id":"org.apache.flink.connector.base.source.reader.SourceReaderBase","log_level":"INFO","message":"Adding
>  split(s) to reader: 
> [
> [Partition: eventmesh-video-play-v1-6, StartingOffset: 1964306414, 
> StoppingOffset: -9223372036854775808], 
> [Partition: eventmesh-video-play-v1-19, StartingOffset: 1963002538, 
> StoppingOffset: -9223372036854775808], 
> [Partition: eventmesh-video-play-v1-6, StartingOffset: 1964306414, 
> StoppingOffset: -9223372036854775808], 
> [Partition: eventmesh-video-play-v1-19, StartingOffset: 1963002538, 
> StoppingOffset: -9223372036854775808]]", "service_name":"data-beaver"}{code}
> We see a task being restored with 4 splits, but the actual splits contain 
> duplicates - in reality only 2 unique partitions have been added 
> ({_}eventmesh-video-play-v1-6{_} and {_}eventmesh-video-play-v1-19{_}).
> Digging into the code and the logs a bit more, log lines like this started 
> looking suspicious:
>  
> {code:java}
> {"timestamp":1704415753165,"is_logging_enabled":"false","logger_id":"org.apache.flink.runtime.state.TaskStateManagerImpl","log_level":"DEBUG",
>  "message":"Operator 156a1ebbc1936f7d4558c8070b35ba93 has remote state 
> SubtaskState{operatorStateFromBackend=StateObjectCollection{ 
> [OperatorStateHandle{stateNameToPartitionOffsets={SourceReaderState=StateMetaInfo{offsets=[244,
>  244], distributionMode=SPLIT_DISTRIBUTE}}, 
> delegateStateHandle=ByteStreamStateHandle{handleName='gs://data-beaver/checkpoints/moj-tj-dummy-partition-loss-debug-v1/6e1ba15b1b5bedda64836ff48ed1c264/chk-3/fadb4f23-85dd-4048-b466-94c1c5329dd3',
>  dataBytes=328}}, 
> OperatorStateHandle{stateNameToPartitionOffsets={SourceReaderState=StateMetaInfo{offsets=[244,
>  244], distributionMode=SPLIT_DISTRIBUTE}}, 
> delegateStateHandle=ByteStreamStateHandle{handleName='gs://data-beaver/checkpoints/moj-tj-dummy-partition-loss-debug-v1/6e1ba15b1b5bedda64836ff48ed1c264/chk-3/102aa50b-78c2-457e-9a2f-0055f1dbeb98',
>  dataBytes=328}}]}, operatorStateFromStream=StateObjectCollection{[]}, 
> keyedStateFromBackend=StateObjectCollection{[]}, 
> keyedStateFromStream=StateObjectCollection{[]}, 
> inputChannelState=StateObjectCollection{[]}, 
> resultSubpartitionState=StateObjectCollection{[]}, stateSize=656, 
> checkpointedSize=656} from job manager and local state alternatives [] from 
> local state store 
> org.apache.flink.runtime.state.NoOpTaskLocalStateStoreImpl@1f89f054.", 
> "service_name":"data-beaver"}{code}
>  
> Note the suspicious *offsets=[244, 244]* in the metadata.
> This is clearly wrong, because when restoring from a snapshot, [this 
> code|https://github.com/apache/flink/blob/881062f352f8bf8c21ab7cbea95e111fd82fdf20/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java#L350]
>  redistributes the offsets into different batches - and since the offsets are 
> all equal, those batches end up reading the same value.
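> To make the consequence concrete, here is a toy, self-contained sketch of that 
> effect (deliberately not using Flink classes; the element names and byte 
> positions are made up for illustration): each restored batch seeks to "its" 
> offset, but since all recorded offsets are equal, every batch deserializes the 
> first element, and the elements behind it are never restored.
> {code:java}
> import java.util.Arrays;
> 
> public class EqualOffsetsToy {
> 
>     public static void main(String[] args) {
>         // What one state handle actually serialized, in order (toy values).
>         String[] elementsInStream = {"split: partition-6", "split: partition-19"};
>         // Positions at which those elements start. Correct metadata would hold
>         // two distinct positions; with compression on, both were recorded as 244.
>         long[] correctOffsets = {244L, 572L};
>         long[] recordedOffsets = {244L, 244L};
> 
>         String[] good = restore(elementsInStream, correctOffsets, correctOffsets);
>         String[] bad = restore(elementsInStream, correctOffsets, recordedOffsets);
>         System.out.println("correct : " + Arrays.toString(good));
>         System.out.println("recorded: " + Arrays.toString(bad));
>         // correct : [split: partition-6, split: partition-19]
>         // recorded: [split: partition-6, split: partition-6]  <- duplicate; partition-19 is lost
>     }
> 
>     // Toy "restore": seek to each requested offset and read the element found there.
>     private static String[] restore(String[] elements, long[] starts, long[] offsets) {
>         String[] restored = new String[offsets.length];
>         for (int i = 0; i < offsets.length; i++) {
>             restored[i] = elements[elementIndexAt(starts, offsets[i])];
>         }
>         return restored;
>     }
> 
>     // Index of the last element that starts at or before the given stream position.
>     private static int elementIndexAt(long[] starts, long position) {
>         int index = 0;
>         for (int i = 0; i < starts.length; i++) {
>             if (starts[i] <= position) {
>                 index = i;
>             }
>         }
>         return index;
>     }
> }{code}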
> These offsets are produced by 
> [this|https://github.com/apache/flink/blob/263f3283724a5081e41f679659fa6a5819350739/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionableListState.java#L110]
>  code:
> {code:java}
> public long[] write(FSDataOutputStream out) throws IOException {
>       long[] partitionOffsets = new long[internalList.size()];
>       DataOutputView dov = new DataOutputViewStreamWrapper(out);
>       for (int i = 0; i < internalList.size(); ++i) {
>           S element = internalList.get(i);
>           partitionOffsets[i] = out.getPos();
>           getStateMetaInfo().getPartitionStateSerializer().serialize(element, 
> dov);
>       }
>       return partitionOffsets;
> } {code}
> The actual implementation used in this piece of code is 
> [CompressibleFSDataOutputStream|https://github.com/apache/flink/blob/263f3283724a5081e41f679659fa6a5819350739/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompressibleFSDataOutputStream.java#L30].
> At this point we realised that we have snapshot compression enabled 
> (execution.checkpointing.snapshot-compression = true).
> If we look at how getPos() is implemented in 
> CompressibleFSDataOutputStream, we see that getPos() is delegated to the 
> actual output stream, while writing goes through the compressing delegate:
>  
> {code:java}
> public CompressibleFSDataOutputStream(
>             CheckpointStateOutputStream delegate, StreamCompressionDecorator 
> compressionDecorator)
>             throws IOException {
>         this.delegate = delegate;
>         this.compressingDelegate = 
> compressionDecorator.decorateWithCompression(delegate);
>     }
>     @Override
>     public long getPos() throws IOException {
>         return delegate.getPos();
>     }
>     @Override
>     public void write(int b) throws IOException {
>         compressingDelegate.write(b);
>     } {code}
> This is incorrect when compression is enabled, because the compressing delegate 
> doesn't flush data into the actual output stream immediately 
> ([link|https://github.com/xerial/snappy-java/blob/ebfbdead182937463735729bd8fe5f4cd69235e4/src/main/java/org/xerial/snappy/SnappyFramedOutputStream.java#L279]):
> {code:java}
> @Override
> public void write(int b)
>         throws IOException
> {
>    if (closed) {
>        throw new IOException("Stream is closed");
>    }
>    if (buffer.remaining() <= 0) {
>      flushBuffer();
>    }
>    buffer.put((byte) b);
> } {code}
> Hence, the position in the _delegate_ doesn't get updated, and all offsets 
> end up being the same.
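> The root cause can be demonstrated with plain JDK classes, without Flink or 
> Snappy on the classpath. The sketch below follows the same pattern as 
> PartitionableListState#write - record the position of the underlying stream, 
> then write the element through a decorator - with a BufferedOutputStream 
> standing in for the compressing delegate, since both hold written data back 
> instead of forwarding it immediately. Every recorded offset comes out the same 
> because nothing reaches the underlying stream between the writes.
> {code:java}
> import java.io.BufferedOutputStream;
> import java.io.ByteArrayOutputStream;
> import java.io.IOException;
> import java.util.Arrays;
> 
> public class StalePositionDemo {
> 
>     public static void main(String[] args) throws IOException {
>         // Stand-in for the checkpoint output stream whose position is queried.
>         ByteArrayOutputStream delegate = new ByteArrayOutputStream();
>         // Stand-in for the compressing delegate: buffers writes instead of
>         // forwarding them to the underlying stream right away.
>         BufferedOutputStream decorated = new BufferedOutputStream(delegate, 64 * 1024);
> 
>         byte[][] elements = {
>                 "partition-6".getBytes(), "partition-19".getBytes(), "partition-3".getBytes()
>         };
> 
>         long[] partitionOffsets = new long[elements.length];
>         for (int i = 0; i < elements.length; i++) {
>             // Same pattern as PartitionableListState#write: position first, write second.
>             partitionOffsets[i] = delegate.size(); // the "getPos()" of the underlying stream
>             decorated.write(elements[i]);
>         }
>         decorated.flush();
> 
>         // Prints [0, 0, 0]: the buffered data never reached the delegate between
>         // writes, so every element was recorded at the same offset.
>         System.out.println(Arrays.toString(partitionOffsets));
>     }
> }{code}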
> h2. Simplest reproducing scenario
> Now that we know the culprit, a simple (verified) reproducing scenario that 
> can easily be checked locally is the following (a minimal job sketch follows the list):
>  * Create a Kafka topic with, say, 20 partitions
>  * Launch a job reading from this topic with some parallelism, say 5. 
> *Important: snapshot compression must be enabled in this job*
>  * Stop the job with a savepoint
>  * Restore the job from this savepoint and pick a different parallelism, say 
> 3.
>  * Result: some Kafka partitions will not be consumed anymore.
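> For completeness, a minimal job that can drive the scenario above might look 
> like the sketch below. The broker address, topic name, group id and parallelism 
> are placeholders; the only essential piece is enabling 
> execution.checkpointing.snapshot-compression. The sketch only sets up the job - 
> the loss shows up after stopping it with a savepoint and restoring at a lower 
> parallelism, as described above.
> {code:java}
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.connector.kafka.source.KafkaSource;
> import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> 
> public class SnapshotCompressionRescaleRepro {
> 
>     public static void main(String[] args) throws Exception {
>         Configuration conf = new Configuration();
>         // The setting that triggers the bug.
>         conf.setString("execution.checkpointing.snapshot-compression", "true");
> 
>         StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment(conf);
>         env.setParallelism(5);            // initial source parallelism from the scenario
>         env.enableCheckpointing(10_000L); // make sure source state gets snapshotted
> 
>         KafkaSource<String> source =
>                 KafkaSource.<String>builder()
>                         .setBootstrapServers("localhost:9092")       // placeholder broker
>                         .setTopics("repro-topic-with-20-partitions") // placeholder topic
>                         .setGroupId("snapshot-compression-repro")
>                         .setStartingOffsets(OffsetsInitializer.earliest())
>                         .setValueOnlyDeserializer(new SimpleStringSchema())
>                         .build();
> 
>         env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
>                 .print();
> 
>         env.execute("snapshot-compression rescale repro");
>         // Then: stop with a savepoint, restore with parallelism 3, and compare the
>         // KafkaSourceReader_KafkaConsumer_assigned_partitions metric before and after.
>     }
> }{code}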
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
