Thanks for creating the JIRA ticket.

By default, the Streams library follows the "event-time" concept with the
metadata timestamp extractor, expecting that the timestamp set in this field
reflects "when the event happened in real time":

https://kafka.apache.org/10/documentation/streams/core-concepts#streams_time

Following that expectation, the timestamp Streams uses for windowed
aggregation results is the window start time, indicating that "events that
happened during this window in real time resulted in this aggregated value".
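
As a rough illustration of why the result timestamp is often "in the past",
here is a minimal sketch of how the start of a one-hour tumbling window can be
derived from an event timestamp. This is not the actual Streams code: the
class name WindowStartSketch and the helper windowStart are made up for the
example, and it assumes non-negative epoch-millisecond timestamps and windows
aligned to the epoch.

```java
import java.util.concurrent.TimeUnit;

public class WindowStartSketch {

    // Hypothetical helper (not part of the Kafka Streams API): returns the start
    // of the tumbling window that contains timestampMs, assuming windows are
    // aligned to the epoch and timestampMs is non-negative.
    public static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    public static void main(String[] args) {
        long hourMs = TimeUnit.HOURS.toMillis(1L);

        // An arbitrary event time: 2018-03-06T06:39:00Z in epoch milliseconds.
        long eventTs = 1520318340000L;

        // Every event in the same hour maps to the same window start, so a
        // result stamped with the window start is at most one hour older than
        // the event that produced it.
        long start = windowStart(eventTs, hourMs);
        System.out.println("event timestamp = " + eventTs);
        System.out.println("window start    = " + start);
    }
}
```

With these inputs the window start is 06:00:00Z of the same day, which is the
timestamp a record for that window would carry under the behaviour described
above.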


Guozhang


On Tue, Mar 6, 2018 at 6:39 AM, Dmitriy Vsekhvalnov <dvsekhval...@gmail.com>
wrote:

> Guozhang,
>
> here we go with ticket: https://issues.apache.org/jira/browse/KAFKA-6614
>
> I'd also like to continue the discussion a little further about timestamps.
> I was testing with the broker configured for "CreateTime" and have a question
> about sink topic timestamps. Back to the example:
>
> KTable<Windowed<Tuple>, Long> summaries = in
>    .groupBy((key, value) -> ......)
>    .windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1l)))
>    .count();
>
> summaries.toStream().to(sink);
>
> Each record written to the sink gets its timestamp set to the grouping window
> start time, which quite often will be in the past.
>
> What is the logic behind that? Honestly, I expected sink messages to get
> the "now" timestamp.
>
>
> On Mon, Mar 5, 2018 at 11:48 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Sounds great! :)
> >
> > On Mon, Mar 5, 2018 at 12:28 PM, Dmitriy Vsekhvalnov <
> > dvsekhval...@gmail.com
> > > wrote:
> >
> > > Thanks, that's an option, i'll take a look at configuration.
> > >
> > > But yeah, I was thinking the same: if Streams relies on internal topics
> > > using the 'CreateTime' configuration, then it is the Streams library's
> > > responsibility to configure it.
> > >
> > > I can open a Jira ticket :)
> > >
> > > On Mon, Mar 5, 2018 at 11:18 PM, Guozhang Wang <wangg...@gmail.com>
> > wrote:
> > >
> > > > Hello Dmitriy,
> > > >
> > > > In your case, you can override this config to CreateTime for only the
> > > > internal topics created by Streams; this is documented in
> > > >
> > > > https://kafka.apache.org/10/javadoc/org/apache/kafka/streams/StreamsConfig.html#TOPIC_PREFIX
> > > >
> > > >
> > > > We are also discussing always overriding the log.message.timestamp.type
> > > > config for internal topics to CreateTime. I vaguely remember there is a
> > > > JIRA open for it, in case you are interested in contributing to the
> > > > Streams library.
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Mon, Mar 5, 2018 at 10:50 AM, Dmitriy Vsekhvalnov <
> > > > dvsekhval...@gmail.com
> > > > > wrote:
> > > >
> > > > > Which effectively means the given scenario does not work with
> > > > > LogAppendTime, correct? Because all internal repartition topics will
> > > > > always contain "now" instead of the real timestamp from the original
> > > > > payload message?
> > > > >
> > > > > Is kafka-streams designed to work with LogAppendTime at all? It seems
> > > > > a lot of stuff will NOT work correctly using the built-in
> > > > > ExtractRecordMetadataTimestamp?
> > > > >
> > > > > On Mon, Mar 5, 2018 at 9:30 PM, Guozhang Wang <wangg...@gmail.com>
> > > > wrote:
> > > > >
> > > > > > If the broker configures log.message.timestamp.type=LogAppendTime
> > > > > > universally, it will ignore whatever timestamp is set in the message
> > > > > > metadata and override it with the append time. So when the messages
> > > > > > are fetched by downstream processors, which always use the metadata
> > > > > > timestamp extractor, they will get the append timestamp set by the
> > > > > > brokers.
> > > > > >
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > > On Mon, Mar 5, 2018 at 9:53 AM, Dmitriy Vsekhvalnov <
> > > > > > dvsekhval...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Guozhang,
> > > > > > >
> > > > > > > interesting, will the same logic apply (internal topic rewrite)
> > > > > > > for brokers configured with:
> > > > > > >   log.message.timestamp.type=LogAppendTime
> > > > > > >
> > > > > > > ?
> > > > > > >
> > > > > > > On Mon, Mar 5, 2018 at 8:33 PM, Guozhang Wang <
> > wangg...@gmail.com>
> > > > > > wrote:
> > > > > > >
> > > > > > > > Hello Dmitriy,
> > > > > > > >
> > > > > > > > What you have observed is by design, though it may be a bit
> > > > > > > > confusing at first. Let me explain:
> > > > > > > >
> > > > > > > > When you do a group-by aggregation like the one above, during
> > > > > > > > the "groupBy((key, value) -> ......)" stage the Streams library
> > > > > > > > repartitions the data: it sends the original data stream to an
> > > > > > > > internal repartition topic, keyed by the aggregation key defined
> > > > > > > > in the "groupBy" function, and fetches from that topic again.
> > > > > > > > This is similar to a shuffle phase in distributed computing
> > > > > > > > frameworks; it makes sure the downstream aggregations can be
> > > > > > > > done in parallel. When the "groupBy" operator sends the messages
> > > > > > > > to this repartition topic, it sets the timestamp extracted from
> > > > > > > > the payload in the record metadata. Hence, when the downstream
> > > > > > > > aggregation operator reads from this repartition topic, it is OK
> > > > > > > > to always use ExtractRecordMetadataTimestamp to extract that
> > > > > > > > timestamp and use the extracted value to determine which window
> > > > > > > > the record should fall into.
> > > > > > > >
> > > > > > > > More details can be found in this JIRA:
> > > > > > > >
> > > > > > > > https://issues.apache.org/jira/browse/KAFKA-4785
> > > > > > > >
> > > > > > > >
> > > > > > > > So the record timestamp used during aggregation should be the
> > > > > > > > same as the one in the payload. If you observe that this is not
> > > > > > > > the case, it is unexpected. In that case, could you share your
> > > > > > > > complete code snippet, especially how the input stream "in" is
> > > > > > > > defined, and your config properties, for us to investigate?
> > > > > > > >
> > > > > > > > Guozhang
> > > > > > > >
> > > > > > > >
> > > > > > > > On Mon, Mar 5, 2018 at 5:53 AM, Dmitriy Vsekhvalnov <
> > > > > > > > dvsekhval...@gmail.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Good morning,
> > > > > > > > >
> > > > > > > > > We have a simple use case where we want to count the number
> > > > > > > > > of events per hour, grouped by some fields from the event
> > > > > > > > > itself.
> > > > > > > > >
> > > > > > > > > Our event timestamp is embedded in the message itself (JSON),
> > > > > > > > > and we are using a trivial custom timestamp extractor (which
> > > > > > > > > is called and works as expected).
> > > > > > > > >
> > > > > > > > > What we are facing is that the timestamp coming from
> > > > > > > > > ExtractRecordMetadataTimestamp is always used when determining
> > > > > > > > > matching windows for an event inside
> > > > > > > > > KStreamWindowAggregate.process(), and never the value from our
> > > > > > > > > JSON timestamp extractor.
> > > > > > > > >
> > > > > > > > > Effectively, it doesn't work correctly if we test on late
> > > > > > > > > data, e.g. when the timestamp in a message is an hour ago
> > > > > > > > > from now. The topology always calculates the matching hour
> > > > > > > > > bucket (window) using the record timestamp, not the payload.
> > > > > > > > >
> > > > > > > > > Is this expected behaviour? Are we getting windowing wrong?
> > > > > > > > > Are there any settings or other tricks to accommodate our
> > > > > > > > > use case?
> > > > > > > > >
> > > > > > > > > For reference, our setup: brokers, kafka-streams and
> > > > > > > > > kafka-clients are all v1.0.0.
> > > > > > > > > And here is the code:
> > > > > > > > >
> > > > > > > > > KTable<Windowed<Tuple>, Long> summaries = in
> > > > > > > > >    .groupBy((key, value) -> ......)
> > > > > > > > >    .windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1l)))
> > > > > > > > >    .count();
> > > > > > > > >
> > > > > > > > > Thank you.
> > > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > --
> > > > > > > > -- Guozhang
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang
