Hi,

I was experimenting with Kafka Streams and had the following code:

    KTable<Windowed<String>, Integer> aggregated = locationViews
        .map((key, value) -> {
            GenericRecord parsedRecord = parse(value);
            String parsedKey = parsedRecord.get("region").toString()
                    + parsedRecord.get("location").toString();
            return new KeyValue<>(parsedKey, 1);
        })
        .reduceByKey((v1, v2) -> v1 + v2,
                TimeWindows.of("aggregated", 5000),
                Serdes.String(), Serdes.Integer());

This code fails in RocksDBWindowStore.putInternal, where it is trying to get the timestamp from the message key. I am not producing messages with a key, so I tried setting TIMESTAMP_EXTRACTOR_CLASS_CONFIG:

    streamsConfiguration.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
            WallclockTimestampExtractor.class.getName());

But RocksDBWindowStore still expects the timestamp in the message key and does not use TIMESTAMP_EXTRACTOR_CLASS_CONFIG.

Is it the case that a Kafka message must always have a key if we want to use windowing? Or is this an issue with RocksDBWindowStore?

Thanks,
Unmesh