We pass in all the producer properties when instantiating the partitioner. Thanks,
Jun On Thu, Oct 30, 2014 at 5:57 PM, Rajiv Kurian <ra...@signalfuse.com> wrote: > Hmm I am not sure how to do that. Do I have access to the > VerifiableProperties object that is passed to the partitioner object? > > On Thu, Oct 30, 2014 at 5:54 PM, Jun Rao <jun...@gmail.com> wrote: > > > Potentially, you can pass in state related info through the properties > and > > use those to instantiate MyDeciderThingy. > > > > Thanks, > > > > Jun > > > > On Thu, Oct 30, 2014 at 11:46 AM, Rajiv Kurian <ra...@signalfuse.com> > > wrote: > > > > > Yes I am using the old producer. When I use the producer I do something > > > like this: > > > > > > Properties config = new Properties(); > > > > > > // put other stuff in the config. > > > > > > // Put the partitioner class that kafka will instantiate. > > > > > > config.put("partitioner.class", partitionerClass.getName()); > > > > > > return new Producer<K, M>(new ProducerConfig(config)); > > > > > > So I assume that the kafka code uses reflection to instantiate the > > > partitioner object. My partitioner object looks like this: > > > > > > public class MyPartitioner implements Partitioner<MyStuff> { > > > > > > MyDeciderThingy thingy; > > > > > > public RawBusIdPartitioner(VerifiableProperties props) {} > > > > > > > > > @Override > > > > > > public int partition(MyStuff stuff, int numPartitions) { > > > > > > return thingy.decide(stuff); > > > > > > } > > > > > > } > > > > > > The problem is I don't know how to pass a MyDeciderThingy to my > > > Partitioner object given Kafka instantiates it. > > > > > > Thanks! > > > > > > On Thu, Oct 30, 2014 at 11:34 AM, Joel Koshy <jjkosh...@gmail.com> > > wrote: > > > > > > > Not sure I follow - you just need to extend the Partitioner trait. > You > > > > don't _have_ to use that specific constructor. > > > > > > > > It is slightly different with the new producer, but looks like you > are > > > > on the old producer. > > > > > > > > On Thu, Oct 30, 2014 at 11:16:28AM -0700, Rajiv Kurian wrote: > > > > > Actually I figured out what the problem was. My producer was using > a > > > > > partitioner which was causing a null pointer exception. This > actually > > > > > raises another question for me. I want some state in my partitioner > > and > > > > the > > > > > only constructor that Kafka seems to use is this one: > > > > > > > > > > public MyPartitioner(VerifiableProperties props) {} > > > > > > > > > > How do I inject an object here that I can use to decide what > > partition > > > my > > > > > messages should go to? As an ugly hack I am using a static variable > > > from > > > > > somewhere else. > > > > > > > > > > On Thu, Oct 30, 2014 at 10:25 AM, Rajiv Kurian < > ra...@signalfuse.com > > > > > > > wrote: > > > > > > > > > > > Yes I see a ton of WARN messages on the broker logs of this form: > > > > > > > > > > > > 2014-10-30T17:21:54.281Z WARN [kafka-request-handler-6 > > ] > > > > > > [state.change.logger ]: Broker 0 received invalid > > > > > > LeaderAndIsr request with correlation id 158 from controller 0 > > epoch > > > > 29083 > > > > > > with an older leader epoch 5 for partition [myTopic, 56], current > > > > leader > > > > > > epoch is 5 > > > > > > > > > > > > 2014-10-30T17:21:54.282Z WARN [kafka-request-handler-6 > > ] > > > > > > [state.change.logger ]: Broker 0 received invalid > > > > > > LeaderAndIsr request with correlation id 158 from controller 0 > > epoch > > > > 29083 > > > > > > with an older leader epoch 5 for partition myTopic,385], current > > > leader > > > > > > epoch is 5 > > > > > > > > > > > > 2014-10-30T17:21:54.283Z WARN [kafka-request-handler-6 > > ] > > > > > > [state.change.logger ]: Broker 0 received invalid > > > > > > LeaderAndIsr request with correlation id 158 from controller 0 > > epoch > > > > 29083 > > > > > > with an older leader epoch 5 for partition [myTopic,684], current > > > > leader > > > > > > epoch is 5 > > > > > > > > > > > > 2014-10-30T17:21:54.283Z WARN [kafka-request-handler-6 > > ] > > > > > > [state.change.logger ]: Broker 0 received invalid > > > > > > LeaderAndIsr request with correlation id 158 from controller 0 > > epoch > > > > 29083 > > > > > > with an older leader epoch 5 for partition [myTopic,1002], > current > > > > leader > > > > > > epoch is > > > > > > > > > > > > On Thu, Oct 30, 2014 at 10:03 AM, Joel Koshy < > jjkosh...@gmail.com> > > > > wrote: > > > > > > > > > > > >> Do you see any errors on the broker logs? Can you check the > > broker's > > > > > >> public access logs and see if there are topic metadata requests > > > coming > > > > > >> in from the producer? > > > > > >> > > > > > >> On Wed, Oct 29, 2014 at 07:15:15PM -0700, Rajiv Kurian wrote: > > > > > >> > I don't see anything else that is relevant. I traced the first > > of > > > > these > > > > > >> > error messages to figure out the ordering. It actually goes > > > > something > > > > > >> like > > > > > >> > this: > > > > > >> > > > > > > >> > 2014-10-30T01:51:32.400Z ERROR [ProducerSendThread- > > > ] > > > > > >> > [k.producer.async.DefaultEventHandler] {}: Failed to collate > > > > messages by > > > > > >> > topic, partition due to: null > > > > > >> > > > > > > >> > 2014-10-30T01:51:34.082Z ERROR [ProducerSendThread- > > > ] > > > > > >> > [k.producer.async.DefaultEventHandler] {}: Failed to collate > > > > messages by > > > > > >> > topic, partition due to: null > > > > > >> > > > > > > >> > 2014-10-30T01:51:34.422Z ERROR [ProducerSendThread- > > > ] > > > > > >> > [k.producer.async.DefaultEventHandler] {}: Failed to collate > > > > messages by > > > > > >> > topic, partition due to: null > > > > > >> > > > > > > >> > 2014-10-30T01:51:34.664Z ERROR [ProducerSendThread- > > > ] > > > > > >> > [k.producer.async.DefaultEventHandler] {}: Failed to collate > > > > messages by > > > > > >> > topic, partition due to: null > > > > > >> > > > > > > >> > 2014-10-30T01:51:34.902Z ERROR [ProducerSendThread- > > > ] > > > > > >> > [k.producer.async.DefaultEventHandler] {}: Failed to send > > requests > > > > for > > > > > >> > topics myTopic with correlation ids in [0,8] > > > > > >> > > > > > > >> > 2014-10-30T01:51:35.007Z ERROR [ProducerSendThread- > > > ] > > > > > >> > [k.producer.async.ProducerSendThread ] {}: Error in handling > > batch > > > > of 1 > > > > > >> > events > > > > > >> > > > > > > >> > kafka.common.FailedToSendMessageException: Failed to send > > messages > > > > > >> after 3 > > > > > >> > tries. > > > > > >> > > > > > > >> > at kafka.producer.async.DefaultEventHandler.handle(Unknown > > Source) > > > > > >> > ~[kafka_2.10-0.8.0.jar:0.8.0] > > > > > >> > > > > > > >> > at kafka.producer.async.ProducerSendThread.tryToHandle(Unknown > > > > Source) > > > > > >> > [kafka_2.10-0.8.0.jar:0.8.0] > > > > > >> > > > > > > >> > at > > > > > >> > > > > > > >> > > > > > > > > > > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Unknown > > > > > >> > Source) [kafka_2.10-0.8.0.jar:0.8.0] > > > > > >> > > > > > > >> > at > > > > > >> > > > > > > >> > > > > > > > > > > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Unknown > > > > > >> > Source) [kafka_2.10-0.8.0.jar:0.8.0] > > > > > >> > > > > > > >> > at scala.collection.immutable.Stream.foreach(Stream.scala:547) > > > > > >> > [scala-library-2.10.1.jar:na] > > > > > >> > > > > > > >> > at > kafka.producer.async.ProducerSendThread.processEvents(Unknown > > > > Source) > > > > > >> > [kafka_2.10-0.8.0.jar:0.8.0] > > > > > >> > > > > > > >> > at kafka.producer.async.ProducerSendThread.run(Unknown Source) > > > > > >> > [kafka_2.10-0.8.0.jar:0.8.0] > > > > > >> > > > > > > >> > Thanks! > > > > > >> > > > > > > >> > On Wed, Oct 29, 2014 at 7:02 PM, Rajiv Kurian < > > > ra...@signalfuse.com > > > > > > > > > > >> wrote: > > > > > >> > > > > > > >> > > This pattern seems to repeat: > > > > > >> > > > > > > > >> > > 2014-10-30T01:54:46.004Z ERROR [ProducerSendThread- > > > > ] > > > > > >> > > [k.producer.async.DefaultEventHandler] {}: Failed to send > > > > requests for > > > > > >> > > topics myTopic with correlation ids in [1729,1736] > > > > > >> > > > > > > > >> > > 2014-10-30T01:54:46.008Z ERROR [ProducerSendThread- > > > > ] > > > > > >> > > [k.producer.async.ProducerSendThread ] {}: Error in handling > > > > batch of > > > > > >> 4 > > > > > >> > > events > > > > > >> > > > > > > > >> > > kafka.common.FailedToSendMessageException: Failed to send > > > messages > > > > > >> after 3 > > > > > >> > > tries. > > > > > >> > > > > > > > >> > > at kafka.producer.async.DefaultEventHandler.handle(Unknown > > > Source) > > > > > >> > > ~[kafka_2.10-0.8.0.jar:0.8.0] > > > > > >> > > > > > > > >> > > at > kafka.producer.async.ProducerSendThread.tryToHandle(Unknown > > > > Source) > > > > > >> > > [kafka_2.10-0.8.0.jar:0.8.0] > > > > > >> > > > > > > > >> > > at > > > > > >> > > > > > > > >> > > > > > > > > > > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Unknown > > > > > >> > > Source) [kafka_2.10-0.8.0.jar:0.8.0] > > > > > >> > > > > > > > >> > > at > > > > > >> > > > > > > > >> > > > > > > > > > > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Unknown > > > > > >> > > Source) [kafka_2.10-0.8.0.jar:0.8.0] > > > > > >> > > > > > > > >> > > at > scala.collection.immutable.Stream.foreach(Stream.scala:547) > > > > > >> > > [scala-library-2.10.1.jar:na] > > > > > >> > > > > > > > >> > > at > > kafka.producer.async.ProducerSendThread.processEvents(Unknown > > > > > >> Source) > > > > > >> > > [kafka_2.10-0.8.0.jar:0.8.0] > > > > > >> > > > > > > > >> > > at kafka.producer.async.ProducerSendThread.run(Unknown > Source) > > > > > >> > > [kafka_2.10-0.8.0.jar:0.8.0] > > > > > >> > > > > > > > >> > > 2014-10-30T01:54:46.025Z ERROR [ProducerSendThread- > > > > ] > > > > > >> > > [k.producer.async.DefaultEventHandler] {}: Failed to collate > > > > messages > > > > > >> by > > > > > >> > > topic, partition due to: null > > > > > >> > > > > > > > >> > > 2014-10-30T01:54:46.174Z ERROR [ProducerSendThread- > > > > ] > > > > > >> > > [k.producer.async.DefaultEventHandler] {}: Failed to collate > > > > messages > > > > > >> by > > > > > >> > > topic, partition due to: null > > > > > >> > > > > > > > >> > > 2014-10-30T01:54:46.356Z ERROR [ProducerSendThread- > > > > ] > > > > > >> > > [k.producer.async.DefaultEventHandler] {}: Failed to collate > > > > messages > > > > > >> by > > > > > >> > > topic, partition due to: null > > > > > >> > > > > > > > >> > > 2014-10-30T01:54:46.644Z ERROR [ProducerSendThread- > > > > ] > > > > > >> > > [k.producer.async.DefaultEventHandler] {}: Failed to collate > > > > messages > > > > > >> by > > > > > >> > > topic, partition due to: null > > > > > >> > > > > > > > >> > > On Wed, Oct 29, 2014 at 6:57 PM, Jun Rao <jun...@gmail.com> > > > > wrote: > > > > > >> > > > > > > > >> > >> The log before that will show you the cause of the error. > > Could > > > > you > > > > > >> dig > > > > > >> > >> that out? > > > > > >> > >> > > > > > >> > >> Thanks, > > > > > >> > >> > > > > > >> > >> Jun > > > > > >> > >> > > > > > >> > >> On Wed, Oct 29, 2014 at 6:43 PM, Rajiv Kurian < > > > > ra...@signalfuse.com> > > > > > >> > >> wrote: > > > > > >> > >> > > > > > >> > >> > I keep seeing these errors in my code that is just trying > > to > > > > send > > > > > >> some > > > > > >> > >> data > > > > > >> > >> > using an AsyncProducer: > > > > > >> > >> > > > > > > >> > >> > kafka.common.FailedToSendMessageException: Failed to send > > > > messages > > > > > >> > >> after 3 > > > > > >> > >> > tries. > > > > > >> > >> > > > > > > >> > >> > at > kafka.producer.async.DefaultEventHandler.handle(Unknown > > > > Source) > > > > > >> > >> > ~[kafka_2.10-0.8.0.jar:0.8.0] > > > > > >> > >> > > > > > > >> > >> > at > > > kafka.producer.async.ProducerSendThread.tryToHandle(Unknown > > > > > >> Source) > > > > > >> > >> > [kafka_2.10-0.8.0.jar:0.8.0] > > > > > >> > >> > > > > > > >> > >> > at > > > > > >> > >> > > > > > > >> > >> > > > > > > >> > >> > > > > > >> > > > > > > > > > > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Unknown > > > > > >> > >> > Source) [kafka_2.10-0.8.0.jar:0.8.0] > > > > > >> > >> > > > > > > >> > >> > at > > > > > >> > >> > > > > > > >> > >> > > > > > > >> > >> > > > > > >> > > > > > > > > > > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Unknown > > > > > >> > >> > Source) [kafka_2.10-0.8.0.jar:0.8.0] > > > > > >> > >> > > > > > > >> > >> > at > > > scala.collection.immutable.Stream.foreach(Stream.scala:547) > > > > > >> > >> > [scala-library-2.10.1.jar:na] > > > > > >> > >> > > > > > > >> > >> > at > > > > kafka.producer.async.ProducerSendThread.processEvents(Unknown > > > > > >> Source) > > > > > >> > >> > [kafka_2.10-0.8.0.jar:0.8.0] > > > > > >> > >> > > > > > > >> > >> > at kafka.producer.async.ProducerSendThread.run(Unknown > > > Source) > > > > > >> > >> > [kafka_2.10-0.8.0.jar:0.8.0] > > > > > >> > >> > > > > > > >> > >> > 2014-10-30T01:40:45.176Z ERROR [ProducerSendThread- > > > > > >> ] > > > > > >> > >> > [k.producer.async.DefaultEventHandler] {}: Failed to > > collate > > > > > >> messages by > > > > > >> > >> > topic, partition due to: null > > > > > >> > >> > > > > > > >> > >> > 2014-10-30T01:40:45.506Z ERROR [ProducerSendThread- > > > > > >> ] > > > > > >> > >> > [k.producer.async.DefaultEventHandler] {}: Failed to > > collate > > > > > >> messages by > > > > > >> > >> > topic, partition due to: null > > > > > >> > >> > > > > > > >> > >> > 2014-10-30T01:40:45.647Z ERROR [ProducerSendThread- > > > > > >> ] > > > > > >> > >> > [k.producer.async.DefaultEventHandler] {}: Failed to > > collate > > > > > >> messages by > > > > > >> > >> > topic, partition due to: null > > > > > >> > >> > > > > > > >> > >> > 2014-10-30T01:40:45.772Z ERROR [ProducerSendThread- > > > > > >> ] > > > > > >> > >> > [k.producer.async.DefaultEventHandler] {}: Failed to > > collate > > > > > >> messages by > > > > > >> > >> > topic, partition due to: null > > > > > >> > >> > > > > > > >> > >> > 2014-10-30T01:40:45.890Z ERROR [ProducerSendThread- > > > > > >> ] > > > > > >> > >> > [k.producer.async.DefaultEventHandler] {}: Failed to send > > > > requests > > > > > >> for > > > > > >> > >> > topics myTopic with correlation ids in [169,176] > > > > > >> > >> > > > > > > >> > >> > 2014-10-30T01:40:45.892Z ERROR [ProducerSendThread- > > > > > >> ] > > > > > >> > >> > [k.producer.async.ProducerSendThread ] {}: Error in > > handling > > > > batch > > > > > >> of 29 > > > > > >> > >> > events > > > > > >> > >> > > > > > > >> > >> > > > > > > >> > >> > I created the topic before starting using > > > bin/kafka-topics.sh. > > > > I > > > > > >> checked > > > > > >> > >> > zookeeper and seems like the topic was indeed created. > Any > > > > ideas? > > > > > >> > >> > > > > > > >> > >> > > > > > >> > > > > > > > >> > > > > > > > >> > > > > > > > > > >