Patrice,

Thanks for reporting this. I'll have a look at what you've posted on GitHub.
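One suggestion in the meantime: the healthcheck polls streams.state(), but you can also register a state listener before calling start(), so that every transition (into REBALANCING, or into NOT_RUNNING when the stream threads die) is pushed to you instead of polled. A minimal sketch against the KafkaStreams API; the `lastState` field is a name I made up for the example:

```java
import java.util.concurrent.atomic.AtomicReference;

import org.apache.kafka.streams.KafkaStreams;

// Keep the most recent state around for the healthcheck to inspect;
// 'lastState' is a placeholder name for this sketch.
final AtomicReference<KafkaStreams.State> lastState =
        new AtomicReference<>(KafkaStreams.State.CREATED);

// Must be registered before streams.start().
streams.setStateListener((newState, oldState) -> {
    System.out.println(String.format("State change: %s -> %s", oldState, newState));
    lastState.set(newState);
});
streams.start();
```

Note that, as far as I can tell, a broker outage alone does not trigger a transition, which is what you are seeing; more on that below the first test case.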
Thanks,
Bill

On Wed, Nov 29, 2017 at 7:04 AM, Patrice Chalcol <pchal...@gmail.com> wrote:
> Hello,
>
> I have implemented a basic application which uses Kafka Streams stores and
> interactive queries, available here:
> https://github.com/pchalcol/kstreams-healthcheck
>
> The healthcheck implementation is based on the Kafka Streams metadata and
> the stream state, as illustrated below:
> ```
> String healthcheck() {
>     Collection<StreamsMetadata> stores = streams.allMetadata();
>     long storescount = stores.stream()
>         .filter(meta -> meta.host().contains("localhost") && meta.port() == 4567)
>         .count();
>
>     State state = streams.state();
>
>     System.out.println(String.format("Application State: (%d, %s)",
>         storescount, state.toString()));
>
>     // KO if the current node is down or the app is not in a running state
>     if (storescount == 0 || !state.isRunning()) return "KO";
>     return "OK";
> }
> ```
>
> I have created the topics with 4 partitions:
> `kafka-topics --create --topic events --zookeeper localhost:2181 --partitions 4 --replication-factor 1`
> `kafka-topics --create --topic library --zookeeper localhost:2181 --partitions 4 --replication-factor 1`
>
> What I expected was the healthcheck returning an error whenever the broker
> is shut down, which is not the case.
>
> When I check the application status with
> `curl -XGET http://localhost:4567/healthcheck`
> the server always returns a SUCCESS response, even if the Kafka cluster is
> down.
>
> You will find below the different test cases I have run.
>
> 1/ The stream state is not changed after shutting down the Kafka cluster
> - start kafka:
>   `cd docker && docker-compose up -d`
> - start the producer:
>   `sbt runMain com.example.streams.Producer`
> - start the streams application and http server:
>   `sbt runMain com.example.streams.Producer`
> - healthcheck:
>   `curl -XGET http://localhost:4567/healthcheck`
>   => response = {"status": "SUCCESS"}
> - shut down kafka: `docker-compose stop`
> - healthcheck:
>   `curl -XGET http://localhost:4567/healthcheck`
>   => response = {"status": "SUCCESS"}, while the expected one is
>   {"status": "ERROR"}
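This first result matches my understanding of the client: as far as I know, KafkaStreams#state() does not transition away from RUNNING merely because the brokers are unreachable, since the stream threads keep retrying internally. If you want the healthcheck to react to broker loss directly, one option is to probe the cluster with the AdminClient alongside your metadata check. A minimal sketch, assuming a `localhost:9092` bootstrap address and an arbitrary 5-second timeout:

```java
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class BrokerProbe {

    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder bootstrap address; use your cluster's address.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            System.out.println(brokersReachable(admin) ? "OK" : "KO");
        }
    }

    // Returns false if the cluster does not answer a describeCluster()
    // call within the (arbitrary) 5-second timeout.
    static boolean brokersReachable(AdminClient admin) {
        try {
            return !admin.describeCluster().nodes()
                    .get(5, TimeUnit.SECONDS).isEmpty();
        } catch (Exception e) {
            return false;
        }
    }
}
```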
> 2/ Sometimes I also encounter this behaviour: no data seems to be
> available when querying the stores.
> - start kafka
> - start the producer
> - start the streams application and http server
>
> - request data: `curl -XGET http://localhost:4567/titles`
>   This http request calls a service which in turn queries the key-value store.
>   => received response:
> ```
> {
>   "data": [
>     {"key": 1, "value": "Fresh Fruit For Rotting Vegetables"},
>     ...
>     {"key": 10, "value": "Fear Of A Black Planet"}
>   ],
>   "status": "SUCCESS"
> }
> ```
>
> - request data: `curl -XGET http://localhost:4567/titles/counts`
>   => received response:
> ```
> {
>   "data": [
>     {"key": "fear of a black planet", "value": 414},
>     ...
>     {"key": "curtain call - the hits", "value": 431}
>   ],
>   "status": "SUCCESS"
> }
> ```
>
> - shut down kafka
>
> - request data: `curl -XGET http://localhost:4567/titles`
>   => received the same response as before, which seems OK as we are
>   querying the local store
>
> - request data: `curl -XGET http://localhost:4567/titles/counts`
>   => received the same response as before, still understandable
>
> - restart kafka
>
> - request data: `curl -XGET http://localhost:4567/titles`
>   => received response:
> ```
> {
>   "data": [],
>   "status": "SUCCESS"
> }
> ```
>
> - request data: `curl -XGET http://localhost:4567/titles/counts`
>   => same here, received response:
> ```
> {
>   "data": [],
>   "status": "SUCCESS"
> }
> ```
>
> I also see this entry in the streams application logs:
> ```
> [error] (kafka-streams-test-bbc3ca50-57b7-434b-a55b-48ca855a7758-StreamThread-1)
> org.apache.kafka.streams.errors.StreamsException: task [1_0] exception caught when producing
> org.apache.kafka.streams.errors.StreamsException: task [1_0] exception caught when producing
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:136)
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:144)
>     at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:283)
>     at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:264)
>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
>     at org.apache.kafka.streams.processor.internals.StreamTask.commitImpl(StreamTask.java:259)
>     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:253)
>     at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:815)
>     at org.apache.kafka.streams.processor.internals.StreamThread.access$2800(StreamThread.java:73)
>     at org.apache.kafka.streams.processor.internals.StreamThread$2.apply(StreamThread.java:797)
>     at org.apache.kafka.streams.processor.internals.StreamThread.performOnStreamTasks(StreamThread.java:1448)
>     at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:789)
>     at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:778)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:567)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for kafka-streams-test-events-counts-repartition-1: 30046 ms has passed since batch creation plus linger time
> [trace] Stack trace suppressed: run last compile:runMain for the full output.
> the state store, events-counts, may have migrated to another instance.
> ```
>
> Even if a rebalance occurred after restarting my cluster, as I have only
> one consumer, I thought it should still see all partitions, so the store
> should remain available.
> What am I missing here?
>
> Thank you for your answers.
>
> --
> Regards,
> Patrice
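On that last log line: "the state store, events-counts, may have migrated to another instance" is the standard InvalidStateStoreException message, and with a single instance it usually means the store is temporarily unavailable while the application is rebalancing or restoring state after reconnecting, rather than that the store actually moved. The usual pattern is to retry the store lookup until it succeeds. A rough sketch, with an arbitrary retry count and sleep, and key/value types assumed from your counts output:

```java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

// Retry the store lookup: during and shortly after a rebalance,
// streams.store(...) throws InvalidStateStoreException.
static ReadOnlyKeyValueStore<String, Long> waitForStore(KafkaStreams streams)
        throws InterruptedException {
    for (int i = 0; i < 20; i++) { // retry count is arbitrary
        try {
            return streams.store("events-counts",
                    QueryableStoreTypes.<String, Long>keyValueStore());
        } catch (InvalidStateStoreException e) {
            Thread.sleep(500); // store not ready yet; sleep is arbitrary
        }
    }
    throw new InvalidStateStoreException("events-counts not available after retries");
}
```

The empty result sets you see after restarting the cluster still look suspicious to me, though; I'll dig into the repro you posted and follow up.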