Hi Patrice,

Which version of Kafka are you using for this demo app?
Guozhang

On Wed, Dec 6, 2017 at 8:04 AM, Patrice Chalcol <pchal...@gmail.com> wrote:

Hi Bill,

Thanks, I understand. Let me know if you need further information.

Regards,
Patrice

2017-12-06 16:03 GMT+01:00 Bill Bejeck <bbej...@gmail.com>:

Hi Patrice,

I haven't forgotten, just got sidetracked with other things. I'll get back to you by the end of the week.

Thanks,
Bill

On Wed, Nov 29, 2017 at 10:36 AM, Bill Bejeck <bbej...@gmail.com> wrote:

Patrice,

Thanks for reporting this. I'll have a look at what you've posted on GitHub.

Thanks,
Bill

On Wed, Nov 29, 2017 at 7:04 AM, Patrice Chalcol <pchal...@gmail.com> wrote:

Hello,

I have implemented a basic application that uses Kafka Streams stores and interactive queries, available here:
https://github.com/pchalcol/kstreams-healthcheck

The healthcheck implementation is based on the Kafka Streams metadata and the stream state, as illustrated below:

```
String healthcheck() {
  Collection<StreamsMetadata> stores = streams.allMetadata();
  long storescount = stores.stream()
    .filter(meta -> meta.host().contains("localhost") && meta.port() == 4567)
    .count();

  State state = streams.state();

  System.out.println(String.format("Application State: (%d, %s)",
    storescount, state.toString()));

  // KO if the current node is down or is in a 'not running' state
  if (storescount == 0 || !state.isRunning()) return "KO";
  return "OK";
}
```

I have created the topics with 4 partitions:
`kafka-topics --create --topic events --zookeeper localhost:2181 --partitions 4 --replication-factor 1`
`kafka-topics --create --topic library --zookeeper localhost:2181 --partitions 4 --replication-factor 1`

What I had expected was the healthcheck returning an error whenever the broker is shut down, which is not the case.

When I check the application status using
`curl -XGET http://localhost:4567/healthcheck`
the server always returns a SUCCESS response, even if the Kafka cluster is down.

You will find below the different test cases I have run.

1/ The stream state does not change after shutting down the Kafka cluster

- start kafka:
  `cd docker && docker-compose up -d`
- start the producer:
  `sbt runMain com.example.streams.Producer`
- start the streams application and http server:
  `sbt runMain com.example.streams.Producer`
- healthcheck:
  `curl -XGET http://localhost:4567/healthcheck`
  => response = {"status": "SUCCESS"}
- shutdown kafka: `docker-compose stop`
- healthcheck:
  `curl -XGET http://localhost:4567/healthcheck`
  => response = {"status": "SUCCESS"}, while the expected one should be {"status": "ERROR"}

2/ Sometimes I also encounter this behaviour: no data seems to be available when querying the stores

- start kafka
- start the producer
- start the streams application and http server
- request data: `curl -XGET http://localhost:4567/titles`
  This http request calls a service which in turn queries the key-value store.
  => received response:

```
{
  "data": [
    {
      "key": 1,
      "value": "Fresh Fruit For Rotting Vegetables"
    },
    ...
    {
      "key": 10,
      "value": "Fear Of A Black Planet"
    }
  ],
  "status": "SUCCESS"
}
```

- request data: `curl -XGET http://localhost:4567/titles/counts`
  => received response:

```
{
  "data": [
    {
      "key": "fear of a black planet",
      "value": 414
    },
    ...
    {
      "key": "curtain call - the hits",
      "value": 431
    }
  ],
  "status": "SUCCESS"
}
```

- shutdown kafka
- request data: `curl -XGET http://localhost:4567/titles`
  => received response, same as before, which seems to be ok as we are querying the local store:

```
{
  "data": [
    {
      "key": 1,
      "value": "Fresh Fruit For Rotting Vegetables"
    },
    ...
    {
      "key": 10,
      "value": "Fear Of A Black Planet"
    }
  ],
  "status": "SUCCESS"
}
```

- request data: `curl -XGET http://localhost:4567/titles/counts`
  => received response, still understandable:

```
{
  "data": [
    {
      "key": "fear of a black planet",
      "value": 414
    },
    ...
    {
      "key": "curtain call - the hits",
      "value": 431
    }
  ],
  "status": "SUCCESS"
}
```

- restart kafka
- request data: `curl -XGET http://localhost:4567/titles`
  => received response:

```
{
  "data": [],
  "status": "SUCCESS"
}
```

- request data: `curl -XGET http://localhost:4567/titles/counts`
  => same here, received response:

```
{
  "data": [],
  "status": "SUCCESS"
}
```

I also see this entry in the streams application logs:

```
[error] (kafka-streams-test-bbc3ca50-57b7-434b-a55b-48ca855a7758-StreamThread-1)
org.apache.kafka.streams.errors.StreamsException: task [1_0] exception caught when producing
org.apache.kafka.streams.errors.StreamsException: task [1_0] exception caught when producing
	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:136)
	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:144)
	at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:283)
	at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:264)
	at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
	at org.apache.kafka.streams.processor.internals.StreamTask.commitImpl(StreamTask.java:259)
	at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:253)
	at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:815)
	at org.apache.kafka.streams.processor.internals.StreamThread.access$2800(StreamThread.java:73)
	at org.apache.kafka.streams.processor.internals.StreamThread$2.apply(StreamThread.java:797)
	at org.apache.kafka.streams.processor.internals.StreamThread.performOnStreamTasks(StreamThread.java:1448)
	at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:789)
	at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:778)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:567)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for kafka-streams-test-events-counts-repartition-1: 30046 ms has passed since batch creation plus linger time
[trace] Stack trace suppressed: run last compile:runMain for the full output.
the state store, events-counts, may have migrated to another instance.
```

Even if a rebalance has occurred after restarting my cluster, as I have only one consumer, I thought it should still see all partitions, so the store should remain available.
What am I missing here?

Thank you for your answers.

--
Regards,
Patrice

--
-- Guozhang
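A side note on the healthcheck itself: as the tests above show, `streams.state()` can keep reporting a running state while the brokers are down, so polling it is not sufficient on its own. Independently of that, the OK/KO decision can be isolated as a pure function, which makes the intended policy explicit and unit-testable. This is only a sketch: the nested `StreamState` enum is a local stand-in for `KafkaStreams.State`, and treating REBALANCING as healthy is an assumed policy choice, not something established in this thread.

```java
public class HealthcheckLogic {
    // Local stand-in for KafkaStreams.State, for illustration only.
    enum StreamState { CREATED, RUNNING, REBALANCING, PENDING_SHUTDOWN, NOT_RUNNING, ERROR }

    // Pure decision: healthy only if this instance hosts at least one store
    // and the client is in an active state (RUNNING or, by assumption, REBALANCING).
    static String healthcheck(long storeCount, StreamState state) {
        boolean healthy = storeCount > 0
                && (state == StreamState.RUNNING || state == StreamState.REBALANCING);
        return healthy ? "OK" : "KO";
    }

    public static void main(String[] args) {
        System.out.println(healthcheck(1, StreamState.RUNNING));      // OK
        System.out.println(healthcheck(0, StreamState.RUNNING));      // KO: no local stores
        System.out.println(healthcheck(1, StreamState.ERROR));        // KO: client not active
    }
}
```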
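Another pattern worth considering is to register a listener via `KafkaStreams#setStateListener` and cache the most recent transition rather than polling `streams.state()`; whether the client actually transitions out of RUNNING on broker loss depends on the Kafka version, which is presumably why the version question matters here. The sketch below models only the caching side: `onChange` has the same shape as `KafkaStreams.StateListener#onChange`, but the `State` enum is a local stand-in so the example is self-contained.

```java
import java.util.concurrent.atomic.AtomicReference;

public class StateTracker {
    // Local stand-in for KafkaStreams.State, for illustration only.
    enum State { CREATED, RUNNING, REBALANCING, PENDING_SHUTDOWN, NOT_RUNNING, ERROR }

    private final AtomicReference<State> current = new AtomicReference<>(State.CREATED);

    // Same shape as KafkaStreams.StateListener#onChange: record every transition
    // as it happens instead of sampling the state later.
    void onChange(State newState, State oldState) {
        current.set(newState);
    }

    boolean healthy() {
        State s = current.get();
        return s == State.RUNNING || s == State.REBALANCING;
    }

    public static void main(String[] args) {
        StateTracker tracker = new StateTracker();
        tracker.onChange(State.RUNNING, State.CREATED);
        System.out.println(tracker.healthy());   // true
        tracker.onChange(State.ERROR, State.RUNNING);
        System.out.println(tracker.healthy());   // false
    }
}
```

In the real application this would be wired up once, before `streams.start()`, with `streams.setStateListener(tracker::onChange)` (adapted to the real `KafkaStreams.State` type).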