Becket,

I think this proposal actually does a great deal to address the
configuration complexity. It is true that there are a number of knobs, but
the result of this change is that 99% of people don't need to think about
them (and the mechanism we have to communicate that is to reduce the
importance setting that translates to the docs so people know these are low
level tuning things). Instead we can just focus on trying to make things
safe and fast by default with the full guarantees. Very extreme use cases
may require giving up some of the safety guarantees but I think that's
okay, those people won't necessarily want to change all the configs
together, they'll want to change just the acks setting most likely.

-Jay




On Fri, Aug 11, 2017 at 5:39 PM, Becket Qin <becket....@gmail.com> wrote:

> BTW, I feel that the configurations we have around those guarantees have
> become too complicated for the users. Not sure if this is considered before
> but Maybe we can have some helper functions provided to the users. For
> example:
>
> Properties TopicConfig.forSemantc(Semantic semantic);
> Properties ProducerConfig.forSemantc(Semantic semantic);
>
> Where the semantics are "AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE". So
> users could just pick the one they want. This would be as if we have more
> than one default config sets.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Fri, Aug 11, 2017 at 5:26 PM, Becket Qin <becket....@gmail.com> wrote:
>
> > Hi Apurva,
> >
> > Thanks for the reply. When I was thinking of exactly once I am thinking
> of
> > "exactly once with availability", Users probably wouldn't want to
> sacrifice
> > availability for exactly once. To achieve exactly once with same
> > availability and acks=all, users actually need to pay more cost. To
> > tolerate one broker failure, one has to set replication.factor to at
> least
> > 3 and min.isr to at least 2. Do you mean we should also set those to
> > default value? Would it be a little weird because redundancy level is a
> > pretty customized decision so there is no one single correct default
> > configuration for that.
> >
> > The concern I have is that acks=-1 is not only associated with exactly
> > once semantic. I am not sure if the side effect it brings justifies a
> > default config, such as performance, cost, etc.
> >
> > From users' perspective, when idempotence=true and
> > max.in.flight.requests.per.connection > 0, ideally what acks=1 should
> > really mean is that "as long as there is no hardware failure, my message
> is
> > sent exactly once". Do you think this semantic is good enough as a
> default
> > configuration to ship? It is unfortunate this statement is not true today
> > as even when we do leader migration without any broker failure, the
> leader
> > will naively truncate the data that has not been replicated. It is a long
> > existing issue and we should try to fix that.
> >
> > For the max.in.flight.requests.per.connection, can you elaborate a
> little
> > on "Given the nature of the idempotence feature, we have to bound it.".
> > What is the concern here? It seems that when nothing wrong happens,
> > pipelining should just work. And the memory is bounded by the memory
> buffer
> > pool anyways. Sure one has to resend all the subsequent batches if one
> > batch is out of sequence, but that should be rare and we probably should
> > not optimize for that.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Fri, Aug 11, 2017 at 2:08 PM, Apurva Mehta <apu...@confluent.io>
> wrote:
> >
> >> Thanks for your email Becket. I would be interested in hearing others
> >> opinions on which should be a better default between acks=1 and
> acks=all.
> >>
> >> One important point on which I disagree is your statement that 'users
> need
> >> to do a lot of work to get exactly-once with acks=all'. This is
> debatable.
> >> If we enable acks=all,  and if we ship with sane topic-level configs
> (like
> >> disabling unclean leader election), then users will get produce
> exceptions
> >> with the default settings only for authorization and config exceptions,
> or
> >> exceptions due to correlated hard failures or software bugs (assuming
> >> replication-factor > 1, which is when acks=all and acks=1 differ). This
> >> should be sufficiently rare that expecting apps to shut down and have
> >> manual intervention to ensure data consistency is not unreasonable.
> >>
> >> So users will not have to have complicated code to ensure exactly-once
> in
> >> their app with my proposed defaults: just shut down the producer when a
> >> `send` returns an error and check manually if you really care about
> >> exactly-once. The latter should happen so rarely that I argue that it
> >> would
> >> be worth the cost. And if all else fails, there are still ways to
> recover
> >> automatically, but those are then very complex as you pointed out.
> >>
> >> Regarding max.in.flight: again, given the nature of the idempotence
> >> feature, we have to bound it. One trade off is that if you have this
> >> cross-dc use case with extremely high client/broker latency, you either
> >> accept lower performance with idempotence (and max.in.flight=5), or
> >> disable
> >> idempotence and keep max.in.flight at 20 or whatever. I think this is a
> >> fair tradeoff.
> >>
> >> Thanks,
> >> Apurva
> >>
> >>
> >> On Fri, Aug 11, 2017 at 11:45 AM, Becket Qin <becket....@gmail.com>
> >> wrote:
> >>
> >> > Hi Apurva,
> >> >
> >> > I agree that most changes we are talking about here are for default
> >> values
> >> > of the configurations and users can always override them. So I think
> the
> >> > question to ask is more about the out of the box experience. If the
> >> change
> >> > makes strict improvement compared with the current settings, that
> would
> >> > make a lot of sense. (e.g. idempotence + pipelined produce requests).
> On
> >> > the other hand, if the out of the box experience is not strictly
> >> improved,
> >> > but just default to address another scenario, we may need to think
> about
> >> > that a bit more (e.g. acks=all).
> >> >
> >> > The way I view this is the following: For the users who wants exactly
> >> once,
> >> > they need to do a lot of extra work even if we do all the right
> >> > configurations. That means for those users, they need to understand
> all
> >> the
> >> > failure cases and properly handle them. For those users, they probably
> >> > already understand (or at least needs to understand) how to configure
> >> the
> >> > cluster. So providing the default configurations for them do not
> provide
> >> > much additional benefit. For the other users, who care about low
> latency
> >> > and high throughput but not require the most strong semantic, shipping
> >> the
> >> > default settings to be the strong semantic at the cost of latency and
> >> > throughput will force them to look into the configurations and tune
> for
> >> > throughput and latency, which is something they don't need to in the
> >> > previous versions. Therefore, I feel it may not be necessary to ship
> >> Kafka
> >> > with the strongest guarantee.
> >> >
> >> > In terms of the max.in.flight.request. In some long latency pipeline,
> >> (e.g
> >> > cross ocean pipeline), the latency could be a couple of hundreds ms.
> >> > Assuming we have 10 Gbps bandwidth and 10 MB average produce request
> >> size.
> >> > When the latency is 200 ms, because each requests takes about 10 ms to
> >> > send, we need to have max.in.flight.requests ~ 20 in order to fully
> >> utilize
> >> > the network bandwidth. When the requests are smaller, we will need to
> >> > pipeline more requests.
> >> >
> >> > Thanks,
> >> >
> >> > Jiangjie (Becket) Qin
> >> >
> >> >
> >> >
> >> >
> >> > On Thu, Aug 10, 2017 at 10:43 PM, Apurva Mehta <apu...@confluent.io>
> >> > wrote:
> >> >
> >> > > Hi Dong,
> >> > >
> >> > > Thanks for your comments.
> >> > >
> >> > > Yes, with retries=MAX_INT, producer.flush() may block. I think there
> >> are
> >> > > two solutions: a good one would be to adopt some form of KIP-91 to
> >> bound
> >> > > the time a message can remain unacknowledged. Alternately, we could
> >> set
> >> > the
> >> > > default retries to 10 or something. I prefer implementing KIP-91
> along
> >> > with
> >> > > this KIP to solve this problem, but it isn't a strong dependency.
> >> > >
> >> > > Yes, OutOfOrderSequence is a new exception. It indicates a
> previously
> >> > > acknowledged message was lost. This could happen even today, but
> >> there is
> >> > > no way for the client to detect it. With KIP-98 and the new sequence
> >> > > numbers, we can. If applications ignore it, they would have the same
> >> > > behavior as the already have, except with the explicit knowledge
> that
> >> > > something has been lost.
> >> > >
> >> > > Finally, from my perspective, the best the reason to make acks=all
> the
> >> > > default is that it would be a coherent default to have. Along with
> >> > enabling
> >> > > idempotence, acks=all, and retries=MAX_INT would mean that
> >> acknowledged
> >> > > messages would appear in the log exactly once. The 'fatal'
> exceptions
> >> > would
> >> > > be either AuthorizationExceptions, ConfigExceptions, or rare data
> loss
> >> > > issues due to concurrent failures or software bugs. So while this is
> >> not
> >> > a
> >> > > guarantee of exactly once, it is practically as close to it as you
> can
> >> > get.
> >> > > I think this is a strong enough reason to enable acks=all.
> >> > >
> >> > > Thanks,
> >> > > Apurva
> >> > >
> >> > >
> >> > > On Thu, Aug 10, 2017 at 1:04 AM, Dong Lin <lindon...@gmail.com>
> >> wrote:
> >> > >
> >> > > > Hey Apurva,
> >> > > >
> >> > > > Thanks for the KIP. I have read through the KIP and the prior
> >> > discussion
> >> > > in
> >> > > > this thread. I have three concerns that are related to Becket's
> >> > comments:
> >> > > >
> >> > > > - Is it true that, as Becket has mentioned, producer.flush() may
> >> block
> >> > > > infinitely if retries=MAX_INT? This seems like a possible reason
> to
> >> > break
> >> > > > user's application. I think we probably should avoid causing
> >> > correctness
> >> > > > penalty for application.
> >> > > >
> >> > > > - It seems that OutOfOrderSequenceException will be a new
> exception
> >> > > thrown
> >> > > > to user after this config change. Can you clarify whether this
> will
> >> > cause
> >> > > > correctness penalty for application?
> >> > > >
> >> > > > - It is not very clear to me whether the benefit of increasing
> acks
> >> > from
> >> > > 1
> >> > > > to all is worth the performance hit. For users who have not
> already
> >> > > > overridden acks to all, it is very likely that they are not
> already
> >> > doing
> >> > > > other complicated work (e.g. close producer in callback) that are
> >> > > necessary
> >> > > > for exactly-once delivery. Thus those users won't have
> exactly-once
> >> > > > semantics by simply picking up the change in the default acks
> >> > > > configuration. It seems that the only benefit of this config
> change
> >> is
> >> > > the
> >> > > > well-known tradeoff between performance and message loss rate. I
> am
> >> not
> >> > > > sure this is a strong reason to risk reducing existing user's
> >> > > performance.
> >> > > >
> >> > > > I think my point is that we should not to make change that will
> >> break
> >> > > > user's existing application. And we should try to avoid reducing
> >> user's
> >> > > > performance unless there is strong benefit of doing so (e.g.
> >> > > exactly-once).
> >> > > >
> >> > > > Thanks,
> >> > > > Dong
> >> > > >
> >> > > >
> >> > > >
> >> > > >
> >> > > > On Wed, Aug 9, 2017 at 10:43 PM, Apurva Mehta <
> apu...@confluent.io>
> >> > > wrote:
> >> > > >
> >> > > > > Thanks for your email Becket.
> >> > > > >
> >> > > > > Your observations around using acks=1 and acks=-1 are correct.
> Do
> >> > note
> >> > > > that
> >> > > > > getting an OutOfOrderSequence means that acknowledged data has
> >> been
> >> > > lost.
> >> > > > > This could be due to a weaker acks setting like acks=1 or due
> to a
> >> > > topic
> >> > > > > which is not configured to handle broker failures cleanly
> (unclean
> >> > > leader
> >> > > > > election is enabled, etc.). Either way, you are right in
> observing
> >> > that
> >> > > > if
> >> > > > > an app is very serious about having exactly one copy of each
> ack'd
> >> > > > message
> >> > > > > in the log, it is a significant effort to recover from this
> error.
> >> > > > >
> >> > > > > However, I propose an alternate way of thinking about this: is
> it
> >> > > > > worthwhile shipping Kafka with the defaults tuned for strong
> >> > semantics?
> >> > > > > That is essentially what is being proposed here, and of course
> >> there
> >> > > will
> >> > > > > be tradeoffs with performance and deployment costs-- you can't
> >> have
> >> > > your
> >> > > > > cake and eat it too.
> >> > > > >
> >> > > > > And if we want to ship Kafka with strong semantics by default,
> we
> >> > might
> >> > > > > want to make the default topic level settings as well as the
> >> client
> >> > > > > settings more robust. This means, for instance, disabling
> unclean
> >> > > leader
> >> > > > > election by default. If there are other configs we need to
> change
> >> on
> >> > > the
> >> > > > > broker side to ensure that ack'd messages are not lost due to
> >> > transient
> >> > > > > failures, we should change those as well as part of a future
> KIP.
> >> > > > >
> >> > > > > Personally, I think that the defaults should provide robust
> >> > guarantees.
> >> > > > >
> >> > > > > And this brings me to another point: these are just proposed
> >> > defaults.
> >> > > > > Nothing is being taken away in terms of flexibility to tune for
> >> > > different
> >> > > > > behavior.
> >> > > > >
> >> > > > > Finally, the way idempotence is implemented means that there
> >> needs to
> >> > > be
> >> > > > > some cap on max.in.flight when idempotence is enabled -- that is
> >> > just a
> >> > > > > tradeoff of the feature. Do we have any data that there are
> >> > > installations
> >> > > > > which benefit greatly for a value of max.in.flight > 5? For
> >> instance,
> >> > > > > LinkedIn probably has the largest and most demanding deployment
> of
> >> > > Kafka.
> >> > > > > Are there any applications which use max.inflight > 5? That
> would
> >> be
> >> > > good
> >> > > > > data to have.
> >> > > > >
> >> > > > > Thanks,
> >> > > > > Apurva
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > > On Wed, Aug 9, 2017 at 2:59 PM, Becket Qin <
> becket....@gmail.com>
> >> > > wrote:
> >> > > > >
> >> > > > > > Thanks for the KIP, Apurva. It is a good time to review the
> >> > > > > configurations
> >> > > > > > to see if we can improve the user experience. We also might
> >> need to
> >> > > > think
> >> > > > > > from users standpoint about the out of the box experience.
> >> > > > > >
> >> > > > > > 01. Generally speaking, I think it makes sense to make
> >> > > idempotence=true
> >> > > > > so
> >> > > > > > we can enable producer side pipeline without ordering issue.
> >> > However,
> >> > > > the
> >> > > > > > impact is that users may occasionally receive
> >> > > > OutOfOrderSequencException.
> >> > > > > > In this case, there is not much user can do if they want to
> >> ensure
> >> > > > > > ordering. They basically have to close the producer in the
> call
> >> > back
> >> > > > and
> >> > > > > > resend all the records that are in the RecordAccumulator. This
> >> is
> >> > > very
> >> > > > > > involved. And the users may not have a way to retrieve the
> >> Records
> >> > in
> >> > > > the
> >> > > > > > accumulator anymore. So for the users who really want to
> achieve
> >> > the
> >> > > > > > exactly once semantic, there are actually still a lot of work
> >> to do
> >> > > > even
> >> > > > > > with those default. For the rest of the users, they need to
> >> handle
> >> > > one
> >> > > > > more
> >> > > > > > exception, which might not be a big deal.
> >> > > > > >
> >> > > > > > 02. Setting acks=-1 would significantly reduce the likelihood
> of
> >> > > > > > OutOfOrderSequenceException from happening. However, the
> >> > > > > latency/throughput
> >> > > > > > impact and additional purgatory burden on the broker are big
> >> > > concerns.
> >> > > > > And
> >> > > > > > it does not really guarantee exactly once without broker side
> >> > > > > > configurations. i.e unclean.leader.election, min.isr, etc. I
> am
> >> not
> >> > > > sure
> >> > > > > if
> >> > > > > > it is worth making acks=-1 a global config instead of letting
> >> the
> >> > > users
> >> > > > > who
> >> > > > > > are really care about this to configure correctly.
> >> > > > > >
> >> > > > > > 03. Regarding retries, I think we had some discussion in
> KIP-91.
> >> > The
> >> > > > > > problem of setting retries to max integer is that
> >> producer.flush()
> >> > > may
> >> > > > > take
> >> > > > > > forever. Will this KIP be depending on KIP-91?
> >> > > > > >
> >> > > > > > I am not sure about having a cap on the
> max.in.flight.requests.
> >> It
> >> > > > seems
> >> > > > > > that on some long RTT link, sending more requests in the
> >> pipeline
> >> > > would
> >> > > > > be
> >> > > > > > the only way to keep the latency to be close to RTT.
> >> > > > > >
> >> > > > > > Thanks,
> >> > > > > >
> >> > > > > > Jiangjie (Becket) Qin
> >> > > > > >
> >> > > > > >
> >> > > > > > On Wed, Aug 9, 2017 at 11:28 AM, Apurva Mehta <
> >> apu...@confluent.io
> >> > >
> >> > > > > wrote:
> >> > > > > >
> >> > > > > > > Thanks for the comments Ismael and Jason.
> >> > > > > > >
> >> > > > > > > Regarding the OutOfOrderSequenceException, it is more likely
> >> when
> >> > > you
> >> > > > > > > enable idempotence and have acks=1, simply because you have
> a
> >> > > greater
> >> > > > > > > probability of losing acknowledged data with acks=1, and the
> >> > error
> >> > > > code
> >> > > > > > > indicates that.
> >> > > > > > >
> >> > > > > > > The particular scenario is that a broker acknowledges a
> >> message
> >> > > with
> >> > > > > > > sequence N before replication happens, and then crashes.
> Since
> >> > the
> >> > > > > > message
> >> > > > > > > was acknowledged the producer increments its sequence to
> N+1.
> >> The
> >> > > new
> >> > > > > > > leader would not have received the message, and still
> expects
> >> > > > sequence
> >> > > > > N
> >> > > > > > > from the producer. When it receives N+1 for the next
> message,
> >> it
> >> > > will
> >> > > > > > > return an OutOfOrderSequenceNumber, correctl/y indicating
> some
> >> > > > > previously
> >> > > > > > > acknowledged messages are missing.
> >> > > > > > >
> >> > > > > > > For the idempotent producer alone, the
> >> > OutOfOrderSequenceException
> >> > > is
> >> > > > > > > returned in the Future and Callback, indicating to the
> >> > application
> >> > > > that
> >> > > > > > > some acknowledged data was lost. However, the application
> can
> >> > > > continue
> >> > > > > > > producing data using the producer instance. The only
> >> > compatibility
> >> > > > > issue
> >> > > > > > > here is that the application will now see a new exception
> for
> >> a
> >> > > state
> >> > > > > > which
> >> > > > > > > went previously undetected.
> >> > > > > > >
> >> > > > > > > For a transactional producer, an OutOfOrderSequenceException
> >> is
> >> > > fatal
> >> > > > > and
> >> > > > > > > the application must use a new instance of the producer.
> >> > > > > > >
> >> > > > > > > Another point about acks=1 with enable.idempotence=true.
> What
> >> > > > semantics
> >> > > > > > are
> >> > > > > > > we promising here? Essentially we are saying that the
> default
> >> > mode
> >> > > > > would
> >> > > > > > be
> >> > > > > > > 'if a message is in the log, it will occur only once, but
> all
> >> > > > > > acknowledged
> >> > > > > > > messages may not make it to the log'. I don't think that
> this
> >> is
> >> > a
> >> > > > > > > desirable default guarantee.
> >> > > > > > >
> >> > > > > > > I will update the KIP to indicate that with the new default,
> >> > > > > applications
> >> > > > > > > might get a new 'OutOfOrderSequenceException'.
> >> > > > > > >
> >> > > > > > > Thanks,
> >> > > > > > > Apurva
> >> > > > > > >
> >> > > > > > > On Wed, Aug 9, 2017 at 9:33 AM, Ismael Juma <
> >> ism...@juma.me.uk>
> >> > > > wrote:
> >> > > > > > >
> >> > > > > > > > Hi Jason,
> >> > > > > > > >
> >> > > > > > > > Thanks for the correction. See inline.
> >> > > > > > > >
> >> > > > > > > > On Wed, Aug 9, 2017 at 5:13 PM, Jason Gustafson <
> >> > > > ja...@confluent.io>
> >> > > > > > > > wrote:
> >> > > > > > > >
> >> > > > > > > > > Minor correction: the OutOfOrderSequenceException is not
> >> > fatal
> >> > > > for
> >> > > > > > the
> >> > > > > > > > > idempotent producer and it is not necessarily tied to
> the
> >> > acks
> >> > > > > > setting
> >> > > > > > > > > (though it is more likely to be thrown with acks=1).
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > > > Right, it would be worth expanding on the specifics of
> >> this. My
> >> > > > > > > > understanding is that common failure scenarios could
> trigger
> >> > it.
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > > > > It is used to signal
> >> > > > > > > > > the user that there was a gap in the delivery of
> messages.
> >> > You
> >> > > > can
> >> > > > > > hit
> >> > > > > > > > this
> >> > > > > > > > > if there is a pause on the producer and the topic
> >> retention
> >> > > kicks
> >> > > > > in
> >> > > > > > > and
> >> > > > > > > > > deletes the last records the producer had written.
> >> However,
> >> > it
> >> > > is
> >> > > > > > > > possible
> >> > > > > > > > > for the user to catch it and simply keep producing
> >> > (internally
> >> > > > the
> >> > > > > > > > producer
> >> > > > > > > > > will generate a new ProducerId).
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > > > I see, our documentation states that it's fatal in the
> >> > following
> >> > > > > > example
> >> > > > > > > > and in the `send` method. I had overlooked that this was
> >> > > mentioned
> >> > > > in
> >> > > > > > the
> >> > > > > > > > context of transactions. If we were to enable idempotence
> by
> >> > > > default,
> >> > > > > > > we'd
> >> > > > > > > > want to flesh out the docs for idempotence without
> >> > transactions.
> >> > > > > > > >
> >> > > > > > > > * try {
> >> > > > > > > > *     producer.beginTransaction();
> >> > > > > > > > *     for (int i = 0; i < 100; i++)
> >> > > > > > > > *         producer.send(new ProducerRecord<>("my-topic",
> >> > > > > > > > Integer.toString(i), Integer.toString(i)));
> >> > > > > > > > *     producer.commitTransaction();
> >> > > > > > > > * } catch (ProducerFencedException |
> >> > OutOfOrderSequenceException
> >> > > |
> >> > > > > > > > AuthorizationException e) {
> >> > > > > > > > *     // We can't recover from these exceptions, so our
> only
> >> > > option
> >> > > > > is
> >> > > > > > > > to close the producer and exit.
> >> > > > > > > > *     producer.close();
> >> > > > > > > > * } catch (KafkaException e) {
> >> > > > > > > > *     // For all other exceptions, just abort the
> >> transaction
> >> > and
> >> > > > try
> >> > > > > > > > again.
> >> > > > > > > > *     producer.abortTransaction();
> >> > > > > > > > * }
> >> > > > > > > > * producer.close();
> >> > > > > > > >
> >> > > > > > > > Nevertheless, pre-idempotent-producer code
> >> > > > > > > > > won't be expecting this exception, and that may cause it
> >> to
> >> > > break
> >> > > > > in
> >> > > > > > > > cases
> >> > > > > > > > > where it previously wouldn't. This is probably the
> biggest
> >> > risk
> >> > > > of
> >> > > > > > the
> >> > > > > > > > > change.
> >> > > > > > > > >
> >> > > > > > > >
> >> > > > > > > > This is a good point and we should include it in the KIP.
> >> > > > > > > >
> >> > > > > > > > Ismael
> >> > > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> >
> >
>

Reply via email to