Thanks Frank for reporting the bug, and many thanks to Damian for the quick catch!
On Thu, Oct 13, 2016 at 12:30 PM, Frank Lyaruu <flya...@gmail.com> wrote: > The issue seems to be gone. Amazing work, thanks...! > > On Thu, Oct 13, 2016 at 6:56 PM, Damian Guy <damian....@gmail.com> wrote: > > > Hi, i believe i found the problem. If possible could you please try with > > this: https://github.com/dguy/kafka/tree/cache-bug > > > > Thanks, > > Damian > > > > On Thu, 13 Oct 2016 at 17:46 Damian Guy <damian....@gmail.com> wrote: > > > > > Hi Frank, > > > > > > Thanks for reporting. Can you provide a sample of the join you are > > > running? > > > > > > Thanks, > > > Damian > > > > > > On Thu, 13 Oct 2016 at 16:10 Frank Lyaruu <flya...@gmail.com> wrote: > > > > > > Hi Kafka people, > > > > > > I'm joining a bunch of Kafka Topics using Kafka Streams, with the Kafka > > > 0.10.1 release candidate. > > > > > > It runs ok for a few thousand of messages, and then it dies with the > > > following exception: > > > > > > Exception in thread "StreamThread-1" java.lang.NullPointerException > > > at > > > > > > org.apache.kafka.streams.state.internals.NamedCache. > > evict(NamedCache.java:194) > > > at > > > > > > org.apache.kafka.streams.state.internals.ThreadCache. > > maybeEvict(ThreadCache.java:190) > > > at > > > > > > org.apache.kafka.streams.state.internals.ThreadCache. > > put(ThreadCache.java:121) > > > at > > > > > > org.apache.kafka.streams.state.internals.CachingKeyValueStore.get( > > CachingKeyValueStore.java:147) > > > at > > > > > > org.apache.kafka.streams.state.internals.CachingKeyValueStore.get( > > CachingKeyValueStore.java:134) > > > at > > > > > > org.apache.kafka.streams.kstream.internals.KTableReduce$ > > KTableAggregateValueGetter.get(KTableReduce.java:121) > > > at > > > > > > org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$ > > KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:77) > > > at > > > > > > org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$ > > KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:48) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorNode.process( > > ProcessorNode.java:82) > > > at > > > > > > org.apache.kafka.streams.processor.internals. > > ProcessorContextImpl.forward(ProcessorContextImpl.java:204) > > > at > > > > > > org.apache.kafka.streams.kstream.internals.KTableFilter$ > > KTableFilterProcessor.process(KTableFilter.java:83) > > > at > > > > > > org.apache.kafka.streams.kstream.internals.KTableFilter$ > > KTableFilterProcessor.process(KTableFilter.java:73) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorNode.process( > > ProcessorNode.java:82) > > > at > > > > > > org.apache.kafka.streams.processor.internals. > > ProcessorContextImpl.forward(ProcessorContextImpl.java:204) > > > at > > > > > > org.apache.kafka.streams.kstream.internals.KTableFilter$ > > KTableFilterProcessor.process(KTableFilter.java:83) > > > at > > > > > > org.apache.kafka.streams.kstream.internals.KTableFilter$ > > KTableFilterProcessor.process(KTableFilter.java:73) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorNode.process( > > ProcessorNode.java:82) > > > at > > > > > > org.apache.kafka.streams.processor.internals. > > ProcessorContextImpl.forward(ProcessorContextImpl.java:204) > > > at > > > > > > org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$ > > KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:52) > > > at > > > > > > org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger$ > > KTableKTableJoinMergeProcessor.process(KTableKTableJoinMerger.java:49) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorNode.process( > > ProcessorNode.java:82) > > > at > > > > > > org.apache.kafka.streams.processor.internals. > > ProcessorContextImpl.forward(ProcessorContextImpl.java:204) > > > at > > > > > > org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$ > > KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:83) > > > at > > > > > > org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin$ > > KTableKTableRightJoinProcessor.process(KTableKTableRightJoin.java:49) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorNode.process( > > ProcessorNode.java:82) > > > at > > > > > > org.apache.kafka.streams.processor.internals. > > ProcessorContextImpl.forward(ProcessorContextImpl.java:204) > > > at > > > > > > org.apache.kafka.streams.kstream.internals. > ForwardingCacheFlushListener. > > apply(ForwardingCacheFlushListener.java:35) > > > at > > > > > > org.apache.kafka.streams.state.internals.CachingKeyValueStore. > > maybeForward(CachingKeyValueStore.java:97) > > > at > > > > > > org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$ > > 000(CachingKeyValueStore.java:34) > > > at > > > > > > org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply( > > CachingKeyValueStore.java:84) > > > at > > > > > > org.apache.kafka.streams.state.internals.NamedCache. > > flush(NamedCache.java:117) > > > at > > > > > > org.apache.kafka.streams.state.internals.NamedCache. > > evict(NamedCache.java:196) > > > at > > > > > > org.apache.kafka.streams.state.internals.ThreadCache. > > maybeEvict(ThreadCache.java:190) > > > at > > > > > > org.apache.kafka.streams.state.internals.ThreadCache. > > put(ThreadCache.java:121) > > > at > > > > > > org.apache.kafka.streams.state.internals.CachingKeyValueStore.put( > > CachingKeyValueStore.java:187) > > > at > > > > > > org.apache.kafka.streams.state.internals.CachingKeyValueStore.put( > > CachingKeyValueStore.java:182) > > > at > > > > > > org.apache.kafka.streams.kstream.internals.KTableReduce$ > > KTableReduceProcessor.process(KTableReduce.java:92) > > > at > > > > > > org.apache.kafka.streams.kstream.internals.KTableReduce$ > > KTableReduceProcessor.process(KTableReduce.java:52) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorNode.process( > > ProcessorNode.java:82) > > > at > > > > > > org.apache.kafka.streams.processor.internals. > > ProcessorContextImpl.forward(ProcessorContextImpl.java:204) > > > at > > > > > > org.apache.kafka.streams.processor.internals. > > SourceNode.process(SourceNode.java:66) > > > at > > > > > > org.apache.kafka.streams.processor.internals. > > StreamTask.process(StreamTask.java:177) > > > at > > > > > > org.apache.kafka.streams.processor.internals.StreamThread.runLoop( > > StreamThread.java:427) > > > at > > > > > > org.apache.kafka.streams.processor.internals. > > StreamThread.run(StreamThread.java:235) > > > > > > I know this isn't a great bug report, as I can't seem to reproduce this > > in > > > a more sandboxed situation. Any tips / ideas for further steps? > > > > > > > > > -- -- Guozhang