Hi, Becket,

Good point on expiring inflight requests. Perhaps we can expire an inflight request after min(remaining delivery.timeout.ms, request.timeout.ms). This way, even if a user sets a high delivery.timeout.ms, we can still recover from a broker power outage sooner.

Thanks,

Jun
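For illustration only, here is a minimal sketch of the rule described above. The helper and parameter names are hypothetical and are not part of the actual producer code:

```java
// Sketch only: hypothetical helper, not the real Sender/NetworkClient logic.
final class InflightExpiry {
    /**
     * How long (in ms) an in-flight produce request may stay outstanding: the smaller of
     * the batch's remaining delivery.timeout.ms budget and request.timeout.ms, so a large
     * delivery timeout cannot delay detection of a dead broker beyond request.timeout.ms.
     */
    static long inflightExpiryMs(long batchCreateTimeMs, long nowMs,
                                 long deliveryTimeoutMs, long requestTimeoutMs) {
        long remainingDeliveryMs = Math.max(0, batchCreateTimeMs + deliveryTimeoutMs - nowMs);
        return Math.min(remainingDeliveryMs, requestTimeoutMs);
    }
}
```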
On Thu, Aug 24, 2017 at 12:52 PM, Becket Qin <becket....@gmail.com> wrote:

Hi Jason,

delivery.timeout.ms sounds good to me.

I was referring to the case where we reset the PID/sequence after expiring a batch. This is more about sending the batches that come after the expired batch.

The scenario being discussed is expiring one of the batches in an in-flight request and retrying the other batches in that in-flight request. So consider the following case:
1. The producer sends request_0 with two batches (batch_0_tp0 and batch_0_tp1).
2. The broker receives the request and appends it to the log.
3. Before the producer receives the response from the broker, batch_0_tp0 expires. The producer expires batch_0_tp0 immediately, resets the PID, and then resends batch_0_tp1, and maybe sends batch_1_tp0 (i.e. the next batch after the expired one) as well.

For batch_0_tp1, it is OK to reuse the PID and sequence number. The problem is batch_1_tp0: if we reuse the same PID and the broker has already appended batch_0_tp0, the broker will think batch_1_tp0 is a duplicate with the same sequence number. As a result the broker will drop batch_1_tp0. That is why we have to either bump up the sequence number or reset the PID. To avoid this complexity, I was suggesting not to expire the in-flight batch immediately, but to wait for the produce response. If the batch has been successfully appended, we do not expire it. Otherwise, we expire it.

Thanks,

Jiangjie (Becket) Qin
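A minimal sketch of the deferred-expiry idea in the message above. The batch API shown here is hypothetical, not the real producer internals:

```java
// Sketch only: hypothetical batch API, not the actual Sender/RecordAccumulator code.
// A batch that has reached its timeout while sitting in an in-flight request is not
// failed immediately; the decision is deferred until the produce response arrives,
// so the producer never has to guess whether the broker already appended it.
void maybeExpire(Batch batch, long nowMs) {
    if (!batch.hasReachedDeliveryTimeout(nowMs)) {
        return;
    }
    if (batch.isInFlight()) {
        batch.markExpiryPending();   // resolved on response: success => complete, error/timeout => expire
    } else {
        batch.expire();              // safe: the broker has never seen this batch
    }
}
```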
On Thu, Aug 24, 2017 at 11:26 AM, Jason Gustafson <ja...@confluent.io> wrote:

@Becket

Good point about unnecessarily resetting the PID in cases where we know the request has failed. Might be worth opening a JIRA to try and improve this.

> So if we expire the batch prematurely and resend all the other batches in the same request, chances are there will be duplicates. If we wait for the response instead, it is less likely to introduce duplicates, and we may not need to reset the PID.

Not sure I follow this. Are you assuming that we change the PID/sequence of the retried batches after resetting the PID? I think we probably need to ensure that when we retry a batch, we always use the same PID/sequence.

By the way, as far as naming, `max.message.delivery.wait.ms` is quite a mouthful. Could we shorten it? Perhaps `delivery.timeout.ms`?

-Jason

On Wed, Aug 23, 2017 at 8:51 PM, Becket Qin <becket....@gmail.com> wrote:

Hi Jun,

If the TCP timeout is longer than request.timeout.ms, the producer will always hit request.timeout.ms before hitting the TCP timeout, right? That is why we added request.timeout.ms in the first place.

You are right. Currently we reset the PID and resend the batches to avoid OutOfOrderSequenceException when the expired batches are in retry.

This does not distinguish the reasons that caused the retry. There are two cases:
1. If the batch was in retry because it received an error response (e.g. NotLeaderForPartition), we don't actually need to reset the PID, because we know the broker did not accept it.
2. If the batch was in retry because it hit a timeout earlier, then we should reset the PID (or optimistically send and only reset the PID when we receive OutOfOrderSequenceException?)

Case 1 is probably the most common case, so it looks like we are resetting the PID more often than necessary. But because in case 1 the broker does not have the batch, there isn't much impact from resetting the PID and resending, other than the additional round trip.

Now we are introducing another case:
3. A batch is in retry because we expired an in-flight request before it hit request.timeout.ms.

The difference between 2 and 3 is that in case 3 the broker has likely appended the messages. So if we expire the batch prematurely and resend all the other batches in the same request, chances are there will be duplicates. If we wait for the response instead, it is less likely to introduce duplicates, and we may not need to reset the PID.

That said, batch expiration is probably already rare enough that it may not be necessary to optimize for it.

Thanks,

Jiangjie (Becket) Qin

On Wed, Aug 23, 2017 at 5:01 PM, Jun Rao <j...@confluent.io> wrote:

Hi, Becket,

If a message expires while it's in an in-flight produce request, the producer will get a new PID if idempotence is enabled. This will prevent subsequent messages from hitting OutOfOrderSequenceException. The issue with not expiring an in-flight request is that if a broker goes down hard (e.g. power outage), the time it takes for the client to detect the socket-level error (something like 8+ minutes with the default TCP settings) is much longer than the default request.timeout.ms.

Hi, Sumant,

We can probably just default max.message.delivery.wait.ms to 30 secs, the current default for request.timeout.ms.

Thanks,

Jun

On Wed, Aug 23, 2017 at 3:38 PM, Sumant Tambe <suta...@gmail.com> wrote:

OK. Looks like starting the clock after closing the batch has quite a few pitfalls. I can't think of a way to work around it without adding yet another config, so I won't discuss that here. Anyone? As I said earlier, I'm not hung up on super-accurate notification times.

If we are going down the max.message.delivery.wait.ms route, what would be the default? There seem to be a few options.

1. max.message.delivery.wait.ms=null. Nothing changes for those who don't set it. I.e., batches expire after request.timeout.ms in the accumulator. If they are past the accumulator stage, they time out after retries * (request.timeout.ms + backoff).

2. max.message.delivery.wait.ms=request.timeout.ms. No observable behavioral change at the accumulator level, as the timeout value is the same as before. Retries will be done as long as the batch is under max.message.delivery.wait.ms. However, a batch can expire after just one try. That's OK IMO because request.timeout.ms tends to be large (default 30000).

3. max.message.delivery.wait.ms=2*request.timeout.ms (see the sketch below). Gives an opportunity for two retries, but warn that retries may not happen at all in some rare cases and a batch could expire before any attempt.

4. max.message.delivery.wait.ms=something else (a constant?)

Thoughts?
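For illustration, a sketch of how option 3 above might look in producer configuration. The `max.message.delivery.wait.ms` key is the name proposed in this thread, not an existing ProducerConfig constant, so it is shown as a plain string:

```java
import java.util.Properties;

// Illustrative only: option 3 above (proposed end-to-end timeout = 2 * request.timeout.ms).
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("request.timeout.ms", "30000");            // existing per-request bound
props.put("max.message.delivery.wait.ms", "60000");  // proposed end-to-end bound (2 * request.timeout.ms)
```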
On 23 August 2017 at 09:01, Ismael Juma <ism...@juma.me.uk> wrote:

Thanks Becket, that seems reasonable. Sumant, would you be willing to update the KIP based on the discussion, or are you still not convinced?

Ismael

On Wed, Aug 23, 2017 at 6:04 AM, Becket Qin <becket....@gmail.com> wrote:

In general max.message.delivery.wait.ms is a cleaner approach. That would make the guarantee clearer. That said, there seem to be subtleties in some scenarios:

1. I agree with Sumant that it is a little weird that a message could be expired immediately if it happens to enter a batch that is about to be expired. But as Jun said, as long as we have multiple messages in a batch, there isn't a cheap way to achieve a precise timeout. So the question actually becomes whether it is more user-friendly to expire early (based on the batch creation time) or expire late (based on the batch close time). I think both are acceptable. Personally I think most users do not really care about expiring a little late as long as the batch eventually expires. So I would use the batch close time as long as there is a bound on it. But it looks like we do not really have a bound on when we will close a batch, so expiration based on the batch create time may be the only option if we don't want to introduce complexity.

2. If we time out a batch in a request while it is still in flight, the end result of that batch is unclear to the users. It would be weird for users to receive an exception saying the messages have expired when they were actually sent successfully. Also, if idempotence is set to true, what would the next sequence ID be after the expired batch? Reusing the same sequence ID may result in data loss, and incrementing the sequence ID may cause OutOfOrderSequenceException. Besides, extracting an expired batch from a request also introduces some complexity. Again, personally I think it is fine to expire a little bit late, so maybe we don't need to expire a batch that is already in flight. In the worst case we will expire it with a delay of request.timeout.ms.
Thanks,

Jiangjie (Becket) Qin

On Tue, Aug 22, 2017 at 3:08 AM, Ismael Juma <ism...@juma.me.uk> wrote:

Hi all,

The discussion has been going on for a while; would it help to have a call to discuss this? I'd like to start a vote soonish so that we can include this in 1.0.0. I personally prefer max.message.delivery.wait.ms. It seems like Jun, Apurva and Jason also prefer that. Sumant, it seems like you still prefer batch.expiry.ms, is that right? What are your thoughts, Joel and Becket?

Ismael

On Wed, Aug 16, 2017 at 6:34 PM, Jun Rao <j...@confluent.io> wrote:

Hi, Sumant,

The semantics of linger.ms is a bit subtle. The reasoning for the current implementation is the following. Let's say one sets linger.ms to 0 (our current default value). Creating a batch for every message would be bad for throughput. Instead, the current implementation only forms a batch when the batch is sendable (i.e., the broker is available, the in-flight request limit is not exceeded, etc.). That way, the producer has more chance for batching. The implication is that a batch could be closed much later than linger.ms.

Now, on your concern about not having a precise way to control delay in the accumulator. It seems the batch.expiry.ms approach will have the same issue. If you start the clock when a batch is initialized, you may expire some messages in the same batch earlier than batch.expiry.ms. If you start the clock when the batch is closed, the expiration time could be unbounded because of the linger.ms implementation described above. Starting the expiration clock on batch initialization will at least guarantee that the time to expire the first message is precise, which is probably good enough.

Thanks,

Jun
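A minimal sketch of the trade-off described above, using hypothetical field names rather than the real RecordAccumulator internals:

```java
// Sketch only: why the expiry clock is started at batch creation rather than batch close.
// closeTimeMs has no upper bound (a batch is closed only once it is sendable), so only
// the creation-time check gives a bounded expiry for the first message in the batch.
boolean expiredByCreateTime = nowMs - batch.createTimeMs >= batchExpiryMs;   // bounded
// boolean expiredByCloseTime = nowMs - batch.closeTimeMs >= batchExpiryMs;  // unbounded: close can be delayed indefinitely
```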
On Tue, Aug 15, 2017 at 3:46 PM, Sumant Tambe <suta...@gmail.com> wrote:

Question about "the closing of a batch can be delayed longer than linger.ms": Is it possible to cause an indefinite delay? At some point the bytes limit might kick in. Also, why is the closing of a batch coupled with the availability of its destination? In this approach a batch chosen for eviction due to delay needs to "close" anyway, right (without regard to destination availability)?

I'm not too worried about notifying at the super-exact time specified in the configs. But expiring before the full wait-span has elapsed sounds a little weird. So the expiration time has a +/- spread. It works more like a hint than a max. So why not message.delivery.wait.hint.ms?

Yeah, a cancellable future will be similar in complexity.

I'm unsure if max.message.delivery.wait.ms will be the final nail for producer timeouts. We still won't have a precise way to control delay in just the accumulator segment. batch.expiry.ms does not try to abstract. It's very specific.

My biggest concern at the moment is implementation complexity.

At this stage, I would like to encourage other independent opinions.

Regards,
Sumant

On 11 August 2017 at 17:35, Jun Rao <j...@confluent.io> wrote:

Hi, Sumant,

1. Yes, it's probably reasonable to require max.message.delivery.wait.ms > linger.ms. As for retries, perhaps we can set the default retries to infinite or just ignore it. Then the latency will be bounded by max.message.delivery.wait.ms. request.timeout.ms is the max time the request will spend on the server. The client can expire an in-flight request early if needed.

2. Well, since max.message.delivery.wait.ms specifies the max, calling the callback a bit early may be OK? Note that max.message.delivery.wait.ms only comes into play in the rare error case. So I am not sure we need to be very precise. The issue with starting the clock on closing a batch is that currently, if the leader is not available, the closing of a batch can be delayed longer than linger.ms.

4. As you said, future.get(timeout) itself doesn't solve the problem since you still need a way to expire the record in the sender. The amount of work to implement a cancellable future is probably the same?

Overall, my concern with patch work is that we have iterated on the produce request timeout multiple times and new issues keep coming back. Ideally, this time, we want to have a solution that covers all cases, even though that requires a bit more work.
Thanks,

Jun

On Fri, Aug 11, 2017 at 12:30 PM, Sumant Tambe <suta...@gmail.com> wrote:

Hi Jun,

Thanks for looking into it.

Yes, we did consider this message-level timeout approach and expiring batches selectively in a request, but rejected it due to the added complexity without a strong benefit to counterweigh it. Your proposal is a slight variation, so I'll mention some issues here.

1. It sounds like max.message.delivery.wait.ms will overlap with the "time segments" of both linger.ms and retries * (request.timeout.ms + retry.backoff.ms). In that case, which config set takes precedence? It would not make sense to configure configs from both sets. In particular, we discussed exhaustively internally that retries and max.message.delivery.wait.ms can't / shouldn't be configured together. Retries become moot, as you already mention. I think that's going to be surprising to anyone wanting to use max.message.delivery.wait.ms. We probably need max.message.delivery.wait.ms > linger.ms or something like that.

2. If the clock starts when a batch is created and it expires when max.message.delivery.wait.ms is over in the accumulator, the last few messages in the expiring batch may not have lived long enough. As the config seems to suggest a per-message timeout, it's incorrect to expire messages prematurely. On the other hand, if the clock starts after the batch is closed (which also implies that linger.ms is not covered by the max.message.delivery.wait.ms config), no message would be expired too soon. Yeah, expiration may be a little bit too late, but hey, this ain't a real-time service.

3. I agree that steps #3, #4 (and #5) are complex to implement. On the other hand, batch.expiry.ms is next to trivial to implement. We just pass the config all the way down to ProducerBatch.maybeExpire and are done with it.

4. Do you think the effect of max.message.delivery.wait.ms can be simulated with the future.get(timeout) method? Copying an excerpt from KIP-91: An end-to-end timeout may be partially emulated using future.get(timeout). The timeout must be greater than (batch.expiry.ms + nRetries * (request.timeout.ms + retry.backoff.ms)). Note that when the future times out, the Sender may continue to send the records in the background. To avoid that, implementing a cancellable future is a possibility.

For simplicity, we could just implement a trivial method in the producer, ProducerConfigs.maxMessageDeliveryWaitMs(), that returns a number based on this formula? Users of future.get can use this timeout value.

Thoughts?

Regards,
Sumant
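For illustration, a sketch of the partial emulation described in point 4. The timeout arithmetic mirrors the formula quoted from KIP-91; `producer` is assumed to be an already-configured KafkaProducer<String, String>, and the config values are example numbers:

```java
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Example values; in practice these would come from the producer's configuration.
long batchExpiryMs = 120_000L;
long requestTimeoutMs = 30_000L;
long retryBackoffMs = 100L;
int nRetries = 3;

// The wait must exceed batch.expiry.ms + nRetries * (request.timeout.ms + retry.backoff.ms).
long waitMs = batchExpiryMs + nRetries * (requestTimeoutMs + retryBackoffMs);

// Note: if this times out, the Sender may still deliver the record in the background.
RecordMetadata metadata = producer
        .send(new ProducerRecord<>("my-topic", "key", "value"))
        .get(waitMs, TimeUnit.MILLISECONDS);
```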
On 11 August 2017 at 07:50, Sumant Tambe <suta...@gmail.com> wrote:

> Thanks for the KIP. Nice documentation on all current issues with the timeout.

For the KIP writeup, all credit goes to Joel Koshy.

I'll follow up on your comments a little later.

> You also brought up a good use case for timing out a message. For applications that collect and send sensor data to Kafka, if the data can't be sent to Kafka for some reason, the application may prefer to buffer the more recent data in the accumulator. Without a timeout, the accumulator will be filled with old records and new records can't be added.
>
> Your proposal makes sense for a developer who is familiar with how the producer works. I am not sure if this is very intuitive to the users, since it may not be very easy for them to figure out how to configure the new knob to bound the amount of time until a message is completed.
> From users' perspective, Apurva's suggestion of max.message.delivery.wait.ms (which bounds the time from when a message enters the accumulator to the time when the callback is called) seems more intuitive. You listed this in the rejected section since it requires additional logic to rebatch when a produce request expires. However, this may not be too bad. The following are the things that we have to do.
>
> 1. The clock starts when a batch is created.
> 2. If the batch can't be drained within max.message.delivery.wait.ms, all messages in the batch will fail and the callback will be called.
> 3. When sending a produce request, we calculate an expireTime for the request that equals the remaining expiration time of the oldest batch in the request.
> 4. We set the minimum of the expireTimes of all in-flight requests as the timeout in the selector poll call (so that the selector can wake up before the expiration time).
> 5. If the produce response can't be received within expireTime, we expire all batches in the produce request whose expiration time has been reached. For the rest of the batches, we resend them in a new produce request.
> 6. If the produce response has a retriable error, we just back off a bit and then retry the produce request as today. The number of retries doesn't really matter now. We just keep retrying until the expiration time is reached. It's possible that a produce request is never retried due to expiration. However, this seems the right thing to do since the users want to time out the message at that time.
>
> Implementation wise, there will be a bit more complexity in steps 3 and 4 (see the sketch below), but probably not too bad. The benefit is that this is more intuitive to the end user.
>
> Does that sound reasonable to you?
>
> Thanks,
>
> Jun
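For illustration, a minimal sketch of the bookkeeping in steps 3 and 4 above, using hypothetical types rather than the real producer internals:

```java
import java.util.Collection;
import java.util.List;

// Sketch only: hypothetical types, not the actual Sender/RecordAccumulator classes.
final class ExpiryBookkeeping {
    static final class Batch { long createTimeMs; }
    static final class InFlightRequest { long expireTimeMs; }

    // Step 3: a request's expireTime is the remaining delivery budget of its oldest batch.
    static long requestExpireTimeMs(List<Batch> batchesInRequest, long nowMs, long deliveryWaitMs) {
        long oldestCreateMs = batchesInRequest.stream()
                .mapToLong(b -> b.createTimeMs)
                .min()
                .orElse(nowMs);
        return Math.max(0, oldestCreateMs + deliveryWaitMs - nowMs);
    }

    // Step 4: poll with the minimum expireTime over all in-flight requests so the
    // selector wakes up before the earliest possible expiration.
    static long pollTimeoutMs(Collection<InFlightRequest> inflight, long defaultPollMs) {
        return inflight.stream()
                .mapToLong(r -> r.expireTimeMs)
                .min()
                .orElse(defaultPollMs);
    }
}
```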
On Wed, Aug 9, 2017 at 10:03 PM, Sumant Tambe <suta...@gmail.com> wrote:

On Wed, Aug 9, 2017 at 1:28 PM, Apurva Mehta <apu...@confluent.io> wrote:

> > > There seems to be no relationship with cluster metadata availability or staleness. Expiry is just based on the time since the batch has been ready. Please correct me if I am wrong.
> >
> > I was not very specific about where we do expiration. I glossed over some details because (again) we have other mechanisms to detect non-progress. The condition (!muted.contains(tp) && (isMetadataStale || cluster.leaderFor(tp) == null)) is used in RecordAccumulator.expiredBatches: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L443
> >
> > Effectively, we expire in all the following cases:
> > 1) The producer is partitioned from the brokers: the metadata age grows beyond 3x its max value. It's safe to say that we're not talking to the brokers at all. Report.
> > 2) Fresh metadata && the leader for a partition is not known && a batch has been sitting there for longer than request.timeout.ms. This is one case we would like to improve by using batch.expiry.ms, because request.timeout.ms is too small.
> > 3) Fresh metadata && the leader for a partition is known && a batch has been sitting there for longer than batch.expiry.ms. This is a new case that is different from #2. This is the catch-up mode case: things are moving too slowly, pipeline SLAs are broken. Report and shut down kmm.
> > The second and the third cases are useful to a real-time app for a completely different reason. Report, forget about the batch, and just move on (without shutting down).
>
> If I understand correctly, you are talking about a fork of Apache Kafka which has these additional conditions? Because that check doesn't exist on trunk today.

Right. It is our internal release at LinkedIn.

> Or are you proposing to change the behavior of expiry to account for stale metadata and partitioned producers as part of this KIP?

No. It's our temporary solution in the absence of KIP-91. Note that we don't like increasing request.timeout.ms. Without our extra conditions, our batches expire too soon -- a problem in kmm catch-up mode.

If we get batch.expiry.ms, we will configure it to 20 mins. maybeExpire will use the config instead of request.timeout.ms. The extra conditions will be unnecessary. All three cases shall be covered via the batch.expiry timeout.
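For illustration, a minimal sketch of how the check described above might collapse once a batch.expiry.ms config exists. The names are hypothetical; this is neither the actual RecordAccumulator code nor the LinkedIn patch:

```java
import java.util.Set;
import org.apache.kafka.common.TopicPartition;

// Sketch only: with batch.expiry.ms available, the accumulator's age check can use the
// configured expiry (e.g. 20 minutes for the kmm catch-up case) instead of
// request.timeout.ms, covering all three cases above without the extra
// metadata-staleness and leader-availability conditions.
final class BatchExpiryCheck {
    static final class Batch { TopicPartition topicPartition; long createTimeMs; }

    static boolean maybeExpire(Batch batch, Set<TopicPartition> muted, long nowMs, long batchExpiryMs) {
        return !muted.contains(batch.topicPartition)
                && nowMs - batch.createTimeMs >= batchExpiryMs;
    }
}
```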