[ 
https://issues.apache.org/jira/browse/FLINK-9303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16465996#comment-16465996
 ] 

Edward Rojas commented on FLINK-9303:
-------------------------------------

Hello, 
I reported the issue on the mailing list.
I have been working on a fix on my side. At first I thought this was only 
applicable to the Kafka implementation we use, but now that I realize it 
applies to Kafka as is, I can share it.

My idea is to modify the partition discovery system to check not only for new 
partitions, but also for partitions that are no longer available.

Currently, each time partition discovery is executed, the full list of 
partitions is requested and only new partitions are considered for addition. 
From that full list we could also compare against the currently assigned 
partitions and identify the partitions to be removed, as sketched below.
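
As a rough illustration of that comparison, here is a minimal sketch. The class 
and method names are placeholders I made up for this comment, not Flink API; the 
real connector tracks partitions as KafkaTopicPartition objects inside the 
discoverer/fetcher, while here I just use plain "topic-partition" strings:

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    // Minimal sketch: compare the freshly discovered partitions against the
    // currently assigned ones and split the difference into "to add" and
    // "to remove". Class and method names are hypothetical placeholders.
    public class PartitionDiff {

        public static final class Result {
            public final List<String> toAdd = new ArrayList<>();
            public final List<String> toRemove = new ArrayList<>();
        }

        public static Result diff(Set<String> currentlyAssigned, List<String> discovered) {
            Result result = new Result();
            Set<String> discoveredSet = new HashSet<>(discovered);

            // New partitions: reported by Kafka but not yet assigned.
            for (String partition : discoveredSet) {
                if (!currentlyAssigned.contains(partition)) {
                    result.toAdd.add(partition);
                }
            }
            // Unavailable partitions: assigned but no longer reported by Kafka.
            for (String partition : currentlyAssigned) {
                if (!discoveredSet.contains(partition)) {
                    result.toRemove.add(partition);
                }
            }
            return result;
        }
    }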

Partitions restored from a saved state could also contain partitions that are 
no longer available, so the same check is performed when the consumer is 
initialized; a small sketch of that startup-time filtering follows.
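
For example, something along these lines could filter the restored state on 
startup. Again this is only a sketch with made-up topic and partition names, 
using plain strings instead of the connector's KafkaTopicPartition:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    // Hypothetical startup-time check: drop restored partitions that the
    // cluster no longer reports before handing them to the fetcher.
    public class RestoredStateFilter {
        public static void main(String[] args) {
            Set<String> restoredFromState =
                    new HashSet<>(Arrays.asList("topicA-0", "topicA-1", "deletedTopic-0"));
            List<String> discoveredNow = Arrays.asList("topicA-0", "topicA-1");

            // Keep only the restored partitions that are still available.
            restoredFromState.retainAll(new HashSet<>(discoveredNow));

            // "deletedTopic-0" has been filtered out at this point.
            System.out.println(restoredFromState);
        }
    }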

Initially, I perform these validations only when a configuration key is set to true. 
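
To give an idea of how it could be enabled (the key name below is just an 
assumption for illustration; the actual key in my branch may differ):

    import java.util.Properties;

    public class ConsumerConfigExample {
        public static void main(String[] args) {
            Properties kafkaProps = new Properties();
            kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
            kafkaProps.setProperty("group.id", "my-consumer-group");

            // Hypothetical key name: enable the proposed removal of
            // unavailable partitions during partition discovery.
            kafkaProps.setProperty(
                    "flink.partition-discovery.remove-unavailable-partitions", "true");
        }
    }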

 

Here is what I have done; I only did the implementation for the Kafka connector 0.9+:
[https://github.com/apache/flink/compare/release-1.5...EAlexRojas:kafka-unassign-partitions?expand=1]

 

If you agree with this approach, I could work on a PR. 
Also, I'm interested in using Flink 1.5; any chance this could be taken into 
account for 1.5?

> Unassign partitions from Kafka client if partitions become unavailable
> ----------------------------------------------------------------------
>
>                 Key: FLINK-9303
>                 URL: https://issues.apache.org/jira/browse/FLINK-9303
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>            Reporter: Tzu-Li (Gordon) Tai
>            Priority: Major
>             Fix For: 1.6.0
>
>
> Originally reported in ML:
> [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Dynamically-deleting-kafka-topics-does-not-remove-partitions-from-kafkaConsumer-td19946.html]
> The problem is that the Kafka consumer has no notion of "closed" partitions 
> at the moment, so partitions statically assigned to the Kafka client are never 
> removed and are continuously requested for records.
> This causes log noise, as reported in the linked mail thread.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
