Oh please ignore my last reply.
I find if leaderReplica.highWatermark.messageOffset >= requiredOffset , this 
can ensure all replicas’ leo  in curInSyncReplicas is >=  the requiredOffset.

> 在 2016年9月23日,下午3:39,Kafka <kafka...@126.com> 写道:
> 
> OK, the example before is not enough to exposure problem.
> What will happen to the situation under the numAcks is 1,and 
> curInSyncReplica.size >= minIsr,but in fact the replica in curInSyncReplica 
> only have one replica has caught up to leader,
> and this replica is the leader replica itself,this is not safe when the 
> machine that deploys leader partition’s broker is restart. 
> 
> current code is as belows,
> if (minIsr <= curInSyncReplicas.size) {
>            (true, ErrorMapping.NoError)
>          } else {
>            (true, ErrorMapping.NotEnoughReplicasAfterAppendCode)
>          }
> 
> why not the code as belows,
> if (minIsr <= curInSyncReplicas.size && minIsr <= numAcks) {
>            (true, ErrorMapping.NoError)
>          } else {
>            (true, ErrorMapping.NotEnoughReplicasAfterAppendCode)
>          }
> 
> Its seems that only one condition in kafka broker’s code is not enough to 
> ensure safe,because replicas in curInSyncReplicas is not Strong 
> synchronization.
> 
>> 在 2016年9月23日,下午1:45,Becket Qin <becket....@gmail.com> 写道:
>> 
>> In order to satisfy a produce response, there are two conditions:
>> A. The leader's high watermark should be higher than the requiredOffset
>> (max offset in that produce request of that partition)
>> B. The number of in sync replica is greater than min.isr.
>> 
>> The ultimate goal here is to make sure at least min.isr number of replicas
>> has caught up to requiredOffset. So the check is not only whether we have
>> enough number of replicas in the isr, but also whether those replicas in
>> the ISR has caught up to the required offset.
>> 
>> In your example, if numAcks is 0 and curInSyncReplica.size >= minIsr, the
>> produce response won't return if min.isr > 0, because
>> leaderReplica.highWatermark must be less than requiredOffset given the fact
>> that numAcks is 0. i.e. condition A is not met.
>> 
>> We are actually even doing a stronger than necessary check here.
>> Theoretically as long as min.isr number of replicas has caught up to
>> requiredOffset, we should be able to return the response, but we also
>> require those replicas to be in the ISR.
>> 
>> On Thu, Sep 22, 2016 at 8:15 PM, Kafka <kafka...@126.com> wrote:
>> 
>>> @wangguozhang,could you give me some advices.
>>> 
>>>> 在 2016年9月22日,下午6:56,Kafka <kafka...@126.com> 写道:
>>>> 
>>>> Hi all,
>>>>     in terms of topic, we create a topic with 6 partition,and each
>>> with 3 replicas.
>>>>      in terms of producer,when we send message with ack -1 using sync
>>> interface.
>>>>     in terms of brokers,we set min.insync.replicas to 2.
>>>> 
>>>> after we review the kafka broker’s code,we know that we send a message
>>> to broker with ack -1, then we can get response if ISR of this partition is
>>> great than or equal to min.insync.replicas,but what confused
>>>> me is replicas in ISR is not strongly consistent,in kafka 0.9 we use
>>> replica.lag.time.max.ms param to judge whether to shrink ISR, and the
>>> defaults is 10000 ms, so replicas’ data in isr can lag 10000ms at most,
>>>> we we restart broker which own this partitions’ leader, then controller
>>> will start a new leader election, which will choose the first replica in
>>> ISR that not equals to current leader as new leader, then this will loss
>>> data.
>>>> 
>>>> 
>>>> The main produce handle code shows below:
>>>> val numAcks = curInSyncReplicas.count(r => {
>>>>        if (!r.isLocal)
>>>>          if (r.logEndOffset.messageOffset >= requiredOffset) {
>>>>            trace("Replica %d of %s-%d received offset
>>> %d".format(r.brokerId, topic, partitionId, requiredOffset))
>>>>            true
>>>>          }
>>>>          else
>>>>            false
>>>>        else
>>>>          true /* also count the local (leader) replica */
>>>>      })
>>>> 
>>>>      trace("%d acks satisfied for %s-%d with acks =
>>> -1".format(numAcks, topic, partitionId))
>>>> 
>>>>      val minIsr = leaderReplica.log.get.config.minInSyncReplicas
>>>> 
>>>>      if (leaderReplica.highWatermark.messageOffset >= requiredOffset
>>> ) {
>>>>        /*
>>>>        * The topic may be configured not to accept messages if there
>>> are not enough replicas in ISR
>>>>        * in this scenario the request was already appended locally and
>>> then added to the purgatory before the ISR was shrunk
>>>>        */
>>>>        if (minIsr <= curInSyncReplicas.size) {
>>>>          (true, ErrorMapping.NoError)
>>>>        } else {
>>>>          (true, ErrorMapping.NotEnoughReplicasAfterAppendCode)
>>>>        }
>>>>      } else
>>>>        (false, ErrorMapping.NoError)
>>>> 
>>>> 
>>>> why only logging unAcks and not use numAcks to compare with minIsr, if
>>> numAcks is 0, but curInSyncReplicas.size >= minIsr, then this will return,
>>> as ISR shrink procedure is not real time, does this will loss data after
>>> leader election?
>>>> 
>>>> Feedback is greatly appreciated. Thanks.
>>>> meituan.inf
>>>> 
>>>> 
>>>> 
>>> 
>>> 
>>> 
> 
> 


Reply via email to