This seems to be the same question as "Trying to use Kafka Stream" ?
On 3/14/17 9:05 AM, Mina Aslani wrote: > Hi, > I am using below code to read from a topic and count words and write to > another topic. The example is the one in github. > My kafka container is in the VM. I do not get any error but I do not see > any result/output in my output ordCount-output topic either. The program > also does not stop either! > > Any idea? > > Best regards, > Mina > > Properties props = new Properties(); > props.put(StreamsConfig.APPLICATION_ID_CONFIG, "cloudtrail-events-streaming"); > props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.99.102:29092"); > props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, > Serdes.String().getClass().getName()); > props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, > Serdes.String().getClass().getName()); > > props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10); > props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); > > // setting offset reset to earliest so that we can re-run the demo > code with the same pre-loaded data > // Note: To re-run the demo, you need to use the offset reset tool: > // > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool > props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); > > KStreamBuilder builder = new KStreamBuilder(); > > KStream<String, String> source = builder.stream("wordCount-input"); > > KTable<String, Long> counts = source > .flatMapValues(new ValueMapper<String, Iterable<String>>() { > @Override > public Iterable<String> apply(String value) { > return > Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")); > } > }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() { > @Override > public KeyValue<String, String> apply(String key, String value) { > return new KeyValue<>(value, value); > } > }) > .groupByKey() > .count("Counts"); > > // need to override value serde to Long type > counts.to(Serdes.String(), Serdes.Long(), "wordCount-output"); > > LOGGER.info("counts:::::::::" + counts); > > KafkaStreams streams = new KafkaStreams(builder, props); > > streams.cleanUp(); > streams.start(); > > // usually the stream application would be running forever, > // in this example we just let it run for some time and stop since the > input data is finite. > Thread.sleep(5000L); > streams.close(); >
signature.asc
Description: OpenPGP digital signature