Hi Patrice,

I haven't forgotten, just sidetracked with other things.  I'll get back to
you by the end of the week.

Thanks,
Bill

On Wed, Nov 29, 2017 at 10:36 AM, Bill Bejeck <bbej...@gmail.com> wrote:

> Patrice,
>
> Thanks for reporting this.  I'll have a look at what you've posted on
> Github.
>
> Thanks,
> Bill
>
> On Wed, Nov 29, 2017 at 7:04 AM, Patrice Chalcol <pchal...@gmail.com>
> wrote:
>
>> Hello,
>>
>> I have implemented a basic application which uses Kafka Streams stores and
>> interactive queries, available here:
>> https://github.com/pchalcol/kstreams-healthcheck
>>
>> The healthcheck implementation is based on the Kafka Streams metadata and
>> the stream state, as illustrated below:
>> ```
>> String healthcheck() {
>>   // Metadata for all application instances that host stores
>>   Collection<StreamsMetadata> stores = streams.allMetadata();
>>   long storescount = stores.stream()
>>       .filter(meta -> meta.host().contains("localhost") && meta.port() == 4567)
>>       .count();
>>
>>   State state = streams.state();
>>
>>   System.out.println(String.format("Application State: (%d, %s)",
>>       storescount, state.toString()));
>>
>>   // KO if the current node hosts no store or is not in a running state
>>   if (storescount == 0 || !state.isRunning()) return "KO";
>>   return "OK";
>> }
>> ```
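>>
>> A state listener registered on the same `streams` instance should surface
>> any transition as it happens; a minimal sketch (for illustration only):
>> ```
>> // Log every state transition of the KafkaStreams instance. If the
>> // instance left the RUNNING state when the broker went down, this
>> // callback would show it.
>> streams.setStateListener(new KafkaStreams.StateListener() {
>>   @Override
>>   public void onChange(KafkaStreams.State newState, KafkaStreams.State oldState) {
>>     System.out.println(String.format("State change: %s -> %s", oldState, newState));
>>   }
>> });
>> ```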
>>
>> I have created the topics with 4 partitions:
>> `kafka-topics --create --topic events --zookeeper localhost:2181
>> --partitions 4 --replication-factor 1`
>> `kafka-topics --create --topic library --zookeeper localhost:2181
>> --partitions 4 --replication-factor 1`
>>
>> I expected the healthcheck to return an error whenever the broker is shut
>> down, but that is not the case.
>>
>> When I check the application status with
>> `curl -XGET http://localhost:4567/healthcheck`,
>> the server always returns a SUCCESS response, even when the Kafka cluster
>> is down.
>>
>> You will find below the different test cases I have run.
>>
>> 1/ The stream state does not change after the Kafka cluster is shut down
>> - start Kafka
>> `cd docker && docker-compose up -d`
>>
>> - start producer
>> `sbt "runMain com.example.streams.Producer"`
>>
>> - start streams and http server
>> `sbt "runMain com.example.streams.Producer"`
>>
>> - healthcheck
>> `curl -XGET http://localhost:4567/healthcheck`
>> => response = {"status": "SUCCESS"}
>> - shut down Kafka: `docker-compose stop`
>>
>> - healthcheck
>> `curl -XGET http://localhost:4567/healthcheck`
>> => response = {"status": "SUCCESS"}, while the expected response is
>> {"status": "ERROR"}
>>
>>
>> 2/ Sometimes I also encounter this behaviour: no data seems to be
>> available when querying the stores
>> - Start Kafka
>> - Start the producer
>> - Start the streams and HTTP server
>>
>> - Request data: `curl -XGET http://localhost:4567/titles`
>>   This HTTP request calls a service which in turn queries the key-value
>> store (see the store lookup sketch after the response below)
>> => received response
>> ```
>> {
>>     "data": [
>>         {
>>             "key": 1,
>>             "value": "Fresh Fruit For Rotting Vegetables"
>>         },
>>
>>         ...
>>
>>         {
>>             "key": 10,
>>             "value": "Fear Of A Black Planet"
>>         }
>>     ],
>>     "status": "SUCCESS"
>> }
>> ```
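>>
>> The store lookup behind this endpoint boils down to something like the
>> following (a simplified sketch; the store name "titles" and the key/value
>> types are assumptions based on the endpoint, not the exact repository code):
>> ```
>> // Fetch the local key-value store registered under the (assumed) name
>> // "titles" and dump all of its entries.
>> ReadOnlyKeyValueStore<Integer, String> store =
>>     streams.store("titles", QueryableStoreTypes.<Integer, String>keyValueStore());
>> try (KeyValueIterator<Integer, String> it = store.all()) {
>>   while (it.hasNext()) {
>>     KeyValue<Integer, String> entry = it.next();
>>     System.out.println(String.format("%d -> %s", entry.key, entry.value));
>>   }
>> }
>> ```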
>>
>> - Request data: `curl -XGET http://localhost:4567/titles/counts`
>> => received response
>> ```
>> {
>>     "data": [
>>         {
>>             "key": "fear of a black planet",
>>             "value": 414
>>         },
>>
>>         ...
>>
>>         {
>>             "key": "curtain call - the hits",
>>             "value": 431
>>         }
>>     ],
>>     "status": "SUCCESS"
>> }
>> ```
>>
>> - shut down Kafka
>>
>> - Request data: `curl -XGET http://localhost:4567/titles`
>> => received the same response as before, which seems OK since we are
>> querying the local store
>> ```
>> {
>>     "data": [
>>         {
>>             "key": 1,
>>             "value": "Fresh Fruit For Rotting Vegetables"
>>         },
>>
>>         ...
>>
>>         {
>>             "key": 10,
>>             "value": "Fear Of A Black Planet"
>>         }
>>     ],
>>     "status": "SUCCESS"
>> }
>> ```
>> - Request data: `curl -XGET http://localhost:4567/titles/counts`
>> => received response, which still makes sense
>> ```
>> {
>>     "data": [
>>         {
>>             "key": "fear of a black planet",
>>             "value": 414
>>         },
>>
>>         ...
>>
>>         {
>>             "key": "curtain call - the hits",
>>             "value": 431
>>         }
>>     ],
>>     "status": "SUCCESS"
>> }
>> ```
>>
>> - restart Kafka
>>
>> - Request data: `curl -XGET http://localhost:4567/titles`
>> => received response
>> ```
>> {
>>     "data": [],
>>     "status": "SUCCESS"
>> }
>> ```
>>
>> - Request data: `curl -XGET http://localhost:4567/titles/counts`
>> => same here, received response
>> ```
>> {
>>     "data": [],
>>     "status": "SUCCESS"
>> }
>> ```
>>
>> I also see this entry in the Streams application logs:
>> ```
>> [error] (kafka-streams-test-bbc3ca50-57b7-434b-a55b-48ca855a7758-StreamThread-1)
>> org.apache.kafka.streams.errors.StreamsException: task [1_0] exception caught when producing
>> org.apache.kafka.streams.errors.StreamsException: task [1_0] exception caught when producing
>>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:136)
>>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:144)
>>     at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:283)
>>     at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:264)
>>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
>>     at org.apache.kafka.streams.processor.internals.StreamTask.commitImpl(StreamTask.java:259)
>>     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:253)
>>     at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:815)
>>     at org.apache.kafka.streams.processor.internals.StreamThread.access$2800(StreamThread.java:73)
>>     at org.apache.kafka.streams.processor.internals.StreamThread$2.apply(StreamThread.java:797)
>>     at org.apache.kafka.streams.processor.internals.StreamThread.performOnStreamTasks(StreamThread.java:1448)
>>     at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:789)
>>     at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:778)
>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:567)
>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
>> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for kafka-streams-test-events-counts-repartition-1: 30046 ms has passed since batch creation plus linger time
>> [trace] Stack trace suppressed: run last compile:runMain for the full output.
>> the state store, events-counts, may have migrated to another instance.
>> ```
>>
>> Even if a rebalance occurred after restarting the cluster, since I have
>> only one consumer I thought it would still be assigned all partitions, so
>> the store should remain available.
>> What am I missing here?
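>>
>> Should the caller simply retry the lookup while the store migrates?
>> Something along these lines (a sketch only, `waitForStore` is a
>> hypothetical helper, not code from the repository):
>> ```
>> // Retry the store lookup while the instance rebalances; streams.store()
>> // throws InvalidStateStoreException until the store is available again.
>> ReadOnlyKeyValueStore<Integer, String> waitForStore(String name)
>>     throws InterruptedException {
>>   while (true) {
>>     try {
>>       return streams.store(name, QueryableStoreTypes.<Integer, String>keyValueStore());
>>     } catch (InvalidStateStoreException e) {
>>       Thread.sleep(100); // store not queryable yet, e.g. during a rebalance
>>     }
>>   }
>> }
>> ```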
>>
>> Thank you for your answers.
>>
>> --
>> Regards,
>> Patrice
>>
>
>
