[ https://issues.apache.org/jira/browse/FLINK-32828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846694#comment-17846694 ]
Piotr Nowojski edited comment on FLINK-32828 at 5/15/24 4:27 PM:
-----------------------------------------------------------------

We have just stumbled upon the same issue and I can confirm that it is quite a critical bug that can lead to all kinds of incorrect results. The problem is that the splits recovered from state are registered in the {{WatermarkOutputMultiplexer}} only once the first record from a given split has been emitted. As a result, the watermark was not properly combined from all of the initial splits, but only from the splits that had already emitted at least one record. I will publish a fix for that shortly.

edit: Also, the problem doesn't affect only Kafka, but all FLIP-27 sources (anything that uses {{SourceOperator}}).
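
To make the failure mode concrete, here is a minimal, self-contained sketch of the combination logic (an illustration only, not Flink's actual {{WatermarkOutputMultiplexer}} API): the combined watermark is the minimum over the per-split watermarks of all *registered* splits, so if splits restored from state only get registered on their first emitted record, the minimum is computed over too few splits and can advance prematurely.

{code:java}
import java.util.HashMap;
import java.util.Map;

/**
 * Simplified illustration of the bug (NOT Flink's actual WatermarkOutputMultiplexer):
 * the combined watermark is the minimum over the per-split watermarks of all
 * *registered* splits. If splits recovered from state are only registered once they
 * emit their first record, the minimum is taken over a subset of the splits and can
 * jump ahead even though the other partitions are idle.
 */
public class CombinedWatermarkSketch {

    private final Map<String, Long> perSplitWatermarks = new HashMap<>();

    /** In the buggy behaviour this happens lazily, on the split's first emitted record. */
    void registerSplit(String splitId) {
        perSplitWatermarks.put(splitId, Long.MIN_VALUE);
    }

    void onWatermark(String splitId, long watermark) {
        perSplitWatermarks.put(splitId, watermark);
    }

    /** Combined watermark = minimum over all registered splits. */
    long combinedWatermark() {
        return perSplitWatermarks.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        // Per-split watermark derived from the record at 2023-08-10T12:53:40.662Z
        // minus the 10 s out-of-orderness bound (and 1 ms), as in the quoted logs.
        long activeSplitWatermark = 1_691_672_010_661L; // 2023-08-10T12:53:30.661Z

        // Expected behaviour: all recovered splits are registered up front, so the
        // combined watermark stays at Long.MIN_VALUE while three splits are idle.
        CombinedWatermarkSketch eager = new CombinedWatermarkSketch();
        eager.registerSplit("test-2/0");
        eager.registerSplit("test-2/1");
        eager.registerSplit("test-2/2");
        eager.registerSplit("test-2/3");
        eager.onWatermark("test-2/3", activeSplitWatermark);
        System.out.println("eager registration, combined = " + eager.combinedWatermark());

        // Buggy behaviour after restore: only the split that already emitted a record
        // is registered, so the "minimum" sees a single split and advances.
        CombinedWatermarkSketch lazy = new CombinedWatermarkSketch();
        lazy.registerSplit("test-2/3"); // registered lazily, on the first record
        lazy.onWatermark("test-2/3", activeSplitWatermark);
        System.out.println("lazy registration, combined = " + lazy.combinedWatermark());
    }
}
{code}

With eager registration the combined watermark stays at {{Long.MIN_VALUE}} (rendered as {{-292275055-05-16T16:47:04.192Z}} in the quoted logs) until every partition has produced a record; with lazy registration it jumps to the single active partition's watermark, matching the behaviour reported below after restoring from a checkpoint.
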
> Kafka partition aware watermark not handled correctly shortly after job start up from checkpoint or savepoint
> --------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-32828
>                 URL: https://issues.apache.org/jira/browse/FLINK-32828
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream, Connectors / Kafka
>    Affects Versions: 1.17.1, 1.19.0, 1.18.1
>         Environment: Affected environments:
> * Local MiniCluster + Confluent Kafka run in Docker
> ** See attached files
> * Flink job run in Kubernetes using Flink Operator 1.15 + 3-node Kafka cluster run in a Kubernetes cluster
>            Reporter: Grzegorz Liter
>            Assignee: Piotr Nowojski
>            Priority: Critical
>         Attachments: docker-compose.yml, test-job.java
>
> When using KafkaSource with partition-aware watermarks, watermarks are emitted even when only one partition has events, right after the job starts up from a savepoint/checkpoint. Once there have been events on the other partitions, the watermark behaviour is correct and the watermark is emitted as the minimum watermark across all partitions.
>
> Steps to reproduce:
> # Set up a Kafka cluster with a topic that has 2 or more partitions (see attached docker-compose.yml):
> ## {{./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test-2 --partitions 4}}
> # Create a job that (see attached test-job.java):
> ## uses a KafkaSource with {{WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10L))}}
> ## has parallelism lower than the number of partitions
> ## stores checkpoints/savepoints
> # Start the job
> # Send events only on a single partition:
> ## {{./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-2 --property "parse.key=true" --property "key.separator=:"}}
>
> {{14:51:19,883 WARN com.example.TestJob6$InputSink2 [] - == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:18.849Z, watermark -292275055-05-16T16:47:04.192Z}}
> {{14:51:32,484 WARN com.example.TestJob6$InputSink2 [] - == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:31.475Z, watermark -292275055-05-16T16:47:04.192Z}}
> {{14:51:35,914 WARN com.example.TestJob6$InputSink2 [] - == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:34.909Z, watermark -292275055-05-16T16:47:04.192Z}}
>
> Expected: Watermark does not progress. Actual: Watermark does not progress.
>
> 5. Stop the job
> 6. Start the job from the last checkpoint/savepoint
> 7. Send events only on a single partition
>
> {{14:53:41,693 WARN com.example.TestJob6$InputSink2 [] - == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:40.662Z, watermark -292275055-05-16T16:47:04.192Z}}
> {{14:53:46,088 WARN com.example.TestJob6$InputSink2 [] - == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:45.078Z, watermark 2023-08-10T12:53:30.661Z}}
> {{14:53:49,520 WARN com.example.TestJob6$InputSink2 [] - == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:48.511Z, watermark 2023-08-10T12:53:35.077Z}}
>
> Expected: Watermark does not progress. {color:#ff0000}*Actual: Watermark has progressed*{color}
>
> To add a bit more context:
> 8. Send events on the other partitions and then send events only on a single partition
>
> {{14:54:55,112 WARN com.example.TestJob6$InputSink2 [] - == Received: test-2/0: 2 -> a, timestamp 2023-08-10T12:54:54.104Z, watermark 2023-08-10T12:53:38.510Z}}
> {{14:54:57,673 WARN com.example.TestJob6$InputSink2 [] - == Received: test-2/1: 4 -> a, timestamp 2023-08-10T12:54:56.665Z, watermark 2023-08-10T12:53:38.510Z}}
> {{14:54:57,673 WARN com.example.TestJob6$InputSink2 [] - == Received: test-2/2: 5 -> a, timestamp 2023-08-10T12:54:57.554Z, watermark 2023-08-10T12:53:38.510Z}}
> {{14:55:12,821 WARN com.example.TestJob6$InputSink2 [] - == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:11.814Z, watermark 2023-08-10T12:54:44.103Z}}
> {{14:55:16,099 WARN com.example.TestJob6$InputSink2 [] - == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:15.091Z, watermark 2023-08-10T12:54:44.103Z}}
> {{14:55:19,122 WARN com.example.TestJob6$InputSink2 [] - == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:18.114Z, watermark 2023-08-10T12:54:44.103Z}}
>
> Expected: Watermark should progress a bit and then should not progress when receiving events only on a single partition.
> Actual: As expected
>
> This behavior also shows up as a burst of late events just after startup, and then no more late events once the job operates normally.
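
Since the attached test-job.java is not inlined in this notification, here is an illustrative sketch of a reproducer job matching the "Create a job" step of the quoted description; class names, group id, and exact settings are assumptions, but the relevant parts are a KafkaSource on {{test-2}}, {{WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10L))}}, parallelism lower than the 4 partitions, checkpointing enabled, and a {{ProcessFunction}} that logs each record together with the current watermark (similar to the {{InputSink2}} log lines quoted above).

{code:java}
import java.time.Duration;
import java.time.Instant;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Illustrative sketch of a reproducer job; the actual attached test-job.java may differ.
public class WatermarkReproJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);            // lower than the 4 partitions of topic test-2
        env.enableCheckpointing(10_000L); // so the job can later be restored from a checkpoint

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("test-2")
                .setGroupId("watermark-repro")            // assumed group id
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        env.fromSource(
                        source,
                        WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(10L)),
                        "kafka-source")
                // Log every record together with the watermark seen downstream,
                // mirroring the "== Received: ..." log lines quoted above.
                .process(new ProcessFunction<String, String>() {
                    @Override
                    public void processElement(String value, Context ctx, Collector<String> out) {
                        out.collect(String.format(
                                "== Received: %s, timestamp %s, watermark %s",
                                value,
                                Instant.ofEpochMilli(ctx.timestamp()), // Kafka record timestamp
                                Instant.ofEpochMilli(ctx.timerService().currentWatermark())));
                    }
                })
                .print();

        env.execute("FLINK-32828 watermark repro");
    }
}
{code}

Following the quoted steps: start the job and produce to a single partition (the watermark should stay at {{Long.MIN_VALUE}}), stop it, restore from the latest checkpoint/savepoint, and produce to the same single partition again; on the affected versions the watermark then advances even though the other partitions are still idle.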