Hey Apurva,

Thanks for the KIP. I have read through the KIP and the prior discussion in this thread. I have three concerns related to Becket's comments:
- Is it true that, as Becket has mentioned, producer.flush() may block indefinitely if retries=MAX_INT? This seems like a possible way to break users' applications. I think we should probably avoid imposing a correctness penalty on applications.

- It seems that OutOfOrderSequenceException will be a new exception thrown to users after this config change. Can you clarify whether this will cause a correctness penalty for applications?

- It is not very clear to me whether the benefit of increasing acks from 1 to all is worth the performance hit. Users who have not already overridden acks to all are very likely not doing the other complicated work (e.g. closing the producer in a callback) that is necessary for exactly-once delivery. Thus those users won't get exactly-once semantics simply by picking up the change in the default acks configuration. It seems that the only benefit of this config change is the well-known tradeoff between performance and message loss rate. I am not sure this is a strong enough reason to risk reducing existing users' performance.

My point is that we should not make changes that will break users' existing applications, and we should try to avoid reducing users' performance unless there is a strong benefit to doing so (e.g. exactly-once).

Thanks,
Dong
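For concreteness, a minimal sketch (mine, not from the KIP text) of the defaults under discussion, written out explicitly on a Java producer. The property names are the standard producer configs; the bootstrap server and serializers are placeholders:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // placeholder
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    // The proposed new defaults, set explicitly:
    props.put("enable.idempotence", "true");
    props.put("acks", "all");                                  // today's default is "1"
    props.put("retries", Integer.toString(Integer.MAX_VALUE)); // the retries=MAX_INT in question;
                                                               // with no bound, flush() can block indefinitely
    props.put("max.in.flight.requests.per.connection", "5");   // cap required while idempotence is enabled

    KafkaProducer<String, String> producer = new KafkaProducer<>(props);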
On Wed, Aug 9, 2017 at 10:43 PM, Apurva Mehta <apu...@confluent.io> wrote:

> Thanks for your email Becket.
>
> Your observations around using acks=1 and acks=-1 are correct. Do note that getting an OutOfOrderSequenceException means that acknowledged data has been lost. This could be due to a weaker acks setting like acks=1 or due to a topic which is not configured to handle broker failures cleanly (unclean leader election is enabled, etc.). Either way, you are right in observing that if an app is very serious about having exactly one copy of each ack'd message in the log, it is a significant effort to recover from this error.
>
> However, I propose an alternate way of thinking about this: is it worthwhile shipping Kafka with the defaults tuned for strong semantics? That is essentially what is being proposed here, and of course there will be tradeoffs with performance and deployment costs -- you can't have your cake and eat it too.
>
> And if we want to ship Kafka with strong semantics by default, we might want to make the default topic-level settings as well as the client settings more robust. This means, for instance, disabling unclean leader election by default. If there are other configs we need to change on the broker side to ensure that ack'd messages are not lost due to transient failures, we should change those as well as part of a future KIP.
>
> Personally, I think that the defaults should provide robust guarantees.
>
> And this brings me to another point: these are just proposed defaults. Nothing is being taken away in terms of flexibility to tune for different behavior.
>
> Finally, the way idempotence is implemented means that there needs to be some cap on max.in.flight when idempotence is enabled -- that is just a tradeoff of the feature. Do we have any data that there are installations which benefit greatly from a value of max.in.flight > 5? For instance, LinkedIn probably has the largest and most demanding deployment of Kafka. Are there any applications which use max.in.flight > 5? That would be good data to have.
>
> Thanks,
> Apurva
>
> On Wed, Aug 9, 2017 at 2:59 PM, Becket Qin <becket....@gmail.com> wrote:
>
> > Thanks for the KIP, Apurva. It is a good time to review the configurations to see if we can improve the user experience. We also might need to think from the users' standpoint about the out-of-the-box experience.
> >
> > 01. Generally speaking, I think it makes sense to make idempotence=true so we can enable producer-side pipelining without ordering issues. However, the impact is that users may occasionally receive an OutOfOrderSequenceException. In this case, there is not much a user can do if they want to ensure ordering. They basically have to close the producer in the callback and resend all the records that are in the RecordAccumulator. This is very involved. And the users may not have a way to retrieve the records in the accumulator anymore. So for the users who really want to achieve exactly-once semantics, there is actually still a lot of work to do even with those defaults. For the rest of the users, they need to handle one more exception, which might not be a big deal.
> >
> > 02. Setting acks=-1 would significantly reduce the likelihood of an OutOfOrderSequenceException. However, the latency/throughput impact and the additional purgatory burden on the broker are big concerns. And it does not really guarantee exactly-once without broker-side configurations, i.e. unclean.leader.election, min.isr, etc. I am not sure it is worth making acks=-1 the global default instead of letting the users who really care about this configure it correctly.
> >
> > 03. Regarding retries, I think we had some discussion in KIP-91. The problem with setting retries to max integer is that producer.flush() may take forever. Will this KIP depend on KIP-91?
> >
> > I am not sure about having a cap on max.in.flight.requests. It seems that on a long-RTT link, sending more requests in the pipeline would be the only way to keep the latency close to the RTT.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
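To illustrate how involved the recovery in Becket's point 01 really is, here is a rough sketch of an application keeping its own buffer of unacknowledged records so it can resend them after an OutOfOrderSequenceException, since the RecordAccumulator cannot be read back. This is illustrative only (TrackedSender and createProducer are my names, not Kafka API), and as the comments note it still does not restore exactly-once:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;
    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.errors.OutOfOrderSequenceException;

    class TrackedSender {
        // The app's own copy of in-flight records; the producer's internal
        // RecordAccumulator offers no way to retrieve what is still queued.
        private final Queue<ProducerRecord<String, String>> unacked = new ConcurrentLinkedQueue<>();
        private volatile KafkaProducer<String, String> producer = createProducer();

        void sendTracked(ProducerRecord<String, String> record) {
            unacked.add(record);
            producer.send(record, (metadata, e) -> {
                if (e == null) {
                    unacked.remove(record); // acknowledged; with acks=all, also replicated
                } else if (e instanceof OutOfOrderSequenceException) {
                    // Acknowledged data was lost. Tear down the producer and resend
                    // everything still pending. Note: even this does not guarantee
                    // exactly-once -- resends may duplicate or reorder records.
                    producer.close(0, TimeUnit.MILLISECONDS); // non-blocking close from the callback thread
                    producer = createProducer();
                    List<ProducerRecord<String, String>> pending = new ArrayList<>(unacked);
                    unacked.clear();
                    pending.forEach(this::sendTracked);
                }
            });
        }

        private static KafkaProducer<String, String> createProducer() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder
            props.put("enable.idempotence", "true");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            return new KafkaProducer<>(props);
        }
    }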
> > On Wed, Aug 9, 2017 at 11:28 AM, Apurva Mehta <apu...@confluent.io> wrote:
> >
> > > Thanks for the comments Ismael and Jason.
> > >
> > > Regarding the OutOfOrderSequenceException, it is more likely when you enable idempotence and have acks=1, simply because you have a greater probability of losing acknowledged data with acks=1, and the error code indicates that.
> > >
> > > The particular scenario is that a broker acknowledges a message with sequence N before replication happens, and then crashes. Since the message was acknowledged, the producer increments its sequence to N+1. The new leader would not have received the message, and still expects sequence N from the producer. When it receives N+1 for the next message, it will return an OutOfOrderSequenceException, correctly indicating that some previously acknowledged messages are missing.
> > >
> > > For the idempotent producer alone, the OutOfOrderSequenceException is returned in the Future and Callback, indicating to the application that some acknowledged data was lost. However, the application can continue producing data using the same producer instance. The only compatibility issue here is that the application will now see a new exception for a state which previously went undetected.
> > >
> > > For a transactional producer, an OutOfOrderSequenceException is fatal and the application must use a new instance of the producer.
> > >
> > > Another point about acks=1 with enable.idempotence=true: what semantics are we promising here? Essentially we are saying that the default mode would be 'if a message is in the log, it will occur only once, but not all acknowledged messages are guaranteed to make it to the log'. I don't think that is a desirable default guarantee.
> > >
> > > I will update the KIP to indicate that with the new default, applications might get a new 'OutOfOrderSequenceException'.
> > >
> > > Thanks,
> > > Apurva
> > >
> > > On Wed, Aug 9, 2017 at 9:33 AM, Ismael Juma <ism...@juma.me.uk> wrote:
> > >
> > > > Hi Jason,
> > > >
> > > > Thanks for the correction. See inline.
> > > >
> > > > On Wed, Aug 9, 2017 at 5:13 PM, Jason Gustafson <ja...@confluent.io> wrote:
> > > >
> > > > > Minor correction: the OutOfOrderSequenceException is not fatal for the idempotent producer and it is not necessarily tied to the acks setting (though it is more likely to be thrown with acks=1).
> > > >
> > > > Right, it would be worth expanding on the specifics of this. My understanding is that common failure scenarios could trigger it.
> > > >
> > > > > It is used to signal the user that there was a gap in the delivery of messages. You can hit this if there is a pause on the producer and the topic retention kicks in and deletes the last records the producer had written. However, it is possible for the user to catch it and simply keep producing (internally the producer will generate a new ProducerId).
> > > >
> > > > I see, our documentation states that it's fatal in the following example and in the `send` method. I had overlooked that this was mentioned in the context of transactions. If we were to enable idempotence by default, we'd want to flesh out the docs for idempotence without transactions.
> > > >
> > > > * try {
> > > > *     producer.beginTransaction();
> > > > *     for (int i = 0; i < 100; i++)
> > > > *         producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
> > > > *     producer.commitTransaction();
> > > > * } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
> > > > *     // We can't recover from these exceptions, so our only option is to close the producer and exit.
> > > > *     producer.close();
> > > > * } catch (KafkaException e) {
> > > > *     // For all other exceptions, just abort the transaction and try again.
> > > > *     producer.abortTransaction();
> > > > * }
> > > > * producer.close();
> > > >
> > > > > Nevertheless, pre-idempotent-producer code won't be expecting this exception, and that may cause it to break in cases where it previously wouldn't. This is probably the biggest risk of the change.
> > > >
> > > > This is a good point and we should include it in the KIP.
> > > >
> > > > Ismael
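Following up on Ismael's last point, here is a sketch of what a fleshed-out doc example for idempotence without transactions might look like (my wording, not actual Kafka documentation), mirroring the Javadoc snippet above but treating OutOfOrderSequenceException as survivable, per Jason's description:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.errors.OutOfOrderSequenceException;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // placeholder
    props.put("enable.idempotence", "true");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, String> producer = new KafkaProducer<>(props);
    for (int i = 0; i < 100; i++)
        producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)),
            (metadata, e) -> {
                if (e instanceof OutOfOrderSequenceException) {
                    // A gap in acknowledged messages was detected. Unlike the
                    // transactional case above, this is not fatal: the producer
                    // internally obtains a new ProducerId, and the application can
                    // keep sending, accepting that acknowledged data may have been lost.
                    System.err.println("Acknowledged data was lost: " + e);
                }
            });
    producer.close();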