Well, the exception is thrown from
`magnolia.Magnolia$$anon$5.dereference`, and I have no idea what this is...

Maybe the serde you are using does not handle `null` correctly?

As `null` should be translated to `null`, it might be possible to
work around the issue by "wrapping" the serde with your own
implementation that just checks for `null` and returns `null` directly,
and only passes non-null values into the actual serde?
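
A minimal sketch of such a wrapper, just to illustrate the idea. To keep
it compilable without Kafka on the classpath, `Serializer` and
`Deserializer` below are reduced stand-ins for the interfaces in
`org.apache.kafka.common.serialization`; in a real application you would
implement the Kafka interfaces instead. The names `NullSafeSerializer`,
`NullSafeDeserializer` and `NullSafeSerdeSketch` are made up for this
example, and the "fragile" serializer only imitates a serde that fails
on `null`.

```java
import java.nio.charset.StandardCharsets;

public class NullSafeSerdeSketch {

    // Stand-in for org.apache.kafka.common.serialization.Serializer,
    // reduced to the single method that matters for this sketch.
    interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    // Stand-in for org.apache.kafka.common.serialization.Deserializer.
    interface Deserializer<T> {
        T deserialize(String topic, byte[] data);
    }

    // Hypothetical wrapper: a null value (tombstone) is passed through as
    // null bytes and never reaches the wrapped serializer.
    static final class NullSafeSerializer<T> implements Serializer<T> {
        private final Serializer<T> inner;

        NullSafeSerializer(Serializer<T> inner) {
            this.inner = inner;
        }

        @Override
        public byte[] serialize(String topic, T data) {
            return data == null ? null : inner.serialize(topic, data);
        }
    }

    // Hypothetical wrapper for the read path: null bytes become a null value.
    static final class NullSafeDeserializer<T> implements Deserializer<T> {
        private final Deserializer<T> inner;

        NullSafeDeserializer(Deserializer<T> inner) {
            this.inner = inner;
        }

        @Override
        public T deserialize(String topic, byte[] data) {
            return data == null ? null : inner.deserialize(topic, data);
        }
    }

    public static void main(String[] args) {
        // Imitates a serializer that throws a NullPointerException on null.
        Serializer<String> fragile =
            (topic, data) -> data.getBytes(StandardCharsets.UTF_8);
        Serializer<String> safe = new NullSafeSerializer<>(fragile);

        // The tombstone is passed through without touching the inner serializer.
        System.out.println(safe.serialize("some-topic", null) == null);

        // Non-null values are still handled by the inner serializer.
        byte[] bytes = safe.serialize("some-topic", "value");
        System.out.println(new String(bytes, StandardCharsets.UTF_8));
    }
}
```

You would then build the value serde for the output topic from the two
wrappers, so that the ("key", null) record is written with a null
payload instead of being handed to the Avro encoder.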

It might also be worth filing a bug report for the tool you are using?


-Matthias

On 10/20/20 9:41 AM, Dumitru-Nicolae Marasoui wrote:
> Hi,
> I am trying to delete (tombstone) some records in a compacted topic, by
> means of emitting a ("key", null) key-value pair, which by compaction would
> get removed after some time.
> 
> However, I am getting the exception below:
> Exception in thread "SV-6c606e52-46eb-4a59-a006-27ce6ce1a603-StreamThread-1" java.lang.NullPointerException
> at magnolia.Magnolia$$anon$5.dereference(magnolia.scala:537)
> at com.sksamuel.avro4s.Encoder$$anon$15.$anonfun$encode$10(Encoder.scala:393)
> at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
> at scala.collection.Iterator.foreach(Iterator.scala:941)
> at scala.collection.Iterator.foreach$(Iterator.scala:941)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
> at scala.collection.IterableLike.foreach(IterableLike.scala:74)
> at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
> at scala.collection.TraversableLike.map(TraversableLike.scala:238)
> at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
> at scala.collection.AbstractTraversable.map(Traversable.scala:108)
> at com.sksamuel.avro4s.Encoder$$anon$15.encode(Encoder.scala:360)
> at com.sksamuel.avro4s.ToRecord$$anon$1.to(ToRecord.scala:24)
> at com.sksamuel.avro4s.RecordFormat$$anon$1.to(RecordFormat.scala:23)
> at com.ovoenergy.globaltopics.serdes.SerdeProvider$$anon$2.serialize(SerdeProvider.scala:67)
> at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
> at com.ovoenergy.globaltopics.serdes.SerdeProvider$$anon$2.serialize(SerdeProvider.scala:64)
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:176)
> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:111)
> at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:92)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
> at org.apache.kafka.streams.kstream.internals.KStreamFlatMap$KStreamFlatMapProcessor.process(KStreamFlatMap.java:42)
> at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
> at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:104)
> at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$3(StreamTask.java:383)
> at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
> at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:383)
> at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:475)
> at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:550)
> at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:802)
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
> 
> Do you have any idea?
> 
> Thank you
> Nicolae
> 
