stevenzwu edited a comment on issue #7020: [FLINK-10774] [Kafka] connection 
leak when partition discovery is disabled an…
URL: https://github.com/apache/flink/pull/7020#issuecomment-438544127
 
 
   We saw an exception caused by redundant calls to the close method. This 
will need to be fixed.
   
   ```
   2018-11-13 18:26:54,928 ERROR org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - failed to close partitionDiscoverer
   java.lang.IllegalStateException: This consumer has already been closed.
        at org.apache.kafka.clients.consumer.KafkaConsumer.ensureNotClosed(KafkaConsumer.java:1613)
        at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1624)
        at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1526)
        at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1506)
        at org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.closeConnections(Kafka09PartitionDiscoverer.java:97)
        at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.close(AbstractPartitionDiscoverer.java:101)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:673)
        at org.apache.flink.streaming.api.operators.StreamSource.cancel(StreamSource.java:108)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cancelTask(SourceStreamTask.java:100)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:369)
        at org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1481)
        at java.lang.Thread.run(Thread.java:748)
   ```
   
   Maybe we should add `synchronized` to this method?
   
   ```
        @Override
        protected void closeConnections() throws Exception {
                if (this.kafkaConsumer != null) {
                        this.kafkaConsumer.close();
   
                        // de-reference the consumer to avoid closing multiple times
                        this.kafkaConsumer = null;
                }
        }
   ```
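
A minimal sketch of what the `synchronized` suggestion might look like. It assumes the exception comes from two threads racing through the null check in `closeConnections()`. `FakeConsumer`, `GuardedDiscoverer`, and `CloseRaceDemo` are hypothetical stand-ins written for illustration and are not Flink or Kafka classes. Only the body of `closeConnections()` mirrors the method quoted above.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for KafkaConsumer: a second close() fails,
// mirroring the IllegalStateException in the stack trace above.
class FakeConsumer {
    private final AtomicInteger closeCalls = new AtomicInteger();

    void close() {
        if (closeCalls.incrementAndGet() > 1) {
            throw new IllegalStateException("This consumer has already been closed.");
        }
    }

    int closeCalls() {
        return closeCalls.get();
    }
}

// Hypothetical discoverer; only closeConnections() mirrors the quoted method.
class GuardedDiscoverer {
    private FakeConsumer kafkaConsumer;

    GuardedDiscoverer(FakeConsumer consumer) {
        this.kafkaConsumer = consumer;
    }

    // synchronized turns the null check, close() and de-reference into one
    // atomic step, so a second caller sees null and returns without closing.
    synchronized void closeConnections() {
        if (this.kafkaConsumer != null) {
            this.kafkaConsumer.close();
            // de-reference the consumer to avoid closing multiple times
            this.kafkaConsumer = null;
        }
    }
}

class CloseRaceDemo {
    // Calls closeConnections() from two threads and returns how many times
    // the underlying consumer was actually closed.
    static int closeFromTwoThreads() {
        FakeConsumer consumer = new FakeConsumer();
        GuardedDiscoverer discoverer = new GuardedDiscoverer(consumer);
        Thread a = new Thread(discoverer::closeConnections);
        Thread b = new Thread(discoverer::closeConnections);
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumer.closeCalls();
    }
}
```

This guard only helps if every caller closes the consumer through the same method. It does not cover a consumer that another thread is still using when the close happens.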

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
