Bumping for the off chance that during this time some sort of a bug was reported that might explain this behaviour.. i will feel more comfortable bumping our kafka versions this way :)
On Wed, Feb 24, 2021 at 12:48 PM Nitay Kufert <nita...@ironsrc.com> wrote: > I guess it's possible but very unlikely because it works perfectly with > all the previous versions and the current one? (2.5.1) > Why did a change in the version introduce NULLS there? > > On Tue, Feb 23, 2021 at 9:16 PM Guozhang Wang <wangg...@gmail.com> wrote: > >> Is it possible that the flattened values contain `null` and hence >> `_.split` >> throws? >> >> On Tue, Feb 23, 2021 at 8:23 AM Nitay Kufert <nita...@ironsrc.com> wrote: >> >> > Hey, missed your replay - but the code i've shared above the logs is the >> > code around those lines (removed some identifiers to make it a little >> bit >> > more generic): >> > >> > > inputStream.flatMapValues(_.split).peek((k, v) => {val _ = $k -> >> > > ${v.printForDebug}")}) # return type KStream[Windowed[String], >> > > SingleInputMessage] >> > >> > >> > On Fri, Jan 29, 2021 at 9:01 AM Guozhang Wang <wangg...@gmail.com> >> wrote: >> > >> > > Could you share your code around >> > > >> > > > >> > > >> > > >> > >> com.app.consumer.Utils$.$anonfun$buildCountersStream$1(ServiceUtils.scala:91) >> > > >> > > That seems to be where NPE is thrown. >> > > >> > > >> > > On Wed, Jan 13, 2021 at 5:46 AM Nitay Kufert <nita...@ironsrc.com> >> > wrote: >> > > >> > > > Hey, >> > > > *Without any code change*, just by bumping the kafka version from >> 2.5.1 >> > > to >> > > > 2.6.1 (clients only) - my stream application started throwing >> > > > NullPointerException (sometimes, not in a predicted pattern). >> > > > Maybe it's worth mentioning that I also removed the "UPGRADE_FROM" >> conf >> > > > that was forgotten there from the older versions. >> > > > >> > > > We are using Scala 2.12, and the line that throws this exception is >> > using >> > > > flatMapValues: >> > > > >> > > > >> > > > > inputStream.flatMapValues(_.split) # return type >> > > > > KStream[Windowed[String], SingleInputMessage] >> > > > >> > > > >> > > > Where inputStream is of type: KStream[Windowed[String], >> InputMessage] >> > and >> > > > the split method splits this InputMessage into several >> > > > SingleInputMessage messages (hence the flat - to avoid >> > > > List[SingleInputMessage]). >> > > > >> > > > The exception: >> > > > >> > > > > java.lang.NullPointerException: null Wrapped by: >> > > > > org.apache.kafka.streams.errors.StreamsException: Exception >> caught in >> > > > > process. taskId=2_2, >> > processor=unique_input_message-repartition-source, >> > > > > topic=service-unique_input_message-repartition, partition=2, >> > > > > offset=318846738, stacktrace=java.lang.NullPointerException >> > > > > >> > > > >> > > > java.lang.NullPointerException: null at >> > > > > >> > > > >> > > >> > >> com.app.consumer.Utils$.$anonfun$buildCountersStream$1(ServiceUtils.scala:91) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.scala.FunctionsCompatConversions$FlatValueMapperFromFunction$.$anonfun$asValueMapper$2(FunctionsCompatConversions.scala:62) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.kstream.internals.AbstractStream.lambda$withKey$1(AbstractStream.java:105) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:40) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:109) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:78) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.kstream.internals.SessionCacheFlushListener.apply(SessionCacheFlushListener.java:43) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.kstream.internals.SessionCacheFlushListener.apply(SessionCacheFlushListener.java:26) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.state.internals.MeteredSessionStore.lambda$setFlushListener$1(MeteredSessionStore.java:97) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.state.internals.CachingSessionStore.putAndMaybeForward(CachingSessionStore.java:98) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.state.internals.CachingSessionStore.lambda$initInternal$0(CachingSessionStore.java:76) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:244) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:240) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:150) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.state.internals.CachingSessionStore.put(CachingSessionStore.java:134) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.state.internals.CachingSessionStore.remove(CachingSessionStore.java:142) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.state.internals.MeteredSessionStore.lambda$remove$3(MeteredSessionStore.java:134) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.state.internals.MeteredSessionStore.remove(MeteredSessionStore.java:131) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$SessionStoreReadWriteDecorator.remove(AbstractReadWriteDecorator.java:221) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate$KStreamSessionWindowAggregateProcessor.process(KStreamSessionWindowAggregate.java:171) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:85) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:678) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:678) >> > > > > ... 4 common frames omitted Wrapped by: >> > > > > org.apache.kafka.streams.errors.StreamsException: Exception >> caught in >> > > > > process. taskId=2_2, >> > processor=unique_input_message-repartition-source, >> > > > > topic=service-unique_input_message-repartition, partition=2, >> > > > > offset=318846738, stacktrace=java.lang.NullPointerException at >> > > > > >> > > > >> > > >> > >> com.app.consumer.ServiceUtils$.$anonfun$buildCountersStream$1(ServiceUtils.scala:91) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.scala.FunctionsCompatConversions$FlatValueMapperFromFunction$.$anonfun$asValueMapper$2(FunctionsCompatConversions.scala:62) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.kstream.internals.AbstractStream.lambda$withKey$1(AbstractStream.java:105) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:40) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:109) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:78) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.kstream.internals.SessionCacheFlushListener.apply(SessionCacheFlushListener.java:43) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.kstream.internals.SessionCacheFlushListener.apply(SessionCacheFlushListener.java:26) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.state.internals.MeteredSessionStore.lambda$setFlushListener$1(MeteredSessionStore.java:97) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.state.internals.CachingSessionStore.putAndMaybeForward(CachingSessionStore.java:98) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.state.internals.CachingSessionStore.lambda$initInternal$0(CachingSessionStore.java:76) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:244) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:240) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:150) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.state.internals.CachingSessionStore.put(CachingSessionStore.java:134) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.state.internals.CachingSessionStore.remove(CachingSessionStore.java:142) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.state.internals.MeteredSessionStore.lambda$remove$3(MeteredSessionStore.java:134) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.state.internals.MeteredSessionStore.remove(MeteredSessionStore.java:131) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$SessionStoreReadWriteDecorator.remove(AbstractReadWriteDecorator.java:221) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate$KStreamSessionWindowAggregateProcessor.process(KStreamSessionWindowAggregate.java:171) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:85) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:678) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:678) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1034) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:695) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1034) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551) >> > > > > at >> > > > > >> > > > >> > > >> > >> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510) >> > > > > >> > > > >> > > > * 2nd line of the exception is because we are using Scala >> > > > (FunctionsCompatConversions.scala:62) >> > > > >> > > > > implicit class FlatValueMapperFromFunction[V, VR](val f: V => >> > > > > Iterable[VR]) extends AnyVal { def asValueMapper: ValueMapper[V, >> > > > > JIterable[VR]] = (value: V) => f(value).*asJava* } >> > > > > >> > > > >> > > > The main thing here is that we didn't change anything in the app >> code.. >> > > so >> > > > i wonder if it's a new bug OR our implementation bug that somehow >> > didn't >> > > > happen in 2.5.1 (or previous versions, since this logic is old) >> > > > >> > > > Thanks and let me know what else can help (i wish i knew how to >> > > reproduce, >> > > > it happened 6 times during the last day and no idea why.. i'll try >> to >> > > catch >> > > > it with logs) >> > > > >> > > > -- >> > > > >> > > > Nitay Kufert >> > > > Backend Team Leader >> > > > [image: ironSource] <http://www.ironsrc.com> >> > > > >> > > > email nita...@ironsrc.com >> > > > mobile +972-54-5480021 >> > > > fax +972-77-5448273 >> > > > skype nitay.kufert.ssa >> > > > 121 Menachem Begin St., Tel Aviv, Israel >> > > > ironsrc.com <http://www.ironsrc.com> >> > > > [image: linkedin] <https://www.linkedin.com/company/ironsource> >> > [image: >> > > > twitter] <https://twitter.com/ironsource> [image: facebook] >> > > > <https://www.facebook.com/ironSource> [image: googleplus] >> > > > <https://plus.google.com/+ironsrc> >> > > > This email (including any attachments) is for the sole use of the >> > > intended >> > > > recipient and may contain confidential information which may be >> > protected >> > > > by legal privilege. If you are not the intended recipient, or the >> > > employee >> > > > or agent responsible for delivering it to the intended recipient, >> you >> > are >> > > > hereby notified that any use, dissemination, distribution or >> copying of >> > > > this communication and/or its content is strictly prohibited. If you >> > are >> > > > not the intended recipient, please immediately notify us by reply >> email >> > > or >> > > > by telephone, delete this email and destroy any copies. Thank you. >> > > > >> > > >> > > >> > > -- >> > > -- Guozhang >> > > >> > >> > >> > -- >> > >> > Nitay Kufert >> > Backend Team Leader >> > [image: ironSource] <http://www.ironsrc.com> >> > >> > email nita...@ironsrc.com >> > mobile +972-54-5480021 >> > fax +972-77-5448273 >> > skype nitay.kufert.ssa >> > 121 Menachem Begin St., Tel Aviv, Israel >> > ironsrc.com <http://www.ironsrc.com> >> > [image: linkedin] <https://www.linkedin.com/company/ironsource> [image: >> > twitter] <https://twitter.com/ironsource> [image: facebook] >> > <https://www.facebook.com/ironSource> [image: googleplus] >> > <https://plus.google.com/+ironsrc> >> > This email (including any attachments) is for the sole use of the >> intended >> > recipient and may contain confidential information which may be >> protected >> > by legal privilege. If you are not the intended recipient, or the >> employee >> > or agent responsible for delivering it to the intended recipient, you >> are >> > hereby notified that any use, dissemination, distribution or copying of >> > this communication and/or its content is strictly prohibited. If you are >> > not the intended recipient, please immediately notify us by reply email >> or >> > by telephone, delete this email and destroy any copies. Thank you. >> > >> >> >> -- >> -- Guozhang >> > > > -- > > Nitay Kufert > Backend Team Leader > [image: ironSource] <http://www.ironsrc.com> > > email nita...@ironsrc.com > mobile +972-54-5480021 > fax +972-77-5448273 > skype nitay.kufert.ssa > 121 Menachem Begin St., Tel Aviv, Israel > ironsrc.com <http://www.ironsrc.com> > [image: linkedin] <https://www.linkedin.com/company/ironsource> [image: > twitter] <https://twitter.com/ironsource> [image: facebook] > <https://www.facebook.com/ironSource> [image: googleplus] > <https://plus.google.com/+ironsrc> > This email (including any attachments) is for the sole use of the intended > recipient and may contain confidential information which may be protected > by legal privilege. If you are not the intended recipient, or the employee > or agent responsible for delivering it to the intended recipient, you are > hereby notified that any use, dissemination, distribution or copying of > this communication and/or its content is strictly prohibited. If you are > not the intended recipient, please immediately notify us by reply email or > by telephone, delete this email and destroy any copies. Thank you. >