Hey Navinder,

Thanks for updating the KIP, it's a lot easier to see the current
state of the proposal now.

A few remarks:
1. I'm sure it was just an artifact of revisions, but you have two
separate sections where you list additions to the KafkaStreams
interface. Can you consolidate those so we can see all the additions
at once?

2. For messageLagEstimate, can I suggest "offsetLagEstimate" instead,
to be clearer that we're specifically measuring a number of offsets?
If you don't immediately agree, then I'd at least point out that we
usually refer to elements of Kafka topics as "records", not
"messages", so "recordLagEstimate" might be more appropriate.

3. The proposal mentions adding a map of the standby _partitions_ for
each host to AssignmentInfo. I assume this is designed to mirror the
existing "partitionsByHost" map. To keep the size of these metadata
messages down, maybe we can consider making two changes:
(a) for both actives and standbys, encode the _task ids_ instead of
_partitions_. Every member of the cluster has a copy of the topology,
so they can convert task ids into specific partitions on their own,
and task ids are usually only three characters.
(b) instead of encoding two maps (hostinfo -> actives AND hostinfo ->
standbys), which requires serializing all the hostinfos twice, maybe
we can pack them together in one map with a structured value (hostinfo
-> [actives,standbys]).
Both of these ideas still require bumping the protocol version to 6,
and they basically mean we drop the existing `PartitionsByHost` field
and add a new `TasksByHost` field with the structured value I
mentioned.

4. Can we avoid adding the new "lag refresh" config? The lags would
necessarily be approximate anyway, so adding the config seems to
increase the operational complexity of the system for little actual
benefit.
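
To make remark 3 a bit more concrete, here is a rough, self-contained
sketch (Java 16+ for records) comparing the two layouts. Everything in
it is an assumption for illustration: the TaskId, HostInfo and HostTasks
records, the method names, and the byte layout are hypothetical
stand-ins, not the real Kafka Streams classes or the actual
AssignmentInfo wire format.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: hypothetical types and layout, not the real AssignmentInfo.
public class TasksByHostSketch {

    // Stand-in for a task id such as "0_1" (sub-topology id, partition number).
    record TaskId(int subtopology, int partition) {}

    // Stand-in for the host and port of one Streams instance.
    record HostInfo(String host, int port) {}

    // Structured value: the active and standby task ids assigned to one host.
    record HostTasks(List<TaskId> actives, List<TaskId> standbys) {}

    private static void writeHost(DataOutputStream out, HostInfo h) throws IOException {
        byte[] name = h.host().getBytes(StandardCharsets.UTF_8);
        out.writeInt(name.length);
        out.write(name);
        out.writeInt(h.port());
    }

    private static void writeTasks(DataOutputStream out, List<TaskId> tasks) throws IOException {
        out.writeInt(tasks.size());
        for (TaskId t : tasks) {
            out.writeInt(t.subtopology());
            out.writeInt(t.partition());
        }
    }

    // Idea (b): one map, hostinfo -> [actives, standbys]; each host is written once.
    static byte[] encodeCombined(Map<HostInfo, HostTasks> tasksByHost) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(tasksByHost.size());
            for (Map.Entry<HostInfo, HostTasks> e : tasksByHost.entrySet()) {
                writeHost(out, e.getKey());
                writeTasks(out, e.getValue().actives());
                writeTasks(out, e.getValue().standbys());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Baseline: two maps (actives, then standbys); each host is written twice.
    static byte[] encodeSeparate(Map<HostInfo, HostTasks> tasksByHost) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(tasksByHost.size());
            for (Map.Entry<HostInfo, HostTasks> e : tasksByHost.entrySet()) {
                writeHost(out, e.getKey());
                writeTasks(out, e.getValue().actives());
            }
            out.writeInt(tasksByHost.size());
            for (Map.Entry<HostInfo, HostTasks> e : tasksByHost.entrySet()) {
                writeHost(out, e.getKey());
                writeTasks(out, e.getValue().standbys());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Made-up two-host assignment used only to compare the encodings.
    static Map<HostInfo, HostTasks> sampleAssignment() {
        Map<HostInfo, HostTasks> m = new LinkedHashMap<>();
        m.put(new HostInfo("host-a", 8080),
              new HostTasks(List.of(new TaskId(0, 0), new TaskId(0, 1)), List.of(new TaskId(0, 2))));
        m.put(new HostInfo("host-b", 8080),
              new HostTasks(List.of(new TaskId(0, 2)), List.of(new TaskId(0, 0), new TaskId(0, 1))));
        return m;
    }

    public static void main(String[] args) {
        Map<HostInfo, HostTasks> m = sampleAssignment();
        System.out.println("combined bytes: " + encodeCombined(m).length);
        System.out.println("separate bytes: " + encodeSeparate(m).length);
    }
}
```

In this toy layout the combined form saves one copy of every host entry
plus one map-size field, and encoding task ids as two ints is far
smaller than spelling out topic names and partitions. The real savings
would depend on the actual serialization format.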

Thanks for the pseudocode, by the way, it really helps visualize how
these new interfaces would play together. And thanks again for the
update!
-John

On Thu, Oct 31, 2019 at 2:41 PM John Roesler <j...@confluent.io> wrote:
>
> Hey Vinoth,
>
> I started going over the KIP again yesterday. There are a lot of
> updates, and I didn't finish my feedback in one day. I'm working on it
> now.
>
> Thanks,
> John
>
> On Thu, Oct 31, 2019 at 11:42 AM Vinoth Chandar <vchan...@confluent.io> wrote:
> >
> > Wondering if anyone has thoughts on these changes? I liked that the new
> > metadata fetch APIs provide all the information at once with consistent
> > naming..
> >
> > Any guidance on what you would like to be discussed or fleshed out more
> > before we call a VOTE?
> >
> > On Wed, Oct 30, 2019 at 10:31 AM Navinder Brar
> > <navinder_b...@yahoo.com.invalid> wrote:
> >
> > > Hi,
> > > > We have made some edits in the KIP (
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance)
> > > > after due deliberation on the agreed design to support the new query
> > > > design. This includes the new public API to query offset/time lag
> > > > information and other details related to querying standby tasks, which
> > > > came up as we worked through the details.
> > >
> > >
> > >
> > > >    - Addition of a new config, “lag.fetch.interval.ms”, to configure the
> > > > interval at which time/offset lag is fetched
> > > >    - Addition of a new class StoreLagInfo to store the periodically obtained
> > > > time/offset lag
> > > >    - Addition of two new functions in KafkaStreams, List<StoreLagInfo>
> > > > allLagInfo() and List<StoreLagInfo> lagInfoForStore(String storeName), to
> > > > return the lag information for an instance and a store respectively
> > > >    - Addition of a new class KeyQueryMetadata. We need the topicPartition
> > > > for each key to be matched with the lag API for the topic partition. One
> > > > way is to add new functions and fetch topicPartition from
> > > > StreamsMetadataState, but we thought having one call that fetches both
> > > > StreamsMetadata and topicPartition is cleaner.
> > > >    - Renaming partitionsForHost to activePartitionsForHost in
> > > > StreamsMetadataState and partitionsByHostState to
> > > > activePartitionsByHostState in StreamsPartitionAssignor
> > > >    - We have also added the pseudo code of how all the changes will exist
> > > > together and support the new querying APIs
> > >
> > > > Please let me know if anything is pending now, before a vote can be
> > > > started on this.
> > > >
> > > >     On Saturday, 26 October, 2019, 05:41:44 pm IST, Navinder
> > > > Brar <navinder_b...@yahoo.com.invalid> wrote:
> > >
> > >  >> Since there are two soft votes for separate active/standby API
> > > methods, I also change my position on that. Fine with 2 separate
> > > methods. Once we remove the lag information from these APIs, returning a
> > > > List is less attractive, since the ordering has no special meaning now.
> > > >
> > > > Agreed, now that we are not returning lag, I am also sold on having two
> > > separate functions. We already have one which returns streamsMetadata for
> > > active tasks, and now we can add another one for standbys.
> > >
> > >
> > >
> > >     On Saturday, 26 October, 2019, 03:55:16 am IST, Vinoth Chandar <
> > > vchan...@confluent.io> wrote:
> > >
> > > >  +1 to Sophie's suggestion. Having both lag in terms of time and offsets
> > > > is good and makes for a more complete API.
> > >
> > > Since there are two soft votes for separate active/standby API methods, I
> > > also change my position on that. Fine with 2 separate methods.
> > > Once we remove the lag information from these APIs, returning a List is
> > > less attractive, since the ordering has no special meaning now.
> > >
> > > > >> lag in offsets vs time: Having both, as suggested by Sophie would of
> > > > course be best. What is a little unclear to me is, how in details are we
> > > > going to compute both?
> > > >
> > > > @navinder maybe the next step is to flesh out these details and surface
> > > > any larger changes we need to make, if need be.
> > >
> > > Any other details we need to cover, before a VOTE can be called on this?
> > >
> > >
> > > On Fri, Oct 25, 2019 at 1:51 PM Bill Bejeck <bbej...@gmail.com> wrote:
> > >
> > > > I am jumping in a little late here.
> > > >
> > > > Overall I agree with the proposal to push decision making on what/how to
> > > > query in the query layer.
> > > >
> > > > For point 5 from above, I'm slightly in favor of having a new method,
> > > > "standbyMetadataForKey()" or something similar.
> > > > > Because even if we return all tasks in one list, the user will still
> > > > > have to perform some filtering to separate the different tasks, so I
> > > > > don't feel making two calls is a burden, and IMHO makes things more
> > > > > transparent for the user.
> > > > > If the final vote is for using an "isActive" field, I'm good with that
> > > > > as well.
> > > >
> > > > Just my 2 cents.
> > > >
> > > > On Fri, Oct 25, 2019 at 5:09 AM Navinder Brar
> > > > <navinder_b...@yahoo.com.invalid> wrote:
> > > >
> > > > > > I think now we are aligned on almost all the design parts. Summarising
> > > > > > below what has been discussed above and what we have a general
> > > > > > consensus on.
> > > > > >
> > > > > >
> > > > > >    - Rather than broadcasting lag across all nodes at rebalancing/with
> > > > > > the heartbeat, we will just return a list of all available standbys in
> > > > > > the system, and the user can make an IQ query to any of those nodes,
> > > > > > which will return the response together with the lag (in offsets and
> > > > > > time). Based on that, the user can decide whether to return the
> > > > > > response or call another standby.
> > > > > >    - The current metadata query frequency will not change. It will be
> > > > > > the same as it is now, i.e. before each query.
> > > > > >
> > > > > >    - For fetching List<StreamsMetadata> in StreamsMetadataState.java
> > > > > > and List<QueryableStoreProvider> in StreamThreadStateStoreProvider.java
> > > > > > (which will return all active stores which are running/restoring and
> > > > > > replica stores which are running), we will add new functions and not
> > > > > > disturb the existing functions
> > > > > >
> > > > > >    - There is no need to add a new StreamsConfig for implementing this
> > > > > > KIP
> > > > > >
> > > > > >    - We will add standbyPartitionsByHost in AssignmentInfo and
> > > > > > StreamsMetadataState, which would change the existing rebuildMetadata()
> > > > > > and setPartitionsByHostState()
> > > > > >
> > > > > >
> > > > > >
> > > > > > If anyone has any more concerns please feel free to add. Post this I
> > > > > > will be initiating a vote.
> > > > > > ~Navinder
> > > > >
> > > > >    On Friday, 25 October, 2019, 12:05:29 pm IST, Matthias J. Sax <
> > > > > matth...@confluent.io> wrote:
> > > > >
> > > > >  Just to close the loop @Vinoth:
> > > > >
> > > > > > > 1. IIUC John intends to add (or we can do this in this KIP) lag
> > > > > > > information to AssignmentInfo, which gets sent to every participant.
> > > > > >
> > > > > > As explained by John, currently KIP-441 plans to only report the
> > > > > > information to the leader. But I guess, with the new proposal to not
> > > > > > broadcast this information at all, this concern is invalidated.
> > > > > > > 2. At least I was under the assumption that it can be called per
> > > > > > > query, since the API docs don't seem to suggest otherwise. Do you see
> > > > > > > any potential issues if we call this every query? (we should
> > > > > > > benchmark this nonetheless)
> > > > > >
> > > > > > I did not see a real issue if people refresh the metadata frequently,
> > > > > > because it would be a local call. My main point was, that this would
> > > > > > change the current usage pattern of the API, and we would clearly need
> > > > > > to communicate this change. Similar to (1), this concern is
> > > > > > invalidated anyway.
> > > > >
> > > > >
> > > > > > @John: I think it's a great idea to get rid of reporting lag, and
> > > > > > pushing the decision making process about "what to query" into the
> > > > > > query serving layer itself. This simplifies the overall design of this
> > > > > > KIP significantly, and actually aligns very well with the idea that
> > > > > > Kafka Streams (as it is a library) should only provide the basic
> > > > > > building block. Many of my raised questions are invalidated by this.
> > > > >
> > > > >
> > > > >
> > > > > Some questions are still open though:
> > > > >
> > > > > > > 10) Do we need to distinguish between active(restoring) and standby
> > > > > > > tasks? Or could we treat both as the same?
> > > > > >
> > > > > >
> > > > > > @Vinoth: about (5). I see your point about multiple calls vs a single
> > > > > > call. I still slightly prefer multiple calls, but it's highly
> > > > > > subjective and I would also be fine to add an #isActive() method. Would
> > > > > > be good to get feedback from others.
> > > > > >
> > > > > >
> > > > > > For (14), ie, lag in offsets vs time: Having both, as suggested by
> > > > > > Sophie would of course be best. What is a little unclear to me is how,
> > > > > > in detail, we are going to compute both.
> > > > >
> > > > >
> > > > >
> > > > > -Matthias
> > > > >
> > > > >
> > > > >
> > > > > On 10/24/19 11:07 PM, Sophie Blee-Goldman wrote:
> > > > > > > Just to chime in on the "report lag vs timestamp difference" issue,
> > > > > > > I would actually advocate for both. As mentioned already, time
> > > > > > > difference is probably a lot easier and/or more useful to reason
> > > > > > > about in terms of "freshness" of the state. But in the case when all
> > > > > > > queried stores are far behind, lag could be used to estimate the
> > > > > > > recovery velocity. You can then get a (pretty rough) idea of when a
> > > > > > > store might be ready, and wait until around then to query again.
> > > > > >
> > > > > > On Thu, Oct 24, 2019 at 9:53 PM Guozhang Wang <wangg...@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > > >> I think I agree with John's recent reasoning as well: instead of
> > > > > > >> letting the storeMetadataAPI return the staleness information,
> > > > > > >> letting the client query either active or standby and letting the
> > > > > > >> standby query response include both the values + timestamp (or lag,
> > > > > > >> as in diffs of timestamps) would actually be more intuitive -- not
> > > > > > >> only is the streams client simpler, from the user's perspective they
> > > > > > >> also do not need to periodically refresh their staleness information
> > > > > > >> from the client, but only need to make decisions on the fly whenever
> > > > > > >> they need to query.
> > > > > > >>
> > > > > > >> Again the standby replica then needs to know the current active
> > > > > > >> task's timestamp, which can be found from the log end record's
> > > > > > >> encoded timestamp; today standby tasks do not read that specific
> > > > > > >> record, but only refresh their knowledge of the log end offset, but I
> > > > > > >> think refreshing the latest record timestamp is not a very bad
> > > > > > >> request to add on the standby replicas.
> > > > > >>
> > > > > >>
> > > > > >> Guozhang
> > > > > >>
> > > > > >>
> > > > > >> On Thu, Oct 24, 2019 at 5:43 PM Vinoth Chandar <
> > > vchan...@confluent.io
> > > > >
> > > > > >> wrote:
> > > > > >>
> > > > > > >>> +1 As someone implementing a query routing layer, there is already
> > > > > > >>> a need to have mechanisms in place to do healthchecks/failure
> > > > > > >>> detection to detect failures for queries, while Streams rebalancing
> > > > > > >>> eventually kicks in in the background.
> > > > > > >>> So, pushing this complexity to the IQ client app keeps Streams
> > > > > > >>> simpler as well. IQs will potentially be issued at an order of
> > > > > > >>> magnitude more frequently and it can achieve good freshness for the
> > > > > > >>> lag information.
> > > > > > >>>
> > > > > > >>> I would like to add however, that we would also need to introduce
> > > > > > >>> APIs in the KafkaStreams class, for obtaining lag information for
> > > > > > >>> all stores local to that host. This is for the IQs to relay back
> > > > > > >>> with the response/its own heartbeat mechanism.
> > > > > >>>
> > > > > >>> On Thu, Oct 24, 2019 at 3:12 PM John Roesler <j...@confluent.io>
> > > > > wrote:
> > > > > >>>
> > > > > >>>> Hi all,
> > > > > >>>>
> > > > > > >>>> I've been mulling about this KIP, and I think I was on the wrong
> > > > > > >>>> track earlier with regard to task lags. Tl;dr: I don't think we
> > > > > > >>>> should add lags at all to the metadata API (and also not to the
> > > > > > >>>> AssignmentInfo protocol message).
> > > > > > >>>>
> > > > > > >>>> Like I mentioned early on, reporting lag via
> > > > > > >>>> SubscriptionInfo/AssignmentInfo would only work while rebalances
> > > > > > >>>> are happening. Once the group stabilizes, no members would be
> > > > > > >>>> notified of each others' lags anymore. I had been thinking that the
> > > > > > >>>> solution would be the heartbeat proposal I mentioned earlier, but
> > > > > > >>>> that proposal would have reported the heartbeats of the members
> > > > > > >>>> only to the leader member (the one who makes assignments). To be
> > > > > > >>>> useful in the context of _this_ KIP, we would also have to report
> > > > > > >>>> the lags in the heartbeat responses to _all_ members. This is a
> > > > > > >>>> concern to me because now _all_ the lags get reported to _all_ the
> > > > > > >>>> members on _every_ heartbeat... a lot of chatter.
> > > > > >>>>
> > > > > > >>>> Plus, the proposal for KIP-441 is only to report the lags of each
> > > > > > >>>> _task_. This is the sum of the lags of all the stores in the task.
> > > > > > >>>> But this would be insufficient for KIP-535. For this KIP, we would
> > > > > > >>>> want the lag specifically of the store we want to query. So this
> > > > > > >>>> means, we have to report the lags of all the stores of all the
> > > > > > >>>> members to every member... even more chatter!
> > > > > > >>>>
> > > > > > >>>> The final nail in the coffin to me is that IQ clients would have
> > > > > > >>>> to start refreshing their metadata quite frequently to stay up to
> > > > > > >>>> date on the lags, which adds even more overhead to the system.
> > > > > >>>>
> > > > > > >>>> Consider a strawman alternative: we bring KIP-535 back to
> > > > > > >>>> extending the metadata API to tell the client the active and
> > > > > > >>>> standby replicas for the key in question (not including any
> > > > > > >>>> "staleness/lag" restriction, just returning all the replicas).
> > > > > > >>>> Then, the client picks a replica and sends the query. The server
> > > > > > >>>> returns the current lag along with the response (maybe in an HTTP
> > > > > > >>>> header or something). Then, the client keeps a map of its last
> > > > > > >>>> observed lags for each replica, and uses this information to prefer
> > > > > > >>>> fresher replicas.
> > > > > > >>>>
> > > > > > >>>> OR, if it wants only to query the active replica, it would throw
> > > > > > >>>> an error on any lag response greater than zero, refresh its
> > > > > > >>>> metadata by re-querying the metadata API, and try again with the
> > > > > > >>>> current active replica.
> > > > > > >>>>
> > > > > > >>>> This way, the lag information will be super fresh for the client,
> > > > > > >>>> and we keep the Metadata API / Assignment, Subscription / and
> > > > > > >>>> Heartbeat as slim as possible.
> > > > > >>>>
> > > > > >>>> Side note: I do think that some time soon, we'll have to add a
> > > > library
> > > > > >>>> for IQ server/clients. I think that this logic will start to get
> > > > > >>>> pretty complex.
> > > > > >>>>
> > > > > >>>> I hope this thinking is reasonably clear!
> > > > > >>>> Thanks again,
> > > > > >>>> -John
> > > > > >>>>
> > > > > >>>> Does that
> > > > > >>>>
> > > > > >>>> On Wed, Oct 23, 2019 at 10:16 AM Vinoth Chandar <
> > > > > vchan...@confluent.io
> > > > > >>>
> > > > > >>>> wrote:
> > > > > >>>>>
> > > > > >>>>> Responding to the points raised by Matthias
> > > > > >>>>>
> > > > > > >>>>> 1. IIUC John intends to add (or we can do this in this KIP) lag
> > > > > > >>>>> information to AssignmentInfo, which gets sent to every participant.
> > > > > > >>>>>
> > > > > > >>>>> 2. At least I was under the assumption that it can be called per
> > > > > > >>>>> query, since the API docs don't seem to suggest otherwise. Do you
> > > > > > >>>>> see any potential issues if we call this every query? (we should
> > > > > > >>>>> benchmark this nonetheless)
> > > > > > >>>>>
> > > > > > >>>>> 4. Agree. metadataForKey() implicitly would return the active host
> > > > > > >>>>> metadata (as it was before). We should also document this in that
> > > > > > >>>>> API's javadoc, given we have another method(s) that returns more
> > > > > > >>>>> host metadata now.
> > > > > > >>>>>
> > > > > > >>>>> 5. While I see the point, the app/caller has to make two different
> > > > > > >>>>> API calls to obtain active/standby and potentially do the same set
> > > > > > >>>>> of operations to query the state. I personally still like a method
> > > > > > >>>>> like isActive() better, but don't have strong opinions.
> > > > > > >>>>>
> > > > > > >>>>> 9. If we do expose the lag information, could we just leave it up
> > > > > > >>>>> to the caller to decide whether it errors out or not and not make
> > > > > > >>>>> the decision within Streams? i.e. we don't need a new config
> > > > > > >>>>>
> > > > > > >>>>> 14. +1. If it's easier to do right away. We started with number of
> > > > > > >>>>> records, following the lead from KIP-441
> > > > > >>>>>
> > > > > >>>>> On Wed, Oct 23, 2019 at 5:44 AM Navinder Brar
> > > > > >>>>> <navinder_b...@yahoo.com.invalid> wrote:
> > > > > >>>>>
> > > > > >>>>>>
> > > > > > >>>>>> Thanks, everyone, for taking a look. Some very cool ideas have
> > > > > > >>>>>> flown in.
> > > > > > >>>>>>
> > > > > > >>>>>> >> There was a follow-on idea I POCed to continuously share lag
> > > > > > >>>>>> >> information in the heartbeat protocol
> > > > > > >>>>>> +1 that would be great, I will update the KIP assuming this work
> > > > > > >>>>>> will finish soon
> > > > > > >>>>>>
> > > > > > >>>>>> >> I think that adding a new method to StreamsMetadataState and
> > > > > > >>>>>> >> deprecating the existing method is the best way to go; we just
> > > > > > >>>>>> >> can't change the return types of any existing methods.
> > > > > > >>>>>> +1 on this, we will add new methods for users who would be
> > > > > > >>>>>> interested in querying back a list of possible options to query
> > > > > > >>>>>> from and leave the current function getStreamsMetadataForKey()
> > > > > > >>>>>> untouched for users who want absolute consistency.
> > > > > > >>>>>>
> > > > > > >>>>>> >> why not just always return all available metadata (including
> > > > > > >>>>>> >> active/standby or lag) and let the caller decide to which node
> > > > > > >>>>>> >> they want to route the query
> > > > > > >>>>>> +1. I think this makes sense as from a user standpoint there is no
> > > > > > >>>>>> difference b/w an active and a standby if both have the same lag.
> > > > > > >>>>>> In fact users would be able to use this API to reduce query load
> > > > > > >>>>>> on actives, so returning all available options along with the
> > > > > > >>>>>> current lag in each would make sense and leave it to the user how
> > > > > > >>>>>> they want to use this data. This has another added advantage. If a
> > > > > > >>>>>> user queries any random machine for a key and that machine has a
> > > > > > >>>>>> replica for the partition (where the key belongs), the user might
> > > > > > >>>>>> choose to serve the data from there itself (if it doesn’t lag
> > > > > > >>>>>> much) rather than finding the active and making an IQ to that.
> > > > > > >>>>>> This would save some critical time in serving for some
> > > > > > >>>>>> applications.
> > > > > > >>>>>>
> > > > > > >>>>>> >> Adding the lag in terms of timestamp diff comparing the
> > > > > > >>>>>> >> committed offset.
> > > > > > >>>>>> +1 on this, I think it’s more readable. But as John said the
> > > > > > >>>>>> function allMetadataForKey() is just returning the possible
> > > > > > >>>>>> options from where users can query a key, so we can even drop the
> > > > > > >>>>>> parameter enableReplicaServing/tolerableDataStaleness and just
> > > > > > >>>>>> return all the streamsMetadata containing that key along with the
> > > > > > >>>>>> offset limit.
> > > > > > >>>>>>
> > > > > > >>>>>> Answering the questions posted by Matthias in sequence.
> > > > > > >>>>>> 1. @John can you please comment on this one.
> > > > > > >>>>>> 2. Yeah, the usage pattern would include querying this prior to
> > > > > > >>>>>> every request
> > > > > > >>>>>> 3. Will add the changes to StreamsMetadata in the KIP, would
> > > > > > >>>>>> include changes in rebuildMetadata() etc.
> > > > > > >>>>>> 4. Makes sense, already addressed above
> > > > > > >>>>>> 5. Is it important from a user perspective whether they are
> > > > > > >>>>>> querying an active(processing), active(restoring), or a standby
> > > > > > >>>>>> task, if we have a way of denoting lag in a readable manner which
> > > > > > >>>>>> signals to the user that this is the best node to query for fresh
> > > > > > >>>>>> data?
> > > > > > >>>>>> 6. Yes, I intend to return the actives and replicas in the same
> > > > > > >>>>>> return list in allMetadataForKey()
> > > > > > >>>>>> 7. tricky
> > > > > > >>>>>> 8. yes, we need new functions to return activeRestoring and
> > > > > > >>>>>> standbyRunning tasks.
> > > > > > >>>>>> 9. StreamsConfig doesn’t look like it is of much use to me since
> > > > > > >>>>>> we are giving all possible options via this function, or they can
> > > > > > >>>>>> use the existing function getStreamsMetadataForKey() and get just
> > > > > > >>>>>> the active
> > > > > > >>>>>> 10. I think treat them both the same and let the lag do the
> > > > > > >>>>>> talking
> > > > > > >>>>>> 11. We are just sending them the option to query from in
> > > > > > >>>>>> allMetadataForKey(), which doesn’t include any handle. We then
> > > > > > >>>>>> query that machine for the key where it calls allStores() and
> > > > > > >>>>>> tries to find the task in
> > > > > > >>>>>> activeRunning/activeRestoring/standbyRunning and adds the store
> > > > > > >>>>>> handle here.
> > > > > > >>>>>> 12. Need to verify, but during the exact point when the store is
> > > > > > >>>>>> closed to transition it from restoring to running the queries
> > > > > > >>>>>> will fail. The caller in such a case can have their own
> > > > > > >>>>>> configurable retries to check again or try the replica if a call
> > > > > > >>>>>> to the active fails
> > > > > > >>>>>> 13. I think KIP-216 is working on those lines, we might not need
> > > > > > >>>>>> a few of those exceptions since now the basic idea of this KIP is
> > > > > > >>>>>> to support IQ during rebalancing.
> > > > > > >>>>>> 14. Addressed above, agreed it looks more readable.
> > > > > >>>>>>
> > > > > >>>>>>
> > > > > >>>>>>    On Tuesday, 22 October, 2019, 08:39:07 pm IST, Matthias J.
> > > Sax
> > > > > >> <
> > > > > >>>>>> matth...@confluent.io> wrote:
> > > > > >>>>>>
> > > > > >>>>>>  One more thought:
> > > > > >>>>>>
> > > > > > >>>>>> 14) Is specifying the allowed lag in number of records a useful
> > > > > > >>>>>> way for users to declare how stale an instance is allowed to be?
> > > > > > >>>>>> Would it be more intuitive for users to specify the allowed lag in
> > > > > > >>>>>> time units (would event time or processing time be better)? It
> > > > > > >>>>>> seems hard for users to reason how "fresh" a store really is when
> > > > > > >>>>>> number of records is used.
> > > > > >>>>>>
> > > > > >>>>>>
> > > > > >>>>>> -Matthias
> > > > > >>>>>>
> > > > > >>>>>> On 10/21/19 9:02 PM, Matthias J. Sax wrote:
> > > > > >>>>>>> Some more follow up thoughts:
> > > > > >>>>>>>
> > > > > > >>>>>>> 11) If we get a store handle of an active(restoring) task, and
> > > > > > >>>>>>> the task transits to running, does the store handle become
> > > > > > >>>>>>> invalid and a new one must be retrieved? Or can we "switch it
> > > > > > >>>>>>> out" underneath -- for this case, how does the user know when
> > > > > > >>>>>>> they start to query the up-to-date state?
> > > > > > >>>>>>>
> > > > > > >>>>>>> 12) Standby tasks will have the store open in regular mode, while
> > > > > > >>>>>>> active(restoring) tasks open stores in "upgrade mode" for more
> > > > > > >>>>>>> efficient bulk loading. When we switch the store into active
> > > > > > >>>>>>> mode, we close it and reopen it. What is the impact if we query
> > > > > > >>>>>>> the store during restore? What is the impact if we close the
> > > > > > >>>>>>> store to transit to running (eg, there might be open iterators)?
> > > > > >>>>>>>
> > > > > > >>>>>>> 13) Do we need to introduce new exception types? Compare KIP-216
> > > > > > >>>>>>> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-216%3A+IQ+should+throw+different+exceptions+for+different+errors)
> > > > > > >>>>>>> that aims to improve the user experience with regard to IQ
> > > > > > >>>>>>> exceptions.
> > > > > >>>>>>>
> > > > > >>>>>>>
> > > > > >>>>>>> -Matthias
> > > > > >>>>>>>
> > > > > >>>>>>> On 10/21/19 6:39 PM, Matthias J. Sax wrote:
> > > > > >>>>>>>> Thanks for the KIP.
> > > > > >>>>>>>>
> > > > > >>>>>>>> Couple of comments:
> > > > > >>>>>>>>
> > > > > > >>>>>>>> 1) With regard to KIP-441, my current understanding is that the
> > > > > > >>>>>>>> lag information is only reported to the leader (please correct
> > > > > > >>>>>>>> me if I am wrong). This seems to be quite a limitation to
> > > > > > >>>>>>>> actually use the lag information.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> 2) The idea of the metadata API is actually to get metadata once
> > > > > > >>>>>>>> and only refresh the metadata if a store was migrated. The
> > > > > > >>>>>>>> current proposal would require getting the metadata before each
> > > > > > >>>>>>>> query. The KIP should describe the usage pattern and impact in
> > > > > > >>>>>>>> more detail.
> > > > > >>>>>>>>
> > > > > > >>>>>>>> 3) Currently, the KIP does not list the public API changes in
> > > > > > >>>>>>>> detail. Please list all methods you intend to deprecate and list
> > > > > > >>>>>>>> all methods you intend to add (best, using a code-block markup
> > > > > > >>>>>>>> -- compare
> > > > > > >>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-470%3A+TopologyTestDriver+test+input+and+output+usability+improvements
> > > > > > >>>>>>>> as an example)
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> 4) Also note (as already pointed out by John), that we cannot
> > > > > > >>>>>>>> have any breaking API changes. Thus, the API should be designed
> > > > > > >>>>>>>> in a fully backward compatible manner.
> > > > > >>>>>>>>
> > > > > > >>>>>>>> 5) Returning a list of metadata objects makes it hard for users
> > > > > > >>>>>>>> to know if the first object refers to the active(processing),
> > > > > > >>>>>>>> active(restoring), or a standby task. IMHO, we should be more
> > > > > > >>>>>>>> explicit. For example, a metadata object could have a flag that
> > > > > > >>>>>>>> one can test via `#isActive()`. Or maybe even better, we could
> > > > > > >>>>>>>> keep the current API as-is and add something like
> > > > > > >>>>>>>> `standbyMetadataForKey()` (and similar methods for others).
> > > > > > >>>>>>>> Having just a flag `isActive()` is a little subtle and having
> > > > > > >>>>>>>> new overloads would make the API much clearer (passing in a
> > > > > > >>>>>>>> boolean flag does not seem to be a nice API).
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> 6) Do you intend to return all standby metadata information at
> > > > > > >>>>>>>> once, similar to `allMetadata()` -- seems to be useful.
> > > > > >>>>>>>>
> > > > > > >>>>>>>> 7) Even if the lag information is propagated to all instances,
> > > > > > >>>>>>>> it will happen in an async manner. Hence, I am wondering if we
> > > > > > >>>>>>>> should address this race condition (I think we should). The idea
> > > > > > >>>>>>>> would be to check if a standby/active(restoring) task is
> > > > > > >>>>>>>> actually still within the lag bounds when a query is executed
> > > > > > >>>>>>>> and we would throw an exception if not.
> > > > > >>>>>>>> 8) The current `KafkaStreams#state()` method only returns a
> > > > > >> handle
> > > > > >>>> to
> > > > > >>>>>>>> stores of active(processing) tasks. How can a user actually
> > > get
> > > > > >> a
> > > > > >>>> handle
> > > > > > >>>>>>>> to a store of an active(restoring) or standby task for
> > > > > >> querying?
> > > > > >>>> Seems
> > > > > >>>>>>>> we should add a new method to get standby handles? Changing
> > > the
> > > > > >>>>>>>> semantics to existing `state()` would be possible, but I 
> > > > > >>>>>>>> think
> > > > > >>>> adding a
> > > > > >>>>>>>> new method is preferable?
> > > > > >>>>>>>>
> > > > > >>>>>>>> 9) How does the user actually specify the acceptable lag? A
> > > > > >> global
> > > > > >>>>>>>> config via StreamsConfig (this would be a public API change
> > > that
> > > > > >>>> needs
> > > > > >>>>>>>> to be covered in the KIP)? Or on a per-store or even 
> > > > > >>>>>>>> per-query
> > > > > >>>> basis for
> > > > > >>>>>>>> more flexibility? We could also have a global setting that is
> > > > > >> used
> > > > > >>>> as
> > > > > >>>>>>>> default and allow to overwrite it on a per-query basis.
> > > > > >>>>>>>>
> > > > > >>>>>>>> 10) Do we need to distinguish between active(restoring) and
> > > > > >>> standby
> > > > > > >>>>>>>> tasks? Or could we treat both as the same?
> > > > > >>>>>>>>
> > > > > >>>>>>>>
> > > > > >>>>>>>>
> > > > > >>>>>>>>
> > > > > >>>>>>>> -Matthias
> > > > > >>>>>>>>
> > > > > >>>>>>>>
> > > > > >>>>>>>> On 10/21/19 5:40 PM, Vinoth Chandar wrote:
> > > > > >>>>>>>>>>> I'm wondering, rather than putting "acceptable lag" into
> > > the
> > > > > >>>>>>>>> configuration at all, or even making it a parameter on
> > > > > >>>>>> `allMetadataForKey`,
> > > > > >>>>>>>>> why not just _always_ return all available metadata
> > > (including
> > > > > >>>>>>>>> active/standby or lag) and let the caller decide to which
> > > node
> > > > > >>> they
> > > > > >>>>>> want to
> > > > > >>>>>>>>> route the query?
> > > > > >>>>>>>>> +1 on exposing lag information via the APIs. IMO without
> > > having
> > > > > >>>>>>>>> continuously updated/fresh lag information, its true value
> > > as a
> > > > > >>>> signal
> > > > > >>>>>> for
> > > > > >>>>>>>>> query routing decisions is much limited. But we can design
> > > the
> > > > > >>> API
> > > > > >>>>>> around
> > > > > >>>>>>>>> this model and iterate? Longer term, we should have
> > > > > >> continuously
> > > > > >>>>>> shared lag
> > > > > >>>>>>>>> information.
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>>> more general to refactor it to "allMetadataForKey(long
> > > > > >>>>>>>>> tolerableDataStaleness, ...)", and when it's set to 0 it
> > > means
> > > > > >>>> "active
> > > > > >>>>>> task
> > > > > >>>>>>>>> only".
> > > > > >>>>>>>>> +1 IMO if we plan on having `enableReplicaServing`, it makes
> > > > > >>> sense
> > > > > >>>> to
> > > > > >>>>>>>>> generalize based on dataStaleness. This seems complementary
> > > to
> > > > > >>>>>> exposing the
> > > > > >>>>>>>>> lag information itself.
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>>> This is actually not a public api change at all, and I'm
> > > > > >>>> planning to
> > > > > >>>>>>>>> implement it asap as a precursor to the rest of KIP-441
> > > > > >>>>>>>>> +1 again. Do we have a concrete timeline for when this 
> > > > > >>>>>>>>> change
> > > > > >>> will
> > > > > >>>>>> land on
> > > > > >>>>>>>>> master? I would like to get the implementation wrapped up 
> > > > > >>>>>>>>> (as
> > > > > >>> much
> > > > > >>>> as
> > > > > >>>>>>>>> possible) by end of the month. :). But I agree this
> > > sequencing
> > > > > >>>> makes
> > > > > >>>>>>>>> sense..
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> On Mon, Oct 21, 2019 at 2:56 PM Guozhang Wang <
> > > > > >>> wangg...@gmail.com>
> > > > > >>>>>> wrote:
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>> Hi Navinder,
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> Thanks for the KIP, I have a high level question about the
> > > > > >>>> proposed
> > > > > >>>>>> API
> > > > > >>>>>>>>>> regarding:
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> "StreamsMetadataState::allMetadataForKey(boolean
> > > > > >>>>>> enableReplicaServing...)"
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> I'm wondering if it's more general to refactor it to
> > > > > >>>>>>>>>> "allMetadataForKey(long tolerableDataStaleness, ...)", and
> > > > > >> when
> > > > > >>>> it's
> > > > > >>>>>> set to
> > > > > > >>>>>>>>>> 0 it means "active task only". Behind the scenes, we can 
> > > > > >>>>>>>>>> have
> > > > > >> the
> > > > > >>>>>> committed
> > > > > >>>>>>>>>> offsets to encode the stream time as well, so that when
> > > > > >>> processing
> > > > > >>>>>> standby
> > > > > > >>>>>>>>>> tasks the stream process knows not only the lag in terms of
> > > > > >>>> offsets
> > > > > > >>>>>>>>>> compared to the committed offset (internally we call it
> > > > > >> offset
> > > > > >>>>>> limit), but
> > > > > > >>>>>>>>>> also the lag in terms of timestamp diff compared to the
> > > > > >> committed
> > > > > >>>>>> offset.
> > > > > >>>>>>>>>>
> > > > > > >>>>>>>>>> Also, encoding the timestamp as part of the offset has other
> > > > > >>> benefits
> > > > > >>>> for
> > > > > >>>>>>>>>> improving Kafka Streams time semantics as well, but for
> > > > > >> KIP-535
> > > > > >>>>>> itself I
> > > > > > >>>>>>>>>> think it can help give users a more intuitive interface 
> > > > > >>>>>>>>>> to
> > > > > >>>> reason
> > > > > >>>>>> about.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> Guozhang
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> On Mon, Oct 21, 2019 at 12:30 PM John Roesler <
> > > > > >>> j...@confluent.io>
> > > > > >>>>>> wrote:
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>> Hey Navinder,
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> Thanks for the KIP! I've been reading over the discussion
> > > > > >> thus
> > > > > >>>> far,
> > > > > >>>>>>>>>>> and I have a couple of thoughts to pile on as well:
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> It seems confusing to propose the API in terms of the
> > > current
> > > > > >>>> system
> > > > > >>>>>>>>>>> state, but also propose how the API would look if/when
> > > > > >> KIP-441
> > > > > >>> is
> > > > > >>>>>>>>>>> implemented. It occurs to me that the only part of KIP-441
> > > > > >> that
> > > > > >>>> would
> > > > > >>>>>>>>>>> affect you is the availability of the lag information in
> > > the
> > > > > >>>>>>>>>>> SubscriptionInfo message. This is actually not a public 
> > > > > >>>>>>>>>>> api
> > > > > >>>> change at
> > > > > >>>>>>>>>>> all, and I'm planning to implement it asap as a precursor
> > > to
> > > > > >>> the
> > > > > >>>> rest
> > > > > >>>>>>>>>>> of KIP-441, so maybe you can just build on top of KIP-441
> > > and
> > > > > >>>> assume
> > > > > >>>>>>>>>>> the lag information will be available. Then you could have
> > > a
> > > > > >>> more
> > > > > >>>>>>>>>>> straightforward proposal (e.g., mention that you'd return
> > > the
> > > > > >>> lag
> > > > > >>>>>>>>>>> information in AssignmentInfo as well as in the
> > > > > >> StreamsMetadata
> > > > > >>>> in
> > > > > >>>>>>>>>>> some form, or make use of it in the API somehow).
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> I'm partially motivated in that former point because it
> > > seems
> > > > > >>>> like
> > > > > >>>>>>>>>>> understanding how callers would bound the staleness for
> > > their
> > > > > >>> use
> > > > > >>>>>> case
> > > > > >>>>>>>>>>> is _the_ key point for this KIP. FWIW, I think that adding
> > > a
> > > > > >>> new
> > > > > >>>>>>>>>>> method to StreamsMetadataState and deprecating the 
> > > > > >>>>>>>>>>> existing
> > > > > >>>> method is
> > > > > >>>>>>>>>>> the best way to go; we just can't change the return types
> > > of
> > > > > >>> any
> > > > > >>>>>>>>>>> existing methods.
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> I'm wondering, rather than putting "acceptable lag" into
> > > the
> > > > > >>>>>>>>>>> configuration at all, or even making it a parameter on
> > > > > >>>>>>>>>>> `allMetadataForKey`, why not just _always_ return all
> > > > > >> available
> > > > > >>>>>>>>>>> metadata (including active/standby or lag) and let the
> > > caller
> > > > > >>>> decide
> > > > > >>>>>>>>>>> to which node they want to route the query? This method
> > > isn't
> > > > > >>>> making
> > > > > >>>>>>>>>>> any queries itself; it's merely telling you where the 
> > > > > >>>>>>>>>>> local
> > > > > >>>> Streams
> > > > > >>>>>>>>>>> instance _thinks_ the key in question is located. Just
> > > > > >>> returning
> > > > > >>>> all
> > > > > >>>>>>>>>>> available information lets the caller implement any
> > > semantics
> > > > > >>>> they
> > > > > >>>>>>>>>>> desire around querying only active stores, or standbys, or
> > > > > >>>> recovering
> > > > > >>>>>>>>>>> stores, or whatever.
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> One fly in the ointment, which you may wish to consider if
> > > > > >>>> proposing
> > > > > >>>>>>>>>>> to use lag information, is that the cluster would only
> > > become
> > > > > >>>> aware
> > > > > >>>>>> of
> > > > > >>>>>>>>>>> new lag information during rebalances. Even in the full
> > > > > >>>> expression of
> > > > > >>>>>>>>>>> KIP-441, this information would stop being propagated when
> > > > > >> the
> > > > > >>>>>> cluster
> > > > > >>>>>>>>>>> achieves a balanced task distribution. There was a
> > > follow-on
> > > > > >>>> idea I
> > > > > >>>>>>>>>>> POCed to continuously share lag information in the
> > > heartbeat
> > > > > >>>>>> protocol,
> > > > > >>>>>>>>>>> which you might be interested in, if you want to make sure
> > > > > >> that
> > > > > >>>> nodes
> > > > > > >>>>>>>>>>> are basically _always_ aware of each other's lag on
> > > different
> > > > > >>>>>>>>>>> partitions: https://github.com/apache/kafka/pull/7096
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> Thanks again!
> > > > > >>>>>>>>>>> -John
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> On Sat, Oct 19, 2019 at 6:06 AM Navinder Brar
> > > > > >>>>>>>>>>> <navinder_b...@yahoo.com.invalid> wrote:
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>> Thanks, Vinoth. Looks like we are on the same page. I 
> > > > > >>>>>>>>>>>> will
> > > > > >> add
> > > > > >>>> some
> > > > > >>>>>> of
> > > > > >>>>>>>>>>> these explanations to the KIP as well. Have assigned the
> > > > > >>>> KAFKA-6144
> > > > > >>>>>> to
> > > > > > >>>>>>>>>>> myself and KAFKA-8994 is closed (by you). As suggested, we
> > > > > >> will
> > > > > >>>>>> replace
> > > > > >>>>>>>>>>> "replica" with "standby".
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>> In the new API,
> > > > > >>> "StreamsMetadataState::allMetadataForKey(boolean
> > > > > >>>>>>>>>>> enableReplicaServing, String storeName, K key,
> > > Serializer<K>
> > > > > >>>>>>>>>>> keySerializer)" Do we really need a per key configuration?
> > > > > >> or a
> > > > > >>>> new
> > > > > >>>>>>>>>>> StreamsConfig is good enough?>> Coming from experience,
> > > when
> > > > > >>>> teams
> > > > > >>>>>> are
> > > > > > >>>>>>>>>>> building a platform with Kafka Streams and these APIs
> > > serve
> > > > > >>>> data to
> > > > > >>>>>>>>>>> multiple teams, we can't have a generalized config that
> > > says
> > > > > >>> as a
> > > > > >>>>>>>>>> platform
> > > > > >>>>>>>>>>> we will support stale reads or not. It should be the 
> > > > > >>>>>>>>>>> choice
> > > > > >> of
> > > > > >>>>>> someone
> > > > > >>>>>>>>>> who
> > > > > > >>>>>>>>>>> is calling the APIs to choose whether they are ok with
> > > stale
> > > > > >>>> reads
> > > > > >>>>>> or
> > > > > >>>>>>>>>> not.
> > > > > >>>>>>>>>>> Makes sense?
> > > > > >>>>>>>>>>>>    On Thursday, 17 October, 2019, 11:56:02 pm IST, Vinoth
> > > > > >>>> Chandar <
> > > > > >>>>>>>>>>> vchan...@confluent.io> wrote:
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>>  Looks like we are covering ground :)
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> Only if it is within a permissible  range(say 10000) we
> > > > > >> will
> > > > > >>>> serve
> > > > > >>>>>>>>>> from
> > > > > >>>>>>>>>>>> Restoring state of active.
> > > > > >>>>>>>>>>>> +1 on having a knob like this.. My reasoning is as
> > > follows.
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>> Looking at the Streams state as a read-only distributed 
> > > > > >>>>>>>>>>>> kv
> > > > > >>>> store.
> > > > > >>>>>> With
> > > > > >>>>>>>>>>>> num_standby = f , we should be able to tolerate f 
> > > > > >>>>>>>>>>>> failures
> > > > > >> and
> > > > > >>>> if
> > > > > >>>>>> there
> > > > > >>>>>>>>>>> is
> > > > > > >>>>>>>>>>>> an (f+1)-th failure, the system should be unavailable.
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>> A) So with num_standby=0, the system should be 
> > > > > >>>>>>>>>>>> unavailable
> > > > > >>> even
> > > > > >>>> if
> > > > > >>>>>>>>>> there
> > > > > >>>>>>>>>>> is
> > > > > > >>>>>>>>>>>> 1 failure and that's my argument for not allowing querying
> > > in
> > > > > >>>>>>>>>> restoration
> > > > > >>>>>>>>>>>> state, esp in this case it will be a total rebuild of the
> > > > > >>> state
> > > > > >>>>>> (which
> > > > > >>>>>>>>>>> IMO
> > > > > >>>>>>>>>>>> cannot be considered a normal fault free operational
> > > state).
> > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> B) Even if there are standbys, say num_standby=2, if the
> > > user
> > > > > >>>> decides
> > > > > >>>>>> to
> > > > > >>>>>>>>>>> shut
> > > > > > >>>>>>>>>>>> down all 3 instances, then the only outcome should be
> > > > > >>> unavailability
> > > > > >>>>>> until
> > > > > >>>>>>>>>>> all
> > > > > >>>>>>>>>>>> of them come back or state is rebuilt on other nodes in
> > > the
> > > > > >>>>>> cluster. In
> > > > > >>>>>>>>>>>> normal operations, f <= 2 and when a failure does happen
> > > we
> > > > > >>> can
> > > > > >>>> then
> > > > > >>>>>>>>>>> either
> > > > > >>>>>>>>>>>> choose to be C over A and fail IQs until replication is
> > > > > >> fully
> > > > > >>>>>> caught up
> > > > > >>>>>>>>>>> or
> > > > > >>>>>>>>>>>> choose A over C by serving in restoring state as long as
> > > lag
> > > > > >>> is
> > > > > >>>>>>>>>> minimal.
> > > > > >>>>>>>>>>> If
> > > > > >>>>>>>>>>>> even with f=1 say, all the standbys are lagging a lot due
> > > to
> > > > > >>>> some
> > > > > >>>>>>>>>> issue,
> > > > > >>>>>>>>>>>> then that should be considered a failure since that is
> > > > > >>> different
> > > > > >>>>>> from
> > > > > >>>>>>>>>>>> normal/expected operational mode. Serving reads with
> > > > > >> unbounded
> > > > > >>>>>>>>>>> replication
> > > > > >>>>>>>>>>>> lag and calling it "available" may not be very usable or
> > > > > >> even
> > > > > >>>>>> desirable
> > > > > >>>>>>>>>>> :)
> > > > > >>>>>>>>>>>> IMHO, since it gives the user no way to reason about the
> > > app
> > > > > >>>> that is
> > > > > >>>>>>>>>>> going
> > > > > >>>>>>>>>>>> to query this store.
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>> So there is definitely a need to distinguish between :
> > > > > >>>> Replication
> > > > > >>>>>>>>>>> catchup
> > > > > >>>>>>>>>>>> while being in fault free state vs Restoration of state
> > > when
> > > > > >>> we
> > > > > >>>> lose
> > > > > >>>>>>>>>> more
> > > > > >>>>>>>>>>>> than f standbys. This knob is a great starting point
> > > towards
> > > > > >>>> this.
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>> If you agree with some of the explanation above, please
> > > feel
> > > > > >>>> free to
> > > > > >>>>>>>>>>>> include it in the KIP as well since this is sort of our
> > > > > >> design
> > > > > >>>>>>>>>> principle
> > > > > >>>>>>>>>>>> here..
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>> Small nits :
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>> - let's standardize on "standby" instead of "replica", 
> > > > > >>>>>>>>>>>> KIP
> > > > > >> or
> > > > > >>>>>> code,  to
> > > > > >>>>>>>>>>> be
> > > > > >>>>>>>>>>>> consistent with rest of Streams code/docs?
> > > > > >>>>>>>>>>>> - Can we merge KAFKA-8994 into KAFKA-6144 now and close
> > > the
> > > > > >>>> former?
> > > > > >>>>>>>>>>>> Eventually need to consolidate KAFKA-6555 as well
> > > > > >>>>>>>>>>>> - In the new API,
> > > > > >>>> "StreamsMetadataState::allMetadataForKey(boolean
> > > > > >>>>>>>>>>>> enableReplicaServing, String storeName, K key,
> > > Serializer<K>
> > > > > >>>>>>>>>>> keySerializer)" Do
> > > > > >>>>>>>>>>>> we really need a per key configuration? or a new
> > > > > >> StreamsConfig
> > > > > >>>> is
> > > > > >>>>>> good
> > > > > >>>>>>>>>>>> enough?
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>> On Wed, Oct 16, 2019 at 8:31 PM Navinder Brar
> > > > > >>>>>>>>>>>> <navinder_b...@yahoo.com.invalid> wrote:
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> @Vinoth, I have incorporated a few of the discussions we
> > > > > >> have
> > > > > >>>> had
> > > > > >>>>>> in
> > > > > >>>>>>>>>>> the
> > > > > >>>>>>>>>>>>> KIP.
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> In the current code, t0 and t1 serve queries from
> > > > > >>>> Active(Running)
> > > > > >>>>>>>>>>>>> partition. For case t2, we are planning to return
> > > > > >>>>>>>>>> List<StreamsMetadata>
> > > > > >>>>>>>>>>>>> such that it returns <StreamsMetadata(A),
> > > > > >> StreamsMetadata(B)>
> > > > > >>>> so
> > > > > >>>>>> that
> > > > > >>>>>>>>>>> if IQ
> > > > > >>>>>>>>>>>>> fails on A, the replica on B can serve the data by
> > > enabling
> > > > > >>>> serving
> > > > > >>>>>>>>>>> from
> > > > > >>>>>>>>>>>>> replicas. This still does not solve case t3 and t4 since
> > > B
> > > > > >>> has
> > > > > >>>> been
> > > > > >>>>>>>>>>>>> promoted to active but it is in Restoring state to
> > > > catch up
> > > > > >>>> till A’s
> > > > > >>>>>>>>>>> last
> > > > > >>>>>>>>>>>>> committed position as we don’t serve from Restoring 
> > > > > >>>>>>>>>>>>> state
> > > > > >> in
> > > > > >>>> Active
> > > > > >>>>>>>>>>> and new
> > > > > >>>>>>>>>>>>> Replica on R is building itself from scratch. Both these
> > > > > >>> cases
> > > > > >>>> can
> > > > > >>>>>> be
> > > > > >>>>>>>>>>>>> solved if we start serving from Restoring state of 
> > > > > >>>>>>>>>>>>> active
> > > > > >> as
> > > > > >>>> well
> > > > > >>>>>>>>>>> since it
> > > > > >>>>>>>>>>>>> is almost equivalent to previous Active.
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> There could be a case where all replicas of a partition
> > > > > >>> become
> > > > > >>>>>>>>>>> unavailable
> > > > > >>>>>>>>>>>>> and active and all replicas of that partition are
> > > building
> > > > > >>>>>> themselves
> > > > > >>>>>>>>>>> from
> > > > > >>>>>>>>>>>>> scratch, in this case, the state in Active is far behind
> > > > > >> even
> > > > > >>>>>> though
> > > > > >>>>>>>>>>> it is
> > > > > >>>>>>>>>>>>> in Restoring state. To cater to such cases that we don’t
> > > > > >>> serve
> > > > > >>>> from
> > > > > >>>>>>>>>>> this
> > > > > >>>>>>>>>>>>> state we can either add another state before Restoring 
> > > > > >>>>>>>>>>>>> or
> > > > > >>>> check the
> > > > > >>>>>>>>>>>>> difference between last committed offset and current
> > > > > >>> position.
> > > > > >>>> Only
> > > > > >>>>>>>>>> if
> > > > > >>>>>>>>>>> it
> > > > > >>>>>>>>>>>>> is within a permissible range (say 10000) we will serve
> > > > > >> from
> > > > > > >>>>>>>>>> the
> > > > > > >>>>>>>>>>> Restoring
> > > > > >>>>>>>>>>>>> state of Active.
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>    On Wednesday, 16 October, 2019, 10:01:35 pm IST,
> > > Vinoth
> > > > > >>>> Chandar
> > > > > >>>>>> <
> > > > > >>>>>>>>>>>>> vchan...@confluent.io> wrote:
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>  Thanks for the updates on the KIP, Navinder!
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> Few comments
> > > > > >>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>> - AssignmentInfo is not public API? But we will change
> > > it
> > > > > >>> and
> > > > > >>>> thus
> > > > > >>>>>>>>>>> need to
> > > > > >>>>>>>>>>>>> increment the version and test for version_probing etc.
> > > > > >> Good
> > > > > >>> to
> > > > > >>>>>>>>>>> separate
> > > > > >>>>>>>>>>>>> that from StreamsMetadata changes (which is public API)
> > > > > > >>>>>>>>>>>>> - From what I see, there is going to be a choice between
> > > the
> > > > > >>>>>> following
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>  A) introducing a new *KafkaStreams::allMetadataForKey()
> > > > > >> *API
> > > > > >>>> that
> > > > > >>>>>>>>>>>>> potentially returns List<StreamsMetadata> ordered from
> > > most
> > > > > > >>>> up to
> > > > > >>>>>> date
> > > > > >>>>>>>>>>> to
> > > > > > >>>>>>>>>>>>> least up to date replicas. Today we cannot fully 
> > > > > >>>>>>>>>>>>> implement
> > > > > >>> this
> > > > > >>>>>>>>>>> ordering,
> > > > > >>>>>>>>>>>>> since all we know is which hosts are active and which 
> > > > > >>>>>>>>>>>>> are
> > > > > >>>> standbys.
> > > > > >>>>>>>>>>>>> However, this aligns well with the future. KIP-441 adds
> > > the
> > > > > >>> lag
> > > > > >>>>>>>>>>> information
> > > > > >>>>>>>>>>>>> to the rebalancing protocol. We could also sort replicas
> > > > > >>> based
> > > > > >>>> on
> > > > > >>>>>> the
> > > > > > >>>>>>>>>>>>> reported lags eventually. This is fully backwards
> > > compatible
> > > > > >>> with
> > > > > >>>>>>>>>>> existing
> > > > > >>>>>>>>>>>>> clients. Only drawback I see is the naming of the
> > > existing
> > > > > >>>> method
> > > > > >>>>>>>>>>>>> KafkaStreams::metadataForKey, not conveying the
> > > distinction
> > > > > >>>> that it
> > > > > >>>>>>>>>>> simply
> > > > > > >>>>>>>>>>>>> returns the active replica, i.e., allMetadataForKey.get(0).
> > > > > >>>>>>>>>>>>>  B) Change KafkaStreams::metadataForKey() to return a
> > > List.
> > > > > > >>>> It's a
> > > > > >>>>>>>>>>> breaking
> > > > > >>>>>>>>>>>>> change.
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> I prefer A, since none of the semantics/behavior changes
> > > > > >> for
> > > > > >>>>>> existing
> > > > > >>>>>>>>>>>>> users. Love to hear more thoughts. Can we also work this
> > > > > >> into
> > > > > >>>> the
> > > > > >>>>>>>>>> KIP?
> > > > > >>>>>>>>>>>>> I already implemented A to unblock myself for now. Seems
> > > > > >>>> feasible
> > > > > >>>>>> to
> > > > > >>>>>>>>>>> do.
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> On Tue, Oct 15, 2019 at 12:21 PM Vinoth Chandar <
> > > > > >>>>>>>>>> vchan...@confluent.io
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> wrote:
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> I get your point. But suppose there is a replica 
> > > > > >>>>>>>>>>>>>>>> which
> > > > > >> has
> > > > > >>>> just
> > > > > >>>>>>>>>>> become
> > > > > >>>>>>>>>>>>>> active, so in that case replica will still be building
> > > > > >>> itself
> > > > > >>>> from
> > > > > >>>>>>>>>>>>> scratch
> > > > > >>>>>>>>>>>>>> and this active will go to restoring state till it
> > > catches
> > > > > >>> up
> > > > > >>>> with
> > > > > >>>>>>>>>>>>> previous
> > > > > >>>>>>>>>>>>>> active, wouldn't serving from a restoring active make
> > > more
> > > > > >>>> sense
> > > > > >>>>>>>>>>> than a
> > > > > > >>>>>>>>>>>>>> replica in such a case?
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> KIP-441 will change this behavior such that promotion 
> > > > > >>>>>>>>>>>>>> to
> > > > > >>>> active
> > > > > >>>>>>>>>>> happens
> > > > > >>>>>>>>>>>>>> based on how caught up a replica is. So, once we have
> > > that
> > > > > >>>> (work
> > > > > >>>>>>>>>>> underway
> > > > > >>>>>>>>>>>>>> already for 2.5 IIUC) and user sets
> > > num.standby.replicas >
> > > > > >>> 0,
> > > > > >>>> then
> > > > > >>>>>>>>>>> the
> > > > > >>>>>>>>>>>>>> staleness window should not be that long as you
> > > describe.
> > > > > >>> IMO
> > > > > >>>> if
> > > > > >>>>>>>>>> user
> > > > > >>>>>>>>>>>>> wants
> > > > > > >>>>>>>>>>>>>> availability for state, then they should configure
> > > > > > >>>> num.standby.replicas > 0.
> > > > > >>>>>>>>>>>>> If
> > > > > >>>>>>>>>>>>>> not, then on a node loss, few partitions would be
> > > > > >>> unavailable
> > > > > >>>> for
> > > > > >>>>>> a
> > > > > >>>>>>>>>>> while
> > > > > >>>>>>>>>>>>>> (there are other ways to bring this window down, which 
> > > > > >>>>>>>>>>>>>> I
> > > > > >>> won't
> > > > > >>>>>>>>>> bring
> > > > > >>>>>>>>>>> in
> > > > > >>>>>>>>>>>>>> here). We could argue for querying a restoring active
> > > > > >> (say a
> > > > > >>>> new
> > > > > >>>>>>>>>> node
> > > > > >>>>>>>>>>>>> added
> > > > > >>>>>>>>>>>>>> to replace a faulty old node) based on AP vs CP
> > > > > >> principles.
> > > > > >>>> But
> > > > > >>>>>> not
> > > > > >>>>>>>>>>> sure
> > > > > >>>>>>>>>>>>>> reading really really old values for the sake of
> > > > > >>> availability
> > > > > >>>> is
> > > > > >>>>>>>>>>> useful.
> > > > > >>>>>>>>>>>>> No
> > > > > >>>>>>>>>>>>>> AP data system would be inconsistent for such a long
> > > time
> > > > > >> in
> > > > > >>>>>>>>>>> practice.
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> So, I still feel just limiting this to standby reads
> > > > > >>> provides
> > > > > >>>> best
> > > > > >>>>>>>>>>>>>> semantics.
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> Just my 2c. Would love to see what others think as 
> > > > > >>>>>>>>>>>>>> well.
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> On Tue, Oct 15, 2019 at 5:34 AM Navinder Brar
> > > > > >>>>>>>>>>>>>> <navinder_b...@yahoo.com.invalid> wrote:
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> Hi Vinoth,
> > > > > >>>>>>>>>>>>>>> Thanks for the feedback.
> > > > > >>>>>>>>>>>>>>>  Can we link the JIRA, discussion thread also to the
> > > > > >> KIP.>>
> > > > > >>>>>> Added.
> > > > > >>>>>>>>>>>>>>> Based on the discussion on KAFKA-6144, I was under the
> > > > > >>>> impression
> > > > > >>>>>>>>>>> that
> > > > > >>>>>>>>>>>>>>> this KIP is also going to cover exposing of the 
> > > > > >>>>>>>>>>>>>>> standby
> > > > > >>>>>>>>>> information
> > > > > >>>>>>>>>>> in
> > > > > >>>>>>>>>>>>>>> StreamsMetadata and thus subsume KAFKA-8994 . That
> > > would
> > > > > >>>> require
> > > > > >>>>>> a
> > > > > >>>>>>>>>>>>> public
> > > > > >>>>>>>>>>>>>>> API change?>> Sure, I can add changes for 8994 in this
> > > > > >> KIP
> > > > > >>>> and
> > > > > >>>>>>>>>> link
> > > > > >>>>>>>>>>>>>>> KAFKA-6144 to KAFKA-8994 as well.
> > > > > >>>>>>>>>>>>>>>  KIP seems to be focussing on restoration when a new
> > > node
> > > > > >>> is
> > > > > >>>>>>>>>> added.
> > > > > >>>>>>>>>>>>>>> KIP-441 is underway and has some major changes 
> > > > > >>>>>>>>>>>>>>> proposed
> > > > > >> for
> > > > > >>>> this.
> > > > > >>>>>>>>>> It
> > > > > >>>>>>>>>>>>> would
> > > > > >>>>>>>>>>>>>>> be good to clarify dependencies if any. Without
> > > KIP-441,
> > > > > >> I
> > > > > >>>> am not
> > > > > >>>>>>>>>>> very
> > > > > >>>>>>>>>>>>> sure
> > > > > >>>>>>>>>>>>>>> if we should allow reads from nodes in RESTORING 
> > > > > >>>>>>>>>>>>>>> state,
> > > > > >>> which
> > > > > >>>>>>>>>> could
> > > > > >>>>>>>>>>>>> amount
> > > > > >>>>>>>>>>>>>>> to many minutes/few hours of stale reads?  This is
> > > > > >>> different
> > > > > >>>> from
> > > > > >>>>>>>>>>>>> allowing
> > > > > >>>>>>>>>>>>>>> querying standby replicas, which could be mostly 
> > > > > >>>>>>>>>>>>>>> caught
> > > > > >> up
> > > > > >>>> and
> > > > > >>>>>> the
> > > > > >>>>>>>>>>>>>>> staleness window could be much smaller/tolerable. 
> > > > > >>>>>>>>>>>>>>> (once
> > > > > >>>> again the
> > > > > >>>>>>>>>>> focus
> > > > > >>>>>>>>>>>>> on
> > > > > >>>>>>>>>>>>>>> KAFKA-8994).>> I get your point. But suppose there is 
> > > > > >>>>>>>>>>>>>>> a
> > > > > >>>> replica
> > > > > >>>>>>>>>>> which
> > > > > >>>>>>>>>>>>> has
> > > > > >>>>>>>>>>>>>>> just become active, so in that case replica will still
> > > be
> > > > > >>>>>> building
> > > > > >>>>>>>>>>>>> itself
> > > > > >>>>>>>>>>>>>>> from scratch and this active will go to restoring 
> > > > > >>>>>>>>>>>>>>> state
> > > > > >>> till
> > > > > >>>> it
> > > > > >>>>>>>>>>> catches
> > > > > >>>>>>>>>>>>> up
> > > > > >>>>>>>>>>>>>>> with previous active, wouldn't serving from a 
> > > > > >>>>>>>>>>>>>>> restoring
> > > > > >>>> active
> > > > > >>>>>>>>>> make
> > > > > >>>>>>>>>>> more
> > > > > > >>>>>>>>>>>>>>> sense than a replica in such a case?
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> Finally, we may need to introduce a configuration to
> > > > > >>> control
> > > > > >>>>>> this.
> > > > > >>>>>>>>>>> Some
> > > > > >>>>>>>>>>>>>>> users may prefer errors to stale data. Can we also add
> > > it
> > > > > >>> to
> > > > > >>>> the
> > > > > >>>>>>>>>>> KIP?>>
> > > > > >>>>>>>>>>>>>>> Will add this.
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> Regards,
> > > > > >>>>>>>>>>>>>>> Navinder
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>> On 2019/10/14 16:56:49, Vinoth Chandar <
> > > v...@confluent.io
> > > > > >>>>> wrote:
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> Hi Navinder,>
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> Thanks for sharing the KIP! Few thoughts>
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> - Can we link the JIRA, discussion thread also to the
> > > > > >> KIP>
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> - Based on the discussion on KAFKA-6144, I was under
> > > the
> > > > > >>>>>>>>>> impression
> > > > > >>>>>>>>>>>>>>> that>
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> this KIP is also going to cover exposing of the
> > > standby
> > > > > >>>>>>>>>>> information in>
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> StreamsMetadata and thus subsume KAFKA-8994 . That
> > > would
> > > > > >>>> require
> > > > > >>>>>>>>>> a
> > > > > >>>>>>>>>>>>>>> public>
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> API change?>
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> - KIP seems to be focussing on restoration when a new
> > > > > >> node
> > > > > >>>> is
> > > > > >>>>>>>>>>> added.>
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> KIP-441 is underway and has some major changes
> > > proposed
> > > > > >>> for
> > > > > >>>>>> this.
> > > > > >>>>>>>>>>> It
> > > > > >>>>>>>>>>>>>>> would>
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> be good to clarify dependencies if any. Without
> > > > > >> KIP-441, I
> > > > > >>>> am
> > > > > >>>>>> not
> > > > > >>>>>>>>>>> very
> > > > > >>>>>>>>>>>>>>> sure>
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> if we should allow reads from nodes in RESTORING
> > > state,
> > > > > >>>> which
> > > > > >>>>>>>>>> could
> > > > > >>>>>>>>>>>>>>> amount>
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> to many minutes/few hours of stale reads?  This is
> > > > > >>> different
> > > > > > >>>>>>>>>>>>>>> from allowing>
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> querying standby replicas, which could be mostly
> > > caught
> > > > > >> up
> > > > > >>>> and
> > > > > >>>>>>>>>> the>
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> staleness window could be much smaller/tolerable.
> > > (once
> > > > > >>>> again
> > > > > >>>>>> the
> > > > > >>>>>>>>>>> focus
> > > > > >>>>>>>>>>>>>>> on>
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> KAFKA-8994)>
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> - Finally, we may need to introduce a configuration to
> > > > > >>>>>>>>>>>>>>>> control this. Some users may prefer errors to stale data.
> > > > > >>>>>>>>>>>>>>>> Can we also add it to the KIP?
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> Thanks
> > > > > >>>>>>>>>>>>>>>> Vinoth
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>> On Sun, Oct 13, 2019 at 3:31 PM Navinder Brar
> > > > > >>>>>>>>>>>>>>>> <na...@yahoo.com.invalid> wrote:
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> Hi,
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> Starting a discussion on the KIP to Allow state stores to
> > > > > >>>>>>>>>>>>>>>>> serve stale reads during rebalance
> > > > > >>>>>>>>>>>>>>>>> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance).
> > > > > >>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>> Thanks & Regards,
> > > > > >>>>>>>>>>>>>>>>> Navinder
> > > > > >>>>>>>>>>>>>>>>> LinkedIn
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> --
> > > > > >>>>>>>>>> -- Guozhang
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>
> > > > > >>>>>>>>
> > > > > >>>>>>>
> > > > > >>>>>>
> > > > > >>>>
> > > > > >>>>
> > > > > >>>
> > > > > >>
> > > > > >>
> > > > > >> --
> > > > > >> -- Guozhang
> > > > > >>
> > > > > >
> > > > >
> > > >
