Please ignore my last reply. I now see that the check leaderReplica.highWatermark.messageOffset >= requiredOffset already ensures that the LEO of every replica in curInSyncReplicas is >= requiredOffset.
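For intuition, the invariant behind this can be sketched as follows: the leader's high watermark advances only as far as the slowest in-sync replica, i.e. it is the minimum LEO across the ISR, so HW >= requiredOffset implies every ISR replica's LEO >= requiredOffset. This is a toy model, not Kafka's actual classes; the names here are illustrative only.

```java
import java.util.List;

// Toy model of the safety argument above (illustrative, not Kafka code):
// the high watermark (HW) is the minimum log end offset (LEO) across the ISR.
public class HighWatermarkSketch {

    // HW advances only as far as the slowest in-sync replica's LEO.
    static long highWatermark(List<Long> isrLogEndOffsets) {
        return isrLogEndOffsets.stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(0L);
    }

    public static void main(String[] args) {
        // LEOs of leader + 2 followers in the ISR.
        List<Long> isrLeos = List.of(105L, 100L, 103L);
        long requiredOffset = 100L;

        long hw = highWatermark(isrLeos); // min of the ISR LEOs = 100

        // Since HW is the minimum, HW >= requiredOffset implies every
        // ISR replica's LEO >= requiredOffset -- the point of the reply.
        boolean hwCovers = hw >= requiredOffset;
        boolean allCaughtUp = isrLeos.stream().allMatch(leo -> leo >= requiredOffset);
        System.out.println(hwCovers + " " + allCaughtUp);
    }
}
```

Under this model, checking condition A (HW >= requiredOffset) already subsumes the proposed `minIsr <= numAcks` check, since all ISR members are at or past requiredOffset whenever A holds.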
> On Sep 23, 2016, at 3:39 PM, Kafka <kafka...@126.com> wrote:
>
> OK, the earlier example was not enough to expose the problem.
> What happens when numAcks is 1 and curInSyncReplicas.size >= minIsr, but in
> fact only one replica in curInSyncReplicas has caught up to the leader, and
> that replica is the leader replica itself? This is not safe when the machine
> hosting the leader partition's broker is restarted.
>
> The current code is as below:
>
> if (minIsr <= curInSyncReplicas.size) {
>   (true, ErrorMapping.NoError)
> } else {
>   (true, ErrorMapping.NotEnoughReplicasAfterAppendCode)
> }
>
> Why not the code below?
>
> if (minIsr <= curInSyncReplicas.size && minIsr <= numAcks) {
>   (true, ErrorMapping.NoError)
> } else {
>   (true, ErrorMapping.NotEnoughReplicasAfterAppendCode)
> }
>
> It seems that a single condition in the Kafka broker's code is not enough to
> ensure safety, because replicas in curInSyncReplicas are not strongly
> synchronized.
>
>> On Sep 23, 2016, at 1:45 PM, Becket Qin <becket....@gmail.com> wrote:
>>
>> In order to satisfy a produce response, there are two conditions:
>> A. The leader's high watermark should be higher than the requiredOffset
>> (the max offset in that produce request for that partition).
>> B. The number of in-sync replicas is greater than min.isr.
>>
>> The ultimate goal here is to make sure at least min.isr replicas have
>> caught up to requiredOffset. So the check is not only whether we have
>> enough replicas in the ISR, but also whether the replicas in the ISR have
>> caught up to the required offset.
>>
>> In your example, if numAcks is 0 and curInSyncReplicas.size >= minIsr, the
>> produce response won't return if min.isr > 0, because
>> leaderReplica.highWatermark must be less than requiredOffset given that
>> numAcks is 0, i.e. condition A is not met.
>>
>> We are actually doing an even stronger check than necessary here.
>> Theoretically, as long as min.isr replicas have caught up to
>> requiredOffset, we should be able to return the response, but we also
>> require those replicas to be in the ISR.
>>
>> On Thu, Sep 22, 2016 at 8:15 PM, Kafka <kafka...@126.com> wrote:
>>
>>> @wangguozhang, could you give me some advice?
>>>
>>>> On Sep 22, 2016, at 6:56 PM, Kafka <kafka...@126.com> wrote:
>>>>
>>>> Hi all,
>>>> In terms of the topic, we created a topic with 6 partitions, each with
>>>> 3 replicas.
>>>> In terms of the producer, we send messages with acks=-1 using the sync
>>>> interface.
>>>> In terms of the brokers, we set min.insync.replicas to 2.
>>>>
>>>> After reviewing the Kafka broker's code, we know that when we send a
>>>> message with acks=-1, we get a response if the ISR of the partition is
>>>> greater than or equal to min.insync.replicas. What confuses me is that
>>>> the replicas in the ISR are not strongly consistent: in Kafka 0.9 the
>>>> replica.lag.time.max.ms parameter (default 10000 ms) decides whether to
>>>> shrink the ISR, so a replica's data in the ISR can lag by up to 10000 ms.
>>>> If we restart the broker that owns this partition's leader, the
>>>> controller will start a new leader election, which chooses the first
>>>> replica in the ISR that is not the current leader as the new leader,
>>>> and this can lose data.
>>>>
>>>> The main produce handling code is shown below:
>>>>
>>>> val numAcks = curInSyncReplicas.count(r => {
>>>>   if (!r.isLocal)
>>>>     if (r.logEndOffset.messageOffset >= requiredOffset) {
>>>>       trace("Replica %d of %s-%d received offset %d"
>>>>         .format(r.brokerId, topic, partitionId, requiredOffset))
>>>>       true
>>>>     } else
>>>>       false
>>>>   else
>>>>     true /* also count the local (leader) replica */
>>>> })
>>>>
>>>> trace("%d acks satisfied for %s-%d with acks = -1"
>>>>   .format(numAcks, topic, partitionId))
>>>>
>>>> val minIsr = leaderReplica.log.get.config.minInSyncReplicas
>>>>
>>>> if (leaderReplica.highWatermark.messageOffset >= requiredOffset) {
>>>>   /*
>>>>    * The topic may be configured not to accept messages if there are
>>>>    * not enough replicas in ISR; in this scenario the request was
>>>>    * already appended locally and then added to the purgatory before
>>>>    * the ISR was shrunk.
>>>>    */
>>>>   if (minIsr <= curInSyncReplicas.size) {
>>>>     (true, ErrorMapping.NoError)
>>>>   } else {
>>>>     (true, ErrorMapping.NotEnoughReplicasAfterAppendCode)
>>>>   }
>>>> } else
>>>>   (false, ErrorMapping.NoError)
>>>>
>>>> Why is numAcks only logged and never compared with minIsr? If numAcks
>>>> is 0 but curInSyncReplicas.size >= minIsr, this will return; since the
>>>> ISR shrink procedure is not real-time, will this lose data after a
>>>> leader election?
>>>>
>>>> Feedback is greatly appreciated. Thanks.
>>>> meituan.inf