Hi, Jiangjie,

If we want to enforce delivery.timeout.ms, we need to take the min, right?
Also, if a user sets a large delivery.timeout.ms, we probably don't want to
wait for an inflight request longer than request.timeout.ms.
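
A minimal sketch of that bound, with illustrative names only (this is not
actual producer code):

    // Illustrative only: how long to wait on an in-flight request.
    static long inflightExpiryMs(long deliveryTimeoutMs, long requestTimeoutMs,
                                 long batchCreatedMs, long nowMs) {
        long remainingDeliveryMs = deliveryTimeoutMs - (nowMs - batchCreatedMs);
        // Never wait on one in-flight request longer than request.timeout.ms.
        return Math.min(remainingDeliveryMs, requestTimeoutMs);
    }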

Thanks,

Jun

On Fri, Aug 25, 2017 at 5:19 PM, Becket Qin <becket....@gmail.com> wrote:

> Hi Jason,
>
> I see what you mean. That makes sense. So in the above case, after the
> producer resets the PID, when it retries batch_0_tp1, the batch will still
> have the old PID even if the producer has already got a new PID.
>
> @Jun, do you mean max(remaining delivery.timeout.ms, request.timeout.ms)
> instead of min(remaining delivery.timeout.ms, request.timeout.ms)?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Fri, Aug 25, 2017 at 9:34 AM, Jun Rao <j...@confluent.io> wrote:
>
> > Hi, Becket,
> >
> > Good point on expiring inflight requests. Perhaps we can expire an
> > inflight request after min(remaining delivery.timeout.ms,
> > request.timeout.ms). This way, if a user sets a high delivery.timeout.ms,
> > we can still recover from a broker power outage sooner.
> >
> > Thanks,
> >
> > Jun
> >
> > On Thu, Aug 24, 2017 at 12:52 PM, Becket Qin <becket....@gmail.com>
> wrote:
> >
> > > Hi Jason,
> > >
> > > delivery.timeout.ms sounds good to me.
> > >
> > > I was referring to the case where we are resetting the PID/sequence
> > > after expiring a batch. This is more about sending the batches after
> > > the expired batch.
> > >
> > > The scenario being discussed is expiring one of the batches in an
> > > in-flight request and retrying the other batches in that in-flight
> > > request. So consider the following case:
> > > 1. The producer sends request_0 with two batches (batch_0_tp0 and
> > > batch_0_tp1).
> > > 2. The broker receives the request and appends it to the log.
> > > 3. Before the producer receives the response from the broker,
> > > batch_0_tp0 expires. The producer will expire batch_0_tp0 immediately,
> > > reset the PID, and then resend batch_0_tp1, and maybe send batch_1_tp0
> > > (i.e. the next batch after the expired one) as well.
> > >
> > > For batch_0_tp1, it is OK to reuse the PID and sequence number. The
> > > problem is batch_1_tp0: if we reuse the same PID and the broker has
> > > already appended batch_0_tp0, the broker will think batch_1_tp0 is a
> > > duplicate with the same sequence number. As a result, the broker will
> > > drop batch_1_tp0. That is why we have to either bump up the sequence
> > > number or reset the PID. To avoid this complexity, I was suggesting not
> > > expiring the in-flight batch immediately, but waiting for the produce
> > > response. If the batch has been successfully appended, we do not expire
> > > it. Otherwise, we expire it.
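> > >
> > > A rough sketch of that "wait for the response" rule, with illustrative
> > > types and helper names (not actual producer internals):
> > >
> > >     // Illustrative only: decide a batch's fate when its response arrives.
> > >     void onProduceResponse(Batch batch, boolean appended, long nowMs) {
> > >         boolean pastTimeout = nowMs - batch.createdMs >= deliveryTimeoutMs;
> > >         if (appended) {
> > >             complete(batch);   // the broker has it; do not expire
> > >         } else if (pastTimeout) {
> > >             expire(batch);     // failed and out of time
> > >         } else {
> > >             retry(batch);      // failed but still within its timeout
> > >         }
> > >     }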
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > >
> > >
> > > On Thu, Aug 24, 2017 at 11:26 AM, Jason Gustafson <ja...@confluent.io>
> > > wrote:
> > >
> > > > @Becket
> > > >
> > > > Good point about unnecessarily resetting the PID in cases where we
> > > > know the request has failed. Might be worth opening a JIRA to try and
> > > > improve this.
> > > >
> > > > > So if we expire the batch prematurely and resend all
> > > > > the other batches in the same request, chances are there will be
> > > > > duplicates. If we wait for the response instead, it is less likely
> > > > > to introduce duplicates, and we may not need to reset the PID.
> > > >
> > > >
> > > > Not sure I follow this. Are you assuming that we change the batch
> > > > PID/sequence of the retried batches after resetting the PID? I think
> > > > we probably need to ensure that when we retry a batch, we always use
> > > > the same PID/sequence.
> > > >
> > > > By the way, as far as naming, `max.message.delivery.wait.ms` is
> > > > quite a mouthful. Could we shorten it? Perhaps `delivery.timeout.ms`?
> > > >
> > > > -Jason
> > > >
> > > > On Wed, Aug 23, 2017 at 8:51 PM, Becket Qin <becket....@gmail.com>
> > > wrote:
> > > >
> > > > > Hi Jun,
> > > > >
> > > > > If the TCP timeout is longer than request.timeout.ms, the producer
> > > > > will always hit request.timeout.ms before hitting the TCP timeout,
> > > > > right? That is why we added request.timeout.ms in the first place.
> > > > >
> > > > > You are right. Currently we reset the PID and resend the batches
> > > > > to avoid OutOfOrderSequenceException when the expired batches are
> > > > > in retry.
> > > > >
> > > > > This does not distinguish the reasons that caused the retry. There
> > > > > are two cases:
> > > > > 1. If the batch was in retry because it received an error response
> > > > > (e.g. NotLeaderForPartition), we actually don't need to reset the
> > > > > PID in this case because we know that the broker did not accept it.
> > > > > 2. If the batch was in retry because it hit a timeout earlier, then
> > > > > we should reset the PID (or optimistically send and only reset the
> > > > > PID when we receive an OutOfOrderSequenceException?)
> > > > > Case 1 is probably the most common case, so it looks like we are
> > > > > resetting the PID more often than necessary. But because in case 1
> > > > > the broker does not have the batch, there isn't much impact from
> > > > > resetting the PID and resending, other than the additional round
> > > > > trip.
> > > > >
> > > > > Now we are introducing another case:
> > > > > 3. A batch is in retry because we expired an in-flight request
> > > > > before it hits request.timeout.ms.
> > > > >
> > > > > The difference between 2 and 3 is that in case 3 the broker has
> > > > > likely appended the messages. So if we expire the batch prematurely
> > > > > and resend all the other batches in the same request, chances are
> > > > > there will be duplicates. If we wait for the response instead, it
> > > > > is less likely to introduce duplicates, and we may not need to
> > > > > reset the PID.
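> > > > >
> > > > > To make the three cases concrete, a hypothetical sketch of the
> > > > > decision (names are illustrative):
> > > > >
> > > > >     // Illustrative only: why a batch is being retried.
> > > > >     enum RetryCause { ERROR_RESPONSE, TIMED_OUT, EXPIRED_IN_FLIGHT }
> > > > >
> > > > >     // Only an error response proves the broker did not append the
> > > > >     // batch, so only it can safely skip the PID reset.
> > > > >     static boolean needsPidReset(RetryCause cause) {
> > > > >         return cause != RetryCause.ERROR_RESPONSE;
> > > > >     }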
> > > > >
> > > > > That said, given that batch expiration is probably already rare
> > > > > enough, it may not be necessary to optimize for that.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jiangjie (Becket) Qin
> > > > >
> > > > >
> > > > >
> > > > > On Wed, Aug 23, 2017 at 5:01 PM, Jun Rao <j...@confluent.io> wrote:
> > > > >
> > > > > > Hi, Becket,
> > > > > >
> > > > > > If a message expires while it's in an inflight produce request,
> > > > > > the producer will get a new PID if idempotence is enabled. This
> > > > > > will prevent subsequent messages from hitting
> > > > > > OutOfOrderSequenceException. The issue with not expiring an
> > > > > > inflight request is that if a broker goes down hard (e.g. a power
> > > > > > outage), the time it takes for the client to detect the
> > > > > > socket-level error (something like 8+ minutes with the default
> > > > > > TCP settings) is much longer than the default request.timeout.ms.
> > > > > >
> > > > > > Hi, Sumant,
> > > > > >
> > > > > > We can probably just default max.message.delivery.wait.ms to 30
> > > > > > secs, the current default for request.timeout.ms.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > >
> > > > > > On Wed, Aug 23, 2017 at 3:38 PM, Sumant Tambe <suta...@gmail.com
> >
> > > > wrote:
> > > > > >
> > > > > > > OK. Looks like starting the clock after closing the batch has
> > > > > > > quite a few pitfalls. I can't think of a way to work around it
> > > > > > > without adding yet another config. So I won't discuss that here.
> > > > > > > Anyone? As I said earlier, I'm not hung up on super-accurate
> > > > > > > notification times.
> > > > > > >
> > > > > > > If we are going down the max.message.delivery.wait.ms route,
> > > > > > > what would be the default? There seem to be a few options (a
> > > > > > > config sketch of option 2 follows the list).
> > > > > > >
> > > > > > > 1. max.message.delivery.wait.ms=null. Nothing changes for those
> > > > > > > who don't set it. I.e., batches expire after request.timeout.ms
> > > > > > > in the accumulator. If they are past the accumulator stage, they
> > > > > > > time out after retries * (request.timeout.ms + backoff).
> > > > > > >
> > > > > > > 2. max.message.delivery.wait.ms=request.timeout.ms. No
> > > > > > > observable behavioral change at the accumulator level, as the
> > > > > > > timeout value is the same as before. Retries will be done as
> > > > > > > long as the batch is under max.message.delivery.wait.ms.
> > > > > > > However, a batch can expire after just one try. That's OK IMO
> > > > > > > because request.timeout.ms tends to be large (default 30000).
> > > > > > >
> > > > > > > 3. max.message.delivery.wait.ms=2*request.timeout.ms. Gives an
> > > > > > > opportunity for two retries, but warn that retries may not
> > > > > > > happen at all in some rare cases and a batch could expire
> > > > > > > before any attempt.
> > > > > > >
> > > > > > > 4. max.message.delivery.wait.ms=something else (a constant?)
> > > > > > >
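> > > > > > > For illustration, option 2 expressed as producer settings;
> > > > > > > max.message.delivery.wait.ms is still only a proposed config,
> > > > > > > so this is hypothetical:
> > > > > > >
> > > > > > >     // Using java.util.Properties; the second key is the proposed
> > > > > > >     // config from this thread, not an existing producer config.
> > > > > > >     Properties props = new Properties();
> > > > > > >     props.put("request.timeout.ms", "30000");
> > > > > > >     props.put("max.message.delivery.wait.ms", "30000");
> > > > > > >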
> > > > > > > Thoughts?
> > > > > > >
> > > > > > > On 23 August 2017 at 09:01, Ismael Juma <ism...@juma.me.uk>
> > wrote:
> > > > > > >
> > > > > > > > Thanks Becket, that seems reasonable. Sumant, would you be
> > > > > > > > willing to update the KIP based on the discussion or are you
> > > > > > > > still not convinced?
> > > > > > > >
> > > > > > > > Ismael
> > > > > > > >
> > > > > > > > On Wed, Aug 23, 2017 at 6:04 AM, Becket Qin <
> > > becket....@gmail.com>
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > > In general max.message.delivery.wait.ms is a cleaner
> > > > > > > > > approach. That would make the guarantee clearer. That said,
> > > > > > > > > there seem to be subtleties in some scenarios:
> > > > > > > > >
> > > > > > > > > 1. I agree with Sumant that it is a little weird that a
> > > > > > > > > message could be expired immediately if it happens to enter
> > > > > > > > > a batch that is about to be expired. But as Jun said, as
> > > > > > > > > long as we have multiple messages in a batch, there isn't a
> > > > > > > > > cheap way to achieve a precise timeout. So the question
> > > > > > > > > actually becomes whether it is more user-friendly to expire
> > > > > > > > > early (based on the batch creation time) or expire late
> > > > > > > > > (based on the batch close time). I think both are
> > > > > > > > > acceptable. Personally I think most users do not really
> > > > > > > > > care about expiring a little late as long as the batch
> > > > > > > > > eventually expires. So I would use the batch close time as
> > > > > > > > > long as there is a bound on that. But it looks like we do
> > > > > > > > > not really have a bound on when we will close a batch. So
> > > > > > > > > expiration based on the batch create time may be the only
> > > > > > > > > option if we don't want to introduce complexity.
> > > > > > > > >
> > > > > > > > > 2. If we time out a batch in a request when it is still in
> > > > > > > > > flight, the end result of that batch is unclear to the
> > > > > > > > > users. It would be weird for users to receive an exception
> > > > > > > > > saying those messages have expired while they have actually
> > > > > > > > > been sent successfully. Also, if idempotence is set to true,
> > > > > > > > > what would the next sequence ID be after the expired batch?
> > > > > > > > > Reusing the same sequence ID may result in data loss, and
> > > > > > > > > incrementing the sequence ID may cause
> > > > > > > > > OutOfOrderSequenceException. Besides, extracting an expired
> > > > > > > > > batch from a request also introduces some complexity. Again,
> > > > > > > > > personally I think it is fine to expire a little bit late.
> > > > > > > > > So maybe we don't need to expire a batch that is already in
> > > > > > > > > flight. In the worst case we will expire it with a delay of
> > > > > > > > > request.timeout.ms.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > >
> > > > > > > > > Jiangjie (Becket) Qin
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Tue, Aug 22, 2017 at 3:08 AM, Ismael Juma <
> > > ism...@juma.me.uk>
> > > > > > > wrote:
> > > > > > > > >
> > > > > > > > >> Hi all,
> > > > > > > > >>
> > > > > > > > >> The discussion has been going on for a while; would it
> > > > > > > > >> help to have a call to discuss this? I'd like to start a
> > > > > > > > >> vote soonish so that we can include this in 1.0.0. I
> > > > > > > > >> personally prefer max.message.delivery.wait.ms. It seems
> > > > > > > > >> like Jun, Apurva and Jason also prefer that. Sumant, it
> > > > > > > > >> seems like you still prefer batch.expiry.ms, is that
> > > > > > > > >> right? What are your thoughts, Joel and Becket?
> > > > > > > > >>
> > > > > > > > >> Ismael
> > > > > > > > >>
> > > > > > > > >> On Wed, Aug 16, 2017 at 6:34 PM, Jun Rao <
> j...@confluent.io>
> > > > > wrote:
> > > > > > > > >>
> > > > > > > > >>> Hi, Sumant,
> > > > > > > > >>>
> > > > > > > > >>> The semantics of linger.ms are a bit subtle. The
> > > > > > > > >>> reasoning for the current implementation is the
> > > > > > > > >>> following. Let's say one sets linger.ms to 0 (our current
> > > > > > > > >>> default value). Creating a batch for every message will
> > > > > > > > >>> be bad for throughput. Instead, the current
> > > > > > > > >>> implementation only forms a batch when the batch is
> > > > > > > > >>> sendable (i.e., the broker is available, the inflight
> > > > > > > > >>> request limit is not exceeded, etc.). That way, the
> > > > > > > > >>> producer has more chance for batching. The implication is
> > > > > > > > >>> that a batch could stay open longer than linger.ms.
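> > > > > > > > >>>
> > > > > > > > >>> A simplified sketch of that readiness check, with
> > > > > > > > >>> illustrative helper names (the real RecordAccumulator
> > > > > > > > >>> logic has more conditions):
> > > > > > > > >>>
> > > > > > > > >>>     // Illustrative only: when a batch becomes sendable.
> > > > > > > > >>>     boolean sendable(Batch b, long nowMs) {
> > > > > > > > >>>         boolean lingerElapsed = nowMs - b.createdMs >= lingerMs;
> > > > > > > > >>>         boolean destinationReady = leaderKnown(b.tp)
> > > > > > > > >>>                 && !inflightLimitExceeded(b.tp);
> > > > > > > > >>>         return (b.isFull() || lingerElapsed) && destinationReady;
> > > > > > > > >>>     }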
> > > > > > > > >>>
> > > > > > > > >>> Now, on your concern about not having a precise way to
> > > > > > > > >>> control delay in the accumulator: it seems the
> > > > > > > > >>> batch.expiry.ms approach will have the same issue. If you
> > > > > > > > >>> start the clock when a batch is initialized, you may
> > > > > > > > >>> expire some messages in the same batch earlier than
> > > > > > > > >>> batch.expiry.ms. If you start the clock when the batch is
> > > > > > > > >>> closed, the expiration time could be unbounded because of
> > > > > > > > >>> the linger.ms implementation described above. Starting
> > > > > > > > >>> the expiration clock on batch initialization will at
> > > > > > > > >>> least guarantee that the time to expire the first message
> > > > > > > > >>> is precise, which is probably good enough.
> > > > > > > > >>>
> > > > > > > > >>> Thanks,
> > > > > > > > >>>
> > > > > > > > >>> Jun
> > > > > > > > >>>
> > > > > > > > >>>
> > > > > > > > >>>
> > > > > > > > >>> On Tue, Aug 15, 2017 at 3:46 PM, Sumant Tambe <
> > > > suta...@gmail.com
> > > > > >
> > > > > > > > wrote:
> > > > > > > > >>>
> > > > > > > > >>> > Question about "the closing of a batch can be delayed
> > > > > > > > >>> > longer than linger.ms": Is it possible to cause an
> > > > > > > > >>> > indefinite delay? At some point the bytes limit might
> > > > > > > > >>> > kick in. Also, why is the closing of a batch coupled
> > > > > > > > >>> > with the availability of its destination? In this
> > > > > > > > >>> > approach a batch chosen for eviction due to delay needs
> > > > > > > > >>> > to "close" anyway, right (without regard to destination
> > > > > > > > >>> > availability)?
> > > > > > > > >>> >
> > > > > > > > >>> > I'm not too worried about notifying at the super-exact
> > > > > > > > >>> > time specified in the configs. But expiring before the
> > > > > > > > >>> > full wait-span has elapsed sounds a little weird. So
> > > > > > > > >>> > the expiration time has a +/- spread. It works more
> > > > > > > > >>> > like a hint than a max. So why not
> > > > > > > > >>> > message.delivery.wait.hint.ms?
> > > > > > > > >>> >
> > > > > > > > >>> > Yeah, a cancellable future will be similar in complexity.
> > > > > > > > >>> >
> > > > > > > > >>> > I'm unsure if max.message.delivery.wait.ms will be the
> > > > > > > > >>> > final answer for producer timeouts. We still won't have
> > > > > > > > >>> > a precise way to control delay in just the accumulator
> > > > > > > > >>> > segment. batch.expiry.ms does not try to abstract. It's
> > > > > > > > >>> > very specific.
> > > > > > > > >>> >
> > > > > > > > >>> > My biggest concern at the moment is implementation
> > > > complexity.
> > > > > > > > >>> >
> > > > > > > > >>> > At this stage, I would like to encourage other
> > > > > > > > >>> > independent opinions.
> > > > > > > > >>> >
> > > > > > > > >>> > Regards,
> > > > > > > > >>> > Sumant
> > > > > > > > >>> >
> > > > > > > > >>> > On 11 August 2017 at 17:35, Jun Rao <j...@confluent.io>
> > > > wrote:
> > > > > > > > >>> >
> > > > > > > > >>> > > Hi, Sumant,
> > > > > > > > >>> > >
> > > > > > > > >>> > > 1. Yes, it's probably reasonable to require
> > > > > > > > >>> > > max.message.delivery.wait.ms > linger.ms. As for
> > > > > > > > >>> > > retries, perhaps we can set the default retries to
> > > > > > > > >>> > > infinite or just ignore it. Then the latency will be
> > > > > > > > >>> > > bounded by max.message.delivery.wait.ms.
> > > > > > > > >>> > > request.timeout.ms is the max time the request will
> > > > > > > > >>> > > spend on the server. The client can expire an
> > > > > > > > >>> > > inflight request early if needed.
> > > > > > > > >>> > >
> > > > > > > > >>> > > 2. Well, since max.message.delivery.wait.ms
> > > > > > > > >>> > > specifies the max, calling the callback a bit early
> > > > > > > > >>> > > may be ok? Note that max.message.delivery.wait.ms
> > > > > > > > >>> > > only comes into play in the rare error case. So, I am
> > > > > > > > >>> > > not sure if we need to be very precise. The issue
> > > > > > > > >>> > > with starting the clock on closing a batch is that
> > > > > > > > >>> > > currently if the leader is not available, the closing
> > > > > > > > >>> > > of a batch can be delayed longer than linger.ms.
> > > > > > > > >>> > >
> > > > > > > > >>> > > 4. As you said, future.get(timeout) itself doesn't
> > > > > > > > >>> > > solve the problem since you still need a way to
> > > > > > > > >>> > > expire the record in the sender. The amount of work
> > > > > > > > >>> > > to implement a cancellable future is probably the
> > > > > > > > >>> > > same?
> > > > > > > > >>> > >
> > > > > > > > >>> > > Overall, my concern with patchwork fixes is that we
> > > > > > > > >>> > > have iterated on the produce request timeout multiple
> > > > > > > > >>> > > times and new issues keep coming back. Ideally, this
> > > > > > > > >>> > > time, we want to have a solution that covers all
> > > > > > > > >>> > > cases, even though that requires a bit more work.
> > > > > > > > >>> > >
> > > > > > > > >>> > > Thanks,
> > > > > > > > >>> > >
> > > > > > > > >>> > > Jun
> > > > > > > > >>> > >
> > > > > > > > >>> > >
> > > > > > > > >>> > > On Fri, Aug 11, 2017 at 12:30 PM, Sumant Tambe <
> > > > > > > suta...@gmail.com>
> > > > > > > > >>> > wrote:
> > > > > > > > >>> > >
> > > > > > > > >>> > > > Hi Jun,
> > > > > > > > >>> > > >
> > > > > > > > >>> > > > Thanks for looking into it.
> > > > > > > > >>> > > >
> > > > > > > > >>> > > > Yes, we did consider this message-level timeout
> > > > > > > > >>> > > > approach and expiring batches selectively in a
> > > > > > > > >>> > > > request, but rejected it because the added
> > > > > > > > >>> > > > complexity had no strong benefit to counterweigh
> > > > > > > > >>> > > > it. Your proposal is a slight variation, so I'll
> > > > > > > > >>> > > > mention some issues here.
> > > > > > > > >>> > > >
> > > > > > > > >>> > > > 1. It sounds like max.message.delivery.wait.ms
> > > > > > > > >>> > > > will overlap with "time segments" of both linger.ms
> > > > > > > > >>> > > > and retries * (request.timeout.ms +
> > > > > > > > >>> > > > retry.backoff.ms). In that case, which config set
> > > > > > > > >>> > > > takes precedence? It would not make sense to
> > > > > > > > >>> > > > configure configs from both sets. Especially, we
> > > > > > > > >>> > > > discussed exhaustively internally that retries and
> > > > > > > > >>> > > > max.message.delivery.wait.ms can't / shouldn't be
> > > > > > > > >>> > > > configured together. Retries become moot, as you
> > > > > > > > >>> > > > already mentioned. I think that's going to be
> > > > > > > > >>> > > > surprising to anyone wanting to use
> > > > > > > > >>> > > > max.message.delivery.wait.ms. We probably need
> > > > > > > > >>> > > > max.message.delivery.wait.ms > linger.ms or
> > > > > > > > >>> > > > something like that.
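> > > > > > > > >>> > > >
> > > > > > > > >>> > > > A hypothetical startup check for that constraint
> > > > > > > > >>> > > > (illustrative; not an existing producer validation):
> > > > > > > > >>> > > >
> > > > > > > > >>> > > >     // Illustrative only: reject contradictory configs early.
> > > > > > > > >>> > > >     if (maxMessageDeliveryWaitMs <= lingerMs)
> > > > > > > > >>> > > >         throw new ConfigException(
> > > > > > > > >>> > > >                 "max.message.delivery.wait.ms must be greater "
> > > > > > > > >>> > > >                         + "than linger.ms");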
> > > > > > > > >>> > > >
> > > > > > > > >>> > > > 2. If the clock starts when a batch is created and
> > > > > > > > >>> > > > the batch expires when max.message.delivery.wait.ms
> > > > > > > > >>> > > > is over in the accumulator, the last few messages
> > > > > > > > >>> > > > in the expiring batch may not have lived long
> > > > > > > > >>> > > > enough. As the config seems to suggest a
> > > > > > > > >>> > > > per-message timeout, it's incorrect to expire
> > > > > > > > >>> > > > messages prematurely. On the other hand, if the
> > > > > > > > >>> > > > clock starts after the batch is closed (which also
> > > > > > > > >>> > > > implies that linger.ms is not covered by the
> > > > > > > > >>> > > > max.message.delivery.wait.ms config), no message
> > > > > > > > >>> > > > would be expired too soon. Yeah, expiration may be
> > > > > > > > >>> > > > a little bit too late, but hey, this ain't a
> > > > > > > > >>> > > > real-time service.
> > > > > > > > >>> > > >
> > > > > > > > >>> > > > 3. I agree that steps #3, #4 (and #5) are complex
> > > > > > > > >>> > > > to implement. On the other hand, batch.expiry.ms is
> > > > > > > > >>> > > > next to trivial to implement. We just pass the
> > > > > > > > >>> > > > config all the way down to
> > > > > > > > >>> > > > ProducerBatch.maybeExpire and are done with it.
> > > > > > > > >>> > > >
> > > > > > > > >>> > > > 4. Do you think the effect of
> > > > > > > > >>> > > > max.message.delivery.wait.ms can be simulated with
> > > > > > > > >>> > > > the future.get(timeout) method? Copying an excerpt
> > > > > > > > >>> > > > from KIP-91: An end-to-end timeout may be partially
> > > > > > > > >>> > > > emulated using future.get(timeout). The timeout
> > > > > > > > >>> > > > must be greater than (batch.expiry.ms + nRetries *
> > > > > > > > >>> > > > (request.timeout.ms + retry.backoff.ms)). Note that
> > > > > > > > >>> > > > when the future times out, the Sender may continue
> > > > > > > > >>> > > > to send the records in the background. To avoid
> > > > > > > > >>> > > > that, implementing a cancellable future is a
> > > > > > > > >>> > > > possibility.
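> > > > > > > > >>> > > >
> > > > > > > > >>> > > > A sketch of that emulation against the existing
> > > > > > > > >>> > > > producer API; batchExpiryMs, nRetries,
> > > > > > > > >>> > > > requestTimeoutMs and retryBackoffMs are assumed to
> > > > > > > > >>> > > > mirror the corresponding configs:
> > > > > > > > >>> > > >
> > > > > > > > >>> > > >     long timeoutMs = batchExpiryMs
> > > > > > > > >>> > > >             + nRetries * (requestTimeoutMs + retryBackoffMs);
> > > > > > > > >>> > > >     try {
> > > > > > > > >>> > > >         RecordMetadata md = producer.send(record)
> > > > > > > > >>> > > >                 .get(timeoutMs, TimeUnit.MILLISECONDS);
> > > > > > > > >>> > > >     } catch (InterruptedException | ExecutionException
> > > > > > > > >>> > > >             | TimeoutException e) {
> > > > > > > > >>> > > >         // On TimeoutException the Sender may still deliver
> > > > > > > > >>> > > >         // the record in the background; a cancellable
> > > > > > > > >>> > > >         // future would avoid that.
> > > > > > > > >>> > > >     }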
> > > > > > > > >>> > > >
> > > > > > > > >>> > > > For simplicity, we could just implement a trivial
> > > > > > > > >>> > > > method in the producer,
> > > > > > > > >>> > > > ProducerConfigs.maxMessageDeliveryWaitMs(), and
> > > > > > > > >>> > > > return a number based on this formula. Users of
> > > > > > > > >>> > > > future.get can use this timeout value.
> > > > > > > > >>> > > >
> > > > > > > > >>> > > > Thoughts?
> > > > > > > > >>> > > >
> > > > > > > > >>> > > > Regards,
> > > > > > > > >>> > > > Sumant
> > > > > > > > >>> > > >
> > > > > > > > >>> > > >
> > > > > > > > >>> > > >
> > > > > > > > >>> > > > On 11 August 2017 at 07:50, Sumant Tambe <
> > > > > suta...@gmail.com>
> > > > > > > > >>> wrote:
> > > > > > > > >>> > > >
> > > > > > > > >>> > > > >
> > > > > > > > >>> > > > >> Thanks for the KIP. Nice documentation on all
> > > > > > > > >>> > > > >> current issues with the timeout.
> > > > > > > > >>> > > > >
> > > > > > > > >>> > > > > For the KIP writeup, all credit goes to Joel
> Koshy.
> > > > > > > > >>> > > > >
> > > > > > > > >>> > > > > I'll follow up on your comments a little later.
> > > > > > > > >>> > > > >
> > > > > > > > >>> > > > >
> > > > > > > > >>> > > > >>
> > > > > > > > >>> > > > >> You also brought up a good use case for timing
> > > > > > > > >>> > > > >> out a message. For applications that collect and
> > > > > > > > >>> > > > >> send sensor data to Kafka, if the data can't be
> > > > > > > > >>> > > > >> sent to Kafka for some reason, the application
> > > > > > > > >>> > > > >> may prefer to buffer the more recent data in the
> > > > > > > > >>> > > > >> accumulator. Without a timeout, the accumulator
> > > > > > > > >>> > > > >> will be filled with old records and new records
> > > > > > > > >>> > > > >> can't be added.
> > > > > > > > >>> > > > >>
> > > > > > > > >>> > > > >> Your proposal makes sense for a developer who is
> > > > > > > > >>> > > > >> familiar with how the producer works. I am not
> > > > > > > > >>> > > > >> sure if this is very intuitive to the users,
> > > > > > > > >>> > > > >> since it may not be very easy for them to figure
> > > > > > > > >>> > > > >> out how to configure the new knob to bound the
> > > > > > > > >>> > > > >> amount of time before a message is completed.
> > > > > > > > >>> > > > >>
> > > > > > > > >>> > > > >> From the users' perspective, Apurva's suggestion
> > > > > > > > >>> > > > >> of max.message.delivery.wait.ms (which bounds
> > > > > > > > >>> > > > >> the time from when a message enters the
> > > > > > > > >>> > > > >> accumulator to when the callback is called)
> > > > > > > > >>> > > > >> seems more intuitive. You listed this in the
> > > > > > > > >>> > > > >> rejected section since it requires additional
> > > > > > > > >>> > > > >> logic to rebatch when a produce request expires.
> > > > > > > > >>> > > > >> However, this may not be too bad. The following
> > > > > > > > >>> > > > >> are the things that we have to do.
> > > > > > > > >>> > > > >>
> > > > > > > > >>> > > > >> 1. The clock starts when a batch is created.
> > > > > > > > >>> > > > >> 2. If the batch can't be drained within
> > > > > > > > >>> > > > >> max.message.delivery.wait.ms, all messages in
> > > > > > > > >>> > > > >> the batch will fail and the callback will be
> > > > > > > > >>> > > > >> called.
> > > > > > > > >>> > > > >> 3. When sending a produce request, we calculate
> > > > > > > > >>> > > > >> an expireTime for the request that equals the
> > > > > > > > >>> > > > >> remaining expiration time of the oldest batch in
> > > > > > > > >>> > > > >> the request.
> > > > > > > > >>> > > > >> 4. We set the minimum expireTime of all inflight
> > > > > > > > >>> > > > >> requests as the timeout in the selector poll
> > > > > > > > >>> > > > >> call (so that the selector can wake up before
> > > > > > > > >>> > > > >> the expiration time).
> > > > > > > > >>> > > > >> 5. If the produce response can't be received
> > > > > > > > >>> > > > >> within expireTime, we expire all batches in the
> > > > > > > > >>> > > > >> produce request whose expiration time has been
> > > > > > > > >>> > > > >> reached. For the rest of the batches, we resend
> > > > > > > > >>> > > > >> them in a new produce request.
> > > > > > > > >>> > > > >> 6. If the produce response has a retriable
> > > > > > > > >>> > > > >> error, we just back off a bit and then retry the
> > > > > > > > >>> > > > >> produce request as today. The number of retries
> > > > > > > > >>> > > > >> doesn't really matter now. We just keep retrying
> > > > > > > > >>> > > > >> until the expiration time is reached. It's
> > > > > > > > >>> > > > >> possible that a produce request is never retried
> > > > > > > > >>> > > > >> due to expiration. However, this seems the right
> > > > > > > > >>> > > > >> thing to do since the users want to time out the
> > > > > > > > >>> > > > >> message at this time. (A sketch of steps 3-5
> > > > > > > > >>> > > > >> appears below.)
> > > > > > > > >>> > > > >>
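> > > > > > > > >>> > > > >> A rough sketch of steps 3-5, with illustrative
> > > > > > > > >>> > > > >> names only (not actual Sender code):
> > > > > > > > >>> > > > >>
> > > > > > > > >>> > > > >>     // Step 3: a request's expireTime is the remaining
> > > > > > > > >>> > > > >>     // time of the oldest batch it carries.
> > > > > > > > >>> > > > >>     long expireTimeMs(List<Batch> batches, long nowMs) {
> > > > > > > > >>> > > > >>         long min = Long.MAX_VALUE;
> > > > > > > > >>> > > > >>         for (Batch b : batches)
> > > > > > > > >>> > > > >>             min = Math.min(min,
> > > > > > > > >>> > > > >>                     b.createdMs + maxDeliveryWaitMs - nowMs);
> > > > > > > > >>> > > > >>         return min;
> > > > > > > > >>> > > > >>     }
> > > > > > > > >>> > > > >>
> > > > > > > > >>> > > > >>     // Step 4: wake the selector before the earliest
> > > > > > > > >>> > > > >>     // expiration among all inflight requests.
> > > > > > > > >>> > > > >>     selector.poll(minExpireTimeOfInflightRequests(nowMs));
> > > > > > > > >>> > > > >>     // Step 5 (on expiry): fail the expired batches and
> > > > > > > > >>> > > > >>     // resend the remaining ones in a new request.
> > > > > > > > >>> > > > >>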
> > > > > > > > >>> > > > >> Implementation-wise, there will be a bit more
> > > > > > > > >>> > > > >> complexity in steps 3 and 4, but probably not
> > > > > > > > >>> > > > >> too bad. The benefit is that this is more
> > > > > > > > >>> > > > >> intuitive to the end user.
> > > > > > > > >>> > > > >>
> > > > > > > > >>> > > > >> Does that sound reasonable to you?
> > > > > > > > >>> > > > >>
> > > > > > > > >>> > > > >> Thanks,
> > > > > > > > >>> > > > >>
> > > > > > > > >>> > > > >> Jun
> > > > > > > > >>> > > > >>
> > > > > > > > >>> > > > >>
> > > > > > > > >>> > > > >> On Wed, Aug 9, 2017 at 10:03 PM, Sumant Tambe <
> > > > > > > > >>> suta...@gmail.com>
> > > > > > > > >>> > > > wrote:
> > > > > > > > >>> > > > >>
> > > > > > > > >>> > > > >> > On Wed, Aug 9, 2017 at 1:28 PM Apurva Mehta <
> > > > > > > > >>> apu...@confluent.io>
> > > > > > > > >>> > > > >> wrote:
> > > > > > > > >>> > > > >> >
> > > > > > > > >>> > > > >> > > > > There seems to be no relationship with
> > > > > > > > >>> > > > >> > > > > cluster metadata availability or
> > > > > > > > >>> > > > >> > > > > staleness. Expiry is just based on the
> > > > > > > > >>> > > > >> > > > > time since the batch has been ready.
> > > > > > > > >>> > > > >> > > > > Please correct me if I am wrong.
> > > > > > > > >>> > > > >> > > > >
> > > > > > > > >>> > > > >> > > >
> > > > > > > > >>> > > > >> > > > I was not very specific about where we do
> > > > > > > > >>> > > > >> > > > expiration. I glossed over some details
> > > > > > > > >>> > > > >> > > > because (again) we have other mechanisms
> > > > > > > > >>> > > > >> > > > to detect non-progress. The condition
> > > > > > > > >>> > > > >> > > > (!muted.contains(tp) && (isMetadataStale
> > > > > > > > >>> > > > >> > > > || cluster.leaderFor(tp) == null)) is used
> > > > > > > > >>> > > > >> > > > in RecordAccumulator.expiredBatches:
> > > > > > > > >>> > > > >> > > > https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L443
> > > > > > > > >>> > > > >> > > >
> > > > > > > > >>> > > > >> > > >
> > > > > > > > >>> > > > >> > > > Effectively, we expire in all the
> > > > > > > > >>> > > > >> > > > following cases:
> > > > > > > > >>> > > > >> > > > 1) The producer is partitioned from the
> > > > > > > > >>> > > > >> > > > brokers, when the metadata age grows
> > > > > > > > >>> > > > >> > > > beyond 3x its max value. It's safe to say
> > > > > > > > >>> > > > >> > > > that we're not talking to the brokers at
> > > > > > > > >>> > > > >> > > > all. Report.
> > > > > > > > >>> > > > >> > > > 2) Fresh metadata && the leader for a
> > > > > > > > >>> > > > >> > > > partition is not known && a batch is
> > > > > > > > >>> > > > >> > > > sitting there for longer than
> > > > > > > > >>> > > > >> > > > request.timeout.ms. This is one case we
> > > > > > > > >>> > > > >> > > > would like to improve and use
> > > > > > > > >>> > > > >> > > > batch.expiry.ms for, because
> > > > > > > > >>> > > > >> > > > request.timeout.ms is too small.
> > > > > > > > >>> > > > >> > > > 3) Fresh metadata && the leader for a
> > > > > > > > >>> > > > >> > > > partition is known && a batch is sitting
> > > > > > > > >>> > > > >> > > > there for longer than batch.expiry.ms.
> > > > > > > > >>> > > > >> > > > This is a new case that is different from
> > > > > > > > >>> > > > >> > > > #2. This is the catch-up mode case. Things
> > > > > > > > >>> > > > >> > > > are moving too slowly. Pipeline SLAs are
> > > > > > > > >>> > > > >> > > > broken. Report and shut down kmm.
> > > > > > > > >>> > > > >> > > >
> > > > > > > > >>> > > > >> > > > The second and the third cases are useful
> > > > > > > > >>> > > > >> > > > to a real-time app for a completely
> > > > > > > > >>> > > > >> > > > different reason. Report, forget about the
> > > > > > > > >>> > > > >> > > > batch, and just move on (without shutting
> > > > > > > > >>> > > > >> > > > down).
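> > > > > > > > >>> > > > >> > > >
> > > > > > > > >>> > > > >> > > > Restating the three cases as one
> > > > > > > > >>> > > > >> > > > predicate (an illustrative sketch, not our
> > > > > > > > >>> > > > >> > > > fork's actual code):
> > > > > > > > >>> > > > >> > > >
> > > > > > > > >>> > > > >> > > >     // Illustrative only.
> > > > > > > > >>> > > > >> > > >     boolean shouldExpire(Batch b, long nowMs) {
> > > > > > > > >>> > > > >> > > >         if (muted.contains(b.tp)) return false;
> > > > > > > > >>> > > > >> > > >         if (isMetadataStale) return true;    // case 1
> > > > > > > > >>> > > > >> > > >         long ageMs = nowMs - b.createdMs;
> > > > > > > > >>> > > > >> > > >         if (leaderFor(b.tp) == null)         // case 2
> > > > > > > > >>> > > > >> > > >             return ageMs > requestTimeoutMs;
> > > > > > > > >>> > > > >> > > >         return ageMs > batchExpiryMs;        // case 3
> > > > > > > > >>> > > > >> > > >     }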
> > > > > > > > >>> > > > >> > > >
> > > > > > > > >>> > > > >> > > >
> > > > > > > > >>> > > > >> > > If I understand correctly, you are talking
> > > > > > > > >>> > > > >> > > about a fork of Apache Kafka which has these
> > > > > > > > >>> > > > >> > > additional conditions? Because that check
> > > > > > > > >>> > > > >> > > doesn't exist on trunk today.
> > > > > > > > >>> > > > >> >
> > > > > > > > >>> > > > >> > Right. It is our internal release at LinkedIn.
> > > > > > > > >>> > > > >> >
> > > > > > > > >>> > > > >> > > Or are you proposing to change the behavior
> > > > > > > > >>> > > > >> > > of expiry to account for stale metadata and
> > > > > > > > >>> > > > >> > > partitioned producers as part of this KIP?
> > > > > > > > >>> > > > >> >
> > > > > > > > >>> > > > >> >
> > > > > > > > >>> > > > >> > No. It's our temporary solution in the
> > > > > > > > >>> > > > >> > absence of KIP-91. Note that we don't like
> > > > > > > > >>> > > > >> > increasing request.timeout.ms. Without our
> > > > > > > > >>> > > > >> > extra conditions, our batches expire too soon,
> > > > > > > > >>> > > > >> > which is a problem in kmm catch-up mode.
> > > > > > > > >>> > > > >> >
> > > > > > > > >>> > > > >> > If we get batch.expiry.ms, we will configure
> > > > > > > > >>> > > > >> > it to 20 mins. maybeExpire will use the config
> > > > > > > > >>> > > > >> > instead of request.timeout.ms. The extra
> > > > > > > > >>> > > > >> > conditions will be unnecessary. All three
> > > > > > > > >>> > > > >> > cases shall be covered via the batch.expiry
> > > > > > > > >>> > > > >> > timeout.
> > > > > > > > >>> > > > >> >