I didn't get much further. When I run with the 1.1.0 release version the
stack trace looks slightly different, but it is still a very similar NPE,
after the same amount of time.
One observation is that I use a few different processors, and it seems
random which one gets caught in the stack trace.
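
One thing that might be worth ruling out (a guess, not something the trace
confirms): a processor instance that is shared between tasks. The sketch below
is a toy model in plain Java, not Kafka Streams code. MiniContext,
MiniProcessor and failsWithNpe are hypothetical names made up for the
illustration. It only shows how a context whose "current node" is set during
process() and cleared afterwards can hit an NPE in forward() when a single
processor instance is initialised by two tasks and ends up holding the other
task's context.

```java
import java.util.function.Supplier;

public class SharedProcessorSketch {

    /** Toy stand-in for a per-task processor context; NOT the Kafka Streams class. */
    static final class MiniContext {
        // Only non-null while this task is inside process().
        private String currentNode;

        void process(MiniProcessor processor, String nodeName, String value) {
            currentNode = nodeName;
            try {
                processor.process(value);
            } finally {
                currentNode = null;
            }
        }

        int forward(String value) {
            // Dereferences currentNode, so this throws an NPE outside process().
            return currentNode.length() + value.length();
        }
    }

    /** Toy processor that remembers the last context it was initialised with. */
    static final class MiniProcessor {
        private MiniContext context;

        void init(MiniContext context) {
            this.context = context;
        }

        void process(String value) {
            context.forward(value);
        }
    }

    /** Initialises one processor per task, then reports whether task A hits an NPE. */
    static boolean failsWithNpe(Supplier<MiniProcessor> supplier) {
        MiniContext taskA = new MiniContext();
        MiniContext taskB = new MiniContext();
        MiniProcessor a = supplier.get();
        a.init(taskA);
        MiniProcessor b = supplier.get();
        b.init(taskB); // with a shared instance this overwrites the context set above
        try {
            taskA.process(a, "node-a", "record");
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        MiniProcessor shared = new MiniProcessor();
        System.out.println("shared instance fails: " + failsWithNpe(() -> shared));
        System.out.println("fresh instances fail:  " + failsWithNpe(MiniProcessor::new));
    }
}
```

In this model the shared instance fails and the fresh instances do not. If
something similar were going on in the real topology, checking that every
ProcessorSupplier returns a new processor on each get() call would be a cheap
test.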

I've put the description of the topology in another gist:

https://gist.github.com/flyaruu/39fba78aec562ae5d6f11d3add6a0881

I've captured the last second or so before the NPE at debug level here:

https://gist.github.com/flyaruu/43d31de10e03af160b20e9534f13830e

I've increased the heap size (in case of a silent OOM exception), but that
doesn't seem to matter.

I'm kinda out of ideas.



On Tue, Jun 19, 2018 at 11:02 AM Frank Lyaruu <flya...@gmail.com> wrote:

> We've tried running a fresh version with yesterday morning's trunk
> version, with the same result.
> We're running about 15 KafkaStreams instances, and the one that fails is the
> biggest one, with >150 processors.
> We haven't been able to reproduce this error with smaller sub-sets.
>
> I'm now going to try this with the Kafka 1.1.0 release version.
>
> regards, Frank
>
> On Tue, Jun 19, 2018 at 1:18 AM Guozhang Wang <wangg...@gmail.com> wrote:
>
>> Hello Frank,
>>
>> Your OneToManyGroupedProcessor.java looks fine to me.
>>
>> Is it consistently reproducible? What if you restart from fresh using the
>> trunk version?
>>
>>
>> Guozhang
>>
>> On Mon, Jun 18, 2018 at 9:03 AM, Frank Lyaruu <flya...@gmail.com> wrote:
>>
>> > Yes, here it is:
>> >
>> > https://gist.github.com/flyaruu/84b65d52a01987946b07d21bafe0d5f1
>> >
>> > It ran completely fine for the last year (and still does), it just does
>> > not seem to enjoy the upgrade of Kafka Streams.
>> >
>> > regards, Frank
>> >
>> > On Mon, Jun 18, 2018 at 4:49 PM Ted Yu <yuzhih...@gmail.com> wrote:
>> >
>> > > Can you show the related code from OneToManyGroupedProcessor ?
>> > >
>> > > Thanks
>> > >
>> > > On Mon, Jun 18, 2018 at 4:29 AM, Frank Lyaruu <flya...@gmail.com> wrote:
>> > >
>> > > > Hi, I've upgraded our 0.11-based streams application to the trunk
>> > > > version, and I get an intermittent NPE. It is quite a big topology, and
>> > > > I haven't succeeded in reproducing it on a simpler topology.
>> > > > It builds the topology, starts Kafka Streams, runs for about 20 s, and
>> > > > then it terminates.
>> > > > It seems that the 'currentNode' in the ProcessorContext is null.
>> > > >
>> > > > Does this ring a bell for anyone?
>> > > >
>> > > > [lowlevel-develop-generation-7aa-lowlevel-e23137ae-d94b-4f17-a684-995320fd426d-StreamThread-12]
>> > > > ERROR org.apache.kafka.streams.processor.internals.AssignedStreamsTasks -
>> > > > stream-thread [lowlevel-develop-generation-7aa-lowlevel-e23137ae-d94b-4f17-a684-995320fd426d-StreamThread-12]
>> > > > Failed to process stream task 0_0 due to the following error:
>> > > > java.lang.NullPointerException
>> > > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:114)
>> > > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
>> > > >     at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.forwardMessage(OneToManyGroupedProcessor.java:125)
>> > > >     at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.forwardJoin(OneToManyGroupedProcessor.java:101)
>> > > >     at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.process(OneToManyGroupedProcessor.java:70)
>> > > >     at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.process(OneToManyGroupedProcessor.java:1)
>> > > >     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
>> > > >     at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
>> > > >     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
>> > > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
>> > > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
>> > > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
>> > > >     at com.dexels.kafka.streams.remotejoin.PreJoinProcessor.process(PreJoinProcessor.java:25)
>> > > >     at com.dexels.kafka.streams.remotejoin.PreJoinProcessor.process(PreJoinProcessor.java:1)
>> > > >     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
>> > > >     at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
>> > > >     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
>> > > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
>> > > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
>> > > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
>> > > >     at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:48)
>> > > >     at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:1)
>> > > >     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
>> > > >     at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
>> > > >     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
>> > > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
>> > > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
>> > > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
>> > > >     at com.dexels.kafka.streams.remotejoin.XmlTransformerProcessor.process(XmlTransformerProcessor.java:52)
>> > > >     at com.dexels.kafka.streams.remotejoin.XmlTransformerProcessor.process(XmlTransformerProcessor.java:1)
>> > > >     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
>> > > >     at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
>> > > >     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
>> > > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
>> > > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
>> > > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
>> > > >     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
>> > > >     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:288)
>> > > >     at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
>> > > >     at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:409)
>> > > >     at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:952)
>> > > >     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:827)
>> > > >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
>> > > >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
>> > > >
>> > >
>> >
>>
>>
>>
>> --
>> -- Guozhang
>>
>
