Following this answer, I checked that the auto-created "app1-store1-changelog"
topic had 1 partition - which caused the problem.
Creating this topic upfront with 3 partitions (which matches the stream source 
partition count) fixes the problem.
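
For illustration, here is a minimal sketch of why task 0_1 could not start against a 1-partition changelog. It is plain Java, not Kafka Streams code: the class and method names are hypothetical, and the "<application id>-<store name>-changelog" naming is an assumption inferred from the topic name seen above (application "app1", store "store1").

```java
// Hypothetical helper for illustration only; not part of the Kafka Streams API.
final class ChangelogNaming {

    // Naming pattern assumed from this thread: application id "app1" and
    // store "store1" give "app1-store1-changelog".
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    // Task ids such as "0_1" end with the partition number the task handles.
    static int partitionOf(String taskId) {
        return Integer.parseInt(taskId.substring(taskId.indexOf('_') + 1));
    }

    // Partitions are numbered from 0, so a topic with N partitions
    // contains partitions 0 .. N-1.
    static boolean containsPartition(int partitionCount, int partition) {
        return partition >= 0 && partition < partitionCount;
    }

    public static void main(String[] args) {
        String topic = changelogTopic("app1", "store1");
        int partition = partitionOf("0_1");
        System.out.println(topic + " with 1 partition can serve task 0_1: "
                + containsPartition(1, partition));
        System.out.println(topic + " with 3 partitions can serve task 0_1: "
                + containsPartition(3, partition));
    }
}
```

With the auto-created 1-partition topic the check is false for task 0_1, which corresponds to the "does not contain partition 1" error; with 3 partitions it is true.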

However, I think this should be handled differently somehow. Maybe the
exception should report the partition count mismatch?
Or the auto-created topic should use the partition count of the incoming
streams (though I’m not sure that’s always possible - a store might be used
multiple times).
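
As a rough sketch of the first idea, a check along these lines could name both partition counts in the error. This is plain Java with hypothetical names (PartitionCountCheck, check); it is not how Kafka Streams implements its validation, and the message wording is only modeled on the exception quoted below.

```java
// Hypothetical illustration of an error that reports the mismatch;
// not taken from the Kafka Streams code base.
final class PartitionCountCheck {

    // Throws if the changelog topic does not have as many partitions
    // as the source topic feeding the store.
    static void check(String storeName, String changelogTopic,
                      int changelogPartitions, int sourcePartitions) {
        if (changelogPartitions != sourcePartitions) {
            throw new IllegalStateException(String.format(
                    "Store %s's change log (%s) has %d partition(s), "
                            + "expected %d to match the source topic",
                    storeName, changelogTopic,
                    changelogPartitions, sourcePartitions));
        }
    }

    public static void main(String[] args) {
        try {
            // The situation from this thread: 1 changelog partition, 3 source partitions.
            check("store1", "app1-store1-changelog", 1, 3);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

A message of this shape would point directly at the partition counts instead of only naming the missing partition.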

Finally, is there a way to specify how many partitions auto-created
topics should have when creating a state store? I tried with:

Map<String, String> storeCfg = new HashMap<>();
storeCfg.put("num.partitions", "3");

StateStoreSupplier testStore = Stores.create("store1")
        .withStringKeys()
        .withStringValues()
        .persistent()
        .enableLogging(storeCfg)
        .build();

but that didn’t help.

Thanks,
Adam

> On 13 Feb 2017, at 20:57, Adam Warski <a...@warski.org> wrote:
> 
> 
>> If you increase the number of partitions in the topic "topic1" after the
>> state store is created, you'd need to manually increase the number of
>> partitions in the "app1-store1-changelog" topic as well.  Or remove the
>> topic and let KS recreate it next run.  But, either way, hopefully you
>> don't need the data in it, 'cause it won't match the partitioning of the
>> input topic. :-)
> 
> I’m running these tests on “clean” Kafka - only “topic1” is manually created
> (with 1 or 3 partitions).
> Removing existing data was the first thing I did after getting the exception :)
> (to be more specific, I’m removing /tmp/zookeeper, /tmp/kafka-logs,
> /tmp/kafka-streams)
> 
> Thanks,
> Adam
> 
>> On Mon, Feb 13, 2017 at 11:59 AM, Adam Warski <a...@warski.org> wrote:
>> 
>> > Hello,
>> >
>> > I have a simple example (or so it would seem) of a stream processor which
>> > uses a persistent state store. Testing on one local Kafka (0.10.1.1) node,
>> > this starts up without problems for a topic with 1 partition. However, if I
>> > create a topic with 3 partitions I’m getting the following exception
>> > shortly after the init() method of the Processor is called (init completes
>> > without problems):
>> >
>> > 2017-02-13 18:41:18 ERROR StreamThread:666 - stream-thread [StreamThread-1] Failed to create an active task 0_1:
>> > org.apache.kafka.streams.errors.StreamsException: task [0_1] Store store1's change log (app1-store1-changelog) does not contain partition 1
>> >         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:185)
>> >         at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123)
>> >         at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:169)
>> >         at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:85)
>> >         at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
>> >         at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:119)
>> >
>> > The code is essentially:
>> >
>> > StateStoreSupplier testStore = Stores.create("store1")
>> >         .withStringKeys()
>> >         .withStringValues()
>> >         .persistent()
>> >         .build();
>> >
>> > TopologyBuilder builder = new TopologyBuilder();
>> >
>> > builder.addSource("source", "topic1")
>> >         .addProcessor("process", TestProcessor::new, "source")
>> >         .addStateStore(testStore, "process");
>> >
>> > KafkaStreams streams = new KafkaStreams(builder, props);
>> > streams.start();
>> >
>> > public static class TestProcessor implements Processor<String, String> {
>> >     @Override
>> >     public void init(ProcessorContext context) {
>> >         context.getStateStore("store1");
>> >         System.out.println("Initialized");
>> >     }
>> > }
>> >
>> > Full source here: https://gist.github.com/adamw/b5c69f86d8688da23afebd095683faaa
>> > Full stack trace: https://gist.github.com/adamw/f72cdf0c2f0d67425ed9c103a327f3bf
>> >
>> > I would be grateful for any pointers!
>> > Adam
> 
> 
> -- 
> Adam Warski
> 
> http://twitter.com/#!/adamwarski
> http://www.softwaremill.com
> http://www.warski.org

