Roger Hoover created KAFKA-3752:
-----------------------------------
Summary: Provide a way for KStreams to recover from unclean shutdown
Key: KAFKA-3752
URL: https://issues.apache.org/jira/browse/KAFKA-3752
Project: Kafka
Issue Type: Improvement
Components: streams
Affects Versions: 0.10.0.0
Reporter: Roger Hoover
Assignee: Guozhang Wang
If a KStream application is killed with SIGKILL (e.g. by the Linux OOM killer), it may leave behind lock files and then fail to recover.

It would be useful to have an option (say, --force) that tells KStreams to proceed even if it finds old LOCK files.
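Whether such a flag is needed depends on how the directory lock is held. The sketch below is a minimal illustration, not Kafka's code. It assumes that each task directory is guarded by an exclusive java.nio `FileChannel` lock on a file named `.lock`; the report does not describe the actual implementation, and `StaleLockCheck`, `tryLockTaskDir` and `unlockTaskDir` are hypothetical names. An OS-level lock of this kind is released when the owning process exits, even under SIGKILL, so a leftover lock file on its own should not block a restart. A lock that is still held by a live process, or by another thread in the same JVM, does block it.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.HashMap;
import java.util.Map;

public class StaleLockCheck {

    // Hypothetical lock-file name inside each task directory (an assumption).
    static final String LOCK_FILE_NAME = ".lock";

    // Locks held by this JVM, keyed by task directory. Checking this map first means
    // we never open a second channel on a lock file we already hold; per the FileLock
    // javadoc, closing such a channel can drop the OS-level lock on some platforms.
    private static final Map<Path, FileLock> HELD = new HashMap<>();

    /** Returns the acquired lock, or null if another process or thread holds it. */
    static synchronized FileLock tryLockTaskDir(Path taskDir) throws IOException {
        Path dir = taskDir.toAbsolutePath().normalize();
        if (HELD.containsKey(dir)) {
            return null; // already held by a thread in this JVM
        }
        Files.createDirectories(dir);
        FileChannel channel = FileChannel.open(dir.resolve(LOCK_FILE_NAME),
                StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        FileLock lock;
        try {
            lock = channel.tryLock(); // null if another process holds the lock
        } catch (OverlappingFileLockException e) {
            channel.close(); // held via some other channel in this JVM
            return null;
        } catch (IOException e) {
            channel.close();
            throw e;
        }
        if (lock == null) {
            channel.close();
            return null;
        }
        HELD.put(dir, lock);
        return lock;
    }

    /** Releases a lock previously acquired by tryLockTaskDir, if any. */
    static synchronized void unlockTaskDir(Path taskDir) throws IOException {
        FileLock lock = HELD.remove(taskDir.toAbsolutePath().normalize());
        if (lock != null) {
            lock.channel().close(); // closing the channel releases the lock
        }
    }

    public static void main(String[] args) throws IOException {
        Path taskDir = Files.createTempDirectory("kafka-streams-demo").resolve("0_0");
        Files.createDirectories(taskDir);
        // Simulate a lock file left behind by a process killed with SIGKILL.
        Files.createFile(taskDir.resolve(LOCK_FILE_NAME));

        System.out.println("stale file, first attempt: " + (tryLockTaskDir(taskDir) != null));
        System.out.println("while held, second attempt: " + (tryLockTaskDir(taskDir) != null));
        unlockTaskDir(taskDir);
        System.out.println("after release, third attempt: " + (tryLockTaskDir(taskDir) != null));
        unlockTaskDir(taskDir);
    }
}
```

Running `main` prints `true`, `false`, `true`. The stale file does not block the first attempt. The second attempt fails only because the first lock is still held, and releasing that lock makes the directory available again. Under this assumption, a `--force` that deletes the lock file would mainly matter when the lock is genuinely still held, which is exactly the case where forcing is unsafe.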
{noformat}
[2016-05-24 17:37:52,886] ERROR Failed to create an active task #0_0 in thread [StreamThread-1]: (org.apache.kafka.streams.processor.internals.StreamThread:583)
org.apache.kafka.streams.errors.ProcessorStateException: Error while creating the state manager
	at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:71)
	at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:86)
	at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
	at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
	at org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
	at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
	at org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
	at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
Caused by: java.io.IOException: Failed to lock the state directory: /data/test/2/kafka-streams/test-2/0_0
	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:95)
	at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:69)
	... 32 more
{noformat}
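The failing path in the log appears to have the form `<state dir>/<application id>/<task id>`, here `/data/test/2/kafka-streams/test-2` plus task `0_0`. Before reaching for a force option, an operator could first check which task directories are actually locked. The diagnostic below is a small sketch that assumes each task directory may contain a `.lock` file guarded by a java.nio `FileChannel` lock. That is a hypothesis; the log does not show the lock mechanism, and `TaskDirLockReport` is a hypothetical tool. For each task directory, the tool briefly tries the lock and releases it immediately. It is meant to run in its own JVM, preferably while the Streams application is stopped, so the probe cannot race the application's own lock attempts.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Map;
import java.util.TreeMap;

public class TaskDirLockReport {

    // Assumed lock-file name inside each task directory (hypothetical).
    static final String LOCK_FILE_NAME = ".lock";

    /** Returns true if nothing currently holds the task directory's lock. */
    static boolean isLockable(Path taskDir) throws IOException {
        Path lockFile = taskDir.resolve(LOCK_FILE_NAME);
        if (!Files.exists(lockFile)) {
            return true; // no lock file, nothing to contend on
        }
        try (FileChannel channel = FileChannel.open(lockFile, StandardOpenOption.WRITE)) {
            FileLock lock = channel.tryLock();
            if (lock == null) {
                return false; // held by another process
            }
            lock.release(); // probe only: hand the lock back immediately
            return true;
        } catch (OverlappingFileLockException e) {
            return false; // held by another channel in this same JVM
        }
    }

    /** Maps each task directory name under appDir to whether it is currently lockable. */
    static Map<String, Boolean> report(Path appDir) throws IOException {
        Map<String, Boolean> result = new TreeMap<>();
        try (DirectoryStream<Path> dirs = Files.newDirectoryStream(appDir, p -> Files.isDirectory(p))) {
            for (Path taskDir : dirs) {
                result.put(taskDir.getFileName().toString(), isLockable(taskDir));
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        // Usage: java TaskDirLockReport <state dir>/<application id>
        // e.g. /data/test/2/kafka-streams/test-2 for the directory in the log above
        Path appDir = Paths.get(args.length > 0 ? args[0] : ".");
        report(appDir).forEach((task, ok) ->
                System.out.println(task + (ok ? ": lockable" : ": LOCKED")));
    }
}
```

If every task directory reports as lockable while the application is down, the failure is more likely lock contention inside a running instance than files left behind by the killed process.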