Thank you for your answer, David; it confirms what we see in the Flink code.

A few thoughts below:


"as this may easily lead to a data loss"

Removing a topic/partition can indeed lead to data loss if not done
carefully. However, *after* the topic has been deleted, I believe it would
be safe for the partition discoverer to forget it, unless I am missing
something.
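
To make the idea concrete, here is a rough sketch of what "forgetting" could
look like (this is not Flink code: listTopics() is the same KafkaConsumer call
the discoverer uses under the covers, but the previouslySeenTopics state and
the forgetTopic hook are made-up names for illustration):

  import java.util.HashSet;
  import java.util.List;
  import java.util.Map;
  import java.util.Set;
  import org.apache.kafka.clients.consumer.KafkaConsumer;
  import org.apache.kafka.common.PartitionInfo;

  // Rough sketch only: remembers the topics seen on the previous discovery
  // run and "forgets" any topic no longer returned by listTopics().
  class TopicRemovalAwareDiscovery {
      private final KafkaConsumer<?, ?> consumer;
      private Set<String> previouslySeenTopics = new HashSet<>();

      TopicRemovalAwareDiscovery(KafkaConsumer<?, ?> consumer) {
          this.consumer = consumer;
      }

      void discover() {
          // The same metadata call the KafkaPartitionDiscoverer makes.
          Map<String, List<PartitionInfo>> current = consumer.listTopics();

          // Topics that were present on the previous run but are gone now.
          Set<String> removed = new HashSet<>(previouslySeenTopics);
          removed.removeAll(current.keySet());

          for (String topic : removed) {
              forgetTopic(topic);
          }
          previouslySeenTopics = new HashSet<>(current.keySet());
      }

      private void forgetTopic(String topic) {
          // Hypothetical hook: a real contribution would drop the topic's
          // partitions from the discoverer's internal state here.
          System.out.println("Forgetting deleted topic: " + topic);
      }
  }

Once a topic stops appearing in the listTopics() result, its partitions would
simply no longer be part of the discoverer's state, and the consumer would
stop asking the brokers about them.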


"partition removal is not even supported by Kafka"

Topic removal is supported, however, and that is our use case here as well.
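
To be clear about what I mean by topic removal: deleting a whole topic through
the Kafka AdminClient is a fully supported operation, e.g. (broker address and
topic name below are placeholders):

  import java.util.Collections;
  import java.util.Properties;
  import org.apache.kafka.clients.admin.AdminClient;

  public class DeleteTopicExample {
      public static void main(String[] args) throws Exception {
          Properties props = new Properties();
          props.put("bootstrap.servers", "broker:9092"); // placeholder

          try (AdminClient admin = AdminClient.create(props)) {
              // Removes the topic and all of its partitions. Requires
              // delete.topic.enable=true on the brokers (the default).
              admin.deleteTopics(Collections.singleton("some-scaled-down-topic"))
                   .all()
                   .get();
          }
      }
  }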


"You should only be adding new topics / partitions"

Unfortunately, in some cloud environments, partitions are a precious
commodity, and scaling them up is only affordable if we can later scale them
back down.


In my humble view, forgetting a topic's partitions after the topic's
removal should be supported by the partition discoverer (even if it's an
opt-in capability). Would the Flink community be open to a contribution
that does this?
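
To sketch what the opt-in could look like from the user's side, something
along these lines (the forget-deleted-topics key below is purely hypothetical
and does not exist today; the rest is the usual Flink 1.12 setup, with a
placeholder broker address and topic pattern):

  import java.util.Properties;
  import java.util.regex.Pattern;
  import org.apache.flink.api.common.serialization.SimpleStringSchema;
  import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

  public class ConsumerSetupSketch {
      static FlinkKafkaConsumer<String> buildConsumer() {
          Properties props = new Properties();
          props.setProperty("bootstrap.servers", "broker:9092"); // placeholder
          props.setProperty("group.id", "metric-processor-consumer-group");
          // Existing option: enable periodic partition discovery.
          props.setProperty("flink.partition-discovery.interval-millis", "30000");
          // Hypothetical opt-in (does NOT exist today): also forget partitions
          // of topics deleted since the previous discovery run.
          props.setProperty("flink.partition-discovery.forget-deleted-topics", "true");

          // Topic pattern subscription, as in our setup (pattern is a placeholder).
          return new FlinkKafkaConsumer<>(
                  Pattern.compile("metric-.*"), new SimpleStringSchema(), props);
      }
  }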


Best regards,

Constantinos Papadopoulos

On Tue, Sep 14, 2021 at 12:54 PM David Morávek <d...@apache.org> wrote:

> Hi Constantinos,
>
> The partition discovery doesn't support topic / partition removal as this
> may easily lead to a data loss (partition removal is not even supported by
> Kafka for the same reason)
>
> Dynamically adding and removing partitions as part of a business logic is
> just not how Kafka is designed to work. You should only be adding new
> topics / partitions for scale out reasons and even that should be done
> super carefully because it breaks data partitioning.
>
> Best,
> D.
>
> On Tue, Sep 14, 2021 at 11:00 AM Constantinos Papadopoulos <
> cpa...@gmail.com> wrote:
>
>> We are on Flink 1.12.1, we initialize our FlinkKafkaConsumer with a topic
>> name *pattern*, and we have partition discovery enabled.
>>
>> When our product scales up, it adds new topics. When it scales down, it
>> removes topics.
>>
>> The problem is that the FlinkKafkaConsumer never seems to forget
>> partitions that don't exist anymore.
>>
>> As a result, our logs are filled with UNKNOWN_TOPIC_OR_PARTITION errors:
>>
>> *[Consumer clientId=consumer-metric-processor-consumer-group-2,
>> groupId=metric-processor-consumer-group] Error while fetching metadata with
>> correlation id 3030663 : {metric-athfvfrt#sgy=UNKNOWN_TOPIC_OR_PARTITION}*
>>
>> Over time, the problem becomes worse as scale ups and scale downs
>> continue happening (and thus the number of deleted partitions continues
>> increasing).
>>
>> Is this a bug, or are we missing how to get the FlinkKafkaConsumer to
>> forget partitions that don't exist anymore?
>>
>> The deleted topics are not returned by the "listTopics" API which the
>> KafkaPartitionDiscoverer calls under the covers, so it's unclear why the
>> KafkaPartitionDiscoverer doesn't then proceed to forget about these topics
>> and their partitions.
>>
>
