If broker configures log.message.timestamp.type=LogAppendTime universally, it will ignore whatever timestamp set in the message metadata and override it with the append time. So when the messages are fetched by downstream processors which always use the metadata timestamp extractor, it will get the append timestamp set by brokers.
Guozhang On Mon, Mar 5, 2018 at 9:53 AM, Dmitriy Vsekhvalnov <dvsekhval...@gmail.com> wrote: > Hi Guozhang, > > interesting, will same logic applies (internal topic rewrite) for brokers > configured with: > log.message.timestamp.type=LogAppendTime > > ? > > On Mon, Mar 5, 2018 at 8:33 PM, Guozhang Wang <wangg...@gmail.com> wrote: > > > Hello Dmitriy, > > > > What you have observed is by design, and it maybe a bit confusing at > first > > place. Let me explain: > > > > When you do a group-by aggregation like the above case, during the > > "groupBy((key, > > value) -> ......)" stage Streams library will do a re-partitioning by > > sending the original data stream into an internal repartition topic based > > on the aggregation key defined in the "groupBy" function and fetch from > > that topic again. This is similar to a shuffle phase in distributed > > computing frameworks to make sure the down stream aggregations can be > done > > in parallel. When the "groupBy" operator sends the messages to this > > repartition topic, it will set in the record metadata the extracted > > timestamp from the payload, and hence for the downstream aggregation > > operator to read from this repartition topic, it is OK to always use > > the ExtractRecordMetadataTimestamp > > to extract that timestamp and use the extracted value to determine which > > window this record should fall into. > > > > More details can be found in this JIRA: > > > > https://issues.apache.org/jira/browse/KAFKA-4785 > > > > > > So the record timestamp used during aggregation should be the same as the > > one in the payload, if you do observe that is not the case, this is > > unexpected. In that case could you share your complete code snippet, > > especially how input stream "in" is defined, and your config properties > > defined for us to investigate? > > > > Guozhang > > > > > > On Mon, Mar 5, 2018 at 5:53 AM, Dmitriy Vsekhvalnov < > > dvsekhval...@gmail.com> > > wrote: > > > > > Good morning, > > > > > > we have simple use-case where we want to count number of events by each > > > hour grouped by some fields from event itself. > > > > > > Our event timestamp is embedded into messages itself (json) and we > using > > > trivial custom timestamp extractor (which called and works as > expected). > > > > > > What we facing is that there is always timestamp used that coming > > > from ExtractRecordMetadataTimestamp when determining matching windows > for > > > event, inside KStreamWindowAggregate.process() and never value from > our > > > json timestamp extractor. > > > > > > Effectively it doesn't work correctly if we test on late data, e.g. > > > timestamp in a message hour ago from now for instance. Topology always > > > calculating matching hour bucket (window) using record timestamp, not > > > payload. > > > > > > Is it expected behaviour ? Are we getting windowing wrong? Any settings > > or > > > other tricks to accommodate our use-case? > > > > > > For reference our setup: brokers, kafka-stream and kafka-clients all of > > > v1.0.0 > > > And here is code: > > > > > > KTable<Windowed<Tuple>, Long> summaries = in > > > .groupBy((key, value) -> ......) > > > .windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1l))) > > > .count(); > > > > > > Thank you. > > > > > > > > > > > -- > > -- Guozhang > > > -- -- Guozhang