Thanks Jason, that helps. I agree my concerns are orthogonal to reducing
cross-DC transfer costs -- I'm only interested in how this affects what
happens when a rack is unavailable, since, as the name implies, the whole
point of stretching a cluster across availability zones is for increased
availability. But it sounds like nothing is changing in that regard, so
carry on :)

Ryanne

On Wed, Mar 20, 2019 at 6:41 PM Jason Gustafson <ja...@confluent.io> wrote:

> Hi Ryanne,
>
> Thanks, responses below:
>
> > Thanks Jason, I see that the proposed ReplicaSelector would be where that
> > decision is made. But I'm not certain how a consumer triggers this process?
> > If a consumer can't reach its preferred rack, how does it ask for a new
> > assignment?
>
>
> As documented in the KIP, the consumer chooses whether or not to accept the
> preferred replica through the `replica.selection.policy` config. The default
> behavior is the same as what we have today: the consumer will fetch only
> from the leader. If the consumer has opted into preferred replica
> selection, then it will use the replica returned in the Metadata response.
> If it cannot connect to this replica, it will refresh metadata and try
> again. It is the same as when connecting to leaders only.
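For concreteness, an opted-in consumer might be configured along the lines of the sketch below. The config names (`rack.id`, `replica.selection.policy`) and the value `preferred` follow this thread's discussion only; they are proposals under discussion, not a finalized API.

```java
import java.util.Properties;

// Sketch of an opted-in consumer configuration, using the config names
// discussed in this thread. These are proposals, not a finalized API.
public class PreferredReplicaConsumerConfig {
    public static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");
        props.put("group.id", "example-group");
        // The rack this consumer lives in; the broker can use it to pick a
        // nearby replica.
        props.put("rack.id", "us-east-1a");
        // Opt in to fetching from the broker's preferred replica instead of
        // always fetching from the leader (the default).
        props.put("replica.selection.policy", "preferred");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("replica.selection.policy"));
    }
}
```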
>
> > I suppose a consumer that can't reach its preferred rack would need to ask
> > a broker in a second rack. That broker would refer back to the previous
> > rack, unless the broker also knows that the preferred rack is unreachable.
> > But why would the second broker ever refer the consumer to a third rack? It
> > seems that, at that point, the consumer will have already made the decision
> > to contact the second rack, based on some logic of its own.
>
>
> I am not sure what you mean by "preferred rack." I assume you are
> referring to the preferred replica? As with the current logic, if the
> consumer cannot make progress with a replica (whether it is the leader or
> not), it will refresh metadata and try again.
>
> I think there may be some confusion about the problem we are trying to
> solve. Our interest is giving operators the ability to save cross-DC costs
> by leveraging consumer locality. I think you are arguing that follower
> fetching may also be able to solve a separate problem, which is routing
> around network partitions. That is a fair point. Generally, the replica
> selection heuristics would tend to favor more reliable connections in any
> case. If the consumer is located in the same availability zone as the
> broker, for example, it is more likely to be able to connect to that broker
> than to one in another availability zone or region. So with this, we are
> likely in a much better state than before, when the consumer was forced to
> go wherever the leader is. But I think it's fair that there may be room for
> additional client-side logic in the future.
>
> > Also, if a broker has decided that a rack is overloaded or unreachable, why
> > not just automatically rebalance its leaders to another rack? The effect
> > would be much the same, and we already support this.
>
>
> This is what we already do today. This seems orthogonal? As I said
> initially, selection logic should take replica availability into account.
>
> > w.r.t. availability, my specific concern is that, if brokers are
> > responsible for assigning consumers to racks, this means that brokers must
> > decide when a rack is unavailable to a consumer based on some timeout, and
> > then further wait for the consumer to request its new assignment. Then,
> > after another timeout, the racks would reelect leaders, possibly causing
> > some consumers to change racks again. I think this would end up hurting
> > consumer HA to some extent.
>
>
> I am not sure what you mean about racks reelecting leaders. Nothing changes
> with regard to leader election in this KIP. That said, I think I understand
> the high-level concern, which is that the client has some
> connection-related information which the broker does not have. The fact is
> that neither the broker nor the client has complete knowledge. The client
> doesn't know anything about load characteristics on the brokers, for
> example, which I would say is of at least equal importance. This KIP favors
> broker-side selection because that makes it easier to coordinate selection
> logic across the many applications running on a cluster. For example, you
> can easily get consistent behavior across different client implementations.
> This simplifies provisioning and makes it easier for operators to evolve
> the selection logic.
>
> I can see that in the future we may also want the client itself to have
> some ability to use follower fetching to route around network problems. The
> `replica.selection.policy` config, for example, could provide an additional
> option to implement the policy you are suggesting, or perhaps even
> something custom. I will mention this in the KIP and leave it open as a
> potential future improvement. As it stands, users can still choose to keep
> the current behavior if they do not want to be bound by the broker's
> preference.
>
> Thanks,
> Jason
>
>
>
> On Wed, Mar 20, 2019 at 10:13 AM Ryanne Dolan <ryannedo...@gmail.com>
> wrote:
>
> > Thanks Jason, I see that the proposed ReplicaSelector would be where that
> > decision is made. But I'm not certain how a consumer triggers this process?
> > If a consumer can't reach its preferred rack, how does it ask for a new
> > assignment?
> >
> > I suppose a consumer that can't reach its preferred rack would need to ask
> > a broker in a second rack. That broker would refer back to the previous
> > rack, unless the broker also knows that the preferred rack is unreachable.
> > But why would the second broker ever refer the consumer to a third rack? It
> > seems that, at that point, the consumer will have already made the decision
> > to contact the second rack, based on some logic of its own.
> >
> > Moreover, the broker actually has less information than the consumer does
> > w.r.t. latency and reachability. For example, a network might be segmented
> > such that consumers can only reach certain racks, while brokers can all
> > reach each other. In that case, the ReplicaSelector would need to be aware
> > of this network topology in order to determine which racks are preferable.
> > On the other hand, consumer logic could detect which racks are reachable,
> > the latency when fetching from them, etc.
> >
> > Or a broker might decide its rack's latency is too high and refer consumers
> > to another rack. But the broker can't predict what the latency from the
> > consumers to the new rack will be. In order to make such a decision, each
> > broker would need to know the latency between each consumer and every other
> > broker.
> >
> > Also, if a broker has decided that a rack is overloaded or unreachable, why
> > not just automatically rebalance its leaders to another rack? The effect
> > would be much the same, and we already support this.
> >
> > w.r.t. availability, my specific concern is that, if brokers are
> > responsible for assigning consumers to racks, this means that brokers must
> > decide when a rack is unavailable to a consumer based on some timeout, and
> > then further wait for the consumer to request its new assignment. Then,
> > after another timeout, the racks would reelect leaders, possibly causing
> > some consumers to change racks again. I think this would end up hurting
> > consumer HA to some extent.
> >
> > My suggestion is that we drop the ReplicaSelector concept and instead
> > support two simple options in the consumer config: LEADER (default) and
> > RACK. The RACK option would cause the client to only connect to the rack
> > specified by rack.id. If an application wants to do something fancy like
> > automatically failing over between racks, the app can just change its
> > rack.id, perhaps after a short timeout.
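A rough sketch of the app-side failover Ryanne describes: pin the consumer to a rack via rack.id, and if fetches from that rack stall past a timeout, rotate to another rack by changing rack.id. The class, method names, and rack list below are illustrative assumptions, not part of any KIP.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;

// Illustrative sketch: track fetch progress and rotate rack.id when stalled.
public class RackFailover {
    private final List<String> racks;      // racks to try, in preference order
    private final Duration failoverAfter;  // how long to wait before switching
    private int current = 0;
    private Instant lastProgress = Instant.now();

    public RackFailover(List<String> racks, Duration failoverAfter) {
        this.racks = racks;
        this.failoverAfter = failoverAfter;
    }

    /** Current value the app would use for the consumer's rack.id. */
    public String rackId() {
        return racks.get(current);
    }

    /** Call whenever a fetch returns records. */
    public void onProgress(Instant now) {
        lastProgress = now;
    }

    /** Call periodically; rotates to the next rack if we have stalled. */
    public boolean maybeFailover(Instant now) {
        if (Duration.between(lastProgress, now).compareTo(failoverAfter) > 0) {
            current = (current + 1) % racks.size();
            lastProgress = now;
            return true;
        }
        return false;
    }
}
```

On each failover, the application would rebuild its consumer with the new rack.id value.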
> >
> > Ryanne
> >
> > On Tue, Mar 19, 2019 at 5:55 PM Jason Gustafson <ja...@confluent.io>
> > wrote:
> >
> > > Hi Ryanne,
> > >
> > > Thanks for the comment. If I understand your question correctly, I think
> > > the answer is no. I would expect typical selection logic to consider
> > > replica availability first, before any other factor. In some cases,
> > > however, a user may put a higher priority on saving cross-DC traffic
> > > costs. If a preferred replica is unavailable, they may prefer to wait
> > > some time for it to be restored before routing traffic elsewhere. Does
> > > that make sense?
> > >
> > > Best,
> > > Jason
> > >
> > > On Tue, Mar 19, 2019 at 3:43 PM Ryanne Dolan <ryannedo...@gmail.com>
> > > wrote:
> > >
> > > > Jason, awesome KIP.
> > > >
> > > > I'm wondering how this change would affect availability of the cluster
> > > > when a rack is unreachable. Is there a scenario where availability is
> > > > improved or impaired due to the proposed changes?
> > > >
> > > > Ryanne
> > > >
> > > > On Tue, Mar 19, 2019 at 4:32 PM Jason Gustafson <ja...@confluent.io>
> > > > wrote:
> > > >
> > > > > Hi Jun,
> > > > >
> > > > > Yes, that makes sense to me. I have added a ClientMetadata class
> > > > > which encapsulates various metadata including the rackId and the
> > > > > client address information.
> > > > >
> > > > > Thanks,
> > > > > Jason
> > > > >
> > > > > On Tue, Mar 19, 2019 at 2:17 PM Jun Rao <j...@confluent.io> wrote:
> > > > >
> > > > > > Hi, Jason,
> > > > > >
> > > > > > Thanks for the updated KIP. Just one more comment below.
> > > > > >
> > > > > > 100. The ReplicaSelector class has the following method. I am
> > > > > > wondering if we should additionally pass in the client connection
> > > > > > info to the method. For example, if rackId is not set, the plugin
> > > > > > could potentially select the replica based on the IP address of the
> > > > > > client.
> > > > > >
> > > > > > Node select(String rackId, PartitionInfo partitionInfo)
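For illustration, a rack-aware implementation of the select method Jun quotes might look like the sketch below. Node and PartitionInfo here are minimal stand-ins, not the actual Kafka classes, and the selection rule is an assumption for the example.

```java
// Minimal stand-ins for the types in the signature above (illustrative only).
class Node {
    final String host;
    final String rack;
    Node(String host, String rack) { this.host = host; this.rack = rack; }
}

class PartitionInfo {
    final Node leader;
    final Node[] inSyncReplicas;
    PartitionInfo(Node leader, Node[] isr) { this.leader = leader; this.inSyncReplicas = isr; }
}

// Prefer an in-sync replica in the consumer's rack; otherwise fall back to
// the leader. A real selector might also use the client's IP, as Jun suggests.
class RackAwareSelector {
    Node select(String rackId, PartitionInfo partitionInfo) {
        if (rackId != null) {
            for (Node replica : partitionInfo.inSyncReplicas) {
                if (rackId.equals(replica.rack)) {
                    return replica;
                }
            }
        }
        return partitionInfo.leader;
    }
}
```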
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > >
> > > > > > On Mon, Mar 11, 2019 at 4:24 PM Jason Gustafson <ja...@confluent.io>
> > > > > > wrote:
> > > > > >
> > > > > > > Hey Everyone,
> > > > > > >
> > > > > > > Apologies for the long delay. I am picking this work back up.
> > > > > > >
> > > > > > > After giving this some further thought, I decided it makes the
> > > > > > > most sense to move replica selection logic into the broker. It is
> > > > > > > much more difficult to coordinate selection logic in a
> > > > > > > multi-tenant environment if operators have to coordinate plugins
> > > > > > > across all client applications (not to mention other languages).
> > > > > > > Take a look at the updates and let me know what you think:
> > > > > > >
> > > > > > >
> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica
> > > > > > >
> > > > > > > Thanks!
> > > > > > > Jason
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Fri, Jan 11, 2019 at 10:49 AM Jun Rao <j...@confluent.io>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi, Jason,
> > > > > > > >
> > > > > > > > Thanks for the updated KIP. Looks good overall. Just a few
> > > > > > > > minor comments.
> > > > > > > >
> > > > > > > > 20. For case 2, if the consumer receives an OFFSET_NOT_AVAILABLE,
> > > > > > > > I am wondering if the consumer should refresh the metadata
> > > > > > > > before retrying. This can allow the consumer to switch to an
> > > > > > > > in-sync replica sooner.
> > > > > > > >
> > > > > > > > 21. Under "protocol changes", there is a sentence "This allows
> > > > > > > > the broker " that seems broken.
> > > > > > > >
> > > > > > > > 4. About reducing the ISR propagation delay from the broker to
> > > > > > > > the controller. Jiangjie made that change in KAFKA-2722.
> > > > > > > > Jiangjie, could you comment on whether it's reasonable to reduce
> > > > > > > > the propagation delay now?
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Jun
> > > > > > > >
> > > > > > > > On Wed, Jan 2, 2019 at 11:06 AM Jason Gustafson <ja...@confluent.io>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hey Jun,
> > > > > > > > >
> > > > > > > > > Sorry for the late reply. I have been giving your comments
> > > > > > > > > some thought. Replies below:
> > > > > > > > >
> > > > > > > > > > 1. The section on handling FETCH_OFFSET_TOO_LARGE error says
> > > > > > > > > > "Use the OffsetForLeaderEpoch API to verify the current
> > > > > > > > > > position with the leader". The OffsetForLeaderEpoch request
> > > > > > > > > > returns log end offset if the request leader epoch is the
> > > > > > > > > > latest. So, we won't know the true high watermark from that
> > > > > > > > > > request. It seems that the consumer still needs to send
> > > > > > > > > > ListOffset request to the leader to obtain high watermark?
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > That's a good point. I think we missed this in KIP-320. I've
> > > > > > > > > added a replica_id to the OffsetsForLeaderEpoch API to match the
> > > > > > > > > Fetch and ListOffsets API so that the broker can avoid exposing
> > > > > > > > > offsets beyond the high watermark. This also means that the
> > > > > > > > > OffsetsForLeaderEpoch API needs the same handling we added to
> > > > > > > > > the ListOffsets API to avoid non-monotonic or incorrect
> > > > > > > > > responses. Similarly, I've proposed using the
> > > > > > > > > OFFSET_NOT_AVAILABLE error code in cases where the end offset of
> > > > > > > > > an epoch would exceed the high watermark. When querying the
> > > > > > > > > latest epoch, the leader will return OFFSET_NOT_AVAILABLE until
> > > > > > > > > the high watermark has reached an offset in the leader's current
> > > > > > > > > epoch.
> > > > > > > > >
> > > > > > > > > By the way, I've modified the KIP to drop the OFFSET_TOO_LARGE
> > > > > > > > > and OFFSET_TOO_SMALL error codes that I initially proposed. I
> > > > > > > > > realized that we could continue to use the current
> > > > > > > > > OFFSET_OUT_OF_RANGE error and rely on the returned start offset
> > > > > > > > > to distinguish the two cases.
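A sketch of how a client might distinguish the two cases from a single OFFSET_OUT_OF_RANGE error using the returned start offset. The enum and method names below are illustrative, not from the KIP.

```java
// Illustrative only: classify an out-of-range fetch position using the log
// start offset returned alongside OFFSET_OUT_OF_RANGE.
enum OutOfRangeCase { OFFSET_TOO_SMALL, OFFSET_TOO_LARGE }

class OffsetRangeCheck {
    static OutOfRangeCase classify(long fetchOffset, long logStartOffset) {
        // A position below the log start offset means the data behind us was
        // truncated or deleted; otherwise we are past the end of this log.
        return fetchOffset < logStartOffset
                ? OutOfRangeCase.OFFSET_TOO_SMALL
                : OutOfRangeCase.OFFSET_TOO_LARGE;
    }
}
```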
> > > > > > > > >
> > > > > > > > > > 2. If a non in-sync replica receives a fetch request from a
> > > > > > > > > > consumer, should it return a new type of error like
> > > > > > > > > > ReplicaNotInSync?
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > I gave this quite a bit of thought. It is impossible to avoid
> > > > > > > > > fetching from out-of-sync replicas in general due to
> > > > > > > > > propagation of the ISR state. The high watermark that is
> > > > > > > > > returned in fetch responses could be used as a more timely
> > > > > > > > > substitute, but we still can't assume that followers will
> > > > > > > > > always know when they are in-sync. From a high level, this
> > > > > > > > > means that the consumer has to take out-of-range errors with a
> > > > > > > > > grain of salt anyway if they come from followers. This is only
> > > > > > > > > a problem when switching between replicas or when resuming
> > > > > > > > > from a committed offset. If a consumer is following the same
> > > > > > > > > out-of-sync replica, then its position will stay in range and,
> > > > > > > > > other than some extra latency, no harm will be done.
> > > > > > > > >
> > > > > > > > > Furthermore, it may not be a good idea for consumers to chase
> > > > > > > > > the ISR too eagerly, since this makes the performance profile
> > > > > > > > > harder to predict. The leader itself may have some temporarily
> > > > > > > > > increased request load which is causing followers to fall
> > > > > > > > > behind. If consumers then switched to the leader after they
> > > > > > > > > observed that the follower was out-of-sync, it may make the
> > > > > > > > > situation worse. Typically, if a follower has fallen
> > > > > > > > > out-of-sync, we expect it to catch back up shortly. It may be
> > > > > > > > > better in this scenario to allow consumers to continue
> > > > > > > > > fetching from it. On the other hand, if a follower stays
> > > > > > > > > out-of-sync for a while, the consumer should have the choice
> > > > > > > > > to find a new replica.
> > > > > > > > >
> > > > > > > > > So after thinking about it, I didn't see a lot of benefit in
> > > > > > > > > trying to be strict about ISR fetching. Potentially it even
> > > > > > > > > has downsides. Instead, I now see it as more of a heuristic
> > > > > > > > > which the consumer can use to keep end-to-end latency
> > > > > > > > > reasonably bounded. The consumer already has one knob the user
> > > > > > > > > can tune in order to limit this bound. The
> > > > > > > > > `metadata.max.age.ms` config controls how often metadata is
> > > > > > > > > refreshed. To follow the ISR more closely, the user can
> > > > > > > > > refresh metadata more frequently.
> > > > > > > > >
> > > > > > > > > Note that I've improved the section on out-of-range handling
> > > > > > > > > to be more explicit about the cases we needed to handle.
> > > > > > > > >
> > > > > > > > > > 3. Could ReplicaSelector be closable?
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Yes, I made this change. As an aside, the question of whether
> > > > > > > > > we should use a plugin does deserve a bit of discussion. An
> > > > > > > > > alternative (suggested by David Arthur) that I've been
> > > > > > > > > thinking about is to let the broker select the preferred
> > > > > > > > > follower to fetch from using the Metadata API. For example, we
> > > > > > > > > could add a `rackId` field to the Metadata API which could be
> > > > > > > > > provided through user configuration. The broker could then
> > > > > > > > > order the ISR list for each partition so that the preferred
> > > > > > > > > follower is returned first (currently the order is random).
> > > > > > > > > The consumer could then always fetch from the first replica in
> > > > > > > > > the ISR list. The benefit is that the broker may have a better
> > > > > > > > > view of the current load characteristics, so it may be able to
> > > > > > > > > make better decisions. Client plugins are also much more
> > > > > > > > > difficult to control. This may have been the point that
> > > > > > > > > Mickael was hinting at above.
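The ordering alternative above can be sketched in a few lines: the broker reorders each partition's ISR so the replica whose rack matches the client's rackId comes first. Representing replicas by their rack strings here is purely for illustration; the real ISR list would hold broker nodes.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Illustrative sketch of the ISR-ordering alternative: put the replica whose
// rack matches the client's rackId first, leaving the rest in their order.
class IsrOrdering {
    static List<String> orderIsr(List<String> isrRacks, String clientRack) {
        List<String> ordered = new ArrayList<>(isrRacks);
        // Stable sort: the matching rack sorts first, everything else keeps
        // its relative order.
        ordered.sort(Comparator.comparingInt((String rack) -> rack.equals(clientRack) ? 0 : 1));
        return ordered;
    }
}
```

The client would then simply fetch from the first replica in the returned list, with no plugin on the client side.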
> > > > > > > > >
> > > > > > > > > > 4. Currently, the ISR propagation from the leader to the
> > > > > > > > > > controller can be delayed up to 60 secs through
> > > > > > > > > > ReplicaManager.IsrChangePropagationInterval. In that window,
> > > > > > > > > > the consumer could still be consuming from a non in-sync
> > > > > > > > > > replica. The relatively large delay is mostly for reducing
> > > > > > > > > > the ZK writes and the watcher overhead. Not sure what's the
> > > > > > > > > > best way to address this. We could potentially make this
> > > > > > > > > > configurable.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > This is related to the discussion above. We could make it
> > > > > > > > > configurable, I guess. I wonder if it would be reasonable to
> > > > > > > > > just reduce the default to something like 10 seconds. Do you
> > > > > > > > > think we get much benefit from such a long delay?
> > > > > > > > >
> > > > > > > > > > 5. It may be worth mentioning that, to take advantage of
> > > > > > > > > > affinity, one may also want to have a customized
> > > > > > > > > > PartitionAssignor to have an affinity-aware assignment in
> > > > > > > > > > addition to a customized ReplicaSelector.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Yes, this is a good point. I was assuming a situation in
> > > > > > > > > which each partition had its replicas in all the same
> > > > > > > > > datacenters, but you are right that this need not be the case.
> > > > > > > > > I will mention this in the KIP and give it some more thought.
> > > > > > > > > I think in the common case these concerns can be treated
> > > > > > > > > orthogonally, but it is a bit irritating if you need two
> > > > > > > > > separate plugins to make the benefit more general.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Jason
> > > > > > > > >
> > > > > > > > > On Tue, Dec 11, 2018 at 11:04 AM Jason Gustafson <ja...@confluent.io>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi Eno,
> > > > > > > > > >
> > > > > > > > > > Thanks for the clarification. From a high level, the main
> > > > > > > > > > thing to keep in mind is that this is an opt-in feature. It
> > > > > > > > > > is a bit like using acks=1, in the sense that a user is
> > > > > > > > > > accepting slightly weaker guarantees in order to optimize
> > > > > > > > > > for some metric (in this case, read locality). The default
> > > > > > > > > > behavior would read only from the leader, and users would
> > > > > > > > > > get the usual semantics. That said, let me address the
> > > > > > > > > > scenarios you raised:
> > > > > > > > > >
> > > > > > > > > >> - scenario 1: an application that both produces and consumes (e.g., like
> > > > > > > > > >> Kafka streams) produces synchronously a single record to a topic and then
> > > > > > > > > >> attempts to consume that record. Topic is 3-way replicated say. Could it be
> > > > > > > > > >> the case that the produce succeeds but the consume fails? The consume could
> > > > > > > > > >> go to a replica that has not yet fetched the produce record, right? Or is
> > > > > > > > > >> that not possible?
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > I think it depends on what you mean by "fails." From the
> > > > > > > > > > perspective of a replica in the ISR, it has all of the
> > > > > > > > > > committed data. The only question is what is safe to expose,
> > > > > > > > > > since the high watermark is always one round trip behind.
> > > > > > > > > > The proposal is to return a retriable error in this case so
> > > > > > > > > > that the consumer can distinguish the case from an
> > > > > > > > > > out-of-range error and retry. No error will be returned to
> > > > > > > > > > the user, but consumption will be delayed. One of the main
> > > > > > > > > > improvements in the KIP is ensuring that this delay is
> > > > > > > > > > minimal in the common case.
> > > > > > > > > >
> > > > > > > > > > Note that even without follower fetching, this scenario is
> > > > > > > > > > unavoidable. When a replica becomes a leader, it doesn't
> > > > > > > > > > know what the latest high watermark is until it receives
> > > > > > > > > > fetches from all followers in the ISR. During this window,
> > > > > > > > > > committed data is temporarily not visible. We handle this
> > > > > > > > > > similarly to what is proposed here. Basically we ask the
> > > > > > > > > > consumer to retry until we know the data is safe.
> > > > > > > > > >
> > > > > > > > > >> - scenario 2: an application C that only consumes. Again say there is only
> > > > > > > > > >> one record produced (by another application P) to a replicated topic and
> > > > > > > > > >> that record has not propagated to all replicas yet (it is only at the
> > > > > > > > > >> leader at time t0). Application C attempts to consume at time t1 and it
> > > > > > > > > >> does so successfully because the consume fetches from the leader. At time
> > > > > > > > > >> t2 the same application seeks to the beginning of the topic and attempts to
> > > > > > > > > >> consume again. Is there a scenario where this second attempt fails because
> > > > > > > > > >> the fetching happens from a replica that does not have the record yet? At
> > > > > > > > > >> time t3 all replicas have the record.
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Yes, this is possible in the way that I described above.
> > > > > > > > > > There is a (typically short) window in which committed data
> > > > > > > > > > may not be visible. It is a bit like the partition itself
> > > > > > > > > > being unavailable temporarily. The data has not been lost
> > > > > > > > > > and is guaranteed to be returned, but the consumer has to
> > > > > > > > > > wait until the follower knows it is safe to return.
> > > > > > > > > >
> > > > > > > > > > One final note: I am iterating on the design a little bit
> > > > > > > > > > in order to address Jun's comments. Expect a few changes. I
> > > > > > > > > > realized that there is some inconsistency between the
> > > > > > > > > > current fetch behavior and KIP-207. It is mainly in regard
> > > > > > > > > > to how we handle the transition from being a follower to
> > > > > > > > > > becoming a leader.
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > Jason
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Tue, Dec 11, 2018 at 3:46 AM Eno Thereska <eno.there...@gmail.com>
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > >> Hi Jason,
> > > > > > > > > >>
> > > > > > > > > >> My question was on producer + consumer semantics, not just the producer
> > > > > > > > > >> semantics. I'll rephrase it slightly and split into two questions:
> > > > > > > > > >> - scenario 1: an application that both produces and consumes (e.g., like
> > > > > > > > > >> Kafka streams) produces synchronously a single record to a topic and then
> > > > > > > > > >> attempts to consume that record. Topic is 3-way replicated say. Could it be
> > > > > > > > > >> the case that the produce succeeds but the consume fails? The consume could
> > > > > > > > > >> go to a replica that has not yet fetched the produce record, right? Or is
> > > > > > > > > >> that not possible?
> > > > > > > > > >> - scenario 2: an application C that only consumes. Again say there is only
> > > > > > > > > >> one record produced (by another application P) to a replicated topic and
> > > > > > > > > >> that record has not propagated to all replicas yet (it is only at the
> > > > > > > > > >> leader at time t0). Application C attempts to consume at time t1 and it
> > > > > > > > > >> does so successfully because the consume fetches from the leader. At time
> > > > > > > > > >> t2 the same application seeks to the beginning of the topic and attempts to
> > > > > > > > > >> consume again. Is there a scenario where this second attempt fails because
> > > > > > > > > >> the fetching happens from a replica that does not have the record yet? At
> > > > > > > > > >> time t3 all replicas have the record.
> > > > > > > > > >>
> > > > > > > > > >> Thanks
> > > > > > > > > >> Eno
> > > > > > > > > >>
> > > > > > > > > >>
> > > > > > > > > >>
> > > > > > > > > >>
> > > > > > > > > >> On Mon, Dec 10, 2018 at 7:42 PM Jason Gustafson <ja...@confluent.io>
> > > > > > > > > >> wrote:
> > > > > > > > > >>
> > > > > > > > > >> > Hey Eno,
> > > > > > > > > >> >
> > > > > > > > > >> > Thanks for the comments. However, I'm a bit confused. I'm not suggesting
> > > > > > > > > >> > we change Produce semantics in any way. All writes still go through the
> > > > > > > > > >> > partition leader and nothing changes with respect to committing to the
> > > > > > > > > >> > ISR. The main issue, as I've mentioned in the KIP, is the increased
> > > > > > > > > >> > latency before a committed offset is exposed on followers.
> > > > > > > > > >> >
> > > > > > > > > >> > Perhaps I have misunderstood your question?
> > > > > > > > > >> >
> > > > > > > > > >> > Thanks,
> > > > > > > > > >> > Jason
> > > > > > > > > >> >
> > > > > > > > > >> > On Mon, Dec 3, 2018 at 9:18 AM Eno Thereska <eno.there...@gmail.com>
> > > > > > > > > >> > wrote:
> > > > > > > > > >> >
> > > > > > > > > >> > > Hi Jason,
> > > > > > > > > >> > >
> > > > > > > > > >> > > This is an interesting KIP. This will have massive implications for
> > > > > > > > > >> > > consistency and serialization, since currently the leader for a partition
> > > > > > > > > >> > > serializes requests. A few questions for now:
> > > > > > > > > >> > >
> > > > > > > > > >> > > - before we deal with the complexity, it'd be great to see a crisp example
> > > > > > > > > >> > > in the motivation as to when this will have the most benefit for a
> > > > > > > > > >> > > customer. In particular, although the customer might have a multi-DC
> > > > > > > > > >> > > deployment, the DCs could still be close by in a region, so what is the
> > > > > > > > > >> > > expected best-case scenario for a performance gain? E.g., if all DCs are
> > > > > > > > > >> > > on the east coast, say. Right now it's not clear to me.
> > > > > > > > > >> > > - perhaps performance is not the right metric. Is the metric you are
> > > > > > > > > >> > > optimizing for latency, throughput or cross-DC cost? (I believe it is
> > > > > > > > > >> > > cross-DC cost from the KIP). Just wanted to double-check since I'm not
> > > > > > > > > >> > > sure latency would improve. Throughput could really improve from
> > > > > > > > > >> > > parallelism (especially in cases when there is mostly consuming going
> > > > > > > > > >> > > on). So it could be throughput as well.
> > > > > > > > > >> > > - the proposal would probably lead to choosing a more complex consistency.
> > > > > > > > > >> > > I tend to like the description Doug Terry has in his paper "Replicated Data
> > > > > > > > > >> > > Consistency Explained Through Baseball"
> > > > > > > > > >> > >
> > > > > > > > > >> > >
> > > > > > > > > >> >
> > > > > > > > > >>
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://www.microsoft.com/en-us/research/wp-content/uploads/2011/10/ConsistencyAndBaseballReport.pdf
> > > > > > > > > >> > > .
> > > > > > > > > >> > > To start with, could we get in scenarios where a
> > client
> > > > that
> > > > > > has
> > > > > > > > > both
> > > > > > > > > >> a
> > > > > > > > > >> > > producer and a consumer (e.g., Kafka streams)
> > produces a
> > > > > > record,
> > > > > > > > > then
> > > > > > > > > >> > > attempts to consume it back and the consume() comes
> > back
> > > > > with
> > > > > > > > > "record
> > > > > > > > > >> > does
> > > > > > > > > >> > > not exist"? That's fine, but could complicate
> > > application
> > > > > > > handling
> > > > > > > > > of
> > > > > > > > > >> > such
> > > > > > > > > >> > > scenarios.
> > > > > > > > > >> > >
> > > > > > > > > >> > > Thanks,
> > > > > > > > > >> > > Eno
> > > > > > > > > >> > >
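Eno's produce-then-consume scenario can be made concrete with a toy model (plain Python for illustration, not Kafka code): a follower can hold a committed record yet not expose it, because it learns the advanced high watermark only on a later fetch round-trip.

```python
# Toy model: why reading from a lagging follower can miss a freshly
# committed record (the read-your-writes gap Eno describes).

class Replica:
    def __init__(self):
        self.log = []            # appended records
        self.high_watermark = 0  # offsets below this are visible to consumers

    def fetch(self, offset):
        # Consumers may only read up to this replica's local high watermark.
        return self.log[offset:self.high_watermark]

leader, follower = Replica(), Replica()

# Producer writes record 0; once the ISR has it, the leader advances
# its high watermark immediately.
leader.log.append("record-0")
leader.high_watermark = 1

# The follower has replicated the record but has not yet learned the new
# high watermark (that arrives in its *next* fetch response).
follower.log.append("record-0")
follower.high_watermark = 0

assert leader.fetch(0) == ["record-0"]  # read via the leader: visible
assert follower.fetch(0) == []          # same read via the follower: empty

# After the follower's next fetch round-trip, the watermark catches up.
follower.high_watermark = 1
assert follower.fetch(0) == ["record-0"]
```

The record is never lost; it is only invisible on the follower for roughly one replication round-trip, which is the "increased latency before a committed offset is exposed on followers" that Jason mentions above.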
> > > > > > > > > >> > > On Mon, Dec 3, 2018 at 12:24 PM Mickael Maison <mickael.mai...@gmail.com> wrote:
> > > > > > > > > >> > >
> > > > > > > > > >> > > > Hi Jason,
> > > > > > > > > >> > > >
> > > > > > > > > >> > > > Very cool KIP!
> > > > > > > > > >> > > > A couple of questions:
> > > > > > > > > >> > > > - I'm guessing the selector will be invoked after each rebalance,
> > > > > > > > > >> > > > so every time the consumer is assigned a partition it will be
> > > > > > > > > >> > > > able to select it. Is that true?
> > > > > > > > > >> > > >
> > > > > > > > > >> > > > - From the selector API, I'm not sure how the consumer will be
> > > > > > > > > >> > > > able to address some of the choices mentioned in "Finding the
> > > > > > > > > >> > > > preferred follower", especially the available bandwidth and the
> > > > > > > > > >> > > > load balancing. By only having the list of Nodes, a consumer can
> > > > > > > > > >> > > > pick the nearest replica (assuming the rack field means anything
> > > > > > > > > >> > > > to users) or balance its own bandwidth, but that might not
> > > > > > > > > >> > > > necessarily mean improved performance or a balanced load on the
> > > > > > > > > >> > > > brokers.
> > > > > > > > > >> > > >
> > > > > > > > > >> > > > Thanks
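Mickael's point can be illustrated with a minimal sketch (hypothetical Python stand-in, not the KIP's actual Java ReplicaSelector interface): when the selector sees only a list of nodes, the rack field is the sole signal it can act on, so bandwidth- or load-aware selection would need inputs the API does not carry.

```python
# Sketch of rack-only replica selection, assuming each node exposes an
# optional rack string. Names here are illustrative, not from the KIP.
from collections import namedtuple

Node = namedtuple("Node", ["id", "rack"])

def select_preferred_replica(client_rack, leader, replicas):
    """Return a replica in the client's rack if one exists, else the leader."""
    for node in replicas:
        if node.rack is not None and node.rack == client_rack:
            return node
    return leader

leader = Node(id=1, rack="us-east-1a")
replicas = [leader, Node(id=2, rack="us-east-1b"), Node(id=3, rack="us-east-1c")]

assert select_preferred_replica("us-east-1b", leader, replicas).id == 2
assert select_preferred_replica("eu-west-1a", leader, replicas).id == 1
```

Note that nothing here can distinguish a lightly loaded same-rack replica from an overloaded one, which is exactly the gap Mickael raises.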
> > > > > > > > > >> > > > On Mon, Dec 3, 2018 at 11:35 AM Stanislav Kozlovski <stanis...@confluent.io> wrote:
> > > > > > > > > >> > > > >
> > > > > > > > > >> > > > > Hey Jason,
> > > > > > > > > >> > > > >
> > > > > > > > > >> > > > > This is certainly a very exciting KIP.
> > > > > > > > > >> > > > > I assume that no changes will be made to the offset commits and
> > > > > > > > > >> > > > > they will continue to be sent to the group coordinator?
> > > > > > > > > >> > > > >
> > > > > > > > > >> > > > > I also wanted to address metrics - have we considered any
> > > > > > > > > >> > > > > changes there? I imagine that it would be valuable for users to
> > > > > > > > > >> > > > > be able to differentiate between which consumers' partitions are
> > > > > > > > > >> > > > > fetched from replicas and which aren't. I guess that would need
> > > > > > > > > >> > > > > to be addressed both in the server's fetcher lag metrics and in
> > > > > > > > > >> > > > > the consumers.
> > > > > > > > > >> > > > >
> > > > > > > > > >> > > > > Thanks,
> > > > > > > > > >> > > > > Stanislav
> > > > > > > > > >> > > > >
> > > > > > > > > >> > > > > On Wed, Nov 28, 2018 at 10:08 PM Jun Rao <j...@confluent.io> wrote:
> > > > > > > > > >> > > > >
> > > > > > > > > >> > > > > > Hi, Jason,
> > > > > > > > > >> > > > > >
> > > > > > > > > >> > > > > > Thanks for the KIP. Looks good overall. A few minor comments
> > > > > > > > > >> > > > > > below.
> > > > > > > > > >> > > > > >
> > > > > > > > > >> > > > > > 1. The section on handling FETCH_OFFSET_TOO_LARGE errors says
> > > > > > > > > >> > > > > > "Use the OffsetForLeaderEpoch API to verify the current
> > > > > > > > > >> > > > > > position with the leader". The OffsetForLeaderEpoch request
> > > > > > > > > >> > > > > > returns the log end offset if the requested leader epoch is
> > > > > > > > > >> > > > > > the latest, so we won't know the true high watermark from that
> > > > > > > > > >> > > > > > request. It seems that the consumer still needs to send a
> > > > > > > > > >> > > > > > ListOffset request to the leader to obtain the high watermark?
> > > > > > > > > >> > > > > >
> > > > > > > > > >> > > > > > 2. If a non in-sync replica receives a fetch request from a
> > > > > > > > > >> > > > > > consumer, should it return a new type of error like
> > > > > > > > > >> > > > > > ReplicaNotInSync?
> > > > > > > > > >> > > > > >
> > > > > > > > > >> > > > > > 3. Could ReplicaSelector be closable?
> > > > > > > > > >> > > > > >
> > > > > > > > > >> > > > > > 4. Currently, the ISR propagation from the leader to the
> > > > > > > > > >> > > > > > controller can be delayed up to 60 secs through
> > > > > > > > > >> > > > > > ReplicaManager.IsrChangePropagationInterval. In that window,
> > > > > > > > > >> > > > > > the consumer could still be consuming from a non in-sync
> > > > > > > > > >> > > > > > replica. The relatively large delay is mostly for reducing the
> > > > > > > > > >> > > > > > ZK writes and the watcher overhead. Not sure what's the best
> > > > > > > > > >> > > > > > way to address this. We could potentially make this
> > > > > > > > > >> > > > > > configurable.
> > > > > > > > > >> > > > > >
> > > > > > > > > >> > > > > > 5. It may be worth mentioning that, to take advantage of
> > > > > > > > > >> > > > > > affinity, one may also want to have a customized
> > > > > > > > > >> > > > > > PartitionAssignor to have an affinity-aware assignment in
> > > > > > > > > >> > > > > > addition to a customized ReplicaSelector.
> > > > > > > > > >> > > > > >
> > > > > > > > > >> > > > > > Thanks,
> > > > > > > > > >> > > > > >
> > > > > > > > > >> > > > > > Jun
> > > > > > > > > >> > > > > >
> > > > > > > > > >> > > > > > On Wed, Nov 21, 2018 at 12:54 PM Jason Gustafson <ja...@confluent.io> wrote:
> > > > > > > > > >> > > > > >
> > > > > > > > > >> > > > > > > Hi All,
> > > > > > > > > >> > > > > > >
> > > > > > > > > >> > > > > > > I've posted a KIP to add the often-requested support for
> > > > > > > > > >> > > > > > > fetching from followers:
> > > > > > > > > >> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica
> > > > > > > > > >> > > > > > > Please take a look and let me know what you think.
> > > > > > > > > >> > > > > > >
> > > > > > > > > >> > > > > > > Thanks,
> > > > > > > > > >> > > > > > > Jason
> > > > > > > > > >> > > > > >
> > > > > > > > > >> > > > >
> > > > > > > > > >> > > > >
> > > > > > > > > >> > > > > --
> > > > > > > > > >> > > > > Best,
> > > > > > > > > >> > > > > Stanislav
