I agree that we might want to drop the time-based lag for the initial implementation. There is no good way to get this information without a broker-side change.
(100) For the offset lag information, I don't see a reason why the app should drive when this information is updated, because KS will update it anyway (once per `commit.interval.ms` -- and updating it more frequently does not make sense, as it most likely won't change more frequently anyway). If you all insist that the app should drive it, I can live with it, but I think it makes the API unnecessarily complex without a benefit.

(101) I still don't understand why we need to have `KeyQueryMetadata` though. Note that an instance can only report lag for its local stores, but not for remote stores, as it does not know to what offset a remote standby has caught up.

> Because we needed to return the topicPartition the key belongs to, in
> order to correlate with the lag information from the other set of APIs.

My suggestion is to get the lag information from `StreamsMetadata` -- which partition the store belongs to can be completely encapsulated within KS, as all information is local, and I don't think we need to expose it to the user at all. We can just add `long StreamsMetadata#offsetLag(store, key)`. If the store is local we return its lag; if it's remote we return `-1` (ie, UNKNOWN). As an alternative, we can change the return type to `Optional<Long>`. This works for active and standby tasks alike. Note that a user must verify if `StreamsMetadata` is for itself (local) or remote anyway. We only need to provide a way that allows users to distinguish between active and standby. (More below.)

I am actually not even sure why we added `StreamsMetadata#topicPartitions()` originally -- it seems pretty useless. Can we deprecate it as a side cleanup in this KIP? Or do I miss something?

(102)

> There are basically 2 reasons. One is that instead of having two functions,
> one to get StreamsMetadata for active and one for replicas. We are fetching
> both in a single call and we have a way to get only active or only replicas
> from the KeyQueryMetadata object (just like the isStandby() and isActive()
> discussion we had earlier)

I understand that we need two methods. However, I think we can simplify the API and not introduce `KeyQueryMetadata`, but just "duplicate" all 4 existing methods for standby tasks:

// note that `standbyMetadataForKey` returns a Collection in contrast to existing `metadataForKey`
Collection<StreamsMetadata> allStandbyMetadata()
Collection<StreamsMetadata> allStandbyMetadataForStore(String storeName)
Collection<StreamsMetadata> standbyMetadataForKey(String storeName, K key, Serializer<K> keySerializer)
Collection<StreamsMetadata> standbyMetadataForKey(String storeName, K key, StreamPartitioner<? super K, ?> partitioner)

Because the existing methods already return all active metadata, there is no reason to return `KeyQueryMetadata`; it only makes it more complicated to get the standby metadata. With `KeyQueryMetadata` the user needs to make more calls to get the metadata:

KafkaStreams#allMetadataForKey()#getActive()
KafkaStreams#allMetadataForKey()#getStandby()

vs:

KafkaStreams#metadataForKey()
KafkaStreams#standbyMetadataForKey()

The wrapping of both within `KeyQueryMetadata` does not seem to provide any benefit but increases our public API surface.

@Guozhang: (1.1. + 1.2.) From my understanding, `allMetadata()` (and the other existing methods) will only return the metadata of _active_ tasks for backward compatibility reasons. If we returned standby metadata, existing code could potentially "break" because the code might pick a standby to query a key without noticing.

-Matthias

On 11/8/19 6:07 AM, Navinder Brar wrote:
> Thanks, Guozhang for going through it again.
>
> - 1.1 & 1.2: The main point of adding topicPartition in KeyQueryMetadata
> is not topicName, but the partition number.
> I agree changelog topicNames and store names will have a 1-1 mapping, but
> we also need the partition number of the changelog for which we are
> calculating the lag. Now, we can add the partition number in
> StreamsMetadata, but it would be orthogonal to the definition of
> StreamsMetadata, i.e. "Represents the state of an instance (process) in a
> {@link KafkaStreams} application." If we add the partition number to this,
> it no longer stays metadata for an instance, because now it is storing the
> partition information for a key being queried. So, having "KeyQueryMetadata"
> simplifies this, as it now contains all the metadata and also the changelog
> and partition information for which we need to calculate the lag.
>
> Another way is having another function in parallel to metadataForKey, which
> returns the partition number for the key being queried. But then we would
> need 2 calls to StreamsMetadataState, once to fetch metadata and another to
> fetch the partition number. Let me know if either of these two ways seems
> more intuitive than KeyQueryMetadata, then we can try to converge on one.
> - 1.3: Again, it is required for the partition number. We can drop the
> store name though.
> - 2.1: I think this was done in accordance with the opinion from John, as
> time lag would be better implemented with a broker-level change and offset
> lag is readily implementable. @vinoth?
> - 2.2.1: Good point. +1
> - 2.2.2: I am not well aware of it, @vinoth any comments?
> - 3.1: I think we have already agreed on dropping this; we need to update
> the KIP. Also, is there any opinion on lagInfoForStore(String storeName) vs
> lagInfoForStore(String storeName, int partition)?
> - 3.2: But in functions such as onAssignment(), onPartitionsAssigned(),
> for standbyTasks also the topicPartitions we use are input topic partitions
> and not changelog partitions. Would this break from those semantics?
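[Editor's note] To make the two alternatives being debated above concrete, here is a minimal sketch with stand-in types (the real Kafka Streams classes are not needed to illustrate the shape, and none of these names are final API): Option A bundles everything key-related into one `KeyQueryMetadata` object; Option B keeps separate active/standby lookup methods and hides the partition inside Streams.

```java
import java.util.Collection;

public class KeyQueryMetadataSketch {
    // Stand-ins for the real Streams types, just to keep the sketch self-contained.
    record HostInfo(String host, int port) {}
    record StreamsMetadata(HostInfo hostInfo) {}
    record TopicPartition(String topic, int partition) {}

    // Option A (Navinder/Vinoth): one object per key, so a single call yields the
    // active host, the standby hosts, and the changelog partition needed to
    // correlate with the lag APIs.
    record KeyQueryMetadata(StreamsMetadata activeMetadata,
                            Collection<StreamsMetadata> standbyMetadata,
                            TopicPartition topicPartition) {}

    // Option B (Matthias): no wrapper class; "duplicate" the existing lookups for
    // standbys and keep the key-to-partition mapping internal to Kafka Streams.
    interface StreamsLookups {
        StreamsMetadata metadataForKey(String storeName, byte[] key);
        Collection<StreamsMetadata> standbyMetadataForKey(String storeName, byte[] key);
    }
}
```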
> > > On Thursday, 7 November, 2019, 11:33:19 pm IST, Guozhang Wang > <wangg...@gmail.com> wrote: > > Hi Navinder, Vinoth, thanks for the updated KIP! > > Read through the discussions so far and made another pass on the wiki page, > and here are some more comments: > > 1. About the public APIs: > > 1.1. It is not clear to me how allStandbyMetadataForStore > and allStandbyMetadata would be differentiated from the original APIs given > that we will augment StreamsMetadata to include both active and standby > topic-partitions and store names, so I think we can still use allMetadata > and allMetadataForStore to get the collection of instance metadata that > host the store both as active and standbys. Are there any specific use > cases where we ONLY want to get the standby's metadata? And even if there > are, we can easily filter it out from the allMetadata / allMetadataForStore > right? > > 1.2. Similarly I'm wondering for allMetadataForKey, can we return the same > type: "Collection<StreamsMetadata>" which includes 1 for active, and N-1 > for standbys, and callers can easily identify them by looking inside the > StreamsMetadata objects? In addition I feel the "topicPartition" field > inside "KeyQueryMetadata" is not very important since the changelog > topic-name is always 1-1 mapping to the store name, so as long as the store > name matches, the changelog topic name should always match (i.e. in > the pseudo code, just checking store names should be sufficient). If all of > the above assumption is true, I think we can save us from introducing one > more public class here. > > 1.3. Similarly in StoreLagInfo, seems not necessary to include the topic > partition name in addition to the store name. > > 2. About querying store lags: we've discussed about separating the querying > of the lag information and the querying of the host information so I still > support having separate APIs here. More thoughts: > > 2.1. 
Compared with offsets, I'm wondering would time-difference be more > intuitive for users to define the acceptable "staleness"? More strictly, > are there any scenarios where we would actually prefer offsets over > timestamps except that the timestamps are not available? > > 2.2. I'm also a bit leaning towards not putting the burden of periodically > refreshing our lag and caching it (and introducing another config) on the > streams side but document clearly its cost and let users to consider its > call frequency; of course in terms of implementation there are some > optimizations we can consider: > > 1) for restoring active tasks, the log-end-offset is read once since it is > not expected to change, and that offset / timestamp can be remembered for > lag calculation and we do not need to refresh again; > 2) for standby tasks, there's a "Map<TopicPartition, Long> > endOffsets(Collection<TopicPartition> partitions)" in KafkaConsumer to > batch a list of topic-partitions in one round-trip, and we can use that to > let our APIs be sth. like "lagInfoForStores(Collection<String> storeNames)" > to enable the batching effects. > > 3. Misc.: > > 3.1 There's a typo on the pseudo code "globalLagInforation". Also it seems > not describing how that information is collected (personally I also feel > one "lagInfoForStores" is sufficient). > 3.2 Note there's a slight semantical difference between active and > standby's "partitions" inside StreamsMetadata, for active tasks the > partitions are actually input topic partitions for the task: some of them > may also act as changelog topics but these are exceptional cases; for > standby tasks the "standbyTopicPartitions" are actually the changelog > topics of the task. So maybe renaming it to "standbyChangelogPartitions" to > differentiate it? > > > Overall I think this would be a really good KIP to add to Streams, thank > you so much! 
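[Editor's note] The batching idea in 2.2 can be sketched as follows: one `KafkaConsumer#endOffsets(partitions)` round trip yields the end offsets for all changelog partitions at once, and the offset lag is then just the difference from the locally tracked positions. The types and method names below are stand-ins, not the proposed API.

```java
import java.util.HashMap;
import java.util.Map;

public class BatchedLagSketch {
    // Stand-in for org.apache.kafka.common.TopicPartition.
    record TopicPartition(String topic, int partition) {}

    // endOffsets would come from a single KafkaConsumer#endOffsets(partitions)
    // call (one broker round trip for the whole batch); positions are the
    // offsets each standby/restoring task has applied locally.
    static Map<TopicPartition, Long> offsetLags(Map<TopicPartition, Long> endOffsets,
                                                Map<TopicPartition, Long> positions) {
        Map<TopicPartition, Long> lags = new HashMap<>();
        for (Map.Entry<TopicPartition, Long> e : endOffsets.entrySet()) {
            long position = positions.getOrDefault(e.getKey(), 0L);
            // Clamp at zero so a position momentarily ahead of a stale
            // end-offset snapshot never reports a negative lag.
            lags.put(e.getKey(), Math.max(0L, e.getValue() - position));
        }
        return lags;
    }
}
```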
> Guozhang
>
> On Wed, Nov 6, 2019 at 8:47 PM Navinder Brar
> <navinder_b...@yahoo.com.invalid> wrote:
>
>> +1 on implementing offset-based lag for now and pushing time-based lag to
>> a later point in time when broker changes are done. Although time-based
>> lag enhances the readability, it would not be a make-or-break change for
>> implementing this KIP.
>>
>> Vinoth has explained the role of KeyQueryMetadata; let me add in my 2
>> cents as well.
>> - There are basically 2 reasons. One is that instead of having two
>> functions, one to get StreamsMetadata for active and one for replicas, we
>> are fetching both in a single call, and we have a way to get only active
>> or only replicas from the KeyQueryMetadata object (just like the
>> isStandby() and isActive() discussion we had earlier).
>> - Since even after fetching the metadata we have a requirement of
>> fetching the topicPartition for which the query came -- to fetch lag for
>> that specific topicPartition -- instead of having another call to fetch
>> the partition from StreamsMetadataState, we thought using one single call
>> and fetching the partition and all metadata would be better.
>> - Another option was to change the StreamsMetadata object and add the
>> topicPartition for which the query came, but that doesn't make sense in
>> terms of semantics, as it is StreamsMetadata. Also, KeyQueryMetadata
>> represents all the metadata for the key being queried, i.e. the partition
>> it belongs to and the list of StreamsMetadata (hosts), active or replica,
>> where the key could be found.
>>
>> On Thursday, 7 November, 2019, 01:53:36 am IST, Vinoth Chandar <
>> vchan...@confluent.io> wrote:
>>
>> +1 to John's suggestion on Duration/Instant and dropping the API to fetch
>> all stores' lags. However, I do think we need to return lags per topic
>> partition, so I'm not sure a single return value would work? We need some
>> new class that holds a TopicPartition and Duration/Instant variables
>> together?
>>
>> 10) Because we needed to return the topicPartition the key belongs to, in
>> order to correlate with the lag information from the other set of APIs.
>> Otherwise, we don't know which topic partition's lag estimate to use. We
>> tried to illustrate this in the example code. StreamsMetadata is simply
>> capturing the state of a streams host/instance, whereas the TopicPartition
>> depends on the key passed in. This is a side effect of our decision to
>> decouple lag-based filtering from the metadata APIs.
>>
>> 20) Goes back to the previous point. We needed to return information that
>> is key-specific, at which point it seemed natural for KeyQueryMetadata to
>> contain the active, standbys, and topic partition for that key. If we
>> merely returned a standbyMetadataForKey() -> Collection<StreamsMetadata>
>> standby, an active metadataForKey() -> StreamsMetadata, and a new
>> getTopicPartition(key) -> topicPartition object back to the caller, then
>> arguably you could do the same kind of correlation. IMO having the
>> KeyQueryMetadata class to encapsulate all this is a friendlier API.
>> allStandbyMetadata() and allStandbyMetadataForStore() are just
>> counterparts for metadataForStore() and allMetadata() that we introduce
>> mostly for consistent API semantics. (Their presence implicitly could help
>> denote that metadataForStore() is for active instances. Happy to drop them
>> if their utility is not clear.)
>>
>> 30) This would assume we refresh all the standby lag information every
>> time we query the StreamsMetadata for a specific store? For time-based
>> lag, this would involve fetching the tail Kafka record at once from
>> multiple Kafka topic partitions? I would prefer not to couple them like
>> this and to keep the ability to make granular store- (or even topic
>> partition-) level fetches for lag information.
>>
>> 32) I actually prefer John's suggestion to let the application drive the
>> lag fetches/updates and not have flags as the KIP currently points to.
Are >> you reexamining that position? >> >> On fetching lag information, +1 we could do this much more efficiently with >> a broker changes. Given I don't yet have a burning need for the time based >> lag, I think we can sequence the APIs such that the offset based ones are >> implemented first, while we have a broker side change? >> Given we decoupled the offset and time based lag API, I am willing to drop >> the time based lag functionality (since its not needed right away for my >> use-case). @navinder . thoughts? >> >> >> On Tue, Nov 5, 2019 at 11:10 PM Matthias J. Sax <matth...@confluent.io> >> wrote: >> >>> Navinder, >>> >>> thanks for updating the KIP. Couple of follow up questions: >>> >>> >>> (10) Why do we need to introduce the class `KeyQueryMetadata`? >>> >>> (20) Why do we introduce the two methods `allMetadataForKey()`? Would it >>> not be simpler to add `Collection<StreamMetatdata> >>> standbyMetadataForKey(...)`. This would align with new methods >>> `#allStandbyMetadata()` and `#allStandbyMetadataForStore()`? >>> >>> (30) Why do we need the class `StoreLagInfo` -- it seems simpler to just >>> extend `StreamMetadata` with the corresponding attributes and methods >>> (of active task, the lag would always be reported as zero) >>> >>> (32) Via (30) we can avoid the two new methods `#allLagInfo()` and >>> `#lagInfoForStore()`, too, reducing public API and making it simpler to >>> use the feature. >>> >>> Btw: If we make `StreamMetadata` thread safe, the lag information can be >>> updated in the background without the need that the application >>> refreshes its metadata. Hence, the user can get active and/or standby >>> metadata once, and only needs to refresh it, if a rebalance happened. >>> >>> >>> About point (4) of the previous thread: I was also thinking about >>> when/how to update the time-lag information, and I agree that we should >>> not update it for each query. 
>>> >>> "How": That we need to fetch the last record is a little bit >>> unfortunate, but I don't see any other way without a broker change. One >>> issue I still see is with "exactly-once" -- if transaction markers are >>> in the topic, the last message is not at offset "endOffset - 1" and as >>> multiple transaction markers might be after each other, it's unclear how >>> to identify the offset of the last record... Thoughts? >>> >>> Hence, it might be worth to look into a broker change as a potential >>> future improvement. It might be possible that the broker caches the >>> latest timestamp per partition to serve this data efficiently, similar >>> to `#endOffset()`. >>> >>> "When": We refresh the end-offset information based on the >>> `commit.interval.ms` -- doing it more often is not really useful, as >>> state store caches will most likely buffer up all writes to changelogs >>> anyway and are only flushed on commit (including a flush of the >>> producer). Hence, I would suggest to update the time-lag information >>> based on the same strategy in the background. This way there is no >>> additional config or methods and the user does not need to worry about >>> it at all. >>> >>> To avoid refresh overhead if we don't need it (a user might not use IQ >>> to begin with), it might be worth to maintain an internal flag >>> `updateTimeLagEnabled` that is set to `false` initially and only set to >>> `true` on the first call of a user to get standby-metadata. >>> >>> >>> -Matthias >>> >>> >>> >>> On 11/4/19 5:13 PM, Vinoth Chandar wrote: >>>>>> I'm having some trouble wrapping my head around what race conditions >>>> might occur, other than the fundamentally broken state in which >> different >>>> instances are running totally different topologies. >>>> 3. @both Without the topic partitions that the tasks can map back to, >> we >>>> have to rely on topology/cluster metadata in each Streams instance to >> map >>>> the task back. 
If the source topics are wildcarded, for e.g., then each
>>>> instance could have different source topics in its topology, until the
>>>> next rebalance happens. You can also read my comments from here:
>>>>
>>>> https://issues.apache.org/jira/browse/KAFKA-7149?focusedCommentId=16904106&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16904106
>>>>
>>>>>> seems hard to imagine how encoding arbitrarily long topic names plus
>>>>>> an integer for the partition number could be as efficient as task ids,
>>>>>> which are just two integers.
>>>> 3. If you still have concerns about the efficacy of dictionary encoding,
>>>> happy to engage. The link above also has some benchmark code I used.
>>>> Theoretically, we would send each topic name at least once, so yes, if
>>>> you compare a 10-20 character topic name + an integer to two integers,
>>>> it will be more bytes. But it's a constant overhead proportional to the
>>>> size of the topic name, and with 4, 8, 12 partitions the size difference
>>>> between the baseline (version 4, where we just repeated topic names for
>>>> each topic partition) and the two approaches becomes narrow.
>>>>
>>>>>> Plus, Navinder is going to implement a bunch of protocol code that we
>>>>>> might just want to change when the discussion actually does take
>>>>>> place, if ever.
>>>>>> it'll just be a mental burden for everyone to remember that we want
>>>>>> to have this follow-up discussion.
>>>> 3. Isn't people changing the same parts of code and tracking follow-ups
>>>> a common thing we need to deal with anyway? For this KIP, isn't it
>>>> enough to reason about whether the additional map on top of the topic
>>>> dictionary would incur more overhead than sending task_ids? I don't
>>>> think that's the case; both of them send two integers. As I see it, we
>>>> can do a separate follow-up to (re)pursue the task_id conversion and get
>>>> it working for both maps within the next release?
>>>> >>>>>> Can you elaborate on "breaking up the API"? It looks like there are >>>> already separate API calls in the proposal, one for time-lag, and >> another >>>> for offset-lag, so are they not already broken up? >>>> The current APIs (e.g lagInfoForStore) for lags return StoreLagInfo >>> objects >>>> which has both time and offset lags. If we had separate APIs, say (e.g >>>> offsetLagForStore(), timeLagForStore()), we can implement offset >> version >>>> using the offset lag that the streams instance already tracks i.e no >> need >>>> for external calls. The time based lag API would incur the kafka read >> for >>>> the timestamp. makes sense? >>>> >>>> Based on the discussions so far, I only see these two pending issues to >>> be >>>> aligned on. Is there any other open item people want to bring up? >>>> >>>> On Mon, Nov 4, 2019 at 11:24 AM Sophie Blee-Goldman < >> sop...@confluent.io >>>> >>>> wrote: >>>> >>>>> Regarding 3) I'm wondering, does your concern still apply even now >>>>> that the pluggable PartitionGrouper interface has been deprecated? >>>>> Now that we can be sure that the DefaultPartitionGrouper is used to >>>>> generate >>>>> the taskId -> partitions mapping, we should be able to convert any >>> taskId >>>>> to any >>>>> partitions. >>>>> >>>>> On Mon, Nov 4, 2019 at 11:17 AM John Roesler <j...@confluent.io> >> wrote: >>>>> >>>>>> Hey Vinoth, thanks for the reply! >>>>>> >>>>>> 3. >>>>>> I get that it's not the main focus of this KIP, but if it's ok, it >>>>>> would be nice to hash out this point right now. It only came up >>>>>> because this KIP-535 is substantially extending the pattern in >>>>>> question. If we push it off until later, then the reviewers are going >>>>>> to have to suspend their concerns not just while voting for the KIP, >>>>>> but also while reviewing the code. 
Plus, Navinder is going to >>>>>> implement a bunch of protocol code that we might just want to change >>>>>> when the discussion actually does take place, if ever. Finally, it'll >>>>>> just be a mental burden for everyone to remember that we want to have >>>>>> this follow-up discussion. >>>>>> >>>>>> It makes sense what you say... the specific assignment is already >>>>>> encoded in the "main" portion of the assignment, not in the >> "userdata" >>>>>> part. It also makes sense that it's simpler to reason about races if >>>>>> you simply get all the information about the topics and partitions >>>>>> directly from the assignor, rather than get the partition number from >>>>>> the assignor and the topic name from your own a priori knowledge of >>>>>> the topology. On the other hand, I'm having some trouble wrapping my >>>>>> head around what race conditions might occur, other than the >>>>>> fundamentally broken state in which different instances are running >>>>>> totally different topologies. Sorry, but can you remind us of the >>>>>> specific condition? >>>>>> >>>>>> To the efficiency counterargument, it seems hard to imagine how >>>>>> encoding arbitrarily long topic names plus an integer for the >>>>>> partition number could be as efficient as task ids, which are just >> two >>>>>> integers. It seems like this would only be true if topic names were 4 >>>>>> characters or less. >>>>>> >>>>>> 4. >>>>>> Yeah, clearly, it would not be a good idea to query the metadata >>>>>> before every single IQ query. I think there are plenty of established >>>>>> patterns for distributed database clients to follow. Can you >> elaborate >>>>>> on "breaking up the API"? It looks like there are already separate >> API >>>>>> calls in the proposal, one for time-lag, and another for offset-lag, >>>>>> so are they not already broken up? 
FWIW, yes, I agree, the offset lag
>>>>>> is already locally known, so we don't need to build in an extra
>>>>>> synchronous broker API call, just one for the time-lag.
>>>>>>
>>>>>> Thanks again for the discussion,
>>>>>> -John
>>>>>>
>>>>>> On Mon, Nov 4, 2019 at 11:17 AM Vinoth Chandar <vchan...@confluent.io>
>>>>>> wrote:
>>>>>>>
>>>>>>> 3. Right now, we still get the topic partitions assigned as a part
>>>>>>> of the top-level Assignment object (the one that wraps
>>>>>>> AssignmentInfo) and use that to convert taskIds back. This list only
>>>>>>> contains assignments for that particular instance. Attempting to
>>>>>>> also reverse-map "all" the taskIds in the streams cluster, i.e. all
>>>>>>> the topic partitions in these global assignment maps, was what was
>>>>>>> problematic. By explicitly sending the global assignment maps as
>>>>>>> actual topic partitions, the group coordinator (i.e. the leader that
>>>>>>> computes the assignments) is able to consistently enforce its view
>>>>>>> of the topic metadata. I still don't think such a change, which
>>>>>>> forces you to reconsider semantics, is needed just to save bits on
>>>>>>> the wire. Maybe we can discuss this separately from this KIP?
>>>>>>>
>>>>>>> 4. There needs to be some caching/interval somewhere though, since
>>>>>>> we don't want to potentially make 1 Kafka read per 1 IQ. But I think
>>>>>>> it's a valid suggestion to make this call just synchronous and leave
>>>>>>> the caching, or how often you want to call it, to the application.
>>>>>>> Would it be good to then break up the APIs for time- and
>>>>>>> offset-based lag? We can obtain offset-based lag for free, and only
>>>>>>> incur the overhead of reading Kafka if we want time-based lags?
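[Editor's note] The split discussed above can be sketched like this (all names hypothetical): the offset lag is computed from state the instance already tracks, while the time lag sits behind a separate, lazily evaluated call, so the changelog read is only paid when it is actually requested.

```java
import java.time.Duration;
import java.util.function.Supplier;

public class SplitLagSketch {
    private final long localPosition;  // offset the task has applied locally
    private final long endOffset;      // changelog end offset, already tracked
    private final Supplier<Duration> tailRecordAge; // lazy: reads Kafka only when invoked

    public SplitLagSketch(long localPosition, long endOffset, Supplier<Duration> tailRecordAge) {
        this.localPosition = localPosition;
        this.endOffset = endOffset;
        this.tailRecordAge = tailRecordAge;
    }

    // "Free": derived from locally tracked offsets, no broker call needed.
    public long offsetLag() {
        return Math.max(0L, endOffset - localPosition);
    }

    // Costly: incurs reading the changelog tail record's timestamp.
    public Duration timeLag() {
        return tailRecordAge.get();
    }
}
```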
>>>>>>> >>>>>>> On Fri, Nov 1, 2019 at 2:49 PM Sophie Blee-Goldman < >>>>> sop...@confluent.io> >>>>>>> wrote: >>>>>>> >>>>>>>> Adding on to John's response to 3), can you clarify when and why >>>>>> exactly we >>>>>>>> cannot >>>>>>>> convert between taskIds and partitions? If that's really the case I >>>>>> don't >>>>>>>> feel confident >>>>>>>> that the StreamsPartitionAssignor is not full of bugs... >>>>>>>> >>>>>>>> It seems like it currently just encodes a list of all partitions >> (the >>>>>>>> assignment) and also >>>>>>>> a list of the corresponding task ids, duplicated to ensure each >>>>>> partition >>>>>>>> has the corresponding >>>>>>>> taskId at the same offset into the list. Why is that problematic? >>>>>>>> >>>>>>>> >>>>>>>> On Fri, Nov 1, 2019 at 12:39 PM John Roesler <j...@confluent.io> >>>>>> wrote: >>>>>>>> >>>>>>>>> Thanks, all, for considering the points! >>>>>>>>> >>>>>>>>> 3. Interesting. I have a vague recollection of that... Still, >>>>> though, >>>>>>>>> it seems a little fishy. After all, we return the assignments >>>>>>>>> themselves as task ids, and the members have to map these to topic >>>>>>>>> partitions in order to configure themselves properly. If it's too >>>>>>>>> complicated to get this right, then how do we know that Streams is >>>>>>>>> computing the correct partitions at all? >>>>>>>>> >>>>>>>>> 4. How about just checking the log-end timestamp when you call the >>>>>>>>> method? Then, when you get an answer, it's as fresh as it could >>>>>>>>> possibly be. And as a user you have just one, obvious, "knob" to >>>>>>>>> configure how much overhead you want to devote to checking... If >>>>> you >>>>>>>>> want to call the broker API less frequently, you just call the >>>>>> Streams >>>>>>>>> API less frequently. 
And you don't have to worry about the >>>>>>>>> relationship between your invocations of that method and the >> config >>>>>>>>> setting (e.g., you'll never get a negative number, which you could >>>>> if >>>>>>>>> you check the log-end timestamp less frequently than you check the >>>>>>>>> lag). >>>>>>>>> >>>>>>>>> Thanks, >>>>>>>>> -John >>>>>>>>> >>>>>>>>> On Thu, Oct 31, 2019 at 11:52 PM Navinder Brar >>>>>>>>> <navinder_b...@yahoo.com.invalid> wrote: >>>>>>>>>> >>>>>>>>>> Thanks John for going through this. >>>>>>>>>> >>>>>>>>>> - +1, makes sense >>>>>>>>>> - +1, no issues there >>>>>>>>>> - Yeah the initial patch I had submitted for K-7149( >>>>>>>>> https://github.com/apache/kafka/pull/6935) to reduce >>>>> assignmentInfo >>>>>>>>> object had taskIds but the merged PR had similar size according to >>>>>> Vinoth >>>>>>>>> and it was simpler so if the end result is of same size, it would >>>>> not >>>>>>>> make >>>>>>>>> sense to pivot from dictionary and again move to taskIDs. >>>>>>>>>> - Not sure about what a good default would be if we don't >>>>> have a >>>>>>>>> configurable setting. This gives the users the flexibility to the >>>>>> users >>>>>>>> to >>>>>>>>> serve their requirements as at the end of the day it would take >> CPU >>>>>>>> cycles. >>>>>>>>> I am ok with starting it with a default and see how it goes based >>>>>> upon >>>>>>>>> feedback. >>>>>>>>>> >>>>>>>>>> Thanks, >>>>>>>>>> Navinder >>>>>>>>>> On Friday, 1 November, 2019, 03:46:42 am IST, Vinoth Chandar >>>>> < >>>>>>>>> vchan...@confluent.io> wrote: >>>>>>>>>> >>>>>>>>>> 1. Was trying to spell them out separately. but makes sense for >>>>>>>>>> readability. done >>>>>>>>>> >>>>>>>>>> 2. No I immediately agree :) .. makes sense. @navinder? >>>>>>>>>> >>>>>>>>>> 3. I actually attempted only sending taskIds while working on >>>>>>>> KAFKA-7149. >>>>>>>>>> Its non-trivial to handle edges cases resulting from newly added >>>>>> topic >>>>>>>>>> partitions and wildcarded topic entries. 
I ended up simplifying >>>>> it >>>>>> to >>>>>>>>> just >>>>>>>>>> dictionary encoding the topic names to reduce size. We can apply >>>>>> the >>>>>>>> same >>>>>>>>>> technique here for this map. Additionally, we could also >>>>> dictionary >>>>>>>>> encode >>>>>>>>>> HostInfo, given its now repeated twice. I think this would save >>>>>> more >>>>>>>>> space >>>>>>>>>> than having a flag per topic partition entry. Lmk if you are okay >>>>>> with >>>>>>>>>> this. >>>>>>>>>> >>>>>>>>>> 4. This opens up a good discussion. Given we support time lag >>>>>> estimates >>>>>>>>>> also, we need to read the tail record of the changelog >>>>> periodically >>>>>>>>> (unlike >>>>>>>>>> offset lag, which we can potentially piggyback on metadata in >>>>>>>>>> ConsumerRecord IIUC). we thought we should have a config that >>>>>> control >>>>>>>> how >>>>>>>>>> often this read happens? Let me know if there is a simple way to >>>>>> get >>>>>>>>>> timestamp value of the tail record that we are missing. >>>>>>>>>> >>>>>>>>>> On Thu, Oct 31, 2019 at 12:58 PM John Roesler <j...@confluent.io >>>>>> >>>>>>>> wrote: >>>>>>>>>> >>>>>>>>>>> Hey Navinder, >>>>>>>>>>> >>>>>>>>>>> Thanks for updating the KIP, it's a lot easier to see the >>>>> current >>>>>>>>>>> state of the proposal now. >>>>>>>>>>> >>>>>>>>>>> A few remarks: >>>>>>>>>>> 1. I'm sure it was just an artifact of revisions, but you have >>>>>> two >>>>>>>>>>> separate sections where you list additions to the KafkaStreams >>>>>>>>>>> interface. Can you consolidate those so we can see all the >>>>>> additions >>>>>>>>>>> at once? >>>>>>>>>>> >>>>>>>>>>> 2. For messageLagEstimate, can I suggest "offsetLagEstimate" >>>>>> instead, >>>>>>>>>>> to be clearer that we're specifically measuring a number of >>>>>> offsets? 
>>>>>>>>>>> If you don't immediately agree, then I'd at least point out >>>>> that >>>>>> we >>>>>>>>>>> usually refer to elements of Kafka topics as "records", not >>>>>>>>>>> "messages", so "recordLagEstimate" might be more appropriate. >>>>>>>>>>> >>>>>>>>>>> 3. The proposal mentions adding a map of the standby >>>>>> _partitions_ for >>>>>>>>>>> each host to AssignmentInfo. I assume this is designed to >>>>> mirror >>>>>> the >>>>>>>>>>> existing "partitionsByHost" map. To keep the size of these >>>>>> metadata >>>>>>>>>>> messages down, maybe we can consider making two changes: >>>>>>>>>>> (a) for both actives and standbys, encode the _task ids_ >>>>> instead >>>>>> of >>>>>>>>>>> _partitions_. Every member of the cluster has a copy of the >>>>>> topology, >>>>>>>>>>> so they can convert task ids into specific partitions on their >>>>>> own, >>>>>>>>>>> and task ids are only (usually) three characters. >>>>>>>>>>> (b) instead of encoding two maps (hostinfo -> actives AND >>>>>> hostinfo -> >>>>>>>>>>> standbys), which requires serializing all the hostinfos twice, >>>>>> maybe >>>>>>>>>>> we can pack them together in one map with a structured value >>>>>>>> (hostinfo >>>>>>>>>>> -> [actives,standbys]). >>>>>>>>>>> Both of these ideas still require bumping the protocol version >>>>>> to 6, >>>>>>>>>>> and they basically mean we drop the existing `PartitionsByHost` >>>>>> field >>>>>>>>>>> and add a new `TasksByHost` field with the structured value I >>>>>>>>>>> mentioned. >>>>>>>>>>> >>>>>>>>>>> 4. Can we avoid adding the new "lag refresh" config? The lags >>>>>> would >>>>>>>>>>> necessarily be approximate anyway, so adding the config seems >>>>> to >>>>>>>>>>> increase the operational complexity of the system for little >>>>>> actual >>>>>>>>>>> benefit. >>>>>>>>>>> >>>>>>>>>>> Thanks for the pseudocode, by the way, it really helps >>>>> visualize >>>>>> how >>>>>>>>>>> these new interfaces would play together. 
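[Editor's note] John's 3(a)/(b) could be sketched as below (stand-in types; the actual protocol field names are not fixed here): task ids replace partition lists, and one map with a structured value avoids serializing each HostInfo twice.

```java
import java.util.List;
import java.util.Map;

public class TasksByHostSketch {
    // Stand-ins for the internal Streams types.
    record TaskId(int topicGroupId, int partition) {}
    record HostInfo(String host, int port) {}

    // 3(b): one structured value per host, holding actives and standbys together.
    record HostState(List<TaskId> activeTasks, List<TaskId> standbyTasks) {}

    // 3(a)+(b): a single tasksByHost map -- task ids are two small ints each,
    // and each HostInfo is serialized only once.
    static Map<HostInfo, HostState> exampleAssignment() {
        return Map.of(
            new HostInfo("host1", 8080),
            new HostState(List.of(new TaskId(0, 0)),    // active task 0_0
                          List.of(new TaskId(1, 0)))); // standby task 1_0
    }
}
```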
And thanks again for the update!
-John

On Thu, Oct 31, 2019 at 2:41 PM John Roesler <j...@confluent.io> wrote:

Hey Vinoth,

I started going over the KIP again yesterday. There are a lot of updates, and I didn't finish my feedback in one day. I'm working on it now.

Thanks,
John

On Thu, Oct 31, 2019 at 11:42 AM Vinoth Chandar <vchan...@confluent.io> wrote:

Wondering if anyone has thoughts on these changes? I liked that the new metadata fetch APIs provide all the information at once with consistent naming.

Any guidance on what you would like to be discussed or fleshed out more before we call a VOTE?

On Wed, Oct 30, 2019 at 10:31 AM Navinder Brar <navinder_b...@yahoo.com.invalid> wrote:

Hi,
We have made some edits in the KIP (https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance) after due deliberation on the agreed design to support the new query design. This includes the new public API to query offset/time lag information, and other details related to querying standby tasks which have come up after thinking through the details.

- Addition of a new config, "lag.fetch.interval.ms", to configure the interval of time/offset lag fetches.
- Addition of a new class, StoreLagInfo, to store the periodically obtained time/offset lag.
- Addition of two new functions in KafkaStreams, List<StoreLagInfo> allLagInfo() and List<StoreLagInfo> lagInfoForStore(String storeName), to return the lag information for an instance and a store respectively.
- Addition of a new class, KeyQueryMetadata. We need the topicPartition for each key to be matched with the lag API for the topic partition. One way is to add new functions and fetch the topicPartition from StreamsMetadataState, but we thought having one call that fetches both StreamsMetadata and the topicPartition is cleaner.
- Renaming partitionsForHost to activePartitionsForHost in StreamsMetadataState, and partitionsByHostState to activePartitionsByHostState in StreamsPartitionAssignor.
- We have also added the pseudocode of how all the changes will exist together and support the new querying APIs.

Please let me know if anything is pending now, before a vote can be started on this.

On Saturday, 26 October, 2019, 05:41:44 pm IST, Navinder Brar <navinder_b...@yahoo.com.invalid> wrote:

> Since there are two soft votes for separate active/standby API methods, I also change my position on that. Fine with 2 separate methods. Once we remove the lag information from these APIs, returning a List is less attractive, since the ordering has no special meaning now.

Agreed, now that we are not returning lag, I am also sold on having two separate functions. We already have one which returns streamsMetadata for active tasks, and now we can add another one for standbys.

On Saturday, 26 October, 2019, 03:55:16 am IST, Vinoth Chandar <vchan...@confluent.io> wrote:

+1 to Sophie's suggestion. Having both lag in terms of time and offsets is good and makes for a more complete API.

Since there are two soft votes for separate active/standby API methods, I also change my position on that. Fine with 2 separate methods. Once we remove the lag information from these APIs, returning a List is less attractive, since the ordering has no special meaning now.

> Lag in offsets vs time: Having both, as suggested by Sophie, would of course be best. What is a little unclear to me is, how in detail are we going to compute both?

@navinder, maybe the next step is to flesh out these details and surface any larger changes we need to make, if need be.

Any other details we need to cover before a VOTE can be called on this?
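Putting Navinder's proposed additions and Matthias's "how do we compute both?" question together, a rough sketch of the lag container and the two estimates could look like the code below. StoreLagInfo is the class name proposed above, but the fields and the computation are my assumptions, not the final KIP API: offset lag as changelog end offset minus locally restored offset, and time lag as the tail changelog record's timestamp minus the timestamp of the last locally applied record (both refreshed periodically, per the lag.fetch.interval.ms idea).

```java
// Hedged sketch of the proposed StoreLagInfo container and how its two
// estimates might be computed. Names follow the discussion above but are
// not the actual Kafka Streams API.
public class StoreLagInfo {
    private final String storeName;
    private final long offsetLagEstimate;
    private final long timeLagEstimateMs;

    public StoreLagInfo(String storeName, long offsetLagEstimate, long timeLagEstimateMs) {
        this.storeName = storeName;
        this.offsetLagEstimate = offsetLagEstimate;
        this.timeLagEstimateMs = timeLagEstimateMs;
    }

    public String storeName() { return storeName; }
    public long offsetLagEstimate() { return offsetLagEstimate; }
    public long timeLagEstimateMs() { return timeLagEstimateMs; }

    // Both estimates clamp at zero: a fully caught-up store has no lag.
    public static long offsetLag(long changelogEndOffset, long restoredOffset) {
        return Math.max(0L, changelogEndOffset - restoredOffset);
    }

    public static long timeLagMs(long tailRecordTimestampMs, long lastAppliedTimestampMs) {
        return Math.max(0L, tailRecordTimestampMs - lastAppliedTimestampMs);
    }
}
```

KafkaStreams#allLagInfo() and #lagInfoForStore(String) would then simply return lists of this type for the instance's local stores.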
On Fri, Oct 25, 2019 at 1:51 PM Bill Bejeck <bbej...@gmail.com> wrote:

I am jumping in a little late here.

Overall, I agree with the proposal to push decision making on what/how to query into the query layer.

For point 5 from above, I'm slightly in favor of having a new method, "standbyMetadataForKey()" or something similar. Even if we return all tasks in one list, the user will still have to perform some filtering to separate the different tasks, so I don't feel making two calls is a burden, and IMHO it makes things more transparent for the user. If the final vote is for using an "isActive" field, I'm good with that as well.

Just my 2 cents.

On Fri, Oct 25, 2019 at 5:09 AM Navinder Brar <navinder_b...@yahoo.com.invalid> wrote:

I think now we are aligned on almost all the design parts. Summarising below what has been discussed above and what we have a general consensus on.

- Rather than broadcasting lag across all nodes at rebalancing/with the heartbeat, we will just return a list of all available standbys in the system, and the user can make an IQ query to any of those nodes, which will return the response along with the offset and time lag. Based on that, the user can decide whether to return the response back or call another standby.
- The current metadata query frequency will not change. It will be the same as it is now, i.e. before each query.
- For fetching List<StreamsMetadata> in StreamsMetadataState.java and List<QueryableStoreProvider> in StreamThreadStateStoreProvider.java (which will return all active stores which are running/restoring, and replica stores which are running), we will add new functions and not disturb the existing functions.
- There is no need to add a new StreamsConfig for implementing this KIP.
- We will add standbyPartitionsByHost in AssignmentInfo and StreamsMetadataState, which would change the existing rebuildMetadata() and setPartitionsByHostState().

If anyone has any more concerns, please feel free to add them. Post this, I will be initiating a vote.
~Navinder

On Friday, 25 October, 2019, 12:05:29 pm IST, Matthias J. Sax <matth...@confluent.io> wrote:

Just to close the loop @Vinoth:

> 1.
> IIUC John intends to add (or we can do this in this KIP) lag information to AssignmentInfo, which gets sent to every participant.

As explained by John, currently KIP-441 plans to only report the information to the leader. But with the new proposal to not broadcast this information at all, this concern is invalidated anyway.

> 2. At least I was under the assumption that it can be called per query, since the API docs don't seem to suggest otherwise. Do you see any potential issues if we call this every query? (We should benchmark this nonetheless.)

I did not see a real issue if people refresh the metadata frequently, because it would be a local call. My main point was that this would change the current usage pattern of the API, and we would clearly need to communicate this change. Similar to (1), this concern is invalidated anyway.

@John: I think it's a great idea to get rid of reporting lag and push the decision-making process about "what to query" into the query-serving layer itself. This simplifies the overall design of this KIP significantly, and it actually aligns very well with the idea that Kafka Streams (as it is a library) should only provide the basic building blocks. Many of my raised questions are invalidated by this.

Some questions are still open though:

> 10) Do we need to distinguish between active(restoring) and standby tasks? Or could we treat both as the same?

@Vinoth: about (5). I see your point about multiple calls vs a single call. I still slightly prefer multiple calls, but it's highly subjective, and I would also be fine to add an #isActive() method. Would be good to get feedback from others.

For (14), i.e. lag in offsets vs time: Having both, as suggested by Sophie, would of course be best. What is a little unclear to me is, how in detail are we going to compute both?

-Matthias

On 10/24/19 11:07 PM, Sophie Blee-Goldman wrote:

Just to chime in on the "report lag vs timestamp difference" issue, I would actually advocate for both. As mentioned already, time difference is probably a lot easier and/or more useful to reason about in terms of "freshness" of the state. But in the case when all queried stores are far behind, lag could be used to estimate the recovery velocity. You can then get a (pretty rough) idea of when a store might be ready, and wait until around then to query again.

On Thu, Oct 24, 2019 at 9:53 PM Guozhang Wang <wangg...@gmail.com> wrote:

I think I agree with John's recent reasoning as well: instead of letting the storeMetadata API return the staleness information, letting the client query either the active or a standby, and letting the standby query response include both the value and a timestamp (or lag, as in a diff of timestamps), would actually be more intuitive. Not only is the Streams client simpler; from the user's perspective, they also do not need to periodically refresh their staleness information from the client, but only need to make decisions on the fly whenever they need to query.
Again, the standby replica then needs to know the current active task's timestamp, which can be found from the log end record's encoded timestamp. Today, standby tasks do not read that specific record, but only refresh their knowledge of the log end offset; however, I think refreshing the latest record timestamp is not a very bad requirement to add on the standby replicas.

Guozhang

On Thu, Oct 24, 2019 at 5:43 PM Vinoth Chandar <vchan...@confluent.io> wrote:

+1. As someone implementing a query routing layer, there is already a need to have mechanisms in place to do healthchecks/failure detection to detect failures for queries, while Streams rebalancing eventually kicks in in the background. So, pushing this complexity to the IQ client app keeps Streams simpler as well. IQs will potentially be issued at an order of magnitude higher frequency, and this can achieve good freshness for the lag information.
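Guozhang's idea above, where the standby's query response carries both the value and its staleness, could be sketched as a small response wrapper. This is purely illustrative; the class name and shape are my assumptions, not an API from the KIP:

```java
// Hypothetical response wrapper a standby (or restoring active) could
// return: the stored value plus how far behind the active this copy is,
// so the query layer can judge freshness per response without any
// periodic metadata refresh.
public class StaleQueryResult<V> {
    private final V value;
    private final long offsetLag;   // end offset minus locally applied offset
    private final long timeLagMs;   // active's tail timestamp minus local one

    public StaleQueryResult(V value, long offsetLag, long timeLagMs) {
        this.value = value;
        this.offsetLag = offsetLag;
        this.timeLagMs = timeLagMs;
    }

    public V value() { return value; }
    public long offsetLag() { return offsetLag; }
    public long timeLagMs() { return timeLagMs; }

    // A caller that insists on querying only a fully caught-up copy can
    // reject any response with nonzero lag and retry elsewhere.
    public boolean isCaughtUp() { return offsetLag == 0; }
}
```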
I would like to add, however, that we would also need to introduce APIs in the KafkaStreams class for obtaining lag information for all stores local to that host. This is for the IQ layer to relay back with the response/its own heartbeat mechanism.

On Thu, Oct 24, 2019 at 3:12 PM John Roesler <j...@confluent.io> wrote:

Hi all,

I've been mulling about this KIP, and I think I was on the wrong track earlier with regard to task lags. Tl;dr: I don't think we should add lags at all to the metadata API (and also not to the AssignmentInfo protocol message).

Like I mentioned early on, reporting lag via SubscriptionInfo/AssignmentInfo would only work while rebalances are happening. Once the group stabilizes, no members would be notified of each other's lags anymore. I had been thinking that the solution would be the heartbeat proposal I mentioned earlier, but that proposal would have reported the heartbeats of the members only to the leader member (the one who makes assignments).
To be useful in the context of _this_ KIP, we would also have to report the lags in the heartbeat responses of _all_ members. This is a concern to me, because now _all_ the lags get reported to _all_ the members on _every_ heartbeat... a lot of chatter.

Plus, the proposal for KIP-441 is only to report the lags of each _task_. This is the sum of the lags of all the stores in the task. But this would be insufficient for KIP-535. For this KIP, we would want the lag specifically of the store we want to query. So this means we have to report the lags of all the stores of all the members to every member... even more chatter!

The final nail in the coffin, to me, is that IQ clients would have to start refreshing their metadata quite frequently to stay up to date on the lags, which adds even more overhead to the system.

Consider a strawman alternative: we bring KIP-535 back to extending the metadata API to tell the client the active and standby replicas for the key in question (not including any "staleness/lag" restriction, just returning all the replicas). Then, the client picks a replica and sends the query. The server returns the current lag along with the response (maybe in an HTTP header or something). Then, the client keeps a map of its last observed lags for each replica, and uses this information to prefer fresher replicas.

OR, if it wants only to query the active replica, it throws an error on any lag response greater than zero, refreshes its metadata by re-querying the metadata API, and tries again with the current active replica.

This way, the lag information will be super fresh for the client, and we keep the metadata API / Assignment, Subscription / and Heartbeat as slim as possible.

Side note: I do think that some time soon we'll have to add a library for IQ servers/clients. I think this logic will start to get pretty complex.

I hope this thinking is reasonably clear!
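John's strawman routing client could be sketched roughly as below. The class and method names are invented for illustration; a real client would track HostInfo objects from the metadata API rather than plain host strings:

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hedged sketch of the strawman: remember the lag each replica reported
// with its last response, and prefer the freshest replica on the next query.
public class LagAwareRouter {
    private final Map<String, Long> lastObservedLag = new HashMap<>();

    // Called after each query with the lag the server piggybacked on its response.
    public void recordLag(String replica, long offsetLag) {
        lastObservedLag.put(replica, offsetLag);
    }

    // Pick the replica with the smallest last-observed lag. Replicas we have
    // never queried default to lag 0, so new hosts get tried at least once.
    public String pickFreshest(List<String> replicas) {
        return replicas.stream()
                .min(Comparator.comparingLong(r -> lastObservedLag.getOrDefault(r, 0L)))
                .orElseThrow(() -> new IllegalArgumentException("no replicas"));
    }
}
```

The active-only mode John describes would instead reject any response with nonzero lag, re-query the metadata API, and retry against the current active.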
Thanks again,
-John

On Wed, Oct 23, 2019 at 10:16 AM Vinoth Chandar <vchan...@confluent.io> wrote:

Responding to the points raised by Matthias:

1. IIUC, John intends to add (or we can do this in this KIP) lag information to AssignmentInfo, which gets sent to every participant.

2. At least I was under the assumption that it can be called per query, since the API docs don't seem to suggest otherwise. Do you see any potential issues if we call this every query? (We should benchmark this nonetheless.)

4. Agree. metadataForKey() would implicitly return the active host's metadata (as it was before). We should also document this in that API's javadoc, given we have other method(s) that return more host metadata now.

5. While I see the point, the app/caller has to make two different API calls to obtain active/standby and potentially perform the same set of operations to query the state. I personally still like a method like isActive() better, but I don't have strong opinions.

9. If we do expose the lag information, could we just leave it up to the caller to decide whether it errors out or not, and not make the decision within Streams? I.e., we don't need a new config.

14. +1, if it's easier to do right away. We started with number of records, following the lead from KIP-441.

On Wed, Oct 23, 2019 at 5:44 AM Navinder Brar <navinder_b...@yahoo.com.invalid> wrote:

Thanks, everyone, for taking a look. Some very cool ideas have flown in.
> There was a follow-on idea I POCed to continuously share lag information in the heartbeat protocol.

+1, that would be great. I will update the KIP assuming this work will finish soon.

> I think that adding a new method to StreamsMetadataState and deprecating the existing method is the best way to go; we just can't change the return types of any existing methods.

+1 on this. We will add new methods for users who would be interested in querying back a list of possible options to query from, and leave the current function getStreamsMetadataForKey() untouched for users who want absolute consistency.

> Why not just always return all available metadata (including active/standby or lag) and let the caller decide to which node they want to route the query?

+1. I think this makes sense, as from a user standpoint there is no difference between an active and a standby if both have the same lag. In fact, users would be able to use this API to reduce query load on actives, so returning all available options along with the current lag in each would make sense, and leave it to the user how they want to use this data. This has another added advantage: if a user queries any random machine for a key, and that machine has a replica for the partition (where the key belongs), the user might choose to serve the data from there itself (if it doesn't lag much) rather than finding the active and making an IQ call to it. This would save some critical serving time for some applications.

> Adding the lag in terms of timestamp diff comparing the committed offset.

+1 on this, I think it's more readable.
But as John said, the function allMetadataForKey() is just returning the possible options from which users can query a key, so we can even drop the parameter enableReplicaServing/tolerableDataStaleness and just return all the streamsMetadata containing that key, along with the offset limit.

Answering the questions posted by Matthias, in sequence:

1. @John, can you please comment on this one?
2. Yeah, the usage pattern would include querying this prior to every request.
3. Will add the changes to StreamsMetadata in the KIP; this would include changes in rebuildMetadata() etc.
4. Makes sense, already addressed above.
5. Is it important from a user perspective whether they are querying an active(processing), an active(restoring), or a standby task, if we have a way of denoting lag in a readable manner which signifies to the user that this is the best node to query fresh data from?
6. Yes, I intend to return the actives and replicas in the same return list in allMetadataForKey().
7. Tricky.
8. Yes, we need new functions to return activeRestoring and standbyRunning tasks.
9. A StreamsConfig doesn't look like it is of much use to me, since we are giving all possible options via this function, or they can use the existing function getStreamsMetadataForKey() and get just the active.
10. I think we should treat them both the same and let the lag do the talking.
11. We are just sending back the options to query from in allMetadataForKey(), which doesn't include any handle. We then query that machine for the key, where it calls allStores() and tries to find the task in activeRunning/activeRestoring/standbyRunning and adds the store handle there.
12. Need to verify, but at the exact point when the store is closed to transition it from restoring to running, the queries will fail. The caller in such a case can have their own configurable retries to check again, or try the replica if a call to the active fails.
>>>>>> I >>>>>>>>> think >>>>>>>>>>> KIP-216 >>>>>>>>>>>>>> is >>>>>>>>>>>>>>>>>>>> working >>>>>>>>>>>>>>>>>>>>>> on those lines, we might not need few of those >>>>>>>>> exceptions >>>>>>>>>>> since >>>>>>>>>>>>>>> now >>>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>> basic idea of this KIP is to support IQ during >>>>>>>>>>> rebalancing.14. >>>>>>>>>>>>>>>>>>>> Addressed >>>>>>>>>>>>>>>>>>>>>> above, agreed it looks more readable. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> On Tuesday, 22 October, 2019, 08:39:07 pm >>>>>> IST, >>>>>>>>>>> Matthias J. >>>>>>>>>>>>>> Sax >>>>>>>>>>>>>>>>>> < >>>>>>>>>>>>>>>>>>>>>> matth...@confluent.io> wrote: >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> One more thought: >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> 14) Is specifying the allowed lag in number of >>>>>>>>> records a >>>>>>>>>>> useful >>>>>>>>>>>>>>> way >>>>>>>>>>>>>>>>>>> for >>>>>>>>>>>>>>>>>>>>>> users to declare how stale an instance is >>>>>> allowed to >>>>>>>>> be? >>>>>>>>>>> Would >>>>>>>>>>>>>> it >>>>>>>>>>>>>>>>>> be >>>>>>>>>>>>>>>>>>>>>> more intuitive for users to specify the >>>>> allowed >>>>>> lag >>>>>>>> in >>>>>>>>>>> time >>>>>>>>>>>>>> units >>>>>>>>>>>>>>>>>>>> (would >>>>>>>>>>>>>>>>>>>>>> event time or processing time be better)? It >>>>>> seems >>>>>>>>> hard >>>>>>>>>>> for >>>>>>>>>>>>>> users >>>>>>>>>>>>>>>>>> to >>>>>>>>>>>>>>>>>>>>>> reason how "fresh" a store really is when >>>>>> number of >>>>>>>>>>> records is >>>>>>>>>>>>>>>>>> used. >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> -Matthias >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>> On 10/21/19 9:02 PM, Matthias J. 
Sax wrote: >>>>>>>>>>>>>>>>>>>>>>> Some more follow up thoughts: >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> 11) If we get a store handle of an >>>>>>>> active(restoring) >>>>>>>>>>> task, and >>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>> task >>>>>>>>>>>>>>>>>>>>>>> transits to running, does the store handle >>>>>> become >>>>>>>>>>> invalid and a >>>>>>>>>>>>>>>>>> new >>>>>>>>>>>>>>>>>>>> one >>>>>>>>>>>>>>>>>>>>>>> must be retrieved? Or can we "switch it out" >>>>>>>>> underneath >>>>>>>>>>> -- for >>>>>>>>>>>>>>>>>> this >>>>>>>>>>>>>>>>>>>>>>> case, how does the user know when they start >>>>> to >>>>>>>>> query the >>>>>>>>>>>>>>>>>>> up-to-date >>>>>>>>>>>>>>>>>>>>>> state? >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> 12) Standby tasks will have the store open in >>>>>>>> regular >>>>>>>>>>> mode, >>>>>>>>>>>>>> while >>>>>>>>>>>>>>>>>>>>>>> active(restoring) tasks open stores in >>>>> "upgrade >>>>>>>> mode" >>>>>>>>>>> for more >>>>>>>>>>>>>>>>>>>> efficient >>>>>>>>>>>>>>>>>>>>>>> bulk loading. When we switch the store into >>>>>> active >>>>>>>>> mode, >>>>>>>>>>> we >>>>>>>>>>>>>> close >>>>>>>>>>>>>>>>>>> it >>>>>>>>>>>>>>>>>>>> and >>>>>>>>>>>>>>>>>>>>>>> reopen it. What is the impact if we query the >>>>>> store >>>>>>>>>>> during >>>>>>>>>>>>>>>>>> restore? >>>>>>>>>>>>>>>>>>>> What >>>>>>>>>>>>>>>>>>>>>>> is the impact if we close the store to >>>>> transit >>>>>> to >>>>>>>>>>> running (eg, >>>>>>>>>>>>>>>>>>> there >>>>>>>>>>>>>>>>>>>>>>> might be open iterators)? >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> 13) Do we need to introduced new exception >>>>>> types? 
>>>>>>>>> Compare >>>>>>>>>>>>>> KIP-216 >>>>>>>>>>>>>>>>>>>>>>> ( >>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>> >>>>>>>> >>>>>> >>>>> >>> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-216%3A+IQ+should+throw+different+exceptions+for+different+errors >>>>>>>>>>>>>>>>>>>>>> ) >>>>>>>>>>>>>>>>>>>>>>> that aims to improve the user experience with >>>>>>>> regard >>>>>>>>> to >>>>>>>>>>> IQ >>>>>>>>>>>>>>>>>>>> exceptions. >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> -Matthias >>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>> On 10/21/19 6:39 PM, Matthias J. Sax wrote: >>>>>>>>>>>>>>>>>>>>>>>> Thanks for the KIP. >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> Couple of comments: >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> 1) With regard to KIP-441, my current >>>>>>>> understanding >>>>>>>>> is >>>>>>>>>>> that >>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>> lag >>>>>>>>>>>>>>>>>>>>>>>> information is only reported to the leader >>>>>> (please >>>>>>>>>>> correct me >>>>>>>>>>>>>>>>>> if I >>>>>>>>>>>>>>>>>>>> am >>>>>>>>>>>>>>>>>>>>>>>> wrong). This seems to be quite a limitation >>>>> to >>>>>>>>> actually >>>>>>>>>>> use >>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>> lag >>>>>>>>>>>>>>>>>>>>>>>> information. >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> 2) The idea of the metadata API is actually >>>>>> to get >>>>>>>>>>> metadata >>>>>>>>>>>>>> once >>>>>>>>>>>>>>>>>>> and >>>>>>>>>>>>>>>>>>>>>>>> only refresh the metadata if a store was >>>>>> migrated. >>>>>>>>> The >>>>>>>>>>> current >>>>>>>>>>>>>>>>>>>> proposal >>>>>>>>>>>>>>>>>>>>>>>> would require to get the metadata before >>>>> each >>>>>>>> query. >>>>>>>>>>> The KIP >>>>>>>>>>>>>>>>>>> should >>>>>>>>>>>>>>>>>>>>>>>> describe the usage pattern and impact in >>>>> more >>>>>>>>> detail. 
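To make the "fetch once, refresh on migration" pattern in 2) concrete, here is a minimal caller-side sketch: routing metadata is fetched only on a cache miss, and a failed query (signaling that the store migrated) invalidates the cached entry so the next call re-fetches. All names here (`MetadataCache`, `lookupHost`) are hypothetical illustrations, not proposed KIP API; `lookupHost` stands in for a metadata call such as getMetadataForKey.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical client-side cache illustrating the "fetch metadata once,
// refresh only on store migration" usage pattern.
class MetadataCache {
    private final Map<String, String> keyToHost = new ConcurrentHashMap<>();
    private final Function<String, String> lookupHost; // remote metadata fetch

    MetadataCache(Function<String, String> lookupHost) {
        this.lookupHost = lookupHost;
    }

    // Return the cached host for a key, fetching metadata only on a cache miss.
    String hostFor(String key) {
        return keyToHost.computeIfAbsent(key, lookupHost);
    }

    // Called when a query fails because the store migrated: drop the stale
    // entry so the next hostFor() call re-fetches metadata.
    void invalidate(String key) {
        keyToHost.remove(key);
    }
}
```

Under the current proposal, by contrast, every request would begin with a fresh metadata call, which is the cost Matthias is asking the KIP to spell out.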
3) Currently, the KIP does not list the public API changes in detail. Please list all methods you intend to deprecate and all methods you intend to add (best using a code-block markup -- compare https://cwiki.apache.org/confluence/display/KAFKA/KIP-470%3A+TopologyTestDriver+test+input+and+output+usability+improvements as an example).

4) Also note (as already pointed out by John) that we cannot have any breaking API changes. Thus, the API should be designed in a fully backward-compatible manner.

5) Returning a list of metadata objects makes it hard for the user to know if the first object refers to the active(processing), active(restoring), or a standby task. IMHO, we should be more explicit. For example, a metadata object could have a flag that one can test via `#isActive()`. Or, maybe even better, we could keep the current API as-is and add something like `standbyMetadataForKey()` (and similar methods for others). Having just a flag `isActive()` is a little subtle, and having new overloads would make the API much clearer (passing in a boolean flag does not seem to be a nice API).

6) Do you intend to return all standby metadata information at once, similar to `allMetadata()`? Seems to be useful.

7) Even if the lag information is propagated to all instances, it will happen in an async manner. Hence, I am wondering if we should address this race condition (I think we should). The idea would be to check whether a standby/active(restoring) task is actually still within the lag bounds when a query is executed, and to throw an exception if not.

8) The current `KafkaStreams#state()` method only returns a handle to stores of active(processing) tasks. How can a user actually get a handle to a store of an active(restoring) or standby task for querying? Seems we should add a new method to get standby handles? Changing the semantics of the existing `state()` would be possible, but I think adding a new method is preferable.

9) How does the user actually specify the acceptable lag? A global config via StreamsConfig (this would be a public API change that needs to be covered in the KIP)? Or on a per-store or even per-query basis for more flexibility? We could also have a global setting that is used as the default and allow overwriting it on a per-query basis.

10) Do we need to distinguish between active(restoring) and standby tasks? Or could we treat both as the same?
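To illustrate the flag-vs-overload tradeoff in 5), here is a rough sketch of the two API shapes: one list carrying an `isActive()` flag that the caller must test per entry, versus separate active/standby views. `HostMetadata` and `MetadataQueries` are made-up stand-ins for StreamsMetadata and its lookup class; nothing here is existing Kafka Streams API.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for StreamsMetadata carrying an explicit role flag.
class HostMetadata {
    final String host;
    final boolean active; // true for active(processing) tasks

    HostMetadata(String host, boolean active) {
        this.host = host;
        this.active = active;
    }

    boolean isActive() { return active; }
}

class MetadataQueries {
    // Flag-based shape: one combined list; caller filters on isActive().
    static List<HostMetadata> activesOnly(List<HostMetadata> all) {
        return all.stream().filter(HostMetadata::isActive).collect(Collectors.toList());
    }

    // Overload-based shape: a separate "standbyMetadataForKey"-style view,
    // so the caller never has to inspect a flag.
    static List<HostMetadata> standbysOnly(List<HostMetadata> all) {
        return all.stream().filter(m -> !m.isActive()).collect(Collectors.toList());
    }
}
```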
-Matthias

On 10/21/19 5:40 PM, Vinoth Chandar wrote:

> I'm wondering, rather than putting "acceptable lag" into the configuration at all, or even making it a parameter on `allMetadataForKey`, why not just _always_ return all available metadata (including active/standby or lag) and let the caller decide to which node they want to route the query?

+1 on exposing lag information via the APIs. IMO, without continuously updated/fresh lag information, its true value as a signal for query routing decisions is much more limited. But we can design the API around this model and iterate? Longer term, we should have continuously shared lag information.

> more general to refactor it to "allMetadataForKey(long tolerableDataStaleness, ...)", and when it's set to 0 it means "active task only".

+1. IMO, if we plan on having `enableReplicaServing`, it makes sense to generalize based on data staleness. This seems complementary to exposing the lag information itself.

> This is actually not a public api change at all, and I'm planning to implement it asap as a precursor to the rest of KIP-441

+1 again. Do we have a concrete timeline for when this change will land on master? I would like to get the implementation wrapped up (as much as possible) by the end of the month. :) But I agree this sequencing makes sense.
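The "return everything and let the caller decide" model quoted above could look roughly like the sketch below: the metadata call only reports candidates, and the caller applies its own staleness policy, preferring the active and otherwise taking the freshest standby within its bound. `Candidate` and `QueryRouter` are hypothetical names; the real metadata and lag types are exactly what this thread is still debating.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Made-up stand-in for StreamsMetadata plus per-host lag information.
class Candidate {
    final String host;
    final boolean active;
    final long offsetLag;

    Candidate(String host, boolean active, long offsetLag) {
        this.host = host;
        this.active = active;
        this.offsetLag = offsetLag;
    }
}

class QueryRouter {
    // Caller-side policy: prefer the active; otherwise take the freshest
    // standby whose lag is within the caller's bound. An empty result means
    // "fail the query / retry later" -- entirely the caller's choice.
    static Optional<String> route(List<Candidate> candidates, long maxAcceptableLag) {
        Optional<String> active = candidates.stream()
                .filter(c -> c.active).map(c -> c.host).findFirst();
        if (active.isPresent()) {
            return active;
        }
        return candidates.stream()
                .filter(c -> c.offsetLag <= maxAcceptableLag)
                .min(Comparator.comparingLong((Candidate c) -> c.offsetLag))
                .map(c -> c.host);
    }
}
```

Nothing in the metadata API itself would encode "acceptable lag"; each caller (or each team on a shared platform, per Navinder's point below) picks its own bound per query.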
On Mon, Oct 21, 2019 at 2:56 PM Guozhang Wang <wangg...@gmail.com> wrote:

Hi Navinder,

Thanks for the KIP. I have a high-level question about the proposed API regarding:

"StreamsMetadataState::allMetadataForKey(boolean enableReplicaServing...)"

I'm wondering if it's more general to refactor it to "allMetadataForKey(long tolerableDataStaleness, ...)", where setting it to 0 means "active task only". Behind the scenes, we can have the committed offsets encode the stream time as well, so that when processing standby tasks the stream process knows not only the lag in terms of offsets compared to the committed offset (internally we call it the offset limit), but also the lag in terms of the timestamp diff compared to the committed offset.
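The two lag notions just described could be paired up as in this minimal sketch, assuming the committed offset's metadata also carries the stream time at commit. The type and field names are illustrative only; neither KIP-535 nor KIP-441 defines such a type.

```java
// Hypothetical pairing of the two staleness measures Guozhang describes:
// offset lag against the committed offset ("offset limit"), and time lag
// against the stream time encoded alongside that commit.
class StandbyLag {
    final long offsetLag; // committed offset minus standby's restored offset
    final long timeLagMs; // committed stream time minus standby's stream time

    StandbyLag(long offsetLag, long timeLagMs) {
        this.offsetLag = offsetLag;
        this.timeLagMs = timeLagMs;
    }

    // committedOffset/committedStreamTime come from the active's commit;
    // restoredOffset/restoredStreamTime are the standby's current position
    // in the changelog. Clamped at 0 since a standby cannot be "ahead".
    static StandbyLag of(long committedOffset, long committedStreamTime,
                         long restoredOffset, long restoredStreamTime) {
        return new StandbyLag(Math.max(0, committedOffset - restoredOffset),
                              Math.max(0, committedStreamTime - restoredStreamTime));
    }
}
```

A time-based lag would directly answer Matthias's question 14 about whether users reason better in time units than in record counts.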
Also, encoding the timestamp as part of the offset has other benefits for improving Kafka Streams time semantics as well, but for KIP-535 itself I think it can help give users a more intuitive interface to reason about.

Guozhang

On Mon, Oct 21, 2019 at 12:30 PM John Roesler <j...@confluent.io> wrote:

Hey Navinder,

Thanks for the KIP! I've been reading over the discussion thus far, and I have a couple of thoughts to pile on as well:

It seems confusing to propose the API in terms of the current system state, but also propose how the API would look if/when KIP-441 is implemented. It occurs to me that the only part of KIP-441 that would affect you is the availability of the lag information in the SubscriptionInfo message. This is actually not a public API change at all, and I'm planning to implement it asap as a precursor to the rest of KIP-441, so maybe you can just build on top of KIP-441 and assume the lag information will be available. Then you could have a more straightforward proposal (e.g., mention that you'd return the lag information in AssignmentInfo as well as in the StreamsMetadata in some form, or make use of it in the API somehow).

I'm partially motivated in that former point because it seems like understanding how callers would bound the staleness for their use case is _the_ key point for this KIP. FWIW, I think that adding a new method to StreamsMetadataState and deprecating the existing method is the best way to go; we just can't change the return types of any existing methods.

I'm wondering, rather than putting "acceptable lag" into the configuration at all, or even making it a parameter on `allMetadataForKey`, why not just _always_ return all available metadata (including active/standby or lag) and let the caller decide to which node they want to route the query? This method isn't making any queries itself; it's merely telling you where the local Streams instance _thinks_ the key in question is located. Just returning all available information lets the caller implement any semantics they desire around querying only active stores, or standbys, or recovering stores, or whatever.

One fly in the ointment, which you may wish to consider if proposing to use lag information, is that the cluster would only become aware of new lag information during rebalances.
Even in the full expression of KIP-441, this information would stop being propagated when the cluster achieves a balanced task distribution. There was a follow-on idea I POCed to continuously share lag information in the heartbeat protocol, which you might be interested in if you want to make sure that nodes are basically _always_ aware of each others' lag on different partitions: https://github.com/apache/kafka/pull/7096

Thanks again!
-John

On Sat, Oct 19, 2019 at 6:06 AM Navinder Brar <navinder_b...@yahoo.com.invalid> wrote:

Thanks, Vinoth. Looks like we are on the same page. I will add some of these explanations to the KIP as well. I have assigned KAFKA-6144 to myself, and KAFKA-8994 is closed (by you). As suggested, we will replace "replica" with "standby".

> In the new API, "StreamsMetadataState::allMetadataForKey(boolean enableReplicaServing, String storeName, K key, Serializer<K> keySerializer)" -- do we really need a per-key configuration? Or is a new StreamsConfig good enough?

Coming from experience: when teams are building a platform with Kafka Streams and these APIs serve data to multiple teams, we can't have a generalized config that says, as a platform, we will support stale reads or not. It should be the choice of whoever is calling the APIs whether they are OK with stale reads or not. Makes sense?

On Thursday, 17 October, 2019, 11:56:02 pm IST, Vinoth Chandar <vchan...@confluent.io> wrote:

Looks like we are covering ground :)

> Only if it is within a permissible range (say 10000) we will serve from the Restoring state of active.

+1 on having a knob like this. My reasoning is as follows.

Look at the Streams state as a read-only distributed KV store. With num_standby = f, we should be able to tolerate f failures, and if there is an (f+1)-th failure, the system should be unavailable.

A) So with num_standby=0, the system should be unavailable even if there is 1 failure, and that's my argument for not allowing querying in the restoration state -- especially since, in this case, it will be a total rebuild of the state (which IMO cannot be considered a normal, fault-free operational state).

B) Even when there are standbys, say num_standby=2, if the user decides to shut down all 3 instances, then the only outcome should be unavailability until all of them come back or the state is rebuilt on other nodes in the cluster. In normal operations, f <= 2, and when a failure does happen we can then either choose C over A and fail IQs until replication is fully caught up, or choose A over C by serving in the restoring state as long as the lag is minimal. If, even with f=1 say, all the standbys are lagging a lot due to some issue, then that should be considered a failure, since that is different from the normal/expected operational mode. Serving reads with unbounded replication lag and calling it "available" may not be very usable or even desirable :) IMHO, since it gives the user no way to reason about the app that is going to query this store.
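The knob being argued for here reduces to a simple predicate: serve from a restoring store only when its lag against the last committed position (the "offset limit") is within the permissible range, e.g. the "say 10000" above. The class and parameter names in this sketch are illustrative, not proposed API.

```java
// Hypothetical decision helper for the "serve while restoring" knob.
class ServeDecision {
    // committedOffset: last committed position of the previous active
    // (the "offset limit"); restoredOffset: how far restoration has
    // progressed; permissibleLag: the configured bound (e.g. 10000).
    static boolean canServeWhileRestoring(long committedOffset,
                                          long restoredOffset,
                                          long permissibleLag) {
        long lag = committedOffset - restoredOffset;
        // A store rebuilding from scratch (lag near the full changelog size)
        // fails this check -- the desired "treat total rebuild as failure,
        // not as normal replication catchup" behavior.
        return lag >= 0 && lag <= permissibleLag;
    }
}
```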
>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>> So there is definitely a need to >>>>>> distinguish >>>>>>>>>>> between : >>>>>>>>>>>>>>>>>>>> Replication >>>>>>>>>>>>>>>>>>>>>>>>>>> catchup >>>>>>>>>>>>>>>>>>>>>>>>>>>> while being in fault free state vs >>>>>> Restoration >>>>>>>>> of >>>>>>>>>>> state >>>>>>>>>>>>>> when >>>>>>>>>>>>>>>>>>> we >>>>>>>>>>>>>>>>>>>> lose >>>>>>>>>>>>>>>>>>>>>>>>>> more >>>>>>>>>>>>>>>>>>>>>>>>>>>> than f standbys. This knob is a great >>>>>> starting >>>>>>>>> point >>>>>>>>>>>>>> towards >>>>>>>>>>>>>>>>>>>> this. >>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>> If you agree with some of the >>>>> explanation >>>>>>>> above, >>>>>>>>>>> please >>>>>>>>>>>>>> feel >>>>>>>>>>>>>>>>>>>> free to >>>>>>>>>>>>>>>>>>>>>>>>>>>> include it in the KIP as well since this >>>>>> is >>>>>>>>> sort of >>>>>>>>>>> our >>>>>>>>>>>>>>>>>> design >>>>>>>>>>>>>>>>>>>>>>>>>> principle >>>>>>>>>>>>>>>>>>>>>>>>>>>> here.. >>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Small nits : >>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>> - let's standardize on "standby" instead >>>>>> of >>>>>>>>>>> "replica", KIP >>>>>>>>>>>>>>>>>> or >>>>>>>>>>>>>>>>>>>>>> code, to >>>>>>>>>>>>>>>>>>>>>>>>>>> be >>>>>>>>>>>>>>>>>>>>>>>>>>>> consistent with rest of Streams >>>>> code/docs? >>>>>>>>>>>>>>>>>>>>>>>>>>>> - Can we merge KAFKA-8994 into >>>>> KAFKA-6144 >>>>>> now >>>>>>>>> and >>>>>>>>>>> close >>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>> former? >>>>>>>>>>>>>>>>>>>>>>>>>>>> Eventually need to consolidate >>>>> KAFKA-6555 >>>>>> as >>>>>>>>> well >>>>>>>>>>>>>>>>>>>>>>>>>>>> - In the new API, >>>>>>>>>>>>>>>>>>>> "StreamsMetadataState::allMetadataForKey(boolean >>>>>>>>>>>>>>>>>>>>>>>>>>>> enableReplicaServing, String storeName, >>>>> K >>>>>> key, >>>>>>>>>>>>>> Serializer<K> >>>>>>>>>>>>>>>>>>>>>>>>>>> keySerializer)" Do >>>>>>>>>>>>>>>>>>>>>>>>>>>> we really need a per key configuration? 
>>>>>> or a >>>>>>>> new >>>>>>>>>>>>>>>>>> StreamsConfig >>>>>>>>>>>>>>>>>>>> is >>>>>>>>>>>>>>>>>>>>>> good >>>>>>>>>>>>>>>>>>>>>>>>>>>> enough? >>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Oct 16, 2019 at 8:31 PM Navinder >>>>>> Brar >>>>>>>>>>>>>>>>>>>>>>>>>>>> <navinder_b...@yahoo.com.invalid> >>>>> wrote: >>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> @Vinoth, I have incorporated a few of >>>>> the >>>>>>>>>>> discussions we >>>>>>>>>>>>>>>>>> have >>>>>>>>>>>>>>>>>>>> had >>>>>>>>>>>>>>>>>>>>>> in >>>>>>>>>>>>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>>>>>>>>>>>>>> KIP. >>>>>>>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> In the current code, t0 and t1 serve >>>>>> queries >>>>>>>>> from >>>>>>>>>>>>>>>>>>>> Active(Running) >>>>>>>>>>>>>>>>>>>>>>>>>>>>> partition. For case t2, we are planning >>>>>> to >>>>>>>>> return >>>>>>>>>>>>>>>>>>>>>>>>>> List<StreamsMetadata> >>>>>>>>>>>>>>>>>>>>>>>>>>>>> such that it returns >>>>> <StreamsMetadata(A), >>>>>>>>>>>>>>>>>> StreamsMetadata(B)> >>>>>>>>>>>>>>>>>>>> so >>>>>>>>>>>>>>>>>>>>>> that >>>>>>>>>>>>>>>>>>>>>>>>>>> if IQ >>>>>>>>>>>>>>>>>>>>>>>>>>>>> fails on A, the replica on B can serve >>>>>> the >>>>>>>>> data by >>>>>>>>>>>>>> enabling >>>>>>>>>>>>>>>>>>>> serving >>>>>>>>>>>>>>>>>>>>>>>>>>> from >>>>>>>>>>>>>>>>>>>>>>>>>>>>> replicas. This still does not solve >>>>> case >>>>>> t3 >>>>>>>>> and t4 >>>>>>>>>>> since >>>>>>>>>>>>>> B >>>>>>>>>>>>>>>>>>> has >>>>>>>>>>>>>>>>>>>> been >>>>>>>>>>>>>>>>>>>>>>>>>>>>> promoted to active but it is in >>>>> Restoring >>>>>>>>> state to >>>>>>>>>>>>>> catchup >>>>>>>>>>>>>>>>>>>> till A’s >>>>>>>>>>>>>>>>>>>>>>>>>>> last >>>>>>>>>>>>>>>>>>>>>>>>>>>>> committed position as we don’t serve >>>>> from >>>>>>>>>>> Restoring state >>>>>>>>>>>>>>>>>> in >>>>>>>>>>>>>>>>>>>> Active >>>>>>>>>>>>>>>>>>>>>>>>>>> and new >>>>>>>>>>>>>>>>>>>>>>>>>>>>> Replica on R is building itself from >>>>>> scratch. 
Both these cases can be solved if we start serving from the Restoring state
of the active as well, since it is almost equivalent to the previous active.

There could be a case where the active and all replicas of a partition become
unavailable and are building themselves from scratch; in this case, the state
in the active is far behind even though it is in Restoring state. To make
sure we don't serve from this state, we can either add another state before
Restoring, or check the difference between the last committed offset and the
current position: only if it is within a permissible range (say 10000) will
we serve from the Restoring state of the active.
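The permissible-range check proposed above could be sketched roughly as
follows. This is a minimal illustration, not the actual KIP-535
implementation; the method and parameter names are hypothetical:

```java
// Sketch of the proposed check: a restoring active may serve IQ only while
// its restore position is within a configured number of offsets of the
// changelog's last committed offset. Names here are illustrative.
public class RestoringServeCheck {

    // True when the restoring store is close enough to be queried.
    static boolean mayServeWhileRestoring(long lastCommittedOffset,
                                          long currentRestorePosition,
                                          long permissibleLag) {
        long lag = Math.max(0, lastCommittedOffset - currentRestorePosition);
        return lag <= permissibleLag;
    }

    public static void main(String[] args) {
        // 5000 offsets behind with a 10000 threshold: may serve.
        System.out.println(mayServeWhileRestoring(50_000, 45_000, 10_000));
        // Rebuilding from scratch (position 0): far behind, must not serve.
        System.out.println(mayServeWhileRestoring(50_000, 0, 10_000));
    }
}
```

A node rebuilding from scratch fails this check, which addresses the
all-replicas-lost scenario without introducing a new state before Restoring.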
On Wednesday, 16 October, 2019, 10:01:35 pm IST, Vinoth Chandar
<vchan...@confluent.io> wrote:

Thanks for the updates on the KIP, Navinder!

Few comments:

- AssignmentInfo is not public API. But we will change it and thus need to
  increment the version and test for version probing etc. Good to separate
  that from the StreamsMetadata changes (which are public API).
- From what I see, there is going to be a choice between the following:

A) Introducing a new *KafkaStreams::allMetadataForKey()* API that potentially
returns a List<StreamsMetadata> ordered from the most up-to-date to the least
up-to-date replica. Today we cannot fully implement this ordering, since all
we know is which hosts are active and which are standbys. However, this
aligns well with the future: KIP-441 adds lag information to the rebalancing
protocol, and we could eventually sort replicas based on the reported lags.
This is fully backwards compatible with existing clients. The only drawback I
see is the naming of the existing method KafkaStreams::metadataForKey, which
does not convey the distinction that it simply returns the active replica,
i.e. allMetadataForKey.get(0).

B) Changing KafkaStreams::metadataForKey() to return a List. It's a breaking
change.

I prefer A, since none of the semantics/behavior changes for existing users.
Love to hear more thoughts. Can we also work this into the KIP? I already
implemented A to unblock myself for now. Seems feasible to do.
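Option A could be modeled along these lines. The types below are simplified
stand-ins, not the real `org.apache.kafka.streams` classes; what they show is
only the active-first ordering, which keeps `metadataForKey()` equivalent to
`allMetadataForKey().get(0)`:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Simplified model of option (A): allMetadataForKey() returns candidates
// ordered active-first, so a caller can query index 0 for today's behavior
// and fall back to the standbys that follow if the active is unavailable.
public class MetadataOrdering {

    static final class HostMetadata {
        final String host;
        final boolean active;
        HostMetadata(String host, boolean active) {
            this.host = host;
            this.active = active;
        }
    }

    static List<HostMetadata> allMetadataForKey(List<HostMetadata> candidates) {
        List<HostMetadata> ordered = new ArrayList<>(candidates);
        // Pre-KIP-441 we only know active vs. standby, so the active sorts
        // first and standbys keep their discovery order (sort is stable).
        ordered.sort(Comparator.comparing((HostMetadata m) -> !m.active));
        return ordered;
    }

    public static void main(String[] args) {
        List<HostMetadata> ordered = allMetadataForKey(List.of(
                new HostMetadata("standby-b", false),
                new HostMetadata("active-a", true)));
        for (HostMetadata m : ordered) {
            System.out.println(m.host);
        }
    }
}
```

Once per-replica lag is reported (KIP-441), the comparator could sort
standbys by lag instead of leaving them in discovery order.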
On Tue, Oct 15, 2019 at 12:21 PM Vinoth Chandar <vchan...@confluent.io>
wrote:

> I get your point. But suppose there is a replica which has just become
> active; in that case the replica will still be building itself from
> scratch, and this active will go to Restoring state till it catches up
> with the previous active. Wouldn't serving from a restoring active make
> more sense than a replica in such a case?

KIP-441 will change this behavior such that promotion to active happens based
on how caught up a replica is. So, once we have that (work underway already
for 2.5, IIUC) and the user sets num.standby.replicas > 0, the staleness
window should not be as long as you describe. IMO if a user wants
availability for state, they should configure num.standby.replicas > 0. If
not, then on a node loss a few partitions would be unavailable for a while
(there are other ways to bring this window down, which I won't bring in
here). We could argue for querying a restoring active (say a new node added
to replace a faulty old node) based on AP vs. CP principles. But I am not
sure reading really, really old values for the sake of availability is
useful; no AP data system would be inconsistent for such a long time in
practice.

So, I still feel just limiting this to standby reads provides the best
semantics. Just my 2c.
Would love to see what others think as well.

On Tue, Oct 15, 2019 at 5:34 AM Navinder Brar
<navinder_b...@yahoo.com.invalid> wrote:

Hi Vinoth,

Thanks for the feedback.

> Can we link the JIRA and discussion thread to the KIP?
Added.

> Based on the discussion on KAFKA-6144, I was under the impression that
> this KIP is also going to cover exposing the standby information in
> StreamsMetadata and thus subsume KAFKA-8994. That would require a public
> API change?
Sure, I can add changes for KAFKA-8994 in this KIP and link KAFKA-6144 to
KAFKA-8994 as well.

> The KIP seems to be focusing on restoration when a new node is added.
> KIP-441 is underway and has some major changes proposed for this. It would
> be good to clarify dependencies, if any. Without KIP-441, I am not very
> sure if we should allow reads from nodes in RESTORING state, which could
> amount to many minutes/few hours of stale reads. This is different from
> allowing querying of standby replicas, which could be mostly caught up,
> where the staleness window could be much smaller/tolerable. (Once again,
> the focus of KAFKA-8994.)
I get your point. But suppose there is a replica which has just become
active; in that case the replica will still be building itself from scratch,
and this active will go to Restoring state till it catches up with the
previous active. Wouldn't serving from a restoring active make more sense
than a replica in such a case?
> Finally, we may need to introduce a configuration to control this. Some
> users may prefer errors to stale data. Can we also add it to the KIP?
Will add this.

Regards,
Navinder

On 2019/10/14 16:56:49, Vinoth Chandar <v...@confluent.io> wrote:

Hi Navinder,

Thanks for sharing the KIP! Few thoughts:

- Can we link the JIRA and discussion thread to the KIP?
- Based on the discussion on KAFKA-6144, I was under the impression that
  this KIP is also going to cover exposing the standby information in
  StreamsMetadata and thus subsume KAFKA-8994. That would require a public
  API change?
- The KIP seems to be focusing on restoration when a new node is added.
  KIP-441 is underway and has some major changes proposed for this. It would
  be good to clarify dependencies, if any. Without KIP-441, I am not very
  sure if we should allow reads from nodes in RESTORING state, which could
  amount to many minutes/few hours of stale reads. This is different from
  allowing querying of standby replicas, which could be mostly caught up,
  where the staleness window could be much smaller/tolerable. (Once again,
  the focus of KAFKA-8994.)
- Finally, we may need to introduce a configuration to control this. Some
  users may prefer errors to stale data. Can we also add it to the KIP?

Thanks
Vinoth

On Sun, Oct 13, 2019 at 3:31 PM Navinder Brar <na...@yahoo.com.invalid>
wrote:

Hi,

Starting a discussion on the KIP to allow state stores to serve stale reads
during rebalance (
https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance
).

Thanks & Regards,
Navinder
LinkedIn
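The "errors vs. stale data" configuration raised in the thread could behave
roughly as below. This is a hypothetical sketch; the flag name and shape are
illustrative and not actual Streams config keys:

```java
import java.util.Optional;

// Hypothetical model of the proposed knob: a query against a store that is
// not fully caught up either returns possibly stale data (if the user opted
// in) or is rejected so the caller can surface an error instead.
public class StaleReadPolicy {

    // Returns the value if the store may serve it; empty means the query
    // should fail rather than return stale data.
    static Optional<String> query(boolean allowStaleReads,
                                  boolean storeCaughtUp,
                                  String value) {
        if (storeCaughtUp || allowStaleReads) {
            return Optional.of(value);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // A caught-up store always serves.
        System.out.println(query(false, true, "v1").isPresent());
        // A lagging store serves only when the user opted into staleness.
        System.out.println(query(true, false, "v1").isPresent());
        System.out.println(query(false, false, "v1").isPresent());
    }
}
```

Defaulting the flag to "errors" would preserve today's semantics for users
who prefer failing fast over reading stale state.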