HI Anish,

Yeah, changing the input topic partitions at runtime could be problematic. But 
it doesn’t seem like that’s what’s going on here. (For regex the application it 
will work fine).

Are there any broker failures going on while test is running? Also, I wonder 
about how the rest of your code looks like. There is some code here 
https://github.com/confluentinc/examples/blob/3.3.0-post/kafka-streams/src/test/java/io/confluent/examples/streams/StateStoresInTheDSLIntegrationTest.java#L158
 
<https://github.com/confluentinc/examples/blob/3.3.0-post/kafka-streams/src/test/java/io/confluent/examples/streams/StateStoresInTheDSLIntegrationTest.java>
 that shows how to create the state stores and initialize Kafka Streams and the 
order of doing things. Could you please double check if it matches your code?

Thanks
Eno


> On Aug 5, 2017, at 3:22 AM, Anish Mashankar <an...@systeminsights.com> wrote:
> 
> Hello Eno,
> So, if I change the input topic partitions, it affects the ability of kafka
> streams to find partitions for the state store changelog? I think I'm
> missing something here.
> In my case, the application was new, so it's for sure that there were no
> changes.
> Also, if I have regex for the input topic on kafka streams and a new topic
> is added to kafka matching the regex, the application will break?
> 
> On Fri, Aug 4, 2017, 8:33 PM Eno Thereska <eno.there...@gmail.com 
> <mailto:eno.there...@gmail.com>> wrote:
> 
>> Hi,
>> 
>> Could you check if this helps:
>> 
>> https://stackoverflow.com/questions/42329387/failed-to-rebalance-error-in-kafka-streams-with-more-than-one-topic-partition
>>  
>> <https://stackoverflow.com/questions/42329387/failed-to-rebalance-error-in-kafka-streams-with-more-than-one-topic-partition>
>> <
>> https://stackoverflow.com/questions/42329387/failed-to-rebalance-error-in-kafka-streams-with-more-than-one-topic-partition
>>  
>> <https://stackoverflow.com/questions/42329387/failed-to-rebalance-error-in-kafka-streams-with-more-than-one-topic-partition>
>>> 
>> 
>> Thanks
>> Eno
>>> On Aug 4, 2017, at 12:48 PM, Anish Mashankar <an...@systeminsights.com>
>> wrote:
>>> 
>>> Hello Eno,
>>> Thanks for considering the question.
>>> 
>>> How I am creating the state stores:
>>> 
>>> StateStoreSupplier stateStoreSupplier =
>>> 
>> StateStorStores.create("testing-2-store").withKeys(keySerde).withValues(valueSerde).persistent().build();
>>> TopologyBuilder builder = ...
>>> builder.addStateStore(stateStoreSupplier, "ProcessorUsingStateStore");
>>> 
>>> The Error Message with stack trace is as follows:
>>> 
>>> 2017-08-04 17:11:23,184 53205
>>> [testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] INFO
>>> o.a.k.s.p.internals.StreamThread - stream-thread
>>> [testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] Created
>>> active task -727063541_0 with assigned partitions [testing-topic-0]
>>> 
>>> 2017-08-04 17:11:23,185 53206
>>> [testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] INFO
>>> o.a.k.s.p.internals.StreamThread - stream-thread
>>> [testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] partition
>>> assignment took 41778 ms.
>>> current active tasks: []
>>> current standby tasks: []
>>> 
>>> 2017-08-04 17:11:23,187 53208
>>> [testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] ERROR
>>> o.a.k.c.c.i.ConsumerCoordinator - User provided listener
>>> 
>> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener
>>> for group testing-2 failed on partition assignment
>>> org.apache.kafka.streams.errors.StreamsException: Store testing-2-store's
>>> change log (testing-2-testing-2-store-changelog) does not contain
>> partition
>>> 0
>>> at
>>> 
>> org.apache.kafka.streams.processor.internals.StoreChangelogReader.validatePartitionExists(StoreChangelogReader.java:87)
>>> at
>>> 
>> org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:165)
>>> at
>>> 
>> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:100)
>>> at
>>> 
>> org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:177)
>>> at
>>> 
>> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:40)
>>> at
>>> 
>> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueStore.init(ChangeLoggingKeyValueStore.java:57)
>>> at
>>> 
>> org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:99)
>>> at
>>> 
>> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
>>> at
>>> 
>> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:130)
>>> at
>>> 
>> org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:201)
>>> at
>>> 
>> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:140)
>>> at
>>> 
>> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234)
>>> at
>>> 
>> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294)
>>> at
>>> 
>> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254)
>>> at
>>> 
>> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313)
>>> at
>>> 
>> org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73)
>>> at
>>> 
>> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183)
>>> at
>>> 
>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
>>> at
>>> 
>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
>>> at
>>> 
>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
>>> at
>>> 
>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
>>> at
>>> 
>> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
>>> at
>>> 
>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
>>> at
>>> 
>> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582)
>>> at
>>> 
>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
>>> at
>>> 
>> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
>>> 
>>> I hope this shares more light on the situation.
>>> Thanks
>>> 
>>> On Fri, Aug 4, 2017 at 2:28 PM Eno Thereska <eno.there...@gmail.com
>> <mailto:eno.there...@gmail.com <mailto:eno.there...@gmail.com>>> wrote:
>>> 
>>>> Hi Anish,
>>>> 
>>>> Could you give more info on how you create the state stores in your
>> code?
>>>> Also could you copy-paste the exact error message from the log?
>>>> 
>>>> Thanks
>>>> Eno
>>>>> On Aug 4, 2017, at 9:05 AM, Anish Mashankar <an...@systeminsights.com 
>>>>> <mailto:an...@systeminsights.com>>
>>>> wrote:
>>>>> 
>>>>> I have a new application, call it streamsApp with state stores S1 and
>> S2.
>>>>> So, according to the documentation, upon the first time startup, the
>>>>> application should've created the changelog topics
>>>> streamsApp-S1-changelog
>>>>> and streamsApp-S2-changelog. But I see that these topics are not
>> created.
>>>>> Also, the application throws an error that it couldn't find any
>> partition
>>>>> for topics *streamsApp-S1-changelog and streamsApp-S2-changelog *and
>> then
>>>>> exits*. *To get it working, I manually created the topics, but I am
>>>>> skeptical because the docs say that this convention might change any
>>>> time.
>>>>> I am using Kafka Streams v0.11, with a Kafka Broker v0.11, but message
>>>>> protocol set to v0.10.0. Am I missing something?
>>>>> --
>>>>> 
>>>>> Regards,
>>>>> Anish Samir Mashankar
>>>>> R&D Engineer
>>>>> System Insights
>>>>> +91-9789870733 <+91%2097898%2070733>
>>>> 
>>>> --
>>> 
>>> Regards,
>>> Anish Samir Mashankar
>>> R&D Engineer
>>> System Insights
>>> +91-9789870733
>> 
>> --
> 
> Regards,
> Anish Samir Mashankar
> R&D Engineer
> System Insights
> +91-9789870733

Reply via email to