This looks very similar to the error and stack trace I see when reproducing https://issues.apache.org/jira/browse/KAFKA-1196 -- that's an overflow where the data returned in a FetchResponse exceeds 2GB. (It triggers the error you're seeing because FetchResponse's size overflows to become negative, which breaks the checks for whether the data has finished sending.) I haven't tested against 0.8.1.1, but the trace looks identical modulo line numbers. If it's the same issue, unfortunately it won't fix itself, so that log will just keep growing: the consumer reconnects, requests data, and triggers the error in the broker, which forcibly disconnects the consumer, and then the cycle repeats.
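To make the overflow concrete, here's a minimal sketch in Java. This is not the actual Kafka code: the class and method names are made up for illustration, and the "looks complete" check is only my assumption about the general shape of such a test. It just shows how summing per-partition sizes into a 32-bit int wraps negative once the total passes 2GB, and how a "have we sent everything?" comparison then reports done before any bytes are written.

```java
// Illustration only; names are hypothetical and not taken from Kafka.
class FetchSizeOverflow {

    // Sums per-partition byte counts into a 32-bit int.
    // Java int arithmetic wraps silently on overflow.
    static int totalSizeAsInt(int[] partitionBytes) {
        int total = 0;
        for (int bytes : partitionBytes) {
            total += bytes;
        }
        return total;
    }

    // Same sum in a 64-bit long, which does not overflow here.
    static long totalSizeAsLong(int[] partitionBytes) {
        long total = 0L;
        for (int bytes : partitionBytes) {
            total += bytes;
        }
        return total;
    }

    // Assumed shape of a completion check: done once sent >= size.
    static boolean looksComplete(long sentSoFar, int size) {
        return sentSoFar >= size;
    }

    public static void main(String[] args) {
        // Three partitions, each returning 1 GiB.
        int oneGiB = 1 << 30;
        int[] sizes = {oneGiB, oneGiB, oneGiB};

        int wrapped = totalSizeAsInt(sizes);
        long actual = totalSizeAsLong(sizes);

        System.out.println(wrapped); // prints -1073741824
        System.out.println(actual);  // prints 3221225472

        // Nothing has been sent yet, but the negative size makes the
        // response look complete.
        System.out.println(looksComplete(0L, wrapped)); // prints true
    }
}
```

With a negative size the response looks finished before the first write, which is consistent with the "cannot be completed on a complete request" exception in the trace.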
I'm not certain what to suggest here, since KAFKA-1196 still needs a lot of refinement. But given the 0.8.1.1 code, I don't think there's much choice but to reduce the amount of data that will be returned. One way to do that is to reduce the number of partitions read in the FetchRequest (i.e. make sure FetchRequests address fewer TopicAndPartitions, maybe putting each TopicAndPartition in its own request). An alternative would be to use more recent offsets (i.e. don't start from the oldest data available in Kafka). A recent enough offset should result in a < 2GB response.

-Ewen

On Sat, Oct 18, 2014, at 12:07 AM, xingcan wrote:
> Hi, all
>
> Recently, I upgrade my Kafka cluster to 0.8.1.1 and set replication with
> num.replica.fetchers=5. Last night, there's something wrong with the
> network. Soon, I found the server.log files (not data log!) on every node
> reached 4GB in an hour.
> I am not sure if it's my inappropriate configuration or other reason. Can
> anybody help me with this. Thanks~
>
> log file tail
> ============================================
> [2014-10-16 20:59:59,994] ERROR Closing socket for /192.168.1.66 because of error (kafka.network.Processor)
> kafka.common.KafkaException: This operation cannot be completed on a complete request.
>     at kafka.network.Transmission$class.expectIncomplete(Transmission.scala:34)
>     at kafka.api.FetchResponseSend.expectIncomplete(FetchResponse.scala:191)
>     at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:214)
>     at kafka.network.Processor.write(SocketServer.scala:375)
>     at kafka.network.Processor.run(SocketServer.scala:247)
>     at java.lang.Thread.run(Thread.java:745)
> [2014-10-16 20:59:59,994] ERROR Closing socket for /192.168.1.66 because of error (kafka.network.Processor)
> kafka.common.KafkaException: This operation cannot be completed on a complete request.
>     at kafka.network.Transmission$class.expectIncomplete(Transmission.scala:34)
>     at kafka.api.FetchResponseSend.expectIncomplete(FetchResponse.scala:191)
>     at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:214)
>     at kafka.network.Processor.write(SocketServer.scala:375)
>     at kafka.network.Processor.run(SocketServer.scala:247)
>     at java.lang.Thread.run(Thread.java:745)
> [2014-10-16 20:59:59,994] ERROR Closing socket for /192.168.1.65 because of error (kafka.network.Processor)
> kafka.common.KafkaException: This operation cannot be completed on a complete request.
>     at kafka.network.Transmission$class.expectIncomplete(Transmission.scala:34)
>     at kafka.api.FetchResponseSend.expectIncomplete(FetchResponse.scala:191)
>     at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:214)
>     at kafka.network.Processor.write(SocketServer.scala:375)
>     at kafka.network.Processor.run(SocketServer.scala:247)
>     at java.lang.Thread.run(Thread.java:745)
>
> --
> *Xingcan*