Hi, For the producer, we have moved to a new producer API (under org.apache.kafka.clients.producer.KafkaProducer). Please feel free to give that a spin and report any issues that you see.
I think the consumer issue you reported is being discussed in another thread and is fixed in 0.8.2. Can you check if you still see the issue in 0.8.2-beta <http://kafka.apache.org/downloads.html>? On Sun, Dec 14, 2014 at 10:08 PM, 黄震 <skyhuang...@163.com> wrote: > > Hi, > > I'm using kafka-0.8.1.1, this is a good log system, and I'm very > appreciate for your works. But I'm also found some problem: > > 1. producer: > > kafka.producer.async.DefaultEventHandler.scala handle(): I suggest > that sendPartitionPerTopicCache should be cleared every batch, since It > would not well distributed in every partition. > > 2. consumer: > > kafka.consumer.SimpleConsumer.scala disconnect(): I suggest that we > should delete "if(blockingChannel.isConnected)", since I came across this > problem when switch broken down, and then broker was abnormally shutted > down: > > 2014-12-04 17:12:14,260 [ReplicaFetcherThread-7-8] ERROR > kafka.server.ReplicaFetcherThread - [ReplicaFetcherThread-7-8], Error in > fetch Name: FetchRequest; Version: 0; CorrelationId: 1069738; ClientId: > ReplicaFetcherThread-7-8; ReplicaId: 3; MaxWait: 500 ms; MinBytes: 1 bytes; > RequestInfo: [t.v.3,0] -> PartitionFetchInfo(0,1048576) > java.nio.channels.UnresolvedAddressException at > sun.nio.ch.Net.checkAddress(Net.java:29) at > sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:512) at > kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) at > kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44) at > kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57) at > kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79) at > kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109) > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) > > > > > > -- > > > Best Regards, > > Allen huang -- Thanks, Neha