Hello Eno,
Yes, I have followed similar code to set up the streams application. Does any 
other code inside the application affect the bootstrapping steps? 
I have a custom interface that populates the Streams properties from 
environment variables. I attach the state store to the builder using:

builder.addProcessor(...)
builder.addStateStore(stateStoreSupplier, PROCESSOR_NAME)
…
builder.build()

The state store is created in this way:
Stores.create(name)
        .withKeys(keySerde)
        .withValues(valueSerde)
        .persistent()
        .build();
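For reference, here is a minimal sketch of how I wire the store and processor together. This is illustrative only: the store name, topic names, serdes, and the CountingProcessor are hypothetical stand-ins for my actual code, written against the 0.11 low-level Processor API:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class TopologySketch {

    // Hypothetical processor that counts occurrences per key in the store.
    static class CountingProcessor extends AbstractProcessor<String, String> {
        private KeyValueStore<String, Long> store;

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            super.init(context);
            // Name must match the one used in Stores.create(...)
            store = (KeyValueStore<String, Long>) context.getStateStore("my-store");
        }

        @Override
        public void process(String key, String value) {
            Long count = store.get(key);
            store.put(key, count == null ? 1L : count + 1);
        }
    }

    public static TopologyBuilder buildTopology() {
        StateStoreSupplier storeSupplier = Stores.create("my-store")
                .withKeys(Serdes.String())
                .withValues(Serdes.Long())
                .persistent()   // RocksDB-backed; changelog enabled by default
                .build();

        TopologyBuilder builder = new TopologyBuilder();
        builder.addSource("Source", "input-topic");
        builder.addProcessor("Process", CountingProcessor::new, "Source");
        // The store must be attached to every processor that accesses it
        builder.addStateStore(storeSupplier, "Process");
        return builder;
    }
}
```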

I am using the following streams configurations:
props.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, 
timeStampExtractor);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, keySerdes);
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, valueSerdes);
props.put(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, partitionGrouper);
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, numStandbyReplicas);
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numStreamThreads);
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, replicationFactor);
props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, applicationServer);
props.put(StreamsConfig.STATE_DIR_CONFIG, stateStoreDir);
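And this is roughly how I construct and start the application, with placeholder values substituted for the environment-variable lookups (the literal values here are examples, not my real configuration):

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TopologyBuilder;

public class StreamsStartup {
    public static void main(String[] args) {
        // Placeholder values; in my code these come from environment variables
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streamsApp");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1); // used for internal/changelog topics
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");

        TopologyBuilder builder = new TopologyBuilder();
        builder.addSource("Source", "input-topic");
        // ... addProcessor / addStateStore as described earlier ...

        // In 0.11 KafkaStreams takes the TopologyBuilder directly
        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
```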

Are there any new properties I am missing? I am not seeing any errors on the 
broker. Do I need to enable auto topic creation on the broker for this to work?

> On 07-Aug-2017, at 1:05 PM, Eno Thereska <eno.there...@gmail.com> wrote:
> 
> Hi Anish,
> 
> Yeah, changing the input topic partitions at runtime could be problematic, 
> but it doesn't seem like that's what's going on here. (For the regex case, 
> the application will work fine.)
> 
> Are there any broker failures while the test is running? I also wonder what 
> the rest of your code looks like. There is some code here 
> https://github.com/confluentinc/examples/blob/3.3.0-post/kafka-streams/src/test/java/io/confluent/examples/streams/StateStoresInTheDSLIntegrationTest.java#L158
> that shows how to create the state stores, initialize Kafka Streams, and the 
> order of doing things. Could you please double-check that it matches your 
> code?
> 
> Thanks
> Eno
> 
> 
>> On Aug 5, 2017, at 3:22 AM, Anish Mashankar <an...@systeminsights.com> wrote:
>> 
>> Hello Eno,
>> So, if I change the input topic partitions, does it affect the ability of 
>> Kafka Streams to find partitions for the state store changelog? I think I'm 
>> missing something here.
>> In my case the application was new, so there were certainly no partition 
>> changes.
>> Also, if I have a regex for the input topic in Kafka Streams and a new topic 
>> matching the regex is added to Kafka, will the application break?
>> 
>> On Fri, Aug 4, 2017, 8:33 PM Eno Thereska <eno.there...@gmail.com> wrote:
>> 
>>> Hi,
>>> 
>>> Could you check if this helps:
>>> 
>>> https://stackoverflow.com/questions/42329387/failed-to-rebalance-error-in-kafka-streams-with-more-than-one-topic-partition
>>> 
>>> Thanks
>>> Eno
>>>> On Aug 4, 2017, at 12:48 PM, Anish Mashankar <an...@systeminsights.com>
>>> wrote:
>>>> 
>>>> Hello Eno,
>>>> Thanks for considering the question.
>>>> 
>>>> How I am creating the state stores:
>>>> 
>>>> StateStoreSupplier stateStoreSupplier = Stores.create("testing-2-store")
>>>>         .withKeys(keySerde)
>>>>         .withValues(valueSerde)
>>>>         .persistent()
>>>>         .build();
>>>> TopologyBuilder builder = ...
>>>> builder.addStateStore(stateStoreSupplier, "ProcessorUsingStateStore");
>>>> 
>>>> The Error Message with stack trace is as follows:
>>>> 
>>>> 2017-08-04 17:11:23,184 53205
>>>> [testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] INFO
>>>> o.a.k.s.p.internals.StreamThread - stream-thread
>>>> [testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] Created
>>>> active task -727063541_0 with assigned partitions [testing-topic-0]
>>>> 
>>>> 2017-08-04 17:11:23,185 53206
>>>> [testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] INFO
>>>> o.a.k.s.p.internals.StreamThread - stream-thread
>>>> [testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] partition
>>>> assignment took 41778 ms.
>>>> current active tasks: []
>>>> current standby tasks: []
>>>> 
>>>> 2017-08-04 17:11:23,187 53208
>>>> [testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] ERROR
>>>> o.a.k.c.c.i.ConsumerCoordinator - User provided listener
>>>> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener
>>>> for group testing-2 failed on partition assignment
>>>> org.apache.kafka.streams.errors.StreamsException: Store testing-2-store's
>>>> change log (testing-2-testing-2-store-changelog) does not contain partition 0
>>>>     at org.apache.kafka.streams.processor.internals.StoreChangelogReader.validatePartitionExists(StoreChangelogReader.java:87)
>>>>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:165)
>>>>     at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:100)
>>>>     at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:177)
>>>>     at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:40)
>>>>     at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueStore.init(ChangeLoggingKeyValueStore.java:57)
>>>>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:99)
>>>>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
>>>>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:130)
>>>>     at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:201)
>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:140)
>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234)
>>>>     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294)
>>>>     at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254)
>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313)
>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73)
>>>>     at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183)
>>>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
>>>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
>>>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
>>>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
>>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
>>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582)
>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
>>>> 
>>>> I hope this shares more light on the situation.
>>>> Thanks
>>>> 
>>>> On Fri, Aug 4, 2017 at 2:28 PM Eno Thereska <eno.there...@gmail.com> wrote:
>>>> 
>>>>> Hi Anish,
>>>>> 
>>>>> Could you give more info on how you create the state stores in your
>>> code?
>>>>> Also could you copy-paste the exact error message from the log?
>>>>> 
>>>>> Thanks
>>>>> Eno
>>>>>> On Aug 4, 2017, at 9:05 AM, Anish Mashankar <an...@systeminsights.com> wrote:
>>>>>> 
>>>>>> I have a new application, call it streamsApp, with state stores S1 and S2.
>>>>>> According to the documentation, on first startup the application should
>>>>>> have created the changelog topics streamsApp-S1-changelog and
>>>>>> streamsApp-S2-changelog. But I see that these topics are not created.
>>>>>> Also, the application throws an error that it couldn't find any partition
>>>>>> for the topics streamsApp-S1-changelog and streamsApp-S2-changelog, and
>>>>>> then exits. To get it working, I manually created the topics, but I am
>>>>>> skeptical because the docs say that this naming convention might change
>>>>>> at any time.
>>>>>> I am using Kafka Streams v0.11 with a Kafka broker v0.11, but the message
>>>>>> protocol set to v0.10.0. Am I missing something?
>>>>>> --
>>>>>> 
>>>>>> Regards,
>>>>>> Anish Samir Mashankar
>>>>>> R&D Engineer
>>>>>> System Insights
>>>>>> +91-9789870733
>>>>> 
>>>>> --
>>>> 
>>>> Regards,
>>>> Anish Samir Mashankar
>>>> R&D Engineer
>>>> System Insights
>>>> +91-9789870733
>>> 
>>> --
>> 
>> Regards,
>> Anish Samir Mashankar
>> R&D Engineer
>> System Insights
>> +91-9789870733
> 
