Hong Liang Teoh created FLINK-32116:
---------------------------------------
Summary: FlinkKinesisConsumer cannot stop-with-savepoint when configured with watermark assigner and watermark tracker
Key: FLINK-32116
URL: https://issues.apache.org/jira/browse/FLINK-32116
Project: Flink
Issue Type: Bug
Components: Connectors / Kinesis
Affects Versions: 1.15.4, 1.16.1, 1.17.0
Reporter: Hong Liang Teoh

Problem:

When FlinkKinesisConsumer is configured with the legacy watermarking system, it is unable to take a savepoint during stop-with-savepoint and gets stuck indefinitely.

{code:java}
FlinkKinesisConsumer<String> src =
        new FlinkKinesisConsumer<>("YourStreamHere", new SimpleStringSchema(), consumerConfig);
// Set up watermark assigner on Kinesis source
src.setPeriodicWatermarkAssigner(...);
// Set up watermark tracker on Kinesis source
src.setWatermarkTracker(...);{code}

*Why does it get stuck?*

When watermarks are set up, the `shardConsumer` and `recordEmitter` threads communicate through an asynchronous queue. On stop-with-savepoint, `shardConsumer` waits for the queue to empty before continuing, but `recordEmitter` is terminated before the queue is empty. Nothing drains the remaining records, so the queue never empties and the application gets stuck indefinitely.

*Workarounds*

Use the new watermark framework:

{code:java}
FlinkKinesisConsumer<String> src =
        new FlinkKinesisConsumer<>("YourStreamHere", new SimpleStringSchema(), consumerConfig);
// Set up watermark strategy with both watermark assigner and watermark tracker
env.addSource(src)
        .assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());{code}
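
To make the hang described under *Why does it get stuck?* concrete, here is a minimal standalone sketch in plain Java. It is not connector code and uses no Flink classes: the class `StopWithSavepointHang`, the methods `awaitEmpty` and `runShutdown`, the `gate` latch, and the 500 ms timeout are all hypothetical, introduced only so the demo is deterministic and terminates. One thread stands in for `recordEmitter` and drains a queue; the caller stands in for `shardConsumer` and waits for the queue to empty.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the shardConsumer / recordEmitter interaction.
// Nothing here is Flink code; all names are invented for illustration.
class StopWithSavepointHang {

    // Polls until the queue is empty or the timeout expires.
    // The wait in the report is unbounded; the timeout only keeps this demo finite.
    static boolean awaitEmpty(BlockingQueue<String> queue, long timeoutMillis)
            throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (!queue.isEmpty()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // gave up: nobody is draining the queue
            }
            Thread.sleep(10);
        }
        return true;
    }

    // Simulates one shutdown and returns true if the queue drained.
    static boolean runShutdown(boolean emitterStoppedFirst) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        CountDownLatch gate = new CountDownLatch(1); // holds the emitter back until released
        for (int i = 0; i < 5; i++) {
            queue.add("record-" + i);
        }

        // Emitter role: once released, removes ("emits") records until interrupted.
        Thread emitter = new Thread(() -> {
            try {
                gate.await();
                while (true) {
                    queue.take();
                }
            } catch (InterruptedException e) {
                // Terminated: any records still queued are left behind.
            }
        });
        emitter.start();

        try {
            if (emitterStoppedFirst) {
                // Ordering from the report: emitter is terminated before the queue is empty.
                emitter.interrupt();
                emitter.join();
            } else {
                // Healthy ordering: emitter keeps running while the consumer waits.
                gate.countDown();
            }
            // Consumer role: wait for the queue to empty before continuing.
            boolean drained = awaitEmpty(queue, 500);
            emitter.interrupt(); // clean up the emitter in the healthy case
            emitter.join();
            return drained;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("emitter alive:   drained=" + runShutdown(false));
        System.out.println("emitter stopped: drained=" + runShutdown(true));
    }
}
```

With the emitter still running, `runShutdown(false)` reports that the queue drained. With the emitter stopped first, `runShutdown(true)` reports that it did not, and only the artificial timeout lets the caller return. Without that timeout the wait would never end, which matches the stuck stop-with-savepoint described above.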