Could you share your stack trace upon failure?


On Tue, Jun 21, 2016 at 12:05 AM, Unmesh Joshi <unmeshjo...@gmail.com>
wrote:

> Hi,
>
> I am using confluent-3.0.0-2.11, with Kafka and Kafka Streams versions
> org.apache.kafka:kafka_2.11:0.10.0.0-cp1 and
> org.apache.kafka:kafka-streams:0.10.0.0-cp1 respectively. The problem seems
> to be with null keys: the original messages are not produced with keys,
> and I am creating a key-value pair in the map function before
> aggregating to a KTable. RocksDBWindowStore.putInternal expects the
> timestamp to be appended to the key, which was not the case.
> It somehow corrected itself once I started producing messages with a
> non-null key.
>
> The code is here
>
> https://github.com/unmeshjoshi/kafka-geek-day/blob/master/src/test/java/com/geekday/LogAnalyzerTest.java
>
>
> Thanks,
> Unmesh
>
>
>
>
>
> On Tue, Jun 21, 2016 at 10:12 AM, Guozhang Wang <wangg...@gmail.com>
> wrote:
>
> > Hello Unmesh,
> >
> > The timestamp extractor is always applied at the beginning of the
> > topology for each incoming record, and the extracted timestamp is
> > carried throughout the topology.
> >
> > Could you share your stack trace upon failure, along with your source
> > code? And what version of Kafka Streams are you using? Some older
> > versions of the library require the key (in your case, "String parsedKey =
> > parsedRecord.get("region").toString() + parsedRecord.get("location").
> > toString();") to be non-null, but this has been resolved in recent
> > fixes.
> >
> > Guozhang
> >
> > On Mon, Jun 20, 2016 at 3:41 AM, Unmesh Joshi <unmeshjo...@gmail.com>
> > wrote:
> >
> > > Hi,
> > >
> > > I was trying to experiment with Kafka streams, and had following code
> > >
> > > KTable<Windowed<String>, Integer> aggregated = locationViews
> > >     .map((key, value) -> {
> > >         GenericRecord parsedRecord = parse(value);
> > >         String parsedKey = parsedRecord.get("region").toString()
> > >                 + parsedRecord.get("location").toString();
> > >         return new KeyValue<>(parsedKey, 1);
> > >     })
> > >     .reduceByKey((v1, v2) -> v1 + v2,
> > >         TimeWindows.of("aggregated", 5000),
> > >         Serdes.String(), Serdes.Integer());
> > >
> > > This code fails in RocksDBWindowStore.putInternal, where it tries
> > > to get the timestamp from the message key.
> > >
> > > I am not producing messages with keys, so I tried setting
> > > TIMESTAMP_EXTRACTOR_CLASS_CONFIG.
> > >
> > >
> streamsConfiguration.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
> > > WallclockTimestampExtractor.class.getName());
> > >
> > > But RocksDBWindowStore still expects a timestamp in the message key
> > > and does not use TIMESTAMP_EXTRACTOR_CLASS_CONFIG.
> > >
> > > Is it the case that a Kafka message should always have a key if we
> > > want to use windowing? Or is this an issue with RocksDBWindowStore?
> > >
> > >
> > > Thanks,
> > >
> > > Unmesh
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang
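
For readers of the archive, here is a conceptual sketch of the behavior the thread is describing: a windowed state store keeps each entry under a composite key built from the serialized record key plus the window-start timestamp (and typically a sequence number), which is why a null record key breaks putInternal. The class and method names below (WindowKeySketch, windowKey) and the exact byte layout are illustrative assumptions, not the actual RocksDBWindowStore format; the window-start arithmetic mirrors a tumbling window like TimeWindows.of("aggregated", 5000).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class WindowKeySketch {

    // Start of the tumbling window that a timestamp falls into:
    // start = ts - (ts % windowSize).
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Illustrative composite storage key:
    // [serialized record key][8-byte window start][4-byte sequence number].
    // A null record key cannot be serialized into such a composite,
    // which is the failure mode discussed in the thread.
    static byte[] windowKey(String recordKey, long windowStartMs, int seq) {
        if (recordKey == null) {
            throw new NullPointerException(
                "record key must be non-null for a windowed store");
        }
        byte[] keyBytes = recordKey.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(keyBytes.length + 8 + 4);
        buf.put(keyBytes).putLong(windowStartMs).putInt(seq);
        return buf.array();
    }

    public static void main(String[] args) {
        long ts = 1466487300123L;
        long start = windowStart(ts, 5000L);          // 1466487300000
        byte[] composite = windowKey("us-east1", start, 0);
        System.out.println(start);                    // prints 1466487300000
        System.out.println(composite.length);         // prints 20 (8 + 8 + 4)
    }
}
```

This also illustrates why mapping null-key records to a derived, non-null key (as in the parsedKey construction above) resolves the failure: once the key serializes to bytes, the composite window key can be built.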
