Hi Constantinos, The partition discovery doesn't support topic / partition removal as this may easily lead to a data loss (partition removal is not even supported by Kafka for the same reason)
Dynamically adding and removing partitions as part of a business logic is just not how Kafka is designed to work. You should only be adding new topics / partitions for scale out reasons and even that should be done super carefully because it breaks data partitioning. Best, D. On Tue, Sep 14, 2021 at 11:00 AM Constantinos Papadopoulos <cpa...@gmail.com> wrote: > We are on Flink 1.12.1, we initialize our FlinkKafkaConsumer with a topic > name *pattern*, and we have partition discovery enabled. > > When our product scales up, it adds new topics. When it scales down, it > removes topics. > > The problem is that the FlinkKafkaConsumer never seems to forget > partitions that don't exist anymore. > > As a result, our logs are filled with UNKNOWN_TOPIC_OR_PARTITION errors: > > *[Consumer clientId=consumer-metric-processor-consumer-group-2, > groupId=metric-processor-consumer-group] Error while fetching metadata with > correlation id 3030663 : {metric-athfvfrt#sgy=UNKNOWN_TOPIC_OR_PARTITION}* > > Over time, the problem becomes worse as scale ups and scale downs continue > happening (and thus the number of deleted partitions continues increasing). > > Is this a bug, or are we missing how to get the FlinkKafkaConsumer to > forget partitions that don't exist anymore? > > The deleted topics are not returned by the "listTopics" API which the > KafkaPartitionDiscoverer calls under the covers, so it's unclear why the > KafkaPartitionDiscoverer doesn't then proceed to forget about these topics > and their partitions. >