Hello Eno,

So, if I change the number of partitions on the input topic, does that affect Kafka Streams' ability to find partitions for the state store changelog? I think I'm missing something here. In my case the application was new, so I am certain the partitions had not changed.

Also, if I subscribe to the input topics with a regex in Kafka Streams and a new topic matching the regex is added to Kafka, will the application break?
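To make the question concrete, here is a minimal sketch of how I read the error in the quoted log below. It is plain Java and not the actual Kafka Streams code: the class and method names are hypothetical, the topic name follows the `<application.id>-<store name>-changelog` pattern seen in the log, and the idea that the task for input partition N needs partition N of the changelog is my assumption based on the error text.

```java
import java.util.HashMap;
import java.util.Map;

public class ChangelogCheck {
    // Naming pattern seen in the log: <application.id>-<store name>-changelog.
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    // Hypothetical stand-in for the failing validation (an assumption, not the
    // library's code): the changelog topic must exist and contain the partition.
    static boolean hasPartition(Map<String, Integer> partitionCounts, String topic, int partition) {
        Integer count = partitionCounts.get(topic);
        return count != null && partition >= 0 && partition < count;
    }

    public static void main(String[] args) {
        String topic = changelogTopic("testing-2", "testing-2-store");

        // Assumed cluster state 1: the changelog topic was never created.
        Map<String, Integer> partitionCounts = new HashMap<>();
        System.out.println(topic + " has partition 0: " + hasPartition(partitionCounts, topic, 0));

        // Assumed cluster state 2: the changelog topic exists with one partition.
        partitionCounts.put(topic, 1);
        System.out.println(topic + " has partition 0: " + hasPartition(partitionCounts, topic, 0));
    }
}
```

If this reading is right, the check fails whenever the changelog topic is missing or has fewer partitions than the input topic, which is why I am asking how a partition change on the input side would matter for a brand-new application.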
On Fri, Aug 4, 2017, 8:33 PM Eno Thereska <eno.there...@gmail.com> wrote:

> Hi,
>
> Could you check if this helps:
>
> https://stackoverflow.com/questions/42329387/failed-to-rebalance-error-in-kafka-streams-with-more-than-one-topic-partition
>
> Thanks
> Eno
>
> On Aug 4, 2017, at 12:48 PM, Anish Mashankar <an...@systeminsights.com> wrote:
> >
> > Hello Eno,
> > Thanks for considering the question.
> >
> > How I am creating the state stores:
> >
> > StateStoreSupplier stateStoreSupplier =
> >     Stores.create("testing-2-store").withKeys(keySerde).withValues(valueSerde).persistent().build();
> > TopologyBuilder builder = ...
> > builder.addStateStore(stateStoreSupplier, "ProcessorUsingStateStore");
> >
> > The error message with stack trace is as follows:
> >
> > 2017-08-04 17:11:23,184 53205 [testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] INFO o.a.k.s.p.internals.StreamThread - stream-thread [testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] Created active task -727063541_0 with assigned partitions [testing-topic-0]
> >
> > 2017-08-04 17:11:23,185 53206 [testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] INFO o.a.k.s.p.internals.StreamThread - stream-thread [testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] partition assignment took 41778 ms.
> >     current active tasks: []
> >     current standby tasks: []
> >
> > 2017-08-04 17:11:23,187 53208 [testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] ERROR o.a.k.c.c.i.ConsumerCoordinator - User provided listener org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener for group testing-2 failed on partition assignment
> > org.apache.kafka.streams.errors.StreamsException: Store testing-2-store's change log (testing-2-testing-2-store-changelog) does not contain partition 0
> >     at org.apache.kafka.streams.processor.internals.StoreChangelogReader.validatePartitionExists(StoreChangelogReader.java:87)
> >     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:165)
> >     at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:100)
> >     at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:177)
> >     at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:40)
> >     at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueStore.init(ChangeLoggingKeyValueStore.java:57)
> >     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:99)
> >     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> >     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:130)
> >     at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:201)
> >     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:140)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234)
> >     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294)
> >     at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73)
> >     at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183)
> >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
> >     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
> >     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
> >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
> >     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
> >     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
> >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
> >
> > I hope this sheds more light on the situation.
> > Thanks
> >
> > On Fri, Aug 4, 2017 at 2:28 PM Eno Thereska <eno.there...@gmail.com> wrote:
> >
> > > Hi Anish,
> > >
> > > Could you give more info on how you create the state stores in your code?
> > > Also could you copy-paste the exact error message from the log?
> > >
> > > Thanks
> > > Eno
> > >
> > > On Aug 4, 2017, at 9:05 AM, Anish Mashankar <an...@systeminsights.com> wrote:
> > > >
> > > > I have a new application, call it streamsApp, with state stores S1 and S2.
> > > > So, according to the documentation, on first startup the application
> > > > should have created the changelog topics streamsApp-S1-changelog and
> > > > streamsApp-S2-changelog. But I see that these topics are not created.
> > > > Also, the application throws an error that it couldn't find any partition
> > > > for topics streamsApp-S1-changelog and streamsApp-S2-changelog and then
> > > > exits. To get it working, I manually created the topics, but I am
> > > > skeptical because the docs say that this convention might change any time.
> > > > I am using Kafka Streams v0.11, with a Kafka Broker v0.11, but message
> > > > protocol set to v0.10.0. Am I missing something?
> > > > --
> > > >
> > > > Regards,
> > > > Anish Samir Mashankar
> > > > R&D Engineer
> > > > System Insights
> > > > +91-9789870733
> > >
> > --
> >
> > Regards,
> > Anish Samir Mashankar
> > R&D Engineer
> > System Insights
> > +91-9789870733

--
Regards,
Anish Samir Mashankar
R&D Engineer
System Insights
+91-9789870733