On 9/14/21 3:57 PM, David Morávek wrote:
Hi Jan,
Notion of completeness is just one part of the problem. The second
part is that once you remove the Kafka topic, you are no longer able
to replay the data in case of failure.
So you basically need the following workflow to ensure correctness:
1) Wait until there are no more elements in the topic (this can be
done by checking the watermark for that partition, as you're suggesting)
2) Take a checkpoint N
3) Delete the topic (this effectively makes all the checkpoints < N
invalid)
Agree.
If you switch the order of 2) and 3), you have no way to recover from a failure.
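To make the ordering argument concrete, here is a toy model in plain Python. It does not use any Flink or Kafka API; `can_recover`, the offset dictionaries and the topic names are hypothetical and exist only for illustration. It assumes a checkpoint is just a map of topic to next offset to read, and that recovery is possible only if every topic the checkpoint still needs data from is still there.

```python
def can_recover(checkpoint_offsets, end_offsets, live_topics):
    """Return True if a restore from this checkpoint can re-read everything it needs.

    checkpoint_offsets: topic -> next offset to read, as stored in the checkpoint
    end_offsets:        topic -> offset one past the last record ever written
    live_topics:        set of topics that still exist
    """
    for topic, offset in checkpoint_offsets.items():
        fully_consumed = offset >= end_offsets[topic]
        if not fully_consumed and topic not in live_topics:
            # The checkpoint needs records that were deleted with the topic.
            return False
    return True


end_offsets = {"a": 100, "b": 50}

# Correct order: drain "b", take checkpoint N, then delete "b".
checkpoint_n = {"a": 80, "b": 50}
ok_after_delete = can_recover(checkpoint_n, end_offsets, live_topics={"a"})

# Swapped order: "b" is deleted while the latest checkpoint is still mid-topic.
older_checkpoint = {"a": 80, "b": 30}
broken_after_delete = can_recover(older_checkpoint, end_offsets, live_topics={"a"})
```

In this sketch `ok_after_delete` is true and `broken_after_delete` is false, which is the same reason every checkpoint < N becomes invalid once the topic is gone.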
Also, for this to work properly, the actual topic deletion would need to be
performed by Flink (not by the 3rd party system as suggested in the
original question) in the second phase of 2PC (when you're sure that
you've successfully taken a checkpoint that has seen all the data).
Agree, the deletion would have to be preceded by something like a
partition drain. What is needed is the watermark reaching the end of the
global window (+inf), followed by a checkpoint. After that, the source can
be removed, and what happens to it is no longer a concern. That applies to
all sources in general. I don't know the implementation details, but it
seems that the topic would have to be somehow marked as "draining"; it
would then be the responsibility of the reader to shift the watermark
belonging to the partitions of that topic to +inf. It would then be the
responsibility of Flink to verify that such a source is removed only after
a checkpoint is taken. Otherwise there would be a risk of data loss.
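A rough sketch of that drain protocol as a small state machine, in plain Python. This is not how Flink implements sources; `DrainableSource` and all of its methods are made-up names, and the only assumption is the rule described above: removal is allowed only after the watermark has reached +inf and a checkpoint has completed afterwards.

```python
import math


class DrainableSource:
    """Hypothetical model of a source that can be drained and then removed."""

    def __init__(self, name):
        self.name = name
        self.watermark = -math.inf
        self.draining = False
        self.checkpointed_after_drain = False

    def mark_draining(self):
        # Someone declares that no more data will be written to this source.
        self.draining = True

    def reader_poll(self, has_more_data, event_time=None):
        # The reader advances the watermark with the data it sees and jumps
        # to +inf once a draining source has nothing left to read.
        if has_more_data:
            self.watermark = max(self.watermark, event_time)
        elif self.draining:
            self.watermark = math.inf

    def on_checkpoint_complete(self):
        # Only a checkpoint taken after the watermark hit +inf counts.
        if self.watermark == math.inf:
            self.checkpointed_after_drain = True

    def can_remove(self):
        return self.checkpointed_after_drain


src = DrainableSource("topic-to-delete")
src.reader_poll(has_more_data=True, event_time=10)
src.mark_draining()
src.on_checkpoint_complete()   # too early: the watermark is still finite
early = src.can_remove()
src.reader_poll(has_more_data=False)
src.on_checkpoint_complete()   # first checkpoint that has seen all the data
late = src.can_remove()
```

Here `early` is false and `late` is true: a checkpoint taken before the drain finished does not unlock the removal.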
This definitely looks like quite a complex process.
Best,
D.
On Tue, Sep 14, 2021 at 3:44 PM Jan Lukavský <je...@seznam.cz> wrote:
Hi,
just out of curiosity, would this problem be solvable by the ability to
remove partitions that declare that they contain no more data
(watermark reaching the end of the global window)? There is probably
another problem in that a topic can be recreated after being deleted,
which could result in the watermark moving back in time, but this problem
might be there already.
Jan
On 9/14/21 3:08 PM, Fabian Paul wrote:
> Hi Constantinos,
>
> I agree with David that it is not easily possible to remove a
> partition while a Flink job is running. Imagine the following
> scenario:
>
> Your Flink job initially works on 2 partitions belonging to two
> different topics and you have checkpointing enabled to guarantee
> exactly-once delivery. This implies that on every checkpoint the
> offsets of the Kafka topics are stored in a Flink checkpoint, so that
> the job can recover from them in case of a failure.
> Now you trigger the removal of one of the topics and the
> discovery detects that one of the partitions was removed. If the
> pipeline now fails before the next checkpoint is taken, Flink will try
> to recover from the previous checkpoint, which is invalid by now because
> the partition is not available anymore.
>
> Only if you do not care about losing data is it possible to
> simply ignore the removed partition.
>
> Best,
> Fabian