Hello Frank,

Your OneToManyGroupedProcessor.java looks fine to me.

Is it consistently re-producible? What if you restart from fresh using the
trunk version?


Guozhang

On Mon, Jun 18, 2018 at 9:03 AM, Frank Lyaruu <flya...@gmail.com> wrote:

> Yes, here it is:
>
> https://gist.github.com/flyaruu/84b65d52a01987946b07d21bafe0d5f1
>
> It ran completely fine for the last year (and still does), it just does not
> seem to enjoy the upgrade of Kafka Streams.
>
> regards, Frank
>
> On Mon, Jun 18, 2018 at 4:49 PM Ted Yu <yuzhih...@gmail.com> wrote:
>
> > Can you show the related code from OneToManyGroupedProcessor ?
> >
> > Thanks
> >
> > On Mon, Jun 18, 2018 at 4:29 AM, Frank Lyaruu <flya...@gmail.com> wrote:
> >
> > > Hi, I've upgraded our 0.11 based stream application to the trunk
> version,
> > > and I get an intermittent NPE. It's is quite a big topology, and I
> > haven't
> > > succeeded in reproducing it on a simpler topology.
> > > It builds the topology, starts Kafka Streams, runs for about 20s., and
> > then
> > > it terminates
> > > It seems that the 'currentNode' in the ProcessorContext is null.
> > >
> > > Does this ring a bell for anyone?
> > >
> > > [lowlevel-develop-generation-7aa-lowlevel-e23137ae-d94b-
> > > 4f17-a684-995320fd426d-StreamThread-12]
> > > ERROR org.apache.kafka.streams.processor.internals.AssignedStreamsTasks
> -
> > > stream-thread
> > > [lowlevel-develop-generation-7aa-lowlevel-e23137ae-d94b-
> > > 4f17-a684-995320fd426d-StreamThread-12]
> > > Failed to process stream task 0_0 due to the following error:
> > > java.lang.NullPointerException
> > >     at
> > >
> > org.apache.kafka.streams.processor.internals.
> ProcessorContextImpl.forward(
> > > ProcessorContextImpl.java:114)
> > >     at
> > >
> > org.apache.kafka.streams.processor.internals.
> ProcessorContextImpl.forward(
> > > ProcessorContextImpl.java:90)
> > >     at
> > > com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.
> > > forwardMessage(OneToManyGroupedProcessor.java:125)
> > >     at
> > > com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.
> > > forwardJoin(OneToManyGroupedProcessor.java:101)
> > >     at
> > > com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.
> > > process(OneToManyGroupedProcessor.java:70)
> > >     at
> > > com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.
> > > process(OneToManyGroupedProcessor.java:1)
> > >     at
> > > org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(
> > > ProcessorNode.java:50)
> > >     at
> > > org.apache.kafka.streams.processor.internals.ProcessorNode.
> > > runAndMeasureLatency(ProcessorNode.java:244)
> > >     at
> > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(
> > > ProcessorNode.java:133)
> > >     at
> > >
> > org.apache.kafka.streams.processor.internals.
> ProcessorContextImpl.forward(
> > > ProcessorContextImpl.java:143)
> > >     at
> > >
> > org.apache.kafka.streams.processor.internals.
> ProcessorContextImpl.forward(
> > > ProcessorContextImpl.java:126)
> > >     at
> > >
> > org.apache.kafka.streams.processor.internals.
> ProcessorContextImpl.forward(
> > > ProcessorContextImpl.java:90)
> > >     at
> > > com.dexels.kafka.streams.remotejoin.PreJoinProcessor.
> > > process(PreJoinProcessor.java:25)
> > >     at
> > > com.dexels.kafka.streams.remotejoin.PreJoinProcessor.
> > > process(PreJoinProcessor.java:1)
> > >     at
> > > org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(
> > > ProcessorNode.java:50)
> > >     at
> > > org.apache.kafka.streams.processor.internals.ProcessorNode.
> > > runAndMeasureLatency(ProcessorNode.java:244)
> > >     at
> > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(
> > > ProcessorNode.java:133)
> > >     at
> > >
> > org.apache.kafka.streams.processor.internals.
> ProcessorContextImpl.forward(
> > > ProcessorContextImpl.java:143)
> > >     at
> > >
> > org.apache.kafka.streams.processor.internals.
> ProcessorContextImpl.forward(
> > > ProcessorContextImpl.java:126)
> > >     at
> > >
> > org.apache.kafka.streams.processor.internals.
> ProcessorContextImpl.forward(
> > > ProcessorContextImpl.java:90)
> > >     at
> > > com.dexels.kafka.streams.remotejoin.StoreProcessor.
> > > process(StoreProcessor.java:48)
> > >     at
> > > com.dexels.kafka.streams.remotejoin.StoreProcessor.
> > > process(StoreProcessor.java:1)
> > >     at
> > > org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(
> > > ProcessorNode.java:50)
> > >     at
> > > org.apache.kafka.streams.processor.internals.ProcessorNode.
> > > runAndMeasureLatency(ProcessorNode.java:244)
> > >     at
> > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(
> > > ProcessorNode.java:133)
> > >     at
> > >
> > org.apache.kafka.streams.processor.internals.
> ProcessorContextImpl.forward(
> > > ProcessorContextImpl.java:143)
> > >     at
> > >
> > org.apache.kafka.streams.processor.internals.
> ProcessorContextImpl.forward(
> > > ProcessorContextImpl.java:126)
> > >     at
> > >
> > org.apache.kafka.streams.processor.internals.
> ProcessorContextImpl.forward(
> > > ProcessorContextImpl.java:90)
> > >     at
> > > com.dexels.kafka.streams.remotejoin.XmlTransformerProcessor.process(
> > > XmlTransformerProcessor.java:52)
> > >     at
> > > com.dexels.kafka.streams.remotejoin.XmlTransformerProcessor.process(
> > > XmlTransformerProcessor.java:1)
> > >     at
> > > org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(
> > > ProcessorNode.java:50)
> > >     at
> > > org.apache.kafka.streams.processor.internals.ProcessorNode.
> > > runAndMeasureLatency(ProcessorNode.java:244)
> > >     at
> > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(
> > > ProcessorNode.java:133)
> > >     at
> > >
> > org.apache.kafka.streams.processor.internals.
> ProcessorContextImpl.forward(
> > > ProcessorContextImpl.java:143)
> > >     at
> > >
> > org.apache.kafka.streams.processor.internals.
> ProcessorContextImpl.forward(
> > > ProcessorContextImpl.java:126)
> > >     at
> > >
> > org.apache.kafka.streams.processor.internals.
> ProcessorContextImpl.forward(
> > > ProcessorContextImpl.java:90)
> > >     at
> > > org.apache.kafka.streams.processor.internals.
> > > SourceNode.process(SourceNode.java:87)
> > >     at
> > > org.apache.kafka.streams.processor.internals.
> > > StreamTask.process(StreamTask.java:288)
> > >     at
> > >
> > org.apache.kafka.streams.processor.internals.
> AssignedStreamsTasks.process(
> > > AssignedStreamsTasks.java:94)
> > >     at
> > > org.apache.kafka.streams.processor.internals.TaskManager.process(
> > > TaskManager.java:409)
> > >     at
> > > org.apache.kafka.streams.processor.internals.StreamThread.
> > > processAndMaybeCommit(StreamThread.java:952)
> > >     at
> > > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(
> > > StreamThread.java:827)
> > >     at
> > > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> > > StreamThread.java:767)
> > >     at
> > > org.apache.kafka.streams.processor.internals.
> > > StreamThread.run(StreamThread.java:736)
> > >
> >
>



-- 
-- Guozhang

Reply via email to