We are on Flink 1.12.1, we initialize our FlinkKafkaConsumer with a topic
name *pattern*, and we have partition discovery enabled.

When our product scales up, it adds new topics. When it scales down, it
removes topics.

The problem is that the FlinkKafkaConsumer never seems to forget partitions
that don't exist anymore.

As a result, our logs are filled with UNKNOWN_TOPIC_OR_PARTITION errors:

*[Consumer clientId=consumer-metric-processor-consumer-group-2,
groupId=metric-processor-consumer-group] Error while fetching metadata with
correlation id 3030663 : {metric-athfvfrt#sgy=UNKNOWN_TOPIC_OR_PARTITION}*

Over time, the problem becomes worse as scale ups and scale downs continue
happening (and thus the number of deleted partitions continues increasing).

Is this a bug, or are we missing how to get the FlinkKafkaConsumer to
forget partitions that don't exist anymore?

The deleted topics are not returned by the "listTopics" API which the
KafkaPartitionDiscoverer calls under the covers, so it's unclear why the
KafkaPartitionDiscoverer doesn't then proceed to forget about these topics
and their partitions.

Reply via email to