Hey Jun,

Sorry for the late reply. I have been giving your comments some thought. Replies below:

> 1. The section on handling FETCH_OFFSET_TOO_LARGE error says "Use the
> OffsetForLeaderEpoch API to verify the current position with the leader".
> The OffsetForLeaderEpoch request returns log end offset if the request
> leader epoch is the latest. So, we won't know the true high watermark from
> that request. It seems that the consumer still needs to send ListOffset
> request to the leader to obtain high watermark?

That's a good point. I think we missed this in KIP-320. I've added a replica_id to the OffsetsForLeaderEpoch API to match the Fetch and ListOffsets APIs so that the broker can avoid exposing offsets beyond the high watermark. This also means that the OffsetsForLeaderEpoch API needs the same handling we added to the ListOffsets API to avoid non-monotonic or incorrect responses. Similarly, I've proposed using the OFFSET_NOT_AVAILABLE error code in cases where the end offset of an epoch would exceed the high watermark. When querying the latest epoch, the leader will return OFFSET_NOT_AVAILABLE until the high watermark has reached an offset in the leader's current epoch.

By the way, I've modified the KIP to drop the OFFSET_TOO_LARGE and OFFSET_TOO_SMALL error codes that I initially proposed. I realized that we can continue to use the current OFFSET_OUT_OF_RANGE error and rely on the returned start offset to distinguish the two cases.

> 2. If a non in-sync replica receives a fetch request from a consumer,
> should it return a new type of error like ReplicaNotInSync?

I gave this quite a bit of thought. In general, it is impossible to avoid fetching from out-of-sync replicas because ISR state is propagated with a delay. The high watermark that is returned in fetch responses could be used as a more timely substitute, but we still can't assume that followers will always know when they are in-sync. At a high level, this means the consumer has to take out-of-range errors from followers with a grain of salt anyway. This is only a problem when switching between replicas or when resuming from a committed offset. If a consumer keeps following the same out-of-sync replica, its position will stay in range and, other than some extra latency, no harm will be done.

Furthermore, it may not be a good idea for consumers to chase the ISR too eagerly, since this makes the performance profile harder to predict. The leader itself may be under temporarily increased request load, which is what caused followers to fall behind in the first place. If consumers then switch to the leader after observing that a follower is out-of-sync, they may make the situation worse. Typically, if a follower has fallen out-of-sync, we expect it to catch back up shortly, so it may be better in this scenario to let consumers continue fetching from it. On the other hand, if a follower stays out-of-sync for a while, the consumer should have the choice to find a new replica.

So after thinking about it, I didn't see a lot of benefit in trying to be strict about ISR fetching, and it potentially even has downsides. Instead, I now see it as more of a heuristic which the consumer can use to keep end-to-end latency reasonably bounded. The consumer already has one knob the user can tune in order to limit this bound: the `metadata.max.age.ms` config controls how often metadata is refreshed, so to follow the ISR more closely, the user can refresh metadata more frequently (see the small config sketch after point 3 below). Note that I've improved the section on out of range handling to be more explicit about the cases we need to handle.

> 3. Could ReplicaSelector be closable?

Yes, I made this change.
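
To make the knob from point 2 concrete, here is a minimal sketch of a consumer that refreshes metadata more aggressively so that it tracks ISR changes more closely. The 10-second value and the topic/group names are placeholders for illustration only, not recommendations:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class MetadataRefreshExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            // Default is 5 minutes; a lower value bounds how long the consumer can
            // keep fetching from a replica that has dropped out of the ISR.
            props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "10000");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("example-topic"));
                // poll() as usual; metadata is refreshed in the background
                consumer.poll(Duration.ofSeconds(1));
            }
        }
    }

The tradeoff is straightforward: a lower `metadata.max.age.ms` means more metadata requests, but a tighter bound on how stale the consumer's view of the ISR can be.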
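
While on the topic of the selector, here is roughly the kind of plugin I have in mind, now also closable. To be clear, this is only an illustrative sketch: the interface shape, the method signature, and the `client.rack` config key are placeholders I'm using for the example, not the exact API in the KIP.

    import java.io.Closeable;
    import java.util.List;
    import java.util.Map;
    import org.apache.kafka.common.Configurable;
    import org.apache.kafka.common.Node;
    import org.apache.kafka.common.TopicPartition;

    /** Placeholder shape for the pluggable selector (not the final KIP API). */
    public interface ReplicaSelector extends Configurable, Closeable {
        /** Choose the replica to fetch from, given the leader and the known replicas. */
        Node select(TopicPartition partition, Node leader, List<Node> replicas);

        @Override
        default void close() {
            // no-op by default; implementations may release resources here
        }
    }

    /** Example implementation that prefers a replica in the consumer's own rack. */
    class RackAwareReplicaSelector implements ReplicaSelector {
        private String rack;

        @Override
        public void configure(Map<String, ?> configs) {
            // "client.rack" is a hypothetical config key used only for this sketch
            Object value = configs.get("client.rack");
            this.rack = value != null ? value.toString() : null;
        }

        @Override
        public Node select(TopicPartition partition, Node leader, List<Node> replicas) {
            if (rack == null)
                return leader;
            return replicas.stream()
                    .filter(node -> rack.equals(node.rack()))
                    .findFirst()
                    .orElse(leader); // fall back to the leader if no same-rack replica exists
        }
    }

The Closeable part is the only piece that point 3 actually asks for; the rest is just to show where a rack-aware policy would plug in.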

As an aside, the question of whether we should use a plugin does deserve a bit of discussion. An alternative (suggested by David Arthur) that I've been thinking about is to let the broker select the preferred follower to fetch from using the Metadata API. For example, we could add a `rackId` field to the Metadata API which could be provided through user configuration. The broker could then order the ISR list for each partition so that the preferred follower is returned first (currently the order is random), and the consumer would always fetch from the first replica in the ISR list. The benefit is that the broker may have a better view of the current load characteristics, so it may be able to make better decisions. Client plugins are also much more difficult to control. This may have been the point that Mickael was hinting at above.

> 4. Currently, the ISR propagation from the leader to the controller can be
> delayed up to 60 secs through ReplicaManager.IsrChangePropagationInterval.
> In that window, the consumer could still be consuming from a non in-sync
> replica. The relatively large delay is mostly for reducing the ZK writes
> and the watcher overhead. Not sure what's the best way to address this. We
> could potentially make this configurable.

This is related to the discussion above. We could make it configurable, I guess. I wonder if it would be reasonable to just reduce the default to something like 10 seconds. Do you think we get much benefit from such a long delay?

> 5. It may be worth mentioning that, to take advantage of affinity, one may
> also want to have a customized PartitionAssignor to have an affinity aware
> assignment in addition to a customized ReplicaSelector.

Yes, this is a good point. I was assuming a situation in which each partition has its replicas in the same set of datacenters, but you are right that this need not be the case. I will mention this in the KIP and give it some more thought. I think in the common case these concerns can be treated orthogonally, but it is a bit irritating if you need two separate plugins to make the benefit more general.

Thanks,
Jason

On Tue, Dec 11, 2018 at 11:04 AM Jason Gustafson <ja...@confluent.io> wrote:

> Hi Eno,
>
> Thanks for the clarification. From a high level, the main thing to keep
> in mind is that this is an opt-in feature. It is a bit like using acks=1
> in the sense that a user is accepting slightly weaker guarantees in order
> to optimize for some metric (in this case, read locality). The default
> behavior would read only from the leader and users will get the usual
> semantics. That said, let me address the scenarios you raised:
>
>> - scenario 1: an application that both produces and consumes (e.g., like
>> Kafka streams) produces synchronously a single record to a topic and then
>> attempts to consume that record. Topic is 3-way replicated say. Could it
>> be the case that the produce succeeds but the consume fails? The consume
>> could go to a replica that has not yet fetched the produce record, right?
>> Or is that not possible?
>
> I think it depends on what you mean by "fails." From a replica in the
> ISR's perspective, it has all of the committed data. The only question is
> what is safe to expose since the high watermark is always one round trip
> behind. The proposal is to return a retriable error in this case so that
> the consumer can distinguish the case from an out of range error and
> retry. No error will be returned to the user, but consumption will be
> delayed.
> One of the main improvements in the KIP is ensuring that this delay is
> minimal in the common case.
>
> Note that even without follower fetching, this scenario is unavoidable.
> When a replica becomes a leader, it doesn't know what the latest high
> watermark is until it receives fetches from all followers in the ISR.
> During this window, committed data is temporarily not visible. We handle
> this similarly to what is proposed here. Basically we ask the consumer to
> retry until we know the data is safe.
>
>> - scenario 2: an application C that only consumes. Again say there is
>> only one record produced (by another application P) to a replicated topic
>> and that record has not propagated to all replicas yet (it is only at the
>> leader at time t0). Application C attempts to consume at time t1 and it
>> does so successfully because the consume fetches from the leader. At time
>> t2 the same application seeks to the beginning of the topic and attempts
>> to consume again. Is there a scenario where this second attempt fails
>> because the fetching happens from a replica that does not have the record
>> yet? At time t3 all replicas have the record.
>
> Yes, this is possible in the way that I described above. There is a
> (typically short) window in which committed data may not be visible. It
> is a bit like the partition itself being unavailable temporarily. The
> data has not been lost and is guaranteed to be returned, but the consumer
> has to wait until the follower knows it is safe to return.
>
> One final note: I am iterating on the design a little bit in order to
> address Jun's comments. Expect a few changes. I realized that there is
> some inconsistency with the current fetch behavior and KIP-207. It is
> mainly in regard to how we handle the transition from becoming a follower
> to becoming a leader.
>
> Thanks,
> Jason
>
> On Tue, Dec 11, 2018 at 3:46 AM Eno Thereska <eno.there...@gmail.com>
> wrote:
>
>> Hi Jason,
>>
>> My question was on producer + consumer semantics, not just the producer
>> semantics. I'll rephrase it slightly and split into two questions:
>> - scenario 1: an application that both produces and consumes (e.g., like
>> Kafka streams) produces synchronously a single record to a topic and then
>> attempts to consume that record. Topic is 3-way replicated say. Could it
>> be the case that the produce succeeds but the consume fails? The consume
>> could go to a replica that has not yet fetched the produce record, right?
>> Or is that not possible?
>> - scenario 2: an application C that only consumes. Again say there is
>> only one record produced (by another application P) to a replicated topic
>> and that record has not propagated to all replicas yet (it is only at the
>> leader at time t0). Application C attempts to consume at time t1 and it
>> does so successfully because the consume fetches from the leader. At time
>> t2 the same application seeks to the beginning of the topic and attempts
>> to consume again. Is there a scenario where this second attempt fails
>> because the fetching happens from a replica that does not have the record
>> yet? At time t3 all replicas have the record.
>>
>> Thanks
>> Eno
>>
>> On Mon, Dec 10, 2018 at 7:42 PM Jason Gustafson <ja...@confluent.io>
>> wrote:
>>
>> > Hey Eno,
>> >
>> > Thanks for the comments. However, I'm a bit confused. I'm not
>> > suggesting we change Produce semantics in any way.
>> > All writes still go through the partition leader and nothing changes
>> > with respect to committing to the ISR. The main issue, as I've
>> > mentioned in the KIP, is the increased latency before a committed
>> > offset is exposed on followers.
>> >
>> > Perhaps I have misunderstood your question?
>> >
>> > Thanks,
>> > Jason
>> >
>> > On Mon, Dec 3, 2018 at 9:18 AM Eno Thereska <eno.there...@gmail.com>
>> > wrote:
>> >
>> > > Hi Jason,
>> > >
>> > > This is an interesting KIP. This will have massive implications for
>> > > consistency and serialization, since currently the leader for a
>> > > partition serializes requests. A few questions for now:
>> > >
>> > > - before we deal with the complexity, it'd be great to see a crisp
>> > > example in the motivation as to when this will have the most benefit
>> > > for a customer. In particular, although the customer might have a
>> > > multi-DC deployment, the DCs could still be close by in a region, so
>> > > what is the expected best-case scenario for a performance gain?
>> > > E.g., if all DCs are on the east coast, say. Right now it's not
>> > > clear to me.
>> > > - perhaps performance is not the right metric. Is the metric you are
>> > > optimizing for latency, throughput or cross-DC cost? (I believe it
>> > > is cross-DC cost from the KIP). Just wanted to double-check since
>> > > I'm not sure latency would improve. Throughput could really improve
>> > > from parallelism (especially in cases when there is mostly consuming
>> > > going on). So it could be throughput as well.
>> > > - the proposal would probably lead to choosing a more complex
>> > > consistency. I tend to like the description Doug Terry has in his
>> > > paper "Replicated Data Consistency Explained Through Baseball"
>> > > https://www.microsoft.com/en-us/research/wp-content/uploads/2011/10/ConsistencyAndBaseballReport.pdf
>> > > To start with, could we get in scenarios where a client that has
>> > > both a producer and a consumer (e.g., Kafka streams) produces a
>> > > record, then attempts to consume it back and the consume() comes
>> > > back with "record does not exist"? That's fine, but could complicate
>> > > application handling of such scenarios.
>> > >
>> > > Thanks,
>> > > Eno
>> > >
>> > > On Mon, Dec 3, 2018 at 12:24 PM Mickael Maison
>> > > <mickael.mai...@gmail.com> wrote:
>> > >
>> > > > Hi Jason,
>> > > >
>> > > > Very cool KIP!
>> > > > A couple of questions:
>> > > > - I'm guessing the selector will be invoked after each rebalance,
>> > > > so every time the consumer is assigned a partition it will be able
>> > > > to select it. Is that true?
>> > > >
>> > > > - From the selector API, I'm not sure how the consumer will be
>> > > > able to address some of the choices mentioned in "Finding the
>> > > > preferred follower", especially the available bandwidth and the
>> > > > load balancing. By only having the list of Nodes, a consumer can
>> > > > pick the nearest replica (assuming the rack field means anything
>> > > > to users) or balance its own bandwidth, but that might not
>> > > > necessarily mean improved performance or a balanced load on the
>> > > > brokers.
>> > > >
>> > > > Thanks
>> > > > On Mon, Dec 3, 2018 at 11:35 AM Stanislav Kozlovski
>> > > > <stanis...@confluent.io> wrote:
>> > > > >
>> > > > > Hey Jason,
>> > > > >
>> > > > > This is certainly a very exciting KIP.
>> > > > > I assume that no changes will be made to the offset commits and
>> > > > > they will continue to be sent to the group coordinator?
>> > > > >
>> > > > > I also wanted to address metrics - have we considered any changes
>> > > > > there? I imagine that it would be valuable for users to be able
>> > > > > to differentiate between which consumers' partitions are fetched
>> > > > > from replicas and which aren't. I guess that would need to be
>> > > > > addressed both in the server's fetcher lag metrics and in the
>> > > > > consumers.
>> > > > >
>> > > > > Thanks,
>> > > > > Stanislav
>> > > > >
>> > > > > On Wed, Nov 28, 2018 at 10:08 PM Jun Rao <j...@confluent.io>
>> > > > > wrote:
>> > > > >
>> > > > > > Hi, Jason,
>> > > > > >
>> > > > > > Thanks for the KIP. Looks good overall. A few minor comments
>> > > > > > below.
>> > > > > >
>> > > > > > 1. The section on handling FETCH_OFFSET_TOO_LARGE error says
>> > > > > > "Use the OffsetForLeaderEpoch API to verify the current
>> > > > > > position with the leader". The OffsetForLeaderEpoch request
>> > > > > > returns log end offset if the request leader epoch is the
>> > > > > > latest. So, we won't know the true high watermark from that
>> > > > > > request. It seems that the consumer still needs to send
>> > > > > > ListOffset request to the leader to obtain high watermark?
>> > > > > >
>> > > > > > 2. If a non in-sync replica receives a fetch request from a
>> > > > > > consumer, should it return a new type of error like
>> > > > > > ReplicaNotInSync?
>> > > > > >
>> > > > > > 3. Could ReplicaSelector be closable?
>> > > > > >
>> > > > > > 4. Currently, the ISR propagation from the leader to the
>> > > > > > controller can be delayed up to 60 secs through
>> > > > > > ReplicaManager.IsrChangePropagationInterval. In that window,
>> > > > > > the consumer could still be consuming from a non in-sync
>> > > > > > replica. The relatively large delay is mostly for reducing the
>> > > > > > ZK writes and the watcher overhead. Not sure what's the best
>> > > > > > way to address this. We could potentially make this
>> > > > > > configurable.
>> > > > > >
>> > > > > > 5. It may be worth mentioning that, to take advantage of
>> > > > > > affinity, one may also want to have a customized
>> > > > > > PartitionAssignor to have an affinity aware assignment in
>> > > > > > addition to a customized ReplicaSelector.
>> > > > > >
>> > > > > > Thanks,
>> > > > > >
>> > > > > > Jun
>> > > > > >
>> > > > > > On Wed, Nov 21, 2018 at 12:54 PM Jason Gustafson
>> > > > > > <ja...@confluent.io> wrote:
>> > > > > >
>> > > > > > > Hi All,
>> > > > > > >
>> > > > > > > I've posted a KIP to add the often-requested support for
>> > > > > > > fetching from followers:
>> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica
>> > > > > > > Please take a look and let me know what you think.
>> > > > > > >
>> > > > > > > Thanks,
>> > > > > > > Jason
>> > > > > >
>> > > > >
>> > > > > --
>> > > > > Best,
>> > > > > Stanislav