Great, thanks for the info guys!

On Thu, Jan 5, 2017 at 10:09 PM, Matthias J. Sax <matth...@confluent.io>
wrote:

> Hi Alex,
>
> if a window was purged because its retention time passed it will not
> accept any records anymore -- thus, if a very late record arrives, it
> will get dropped without any further notice.
>
> About stream time and partition: yes. And how time is advanced/tracked
> is independent of the window type.
>
>
> -Matthias
>
> On 1/5/17 9:14 PM, Alexander Demidko wrote:
> > Hi Matthias,
> >
> > Thanks for such a thorough response!
> >
> > I guess there are cases when determinism might be preferred over
> > computing "more correct results" (e.g. in unit tests, where one manually
> > lays out an order of incoming events and wants to get an exact output),
> > but from now on I can simply assume that windows might be stored longer
> > than the specified time.
> >
> > A few more questions if you don't mind.
> >
> > - What happens when the window 00:00..01:00 finally gets purged (and the
> > internal stream time gets bumped to, say, 10:00) but then I receive an
> > event <b,00:15 10>? Will it create the 00:00..01:00 window again, or will
> > the event be dropped because it's way older than the internal stream
> > time?
> >
> > - I got a bit confused when you mentioned a key in the window name "open
> > (first) >>>b<<<-window". To make it clear – I assume that because
> > hopping/tumbling windows in Kafka Streams are aligned, the internal
> > stream time is not related to the aggregation keys but just to the input
> > partitions, right? I.e. if I have only one partition, there will be only
> > one internal stream time watermark regardless of how many keys I have?
> > Will this behavior be the same for sliding windows? Feel free to just
> > point me to the code :)
> >
> > Alex
> >
> >
> >> Hi Alexander,
> >>
> >> first, both mailing lists should be fine :)
> >>
> >> About internal time tracking: Kafka Streams tracks an internal "stream
> >> time" that is determined as the minimum "partition time" over all its
> >> input partitions.
> >>
> >> The "partition time" is tracked for each input partition individually
> >> and is the minimum timestamp of all currently buffered records of the
> >> partition.
> >>
> >> So depending on how many records from which partitions are fetched on
> >> poll(), "stream time" gets advanced accordingly -- this is kinda
> >> non-deterministic because we cannot predict what poll() will return.
> >>
> >> Because all buffered records are considered, "stream time" is advanced
> >> conservatively. The main idea behind this is to keep windows open longer
> >> if we know in advance that there will be a late record (as we observe
> >> the late record in the buffer already). Thus, we can compute "more
> >> correct" results with regard to late arriving records.
> >> (Just a heads up, we might change this behavior in future releases --
> >> thus, it is not documented anywhere but in the code ;) )
> >>
> >>
> >>
> >> About retention time and purging windows: old windows should be dropped
> >> after "stream time" advances beyond the point up to which the window is
> >> guaranteed to be maintained. It should happen "as soon as possible" but
> >> there is no strict guarantee (thus "kept at least").
> >>
> >> Furthermore, Streams applies a minimum retention time of 1 minute --
> >> thus, for your specific use case, the 30 seconds you specify are not
> >> used (this is unfortunately not documented :( ). However, this is
> >> unrelated to the behavior you see -- I just mention it for completeness.
> >>
> >>
> >> Thus, for your specific use case, stream time is most likely not advanced
> >> to 5:01 when record <b,05:01 10> is processed (as the record with TS=0:15
> >> is most likely in the buffer already) and thus, the next b-record with
> >> TS=15 seconds will be added to the still open (first) b-window and both
> >> values 2 and 10 get summed to 12.
> >>
> >>
> >> Also keep in mind that we do some deduplication of KTable results using
> >> an internal cache. This can also influence which output records you see.
> >> For further details see:
> >>
> >> http://docs.confluent.io/current/streams/developer-guide.html#memory-management
> >
> >
> >> -Matthias
> >
> >
> > On Wed, Jan 4, 2017 at 5:16 PM, Alexander Demidko <
> > alexander.demi...@stitchfix.com> wrote:
> >
> >> Hi folks,
> >>
> >> I'm experimenting with Kafka Streams windowed aggregation and came across
> >> window retention period behavior I don't fully understand.
> >> I'm using a custom timestamp extractor which gets the timestamp from the
> >> payload. Values are aggregated using tumbling time windows and summed by
> >> key.
> >> I am using kafka and kafka-streams version 0.10.1.1.
> >>
> >> Full code can be found at
> >> https://gist.github.com/xdralex/845bcf8f06ab0cfcf9785d9f95450b88, but in
> >> general I'm doing the following:
> >>
> >> val input: KStream[String, String] =
> >>   builder.stream(Serdes.String(), Serdes.String(), "TimeInputTopic")
> >> val window: Windows[TimeWindow] =
> >>   TimeWindows.of(60000).advanceBy(60000).until(30000)
> >>
> >> val aggregated: KTable[Windowed[String], JInt] = input
> >>   .mapValues((v: String) => parse(v)._2)
> >>   .groupByKey(Serdes.String(), Serdes.Integer())
> >>   .reduce((a: JInt, b: JInt) => (a + b).asInstanceOf[JInt], window, "TimeStore1")
> >>
> >> aggregated.foreach {
> >>   (w: Windowed[String], s: JInt) =>
> >>     val start = new DateTime(w.window().start(), DateTimeZone.UTC)
> >>     val end = new DateTime(w.window().end(), DateTimeZone.UTC)
> >>     println(s"Aggregated: $start..$end - ${w.key()} - $s")
> >> }
> >>
> >> Here is the data being sent to TimeInputTopic:
> >> a,1970-01-01T00:00:00Z 1
> >> b,1970-01-01T00:00:01Z 2
> >> a,1970-01-01T00:00:02Z 3
> >> b,1970-01-01T00:05:01Z 10
> >> b,1970-01-01T00:00:15Z 10
> >>
> >> Here is the output:
> >> Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - a - 4
> >> Aggregated: 1970-01-01T00:05:00.000Z..1970-01-01T00:06:00.000Z - b - 10
> >> Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 12
> >>
> >> Here is what confuses me.
> >> I would expect that once an event <b,05:01 10> is received, it should
> >> update some sort of "current" time value (watermark?), and, because 05:01
> >> is bigger than 00:00..01:00 + 30 seconds of retention, either:
> >>
> >> A) Drop the bucket 00:00:00..00:01:00, and once an event <b,00:15 10> is
> >> received, recreate this bucket. This means there would be two outputs
> >> for 00:00..01:00:
> >> Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 2
> >> Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 10
> >> Not sure about this behavior because
> >> http://docs.confluent.io/3.1.1/streams/developer-guide.html#windowing-a-stream
> >> is saying: "Kafka Streams guarantees to keep a window for *at least this
> >> specified time*". So I guess the window can be kept longer...
> >>
> >>
> >> B) Just drop the incoming event <b,00:15 10> altogether. In this case
> >> there would be only one output for 00:00..01:00:
> >> Aggregated: 1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z - b - 2
> >> I could expect this because
> >> http://docs.confluent.io/3.1.1/streams/concepts.html#windowing is saying:
> >> "If a record arrives after the retention period has passed, the record
> >> cannot be processed and is dropped".
> >>
> >>
> >> Hope all this makes sense. A few questions:
> >>
> >> – If the window can be kept in store longer, are there any thresholds
> >> when it will finally be purged? For example, it would be nice to flush
> >> old buckets if they are taking too much space.
> >>
> >> – How is the "current" time value updated / how does Kafka Streams decide that
> >> the retention period has passed? Does it maintain a watermark with the
> >> biggest time seen?
> >>
> >> – What is the right mailing list to ask questions about Kafka Streams? I
> >> had a choice between this one and the Confluent Platform list, and given
> >> that the open source part of CP consists of patched vanilla Kafka, I was
> >> not sure where to write.
> >>
> >> Thanks,
> >> Alex
> >>
> >>
> >>
> >>
> >>
> >>
> >
>
>
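
The stream-time rule described in the thread (stream time is the minimum partition time over all input partitions, and partition time is the minimum timestamp among the records currently buffered for that partition) can be modeled in a few lines of plain Scala. This is only a sketch of the behavior as described for 0.10.1.x, not the Kafka Streams code: `StreamTimeSketch`, `partitionTime` and `streamTime` are made-up names, and simply ignoring partitions whose buffer is empty is an assumption of the sketch.

```scala
// Simplified model of "stream time" as described in this thread.
// NOT the Kafka Streams implementation; every name below is hypothetical.
object StreamTimeSketch {

  // Partition time: minimum timestamp (ms) among the records currently
  // buffered for one partition; None when nothing is buffered.
  def partitionTime(bufferedTimestampsMs: Seq[Long]): Option[Long] =
    if (bufferedTimestampsMs.isEmpty) None else Some(bufferedTimestampsMs.min)

  // Stream time: minimum partition time over all input partitions.
  // Assumption: partitions with an empty buffer are ignored.
  def streamTime(buffers: Map[Int, Seq[Long]]): Option[Long] = {
    val times = buffers.values.toList.flatMap(ts => partitionTime(ts).toList)
    if (times.isEmpty) None else Some(times.min)
  }
}
```

With the records from the original question, if both <b,05:01> (301000 ms) and <b,00:15> (15000 ms) sit in the buffer of the same partition, `StreamTimeSketch.streamTime(Map(0 -> Seq(301000L, 15000L)))` yields `Some(15000)`. Under this model stream time does not jump to 05:01, which is consistent with the first b-window still being open when the late record is processed.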

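The window alignment and the "kept at least" retention rule discussed in the thread can also be sketched without any Kafka dependency. Again, this is a hedged model rather than the library's code: `WindowSketch` and its members are hypothetical names, the 1-minute minimum retention is taken from the reply quoted in this thread, and counting retention from the window start is an assumption that the thread does not confirm.

```scala
// Simplified model of aligned tumbling windows and the "kept at least" rule.
// NOT the Kafka Streams implementation; every name below is hypothetical.
object WindowSketch {
  final case class Window(startMs: Long, endMs: Long)

  // Minimum retention of 1 minute that, per this thread, Streams 0.10.1.x applies.
  val MinRetentionMs: Long = 60000L

  // Aligned tumbling window containing the timestamp (assumes timestampMs >= 0):
  // the start is the timestamp rounded down to a multiple of the window size.
  def tumblingWindowFor(timestampMs: Long, sizeMs: Long): Window = {
    val start = timestampMs - (timestampMs % sizeMs)
    Window(start, start + sizeMs)
  }

  // Retention actually in effect: the requested value, never below the minimum.
  def effectiveRetentionMs(requestedMs: Long): Long =
    math.max(requestedMs, MinRetentionMs)

  // True while the window is still guaranteed to be maintained.
  // Assumption: retention is counted from the window start.
  // False only means the window MAY already be purged ("kept at least");
  // once it is purged, a late record for it is dropped without notice.
  def isGuaranteedRetained(w: Window, streamTimeMs: Long, requestedRetentionMs: Long): Boolean =
    streamTimeMs <= w.startMs + effectiveRetentionMs(requestedRetentionMs)
}
```

For the late record <b,00:15>, `tumblingWindowFor(15000L, 60000L)` gives `Window(0, 60000)`. While stream time is still 15000 ms the window is guaranteed to be retained. Once stream time has advanced to 10:00 (600000 ms) it is not, so the record is dropped if the window has actually been purged by then.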