This seems to be the same question as "Trying to use Kafka Stream" ?



On 3/14/17 9:05 AM, Mina Aslani wrote:
> Hi,
> I am using below code to read from a topic and count words and write to
> another topic. The example is the one in github.
> My kafka container is in the VM. I do not get any error but I do not see
> any result/output in my output ordCount-output topic either. The program
> also does not stop either!
> 
> Any idea?
> 
> Best regards,
> Mina
> 
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "cloudtrail-events-streaming");
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.99.102:29092");
> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> Serdes.String().getClass().getName());
> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> Serdes.String().getClass().getName());
> 
> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);
> props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
> 
> // setting offset reset to earliest so that we can re-run the demo
> code with the same pre-loaded data
> // Note: To re-run the demo, you need to use the offset reset tool:
> // 
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> 
> KStreamBuilder builder = new KStreamBuilder();
> 
> KStream<String, String> source = builder.stream("wordCount-input");
> 
> KTable<String, Long> counts = source
>       .flatMapValues(new ValueMapper<String, Iterable<String>>() {
>          @Override
>          public Iterable<String> apply(String value) {
>             return
> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
>          }
>       }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
>          @Override
>          public KeyValue<String, String> apply(String key, String value) {
>             return new KeyValue<>(value, value);
>          }
>       })
>       .groupByKey()
>       .count("Counts");
> 
> // need to override value serde to Long type
> counts.to(Serdes.String(), Serdes.Long(), "wordCount-output");
> 
> LOGGER.info("counts:::::::::" + counts);
> 
> KafkaStreams streams = new KafkaStreams(builder, props);
> 
> streams.cleanUp();
> streams.start();
> 
> // usually the stream application would be running forever,
> // in this example we just let it run for some time and stop since the
> input data is finite.
> Thread.sleep(5000L);
> streams.close();
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to