Ivan Burmistrov created FLINK-34063:
---------------------------------------

             Summary: When snapshot compression is enabled, rescaling of a 
source operator leads to some splits getting lost
                 Key: FLINK-34063
                 URL: https://issues.apache.org/jira/browse/FLINK-34063
             Project: Flink
          Issue Type: Bug
         Environment: Can be reproduced in any environment. The most important 
thing is to enable snapshot compression.
            Reporter: Ivan Burmistrov
         Attachments: image-2024-01-11-16-27-09-066.png, 
image-2024-01-11-16-30-47-466.png

h2. Backstory

We've been experimenting with Autoscaling on Flink 1.18 and ran into a pretty 
nasty bug.

The symptoms on our production system were as follows. Some time after 
deploying a job with the autoscaler, it started accumulating Kafka lag, and this 
could only be observed via external lag measurement - from inside Flink 
(measured by the
{{_KafkaSourceReader_KafkaConsumer_records_lag_max_}} metric) the lag looked fine:
!image-2024-01-11-16-27-09-066.png|width=887,height=263!

After some digging, it turned out that the job had lost some Kafka partitions - 
i.e. it had stopped consuming from them and “forgotten” about their existence. 
That’s why everything looked fine from Flink’s perspective - the lag was growing 
on the partitions Flink no longer knew about.

This was visible in the “Assigned partitions” metric 
(KafkaSourceReader_KafkaConsumer_assigned_partitions):
!image-2024-01-11-16-30-47-466.png|width=1046,height=254!


The chart shows that the job used to know about 20 partitions, and then this 
number dropped to 16.

This drop was quickly linked to the job’s scaling events - or, more precisely, 
to rescaling of the source operator: with almost 100% probability, any rescaling 
of the source operator led to partition loss.
h2. Investigation

We conducted an investigation. We use the latest Kubernetes operator and 
deploy jobs with Native Kubernetes.

The reproducing scenario we used for investigation:
 * Launch a job with source operator parallelism = 4 and DEBUG logging enabled
 * Wait until it takes its first checkpoint
 * Scale the source operator up to, say, 5 (no need to wait for autoscaling, it 
can be done via the Flink UI)
 * Wait until a new checkpoint is taken
 * Scale the source operator down to 3

These simple steps led, with almost 100% probability, to some partitions getting 
lost.

After that we downloaded all the logs and inspected them, and noticed these 
strange records:


{code:java}
{"timestamp":1704415753166,"is_logging_enabled":"false","logger_id":"org.apache.flink.streaming.api.operators.AbstractStreamOperator","log_level":"INFO","message":"Restoring
 state for 4 split(s) to reader.","service_name":"data-beaver"} 
{"timestamp":1704415753166,"is_logging_enabled":"false","logger_id":"org.apache.flink.connector.base.source.reader.SourceReaderBase","log_level":"INFO","message":"Adding
 split(s) to reader: 
[
[Partition: eventmesh-video-play-v1-6, StartingOffset: 1964306414, 
StoppingOffset: -9223372036854775808], 
[Partition: eventmesh-video-play-v1-19, StartingOffset: 1963002538, 
StoppingOffset: -9223372036854775808], 
[Partition: eventmesh-video-play-v1-6, StartingOffset: 1964306414, 
StoppingOffset: -9223372036854775808], 
[Partition: eventmesh-video-play-v1-19, StartingOffset: 1963002538, 
StoppingOffset: -9223372036854775808]]", "service_name":"data-beaver"}{code}
We see that a task is being restored with 4 splits; however, the splits contain 
duplicates - in reality only 2 unique partitions have been added 
({_}eventmesh-video-play-v1-6{_} and {_}eventmesh-video-play-v1-19{_}).

Digging into the code and the logs a bit more, log lines like the following 
started looking suspicious:

{code:java}
{"timestamp":1704415753165,"is_logging_enabled":"false","logger_id":"org.apache.flink.runtime.state.TaskStateManagerImpl","log_level":"DEBUG",
 "message":"Operator 156a1ebbc1936f7d4558c8070b35ba93 has remote state 
SubtaskState{operatorStateFromBackend=StateObjectCollection{ 
[OperatorStateHandle{stateNameToPartitionOffsets={SourceReaderState=StateMetaInfo{offsets=[244,
 244], distributionMode=SPLIT_DISTRIBUTE}}, 
delegateStateHandle=ByteStreamStateHandle{handleName='gs://data-beaver/checkpoints/moj-tj-dummy-partition-loss-debug-v1/6e1ba15b1b5bedda64836ff48ed1c264/chk-3/fadb4f23-85dd-4048-b466-94c1c5329dd3',
 dataBytes=328}}, 
OperatorStateHandle{stateNameToPartitionOffsets={SourceReaderState=StateMetaInfo{offsets=[244,
 244], distributionMode=SPLIT_DISTRIBUTE}}, 
delegateStateHandle=ByteStreamStateHandle{handleName='gs://data-beaver/checkpoints/moj-tj-dummy-partition-loss-debug-v1/6e1ba15b1b5bedda64836ff48ed1c264/chk-3/102aa50b-78c2-457e-9a2f-0055f1dbeb98',
 dataBytes=328}}]}, operatorStateFromStream=StateObjectCollection{[]}, 
keyedStateFromBackend=StateObjectCollection{[]}, 
keyedStateFromStream=StateObjectCollection{[]}, 
inputChannelState=StateObjectCollection{[]}, 
resultSubpartitionState=StateObjectCollection{[]}, stateSize=656, 
checkpointedSize=656} from job manager and local state alternatives [] from 
local state store 
org.apache.flink.runtime.state.NoOpTaskLocalStateStoreImpl@1f89f054.", 
"service_name":"data-beaver"}{code}
 

We see the suspicious *offsets=[244, 244]* - two state partitions recorded at 
exactly the same stream position.

And this is clearly wrong. When restoring from a snapshot, [this 
code|https://github.com/apache/flink/blob/881062f352f8bf8c21ab7cbea95e111fd82fdf20/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java#L350]
 will redistribute these offsets into different batches - and both batches will 
read the same value.
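To illustrate what duplicated offsets do on restore, here is a small standalone sketch in plain Java (not Flink code, purely illustrative): when two state partitions carry the same offset, every reader seeks to the same position and deserializes the same split - which matches the duplicated partitions in the reader log above.
{code:java}
import java.io.*;
import java.util.ArrayList;
import java.util.List;

public class DuplicateOffsetDemo {
    public static void main(String[] args) throws IOException {
        // Serialize two "splits" into one blob, remembering the correct offsets.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        String[] splits = {"eventmesh-video-play-v1-6", "eventmesh-video-play-v1-19"};
        long[] correctOffsets = new long[splits.length];
        for (int i = 0; i < splits.length; i++) {
            correctOffsets[i] = bos.size(); // position before serializing element i
            out.writeUTF(splits[i]);
        }
        byte[] blob = bos.toByteArray();

        // Offsets as produced by the bug: both entries point at the same position.
        long[] duplicatedOffsets = {correctOffsets[0], correctOffsets[0]};

        System.out.println("correct:    " + restore(blob, correctOffsets));    // both splits
        System.out.println("duplicated: " + restore(blob, duplicatedOffsets)); // same split twice
    }

    private static List<String> restore(byte[] blob, long[] offsets) throws IOException {
        List<String> result = new ArrayList<>();
        for (long offset : offsets) {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(blob));
            in.skipBytes((int) offset); // "seek" to the recorded offset
            result.add(in.readUTF());
        }
        return result;
    }
} {code}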

These offsets are produced by 
[this|https://github.com/apache/flink/blob/263f3283724a5081e41f679659fa6a5819350739/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionableListState.java#L110]
 code:

{code:java}
public long[] write(FSDataOutputStream out) throws IOException {

    long[] partitionOffsets = new long[internalList.size()];

    DataOutputView dov = new DataOutputViewStreamWrapper(out);

    for (int i = 0; i < internalList.size(); ++i) {
        S element = internalList.get(i);
        // record the stream position before serializing each element
        partitionOffsets[i] = out.getPos();
        getStateMetaInfo().getPartitionStateSerializer().serialize(element, dov);
    }

    return partitionOffsets;
} {code}
The actual implementation that’s being used in this piece of code is 
[CompressibleFSDataOutputStream|https://github.com/apache/flink/blob/263f3283724a5081e41f679659fa6a5819350739/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompressibleFSDataOutputStream.java#L30].

At this point we realised that we have snapshot compression enabled 
({{execution.checkpointing.snapshot-compression = true}}).
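For reference, the same option can also be set programmatically; a minimal sketch, assuming the ExecutionConfig setter available in Flink 1.18:
{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnableSnapshotCompression {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // equivalent to execution.checkpointing.snapshot-compression: true in flink-conf.yaml
        env.getConfig().setUseSnapshotCompression(true);
        env.enableCheckpointing(10_000); // compression applies to snapshotted operator state
        // ... build and execute the pipeline as usual
    }
}{code}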

If we look at how getPos() is implemented in CompressibleFSDataOutputStream, we 
see that getPos() is delegated to the actual output stream, while writes go 
through the compressing delegate:

{code:java}
public CompressibleFSDataOutputStream(
        CheckpointStateOutputStream delegate, StreamCompressionDecorator compressionDecorator)
        throws IOException {
    this.delegate = delegate;
    this.compressingDelegate = compressionDecorator.decorateWithCompression(delegate);
}

@Override
public long getPos() throws IOException {
    // position is taken from the raw, uncompressed delegate
    return delegate.getPos();
}

@Override
public void write(int b) throws IOException {
    // but the bytes are written through the compressing delegate
    compressingDelegate.write(b);
} {code}
This is incorrect when compression is enabled, because the compressing delegate 
doesn't flush data into the actual output stream immediately 
([link|https://github.com/xerial/snappy-java/blob/ebfbdead182937463735729bd8fe5f4cd69235e4/src/main/java/org/xerial/snappy/SnappyFramedOutputStream.java#L279]):
{code:java}
@Override
public void write(int b)
        throws IOException
{
    if (closed) {
        throw new IOException("Stream is closed");
    }
    if (buffer.remaining() <= 0) {
        flushBuffer();
    }
    buffer.put((byte) b);
} {code}
Hence, the position in the _delegate_ doesn't get updated, and all offsets end 
up being the same.
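The effect can be reproduced outside Flink in a few lines of plain Java (a BufferedOutputStream stands in for the compressing delegate here - an assumption purely for illustration): positions are read from the raw delegate while the written bytes are still sitting in the wrapper's buffer, so every recorded offset collapses to the same value.
{code:java}
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;

public class OffsetSkewDemo {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream delegate = new ByteArrayOutputStream();
        // Stand-in for the compressing delegate: data stays buffered until flushed.
        BufferedOutputStream compressingDelegate = new BufferedOutputStream(delegate, 1024);

        long[] partitionOffsets = new long[3];
        for (int i = 0; i < 3; i++) {
            partitionOffsets[i] = delegate.size();     // "getPos()" of the raw delegate
            compressingDelegate.write(new byte[100]);  // element bytes stay in the buffer
        }
        compressingDelegate.flush();

        // Prints [0, 0, 0] - all recorded offsets collapse to the same value.
        System.out.println(Arrays.toString(partitionOffsets));
    }
}{code}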
h2. Simplest reproducing scenario

Now that we know the culprit, a simple (verified) reproducing scenario that can 
easily be checked locally is the following (a minimal job sketch follows the list):
 * Create a Kafka topic with, say, 20 partitions
 * Launch a job reading from this topic with some parallelism, say 5. 
*Important: snapshot compression must be enabled in this job*
 * Stop the job with savepoint
 * Restore the job from this savepoint and pick a different parallelism, say 3.
 * Result: some Kafka partitions will not be consumed anymore.
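For completeness, a minimal sketch of such a job (topic name, bootstrap servers and group id are placeholders; the Kafka connector is assumed to be on the classpath):
{code:java}
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PartitionLossRepro {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(5);                           // initial parallelism
        env.enableCheckpointing(10_000);
        env.getConfig().setUseSnapshotCompression(true); // the important part

        KafkaSource<String> source =
                KafkaSource.<String>builder()
                        .setBootstrapServers("localhost:9092")  // placeholder
                        .setTopics("topic-with-20-partitions")  // placeholder
                        .setGroupId("partition-loss-repro")     // placeholder
                        .setStartingOffsets(OffsetsInitializer.earliest())
                        .setValueOnlyDeserializer(new SimpleStringSchema())
                        .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source").print();

        env.execute("partition-loss-repro");
        // Then: stop with a savepoint, restart from it with parallelism 3, and compare
        // the KafkaSourceReader_KafkaConsumer_assigned_partitions metric before/after.
    }
}{code}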

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
