We've tried running a fresh version with yesterday morning's trunk version, with the same result. We're running +- 15 KafkaStreams instances, and the one that fails is ithe biggest one, with >150 processors. We haven't been able to reproduce this error with smaller sub-sets.
I'm now going to try this with the Kafka 1.1.0 release version. regards, Frank On Tue, Jun 19, 2018 at 1:18 AM Guozhang Wang <wangg...@gmail.com> wrote: > Hello Frank, > > Your OneToManyGroupedProcessor.java looks fine to me. > > Is it consistently re-producible? What if you restart from fresh using the > trunk version? > > > Guozhang > > On Mon, Jun 18, 2018 at 9:03 AM, Frank Lyaruu <flya...@gmail.com> wrote: > > > Yes, here it is: > > > > https://gist.github.com/flyaruu/84b65d52a01987946b07d21bafe0d5f1 > > > > It ran completely fine for the last year (and still does), it just does > not > > seem to enjoy the upgrade of Kafka Streams. > > > > regards, Frank > > > > On Mon, Jun 18, 2018 at 4:49 PM Ted Yu <yuzhih...@gmail.com> wrote: > > > > > Can you show the related code from OneToManyGroupedProcessor ? > > > > > > Thanks > > > > > > On Mon, Jun 18, 2018 at 4:29 AM, Frank Lyaruu <flya...@gmail.com> > wrote: > > > > > > > Hi, I've upgraded our 0.11 based stream application to the trunk > > version, > > > > and I get an intermittent NPE. It's is quite a big topology, and I > > > haven't > > > > succeeded in reproducing it on a simpler topology. > > > > It builds the topology, starts Kafka Streams, runs for about 20s., > and > > > then > > > > it terminates > > > > It seems that the 'currentNode' in the ProcessorContext is null. > > > > > > > > Does this ring a bell for anyone? > > > > > > > > [lowlevel-develop-generation-7aa-lowlevel-e23137ae-d94b- > > > > 4f17-a684-995320fd426d-StreamThread-12] > > > > ERROR > org.apache.kafka.streams.processor.internals.AssignedStreamsTasks > > - > > > > stream-thread > > > > [lowlevel-develop-generation-7aa-lowlevel-e23137ae-d94b- > > > > 4f17-a684-995320fd426d-StreamThread-12] > > > > Failed to process stream task 0_0 due to the following error: > > > > java.lang.NullPointerException > > > > at > > > > > > > org.apache.kafka.streams.processor.internals. > > ProcessorContextImpl.forward( > > > > ProcessorContextImpl.java:114) > > > > at > > > > > > > org.apache.kafka.streams.processor.internals. > > ProcessorContextImpl.forward( > > > > ProcessorContextImpl.java:90) > > > > at > > > > com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor. > > > > forwardMessage(OneToManyGroupedProcessor.java:125) > > > > at > > > > com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor. > > > > forwardJoin(OneToManyGroupedProcessor.java:101) > > > > at > > > > com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor. > > > > process(OneToManyGroupedProcessor.java:70) > > > > at > > > > com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor. > > > > process(OneToManyGroupedProcessor.java:1) > > > > at > > > > org.apache.kafka.streams.processor.internals.ProcessorNode$1.run( > > > > ProcessorNode.java:50) > > > > at > > > > org.apache.kafka.streams.processor.internals.ProcessorNode. > > > > runAndMeasureLatency(ProcessorNode.java:244) > > > > at > > > > org.apache.kafka.streams.processor.internals.ProcessorNode.process( > > > > ProcessorNode.java:133) > > > > at > > > > > > > org.apache.kafka.streams.processor.internals. > > ProcessorContextImpl.forward( > > > > ProcessorContextImpl.java:143) > > > > at > > > > > > > org.apache.kafka.streams.processor.internals. > > ProcessorContextImpl.forward( > > > > ProcessorContextImpl.java:126) > > > > at > > > > > > > org.apache.kafka.streams.processor.internals. > > ProcessorContextImpl.forward( > > > > ProcessorContextImpl.java:90) > > > > at > > > > com.dexels.kafka.streams.remotejoin.PreJoinProcessor. > > > > process(PreJoinProcessor.java:25) > > > > at > > > > com.dexels.kafka.streams.remotejoin.PreJoinProcessor. > > > > process(PreJoinProcessor.java:1) > > > > at > > > > org.apache.kafka.streams.processor.internals.ProcessorNode$1.run( > > > > ProcessorNode.java:50) > > > > at > > > > org.apache.kafka.streams.processor.internals.ProcessorNode. > > > > runAndMeasureLatency(ProcessorNode.java:244) > > > > at > > > > org.apache.kafka.streams.processor.internals.ProcessorNode.process( > > > > ProcessorNode.java:133) > > > > at > > > > > > > org.apache.kafka.streams.processor.internals. > > ProcessorContextImpl.forward( > > > > ProcessorContextImpl.java:143) > > > > at > > > > > > > org.apache.kafka.streams.processor.internals. > > ProcessorContextImpl.forward( > > > > ProcessorContextImpl.java:126) > > > > at > > > > > > > org.apache.kafka.streams.processor.internals. > > ProcessorContextImpl.forward( > > > > ProcessorContextImpl.java:90) > > > > at > > > > com.dexels.kafka.streams.remotejoin.StoreProcessor. > > > > process(StoreProcessor.java:48) > > > > at > > > > com.dexels.kafka.streams.remotejoin.StoreProcessor. > > > > process(StoreProcessor.java:1) > > > > at > > > > org.apache.kafka.streams.processor.internals.ProcessorNode$1.run( > > > > ProcessorNode.java:50) > > > > at > > > > org.apache.kafka.streams.processor.internals.ProcessorNode. > > > > runAndMeasureLatency(ProcessorNode.java:244) > > > > at > > > > org.apache.kafka.streams.processor.internals.ProcessorNode.process( > > > > ProcessorNode.java:133) > > > > at > > > > > > > org.apache.kafka.streams.processor.internals. > > ProcessorContextImpl.forward( > > > > ProcessorContextImpl.java:143) > > > > at > > > > > > > org.apache.kafka.streams.processor.internals. > > ProcessorContextImpl.forward( > > > > ProcessorContextImpl.java:126) > > > > at > > > > > > > org.apache.kafka.streams.processor.internals. > > ProcessorContextImpl.forward( > > > > ProcessorContextImpl.java:90) > > > > at > > > > com.dexels.kafka.streams.remotejoin.XmlTransformerProcessor.process( > > > > XmlTransformerProcessor.java:52) > > > > at > > > > com.dexels.kafka.streams.remotejoin.XmlTransformerProcessor.process( > > > > XmlTransformerProcessor.java:1) > > > > at > > > > org.apache.kafka.streams.processor.internals.ProcessorNode$1.run( > > > > ProcessorNode.java:50) > > > > at > > > > org.apache.kafka.streams.processor.internals.ProcessorNode. > > > > runAndMeasureLatency(ProcessorNode.java:244) > > > > at > > > > org.apache.kafka.streams.processor.internals.ProcessorNode.process( > > > > ProcessorNode.java:133) > > > > at > > > > > > > org.apache.kafka.streams.processor.internals. > > ProcessorContextImpl.forward( > > > > ProcessorContextImpl.java:143) > > > > at > > > > > > > org.apache.kafka.streams.processor.internals. > > ProcessorContextImpl.forward( > > > > ProcessorContextImpl.java:126) > > > > at > > > > > > > org.apache.kafka.streams.processor.internals. > > ProcessorContextImpl.forward( > > > > ProcessorContextImpl.java:90) > > > > at > > > > org.apache.kafka.streams.processor.internals. > > > > SourceNode.process(SourceNode.java:87) > > > > at > > > > org.apache.kafka.streams.processor.internals. > > > > StreamTask.process(StreamTask.java:288) > > > > at > > > > > > > org.apache.kafka.streams.processor.internals. > > AssignedStreamsTasks.process( > > > > AssignedStreamsTasks.java:94) > > > > at > > > > org.apache.kafka.streams.processor.internals.TaskManager.process( > > > > TaskManager.java:409) > > > > at > > > > org.apache.kafka.streams.processor.internals.StreamThread. > > > > processAndMaybeCommit(StreamThread.java:952) > > > > at > > > > org.apache.kafka.streams.processor.internals.StreamThread.runOnce( > > > > StreamThread.java:827) > > > > at > > > > org.apache.kafka.streams.processor.internals.StreamThread.runLoop( > > > > StreamThread.java:767) > > > > at > > > > org.apache.kafka.streams.processor.internals. > > > > StreamThread.run(StreamThread.java:736) > > > > > > > > > > > > > -- > -- Guozhang >