Hi,

I created a simple Spring Boot application with Kafka and added a
dependency on Kafka Streams. The application can send and receive
messages and works fine.

In the same app, I want to use Kafka Streams to calculate statistics about
the topic.

I wrote the following code, based on the word count example:

import java.util.Arrays;
import java.util.Properties;
import java.util.regex.Pattern;

import javax.annotation.PostConstruct;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

@Service
public class StartupService {

    @Value(value = "${kafka.events.topic.name}")
    private String eventsTopicName;

    @Value(value = "${kafka.bootstrap.servers}")
    private String bootstrapAddress;

    @PostConstruct
    public void init() {
        // Streams configuration: String serdes for keys and values by default
        Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
                "localhost:9092");
        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
                Serdes.String().getClass().getName());
        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
                Serdes.String().getClass().getName());
        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
                System.getProperty("java.io.tmpdir") + "\\count");

        // Topology: split each value into words, group by word, count
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> textLines = builder.stream(eventsTopicName);
        Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);

        KTable<String, Long> wordCounts = textLines
            .flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
            .groupBy((key, word) -> word)
            .count();

        Topology topology = builder.build();
        KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration);
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.setUncaughtExceptionHandler((Thread thread, Throwable throwable) -> {
            System.out.println("Exception in thread:" + thread.getId()
                    + ", Message:" + throwable.getMessage());
        });
        streams.cleanUp();
        streams.start();
        try {
            Thread.sleep(20000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        // Check the result here:

    }

}
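
To make clear what result I expect, here is a plain-Java sketch (no Kafka
involved) of what the topology above should compute from the message values:
split each value into lowercase words with the same pattern, then count the
occurrences of each word. The class name WordCountSketch, the method
countWords and the sample input are made up for illustration only.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;

// Hypothetical helper, not part of the application: mirrors the
// flatMapValues -> groupBy -> count steps on an in-memory list.
class WordCountSketch {

    // Same pattern as in the Streams topology
    private static final Pattern PATTERN =
            Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);

    static Map<String, Long> countWords(List<String> values) {
        Map<String, Long> counts = new TreeMap<>();
        for (String value : values) {
            // flatMapValues: one value becomes many words
            for (String word : PATTERN.split(value.toLowerCase())) {
                // groupBy(word) + count(): increment the counter for this word
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Sample message values (assumed, not taken from the real topic)
        List<String> values = Arrays.asList("Hello world", "hello Kafka");
        System.out.println(countWords(values));
    }
}
```

In the Streams version these counts live in the KTable named wordCounts, and
that is the data I would like to read at the "Check the result here" comment.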

How can I check the result of the calculation? The last lines in the log
file are:

2019-05-08 19:19:36.091  INFO 18380 --- [-StreamThread-1]
o.a.k.c.c.internals.AbstractCoordinator  : [Consumer
clientId=test-a2d0314e-c872-469e-b34c-c08e2fdf422a-StreamThread-1-consumer,
groupId=test] (Re-)joining group
2019-05-08 19:19:36.163  INFO 18380 --- [-StreamThread-1]
o.a.k.s.p.i.StreamsPartitionAssignor     : stream-thread
[test-a2d0314e-c872-469e-b34c-c08e2fdf422a-StreamThread-1-consumer]
Assigned tasks to clients as
{a2d0314e-c872-469e-b34c-c08e2fdf422a=[activeTasks: ([0_0, 1_0])
standbyTasks: ([]) assignedTasks: ([0_0, 1_0]) prevActiveTasks: ([])
prevStandbyTasks: ([]) prevAssignedTasks: ([]) capacity: 1]}.
2019-05-08 19:19:36.205  INFO 18380 --- [-StreamThread-1]
o.a.k.c.c.internals.AbstractCoordinator  : [Consumer
clientId=test-a2d0314e-c872-469e-b34c-c08e2fdf422a-StreamThread-1-consumer,
groupId=test] Successfully joined group with generation 21
2019-05-08 19:19:36.209  INFO 18380 --- [-StreamThread-1]
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer
clientId=test-a2d0314e-c872-469e-b34c-c08e2fdf422a-StreamThread-1-consumer,
groupId=test] Setting newly assigned partitions [events-0,
test-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-0]
2019-05-08 19:19:36.209  INFO 18380 --- [-StreamThread-1]
o.a.k.s.p.internals.StreamThread         : stream-thread
[test-a2d0314e-c872-469e-b34c-c08e2fdf422a-StreamThread-1] State transition
from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
2019-05-08 19:19:36.234  INFO 18380 --- [-StreamThread-1]
o.a.k.s.p.internals.StreamThread         : stream-thread
[test-a2d0314e-c872-469e-b34c-c08e2fdf422a-StreamThread-1] partition
assignment took 25 ms.
current active tasks: [0_0, 1_0]
current standby tasks: []
previous active tasks: []

2019-05-08 19:19:36.635  INFO 18380 --- [-StreamThread-1]
org.apache.kafka.clients.Metadata        : Cluster ID:
I6GfqZSORWGKqPHE7zA_cQ
2019-05-08 19:19:36.659  INFO 18380 --- [-StreamThread-1]
o.a.k.s.p.i.StoreChangelogReader         : stream-thread
[test-a2d0314e-c872-469e-b34c-c08e2fdf422a-StreamThread-1] Restoring task
1_0's state store KSTREAM-AGGREGATE-STATE-STORE-0000000003 from beginning
of the changelog test-KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog-0
2019-05-08 19:19:36.668  INFO 18380 --- [-StreamThread-1]
o.a.k.c.consumer.internals.Fetcher       : [Consumer
clientId=test-a2d0314e-c872-469e-b34c-c08e2fdf422a-StreamThread-1-restore-consumer,
groupId=] Resetting offset for partition
test-KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog-0 to offset 0.
2019-05-08 19:19:36.852  INFO 18380 --- [-StreamThread-1]
o.a.k.s.p.internals.StreamThread         : stream-thread
[test-a2d0314e-c872-469e-b34c-c08e2fdf422a-StreamThread-1] State transition
from PARTITIONS_ASSIGNED to RUNNING
2019-05-08 19:19:36.853  INFO 18380 --- [-StreamThread-1]
org.apache.kafka.streams.KafkaStreams    : stream-client
[test-a2d0314e-c872-469e-b34c-c08e2fdf422a] State transition from
REBALANCING to RUNNING

It stays in the RUNNING state and doesn't move forward. What am I doing
wrong?



*Pavel Molchanov*
