Hi All,

We sometimes get errors like this:

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:600) ~[kafka-clients-0.10.1.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:498) ~[kafka-clients-0.10.1.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1104) ~[kafka-clients-0.10.1.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:289) ~[kafka-streams-0.10.1.1.jar:na]
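If we end up relaxing the limits rather than speeding up our processing, the two settings named in the error message can be supplied through the consumer/Streams config. A minimal sketch of what we might pass in (the class name and the property values here are placeholders I made up for illustration, not recommendations):

```java
import java.util.Properties;

public class StreamsTuning {

    // Hypothetical helper: builds the extra consumer properties mentioned in
    // the CommitFailedException message. Values below are placeholders.
    public static Properties tunedConfig() {
        Properties props = new Properties();
        // Allow more time between successive poll() calls.
        props.put("max.poll.interval.ms", "600000");
        // Fetch fewer records per poll() so each batch finishes sooner.
        props.put("max.poll.records", "100");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(tunedConfig());
    }
}
```

These properties would be merged into the same Properties object we already hand to KafkaStreams, alongside our application id and bootstrap servers.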
And then our streams application crashes. The error itself is straightforward: we need to find out where our processing is spending that much time. Then we can either fix the processing so it stays within the defaults, or change the defaults.

Since we are using the high-level Streams DSL, is there any way to know where we would be spending that much time? Our topology is like this:

input
    .groupByKey()
    .aggregate(new Initializer<SortedSet<T>>() {
        public SortedSet<T> apply() {
            return new TreeSet<T>();
        }
    }, new Aggregator<String, T, SortedSet<T>>() {
        public SortedSet<T> apply(String key, T value, SortedSet<T> aggregate) {
            aggregate.add(value);
            return aggregate;
        }
    }, TimeWindows.of(...).advanceBy(...).until(...), valueSerde, "key-table")
    .mapValues(new ValueMapper<SortedSet<T>, SortedSet<U>>() {
        public SortedSet<U> apply(SortedSet<T> t) {
            SortedSet<U> u = new TreeSet<U>();
            ...
            return u;
        }
    })
    .foreach(new ForeachAction<Windowed<String>, SortedSet<U>>() {
        public void apply(Windowed<String> key, SortedSet<U> nodes) {
            ...
        }
    });

So where and how can I add some logging to find out the time spent between poll requests, and how that time is divided among the various steps in this topology?

Thanks
Sachin
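One thing I have been considering, in case it helps frame the question: wrapping the body of each step's callback (the Aggregator.apply, ValueMapper.apply, and ForeachAction.apply above) in a small timing decorator, so each step accumulates its own wall-clock cost and can log a summary. A hypothetical sketch (TimedStep is my own name, not a Streams API; it is deliberately Kafka-free so the same wrapper can sit inside any of the callbacks):

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;

// Hypothetical timing decorator: wraps one processing step of the topology
// and accumulates wall-clock nanoseconds across calls.
public class TimedStep<I, O> {
    private final String name;
    private final BiFunction<String, I, O> step;
    private final AtomicLong totalNanos = new AtomicLong();
    private final AtomicLong calls = new AtomicLong();

    public TimedStep(String name, BiFunction<String, I, O> step) {
        this.name = name;
        this.step = step;
    }

    // Called from inside the step's apply(); measures this invocation.
    public O apply(String key, I value) {
        long start = System.nanoTime();
        try {
            return step.apply(key, value);
        } finally {
            totalNanos.addAndGet(System.nanoTime() - start);
            calls.incrementAndGet();
        }
    }

    // Summary line to dump to the logs periodically.
    public String report() {
        return name + ": " + calls.get() + " calls, "
                + totalNanos.get() / 1_000_000 + " ms total";
    }

    public static void main(String[] args) {
        TimedStep<Integer, Integer> square =
                new TimedStep<>("square", (k, v) -> v * v);
        for (int i = 0; i < 1000; i++) {
            square.apply("key", i);
        }
        System.out.println(square.report());
    }
}
```

But this only measures inside each callback; it would not show time spent between poll() calls in the Streams runtime itself, which is what I am really after.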