[ https://issues.apache.org/jira/browse/KAFKA-4953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15943030#comment-15943030 ]
ASF GitHub Bot commented on KAFKA-4953: --------------------------------------- GitHub user SlevinBE opened a pull request: https://github.com/apache/kafka/pull/2741 KAFKA-4953: Global Store: cast exception when initialising with in-memory logged state store Currently it is not possible to initialise a global store with an in-memory logged store via the TopologyBuilder. This results in the following exception: ``` java.lang.ClassCastException: org.apache.kafka.streams.processor.internals.GlobalProcessorContextImpl cannot be cast to org.apache.kafka.streams.processor.internals.RecordCollector$Supplier at org.apache.kafka.streams.state.internals.StoreChangeLogger.<init>(StoreChangeLogger.java:52) at org.apache.kafka.streams.state.internals.StoreChangeLogger.<init>(StoreChangeLogger.java:44) at org.apache.kafka.streams.state.internals.InMemoryKeyValueLoggedStore.init(InMemoryKeyValueLoggedStore.java:56) at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:99) at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187) at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:130) at org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl.initialize(GlobalStateManagerImpl.java:97) at org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask.initialize(GlobalStateUpdateTask.java:61) at org.apache.kafka.test.ProcessorTopologyTestDriver.<init>(ProcessorTopologyTestDriver.java:215) at org.apache.kafka.streams.processor.internals.ProcessorTopologyTest.shouldDriveInMemoryLoggedGlobalStore(ProcessorTopologyTest.java:235) ... ``` This PR includes a unit test to verify this behaviour. You can merge this pull request into a Git repository by running: $ git pull https://github.com/Klarrio/kafka global_store_in_memory_logged_state_store_unit_test Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/2741.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2741 ---- commit 8cfe3523d0c0bcdd3d472ecf6d52b600110c2149 Author: Yennick Trevels <yennick.trev...@klarrio.com> Date: 2017-03-27T10:29:13Z add failing unit test to verify behaviour of a global store bug in combination with an in-memory logged state store ---- > Global Store: cast exception when initialising with in-memory logged state > store > -------------------------------------------------------------------------------- > > Key: KAFKA-4953 > URL: https://issues.apache.org/jira/browse/KAFKA-4953 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 0.10.2.0 > Reporter: Yennick Trevels > > Currently it is not possible to initialise a global store with an in-memory > *logged* store via the TopologyBuilder. This results in the following > exception: > {code} > java.lang.ClassCastException: > org.apache.kafka.streams.processor.internals.GlobalProcessorContextImpl > cannot be cast to > org.apache.kafka.streams.processor.internals.RecordCollector$Supplier > at > org.apache.kafka.streams.state.internals.StoreChangeLogger.<init>(StoreChangeLogger.java:52) > at > org.apache.kafka.streams.state.internals.StoreChangeLogger.<init>(StoreChangeLogger.java:44) > at > org.apache.kafka.streams.state.internals.InMemoryKeyValueLoggedStore.init(InMemoryKeyValueLoggedStore.java:56) > at > org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:99) > at > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187) > at > org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:130) > at > org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl.initialize(GlobalStateManagerImpl.java:97) > at > org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask.initialize(GlobalStateUpdateTask.java:61) > at > org.apache.kafka.test.ProcessorTopologyTestDriver.<init>(ProcessorTopologyTestDriver.java:215) > at > org.apache.kafka.streams.processor.internals.ProcessorTopologyTest.shouldDriveInMemoryLoggedGlobalStore(ProcessorTopologyTest.java:235) > ... > {code} > I've created a PR which includes a unit this to verify this behaviour. -- This message was sent by Atlassian JIRA (v6.3.15#6346)