Sounds great! :) On Mon, Mar 5, 2018 at 12:28 PM, Dmitriy Vsekhvalnov <dvsekhval...@gmail.com > wrote:
> Thanks, that's an option, i'll take a look at configuration. > > But yeah, i was thinking same, if streams relies on the fact that internal > topics should use 'CreateTime' configuration, then it is streams library > responsibility to configure it. > > I can open a Jira ticket :) > > On Mon, Mar 5, 2018 at 11:18 PM, Guozhang Wang <wangg...@gmail.com> wrote: > > > Hello Dmitriy, > > > > In your case, you can override this config to CreateTime only for the > > internal topics created by Streams, this is documented in > > > > https://kafka.apache.org/10/javadoc/org/apache/kafka/ > > streams/StreamsConfig.html#TOPIC_PREFIX > > > > > > We are also discussing to always override the log.message.timestamp.type > > config for internal topics to CreateTime, I vaguely remember there is a > > JIRA open for it in case you are interested in contributing to Streams > > library. > > > > Guozhang > > > > > > On Mon, Mar 5, 2018 at 10:50 AM, Dmitriy Vsekhvalnov < > > dvsekhval...@gmail.com > > > wrote: > > > > > Which effectively means given scenario is not working with > LogAppendTime, > > > correct? Because all internal re-partition topics will always contain > > "now" > > > instead of real timestamp from original payload message? > > > > > > Is kafka-streams designed to work with LogAppendTime at all? It seems a > > lot > > > of stuff will NOT work correctly using > > > built-in ExtractRecordMetadataTimestamp ? > > > > > > On Mon, Mar 5, 2018 at 9:30 PM, Guozhang Wang <wangg...@gmail.com> > > wrote: > > > > > > > If broker configures log.message.timestamp.type=LogAppendTime > > > universally, > > > > it will ignore whatever timestamp set in the message metadata and > > > override > > > > it with the append time. So when the messages are fetched by > downstream > > > > processors which always use the metadata timestamp extractor, it will > > get > > > > the append timestamp set by brokers. > > > > > > > > > > > > Guozhang > > > > > > > > On Mon, Mar 5, 2018 at 9:53 AM, Dmitriy Vsekhvalnov < > > > > dvsekhval...@gmail.com> > > > > wrote: > > > > > > > > > Hi Guozhang, > > > > > > > > > > interesting, will same logic applies (internal topic rewrite) for > > > brokers > > > > > configured with: > > > > > log.message.timestamp.type=LogAppendTime > > > > > > > > > > ? > > > > > > > > > > On Mon, Mar 5, 2018 at 8:33 PM, Guozhang Wang <wangg...@gmail.com> > > > > wrote: > > > > > > > > > > > Hello Dmitriy, > > > > > > > > > > > > What you have observed is by design, and it maybe a bit confusing > > at > > > > > first > > > > > > place. Let me explain: > > > > > > > > > > > > When you do a group-by aggregation like the above case, during > the > > > > > > "groupBy((key, > > > > > > value) -> ......)" stage Streams library will do a > re-partitioning > > by > > > > > > sending the original data stream into an internal repartition > topic > > > > based > > > > > > on the aggregation key defined in the "groupBy" function and > fetch > > > from > > > > > > that topic again. This is similar to a shuffle phase in > distributed > > > > > > computing frameworks to make sure the down stream aggregations > can > > be > > > > > done > > > > > > in parallel. When the "groupBy" operator sends the messages to > this > > > > > > repartition topic, it will set in the record metadata the > extracted > > > > > > timestamp from the payload, and hence for the downstream > > aggregation > > > > > > operator to read from this repartition topic, it is OK to always > > use > > > > > > the ExtractRecordMetadataTimestamp > > > > > > to extract that timestamp and use the extracted value to > determine > > > > which > > > > > > window this record should fall into. > > > > > > > > > > > > More details can be found in this JIRA: > > > > > > > > > > > > https://issues.apache.org/jira/browse/KAFKA-4785 > > > > > > > > > > > > > > > > > > So the record timestamp used during aggregation should be the > same > > as > > > > the > > > > > > one in the payload, if you do observe that is not the case, this > is > > > > > > unexpected. In that case could you share your complete code > > snippet, > > > > > > especially how input stream "in" is defined, and your config > > > properties > > > > > > defined for us to investigate? > > > > > > > > > > > > Guozhang > > > > > > > > > > > > > > > > > > On Mon, Mar 5, 2018 at 5:53 AM, Dmitriy Vsekhvalnov < > > > > > > dvsekhval...@gmail.com> > > > > > > wrote: > > > > > > > > > > > > > Good morning, > > > > > > > > > > > > > > we have simple use-case where we want to count number of events > > by > > > > each > > > > > > > hour grouped by some fields from event itself. > > > > > > > > > > > > > > Our event timestamp is embedded into messages itself (json) and > > we > > > > > using > > > > > > > trivial custom timestamp extractor (which called and works as > > > > > expected). > > > > > > > > > > > > > > What we facing is that there is always timestamp used that > coming > > > > > > > from ExtractRecordMetadataTimestamp when determining matching > > > windows > > > > > for > > > > > > > event, inside KStreamWindowAggregate.process() and never value > > > from > > > > > our > > > > > > > json timestamp extractor. > > > > > > > > > > > > > > Effectively it doesn't work correctly if we test on late data, > > e.g. > > > > > > > timestamp in a message hour ago from now for instance. Topology > > > > always > > > > > > > calculating matching hour bucket (window) using record > timestamp, > > > not > > > > > > > payload. > > > > > > > > > > > > > > Is it expected behaviour ? Are we getting windowing wrong? Any > > > > settings > > > > > > or > > > > > > > other tricks to accommodate our use-case? > > > > > > > > > > > > > > For reference our setup: brokers, kafka-stream and > kafka-clients > > > all > > > > of > > > > > > > v1.0.0 > > > > > > > And here is code: > > > > > > > > > > > > > > KTable<Windowed<Tuple>, Long> summaries = in > > > > > > > .groupBy((key, value) -> ......) > > > > > > > .windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1l))) > > > > > > > .count(); > > > > > > > > > > > > > > Thank you. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > > -- Guozhang > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > -- Guozhang > > > > > > > > > > > > > > > -- > > -- Guozhang > > > -- -- Guozhang