Hi Ara,

This is a bug in 0.10.1 that has been fixed: https://issues.apache.org/jira/browse/KAFKA-4311

To work around it, set StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG to 0.

The fix is available on trunk and on the 0.10.1 branch, and there will be a 0.10.1.1 release any day now.
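In case it helps, here is a minimal sketch of that workaround. It assumes the Streams configuration is built as a plain java.util.Properties object. The class name, application id and broker address are placeholders, not taken from your application. The key "cache.max.bytes.buffering" is the string value of StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, spelled out here so the snippet compiles without the Kafka jars on the classpath.

```java
import java.util.Properties;

// Hypothetical helper class; only the cache setting comes from the workaround above.
class CacheWorkaroundConfig {
    // String value of StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG.
    static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering";

    static Properties build(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.put("application.id", applicationId);       // StreamsConfig.APPLICATION_ID_CONFIG
        props.put("bootstrap.servers", bootstrapServers); // StreamsConfig.BOOTSTRAP_SERVERS_CONFIG
        // Workaround for KAFKA-4311: a cache size of 0 disables record caching.
        props.put(CACHE_MAX_BYTES_BUFFERING, 0);
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values; substitute your own application id and brokers.
        Properties props = build("my-streams-app", "localhost:9092");
        System.out.println(CACHE_MAX_BYTES_BUFFERING + " = " + props.get(CACHE_MAX_BYTES_BUFFERING));
        // In the real application these properties would be passed on when
        // constructing the KafkaStreams instance, e.g. new KafkaStreams(builder, props).
    }
}
```

Note that with the cache disabled, every update to the aggregate is forwarded downstream and written to the state store individually, so you may see more output records until you can upgrade to 0.10.1.1.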
Thanks,
Damian

On Fri, 9 Dec 2016 at 01:12 Ara Ebrahimi <ara.ebrah...@argyledata.com> wrote:

> Hi,
>
> Once in a while and quite randomly this happens, but it does happen every
> few hundred thousand messages:
>
> 2016-12-03 11:48:05 ERROR StreamThread:249 - stream-thread [StreamThread-4] Streams application error during processing:
> java.lang.ClassCastException: org.apache.kafka.streams.kstream.internals.Change cannot be cast to com.argyledata.streams.entity.Activity
>     at com.argyledata.streams.StreamPipeline$$Lambda$14/33419717.apply(Unknown Source)
>     at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>     at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:35)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.maybeForward(CachingKeyValueStore.java:97)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:34)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:84)
>     at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:117)
>     at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:199)
>     at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:190)
>     at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:121)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:147)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:134)
>     at org.apache.kafka.streams.kstream.internals.KStreamAggregate$KStreamAggregateValueGetter.get(KStreamAggregate.java:112)
>     at org.apache.kafka.streams.kstream.internals.KStreamKTableLeftJoin$KStreamKTableLeftJoinProcessor.process(KStreamKTableLeftJoin.java:61)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:204)
>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:181)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
>
> Has anyone else seen this weird problem?
>
> Ara.