Yeah, the intention of requiring that properties constructor is to pipe all of the configuration that goes to the producer through to the partitioner. That way, if your partitioner needs to query some external system, it can get the configuration for that.
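A minimal sketch of that idea, not taken from the Kafka source: the partitioner builds its own state from values that were put into the producer config. To keep the example self-contained, `java.util.Properties` stands in for Kafka's `VerifiableProperties`, and `DeciderThingy`, `ConfiguredPartitioner`, and the `my.partitioner.salt` key are hypothetical names made up for illustration.

```java
import java.lang.reflect.Constructor;
import java.util.Properties;

// Hypothetical stand-in for the thread's MyDeciderThingy: built only from config values.
final class DeciderThingy {
    private final int salt;

    DeciderThingy(int salt) {
        this.salt = salt;
    }

    int decide(String key, int numPartitions) {
        // floorMod keeps the result in [0, numPartitions) even for negative hashes.
        return Math.floorMod(key.hashCode() + salt, numPartitions);
    }
}

// Shaped like an old-producer partitioner: one constructor that receives the
// producer's configuration. Properties is an assumption standing in for
// kafka.utils.VerifiableProperties.
final class ConfiguredPartitioner {
    private final DeciderThingy thingy;

    public ConfiguredPartitioner(Properties props) {
        // "my.partitioner.salt" is a made-up key; any custom key in the producer config works.
        int salt = Integer.parseInt(props.getProperty("my.partitioner.salt", "0"));
        this.thingy = new DeciderThingy(salt);
    }

    public int partition(String key, int numPartitions) {
        return thingy.decide(key, numPartitions);
    }
}

final class PartitionerConfigSketch {
    // Mimics the reflective instantiation the thread assumes the producer performs:
    // look up the class named in the config and call its properties constructor.
    static ConfiguredPartitioner instantiate(Properties config) {
        try {
            Class<?> cls = Class.forName(config.getProperty("partitioner.class"));
            Constructor<?> ctor = cls.getDeclaredConstructor(Properties.class);
            return (ConfiguredPartitioner) ctor.newInstance(config);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not instantiate partitioner", e);
        }
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty("partitioner.class", ConfiguredPartitioner.class.getName());
        config.setProperty("my.partitioner.salt", "3");

        ConfiguredPartitioner partitioner = instantiate(config);
        System.out.println("partition = " + partitioner.partition("some-key", 8));
    }
}
```

With the real old producer the constructor would take `VerifiableProperties` instead, and the custom key would be read with its accessors (I believe `getString` and `getInt` are the relevant ones, but check against your Kafka version).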
-Jay

On Thu, Oct 30, 2014 at 5:57 PM, Rajiv Kurian <ra...@signalfuse.com> wrote:

> Hmm I am not sure how to do that. Do I have access to the
> VerifiableProperties object that is passed to the partitioner object?
>
> On Thu, Oct 30, 2014 at 5:54 PM, Jun Rao <jun...@gmail.com> wrote:
>
> > Potentially, you can pass in state related info through the properties
> > and use those to instantiate MyDeciderThingy.
> >
> > Thanks,
> >
> > Jun
> >
> > On Thu, Oct 30, 2014 at 11:46 AM, Rajiv Kurian <ra...@signalfuse.com>
> > wrote:
> >
> > > Yes I am using the old producer. When I use the producer I do something
> > > like this:
> > >
> > > Properties config = new Properties();
> > >
> > > // put other stuff in the config.
> > >
> > > // Put the partitioner class that kafka will instantiate.
> > >
> > > config.put("partitioner.class", partitionerClass.getName());
> > >
> > > return new Producer<K, M>(new ProducerConfig(config));
> > >
> > > So I assume that the kafka code uses reflection to instantiate the
> > > partitioner object. My partitioner object looks like this:
> > >
> > > public class MyPartitioner implements Partitioner<MyStuff> {
> > >
> > >     MyDeciderThingy thingy;
> > >
> > >     public MyPartitioner(VerifiableProperties props) {}
> > >
> > >     @Override
> > >     public int partition(MyStuff stuff, int numPartitions) {
> > >         return thingy.decide(stuff);
> > >     }
> > >
> > > }
> > >
> > > The problem is I don't know how to pass a MyDeciderThingy to my
> > > Partitioner object given Kafka instantiates it.
> > >
> > > Thanks!
> > >
> > > On Thu, Oct 30, 2014 at 11:34 AM, Joel Koshy <jjkosh...@gmail.com>
> > > wrote:
> > >
> > > > Not sure I follow - you just need to extend the Partitioner trait.
> > > > You don't _have_ to use that specific constructor.
> > > >
> > > > It is slightly different with the new producer, but looks like you
> > > > are on the old producer.
> > > >
> > > > On Thu, Oct 30, 2014 at 11:16:28AM -0700, Rajiv Kurian wrote:
> > > > > Actually I figured out what the problem was. My producer was using
> > > > > a partitioner which was causing a null pointer exception. This
> > > > > actually raises another question for me. I want some state in my
> > > > > partitioner and the only constructor that Kafka seems to use is
> > > > > this one:
> > > > >
> > > > > public MyPartitioner(VerifiableProperties props) {}
> > > > >
> > > > > How do I inject an object here that I can use to decide what
> > > > > partition my messages should go to? As an ugly hack I am using a
> > > > > static variable from somewhere else.
> > > > >
> > > > > On Thu, Oct 30, 2014 at 10:25 AM, Rajiv Kurian <ra...@signalfuse.com>
> > > > > wrote:
> > > > >
> > > > > > Yes I see a ton of WARN messages on the broker logs of this form:
> > > > > >
> > > > > > 2014-10-30T17:21:54.281Z WARN [kafka-request-handler-6 ] [state.change.logger ]: Broker 0 received invalid LeaderAndIsr request with correlation id 158 from controller 0 epoch 29083 with an older leader epoch 5 for partition [myTopic, 56], current leader epoch is 5
> > > > > >
> > > > > > 2014-10-30T17:21:54.282Z WARN [kafka-request-handler-6 ] [state.change.logger ]: Broker 0 received invalid LeaderAndIsr request with correlation id 158 from controller 0 epoch 29083 with an older leader epoch 5 for partition [myTopic,385], current leader epoch is 5
> > > > > >
> > > > > > 2014-10-30T17:21:54.283Z WARN [kafka-request-handler-6 ] [state.change.logger ]: Broker 0 received invalid LeaderAndIsr request with correlation id 158 from controller 0 epoch 29083 with an older leader epoch 5 for partition [myTopic,684], current leader epoch is 5
> > > > > >
> > > > > > 2014-10-30T17:21:54.283Z WARN [kafka-request-handler-6 ] [state.change.logger ]: Broker 0 received invalid LeaderAndIsr request with correlation id 158 from controller 0 epoch 29083 with an older leader epoch 5 for partition [myTopic,1002], current leader epoch is
> > > > > >
> > > > > > On Thu, Oct 30, 2014 at 10:03 AM, Joel Koshy <jjkosh...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > >> Do you see any errors on the broker logs? Can you check the
> > > > > >> broker's public access logs and see if there are topic metadata
> > > > > >> requests coming in from the producer?
> > > > > >>
> > > > > >> On Wed, Oct 29, 2014 at 07:15:15PM -0700, Rajiv Kurian wrote:
> > > > > >> > I don't see anything else that is relevant. I traced the first
> > > > > >> > of these error messages to figure out the ordering. It actually
> > > > > >> > goes something like this:
> > > > > >> >
> > > > > >> > 2014-10-30T01:51:32.400Z ERROR [ProducerSendThread- ] [k.producer.async.DefaultEventHandler] {}: Failed to collate messages by topic, partition due to: null
> > > > > >> >
> > > > > >> > 2014-10-30T01:51:34.082Z ERROR [ProducerSendThread- ] [k.producer.async.DefaultEventHandler] {}: Failed to collate messages by topic, partition due to: null
> > > > > >> >
> > > > > >> > 2014-10-30T01:51:34.422Z ERROR [ProducerSendThread- ] [k.producer.async.DefaultEventHandler] {}: Failed to collate messages by topic, partition due to: null
> > > > > >> >
> > > > > >> > 2014-10-30T01:51:34.664Z ERROR [ProducerSendThread- ] [k.producer.async.DefaultEventHandler] {}: Failed to collate messages by topic, partition due to: null
> > > > > >> >
> > > > > >> > 2014-10-30T01:51:34.902Z ERROR [ProducerSendThread- ] [k.producer.async.DefaultEventHandler] {}: Failed to send requests for topics myTopic with correlation ids in [0,8]
> > > > > >> >
> > > > > >> > 2014-10-30T01:51:35.007Z ERROR [ProducerSendThread- ] [k.producer.async.ProducerSendThread ] {}: Error in handling batch of 1 events
> > > > > >> >
> > > > > >> > kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
> > > > > >> >   at kafka.producer.async.DefaultEventHandler.handle(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
> > > > > >> >   at kafka.producer.async.ProducerSendThread.tryToHandle(Unknown Source) [kafka_2.10-0.8.0.jar:0.8.0]
> > > > > >> >   at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Unknown Source) [kafka_2.10-0.8.0.jar:0.8.0]
> > > > > >> >   at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Unknown Source) [kafka_2.10-0.8.0.jar:0.8.0]
> > > > > >> >   at scala.collection.immutable.Stream.foreach(Stream.scala:547) [scala-library-2.10.1.jar:na]
> > > > > >> >   at kafka.producer.async.ProducerSendThread.processEvents(Unknown Source) [kafka_2.10-0.8.0.jar:0.8.0]
> > > > > >> >   at kafka.producer.async.ProducerSendThread.run(Unknown Source) [kafka_2.10-0.8.0.jar:0.8.0]
> > > > > >> >
> > > > > >> > Thanks!
> > > > > >> >
> > > > > >> > On Wed, Oct 29, 2014 at 7:02 PM, Rajiv Kurian <ra...@signalfuse.com>
> > > > > >> > wrote:
> > > > > >> >
> > > > > >> > > This pattern seems to repeat:
> > > > > >> > >
> > > > > >> > > 2014-10-30T01:54:46.004Z ERROR [ProducerSendThread- ] [k.producer.async.DefaultEventHandler] {}: Failed to send requests for topics myTopic with correlation ids in [1729,1736]
> > > > > >> > >
> > > > > >> > > 2014-10-30T01:54:46.008Z ERROR [ProducerSendThread- ] [k.producer.async.ProducerSendThread ] {}: Error in handling batch of 4 events
> > > > > >> > >
> > > > > >> > > kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
> > > > > >> > >   at kafka.producer.async.DefaultEventHandler.handle(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
> > > > > >> > >   at kafka.producer.async.ProducerSendThread.tryToHandle(Unknown Source) [kafka_2.10-0.8.0.jar:0.8.0]
> > > > > >> > >   at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Unknown Source) [kafka_2.10-0.8.0.jar:0.8.0]
> > > > > >> > >   at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Unknown Source) [kafka_2.10-0.8.0.jar:0.8.0]
> > > > > >> > >   at scala.collection.immutable.Stream.foreach(Stream.scala:547) [scala-library-2.10.1.jar:na]
> > > > > >> > >   at kafka.producer.async.ProducerSendThread.processEvents(Unknown Source) [kafka_2.10-0.8.0.jar:0.8.0]
> > > > > >> > >   at kafka.producer.async.ProducerSendThread.run(Unknown Source) [kafka_2.10-0.8.0.jar:0.8.0]
> > > > > >> > >
> > > > > >> > > 2014-10-30T01:54:46.025Z ERROR [ProducerSendThread- ] [k.producer.async.DefaultEventHandler] {}: Failed to collate messages by topic, partition due to: null
> > > > > >> > >
> > > > > >> > > 2014-10-30T01:54:46.174Z ERROR [ProducerSendThread- ] [k.producer.async.DefaultEventHandler] {}: Failed to collate messages by topic, partition due to: null
> > > > > >> > >
> > > > > >> > > 2014-10-30T01:54:46.356Z ERROR [ProducerSendThread- ] [k.producer.async.DefaultEventHandler] {}: Failed to collate messages by topic, partition due to: null
> > > > > >> > >
> > > > > >> > > 2014-10-30T01:54:46.644Z ERROR [ProducerSendThread- ] [k.producer.async.DefaultEventHandler] {}: Failed to collate messages by topic, partition due to: null
> > > > > >> > >
> > > > > >> > > On Wed, Oct 29, 2014 at 6:57 PM, Jun Rao <jun...@gmail.com> wrote:
> > > > > >> > >
> > > > > >> > >> The log before that will show you the cause of the error.
> > > > > >> > >> Could you dig that out?
> > > > > >> > >>
> > > > > >> > >> Thanks,
> > > > > >> > >>
> > > > > >> > >> Jun
> > > > > >> > >>
> > > > > >> > >> On Wed, Oct 29, 2014 at 6:43 PM, Rajiv Kurian <ra...@signalfuse.com>
> > > > > >> > >> wrote:
> > > > > >> > >>
> > > > > >> > >> > I keep seeing these errors in my code that is just trying
> > > > > >> > >> > to send some data using an AsyncProducer:
> > > > > >> > >> >
> > > > > >> > >> > kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
> > > > > >> > >> >   at kafka.producer.async.DefaultEventHandler.handle(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
> > > > > >> > >> >   at kafka.producer.async.ProducerSendThread.tryToHandle(Unknown Source) [kafka_2.10-0.8.0.jar:0.8.0]
> > > > > >> > >> >   at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Unknown Source) [kafka_2.10-0.8.0.jar:0.8.0]
> > > > > >> > >> >   at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Unknown Source) [kafka_2.10-0.8.0.jar:0.8.0]
> > > > > >> > >> >   at scala.collection.immutable.Stream.foreach(Stream.scala:547) [scala-library-2.10.1.jar:na]
> > > > > >> > >> >   at kafka.producer.async.ProducerSendThread.processEvents(Unknown Source) [kafka_2.10-0.8.0.jar:0.8.0]
> > > > > >> > >> >   at kafka.producer.async.ProducerSendThread.run(Unknown Source) [kafka_2.10-0.8.0.jar:0.8.0]
> > > > > >> > >> >
> > > > > >> > >> > 2014-10-30T01:40:45.176Z ERROR [ProducerSendThread- ] [k.producer.async.DefaultEventHandler] {}: Failed to collate messages by topic, partition due to: null
> > > > > >> > >> >
> > > > > >> > >> > 2014-10-30T01:40:45.506Z ERROR [ProducerSendThread- ] [k.producer.async.DefaultEventHandler] {}: Failed to collate messages by topic, partition due to: null
> > > > > >> > >> >
> > > > > >> > >> > 2014-10-30T01:40:45.647Z ERROR [ProducerSendThread- ] [k.producer.async.DefaultEventHandler] {}: Failed to collate messages by topic, partition due to: null
> > > > > >> > >> >
> > > > > >> > >> > 2014-10-30T01:40:45.772Z ERROR [ProducerSendThread- ] [k.producer.async.DefaultEventHandler] {}: Failed to collate messages by topic, partition due to: null
> > > > > >> > >> >
> > > > > >> > >> > 2014-10-30T01:40:45.890Z ERROR [ProducerSendThread- ] [k.producer.async.DefaultEventHandler] {}: Failed to send requests for topics myTopic with correlation ids in [169,176]
> > > > > >> > >> >
> > > > > >> > >> > 2014-10-30T01:40:45.892Z ERROR [ProducerSendThread- ] [k.producer.async.ProducerSendThread ] {}: Error in handling batch of 29 events
> > > > > >> > >> >
> > > > > >> > >> > I created the topic before starting using bin/kafka-topics.sh.
> > > > > >> > >> > I checked zookeeper and seems like the topic was indeed
> > > > > >> > >> > created. Any ideas?