[ https://issues.apache.org/jira/browse/KAFKA-4311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Damian Guy updated KAFKA-4311:
------------------------------
Summary: Multi layer cache eviction causes forwarding to incorrect
ProcessorNode (was: Multi layer cache eviction causes forwarding to incorrect
Processor Node )
> Multi layer cache eviction causes forwarding to incorrect ProcessorNode
> ------------------------------------------------------------------------
>
> Key: KAFKA-4311
> URL: https://issues.apache.org/jira/browse/KAFKA-4311
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.10.1.0
> Reporter: Damian Guy
> Assignee: Damian Guy
> Fix For: 0.10.1.1
>
>
> The two exceptions below were reported by Frank on the dev mailing list.
> Investigation showed that the root cause is multiple cache evictions
> happening in the same topology.
> Consider a topology like the one below. If a record arriving in `tableOne`
> causes a cache eviction, it triggers the `leftJoin`, which does a `get`
> from `reducer-store`. If the key is not currently cached in `reducer-store`
> but is in the backing store, it is put into the cache, and that put may also
> trigger an eviction. If it does, and the eldest entry is dirty, the cache
> flushes its dirty keys. This is the point at which the ClassCastException
> reported in the comment is thrown: the ProcessorContext is still set to the
> `leftJoin` node, so the flushed records are forwarded to its child in the
> topology, `mapValues`, rather than to the children of the node that owns
> `reducer-store`.
> We need to set the correct `ProcessorNode` on the context in
> `ForwardingCacheFlushListener` before calling `context.forward`. We also
> need to remember to reset the context to the previous `ProcessorNode` once
> `context.forward` has completed.
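A minimal sketch of the save/set/restore pattern described above, using hypothetical stand-ins rather than the real Kafka Streams internals: `Context` tracks a single current node and its `forward` delivers to that node's child, and `FlushListener` plays the role of `ForwardingCacheFlushListener`. The class names, method names, and the string node labels (`reduce`, `leftJoin`, `mapValues`) are illustrative assumptions, not Kafka's actual classes or signatures. `applyBuggy` shows the pre-fix behavior; `apply` saves the current node, switches to the node that owns the flushed cache, forwards, and restores the previous node.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a processor context; NOT Kafka's ProcessorContext API.
class Context {
    private final Map<String, String> childOf = new HashMap<>();
    final List<String> delivered = new ArrayList<>();
    private String currentNode;

    void addEdge(String parent, String child) { childOf.put(parent, child); }
    String currentNode() { return currentNode; }
    void setCurrentNode(String node) { currentNode = node; }

    // Delivers the record to the child of whichever node is current at call time.
    void forward(String key, String value) {
        delivered.add(childOf.get(currentNode) + " <- " + key + "=" + value);
    }
}

// Hypothetical stand-in for ForwardingCacheFlushListener.
class FlushListener {
    private final Context context;
    private final String ownerNode; // node that owns the cache being flushed

    FlushListener(Context context, String ownerNode) {
        this.context = context;
        this.ownerNode = ownerNode;
    }

    // Pre-fix behavior: forwards relative to whatever node happens to be current.
    void applyBuggy(String key, String value) {
        context.forward(key, value);
    }

    // Fixed behavior: switch to the owning node, forward, then restore the previous node.
    void apply(String key, String value) {
        String previous = context.currentNode();
        context.setCurrentNode(ownerNode);
        try {
            context.forward(key, value);
        } finally {
            context.setCurrentNode(previous); // restored even if forward throws
        }
    }

    public static void main(String[] args) {
        Context ctx = new Context();
        ctx.addEdge("reduce", "leftJoin");
        ctx.addEdge("leftJoin", "mapValues");
        ctx.setCurrentNode("leftJoin"); // the flush happens during the join's get()
        FlushListener listener = new FlushListener(ctx, "reduce");
        listener.applyBuggy("k", "5");
        listener.apply("k", "5");
        System.out.println(ctx.delivered);     // [mapValues <- k=5, leftJoin <- k=5]
        System.out.println(ctx.currentNode()); // leftJoin
    }
}
```

With the context sitting on `leftJoin`, `applyBuggy` hands the flushed record to `mapValues`, while `apply` hands it to `leftJoin` and leaves the context on `leftJoin` afterwards. Doing the restore in a `finally` block is a choice made in this sketch so that the context is not left pointing at the wrong node if `forward` throws.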
> {code}
> // builder is a KStreamBuilder; tableOne and tableTwo serve as both the source topic and the store name
> final KTable<String, String> one = builder.table(Serdes.String(), Serdes.String(), tableOne, tableOne);
> final KTable<Long, String> two = builder.table(Serdes.Long(), Serdes.String(), tableTwo, tableTwo);
> // re-key tableTwo by its value and sum the original keys per new key into "reducer-store"
> final KTable<String, Long> reduce = two.groupBy(new KeyValueMapper<Long, String, KeyValue<String, Long>>() {
>     @Override
>     public KeyValue<String, Long> apply(final Long key, final String value) {
>         return new KeyValue<>(value, key);
>     }
> }, Serdes.String(), Serdes.Long())
>     .reduce(new Reducer<Long>() {
>         @Override
>         public Long apply(final Long value1, final Long value2) {
>             return value1 + value2; // adder
>         }
>     }, new Reducer<Long>() {
>         @Override
>         public Long apply(final Long value1, final Long value2) {
>             return value1 - value2; // subtractor
>         }
>     }, "reducer-store");
> // an eviction from tableOne's cache triggers this join, which does a get on "reducer-store"
> one.leftJoin(reduce, new ValueJoiner<String, Long, String>() {
>     @Override
>     public String apply(final String value1, final Long value2) {
>         return value1 + ":" + value2;
>     }
> })
>     .mapValues(new ValueMapper<String, String>() {
>         @Override
>         public String apply(final String value) {
>             return value;
>         }
>     });
> {code}
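The eviction chain described above (a `get` on `reducer-store` that loads a key from the backing store and, in doing so, evicts and flushes dirty entries) can be reproduced with a minimal sketch. `TinyCache` is a hypothetical read-through LRU cache built on an access-ordered `LinkedHashMap`; it is not Kafka's `NamedCache` or `ThreadCache`. It only assumes what the description states: a read miss inserts the loaded key into the cache, that insert can evict the eldest entry, and evicting a dirty entry flushes every dirty key to a listener.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;

// Hypothetical read-through LRU cache; NOT Kafka's NamedCache/ThreadCache.
class TinyCache {
    private final int capacity;
    private final Map<String, Long> backingStore;
    private final BiConsumer<String, Long> flushListener;
    private final Set<String> dirtyKeys = new LinkedHashSet<>();
    // accessOrder = true: iteration starts at the least-recently-used entry
    private final LinkedHashMap<String, Long> cache = new LinkedHashMap<>(16, 0.75f, true);

    TinyCache(int capacity, Map<String, Long> backingStore, BiConsumer<String, Long> flushListener) {
        this.capacity = capacity;
        this.backingStore = backingStore;
        this.flushListener = flushListener;
    }

    // A write marks the key dirty and may evict.
    void put(String key, Long value) {
        cache.put(key, value);
        dirtyKeys.add(key);
        evictIfFull();
    }

    // A read miss loads the key from the backing store as a clean entry, which may also evict.
    Long get(String key) {
        Long value = cache.get(key);
        if (value == null && backingStore.containsKey(key)) {
            value = backingStore.get(key);
            cache.put(key, value);
            evictIfFull();
        }
        return value;
    }

    private void evictIfFull() {
        while (cache.size() > capacity) {
            String eldest = cache.keySet().iterator().next();
            if (dirtyKeys.contains(eldest)) {
                flushDirty(); // evicting a dirty entry flushes ALL dirty keys
            }
            cache.remove(eldest);
        }
    }

    private void flushDirty() {
        for (String key : dirtyKeys) {
            Long value = cache.get(key);
            backingStore.put(key, value);
            flushListener.accept(key, value); // a forwarding flush listener would call context.forward here
        }
        dirtyKeys.clear();
    }

    public static void main(String[] args) {
        Map<String, Long> store = new HashMap<>();
        store.put("c", 3L); // present only in the backing store
        List<String> flushed = new ArrayList<>();
        TinyCache cache = new TinyCache(2, store, (k, v) -> flushed.add(k + "=" + v));
        cache.put("a", 1L);
        cache.put("b", 2L);
        cache.get("c"); // a plain read that evicts dirty "a" and flushes "a" and "b"
        System.out.println(flushed); // [a=1, b=2]
    }
}
```

Here a plain read of `c` ends up emitting two flushed records. In the topology above, the corresponding listener belongs to `reducer-store`, and it fires while the context still points at `leftJoin`.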
> The exception below is only a symptom of the ClassCastException reported in
> the comment: after that first exception is thrown, the StreamThread triggers a
> shutdown, and the shutdown then throws this exception.
> [StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [StreamThread-1] Failed to close state manager for StreamTask 0_0:
> org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed to close state store addr-organization
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:342)
>     at org.apache.kafka.streams.processor.internals.AbstractTask.closeStateManager(AbstractTask.java:121)
>     at org.apache.kafka.streams.processor.internals.StreamThread$2.apply(StreamThread.java:341)
>     at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:322)
>     at org.apache.kafka.streams.processor.internals.StreamThread.closeAllStateManagers(StreamThread.java:338)
>     at org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:299)
>     at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:262)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:245)
> Caused by: java.lang.IllegalStateException: Key found in dirty key set, but entry is null
>     at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:112)
>     at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:100)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:111)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.close(CachingKeyValueStore.java:117)
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:340)
>     ... 7 more