I agree that we should fix the "end timestamp" of windows returned by the WindowedDeserializer; I created https://issues.apache.org/jira/browse/KAFKA-4468 to track it.
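In the meantime the end can be recomputed from the start plus the window size, as Eno suggests further down the thread. A minimal sketch of that workaround (assuming the reader knows the window size used in the topology -- 20 minutes here -- and using a hypothetical helper name):

  import org.apache.kafka.streams.kstream.Windowed;

  // Assumed to match the TimeWindows.of(20 * 60 * 1000) used when building
  // the topology; the reader has to know this, it is not carried in the key.
  static final long WINDOW_SIZE_MS = 20 * 60 * 1000L;

  // Workaround: window().end() currently deserializes to Long.MAX_VALUE,
  // so derive the end from the start instead.
  static long windowEnd(Windowed<String> key) {
      return key.window().start() + WINDOW_SIZE_MS;
  }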
As for Jon's observed issue that some records seem to be aggregated into incorrect windows: could you describe the unexpected behavior you observed in a bit more detail?

Guozhang

On Tue, Nov 29, 2016 at 11:11 AM, Jon Yeargers <jon.yearg...@cedexis.com> wrote:

> Seems straightforward enough: I have a 'foreach' after my windowed
> aggregation and I see values like these come out:
>
> (window) start: 1480444200000 end: 1480445400000
>
> (record) epoch='1480433282000'
>
> If I have a 20 minute window with a 1 minute 'step' I will see my record
> come out of the aggregation 20x - with different window start/end.
>
> On Tue, Nov 29, 2016 at 11:06 AM, Eno Thereska <eno.there...@gmail.com> wrote:
>
> > Let us know if we can help with that, what problems are you seeing with
> > records in wrong windows?
> >
> > Eno
> >
> > > On 29 Nov 2016, at 19:02, Jon Yeargers <jon.yearg...@cedexis.com> wrote:
> > >
> > > I've been having problems with records appearing in windows that they
> > > clearly don't belong to. Was curious whether this was related but it
> > > seems not. Bummer.
> > >
> > > On Tue, Nov 29, 2016 at 8:52 AM, Eno Thereska <eno.there...@gmail.com> wrote:
> > >
> > >> Hi Jon,
> > >>
> > >> There is an optimization in
> > >> org.apache.kafka.streams.kstream.internals.WindowedSerializer/Deserializer
> > >> where we don't encode and decode the end of the window, since the user
> > >> can always calculate it. So instead we return a default of
> > >> Long.MAX_VALUE, which is the big number you see.
> > >>
> > >> In other words, use window().start() but not window().end() in this
> > >> case. If you want to print both, just add the window size to
> > >> window().start().
> > >>
> > >> Thanks
> > >> Eno
> > >>
> > >>> On 29 Nov 2016, at 16:17, Jon Yeargers <jon.yearg...@cedexis.com> wrote:
> > >>>
> > >>> Using the following topology:
> > >>>
> > >>> KStream<String, SumRecord> kStream =
> > >>>     kStreamBuilder.stream(stringSerde, transSerde, TOPIC);
> > >>> KTable<Windowed<String>, SumRecordCollector> ktAgg =
> > >>>     kStream.groupByKey().aggregate(
> > >>>         SumRecordCollector::new,
> > >>>         new Aggregate(),
> > >>>         TimeWindows.of(20 * 60 * 1000).advanceBy(60 * 1000),
> > >>>         cSerde, "table_stream");
> > >>>
> > >>> When looking at windows as follows:
> > >>>
> > >>> ktAgg.toStream().foreach((postKey, postValue) -> {
> > >>>     LOGGER.debug("start: {} end: {}",
> > >>>         postKey.window().start(), postKey.window().end());
> > >>> });
> > >>>
> > >>> The 'start' values are coming through properly incremented but the
> > >>> 'end' values are all 9223372036854775807.
> > >>>
> > >>> Is there something wrong with my topology? Some other bug that would
> > >>> cause this?

--
Guozhang
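(A side note, not from the thread itself: with TimeWindows.of(20 * 60 * 1000).advanceBy(60 * 1000) each record falls into size / advance = 20 overlapping hopping windows, which matches the "20x" Jon describes. Whether a particular record belongs to a particular window can be checked directly, since a window covers [start, start + size). A small sketch with a hypothetical helper:

  // A record timestamp t belongs to a hopping window [start, start + size)
  // iff start <= t < start + size.
  static boolean inWindow(long recordTimestampMs, long windowStartMs, long windowSizeMs) {
      return recordTimestampMs >= windowStartMs
          && recordTimestampMs < windowStartMs + windowSizeMs;
  }

By that rule a record whose extracted timestamp were 1480433282000 would not fall in a window starting at 1480444200000, so the interesting question is what timestamp the timestamp extractor actually returns for that record.)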