> If you increase the number of partitions in the topic "topic1" after the > state store is created, you'd need to manually increase the number of > partitions in the "app1-store1-changelog" topic as well. Or remove the > topic and let KS recreate it next run. But, either way, hopefully you > don't need the data in it, 'cause it won't match the partitioning of the > input topic. :-)
I’m running these test on “clean” Kafka - only “topic1” is manually created (with 1 or 3 partitions). Removing existing data was the first thing I did after getting the exception :) (to be more specific I’m removing /tmp/zookeeper, /tmp/kafka-logs, /tmp/kafka-streams) Thanks, Adam > Mathieu > > > On Mon, Feb 13, 2017 at 11:59 AM, Adam Warski <a...@warski.org> wrote: > > > Hello, > > > > I have a simple example (or so it would seem) of a stream processor which > > uses a persistent state store. Testing on one local Kafka (0.10.1.1) node, > > this starts up without problems for a topic with 1 partition. However, if I > > create a topic with 3 partitions I’m getting the following exception > > shortly after the init() method of the Processor is called (init completes > > without problems): > > > > 2017-02-13 18:41:18 ERROR StreamThread:666 - stream-thread > > [StreamThread-1] Failed to create an active task 0_1: > > org.apache.kafka.streams.errors.StreamsException: task [0_1] Store > > store1's change log (app1-store1-changelog) does not contain partition 1 > > at org.apache.kafka.streams.processor.internals. > > ProcessorStateManager.register(ProcessorStateManager.java:185) > > at org.apache.kafka.streams.processor.internals. > > ProcessorContextImpl.register(ProcessorContextImpl.java:123) > > at org.apache.kafka.streams.state.internals.RocksDBStore. > > init(RocksDBStore.java:169) > > at org.apache.kafka.streams.state.internals. > > MeteredKeyValueStore.init(MeteredKeyValueStore.java:85) > > at org.apache.kafka.streams.processor.internals.AbstractTask. > > initializeStateStores(AbstractTask.java:81) > > at org.apache.kafka.streams.processor.internals. > > StreamTask.<init>(StreamTask.java:119) > > > > The code is essentially: > > > > StateStoreSupplier testStore = Stores.create("store1") > > .withStringKeys() > > .withStringValues() > > .persistent() > > .build(); > > > > TopologyBuilder builder = new TopologyBuilder(); > > > > builder.addSource("source", "topic1") > > .addProcessor("process", TestProcessor::new, "source") > > .addStateStore(testStore, "process”); > > > > KafkaStreams streams = new KafkaStreams(builder, props); > > streams.start(); > > > > public static class TestProcessor implements Processor<String, String> { > > @Override > > public void init(ProcessorContext context) { > > context.getStateStore("store1"); > > System.out.println("Initialized"); > > } > > } > > > > Full source here: https://gist.github.com/adamw/ > > b5c69f86d8688da23afebd095683faaa <https://gist.github.com/> > > Full stack trace: https://gist.github.com/adamw/ > > f72cdf0c2f0d67425ed9c103a327f3bf <https://gist.github.com/adamw/ > > f72cdf0c2f0d67425ed9c103a327f3bf> > > > > I would be grateful for any pointers! > > Adam > > > > -- > > Adam Warski > > > > http://twitter.com/#!/adamwarski <http://twitter.com/#!/adamwarski> > > http://www.softwaremill.com <http://www.softwaremill.com/> > > http://www.warski.org <http://www.warski.org/> > > -- Adam Warski http://twitter.com/#!/adamwarski <http://twitter.com/#!/adamwarski> http://www.softwaremill.com <http://www.softwaremill.com/> http://www.warski.org <http://www.warski.org/>