Ping :) Any thoughts?

On Mon, Nov 4, 2019 at 5:13 PM Vinoth Chandar <vchan...@confluent.io> wrote:

> I'm having some trouble wrapping my head around what race conditions
> might occur, other than the fundamentally broken state in which
> different instances are running totally different topologies.

3. @both Without the topic partitions that the tasks can map back to, we
have to rely on topology/cluster metadata in each Streams instance to map
the task back. If the source topics are wildcarded, for e.g., then each
instance could have different source topics in its topology until the
next rebalance happens. You can also read my comments here:
https://issues.apache.org/jira/browse/KAFKA-7149?focusedCommentId=16904106&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16904106

> seems hard to imagine how encoding arbitrarily long topic names plus an
> integer for the partition number could be as efficient as task ids,
> which are just two integers.

3. If you still have concerns about the efficacy of dictionary encoding,
happy to engage. The link above also has some benchmark code I used.
Theoretically, we would send each topic name at least once, so yes, if
you compare a 10-20 character topic name + an integer against two
integers, it will be more bytes. But it is a constant overhead
proportional to the size of the topic name, and with 4, 8, or 12
partitions the size difference between the baseline (version 4, where we
just repeated topic names for each topic partition) and the two
approaches becomes narrow.

> Plus, Navinder is going to implement a bunch of protocol code that we
> might just want to change when the discussion actually does take place,
> if ever.
> it'll just be a mental burden for everyone to remember that we want to
> have this follow-up discussion.

3. Isn't people changing the same parts of the code and tracking
follow-ups a common thing we need to deal with anyway? For this KIP,
isn't it enough to reason about whether the additional map on top of the
topic dictionary would incur more overhead than sending task ids? I don't
think that's the case; both of them send two integers. As I see it, we
can do a separate follow-up to (re)pursue the task-id conversion and get
it working for both maps within the next release.

> Can you elaborate on "breaking up the API"? It looks like there are
> already separate API calls in the proposal, one for time-lag, and
> another for offset-lag, so are they not already broken up?

The current APIs for lags (e.g. lagInfoForStore) return StoreLagInfo
objects, which carry both time and offset lags. If we had separate APIs,
say offsetLagForStore() and timeLagForStore(), we could implement the
offset version using the offset lag that the Streams instance already
tracks, i.e. no need for external calls. Only the time-based lag API
would incur the Kafka read for the timestamp. Makes sense?

Based on the discussions so far, I only see these two pending issues to
be aligned on. Is there any other open item people want to bring up?
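For readers following the size argument, here is a minimal sketch of the
dictionary-encoding technique (a toy serializer over a flat list of
topic partitions; the actual AssignmentInfo wire format is more involved,
and the class and field names here are illustrative assumptions):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.kafka.common.TopicPartition;

    public class TopicDictionaryEncoder {

        // Encode each distinct topic name once; every partition then
        // costs two ints (dictionary index + partition number) instead
        // of a repeated topic name + int.
        public static ByteBuffer encode(final List<TopicPartition> partitions) {
            final Map<String, Integer> dict = new LinkedHashMap<>();
            for (final TopicPartition tp : partitions) {
                dict.putIfAbsent(tp.topic(), dict.size());
            }
            final List<byte[]> names = new ArrayList<>();
            int dictBytes = 0;
            for (final String topic : dict.keySet()) {
                final byte[] utf8 = topic.getBytes(StandardCharsets.UTF_8);
                names.add(utf8);
                dictBytes += 4 + utf8.length;
            }
            final ByteBuffer buf =
                ByteBuffer.allocate(4 + dictBytes + 4 + partitions.size() * 8);
            buf.putInt(names.size());
            for (final byte[] name : names) {
                buf.putInt(name.length);
                buf.put(name);
            }
            buf.putInt(partitions.size());
            for (final TopicPartition tp : partitions) {
                buf.putInt(dict.get(tp.topic())); // 4 bytes per entry
                buf.putInt(tp.partition());       // 4 bytes per entry
            }
            buf.flip();
            return buf;
        }
    }

Each distinct topic name is paid for once; every further partition of the
same topic costs eight bytes, which is why the gap to a pure taskId
encoding narrows as partition counts grow.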
On Mon, Nov 4, 2019 at 11:24 AM Sophie Blee-Goldman <sop...@confluent.io> wrote:

Regarding 3) I'm wondering, does your concern still apply even now that
the pluggable PartitionGrouper interface has been deprecated? Now that we
can be sure that the DefaultPartitionGrouper is used to generate the
taskId -> partitions mapping, we should be able to convert any taskId to
any partitions.

On Mon, Nov 4, 2019 at 11:17 AM John Roesler <j...@confluent.io> wrote:

Hey Vinoth, thanks for the reply!

3. I get that it's not the main focus of this KIP, but if it's ok, it
would be nice to hash out this point right now. It only came up because
KIP-535 is substantially extending the pattern in question. If we push it
off until later, then the reviewers are going to have to suspend their
concerns not just while voting for the KIP, but also while reviewing the
code. Plus, Navinder is going to implement a bunch of protocol code that
we might just want to change when the discussion actually does take
place, if ever. Finally, it'll just be a mental burden for everyone to
remember that we want to have this follow-up discussion.

It makes sense what you say... the specific assignment is already encoded
in the "main" portion of the assignment, not in the "userdata" part. It
also makes sense that it's simpler to reason about races if you simply
get all the information about the topics and partitions directly from the
assignor, rather than getting the partition number from the assignor and
the topic name from your own a priori knowledge of the topology. On the
other hand, I'm having some trouble wrapping my head around what race
conditions might occur, other than the fundamentally broken state in
which different instances are running totally different topologies.
Sorry, but can you remind us of the specific condition?

To the efficiency counterargument, it seems hard to imagine how encoding
arbitrarily long topic names plus an integer for the partition number
could be as efficient as task ids, which are just two integers. It seems
like this would only be true if topic names were 4 characters or less.

4. Yeah, clearly, it would not be a good idea to query the metadata
before every single IQ query. I think there are plenty of established
patterns for distributed database clients to follow. Can you elaborate on
"breaking up the API"? It looks like there are already separate API calls
in the proposal, one for time-lag, and another for offset-lag, so are
they not already broken up? FWIW, yes, I agree, the offset lag is already
locally known, so we don't need to build in an extra synchronous broker
API call, just one for the time-lag.

Thanks again for the discussion,
-John

On Mon, Nov 4, 2019 at 11:17 AM Vinoth Chandar <vchan...@confluent.io> wrote:

3. Right now, we still get the topic partitions assigned as part of the
top-level Assignment object (the one that wraps AssignmentInfo) and use
that to convert taskIds back. This list only contains assignments for
that particular instance. Attempting to also reverse-map "all" the
taskIds in the Streams cluster, i.e. all the topic partitions in these
global assignment maps, was what was problematic. By explicitly sending
the global assignment maps as actual topic partitions, the group
coordinator (i.e. the leader that computes the assignments) is able to
consistently enforce its view of the topic metadata. I still don't think
a change that forces us to reconsider semantics is warranted just to save
bits on the wire. Maybe we can discuss this separately from this KIP?

4. There needs to be some caching/interval somewhere though, since we
don't want to potentially make one Kafka read per IQ. But I think it's a
valid suggestion to make this call just synchronous and leave the
caching, or how often you want to call it, to the application. Would it
be good to then break up the APIs for time- and offset-based lag? We can
obtain offset-based lag for free, and only incur the overhead of reading
Kafka if we want time-based lags.

On Fri, Nov 1, 2019 at 2:49 PM Sophie Blee-Goldman <sop...@confluent.io> wrote:

Adding on to John's response to 3), can you clarify when and why exactly
we cannot convert between taskIds and partitions? If that's really the
case, I don't feel confident that the StreamsPartitionAssignor is not
full of bugs...

It seems like it currently just encodes a list of all partitions (the
assignment) and also a list of the corresponding task ids, duplicated to
ensure each partition has the corresponding taskId at the same offset
into the list. Why is that problematic?

On Fri, Nov 1, 2019 at 12:39 PM John Roesler <j...@confluent.io> wrote:

Thanks, all, for considering the points!

3. Interesting. I have a vague recollection of that... Still, though, it
seems a little fishy. After all, we return the assignments themselves as
task ids, and the members have to map these to topic partitions in order
to configure themselves properly. If it's too complicated to get this
right, then how do we know that Streams is computing the correct
partitions at all?

4. How about just checking the log-end timestamp when you call the
method? Then, when you get an answer, it's as fresh as it could possibly
be. And as a user you have just one, obvious, "knob" to configure how
much overhead you want to devote to checking... If you want to call the
broker API less frequently, you just call the Streams API less
frequently. And you don't have to worry about the relationship between
your invocations of that method and the config setting (e.g., you'll
never get a negative number, which you could if you check the log-end
timestamp less frequently than you check the lag).

Thanks,
-John
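A sketch of what the split could look like from the application's side,
using the offsetLagForStore()/timeLagForStore() names floated above (both
are placeholders standing in for the proposed methods, not committed API;
per John's "one knob" argument, the caching policy lives entirely in the
app):

    import java.time.Duration;
    import java.time.Instant;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Offset lag is tracked locally by the Streams instance and is cheap
    // to read on every query; time lag needs a broker read, so the app
    // chooses its own refresh rate simply by calling less often.
    public class StoreLags {

        private final Duration maxTimeLagAge = Duration.ofSeconds(5);
        private final Map<String, CachedLag> timeLagCache = new ConcurrentHashMap<>();

        public long offsetLag(final String storeName) {
            return offsetLagForStore(storeName); // local bookkeeping, no broker call
        }

        public long timeLagMs(final String storeName) {
            final CachedLag cached = timeLagCache.get(storeName);
            if (cached != null && cached.fetchedAt.plus(maxTimeLagAge).isAfter(Instant.now())) {
                return cached.lagMs;
            }
            final long fresh = timeLagForStore(storeName); // synchronous broker read
            timeLagCache.put(storeName, new CachedLag(fresh, Instant.now()));
            return fresh;
        }

        private static final class CachedLag {
            final long lagMs;
            final Instant fetchedAt;
            CachedLag(final long lagMs, final Instant fetchedAt) {
                this.lagMs = lagMs;
                this.fetchedAt = fetchedAt;
            }
        }

        // Placeholders standing in for the proposed KafkaStreams methods.
        private long offsetLagForStore(final String storeName) { return 0L; }
        private long timeLagForStore(final String storeName) { return 0L; }
    }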
On Thu, Oct 31, 2019 at 11:52 PM Navinder Brar
<navinder_b...@yahoo.com.invalid> wrote:

Thanks John for going through this.

- +1, makes sense
- +1, no issues there
- Yeah, the initial patch I had submitted for KAFKA-7149
(https://github.com/apache/kafka/pull/6935) to reduce the AssignmentInfo
object used taskIds, but the merged PR was of similar size according to
Vinoth and was simpler, so if the end result is the same size it would
not make sense to pivot away from the dictionary and back to taskIds.
- Not sure what a good default would be if we don't have a configurable
setting. The config gives users the flexibility to serve their
requirements, as at the end of the day it costs CPU cycles. I am ok with
starting with a default and seeing how it goes based upon feedback.

Thanks,
Navinder

On Friday, 1 November, 2019, 03:46:42 am IST, Vinoth Chandar
<vchan...@confluent.io> wrote:

1. Was trying to spell them out separately, but makes sense for
readability. Done.

2. No, I immediately agree :) .. makes sense. @navinder?

3. I actually attempted sending only taskIds while working on KAFKA-7149.
It's non-trivial to handle the edge cases resulting from newly added
topic partitions and wildcarded topic entries. I ended up simplifying it
to just dictionary-encoding the topic names to reduce size. We can apply
the same technique here for this map. Additionally, we could also
dictionary-encode HostInfo, given it's now repeated twice. I think this
would save more space than having a flag per topic-partition entry. Lmk
if you are okay with this.

4. This opens up a good discussion. Given we support time lag estimates
as well, we need to read the tail record of the changelog periodically
(unlike offset lag, which we can potentially piggyback on metadata in
ConsumerRecord, IIUC). We thought we should have a config that controls
how often this read happens. Let me know if there is a simple way to get
the timestamp value of the tail record that we are missing.
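On the question of a simple way to read the tail record's timestamp: one
approach, sketched here with a plain consumer (not necessarily how
Streams would do it internally), is to seek to logEndOffset - 1 and take
the timestamp of the record polled back:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class TailTimestamp {

        // Returns the timestamp of the last record in the given changelog
        // partition, or -1 if the partition is empty or the read times out.
        public static long tailRecordTimestamp(final String bootstrapServers,
                                               final TopicPartition changelogPartition) {
            final Properties props = new Properties();
            props.put("bootstrap.servers", bootstrapServers);
            props.put("key.deserializer", ByteArrayDeserializer.class.getName());
            props.put("value.deserializer", ByteArrayDeserializer.class.getName());
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.assign(Collections.singletonList(changelogPartition));
                consumer.seekToEnd(Collections.singletonList(changelogPartition));
                final long endOffset = consumer.position(changelogPartition);
                if (endOffset == 0) {
                    return -1L;
                }
                consumer.seek(changelogPartition, endOffset - 1);
                for (int attempt = 0; attempt < 5; attempt++) {
                    for (final ConsumerRecord<byte[], byte[]> record :
                             consumer.poll(Duration.ofMillis(500))) {
                        return record.timestamp(); // create time or log-append time
                    }
                }
                return -1L;
            }
        }
    }

This costs one broker round trip per refresh, which is exactly the read
that the proposed interval config would be amortizing.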
On Thu, Oct 31, 2019 at 12:58 PM John Roesler <j...@confluent.io> wrote:

Hey Navinder,

Thanks for updating the KIP, it's a lot easier to see the current state
of the proposal now.

A few remarks:

1. I'm sure it was just an artifact of revisions, but you have two
separate sections where you list additions to the KafkaStreams interface.
Can you consolidate those so we can see all the additions at once?

2. For messageLagEstimate, can I suggest "offsetLagEstimate" instead, to
be clearer that we're specifically measuring a number of offsets? If you
don't immediately agree, then I'd at least point out that we usually
refer to elements of Kafka topics as "records", not "messages", so
"recordLagEstimate" might be more appropriate.

3. The proposal mentions adding a map of the standby _partitions_ for
each host to AssignmentInfo. I assume this is designed to mirror the
existing "partitionsByHost" map. To keep the size of these metadata
messages down, maybe we can consider making two changes:
(a) for both actives and standbys, encode the _task ids_ instead of
_partitions_. Every member of the cluster has a copy of the topology, so
they can convert task ids into specific partitions on their own, and task
ids are only (usually) three characters.
(b) instead of encoding two maps (hostinfo -> actives AND hostinfo ->
standbys), which requires serializing all the hostinfos twice, maybe we
can pack them together in one map with a structured value (hostinfo ->
[actives, standbys]).
Both of these ideas still require bumping the protocol version to 6, and
they basically mean we drop the existing `PartitionsByHost` field and add
a new `TasksByHost` field with the structured value I mentioned.

4. Can we avoid adding the new "lag refresh" config? The lags would
necessarily be approximate anyway, so adding the config seems to increase
the operational complexity of the system for little actual benefit.

Thanks for the pseudocode, by the way, it really helps visualize how
these new interfaces would play together. And thanks again for the
update!
-John
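For illustration, the structured value in (b) might be shaped like this
(types are illustrative assumptions; the actual protocol serialization
would live in AssignmentInfo's serde):

    import java.util.Map;
    import java.util.Set;

    import org.apache.kafka.streams.processor.TaskId;
    import org.apache.kafka.streams.state.HostInfo;

    // One map, so each HostInfo is serialized once, instead of two maps
    // (hostinfo -> actives, hostinfo -> standbys) that repeat every host.
    public class TasksByHostExample {

        public static final class HostTasks {
            final Set<TaskId> activeTasks;
            final Set<TaskId> standbyTasks;

            HostTasks(final Set<TaskId> activeTasks, final Set<TaskId> standbyTasks) {
                this.activeTasks = activeTasks;
                this.standbyTasks = standbyTasks;
            }
        }

        // Would replace the existing partitionsByHost field.
        private Map<HostInfo, HostTasks> tasksByHost;
    }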
On Thu, Oct 31, 2019 at 2:41 PM John Roesler <j...@confluent.io> wrote:

Hey Vinoth,

I started going over the KIP again yesterday. There are a lot of updates,
and I didn't finish my feedback in one day. I'm working on it now.

Thanks,
John

On Thu, Oct 31, 2019 at 11:42 AM Vinoth Chandar <vchan...@confluent.io> wrote:

Wondering if anyone has thoughts on these changes? I liked that the new
metadata fetch APIs provide all the information at once with consistent
naming.

Any guidance on what you would like to be discussed or fleshed out more
before we call a VOTE?

On Wed, Oct 30, 2019 at 10:31 AM Navinder Brar
<navinder_b...@yahoo.com.invalid> wrote:

Hi,

We have made some edits in the KIP
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance)
after due deliberation on the agreed design to support the new query
design. This includes the new public API to query offset/time lag
information and other details related to querying standby tasks, which
have come up after thinking through the details.

- Addition of a new config, "lag.fetch.interval.ms", to configure the
refresh interval of the time/offset lag
- Addition of a new class StoreLagInfo to hold the periodically obtained
time/offset lag
- Addition of two new functions in KafkaStreams, List<StoreLagInfo>
allLagInfo() and List<StoreLagInfo> lagInfoForStore(String storeName), to
return the lag information for an instance and a store respectively
- Addition of a new class KeyQueryMetadata. We need the topicPartition
for each key to be matched with the lag API for that topic partition. One
way is to add new functions and fetch the topicPartition from
StreamsMetadataState, but we thought having one call that fetches both
the StreamsMetadata and the topicPartition is cleaner (see the sketch
after this message)
- Renaming partitionsForHost to activePartitionsForHost in
StreamsMetadataState, and partitionsByHostState to
activePartitionsByHostState in StreamsPartitionAssignor
- We have also added pseudocode showing how all the changes fit together
and support the new querying APIs

Please let me know if anything is pending now, before a vote can be
started on this.
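A rough sketch of how the pieces listed above could hang together (field
and method shapes are inferred from the names in the list and are
assumptions, not the finalized KIP surface):

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.streams.state.StreamsMetadata;

    // Snapshot of a store's lag, refreshed every "lag.fetch.interval.ms".
    class StoreLagInfo {
        private String storeName;
        private TopicPartition topicPartition;
        private long offsetLagEstimate;  // offsets behind the changelog end
        private long timeLagEstimateMs;  // ms behind the changelog tail record
        // getters omitted
    }

    // Returned per key, so a single call yields both the hosts to query
    // and the topic partition needed to look up the matching lag.
    class KeyQueryMetadata {
        private StreamsMetadata streamsMetadata;
        private TopicPartition topicPartition;
        // getters omitted
    }

    // New KafkaStreams methods from the list above:
    //   List<StoreLagInfo> allLagInfo();
    //   List<StoreLagInfo> lagInfoForStore(String storeName);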
On Saturday, 26 October, 2019, 05:41:44 pm IST, Navinder Brar
<navinder_b...@yahoo.com.invalid> wrote:

> Since there are two soft votes for separate active/standby API methods,
> I also change my position on that. Fine with 2 separate methods. Once
> we remove the lag information from these APIs, returning a List is less
> attractive, since the ordering has no special meaning now.

Agreed, now that we are not returning lag, I am also sold on having two
separate functions. We already have one which returns streamsMetadata for
active tasks, and now we can add another one for standbys.

On Saturday, 26 October, 2019, 03:55:16 am IST, Vinoth Chandar
<vchan...@confluent.io> wrote:

+1 to Sophie's suggestion. Having lag both in terms of time and offsets
is good and makes for a more complete API.

Since there are two soft votes for separate active/standby API methods, I
also change my position on that. Fine with 2 separate methods. Once we
remove the lag information from these APIs, returning a List is less
attractive, since the ordering has no special meaning now.

> lag in offsets vs time: Having both, as suggested by Sophie, would of
> course be best. What is a little unclear to me is, how in detail are we
> going to compute both?

@navinder maybe the next step is to flesh out these details and surface
any larger changes we need to make, if need be.

Any other details we need to cover, before a VOTE can be called on this?

On Fri, Oct 25, 2019 at 1:51 PM Bill Bejeck <bbej...@gmail.com> wrote:

I am jumping in a little late here.

Overall I agree with the proposal to push decision making on what/how to
query into the query layer.

For point 5 from above, I'm slightly in favor of having a new method,
"standbyMetadataForKey()" or something similar. Even if we return all
tasks in one list, the user will still have to perform some filtering to
separate the different tasks, so I don't feel making two calls is a
burden, and IMHO it makes things more transparent for the user. If the
final vote is for using an "isActive" field, I'm good with that as well.

Just my 2 cents.

On Fri, Oct 25, 2019 at 5:09 AM Navinder Brar
<navinder_b...@yahoo.com.invalid> wrote:

I think now we are aligned on almost all the design parts. Summarising
below what has been discussed above and what we have a general consensus
on.

- Rather than broadcasting lag across all nodes at rebalancing/with the
heartbeat, we will just return a list of all available standbys in the
system. The user can send an IQ to any of those nodes, which will return
the response along with the lag and offset time, based on which the user
can decide whether to use the response or call another standby.
- The current metadata query frequency will not change. It will be the
same as it is now, i.e. before each query.
- For fetching List<StreamsMetadata> in StreamsMetadataState.java and
List<QueryableStoreProvider> in StreamThreadStateStoreProvider.java
(which will return all active stores which are running/restoring, and
replica stores which are running), we will add new functions and not
disturb the existing functions.
- There is no need to add a new StreamsConfig for implementing this KIP.
- We will add standbyPartitionsByHost in AssignmentInfo and
StreamsMetadataState, which would change the existing rebuildMetadata()
and setPartitionsByHostState().

If anyone has any more concerns, please feel free to add. Post this I
will be initiating a vote.
~Navinder

On Friday, 25 October, 2019, 12:05:29 pm IST, Matthias J. Sax
<matth...@confluent.io> wrote:

Just to close the loop @Vinoth:

> 1. IIUC John intends to add (or we can do this in this KIP) lag
> information to AssignmentInfo, which gets sent to every participant.

As explained by John, KIP-441 currently plans to report the information
only to the leader. But with the new proposal to not broadcast this
information, this concern is invalidated anyway.

> 2. At least I was under the assumption that it can be called per query,
> since the API docs don't seem to suggest otherwise. Do you see any
> potential issues if we call this every query? (we should benchmark this
> nonetheless)

I did not see a real issue if people refresh the metadata frequently,
because it would be a local call. My main point was that this would
change the current usage pattern of the API, and we would clearly need to
communicate this change. Similar to (1), this concern is invalidated
anyway.

@John: I think it's a great idea to get rid of reporting lag, and to push
the decision-making process about "what to query" into the query-serving
layer itself. This simplifies the overall design of this KIP
significantly, and actually aligns very well with the idea that Kafka
Streams (as it is a library) should only provide the basic building
blocks. Many of my raised questions are invalidated by this.

Some questions are still open though:

> 10) Do we need to distinguish between active(restoring) and standby
> tasks? Or could we treat both as the same?

@Vinoth: about (5). I see your point about multiple calls vs a single
call. I still slightly prefer multiple calls, but it's highly subjective
and I would also be fine adding an #isActive() method. Would be good to
get feedback from others.

For (14), i.e., lag in offsets vs time: Having both, as suggested by
Sophie, would of course be best. What is a little unclear to me is, how
in detail are we going to compute both?

-Matthias

On 10/24/19 11:07 PM, Sophie Blee-Goldman wrote:

Just to chime in on the "report lag vs timestamp difference" issue, I
would actually advocate for both. As mentioned already, time difference
is probably a lot easier and/or more useful to reason about in terms of
"freshness" of the state. But in the case when all queried stores are far
behind, lag could be used to estimate the recovery velocity. You can then
get a (pretty rough) idea of when a store might be ready, and wait until
around then to query again.

On Thu, Oct 24, 2019 at 9:53 PM Guozhang Wang <wangg...@gmail.com> wrote:

I think I agree with John's recent reasoning as well: instead of letting
the store metadata API return the staleness information, letting the
client query either the active or a standby, and letting the standby's
query response include both the values and the timestamp (or the lag, as
the diff of timestamps), would actually be more intuitive. Not only is
the Streams client simpler; from the user's perspective, they also do not
need to periodically refresh staleness information from the client, but
only need to make decisions on the fly whenever they query.

Again, the standby replica then needs to know the current active task's
timestamp, which can be found from the log-end record's encoded
timestamp. Today standby tasks do not read that specific record, but only
refresh their knowledge of the log-end offset; I think refreshing the
latest record timestamp is not a very bad request to add on the standby
replicas.

Guozhang

On Thu, Oct 24, 2019 at 5:43 PM Vinoth Chandar <vchan...@confluent.io> wrote:

+1 As someone implementing a query routing layer, there is already a need
for mechanisms that do healthchecks/failure detection for queries while
Streams rebalancing eventually kicks in in the background. So pushing
this complexity to the IQ client app keeps Streams simpler as well. IQs
will potentially be issued an order of magnitude more frequently, so this
can achieve good freshness for the lag information.

I would like to add, however, that we would also need to introduce APIs
in the KafkaStreams class for obtaining lag information for all stores
local to that host. This is for the IQ layer to relay back with the
response/its own heartbeat mechanism.

On Thu, Oct 24, 2019 at 3:12 PM John Roesler <j...@confluent.io> wrote:

Hi all,

I've been mulling about this KIP, and I think I was on the wrong track
earlier with regard to task lags. Tl;dr: I don't think we should add lags
at all to the metadata API (and also not to the AssignmentInfo protocol
message).

Like I mentioned early on, reporting lag via
SubscriptionInfo/AssignmentInfo would only work while rebalances are
happening. Once the group stabilizes, no members would be notified of
each others' lags anymore. I had been thinking that the solution would be
the heartbeat proposal I mentioned earlier, but that proposal would have
reported the heartbeats of the members only to the leader member (the one
who makes assignments). To be useful in the context of _this_ KIP, we
would also have to report the lags in the heartbeat responses to _all_
members. This is a concern to me because now _all_ the lags get reported
to _all_ the members on _every_ heartbeat... a lot of chatter.

Plus, the proposal for KIP-441 is only to report the lags of each _task_.
This is the sum of the lags of all the stores in the task. But this would
be insufficient for KIP-535. For this KIP, we would want the lag
specifically of the store we want to query. So this means we'd have to
report the lags of all the stores of all the members to every member...
even more chatter!

The final nail in the coffin to me is that IQ clients would have to start
refreshing their metadata quite frequently to stay up to date on the
lags, which adds even more overhead to the system.

Consider a strawman alternative: we bring KIP-535 back to extending the
metadata API to tell the client the active and standby replicas for the
key in question (not including any "staleness/lag" restriction, just
returning all the replicas). Then, the client picks a replica and sends
the query. The server returns the current lag along with the response
(maybe in an HTTP header or something). Then, the client keeps a map of
its last observed lags for each replica, and uses this information to
prefer fresher replicas.

OR, if it wants only to query the active replica, it would throw an error
on any lag response greater than zero, refresh its metadata by
re-querying the metadata API, and try again with the current active
replica.

This way, the lag information will be super fresh for the client, and we
keep the Metadata API / Assignment, Subscription / and Heartbeat as slim
as possible.

Side note: I do think that some time soon, we'll have to add a library
for IQ servers/clients. I think this logic will start to get pretty
complex.

I hope this thinking is reasonably clear!
Thanks again,
-John
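The client side of that strawman could be as small as the following
sketch (the endpoint strings and the lag-reporting channel, e.g. an HTTP
header, are placeholders, not part of the KIP):

    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Remember the last lag each replica reported with its response,
    // and prefer the freshest replica for the next query.
    public class LagAwareRouter {

        private final Map<String, Long> lastObservedLag = new ConcurrentHashMap<>();

        // Pick the endpoint with the smallest last-seen lag. Endpoints we
        // have never queried default to 0 so they get probed at least once.
        public String pickEndpoint(final List<String> replicaEndpoints) {
            String best = null;
            long bestLag = Long.MAX_VALUE;
            for (final String endpoint : replicaEndpoints) {
                final long lag = lastObservedLag.getOrDefault(endpoint, 0L);
                if (lag < bestLag) {
                    best = endpoint;
                    bestLag = lag;
                }
            }
            return best;
        }

        // Call with the lag the server piggybacked on its query response.
        public void recordLag(final String endpoint, final long reportedLag) {
            lastObservedLag.put(endpoint, reportedLag);
        }
    }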
On Wed, Oct 23, 2019 at 10:16 AM Vinoth Chandar <vchan...@confluent.io> wrote:

Responding to the points raised by Matthias:

1. IIUC John intends to add (or we can do this in this KIP) lag
information to AssignmentInfo, which gets sent to every participant.

2. At least I was under the assumption that it can be called per query,
since the API docs don't seem to suggest otherwise. Do you see any
potential issues if we call this every query? (we should benchmark this
nonetheless)

4. Agree. metadataForKey() would implicitly return the active host
metadata (as it was before). We should also document this in that API's
javadoc, given we now have other method(s) that return more host
metadata.

5. While I see the point, the app/caller has to make two different API
calls to obtain active/standby and potentially do the same set of
operations to query the state. I personally still like a method like
isActive() better, but don't have strong opinions.

9. If we do expose the lag information, could we just leave it up to the
caller to decide whether to error out or not, and not make the decision
within Streams? I.e. we don't need a new config.

14. +1, if it's easier to do right away. We started with number of
records, following the lead from KIP-441.

On Wed, Oct 23, 2019 at 5:44 AM Navinder Brar
<navinder_b...@yahoo.com.invalid> wrote:

Thanks, everyone, for taking a look. Some very cool ideas have flown in.

> There was a follow-on idea I POCed to continuously share lag
> information in the heartbeat protocol

+1, that would be great. I will update the KIP assuming this work will
finish soon.

> I think that adding a new method to StreamsMetadataState and
> deprecating the existing method is the best way to go; we just can't
> change the return types of any existing methods.

+1 on this. We will add new methods for users who would be interested in
querying back a list of possible options to query from, and leave the
current function getStreamsMetadataForKey() untouched for users who want
absolute consistency.

> why not just always return all available metadata (including
> active/standby or lag) and let the caller decide to which node they
> want to route the query

+1. I think this makes sense, as from a user standpoint there is no
difference b/w an active and a standby if both have the same lag. In
fact, users would be able to use this API to reduce query load on
actives, so returning all available options along with the current lag in
each would make sense, leaving it to the user how they want to use this
data. This has another added advantage: if a user queries any random
machine for a key and that machine has a replica of the partition (where
the key belongs), the user might choose to serve the data from there
itself (if it doesn't lag much) rather than finding the active and making
an IQ call to it. This would save some critical serving time for some
applications.

> Adding the lag in terms of timestamp diff comparing the committed
> offset.

+1 on this, I think it's more readable. But as John said, the function
allMetadataForKey() is just returning the possible options from which
users can query a key, so we can even drop the
enableReplicaServing/tolerableDataStaleness parameter and just return all
the streamsMetadata containing that key, along with the offset limit.

Answering the questions posted by Matthias in sequence:

1. @John, can you please comment on this one?
2. Yeah, the usage pattern would include querying this prior to every
request.
3. Will add the changes to StreamsMetadata in the KIP; this would include
changes in rebuildMetadata() etc.
4. Makes sense, already addressed above.
5. Is it important from a user perspective whether they are querying an
active(processing), active(restoring), or standby task, if we have a way
of denoting lag in a readable manner that tells the user this is the best
node to query fresh data from?
6. Yes, I intend to return the actives and replicas in the same return
list in allMetadataForKey().
7. Tricky.
8. Yes, we need new functions to return activeRestoring and
standbyRunning tasks.
9. A StreamsConfig doesn't look like much use to me, since we are giving
all possible options via this function; alternatively, users can keep
using the existing function getStreamsMetadataForKey() and get just the
active.
10. I think we treat them both the same and let the lag do the talking.
11. We are just sending the options to query from in allMetadataForKey(),
which doesn't include any handle. We then query that machine for the key,
where it calls allStores() and tries to find the task in
activeRunning/activeRestoring/standbyRunning and adds the store handle
there.
12. Need to verify, but at the exact point when the store is closed to
transition it from restoring to running, queries will fail. The caller in
such cases can have their own configurable retries to check again, or try
the replica if a call to the active fails.
13. I think KIP-216 is working along those lines; we might not need a few
of those exceptions, since the basic idea of this KIP is now to support
IQ during rebalancing.
14. Addressed above; agreed it looks more readable.

On Tuesday, 22 October, 2019, 08:39:07 pm IST, Matthias J. Sax
<matth...@confluent.io> wrote:

One more thought:

14) Is specifying the allowed lag in number of records a useful way for
users to declare how stale an instance is allowed to be? Would it be more
intuitive for users to specify the allowed lag in time units (would event
time or processing time be better)? It seems hard for users to reason
about how "fresh" a store really is when number of records is used.

-Matthias
Sax wrote: >> > > > > > > > > > > > >>>>>>> Some more follow up thoughts: >> > > > > > > > > > > > >>>>>>> >> > > > > > > > > > > > >>>>>>> 11) If we get a store handle of an >> > > > active(restoring) >> > > > > > > task, and >> > > > > > > > > > > > >> the >> > > > > > > > > > > > >>>> task >> > > > > > > > > > > > >>>>>>> transits to running, does the store handle >> > become >> > > > > > > invalid and a >> > > > > > > > > > > > >> new >> > > > > > > > > > > > >>>> one >> > > > > > > > > > > > >>>>>>> must be retrieved? Or can we "switch it out" >> > > > > underneath >> > > > > > > -- for >> > > > > > > > > > > > >> this >> > > > > > > > > > > > >>>>>>> case, how does the user know when they >> start to >> > > > > query the >> > > > > > > > > > > > >>> up-to-date >> > > > > > > > > > > > >>>>>> state? >> > > > > > > > > > > > >>>>>>> >> > > > > > > > > > > > >>>>>>> 12) Standby tasks will have the store open >> in >> > > > regular >> > > > > > > mode, >> > > > > > > > > > while >> > > > > > > > > > > > >>>>>>> active(restoring) tasks open stores in >> "upgrade >> > > > mode" >> > > > > > > for more >> > > > > > > > > > > > >>>> efficient >> > > > > > > > > > > > >>>>>>> bulk loading. When we switch the store into >> > active >> > > > > mode, >> > > > > > > we >> > > > > > > > > > close >> > > > > > > > > > > > >>> it >> > > > > > > > > > > > >>>> and >> > > > > > > > > > > > >>>>>>> reopen it. What is the impact if we query >> the >> > store >> > > > > > > during >> > > > > > > > > > > > >> restore? >> > > > > > > > > > > > >>>> What >> > > > > > > > > > > > >>>>>>> is the impact if we close the store to >> transit >> > to >> > > > > > > running (eg, >> > > > > > > > > > > > >>> there >> > > > > > > > > > > > >>>>>>> might be open iterators)? >> > > > > > > > > > > > >>>>>>> >> > > > > > > > > > > > >>>>>>> 13) Do we need to introduced new exception >> > types? >> > > > > Compare >> > > > > > > > > > KIP-216 >> > > > > > > > > > > > >>>>>>> ( >> > > > > > > > > > > > >>>>>> >> > > > > > > > > > > > >>>> >> > > > > > > > > > > > >>> >> > > > > > > > > > > > >> >> > > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > >> > > > > > > >> > > > > >> > > > >> > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-216%3A+IQ+should+throw+different+exceptions+for+different+errors >> > > > > > > > > > > > >>>>>> ) >> > > > > > > > > > > > >>>>>>> that aims to improve the user experience >> with >> > > > regard >> > > > > to >> > > > > > > IQ >> > > > > > > > > > > > >>>> exceptions. >> > > > > > > > > > > > >>>>>>> >> > > > > > > > > > > > >>>>>>> >> > > > > > > > > > > > >>>>>>> -Matthias >> > > > > > > > > > > > >>>>>>> >> > > > > > > > > > > > >>>>>>> On 10/21/19 6:39 PM, Matthias J. Sax wrote: >> > > > > > > > > > > > >>>>>>>> Thanks for the KIP. >> > > > > > > > > > > > >>>>>>>> >> > > > > > > > > > > > >>>>>>>> Couple of comments: >> > > > > > > > > > > > >>>>>>>> >> > > > > > > > > > > > >>>>>>>> 1) With regard to KIP-441, my current >> > > > understanding >> > > > > is >> > > > > > > that >> > > > > > > > > > the >> > > > > > > > > > > > >>> lag >> > > > > > > > > > > > >>>>>>>> information is only reported to the leader >> > (please >> > > > > > > correct me >> > > > > > > > > > > > >> if I >> > > > > > > > > > > > >>>> am >> > > > > > > > > > > > >>>>>>>> wrong). This seems to be quite a >> limitation to >> > > > > actually >> > > > > > > use >> > > > > > > > > > the >> > > > > > > > > > > > >>> lag >> > > > > > > > > > > > >>>>>>>> information. 
2) The idea of the metadata API is actually to get metadata once and only refresh the metadata if a store was migrated. The current proposal would require getting the metadata before each query. The KIP should describe the usage pattern and impact in more detail.

3) Currently, the KIP does not list the public API changes in detail. Please list all methods you intend to deprecate and list all methods you intend to add (best, using a code-block markup -- compare
https://cwiki.apache.org/confluence/display/KAFKA/KIP-470%3A+TopologyTestDriver+test+input+and+output+usability+improvements
as an example)

4) Also note (as already pointed out by John) that we cannot have any breaking API changes. Thus, the API should be designed in a fully backward compatible manner.

5) Returning a list of metadata objects makes it hard for users to know if the first object refers to the active(processing), active(restoring), or a standby task. IMHO, we should be more explicit. For example, a metadata object could have a flag that one can test via `#isActive()`. Or maybe even better, we could keep the current API as-is and add something like `standbyMetadataForKey()` (and similar methods for others). Having just a flag `isActive()` is a little subtle, and having new overloads would make the API much clearer (passing in a boolean flag does not seem to be a nice API). (A sketch of these alternatives follows after 10) below.)
6) Do you intend to return all standby metadata information at once, similar to `allMetadata()` -- seems to be useful.

7) Even if the lag information is propagated to all instances, it will happen in an async manner. Hence, I am wondering if we should address this race condition (I think we should). The idea would be to check if a standby/active(restoring) task is actually still within the lag bounds when a query is executed, and we would throw an exception if not.

8) The current `KafkaStreams#store()` method only returns a handle to stores of active(processing) tasks. How can a user actually get a handle to a store of an active(restoring) or standby task for querying? Seems we should add a new method to get standby handles? Changing the semantics of the existing method would be possible, but I think adding a new method is preferable?

9) How does the user actually specify the acceptable lag? A global config via StreamsConfig (this would be a public API change that needs to be covered in the KIP)? Or on a per-store or even per-query basis for more flexibility? We could also have a global setting that is used as default and allow overwriting it on a per-query basis (also covered in the sketch below).

10) Do we need to distinguish between active(restoring) and standby tasks? Or could we treat both as the same?
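To make 5) and 9) concrete, a rough sketch of the shapes being discussed (all names illustrative, not a proposal):

    import java.time.Duration;
    import java.util.Collection;
    import org.apache.kafka.common.serialization.Serializer;

    // Illustrative only: the alternatives from 5) plus the per-query
    // staleness bound from 9).
    interface MetadataApiSketch<K> {

        // 5a) a flag on the metadata object to test active vs standby/restoring
        interface MetadataLike {
            boolean isActive();
        }

        // 5b) keep metadataForKey() as-is (active only) and add explicit overloads
        Collection<MetadataLike> standbyMetadataForKey(
            String storeName, K key, Serializer<K> keySerializer);

        // 9) a per-query staleness bound, overriding a global StreamsConfig default
        Collection<MetadataLike> standbyMetadataForKey(
            String storeName, K key, Serializer<K> keySerializer, Duration acceptableLag);
    }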
-Matthias

On 10/21/19 5:40 PM, Vinoth Chandar wrote:

>> I'm wondering, rather than putting "acceptable lag" into the configuration at all, or even making it a parameter on `allMetadataForKey`, why not just _always_ return all available metadata (including active/standby or lag) and let the caller decide to which node they want to route the query?

+1 on exposing lag information via the APIs. IMO without having continuously updated/fresh lag information, its true value as a signal for query routing decisions is quite limited. But we can design the API around this model and iterate? Longer term, we should have continuously shared lag information.

>> more general to refactor it to "allMetadataForKey(long tolerableDataStaleness, ...)", and when it's set to 0 it means "active task only".

+1 IMO if we plan on having `enableReplicaServing`, it makes sense to generalize based on dataStaleness. This seems complementary to exposing the lag information itself.

>> This is actually not a public api change at all, and I'm planning to implement it asap as a precursor to the rest of KIP-441

+1 again. Do we have a concrete timeline for when this change will land on master? I would like to get the implementation wrapped up (as much as possible) by end of the month. :).
But I agree this sequencing makes sense..

On Mon, Oct 21, 2019 at 2:56 PM Guozhang Wang <wangg...@gmail.com> wrote:

Hi Navinder,

Thanks for the KIP, I have a high level question about the proposed API regarding:

"StreamsMetadataState::allMetadataForKey(boolean enableReplicaServing...)"

I'm wondering if it's more general to refactor it to "allMetadataForKey(long tolerableDataStaleness, ...)", and when it's set to 0 it means "active task only". Behind the scenes, we can have the committed offsets encode the stream time as well, so that when processing standby tasks the stream process knows not only the lag in terms of offsets compared to the committed offset (internally we call it the offset limit), but also the lag in terms of timestamp diff compared to the committed offset.

Also, encoding the timestamp as part of the offset has other benefits for improving Kafka Streams time semantics, but for KIP-535 itself I think it can help give users a more intuitive interface to reason about.
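For illustration, caller-side usage of such a generalized signature could look like this (a sketch of the idea above, not a final API):

    // tolerableDataStaleness = 0 would mean "active task only" ...
    Collection<StreamsMetadata> activeOnly =
        metadataState.allMetadataForKey(0L, "store-name", key, keySerializer);

    // ... while a positive bound would also admit standbys within that lag.
    Collection<StreamsMetadata> boundedStale =
        metadataState.allMetadataForKey(10_000L, "store-name", key, keySerializer);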
Guozhang

On Mon, Oct 21, 2019 at 12:30 PM John Roesler <j...@confluent.io> wrote:

Hey Navinder,

Thanks for the KIP! I've been reading over the discussion thus far, and I have a couple of thoughts to pile on as well:

It seems confusing to propose the API in terms of the current system state, but also propose how the API would look if/when KIP-441 is implemented. It occurs to me that the only part of KIP-441 that would affect you is the availability of the lag information in the SubscriptionInfo message. This is actually not a public api change at all, and I'm planning to implement it asap as a precursor to the rest of KIP-441, so maybe you can just build on top of KIP-441 and assume the lag information will be available. Then you could have a more straightforward proposal (e.g., mention that you'd return the lag information in AssignmentInfo as well as in the StreamsMetadata in some form, or make use of it in the API somehow).

I'm partially motivated in that former point because it seems like understanding how callers would bound the staleness for their use case is _the_ key point for this KIP. FWIW, I think that adding a new method to StreamsMetadataState and deprecating the existing method is the best way to go; we just can't change the return types of any existing methods.
I'm wondering, rather than putting "acceptable lag" into the configuration at all, or even making it a parameter on `allMetadataForKey`, why not just _always_ return all available metadata (including active/standby or lag) and let the caller decide to which node they want to route the query? This method isn't making any queries itself; it's merely telling you where the local Streams instance _thinks_ the key in question is located. Just returning all available information lets the caller implement any semantics they desire around querying only active stores, or standbys, or recovering stores, or whatever.
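To illustrate, a caller could do something like this (a sketch only: allMetadataForKey is the method shape under discussion here, and queryRemoteInstance stands in for whatever RPC layer the application already has; neither is an existing Streams API):

    // Route to the freshest instance that can answer, falling back
    // through the candidates in order.
    String queryWithFallback(List<StreamsMetadata> candidates, String store, String key) {
        for (StreamsMetadata candidate : candidates) {
            try {
                return queryRemoteInstance(candidate.hostInfo(), store, key);
            } catch (RuntimeException downOrTooStale) {
                // this instance failed, or the caller judged it too stale;
                // try the next (possibly more stale) candidate
            }
        }
        throw new IllegalStateException("no instance available for key " + key);
    }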
One fly in the ointment, which you may wish to consider if proposing to use lag information, is that the cluster would only become aware of new lag information during rebalances. Even in the full expression of KIP-441, this information would stop being propagated when the cluster achieves a balanced task distribution. There was a follow-on idea I POCed to continuously share lag information in the heartbeat protocol, which you might be interested in, if you want to make sure that nodes are basically _always_ aware of each others' lag on different partitions: https://github.com/apache/kafka/pull/7096

Thanks again!
-John

On Sat, Oct 19, 2019 at 6:06 AM Navinder Brar <navinder_b...@yahoo.com.invalid> wrote:

Thanks, Vinoth. Looks like we are on the same page. I will add some of these explanations to the KIP as well. Have assigned KAFKA-6144 to myself, and KAFKA-8994 is closed (by you). As suggested, we will replace "replica" with "standby".

>> In the new API, "StreamsMetadataState::allMetadataForKey(boolean enableReplicaServing, String storeName, K key, Serializer<K> keySerializer)" Do we really need a per key configuration? or a new StreamsConfig is good enough?

Coming from experience, when teams are building a platform with Kafka Streams and these APIs serve data to multiple teams, we can't have a generalized config that says as a platform we will support stale reads or not. It should be the choice of whoever is calling the APIs whether they are ok with stale reads or not. Makes sense?

On Thursday, 17 October, 2019, 11:56:02 pm IST, Vinoth Chandar <vchan...@confluent.io> wrote:

Looks like we are covering ground :)

>> Only if it is within a permissible range (say 10000) we will serve from Restoring state of active.

+1 on having a knob like this.. My reasoning is as follows.
Looking at the Streams state as a read-only distributed kv store: with num_standby = f, we should be able to tolerate f failures, and if there is an f+1'th failure, the system should be unavailable.

A) So with num_standby=0, the system should be unavailable even if there is 1 failure, and that's my argument for not allowing querying in restoration state; esp in this case it will be a total rebuild of the state (which IMO cannot be considered a normal fault-free operational state).

B) Even when there are standbys, say num_standby=2, if the user decides to shut down all 3 instances, then the only outcome should be unavailability until all of them come back or the state is rebuilt on other nodes in the cluster. In normal operations, f <= 2, and when a failure does happen we can then either choose to be C over A and fail IQs until replication is fully caught up, or choose A over C by serving in restoring state as long as the lag is minimal. If even with f=1, say, all the standbys are lagging a lot due to some issue, then that should be considered a failure, since that is different from the normal/expected operational mode.
Serving reads with unbounded replication lag and calling it "available" may not be very usable or even desirable :) IMHO, since it gives the user no way to reason about the app that is going to query this store.

So there is definitely a need to distinguish between: replication catchup while being in a fault-free state vs restoration of state when we lose more than f standbys. This knob is a great starting point towards this.

If you agree with some of the explanation above, please feel free to include it in the KIP as well, since this is sort of our design principle here..

Small nits:

- let's standardize on "standby" instead of "replica", KIP or code, to be consistent with the rest of the Streams code/docs?
- Can we merge KAFKA-8994 into KAFKA-6144 now and close the former? Eventually we need to consolidate KAFKA-6555 as well.
- In the new API, "StreamsMetadataState::allMetadataForKey(boolean enableReplicaServing, String storeName, K key, Serializer<K> keySerializer)" Do we really need a per key configuration? or a new StreamsConfig is good enough?
On Wed, Oct 16, 2019 at 8:31 PM Navinder Brar <navinder_b...@yahoo.com.invalid> wrote:

@Vinoth, I have incorporated a few of the discussions we have had in the KIP.

In the current code, t0 and t1 serve queries from the Active(Running) partition. For case t2, we are planning to return List<StreamsMetadata> such that it returns <StreamsMetadata(A), StreamsMetadata(B)>, so that if IQ fails on A, the replica on B can serve the data by enabling serving from replicas. This still does not solve cases t3 and t4, since B has been promoted to active but is in Restoring state to catch up to A's last committed position (as we don't serve from the Restoring state of an active), and the new replica on R is building itself from scratch. Both these cases can be solved if we start serving from the Restoring state of the active as well, since it is almost equivalent to the previous active.

There could be a case where all replicas of a partition become unavailable, and then the active and all replicas of that partition are building themselves from scratch; in this case, the state in the active is far behind even though it is in Restoring state. To cater to such cases, so that we don't serve from this state, we can either add another state before Restoring, or check the difference between the last committed offset and the current position. Only if it is within a permissible range (say 10000) will we serve from the Restoring state of the active.
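Roughly, that check would look like this (a sketch with made-up names, not actual Streams internals):

    // The "permissible range (say 10000)" above.
    static final long PERMISSIBLE_LAG = 10_000L;

    // Serve from a restoring active only if it has nearly caught up to the
    // last committed position of the previous active.
    boolean canServeFromRestoringActive(long lastCommittedOffset, long currentPosition) {
        final long lag = lastCommittedOffset - currentPosition;
        return lag >= 0 && lag <= PERMISSIBLE_LAG;
    }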
On Wednesday, 16 October, 2019, 10:01:35 pm IST, Vinoth Chandar <vchan...@confluent.io> wrote:

Thanks for the updates on the KIP, Navinder!

Few comments

- AssignmentInfo is not public API? But we will change it and thus need to increment the version and test for version_probing etc. Good to separate that from the StreamsMetadata changes (which are public API)
- From what I see, there is going to be a choice between the following

A) introducing a new *KafkaStreams::allMetadataForKey()* API that potentially returns List<StreamsMetadata> ordered from most up-to-date to least up-to-date replicas. Today we cannot fully implement this ordering, since all we know is which hosts are active and which are standbys. However, this aligns well with the future. KIP-441 adds the lag information to the rebalancing protocol.
We could also sort replicas based on the reported lags eventually. This is fully backwards compatible with existing clients. The only drawback I see is the naming of the existing method KafkaStreams::metadataForKey, not conveying the distinction that it simply returns the active replica, i.e. allMetadataForKey.get(0).

B) Change KafkaStreams::metadataForKey() to return a List. It's a breaking change.

I prefer A, since none of the semantics/behavior changes for existing users. Love to hear more thoughts. Can we also work this into the KIP? I already implemented A to unblock myself for now. Seems feasible to do.
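In code, the compatibility invariant for A would be (a sketch; allMetadataForKey is the proposed method, metadataForKey the existing one):

    // Existing users see no change: metadataForKey() keeps returning the
    // active host, which is simply the head of the new ordered list.
    List<StreamsMetadata> ordered = streams.allMetadataForKey(store, key, serializer); // proposed
    StreamsMetadata active = streams.metadataForKey(store, key, serializer);           // existing
    // invariant: active.equals(ordered.get(0))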
On Tue, Oct 15, 2019 at 12:21 PM Vinoth Chandar <vchan...@confluent.io> wrote:

>> I get your point. But suppose there is a replica which has just become active, so in that case the replica will still be building itself from scratch and this active will go to restoring state till it catches up with the previous active; wouldn't serving from a restoring active make more sense than a replica in such a case.

KIP-441 will change this behavior such that promotion to active happens based on how caught up a replica is. So, once we have that (work underway already for 2.5 IIUC) and the user sets num.standby.replicas > 0, then the staleness window should not be as long as you describe. IMO if the user wants availability for state, then they should configure num.standby.replicas > 0. If not, then on a node loss, a few partitions would be unavailable for a while (there are other ways to bring this window down, which I won't bring in here). We could argue for querying a restoring active (say a new node added to replace a faulty old node) based on AP vs CP principles. But I'm not sure reading really really old values for the sake of availability is useful. No AP data system would be inconsistent for such a long time in practice.

So, I still feel just limiting this to standby reads provides the best semantics.
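Concretely, that opt-in is just the existing setting (a minimal sketch; the value is illustrative):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    // With f standbys, we can tolerate f failures while staying available.
    props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);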
Just my 2c. Would love to see what others think.

On Tue, Oct 15, 2019 at 5:34 AM Navinder Brar <navinder_b...@yahoo.com.invalid> wrote:

Hi Vinoth,
Thanks for the feedback.

>> Can we link the JIRA, discussion thread also to the KIP.

Added.

>> Based on the discussion on KAFKA-6144, I was under the impression that this KIP is also going to cover exposing of the standby information in StreamsMetadata and thus subsume KAFKA-8994. That would require a public API change?

Sure, I can add changes for 8994 in this KIP and link KAFKA-6144 to KAFKA-8994 as well.

>> KIP seems to be focussing on restoration when a new node is added. KIP-441 is underway and has some major changes proposed for this. It would be good to clarify dependencies if any. Without KIP-441, I am not very sure if we should allow reads from nodes in RESTORING state, which could amount to many minutes/few hours of stale reads? This is different from allowing querying standby replicas, which could be mostly caught up and the staleness window could be much smaller/tolerable. (once again the focus on KAFKA-8994).

I get your point.
But >> > > > suppose >> > > > > > > there is a >> > > > > > > > > > > > >>>> replica >> > > > > > > > > > > > >>>>>>>>>>> which >> > > > > > > > > > > > >>>>>>>>>>>>> has >> > > > > > > > > > > > >>>>>>>>>>>>>>> just become active, so in that case >> > replica >> > > > > will >> > > > > > > still >> > > > > > > > > > be >> > > > > > > > > > > > >>>>>> building >> > > > > > > > > > > > >>>>>>>>>>>>> itself >> > > > > > > > > > > > >>>>>>>>>>>>>>> from scratch and this active will >> go to >> > > > > > > restoring state >> > > > > > > > > > > > >>> till >> > > > > > > > > > > > >>>> it >> > > > > > > > > > > > >>>>>>>>>>> catches >> > > > > > > > > > > > >>>>>>>>>>>>> up >> > > > > > > > > > > > >>>>>>>>>>>>>>> with previous active, wouldn't >> serving >> > > > from a >> > > > > > > restoring >> > > > > > > > > > > > >>>> active >> > > > > > > > > > > > >>>>>>>>>> make >> > > > > > > > > > > > >>>>>>>>>>> more >> > > > > > > > > > > > >>>>>>>>>>>>>>> sense than a replica in such case. >> > > > > > > > > > > > >>>>>>>>>>>>>>> >> > > > > > > > > > > > >>>>>>>>>>>>>>> Finally, we may need to introduce a >> > > > > > > configuration to >> > > > > > > > > > > > >>> control >> > > > > > > > > > > > >>>>>> this. >> > > > > > > > > > > > >>>>>>>>>>> Some >> > > > > > > > > > > > >>>>>>>>>>>>>>> users may prefer errors to stale >> data. >> > Can >> > > > we >> > > > > > > also add >> > > > > > > > > > it >> > > > > > > > > > > > >>> to >> > > > > > > > > > > > >>>> the >> > > > > > > > > > > > >>>>>>>>>>> KIP?>> >> > > > > > > > > > > > >>>>>>>>>>>>>>> Will add this. >> > > > > > > > > > > > >>>>>>>>>>>>>>> >> > > > > > > > > > > > >>>>>>>>>>>>>>> Regards, >> > > > > > > > > > > > >>>>>>>>>>>>>>> Navinder >> > > > > > > > > > > > >>>>>>>>>>>>>>> >> > > > > > > > > > > > >>>>>>>>>>>>>>> >> > > > > > > > > > > > >>>>>>>>>>>>>>> On2019/10/14 16:56:49, Vinoth >> Chandar < >> > > > > > > > > > v...@confluent.io >> > > > > > > > > > > > >>>>> wrote: >> > > > > > > > > > > > >>>>>>>>>>>>>>> >> > > > > > > > > > > > >>>>>>>>>>>>>>>> Hi Navinder,> >> > > > > > > > > > > > >>>>>>>>>>>>>>> >> > > > > > > > > > > > >>>>>>>>>>>>>>>> >> > > > > > > > > > > > >>>>>>>>>>>>>>> >> > > > > > > > > > > > >>>>>>>>>>>>>>>> Thanks for sharing the KIP! Few >> > thoughts> >> > > > > > > > > > > > >>>>>>>>>>>>>>> >> > > > > > > > > > > > >>>>>>>>>>>>>>>> >> > > > > > > > > > > > >>>>>>>>>>>>>>> >> > > > > > > > > > > > >>>>>>>>>>>>>>>> - Can we link the JIRA, discussion >> > thread >> > > > > also >> > > > > > > to the >> > > > > > > > > > > > >> KIP> >> > > > > > > > > > > > >>>>>>>>>>>>>>> >> > > > > > > > > > > > >>>>>>>>>>>>>>>> - Based on the discussion on >> > KAFKA-6144, I >> > > > > was >> > > > > > > under >> > > > > > > > > > the >> > > > > > > > > > > > >>>>>>>>>> impression >> > > > > > > > > > > > >>>>>>>>>>>>>>> that> >> > > > > > > > > > > > >>>>>>>>>>>>>>> >> > > > > > > > > > > > >>>>>>>>>>>>>>>> this KIP is also going to cover >> > exposing >> > > > of >> > > > > the >> > > > > > > > > > standby >> > > > > > > > > > > > >>>>>>>>>>> information in> >> > > > > > > > > > > > >>>>>>>>>>>>>>> >> > > > > > > > > > > > >>>>>>>>>>>>>>>> StreamsMetadata and thus subsume >> > > > KAFKA-8994 >> > > > > . 
That would require a public API change?
- KIP seems to be focussing on restoration when a new node is added. KIP-441 is underway and has some major changes proposed for this. It would be good to clarify dependencies if any. Without KIP-441, I am not very sure if we should allow reads from nodes in RESTORING state, which could amount to many minutes/few hours of stale reads? This is different from allowing querying standby replicas, which could be mostly caught up and the staleness window could be much smaller/tolerable. (once again the focus on KAFKA-8994)
- Finally, we may need to introduce a configuration to control this. Some users may prefer errors to stale data.
Can >> > > > > we >> > > > > > > also add >> > > > > > > > > > > > >> it >> > > > > > > > > > > > >>>> to the >> > > > > > > > > > > > >>>>>>>>>>> KIP?> >> > > > > > > > > > > > >>>>>>>>>>>>>>> >> > > > > > > > > > > > >>>>>>>>>>>>>>>> >> > > > > > > > > > > > >>>>>>>>>>>>>>> >> > > > > > > > > > > > >>>>>>>>>>>>>>>> Thanks> >> > > > > > > > > > > > >>>>>>>>>>>>>>> >> > > > > > > > > > > > >>>>>>>>>>>>>>>> Vinoth> >> > > > > > > > > > > > >>>>>>>>>>>>>>> >> > > > > > > > > > > > >>>>>>>>>>>>>>>> >> > > > > > > > > > > > >>>>>>>>>>>>>>> >> > > > > > > > > > > > >>>>>>>>>>>>>>>> >> > > > > > > > > > > > >>>>>>>>>>>>>>> >> > > > > > > > > > > > >>>>>>>>>>>>>>>> >> > > > > > > > > > > > >>>>>>>>>>>>>>> >> > > > > > > > > > > > >>>>>>>>>>>>>>>> >> > > > > > > > > > > > >>>>>>>>>>>>>>> >> > > > > > > > > > > > >>>>>>>>>>>>>>>> On Sun, Oct 13, 2019 at 3:31 PM >> > Navinder >> > > > > Brar> >> > > > > > > > > > > > >>>>>>>>>>>>>>> >> > > > > > > > > > > > >>>>>>>>>>>>>>>> <na...@yahoo.com.invalid>wrote:> >> > > > > > > > > > > > >>>>>>>>>>>>>>> >> > > > > > > > > > > > >>>>>>>>>>>>>>>> >> > > > > > > > > > > > >>>>>>>>>>>>>>> >> > > > > > > > > > > > >>>>>>>>>>>>>>>>> Hi,> >> > > > > > > > > > > > >>>>>>>>>>>>>>> >> > > > > > > > > > > > >>>>>>>>>>>>>>>>> Starting a discussion on the KIP >> to >> > Allow >> > > > > state >> > > > > > > > > > stores >> > > > > > > > > > > > >> to >> > > > > > > > > > > > >>>> serve >> > > > > > > > > > > > >>>>>>>>>>>>> stale> >> > > > > > > > > > > > >>>>>>>>>>>>>>> >> > > > > > > > > > > > >>>>>>>>>>>>>>>>> reads during rebalance(> >> > > > > > > > > > > > >>>>>>>>>>>>>>> >> > > > > > > > > > > > >>>>>>>>>>>>>>>>> >> > > > > > > > > > > > >>>>>>>>>>>>>>> >> > > > > > > > > > > > >>>>>>>>>>>>> >> > > > > > > > > > > > >>>>>>>>>>> >> > > > > > > > > > > > >>>>>>>>>> >> > > > > > > > > > > > >>>>>> >> > > > > > > > > > > > >>>> >> > > > > > > > > > > > >>> >> > > > > > > > > > > > >> >> > > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > >> > > > > > > >> > > > > >> > > > >> > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance >> > > > > > > > > > > > >>>>>>>>>>>>>>>> >> > > > > > > > > > > > >>>>>>>>>>>>>>> >> > > > > > > > > > > > >>>>>>>>>>>>>>>>> ).> >> > > > > > > > > > > > >>>>>>>>>>>>>>> >> > > > > > > > > > > > >>>>>>>>>>>>>>>>> Thanks & Regards,Navinder> >> > > > > > > > > > > > >>>>>>>>>>>>>>> >> > > > > > > > > > > > >>>>>>>>>>>>>>>>> LinkedIn> >> > > > > > > > > > > > >>>>>>>>>>>>>>> >> > > > > > > > > > > > >>>>>>>>>>>>>>>>>> >> > > > > > > > > > > > >>>>>>>>>>>>>>>> >> > > > > > > > > > > > >>>>>>>>>>>>>> >> > > > > > > > > > > > >>>>>>>>>>>>>> >> > > > > > > > > > > > >>>>>>>>>>>>> >> > > > > > > > > > > > >>>>>>>>>>> >> > > > > > > > > > > > >>>>>>>>>> >> > > > > > > > > > > > >>>>>>>>>> >> > > > > > > > > > > > >>>>>>>>>> -- >> > > > > > > > > > > > >>>>>>>>>> -- Guozhang >> > > > > > > > > > > > >>>>>>>>>> >> > > > > > > > > > > > >>>>>>>>> >> > > > > > > > > > > > >>>>>>>> >> > > > > > > > > > > > >>>>>>> >> > > > > > > > > > > > >>>>>> >> > > > > > > > > > > > >>>> >> > > > > > > > > > > > >>>> >> > > > > > > > > > > > >>> >> > > > > > > > > > > > >> >> > > > > > > > > > > > >> >> > > > > > > > > > > > >> -- >> > > > > > > > > > > > >> -- Guozhang >> > > > > > > > > > > > >> >> > > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > >> > > > > >> > > > >> > >> > >> >