Yes, here it is:

https://gist.github.com/flyaruu/84b65d52a01987946b07d21bafe0d5f1

It ran completely fine for the last year (and still does), it just does not
seem to enjoy the upgrade of Kafka Streams.

regards, Frank

On Mon, Jun 18, 2018 at 4:49 PM Ted Yu <[email protected]> wrote:

> Can you show the related code from OneToManyGroupedProcessor ?
>
> Thanks
>
> On Mon, Jun 18, 2018 at 4:29 AM, Frank Lyaruu <[email protected]> wrote:
>
> > Hi, I've upgraded our 0.11 based stream application to the trunk version,
> > and I get an intermittent NPE. It's is quite a big topology, and I
> haven't
> > succeeded in reproducing it on a simpler topology.
> > It builds the topology, starts Kafka Streams, runs for about 20s., and
> then
> > it terminates
> > It seems that the 'currentNode' in the ProcessorContext is null.
> >
> > Does this ring a bell for anyone?
> >
> > [lowlevel-develop-generation-7aa-lowlevel-e23137ae-d94b-
> > 4f17-a684-995320fd426d-StreamThread-12]
> > ERROR org.apache.kafka.streams.processor.internals.AssignedStreamsTasks -
> > stream-thread
> > [lowlevel-develop-generation-7aa-lowlevel-e23137ae-d94b-
> > 4f17-a684-995320fd426d-StreamThread-12]
> > Failed to process stream task 0_0 due to the following error:
> > java.lang.NullPointerException
> >     at
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(
> > ProcessorContextImpl.java:114)
> >     at
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(
> > ProcessorContextImpl.java:90)
> >     at
> > com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.
> > forwardMessage(OneToManyGroupedProcessor.java:125)
> >     at
> > com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.
> > forwardJoin(OneToManyGroupedProcessor.java:101)
> >     at
> > com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.
> > process(OneToManyGroupedProcessor.java:70)
> >     at
> > com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.
> > process(OneToManyGroupedProcessor.java:1)
> >     at
> > org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(
> > ProcessorNode.java:50)
> >     at
> > org.apache.kafka.streams.processor.internals.ProcessorNode.
> > runAndMeasureLatency(ProcessorNode.java:244)
> >     at
> > org.apache.kafka.streams.processor.internals.ProcessorNode.process(
> > ProcessorNode.java:133)
> >     at
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(
> > ProcessorContextImpl.java:143)
> >     at
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(
> > ProcessorContextImpl.java:126)
> >     at
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(
> > ProcessorContextImpl.java:90)
> >     at
> > com.dexels.kafka.streams.remotejoin.PreJoinProcessor.
> > process(PreJoinProcessor.java:25)
> >     at
> > com.dexels.kafka.streams.remotejoin.PreJoinProcessor.
> > process(PreJoinProcessor.java:1)
> >     at
> > org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(
> > ProcessorNode.java:50)
> >     at
> > org.apache.kafka.streams.processor.internals.ProcessorNode.
> > runAndMeasureLatency(ProcessorNode.java:244)
> >     at
> > org.apache.kafka.streams.processor.internals.ProcessorNode.process(
> > ProcessorNode.java:133)
> >     at
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(
> > ProcessorContextImpl.java:143)
> >     at
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(
> > ProcessorContextImpl.java:126)
> >     at
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(
> > ProcessorContextImpl.java:90)
> >     at
> > com.dexels.kafka.streams.remotejoin.StoreProcessor.
> > process(StoreProcessor.java:48)
> >     at
> > com.dexels.kafka.streams.remotejoin.StoreProcessor.
> > process(StoreProcessor.java:1)
> >     at
> > org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(
> > ProcessorNode.java:50)
> >     at
> > org.apache.kafka.streams.processor.internals.ProcessorNode.
> > runAndMeasureLatency(ProcessorNode.java:244)
> >     at
> > org.apache.kafka.streams.processor.internals.ProcessorNode.process(
> > ProcessorNode.java:133)
> >     at
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(
> > ProcessorContextImpl.java:143)
> >     at
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(
> > ProcessorContextImpl.java:126)
> >     at
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(
> > ProcessorContextImpl.java:90)
> >     at
> > com.dexels.kafka.streams.remotejoin.XmlTransformerProcessor.process(
> > XmlTransformerProcessor.java:52)
> >     at
> > com.dexels.kafka.streams.remotejoin.XmlTransformerProcessor.process(
> > XmlTransformerProcessor.java:1)
> >     at
> > org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(
> > ProcessorNode.java:50)
> >     at
> > org.apache.kafka.streams.processor.internals.ProcessorNode.
> > runAndMeasureLatency(ProcessorNode.java:244)
> >     at
> > org.apache.kafka.streams.processor.internals.ProcessorNode.process(
> > ProcessorNode.java:133)
> >     at
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(
> > ProcessorContextImpl.java:143)
> >     at
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(
> > ProcessorContextImpl.java:126)
> >     at
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(
> > ProcessorContextImpl.java:90)
> >     at
> > org.apache.kafka.streams.processor.internals.
> > SourceNode.process(SourceNode.java:87)
> >     at
> > org.apache.kafka.streams.processor.internals.
> > StreamTask.process(StreamTask.java:288)
> >     at
> >
> org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(
> > AssignedStreamsTasks.java:94)
> >     at
> > org.apache.kafka.streams.processor.internals.TaskManager.process(
> > TaskManager.java:409)
> >     at
> > org.apache.kafka.streams.processor.internals.StreamThread.
> > processAndMaybeCommit(StreamThread.java:952)
> >     at
> > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(
> > StreamThread.java:827)
> >     at
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> > StreamThread.java:767)
> >     at
> > org.apache.kafka.streams.processor.internals.
> > StreamThread.run(StreamThread.java:736)
> >
>

Reply via email to