When you were shutting down the restarted node, did you see any warn/error entries in the leader's logs?
Guozhang

On Fri, Apr 18, 2014 at 1:58 PM, Alex Demidko <alexan...@metamarkets.com> wrote:

> Last time I saw this exception when I tried to rebalance leadership with
> kafka-preferred-replica-election.sh. This is what got into the logs:
>
> LeaderNode: just kafka.common.KafkaException: This operation cannot be
> completed on a complete request, without any other exceptions.
>
> RestartedNode:
>
> 2014-04-18 20:43:39,281 INFO [ReplicaFetcherThread-2-1] kafka.log.Log -
> Rolled new log segment for 'loadtest-170' in 1 ms.
> java.lang.OutOfMemoryError: Java heap space
> Dumping heap to java_pid28305.hprof ...
> Heap dump file created [13126900997 bytes in 11.728 secs]
> 2014-04-18 20:44:02,299 INFO [main-SendThread]
> org.apache.zookeeper.ClientCnxn - Client session timed out, have not heard
> from server in 28101ms for sessionid 0x7455b0b68302016, closing socket
> connection and attempting reconnect
> 2014-04-18 20:44:02,328 ERROR [ReplicaFetcherThread-2-1]
> kafka.network.BoundedByteBufferReceive - OOME with size 2022780218
> java.lang.OutOfMemoryError: Java heap space
>     at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>     at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
>     at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
>     at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
>     at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>     at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>     at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
>     at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:73)
>     at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>     at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
>     at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
>     at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> (...)
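
A side note on the OOME above, since the question comes up again below: in the configuration quoted at the end of this thread, replica.fetch.max.bytes = 104857600 (100 MB), and that limit applies per partition, so the single fetch response for a request naming a few dozen partitions can require one contiguous multi-gigabyte allocation in kafka.network.BoundedByteBufferReceive. A rough back-of-the-envelope sketch in Scala; the per-request partition count is only an assumption read off the FetchRequest logged further down, not a measured value:

// Rough estimate of the largest single buffer a replica fetcher may try to
// allocate with the settings quoted in this thread. Illustration only.
object FetchMemoryEstimate extends App {
  val replicaFetchMaxBytes = 104857600L  // replica.fetch.max.bytes, applied per partition
  val partitionsPerFetchRequest = 36     // roughly the number of partitions in the FetchRequest quoted below
  // BoundedByteBufferReceive allocates one buffer sized to the whole response,
  // so the worst case for a single fetch response is:
  val worstCaseBytes = replicaFetchMaxBytes * partitionsPerFetchRequest
  println(s"worst case for one fetch response: $worstCaseBytes bytes" +
    s" (~${worstCaseBytes / (1L << 30)} GiB)")
  // prints 3774873600 bytes (~3 GiB), comfortably more than the ~2 GB
  // allocation that actually failed in the "OOME with size 2022780218" above.
}
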
> 2014-04-18 20:44:41,212 ERROR [ZkClient-EventThread]
> org.I0Itec.zkclient.ZkEventThread - Error handling event ZkEvent[New
> session event sent to
> kafka.controller.KafkaController$SessionExpirationListener@7ea9f7b8]
> java.lang.IllegalStateException: Kafka scheduler has not been started
>     at kafka.utils.KafkaScheduler.ensureStarted(KafkaScheduler.scala:116)
>     at kafka.utils.KafkaScheduler.shutdown(KafkaScheduler.scala:86)
>     at kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply$mcV$sp(KafkaController.scala:349)
>     at kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply(KafkaController.scala:339)
>     at kafka.controller.KafkaController$$anonfun$onControllerResignation$1.apply(KafkaController.scala:339)
>     at kafka.utils.Utils$.inLock(Utils.scala:538)
>     at kafka.controller.KafkaController.onControllerResignation(KafkaController.scala:339)
>     at kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply$mcZ$sp(KafkaController.scala:1068)
>     at kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1067)
>     at kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1067)
>     at kafka.utils.Utils$.inLock(Utils.scala:538)
>     at kafka.controller.KafkaController$SessionExpirationListener.handleNewSession(KafkaController.scala:1067)
>     at org.I0Itec.zkclient.ZkClient$4.run(ZkClient.java:472)
>     at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> 2014-04-18 20:44:44,371 INFO [Thread-1-EventThread]
> org.apache.zookeeper.ClientCnxn - EventThread shut down
> 2014-04-18 20:44:55,526 INFO [ZkClient-EventThread]
> kafka.server.KafkaHealthcheck - re-registering broker info in ZK for broker 2
> 2014-04-18 20:44:58,692 INFO [ZkClient-EventThread] kafka.utils.ZkUtils$ -
> Registered broker 2 at path /brokers/ids/2 with address
> 2014-04-18 20:45:01,020 INFO [ZkClient-EventThread-15]
> kafka.server.KafkaHealthcheck - done re-registering broker
> 2014-04-18 20:45:07,996 INFO [ZkClient-EventThread-15]
> kafka.server.KafkaHealthcheck - Subscribing to /brokers/topics path to
> watch for new topics
> 2014-04-18 20:45:07,998 WARN [kafka-request-handler-20]
> state.change.logger - Broker 2 received invalid LeaderAndIsr request with
> correlation id 11 from controller 1 epoch 13 with an older leader epoch 22
> for partition [loadtest,66], current leader epoch is 22
> (...)
> java.io.EOFException: Received -1 when reading from channel, socket has
> likely been closed.
>     at kafka.utils.Utils$.read(Utils.scala:376)
>     at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>     at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>     at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>     at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
>     at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
>     at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>     at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
>     at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
>     at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>
> Even if I restart RestartedNode it doesn't help; only a restart of
> LeaderNode helps, which I'd really like to avoid because the leader is the
> only in-sync replica (and also there might be multiple leaders for
> different partitions).
>
> Another question is why there is an OOME on RestartedNode.
>
> On Fri, Apr 18, 2014 at 1:30 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Alex,
> >
> > I think this is a bug in the FetchResponseSend class. Just to confirm,
> > before the
> >
> > kafka.common.KafkaException: This operation cannot be completed on a
> > complete request.
> >
> > do you see other warn/error logs on the current leader?
> >
> > Guozhang
> >
> > On Fri, Apr 18, 2014 at 11:57 AM, Alexander Demidko <alexan...@metamarkets.com> wrote:
> >
> > > Have tried to reproduce this error, and it occurs pretty consistently
> > > when the node is forcefully shut down without graceful termination.
> > > When graceful shutdown was successful, no errors occur in the log when
> > > the rebooted instance starts.
> > >
> > > On Fri, Apr 18, 2014 at 11:17 AM, Alex Demidko <alexan...@metamarkets.com> wrote:
> > >
> > > > These are on the alive node:
> > > >
> > > > 2014-04-17 21:36:29,276 ERROR [ZkClient-EventThread-15]
> > > > state.change.logger - Controller 2 epoch 8 encountered error while
> > > > electing leader for partition [loadtest,143] due to: Preferred replica 1
> > > > for partition [loadtest,143] is either not alive or not in the isr.
> > > > Current leader and ISR: [{"leader":2,"leader_epoch":11,"isr":[2]}].
> > > > 2014-04-17 21:36:29,276 ERROR [ZkClient-EventThread-15]
> > > > state.change.logger - Controller 2 epoch 8 initiated state change for
> > > > partition [loadtest,143] from OnlinePartition to OnlinePartition failed
> > > >
> > > > 2014-04-18 00:38:50,014 ERROR [Controller-2-to-broker-1-send-thread]
> > > > kafka.controller.RequestSendThread -
> > > > [Controller-2-to-broker-1-send-thread], Controller 2's connection to
> > > > broker id:1,host:<RESTARTED_BROKER_IP>,port:9092 was unsuccessful
> > > > 2014-04-18 00:38:50,314 ERROR [Controller-2-to-broker-1-send-thread]
> > > > kafka.controller.RequestSendThread -
> > > > [Controller-2-to-broker-1-send-thread], Controller 2 epoch 8 failed to
> > > > send UpdateMetadata request with correlation id 3854 to broker
> > > > id:1,host:<RESTARTED_BROKER_IP>,port:9092. Reconnecting to broker.
> > > >
> > > > On Apr 18, 2014, at 10:41 AM, Jun Rao <jun...@gmail.com> wrote:
> > > >
> > > > > Any errors from the controller/state-change log?
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > > On Fri, Apr 18, 2014 at 9:57 AM, Alex Demidko <alexan...@metamarkets.com> wrote:
> > > > >
> > > > >> Hi,
> > > > >>
> > > > >> I'm performing a producing load test on a two node kafka cluster built
> > > > >> from the last 0.8.1 branch sources. I have topic loadtest with
> > > > >> replication factor 2 and 256 partitions. Initially both brokers are in
> > > > >> ISR and leadership is balanced. When in the middle of the load test one
> > > > >> broker was restarted (it wasn't able to complete a controlled shutdown
> > > > >> in the specified time and was killed), I started receiving the
> > > > >> following errors, which as far as I understand are coming from
> > > > >> replication:
> > > > >>
> > > > >> On restarted broker
> > > > >>
> > > > >> 2014-04-18 16:15:02,214 ERROR [ReplicaFetcherThread-5-2]
> > > > >> kafka.server.ReplicaFetcherThread - [ReplicaFetcherThread-5-2], Error in
> > > > >> fetch Name: FetchRequest; Version: 0; CorrelationId: 52890; ClientId:
> > > > >> ReplicaFetcherThread-5-2; ReplicaId: 1; MaxWait: 1000 ms; MinBytes: 1
> > > > >> bytes; RequestInfo:
> > > > >> [loadtest2,71] -> PartitionFetchInfo(0,104857600),
> > > > >> [loadtest,85] -> PartitionFetchInfo(113676000,104857600),
> > > > >> [loadtest,189] -> PartitionFetchInfo(112277000,104857600),
> > > > >> [loadtest,21] -> PartitionFetchInfo(0,104857600),
> > > > >> [loadtest,205] -> PartitionFetchInfo(112986000,104857600),
> > > > >> [loadtest,141] -> PartitionFetchInfo(0,104857600),
> > > > >> [loadtest,253] -> PartitionFetchInfo(0,104857600),
> > > > >> [loadtest,77] -> PartitionFetchInfo(0,104857600),
> > > > >> [loadtest,61] -> PartitionFetchInfo(112490000,104857600),
> > > > >> [loadtest,229] -> PartitionFetchInfo(112805000,104857600),
> > > > >> [loadtest,133] -> PartitionFetchInfo(0,104857600),
> > > > >> [loadtest2,15] -> PartitionFetchInfo(0,104857600),
> > > > >> [loadtest2,63] -> PartitionFetchInfo(0,104857600),
> > > > >> [loadtest,181] -> PartitionFetchInfo(0,104857600),
> > > > >> [loadtest,5] -> PartitionFetchInfo(112530000,104857600),
> > > > >> [loadtest,29] -> PartitionFetchInfo(0,104857600),
> > > > >> [loadtest,45] -> PartitionFetchInfo(113113000,104857600),
> > > > >> [loadtest2,39] -> PartitionFetchInfo(0,104857600),
> > > > >> [loadtest,37] -> PartitionFetchInfo(112145000,104857600),
> > > > >> [loadtest,13] -> PartitionFetchInfo(112915000,104857600),
> > > > >> [loadtest,237] -> PartitionFetchInfo(112896000,104857600),
> > > > >> [loadtest,149] -> PartitionFetchInfo(113232000,104857600),
> > > > >> [loadtest,117] -> PartitionFetchInfo(113100000,104857600),
> > > > >> [loadtest,157] -> PartitionFetchInfo(0,104857600),
> > > > >> [loadtest,165] -> PartitionFetchInfo(0,104857600),
> > > > >> [loadtest,101] -> PartitionFetchInfo(0,104857600),
> > > > >> [loadtest,93] -> PartitionFetchInfo(113025000,104857600),
> > > > >> [loadtest,125] -> PartitionFetchInfo(112896000,104857600),
> > > > >> [loadtest,197] -> PartitionFetchInfo(0,104857600),
> > > > >> [loadtest,109] -> PartitionFetchInfo(0,104857600),
> > > > >> [loadtest,245] -> PartitionFetchInfo(0,104857600),
> > > > >> [loadtest,213] -> PartitionFetchInfo(0,104857600),
> > > > >> [loadtest,53] -> PartitionFetchInfo(0,104857600),
> > > > >> [loadtest,173] -> PartitionFetchInfo(112757000,104857600),
> > > > >> [loadtest,69] -> PartitionFetchInfo(112378000,104857600),
> > > > >> [loadtest,221] -> PartitionFetchInfo(0,104857600)
> > > > >> java.io.EOFException: Received -1 when reading from channel, socket has
> > > > >> likely been closed.
> > > > >>     at kafka.utils.Utils$.read(Utils.scala:376)
> > > > >>     at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> > > > >>     at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> > > > >>     at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> > > > >>     at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> > > > >>     at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
> > > > >>     at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> > > > >>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
> > > > >>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> > > > >>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> > > > >>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > > >>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
> > > > >>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
> > > > >>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
> > > > >>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> > > > >>     at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
> > > > >>     at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
> > > > >>     at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> > > > >>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > > > >> 2014-04-18 16:15:02,215 WARN [ReplicaFetcherThread-1-2]
> > > > >> kafka.consumer.SimpleConsumer - Reconnect due to socket error: null
> > > > >>
> > > > >> On current leader
> > > > >>
> > > > >> 2014-04-18 16:15:10,235 ERROR [kafka-processor-9092-1]
> > > > >> kafka.network.Processor - Closing socket for /10.41.133.59 because of
> > > > >> error
> > > > >> kafka.common.KafkaException: This operation cannot be completed on a
> > > > >> complete request.
> > > > >>     at kafka.network.Transmission$class.expectIncomplete(Transmission.scala:34)
> > > > >>     at kafka.api.FetchResponseSend.expectIncomplete(FetchResponse.scala:191)
> > > > >>     at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:214)
> > > > >>     at kafka.network.Processor.write(SocketServer.scala:375)
> > > > >>     at kafka.network.Processor.run(SocketServer.scala:247)
> > > > >>     at java.lang.Thread.run(Thread.java:744)
> > > > >>
> > > > >> These errors are constantly bubbling up in logs and restarted broker
> > > > >> never made it back to ISR even when the load test was stopped.
> > > > >>
> > > > >> Kafka configuration:
> > > > >>
> > > > >> host.name = #{IP_ADDR}
> > > > >> port = 9092
> > > > >> socket.send.buffer.bytes = 1048576
> > > > >> socket.receive.buffer.bytes = 1048576
> > > > >> socket.request.max.bytes = 104857600
> > > > >> num.io.threads = 24
> > > > >> queued.max.requests = 1024
> > > > >> fetch.purgatory.purge.interval.requests = 1024
> > > > >> producer.purgatory.purge.interval.requests = 1024
> > > > >>
> > > > >> broker.id = #{BROKER_ID}
> > > > >>
> > > > >> log.flush.interval.messages = 100000
> > > > >> log.flush.scheduler.interval.ms = 1000
> > > > >> log.flush.interval.ms = 2000
> > > > >>
> > > > >> log.dirs = \
> > > > >>   /mnt1/tmp/kafka-logs,/mnt2/tmp/kafka-logs,/mnt3/tmp/kafka-logs,/mnt4/tmp/kafka-logs,\
> > > > >>   /mnt5/tmp/kafka-logs,/mnt6/tmp/kafka-logs,/mnt7/tmp/kafka-logs,/mnt8/tmp/kafka-logs,\
> > > > >>   /mnt9/tmp/kafka-logs,/mnt10/tmp/kafka-logs,/mnt11/tmp/kafka-logs,/mnt12/tmp/kafka-logs,\
> > > > >>   /mnt13/tmp/kafka-logs,/mnt14/tmp/kafka-logs,/mnt15/tmp/kafka-logs,/mnt16/tmp/kafka-logs,\
> > > > >>   /mnt17/tmp/kafka-logs,/mnt18/tmp/kafka-logs,/mnt19/tmp/kafka-logs,/mnt20/tmp/kafka-logs,\
> > > > >>   /mnt21/tmp/kafka-logs,/mnt22/tmp/kafka-logs,/mnt23/tmp/kafka-logs,/mnt24/tmp/kafka-logs
> > > > >>
> > > > >> log.segment.bytes = 1000000000
> > > > >> log.roll.hours = 1
> > > > >>
> > > > >> log.retention.minutes = 10080
> > > > >> log.retention.check.interval.ms = 300000
> > > > >> log.cleanup.policy = delete
> > > > >>
> > > > >> log.index.size.max.bytes = 10485760
> > > > >>
> > > > >> num.partitions = 256
> > > > >> auto.create.topics.enable = false
> > > > >>
> > > > >> default.replication.factor = 2
> > > > >> replica.lag.time.max.ms = 15000
> > > > >> replica.lag.max.messages = 750000
> > > > >> num.replica.fetchers = 8
> > > > >> replica.socket.timeout.ms = 30000
> > > > >> replica.socket.receive.buffer.bytes = 1048576
> > > > >> replica.fetch.max.bytes = 104857600
> > > > >> replica.fetch.wait.max.ms = 1000
> > > > >> replica.fetch.min.bytes = 1
> > > > >> replica.high.watermark.checkpoint.interval.ms = 5000
> > > > >>
> > > > >> controlled.shutdown.enable = true
> > > > >> controlled.shutdown.max.retries = 1
> > > > >> controlled.shutdown.retry.backoff.ms = 300000
> > > > >>
> > > > >> auto.leader.rebalance.enable = true
> > > > >> leader.imbalance.per.broker.percentage = 10
> > > > >> leader.imbalance.check.interval.seconds = 300
> > > > >>
> > > > >> zookeeper.connect = #{ZK_PATH}
> > > > >> zookeeper.session.timeout.ms = 30000
> > > > >> zookeeper.connection.timeout.ms = 30000
> > > > >>
> > > > >> Why might this happen?
> > > > >>
> > > > >> Thanks,
> > > > >> Alex
> > > > >
> > > >
> > >
> >
> > --
> > -- Guozhang
>


--
-- Guozhang
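
A closing note on the leader-side error quoted throughout this thread ("This operation cannot be completed on a complete request"): the stack trace points at the expectIncomplete guard that kafka.api.FetchResponseSend.writeTo runs before every write. The snippet below is a paraphrased, self-contained Scala sketch of what that guard does, not a verbatim copy of the 0.8.1 sources (the real code lives in kafka.network.Transmission and throws kafka.common.KafkaException); it only illustrates that the error means the socket server kept trying to write a send that already reported itself complete, which is consistent with Guozhang's suspicion of a bug in FetchResponseSend's completion accounting.

// Paraphrased sketch of the guard behind the leader-side error above.
trait SendSketch {
  def complete: Boolean  // has the whole response been written out?

  // The real guard (kafka.network.Transmission.expectIncomplete) throws
  // kafka.common.KafkaException with exactly this message.
  protected def expectIncomplete(): Unit =
    if (complete)
      throw new IllegalStateException(
        "This operation cannot be completed on a complete request.")

  // FetchResponseSend.writeTo starts with expectIncomplete(), so if the send's
  // completion bookkeeping says "complete" while the Processor still has bytes
  // queued for the socket, every further write fails and the Processor closes
  // the connection, matching the "Closing socket ... because of error" lines.
  def writeTo(): Int = {
    expectIncomplete()
    0 // bytes written; actual channel I/O elided in this sketch
  }
}
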