Thanks for the KIP, Apurva. It is a good time to review the configurations
to see if we can improve the user experience. We might also need to think
about the out-of-the-box experience from the users' standpoint.
01. Generally speaking, I think it makes sense to set
enable.idempotence=true so that we can pipeline requests on the producer
side without ordering issues. However, the impact is that users may
occasionally receive an OutOfOrderSequenceException. In that case, there is
not much the user can do if they want to preserve ordering. They basically
have to close the producer in the callback and resend all the records that
are in the RecordAccumulator. This is very involved, and the users may no
longer have a way to retrieve the records in the accumulator. So for users
who really want exactly-once semantics, there is actually still a lot of
work to do even with those defaults. The rest of the users need to handle
one more exception, which might not be a big deal.
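
To make the failure mode concrete, here is a toy model of the scenario
Apurva describes in the quoted message below (the leader acknowledges
sequence N with acks=1, crashes before replication, and the new leader
still expects N). ToyLeader, its string results, and the "log length equals
next expected sequence" rule are all assumptions made up for illustration.
This is not the broker's actual bookkeeping, just a sketch of why the
producer ends up ahead of the new leader.

```java
import java.util.ArrayList;
import java.util.List;

final class SequenceGapSketch {

    /** Hypothetical stand-in for a partition leader serving one producer. */
    static final class ToyLeader {
        private final List<Integer> log = new ArrayList<>();

        /** Toy rule: the next expected sequence is simply the log length. */
        int nextExpectedSequence() {
            return log.size();
        }

        /** Appends if the sequence is the expected one, else reports why not. */
        String append(int sequence) {
            int expected = nextExpectedSequence();
            if (sequence < expected) {
                return "DUPLICATE";
            }
            if (sequence > expected) {
                return "OUT_OF_ORDER_SEQUENCE";
            }
            log.add(sequence);
            return "OK";
        }
    }

    /** Old leader acks sequence 0 and crashes before replicating it. */
    static String simulateLeaderFailover() {
        ToyLeader oldLeader = new ToyLeader();
        int producerSequence = 0;

        // acks=1: the leader alone acknowledges, so the producer moves on.
        if (oldLeader.append(producerSequence).equals("OK")) {
            producerSequence++;
        }

        // The follower never received sequence 0, so as the new leader
        // it starts from an empty log and still expects sequence 0.
        ToyLeader newLeader = new ToyLeader();
        return newLeader.append(producerSequence);
    }

    public static void main(String[] args) {
        System.out.println(simulateLeaderFailover());
    }
}
```

The point of the sketch is that by the time the application sees the error,
the lost record has already been acknowledged and handed back, which is why
recovering the original order from the callback is so awkward.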

02. Setting acks=-1 would significantly reduce the likelihood of an
OutOfOrderSequenceException. However, the latency/throughput impact and the
additional purgatory burden on the broker are big concerns. It also does
not really guarantee exactly once without broker-side configuration, e.g.
unclean.leader.election, min.isr, etc. I am not sure it is worth making
acks=-1 the global default instead of letting the users who really care
about this configure it correctly.
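
As a rough sketch of what "configure it correctly" could look like for
those users, the settings involved are roughly the following. The config
keys are the standard Kafka names (my shorthand above stands for
unclean.leader.election.enable and min.insync.replicas). The class and
method names are made up, and the value min.insync.replicas=2 is only an
assumption for a topic with replication factor 3.

```java
import java.util.Properties;

final class DurabilityConfigSketch {

    /** Producer-side overrides for users who care about not losing acked data. */
    static Properties producerOverrides() {
        Properties props = new Properties();
        props.setProperty("enable.idempotence", "true");
        // acks=all is the same setting as acks=-1.
        props.setProperty("acks", "all");
        return props;
    }

    /** Broker/topic-side settings that acks=-1 depends on to be meaningful. */
    static Properties brokerOrTopicSettings() {
        Properties props = new Properties();
        // Do not let an out-of-sync replica become leader.
        props.setProperty("unclean.leader.election.enable", "false");
        // Assumption: replication factor 3, so 2 in-sync replicas must ack.
        props.setProperty("min.insync.replicas", "2");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerOverrides());
        System.out.println(brokerOrTopicSettings());
    }
}
```

The second group lives on the broker or topic, not in the producer, which
is the reason a producer default alone cannot deliver the guarantee.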

03. Regarding retries, I think we had some discussion in KIP-91. The
problem with setting retries to the max integer is that producer.flush()
may take forever. Will this KIP depend on KIP-91?

I am not sure about having a cap on max.in.flight.requests.per.connection.
It seems that on a long-RTT link, having more requests in the pipeline
would be the only way to keep the latency close to the RTT.
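
A back-of-the-envelope version of that concern, assuming each request is
answered exactly one RTT after it is sent and ignoring broker processing
time: with at most k requests in flight on a connection, the connection
can complete at most k / RTT requests per second. The numbers below (200 ms
RTT, a cap of 5, 100 requests per second) are made up for illustration, and
the class and method names are hypothetical.

```java
final class InFlightSketch {

    /** Upper bound on completed requests per second for one connection. */
    static double maxRequestsPerSecond(int maxInFlight, double rttMillis) {
        return maxInFlight * 1000.0 / rttMillis;
    }

    /** In-flight requests needed to sustain a request rate without queueing. */
    static int inFlightNeeded(double requestsPerSecond, double rttMillis) {
        return (int) Math.ceil(requestsPerSecond * rttMillis / 1000.0);
    }

    public static void main(String[] args) {
        // A cap of 5 on a 200 ms link allows at most 25 requests per second.
        System.out.println(maxRequestsPerSecond(5, 200.0));
        // Sustaining 100 requests per second on that link needs 20 in flight.
        System.out.println(inFlightNeeded(100.0, 200.0));
    }
}
```

Under these assumptions, anything the application produces beyond the
bound has to wait in the accumulator, so the observed latency grows past
the RTT once the cap is lower than what the link needs.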

Thanks,

Jiangjie (Becket) Qin


On Wed, Aug 9, 2017 at 11:28 AM, Apurva Mehta <apu...@confluent.io> wrote:

> Thanks for the comments Ismael and Jason.
>
> Regarding the OutOfOrderSequenceException, it is more likely when you
> enable idempotence and have acks=1, simply because you have a greater
> probability of losing acknowledged data with acks=1, and the error code
> indicates that.
>
> The particular scenario is that a broker acknowledges a message with
> sequence N before replication happens, and then crashes. Since the message
> was acknowledged the producer increments its sequence to N+1. The new
> leader would not have received the message, and still expects sequence N
> from the producer. When it receives N+1 for the next message, it will
> return an OutOfOrderSequenceNumber, correctly indicating some previously
> acknowledged messages are missing.
>
> For the idempotent producer alone, the OutOfOrderSequenceException is
> returned in the Future and Callback, indicating to the application that
> some acknowledged data was lost. However, the application can continue
> producing data using the producer instance. The only compatibility issue
> here is that the application will now see a new exception for a state which
> went previously undetected.
>
> For a transactional producer, an OutOfOrderSequenceException is fatal and
> the application must use a new instance of the producer.
>
> Another point about acks=1 with enable.idempotence=true. What semantics are
> we promising here? Essentially we are saying that the default mode would be
> 'if a message is in the log, it will occur only once, but all acknowledged
> messages may not make it to the log'. I don't think that this is a
> desirable default guarantee.
>
> I will update the KIP to indicate that with the new default, applications
> might get a new 'OutOfOrderSequenceException'.
>
> Thanks,
> Apurva
>
> On Wed, Aug 9, 2017 at 9:33 AM, Ismael Juma <ism...@juma.me.uk> wrote:
>
> > Hi Jason,
> >
> > Thanks for the correction. See inline.
> >
> > On Wed, Aug 9, 2017 at 5:13 PM, Jason Gustafson <ja...@confluent.io>
> > wrote:
> >
> > > Minor correction: the OutOfOrderSequenceException is not fatal for the
> > > idempotent producer and it is not necessarily tied to the acks setting
> > > (though it is more likely to be thrown with acks=1).
> >
> >
> > Right, it would be worth expanding on the specifics of this. My
> > understanding is that common failure scenarios could trigger it.
> >
> >
> > > It is used to signal
> > > the user that there was a gap in the delivery of messages. You can hit
> > > this if there is a pause on the producer and the topic retention kicks
> > > in and deletes the last records the producer had written. However, it
> > > is possible for the user to catch it and simply keep producing
> > > (internally the producer will generate a new ProducerId).
> >
> >
> > I see, our documentation states that it's fatal in the following example
> > and in the `send` method. I had overlooked that this was mentioned in the
> > context of transactions. If we were to enable idempotence by default,
> > we'd want to flesh out the docs for idempotence without transactions.
> >
> > * try {
> > *     producer.beginTransaction();
> > *     for (int i = 0; i < 100; i++)
> > *         producer.send(new ProducerRecord<>("my-topic",
> > *                 Integer.toString(i), Integer.toString(i)));
> > *     producer.commitTransaction();
> > * } catch (ProducerFencedException | OutOfOrderSequenceException |
> > *          AuthorizationException e) {
> > *     // We can't recover from these exceptions, so our only option is
> > *     // to close the producer and exit.
> > *     producer.close();
> > * } catch (KafkaException e) {
> > *     // For all other exceptions, just abort the transaction and try
> > *     // again.
> > *     producer.abortTransaction();
> > * }
> > * producer.close();
> >
> > > Nevertheless, pre-idempotent-producer code
> > > won't be expecting this exception, and that may cause it to break in
> > > cases where it previously wouldn't. This is probably the biggest risk
> > > of the change.
> > >
> >
> > This is a good point and we should include it in the KIP.
> >
> > Ismael
> >
>
