Hi Alexander,

First, both mailing lists should be fine :)

About internal time tracking: Kafka Streams tracks an internal "stream
time" that is determined as the minimum "partition time" over all its
input partitions.

The "partition time" is tracked for each input partition individually
and is the minimum timestamp of all currently buffered records of the
partition.

So depending on how many records from which partitions are fetched on
poll(), "stream time" gets advanced accordingly -- this is kinda
non-deterministic because we cannot predict what poll() will return.

Because all buffered records are considered, "stream time" is advanced
conservatively. The main idea behind this is to keep windows open longer
if we know in advance that there will be a late record (as we observe
the late record in the buffer already). Thus, we can compute "more
correct" results with regard to late-arriving records.
(Just a heads up, we might change this behavior in future releases --
thus, it is not documented anywhere but in the code ;) )
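
To make the two definitions above concrete, here is a minimal sketch in
plain Scala. It is a simplified model of the description in this mail,
not the actual Streams code; the names StreamTimeSketch, Buffer,
partitionTime and streamTime are made up for illustration.

```scala
object StreamTimeSketch {
  // Timestamps (in ms) of the records currently buffered for one input partition.
  type Buffer = Seq[Long]

  // "Partition time": minimum timestamp among the buffered records, if any.
  def partitionTime(buffer: Buffer): Option[Long] =
    if (buffer.isEmpty) None else Some(buffer.min)

  // "Stream time": minimum partition time over all partitions that have buffered data.
  def streamTime(buffers: Map[Int, Buffer]): Option[Long] = {
    val times = buffers.values.flatMap(b => partitionTime(b).toList)
    if (times.isEmpty) None else Some(times.min)
  }
}
```

For example, with records at 05:01 and 00:15 buffered for the same
partition, StreamTimeSketch.streamTime(Map(0 -> Seq(301000L, 15000L)))
yields Some(15000), i.e. the late record holds "stream time" back.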



About retention time and purging windows: old windows should be dropped
after "stream time" advances beyond the point up to which Streams
guarantees to maintain the window. This should happen "as soon as
possible", but there is no strict guarantee (thus "kept at least").

Furthermore, Streams applies a minimum retention time of 1 minute --
thus, for your specific use case, the 30 seconds you specify are not
used (this is unfortunately not documented :( ). However, this is
unrelated to the behavior you see -- I just mention it for completeness.
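
Read literally, the 1-minute minimum behaves like a lower bound on the
value passed to until(). A tiny sketch of that reading follows; the
names RetentionSketch, MinRetentionMs and effectiveRetentionMs are
hypothetical, and the real internals may derive the value differently.

```scala
object RetentionSketch {
  // The 1-minute floor mentioned above, assumed here to be a plain lower bound.
  val MinRetentionMs: Long = 60000L

  // Retention that would actually apply for a requested until(...) value.
  def effectiveRetentionMs(requestedMs: Long): Long =
    math.max(requestedMs, MinRetentionMs)
}
```

Under this assumption, RetentionSketch.effectiveRetentionMs(30000L)
gives 60000, so until(30000) would behave like until(60000).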


Thus, for your specific use case, stream time most likely does not advance
to 5:01 when record <b,05:01 10> is processed (as the record with TS=0:15 is
most likely in the buffer already). Thus, the next b-record with TS=0:15
is added to the still open (first) b-window, and both values 2 and 10
are summed to 12.
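
As a rough illustration of this case, the sketch below replays your five
records in plain Scala. It assumes that all five records sit in the same
buffer (one partition, one poll()), that the record being processed
still counts as buffered, and that stream time never moves backwards.
It does not model purging at all; it only shows which stream time each
record would see. All names (WindowReplaySketch, Rec, replay) are made
up for this example.

```scala
object WindowReplaySketch {
  final case class Rec(key: String, ts: Long, value: Int)

  val WindowSizeMs: Long = 60000L

  // Start of the tumbling window that a timestamp falls into.
  def windowStart(ts: Long): Long = ts - (ts % WindowSizeMs)

  // Replays one buffered batch in order. Returns the per-window sums and
  // the stream time observed while each record was processed.
  def replay(batch: List[Rec]): (Map[(String, Long), Int], List[Long]) = {
    var sums = Map.empty[(String, Long), Int]
    var observed = List.empty[Long]
    var streamTime = Long.MinValue
    var remaining = batch
    while (remaining.nonEmpty) {
      // Minimum timestamp over everything still buffered, never decreasing.
      streamTime = math.max(streamTime, remaining.map(_.ts).min)
      observed = observed :+ streamTime
      val r = remaining.head
      val k = (r.key, windowStart(r.ts))
      sums = sums.updated(k, sums.getOrElse(k, 0) + r.value)
      remaining = remaining.tail
    }
    (sums, observed)
  }
}
```

Feeding in (a,0s,1), (b,1s,2), (a,2s,3), (b,301s,10), (b,15s,10), the
observed stream times are 0s, 1s, 2s, 15s, 15s. Stream time never gets
near 05:01 while <b,05:01 10> is processed, so the first b-window is
still there when the TS=0:15 record arrives, and its sum becomes 12.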


Also keep in mind that we do some deduplication on KTable results using
an internal cache. This can also influence which output records you see.
For further details see:
http://docs.confluent.io/current/streams/developer-guide.html#memory-management



-Matthias

On 1/4/17 5:16 PM, Alexander Demidko wrote:
> Hi folks,
> 
> I'm experimenting with Kafka Streams windowed aggregation and came across
> window retention period behavior I don't fully understand.
> I'm using a custom timestamp extractor which gets the timestamp from the
> payload. Values are aggregated using tumbling time windows and summed by
> the key.
> I am using kafka and kafka-streams version 0.10.1.1.
> 
> Full code can be found at
> https://gist.github.com/xdralex/845bcf8f06ab0cfcf9785d9f95450b88, but in
> general I'm doing the following:
> 
> val input: KStream[String, String] = builder.stream(Serdes.String(),
> Serdes.String(), "TimeInputTopic")
> val window: Windows[TimeWindow] =
> TimeWindows.of(60000).advanceBy(60000).until(30000)
> 
> val aggregated: KTable[Windowed[String], JInt] = input
>   .mapValues((v: String) => parse(v)._2)
>   .groupByKey(Serdes.String(), Serdes.Integer())
>   .reduce((a: JInt, b: JInt) => (a + b).asInstanceOf[JInt], window,
> "TimeStore1")
> 
> aggregated.foreach {
>   (w: Windowed[String], s: JInt) =>
>     val start = new DateTime(w.window().start(), DateTimeZone.UTC)
>     val end = new DateTime(w.window().end(), DateTimeZone.UTC)
>     println(s"Aggregated: $start..$end - ${w.key()} - $s")
> }
> 
> Here is the data being sent to TimeInputTopic:
> a,1970-01-01T00:00:00Z 1
> b,1970-01-01T00:00:01Z 2
> a,1970-01-01T00:00:02Z 3
> b,1970-01-01T00:05:01Z 10
> b,1970-01-01T00:00:15Z 10
> 
> Here is the output:
> Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - a - 4
> Aggregated: 1970-01-01T00:05:00.000Z..1970-01-01T00:06:00.000Z - b - 10
> Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 12
> 
> Here is what confuses me.
> I would expect that once an event <b,05:01 10> is received, it should
> update some sort of "current" time value (watermark?), and, because 05:01 is
> bigger than 00:00..01:00 + 30 seconds of retention, either:
> 
> A) Drop the bucket 00:00:00..00:01:00, and once an event <b,00:15 10> is
> received, recreate this bucket. This means there would be two outputs for
> 00:00..01:00:
> Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 2
> Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 10
> Not sure about this behavior because
> http://docs.confluent.io/3.1.1/streams/developer-guide.html#windowing-a-stream
> is saying: "Kafka Streams guarantees to keep a window for *at least this
> specified time*". So I guess the window can be kept longer...
> 
> 
> B) Just drop the incoming event <b,00:15 10> altogether. In this case there
> would be only one output for 00:00..01:00:
> Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 2
> I could expect this because
> http://docs.confluent.io/3.1.1/streams/concepts.html#windowing is saying:
> "If a record arrives after the retention period has passed, the record
> cannot be processed and is dropped".
> 
> 
> Hope all this makes sense. A few questions:
> 
> – If the window can be kept in store longer, are there any thresholds when
> it will finally be purged? For example, it would be nice to flush old
> buckets if they are taking too much space.
> 
> – How is the "current" time value updated / how does Kafka Streams decide
> that the retention period has passed? Does it maintain a watermark with the
> biggest time seen?
> 
> – What is the right mailing list to ask questions about Kafka Streams? I
> had a choice between this one and the Confluent Platform list, and given
> that the open source part of CP consists of patched vanilla Kafka, I was
> not sure where to write.
> 
> Thanks,
> Alex
> 
