Dmitry Vsekhvalnov created KAFKA-6614:
-----------------------------------------

             Summary: kafka-streams to configure internal topics 
message.timestamp.type=CreateTime
                 Key: KAFKA-6614
                 URL: https://issues.apache.org/jira/browse/KAFKA-6614
             Project: Kafka
          Issue Type: Improvement
          Components: streams
            Reporter: Dmitry Vsekhvalnov


After fixing KAFKA-4785, all internal topics use the built-in 
*RecordMetadataTimestampExtractor* to read timestamps.

This doesn't work correctly out of the box when kafka brokers are configured 
with *log.message.timestamp.type=LogAppendTime* and a custom message 
timestamp extractor is used.

Example use-case: windowed grouping + aggregation on late data:

{code:java}
KTable<Windowed<Tuple>, Long> summaries = in
   .groupBy((key, value) -> ......)
   .windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1L)))
   .count();
{code}
when processing late events:
 # the custom timestamp extractor picks up a timestamp in the past from the 
message payload (say, an hour ago)
 # the re-partition topic written during the grouping phase is produced back to 
kafka with the timestamp from (1)
 # the kafka broker ignores the timestamp provided in (2) in favor of ingestion 
time
 # the streams library reads the re-partitioned topic back with 
RecordMetadataTimestampExtractor
 # and gets the ingestion timestamp from (3), which is usually close to "now"
 # window start/end are incorrectly set based on "now" instead of the original 
timestamp from the payload
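To illustrate why the last step goes wrong: tumbling windows in kafka-streams are epoch-aligned, so the window a record lands in is derived directly from whichever timestamp the extractor returns. A minimal standalone sketch (no Streams API; class and variable names here are illustrative, not from the library):

```java
// Sketch: how a tumbling window start is derived from a record timestamp,
// mirroring the epoch-aligned TimeWindows behavior.
public class WindowAssignment {
    static long windowStart(long timestampMs, long windowSizeMs) {
        // tumbling windows are aligned to the epoch
        return timestampMs - (timestampMs % windowSizeMs);
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        long now = 10 * hour + 123;   // hypothetical broker ingestion time
        long eventTime = now - hour;  // timestamp the custom extractor found in the payload

        // With CreateTime preserved, the record falls into the hour-ago window:
        System.out.println(windowStart(eventTime, hour)); // 9 * hour
        // With LogAppendTime overwriting it, the record lands in the "now" window:
        System.out.println(windowStart(now, hour));       // 10 * hour
    }
}
```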

I understand there are ways to configure the per-topic timestamp type on kafka 
brokers to solve this, but it would be really nice if the kafka-streams library 
could take care of it itself.

This follows the "least-surprise" principle: if the library relies on the 
timestamp.type of a topic it manages, it should enforce it.
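For reference, the per-topic workaround can be applied with the kafka-configs tool; the repartition topic name below is hypothetical and must be replaced with the one your application actually creates:

```shell
# Override the broker-wide LogAppendTime for one internal topic.
# "my-app-...-repartition" is a hypothetical repartition topic name.
bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
  --entity-type topics \
  --entity-name my-app-KSTREAM-AGGREGATE-STATE-STORE-0000000001-repartition \
  --add-config message.timestamp.type=CreateTime
```

This has to be repeated for every internal topic the application creates, which is part of why doing it inside the library would be nicer.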

CC [~guozhang] based on user group email discussion.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
