On 9/14/21 3:57 PM, David Morávek wrote:
Hi Jan,

The notion of completeness is just one part of the problem. The second part is that once you remove the Kafka topic, you can no longer replay the data after a failure.

So you basically need the following workflow to ensure correctness:

1) Wait until there are no more elements in the topic (this can be done by checking the watermark for that partition, as you're suggesting)
2) Take a checkpoint N
3) Delete the topic (this effectively makes all checkpoints < N invalid)
Agree.

If you switch the order of 2) and 3), you have no way to recover from a failure.

Also, for this to work properly, the actual topic deletion would need to be performed by Flink (not by the third-party system, as suggested in the original question) in the second phase of 2PC, once you're sure that you've successfully taken a checkpoint that has seen all the data.
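
A toy model of why the order of steps 2) and 3) matters. This is only a sketch: `ToyPipeline` and its methods are made up for illustration and are not Flink or Kafka API. It assumes that a checkpoint taken before the drain still holds offsets into the topic, while a checkpoint taken after the drain does not need the topic for replay.

```python
from dataclasses import dataclass, field


@dataclass
class ToyPipeline:
    """Hypothetical model of one source topic plus a list of checkpoints."""
    topic_exists: bool = True
    drained: bool = False  # step 1: no more elements in the topic
    checkpoints: list = field(default_factory=list)  # (id, needs_topic)

    def drain(self):
        self.drained = True

    def take_checkpoint(self):
        # A checkpoint taken before the drain must replay from the topic.
        cp = (len(self.checkpoints) + 1, not self.drained)
        self.checkpoints.append(cp)
        return cp

    def delete_topic(self):
        self.topic_exists = False

    def can_recover(self):
        # Recovery restores the latest checkpoint; it fails if that
        # checkpoint needs a topic that no longer exists.
        if not self.checkpoints:
            return False
        _, needs_topic = self.checkpoints[-1]
        return self.topic_exists or not needs_topic


def safe_order():
    p = ToyPipeline()
    p.take_checkpoint()   # some earlier checkpoint < N
    p.drain()             # 1) wait until the topic is empty
    p.take_checkpoint()   # 2) checkpoint N
    p.delete_topic()      # 3) delete the topic
    return p.can_recover()


def unsafe_order():
    p = ToyPipeline()
    p.take_checkpoint()   # some earlier checkpoint < N
    p.drain()
    p.delete_topic()      # 3) before 2): a failure here is unrecoverable
    return p.can_recover()
```

In this model `safe_order()` can still recover after the deletion, while `unsafe_order()` cannot, because the only checkpoint it has points into a topic that is gone.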

Agree, the deletion would have to be preceded by something like a partition drain. What is needed is the watermark reaching the end of the global window (+inf), followed by a checkpoint. After that, the source can be removed, and what happens to it is no longer a concern. That applies to all sources in general. I don't know the implementation details, but it seems that the topic would have to be marked as "draining" somehow; it would then be the responsibility of the reader to shift the watermark of that topic's partitions to +inf. It would then be the responsibility of Flink to verify that such a source is removed only after a checkpoint has been taken. Otherwise there would be a risk of data loss.
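
A minimal sketch of the "draining" idea, assuming the source watermark is the minimum over the watermarks of its partitions. All names here (`combined_watermark`, `mark_draining`, `removable`) are hypothetical and not part of any Flink API, and the sketch assumes the reader has already consumed everything left in the topic before it shifts the watermark.

```python
import math

# Assumption: +inf stands for "end of global window".
END_OF_GLOBAL_WINDOW = math.inf


def combined_watermark(partition_watermarks):
    """Watermark of the whole source: the minimum over all partitions."""
    return min(partition_watermarks.values(), default=END_OF_GLOBAL_WINDOW)


def mark_draining(partition_watermarks, topic):
    """Return a copy in which every partition of `topic` reports +inf."""
    return {
        (t, p): (END_OF_GLOBAL_WINDOW if t == topic else wm)
        for (t, p), wm in partition_watermarks.items()
    }


def removable(partition_watermarks, topic, checkpoint_completed):
    """A topic may be removed only once all of its partitions are at +inf
    AND a checkpoint has completed after that point."""
    drained = all(
        wm == END_OF_GLOBAL_WINDOW
        for (t, _), wm in partition_watermarks.items()
        if t == topic
    )
    return drained and checkpoint_completed


# Keys are (topic, partition), values are event-time watermarks.
watermarks = {("a", 0): 100, ("b", 0): 250}
after_drain = mark_draining(watermarks, "a")
```

With these inputs the drained topic "a" no longer holds back the combined watermark, and `removable(after_drain, "a", checkpoint_completed=False)` stays false until a checkpoint has been taken.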

This definitely looks like quite a complex process.


Best,
D.

On Tue, Sep 14, 2021 at 3:44 PM Jan Lukavský <je...@seznam.cz> wrote:

    Hi,

    just out of curiosity, would this problem be solvable by the ability to
    remove partitions that declare they contain no more data (watermark
    reaching the end of the global window)? There is probably another problem:
    a topic can be recreated after being deleted, which could result in the
    watermark moving back in time, but this problem might already be there.

      Jan

    On 9/14/21 3:08 PM, Fabian Paul wrote:
    > Hi Constantinos,
    >
    > I agree with David that it is not easily possible to remove a partition
    > while a Flink job is running. Imagine the following scenario:
    >
    > Your Flink job initially works on 2 partitions belonging to two different
    > topics and you have checkpointing enabled to guarantee exactly-once
    > delivery. This implies that on every checkpoint the offsets of the Kafka
    > topics are stored in a Flink checkpoint, to recover from them in case of
    > a failure.
    > Now you trigger the removal of one of the topics and the discovery detects
    > that one of the partitions was removed. If the pipeline now fails before
    > the next checkpoint is taken, Flink will try to recover from the previous
    > checkpoint, which is invalid by now because the partition is not available
    > anymore.
    >
    > Only if you do not care about losing data is it possible to simply
    > ignore the removed partition.
    >
    > Best,
    > Fabian
