[ https://issues.apache.org/jira/browse/FLINK-34063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
David Morávek updated FLINK-34063:
----------------------------------
Affects Version/s: 1.18.1

When snapshot compression is enabled, rescaling of a source operator leads to some splits getting lost
-------------------------------------------------------------------------------------------------------

Key: FLINK-34063
URL: https://issues.apache.org/jira/browse/FLINK-34063
Project: Flink
Issue Type: Bug
Components: Runtime / State Backends
Affects Versions: 1.18.0, 1.18.1
Environment: Can be reproduced in any environment. The most important thing is to enable snapshot compression.
Reporter: Ivan Burmistrov
Assignee: David Morávek
Priority: Blocker
Labels: pull-request-available
Fix For: 1.18.2
Attachments: image-2024-01-11-16-27-09-066.png, image-2024-01-11-16-30-47-466.png

h2. Backstory

We've been experimenting with autoscaling on Flink 1.18 and ran into a pretty nasty bug.

The symptoms on our production system were as follows. A while after deploying a job with the autoscaler, it started accumulating Kafka lag, but this was only visible through external lag measurement - from inside Flink (measured by the {{KafkaSourceReader_KafkaConsumer_records_lag_max}} metric) the lag looked fine:

!image-2024-01-11-16-27-09-066.png|width=887,height=263!

After some digging, it turned out that the job had lost some Kafka partitions - i.e. it had stopped consuming from them and "forgotten" about their existence. That's why everything looked fine from Flink's perspective: the lag was growing on partitions Flink no longer knew about.

This was visible in the "assigned partitions" metric (KafkaSourceReader_KafkaConsumer_assigned_partitions):

!image-2024-01-11-16-30-47-466.png|width=1046,height=254!

The chart shows that the job used to know about 20 partitions, and that this number then dropped to 16.

The drop was quickly connected to the job's scaling events - more precisely, to scaling of the source operator: with almost 100% probability, any rescaling of the source operator led to partition loss.

h2. Investigation

We conducted an investigation. We use the latest Kubernetes operator and deploy jobs with Native Kubernetes.

The reproducing scenario we used for the investigation:
* Launch a job with source operator parallelism = 4 and DEBUG logging enabled
* Wait until it takes its first checkpoint
* Scale the source operator up to, say, 5 (no need to wait for autoscaling, this can be done via the Flink UI)
* Wait until the next checkpoint is taken
* Scale the source operator down to 3

These simple steps led, with almost 100% probability, to some partitions getting lost.

After that we downloaded all the logs and inspected them.
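(For completeness, a minimal job skeleton for a reproduction like this could look roughly like the sketch below. The broker address, topic name and the discarding sink are illustrative placeholders, and snapshot compression is enabled programmatically, which has the same effect as setting execution.checkpointing.snapshot-compression: true.)

{code:java}
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;

public class RescaleReproJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);                 // take checkpoints regularly
        env.getConfig().setUseSnapshotCompression(true); // the crucial precondition

        KafkaSource<String> source =
                KafkaSource.<String>builder()
                        .setBootstrapServers("kafka:9092")     // placeholder
                        .setTopics("topic-with-20-partitions") // placeholder
                        .setGroupId("rescale-repro")
                        .setStartingOffsets(OffsetsInitializer.earliest())
                        .setValueOnlyDeserializer(new SimpleStringSchema())
                        .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
                .setParallelism(4) // rescale this between checkpoints to trigger the bug
                .addSink(new DiscardingSink<>());

        env.execute("rescale-repro");
    }
}
{code}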
In the collected logs, we noticed these strange records:

{code:java}
{"timestamp":1704415753166,"is_logging_enabled":"false","logger_id":"org.apache.flink.streaming.api.operators.AbstractStreamOperator","log_level":"INFO","message":"Restoring state for 4 split(s) to reader.","service_name":"data-beaver"}
{"timestamp":1704415753166,"is_logging_enabled":"false","logger_id":"org.apache.flink.connector.base.source.reader.SourceReaderBase","log_level":"INFO","message":"Adding split(s) to reader:
[
[Partition: eventmesh-video-play-v1-6, StartingOffset: 1964306414, StoppingOffset: -9223372036854775808],
[Partition: eventmesh-video-play-v1-19, StartingOffset: 1963002538, StoppingOffset: -9223372036854775808],
[Partition: eventmesh-video-play-v1-6, StartingOffset: 1964306414, StoppingOffset: -9223372036854775808],
[Partition: eventmesh-video-play-v1-19, StartingOffset: 1963002538, StoppingOffset: -9223372036854775808]]", "service_name":"data-beaver"}
{code}

A task is restored with 4 splits, but the splits contain duplicates: in reality only 2 unique partitions were added ({_}eventmesh-video-play-v1-6{_} and {_}eventmesh-video-play-v1-19{_}), each of them twice.

Digging into the code and the logs a bit more, log lines like this one started to look suspicious:

{code:java}
{"timestamp":1704415753165,"is_logging_enabled":"false","logger_id":"org.apache.flink.runtime.state.TaskStateManagerImpl","log_level":"DEBUG","message":"Operator 156a1ebbc1936f7d4558c8070b35ba93 has remote state SubtaskState{operatorStateFromBackend=StateObjectCollection{[OperatorStateHandle{stateNameToPartitionOffsets={SourceReaderState=StateMetaInfo{offsets=[244, 244], distributionMode=SPLIT_DISTRIBUTE}}, delegateStateHandle=ByteStreamStateHandle{handleName='gs://data-beaver/checkpoints/moj-tj-dummy-partition-loss-debug-v1/6e1ba15b1b5bedda64836ff48ed1c264/chk-3/fadb4f23-85dd-4048-b466-94c1c5329dd3', dataBytes=328}}, OperatorStateHandle{stateNameToPartitionOffsets={SourceReaderState=StateMetaInfo{offsets=[244, 244], distributionMode=SPLIT_DISTRIBUTE}}, delegateStateHandle=ByteStreamStateHandle{handleName='gs://data-beaver/checkpoints/moj-tj-dummy-partition-loss-debug-v1/6e1ba15b1b5bedda64836ff48ed1c264/chk-3/102aa50b-78c2-457e-9a2f-0055f1dbeb98', dataBytes=328}}]}, operatorStateFromStream=StateObjectCollection{[]}, keyedStateFromBackend=StateObjectCollection{[]}, keyedStateFromStream=StateObjectCollection{[]}, inputChannelState=StateObjectCollection{[]}, resultSubpartitionState=StateObjectCollection{[]}, stateSize=656, checkpointedSize=656} from job manager and local state alternatives [] from local state store org.apache.flink.runtime.state.NoOpTaskLocalStateStoreImpl@1f89f054.","service_name":"data-beaver"}
{code}

Note the suspicious *offsets=[244, 244]*. This is clearly wrong: when restoring from a snapshot, [this code|https://github.com/apache/flink/blob/881062f352f8bf8c21ab7cbea95e111fd82fdf20/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java#L350] redistributes the offsets into different batches - and with identical offsets they will all read the same value.
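To make it concrete why equal offsets are wrong: each offset is supposed to mark the position at which one element (here, one split) starts in the raw state stream, so that any subset of elements can later be located and deserialized independently. A simplified, self-contained model of the intended behaviour (plain Java, with writeUTF as a stand-in for the split serializer - not Flink's actual format):

{code:java}
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.util.Arrays;

public class ExpectedOffsetsSketch {
    public static void main(String[] args) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        String[] splits = {"eventmesh-video-play-v1-6", "eventmesh-video-play-v1-19"};

        long[] partitionOffsets = new long[splits.length];
        for (int i = 0; i < splits.length; i++) {
            partitionOffsets[i] = bytes.size(); // position *before* this element is written
            out.writeUTF(splits[i]);            // stand-in for the split serializer
        }

        // The offsets come out distinct, so each split can be restored independently -
        // which is exactly what SPLIT_DISTRIBUTE redistribution relies on. The
        // [244, 244] in the DEBUG log means the recorded position never advanced.
        System.out.println(Arrays.toString(partitionOffsets));
    }
}
{code}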
These offsets are produced by [this code|https://github.com/apache/flink/blob/263f3283724a5081e41f679659fa6a5819350739/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionableListState.java#L110]:

{code:java}
public long[] write(FSDataOutputStream out) throws IOException {
    long[] partitionOffsets = new long[internalList.size()];
    DataOutputView dov = new DataOutputViewStreamWrapper(out);
    for (int i = 0; i < internalList.size(); ++i) {
        S element = internalList.get(i);
        partitionOffsets[i] = out.getPos();
        getStateMetaInfo().getPartitionStateSerializer().serialize(element, dov);
    }
    return partitionOffsets;
}
{code}

The actual implementation used here is [CompressibleFSDataOutputStream|https://github.com/apache/flink/blob/263f3283724a5081e41f679659fa6a5819350739/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompressibleFSDataOutputStream.java#L30]. At this point we realised that we have snapshot compression enabled (execution.checkpointing.snapshot-compression = true).

Looking at how getPos() is implemented in CompressibleFSDataOutputStream, we see that getPos() is delegated to the actual output stream, while writing goes through the compressing delegate:

{code:java}
public CompressibleFSDataOutputStream(
        CheckpointStateOutputStream delegate, StreamCompressionDecorator compressionDecorator)
        throws IOException {
    this.delegate = delegate;
    this.compressingDelegate = compressionDecorator.decorateWithCompression(delegate);
}

@Override
public long getPos() throws IOException {
    return delegate.getPos();
}

@Override
public void write(int b) throws IOException {
    compressingDelegate.write(b);
}
{code}

This is incorrect when compression is enabled, because the compressing delegate doesn't flush data into the actual output stream immediately ([link|https://github.com/xerial/snappy-java/blob/ebfbdead182937463735729bd8fe5f4cd69235e4/src/main/java/org/xerial/snappy/SnappyFramedOutputStream.java#L279]):

{code:java}
@Override
public void write(int b)
        throws IOException
{
    if (closed) {
        throw new IOException("Stream is closed");
    }
    if (buffer.remaining() <= 0) {
        flushBuffer();
    }
    buffer.put((byte) b);
}
{code}

Hence the position in the _delegate_ doesn't advance between elements, and all recorded offsets end up being the same.

h2. Simplest reproducing scenario

Now that we know the culprit, a simple (verified) reproducing scenario that can easily be checked locally is the following:
* Create a Kafka topic with, say, 20 partitions
* Launch a job reading from this topic with some parallelism, say 5. *Important: snapshot compression must be enabled in this job*
* Stop the job with a savepoint
* Restore the job from this savepoint with a different parallelism, say 3
* Result: some Kafka partitions will no longer be consumed
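Finally, the stale getPos() behaviour can also be demonstrated in isolation, without running a full job. Below is a minimal sketch written against flink-runtime internals of the affected versions; the class names come from this ticket, while the in-memory checkpoint stream is just an assumed stand-in for real checkpoint storage (this presumes these internal classes are accessible from the test code):

{code:java}
import org.apache.flink.runtime.state.CompressibleFSDataOutputStream;
import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory.MemoryCheckpointOutputStream;

public class StalePositionSketch {
    public static void main(String[] args) throws Exception {
        MemoryCheckpointOutputStream delegate = new MemoryCheckpointOutputStream(64 * 1024);
        CompressibleFSDataOutputStream out =
                new CompressibleFSDataOutputStream(delegate, SnappyStreamCompressionDecorator.INSTANCE);

        long firstOffset = out.getPos();  // position of the (uncompressed) delegate
        out.write(new byte[128]);         // buffered by the Snappy stream, not yet flushed
        long secondOffset = out.getPos(); // still the delegate position -> unchanged

        // With compression enabled both offsets come out equal (the [244, 244] seen in
        // the DEBUG log above); without compression the second offset would be 128 bytes later.
        System.out.println(firstOffset + " vs " + secondOffset);
    }
}
{code}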