I forgot to mention that the default maintain duration of a window is 1 day. Would it be useful to warn the developer if the current maintain duration is not compatible with the current window size and interval?
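For what it's worth, such a warning could be computed from the three values alone. Here is a minimal sketch of the check (plain Python modeling the relation, not anything in the Kafka Streams API; the function name and the "warn at build time" idea are mine):

```python
ONE_DAY_MS = 24 * 60 * 60 * 1000

def retention_warning(size_ms, advance_ms, maintain_ms=ONE_DAY_MS):
    """Return a warning string when the maintain duration (retention) is
    shorter than the window size: some windows that are still receiving
    records have already been dropped. Return None when the config is sane."""
    if maintain_ms < size_ms:
        live = size_ms // advance_ms           # windows a fresh record falls into
        kept = maintain_ms // advance_ms + 1   # windows young enough to be retained
        return ("maintain duration %dms < window size %dms: only %d of %d "
                "overlapping windows keep their state"
                % (maintain_ms, size_ms, kept, live))
    return None

# The setup from this thread: 15-day windows, 1-day hop, default 1-day retention.
print(retention_warning(15 * ONE_DAY_MS, ONE_DAY_MS))
```

With the thread's configuration this flags that only 2 of the 15 overlapping windows are retained, which matches what I observed below.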
2016-10-20 14:49 GMT+02:00 Nicolas Fouché <nfou...@onfocus.io>:

> Hi Michael,
>
> Thanks for the quick reply. Let's try to explain things a bit better:
>
> Within a call to `aggregateByKey`, I specified a window with a size of
> 15 days and an interval (hop) of 1 day, without setting a maintain duration.
> I produced a message, with the current clock time as its timestamp, and
> looked at the messages produced in my final output topic: 15 messages were
> there, as expected, each with a first aggregation.
>
> Then I produced a second message, and 15 messages were again produced, but
> only the 2 messages from the 2 most recent windows had an updated
> aggregate. The 13 other ones contained the result of an initial
> aggregation. For example, if the aggregation were a simple counter, I would
> have 2 values of *2* and 13 values of *1*. Producing a third message, I
> would have 2 values of *3* and 13 values of *1*. Etc.
>
> Then I wondered about this "maintain duration" thing. I changed it to 30
> days, and then all went well: the counter of every window was incremented
> normally.
>
> So my conclusion was: an aggregate computed in a window *started* before
> now minus the maintain duration is automatically dropped.
>
> To the problem:
> this aggregate is pushed to a topic, and this topic is consumed by Kafka
> Connect, to end up in Aerospike by replacing an existing record
> (https://github.com/aerospike/aerospike-kafka-connector).
> So if I make a mistake and send a message with a 40-day-old timestamp, a
> new aggregate will be generated for this old window, and the new aggregate
> will overwrite a record in Aerospike.
>
> The question is then: I want to prevent my topology from accepting these
> 30+ day old messages, to avoid destroying data in the final database.
> Of course I can call `filter` to ignore old messages, but I would have a
> window definition in one place in my code, and in another place a filter
> which would absolutely need to consider the window maintain duration. It
> would start to get messy with complex topologies. You get the idea: I
> wonder if a developer has to write this code, or if a config somewhere
> would help him.
>
> This whole behaviour, which I totally accept as "by design", is as far as
> I know undocumented. And well, I find it quite harmful. Or did I miss
> something?
>
> Finally, about "repairing":
> If I want to "repair" my aggregates by re-producing all my old messages,
> knowing about this window maintain time is essential. I have to change my
> code (or my config file) to ensure that my window maintain times are long
> enough.
>
> Thanks for sharing [1], I already read it. I guess I'll ask more specific
> questions when the time for a repair happens.
>
> [1] http://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application
>
>
> 2016-10-20 10:37 GMT+02:00 Michael Noll <mich...@confluent.io>:
>
>> Nicolas,
>>
>> > I set the maintain duration of the window to 30 days.
>> > If it consumes a message older than 30 days, then a new aggregate is
>> > created for this old window.
>>
>> I assume you mean: if a message should have been included in the original
>> ("old") window but that message happens to arrive late (after the
>> "original" 30 days), then a new aggregate is created for this old window?
>> I wanted to ask this first because answering your questions depends on
>> what exactly you mean here.
>>
>> > The problem is that this old windowed aggregate is of course incomplete
>> > and will overwrite a record in the final database.
>>
>> Not sure I understand -- why would the old windowed aggregate be
>> incomplete? Could you explain a bit more what you mean?
>>
>> > By the way, is there any article about replaying old messages.
>> > Some tips and tricks, like "you'd better do that in another deployment
>> > of your topology", and/or "you'd better use topics dedicated to repair".
>>
>> I am not aware of a deep-dive article or docs on that just yet. There's a
>> first blog post [1] about Kafka's new Application Reset Tool that goes in
>> this direction, but it is only a first step towards replaying/reprocessing
>> of old messages. Do you have specific questions here that we can help you
>> with in the meantime?
>>
>> [1] http://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application
>>
>>
>> On Thu, Oct 20, 2016 at 9:40 AM, Nicolas Fouché <nfou...@onfocus.io> wrote:
>>
>> > Hi,
>> >
>> > I aggregate some data with `aggregateByKey` and a `TimeWindows`.
>> >
>> > I set the maintain duration of the window to 30 days.
>> > If it consumes a message older than 30 days, then a new aggregate is
>> > created for this old window.
>> > The problem is that this old windowed aggregate is of course incomplete
>> > and will overwrite a record in the final database.
>> >
>> > So is there a way to dismiss these old messages?
>> >
>> > I only see the point of accepting old messages when the topology is
>> > launched in "repair" mode.
>> > By the way, is there any article about replaying old messages? Some tips
>> > and tricks, like "you'd better do that in another deployment of your
>> > topology", and/or "you'd better use topics dedicated to repair".
>> >
>> > Thanks
>> > Nicolas
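For readers skimming this thread: the 2-updated/13-unchanged pattern described above follows directly from the hopping-window arithmetic. A sketch of that arithmetic (plain Python, not Kafka code; it assumes the usual hopping-window alignment where window starts are multiples of the advance, and it models retention as dropping any window whose start is older than now minus the maintain duration, which matches the behaviour observed in this thread):

```python
ONE_DAY_MS = 24 * 60 * 60 * 1000

def window_starts(ts, size_ms, advance_ms):
    """Start times of every hopping window containing timestamp ts,
    where windows cover [start, start + size) and start at multiples
    of advance_ms."""
    start = ts - ts % advance_ms          # newest window containing ts
    starts = []
    while start > ts - size_ms:
        starts.append(start)
        start -= advance_ms
    return starts

def retained(starts, now_ms, maintain_ms):
    """Keep only the windows young enough to still be maintained; older
    windows have been dropped, so a new record re-creates them with a
    fresh (incomplete) aggregate."""
    return [s for s in starts if s >= now_ms - maintain_ms]

now = 1000 * ONE_DAY_MS                   # an arbitrary "current" timestamp
starts = window_starts(now, 15 * ONE_DAY_MS, ONE_DAY_MS)
print(len(starts))                                  # 15: each record lands in 15 windows
print(len(retained(starts, now, ONE_DAY_MS)))       # 2: default 1-day retention keeps only 2
print(len(retained(starts, now, 30 * ONE_DAY_MS)))  # 15: 30-day retention keeps all of them
```

This reproduces both observations: with the default 1-day maintain duration only the 2 newest windows keep their state, and raising it to 30 days keeps all 15.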
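As for guarding the topology against too-old records without duplicating the 30-day value in two places (the concern raised above about the window definition and the filter drifting apart), one option is to derive both the maintain duration and the filter cutoff from a single constant. A sketch of the predicate (plain Python for illustration; `MAINTAIN_MS` and `is_fresh` are made-up names, and in the actual Java DSL this logic would live in a `Predicate` passed to `filter` upstream of `aggregateByKey`):

```python
import time

ONE_DAY_MS = 24 * 60 * 60 * 1000
# Single source of truth: use this same constant for the window's
# maintain duration and for the freshness filter.
MAINTAIN_MS = 30 * ONE_DAY_MS

def is_fresh(record_ts_ms, now_ms=None):
    """Drop records older than the maintain duration: such records would
    re-create already-dropped windows with fresh, incomplete aggregates
    and push them downstream (e.g. into Aerospike)."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    return record_ts_ms >= now_ms - MAINTAIN_MS

now = int(time.time() * 1000)
print(is_fresh(now - 40 * ONE_DAY_MS, now))  # False: the 40-day-old mistake is rejected
print(is_fresh(now - 5 * ONE_DAY_MS, now))   # True: recent records pass through
```

This does not answer whether Kafka Streams should offer such a guard as a config, but it keeps the filter and the window definition in sync by construction.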