Hi Patrice, I haven't forgotten, just sidetracked with other things. I'll get back to you by the end of the week.
Thanks, Bill On Wed, Nov 29, 2017 at 10:36 AM, Bill Bejeck <bbej...@gmail.com> wrote: > Patrice, > > Thanks for reporting this. I'll have a look at what you've posted on > Github. > > Thanks, > Bill > > On Wed, Nov 29, 2017 at 7:04 AM, Patrice Chalcol <pchal...@gmail.com> > wrote: > >> Hello, >> >> I have implemented a basic application which uses kafka streams stores and >> interactive queries, available there : >> https://github.com/pchalcol/kstreams-healthcheck >> >> The healthcheck implementation is based on kafka streams metadata and the >> stream state, as illustrated below : >> ``` >> String healthcheck() { >> Collection<StreamsMetadata> stores = streams.allMetadata(); >> long storescount = stores.stream() >> .filter(meta -> meta.host().contains("localhost") && meta.port() == 4567) >> .count(); >> >> State state = streams.state(); >> >> System.out.println(String.format("Application State: (%d, %s)", >> storescount, state.toString())); >> >> // KO if current node is down or if is in 'not running' state >> if (storescount == 0 || !state.isRunning()) return "KO"; >> return "OK"; >> } >> ``` >> >> I have created the topics with 4 partitions : >> `kafka-topics --create --topic events --zookeeper localhost:2181 >> --partitions 4 --replication-factor 1` >> `kafka-topics --create --topic library --zookeeper localhost:2181 >> --partitions 4 --replication-factor 1` >> >> What I had expected was the healthcheck returning an error whenever the >> broker is shut down, which is not the case. >> >> When I check the application status using the following >> curl -XGET http://localhost:4567/healthcheck >> The server always returns a SUCCESS response, even if the kafka cluster is >> down. >> >> You will find below the different tests cases I've done. >> >> 1/ The Stream state is not changed after shutting down the kafka cluster >> - start kafka >> `cd docker && docker-compose up -d` >> >> - start producer >> `sbt runMain com.example.streams.Producer` >> >> - start streams and http server >> `sbt runMain com.example.streams.Producer` >> >> - healthcheck >> `curl -XGET http://localhost:4567/healthcheck` >> <http://localhost:4567/healthcheck> >> => response = {"status": "SUCCESS"} >> - shutdown kafka : docker-compose stop >> >> - healthcheck >> `curl -XGET http://localhost:4567/healthcheck` >> <http://localhost:4567/healthcheck> >> => response = {"status": "SUCCESS"} while the expected one should be >> {"status": "ERROR"} >> >> >> 2/ Sometimes, I also encounter this behaviour, no data seems to be >> available when querying the stores >> - Start kafka >> - Start Producer >> - Start Streams and http Server >> >> - Request data : curl -XGET http://localhost:4567/titles >> This http request calls a service which in turn queries the keyvalue >> store >> => received response >> ``` >> { >> "data": [ >> { >> "key": 1, >> "value": "Fresh Fruit For Rotting Vegetables" >> }, >> >> ... >> >> { >> "key": 10, >> "value": "Fear Of A Black Planet" >> } >> ], >> "status": "SUCCESS" >> } >> ``` >> >> - Request data : curl -XGET http://localhost:4567/titles/counts >> => received response >> ``` >> { >> "data": [ >> { >> "key": "fear of a black planet", >> "value": 414 >> }, >> ... >> { >> "key": "curtain call - the hits", >> "value": 431 >> } >> ], >> "status": "SUCCESS" >> } >> ``` >> >> - shutdown kafka >> >> - Request data : curl -XGET http://localhost:4567/titles >> => received response, same as before, which seems to be ok as we are >> querying the local store >> ``` >> { >> "data": [ >> { >> "key": 1, >> "value": "Fresh Fruit For Rotting Vegetables" >> }, >> >> ... >> >> { >> "key": 10, >> "value": "Fear Of A Black Planet" >> } >> ], >> "status": "SUCCESS" >> } >> ``` >> - Request data : curl -XGET http://localhost:4567/titles/counts >> => received response, still understandable >> ``` >> { >> "data": [ >> { >> "key": "fear of a black planet", >> "value": 414 >> }, >> ... >> { >> "key": "curtain call - the hits", >> "value": 431 >> } >> ], >> "status": "SUCCESS" >> } >> ``` >> >> - restart kafka >> >> - Request data : curl -XGET http://localhost:4567/titles >> => received response >> ``` >> { >> "data": [], >> "status": "SUCCESS" >> } >> ``` >> >> - Request data : curl -XGET http://localhost:4567/titles/counts >> => same here, received response >> ``` >> { >> "data": [], >> "status": "SUCCESS" >> } >> ``` >> >> I also see this entry in the Streams application logs >> ```[error] >> (kafka-streams-test-bbc3ca50-57b7-434b-a55b-48ca855a7758-StreamThread-1) >> org.apache.kafka.streams.errors.StreamsException: task [1_0] exception >> caught when producing >> org.apache.kafka.streams.errors.StreamsException: task [1_0] exception >> caught when producing >> at >> org.apache.kafka.streams.processor.internals.RecordCollector >> Impl.checkForException(RecordCollectorImpl.java:136) >> at >> org.apache.kafka.streams.processor.internals.RecordCollector >> Impl.flush(RecordCollectorImpl.java:144) >> at >> org.apache.kafka.streams.processor.internals.StreamTask. >> flushState(StreamTask.java:283) >> at >> org.apache.kafka.streams.processor.internals.StreamTask$1. >> run(StreamTask.java:264) >> at >> org.apache.kafka.streams.processor.internals.StreamsMetricsI >> mpl.measureLatencyNs(StreamsMetricsImpl.java:187) >> at >> org.apache.kafka.streams.processor.internals.StreamTask. >> commitImpl(StreamTask.java:259) >> at >> org.apache.kafka.streams.processor.internals.StreamTask. >> commit(StreamTask.java:253) >> at >> org.apache.kafka.streams.processor.internals.StreamThread. >> commitOne(StreamThread.java:815) >> at >> org.apache.kafka.streams.processor.internals.StreamThread. >> access$2800(StreamThread.java:73) >> at >> org.apache.kafka.streams.processor.internals.StreamThread$2. >> apply(StreamThread.java:797) >> at >> org.apache.kafka.streams.processor.internals.StreamThread.pe >> rformOnStreamTasks(StreamThread.java:1448) >> at >> org.apache.kafka.streams.processor.internals.StreamThread. >> commitAll(StreamThread.java:789) >> at >> org.apache.kafka.streams.processor.internals.StreamThread. >> maybeCommit(StreamThread.java:778) >> at >> org.apache.kafka.streams.processor.internals.StreamThread. >> runLoop(StreamThread.java:567) >> at >> org.apache.kafka.streams.processor.internals.StreamThread. >> run(StreamThread.java:527) >> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 >> record(s) for kafka-streams-test-events-counts-repartition-1: 30046 ms >> has >> passed since batch creation plus linger time >> [trace] Stack trace suppressed: run last compile:runMain for the full >> output. >> the state store, events-counts, may have migrated to another instance.``` >> >> Even if a rebalance has occurred after having restarted my cluster, as I >> have only one consumer, I thought it should still see all partitions, so >> the store should remain available. >> What am I missing here ? >> >> Thank you for your answers. >> >> -- >> Regards, >> Patrice >> > >