Re: Message Encoding in Kafka 0.8

2013-07-20 Thread prashant amar
Can anybody please help with this issue indicated below. I have not heard
from anyone with a solution.

Thanks in advance
Amar


On Tue, Jul 16, 2013 at 11:35 PM, prashant amar  wrote:

> Hello,
>
> Specified below is my code base where I am attempting to marshall a
> complex type and receiving the follow error.
> Am I missing anything here?
>
>  Sending Encoded Messages ..
> [error] (run-main) java.lang.ClassCastException: java.lang.String cannot
> be cast to com.test.groups.MemberRecord
> java.lang.ClassCastException: java.lang.String cannot be cast to
> com.test.groups.MemberRecord
>
> ~
>
> package com.test.groups
>
> import java.util._
> import kafka.javaapi.producer.Producer
> import kafka.producer.KeyedMessage
> import kafka.producer.ProducerConfig
> import kafka.serializer.Encoder
> import kafka.message.Message
> import java.io.ByteArrayOutputStream
> import java.io.DataOutputStream
> import kafka.serializer.Decoder
> import java.io.ByteArrayInputStream
> import java.io.DataInputStream
> import kafka.utils.VerifiableProperties
>
>
>
> class MemberRecord(val memberId: Int, val name: String, val location:
> String) {
>   override def toString = {
> "(" + memberId + "," + name + "," + location + ")"
>   }
> }
>
> class MemberRecordEncoder(props: VerifiableProperties = null) extends
> Encoder[MemberRecord] {
>   def toBytes(member: MemberRecord): Array[Byte] = {
> val outputStream = new ByteArrayOutputStream()
> val dos = new DataOutputStream(outputStream)
> dos.writeInt(member.memberId)
> dos.writeUTF(member.name)
> dos.writeUTF(member.location)
> outputStream.flush
> outputStream.toByteArray
>   }
> }
>
> class MemberRecordDecoder(props: VerifiableProperties = null) extends
> Decoder[MemberRecord] {
>   def fromBytes(messageByte: Array[Byte]):MemberRecord = {
> val message = new Message(messageByte)
> val inputStream = new ByteArrayInputStream(message.payload.array,
> message.payload.arrayOffset, message.payload.limit)
> val dataInputStream = new DataInputStream(inputStream)
> new MemberRecord(dataInputStream.readInt, dataInputStream.readUTF,
> dataInputStream.readUTF)
>   }
> }
>
>
> object SimpleDataProducer {
>   def main(args: Array[String]) {
> val events = 100
>
> val eprops = new Properties
> eprops.put("metadata.broker.list", "localhost:9092")
> eprops.put("serializer.class", "com.test.groups.MemberRecordEncoder");
> eprops.put("request.required.acks", "1")
>
>
> val econfg = new ProducerConfig(eprops)
> val eproducer = new Producer[String, MemberRecord](econfg)
>
> val dataP = new SimpleDataProducer
> println(" Sending Encoded Messages .. ")
> dataP.sendEncodedMessage(10,eproducer)
>
> println(" Shutting down Producer ")
> eproducer.close
> println(" Successfully shut down Producer ")
>   }
> }
>
> class SimpleDataProducer {
>
>   val rnd = new Random
>
>   def sendEncodedMessage(nEvents: Int, producer: Producer[String,
> MemberRecord]) {
> for (nEvents <- 0 to nEvents) {
> val message = new MemberRecord(rnd.nextInt(255), "John", "US")
> val producerData = new KeyedMessage[String,
> MemberRecord]("topic-Encoded", "encode", message)
> producer.send(producerData)
> }
>   }
>
> }
>
>
>


Message Encoding in Kafka 0.8

2013-07-20 Thread prashant amar
I noticed that ProducerData has been removed in 0.8 branch

If I'd wish to send a complex message type (encoded message) ,

how would I do it in 0.8?


In 0.7.X (a snippet from Neha's example)

   val producer = new Producer[Message, MemberRecord](config);


// send a single message
val message = new MemberRecord(1, "John", "US")
val producerData = new ProducerData[Message,
MemberRecord]("member-records", message)
producer.send(producerData)

class MemberRecord(val memberId: Int, val name: String, val location: String) {
  override def toString = {
"(" + memberId + "," + name + "," + location + ")"
  }
}

class MemberRecordEncoder extends Encoder[MemberRecord] {
  def toMessage(member: MemberRecord):Message = {
val outputStream = new ByteArrayOutputStream()
val dos = new DataOutputStream(outputStream)
dos.writeInt(member.memberId)
dos.writeUTF(member.name)
dos.writeUTF(member.location)
outputStream.flush

new Message(outputStream.toByteArray)
  }


Re: Message Encoding in Kafka 0.8

2013-07-20 Thread prashant amar
Hello,

Specified below is my code base where I am attempting to marshall a complex
type and receiving the follow error.
Am I missing anything here?

 Sending Encoded Messages ..
[error] (run-main) java.lang.ClassCastException: java.lang.String cannot be
cast to com.test.groups.MemberRecord
java.lang.ClassCastException: java.lang.String cannot be cast to
com.test.groups.MemberRecord

~

package com.test.groups

import java.util._
import kafka.javaapi.producer.Producer
import kafka.producer.KeyedMessage
import kafka.producer.ProducerConfig
import kafka.serializer.Encoder
import kafka.message.Message
import java.io.ByteArrayOutputStream
import java.io.DataOutputStream
import kafka.serializer.Decoder
import java.io.ByteArrayInputStream
import java.io.DataInputStream
import kafka.utils.VerifiableProperties



class MemberRecord(val memberId: Int, val name: String, val location:
String) {
  override def toString = {
"(" + memberId + "," + name + "," + location + ")"
  }
}

class MemberRecordEncoder(props: VerifiableProperties = null) extends
Encoder[MemberRecord] {
  def toBytes(member: MemberRecord): Array[Byte] = {
val outputStream = new ByteArrayOutputStream()
val dos = new DataOutputStream(outputStream)
dos.writeInt(member.memberId)
dos.writeUTF(member.name)
dos.writeUTF(member.location)
outputStream.flush
outputStream.toByteArray
  }
}

class MemberRecordDecoder(props: VerifiableProperties = null) extends
Decoder[MemberRecord] {
  def fromBytes(messageByte: Array[Byte]):MemberRecord = {
val message = new Message(messageByte)
val inputStream = new ByteArrayInputStream(message.payload.array,
message.payload.arrayOffset, message.payload.limit)
val dataInputStream = new DataInputStream(inputStream)
new MemberRecord(dataInputStream.readInt, dataInputStream.readUTF,
dataInputStream.readUTF)
  }
}


object SimpleDataProducer {
  def main(args: Array[String]) {
val events = 100

val eprops = new Properties
eprops.put("metadata.broker.list", "localhost:9092")
eprops.put("serializer.class", "com.test.groups.MemberRecordEncoder");
eprops.put("request.required.acks", "1")


val econfg = new ProducerConfig(eprops)
val eproducer = new Producer[String, MemberRecord](econfg)

val dataP = new SimpleDataProducer
println(" Sending Encoded Messages .. ")
dataP.sendEncodedMessage(10,eproducer)

println(" Shutting down Producer ")
eproducer.close
println(" Successfully shut down Producer ")
  }
}

class SimpleDataProducer {

  val rnd = new Random

  def sendEncodedMessage(nEvents: Int, producer: Producer[String,
MemberRecord]) {
for (nEvents <- 0 to nEvents) {
val message = new MemberRecord(rnd.nextInt(255), "John", "US")
val producerData = new KeyedMessage[String,
MemberRecord]("topic-Encoded", "encode", message)
producer.send(producerData)
}
  }

}


Sample Kafka Producer/Consumer example in Scala

2013-07-20 Thread prashant amar
Hello All

I have been attempting to build an end-to-end example of producer with
multiple consumers using scala.
I managed to build a producer, but when I attempt to build a consumer the
following statement causes the scala REPL to block/hang

val consumerMap = consumer.createMessageStreams(topicCountMap)

Am I missing anything here?

Can anybody please point me to a simple producer with multiple consumers
example in scala?

*Please do not point me to ConsumerConsumer or MirrorMaker as both of them
are fairly complex to even look at (given that I am a novice scala
programmer)*

Any help will be appreciated?

Thanks in advance
Amar


Multiple Processes Consuming from Same GroupID

2013-09-11 Thread prashant amar
A Design Question that needs verification:

1. Created a topic T with 'n' partitions.
2. Created a consumer group process with 'n + 1' threads subscribing from
topic 'T' with a groupID 'y'
3. Added another consumer group process with 'n + 1' threads subscribing
from same topic 'T' with same groupID 'y'

On doing so, I noticed that the previous consumer group stops consuming and
the new consumer beings to consume

I was attempting to model on demand parallelization in an event where an
consumer group cannot keep up with the events produced. Rather than
increase the threadpool capacity in the same process, it would make sense
to distribute the load across multiple processes.

Advice please?

Regards
Amardeep


Re: Multiple Processes Consuming from Same GroupID

2013-09-11 Thread prashant amar
Also attempted another pattern where

1. Created a topic T with 'n' partitions.
2. Created a consumer group process with 'n + 1' threads subscribing from
topic 'T' with a groupID 'y'
3. Added another consumer group process with 'n + 1' threads subscribing
from same topic 'T' with same groupID 'z'
(Note that 2 and 3 subscribe from same topic but different groups)

Can a single topic with multiple partitions abetted with multiple consumer
groups increase parallelism is consumption?








On Wed, Sep 11, 2013 at 4:48 PM, prashant amar  wrote:

> A Design Question that needs verification:
>
> 1. Created a topic T with 'n' partitions.
> 2. Created a consumer group process with 'n + 1' threads subscribing from
> topic 'T' with a groupID 'y'
> 3. Added another consumer group process with 'n + 1' threads subscribing
> from same topic 'T' with same groupID 'y'
>
> On doing so, I noticed that the previous consumer group stops consuming
> and the new consumer beings to consume
>
> I was attempting to model on demand parallelization in an event where an
> consumer group cannot keep up with the events produced. Rather than
> increase the threadpool capacity in the same process, it would make sense
> to distribute the load across multiple processes.
>
> Advice please?
>
> Regards
> Amardeep
>


Re: Multiple Processes Consuming from Same GroupID

2013-09-11 Thread prashant amar
Also noticed another issue

Specified below is the current configuration

Topic1 -> n Partitions -> 2 Consumer Groups (gr1 and gr2)
Topic2 -> n Partitions -> 2 Consumer Groups (gr1 and gr2)

Notice that I have used the same naming convention on the consumer group
set i.e. 'gr1' and 'gr2' are consumer groups associated with 2 sets of
topics.

On calling the *ConsumerOffsetChecker* API, I am receiving a
ClosedChannelException

(Check Trace Below)

Is there any namespace collision occurring here ? This issue is
reproducible with the following setup above


*bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group gr2
--zkconnect localhost:2181*


2013-09-12 01:01:59,701] INFO Initiating client connection,
connectString=localhost:2181 sessionTimeout=3
watcher=org.I0Itec.zkclient.ZkClient@3af0ce45(org.apache.zookeeper.ZooKeeper)
[2013-09-12 01:01:59,724] INFO Opening socket connection to server
localhost/127.0.0.1:2181 (org.apache.zookeeper.ClientCnxn)
[2013-09-12 01:01:59,732] INFO Socket connection established to localhost/
127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2013-09-12 01:01:59,741] INFO Session establishment complete on server
localhost/127.0.0.1:2181, sessionid = 0x140924380790211, negotiated timeout
= 3 (org.apache.zookeeper.ClientCnxn)
[2013-09-12 01:01:59,744] INFO zookeeper state changed (SyncConnected)
(org.I0Itec.zkclient.ZkClient)
Group   Topic  Pid Offset  logSize
Lag Owner
gr2 pe10   129985  130625
   640 none
gr2 pe11   0   0
0   none
gr2 pe20   130493  130493
   0   gr2_ip-XX-6c6f5d94-0
[2013-09-12 01:02:00,514] INFO Reconnect due to socket error:
 (kafka.consumer.SimpleConsumer)
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:89)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:72)
 at
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:125)
 at
kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processPartition(ConsumerOffsetChecker.scala:72)
at
kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply$mcVI$sp(ConsumerOffsetChecker.scala:90)
 at
kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:90)
at
kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:90)
 at
scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
at scala.collection.immutable.List.foreach(List.scala:45)
 at
kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processTopic(ConsumerOffsetChecker.scala:89)
at
kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:154)
 at
kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:154)
at
scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
 at scala.collection.immutable.List.foreach(List.scala:45)
at kafka.tools.ConsumerOffsetChecker$.main(ConsumerOffsetChecker.scala:153)
 at kafka.tools.ConsumerOffsetChecker.main(ConsumerOffsetChecker.scala)
gr2 pe21   0   0
0   gr2_ip-XXX-6c6f5d94-1
[2013-09-12 01:02:00,523] INFO Terminate ZkClient event thread.
(org.I0Itec.zkclient.ZkEventThread)
[2013-09-12 01:02:00,526] INFO Session: 0x140924380790211 closed
(org.apache.zookeeper.ZooKeeper)
[2013-09-12 01:02:00,526] INFO EventThread shut down
(org.apache.zookeeper.ClientCnxn)







On Wed, Sep 11, 2013 at 5:46 PM, Neha Narkhede 
 wrote:

> I think you are hitting this -
>
> https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whysomeoftheconsumersinaconsumergroupneverreceiveanymessage%3F
>
> Let us know if we can improve the documentation to make it clearer.
>
> Thanks,
> Neha
>
>
> On Wed, Sep 11, 2013 at 5:28 PM, prashant amar 
> wrote:
>
> > Also attempted another pattern where
> >
> > 1. Created a topic T with 'n' partitions.
> > 2. Created a consumer group process with 'n + 1' threads subscribing from
> > topic 'T' with a groupID 'y'
> > 3. Added another consumer group process with 'n + 1' threads subscribing
> > from same topic 'T' with same groupID 'z'
> > (Note that 2 and 3 subscribe from same topic but different groups)
> >
> > Can a single topic with multiple partitions abetted

Re: Multiple Processes Consuming from Same GroupID

2013-09-11 Thread prashant amar
>From the broker log:


INFO Reconnect due to socket error:  (kafka.consumer.SimpleConsumer)
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:89)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:72)
at
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:125)
at
kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processPartition(ConsumerOffsetChecker.scala:72)
at
kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply$mcVI$sp(ConsumerOffsetChecker.scala:90)
at
kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:90)
at
kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:90)
at
scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
at scala.collection.immutable.List.foreach(List.scala:45)
at
kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processTopic(ConsumerOffsetChecker.scala:89)
at
kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:154)
at
kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:154)
at
scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
at scala.collection.immutable.List.foreach(List.scala:45)
at kafka.tools.ConsumerOffsetChecker$.main(ConsumerOffsetChecker.scala:153)
at kafka.tools.ConsumerOffsetChecker.main(ConsumerOffsetChecker.scala)


On Wed, Sep 11, 2013 at 10:11 PM, Jun Rao  wrote:

> This means the broker somehow closed the socket connection. Anything in the
> broker log around the same time?
>
> Thanks,
>
> Jun
>
>
> On Wed, Sep 11, 2013 at 6:07 PM, prashant amar 
> wrote:
>
> > Also noticed another issue
> >
> > Specified below is the current configuration
> >
> > Topic1 -> n Partitions -> 2 Consumer Groups (gr1 and gr2)
> > Topic2 -> n Partitions -> 2 Consumer Groups (gr1 and gr2)
> >
> > Notice that I have used the same naming convention on the consumer group
> > set i.e. 'gr1' and 'gr2' are consumer groups associated with 2 sets of
> > topics.
> >
> > On calling the *ConsumerOffsetChecker* API, I am receiving a
> > ClosedChannelException
> >
> > (Check Trace Below)
> >
> > Is there any namespace collision occurring here ? This issue is
> > reproducible with the following setup above
> >
> >
> > *bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group gr2
> > --zkconnect localhost:2181*
> >
> >
> > 2013-09-12 01:01:59,701] INFO Initiating client connection,
> > connectString=localhost:2181 sessionTimeout=3
> > watcher=org.I0Itec.zkclient.ZkClient@3af0ce45
> > (org.apache.zookeeper.ZooKeeper)
> > [2013-09-12 01:01:59,724] INFO Opening socket connection to server
> > localhost/127.0.0.1:2181 (org.apache.zookeeper.ClientCnxn)
> > [2013-09-12 01:01:59,732] INFO Socket connection established to
> localhost/
> > 127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
> > [2013-09-12 01:01:59,741] INFO Session establishment complete on server
> > localhost/127.0.0.1:2181, sessionid = 0x140924380790211, negotiated
> > timeout
> > = 3 (org.apache.zookeeper.ClientCnxn)
> > [2013-09-12 01:01:59,744] INFO zookeeper state changed (SyncConnected)
> > (org.I0Itec.zkclient.ZkClient)
> > Group   Topic  Pid Offset
>  logSize
> > Lag Owner
> > gr2 pe10   129985  130625
> >640 none
> > gr2 pe11   0   0
> > 0   none
> > gr2 pe20   130493  130493
> >0   gr2_ip-XX-6c6f5d94-0
> > [2013-09-12 01:02:00,514] INFO Reconnect due to socket error:
> >  (kafka.consumer.SimpleConsumer)
> > java.nio.channels.ClosedChannelException
> > at kafka.network.BlockingChannel.send(BlockingChannel.scala:89)
> > at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:72)
> >  at
> >
> >
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
> > at
> kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:125)
> >  at
> >
> >
> kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processPartition

Re: Multiple Processes Consuming from Same GroupID

2013-09-11 Thread prashant amar
I usually get this exception when I define > 2 partitions ..

Current configuration :

Single Topic - 4 partitions
1 Consumers Group - 10 Threads




On Wed, Sep 11, 2013 at 10:24 PM, prashant amar  wrote:

> From the broker log:
>
>
> INFO Reconnect due to socket error:  (kafka.consumer.SimpleConsumer)
> java.nio.channels.ClosedChannelException
> at kafka.network.BlockingChannel.send(BlockingChannel.scala:89)
>  at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:72)
> at
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
>  at
> kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:125)
> at
> kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processPartition(ConsumerOffsetChecker.scala:72)
>  at
> kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply$mcVI$sp(ConsumerOffsetChecker.scala:90)
> at
> kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:90)
>  at
> kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:90)
> at
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
>  at scala.collection.immutable.List.foreach(List.scala:45)
> at
> kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processTopic(ConsumerOffsetChecker.scala:89)
>  at
> kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:154)
> at
> kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:154)
>  at
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
> at scala.collection.immutable.List.foreach(List.scala:45)
>  at
> kafka.tools.ConsumerOffsetChecker$.main(ConsumerOffsetChecker.scala:153)
> at kafka.tools.ConsumerOffsetChecker.main(ConsumerOffsetChecker.scala)
>
>
> On Wed, Sep 11, 2013 at 10:11 PM, Jun Rao  wrote:
>
>> This means the broker somehow closed the socket connection. Anything in
>> the
>> broker log around the same time?
>>
>> Thanks,
>>
>> Jun
>>
>>
>> On Wed, Sep 11, 2013 at 6:07 PM, prashant amar 
>> wrote:
>>
>> > Also noticed another issue
>> >
>> > Specified below is the current configuration
>> >
>> > Topic1 -> n Partitions -> 2 Consumer Groups (gr1 and gr2)
>> > Topic2 -> n Partitions -> 2 Consumer Groups (gr1 and gr2)
>> >
>> > Notice that I have used the same naming convention on the consumer group
>> > set i.e. 'gr1' and 'gr2' are consumer groups associated with 2 sets of
>> > topics.
>> >
>> > On calling the *ConsumerOffsetChecker* API, I am receiving a
>> > ClosedChannelException
>> >
>> > (Check Trace Below)
>> >
>> > Is there any namespace collision occurring here ? This issue is
>> > reproducible with the following setup above
>> >
>> >
>> > *bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group gr2
>> > --zkconnect localhost:2181*
>> >
>> >
>> > 2013-09-12 01:01:59,701] INFO Initiating client connection,
>> > connectString=localhost:2181 sessionTimeout=3
>> > watcher=org.I0Itec.zkclient.ZkClient@3af0ce45
>> > (org.apache.zookeeper.ZooKeeper)
>> > [2013-09-12 01:01:59,724] INFO Opening socket connection to server
>> > localhost/127.0.0.1:2181 (org.apache.zookeeper.ClientCnxn)
>> > [2013-09-12 01:01:59,732] INFO Socket connection established to
>> localhost/
>> > 127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
>> > [2013-09-12 01:01:59,741] INFO Session establishment complete on server
>> > localhost/127.0.0.1:2181, sessionid = 0x140924380790211, negotiated
>> > timeout
>> > = 3 (org.apache.zookeeper.ClientCnxn)
>> > [2013-09-12 01:01:59,744] INFO zookeeper state changed (SyncConnected)
>> > (org.I0Itec.zkclient.ZkClient)
>> > Group   Topic  Pid Offset
>>  logSize
>> > Lag Owner
>> > gr2 pe10   129985
>>  130625
>> >640 none
>> > gr2 pe11   0   0
>> > 0   none
>> > gr2 pe20   130493
>>  130493
>> >0   gr2_ip-XX-6

Producer not distributing across all partitions

2013-09-13 Thread prashant amar
I created a topic with 4 partitions and for some reason the producer is
pushing only to one partition.

This is consistently happening across all topics that I created ...

Is there a specific configuration that I need to apply to ensure that load
is evenly distributed across all partitions?


Group   Topic  Pid Offset  logSize
Lag Owner
perfgroup1  perfpayload1   0   10965   11220
255 perfgroup1_-0
perfgroup1  perfpayload1   1   0   0
0   perfgroup1_-1
perfgroup1  perfpayload1   2   0   0
0   perfgroup1_X-2
perfgroup1  perfpayload1   3   0   0
0   perfgroup1_X-3