GitHub user SlevinBE opened a pull request:

    https://github.com/apache/kafka/pull/2741

    KAFKA-4953: Global Store: cast exception when initialising with in-memory logged state store

    Currently it is not possible to initialise a global store with an in-memory logged store via the TopologyBuilder. This results in the following exception:
    ```
    java.lang.ClassCastException: org.apache.kafka.streams.processor.internals.GlobalProcessorContextImpl cannot be cast to org.apache.kafka.streams.processor.internals.RecordCollector$Supplier
        at org.apache.kafka.streams.state.internals.StoreChangeLogger.<init>(StoreChangeLogger.java:52)
        at org.apache.kafka.streams.state.internals.StoreChangeLogger.<init>(StoreChangeLogger.java:44)
        at org.apache.kafka.streams.state.internals.InMemoryKeyValueLoggedStore.init(InMemoryKeyValueLoggedStore.java:56)
        at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:99)
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
        at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:130)
        at org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl.initialize(GlobalStateManagerImpl.java:97)
        at org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask.initialize(GlobalStateUpdateTask.java:61)
        at org.apache.kafka.test.ProcessorTopologyTestDriver.<init>(ProcessorTopologyTestDriver.java:215)
        at org.apache.kafka.streams.processor.internals.ProcessorTopologyTest.shouldDriveInMemoryLoggedGlobalStore(ProcessorTopologyTest.java:235)
        ...
    ```
    This PR adds a failing unit test (ProcessorTopologyTest.shouldDriveInMemoryLoggedGlobalStore) that reproduces this behaviour.
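
For readers unfamiliar with this failure mode, here is a minimal sketch of the pattern the stack trace suggests: code that unconditionally casts a context object to a supplier interface, and a context type that does not implement it. All types below are hypothetical stand-ins named after roles in the trace, not the Kafka classes, and the assumption that the regular per-task context does implement the supplier interface is not confirmed by this email.

```java
// Stand-in types only; none of these are the real Kafka Streams classes.
public class CastFailureSketch {
    interface ProcessorContext {}

    // Plays the role of RecordCollector.Supplier in the stack trace.
    interface RecordCollectorSupplier {
        String recordCollector();
    }

    // Assumed analogue of a per-task context that can supply a record collector.
    static class TaskContext implements ProcessorContext, RecordCollectorSupplier {
        @Override
        public String recordCollector() {
            return "collector";
        }
    }

    // Analogue of a global context that does not implement the supplier interface.
    static class GlobalContext implements ProcessorContext {}

    // Analogue of a change logger constructor that assumes every context is a supplier.
    static String initChangeLogger(ProcessorContext context) {
        return ((RecordCollectorSupplier) context).recordCollector();
    }

    // Reports whether initialising the logger with this context throws ClassCastException.
    static boolean failsWithClassCast(ProcessorContext context) {
        try {
            initChangeLogger(context);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(failsWithClassCast(new TaskContext()));   // prints "false"
        System.out.println(failsWithClassCast(new GlobalContext())); // prints "true"
    }
}
```

The cast compiles because both types are interfaces, so the mismatch only surfaces at runtime, which is consistent with the exception appearing during store initialisation rather than at build time.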

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/Klarrio/kafka global_store_in_memory_logged_state_store_unit_test

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/2741.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2741
    
----
commit 8cfe3523d0c0bcdd3d472ecf6d52b600110c2149
Author: Yennick Trevels <yennick.trev...@klarrio.com>
Date:   2017-03-27T10:29:13Z

    add failing unit test to verify behaviour of a global store bug in combination with an in-memory logged state store

----


