Hi devs,

I've recently migrated my Flink 1.20 applications from the FlinkKinesisConsumer API to the newly recommended KinesisStreamsSource API. Since then, my Kinesis source operator (and hence the entire application) sometimes stalls in the initialising phase when the application restarts.
I've been able to reproduce the issue under the following conditions:
- Source operator: KinesisStreamsSource from the Flink Kinesis connector (org.apache.flink:flink-connector-kinesis:5.0.0-1.20)
- Reader type: EFO
- Autoscaling: enabled
- Trigger: a Flink application restart shortly after an earlier restart

In the JobManager logs, I see the following events leading up to the failure:
1. Registering stream consumer
2. Starting consumption from stream
3. Required number of readers: 2, Registered readers: 0
4. Required number of readers: 2, Registered readers: 1
5. Can change the parallelism of the job. Restarting the job.
6. Job switched from state RUNNING to CANCELLING
7. Job switched from state CANCELLING to CANCELED
8. De-registering stream consumer
9. Running initialization on master for JOB_NAME
10. Waiting for stream consumer to be deregistered
11. Job switched from state CREATED to RUNNING.
12. Registering stream consumer
13. Found existing consumer. Proceeding to read from consumer.
14. Starting consumption from stream
15. Stream consumer has been deregistered
16. Required number of readers: 2, Registered readers: 0 (repeats indefinitely; the operator stays stuck in the initialising phase)

My best guess at a solution: the old connector lets you choose between lazy and eager consumer initialisation, and we used the default, lazy initialisation. As far as I can tell, the new connector has no lazy option and always initialises the consumer eagerly.

Has anyone seen this issue before? Any advice on how to proceed?

Thank you,
Jonathan Du