OK. Looks like starting the clock after closing the batch has quite a few
pitfalls. I can't think of a way to work around it without adding yet
another config (can anyone?), so I won't pursue that here. As I said
earlier, I'm not hung up on super-accurate notification times.

If we are going down the max.message.delivery.wait.ms route, what would be
the default? There seem to be a few options.

1. max.message.delivery.wait.ms=null. Nothing changes for those who don't
set it. I.e., batches expire after request.timeout.ms in the accumulator.
If they are past the accumulator stage, they time out after
retries * (request.timeout.ms + retry.backoff.ms).

2. max.message.delivery.wait.ms=request.timeout.ms (see the sketch after
this list). No observable behavioral change at the accumulator level since
the timeout value is the same as before. Retries will be attempted as long
as the batch is under max.message.delivery.wait.ms. However, a batch can
expire after just one try. That's ok IMO because request.timeout.ms tends
to be large (default 30000).

3. max.message.delivery.wait.ms=2*request.timeout.ms. Gives an opportunity
for two retries, with the caveat that retries may not happen at all in some
rare cases and a batch could expire before any attempt.

4. max.message.delivery.wait.ms=something else (a constant?)
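
For concreteness, a rough sketch (in Java) of how options 2 and 3 might look
in a producer's configuration if the proposed name is adopted. This is only
illustrative: max.message.delivery.wait.ms is not a config the producer
recognizes today, so that line is an assumption about the eventual name.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

public class ProducerTimeoutDefaults {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("request.timeout.ms", "30000");
        // Option 2: equal to request.timeout.ms; a batch may expire after a
        // single attempt, but 30s is usually plenty.
        props.put("max.message.delivery.wait.ms", "30000");
        // Option 3 would instead allow roughly two attempts:
        // props.put("max.message.delivery.wait.ms", "60000");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // send records as usual; only the expiry bound differs per option
        }
    }
}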

Thoughts?

On 23 August 2017 at 09:01, Ismael Juma <ism...@juma.me.uk> wrote:

> Thanks Becket, that seems reasonable. Sumant, would you be willing to
> update the KIP based on the discussion or are you still not convinced?
>
> Ismael
>
> On Wed, Aug 23, 2017 at 6:04 AM, Becket Qin <becket....@gmail.com> wrote:
>
> > In general max.message.delivery.wait.ms is a cleaner approach. That would
> > make the guarantee clearer. That said, there seem to be subtleties in some
> > scenarios:
> >
> > 1. I agree with Sumant that it is a little weird that a message could be
> > expired immediately if it happens to enter a batch that is about to be
> > expired. But as Jun said, as long as we have multiple messages in a batch,
> > there isn't a cheap way to achieve a precise timeout. So the question
> > actually becomes whether it is more user-friendly to expire early (based
> > on the batch creation time) or expire late (based on the batch close
> > time). I think both are acceptable. Personally I think most users do not
> > really care about expiring a little late as long as the batch eventually
> > expires. So I would use the batch close time as long as there is a bound
> > on that. But it looks like we do not really have a bound on when we will
> > close a batch. So expiration based on batch create time may be the only
> > option if we don't want to introduce complexity.
> >
> > 2. If we time out a batch in a request when it is still in flight, the
> > end result of that batch is unclear to the users. It would be weird for
> > users to receive an exception saying those messages are expired while
> > they have actually been sent successfully. Also, if idempotence is set to
> > true, what would the next sequence ID be after the expired batch? Reusing
> > the same sequence ID may result in data loss, and incrementing the
> > sequence ID may cause an OutOfOrderSequenceException. Besides, extracting
> > an expired batch from a request also introduces some complexity. Again,
> > personally I think it is fine to expire a little bit late. So maybe we
> > don't need to expire a batch that is already in flight. In the worst case
> > we will expire it with a delay of request.timeout.ms.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> >
> > On Tue, Aug 22, 2017 at 3:08 AM, Ismael Juma <ism...@juma.me.uk> wrote:
> >
> >> Hi all,
> >>
> >> The discussion has been going on for a while, would it help to have a
> >> call to discuss this? I'd like to start a vote soonish so that we can
> >> include this in 1.0.0. I personally prefer max.message.delivery.wait.ms.
> >> It seems like Jun, Apurva and Jason also prefer that. Sumant, it seems
> >> like you still prefer batch.expiry.ms, is that right? What are your
> >> thoughts, Joel and Becket?
> >>
> >> Ismael
> >>
> >> On Wed, Aug 16, 2017 at 6:34 PM, Jun Rao <j...@confluent.io> wrote:
> >>
> >>> Hi, Sumant,
> >>>
> >>> The semantics of linger.ms is a bit subtle. The reasoning for the
> >>> current implementation is the following. Let's say one sets linger.ms to
> >>> 0 (our current default value). Creating a batch for every message will
> >>> be bad for throughput. Instead, the current implementation only forms a
> >>> batch when the batch is sendable (i.e., the broker is available, the
> >>> inflight request limit is not exceeded, etc). That way, the producer has
> >>> more chance for batching. The implication is that the closing of a batch
> >>> could be delayed longer than linger.ms.
> >>>
> >>> Now, on your concern about not having a precise way to control delay in
> >>> the accumulator. It seems the batch.expiry.ms approach will have the
> >>> same issue. If you start the clock when a batch is initialized, you may
> >>> expire some messages in the same batch earlier than batch.expiry.ms. If
> >>> you start the clock when the batch is closed, the expiration time could
> >>> be unbounded because of the linger.ms implementation described above.
> >>> Starting the expiration clock on batch initialization will at least
> >>> guarantee that the time to expire the first message is precise, which is
> >>> probably good enough.
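
A rough sketch (mine, not actual producer code) of the two clock-start
choices described above; the class and the createdMs/closedMs/expiryMs names
are made up for illustration:

class BatchExpirySketch {
    // Clock starts at batch creation: bounded, but the newest messages in
    // the batch may be expired before they have individually waited expiryMs.
    static boolean expiredFromCreate(long nowMs, long createdMs, long expiryMs) {
        return nowMs - createdMs >= expiryMs;
    }

    // Clock starts at batch close (closedMs < 0 means still open): no message
    // expires early, but with the linger.ms behavior above the close can be
    // delayed indefinitely, so the bound is lost.
    static boolean expiredFromClose(long nowMs, long closedMs, long expiryMs) {
        return closedMs >= 0 && nowMs - closedMs >= expiryMs;
    }
}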
> >>>
> >>> Thanks,
> >>>
> >>> Jun
> >>>
> >>>
> >>>
> >>> On Tue, Aug 15, 2017 at 3:46 PM, Sumant Tambe <suta...@gmail.com>
> wrote:
> >>>
> >>> > Question about "the closing of a batch can be delayed longer than
> >>> > linger.ms": Is it possible to cause an indefinite delay? At some point
> >>> > the bytes limit might kick in. Also, why is the closing of a batch
> >>> > coupled with the availability of its destination? In this approach a
> >>> > batch chosen for eviction due to delay needs to "close" anyway, right
> >>> > (without regard to destination availability)?
> >>> >
> >>> > I'm not too worried about notifying at the super-exact time specified
> >>> > in the configs. But expiring before the full wait-span has elapsed
> >>> > sounds a little weird. So the expiration time has a +/- spread. It
> >>> > works more like a hint than a max. So why not
> >>> > message.delivery.wait.hint.ms?
> >>> >
> >>> > Yeah, cancellable future will be similar in complexity.
> >>> >
> >>> > I'm unsure if max.message.delivery.wait.ms will be the final nail for
> >>> > producer timeouts. We still won't have a precise way to control delay
> >>> > in just the accumulator segment. batch.expiry.ms does not try to
> >>> > abstract. It's very specific.
> >>> >
> >>> > My biggest concern at the moment is implementation complexity.
> >>> >
> >>> > At this stage, I would like to encourage other independent opinions.
> >>> >
> >>> > Regards,
> >>> > Sumant
> >>> >
> >>> > On 11 August 2017 at 17:35, Jun Rao <j...@confluent.io> wrote:
> >>> >
> >>> > > Hi, Sumant,
> >>> > >
> >>> > > 1. Yes, it's probably reasonable to require
> >>> > > max.message.delivery.wait.ms > linger.ms. As for retries, perhaps we
> >>> > > can set the default retries to infinite or just ignore it. Then the
> >>> > > latency will be bounded by max.message.delivery.wait.ms.
> >>> > > request.timeout.ms is the max time the request will be spending on
> >>> > > the server. The client can expire an inflight request early if
> >>> > > needed.
> >>> > >
> >>> > > 2. Well, since max.message.delivery.wait.ms specifies the max,
> >>> calling
> >>> > the
> >>> > > callback a bit early may be ok? Note that
> >>> max.message.delivery.wait.ms
> >>> > > only
> >>> > > comes into play in the rare error case. So, I am not sure if we
> need
> >>> to
> >>> > be
> >>> > > very precise. The issue with starting the clock on closing a batch
> is
> >>> > that
> >>> > > currently if the leader is not available, the closing of a batch
> can
> >>> be
> >>> > > delayed longer than linger.ms.
> >>> > >
> >>> > > 4. As you said, future.get(timeout) itself doesn't solve the
> problem
> >>> > since
> >>> > > you still need a way to expire the record in the sender. The amount
> >>> of
> >>> > work
> >>> > > to implement a cancellable future is probably the same?
> >>> > >
> >>> > > Overall, my concern with patch work is that we have iterated on the
> >>> > produce
> >>> > > request timeout multiple times and new issues keep coming back.
> >>> Ideally,
> >>> > > this time, we want to have a solution that covers all cases, even
> >>> though
> >>> > > that requires a bit more work.
> >>> > >
> >>> > > Thanks,
> >>> > >
> >>> > > Jun
> >>> > >
> >>> > >
> >>> > > On Fri, Aug 11, 2017 at 12:30 PM, Sumant Tambe <suta...@gmail.com>
> >>> > wrote:
> >>> > >
> >>> > > > Hi Jun,
> >>> > > >
> >>> > > > Thanks for looking into it.
> >>> > > >
> >>> > > > Yes, we did consider this message-level timeout approach and
> >>> > > > expiring batches selectively in a request, but rejected it due to
> >>> > > > the added complexity without a strong benefit to counterbalance
> >>> > > > it. Your proposal is a slight variation, so I'll mention some
> >>> > > > issues here.
> >>> > > >
> >>> > > > 1. It sounds like max.message.delivery.wait.ms will overlap with
> >>> > > > "time segments" of both linger.ms and retries * (request.timeout.ms
> >>> > > > + retry.backoff.ms). In that case, which config set takes
> >>> > > > precedence? It would not make sense to configure configs from both
> >>> > > > sets. Especially, we discussed exhaustively internally that retries
> >>> > > > and max.message.delivery.wait.ms can't / shouldn't be configured
> >>> > > > together. Retries become moot, as you already mention. I think
> >>> > > > that's going to be surprising to anyone wanting to use
> >>> > > > max.message.delivery.wait.ms. We probably need
> >>> > > > max.message.delivery.wait.ms > linger.ms or something like that.
> >>> > > >
> >>> > > > 2. If the clock starts when a batch is created and the batch
> >>> > > > expires when max.message.delivery.wait.ms is over in the
> >>> > > > accumulator, the last few messages in the expiring batch may not
> >>> > > > have lived long enough. As the config seems to suggest a
> >>> > > > per-message timeout, it's incorrect to expire messages prematurely.
> >>> > > > On the other hand, if the clock starts after the batch is closed
> >>> > > > (which also implies that linger.ms is not covered by the
> >>> > > > max.message.delivery.wait.ms config), no message would be expired
> >>> > > > too soon. Yeah, expiration may be a little bit too late, but hey,
> >>> > > > this ain't a real-time service.
> >>> > > >
> >>> > > > 3. I agree that steps #3, #4, (and #5) are complex to implement.
> >>> On the
> >>> > > > other hand, batch.expiry.ms is next to trivial to implement. We
> >>> just
> >>> > > pass
> >>> > > > the config all the way down to ProducerBatch.maybeExpire and be
> >>> done
> >>> > with
> >>> > > > it.
> >>> > > >
> >>> > > > 4. Do you think the effect of max.message.delivery.wait.ms can be
> >>> > > > simulated with the future.get(timeout) method? Copying an excerpt
> >>> > > > from KIP-91: An end-to-end timeout may be partially emulated using
> >>> > > > future.get(timeout). The timeout must be greater than
> >>> > > > (batch.expiry.ms + nRetries * (request.timeout.ms +
> >>> > > > retry.backoff.ms)). Note that when the future times out, the Sender
> >>> > > > may continue to send the records in the background. To avoid that,
> >>> > > > implementing a cancellable future is a possibility.
> >>> > > >
> >>> > > > For simplicity, we could just implement a trivial method in the
> >>> > > > producer, ProducerConfigs.maxMessageDeliveryWaitMs(), and return a
> >>> > > > number based on this formula. Users of future.get can use this
> >>> > > > timeout value.
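
For what it's worth, a rough sketch of the emulation described in point 4.
The helper below is an assumption for illustration, not existing producer
API; only producer.send() and Future.get(timeout) are real calls:

import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

class EndToEndTimeoutSketch {
    // Emulates an end-to-end bound with future.get(timeout); the bound
    // mirrors the formula quoted from KIP-91 above.
    static RecordMetadata sendWithDeadline(KafkaProducer<String, String> producer,
                                           ProducerRecord<String, String> record,
                                           long batchExpiryMs, int retries,
                                           long requestTimeoutMs,
                                           long retryBackoffMs) throws Exception {
        long endToEndMs =
                batchExpiryMs + (long) retries * (requestTimeoutMs + retryBackoffMs);
        Future<RecordMetadata> future = producer.send(record);
        try {
            return future.get(endToEndMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // The Sender may keep sending the record in the background; a
            // cancellable future would be needed to actually abort it.
            throw e;
        }
    }
}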
> >>> > > >
> >>> > > > Thoughts?
> >>> > > >
> >>> > > > Regards,
> >>> > > > Sumant
> >>> > > >
> >>> > > >
> >>> > > >
> >>> > > > On 11 August 2017 at 07:50, Sumant Tambe <suta...@gmail.com>
> >>> wrote:
> >>> > > >
> >>> > > > >
> >>> > > > > Thanks for the KIP. Nice documentation on all current issues
> >>> with the
> >>> > > > >> timeout.
> >>> > > > >
> >>> > > > > For the KIP writeup, all credit goes to Joel Koshy.
> >>> > > > >
> >>> > > > > I'll follow up on your comments a little later.
> >>> > > > >
> >>> > > > >
> >>> > > > >>
> >>> > > > >> You also brought up a good use case for timing out a message.
> >>> For
> >>> > > > >> applications that collect and send sensor data to Kafka, if
> the
> >>> data
> >>> > > > can't
> >>> > > > >> be sent to Kafka for some reason, the application may prefer
> to
> >>> > buffer
> >>> > > > the
> >>> > > > >> more recent data in the accumulator. Without a timeout, the
> >>> > > accumulator
> >>> > > > >> will be filled with old records and new records can't be
> added.
> >>> > > > >>
> >>> > > > >> Your proposal makes sense for a developer who is familiar with
> >>> > > > >> how the producer works. I am not sure this is very intuitive to
> >>> > > > >> the users, since it may not be very easy for them to figure out
> >>> > > > >> how to configure the new knob to bound the amount of time before
> >>> > > > >> a message is completed.
> >>> > > > >>
> >>> > > > >> From the users' perspective, Apurva's suggestion of
> >>> > > > >> max.message.delivery.wait.ms (which bounds the time from when a
> >>> > > > >> message enters the accumulator to when the callback is called)
> >>> > > > >> seems more intuitive. You listed this in the rejected section
> >>> > > > >> since it requires additional logic to rebatch when a produce
> >>> > > > >> request expires. However, this may not be too bad. The following
> >>> > > > >> are the things that we have to do.
> >>> > > > >>
> >>> > > > >> 1. The clock starts when a batch is created.
> >>> > > > >> 2. If the batch can't be drained within
> >>> > max.message.delivery.wait.ms,
> >>> > > > all
> >>> > > > >> messages in the batch will fail and the callback will be
> called.
> >>> > > > >> 3. When sending a produce request, we calculate an expireTime
> >>> for
> >>> > the
> >>> > > > >> request that equals to the remaining expiration time for the
> >>> oldest
> >>> > > > batch
> >>> > > > >> in the request.
> >>> > > > >> 4. We set the minimum of the expireTime of all inflight
> >>> requests as
> >>> > > the
> >>> > > > >> timeout in the selector poll call (so that the selector can
> >>> wake up
> >>> > > > before
> >>> > > > >> the expiration time).
> >>> > > > >> 5. If the produce response can't be received within
> expireTime,
> >>> we
> >>> > > > expire
> >>> > > > >> all batches in the produce request whose expiration time has
> >>> been
> >>> > > > reached.
> >>> > > > >> For the rest of the batches, we resend them in a new produce
> >>> > request.
> >>> > > > >> 6. If the producer response has a retriable error, we just back
> >>> > > > >> off a bit and then retry the produce request as today. The
> >>> > > > >> number of retries doesn't really matter now. We just keep
> >>> > > > >> retrying until the expiration time is reached. It's possible
> >>> > > > >> that a produce request is never retried due to expiration.
> >>> > > > >> However, this seems like the right thing to do since the users
> >>> > > > >> want to time out the message at this time.
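
A rough sketch of what steps 3 and 4 above might boil down to. The types and
names here are invented for illustration and are not the actual Sender /
NetworkClient code:

import java.util.Collections;
import java.util.List;

class RequestExpirySketch {
    // Step 3: a request's expire time is the remaining expiration time of
    // the oldest batch it carries.
    static long requestExpireTimeMs(List<Long> batchCreateTimesMs,
                                    long maxDeliveryWaitMs) {
        return Collections.min(batchCreateTimesMs) + maxDeliveryWaitMs;
    }

    // Step 4: cap the selector poll timeout by the earliest expire time of
    // any in-flight request, so the sender wakes up in time to expire batches.
    static long pollTimeoutMs(List<Long> inflightExpireTimesMs, long nowMs,
                              long defaultPollMs) {
        long earliest = inflightExpireTimesMs.isEmpty()
                ? Long.MAX_VALUE
                : Collections.min(inflightExpireTimesMs);
        return Math.max(0L, Math.min(defaultPollMs, earliest - nowMs));
    }
}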
> >>> > > > >>
> >>> > > > >> Implementation-wise, there will be a bit more complexity in
> >>> > > > >> steps 3 and 4, but probably not too bad. The benefit is that
> >>> > > > >> this is more intuitive to the end user.
> >>> > > > >>
> >>> > > > >> Does that sound reasonable to you?
> >>> > > > >>
> >>> > > > >> Thanks,
> >>> > > > >>
> >>> > > > >> Jun
> >>> > > > >>
> >>> > > > >>
> >>> > > > >> On Wed, Aug 9, 2017 at 10:03 PM, Sumant Tambe <
> >>> suta...@gmail.com>
> >>> > > > wrote:
> >>> > > > >>
> >>> > > > >> > On Wed, Aug 9, 2017 at 1:28 PM Apurva Mehta <
> >>> apu...@confluent.io>
> >>> > > > >> wrote:
> >>> > > > >> >
> >>> > > > >> > > > > There seems to be no relationship with cluster
> metadata
> >>> > > > >> availability
> >>> > > > >> > or
> >>> > > > >> > > > > staleness. Expiry is just based on the time since the
> >>> batch
> >>> > > has
> >>> > > > >> been
> >>> > > > >> > > > ready.
> >>> > > > >> > > > > Please correct me if I am wrong.
> >>> > > > >> > > > >
> >>> > > > >> > > >
> >>> > > > >> > > > I was not very specific about where we do expiration. I
> >>> > glossed
> >>> > > > over
> >>> > > > >> > some
> >>> > > > >> > > > details because (again) we've other mechanisms to detect
> >>> non
> >>> > > > >> progress.
> >>> > > > >> > > The
> >>> > > > >> > > > condition (!muted.contains(tp) && (isMetadataStale ||
> >>> > > > >> > > > > cluster.leaderFor(tp) == null)) is used in
> >>> > > > >> > > > RecordAccumualtor.expiredBatches:
> >>> > > > >> > > > https://github.com/apache/kafka/blob/trunk/clients/src/
> >>> > > > >> > > > main/java/org/apache/kafka/clients/producer/internals/
> >>> > > > >> > > > RecordAccumulator.java#L443
> >>> > > > >> > > >
> >>> > > > >> > > >
> >>> > > > >> > > > Effectively, we expire in all the following cases
> >>> > > > >> > > > 1) producer is partitioned from the brokers. When
> >>> metadata age
> >>> > > > grows
> >>> > > > >> > > beyond
> >>> > > > >> > > > 3x its max value. It's safe to say that we're not
> >>> talking to
> >>> > > the
> >>> > > > >> > brokers
> >>> > > > >> > > > at all. Report.
> >>> > > > >> > > > 2) fresh metadata && leader for a partition is not known
> >>> && a
> >>> > > > batch
> >>> > > > >> is
> >>> > > > >> > > > sitting there for longer than request.timeout.ms. This
> >>> is one
> >>> > > > case
> >>> > > > >> we
> >>> > > > >> > > > would
> >>> > > > >> > > > like to improve and use batch.expiry.ms because
> >>> > > > request.timeout.ms
> >>> > > > >> is
> >>> > > > >> > > too
> >>> > > > >> > > > small.
> >>> > > > >> > > > 3) fresh metadata && leader for a partition is known &&
> >>> batch
> >>> > is
> >>> > > > >> > sitting
> >>> > > > >> > > > there for longer than batch.expiry.ms. This is a new
> case
> >>> > that
> >>> > > is
> >>> > > > >> > > > different
> >>> > > > >> > > > from #2. This is the catch-up mode case. Things are
> >>> moving too
> >>> > > > >> slowly.
> >>> > > > >> > > > Pipeline SLAs are broken. Report and shutdown kmm.
> >>> > > > >> > > >
> >>> > > > >> > > > The second and the third cases are useful to a real-time
> >>> app
> >>> > > for a
> >>> > > > >> > > > completely different reason. Report, forget about the
> >>> batch,
> >>> > and
> >>> > > > >> just
> >>> > > > >> > > move
> >>> > > > >> > > > on (without shutting down).
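
To make the three cases concrete, a condensed sketch; this only approximates
the internal patch described above (the flags and names are simplifications,
not trunk code or the fork's actual RecordAccumulator logic):

class ExpiryCasesSketch {
    static boolean shouldExpire(boolean muted, boolean metadataStale,
                                boolean leaderKnown, long batchAgeMs,
                                long requestTimeoutMs, long batchExpiryMs) {
        if (muted)
            return false;
        if (metadataStale)          // case 1: effectively partitioned from the brokers
            return true;
        if (!leaderKnown)           // case 2: fresh metadata but no leader known
            return batchAgeMs > requestTimeoutMs;  // batch.expiry.ms preferred here
        return batchAgeMs > batchExpiryMs;         // case 3: leader known, still too slow
    }
}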
> >>> > > > >> > > >
> >>> > > > >> > > >
> >>> > > > >> > > If I understand correctly, you are talking about a fork of
> >>> > apache
> >>> > > > >> kafka
> >>> > > > >> > > which has these additional conditions? Because that check
> >>> > doesn't
> >>> > > > >> exist
> >>> > > > >> > on
> >>> > > > >> > > trunk today.
> >>> > > > >> >
> >>> > > > >> > Right. It is our internal release in LinkedIn.
> >>> > > > >> >
> >>> > > > >> > Or are you proposing to change the behavior of expiry to
> >>> > > > >> > > account for stale metadata and partitioned producers as
> >>> part of
> >>> > > this
> >>> > > > >> KIP?
> >>> > > > >> >
> >>> > > > >> >
> >>> > > > >> > No. It's our temporary solution in the absence of kip-91.
> Note
> >>> > that
> >>> > > we
> >>> > > > >> dont
> >>> > > > >> > like increasing request.timeout.ms. Without our extra
> >>> conditions
> >>> > > our
> >>> > > > >> > batches expire too soon--a problem in kmm catchup mode.
> >>> > > > >> >
> >>> > > > >> > If we get batch.expiry.ms, we will configure it to 20 mins.
> >>> > > > maybeExpire
> >>> > > > >> > will use the config instead of r.t.ms. The extra conditions
> >>> will
> >>> > be
> >>> > > > >> > unnecessary. All three cases shall be covered via the
> >>> batch.expiry
> >>> > > > >> timeout.
> >>> > > > >> >
> >>> > > > >> > >
> >>> > > > >> > >
> >>> > > > >> >
> >>> > > > >>
> >>> > > > >
> >>> > > >
> >>> > >
> >>> >
> >>>
> >>
> >>
> >
>
