Hello Nicolas,

Thanks for reporting this; your observations about the window retention
mechanism are correct. I think our documentation is indeed lacking in a few
places regarding the windowing operations:

1. We should mention that the default retention period of a window is one
day (or should the default value be the window size?).
2. We should mention that the window retention mechanism is based on stream
time, which is in turn based on the timestamps of the received messages, and
give an example like the repair scenario you mentioned to illustrate how to
set the retention period.
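To make point 2 concrete, here is a rough, self-contained sketch of the retention check (an illustration of the idea only, not the actual Kafka Streams implementation; the class and method names are made up):

```java
// Hypothetical sketch of stream-time-based window retention. Stream time
// advances with the highest message timestamp seen so far; a window is kept
// only while its start lies within the maintain duration of stream time.
public class WindowRetentionSketch {
    static final long DAY_MS = 24L * 60 * 60 * 1000;

    // A window whose start has fallen more than maintainMs behind stream
    // time has been dropped from the window store.
    static boolean isRetained(long windowStartMs, long streamTimeMs, long maintainMs) {
        return windowStartMs > streamTimeMs - maintainMs;
    }

    public static void main(String[] args) {
        long streamTime = 100 * DAY_MS;  // latest timestamp observed on the stream
        long maintainMs = 30 * DAY_MS;   // maintain duration set by the user

        // A window started 20 days behind stream time is still retained...
        System.out.println(isRetained(80 * DAY_MS, streamTime, maintainMs));
        // ...but one started 40 days behind has been dropped, so a message
        // with a 40-day-old timestamp would re-create it with a fresh
        // (incomplete) aggregate.
        System.out.println(isRetained(60 * DAY_MS, streamTime, maintainMs));
    }
}
```

This also shows the repair implication: when replaying old messages, the maintain duration must cover how far back the replayed timestamps go.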

As for the "dropped windows get re-created due to an old message": hmm,
that is a good point. Maybe it would be better not to re-create a dropped
window at all, and simply ignore the old message and let it drop on the
floor. Does anyone have thoughts on this issue?
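The alternative behavior being proposed could be sketched as a guard in front of the aggregation (again purely hypothetical, an assumption about a possible design rather than anything Kafka Streams does today):

```java
// Hypothetical guard illustrating the alternative discussed above: instead
// of re-creating a dropped window when an old message arrives, skip the
// message entirely.
public class LateRecordGuard {
    private long streamTimeMs = Long.MIN_VALUE;  // max timestamp seen so far

    // Returns true if the record should be processed, false if it is older
    // than the retention horizon and should be dropped on the floor.
    boolean accept(long recordTimestampMs, long maintainMs) {
        streamTimeMs = Math.max(streamTimeMs, recordTimestampMs);
        return recordTimestampMs > streamTimeMs - maintainMs;
    }

    public static void main(String[] args) {
        long DAY = 24L * 60 * 60 * 1000;
        LateRecordGuard guard = new LateRecordGuard();
        System.out.println(guard.accept(100 * DAY, 30 * DAY)); // recent message: accepted
        System.out.println(guard.accept(60 * DAY, 30 * DAY));  // 40 days late: rejected
    }
}
```

Such a guard would also address Nicolas's concern about an incomplete aggregate overwriting good records downstream.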

Guozhang


On Thu, Oct 20, 2016 at 5:51 AM, Nicolas Fouché <nfou...@onfocus.io> wrote:

> I forgot to mention that the default maintain duration of a window is 1
> day. Would it be useful to warn the developer if the current maintain
> duration is "not compatible" with the current window size and interval?
>
> 2016-10-20 14:49 GMT+02:00 Nicolas Fouché <nfou...@onfocus.io>:
>
> > Hi Michael,
> >
> > thanks for the quick reply. Let's try to explain things a bit better:
> >
> > Within a call to `aggregateByKey`, I specified a window with the size of
> > 15 days and an interval (hop) of 1 day, without setting a maintain
> > duration.
> > I produced a message, the timestamp being the current clock time, and
> > looked at the messages produced in my final output topic: 15 messages
> > were there, as expected, with a first aggregation.
> > Then I produced a second message, and 15 messages were also produced,
> > but: only the 2 messages from the 2 most recent windows had an updated
> > aggregate. The 13 other ones contained the result of an initial
> > aggregation. For example, if the aggregation was a simple counter, I
> > would have 2 times a value of *2*, and 13 times a value of *1*.
> > Producing a third message: I would have 2 times a value of *3*, and 13
> > times a value of *1*.
> > etc.
> >
> > Then I wondered about this "maintain duration thing". I changed it to 30
> > days, and then all went well. The counter of every window was
> > incremented normally.
> >
> > So my conclusion was: an aggregate computed in a window *started* before
> > (now minus the maintain duration) is automatically dropped.
> >
> > To the problem:
> > this aggregate is pushed to a topic, and this topic is consumed by Kafka
> > Connect, to end up in Aerospike by replacing an existing record (
> > https://github.com/aerospike/aerospike-kafka-connector).
> > So if I make a mistake and send a message with a 40-day-old timestamp, a
> > new aggregate will be generated for this old window, and the new
> > aggregate will overwrite a record in Aerospike.
> >
> > The question is then: I want to prevent my topology from accepting these
> > 30+ days old messages, to avoid destroying data in the final database. Of
> > course I can call `filter` to ignore old messages, but I would have
> > somewhere in my code a window definition, and in another place a filter
> > which would absolutely need to consider the window maintain duration. It
> > would start to get messy with complex topologies. You get the idea: I
> > wonder whether a developer has to write this code, or if a config
> > somewhere would help.
> >
> > This whole behaviour, which I totally accept as "by design", is as far as
> > I know undocumented. And well, I find it quite harmful. Or did I miss
> > something?
> >
> > Finally, about "repairing":
> > If I want to "repair" my aggregates by re-producing all my old messages,
> > knowing about this window maintain time is essential. I have to change my
> > code (or my config file) to ensure that my window maintain times are long
> > enough.
> >
> > Thanks for sharing [1], I already read it. I guess I'll ask more specific
> > questions when the time for a repair happens.
> >
> > [1]
> > http://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application
> >
> >
> >
> > 2016-10-20 10:37 GMT+02:00 Michael Noll <mich...@confluent.io>:
> >
> >> Nicolas,
> >>
> >> > I set the maintain duration of the window to 30 days.
> >> > If it consumes a message older than 30 days, then a new aggregate is
> >> > created for this old window.
> >>
> >> I assume you mean: if a message should have been included in the
> >> original ("old") window but that message happens to arrive late (after
> >> the "original" 30 days), then a new aggregate is created for this old
> >> window? I wanted to ask this first because answering your questions
> >> depends on what exactly you mean here.
> >>
> >>
> >> > The problem is that this old windowed aggregate is of course
> >> > incomplete and will overwrite a record in the final database.
> >>
> >> Not sure I understand -- why would the old windowed aggregate be
> >> incomplete?  Could you explain a bit more what you mean?
> >>
> >>
> >> > By the way, is there any article about replaying old messages? Some
> >> > tips and tricks, like "you'd better do that in another deployment of
> >> > your topology", and/or "you'd better use topics dedicated to repair".
> >>
> >> I am not aware of a deep dive article or docs on that just yet. There's
> >> a first blog post [1] about Kafka's new Application Reset Tool that goes
> >> in this direction, but it is only a first step toward
> >> replaying/reprocessing of old messages. Do you have specific questions
> >> here that we can help you with in the meantime?
> >>
> >> [1]
> >> http://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application
> >>
> >>
> >> On Thu, Oct 20, 2016 at 9:40 AM, Nicolas Fouché <nfou...@onfocus.io>
> >> wrote:
> >>
> >> > Hi,
> >> >
> >> > I aggregate some data with `aggregateByKey` and a `TimeWindows`.
> >> >
> >> > I set the maintain duration of the window to 30 days.
> >> > If it consumes a message older than 30 days, then a new aggregate is
> >> > created for this old window.
> >> > The problem is that this old windowed aggregate is of course
> >> > incomplete and will overwrite a record in the final database.
> >> >
> >> > So is there a way to dismiss these old messages?
> >> >
> >> > I only see the point of accepting old messages when the topology is
> >> > launched in "repair" mode.
> >> > By the way, is there any article about replaying old messages? Some
> >> > tips and tricks, like "you'd better do that in another deployment of
> >> > your topology", and/or "you'd better use topics dedicated to repair".
> >> >
> >> > Thanks
> >> > Nicolas
> >> >
> >>
> >
> >
>



-- 
-- Guozhang
