Thanks. This is a small fix to ConsoleProducer.scala only, so I will use the 0.8 branch.
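Roughly, the change I have in mind is just to expose the existing producer retry configs as console producer options. A minimal, untested sketch is below; the option names, the standalone object, and the placeholder broker list are only illustrative, and the actual patch would wire these into the full set of options that ConsoleProducer.scala already parses:

import java.util.Properties
import joptsimple.OptionParser
import kafka.producer.ProducerConfig

// Standalone sketch of the two new options; in the real patch they would sit
// alongside the existing ConsoleProducer options rather than in a separate object.
object ConsoleProducerRetrySketch {
  def main(args: Array[String]) {
    val parser = new OptionParser
    val messageSendMaxRetriesOpt = parser.accepts("message-send-max-retries",
        "Number of times the producer retries a failed send before giving up.")
      .withRequiredArg
      .describedAs("retry count")
      .ofType(classOf[java.lang.Integer])
      .defaultsTo(3)
    val retryBackoffMsOpt = parser.accepts("retry-backoff-ms",
        "Backoff before each retry (also the metadata refresh interval), in milliseconds.")
      .withRequiredArg
      .describedAs("ms")
      .ofType(classOf[java.lang.Integer])
      .defaultsTo(100)

    val options = parser.parse(args: _*)

    // Pass the parsed values straight through to the underlying producer config.
    val props = new Properties
    props.put("metadata.broker.list", "localhost:9092") // placeholder; the real tool takes --broker-list
    props.put("message.send.max.retries", options.valueOf(messageSendMaxRetriesOpt).toString)
    props.put("retry.backoff.ms", options.valueOf(retryBackoffMsOpt).toString)
    val config = new ProducerConfig(props)
    println("Producer retry settings: " + props)
  }
}

With something like that in place, the console producer could be started with, say, --message-send-max-retries 10 --retry-backoff-ms 200 (again, placeholder names) instead of recompiling with a different hard-coded default.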
Thanks,
Raja.

On Wed, Aug 28, 2013 at 12:49 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

> Rajasekar,
>
> We are trying to minimize the number of patches in 0.8 to critical bug
> fixes or broken tooling. If the patch involves significant code changes, we
> would encourage taking it on trunk. If you want to just fix the console
> producer to take the retry argument, I would think it is small enough to
> consider taking it on the 0.8 branch, since it affects the usability of the
> console producer.
>
> Thanks,
> Neha
>
> On Wed, Aug 28, 2013 at 8:36 AM, Rajasekar Elango <rela...@salesforce.com> wrote:
>
> > Guozhang,
> >
> > The documentation says I need to work off of trunk. Can you confirm whether
> > I should be working on trunk or on a different branch?
> >
> > Thanks,
> > Raja.
> >
> > On Tue, Aug 27, 2013 at 8:33 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > Cool! You can follow the process for creating a JIRA here:
> > >
> > > http://kafka.apache.org/contributing.html
> > >
> > > And submit the patch as described here:
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/Git+Workflow
> > >
> > > It would be great if you could also add an entry for this issue to the FAQ,
> > > since I think this is a common question:
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/FAQ
> > >
> > > Guozhang
> > >
> > > On Tue, Aug 27, 2013 at 2:38 PM, Rajasekar Elango <rela...@salesforce.com> wrote:
> > >
> > > > Thanks Guozhang, changing the max retries to 5 worked. Since I am changing
> > > > the console producer code anyway, I can also submit a patch adding both
> > > > message.send.max.retries and retry.backoff.ms to the console producer.
> > > > Can you let me know the process for submitting a patch?
> > > >
> > > > Thanks,
> > > > Raja.
> > > >
> > > > On Tue, Aug 27, 2013 at 4:03 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > >
> > > > > Hello Rajasekar,
> > > > >
> > > > > The "Removing fetcher" log entries are normal when partitions are added,
> > > > > since they indicate that some leader changes have happened and the brokers
> > > > > are closing their fetchers to the old leaders.
> > > > >
> > > > > I just realized that the console producer does not have the
> > > > > message.send.max.retries option yet. Could you file a JIRA for this, and I
> > > > > will follow up to add this option? For now you can hard-code the default
> > > > > value from 3 to a larger number.
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Tue, Aug 27, 2013 at 12:37 PM, Rajasekar Elango <rela...@salesforce.com> wrote:
> > > > >
> > > > > > Thanks Neha & Guozhang,
> > > > > >
> > > > > > When I ran StateChangeLogMerger, I saw this message repeated 16 times
> > > > > > for each partition:
> > > > > >
> > > > > > [2013-08-27 12:30:02,535] INFO [ReplicaFetcherManager on broker 1] Removing
> > > > > > fetcher for partition [test-60,13] (kafka.server.ReplicaFetcherManager)
> > > > > > [2013-08-27 12:30:02,536] INFO [Log Manager on Broker 1] Created log for
> > > > > > partition [test-60,13] in
> > > > > > /home/relango/dev/mandm/kafka/main/target/dist/mandm-kafka/kafka-data.
> > > > > > (kafka.log.LogManager)
> > > > > >
> > > > > > I am also seeing .log and .index files created for this topic in the data
> > > > > > dir, and the list-topic command shows leaders, replicas and ISRs for all
> > > > > > partitions. Do you still think increasing the number of retries would help,
> > > > > > or is it some other issue? Also, the console producer doesn't seem to have
> > > > > > an option to set the number of retries. Is there a way to configure the
> > > > > > number of retries for the console producer?
> > > > > >
> > > > > > Thanks,
> > > > > > Raja.
> > > > > >
> > > > > > On Tue, Aug 27, 2013 at 12:52 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
> > > > > >
> > > > > > > As Guozhang said, your producer might give up sooner than the leader
> > > > > > > election completes for the new topic. To confirm whether your producer
> > > > > > > gave up too soon, you can run the state change log merge tool for this
> > > > > > > topic and see when the leader election finished for all partitions:
> > > > > > >
> > > > > > > ./bin/kafka-run-class.sh kafka.tools.StateChangeLogMerger --logs <location
> > > > > > > of all state change logs> --topic <topic>
> > > > > > >
> > > > > > > Note that this tool requires you to give it the state change logs for
> > > > > > > all brokers in the cluster.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Neha
> > > > > > >
> > > > > > > On Tue, Aug 27, 2013 at 9:45 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hello Rajasekar,
> > > > > > > >
> > > > > > > > In 0.8, producers keep a cache of the partition -> leader_broker_id map,
> > > > > > > > which is used to determine which brokers the messages should be sent to.
> > > > > > > > After new partitions are added, this cache on the producer has not been
> > > > > > > > populated yet, hence it will throw this exception. The producer will then
> > > > > > > > try to refresh its cache by asking the brokers "who are the leaders of
> > > > > > > > these new partitions that I did not know about before". At the beginning
> > > > > > > > the brokers do not have this information either; they only get it from
> > > > > > > > the controller, which propagates the leader information only after the
> > > > > > > > leader elections have all finished.
> > > > > > > >
> > > > > > > > If the number of retries is set to 3, it is possible that the producer
> > > > > > > > gives up too soon, before the leader info has propagated to the brokers
> > > > > > > > and hence to the producers. Could you try increasing producer.num.retries
> > > > > > > > and see if the producer eventually succeeds on retry?
> > > > > > > >
> > > > > > > > Guozhang
> > > > > > > >
> > > > > > > > On Tue, Aug 27, 2013 at 8:53 AM, Rajasekar Elango <rela...@salesforce.com> wrote:
> > > > > > > >
> > > > > > > > > Hello everyone,
> > > > > > > > >
> > > > > > > > > We recently increased the number of partitions from 4 to 16, and after
> > > > > > > > > that the console producer mostly fails with LeaderNotAvailableException
> > > > > > > > > and exits after 3 tries.
> > > > > > > > >
> > > > > > > > > Here are the last few lines of the console producer log:
> > > > > > > > >
> > > > > > > > > No partition metadata for topic test-41 due to
> > > > > > > > > kafka.common.LeaderNotAvailableException}] for topic [test-41]: class
> > > > > > > > > kafka.common.LeaderNotAvailableException
> > > > > > > > > (kafka.producer.BrokerPartitionInfo)
> > > > > > > > > [2013-08-27 08:29:30,271] ERROR Failed to collate messages by topic,
> > > > > > > > > partition due to: Failed to fetch topic metadata for topic: test-41
> > > > > > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > > > > > [2013-08-27 08:29:30,271] INFO Back off for 100 ms before retrying send.
> > > > > > > > > Remaining retries = 0 (kafka.producer.async.DefaultEventHandler)
> > > > > > > > > [2013-08-27 08:29:30,372] INFO Secure sockets for data transfer is enabled
> > > > > > > > > (kafka.producer.SyncProducerConfig)
> > > > > > > > > [2013-08-27 08:29:30,372] INFO Fetching metadata from broker
> > > > > > > > > id:0,host:localhost,port:6667,secure:true with correlation id 8 for 1
> > > > > > > > > topic(s) Set(test-41) (kafka.client.ClientUtils$)
> > > > > > > > > [2013-08-27 08:29:30,373] INFO begin ssl handshake for localhost/
> > > > > > > > > 127.0.0.1:6667//127.0.0.1:36640 (kafka.security.SSLSocketChannel)
> > > > > > > > > [2013-08-27 08:29:30,375] INFO finished ssl handshake for localhost/
> > > > > > > > > 127.0.0.1:6667//127.0.0.1:36640 (kafka.security.SSLSocketChannel)
> > > > > > > > > [2013-08-27 08:29:30,375] INFO Connected to localhost:6667:true for
> > > > > > > > > producing (kafka.producer.SyncProducer)
> > > > > > > > > [2013-08-27 08:29:30,380] INFO Disconnecting from localhost:6667:true
> > > > > > > > > (kafka.producer.SyncProducer)
> > > > > > > > > [2013-08-27 08:29:30,381] INFO Secure sockets for data transfer is enabled
> > > > > > > > > (kafka.producer.SyncProducerConfig)
> > > > > > > > > [2013-08-27 08:29:30,381] ERROR Failed to send requests for topics test-41
> > > > > > > > > with correlation ids in [0,8] (kafka.producer.async.DefaultEventHandler)
> > > > > > > > > kafka.common.FailedToSendMessageException: Failed to send messages after 3
> > > > > > > > > tries.
> > > > > > > > >     at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
> > > > > > > > >     at kafka.producer.Producer.send(Producer.scala:74)
> > > > > > > > >     at kafka.producer.ConsoleProducer$.main(ConsoleProducer.scala:168)
> > > > > > > > >     at kafka.producer.ConsoleProducer.main(ConsoleProducer.scala)
> > > > > > > > > [2013-08-27 08:29:30,383] INFO Shutting down producer
> > > > > > > > > (kafka.producer.Producer)
> > > > > > > > > [2013-08-27 08:29:30,384] INFO Closing all sync producers
> > > > > > > > > (kafka.producer.ProducerPool)
> > > > > > > > >
> > > > > > > > > Also, this happens only for new topics (we have auto.create.topic set to
> > > > > > > > > true); if I retry sending messages to an existing topic, it works fine.
> > > > > > > > > Is there any tweaking I need to do on the broker or on the producer to
> > > > > > > > > scale with the number of partitions?
> > > > > > > > >
> > > > > > > > > --
> > > > > > > > > Thanks in advance for help,
> > > > > > > > > Raja.