OK, the earlier example is not enough to expose the problem.
What happens when numAcks is 1 and curInSyncReplicas.size >= minIsr, but in fact
only one replica in curInSyncReplicas has caught up to the leader, and that replica
is the leader itself? This is not safe when the machine that hosts the leader
partition's broker is restarted.

The current code is as below:
if (minIsr <= curInSyncReplicas.size) {
  (true, ErrorMapping.NoError)
} else {
  (true, ErrorMapping.NotEnoughReplicasAfterAppendCode)
}

Why not write the code as below?
if (minIsr <= curInSyncReplicas.size && minIsr <= numAcks) {
  (true, ErrorMapping.NoError)
} else {
  (true, ErrorMapping.NotEnoughReplicasAfterAppendCode)
}

It seems that this single condition in the Kafka broker's code is not enough to
ensure safety, because the replicas in curInSyncReplicas are not strongly
synchronized.
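
To make the scenario concrete, here is a minimal sketch (plain Scala, not the broker code; the replica ids and offsets are made up for illustration) that evaluates the existing check and the proposed check when only the leader has caught up to requiredOffset:

// Minimal sketch of the scenario above; all values are assumed for illustration.
object IsrCheckSketch extends App {
  final case class Replica(brokerId: Int, logEndOffset: Long, isLeader: Boolean)

  val requiredOffset = 100L
  val minIsr         = 2

  // The ISR still contains two replicas, but only the leader has reached requiredOffset.
  val curInSyncReplicas = Seq(
    Replica(brokerId = 1, logEndOffset = 100L, isLeader = true),  // leader, caught up
    Replica(brokerId = 2, logEndOffset = 90L,  isLeader = false)  // follower, still lagging
  )

  // Same counting rule as the broker snippet: the local (leader) replica always counts.
  val numAcks = curInSyncReplicas.count(r => r.isLeader || r.logEndOffset >= requiredOffset)

  val existingCheck = minIsr <= curInSyncReplicas.size                      // true  (2 <= 2)
  val proposedCheck = minIsr <= curInSyncReplicas.size && minIsr <= numAcks // false (2 <= 1 fails)

  println(s"numAcks = $numAcks, existing = $existingCheck, proposed = $proposedCheck")
}

With these assumed values the existing condition passes while the proposed one does not; whether the broker would actually acknowledge also depends on the high-watermark check discussed in the reply below.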

> On Sep 23, 2016, at 1:45 PM, Becket Qin <becket....@gmail.com> wrote:
> 
> In order to satisfy a produce response, there are two conditions:
> A. The leader's high watermark should have reached the requiredOffset
> (the max offset of that partition in that produce request).
> B. The number of in-sync replicas is greater than or equal to min.isr.
> 
> The ultimate goal here is to make sure at least min.isr replicas
> have caught up to requiredOffset. So the check is not only whether we have
> enough replicas in the ISR, but also whether those replicas in
> the ISR have caught up to the required offset.
> 
> In your example, if numAcks is 0 and curInSyncReplica.size >= minIsr, the
> produce response won't return if min.isr > 0, because
> leaderReplica.highWatermark must be less than requiredOffset given the fact
> that numAcks is 0. i.e. condition A is not met.
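
As an illustration of condition A, here is a minimal sketch (not broker code), assuming the leader's high watermark is the minimum log end offset across the ISR and using made-up offsets for the case where only the leader has the message:

// Sketch only: the high watermark is modeled as the minimum LEO across the ISR.
object HighWatermarkSketch extends App {
  val requiredOffset = 100L

  // Only the leader (LEO 100) has the new message; the follower is still at 90.
  val isrLogEndOffsets = Seq(100L, 90L)
  val highWatermark    = isrLogEndOffsets.min  // 90

  // Condition A: the response is only completed once the HW has reached requiredOffset.
  println(s"condition A met: ${highWatermark >= requiredOffset}")  // false -> request stays in purgatory
}

So as long as a lagging follower remains in the ISR, the high watermark cannot reach requiredOffset and the acks = -1 response is not completed.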
> 
> We are actually even doing a stronger than necessary check here.
> Theoretically, as long as min.isr replicas have caught up to
> requiredOffset, we should be able to return the response, but we also
> require those replicas to be in the ISR.
> 
> On Thu, Sep 22, 2016 at 8:15 PM, Kafka <kafka...@126.com> wrote:
> 
>> @wangguozhang, could you give me some advice?
>> 
>>> On Sep 22, 2016, at 6:56 PM, Kafka <kafka...@126.com> wrote:
>>> 
>>> Hi all,
>>>      In terms of the topic, we create a topic with 6 partitions, each with 3 replicas.
>>>      In terms of the producer, we send messages with acks = -1 using the sync interface.
>>>      In terms of the brokers, we set min.insync.replicas to 2.
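
For reference, a minimal sketch of that setup (the topic name, broker address, and serializers below are assumptions, not taken from the original mail):

// Sketch of the setup described above; topic name and addresses are assumed.
// Topic: 6 partitions, replication factor 3, min.insync.replicas = 2, e.g. created with:
//   kafka-topics.sh --create --zookeeper localhost:2181 --topic test \
//     --partitions 6 --replication-factor 3 --config min.insync.replicas=2
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object ProducerSetupSketch extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("acks", "-1") // wait for the full ISR, subject to min.insync.replicas
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  val producer = new KafkaProducer[String, String](props)
  // "Sync" send: block on the returned future so each message is acknowledged before continuing.
  producer.send(new ProducerRecord[String, String]("test", "key", "value")).get()
  producer.close()
}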
>>> 
>>> After reviewing the Kafka broker's code, we understand that when we send a message
>>> to the broker with acks = -1, we get a response once the ISR size of this partition is
>>> greater than or equal to min.insync.replicas. What confuses me is that the replicas in
>>> the ISR are not strongly consistent: in Kafka 0.9 the replica.lag.time.max.ms parameter
>>> decides whether to shrink the ISR, and its default is 10000 ms, so a replica's data in
>>> the ISR can lag by up to 10000 ms. If we restart the broker that owns this partition's
>>> leader, the controller will start a new leader election, which will choose the first
>>> replica in the ISR that is not the current leader as the new leader, and this may lose
>>> data.
>>> 
>>> 
>>> The main produce handling code is shown below:
>>> val numAcks = curInSyncReplicas.count(r => {
>>>   if (!r.isLocal)
>>>     if (r.logEndOffset.messageOffset >= requiredOffset) {
>>>       trace("Replica %d of %s-%d received offset %d".format(r.brokerId, topic, partitionId, requiredOffset))
>>>       true
>>>     }
>>>     else
>>>       false
>>>   else
>>>     true /* also count the local (leader) replica */
>>> })
>>>
>>> trace("%d acks satisfied for %s-%d with acks = -1".format(numAcks, topic, partitionId))
>>>
>>> val minIsr = leaderReplica.log.get.config.minInSyncReplicas
>>>
>>> if (leaderReplica.highWatermark.messageOffset >= requiredOffset) {
>>>   /*
>>>    * The topic may be configured not to accept messages if there are not enough replicas in ISR
>>>    * in this scenario the request was already appended locally and then added to the purgatory before the ISR was shrunk
>>>    */
>>>   if (minIsr <= curInSyncReplicas.size) {
>>>     (true, ErrorMapping.NoError)
>>>   } else {
>>>     (true, ErrorMapping.NotEnoughReplicasAfterAppendCode)
>>>   }
>>> } else
>>>   (false, ErrorMapping.NoError)
>>> 
>>> 
>>> Why is numAcks only logged and not compared with minIsr? If numAcks is 0 but
>>> curInSyncReplicas.size >= minIsr, this will still return; since the ISR shrink
>>> procedure is not real time, will this lose data after a leader election?
>>> 
>>> Feedback is greatly appreciated. Thanks.
>>> meituan.inf
>>> 
>>> 
>>> 
>> 
>> 
>> 

