Nicolas,

you can filter "old" messages using KStream#transform(). It provides a
ProcessorContext object that allows you to access the timestamp of the
currently processed record. Thus, you "only" need to maintain the
largest timestamp you have seen so far (you should attach a state store
to your transformer that holds this TS) and drop a record (by returning
null) if the current record's TS is smaller than the "application time"
minus 30 days. For records with a larger timestamp than the "application
time", you of course need to advance the "application time" to the
current record's timestamp.
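
A minimal sketch of the bookkeeping described above, kept free of any
Kafka dependency so the logic is easy to check. LateRecordFilter and its
method names are hypothetical, not part of Kafka Streams. It assumes
timestamps are non-negative epoch milliseconds, and it keeps the
"application time" in a plain field, whereas a real transformer would
keep it in an attached state store so that it survives restarts.

```java
// Hypothetical helper, not a Kafka Streams class: tracks the largest
// record timestamp seen so far and decides whether a record is too old.
final class LateRecordFilter {

    // Maximum allowed lag behind "application time", in milliseconds.
    private final long maxAgeMs;

    // Largest timestamp seen so far; -1 means no record has been seen yet.
    private long applicationTimeMs = -1L;

    LateRecordFilter(long maxAgeMs) {
        this.maxAgeMs = maxAgeMs;
    }

    // Returns true if the record should be kept, false if it should be
    // dropped. Assumes recordTimestampMs >= 0.
    boolean accept(long recordTimestampMs) {
        if (recordTimestampMs >= applicationTimeMs) {
            // Newer than anything seen so far: advance application time.
            applicationTimeMs = recordTimestampMs;
            return true;
        }
        // Older record: keep it only if it lags by at most maxAgeMs.
        return applicationTimeMs - recordTimestampMs <= maxAgeMs;
    }

    long applicationTimeMs() {
        return applicationTimeMs;
    }
}
```

Inside a transformer, transform() would call accept() with the value of
ProcessorContext#timestamp() and return null whenever it yields false.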

-Matthias


On 10/20/16 5:51 AM, Nicolas Fouché wrote:
> I forgot to mention that the default maintain duration of a window
> is 1 day. Would it be useful to warn the developer if the current
> maintain duration is "not compatible" with the current window size
> and interval?
> 
> 2016-10-20 14:49 GMT+02:00 Nicolas Fouché <nfou...@onfocus.io>:
> 
>> Hi Michael,
>> 
>> thanks for the quick reply. Let's try to explain things a bit
>> better:
>> 
>> Within a call to `aggregateByKey`, I specified a window with the
>> size of 15 days and an interval (hop) of 1 day, without setting a
>> maintain duration. I produced a message, the timestamp being the
>> current clock time, and looked at the messages produced in my
>> final output topic: 15 messages were there, as expected, with a
>> first aggregation. Then I produced a second message, and 15
>> messages were also produced, but: only the 2 messages from the 2
>> most recent windows had an updated aggregate. The 13 other ones
>> contained the result of an initial aggregation. For example, if
>> the aggregation was a simple counter, I would have 2 times a
>> value of *2*, and 13 times a value of *1*. Producing a third
>> message: I would have 2 times a value of *3*, and 13 times a
>> value of *1*. etc.
>> 
>> Then I wondered about this "maintain duration thing". I changed
>> it to 30 days, and then all went well. The counter of every
>> window was incremented normally.
>> 
>> So my conclusion was: an aggregate computed in a window *started*
>> before the -now minus the maintain duration- is automatically
>> dropped.
>> 
>> To the problem: this aggregate is pushed to a topic, and this
>> topic is consumed by Kafka Connect, to end up in Aerospike by
>> replacing an existing record ( 
>> https://github.com/aerospike/aerospike-kafka-connector). So if I
>> make a mistake and send a message with a 40 days old timestamp,
>> a new aggregate will be generated for this old window, and the
>> new aggregate will overwrite a record in Aerospike.
>> 
>> The question is then: I want to prevent my topology from
>> accepting these 30+ day old messages, to avoid destroying data in
>> the final database. Of course I can call `filter` to ignore old
>> messages, but I would have somewhere in my code a window
>> definition, and in another place a filter which would absolutely
>> need to consider the window maintain duration. It would start to
>> get messy with complex topologies. You get the idea, I wonder if
>> a developer has to write this code, or if a config somewhere 
>> would help him.
>> 
>> This whole behaviour, which I totally accept as "by-design", is
>> as far as I know undocumented. And well, I find it quite harmful.
>> Or did I miss something ?
>> 
>> Finally, about "repairing": If I want to "repair" my aggregates
>> by re-producing all my old messages, knowing about this window
>> maintain time is essential. I have to change my code (or my
>> config file) to ensure that my window maintain times are long 
>> enough.
>> 
>> Thanks for sharing [1], I already read it. I guess I'll ask more
>> specific questions when the time for a repair happens.
>> 
>> [1] http://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application
>> 
>> 
>> 
>> 2016-10-20 10:37 GMT+02:00 Michael Noll <mich...@confluent.io>:
>> 
>>> Nicolas,
>>> 
>>>> I set the maintain duration of the window to 30 days. If it
>>>> consumes a message older than 30 days, then a new aggregate
>>>> is created for this old window.
>>> 
>>> I assume you mean:  If a message should have been included in
>>> the original ("old") window but that message happens to arrive
>>> late (after the "original" 30 days), then a new aggregate is
>>> created for this old window? I wanted to ask this first because
>>> answering your questions depends on what exactly you mean
>>> here.
>>> 
>>> 
>>>> The problem is that this old windowed aggregate is of course
>>>> incomplete and will overwrite a record in the final database.
>>> 
>>> Not sure I understand -- why would the old windowed aggregate
>>> be incomplete?  Could you explain a bit more what you mean?
>>> 
>>> 
>>>> By the way, is there any article about replaying old
>>>> messages. Some tips and tricks, like "you'd better do that in
>>>> another deployment of your topology", and/or "you'd better
>>>> use topics dedicated to repair".
>>> 
>>> I am not aware of a deep dive article or docs on that just yet.
>>> There's a first blog post [1] about Kafka's new Application
>>> Reset Tool that goes in this direction, but this is only a
>>> first step toward replaying/reprocessing of old
>>> messages.  Do you have specific questions here that we can help
>>> you with in the meantime?
>>> 
>>> [1] http://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> On Thu, Oct 20, 2016 at 9:40 AM, Nicolas Fouché
>>> <nfou...@onfocus.io> wrote:
>>> 
>>>> Hi,
>>>> 
>>>> I aggregate some data with `aggregateByKey` and a
>>>> `TimeWindows`.
>>>> 
>>>> I set the maintain duration of the window to 30 days. If it
>>>> consumes a message older than 30 days, then a new aggregate
>>>> is created for this old window. The problem is that this old
>>>> windowed aggregate is of course incomplete and will overwrite
>>>> a record in the final database.
>>>> 
>>>> So is there a way to dismiss these old messages ?
>>>> 
>>>> I only see the point of accepting old messages when the
>>>> topology is launched in "repair" mode. By the way, is there
>>>> any article about replaying old messages. Some tips and
>>>> tricks, like "you'd better do that in another deployment of
>>>> your topology", and/or "you'd better use topics dedicated to
>>>> repair".
>>>> 
>>>> Thanks Nicolas
>>>> 
>>> 
>> 
>> 
> 