Hello,
I have a simple example (or so it would seem) of a stream processor which uses
a persistent state store. Testing against a single local Kafka (0.10.1.1) node,
it starts up without problems for a topic with 1 partition. However, if I create
a topic with 3 partitions, I get the following exception shortly after the
Processor's init() method is called (init() itself completes without problems):
2017-02-13 18:41:18 ERROR StreamThread:666 - stream-thread [StreamThread-1] Failed to create an active task 0_1:
org.apache.kafka.streams.errors.StreamsException: task [0_1] Store store1's change log (app1-store1-changelog) does not contain partition 1
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:185)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123)
    at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:169)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:85)
    at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:119)
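As far as I can tell from the message, task 0_1 is the task for partition 1 of the source topic, and the failing check looks for that same partition number in the store's changelog topic. Below is a simplified, standalone model of that check as I understand it. It is not the actual Kafka Streams code: the class and method names are made up for illustration, and the assumption that the changelog topic only has partition 0 is just my guess at the situation.

```java
import java.util.Arrays;
import java.util.List;

public class ChangelogPartitionCheck {

    // Task ids have the form "<topicGroupId>_<partition>", e.g. "0_1".
    static int partitionOf(String taskId) {
        return Integer.parseInt(taskId.substring(taskId.indexOf('_') + 1));
    }

    // Hypothetical helper: does the changelog topic have the partition this task needs?
    static boolean containsPartition(List<Integer> changelogPartitions, int taskPartition) {
        return changelogPartitions.contains(taskPartition);
    }

    public static void main(String[] args) {
        // Assumption: the changelog topic exists with a single partition (0).
        List<Integer> changelogPartitions = Arrays.asList(0);

        // A 3-partition source topic yields tasks 0_0, 0_1 and 0_2.
        for (String taskId : new String[] {"0_0", "0_1", "0_2"}) {
            int partition = partitionOf(taskId);
            System.out.println("task " + taskId + ": changelog contains partition "
                    + partition + "? " + containsPartition(changelogPartitions, partition));
        }
    }
}
```

Under that assumption, only task 0_0 would pass the check, which would match what I see with a 3-partition topic.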
My application code is essentially:
StateStoreSupplier testStore = Stores.create("store1")
    .withStringKeys()
    .withStringValues()
    .persistent()
    .build();

TopologyBuilder builder = new TopologyBuilder();
builder.addSource("source", "topic1")
    .addProcessor("process", TestProcessor::new, "source")
    .addStateStore(testStore, "process");

KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();

public static class TestProcessor implements Processor<String, String> {
    @Override
    public void init(ProcessorContext context) {
        context.getStateStore("store1");
        System.out.println("Initialized");
    }

    // process(), punctuate() and close() omitted here for brevity
}

Full source here:
https://gist.github.com/adamw/b5c69f86d8688da23afebd095683faaa
Full stack trace:
https://gist.github.com/adamw/f72cdf0c2f0d67425ed9c103a327f3bf
I would be grateful for any pointers!
Adam
--
Adam Warski
http://twitter.com/#!/adamwarski
http://www.softwaremill.com
http://www.warski.org