Thanks, Guozhang and others, for helping me. I am now able to send and receive 10 MB messages. In case anybody has the same requirement, please make the following changes:

*server.properties*

message.max.bytes=10485800
replica.fetch.max.bytes=10485800
socket.send.buffer.bytes=104857600
socket.receive.buffer.bytes=104857600
socket.request.max.bytes=104857600

*consumer.properties*

props.put("fetch.message.max.bytes", "10485800");

*Topic*

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic big_topic --config max.message.bytes=10485800

On Tue, Dec 2, 2014 at 7:15 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Palur,
>
> First you need to make sure the message is received at Kafka:
>
> message.max.bytes
>
> controls the maximum size of a message that can be accepted, and
>
> fetch.message.max.bytes
>
> controls the maximum number of bytes a consumer issues in one fetch.
>
> Guozhang
>
> On Mon, Dec 1, 2014 at 7:25 PM, Palur Sandeep <psand...@hawk.iit.edu> wrote:
>
> > Thank you so much Jiangjie. I got it working.
> >
> > I have another problem: the consumer doesn't receive a message if it is big. When the producer sends 256 KB messages to the broker, the consumer is able to retrieve them, but when the producer sends 10 MB messages to the broker, the consumer doesn't receive any message.
> >
> > Please tell me how to make the consumer receive 10 MB messages.
> >
> > On Mon, Dec 1, 2014 at 10:24 AM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
> >
> > > I think you are printing the class Message instead of MessageAndMetadata. The output you got was from Message.toString.
> > >
> > > Can you just try something like below?
> > >
> > > ...
> > > ConsumerIterator<byte[], byte[]> iter = consumerStream.iterator(); // assuming you have got a consumer stream
> > > MessageAndMetadata<byte[], byte[]> messageAndMetadata = iter.next();
> > > System.out.println("topic: " + messageAndMetadata.topic() + " partition: " + messageAndMetadata.partition());
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On 11/26/14, 12:56 PM, "Palur Sandeep" <psand...@hawk.iit.edu> wrote:
> > >
> > > > Hi Jiangjie,
> > > >
> > > > Thanks for the information. This is what I get when I print MessageAndMetadata:
> > > >
> > > > *Thread 0: Message(magic = 0, attributes = 0, crc = 127991357, key = java.nio.HeapByteBuffer[pos=0 lim=1 cap=55], payload = java.nio.HeapByteBuffer[pos=0 lim=50 cap=50])*
> > > >
> > > > Can you please tell me where I can find the partition number in this?
> > > >
> > > > On Wed, Nov 26, 2014 at 1:29 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
> > > >
> > > > > Hi Sandeep,
> > > > >
> > > > > For the old producer, I don't think you can achieve strictly even distribution of messages across partitions within the same topic. But you can potentially reduce the sticking time by setting topic.metadata.refresh.interval.ms lower, e.g. to 1 second.
> > > > >
> > > > > KAFKA-544 added the partition information to MessageAndMetadata. That dates back to 11/15/12, so it should have been included in 0.8.1.1. Do you mean the MessageAndMetadata you got does not have a partition member, or that MessageAndMetadata.partition gives you nothing?
> > > > >
> > > > > Jiangjie (Becket) Qin
> > > > >
> > > > > On 11/26/14, 10:31 AM, "Palur Sandeep" <psand...@hawk.iit.edu> wrote:
> > > > >
> > > > > > Hi Jiangjie,
> > > > > >
> > > > > > I am using the high level consumer (ZookeeperConsumerConnector). After getting the message from the stream, I don't see this "message.Partition". Please help me get the partition id from the message.
> > > > > >
> > > > > > What can I do to get messages evenly distributed among partitions? Do you mean that it is not possible in version 0.8.1.1?
> > > > > >
> > > > > > On Wed, Nov 26, 2014 at 12:03 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
> > > > > >
> > > > > > > Hi Sandeep,
> > > > > > >
> > > > > > > If you are sending messages to different topics, each topic will stick to a random partition for 10 min. Since they are likely sticking to different brokers, you will still see messages roughly evenly distributed.
> > > > > > > If you are using the high level consumer (ZookeeperConsumerConnector), after getting the message from the stream, you can simply call message.Partition to get the partition id.
> > > > > > >
> > > > > > > Jiangjie (Becket) Qin
> > > > > > >
> > > > > > > On 11/25/14, 5:30 PM, "Palur Sandeep" <psand...@hawk.iit.edu> wrote:
> > > > > > >
> > > > > > > > Hi Jiangjie,
> > > > > > > >
> > > > > > > > This is what I have understood. Please correct me if I am wrong.
> > > > > > > >
> > > > > > > > I don't use the partitioner class at all (KeyedMessage<String,String> data = new KeyedMessage<String, String>(topic_name,new_mes)). It partitions messages randomly to different partitions. I don't see it sticking to any broker for 10 mins. I guess it follows some random partitioning logic. I am using version 0.8.1.1.
> > > > > > > >
> > > > > > > > MessageAndMetadata on the consumer side prints the following message. Can you help me find the metadata regarding the partition number?
> > > > > > > >
> > > > > > > > *Thread 0: Message(magic = 0, attributes = 0, crc = 127991357, key = java.nio.HeapByteBuffer[pos=0 lim=1 cap=55], payload = java.nio.HeapByteBuffer[pos=0 lim=50 cap=50])*
> > > > > > > >
> > > > > > > > Thanks
> > > > > > > > Sandeep
> > > > > > > >
> > > > > > > > On Tue, Nov 25, 2014 at 7:07 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
> > > > > > > >
> > > > > > > > > Palur,
> > > > > > > > >
> > > > > > > > > Just adding to what Guozhang said, the answer to your question might depend on which producer you are using.
> > > > > > > > > Assuming you are producing messages without keys to the same topic, in the new producer (KafkaProducer) the messages will go to brokers in a round robin way, so the messages will end up evenly distributed across brokers. Whereas the old producer actually sticks to a particular broker for 10 min (by default) and then switches to another random partition. In that case, if you send messages fast enough, you might see uneven distribution across brokers.
> > > > > > > > >
> > > > > > > > > For the consumer, if you are using the high level consumer, when reading from KafkaStream you will get MessageAndMetadata; the topic and partition information is included in it as well as the raw message.
> > > > > > > > >
> > > > > > > > > Jiangjie (Becket) Qin
> > > > > > > > >
> > > > > > > > > On 11/25/14, 10:01 AM, "Guozhang Wang" <wangg...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > Palur,
> > > > > > > > > >
> > > > > > > > > > If the 8 partitions are hosted one on each of the nodes, assuming replication factor 1, then each node will get roughly 100000 / 8 messages due to the random partitioner. If you want to know exactly how many messages are on each broker, you can use a simple consumer, which allows you to specify the partition id you want to consume from.
> > > > > > > > > >
> > > > > > > > > > In the new consumer (0.9), each consumed message will contain the partition id as part of its message metadata.
> > > > > > > > > >
> > > > > > > > > > Guozhang
> > > > > > > > > >
> > > > > > > > > > On Tue, Nov 25, 2014 at 7:47 AM, Palur Sandeep <psand...@hawk.iit.edu> wrote:
> > > > > > > > > >
> > > > > > > > > > > Dear Developers,
> > > > > > > > > > >
> > > > > > > > > > > I am using the default partitioning logic (random partitioning) to produce messages into brokers. That is, I don't use a partitioner.class.
> > > > > > > > > > >
> > > > > > > > > > > My question is: if I produce 100000 messages using the code below for a broker that has 8 partitions across 8 nodes, how many messages will each partition have? Is there any API that can help me find the broker id of each message I consume on the consumer side?
> > > > > > > > > > >
> > > > > > > > > > > PS: I don't want to use partitioner.class. I want to use Kafka's default partitioning logic.
> > > > > > > > > > >
> > > > > > > > > > > KeyedMessage<String,String> data = new KeyedMessage<String, String>(topic_name,new_mes);
> > > > > > > > > > >
> > > > > > > > > > > producer.send(data);
> > > > > > > > > > >
> > > > > > > > > > > --
> > > > > > > > > > > Regards,
> > > > > > > > > > > Sandeep Palur
> > > > > > > > > > > Data-Intensive Distributed Systems Laboratory, CS/IIT
> > > > > > > > > > > Department of Computer Science, Illinois Institute of Technology (IIT)
> > > > > > > > > > > Phone : 312-647-9833
> > > > > > > > > > > Email : psand...@hawk.iit.edu <sraja...@hawk.iit.edu>
> > > > > > > > > >
> > > > > > > > > > --
> > > > > > > > > > -- Guozhang
>
> --
> -- Guozhang

--
Regards,
Sandeep Palur
Data-Intensive Distributed Systems Laboratory, CS/IIT
Department of Computer Science, Illinois Institute of Technology (IIT)
Phone : 312-647-9833
Email : psand...@hawk.iit.edu <sraja...@hawk.iit.edu>
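
P.S. For anyone applying the size settings from the top of this message: a rough sanity check, not anything shipped with Kafka. It only uses java.util.Properties and compares the property names mentioned in this thread, on the assumption that both replica.fetch.max.bytes and the consumer's fetch.message.max.bytes should be at least as large as message.max.bytes. The class and method names (LargeMessageSettings, brokerProps, consumerProps, canFetchLargestMessage) are made up for illustration.

```java
import java.util.Properties;

/** Hypothetical helper for illustration only; it is not part of Kafka. */
public class LargeMessageSettings {

    /** The size limit used for the broker and consumer settings in this thread. */
    public static final int MAX_MESSAGE_BYTES = 10485800;

    /** Broker-side limits, mirroring the server.properties entries. */
    public static Properties brokerProps() {
        Properties p = new Properties();
        p.setProperty("message.max.bytes", String.valueOf(MAX_MESSAGE_BYTES));
        p.setProperty("replica.fetch.max.bytes", String.valueOf(MAX_MESSAGE_BYTES));
        return p;
    }

    /** Settings that would be passed to the old high level consumer. */
    public static Properties consumerProps() {
        Properties p = new Properties();
        p.setProperty("fetch.message.max.bytes", String.valueOf(MAX_MESSAGE_BYTES));
        return p;
    }

    /**
     * Returns true when both the replica fetch size and the consumer fetch size
     * are at least as large as the biggest message the broker accepts.
     */
    public static boolean canFetchLargestMessage(Properties broker, Properties consumer) {
        long maxMessage = Long.parseLong(broker.getProperty("message.max.bytes"));
        long replicaFetch = Long.parseLong(broker.getProperty("replica.fetch.max.bytes"));
        long consumerFetch = Long.parseLong(consumer.getProperty("fetch.message.max.bytes"));
        return replicaFetch >= maxMessage && consumerFetch >= maxMessage;
    }

    public static void main(String[] args) {
        System.out.println(canFetchLargestMessage(brokerProps(), consumerProps()));
    }
}
```

If the consumer's fetch.message.max.bytes is left smaller than message.max.bytes, the check returns false, which matches the symptom described earlier in the thread: 256 KB messages arrive but 10 MB messages never reach the consumer.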