Hi All,
We sometimes get errors like this:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
completed since the group has already rebalanced and assigned the
partitions to another member. This means that the time between subsequent
calls to poll() was longer than the configured max.poll.interval.ms, which
typically implies that the poll loop is spending too much time message
processing. You can address this either by increasing the session timeout
or by reducing the maximum size of batches returned in poll() with
max.poll.records.
    at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:600)
~[kafka-clients-0.10.1.1.jar:na]
    at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:498)
~[kafka-clients-0.10.1.1.jar:na]
    at
org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1104)
~[kafka-clients-0.10.1.1.jar:na]
    at
org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:289)
~[kafka-streams-0.10.1.1.jar:na]

and then our Streams application crashes.

Now the cause of this error is clear: we need to find out where our
processing is spending that much time. Then we can either speed up the
processing so it stays within the defaults, or change the defaults.
Since we are using the high-level Streams DSL, is there any way to find
out where all that time is being spent?
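
If changing the defaults turns out to be the fix, the relevant knobs are
plain consumer configs that can be passed through the Streams properties.
A minimal sketch (the values below are illustrative assumptions, not
recommendations):

```java
import java.util.Properties;

public class TuningSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Fewer records per poll() means less processing between polls
        // (the consumer default is 500 as of 0.10.1).
        props.put("max.poll.records", "100");
        // Allow more time between successive poll() calls
        // (default 300000 ms, i.e. 5 minutes, as of 0.10.1).
        props.put("max.poll.interval.ms", "600000");
        // These would be added alongside the usual application.id,
        // bootstrap.servers, etc. and handed to StreamsConfig/KafkaStreams.
        System.out.println(props);
    }
}
```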

Our topology looks like this:
input
.groupByKey()
.aggregate(new Initializer<SortedSet<T>>() {
    public SortedSet<T> apply() {
        return new TreeSet<T>();
    }
}, new Aggregator<String, T, SortedSet<T>>() {
    public SortedSet<T> apply(String key, T value, SortedSet<T> aggregate) {
        aggregate.add(value);
        return aggregate;
    }
}, TimeWindows.of(...).advanceBy(...).until(...), valueSerde, "key-table")
.mapValues(new ValueMapper<SortedSet<T>, SortedSet<U>>() {
    public SortedSet<U> apply(SortedSet<T> t) {
        SortedSet<U> u = new TreeSet<U>();
        ...
        return u;
    }
})
.foreach(new ForeachAction<Windowed<String>, SortedSet<U>>() {
    public void apply(Windowed<String> key, SortedSet<U> nodes) {
        ...
    }
});
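
One idea I could imagine (this is my own sketch, not a Streams facility):
give each step a small timer and wrap the body of its apply() with it, so
every Aggregator/ValueMapper/ForeachAction reports its share of the time.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// Hypothetical helper: accumulates wall-clock time spent inside one
// topology step so the per-step share of the poll interval can be logged.
class StepTimer {
    private final String name;
    private final AtomicLong totalNanos = new AtomicLong();
    private final AtomicLong calls = new AtomicLong();

    StepTimer(String name) { this.name = name; }

    // Wrap a step body with time(...) to record how long it took.
    <R> R time(Supplier<R> body) {
        long start = System.nanoTime();
        try {
            return body.get();
        } finally {
            totalNanos.addAndGet(System.nanoTime() - start);
            calls.incrementAndGet();
        }
    }

    long totalMillis() { return TimeUnit.NANOSECONDS.toMillis(totalNanos.get()); }

    long callCount() { return calls.get(); }

    @Override
    public String toString() {
        return name + ": " + callCount() + " calls, " + totalMillis() + " ms";
    }
}
```

Inside the aggregator, for example, the body would become something like
`return aggTimer.time(() -> { aggregate.add(value); return aggregate; });`,
with each step's timer logged periodically.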

So where and how can I add some logging to measure the time spent between
poll() calls, and to see how that time is divided among the various steps
in this topology?
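
Since Streams calls poll() internally, there seems to be no direct DSL hook
for the inter-poll time; a rough proxy I have been considering (purely an
assumption on my part) is logging the gap between successive records seen
by the first step:

```java
// Hypothetical helper: reports the wall-clock gap between successive
// records reaching one step, as a rough proxy for poll-loop progress.
class RecordGapLogger {
    private long lastNanos = -1L;

    // Returns the gap in ms since the previous record (0 for the first one).
    long gapMillis() {
        long now = System.nanoTime();
        long gapMs = (lastNanos < 0) ? 0 : (now - lastNanos) / 1_000_000L;
        lastNanos = now;
        return gapMs;
    }
}
```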

Thanks
Sachin
