Hi Guozhang,

Interesting. Does the same logic (the internal topic rewrite) also apply to
brokers configured with:
  log.message.timestamp.type=LogAppendTime

?

On Mon, Mar 5, 2018 at 8:33 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Dmitriy,
>
> What you have observed is by design, and it may be a bit confusing at
> first. Let me explain:
>
> When you do a group-by aggregation like the one above, during the
> "groupBy((key, value) -> ......)" stage the Streams library re-partitions
> the data: it sends the original stream to an internal repartition topic,
> keyed by the aggregation key defined in the "groupBy" function, and then
> fetches from that topic again. This is similar to a shuffle phase in
> distributed computing frameworks and makes sure the downstream
> aggregations can be done in parallel. When the "groupBy" operator sends
> the messages to this repartition topic, it sets the timestamp extracted
> from the payload in the record metadata. Hence, when the downstream
> aggregation operator reads from this repartition topic, it is OK to always
> use ExtractRecordMetadataTimestamp to extract that timestamp and use the
> extracted value to determine which window the record should fall into.
>
> More details can be found in this JIRA:
>
> https://issues.apache.org/jira/browse/KAFKA-4785
>
>
> So the record timestamp used during aggregation should be the same as the
> one in the payload. If you observe that this is not the case, that is
> unexpected. In that case, could you share your complete code snippet,
> especially how the input stream "in" is defined, and your config
> properties, so that we can investigate?
>
> Guozhang
>
>
> On Mon, Mar 5, 2018 at 5:53 AM, Dmitriy Vsekhvalnov <
> dvsekhval...@gmail.com>
> wrote:
>
> > Good morning,
> >
> > we have a simple use case where we want to count the number of events per
> > hour, grouped by some fields from the event itself.
> >
> > Our event timestamp is embedded in the message itself (JSON), and we are
> > using a trivial custom timestamp extractor (which is called and works as
> > expected).
> >
> > What we are facing is that the timestamp coming from
> > ExtractRecordMetadataTimestamp is always used when determining the
> > matching windows for an event inside KStreamWindowAggregate.process(),
> > and never the value from our JSON timestamp extractor.
> >
> > Effectively, it doesn't work correctly when we test on late data, e.g.
> > when the timestamp in a message is an hour before now. The topology always
> > calculates the matching hour bucket (window) using the record timestamp,
> > not the payload.
> >
> > Is this the expected behaviour? Are we getting windowing wrong? Are there
> > any settings or other tricks to accommodate our use case?
> >
> > For reference, our setup: brokers, kafka-streams and kafka-clients are all
> > v1.0.0.
> > And here is the code:
> >
> > KTable<Windowed<Tuple>, Long> summaries = in
> >    .groupBy((key, value) -> ......)
> >    .windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1L)))
> >    .count();
> >
> > Thank you.
> >
>
>
>
> --
> -- Guozhang
>
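
To make the window assignment discussed in this thread concrete, here is a
minimal sketch in plain Java with no Kafka dependency. It assumes tumbling
one-hour windows aligned to the epoch, which is what the one-hour
TimeWindows in the snippet above should amount to. The names
HourlyWindowSketch and windowStart are made up for illustration and are not
part of the Streams API, and the example timestamps are arbitrary.

```java
import java.time.Instant;
import java.util.concurrent.TimeUnit;

// Illustrative only: mimics how a tumbling one-hour window is chosen from a
// timestamp. This is NOT the Kafka Streams implementation.
class HourlyWindowSketch {

    // Window size, mirroring TimeUnit.HOURS.toMillis(1L) from the thread.
    static final long WINDOW_SIZE_MS = TimeUnit.HOURS.toMillis(1L);

    // Start (epoch millis) of the tumbling window containing timestampMs.
    // Assumes a non-negative timestamp and windows aligned to the epoch.
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_SIZE_MS);
    }

    public static void main(String[] args) {
        // Hypothetical late event: appended now, but the payload says it
        // happened one hour earlier.
        long appendTimeMs = Instant.parse("2018-03-05T13:53:00Z").toEpochMilli();
        long payloadTimeMs = appendTimeMs - TimeUnit.HOURS.toMillis(1L);

        System.out.println("window by payload timestamp: "
                + Instant.ofEpochMilli(windowStart(payloadTimeMs)));
        System.out.println("window by append timestamp:  "
                + Instant.ofEpochMilli(windowStart(appendTimeMs)));
    }
}
```

The two printed window starts differ by one hour, which is the symptom
described for late data: whichever timestamp ends up in the record metadata
decides the bucket.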
