Hey,
*Without any code change*, just by bumping the Kafka version from 2.5.1 to
2.6.1 (clients only), my Streams application started throwing a
NullPointerException (intermittently, with no predictable pattern).
It may be worth mentioning that I also removed the "UPGRADE_FROM" config
that had been left over from older versions.

We are using Scala 2.12, and the line that throws this exception is using
flatMapValues:


> inputStream.flatMapValues(_.split)
> // return type: KStream[Windowed[String], SingleInputMessage]


Here inputStream is of type KStream[Windowed[String], InputMessage], and
the split method splits an InputMessage into several
SingleInputMessage messages (hence the "flat", to avoid ending up with
List[SingleInputMessage] values).
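
To make the shape concrete, here is a minimal sketch using plain Scala collections instead of a real KStream. The case class fields and the FlatMapValuesShape helper are hypothetical stand-ins, not our actual model or the Kafka Streams API; the sketch only shows how each value fans out into zero or more values while the key is kept.

```scala
// Hypothetical stand-ins for the real model classes; field names are made up.
final case class SingleInputMessage(id: String, payload: String)

final case class InputMessage(id: String, payloads: List[String]) {
  // One SingleInputMessage per payload; an empty list yields no output records.
  def split: List[SingleInputMessage] =
    payloads.map(p => SingleInputMessage(id, p))
}

object FlatMapValuesShape {
  // Plain-collection analogue of flatMapValues: the key is kept and
  // each value is expanded into zero or more output values.
  def flatMapValues[K, V, VR](records: Seq[(K, V)])(f: V => Iterable[VR]): Seq[(K, VR)] =
    records.flatMap { case (k, v) => f(v).map(vr => (k, vr)) }
}
```

With this analogue, a record whose split returns an empty list simply disappears from the output, and a record with two payloads becomes two records under the same key.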

The exception:

> java.lang.NullPointerException: null Wrapped by:
> org.apache.kafka.streams.errors.StreamsException: Exception caught in
> process. taskId=2_2, processor=unique_input_message-repartition-source,
> topic=service-unique_input_message-repartition, partition=2,
> offset=318846738, stacktrace=java.lang.NullPointerException
>

java.lang.NullPointerException: null at
> com.app.consumer.Utils$.$anonfun$buildCountersStream$1(ServiceUtils.scala:91)
> at
> org.apache.kafka.streams.scala.FunctionsCompatConversions$FlatValueMapperFromFunction$.$anonfun$asValueMapper$2(FunctionsCompatConversions.scala:62)
> at
> org.apache.kafka.streams.kstream.internals.AbstractStream.lambda$withKey$1(AbstractStream.java:105)
> at
> org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:40)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> at
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> at
> org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> at
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> at
> org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> at
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> at
> org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:109)
> at
> org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:78)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> at
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> at
> org.apache.kafka.streams.kstream.internals.SessionCacheFlushListener.apply(SessionCacheFlushListener.java:43)
> at
> org.apache.kafka.streams.kstream.internals.SessionCacheFlushListener.apply(SessionCacheFlushListener.java:26)
> at
> org.apache.kafka.streams.state.internals.MeteredSessionStore.lambda$setFlushListener$1(MeteredSessionStore.java:97)
> at
> org.apache.kafka.streams.state.internals.CachingSessionStore.putAndMaybeForward(CachingSessionStore.java:98)
> at
> org.apache.kafka.streams.state.internals.CachingSessionStore.lambda$initInternal$0(CachingSessionStore.java:76)
> at
> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
> at
> org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:244)
> at
> org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:240)
> at
> org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:150)
> at
> org.apache.kafka.streams.state.internals.CachingSessionStore.put(CachingSessionStore.java:134)
> at
> org.apache.kafka.streams.state.internals.CachingSessionStore.remove(CachingSessionStore.java:142)
> at
> org.apache.kafka.streams.state.internals.MeteredSessionStore.lambda$remove$3(MeteredSessionStore.java:134)
> at
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> at
> org.apache.kafka.streams.state.internals.MeteredSessionStore.remove(MeteredSessionStore.java:131)
> at
> org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$SessionStoreReadWriteDecorator.remove(AbstractReadWriteDecorator.java:221)
> at
> org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate$KStreamSessionWindowAggregateProcessor.process(KStreamSessionWindowAggregate.java:171)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> at
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> at
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:85)
> at
> org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:678)
> at
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> at
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:678)
> ... 4 common frames omitted Wrapped by:
> org.apache.kafka.streams.errors.StreamsException: Exception caught in
> process. taskId=2_2, processor=unique_input_message-repartition-source,
> topic=service-unique_input_message-repartition, partition=2,
> offset=318846738, stacktrace=java.lang.NullPointerException at
> com.app.consumer.ServiceUtils$.$anonfun$buildCountersStream$1(ServiceUtils.scala:91)
> at
> org.apache.kafka.streams.scala.FunctionsCompatConversions$FlatValueMapperFromFunction$.$anonfun$asValueMapper$2(FunctionsCompatConversions.scala:62)
> at
> org.apache.kafka.streams.kstream.internals.AbstractStream.lambda$withKey$1(AbstractStream.java:105)
> at
> org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:40)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> at
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> at
> org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> at
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> at
> org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> at
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> at
> org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:109)
> at
> org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:78)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> at
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> at
> org.apache.kafka.streams.kstream.internals.SessionCacheFlushListener.apply(SessionCacheFlushListener.java:43)
> at
> org.apache.kafka.streams.kstream.internals.SessionCacheFlushListener.apply(SessionCacheFlushListener.java:26)
> at
> org.apache.kafka.streams.state.internals.MeteredSessionStore.lambda$setFlushListener$1(MeteredSessionStore.java:97)
> at
> org.apache.kafka.streams.state.internals.CachingSessionStore.putAndMaybeForward(CachingSessionStore.java:98)
> at
> org.apache.kafka.streams.state.internals.CachingSessionStore.lambda$initInternal$0(CachingSessionStore.java:76)
> at
> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
> at
> org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:244)
> at
> org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:240)
> at
> org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:150)
> at
> org.apache.kafka.streams.state.internals.CachingSessionStore.put(CachingSessionStore.java:134)
> at
> org.apache.kafka.streams.state.internals.CachingSessionStore.remove(CachingSessionStore.java:142)
> at
> org.apache.kafka.streams.state.internals.MeteredSessionStore.lambda$remove$3(MeteredSessionStore.java:134)
> at
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> at
> org.apache.kafka.streams.state.internals.MeteredSessionStore.remove(MeteredSessionStore.java:131)
> at
> org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$SessionStoreReadWriteDecorator.remove(AbstractReadWriteDecorator.java:221)
> at
> org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate$KStreamSessionWindowAggregateProcessor.process(KStreamSessionWindowAggregate.java:171)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> at
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> at
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:85)
> at
> org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:678)
> at
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> at
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:678)
> at
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1034)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
> at
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:695)
> at
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1034)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
>

* The 2nd frame of the stack trace is there because we are using the Scala
API (FunctionsCompatConversions.scala:62):

> implicit class FlatValueMapperFromFunction[V, VR](val f: V => Iterable[VR]) extends AnyVal {
>   def asValueMapper: ValueMapper[V, JIterable[VR]] = (value: V) => f(value).asJava
> }
>

The main point is that we didn't change anything in the app code, so
I wonder whether this is a new bug or a bug in our implementation that
somehow didn't surface in 2.5.1 (or in earlier versions; this logic is old).

Thanks, and let me know what else would help. I wish I knew how to reproduce
it: it happened 6 times during the last day and I have no idea why. I'll try
to catch it with logs.
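
One way to catch it with logs is sketched below. It rests on an assumption I have not confirmed: that the value handed to the function is occasionally null. NullSafe and guard are hypothetical names, not part of Kafka Streams; the wrapper calls a callback (for example a logger) and emits nothing when the value is null, instead of letting the function dereference it.

```scala
object NullSafe {
  // Wraps a V => Iterable[VR] so that a null value produces no output
  // instead of a NullPointerException. onNull runs first, e.g. to log.
  def guard[V, VR](f: V => Iterable[VR])(onNull: () => Unit): V => Iterable[VR] =
    (value: V) =>
      Option(value) match {
        case Some(v) => f(v)
        case None =>
          onNull()
          Iterable.empty[VR]
      }
}
```

In the topology this would be used roughly as inputStream.flatMapValues(NullSafe.guard[InputMessage, SingleInputMessage](_.split)(() => ...)), with the callback writing a log line. If the log line never shows up while the exception keeps happening, the null is coming from somewhere inside split rather than from the record value.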

-- 

Nitay Kufert
Backend Team Leader