Hey, missed your replay - but the code i've shared above the logs is the code around those lines (removed some identifiers to make it a little bit more generic):
> inputStream.flatMapValues(_.split).peek((k, v) => {val _ = $k -> > ${v.printForDebug}")}) # return type KStream[Windowed[String], > SingleInputMessage] On Fri, Jan 29, 2021 at 9:01 AM Guozhang Wang <wangg...@gmail.com> wrote: > Could you share your code around > > > > > com.app.consumer.Utils$.$anonfun$buildCountersStream$1(ServiceUtils.scala:91) > > That seems to be where NPE is thrown. > > > On Wed, Jan 13, 2021 at 5:46 AM Nitay Kufert <nita...@ironsrc.com> wrote: > > > Hey, > > *Without any code change*, just by bumping the kafka version from 2.5.1 > to > > 2.6.1 (clients only) - my stream application started throwing > > NullPointerException (sometimes, not in a predicted pattern). > > Maybe it's worth mentioning that I also removed the "UPGRADE_FROM" conf > > that was forgotten there from the older versions. > > > > We are using Scala 2.12, and the line that throws this exception is using > > flatMapValues: > > > > > > > inputStream.flatMapValues(_.split) # return type > > > KStream[Windowed[String], SingleInputMessage] > > > > > > Where inputStream is of type: KStream[Windowed[String], InputMessage] and > > the split method splits this InputMessage into several > > SingleInputMessage messages (hence the flat - to avoid > > List[SingleInputMessage]). > > > > The exception: > > > > > java.lang.NullPointerException: null Wrapped by: > > > org.apache.kafka.streams.errors.StreamsException: Exception caught in > > > process. taskId=2_2, processor=unique_input_message-repartition-source, > > > topic=service-unique_input_message-repartition, partition=2, > > > offset=318846738, stacktrace=java.lang.NullPointerException > > > > > > > java.lang.NullPointerException: null at > > > > > > com.app.consumer.Utils$.$anonfun$buildCountersStream$1(ServiceUtils.scala:91) > > > at > > > > > > org.apache.kafka.streams.scala.FunctionsCompatConversions$FlatValueMapperFromFunction$.$anonfun$asValueMapper$2(FunctionsCompatConversions.scala:62) > > > at > > > > > > org.apache.kafka.streams.kstream.internals.AbstractStream.lambda$withKey$1(AbstractStream.java:105) > > > at > > > > > > org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:40) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142) > > > at > > > > > > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168) > > > at > > > > > > org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142) > > > at > > > > > > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168) > > > at > > > > > > org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142) > > > at > > > > > > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168) > > > at > > > > > > org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:109) > > > at > > > > > > org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:78) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142) > > > at > > > > > > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216) > > > at > > > > > > org.apache.kafka.streams.kstream.internals.SessionCacheFlushListener.apply(SessionCacheFlushListener.java:43) > > > at > > > > > > org.apache.kafka.streams.kstream.internals.SessionCacheFlushListener.apply(SessionCacheFlushListener.java:26) > > > at > > > > > > org.apache.kafka.streams.state.internals.MeteredSessionStore.lambda$setFlushListener$1(MeteredSessionStore.java:97) > > > at > > > > > > org.apache.kafka.streams.state.internals.CachingSessionStore.putAndMaybeForward(CachingSessionStore.java:98) > > > at > > > > > > org.apache.kafka.streams.state.internals.CachingSessionStore.lambda$initInternal$0(CachingSessionStore.java:76) > > > at > > > > > > org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151) > > > at > > > > > > org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:244) > > > at > > > > > > org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:240) > > > at > > > > > > org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:150) > > > at > > > > > > org.apache.kafka.streams.state.internals.CachingSessionStore.put(CachingSessionStore.java:134) > > > at > > > > > > org.apache.kafka.streams.state.internals.CachingSessionStore.remove(CachingSessionStore.java:142) > > > at > > > > > > org.apache.kafka.streams.state.internals.MeteredSessionStore.lambda$remove$3(MeteredSessionStore.java:134) > > > at > > > > > > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836) > > > at > > > > > > org.apache.kafka.streams.state.internals.MeteredSessionStore.remove(MeteredSessionStore.java:131) > > > at > > > > > > org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$SessionStoreReadWriteDecorator.remove(AbstractReadWriteDecorator.java:221) > > > at > > > > > > org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate$KStreamSessionWindowAggregateProcessor.process(KStreamSessionWindowAggregate.java:171) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142) > > > at > > > > > > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168) > > > at > > > > > > org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:85) > > > at > > > > > > org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:678) > > > at > > > > > > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836) > > > at > > > > > > org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:678) > > > ... 4 common frames omitted Wrapped by: > > > org.apache.kafka.streams.errors.StreamsException: Exception caught in > > > process. taskId=2_2, processor=unique_input_message-repartition-source, > > > topic=service-unique_input_message-repartition, partition=2, > > > offset=318846738, stacktrace=java.lang.NullPointerException at > > > > > > com.app.consumer.ServiceUtils$.$anonfun$buildCountersStream$1(ServiceUtils.scala:91) > > > at > > > > > > org.apache.kafka.streams.scala.FunctionsCompatConversions$FlatValueMapperFromFunction$.$anonfun$asValueMapper$2(FunctionsCompatConversions.scala:62) > > > at > > > > > > org.apache.kafka.streams.kstream.internals.AbstractStream.lambda$withKey$1(AbstractStream.java:105) > > > at > > > > > > org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:40) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142) > > > at > > > > > > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168) > > > at > > > > > > org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142) > > > at > > > > > > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168) > > > at > > > > > > org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142) > > > at > > > > > > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168) > > > at > > > > > > org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:109) > > > at > > > > > > org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:78) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142) > > > at > > > > > > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216) > > > at > > > > > > org.apache.kafka.streams.kstream.internals.SessionCacheFlushListener.apply(SessionCacheFlushListener.java:43) > > > at > > > > > > org.apache.kafka.streams.kstream.internals.SessionCacheFlushListener.apply(SessionCacheFlushListener.java:26) > > > at > > > > > > org.apache.kafka.streams.state.internals.MeteredSessionStore.lambda$setFlushListener$1(MeteredSessionStore.java:97) > > > at > > > > > > org.apache.kafka.streams.state.internals.CachingSessionStore.putAndMaybeForward(CachingSessionStore.java:98) > > > at > > > > > > org.apache.kafka.streams.state.internals.CachingSessionStore.lambda$initInternal$0(CachingSessionStore.java:76) > > > at > > > > > > org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151) > > > at > > > > > > org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:244) > > > at > > > > > > org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:240) > > > at > > > > > > org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:150) > > > at > > > > > > org.apache.kafka.streams.state.internals.CachingSessionStore.put(CachingSessionStore.java:134) > > > at > > > > > > org.apache.kafka.streams.state.internals.CachingSessionStore.remove(CachingSessionStore.java:142) > > > at > > > > > > org.apache.kafka.streams.state.internals.MeteredSessionStore.lambda$remove$3(MeteredSessionStore.java:134) > > > at > > > > > > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836) > > > at > > > > > > org.apache.kafka.streams.state.internals.MeteredSessionStore.remove(MeteredSessionStore.java:131) > > > at > > > > > > org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$SessionStoreReadWriteDecorator.remove(AbstractReadWriteDecorator.java:221) > > > at > > > > > > org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate$KStreamSessionWindowAggregateProcessor.process(KStreamSessionWindowAggregate.java:171) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142) > > > at > > > > > > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216) > > > at > > > > > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168) > > > at > > > > > > org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:85) > > > at > > > > > > org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:678) > > > at > > > > > > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836) > > > at > > > > > > org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:678) > > > at > > > > > > org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1034) > > > at > > > > > > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690) > > > at > > > > > > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551) > > > at > > > > > > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510) > > > at > > > > > > org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:695) > > > at > > > > > > org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1034) > > > at > > > > > > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690) > > > at > > > > > > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551) > > > at > > > > > > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510) > > > > > > > * 2nd line of the exception is because we are using Scala > > (FunctionsCompatConversions.scala:62) > > > > > implicit class FlatValueMapperFromFunction[V, VR](val f: V => > > > Iterable[VR]) extends AnyVal { def asValueMapper: ValueMapper[V, > > > JIterable[VR]] = (value: V) => f(value).*asJava* } > > > > > > > The main thing here is that we didn't change anything in the app code.. > so > > i wonder if it's a new bug OR our implementation bug that somehow didn't > > happen in 2.5.1 (or previous versions, since this logic is old) > > > > Thanks and let me know what else can help (i wish i knew how to > reproduce, > > it happened 6 times during the last day and no idea why.. i'll try to > catch > > it with logs) > > > > -- > > > > Nitay Kufert > > Backend Team Leader > > [image: ironSource] <http://www.ironsrc.com> > > > > email nita...@ironsrc.com > > mobile +972-54-5480021 > > fax +972-77-5448273 > > skype nitay.kufert.ssa > > 121 Menachem Begin St., Tel Aviv, Israel > > ironsrc.com <http://www.ironsrc.com> > > [image: linkedin] <https://www.linkedin.com/company/ironsource> [image: > > twitter] <https://twitter.com/ironsource> [image: facebook] > > <https://www.facebook.com/ironSource> [image: googleplus] > > <https://plus.google.com/+ironsrc> > > This email (including any attachments) is for the sole use of the > intended > > recipient and may contain confidential information which may be protected > > by legal privilege. If you are not the intended recipient, or the > employee > > or agent responsible for delivering it to the intended recipient, you are > > hereby notified that any use, dissemination, distribution or copying of > > this communication and/or its content is strictly prohibited. If you are > > not the intended recipient, please immediately notify us by reply email > or > > by telephone, delete this email and destroy any copies. Thank you. > > > > > -- > -- Guozhang > -- Nitay Kufert Backend Team Leader [image: ironSource] <http://www.ironsrc.com> email nita...@ironsrc.com mobile +972-54-5480021 fax +972-77-5448273 skype nitay.kufert.ssa 121 Menachem Begin St., Tel Aviv, Israel ironsrc.com <http://www.ironsrc.com> [image: linkedin] <https://www.linkedin.com/company/ironsource> [image: twitter] <https://twitter.com/ironsource> [image: facebook] <https://www.facebook.com/ironSource> [image: googleplus] <https://plus.google.com/+ironsrc> This email (including any attachments) is for the sole use of the intended recipient and may contain confidential information which may be protected by legal privilege. If you are not the intended recipient, or the employee or agent responsible for delivering it to the intended recipient, you are hereby notified that any use, dissemination, distribution or copying of this communication and/or its content is strictly prohibited. If you are not the intended recipient, please immediately notify us by reply email or by telephone, delete this email and destroy any copies. Thank you.