[ https://issues.apache.org/jira/browse/KAFKA-3752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15313385#comment-15313385 ]
Guozhang Wang commented on KAFKA-3752:
--------------------------------------

[~theduderog] I looked through the file locking issue. After the application crashes, all of its resources, including the file locks, should be released by the OS automatically. However, there is a race condition during rebalance: a thread that re-joins the group much earlier than the others may try to grab the locks of its migrated tasks while the other threads have not yet released those tasks, and hence still hold the locks (see the illustrative sketch at the end of this message). To make sure the issue you encountered is the same as the one I described in KAFKA-3758, I have a couple of questions for you:

1. In your case, how many KafkaStreams instances are you running, and how many threads per instance?
2. When you have time, could you re-run your application and upload the full log to this ticket the next time you see the issue? I tried a simple dummy application with SIGKILL but could not reproduce the problem that way.


> Provide a way for KStreams to recover from unclean shutdown
> -----------------------------------------------------------
>
>                 Key: KAFKA-3752
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3752
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 0.10.0.0
>            Reporter: Roger Hoover
>            Assignee: Guozhang Wang
>              Labels: architecture
>
> If a KStream application gets killed with SIGKILL (e.g. by the Linux OOM Killer), it may leave behind lock files and fail to recover.
> It would be useful to have an option (say --force) to tell KStreams to proceed even if it finds old LOCK files.
> {noformat}
> [2016-05-24 17:37:52,886] ERROR Failed to create an active task #0_0 in thread [StreamThread-1]: (org.apache.kafka.streams.processor.internals.StreamThread:583)
> org.apache.kafka.streams.errors.ProcessorStateException: Error while creating the state manager
> 	at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:71)
> 	at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:86)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
> 	at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
> 	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
> 	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
> 	at org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
> 	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
> 	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
> 	at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
> 	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
> 	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
> 	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
> Caused by: java.io.IOException: Failed to lock the state directory: /data/test/2/kafka-streams/test-2/0_0
> 	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:95)
> 	at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:69)
> 	... 32 more
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
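For illustration, here is a minimal, self-contained sketch of the lock behavior described above. This is not the actual Kafka Streams code; the class name and lock path are hypothetical. It shows that an OS-level file lock taken with FileChannel.tryLock() is released automatically when the owning process dies, but while the process is alive a second attempt from another thread (or channel) of the same JVM fails, which is the race that can be hit when a re-joining thread grabs a migrated task before its previous owner has released it.

{code:java}
import java.io.File;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.StandardOpenOption;

public class StateDirLockSketch {

    // Hypothetical stand-in for a task's state directory lock file.
    static final File LOCK_FILE = new File("/tmp/kstreams-lock-sketch/0_0/.lock");

    // Try to take an exclusive lock on the lock file.
    static FileLock tryLockStateDir() throws IOException {
        LOCK_FILE.getParentFile().mkdirs();
        FileChannel channel = FileChannel.open(LOCK_FILE.toPath(),
                StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        try {
            // Returns null if another *process* holds the lock; throws if any
            // thread of *this* JVM already holds it via another channel.
            return channel.tryLock();
        } catch (OverlappingFileLockException e) {
            channel.close();
            return null;
        }
    }

    public static void main(String[] args) throws Exception {
        FileLock first = tryLockStateDir();
        System.out.println("first attempt:  " + (first != null));   // true

        // Same JVM, lock still held elsewhere -- analogous to a re-joining
        // thread grabbing a migrated task before the old owner released it.
        FileLock second = tryLockStateDir();
        System.out.println("second attempt: " + (second != null));  // false

        first.release();  // old owner releases (or the whole process dies)
        FileLock third = tryLockStateDir();
        System.out.println("third attempt:  " + (third != null));   // true
        third.release();
    }
}
{code}

Under these assumptions, the second attempt is the situation that surfaces as "Failed to lock the state directory" in the stack trace above, and it resolves once the previous owner releases the lock, so retrying the lock acquisition (rather than failing the task immediately) is one way to tolerate the race.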