Hey Frank, I think I spotted the issue and submitted a patch. Here's a link to the JIRA: https://issues.apache.org/jira/browse/KAFKA-5456. I expect we'll get the fix into 0.11.0.0. Thanks for finding this!
-Jason

On Thu, Jun 15, 2017 at 11:53 PM, Frank Lyaruu <flya...@gmail.com> wrote:

> Yes, compression was on (lz4). Key and value sizes fluctuate: key sizes are
> small (<10 bytes), and value sizes vary but nothing crazy, up to about
> 100 KB.
>
> I did some stepping through the code and at some point I saw a branch that
> used a different path depending on protocol version (something with
> LegacyRecord), then I figured updating the broker was worth a shot.
>
> I can do some more testing, but I need to set up another 0.10.2.1 cluster
> first.
>
> Frank
>
> On Fri, Jun 16, 2017 at 2:09 AM, Apurva Mehta <apu...@confluent.io> wrote:
>
> > Finally, was compression enabled when you hit this exception? If so, which
> > compression algorithm was enabled?
> >
> > On Thu, Jun 15, 2017 at 5:04 PM, Apurva Mehta <apu...@confluent.io> wrote:
> >
> > > Frank: it would be even better if you could share the key and value which
> > > was causing this problem. Maybe share it on the JIRA:
> > > https://issues.apache.org/jira/browse/KAFKA-5456 ?
> > >
> > > Thanks,
> > > Apurva
> > >
> > > On Thu, Jun 15, 2017 at 4:07 PM, Apurva Mehta <apu...@confluent.io> wrote:
> > >
> > >> Hi Frank,
> > >>
> > >> What is the value of `batch.size` in your producer? What is the size
> > >> of the key and value you are trying to write?
> > >>
> > >> Thanks,
> > >> Apurva
> > >>
> > >> On Thu, Jun 15, 2017 at 2:28 AM, Frank Lyaruu <flya...@gmail.com> wrote:
> > >>
> > >>> Hey people, I see an error I haven't seen before. It is on a low-level-API
> > >>> based streams application. I've started it once, then it ran fine, then did
> > >>> a graceful shutdown and since then I always see this error on startup.
> > >>>
> > >>> I'm using yesterday's trunk.
> > >>>
> > >>> It seems that the MemoryRecordsBuilder overflows somehow, is there
> > >>> something I need to configure?
> > >>>
> > >>> java.lang.NullPointerException
> > >>>
> > >>>     at org.apache.kafka.common.utils.Utils.notNull(Utils.java:243)
> > >>>     at org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:219)
> > >>>     at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:650)
> > >>>     at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:604)
> > >>>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:97)
> > >>>     at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:59)
> > >>>     at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:58)
> > >>>     at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueStore.put(ChangeLoggingKeyValueStore.java:73)
> > >>>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$2.run(MeteredKeyValueStore.java:66)
> > >>>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> > >>>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:149)
> > >>>     at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:47)
> > >>>     at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:1)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
> > >>>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
> > >>>     at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.emitMessage(OneToManyGroupedProcessor.java:95)
> > >>>     at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.process(OneToManyGroupedProcessor.java:80)
> > >>>     at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.process(OneToManyGroupedProcessor.java:1)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
> > >>>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
> > >>>     at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:48)
> > >>>     at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:1)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
> > >>>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
> > >>>     at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.emitMessage(OneToManyGroupedProcessor.java:95)
> > >>>     at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.process(OneToManyGroupedProcessor.java:80)
> > >>>     at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.process(OneToManyGroupedProcessor.java:1)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
> > >>>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
> > >>>     at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:48)
> > >>>     at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:1)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
> > >>>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
> > >>>     at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.emitMessage(OneToManyGroupedProcessor.java:95)
> > >>>     at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.process(OneToManyGroupedProcessor.java:80)
> > >>>     at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.process(OneToManyGroupedProcessor.java:1)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
> > >>>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
> > >>>     at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:48)
> > >>>     at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:1)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
> > >>>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
> > >>>     at com.dexels.kafka.streams.remotejoin.OneToOneProcessor.process(OneToOneProcessor.java:64)
> > >>>     at com.dexels.kafka.streams.remotejoin.OneToOneProcessor.process(OneToOneProcessor.java:1)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
> > >>>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
> > >>>     at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:48)
> > >>>     at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:1)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
> > >>>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
> > >>>     at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.emitMessage(OneToManyGroupedProcessor.java:95)
> > >>>     at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.process(OneToManyGroupedProcessor.java:80)
> > >>>     at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.process(OneToManyGroupedProcessor.java:1)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
> > >>>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
> > >>>     at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:48)
> > >>>     at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:1)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
> > >>>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
> > >>>     at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.emitMessage(OneToManyGroupedProcessor.java:95)
> > >>>     at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.process(OneToManyGroupedProcessor.java:80)
> > >>>     at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.process(OneToManyGroupedProcessor.java:1)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
> > >>>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
> > >>>     at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:48)
> > >>>     at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:1)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
> > >>>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
> > >>>     at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.emitMessage(OneToManyGroupedProcessor.java:95)
> > >>>     at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.process(OneToManyGroupedProcessor.java:80)
> > >>>     at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.process(OneToManyGroupedProcessor.java:1)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
> > >>>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
> > >>>     at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:48)
> > >>>     at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:1)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
> > >>>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
> > >>>     at com.dexels.kafka.streams.remotejoin.XmlTransformerProcessor.process(XmlTransformerProcessor.java:51)
> > >>>     at com.dexels.kafka.streams.remotejoin.XmlTransformerProcessor.process(XmlTransformerProcessor.java:1)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
> > >>>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
> > >>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
> > >>>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
> > >>>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:189)
> > >>>     at org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:677)
> > >>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:555)
> > >>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:525)
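
For anyone following along, here is a rough sketch of the two producer settings asked about in this thread (`batch.size` and the compression codec), written as plain Java properties so it runs without the Kafka client on the classpath. The class and method names are made up for illustration, and the `batch.size` value is an assumption: the thread never states what Frank's producer used, so the sketch falls back to Kafka's documented default of 16384 bytes. It only compares sizes; it does not claim to reproduce the NullPointerException or the fix in KAFKA-5456.

```java
import java.util.Properties;

public class ProducerSettingsSketch {

    // Builds the two producer settings discussed in the thread.
    static Properties producerSettings() {
        Properties props = new Properties();
        // Compression codec reported in the thread.
        props.setProperty("compression.type", "lz4");
        // Assumption: Kafka's documented default; the actual value is not given in the thread.
        props.setProperty("batch.size", "16384");
        return props;
    }

    // True when a single value of the given size is larger than the configured batch.size.
    static boolean exceedsBatchSize(Properties props, int valueBytes) {
        return valueBytes > Integer.parseInt(props.getProperty("batch.size"));
    }

    public static void main(String[] args) {
        Properties props = producerSettings();
        // "up to about 100kb" per the thread, taken here as 100 * 1024 bytes.
        int largestValueBytes = 100 * 1024;
        System.out.println("compression.type = " + props.getProperty("compression.type"));
        System.out.println("batch.size = " + props.getProperty("batch.size"));
        System.out.println("value larger than batch.size: "
                + exceedsBatchSize(props, largestValueBytes));
    }
}
```

In a Streams application these keys would normally be passed through the streams configuration with the `producer.` prefix rather than set on a producer directly.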