Thanks Jason, that helps. I agree my concerns are orthogonal to reducing cross-DC transfer costs -- I'm only interested in how this affects what happens when a rack is unavailable, since, as the name implies, the whole point of stretching a cluster across availability zones is for increased availability. But it sounds like nothing is changing in that regard, so carry on :)
Ryanne

On Wed, Mar 20, 2019 at 6:41 PM Jason Gustafson <ja...@confluent.io> wrote:

> Hi Ryanne,
>
> Thanks, responses below:
>
> > Thanks Jason, I see that the proposed ReplicaSelector would be where that decision is made. But I'm not certain how a consumer triggers this process? If a consumer can't reach its preferred rack, how does it ask for a new assignment?
>
> As documented in the KIP, the consumer chooses whether or not to accept the preferred replica through the `replica.selection.policy`. The default behavior is the same as what we have today: the consumer will fetch only from the leader. If the consumer has opted into preferred replica selection, then it will use the replica returned in the Metadata response. If it cannot connect to this replica, it will refresh metadata and try again. It is the same as when connecting to leaders only.
>
> > I suppose a consumer that can't reach its preferred rack would need to ask a broker in a second rack. That broker would refer back to the previous rack, unless the broker also knows that the preferred rack is unreachable. But why would the second broker ever refer the consumer to a third rack? It seems that, at that point, the consumer will have already made the decision to contact the second rack, based on some logic of its own.
>
> I am not sure what you mean about "preferred rack." I assume you are referring to the preferred replica? As with current logic, if the consumer cannot make progress with a replica (whether it is the leader or not), it will refresh metadata and try again.
>
> I think there may be some confusion about the problem we are trying to solve. Our interest is giving operators the ability to save cross-DC costs by leveraging consumer locality. I think you are arguing that follower fetching may be able to solve a separate problem as well, which is routing around network partitions. I think this is a fair point. Generally, the replica selection heuristics would tend to favor more reliable connections in any case. If the consumer is located in the same availability zone as the broker, for example, it is more likely to be able to connect to that broker than to one in another availability zone or region. So with this, we are likely in a much better state than we were before, when the consumer was forced to go wherever the leader is. But I think it's fair that there may be room for additional logic in the future on the client side.
>
> > Also, if a broker has decided that a rack is overloaded or unreachable, why not just automatically rebalance its leaders to another rack? The effect would be much the same, and we already support this.
>
> This is what we already do today. This seems orthogonal? As I said initially, selection logic should take replica availability into account.
>
> > w.r.t. availability, my specific concern is that, if brokers are responsible for assigning consumers to racks, this means that brokers must decide when a rack is unavailable to a consumer based on some timeout, and then further wait for the consumer to request its new assignment. Then, after another timeout, the racks would reelect leaders, possibly causing some consumers to change racks again. I think this would end up hurting consumer HA to some extent.
>
> I am not sure what you mean about racks reelecting leaders. Nothing changes with regard to leader election in this KIP.
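
[For illustration: a minimal sketch of the opt-in Jason describes above, using the config names as they appear in this thread (`replica.selection.policy`, `rack.id`). These were proposals under discussion at the time, so the final key names and values in KIP-392 may differ.]

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    Properties props = new Properties();
    props.put("bootstrap.servers", "broker1:9092");
    props.put("group.id", "my-group");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    // Tell the brokers where this consumer runs, so a nearby replica can be chosen.
    props.put("rack.id", "us-east-1a");
    // Opt in to fetching from the preferred replica returned in the Metadata
    // response; the default keeps today's fetch-from-leader behavior.
    props.put("replica.selection.policy", "preferred");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
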
> That said, I think I understand the high-level concern, which is that the client has some connection-related information which the broker does not have. I think the fact is that neither the broker nor the client has complete knowledge. The client doesn't know anything about load characteristics on the brokers, for example, which I would say is at least of equal importance. This KIP biases broker-side selection because that makes it easier to coordinate selection logic across the many applications running on a cluster. For example, you easily get consistent behavior across different client implementations. This simplifies provisioning and makes it easier for operators to evolve the selection logic.
>
> I think I can see in the future that we may also want to have some ability to use follower fetching to route around network problems in the client itself. The `replica.selection.policy` config, for example, could provide an additional option to implement the policy you are suggesting, or perhaps even something custom. I will mention this in the KIP and leave this open as a potential future improvement. As it stands, users can still choose to keep the current behavior if they do not want to be bound by the broker's preference.
>
> Thanks,
> Jason
>
> On Wed, Mar 20, 2019 at 10:13 AM Ryanne Dolan <ryannedo...@gmail.com> wrote:
>
> > Thanks Jason, I see that the proposed ReplicaSelector would be where that decision is made. But I'm not certain how a consumer triggers this process? If a consumer can't reach its preferred rack, how does it ask for a new assignment?
> >
> > I suppose a consumer that can't reach its preferred rack would need to ask a broker in a second rack. That broker would refer back to the previous rack, unless the broker also knows that the preferred rack is unreachable. But why would the second broker ever refer the consumer to a third rack? It seems that, at that point, the consumer will have already made the decision to contact the second rack, based on some logic of its own.
> >
> > Moreover, the broker actually has less information than the consumer does w.r.t. latency and reachability. For example, a network might be segmented such that consumers can only reach certain racks, while brokers can all reach each other. In that case, the ReplicaSelector would need to be aware of this network topology in order to determine which racks are preferable. On the other hand, consumer logic could detect which racks are reachable, the latency when fetching from them, etc.
> >
> > Or a broker might decide its rack's latency is too high and refer consumers to another rack. But the broker can't predict what the latency from the consumers to the new rack will be. In order to make such a decision, each broker would need to know the latency between each consumer and every other broker.
> >
> > Also, if a broker has decided that a rack is overloaded or unreachable, why not just automatically rebalance its leaders to another rack? The effect would be much the same, and we already support this.
> >
> > w.r.t. availability, my specific concern is that, if brokers are responsible for assigning consumers to racks, this means that brokers must decide when a rack is unavailable to a consumer based on some timeout, and then further wait for the consumer to request its new assignment.
> > Then, after another timeout, the racks would reelect leaders, possibly causing some consumers to change racks again. I think this would end up hurting consumer HA to some extent.
> >
> > My suggestion is that we drop the ReplicaSelector concept and instead support two simple options in the consumer config: LEADER (default) and RACK. The RACK option would cause the client to only connect to the rack specified by rack.id. If an application wants to do something fancy like automatically fail over between racks, the app can just change its rack.id, perhaps after a short timeout.
> >
> > Ryanne
> >
> > On Tue, Mar 19, 2019 at 5:55 PM Jason Gustafson <ja...@confluent.io> wrote:
> >
> > > Hi Ryanne,
> > >
> > > Thanks for the comment. If I understand your question correctly, I think the answer is no. I would expect typical selection logic to consider replica availability first before any other factor. In some cases, however, a user may put a higher priority on saving cross-DC traffic costs. If a preferred replica is unavailable, they may prefer to wait some time for it to be restored before routing traffic elsewhere. Does that make sense?
> > >
> > > Best,
> > > Jason
> > >
> > > On Tue, Mar 19, 2019 at 3:43 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:
> > >
> > > > Jason, awesome KIP.
> > > >
> > > > I'm wondering how this change would affect availability of the cluster when a rack is unreachable. Is there a scenario where availability is improved or impaired due to the proposed changes?
> > > >
> > > > Ryanne
> > > >
> > > > On Tue, Mar 19, 2019 at 4:32 PM Jason Gustafson <ja...@confluent.io> wrote:
> > > >
> > > > > Hi Jun,
> > > > >
> > > > > Yes, that makes sense to me. I have added a ClientMetadata class which encapsulates various metadata including the rackId and the client address information.
> > > > >
> > > > > Thanks,
> > > > > Jason
> > > > >
> > > > > On Tue, Mar 19, 2019 at 2:17 PM Jun Rao <j...@confluent.io> wrote:
> > > > >
> > > > > > Hi, Jason,
> > > > > >
> > > > > > Thanks for the updated KIP. Just one more comment below.
> > > > > >
> > > > > > 100. The ReplicaSelector class has the following method. I am wondering if we should additionally pass in the client connection info to the method. For example, if rackId is not set, the plugin could potentially select the replica based on the IP address of the client.
> > > > > >
> > > > > > Node select(String rackId, PartitionInfo partitionInfo)
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > > On Mon, Mar 11, 2019 at 4:24 PM Jason Gustafson <ja...@confluent.io> wrote:
> > > > > >
> > > > > > > Hey Everyone,
> > > > > > >
> > > > > > > Apologies for the long delay. I am picking this work back up.
> > > > > > >
> > > > > > > After giving this some further thought, I decided it makes the most sense to move replica selection logic into the broker. It is much more difficult to coordinate selection logic in a multi-tenant environment if operators have to coordinate plugins across all client applications (not to mention other languages).
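
[For illustration: one possible shape of Jun's suggestion above, combined with the ClientMetadata class Jason says he added. The `ClientMetadata` fields here are assumptions for the sketch; the KIP defines the actual interface.]

    import java.io.Closeable;
    import java.net.InetAddress;
    import org.apache.kafka.common.Node;
    import org.apache.kafka.common.PartitionInfo;

    interface ReplicaSelector extends Closeable {
        // ClientMetadata bundles the rackId with the client's connection info,
        // so a plugin can fall back to the client address when rackId is unset.
        Node select(ClientMetadata clientMetadata, PartitionInfo partitionInfo);
    }

    class ClientMetadata {
        final String rackId;             // from the client's request; may be null
        final InetAddress clientAddress; // connection info, usable as a fallback
        ClientMetadata(String rackId, InetAddress clientAddress) {
            this.rackId = rackId;
            this.clientAddress = clientAddress;
        }
    }
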
> > > > > > > Take a look at the updates and let me know what you think:
> > > > > > >
> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica
> > > > > > >
> > > > > > > Thanks!
> > > > > > > Jason
> > > > > > >
> > > > > > > On Fri, Jan 11, 2019 at 10:49 AM Jun Rao <j...@confluent.io> wrote:
> > > > > > >
> > > > > > > > Hi, Jason,
> > > > > > > >
> > > > > > > > Thanks for the updated KIP. Looks good overall. Just a few minor comments.
> > > > > > > >
> > > > > > > > 20. For case 2, if the consumer receives an OFFSET_NOT_AVAILABLE, I am wondering if the consumer should refresh the metadata before retrying. This can allow the consumer to switch to an in-sync replica sooner.
> > > > > > > >
> > > > > > > > 21. Under "protocol changes", there is a sentence "This allows the broker" that seems broken.
> > > > > > > >
> > > > > > > > 4. About reducing the ISR propagation delay from the broker to the controller. Jiangjie made that change in KAFKA-2722. Jiangjie, could you comment on whether it's reasonable to reduce the propagation delay now?
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Jun
> > > > > > > >
> > > > > > > > On Wed, Jan 2, 2019 at 11:06 AM Jason Gustafson <ja...@confluent.io> wrote:
> > > > > > > >
> > > > > > > > > Hey Jun,
> > > > > > > > >
> > > > > > > > > Sorry for the late reply. I have been giving your comments some thought. Replies below:
> > > > > > > > >
> > > > > > > > > > 1. The section on handling FETCH_OFFSET_TOO_LARGE error says "Use the OffsetForLeaderEpoch API to verify the current position with the leader". The OffsetForLeaderEpoch request returns the log end offset if the request leader epoch is the latest. So, we won't know the true high watermark from that request. It seems that the consumer still needs to send a ListOffset request to the leader to obtain the high watermark?
> > > > > > > > >
> > > > > > > > > That's a good point. I think we missed this in KIP-320. I've added a replica_id to the OffsetsForLeaderEpoch API to match the Fetch and ListOffsets APIs so that the broker can avoid exposing offsets beyond the high watermark. This also means that the OffsetsForLeaderEpoch API needs the same handling we added to the ListOffsets API to avoid non-monotonic or incorrect responses. Similarly, I've proposed using the OFFSET_NOT_AVAILABLE error code in cases where the end offset of an epoch would exceed the high watermark.
> > > > > > > > > When querying the latest epoch, the leader will return OFFSET_NOT_AVAILABLE until the high watermark has reached an offset in the leader's current epoch.
> > > > > > > > >
> > > > > > > > > By the way, I've modified the KIP to drop the OFFSET_TOO_LARGE and OFFSET_TOO_SMALL error codes that I initially proposed. I realized that we could continue to use the current OFFSET_OUT_OF_RANGE error and rely on the returned start offset to distinguish the two cases.
> > > > > > > > >
> > > > > > > > > > 2. If a non in-sync replica receives a fetch request from a consumer, should it return a new type of error like ReplicaNotInSync?
> > > > > > > > >
> > > > > > > > > I gave this quite a bit of thought. It is impossible to avoid fetching from out-of-sync replicas in general due to propagation of the ISR state. The high watermark that is returned in fetch responses could be used as a more timely substitute, but we still can't assume that followers will always know when they are in-sync. From a high level, this means that the consumer has to take out-of-range errors with a grain of salt anyway if they come from followers. This is only a problem when switching between replicas or if resuming from a committed offset. If a consumer is following the same out-of-sync replica, then its position will stay in range and, other than some extra latency, no harm will be done.
> > > > > > > > >
> > > > > > > > > Furthermore, it may not be a good idea for consumers to chase the ISR too eagerly since this makes the performance profile harder to predict. The leader itself may have some temporarily increased request load which is causing followers to fall behind. If consumers then switched to the leader after they observed that the follower was out-of-sync, it may make the situation worse. Typically, if a follower has fallen out-of-sync, we expect it to catch back up shortly. It may be better in this scenario to allow consumers to continue fetching from it. On the other hand, if a follower stays out-of-sync for a while, the consumer should have the choice to find a new replica.
> > > > > > > > >
> > > > > > > > > So after thinking about it, I didn't see a lot of benefit in trying to be strict about ISR fetching. Potentially it even has downsides.
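
[For illustration: a sketch of the error consolidation Jason describes a few paragraphs up, i.e. keeping a single OFFSET_OUT_OF_RANGE error and using the returned log start offset to tell the two cases apart. The helper and its parameters are hypothetical; only `Errors.OFFSET_OUT_OF_RANGE` is a real Kafka identifier.]

    import org.apache.kafka.common.protocol.Errors;

    // Classify an OFFSET_OUT_OF_RANGE response by comparing the fetch position
    // against the log start offset carried in the response.
    static String classify(Errors error, long fetchOffset, long logStartOffset) {
        if (error != Errors.OFFSET_OUT_OF_RANGE)
            return "not an out-of-range error";
        return fetchOffset < logStartOffset
            ? "offset too small: position precedes the log start (e.g. removed by retention)"
            : "offset too large: position is past what this replica can expose; verify with the leader";
    }
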
> > > > > > > > > Instead, I now see it as more of a heuristic which the consumer can use to keep end-to-end latency reasonably bounded. The consumer already has one knob the user can tune in order to limit this bound. The `metadata.max.age.ms` config controls how often metadata is refreshed. To follow the ISR more closely, the user can refresh metadata more frequently.
> > > > > > > > >
> > > > > > > > > Note that I've improved the section on out of range handling to be more explicit about the cases we needed to handle.
> > > > > > > > >
> > > > > > > > > > 3. Could ReplicaSelector be closable?
> > > > > > > > >
> > > > > > > > > Yes, I made this change. As an aside, the question of whether we should use a plugin does deserve a bit of discussion. An alternative (suggested by David Arthur) that I've been thinking about is to let the broker select the preferred follower to fetch from using the Metadata API. For example, we could add a `rackId` field to the Metadata API which could be provided through user configuration. The broker could then order the ISR list for each partition so that the preferred follower is returned first (currently the order is random). The consumer could then always fetch from the first replica in the ISR list. The benefit is that the broker may have a better view of the current load characteristics, so it may be able to make better decisions. Client plugins are also much more difficult to control. This may have been the point that Mickael was hinting at above.
> > > > > > > > >
> > > > > > > > > > 4. Currently, the ISR propagation from the leader to the controller can be delayed up to 60 secs through ReplicaManager.IsrChangePropagationInterval. In that window, the consumer could still be consuming from a non in-sync replica. The relatively large delay is mostly for reducing the ZK writes and the watcher overhead. Not sure what's the best way to address this. We could potentially make this configurable.
> > > > > > > > >
> > > > > > > > > This is related to the discussion above. We could make it configurable, I guess. I wonder if it would be reasonable to just reduce the default to something like 10 seconds. Do you think we get much benefit from such a long delay?
> > > > > > > > >
> > > > > > > > > > 5. It may be worth mentioning that, to take advantage of affinity, one may also want to have a customized PartitionAssignor to have an affinity-aware assignment in addition to a customized ReplicaSelector.
> > > > > > > > >
> > > > > > > > > Yes, this is a good point. I was assuming a situation in which each partition had its replicas in all the same datacenters, but you are right that this need not be the case. I will mention this in the KIP and give it some more thought. I think in the common case, these concerns can be treated orthogonally, but it is a bit irritating if you need two separate plugins to make the benefit more general.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Jason
> > > > > > > > >
> > > > > > > > > On Tue, Dec 11, 2018 at 11:04 AM Jason Gustafson <ja...@confluent.io> wrote:
> > > > > > > > >
> > > > > > > > > > Hi Eno,
> > > > > > > > > >
> > > > > > > > > > Thanks for the clarification. From a high level, the main thing to keep in mind is that this is an opt-in feature. It is a bit like using acks=1 in the sense that a user is accepting slightly weaker guarantees in order to optimize for some metric (in this case, read locality). The default behavior would read only from the leader and users will get the usual semantics. That said, let me address the scenarios you raised:
> > > > > > > > > >
> > > > > > > > > > > - scenario 1: an application that both produces and consumes (e.g., like Kafka Streams) produces synchronously a single record to a topic and then attempts to consume that record. Topic is 3-way replicated, say. Could it be the case that the produce succeeds but the consume fails? The consume could go to a replica that has not yet fetched the produced record, right? Or is that not possible?
> > > > > > > > > >
> > > > > > > > > > I think it depends on what you mean by "fails." From the perspective of a replica in the ISR, it has all of the committed data. The only question is what is safe to expose, since the high watermark is always one round trip behind. The proposal is to return a retriable error in this case so that the consumer can distinguish the case from an out of range error and retry. No error will be returned to the user, but consumption will be delayed.
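
[For illustration: the latency-bound knob Jason mentions in his Jan 2 reply above. `metadata.max.age.ms` is a real consumer config (default 300000, i.e. 5 minutes); the value below is just an example.]

    // A lower value makes the consumer re-check the ISR, and hence its preferred
    // replica, more frequently, at the cost of more metadata requests.
    props.put("metadata.max.age.ms", "30000");
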
> > > > > > > > > > One of the main improvements in the KIP is ensuring that this delay is minimal in the common case.
> > > > > > > > > >
> > > > > > > > > > Note that even without follower fetching, this scenario is unavoidable. When a replica becomes a leader, it doesn't know what the latest high watermark is until it receives fetches from all followers in the ISR. During this window, committed data is temporarily not visible. We handle this similarly to what is proposed here. Basically, we ask the consumer to retry until we know the data is safe.
> > > > > > > > > >
> > > > > > > > > > > - scenario 2: an application C that only consumes. Again say there is only one record produced (by another application P) to a replicated topic and that record has not propagated to all replicas yet (it is only at the leader at time t0). Application C attempts to consume at time t1 and it does so successfully because the consume fetches from the leader. At time t2 the same application seeks to the beginning of the topic and attempts to consume again. Is there a scenario where this second attempt fails because the fetching happens from a replica that does not have the record yet? At time t3 all replicas have the record.
> > > > > > > > > >
> > > > > > > > > > Yes, this is possible in the way that I described above. There is a (typically short) window in which committed data may not be visible. It is a bit like the partition itself being unavailable temporarily. The data has not been lost and is guaranteed to be returned, but the consumer has to wait until the follower knows it is safe to return.
> > > > > > > > > >
> > > > > > > > > > One final note: I am iterating on the design a little bit in order to address Jun's comments. Expect a few changes. I realized that there is some inconsistency between the current fetch behavior and KIP-207. It is mainly in regard to how we handle the transition from becoming a follower to becoming a leader.
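
[For illustration: a sketch of the visibility window Jason describes in the two scenarios above. With follower fetching, a committed record may briefly be not yet visible on a follower, so a read-your-writes check sees a delay rather than a failure. Assumes a producer with acks=all and a consumer already assigned to the single-partition topic; `producer`, `consumer`, and `record` are constructed elsewhere.]

    import java.time.Duration;
    import org.apache.kafka.clients.consumer.ConsumerRecords;

    // The record is committed once send() completes with acks=all.
    producer.send(record).get();

    // Re-read from the beginning; if the fetch lands on a follower whose high
    // watermark has not yet advanced, the retriable "not available" response is
    // handled inside the client: poll() simply returns empty until the data is safe.
    consumer.seekToBeginning(consumer.assignment());
    ConsumerRecords<String, String> records;
    do {
        records = consumer.poll(Duration.ofMillis(100));
    } while (records.isEmpty());   // delayed, not lost
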
> > > > > > > > > > Thanks,
> > > > > > > > > > Jason
> > > > > > > > > >
> > > > > > > > > > On Tue, Dec 11, 2018 at 3:46 AM Eno Thereska <eno.there...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Jason,
> > > > > > > > > > >
> > > > > > > > > > > My question was on producer + consumer semantics, not just the producer semantics. I'll rephrase it slightly and split it into two questions:
> > > > > > > > > > >
> > > > > > > > > > > - scenario 1: an application that both produces and consumes (e.g., like Kafka Streams) produces synchronously a single record to a topic and then attempts to consume that record. Topic is 3-way replicated, say. Could it be the case that the produce succeeds but the consume fails? The consume could go to a replica that has not yet fetched the produced record, right? Or is that not possible?
> > > > > > > > > > > - scenario 2: an application C that only consumes. Again say there is only one record produced (by another application P) to a replicated topic and that record has not propagated to all replicas yet (it is only at the leader at time t0). Application C attempts to consume at time t1 and it does so successfully because the consume fetches from the leader. At time t2 the same application seeks to the beginning of the topic and attempts to consume again. Is there a scenario where this second attempt fails because the fetching happens from a replica that does not have the record yet? At time t3 all replicas have the record.
> > > > > > > > > > >
> > > > > > > > > > > Thanks
> > > > > > > > > > > Eno
> > > > > > > > > > >
> > > > > > > > > > > On Mon, Dec 10, 2018 at 7:42 PM Jason Gustafson <ja...@confluent.io> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hey Eno,
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks for the comments. However, I'm a bit confused. I'm not suggesting we change Produce semantics in any way. All writes still go through the partition leader and nothing changes with respect to committing to the ISR. The main issue, as I've mentioned in the KIP, is the increased latency before a committed offset is exposed on followers.
> > > > > > > > > > > >
> > > > > > > > > > > > Perhaps I have misunderstood your question?
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > > Jason
> > > > > > > > > > > >
> > > > > > > > > > > > On Mon, Dec 3, 2018 at 9:18 AM Eno Thereska <eno.there...@gmail.com> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi Jason,
> > > > > > > > > > > > >
> > > > > > > > > > > > > This is an interesting KIP. This will have massive implications for consistency and serialization, since currently the leader for a partition serializes requests. A few questions for now:
> > > > > > > > > > > > >
> > > > > > > > > > > > > - before we deal with the complexity, it'd be great to see a crisp example in the motivation as to when this will have the most benefit for a customer. In particular, although the customer might have a multi-DC deployment, the DCs could still be close by in a region, so what is the expected best-case scenario for a performance gain? E.g., if all DCs are on the east coast, say. Right now it's not clear to me.
> > > > > > > > > > > > > - perhaps performance is not the right metric. Is the metric you are optimizing for latency, throughput or cross-DC cost? (I believe it is cross-DC cost from the KIP). Just wanted to double-check since I'm not sure latency would improve. Throughput could really improve from parallelism (especially in cases when there is mostly consuming going on). So it could be throughput as well.
> > > > > > > > > > > > > - the proposal would probably lead to choosing a more complex consistency model. I tend to like the description Doug Terry has in his paper "Replicated Data Consistency Explained Through Baseball": https://www.microsoft.com/en-us/research/wp-content/uploads/2011/10/ConsistencyAndBaseballReport.pdf To start with, could we get into scenarios where a client that has both a producer and a consumer (e.g., Kafka Streams) produces a record, then attempts to consume it back and the consume() comes back with "record does not exist"?
> > > > > > > > > > > > > That's fine, but it could complicate application handling of such scenarios.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > Eno
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Mon, Dec 3, 2018 at 12:24 PM Mickael Maison <mickael.mai...@gmail.com> wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi Jason,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Very cool KIP!
> > > > > > > > > > > > > > A couple of questions:
> > > > > > > > > > > > > > - I'm guessing the selector will be invoked after each rebalance, so every time the consumer is assigned a partition it will be able to select it. Is that true?
> > > > > > > > > > > > > > - From the selector API, I'm not sure how the consumer will be able to address some of the choices mentioned in "Finding the preferred follower", especially the available bandwidth and the load balancing. By only having the list of Nodes, a consumer can pick the nearest replica (assuming the rack field means anything to users) or balance its own bandwidth, but that might not necessarily mean improved performance or a balanced load on the brokers.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Mon, Dec 3, 2018 at 11:35 AM Stanislav Kozlovski <stanis...@confluent.io> wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Hey Jason,
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > This is certainly a very exciting KIP.
> > > > > > > > > > > > > > > I assume that no changes will be made to the offset commits and they will continue to be sent to the group coordinator?
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > I also wanted to address metrics - have we considered any changes there? I imagine that it would be valuable for users to be able to differentiate between which consumers' partitions are fetched from replicas and which aren't. I guess that would need to be addressed both in the server's fetcher lag metrics and in the consumers.
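
[For illustration: the per-replica fetch tags Stanislav asks about would be new, but clients can already enumerate their metrics today via the real `Consumer#metrics()` accessor, which is where such a distinction would presumably surface.]

    // Dump all client-side metrics; a replica-aware fetch metric would show up here.
    consumer.metrics().forEach((name, metric) ->
        System.out.println(name.name() + " " + name.tags() + " = " + metric.metricValue()));
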
> > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > Stanislav
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Wed, Nov 28, 2018 at 10:08 PM Jun Rao <j...@confluent.io> wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Hi, Jason,
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Thanks for the KIP. Looks good overall. A few minor comments below.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > 1. The section on handling FETCH_OFFSET_TOO_LARGE error says "Use the OffsetForLeaderEpoch API to verify the current position with the leader". The OffsetForLeaderEpoch request returns the log end offset if the request leader epoch is the latest. So, we won't know the true high watermark from that request. It seems that the consumer still needs to send a ListOffset request to the leader to obtain the high watermark?
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > 2. If a non in-sync replica receives a fetch request from a consumer, should it return a new type of error like ReplicaNotInSync?
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > 3. Could ReplicaSelector be closable?
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > 4. Currently, the ISR propagation from the leader to the controller can be delayed up to 60 secs through ReplicaManager.IsrChangePropagationInterval. In that window, the consumer could still be consuming from a non in-sync replica. The relatively large delay is mostly for reducing the ZK writes and the watcher overhead. Not sure what's the best way to address this. We could potentially make this configurable.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > 5. It may be worth mentioning that, to take advantage of affinity, one may also want to have a customized PartitionAssignor to have an affinity-aware assignment in addition to a customized ReplicaSelector.
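
[For illustration: a toy version of the affinity-aware assignment Jun suggests above. This is deliberately not the real PartitionAssignor API; it is a standalone sketch of the idea of giving each consumer partitions that have a replica in the consumer's own rack, with load balancing ignored for brevity.]

    import java.util.*;

    // consumerRack: consumerId -> rack; partitionRacks: partition -> racks holding a replica.
    static Map<String, List<Integer>> assignByRack(Map<String, String> consumerRack,
                                                   Map<Integer, Set<String>> partitionRacks) {
        Map<String, List<Integer>> assignment = new HashMap<>();
        consumerRack.keySet().forEach(c -> assignment.put(c, new ArrayList<>()));
        List<String> consumers = new ArrayList<>(consumerRack.keySet());
        int next = 0;
        for (Map.Entry<Integer, Set<String>> e : partitionRacks.entrySet()) {
            // Prefer a consumer whose rack holds a replica of this partition...
            String chosen = null;
            for (String c : consumers)
                if (e.getValue().contains(consumerRack.get(c))) { chosen = c; break; }
            // ...otherwise fall back to round-robin.
            if (chosen == null) chosen = consumers.get(next++ % consumers.size());
            assignment.get(chosen).add(e.getKey());
        }
        return assignment;
    }
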
> > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > Jun
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > On Wed, Nov 21, 2018 at 12:54 PM Jason Gustafson <ja...@confluent.io> wrote:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Hi All,
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > I've posted a KIP to add the often-requested support for fetching from followers:
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Please take a look and let me know what you think.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > Jason
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > --
> > > > > > > > > > > > > > > Best,
> > > > > > > > > > > > > > > Stanislav