Hi Michael, thanks for the quick reply. Let me try to explain things a bit better:
Within a call to `aggregateByKey`, I specified a window of 15 days with a hop (advance interval) of 1 day, without setting a maintain duration. I produced a message, its timestamp being the current clock time, and looked at my final output topic: 15 messages were there, as expected, each with a first aggregation. Then I produced a second message, and 15 messages were produced again, but only the 2 messages from the 2 most recent windows had an updated aggregate; the 13 other ones contained the result of an initial aggregation. For example, if the aggregation were a simple counter, I would see a value of *2* twice, and a value of *1* thirteen times. After a third message, I would see *3* twice and *1* thirteen times, and so on.

Then I wondered about this "maintain duration" thing. I changed it to 30 days, and then all went well: the counter of every window was incremented normally. So my conclusion was: an aggregate computed in a window that *started* before (now minus the maintain duration) is automatically dropped.

Now to the problem: these aggregates are pushed to a topic, and that topic is consumed by Kafka Connect, ending up in Aerospike by replacing existing records (https://github.com/aerospike/aerospike-kafka-connector). So if I make a mistake and send a message with a 40-day-old timestamp, a new aggregate will be generated for that old window, and the new aggregate will overwrite a record in Aerospike.

The question is then: I want to prevent my topology from accepting these 30+ day old messages, to avoid destroying data in the final database. Of course I can call `filter` to ignore old messages, but then I would have a window definition in one place of my code and, in another place, a filter which would absolutely need to stay consistent with the window's maintain duration. It would start to get messy with complex topologies. You get the idea: I wonder whether a developer has to write this code, or whether some configuration would help.
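To make the behaviour concrete, here is a small Python simulation of the semantics as I understand them. This is an illustration only, not Kafka Streams code; the window size, hop, and retention values match the ones above, and timestamps are expressed in days for simplicity:

```python
# Illustrative simulation of hopping-window aggregation with a retention
# ("maintain duration") cutoff. NOT Kafka Streams code -- just a sketch
# of the behaviour described above. Timestamps are in days.

SIZE = 15  # window size: 15 days
HOP = 1    # advance interval (hop): 1 day

def window_starts(ts):
    """All hopping-window start times whose window contains timestamp ts."""
    last = ts - (ts % HOP)  # latest window starting at or before ts
    return [s for s in range(last - SIZE + HOP, last + HOP, HOP) if s + SIZE > ts]

def aggregate(events, retention):
    """Count events per (key, window start). A window whose start falls
    before (latest observed timestamp - retention) has had its state
    dropped, so a new record re-initializes it instead of updating it."""
    counts, latest = {}, 0
    for key, ts in events:
        latest = max(latest, ts)
        cutoff = latest - retention
        for start in window_starts(ts):
            if start < cutoff:
                counts[(key, start)] = 1  # expired window: fresh aggregate
            else:
                counts[(key, start)] = counts.get((key, start), 0) + 1
    return counts

# Short retention (1 day): only the 2 most recent of the 15 windows keep
# their state across messages; the other 13 restart at 1.
short = aggregate([("k", 100), ("k", 100)], retention=1)
assert sorted(short.values()) == [1] * 13 + [2, 2]

# Retention of 30 days: every window is updated normally.
full = aggregate([("k", 100), ("k", 100)], retention=30)
assert len(full) == 15 and all(v == 2 for v in full.values())
```

In the same spirit, a `filter` that derives its threshold from the same retention constant (e.g. `accept = lambda ts, latest: ts > latest - retention`) would at least keep the filter and the window definition in sync, though it still means writing that plumbing by hand.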
This whole behaviour, which I totally accept as "by design", is as far as I know undocumented. And well, I find it quite harmful. Or did I miss something?

Finally, about "repairing": if I want to "repair" my aggregates by re-producing all my old messages, knowing about this window maintain time is essential. I have to change my code (or my config file) to ensure that my window maintain times are long enough.

Thanks for sharing [1], I already read it. I guess I'll ask more specific questions when the time for a repair comes.

[1] http://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application

2016-10-20 10:37 GMT+02:00 Michael Noll <mich...@confluent.io>:

> Nicolas,
>
> > I set the maintain duration of the window to 30 days.
> > If it consumes a message older than 30 days, then a new aggregate is
> > created for this old window.
>
> I assume you mean: If a message should have been included in the original
> ("old") window but that message happens to arrive late (after the
> "original" 30 days), then a new aggregate is created for this old window?
> I wanted to ask this first because answering your questions depends on
> what exactly you mean here.
>
> > The problem is that this old windowed aggregate is of course incomplete
> > and will overwrite a record in the final database.
>
> Not sure I understand -- why would the old windowed aggregate be
> incomplete? Could you explain a bit more what you mean?
>
> > By the way, is there any article about replaying old messages. Some tips
> > and tricks, like "you'd better do that in another deployment of your
> > topology", and/or "you'd better use topics dedicated to repair".
>
> I am not aware of a deep dive article or docs on that just yet. There's a
> first blog post [1] about Kafka's new Application Reset Tool that goes
> into this direction, but this is only a first step into the direction of
> replaying/reprocessing of old messages.
> Do you have specific questions here that we can help you with in the
> meantime?
>
> [1]
> http://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application
>
>
> On Thu, Oct 20, 2016 at 9:40 AM, Nicolas Fouché <nfou...@onfocus.io> wrote:
>
> > Hi,
> >
> > I aggregate some data with `aggregateByKey` and a `TimeWindows`.
> >
> > I set the maintain duration of the window to 30 days.
> > If it consumes a message older than 30 days, then a new aggregate is
> > created for this old window.
> > The problem is that this old windowed aggregate is of course incomplete
> > and will overwrite a record in the final database.
> >
> > So is there a way to dismiss these old messages ?
> >
> > I only see the point of accepting old messages when the topology is
> > launched in "repair" mode.
> > By the way, is there any article about replaying old messages. Some tips
> > and tricks, like "you'd better do that in another deployment of your
> > topology", and/or "you'd better use topics dedicated to repair".
> >
> > Thanks
> > Nicolas