Krzysztof Dziolak created FLINK-35815:
-----------------------------------------

             Summary: KinesisProxySyncV2 doesn't always retry throttling exceptions.
                 Key: FLINK-35815
                 URL: https://issues.apache.org/jira/browse/FLINK-35815
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kinesis
    Affects Versions: 1.16.1, 1.15.4, aws-connector-4.2.0, aws-connector-4.3.0
            Reporter: Krzysztof Dziolak
            Assignee: Aleksandr Pilipenko
             Fix For: aws-connector-4.4.0


Problem:

When FlinkKinesisConsumer is configured with the legacy watermarking system, it 
cannot complete the savepoint during stop-with-savepoint and gets stuck 
indefinitely.

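{code:java}
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;

FlinkKinesisConsumer<String> src = new FlinkKinesisConsumer<>(
        "YourStreamHere", new SimpleStringSchema(), consumerConfig);
// Set up watermark assigner on Kinesis source
src.setPeriodicWatermarkAssigner(...);
// Set up watermark tracker on Kinesis source
src.setWatermarkTracker(...);
{code}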

*Why does it get stuck?*

When watermarks are set up, the `shardConsumer` and `recordEmitter` threads 
communicate through an asynchronous queue.

On stop-with-savepoint, `shardConsumer` waits for the queue to empty before 
continuing. However, `recordEmitter` is terminated before the queue is empty, so 
nothing drains the remaining records: the queue never becomes empty and the 
application hangs indefinitely.
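
The hang can be reproduced in miniature with a plain `BlockingQueue`. This is a simplified, hypothetical model, not the connector's code: the class, enum, and method names are made up for illustration, the emitter is "terminated" by simply exiting after taking a few records, and the wait is bounded by a deadline so the demo itself terminates. The guarded variant, which also stops waiting once the emitter thread is gone, is only one possible way such a wait could avoid hanging; it is an assumption, not the fix adopted for this issue.

{code:java}
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the shardConsumer -> recordEmitter hand-off (not connector code).
public class QueueDrainSketch {

    public enum DrainOutcome { DRAINED, EMITTER_GONE, TIMED_OUT }

    /**
     * Fills a queue, lets an "emitter" thread take part of it and exit,
     * then has the "consumer" wait for the queue to become empty.
     *
     * @param guarded       if true, stop waiting as soon as the emitter is no longer alive
     * @param maxWaitMillis bound on the wait so this demo cannot hang forever
     */
    public static DrainOutcome waitForDrain(boolean guarded, long maxWaitMillis)
            throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
        for (int i = 0; i < 10; i++) {
            queue.put("record-" + i);
        }

        // The emitter takes 3 records and then exits, as if terminated by stop-with-savepoint.
        Thread emitter = new Thread(() -> {
            for (int i = 0; i < 3; i++) {
                queue.poll();
            }
        });
        emitter.start();
        emitter.join(); // emitter is gone; 7 records are still queued

        long deadline = System.currentTimeMillis() + maxWaitMillis;
        while (!queue.isEmpty()) {
            if (guarded && !emitter.isAlive()) {
                return DrainOutcome.EMITTER_GONE; // nobody will ever drain the queue
            }
            if (System.currentTimeMillis() >= deadline) {
                return DrainOutcome.TIMED_OUT; // without a deadline this loop never ends
            }
            TimeUnit.MILLISECONDS.sleep(10);
        }
        return DrainOutcome.DRAINED;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("naive wait:   " + waitForDrain(false, 200));
        System.out.println("guarded wait: " + waitForDrain(true, 200));
    }
}
{code}

Running `main` prints `TIMED_OUT` for the naive wait (only because of the artificial 200 ms bound) and `EMITTER_GONE` for the guarded wait, since the queue still holds records that no thread will consume.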

*Workaround*

Use the new `WatermarkStrategy`-based watermark framework instead of the legacy 
watermark assigner and tracker:
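{code:java}
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;

FlinkKinesisConsumer<String> src = new FlinkKinesisConsumer<>(
        "YourStreamHere", new SimpleStringSchema(), consumerConfig);
// Set up the watermark strategy on the stream instead of the source-level assigner and tracker
DataStream<String> stream = env.addSource(src)
        .assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());
{code}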

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
