Yes, compression was on (lz4). Key and value sizes both fluctuate: keys are small (<10 bytes), and values vary more, but nothing crazy, up to about 100 KB.
I did some stepping through the code, and at some point I saw a branch that took a different path depending on the protocol version (something with LegacyRecord), so I figured updating the broker was worth a shot. I can do some more testing, but I need to set up another 0.10.2.1 cluster first.

Frank

On Fri, Jun 16, 2017 at 2:09 AM, Apurva Mehta <apu...@confluent.io> wrote:

> Finally, was compression enabled when you hit this exception? If so, which
> compression algorithm was enabled?
>
> On Thu, Jun 15, 2017 at 5:04 PM, Apurva Mehta <apu...@confluent.io> wrote:
>
> > Frank: it would be even better if you could share the key and value which
> > was causing this problem. Maybe share it on the JIRA:
> > https://issues.apache.org/jira/browse/KAFKA-5456 ?
> >
> > Thanks,
> > Apurva
> >
> > On Thu, Jun 15, 2017 at 4:07 PM, Apurva Mehta <apu...@confluent.io> wrote:
> >
> >> Hi Frank,
> >>
> >> What is the value of `batch.size` in your producer? What is the size
> >> of the key and value you are trying to write?
> >>
> >> Thanks,
> >> Apurva
> >>
> >> On Thu, Jun 15, 2017 at 2:28 AM, Frank Lyaruu <flya...@gmail.com> wrote:
> >>
> >>> Hey people, I see an error I haven't seen before. It is on a lowlevel-API
> >>> based streams application. I've started it once, then it ran fine, then did
> >>> a graceful shutdown and since then I always see this error on startup.
> >>>
> >>> I'm using yesterday's trunk.
> >>>
> >>> It seems that the MemoryRecordsBuilder overflows somehow, is there
> >>> something I need to configure?
> >>>
> >>> java.lang.NullPointerException
> >>>
> >>> at org.apache.kafka.common.utils.Utils.notNull(Utils.java:243)
> >>> at org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:219)
> >>> at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:650)
> >>> at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:604)
> >>> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:97)
> >>> at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:59)
> >>> at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:58)
> >>> at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueStore.put(ChangeLoggingKeyValueStore.java:73)
> >>> at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$2.run(MeteredKeyValueStore.java:66)
> >>> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> >>> at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:149)
> >>> at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:47)
> >>> at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:1)
> >>> at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
> >>> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> >>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
> >>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
> >>> at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.emitMessage(OneToManyGroupedProcessor.java:95)
> >>> at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.process(OneToManyGroupedProcessor.java:80)
> >>> at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.process(OneToManyGroupedProcessor.java:1)
> >>> at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
> >>> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> >>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
> >>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
> >>> at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:48)
> >>> at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:1)
> >>> at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
> >>> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> >>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
> >>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
> >>> at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.emitMessage(OneToManyGroupedProcessor.java:95)
> >>> at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.process(OneToManyGroupedProcessor.java:80)
> >>> at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.process(OneToManyGroupedProcessor.java:1)
> >>> at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
> >>> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> >>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
> >>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
> >>> at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:48)
> >>> at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:1)
> >>> at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
> >>> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> >>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
> >>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
> >>> at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.emitMessage(OneToManyGroupedProcessor.java:95)
> >>> at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.process(OneToManyGroupedProcessor.java:80)
> >>> at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.process(OneToManyGroupedProcessor.java:1)
> >>> at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
> >>> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> >>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
> >>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
> >>> at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:48)
> >>> at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:1)
> >>> at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
> >>> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> >>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
> >>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
> >>> at com.dexels.kafka.streams.remotejoin.OneToOneProcessor.process(OneToOneProcessor.java:64)
> >>> at com.dexels.kafka.streams.remotejoin.OneToOneProcessor.process(OneToOneProcessor.java:1)
> >>> at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
> >>> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> >>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
> >>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
> >>> at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:48)
> >>> at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:1)
> >>> at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
> >>> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> >>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
> >>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
> >>> at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.emitMessage(OneToManyGroupedProcessor.java:95)
> >>> at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.process(OneToManyGroupedProcessor.java:80)
> >>> at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.process(OneToManyGroupedProcessor.java:1)
> >>> at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
> >>> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> >>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
> >>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
> >>> at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:48)
> >>> at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:1)
> >>> at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
> >>> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> >>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
> >>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
> >>> at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.emitMessage(OneToManyGroupedProcessor.java:95)
> >>> at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.process(OneToManyGroupedProcessor.java:80)
> >>> at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.process(OneToManyGroupedProcessor.java:1)
> >>> at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
> >>> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> >>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
> >>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
> >>> at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:48)
> >>> at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:1)
> >>> at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
> >>> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> >>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
> >>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
> >>> at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.emitMessage(OneToManyGroupedProcessor.java:95)
> >>> at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.process(OneToManyGroupedProcessor.java:80)
> >>> at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.process(OneToManyGroupedProcessor.java:1)
> >>> at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
> >>> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> >>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
> >>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
> >>> at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:48)
> >>> at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:1)
> >>> at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
> >>> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> >>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
> >>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
> >>> at com.dexels.kafka.streams.remotejoin.XmlTransformerProcessor.process(XmlTransformerProcessor.java:51)
> >>> at com.dexels.kafka.streams.remotejoin.XmlTransformerProcessor.process(XmlTransformerProcessor.java:1)
> >>> at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
> >>> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> >>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
> >>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
> >>> at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
> >>> at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:189)
> >>> at org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:677)
> >>> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:555)
> >>> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:525)
> >>>
> >>
> >>
> >
>