Hi,

   I'm using kafka-0.8.1.1, this is a good log system, and I'm very appreciate 
for your works. But I'm also found some problem:

   1. producer:

     kafka.producer.async.DefaultEventHandler.scala handle(): I suggest that 
sendPartitionPerTopicCache should be cleared every batch, since It would not 
well distributed in every partition.

  2. consumer:

      kafka.consumer.SimpleConsumer.scala disconnect(): I suggest that we 
should delete "if(blockingChannel.isConnected)", since I came across this 
problem when switch broken down, and then broker was abnormally shutted down:

 2014-12-04 17:12:14,260 [ReplicaFetcherThread-7-8] ERROR 
kafka.server.ReplicaFetcherThread - [ReplicaFetcherThread-7-8], Error in fetch 
Name: FetchRequest; Version: 0; CorrelationId: 1069738; ClientId: 
ReplicaFetcherThread-7-8; ReplicaId: 3; MaxWait: 500 ms; MinBytes: 1 bytes; 
RequestInfo: [t.v.3,0] -> PartitionFetchInfo(0,1048576) 
java.nio.channels.UnresolvedAddressException at 
sun.nio.ch.Net.checkAddress(Net.java:29) at 
sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:512) at 
kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) at 
kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44) at 
kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57) at 
kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79) at 
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
 at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
 at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
 at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
 at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)





--


Best Regards,

Allen huang

Reply via email to