Hi devs,

I've recently migrated my Flink v1.20 apps from the FlinkKinesisConsumer
API to the newly recommended KinesisStreamsSource API. Since then, my Kinesis
source operator (and hence the entire application) can stall in the
initialising phase when restarting.

I've been able to replicate the issue under the following conditions:

   - Source operator: KinesisStreamsSource from the Flink Kinesis connector
   (org.apache.flink:flink-connector-kinesis:5.0.0-1.20)
   - Reader type: EFO
   - Autoscaling: enabled
   - Trigger: a Flink application restart shortly after a previous restart

In the JobManager logs, I see the following events leading up to the
stall:

   1. Registering stream consumer
   2. Starting consumption from stream
   3. Required number of readers: 2, Registered readers: 0
   4. Required number of readers: 2, Registered readers: 1
   5. Can change the parallelism of the job. Restarting the job.
   6. Job switched from state RUNNING to CANCELLING
   7. Job switched from state CANCELLING to CANCELED
   8. De-registering stream consumer
   9. Running initialization on master for JOB_NAME
   10. Waiting for stream consumer to be deregistered
   11. Job switched from state CREATED to RUNNING.
   12. Registering stream consumer
   13. Found existing consumer. Proceeding to read from consumer.
   14. Starting consumption from stream
   15. Stream consumer has been deregistered
   16. Required number of readers: 2, Registered readers: 0 (repeated
   indefinitely; the operator stalls in the initialising phase)

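My best guess at the cause: the old connector offers either lazy or eager
consumer initialisation, and we used the default (lazy). The new connector
doesn't appear to have a lazy option and initialises eagerly. Judging by the
logs, the restarted job registers its consumer (step 12) while the previous
run's consumer is still being de-registered (steps 8 and 10), reuses that
consumer (step 13), and then the de-registration completes (step 15), so the
readers never register. Lazy initialisation might avoid this overlap.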
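To make the suspected overlap concrete, here's a toy, single-threaded Java
model that replays steps 8 to 15. FakeConsumerRegistry, the consumer name and
every other identifier in it are hypothetical stand-ins, not the connector's
or the AWS SDK's API, and `registerAfterDeletion` is only a rough
approximation of what lazy initialisation might buy us.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the log sequence; FakeConsumerRegistry is hypothetical,
// not the Flink connector's or the AWS SDK's API.
public class ConsumerRaceModel {

    enum Status { ACTIVE, DELETING }

    // Hypothetical in-memory stand-in for the stream's registered EFO consumers.
    static final class FakeConsumerRegistry {
        private final Map<String, Status> consumers = new HashMap<>();

        void register(String name) { consumers.put(name, Status.ACTIVE); }

        // De-registration is modelled as two steps: the consumer lingers in DELETING first.
        void startDeregister(String name) { consumers.computeIfPresent(name, (k, v) -> Status.DELETING); }

        void finishDeregister(String name) { consumers.remove(name); }

        Status describe(String name) { return consumers.get(name); } // null if absent
    }

    // Replays steps 8-15 of the log; returns true if an ACTIVE consumer exists for the readers.
    static boolean replay(boolean registerAfterDeletion) {
        FakeConsumerRegistry registry = new FakeConsumerRegistry();
        String name = "example-efo-consumer"; // hypothetical consumer name
        registry.register(name);              // consumer left over from the previous run
        registry.startDeregister(name);       // 8.  De-registering stream consumer

        boolean reused = false;
        if (!registerAfterDeletion) {
            // 12-13. Eager registration sees the DELETING consumer and reuses it
            reused = registry.describe(name) != null;
        }

        registry.finishDeregister(name);      // 15. Stream consumer has been deregistered

        if (!reused) {
            registry.register(name);          // register only once the old consumer is gone
        }
        return registry.describe(name) == Status.ACTIVE;
    }

    public static void main(String[] args) {
        System.out.println("eager reuse while DELETING -> readable: " + replay(false));
        System.out.println("register after deletion    -> readable: " + replay(true));
    }
}
```

Running it prints `readable: false` for the eager path and `readable: true`
when registration waits for the deletion, which would match the stall in
step 16 if the ordering really is the problem.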

Has anyone seen this issue before? Any advice on how to proceed?

Thank you,
Jonathan Du
