Hi, Jason,

Thanks for the updated KIP. Looks good overall. Just a few minor comments.

20. For case 2, if the consumer receives an OFFSET_NOT_AVAILABLE, I am
wondering if the consumer should refresh the metadata before retrying. This
can allow the consumer to switch to an in-sync replica sooner.

21. Under "protocol changes", there is a sentence "This allows the broker "
that seems broken.

4. About reducing the ISR propagation delay from the broker to the
controller. Jiangjie made that change in KAFKA-2722. Jiangjie, could you
comment on whether it's reasonable to reduce the propagation delay now?
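
To make comment 20 concrete, here is a minimal sketch of the retry behavior I
have in mind. It is only a simulation: FakeCluster, fetch_with_refresh and the
broker names are hypothetical stand-ins, not Kafka client APIs, and it assumes
a metadata refresh simply re-reads the current ISR.

```python
from dataclasses import dataclass

OFFSET_NOT_AVAILABLE = "OFFSET_NOT_AVAILABLE"
NONE = "NONE"


@dataclass
class FakeCluster:
    """Hypothetical stand-in for broker state; not a Kafka API."""
    isr: list    # replicas currently in sync, as reported by metadata
    ready: set   # replicas able to serve the requested offset

    def fetch(self, replica, offset):
        # A replica that cannot yet expose the offset returns a retriable error.
        if replica in self.ready:
            return NONE, f"record@{offset}"
        return OFFSET_NOT_AVAILABLE, None


def fetch_with_refresh(cluster, replica, offset, max_attempts=3):
    """Retry a fetch, refreshing metadata after each OFFSET_NOT_AVAILABLE."""
    tried = []
    for _ in range(max_attempts):
        tried.append(replica)
        error, record = cluster.fetch(replica, offset)
        if error == NONE:
            return record, tried
        # Refresh metadata before retrying: re-read the ISR and, if the
        # current replica has dropped out of it, switch to one still in it.
        isr = list(cluster.isr)
        if isr and replica not in isr:
            replica = isr[0]
    return None, tried


cluster = FakeCluster(isr=["broker-1"], ready={"broker-1"})
record, tried = fetch_with_refresh(cluster, "broker-2", offset=42)
```

Without the refresh, the consumer in this sketch would keep retrying broker-2
until its next periodic metadata update.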

Thanks,

Jun

On Wed, Jan 2, 2019 at 11:06 AM Jason Gustafson <ja...@confluent.io> wrote:

> Hey Jun,
>
> Sorry for the late reply. I have been giving your comments some thought.
> Replies below:
>
> > 1. The section on handling FETCH_OFFSET_TOO_LARGE error says "Use the
> > OffsetForLeaderEpoch API to verify the current position with the leader".
> > The OffsetForLeaderEpoch request returns log end offset if the request
> > leader epoch is the latest. So, we won't know the true high watermark from
> > that request. It seems that the consumer still needs to send ListOffset
> > request to the leader to obtain high watermark?
>
>
> That's a good point. I think we missed this in KIP-320. I've added a
> replica_id to the OffsetsForLeaderEpoch API to match the Fetch and
> ListOffsets API so that the broker can avoid exposing offsets beyond the
> high watermark. This also means that the OffsetsForLeaderEpoch API needs
> the same handling we added to the ListOffsets API to avoid non-monotonic or
> incorrect responses. Similarly, I've proposed using the
> OFFSET_NOT_AVAILABLE error code in cases where the end offset of an epoch
> would exceed the high watermark. When querying the latest epoch, the leader
> will return OFFSET_NOT_AVAILABLE until the high watermark has reached an
> offset in the leader's current epoch.
>
> By the way, I've modified the KIP to drop the OFFSET_TOO_LARGE and
> OFFSET_TOO_SMALL error codes that I initially proposed. I realized that we
> could continue to use the current OFFSET_OUT_OF_RANGE error and rely on the
> returned start offset to distinguish the two cases.
>
> > 2. If a non in-sync replica receives a fetch request from a consumer,
> > should it return a new type of error like ReplicaNotInSync?
>
>
> I gave this quite a bit of thought. It is impossible to avoid fetching from
> out-of-sync replicas in general due to propagation of the ISR state. The
> high watermark that is returned in fetch responses could be used as a more
> timely substitute, but we still can't assume that followers will always
> know when they are in-sync. From a high level, this means that the consumer
> anyway has to take out of range errors with a grain of salt if they come
> from followers. This is only a problem when switching between replicas or
> if resuming from a committed offset. If a consumer is following the same
> out-of-sync replica, then its position will stay in range and, other than
> some extra latency, no harm will be done.
>
> Furthermore, it may not be a good idea for consumers to chase the ISR too
> eagerly since this makes the performance profile harder to predict. The
> leader itself may have some temporarily increased request load which is
> causing followers to fall behind. If consumers then switched to the leader
> after they observed that the follower was out-of-sync, it may make the
> situation worse. Typically, if a follower has fallen out-of-sync, we expect
> it to catch back up shortly. It may be better in this scenario to allow
> consumers to continue fetching from it. On the other hand, if a follower
> stays out-of-sync for a while, the consumer should have the choice to find
> a new replica.
>
> So after thinking about it, I didn't see a lot of benefit in trying to be
> strict about ISR fetching. Potentially it even has downsides. Instead, I
> now see it as more of a heuristic which the consumer can use to keep
> end-to-end latency reasonably bounded. The consumer already has one knob
> the user can tune in order to limit this bound. The `metadata.max.age.ms`
> config controls how often metadata is refreshed. To follow the ISR more
> closely, the user can refresh metadata more frequently.
>
> Note that I've improved the section on out of range handling to be more
> explicit about the cases we needed to handle.
>
> > 3. Could ReplicaSelector be closable?
>
>
> Yes, I made this change. As an aside, the question of whether we should use
> a plugin does deserve a bit of discussion. An alternative (suggested by
> David Arthur) that I've been thinking about is to let the broker select the
> preferred follower to fetch from using the Metadata API. For example, we
> could add a `rackId` field to the Metadata API which could be provided
> through user configuration. The broker could then order the ISR list for
> each partition so that the preferred follower is returned first (currently
> the order is random). The consumer could then always fetch from the first
> replica in the ISR list. The benefit is that the broker may have a better
> view of the current load characteristics, so it may be able to make better
> decisions. Client plugins are also much more difficult to control. This may
> have been the point that Mickael was hinting at above.
>
> > 4. Currently, the ISR propagation from the leader to the controller can be
> > delayed up to 60 secs through ReplicaManager.IsrChangePropagationInterval.
> > In that window, the consumer could still be consuming from a non in-sync
> > replica. The relatively large delay is mostly for reducing the ZK writes
> > and the watcher overhead. Not sure what's the best way to address this. We
> > could potentially make this configurable.
>
>
> This is related to the discussion above. We could make it configurable I
> guess. I wonder if it would be reasonable to just reduce the default to
> something like 10 seconds. Do you think we get much benefit from such a
> long delay?
>
> > 5. It may be worth mentioning that, to take advantage of affinity, one may
> > also want to have a customized PartitionAssignor to have an affinity aware
> > assignment in addition to a customized ReplicaSelector.
>
>
> Yes, this is a good point. I was assuming a situation in which each
> partition had its replicas in all the same datacenters, but you are right
> that this need not be the case. I will mention this in the KIP and give it
> some more thought. I think in the common case, these concerns can be
> treated orthogonally, but it is a bit irritating if you need two separate
> plugins to make the benefit more general.
>
>
> Thanks,
> Jason
>
> On Tue, Dec 11, 2018 at 11:04 AM Jason Gustafson <ja...@confluent.io>
> wrote:
>
> > Hi Eno,
> >
> > Thanks for the clarification. From a high level, the main thing to keep in
> > mind is that this is an opt-in feature. It is a bit like using acks=1 in
> > the sense that a user is accepting slightly weaker guarantees in order to
> > optimize for some metric (in this case, read locality). The default
> > behavior would read only from the leader and users will get the usual
> > semantics. That said, let me address the scenarios you raised:
> >
> >> - scenario 1: an application that both produces and consumes (e.g., like
> >> Kafka streams) produces synchronously a single record to a topic and then
> >> attempts to consume that record. Topic is 3-way replicated say. Could it be
> >> the case that the produce succeeds but the consume fails? The consume could
> >> go to a replica that has not yet fetched the produce record, right? Or is
> >> that not possible?
> >
> >
> > I think it depends on what you mean by "fails." From a replica in the
> > ISR's perspective, it has all of the committed data. The only question is
> > what is safe to expose since the high watermark is always one round trip
> > behind. The proposal is to return a retriable error in this case so that
> > the consumer can distinguish the case from an out of range error and retry.
> > No error will be returned to the user, but consumption will be delayed. One
> > of the main improvements in the KIP is ensuring that this delay is minimal
> > in the common case.
> >
> > Note that even without follower fetching, this scenario is unavoidable.
> > When a replica becomes a leader, it doesn't know what the latest high
> > watermark is until it receives fetches from all followers in the ISR.
> > During this window, committed data is temporarily not visible. We handle
> > this similarly to what is proposed here. Basically we ask the consumer to
> > retry until we know the data is safe.
> >
> >> - scenario 2: an application C that only consumes. Again say there is only
> >> one record produced (by another application P) to a replicated topic and
> >> that record has not propagated to all replicas yet (it is only at the
> >> leader at time t0). Application C attempts to consume at time t1 and it
> >> does so successfully because the consume fetches from the leader. At time
> >> t2 the same application seeks to the beginning of the topic and attempts to
> >> consume again. Is there a scenario where this second attempt fails because
> >> the fetching happens from a replica that does not have the record yet? At
> >> time t3 all replicas have the record.
> >
> >
> > Yes, this is possible in the way that I described above. There is a
> > (typically short) window in which committed data may not be visible. It is
> > a bit like the partition itself being unavailable temporarily. The data has
> > not been lost and is guaranteed to be returned, but the consumer has to
> > wait until the follower knows it is safe to return.
> >
> > One final note: I am iterating on the design a little bit in order to
> > address Jun's comments. Expect a few changes. I realized that there is some
> > inconsistency with the current fetch behavior and KIP-207. It is mainly in
> > regard to how we handle the transition from becoming a follower to becoming
> > a leader.
> >
> > Thanks,
> > Jason
> >
> >
> >
> > On Tue, Dec 11, 2018 at 3:46 AM Eno Thereska <eno.there...@gmail.com>
> > wrote:
> >
> >> Hi Jason,
> >>
> >> My question was on producer + consumer semantics, not just the producer
> >> semantics. I'll rephrase it slightly and split into two questions:
> >> - scenario 1: an application that both produces and consumes (e.g., like
> >> Kafka streams) produces synchronously a single record to a topic and
> then
> >> attempts to consume that record. Topic is 3-way replicated say. Could it
> >> be
> >> the case that the produce succeeds but the consume fails? The consume
> >> could
> >> go to a replica that has not yet fetched the produce record, right? Or
> is
> >> that not possible?
> >> - scenario 2: an application C that only consumes. Again say there is
> only
> >> one record produced (by another application P) to a replicated topic and
> >> that record has not propagated to all replicas yet (it is only at the
> >> leader at time t0). Application C attempts to consume at time t1 and it
> >> does so successfully because the consume fetches from the leader. At
> time
> >> t2 the same application seeks to the beginning of the topic and attempts
> >> to
> >> consume again. Is there a scenario where this second attempt fails
> because
> >> the fetching happens from a replica that does not have the record yet?
> At
> >> time t3 all replicas have the record.
> >>
> >> Thanks
> >> Eno
> >>
> >>
> >>
> >>
> >> On Mon, Dec 10, 2018 at 7:42 PM Jason Gustafson <ja...@confluent.io>
> >> wrote:
> >>
> >> > Hey Eno,
> >> >
> >> > Thanks for the comments. However, I'm a bit confused. I'm not
> >> suggesting we
> >> > change Produce semantics in any way. All writes still go through the
> >> > partition leader and nothing changes with respect to committing to the
> >> ISR.
> >> > The main issue, as I've mentioned in the KIP, is the increased latency
> >> > before a committed offset is exposed on followers.
> >> >
> >> > Perhaps I have misunderstood your question?
> >> >
> >> > Thanks,
> >> > Jason
> >> >
> >> > On Mon, Dec 3, 2018 at 9:18 AM Eno Thereska <eno.there...@gmail.com>
> >> > wrote:
> >> >
> >> > > Hi Jason,
> >> > >
> >> > > This is an interesting KIP. This will have massive implications for
> >> > > consistency and serialization, since currently the leader for a
> >> partition
> >> > > serializes requests. A few questions for now:
> >> > >
> >> > > - before we deal with the complexity, it'd be great to see a crisp
> >> > example
> >> > > in the motivation as to when this will have the most benefit for a
> >> > > customer. In particular, although the customer might have a multi-DC
> >> > > deployment, the DCs could still be close by in a region, so what is
> >> the
> >> > > expected best-case scenario for a performance gain? E.g., if all DCs
> >> are
> >> > on
> >> > > the east coast, say. Right now it's not clear to me.
> >> > > - perhaps performance is not the right metric. Is the metric you are
> >> > > optimizing for latency, throughput or cross-DC cost? (I believe it
> is
> >> > > cross-DC cost from the KIP). Just wanted to double-check since I'm
> not
> >> > sure
> >> > > latency would improve. Throughput could really improve from
> >> parallelism
> >> > > (especially in cases when there is mostly consuming going on). So it
> >> > could
> >> > > be throughput as well.
> >> > > - the proposal would probably lead to choosing a more complex
> >> > consistency.
> >> > > I tend to like the description Doug Terry has in his paper
> "Replicated
> >> > Data
> >> > > Consistency Explained Through Baseball"
> >> > >
> >> > >
> >> >
> >>
> https://www.microsoft.com/en-us/research/wp-content/uploads/2011/10/ConsistencyAndBaseballReport.pdf
> >> > > .
> >> > > To start with, could we get in scenarios where a client that has
> both
> >> a
> >> > > producer and a consumer (e.g., Kafka streams) produces a record,
> then
> >> > > attempts to consume it back and the consume() comes back with
> "record
> >> > does
> >> > > not exist"? That's fine, but could complicate application handling
> of
> >> > such
> >> > > scenarios.
> >> > >
> >> > > Thanks,
> >> > > Eno
> >> > >
> >> > > On Mon, Dec 3, 2018 at 12:24 PM Mickael Maison <
> >> mickael.mai...@gmail.com
> >> > >
> >> > > wrote:
> >> > >
> >> > > > Hi Jason,
> >> > > >
> >> > > > Very cool KIP!
> >> > > > A couple of questions:
> >> > > > - I'm guessing the selector will be invoked after each rebalance so
> >> > > > every time the consumer is assigned a partition it will be able to
> >> > > > select it. Is that true?
> >> > > >
> >> > > > - From the selector API, I'm not sure how the consumer will be
> able
> >> to
> >> > > > address some of the choices mentioned in "Finding the preferred
> >> > > > follower". Especially the available bandwidth and the load
> >> balancing.
> >> > > > By only having the list of Nodes, a consumer can pick the nearest
> >> > > > replica (assuming the rack field means anything to users) or
> balance
> >> > > > its own bandwidth but that might not necessarily mean improved
> >> > > > performance or a balanced load on the brokers.
> >> > > >
> >> > > > Thanks
> >> > > > On Mon, Dec 3, 2018 at 11:35 AM Stanislav Kozlovski
> >> > > > <stanis...@confluent.io> wrote:
> >> > > > >
> >> > > > > Hey Jason,
> >> > > > >
> >> > > > > This is certainly a very exciting KIP.
> >> > > > > I assume that no changes will be made to the offset commits and
> >> they
> >> > > will
> >> > > > > continue to be sent to the group coordinator?
> >> > > > >
> >> > > > > I also wanted to address metrics - have we considered any
> changes
> >> > > there?
> >> > > > I
> >> > > > > imagine that it would be valuable for users to be able to
> >> > differentiate
> >> > > > > between which consumers' partitions are fetched from replicas
> and
> >> > which
> >> > > > > aren't. I guess that would need to be addressed both in the
> >> server's
> >> > > > > fetcher lag metrics and in the consumers.
> >> > > > >
> >> > > > > Thanks,
> >> > > > > Stanislav
> >> > > > >
> >> > > > > On Wed, Nov 28, 2018 at 10:08 PM Jun Rao <j...@confluent.io>
> >> wrote:
> >> > > > >
> >> > > > > > Hi, Jason,
> >> > > > > >
> >> > > > > > Thanks for the KIP. Looks good overall. A few minor comments
> >> below.
> >> > > > > >
> >> > > > > > 1. The section on handling FETCH_OFFSET_TOO_LARGE error says
> >> "Use
> >> > the
> >> > > > > > OffsetForLeaderEpoch API to verify the current position with
> the
> >> > > > leader".
> >> > > > > > The OffsetForLeaderEpoch request returns log end offset if the
> >> > > request
> >> > > > > > leader epoch is the latest. So, we won't know the true high
> >> > watermark
> >> > > > from
> >> > > > > > that request. It seems that the consumer still needs to send
> >> > > ListOffset
> >> > > > > > request to the leader to obtain high watermark?
> >> > > > > >
> >> > > > > > 2. If a non in-sync replica receives a fetch request from a
> >> > consumer,
> >> > > > > > should it return a new type of error like ReplicaNotInSync?
> >> > > > > >
> >> > > > > > 3. Could ReplicaSelector be closable?
> >> > > > > >
> >> > > > > > 4. Currently, the ISR propagation from the leader to the
> >> controller
> >> > > > can be
> >> > > > > > delayed up to 60 secs through
> >> > > > ReplicaManager.IsrChangePropagationInterval.
> >> > > > > > In that window, the consumer could still be consuming from a
> non
> >> > > > in-sync
> >> > > > > > replica. The relatively large delay is mostly for reducing the
> >> ZK
> >> > > > writes
> >> > > > > > and the watcher overhead. Not sure what's the best way to
> >> address
> >> > > > this. We
> >> > > > > > could potentially make this configurable.
> >> > > > > >
> >> > > > > > 5. It may be worth mentioning that, to take advantage of
> >> affinity,
> >> > > one
> >> > > > may
> >> > > > > > also want to have a customized PartitionAssignor to have an
> >> > affinity
> >> > > > aware
> >> > > > > > assignment in addition to a customized ReplicaSelector.
> >> > > > > >
> >> > > > > > Thanks,
> >> > > > > >
> >> > > > > > Jun
> >> > > > > >
> >> > > > > > On Wed, Nov 21, 2018 at 12:54 PM Jason Gustafson <
> >> > ja...@confluent.io
> >> > > >
> >> > > > > > wrote:
> >> > > > > >
> >> > > > > > > Hi All,
> >> > > > > > >
> >> > > > > > > I've posted a KIP to add the often-requested support for
> >> fetching
> >> > > > from
> >> > > > > > > followers:
> >> > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > >
> >> > >
> >> >
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica
> >> > > > > > > .
> >> > > > > > > Please take a look and let me know what you think.
> >> > > > > > >
> >> > > > > > > Thanks,
> >> > > > > > > Jason
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > > >
> >> > > > > --
> >> > > > > Best,
> >> > > > > Stanislav
> >> > > >
> >> > >
> >> >
> >>
> >
>
