Thanks, Guozhang.
Following your suggestions, I found that my own patch to Kafka 0.9.0.0 may be
causing the problem.
In DelayedFetch.scala, IntelliJ's auto-import added
org.apache.kafka.common.errors.NotLeaderForPartitionException instead of
kafka.common.NotLeaderForPartitionException,
so the kafka.common.NotLeaderForPartitionException thrown inside
getLeaderReplicaIfLocal cannot be caught by tryComplete() and propagates all
the way up to KafkaApis.handle(). I think this may be the cause of the repeated
error log entries and the other strange behavior.
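
A minimal sketch of this kind of mismatch, assuming hypothetical stand-in
objects OldApi and NewApi in place of the kafka.common and
org.apache.kafka.common.errors packages (this is not the real Kafka code, and
the method names are only borrowed for illustration). It shows that a catch
clause matches on the class actually imported, not on the simple name:

```scala
object ImportMismatchDemo {
  // Hypothetical stand-ins: two unrelated classes that share a simple name.
  object OldApi { // plays the role of kafka.common
    class NotLeaderForPartitionException(msg: String) extends RuntimeException(msg)
  }
  object NewApi { // plays the role of org.apache.kafka.common.errors
    class NotLeaderForPartitionException(msg: String) extends RuntimeException(msg)
  }

  // Stand-in for getLeaderReplicaIfLocal: always throws the "old" exception.
  def getLeaderReplicaIfLocal(): Nothing =
    throw new OldApi.NotLeaderForPartitionException("Leader not local")

  // The catch clause refers to the wrong class, so it never matches.
  def tryCompleteWrongImport(): String = {
    import NewApi.NotLeaderForPartitionException
    try getLeaderReplicaIfLocal()
    catch { case _: NotLeaderForPartitionException => "caught" }
  }

  // The catch clause refers to the class that is actually thrown.
  def tryCompleteRightImport(): String = {
    import OldApi.NotLeaderForPartitionException
    try getLeaderReplicaIfLocal()
    catch { case _: NotLeaderForPartitionException => "caught" }
  }

  def main(args: Array[String]): Unit = {
    println(tryCompleteRightImport())
    // With the wrong import the exception escapes to the caller.
    val escaped =
      try { tryCompleteWrongImport(); false }
      catch { case _: OldApi.NotLeaderForPartitionException => true }
    println(s"escaped with wrong import: $escaped")
  }
}
```

Both variants compile without complaint, because the two classes are simply
different types with the same simple name; only the import line decides which
one the catch clause refers to.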

> On Oct 27, 2016, at 7:31 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> 
> Json,
> 
> As you mentioned yourself, the "NotLeaderForPartitionException" thrown
> from getLeaderReplicaIfLocal should be caught in the end, and hence I'm
> not sure why the reported stack trace "ERROR: ..." throwing the
> NotLeaderForPartitionException should be seen from "tryComplete". Also, I
> have checked the source code in both 0.9.0.0 and 0.8.2.2, and their line
> numbers do not match the reported stack trace line numbers (e.g.
> DelayedFetch.scala:72), so I cannot really tell why you could ever see the
> error message instead of the DEBUG-level "Broker is no longer the leader
> of %s, satisfy %s immediately..".
> 
> Following the 0.9.0.0 source code, since the NotLeaderForPartitionException
> is caught and forces the delayed fetch request to be sent with some
> potential error code, it will not prevent the replica's fetch request from
> returning successfully to the fetching broker, and hence should not lead
> producers / consumers to fail for a long time. Similarly, since we force
> completing those delayed fetch requests as well, it should not cause a spam
> of repeated error log entries, since it should print at most one entry (and
> that should be DEBUG, not ERROR) for each delayed request whose partition
> leaders have migrated out.
> 
> 
> 
> Guozhang
> 
> 
> 
> On Wed, Oct 26, 2016 at 7:46 AM, Json Tu <kafka...@126.com> wrote:
> 
>> It leaves the cluster unable to provide normal service, which causes some
>> produce or fetch requests to fail for a long time until I restart the
>> current broker. This error may come from some earlier fetch operation that
>> contained this partition, which leads to many fetch response errors.
>> 
>> DelayedFetch's tryComplete() function is implemented as below:
>> override def tryComplete() : Boolean = {
>>   var accumulatedSize = 0
>>   fetchMetadata.fetchPartitionStatus.foreach {
>>     case (topicAndPartition, fetchStatus) =>
>>       val fetchOffset = fetchStatus.startOffsetMetadata
>>       try {
>>         if (fetchOffset != LogOffsetMetadata.UnknownOffsetMetadata) {
>>           val replica = replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition)
>>           /* some code omitted */
>>         }
>>       } catch {
>>         /* some code omitted */
>>         case nle: NotLeaderForPartitionException =>  // Case A
>>           debug("Broker is no longer the leader of %s, satisfy %s immediately".format(topicAndPartition, fetchMetadata))
>>           return forceComplete()
>>       }
>>   }
>>   /* some code omitted */
>> }
>> 
>> When it meets a NotLeaderForPartitionException, it invokes the
>> forceComplete() function, which in turn invokes the onComplete() function,
>> implemented as below:
>> override def onComplete() {
>>   val logReadResults = replicaManager.readFromLocalLog(
>>     fetchMetadata.fetchOnlyLeader,
>>     fetchMetadata.fetchOnlyCommitted,
>>     fetchMetadata.fetchPartitionStatus.mapValues(status => status.fetchInfo))
>>
>>   val fetchPartitionData = logReadResults.mapValues(result =>
>>     FetchResponsePartitionData(result.errorCode, result.hw, result.info.messageSet))
>>
>>   responseCallback(fetchPartitionData)
>> }
>> 
>> So I think it exits the tryComplete function early because of this
>> partition, which means the partitions later in this request may not be
>> completely satisfied and returned to the fetching broker, which leads some
>> producers and consumers to fail for a long time. I don't know whether this
>> is correct.
>> 
>>> On Oct 25, 2016, at 8:32 PM, Json Tu <kafka...@126.com> wrote:
>>> 
>>> Hi all,
>>>      I use Kafka 0.9.0.0, and we have a cluster with 6 nodes. When I
>>> restart a broker, we find many log entries like the one below:
>>> 
>>> [2016-10-24 15:29:00,914] ERROR [KafkaApi-2141642] error when handling
>> request Name: FetchRequest; Version: 1; CorrelationId: 4928; ClientId:
>> ReplicaFetcherThread-0-2141642; ReplicaId: 2141386; MaxWait: 500 ms;
>> MinBytes: 1 bytes; RequestInfo: [retail.c.order.logistics,0] ->
>> PartitionFetchInfo(215258,1048576),[waimai_c_ugc_msg,1] ->
>> PartitionFetchInfo(12426588,1048576),[waimai_c_ucenter_asyncrelationbind_delay,25]
>> -> PartitionFetchInfo(0,1048576),[waimai_c_order_databus_wmorder,44] ->
>> PartitionFetchInfo(49555913,1048576),[__consumer_offsets,23] ->
>> PartitionFetchInfo(11846051,1048576),[retail.m.sp.sku.update,3] ->
>> PartitionFetchInfo(21563,1048576),[waimai_c_monitor_orderlogisticsstatus,28]
>> -> PartitionFetchInfo(26926356,1048576),[waimai_c_ucenter_loadrelation,0]
>> -> PartitionFetchInfo(54583,1048576),[__consumer_offsets,29] ->
>> PartitionFetchInfo(23479045,1048576),[waimai_c_order_databus_wmorder,14]
>> -> 
>> PartitionFetchInfo(49568225,1048576),[waimai_c_ordertag_orderdealremark,31]
>> -> PartitionFetchInfo(1829838,1048576),[retail.d.ris.spider.request,0] ->
>> PartitionFetchInfo(709845,1048576),[__consumer_offsets,13] ->
>> PartitionFetchInfo(9376691,1048576),[waimai_c_ugc_msg_staging,2] ->
>> PartitionFetchInfo(38,1048576),[retail.b.openapi.push.retry.stage,0] ->
>> PartitionFetchInfo(0,1048576),[waimai_c_ucenter_favoritepoi,15] ->
>> PartitionFetchInfo(390045,1048576),[retail.b.order.phonecall,0] ->
>> PartitionFetchInfo(1,1048576),[waimai_c_ucenter_loadrelation,45] ->
>> PartitionFetchInfo(53975,1048576),[waimai_c_ordertag_orderdealremark,1]
>> -> PartitionFetchInfo(1829848,1048576),[retail.d.ris.spider.jddj.request,0]
>> -> 
>> PartitionFetchInfo(5116337,1048576),[waimai_c_ucenter_asyncrelationbind_delay,13]
>> -> 
>> PartitionFetchInfo(0,1048576),[waimai_c_ucenter_asyncrelationbind_delay,55]
>> -> PartitionFetchInfo(0,1048576),[waimai_push_e_operate_prod,3] ->
>> PartitionFetchInfo(442564,1048576),[waimai_ordersa_
>> topic_user_order_in_poi_count_diff,5] -> PartitionFetchInfo(23791010,
>> 1048576),[retail.c.order.create,4] -> PartitionFetchInfo(72902,
>> 1048576),[waimai_c_ucenter_asyncrelationbind_staging,2] ->
>> PartitionFetchInfo(66,1048576),[waimai_c_order_orderevent_topic,35] ->
>> PartitionFetchInfo(0,1048576),[waimai_c_ucenter_syncuserrelation,43] ->
>> PartitionFetchInfo(0,1048576),[waimai_c_order_databus_wmorder,48] ->
>> PartitionFetchInfo(49496018,1048576),[waimai_c_monitor_orderstatus,32] ->
>> PartitionFetchInfo(50623699,1048576),[waimai_c_ucenter_loadrelation,15]
>> -> PartitionFetchInfo(54360,1048576),[waimai_c_monitor_orderstatus,2] ->
>> PartitionFetchInfo(50624881,1048576),[waimai_c_order_databus_wmorder,24]
>> -> PartitionFetchInfo(49548334,1048576),[waimai_c_order_databus_wmorder,18]
>> -> 
>> PartitionFetchInfo(49489397,1048576),[waimai_c_ucenter_asyncrelationbind,36]
>> -> PartitionFetchInfo(53430,1048576) (kafka.server.KafkaApis)
>>> kafka.common.NotLeaderForPartitionException: Leader not local for
>> partition [retail.d.ris.spider.request,1] on broker 2141642
>>>       at kafka.server.ReplicaManager.getLeaderReplicaIfLocal(
>> ReplicaManager.scala:296)
>>>       at kafka.server.DelayedFetch$$anonfun$tryComplete$1.apply(
>> DelayedFetch.scala:77)
>>>       at kafka.server.DelayedFetch$$anonfun$tryComplete$1.apply(
>> DelayedFetch.scala:72)
>>>       at scala.collection.immutable.HashMap$HashMap1.foreach(
>> HashMap.scala:224)
>>>       at scala.collection.immutable.HashMap$HashTrieMap.foreach(
>> HashMap.scala:403)
>>>       at scala.collection.immutable.HashMap$HashTrieMap.foreach(
>> HashMap.scala:403)
>>>       at kafka.server.DelayedFetch.tryComplete(DelayedFetch.scala:72)
>>>       at kafka.server.DelayedOperationPurgatory$
>> Watchers.tryCompleteWatched(DelayedOperation.scala:307)
>>>       at kafka.server.DelayedOperationPurgatory.checkAndComplete(
>> DelayedOperation.scala:227)
>>>       at kafka.server.ReplicaManager.tryCompleteDelayedFetch(
>> ReplicaManager.scala:202)
>>>       at kafka.cluster.Partition.tryCompleteDelayedRequests(
>> Partition.scala:372)
>>>       at kafka.cluster.Partition.maybeExpandIsr(Partition.scala:294)
>>>       at kafka.cluster.Partition.updateReplicaLogReadResult(
>> Partition.scala:243)
>>>       at kafka.server.ReplicaManager$$anonfun$
>> updateFollowerLogReadResults$2.apply(ReplicaManager.scala:852)
>>>       at kafka.server.ReplicaManager$$anonfun$
>> updateFollowerLogReadResults$2.apply(ReplicaManager.scala:849)
>>>       at scala.collection.immutable.HashMap$HashMap1.foreach(
>> HashMap.scala:224)
>>>       at scala.collection.immutable.HashMap$HashTrieMap.foreach(
>> HashMap.scala:403)
>>>       at scala.collection.immutable.HashMap$HashTrieMap.foreach(
>> HashMap.scala:403)
>>>       at kafka.server.ReplicaManager.updateFollowerLogReadResults(
>> ReplicaManager.scala:849)
>>>       at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.
>> scala:467)
>>>       at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:434)
>>>       at kafka.server.KafkaApis.handle(KafkaApis.scala:69)
>>>       at kafka.server.KafkaRequestHandler.run(
>> KafkaRequestHandler.scala:60)
>>>       at java.lang.Thread.run(Thread.java:745)
>>> 
>>> 
>>> What confuses me is that partition [retail.d.ris.spider.request,1] is not
>>> contained in this request, so why is it logged in handleFetchRequest? How
>>> can this happen, and how can I resolve it?
>>> 
>>> 
>> 
>> 
>> 
> 
> 
> -- 
> -- Guozhang

