Which effectively means the given scenario does not work with LogAppendTime,
correct? Because all internal repartition topics will always contain "now"
instead of the real timestamp from the original message payload?

Is Kafka Streams designed to work with LogAppendTime at all? It seems a lot
of things will NOT work correctly with the
built-in ExtractRecordMetadataTimestamp?
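
To make the concern concrete, here is a minimal sketch in plain Java with no
Kafka dependency. It only models the behaviour described in this thread: the
class, enum and method names are hypothetical, the broker setting is reduced
to a two-valued enum, and the window arithmetic is a simplified version of
hourly tumbling windows, not the actual Streams implementation.

```java
import java.time.Duration;

public class WindowBucketSketch {

    // Hypothetical stand-in for the broker's log.message.timestamp.type setting.
    enum TimestampType { CREATE_TIME, LOG_APPEND_TIME }

    // One-hour tumbling windows, as in the topology from the original question.
    static final long WINDOW_SIZE_MS = Duration.ofHours(1).toMillis();

    // Timestamp a downstream reader would find in the record metadata after
    // the record has been written to a (repartition) topic.
    static long storedTimestamp(long producerTimestampMs, long brokerNowMs, TimestampType type) {
        // With LogAppendTime the broker overrides the producer-supplied timestamp.
        return type == TimestampType.LOG_APPEND_TIME ? brokerNowMs : producerTimestampMs;
    }

    // Start of the tumbling window that contains the given timestamp.
    static long windowStart(long timestampMs) {
        return timestampMs - Math.floorMod(timestampMs, WINDOW_SIZE_MS);
    }

    public static void main(String[] args) {
        long brokerNow = System.currentTimeMillis();
        long eventTime = brokerNow - WINDOW_SIZE_MS; // late event: one hour old

        for (TimestampType type : TimestampType.values()) {
            long seen = storedTimestamp(eventTime, brokerNow, type);
            System.out.println(type + ": window starts at " + windowStart(seen));
        }
    }
}
```

In this model the late event lands in the previous hour's window under
CREATE_TIME and in the current hour's window under LOG_APPEND_TIME, which is
the mismatch I am asking about.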

On Mon, Mar 5, 2018 at 9:30 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> If the broker configures log.message.timestamp.type=LogAppendTime universally,
> it will ignore whatever timestamp is set in the message metadata and override
> it with the append time. So when the messages are fetched by downstream
> processors, which always use the metadata timestamp extractor, they will get
> the append timestamp set by the brokers.
>
>
> Guozhang
>
> On Mon, Mar 5, 2018 at 9:53 AM, Dmitriy Vsekhvalnov <
> dvsekhval...@gmail.com>
> wrote:
>
> > Hi Guozhang,
> >
> > Interesting. Will the same logic apply (internal topic rewrite) for brokers
> > configured with:
> >   log.message.timestamp.type=LogAppendTime
> >
> > ?
> >
> > On Mon, Mar 5, 2018 at 8:33 PM, Guozhang Wang <wangg...@gmail.com>
> wrote:
> >
> > > Hello Dmitriy,
> > >
> > > > What you have observed is by design, and it may be a bit confusing at
> > > > first. Let me explain:
> > >
> > > > When you do a group-by aggregation like the above case, during the
> > > > "groupBy((key, value) -> ......)" stage the Streams library will
> > > > re-partition the data: it sends the original data stream to an internal
> > > > repartition topic, keyed by the aggregation key defined in the "groupBy"
> > > > function, and then fetches from that topic again. This is similar to a
> > > > shuffle phase in distributed computing frameworks, and it makes sure the
> > > > downstream aggregations can be done in parallel. When the "groupBy"
> > > > operator sends the messages to this repartition topic, it sets the
> > > > timestamp extracted from the payload in the record metadata. Hence, when
> > > > the downstream aggregation operator reads from this repartition topic, it
> > > > is OK to always use ExtractRecordMetadataTimestamp to extract that
> > > > timestamp and use the extracted value to determine which window the
> > > > record should fall into.
> > >
> > > More details can be found in this JIRA:
> > >
> > > https://issues.apache.org/jira/browse/KAFKA-4785
> > >
> > >
> > > > So the record timestamp used during aggregation should be the same as the
> > > > one in the payload. If you observe that this is not the case, it is
> > > > unexpected. In that case, could you share your complete code snippet,
> > > > especially how the input stream "in" is defined, and your config
> > > > properties, for us to investigate?
> > >
> > > Guozhang
> > >
> > >
> > > On Mon, Mar 5, 2018 at 5:53 AM, Dmitriy Vsekhvalnov <
> > > dvsekhval...@gmail.com>
> > > wrote:
> > >
> > > > Good morning,
> > > >
> > > > > we have a simple use case where we want to count the number of events
> > > > > per hour, grouped by some fields from the event itself.
> > > >
> > > > > Our event timestamp is embedded in the message itself (JSON) and we are
> > > > > using a trivial custom timestamp extractor (which is called and works as
> > > > > expected).
> > > >
> > > > > What we are facing is that the timestamp coming from
> > > > > ExtractRecordMetadataTimestamp is always used when determining matching
> > > > > windows for an event inside KStreamWindowAggregate.process(), and never
> > > > > the value from our JSON timestamp extractor.
> > > >
> > > > > Effectively, it doesn't work correctly if we test on late data, e.g. when
> > > > > the timestamp in a message is an hour before now. The topology always
> > > > > calculates the matching hour bucket (window) using the record timestamp,
> > > > > not the payload timestamp.
> > > >
> > > > > Is this expected behaviour? Are we getting windowing wrong? Are there any
> > > > > settings or other tricks to accommodate our use case?
> > > >
> > > > > For reference, our setup: brokers, kafka-streams and kafka-clients are
> > > > > all v1.0.0.
> > > > > And here is the code:
> > > >
> > > > KTable<Windowed<Tuple>, Long> summaries = in
> > > >    .groupBy((key, value) -> ......)
> > > > >    .windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1L)))
> > > >    .count();
> > > >
> > > > Thank you.
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>
