Hello Eno, Yes, I have followed a similar code to setup the streams application. Does any other code inside the application affect the bootstrapping steps? I have a custom interface in which I populate Streams properties using environment variables. I attach the state store to the builder using:
builder.addProcessor(...) builder.addStateStore(stateStoreSupplier, PROCESSOR_NAME) … builder.build() The state store is created in this way: Stores.create(name) .withKeys(keySerde) .withValues(valueSerde) .persistent() .build(); I am using the following streams configurations: props.put(StreamsConfig.APPLICATION_ID_CONFIG, appId); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, timeStampExtractor); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, keySerdes); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, valueSerdes); props.put(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, partitionGrouper); props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, numStandbyReplicas); props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numStreamThreads); props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, replicationFactor); props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, applicationServer); props.put(StreamsConfig.STATE_DIR_CONFIG, stateStoreDir); Are there any new properties I am missing? I am not seeing any errors on the broker. Do I need to enable auto topic creation on the broker for this to work? > On 07-Aug-2017, at 1:05 PM, Eno Thereska <eno.there...@gmail.com> wrote: > > HI Anish, > > Yeah, changing the input topic partitions at runtime could be problematic. > But it doesn’t seem like that’s what’s going on here. (For regex the > application it will work fine). > > Are there any broker failures going on while test is running? Also, I wonder > about how the rest of your code looks like. There is some code here > https://github.com/confluentinc/examples/blob/3.3.0-post/kafka-streams/src/test/java/io/confluent/examples/streams/StateStoresInTheDSLIntegrationTest.java#L158 > > <https://github.com/confluentinc/examples/blob/3.3.0-post/kafka-streams/src/test/java/io/confluent/examples/streams/StateStoresInTheDSLIntegrationTest.java> > that shows how to create the state stores and initialize Kafka Streams and > the order of doing things. Could you please double check if it matches your > code? > > Thanks > Eno > > >> On Aug 5, 2017, at 3:22 AM, Anish Mashankar <an...@systeminsights.com> wrote: >> >> Hello Eno, >> So, if I change the input topic partitions, it affects the ability of kafka >> streams to find partitions for the state store changelog? I think I'm >> missing something here. >> In my case, the application was new, so it's for sure that there were no >> changes. >> Also, if I have regex for the input topic on kafka streams and a new topic >> is added to kafka matching the regex, the application will break? >> >> On Fri, Aug 4, 2017, 8:33 PM Eno Thereska <eno.there...@gmail.com >> <mailto:eno.there...@gmail.com>> wrote: >> >>> Hi, >>> >>> Could you check if this helps: >>> >>> https://stackoverflow.com/questions/42329387/failed-to-rebalance-error-in-kafka-streams-with-more-than-one-topic-partition >>> >>> <https://stackoverflow.com/questions/42329387/failed-to-rebalance-error-in-kafka-streams-with-more-than-one-topic-partition> >>> < >>> https://stackoverflow.com/questions/42329387/failed-to-rebalance-error-in-kafka-streams-with-more-than-one-topic-partition >>> >>> <https://stackoverflow.com/questions/42329387/failed-to-rebalance-error-in-kafka-streams-with-more-than-one-topic-partition> >>>> >>> >>> Thanks >>> Eno >>>> On Aug 4, 2017, at 12:48 PM, Anish Mashankar <an...@systeminsights.com> >>> wrote: >>>> >>>> Hello Eno, >>>> Thanks for considering the question. >>>> >>>> How I am creating the state stores: >>>> >>>> StateStoreSupplier stateStoreSupplier = >>>> >>> StateStorStores.create("testing-2-store").withKeys(keySerde).withValues(valueSerde).persistent().build(); >>>> TopologyBuilder builder = ... >>>> builder.addStateStore(stateStoreSupplier, "ProcessorUsingStateStore"); >>>> >>>> The Error Message with stack trace is as follows: >>>> >>>> 2017-08-04 17:11:23,184 53205 >>>> [testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] INFO >>>> o.a.k.s.p.internals.StreamThread - stream-thread >>>> [testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] Created >>>> active task -727063541_0 with assigned partitions [testing-topic-0] >>>> >>>> 2017-08-04 17:11:23,185 53206 >>>> [testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] INFO >>>> o.a.k.s.p.internals.StreamThread - stream-thread >>>> [testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] partition >>>> assignment took 41778 ms. >>>> current active tasks: [] >>>> current standby tasks: [] >>>> >>>> 2017-08-04 17:11:23,187 53208 >>>> [testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] ERROR >>>> o.a.k.c.c.i.ConsumerCoordinator - User provided listener >>>> >>> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener >>>> for group testing-2 failed on partition assignment >>>> org.apache.kafka.streams.errors.StreamsException: Store testing-2-store's >>>> change log (testing-2-testing-2-store-changelog) does not contain >>> partition >>>> 0 >>>> at >>>> >>> org.apache.kafka.streams.processor.internals.StoreChangelogReader.validatePartitionExists(StoreChangelogReader.java:87) >>>> at >>>> >>> org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:165) >>>> at >>>> >>> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:100) >>>> at >>>> >>> org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:177) >>>> at >>>> >>> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:40) >>>> at >>>> >>> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueStore.init(ChangeLoggingKeyValueStore.java:57) >>>> at >>>> >>> org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:99) >>>> at >>>> >>> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187) >>>> at >>>> >>> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:130) >>>> at >>>> >>> org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:201) >>>> at >>>> >>> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:140) >>>> at >>>> >>> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234) >>>> at >>>> >>> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294) >>>> at >>>> >>> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254) >>>> at >>>> >>> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313) >>>> at >>>> >>> org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73) >>>> at >>>> >>> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183) >>>> at >>>> >>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265) >>>> at >>>> >>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363) >>>> at >>>> >>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310) >>>> at >>>> >>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297) >>>> at >>>> >>> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078) >>>> at >>>> >>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) >>>> at >>>> >>> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582) >>>> at >>>> >>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553) >>>> at >>>> >>> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527) >>>> >>>> I hope this shares more light on the situation. >>>> Thanks >>>> >>>> On Fri, Aug 4, 2017 at 2:28 PM Eno Thereska <eno.there...@gmail.com >>> <mailto:eno.there...@gmail.com <mailto:eno.there...@gmail.com>>> wrote: >>>> >>>>> Hi Anish, >>>>> >>>>> Could you give more info on how you create the state stores in your >>> code? >>>>> Also could you copy-paste the exact error message from the log? >>>>> >>>>> Thanks >>>>> Eno >>>>>> On Aug 4, 2017, at 9:05 AM, Anish Mashankar <an...@systeminsights.com >>>>>> <mailto:an...@systeminsights.com>> >>>>> wrote: >>>>>> >>>>>> I have a new application, call it streamsApp with state stores S1 and >>> S2. >>>>>> So, according to the documentation, upon the first time startup, the >>>>>> application should've created the changelog topics >>>>> streamsApp-S1-changelog >>>>>> and streamsApp-S2-changelog. But I see that these topics are not >>> created. >>>>>> Also, the application throws an error that it couldn't find any >>> partition >>>>>> for topics *streamsApp-S1-changelog and streamsApp-S2-changelog *and >>> then >>>>>> exits*. *To get it working, I manually created the topics, but I am >>>>>> skeptical because the docs say that this convention might change any >>>>> time. >>>>>> I am using Kafka Streams v0.11, with a Kafka Broker v0.11, but message >>>>>> protocol set to v0.10.0. Am I missing something? >>>>>> -- >>>>>> >>>>>> Regards, >>>>>> Anish Samir Mashankar >>>>>> R&D Engineer >>>>>> System Insights >>>>>> +91-9789870733 <+91%2097898%2070733> >>>>> >>>>> -- >>>> >>>> Regards, >>>> Anish Samir Mashankar >>>> R&D Engineer >>>> System Insights >>>> +91-9789870733 >>> >>> -- >> >> Regards, >> Anish Samir Mashankar >> R&D Engineer >> System Insights >> +91-9789870733 >