[ https://issues.apache.org/jira/browse/FLINK-16482?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

likang updated FLINK-16482:
---------------------------
    Description: 
            *Background:*

            Today I tried to monitor my Flink job with a timer thread, so that
the job would exit automatically if it had not read any data for a long time.
However, when the timer detected the read timeout and called the cancel()
method of FlinkKafkaConsumer, a CloseException was thrown, and Flink's recovery
mechanism then treated this as an abnormal exit and restarted the task.

           *Bug:*

          I checked the cancel code path of FlinkKafkaConsumer and found that
KafkaFetcher.cancel() is called first, which calls Handover.close() and then
KafkaConsumerThread.shutdown(). Finally, once the KafkaConsumerThread sees that
its running flag has been cleared, it exits its while loop and calls
Handover.close() one more time.
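The call order above can be illustrated with a heavily simplified Java model. SimpleHandover, SimpleFetcher and CancelRaceDemo are hypothetical names invented for this sketch, not Flink's actual Handover, KafkaFetcher or KafkaConsumerThread classes; the model's ClosedException stands in for the reported CloseException, and the consumer thread's second close() call is left out. The sketch only shows that closing the handover while the fetch loop is blocked in pollNext() surfaces as an exception in the fetch loop, even though the cancellation was intentional.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class CancelRaceDemo {
    // Runs the fetch loop in a thread; a stand-in for the reporter's timer thread
    // cancels it once the loop is blocked in pollNext() because no data arrives.
    static String run() throws InterruptedException {
        SimpleFetcher fetcher = new SimpleFetcher();
        String[] outcome = new String[1];
        Thread fetchThread = new Thread(() -> {
            try {
                fetcher.runFetchLoop();
                outcome[0] = "finished normally";
            } catch (SimpleHandover.ClosedException e) {
                outcome[0] = "failed: ClosedException";
            } catch (InterruptedException e) {
                outcome[0] = "interrupted";
            }
        });
        fetchThread.start();
        while (fetchThread.getState() != Thread.State.WAITING) {
            Thread.yield();
        }
        fetcher.cancel();
        fetchThread.join(); // join() makes outcome[0] visible here
        return outcome[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}

// Hypothetical, simplified stand-in for the handover between the Kafka consumer
// thread and the fetcher; not Flink's actual Handover class.
final class SimpleHandover {
    static final class ClosedException extends IllegalStateException {
        ClosedException() {
            super("handover closed");
        }
    }

    private final Deque<String> records = new ArrayDeque<>();
    private boolean closed;

    synchronized void produce(String record) {
        if (closed) {
            throw new ClosedException();
        }
        records.addLast(record);
        notifyAll();
    }

    // Blocks until a record arrives; once closed and drained, throws ClosedException.
    synchronized String pollNext() throws InterruptedException {
        while (records.isEmpty() && !closed) {
            wait();
        }
        if (!records.isEmpty()) {
            return records.pollFirst();
        }
        throw new ClosedException();
    }

    synchronized void close() {
        closed = true;
        notifyAll();
    }
}

final class SimpleFetcher {
    private final SimpleHandover handover = new SimpleHandover();
    private volatile boolean running = true;

    // Same order as in the report: clear the running flag, then close the handover.
    void cancel() {
        running = false;
        handover.close();
    }

    void runFetchLoop() throws InterruptedException {
        while (running) {
            handover.pollNext(); // record processing omitted
        }
    }
}
```

main() prints `failed: ClosedException`: in this model the blocked pollNext() wakes up on close() and throws before the loop can re-check the cleared running flag.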
        This causes two problems:
1. Calling handover.close() in KafkaFetcher.cancel() causes a CloseException
   to be thrown, so that call to handover.close() needs to be removed.
2. When the thread in KafkaConsumerThread leaves its loop because
   running = false, it needs to determine whether this is a normal exit.
   handover.close() should not be called on a normal exit; otherwise a
   CloseException is thrown as well.
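One possible way to realize both points is sketched below in the same simplified style, again with hypothetical class names. The shutdown() signal is an assumption of this sketch, not part of Flink's Handover: cancel() still has to wake a fetch loop blocked in pollNext(), so instead of dropping the close() call outright, the sketch replaces it with a signal that ends the loop without an exception, and the consumer side reports an error only when it exits abnormally.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;
import java.util.function.Consumer;

class NormalExitDemo {
    // Runs the fetch loop in a thread and applies `stop` once it blocks in pollNext().
    static String run(Consumer<CancelAwareFetcher> stop) throws InterruptedException {
        CancelAwareFetcher fetcher = new CancelAwareFetcher();
        String[] outcome = new String[1];
        Thread fetchThread = new Thread(() -> {
            try {
                fetcher.runFetchLoop();
                outcome[0] = "finished normally";
            } catch (Exception e) {
                outcome[0] = "failed: " + e.getMessage();
            }
        });
        fetchThread.start();
        while (fetchThread.getState() != Thread.State.WAITING) {
            Thread.yield();
        }
        stop.accept(fetcher);
        fetchThread.join(); // join() makes outcome[0] visible here
        return outcome[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(CancelAwareFetcher::cancel));
        System.out.println(run(f -> f.onConsumerExit(new IllegalStateException("broker unreachable"))));
    }
}

// Hypothetical handover separating a normal shutdown from a failure (not Flink's class).
final class ShutdownAwareHandover {
    private final Deque<String> records = new ArrayDeque<>();
    private boolean shutdown;       // set on a normal (cancelled) exit
    private RuntimeException error; // set when the consumer side fails

    synchronized void produce(String record) {
        records.addLast(record);
        notifyAll();
    }

    // Normal exit: wake the fetcher without raising an exception.
    synchronized void shutdown() {
        shutdown = true;
        notifyAll();
    }

    // Abnormal exit: the fetcher will rethrow the first reported error.
    synchronized void reportError(RuntimeException e) {
        if (error == null) {
            error = e;
        }
        notifyAll();
    }

    // Pending record first, then the reported error, else empty after shutdown.
    synchronized Optional<String> pollNext() throws InterruptedException {
        while (records.isEmpty() && !shutdown && error == null) {
            wait();
        }
        if (!records.isEmpty()) {
            return Optional.of(records.pollFirst());
        }
        if (error != null) {
            throw error;
        }
        return Optional.empty();
    }
}

final class CancelAwareFetcher {
    private final ShutdownAwareHandover handover = new ShutdownAwareHandover();
    private volatile boolean running = true;

    // Point 1: no close() with an exception; a shutdown signal still wakes pollNext().
    void cancel() {
        running = false;
        handover.shutdown();
    }

    // Point 2: on leaving its loop, the consumer side reports how it exited.
    void onConsumerExit(RuntimeException failure) {
        if (failure == null) {
            handover.shutdown();
        } else {
            handover.reportError(failure);
        }
    }

    // An empty poll means a normal shutdown, so the loop ends without an exception.
    void runFetchLoop() throws InterruptedException {
        while (running && handover.pollNext().isPresent()) { /* record processing omitted */ }
    }
}
```

main() prints `finished normally` for the cancel case and `failed: broker unreachable` for the simulated consumer failure, so in this model only a genuine error would reach the recovery logic.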



> Flink Job throw CloseException when call the FlinkKafkaConsumer cancel 
> function
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-16482
>                 URL: https://issues.apache.org/jira/browse/FLINK-16482
>             Project: Flink
>          Issue Type: Bug
>            Reporter: likang
>            Priority: Critical



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
