Hi Niels! That could actually work, given a way to identify messages with a unique ID.
Would be quite an exercise to implement...

Stephan

On Fri, Feb 5, 2016 at 2:14 PM, Niels Basjes <[email protected]> wrote:

> @Stephan;
> Kafka keeps the messages for a configured TTL (e.g. a few days/weeks).
> So my idea is based on the fact that Kafka has all the messages and that I
> can read those messages from Kafka to validate if I should or should not
> write them again.
>
> Let me illustrate what I had in mind:
> I write messages to Kafka and at the moment of the checkpoint the last
> message ID I wrote is 5.
> Then I write 6,7,8
> FAIL
> Recover:
> Open a reader starting at message 5
> Get message 6 -> Read from Kafka --> Already have this --> Skip
> Get message 7 -> Read from Kafka --> Already have this --> Skip
> Get message 8 -> Read from Kafka --> Already have this --> Skip
> Get message 9 -> Read from Kafka --> Not yet in Kafka --> Write and resume
> normal operations.
>
> Like I said: This is just the first rough idea I had on a possible
> direction for how this can be solved without the latency impact of buffering.
>
> Niels Basjes
>
>
> On Fri, Feb 5, 2016 at 2:06 PM, Stephan Ewen <[email protected]> wrote:
>
>> @Niels: I don't fully understand your approach so far.
>>
>> If you write a message to Kafka between two checkpoints, where do you
>> store the information that this particular message is already written (I
>> think this would be the ID in your example)?
>> Such information would need to be persisted for every written message
>> (or very small group of messages).
>>
>> Stephan
>>
>>
>> On Fri, Feb 5, 2016 at 1:41 PM, Niels Basjes <[email protected]> wrote:
>>
>>> Hi,
>>>
>>> Buffering the data (in all cases) would hurt the latency so much that
>>> Flink is effectively reverting to microbatching (where batch size is
>>> checkpoint period) with regard to the output.
>>>
>>> My initial thoughts on how to solve this were as follows:
>>> 1) The output persists the ID of the last message it wrote to Kafka in
>>> the checkpoint.
>>> 2) Upon recovery the sink would
>>> 2a) Record the offset Kafka is at at that point in time
>>> 2b) For all 'new' messages validate if it must write this message by
>>> reading from Kafka (starting at the offset in the checkpoint) and if the
>>> message is already present it would skip it.
>>> 3) If a message arrives that has not yet been written, the message is written.
>>> Under the assumption that the messages arrive in the same order as before
>>> the sink can now simply run as normal.
>>>
>>> This way the performance is only impacted in the (short) period after
>>> the recovery of a disturbance.
>>>
>>> What do you think?
>>>
>>> Niels Basjes
>>>
>>>
>>> On Fri, Feb 5, 2016 at 11:57 AM, Stephan Ewen <[email protected]> wrote:
>>>
>>>> Hi Niels!
>>>>
>>>> In general, exactly once output requires transactional cooperation from
>>>> the target system. Kafka has that on the roadmap, we should be able to
>>>> integrate that once it is out.
>>>> That means output is "committed" upon completed checkpoints, which
>>>> guarantees nothing is written multiple times.
>>>>
>>>> Chesnay is working on an interesting prototype as a generic solution
>>>> (also for Kafka, while they don't have that feature):
>>>> It buffers the data in the sink persistently (using the fault tolerance
>>>> state backends) and pushes the results out on notification of a completed
>>>> checkpoint.
>>>> That gives you exactly once semantics, but involves an extra
>>>> materialization of the data.
>>>>
>>>>
>>>> I think that there is actually a fundamental latency issue with
>>>> "exactly once sinks", no matter how you implement them in any system:
>>>> You can only commit once you are sure that everything went well, to a
>>>> specific point where you are sure no replay will ever be needed.
>>>>
>>>> So the latency in Flink for an exactly-once output would be at least
>>>> the checkpoint interval.
>>>>
>>>> I'm eager to hear your thoughts on this.
>>>>
>>>> Greetings,
>>>> Stephan
>>>>
>>>>
>>>> On Fri, Feb 5, 2016 at 11:17 AM, Niels Basjes <[email protected]> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> It is my understanding that the exactly-once semantics regarding the
>>>>> input from Kafka is based on the checkpointing in the source component
>>>>> retaining the offset where it was at the checkpoint moment.
>>>>>
>>>>> My question is how does that work for a sink? How can I make sure that
>>>>> (in light of failures) each message that is read from Kafka (my input) is
>>>>> written to Kafka (my output) exactly once?
>>>>>
>>>>>
>>>>> --
>>>>> Best regards / Met vriendelijke groeten,
>>>>>
>>>>> Niels Basjes
>>>>>
>>>>
>>>
>>>
>>> --
>>> Best regards / Met vriendelijke groeten,
>>>
>>> Niels Basjes
>>>
>>
>
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>
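
For readers of this thread, the recovery idea Niels describes (checkpoint the last written ID, then after a failure skip every replayed message that is already in the output) could be sketched roughly as below. This is only an illustration under stated assumptions, not Flink or Kafka code: InMemoryLog is a hypothetical in-memory stand-in for the output topic, DedupOnRecoverySink and replay_example are made-up names, message IDs are assumed to be unique, and replayed messages are assumed to arrive in the same order as before the failure, with a single writer.

```python
from collections import deque
from typing import Deque, List, Optional


class InMemoryLog:
    """Hypothetical stand-in for the output topic: an append-only list of IDs."""

    def __init__(self) -> None:
        self.records: List[int] = []

    def append(self, message_id: int) -> None:
        self.records.append(message_id)

    def read_after(self, message_id: int) -> List[int]:
        # Everything written after the given ID. Raises ValueError if the ID
        # is no longer in the log (e.g. it expired), which this sketch does
        # not try to handle.
        return self.records[self.records.index(message_id) + 1:]


class DedupOnRecoverySink:
    """Skips replayed messages that were already written before a failure."""

    def __init__(self, log: InMemoryLog, restored_id: Optional[int] = None) -> None:
        self.log = log
        self.last_written_id = restored_id
        # On recovery, read back what was written after the checkpointed ID.
        self.already_written: Deque[int] = deque(
            log.read_after(restored_id) if restored_id is not None else []
        )

    def snapshot(self) -> Optional[int]:
        # State to persist in the checkpoint: the last ID known to be in the log.
        return self.last_written_id

    def write(self, message_id: int) -> bool:
        """Return True if the message was written, False if it was skipped."""
        if self.already_written:
            if self.already_written[0] != message_id:
                # The same-order assumption from the thread does not hold.
                raise RuntimeError("replay order differs from what is in the log")
            self.already_written.popleft()
            self.last_written_id = message_id
            return False
        self.log.append(message_id)
        self.last_written_id = message_id
        return True


def replay_example() -> List[bool]:
    """The scenario from the thread: checkpoint at 5, write 6,7,8, fail, replay."""
    log = InMemoryLog()
    sink = DedupOnRecoverySink(log)
    for message_id in range(1, 6):
        sink.write(message_id)
    checkpointed_id = sink.snapshot()  # last written ID at checkpoint time
    for message_id in (6, 7, 8):
        sink.write(message_id)
    # FAIL, then recover from the checkpoint and replay 6..9.
    recovered = DedupOnRecoverySink(log, restored_id=checkpointed_id)
    return [recovered.write(message_id) for message_id in (6, 7, 8, 9)]
```

In this sketch the extra reads happen only while already_written is non-empty, which matches the point that the performance impact is limited to the period right after recovery. A real sink would also have to deal with partitioning, concurrent writers, and messages that have aged out of the topic, none of which is covered here.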
