Hello,

I have a simple example (or so it would seem) of a stream processor which uses a persistent state store. When testing against a single local Kafka (0.10.1.1) node, it starts up without problems for a topic with 1 partition. However, if I create a topic with 3 partitions, I get the following exception shortly after the init() method of the Processor is called (init itself completes without problems):
2017-02-13 18:41:18 ERROR StreamThread:666 - stream-thread [StreamThread-1] Failed to create an active task 0_1:
org.apache.kafka.streams.errors.StreamsException: task [0_1] Store store1's change log (app1-store1-changelog) does not contain partition 1
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:185)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123)
    at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:169)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:85)
    at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:119)

The code is essentially:

    StateStoreSupplier testStore = Stores.create("store1")
        .withStringKeys()
        .withStringValues()
        .persistent()
        .build();

    TopologyBuilder builder = new TopologyBuilder();

    builder.addSource("source", "topic1")
        .addProcessor("process", TestProcessor::new, "source")
        .addStateStore(testStore, "process");

    KafkaStreams streams = new KafkaStreams(builder, props);
    streams.start();

    public static class TestProcessor implements Processor<String, String> {
        @Override
        public void init(ProcessorContext context) {
            context.getStateStore("store1");
            System.out.println("Initialized");
        }
    }

Full source here: https://gist.github.com/adamw/b5c69f86d8688da23afebd095683faaa

Full stack trace: https://gist.github.com/adamw/f72cdf0c2f0d67425ed9c103a327f3bf

I would be grateful for any pointers!

Adam

-- 
Adam Warski
http://twitter.com/#!/adamwarski
http://www.softwaremill.com
http://www.warski.org