Ping :) Any thoughts?

On Mon, Nov 4, 2019 at 5:13 PM Vinoth Chandar <vchan...@confluent.io> wrote:

> >>  I'm having some trouble wrapping my head around what race conditions
> might occur, other than the fundamentally broken state in which different
> instances are running totally different topologies.
> 3. @both Without the topic partitions that the tasks can map back to, we
> have to rely on topology/cluster metadata in each Streams instance to map
> the task back. If the source topics are wildcarded, for example, then each
> instance could have different source topics in its topology until the next
> rebalance happens. You can also read my comments here:
> https://issues.apache.org/jira/browse/KAFKA-7149?focusedCommentId=16904106&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16904106
>
>
> >> seems hard to imagine how encoding arbitrarily long topic names plus an
> integer for the partition number could be as efficient as task ids, which
> are just two integers.
> 3. If you still have concerns about the efficacy of dictionary encoding,
> happy to engage. The link above also has some benchmark code I used.
> Theoretically, we would send each topic name at least once, so yes, if you
> compare a 10-20 character topic name + an integer to two integers, it will
> be more bytes. But it's a constant overhead proportional to the size of the
> topic name, and with 4, 8, or 12 partitions the size difference between the
> baseline (version 4, where we just repeated topic names for each topic
> partition) and the two approaches becomes narrow.
>
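
A rough sketch of the arithmetic behind the point above, not the benchmark code from the JIRA. The wire format is an assumption (2-byte length prefix plus UTF-8 bytes per topic name, 4-byte integers, one task per topic partition), and the class name, method names, and topic name are made up for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

// Back-of-the-envelope size comparison; the wire format here is assumed, not taken from the KIP.
final class EncodingSizeSketch {
    static final int INT_BYTES = 4;

    // Assumed string encoding: 2-byte length prefix followed by UTF-8 bytes.
    static int nameBytes(String topic) {
        return 2 + topic.getBytes(StandardCharsets.UTF_8).length;
    }

    // Baseline: the topic name is repeated for every topic partition.
    static int repeatedNames(List<String> topics, int partitionsPerTopic) {
        int total = 0;
        for (String topic : topics) {
            total += partitionsPerTopic * (nameBytes(topic) + INT_BYTES);
        }
        return total;
    }

    // Dictionary encoding: each name is sent once with an int id,
    // then every partition costs two ints (topic id, partition number).
    static int dictionaryEncoded(List<String> topics, int partitionsPerTopic) {
        int total = 0;
        for (String topic : topics) {
            total += nameBytes(topic) + INT_BYTES + partitionsPerTopic * 2 * INT_BYTES;
        }
        return total;
    }

    // Task ids: two ints per entry, assuming (for simplicity) one task per topic partition.
    static int taskIds(List<String> topics, int partitionsPerTopic) {
        return topics.size() * partitionsPerTopic * 2 * INT_BYTES;
    }

    public static void main(String[] args) {
        List<String> topics = List.of("orders-topic-01"); // hypothetical 15-character name
        for (int p : new int[] {4, 8, 12}) {
            System.out.printf("partitions=%d baseline=%d dictionary=%d taskIds=%d%n",
                p, repeatedNames(topics, p), dictionaryEncoded(topics, p), taskIds(topics, p));
        }
    }
}
```

Under these assumptions the gap between dictionary encoding and task ids stays fixed at one dictionary entry per topic, while the baseline grows with every partition.
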
> >>Plus, Navinder is going to implement a bunch of protocol code that we
> might just want to change when the discussion actually does take place, if
> ever.
> >>it'll just be a mental burden for everyone to remember that we want to
> have this follow-up discussion.
> 3. Isn't people changing the same parts of code and tracking follow-ups a
> common thing that we need to deal with anyway? For this KIP, isn't it enough
> to reason about whether the additional map on top of the topic dictionary
> would incur more overhead than sending task_ids? I don't think that's the
> case, since both of them send two integers. As I see it, we can do a separate
> follow-up to (re)pursue the task_id conversion and get it working for both
> maps within the next release?
>
> >>Can you elaborate on "breaking up the API"? It looks like there are
> already separate API calls in the proposal, one for time-lag, and another
> for offset-lag, so are they not already broken up?
> The current APIs for lags (e.g. lagInfoForStore) return StoreLagInfo
> objects, which have both time and offset lags. If we had separate APIs
> (e.g. offsetLagForStore() and timeLagForStore()), we could implement the offset
> version using the offset lag that the Streams instance already tracks, i.e.
> no need for external calls. The time based lag API would incur the Kafka
> read for the timestamp. Makes sense?
>
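
To make the split concrete, here is a minimal sketch of how the two calls could differ in cost. It is not the KIP's API: the class, its constructor, and the `LongSupplier` standing in for the Kafka read of the changelog tail timestamp are all hypothetical, and only the two method names come from the message above.

```java
import java.util.function.LongSupplier;

// Hypothetical holder for one store's lag state; not part of Kafka Streams.
final class SplitLagApiSketch {
    private final long logEndOffset;            // assumed to be tracked locally already
    private final long restoredOffset;          // offset the store has been restored up to
    private final long lastAppliedTimestampMs;  // timestamp of the last record applied locally
    private final LongSupplier tailTimestampReader; // stand-in for the read against Kafka
    private int remoteReads = 0;

    SplitLagApiSketch(long logEndOffset, long restoredOffset,
                      long lastAppliedTimestampMs, LongSupplier tailTimestampReader) {
        this.logEndOffset = logEndOffset;
        this.restoredOffset = restoredOffset;
        this.lastAppliedTimestampMs = lastAppliedTimestampMs;
        this.tailTimestampReader = tailTimestampReader;
    }

    // Offset lag needs only locally known values, so no external call is made.
    long offsetLagForStore() {
        return Math.max(0L, logEndOffset - restoredOffset);
    }

    // Time lag pays for one read of the tail record's timestamp on every call.
    long timeLagForStore() {
        remoteReads++;
        return Math.max(0L, tailTimestampReader.getAsLong() - lastAppliedTimestampMs);
    }

    int remoteReads() {
        return remoteReads;
    }
}
```

A caller that only wants offset lag never triggers the supplier, so the application decides how often to pay for the time-based read.
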
> Based on the discussions so far, I only see these two pending issues to be
> aligned on. Is there any other open item people want to bring up?
>
> On Mon, Nov 4, 2019 at 11:24 AM Sophie Blee-Goldman <sop...@confluent.io>
> wrote:
>
>> Regarding 3) I'm wondering, does your concern still apply even now
>> that the pluggable PartitionGrouper interface has been deprecated?
>> Now that we can be sure that the DefaultPartitionGrouper is used to generate
>> the taskId -> partitions mapping, we should be able to convert any taskId
>> to any partitions.
>>
>> On Mon, Nov 4, 2019 at 11:17 AM John Roesler <j...@confluent.io> wrote:
>>
>> > Hey Vinoth, thanks for the reply!
>> >
>> > 3.
>> > I get that it's not the main focus of this KIP, but if it's ok, it
>> > would be nice to hash out this point right now. It only came up
>> > because this KIP-535 is substantially extending the pattern in
>> > question. If we push it off until later, then the reviewers are going
>> > to have to suspend their concerns not just while voting for the KIP,
>> > but also while reviewing the code. Plus, Navinder is going to
>> > implement a bunch of protocol code that we might just want to change
>> > when the discussion actually does take place, if ever. Finally, it'll
>> > just be a mental burden for everyone to remember that we want to have
>> > this follow-up discussion.
>> >
>> > It makes sense what you say... the specific assignment is already
>> > encoded in the "main" portion of the assignment, not in the "userdata"
>> > part. It also makes sense that it's simpler to reason about races if
>> > you simply get all the information about the topics and partitions
>> > directly from the assignor, rather than get the partition number from
>> > the assignor and the topic name from your own a priori knowledge of
>> > the topology. On the other hand, I'm having some trouble wrapping my
>> > head around what race conditions might occur, other than the
>> > fundamentally broken state in which different instances are running
>> > totally different topologies. Sorry, but can you remind us of the
>> > specific condition?
>> >
>> > To the efficiency counterargument, it seems hard to imagine how
>> > encoding arbitrarily long topic names plus an integer for the
>> > partition number could be as efficient as task ids, which are just two
>> > integers. It seems like this would only be true if topic names were 4
>> > characters or less.
>> >
>> > 4.
>> > Yeah, clearly, it would not be a good idea to query the metadata
>> > before every single IQ query. I think there are plenty of established
>> > patterns for distributed database clients to follow. Can you elaborate
>> > on "breaking up the API"? It looks like there are already separate API
>> > calls in the proposal, one for time-lag, and another for offset-lag,
>> > so are they not already broken up? FWIW, yes, I agree, the offset lag
>> > is already locally known, so we don't need to build in an extra
>> > synchronous broker API call, just one for the time-lag.
>> >
>> > Thanks again for the discussion,
>> > -John
>> >
>> > On Mon, Nov 4, 2019 at 11:17 AM Vinoth Chandar <vchan...@confluent.io>
>> > wrote:
>> > >
>> > > 3. Right now, we still get the topic partitions assigned as a part of the
>> > > top level Assignment object (the one that wraps AssignmentInfo) and use
>> > > that to convert taskIds back. This list only contains assignments for
>> > > that particular instance. Attempting to also reverse map "all" the
>> > > taskIds in the streams cluster, i.e. all the topic partitions in these
>> > > global assignment maps, was what was problematic. By explicitly sending the
>> > > global assignment maps as actual topic partitions, the group coordinator (i.e.
>> > > the leader that computes the assignments) is able to consistently enforce
>> > > its view of the topic metadata. I still don't think a change that
>> > > forces you to reconsider semantics is needed just to save bits on the wire.
>> > > Maybe we can discuss this separately from this KIP?
>> > >
>> > > 4. There needs to be some caching/interval somewhere though, since we don't
>> > > want to potentially make 1 Kafka read per IQ. But I think it's a valid
>> > > suggestion to make this call just synchronous and leave the caching, or how
>> > > often you want to call, to the application. Would it be good to then break
>> > > up the APIs for time and offset based lag? We can obtain offset based lag
>> > > for free, and only incur the overhead of reading Kafka if we want time
>> > > based lags?
>> > >
>> > > On Fri, Nov 1, 2019 at 2:49 PM Sophie Blee-Goldman <
>> sop...@confluent.io>
>> > > wrote:
>> > >
>> > > > Adding on to John's response to 3), can you clarify when and why exactly we
>> > > > cannot convert between taskIds and partitions? If that's really the case I don't
>> > > > feel confident that the StreamsPartitionAssignor is not full of bugs...
>> > > >
>> > > > It seems like it currently just encodes a list of all partitions (the
>> > > > assignment) and also a list of the corresponding task ids, duplicated to
>> > > > ensure each partition has the corresponding taskId at the same offset into
>> > > > the list. Why is that problematic?
>> > > >
>> > > >
>> > > > On Fri, Nov 1, 2019 at 12:39 PM John Roesler <j...@confluent.io>
>> > wrote:
>> > > >
>> > > > > Thanks, all, for considering the points!
>> > > > >
>> > > > > 3. Interesting. I have a vague recollection of that... Still,
>> though,
>> > > > > it seems a little fishy. After all, we return the assignments
>> > > > > themselves as task ids, and the members have to map these to topic
>> > > > > partitions in order to configure themselves properly. If it's too
>> > > > > complicated to get this right, then how do we know that Streams is
>> > > > > computing the correct partitions at all?
>> > > > >
>> > > > > 4. How about just checking the log-end timestamp when you call the
>> > > > > method? Then, when you get an answer, it's as fresh as it could
>> > > > > possibly be. And as a user you have just one, obvious, "knob" to
>> > > > > configure how much overhead you want to devote to checking... If you
>> > > > > want to call the broker API less frequently, you just call the Streams
>> > > > > API less frequently. And you don't have to worry about the
>> > > > > relationship between your invocations of that method and the config
>> > > > > setting (e.g., you'll never get a negative number, which you could if
>> > > > > you check the log-end timestamp less frequently than you check the
>> > > > > lag).
>> > > > >
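
A small illustration of the negative-number case mentioned above, assuming time lag is computed as the log-end timestamp minus the timestamp of the last record applied to the store. The class name and the numbers are invented for the example.

```java
// Hypothetical illustration; not code from Kafka Streams.
final class StaleLogEndSketch {
    // Time lag as the difference between the changelog's tail timestamp and
    // the timestamp of the last record applied to the store (both in ms).
    static long timeLagMs(long logEndTimestampMs, long lastAppliedTimestampMs) {
        return logEndTimestampMs - lastAppliedTimestampMs;
    }

    public static void main(String[] args) {
        long cachedLogEndTs = 1_000L; // fetched on an earlier refresh interval
        long freshLogEndTs = 1_500L;  // what a read at call time would return
        long lastAppliedTs = 1_500L;  // the store has caught up since the refresh

        System.out.println("cached: " + timeLagMs(cachedLogEndTs, lastAppliedTs));
        System.out.println("fresh:  " + timeLagMs(freshLogEndTs, lastAppliedTs));
    }
}
```

With the cached value the store appears to be ahead of the log end, which is the inconsistency that reading the timestamp at call time avoids.
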
>> > > > > Thanks,
>> > > > > -John
>> > > > >
>> > > > > On Thu, Oct 31, 2019 at 11:52 PM Navinder Brar
>> > > > > <navinder_b...@yahoo.com.invalid> wrote:
>> > > > > >
>> > > > > > Thanks John for going through this.
>> > > > > >
>> > > > > >    - +1, makes sense
>> > > > > >    - +1, no issues there
>> > > > > >    - Yeah, the initial patch I had submitted for K-7149
>> > > > > >      (https://github.com/apache/kafka/pull/6935) to reduce the assignmentInfo
>> > > > > >      object had taskIds, but the merged PR had similar size according to Vinoth
>> > > > > >      and it was simpler, so if the end result is of the same size, it would not
>> > > > > >      make sense to pivot from the dictionary and move to taskIDs again.
>> > > > > >    - Not sure what a good default would be if we don't have a
>> > > > > >      configurable setting. This gives users the flexibility to
>> > > > > >      serve their requirements, as at the end of the day it would take CPU cycles.
>> > > > > >      I am ok with starting with a default and seeing how it goes based upon
>> > > > > >      feedback.
>> > > > > >
>> > > > > > Thanks,
>> > > > > > Navinder
>> > > > > >     On Friday, 1 November, 2019, 03:46:42 am IST, Vinoth
>> Chandar <
>> > > > > vchan...@confluent.io> wrote:
>> > > > > >
>> > > > > >  1. Was trying to spell them out separately. but makes sense for
>> > > > > > readability. done
>> > > > > >
>> > > > > > 2. No I immediately agree :) .. makes sense. @navinder?
>> > > > > >
>> > > > > > 3. I actually attempted only sending taskIds while working on KAFKA-7149.
>> > > > > > It's non-trivial to handle edge cases resulting from newly added topic
>> > > > > > partitions and wildcarded topic entries. I ended up simplifying it to just
>> > > > > > dictionary encoding the topic names to reduce size. We can apply the same
>> > > > > > technique here for this map. Additionally, we could also dictionary encode
>> > > > > > HostInfo, given it's now repeated twice. I think this would save more space
>> > > > > > than having a flag per topic partition entry. Lmk if you are okay with
>> > > > > > this.
>> > > > > >
>> > > > > > 4. This opens up a good discussion. Given we support time lag estimates
>> > > > > > also, we need to read the tail record of the changelog periodically (unlike
>> > > > > > offset lag, which we can potentially piggyback on metadata in
>> > > > > > ConsumerRecord IIUC). We thought we should have a config that controls how
>> > > > > > often this read happens? Let me know if there is a simple way to get the
>> > > > > > timestamp value of the tail record that we are missing.
>> > > > > >
>> > > > > > On Thu, Oct 31, 2019 at 12:58 PM John Roesler <
>> j...@confluent.io>
>> > > > wrote:
>> > > > > >
>> > > > > > > Hey Navinder,
>> > > > > > >
>> > > > > > > Thanks for updating the KIP, it's a lot easier to see the
>> current
>> > > > > > > state of the proposal now.
>> > > > > > >
>> > > > > > > A few remarks:
>> > > > > > > 1. I'm sure it was just an artifact of revisions, but you have
>> > two
>> > > > > > > separate sections where you list additions to the KafkaStreams
>> > > > > > > interface. Can you consolidate those so we can see all the
>> > additions
>> > > > > > > at once?
>> > > > > > >
>> > > > > > > 2. For messageLagEstimate, can I suggest "offsetLagEstimate"
>> > instead,
>> > > > > > > to be clearer that we're specifically measuring a number of
>> > offsets?
>> > > > > > > If you don't immediately agree, then I'd at least point out
>> that
>> > we
>> > > > > > > usually refer to elements of Kafka topics as "records", not
>> > > > > > > "messages", so "recordLagEstimate" might be more appropriate.
>> > > > > > >
>> > > > > > > 3. The proposal mentions adding a map of the standby
>> > _partitions_ for
>> > > > > > > each host to AssignmentInfo. I assume this is designed to
>> mirror
>> > the
>> > > > > > > existing "partitionsByHost" map. To keep the size of these
>> > metadata
>> > > > > > > messages down, maybe we can consider making two changes:
>> > > > > > > (a) for both actives and standbys, encode the _task ids_
>> instead
>> > of
>> > > > > > > _partitions_. Every member of the cluster has a copy of the
>> > topology,
>> > > > > > > so they can convert task ids into specific partitions on their
>> > own,
>> > > > > > > and task ids are only (usually) three characters.
>> > > > > > > (b) instead of encoding two maps (hostinfo -> actives AND
>> > hostinfo ->
>> > > > > > > standbys), which requires serializing all the hostinfos twice,
>> > maybe
>> > > > > > > we can pack them together in one map with a structured value
>> > > > (hostinfo
>> > > > > > > -> [actives,standbys]).
>> > > > > > > Both of these ideas still require bumping the protocol version
>> > to 6,
>> > > > > > > and they basically mean we drop the existing
>> `PartitionsByHost`
>> > field
>> > > > > > > and add a new `TasksByHost` field with the structured value I
>> > > > > > > mentioned.
>> > > > > > >
>> > > > > > > 4. Can we avoid adding the new "lag refresh" config? The lags
>> > would
>> > > > > > > necessarily be approximate anyway, so adding the config seems
>> to
>> > > > > > > increase the operational complexity of the system for little
>> > actual
>> > > > > > > benefit.
>> > > > > > >
>> > > > > > > Thanks for the pseudocode, by the way, it really helps
>> visualize
>> > how
>> > > > > > > these new interfaces would play together. And thanks again for
>> > the
>> > > > > > > update!
>> > > > > > > -John
>> > > > > > >
>> > > > > > > On Thu, Oct 31, 2019 at 2:41 PM John Roesler <
>> j...@confluent.io>
>> > > > > wrote:
>> > > > > > > >
>> > > > > > > > Hey Vinoth,
>> > > > > > > >
>> > > > > > > > I started going over the KIP again yesterday. There are a
>> lot
>> > of
>> > > > > > > > updates, and I didn't finish my feedback in one day. I'm
>> > working on
>> > > > > it
>> > > > > > > > now.
>> > > > > > > >
>> > > > > > > > Thanks,
>> > > > > > > > John
>> > > > > > > >
>> > > > > > > > On Thu, Oct 31, 2019 at 11:42 AM Vinoth Chandar <
>> > > > > vchan...@confluent.io>
>> > > > > > > wrote:
>> > > > > > > > >
>> > > > > > > > > Wondering if anyone has thoughts on these changes? I liked
>> > that
>> > > > > the new
>> > > > > > > > > metadata fetch APIs provide all the information at once
>> with
>> > > > > consistent
>> > > > > > > > > naming..
>> > > > > > > > >
>> > > > > > > > > Any guidance on what you would like to be discussed or
>> > fleshed
>> > > > out
>> > > > > more
>> > > > > > > > > before we call a VOTE?
>> > > > > > > > >
>> > > > > > > > > On Wed, Oct 30, 2019 at 10:31 AM Navinder Brar
>> > > > > > > > > <navinder_b...@yahoo.com.invalid> wrote:
>> > > > > > > > >
>> > > > > > > > > > Hi,
>> > > > > > > > > > We have made some edits in the KIP(
>> > > > > > > > > >
>> > > > > > >
>> > > > >
>> > > >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance
>> > > > > > > )
>> > > > > > > > > > after due deliberation on the agreed design to support
>> the
>> > new
>> > > > > query
>> > > > > > > > > > design. This includes the new public API to query
>> > offset/time
>> > > > lag
>> > > > > > > > > > information and other details related to querying
>> standby
>> > tasks
>> > > > > > > which have
>> > > > > > > > > > come up after thinking of thorough details.
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > >    - Addition of new config, “lag.fetch.interval.ms” to
>> > > > > configure
>> > > > > > > the
>> > > > > > > > > > interval of time/offset lag
>> > > > > > > > > >    - Addition of new class StoreLagInfo to store the
>> > > > periodically
>> > > > > > > obtained
>> > > > > > > > > > time/offset lag
>> > > > > > > > > >    - Addition of two new functions in KafkaStreams,
>> > > > > > > List<StoreLagInfo>
>> > > > > > > > > > allLagInfo() and List<StoreLagInfo>
>> lagInfoForStore(String
>> > > > > > > storeName) to
>> > > > > > > > > > return the lag information for an instance and a store
>> > > > > respectively
>> > > > > > > > > >    - Addition of new class KeyQueryMetadata. We need
>> > > > > topicPartition
>> > > > > > > for
>> > > > > > > > > > each key to be matched with the lag API for the topic
>> > > > partition.
>> > > > > One
>> > > > > > > way is
>> > > > > > > > > > to add new functions and fetch topicPartition from
>> > > > > > > StreamsMetadataState but
>> > > > > > > > > > we thought having one call and fetching StreamsMetadata
>> and
>> > > > > > > topicPartition
>> > > > > > > > > > is more cleaner.
>> > > > > > > > > >    -
>> > > > > > > > > > Renaming partitionsForHost to activePartitionsForHost in
>> > > > > > > StreamsMetadataState
>> > > > > > > > > > and partitionsByHostState to activePartitionsByHostState
>> > > > > > > > > > in StreamsPartitionAssignor
>> > > > > > > > > >    - We have also added the pseudo code of how all the
>> > changes
>> > > > > will
>> > > > > > > exist
>> > > > > > > > > > together and support the new querying APIs
>> > > > > > > > > >
>> > > > > > > > > > Please let me know if anything is pending now, before a
>> > vote
>> > > > can
>> > > > > be
>> > > > > > > > > > started on this.  On Saturday, 26 October, 2019,
>> 05:41:44
>> > pm
>> > > > IST,
>> > > > > > > Navinder
>> > > > > > > > > > Brar <navinder_b...@yahoo.com.invalid> wrote:
>> > > > > > > > > >
>> > > > > > > > > >  >> Since there are two soft votes for separate
>> > active/standby
>> > > > > API
>> > > > > > > > > > methods, I also change my position on that. Fine with 2
>> > > > separate
>> > > > > > > > > > methods. Once we remove the lag information from these
>> > APIs,
>> > > > > > > returning a
>> > > > > > > > > > List is less attractive, since the ordering has no
>> special
>> > > > > meaning
>> > > > > > > now.
>> > > > > > > > > > Agreed, now that we are not returning lag, I am also
>> sold
>> > on
>> > > > > having
>> > > > > > > two
>> > > > > > > > > > separate functions. We already have one which returns
>> > > > > > > streamsMetadata for
>> > > > > > > > > > active tasks, and now we can add another one for
>> standbys.
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > >    On Saturday, 26 October, 2019, 03:55:16 am IST,
>> Vinoth
>> > > > > Chandar <
>> > > > > > > > > > vchan...@confluent.io> wrote:
>> > > > > > > > > >
>> > > > > > > > > >  +1 to Sophie's suggestion. Having both lag in terms of
>> > time
>> > > > and
>> > > > > > > offsets is
>> > > > > > > > > > good and makes for a more complete API.
>> > > > > > > > > >
>> > > > > > > > > > Since there are two soft votes for separate
>> active/standby
>> > API
>> > > > > > > methods, I
>> > > > > > > > > > also change my position on that. Fine with 2 separate
>> > methods.
>> > > > > > > > > > Once we remove the lag information from these APIs,
>> > returning a
>> > > > > List
>> > > > > > > is
>> > > > > > > > > > less attractive, since the ordering has no special
>> meaning
>> > now.
>> > > > > > > > > >
>> > > > > > > > > > >> lag in offsets vs time: Having both, as suggested by
>> > Sophie
>> > > > > would
>> > > > > > > of
>> > > > > > > > > > course be best. What is a little unclear to me is, how
>> in
>> > > > details
>> > > > > > > are we
>> > > > > > > > > > going to compute both?
>> > > > > > > > > > @navinder may be next step is to flesh out these details
>> > and
>> > > > > surface
>> > > > > > > any
>> > > > > > > > > > larger changes we need to make if need be.
>> > > > > > > > > >
>> > > > > > > > > > Any other details we need to cover, before a VOTE can be
>> > called
>> > > > > on
>> > > > > > > this?
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > > On Fri, Oct 25, 2019 at 1:51 PM Bill Bejeck <
>> > bbej...@gmail.com
>> > > > >
>> > > > > > > wrote:
>> > > > > > > > > >
>> > > > > > > > > > > I am jumping in a little late here.
>> > > > > > > > > > >
>> > > > > > > > > > > Overall I agree with the proposal to push decision
>> > making on
>> > > > > > > what/how to
>> > > > > > > > > > > query in the query layer.
>> > > > > > > > > > >
>> > > > > > > > > > > For point 5 from above, I'm slightly in favor of
>> having
>> > a new
>> > > > > > > method,
>> > > > > > > > > > > "standbyMetadataForKey()" or something similar.
>> > > > > > > > > > > Because even if we return all tasks in one list, the
>> user
>> > > > will
>> > > > > > > still have
>> > > > > > > > > > > to perform some filtering to separate the different
>> > tasks,
>> > > > so I
>> > > > > > > don't
>> > > > > > > > > > feel
>> > > > > > > > > > > making two calls is a burden, and IMHO makes things
>> more
>> > > > > > > transparent for
>> > > > > > > > > > > the user.
>> > > > > > > > > > > If the final vote is for using an "isActive" field,
>> I'm
>> > good
>> > > > > with
>> > > > > > > that as
>> > > > > > > > > > > well.
>> > > > > > > > > > >
>> > > > > > > > > > > Just my 2 cents.
>> > > > > > > > > > >
>> > > > > > > > > > > On Fri, Oct 25, 2019 at 5:09 AM Navinder Brar
>> > > > > > > > > > > <navinder_b...@yahoo.com.invalid> wrote:
>> > > > > > > > > > >
>> > > > > > > > > > > > I think now we are aligned on almost all the design
>> > parts.
>> > > > > > > Summarising
>> > > > > > > > > > > > below what has been discussed above and we have a
>> > general
>> > > > > > > consensus on.
>> > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > > >    - Rather than broadcasting lag across all nodes
>> at
>> > > > > > > rebalancing/with
>> > > > > > > > > > > the
>> > > > > > > > > > > > heartbeat, we will just return a list of all
>> available
>> > > > > standby’s
>> > > > > > > in the
>> > > > > > > > > > > > system and the user can make IQ query any of those
>> > nodes
>> > > > > which
>> > > > > > > will
>> > > > > > > > > > > return
>> > > > > > > > > > > > the response, and the lag and offset time. Based on
>> > which
>> > > > > user
>> > > > > > > can
>> > > > > > > > > > decide
>> > > > > > > > > > > > if he wants to return the response back or call
>> another
>> > > > > standby.
>> > > > > > > > > > > >    -  The current metadata query frequency will not
>> > change.
>> > > > > It
>> > > > > > > will be
>> > > > > > > > > > > the
>> > > > > > > > > > > > same as it does now, i.e. before each query.
>> > > > > > > > > > > >
>> > > > > > > > > > > >    -  For fetching list<StreamsMetadata> in
>> > > > > > > StreamsMetadataState.java
>> > > > > > > > > > and
>> > > > > > > > > > > > List<QueryableStoreProvider> in
>> > > > > > > StreamThreadStateStoreProvider.java
>> > > > > > > > > > > (which
>> > > > > > > > > > > > will return all active stores which are
>> > running/restoring
>> > > > and
>> > > > > > > replica
>> > > > > > > > > > > > stores which are running), we will add new functions
>> > and
>> > > > not
>> > > > > > > disturb
>> > > > > > > > > > the
>> > > > > > > > > > > > existing functions
>> > > > > > > > > > > >
>> > > > > > > > > > > >    - There is no need to add new StreamsConfig for
>> > > > > implementing
>> > > > > > > this
>> > > > > > > > > > KIP
>> > > > > > > > > > > >
>> > > > > > > > > > > >    - We will add standbyPartitionsByHost in
>> > AssignmentInfo
>> > > > > and
>> > > > > > > > > > > > StreamsMetadataState which would change the existing
>> > > > > > > rebuildMetadata()
>> > > > > > > > > > > and
>> > > > > > > > > > > > setPartitionsByHostState()
>> > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > > > If anyone has any more concerns please feel free to
>> > add.
>> > > > Post
>> > > > > > > this I
>> > > > > > > > > > will
>> > > > > > > > > > > > be initiating a vote.
>> > > > > > > > > > > > ~Navinder
>> > > > > > > > > > > >
>> > > > > > > > > > > >    On Friday, 25 October, 2019, 12:05:29 pm IST,
>> > Matthias
>> > > > J.
>> > > > > Sax
>> > > > > > > <
>> > > > > > > > > > > > matth...@confluent.io> wrote:
>> > > > > > > > > > > >
>> > > > > > > > > > > >  Just to close the loop @Vinoth:
>> > > > > > > > > > > >
>> > > > > > > > > > > > > 1. IIUC John intends to add (or we can do this in
>> > this
>> > > > > KIP) lag
>> > > > > > > > > > > > information
>> > > > > > > > > > > > > to AssignmentInfo, which gets sent to every
>> > participant.
>> > > > > > > > > > > >
>> > > > > > > > > > > > As explained by John, currently KIP-441 plans to
>> only
>> > > > report
>> > > > > the
>> > > > > > > > > > > > information to the leader. But I guess, with the new
>> > > > > proposal to
>> > > > > > > not
>> > > > > > > > > > > > broadcast this information anyway, this concern is
>> > > > > invalidated
>> > > > > > > anyway
>> > > > > > > > > > > >
>> > > > > > > > > > > > > 2. At-least I was under the assumption that it
>> can be
>> > > > > called
>> > > > > > > per
>> > > > > > > > > > query,
>> > > > > > > > > > > > > since the API docs don't seem to suggest
>> otherwise.
>> > Do
>> > > > you
>> > > > > see
>> > > > > > > any
>> > > > > > > > > > > > > potential issues if we call this every query? (we
>> > should
>> > > > > > > benchmark
>> > > > > > > > > > this
>> > > > > > > > > > > > > nonetheless)
>> > > > > > > > > > > >
>> > > > > > > > > > > > I did not see a real issue if people refresh the
>> > metadata
>> > > > > > > frequently,
>> > > > > > > > > > > > because it would be a local call. My main point was,
>> > that
>> > > > > this
>> > > > > > > would
>> > > > > > > > > > > > change the current usage pattern of the API, and we
>> > would
>> > > > > > > clearly need
>> > > > > > > > > > > > to communicate this change. Similar to (1), this
>> > concern in
>> > > > > > > invalidated
>> > > > > > > > > > > > anyway.
>> > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > > > @John: I think it's a great idea to get rid of
>> > reporting
>> > > > > lag, and
>> > > > > > > > > > > > pushing the decision making process about "what to
>> > query"
>> > > > > into
>> > > > > > > the
>> > > > > > > > > > query
>> > > > > > > > > > > > serving layer itself. This simplifies the overall
>> > design of
>> > > > > this
>> > > > > > > KIP
>> > > > > > > > > > > > significantly, and actually aligns very well with
>> the
>> > idea
>> > > > > that
>> > > > > > > Kafka
>> > > > > > > > > > > > Streams (as it is a library) should only provide the
>> > basic
>> > > > > > > building
>> > > > > > > > > > > > block. Many of my raised questions are invalided by
>> > this.
>> > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > > > Some questions are still open though:
>> > > > > > > > > > > >
>> > > > > > > > > > > > > 10) Do we need to distinguish between
>> > active(restoring)
>> > > > and
>> > > > > > > standby
>> > > > > > > > > > > > > tasks? Or could be treat both as the same?
>> > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > > > @Vinoth: about (5). I see your point about multiple
>> > calls
>> > > > vs
>> > > > > a
>> > > > > > > single
>> > > > > > > > > > > > call. I still slightly prefer multiple calls, but
>> it's
>> > > > highly
>> > > > > > > > > > subjective
>> > > > > > > > > > > > and I would also be fine to add an #isActive()
>> method.
>> > > > Would
>> > > > > be
>> > > > > > > good
>> > > > > > > > > > the
>> > > > > > > > > > > > get feedback from others.
>> > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > > > For (14), ie, lag in offsets vs time: Having both,
>> as
>> > > > > suggested
>> > > > > > > by
>> > > > > > > > > > > > Sophie would of course be best. What is a little
>> > unclear to
>> > > > > me
>> > > > > > > is, how
>> > > > > > > > > > > > in details are we going to compute both?
>> > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > > > -Matthias
>> > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > > > On 10/24/19 11:07 PM, Sophie Blee-Goldman wrote:
>> > > > > > > > > > > > > Just to chime in on the "report lag vs timestamp
>> > > > > difference"
>> > > > > > > issue, I
>> > > > > > > > > > > > would
>> > > > > > > > > > > > > actually advocate for both. As mentioned already,
>> > time
>> > > > > > > difference is
>> > > > > > > > > > > > > probably a lot easier and/or more useful to reason
>> > about
>> > > > in
>> > > > > > > terms of
>> > > > > > > > > > > > > "freshness"
>> > > > > > > > > > > > > of the state. But in the case when all queried
>> > stores are
>> > > > > far
>> > > > > > > behind,
>> > > > > > > > > > > lag
>> > > > > > > > > > > > > could
>> > > > > > > > > > > > > be used to estimate the recovery velocity. You can
>> > then
>> > > > > get a
>> > > > > > > (pretty
>> > > > > > > > > > > > rough)
>> > > > > > > > > > > > > idea of when a store might be ready, and wait
>> until
>> > > > around
>> > > > > > > then to
>> > > > > > > > > > > query
>> > > > > > > > > > > > > again.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > On Thu, Oct 24, 2019 at 9:53 PM Guozhang Wang <
>> > > > > > > wangg...@gmail.com>
>> > > > > > > > > > > > wrote:
>> > > > > > > > > > > > >
>> > > > > > > > > > > > >> I think I agree with John's recent reasoning as
>> > well:
>> > > > > instead
>> > > > > > > of
>> > > > > > > > > > > letting
>> > > > > > > > > > > > >> the storeMetadataAPI to return the staleness
>> > > > information,
>> > > > > > > letting
>> > > > > > > > > > the
>> > > > > > > > > > > > >> client to query either active or standby and
>> letting
>> > > > > standby
>> > > > > > > query
>> > > > > > > > > > > > response
>> > > > > > > > > > > > >> to include both the values + timestamp (or lag,
>> as
>> > in
>> > > > > diffs of
>> > > > > > > > > > > > timestamps)
>> > > > > > > > > > > > >> would actually be more intuitive -- not only the
>> > streams
>> > > > > > > client is
>> > > > > > > > > > > > simpler,
>> > > > > > > > > > > > >> from user's perspective they also do not need to
>> > > > > periodically
>> > > > > > > > > > refresh
>> > > > > > > > > > > > their
>> > > > > > > > > > > > >> staleness information from the client, but only
>> > need to
>> > > > > make
>> > > > > > > > > > decisions
>> > > > > > > > > > > > on
>> > > > > > > > > > > > >> the fly whenever they need to query.
>> > > > > > > > > > > > >>
>> > > > > > > > > > > > > >> Again, the standby replica then needs to know the
>> > current
>> > > > > active
>> > > > > > > > > > task's
>> > > > > > > > > > > > >> timestamp, which can be found from the log end
>> > record's
>> > > > > > > encoded
>> > > > > > > > > > > > timestamp;
>> > > > > > > > > > > > > >> today standby tasks do not read that specific
>> > record,
>> > > > > but
>> > > > > > > only
>> > > > > > > > > > > > refresh
>> > > > > > > > > > > > > >> their knowledge of the log end offset, but I think
>> > > > > refreshing
>> > > > > > > the
>> > > > > > > > > > latest
>> > > > > > > > > > > > >> record timestamp is not a very bad request to add
>> > on the
>> > > > > > > standby
>> > > > > > > > > > > > replicas.
>> > > > > > > > > > > > >>
>> > > > > > > > > > > > >>
>> > > > > > > > > > > > >> Guozhang
>> > > > > > > > > > > > >>
>> > > > > > > > > > > > >>
>> > > > > > > > > > > > >> On Thu, Oct 24, 2019 at 5:43 PM Vinoth Chandar <
>> > > > > > > > > > vchan...@confluent.io
>> > > > > > > > > > > >
>> > > > > > > > > > > > >> wrote:
>> > > > > > > > > > > > >>
>> > > > > > > > > > > > >>> +1 As someone implementing a query routing
>> layer,
>> > there
>> > > > > is
>> > > > > > > already
>> > > > > > > > > > a
>> > > > > > > > > > > > need
>> > > > > > > > > > > > >>> to have mechanisms in place to do
>> > healthchecks/failure
>> > > > > > > detection to
>> > > > > > > > > > > > >> detect
>> > > > > > > > > > > > >>> failures for queries, while Streams rebalancing
>> > > > > eventually
>> > > > > > > kicks in
>> > > > > > > > > > > the
>> > > > > > > > > > > > >>> background.
>> > > > > > > > > > > > >>> So, pushing this complexity to the IQ client app
>> > keeps
>> > > > > > > Streams
>> > > > > > > > > > > simpler
>> > > > > > > > > > > > as
>> > > > > > > > > > > > > >>> well. IQs will potentially be issued at an
>> order of
>> > > > > > > magnitude more
>> > > > > > > > > > > > >>> frequently and it can achieve good freshness for
>> > the
>> > > > lag
>> > > > > > > > > > information.
>> > > > > > > > > > > > >>>
>> > > > > > > > > > > > >>> I would like to add however, that we would also
>> > need to
>> > > > > > > introduce
>> > > > > > > > > > > apis
>> > > > > > > > > > > > in
>> > > > > > > > > > > > >>> KafkaStreams class, for obtaining lag
>> information
>> > for
>> > > > all
>> > > > > > > stores
>> > > > > > > > > > > local
>> > > > > > > > > > > > to
>> > > > > > > > > > > > >>> that host. This is for the IQs to relay back
>> with
>> > the
>> > > > > > > response/its
>> > > > > > > > > > > own
>> > > > > > > > > > > > >>> heartbeat mechanism.
>> > > > > > > > > > > > >>>
>> > > > > > > > > > > > >>> On Thu, Oct 24, 2019 at 3:12 PM John Roesler <
>> > > > > > > j...@confluent.io>
>> > > > > > > > > > > > wrote:
>> > > > > > > > > > > > >>>
>> > > > > > > > > > > > >>>> Hi all,
>> > > > > > > > > > > > >>>>
>> > > > > > > > > > > > > >>>> I've been mulling over this KIP, and I think I
>> > was on
>> > > > > the
>> > > > > > > wrong
>> > > > > > > > > > > track
>> > > > > > > > > > > > >>>> earlier with regard to task lags. Tl;dr: I
>> don't
>> > think
>> > > > > we
>> > > > > > > should
>> > > > > > > > > > add
>> > > > > > > > > > > > >>>> lags at all to the metadata API (and also not
>> to
>> > the
>> > > > > > > > > > AssignmentInfo
>> > > > > > > > > > > > >>>> protocol message).
>> > > > > > > > > > > > >>>>
>> > > > > > > > > > > > >>>> Like I mentioned early on, reporting lag via
>> > > > > > > > > > > > >>>> SubscriptionInfo/AssignmentInfo would only work
>> > while
>> > > > > > > rebalances
>> > > > > > > > > > are
>> > > > > > > > > > > > >>>> happening. Once the group stabilizes, no
>> members
>> > would
>> > > > > be
>> > > > > > > notified
>> > > > > > > > > > > of
>> > > > > > > > > > > > >>>> each others' lags anymore. I had been thinking
>> > that
>> > > > the
>> > > > > > > solution
>> > > > > > > > > > > would
>> > > > > > > > > > > > >>>> be the heartbeat proposal I mentioned earlier,
>> but
>> > > > that
>> > > > > > > proposal
>> > > > > > > > > > > would
>> > > > > > > > > > > > >>>> have reported the heartbeats of the members
>> only
>> > to
>> > > > the
>> > > > > > > leader
>> > > > > > > > > > > member
>> > > > > > > > > > > > >>>> (the one who makes assignments). To be useful
>> in
>> > the
>> > > > > > > context of
>> > > > > > > > > > > _this_
>> > > > > > > > > > > > >>>> KIP, we would also have to report the lags in
>> the
>> > > > > heartbeat
>> > > > > > > > > > > responses
>> > > > > > > > > > > > > >>>> to _all_ members. This is a concern to me
>> > because
>> > > > now
>> > > > > > > _all_ the
>> > > > > > > > > > > > >>>> lags get reported to _all_ the members on
>> _every_
>> > > > > > > heartbeat... a
>> > > > > > > > > > lot
>> > > > > > > > > > > > >>>> of chatter.
>> > > > > > > > > > > > >>>>
>> > > > > > > > > > > > >>>> Plus, the proposal for KIP-441 is only to
>> report
>> > the
>> > > > > lags
>> > > > > > > of each
>> > > > > > > > > > > > >>>> _task_. This is the sum of the lags of all the
>> > stores
>> > > > > in the
>> > > > > > > > > > tasks.
>> > > > > > > > > > > > >>>> But this would be insufficient for KIP-535. For
>> > this
>> > > > > kip,
>> > > > > > > we would
>> > > > > > > > > > > > >>>> want the lag specifically of the store we want
>> to
>> > > > > query. So
>> > > > > > > this
>> > > > > > > > > > > > >>>> means, we have to report the lags of all the
>> > stores of
>> > > > > all
>> > > > > > > the
>> > > > > > > > > > > members
>> > > > > > > > > > > > >>>> to every member... even more chatter!
>> > > > > > > > > > > > >>>>
>> > > > > > > > > > > > >>>> The final nail in the coffin to me is that IQ
>> > clients
>> > > > > would
>> > > > > > > have
>> > > > > > > > > > to
>> > > > > > > > > > > > >>>> start refreshing their metadata quite
>> frequently
>> > to
>> > > > > stay up
>> > > > > > > to
>> > > > > > > > > > date
>> > > > > > > > > > > on
>> > > > > > > > > > > > >>>> the lags, which adds even more overhead to the
>> > system.
>> > > > > > > > > > > > >>>>
>> > > > > > > > > > > > >>>> Consider a strawman alternative: we bring
>> KIP-535
>> > back
>> > > > > to
>> > > > > > > > > > extending
>> > > > > > > > > > > > >>>> the metadata API to tell the client the active
>> and
>> > > > > standby
>> > > > > > > > > > replicas
>> > > > > > > > > > > > > >>>> for the key in question (not including any
>> > > > > "staleness/lag"
>> > > > > > > > > > > > >>>> restriction, just returning all the replicas).
>> > Then,
>> > > > the
>> > > > > > > client
>> > > > > > > > > > > picks
>> > > > > > > > > > > > >>>> a replica and sends the query. The server
>> returns
>> > the
>> > > > > > > current lag
>> > > > > > > > > > > > > >>>> along with the response (maybe in an HTTP
>> header
>> > or
>> > > > > > > something).
>> > > > > > > > > > > Then,
>> > > > > > > > > > > > >>>> the client keeps a map of its last observed
>> lags
>> > for
>> > > > > each
>> > > > > > > replica,
>> > > > > > > > > > > and
>> > > > > > > > > > > > >>>> uses this information to prefer fresher
>> replicas.
>> > > > > > > > > > > > >>>>
>> > > > > > > > > > > > >>>> OR, if it wants only to query the active
>> replica,
>> > it
>> > > > > would
>> > > > > > > throw
>> > > > > > > > > > an
>> > > > > > > > > > > > >>>> error on any lag response greater than zero,
>> > refresh
>> > > > > its
>> > > > > > > > > > metadata
>> > > > > > > > > > > by
>> > > > > > > > > > > > > >>>> re-querying the metadata API, and try again
>> > with the
>> > > > > > > current
>> > > > > > > > > > > active
>> > > > > > > > > > > > >>>> replica.
>> > > > > > > > > > > > >>>>
>> > > > > > > > > > > > >>>> This way, the lag information will be super
>> fresh
>> > for
>> > > > > the
>> > > > > > > client,
>> > > > > > > > > > > and
>> > > > > > > > > > > > >>>> we keep the Metadata API /
>> > Assignment,Subscription /
>> > > > and
>> > > > > > > Heartbeat
>> > > > > > > > > > > as
>> > > > > > > > > > > > >>>> slim as possible.
>> > > > > > > > > > > > >>>>
>> > > > > > > > > > > > >>>> Side note: I do think that some time soon,
>> we'll
>> > have
>> > > > to
>> > > > > > > add a
>> > > > > > > > > > > library
>> > > > > > > > > > > > >>>> for IQ server/clients. I think that this logic
>> > will
>> > > > > start
>> > > > > > > to get
>> > > > > > > > > > > > >>>> pretty complex.
>> > > > > > > > > > > > >>>>
>> > > > > > > > > > > > >>>> I hope this thinking is reasonably clear!
>> > > > > > > > > > > > >>>> Thanks again,
>> > > > > > > > > > > > >>>> -John
>> > > > > > > > > > > > >>>>
>> > > > > > > > > > > > >>>> Does that
>> > > > > > > > > > > > >>>>
>> > > > > > > > > > > > >>>> On Wed, Oct 23, 2019 at 10:16 AM Vinoth
>> Chandar <
>> > > > > > > > > > > > vchan...@confluent.io
>> > > > > > > > > > > > >>>
>> > > > > > > > > > > > >>>> wrote:
>> > > > > > > > > > > > >>>>>
>> > > > > > > > > > > > >>>>> Responding to the points raised by Matthias
>> > > > > > > > > > > > >>>>>
>> > > > > > > > > > > > >>>>> 1. IIUC John intends to add (or we can do
>> this in
>> > > > this
>> > > > > > > KIP) lag
>> > > > > > > > > > > > >>>> information
>> > > > > > > > > > > > >>>>> to AssignmentInfo, which gets sent to every
>> > > > > participant.
>> > > > > > > > > > > > >>>>>
>> > > > > > > > > > > > > >>>>> 2. At least I was under the assumption that it
>> > can be
>> > > > > > > called per
>> > > > > > > > > > > > >> query,
>> > > > > > > > > > > > >>>>> since the API docs don't seem to suggest
>> > otherwise.
>> > > > Do
>> > > > > you
>> > > > > > > see
>> > > > > > > > > > any
>> > > > > > > > > > > > >>>>> potential issues if we call this every query?
>> (we
>> > > > > should
>> > > > > > > > > > benchmark
>> > > > > > > > > > > > >> this
>> > > > > > > > > > > > >>>>> nonetheless)
>> > > > > > > > > > > > >>>>>
>> > > > > > > > > > > > >>>>> 4. Agree. metadataForKey() implicitly would
>> > return
>> > > > the
>> > > > > > > active
>> > > > > > > > > > host
>> > > > > > > > > > > > >>>> metadata
>> > > > > > > > > > > > >>>>> (as it was before). We should also document
>> this
>> > in
>> > > > > that
>> > > > > > > API's
>> > > > > > > > > > > > >> javadoc,
>> > > > > > > > > > > > >>>>> given we have another method(s) that returns
>> more
>> > > > host
>> > > > > > > metadata
>> > > > > > > > > > > now.
>> > > > > > > > > > > > >>>>>
>> > > > > > > > > > > > >>>>> 5.  While I see the point, the app/caller has
>> to
>> > make
>> > > > > two
>> > > > > > > > > > different
>> > > > > > > > > > > > > >>> API
>> > > > > > > > > > > > >>>>> calls to obtain active/standby and potentially
>> > do the
>> > > > > same
>> > > > > > > set of
>> > > > > > > > > > > > > >>>> operations
>> > > > > > > > > > > > >>>>> to query the state. I personally still like a
>> > method
>> > > > > like
>> > > > > > > > > > > isActive()
>> > > > > > > > > > > > >>>>> better, but don't have strong opinions.
>> > > > > > > > > > > > >>>>>
>> > > > > > > > > > > > >>>>> 9. If we do expose the lag information, could
>> we
>> > just
>> > > > > > > leave it
>> > > > > > > > > > up
>> > > > > > > > > > > > >> to
>> > > > > > > > > > > > >>>> the
>> > > > > > > > > > > > >>>>> caller to decide whether it errors out or not
>> > and not
>> > > > > make
>> > > > > > > the
>> > > > > > > > > > > > >> decision
>> > > > > > > > > > > > >>>>> within Streams? i.e we don't need a new config
>> > > > > > > > > > > > >>>>>
>> > > > > > > > > > > > >>>>> 14. +1 . If it's easier to do right away. We
>> > started
>> > > > > with
>> > > > > > > number
>> > > > > > > > > > of
>> > > > > > > > > > > > >>>>> records, following the lead from KIP-441
>> > > > > > > > > > > > >>>>>
>> > > > > > > > > > > > >>>>> On Wed, Oct 23, 2019 at 5:44 AM Navinder Brar
>> > > > > > > > > > > > >>>>> <navinder_b...@yahoo.com.invalid> wrote:
>> > > > > > > > > > > > >>>>>
>> > > > > > > > > > > > >>>>>>
>> > > > > > > > > > > > >>>>>> Thanks, everyone for taking a look. Some very
>> > cool
>> > > > > ideas
>> > > > > > > have
>> > > > > > > > > > > flowed
>> > > > > > > > > > > > >>> in.
>> > > > > > > > > > > > >>>>>>
>> > > > > > > > > > > > > >>>>>>>> There was a follow-on idea I POCed to continuously
>> > > > > > > > > > > > > >>>>>>>> share lag information in the heartbeat protocol
>> > > > > > > > > > > > > >>>>>> +1, that would be great. I will update the KIP
>> > > > > > > > > > > > > >>>>>> assuming this work will finish soon.
>> > > > > > > > > > > > > >>>>>>
>> > > > > > > > > > > > > >>>>>>>> I think that adding a new method to
>> > > > > > > > > > > > > >>>>>>>> StreamsMetadataState and deprecating the existing
>> > > > > > > > > > > > > >>>>>>>> method is the best way to go; we just can't change
>> > > > > > > > > > > > > >>>>>>>> the return types of any existing methods.
>> > > > > > > > > > > > > >>>>>> +1 on this. We will add new methods for users who
>> > > > > > > > > > > > > >>>>>> would be interested in getting back a list of
>> > > > > > > > > > > > > >>>>>> possible options to query from, and leave the
>> > > > > > > > > > > > > >>>>>> current function getStreamsMetadataForKey()
>> > > > > > > > > > > > > >>>>>> untouched for users who want absolute consistency.
>> > > > > > > > > > > > > >>>>>>
>> > > > > > > > > > > > > >>>>>>>> why not just always return all available metadata
>> > > > > > > > > > > > > >>>>>>>> (including active/standby or lag) and let the caller
>> > > > > > > > > > > > > >>>>>>>> decide to which node they want to route the query
>> > > > > > > > > > > > > >>>>>> +1. I think this makes sense, as from a user
>> > > > > > > > > > > > > >>>>>> standpoint there is no difference between an active
>> > > > > > > > > > > > > >>>>>> and a standby if both have the same lag. In fact,
>> > > > > > > > > > > > > >>>>>> users would be able to use this API to reduce query
>> > > > > > > > > > > > > >>>>>> load on actives, so returning all available options
>> > > > > > > > > > > > > >>>>>> along with the current lag in each would make sense,
>> > > > > > > > > > > > > >>>>>> leaving it to the user how they want to use this
>> > > > > > > > > > > > > >>>>>> data. This has another added advantage. If a user
>> > > > > > > > > > > > > >>>>>> queries any random machine for a key and that
>> > > > > > > > > > > > > >>>>>> machine has a replica for the partition (where the
>> > > > > > > > > > > > > >>>>>> key belongs), the user might choose to serve the
>> > > > > > > > > > > > > >>>>>> data from there itself (if it doesn’t lag much)
>> > > > > > > > > > > > > >>>>>> rather than finding the active and making an IQ to
>> > > > > > > > > > > > > >>>>>> that. This would save some critical time in serving
>> > > > > > > > > > > > > >>>>>> for some applications.
>> > > > > > > > > > > > > >>>>>>
>> > > > > > > > > > > > > >>>>>>>> Adding the lag in terms of timestamp diff comparing
>> > > > > > > > > > > > > >>>>>>>> the committed offset.
>> > > > > > > > > > > > > >>>>>> +1 on this, I think it’s more readable. But as John
>> > > > > > > > > > > > > >>>>>> said, the function allMetadataForKey() is just
>> > > > > > > > > > > > > >>>>>> returning the possible options from where users can
>> > > > > > > > > > > > > >>>>>> query a key, so we can even drop the parameter
>> > > > > > > > > > > > > >>>>>> enableReplicaServing/tolerableDataStaleness and just
>> > > > > > > > > > > > > >>>>>> return all the streamsMetadata containing that key
>> > > > > > > > > > > > > >>>>>> along with the offset limit.
>> > > > > > > > > > > > > >>>>>>
>> > > > > > > > > > > > > >>>>>> Answering the questions posted by Matthias in
>> > > > > > > > > > > > > >>>>>> sequence.
>> > > > > > > > > > > > > >>>>>> 1. @John can you please comment on this one.
>> > > > > > > > > > > > > >>>>>> 2. Yeah, the usage pattern would include querying
>> > > > > > > > > > > > > >>>>>> this prior to every request.
>> > > > > > > > > > > > > >>>>>> 3. Will add the changes to StreamsMetadata in the
>> > > > > > > > > > > > > >>>>>> KIP, would include changes in rebuildMetadata() etc.
>> > > > > > > > > > > > > >>>>>> 4. Makes sense, already addressed above.
>> > > > > > > > > > > > > >>>>>> 5. Is it important from a user perspective whether
>> > > > > > > > > > > > > >>>>>> they are querying an active(processing),
>> > > > > > > > > > > > > >>>>>> active(restoring), or a standby task, if we have a
>> > > > > > > > > > > > > >>>>>> way of denoting lag in a readable manner which
>> > > > > > > > > > > > > >>>>>> signals to the user that this is the best node to
>> > > > > > > > > > > > > >>>>>> query for fresh data?
>> > > > > > > > > > > > > >>>>>> 6. Yes, I intend to return the actives and replicas
>> > > > > > > > > > > > > >>>>>> in the same return list in allMetadataForKey().
>> > > > > > > > > > > > > >>>>>> 7. tricky
>> > > > > > > > > > > > > >>>>>> 8. Yes, we need new functions to return
>> > > > > > > > > > > > > >>>>>> activeRestoring and standbyRunning tasks.
>> > > > > > > > > > > > > >>>>>> 9. StreamsConfig doesn’t look like it is of much use
>> > > > > > > > > > > > > >>>>>> to me, since we are giving all possible options via
>> > > > > > > > > > > > > >>>>>> this function, or they can use the existing function
>> > > > > > > > > > > > > >>>>>> getStreamsMetadataForKey() and get just the active.
>> > > > > > > > > > > > > >>>>>> 10. I think treat them both the same and let the lag
>> > > > > > > > > > > > > >>>>>> do the talking.
>> > > > > > > > > > > > > >>>>>> 11. We are just sending them the options to query
>> > > > > > > > > > > > > >>>>>> from in allMetadataForKey(), which doesn’t include
>> > > > > > > > > > > > > >>>>>> any handle. We then query that machine for the key,
>> > > > > > > > > > > > > >>>>>> where it calls allStores() and tries to find the
>> > > > > > > > > > > > > >>>>>> task in activeRunning/activeRestoring/standbyRunning
>> > > > > > > > > > > > > >>>>>> and adds the store handle here.
>> > > > > > > > > > > > > >>>>>> 12. Need to verify, but during the exact point when
>> > > > > > > > > > > > > >>>>>> the store is closed to transition it from restoring
>> > > > > > > > > > > > > >>>>>> to running, the queries will fail. The caller in
>> > > > > > > > > > > > > >>>>>> such a case can have their own configurable retries
>> > > > > > > > > > > > > >>>>>> to check again, or try the replica if a call to the
>> > > > > > > > > > > > > >>>>>> active fails.
>> > > > > > > > > > > > > >>>>>> 13. I think KIP-216 is working on those lines; we
>> > > > > > > > > > > > > >>>>>> might not need a few of those exceptions, since now
>> > > > > > > > > > > > > >>>>>> the basic idea of this KIP is to support IQ during
>> > > > > > > > > > > > > >>>>>> rebalancing.
>> > > > > > > > > > > > > >>>>>> 14. Addressed above, agreed it looks more readable.
>> > > > > > > > > > > > >>>>>>
>> > > > > > > > > > > > >>>>>>
>> > > > > > > > > > > > >>>>>>    On Tuesday, 22 October, 2019, 08:39:07 pm
>> > IST,
>> > > > > > > Matthias J.
>> > > > > > > > > > Sax
>> > > > > > > > > > > > >> <
>> > > > > > > > > > > > >>>>>> matth...@confluent.io> wrote:
>> > > > > > > > > > > > >>>>>>
>> > > > > > > > > > > > >>>>>>  One more thought:
>> > > > > > > > > > > > >>>>>>
>> > > > > > > > > > > > >>>>>> 14) Is specifying the allowed lag in number
>> of
>> > > > > records a
>> > > > > > > useful
>> > > > > > > > > > > way
>> > > > > > > > > > > > >>> for
>> > > > > > > > > > > > >>>>>> users to declare how stale an instance is
>> > allowed to
>> > > > > be?
>> > > > > > > Would
>> > > > > > > > > > it
>> > > > > > > > > > > > >> be
>> > > > > > > > > > > > >>>>>> more intuitive for users to specify the
>> allowed
>> > lag
>> > > > in
>> > > > > > > time
>> > > > > > > > > > units
>> > > > > > > > > > > > >>>> (would
>> > > > > > > > > > > > >>>>>> event time or processing time be better)? It
>> > seems
>> > > > > hard
>> > > > > > > for
>> > > > > > > > > > users
>> > > > > > > > > > > > >> to
>> > > > > > > > > > > > >>>>>> reason how "fresh" a store really is when
>> > number of
>> > > > > > > records is
>> > > > > > > > > > > > >> used.
>> > > > > > > > > > > > >>>>>>
>> > > > > > > > > > > > >>>>>>
>> > > > > > > > > > > > >>>>>> -Matthias
>> > > > > > > > > > > > >>>>>>
>> > > > > > > > > > > > >>>>>> On 10/21/19 9:02 PM, Matthias J. Sax wrote:
>> > > > > > > > > > > > >>>>>>> Some more follow up thoughts:
>> > > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > > >>>>>>> 11) If we get a store handle of an
>> > > > active(restoring)
>> > > > > > > task, and
>> > > > > > > > > > > > >> the
>> > > > > > > > > > > > >>>> task
>> > > > > > > > > > > > >>>>>>> transits to running, does the store handle
>> > become
>> > > > > > > invalid and a
>> > > > > > > > > > > > >> new
>> > > > > > > > > > > > >>>> one
>> > > > > > > > > > > > >>>>>>> must be retrieved? Or can we "switch it out"
>> > > > > underneath
>> > > > > > > -- for
>> > > > > > > > > > > > >> this
>> > > > > > > > > > > > >>>>>>> case, how does the user know when they
>> start to
>> > > > > query the
>> > > > > > > > > > > > >>> up-to-date
>> > > > > > > > > > > > >>>>>> state?
>> > > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > > >>>>>>> 12) Standby tasks will have the store open
>> in
>> > > > regular
>> > > > > > > mode,
>> > > > > > > > > > while
>> > > > > > > > > > > > >>>>>>> active(restoring) tasks open stores in
>> "upgrade
>> > > > mode"
>> > > > > > > for more
>> > > > > > > > > > > > >>>> efficient
>> > > > > > > > > > > > >>>>>>> bulk loading. When we switch the store into
>> > active
>> > > > > mode,
>> > > > > > > we
>> > > > > > > > > > close
>> > > > > > > > > > > > >>> it
>> > > > > > > > > > > > >>>> and
>> > > > > > > > > > > > >>>>>>> reopen it. What is the impact if we query
>> the
>> > store
>> > > > > > > during
>> > > > > > > > > > > > >> restore?
>> > > > > > > > > > > > >>>> What
>> > > > > > > > > > > > >>>>>>> is the impact if we close the store to
>> transit
>> > to
>> > > > > > > running (eg,
>> > > > > > > > > > > > >>> there
>> > > > > > > > > > > > >>>>>>> might be open iterators)?
>> > > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > > > >>>>>>> 13) Do we need to introduce new exception
>> > types?
>> > > > > Compare
>> > > > > > > > > > KIP-216
>> > > > > > > > > > > > >>>>>>> (
>> > > > > > > > > > > > >>>>>>
>> > > > > > > > > > > > >>>>
>> > > > > > > > > > > > >>>
>> > > > > > > > > > > > >>
>> > > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > >
>> > > > > > >
>> > > > >
>> > > >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-216%3A+IQ+should+throw+different+exceptions+for+different+errors
>> > > > > > > > > > > > >>>>>> )
>> > > > > > > > > > > > >>>>>>> that aims to improve the user experience
>> with
>> > > > regard
>> > > > > to
>> > > > > > > IQ
>> > > > > > > > > > > > >>>> exceptions.
>> > > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > > >>>>>>> -Matthias
>> > > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > > >>>>>>> On 10/21/19 6:39 PM, Matthias J. Sax wrote:
>> > > > > > > > > > > > >>>>>>>> Thanks for the KIP.
>> > > > > > > > > > > > >>>>>>>>
>> > > > > > > > > > > > >>>>>>>> Couple of comments:
>> > > > > > > > > > > > >>>>>>>>
>> > > > > > > > > > > > >>>>>>>> 1) With regard to KIP-441, my current
>> > > > understanding
>> > > > > is
>> > > > > > > that
>> > > > > > > > > > the
>> > > > > > > > > > > > >>> lag
>> > > > > > > > > > > > >>>>>>>> information is only reported to the leader
>> > (please
>> > > > > > > correct me
>> > > > > > > > > > > > >> if I
>> > > > > > > > > > > > >>>> am
>> > > > > > > > > > > > >>>>>>>> wrong). This seems to be quite a
>> limitation to
>> > > > > actually
>> > > > > > > use
>> > > > > > > > > > the
>> > > > > > > > > > > > >>> lag
>> > > > > > > > > > > > >>>>>>>> information.
>> > > > > > > > > > > > >>>>>>>>
>> > > > > > > > > > > > >>>>>>>> 2) The idea of the metadata API is actually
>> > to get
>> > > > > > > metadata
>> > > > > > > > > > once
>> > > > > > > > > > > > >>> and
>> > > > > > > > > > > > >>>>>>>> only refresh the metadata if a store was
>> > migrated.
>> > > > > The
>> > > > > > > current
>> > > > > > > > > > > > >>>> proposal
>> > > > > > > > > > > > >>>>>>>> would require to get the metadata before
>> each
>> > > > query.
>> > > > > > > The KIP
>> > > > > > > > > > > > >>> should
>> > > > > > > > > > > > >>>>>>>> describe the usage pattern and impact in
>> more
>> > > > > detail.
>> > > > > > > > > > > > >>>>>>>>
>> > > > > > > > > > > > >>>>>>>> 3) Currently, the KIP does not list the
>> > public API
>> > > > > > > changes in
>> > > > > > > > > > > > >>>> detail.
>> > > > > > > > > > > > >>>>>>>> Please list all methods you intend to
>> > deprecate
>> > > > and
>> > > > > > > list all
>> > > > > > > > > > > > >>>> methods you
>> > > > > > > > > > > > >>>>>>>> intend to add (best, using a code-block
>> > markup --
>> > > > > > > compare
>> > > > > > > > > > > > >>>>>>>>
>> > > > > > > > > > > > >>>>>>
>> > > > > > > > > > > > >>>>
>> > > > > > > > > > > > >>>
>> > > > > > > > > > > > >>
>> > > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > >
>> > > > > > >
>> > > > >
>> > > >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-470%3A+TopologyTestDriver+test+input+and+output+usability+improvements
>> > > > > > > > > > > > >>>>>>>> as an example)
>> > > > > > > > > > > > >>>>>>>>
>> > > > > > > > > > > > >>>>>>>> 4) Also note (as already pointed out by
>> John),
>> > > > that
>> > > > > we
>> > > > > > > cannot
>> > > > > > > > > > > > >> have
>> > > > > > > > > > > > >>>> any
>> > > > > > > > > > > > >>>>>>>> breaking API changes. Thus, the API should
>> be
>> > > > > designed
>> > > > > > > in a
>> > > > > > > > > > > > >> fully
>> > > > > > > > > > > > >>>>>>>> backward compatible manner.
>> > > > > > > > > > > > >>>>>>>>
>> > > > > > > > > > > > > >>>>>>>> 5) Returning a list of metadata objects
>> makes
>> > it
>> > > > hard
>> > > > > > > for user
>> > > > > > > > > > to
>> > > > > > > > > > > > >>>> know if
>> > > > > > > > > > > > >>>>>>>> the first object refers to the
>> > active(processing),
>> > > > > > > > > > > > >>>> active(restoring), or
>> > > > > > > > > > > > >>>>>>>> a standby task. IMHO, we should be more
>> > explicit.
>> > > > > For
>> > > > > > > > > > example, a
>> > > > > > > > > > > > >>>>>>>> metadata object could have a flag that one
>> can
>> > > > test
>> > > > > via
>> > > > > > > > > > > > >>>> `#isActive()`.
>> > > > > > > > > > > > >>>>>>>> Or maybe even better, we could keep the
>> > current
>> > > > API
>> > > > > > > as-is and
>> > > > > > > > > > > > >> add
>> > > > > > > > > > > > >>>>>>>> something like `standbyMetadataForKey()`
>> (and
>> > > > > similar
>> > > > > > > methods
>> > > > > > > > > > > > >> for
>> > > > > > > > > > > > >>>>>>>> other). Having just a flag `isActive()` is
>> a
>> > > > little
>> > > > > > > subtle and
>> > > > > > > > > > > > >>>> having
>> > > > > > > > > > > > >>>>>>>> new overloads would make the API much
>> clearer
>> > > > > (passing
>> > > > > > > in a
>> > > > > > > > > > > > >>> boolean
>> > > > > > > > > > > > >>>> flag
>> > > > > > > > > > > > >>>>>>>> does not seem to be a nice API).
>> > > > > > > > > > > > >>>>>>>>
>> > > > > > > > > > > > > >>>>>>>> 6) Do you intend to return all standby
>> > metadata
>> > > > > > > information at
>> > > > > > > > > > > > >>> once,
>> > > > > > > > > > > > >>>>>>>> similar to `allMetadata()` -- seems to be
>> > useful.
>> > > > > > > > > > > > >>>>>>>>
>> > > > > > > > > > > > >>>>>>>> 7) Even if the lag information is
>> propagated
>> > to
>> > > > all
>> > > > > > > instances,
>> > > > > > > > > > > > >> it
>> > > > > > > > > > > > >>>> will
>> > > > > > > > > > > > >>>>>>>> happen in an async manner. Hence, I am
>> > wondering
>> > > > if
>> > > > > we
>> > > > > > > should
>> > > > > > > > > > > > >>>> address
>> > > > > > > > > > > > >>>>>>>> this race condition (I think we should).
>> The
>> > idea
>> > > > > would
>> > > > > > > be to
>> > > > > > > > > > > > >>> check
>> > > > > > > > > > > > >>>> if a
>> > > > > > > > > > > > >>>>>>>> standby/active(restoring) task is actually
>> > still
>> > > > > within
>> > > > > > > the
>> > > > > > > > > > lag
>> > > > > > > > > > > > >>>> bounds
>> > > > > > > > > > > > >>>>>>>> when a query is executed and we would
>> throw an
>> > > > > > > exception if
>> > > > > > > > > > not.
>> > > > > > > > > > > > >>>>>>>>
>> > > > > > > > > > > > >>>>>>>> 8) The current `KafkaStreams#state()`
>> method
>> > only
>> > > > > > > returns a
>> > > > > > > > > > > > >> handle
>> > > > > > > > > > > > >>>> to
>> > > > > > > > > > > > >>>>>>>> stores of active(processing) tasks. How
>> can a
>> > user
>> > > > > > > actually
>> > > > > > > > > > get
>> > > > > > > > > > > > >> a
>> > > > > > > > > > > > >>>> handle
>> > > > > > > > > > > > > >>>>>>>> to a store of an active(restoring) or
>> standby
>> > > > task
>> > > > > for
>> > > > > > > > > > > > >> querying?
>> > > > > > > > > > > > >>>> Seems
>> > > > > > > > > > > > >>>>>>>> we should add a new method to get standby
>> > handles?
>> > > > > > > Changing
>> > > > > > > > > > the
>> > > > > > > > > > > > >>>>>>>> semantics to existing `state()` would be
>> > possible,
>> > > > > but
>> > > > > > > I think
>> > > > > > > > > > > > >>>> adding a
>> > > > > > > > > > > > >>>>>>>> new method is preferable?
>> > > > > > > > > > > > >>>>>>>>
>> > > > > > > > > > > > >>>>>>>> 9) How does the user actually specify the
>> > > > acceptable
>> > > > > > > lag? A
>> > > > > > > > > > > > >> global
>> > > > > > > > > > > > >>>>>>>> config via StreamsConfig (this would be a
>> > public
>> > > > API
>> > > > > > > change
>> > > > > > > > > > that
>> > > > > > > > > > > > >>>> needs
>> > > > > > > > > > > > >>>>>>>> to be covered in the KIP)? Or on a
>> per-store
>> > or
>> > > > even
>> > > > > > > per-query
>> > > > > > > > > > > > >>>> basis for
>> > > > > > > > > > > > >>>>>>>> more flexibility? We could also have a
>> global
>> > > > > setting
>> > > > > > > that is
>> > > > > > > > > > > > >> used
>> > > > > > > > > > > > >>>> as
>> > > > > > > > > > > > >>>>>>>> default and allow overriding it on a
>> > per-query
>> > > > > basis.
>> > > > > > > > > > > > >>>>>>>>
>> > > > > > > > > > > > >>>>>>>> 10) Do we need to distinguish between
>> > > > > active(restoring)
>> > > > > > > and
>> > > > > > > > > > > > >>> standby
>> > > > > > > > > > > > >>>>>>>> tasks? Or could we treat both as the same?
>> > > > > > > > > > > > >>>>>>>>
>> > > > > > > > > > > > >>>>>>>>
>> > > > > > > > > > > > >>>>>>>>
>> > > > > > > > > > > > >>>>>>>>
>> > > > > > > > > > > > >>>>>>>> -Matthias
>> > > > > > > > > > > > >>>>>>>>
>> > > > > > > > > > > > >>>>>>>>
>> > > > > > > > > > > > >>>>>>>> On 10/21/19 5:40 PM, Vinoth Chandar wrote:
>> > > > > > > > > > > > >>>>>>>>>>> I'm wondering, rather than putting
>> > "acceptable
>> > > > > lag"
>> > > > > > > into
>> > > > > > > > > > the
>> > > > > > > > > > > > >>>>>>>>> configuration at all, or even making it a
>> > > > > parameter on
>> > > > > > > > > > > > >>>>>> `allMetadataForKey`,
>> > > > > > > > > > > > >>>>>>>>> why not just _always_ return all available
>> > > > metadata
>> > > > > > > > > > (including
>> > > > > > > > > > > > >>>>>>>>> active/standby or lag) and let the caller
>> > decide
>> > > > to
>> > > > > > > which
>> > > > > > > > > > node
>> > > > > > > > > > > > >>> they
>> > > > > > > > > > > > >>>>>> want to
>> > > > > > > > > > > > >>>>>>>>> route the query?
>> > > > > > > > > > > > >>>>>>>>> +1 on exposing lag information via the
>> APIs.
>> > IMO
>> > > > > > > without
>> > > > > > > > > > having
>> > > > > > > > > > > > >>>>>>>>> continuously updated/fresh lag
>> information,
>> > its
>> > > > > true
>> > > > > > > value
>> > > > > > > > > > as a
>> > > > > > > > > > > > >>>> signal
>> > > > > > > > > > > > >>>>>> for
>> > > > > > > > > > > > >>>>>>>>> query routing decisions is very limited.
>> But
>> > we
>> > > > can
>> > > > > > > design
>> > > > > > > > > > the
>> > > > > > > > > > > > >>> API
>> > > > > > > > > > > > >>>>>> around
>> > > > > > > > > > > > >>>>>>>>> this model and iterate? Longer term, we
>> > should
>> > > > have
>> > > > > > > > > > > > >> continuously
>> > > > > > > > > > > > >>>>>> shared lag
>> > > > > > > > > > > > >>>>>>>>> information.
>> > > > > > > > > > > > >>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>> more general to refactor it to
>> > > > > > > "allMetadataForKey(long
>> > > > > > > > > > > > >>>>>>>>> tolerableDataStaleness, ...)", and when
>> it's
>> > set
>> > > > > to 0
>> > > > > > > it
>> > > > > > > > > > means
>> > > > > > > > > > > > >>>> "active
>> > > > > > > > > > > > >>>>>> task
>> > > > > > > > > > > > >>>>>>>>> only".
>> > > > > > > > > > > > >>>>>>>>> +1 IMO if we plan on having
>> > > > > `enableReplicaServing`, it
>> > > > > > > makes
>> > > > > > > > > > > > >>> sense
>> > > > > > > > > > > > >>>> to
>> > > > > > > > > > > > >>>>>>>>> generalize based on dataStaleness. This
>> seems
>> > > > > > > complementary
>> > > > > > > > > > to
>> > > > > > > > > > > > >>>>>> exposing the
>> > > > > > > > > > > > >>>>>>>>> lag information itself.
>> > > > > > > > > > > > >>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>> This is actually not a public api
>> change at
>> > > > all,
>> > > > > and
>> > > > > > > I'm
>> > > > > > > > > > > > >>>> planning to
>> > > > > > > > > > > > >>>>>>>>> implement it asap as a precursor to the
>> rest
>> > of
>> > > > > KIP-441
>> > > > > > > > > > > > >>>>>>>>> +1 again. Do we have a concrete timeline
>> for
>> > when
>> > > > > this
>> > > > > > > change
>> > > > > > > > > > > > >>> will
>> > > > > > > > > > > > >>>>>> land on
>> > > > > > > > > > > > >>>>>>>>> master? I would like to get the
>> > implementation
>> > > > > wrapped
>> > > > > > > up (as
>> > > > > > > > > > > > >>> much
>> > > > > > > > > > > > >>>> as
>> > > > > > > > > > > > >>>>>>>>> possible) by the end of the month :). But I
>> > agree
>> > > > this
>> > > > > > > > > > sequencing
>> > > > > > > > > > > > >>>> makes
>> > > > > > > > > > > > >>>>>>>>> sense.
>> > > > > > > > > > > > >>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>> On Mon, Oct 21, 2019 at 2:56 PM Guozhang
>> > Wang <
>> > > > > > > > > > > > >>> wangg...@gmail.com>
>> > > > > > > > > > > > >>>>>> wrote:
>> > > > > > > > > > > > >>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>> Hi Navinder,
>> > > > > > > > > > > > >>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>> Thanks for the KIP, I have a high level
>> > question
>> > > > > > > about the
>> > > > > > > > > > > > >>>> proposed
>> > > > > > > > > > > > >>>>>> API
>> > > > > > > > > > > > >>>>>>>>>> regarding:
>> > > > > > > > > > > > >>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>
>> > "StreamsMetadataState::allMetadataForKey(boolean
>> > > > > > > > > > > > >>>>>> enableReplicaServing...)"
>> > > > > > > > > > > > >>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>> I'm wondering if it's more general to
>> > refactor
>> > > > it
>> > > > > to
>> > > > > > > > > > > > >>>>>>>>>> "allMetadataForKey(long
>> > tolerableDataStaleness,
>> > > > > > > ...)", and
>> > > > > > > > > > > > >> when
>> > > > > > > > > > > > >>>> it's
>> > > > > > > > > > > > >>>>>> set to
>> > > > > > > > > > > > >>>>>>>>>> 0 it means "active task only". Behind the
>> > scene,
>> > > > > we
>> > > > > > > can have
>> > > > > > > > > > > > >> the
>> > > > > > > > > > > > >>>>>> committed
>> > > > > > > > > > > > >>>>>>>>>> offsets to encode the stream time as
>> well,
>> > so
>> > > > that
>> > > > > > > when
>> > > > > > > > > > > > >>> processing
>> > > > > > > > > > > > >>>>>> standby
>> > > > > > > > > > > > >>>>>>>>>> tasks the stream process knows not only
>> the
>> > lag
>> > > > in
>> > > > > > > terms of
>> > > > > > > > > > > > >>>> offsets
>> > > > > > > > > > > > >>>>>>>>>> comparing to the committed offset
>> > (internally we
>> > > > > call
>> > > > > > > it
>> > > > > > > > > > > > >> offset
>> > > > > > > > > > > > >>>>>> limit), but
>> > > > > > > > > > > > >>>>>>>>>> also the lag in terms of timestamp diff
>> > > > comparing
>> > > > > the
>> > > > > > > > > > > > >> committed
>> > > > > > > > > > > > >>>>>> offset.
>> > > > > > > > > > > > >>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>> Also encoding the timestamp as part of
>> > offset
>> > > > has
>> > > > > > > other
>> > > > > > > > > > > > >>> benefits
>> > > > > > > > > > > > >>>> for
>> > > > > > > > > > > > >>>>>>>>>> improving Kafka Streams time semantics as
>> > well,
>> > > > > but
>> > > > > > > for
>> > > > > > > > > > > > >> KIP-535
>> > > > > > > > > > > > >>>>>> itself I
>> > > > > > > > > > > > >>>>>>>>>> think it can help give users a more
>> > intuitive
>> > > > > > > interface to
>> > > > > > > > > > > > >>>> reason
>> > > > > > > > > > > > >>>>>> about.
>> > > > > > > > > > > > >>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>> Guozhang
>> > > > > > > > > > > > >>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>> On Mon, Oct 21, 2019 at 12:30 PM John
>> > Roesler <
>> > > > > > > > > > > > >>> j...@confluent.io>
>> > > > > > > > > > > > >>>>>> wrote:
>> > > > > > > > > > > > >>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>> Hey Navinder,
>> > > > > > > > > > > > >>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>> Thanks for the KIP! I've been reading
>> over
>> > the
>> > > > > > > discussion
>> > > > > > > > > > > > >> thus
>> > > > > > > > > > > > >>>> far,
>> > > > > > > > > > > > >>>>>>>>>>> and I have a couple of thoughts to pile
>> on
>> > as
>> > > > > well:
>> > > > > > > > > > > > >>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>> It seems confusing to propose the API in
>> > terms
>> > > > > of the
>> > > > > > > > > > current
>> > > > > > > > > > > > >>>> system
>> > > > > > > > > > > > >>>>>>>>>>> state, but also propose how the API
>> would
>> > look
>> > > > > > > if/when
>> > > > > > > > > > > > >> KIP-441
>> > > > > > > > > > > > >>> is
>> > > > > > > > > > > > >>>>>>>>>>> implemented. It occurs to me that the
>> only
>> > part
>> > > > > of
>> > > > > > > KIP-441
>> > > > > > > > > > > > >> that
>> > > > > > > > > > > > >>>> would
>> > > > > > > > > > > > >>>>>>>>>>> affect you is the availability of the
>> lag
>> > > > > > > information in
>> > > > > > > > > > the
>> > > > > > > > > > > > >>>>>>>>>>> SubscriptionInfo message. This is
>> actually
>> > not
>> > > > a
>> > > > > > > public api
>> > > > > > > > > > > > >>>> change at
>> > > > > > > > > > > > >>>>>>>>>>> all, and I'm planning to implement it
>> asap
>> > as a
>> > > > > > > precursor
>> > > > > > > > > > to
>> > > > > > > > > > > > >>> the
>> > > > > > > > > > > > >>>> rest
>> > > > > > > > > > > > >>>>>>>>>>> of KIP-441, so maybe you can just build
>> on
>> > top
>> > > > of
>> > > > > > > KIP-441
>> > > > > > > > > > and
>> > > > > > > > > > > > >>>> assume
>> > > > > > > > > > > > >>>>>>>>>>> the lag information will be available.
>> > Then you
>> > > > > > > could have
>> > > > > > > > > > a
>> > > > > > > > > > > > >>> more
>> > > > > > > > > > > > >>>>>>>>>>> straightforward proposal (e.g., mention
>> > that
>> > > > > you'd
>> > > > > > > return
>> > > > > > > > > > the
>> > > > > > > > > > > > >>> lag
>> > > > > > > > > > > > >>>>>>>>>>> information in AssignmentInfo as well as
>> > in the
>> > > > > > > > > > > > >> StreamsMetadata
>> > > > > > > > > > > > >>>> in
>> > > > > > > > > > > > >>>>>>>>>>> some form, or make use of it in the API
>> > > > somehow).
>> > > > > > > > > > > > >>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>> I'm partially motivated in that former
>> > point
>> > > > > because
>> > > > > > > it
>> > > > > > > > > > seems
>> > > > > > > > > > > > >>>> like
>> > > > > > > > > > > > >>>>>>>>>>> understanding how callers would bound
>> the
>> > > > > staleness
>> > > > > > > for
>> > > > > > > > > > their
>> > > > > > > > > > > > >>> use
>> > > > > > > > > > > > >>>>>> case
>> > > > > > > > > > > > >>>>>>>>>>> is _the_ key point for this KIP. FWIW, I
>> > think
>> > > > > that
>> > > > > > > adding
>> > > > > > > > > > a
>> > > > > > > > > > > > >>> new
>> > > > > > > > > > > > >>>>>>>>>>> method to StreamsMetadataState and
>> > deprecating
>> > > > > the
>> > > > > > > existing
>> > > > > > > > > > > > >>>> method is
>> > > > > > > > > > > > >>>>>>>>>>> the best way to go; we just can't change
>> > the
>> > > > > return
>> > > > > > > types
>> > > > > > > > > > of
>> > > > > > > > > > > > >>> any
>> > > > > > > > > > > > >>>>>>>>>>> existing methods.
>> > > > > > > > > > > > >>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>> I'm wondering, rather than putting
>> > "acceptable
>> > > > > lag"
>> > > > > > > into
>> > > > > > > > > > the
>> > > > > > > > > > > > >>>>>>>>>>> configuration at all, or even making it
>> a
>> > > > > parameter
>> > > > > > > on
>> > > > > > > > > > > > >>>>>>>>>>> `allMetadataForKey`, why not just
>> _always_
>> > > > > return all
>> > > > > > > > > > > > >> available
>> > > > > > > > > > > > >>>>>>>>>>> metadata (including active/standby or
>> lag)
>> > and
>> > > > > let
>> > > > > > > the
>> > > > > > > > > > caller
>> > > > > > > > > > > > >>>> decide
>> > > > > > > > > > > > >>>>>>>>>>> to which node they want to route the
>> query?
>> > > > This
>> > > > > > > method
>> > > > > > > > > > isn't
>> > > > > > > > > > > > >>>> making
>> > > > > > > > > > > > >>>>>>>>>>> any queries itself; it's merely telling
>> you
>> > > > where
>> > > > > > > the local
>> > > > > > > > > > > > >>>> Streams
>> > > > > > > > > > > > >>>>>>>>>>> instance _thinks_ the key in question is
>> > > > located.
>> > > > > > > Just
>> > > > > > > > > > > > >>> returning
>> > > > > > > > > > > > >>>> all
>> > > > > > > > > > > > >>>>>>>>>>> available information lets the caller
>> > implement
>> > > > > any
>> > > > > > > > > > semantics
>> > > > > > > > > > > > >>>> they
>> > > > > > > > > > > > >>>>>>>>>>> desire around querying only active
>> stores,
>> > or
>> > > > > > > standbys, or
>> > > > > > > > > > > > >>>> recovering
>> > > > > > > > > > > > >>>>>>>>>>> stores, or whatever.
>> > > > > > > > > > > > >>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>> One fly in the ointment, which you may
>> > wish to
>> > > > > > > consider if
>> > > > > > > > > > > > >>>> proposing
>> > > > > > > > > > > > >>>>>>>>>>> to use lag information, is that the
>> cluster
>> > > > would
>> > > > > > > only
>> > > > > > > > > > become
>> > > > > > > > > > > > >>>> aware
>> > > > > > > > > > > > >>>>>> of
>> > > > > > > > > > > > >>>>>>>>>>> new lag information during rebalances.
>> > Even in
>> > > > > the
>> > > > > > > full
>> > > > > > > > > > > > >>>> expression of
>> > > > > > > > > > > > >>>>>>>>>>> KIP-441, this information would stop
>> being
>> > > > > > > propagated when
>> > > > > > > > > > > > >> the
>> > > > > > > > > > > > >>>>>> cluster
>> > > > > > > > > > > > >>>>>>>>>>> achieves a balanced task distribution.
>> > There
>> > > > was
>> > > > > a
>> > > > > > > > > > follow-on
>> > > > > > > > > > > > >>>> idea I
>> > > > > > > > > > > > >>>>>>>>>>> POCed to continuously share lag
>> > information in
>> > > > > the
>> > > > > > > > > > heartbeat
>> > > > > > > > > > > > >>>>>> protocol,
>> > > > > > > > > > > > >>>>>>>>>>> which you might be interested in, if you
>> > want
>> > > > to
>> > > > > > > make sure
>> > > > > > > > > > > > >> that
>> > > > > > > > > > > > >>>> nodes
>> > > > > > > > > > > > >>>>>>>>>>> are basically _always_ aware of each
>> > other's
>> > > > lag
>> > > > > on
>> > > > > > > > > > different
>> > > > > > > > > > > > >>>>>>>>>>> partitions:
>> > > > > > > https://github.com/apache/kafka/pull/7096
>> > > > > > > > > > > > >>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>> Thanks again!
>> > > > > > > > > > > > >>>>>>>>>>> -John
>> > > > > > > > > > > > >>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>> On Sat, Oct 19, 2019 at 6:06 AM Navinder
>> > Brar
>> > > > > > > > > > > > >>>>>>>>>>> <navinder_b...@yahoo.com.invalid>
>> wrote:
>> > > > > > > > > > > > >>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>> Thanks, Vinoth. Looks like we are on
>> the
>> > same
>> > > > > page.
>> > > > > > > I will
>> > > > > > > > > > > > >> add
>> > > > > > > > > > > > >>>> some
>> > > > > > > > > > > > >>>>>> of
>> > > > > > > > > > > > >>>>>>>>>>> these explanations to the KIP as well.
>> Have
>> > > > > assigned
>> > > > > > > the
>> > > > > > > > > > > > >>>> KAFKA-6144
>> > > > > > > > > > > > >>>>>> to
>> > > > > > > > > > > > >>>>>>>>>>> myself and KAFKA-8994 is closed (by
>> you). As
>> > > > > > > suggested, we
>> > > > > > > > > > > > >> will
>> > > > > > > > > > > > >>>>>> replace
>> > > > > > > > > > > > >>>>>>>>>>> "replica" with "standby".
>> > > > > > > > > > > > >>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>> In the new API,
>> > > > > > > > > > > > >>> "StreamsMetadataState::allMetadataForKey(boolean
>> > > > > > > > > > > > >>>>>>>>>>> enableReplicaServing, String storeName,
>> K
>> > key,
>> > > > > > > > > > Serializer<K>
>> > > > > > > > > > > > >>>>>>>>>>> keySerializer)" Do we really need a per
>> key
>> > > > > > > configuration?
>> > > > > > > > > > > > >> or a
>> > > > > > > > > > > > >>>> new
>> > > > > > > > > > > > >>>>>>>>>>> StreamsConfig is good enough?>> Coming
>> from
>> > > > > > > experience,
>> > > > > > > > > > when
>> > > > > > > > > > > > >>>> teams
>> > > > > > > > > > > > >>>>>> are
>> > > > > > > > > > > > >>>>>>>>>>> building a platform with Kafka Streams
>> and
>> > > > these
>> > > > > > > API's
>> > > > > > > > > > serve
>> > > > > > > > > > > > >>>> data to
>> > > > > > > > > > > > >>>>>>>>>>> multiple teams, we can't have a
>> generalized
>> > > > > config
>> > > > > > > that
>> > > > > > > > > > says
>> > > > > > > > > > > > >>> as a
>> > > > > > > > > > > > >>>>>>>>>> platform
>> > > > > > > > > > > > >>>>>>>>>>> we will support stale reads or not. It
>> > should
>> > > > be
>> > > > > the
>> > > > > > > choice
>> > > > > > > > > > > > >> of
>> > > > > > > > > > > > >>>>>> someone
>> > > > > > > > > > > > >>>>>>>>>> who
>> > > > > > > > > > > > >>>>>>>>>>> is calling the API's to choose whether
>> > they are
>> > > > > ok
>> > > > > > > with
>> > > > > > > > > > stale
>> > > > > > > > > > > > >>>> reads
>> > > > > > > > > > > > >>>>>> or
>> > > > > > > > > > > > >>>>>>>>>> not.
>> > > > > > > > > > > > >>>>>>>>>>> Makes sense?
>> > > > > > > > > > > > >>>>>>>>>>>>    On Thursday, 17 October, 2019,
>> > 11:56:02 pm
>> > > > > IST,
>> > > > > > > Vinoth
>> > > > > > > > > > > > >>>> Chandar <
>> > > > > > > > > > > > >>>>>>>>>>> vchan...@confluent.io> wrote:
>> > > > > > > > > > > > >>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>  Looks like we are covering ground :)
>> > > > > > > > > > > > >>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>> Only if it is within a permissible
>> > > > range(say
>> > > > > > > 10000) we
>> > > > > > > > > > > > >> will
>> > > > > > > > > > > > >>>> serve
>> > > > > > > > > > > > >>>>>>>>>> from
>> > > > > > > > > > > > >>>>>>>>>>>> Restoring state of active.
>> > > > > > > > > > > > >>>>>>>>>>>> +1 on having a knob like this. My
>> > reasoning
>> > > > is
>> > > > > as
>> > > > > > > > > > follows.
>> > > > > > > > > > > > >>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>> Think of the Streams state as a
>> > read-only
>> > > > > > > distributed kv
>> > > > > > > > > > > > >>>> store.
>> > > > > > > > > > > > >>>>>> With
>> > > > > > > > > > > > >>>>>>>>>>>> num_standby = f, we should be able to
>> > > > tolerate
>> > > > > f
>> > > > > > > failures
>> > > > > > > > > > > > >> and
>> > > > > > > > > > > > >>>> if
>> > > > > > > > > > > > >>>>>> there
>> > > > > > > > > > > > >>>>>>>>>>> is
>> > > > > > > > > > > > >>>>>>>>>>>> an (f+1)th failure, the system should be
>> > > > > unavailable.
>> > > > > > > > > > > > >>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>> A) So with num_standby=0, the system
>> > should be
>> > > > > > > unavailable
>> > > > > > > > > > > > >>> even
>> > > > > > > > > > > > >>>> if
>> > > > > > > > > > > > >>>>>>>>>> there
>> > > > > > > > > > > > >>>>>>>>>>> is
>> > > > > > > > > > > > >>>>>>>>>>>> 1 failure and that's my argument for not
>> > > > allowing
>> > > > > > > querying
>> > > > > > > > > > in
>> > > > > > > > > > > > >>>>>>>>>> restoration
>> > > > > > > > > > > > >>>>>>>>>>>> state, esp in this case it will be a
>> total
>> > > > > rebuild
>> > > > > > > of the
>> > > > > > > > > > > > >>> state
>> > > > > > > > > > > > >>>>>> (which
>> > > > > > > > > > > > >>>>>>>>>>> IMO
>> > > > > > > > > > > > >>>>>>>>>>>> cannot be considered a normal fault
>> free
>> > > > > operational
>> > > > > > > > > > state).
>> > > > > > > > > > > > >>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>> B) Even if there are standbys, say
>> > > > num_standby=2,
>> > > > > if
>> > > > > > > the
>> > > > > > > > > > user
>> > > > > > > > > > > > >>>> decides
>> > > > > > > > > > > > >>>>>> to
>> > > > > > > > > > > > >>>>>>>>>>> shut
>> > > > > > > > > > > > >>>>>>>>>>>> down all 3 instances, then the only outcome
>> > should
>> > > > > be
>> > > > > > > > > > > > >>> unavailability
>> > > > > > > > > > > > >>>>>> until
>> > > > > > > > > > > > >>>>>>>>>>> all
>> > > > > > > > > > > > >>>>>>>>>>>> of them come back or state is rebuilt
>> on
>> > other
>> > > > > > > nodes in
>> > > > > > > > > > the
>> > > > > > > > > > > > >>>>>> cluster. In
>> > > > > > > > > > > > >>>>>>>>>>>> normal operations, f <= 2 and when a
>> > failure
>> > > > > does
>> > > > > > > happen
>> > > > > > > > > > we
>> > > > > > > > > > > > >>> can
>> > > > > > > > > > > > >>>> then
>> > > > > > > > > > > > >>>>>>>>>>> either
>> > > > > > > > > > > > >>>>>>>>>>>> choose to be C over A and fail IQs
>> until
>> > > > > > > replication is
>> > > > > > > > > > > > >> fully
>> > > > > > > > > > > > >>>>>> caught up
>> > > > > > > > > > > > >>>>>>>>>>> or
>> > > > > > > > > > > > >>>>>>>>>>>> choose A over C by serving in restoring
>> > state
>> > > > as
>> > > > > > > long as
>> > > > > > > > > > lag
>> > > > > > > > > > > > >>> is
>> > > > > > > > > > > > >>>>>>>>>> minimal.
>> > > > > > > > > > > > >>>>>>>>>>> If
>> > > > > > > > > > > > >>>>>>>>>>>> even with f=1 say, all the standbys are
>> > > > lagging
>> > > > > a
>> > > > > > > lot due
>> > > > > > > > > > to
>> > > > > > > > > > > > >>>> some
>> > > > > > > > > > > > >>>>>>>>>> issue,
>> > > > > > > > > > > > >>>>>>>>>>>> then that should be considered a
>> failure
>> > since
>> > > > > that
>> > > > > > > is
>> > > > > > > > > > > > >>> different
>> > > > > > > > > > > > >>>>>> from
>> > > > > > > > > > > > >>>>>>>>>>>> normal/expected operational mode.
>> Serving
>> > > > reads
>> > > > > with
>> > > > > > > > > > > > >> unbounded
>> > > > > > > > > > > > >>>>>>>>>>> replication
>> > > > > > > > > > > > >>>>>>>>>>>> lag and calling it "available" may not
>> be
>> > very
>> > > > > > > usable or
>> > > > > > > > > > > > >> even
>> > > > > > > > > > > > >>>>>> desirable
>> > > > > > > > > > > > >>>>>>>>>>> :)
>> > > > > > > > > > > > >>>>>>>>>>>> IMHO, since it gives the user no way to
>> > reason
>> > > > > > > about the
>> > > > > > > > > > app
>> > > > > > > > > > > > >>>> that is
>> > > > > > > > > > > > >>>>>>>>>>> going
>> > > > > > > > > > > > >>>>>>>>>>>> to query this store.
>> > > > > > > > > > > > >>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>> So there is definitely a need to
>> > distinguish
>> > > > > > > between:
>> > > > > > > > > > > > >>>> Replication
>> > > > > > > > > > > > >>>>>>>>>>> catchup
>> > > > > > > > > > > > >>>>>>>>>>>> while being in fault free state vs
>> > Restoration
>> > > > > of
>> > > > > > > state
>> > > > > > > > > > when
>> > > > > > > > > > > > >>> we
>> > > > > > > > > > > > >>>> lose
>> > > > > > > > > > > > >>>>>>>>>> more
>> > > > > > > > > > > > >>>>>>>>>>>> than f standbys. This knob is a great
>> > starting
>> > > > > point
>> > > > > > > > > > towards
>> > > > > > > > > > > > >>>> this.
>> > > > > > > > > > > > >>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>> If you agree with some of the
>> explanation
>> > > > above,
>> > > > > > > please
>> > > > > > > > > > feel
>> > > > > > > > > > > > >>>> free to
>> > > > > > > > > > > > >>>>>>>>>>>> include it in the KIP as well since
>> this
>> > is
>> > > > > sort of
>> > > > > > > our
>> > > > > > > > > > > > >> design
>> > > > > > > > > > > > >>>>>>>>>> principle
>> > > > > > > > > > > > >>>>>>>>>>>> here.
>> > > > > > > > > > > > >>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>> Small nits :
>> > > > > > > > > > > > >>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>> - let's standardize on "standby"
>> instead
>> > of
>> > > > > > > "replica", KIP
>> > > > > > > > > > > > >> or
>> > > > > > > > > > > > >>>>>> code,  to
>> > > > > > > > > > > > >>>>>>>>>>> be
>> > > > > > > > > > > > >>>>>>>>>>>> consistent with rest of Streams
>> code/docs?
>> > > > > > > > > > > > >>>>>>>>>>>> - Can we merge KAFKA-8994 into
>> KAFKA-6144
>> > now
>> > > > > and
>> > > > > > > close
>> > > > > > > > > > the
>> > > > > > > > > > > > >>>> former?
>> > > > > > > > > > > > >>>>>>>>>>>> Eventually need to consolidate
>> KAFKA-6555
>> > as
>> > > > > well
>> > > > > > > > > > > > >>>>>>>>>>>> - In the new API,
>> > > > > > > > > > > > >>>>
>> "StreamsMetadataState::allMetadataForKey(boolean
>> > > > > > > > > > > > >>>>>>>>>>>> enableReplicaServing, String
>> storeName, K
>> > key,
>> > > > > > > > > > Serializer<K>
>> > > > > > > > > > > > >>>>>>>>>>> keySerializer)" Do
>> > > > > > > > > > > > >>>>>>>>>>>> we really need a per key configuration?
>> > or a
>> > > > new
>> > > > > > > > > > > > >> StreamsConfig
>> > > > > > > > > > > > >>>> is
>> > > > > > > > > > > > >>>>>> good
>> > > > > > > > > > > > >>>>>>>>>>>> enough?
>> > > > > > > > > > > > >>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>> On Wed, Oct 16, 2019 at 8:31 PM
>> Navinder
>> > Brar
>> > > > > > > > > > > > >>>>>>>>>>>> <navinder_b...@yahoo.com.invalid>
>> wrote:
>> > > > > > > > > > > > >>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>> @Vinoth, I have incorporated a few of
>> the
>> > > > > > > discussions we
>> > > > > > > > > > > > >> have
>> > > > > > > > > > > > >>>> had
>> > > > > > > > > > > > >>>>>> in
>> > > > > > > > > > > > >>>>>>>>>>> the
>> > > > > > > > > > > > >>>>>>>>>>>>> KIP.
>> > > > > > > > > > > > >>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>> In the current code, t0 and t1 serve
>> > queries
>> > > > > from
>> > > > > > > > > > > > >>>> Active(Running)
>> > > > > > > > > > > > >>>>>>>>>>>>> partition. For case t2, we are
>> planning
>> > to
>> > > > > return
>> > > > > > > > > > > > >>>>>>>>>> List<StreamsMetadata>
>> > > > > > > > > > > > >>>>>>>>>>>>> such that it returns
>> <StreamsMetadata(A),
>> > > > > > > > > > > > >> StreamsMetadata(B)>
>> > > > > > > > > > > > >>>> so
>> > > > > > > > > > > > >>>>>> that
>> > > > > > > > > > > > >>>>>>>>>>> if IQ
>> > > > > > > > > > > > >>>>>>>>>>>>> fails on A, the replica on B can serve
>> > the
>> > > > > data by
>> > > > > > > > > > enabling
>> > > > > > > > > > > > >>>> serving
>> > > > > > > > > > > > >>>>>>>>>>> from
>> > > > > > > > > > > > >>>>>>>>>>>>> replicas. This still does not solve
>> case
>> > t3
>> > > > > and t4
>> > > > > > > since
>> > > > > > > > > > B
>> > > > > > > > > > > > >>> has
>> > > > > > > > > > > > >>>> been
>> > > > > > > > > > > > >>>>>>>>>>>>> promoted to active but it is in
>> Restoring
>> > > > > state to
>> > > > > > > > > > catch up
>> > > > > > > > > > > > >>>> till A’s
>> > > > > > > > > > > > >>>>>>>>>>> last
>> > > > > > > > > > > > >>>>>>>>>>>>> committed position as we don’t serve
>> from
>> > > > > > > Restoring state
>> > > > > > > > > > > > >> in
>> > > > > > > > > > > > >>>> Active
>> > > > > > > > > > > > >>>>>>>>>>> and new
>> > > > > > > > > > > > >>>>>>>>>>>>> Replica on R is building itself from
>> > scratch.
>> > > > > Both
>> > > > > > > these
>> > > > > > > > > > > > >>> cases
>> > > > > > > > > > > > >>>> can
>> > > > > > > > > > > > >>>>>> be
>> > > > > > > > > > > > >>>>>>>>>>>>> solved if we start serving from
>> Restoring
>> > > > > state of
>> > > > > > > active
>> > > > > > > > > > > > >> as
>> > > > > > > > > > > > >>>> well
>> > > > > > > > > > > > >>>>>>>>>>> since it
>> > > > > > > > > > > > >>>>>>>>>>>>> is almost equivalent to previous
>> Active.
>> > > > > > > > > > > > >>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>> There could be a case where all
>> replicas
>> > of a
>> > > > > > > partition
>> > > > > > > > > > > > >>> become
>> > > > > > > > > > > > >>>>>>>>>>> unavailable
>> > > > > > > > > > > > >>>>>>>>>>>>> and active and all replicas of that
>> > partition
>> > > > > are
>> > > > > > > > > > building
>> > > > > > > > > > > > >>>>>> themselves
>> > > > > > > > > > > > >>>>>>>>>>> from
>> > > > > > > > > > > > >>>>>>>>>>>>> scratch, in this case, the state in
>> > Active is
>> > > > > far
>> > > > > > > behind
>> > > > > > > > > > > > >> even
>> > > > > > > > > > > > >>>>>> though
>> > > > > > > > > > > > >>>>>>>>>>> it is
>> > > > > > > > > > > > >>>>>>>>>>>>> in Restoring state. To handle such
>> > cases
>> > > > > so that we
>> > > > > > > don’t
>> > > > > > > > > > > > >>> serve
>> > > > > > > > > > > > >>>> from
>> > > > > > > > > > > > >>>>>>>>>>> this
>> > > > > > > > > > > > >>>>>>>>>>>>> state we can either add another state
>> > before
>> > > > > > > Restoring or
>> > > > > > > > > > > > >>>> check the
>> > > > > > > > > > > > >>>>>>>>>>>>> difference between last committed
>> offset
>> > and
>> > > > > > > current
>> > > > > > > > > > > > >>> position.
>> > > > > > > > > > > > >>>> Only
>> > > > > > > > > > > > >>>>>>>>>> if
>> > > > > > > > > > > > >>>>>>>>>>> it
>> > > > > > > > > > > > >>>>>>>>>>>>> is within a permissible range (say
>> > 10000) we
>> > > > > will
>> > > > > > > serve
>> > > > > > > > > > > > >> from
>> > > > > > > > > > > > >>>>>>>>>> the Restoring
>> > > > > > > > > > > > >>>>>>>>>>>>> state of Active.
>> > > > > > > > > > > > >>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>    On Wednesday, 16 October, 2019,
>> > 10:01:35
>> > > > pm
>> > > > > IST,
>> > > > > > > > > > Vinoth
>> > > > > > > > > > > > >>>> Chandar
>> > > > > > > > > > > > >>>>>> <
>> > > > > > > > > > > > >>>>>>>>>>>>> vchan...@confluent.io> wrote:
>> > > > > > > > > > > > >>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>  Thanks for the updates on the KIP,
>> > Navinder!
>> > > > > > > > > > > > >>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>> Few comments
>> > > > > > > > > > > > >>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>> - AssignmentInfo is not public API? But
>> But
>> > we
>> > > > > will
>> > > > > > > change
>> > > > > > > > > > it
>> > > > > > > > > > > > >>> and
>> > > > > > > > > > > > >>>> thus
>> > > > > > > > > > > > >>>>>>>>>>> need to
>> > > > > > > > > > > > >>>>>>>>>>>>> increment the version and test for
>> > > > > version_probing
>> > > > > > > etc.
>> > > > > > > > > > > > >> Good
>> > > > > > > > > > > > >>> to
>> > > > > > > > > > > > >>>>>>>>>>> separate
>> > > > > > > > > > > > >>>>>>>>>>>>> that from StreamsMetadata changes
>> (which
>> > is
>> > > > > public
>> > > > > > > API)
>> > > > > > > > > > > > >>>>>>>>>>>>> - From what I see, there is going to
>> be
>> > > > a choice
>> > > > > > > between
>> > > > > > > > > > the
>> > > > > > > > > > > > >>>>>> following
>> > > > > > > > > > > > >>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>  A) introducing a new
>> > > > > > > *KafkaStreams::allMetadataForKey()
>> > > > > > > > > > > > >> *API
>> > > > > > > > > > > > >>>> that
>> > > > > > > > > > > > >>>>>>>>>>>>> potentially returns
>> List<StreamsMetadata>
>> > > > > ordered
>> > > > > > > from
>> > > > > > > > > > most
>> > > > > > > > > > > > >>>> up to
>> > > > > > > > > > > > >>>>>> date
>> > > > > > > > > > > > >>>>>>>>>>> to
>> > > > > > > > > > > > >>>>>>>>>>>>> least up-to-date replicas. Today we
>> cannot
>> > > > fully
>> > > > > > > implement
>> > > > > > > > > > > > >>> this
>> > > > > > > > > > > > >>>>>>>>>>> ordering,
>> > > > > > > > > > > > >>>>>>>>>>>>> since all we know is which hosts are
>> > active
>> > > > and
>> > > > > > > which are
>> > > > > > > > > > > > >>>> standbys.
>> > > > > > > > > > > > >>>>>>>>>>>>> However, this aligns well with the
>> > future.
>> > > > > KIP-441
>> > > > > > > adds
>> > > > > > > > > > the
>> > > > > > > > > > > > >>> lag
>> > > > > > > > > > > > >>>>>>>>>>> information
>> > > > > > > > > > > > >>>>>>>>>>>>> to the rebalancing protocol. We could
>> > also
>> > > > sort
>> > > > > > > replicas
>> > > > > > > > > > > > >>> based
>> > > > > > > > > > > > >>>> on
>> > > > > > > > > > > > >>>>>> the
>> > > > > > > > > > > > >>>>>>>>>>>>> reported lags eventually. This is fully
>> > > > backwards
>> > > > > > > > > > compatible
>> > > > > > > > > > > > >>> with
>> > > > > > > > > > > > >>>>>>>>>>> existing
>> > > > > > > > > > > > >>>>>>>>>>>>> clients. Only drawback I see is the
>> > naming of
>> > > > > the
>> > > > > > > > > > existing
>> > > > > > > > > > > > >>>> method
>> > > > > > > > > > > > >>>>>>>>>>>>> KafkaStreams::metadataForKey, not
>> > conveying
>> > > > the
>> > > > > > > > > > distinction
>> > > > > > > > > > > > >>>> that it
>> > > > > > > > > > > > >>>>>>>>>>> simply
>> > > > > > > > > > > > >>>>>>>>>>>>> returns the active replica i.e.
>> > > > > > > allMetadataForKey.get(0).
>> > > > > > > > > > > > >>>>>>>>>>>>>  B) Change
>> > KafkaStreams::metadataForKey() to
>> > > > > > > return a
>> > > > > > > > > > List.
>> > > > > > > > > > > > >>>> It's a
>> > > > > > > > > > > > >>>>>>>>>>> breaking
>> > > > > > > > > > > > >>>>>>>>>>>>> change.
>> > > > > > > > > > > > >>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>> I prefer A, since none of the
>> > > > > semantics/behavior
>> > > > > > > changes
>> > > > > > > > > > > > >> for
>> > > > > > > > > > > > >>>>>> existing
>> > > > > > > > > > > > >>>>>>>>>>>>> users. Love to hear more thoughts.
>> Can we
>> > > > also
>> > > > > > > work this
>> > > > > > > > > > > > >> into
>> > > > > > > > > > > > >>>> the
>> > > > > > > > > > > > >>>>>>>>>> KIP?
>> > > > > > > > > > > > >>>>>>>>>>>>> I already implemented A to unblock
>> > myself for
>> > > > > now.
>> > > > > > > Seems
>> > > > > > > > > > > > >>>> feasible
>> > > > > > > > > > > > >>>>>> to
>> > > > > > > > > > > > >>>>>>>>>>> do.
>> > > > > > > > > > > > >>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>> On Tue, Oct 15, 2019 at 12:21 PM
>> Vinoth
>> > > > > Chandar <
>> > > > > > > > > > > > >>>>>>>>>> vchan...@confluent.io
>> > > > > > > > > > > > >>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>> wrote:
>> > > > > > > > > > > > >>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>> I get your point. But suppose there
>> > is a
>> > > > > > > replica which
>> > > > > > > > > > > > >> has
>> > > > > > > > > > > > >>>> just
>> > > > > > > > > > > > >>>>>>>>>>> become
>> > > > > > > > > > > > >>>>>>>>>>>>>> active, so in that case replica will
>> > still
>> > > > be
>> > > > > > > building
>> > > > > > > > > > > > >>> itself
>> > > > > > > > > > > > >>>> from
>> > > > > > > > > > > > >>>>>>>>>>>>> scratch
>> > > > > > > > > > > > >>>>>>>>>>>>>> and this active will go to restoring
>> > state
>> > > > > till it
>> > > > > > > > > > catches
>> > > > > > > > > > > > >>> up
>> > > > > > > > > > > > >>>> with
>> > > > > > > > > > > > >>>>>>>>>>>>> previous
>> > > > > > > > > > > > >>>>>>>>>>>>>> active, wouldn't serving from a
>> > restoring
>> > > > > active
>> > > > > > > make
>> > > > > > > > > > more
>> > > > > > > > > > > > >>>> sense
>> > > > > > > > > > > > >>>>>>>>>>> than a
>> > > > > > > > > > > > >>>>>>>>>>>>>> replica in such case.
>> > > > > > > > > > > > >>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>> KIP-441 will change this behavior
>> such
>> > that
>> > > > > > > promotion to
>> > > > > > > > > > > > >>>> active
>> > > > > > > > > > > > >>>>>>>>>>> happens
>> > > > > > > > > > > > >>>>>>>>>>>>>> based on how caught up a replica is.
>> So,
>> > > > once
>> > > > > we
>> > > > > > > have
>> > > > > > > > > > that
>> > > > > > > > > > > > >>>> (work
>> > > > > > > > > > > > >>>>>>>>>>> underway
>> > > > > > > > > > > > >>>>>>>>>>>>>> already for 2.5 IIUC) and user sets
>> > > > > > > > > > num.standby.replicas >
>> > > > > > > > > > > > >>> 0,
>> > > > > > > > > > > > >>>> then
>> > > > > > > > > > > > >>>>>>>>>>> the
>> > > > > > > > > > > > >>>>>>>>>>>>>> staleness window should not be that
>> > long as
>> > > > > you
>> > > > > > > > > > describe.
>> > > > > > > > > > > > >>> IMO
>> > > > > > > > > > > > >>>> if
>> > > > > > > > > > > > >>>>>>>>>> user
>> > > > > > > > > > > > >>>>>>>>>>>>> wants
>> > > > > > > > > > > > >>>>>>>>>>>>>> availability for state, then should
>> > > > configure
>> > > > > > > > > > > > >>>> num.standby.replicas
>> > > > > > > > > > > > >>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>> 0.
>> > > > > > > > > > > > >>>>>>>>>>>>> If
>> > > > > > > > > > > > >>>>>>>>>>>>>> not, then on a node loss, few
>> partitions
>> > > > > would be
>> > > > > > > > > > > > >>> unavailable
>> > > > > > > > > > > > >>>> for
>> > > > > > > > > > > > >>>>>> a
>> > > > > > > > > > > > >>>>>>>>>>> while
>> > > > > > > > > > > > >>>>>>>>>>>>>> (there are other ways to bring this
>> > window
>> > > > > down,
>> > > > > > > which I
>> > > > > > > > > > > > >>> won't
>> > > > > > > > > > > > >>>>>>>>>> bring
>> > > > > > > > > > > > >>>>>>>>>>> in
>> > > > > > > > > > > > >>>>>>>>>>>>>> here). We could argue for querying a
>> > > > restoring
>> > > > > > > active
>> > > > > > > > > > > > >> (say a
>> > > > > > > > > > > > >>>> new
>> > > > > > > > > > > > >>>>>>>>>> node
>> > > > > > > > > > > > >>>>>>>>>>>>> added
>> > > > > > > > > > > > >>>>>>>>>>>>>> to replace a faulty old node) based
>> on
>> > AP vs
>> > > > > CP
>> > > > > > > > > > > > >> principles.
>> > > > > > > > > > > > >>>> But
>> > > > > > > > > > > > >>>>>> not
>> > > > > > > > > > > > >>>>>>>>>>> sure
>> > > > > > > > > > > > >>>>>>>>>>>>>> reading really really old values for
>> the
>> > > > sake
>> > > > > of
>> > > > > > > > > > > > >>> availability
>> > > > > > > > > > > > >>>> is
>> > > > > > > > > > > > >>>>>>>>>>> useful.
>> > > > > > > > > > > > >>>>>>>>>>>>> No
>> > > > > > > > > > > > >>>>>>>>>>>>>> AP data system would be inconsistent
>> for
>> > > > such
>> > > > > a
>> > > > > > > long
>> > > > > > > > > > time
>> > > > > > > > > > > > >> in
>> > > > > > > > > > > > >>>>>>>>>>> practice.
>> > > > > > > > > > > > >>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>> So, I still feel just limiting this
>> to
>> > > > standby
>> > > > > > > reads
>> > > > > > > > > > > > >>> provides
>> > > > > > > > > > > > >>>> best
>> > > > > > > > > > > > >>>>>>>>>>>>>> semantics.
>> > > > > > > > > > > > >>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>> Just my 2c. Would love to see what
>> > others
>> > > > > think
>> > > > > > > as well.
>> > > > > > > > > > > > >>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>> On Tue, Oct 15, 2019 at 5:34 AM
>> Navinder
>> > > > Brar
>> > > > > > > > > > > > >>>>>>>>>>>>>> <navinder_b...@yahoo.com.invalid>
>> > wrote:
>> > > > > > > > > > > > >>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>>> Hi Vinoth,
>> > > > > > > > > > > > >>>>>>>>>>>>>>> Thanks for the feedback.
>> > > > > > > > > > > > >>>>>>>>>>>>>>>  Can we link the JIRA, discussion
>> > thread
>> > > > > also to
>> > > > > > > the
>> > > > > > > > > > > > >> KIP.>>
>> > > > > > > > > > > > >>>>>> Added.
>> > > > > > > > > > > > >>>>>>>>>>>>>>> Based on the discussion on
>> KAFKA-6144,
>> > I
>> > > > was
>> > > > > > > under the
>> > > > > > > > > > > > >>>> impression
>> > > > > > > > > > > > >>>>>>>>>>> that
>> > > > > > > > > > > > >>>>>>>>>>>>>>> this KIP is also going to cover
>> > exposing of
>> > > > > the
>> > > > > > > standby
>> > > > > > > > > > > > >>>>>>>>>> information
>> > > > > > > > > > > > >>>>>>>>>>> in
>> > > > > > > > > > > > >>>>>>>>>>>>>>> StreamsMetadata and thus subsume
>> > > > KAFKA-8994 .
>> > > > > > > That
>> > > > > > > > > > would
>> > > > > > > > > > > > >>>> require
>> > > > > > > > > > > > >>>>>> a
>> > > > > > > > > > > > >>>>>>>>>>>>> public
>> > > > > > > > > > > > >>>>>>>>>>>>>>> API change?>> Sure, I can add
>> changes
>> > for
>> > > > > 8994
>> > > > > > > in this
>> > > > > > > > > > > > >> KIP
>> > > > > > > > > > > > >>>> and
>> > > > > > > > > > > > >>>>>>>>>> link
>> > > > > > > > > > > > >>>>>>>>>>>>>>> KAFKA-6144 to KAFKA-8994 as well.
>> > > > > > > > > > > > >>>>>>>>>>>>>>>  KIP seems to be focussing on
>> > restoration
>> > > > > when a
>> > > > > > > new
>> > > > > > > > > > node
>> > > > > > > > > > > > >>> is
>> > > > > > > > > > > > >>>>>>>>>> added.
>> > > > > > > > > > > > >>>>>>>>>>>>>>> KIP-441 is underway and has some
>> major
>> > > > > changes
>> > > > > > > proposed
>> > > > > > > > > > > > >> for
>> > > > > > > > > > > > >>>> this.
>> > > > > > > > > > > > >>>>>>>>>> It
>> > > > > > > > > > > > >>>>>>>>>>>>> would
>> > > > > > > > > > > > >>>>>>>>>>>>>>> be good to clarify dependencies if
>> any.
>> > > > > Without
>> > > > > > > > > > KIP-441,
>> > > > > > > > > > > > >> I
>> > > > > > > > > > > > >>>> am not
>> > > > > > > > > > > > >>>>>>>>>>> very
>> > > > > > > > > > > > >>>>>>>>>>>>> sure
>> > > > > > > > > > > > >>>>>>>>>>>>>>> if we should allow reads from nodes
>> in
>> > > > > RESTORING
>> > > > > > > state,
>> > > > > > > > > > > > >>> which
>> > > > > > > > > > > > >>>>>>>>>> could
>> > > > > > > > > > > > >>>>>>>>>>>>> amount
>> > > > > > > > > > > > >>>>>>>>>>>>>>> to many minutes/few hours of stale
>> > reads?
>> > > > > This
>> > > > > > > is
>> > > > > > > > > > > > >>> different
>> > > > > > > > > > > > >>>> from
>> > > > > > > > > > > > >>>>>>>>>>>>> allowing
>> > > > > > > > > > > > >>>>>>>>>>>>>>> querying standby replicas, which
>> could
>> > be
>> > > > > mostly
>> > > > > > > caught
>> > > > > > > > > > > > >> up
>> > > > > > > > > > > > >>>> and
>> > > > > > > > > > > > >>>>>> the
>> > > > > > > > > > > > >>>>>>>>>>>>>>> staleness window could be much
>> > > > > > > smaller/tolerable. (once
>> > > > > > > > > > > > >>>> again the
>> > > > > > > > > > > > >>>>>>>>>>> focus
>> > > > > > > > > > > > >>>>>>>>>>>>> on
>> > > > > > > > > > > > >>>>>>>>>>>>>>> KAFKA-8994).>> I get your point. But
>> > > > suppose
>> > > > > > > there is a
>> > > > > > > > > > > > >>>> replica
>> > > > > > > > > > > > >>>>>>>>>>> which
>> > > > > > > > > > > > >>>>>>>>>>>>> has
>> > > > > > > > > > > > >>>>>>>>>>>>>>> just become active, so in that case
>> > replica
>> > > > > will
>> > > > > > > still
>> > > > > > > > > > be
>> > > > > > > > > > > > >>>>>> building
>> > > > > > > > > > > > >>>>>>>>>>>>> itself
>> > > > > > > > > > > > >>>>>>>>>>>>>>> from scratch and this active will
>> go to
>> > > > > > > restoring state
>> > > > > > > > > > > > >>> till
>> > > > > > > > > > > > >>>> it
>> > > > > > > > > > > > >>>>>>>>>>> catches
>> > > > > > > > > > > > >>>>>>>>>>>>> up
>> > > > > > > > > > > > >>>>>>>>>>>>>>> with previous active, wouldn't
>> serving
>> > > > from a
>> > > > > > > restoring
>> > > > > > > > > > > > >>>> active
>> > > > > > > > > > > > >>>>>>>>>> make
>> > > > > > > > > > > > >>>>>>>>>>> more
>> > > > > > > > > > > > >>>>>>>>>>>>>>> sense than a replica in such case.
>> > > > > > > > > > > > >>>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>>> Finally, we may need to introduce a
>> > > > > > > configuration to
>> > > > > > > > > > > > >>> control
>> > > > > > > > > > > > >>>>>> this.
>> > > > > > > > > > > > >>>>>>>>>>> Some
>> > > > > > > > > > > > >>>>>>>>>>>>>>> users may prefer errors to stale
>> data.
>> > Can
>> > > > we
>> > > > > > > also add
>> > > > > > > > > > it
>> > > > > > > > > > > > >>> to
>> > > > > > > > > > > > >>>> the
>> > > > > > > > > > > > >>>>>>>>>>> KIP?>>
>> > > > > > > > > > > > >>>>>>>>>>>>>>> Will add this.
>> > > > > > > > > > > > >>>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>>> Regards,
>> > > > > > > > > > > > >>>>>>>>>>>>>>> Navinder
>> > > > > > > > > > > > >>>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>>> On 2019/10/14 16:56:49, Vinoth
>> Chandar <
>> > > > > > > > > > v...@confluent.io
>> > > > > > > > > > > > >>>>> wrote:
>> > > > > > > > > > > > >>>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>> Hi Navinder,>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>> Thanks for sharing the KIP! Few
>> > thoughts>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>> - Can we link the JIRA, discussion
>> > thread
>> > > > > also
>> > > > > > > to the
>> > > > > > > > > > > > >> KIP>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>> - Based on the discussion on
>> > KAFKA-6144, I
>> > > > > was
>> > > > > > > under
>> > > > > > > > > > the
>> > > > > > > > > > > > >>>>>>>>>> impression
>> > > > > > > > > > > > >>>>>>>>>>>>>>> that>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>> this KIP is also going to cover
>> > exposing
>> > > > of
>> > > > > the
>> > > > > > > > > > standby
>> > > > > > > > > > > > >>>>>>>>>>> information in>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>> StreamsMetadata and thus subsume
>> > > > KAFKA-8994
>> > > > > .
>> > > > > > > That
>> > > > > > > > > > would
>> > > > > > > > > > > > >>>> require
>> > > > > > > > > > > > >>>>>>>>>> a
>> > > > > > > > > > > > >>>>>>>>>>>>>>> public>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>> API change?>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>> - KIP seems to be focussing on
>> > restoration
>> > > > > when
>> > > > > > > a new
>> > > > > > > > > > > > >> node
>> > > > > > > > > > > > >>>> is
>> > > > > > > > > > > > >>>>>>>>>>> added.>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>> KIP-441 is underway and has some
>> major
>> > > > > changes
>> > > > > > > > > > proposed
>> > > > > > > > > > > > >>> for
>> > > > > > > > > > > > >>>>>> this.
>> > > > > > > > > > > > >>>>>>>>>>> It
>> > > > > > > > > > > > >>>>>>>>>>>>>>> would>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>> be good to clarify dependencies if
>> > any.
>> > > > > Without
>> > > > > > > > > > > > >> KIP-441, I
>> > > > > > > > > > > > >>>> am
>> > > > > > > > > > > > >>>>>> not
>> > > > > > > > > > > > >>>>>>>>>>> very
>> > > > > > > > > > > > >>>>>>>>>>>>>>> sure>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>> if we should allow reads from
>> nodes in
>> > > > > RESTORING
>> > > > > > > > > > state,
>> > > > > > > > > > > > >>>> which
>> > > > > > > > > > > > >>>>>>>>>> could
>> > > > > > > > > > > > >>>>>>>>>>>>>>> amount>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>> to many minutes/few hours of stale
>> > reads?
>> > > > > This
>> > > > > > > is
>> > > > > > > > > > > > >>> different
>> > > > > > > > > > > > >>>>>>>>>>>>>>> from allowing>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>> querying standby replicas, which
>> > could be
>> > > > > mostly
>> > > > > > > > > > caught
>> > > > > > > > > > > > >> up
>> > > > > > > > > > > > >>>> and
>> > > > > > > > > > > > >>>>>>>>>> the>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>> staleness window could be much
>> > > > > > > smaller/tolerable.
>> > > > > > > > > > (once
>> > > > > > > > > > > > >>>> again
>> > > > > > > > > > > > >>>>>> the
>> > > > > > > > > > > > >>>>>>>>>>> focus
>> > > > > > > > > > > > >>>>>>>>>>>>>>> on>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>> KAFKA-8994)>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>> - Finally, we may need to
>> introduce a
>> > > > > > > configuration to
>> > > > > > > > > > > > >>>> control
>> > > > > > > > > > > > >>>>>>>>>>> this.
>> > > > > > > > > > > > >>>>>>>>>>>>>>> Some>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>> users may prefer errors to stale
>> > data. Can
>> > > > > we
>> > > > > > > also add
>> > > > > > > > > > > > >> it
>> > > > > > > > > > > > >>>> to the
>> > > > > > > > > > > > >>>>>>>>>>> KIP?>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>> Thanks>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>> Vinoth>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>> On Sun, Oct 13, 2019 at 3:31 PM
>> > Navinder
>> > > > > Brar>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>> <na...@yahoo.com.invalid> wrote:>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>>> Hi,>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>>> Starting a discussion on the KIP
>> to
>> > Allow
>> > > > > state
>> > > > > > > > > > stores
>> > > > > > > > > > > > >> to
>> > > > > > > > > > > > >>>> serve
>> > > > > > > > > > > > >>>>>>>>>>>>> stale>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>>> reads during rebalance(>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>
>> > > > > > > > > > > > >>>>>>
>> > > > > > > > > > > > >>>>
>> > > > > > > > > > > > >>>
>> > > > > > > > > > > > >>
>> > > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > >
>> > > > > > >
>> > > > >
>> > > >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance
>> > > > > > > > > > > > >>>>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>>> ).>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>>> Thanks & Regards, Navinder>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>>> LinkedIn>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>> --
>> > > > > > > > > > > > >>>>>>>>>> -- Guozhang
>> > > > > > > > > > > > >>>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>>
>> > > > > > > > > > > > >>>>>>>>
>> > > > > > > > > > > > >>>>>>>
>> > > > > > > > > > > > >>>>>>
>> > > > > > > > > > > > >>>>
>> > > > > > > > > > > > >>>>
>> > > > > > > > > > > > >>>
>> > > > > > > > > > > > >>
>> > > > > > > > > > > > >>
>> > > > > > > > > > > > >> --
>> > > > > > > > > > > > >> -- Guozhang
>> > > > > > > > > > > > >>
>> > > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > >
>> > > > >
>> > > >
>> >
>> >
>>
>
