[ 
https://issues.apache.org/jira/browse/KAFKA-5226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16025404#comment-16025404
 ] 

Bill Bejeck commented on KAFKA-5226:
------------------------------------

I've been able to reproduce the problem.  The issue is not with the 
`SourceNodeRecordDeserializer`; that's just the symptom.  

My steps to reproduce:
   1. Start 3 instances of a simple streams application with regex subscription.
   2. Add topics with names matching the pattern.
  
It doesn't always happen, so I needed to make a few attempts.

The issue is that sometimes, when a topic is added while a streams app with 
regex subscription is running, the new topic does not get passed into the 
`StreamPartitionAssignor.subscribe` method, but it does show up during a 
rebalance.  The `StreamPartitionAssignor.subscribe` method is where we update 
the topology with any topics matching the subscription regex, so the topology 
never learns about the new topic.  When we then create the stream task for the 
new assignment, there is no corresponding source node for the topic, which 
causes the NPE.

Here are the relevant log lines (logs from all 3 applications are attached) from 
my reproducing the error by adding a topic named `foo.Trigger`.  The 
`StreamPartitionAssignor` only gets passed the topics `us.Trigger` and 
`demo.Trigger`, but during the rebalance the following assignments are 
passed to the `ConsumerRebalanceListener.onPartitionsAssigned` method: 
`us.Trigger`, `demo.Trigger` and `foo.Trigger`.

[2017-05-24 15:30:14,367] DEBUG stream-thread 
[reballancing-test-df1e3cfa-5620-4a81-89bf-88ed14115321-StreamThread-1] found 
[us.Trigger, demo.Trigger] topics possibly matching regex 
(org.apache.kafka.streams.processor.internals.StreamPartitionAssignor:258)
....
[2017-05-24 15:30:14,474] INFO stream-thread 
[reballancing-test-df1e3cfa-5620-4a81-89bf-88ed14115321-StreamThread-1] at 
state PARTITIONS_REVOKED: new partitions [foo.Trigger-1, us.Trigger-1, 
demo.Trigger-1] assigned at the end of consumer rebalance.
.....
[2017-05-24 15:30:14,478] INFO stream-thread 
[reballancing-test-df1e3cfa-5620-4a81-89bf-88ed14115321-StreamThread-1] Created 
active task 0_1 with assigned partitions [foo.Trigger-1, us.Trigger-1, 
demo.Trigger-1] (org.apache.kafka.streams.processor.internals.StreamThread:1240)



> NullPointerException (NPE) in SourceNodeRecordDeserializer.deserialize
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-5226
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5226
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.1
>         Environment: 64-bit Amazon Linux, JDK8
>            Reporter: Ian Springer
>            Assignee: Bill Bejeck
>         Attachments: kafka.log, streamsAppOne.log, streamsAppThree.log, 
> streamsAppTwo.log
>
>
> I saw the following NPE in our Kafka Streams app, which has 3 nodes running 
> on 3 separate machines. Out of hundreds of messages processed, the NPE only 
> occurred twice. I am not sure of the cause, so I am unable to reproduce it. 
> I'm hoping the Kafka Streams team can guess the cause based on the stack 
> trace. If I can provide any additional details about our app, please let me 
> know.
>  
> {code}
> INFO  2017-05-10 02:58:26,021 org.apache.kafka.common.utils.AppInfoParser  
> Kafka version : 0.10.2.1
> INFO  2017-05-10 02:58:26,021 org.apache.kafka.common.utils.AppInfoParser  
> Kafka commitId : e89bffd6b2eff799
> INFO  2017-05-10 02:58:26,031 o.s.context.support.DefaultLifecycleProcessor  
> Starting beans in phase 0
> INFO  2017-05-10 02:58:26,075 org.apache.kafka.streams.KafkaStreams  
> stream-client [evergage-app-bd9c9868-4b9b-4d2e-850f-9b5bec1fc0a9] State 
> transition from CREATED to RUNNING.
> INFO  2017-05-10 02:58:26,075 org.apache.kafka.streams.KafkaStreams  
> stream-client [evergage-app-bd9c9868-4b9b-4d2e-850f-9b5bec1fc0a9] Started 
> Kafka Stream process
> INFO  2017-05-10 02:58:26,086 o.a.k.c.consumer.internals.AbstractCoordinator  
> Discovered coordinator p1kaf1.prod.apptegic.com:9092 (id: 2147482646 rack: 
> null) for group evergage-app.
> INFO  2017-05-10 02:58:26,126 o.a.k.c.consumer.internals.ConsumerCoordinator  
> Revoking previously assigned partitions [] for group evergage-app
> INFO  2017-05-10 02:58:26,126 org.apache.kafka.streams.KafkaStreams  
> stream-client [evergage-app-bd9c9868-4b9b-4d2e-850f-9b5bec1fc0a9] State 
> transition from RUNNING to REBALANCING.
> INFO  2017-05-10 02:58:26,127 o.a.k.c.consumer.internals.AbstractCoordinator  
> (Re-)joining group evergage-app
> INFO  2017-05-10 02:58:27,712 o.a.k.c.consumer.internals.AbstractCoordinator  
> Successfully joined group evergage-app with generation 18
> INFO  2017-05-10 02:58:27,716 o.a.k.c.consumer.internals.ConsumerCoordinator  
> Setting newly assigned partitions [us.app.Trigger-0] for group evergage-app
> INFO  2017-05-10 02:58:27,716 org.apache.kafka.streams.KafkaStreams  
> stream-client [evergage-app-bd9c9868-4b9b-4d2e-850f-9b5bec1fc0a9] State 
> transition from REBALANCING to REBALANCING.
> INFO  2017-05-10 02:58:27,729 
> o.a.kafka.streams.processor.internals.StreamTask  task [0_0] Initializing 
> state stores
> INFO  2017-05-10 02:58:27,731 
> o.a.kafka.streams.processor.internals.StreamTask  task [0_0] Initializing 
> processor nodes of the topology
> INFO  2017-05-10 02:58:27,742 org.apache.kafka.streams.KafkaStreams  
> stream-client [evergage-app-bd9c9868-4b9b-4d2e-850f-9b5bec1fc0a9] State 
> transition from REBALANCING to RUNNING.
> [14 hours pass...]
> INFO  2017-05-10 16:21:27,476 o.a.k.c.consumer.internals.ConsumerCoordinator  
> Revoking previously assigned partitions [us.app.Trigger-0] for group 
> evergage-app
> INFO  2017-05-10 16:21:27,477 org.apache.kafka.streams.KafkaStreams  
> stream-client [evergage-app-bd9c9868-4b9b-4d2e-850f-9b5bec1fc0a9] State 
> transition from RUNNING to REBALANCING.
> INFO  2017-05-10 16:21:27,482 o.a.k.c.consumer.internals.AbstractCoordinator  
> (Re-)joining group evergage-app
> INFO  2017-05-10 16:21:27,489 o.a.k.c.consumer.internals.AbstractCoordinator  
> Successfully joined group evergage-app with generation 19
> INFO  2017-05-10 16:21:27,489 o.a.k.c.consumer.internals.ConsumerCoordinator  
> Setting newly assigned partitions [us.app.Trigger-0] for group evergage-app
> INFO  2017-05-10 16:21:27,489 org.apache.kafka.streams.KafkaStreams  
> stream-client [evergage-app-bd9c9868-4b9b-4d2e-850f-9b5bec1fc0a9] State 
> transition from REBALANCING to REBALANCING.
> INFO  2017-05-10 16:21:27,489 
> o.a.kafka.streams.processor.internals.StreamTask  task [0_0] Initializing 
> processor nodes of the topology
> INFO  2017-05-10 16:21:27,493 org.apache.kafka.streams.KafkaStreams  
> stream-client [evergage-app-bd9c9868-4b9b-4d2e-850f-9b5bec1fc0a9] State 
> transition from REBALANCING to RUNNING.
> INFO  2017-05-10 16:21:30,584 o.a.k.c.consumer.internals.ConsumerCoordinator  
> Revoking previously assigned partitions [us.app.Trigger-0] for group 
> evergage-app
> INFO  2017-05-10 16:21:30,584 org.apache.kafka.streams.KafkaStreams  
> stream-client [evergage-app-bd9c9868-4b9b-4d2e-850f-9b5bec1fc0a9] State 
> transition from RUNNING to REBALANCING.
> INFO  2017-05-10 16:21:30,588 o.a.k.c.consumer.internals.AbstractCoordinator  
> (Re-)joining group evergage-app
> INFO  2017-05-10 16:21:30,593 o.a.k.c.consumer.internals.AbstractCoordinator  
> Successfully joined group evergage-app with generation 20
> INFO  2017-05-10 16:21:30,594 o.a.k.c.consumer.internals.ConsumerCoordinator  
> Setting newly assigned partitions [demo.retail.Trigger-0, us.app.Trigger-0] 
> for group evergage-app
> INFO  2017-05-10 16:21:30,594 org.apache.kafka.streams.KafkaStreams  
> stream-client [evergage-app-bd9c9868-4b9b-4d2e-850f-9b5bec1fc0a9] State 
> transition from REBALANCING to REBALANCING.
> INFO  2017-05-10 16:21:30,595 
> o.a.kafka.streams.processor.internals.StreamTask  task [0_0] Initializing 
> state stores
> INFO  2017-05-10 16:21:30,596 
> o.a.kafka.streams.processor.internals.StreamTask  task [0_0] Initializing 
> processor nodes of the topology
> INFO  2017-05-10 16:21:30,602 org.apache.kafka.streams.KafkaStreams  
> stream-client [evergage-app-bd9c9868-4b9b-4d2e-850f-9b5bec1fc0a9] State 
> transition from REBALANCING to RUNNING.
> INFO  2017-05-10 16:21:30,698 org.apache.kafka.clients.producer.KafkaProducer 
>  Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> WARN  2017-05-10 16:21:30,703 
> o.a.kafka.streams.processor.internals.StreamThread  stream-thread 
> [StreamThread-1] Unexpected state transition from RUNNING to NOT_RUNNING.
> ERROR 2017-05-10 16:21:30,704 
> c.a.a.web.server.UncaughtExceptionLoggingListener  Uncaught exception in 
> thread [StreamThread-1]
> org.apache.kafka.streams.errors.StreamsException: Failed to deserialize key 
> for record. topic=demo.retail.Trigger, partition=0, offset=0
>       at 
> org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:38)
>       at 
> org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:85)
>       at 
> org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
>       at 
> org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:158)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:605)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> Caused by: java.lang.NullPointerException: null
>       at 
> org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:36)
>       ... 5 common frames omitted
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
