Thanks, Vito, that worked!
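
For anyone who finds this thread later, here is my understanding of why the values looked blank. The stream writes the counts with Serdes.Long(), and the console consumer decodes values as strings unless told otherwise. The sketch below uses only the JDK and assumes the Long serializer writes 8 big-endian bytes; the class and method names (LongAsStringDemo, encodeLong, and so on) are made up for illustration and are not part of Kafka.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class LongAsStringDemo {

    // Assumption: mimics a Long serializer that writes 8 bytes, big-endian.
    static byte[] encodeLong(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    // What a string-based reader would do with those same bytes.
    static String decodeAsString(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // What a Long-based reader would do with those same bytes.
    static long decodeAsLong(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }

    public static void main(String[] args) {
        byte[] payload = encodeLong(3L);

        // Small counts decode to control characters, which a terminal shows as nothing.
        String asText = decodeAsString(payload);
        boolean onlyControlChars = asText.chars().allMatch(Character::isISOControl);
        System.out.println("read as string, only control characters: " + onlyControlChars);

        // Reading the same bytes as a long recovers the count.
        System.out.println("read as long: " + decodeAsLong(payload));
    }
}
```

That would explain why the consumer printed the right number of lines but no visible text until value.deserializer was set to LongDeserializer.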
On Sun, Sep 17, 2017 at 9:02 PM, 鄭紹志 <v...@is-land.com.tw> wrote:

> Hi, Karan,
>
> It looks like you need to add a 'value.deserializer' property to
> kafka-console-consumer.sh.
>
> For example:
> $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kstreams4 --from-beginning \
>     --property print.key=true \
>     --property print.value=true \
>     --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
>     --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
>
> Vito
>
> ----------
> 鄭紹志 Vito Jeng
> 亦思科技股份有限公司, Research and Development Division
> TEL: 03-5630345 Ext.16
>
> On Mon, Sep 18, 2017 at 10:58 AM, karan alang <karan.al...@gmail.com>
> wrote:
>
> > Hello All,
> >
> > I have a basic word count Kafka Streams program that reads data from the
> > input topic, splits it (per the separator), and writes the result to the
> > output topic.
> >
> > A console producer puts data into the input topic (kstreams3), and a
> > console consumer reads the (split) data from the output topic (kstreams4).
> >
> > The code seems to be working fine, but the console consumer does not show
> > the text. It seems to print "blanks" instead of the actual text (though
> > it does seem to consume the correct number of lines).
> >
> > Any pointers on what the issue might be?
> >
> > Code ->
> >
> > > Properties props = new Properties();
> > > props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streamswordcount-application");
> > > props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> > > props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
> > > props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
> > > props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
> > >
> > > KStreamBuilder builder = new KStreamBuilder();
> > > KStream<String, String> textLines = builder.stream("kstreams3");
> > >
> > > KTable<String, Long> wordCounts = textLines
> > >     .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
> > >     .groupBy((key, word) -> word)
> > >     .count("Counts");
> > >
> > > System.out.println(" Kstreams - wordCount " + wordCounts);
> > >
> > > wordCounts.to(Serdes.String(), Serdes.Long(), "kstreams4");
> > >
> > > KafkaStreams streams = new KafkaStreams(builder, props);
> > > streams.start();
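
As a rough illustration of what the quoted topology computes, the same split-and-count logic can be sketched with plain JDK streams and no Kafka at all. The class name WordCountSketch and the method countWords are hypothetical, and the filter that drops empty tokens is an addition that the quoted code does not have. The point is only that each word maps to a Long count, which is the value type written to kstreams4.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class WordCountSketch {

    // Lower-cases each line, splits on non-word characters, and counts each word.
    static Map<String, Long> countWords(String... lines) {
        return Arrays.stream(lines)
                .flatMap(line -> Arrays.stream(line.toLowerCase().split("\\W+")))
                // Addition not in the quoted code: skip the empty token that
                // split() yields when a line starts with a separator.
                .filter(word -> !word.isEmpty())
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
    }

    public static void main(String[] args) {
        Map<String, Long> counts = countWords("Hello world", "hello Kafka");
        // Keys are words (String), values are counts (Long).
        counts.forEach((word, count) -> System.out.println(word + " -> " + count));
    }
}
```

Unlike this batch sketch, the streams application emits an updated count every time a word arrives, so the output topic can hold several records for the same word.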