Could you share your stack trace upon failure?
On Tue, Jun 21, 2016 at 12:05 AM, Unmesh Joshi <unmeshjo...@gmail.com> wrote: > HI, > > I am using confluent-3.0.0-2.11, with kafka and streams versions > org.apache.kafka:kafka_2.11:0.10.0.0-cp1 and > org.apache.kafka:kafka-streams:0.10.0.0-cp1 respectively. The problem seems > to be with null keys because the original messages are not produced with > keys, and I am creating a key value pair in the map function before > aggregating to KTable. The RocksDBWindowStore putInternal is expecting the > timestamp to be appended to the Key, which was not the case. > It somehow corrected itself, once I started producing messages with some > non null key. > > The code is here > > https://github.com/unmeshjoshi/kafka-geek-day/blob/master/src/test/java/com/geekday/LogAnalyzerTest.java > > > Thanks, > Unmesh > > > > > > On Tue, Jun 21, 2016 at 10:12 AM, Guozhang Wang <wangg...@gmail.com> > wrote: > > > Hello Unmesh, > > > > Timestamp extractor is always applied at the beginning of the topology > for > > each incoming record, and the extracted timestamp is carried throughout > the > > topology. > > > > Could you share your stack trace upon failure with your source code? And > > what version of Kafka Streams are you using? Some old version of the > > library requires the key (as for your case, "String parsedKey = > > parsedRecord.get("region").toString() + parsedRecord.get("location"). > > toString();") to be not null, but it has been resolved in the recent > fixes. > > > > Guozhang > > > > On Mon, Jun 20, 2016 at 3:41 AM, Unmesh Joshi <unmeshjo...@gmail.com> > > wrote: > > > > > Hi, > > > > > > I was trying to experiment with Kafka streams, and had following code > > > > > > KTable<Windowed<String>, Integer> aggregated = locationViews > > > .map((key, value) -> { > > > GenericRecord parsedRecord = parse(value); > > > String parsedKey = parsedRecord.get("region").toString() + > > > parsedRecord.get("location").toString(); > > > return new KeyValue<>(parsedKey, 1); > > > }).reduceByKey((v1, v2) -> v1 + v2, > > > TimeWindows.of("aggregated", 5000), Serdes.String(), > > > Serdes.Integer()); > > > > > > This code fails in RocksDBWindowStore.putInternal, where its trying > > > to get timestamp from message key. > > > > > > I am not producing message with key, so I tried putting > > > TIMESTAMP_EXTRACTOR_CLASS_CONFIG. > > > > > > > streamsConfiguration.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, > > > WallclockTimestampExtractor.class.getName()); > > > > > > But RocksDBWindowStore, still expects timestamp in the message key and > > > does not use TIMESTAMP_EXTRACTOR_CLASS_CONFIG. > > > > > > Is it the case that Kafka message should always have a key if we have > > > to use Windowing? or this is an issue with RocksDBWindowStore? > > > > > > > > > Thanks, > > > > > > Unmesh > > > > > > > > > > > -- > > -- Guozhang > > > -- -- Guozhang