Well, the exception is thrown from `magnolia.Magnolia$$anon$5.dereference` and I have no idea what this is...
Maybe the serde you are using does not handle `null` correctly? As `null` should be translated to `null`, it might be possible to work around the issue by "wrapping" the serde with your own implementation that just checks for `null` and returns `null` directly, and only passes non-null values into the actual serde.

It might also be worth filing a bug report for the tool you are using.

-Matthias

On 10/20/20 9:41 AM, Dumitru-Nicolae Marasoui wrote:
> Hi,
> I am trying to delete (tombstone) some records in a compacted topic, by
> means of emitting a ("key", null) key-value pair, which by compaction would
> get removed after some time.
>
> However, I am getting the exception below:
> Exception in thread
> "SV-6c606e52-46eb-4a59-a006-27ce6ce1a603-StreamThread-1"
> java.lang.NullPointerException
> at magnolia.Magnolia$$anon$5.dereference(magnolia.scala:537)
> at com.sksamuel.avro4s.Encoder$$anon$15.$anonfun$encode$10(Encoder.scala:393)
> at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
> at scala.collection.Iterator.foreach(Iterator.scala:941)
> at scala.collection.Iterator.foreach$(Iterator.scala:941)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
> at scala.collection.IterableLike.foreach(IterableLike.scala:74)
> at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
> at scala.collection.TraversableLike.map(TraversableLike.scala:238)
> at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
> at scala.collection.AbstractTraversable.map(Traversable.scala:108)
> at com.sksamuel.avro4s.Encoder$$anon$15.encode(Encoder.scala:360)
> at com.sksamuel.avro4s.ToRecord$$anon$1.to(ToRecord.scala:24)
> at com.sksamuel.avro4s.RecordFormat$$anon$1.to(RecordFormat.scala:23)
> at com.ovoenergy.globaltopics.serdes.SerdeProvider$$anon$2.serialize(SerdeProvider.scala:67)
> at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
> at com.ovoenergy.globaltopics.serdes.SerdeProvider$$anon$2.serialize(SerdeProvider.scala:64)
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:176)
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:111)
> at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:92)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
> at org.apache.kafka.streams.kstream.internals.KStreamFlatMap$KStreamFlatMapProcessor.process(KStreamFlatMap.java:42)
> at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
> at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:104)
> at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$3(StreamTask.java:383)
> at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
> at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:383)
> at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:475)
> at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:550)
> at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:802)
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
>
> Do you have any idea?
>
> Thank you
> Nicolae
>
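PS: To make the "wrapping" idea concrete, here is a rough sketch of what I mean. It is not tested against your setup. To keep it self-contained, the nested `Serializer` interface is only a stand-in that mirrors the `serialize(String topic, T data)` method of `org.apache.kafka.common.serialization.Serializer`; in your application the wrapper would implement the real Kafka interface and wrap the avro4s-based serializer from your `SerdeProvider`. The names `NullSafeSerializerSketch`, `NullSafeSerializer` and `fragile` are made up for illustration.

```java
import java.nio.charset.StandardCharsets;

public class NullSafeSerializerSketch {

    // Stand-in for Kafka's Serializer<T> (assumption: only serialize() is mirrored here).
    interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    // Wraps an existing serializer and keeps null values away from it.
    static final class NullSafeSerializer<T> implements Serializer<T> {
        private final Serializer<T> inner;

        NullSafeSerializer(Serializer<T> inner) {
            this.inner = inner;
        }

        @Override
        public byte[] serialize(String topic, T data) {
            if (data == null) {
                // Tombstone: return null directly, the wrapped serializer is never called.
                return null;
            }
            return inner.serialize(topic, data);
        }
    }

    public static void main(String[] args) {
        // Hypothetical serializer that, like the one in the stack trace, fails on null input.
        Serializer<String> fragile = (topic, data) -> data.getBytes(StandardCharsets.UTF_8);
        Serializer<String> safe = new NullSafeSerializer<>(fragile);

        System.out.println(safe.serialize("some-topic", null) == null);   // prints "true"
        System.out.println(safe.serialize("some-topic", "value").length); // prints "5"
    }
}
```

The deserializer side could be guarded the same way, so that a `null` payload read back from the topic is returned as `null` instead of being handed to the underlying decoder.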