Hi,

I was trying to experiment with Kafka streams, and had following code

KTable<Windowed<String>, Integer> aggregated = locationViews
        .map((key, value) -> {
            GenericRecord parsedRecord = parse(value);
            String parsedKey = parsedRecord.get("region").toString() +
parsedRecord.get("location").toString();
            return new KeyValue<>(parsedKey, 1);
        }).reduceByKey((v1, v2) -> v1 + v2,
TimeWindows.of("aggregated", 5000), Serdes.String(),
Serdes.Integer());

This code fails in  RocksDBWindowStore.putInternal, where its trying
to get timestamp from message key.

I am not producing message with key, so I tried putting
TIMESTAMP_EXTRACTOR_CLASS_CONFIG.

streamsConfiguration.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
WallclockTimestampExtractor.class.getName());

But RocksDBWindowStore, still expects timestamp in the message key and
does not use TIMESTAMP_EXTRACTOR_CLASS_CONFIG.

Is it the case that Kafka message should always have a key if we have
to use Windowing? or this is an issue with RocksDBWindowStore?


Thanks,

Unmesh

Reply via email to