Hi,
This is the first time that I am using Kafka Streams. I would like to read
from an input topic and write to an output topic. However, I do not see the word
counts when I run the example below. It looks like it does not connect
to Kafka, although I do not see any error. I tried my localhost Kafka as well
as a container in a VM, with the same result.
There are over 200 messages in the input Kafka topic.
Your input is appreciated!
Best regards,
Mina
import org.apache.kafka.common.serialization.*;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import java.util.*;
import java.util.regex.*;
/**
 * Word-count demo for Kafka Streams.
 *
 * <p>Reads text lines from the {@code wordcount-input} topic, splits each line
 * into lower-cased words, keeps a running count per word in the state store
 * named {@code Counts}, and writes every updated count to the
 * {@code wordcount-output} topic as (String word, Long count).
 *
 * <p>Because the output values are written with the Long serde, a consumer
 * reading {@code wordcount-output} needs a Long deserializer for the value to
 * display the counts in readable form.
 */
public class WordCountExample {

    /** Separator between words: one or more non-word characters (Unicode-aware). Compiled once. */
    private static final Pattern WORD_SEPARATOR =
            Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);

    /**
     * Builds the word-count topology and starts it.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
                "wordcount-streaming");
        // Placeholder from the original post: replace with the real broker host.
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
                "<IPADDRESS>:9092");
        // Default serdes, used wherever an operation does not name its own.
        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
                Serdes.String().getClass().getName());
        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
                Serdes.String().getClass().getName());
        // Commit interval in milliseconds (10 seconds).
        // NOTE(review): results may only become visible downstream after a
        // commit, so allow at least this long before expecting output - confirm
        // against the Kafka Streams documentation for the version in use.
        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
                10 * 1000);

        final Serde<String> stringSerde = Serdes.String();
        final Serde<Long> longSerde = Serdes.Long();

        final KStreamBuilder builder = new KStreamBuilder();
        final KStream<String, String> textLines =
                builder.stream(stringSerde, stringSerde, "wordcount-input");

        final KStream<String, Long> wordCounts = textLines
                // One input line fans out into zero or more word records.
                .flatMapValues(value -> splitIntoWords(value))
                // Re-key by the word itself so equal words are counted together.
                .groupBy((key, word) -> word)
                .count("Counts")
                .toStream();

        wordCounts.to(stringSerde, longSerde, "wordcount-output");

        final KafkaStreams streams = new KafkaStreams(builder,
                streamsConfiguration);
        streams.cleanUp();
        // Register the hook before starting so that a shutdown arriving right
        // after start() still closes the streams instance.
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }

    /**
     * Splits a line of text into lower-cased words.
     *
     * <p>Lower-casing uses {@link Locale#ROOT} so the resulting keys do not
     * depend on the JVM's default locale. Empty tokens (produced by
     * {@link Pattern#split} for an empty line or a line that begins with a
     * separator) are dropped so that the empty string is never counted as a word.
     *
     * @param line the record value; may be {@code null}
     * @return the words in order of appearance; empty if {@code line} is
     *         {@code null} or contains no words
     */
    private static List<String> splitIntoWords(final String line) {
        if (line == null) {
            return Collections.emptyList();
        }
        final String[] tokens = WORD_SEPARATOR.split(line.toLowerCase(Locale.ROOT));
        final List<String> words = new ArrayList<>(tokens.length);
        for (final String token : tokens) {
            if (!token.isEmpty()) {
                words.add(token);
            }
        }
        return words;
    }
}