Following this answer, I checked the auto-created "app1-store1-changelog" topic: it had only 1 partition, which caused the problem. Creating this topic upfront with 3 partitions (matching the partition count of the source topic) fixes the problem.
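As a rough illustration of why the mismatch fails, here is a simplified model of the check. It is a sketch, not the actual Kafka Streams code, and the class and method names are hypothetical. It assumes one task per source partition, with each task needing the changelog partition of the same number, which is what the "does not contain partition 1" message for task 0_1 suggests:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the changelog partition check.
// Assumption: task 0_N needs partition N of the store's changelog topic.
public class ChangelogPartitionCheck {

    // Returns the task partitions that have no matching changelog partition.
    public static List<Integer> missingChangelogPartitions(int sourcePartitions,
                                                           int changelogPartitions) {
        List<Integer> missing = new ArrayList<>();
        for (int taskPartition = 0; taskPartition < sourcePartitions; taskPartition++) {
            // Changelog partitions are numbered 0..changelogPartitions-1.
            if (taskPartition >= changelogPartitions) {
                missing.add(taskPartition);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Source topic with 3 partitions, auto-created changelog with 1.
        System.out.println(missingChangelogPartitions(3, 1)); // prints [1, 2]
        // Changelog created upfront with 3 partitions.
        System.out.println(missingChangelogPartitions(3, 3)); // prints []
    }
}
```

With a 1-partition changelog, the tasks for partitions 1 and 2 have nothing to restore from, which matches the failure reported for task 0_1.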
However, I think this should be handled differently. Maybe the exception should report the partition count mismatch? Or the auto-created topic should use the partition count of the incoming streams (though I’m not sure that’s always possible, since a store might be used multiple times).

Finally, is there a way to specify how many partitions auto-created topics should have when creating a state store? I tried:

  Map<String, String> storeCfg = new HashMap<>();
  storeCfg.put("num.partitions", "3");
  StateStoreSupplier testStore = Stores.create("store1")
      .withStringKeys()
      .withStringValues()
      .persistent()
      .enableLogging(storeCfg)
      .build();

but that didn’t help.

Thanks,
Adam

> On 13 Feb 2017, at 20:57, Adam Warski <a...@warski.org> wrote:
>
>> If you increase the number of partitions in the topic "topic1" after the
>> state store is created, you'd need to manually increase the number of
>> partitions in the "app1-store1-changelog" topic as well. Or remove the
>> topic and let KS recreate it next run. But, either way, hopefully you
>> don't need the data in it, 'cause it won't match the partitioning of the
>> input topic. :-)
>
> I’m running these tests on “clean” Kafka - only “topic1” is manually created
> (with 1 or 3 partitions).
> Removing existing data was the first thing I did after getting the exception :)
> (to be more specific, I’m removing /tmp/zookeeper, /tmp/kafka-logs,
> /tmp/kafka-streams)
>
> Thanks,
> Adam
>
>> On Mon, Feb 13, 2017 at 11:59 AM, Adam Warski <a...@warski.org> wrote:
>>
>> > Hello,
>> >
>> > I have a simple example (or so it would seem) of a stream processor which
>> > uses a persistent state store. Testing on one local Kafka (0.10.1.1) node,
>> > this starts up without problems for a topic with 1 partition.
>> > However, if I create a topic with 3 partitions I’m getting the following
>> > exception shortly after the init() method of the Processor is called
>> > (init completes without problems):
>> >
>> > 2017-02-13 18:41:18 ERROR StreamThread:666 - stream-thread [StreamThread-1] Failed to create an active task 0_1:
>> > org.apache.kafka.streams.errors.StreamsException: task [0_1] Store store1's change log (app1-store1-changelog) does not contain partition 1
>> >     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:185)
>> >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123)
>> >     at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:169)
>> >     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:85)
>> >     at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
>> >     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:119)
>> >
>> > The code is essentially:
>> >
>> > StateStoreSupplier testStore = Stores.create("store1")
>> >     .withStringKeys()
>> >     .withStringValues()
>> >     .persistent()
>> >     .build();
>> >
>> > TopologyBuilder builder = new TopologyBuilder();
>> >
>> > builder.addSource("source", "topic1")
>> >     .addProcessor("process", TestProcessor::new, "source")
>> >     .addStateStore(testStore, "process");
>> >
>> > KafkaStreams streams = new KafkaStreams(builder, props);
>> > streams.start();
>> >
>> > public static class TestProcessor implements Processor<String, String> {
>> >     @Override
>> >     public void init(ProcessorContext context) {
>> >         context.getStateStore("store1");
>> >         System.out.println("Initialized");
>> >     }
>> > }
>> >
>> > Full source here: https://gist.github.com/adamw/b5c69f86d8688da23afebd095683faaa
>> > Full stack trace: https://gist.github.com/adamw/f72cdf0c2f0d67425ed9c103a327f3bf
>> >
>> > I would be grateful for any pointers!
>> > Adam
>
> --
> Adam Warski
>
> http://twitter.com/#!/adamwarski
> http://www.softwaremill.com
> http://www.warski.org

--
Adam Warski

http://twitter.com/#!/adamwarski
http://www.softwaremill.com
http://www.warski.org