Can you try this out with the 0.10.2 branch or current trunk? We have already put in some fixes like the ones you suggested. It would be nice to get feedback on whether those fixes resolve the issue for you.
Some more comments inline.

-Matthias

On 2/13/17 12:27 PM, Adam Warski wrote:
> Following this answer, I checked that the auto-created
> "app1-store1-changelog" topic had 1 partition - which caused the problem.
> Creating this topic upfront with 3 partitions (which matches the stream
> source partition count) fixes the problem.

Makes sense -- however, I am wondering why manually creating the topic is required. After you removed all data from /tmp/[zk|logs|streams], the topic should be deleted and thus should get created with 3 partitions...

>
> However, I think this should be handled somehow differently … maybe the
> exception should report the partition count mismatch?

This should happen in 0.10.2.

> Or the auto-created topic should use the partition count of the incoming
> streams (though I’m not sure that’s always possible - a store might be used
> multiple times).

This should happen. If not, there is a bug. Can you reliably reproduce this issue?

>
> Finally, is there a way to specify how many partitions should auto-created
> topics have when creating a state store? I tried with:

The number of partitions is computed from the number of tasks, which depends on the number of input partitions. Thus, you cannot configure it. Because the topic must have a specific number of partitions to guarantee correct results, there is nothing to "tune" and thus nothing to configure.

>
> Map<String, String> storeCfg = new HashMap<>();
> storeCfg.put("num.partitions", "3");
>
> StateStoreSupplier testStore = Stores.create("store1")
>     .withStringKeys()
>     .withStringValues()
>     .persistent()
>     .enableLogging(storeCfg)
>     .build();
>
> but that didn’t help.
>
> Thanks,
> Adam
>
>> On 13 Feb 2017, at 20:57, Adam Warski <a...@warski.org> wrote:
>>
>>
>>> If you increase the number of partitions in the topic "topic1" after the
>>> state store is created, you'd need to manually increase the number of
>>> partitions in the "app1-store1-changelog" topic as well. Or remove the
>>> topic and let KS recreate it next run. But, either way, hopefully you
>>> don't need the data in it, 'cause it won't match the partitioning of the
>>> input topic. :-)
>>
>> I’m running these tests on “clean” Kafka - only “topic1” is manually created
>> (with 1 or 3 partitions).
>> Removing existing data was the first thing I did after getting the exception
>> :)
>> (to be more specific I’m removing /tmp/zookeeper, /tmp/kafka-logs,
>> /tmp/kafka-streams)
>>
>> Thanks,
>> Adam
>>
>>> On Mon, Feb 13, 2017 at 11:59 AM, Adam Warski <a...@warski.org> wrote:
>>>
>>>> Hello,
>>>>
>>>> I have a simple example (or so it would seem) of a stream processor which
>>>> uses a persistent state store. Testing on one local Kafka (0.10.1.1) node,
>>>> this starts up without problems for a topic with 1 partition. However, if I
>>>> create a topic with 3 partitions I’m getting the following exception
>>>> shortly after the init() method of the Processor is called (init completes
>>>> without problems):
>>>>
>>>> 2017-02-13 18:41:18 ERROR StreamThread:666 - stream-thread
>>>> [StreamThread-1] Failed to create an active task 0_1:
>>>> org.apache.kafka.streams.errors.StreamsException: task [0_1] Store
>>>> store1's change log (app1-store1-changelog) does not contain partition 1
>>>>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:185)
>>>>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123)
>>>>     at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:169)
>>>>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:85)
>>>>     at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:119)
>>>>
>>>> The code is essentially:
>>>>
>>>> StateStoreSupplier testStore = Stores.create("store1")
>>>>     .withStringKeys()
>>>>     .withStringValues()
>>>>     .persistent()
>>>>     .build();
>>>>
>>>> TopologyBuilder builder = new TopologyBuilder();
>>>>
>>>> builder.addSource("source", "topic1")
>>>>     .addProcessor("process", TestProcessor::new, "source")
>>>>     .addStateStore(testStore, "process");
>>>>
>>>> KafkaStreams streams = new KafkaStreams(builder, props);
>>>> streams.start();
>>>>
>>>> public static class TestProcessor implements Processor<String, String> {
>>>>     @Override
>>>>     public void init(ProcessorContext context) {
>>>>         context.getStateStore("store1");
>>>>         System.out.println("Initialized");
>>>>     }
>>>> }
>>>>
>>>> Full source here:
>>>> https://gist.github.com/adamw/b5c69f86d8688da23afebd095683faaa
>>>> Full stack trace:
>>>> https://gist.github.com/adamw/f72cdf0c2f0d67425ed9c103a327f3bf
>>>>
>>>> I would be grateful for any pointers!
>>>> Adam
>>
>>
>> --
>> Adam Warski
>>
>> http://twitter.com/#!/adamwarski
>> http://www.softwaremill.com
>> http://www.warski.org
>
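For readers following the thread, here is a rough, self-contained sketch of why a changelog topic with too few partitions produces the "does not contain partition 1" error. It is a simplified model, not the actual Kafka Streams code: the class `ChangelogPartitionCheck` and its methods are hypothetical names made up for illustration. The only fact taken from Kafka Streams is that a task id such as `0_1` has the form `<topicGroupId>_<partition>`, and that each task restores its store from the changelog partition with the same number.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Kafka Streams: models the relationship
// between source partitions, task ids, and changelog partitions.
final class ChangelogPartitionCheck {

    // Extracts the partition number from a task id like "0_1"
    // (topic group 0, partition 1).
    static int partitionOf(String taskId) {
        return Integer.parseInt(taskId.substring(taskId.indexOf('_') + 1));
    }

    // A task can register its store only if the changelog topic
    // contains the partition with the task's own partition number.
    static boolean hasChangelogPartition(String taskId, int changelogPartitions) {
        return partitionOf(taskId) < changelogPartitions;
    }

    // Lists the task partitions that have no matching changelog partition.
    static List<Integer> missingPartitions(int sourcePartitions, int changelogPartitions) {
        List<Integer> missing = new ArrayList<>();
        for (int p = changelogPartitions; p < sourcePartitions; p++) {
            missing.add(p);
        }
        return missing;
    }

    public static void main(String[] args) {
        // The situation from the thread: "topic1" has 3 partitions,
        // "app1-store1-changelog" was auto-created with 1.
        int sourcePartitions = 3;
        int changelogPartitions = 1;

        for (int p = 0; p < sourcePartitions; p++) {
            String taskId = "0_" + p;
            if (!hasChangelogPartition(taskId, changelogPartitions)) {
                System.out.println("task [" + taskId + "] change log does not contain partition " + p);
            }
        }
        System.out.println("missing changelog partitions: "
                + missingPartitions(sourcePartitions, changelogPartitions));
    }
}
```

In this model, tasks 0_1 and 0_2 fail while task 0_0 succeeds, which is consistent with the single-partition case working and the three-partition case failing. Once the changelog has as many partitions as the source topic, no task is left without a changelog partition.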