We are on Flink 1.12.1. We initialize our FlinkKafkaConsumer with a topic name *pattern* and have partition discovery enabled.
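For context, a setup of this kind looks roughly like the sketch below. The `metric-.*` pattern, the broker address, the 30-second interval, and the class name are placeholders for illustration, not our real values. Only the group ID is taken from the log line, and `flink.partition-discovery.interval-millis` is the connector's property key for the discovery interval. The sketch sticks to plain Java so it runs without the Flink connector on the classpath; the consumer construction appears only as a comment.

```java
import java.util.Properties;
import java.util.regex.Pattern;

class DiscoverySetupSketch {
    // Placeholder pattern: assumes all product topics share a "metric-" prefix.
    static final Pattern TOPIC_PATTERN = Pattern.compile("metric-.*");

    // Builds the consumer properties, including the partition discovery interval.
    static Properties consumerProps(long discoveryIntervalMillis) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        props.setProperty("group.id", "metric-processor-consumer-group");
        // A non-negative value enables periodic partition discovery.
        props.setProperty("flink.partition-discovery.interval-millis",
                Long.toString(discoveryIntervalMillis));
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps(30_000L);
        // With the Flink Kafka connector available, these would be passed as:
        //   new FlinkKafkaConsumer<>(TOPIC_PATTERN, new SimpleStringSchema(), props)
        System.out.println(props.getProperty("flink.partition-discovery.interval-millis"));
    }
}
```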
When our product scales up, it adds new topics; when it scales down, it removes them. The problem is that the FlinkKafkaConsumer never seems to forget partitions that no longer exist. As a result, our logs fill up with UNKNOWN_TOPIC_OR_PARTITION errors:

*[Consumer clientId=consumer-metric-processor-consumer-group-2, groupId=metric-processor-consumer-group] Error while fetching metadata with correlation id 3030663 : {metric-athfvfrt#sgy=UNKNOWN_TOPIC_OR_PARTITION}*

The problem gets worse over time: as scale-ups and scale-downs continue, the number of deleted partitions keeps growing.

Is this a bug, or are we missing some way to get the FlinkKafkaConsumer to forget partitions that no longer exist? The deleted topics are not returned by the "listTopics" API that the KafkaPartitionDiscoverer calls under the covers, so it is unclear why the KafkaPartitionDiscoverer does not then drop these topics and their partitions.
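To make the expectation concrete, here is a minimal sketch of the pruning step we assumed would happen on each discovery cycle. It is plain Java and is not Flink's actual implementation; the class name, method name, and topic names are hypothetical. It only shows the set difference between the topics still being tracked and the topics the latest listing returned.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

class StaleTopicSketch {
    // Returns the topics that are still tracked but absent from the latest listing.
    static Set<String> staleTopics(Set<String> tracked, Set<String> listed) {
        Set<String> stale = new TreeSet<>(tracked);
        stale.removeAll(listed);
        return stale;
    }

    public static void main(String[] args) {
        // Hypothetical state: three tracked topics, one of which has since been deleted.
        Set<String> tracked = new TreeSet<>(Arrays.asList("metric-a", "metric-b", "metric-c"));
        Set<String> listed = new TreeSet<>(Arrays.asList("metric-a", "metric-c"));

        // Expected behavior: drop whatever the listing no longer reports.
        tracked.removeAll(staleTopics(tracked, listed));
        System.out.println(tracked);
    }
}
```

In this sketch the deleted topic drops out of the tracked set after one cycle, which is the behavior we are not seeing.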