Dmitry Minkovsky created KAFKA-7536:
---------------------------------------

             Summary: TopologyTestDriver cannot pre-populate GlobalKTable
                 Key: KAFKA-7536
                 URL: https://issues.apache.org/jira/browse/KAFKA-7536
             Project: Kafka
          Issue Type: Bug
            Reporter: Dmitry Minkovsky


I have a GlobalKTable that's defined as

{code}

GlobalKTable<String, ByteString> userIdsByEmail = topology          
   .globalTable(USER_IDS_BY_EMAIL.name,
                       USER_IDS_BY_EMAIL.consumed(),
                       Materialized.as("user-ids-by-email"));
{code}

And the following test in Spock:

{code}
    def topology = // my topology
    def driver = new TopologyTestDriver(topology, config())

    def cleanup() {
        driver.close()
    }

    def "create from email request"() {

        def store = driver.getKeyValueStore('user-ids-by-email')
        store.put('string', ByteString.copyFrom(new byte[0]))
{code}

When I run this, I get the following:

{code}

[2018-10-23 19:35:27,055] INFO 
(org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl) mock 
Restoring state for global store user-ids-by-email

java.lang.NullPointerException
        at 
org.apache.kafka.streams.processor.internals.AbstractProcessorContext.topic(AbstractProcessorContext.java:115)
        at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal(CachingKeyValueStore.java:237)
        at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:220)
        at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:38)
        at 
org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:206)
        at 
org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:117)
        at pony.message.MessageWriteStreamsTest.create from mailgun email 
request(MessageWriteStreamsTest.groovy:52)

[2018-10-23 19:35:27,189] INFO 
(org.apache.kafka.streams.processor.internals.StateDirectory) stream-thread 
[main] Deleting state directory 0_0 for task 0_0 as user calling cleanup.
{code}

I've noticed that I can {{put()}} to the store if I first write to it with 
{{driver.pipeInput}}. But otherwise I get the above error.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to