Sounds great! :)

On Mon, Mar 5, 2018 at 12:28 PM, Dmitriy Vsekhvalnov <dvsekhval...@gmail.com> wrote:

> Thanks, that's an option, I'll take a look at the configuration.
>
> But yeah, I was thinking the same: if Streams relies on internal topics
> using the 'CreateTime' configuration, then it is the Streams library's
> responsibility to configure it.
>
> I can open a JIRA ticket :)
>
> On Mon, Mar 5, 2018 at 11:18 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Dmitriy,
> >
> > In your case, you can override this config to CreateTime only for the
> > internal topics created by Streams; this is documented in
> >
> > https://kafka.apache.org/10/javadoc/org/apache/kafka/streams/StreamsConfig.html#TOPIC_PREFIX
> >
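> > For example, something along these lines (a minimal sketch against the
> > 1.0 APIs; the application id and bootstrap servers are placeholders):
> >
> > import java.util.Properties;
> > import org.apache.kafka.streams.StreamsConfig;
> >
> > Properties props = new Properties();
> > props.put(StreamsConfig.APPLICATION_ID_CONFIG, "hourly-counts");     // placeholder
> > props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
> > // topicPrefix() turns a topic-level config into a Streams config that
> > // is applied only to the internal topics Streams itself creates:
> > props.put(StreamsConfig.topicPrefix("message.timestamp.type"), "CreateTime");
> >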
> >
> > We are also discussing always overriding the log.message.timestamp.type
> > config for internal topics to CreateTime; I vaguely remember there is a
> > JIRA open for it, in case you are interested in contributing to the
> > Streams library.
> >
> > Guozhang
> >
> >
> > On Mon, Mar 5, 2018 at 10:50 AM, Dmitriy Vsekhvalnov <dvsekhval...@gmail.com> wrote:
> >
> > > Which effectively means the given scenario is not working with
> > > LogAppendTime, correct? Because all internal re-partition topics will
> > > always contain "now" instead of the real timestamp from the original
> > > payload message?
> > >
> > > Is kafka-streams designed to work with LogAppendTime at all? It seems
> > > a lot of stuff will NOT work correctly using the
> > > built-in ExtractRecordMetadataTimestamp?
> > >
> > > On Mon, Mar 5, 2018 at 9:30 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > >
> > > > If the broker configures log.message.timestamp.type=LogAppendTime
> > > > universally, it will ignore whatever timestamp is set in the message
> > > > metadata and override it with the append time. So when the messages
> > > > are fetched by downstream processors, which always use the metadata
> > > > timestamp extractor, they will get the append timestamp set by the
> > > > brokers.
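> > > >
> > > > You can observe this from any plain consumer, e.g. (a sketch; the
> > > > consumer construction and deserializers are left out):
> > > >
> > > > import org.apache.kafka.clients.consumer.ConsumerRecord;
> > > > import org.apache.kafka.clients.consumer.ConsumerRecords;
> > > >
> > > > // 'consumer' is an already-constructed KafkaConsumer<String, String>
> > > > ConsumerRecords<String, String> records = consumer.poll(100);
> > > > for (ConsumerRecord<String, String> record : records) {
> > > >     // with LogAppendTime on the broker, timestampType() reports
> > > >     // LogAppendTime and timestamp() is the broker's append time,
> > > >     // not the timestamp the producer set
> > > >     System.out.println(record.timestampType() + " @ " + record.timestamp());
> > > > }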
> > > >
> > > >
> > > > Guozhang
> > > >
> > > > On Mon, Mar 5, 2018 at 9:53 AM, Dmitriy Vsekhvalnov <dvsekhval...@gmail.com> wrote:
> > > >
> > > > > Hi Guozhang,
> > > > >
> > > > > interesting, will the same logic apply (internal topic rewrite) for
> > > > > brokers configured with:
> > > > >   log.message.timestamp.type=LogAppendTime
> > > > > ?
> > > > >
> > > > > On Mon, Mar 5, 2018 at 8:33 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > >
> > > > > > Hello Dmitriy,
> > > > > >
> > > > > > What you have observed is by design, and it may be a bit
> > > > > > confusing at first. Let me explain:
> > > > > >
> > > > > > When you do a group-by aggregation like in the above case, during
> > > > > > the "groupBy((key, value) -> ......)" stage the Streams library
> > > > > > will do a re-partitioning: it sends the original data stream into
> > > > > > an internal repartition topic, partitioned on the aggregation key
> > > > > > defined in the "groupBy" function, and fetches from that topic
> > > > > > again. This is similar to a shuffle phase in distributed computing
> > > > > > frameworks and makes sure the downstream aggregations can be done
> > > > > > in parallel. When the "groupBy" operator sends the messages to
> > > > > > this repartition topic, it sets the timestamp extracted from the
> > > > > > payload in the record metadata; hence, when the downstream
> > > > > > aggregation operator reads from this repartition topic, it is OK
> > > > > > for it to always use the ExtractRecordMetadataTimestamp to extract
> > > > > > that timestamp and use the extracted value to determine which
> > > > > > window this record should fall into.
> > > > > >
> > > > > > More details can be found in this JIRA:
> > > > > >
> > > > > > https://issues.apache.org/jira/browse/KAFKA-4785
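> > > > > >
> > > > > > As a side note, the payload extractor only needs to be wired in
> > > > > > for the source topics; a minimal sketch of that configuration
> > > > > > (JsonTimestampExtractor is a placeholder for your own extractor
> > > > > > class, and 'props' is the Properties object passed to KafkaStreams):
> > > > > >
> > > > > > import org.apache.kafka.streams.StreamsConfig;
> > > > > >
> > > > > > // use the payload-based extractor instead of the default
> > > > > > // metadata-timestamp extractor for records read from source topics
> > > > > > props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
> > > > > >           JsonTimestampExtractor.class);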
> > > > > >
> > > > > >
> > > > > > So the record timestamp used during aggregation should be the
> > > > > > same as the one in the payload; if you do observe that this is
> > > > > > not the case, it is unexpected. Could you then share your
> > > > > > complete code snippet, especially how the input stream "in" is
> > > > > > defined, and the config properties you define, for us to
> > > > > > investigate?
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > >
> > > > > > On Mon, Mar 5, 2018 at 5:53 AM, Dmitriy Vsekhvalnov <dvsekhval...@gmail.com> wrote:
> > > > > >
> > > > > > > Good morning,
> > > > > > >
> > > > > > > we have a simple use-case where we want to count the number of
> > > > > > > events per hour, grouped by some fields from the event itself.
> > > > > > >
> > > > > > > Our event timestamp is embedded in the message itself (json) and
> > > > > > > we are using a trivial custom timestamp extractor (which is
> > > > > > > called and works as expected).
> > > > > > >
> > > > > > > What we are facing is that when determining the matching windows
> > > > > > > for an event, inside KStreamWindowAggregate.process(), the
> > > > > > > timestamp coming from ExtractRecordMetadataTimestamp is always
> > > > > > > used, and never the value from our json timestamp extractor.
> > > > > > >
> > > > > > > Effectively it doesn't work correctly if we test on late data,
> > > > > > > e.g. a message with a timestamp an hour before now. The topology
> > > > > > > always calculates the matching hour bucket (window) using the
> > > > > > > record timestamp, not the payload.
> > > > > > >
> > > > > > > Is this expected behaviour? Are we getting windowing wrong? Any
> > > > > > > settings or other tricks to accommodate our use-case?
> > > > > > >
> > > > > > > For reference, our setup: brokers, kafka-streams and
> > > > > > > kafka-clients are all v1.0.0.
> > > > > > > And here is the code:
> > > > > > >
> > > > > > > KTable<Windowed<Tuple>, Long> summaries = in
> > > > > > >    .groupBy((key, value) -> ......)
> > > > > > >    .windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1L)))
> > > > > > >    .count();
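> > > > > > >
> > > > > > > (For illustration, a minimal sketch of the kind of extractor we
> > > > > > > use; the "timestamp" field name and the Jackson parsing here are
> > > > > > > simplifications, not the exact production code:)
> > > > > > >
> > > > > > > import com.fasterxml.jackson.databind.JsonNode;
> > > > > > > import com.fasterxml.jackson.databind.ObjectMapper;
> > > > > > > import org.apache.kafka.clients.consumer.ConsumerRecord;
> > > > > > > import org.apache.kafka.streams.processor.TimestampExtractor;
> > > > > > >
> > > > > > > public class JsonTimestampExtractor implements TimestampExtractor {
> > > > > > >     private static final ObjectMapper MAPPER = new ObjectMapper();
> > > > > > >
> > > > > > >     @Override
> > > > > > >     public long extract(ConsumerRecord<Object, Object> record,
> > > > > > >                         long previousTimestamp) {
> > > > > > >         try {
> > > > > > >             // epoch-millis timestamp embedded in the json payload
> > > > > > >             JsonNode json = MAPPER.readTree(record.value().toString());
> > > > > > >             return json.get("timestamp").asLong();
> > > > > > >         } catch (Exception e) {
> > > > > > >             // fall back for malformed payloads
> > > > > > >             return previousTimestamp;
> > > > > > >         }
> > > > > > >     }
> > > > > > > }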
> > > > > > >
> > > > > > > Thank you.
> > > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang
