Fabian and I are basically describing the same thing, just in different words.
The problem is that if you delete the topic externally, you are assuming that the downstream processor (Flink in this case) has already consumed and RELIABLY processed all of the data from that topic, which may not be true. This would effectively lead to AT_MOST_ONCE delivery guarantees (in other words, we are OK with losing data), which is a trade-off that _in_my_opinion_ we shouldn't make here.

Best,
D.

On Tue, Sep 14, 2021 at 4:37 PM Constantinos Papadopoulos <cpa...@gmail.com> wrote:

> Hi all,
>
> Thank you for the replies, they are much appreciated.
>
> I'm sure I'm missing something obvious here, so bear with me...
>
> Fabian, regarding:
>
> "Flink will try to recover from the previous checkpoint which is invalid
> by now because the partition is not available anymore."
>
> The above would happen because the partition is not available anymore in
> Kafka (right?), and not because Flink's partition discoverer has removed it
> from its cache (i.e. even if Flink leaves it there, the topic doesn't exist
> in Kafka anymore, so that's the source of the problem in the scenario you
> outlined). In other words, what would be the *extra* harm from Flink
> cleaning up the partition from its cache after it knows that the partition
> is gone - this is the part I still don't understand.
>
> David, similarly:
>
> "actual topic deletion would need to be performed by Flink (not by the 3rd
> party system as suggested in the original question)"
>
> The situation is that the topic has, for better or worse, already been
> deleted. So my question is one of cleanup, i.e. how is it useful for Flink
> to continue remembering the partition of an already-deleted topic? (the
> checkpoint is invalid regardless, right?)
>
>
>
> On Tue, Sep 14, 2021 at 5:20 PM Jan Lukavský <je...@seznam.cz> wrote:
>
>> On 9/14/21 3:57 PM, David Morávek wrote:
>>
>> Hi Jan,
>>
>> The notion of completeness is just one part of the problem.
>> The second part is that once you remove the Kafka topic, you are no
>> longer able to replay the data in case of failure.
>>
>> So you basically need the following workflow to ensure correctness:
>>
>> 1) Wait until there are no more elements in the topic (this can be done
>> by checking the watermark for that partition as you're suggesting)
>> 2) Take a checkpoint N
>> 3) Delete the topic (this effectively makes all the checkpoints < N
>> invalid)
>>
>> Agree.
>>
>>
>> If you switch the order of 2) and 3) you have no way to recover from failure.
>>
>> Also for this to work properly, actual topic deletion would need to be
>> performed by Flink (not by the 3rd party system as suggested in the
>> original question) in the second phase of 2PC (when you're sure that you've
>> successfully taken a checkpoint that has seen all the data).
>>
>> Agree, the deletion would have to be preceded by something like a partition
>> drain. What is needed is the watermark reaching the end of the global window
>> (+inf) and a checkpoint. After that, the source can be removed and what
>> happens with it is no concern any more. That applies to all sources in
>> general. I don't know the implementation details, but it seems that the
>> topic would have to be somehow marked as "draining"; it would then be the
>> responsibility of the reader to shift the watermark belonging to partitions
>> of that topic to +inf. It would then be the responsibility of Flink to verify
>> that such a source is removed only after a checkpoint is taken. Otherwise
>> there would be a possible risk of data loss.
>>
>> This definitely looks like quite a complex process.
>>
>>
>> Best,
>> D.
>>
>> On Tue, Sep 14, 2021 at 3:44 PM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> Hi,
>>>
>>> just out of curiosity, would this problem be solvable by the ability to
>>> remove partitions that declare that they do not contain more data
>>> (watermark reaching end of global window)?
>>> There is probably another problem in that the topic can be recreated
>>> after being deleted, which could result in the watermark moving back
>>> in time, but this problem might be there already.
>>>
>>> Jan
>>>
>>> On 9/14/21 3:08 PM, Fabian Paul wrote:
>>> > Hi Constantinos,
>>> >
>>> > I agree with David that it is not easily possible to remove a
>>> > partition while a Flink job is running. Imagine the following scenario:
>>> >
>>> > Your Flink job initially works on 2 partitions belonging to two
>>> > different topics and you have checkpointing enabled to guarantee
>>> > exactly-once delivery. It implies that on every checkpoint the offsets
>>> > of the Kafka topic are stored in a Flink checkpoint to recover
>>> > from them in case of a failure.
>>> > Now you trigger the removal of one of the topics and the discovery
>>> > detects that one of the partitions was removed. If the pipeline
>>> > now fails before the next checkpoint was taken Flink will try to
>>> > recover from the previous checkpoint which is invalid by now because
>>> > the partition is not available anymore.
>>> >
>>> > Only if you do not care about losing data it is possible to simply
>>> > ignore the removed partition.
>>> >
>>> > Best,
>>> > Fabian
>>>
>>
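
For illustration only, the ordering discussed in this thread (drain, then checkpoint, then delete) can be sketched as a toy simulation in Python. Nothing here is the Flink or Kafka API: ToyBroker, ToyJob, TopicDeletedError and remove_topic_safely are hypothetical names made up for this sketch, and a "checkpoint" is just a copy of the read offsets plus the set of topics that were fully drained when it was taken.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set


class TopicDeletedError(Exception):
    """A restore needs to replay from a topic that no longer exists."""


@dataclass
class ToyBroker:
    """Hypothetical stand-in for Kafka: topic name -> records."""
    topics: Dict[str, List[str]] = field(default_factory=dict)

    def delete_topic(self, name: str) -> None:
        self.topics.pop(name, None)


@dataclass
class ToyJob:
    """Hypothetical stand-in for a Flink job that keeps one checkpoint."""
    broker: ToyBroker
    offsets: Dict[str, int] = field(default_factory=dict)
    finished: Set[str] = field(default_factory=set)
    checkpoint: Optional[dict] = None

    def consume(self, topic: str, n: int) -> None:
        # Advance the read position by up to n records.
        end = len(self.broker.topics[topic])
        self.offsets[topic] = min(self.offsets.get(topic, 0) + n, end)

    def drained(self, topic: str) -> bool:
        # True once every record currently in the topic has been read.
        return self.offsets.get(topic, 0) == len(self.broker.topics[topic])

    def take_checkpoint(self) -> None:
        self.checkpoint = {
            "offsets": dict(self.offsets),
            "finished": set(self.finished),
        }

    def restore(self) -> Dict[str, int]:
        """Return the offsets to resume from, or fail if replay is impossible."""
        if self.checkpoint is None:
            raise RuntimeError("no checkpoint to restore from")
        resume = {}
        for topic, offset in self.checkpoint["offsets"].items():
            if topic in self.checkpoint["finished"]:
                continue  # fully processed before the checkpoint, nothing to replay
            if topic not in self.broker.topics:
                raise TopicDeletedError(topic)
            resume[topic] = offset
        return resume


def remove_topic_safely(job: ToyJob, topic: str) -> None:
    """1) require the topic to be drained, 2) checkpoint, 3) delete."""
    if not job.drained(topic):
        raise RuntimeError(f"topic {topic!r} still has unread records")
    job.finished.add(topic)
    job.take_checkpoint()
    job.broker.delete_topic(topic)
```

In this sketch, deleting a topic directly on the broker after a checkpoint that still holds a mid-topic offset makes restore() raise TopicDeletedError, which is the "invalid checkpoint" case from Fabian's scenario. Going through remove_topic_safely() leaves a checkpoint that no longer needs the deleted topic, so restore() succeeds with the remaining offsets.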