Re: Kafka Partition Discovery

2021-09-24 Thread Roman Khachatryan
Hi, It seems like a useful feature, but it's probably better to have it in the Kafka consumer. There is a related KIP in progress: https://cwiki.apache.org/confluence/display/KAFKA/KIP-580%3A+Exponential+Backoff+for+Kafka+Clients I'd like to pull Arvid into the discussion as he might be better fa
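
For readers unfamiliar with the idea behind the KIP linked above, exponential backoff can be sketched roughly as follows. This is only an illustration, not Kafka's implementation: the class `BackoffSketch`, the method `backoffMs`, and the 100 ms / 1000 ms values are hypothetical, and jitter is left out to keep the sketch deterministic.

```java
public final class BackoffSketch {
    private BackoffSketch() {}

    /**
     * Delay before the next retry: the initial delay doubles with every
     * failed attempt and is capped at maxMs. Names and values are
     * illustrative assumptions, not Kafka client settings.
     */
    public static long backoffMs(long initialMs, long maxMs, int failedAttempts) {
        // Clamp the exponent so the shift stays well-defined for realistic delays.
        int exponent = Math.min(Math.max(failedAttempts, 0), 30);
        long delay = initialMs * (1L << exponent);
        return Math.min(delay, maxMs);
    }

    public static void main(String[] args) {
        // Print the wait time for the first few failed attempts.
        for (int attempt = 0; attempt < 6; attempt++) {
            System.out.println("attempt " + attempt + " -> wait "
                    + backoffMs(100, 1000, attempt) + " ms");
        }
    }
}
```

The cap keeps a long broker outage from pushing the delay up without bound, while the doubling avoids hammering a broker that has only just restarted.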

Kafka Partition Discovery

2021-09-22 Thread Mason Chen
Hi all, We are sometimes facing a connection issue with Kafka when a broker restarts ``` java.lang.RuntimeException: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.runWithPartitio

Re: Flink Kafka Partition Discovery Never Forgets Deleted Partitions

2021-09-15 Thread Seth Wiesman
I just want to chime in that if you really do need to drop a partition, Flink already supports a solution. If you manually stop the job with a savepoint and restart it with a new UID on the source operator, along with passing the --allowNonRestoredState flag to the client, the source will disregar

Re: Flink Kafka Partition Discovery Never Forgets Deleted Partitions

2021-09-15 Thread David Morávek
I'll try to be more direct with the answer as you already have the context on what the issue is. When this happens we basically have these options: 1) We can throw an exception (with good wording, so the user knows what's going on) and fail the job. This forces the user to take immediate action and fi

Re: Flink Kafka Partition Discovery Never Forgets Deleted Partitions

2021-09-14 Thread Constantinos Papadopoulos
Thanks David. What you are saying makes sense. But I keep hearing I shouldn't delete the topic externally, and I keep asking why Flink doesn't forget about the topic IF it has in fact been deleted externally (for whatever reason). I think I will drop this now. On Tue, Sep 14, 2021 at 5:50 PM Dav

Re: Flink Kafka Partition Discovery Never Forgets Deleted Partitions

2021-09-14 Thread David Morávek
Fabian and I are basically describing the same thing, just with different wording. The problem is that if you delete the topic externally, you're making an assumption that the downstream processor (Flink in this case) has already consumed and RELIABLY processed all of the data from that topic (which ma

Re: Flink Kafka Partition Discovery Never Forgets Deleted Partitions

2021-09-14 Thread Constantinos Papadopoulos
Hi all, Thank you for the replies, they are much appreciated. I'm sure I'm missing something obvious here, so bear with me... Fabian, regarding: "Flink will try to recover from the previous checkpoint which is invalid by now because the partition is not available anymore." The above would happ

Re: Flink Kafka Partition Discovery Never Forgets Deleted Partitions

2021-09-14 Thread Jan Lukavský
On 9/14/21 3:57 PM, David Morávek wrote: Hi Jan, The notion of completeness is just one part of the problem. The second part is that once you remove the Kafka topic, you are no longer able to replay the data in case of failure. So you basically need the following workflow to ensure correctness: 1

Re: Flink Kafka Partition Discovery Never Forgets Deleted Partitions

2021-09-14 Thread David Morávek
Hi Jan, The notion of completeness is just one part of the problem. The second part is that once you remove the Kafka topic, you are no longer able to replay the data in case of failure. So you basically need the following workflow to ensure correctness: 1) Wait until there are no more elements in the

Re: Flink Kafka Partition Discovery Never Forgets Deleted Partitions

2021-09-14 Thread Jan Lukavský
Hi, just out of curiosity, would this problem be solvable by the ability to remove partitions that declare that they do not contain any more data (watermark reaching the end of the global window)? There is probably another problem in that a topic can be recreated after being deleted, which could result in w

Re: Flink Kafka Partition Discovery Never Forgets Deleted Partitions

2021-09-14 Thread Fabian Paul
Hi Constantinos, I agree with David that it is not easily possible to remove a partition while a Flink job is running. Imagine the following scenario: Your Flink job initially works on 2 partitions belonging to two different topics and you have checkpointing enabled to guarantee exactly-once de

Re: Flink Kafka Partition Discovery Never Forgets Deleted Partitions

2021-09-14 Thread Constantinos Papadopoulos
Thank you for your answer David, which is a confirmation of what we see in the Flink code. A few thoughts below: "as this may easily lead to a data loss" Removing a topic/partition can indeed lead to data loss if not done carefully. However, *after* the topic has been deleted, I believe it woul

Re: Flink Kafka Partition Discovery Never Forgets Deleted Partitions

2021-09-14 Thread David Morávek
Hi Constantinos, Partition discovery doesn't support topic / partition removal, as this may easily lead to data loss (partition removal is not even supported by Kafka for the same reason). Dynamically adding and removing partitions as part of business logic is just not how Kafka is designed

Flink Kafka Partition Discovery Never Forgets Deleted Partitions

2021-09-14 Thread Constantinos Papadopoulos
We are on Flink 1.12.1, we initialize our FlinkKafkaConsumer with a topic name *pattern*, and we have partition discovery enabled. When our product scales up, it adds new topics. When it scales down, it removes topics. The problem is that the FlinkKafkaConsumer never seems to forget partitions th
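
The setup described in this message (a topic name pattern plus partition discovery) can be sketched with JDK classes only. This is an assumed reconstruction, not the poster's actual code: the topic naming scheme, broker address, group id, and 30-second interval are hypothetical. The property key `flink.partition-discovery.interval-millis` is the one the Flink Kafka connector documentation gives for enabling discovery. The consumer construction itself appears only as a comment because it needs the Flink Kafka connector on the classpath.

```java
import java.util.List;
import java.util.Properties;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public final class DiscoverySetupSketch {
    private DiscoverySetupSketch() {}

    // Hypothetical naming scheme for topics added and removed on scale-up/down.
    public static final Pattern TOPIC_PATTERN = Pattern.compile("tenant-events-\\d+");

    /** Consumer properties with partition discovery enabled (values are assumptions). */
    public static Properties consumerProperties() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "example-group");
        // A non-negative interval turns on periodic topic/partition discovery.
        props.setProperty("flink.partition-discovery.interval-millis", "30000");
        return props;
    }

    /** Topics from the given list that the pattern would subscribe to. */
    public static List<String> matchingTopics(List<String> topics) {
        return topics.stream()
                .filter(t -> TOPIC_PATTERN.matcher(t).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // With the connector available, the consumer would be built roughly as:
        // new FlinkKafkaConsumer<>(TOPIC_PATTERN, new SimpleStringSchema(), consumerProperties());
        System.out.println(consumerProperties());
    }
}
```

Discovery with a pattern only ever adds newly matching topics and partitions, which is why a topic deleted on scale-down remains in the consumer's restored state.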