[ https://issues.apache.org/jira/browse/KAFKA-5154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15993902#comment-15993902 ]
Lukas Gemela edited comment on KAFKA-5154 at 5/2/17 10:33 PM:
--------------------------------------------------------------

Hi,

we have a Kafka Streams app that consumes from one topic, aggregates data via an internal topic (using an in-memory logged store), and then writes the results to an output topic.

The input topic has, I believe, 40 partitions, so the internal topic has 40 partitions as well. We run 8 parallel nodes of this app at all times.

We are using the low-level Kafka Streams API. None of our code is blocking or asynchronous.

I'm not really sure what caused the rebalance, as the logs from all nodes are now lost. I only saw a couple of IllegalStateExceptions on other nodes, coming from the Kafka Streams stack, and I'm not even sure whether they were thrown around the same time as the NPE on this node.

Please also note that in our exception handler we try to stop the Kafka Streams app with our own timeout and then schedule a Spring task to start it again. As far as I can tell, calling boolean isClosed = streams.close(10, TimeUnit.SECONDS) never actually succeeds, and we always get isClosed = false.

Hope it helps!

Lukas
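For readers following the thread, the stop-then-restart handler described in the comment might look roughly like the sketch below. This is not the reporter's code. `StreamsApp` and `RestartingHandler` are hypothetical names, and `StreamsApp` is only a stand-in for the real calls, so the example compiles without Kafka on the classpath. Running the close on a separate thread instead of the failing stream thread is a design choice made for this sketch, and the comment does not say on which thread the reporter calls `close`.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the Kafka Streams calls the handler needs. */
interface StreamsApp {
    /** Would delegate to KafkaStreams.close(timeout, unit); true if shutdown finished in time. */
    boolean stop(long timeout, TimeUnit unit);

    /** Would build and start a new KafkaStreams instance (assumption: a closed one is not reused). */
    void startFresh();
}

/** Stops the app with a timeout, then schedules a restart, both off the failing thread. */
final class RestartingHandler implements Thread.UncaughtExceptionHandler {
    private final StreamsApp app;
    private final ScheduledExecutorService scheduler;
    private final long closeTimeoutSeconds;
    private final long restartDelayMillis;

    RestartingHandler(StreamsApp app, ScheduledExecutorService scheduler,
                      long closeTimeoutSeconds, long restartDelayMillis) {
        this.app = app;
        this.scheduler = scheduler;
        this.closeTimeoutSeconds = closeTimeoutSeconds;
        this.restartDelayMillis = restartDelayMillis;
    }

    @Override
    public void uncaughtException(Thread thread, Throwable error) {
        // Hand the work to the scheduler and return right away, so the thread
        // that threw is free to terminate while the shutdown is in progress.
        scheduler.execute(() -> {
            boolean closed = app.stop(closeTimeoutSeconds, TimeUnit.SECONDS);
            System.err.println("Stopped after " + error + " in " + thread.getName()
                    + ", closed=" + closed);
            // Restart after a delay regardless of whether the close completed in time.
            scheduler.schedule(app::startFresh, restartDelayMillis, TimeUnit.MILLISECONDS);
        });
    }
}
```

With a real application, an instance of this handler would be registered through `KafkaStreams.setUncaughtExceptionHandler(...)`, with `stop` wrapping `streams.close(10, TimeUnit.SECONDS)` as in the comment above.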
> Kafka Streams throws NPE during rebalance
> -----------------------------------------
>
>                 Key: KAFKA-5154
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5154
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.0
>            Reporter: Lukas Gemela
>
> please see attached log, Kafka streams throws NullPointerException during rebalance, which is caught by our custom exception handler
> {noformat}
> 2017-04-30T17:44:17,675 INFO kafka-coordinator-heartbeat-thread | hades org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead() @618 - Marking the coordinator 10.210.200.144:9092 (id: 2147483644 rack: null) dead for group hades
> 2017-04-30T17:44:27,395 INFO StreamThread-1 org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onSuccess() @573 - Discovered coordinator 10.210.200.144:9092 (id: 2147483644 rack: null) for group hades.
> 2017-04-30T17:44:27,941 INFO StreamThread-1 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare() @393 - Revoking previously assigned partitions [poseidonIncidentFeed-27, poseidonIncidentFeed-29, poseidonIncidentFeed-30, poseidonIncidentFeed-18] for group hades
> 2017-04-30T17:44:27,947 INFO StreamThread-1 org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest() @407 - (Re-)joining group hades
> 2017-04-30T17:44:48,468 INFO StreamThread-1 org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest() @407 - (Re-)joining group hades
> 2017-04-30T17:44:53,628 INFO StreamThread-1 org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest() @407 - (Re-)joining group hades
> 2017-04-30T17:45:09,587 INFO StreamThread-1 org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest() @407 - (Re-)joining group hades
> 2017-04-30T17:45:11,961 INFO StreamThread-1 org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onSuccess() @375 - Successfully joined group hades with generation 99
> 2017-04-30T17:45:13,126 INFO StreamThread-1 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete() @252 - Setting newly assigned partitions [poseidonIncidentFeed-11, poseidonIncidentFeed-27, poseidonIncidentFeed-25, poseidonIncidentFeed-29, poseidonIncidentFeed-19, poseidonIncidentFeed-18] for group hades
> 2017-04-30T17:46:37,254 INFO kafka-coordinator-heartbeat-thread | hades org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead() @618 - Marking the coordinator 10.210.200.144:9092 (id: 2147483644 rack: null) dead for group hades
> 2017-04-30T18:04:25,993 INFO StreamThread-1 org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onSuccess() @573 - Discovered coordinator 10.210.200.144:9092 (id: 2147483644 rack: null) for group hades.
> 2017-04-30T18:04:29,401 INFO StreamThread-1 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare() @393 - Revoking previously assigned partitions [poseidonIncidentFeed-11, poseidonIncidentFeed-27, poseidonIncidentFeed-25, poseidonIncidentFeed-29, poseidonIncidentFeed-19, poseidonIncidentFeed-18] for group hades
> 2017-04-30T18:05:10,877 INFO StreamThread-1 org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendJoinGroupRequest() @407 - (Re-)joining group hades
> 2017-05-01T00:01:55,707 INFO StreamThread-1 org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead() @618 - Marking the coordinator 10.210.200.144:9092 (id: 2147483644 rack: null) dead for group hades
> 2017-05-01T00:01:59,027 INFO StreamThread-1 org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onSuccess() @573 - Discovered coordinator 10.210.200.144:9092 (id: 2147483644 rack: null) for group hades.
> 2017-05-01T00:01:59,031 ERROR StreamThread-1 org.apache.kafka.streams.processor.internals.StreamThread.run() @376 - stream-thread [StreamThread-1] Streams application error during processing:
> java.lang.NullPointerException
>       at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:619) ~[kafka-streams-0.10.2.0.jar!/:?]
>       at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368) [kafka-streams-0.10.2.0.jar!/:?]
> 2017-05-01T00:02:00,038 INFO StreamThread-1 org.apache.kafka.clients.producer.KafkaProducer.close() @689 - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> 2017-05-01T00:02:00,949 WARN StreamThread-1 org.apache.kafka.streams.processor.internals.StreamThread.setState() @160 - Unexpected state transition from PARTITIONS_REVOKED to NOT_RUNNING
> 2017-05-01T00:02:00,951 ERROR StreamThread-1 com.williamhill.trading.platform.hades.kafka.KafkaStreamManager.uncaughtException() @104 - UncaughtException in thread StreamThread-1, stopping kafka streams
> java.lang.NullPointerException
>       at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:619) ~[kafka-streams-0.10.2.0.jar!/:?]
>       at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368) ~[kafka-streams-0.10.2.0.jar!/:?]
> 2017-05-01T00:02:01,076 WARN kafka-streams-close-thread org.apache.kafka.streams.processor.internals.StreamThread.setState() @160 - Unexpected state transition from NOT_RUNNING to PENDING_SHUTDOWN
> {noformat}

--
This message was sent by Atlassian JIRA (v6.3.15#6346)