Which effectively means the given scenario does not work with LogAppendTime, correct? Because all internal re-partition topics will always contain "now" instead of the real timestamp from the original payload message?

Is kafka-streams designed to work with LogAppendTime at all? It seems a lot of stuff will NOT work correctly with the built-in ExtractRecordMetadataTimestamp?
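One way to check, I suppose, is to consume the repartition topic directly and print each record's timestamp type. A rough sketch; the bootstrap address, group id and repartition topic name are placeholders, not our actual setup:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TimestampProbe {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder
        props.put("group.id", "timestamp-probe");            // placeholder
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // Placeholder name: Streams repartition topics are named
            // "<application.id>-<operator-name>-repartition".
            consumer.subscribe(Collections.singletonList("my-app-some-operator-repartition"));
            while (true) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(1000L);
                for (ConsumerRecord<byte[], byte[]> r : records) {
                    // With log.message.timestamp.type=LogAppendTime the broker stamps the
                    // append time, regardless of what the upstream extractor put in.
                    System.out.println(r.timestampType() + " " + r.timestamp());
                }
            }
        }
    }
}

If that prints LogAppendTime and "now" for every record, it would confirm the repartition topic loses the payload timestamp on our brokers.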
On Mon, Mar 5, 2018 at 9:30 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> If the broker is configured with log.message.timestamp.type=LogAppendTime
> universally, it will ignore whatever timestamp is set in the message
> metadata and override it with the append time. So when the messages are
> fetched by downstream processors, which always use the metadata timestamp
> extractor, they will get the append timestamp set by the brokers.
>
> Guozhang
>
> On Mon, Mar 5, 2018 at 9:53 AM, Dmitriy Vsekhvalnov <dvsekhval...@gmail.com> wrote:
>
> > Hi Guozhang,
> >
> > Interesting, does the same logic apply (internal topic rewrite) for
> > brokers configured with:
> > log.message.timestamp.type=LogAppendTime
> > ?
> >
> > On Mon, Mar 5, 2018 at 8:33 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > Hello Dmitriy,
> > >
> > > What you have observed is by design, and it may be a bit confusing at
> > > first. Let me explain:
> > >
> > > When you do a group-by aggregation like the above case, during the
> > > "groupBy((key, value) -> ......)" stage the Streams library will do a
> > > re-partitioning by sending the original data stream into an internal
> > > repartition topic, based on the aggregation key defined in the
> > > "groupBy" function, and fetching from that topic again. This is
> > > similar to a shuffle phase in distributed computing frameworks, to
> > > make sure the downstream aggregations can be done in parallel. When
> > > the "groupBy" operator sends the messages to this repartition topic,
> > > it sets the timestamp extracted from the payload in the record
> > > metadata, and hence when the downstream aggregation operator reads
> > > from this repartition topic, it is OK to always use the
> > > ExtractRecordMetadataTimestamp to extract that timestamp and use the
> > > extracted value to determine which window this record should fall into.
> > >
> > > More details can be found in this JIRA:
> > >
> > > https://issues.apache.org/jira/browse/KAFKA-4785
> > >
> > > So the record timestamp used during aggregation should be the same as
> > > the one in the payload. If you do observe that is not the case, it is
> > > unexpected. In that case could you share your complete code snippet,
> > > especially how the input stream "in" is defined, and your config
> > > properties, for us to investigate?
> > >
> > > Guozhang
> > >
> > > On Mon, Mar 5, 2018 at 5:53 AM, Dmitriy Vsekhvalnov <dvsekhval...@gmail.com> wrote:
> > >
> > > > Good morning,
> > > >
> > > > We have a simple use-case where we want to count the number of
> > > > events per hour, grouped by some fields from the event itself.
> > > >
> > > > Our event timestamp is embedded in the message itself (json) and we
> > > > are using a trivial custom timestamp extractor (which is called and
> > > > works as expected).
> > > >
> > > > What we are facing is that the timestamp coming from
> > > > ExtractRecordMetadataTimestamp is always used when determining the
> > > > matching windows for an event, inside KStreamWindowAggregate.process(),
> > > > and never the value from our json timestamp extractor.
> > > >
> > > > Effectively it doesn't work correctly if we test on late data, e.g.
> > > > a message timestamp an hour ago from now, for instance. The topology
> > > > always calculates the matching hour bucket (window) using the record
> > > > timestamp, not the payload.
> > > >
> > > > Is it expected behaviour? Are we getting windowing wrong? Any
> > > > settings or other tricks to accommodate our use-case?
> > > >
> > > > For reference, our setup: brokers, kafka-streams and kafka-clients
> > > > all of v1.0.0.
> > > > And here is the code:
> > > >
> > > > KTable<Windowed<Tuple>, Long> summaries = in
> > > >     .groupBy((key, value) -> ......)
> > > >     .windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1l)))
> > > >     .count();
> > > >
> > > > Thank you.
> > >
> > > --
> > > -- Guozhang
>
> --
> -- Guozhang
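For reference, here is roughly the kind of payload extractor described above. A sketch only: the class name and the "eventTime" field are illustrative, and it assumes the deserialized value reaches the extractor as a JSON string.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Sketch of a payload-based extractor; "eventTime" is an illustrative field name.
public class JsonEventTimeExtractor implements TimestampExtractor {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
        try {
            // Assumes the value is (or renders as) the raw JSON string.
            JsonNode json = MAPPER.readTree(record.value().toString());
            JsonNode ts = json.get("eventTime");
            if (ts != null && ts.asLong() > 0) {
                return ts.asLong();   // epoch millis embedded in the payload
            }
        } catch (Exception e) {
            // fall through to the record's own (metadata) timestamp
        }
        return record.timestamp();
    }
}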
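And a self-contained sketch of how the pieces are wired together; the application id, topic name, serdes and grouping key are placeholders, not our actual code:

import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

public class HourlyCountApp {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "hourly-counts");      // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // The payload extractor runs when the source topic is read; the value it
        // returns becomes the record timestamp written to the groupBy repartition
        // topic (on CreateTime brokers), which the windowed count then uses.
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                  JsonEventTimeExtractor.class);

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> in = builder.stream("events");  // placeholder topic

        KTable<Windowed<String>, Long> summaries = in
                .groupBy((key, value) -> value,                  // placeholder grouping key
                         Serialized.with(Serdes.String(), Serdes.String()))
                .windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1L)))
                .count();

        // Print each hourly bucket as it updates, to see which window a record lands in.
        summaries.toStream().foreach((window, count) -> System.out.println(window + " -> " + count));

        new KafkaStreams(builder.build(), props).start();
    }
}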