From what I understood, the state on the sinks is included in the sinks' operator state and is pushed to Kafka once the 3-phase commit is complete, i.e. when the checkpoint completion notification arrives at the sinks.
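
To check that I read this correctly, here is a minimal sketch of the idea as a plain Python simulation. It is not Flink or Kafka code: `BufferingSink`, its method names and the `kafka` list are hypothetical stand-ins I made up for illustration. It replays the failure scenario from my earlier mail (quoted below), but with the sink holding records back until the completion notification arrives.

```python
class BufferingSink:
    """Toy sink (hypothetical): holds records until their checkpoint is confirmed."""

    def __init__(self, external_log):
        self.external_log = external_log  # stands in for a Kafka topic
        self.current = []                 # records seen since the last barrier
        self.pending = {}                 # checkpoint id -> records in that snapshot

    def invoke(self, record):
        # Only buffer; nothing is written to the external system yet.
        self.current.append(record)

    def snapshot_state(self, checkpoint_id):
        # On a checkpoint barrier the buffer becomes part of the operator state.
        self.pending[checkpoint_id] = self.current
        self.current = []
        return {cid: list(recs) for cid, recs in self.pending.items()}

    def notify_checkpoint_complete(self, checkpoint_id):
        # The global checkpoint is persisted, so emit everything up to this id.
        for cid in sorted(c for c in self.pending if c <= checkpoint_id):
            self.external_log.extend(self.pending.pop(cid))


kafka = []
sink = BufferingSink(kafka)

sink.invoke("k")
sink.snapshot_state(1)
# Checkpoint 1 fails globally: no notification arrives and the job restarts.
# The last completed checkpoint held no sink state, so the sink starts empty.
sink = BufferingSink(kafka)

sink.invoke("k")            # the source replays record k
sink.snapshot_state(2)
sink.notify_checkpoint_complete(2)

print(kafka)                # ['k'], record k appears once
```

Note that the write inside `notify_checkpoint_complete` is not atomic in this sketch. A failure halfway through that loop could still produce duplicates or lose records, which is exactly the kind of pitfall I am asking about below.
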
There are several pitfalls here, and I am really curious to see how they are (going to be) handled. This is of course not as simple as it sounds: it really depends on the guarantees and operations the outside storage gives you. For example, how can we know that the pushed records are actually persisted in Kafka in a single transaction? @Chesnay can you tell us more?

> On 05 Feb 2016, at 13:33, Paris Carbone <[email protected]> wrote:
>
> That would be good indeed. I just learned about it from what Stephan mentioned. It
> sounds correct to me along the lines but it would be nice to see the details.
>
>> On 05 Feb 2016, at 13:32, Ufuk Celebi <[email protected]> wrote:
>>
>>
>>> On 05 Feb 2016, at 13:28, Paris Carbone <[email protected]> wrote:
>>>
>>> Hi Gabor,
>>>
>>> The sinks should be aware that the global checkpoint is indeed persisted
>>> before emitting so they will have to wait until they are notified for its
>>> completion before pushing to Kafka. The current view of the local state is
>>> not the actual persisted view so checking against it is like relying on dirty
>>> state. Imagine the following scenario:
>>>
>>> 1) sink pushes to kafka record k and updates local buffer for k
>>> 2) sink snapshots k and the rest of its state on checkpoint barrier
>>> 3) global checkpoint fails due to some reason (e.g. another sink subtask
>>> failed) and the job gets restarted
>>> 4) sink pushes again record k to kafka since the last global snapshots did
>>> not complete before and k is not in the local buffer
>>>
>>> Chesnay’s approach seems to be valid and best effort for the time being.
>>
>> Chesnay’s approach is not part of this thread. Can you or Chesnay
>> elaborate/provide a link?
>>
>> – Ufuk
>>
>
