[ https://issues.apache.org/jira/browse/FLINK-9630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16997755#comment-16997755 ]
Rong Rong commented on FLINK-9630:
----------------------------------

Hi All, Is this a duplicate of FLINK-8497 ?

> Kafka09PartitionDiscoverer cause connection leak on TopicAuthorizationException
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-9630
>                 URL: https://issues.apache.org/jira/browse/FLINK-9630
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.4.2, 1.5.0
>         Environment: Linux 2.6, java 8, Kafka broker 0.10.x
>            Reporter: Youjun Yuan
>            Priority: Major
>              Labels: pull-request-available
>
> When the Kafka topic gets deleted during the task startup process,
> Kafka09PartitionDiscoverer hits a *TopicAuthorizationException* in
> getAllPartitionsForTopics() and gets no chance to close the
> kafkaConsumer, resulting in a TCP connection leak (to the Kafka broker).
>
> *This issue can bring down the whole Flink cluster*, because in a default
> setup (fixedDelay with INT.MAX restart attempts) the job manager will randomly
> schedule the job to any TaskManager that has a free slot, and each attempt will
> cause that TaskManager to leak a TCP connection. Eventually almost every
> TaskManager runs out of file handles, so no TaskManager can make a
> snapshot or accept new jobs, effectively stopping the whole cluster.
>
> The leak happens when StreamTask.invoke() calls openAllOperators(), then
> FlinkKafkaConsumerBase.open() calls partitionDiscoverer.discoverPartitions(),
> and kafkaConsumer.partitionsFor(topic) in
> KafkaPartitionDiscoverer.getAllPartitionsForTopics() hits a
> *TopicAuthorizationException* that no one catches.
> Although StreamTask.open catches the exception and invokes the dispose() method of
> each operator, which eventually invokes FlinkKafkaConsumerBase.cancel(),
> cancel() does not close the kafkaConsumer in the partitionDiscoverer, nor does it
> even invoke partitionDiscoverer.wakeup(), because discoveryLoopThread was
> null.
>
> Below is the code of FlinkKafkaConsumerBase.cancel() for your convenience:
>
> public void cancel() {
>     // set ourselves as not running;
>     // this would let the main discovery loop escape as soon as possible
>     running = false;
>
>     if (discoveryLoopThread != null) {
>         if (partitionDiscoverer != null) {
>             // we cannot close the discoverer here, as it is error-prone to concurrent access;
>             // only wakeup the discoverer, the discovery loop will clean itself up after it escapes
>             partitionDiscoverer.wakeup();
>         }
>
>         // the discovery loop may currently be sleeping in-between
>         // consecutive discoveries; interrupt to shutdown faster
>         discoveryLoopThread.interrupt();
>     }
>
>     // abort the fetcher, if there is one
>     if (kafkaFetcher != null) {
>         kafkaFetcher.cancel();
>     }
> }
>
>

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
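For readers following along, the leak boils down to a missing close-on-failure around the discovery call. The sketch below illustrates that pattern in isolation; the class and field names (DiscovererLeakSketch, FakeConsumer) are hypothetical stand-ins for the Kafka client types, not the actual Flink fix, so the example runs without a broker:

```java
import java.util.Collections;
import java.util.List;

public class DiscovererLeakSketch {

    // Stand-in for org.apache.kafka.common.errors.TopicAuthorizationException
    static class TopicAuthorizationException extends RuntimeException {}

    // Minimal consumer stub: always fails discovery, tracks whether close() ran
    static class FakeConsumer implements AutoCloseable {
        boolean closed = false;
        List<String> partitionsFor(String topic) {
            throw new TopicAuthorizationException();
        }
        @Override public void close() { closed = true; }
    }

    // The leak-free pattern: close the consumer (releasing its TCP connection)
    // before re-throwing, so the task still fails but no file handle leaks.
    static void discoverPartitions(FakeConsumer consumer, String topic) {
        try {
            consumer.partitionsFor(topic);
        } catch (RuntimeException e) {
            consumer.close();   // release the connection on failure
            throw e;            // propagate so the task fails as before
        }
    }

    public static void main(String[] args) {
        FakeConsumer consumer = new FakeConsumer();
        boolean threw = false;
        try {
            discoverPartitions(consumer, "deleted-topic");
        } catch (TopicAuthorizationException e) {
            threw = true;
        }
        System.out.println("threw=" + threw + ", closed=" + consumer.closed);
    }
}
```

Compare this with the quoted cancel() above: because discoveryLoopThread is still null when open() fails, none of the cleanup branches there run, which is why the close has to happen at the point of the exception.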