@Stephan: Kafka keeps the messages for a configured TTL (e.g., a few days/weeks). So my idea is based on the fact that Kafka still has all the messages, and that I can read those messages back from Kafka to check whether or not I should write them again.
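In sink code, the idea would look roughly like the sketch below. This is a minimal, untested sketch against the plain kafka-clients API; the class, its surrounding lifecycle, and the comparison of messages by raw value are all made up for illustration, and it assumes messages are replayed in the same order as before the failure.

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

class ReadBackDedupSink {
    private final KafkaProducer<byte[], byte[]> producer;
    private final KafkaConsumer<byte[], byte[]> readBack;
    private final TopicPartition out;
    private Iterator<ConsumerRecord<byte[], byte[]>> alreadyInKafka =
        Collections.emptyIterator();
    private boolean recovering;

    ReadBackDedupSink(KafkaProducer<byte[], byte[]> producer,
                      KafkaConsumer<byte[], byte[]> readBack,
                      TopicPartition out, long checkpointedOffset) {
        this.producer = producer;
        this.readBack = readBack;
        this.out = out;
        // Start reading the OUTPUT topic at the offset stored in the checkpoint.
        readBack.assign(Collections.singletonList(out));
        readBack.seek(out, checkpointedOffset);
        this.recovering = true;
    }

    /** Called for every message the job replays/produces after recovery. */
    void invoke(byte[] message) {
        if (recovering && alreadyWritten(message)) {
            return; // Kafka already has this message -> skip it
        }
        recovering = false; // caught up -> back to normal operation
        producer.send(new ProducerRecord<>(out.topic(), out.partition(), null, message));
    }

    /** Compare the replayed message with the next message already in the output topic. */
    private boolean alreadyWritten(byte[] message) {
        if (!alreadyInKafka.hasNext()) {
            // A real implementation would poll up to the topic's end offset;
            // a single short poll keeps the sketch simple.
            alreadyInKafka = readBack.poll(Duration.ofSeconds(1)).iterator();
        }
        // Nothing left in the topic -> this message was never written.
        // Since messages are assumed to be replayed in the original order,
        // a plain value comparison with the next stored message is enough.
        return alreadyInKafka.hasNext()
            && Arrays.equals(alreadyInKafka.next().value(), message);
    }
}

The only state that has to go into the checkpoint is the output topic's offset; everything else is reconstructed by reading Kafka back.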
Let me illustrate what I had in mind:

I write messages to Kafka, and at the moment of the checkpoint the last message ID I wrote is 5.
Then I write 6, 7, 8 ... FAIL.

Recover: open a reader starting at message 5.
Get message 6 -> read from Kafka -> already have this -> skip
Get message 7 -> read from Kafka -> already have this -> skip
Get message 8 -> read from Kafka -> already have this -> skip
Get message 9 -> read from Kafka -> not yet in Kafka -> write and resume normal operations

Like I said: this is just the first rough idea I had on a possible direction in which this could be solved without the latency impact of buffering. (For comparison, a bare sketch of the buffering approach from the quoted thread below is included as a P.S. at the bottom of this mail.)

Niels Basjes

On Fri, Feb 5, 2016 at 2:06 PM, Stephan Ewen <se...@apache.org> wrote:

> @Niels: I don't fully understand your approach so far.
>
> If you write a message to Kafka between two checkpoints, where do you
> store the information that this particular message is already written (I
> think this would be the ID in your example)?
> Such information would need to be persisted for every written message
> (or very small group of messages).
>
> Stephan
>
>
> On Fri, Feb 5, 2016 at 1:41 PM, Niels Basjes <ni...@basjes.nl> wrote:
>
>> Hi,
>>
>> Buffering the data (in all cases) would hurt the latency so much that
>> Flink would effectively revert to micro-batching (where the batch size
>> is the checkpoint period) with regard to the output.
>>
>> My initial thoughts on how to solve this were as follows:
>> 1) The sink persists the ID of the last message it wrote to Kafka in
>> the checkpoint.
>> 2) Upon recovery the sink would
>> 2a) Record the offset Kafka is at at that point in time.
>> 2b) For every 'new' message, validate whether it must be written by
>> reading from Kafka (starting at the offset from the checkpoint); if the
>> message is already present, skip it.
>> 3) If a message arrives that has not yet been written, the message is
>> written. Under the assumption that the messages arrive in the same
>> order as before, the sink can then simply run as normal.
>>
>> This way the performance is only impacted in the (short) period after
>> recovery from a disturbance.
>>
>> What do you think?
>>
>> Niels Basjes
>>
>>
>>
>> On Fri, Feb 5, 2016 at 11:57 AM, Stephan Ewen <se...@apache.org> wrote:
>>
>>> Hi Niels!
>>>
>>> In general, exactly-once output requires transactional cooperation
>>> from the target system. Kafka has that on its roadmap; we should be
>>> able to integrate it once it is out.
>>> That means output is "committed" upon completed checkpoints, which
>>> guarantees nothing is written multiple times.
>>>
>>> Chesnay is working on an interesting prototype as a generic solution
>>> (also for Kafka, while it doesn't have that feature yet):
>>> it buffers the data in the sink persistently (using the fault-tolerance
>>> state backends) and pushes the results out on notification of a
>>> completed checkpoint.
>>> That gives you exactly-once semantics, but involves an extra
>>> materialization of the data.
>>>
>>>
>>> I think there is actually a fundamental latency issue with
>>> "exactly-once sinks", no matter how you implement them in any system:
>>> you can only commit once you are sure that everything went well, up to
>>> a specific point where you are sure no replay will ever be needed.
>>>
>>> So the latency in Flink for an exactly-once output would be at least
>>> the checkpoint interval.
>>>
>>> I'm eager to hear your thoughts on this.
>>>
>>> Greetings,
>>> Stephan
>>>
>>>
>>> On Fri, Feb 5, 2016 at 11:17 AM, Niels Basjes <ni...@basjes.nl> wrote:
>>>
>>>> Hi,
>>>>
>>>> It is my understanding that the exactly-once semantics regarding the
>>>> input from Kafka are based on the checkpointing in the source
>>>> component retaining the offset it was at at the checkpoint moment.
>>>>
>>>> My question is: how does that work for a sink? How can I make sure
>>>> that (in light of failures) each message that is read from Kafka (my
>>>> input) is written to Kafka (my output) exactly once?
>>>>
>>>>
>>>> --
>>>> Best regards / Met vriendelijke groeten,
>>>>
>>>> Niels Basjes
>>>
>>>
>>
>>
>> --
>> Best regards / Met vriendelijke groeten,
>>
>> Niels Basjes
>
>

--
Best regards / Met vriendelijke groeten,

Niels Basjes
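P.S.: For comparison, the buffering approach Stephan describes in the quoted thread boils down to a write-ahead pattern: buffer everything in checkpointed state, and only push it to Kafka once the checkpoint that covers it is confirmed complete. Below is a bare, untested sketch in plain Java; all names are invented, and it glosses over how the buffers would actually be persisted in the state backend.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class BufferingSink<T> {
    // Records received since the last checkpoint; would live in checkpointed state.
    private final List<T> pending = new ArrayList<>();
    // Buffers that belong to a checkpoint that is not yet confirmed complete.
    private final Map<Long, List<T>> uncommitted = new HashMap<>();

    /** Called per record: only buffer, do not write to Kafka yet. */
    void invoke(T record) {
        pending.add(record);
    }

    /** Called when a checkpoint is taken: freeze the current buffer. */
    void snapshot(long checkpointId) {
        uncommitted.put(checkpointId, new ArrayList<>(pending));
        pending.clear();
    }

    /** Called when the checkpoint is globally complete: now it is safe to emit. */
    void notifyCheckpointComplete(long checkpointId) {
        List<T> buffer = uncommitted.remove(checkpointId);
        if (buffer != null) {
            for (T record : buffer) {
                writeToKafka(record);
            }
        }
    }

    void writeToKafka(T record) { /* producer.send(...) in a real sink */ }
}

This also makes the latency point concrete: nothing reaches Kafka before the checkpoint-complete notification fires, so the output latency is at least the checkpoint interval.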