Re: Message Encoding in Kafka 0.8
Can anybody please help with this issue indicated below. I have not heard from anyone with a solution. Thanks in advance Amar On Tue, Jul 16, 2013 at 11:35 PM, prashant amar wrote: > Hello, > > Specified below is my code base where I am attempting to marshall a > complex type and receiving the follow error. > Am I missing anything here? > > Sending Encoded Messages .. > [error] (run-main) java.lang.ClassCastException: java.lang.String cannot > be cast to com.test.groups.MemberRecord > java.lang.ClassCastException: java.lang.String cannot be cast to > com.test.groups.MemberRecord > > ~ > > package com.test.groups > > import java.util._ > import kafka.javaapi.producer.Producer > import kafka.producer.KeyedMessage > import kafka.producer.ProducerConfig > import kafka.serializer.Encoder > import kafka.message.Message > import java.io.ByteArrayOutputStream > import java.io.DataOutputStream > import kafka.serializer.Decoder > import java.io.ByteArrayInputStream > import java.io.DataInputStream > import kafka.utils.VerifiableProperties > > > > class MemberRecord(val memberId: Int, val name: String, val location: > String) { > override def toString = { > "(" + memberId + "," + name + "," + location + ")" > } > } > > class MemberRecordEncoder(props: VerifiableProperties = null) extends > Encoder[MemberRecord] { > def toBytes(member: MemberRecord): Array[Byte] = { > val outputStream = new ByteArrayOutputStream() > val dos = new DataOutputStream(outputStream) > dos.writeInt(member.memberId) > dos.writeUTF(member.name) > dos.writeUTF(member.location) > outputStream.flush > outputStream.toByteArray > } > } > > class MemberRecordDecoder(props: VerifiableProperties = null) extends > Decoder[MemberRecord] { > def fromBytes(messageByte: Array[Byte]):MemberRecord = { > val message = new Message(messageByte) > val inputStream = new ByteArrayInputStream(message.payload.array, > message.payload.arrayOffset, message.payload.limit) > val dataInputStream = new DataInputStream(inputStream) 
> new MemberRecord(dataInputStream.readInt, dataInputStream.readUTF, > dataInputStream.readUTF) > } > } > > > object SimpleDataProducer { > def main(args: Array[String]) { > val events = 100 > > val eprops = new Properties > eprops.put("metadata.broker.list", "localhost:9092") > eprops.put("serializer.class", "com.test.groups.MemberRecordEncoder"); > eprops.put("request.required.acks", "1") > > > val econfg = new ProducerConfig(eprops) > val eproducer = new Producer[String, MemberRecord](econfg) > > val dataP = new SimpleDataProducer > println(" Sending Encoded Messages .. ") > dataP.sendEncodedMessage(10,eproducer) > > println(" Shutting down Producer ") > eproducer.close > println(" Successfully shut down Producer ") > } > } > > class SimpleDataProducer { > > val rnd = new Random > > def sendEncodedMessage(nEvents: Int, producer: Producer[String, > MemberRecord]) { > for (nEvents <- 0 to nEvents) { > val message = new MemberRecord(rnd.nextInt(255), "John", "US") > val producerData = new KeyedMessage[String, > MemberRecord]("topic-Encoded", "encode", message) > producer.send(producerData) > } > } > > } > > >
Message Encoding in Kafka 0.8
I noticed that ProducerData has been removed in the 0.8 branch. If I wish to send a complex message type (an encoded message), how would I do it in 0.8? In 0.7.X (a snippet from Neha's example): val producer = new Producer[Message, MemberRecord](config); // send a single message val message = new MemberRecord(1, "John", "US") val producerData = new ProducerData[Message, MemberRecord]("member-records", message) producer.send(producerData) class MemberRecord(val memberId: Int, val name: String, val location: String) { override def toString = { "(" + memberId + "," + name + "," + location + ")" } } class MemberRecordEncoder extends Encoder[MemberRecord] { def toMessage(member: MemberRecord):Message = { val outputStream = new ByteArrayOutputStream() val dos = new DataOutputStream(outputStream) dos.writeInt(member.memberId) dos.writeUTF(member.name) dos.writeUTF(member.location) outputStream.flush new Message(outputStream.toByteArray) }
Re: Message Encoding in Kafka 0.8
Hello, Specified below is my code base where I am attempting to marshall a complex type and receiving the follow error. Am I missing anything here? Sending Encoded Messages .. [error] (run-main) java.lang.ClassCastException: java.lang.String cannot be cast to com.test.groups.MemberRecord java.lang.ClassCastException: java.lang.String cannot be cast to com.test.groups.MemberRecord ~ package com.test.groups import java.util._ import kafka.javaapi.producer.Producer import kafka.producer.KeyedMessage import kafka.producer.ProducerConfig import kafka.serializer.Encoder import kafka.message.Message import java.io.ByteArrayOutputStream import java.io.DataOutputStream import kafka.serializer.Decoder import java.io.ByteArrayInputStream import java.io.DataInputStream import kafka.utils.VerifiableProperties class MemberRecord(val memberId: Int, val name: String, val location: String) { override def toString = { "(" + memberId + "," + name + "," + location + ")" } } class MemberRecordEncoder(props: VerifiableProperties = null) extends Encoder[MemberRecord] { def toBytes(member: MemberRecord): Array[Byte] = { val outputStream = new ByteArrayOutputStream() val dos = new DataOutputStream(outputStream) dos.writeInt(member.memberId) dos.writeUTF(member.name) dos.writeUTF(member.location) outputStream.flush outputStream.toByteArray } } class MemberRecordDecoder(props: VerifiableProperties = null) extends Decoder[MemberRecord] { def fromBytes(messageByte: Array[Byte]):MemberRecord = { val message = new Message(messageByte) val inputStream = new ByteArrayInputStream(message.payload.array, message.payload.arrayOffset, message.payload.limit) val dataInputStream = new DataInputStream(inputStream) new MemberRecord(dataInputStream.readInt, dataInputStream.readUTF, dataInputStream.readUTF) } } object SimpleDataProducer { def main(args: Array[String]) { val events = 100 val eprops = new Properties eprops.put("metadata.broker.list", "localhost:9092") eprops.put("serializer.class", 
"com.test.groups.MemberRecordEncoder"); eprops.put("request.required.acks", "1") val econfg = new ProducerConfig(eprops) val eproducer = new Producer[String, MemberRecord](econfg) val dataP = new SimpleDataProducer println(" Sending Encoded Messages .. ") dataP.sendEncodedMessage(10,eproducer) println(" Shutting down Producer ") eproducer.close println(" Successfully shut down Producer ") } } class SimpleDataProducer { val rnd = new Random def sendEncodedMessage(nEvents: Int, producer: Producer[String, MemberRecord]) { for (nEvents <- 0 to nEvents) { val message = new MemberRecord(rnd.nextInt(255), "John", "US") val producerData = new KeyedMessage[String, MemberRecord]("topic-Encoded", "encode", message) producer.send(producerData) } } }
Sample Kafka Producer/Consumer example in Scala
Hello All, I have been attempting to build an end-to-end example of a producer with multiple consumers using Scala. I managed to build a producer, but when I attempt to build a consumer, the following statement causes the Scala REPL to block/hang: val consumerMap = consumer.createMessageStreams(topicCountMap) Am I missing anything here? Can anybody please point me to a simple example of a producer with multiple consumers in Scala? *Please do not point me to ConsoleConsumer or MirrorMaker, as both of them are fairly complex to even look at (given that I am a novice Scala programmer).* Any help will be appreciated. Thanks in advance, Amar
Multiple Processes Consuming from Same GroupID
A design question that needs verification: 1. Created a topic T with 'n' partitions. 2. Created a consumer group process with 'n + 1' threads subscribing from topic 'T' with a groupID 'y'. 3. Added another consumer group process with 'n + 1' threads subscribing from the same topic 'T' with the same groupID 'y'. On doing so, I noticed that the previous consumer group stops consuming and the new consumer begins to consume. I was attempting to model on-demand parallelization in the event that a consumer group cannot keep up with the events produced. Rather than increase the threadpool capacity in the same process, it would make sense to distribute the load across multiple processes. Advice please? Regards, Amardeep
Re: Multiple Processes Consuming from Same GroupID
Also attempted another pattern where 1. Created a topic T with 'n' partitions. 2. Created a consumer group process with 'n + 1' threads subscribing from topic 'T' with a groupID 'y' 3. Added another consumer group process with 'n + 1' threads subscribing from the same topic 'T' with a different groupID 'z' (Note that 2 and 3 subscribe from the same topic but different groups) Can a single topic with multiple partitions, combined with multiple consumer groups, increase parallelism in consumption? On Wed, Sep 11, 2013 at 4:48 PM, prashant amar wrote: > A Design Question that needs verification: > > 1. Created a topic T with 'n' partitions. > 2. Created a consumer group process with 'n + 1' threads subscribing from > topic 'T' with a groupID 'y' > 3. Added another consumer group process with 'n + 1' threads subscribing > from same topic 'T' with same groupID 'y' > > On doing so, I noticed that the previous consumer group stops consuming > and the new consumer beings to consume > > I was attempting to model on demand parallelization in an event where an > consumer group cannot keep up with the events produced. Rather than > increase the threadpool capacity in the same process, it would make sense > to distribute the load across multiple processes. > > Advice please? > > Regards > Amardeep >
Re: Multiple Processes Consuming from Same GroupID
Also noticed another issue Specified below is the current configuration Topic1 -> n Partitions -> 2 Consumer Groups (gr1 and gr2) Topic2 -> n Partitions -> 2 Consumer Groups (gr1 and gr2) Notice that I have used the same naming convention on the consumer group set i.e. 'gr1' and 'gr2' are consumer groups associated with 2 sets of topics. On calling the *ConsumerOffsetChecker* API, I am receiving a ClosedChannelException (Check Trace Below) Is there any namespace collision occurring here ? This issue is reproducible with the following setup above *bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group gr2 --zkconnect localhost:2181* 2013-09-12 01:01:59,701] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=3 watcher=org.I0Itec.zkclient.ZkClient@3af0ce45(org.apache.zookeeper.ZooKeeper) [2013-09-12 01:01:59,724] INFO Opening socket connection to server localhost/127.0.0.1:2181 (org.apache.zookeeper.ClientCnxn) [2013-09-12 01:01:59,732] INFO Socket connection established to localhost/ 127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn) [2013-09-12 01:01:59,741] INFO Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x140924380790211, negotiated timeout = 3 (org.apache.zookeeper.ClientCnxn) [2013-09-12 01:01:59,744] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient) Group Topic Pid Offset logSize Lag Owner gr2 pe10 129985 130625 640 none gr2 pe11 0 0 0 none gr2 pe20 130493 130493 0 gr2_ip-XX-6c6f5d94-0 [2013-09-12 01:02:00,514] INFO Reconnect due to socket error: (kafka.consumer.SimpleConsumer) java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:89) at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:72) at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71) at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:125) at 
kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processPartition(ConsumerOffsetChecker.scala:72) at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply$mcVI$sp(ConsumerOffsetChecker.scala:90) at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:90) at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:90) at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61) at scala.collection.immutable.List.foreach(List.scala:45) at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processTopic(ConsumerOffsetChecker.scala:89) at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:154) at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:154) at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61) at scala.collection.immutable.List.foreach(List.scala:45) at kafka.tools.ConsumerOffsetChecker$.main(ConsumerOffsetChecker.scala:153) at kafka.tools.ConsumerOffsetChecker.main(ConsumerOffsetChecker.scala) gr2 pe21 0 0 0 gr2_ip-XXX-6c6f5d94-1 [2013-09-12 01:02:00,523] INFO Terminate ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread) [2013-09-12 01:02:00,526] INFO Session: 0x140924380790211 closed (org.apache.zookeeper.ZooKeeper) [2013-09-12 01:02:00,526] INFO EventThread shut down (org.apache.zookeeper.ClientCnxn) On Wed, Sep 11, 2013 at 5:46 PM, Neha Narkhede wrote: > I think you are hitting this - > > https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whysomeoftheconsumersinaconsumergroupneverreceiveanymessage%3F > > Let us know if we can improve the documentation to make it clearer. > > Thanks, > Neha > > > On Wed, Sep 11, 2013 at 5:28 PM, prashant amar > wrote: > > > Also attempted another pattern where > > > > 1. 
Created a topic T with 'n' partitions. > > 2. Created a consumer group process with 'n + 1' threads subscribing from > > topic 'T' with a groupID 'y' > > 3. Added another consumer group process with 'n + 1' threads subscribing > > from same topic 'T' with same groupID 'z' > > (Note that 2 and 3 subscribe from same topic but different groups) > > > > Can a single topic with multiple partitions abetted
Re: Multiple Processes Consuming from Same GroupID
>From the broker log: INFO Reconnect due to socket error: (kafka.consumer.SimpleConsumer) java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:89) at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:72) at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71) at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:125) at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processPartition(ConsumerOffsetChecker.scala:72) at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply$mcVI$sp(ConsumerOffsetChecker.scala:90) at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:90) at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:90) at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61) at scala.collection.immutable.List.foreach(List.scala:45) at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processTopic(ConsumerOffsetChecker.scala:89) at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:154) at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:154) at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61) at scala.collection.immutable.List.foreach(List.scala:45) at kafka.tools.ConsumerOffsetChecker$.main(ConsumerOffsetChecker.scala:153) at kafka.tools.ConsumerOffsetChecker.main(ConsumerOffsetChecker.scala) On Wed, Sep 11, 2013 at 10:11 PM, Jun Rao wrote: > This means the broker somehow closed the socket connection. Anything in the > broker log around the same time? 
> > Thanks, > > Jun > > > On Wed, Sep 11, 2013 at 6:07 PM, prashant amar > wrote: > > > Also noticed another issue > > > > Specified below is the current configuration > > > > Topic1 -> n Partitions -> 2 Consumer Groups (gr1 and gr2) > > Topic2 -> n Partitions -> 2 Consumer Groups (gr1 and gr2) > > > > Notice that I have used the same naming convention on the consumer group > > set i.e. 'gr1' and 'gr2' are consumer groups associated with 2 sets of > > topics. > > > > On calling the *ConsumerOffsetChecker* API, I am receiving a > > ClosedChannelException > > > > (Check Trace Below) > > > > Is there any namespace collision occurring here ? This issue is > > reproducible with the following setup above > > > > > > *bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group gr2 > > --zkconnect localhost:2181* > > > > > > 2013-09-12 01:01:59,701] INFO Initiating client connection, > > connectString=localhost:2181 sessionTimeout=3 > > watcher=org.I0Itec.zkclient.ZkClient@3af0ce45 > > (org.apache.zookeeper.ZooKeeper) > > [2013-09-12 01:01:59,724] INFO Opening socket connection to server > > localhost/127.0.0.1:2181 (org.apache.zookeeper.ClientCnxn) > > [2013-09-12 01:01:59,732] INFO Socket connection established to > localhost/ > > 127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn) > > [2013-09-12 01:01:59,741] INFO Session establishment complete on server > > localhost/127.0.0.1:2181, sessionid = 0x140924380790211, negotiated > > timeout > > = 3 (org.apache.zookeeper.ClientCnxn) > > [2013-09-12 01:01:59,744] INFO zookeeper state changed (SyncConnected) > > (org.I0Itec.zkclient.ZkClient) > > Group Topic Pid Offset > logSize > > Lag Owner > > gr2 pe10 129985 130625 > >640 none > > gr2 pe11 0 0 > > 0 none > > gr2 pe20 130493 130493 > >0 gr2_ip-XX-6c6f5d94-0 > > [2013-09-12 01:02:00,514] INFO Reconnect due to socket error: > > (kafka.consumer.SimpleConsumer) > > java.nio.channels.ClosedChannelException > > at 
kafka.network.BlockingChannel.send(BlockingChannel.scala:89) > > at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:72) > > at > > > > > kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71) > > at > kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:125) > > at > > > > > kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processPartition
Re: Multiple Processes Consuming from Same GroupID
I usually get this exception when I define > 2 partitions .. Current configuration : Single Topic - 4 partitions 1 Consumers Group - 10 Threads On Wed, Sep 11, 2013 at 10:24 PM, prashant amar wrote: > From the broker log: > > > INFO Reconnect due to socket error: (kafka.consumer.SimpleConsumer) > java.nio.channels.ClosedChannelException > at kafka.network.BlockingChannel.send(BlockingChannel.scala:89) > at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:72) > at > kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71) > at > kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:125) > at > kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processPartition(ConsumerOffsetChecker.scala:72) > at > kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply$mcVI$sp(ConsumerOffsetChecker.scala:90) > at > kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:90) > at > kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:90) > at > scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61) > at scala.collection.immutable.List.foreach(List.scala:45) > at > kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processTopic(ConsumerOffsetChecker.scala:89) > at > kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:154) > at > kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:154) > at > scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61) > at scala.collection.immutable.List.foreach(List.scala:45) > at > kafka.tools.ConsumerOffsetChecker$.main(ConsumerOffsetChecker.scala:153) > at kafka.tools.ConsumerOffsetChecker.main(ConsumerOffsetChecker.scala) > > > On Wed, Sep 11, 2013 
at 10:11 PM, Jun Rao wrote: > >> This means the broker somehow closed the socket connection. Anything in >> the >> broker log around the same time? >> >> Thanks, >> >> Jun >> >> >> On Wed, Sep 11, 2013 at 6:07 PM, prashant amar >> wrote: >> >> > Also noticed another issue >> > >> > Specified below is the current configuration >> > >> > Topic1 -> n Partitions -> 2 Consumer Groups (gr1 and gr2) >> > Topic2 -> n Partitions -> 2 Consumer Groups (gr1 and gr2) >> > >> > Notice that I have used the same naming convention on the consumer group >> > set i.e. 'gr1' and 'gr2' are consumer groups associated with 2 sets of >> > topics. >> > >> > On calling the *ConsumerOffsetChecker* API, I am receiving a >> > ClosedChannelException >> > >> > (Check Trace Below) >> > >> > Is there any namespace collision occurring here ? This issue is >> > reproducible with the following setup above >> > >> > >> > *bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group gr2 >> > --zkconnect localhost:2181* >> > >> > >> > 2013-09-12 01:01:59,701] INFO Initiating client connection, >> > connectString=localhost:2181 sessionTimeout=3 >> > watcher=org.I0Itec.zkclient.ZkClient@3af0ce45 >> > (org.apache.zookeeper.ZooKeeper) >> > [2013-09-12 01:01:59,724] INFO Opening socket connection to server >> > localhost/127.0.0.1:2181 (org.apache.zookeeper.ClientCnxn) >> > [2013-09-12 01:01:59,732] INFO Socket connection established to >> localhost/ >> > 127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn) >> > [2013-09-12 01:01:59,741] INFO Session establishment complete on server >> > localhost/127.0.0.1:2181, sessionid = 0x140924380790211, negotiated >> > timeout >> > = 3 (org.apache.zookeeper.ClientCnxn) >> > [2013-09-12 01:01:59,744] INFO zookeeper state changed (SyncConnected) >> > (org.I0Itec.zkclient.ZkClient) >> > Group Topic Pid Offset >> logSize >> > Lag Owner >> > gr2 pe10 129985 >> 130625 >> >640 none >> > gr2 pe11 0 0 >> > 0 none >> > gr2 pe20 130493 >> 130493 >> >0 
gr2_ip-XX-6
Producer not distributing across all partitions
I created a topic with 4 partitions and for some reason the producer is pushing only to one partition. This is consistently happening across all topics that I created ... Is there a specific configuration that I need to apply to ensure that load is evenly distributed across all partitions? Group Topic Pid Offset logSize Lag Owner perfgroup1 perfpayload1 0 10965 11220 255 perfgroup1_-0 perfgroup1 perfpayload1 1 0 0 0 perfgroup1_-1 perfgroup1 perfpayload1 2 0 0 0 perfgroup1_X-2 perfgroup1 perfpayload1 3 0 0 0 perfgroup1_X-3