Hi Constantinos,

Partition discovery doesn't support topic/partition removal, as that could
easily lead to data loss (partition removal isn't supported by Kafka itself,
for the same reason).

Dynamically adding and removing partitions as part of business logic is
simply not how Kafka is designed to work. You should only add new
topics/partitions for scale-out reasons, and even that should be done
very carefully because it breaks data partitioning.
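
For reference, this is roughly how discovery is intended to be used: you opt in
via the discovery-interval property, and the consumer only ever *adds* matching
topics/partitions. A minimal sketch (topic pattern and interval are made-up
examples, not taken from your setup):

```java
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092"); // example address
props.setProperty("group.id", "metric-processor-consumer-group");
// Enable periodic partition discovery (every 30 s here). Newly created
// topics/partitions matching the pattern are picked up; deleted ones are
// NOT removed from the consumer's state.
props.setProperty("flink.partition-discovery.interval-millis", "30000");

FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
        Pattern.compile("metric-.*"),   // hypothetical pattern for illustration
        new SimpleStringSchema(),
        props);
```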

Best,
D.

On Tue, Sep 14, 2021 at 11:00 AM Constantinos Papadopoulos <cpa...@gmail.com>
wrote:

> We are on Flink 1.12.1, we initialize our FlinkKafkaConsumer with a topic
> name *pattern*, and we have partition discovery enabled.
>
> When our product scales up, it adds new topics. When it scales down, it
> removes topics.
>
> The problem is that the FlinkKafkaConsumer never seems to forget
> partitions that don't exist anymore.
>
> As a result, our logs are filled with UNKNOWN_TOPIC_OR_PARTITION errors:
>
> *[Consumer clientId=consumer-metric-processor-consumer-group-2,
> groupId=metric-processor-consumer-group] Error while fetching metadata with
> correlation id 3030663 : {metric-athfvfrt#sgy=UNKNOWN_TOPIC_OR_PARTITION}*
>
> Over time, the problem becomes worse as scale ups and scale downs continue
> happening (and thus the number of deleted partitions continues increasing).
>
> Is this a bug, or are we missing how to get the FlinkKafkaConsumer to
> forget partitions that don't exist anymore?
>
> The deleted topics are not returned by the "listTopics" API which the
> KafkaPartitionDiscoverer calls under the covers, so it's unclear why the
> KafkaPartitionDiscoverer doesn't then proceed to forget about these topics
> and their partitions.
>
