Hello,

I have implemented a basic application which uses Kafka Streams state stores and
interactive queries, available here:
https://github.com/pchalcol/kstreams-healthcheck

The healthcheck implementation is based on the Kafka Streams metadata and the
stream state, as illustrated below:
```
String healthcheck() {
  // Metadata entries for this instance (host/port configured as localhost:4567)
  Collection<StreamsMetadata> stores = streams.allMetadata();
  long storescount = stores.stream()
      .filter(meta -> meta.host().contains("localhost") && meta.port() == 4567)
      .count();

  State state = streams.state();

  System.out.println(String.format("Application State: (%d, %s)",
      storescount, state.toString()));

  // KO if the current node holds no store or if the stream is not in a running state
  if (storescount == 0 || !state.isRunning()) return "KO";
  return "OK";
}
```
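
To better observe the state transitions this check relies on, I am also thinking of
registering a state listener before starting the instance; a minimal sketch using the
standard KafkaStreams#setStateListener API:
```
// Sketch: log every state transition of the KafkaStreams instance.
// Must be registered before streams.start().
streams.setStateListener((newState, oldState) ->
    System.out.println(String.format("Streams state changed: %s -> %s", oldState, newState)));
```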

I have created the topics with 4 partitions:
`kafka-topics --create --topic events --zookeeper localhost:2181
--partitions 4 --replication-factor 1`
`kafka-topics --create --topic library --zookeeper localhost:2181
--partitions 4 --replication-factor 1`
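
(For reference, the equivalent programmatic creation with the AdminClient API would look
roughly like this; the localhost:9092 bootstrap address is an assumption about the
docker-compose setup:)
```
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

// Sketch: create both topics with 4 partitions and replication factor 1.
// Assumes a broker reachable on localhost:9092; exception handling omitted.
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
try (AdminClient admin = AdminClient.create(props)) {
    admin.createTopics(Arrays.asList(
        new NewTopic("events", 4, (short) 1),
        new NewTopic("library", 4, (short) 1)
    )).all().get();
}
```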

I expected the healthcheck to return an error whenever the broker is shut
down, but that is not the case.

When I check the application status with
`curl -XGET http://localhost:4567/healthcheck`
the server always returns a SUCCESS response, even if the Kafka cluster is
down.

You will find below the different test cases I have run.

1/ The Streams state does not change after shutting down the Kafka cluster
- start kafka
`cd docker && docker-compose up -d`

- start producer
`sbt runMain com.example.streams.Producer`

- start streams and http server
`sbt runMain com.example.streams.Producer`

- healthcheck
`curl -XGET http://localhost:4567/healthcheck`
=> response = {"status": "SUCCESS"}
- shutdown kafka
`docker-compose stop`

- healthcheck
`curl -XGET http://localhost:4567/healthcheck`
=> response = {"status": "SUCCESS"}, whereas the expected one is
{"status": "ERROR"}
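
Since streams.state() apparently stays RUNNING here, a workaround I am considering is to
have the healthcheck also probe the brokers directly; a minimal sketch with the
AdminClient API (not what the application currently does):
```
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;

// Sketch: treat the cluster as down if the brokers cannot be described within a timeout.
// The healthcheck could combine this with the existing metadata/state checks.
boolean brokersReachable(AdminClient admin) {
    try {
        return !admin.describeCluster().nodes().get(5, TimeUnit.SECONDS).isEmpty();
    } catch (Exception e) {
        return false;
    }
}
```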


2/ Sometimes I also encounter the following behaviour: no data seems to be
available when querying the stores
- Start kafka
- Start Producer
- Start Streams and http Server

- Request data: `curl -XGET http://localhost:4567/titles`
  This HTTP request calls a service which in turn queries the key-value store
  (see the sketch after the response below).
=> received response
```
{
    "data": [
        {
            "key": 1,
            "value": "Fresh Fruit For Rotting Vegetables"
        },

        ...

        {
            "key": 10,
            "value": "Fear Of A Black Planet"
        }
    ],
    "status": "SUCCESS"
}
```
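
For reference, the store lookup behind this endpoint is essentially the following (a
simplified sketch, not the exact repository code; the store name "titles" and the
Integer/String types are assumptions based on the response above):
```
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

// Simplified sketch of the lookup behind GET /titles.
ReadOnlyKeyValueStore<Integer, String> store =
    streams.store("titles", QueryableStoreTypes.<Integer, String>keyValueStore());

// Collect every entry of the local key-value store.
List<KeyValue<Integer, String>> titles = new ArrayList<>();
try (KeyValueIterator<Integer, String> it = store.all()) {
    while (it.hasNext()) {
        titles.add(it.next());
    }
}
```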

- Request data: `curl -XGET http://localhost:4567/titles/counts`
=> received response
```
{
    "data": [
        {
            "key": "fear of a black planet",
            "value": 414
        },

        ...

        {
            "key": "curtain call - the hits",
            "value": 431
        }
    ],
    "status": "SUCCESS"
}
```

- shutdown kafka

- Request data: `curl -XGET http://localhost:4567/titles`
=> received response, same as before, which seems OK since we are
querying the local store
```
{
    "data": [
        {
            "key": 1,
            "value": "Fresh Fruit For Rotting Vegetables"
        },

        ...

        {
            "key": 10,
            "value": "Fear Of A Black Planet"
        }
    ],
    "status": "SUCCESS"
}
```
- Request data: `curl -XGET http://localhost:4567/titles/counts`
=> received response, still as expected
```
{
    "data": [
        {
            "key": "fear of a black planet",
            "value": 414
        },

        ...

        {
            "key": "curtain call - the hits",
            "value": 431
        }
    ],
    "status": "SUCCESS"
}
```

- restart kafka

- Request data: `curl -XGET http://localhost:4567/titles`
=> received response
```
{
    "data": [],
    "status": "SUCCESS"
}
```

- Request data: `curl -XGET http://localhost:4567/titles/counts`
=> same here, received response
```
{
    "data": [],
    "status": "SUCCESS"
}
```
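
My guess is that after the restart the stores are rebuilt or reassigned, so the service
should probably retry rather than answer with an empty result. A rough sketch of what I
have in mind (this is not what the code currently does):
```
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

// Sketch: retry the store lookup while the store is being (re)initialised or migrated.
// InvalidStateStoreException is what Streams throws while a store is not queryable.
static ReadOnlyKeyValueStore<String, Long> waitForStore(KafkaStreams streams, String name)
        throws InterruptedException {
    while (true) {
        try {
            return streams.store(name, QueryableStoreTypes.<String, Long>keyValueStore());
        } catch (InvalidStateStoreException e) {
            Thread.sleep(100); // store not ready yet, retry
        }
    }
}
```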

I also see this entry in the Streams application logs:
```
[error] (kafka-streams-test-bbc3ca50-57b7-434b-a55b-48ca855a7758-StreamThread-1)
org.apache.kafka.streams.errors.StreamsException: task [1_0] exception caught when producing
org.apache.kafka.streams.errors.StreamsException: task [1_0] exception caught when producing
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:136)
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:144)
        at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:283)
        at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:264)
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
        at org.apache.kafka.streams.processor.internals.StreamTask.commitImpl(StreamTask.java:259)
        at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:253)
        at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:815)
        at org.apache.kafka.streams.processor.internals.StreamThread.access$2800(StreamThread.java:73)
        at org.apache.kafka.streams.processor.internals.StreamThread$2.apply(StreamThread.java:797)
        at org.apache.kafka.streams.processor.internals.StreamThread.performOnStreamTasks(StreamThread.java:1448)
        at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:789)
        at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:778)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:567)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for kafka-streams-test-events-counts-repartition-1: 30046 ms has passed since batch creation plus linger time
[trace] Stack trace suppressed: run last compile:runMain for the full output.
the state store, events-counts, may have migrated to another instance.
```

Even if a rebalance occurred after restarting the cluster, since I have only
one consumer instance, I thought it would still be assigned all partitions,
so the store should remain available.
What am I missing here?
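
To check whether the store was indeed reassigned, I will also dump the metadata for that
store; a quick sketch (the store name "events-counts" is taken from the log above):
```
// Sketch: list which instances currently host the "events-counts" store.
for (StreamsMetadata meta : streams.allMetadataForStore("events-counts")) {
    System.out.println(String.format("%s:%d -> partitions %s",
        meta.host(), meta.port(), meta.topicPartitions()));
}
```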

Thank you for your answers.

--
Regards,
Patrice
