Hi Patrice,

Which version of Kafka are you using for this demo app?

Guozhang

On Wed, Dec 6, 2017 at 8:04 AM, Patrice Chalcol <pchal...@gmail.com> wrote:

> Hi Bill,
>
> Thanks, I understand. Let me know if you need further information.
>
> Regards,
> Patrice
>
> 2017-12-06 16:03 GMT+01:00 Bill Bejeck <bbej...@gmail.com>:
>
> > Hi Patrice,
> >
> > I haven't forgotten, just sidetracked with other things.  I'll get back
> to
> > you by the end of the week.
> >
> > Thanks,
> > Bill
> >
> > On Wed, Nov 29, 2017 at 10:36 AM, Bill Bejeck <bbej...@gmail.com> wrote:
> >
> > > Patrice,
> > >
> > > Thanks for reporting this.  I'll have a look at what you've posted on
> > > Github.
> > >
> > > Thanks,
> > > Bill
> > >
> > > On Wed, Nov 29, 2017 at 7:04 AM, Patrice Chalcol <pchal...@gmail.com>
> > > wrote:
> > >
> > >> Hello,
> > >>
> > >> I have implemented a basic application which uses Kafka Streams state
> > >> stores and interactive queries, available here:
> > >> https://github.com/pchalcol/kstreams-healthcheck
> > >>
> > >> The healthcheck implementation is based on the Kafka Streams metadata
> > >> and the stream state, as illustrated below:
> > >> ```
> > >> String healthcheck() {
> > >>   Collection<StreamsMetadata> stores = streams.allMetadata();
> > >>   long storesCount = stores.stream()
> > >>       .filter(meta -> meta.host().contains("localhost") && meta.port() == 4567)
> > >>       .count();
> > >>
> > >>   State state = streams.state();
> > >>
> > >>   System.out.println(String.format("Application State: (%d, %s)",
> > >>       storesCount, state.toString()));
> > >>
> > >>   // KO if the current node is down or the app is not in a running state
> > >>   if (storesCount == 0 || !state.isRunning()) return "KO";
> > >>   return "OK";
> > >> }
> > >> ```
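For reference, the decision logic above can be modelled in isolation. The sketch below is self-contained and hypothetical (the enum is a stand-in mirroring `org.apache.kafka.streams.KafkaStreams.State`, not the demo app's code); note that `State.isRunning()` also treats REBALANCING as running, and the state typically stays RUNNING while the brokers are merely unreachable, since the clients keep retrying internally:

```java
// Self-contained model of the healthcheck decision above.
// The enum is a stand-in mirroring org.apache.kafka.streams.KafkaStreams.State.
class HealthcheckModel {

    enum State {
        CREATED, RUNNING, REBALANCING, PENDING_SHUTDOWN, NOT_RUNNING, ERROR;

        // Mirrors KafkaStreams.State.isRunning(): REBALANCING counts as running
        boolean isRunning() {
            return this == RUNNING || this == REBALANCING;
        }
    }

    static String healthcheck(long storesCount, State state) {
        // KO if this instance hosts no metadata or the app is not running
        if (storesCount == 0 || !state.isRunning()) return "KO";
        return "OK";
    }

    public static void main(String[] args) {
        System.out.println(healthcheck(1, State.RUNNING));     // OK
        System.out.println(healthcheck(0, State.RUNNING));     // KO
        System.out.println(healthcheck(1, State.NOT_RUNNING)); // KO
    }
}
```

Because a broker outage alone does not necessarily move the state out of RUNNING, a check built only on `storesCount` and `state` cannot detect it.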
> > >>
> > >> I have created the topics with 4 partitions:
> > >> `kafka-topics --create --topic events --zookeeper localhost:2181
> > >> --partitions 4 --replication-factor 1`
> > >> `kafka-topics --create --topic library --zookeeper localhost:2181
> > >> --partitions 4 --replication-factor 1`
> > >>
> > >> What I expected was the healthcheck returning an error whenever the
> > >> broker is shut down, which is not the case.
> > >>
> > >> When I check the application status using the following:
> > >> `curl -XGET http://localhost:4567/healthcheck`
> > >> the server always returns a SUCCESS response, even when the Kafka
> > >> cluster is down.
> > >>
> > >> You will find below the different tests cases I've done.
> > >>
> > >> 1/ The stream state is not changed after shutting down the Kafka
> > >> cluster
> > >> - start kafka
> > >> `cd docker && docker-compose up -d`
> > >>
> > >> - start producer
> > >> `sbt runMain com.example.streams.Producer`
> > >>
> > >> - start streams and http server
> > >> `sbt runMain com.example.streams.Producer`
> > >>
> > >> - healthcheck
> > >> `curl -XGET http://localhost:4567/healthcheck`
> > >> => response = {"status": "SUCCESS"}
> > >> - shutdown kafka: `docker-compose stop`
> > >>
> > >> - healthcheck
> > >> `curl -XGET http://localhost:4567/healthcheck`
> > >> => response = {"status": "SUCCESS"} while the expected one should be
> > >> {"status": "ERROR"}
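One option worth considering (a sketch, not verified against this demo app): register a listener via `KafkaStreams.setStateListener(...)` before `start()` and remember the last transition in a flag, so the healthcheck at least reports KO once the application leaves RUNNING (e.g. on ERROR or NOT_RUNNING). The class below is a self-contained, hypothetical stand-in with the same `onChange(newState, oldState)` shape as `KafkaStreams.StateListener`; note that a broker outage alone may still not trigger any transition:

```java
// Stand-in for wiring KafkaStreams.setStateListener(...) before start():
// remember the last state transition so the healthcheck can report it.
class HealthFlag {

    enum State { CREATED, RUNNING, REBALANCING, PENDING_SHUTDOWN, NOT_RUNNING, ERROR }

    private volatile boolean healthy = false;

    // Same shape as KafkaStreams.StateListener.onChange(newState, oldState)
    void onChange(State newState, State oldState) {
        healthy = (newState == State.RUNNING);
    }

    String healthcheck() {
        return healthy ? "OK" : "KO";
    }

    public static void main(String[] args) {
        HealthFlag flag = new HealthFlag();
        flag.onChange(State.RUNNING, State.REBALANCING);
        System.out.println(flag.healthcheck()); // OK
        flag.onChange(State.ERROR, State.RUNNING);
        System.out.println(flag.healthcheck()); // KO
    }
}
```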
> > >>
> > >>
> > >> 2/ Sometimes I also encounter this behaviour: no data seems to be
> > >> available when querying the stores
> > >> - Start kafka
> > >> - Start Producer
> > >> - Start Streams and http Server
> > >>
> > >> - Request data: `curl -XGET http://localhost:4567/titles`
> > >>   This HTTP request calls a service which in turn queries the key-value
> > >> store
> > >> => received response
> > >> ```
> > >> {
> > >>     "data": [
> > >>         {
> > >>             "key": 1,
> > >>             "value": "Fresh Fruit For Rotting Vegetables"
> > >>         },
> > >>
> > >>         ...
> > >>
> > >>         {
> > >>             "key": 10,
> > >>             "value": "Fear Of A Black Planet"
> > >>         }
> > >>     ],
> > >>     "status": "SUCCESS"
> > >> }
> > >> ```
> > >>
> > >> - Request data: `curl -XGET http://localhost:4567/titles/counts`
> > >> => received response
> > >> ```
> > >> {
> > >>     "data": [
> > >>         {
> > >>             "key": "fear of a black planet",
> > >>             "value": 414
> > >>         },
> > >>
> > >>         ...
> > >>
> > >>         {
> > >>             "key": "curtain call - the hits",
> > >>             "value": 431
> > >>         }
> > >>     ],
> > >>     "status": "SUCCESS"
> > >> }
> > >> ```
> > >>
> > >> - shutdown kafka
> > >>
> > >> - Request data: `curl -XGET http://localhost:4567/titles`
> > >> => received response, same as before, which seems to be OK as we are
> > >> querying the local store
> > >> ```
> > >> {
> > >>     "data": [
> > >>         {
> > >>             "key": 1,
> > >>             "value": "Fresh Fruit For Rotting Vegetables"
> > >>         },
> > >>
> > >>         ...
> > >>
> > >>         {
> > >>             "key": 10,
> > >>             "value": "Fear Of A Black Planet"
> > >>         }
> > >>     ],
> > >>     "status": "SUCCESS"
> > >> }
> > >> ```
> > >> - Request data: `curl -XGET http://localhost:4567/titles/counts`
> > >> => received response, still understandable
> > >> ```
> > >> {
> > >>     "data": [
> > >>         {
> > >>             "key": "fear of a black planet",
> > >>             "value": 414
> > >>         },
> > >>
> > >>         ...
> > >>
> > >>         {
> > >>             "key": "curtain call - the hits",
> > >>             "value": 431
> > >>         }
> > >>     ],
> > >>     "status": "SUCCESS"
> > >> }
> > >> ```
> > >>
> > >> - restart kafka
> > >>
> > >> - Request data: `curl -XGET http://localhost:4567/titles`
> > >> => received response
> > >> ```
> > >> {
> > >>     "data": [],
> > >>     "status": "SUCCESS"
> > >> }
> > >> ```
> > >>
> > >> - Request data: `curl -XGET http://localhost:4567/titles/counts`
> > >> => same here, received response
> > >> ```
> > >> {
> > >>     "data": [],
> > >>     "status": "SUCCESS"
> > >> }
> > >> ```
> > >>
> > >> I also see this entry in the Streams application logs:
> > >> ```
> > >> [error] (kafka-streams-test-bbc3ca50-57b7-434b-a55b-48ca855a7758-StreamThread-1)
> > >> org.apache.kafka.streams.errors.StreamsException: task [1_0] exception caught when producing
> > >> org.apache.kafka.streams.errors.StreamsException: task [1_0] exception caught when producing
> > >>   at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:136)
> > >>   at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:144)
> > >>   at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:283)
> > >>   at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:264)
> > >>   at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> > >>   at org.apache.kafka.streams.processor.internals.StreamTask.commitImpl(StreamTask.java:259)
> > >>   at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:253)
> > >>   at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:815)
> > >>   at org.apache.kafka.streams.processor.internals.StreamThread.access$2800(StreamThread.java:73)
> > >>   at org.apache.kafka.streams.processor.internals.StreamThread$2.apply(StreamThread.java:797)
> > >>   at org.apache.kafka.streams.processor.internals.StreamThread.performOnStreamTasks(StreamThread.java:1448)
> > >>   at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:789)
> > >>   at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:778)
> > >>   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:567)
> > >>   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
> > >> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for kafka-streams-test-events-counts-repartition-1: 30046 ms has passed since batch creation plus linger time
> > >> [trace] Stack trace suppressed: run last compile:runMain for the full output.
> > >> the state store, events-counts, may have migrated to another instance.
> > >> ```
> > >>
> > >> Even if a rebalance occurred after restarting my cluster, since I have
> > >> only one consumer I thought it would still be assigned all partitions,
> > >> so the store should remain available.
> > >> What am I missing here?
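For what it's worth, the usual workaround for the "may have migrated to another instance" message is to retry the store lookup while the rebalance settles. A generic, self-contained sketch (the exception class here is a hypothetical stand-in for `org.apache.kafka.streams.errors.InvalidStateStoreException`):

```java
import java.util.function.Supplier;

// Generic retry sketch for store lookups: a store that "may have migrated"
// is often just transiently unavailable during a rebalance, so callers
// typically retry with a short backoff before giving up.
class StoreRetry {

    // Stand-in for org.apache.kafka.streams.errors.InvalidStateStoreException
    static class StoreUnavailable extends RuntimeException {}

    static <T> T withRetries(Supplier<T> lookup, int attempts, long backoffMs) {
        for (int i = 0; ; i++) {
            try {
                return lookup.get();
            } catch (StoreUnavailable e) {
                if (i + 1 >= attempts) throw e; // out of attempts: give up
                try {
                    Thread.sleep(backoffMs);    // wait for the rebalance to settle
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw e;
                }
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Fails twice (store "migrating"), then succeeds on the third attempt
        String v = withRetries(() -> {
            if (calls[0]++ < 2) throw new StoreUnavailable();
            return "events-counts ready";
        }, 5, 10L);
        System.out.println(v); // events-counts ready
    }
}
```

In the demo app this would wrap the `streams.store(...)` call behind the `/titles` and `/titles/counts` endpoints.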
> > >>
> > >> Thank you for your answers.
> > >>
> > >> --
> > >> Regards,
> > >> Patrice
> > >>
> > >
> > >
> >
>
>
>
> --
> Regards,
> Patrice Chalcol
>



-- 
-- Guozhang
