[ https://issues.apache.org/jira/browse/KAFKA-3752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15298949#comment-15298949 ]
Guozhang Wang commented on KAFKA-3752: -------------------------------------- We are adding locks to the directory only for the case where there are multiple stream threads on the same machine, when one thread is accessing (hence owning) this directory and another is cleaning it. We should revisit this issue and see if there is a better solution than using locks here. > Provide a way for KStreams to recover from unclean shutdown > ----------------------------------------------------------- > > Key: KAFKA-3752 > URL: https://issues.apache.org/jira/browse/KAFKA-3752 > Project: Kafka > Issue Type: Improvement > Components: streams > Affects Versions: 0.10.0.0 > Reporter: Roger Hoover > Assignee: Guozhang Wang > Labels: architecture > > If a KStream application gets killed with SIGKILL (e.g. by the Linux OOM > Killer), it may leave behind lock files and fail to recover. > It would be useful to have an option (say --force) to tell KStreams to > proceed even if it finds old LOCK files. > {noformat} > [2016-05-24 17:37:52,886] ERROR Failed to create an active task #0_0 in > thread [StreamThread-1]: > (org.apache.kafka.streams.processor.internals.StreamThread:583) > org.apache.kafka.streams.errors.ProcessorStateException: Error while creating > the state manager > at > org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:71) > at > org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:86) > at > org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550) > at > org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577) > at > org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68) > at > org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222) > at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360) > at > 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218) > Caused by: java.io.IOException: Failed to lock the state directory: > /data/test/2/kafka-streams/test-2/0_0 > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:95) > at > org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:69) > ... 32 more > {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332)