Hi, I'm using kafka-0.8.1.1. It is a good log system, and I really appreciate your work. However, I have found a couple of problems:

1. producer: kafka.producer.async.DefaultEventHandler.scala, handle():
   I suggest that sendPartitionPerTopicCache be cleared for every batch, because otherwise messages are not distributed evenly across the partitions.

2. consumer: kafka.consumer.SimpleConsumer.scala, disconnect():
   I suggest that we delete the "if(blockingChannel.isConnected)" check. I came across this problem when a switch broke down, after which the broker shut down abnormally:

2014-12-04 17:12:14,260 [ReplicaFetcherThread-7-8] ERROR kafka.server.ReplicaFetcherThread - [ReplicaFetcherThread-7-8], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 1069738; ClientId: ReplicaFetcherThread-7-8; ReplicaId: 3; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [t.v.3,0] -> PartitionFetchInfo(0,1048576)
java.nio.channels.UnresolvedAddressException
    at sun.nio.ch.Net.checkAddress(Net.java:29)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:512)
    at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
    at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
    at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)

--
Best Regards,
Allen huang
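
Here is a minimal sketch of why the guard in suggestion 2 could matter. It rests on an assumption: a failed connect() leaves a socket open while the "connected" flag stays false, so a guarded disconnect() never closes it. SketchChannel and DisconnectSketch are hypothetical stand-ins, not the Kafka classes. The address is deliberately created unresolved so that connect() fails the same way as in the stack trace above.

```scala
import java.net.InetSocketAddress
import java.nio.channels.{SocketChannel, UnresolvedAddressException}

// Hypothetical, simplified stand-in for a blocking channel wrapper (not Kafka code).
class SketchChannel(host: String, port: Int) {
  private var connected = false
  private var channel: Option[SocketChannel] = None

  def isConnected: Boolean = connected
  def isOpen: Boolean = channel.exists(_.isOpen)

  def connect(): Unit = {
    val ch = SocketChannel.open() // a socket is allocated here
    channel = Some(ch)
    // Assumption for the sketch: force an unresolved address so connect() throws
    // UnresolvedAddressException before `connected` is ever set to true.
    ch.connect(InetSocketAddress.createUnresolved(host, port))
    connected = true
  }

  def disconnect(): Unit = {
    channel.foreach(_.close())
    channel = None
    connected = false
  }
}

object DisconnectSketch {
  // Returns true if the socket is still open after a failed connect
  // followed by a (guarded or unguarded) disconnect.
  def socketLeftOpen(guarded: Boolean): Boolean = {
    val ch = new SketchChannel("unresolved.invalid", 9092)
    try ch.connect() catch { case _: UnresolvedAddressException => () }
    // The guarded variant mirrors an "if (isConnected)" check around disconnect.
    if (!guarded || ch.isConnected) ch.disconnect()
    val leftOpen = ch.isOpen
    ch.disconnect() // always clean up at the end of the sketch
    leftOpen
  }

  def main(args: Array[String]): Unit = {
    println(s"guarded disconnect leaves socket open:   ${socketLeftOpen(guarded = true)}")
    println(s"unguarded disconnect leaves socket open: ${socketLeftOpen(guarded = false)}")
  }
}
```

If this assumption holds for the real code path, each failed reconnect would leave one more open socket behind. Whether that is what caused the abnormal broker shutdown reported above is not confirmed by the log.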