Hey Frank,

I think I spotted the issue and submitted a patch. Here's a link to the
JIRA: https://issues.apache.org/jira/browse/KAFKA-5456. I expect we'll get
the fix into 0.11.0.0. Thanks for finding this!

-Jason

On Thu, Jun 15, 2017 at 11:53 PM, Frank Lyaruu <flya...@gmail.com> wrote:

> Yes, compression was on (lz4). Key sizes are small (<10 bytes); value sizes
> fluctuate, but nothing crazy, up to about 100 KB.
>
> I stepped through the code and at some point saw a branch that takes a
> different path depending on protocol version (something with LegacyRecord),
> so I figured updating the broker was worth a shot.
>
> I can do some more testing, but I need to set up another 0.10.2.1 cluster
> first.
>
> Frank
>
> On Fri, Jun 16, 2017 at 2:09 AM, Apurva Mehta <apu...@confluent.io> wrote:
>
> > Finally, was compression enabled when you hit this exception? If so,
> > which compression algorithm was enabled?
> >
> > On Thu, Jun 15, 2017 at 5:04 PM, Apurva Mehta <apu...@confluent.io> wrote:
> >
> > > Frank: it would be even better if you could share the key and value that
> > > were causing this problem. Maybe share them on the JIRA:
> > > https://issues.apache.org/jira/browse/KAFKA-5456 ?
> > >
> > > Thanks,
> > > Apurva
> > >
> > > On Thu, Jun 15, 2017 at 4:07 PM, Apurva Mehta <apu...@confluent.io> wrote:
> > >
> > >> Hi Frank,
> > >>
> > >> What is the value of `batch.size` in your producer? What is the size
> > >> of the key and value you are trying to write?
> > >>
> > >> Thanks,
> > >> Apurva
> > >>
> > >> On Thu, Jun 15, 2017 at 2:28 AM, Frank Lyaruu <flya...@gmail.com> wrote:
> > >>
> > >>> Hey people, I see an error I haven't seen before. It is in a low-level-API
> > >>> based Streams application. I started it once and it ran fine; then I did a
> > >>> graceful shutdown, and since then I always see this error on startup.
> > >>>
> > >>> I'm using yesterday's trunk.
> > >>>
> > >>> It seems that the MemoryRecordsBuilder overflows somehow. Is there
> > >>> something I need to configure?
> > >>>
> > >>> java.lang.NullPointerException
> > >>> at org.apache.kafka.common.utils.Utils.notNull(Utils.java:243)
> > >>> at org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:219)
> > >>> at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:650)
> > >>> at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:604)
> > >>> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:97)
> > >>> at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:59)
> > >>> at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:58)
> > >>> at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueStore.put(ChangeLoggingKeyValueStore.java:73)
> > >>> at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$2.run(MeteredKeyValueStore.java:66)
> > >>> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> > >>> at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:149)
> > >>> at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:47)
> > >>> at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:1)
> > >>> at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
> > >>> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> > >>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
> > >>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
> > >>> at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.emitMessage(OneToManyGroupedProcessor.java:95)
> > >>> at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.process(OneToManyGroupedProcessor.java:80)
> > >>> at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.process(OneToManyGroupedProcessor.java:1)
> > >>> at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
> > >>> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> > >>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
> > >>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
> > >>> at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:48)
> > >>> at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:1)
> > >>> at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
> > >>> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> > >>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
> > >>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
> > >>> at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.emitMessage(OneToManyGroupedProcessor.java:95)
> > >>> at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.process(OneToManyGroupedProcessor.java:80)
> > >>> at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.process(OneToManyGroupedProcessor.java:1)
> > >>> at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
> > >>> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> > >>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
> > >>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
> > >>> at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:48)
> > >>> at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:1)
> > >>> at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
> > >>> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> > >>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
> > >>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
> > >>> at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.emitMessage(OneToManyGroupedProcessor.java:95)
> > >>> at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.process(OneToManyGroupedProcessor.java:80)
> > >>> at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.process(OneToManyGroupedProcessor.java:1)
> > >>> at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
> > >>> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> > >>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
> > >>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
> > >>> at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:48)
> > >>> at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:1)
> > >>> at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
> > >>> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> > >>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
> > >>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
> > >>> at com.dexels.kafka.streams.remotejoin.OneToOneProcessor.process(OneToOneProcessor.java:64)
> > >>> at com.dexels.kafka.streams.remotejoin.OneToOneProcessor.process(OneToOneProcessor.java:1)
> > >>> at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
> > >>> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> > >>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
> > >>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
> > >>> at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:48)
> > >>> at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:1)
> > >>> at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
> > >>> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> > >>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
> > >>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
> > >>> at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.emitMessage(OneToManyGroupedProcessor.java:95)
> > >>> at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.process(OneToManyGroupedProcessor.java:80)
> > >>> at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.process(OneToManyGroupedProcessor.java:1)
> > >>> at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
> > >>> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> > >>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
> > >>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
> > >>> at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:48)
> > >>> at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:1)
> > >>> at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
> > >>> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> > >>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
> > >>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
> > >>> at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.emitMessage(OneToManyGroupedProcessor.java:95)
> > >>> at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.process(OneToManyGroupedProcessor.java:80)
> > >>> at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.process(OneToManyGroupedProcessor.java:1)
> > >>> at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
> > >>> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> > >>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
> > >>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
> > >>> at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:48)
> > >>> at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:1)
> > >>> at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
> > >>> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> > >>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
> > >>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
> > >>> at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.emitMessage(OneToManyGroupedProcessor.java:95)
> > >>> at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.process(OneToManyGroupedProcessor.java:80)
> > >>> at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.process(OneToManyGroupedProcessor.java:1)
> > >>> at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
> > >>> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> > >>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
> > >>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
> > >>> at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:48)
> > >>> at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:1)
> > >>> at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
> > >>> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> > >>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
> > >>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
> > >>> at com.dexels.kafka.streams.remotejoin.XmlTransformerProcessor.process(XmlTransformerProcessor.java:51)
> > >>> at com.dexels.kafka.streams.remotejoin.XmlTransformerProcessor.process(XmlTransformerProcessor.java:1)
> > >>> at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
> > >>> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> > >>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
> > >>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
> > >>> at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
> > >>> at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:189)
> > >>> at org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:677)
> > >>> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:555)
> > >>> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:525)
> > >>>
> > >>
> > >>
> > >
> >
>
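
The producer settings asked about in this thread (`batch.size`, lz4 compression, values up to about 100 KB) can be sketched roughly as below. This is only an illustration: Frank never states his actual `batch.size`, so the sketch assumes Kafka's documented default of 16384 bytes, and the class name, helper methods, and `localhost:9092` address are hypothetical. Whether a value larger than `batch.size` is what triggers the exception is not established here; KAFKA-5456 has the actual analysis.

```java
import java.util.Properties;

class ProducerSettingsSketch {
    // Kafka's documented default for batch.size, in bytes.
    // Assumption: the thread never says what value Frank actually used.
    static final int DEFAULT_BATCH_SIZE = 16384;

    // Builds plain producer properties using real Kafka config keys.
    // "localhost:9092" is a placeholder broker address.
    static Properties producerProps(int batchSizeBytes) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("compression.type", "lz4");
        props.put("batch.size", Integer.toString(batchSizeBytes));
        return props;
    }

    // True when a single record value is larger than the configured batch size.
    static boolean exceedsBatchSize(Properties props, int valueBytes) {
        int batchSize = Integer.parseInt(props.getProperty("batch.size"));
        return valueBytes > batchSize;
    }

    public static void main(String[] args) {
        Properties props = producerProps(DEFAULT_BATCH_SIZE);
        int largestValue = 100 * 1024; // "up to about 100kb", per Frank's reply
        System.out.println("batch.size=" + props.getProperty("batch.size"));
        System.out.println("value exceeds batch.size: "
                + exceedsBatchSize(props, largestValue));
    }
}
```

The same `Properties` object could be handed to a `KafkaProducer` once serializers are added; that step is left out so the sketch runs without the Kafka client on the classpath.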
