[ https://issues.apache.org/jira/browse/FLINK-16482?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
likang updated FLINK-16482:
---------------------------
    Component/s: Connectors / Kafka

> Flink Job throw CloseException when call the FlinkKafkaConsumer cancel function
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-16482
>                 URL: https://issues.apache.org/jira/browse/FLINK-16482
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>            Reporter: likang
>            Priority: Critical
>         Attachments: The bug and solution.docx
>
>
> *Background:*
> Today I tried to monitor my Flink job with a timer thread, so that the job
> exits automatically if it has not read any data for a long time. But when I
> detect the read timeout and call the cancel function of FlinkKafkaConsumer,
> a CloseException is thrown. Flink's recovery mechanism then treats this as
> an abnormal exit and restarts the task.
>
> *Bug:*
> I checked the cancel code of FlinkKafkaConsumer and found that it first
> calls cancel() of KafkaFetcher, which calls close() of Handover and then
> shutdown() of KafkaConsumerThread. Finally, the KafkaConsumerThread thread
> detects the running flag, exits its while loop, and calls Handover's close()
> once more.
>
> There are several problems here:
> 1. A CloseException is thrown when handover.close() is called in cancel() of
>    KafkaFetcher. The call to handover.close() needs to be removed there.
> 2. After the thread in KafkaConsumerThread leaves the loop because
>    running = false, it needs to determine whether this is a normal exit.
>    It should not call handover.close() on a normal exit; otherwise a
>    CloseException is thrown as well.
>
> Finally, the details and solutions are in the attachment.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
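
The second point in the report (close the handover only when the consumer loop ends abnormally) can be illustrated with a small, self-contained Java sketch. This is not the Flink source code and not the fix from the attachment: ToyHandover, ToyConsumerThread, and closedAfterRun are hypothetical stand-ins introduced only for illustration, and the real classes carry much more state.

```java
public class CancelSketch {

    /** Hypothetical stand-in for the handover; it only records whether close() ran. */
    static class ToyHandover {
        private volatile boolean closed;

        void close() {
            closed = true;
        }

        boolean isClosed() {
            return closed;
        }
    }

    /** Hypothetical stand-in for the consumer thread, reduced to its run flag. */
    static class ToyConsumerThread extends Thread {
        private final ToyHandover handover;
        private final boolean simulateFailure;
        private volatile boolean running = true;

        ToyConsumerThread(ToyHandover handover, boolean simulateFailure) {
            this.handover = handover;
            this.simulateFailure = simulateFailure;
        }

        @Override
        public void run() {
            boolean failed = false;
            try {
                while (running) {
                    if (simulateFailure) {
                        throw new IllegalStateException("simulated poll failure");
                    }
                    Thread.sleep(5); // placeholder for polling the broker
                }
            } catch (InterruptedException e) {
                // An interrupt that arrives after shutdown() is part of a normal exit.
                failed = running;
            } catch (RuntimeException e) {
                failed = true;
            } finally {
                // Close the handover only when the loop ended abnormally.
                if (failed) {
                    handover.close();
                }
            }
        }

        /** Normal cancellation: clear the flag and wake the thread if it is sleeping. */
        void shutdown() {
            running = false;
            interrupt();
        }
    }

    /** Runs one toy consumer and reports whether the handover was closed. */
    static boolean closedAfterRun(boolean simulateFailure) {
        ToyHandover handover = new ToyHandover();
        ToyConsumerThread thread = new ToyConsumerThread(handover, simulateFailure);
        thread.start();
        if (!simulateFailure) {
            thread.shutdown();
        }
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return handover.isClosed();
    }

    public static void main(String[] args) {
        System.out.println("normal cancel closed handover: " + closedAfterRun(false));
        System.out.println("failure closed handover: " + closedAfterRun(true));
    }
}
```

In this toy model a normal shutdown() leaves the handover open, so a reader waiting on it would not see a close-related exception, while a failure inside the loop still closes it. How a blocked reader is woken up after a normal cancel is left out of the sketch and would need separate handling.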