[ https://issues.apache.org/jira/browse/FLINK-10455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16856486#comment-16856486 ]
Jiangjie Qin commented on FLINK-10455:
--------------------------------------

Some updates. I think the issue here is still a producer leak. The reason is that {{FlinkKafkaProducer011.close()}} first checks the {{asyncException}} before it closes all the transactional producers; if that check throws, the producers are never closed and leak. Tasks restarted after the failover will then use the transactional IDs saved in the state, which are the same as those of the leaked producers. These tasks will receive a {{ProducerFencedException}} and trigger failover again. I think as long as we guarantee that {{FlinkKafkaProducer011.close()}} cleans up everything, the classloader issue will be addressed as well. I'll submit a PR soon. We will need to add some tests too.

> Potential Kafka producer leak in case of failures
> -------------------------------------------------
>
>                 Key: FLINK-10455
>                 URL: https://issues.apache.org/jira/browse/FLINK-10455
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.5.2
>            Reporter: Nico Kruber
>            Assignee: Jiangjie Qin
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.5.6, 1.7.0, 1.7.3, 1.9.0, 1.8.1
>
>         Attachments: image-2019-06-04-14-25-16-916.png, image-2019-06-04-14-30-55-985.png
>
>
> If the Kafka brokers' timeout is too low for our checkpoint interval [1], we may get a {{ProducerFencedException}}. The documentation around {{ProducerFencedException}} explicitly states that we should close the producer after encountering it.
> Looking at the code, it doesn't seem like this is actually done in {{FlinkKafkaProducer011}}. Also, in case one transaction's commit in {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} fails with an exception, we don't clean up (nor try to commit) any other transaction.
> -> From what I see, {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} simply iterates over the {{pendingCommitTransactions}}, which is not touched during {{close()}}.
> Now, if we restart the failing job on the same Flink cluster, any resources from the previous attempt will still linger around.
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kafka.html#kafka-011

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
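The close-ordering problem described in the comment can be sketched in isolation. The types below ({{ProducerLeakSketch}}, {{MockProducer}}, {{closeLeaky}}, {{closeCleanly}}) are hypothetical stand-ins for illustration only, not the actual Flink or Kafka classes; the point is purely the ordering of "rethrow the pending async exception" versus "close the transactional producers":

```java
import java.util.ArrayList;
import java.util.List;

/** Hedged sketch of the leak scenario; all names here are illustrative. */
public class ProducerLeakSketch {

    /** Minimal stand-in for a transactional Kafka producer. */
    static class MockProducer {
        final String transactionalId;
        boolean closed = false;

        MockProducer(String transactionalId) {
            this.transactionalId = transactionalId;
        }

        void close() {
            closed = true;
        }
    }

    final List<MockProducer> producers = new ArrayList<>();
    Exception asyncException; // set by an async send callback on failure

    /**
     * Roughly the problematic ordering described in the comment: the
     * pending async exception is rethrown before the producers are
     * closed, so on failure the producers (and hence their transactional
     * IDs) leak and later fence restarted tasks that reuse the same IDs.
     */
    void closeLeaky() throws Exception {
        if (asyncException != null) {
            throw asyncException; // the loop below is never reached
        }
        for (MockProducer p : producers) {
            p.close();
        }
    }

    /**
     * Leak-free ordering: the finally block guarantees every producer is
     * closed even when the pending exception is rethrown.
     */
    void closeCleanly() throws Exception {
        try {
            if (asyncException != null) {
                throw asyncException;
            }
        } finally {
            for (MockProducer p : producers) {
                p.close(); // runs regardless of the exception above
            }
        }
    }
}
```

With a pending {{asyncException}}, {{closeLeaky}} leaves the producer open while {{closeCleanly}} still closes it before propagating the exception, which is the guarantee the comment asks {{FlinkKafkaProducer011.close()}} to provide.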