Thanks Frank for reporting the bug, and many thanks to Damian for the quick
catch!

On Thu, Oct 13, 2016 at 12:30 PM, Frank Lyaruu <flya...@gmail.com> wrote:

> The issue seems to be gone. Amazing work, thanks...!
>
> On Thu, Oct 13, 2016 at 6:56 PM, Damian Guy <damian....@gmail.com> wrote:
>
> > Hi, i believe i found the problem. If possible could you please try with
> > this: https://github.com/dguy/kafka/tree/cache-bug
> >
> > Thanks,
> > Damian
> >
> > On Thu, 13 Oct 2016 at 17:46 Damian Guy <damian....@gmail.com> wrote:
> >
> > > Hi Frank,
> > >
> > > Thanks for reporting. Can you provide a sample of the join you are
> > > running?
> > >
> > > Thanks,
> > > Damian
> > >
> > > On Thu, 13 Oct 2016 at 16:10 Frank Lyaruu <flya...@gmail.com> wrote:
> > >
> > > Hi Kafka people,
> > >
> > > I'm joining a bunch of Kafka Topics using Kafka Streams, with the Kafka
> > > 0.10.1 release candidate.
> > >
> > > It runs ok for a few thousand of messages, and then it dies with the
> > > following exception:
> > >
> > > Exception in thread "StreamThread-1" java.lang.NullPointerException
> > > at
> > >
> > > org.apache.kafka.streams.state.internals.NamedCache.
> > evict(NamedCache.java:194)
> > > at
> > >
> > > org.apache.kafka.streams.state.internals.ThreadCache.
> > maybeEvict(ThreadCache.java:190)
> > > at
> > >
> > > org.apache.kafka.streams.state.internals.ThreadCache.
> > put(ThreadCache.java:121)
> > > at
> > >
> > > org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(
> > CachingKeyValueStore.java:147)
> > > at
> > >
> > > org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(
> > CachingKeyValueStore.java:134)
> > > at
> > >
> > > org.apache.kafka.streams.kstream.internals.KTableReduce$
> > KTableAggregateValueGetter.get(KTableReduce.java:121)
> > > at
> > >
> > > org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$
> > KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:77)
> > > at
> > >
> > > org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$
> > KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:48)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(
> > ProcessorNode.java:82)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.
> > ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> > > at
> > >
> > > org.apache.kafka.streams.kstream.internals.KTableFilter$
> > KTableFilterProcessor.process(KTableFilter.java:83)
> > > at
> > >
> > > org.apache.kafka.streams.kstream.internals.KTableFilter$
> > KTableFilterProcessor.process(KTableFilter.java:73)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(
> > ProcessorNode.java:82)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.
> > ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> > > at
> > >
> > > org.apache.kafka.streams.kstream.internals.KTableFilter$
> > KTableFilterProcessor.process(KTableFilter.java:83)
> > > at
> > >
> > > org.apache.kafka.streams.kstream.internals.KTableFilter$
> > KTableFilterProcessor.process(KTableFilter.java:73)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(
> > ProcessorNode.java:82)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.
> > ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> > > at
> > >
> > > org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$
> > KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:52)
> > > at
> > >
> > > org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$
> > KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:49)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(
> > ProcessorNode.java:82)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.
> > ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> > > at
> > >
> > > org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$
> > KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:83)
> > > at
> > >
> > > org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$
> > KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:49)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(
> > ProcessorNode.java:82)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.
> > ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> > > at
> > >
> > > org.apache.kafka.streams.kstream.internals.
> ForwardingCacheFlushListener.
> > apply(ForwardingCacheFlushListener.java:35)
> > > at
> > >
> > > org.apache.kafka.streams.state.internals.CachingKeyValueStore.
> > maybeForward(CachingKeyValueStore.java:97)
> > > at
> > >
> > > org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$
> > 000(CachingKeyValueStore.java:34)
> > > at
> > >
> > > org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(
> > CachingKeyValueStore.java:84)
> > > at
> > >
> > > org.apache.kafka.streams.state.internals.NamedCache.
> > flush(NamedCache.java:117)
> > > at
> > >
> > > org.apache.kafka.streams.state.internals.NamedCache.
> > evict(NamedCache.java:196)
> > > at
> > >
> > > org.apache.kafka.streams.state.internals.ThreadCache.
> > maybeEvict(ThreadCache.java:190)
> > > at
> > >
> > > org.apache.kafka.streams.state.internals.ThreadCache.
> > put(ThreadCache.java:121)
> > > at
> > >
> > > org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(
> > CachingKeyValueStore.java:187)
> > > at
> > >
> > > org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(
> > CachingKeyValueStore.java:182)
> > > at
> > >
> > > org.apache.kafka.streams.kstream.internals.KTableReduce$
> > KTableReduceProcessor.process(KTableReduce.java:92)
> > > at
> > >
> > > org.apache.kafka.streams.kstream.internals.KTableReduce$
> > KTableReduceProcessor.process(KTableReduce.java:52)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(
> > ProcessorNode.java:82)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.
> > ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.
> > SourceNode.process(SourceNode.java:66)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.
> > StreamTask.process(StreamTask.java:177)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> > StreamThread.java:427)
> > > at
> > >
> > > org.apache.kafka.streams.processor.internals.
> > StreamThread.run(StreamThread.java:235)
> > >
> > > I know this isn't a great bug report, as I can't seem to reproduce this
> > in
> > > a more sandboxed situation. Any tips / ideas for further steps?
> > >
> > >
> >
>



-- 
-- Guozhang

Reply via email to