Hello Eno,
Thanks for considering the question.

Here is how I am creating the state stores:

StateStoreSupplier stateStoreSupplier =
Stores.create("testing-2-store").withKeys(keySerde).withValues(valueSerde).persistent().build();
TopologyBuilder builder = ...
builder.addStateStore(stateStoreSupplier, "ProcessorUsingStateStore");

The error message with its stack trace is as follows:

2017-08-04 17:11:23,184 53205 [testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] Created active task -727063541_0 with assigned partitions [testing-topic-0]

2017-08-04 17:11:23,185 53206 [testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] partition assignment took 41778 ms.
    current active tasks: []
    current standby tasks: []

2017-08-04 17:11:23,187 53208 [testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] ERROR o.a.k.c.c.i.ConsumerCoordinator - User provided listener org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener for group testing-2 failed on partition assignment
org.apache.kafka.streams.errors.StreamsException: Store testing-2-store's change log (testing-2-testing-2-store-changelog) does not contain partition 0
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.validatePartitionExists(StoreChangelogReader.java:87)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:165)
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:100)
    at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:177)
    at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:40)
    at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueStore.init(ChangeLoggingKeyValueStore.java:57)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:99)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:130)
    at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:201)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:140)
    at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234)
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294)
    at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254)
    at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313)
    at org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73)
    at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
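
The topic name in the exception above is consistent with the pattern <application.id>-<store name>-changelog: the group is testing-2 and the store is testing-2-store. Below is a minimal sketch of that derivation, for illustration only. The class and method names are hypothetical and not part of the Kafka Streams API, and the pattern is an internal convention that the docs reportedly describe as subject to change.

```java
// Hypothetical helper, NOT part of the Kafka Streams API. It only mirrors the
// "<application.id>-<store name>-changelog" pattern visible in the log above.
final class ChangelogTopicNames {

    private static final String SUFFIX = "-changelog";

    private ChangelogTopicNames() {
        // Static utility; no instances.
    }

    // Builds the changelog topic name for an application id and a store name.
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + SUFFIX;
    }

    public static void main(String[] args) {
        // Values taken from the log: group "testing-2", store "testing-2-store".
        System.out.println(changelogTopic("testing-2", "testing-2-store"));
    }
}
```

With the values from the log, this prints testing-2-testing-2-store-changelog, which is the topic named in the exception.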

I hope this sheds more light on the situation.
Thanks

On Fri, Aug 4, 2017 at 2:28 PM Eno Thereska <eno.there...@gmail.com> wrote:

> Hi Anish,
>
> Could you give more info on how you create the state stores in your code?
> Also could you copy-paste the exact error message from the log?
>
> Thanks
> Eno
> > On Aug 4, 2017, at 9:05 AM, Anish Mashankar <an...@systeminsights.com> wrote:
> >
> > I have a new application, call it streamsApp, with state stores S1 and S2.
> > So, according to the documentation, on first startup the application
> > should have created the changelog topics streamsApp-S1-changelog and
> > streamsApp-S2-changelog. But I see that these topics are not created.
> > Also, the application throws an error that it couldn't find any partition
> > for topics streamsApp-S1-changelog and streamsApp-S2-changelog, and then
> > exits. To get it working, I manually created the topics, but I am
> > skeptical because the docs say that this convention might change at any
> > time. I am using Kafka Streams v0.11 with a Kafka broker v0.11, but with
> > the message protocol set to v0.10.0. Am I missing something?
> > --
> >
> > Regards,
> > Anish Samir Mashankar
> > R&D Engineer
> > System Insights
> > +91-9789870733
>
> --

Regards,
Anish Samir Mashankar
R&D Engineer
System Insights
+91-9789870733
