Hmm, I thought delivery.timeout.ms bounds the time from when a message enters the accumulator (i.e., when send() returns) to when the callback is called. If we wait for request.timeout.ms for an inflight request and the remaining delivery.timeout.ms is less than request.timeout.ms, the callback may be called later than delivery.timeout.ms, right?
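To make the arithmetic behind this concern concrete, here is a minimal sketch, not the producer's actual code. The class and helper names (InflightWaitSketch, remainingDeliveryMs, waitWithMin, waitWithMax) and the timeout values are hypothetical and chosen only for illustration; only the two config names come from this thread.

```java
public class InflightWaitSketch {
    // Hypothetical helper (not part of the Kafka client): time left before
    // delivery.timeout.ms elapses, measured from when send() returned.
    static long remainingDeliveryMs(long sendReturnedAtMs, long nowMs, long deliveryTimeoutMs) {
        return Math.max(0L, deliveryTimeoutMs - (nowMs - sendReturnedAtMs));
    }

    // One option from the thread: never wait past the delivery deadline.
    static long waitWithMin(long remainingMs, long requestTimeoutMs) {
        return Math.min(remainingMs, requestTimeoutMs);
    }

    // The other option from the thread: wait for whichever is longer.
    static long waitWithMax(long remainingMs, long requestTimeoutMs) {
        return Math.max(remainingMs, requestTimeoutMs);
    }

    public static void main(String[] args) {
        long deliveryTimeoutMs = 120_000L; // illustrative value only
        long requestTimeoutMs = 30_000L;   // illustrative value only
        long sendReturnedAtMs = 0L;        // clock starts when send() returns
        long requestSentAtMs = 100_000L;   // request goes out late, e.g. after retries

        long remaining = remainingDeliveryMs(sendReturnedAtMs, requestSentAtMs, deliveryTimeoutMs);
        long deadlineWithMin = requestSentAtMs + waitWithMin(remaining, requestTimeoutMs);
        long deadlineWithMax = requestSentAtMs + waitWithMax(remaining, requestTimeoutMs);

        System.out.println("remaining delivery time (ms): " + remaining);
        System.out.println("latest callback time with min (ms): " + deadlineWithMin);
        System.out.println("latest callback time with max (ms): " + deadlineWithMax);
    }
}
```

With these made-up numbers, 20,000 ms of the delivery timeout remain when the request is sent. Taking the min keeps the latest callback time at 120,000 ms, while waiting the full request.timeout.ms pushes it to 130,000 ms, which is past delivery.timeout.ms.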
Jiangjie's concern about resetting the PID on every expired batch is probably not an issue if we only reset the PID when the expired batch's PID is the same as the current PID, as Jason suggested.

Thanks,

Jun

On Tue, Aug 29, 2017 at 3:09 PM, Jason Gustafson <ja...@confluent.io> wrote:

> I think the semantics of delivery.timeout.ms need to allow for the possibility that the record was actually written. Unless we can keep on retrying indefinitely, there's really no way to know for sure whether the record was written or not. A delivery timeout just means that we cannot guarantee that the record was delivered.
>
> -Jason
>
> On Tue, Aug 29, 2017 at 2:51 PM, Becket Qin <becket....@gmail.com> wrote:
>
> > Hi Jason,
> >
> > If we expire the batch from the user's perspective but are still waiting for the response, would that mean it is likely that the batch will be successfully appended but the user will receive a TimeoutException? That seems a little non-intuitive to users. Arguably it may be OK, though, because currently when a TimeoutException is thrown there is no guarantee whether the messages were delivered or not.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Tue, Aug 29, 2017 at 12:33 PM, Jason Gustafson <ja...@confluent.io> wrote:
> >
> > > I think I'm with Becket. We should wait for request.timeout.ms for each produce request we send. We can still await the response internally for PID/sequence maintenance even if we expire the batch from the user's perspective. New sequence numbers would be assigned based on the current PID until the response returns and we find whether a PID reset is actually needed. This makes delivery.timeout.ms a hard limit which is easier to explain.
> > >
> > > -Jason
> > >
> > > On Tue, Aug 29, 2017 at 11:10 AM, Sumant Tambe <suta...@gmail.com> wrote:
> > >
> > > > I'm updating the KIP-91 writeup.
> > > > There seems to be some confusion about expiring an inflight request. An inflight request gets a full delivery.timeout.ms duration from creation, right? So it should be max(remaining delivery.timeout.ms, request.timeout.ms)?
> > > >
> > > > Jun, we do want to wait for an inflight request for longer than request.timeout.ms, right?
> > > >
> > > > What happens to a batch when retries * (request.timeout.ms + retry.backoff.ms) < delivery.timeout.ms and all retries are exhausted? I remember an internal discussion where we concluded that retries can be no longer relevant (i.e., ignored, which is the same as retries=MAX_LONG) when there's an end-to-end delivery.timeout.ms. Do you agree?
> > > >
> > > > Regards,
> > > > Sumant
> > > >
> > > > On 27 August 2017 at 12:08, Jun Rao <j...@confluent.io> wrote:
> > > >
> > > > > Hi, Jiangjie,
> > > > >
> > > > > If we want to enforce delivery.timeout.ms, we need to take the min, right? Also, if a user sets a large delivery.timeout.ms, we probably don't want to wait for an inflight request longer than request.timeout.ms.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > > On Fri, Aug 25, 2017 at 5:19 PM, Becket Qin <becket....@gmail.com> wrote:
> > > > >
> > > > > > Hi Jason,
> > > > > >
> > > > > > I see what you mean. That makes sense. So in the above case, after the producer resets the PID, when it retries batch_0_tp1, the batch will still have the old PID even if the producer has already got a new PID.
> > > > > >
> > > > > > @Jun, do you mean max(remaining delivery.timeout.ms, request.timeout.ms) instead of min(remaining delivery.timeout.ms, request.timeout.ms)?
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jiangjie (Becket) Qin
> > > > > >
> > > > > > On Fri, Aug 25, 2017 at 9:34 AM, Jun Rao <j...@confluent.io> wrote:
> > > > > >
> > > > > > > Hi, Becket,
> > > > > > >
> > > > > > > Good point on expiring inflight requests. Perhaps we can expire an inflight request after min(remaining delivery.timeout.ms, request.timeout.ms). This way, if a user sets a high delivery.timeout.ms, we can still recover from a broker power outage sooner.
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jun
> > > > > > >
> > > > > > > On Thu, Aug 24, 2017 at 12:52 PM, Becket Qin <becket....@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi Jason,
> > > > > > > >
> > > > > > > > delivery.timeout.ms sounds good to me.
> > > > > > > >
> > > > > > > > I was referring to the case where we reset the PID/sequence after expiring a batch. This is more about sending the batches after the expired batch.
> > > > > > > >
> > > > > > > > The scenario being discussed is expiring one of the batches in an in-flight request and retrying the other batches in that in-flight request. So consider the following case:
> > > > > > > > 1. The producer sends request_0 with two batches (batch_0_tp0 and batch_0_tp1).
> > > > > > > > 2. The broker receives the request and appends it to the log.
> > > > > > > > 3. Before the producer receives the response from the broker, batch_0_tp0 expires. The producer will expire batch_0_tp0 immediately, reset the PID, and then resend batch_0_tp1, and maybe send batch_1_tp0 (i.e., the next batch after the expired batch) as well.
> > > > > > > >
> > > > > > > > For batch_0_tp1, it is OK to reuse the PID and sequence number. The problem is batch_1_tp0: if we reuse the same PID and the broker has already appended batch_0_tp0, the broker will think batch_1_tp0 is a duplicate with the same sequence number. As a result the broker will drop batch_1_tp0. That is why we have to either bump up the sequence number or reset the PID. To avoid this complexity, I was suggesting not expiring the in-flight batch immediately, but waiting for the produce response. If the batch has been successfully appended, we do not expire it. Otherwise, we expire it.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Jiangjie (Becket) Qin
> > > > > > > >
> > > > > > > > On Thu, Aug 24, 2017 at 11:26 AM, Jason Gustafson <ja...@confluent.io> wrote:
> > > > > > > >
> > > > > > > > > @Becket
> > > > > > > > >
> > > > > > > > > Good point about unnecessarily resetting the PID in cases where we know the request has failed. Might be worth opening a JIRA to try and improve this.
> > > > > > > > >
> > > > > > > > > > So if we expire the batch prematurely and resend all the other batches in the same request, chances are there will be duplicates. If we wait for the response instead, it is less likely to introduce duplicates, and we may not need to reset the PID.
> > > > > > > > >
> > > > > > > > > Not sure I follow this.
> > > > > > > > > Are you assuming that we change the batch PID/sequence of the retried batches after resetting the PID? I think we probably need to ensure that when we retry a batch, we always use the same PID/sequence.
> > > > > > > > >
> > > > > > > > > By the way, as far as naming, `max.message.delivery.wait.ms` is quite a mouthful. Could we shorten it? Perhaps `delivery.timeout.ms`?
> > > > > > > > >
> > > > > > > > > -Jason
> > > > > > > > >
> > > > > > > > > On Wed, Aug 23, 2017 at 8:51 PM, Becket Qin <becket....@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > Hi Jun,
> > > > > > > > > >
> > > > > > > > > > If the TCP timeout is longer than request.timeout.ms, the producer will always hit request.timeout.ms before hitting the TCP timeout, right? That is why we added request.timeout.ms in the first place.
> > > > > > > > > >
> > > > > > > > > > You are right. Currently we reset the PID and resend the batches to avoid OutOfOrderSequenceException when the expired batches are in retry.
> > > > > > > > > >
> > > > > > > > > > This does not distinguish the reasons that caused the retry. There are two cases:
> > > > > > > > > > 1. If the batch was in retry because it received an error response (e.g. NotLeaderForPartition), we actually don't need to reset the PID in this case because we know that the broker did not accept it.
> > > > > > > > > > 2.
> > > > > > > > > > If the batch was in retry because it hit a timeout earlier, then we should reset the PID (or optimistically send and only reset the PID when we receive an OutOfOrderSequenceException?)
> > > > > > > > > > Case 1 is probably the most common case, so it looks like we are resetting the PID more often than necessary. But because in case 1 the broker does not have the batch, there isn't much impact from resetting the PID and resending other than the additional round trip.
> > > > > > > > > >
> > > > > > > > > > Now we are introducing another case:
> > > > > > > > > > 3. A batch is in retry because we expired an in-flight request before it hits request.timeout.ms.
> > > > > > > > > >
> > > > > > > > > > The difference between 2 and 3 is that in case 3 the broker has likely appended the messages. So if we expire the batch prematurely and resend all the other batches in the same request, chances are there will be duplicates. If we wait for the response instead, it is less likely to introduce duplicates, and we may not need to reset the PID.
> > > > > > > > > >
> > > > > > > > > > That said, batch expiration is probably already rare enough that it may not be necessary to optimize for it.
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > >
> > > > > > > > > > Jiangjie (Becket) Qin
> > > > > > > > > >
> > > > > > > > > > On Wed, Aug 23, 2017 at 5:01 PM, Jun Rao <j...@confluent.io> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi, Becket,
> > > > > > > > > > >
> > > > > > > > > > > If a message expires while it's in an inflight produce request, the producer will get a new PID if idempotence is enabled. This will prevent subsequent messages from hitting OutOfOrderSequenceException. The issue with not expiring an inflight request is that if a broker server goes down hard (e.g. power outage), the time that it takes for the client to detect the socket-level error (this will be something like 8+ minutes with the default TCP setting) is much longer than the default request.timeout.ms.
> > > > > > > > > > >
> > > > > > > > > > > Hi, Sumant,
> > > > > > > > > > >
> > > > > > > > > > > We can probably just default max.message.delivery.wait.ms to 30 secs, the current default for request.timeout.ms.
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > >
> > > > > > > > > > > Jun
> > > > > > > > > > >
> > > > > > > > > > > On Wed, Aug 23, 2017 at 3:38 PM, Sumant Tambe <suta...@gmail.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > OK.
> > > > > > > > > > > > Looks like starting the clock after closing the batch has quite a few pitfalls. I can't think of a way to work around it without adding yet another config. So I won't discuss that here. Anyone? As I said earlier, I'm not hung up on super-accurate notification times.
> > > > > > > > > > > >
> > > > > > > > > > > > If we are going down the max.message.delivery.wait.ms route, what would be the default? There seem to be a few options.
> > > > > > > > > > > >
> > > > > > > > > > > > 1. max.message.delivery.wait.ms=null. Nothing changes for those who don't set it. I.e., batches expire after request.timeout.ms in the accumulator. If they are past the accumulator stage, they time out after retries*(request.timeout.ms+backoff).
> > > > > > > > > > > >
> > > > > > > > > > > > 2. max.message.delivery.wait.ms=request.timeout.ms. No observable behavioral change at the accumulator level, as the timeout value is the same as before. Retries will be done as long as the batch is under max.message.delivery.wait.ms. However, a batch can expire after just one try. That's OK IMO because request.timeout.ms tends to be large (default 30000).
> > > > > > > > > > > >
> > > > > > > > > > > > 3. max.message.delivery.wait.ms=2*request.timeout.ms.
> > > > > > > > > > > > Give the opportunity for two retries, but warn that retries may not happen at all in some rare cases and a batch could expire before any attempt.
> > > > > > > > > > > >
> > > > > > > > > > > > 4. max.message.delivery.wait.ms=something else (a constant?)
> > > > > > > > > > > >
> > > > > > > > > > > > Thoughts?
> > > > > > > > > > > >
> > > > > > > > > > > > On 23 August 2017 at 09:01, Ismael Juma <ism...@juma.me.uk> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Thanks Becket, that seems reasonable. Sumant, would you be willing to update the KIP based on the discussion or are you still not convinced?
> > > > > > > > > > > > >
> > > > > > > > > > > > > Ismael
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Wed, Aug 23, 2017 at 6:04 AM, Becket Qin <becket....@gmail.com> wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > In general max.message.delivery.wait.ms is a cleaner approach. That would make the guarantee clearer. That said, there seem to be subtleties in some scenarios:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > 1. I agree with Sumant that it is a little weird that a message could be expired immediately if it happens to enter a batch that is about to be expired.
> > > > > > > > > > > > > > But as Jun said, as long as we have multiple messages in a batch, there isn't a cheap way to achieve a precise timeout. So the question actually becomes whether it is more user-friendly to expire early (based on the batch creation time) or expire late (based on the batch close time). I think both are acceptable. Personally I think most users do not really care about expiring a little late as long as it eventually expires. So I would use batch close time as long as there is a bound on that. But it looks like we do not really have a bound on when we will close a batch. So expiration based on batch create time may be the only option if we don't want to introduce complexity.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > 2. If we time out a batch in a request when it is still in flight, the end result of that batch is unclear to the users.
> > > > > > > > > > > > > > It would be weird for users to receive an exception saying those messages are expired while they have actually been sent successfully. Also, if idempotence is set to true, what would the next sequence ID be after the expired batch? Reusing the same sequence ID may result in data loss, and incrementing the sequence ID may cause OutOfOrderSequenceException. Besides, extracting an expired batch from a request also introduces some complexity. Again, personally I think it is fine to expire a little bit late. So maybe we don't need to expire a batch that is already in flight. In the worst case we will expire it with a delay of request.timeout.ms.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Jiangjie (Becket) Qin
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Tue, Aug 22, 2017 at 3:08 AM, Ismael Juma <ism...@juma.me.uk> wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >> Hi all,
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> The discussion has been going on for a while, would it help to have a call to discuss this? I'd like to start a vote soonish so that we can include this in 1.0.0. I personally prefer max.message.delivery.wait.ms. It seems like Jun, Apurva and Jason also prefer that. Sumant, it seems like you still prefer a batch.expiry.ms, is that right? What are your thoughts Joel and Becket?
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> Ismael
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >> On Wed, Aug 16, 2017 at 6:34 PM, Jun Rao <j...@confluent.io> wrote:
> > > > > > > > > > > > > >>
> > > > > > > > > > > > > >>> Hi, Sumant,
> > > > > > > > > > > > > >>>
> > > > > > > > > > > > > >>> The semantics of linger.ms is a bit subtle. The reasoning for the current implementation is the following.
> > > > > > > > > > > > > >>> Let's say one sets linger.ms to 0 (our current default value). Creating a batch for every message will be bad for throughput. Instead, the current implementation only forms a batch when the batch is sendable (i.e., broker is available, inflight request limit is not exceeded, etc). That way, the producer has more chance for batching. The implication is that the closing of a batch could be delayed longer than linger.ms.
> > > > > > > > > > > > > >>>
> > > > > > > > > > > > > >>> Now, on your concern about not having a precise way to control delay in the accumulator. It seems the batch.expiry.ms approach will have the same issue. If you start the clock when a batch is initialized, you may expire some messages in the same batch earlier than batch.expiry.ms.
> > > > > > > > > > > > > >>> If you start the clock when the batch is closed, the expiration time could be unbounded because of the linger.ms implementation described above. Starting the expiration clock on batch initialization will at least guarantee that the time to expire the first message is precise, which is probably good enough.
> > > > > > > > > > > > > >>>
> > > > > > > > > > > > > >>> Thanks,
> > > > > > > > > > > > > >>>
> > > > > > > > > > > > > >>> Jun
> > > > > > > > > > > > > >>>
> > > > > > > > > > > > > >>> On Tue, Aug 15, 2017 at 3:46 PM, Sumant Tambe <suta...@gmail.com> wrote:
> > > > > > > > > > > > > >>>
> > > > > > > > > > > > > >>> > Question about "the closing of a batch can be delayed longer than linger.ms": Is it possible to cause an indefinite delay? At some point the bytes limit might kick in. Also, why is the closing of a batch coupled with the availability of its destination?
> > > > > > > > > > > > > >>> > In this approach a batch chosen for eviction due to delay needs to "close" anyway, right (without regard to destination availability)?
> > > > > > > > > > > > > >>> >
> > > > > > > > > > > > > >>> > I'm not too worried about notifying at the super-exact time specified in the configs. But expiring before the full wait-span has elapsed sounds a little weird. So expiration time has a +/- spread. It works more like a hint than a max. So why not message.delivery.wait.hint.ms?
> > > > > > > > > > > > > >>> >
> > > > > > > > > > > > > >>> > Yeah, a cancellable future will be similar in complexity.
> > > > > > > > > > > > > >>> >
> > > > > > > > > > > > > >>> > I'm unsure if max.message.delivery.wait.ms will be the final nail for producer timeouts. We still won't have a precise way to control delay in just the accumulator segment. batch.expiry.ms does not try to abstract. It's very specific.
> > > > > > > > > > > > > >>> >
> > > > > > > > > > > > > >>> > My biggest concern at the moment is implementation complexity.
> > > > > > > > > > > > > >>> >
> > > > > > > > > > > > > >>> > At this stage, I would like to encourage other independent opinions.
> > > > > > > > > > > > > >>> >
> > > > > > > > > > > > > >>> > Regards,
> > > > > > > > > > > > > >>> > Sumant
> > > > > > > > > > > > > >>> >
> > > > > > > > > > > > > >>> > On 11 August 2017 at 17:35, Jun Rao <j...@confluent.io> wrote:
> > > > > > > > > > > > > >>> >
> > > > > > > > > > > > > >>> > > Hi, Sumant,
> > > > > > > > > > > > > >>> > >
> > > > > > > > > > > > > >>> > > 1. Yes, it's probably reasonable to require max.message.delivery.wait.ms > linger.ms. As for retries, perhaps we can set the default retries to infinite or just ignore it. Then the latency will be bounded by max.message.delivery.wait.ms. request.timeout.ms is the max time the request will be spending on the server. The client can expire an inflight request early if needed.
> > > > > > > > > > > > > >>> > >
> > > > > > > > > > > > > >>> > > 2. Well, since max.message.delivery.wait.ms specifies the max, calling the callback a bit early may be ok?
> > > > > > > > > > > > > >>> > > Note that max.message.delivery.wait.ms only comes into play in the rare error case. So, I am not sure if we need to be very precise. The issue with starting the clock on closing a batch is that currently if the leader is not available, the closing of a batch can be delayed longer than linger.ms.
> > > > > > > > > > > > > >>> > >
> > > > > > > > > > > > > >>> > > 4. As you said, future.get(timeout) itself doesn't solve the problem since you still need a way to expire the record in the sender. The amount of work to implement a cancellable future is probably the same?
> > > > > > > > > > > > > >>> > >
> > > > > > > > > > > > > >>> > > Overall, my concern with patch work is that we have iterated on the produce request timeout multiple times and new issues keep coming back.
> > > > > > > > > > > > > >>> > > Ideally, this time, we want to have a solution that covers all cases, even though that requires a bit more work.
> > > > > > > > > > > > > >>> > >
> > > > > > > > > > > > > >>> > > Thanks,
> > > > > > > > > > > > > >>> > >
> > > > > > > > > > > > > >>> > > Jun
> > > > > > > > > > > > > >>> > >
> > > > > > > > > > > > > >>> > > On Fri, Aug 11, 2017 at 12:30 PM, Sumant Tambe <suta...@gmail.com> wrote:
> > > > > > > > > > > > > >>> > >
> > > > > > > > > > > > > >>> > > > Hi Jun,
> > > > > > > > > > > > > >>> > > >
> > > > > > > > > > > > > >>> > > > Thanks for looking into it.
> > > > > > > > > > > > > >>> > > >
> > > > > > > > > > > > > >>> > > > Yes, we did consider this message-level timeout approach and expiring batches selectively in a request, but rejected it because of the added complexity without a strong benefit to counterweigh it. Your proposal is a slight variation so I'll mention some issues here.
> > > > > > > > > > > > > >>> > > >
> > > > > > > > > > > > > >>> > > > 1.
It sounds like > > > max.message.delivery.wait.ms > > > > > > will > > > > > > > > > > overlap > > > > > > > > > > > > with > > > > > > > > > > > > > >>> "time > > > > > > > > > > > > > >>> > > > segments" of both linger.ms and retries > * > > ( > > > > > > > > > > > request.timeout.ms > > > > > > > > > > > > + > > > > > > > > > > > > > >>> > > > retry.backoff.ms). In that case, which > > > config > > > > > set > > > > > > > > takes > > > > > > > > > > > > > >>> precedence? It > > > > > > > > > > > > > >>> > > > would not make sense to configure configs > > > from > > > > > both > > > > > > > > sets. > > > > > > > > > > > > > >>> Especially, > > > > > > > > > > > > > >>> > we > > > > > > > > > > > > > >>> > > > discussed exhaustively internally that > > > retries > > > > > and > > > > > > > > > > > > > >>> > > > max.message.delivery.wait.ms can't / > > > shouldn't > > > > > be > > > > > > > > > > configured > > > > > > > > > > > > > >>> together. > > > > > > > > > > > > > >>> > > > Retires become moot as you already > > mention. I > > > > > think > > > > > > > > > that's > > > > > > > > > > > > going > > > > > > > > > > > > > >>> to be > > > > > > > > > > > > > >>> > > > surprising to anyone wanting to use > > > > > > > > > > > > max.message.delivery.wait.ms > > > > > > > > > > > > > . > > > > > > > > > > > > > >>> We > > > > > > > > > > > > > >>> > > > probably need > max.message.delivery.wait.ms > > > > > > > > > > linger.ms > > > > > > > > > or > > > > > > > > > > > > > >>> something > > > > > > > > > > > > > >>> > like > > > > > > > > > > > > > >>> > > > that. > > > > > > > > > > > > > >>> > > > > > > > > > > > > > > > > >>> > > > 2. 
If clock starts when a batch is > created > > > and > > > > > > expire > > > > > > > > > when > > > > > > > > > > > > > >>> > > > max.message.delivery.wait.ms is over in > > the > > > > > > > > accumulator, > > > > > > > > > > the > > > > > > > > > > > > > last > > > > > > > > > > > > > >>> few > > > > > > > > > > > > > >>> > > > messages in the expiring batch may not > have > > > > lived > > > > > > > long > > > > > > > > > > > enough. > > > > > > > > > > > > As > > > > > > > > > > > > > >>> the > > > > > > > > > > > > > >>> > > > config seems to suggests per-message > > timeout, > > > > > it's > > > > > > > > > > incorrect > > > > > > > > > > > to > > > > > > > > > > > > > >>> expire > > > > > > > > > > > > > >>> > > > messages prematurely. On the other hand > if > > > > clock > > > > > > > starts > > > > > > > > > > after > > > > > > > > > > > > > >>> batch is > > > > > > > > > > > > > >>> > > > closed (which also implies that > linger.ms > > is > > > > not > > > > > > > > covered > > > > > > > > > > by > > > > > > > > > > > > the > > > > > > > > > > > > > >>> > > > max.message.delivery.wait.ms config), no > > > > message > > > > > > > would > > > > > > > > > be > > > > > > > > > > be > > > > > > > > > > > > > >>> expired > > > > > > > > > > > > > >>> > too > > > > > > > > > > > > > >>> > > > soon. Yeah, expiration may be little bit > > too > > > > late > > > > > > but > > > > > > > > > hey, > > > > > > > > > > > this > > > > > > > > > > > > > >>> ain't > > > > > > > > > > > > > >>> > > > real-time service. > > > > > > > > > > > > > >>> > > > > > > > > > > > > > > > > >>> > > > 3. I agree that steps #3, #4, (and #5) > are > > > > > complex > > > > > > to > > > > > > > > > > > > implement. > > > > > > > > > > > > > >>> On the > > > > > > > > > > > > > >>> > > > other hand, batch.expiry.ms is next to > > > trivial > > > > > to > > > > > > > > > > implement. 
> > > > > > > > > > > > We > > > > > > > > > > > > > >>> just > > > > > > > > > > > > > >>> > > pass > > > > > > > > > > > > > >>> > > > the config all the way down to > > > > > > > > ProducerBatch.maybeExpire > > > > > > > > > > and > > > > > > > > > > > be > > > > > > > > > > > > > >>> done > > > > > > > > > > > > > >>> > with > > > > > > > > > > > > > >>> > > > it. > > > > > > > > > > > > > >>> > > > > > > > > > > > > > > > > >>> > > > 4. Do you think the effect of > > > > > > > > > max.message.delivery.wait.ms > > > > > > > > > > > can > > > > > > > > > > > > > be > > > > > > > > > > > > > >>> > > > simulated > > > > > > > > > > > > > >>> > > > with future.get(timeout) method? Copying > > > > excerpt > > > > > > from > > > > > > > > the > > > > > > > > > > > > kip-91: > > > > > > > > > > > > > >>> An > > > > > > > > > > > > > >>> > > > end-to-end timeout may be partially > > emulated > > > > > using > > > > > > > the > > > > > > > > > > > > > >>> > > future.get(timeout). > > > > > > > > > > > > > >>> > > > The timeout must be greater than ( > > > > > batch.expiry.ms > > > > > > + > > > > > > > > > > nRetries > > > > > > > > > > > > * ( > > > > > > > > > > > > > >>> > > > request.timeout.ms + retry.backoff.ms)). > > > Note > > > > > that > > > > > > > > when > > > > > > > > > > > future > > > > > > > > > > > > > >>> times > > > > > > > > > > > > > >>> > > out, > > > > > > > > > > > > > >>> > > > Sender may continue to send the records > in > > > the > > > > > > > > > background. > > > > > > > > > > To > > > > > > > > > > > > > avoid > > > > > > > > > > > > > >>> > that, > > > > > > > > > > > > > >>> > > > implementing a cancellable future is a > > > > > possibility. > > > > > > > > > > > > > >>> > > > > > > > > > > > > > > > > >>> > > > For simplicity, we could just implement a > > > > trivial > > > > > > > > method > > > > > > > > > in > > > > > > > > > > > > > >>> producer > > > > > > > > > > > > > >>> > > > ProducerConfigs. 
> maxMessageDeliveryWaitMs() > > > and > > > > > > > return > > > > > > > > a > > > > > > > > > > > number > > > > > > > > > > > > > >>> based > > > > > > > > > > > > > >>> > on > > > > > > > > > > > > > >>> > > > this formula? Users of future.get can use > > > this > > > > > > > timeout > > > > > > > > > > value. > > > > > > > > > > > > > >>> > > > > > > > > > > > > > > > > >>> > > > Thoughts? > > > > > > > > > > > > > >>> > > > > > > > > > > > > > > > > >>> > > > Regards, > > > > > > > > > > > > > >>> > > > Sumant > > > > > > > > > > > > > >>> > > > > > > > > > > > > > > > > >>> > > > > > > > > > > > > > > > > >>> > > > > > > > > > > > > > > > > >>> > > > On 11 August 2017 at 07:50, Sumant Tambe > < > > > > > > > > > > suta...@gmail.com> > > > > > > > > > > > > > >>> wrote: > > > > > > > > > > > > > >>> > > > > > > > > > > > > > > > > >>> > > > > > > > > > > > > > > > > > >>> > > > > Thanks for the KIP. Nice documentation > on > > > all > > > > > > > current > > > > > > > > > > > issues > > > > > > > > > > > > > >>> with the > > > > > > > > > > > > > >>> > > > >> timeout. > > > > > > > > > > > > > >>> > > > > > > > > > > > > > > > > > >>> > > > > For the KIP writeup, all credit goes to > > > Joel > > > > > > Koshy. > > > > > > > > > > > > > >>> > > > > > > > > > > > > > > > > > >>> > > > > I'll follow up on your comments a > little > > > > later. > > > > > > > > > > > > > >>> > > > > > > > > > > > > > > > > > >>> > > > > > > > > > > > > > > > > > >>> > > > >> > > > > > > > > > > > > > >>> > > > >> You also brought up a good use case > for > > > > timing > > > > > > > out a > > > > > > > > > > > > message. 
> > > > For applications that collect and send sensor data to Kafka, if the data can't be sent to Kafka for some reason, the application may prefer to buffer the more recent data in the accumulator. Without a timeout, the accumulator will be filled with old records and new records can't be added.
> > > >
> > > > Your proposal makes sense for a developer who is familiar with how the producer works. I am not sure if this is very intuitive to the users since it may not be very easy for them to figure out how to configure the new knob to bound the amount of time until a message is completed.
> > > >
> > > > From the users' perspective, Apurva's suggestion of max.message.delivery.wait.ms (which bounds the time from when a message is in the accumulator to the time when the callback is called) seems more intuitive. You listed this in the rejected section since it requires additional logic to rebatch when a produce request expires. However, this may not be too bad. The following are the things that we have to do.
> > > >
> > > > 1. The clock starts when a batch is created.
> > > > 2. If the batch can't be drained within max.message.delivery.wait.ms, all messages in the batch will fail and the callback will be called.
> > > > 3. When sending a produce request, we calculate an expireTime for the request that equals the remaining expiration time for the oldest batch in the request.
> > > > 4. We set the minimum of the expireTime of all inflight requests as the timeout in the selector poll call (so that the selector can wake up before the expiration time).
> > > > 5. If the produce response can't be received within expireTime, we expire all batches in the produce request whose expiration time has been reached. For the rest of the batches, we resend them in a new produce request.
> > > > 6. If the produce response has a retriable error, we just back off a bit and then retry the produce request as today. The number of retries doesn't really matter now. We just keep retrying until the expiration time is reached. It's possible that a produce request is never retried due to expiration. However, this seems the right thing to do since the users want to time out the message at this time.
> > > >
> > > > Implementation wise, there will be a bit more complexity in steps 3 and 4, but probably not too bad. The benefit is that this is more intuitive to the end user.
> > > >
> > > > Does that sound reasonable to you?
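The bookkeeping in steps 1 and 3 to 5 of the list above can be sketched roughly as follows. This is only an illustration under stated assumptions: `DeliveryWaitExpirySketch`, `Batch`, and all method names are hypothetical and do not correspond to classes in the Kafka producer, and a single `maxDeliveryWaitMs` value is assumed to apply to every batch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of steps 1 and 3-5; these are NOT Kafka's internal classes.
final class DeliveryWaitExpirySketch {

    // A batch whose clock starts at creation time (step 1).
    static final class Batch {
        final String name;
        final long createdMs;

        Batch(String name, long createdMs) {
            this.name = name;
            this.createdMs = createdMs;
        }

        // Time left before this batch exceeds maxDeliveryWaitMs; never negative.
        long remainingMs(long nowMs, long maxDeliveryWaitMs) {
            return Math.max(0L, createdMs + maxDeliveryWaitMs - nowMs);
        }
    }

    // Step 3: a request's expireTime is the remaining time of its oldest batch.
    // Returns Long.MAX_VALUE for an empty request.
    static long requestExpireMs(List<Batch> request, long nowMs, long maxDeliveryWaitMs) {
        long min = Long.MAX_VALUE;
        for (Batch b : request) {
            min = Math.min(min, b.remainingMs(nowMs, maxDeliveryWaitMs));
        }
        return min;
    }

    // Step 4: the poll timeout is the minimum expireTime over all in-flight requests.
    static long pollTimeoutMs(List<List<Batch>> inFlight, long nowMs, long maxDeliveryWaitMs) {
        long min = Long.MAX_VALUE;
        for (List<Batch> request : inFlight) {
            min = Math.min(min, requestExpireMs(request, nowMs, maxDeliveryWaitMs));
        }
        return min;
    }

    // Step 5: batches that have not yet expired are resent in a new request.
    static List<Batch> batchesToResend(List<Batch> request, long nowMs, long maxDeliveryWaitMs) {
        List<Batch> resend = new ArrayList<>();
        for (Batch b : request) {
            if (b.remainingMs(nowMs, maxDeliveryWaitMs) > 0L) {
                resend.add(b);
            }
        }
        return resend;
    }

    public static void main(String[] args) {
        List<Batch> request = Arrays.asList(new Batch("old", 0L), new Batch("new", 400L));
        System.out.println(requestExpireMs(request, 500L, 1000L));
    }
}
```

The sketch deliberately leaves out the retry and backoff handling of step 6, since under this proposal the retry count no longer bounds anything; only the per-batch remaining time does.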
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Wed, Aug 9, 2017 at 10:03 PM, Sumant Tambe <suta...@gmail.com> wrote:
> > > >
> > > > > On Wed, Aug 9, 2017 at 1:28 PM Apurva Mehta <apu...@confluent.io> wrote:
> > > > >
> > > > > > > > There seems to be no relationship with cluster metadata availability or staleness. Expiry is just based on the time since the batch has been ready. Please correct me if I am wrong.
> > > > > > >
> > > > > > > I was not very specific about where we do expiration. I glossed over some details because (again) we've other mechanisms to detect non progress. The condition (!muted.contains(tp) && (isMetadataStale || cluster.leaderFor(tp) == null)) is used in RecordAccumulator.expiredBatches:
> > > > > > > https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L443
> > > > > > >
> > > > > > > Effectively, we expire in all the following cases:
> > > > > > > 1) producer is partitioned from the brokers. When metadata age grows beyond 3x its max value, it's safe to say that we're not talking to the brokers at all. Report.
> > > > > > > 2) fresh metadata && leader for a partition is not known && a batch is sitting there for longer than request.timeout.ms. This is one case we would like to improve and use batch.expiry.ms because request.timeout.ms is too small.
> > > > > > > 3) fresh metadata && leader for a partition is known && batch is sitting there for longer than batch.expiry.ms. This is a new case that is different from #2. This is the catch-up mode case. Things are moving too slowly. Pipeline SLAs are broken. Report and shutdown kmm.
> > > > > > >
> > > > > > > The second and the third cases are useful to a real-time app for a completely different reason. Report, forget about the batch, and just move on (without shutting down).
> > > > > >
> > > > > > If I understand correctly, you are talking about a fork of apache kafka which has these additional conditions? Because that check doesn't exist on trunk today.
> > > > >
> > > > > Right. It is our internal release in LinkedIn.
> > > > >
> > > > > > Or are you proposing to change the behavior of expiry to account for stale metadata and partitioned producers as part of this KIP?
> > > > >
> > > > > No. It's our temporary solution in the absence of kip-91. Note that we don't like increasing request.timeout.ms. Without our extra conditions our batches expire too soon, which is a problem in kmm catch-up mode.
> > > > >
> > > > > If we get batch.expiry.ms, we will configure it to 20 mins. maybeExpire will use the config instead of r.t.ms. The extra conditions will be unnecessary. All three cases shall be covered via the batch.expiry timeout.
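To make the three expiry cases described in the message above easier to compare, here is a minimal sketch of them as a single decision function. Everything in it is an assumption made for illustration: the class, enum, and parameter names are hypothetical, "fresh metadata" is taken to mean that the metadata age has not exceeded 3x its max value, and the code is not taken from the LinkedIn internal release or from Kafka trunk.

```java
// Hypothetical sketch of the three expiry cases; names and the "fresh" threshold are assumptions.
final class ExpiryCasesSketch {

    enum Reason { NONE, PARTITIONED, NO_LEADER, TOO_SLOW }

    static Reason classify(long metadataAgeMs, long metadataMaxAgeMs, boolean leaderKnown,
                           long batchWaitedMs, long requestTimeoutMs, long batchExpiryMs) {
        // Case 1: metadata is older than 3x its max age, so the producer is treated as partitioned.
        if (metadataAgeMs > 3 * metadataMaxAgeMs) {
            return Reason.PARTITIONED;
        }
        // Case 2: fresh metadata, unknown leader, batch waited longer than request.timeout.ms.
        if (!leaderKnown && batchWaitedMs > requestTimeoutMs) {
            return Reason.NO_LEADER;
        }
        // Case 3: fresh metadata, known leader, batch waited longer than batch.expiry.ms.
        if (leaderKnown && batchWaitedMs > batchExpiryMs) {
            return Reason.TOO_SLOW;
        }
        return Reason.NONE;
    }

    // With a single batch.expiry.ms timeout, the message above expects one check to cover all cases.
    static boolean expiredUnderBatchExpiry(long batchWaitedMs, long batchExpiryMs) {
        return batchWaitedMs > batchExpiryMs;
    }

    public static void main(String[] args) {
        // Fresh metadata, leader unknown, batch has waited 31 s against a 30 s request timeout.
        System.out.println(classify(10_000L, 300_000L, false, 31_000L, 30_000L, 1_200_000L));
    }
}
```

The second method reflects the stated goal of the proposal: once `batch.expiry.ms` exists, the metadata and leader conditions become unnecessary and the expiry decision reduces to one comparison.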
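Returning to the future.get(timeout) emulation quoted from kip-91 earlier in the thread: the lower bound on the timeout, batch.expiry.ms + nRetries * (request.timeout.ms + retry.backoff.ms), is simple arithmetic and can be sketched as below. The class and method names are hypothetical; this is not the `ProducerConfigs.maxMessageDeliveryWaitMs()` method floated in the thread (which was only a suggestion), and the already-completed future merely stands in for the one a producer send would return.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical helper for the kip-91 excerpt; not part of Kafka.
final class FutureGetTimeoutSketch {

    // batch.expiry.ms + nRetries * (request.timeout.ms + retry.backoff.ms)
    static long lowerBoundMs(long batchExpiryMs, int nRetries,
                             long requestTimeoutMs, long retryBackoffMs) {
        return batchExpiryMs + (long) nRetries * (requestTimeoutMs + retryBackoffMs);
    }

    public static void main(String[] args) throws Exception {
        long boundMs = lowerBoundMs(20_000L, 3, 30_000L, 100L);
        // A completed future stands in for the Future returned by a producer send.
        Future<String> ack = CompletableFuture.completedFuture("ack");
        // The excerpt says the timeout must be greater than the bound, hence the + 1.
        System.out.println(ack.get(boundMs + 1, TimeUnit.MILLISECONDS));
    }
}
```

As the excerpt itself notes, a timed-out `get` does not stop the Sender from continuing to send the records in the background, so this bound only limits how long the caller waits.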