Hi Ryanne,

Thanks, responses below:
> Thanks Jason, I see that the proposed ReplicaSelector would be where that
> decision is made. But I'm not certain how a consumer triggers this process?
> If a consumer can't reach its preferred rack, how does it ask for a new
> assignment?

As documented in the KIP, the consumer chooses whether or not to accept the preferred replica through the `replica.selection.policy` config. The default behavior is the same as what we have today: the consumer will fetch only from the leader. If the consumer has opted into preferred replica selection, then it will use the replica returned in the Metadata response. If it cannot connect to this replica, it will refresh metadata and try again, just as it does today when connecting only to the leader.

> I suppose a consumer that can't reach its preferred rack would need to ask
> a broker in a second rack. That broker would refer back to the previous
> rack, unless the broker also knows that the preferred rack is unreachable.
> But why would the second broker ever refer the consumer to a third rack? It
> seems that, at that point, the consumer will have already made the decision
> to contact the second rack, based on some logic of its own.

I am not sure what you mean by "preferred rack." I assume you are referring to the preferred replica? As with the current logic, if the consumer cannot make progress with a replica (whether it is the leader or not), it will refresh metadata and try again.

I think there may be some confusion about the problem we are trying to solve. Our interest is in giving operators the ability to save cross-dc costs by leveraging consumer locality. I think you are arguing that follower fetching may be able to solve a separate problem as well, which is routing around network partitions. That is a fair point. Generally, the replica selection heuristics we have in mind would tend to favor more reliable connections in any case. If the consumer is located in the same availability zone as the broker, for example, it is more likely to be able to connect to that broker than to one in another availability zone or region. So with this change we are likely in a much better state than we are today, when the consumer is forced to go wherever the leader is. But I agree there may be room for additional logic on the client side in the future.

> Also, if a broker has decided that a rack is overloaded or unreachable, why
> not just automatically rebalance its leaders to another rack? The effect
> would be much the same, and we already support this.

This is what we already do today, so it seems orthogonal to this KIP. As I said initially, selection logic should take replica availability into account.

> w.r.t. availability, my specific concern is that, if brokers are
> responsible for assigning consumers to racks, this means that brokers must
> decide when a rack is unavailable to a consumer based on some timeout, and
> then further wait for the consumer to request its new assignment. Then,
> after another timeout, the racks would reelect leaders, possibly causing
> some consumers to change racks again. I think this would end up hurting
> consumer HA to some extent.

I am not sure what you mean about racks reelecting leaders; nothing changes with regard to leader election in this KIP. That said, I think I understand the high-level concern, which is that the client has some connection-related information which the broker does not have. The fact is that neither the broker nor the client has complete knowledge. The client doesn't know anything about load characteristics on the brokers, for example, which I would say is at least of equal importance. This KIP biases toward broker-side selection because that makes it easier to coordinate selection logic across the many applications running on a cluster. For example, you get consistent behavior across different client implementations for free. This simplifies provisioning and makes it easier for operators to evolve the selection logic.
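To make the broker-side plugin concrete, here is a rough sketch of what a rack-aware selector might look like. I am using the select(String rackId, PartitionInfo partitionInfo) signature Jun suggested earlier in the thread; the final interface may well differ (for example, it may take a ClientMetadata object and extend Closeable), so treat the names and signatures below as illustrative rather than as the proposed API:

import java.io.Closeable;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;

// Illustrative plugin interface only -- the KIP may end up with a different shape.
interface ReplicaSelector extends Closeable {
    Node select(String rackId, PartitionInfo partitionInfo);
}

// Prefer an in-sync replica in the consumer's rack; otherwise fall back to the leader.
class RackAwareReplicaSelector implements ReplicaSelector {
    @Override
    public Node select(String rackId, PartitionInfo partitionInfo) {
        if (rackId != null) {
            for (Node replica : partitionInfo.inSyncReplicas()) {
                if (rackId.equals(replica.rack()))
                    return replica;
            }
        }
        return partitionInfo.leader();
    }

    @Override
    public void close() {
        // no resources to release in this sketch
    }
}

Whatever heuristic we settle on, the point is that it lives on the broker, so operators can change it in one place rather than in every client application (and every client language).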
I can see that in the future we may also want the client itself to have some ability to use follower fetching to route around network problems. The `replica.selection.policy` config, for example, could provide an additional option to implement the policy you are suggesting, or perhaps even something custom. I will mention this in the KIP and leave it open as a potential future improvement. As it stands, users can still choose to keep the current behavior if they do not want to be bound by the broker's preference.

Thanks,
Jason

On Wed, Mar 20, 2019 at 10:13 AM Ryanne Dolan <ryannedo...@gmail.com> wrote:
> Thanks Jason, I see that the proposed ReplicaSelector would be where that > decision is made. But I'm not certain how a consumer triggers this process? > If a consumer can't reach its preferred rack, how does it ask for a new > assignment? > > I suppose a consumer that can't reach its preferred rack would need to ask > a broker in a second rack. That broker would refer back to the previous > rack, unless the broker also knows that the preferred rack is unreachable. > But why would the second broker ever refer the consumer to a third rack? It > seems that, at that point, the consumer will have already made the decision > to contact the second rack, based on some logic of its own. > > Moreover, the broker actually has less information than the consumer does > w.r.t. latency and reachability. For example, a network might be segmented > such that consumers can only reach certain racks, while brokers can all > reach each other. In that case, the ReplicaSelector would need to be aware > of this network topology in order to determine which racks are preferable. > On the other hand, consumer logic could detect which racks are reachable, > the latency when fetching from them, etc. > > Or a broker might decide its rack's latency is too high and refer consumers > to another rack. But the broker can't predict what the latency from the > consumers to the new rack will be. In order to make such a decision, each > broker would need to know the latency between each consumer and every other > broker. > > Also, if a broker has decided that a rack is overloaded or unreachable, why > not just automatically rebalance its leaders to another rack? The effect > would be much the same, and we already support this. > > w.r.t. availability, my specific concern is that, if brokers are > responsible for assigning consumers to racks, this means that brokers must > decide when a rack is unavailable to a consumer based on some timeout, and > then further wait for the consumer to request its new assignment. Then, > after another timeout, the racks would reelect leaders, possibly causing > some consumers to change racks again. I think this would end up hurting > consumer HA to some extent. > > My suggestion is that we drop the ReplicaSelector concept and instead > support two simple options in the consumer config: LEADER (default) and > RACK. The RACK option would cause the client to only connect to the rack > specified by rack.id.
If an application wants to do something fancy like > automatically failover between racks, the app can just change its rack.id, > perhaps after a short timeout. > > Ryanne > > On Tue, Mar 19, 2019 at 5:55 PM Jason Gustafson <ja...@confluent.io> > wrote: > > > Hi Ryanne, > > > > Thanks for the comment. If I understand your question correctly, I think > > the answer is no. I would expect typical selection logic to consider > > replica availability first before any other factor. In some cases, > however, > > a user may put a higher priority on saving cross-dc traffic costs. If a > > preferred replica is unavailable, they may prefer to wait some time for > it > > to be restored before routing traffic elsewhere. Does that make sense? > > > > Best, > > Jason > > > > On Tue, Mar 19, 2019 at 3:43 PM Ryanne Dolan <ryannedo...@gmail.com> > > wrote: > > > > > Jason, awesome KIP. > > > > > > I'm wondering how this change would affect availability of the cluster > > when > > > a rack is unreachable. Is there a scenario where availability is > improved > > > or impaired due to the proposed changes? > > > > > > Ryanne > > > > > > On Tue, Mar 19, 2019 at 4:32 PM Jason Gustafson <ja...@confluent.io> > > > wrote: > > > > > > > Hi Jun, > > > > > > > > Yes, that makes sense to me. I have added a ClientMetadata class > which > > > > encapsulates various metadata including the rackId and the client > > address > > > > information. > > > > > > > > Thanks, > > > > Jason > > > > > > > > On Tue, Mar 19, 2019 at 2:17 PM Jun Rao <j...@confluent.io> wrote: > > > > > > > > > Hi, Jason, > > > > > > > > > > Thanks for the updated KIP. Just one more comment below. > > > > > > > > > > 100. The ReplicaSelector class has the following method. I am > > wondering > > > > if > > > > > we should additionally pass in the client connection info to the > > > method. > > > > > For example, if rackId is not set, the plugin could potentially > > select > > > > the > > > > > replica based on the IP address of the client. > > > > > > > > > > Node select(String rackId, PartitionInfo partitionInfo) > > > > > > > > > > Jun > > > > > > > > > > > > > > > On Mon, Mar 11, 2019 at 4:24 PM Jason Gustafson < > ja...@confluent.io> > > > > > wrote: > > > > > > > > > > > Hey Everyone, > > > > > > > > > > > > Apologies for the long delay. I am picking this work back up. > > > > > > > > > > > > After giving this some further thought, I decided it makes the > most > > > > sense > > > > > > to move replica selection logic into the broker. It is much more > > > > > difficult > > > > > > to coordinate selection logic in a multi-tenant environment if > > > > operators > > > > > > have to coordinate plugins across all client applications (not to > > > > mention > > > > > > other languages). Take a look at the updates and let me know what > > you > > > > > > think: > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica > > > > > > . > > > > > > > > > > > > Thanks! > > > > > > Jason > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Fri, Jan 11, 2019 at 10:49 AM Jun Rao <j...@confluent.io> > wrote: > > > > > > > > > > > > > Hi, Jason, > > > > > > > > > > > > > > Thanks for the updated KIP. Looks good overall. Just a few > minor > > > > > > comments. > > > > > > > > > > > > > > 20. 
For case 2, if the consumer receives an > > OFFSET_NOT_AVAILABLE, I > > > > am > > > > > > > wondering if the consumer should refresh the metadata before > > > > retrying. > > > > > > This > > > > > > > can allow the consumer to switch to an in-sync replica sooner. > > > > > > > > > > > > > > 21. Under "protocol changes", there is a sentence "This allows > > the > > > > > > broker " > > > > > > > that seems broken. > > > > > > > > > > > > > > 4. About reducing the ISR propagation delay from the broker to > > the > > > > > > > controller. Jiangjie made that change in KAFKA-2722. Jiangjie, > > > could > > > > > you > > > > > > > comment on whether it's reasonable to reduce the propagation > > delay > > > > now? > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > > > > Jun > > > > > > > > > > > > > > On Wed, Jan 2, 2019 at 11:06 AM Jason Gustafson < > > > ja...@confluent.io> > > > > > > > wrote: > > > > > > > > > > > > > > > Hey Jun, > > > > > > > > > > > > > > > > Sorry for the late reply. I have been giving your comments > some > > > > > > thought. > > > > > > > > Replies below: > > > > > > > > > > > > > > > > 1. The section on handling FETCH_OFFSET_TOO_LARGE error says > > "Use > > > > the > > > > > > > > > OffsetForLeaderEpoch API to verify the current position > with > > > the > > > > > > > leader". > > > > > > > > > The OffsetForLeaderEpoch request returns log end offset if > > the > > > > > > request > > > > > > > > > leader epoch is the latest. So, we won't know the true high > > > > > watermark > > > > > > > > from > > > > > > > > > that request. It seems that the consumer still needs to > send > > > > > > ListOffset > > > > > > > > > request to the leader to obtain high watermark? > > > > > > > > > > > > > > > > > > > > > > > > That's a good point. I think we missed this in KIP-320. I've > > > added > > > > a > > > > > > > > replica_id to the OffsetsForLeaderEpoch API to match the > Fetch > > > and > > > > > > > > ListOffsets API so that the broker can avoid exposing offsets > > > > beyond > > > > > > the > > > > > > > > high watermark. This also means that the > OffsetsForLeaderEpoch > > > API > > > > > > needs > > > > > > > > the same handling we added to the ListOffsets API to avoid > > > > > > non-monotonic > > > > > > > or > > > > > > > > incorrect responses. Similarly, I've proposed using the > > > > > > > > OFFSET_NOT_AVAILABLE error code in cases where the end offset > > of > > > an > > > > > > epoch > > > > > > > > would exceed the high watermark. When querying the latest > > epoch, > > > > the > > > > > > > leader > > > > > > > > will return OFFSET_NOT_AVAILABLE until the high watermark has > > > > reached > > > > > > an > > > > > > > > offset in the leader's current epoch. > > > > > > > > > > > > > > > > By the way, I've modified the KIP to drop the > OFFSET_TOO_LARGE > > > and > > > > > > > > OFFSET_TOO_SMALL error codes that I initially proposed. I > > > realized > > > > > that > > > > > > > we > > > > > > > > could continue to use the current OFFSET_OUT_OF_RANGE error > and > > > > rely > > > > > on > > > > > > > the > > > > > > > > returned start offset to distinguish the two cases. > > > > > > > > > > > > > > > > 2. If a non in-sync replica receives a fetch request from a > > > > consumer, > > > > > > > > > should it return a new type of error like ReplicaNotInSync? > > > > > > > > > > > > > > > > > > > > > > > > I gave this quite a bit of thought. 
It is impossible to avoid > > > > > fetching > > > > > > > from > > > > > > > > out-of-sync replicas in general due to propagation of the ISR > > > > state. > > > > > > The > > > > > > > > high watermark that is returned in fetch responses could be > > used > > > > as a > > > > > > > more > > > > > > > > timely substitute, but we still can't assume that followers > > will > > > > > always > > > > > > > > know when they are in-sync. From a high level, this means > that > > > the > > > > > > > consumer > > > > > > > > anyway has to take out of range errors with a grain of salt > if > > > they > > > > > > come > > > > > > > > from followers. This is only a problem when switching between > > > > > replicas > > > > > > or > > > > > > > > if resuming from a committed offset. If a consumer is > following > > > the > > > > > > same > > > > > > > > out-of-sync replica, then its position will stay in range > and, > > > > other > > > > > > than > > > > > > > > some extra latency, no harm will be done. > > > > > > > > > > > > > > > > Furthermore, it may not be a good idea for consumers to chase > > the > > > > ISR > > > > > > too > > > > > > > > eagerly since this makes the performance profile harder to > > > predict. > > > > > The > > > > > > > > leader itself may have some temporarily increased request > load > > > > which > > > > > is > > > > > > > > causing followers to fall behind. If consumers then switched > to > > > the > > > > > > > leader > > > > > > > > after they observed that the follower was out-of-sync, it may > > > make > > > > > the > > > > > > > > situation worse. Typically, If a follower has fallen > > out-of-sync, > > > > we > > > > > > > expect > > > > > > > > it to catch back up shortly. It may be better in this > scenario > > to > > > > > allow > > > > > > > > consumers to continue fetching from it. On the other hand, > if a > > > > > > follower > > > > > > > > stays out-of-sync for a while, the consumer should have the > > > choice > > > > to > > > > > > > find > > > > > > > > a new replica. > > > > > > > > > > > > > > > > So after thinking about it, I didn't see a lot of benefit in > > > trying > > > > > to > > > > > > be > > > > > > > > strict about ISR fetching. Potentially it even has downsides. > > > > > Instead, > > > > > > I > > > > > > > > now see it as more of a heuristic which the consumer can use > to > > > > keep > > > > > > > > end-to-end latency reasonably bounded. The consumer already > has > > > one > > > > > > knob > > > > > > > > the user can tune in order to limit this bound. The ` > > > > > > metadata.max.age.ms > > > > > > > ` > > > > > > > > config controls how often metadata is refreshed. To follow > the > > > ISR > > > > > more > > > > > > > > closely, the user can refresh metadata more frequently. > > > > > > > > > > > > > > > > Note that I've improved the section on out of range handling > to > > > be > > > > > more > > > > > > > > explicit about the cases we needed to handle. > > > > > > > > > > > > > > > > 3. Could ReplicaSelector be closable? > > > > > > > > > > > > > > > > > > > > > > > > Yes, I made this change. As an aside, the question of whether > > we > > > > > should > > > > > > > use > > > > > > > > a plugin does deserve a bit of discussion. An alternative > > > > (suggested > > > > > by > > > > > > > > David Arthur) that I've been thinking about is to let the > > broker > > > > > select > > > > > > > the > > > > > > > > preferred follower to fetch from using the Metadata API. 
For > > > > example, > > > > > > we > > > > > > > > could add a `rackId` field to the Metadata API which could be > > > > > provided > > > > > > > > through user configuration. The broker could then order the > ISR > > > > list > > > > > > for > > > > > > > > each partition so that the preferred follower is returned > first > > > > > > > (currently > > > > > > > > the order is random). The consumer could then always fetch > from > > > the > > > > > > first > > > > > > > > replica in the ISR list. The benefit is that the broker may > > have > > > a > > > > > > better > > > > > > > > view of the current load characteristics, so it may be able > to > > > make > > > > > > > better > > > > > > > > decisions. Client plugins are also much more difficult to > > > control. > > > > > This > > > > > > > may > > > > > > > > have been the point that Mickael was hinting at above. > > > > > > > > > > > > > > > > 4. Currently, the ISR propagation from the leader to the > > > controller > > > > > can > > > > > > > be > > > > > > > > > delayed up to 60 secs through > > > > > > > > ReplicaManager.IsrChangePropagationInterval. > > > > > > > > > In that window, the consumer could still be consuming from > a > > > non > > > > > > > in-sync > > > > > > > > > replica. The relatively large delay is mostly for reducing > > the > > > ZK > > > > > > > writes > > > > > > > > > and the watcher overhead. Not sure what's the best way to > > > address > > > > > > this. > > > > > > > > We > > > > > > > > > could potentially make this configurable. > > > > > > > > > > > > > > > > > > > > > > > > This is related to the discussion above. We could make it > > > > > configurable > > > > > > I > > > > > > > > guess. I wonder if it would be reasonable to just reduce the > > > > default > > > > > to > > > > > > > > something like 10 seconds. Do you think we get much benefit > > from > > > > > such a > > > > > > > > long delay? > > > > > > > > > > > > > > > > 5. It may be worth mentioning that, to take advantage of > > > affinity, > > > > > one > > > > > > > may > > > > > > > > > also want to have a customized PartitionAssignor to have an > > > > > affinity > > > > > > > > aware > > > > > > > > > assignment in addition to a customized ReplicaSelector. > > > > > > > > > > > > > > > > > > > > > > > > Yes, this is a good point. I was assuming a situation in > which > > > each > > > > > > > > partition had its replicas in all the same datacenters, but > you > > > are > > > > > > right > > > > > > > > that this need not be the case. I will mention this in the > KIP > > > and > > > > > give > > > > > > > it > > > > > > > > some more thought. I think in the common case, these concerns > > can > > > > be > > > > > > > > treated orthogonally, but it is a bit irritating if you need > > two > > > > > > separate > > > > > > > > plugins to make the benefit more general. > > > > > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > Jason > > > > > > > > > > > > > > > > On Tue, Dec 11, 2018 at 11:04 AM Jason Gustafson < > > > > ja...@confluent.io > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > Hi Eno, > > > > > > > > > > > > > > > > > > Thanks for the clarification. From a high level, the main > > thing > > > > to > > > > > > keep > > > > > > > > in > > > > > > > > > mind is that this is an opt-in feature. 
It is a bit like > > using > > > > > acks=1 > > > > > > > in > > > > > > > > > the sense that a user is accepting slightly weaker > guarantees > > > in > > > > > > order > > > > > > > to > > > > > > > > > optimize for some metric (in this case, read locality). The > > > > default > > > > > > > > > behavior would read only from the leader and users will get > > the > > > > > usual > > > > > > > > > semantics. That said, let me address the scenarios you > > raised: > > > > > > > > > > > > > > > > > > - scenario 1: an application that both produces and > consumes > > > > (e.g., > > > > > > > like > > > > > > > > >> Kafka streams) produces synchronously a single record to a > > > topic > > > > > and > > > > > > > > then > > > > > > > > >> attempts to consume that record. Topic is 3-way replicated > > > say. > > > > > > Could > > > > > > > it > > > > > > > > >> be > > > > > > > > >> the case that the produce succeeds but the consume fails? > > The > > > > > > consume > > > > > > > > >> could > > > > > > > > >> go to a replica that has not yet fetched the produce > record, > > > > > right? > > > > > > Or > > > > > > > > is > > > > > > > > >> that not possible? > > > > > > > > > > > > > > > > > > > > > > > > > > > I think it depends on what you mean by "fails." From a > > replica > > > in > > > > > the > > > > > > > > > ISR's perspective, it has all of the committed data. The > only > > > > > > question > > > > > > > is > > > > > > > > > what is safe to expose since the high watermark is always > one > > > > round > > > > > > > trip > > > > > > > > > behind. The proposal is to return a retriable error in this > > > case > > > > so > > > > > > > that > > > > > > > > > the consumer can distinguish the case from an out of range > > > error > > > > > and > > > > > > > > retry. > > > > > > > > > No error will be returned to the user, but consumption will > > be > > > > > > delayed. > > > > > > > > One > > > > > > > > > of the main improvements in the KIP is ensuring that this > > delay > > > > is > > > > > > > > minimal > > > > > > > > > in the common case. > > > > > > > > > > > > > > > > > > Note that even without follower fetching, this scenario is > > > > > > unavoidable. > > > > > > > > > When a replica becomes a leader, it doesn't know what the > > > latest > > > > > high > > > > > > > > > watermark is until it receives fetches from all followers > in > > > the > > > > > ISR. > > > > > > > > > During this window, committed data is temporarily not > > visible. > > > We > > > > > > > handle > > > > > > > > > this similarly to what is proposed here. Basically we ask > the > > > > > > consumer > > > > > > > to > > > > > > > > > retry until we know the data is safe. > > > > > > > > > > > > > > > > > > - scenario 2: an application C that only consumes. Again > say > > > > there > > > > > is > > > > > > > > only > > > > > > > > >> one record produced (by another application P) to a > > replicated > > > > > topic > > > > > > > and > > > > > > > > >> that record has not propagated to all replicas yet (it is > > only > > > > at > > > > > > the > > > > > > > > >> leader at time t0). Application C attempts to consume at > > time > > > t1 > > > > > and > > > > > > > it > > > > > > > > >> does so successfully because the consume fetches from the > > > > leader. > > > > > At > > > > > > > > time > > > > > > > > >> t2 the same application seeks to the beginning of the > topic > > > and > > > > > > > attempts > > > > > > > > >> to > > > > > > > > >> consume again. 
Is there a scenario where this second > attempt > > > > fails > > > > > > > > because > > > > > > > > >> the fetching happens from a replica that does not have the > > > > record > > > > > > yet? > > > > > > > > At > > > > > > > > >> time t3 all replicas have the record. > > > > > > > > > > > > > > > > > > > > > > > > > > > Yes, this is possible in the way that I described above. > > There > > > > is a > > > > > > > > > (typically short) window in which committed data may not be > > > > > visible. > > > > > > It > > > > > > > > is > > > > > > > > > a bit like the partition itself being unavailable > > temporarily. > > > > The > > > > > > data > > > > > > > > has > > > > > > > > > not been lost and is guaranteed to be returned, but the > > > consumer > > > > > has > > > > > > to > > > > > > > > > wait until the follower knows it is safe to return. > > > > > > > > > > > > > > > > > > One final note: I am iterating on the design a little bit > in > > > > order > > > > > to > > > > > > > > > address Jun's comments. Expect a few changes. I realized > that > > > > there > > > > > > is > > > > > > > > some > > > > > > > > > inconsistency with the current fetch behavior and KIP-207. > It > > > is > > > > > > mainly > > > > > > > > in > > > > > > > > > regard to how we handle the transition from becoming a > > follower > > > > to > > > > > > > > becoming > > > > > > > > > a leader. > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > Jason > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Tue, Dec 11, 2018 at 3:46 AM Eno Thereska < > > > > > eno.there...@gmail.com > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > >> Hi Jason, > > > > > > > > >> > > > > > > > > >> My question was on producer + consumer semantics, not just > > the > > > > > > > producer > > > > > > > > >> semantics. I'll rephrase it slightly and split into two > > > > questions: > > > > > > > > >> - scenario 1: an application that both produces and > consumes > > > > > (e.g., > > > > > > > like > > > > > > > > >> Kafka streams) produces synchronously a single record to a > > > topic > > > > > and > > > > > > > > then > > > > > > > > >> attempts to consume that record. Topic is 3-way replicated > > > say. > > > > > > Could > > > > > > > it > > > > > > > > >> be > > > > > > > > >> the case that the produce succeeds but the consume fails? > > The > > > > > > consume > > > > > > > > >> could > > > > > > > > >> go to a replica that has not yet fetched the produce > record, > > > > > right? > > > > > > Or > > > > > > > > is > > > > > > > > >> that not possible? > > > > > > > > >> - scenario 2: an application C that only consumes. Again > say > > > > there > > > > > > is > > > > > > > > only > > > > > > > > >> one record produced (by another application P) to a > > replicated > > > > > topic > > > > > > > and > > > > > > > > >> that record has not propagated to all replicas yet (it is > > only > > > > at > > > > > > the > > > > > > > > >> leader at time t0). Application C attempts to consume at > > time > > > t1 > > > > > and > > > > > > > it > > > > > > > > >> does so successfully because the consume fetches from the > > > > leader. > > > > > At > > > > > > > > time > > > > > > > > >> t2 the same application seeks to the beginning of the > topic > > > and > > > > > > > attempts > > > > > > > > >> to > > > > > > > > >> consume again. 
Is there a scenario where this second > attempt > > > > fails > > > > > > > > because > > > > > > > > >> the fetching happens from a replica that does not have the > > > > record > > > > > > yet? > > > > > > > > At > > > > > > > > >> time t3 all replicas have the record. > > > > > > > > >> > > > > > > > > >> Thanks > > > > > > > > >> Eno > > > > > > > > >> > > > > > > > > >> > > > > > > > > >> > > > > > > > > >> > > > > > > > > >> On Mon, Dec 10, 2018 at 7:42 PM Jason Gustafson < > > > > > ja...@confluent.io > > > > > > > > > > > > > > > >> wrote: > > > > > > > > >> > > > > > > > > >> > Hey Eno, > > > > > > > > >> > > > > > > > > > >> > Thanks for the comments. However, I'm a bit confused. > I'm > > > not > > > > > > > > >> suggesting we > > > > > > > > >> > change Produce semantics in any way. All writes still go > > > > through > > > > > > the > > > > > > > > >> > partition leader and nothing changes with respect to > > > > committing > > > > > to > > > > > > > the > > > > > > > > >> ISR. > > > > > > > > >> > The main issue, as I've mentioned in the KIP, is the > > > increased > > > > > > > latency > > > > > > > > >> > before a committed offset is exposed on followers. > > > > > > > > >> > > > > > > > > > >> > Perhaps I have misunderstood your question? > > > > > > > > >> > > > > > > > > > >> > Thanks, > > > > > > > > >> > Jason > > > > > > > > >> > > > > > > > > > >> > On Mon, Dec 3, 2018 at 9:18 AM Eno Thereska < > > > > > > eno.there...@gmail.com > > > > > > > > > > > > > > > > >> > wrote: > > > > > > > > >> > > > > > > > > > >> > > Hi Jason, > > > > > > > > >> > > > > > > > > > > >> > > This is an interesting KIP. This will have massive > > > > > implications > > > > > > > for > > > > > > > > >> > > consistency and serialization, since currently the > > leader > > > > for > > > > > a > > > > > > > > >> partition > > > > > > > > >> > > serializes requests. A few questions for now: > > > > > > > > >> > > > > > > > > > > >> > > - before we deal with the complexity, it'd be great to > > > see a > > > > > > crisp > > > > > > > > >> > example > > > > > > > > >> > > in the motivation as to when this will have the most > > > benefit > > > > > > for a > > > > > > > > >> > > customer. In particular, although the customer might > > have > > > a > > > > > > > multi-DC > > > > > > > > >> > > deployment, the DCs could still be close by in a > region, > > > so > > > > > what > > > > > > > is > > > > > > > > >> the > > > > > > > > >> > > expected best-case scenario for a performance gain? > > E.g., > > > if > > > > > all > > > > > > > DCs > > > > > > > > >> are > > > > > > > > >> > on > > > > > > > > >> > > the east-cost, say. Right now it's not clear to me. > > > > > > > > >> > > - perhaps performance is not the right metric. Is the > > > metric > > > > > you > > > > > > > are > > > > > > > > >> > > optimizing for latency, throughput or cross-DC cost? > (I > > > > > believe > > > > > > it > > > > > > > > is > > > > > > > > >> > > cross-DC cost from the KIP). Just wanted to > double-check > > > > since > > > > > > I'm > > > > > > > > not > > > > > > > > >> > sure > > > > > > > > >> > > latency would improve. Throughput could really improve > > > from > > > > > > > > >> parallelism > > > > > > > > >> > > (especially in cases when there is mostly consuming > > going > > > > on). > > > > > > So > > > > > > > it > > > > > > > > >> > could > > > > > > > > >> > > be throughput as well. 
> > > > > > > > >> > > - the proposal would probably lead to choosing a more > > > > complex > > > > > > > > >> > consistency. > > > > > > > > >> > > I tend to like the description Doug Terry has in his > > paper > > > > > > > > "Replicated > > > > > > > > >> > Data > > > > > > > > >> > > Consistency Explained Through Baseball" > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://www.microsoft.com/en-us/research/wp-content/uploads/2011/10/ConsistencyAndBaseballReport.pdf > > > > > > > > >> > > . > > > > > > > > >> > > To start with, could we get in scenarios where a > client > > > that > > > > > has > > > > > > > > both > > > > > > > > >> a > > > > > > > > >> > > producer and a consumer (e.g., Kafka streams) > produces a > > > > > record, > > > > > > > > then > > > > > > > > >> > > attempts to consume it back and the consume() comes > back > > > > with > > > > > > > > "record > > > > > > > > >> > does > > > > > > > > >> > > not exist"? That's fine, but could complicate > > application > > > > > > handling > > > > > > > > of > > > > > > > > >> > such > > > > > > > > >> > > scenarios. > > > > > > > > >> > > > > > > > > > > >> > > Thanks, > > > > > > > > >> > > Eno > > > > > > > > >> > > > > > > > > > > >> > > On Mon, Dec 3, 2018 at 12:24 PM Mickael Maison < > > > > > > > > >> mickael.mai...@gmail.com > > > > > > > > >> > > > > > > > > > > >> > > wrote: > > > > > > > > >> > > > > > > > > > > >> > > > Hi Jason, > > > > > > > > >> > > > > > > > > > > > >> > > > Very cool KIP! > > > > > > > > >> > > > A couple of questions: > > > > > > > > >> > > > - I'm guessing the selector will be invoke after > each > > > > > > rebalance > > > > > > > so > > > > > > > > >> > > > every time the consumer is assigned a partition it > > will > > > be > > > > > > able > > > > > > > to > > > > > > > > >> > > > select it. Is that true? > > > > > > > > >> > > > > > > > > > > > >> > > > - From the selector API, I'm not sure how the > consumer > > > > will > > > > > be > > > > > > > > able > > > > > > > > >> to > > > > > > > > >> > > > address some of the choices mentioned in "Finding > the > > > > > > preferred > > > > > > > > >> > > > follower". Especially the available bandwidth and > the > > > load > > > > > > > > >> balancing. > > > > > > > > >> > > > By only having the list of Nodes, a consumer can > pick > > > the > > > > > > > nereast > > > > > > > > >> > > > replica (assuming the rack field means anything to > > > users) > > > > or > > > > > > > > balance > > > > > > > > >> > > > its own bandwidth but that might not necessarily > mean > > > > > improved > > > > > > > > >> > > > performance or a balanced load on the brokers. > > > > > > > > >> > > > > > > > > > > > >> > > > Thanks > > > > > > > > >> > > > On Mon, Dec 3, 2018 at 11:35 AM Stanislav Kozlovski > > > > > > > > >> > > > <stanis...@confluent.io> wrote: > > > > > > > > >> > > > > > > > > > > > > >> > > > > Hey Jason, > > > > > > > > >> > > > > > > > > > > > > >> > > > > This is certainly a very exciting KIP. > > > > > > > > >> > > > > I assume that no changes will be made to the > offset > > > > > commits > > > > > > > and > > > > > > > > >> they > > > > > > > > >> > > will > > > > > > > > >> > > > > continue to be sent to the group coordinator? > > > > > > > > >> > > > > > > > > > > > > >> > > > > I also wanted to address metrics - have we > > considered > > > > any > > > > > > > > changes > > > > > > > > >> > > there? 
> > > > > > > > >> > > > I > > > > > > > > >> > > > > imagine that it would be valuable for users to be > > able > > > > to > > > > > > > > >> > differentiate > > > > > > > > >> > > > > between which consumers' partitions are fetched > from > > > > > > replicas > > > > > > > > and > > > > > > > > >> > which > > > > > > > > >> > > > > aren't. I guess that would need to be addressed > both > > > in > > > > > the > > > > > > > > >> server's > > > > > > > > >> > > > > fetcher lag metrics and in the consumers. > > > > > > > > >> > > > > > > > > > > > > >> > > > > Thanks, > > > > > > > > >> > > > > Stanislav > > > > > > > > >> > > > > > > > > > > > > >> > > > > On Wed, Nov 28, 2018 at 10:08 PM Jun Rao < > > > > > j...@confluent.io> > > > > > > > > >> wrote: > > > > > > > > >> > > > > > > > > > > > > >> > > > > > Hi, Jason, > > > > > > > > >> > > > > > > > > > > > > > >> > > > > > Thanks for the KIP. Looks good overall. A few > > minor > > > > > > comments > > > > > > > > >> below. > > > > > > > > >> > > > > > > > > > > > > > >> > > > > > 1. The section on handling > FETCH_OFFSET_TOO_LARGE > > > > error > > > > > > says > > > > > > > > >> "Use > > > > > > > > >> > the > > > > > > > > >> > > > > > OffsetForLeaderEpoch API to verify the current > > > > position > > > > > > with > > > > > > > > the > > > > > > > > >> > > > leader". > > > > > > > > >> > > > > > The OffsetForLeaderEpoch request returns log end > > > > offset > > > > > if > > > > > > > the > > > > > > > > >> > > request > > > > > > > > >> > > > > > leader epoch is the latest. So, we won't know > the > > > true > > > > > > high > > > > > > > > >> > watermark > > > > > > > > >> > > > from > > > > > > > > >> > > > > > that request. It seems that the consumer still > > needs > > > > to > > > > > > send > > > > > > > > >> > > ListOffset > > > > > > > > >> > > > > > request to the leader to obtain high watermark? > > > > > > > > >> > > > > > > > > > > > > > >> > > > > > 2. If a non in-sync replica receives a fetch > > request > > > > > from > > > > > > a > > > > > > > > >> > consumer, > > > > > > > > >> > > > > > should it return a new type of error like > > > > > > ReplicaNotInSync? > > > > > > > > >> > > > > > > > > > > > > > >> > > > > > 3. Could ReplicaSelector be closable? > > > > > > > > >> > > > > > > > > > > > > > >> > > > > > 4. Currently, the ISR propagation from the > leader > > to > > > > the > > > > > > > > >> controller > > > > > > > > >> > > > can be > > > > > > > > >> > > > > > delayed up to 60 secs through > > > > > > > > >> > > > ReplicaManager.IsrChangePropagationInterval. > > > > > > > > >> > > > > > In that window, the consumer could still be > > > consuming > > > > > > from a > > > > > > > > non > > > > > > > > >> > > > in-sync > > > > > > > > >> > > > > > replica. The relatively large delay is mostly > for > > > > > reducing > > > > > > > the > > > > > > > > >> ZK > > > > > > > > >> > > > writes > > > > > > > > >> > > > > > and the watcher overhead. Not sure what's the > best > > > way > > > > > to > > > > > > > > >> address > > > > > > > > >> > > > this. We > > > > > > > > >> > > > > > could potentially make this configurable. > > > > > > > > >> > > > > > > > > > > > > > >> > > > > > 5. 
It may be worth mentioning that, to take > > > advantage > > > > of > > > > > > > > >> affinity, > > > > > > > > >> > > one > > > > > > > > >> > > > may > > > > > > > > >> > > > > > also want to have a customized PartitionAssignor > > to > > > > have > > > > > > an > > > > > > > > >> > affinity > > > > > > > > >> > > > aware > > > > > > > > >> > > > > > assignment in addition to a customized > > > > ReplicaSelector. > > > > > > > > >> > > > > > > > > > > > > > >> > > > > > Thanks, > > > > > > > > >> > > > > > > > > > > > > > >> > > > > > Jun > > > > > > > > >> > > > > > > > > > > > > > >> > > > > > On Wed, Nov 21, 2018 at 12:54 PM Jason > Gustafson < > > > > > > > > >> > ja...@confluent.io > > > > > > > > >> > > > > > > > > > > > >> > > > > > wrote: > > > > > > > > >> > > > > > > > > > > > > > >> > > > > > > Hi All, > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > I've posted a KIP to add the often-requested > > > support > > > > > for > > > > > > > > >> fetching > > > > > > > > >> > > > from > > > > > > > > >> > > > > > > followers: > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica > > > > > > > > >> > > > > > > . > > > > > > > > >> > > > > > > Please take a look and let me know what you > > think. > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > Thanks, > > > > > > > > >> > > > > > > Jason > > > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > >> > > > > > > > > > > > > >> > > > > > > > > > > > > >> > > > > -- > > > > > > > > >> > > > > Best, > > > > > > > > >> > > > > Stanislav > > > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >