[ https://issues.apache.org/jira/browse/FLINK-9303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16465996#comment-16465996 ]
Edward Rojas commented on FLINK-9303: ------------------------------------- Hello, I reported the issue on the mailing list. I have being working on a fix on my side, at first I thought this was only applicable for the kafka implementation we use, but now that I realize this is applicable for kafka as is, I could share it. My idea is to modify the partition discovery system to check not only new partitions, but also check partitions that are no longer available. Currently, each time the partition discovery is executed, the full list of partitions is requested and only new partitions are considered to be added. From the whole list we could also compare with the current list and identify the partitions to be removed. Partitions recovered from a saved state also could contain "no longer available" partitions, so the check is also performed when the consumer is initialized. Initially I do this validations only when a configuration key is set to true. Here what I have done, I only this the implementation for kafka connector 0.9 +: [https://github.com/apache/flink/compare/release-1.5...EAlexRojas:kafka-unassign-partitions?expand=1] I you agree with this approach, I could work a PR. Also, I'm interested on using Flink 1.5, any changes this could be taken into account for 1.5 ? > Unassign partitions from Kafka client if partitions become unavailable > ---------------------------------------------------------------------- > > Key: FLINK-9303 > URL: https://issues.apache.org/jira/browse/FLINK-9303 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector > Reporter: Tzu-Li (Gordon) Tai > Priority: Major > Fix For: 1.6.0 > > > Originally reported in ML: > [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Dynamically-deleting-kafka-topics-does-not-remove-partitions-from-kafkaConsumer-td19946.html] > The problem is that the Kafka consumer has no notion of "closed" partitions > at the moment, so statically assigned partitions to the Kafka client is never > removed and is always continuously requested for records. > This causes LOG noises as reported in the reported mail thread. -- This message was sent by Atlassian JIRA (v7.6.3#76005)