Not sure why the locks on the state directory did not get released (maybe because of the crash) -- which version are you using? We fixed a couple of bugs in this area lately -- it may be fixed in the upcoming 0.10.2.
For now, you might want to delete the whole state directory (either manually or by calling KafkaStreams#cleanUp() before you call KafkaStreams#start()) to get rid of the locks. This will force the state to be recreated at startup, of course -- but there is nothing else you can do.

The state is recreated from the store's changelog topic (not from the repartition topic). However, it's hard to monitor the progress, because the restore consumer does not commit any offsets to Kafka.

-Matthias

On 2/1/17 12:08 PM, Ara Ebrahimi wrote:
> Hi,
>
> My kafka-streams application crashed due to a rebalance event (seems like I
> need to increase max.poll.interval.ms even more!) and then when I restarted
> the app I noticed existing rocksdb files were gone and while the rest of the
> pipeline was processing the part dealing with ktable was sitting there doing
> nothing for minutes, until it crashed again complaining about this:
>
> 2017-02-01 11:48:35 ERROR StreamPipeline:160 - An exception has occurred. Shutting down the pipeline...
> org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-10] Failed to rebalance
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:410)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
> Caused by: org.apache.kafka.streams.errors.ProcessorStateException: task [2_2] Error while creating the state manager
>     at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:72)
>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:89)
>     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633)
>     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660)
>     at org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69)
>     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
>     ... 1 more
> Caused by: java.io.IOException: task [2_2] Failed to lock the state directory: /kafka/1/kafka-streams/streams-app/2_2
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:101)
>     at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:69)
>     ... 13 more
>
> It seems it was trying to recreate the rocksdb files from scratch, right?
> How can I fix this situation and make it recover?
>
> Which topic is used for recreating rocksdb files? The changelog or
> repartition? How can I know how much progress has been made recreating these
> files? Which offsets are meaningful?
>
> Thanks,
> Ara.
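
The manual deletion suggested in the reply above could be sketched roughly as follows, using only the JDK. This is an illustrative sketch, not part of Kafka Streams: the class name StateDirCleaner and the method deleteStateDir are hypothetical, and the directory to pass in is assumed to be the application's state directory (in the stack trace above that would be the parent of the 2_2 task directory). It should only be run while the application is stopped.

```java
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;

// Hypothetical helper (not a Kafka Streams class): wipes a local state
// directory, including any stale lock files, so that the stores are rebuilt
// on the next start.
public class StateDirCleaner {

    /**
     * Recursively deletes stateDir and everything below it.
     * Returns the number of files and directories removed (0 if it did not exist).
     */
    public static int deleteStateDir(Path stateDir) throws IOException {
        if (!Files.exists(stateDir)) {
            return 0; // nothing to clean up
        }
        final int[] deleted = {0};
        Files.walkFileTree(stateDir, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
                    throws IOException {
                Files.delete(file); // store files, checkpoint files, lock files
                deleted[0]++;
                return FileVisitResult.CONTINUE;
            }

            @Override
            public FileVisitResult postVisitDirectory(Path dir, IOException exc)
                    throws IOException {
                if (exc != null) {
                    throw exc; // surface traversal errors instead of hiding them
                }
                Files.delete(dir); // directory is empty once its children are gone
                deleted[0]++;
                return FileVisitResult.CONTINUE;
            }
        });
        return deleted[0];
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            // Require an explicit path so nothing is deleted by accident.
            System.err.println("usage: StateDirCleaner <state-directory>");
            return;
        }
        int removed = deleteStateDir(Paths.get(args[0]));
        System.out.println("Removed " + removed + " entries");
    }
}
```

Calling KafkaStreams#cleanUp() before KafkaStreams#start() is the built-in way to get the same effect from inside the application; the sketch is only meant for the case where the directory is cleaned by hand.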