>>I'm wondering, rather than putting "acceptable lag" into the
configuration at all, or even making it a parameter on `allMetadataForKey`,
why not just _always_ return all available metadata (including
active/standby or lag) and let the caller decide to which node they want to
route the query?
+1 on exposing lag information via the APIs. IMO, without continuously
updated/fresh lag information, its value as a signal for query routing
decisions is quite limited. But we can design the API around this model and
iterate? Longer term, we should have continuously shared lag information.
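
To make the "let the caller decide" model concrete, here is a rough sketch of
what caller-side routing could look like once lag is returned with the
metadata. `HostLag` and `QueryRouter` are hypothetical names for illustration
only, not existing or proposed Kafka Streams classes, and the lag values are
assumed to be whatever the local instance last learned (possibly stale).

```java
import java.util.List;
import java.util.Optional;

// Hypothetical view of one host's metadata for a key; not a Kafka Streams class.
final class HostLag {
    final String host;
    final boolean active;   // true if this host runs the active task
    final long offsetLag;   // last known lag in offsets (may itself be stale)

    HostLag(String host, boolean active, long offsetLag) {
        this.host = host;
        this.active = active;
        this.offsetLag = offsetLag;
    }
}

final class QueryRouter {
    // Returns the least-lagging host whose lag is within maxLag, preferring
    // the active host when lags are equal. Empty if no host qualifies.
    static Optional<HostLag> route(List<HostLag> candidates, long maxLag) {
        HostLag best = null;
        for (HostLag c : candidates) {
            if (c.offsetLag > maxLag) {
                continue; // too far behind for this caller
            }
            if (best == null
                    || c.offsetLag < best.offsetLag
                    || (c.offsetLag == best.offsetLag && c.active && !best.active)) {
                best = c;
            }
        }
        return Optional.ofNullable(best);
    }
}
```

A caller that prefers errors to stale data would pass a small `maxLag` and
treat an empty result as "unavailable"; a caller that prefers availability
would pass a larger bound.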

>>more general to refactor it to "allMetadataForKey(long
tolerableDataStaleness, ...)", and when it's set to 0 it means "active task
only".
+1. IMO, if we plan on having `enableReplicaServing`, it makes sense to
generalize based on dataStaleness. This seems complementary to exposing the
lag information itself.
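
A minimal sketch of the "0 means active task only" semantics, under the
assumption that each copy of the store carries some staleness estimate in
milliseconds. `Candidate`, `StalenessFilter`, and `withinStaleness` are
made-up names for illustration; how staleness would actually be measured
(offsets vs. timestamps) is still open in this thread.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical candidate for serving a key; not part of the Kafka Streams API.
final class Candidate {
    final String host;
    final boolean active;
    final long stalenessMs; // assumed estimate of how far behind this copy is

    Candidate(String host, boolean active, long stalenessMs) {
        this.host = host;
        this.active = active;
        this.stalenessMs = stalenessMs;
    }
}

final class StalenessFilter {
    // Active hosts are always returned first. Standbys are added only when
    // the caller tolerates staleness (> 0) and the standby is within the bound,
    // so a tolerance of 0 yields the active task only.
    static List<Candidate> withinStaleness(long tolerableDataStalenessMs,
                                           List<Candidate> all) {
        List<Candidate> result = new ArrayList<>();
        for (Candidate c : all) {
            if (c.active) {
                result.add(c);
            }
        }
        if (tolerableDataStalenessMs > 0) {
            for (Candidate c : all) {
                if (!c.active && c.stalenessMs <= tolerableDataStalenessMs) {
                    result.add(c);
                }
            }
        }
        return result;
    }
}
```

With this shape, `enableReplicaServing=false` corresponds to a tolerance of 0,
and `enableReplicaServing=true` corresponds to an effectively unbounded one.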

>>This is actually not a public api change at all, and I'm planning to
implement it asap as a precursor to the rest of KIP-441
+1 again. Do we have a concrete timeline for when this change will land on
master? I would like to get the implementation wrapped up (as much as
possible) by end of the month :). But I agree this sequencing makes
sense.


On Mon, Oct 21, 2019 at 2:56 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Hi Navinder,
>
> Thanks for the KIP, I have a high level question about the proposed API
> regarding:
>
> "StreamsMetadataState::allMetadataForKey(boolean enableReplicaServing...)"
>
> I'm wondering if it's more general to refactor it to
> "allMetadataForKey(long tolerableDataStaleness, ...)", and when it's set to
> 0 it means "active task only". Behind the scenes, we can have the committed
> offsets encode the stream time as well, so that when processing standby
> tasks the stream process knows not only the lag in terms of offsets
> compared to the committed offset (internally we call it offset limit), but
> also the lag in terms of timestamp diff compared to the committed offset.
>
> Also, encoding the timestamp as part of the offset has other benefits for
> improving Kafka Streams time semantics as well, but for KIP-535 itself I
> think it can help give users a more intuitive interface to reason about.
>
>
> Guozhang
>
> On Mon, Oct 21, 2019 at 12:30 PM John Roesler <j...@confluent.io> wrote:
>
> > Hey Navinder,
> >
> > Thanks for the KIP! I've been reading over the discussion thus far,
> > and I have a couple of thoughts to pile on as well:
> >
> > It seems confusing to propose the API in terms of the current system
> > state, but also propose how the API would look if/when KIP-441 is
> > implemented. It occurs to me that the only part of KIP-441 that would
> > affect you is the availability of the lag information in the
> > SubscriptionInfo message. This is actually not a public api change at
> > all, and I'm planning to implement it asap as a precursor to the rest
> > of KIP-441, so maybe you can just build on top of KIP-441 and assume
> > the lag information will be available. Then you could have a more
> > straightforward proposal (e.g., mention that you'd return the lag
> > information in AssignmentInfo as well as in the StreamsMetadata in
> > some form, or make use of it in the API somehow).
> >
> > I'm partially motivated in that former point because it seems like
> > understanding how callers would bound the staleness for their use case
> > is _the_ key point for this KIP. FWIW, I think that adding a new
> > method to StreamsMetadataState and deprecating the existing method is
> > the best way to go; we just can't change the return types of any
> > existing methods.
> >
> > I'm wondering, rather than putting "acceptable lag" into the
> > configuration at all, or even making it a parameter on
> > `allMetadataForKey`, why not just _always_ return all available
> > metadata (including active/standby or lag) and let the caller decide
> > to which node they want to route the query? This method isn't making
> > any queries itself; it's merely telling you where the local Streams
> > instance _thinks_ the key in question is located. Just returning all
> > available information lets the caller implement any semantics they
> > desire around querying only active stores, or standbys, or recovering
> > stores, or whatever.
> >
> > One fly in the ointment, which you may wish to consider if proposing
> > to use lag information, is that the cluster would only become aware of
> > new lag information during rebalances. Even in the full expression of
> > KIP-441, this information would stop being propagated when the cluster
> > achieves a balanced task distribution. There was a follow-on idea I
> > POCed to continuously share lag information in the heartbeat protocol,
> > which you might be interested in, if you want to make sure that nodes
> > are basically _always_ aware of each others' lag on different
> > partitions: https://github.com/apache/kafka/pull/7096
> >
> > Thanks again!
> > -John
> >
> >
> > On Sat, Oct 19, 2019 at 6:06 AM Navinder Brar
> > <navinder_b...@yahoo.com.invalid> wrote:
> > >
> > > > Thanks, Vinoth. Looks like we are on the same page. I will add some of
> > > > these explanations to the KIP as well. Have assigned the KAFKA-6144 to
> > > > myself and KAFKA-8994 is closed (by you). As suggested, we will replace
> > > > "replica" with "standby".
> > > >
> > > > In the new API, "StreamsMetadataState::allMetadataForKey(boolean
> > > > enableReplicaServing, String storeName, K key, Serializer<K>
> > > > keySerializer)" Do we really need a per key configuration? or a new
> > > > StreamsConfig is good enough?>> Coming from experience, when teams are
> > > > building a platform with Kafka Streams and these APIs serve data to
> > > > multiple teams, we can't have a generalized config that says as a
> > > > platform we will support stale reads or not. It should be the choice of
> > > > someone who is calling the APIs to choose whether they are ok with stale
> > > > reads or not. Makes sense?
> > >     On Thursday, 17 October, 2019, 11:56:02 pm IST, Vinoth Chandar <
> > vchan...@confluent.io> wrote:
> > >
> > >  Looks like we are covering ground :)
> > >
> > > > >>Only if it is within a permissible range (say 10000) we will serve
> > > > from Restoring state of active.
> > > > +1 on having a knob like this. My reasoning is as follows.
> > >
> > > > Looking at the Streams state as a read-only distributed kv store. With
> > > > num_standby = f, we should be able to tolerate f failures, and if there
> > > > is an (f+1)th failure, the system should be unavailable.
> > > >
> > > > A) So with num_standby=0, the system should be unavailable even if there
> > > > is 1 failure, and that's my argument for not allowing querying in
> > > > restoration state, esp. since in this case it will be a total rebuild of
> > > > the state (which IMO cannot be considered a normal fault-free
> > > > operational state).
> > > >
> > > > B) Even if there are standbys, say num_standby=2, if the user decides to
> > > > shut down all 3 instances, then the only outcome should be
> > > > unavailability until all of them come back or state is rebuilt on other
> > > > nodes in the cluster. In normal operations, f <= 2, and when a failure
> > > > does happen we can then either choose C over A and fail IQs until
> > > > replication is fully caught up, or choose A over C by serving in
> > > > restoring state as long as lag is minimal. If even with f=1, say, all
> > > > the standbys are lagging a lot due to some issue, then that should be
> > > > considered a failure since that is different from the normal/expected
> > > > operational mode. Serving reads with unbounded replication lag and
> > > > calling it "available" may not be very usable or even desirable :)
> > > > IMHO, since it gives the user no way to reason about the app that is
> > > > going to query this store.
> > >
> > > > So there is definitely a need to distinguish between: replication
> > > > catch-up while in a fault-free state vs. restoration of state when we
> > > > lose more than f standbys. This knob is a great starting point towards
> > > > this.
> > > >
> > > > If you agree with some of the explanation above, please feel free to
> > > > include it in the KIP as well since this is sort of our design
> > > > principle here.
> > >
> > > Small nits :
> > >
> > > > - let's standardize on "standby" instead of "replica", KIP or code, to
> > > >   be consistent with rest of Streams code/docs?
> > > > - Can we merge KAFKA-8994 into KAFKA-6144 now and close the former?
> > > >   Eventually need to consolidate KAFKA-6555 as well
> > > > - In the new API, "StreamsMetadataState::allMetadataForKey(boolean
> > > >   enableReplicaServing, String storeName, K key, Serializer<K>
> > > >   keySerializer)" Do we really need a per key configuration? or a new
> > > >   StreamsConfig is good enough?
> > >
> > > On Wed, Oct 16, 2019 at 8:31 PM Navinder Brar
> > > <navinder_b...@yahoo.com.invalid> wrote:
> > >
> > > > > @Vinoth, I have incorporated a few of the discussions we have had in
> > > > > the KIP.
> > > > >
> > > > > In the current code, t0 and t1 serve queries from Active(Running)
> > > > > partition. For case t2, we are planning to return
> > > > > List<StreamsMetadata> such that it returns <StreamsMetadata(A),
> > > > > StreamsMetadata(B)> so that if IQ fails on A, the replica on B can
> > > > > serve the data by enabling serving from replicas. This still does not
> > > > > solve case t3 and t4, since B has been promoted to active but it is in
> > > > > Restoring state to catch up till A’s last committed position, as we
> > > > > don’t serve from Restoring state in Active, and the new Replica on R
> > > > > is building itself from scratch. Both these cases can be solved if we
> > > > > start serving from Restoring state of active as well, since it is
> > > > > almost equivalent to previous Active.
> > > > >
> > > > > There could be a case where all replicas of a partition become
> > > > > unavailable and active and all replicas of that partition are building
> > > > > themselves from scratch; in this case, the state in Active is far
> > > > > behind even though it is in Restoring state. To cater to such cases,
> > > > > so that we don’t serve from this state, we can either add another
> > > > > state before Restoring or check the difference between last committed
> > > > > offset and current position. Only if it is within a permissible range
> > > > > (say 10000) will we serve from the Restoring state of Active.
> > > >
> > > >
> > > >    On Wednesday, 16 October, 2019, 10:01:35 pm IST, Vinoth Chandar <
> > > > vchan...@confluent.io> wrote:
> > > >
> > > >  Thanks for the updates on the KIP, Navinder!
> > > >
> > > > Few comments
> > > >
> > > > > - AssignmentInfo is not public API? But we will change it and thus
> > > > >   need to increment the version and test for version_probing etc. Good
> > > > >   to separate that from StreamsMetadata changes (which is public API)
> > > > > - From what I see, there is going to be a choice between the following
> > > > >
> > > > >  A) introducing a new *KafkaStreams::allMetadataForKey()* API that
> > > > > potentially returns List<StreamsMetadata> ordered from most up-to-date
> > > > > to least up-to-date replicas. Today we cannot fully implement this
> > > > > ordering, since all we know is which hosts are active and which are
> > > > > standbys. However, this aligns well with the future. KIP-441 adds the
> > > > > lag information to the rebalancing protocol. We could also sort
> > > > > replicas based on the reported lags eventually. This is fully
> > > > > backwards compatible with existing clients. Only drawback I see is the
> > > > > naming of the existing method KafkaStreams::metadataForKey, not
> > > > > conveying the distinction that it simply returns the active replica,
> > > > > i.e. allMetadataForKey.get(0).
> > > > >  B) Change KafkaStreams::metadataForKey() to return a List. It's a
> > > > > breaking change.
> > > > >
> > > > > I prefer A, since none of the semantics/behavior changes for existing
> > > > > users. Love to hear more thoughts. Can we also work this into the KIP?
> > > > > I already implemented A to unblock myself for now. Seems feasible to
> > > > > do.
> > > >
> > > >
> > > > > On Tue, Oct 15, 2019 at 12:21 PM Vinoth Chandar
> > > > > <vchan...@confluent.io> wrote:
> > > >
> > > > > > >>I get your point. But suppose there is a replica which has just
> > > > > > become active, so in that case replica will still be building itself
> > > > > > from scratch and this active will go to restoring state till it
> > > > > > catches up with previous active, wouldn't serving from a restoring
> > > > > > active make more sense than a replica in such case.
> > > > > >
> > > > > > KIP-441 will change this behavior such that promotion to active
> > > > > > happens based on how caught up a replica is. So, once we have that
> > > > > > (work underway already for 2.5 IIUC) and user sets
> > > > > > num.standby.replicas > 0, then the staleness window should not be
> > > > > > that long as you describe. IMO if user wants availability for state,
> > > > > > then should configure num.standby.replicas > 0. If not, then on a
> > > > > > node loss, few partitions would be unavailable for a while (there
> > > > > > are other ways to bring this window down, which I won't bring in
> > > > > > here). We could argue for querying a restoring active (say a new
> > > > > > node added to replace a faulty old node) based on AP vs CP
> > > > > > principles. But not sure reading really really old values for the
> > > > > > sake of availability is useful. No AP data system would be
> > > > > > inconsistent for such a long time in practice.
> > > > > >
> > > > > > So, I still feel just limiting this to standby reads provides best
> > > > > > semantics.
> > > > >
> > > > > Just my 2c. Would love to see what others think as well.
> > > > >
> > > > > On Tue, Oct 15, 2019 at 5:34 AM Navinder Brar
> > > > > <navinder_b...@yahoo.com.invalid> wrote:
> > > > >
> > > > > >> Hi Vinoth,
> > > > > >> Thanks for the feedback.
> > > > > >>  Can we link the JIRA, discussion thread also to the KIP.>> Added.
> > > > > >> Based on the discussion on KAFKA-6144, I was under the impression
> > > > > >> that this KIP is also going to cover exposing of the standby
> > > > > >> information in StreamsMetadata and thus subsume KAFKA-8994. That
> > > > > >> would require a public API change?>> Sure, I can add changes for
> > > > > >> 8994 in this KIP and link KAFKA-6144 to KAFKA-8994 as well.
> > > > > >>  KIP seems to be focussing on restoration when a new node is added.
> > > > > >> KIP-441 is underway and has some major changes proposed for this. It
> > > > > >> would be good to clarify dependencies if any. Without KIP-441, I am
> > > > > >> not very sure if we should allow reads from nodes in RESTORING
> > > > > >> state, which could amount to many minutes/few hours of stale reads?
> > > > > >> This is different from allowing querying standby replicas, which
> > > > > >> could be mostly caught up and the staleness window could be much
> > > > > >> smaller/tolerable. (once again the focus on KAFKA-8994).>> I get
> > > > > >> your point. But suppose there is a replica which has just become
> > > > > >> active, so in that case replica will still be building itself from
> > > > > >> scratch and this active will go to restoring state till it catches
> > > > > >> up with previous active, wouldn't serving from a restoring active
> > > > > >> make more sense than a replica in such case.
> > > > > >>
> > > > > >> Finally, we may need to introduce a configuration to control this.
> > > > > >> Some users may prefer errors to stale data. Can we also add it to
> > > > > >> the KIP?>> Will add this.
> > > > >>
> > > > >> Regards,
> > > > >> Navinder
> > > > >>
> > > > >>
> > > > > >> On 2019/10/14 16:56:49, Vinoth Chandar <v...@confluent.io> wrote:
> > > > > >>
> > > > > >> > Hi Navinder,
> > > > > >> >
> > > > > >> > Thanks for sharing the KIP! Few thoughts
> > > > > >> >
> > > > > >> > - Can we link the JIRA, discussion thread also to the KIP
> > > > > >> > - Based on the discussion on KAFKA-6144, I was under the
> > > > > >> >   impression that this KIP is also going to cover exposing of the
> > > > > >> >   standby information in StreamsMetadata and thus subsume
> > > > > >> >   KAFKA-8994. That would require a public API change?
> > > > > >> > - KIP seems to be focussing on restoration when a new node is
> > > > > >> >   added. KIP-441 is underway and has some major changes proposed
> > > > > >> >   for this. It would be good to clarify dependencies if any.
> > > > > >> >   Without KIP-441, I am not very sure if we should allow reads from
> > > > > >> >   nodes in RESTORING state, which could amount to many minutes/few
> > > > > >> >   hours of stale reads? This is different from allowing querying
> > > > > >> >   standby replicas, which could be mostly caught up and the
> > > > > >> >   staleness window could be much smaller/tolerable. (once again the
> > > > > >> >   focus on KAFKA-8994)
> > > > > >> > - Finally, we may need to introduce a configuration to control
> > > > > >> >   this. Some users may prefer errors to stale data. Can we also add
> > > > > >> >   it to the KIP?
> > > > > >> >
> > > > > >> > Thanks
> > > > > >> > Vinoth
> > > > > >> >
> > > > > >> > On Sun, Oct 13, 2019 at 3:31 PM Navinder Brar
> > > > > >> > <na...@yahoo.com.invalid> wrote:
> > > > > >> >
> > > > > >> >> Hi,
> > > > > >> >> Starting a discussion on the KIP to Allow state stores to serve
> > > > > >> >> stale reads during rebalance
> > > > > >> >> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance).
> > > > > >> >> Thanks & Regards,
> > > > > >> >> Navinder
> > > > >> >
> > > > >
> > > > >
> > > >
> >
>
>
> --
> -- Guozhang
>
