Ivan Burmistrov created FLINK-34063:
---------------------------------------
Summary: When snapshot compression is enabled, rescaling of a source operator leads to some splits getting lost
Key: FLINK-34063
URL: https://issues.apache.org/jira/browse/FLINK-34063
Project: Flink
Issue Type: Bug
Environment: Can be reproduced in any environment. The most important thing is to enable snapshot compression.
Reporter: Ivan Burmistrov
Attachments: image-2024-01-11-16-27-09-066.png, image-2024-01-11-16-30-47-466.png

h2. Backstory

We've been experimenting with Autoscaling on Flink 1.18 and ran into a pretty nasty bug. The symptoms on our production system were as follows. A while after deploying a job with the autoscaler, it started accumulating Kafka lag, and this could only be observed via external lag measurement - from inside Flink (measured by the {{_KafkaSourceReader_KafkaConsumer_records_lag_max_}} metric) the lag looked fine:

!image-2024-01-11-16-27-09-066.png|width=887,height=263!

After some digging, it turned out that the job had lost some Kafka partitions - i.e. it stopped consuming from them and "forgot" about their existence. That's why everything looked fine from Flink's perspective - the lag was growing on partitions Flink no longer knew about. This was visible in the "Assigned partitions" metric ({{KafkaSourceReader_KafkaConsumer_assigned_partitions}}):

!image-2024-01-11-16-30-47-466.png|width=1046,height=254!

The chart shows that the job used to know about 20 partitions, and then this number dropped to 16. The drop was quickly connected to the job's scaling events. More precisely, to the scaling of the source operator - with almost 100% probability, any scaling of the source operator led to partition loss.

h2. Investigation

We conducted an investigation. We use the latest Kubernetes operator and deploy jobs with Native Kubernetes. The reproducing scenario we used for the investigation:
* Launch a job with source operator parallelism = 4 and DEBUG logging enabled
* Wait until it takes the first checkpoint
* Scale the source operator up to, say, 5 (no need to wait for autoscaling, it can be done via the Flink UI)
* Wait until a new checkpoint is taken
* Scale the source operator down to 3

These simple steps led, with almost 100% probability, to some partitions getting lost. After that we downloaded all the logs and inspected them, and noticed these strange records:
{code:java}
{"timestamp":1704415753166,"is_logging_enabled":"false","logger_id":"org.apache.flink.streaming.api.operators.AbstractStreamOperator","log_level":"INFO","message":"Restoring state for 4 split(s) to reader.","service_name":"data-beaver"}
{"timestamp":1704415753166,"is_logging_enabled":"false","logger_id":"org.apache.flink.connector.base.source.reader.SourceReaderBase","log_level":"INFO","message":"Adding split(s) to reader: [ [Partition: eventmesh-video-play-v1-6, StartingOffset: 1964306414, StoppingOffset: -9223372036854775808], [Partition: eventmesh-video-play-v1-19, StartingOffset: 1963002538, StoppingOffset: -9223372036854775808], [Partition: eventmesh-video-play-v1-6, StartingOffset: 1964306414, StoppingOffset: -9223372036854775808], [Partition: eventmesh-video-play-v1-19, StartingOffset: 1963002538, StoppingOffset: -9223372036854775808]]", "service_name":"data-beaver"}{code}
We see that a task is restored with 4 splits, but the splits contain duplicates - in reality only 2 unique partitions were added ({_}eventmesh-video-play-v1-6{_} and {_}eventmesh-video-play-v1-19{_}). In other words, some splits were restored more than once while others were not restored anywhere.
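To make the duplication concrete, here is a tiny self-contained sketch (not Flink code; the class name is made up, and it just replays the split list from the log record above):
{code:java}
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Replays the "Adding split(s) to reader" list from the log above and counts
// how many distinct partitions were actually handed to the reader.
public class RestoredSplitsCheck {
    public static void main(String[] args) {
        List<String> restored = List.of(
                "eventmesh-video-play-v1-6",
                "eventmesh-video-play-v1-19",
                "eventmesh-video-play-v1-6",
                "eventmesh-video-play-v1-19");
        Set<String> unique = new HashSet<>(restored);
        System.out.printf("restored=%d, unique=%d%n", restored.size(), unique.size());
        // Prints "restored=4, unique=2": two splits occupy four slots, so the splits
        // that should have filled those slots never reach any reader and their
        // partitions stop being consumed.
    }
}
{code}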
Digging into the code and the logs a bit more, log lines like this started looking suspicious:
{code:java}
{"timestamp":1704415753165,"is_logging_enabled":"false","logger_id":"org.apache.flink.runtime.state.TaskStateManagerImpl","log_level":"DEBUG", "message":"Operator 156a1ebbc1936f7d4558c8070b35ba93 has remote state SubtaskState{operatorStateFromBackend=StateObjectCollection{ [OperatorStateHandle{stateNameToPartitionOffsets={SourceReaderState=StateMetaInfo{offsets=[244, 244], distributionMode=SPLIT_DISTRIBUTE}}, delegateStateHandle=ByteStreamStateHandle{handleName='gs://data-beaver/checkpoints/moj-tj-dummy-partition-loss-debug-v1/6e1ba15b1b5bedda64836ff48ed1c264/chk-3/fadb4f23-85dd-4048-b466-94c1c5329dd3', dataBytes=328}}, OperatorStateHandle{stateNameToPartitionOffsets={SourceReaderState=StateMetaInfo{offsets=[244, 244], distributionMode=SPLIT_DISTRIBUTE}}, delegateStateHandle=ByteStreamStateHandle{handleName='gs://data-beaver/checkpoints/moj-tj-dummy-partition-loss-debug-v1/6e1ba15b1b5bedda64836ff48ed1c264/chk-3/102aa50b-78c2-457e-9a2f-0055f1dbeb98', dataBytes=328}}]}, operatorStateFromStream=StateObjectCollection{[]}, keyedStateFromBackend=StateObjectCollection{[]}, keyedStateFromStream=StateObjectCollection{[]}, inputChannelState=StateObjectCollection{[]}, resultSubpartitionState=StateObjectCollection{[]}, stateSize=656, checkpointedSize=656} from job manager and local state alternatives [] from local state store org.apache.flink.runtime.state.NoOpTaskLocalStateStoreImpl@1f89f054.", "service_name":"data-beaver"}{code}
Note the strange *offsets=[244, 244]*. This is clearly wrong: when restoring from a snapshot, [this code|https://github.com/apache/flink/blob/881062f352f8bf8c21ab7cbea95e111fd82fdf20/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java#L350] redistributes the offsets into different batches - and since all offsets are equal, every batch ends up reading the same value. The offsets are produced by [this|https://github.com/apache/flink/blob/263f3283724a5081e41f679659fa6a5819350739/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionableListState.java#L110] code:
{code:java}
public long[] write(FSDataOutputStream out) throws IOException {
    long[] partitionOffsets = new long[internalList.size()];
    DataOutputView dov = new DataOutputViewStreamWrapper(out);
    for (int i = 0; i < internalList.size(); ++i) {
        S element = internalList.get(i);
        partitionOffsets[i] = out.getPos();
        getStateMetaInfo().getPartitionStateSerializer().serialize(element, dov);
    }
    return partitionOffsets;
}
{code}
The actual stream implementation used in this piece of code is [CompressibleFSDataOutputStream|https://github.com/apache/flink/blob/263f3283724a5081e41f679659fa6a5819350739/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompressibleFSDataOutputStream.java#L30]. At this point we realised that we have snapshot compression enabled ({{execution.checkpointing.snapshot-compression}} = true).
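Before digging into the compression angle, here is a small self-contained sketch of why identical offsets translate into duplicated and lost splits. It is a deliberate simplification, not the actual RoundRobinOperatorStateRepartitioner code; the class name, the "healthy" offset values and the 1 -> 2 rescaling are made up for illustration:
{code:java}
import java.util.Arrays;
import java.util.List;

// Simplified model: rescaling only reshuffles the recorded offsets into batches,
// and on restore each offset is used to seek into the state handle and
// deserialize one element from there.
public class OffsetRedistributionSketch {
    public static void main(String[] args) {
        long[] healthyOffsets = {244, 310, 376, 442}; // one distinct offset per serialized split
        long[] brokenOffsets = {244, 244, 244, 244};  // what we observe with compression enabled

        for (long[] offsets : List.of(healthyOffsets, brokenOffsets)) {
            // Rescaling to 2 subtasks: the four offsets are split into two batches of two.
            long[] subtask0 = Arrays.copyOfRange(offsets, 0, 2);
            long[] subtask1 = Arrays.copyOfRange(offsets, 2, 4);
            System.out.println("subtask 0 seeks to " + Arrays.toString(subtask0)
                    + ", subtask 1 seeks to " + Arrays.toString(subtask1));
        }
        // With the broken offsets every subtask seeks to position 244 for every split,
        // deserializes the same leading element(s), and the splits written later in the
        // stream are never restored by anyone - those partitions are lost.
    }
}
{code}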
If we look at how getPos() is implemented in CompressibleFSDataOutputStream, we see that getPos() is delegated to the actual output stream, while writing happens through the compressing delegate:
{code:java}
public CompressibleFSDataOutputStream(
        CheckpointStateOutputStream delegate, StreamCompressionDecorator compressionDecorator)
        throws IOException {
    this.delegate = delegate;
    this.compressingDelegate = compressionDecorator.decorateWithCompression(delegate);
}

@Override
public long getPos() throws IOException {
    return delegate.getPos();
}

@Override
public void write(int b) throws IOException {
    compressingDelegate.write(b);
}
{code}
This is incorrect when compression is enabled, because the compressing delegate doesn't flush data into the actual output stream immediately ([link|https://github.com/xerial/snappy-java/blob/ebfbdead182937463735729bd8fe5f4cd69235e4/src/main/java/org/xerial/snappy/SnappyFramedOutputStream.java#L279]):
{code:java}
@Override
public void write(int b) throws IOException {
    if (closed) {
        throw new IOException("Stream is closed");
    }
    if (buffer.remaining() <= 0) {
        flushBuffer();
    }
    buffer.put((byte) b);
}
{code}
Hence, the position in the _delegate_ doesn't advance between elements, and all offsets end up being the same.

h2. Simplest reproducing scenario

Now that we know the culprit, a simple (verified) reproducing scenario that can easily be checked locally is the following:
* Create a Kafka topic with, say, 20 partitions
* Launch a job reading from this topic with some parallelism, say 5. *Important: snapshot compression must be enabled in this job* (see the sketch after this list)
* Stop the job with a savepoint
* Restore the job from this savepoint with a different parallelism, say 3
* Result: some Kafka partitions will no longer be consumed
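For completeness, a minimal sketch of such a job. The topic name, bootstrap servers, group id, checkpoint interval and print sink are placeholders; the only essential part is enabling snapshot compression, either via {{execution.checkpointing.snapshot-compression: true}} in the configuration or programmatically as below:
{code:java}
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CompressionRescaleRepro {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(5);                           // first run with parallelism 5 ...
        env.enableCheckpointing(10_000);
        env.getConfig().setUseSnapshotCompression(true); // ... and snapshot compression enabled

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")   // placeholder
                .setTopics("test-topic")                 // placeholder topic with ~20 partitions
                .setGroupId("compression-rescale-repro")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
                .print();

        env.execute("compression-rescale-repro");
        // Then: stop with a savepoint, restore with parallelism 3, and watch the
        // KafkaSourceReader_KafkaConsumer_assigned_partitions metric drop.
    }
}
{code}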