Hi Guozhang,

Interesting. Will the same logic apply (the timestamp written into the internal repartition topic) for brokers configured with log.message.timestamp.type=LogAppendTime?
On Mon, Mar 5, 2018 at 8:33 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Dmitriy,
>
> What you have observed is by design, and it may be a bit confusing at first.
> Let me explain:
>
> When you do a group-by aggregation like the above case, during the
> "groupBy((key, value) -> ......)" stage the Streams library will do a
> re-partitioning by sending the original data stream into an internal
> repartition topic, keyed by the aggregation key defined in the "groupBy"
> function, and then fetching from that topic again. This is similar to a
> shuffle phase in distributed computing frameworks and ensures the
> downstream aggregations can be done in parallel. When the "groupBy"
> operator sends the messages to this repartition topic, it sets the
> timestamp extracted from the payload in the record metadata, and hence
> when the downstream aggregation operator reads from this repartition
> topic, it is OK to always use ExtractRecordMetadataTimestamp to extract
> that timestamp and use the extracted value to determine which window this
> record should fall into.
>
> More details can be found in this JIRA:
>
> https://issues.apache.org/jira/browse/KAFKA-4785
>
> So the record timestamp used during aggregation should be the same as the
> one in the payload. If you do observe that this is not the case, it is
> unexpected. In that case, could you share your complete code snippet,
> especially how the input stream "in" is defined, and the config properties
> you have set, so we can investigate?
>
> Guozhang
>
>
> On Mon, Mar 5, 2018 at 5:53 AM, Dmitriy Vsekhvalnov <dvsekhval...@gmail.com>
> wrote:
>
> > Good morning,
> >
> > We have a simple use-case where we want to count the number of events in
> > each hour, grouped by some fields from the event itself.
> >
> > Our event timestamp is embedded in the messages themselves (JSON), and
> > we are using a trivial custom timestamp extractor (which is called and
> > works as expected).
> >
> > What we are facing is that the timestamp coming from
> > ExtractRecordMetadataTimestamp is always used when determining the
> > matching windows for an event, inside KStreamWindowAggregate.process(),
> > and never the value from our JSON timestamp extractor.
> >
> > Effectively it doesn't work correctly when we test with late data, e.g.
> > a message whose timestamp is an hour before now. The topology always
> > calculates the matching hour bucket (window) using the record timestamp,
> > not the payload.
> >
> > Is this expected behaviour? Are we getting windowing wrong? Are there
> > any settings or other tricks to accommodate our use-case?
> >
> > For reference, our setup: brokers, kafka-streams and kafka-clients are
> > all v1.0.0.
> > And here is the code:
> >
> > KTable<Windowed<Tuple>, Long> summaries = in
> >     .groupBy((key, value) -> ......)
> >     .windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1L)))
> >     .count();
> >
> > Thank you.
>
>
> --
> -- Guozhang
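For reference, below is a minimal, self-contained sketch of the kind of setup discussed in this thread, targeting the kafka-streams 1.0.x API. The topic name ("events"), the JSON field name ("timestamp"), the String serdes, the Jackson-based parsing, and grouping by the whole value (as a stand-in for the original Tuple key) are all assumptions for illustration; PayloadTimestampExtractor is a hypothetical stand-in for the custom extractor mentioned above, not the original poster's code.

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.TimestampExtractor;

import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class HourlyCountExample {

    // Payload-based extractor: assumes the record value is a JSON string with an
    // epoch-millis "timestamp" field (hypothetical field name). Falls back to the
    // record metadata timestamp if the payload cannot be parsed.
    public static class PayloadTimestampExtractor implements TimestampExtractor {
        private static final ObjectMapper MAPPER = new ObjectMapper();

        @Override
        public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
            try {
                JsonNode json = MAPPER.readTree((String) record.value());
                return json.get("timestamp").asLong();
            } catch (Exception e) {
                return record.timestamp();
            }
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "hourly-event-counts");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // Register the payload-based extractor so windowing uses the embedded event time.
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                  PayloadTimestampExtractor.class.getName());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> in = builder.stream("events"); // topic name is an assumption

        // Group by a value-derived key (here the whole value, in place of the original
        // Tuple key) and count per 1-hour window. The extracted payload timestamp travels
        // with the record through the internal repartition topic, so window assignment is
        // based on event time rather than arrival time.
        KTable<Windowed<String>, Long> summaries = in
                .groupBy((key, value) -> value)
                .windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1L)))
                .count();

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
```

As described in the thread, the timestamp returned by the extractor is written into the record metadata of the internal repartition topic, so the downstream KStreamWindowAggregate sees the payload-derived event time even for late-arriving records.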