Hello Dmitriy,

What you have observed is by design, and it may be a bit confusing at first. Let me explain:
When you do a group-by aggregation like the above case, during the
"groupBy((key, value) -> ......)" stage the Streams library will do a
re-partitioning: it sends the original data stream into an internal
repartition topic, keyed by the aggregation key defined in the "groupBy"
function, and then fetches from that topic again. This is similar to the
shuffle phase in distributed computing frameworks, and it makes sure the
downstream aggregations can be done in parallel.

When the "groupBy" operator sends the messages to this repartition topic,
it sets the timestamp extracted from the payload in the record metadata.
Hence, when the downstream aggregation operator reads from this
repartition topic, it is safe to always use ExtractRecordMetadataTimestamp
to extract that timestamp and use the extracted value to determine which
window the record should fall into. More details can be found in this
JIRA:

https://issues.apache.org/jira/browse/KAFKA-4785

So the record timestamp used during aggregation should be the same as the
one in the payload; if you do observe that this is not the case, it is
unexpected. Could you then share your complete code snippet, especially
how the input stream "in" is defined, as well as your config properties,
for us to investigate? (A minimal sketch of a payload-based extractor and
its wiring follows the quoted message below.)

Guozhang

On Mon, Mar 5, 2018 at 5:53 AM, Dmitriy Vsekhvalnov <dvsekhval...@gmail.com> wrote:

> Good morning,
>
> we have a simple use-case where we want to count the number of events for
> each hour, grouped by some fields from the event itself.
>
> Our event timestamp is embedded in the message itself (JSON) and we are
> using a trivial custom timestamp extractor (which is called and works as
> expected).
>
> What we are facing is that the timestamp coming from
> ExtractRecordMetadataTimestamp is always used when determining the
> matching windows for an event, inside KStreamWindowAggregate.process(),
> and never the value from our JSON timestamp extractor.
>
> Effectively it doesn't work correctly when we test on late data, e.g. a
> message whose embedded timestamp is an hour before now. The topology
> always calculates the matching hour bucket (window) using the record
> timestamp, not the payload.
>
> Is this expected behaviour? Are we getting windowing wrong? Are there any
> settings or other tricks to accommodate our use-case?
>
> For reference, our setup: brokers, kafka-streams and kafka-clients all of
> v1.0.0. And here is the code:
>
> KTable<Windowed<Tuple>, Long> summaries = in
>     .groupBy((key, value) -> ......)
>     .windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1L)))
>     .count();
>
> Thank you.

--
-- Guozhang
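For illustration, here is a minimal sketch of a payload-based
TimestampExtractor along the lines Dmitriy describes. It assumes the event
value is a JSON string carrying an epoch-millis field named "timestamp";
the class name, field name, and Jackson-based parsing are illustrative
assumptions, not his actual code.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

// Hypothetical extractor: pulls an epoch-millis "timestamp" field out of
// a JSON payload, falling back to the record metadata timestamp.
public class PayloadTimestampExtractor implements TimestampExtractor {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
        try {
            // Assumes the record value is the raw JSON string of the event.
            JsonNode event = MAPPER.readTree(record.value().toString());
            JsonNode ts = event.get("timestamp");
            if (ts != null && ts.canConvertToLong()) {
                return ts.asLong();
            }
        } catch (Exception e) {
            // malformed payload: fall through to the metadata timestamp
        }
        return record.timestamp();
    }
}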
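And a sketch of how the config and input stream might be wired so this
extractor is actually used (Kafka Streams 1.0.0 API, per the versions
mentioned in the thread). The application id, bootstrap servers, topic
name, and serdes are illustrative placeholders, not Dmitriy's setup.

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

public class CountByHourApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "event-counts");      // illustrative
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
        // The piece that makes the payload timestamp drive the windowing:
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                  PayloadTimestampExtractor.class.getName());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> in =
                builder.stream("events", Consumed.with(Serdes.String(), Serdes.String()));

        // ... groupBy / windowedBy / count as in the snippet quoted above ...

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}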