We pass in all the producer properties when instantiating the partitioner.

Thanks,

Jun

On Thu, Oct 30, 2014 at 5:57 PM, Rajiv Kurian <ra...@signalfuse.com> wrote:

> Hmm I am not sure how to do that. Do I have access to the
> VerifiableProperties object that is passed to the partitioner object?
>
> On Thu, Oct 30, 2014 at 5:54 PM, Jun Rao <jun...@gmail.com> wrote:
>
> > Potentially, you can pass in state related info through the properties
> and
> > use those to instantiate MyDeciderThingy.
> >
> > Thanks,
> >
> > Jun
> >
> > On Thu, Oct 30, 2014 at 11:46 AM, Rajiv Kurian <ra...@signalfuse.com>
> > wrote:
> >
> > > Yes I am using the old producer. When I use the producer I do something
> > > like this:
> > >
> > > Properties config = new Properties();
> > >
> > > // put other stuff in the config.
> > >
> > > // Put the partitioner class that kafka will instantiate.
> > >
> > > config.put("partitioner.class", partitionerClass.getName());
> > >
> > > return new Producer<K, M>(new ProducerConfig(config));
> > >
> > > So I assume that the kafka code uses reflection to instantiate the
> > > partitioner object. My partitioner object looks like this:
> > >
> > > public class MyPartitioner implements Partitioner<MyStuff> {
> > >
> > >   MyDeciderThingy thingy;
> > >
> > >     public RawBusIdPartitioner(VerifiableProperties props) {}
> > >
> > >
> > >     @Override
> > >
> > >     public int partition(MyStuff stuff, int numPartitions) {
> > >
> > >         return thingy.decide(stuff);
> > >
> > >     }
> > >
> > > }
> > >
> > > The problem is I  don't know how to pass a MyDeciderThingy to my
> > > Partitioner object given Kafka instantiates it.
> > >
> > > Thanks!
> > >
> > > On Thu, Oct 30, 2014 at 11:34 AM, Joel Koshy <jjkosh...@gmail.com>
> > wrote:
> > >
> > > > Not sure I follow - you just need to extend the Partitioner trait.
> You
> > > > don't _have_ to use that specific constructor.
> > > >
> > > > It is slightly different with the new producer, but looks like you
> are
> > > > on the old producer.
> > > >
> > > > On Thu, Oct 30, 2014 at 11:16:28AM -0700, Rajiv Kurian wrote:
> > > > > Actually I figured out what the problem was. My producer was using
> a
> > > > > partitioner which was causing a null pointer exception. This
> actually
> > > > > raises another question for me. I want some state in my partitioner
> > and
> > > > the
> > > > > only constructor that Kafka seems to use is this one:
> > > > >
> > > > > public MyPartitioner(VerifiableProperties props) {}
> > > > >
> > > > > How do I inject an object here that I can use to decide what
> > partition
> > > my
> > > > > messages should go to? As an ugly hack I am using a static variable
> > > from
> > > > > somewhere else.
> > > > >
> > > > > On Thu, Oct 30, 2014 at 10:25 AM, Rajiv Kurian <
> ra...@signalfuse.com
> > >
> > > > wrote:
> > > > >
> > > > > > Yes I see a ton of WARN messages on the broker logs of this form:
> > > > > >
> > > > > > 2014-10-30T17:21:54.281Z WARN  [kafka-request-handler-6
> > ]
> > > > > > [state.change.logger                 ]: Broker 0 received invalid
> > > > > > LeaderAndIsr request with correlation id 158 from controller 0
> > epoch
> > > > 29083
> > > > > > with an older leader epoch 5 for partition [myTopic, 56], current
> > > > leader
> > > > > > epoch is 5
> > > > > >
> > > > > > 2014-10-30T17:21:54.282Z WARN  [kafka-request-handler-6
> > ]
> > > > > > [state.change.logger                 ]: Broker 0 received invalid
> > > > > > LeaderAndIsr request with correlation id 158 from controller 0
> > epoch
> > > > 29083
> > > > > > with an older leader epoch 5 for partition myTopic,385], current
> > > leader
> > > > > > epoch is 5
> > > > > >
> > > > > > 2014-10-30T17:21:54.283Z WARN  [kafka-request-handler-6
> > ]
> > > > > > [state.change.logger                 ]: Broker 0 received invalid
> > > > > > LeaderAndIsr request with correlation id 158 from controller 0
> > epoch
> > > > 29083
> > > > > > with an older leader epoch 5 for partition [myTopic,684], current
> > > > leader
> > > > > > epoch is 5
> > > > > >
> > > > > > 2014-10-30T17:21:54.283Z WARN  [kafka-request-handler-6
> > ]
> > > > > > [state.change.logger                 ]: Broker 0 received invalid
> > > > > > LeaderAndIsr request with correlation id 158 from controller 0
> > epoch
> > > > 29083
> > > > > > with an older leader epoch 5 for partition [myTopic,1002],
> current
> > > > leader
> > > > > > epoch is
> > > > > >
> > > > > > On Thu, Oct 30, 2014 at 10:03 AM, Joel Koshy <
> jjkosh...@gmail.com>
> > > > wrote:
> > > > > >
> > > > > >> Do you see any errors on the broker logs? Can you check the
> > broker's
> > > > > >> public access logs and see if there are topic metadata requests
> > > coming
> > > > > >> in from the producer?
> > > > > >>
> > > > > >> On Wed, Oct 29, 2014 at 07:15:15PM -0700, Rajiv Kurian wrote:
> > > > > >> > I don't see anything else that is relevant. I traced the first
> > of
> > > > these
> > > > > >> > error messages to figure out the ordering. It actually goes
> > > > something
> > > > > >> like
> > > > > >> > this:
> > > > > >> >
> > > > > >> > 2014-10-30T01:51:32.400Z ERROR [ProducerSendThread-
> > >   ]
> > > > > >> > [k.producer.async.DefaultEventHandler] {}: Failed to collate
> > > > messages by
> > > > > >> > topic, partition due to: null
> > > > > >> >
> > > > > >> > 2014-10-30T01:51:34.082Z ERROR [ProducerSendThread-
> > >   ]
> > > > > >> > [k.producer.async.DefaultEventHandler] {}: Failed to collate
> > > > messages by
> > > > > >> > topic, partition due to: null
> > > > > >> >
> > > > > >> > 2014-10-30T01:51:34.422Z ERROR [ProducerSendThread-
> > >   ]
> > > > > >> > [k.producer.async.DefaultEventHandler] {}: Failed to collate
> > > > messages by
> > > > > >> > topic, partition due to: null
> > > > > >> >
> > > > > >> > 2014-10-30T01:51:34.664Z ERROR [ProducerSendThread-
> > >   ]
> > > > > >> > [k.producer.async.DefaultEventHandler] {}: Failed to collate
> > > > messages by
> > > > > >> > topic, partition due to: null
> > > > > >> >
> > > > > >> > 2014-10-30T01:51:34.902Z ERROR [ProducerSendThread-
> > >   ]
> > > > > >> > [k.producer.async.DefaultEventHandler] {}: Failed to send
> > requests
> > > > for
> > > > > >> > topics myTopic with correlation ids in [0,8]
> > > > > >> >
> > > > > >> > 2014-10-30T01:51:35.007Z ERROR [ProducerSendThread-
> > >   ]
> > > > > >> > [k.producer.async.ProducerSendThread ] {}: Error in handling
> > batch
> > > > of 1
> > > > > >> > events
> > > > > >> >
> > > > > >> > kafka.common.FailedToSendMessageException: Failed to send
> > messages
> > > > > >> after 3
> > > > > >> > tries.
> > > > > >> >
> > > > > >> > at kafka.producer.async.DefaultEventHandler.handle(Unknown
> > Source)
> > > > > >> > ~[kafka_2.10-0.8.0.jar:0.8.0]
> > > > > >> >
> > > > > >> > at kafka.producer.async.ProducerSendThread.tryToHandle(Unknown
> > > > Source)
> > > > > >> > [kafka_2.10-0.8.0.jar:0.8.0]
> > > > > >> >
> > > > > >> > at
> > > > > >> >
> > > > > >>
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Unknown
> > > > > >> > Source) [kafka_2.10-0.8.0.jar:0.8.0]
> > > > > >> >
> > > > > >> > at
> > > > > >> >
> > > > > >>
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Unknown
> > > > > >> > Source) [kafka_2.10-0.8.0.jar:0.8.0]
> > > > > >> >
> > > > > >> > at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> > > > > >> > [scala-library-2.10.1.jar:na]
> > > > > >> >
> > > > > >> > at
> kafka.producer.async.ProducerSendThread.processEvents(Unknown
> > > > Source)
> > > > > >> > [kafka_2.10-0.8.0.jar:0.8.0]
> > > > > >> >
> > > > > >> > at kafka.producer.async.ProducerSendThread.run(Unknown Source)
> > > > > >> > [kafka_2.10-0.8.0.jar:0.8.0]
> > > > > >> >
> > > > > >> > Thanks!
> > > > > >> >
> > > > > >> > On Wed, Oct 29, 2014 at 7:02 PM, Rajiv Kurian <
> > > ra...@signalfuse.com
> > > > >
> > > > > >> wrote:
> > > > > >> >
> > > > > >> > > This pattern seems to repeat:
> > > > > >> > >
> > > > > >> > > 2014-10-30T01:54:46.004Z ERROR [ProducerSendThread-
> > > >   ]
> > > > > >> > > [k.producer.async.DefaultEventHandler] {}: Failed to send
> > > > requests for
> > > > > >> > > topics myTopic with correlation ids in [1729,1736]
> > > > > >> > >
> > > > > >> > > 2014-10-30T01:54:46.008Z ERROR [ProducerSendThread-
> > > >   ]
> > > > > >> > > [k.producer.async.ProducerSendThread ] {}: Error in handling
> > > > batch of
> > > > > >> 4
> > > > > >> > > events
> > > > > >> > >
> > > > > >> > > kafka.common.FailedToSendMessageException: Failed to send
> > > messages
> > > > > >> after 3
> > > > > >> > > tries.
> > > > > >> > >
> > > > > >> > > at kafka.producer.async.DefaultEventHandler.handle(Unknown
> > > Source)
> > > > > >> > > ~[kafka_2.10-0.8.0.jar:0.8.0]
> > > > > >> > >
> > > > > >> > > at
> kafka.producer.async.ProducerSendThread.tryToHandle(Unknown
> > > > Source)
> > > > > >> > > [kafka_2.10-0.8.0.jar:0.8.0]
> > > > > >> > >
> > > > > >> > > at
> > > > > >> > >
> > > > > >>
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Unknown
> > > > > >> > > Source) [kafka_2.10-0.8.0.jar:0.8.0]
> > > > > >> > >
> > > > > >> > > at
> > > > > >> > >
> > > > > >>
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Unknown
> > > > > >> > > Source) [kafka_2.10-0.8.0.jar:0.8.0]
> > > > > >> > >
> > > > > >> > > at
> scala.collection.immutable.Stream.foreach(Stream.scala:547)
> > > > > >> > > [scala-library-2.10.1.jar:na]
> > > > > >> > >
> > > > > >> > > at
> > kafka.producer.async.ProducerSendThread.processEvents(Unknown
> > > > > >> Source)
> > > > > >> > > [kafka_2.10-0.8.0.jar:0.8.0]
> > > > > >> > >
> > > > > >> > > at kafka.producer.async.ProducerSendThread.run(Unknown
> Source)
> > > > > >> > > [kafka_2.10-0.8.0.jar:0.8.0]
> > > > > >> > >
> > > > > >> > > 2014-10-30T01:54:46.025Z ERROR [ProducerSendThread-
> > > >   ]
> > > > > >> > > [k.producer.async.DefaultEventHandler] {}: Failed to collate
> > > > messages
> > > > > >> by
> > > > > >> > > topic, partition due to: null
> > > > > >> > >
> > > > > >> > > 2014-10-30T01:54:46.174Z ERROR [ProducerSendThread-
> > > >   ]
> > > > > >> > > [k.producer.async.DefaultEventHandler] {}: Failed to collate
> > > > messages
> > > > > >> by
> > > > > >> > > topic, partition due to: null
> > > > > >> > >
> > > > > >> > > 2014-10-30T01:54:46.356Z ERROR [ProducerSendThread-
> > > >   ]
> > > > > >> > > [k.producer.async.DefaultEventHandler] {}: Failed to collate
> > > > messages
> > > > > >> by
> > > > > >> > > topic, partition due to: null
> > > > > >> > >
> > > > > >> > > 2014-10-30T01:54:46.644Z ERROR [ProducerSendThread-
> > > >   ]
> > > > > >> > > [k.producer.async.DefaultEventHandler] {}: Failed to collate
> > > > messages
> > > > > >> by
> > > > > >> > > topic, partition due to: null
> > > > > >> > >
> > > > > >> > > On Wed, Oct 29, 2014 at 6:57 PM, Jun Rao <jun...@gmail.com>
> > > > wrote:
> > > > > >> > >
> > > > > >> > >> The log before that will show you the cause of the error.
> > Could
> > > > you
> > > > > >> dig
> > > > > >> > >> that out?
> > > > > >> > >>
> > > > > >> > >> Thanks,
> > > > > >> > >>
> > > > > >> > >> Jun
> > > > > >> > >>
> > > > > >> > >> On Wed, Oct 29, 2014 at 6:43 PM, Rajiv Kurian <
> > > > ra...@signalfuse.com>
> > > > > >> > >> wrote:
> > > > > >> > >>
> > > > > >> > >> > I keep seeing these errors in my code that is just trying
> > to
> > > > send
> > > > > >> some
> > > > > >> > >> data
> > > > > >> > >> > using an AsyncProducer:
> > > > > >> > >> >
> > > > > >> > >> > kafka.common.FailedToSendMessageException: Failed to send
> > > > messages
> > > > > >> > >> after 3
> > > > > >> > >> > tries.
> > > > > >> > >> >
> > > > > >> > >> > at
> kafka.producer.async.DefaultEventHandler.handle(Unknown
> > > > Source)
> > > > > >> > >> > ~[kafka_2.10-0.8.0.jar:0.8.0]
> > > > > >> > >> >
> > > > > >> > >> > at
> > > kafka.producer.async.ProducerSendThread.tryToHandle(Unknown
> > > > > >> Source)
> > > > > >> > >> > [kafka_2.10-0.8.0.jar:0.8.0]
> > > > > >> > >> >
> > > > > >> > >> > at
> > > > > >> > >> >
> > > > > >> > >> >
> > > > > >> > >>
> > > > > >>
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Unknown
> > > > > >> > >> > Source) [kafka_2.10-0.8.0.jar:0.8.0]
> > > > > >> > >> >
> > > > > >> > >> > at
> > > > > >> > >> >
> > > > > >> > >> >
> > > > > >> > >>
> > > > > >>
> > > >
> > >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Unknown
> > > > > >> > >> > Source) [kafka_2.10-0.8.0.jar:0.8.0]
> > > > > >> > >> >
> > > > > >> > >> > at
> > > scala.collection.immutable.Stream.foreach(Stream.scala:547)
> > > > > >> > >> > [scala-library-2.10.1.jar:na]
> > > > > >> > >> >
> > > > > >> > >> > at
> > > > kafka.producer.async.ProducerSendThread.processEvents(Unknown
> > > > > >> Source)
> > > > > >> > >> > [kafka_2.10-0.8.0.jar:0.8.0]
> > > > > >> > >> >
> > > > > >> > >> > at kafka.producer.async.ProducerSendThread.run(Unknown
> > > Source)
> > > > > >> > >> > [kafka_2.10-0.8.0.jar:0.8.0]
> > > > > >> > >> >
> > > > > >> > >> > 2014-10-30T01:40:45.176Z ERROR [ProducerSendThread-
> > > > > >> ]
> > > > > >> > >> > [k.producer.async.DefaultEventHandler] {}: Failed to
> > collate
> > > > > >> messages by
> > > > > >> > >> > topic, partition due to: null
> > > > > >> > >> >
> > > > > >> > >> > 2014-10-30T01:40:45.506Z ERROR [ProducerSendThread-
> > > > > >> ]
> > > > > >> > >> > [k.producer.async.DefaultEventHandler] {}: Failed to
> > collate
> > > > > >> messages by
> > > > > >> > >> > topic, partition due to: null
> > > > > >> > >> >
> > > > > >> > >> > 2014-10-30T01:40:45.647Z ERROR [ProducerSendThread-
> > > > > >> ]
> > > > > >> > >> > [k.producer.async.DefaultEventHandler] {}: Failed to
> > collate
> > > > > >> messages by
> > > > > >> > >> > topic, partition due to: null
> > > > > >> > >> >
> > > > > >> > >> > 2014-10-30T01:40:45.772Z ERROR [ProducerSendThread-
> > > > > >> ]
> > > > > >> > >> > [k.producer.async.DefaultEventHandler] {}: Failed to
> > collate
> > > > > >> messages by
> > > > > >> > >> > topic, partition due to: null
> > > > > >> > >> >
> > > > > >> > >> > 2014-10-30T01:40:45.890Z ERROR [ProducerSendThread-
> > > > > >> ]
> > > > > >> > >> > [k.producer.async.DefaultEventHandler] {}: Failed to send
> > > > requests
> > > > > >> for
> > > > > >> > >> > topics myTopic with correlation ids in [169,176]
> > > > > >> > >> >
> > > > > >> > >> > 2014-10-30T01:40:45.892Z ERROR [ProducerSendThread-
> > > > > >> ]
> > > > > >> > >> > [k.producer.async.ProducerSendThread ] {}: Error in
> > handling
> > > > batch
> > > > > >> of 29
> > > > > >> > >> > events
> > > > > >> > >> >
> > > > > >> > >> >
> > > > > >> > >> > I created the topic before starting using
> > > bin/kafka-topics.sh.
> > > > I
> > > > > >> checked
> > > > > >> > >> > zookeeper and seems like the topic was indeed created.
> Any
> > > > ideas?
> > > > > >> > >> >
> > > > > >> > >>
> > > > > >> > >
> > > > > >> > >
> > > > > >>
> > > >
> > >
> >
>

Reply via email to