The key point is:

if (leaderReplica.highWatermark.messageOffset >= requiredOffset) {

The high watermark only moves when all the replicas in the ISR have that
particular offset. Does that clarify it?

Ismael

On Thu, Jun 7, 2018 at 10:31 PM Carl Samuelson <c...@brightbobbin.com> wrote:

> Hi
>
> Hopefully this is the correct email address and forum for this.
> I asked this question on Stack Overflow, but did not get an answer:
> https://stackoverflow.com/questions/50689177/kafka-ack-all-and-min-isr
>
> *Summary*
>
> The docs and code comments for Kafka suggest that when the producer setting
> acks is set to all then an ack will only be sent to the producer when *all
> in-sync replicas have caught up*, but the code (Partition.scala,
> checkEnoughReplicasReachOffset) seems to suggest that the ack is sent as
> soon as *min in-sync replicas have caught up*.
>
> *Details*
>
> The Kafka docs have this:
>
> acks=all This means the leader will wait for the full set of in-sync
> replicas to acknowledge the record. source
> <https://kafka.apache.org/documentation/>
>
> Also, looking at the Kafka source code - Partition.scala
> checkEnoughReplicasReachOffset() has the following comment (emphasis mine):
>
> Note that this method will only be called if requiredAcks = -1 and we are
> waiting for *all replicas* in ISR to be fully caught up to the (local)
> leader's offset corresponding to this produce request before we acknowledge
> the produce request.
>
> Finally, this answer <https://stackoverflow.com/a/45783921/746754> on Stack
> Overflow (emphasis mine again)
>
> Also the min in-sync replica setting specifies the minimum number of
> replicas that need to be in-sync for the partition to remain available for
> writes. When a producer specifies ack (-1 / all config) it will still wait
> for acks from *all in sync replicas* at that moment (independent of the
> setting for min in-sync replicas).
>
> But when I look at the code in Partition.scala (note minIsr <=
> curInSyncReplicas.size):
>
> def checkEnoughReplicasReachOffset(requiredOffset: Long): (Boolean, Errors) = {
>   ...
>   val minIsr = leaderReplica.log.get.config.minInSyncReplicas
>   if (leaderReplica.highWatermark.messageOffset >= requiredOffset) {
>     if (minIsr <= curInSyncReplicas.size)
>       (true, Errors.NONE)
>
> The code that calls this returns the ack:
>
> if (error != Errors.NONE || hasEnough) {
>   status.acksPending = false
>   status.responseStatus.error = error
> }
>
> So, the code looks like it returns an ack as soon as the in-sync replica
> set is at least as large as min in-sync replicas. However, the
> documentation and comments suggest that the ack is only sent once all
> in-sync replicas have caught up. What am I missing? At the very least, the
> comment above checkEnoughReplicasReachOffset looks like it should be
> changed.
>
> Regards,
>
> Carl
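
To make the reply at the top of this message concrete, here is a minimal Scala sketch of the two conditions. It is a simplified model and not the actual broker code: IsrModel, Replica, highWatermark and canAck are hypothetical names, and the high watermark is assumed to be the smallest log end offset across the current ISR. It covers only the branch quoted above, not the parts elided by "...".

```scala
// Simplified model of the ack decision discussed above; NOT the Kafka source.
// All names in this object are hypothetical and exist only for illustration.
object IsrModel {
  final case class Replica(id: Int, logEndOffset: Long)

  // Assumption: the high watermark is the minimum log end offset over the ISR,
  // so it cannot pass an offset until every in-sync replica has reached it.
  def highWatermark(isr: Seq[Replica]): Long = {
    require(isr.nonEmpty, "the ISR is assumed to contain at least the leader")
    isr.map(_.logEndOffset).min
  }

  // Same shape as the two quoted checks:
  //   highWatermark >= requiredOffset   (all ISR members have caught up)
  //   minIsr <= isr.size                (the ISR is still large enough)
  def canAck(isr: Seq[Replica], minIsr: Int, requiredOffset: Long): Boolean =
    highWatermark(isr) >= requiredOffset && minIsr <= isr.size
}

object Demo extends App {
  import IsrModel._

  // Two of three ISR members have reached offset 10; replica 3 is behind.
  val lagging = Seq(Replica(1, 10L), Replica(2, 10L), Replica(3, 7L))
  println(canAck(lagging, minIsr = 2, requiredOffset = 10L))

  // Replica 3 catches up, so the high watermark can advance to 10.
  val caughtUp = Seq(Replica(1, 10L), Replica(2, 10L), Replica(3, 10L))
  println(canAck(caughtUp, minIsr = 2, requiredOffset = 10L))

  // Alternatively, replica 3 is removed from the ISR; the remaining two
  // members have the offset and the ISR size still satisfies minIsr.
  val shrunk = lagging.filter(_.id != 3)
  println(canAck(shrunk, minIsr = 2, requiredOffset = 10L))
}
```

In this model the first call is false even though min in-sync replicas (2) already have the offset, because the lagging replica holds the high watermark at 7. The minIsr comparison only checks that the ISR has not shrunk below the configured minimum; it does not reduce the number of replicas the leader waits for.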