Thanks for creating the JIRA ticket. The Streams library follows the "event-time" concept by default with the metadata timestamp extractor, expecting that the timestamp set in this field reflects "when the event happened in real time":
https://kafka.apache.org/10/documentation/streams/core-concepts#streams_time

Following that expectation, the timestamp Streams uses for windowed aggregation results is the window start time, indicating "events that happened during this window in real time resulted in this aggregated value".

Guozhang
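For illustration, a minimal sketch (against the Kafka 1.0.x Streams API) of what that means for the windowed count discussed in this thread: the Windowed key exposes the same window start time that the emitted records carry as their timestamp. The sink topic name and the String key type are placeholders, not the poster's actual code.

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Produced;
    import org.apache.kafka.streams.kstream.Windowed;

    public class WindowStartSink {
        // "summaries" is a windowed count like the one in the thread's example;
        // the sink topic name is a placeholder.
        public static void writeWithWindowStart(KTable<Windowed<String>, Long> summaries) {
            KStream<String, Long> out = summaries.toStream((windowedKey, count) ->
                    windowedKey.key() + "@" + windowedKey.window().start());
            // Records written here are produced with the window start time as their
            // record timestamp, per the "event-time" semantics described above.
            out.to("hourly-counts", Produced.with(Serdes.String(), Serdes.Long()));
        }
    }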
On Tue, Mar 6, 2018 at 6:39 AM, Dmitriy Vsekhvalnov <dvsekhval...@gmail.com> wrote:

> Guozhang,
>
> here we go with the ticket: https://issues.apache.org/jira/browse/KAFKA-6614
>
> I'd also like to continue the discussion a little bit further about timestamps. I was trying to test with the broker configured for "CreateTime" and got a question about sink topic timestamps, back to the example:
>
> KTable<Windowed<Tuple>, Long> summaries = in
>     .groupBy((key, value) -> ......)
>     .windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1l)))
>     .count();
>
> summaries.toStream().to(sink);
>
> Each record written to sink will get its timestamp assigned to the grouping window start time, which quite often will be in the past.
>
> What is the logic behind that? Honestly, I expected sink messages to get a "now" timestamp.
>
> On Mon, Mar 5, 2018 at 11:48 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Sounds great! :)
> >
> > On Mon, Mar 5, 2018 at 12:28 PM, Dmitriy Vsekhvalnov <dvsekhval...@gmail.com> wrote:
> >
> > > Thanks, that's an option, I'll take a look at the configuration.
> > >
> > > But yeah, I was thinking the same: if Streams relies on the fact that internal topics should use the 'CreateTime' configuration, then it is the Streams library's responsibility to configure it.
> > >
> > > I can open a JIRA ticket :)
> > >
> > > On Mon, Mar 5, 2018 at 11:18 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > >
> > > > Hello Dmitriy,
> > > >
> > > > In your case, you can override this config to CreateTime only for the internal topics created by Streams; this is documented in
> > > > https://kafka.apache.org/10/javadoc/org/apache/kafka/streams/StreamsConfig.html#TOPIC_PREFIX
> > > >
> > > > We are also discussing always overriding the log.message.timestamp.type config for internal topics to CreateTime; I vaguely remember there is a JIRA open for it, in case you are interested in contributing to the Streams library.
> > > >
> > > > Guozhang
> > > >
> > > > On Mon, Mar 5, 2018 at 10:50 AM, Dmitriy Vsekhvalnov <dvsekhval...@gmail.com> wrote:
> > > >
> > > > > Which effectively means the given scenario does not work with LogAppendTime, correct? Because all internal re-partition topics will always contain "now" instead of the real timestamp from the original payload message?
> > > > >
> > > > > Is kafka-streams designed to work with LogAppendTime at all? It seems a lot of stuff will NOT work correctly using the built-in ExtractRecordMetadataTimestamp?
> > > > >
> > > > > On Mon, Mar 5, 2018 at 9:30 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > >
> > > > > > If the broker configures log.message.timestamp.type=LogAppendTime universally, it will ignore whatever timestamp is set in the message metadata and override it with the append time. So when the messages are fetched by downstream processors, which always use the metadata timestamp extractor, they will get the append timestamp set by the brokers.
> > > > > >
> > > > > > Guozhang
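The per-internal-topic override that the StreamsConfig#TOPIC_PREFIX link above refers to might look roughly like the following sketch (Kafka 1.0.x). The application id and bootstrap servers are placeholders; the idea is only that topic-level configs prefixed with StreamsConfig.topicPrefix(...) are applied to the internal repartition/changelog topics Streams creates, so they can keep CreateTime even if the broker default is LogAppendTime.

    import java.util.Properties;

    import org.apache.kafka.common.config.TopicConfig;
    import org.apache.kafka.streams.StreamsConfig;

    public class InternalTopicTimestampConfig {
        public static Properties streamsProperties() {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "hourly-counts-app");   // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
            // Applied only to the internal topics created by this Streams application.
            props.put(StreamsConfig.topicPrefix(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG),
                      "CreateTime");
            return props;
        }
    }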
> > > > > > On Mon, Mar 5, 2018 at 9:53 AM, Dmitriy Vsekhvalnov <dvsekhval...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi Guozhang,
> > > > > > >
> > > > > > > interesting, will the same logic apply (internal topic rewrite) for brokers configured with:
> > > > > > > log.message.timestamp.type=LogAppendTime
> > > > > > > ?
> > > > > > >
> > > > > > > On Mon, Mar 5, 2018 at 8:33 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hello Dmitriy,
> > > > > > > >
> > > > > > > > What you have observed is by design, and it may be a bit confusing at first. Let me explain:
> > > > > > > >
> > > > > > > > When you do a group-by aggregation like the above case, during the "groupBy((key, value) -> ......)" stage the Streams library will do a re-partitioning by sending the original data stream into an internal repartition topic, based on the aggregation key defined in the "groupBy" function, and fetch from that topic again. This is similar to a shuffle phase in distributed computing frameworks, making sure the downstream aggregations can be done in parallel. When the "groupBy" operator sends the messages to this repartition topic, it sets the timestamp extracted from the payload in the record metadata, and hence for the downstream aggregation operator reading from this repartition topic it is OK to always use the ExtractRecordMetadataTimestamp to extract that timestamp and use the extracted value to determine which window the record should fall into.
> > > > > > > >
> > > > > > > > More details can be found in this JIRA:
> > > > > > > > https://issues.apache.org/jira/browse/KAFKA-4785
> > > > > > > >
> > > > > > > > So the record timestamp used during aggregation should be the same as the one in the payload; if you do observe that is not the case, this is unexpected. In that case could you share your complete code snippet, especially how the input stream "in" is defined, and your config properties, for us to investigate?
> > > > > > > >
> > > > > > > > Guozhang
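A complete snippet of the kind being asked for here might look roughly like the sketch below (Kafka 1.0.x). The topic names, serdes and grouping key are placeholders, and EventTimestampExtractor is a hypothetical payload-based extractor; a sketch of one appears after the original message at the end of the thread. It can be set globally or per source; the per-source Consumed setting is the one used for this topic.

    import java.util.Properties;
    import java.util.concurrent.TimeUnit;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.Consumed;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Serialized;
    import org.apache.kafka.streams.kstream.TimeWindows;
    import org.apache.kafka.streams.kstream.Windowed;

    public class HourlyCountTopology {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "hourly-counts-app");   // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
            // Global default extractor; the per-source Consumed setting below also works.
            props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                      EventTimestampExtractor.class.getName());

            StreamsBuilder builder = new StreamsBuilder();

            // Input stream "in": event time is taken from the payload via the
            // custom extractor rather than from the record metadata.
            KStream<String, String> in = builder.stream("events",
                    Consumed.with(Serdes.String(), Serdes.String())
                            .withTimestampExtractor(new EventTimestampExtractor()));

            KTable<Windowed<String>, Long> summaries = in
                    .groupBy((key, value) -> value,   // illustrative grouping key
                             Serialized.with(Serdes.String(), Serdes.String()))
                    .windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1L)))
                    .count();

            // Print instead of writing to a sink topic, to keep the sketch serde-free.
            summaries.toStream().foreach((windowedKey, count) ->
                    System.out.println(windowedKey.key() + "@" + windowedKey.window().start() + " = " + count));

            new KafkaStreams(builder.build(), props).start();
        }
    }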
> > > > > > > > On Mon, Mar 5, 2018 at 5:53 AM, Dmitriy Vsekhvalnov <dvsekhval...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Good morning,
> > > > > > > > >
> > > > > > > > > we have a simple use-case where we want to count the number of events for each hour, grouped by some fields from the event itself.
> > > > > > > > >
> > > > > > > > > Our event timestamp is embedded in the message itself (json) and we are using a trivial custom timestamp extractor (which is called and works as expected).
> > > > > > > > >
> > > > > > > > > What we are facing is that the timestamp coming from ExtractRecordMetadataTimestamp is always used when determining matching windows for an event, inside KStreamWindowAggregate.process(), and never the value from our json timestamp extractor.
> > > > > > > > >
> > > > > > > > > Effectively it doesn't work correctly if we test on late data, e.g. a message whose timestamp is an hour before now. The topology always calculates the matching hour bucket (window) using the record timestamp, not the payload.
> > > > > > > > >
> > > > > > > > > Is this expected behaviour? Are we getting windowing wrong? Any settings or other tricks to accommodate our use-case?
> > > > > > > > >
> > > > > > > > > For reference, our setup: brokers, kafka-streams and kafka-clients all of v1.0.0.
> > > > > > > > > And here is the code:
> > > > > > > > >
> > > > > > > > > KTable<Windowed<Tuple>, Long> summaries = in
> > > > > > > > >     .groupBy((key, value) -> ......)
> > > > > > > > >     .windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1l)))
> > > > > > > > >     .count();
> > > > > > > > >
> > > > > > > > > Thank you.

--
-- Guozhang
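The kind of payload-based extractor described in the original message is roughly like the following sketch (Kafka 1.0.x TimestampExtractor interface). The "eventTime" field name, the assumption that values arrive as JSON strings, and the fallback behaviour are all illustrative choices, not the poster's actual implementation; a real extractor would likely use a proper JSON library or an already-deserialized value.

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    public class EventTimestampExtractor implements TimestampExtractor {

        // Naive extraction of a numeric "eventTime" (epoch millis) field from a JSON string.
        private static final Pattern EVENT_TIME = Pattern.compile("\"eventTime\"\\s*:\\s*(\\d+)");

        @Override
        public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
            final Object value = record.value();
            if (value != null) {
                final Matcher m = EVENT_TIME.matcher(value.toString());
                if (m.find()) {
                    return Long.parseLong(m.group(1));   // event time embedded in the payload
                }
            }
            // No embedded timestamp found: fall back to the last known timestamp,
            // or to the record (metadata) timestamp if there is none yet.
            return previousTimestamp >= 0 ? previousTimestamp : record.timestamp();
        }
    }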