Hi Alex,

if a window was purged because its retention time passed, it will not accept any records anymore -- thus, if a very late record arrives, it will be dropped without any further notice.
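In other words, the effective check is roughly the following -- a simplified sketch with made-up names, not the actual internal code:

    // A window is only maintained while stream time has not yet passed its
    // start plus the retention period (in practice it can be kept longer --
    // "at least"). Records that fall into an already purged window are
    // skipped silently, with no error and no log message.
    def windowIsMaintained(windowStartMs: Long, retentionMs: Long,
                           streamTimeMs: Long): Boolean =
      streamTimeMs < windowStartMs + retentionMs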
About stream time and partitions: yes. And how time is advanced/tracked is
independent of the window type.

-Matthias

On 1/5/17 9:14 PM, Alexander Demidko wrote:
> Hi Matthias,
>
> Thanks for such a thorough response!
>
> I guess there are cases when determinism might be preferred over
> computing "more correct results" (e.g. in unit tests, where one manually
> lays out an order of incoming events and wants to get an exact output),
> but from now on I can simply assume that windows might be stored longer
> than the specified time.
>
> A few more questions if you don't mind.
>
> - What should happen when the window 00:00..01:00 finally gets purged
> (and the internal stream time gets bumped to, say, 10:00) but then I
> receive an event <b,00:15 10>? Will it create the 00:00..01:00 window
> again, or will the event be dropped because it's way older than the
> internal stream time?
>
> - I got a bit confused when you mentioned a key in the window name "open
> (first) >>>b<<<-window". To make it clear -- I assume that because in
> Kafka Streams hopping/tumbling windows are aligned, the internal stream
> time is not related to the aggregation keys but just to the input
> partitions, right? I.e., if I have only one partition, there will be
> only one internal stream time watermark regardless of how many keys I
> have? Will this behavior be the same for sliding windows? Feel free to
> just point me to the code :)
>
> Alex
>
>
>> Hi Alexander,
>>
>> first, both mailing lists should be fine :)
>>
>> About internal time tracking: Kafka Streams tracks an internal "stream
>> time" that is determined as the minimum "partition time" over all its
>> input partitions.
>>
>> The "partition time" is tracked for each input partition individually
>> and is the minimum timestamp of all currently buffered records of the
>> partition.
>>
>> So depending on how many records from which partitions are fetched on
>> poll(), "stream time" gets advanced accordingly -- this is kinda
>> non-deterministic because we cannot predict what poll() will return.
>>
>> Because all buffered records are considered, "stream time" is advanced
>> conservatively. The main idea is to keep windows open longer if we know
>> in advance that there will be a late record (as we observe the late
>> record in the buffer already). Thus, we can compute "more correct"
>> results with regard to late arriving records.
>> (Just a heads up, we might change this behavior in future releases --
>> thus, it is not documented anywhere but in the code ;) )
>>
>> About retention time and purging windows: old windows should be dropped
>> after "stream time" advances beyond the point up to which it is
>> guaranteed to maintain the window. It should happen "as soon as
>> possible" but there is no strict guarantee (thus "kept at least").
>>
>> Furthermore, Streams applies a minimum retention time of 1 minute --
>> thus, for your specific use case, the 30 seconds you specify are not
>> used (this is unfortunately not documented :( ). However, this is
>> unrelated to the behavior you see -- I just mention it for completeness.
>>
>> Thus, for your specific use case, stream time is most likely not
>> advanced to 5:01 when record <b,05:01 10> is processed (as the record
>> with TS=0:15 is most likely in the buffer already), and thus the next
>> b-record with TS=15 seconds will be added to the still open (first)
>> b-window, and both values 2 and 10 add up to 12.
>>
>> Also keep in mind that we do some deduplication on KTable results using
>> an internal cache. This can also influence which output records you
>> see. For further details see:
>> http://docs.confluent.io/current/streams/developer-guide.html#memory-management
>>
>> -Matthias
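To make the time tracking described above concrete, here is a simplified Scala sketch of the idea -- hypothetical names, not the actual Kafka Streams internals:

    // Simplified model of internal time tracking -- hypothetical names,
    // not the actual Kafka Streams internals.

    // "Partition time": the minimum timestamp over all records currently
    // buffered for one input partition.
    def partitionTime(bufferedTimestamps: Seq[Long]): Long =
      if (bufferedTimestamps.isEmpty) Long.MaxValue
      else bufferedTimestamps.min

    // "Stream time": the minimum partition time over all input partitions.
    // A late record sitting in a buffer (e.g. the one with TS=0:15) keeps
    // stream time low even after <b,05:01 10> has been processed -- which
    // is why the first b-window is still open when that record arrives.
    def streamTime(buffersByPartition: Map[Int, Seq[Long]]): Long =
      if (buffersByPartition.isEmpty) Long.MaxValue
      else buffersByPartition.values.map(partitionTime).min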
> On Wed, Jan 4, 2017 at 5:16 PM, Alexander Demidko <
> alexander.demi...@stitchfix.com> wrote:
>
>> Hi folks,
>>
>> I'm experimenting with Kafka Streams windowed aggregation and came
>> across window retention period behavior I don't fully understand.
>> I'm using a custom timestamp extractor which gets the timestamp from
>> the payload. Values are aggregated using tumbling time windows and
>> summed by the key.
>> I am using version 0.10.1.1 of kafka and kafka-streams.
>>
>> Full code can be found at
>> https://gist.github.com/xdralex/845bcf8f06ab0cfcf9785d9f95450b88,
>> but in general I'm doing the following:
>>
>> val input: KStream[String, String] =
>>   builder.stream(Serdes.String(), Serdes.String(), "TimeInputTopic")
>> val window: Windows[TimeWindow] =
>>   TimeWindows.of(60000).advanceBy(60000).until(30000)
>>
>> val aggregated: KTable[Windowed[String], JInt] = input
>>   .mapValues((v: String) => parse(v)._2)
>>   .groupByKey(Serdes.String(), Serdes.Integer())
>>   .reduce((a: JInt, b: JInt) => (a + b).asInstanceOf[JInt], window,
>>     "TimeStore1")
>>
>> aggregated.foreach {
>>   (w: Windowed[String], s: JInt) =>
>>     val start = new DateTime(w.window().start(), DateTimeZone.UTC)
>>     val end = new DateTime(w.window().end(), DateTimeZone.UTC)
>>     println(s"Aggregated: $start..$end - ${w.key()} - $s")
>> }
>>
>> Here is the data being sent to TimeInputTopic:
>> a,1970-01-01T00:00:00Z 1
>> b,1970-01-01T00:00:01Z 2
>> a,1970-01-01T00:00:02Z 3
>> b,1970-01-01T00:05:01Z 10
>> b,1970-01-01T00:00:15Z 10
>>
>> Here is the output:
>> Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - a - 4
>> Aggregated: 1970-01-01T00:05:00.000Z..1970-01-01T00:06:00.000Z - b - 10
>> Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 12
>>
>> Here is what confuses me.
>> I would expect that once the event <b,05:01 10> is received, it should
>> update some sort of "current" time value (watermark?), and, because
>> 05:01 is bigger than 00:00..01:00 + 30 seconds of retention, either:
>>
>> A) Drop the bucket 00:00:00..00:01:00, and once the event <b,00:15 10>
>> is received, recreate this bucket. This means there would be two
>> outputs for 00:00..01:00:
>> Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 2
>> Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 10
>> I'm not sure about this behavior because
>> http://docs.confluent.io/3.1.1/streams/developer-guide.html#windowing-a-stream
>> says: "Kafka Streams guarantees to keep a window for *at least this
>> specified time*". So I guess the window can be kept longer...
>>
>> B) Just drop the incoming event <b,00:15 10> altogether. In this case
>> there would be only one output for 00:00..01:00:
>> Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 2
>> I could expect this because
>> http://docs.confluent.io/3.1.1/streams/concepts.html#windowing
>> says: "If a record arrives after the retention period has passed, the
>> record cannot be processed and is dropped".
>>
>> Hope all this makes sense. A few questions:
>>
>> - If the window can be kept in store longer, are there any thresholds
>> when it will finally be purged? For example, it would be nice to flush
>> old buckets if they are taking too much space.
>>
>> - How is the "current" time value updated / how does Kafka Streams
>> decide that the retention period has passed? Does it maintain a
>> watermark with the biggest time seen?
>>
>> - What is the right mailing list to ask questions about Kafka Streams?
>> I had a choice between this one and the Confluent Platform list, and
>> given that the open-source part of CP consists of patched vanilla
>> Kafka, I was not sure where to write.
>>
>> Thanks,
>> Alex
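A note on the deduplication cache Matthias mentions above: per the linked memory-management docs it is sized via the cache.max.bytes.buffering config, and setting it to zero forwards every single update downstream -- handy for making small experiments like this one easier to interpret. A minimal sketch:

    import java.util.Properties
    import org.apache.kafka.streams.StreamsConfig

    val props = new Properties()
    // application.id, bootstrap.servers, etc. go here as usual
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0") // disable the cache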