Not sure I follow - you just need to extend the Partitioner trait. You
don't _have_ to use that specific constructor.

It is slightly different with the new producer, but it looks like you
are on the old producer.
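
One way to avoid the static variable on the old producer may be to pass
the state in through the producer properties, since (as far as I know)
the old producer hands its config to the partitioner's
VerifiableProperties constructor. A minimal sketch, in plain Java so it
compiles without Kafka on the classpath: KeyPartitioner and
java.util.Properties are stand-ins for Kafka's Partitioner and
VerifiableProperties, and the property name my.partitioner.salt is made
up for illustration.

```java
import java.util.Properties;

// Hypothetical stand-in for Kafka's Partitioner trait (assumption, not the real API).
interface KeyPartitioner {
    int partition(Object key, int numPartitions);
}

// Partitioner whose state comes from configuration instead of a static variable.
class SaltedPartitioner implements KeyPartitioner {
    private final int salt;

    // Stand-in for the (VerifiableProperties props) constructor from the thread.
    SaltedPartitioner(Properties props) {
        // "my.partitioner.salt" is an invented property name; it defaults to 0 when absent.
        this.salt = Integer.parseInt(props.getProperty("my.partitioner.salt", "0"));
    }

    @Override
    public int partition(Object key, int numPartitions) {
        // Guard against null keys so the partitioner itself cannot throw an NPE here.
        int hash = (key == null) ? 0 : key.hashCode();
        // floorMod keeps the result in [0, numPartitions) even for negative hashes.
        return Math.floorMod(hash + salt, numPartitions);
    }
}
```

In a real producer the custom property would presumably be set on the
same Properties object that carries partitioner.class, and read in the
constructor through VerifiableProperties rather than java.util.Properties.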

On Thu, Oct 30, 2014 at 11:16:28AM -0700, Rajiv Kurian wrote:
> Actually I figured out what the problem was. My producer was using a
> partitioner which was causing a null pointer exception. This actually
> raises another question for me. I want some state in my partitioner and the
> only constructor that Kafka seems to use is this one:
> 
> public MyPartitioner(VerifiableProperties props) {}
> 
> How do I inject an object here that I can use to decide what partition my
> messages should go to? As an ugly hack I am using a static variable from
> somewhere else.
> 
> On Thu, Oct 30, 2014 at 10:25 AM, Rajiv Kurian <ra...@signalfuse.com> wrote:
> 
> > Yes I see a ton of WARN messages on the broker logs of this form:
> >
> > 2014-10-30T17:21:54.281Z WARN  [kafka-request-handler-6            ]
> > [state.change.logger                 ]: Broker 0 received invalid
> > LeaderAndIsr request with correlation id 158 from controller 0 epoch 29083
> > with an older leader epoch 5 for partition [myTopic, 56], current leader
> > epoch is 5
> >
> > 2014-10-30T17:21:54.282Z WARN  [kafka-request-handler-6            ]
> > [state.change.logger                 ]: Broker 0 received invalid
> > LeaderAndIsr request with correlation id 158 from controller 0 epoch 29083
> > with an older leader epoch 5 for partition [myTopic,385], current leader
> > epoch is 5
> >
> > 2014-10-30T17:21:54.283Z WARN  [kafka-request-handler-6            ]
> > [state.change.logger                 ]: Broker 0 received invalid
> > LeaderAndIsr request with correlation id 158 from controller 0 epoch 29083
> > with an older leader epoch 5 for partition [myTopic,684], current leader
> > epoch is 5
> >
> > 2014-10-30T17:21:54.283Z WARN  [kafka-request-handler-6            ]
> > [state.change.logger                 ]: Broker 0 received invalid
> > LeaderAndIsr request with correlation id 158 from controller 0 epoch 29083
> > with an older leader epoch 5 for partition [myTopic,1002], current leader
> > epoch is
> >
> > On Thu, Oct 30, 2014 at 10:03 AM, Joel Koshy <jjkosh...@gmail.com> wrote:
> >
> >> Do you see any errors on the broker logs? Can you check the broker's
> >> public access logs and see if there are topic metadata requests coming
> >> in from the producer?
> >>
> >> On Wed, Oct 29, 2014 at 07:15:15PM -0700, Rajiv Kurian wrote:
> >> > I don't see anything else that is relevant. I traced the first of these
> >> > error messages to figure out the ordering. It actually goes something
> >> like
> >> > this:
> >> >
> >> > 2014-10-30T01:51:32.400Z ERROR [ProducerSendThread-                ]
> >> > [k.producer.async.DefaultEventHandler] {}: Failed to collate messages by
> >> > topic, partition due to: null
> >> >
> >> > 2014-10-30T01:51:34.082Z ERROR [ProducerSendThread-                ]
> >> > [k.producer.async.DefaultEventHandler] {}: Failed to collate messages by
> >> > topic, partition due to: null
> >> >
> >> > 2014-10-30T01:51:34.422Z ERROR [ProducerSendThread-                ]
> >> > [k.producer.async.DefaultEventHandler] {}: Failed to collate messages by
> >> > topic, partition due to: null
> >> >
> >> > 2014-10-30T01:51:34.664Z ERROR [ProducerSendThread-                ]
> >> > [k.producer.async.DefaultEventHandler] {}: Failed to collate messages by
> >> > topic, partition due to: null
> >> >
> >> > 2014-10-30T01:51:34.902Z ERROR [ProducerSendThread-                ]
> >> > [k.producer.async.DefaultEventHandler] {}: Failed to send requests for
> >> > topics myTopic with correlation ids in [0,8]
> >> >
> >> > 2014-10-30T01:51:35.007Z ERROR [ProducerSendThread-                ]
> >> > [k.producer.async.ProducerSendThread ] {}: Error in handling batch of 1
> >> > events
> >> >
> >> > kafka.common.FailedToSendMessageException: Failed to send messages
> >> after 3
> >> > tries.
> >> >
> >> > at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
> >> > ~[kafka_2.10-0.8.0.jar:0.8.0]
> >> >
> >> > at kafka.producer.async.ProducerSendThread.tryToHandle(Unknown Source)
> >> > [kafka_2.10-0.8.0.jar:0.8.0]
> >> >
> >> > at
> >> >
> >> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Unknown
> >> > Source) [kafka_2.10-0.8.0.jar:0.8.0]
> >> >
> >> > at
> >> >
> >> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Unknown
> >> > Source) [kafka_2.10-0.8.0.jar:0.8.0]
> >> >
> >> > at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> >> > [scala-library-2.10.1.jar:na]
> >> >
> >> > at kafka.producer.async.ProducerSendThread.processEvents(Unknown Source)
> >> > [kafka_2.10-0.8.0.jar:0.8.0]
> >> >
> >> > at kafka.producer.async.ProducerSendThread.run(Unknown Source)
> >> > [kafka_2.10-0.8.0.jar:0.8.0]
> >> >
> >> > Thanks!
> >> >
> >> > On Wed, Oct 29, 2014 at 7:02 PM, Rajiv Kurian <ra...@signalfuse.com>
> >> wrote:
> >> >
> >> > > This pattern seems to repeat:
> >> > >
> >> > > 2014-10-30T01:54:46.004Z ERROR [ProducerSendThread-                ]
> >> > > [k.producer.async.DefaultEventHandler] {}: Failed to send requests for
> >> > > topics myTopic with correlation ids in [1729,1736]
> >> > >
> >> > > 2014-10-30T01:54:46.008Z ERROR [ProducerSendThread-                ]
> >> > > [k.producer.async.ProducerSendThread ] {}: Error in handling batch of
> >> 4
> >> > > events
> >> > >
> >> > > kafka.common.FailedToSendMessageException: Failed to send messages
> >> after 3
> >> > > tries.
> >> > >
> >> > > at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
> >> > > ~[kafka_2.10-0.8.0.jar:0.8.0]
> >> > >
> >> > > at kafka.producer.async.ProducerSendThread.tryToHandle(Unknown Source)
> >> > > [kafka_2.10-0.8.0.jar:0.8.0]
> >> > >
> >> > > at
> >> > >
> >> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Unknown
> >> > > Source) [kafka_2.10-0.8.0.jar:0.8.0]
> >> > >
> >> > > at
> >> > >
> >> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Unknown
> >> > > Source) [kafka_2.10-0.8.0.jar:0.8.0]
> >> > >
> >> > > at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> >> > > [scala-library-2.10.1.jar:na]
> >> > >
> >> > > at kafka.producer.async.ProducerSendThread.processEvents(Unknown
> >> Source)
> >> > > [kafka_2.10-0.8.0.jar:0.8.0]
> >> > >
> >> > > at kafka.producer.async.ProducerSendThread.run(Unknown Source)
> >> > > [kafka_2.10-0.8.0.jar:0.8.0]
> >> > >
> >> > > 2014-10-30T01:54:46.025Z ERROR [ProducerSendThread-                ]
> >> > > [k.producer.async.DefaultEventHandler] {}: Failed to collate messages
> >> by
> >> > > topic, partition due to: null
> >> > >
> >> > > 2014-10-30T01:54:46.174Z ERROR [ProducerSendThread-                ]
> >> > > [k.producer.async.DefaultEventHandler] {}: Failed to collate messages
> >> by
> >> > > topic, partition due to: null
> >> > >
> >> > > 2014-10-30T01:54:46.356Z ERROR [ProducerSendThread-                ]
> >> > > [k.producer.async.DefaultEventHandler] {}: Failed to collate messages
> >> by
> >> > > topic, partition due to: null
> >> > >
> >> > > 2014-10-30T01:54:46.644Z ERROR [ProducerSendThread-                ]
> >> > > [k.producer.async.DefaultEventHandler] {}: Failed to collate messages
> >> by
> >> > > topic, partition due to: null
> >> > >
> >> > > On Wed, Oct 29, 2014 at 6:57 PM, Jun Rao <jun...@gmail.com> wrote:
> >> > >
> >> > >> The log before that will show you the cause of the error. Could you
> >> dig
> >> > >> that out?
> >> > >>
> >> > >> Thanks,
> >> > >>
> >> > >> Jun
> >> > >>
> >> > >> On Wed, Oct 29, 2014 at 6:43 PM, Rajiv Kurian <ra...@signalfuse.com>
> >> > >> wrote:
> >> > >>
> >> > >> > I keep seeing these errors in my code that is just trying to send
> >> some
> >> > >> data
> >> > >> > using an AsyncProducer:
> >> > >> >
> >> > >> > kafka.common.FailedToSendMessageException: Failed to send messages
> >> > >> after 3
> >> > >> > tries.
> >> > >> >
> >> > >> > at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
> >> > >> > ~[kafka_2.10-0.8.0.jar:0.8.0]
> >> > >> >
> >> > >> > at kafka.producer.async.ProducerSendThread.tryToHandle(Unknown
> >> Source)
> >> > >> > [kafka_2.10-0.8.0.jar:0.8.0]
> >> > >> >
> >> > >> > at
> >> > >> >
> >> > >> >
> >> > >>
> >> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Unknown
> >> > >> > Source) [kafka_2.10-0.8.0.jar:0.8.0]
> >> > >> >
> >> > >> > at
> >> > >> >
> >> > >> >
> >> > >>
> >> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(Unknown
> >> > >> > Source) [kafka_2.10-0.8.0.jar:0.8.0]
> >> > >> >
> >> > >> > at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> >> > >> > [scala-library-2.10.1.jar:na]
> >> > >> >
> >> > >> > at kafka.producer.async.ProducerSendThread.processEvents(Unknown
> >> Source)
> >> > >> > [kafka_2.10-0.8.0.jar:0.8.0]
> >> > >> >
> >> > >> > at kafka.producer.async.ProducerSendThread.run(Unknown Source)
> >> > >> > [kafka_2.10-0.8.0.jar:0.8.0]
> >> > >> >
> >> > >> > 2014-10-30T01:40:45.176Z ERROR [ProducerSendThread-
> >> ]
> >> > >> > [k.producer.async.DefaultEventHandler] {}: Failed to collate
> >> messages by
> >> > >> > topic, partition due to: null
> >> > >> >
> >> > >> > 2014-10-30T01:40:45.506Z ERROR [ProducerSendThread-
> >> ]
> >> > >> > [k.producer.async.DefaultEventHandler] {}: Failed to collate
> >> messages by
> >> > >> > topic, partition due to: null
> >> > >> >
> >> > >> > 2014-10-30T01:40:45.647Z ERROR [ProducerSendThread-
> >> ]
> >> > >> > [k.producer.async.DefaultEventHandler] {}: Failed to collate
> >> messages by
> >> > >> > topic, partition due to: null
> >> > >> >
> >> > >> > 2014-10-30T01:40:45.772Z ERROR [ProducerSendThread-
> >> ]
> >> > >> > [k.producer.async.DefaultEventHandler] {}: Failed to collate
> >> messages by
> >> > >> > topic, partition due to: null
> >> > >> >
> >> > >> > 2014-10-30T01:40:45.890Z ERROR [ProducerSendThread-
> >> ]
> >> > >> > [k.producer.async.DefaultEventHandler] {}: Failed to send requests
> >> for
> >> > >> > topics myTopic with correlation ids in [169,176]
> >> > >> >
> >> > >> > 2014-10-30T01:40:45.892Z ERROR [ProducerSendThread-
> >> ]
> >> > >> > [k.producer.async.ProducerSendThread ] {}: Error in handling batch
> >> of 29
> >> > >> > events
> >> > >> >
> >> > >> >
> >> > >> > I created the topic before starting using bin/kafka-topics.sh. I
> >> checked
> >> > >> > zookeeper and seems like the topic was indeed created. Any ideas?
> >> > >> >
> >> > >>
> >> > >
> >> > >
> >>
