[ https://issues.apache.org/jira/browse/FLINK-10455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16856494#comment-16856494 ]

Jiangjie Qin commented on FLINK-10455:
--------------------------------------

Some updates. I think the issue here is still a producer leak. The reason is that 
{{FlinkKafkaProducer011.close()}} first checks the {{asyncException}} before it 
closes all the transactional producers. If that check throws, the producers are 
never closed, which results in a producer leak. The tasks started after the 
failover will then use the transactional IDs saved in the state, which are the 
same IDs held by the leaked producers. Those tasks will receive a 
{{ProducerFencedException}} and trigger another failover.
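To illustrate the ordering (this is only a simplified sketch of the behavior 
described above, not the actual Flink source; {{checkAsyncException()}} and 
{{openTransactionalProducers}} are made-up names):

{code:java}
// Simplified sketch: if surfacing the pending async exception throws, the loop
// below is never reached, so the transactional producers - and the
// transactional IDs they hold - leak.
@Override
public void close() throws Exception {
    checkAsyncException();  // hypothetical helper; may rethrow e.g. a ProducerFencedException

    for (KafkaProducer<byte[], byte[]> producer : openTransactionalProducers) {
        producer.close();   // skipped entirely whenever the check above throws
    }
}
{code}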

As long as we guarantee that {{FlinkKafkaProducer011.close()}} cleans up 
everything, the classloader issue will be addressed. I'll submit a PR soon. We 
will need to add some tests as well.
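
One way to provide that guarantee (just sketching the idea, not the actual 
patch): move the producer cleanup into a {{finally}} block so a pending 
{{asyncException}} can no longer skip it. Same made-up names as in the sketch 
above:

{code:java}
// Sketch of the fix idea: always close the transactional producers, even when
// surfacing the pending async exception throws first.
@Override
public void close() throws Exception {
    try {
        checkAsyncException();  // may still propagate the async failure to the caller
    } finally {
        for (KafkaProducer<byte[], byte[]> producer : openTransactionalProducers) {
            try {
                producer.close(5, TimeUnit.SECONDS);  // bounded close so shutdown cannot hang
            } catch (Exception e) {
                // best effort: keep closing the remaining producers
            }
        }
    }
}
{code}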

> Potential Kafka producer leak in case of failures
> -------------------------------------------------
>
>                 Key: FLINK-10455
>                 URL: https://issues.apache.org/jira/browse/FLINK-10455
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.5.2
>            Reporter: Nico Kruber
>            Assignee: Jiangjie Qin
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.5.6, 1.7.0, 1.7.3, 1.9.0, 1.8.1
>
>         Attachments: image-2019-06-04-14-25-16-916.png, 
> image-2019-06-04-14-30-55-985.png
>
>
> If the Kafka brokers' transaction timeout is too low for our checkpoint 
> interval [1], we may get a {{ProducerFencedException}} (a configuration 
> sketch for this timeout mismatch follows after this description). The 
> documentation around {{ProducerFencedException}} explicitly states that we 
> should close the producer after encountering it.
> By looking at the code, it doesn't seem like this is actually done in 
> {{FlinkKafkaProducer011}}. Also, in case one transaction's commit in 
> {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} fails with an 
> exception, we don't clean up (nor try to commit) any other transaction.
> -> from what I see, {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} 
> simply iterates over the {{pendingCommitTransactions}} which is not touched 
> during {{close()}}
> Now if we restart the failing job on the same Flink cluster, any resources 
> from the previous attempt will still linger around.
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kafka.html#kafka-011
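
For completeness, the timeout mismatch referenced via [1] above is typically 
addressed by keeping the producer's {{transaction.timeout.ms}} at or below the 
broker's {{transaction.max.timeout.ms}} while staying well above the checkpoint 
interval. A rough sketch (topic name and schema are placeholders; constructor 
variants differ slightly across Flink versions):

{code:java}
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "broker:9092");
// Must not exceed the broker's transaction.max.timeout.ms (15 minutes by
// default) and should comfortably exceed the checkpoint interval.
producerProps.setProperty("transaction.timeout.ms", "900000");

FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(
        "output-topic",                                              // placeholder topic
        new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
        producerProps,
        FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);
{code}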



