Hong Liang Teoh created FLINK-32116:
---------------------------------------

             Summary: FlinkKinesisConsumer cannot stop-with-savepoint when 
configured with watermark assigner and watermark tracker
                 Key: FLINK-32116
                 URL: https://issues.apache.org/jira/browse/FLINK-32116
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kinesis
    Affects Versions: 1.15.4, 1.16.1, 1.17.0
            Reporter: Hong Liang Teoh


Problem:

When FlinkKinesisConsumer is configured with legacy watermarking system, it is 
unable to take a savepoint during stop-with-savepoint, and will get stuck 
indefinitely.

 

 
{code:java}
FlinkKinesisConsumer src = new FlinkKinesisConsumer("YourStreamHere", new 
SimpleStringSchema(), consumerConfig);
// Set up watermark assigner on Kinesis source
src.setPeriodicWatermarkAssigner(...);
// Set up watermark tracker on Kinesis source
src.setWatermarkTracker(...);{code}
 

 

*Why does it get stuck?*

When watermarks are setup, the `shardConsumer` and `recordEmitter` thread 
communicate using asynchronous queue.

On stop-with-savepoint, shardConsumer waits for queue to empty before 
continuing. recordEmitter is terminated before queue is empty. As such, queue 
is never going to be empty, and app gets stuck indefinitely.

 

*Workarounds*

Use the new watermark framework
{code:java}
FlinkKinesisConsumer src = new FlinkKinesisConsumer("YourStreamHere", new 
SimpleStringSchema(), consumerConfig);
env.addSource(src)
// Set up watermark strategy with both watermark assigner and watermark tracker
    
.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps()){code}
 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to