Hello Eno,

Thanks for considering the question. Here is how I am creating the state stores:

StateStoreSupplier stateStoreSupplier = Stores.create("testing-2-store")
    .withKeys(keySerde)
    .withValues(valueSerde)
    .persistent()
    .build();

TopologyBuilder builder = ...
builder.addStateStore(stateStoreSupplier, "ProcessorUsingStateStore");

The error message with stack trace is as follows:

2017-08-04 17:11:23,184 53205 [testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] INFO o.a.k.s.p.internals.StreamThread - stream-thread [testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] Created active task -727063541_0 with assigned partitions [testing-topic-0]
2017-08-04 17:11:23,185 53206 [testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] INFO o.a.k.s.p.internals.StreamThread - stream-thread [testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] partition assignment took 41778 ms.
    current active tasks: []
    current standby tasks: []
2017-08-04 17:11:23,187 53208 [testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] ERROR o.a.k.c.c.i.ConsumerCoordinator - User provided listener org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener for group testing-2 failed on partition assignment
org.apache.kafka.streams.errors.StreamsException: Store testing-2-store's change log (testing-2-testing-2-store-changelog) does not contain partition 0
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.validatePartitionExists(StoreChangelogReader.java:87)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:165)
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:100)
    at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:177)
    at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:40)
    at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueStore.init(ChangeLoggingKeyValueStore.java:57)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:99)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:130)
    at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:201)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:140)
    at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234)
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294)
    at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254)
    at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313)
    at org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73)
    at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)

I hope this sheds more light on the situation.

Thanks

On Fri, Aug 4, 2017 at 2:28 PM Eno Thereska <eno.there...@gmail.com> wrote:

> Hi Anish,
>
> Could you give more info on how you create the state stores in your code?
> Also could you copy-paste the exact error message from the log?
>
> Thanks
> Eno
>
> > On Aug 4, 2017, at 9:05 AM, Anish Mashankar <an...@systeminsights.com> wrote:
> >
> > I have a new application, call it streamsApp with state stores S1 and S2.
> > So, according to the documentation, upon the first time startup, the
> > application should've created the changelog topics streamsApp-S1-changelog
> > and streamsApp-S2-changelog. But I see that these topics are not created.
> > Also, the application throws an error that it couldn't find any partition
> > for topics streamsApp-S1-changelog and streamsApp-S2-changelog and then
> > exits. To get it working, I manually created the topics, but I am
> > skeptical because the docs say that this convention might change any time.
> > I am using Kafka Streams v0.11, with a Kafka Broker v0.11, but message
> > protocol set to v0.10.0. Am I missing something?
> > --
> >
> > Regards,
> > Anish Samir Mashankar
> > R&D Engineer
> > System Insights
> > +91-9789870733

--
Regards,
Anish Samir Mashankar
R&D Engineer
System Insights
+91-9789870733
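
For reference, the changelog topic name in the exception appears to follow the pattern <application.id>-<store name>-changelog. The sketch below only illustrates that naming, assuming the convention holds for this version; the class and method names are hypothetical and not part of the Kafka Streams API.

```java
public class ChangelogTopicName {

    // Assumed convention: <application.id>-<store name>-changelog.
    // This helper is illustrative only; Kafka Streams derives the name internally.
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        // Application id and store name as they appear in the log above.
        System.out.println(changelogTopic("testing-2", "testing-2-store"));
        // Placeholder names used in the original question.
        System.out.println(changelogTopic("streamsApp", "S1"));
    }
}
```

With the values from the log, this yields testing-2-testing-2-store-changelog, the topic the exception reports as missing partition 0.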