[ https://issues.apache.org/jira/browse/KAFKA-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Damian Guy updated KAFKA-4311:
------------------------------
    Summary: Multi layer cache eviction causes forwarding to incorrect ProcessorNode  (was: Multi layer cache eviction causes forwarding to incorrect Processor Node )

Multi layer cache eviction causes forwarding to incorrect ProcessorNode
------------------------------------------------------------------------

                Key: KAFKA-4311
                URL: https://issues.apache.org/jira/browse/KAFKA-4311
            Project: Kafka
         Issue Type: Bug
         Components: streams
   Affects Versions: 0.10.1.0
           Reporter: Damian Guy
           Assignee: Damian Guy
            Fix For: 0.10.1.1

The two exceptions below were reported by Frank on the dev mailing list. After investigation, the root cause turned out to be multiple cache evictions happening in the same topology.

Consider the topology below. If a record arriving in `tableOne` causes a cache eviction, it triggers the `leftJoin`, which does a `get` from `reducer-store`. If the key is not currently cached in `reducer-store` but is in the backing store, it is put into the cache, and that put may also trigger an eviction. If it does, and the eldest entry is dirty, the cache flushes its dirty keys. This is the point at which the exception in the comment (a ClassCastException) occurs: the ProcessorContext is still set to the `leftJoin` node, so `context.forward` sends the flushed records to `leftJoin`'s next child in the topology, `mapValues`.

We need to set the correct `ProcessorNode` on the context in the `ForwardingCacheFlushListener` before calling `context.forward`. We also need to remember to reset the `ProcessorNode` to the previous node once `context.forward` has completed.
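The save/set/forward/restore pattern described above can be illustrated with a minimal, self-contained Java sketch. This is a simplified model, not Kafka code. `Node`, `Context`, `FlushListener`, `Mode`, and the node name `join-other-side` are hypothetical stand-ins for `ProcessorNode`, `ProcessorContext`, `ForwardingCacheFlushListener`, and the downstream processor of `reducer-store`. The model also replaces the whole eviction cascade with a single direct call to the listener. The actual patch may be structured differently.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the forwarding problem in KAFKA-4311.
// None of these classes are Kafka internals; they only mimic the
// "context forwards to the child of the current node" behaviour.
class FlushForwardingSketch {

    enum Mode { NO_SET, SET_WITHOUT_RESTORE, SET_AND_RESTORE }

    static final class Node {
        final String name;
        Node(String name) { this.name = name; }
    }

    // Stand-in for ProcessorContext: forward() routes to the child of
    // whichever node is currently set on the context.
    static final class Context {
        Node currentNode;
        final Map<Node, Node> childOf = new HashMap<>();
        final List<String> delivered = new ArrayList<>();

        void forward(String key, String value) {
            Node child = childOf.get(currentNode);
            delivered.add(child.name + "<-" + key);
        }
    }

    // Stand-in for ForwardingCacheFlushListener: invoked when a dirty cache
    // entry belonging to the store of `owner` is flushed.
    static final class FlushListener {
        private final Context context;
        private final Node owner;
        private final Mode mode;

        FlushListener(Context context, Node owner, Mode mode) {
            this.context = context;
            this.owner = owner;
            this.mode = mode;
        }

        void apply(String key, String value) {
            if (mode == Mode.NO_SET) {
                context.forward(key, value); // uses whatever node happens to be current
                return;
            }
            Node previous = context.currentNode;
            context.currentNode = owner; // forward as the store's own node
            try {
                context.forward(key, value);
            } finally {
                if (mode == Mode.SET_AND_RESTORE) {
                    context.currentNode = previous; // hand the context back to the caller
                }
            }
        }
    }

    static List<String> run(Mode mode) {
        Node reduce = new Node("reduce");
        Node joinOther = new Node("join-other-side"); // hypothetical child of reduce
        Node leftJoin = new Node("leftJoin");
        Node mapValues = new Node("mapValues");

        Context ctx = new Context();
        ctx.childOf.put(reduce, joinOther);
        ctx.childOf.put(leftJoin, mapValues);
        FlushListener reducerStoreListener = new FlushListener(ctx, reduce, mode);

        // leftJoin is processing a record, so the context points at leftJoin.
        ctx.currentNode = leftJoin;
        // Its get() on reducer-store loads a key into the cache, which evicts
        // and flushes a dirty entry through the listener.
        reducerStoreListener.apply("evicted-key", "42");
        // leftJoin then forwards its own join result.
        ctx.forward("join-key", "a:42");
        return ctx.delivered;
    }

    public static void main(String[] args) {
        for (Mode mode : Mode.values()) {
            System.out.println(mode + " -> " + run(mode));
        }
    }
}
```

With `NO_SET`, the flushed record lands on `mapValues`. In the real topology the value has the wrong type there, which is where the ClassCastException surfaces. With `SET_WITHOUT_RESTORE`, the flushed record is routed correctly, but `leftJoin`'s own result is then misrouted. Only `SET_AND_RESTORE` routes both records correctly, which is why the reset after `context.forward` matters.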
{code}
// A record arriving on tableOne that causes a cache eviction triggers the leftJoin below.
final KTable<String, String> one = builder.table(Serdes.String(), Serdes.String(), tableOne, tableOne);
final KTable<Long, String> two = builder.table(Serdes.Long(), Serdes.String(), tableTwo, tableTwo);

final KTable<String, Long> reduce = two.groupBy(new KeyValueMapper<Long, String, KeyValue<String, Long>>() {
        @Override
        public KeyValue<String, Long> apply(final Long key, final String value) {
            return new KeyValue<>(value, key);
        }
    }, Serdes.String(), Serdes.Long())
    .reduce(new Reducer<Long>() {
        // adder
        @Override
        public Long apply(final Long value1, final Long value2) {
            return value1 + value2;
        }
    }, new Reducer<Long>() {
        // subtractor
        @Override
        public Long apply(final Long value1, final Long value2) {
            return value1 - value2;
        }
    }, "reducer-store");

// leftJoin calls get() on reducer-store; loading a key into that cache can
// evict and flush a dirty entry while the context still points at leftJoin.
one.leftJoin(reduce, new ValueJoiner<String, Long, String>() {
        @Override
        public String apply(final String value1, final Long value2) {
            return value1 + ":" + value2;
        }
    })
    // Next child of leftJoin: the flushed reducer-store records end up here.
    .mapValues(new ValueMapper<String, String>() {
        @Override
        public String apply(final String value) {
            return value;
        }
    });
{code}

The exception below is actually a symptom of the exception reported in the comment. After that first exception is thrown, the StreamThread triggers a shutdown, which then throws the exception below.
[StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] Failed to close state manager for StreamTask 0_0:
org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed to close state store addr-organization
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:342)
    at org.apache.kafka.streams.processor.internals.AbstractTask.closeStateManager(AbstractTask.java:121)
    at org.apache.kafka.streams.processor.internals.StreamThread$2.apply(StreamThread.java:341)
    at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:322)
    at org.apache.kafka.streams.processor.internals.StreamThread.closeAllStateManagers(StreamThread.java:338)
    at org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:299)
    at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:262)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:245)
Caused by: java.lang.IllegalStateException: Key found in dirty key set, but entry is null
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:112)
    at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:100)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:111)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.close(CachingKeyValueStore.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:340)
    ... 7 more

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)