Hi, Sumant,

1. Yes, it's probably reasonable to require max.message.delivery.wait.ms >
linger.ms. As for retries, perhaps we can set the default retries to
infinite or just ignore it. Then the latency will be bounded by
max.message.delivery.wait.ms. request.timeout.ms is the max time the
request will spend on the server. The client can expire an inflight
request early if needed.
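
To make the constraint in point 1 concrete, here is a rough sketch of the check. It assumes max.message.delivery.wait.ms is passed as a plain map entry (it is only a name proposed in this thread, not an existing producer config), and the class and method names are made up for illustration. linger.ms defaults to 0 here when absent.

```java
import java.util.Map;

class DeliveryWaitConfigCheck {
    // Proposed in this thread only; assumed name, not an existing producer config.
    static final String MAX_DELIVERY_WAIT_MS = "max.message.delivery.wait.ms";
    static final String LINGER_MS = "linger.ms";

    /** Rejects configs where the delivery bound is missing or does not exceed linger.ms. */
    static void validate(Map<String, Long> configs) {
        Long maxWaitMs = configs.get(MAX_DELIVERY_WAIT_MS);
        long lingerMs = configs.getOrDefault(LINGER_MS, 0L);
        if (maxWaitMs == null || maxWaitMs <= lingerMs) {
            throw new IllegalArgumentException(
                MAX_DELIVERY_WAIT_MS + " must be set and greater than " + LINGER_MS);
        }
    }

    /** With the retry count ignored, a record may be retried until its deadline passes. */
    static boolean mayRetry(long createdMs, long nowMs, long maxWaitMs) {
        return nowMs - createdMs < maxWaitMs;
    }
}
```

The retry count does not appear anywhere in mayRetry; the only bound is the elapsed time, which is the "ignore retries" option above.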

2. Well, since max.message.delivery.wait.ms specifies the max, calling the
callback a bit early may be ok? Note that max.message.delivery.wait.ms only
comes into play in the rare error case. So, I am not sure if we need to be
very precise. The issue with starting the clock on closing a batch is that
currently if the leader is not available, the closing of a batch can be
delayed longer than linger.ms.
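
As a sketch of what starting the clock at batch creation means: the deadline is fixed when the batch is created, so any delay in closing the batch (for example, while the leader is unavailable) counts against the bound. The Batch class and method names below are hypothetical stand-ins, not the producer's actual internals.

```java
import java.util.List;

class BatchDeadlines {
    /** Hypothetical stand-in for a producer batch; only the creation time matters here. */
    static final class Batch {
        final long createdMs;
        Batch(long createdMs) { this.createdMs = createdMs; }
    }

    /** Time left before the batch must be failed, clamped at zero. */
    static long remainingMs(Batch batch, long nowMs, long maxDeliveryWaitMs) {
        return Math.max(0L, batch.createdMs + maxDeliveryWaitMs - nowMs);
    }

    /** Poll timeout: the smallest remaining time over all batches, or defaultMs if smaller or none. */
    static long pollTimeoutMs(List<Batch> batches, long nowMs,
                              long maxDeliveryWaitMs, long defaultMs) {
        long timeout = defaultMs;
        for (Batch b : batches) {
            timeout = Math.min(timeout, remainingMs(b, nowMs, maxDeliveryWaitMs));
        }
        return timeout;
    }
}
```

Taking the minimum over the outstanding batches is what lets the sender wake up before the earliest deadline.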

4. As you said, future.get(timeout) itself doesn't solve the problem since
you still need a way to expire the record in the sender. The amount of work
to implement a cancellable future is probably the same?
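
To illustrate point 4, a small sketch using only java.util.concurrent: a timed get gives up waiting but leaves the future pending, so whatever is behind it (here, the record in the sender) is untouched. The class and method names are made up for illustration; lowerBoundMs just evaluates the formula quoted from the KIP.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureTimeoutSketch {
    /** batch.expiry.ms + nRetries * (request.timeout.ms + retry.backoff.ms), per the KIP excerpt. */
    static long lowerBoundMs(long batchExpiryMs, int retries,
                             long requestTimeoutMs, long retryBackoffMs) {
        return batchExpiryMs + retries * (requestTimeoutMs + retryBackoffMs);
    }

    /** Returns true if get(timeout) timed out while the future itself stayed pending. */
    static boolean timesOutButStaysPending(Future<?> future, long timeoutMs) {
        try {
            future.get(timeoutMs, TimeUnit.MILLISECONDS);
            return false; // completed normally within the timeout
        } catch (TimeoutException e) {
            return !future.isDone(); // the caller gave up; nothing was cancelled
        } catch (ExecutionException e) {
            return false; // completed exceptionally within the timeout
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Something still has to expire the pending record after the timeout, which is the part that future.get(timeout) alone does not provide.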

Overall, my concern with patchwork is that we have iterated on the produce
request timeout multiple times and new issues keep coming back. Ideally,
this time, we want to have a solution that covers all cases, even if
that requires a bit more work.

Thanks,

Jun


On Fri, Aug 11, 2017 at 12:30 PM, Sumant Tambe <suta...@gmail.com> wrote:

> Hi Jun,
>
> Thanks for looking into it.
>
> Yes, we did consider this message-level timeout approach and expiring
> batches selectively in a request, but rejected it because of the added
> complexity without a strong benefit to outweigh it. Your proposal is a
> slight variation, so I'll mention some issues here.
>
> 1. It sounds like max.message.delivery.wait.ms will overlap with "time
> segments" of both linger.ms and retries * (request.timeout.ms +
> retry.backoff.ms). In that case, which config set takes precedence? It
> would not make sense to configure configs from both sets. Especially, we
> discussed exhaustively internally that retries and
> max.message.delivery.wait.ms can't / shouldn't be configured together.
> Retries become moot, as you already mention. I think that's going to be
> surprising to anyone wanting to use max.message.delivery.wait.ms. We
> probably need max.message.delivery.wait.ms > linger.ms or something like
> that.
>
> 2. If the clock starts when a batch is created and the batch expires when
> max.message.delivery.wait.ms is over in the accumulator, the last few
> messages in the expiring batch may not have lived long enough. As the
> config seems to suggest a per-message timeout, it's incorrect to expire
> messages prematurely. On the other hand, if the clock starts after the batch
> is closed (which also implies that linger.ms is not covered by the
> max.message.delivery.wait.ms config), no message would be expired too
> soon. Yeah, expiration may be a little bit too late but hey, this ain't a
> real-time service.
>
> 3. I agree that steps #3, #4, (and #5) are complex to implement. On the
> other hand, batch.expiry.ms is next to trivial to implement. We just pass
> the config all the way down to ProducerBatch.maybeExpire and be done with
> it.
>
> 4. Do you think the effect of max.message.delivery.wait.ms can be simulated
> with the future.get(timeout) method? Copying an excerpt from kip-91: An
> end-to-end timeout may be partially emulated using the future.get(timeout).
> The timeout must be greater than (batch.expiry.ms + nRetries * (
> request.timeout.ms + retry.backoff.ms)). Note that when future times out,
> Sender may continue to send the records in the background. To avoid that,
> implementing a cancellable future is a possibility.
>
> For simplicity, we could just implement a trivial method in the producer,
> ProducerConfigs.maxMessageDeliveryWaitMs(), that returns a number based on
> this formula? Users of future.get can use this timeout value.
>
> Thoughts?
>
> Regards,
> Sumant
>
>
>
> On 11 August 2017 at 07:50, Sumant Tambe <suta...@gmail.com> wrote:
>
> >
> >> Thanks for the KIP. Nice documentation on all current issues with the
> >> timeout.
> >
> > For the KIP writeup, all credit goes to Joel Koshy.
> >
> > I'll follow up on your comments a little later.
> >
> >
> >>
> >> You also brought up a good use case for timing out a message. For
> >> applications that collect and send sensor data to Kafka, if the data can't
> >> be sent to Kafka for some reason, the application may prefer to buffer the
> >> more recent data in the accumulator. Without a timeout, the accumulator
> >> will be filled with old records and new records can't be added.
> >>
> >> Your proposal makes sense for a developer who is familiar with how the
> >> producer works. I am not sure if this is very intuitive to the users since
> >> it may not be very easy for them to figure out how to configure the new
> >> knob to bound the amount of time until a message is completed.
> >>
> >> From users' perspective, Apurva's suggestion of
> >> max.message.delivery.wait.ms (which bounds the time from when a message
> >> is in the accumulator to when the callback is called) seems more
> >> intuitive. You listed this in the rejected section since it requires
> >> additional logic to rebatch when a produce request expires. However, this
> >> may not be too bad. The following are the things that we have to do.
> >>
> >> 1. The clock starts when a batch is created.
> >> 2. If the batch can't be drained within max.message.delivery.wait.ms, all
> >> messages in the batch will fail and the callback will be called.
> >> 3. When sending a produce request, we calculate an expireTime for the
> >> request that equals the remaining expiration time for the oldest batch
> >> in the request.
> >> 4. We set the minimum of the expireTime of all inflight requests as the
> >> timeout in the selector poll call (so that the selector can wake up before
> >> the expiration time).
> >> 5. If the produce response can't be received within expireTime, we expire
> >> all batches in the produce request whose expiration time has been reached.
> >> For the rest of the batches, we resend them in a new produce request.
> >> 6. If the produce response has a retriable error, we just back off a bit
> >> and then retry the produce request as today. The number of retries doesn't
> >> really matter now. We just keep retrying until the expiration time is
> >> reached. It's possible that a produce request is never retried due to
> >> expiration. However, this seems the right thing to do since the users want
> >> to time out the message at this time.
> >>
> >> Implementation wise, there will be a bit more complexity in steps 3 and 4,
> >> but probably not too bad. The benefit is that this is more intuitive to
> >> the end user.
> >>
> >> Does that sound reasonable to you?
> >>
> >> Thanks,
> >>
> >> Jun
> >>
> >>
> >> On Wed, Aug 9, 2017 at 10:03 PM, Sumant Tambe <suta...@gmail.com> wrote:
> >>
> >> > On Wed, Aug 9, 2017 at 1:28 PM Apurva Mehta <apu...@confluent.io> wrote:
> >> >
> >> > > > > There seems to be no relationship with cluster metadata availability or
> >> > > > > staleness. Expiry is just based on the time since the batch has been ready.
> >> > > > > Please correct me if I am wrong.
> >> > > > >
> >> > > >
> >> > > > I was not very specific about where we do expiration. I glossed over some
> >> > > > details because (again) we've other mechanisms to detect non progress. The
> >> > > > condition (!muted.contains(tp) && (isMetadataStale ||
> >> > > > cluster.leaderFor(tp) == null)) is used in
> >> > > > RecordAccumulator.expiredBatches:
> >> > > > https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L443
> >> > > >
> >> > > >
> >> > > > Effectively, we expire in all the following cases
> >> > > > 1) producer is partitioned from the brokers. When metadata age grows beyond
> >> > > > 3x its max value, it's safe to say that we're not talking to the brokers
> >> > > > at all. Report.
> >> > > > 2) fresh metadata && leader for a partition is not known && a batch is
> >> > > > sitting there for longer than request.timeout.ms. This is one case we would
> >> > > > like to improve and use batch.expiry.ms because request.timeout.ms is too
> >> > > > small.
> >> > > > 3) fresh metadata && leader for a partition is known && batch is sitting
> >> > > > there for longer than batch.expiry.ms. This is a new case that is different
> >> > > > from #2. This is the catch-up mode case. Things are moving too slowly.
> >> > > > Pipeline SLAs are broken. Report and shutdown kmm.
> >> > > > The second and the third cases are useful to a real-time app for a
> >> > > > completely different reason. Report, forget about the batch, and just move
> >> > > > on (without shutting down).
> >> > > >
> >> > > >
> >> > > If I understand correctly, you are talking about a fork of apache kafka
> >> > > which has these additional conditions? Because that check doesn't exist on
> >> > > trunk today.
> >> >
> >> > Right. It is our internal release in LinkedIn.
> >> >
> >> > > Or are you proposing to change the behavior of expiry to
> >> > > account for stale metadata and partitioned producers as part of this KIP?
> >> >
> >> >
> >> > No. It's our temporary solution in the absence of kip-91. Note that we don't
> >> > like increasing request.timeout.ms. Without our extra conditions our
> >> > batches expire too soon--a problem in kmm catchup mode.
> >> >
> >> > If we get batch.expiry.ms, we will configure it to 20 mins. maybeExpire
> >> > will use the config instead of r.t.ms. The extra conditions will be
> >> > unnecessary. All three cases shall be covered via the batch.expiry timeout.
> >> >
> >> > >
> >> > >
> >> >
> >>
> >
>
