I didn't get much further. When I run with the 1.1.0 release, the stack trace looks slightly different, but it is still a very similar NPE, and it occurs after the same amount of time. One observation: I use a few different processors, and it seems random which one shows up in the stack trace.
I've put the description of the topology in another gist:
https://gist.github.com/flyaruu/39fba78aec562ae5d6f11d3add6a0881

I've captured the last second or so before the NPE at debug level here:
https://gist.github.com/flyaruu/43d31de10e03af160b20e9534f13830e

I've increased the heap size (in case of a silent OOM exception), but that doesn't seem to matter.

I'm kinda out of ideas.

On Tue, Jun 19, 2018 at 11:02 AM Frank Lyaruu <flya...@gmail.com> wrote:

> We've tried running a fresh version with yesterday morning's trunk
> version, with the same result.
> We're running roughly 15 KafkaStreams instances, and the one that fails is the
> biggest one, with >150 processors.
> We haven't been able to reproduce this error with smaller subsets.
>
> I'm now going to try this with the Kafka 1.1.0 release version.
>
> regards, Frank
>
> On Tue, Jun 19, 2018 at 1:18 AM Guozhang Wang <wangg...@gmail.com> wrote:
>
>> Hello Frank,
>>
>> Your OneToManyGroupedProcessor.java looks fine to me.
>>
>> Is it consistently reproducible? What if you restart from fresh using the
>> trunk version?
>>
>>
>> Guozhang
>>
>> On Mon, Jun 18, 2018 at 9:03 AM, Frank Lyaruu <flya...@gmail.com> wrote:
>>
>> > Yes, here it is:
>> >
>> > https://gist.github.com/flyaruu/84b65d52a01987946b07d21bafe0d5f1
>> >
>> > It ran completely fine for the last year (and still does), it just does not
>> > seem to enjoy the upgrade of Kafka Streams.
>> >
>> > regards, Frank
>> >
>> > On Mon, Jun 18, 2018 at 4:49 PM Ted Yu <yuzhih...@gmail.com> wrote:
>> >
>> > > Can you show the related code from OneToManyGroupedProcessor ?
>> > >
>> > > Thanks
>> > >
>> > > On Mon, Jun 18, 2018 at 4:29 AM, Frank Lyaruu <flya...@gmail.com> wrote:
>> > >
>> > > > Hi, I've upgraded our 0.11-based stream application to the trunk version,
>> > > > and I get an intermittent NPE. It is quite a big topology, and I haven't
>> > > > succeeded in reproducing it on a simpler topology.
>> > > > It builds the topology, starts Kafka Streams, runs for about 20s, and then
>> > > > it terminates.
>> > > > It seems that the 'currentNode' in the ProcessorContext is null.
>> > > >
>> > > > Does this ring a bell for anyone?
>> > > >
>> > > > [lowlevel-develop-generation-7aa-lowlevel-e23137ae-d94b-4f17-a684-995320fd426d-StreamThread-12]
>> > > > ERROR org.apache.kafka.streams.processor.internals.AssignedStreamsTasks -
>> > > > stream-thread
>> > > > [lowlevel-develop-generation-7aa-lowlevel-e23137ae-d94b-4f17-a684-995320fd426d-StreamThread-12]
>> > > > Failed to process stream task 0_0 due to the following error:
>> > > > java.lang.NullPointerException
>> > > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:114)
>> > > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
>> > > >     at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.forwardMessage(OneToManyGroupedProcessor.java:125)
>> > > >     at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.forwardJoin(OneToManyGroupedProcessor.java:101)
>> > > >     at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.process(OneToManyGroupedProcessor.java:70)
>> > > >     at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.process(OneToManyGroupedProcessor.java:1)
>> > > >     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
>> > > >     at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
>> > > >     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
>> > > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
>> > > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
>> > > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
>> > > >     at com.dexels.kafka.streams.remotejoin.PreJoinProcessor.process(PreJoinProcessor.java:25)
>> > > >     at com.dexels.kafka.streams.remotejoin.PreJoinProcessor.process(PreJoinProcessor.java:1)
>> > > >     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
>> > > >     at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
>> > > >     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
>> > > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
>> > > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
>> > > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
>> > > >     at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:48)
>> > > >     at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:1)
>> > > >     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
>> > > >     at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
>> > > >     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
>> > > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
>> > > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
>> > > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
>> > > >     at com.dexels.kafka.streams.remotejoin.XmlTransformerProcessor.process(XmlTransformerProcessor.java:52)
>> > > >     at com.dexels.kafka.streams.remotejoin.XmlTransformerProcessor.process(XmlTransformerProcessor.java:1)
>> > > >     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
>> > > >     at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
>> > > >     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
>> > > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
>> > > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
>> > > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
>> > > >     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
>> > > >     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:288)
>> > > >     at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
>> > > >     at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:409)
>> > > >     at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:952)
>> > > >     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:827)
>> > > >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
>> > > >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
>> > > >
>>
>> --
>> -- Guozhang