Hi all,

We keep running into this issue. After increasing the number of replica fetcher threads, we cleared the entire cluster, upgraded to 0.10.0.1, and started all nodes, and all was well. We cannot reduce the fetch size (replica.fetch.max.bytes), as it is equal to our message.max.bytes. Increasing the number of replica fetcher threads further drove memory usage up too much, causing countless out-of-heap-space / out-of-direct-buffer-memory exceptions. We have now settled on two fetcher threads (num.replica.fetchers=2), which leaves some headroom. Our full config is pasted below [2].
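To give a sense of the magnitudes involved, here is a rough worst-case sketch (the follower-partition count is a made-up placeholder; the only real number is replica.fetch.max.bytes from our config [2]):

// Crude upper bound on the fetch data the replica fetchers can hold in memory
// at once, assuming each in-flight fetch response is buffered in full and every
// partition in a request returns a full replica.fetch.max.bytes chunk.
object ReplicaFetchMemorySketch extends App {
  val replicaFetchMaxBytes = 105906176L  // replica.fetch.max.bytes (= message.max.bytes) from [2]
  val followerPartitions   = 40          // hypothetical placeholder, not our real count

  // Partitions are spread over the fetcher threads, but in the worst case all of
  // them can have a full-sized chunk in flight at the same time.
  val worstCaseBytes = followerPartitions * replicaFetchMaxBytes

  println(f"worst-case buffered fetch data: ${worstCaseBytes.toDouble / (1L << 30)}%.1f GiB")
}

Even a few dozen partitions at roughly 101 MiB each adds up to several GiB, which is consistent with the heap / direct-buffer exhaustion we saw.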
Today, to verify that the issue was resolved, we added a fourth server to the cluster and then reassigned all partitions. Unfortunately, the fourth node will not sync up. This is a snippet from its log file:

...
[2016-09-05 16:13:52,296] WARN [ReplicaFetcherThread-0-2], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@318e6f11 (kafka.server.ReplicaFetcherThread)
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'responses': Error reading field 'partition_responses': Error reading field 'record_set': Error reading bytes of size 104856899, only 19862997 bytes available
    at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:73)
    at org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:380)
    at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:449)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269)
    at kafka.utils.NetworkClientBlockingOps$.recursivePoll$2(NetworkClientBlockingOps.scala:136)
    at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollContinuously$extension(NetworkClientBlockingOps.scala:143)
    at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:80)
    at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:244)
    at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:229)
    at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:107)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-09-05 16:13:58,227] WARN [ReplicaFetcherThread-0-0], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@5dfb502b (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 0 was disconnected before the response was read
    at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:87)
    at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:84)
    at scala.Option.foreach(Option.scala:257)
    at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:84)
    at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:80)
    at kafka.utils.NetworkClientBlockingOps$.recursivePoll$2(NetworkClientBlockingOps.scala:137)
    at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollContinuously$extension(NetworkClientBlockingOps.scala:143)
    at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:80)
    at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:244)
    at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:229)
    at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:107)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-09-05 16:14:03,228] WARN [ReplicaFetcherThread-1-0], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@1831418c (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 0 was disconnected before the response was read
    at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:87)
    at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:84)
    at scala.Option.foreach(Option.scala:257)
    at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:84)
    at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:80)
    at kafka.utils.NetworkClientBlockingOps$.recursivePoll$2(NetworkClientBlockingOps.scala:137)
    at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollContinuously$extension(NetworkClientBlockingOps.scala:143)
    at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:80)
    at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:244)
    at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:229)
    at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:107)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-09-05 16:14:14,350] WARN [ReplicaFetcherThread-1-1], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@560e37d5 (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 1 was disconnected before the response was read
    at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:87)
    at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:84)
    at scala.Option.foreach(Option.scala:257)
    at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:84)
    at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:80)
    at kafka.utils.NetworkClientBlockingOps$.recursivePoll$2(NetworkClientBlockingOps.scala:137)
    at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollContinuously$extension(NetworkClientBlockingOps.scala:143)
    at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:80)
    at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:244)
    at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:229)
    at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:107)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-09-05 16:14:20,803] WARN [ReplicaFetcherThread-0-1], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@4ebf97a4 (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 1 was disconnected before the response was read
    at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:87)
    at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:84)
    at scala.Option.foreach(Option.scala:257)
    at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:84)
    at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:80)
    at kafka.utils.NetworkClientBlockingOps$.recursivePoll$2(NetworkClientBlockingOps.scala:137)
    at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollContinuously$extension(NetworkClientBlockingOps.scala:143)
    at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:80)
    at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:244)
    at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:229)
    at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:107)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-09-05 16:14:27,274] WARN [ReplicaFetcherThread-1-2], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@e16a83c (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 2 was disconnected before the response was read
    at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:87)
    at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:84)
    at scala.Option.foreach(Option.scala:257)
    at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:84)
    at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:80)
    at kafka.utils.NetworkClientBlockingOps$.recursivePoll$2(NetworkClientBlockingOps.scala:137)
    at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollContinuously$extension(NetworkClientBlockingOps.scala:143)
    at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:80)
    at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:244)
    at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:229)
    at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:107)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-09-05 16:14:32,202] WARN [ReplicaFetcherThread-0-2], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@75a17632 (kafka.server.ReplicaFetcherThread)
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'responses': Error reading field 'partition_responses': Error reading field 'record_set': Error reading bytes of size 104856899, only 23442710 bytes available
    at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:73)
    at org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:380)
    at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:449)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269)
    at kafka.utils.NetworkClientBlockingOps$.recursivePoll$2(NetworkClientBlockingOps.scala:136)
    at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollContinuously$extension(NetworkClientBlockingOps.scala:143)
    at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:80)
    at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:244)
    at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:229)
    at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:107)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-09-05 16:14:38,232] WARN [ReplicaFetcherThread-0-0], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@3b531475 (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 0 was disconnected before the response was read
    at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:87)
    at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:84)
    at scala.Option.foreach(Option.scala:257)
    at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:84)
    at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:80)
    at kafka.utils.NetworkClientBlockingOps$.recursivePoll$2(NetworkClientBlockingOps.scala:137)
    at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollContinuously$extension(NetworkClientBlockingOps.scala:143)
    at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:80)
    at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:244)
    at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:229)
    at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:107)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
...

These messages appear continuously, and the fourth node never joins the ISR for topics to which messages are actively being produced. Further searching led me to a JIRA issue [1], but the fix seems to be scheduled for 0.10.1.0 rather than 0.10.0.2? We are happy to build and test Kafka ourselves if this fix is already available somewhere. Any advice is appreciated, as we are a bit stuck.
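For reference, a quick back-of-envelope check of the overflow condition Jun mentions in the reply quoted below (a sketch only; the partition count is a made-up placeholder, not a measured value):

// Jun's reply below: if #partitions * fetched bytes in one response exceeds
// 2GB there is an integer overflow and "weird things can happen".
object FetchOverflowCheck extends App {
  val replicaFetchMaxBytes = 105906176L  // replica.fetch.max.bytes from our config [2]
  val partitionsInOneFetch = 25          // hypothetical placeholder

  val worstCaseResponseBytes = partitionsInOneFetch * replicaFetchMaxBytes

  println(s"worst-case response size: $worstCaseResponseBytes bytes")
  println(s"exceeds Int.MaxValue (${Int.MaxValue}): ${worstCaseResponseBytes > Int.MaxValue}")
}

With our replica.fetch.max.bytes of ~101 MiB, a fetch covering a few dozen partitions is already enough to cross that boundary.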
Thanks,
Wannes

[1] https://issues.apache.org/jira/browse/KAFKA-3916

[2] server.properties:

auto.create.topics.enable=true
auto.leader.rebalance.enable=true
background.threads=4
broker.id=3
controlled.shutdown.enable=true
controlled.shutdown.max.retries=3
controlled.shutdown.retry.backoff.ms=5000
controller.message.queue.size=10
controller.socket.timeout.ms=30000
default.replication.factor=3
fetch.purgatory.purge.interval.requests=10000
leader.imbalance.check.interval.seconds=300
leader.imbalance.per.broker.percentage=10
log.cleaner.backoff.ms=15000
log.cleaner.dedupe.buffer.size=524288000
log.cleaner.delete.retention.ms=86400000
log.cleaner.enable=false
log.cleaner.io.buffer.load.factor=0.9
log.cleaner.io.buffer.size=524288
log.cleaner.min.cleanable.ratio=0.5
log.cleaner.threads=1
log.cleanup.policy=delete
log.delete.delay.ms=60000
log.dirs=/data/kafka
log.flush.offset.checkpoint.interval.ms=60000
log.flush.scheduler.interval.ms=3000
log.index.interval.bytes=4096
log.index.size.max.bytes=10485760
log.retention.bytes=-1
log.retention.check.interval.ms=300000
log.retention.hours=168
log.retention.minutes=10080
log.roll.hours=168
log.segment.bytes=104857600
message.max.bytes=105906176
num.io.threads=8
num.network.threads=3
num.partitions=16
num.replica.fetchers=2
offset.metadata.max.bytes=1024
port=9092
producer.purgatory.purge.interval.requests=10000
queued.max.requests=500
replica.fetch.backoff.ms=5000
replica.fetch.max.bytes=105906176
replica.fetch.min.bytes=1
replica.fetch.wait.max.ms=5000
replica.high.watermark.checkpoint.interval.ms=5000
replica.lag.max.messages=4000
replica.lag.time.max.ms=60000
replica.socket.receive.buffer.bytes=65536
replica.socket.timeout.ms=30000
retention.ms=3600000
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
zookeeper.session.timeout.ms=6000
zookeeper.sync.time.ms=2000

On Wed, Aug 17, 2016 at 10:38 PM, Jun Rao <j...@confluent.io> wrote:

> Jarko,
>
> Do you have many topic partitions? Currently, if #partitions *
> fetched_bytes in the response exceeds 2GB, we will get an integer overflow
> and weird things can happen. We are trying to address this better in
> KIP-74. If this is the issue, for now, you can try reducing the fetch size
> or increasing the replica fetch threads to work around the issue.
>
> Thanks,
>
> Jun
>
> On Wed, Aug 17, 2016 at 3:04 AM, J Mes <jarko...@gmail.com> wrote:
>
> > Hello,
> >
> > I have a cluster of 3 nodes running kafka v.0.10.0.0. This cluster was
> > started about a week ago with no data, no issues starting up.
> > Today we noticed 1 of the servers in the cluster did not work anymore,
> > we checked and indeed the server was not working anymore and all data
> > was old.
> >
> > We restarted the node without data, thinking it should sync up and then
> > join the cluster again, but we keep getting the following error:
> >
> > [2016-08-17 12:02:23,620] WARN [ReplicaFetcherThread-0-1], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@62b3e70c (kafka.server.ReplicaFetcherThread)
> > org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'responses': Error reading field 'partition_responses': Error reading field 'record_set': Error reading bytes of size 104856430, only 18764961 bytes available
> >     at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:73)
> >     at org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:380)
> >     at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:449)
> >     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269)
> >     at kafka.utils.NetworkClientBlockingOps$.recursivePoll$2(NetworkClientBlockingOps.scala:136)
> >     at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollContinuously$extension(NetworkClientBlockingOps.scala:143)
> >     at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:80)
> >     at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:244)
> >     at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:229)
> >     at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
> >     at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:107)
> >     at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
> >     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> >
> > All nodes are running the exact same version of zookeeper/kafka.
> >
> > When we clear all data from all nodes and start again, everything works...
> >
> > Any idea anyone?
> >
> > Kr,
> > Jarko Mesuere
>