I'll try to be more direct with the answer as you already have the context on what the issue is.
When this happens we basically have these options:

1) We can throw an exception (with good wording, so the user knows what's going on) and fail the job. This forces the user to take immediate action and fix the issue.
2) We can log a warning and keep the job running. Most users won't notice this unless they are using tools such as Sentry / StackDriver with automatic alerting. In most cases this will hide a real problem that could be really hard to discover / repair in later stages.

So in other words, Flink doesn't forget this in order to "proactively guard the user against some serious trouble".

Can you please elaborate a little bit more on the use case and why it needs to be implemented the way it is? Maybe there could be an alternative solution to this.

Best,
D.

On Tue, Sep 14, 2021 at 7:25 PM Constantinos Papadopoulos <cpa...@gmail.com> wrote:

> Thanks David. What you are saying makes sense. But, I keep hearing I
> shouldn't delete the topic externally, and I keep asking why doesn't Flink
> forget about the topic IF it has in fact been deleted externally (for
> whatever reason).
>
> I think I will drop this now.
>
> On Tue, Sep 14, 2021 at 5:50 PM David Morávek <d...@apache.org> wrote:
>
>> We are basically describing the same thing with Fabian, just a different
>> wording.
>>
>> The problem is that if you delete the topic externally, you're making an
>> assumption that downstream processor (Flink in this case) has already
>> consumed and RELIABLY processed all of the data from that topic (which may
>> not be true). This would effectively lead to AT_MOST_ONCE delivery
>> guarantees (in other words, we are OK with losing data), which is a
>> trade-off that _in_my_opinion_ we shouldn't make here.
>>
>> Best,
>> D.
>>
>> On Tue, Sep 14, 2021 at 4:37 PM Constantinos Papadopoulos <
>> cpa...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> Thank you for the replies, they are much appreciated.
>>>
>>> I'm sure I'm missing something obvious here, so bear with me...
>>>
>>> Fabian, regarding:
>>>
>>> "Flink will try to recover from the previous checkpoint which is invalid
>>> by now because the partition is not available anymore."
>>>
>>> The above would happen because the partition is not available anymore in
>>> Kafka (right?), and not because Flink's partition discoverer has removed it
>>> from its cache (i.e. even if Flink leaves it there, the topic doesn't exist
>>> in Kafka anymore, so that's the source of the problem in the scenario you
>>> outlined). In other words, what would be the *extra* harm from Flink
>>> cleaning up the partition from its cache after it knows that the partition
>>> is gone - this is the part I still don't understand.
>>>
>>> David, similarly:
>>>
>>> "actual topic deletion would need to be performed by Flink (not by the
>>> 3rd party system as suggested in the original question)"
>>>
>>> The situation is that the topic has, for better or worse, already been
>>> deleted. So my question is one of cleanup, i.e. how is it useful for Flink
>>> to continue remembering the partition of an already-deleted topic? (the
>>> checkpoint is invalid regardless, right?)
>>>
>>>
>>>
>>> On Tue, Sep 14, 2021 at 5:20 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>> On 9/14/21 3:57 PM, David Morávek wrote:
>>>>
>>>> Hi Jan,
>>>>
>>>> Notion of completeness is just one part of the problem. The second part
>>>> is that once you remove the Kafka topic, you are no longer able to replay
>>>> the data in case of failure.
>>>>
>>>> So you basically need a following workflow to ensure correctness:
>>>>
>>>> 1) Wait until there are no more elements in the topic (this can be done
>>>> by checking watermark for that partition as you're suggesting)
>>>> 2) Take a checkpoint N
>>>> 3) Delete the topic (this effectively makes all the checkpoints < N
>>>> invalid)
>>>>
>>>> Agree.
>>>>
>>>>
>>>> If you switch order of 2) and 3) you have no way to recover from
>>>> failure.
>>>>
>>>> Also for this to work properly, actual topic deletion would need to be
>>>> performed by Flink (not by the 3rd party system as suggested in the
>>>> original question) in the second phase of 2PC (when you're sure that you've
>>>> successfully taken a checkpoint, that has seen all the data).
>>>>
>>>> Agree, the deletion would have to be preceded by something like
>>>> partition drain. What is needed is the watermark reaching end of global
>>>> window (+inf) and a checkpoint. After that, the source can be removed and
>>>> what happens with it is no concern any more. That applies to all sources in
>>>> general. I don't know the implementation details, but it seems that the
>>>> topic would have to be somehow marked as "draining", it would then be the
>>>> responsibility of the reader to shift the watermark belonging to partitions
>>>> of that topic to +inf. It would then be responsibility of Flink to verify
>>>> that such source is removed only after a checkpoint is taken. Otherwise
>>>> there would be possible risk of data loss.
>>>>
>>>> This definitely looks like quite complex process.
>>>>
>>>>
>>>> Best,
>>>> D.
>>>>
>>>> On Tue, Sep 14, 2021 at 3:44 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> just out of curiosity, would this problem be solvable by the ability to
>>>>> remove partitions, that declare, that do not contain more data
>>>>> (watermark reaching end of global window)? There is probably another
>>>>> problem with that topic can be recreated after being deleted, which
>>>>> could result in watermark moving back in time, but this problem might be
>>>>> there already.
>>>>>
>>>>> Jan
>>>>>
>>>>> On 9/14/21 3:08 PM, Fabian Paul wrote:
>>>>> > Hi Constantinos,
>>>>> >
>>>>> > I agree with David that it is not easily possible to remove a
>>>>> > partition while a Flink job is running.
>>>>> > Imagine the following scenario:
>>>>> >
>>>>> > Your Flink job initially works on 2 partitions belonging to two
>>>>> > different topics and you have checkpointing enabled to guarantee
>>>>> > exactly-once delivery. It implies that on every checkpoint the
>>>>> > offsets of the Kafka topic are stored in a Flink checkpoint to recover
>>>>> > from them in case of a failure.
>>>>> > Now you trigger the removal of one of the topics and the discovery
>>>>> > detects that one of the partitions was removed. If the pipeline
>>>>> > now fails before the next checkpoint was taken Flink will try to
>>>>> > recover from the previous checkpoint which is invalid by now because
>>>>> > the partition is not available anymore.
>>>>> >
>>>>> > Only if you do not care about losing data it is possible to simply
>>>>> > ignore the removed partition.
>>>>> >
>>>>> > Best,
>>>>> > Fabian
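
For what it's worth, here is a minimal sketch of why the order of steps 2) and 3) in the quoted workflow matters. It is only a toy model in Python and not Flink's API: `ToyJob` and all of its methods are hypothetical names made up for illustration, and a "checkpoint" is reduced to the set of topics whose offsets it references.

```python
class ToyJob:
    """Toy stand-in for a job reading several topics (hypothetical, not Flink)."""

    def __init__(self, topics):
        self.live_topics = set(topics)   # topics that still exist in the broker
        self.subscribed = set(topics)    # topics the job is still reading
        self.checkpoint = set(topics)    # topics referenced by the last checkpoint

    def drain(self, topic):
        # Step 1: the topic has been fully consumed, so the job stops reading it.
        self.subscribed.discard(topic)

    def take_checkpoint(self):
        # Step 2: the new checkpoint references only topics still being read.
        self.checkpoint = set(self.subscribed)

    def delete_topic(self, topic):
        # Step 3: the topic is removed from the broker.
        self.live_topics.discard(topic)

    def can_recover(self):
        # Restoring works only if every topic in the last checkpoint still exists.
        return self.checkpoint <= self.live_topics


# Safe order: drain, checkpoint, then delete.
safe = ToyJob({"a", "b"})
safe.drain("b")
safe.take_checkpoint()
safe.delete_topic("b")

# Unsafe order: the topic is deleted externally before a new checkpoint exists.
unsafe = ToyJob({"a", "b"})
unsafe.delete_topic("b")

print(safe.can_recover())    # True
print(unsafe.can_recover())  # False
```

In the unsafe case the last checkpoint still points at offsets of a topic that is gone, which is the situation where silently forgetting the partition would hide data loss.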