[ 
https://issues.apache.org/jira/browse/KAFKA-3752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15313385#comment-15313385
 ] 

Guozhang Wang commented on KAFKA-3752:
--------------------------------------

[~theduderog] I looked through the file locking issue. After the application 
was crashed, all resources including the file locks should be released by the 
OS automatically. But there is a race condition during rebalance which could 
cause one thread re-joining the group much earlier than others to trying to 
grab the lock of its migrated tasks while the other threads have not released 
the tasks, hence the lock. But to make sure your encountered issues is the same 
as I described (KAFKA-3758), I have a couple of questions for you:

1. In your case, how many KafkaStreams instances are you running, and how many 
threads per each instance?
2. When you have time, could you re-run your application and upload the full 
log to this ticket upon seeing this issue again? I tried a simple dummy 
application with SIGKILL but that cannot re-produce this issue.

> Provide a way for KStreams to recover from unclean shutdown
> -----------------------------------------------------------
>
>                 Key: KAFKA-3752
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3752
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 0.10.0.0
>            Reporter: Roger Hoover
>            Assignee: Guozhang Wang
>              Labels: architecture
>
> If a KStream application gets killed with SIGKILL (e.g. by the Linux OOM 
> Killer), it may leave behind lock files and fail to recover.
> It would be useful to have an options (say --force) to tell KStreams to 
> proceed even if it finds old LOCK files.
> {noformat}
> [2016-05-24 17:37:52,886] ERROR Failed to create an active task #0_0 in 
> thread [StreamThread-1]:  
> (org.apache.kafka.streams.processor.internals.StreamThread:583)
> org.apache.kafka.streams.errors.ProcessorStateException: Error while creating 
> the state manager
>       at 
> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:71)
>       at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:86)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
>       at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
>       at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
>       at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>       at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>       at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
>       at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>       at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>       at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436)
>       at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422)
>       at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
>       at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
>       at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
>       at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>       at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
>       at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>       at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
>       at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
>       at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
> Caused by: java.io.IOException: Failed to lock the state directory: 
> /data/test/2/kafka-streams/test-2/0_0
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:95)
>       at 
> org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:69)
>       ... 32 more
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to