Great, thanks for the info guys!

On Thu, Jan 5, 2017 at 10:09 PM, Matthias J. Sax <matth...@confluent.io>
wrote:
> Hi Alex,
>
> If a window was purged because its retention time passed, it will not
> accept any records anymore -- thus, if a very late record arrives, it
> will get dropped without any further notice.
>
> About stream time and partitions: yes. And how time is advanced/tracked
> is independent of the window type.
>
>
> -Matthias
>
> On 1/5/17 9:14 PM, Alexander Demidko wrote:
> > Hi Matthias,
> >
> > Thanks for such a thorough response!
> >
> > I guess there are cases where determinism might be preferred over
> > computing "more correct" results (e.g. in unit tests, where one
> > manually lays out an order of incoming events and wants to get an
> > exact output), but from now on I can simply assume that windows might
> > be stored longer than the specified time.
> >
> > A few more questions if you don't mind.
> >
> > - What should happen when the window 00:00..01:00 finally gets purged
> > (and the internal stream time gets bumped to, say, 10:00) but then I
> > receive an event <b,00:15 10>? Will it create the 00:00..01:00 window
> > again, or will the event be dropped because it's way older than the
> > internal stream time?
> >
> > - I got a bit confused when you mentioned a key in the window name
> > "open (first) >>>b<<<-window". To make it clear -- I assume that
> > because in Kafka Streams hopping/tumbling windows are aligned, the
> > internal stream time is not related to the aggregation keys but just
> > to the input partitions, right? I.e. if I have only one partition,
> > there will be only one internal stream time watermark regardless of
> > how many keys I have? Will this behavior be the same for sliding
> > windows? Feel free to just point me to the code :)
> >
> > Alex
> >
> >
> >> Hi Alexander,
> >>
> >> First, both mailing lists should be fine :)
> >>
> >> About internal time tracking: Kafka Streams tracks an internal
> >> "stream time" that is determined as the minimum "partition time"
> >> over all its input partitions.
> >>
> >> The "partition time" is tracked for each input partition
> >> individually and is the minimum timestamp of all currently buffered
> >> records of that partition.
> >>
> >> So depending on how many records from which partitions are fetched
> >> by poll(), "stream time" gets advanced accordingly -- this is kinda
> >> non-deterministic because we cannot predict what poll() will return.
> >>
> >> Because all buffered records are considered, "stream time" is
> >> advanced conservatively. The main idea is to keep windows open
> >> longer if we know in advance that there will be a late record (as we
> >> observe the late record in the buffer already). Thus, we can compute
> >> "more correct" results with regard to late-arriving records.
> >> (Just a heads up, we might change this behavior in future releases --
> >> thus, it is not documented anywhere but in the code ;) )
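> >>
> >> As a rough sketch (made-up names, not the actual internals -- the
> >> real logic lives in the internal PartitionGroup/RecordQueue classes),
> >> the bookkeeping boils down to:
> >>
> >> // "partition time" = min timestamp over the records currently
> >> // buffered for that partition
> >> def partitionTime(bufferedTimestamps: Seq[Long]): Long =
> >>   if (bufferedTimestamps.isEmpty) Long.MaxValue // nothing buffered
> >>   else bufferedTimestamps.min
> >>
> >> // "stream time" = min "partition time" over all input partitions
> >> // (keyed here by partition id); taking the min over everything that
> >> // is buffered is what makes the advance conservative
> >> def streamTime(buffers: Map[Int, Seq[Long]]): Long =
> >>   buffers.values.map(partitionTime).min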
> >>
> >>
> >> About retention time and purging windows: old windows should be
> >> dropped after "stream time" advances beyond the point up to which it
> >> is guaranteed to maintain the window. It should happen "as soon as
> >> possible" but there is no strict guarantee (thus "kept at least").
> >>
> >> Furthermore, Streams applies a minimum retention time of 1 minute --
> >> thus, for your specific use case, the 30 seconds you specify are not
> >> used (this is unfortunately not documented :( ). However, this is
> >> unrelated to the behavior you see -- I just mention it for
> >> completeness.
> >>
> >>
> >> Thus, for your specific use case, stream time is most likely not
> >> advanced to 5:01 when record <b,05:01 10> is processed (as the record
> >> with TS=0:15 is most likely in the buffer already), and thus the next
> >> b-record with TS=15 seconds will be added to the still open (first)
> >> b-window, and both values 2 and 10 sum up to 12.
> >>
> >>
> >> Also keep in mind that we do some deduplication on KTable results
> >> using an internal cache. This can also influence what output records
> >> you see. For further details see:
> >> http://docs.confluent.io/current/streams/developer-guide.html#memory-management
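> >>
> >> For example, if you want to see every single update while
> >> experimenting, you can disable the cache. A minimal sketch
> >> (application id and bootstrap servers are just placeholders):
> >>
> >> import java.util.Properties
> >> import org.apache.kafka.streams.StreamsConfig
> >>
> >> val props = new Properties()
> >> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "time-aggregation")
> >> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
> >> // a cache of 0 bytes disables the deduplication -- every update is
> >> // forwarded downstream
> >> props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0")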
> >>
> >> -Matthias
> >
> >
> > On Wed, Jan 4, 2017 at 5:16 PM, Alexander Demidko
> > <alexander.demi...@stitchfix.com> wrote:
> >
> >> Hi folks,
> >>
> >> I'm experimenting with Kafka Streams windowed aggregation and came
> >> across window retention period behavior I don't fully understand.
> >> I'm using a custom timestamp extractor which gets the timestamp from
> >> the payload. Values are aggregated using tumbling time windows and
> >> summed by key.
> >> I am using kafka and kafka-streams version 0.10.1.1.
> >>
> >> Full code can be found at
> >> https://gist.github.com/xdralex/845bcf8f06ab0cfcf9785d9f95450b88,
> >> but in general I'm doing the following:
> >>
> >> val input: KStream[String, String] =
> >>   builder.stream(Serdes.String(), Serdes.String(), "TimeInputTopic")
> >>
> >> val window: Windows[TimeWindow] =
> >>   TimeWindows.of(60000).advanceBy(60000).until(30000)
> >>
> >> val aggregated: KTable[Windowed[String], JInt] = input
> >>   .mapValues((v: String) => parse(v)._2)
> >>   .groupByKey(Serdes.String(), Serdes.Integer())
> >>   .reduce((a: JInt, b: JInt) => (a + b).asInstanceOf[JInt], window,
> >>     "TimeStore1")
> >>
> >> aggregated.foreach {
> >>   (w: Windowed[String], s: JInt) =>
> >>     val start = new DateTime(w.window().start(), DateTimeZone.UTC)
> >>     val end = new DateTime(w.window().end(), DateTimeZone.UTC)
> >>     println(s"Aggregated: $start..$end - ${w.key()} - $s")
> >> }
> >>
> >> Here is the data being sent to TimeInputTopic:
> >> a,1970-01-01T00:00:00Z 1
> >> b,1970-01-01T00:00:01Z 2
> >> a,1970-01-01T00:00:02Z 3
> >> b,1970-01-01T00:05:01Z 10
> >> b,1970-01-01T00:00:15Z 10
> >>
> >> Here is the output:
> >> Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - a - 4
> >> Aggregated: 1970-01-01T00:05:00.000Z..1970-01-01T00:06:00.000Z - b - 10
> >> Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 12
> >>
> >> Here is what confuses me.
> >> I would expect that once the event <b,05:01 10> is received, it
> >> should update some sort of "current" time value (watermark?), and,
> >> because 05:01 is bigger than 00:00..01:00 + 30 seconds of retention,
> >> either:
> >>
> >> A) Drop the bucket 00:00:00..00:01:00, and once the event <b,00:15 10>
> >> is received, recreate this bucket. This means there would be two
> >> outputs for 00:00..01:00:
> >> Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 2
> >> Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 10
> >> I'm not sure about this behavior because
> >> http://docs.confluent.io/3.1.1/streams/developer-guide.html#windowing-a-stream
> >> says: "Kafka Streams guarantees to keep a window for *at least this
> >> specified time*". So I guess the window can be kept longer...
> >>
> >> B) Just drop the incoming event <b,00:15 10> altogether. In this case
> >> there would be only one output for 00:00..01:00:
> >> Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 2
> >> I could expect this because
> >> http://docs.confluent.io/3.1.1/streams/concepts.html#windowing says:
> >> "If a record arrives after the retention period has passed, the
> >> record cannot be processed and is dropped".
> >>
> >> Hope all this makes sense. A few questions:
> >>
> >> – If the window can be kept in the store longer, are there any
> >> thresholds after which it will finally be purged? For example, it
> >> would be nice to flush old buckets if they are taking too much space.
> >>
> >> – How is the "current" time value updated / how does Kafka Streams
> >> decide that the retention period has passed? Does it maintain a
> >> watermark with the biggest time seen?
> >>
> >> – What is the right mailing list to ask questions about Kafka
> >> Streams? I had a choice between this one and the Confluent Platform
> >> list, and given that the open source part of CP consists of patched
> >> vanilla Kafka, I was not sure where to write.
> >>
> >> Thanks,
> >> Alex
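> >>
> >> P.S. In case it helps, a simplified sketch of the kind of extractor
> >> I mean (the actual code is in the gist; this assumes values like
> >> "1970-01-01T00:05:01Z 10" and the single-argument extract() signature
> >> of 0.10.1):
> >>
> >> import org.apache.kafka.clients.consumer.ConsumerRecord
> >> import org.apache.kafka.streams.processor.TimestampExtractor
> >> import org.joda.time.DateTime
> >>
> >> // Takes the event time from the payload instead of the record's
> >> // built-in timestamp; plugged in via the timestamp.extractor config.
> >> class PayloadTimestampExtractor extends TimestampExtractor {
> >>   override def extract(record: ConsumerRecord[Object, Object]): Long = {
> >>     val value = record.value().asInstanceOf[String]
> >>     DateTime.parse(value.split(" ")(0)).getMillis // ISO-8601 -> epoch ms
> >>   }
> >> }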