[ https://issues.apache.org/jira/browse/FLINK-16482?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
likang updated FLINK-16482:
---------------------------
    Description:
*Background:*
Today I tried to monitor my Flink job with a timer thread, so that the job exits automatically if it has not read any data for a long time. However, when the read timeout is detected and I call the cancel function of FlinkKafkaConsumer, a CloseException is thrown. Flink's recovery mechanism then treats this as an abnormal exit and restarts the task.

*Bug:*
I checked the cancel code of FlinkKafkaConsumer and found that the cancel of KafkaFetcher is called first, then the close() of Handover, and then the shutdown() of KafkaConsumerThread. Finally, after detecting the running flag, the KafkaConsumerThread thread exits its while loop and calls the close() of Handover once more.

There are two problems here:
1. A CloseException is thrown when handover.close() is called in the cancel of KafkaFetcher. This call to handover.close() needs to be removed.
2. After the thread in KafkaConsumerThread exits its loop because running = false, it needs to determine whether it is exiting normally. It should not call handover.close() on a normal exit, otherwise a CloseException is thrown as well.

  was:
*Background:*
Today I tried to monitor my Flink job with a timer thread, so that the job exits automatically if it has not read any data for a long time. However, when the read timeout is detected and I call the cancel function of FlinkKafkaConsumer, a CloseException is thrown. Flink's recovery mechanism then treats this as an abnormal exit and restarts the task.

I checked the cancel code of FlinkKafkaConsumer and found that the cancel of KafkaFetcher is called first, then the close() of Handover, and then the shutdown() of KafkaConsumerThread. Finally, after detecting the running flag, the KafkaConsumerThread thread exits its while loop and calls the close() of Handover once more.

There are two problems here:
1. A CloseException is thrown when handover.close() is called in the cancel of KafkaFetcher. This call to handover.close() needs to be removed.
2. After the thread in KafkaConsumerThread exits its loop because running = false, it needs to determine whether it is exiting normally. It should not call handover.close() on a normal exit, otherwise a CloseException is thrown as well.


> Flink Job throw CloseException when call the FlinkKafkaConsumer cancel function
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-16482
>                 URL: https://issues.apache.org/jira/browse/FLINK-16482
>             Project: Flink
>          Issue Type: Bug
>            Reporter: likang
>            Priority: Critical
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
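
The shutdown ordering described in this issue can be illustrated with a small, self-contained Java sketch. It does not use Flink's classes: `SimpleHandover`, `FetchLoop`, and this `ClosedException` are hypothetical stand-ins, and the producer side of the handover is omitted. The sketch shows one possible way to tell a requested cancel apart from an unexpected close: the running flag is cleared before the handover is closed, and the polling loop only propagates the exception if the flag is still set. This is an assumption about how the problem could be handled, not the fix adopted in Flink.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Illustrative only: simplified stand-ins, not Flink's actual classes. */
class CancelSketch {

    /** Hypothetical exception raised when polling a closed handover. */
    static class ClosedException extends Exception {}

    /** Hypothetical handover; only the consumer side and close() are modeled. */
    static class SimpleHandover {
        private final Object lock = new Object();
        private String next;
        private boolean closed;

        String pollNext() throws ClosedException, InterruptedException {
            synchronized (lock) {
                // Block until an element arrives or the handover is closed.
                while (next == null && !closed) {
                    lock.wait();
                }
                if (next != null) {
                    String element = next;
                    next = null;
                    lock.notifyAll();
                    return element;
                }
                throw new ClosedException();
            }
        }

        void close() {
            synchronized (lock) {
                closed = true;
                next = null;
                lock.notifyAll(); // wake up any thread blocked in pollNext()
            }
        }
    }

    /** Hypothetical fetch loop that treats a close after cancel() as a normal exit. */
    static class FetchLoop {
        final SimpleHandover handover = new SimpleHandover();
        private volatile boolean running = true;

        void run() throws ClosedException, InterruptedException {
            try {
                while (running) {
                    handover.pollNext();
                }
            } catch (ClosedException e) {
                // Only an unexpected close (running still true) counts as a failure.
                if (running) {
                    throw e;
                }
            }
        }

        void cancel() {
            running = false;  // clear the flag first ...
            handover.close(); // ... so the resulting ClosedException is swallowed
        }
    }

    /** Returns true if cancel() lets the fetch thread finish without an exception. */
    static boolean cancelExitsCleanly() throws InterruptedException {
        FetchLoop loop = new FetchLoop();
        AtomicReference<Throwable> failure = new AtomicReference<>();
        Thread fetchThread = new Thread(() -> {
            try {
                loop.run();
            } catch (Throwable t) {
                failure.set(t);
            }
        });
        fetchThread.start();
        loop.cancel();
        fetchThread.join(5000);
        return !fetchThread.isAlive() && failure.get() == null;
    }

    /** Returns true if closing the handover without cancel() surfaces the exception. */
    static boolean closeWithoutCancelFails() throws InterruptedException {
        FetchLoop loop = new FetchLoop();
        loop.handover.close();
        try {
            loop.run();
            return false;
        } catch (ClosedException e) {
            return true;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("cancel exits cleanly: " + cancelExitsCleanly());
        System.out.println("close without cancel fails: " + closeWithoutCancelFails());
    }
}
```

In this sketch the close is kept but made harmless by the flag check. Removing the close call from the cancel path, as the report proposes, would be a different design and would need another way to wake a thread blocked in `pollNext()`.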