That definitely sounds unusual. Rebalancing normally only happens when either a) there are new workers, or b) there are connectivity issues or failures. Is it possible something is causing large latencies?
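A rough way to think about the latency question: the error in your log says a commit fails when the time between consecutive poll() calls exceeds session.timeout.ms. Here is a simplified model of only that rule. It is not Kafka's coordinator logic, and it assumes (as the error message implies) that a member's liveness is refreshed only when poll() is called. The class name, method name and timestamps are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified model (an assumption, not Kafka's coordinator code): a group
 * member is treated as dead, and the group rebalances, whenever the gap
 * between two consecutive poll() calls exceeds session.timeout.ms.
 */
public class PollGapCheck {

    /** Returns every index i where pollTimesMs[i] - pollTimesMs[i - 1] > sessionTimeoutMs. */
    public static List<Integer> gapsExceedingTimeout(long[] pollTimesMs, long sessionTimeoutMs) {
        List<Integer> violations = new ArrayList<>();
        for (int i = 1; i < pollTimesMs.length; i++) {
            long gap = pollTimesMs[i] - pollTimesMs[i - 1];
            if (gap > sessionTimeoutMs) {
                // Under this model, the member would already have been evicted here.
                violations.add(i);
            }
        }
        return violations;
    }

    public static void main(String[] args) {
        long sessionTimeoutMs = 30_000; // hypothetical value for illustration
        // Hypothetical poll() timestamps: one iteration stalls for 43 s (e.g. a slow sink write).
        long[] pollTimesMs = {0, 1_000, 2_000, 45_000, 46_000};
        List<Integer> stalls = gapsExceedingTimeout(pollTimesMs, sessionTimeoutMs);
        // prints: Polls preceded by a gap > session.timeout.ms: [3]
        System.out.println("Polls preceded by a gap > session.timeout.ms: " + stalls);
    }
}
```

So a single slow iteration of the poll loop is enough to look like a failed member, even if everything else is healthy.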
-Ewen

On Sat, Jul 16, 2016 at 6:09 AM, Kristoffer Sjögren <sto...@gmail.com> wrote:
> Hi
>
> I'm running Kafka Connect in distributed mode with the Confluent HDFS
> sink connector.
>
> But the WorkerSinkTask is constantly interrupted by rebalancing
> requests from the broker (onPartitionsRevoked) [1] and gets stuck in a
> recovery state where the brokers constantly log "Preparing to
> restabilize group ... for generation xx" around every 30 seconds.
>
> I have configured the connector with very high session timeouts and
> low max poll records, but it doesn't help. The topic has 100
> partitions and there are 3 brokers. Kafka Connect runs on two
> single-core machines.
>
> request.timeout.ms=310000
> heartbeat.interval.ms=60000
> session.timeout.ms=300000
> max.poll.records=1
> tasks.max=64
>
> I'm not sure what else to tweak in order to make the problem go away.
>
> Cheers,
> -Kristoffer
>
>
> [1]
>
> [2016-07-16 12:52:52,668] ERROR Commit of WorkerSinkTask{id=hdfs-sink-sting-impression-18} offsets threw an unexpected exception: (org.apache.kafka.connect.runtime.WorkerSinkTask:180)
> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:519)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:201)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:998)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:305)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:222)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
>     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
>     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> [2016-07-16 12:52:52,676] INFO Revoking previously assigned partitions [sting_actions_impression-23, sting_actions_impression-21, sting_actions_impression-22] for group connect-hdfs-sink-sting-impression (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:280)
> [2016-07-16 12:52:52,679] INFO WorkerSinkTask{id=hdfs-sink-sting-impression-18} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:244)
> [2016-07-16 12:52:52,686] ERROR Commit of WorkerSinkTask{id=hdfs-sink-sting-impression-18} offsets threw an unexpected exception: (org.apache.kafka.connect.runtime.WorkerSinkTask:180)
> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:519)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:404)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1058)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommit(WorkerSinkTask.java:247)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:293)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:421)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1100(WorkerSinkTask.java:54)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsRevoked(WorkerSinkTask.java:465)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:283)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:212)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:305)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:222)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
>     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
>     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
>

-- 
Thanks,
Ewen