Json,

As you mentioned yourself, the NotLeaderForPartitionException thrown from
getLeaderReplicaIfLocal should be caught in the end, and hence I'm not sure
why the reported "ERROR: ..." stack trace carrying the
NotLeaderForPartitionException should be seen from tryComplete. Also, I have
checked the source code of both 0.9.0.0 and 0.8.2.2, and their line numbers
do not match the reported stack trace line numbers (e.g.
DelayedFetch.scala:72), so I cannot really tell why you would ever see the
ERROR message instead of the DEBUG-level "Broker is no longer the leader of
%s, satisfy %s immediately".
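For reference, the "error when handling request ..." ERROR entry in your log
is printed by the top-level catch in KafkaApis.handle, which is also where
your stack trace ends (KafkaApis.scala:69). Roughly, in 0.9.0.0 it has this
shape (a simplified paraphrase with the dispatch and error-handling bodies
elided, not the verbatim source):

    def handle(request: RequestChannel.Request) {
      try {
        request.requestId match {
          case RequestKeys.FetchKey => handleFetchRequest(request)
          /* ... dispatch for the other request keys ... */
        }
      } catch {
        case e: Throwable =>
          // This catch-all is what printed the reported ERROR entry, so the
          // exception must have propagated out of DelayedFetch.tryComplete
          // instead of being swallowed at "Case A" in the code you pasted.
          error("error when handling request %s".format(request.requestObj), e)
      }
    }

Seeing that entry at all means the exception escaped tryComplete, which the
0.9.0.0 code quoted below should make impossible.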
Following the 0.9.0.0 source code: since the NotLeaderForPartitionException
is caught and the delayed fetch request is force-completed and sent with a
potential error code in the response, it will not prevent the replica's
fetch request from returning successfully to the fetching broker, and hence
should not lead producers / consumers to fail for a long time (see the PS at
the bottom of this mail for a sketch of how the error is folded into the
response). Similarly, since we force-complete those delayed fetch requests
as well, it should not cause a spam of repeated error log entries: at most
one entry should be printed (and it should be DEBUG, not ERROR) for each
delayed request whose partition leader has migrated away.

Guozhang

On Wed, Oct 26, 2016 at 7:46 AM, Json Tu <kafka...@126.com> wrote:

> It makes the cluster unable to provide normal service, which leaves some
> producers and fetchers failing for a long time until I restart the current
> broker. This error may come from some earlier fetch operation that
> contained this partition, which leads to many fetch response errors.
>
> DelayedFetch's tryComplete() function is implemented as below:
>
> override def tryComplete(): Boolean = {
>   var accumulatedSize = 0
>   fetchMetadata.fetchPartitionStatus.foreach {
>     case (topicAndPartition, fetchStatus) =>
>       val fetchOffset = fetchStatus.startOffsetMetadata
>       try {
>         if (fetchOffset != LogOffsetMetadata.UnknownOffsetMetadata) {
>           val replica = replicaManager.getLeaderReplicaIfLocal(
>             topicAndPartition.topic, topicAndPartition.partition)
>           /* some code omitted */
>         }
>       } catch {
>         /* some cases omitted */
>         case nle: NotLeaderForPartitionException => // Case A
>           debug("Broker is no longer the leader of %s, satisfy %s immediately"
>             .format(topicAndPartition, fetchMetadata))
>           return forceComplete()
>       }
>   }
>   /* some code omitted */
> }
>
> When it meets a NotLeaderForPartitionException, it invokes forceComplete(),
> which in turn invokes onComplete(), implemented as below:
>
> override def onComplete() {
>   val logReadResults = replicaManager.readFromLocalLog(
>     fetchMetadata.fetchOnlyLeader,
>     fetchMetadata.fetchOnlyCommitted,
>     fetchMetadata.fetchPartitionStatus.mapValues(status => status.fetchInfo))
>
>   val fetchPartitionData = logReadResults.mapValues(result =>
>     FetchResponsePartitionData(result.errorCode, result.hw,
>       result.info.messageSet))
>
>   responseCallback(fetchPartitionData)
> }
>
> So I think it exits the tryComplete function early because of this
> partition, which means the partitions later in this request may not be
> completely satisfied and returned to the fetching broker, which leads some
> producers and consumers to fail for a long time. I don't know whether this
> is correct.
>
> > On Oct 25, 2016, at 8:32 PM, Json Tu <kafka...@126.com> wrote:
> >
> > Hi all,
> >     I use Kafka 0.9.0.0, and we have a cluster with 6 nodes. When I
> > restart a broker, we find there are many logs as below:
> >
> > [2016-10-24 15:29:00,914] ERROR [KafkaApi-2141642] error when handling
> > request Name: FetchRequest; Version: 1; CorrelationId: 4928; ClientId:
> > ReplicaFetcherThread-0-2141642; ReplicaId: 2141386; MaxWait: 500 ms;
> > MinBytes: 1 bytes; RequestInfo: [retail.c.order.logistics,0] ->
> > PartitionFetchInfo(215258,1048576), [waimai_c_ugc_msg,1] ->
> > PartitionFetchInfo(12426588,1048576),
> > [waimai_c_ucenter_asyncrelationbind_delay,25] ->
> > PartitionFetchInfo(0,1048576), [waimai_c_order_databus_wmorder,44] ->
> > PartitionFetchInfo(49555913,1048576), [__consumer_offsets,23] ->
> > PartitionFetchInfo(11846051,1048576), [retail.m.sp.sku.update,3] ->
> > PartitionFetchInfo(21563,1048576),
> > [waimai_c_monitor_orderlogisticsstatus,28] ->
> > PartitionFetchInfo(26926356,1048576), [waimai_c_ucenter_loadrelation,0] ->
> > PartitionFetchInfo(54583,1048576), [__consumer_offsets,29] ->
> > PartitionFetchInfo(23479045,1048576), [waimai_c_order_databus_wmorder,14] ->
> > PartitionFetchInfo(49568225,1048576),
> > [waimai_c_ordertag_orderdealremark,31] ->
> > PartitionFetchInfo(1829838,1048576), [retail.d.ris.spider.request,0] ->
> > PartitionFetchInfo(709845,1048576), [__consumer_offsets,13] ->
> > PartitionFetchInfo(9376691,1048576), [waimai_c_ugc_msg_staging,2] ->
> > PartitionFetchInfo(38,1048576), [retail.b.openapi.push.retry.stage,0] ->
> > PartitionFetchInfo(0,1048576), [waimai_c_ucenter_favoritepoi,15] ->
> > PartitionFetchInfo(390045,1048576), [retail.b.order.phonecall,0] ->
> > PartitionFetchInfo(1,1048576), [waimai_c_ucenter_loadrelation,45] ->
> > PartitionFetchInfo(53975,1048576), [waimai_c_ordertag_orderdealremark,1] ->
> > PartitionFetchInfo(1829848,1048576), [retail.d.ris.spider.jddj.request,0] ->
> > PartitionFetchInfo(5116337,1048576),
> > [waimai_c_ucenter_asyncrelationbind_delay,13] ->
> > PartitionFetchInfo(0,1048576),
> > [waimai_c_ucenter_asyncrelationbind_delay,55] ->
> > PartitionFetchInfo(0,1048576), [waimai_push_e_operate_prod,3] ->
> > PartitionFetchInfo(442564,1048576),
> > [waimai_ordersa_topic_user_order_in_poi_count_diff,5] ->
> > PartitionFetchInfo(23791010,1048576), [retail.c.order.create,4] ->
> > PartitionFetchInfo(72902,1048576),
> > [waimai_c_ucenter_asyncrelationbind_staging,2] ->
> > PartitionFetchInfo(66,1048576), [waimai_c_order_orderevent_topic,35] ->
> > PartitionFetchInfo(0,1048576), [waimai_c_ucenter_syncuserrelation,43] ->
> > PartitionFetchInfo(0,1048576), [waimai_c_order_databus_wmorder,48] ->
> > PartitionFetchInfo(49496018,1048576), [waimai_c_monitor_orderstatus,32] ->
> > PartitionFetchInfo(50623699,1048576), [waimai_c_ucenter_loadrelation,15] ->
> > PartitionFetchInfo(54360,1048576), [waimai_c_monitor_orderstatus,2] ->
> > PartitionFetchInfo(50624881,1048576), [waimai_c_order_databus_wmorder,24] ->
> > PartitionFetchInfo(49548334,1048576), [waimai_c_order_databus_wmorder,18] ->
> > PartitionFetchInfo(49489397,1048576),
> > [waimai_c_ucenter_asyncrelationbind,36] ->
> > PartitionFetchInfo(53430,1048576) (kafka.server.KafkaApis)
> >
> > kafka.common.NotLeaderForPartitionException: Leader not local for
> > partition [retail.d.ris.spider.request,1] on broker 2141642
> >     at kafka.server.ReplicaManager.getLeaderReplicaIfLocal(ReplicaManager.scala:296)
> >     at kafka.server.DelayedFetch$$anonfun$tryComplete$1.apply(DelayedFetch.scala:77)
> >     at kafka.server.DelayedFetch$$anonfun$tryComplete$1.apply(DelayedFetch.scala:72)
> >     at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
> >     at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
> >     at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
> >     at kafka.server.DelayedFetch.tryComplete(DelayedFetch.scala:72)
> >     at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:307)
> >     at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:227)
> >     at kafka.server.ReplicaManager.tryCompleteDelayedFetch(ReplicaManager.scala:202)
> >     at kafka.cluster.Partition.tryCompleteDelayedRequests(Partition.scala:372)
> >     at kafka.cluster.Partition.maybeExpandIsr(Partition.scala:294)
> >     at kafka.cluster.Partition.updateReplicaLogReadResult(Partition.scala:243)
> >     at kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:852)
> >     at kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:849)
> >     at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
> >     at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
> >     at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
> >     at kafka.server.ReplicaManager.updateFollowerLogReadResults(ReplicaManager.scala:849)
> >     at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:467)
> >     at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:434)
> >     at kafka.server.KafkaApis.handle(KafkaApis.scala:69)
> >     at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
> >     at java.lang.Thread.run(Thread.java:745)
> >
> > What confuses me is that partition [retail.d.ris.spider.request,1] is not
> > contained in this request, so why is it logged in handleFetchRequest? How
> > can this happen, and how can it be resolved?

--
-- Guozhang
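PS: here is the sketch mentioned above of how a not-leader error gets folded
into the fetch response instead of failing it. It is a simplified paraphrase
of the per-partition read inside ReplicaManager.readFromLocalLog in 0.9.0.0,
written from memory, so take the exact names and signatures as approximate:

    // Each partition is read under its own try/catch, so a leader migration
    // only marks that one partition with an error instead of failing the
    // whole fetch request.
    def read(tp: TopicAndPartition, fetchInfo: PartitionFetchInfo): LogReadResult = {
      try {
        val localLeader = getLeaderReplicaIfLocal(tp.topic, tp.partition)
        /* ... read up to fetchInfo.fetchSize bytes from the local log ... */
      } catch {
        case nle: NotLeaderForPartitionException =>
          // Empty data plus the exception; onComplete() later turns this into
          // FetchResponsePartitionData(result.errorCode, ...), so the fetcher
          // still gets a complete response and can refresh its metadata for
          // just this partition.
          LogReadResult(FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata,
            MessageSet.Empty), -1L, fetchInfo.fetchSize, false, Some(nle))
      }
    }

So even when tryComplete exits its loop early, the forced onComplete() still
builds and sends a response covering every partition in the delayed fetch:
the partition whose leader moved carries an error code and the rest are
served normally, which is why I would not expect a long-lasting producer /
consumer outage from this path alone.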