> If you increase the number of partitions in the topic "topic1" after the
> state store is created, you'd need to manually increase the number of
> partitions in the "app1-store1-changelog" topic as well.  Or remove the
> topic and let KS recreate it next run.  But, either way, hopefully you
> don't need the data in it, 'cause it won't match the partitioning of the
> input topic. :-)
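
The mismatch described above can be pictured with a small stand-alone sketch. It does not use the Kafka Streams API; the class and method names (ChangelogPartitionCheck, changelogTopic, missingPartitions) are made up for illustration. It assumes the changelog topic is named <application.id>-<store name>-changelog, as the error message suggests, and that there is one task per input partition, each needing the changelog partition with the same number:

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: models the partition check, not the real Kafka Streams code.
class ChangelogPartitionCheck {

    // Assumed naming convention: <application.id>-<store name>-changelog
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    // Input partitions for which the changelog topic has no partition of the same number.
    static List<Integer> missingPartitions(int inputPartitions, int changelogPartitions) {
        List<Integer> missing = new ArrayList<>();
        for (int p = changelogPartitions; p < inputPartitions; p++) {
            missing.add(p);
        }
        return missing;
    }

    public static void main(String[] args) {
        System.out.println(changelogTopic("app1", "store1")); // app1-store1-changelog
        System.out.println(missingPartitions(3, 1));          // [1, 2]
    }
}
```

With 3 input partitions and a 1-partition changelog, partitions 1 and 2 have no counterpart, which is the kind of situation the exception for task 0_1 describes.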

I’m running these tests on a “clean” Kafka: only “topic1” is created manually
(with 1 or 3 partitions).
Removing existing data was the first thing I did after getting the exception :)
(to be more specific, I’m removing /tmp/zookeeper, /tmp/kafka-logs, and
/tmp/kafka-streams)
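
The cleanup amounts to roughly the following sketch. The class name CleanLocalState is made up; the directories are passed as arguments rather than hard-coded, and it assumes the broker, ZooKeeper and the streams application are stopped before it runs:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.stream.Stream;

// Illustration only: removes local data directories between test runs.
class CleanLocalState {

    // Deletes root and everything under it; does nothing if root does not exist.
    static void deleteRecursively(Path root) throws IOException {
        if (!Files.exists(root)) {
            return;
        }
        try (Stream<Path> paths = Files.walk(root)) {
            // Reverse order so that children are deleted before their parent directory.
            paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        }
    }

    public static void main(String[] args) throws IOException {
        for (String dir : args) {
            deleteRecursively(Paths.get(dir));
        }
    }
}
```

Usage would be something like: java CleanLocalState /tmp/zookeeper /tmp/kafka-logs /tmp/kafka-streams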

Thanks,
Adam

> Mathieu
> 
> 
> On Mon, Feb 13, 2017 at 11:59 AM, Adam Warski <a...@warski.org> wrote:
> 
> > Hello,
> >
> > I have a simple example (or so it would seem) of a stream processor which
> > uses a persistent state store. Testing on one local Kafka (0.10.1.1) node,
> > this starts up without problems for a topic with 1 partition. However, if I
> > create a topic with 3 partitions I’m getting the following exception
> > shortly after the init() method of the Processor is called (init completes
> > without problems):
> >
> > 2017-02-13 18:41:18 ERROR StreamThread:666 - stream-thread [StreamThread-1] Failed to create an active task 0_1:
> > org.apache.kafka.streams.errors.StreamsException: task [0_1] Store store1's change log (app1-store1-changelog) does not contain partition 1
> >         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:185)
> >         at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123)
> >         at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:169)
> >         at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:85)
> >         at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
> >         at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:119)
> >
> > The code is essentially:
> >
> > StateStoreSupplier testStore = Stores.create("store1")
> >         .withStringKeys()
> >         .withStringValues()
> >         .persistent()
> >         .build();
> >
> > TopologyBuilder builder = new TopologyBuilder();
> >
> > builder.addSource("source", "topic1")
> >         .addProcessor("process", TestProcessor::new, "source")
> >         .addStateStore(testStore, "process");
> >
> > KafkaStreams streams = new KafkaStreams(builder, props);
> > streams.start();
> >
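> > public static class TestProcessor implements Processor<String, String> {
> >     @Override
> >     public void init(ProcessorContext context) {
> >         context.getStateStore("store1");
> >         System.out.println("Initialized");
> >     }
> >
> >     // No-op stubs for the rest of the Processor interface, so the class compiles.
> >     @Override public void process(String key, String value) {}
> >     @Override public void punctuate(long timestamp) {}
> >     @Override public void close() {}
> > }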
> >
> > Full source here: https://gist.github.com/adamw/b5c69f86d8688da23afebd095683faaa
> > Full stack trace: https://gist.github.com/adamw/f72cdf0c2f0d67425ed9c103a327f3bf
> >
> > I would be grateful for any pointers!
> > Adam
> >
> > --
> > Adam Warski
> >
> > http://twitter.com/#!/adamwarski
> > http://www.softwaremill.com
> > http://www.warski.org
> >



-- 
Adam Warski

http://twitter.com/#!/adamwarski
http://www.softwaremill.com
http://www.warski.org
