Hello,

I have implemented a basic application which uses Kafka Streams stores and interactive queries, available here: https://github.com/pchalcol/kstreams-healthcheck
The healthcheck implementation is based on the Kafka Streams metadata and the stream state, as illustrated below:

```
String healthcheck() {
    // Metadata for every instance of this application that hosts active stores
    Collection<StreamsMetadata> stores = streams.allMetadata();
    long storescount = stores.stream()
            .filter(meta -> meta.host().contains("localhost") && meta.port() == 4567)
            .count();
    State state = streams.state();
    System.out.println(String.format("Application State: (%d, %s)", storescount, state.toString()));
    // KO if the local instance hosts no store or is not in a running state
    if (storescount == 0 || !state.isRunning()) return "KO";
    return "OK";
}
```

I have created the topics with 4 partitions:

`kafka-topics --create --topic events --zookeeper localhost:2181 --partitions 4 --replication-factor 1`
`kafka-topics --create --topic library --zookeeper localhost:2181 --partitions 4 --replication-factor 1`

I expected the healthcheck to return an error whenever the broker is shut down, which is not the case. When I check the application status with

`curl -XGET http://localhost:4567/healthcheck`

the server always returns a SUCCESS response, even when the Kafka cluster is down. You will find below the different test cases I have run.

1/ The stream state does not change after shutting down the Kafka cluster

- start kafka: `cd docker && docker-compose up -d`
- start the producer: `sbt "runMain com.example.streams.Producer"`
- start the streams application and http server
- healthcheck: `curl -XGET http://localhost:4567/healthcheck` => response = `{"status": "SUCCESS"}`
- shutdown kafka: `docker-compose stop`
- healthcheck: `curl -XGET http://localhost:4567/healthcheck` => response = `{"status": "SUCCESS"}`, while the expected one is `{"status": "ERROR"}`

2/ Sometimes I also encounter the following behaviour: no data seems to be available when querying the stores

- start kafka
- start the producer
- start the streams application and http server
- request data: `curl -XGET http://localhost:4567/titles`. This HTTP request calls a service which in turn queries the key-value store (a sketch of that lookup follows these steps). => received response:

```
{
  "data": [
    { "key": 1, "value": "Fresh Fruit For Rotting Vegetables" },
    ...
    { "key": 10, "value": "Fear Of A Black Planet" }
  ],
  "status": "SUCCESS"
}
```

- request data: `curl -XGET http://localhost:4567/titles/counts` => received response:

```
{
  "data": [
    { "key": "fear of a black planet", "value": 414 },
    ...
    { "key": "curtain call - the hits", "value": 431 }
  ],
  "status": "SUCCESS"
}
```

- shutdown kafka
- request data: `curl -XGET http://localhost:4567/titles` => same response as before, which seems to be OK since we are querying the local store:

```
{
  "data": [
    { "key": 1, "value": "Fresh Fruit For Rotting Vegetables" },
    ...
    { "key": 10, "value": "Fear Of A Black Planet" }
  ],
  "status": "SUCCESS"
}
```

- request data: `curl -XGET http://localhost:4567/titles/counts` => received response, still understandable:

```
{
  "data": [
    { "key": "fear of a black planet", "value": 414 },
    ...
    { "key": "curtain call - the hits", "value": 431 }
  ],
  "status": "SUCCESS"
}
```

- restart kafka
- request data: `curl -XGET http://localhost:4567/titles` => received response:

```
{
  "data": [],
  "status": "SUCCESS"
}
```

- request data: `curl -XGET http://localhost:4567/titles/counts` => same here, received response:

```
{
  "data": [],
  "status": "SUCCESS"
}
```
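For reference, the /titles service reads the local store through the interactive queries API, roughly as sketched below (a simplified sketch, not the exact code from the repository; `titles-store` is a placeholder for the real store name):

```
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

import java.util.ArrayList;
import java.util.List;

// Reads every entry from the local key-value store backing /titles.
// Note that streams.store() throws InvalidStateStoreException when the store
// is unavailable on this instance, e.g. while a rebalance is in progress.
static List<KeyValue<Integer, String>> allTitles(KafkaStreams streams) {
    ReadOnlyKeyValueStore<Integer, String> store =
            streams.store("titles-store", QueryableStoreTypes.<Integer, String>keyValueStore());
    List<KeyValue<Integer, String>> titles = new ArrayList<>();
    try (KeyValueIterator<Integer, String> iterator = store.all()) {
        while (iterator.hasNext()) titles.add(iterator.next());
    }
    return titles;
}
```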
{ "key": "curtain call - the hits", "value": 431 } ], "status": "SUCCESS" } ``` - restart kafka - Request data : curl -XGET http://localhost:4567/titles => received response ``` { "data": [], "status": "SUCCESS" } ``` - Request data : curl -XGET http://localhost:4567/titles/counts => same here, received response ``` { "data": [], "status": "SUCCESS" } ``` I also see this entry in the Streams application logs ```[error] (kafka-streams-test-bbc3ca50-57b7-434b-a55b-48ca855a7758-StreamThread-1) org.apache.kafka.streams.errors.StreamsException: task [1_0] exception caught when producing org.apache.kafka.streams.errors.StreamsException: task [1_0] exception caught when producing at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:136) at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:144) at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:283) at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:264) at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187) at org.apache.kafka.streams.processor.internals.StreamTask.commitImpl(StreamTask.java:259) at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:253) at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:815) at org.apache.kafka.streams.processor.internals.StreamThread.access$2800(StreamThread.java:73) at org.apache.kafka.streams.processor.internals.StreamThread$2.apply(StreamThread.java:797) at org.apache.kafka.streams.processor.internals.StreamThread.performOnStreamTasks(StreamThread.java:1448) at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:789) at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:778) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:567) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527) Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for kafka-streams-test-events-counts-repartition-1: 30046 ms has passed since batch creation plus linger time [trace] Stack trace suppressed: run last compile:runMain for the full output. the state store, events-counts, may have migrated to another instance.``` Even if a rebalance has occurred after having restarted my cluster, as I have only one consumer, I thought it should still see all partitions, so the store should remain available. What am I missing here ? Thank you for your answers. -- Regards, Patrice