Dmitry Vsekhvalnov created KAFKA-6614:
-----------------------------------------
             Summary: kafka-streams to configure internal topics message.timestamp.type=CreateTime
                 Key: KAFKA-6614
                 URL: https://issues.apache.org/jira/browse/KAFKA-6614
             Project: Kafka
          Issue Type: Improvement
          Components: streams
            Reporter: Dmitry Vsekhvalnov

After fixing KAFKA-4785, all internal topics use the built-in *RecordMetadataTimestampExtractor* to read timestamps. This does not work correctly out of the box when a custom message timestamp extractor is used against brokers configured with *log.message.timestamp.type=LogAppendTime*.

Example use-case: windowed grouping + aggregation on late data:

{code:java}
KTable<Windowed<Tuple>, Long> summaries = in
    .groupBy((key, value) -> ......)
    .windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1L)))
    .count();
{code}

When processing late events:

# the custom timestamp extractor picks up a timestamp in the past from the message (say, an hour ago)
# the re-partition topic created during the grouping phase is written back to Kafka using the timestamp from (1)
# the broker ignores the timestamp provided in (2) in favor of ingestion time
# the streams library reads the re-partitioned topic back with RecordMetadataTimestampExtractor
# and therefore gets the ingestion timestamp from (3), which is usually close to "now"
# the window start/end is incorrectly set based on "now" instead of the original timestamp from the payload

I understand there are ways to configure the timestamp type per topic on the brokers to solve this, but it would be really nice if the kafka-streams library took care of it itself, following the "least-surprise" principle: if the library relies on timestamp.type for a topic it manages, it should enforce it.

CC [~guozhang], based on a user group email discussion.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
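For reference, the per-topic broker-side workaround mentioned above can be sketched with the stock kafka-configs.sh tool, which sets the topic-level message.timestamp.type override so the broker keeps the producer-supplied (CreateTime) timestamp on the repartition topic. The topic name below is hypothetical; the actual internal repartition topic follows the <application.id>-...-repartition naming generated by Kafka Streams, and the ZooKeeper address is an assumption for a local broker:

{code}
# Override the timestamp type for one internal topic only
# (topic name and ZooKeeper address are placeholders)
bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
  --entity-type topics \
  --entity-name my-app-KSTREAM-AGGREGATE-STATE-STORE-0000000001-repartition \
  --add-config message.timestamp.type=CreateTime
{code}

This has to be repeated for every internal topic the application creates, which is exactly the operational burden this ticket asks the streams library to remove.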