Thanks. This is a small fix to ConsoleProducer.scala only, so I will use the
0.8 branch.

Thanks,
Raja.


On Wed, Aug 28, 2013 at 12:49 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

> Rajasekar,
>
> We are trying to minimize the number of patches in 0.8 to critical bug
> fixes or broken tooling. If the patch involves significant code changes, we
> would encourage taking it on trunk. If you want to just fix the console
> producer to take the retry argument, I would think it is small enough to
> consider taking it on 0.8 branch since it affects the usability of the
> console producer.
>
> Thanks,
> Neha
>
>
> On Wed, Aug 28, 2013 at 8:36 AM, Rajasekar Elango <rela...@salesforce.com> wrote:
>
> > Guozhang,
> >
> > The documentation says I need to work off of trunk. Can you confirm whether I
> > should be working on trunk or on a different branch?
> >
> > Thanks,
> > Raja.
> >
> >
> > On Tue, Aug 27, 2013 at 8:33 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > Cool! You can follow the process of creating a JIRA here:
> > >
> > > http://kafka.apache.org/contributing.html
> > >
> > > And submit the patch here:
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/Git+Workflow
> > >
> > > It would be great if you could also add an entry for this issue to the FAQ,
> > > since I think this is a common question:
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/FAQ
> > >
> > > Guozhang
> > >
> > >
> > > On Tue, Aug 27, 2013 at 2:38 PM, Rajasekar Elango <rela...@salesforce.com> wrote:
> > >
> > > > Thanks Guozhang, changing max retries to 5 worked. Since I am changing the
> > > > console producer code, I can also submit a patch adding both
> > > > message.send.max.retries and retry.backoff.ms to the console producer.
> > > > Can you let me know the process for submitting a patch?
> > > >
> > > > Thanks,
> > > > Raja.
> > > >
> > > >
> > > > On Tue, Aug 27, 2013 at 4:03 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > >
> > > > > Hello Rajasekar,
> > > > >
> > > > > The "Removing fetcher" log entries are normal when partitions are added,
> > > > > since they indicate that some leader changes have happened, so brokers are
> > > > > closing the fetchers to the old leaders.
> > > > >
> > > > > I just realized that the console producer does not have the
> > > > > message.send.max.retries option yet. Could you file a JIRA for this, and I
> > > > > will follow up to add this option? For now, you can manually change the
> > > > > default value in the code from 3 to a larger number.
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > > On Tue, Aug 27, 2013 at 12:37 PM, Rajasekar Elango <rela...@salesforce.com> wrote:
> > > > >
> > > > > > Thanks Neha & Guozhang,
> > > > > >
> > > > > > When I ran StateChangeLogMerger, I am seeing this message repeated 16 times
> > > > > > for each partition:
> > > > > >
> > > > > > [2013-08-27 12:30:02,535] INFO [ReplicaFetcherManager on broker 1] Removing fetcher for partition [test-60,13] (kafka.server.ReplicaFetcherManager)
> > > > > > [2013-08-27 12:30:02,536] INFO [Log Manager on Broker 1] Created log for partition [test-60,13] in /home/relango/dev/mandm/kafka/main/target/dist/mandm-kafka/kafka-data. (kafka.log.LogManager)
> > > > > >
> > > > > > I am also seeing .log and .index files created for this topic in the data dir.
> > > > > > Also, the list topic command shows leaders, replicas and ISRs for all
> > > > > > partitions. Do you still think increasing the number of retries would help,
> > > > > > or is it some other issue? Also, the console producer doesn't seem to have an
> > > > > > option to set the number of retries. Is there a way to configure the number
> > > > > > of retries for the console producer?
> > > > > >
> > > > > > Thanks,
> > > > > > Raja.
> > > > > >
> > > > > >
> > > > > > On Tue, Aug 27, 2013 at 12:52 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
> > > > > >
> > > > > > > As Guozhang said, your producer might give up before the leader
> > > > > > > election completes for the new topic. To confirm whether your producer gave
> > > > > > > up too soon, you can run the state change log merge tool for this topic and
> > > > > > > see when the leader election finished for all partitions:
> > > > > > >
> > > > > > > ./bin/kafka-run-class.sh kafka.tools.StateChangeLogMerger --logs <location to all state change logs> --topic <topic>
> > > > > > >
> > > > > > > Note that this tool requires you to give the state change logs for all
> > > > > > > brokers in the cluster.
> > > > > > >
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Neha
> > > > > > >
> > > > > > >
> > > > > > > On Tue, Aug 27, 2013 at 9:45 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hello Rajasekar,
> > > > > > > >
> > > > > > > > In 0.8, producers keep a cache of the partition -> leader_broker_id map,
> > > > > > > > which is used to determine which brokers the messages should be sent to.
> > > > > > > > After new partitions are added, the cache on the producer has not been
> > > > > > > > populated yet, hence it will throw this exception. The producer will then
> > > > > > > > try to refresh its cache by asking the brokers "who are the leaders of these
> > > > > > > > new partitions that I did not know of before". The brokers at the beginning
> > > > > > > > also do not know this information, and will only get it from the
> > > > > > > > controller, which will only propagate the leader information after the
> > > > > > > > leader elections have all finished.
> > > > > > > >
> > > > > > > > If you set num.retries to 3, then it is possible that the producer gives up
> > > > > > > > too soon, before the leader info is ever propagated to the brokers, and hence
> > > > > > > > to the producers. Could you try to increase producer.num.retries and see if
> > > > > > > > the producer can eventually succeed in retrying?
> > > > > > > >
> > > > > > > > Guozhang
> > > > > > > >
> > > > > > > >
> > > > > > > > On Tue, Aug 27, 2013 at 8:53 AM, Rajasekar Elango <rela...@salesforce.com> wrote:
> > > > > > > >
> > > > > > > > > Hello everyone,
> > > > > > > > >
> > > > > > > > > We recently increased the number of partitions from 4 to 16, and after that
> > > > > > > > > the console producer mostly fails with LeaderNotAvailableException and exits
> > > > > > > > > after 3 tries.
> > > > > > > > >
> > > > > > > > > Here are the last few lines of the console producer log:
> > > > > > > > >
> > > > > > > > > No partition metadata for topic test-41 due to kafka.common.LeaderNotAvailableException}] for topic [test-41]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
> > > > > > > > > [2013-08-27 08:29:30,271] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test-41 (kafka.producer.async.DefaultEventHandler)
> > > > > > > > > [2013-08-27 08:29:30,271] INFO Back off for 100 ms before retrying send. Remaining retries = 0 (kafka.producer.async.DefaultEventHandler)
> > > > > > > > > [2013-08-27 08:29:30,372] INFO Secure sockets for data transfer is enabled (kafka.producer.SyncProducerConfig)
> > > > > > > > > [2013-08-27 08:29:30,372] INFO Fetching metadata from broker id:0,host:localhost,port:6667,secure:true with correlation id 8 for 1 topic(s) Set(test-41) (kafka.client.ClientUtils$)
> > > > > > > > > [2013-08-27 08:29:30,373] INFO begin ssl handshake for localhost/127.0.0.1:6667//127.0.0.1:36640(kafka.security.SSLSocketChannel)
> > > > > > > > > [2013-08-27 08:29:30,375] INFO finished ssl handshake for localhost/127.0.0.1:6667//127.0.0.1:36640(kafka.security.SSLSocketChannel)
> > > > > > > > > [2013-08-27 08:29:30,375] INFO Connected to localhost:6667:true for producing (kafka.producer.SyncProducer)
> > > > > > > > > [2013-08-27 08:29:30,380] INFO Disconnecting from localhost:6667:true (kafka.producer.SyncProducer)
> > > > > > > > > [2013-08-27 08:29:30,381] INFO Secure sockets for data transfer is enabled (kafka.producer.SyncProducerConfig)
> > > > > > > > > [2013-08-27 08:29:30,381] ERROR Failed to send requests for topics test-41 with correlation ids in [0,8] (kafka.producer.async.DefaultEventHandler)
> > > > > > > > > kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
> > > > > > > > >         at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
> > > > > > > > >         at kafka.producer.Producer.send(Producer.scala:74)
> > > > > > > > >         at kafka.producer.ConsoleProducer$.main(ConsoleProducer.scala:168)
> > > > > > > > >         at kafka.producer.ConsoleProducer.main(ConsoleProducer.scala)
> > > > > > > > > [2013-08-27 08:29:30,383] INFO Shutting down producer (kafka.producer.Producer)
> > > > > > > > > [2013-08-27 08:29:30,384] INFO Closing all sync producers (kafka.producer.ProducerPool)
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Also, this happens only for new topics (we have auto.create.topic set to
> > > > > > > > > true). If I retry sending a message to an existing topic, it works fine.
> > > > > > > > > Is there any tweaking I need to do to the broker or to the producer to scale
> > > > > > > > > based on the number of partitions?
> > > > > > > > >
> > > > > > > > > --
> > > > > > > > > Thanks in advance for help,
> > > > > > > > > Raja.
> > > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > --
> > > > > > > > -- Guozhang
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > Thanks,
> > > > > > Raja.
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > Thanks,
> > > > Raja.
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
> >
> >
> > --
> > Thanks,
> > Raja.
> >
>



-- 
Thanks,
Raja.
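
For anyone hitting the same LeaderNotAvailableException, here is a rough
sketch of the arithmetic behind the fix discussed in this thread. It only
uses java.util.Properties and the two property names mentioned above
(message.send.max.retries and retry.backoff.ms). The class and method names
are hypothetical and not part of Kafka, and the budget is only an
approximation: it ignores the time spent in the requests themselves, and the
exact number of attempts depends on the producer version.

```java
import java.util.Properties;

// Hypothetical helper for illustration only; not part of Kafka.
class RetryBudgetSketch {

    // Builds a Properties object holding the two retry settings named in this thread.
    static Properties producerProps(int maxRetries, long backoffMs) {
        Properties props = new Properties();
        props.setProperty("message.send.max.retries", Integer.toString(maxRetries));
        props.setProperty("retry.backoff.ms", Long.toString(backoffMs));
        return props;
    }

    // Approximate total time spent backing off before the producer gives up,
    // assuming one backoff per retry (an assumption, see the note above).
    static long backoffBudgetMs(Properties props) {
        int retries = Integer.parseInt(props.getProperty("message.send.max.retries"));
        long backoffMs = Long.parseLong(props.getProperty("retry.backoff.ms"));
        return retries * backoffMs;
    }

    public static void main(String[] args) {
        // Values seen in the log above: 3 tries with a 100 ms backoff.
        System.out.println(backoffBudgetMs(producerProps(3, 100)));
        // Value that worked after the change: 5 retries with the same backoff.
        System.out.println(backoffBudgetMs(producerProps(5, 100)));
    }
}
```

With the values from the log, the producer spends only about 300 ms backing
off, so leader election for 16 new partitions has to finish and propagate
within roughly that window. Raising the retry count to 5 widens it to about
500 ms, and raising retry.backoff.ms widens it further.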
