Hi Dumitru,

According to the stack trace you shared, the NPE is thrown by *Avro4S*, the serialization framework you are using. This helps isolate the problem: the cause is not Kafka Streams itself but your serialization framework.

That said, the Avro specification allows a field to be null only if the schema declares this explicitly, typically as a union that includes "null". For instance:

```
{
  "type": "record",
  "name": "MyRecord",
  "fields" : [
    {"name": "userId", "type": "long"},              // mandatory field
    {"name": "userName", "type": ["null", "string"]} // optional field
  ]
}
```

The field *userName* above can hold null values and is therefore treated as optional. You may want to check whether you can make this change in the Avro schema or, if it has already been made, whether the serialization framework you're using has trouble handling such fields.
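To make the union semantics concrete, here is a minimal sketch in plain Python. It is a hand-rolled illustration, not the Avro library or Avro4S, and the helper names `accepts` and `invalid_fields` are hypothetical. It only covers the three types used in the schema above and shows that a null is legal for *userName* but not for *userId*:

```python
# Hand-rolled illustration of Avro union semantics; this is NOT the Avro library.
SCHEMA = {
    "type": "record",
    "name": "MyRecord",
    "fields": [
        {"name": "userId", "type": "long"},                # mandatory field
        {"name": "userName", "type": ["null", "string"]},  # optional field
    ],
}


def accepts(avro_type, value):
    """Return True if `value` fits `avro_type` (null, long, string and unions only)."""
    if isinstance(avro_type, list):
        # A union accepts a value if any one of its branches accepts it.
        return any(accepts(branch, value) for branch in avro_type)
    if avro_type == "null":
        return value is None
    if avro_type == "long":
        # bool is a subclass of int in Python, so exclude it explicitly.
        return isinstance(value, int) and not isinstance(value, bool)
    if avro_type == "string":
        return isinstance(value, str)
    raise ValueError(f"type not covered by this sketch: {avro_type!r}")


def invalid_fields(schema, datum):
    """List the names of fields whose value does not match the declared type."""
    return [
        field["name"]
        for field in schema["fields"]
        if not accepts(field["type"], datum.get(field["name"]))
    ]


print(invalid_fields(SCHEMA, {"userId": 42, "userName": None}))
print(invalid_fields(SCHEMA, {"userId": None, "userName": "bob"}))
```

The first call reports no invalid fields because the union has a "null" branch; the second reports *userId*, which has none. In a real pipeline this check is done by the Avro library and by your serialization framework, so the sketch is only meant to show why the union is what makes null a legal value.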

Thanks,

-- Ricardo

On 6/17/20 11:29 AM, Dumitru-Nicolae Marasoui wrote:
Hello kafka community,
When the following kafka-streams starts with input topic values in avro
format, we get this NPE below. The input is a record and a field of it is
an array of other records. Reading the stack trace below what I understand
is that at some point in deserializing a value structure it encounters an
unexpected null value and hence the NPE. Do you have any hints as to what
may be the problem? In this kafka-streams ETL job we emit multiple messages
from a single input message (flatMapping the array field to the output).
Thank you
Exception in thread "global-topic-conveyor-com.ovoenergy.globaltopics.pipelines.ServiceV1-b6ff13b6-2b26-4b88-b3eb-87ee8f2159e0-StreamThread-1" java.lang.NullPointerException
at com.sksamuel.avro4s.Decoder$$anon$12.decode(Decoder.scala:430)
at com.sksamuel.avro4s.Decoder$$anon$12.$anonfun$decode$12(Decoder.scala:416)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
at scala.collection.TraversableLike.map(TraversableLike.scala:238)
at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at com.sksamuel.avro4s.Decoder$$anon$12.decode(Decoder.scala:381)
at com.sksamuel.avro4s.FromRecord$$anon$1.from(FromRecord.scala:16)
at com.sksamuel.avro4s.RecordFormat$$anon$1.from(RecordFormat.scala:22)
at com.ovoenergy.globaltopics.serdes.SerdeProvider$$anon$3.deserialize(SerdeProvider.scala:87)
at org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.deserialize(ValueAndTimestampDeserializer.java:54)
at org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.deserialize(ValueAndTimestampDeserializer.java:27)
at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:160)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.outerValue(MeteredKeyValueStore.java:363)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.get(ProcessorContextImpl.java:465)
at org.apache.kafka.streams.kstream.internals.KStreamReduce$KStreamReduceValueGetter.get(KStreamReduce.java:135)
at org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:100)
at org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:66)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:45)
at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:28)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$setFlushListener$1(MeteredKeyValueStore.java:227)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:92)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:72)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:244)
at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:240)
at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:150)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal(CachingKeyValueStore.java:131)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:123)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:36)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:262)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.put(ProcessorContextImpl.java:487)
at org.apache.kafka.streams.kstream.internals.KStreamReduce$KStreamReduceProcessor.process(KStreamReduce.java:103)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:428)
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:474)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:536)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:792)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
Thank you,
Nicolae
