Thanks, Guozhang. Following your suggestions, I found that my own patch to Kafka 0.9.0.0 may have caused the problem. In DelayedFetch.scala, IntelliJ's auto-import pulled in org.apache.kafka.common.errors.NotLeaderForPartitionException instead of kafka.common.NotLeaderForPartitionException, so the kafka.common.NotLeaderForPartitionException thrown inside getLeaderReplicaIfLocal is not caught by the catch clause in tryComplete() and propagates all the way up to KafkaApis.handle(). I think that is the cause of the repeated error logs and the other strange behavior.
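In case it helps anyone else, here is a minimal sketch of the mistake (the two exception classes are the real ones; everything else is made up for illustration, it is not the actual DelayedFetch code). The client-side and server-side NotLeaderForPartitionException classes live in different packages and are unrelated types, so a catch clause written against the wrong one lets the thrown exception escape:

import org.apache.kafka.common.errors.NotLeaderForPartitionException // client-side class, what IntelliJ auto-imported

object WrongImportSketch {

  // Stand-in for getLeaderReplicaIfLocal: the broker code throws the *server-side* class.
  def getLeaderReplicaIfLocal(): Unit =
    throw new kafka.common.NotLeaderForPartitionException(
      "Leader not local for partition [some-topic,0] on broker 1")

  def tryCompleteSketch(): Boolean = {
    try {
      getLeaderReplicaIfLocal()
      true
    } catch {
      // Because of the import above, this pattern matches
      // org.apache.kafka.common.errors.NotLeaderForPartitionException, not
      // kafka.common.NotLeaderForPartitionException, so the thrown exception
      // is NOT caught here and propagates up to the request handler instead
      // of forcing completion of the delayed fetch.
      case _: NotLeaderForPartitionException => false
    }
  }
}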
> On Oct 27, 2016, at 7:31 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> Json,
>
> As you mentioned yourself, the NotLeaderForPartitionException thrown from
> getLeaderReplicaIfLocal should be caught in the end, so I'm not sure why
> the reported "ERROR ..." stack trace throwing NotLeaderForPartitionException
> would be seen from tryComplete. I have also checked the source code of both
> 0.9.0.0 and 0.8.2.2, and their line numbers do not match the reported stack
> trace line numbers (e.g. DelayedFetch.scala:72), so I cannot really tell why
> you would ever see the error message instead of the DEBUG-level "Broker is
> no longer the leader of %s, satisfy %s immediately..".
>
> Following the 0.9.0.0 source code: since the NotLeaderForPartitionException
> is caught and forces the delayed fetch request to be completed, with a
> potential error code per partition, it does not prevent the replica's fetch
> response from being returned to the fetching broker, and hence should not
> leave producers / consumers failing for a long time. Similarly, since those
> delayed fetch requests are force-completed, it should not spam repeated
> error log entries: at most one entry (and at DEBUG, not ERROR) should be
> printed for each delayed request whose partition leaders have migrated out.
>
> Guozhang
>
> On Wed, Oct 26, 2016 at 7:46 AM, Json Tu <kafka...@126.com> wrote:
>
>> It makes the cluster unable to provide normal service, which leaves some
>> producers or fetchers failing for a long time until I restart the current
>> broker. The error may come from some earlier fetch operation that contains
>> this partition, which leads to many fetch response errors.
>>
>> The DelayedFetch tryComplete() function is implemented as below:
>>
>> override def tryComplete() : Boolean = {
>>   var accumulatedSize = 0
>>   fetchMetadata.fetchPartitionStatus.foreach {
>>     case (topicAndPartition, fetchStatus) =>
>>       val fetchOffset = fetchStatus.startOffsetMetadata
>>       try {
>>         if (fetchOffset != LogOffsetMetadata.UnknownOffsetMetadata) {
>>           val replica = replicaManager.getLeaderReplicaIfLocal(
>>             topicAndPartition.topic, topicAndPartition.partition)
>>           /* ignore some codes */
>>         }
>>       } catch {
>>         /* ignore some code */
>>         case nle: NotLeaderForPartitionException => // Case A
>>           debug("Broker is no longer the leader of %s, satisfy %s immediately"
>>             .format(topicAndPartition, fetchMetadata))
>>           return forceComplete()
>>       }
>>   }
>>   /* ignore some codes */
>> }
>>
>> When it meets a NotLeaderForPartitionException, it invokes forceComplete(),
>> which in turn invokes onComplete(), implemented as below:
>>
>> override def onComplete() {
>>   val logReadResults = replicaManager.readFromLocalLog(
>>     fetchMetadata.fetchOnlyLeader,
>>     fetchMetadata.fetchOnlyCommitted,
>>     fetchMetadata.fetchPartitionStatus.mapValues(status => status.fetchInfo))
>>
>>   val fetchPartitionData = logReadResults.mapValues(result =>
>>     FetchResponsePartitionData(result.errorCode, result.hw,
>>       result.info.messageSet))
>>
>>   responseCallback(fetchPartitionData)
>> }
>>
>> So I think tryComplete() exits early because of this partition, which means
>> the partitions later in this request may not be fully satisfied and returned
>> to the fetching broker, and that leads some producers and consumers to fail
>> for a long time. I don't know whether this is correct.
>>
>>> On Oct 25, 2016, at 8:32 PM, Json Tu <kafka...@126.com> wrote:
>>>
>>> Hi all,
>>> I use Kafka 0.9.0.0, and we have a cluster with 6 nodes. When I
>>> restart a broker, we
>>> find there are many logs like the following:
>>>
>>> [2016-10-24 15:29:00,914] ERROR [KafkaApi-2141642] error when handling
>>> request Name: FetchRequest; Version: 1; CorrelationId: 4928; ClientId:
>>> ReplicaFetcherThread-0-2141642; ReplicaId: 2141386; MaxWait: 500 ms;
>>> MinBytes: 1 bytes; RequestInfo:
>>> [retail.c.order.logistics,0] -> PartitionFetchInfo(215258,1048576),
>>> [waimai_c_ugc_msg,1] -> PartitionFetchInfo(12426588,1048576),
>>> [waimai_c_ucenter_asyncrelationbind_delay,25] -> PartitionFetchInfo(0,1048576),
>>> [waimai_c_order_databus_wmorder,44] -> PartitionFetchInfo(49555913,1048576),
>>> [__consumer_offsets,23] -> PartitionFetchInfo(11846051,1048576),
>>> [retail.m.sp.sku.update,3] -> PartitionFetchInfo(21563,1048576),
>>> [waimai_c_monitor_orderlogisticsstatus,28] -> PartitionFetchInfo(26926356,1048576),
>>> [waimai_c_ucenter_loadrelation,0] -> PartitionFetchInfo(54583,1048576),
>>> [__consumer_offsets,29] -> PartitionFetchInfo(23479045,1048576),
>>> [waimai_c_order_databus_wmorder,14] -> PartitionFetchInfo(49568225,1048576),
>>> [waimai_c_ordertag_orderdealremark,31] -> PartitionFetchInfo(1829838,1048576),
>>> [retail.d.ris.spider.request,0] -> PartitionFetchInfo(709845,1048576),
>>> [__consumer_offsets,13] -> PartitionFetchInfo(9376691,1048576),
>>> [waimai_c_ugc_msg_staging,2] -> PartitionFetchInfo(38,1048576),
>>> [retail.b.openapi.push.retry.stage,0] -> PartitionFetchInfo(0,1048576),
>>> [waimai_c_ucenter_favoritepoi,15] -> PartitionFetchInfo(390045,1048576),
>>> [retail.b.order.phonecall,0] -> PartitionFetchInfo(1,1048576),
>>> [waimai_c_ucenter_loadrelation,45] -> PartitionFetchInfo(53975,1048576),
>>> [waimai_c_ordertag_orderdealremark,1] -> PartitionFetchInfo(1829848,1048576),
>>> [retail.d.ris.spider.jddj.request,0] -> PartitionFetchInfo(5116337,1048576),
>>> [waimai_c_ucenter_asyncrelationbind_delay,13] -> PartitionFetchInfo(0,1048576),
>>> [waimai_c_ucenter_asyncrelationbind_delay,55] -> PartitionFetchInfo(0,1048576),
>>> [waimai_push_e_operate_prod,3] -> PartitionFetchInfo(442564,1048576),
>>> [waimai_ordersa_topic_user_order_in_poi_count_diff,5] -> PartitionFetchInfo(23791010,1048576),
>>> [retail.c.order.create,4] -> PartitionFetchInfo(72902,1048576),
>>> [waimai_c_ucenter_asyncrelationbind_staging,2] -> PartitionFetchInfo(66,1048576),
>>> [waimai_c_order_orderevent_topic,35] -> PartitionFetchInfo(0,1048576),
>>> [waimai_c_ucenter_syncuserrelation,43] -> PartitionFetchInfo(0,1048576),
>>> [waimai_c_order_databus_wmorder,48] -> PartitionFetchInfo(49496018,1048576),
>>> [waimai_c_monitor_orderstatus,32] -> PartitionFetchInfo(50623699,1048576),
>>> [waimai_c_ucenter_loadrelation,15] -> PartitionFetchInfo(54360,1048576),
>>> [waimai_c_monitor_orderstatus,2] -> PartitionFetchInfo(50624881,1048576),
>>> [waimai_c_order_databus_wmorder,24] -> PartitionFetchInfo(49548334,1048576),
>>> [waimai_c_order_databus_wmorder,18] -> PartitionFetchInfo(49489397,1048576),
>>> [waimai_c_ucenter_asyncrelationbind,36] -> PartitionFetchInfo(53430,1048576)
>>> (kafka.server.KafkaApis)
>>> kafka.common.NotLeaderForPartitionException: Leader not local for partition [retail.d.ris.spider.request,1] on broker 2141642
>>>     at kafka.server.ReplicaManager.getLeaderReplicaIfLocal(ReplicaManager.scala:296)
>>>     at kafka.server.DelayedFetch$$anonfun$tryComplete$1.apply(DelayedFetch.scala:77)
>>>     at kafka.server.DelayedFetch$$anonfun$tryComplete$1.apply(DelayedFetch.scala:72)
>>>     at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
>>>     at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
>>>     at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
>>>     at kafka.server.DelayedFetch.tryComplete(DelayedFetch.scala:72)
>>>     at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:307)
>>>     at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:227)
>>>     at kafka.server.ReplicaManager.tryCompleteDelayedFetch(ReplicaManager.scala:202)
>>>     at kafka.cluster.Partition.tryCompleteDelayedRequests(Partition.scala:372)
>>>     at kafka.cluster.Partition.maybeExpandIsr(Partition.scala:294)
>>>     at kafka.cluster.Partition.updateReplicaLogReadResult(Partition.scala:243)
>>>     at kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:852)
>>>     at kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:849)
>>>     at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
>>>     at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
>>>     at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
>>>     at kafka.server.ReplicaManager.updateFollowerLogReadResults(ReplicaManager.scala:849)
>>>     at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:467)
>>>     at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:434)
>>>     at kafka.server.KafkaApis.handle(KafkaApis.scala:69)
>>>     at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
>>>     at java.lang.Thread.run(Thread.java:745)
>>>
>>> What confuses me is that partition [retail.d.ris.spider.request,1] is not
>>> contained in this request, so why is it logged in handleFetchRequest? How
>>> can this happen, and how can I resolve it?
>>>
>>
>
> --
> -- Guozhang
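P.S. On the question in my original mail about why the logged partition is not in the request being handled: the stack trace goes through DelayedOperationPurgatory.checkAndComplete, so the DelayedFetch whose tryComplete() threw was created by an earlier fetch request that presumably did include [retail.d.ris.spider.request,1]; the new request only shares a watcher key with it. Combined with the import mistake above, the exception escapes into the handler of the new request and gets logged with that request's info. A minimal, self-contained sketch of that shape (all class and method names here are made up; this is not the real purgatory code):

import scala.collection.mutable

// Hypothetical stand-ins, not Kafka's real classes.
case class TopicPartition(topic: String, partition: Int)

class NotLeaderException(msg: String) extends RuntimeException(msg)

// A "delayed fetch" remembers the partitions of the request that created it.
class DelayedFetchLike(val partitions: Seq[TopicPartition]) {
  def tryComplete(leaderLocal: TopicPartition => Boolean): Boolean = {
    partitions.foreach { tp =>
      // With the wrong import (see the top of this mail) nothing catches this,
      // so it escapes into whichever request handler triggered the check.
      if (!leaderLocal(tp))
        throw new NotLeaderException(s"Leader not local for partition $tp")
    }
    true
  }
}

object PurgatorySketch {
  private val watchers =
    mutable.Map.empty[TopicPartition, mutable.Buffer[DelayedFetchLike]]

  def watch(op: DelayedFetchLike): Unit =
    op.partitions.foreach(tp =>
      watchers.getOrElseUpdate(tp, mutable.Buffer.empty) += op)

  // Called while handling a *new* request that touches `key`; it re-runs the
  // tryComplete of *old* operations watching that key.
  def checkAndComplete(key: TopicPartition,
                       leaderLocal: TopicPartition => Boolean): Unit =
    watchers.getOrElse(key, mutable.Buffer.empty).foreach(_.tryComplete(leaderLocal))

  def main(args: Array[String]): Unit = {
    // An earlier fetch request included these partitions and was parked.
    watch(new DelayedFetchLike(Seq(
      TopicPartition("retail.d.ris.spider.request", 1),
      TopicPartition("__consumer_offsets", 23))))

    // A later fetch that only touches __consumer_offsets-23 triggers the old
    // operation; the escaping exception names retail.d.ris.spider.request-1,
    // a partition that is not in the request currently being handled.
    checkAndComplete(TopicPartition("__consumer_offsets", 23), _ => false)
  }
}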