[ https://issues.apache.org/jira/browse/KAFKA-5226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16025404#comment-16025404 ]
Bill Bejeck commented on KAFKA-5226:
------------------------------------

I've been able to reproduce the problem. The issue is not with the `SourceNodeRecordDeserializer`; that's just the symptom.

My steps to reproduce:

1. Start 3 instances of a simple streams application with regex subscription.
2. Add topics with names matching the pattern.

It doesn't always happen, so I needed to make a few attempts.

The issue is that sometimes, when a topic is added while a streams app with regex subscription is running, the new topic does not get passed into the `StreamPartitionAssignor.subscribe` method, but it will show up during a rebalance. In the `StreamPartitionAssignor.subscribe` method we update the topology with any topics matching the subscription regex. When we go to create the stream task for the new assignment, we don't have a corresponding source node for the topic, causing the error.

Here are the relevant log lines (logs from all 3 applications attached) from reproducing the error by adding a topic named `foo.Trigger`. The `StreamPartitionAssignor` only gets passed the topics `us.Trigger` and `demo.Trigger`, but during the rebalance the following assignments are passed to the `ConsumerRebalanceListener.onPartitionsAssigned` method: `us.Trigger`, `demo.Trigger` and `foo.Trigger`.

[2017-05-24 15:30:14,367] DEBUG stream-thread [reballancing-test-df1e3cfa-5620-4a81-89bf-88ed14115321-StreamThread-1] found [us.Trigger, demo.Trigger] topics possibly matching regex (org.apache.kafka.streams.processor.internals.StreamPartitionAssignor:258)
....
[2017-05-24 15:30:14,474] INFO stream-thread [reballancing-test-df1e3cfa-5620-4a81-89bf-88ed14115321-StreamThread-1] at state PARTITIONS_REVOKED: new partitions [foo.Trigger-1, us.Trigger-1, demo.Trigger-1] assigned at the end of consumer rebalance.
.....
[2017-05-24 15:30:14,478] INFO stream-thread [reballancing-test-df1e3cfa-5620-4a81-89bf-88ed14115321-StreamThread-1] Created active task 0_1 with assigned partitions [foo.Trigger-1, us.Trigger-1, demo.Trigger-1] (org.apache.kafka.streams.processor.internals.StreamThread:1240)

> NullPointerException (NPE) in SourceNodeRecordDeserializer.deserialize
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-5226
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5226
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.1
>         Environment: 64-bit Amazon Linux, JDK8
>            Reporter: Ian Springer
>            Assignee: Bill Bejeck
>         Attachments: kafka.log, streamsAppOne.log, streamsAppThree.log, streamsAppTwo.log
>
>
> I saw the following NPE in our Kafka Streams app, which has 3 nodes running
> on 3 separate machines. Out of hundreds of messages processed, the NPE only
> occurred twice. I am not sure of the cause, so I am unable to reproduce it.
> I'm hoping the Kafka Streams team can guess the cause based on the stack
> trace. If I can provide any additional details about our app, please let me
> know.
>
> {code}
> INFO 2017-05-10 02:58:26,021 org.apache.kafka.common.utils.AppInfoParser Kafka version : 0.10.2.1
> INFO 2017-05-10 02:58:26,021 org.apache.kafka.common.utils.AppInfoParser Kafka commitId : e89bffd6b2eff799
> INFO 2017-05-10 02:58:26,031 o.s.context.support.DefaultLifecycleProcessor Starting beans in phase 0
> INFO 2017-05-10 02:58:26,075 org.apache.kafka.streams.KafkaStreams stream-client [evergage-app-bd9c9868-4b9b-4d2e-850f-9b5bec1fc0a9] State transition from CREATED to RUNNING.
> INFO 2017-05-10 02:58:26,075 org.apache.kafka.streams.KafkaStreams stream-client [evergage-app-bd9c9868-4b9b-4d2e-850f-9b5bec1fc0a9] Started Kafka Stream process
> INFO 2017-05-10 02:58:26,086 o.a.k.c.consumer.internals.AbstractCoordinator Discovered coordinator p1kaf1.prod.apptegic.com:9092 (id: 2147482646 rack: null) for group evergage-app.
> INFO 2017-05-10 02:58:26,126 o.a.k.c.consumer.internals.ConsumerCoordinator Revoking previously assigned partitions [] for group evergage-app
> INFO 2017-05-10 02:58:26,126 org.apache.kafka.streams.KafkaStreams stream-client [evergage-app-bd9c9868-4b9b-4d2e-850f-9b5bec1fc0a9] State transition from RUNNING to REBALANCING.
> INFO 2017-05-10 02:58:26,127 o.a.k.c.consumer.internals.AbstractCoordinator (Re-)joining group evergage-app
> INFO 2017-05-10 02:58:27,712 o.a.k.c.consumer.internals.AbstractCoordinator Successfully joined group evergage-app with generation 18
> INFO 2017-05-10 02:58:27,716 o.a.k.c.consumer.internals.ConsumerCoordinator Setting newly assigned partitions [us.app.Trigger-0] for group evergage-app
> INFO 2017-05-10 02:58:27,716 org.apache.kafka.streams.KafkaStreams stream-client [evergage-app-bd9c9868-4b9b-4d2e-850f-9b5bec1fc0a9] State transition from REBALANCING to REBALANCING.
> INFO 2017-05-10 02:58:27,729 o.a.kafka.streams.processor.internals.StreamTask task [0_0] Initializing state stores
> INFO 2017-05-10 02:58:27,731 o.a.kafka.streams.processor.internals.StreamTask task [0_0] Initializing processor nodes of the topology
> INFO 2017-05-10 02:58:27,742 org.apache.kafka.streams.KafkaStreams stream-client [evergage-app-bd9c9868-4b9b-4d2e-850f-9b5bec1fc0a9] State transition from REBALANCING to RUNNING.
> [14 hours pass...]
> INFO 2017-05-10 16:21:27,476 o.a.k.c.consumer.internals.ConsumerCoordinator Revoking previously assigned partitions [us.app.Trigger-0] for group evergage-app
> INFO 2017-05-10 16:21:27,477 org.apache.kafka.streams.KafkaStreams stream-client [evergage-app-bd9c9868-4b9b-4d2e-850f-9b5bec1fc0a9] State transition from RUNNING to REBALANCING.
> INFO 2017-05-10 16:21:27,482 o.a.k.c.consumer.internals.AbstractCoordinator (Re-)joining group evergage-app
> INFO 2017-05-10 16:21:27,489 o.a.k.c.consumer.internals.AbstractCoordinator Successfully joined group evergage-app with generation 19
> INFO 2017-05-10 16:21:27,489 o.a.k.c.consumer.internals.ConsumerCoordinator Setting newly assigned partitions [us.app.Trigger-0] for group evergage-app
> INFO 2017-05-10 16:21:27,489 org.apache.kafka.streams.KafkaStreams stream-client [evergage-app-bd9c9868-4b9b-4d2e-850f-9b5bec1fc0a9] State transition from REBALANCING to REBALANCING.
> INFO 2017-05-10 16:21:27,489 o.a.kafka.streams.processor.internals.StreamTask task [0_0] Initializing processor nodes of the topology
> INFO 2017-05-10 16:21:27,493 org.apache.kafka.streams.KafkaStreams stream-client [evergage-app-bd9c9868-4b9b-4d2e-850f-9b5bec1fc0a9] State transition from REBALANCING to RUNNING.
> INFO 2017-05-10 16:21:30,584 o.a.k.c.consumer.internals.ConsumerCoordinator Revoking previously assigned partitions [us.app.Trigger-0] for group evergage-app
> INFO 2017-05-10 16:21:30,584 org.apache.kafka.streams.KafkaStreams stream-client [evergage-app-bd9c9868-4b9b-4d2e-850f-9b5bec1fc0a9] State transition from RUNNING to REBALANCING.
> INFO 2017-05-10 16:21:30,588 o.a.k.c.consumer.internals.AbstractCoordinator (Re-)joining group evergage-app
> INFO 2017-05-10 16:21:30,593 o.a.k.c.consumer.internals.AbstractCoordinator Successfully joined group evergage-app with generation 20
> INFO 2017-05-10 16:21:30,594 o.a.k.c.consumer.internals.ConsumerCoordinator Setting newly assigned partitions [demo.retail.Trigger-0, us.app.Trigger-0] for group evergage-app
> INFO 2017-05-10 16:21:30,594 org.apache.kafka.streams.KafkaStreams stream-client [evergage-app-bd9c9868-4b9b-4d2e-850f-9b5bec1fc0a9] State transition from REBALANCING to REBALANCING.
> INFO 2017-05-10 16:21:30,595 o.a.kafka.streams.processor.internals.StreamTask task [0_0] Initializing state stores
> INFO 2017-05-10 16:21:30,596 o.a.kafka.streams.processor.internals.StreamTask task [0_0] Initializing processor nodes of the topology
> INFO 2017-05-10 16:21:30,602 org.apache.kafka.streams.KafkaStreams stream-client [evergage-app-bd9c9868-4b9b-4d2e-850f-9b5bec1fc0a9] State transition from REBALANCING to RUNNING.
> INFO 2017-05-10 16:21:30,698 org.apache.kafka.clients.producer.KafkaProducer Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> WARN 2017-05-10 16:21:30,703 o.a.kafka.streams.processor.internals.StreamThread stream-thread [StreamThread-1] Unexpected state transition from RUNNING to NOT_RUNNING.
> ERROR 2017-05-10 16:21:30,704 c.a.a.web.server.UncaughtExceptionLoggingListener Uncaught exception in thread [StreamThread-1]
> org.apache.kafka.streams.errors.StreamsException: Failed to deserialize key for record. topic=demo.retail.Trigger, partition=0, offset=0
>     at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:38)
>     at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:85)
>     at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
>     at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:158)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:605)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> Caused by: java.lang.NullPointerException: null
>     at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:36)
>     ... 5 common frames omitted
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
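
For readers following the analysis in the comment, the failure mode can be modeled with a small self-contained sketch. This is not Kafka Streams code: the class `MissingSourceNodeSketch`, its methods, and the string stand-in for a source node are all hypothetical, and the regex `.*\.Trigger` is only an assumption about what such a subscription might look like. It shows how a topic that was never seen when the topology was updated, but later appears in the assignment, ends up with no source node, so an unguarded lookup throws a `NullPointerException`.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;

// Simplified, hypothetical model of the failure; none of these names are
// Kafka Streams classes or methods.
public class MissingSourceNodeSketch {

    // Stand-in for the topology's topic -> source node mapping.
    private final Map<String, String> sourceNodeByTopic = new HashMap<>();
    private final Pattern subscription;

    public MissingSourceNodeSketch(String regex) {
        this.subscription = Pattern.compile(regex);
    }

    // Models the topology update: only topics passed in here get a source node.
    public void updateTopology(List<String> topicsSeenAtSubscribe) {
        for (String topic : topicsSeenAtSubscribe) {
            if (subscription.matcher(topic).matches()) {
                sourceNodeByTopic.put(topic, "source-for-" + topic);
            }
        }
    }

    // Returns the assigned topics that have no source node in the model.
    public Set<String> topicsWithoutSourceNode(List<String> assignedTopics) {
        Set<String> missing = new TreeSet<>();
        for (String topic : assignedTopics) {
            if (!sourceNodeByTopic.containsKey(topic)) {
                missing.add(topic);
            }
        }
        return missing;
    }

    // Unguarded lookup: throws NullPointerException when the topic is unknown.
    public int sourceNameLength(String topic) {
        return sourceNodeByTopic.get(topic).length();
    }

    public static void main(String[] args) {
        MissingSourceNodeSketch model = new MissingSourceNodeSketch(".*\\.Trigger");
        // The update step only sees two of the three topics.
        model.updateTopology(Arrays.asList("us.Trigger", "demo.Trigger"));
        // The assignment delivered at the end of the rebalance includes all three.
        List<String> assigned = Arrays.asList("foo.Trigger", "us.Trigger", "demo.Trigger");
        System.out.println("No source node for: " + model.topicsWithoutSourceNode(assigned));
    }
}
```

In this model, a check like `topicsWithoutSourceNode` at assignment time would surface the mismatch before any record is deserialized. Whether the actual fix takes that shape is not stated in this thread.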