Hello,

I have a simple example (or so it would seem) of a stream processor which uses 
a persistent state store. Testing on one local Kafka (0.10.1.1) node, this 
starts up without problems for a topic with 1 partition. However, if I create a 
topic with 3 partitions I’m getting the following exception shortly after the 
init() method of the Processor is called (init completes without problems):

2017-02-13 18:41:18 ERROR StreamThread:666 - stream-thread [StreamThread-1] 
Failed to create an active task 0_1: 
org.apache.kafka.streams.errors.StreamsException: task [0_1] Store store1's 
change log (app1-store1-changelog) does not contain partition 1
        at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:185)
        at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123)
        at 
org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:169)
        at 
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:85)
        at 
org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
        at 
org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:119)

The code is essentially:

StateStoreSupplier testStore = Stores.create("store1")
        .withStringKeys()
        .withStringValues()
        .persistent()
        .build();

TopologyBuilder builder = new TopologyBuilder();

builder.addSource("source", "topic1")
        .addProcessor("process", TestProcessor::new, "source")
        .addStateStore(testStore, "process”);

KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();

public static class TestProcessor implements Processor<String, String> {
    @Override
    public void init(ProcessorContext context) {
        context.getStateStore("store1");
        System.out.println("Initialized");
    }
}

Full source here: 
https://gist.github.com/adamw/b5c69f86d8688da23afebd095683faaa 
<https://gist.github.com/>
Full stack trace: 
https://gist.github.com/adamw/f72cdf0c2f0d67425ed9c103a327f3bf 
<https://gist.github.com/adamw/f72cdf0c2f0d67425ed9c103a327f3bf>

I would be grateful for any pointers!
Adam

-- 
Adam Warski

http://twitter.com/#!/adamwarski <http://twitter.com/#!/adamwarski>
http://www.softwaremill.com <http://www.softwaremill.com/>
http://www.warski.org <http://www.warski.org/>

Reply via email to