Hi all,

The discussion has been going on for a while; would it help to have a call
to discuss this? I'd like to start a vote soonish so that we can include
this in 1.0.0. I personally prefer max.message.delivery.wait.ms, and it seems
like Jun, Apurva, and Jason also prefer that. Sumant, it seems like you
still prefer batch.expiry.ms, is that right? What are your thoughts, Joel
and Becket?

Ismael

On Wed, Aug 16, 2017 at 6:34 PM, Jun Rao <j...@confluent.io> wrote:

> Hi, Sumant,
>
> The semantics of linger.ms is a bit subtle. The reasoning for the current
> implementation is the following. Say one sets linger.ms to 0 (our current
> default value). Creating a batch for every message would be bad for
> throughput. Instead, the current implementation only closes a batch when the
> batch is sendable (i.e., the broker is available, the inflight request limit
> is not exceeded, etc.). That way, the producer has more chances for batching.
> The implication is that the closing of a batch could be delayed longer than
> linger.ms.
>
> Now, on your concern about not having a precise way to control delay in the
> accumulator: it seems the batch.expiry.ms approach will have the same
> issue. If you start the clock when a batch is initialized, you may expire
> some messages in the same batch earlier than batch.expiry.ms. If you start
> the clock when the batch is closed, the expiration time could be unbounded
> because of the linger.ms implementation described above. Starting the
> expiration clock on batch initialization will at least guarantee that the
> time to expire the first message is precise, which is probably good enough.
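Jun's point about where the clock starts can be made concrete with a small sketch (hypothetical names, not Kafka's actual ProducerBatch internals): starting the clock at batch creation bounds the first record's wait precisely, while records appended later get less than the full timeout.

```java
// Hypothetical sketch of "expiration clock starts at batch creation".
// Not Kafka's actual code; names are illustrative.
class BatchExpirySketch {
    // The batch expires once expiryMs has elapsed since creation,
    // regardless of when individual records were appended.
    static boolean isExpired(long createdMs, long nowMs, long expiryMs) {
        return nowMs - createdMs >= expiryMs;
    }

    // A record appended after creation gets less than the full expiryMs:
    // this is the per-message imprecision discussed above.
    static long remainingForRecordMs(long createdMs, long appendedMs, long expiryMs) {
        return (createdMs + expiryMs) - appendedMs;
    }
}
```

With a 500 ms expiry, a record appended 200 ms after batch creation only has 300 ms of cover left; only the first record is guaranteed the full span.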
>
> Thanks,
>
> Jun
>
>
>
> On Tue, Aug 15, 2017 at 3:46 PM, Sumant Tambe <suta...@gmail.com> wrote:
>
> > Question about "the closing of a batch can be delayed longer than
> > linger.ms": Is it possible to cause an indefinite delay? At some point the
> > bytes limit might kick in. Also, why is the closing of a batch coupled with
> > the availability of its destination? In this approach, a batch chosen for
> > eviction due to delay needs to "close" anyway, right (without regard to
> > destination availability)?
> >
> > I'm not too worried about notifying at the super-exact time specified in
> > the configs. But expiring before the full wait-span has elapsed sounds a
> > little weird. So the expiration time has a +/- spread. It works more like
> > a hint than a max. So why not message.delivery.wait.hint.ms?
> >
> > Yeah, cancellable future will be similar in complexity.
> >
> > I'm unsure if max.message.delivery.wait.ms will be the final nail for
> > producer timeouts. We still won't have a precise way to control delay in
> > just the accumulator segment. batch.expiry.ms does not try to abstract.
> > It's very specific.
> >
> > My biggest concern at the moment is implementation complexity.
> >
> > At this stage, I would like to encourage other independent opinions.
> >
> > Regards,
> > Sumant
> >
> > On 11 August 2017 at 17:35, Jun Rao <j...@confluent.io> wrote:
> >
> > > Hi, Sumant,
> > >
> > > 1. Yes, it's probably reasonable to require max.message.delivery.wait.ms
> > > > linger.ms. As for retries, perhaps we can set the default retries to
> > > infinite or just ignore it. Then the latency will be bounded by
> > > max.message.delivery.wait.ms. request.timeout.ms is the max time the
> > > request will be spending on the server. The client can expire an
> inflight
> > > request early if needed.
> > >
> > > 2. Well, since max.message.delivery.wait.ms specifies the max, calling
> > the
> > > callback a bit early may be ok? Note that max.message.delivery.wait.ms
> > > only
> > > comes into play in the rare error case. So, I am not sure if we need to
> > be
> > > very precise. The issue with starting the clock on closing a batch is
> > that
> > > currently if the leader is not available, the closing of a batch can be
> > > delayed longer than linger.ms.
> > >
> > > 4. As you said, future.get(timeout) itself doesn't solve the problem
> > since
> > > you still need a way to expire the record in the sender. The amount of
> > work
> > > to implement a cancellable future is probably the same?
> > >
> > > Overall, my concern with patchwork is that we have iterated on the
> > > produce
> > > request timeout multiple times and new issues keep coming back.
> Ideally,
> > > this time, we want to have a solution that covers all cases, even
> though
> > > that requires a bit more work.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Fri, Aug 11, 2017 at 12:30 PM, Sumant Tambe <suta...@gmail.com>
> > wrote:
> > >
> > > > Hi Jun,
> > > >
> > > > Thanks for looking into it.
> > > >
> > > > Yes, we did consider this message-level timeout approach and expiring
> > > > batches selectively in a request, but rejected it because of the added
> > > > complexity without a strong benefit to counterbalance it. Your
> > > > proposal is a slight variation, so I'll mention some issues here.
> > > >
> > > > 1. It sounds like max.message.delivery.wait.ms will overlap with
> "time
> > > > segments" of both linger.ms and retries * (request.timeout.ms +
> > > > retry.backoff.ms). In that case, which config set takes precedence?
> It
> > > > would not make sense to configure configs from both sets. Especially,
> > we
> > > > discussed exhaustively internally that retries and
> > > > max.message.delivery.wait.ms can't / shouldn't be configured
> together.
> > > > Retries become moot, as you already mention. I think that's going to
> be
> > > > surprising to anyone wanting to use max.message.delivery.wait.ms. We
> > > > probably need max.message.delivery.wait.ms > linger.ms or something
> > like
> > > > that.
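The constraint in point 1 could be enforced with a trivial config-time check; the sketch below is a hypothetical validation, not an existing producer config rule.

```java
// Hypothetical sanity check for the proposed constraint
// max.message.delivery.wait.ms > linger.ms (illustrative, not Kafka code).
class TimeoutConfigSketch {
    static void validate(long maxMessageDeliveryWaitMs, long lingerMs) {
        if (maxMessageDeliveryWaitMs <= lingerMs)
            throw new IllegalArgumentException(
                "max.message.delivery.wait.ms must be greater than linger.ms");
    }
}
```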
> > > >
> > > > 2. If clock starts when a batch is created and expire when
> > > > max.message.delivery.wait.ms is over in the accumulator, the last
> few
> > > > messages in the expiring batch may not have lived long enough. As the
> > > > config seems to suggest a per-message timeout, it's incorrect to expire
> > > > messages prematurely. On the other hand, if the clock starts after the
> > > > batch is closed (which also implies that linger.ms is not covered by
> > > > the max.message.delivery.wait.ms config), no message would be expired
> > > > too soon. Yeah, expiration may be a little bit too late but hey, this
> > > > ain't a real-time service.
> > > >
> > > > 3. I agree that steps #3, #4, (and #5) are complex to implement. On
> the
> > > > other hand, batch.expiry.ms is next to trivial to implement. We just
> > > pass
> > > > the config all the way down to ProducerBatch.maybeExpire and be done
> > with
> > > > it.
> > > >
> > > > 4. Do you think the effect of max.message.delivery.wait.ms can be
> > > > simulated with the future.get(timeout) method? Copying an excerpt from
> > > > KIP-91: An end-to-end timeout may be partially emulated using
> > > > future.get(timeout).
> > > > The timeout must be greater than (batch.expiry.ms + nRetries * (
> > > > request.timeout.ms + retry.backoff.ms)). Note that when future times
> > > out,
> > > > Sender may continue to send the records in the background. To avoid
> > that,
> > > > implementing a cancellable future is a possibility.
> > > >
> > > > For simplicity, we could just implement a trivial method in the
> > > > producer, ProducerConfigs.maxMessageDeliveryWaitMs(), that returns a
> > > > number based on this formula. Users of future.get can use this timeout
> > > > value.
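The formula quoted from the KIP could be wrapped in a helper along the lines of the ProducerConfigs.maxMessageDeliveryWaitMs() idea; the class name and values below are illustrative, not a real producer API.

```java
// Illustrative helper for the KIP-91 emulation formula:
// timeout > batch.expiry.ms + nRetries * (request.timeout.ms + retry.backoff.ms).
class DeliveryTimeoutSketch {
    static long emulatedTimeoutMs(long batchExpiryMs, int nRetries,
                                  long requestTimeoutMs, long retryBackoffMs) {
        return batchExpiryMs + (long) nRetries * (requestTimeoutMs + retryBackoffMs);
    }
}
```

A caller would then pass the result to the record's future, e.g. future.get(timeoutMs, TimeUnit.MILLISECONDS). As noted above, the Sender may keep sending in the background after the future times out unless the future is made cancellable.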
> > > >
> > > > Thoughts?
> > > >
> > > > Regards,
> > > > Sumant
> > > >
> > > >
> > > >
> > > > On 11 August 2017 at 07:50, Sumant Tambe <suta...@gmail.com> wrote:
> > > >
> > > > >
> > > > > Thanks for the KIP. Nice documentation on all current issues with
> the
> > > > >> timeout.
> > > > >
> > > > > For the KIP writeup, all credit goes to Joel Koshy.
> > > > >
> > > > > I'll follow up on your comments a little later.
> > > > >
> > > > >
> > > > >>
> > > > >> You also brought up a good use case for timing out a message. For
> > > > >> applications that collect and send sensor data to Kafka, if the
> data
> > > > can't
> > > > >> be sent to Kafka for some reason, the application may prefer to
> > buffer
> > > > the
> > > > >> more recent data in the accumulator. Without a timeout, the
> > > accumulator
> > > > >> will be filled with old records and new records can't be added.
> > > > >>
> > > > >> Your proposal makes sense for a developer who is familiar with how
> > > > >> the producer works. I am not sure if this is very intuitive to
> > > > >> users, since it may not be very easy for them to figure out how to
> > > > >> configure the new knob to bound the amount of time before a message
> > > > >> is completed.
> > > > >>
> > > > >> From users' perspective, Apurva's suggestion of
> > > > >> max.message.delivery.wait.ms (which bounds the time from when a
> > > > >> message enters the accumulator to when the callback is called) seems
> > > > >> more intuitive. You listed this in the rejected section since it
> > > > >> requires additional logic to rebatch when a produce
> produce
> > > > >> request expires. However, this may not be too bad. The following
> are
> > > the
> > > > >> things that we have to do.
> > > > >>
> > > > >> 1. The clock starts when a batch is created.
> > > > >> 2. If the batch can't be drained within
> > max.message.delivery.wait.ms,
> > > > all
> > > > >> messages in the batch will fail and the callback will be called.
> > > > >> 3. When sending a produce request, we calculate an expireTime for
> > the
> > > > >> request that equals to the remaining expiration time for the
> oldest
> > > > batch
> > > > >> in the request.
> > > > >> 4. We set the minimum of the expireTime of all inflight requests
> as
> > > the
> > > > >> timeout in the selector poll call (so that the selector can wake
> up
> > > > before
> > > > >> the expiration time).
> > > > >> 5. If the produce response can't be received within expireTime, we
> > > > expire
> > > > >> all batches in the produce request whose expiration time has been
> > > > reached.
> > > > >> For the rest of the batches, we resend them in a new produce
> > request.
> > > > >> 6. If the produce response has a retriable error, we just back off
> > > > >> a bit and then retry the produce request as today. The number of retries
> > > > doesn't
> > > > >> really matter now. We just keep retrying until the expiration time
> > is
> > > > >> reached. It's possible that a produce request is never retried due
> > to
> > > > >> expiration. However, this seems the right thing to do since the
> > users
> > > > want
> > > > >> to timeout the message at this time.
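Steps 3 and 4 above could be sketched as follows (hypothetical names, not the actual Sender/Selector code):

```java
// Hypothetical sketch of steps 3-4: each in-flight request expires when its
// oldest batch's remaining time runs out, and the selector poll timeout is
// the minimum across all in-flight requests so we wake up in time to expire.
class RequestExpirySketch {
    // Step 3: a request's expireTime is the remaining expiration time of the
    // oldest batch it carries.
    static long requestExpireTimeMs(long[] batchCreatedMs, long nowMs, long maxWaitMs) {
        long oldest = Long.MAX_VALUE;
        for (long created : batchCreatedMs)
            oldest = Math.min(oldest, created);
        return Math.max(0L, (oldest + maxWaitMs) - nowMs);
    }

    // Step 4: poll with the minimum expireTime over all in-flight requests.
    static long pollTimeoutMs(long[] requestExpireTimesMs) {
        long min = Long.MAX_VALUE;
        for (long t : requestExpireTimesMs)
            min = Math.min(min, t);
        return min;
    }
}
```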
> > > > >>
> > > > >> Implementation wise, there will be a bit more complexity in step 3
> > and
> > > > 4,
> > > > >> but probably not too bad. The benefit is that this is more
> intuitive
> > > to
> > > > >> the
> > > > >> end user.
> > > > >>
> > > > >> Does that sound reasonable to you?
> > > > >>
> > > > >> Thanks,
> > > > >>
> > > > >> Jun
> > > > >>
> > > > >>
> > > > >> On Wed, Aug 9, 2017 at 10:03 PM, Sumant Tambe <suta...@gmail.com>
> > > > wrote:
> > > > >>
> > > > >> > On Wed, Aug 9, 2017 at 1:28 PM Apurva Mehta <
> apu...@confluent.io>
> > > > >> wrote:
> > > > >> >
> > > > >> > > > > There seems to be no relationship with cluster metadata
> > > > >> availability
> > > > >> > or
> > > > >> > > > > staleness. Expiry is just based on the time since the
> batch
> > > has
> > > > >> been
> > > > >> > > > ready.
> > > > >> > > > > Please correct me if I am wrong.
> > > > >> > > > >
> > > > >> > > >
> > > > >> > > > I was not very specific about where we do expiration. I
> > > > >> > > > glossed over some details because (again) we have other
> > > > >> > > > mechanisms to detect non-progress. The
> > > > >> > > > condition (!muted.contains(tp) && (isMetadataStale ||
> > > > >> > > > > cluster.leaderFor(tp) == null)) is used in
> > > > >> > > > RecordAccumualtor.expiredBatches:
> > > > >> > > > https://github.com/apache/kafka/blob/trunk/clients/src/
> > > > >> > > > main/java/org/apache/kafka/clients/producer/internals/
> > > > >> > > > RecordAccumulator.java#L443
> > > > >> > > >
> > > > >> > > >
> > > > >> > > > Effectively, we expire in all the following cases
> > > > >> > > > 1) The producer is partitioned from the brokers: metadata age
> > > > >> > > > grows beyond 3x its max value. It's safe to say that we're not
> > > > >> > > > talking to the brokers at all. Report.
> > > > >> > > > 2) fresh metadata && leader for a partition is not known &&
> a
> > > > batch
> > > > >> is
> > > > >> > > > sitting there for longer than request.timeout.ms. This is
> one
> > > > case
> > > > >> we
> > > > >> > > > would
> > > > >> > > > like to improve and use batch.expiry.ms because
> > > > request.timeout.ms
> > > > >> is
> > > > >> > > too
> > > > >> > > > small.
> > > > >> > > > 3) fresh metadata && leader for a partition is known &&
> batch
> > is
> > > > >> > sitting
> > > > >> > > > there for longer than batch.expiry.ms. This is a new case
> > that
> > > is
> > > > >> > > > different
> > > > >> > > > from #2. This is the catch-up-mode case. Things are moving
> > > > >> > > > too slowly. Pipeline SLAs are broken. Report and shut down KMM.
> > > > >> > > >
> > > > >> > > > The second and the third cases are useful to a real-time app
> > > for a
> > > > >> > > > completely different reason. Report, forget about the batch,
> > and
> > > > >> just
> > > > >> > > move
> > > > >> > > > on (without shutting down).
> > > > >> > > >
> > > > >> > > >
> > > > >> > > If I understand correctly, you are talking about a fork of
> > apache
> > > > >> kafka
> > > > >> > > which has these additional conditions? Because that check
> > doesn't
> > > > >> exist
> > > > >> > on
> > > > >> > > trunk today.
> > > > >> >
> > > > >> > Right. It is our internal release in LinkedIn.
> > > > >> >
> > > > >> > Or are you proposing to change the behavior of expiry to
> > > > >> > > account for stale metadata and partitioned producers as part
> of
> > > this
> > > > >> KIP?
> > > > >> >
> > > > >> >
> > > > >> > No. It's our temporary solution in the absence of KIP-91. Note
> > > > >> > that we don't like increasing request.timeout.ms. Without our
> > > > >> > extra conditions, our batches expire too soon--a problem in KMM
> > > > >> > catch-up mode.
> > > > >> >
> > > > >> > If we get batch.expiry.ms, we will configure it to 20 mins.
> > > > maybeExpire
> > > > >> > will use the config instead of r.t.ms. The extra conditions
> will
> > be
> > > > >> > unnecessary. All three cases shall be covered via the
> batch.expiry
> > > > >> timeout.
> > > > >> >
> > > > >> > >
> > > > >> > >
> > > > >> >
> > > > >>
> > > > >
> > > >
> > >
> >
>
