Hi Matthias,

Thanks for such a thorough response!

I guess there are cases where determinism might be preferred over
computing "more correct results" (e.g. in unit tests, where one manually
lays out the order of incoming events and expects an exact output), but
from now on I can simply assume that windows might be stored longer than
the specified retention time.

A few more questions, if you don't mind.

- What should happen when the window 00:00..01:00 finally gets purged
(and the internal stream time gets bumped to, say, 10:00), but then I
receive an event <b,00:15 10>? Will it create the 00:00..01:00 window again,
or will the event be dropped because it is much older than the internal
stream time?

- I got a bit confused when you mentioned a key in the window name "open
(first) >>>b<<<-window". To make it clear: I assume that because
hopping/tumbling windows in Kafka Streams are aligned, the internal stream
time is not related to the aggregation keys but only to the input
partitions, right? I.e. if I have only one partition, there will be only one
internal stream time watermark regardless of how many keys I have? Will this
behavior be the same for sliding windows? Feel free to just point me to the
code :)


> Hi Alexander,
> first, both mailing lists should be fine :)
> About internal time tracking: Kafka Streams tracks an internal "stream
> time" that is determined as the minimum "partition time" over all its
> input partitions.
> The "partition time" is tracked for each input partition individually
> and is the minimum timestamp of all currently buffered records of the
> partition.
> So depending on how many records from which partitions are fetched on
> poll(), "stream time" gets advanced accordingly -- this is kinda
> non-deterministic because we cannot predict what poll() will return.
> Because all buffered records are considered, "stream time" is advanced
> conservatively. The main idea behind this is to keep windows open longer
> if we know in advance that there will be a late record (as we observe
> the late record in the buffer already). Thus, we can compute "more
> correct" results with regard to late-arriving records.
> (Just a heads up, we might change this behavior in future releases --
> thus, it is not documented anywhere but in the code ;) )
> About retention time and purging windows: old windows should be dropped
> after "stream time" advances beyond the point up to which the window is
> guaranteed to be maintained. It should happen "as soon as possible" but
> there is no strict guarantee (thus "kept at least").
> Furthermore, Streams applies a minimum retention time of 1 minute --
> thus, for your specific use case, the 30 seconds you specify are not
> used (this is unfortunately not documented :( ). However, this is
> unrelated to the behavior you see -- I just mention it for completeness.
> Thus, for your specific use case, stream time is most likely not advanced
> to 5:01 when record <b,05:01 10> is processed (as the record with TS=0:15
> is most likely in the buffer already) and thus, the next b-record with
> TS=15 seconds will be added to the still open (first) b-window and both
> values 2 and 10 get added to 12.
> Also keep in mind that we do some deduplication on KTable results using
> an internal cache. This can also influence which output records you see.
> For further details see:

> -Matthias
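
The time tracking described above can be modelled roughly as follows. This is a toy sketch, not Kafka Streams code: the object and method names are hypothetical, timestamps are plain epoch milliseconds, and treating a partition with an empty buffer as having no time at all is an assumption of the sketch.

```scala
// Toy model of the description above; NOT the Kafka Streams implementation.
// All names are hypothetical and chosen for illustration only.
object StreamTimeSketch {
  // "Partition time": minimum timestamp of the records currently buffered
  // for one partition (None if nothing is buffered, an assumption here).
  def partitionTime(bufferedTimestamps: Seq[Long]): Option[Long] =
    if (bufferedTimestamps.isEmpty) None else Some(bufferedTimestamps.min)

  // "Stream time": minimum partition time over all input partitions.
  def streamTime(buffers: Map[Int, Seq[Long]]): Option[Long] = {
    val times = buffers.values.flatMap(b => partitionTime(b).toList)
    if (times.isEmpty) None else Some(times.min)
  }
}
```

With a single partition whose buffer already holds both the 05:01 record (301000 ms) and the late 00:15 record (15000 ms), `StreamTimeSketch.streamTime(Map(0 -> Seq(301000L, 15000L)))` yields `Some(15000)`, so in this model the 00:00..01:00 window would still be open when the late record is processed.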

On Wed, Jan 4, 2017 at 5:16 PM, Alexander Demidko <
alexander.demi...@stitchfix.com> wrote:

> Hi folks,
> I'm experimenting with Kafka Streams windowed aggregation and came across
> a window retention period behavior I don't fully understand.
> I'm using a custom timestamp extractor which gets the timestamp from the
> payload. Values are aggregated using tumbling time windows and summed by
> key.
> I am using kafka and kafka-streams with version.
> Full code can be found at
> https://gist.github.com/xdralex/845bcf8f06ab0cfcf9785d9f95450b88,
> but in general I'm doing the following:
> val input: KStream[String, String] =
>   builder.stream(Serdes.String(), Serdes.String(), "TimeInputTopic")
> val window: Windows[TimeWindow] =
>   TimeWindows.of(60000).advanceBy(60000).until(30000)
> val aggregated: KTable[Windowed[String], JInt] = input
>   .mapValues((v: String) => parse(v)._2)
>   .groupByKey(Serdes.String(), Serdes.Integer())
>   .reduce((a: JInt, b: JInt) => (a + b).asInstanceOf[JInt], window, "TimeStore1")
> aggregated.foreach {
>   (w: Windowed[String], s: JInt) =>
>     val start = new DateTime(w.window().start(), DateTimeZone.UTC)
>     val end = new DateTime(w.window().end(), DateTimeZone.UTC)
>     println(s"Aggregated: $start..$end - ${w.key()} - $s")
> }
> Here is the data being sent to TimeInputTopic:
> a,1970-01-01T00:00:00Z 1
> b,1970-01-01T00:00:01Z 2
> a,1970-01-01T00:00:02Z 3
> b,1970-01-01T00:05:01Z 10
> b,1970-01-01T00:00:15Z 10
> Here is the output:
> Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - a - 4
> Aggregated: 1970-01-01T00:05:00.000Z..1970-01-01T00:06:00.000Z - b - 10
> Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 12
> Here is what confuses me.
> I would expect that once an event <b,05:01 10> is received, it should
> update some sort of "current" time value (watermark?), and, because 05:01 is
> bigger than 00:00..01:00 + 30 seconds of retention, either:
> A) Drop the bucket 00:00:00..00:01:00, and once an event <b,00:15 10> is
> received, recreate this bucket. This means there would be two outputs for
> 00:00..01:00:
> Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 2
> Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 10
> Not sure about this behavior because
> http://docs.confluent.io/3.1.1/streams/developer-guide.html#windowing-a-stream
> is saying:
> "Kafka Streams guarantees to keep a window for *at least this specified
> time*". So I guess the window can be kept longer...
> B) Just drop the incoming event <b,00:15 10> altogether. In this case
> there would be only one output for 00:00..01:00:
> Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 2
> I could expect this because
> http://docs.confluent.io/3.1.1/streams/concepts.html#windowing is saying:
> "If a record arrives
> after the retention period has passed, the record cannot be processed and
> is dropped".
> Hope all this makes sense. A few questions:
> – If the window can be kept in store longer, are there any thresholds when
> it will finally be purged? For example, it would be nice to flush old
> buckets if they are taking too much space.
> – How is the "current" time value updated / how does Kafka Streams decide that
> the retention period has passed? Does it maintain a watermark with the
> biggest time seen?
> – What is the right mailing list to ask questions about Kafka Streams? I
> had a choice between this one and the Confluent Platform list, and given
> that the open source part of CP consists of patched vanilla Kafka, I was
> not sure where to write.
> Thanks,
> Alex
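
The window boundaries in the output quoted above follow from the alignment of tumbling windows: each timestamp falls into the window starting at the largest multiple of the window size that does not exceed it. A minimal sketch of that arithmetic, assuming non-negative epoch-millisecond timestamps and ignoring retention and late-record handling entirely (the object and member names are hypothetical, not Kafka Streams API):

```scala
// Illustration only: plain arithmetic over the sample data from this thread.
// It does not model retention, purging, or dropping of late records.
object TumblingWindowSketch {
  val sizeMs: Long = 60000L // 1-minute tumbling windows, as in TimeWindows.of(60000)

  // Start of the aligned window containing the timestamp (assumes timestampMs >= 0).
  def windowStart(timestampMs: Long): Long = timestampMs - (timestampMs % sizeMs)

  // (key, timestamp in ms since epoch, value) for the five records sent to TimeInputTopic.
  val events: Seq[(String, Long, Int)] = Seq(
    ("a", 0L, 1), ("b", 1000L, 2), ("a", 2000L, 3), ("b", 301000L, 10), ("b", 15000L, 10))

  // Sum of values per (key, window start), with every record accepted.
  val sums: Map[(String, Long), Int] =
    events
      .groupBy { case (key, ts, _) => (key, windowStart(ts)) }
      .map { case (keyAndWindow, es) => keyAndWindow -> es.map(_._3).sum }
}
```

Here `sums` comes out as 4 for key a and 12 for key b in the window starting at 0 ms, and 10 for key b in the window starting at 300000 ms (00:05:00), which matches the three output lines reported above when the late b-record is not dropped.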
