Hey Everyone,

Apologies for the long delay. I am picking this work back up.
After giving this some further thought, I decided it makes the most sense to move replica selection logic into the broker. It is much more difficult to coordinate selection logic in a multi-tenant environment if operators have to coordinate plugins across all client applications (not to mention other languages). Take a look at the updates and let me know what you think:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica

Thanks!
Jason

On Fri, Jan 11, 2019 at 10:49 AM Jun Rao <j...@confluent.io> wrote:

> Hi, Jason,
>
> Thanks for the updated KIP. Looks good overall. Just a few minor comments.
>
> 20. For case 2, if the consumer receives an OFFSET_NOT_AVAILABLE, I am wondering if the consumer should refresh the metadata before retrying. This can allow the consumer to switch to an in-sync replica sooner.
>
> 21. Under "protocol changes", there is a sentence "This allows the broker" that seems broken.
>
> 4. About reducing the ISR propagation delay from the broker to the controller. Jiangjie made that change in KAFKA-2722. Jiangjie, could you comment on whether it's reasonable to reduce the propagation delay now?
>
> Thanks,
>
> Jun
>
> On Wed, Jan 2, 2019 at 11:06 AM Jason Gustafson <ja...@confluent.io> wrote:
>
> > Hey Jun,
> >
> > Sorry for the late reply. I have been giving your comments some thought. Replies below:
> >
> > > 1. The section on handling FETCH_OFFSET_TOO_LARGE error says "Use the OffsetForLeaderEpoch API to verify the current position with the leader". The OffsetForLeaderEpoch request returns log end offset if the request leader epoch is the latest. So, we won't know the true high watermark from that request. It seems that the consumer still needs to send ListOffset request to the leader to obtain high watermark?
> >
> > That's a good point. I think we missed this in KIP-320. I've added a replica_id to the OffsetsForLeaderEpoch API to match the Fetch and ListOffsets API so that the broker can avoid exposing offsets beyond the high watermark. This also means that the OffsetsForLeaderEpoch API needs the same handling we added to the ListOffsets API to avoid non-monotonic or incorrect responses. Similarly, I've proposed using the OFFSET_NOT_AVAILABLE error code in cases where the end offset of an epoch would exceed the high watermark. When querying the latest epoch, the leader will return OFFSET_NOT_AVAILABLE until the high watermark has reached an offset in the leader's current epoch.
> >
> > By the way, I've modified the KIP to drop the OFFSET_TOO_LARGE and OFFSET_TOO_SMALL error codes that I initially proposed. I realized that we could continue to use the current OFFSET_OUT_OF_RANGE error and rely on the returned start offset to distinguish the two cases.
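For illustration, the disambiguation described in the last paragraph above could look roughly like this on the client side. It is only a sketch of the idea, not the actual consumer internals, and every name in it is made up:

```java
// Sketch of how a fetcher could interpret OFFSET_OUT_OF_RANGE using the log
// start offset returned by the replica, instead of the separate
// OFFSET_TOO_LARGE / OFFSET_TOO_SMALL codes that were dropped from the KIP.
// Names are illustrative, not actual Kafka client internals.
final class OutOfRangeHandling {

    enum Resolution { RESET_PER_POLICY, VERIFY_POSITION_WITH_LEADER }

    static Resolution resolve(long fetchOffset, long replicaLogStartOffset) {
        if (fetchOffset < replicaLogStartOffset) {
            // Position is below the replica's log start offset: the data has
            // been removed (e.g. by retention), so apply auto.offset.reset.
            return Resolution.RESET_PER_POLICY;
        }
        // Otherwise the position is beyond the replica's log end offset:
        // verify it against the leader (e.g. via OffsetsForLeaderEpoch)
        // before deciding whether a reset is really needed.
        return Resolution.VERIFY_POSITION_WITH_LEADER;
    }
}
```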
> > > 2. If a non in-sync replica receives a fetch request from a consumer, should it return a new type of error like ReplicaNotInSync?
> >
> > I gave this quite a bit of thought. It is impossible to avoid fetching from out-of-sync replicas in general because of the delay in propagating ISR state. The high watermark that is returned in fetch responses could be used as a more timely substitute, but we still can't assume that followers will always know when they are in-sync. From a high level, this means that the consumer has to take out-of-range errors from followers with a grain of salt anyway. This is only a problem when switching between replicas or when resuming from a committed offset. If a consumer is following the same out-of-sync replica, then its position will stay in range and, other than some extra latency, no harm will be done.
> >
> > Furthermore, it may not be a good idea for consumers to chase the ISR too eagerly since this makes the performance profile harder to predict. The leader itself may have some temporarily increased request load which is causing followers to fall behind. If consumers then switched to the leader after they observed that the follower was out-of-sync, it may make the situation worse. Typically, if a follower has fallen out-of-sync, we expect it to catch back up shortly. It may be better in this scenario to allow consumers to continue fetching from it. On the other hand, if a follower stays out-of-sync for a while, the consumer should have the choice to find a new replica.
> >
> > So after thinking about it, I didn't see a lot of benefit in trying to be strict about ISR fetching. Potentially it even has downsides. Instead, I now see it as more of a heuristic which the consumer can use to keep end-to-end latency reasonably bounded. The consumer already has one knob the user can tune in order to limit this bound: the `metadata.max.age.ms` config controls how often metadata is refreshed. To follow the ISR more closely, the user can refresh metadata more frequently.
> >
> > Note that I've improved the section on out of range handling to be more explicit about the cases we needed to handle.
> >
> > > 3. Could ReplicaSelector be closable?
> >
> > Yes, I made this change. As an aside, the question of whether we should use a plugin does deserve a bit of discussion. An alternative (suggested by David Arthur) that I've been thinking about is to let the broker select the preferred follower to fetch from using the Metadata API. For example, we could add a `rackId` field to the Metadata API which could be provided through user configuration. The broker could then order the ISR list for each partition so that the preferred follower is returned first (currently the order is random). The consumer could then always fetch from the first replica in the ISR list. The benefit is that the broker may have a better view of the current load characteristics, so it may be able to make better decisions. Client plugins are also much more difficult to control. This may have been the point that Mickael was hinting at above.
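To make point 3 above a little more concrete, a closable, configurable selector plugin could be shaped roughly as follows. This is only a sketch: the method name and arguments are placeholders, and whether such a plugin ultimately runs on the client or on the broker (as proposed at the top of this thread) is exactly what is still under discussion:

```java
import java.io.Closeable;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;

// Illustrative only: one possible shape for a replica selector plugin.
public interface ReplicaSelector extends Closeable {

    // Pass through configuration, e.g. the caller's rack or data center.
    default void configure(Map<String, ?> configs) {}

    // Pick the replica to fetch from for the given partition out of the
    // currently known candidates. Returning the leader is always a safe
    // fallback.
    Node select(TopicPartition partition, Node leader, Set<Node> candidateReplicas);

    // Closeable is what point 3 asks for: it lets implementations release
    // resources (metrics, caches, background threads) cleanly on shutdown.
    @Override
    default void close() {}
}
```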
> > > 4. Currently, the ISR propagation from the leader to the controller can be delayed up to 60 secs through ReplicaManager.IsrChangePropagationInterval. In that window, the consumer could still be consuming from a non in-sync replica. The relatively large delay is mostly for reducing the ZK writes and the watcher overhead. Not sure what's the best way to address this. We could potentially make this configurable.
> >
> > This is related to the discussion above. We could make it configurable I guess. I wonder if it would be reasonable to just reduce the default to something like 10 seconds. Do you think we get much benefit from such a long delay?
> >
> > > 5. It may be worth mentioning that, to take advantage of affinity, one may also want to have a customized PartitionAssignor to have an affinity aware assignment in addition to a customized ReplicaSelector.
> >
> > Yes, this is a good point. I was assuming a situation in which each partition had its replicas in all the same datacenters, but you are right that this need not be the case. I will mention this in the KIP and give it some more thought. I think in the common case, these concerns can be treated orthogonally, but it is a bit irritating if you need two separate plugins to make the benefit more general.
> >
> > Thanks,
> > Jason
> >
> > On Tue, Dec 11, 2018 at 11:04 AM Jason Gustafson <ja...@confluent.io> wrote:
> >
> > > Hi Eno,
> > >
> > > Thanks for the clarification. From a high level, the main thing to keep in mind is that this is an opt-in feature. It is a bit like using acks=1 in the sense that a user is accepting slightly weaker guarantees in order to optimize for some metric (in this case, read locality). The default behavior would read only from the leader and users will get the usual semantics. That said, let me address the scenarios you raised:
> > >
> > >> - scenario 1: an application that both produces and consumes (e.g., like Kafka streams) produces synchronously a single record to a topic and then attempts to consume that record. Topic is 3-way replicated say. Could it be the case that the produce succeeds but the consume fails? The consume could go to a replica that has not yet fetched the produce record, right? Or is that not possible?
> > >
> > > I think it depends on what you mean by "fails." From a replica in the ISR's perspective, it has all of the committed data. The only question is what is safe to expose since the high watermark is always one round trip behind. The proposal is to return a retriable error in this case so that the consumer can distinguish the case from an out of range error and retry. No error will be returned to the user, but consumption will be delayed. One of the main improvements in the KIP is ensuring that this delay is minimal in the common case.
> > >
> > > Note that even without follower fetching, this scenario is unavoidable. When a replica becomes a leader, it doesn't know what the latest high watermark is until it receives fetches from all followers in the ISR. During this window, committed data is temporarily not visible. We handle this similarly to what is proposed here. Basically we ask the consumer to retry until we know the data is safe.
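As a purely hypothetical illustration of the scenario 1 behaviour described above (topic name, broker address, and group id are made up, and the consumer is assumed to be configured to fetch from the closest replica): application code does not change. If the fetch happens to go to a follower whose high watermark has not yet caught up, the retriable error is handled inside the client and poll() simply returns the record a little later.

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProduceThenConsume {
    public static void main(String[] args) throws Exception {
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "broker1:9092");
        producerProps.put("acks", "all"); // produce path is unchanged: commit to the ISR as usual
        producerProps.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            // Synchronous produce: returns once the record is committed.
            producer.send(new ProducerRecord<>("events", "k", "v")).get();
        }

        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "broker1:9092");
        consumerProps.put("group.id", "example-group");
        consumerProps.put("auto.offset.reset", "earliest");
        consumerProps.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            consumer.subscribe(List.of("events"));
            // If this fetch lands on a follower that cannot yet prove the new
            // record is committed, the broker answers with a retriable error,
            // the client retries internally, and poll() returns the record on
            // a later iteration -- no error ever reaches this code.
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                if (!records.isEmpty()) {
                    records.forEach(r -> System.out.println(r.value()));
                    break;
                }
            }
        }
    }
}
```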
> > >> - scenario 2: an application C that only consumes. Again say there is only one record produced (by another application P) to a replicated topic and that record has not propagated to all replicas yet (it is only at the leader at time t0). Application C attempts to consume at time t1 and it does so successfully because the consume fetches from the leader. At time t2 the same application seeks to the beginning of the topic and attempts to consume again. Is there a scenario where this second attempt fails because the fetching happens from a replica that does not have the record yet? At time t3 all replicas have the record.
> > >
> > > Yes, this is possible in the way that I described above. There is a (typically short) window in which committed data may not be visible. It is a bit like the partition itself being unavailable temporarily. The data has not been lost and is guaranteed to be returned, but the consumer has to wait until the follower knows it is safe to return.
> > >
> > > One final note: I am iterating on the design a little bit in order to address Jun's comments. Expect a few changes. I realized that there is some inconsistency with the current fetch behavior and KIP-207. It is mainly in regard to how we handle the transition from becoming a follower to becoming a leader.
> > >
> > > Thanks,
> > > Jason
> > >
> > > On Tue, Dec 11, 2018 at 3:46 AM Eno Thereska <eno.there...@gmail.com> wrote:
> > >
> > >> Hi Jason,
> > >>
> > >> My question was on producer + consumer semantics, not just the producer semantics. I'll rephrase it slightly and split into two questions:
> > >> - scenario 1: an application that both produces and consumes (e.g., like Kafka streams) produces synchronously a single record to a topic and then attempts to consume that record. Topic is 3-way replicated say. Could it be the case that the produce succeeds but the consume fails? The consume could go to a replica that has not yet fetched the produce record, right? Or is that not possible?
> > >> - scenario 2: an application C that only consumes. Again say there is only one record produced (by another application P) to a replicated topic and that record has not propagated to all replicas yet (it is only at the leader at time t0). Application C attempts to consume at time t1 and it does so successfully because the consume fetches from the leader. At time t2 the same application seeks to the beginning of the topic and attempts to consume again. Is there a scenario where this second attempt fails because the fetching happens from a replica that does not have the record yet? At time t3 all replicas have the record.
> > >>
> > >> Thanks
> > >> Eno
> > >>
> > >> On Mon, Dec 10, 2018 at 7:42 PM Jason Gustafson <ja...@confluent.io> wrote:
> > >>
> > >> > Hey Eno,
> > >> >
> > >> > Thanks for the comments. However, I'm a bit confused. I'm not suggesting we change Produce semantics in any way. All writes still go through the partition leader and nothing changes with respect to committing to the ISR. The main issue, as I've mentioned in the KIP, is the increased latency before a committed offset is exposed on followers.
> > >> >
> > >> > Perhaps I have misunderstood your question?
> > >> >
> > >> > Thanks,
> > >> > Jason
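The scenario answers above, and the point about the latency before a committed offset is exposed on a follower, can be summarised in one small sketch. This is not broker code, just an illustration of why a follower may briefly answer with a retriable error for data that is already committed on the leader; the names are invented:

```java
// Illustrative sketch only (not actual broker logic): a follower serves
// consumer fetches only up to its own high watermark, which lags the
// leader's by roughly one fetch round trip.
final class FollowerFetchSketch {

    enum Outcome { SERVE_FETCH, RETRIABLE_OFFSET_NOT_AVAILABLE, OFFSET_OUT_OF_RANGE }

    static Outcome handleConsumerFetch(long fetchOffset,
                                       long logStartOffset,
                                       long highWatermark,
                                       long logEndOffset) {
        if (fetchOffset < logStartOffset || fetchOffset > logEndOffset) {
            // The position does not exist on this replica at all.
            return Outcome.OFFSET_OUT_OF_RANGE;
        }
        if (fetchOffset > highWatermark) {
            // The offset exists locally, but this follower cannot yet prove it
            // is committed: its high watermark trails the leader's. Ask the
            // consumer to retry rather than expose possibly-uncommitted data.
            return Outcome.RETRIABLE_OFFSET_NOT_AVAILABLE;
        }
        // Safe to serve: everything below the high watermark is committed
        // (the response may simply be empty if nothing new is visible yet).
        return Outcome.SERVE_FETCH;
    }
}
```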
> > >> > On Mon, Dec 3, 2018 at 9:18 AM Eno Thereska <eno.there...@gmail.com> wrote:
> > >> >
> > >> > > Hi Jason,
> > >> > >
> > >> > > This is an interesting KIP. This will have massive implications for consistency and serialization, since currently the leader for a partition serializes requests. A few questions for now:
> > >> > >
> > >> > > - before we deal with the complexity, it'd be great to see a crisp example in the motivation as to when this will have the most benefit for a customer. In particular, although the customer might have a multi-DC deployment, the DCs could still be close by in a region, so what is the expected best-case scenario for a performance gain? E.g., if all DCs are on the east coast, say. Right now it's not clear to me.
> > >> > > - perhaps performance is not the right metric. Is the metric you are optimizing for latency, throughput or cross-DC cost? (I believe it is cross-DC cost from the KIP). Just wanted to double-check since I'm not sure latency would improve. Throughput could really improve from parallelism (especially in cases when there is mostly consuming going on). So it could be throughput as well.
> > >> > > - the proposal would probably lead to choosing a more complex consistency model. I tend to like the description Doug Terry has in his paper "Replicated Data Consistency Explained Through Baseball": https://www.microsoft.com/en-us/research/wp-content/uploads/2011/10/ConsistencyAndBaseballReport.pdf . To start with, could we get in scenarios where a client that has both a producer and a consumer (e.g., Kafka streams) produces a record, then attempts to consume it back and the consume() comes back with "record does not exist"? That's fine, but could complicate application handling of such scenarios.
> > >> > >
> > >> > > Thanks,
> > >> > > Eno
> > >> > >
> > >> > > On Mon, Dec 3, 2018 at 12:24 PM Mickael Maison <mickael.mai...@gmail.com> wrote:
> > >> > >
> > >> > > > Hi Jason,
> > >> > > >
> > >> > > > Very cool KIP!
> > >> > > > A couple of questions:
> > >> > > > - I'm guessing the selector will be invoked after each rebalance, so every time the consumer is assigned a partition it will be able to select it. Is that true?
> > >> > > >
> > >> > > > - From the selector API, I'm not sure how the consumer will be able to address some of the choices mentioned in "Finding the preferred follower", especially the available bandwidth and the load balancing. By only having the list of Nodes, a consumer can pick the nearest replica (assuming the rack field means anything to users) or balance its own bandwidth, but that might not necessarily mean improved performance or a balanced load on the brokers.
> > >> > > >
> > >> > > > Thanks
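To illustrate Mickael's second point: given only the candidate nodes and their rack field, roughly the best a selector can do is prefer a replica in its own rack and fall back to the leader, as in the sketch below. It has no view of broker load or available bandwidth. The class and method names are made up:

```java
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.common.Node;

// Illustrative only: locality-based selection using nothing but the rack
// field exposed on the candidate nodes.
public final class RackAwareSelection {

    public static Node selectPreferredReadReplica(String clientRack,
                                                  Node leader,
                                                  Set<Node> candidates) {
        if (clientRack == null) {
            return leader; // no locality information: keep the default behaviour
        }
        Optional<Node> sameRack = candidates.stream()
                .filter(n -> clientRack.equals(n.rack()))
                .findFirst();
        return sameRack.orElse(leader);
    }
}
```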
> > >> > > > On Mon, Dec 3, 2018 at 11:35 AM Stanislav Kozlovski <stanis...@confluent.io> wrote:
> > >> > > >
> > >> > > > > Hey Jason,
> > >> > > > >
> > >> > > > > This is certainly a very exciting KIP.
> > >> > > > > I assume that no changes will be made to the offset commits and they will continue to be sent to the group coordinator?
> > >> > > > >
> > >> > > > > I also wanted to address metrics - have we considered any changes there? I imagine that it would be valuable for users to be able to differentiate between which consumers' partitions are fetched from replicas and which aren't. I guess that would need to be addressed both in the server's fetcher lag metrics and in the consumers.
> > >> > > > >
> > >> > > > > Thanks,
> > >> > > > > Stanislav
> > >> > > > >
> > >> > > > > On Wed, Nov 28, 2018 at 10:08 PM Jun Rao <j...@confluent.io> wrote:
> > >> > > > >
> > >> > > > > > Hi, Jason,
> > >> > > > > >
> > >> > > > > > Thanks for the KIP. Looks good overall. A few minor comments below.
> > >> > > > > >
> > >> > > > > > 1. The section on handling FETCH_OFFSET_TOO_LARGE error says "Use the OffsetForLeaderEpoch API to verify the current position with the leader". The OffsetForLeaderEpoch request returns log end offset if the request leader epoch is the latest. So, we won't know the true high watermark from that request. It seems that the consumer still needs to send ListOffset request to the leader to obtain high watermark?
> > >> > > > > >
> > >> > > > > > 2. If a non in-sync replica receives a fetch request from a consumer, should it return a new type of error like ReplicaNotInSync?
> > >> > > > > >
> > >> > > > > > 3. Could ReplicaSelector be closable?
> > >> > > > > >
> > >> > > > > > 4. Currently, the ISR propagation from the leader to the controller can be delayed up to 60 secs through ReplicaManager.IsrChangePropagationInterval. In that window, the consumer could still be consuming from a non in-sync replica. The relatively large delay is mostly for reducing the ZK writes and the watcher overhead. Not sure what's the best way to address this. We could potentially make this configurable.
> > >> > > > > >
> > >> > > > > > 5. It may be worth mentioning that, to take advantage of affinity, one may also want to have a customized PartitionAssignor to have an affinity aware assignment in addition to a customized ReplicaSelector.
> > >> > > > > >
> > >> > > > > > Thanks,
> > >> > > > > >
> > >> > > > > > Jun
> > >> > > > > >
> > >> > > > > > On Wed, Nov 21, 2018 at 12:54 PM Jason Gustafson <ja...@confluent.io> wrote:
> > >> > > > > >
> > >> > > > > > > Hi All,
> > >> > > > > > >
> > >> > > > > > > I've posted a KIP to add the often-requested support for fetching from followers:
> > >> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica
> > >> > > > > > > Please take a look and let me know what you think.
> > >> > > > > > >
> > >> > > > > > > Thanks,
> > >> > > > > > > Jason
> > >> > > > >
> > >> > > > > --
> > >> > > > > Best,
> > >> > > > > Stanislav
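Finally, a hypothetical view of what opting in could look like from an application once selection moves to the broker, as proposed at the top of this thread: the consumer only advertises its location and everything else stays the same. The configuration key used below is a placeholder for whatever name the KIP finally adopts, and the broker address, topic, group id, and rack value are made up:

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ClosestReplicaConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");
        props.put("group.id", "example-group");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Placeholder config name: tell the brokers where this consumer runs
        // so they can point it at the closest eligible replica.
        props.put("client.rack", "us-east-1a");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("events"));
            // Normal poll loop; replica selection is transparent to the app.
            consumer.poll(Duration.ofSeconds(1))
                    .forEach(r -> System.out.printf("%d: %s%n", r.offset(), r.value()));
        }
    }
}
```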