Following this answer, I checked that the auto-created "app1-store1-changelog”
topic had 1 partition - which caused the problem.
Creating this topic upfront with 3 partitions (which matches the stream source
partition count) fixes the problem.
However, I think this should be handled somehow differently … maybe the
exception should report the partition count mismatch?
Or the auto-created topic should use the partition count of the incoming
streams (though I’m not sure that’s always possible - a store might be used
multiple times).
Finally, is there a way to specify how many partitions should auto-created
topics have when creating a state store? I tried with:
Map<String, String> storeCfg = new HashMap<>();
storeCfg.put("num.partitions", "3");
StateStoreSupplier testStore = Stores.create("store1")
.withStringKeys()
.withStringValues()
.persistent()
.enableLogging(storeCfg)
.build();
but that didn’t help.
Thanks,
Adam
> On 13 Feb 2017, at 20:57, Adam Warski <[email protected]> wrote:
>
>
>> If you increase the number of partitions in the topic "topic1" after the
>> state store is created, you'd need to manually increase the number of
>> partitions in the "app1-store1-changelog" topic as well. Or remove the
>> topic and let KS recreate it next run. But, either way, hopefully you
>> don't need the data in it, 'cause it won't match the partitioning of the
>> input topic. :-)
>
> I’m running these test on “clean” Kafka - only “topic1” is manually created
> (with 1 or 3 partitions).
> Removing existing data was the first thing I did after getting the exception
> :)
> (to be more specific I’m removing /tmp/zookeeper, /tmp/kafka-logs,
> /tmp/kafka-streams)
>
> Thanks,
> Adam
>
>> On Mon, Feb 13, 2017 at 11:59 AM, Adam Warski <[email protected]
>> <mailto:[email protected]>> wrote:
>>
>> > Hello,
>> >
>> > I have a simple example (or so it would seem) of a stream processor which
>> > uses a persistent state store. Testing on one local Kafka (0.10.1.1) node,
>> > this starts up without problems for a topic with 1 partition. However, if I
>> > create a topic with 3 partitions I’m getting the following exception
>> > shortly after the init() method of the Processor is called (init completes
>> > without problems):
>> >
>> > 2017-02-13 18:41:18 ERROR StreamThread:666 - stream-thread
>> > [StreamThread-1] Failed to create an active task 0_1:
>> > org.apache.kafka.streams.errors.StreamsException: task [0_1] Store
>> > store1's change log (app1-store1-changelog) does not contain partition 1
>> > at org.apache.kafka.streams.processor.internals.
>> > ProcessorStateManager.register(ProcessorStateManager.java:185)
>> > at org.apache.kafka.streams.processor.internals.
>> > ProcessorContextImpl.register(ProcessorContextImpl.java:123)
>> > at org.apache.kafka.streams.state.internals.RocksDBStore.
>> > init(RocksDBStore.java:169)
>> > at org.apache.kafka.streams.state.internals.
>> > MeteredKeyValueStore.init(MeteredKeyValueStore.java:85)
>> > at org.apache.kafka.streams.processor.internals.AbstractTask.
>> > initializeStateStores(AbstractTask.java:81)
>> > at org.apache.kafka.streams.processor.internals.
>> > StreamTask.<init>(StreamTask.java:119)
>> >
>> > The code is essentially:
>> >
>> > StateStoreSupplier testStore = Stores.create("store1")
>> > .withStringKeys()
>> > .withStringValues()
>> > .persistent()
>> > .build();
>> >
>> > TopologyBuilder builder = new TopologyBuilder();
>> >
>> > builder.addSource("source", "topic1")
>> > .addProcessor("process", TestProcessor::new, "source")
>> > .addStateStore(testStore, "process”);
>> >
>> > KafkaStreams streams = new KafkaStreams(builder, props);
>> > streams.start();
>> >
>> > public static class TestProcessor implements Processor<String, String> {
>> > @Override
>> > public void init(ProcessorContext context) {
>> > context.getStateStore("store1");
>> > System.out.println("Initialized");
>> > }
>> > }
>> >
>> > Full source here: https://gist.github.com/adamw/
>> > <https://gist.github.com/adamw/>
>> > b5c69f86d8688da23afebd095683faaa <https://gist.github.com/
>> > <https://gist.github.com/>>
>> > Full stack trace: https://gist.github.com/adamw/
>> > <https://gist.github.com/adamw/>
>> > f72cdf0c2f0d67425ed9c103a327f3bf <https://gist.github.com/adamw/
>> > <https://gist.github.com/adamw/>
>> > f72cdf0c2f0d67425ed9c103a327f3bf>
>> >
>> > I would be grateful for any pointers!
>> > Adam
>
>
> --
> Adam Warski
>
> http://twitter.com/#!/adamwarski <http://twitter.com/#!/adamwarski>
> http://www.softwaremill.com <http://www.softwaremill.com/>
> http://www.warski.org <http://www.warski.org/>
--
Adam Warski
http://twitter.com/#!/adamwarski <http://twitter.com/#!/adamwarski>
http://www.softwaremill.com <http://www.softwaremill.com/>
http://www.warski.org <http://www.warski.org/>