Hi Sandeep,

For the old producer, I don't think you can achieve a strictly even distribution of messages across partitions within the same topic. But you can potentially reduce the sticking time by setting topic.metadata.refresh.interval.ms lower, e.g. to 1 second (1000, since the value is in milliseconds).
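A minimal sketch of what that setting looks like in an old (0.8.x) producer config, assuming the standard 0.8 property names metadata.broker.list, serializer.class and topic.metadata.refresh.interval.ms. The class name OldProducerConfigSketch and the broker address "broker1:9092" are hypothetical placeholders. Only the Properties part runs without Kafka; constructing the real producer needs the Kafka 0.8 jar and is left in comments.

```java
import java.util.Properties;

/**
 * Hypothetical helper: builds an old (0.8.x) producer configuration with a
 * shorter metadata refresh interval, so keyless messages switch partitions
 * more often. Broker address is a placeholder.
 */
public class OldProducerConfigSketch {

    static Properties oldProducerProps(String brokerList, long refreshIntervalMs) {
        Properties props = new Properties();
        props.setProperty("metadata.broker.list", brokerList);
        props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
        // Default is 600000 ms (10 min). On each refresh the old producer may
        // choose a new random partition for messages sent without a key.
        props.setProperty("topic.metadata.refresh.interval.ms", Long.toString(refreshIntervalMs));
        return props;
    }

    public static void main(String[] args) {
        Properties props = oldProducerProps("broker1:9092", 1000L); // 1 second
        System.out.println(props.getProperty("topic.metadata.refresh.interval.ms"));
        // With the Kafka 0.8 jar on the classpath (not needed for this sketch):
        // Producer<String, String> producer =
        //     new kafka.javaapi.producer.Producer<>(new kafka.producer.ProducerConfig(props));
        // producer.send(new kafka.producer.KeyedMessage<>(topicName, message));
    }
}
```

Running main prints 1000. A shorter interval means more frequent metadata requests to the brokers, so 1 second is a trade-off rather than a free fix.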
KAFKA-544 added the partition information to MessageAndMetadata. That change dates back to 11/15/12, so it should have been included in 0.8.1.1. Do you mean that the MessageAndMetadata you got has no partition member, or that MessageAndMetadata.partition gives you nothing?

Jiangjie (Becket) Qin

On 11/26/14, 10:31 AM, "Palur Sandeep" <psand...@hawk.iit.edu> wrote:

>Hi Jiangjie,
>
>I am using the high level consumer (ZookeeperConsumerConnector). After getting the message from the stream, I don't see this "message.Partition". Please help me get the partition id from the message.
>
>What can I do to get messages evenly distributed among partitions? Do you mean that it is not possible in version 0.8.1.1?
>
>On Wed, Nov 26, 2014 at 12:03 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
>
>> Hi Sandeep,
>>
>> If you are sending messages to different topics, each topic will stick to a random partition for 10 min. Since they are likely sticking to different brokers, you will still see messages roughly evenly distributed.
>> If you are using the high level consumer (ZookeeperConsumerConnector), after getting the message from the stream, you can simply call message.Partition to get the partition id.
>>
>> Jiangjie (Becket) Qin
>>
>> On 11/25/14, 5:30 PM, "Palur Sandeep" <psand...@hawk.iit.edu> wrote:
>>
>> >Hi Jiangjie,
>> >
>> >This is what I have understood. Please correct me if I am wrong.
>> >
>> >I don't use the partitioner class at all (KeyedMessage<String,String> data = new KeyedMessage<String, String>(topic_name, new_mes)). It partitions messages randomly to different partitions. I don't see it sticking to any broker for 10 mins. I guess it follows some random partitioning logic. I am using version 0.8.1.1.
>> >
>> >MessageAndMetadata on the consumer side prints the following message. Can you help me find the partition number in the metadata?
>> >
>> >Thread 0: Message(magic = 0, attributes = 0, crc = 127991357, key = java.nio.HeapByteBuffer[pos=0 lim=1 cap=55], payload = java.nio.HeapByteBuffer[pos=0 lim=50 cap=50])
>> >
>> >Thanks
>> >Sandeep
>> >
>> >On Tue, Nov 25, 2014 at 7:07 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
>> >
>> >> Palur,
>> >>
>> >> Just adding to what Guozhang said, the answer to your question might depend on which producer you are using.
>> >> Assuming you are producing messages without keys to the same topic, in the new producer (KafkaProducer) the messages will go to brokers in a round-robin way, so they will end up evenly distributed across brokers. In the old producer, by contrast, it actually sticks to a particular partition, and thus a particular broker, for 10 min (by default) and then switches to another random partition. In that case, if you send messages fast enough, you might see an uneven distribution across brokers.
>> >>
>> >> For the consumer, if you are using the high level consumer, when reading from a KafkaStream you will get a MessageAndMetadata; it includes the topic and partition information as well as the raw message.
>> >>
>> >> Jiangjie (Becket) Qin
>> >>
>> >> On 11/25/14, 10:01 AM, "Guozhang Wang" <wangg...@gmail.com> wrote:
>> >>
>> >> >Palur,
>> >> >
>> >> >If the 8 partitions are hosted one on each of the nodes, assuming replication factor 1, then each node will get roughly 100000 / 8 (about 12,500) messages due to the random partitioner. If you want to know exactly how many messages are on each broker, you can use a simple consumer, which allows you to specify the partition id you want to consume from.
>> >> >
>> >> >In the new consumer (0.9), each consumed message will contain the partition id as part of its message metadata.
>> >> >
>> >> >Guozhang
>> >> >
>> >> >On Tue, Nov 25, 2014 at 7:47 AM, Palur Sandeep <psand...@hawk.iit.edu> wrote:
>> >> >
>> >> >> Dear Developers,
>> >> >>
>> >> >> I am using the default partitioning logic (random partitioning) to produce messages into brokers. That is, I don't use a partitioner.class.
>> >> >>
>> >> >> My question: if I produce 100000 messages using the code below to a topic that has 8 partitions across 8 nodes, how many messages will each partition have? Is there any API that can help me find the broker id of each message I consume on the consumer side?
>> >> >>
>> >> >> PS: I don't want to use partitioner.class. I want to use Kafka's default partitioning logic.
>> >> >>
>> >> >> KeyedMessage<String,String> data = new KeyedMessage<String, String>(topic_name, new_mes);
>> >> >>
>> >> >> producer.send(data);
>> >> >>
>> >> >> --
>> >> >> Regards,
>> >> >> Sandeep Palur
>> >> >> Data-Intensive Distributed Systems Laboratory, CS/IIT
>> >> >> Department of Computer Science, Illinois Institute of Technology (IIT)
>> >> >> Phone : 312-647-9833
>> >> >> Email : psand...@hawk.iit.edu <sraja...@hawk.iit.edu>
>> >> >
>> >> >--
>> >> >-- Guozhang
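To make the two behaviors described in this thread concrete, here is a toy simulation, not Kafka code, of 100000 keyless messages going to 8 partitions. It assumes round robin for the new producer and, for the old producer, a random partition that is kept until the next metadata refresh. Time is modeled as a message count: messagesPerRefresh = send rate × refresh interval, with a hypothetical rate of 1,000 msg/s. The class and method names are made up for illustration.

```java
import java.util.Arrays;
import java.util.Random;

/**
 * Toy model (hypothetical, not Kafka's implementation) of how keyless
 * messages spread over partitions under two strategies.
 */
public class PartitionSpreadSim {

    // Round robin: message i goes to partition i % numPartitions.
    static long[] roundRobin(int numMessages, int numPartitions) {
        long[] counts = new long[numPartitions];
        for (int i = 0; i < numMessages; i++) {
            counts[i % numPartitions]++;
        }
        return counts;
    }

    // Sticky: pick a random partition, send every message to it until the
    // next "metadata refresh" (every messagesPerRefresh messages), re-pick.
    static long[] sticky(int numMessages, int numPartitions, int messagesPerRefresh, long seed) {
        if (messagesPerRefresh <= 0) {
            throw new IllegalArgumentException("messagesPerRefresh must be positive");
        }
        Random rng = new Random(seed);
        long[] counts = new long[numPartitions];
        int current = rng.nextInt(numPartitions);
        for (int i = 0; i < numMessages; i++) {
            if (i > 0 && i % messagesPerRefresh == 0) {
                current = rng.nextInt(numPartitions); // refresh: choose again
            }
            counts[current]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int messages = 100_000;
        int partitions = 8;
        System.out.println("round robin: " + Arrays.toString(roundRobin(messages, partitions)));
        // Assumed 1,000 msg/s: a 10-minute window covers 600,000 messages,
        // so all 100,000 land on a single partition.
        System.out.println("sticky 10 min: " + Arrays.toString(sticky(messages, partitions, 600_000, 42L)));
        // A 1-second window re-picks the partition every 1,000 messages.
        System.out.println("sticky 1 s: " + Arrays.toString(sticky(messages, partitions, 1_000, 42L)));
    }
}
```

Round robin gives exactly 100000 / 8 = 12,500 per partition. With the 10-minute window everything goes to one partition, and with a 1-second window the counts are multiples of 1,000 that are only roughly even. On a real 0.8.x high-level consumer, a similar per-partition tally could be built from the partition field of each MessageAndMetadata (callable from Java as partition()).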