Hi, I created a simple Spring Boot application with Kafka and added a dependency on Kafka Streams. The application can send and receive messages and works fine.
In the same app, I want to use Kafka Streams to calculate statistics about the topic. I adapted the following code from the word count example:

    @Service
    public class StartupService {

        @Value(value = "${kafka.events.topic.name}")
        private String eventsTopicName;

        @Value(value = "${kafka.bootstrap.servers}")
        private String bootstrapAddress;

        @PostConstruct
        public void init() {
            Properties streamsConfiguration = new Properties();
            streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
            streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, System.getProperty("java.io.tmpdir")+"\\count");

            StreamsBuilder builder = new StreamsBuilder();
            KStream<String, String> textLines = builder.stream(eventsTopicName);
            Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);
            KTable<String, Long> wordCounts = textLines
                    .flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
                    .groupBy((key, word) -> word)
                    .count();

            Topology topology = builder.build();
            KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration);
            Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
            streams.setUncaughtExceptionHandler((Thread thread, Throwable throwable) -> {
                System.out.println("Exception in thread:" + thread.getId() + ", Message:" + throwable.getMessage());
            });
            streams.cleanUp();
            streams.start();

            try {
                Thread.sleep(20000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

            // Check the result here:
        }
    }

How can I check the result of the calculation?
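To make clear what result I expect to see, here is a minimal plain-Java sketch of the same tokenize-and-count logic with no Kafka involved. The class name `WordCountSketch`, the method `countWords`, and the sample input are made up for illustration. It only mirrors the `flatMapValues` / `groupBy` / `count` steps above on an in-memory list; it is not how Kafka Streams computes or exposes the counts.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical stand-in for the topology: counts words in an in-memory list.
class WordCountSketch {

    // Same pattern as in the Streams code: split on runs of non-word characters.
    private static final Pattern PATTERN =
            Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);

    // Lowercase each line, split it into words, then count occurrences per word.
    static Map<String, Long> countWords(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(PATTERN.split(line.toLowerCase())))
                .collect(Collectors.groupingBy(
                        word -> word,            // group by the word itself
                        TreeMap::new,            // sorted keys for stable printing
                        Collectors.counting())); // count per group
    }

    public static void main(String[] args) {
        // Sample messages are an assumption; the real input is the events topic.
        List<String> sample = Arrays.asList("Hello world", "hello Kafka streams");
        countWords(sample).forEach((word, count) ->
                System.out.println(word + " -> " + count));
    }
}
```

For the two sample messages, this yields a count of 2 for `hello` and 1 each for `kafka`, `streams`, and `world`. That word-to-count mapping is what I would like to read back from the `wordCounts` table.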
The last lines in the log file are:

    2019-05-08 19:19:36.091 INFO 18380 --- [-StreamThread-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=test-a2d0314e-c872-469e-b34c-c08e2fdf422a-StreamThread-1-consumer, groupId=test] (Re-)joining group
    2019-05-08 19:19:36.163 INFO 18380 --- [-StreamThread-1] o.a.k.s.p.i.StreamsPartitionAssignor : stream-thread [test-a2d0314e-c872-469e-b34c-c08e2fdf422a-StreamThread-1-consumer] Assigned tasks to clients as {a2d0314e-c872-469e-b34c-c08e2fdf422a=[activeTasks: ([0_0, 1_0]) standbyTasks: ([]) assignedTasks: ([0_0, 1_0]) prevActiveTasks: ([]) prevStandbyTasks: ([]) prevAssignedTasks: ([]) capacity: 1]}.
    2019-05-08 19:19:36.205 INFO 18380 --- [-StreamThread-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=test-a2d0314e-c872-469e-b34c-c08e2fdf422a-StreamThread-1-consumer, groupId=test] Successfully joined group with generation 21
    2019-05-08 19:19:36.209 INFO 18380 --- [-StreamThread-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=test-a2d0314e-c872-469e-b34c-c08e2fdf422a-StreamThread-1-consumer, groupId=test] Setting newly assigned partitions [events-0, test-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-0]
    2019-05-08 19:19:36.209 INFO 18380 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [test-a2d0314e-c872-469e-b34c-c08e2fdf422a-StreamThread-1] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
    2019-05-08 19:19:36.234 INFO 18380 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [test-a2d0314e-c872-469e-b34c-c08e2fdf422a-StreamThread-1] partition assignment took 25 ms.
        current active tasks: [0_0, 1_0]
        current standby tasks: []
        previous active tasks: []
    2019-05-08 19:19:36.635 INFO 18380 --- [-StreamThread-1] org.apache.kafka.clients.Metadata : Cluster ID: I6GfqZSORWGKqPHE7zA_cQ
    2019-05-08 19:19:36.659 INFO 18380 --- [-StreamThread-1] o.a.k.s.p.i.StoreChangelogReader : stream-thread [test-a2d0314e-c872-469e-b34c-c08e2fdf422a-StreamThread-1] Restoring task 1_0's state store KSTREAM-AGGREGATE-STATE-STORE-0000000003 from beginning of the changelog test-KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog-0
    2019-05-08 19:19:36.668 INFO 18380 --- [-StreamThread-1] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=test-a2d0314e-c872-469e-b34c-c08e2fdf422a-StreamThread-1-restore-consumer, groupId=] Resetting offset for partition test-KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog-0 to offset 0.
    2019-05-08 19:19:36.852 INFO 18380 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [test-a2d0314e-c872-469e-b34c-c08e2fdf422a-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
    2019-05-08 19:19:36.853 INFO 18380 --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams : stream-client [test-a2d0314e-c872-469e-b34c-c08e2fdf422a] State transition from REBALANCING to RUNNING

It stays in the RUNNING state and doesn't move forward. What am I doing wrong?

*Pavel Molchanov*