10. I considered that. I had to pick one or the other. We could also just return standbys and rename the method to something like “allLocalStandbyOffsetLags()” to make that explicit. (“Standby” should implicitly convey that we are talking about stores.)
20. What I meant was that we are returning HostInfo instead of StreamsMetadata, since that's sufficient to route a query; the same goes for “int partition” vs. TopicPartition before. Previously, KeyQueryMetadata had a similar structure but used StreamsMetadata and TopicPartition objects to convey the same information.

@navinder The KIP is already up to date with the email I sent, except for the reasoning I was laying out. +1 on revisiting the rejected alternatives. Please make the follow-up changes.

On Wed, Nov 13, 2019 at 9:12 PM Matthias J. Sax <matth...@confluent.io> wrote:

> Thanks for the summary Vinoth!
>
> I buy the overall argument. Couple of clarification questions:
>
> 10. Why do we need to include the active stores in
> `allLocalStoreOffsetLags()`? Would it not be simpler to just return lag
> for standbys?
>
> 20: What does
>
> > Thin the KeyQueryMetadata object to just contain the minimum information
> > needed.
>
> exactly mean? What is the "minimum information needed"?
>
> @Navinder: if you agree, can you update the KIP accordingly? With all the
> proposals, it's hard to keep track and it would be great to have the
> current proposal summarized in the wiki page.
>
> Please also update the "Rejected alternative" sections to avoid that we
> cycle back to old proposal (including the reason _why_ they got rejected).
>
> Thanks a lot!
>
> -Matthias
>
> On 11/13/19 7:10 PM, Vinoth Chandar wrote:
> > Given we have had a healthy discussion on this topic for a month now and
> > still with many loose ends and open-ended conversations, I thought it would
> > be worthwhile to take a step back and re-evaluate everything in the context
> > of the very real use-case and its specific scenarios.
> >
> > First, let's remind ourselves of the query routing flow of the streams
> > application ("app" here on)
> >
> > 1. queries get routed to any random streams instance in the cluster
> >    ("router" here on)
> > 2.
router then uses Streams metadata to pick active/standby instances
> >    for that key's store/partition
> > 3. router instance also maintains global lag information for all stores
> >    and all their partitions, by a gossip/broadcast/heartbeat protocol (done
> >    outside of Streams framework), but using KafkaStreams#allMetadata() for
> >    streams instance discovery.
> > 4. router then uses information in 2 & 3 to determine which instance to
> >    send the query to: always picks active instance if alive or the most
> >    in-sync live standby otherwise.
> >
> > Few things to note:
> >
> > A) We choose to decouple how the lag information is obtained (control
> > plane) from query path (data plane), since that provides more flexibility
> > in designing the control plane, i.e. pick any or a combination of gossip,
> > N-way broadcast, control the rate of propagation, piggybacking on request
> > responses.
> > B) Since the app needs to do its own control plane, talking to other
> > instances directly for failure detection & exchanging other metadata, we
> > can leave the lag APIs added to KafkaStreams class itself local and simply
> > return lag for all store/partitions on that instance.
> > C) Streams preserves its existing behavior of instances only talking to
> > each other through the Kafka brokers.
> > D) Since the router treats active/standby differently, it would be good for
> > the KafkaStreams APIs to hand them back explicitly, with no additional
> > logic needed for computing them. Specifically, the router only knows two
> > things - key and store - and if we just return a Collection<StreamsMetadata>
> > back, it cannot easily tease apart active and standby. Say a streams
> > instance hosts the same store as both active and standby for different
> > partitions; matching by just store name, the app will find it in both active
> > and standby lists.
> > E) From above, we assume the global lag estimates (lag per store topic
> > partition) are continuously reported amongst application instances and
> > already available on the router during step 2 above. Hence, attaching lag
> > APIs to StreamsMetadata is unnecessary and does not solve the needs anyway.
> > F) The currently returned StreamsMetadata object is really information about
> > a streams instance, and is not specific to the key being queried.
> > Specifically, the router has no knowledge of the topic partition a given key
> > belongs to; this is needed for matching to the global lag information in step
> > 4 above (as the example code in the KIP also showed before). The
> > StreamsMetadata, since it's about the instance itself, would contain all
> > topic partitions and stores on that instance, not specific to the given key.
> > G) A cleaner API would thin the amount of information returned to
> > specifically the given key and store - which is the partition the key belongs
> > to, active host info, list of standby host info. KeyQueryMetadata was
> > proposed in this spirit, but still hung onto StreamsMetadata as the member
> > field(s), which leaks more than what's required to route.
> > H) For this use case, it's sufficient to just support offsetLag initially,
> > without needing time-based lag information right away. So we probably
> > don't need a new top-level StoreLagInfo class anymore.
> > I) On whether the local lag information is cached within Streams or App
> > code, it's again preferable, and may be necessary, for the App to be able to
> > invoke the lag API from an app thread and keep a cached state in memory, to
> > be piggybacked with request responses. If this information is also cached
> > inside Streams, it does no harm anyway.
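
A minimal sketch of the routing rule in step 4 (active if alive, otherwise the most in-sync live standby), written against the thinned key metadata described in point G. The names `Host`, `KeyRoute`, `QueryRouter`, and the sample store name are hypothetical stand-ins for illustration and are not the Kafka Streams classes; the sketch also assumes the global lag map has already been collected out of band (point A) and needs Java 16+ for records.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical stand-in for host info; not the Kafka Streams HostInfo class.
record Host(String name, int port) {}

// Hypothetical key-specific routing info: the key's partition, the active
// host, and the standby hosts (the "thin" shape discussed in point G).
record KeyRoute(int partition, Host active, List<Host> standbys) {}

final class QueryRouter {
    private QueryRouter() {}

    /**
     * Picks the active host if it is alive; otherwise the live standby with
     * the smallest known offset lag for (store, partition). Standbys with no
     * lag report are skipped because their staleness is unknown.
     *
     * globalLags maps host -> store name -> partition -> offset lag.
     */
    static Optional<Host> pick(KeyRoute route,
                               String store,
                               Set<Host> liveHosts,
                               Map<Host, Map<String, Map<Integer, Long>>> globalLags) {
        if (liveHosts.contains(route.active())) {
            return Optional.of(route.active());
        }
        return route.standbys().stream()
                .filter(liveHosts::contains)
                .filter(h -> lagOf(h, store, route.partition(), globalLags) != null)
                .min(Comparator.comparingLong(
                        h -> lagOf(h, store, route.partition(), globalLags)));
    }

    // Returns the reported lag, or null when this host has not reported one.
    private static Long lagOf(Host host, String store, int partition,
                              Map<Host, Map<String, Map<Integer, Long>>> globalLags) {
        return globalLags.getOrDefault(host, Map.of())
                .getOrDefault(store, Map.of())
                .get(partition);
    }

    public static void main(String[] args) {
        Host a = new Host("a", 8080);
        Host b = new Host("b", 8080);
        Host c = new Host("c", 8080);
        KeyRoute route = new KeyRoute(3, a, List.of(b, c));
        Map<Host, Map<String, Map<Integer, Long>>> lags = Map.of(
                b, Map.of("orders-store", Map.of(3, 250L)),
                c, Map.of("orders-store", Map.of(3, 12L)));
        // The active host "a" is not in the live set, so a standby is chosen.
        System.out.println(pick(route, "orders-store", Set.of(b, c), lags));
    }
}
```

Because the route already carries the partition number, the router can index straight into the lag map without matching store names against instance-wide metadata, which is the difficulty point D describes.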
> >
> > Based on this reasoning, I have made the following updates to the KIP:
> >
> > - Drop the proposed KafkaStreams#allStandbyMetadata() and
> >   KafkaStreams#allStandbyMetadataForStore() methods, with the intent of
> >   introducing minimal APIs to support the usage described above.
> > - Drop the KafkaStreams#lagInfoForStore() method and just stick to one
> >   method that returns all the local store partitions' lags.
> > - Rename allLagInfo() to allLocalStoreOffsetLags() to be explicit that the
> >   lags are local and only on offsets.
> > - Drop the StoreLagInfo class.
> > - allLocalStoreOffsetLags() returns Map<String, Map<Integer, Long>>, which
> >   maps a store name to a map containing partition to offset lag info. It will
> >   include both active and standby store partitions. Active always has 0 lag
> >   since it's caught up with the changelog topic.
> > - Thin the KeyQueryMetadata object to just contain the minimum information
> >   needed. Rename allMetadataForKey() method variants to queryMetadataForKey()
> >   variants that return KeyQueryMetadata.
> > - Propose deprecating the two current methods that return metadata based on
> >   key: KafkaStreams#metadataForKey(..), to get more users to use the new
> >   queryMetadataForKey APIs.
> > - We will still have to enhance StreamsMetadata with fields for
> >   standbyTopicPartitions and standbyStateStoreNames, since that is a core
> >   object that gets updated upon rebalance.
> >
> > Please let us know if this is agreeable. We will also work some of this
> > discussion into the background/proposed changes sections, upon feedback.
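
A rough sketch of how an application might consume the `Map<String, Map<Integer, Long>>` shape described above: each instance reports its local map, and the router keeps the latest report per host. The class `GlobalLagView`, its methods, and the use of `-1` for "unknown" are assumptions made for illustration only; how reports travel between instances (gossip, broadcast, piggybacking) is left to the app, as the summary says.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical app-side cache of lag reports; not part of Kafka Streams.
final class GlobalLagView {
    // host id -> store name -> partition -> offset lag, as last reported.
    private final Map<String, Map<String, Map<Integer, Long>>> lagsByHost = new HashMap<>();

    /** Replaces the stored report for one host with a copy of its latest local lag map. */
    void onReport(String hostId, Map<String, Map<Integer, Long>> localLags) {
        Map<String, Map<Integer, Long>> copy = new HashMap<>();
        localLags.forEach((store, byPartition) -> copy.put(store, new HashMap<>(byPartition)));
        lagsByHost.put(hostId, copy);
    }

    /** Drops a host, for example when the app's failure detection declares it dead. */
    void onHostGone(String hostId) {
        lagsByHost.remove(hostId);
    }

    /** Returns the last reported lag, or -1 when nothing is known for that host/store/partition. */
    long lag(String hostId, String store, int partition) {
        return lagsByHost.getOrDefault(hostId, Map.of())
                .getOrDefault(store, Map.of())
                .getOrDefault(partition, -1L);
    }

    public static void main(String[] args) {
        GlobalLagView view = new GlobalLagView();
        // This host is active for partition 0 (lag 0) and a standby for partition 1.
        view.onReport("host-1:8080", Map.of("orders-store", Map.of(0, 0L, 1, 120L)));
        System.out.println(view.lag("host-1:8080", "orders-store", 1));
        System.out.println(view.lag("host-2:8080", "orders-store", 1));
    }
}
```

Replacing the whole report per host, rather than merging entries, means partitions that moved away after a rebalance disappear from the view on the next report.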
> > > > > > On Tue, Nov 12, 2019 at 9:17 AM Vinoth Chandar <vchan...@confluent.io> > > wrote: > > > >> In all, is everyone OK with > >> > >> - Dropping KeyQueryMetadata, and the allMetadataForKey() apis > >> - Dropping allLagInfo() from KafkaStreams class, Drop StoreLagInfo > class > >> - Add offsetLag(store, key, serializer) -> Optional<Long> & > >> offsetLag(store, key, partitioner) -> Optional<Long> to StreamsMetadata > >> - Duplicate the current methods for standbyMetadata in KafkaStreams : > >> allStandbyMetadata(), allStandbyMetadataForStore(), two variants of > >> standbyMetadataForKey(), > >> > >> > >> Responses to Guozhang: > >> > >> 1.1 Like I mentioned before, the allStandbyMetadata() and > >> allStandbyMetadataForStore() complement existing allMetadata() and > >> allMetadataForStore(), since we don't want to change behavior of > existing > >> APIs. Based on discussions so far, if we decide to drop > KeyQueryMetadata, > >> then we will need to introduce 4 equivalents for standby metadata as > >> Matthias mentioned. > >> 1.2 I am okay with pushing lag information to a method on > StreamsMetadata > >> (Personally, I won't design it like that, but happy to live with it) > like > >> what Matthias suggested. But assuming topic name <=> store name > equivalency > >> for mapping this seems like a broken API to me. If all of Streams code > were > >> written like this, I can understand. But I don't think its the case? I > >> would not be comfortable making such assumptions outside of public APIs. > >>>> look into each one's standby partition / stores to tell which one > >> StreamsMetadata is corresponding to the instance who holds a specific > key > >> as standby, yes, but I feel this one extra iteration is worth to avoid > >> introducing a new class. > >> This sort of thing would lead to non-standardized/potentially buggy > client > >> implementations, for something I expect the system would hand me > directly. 
> >> I don't personally feel introducing a new class is so bad, to warrant > the > >> user to do all this matching. Given the current APIs are not explicitly > >> named to denote active metadata, it gives us a chance to build something > >> more direct and clear IMO. If we do allMetadataForKey apis, then we > should > >> clearly separate active and standby ourselves. Alternate is separate > active > >> and standby APIs as Matthias suggests, which I can make peace with. > >> > >> 1.3 Similar as above. In Streams code, we treat topic partitions and > store > >> names separately?. > >> 2.1 I think most databases build replication using logical offsets, not > >> time. Time lag can be a nice to have feature, but offset lag is fully > >> sufficient for a lot of use-cases. > >> 2.2.1 We could support a lagInfoForStores() batch api. makes sense. > >> > >> > >> Responses to Matthias : > >> (100) Streams can still keep the upto date version in memory and > >> implementation could be for now just reading this already refreshed > value. > >> Designing the API, with intent of pushing this to the user keeps doors > >> open for supporting time based lag in the future. > >> (101) I am not sure what the parameters of evaluating approaches here > is. > >> Generally, when I am handed a Metadata object, I don't expect to further > >> query it for more information semantically. I would not also force user > to > >> make separate calls for active and standby metadata. > >> Well, that may be just me. So sure, we can push this into > StreamsMetadata > >> if everyone agrees! > >> +1 on duplicating all 4 methods for standbys in this case. 
> >> > >> > >> > >> > >> > >> On Tue, Nov 12, 2019 at 4:12 AM Navinder Brar > >> <navinder_b...@yahoo.com.invalid> wrote: > >> > >>> > >>> - Looking back, I agree that 2 calls to StreamsMetadata to fetch > >>> StreamsMetadata and then using something like ‘long > >>> StreamsMetadata#offsetLag(store, key)’ which Matthias suggested is > better > >>> than introducing a new public API i.e. ‘KeyQueryMetadata’. I will > change > >>> the KIP accordingly. > >>> - >> I am actually not even sure, why we added > >>> `StreamsMetadata#topicPartitions()` originally > >>> I think it is helpful in showing which host holds which source topic > >>> partitions for /instances endpoint and when you query a key, you need > to > >>> match the partition on which the key belongs to with the hosts holding > >>> source topic partitions for that partition. Is there any other way to > get > >>> this info? > >>> > >>> On Tuesday, 12 November, 2019, 03:55:16 am IST, Guozhang Wang < > >>> wangg...@gmail.com> wrote: > >>> > >>> Regarding 1.2: StreamsMetadata is 1-1 mapped to the streams > instances, so > >>> 1) allMetadata would still return the same number of StreamsMetadata in > >>> collection, just that within the StreamsMetadata now you have new APIs > to > >>> access standby partitions / stores. So I think it would not be a > breaking > >>> change to the public API to not include `allStandbyMetadata` and ` > >>> allStandbyMetadataForStore` but still rely on `allMetadata(ForStore)`? > >>> > >>> Regarding 1.1: Good point about the partition number. But I'm still > >>> wondering if it is definitely necessary to introduce a new > >>> `KeyQueryMetadata` > >>> interface class. E.g. 
suppose our function signature is > >>> > >>> Collection<StreamsMetadata> allMetadataForKey > >>> > >>> When you get the collection of StreamsMetadata you need to iterate over > >>> the > >>> collection and look into each one's standby partition / stores to tell > >>> which one StreamsMetadata is corresponding to the instance who holds a > >>> specific key as standby, yes, but I feel this one extra iteration is > worth > >>> to avoid introducing a new class. > >>> > >>> > >>> Guozhang > >>> > >>> On Sat, Nov 9, 2019 at 10:04 PM Matthias J. Sax <matth...@confluent.io > > > >>> wrote: > >>> > >>>> I agree, that we might want to drop the time-base lag for the initial > >>>> implementation. There is no good way to get this information without a > >>>> broker side change. > >>>> > >>>> > >>>> > >>>> (100) For the offset lag information, I don't see a reason why the app > >>>> should drive when this information is updated though, because KS will > >>>> update this information anyway (once per `commit.interval.ms` -- and > >>>> updating it more frequently does not make sense, as it most likely > won't > >>>> change more frequently anyway). > >>>> > >>>> If you all insist that the app should drive it, I can live with it, > but > >>>> I think it makes the API unnecessarily complex without a benefit. > >>>> > >>>> > >>>> > >>>> (101) I still don't understand why we need to have `KeyQueryMetadata` > >>>> though. Note, that an instance can only report lag for it's local > >>>> stores, but not remote stores as it does not know to what offset a > >>>> remote standby has caught up to. > >>>> > >>>>> Because we needed to return the topicPartition the key belongs to, in > >>>>> order to correlate with the lag information from the other set of > >>> APIs. 
> >>>> > >>>> My suggestion is to get the lag information from `StreamsMetadata` -- > >>>> which partition the store belongs to can be completely encapsulated > >>>> within KS as all information is local, and I don't think we need to > >>>> expose it to the user at all. > >>>> > >>>> We can just add `long StreamsMetadata#offsetLag(store, key)`. If the > >>>> store is local we return its lag, if it's remote we return `-1` (ie, > >>>> UNKNOWN). As an alternative, we can change the return type to > >>>> `Optional<Long>`. This works for active and standby task alike. > >>>> > >>>> Note, that a user must verify if `StreamsMeatadata` is for itself > >>>> (local) or remote anyway. We only need to provide a way that allows > >>>> users to distinguish between active an standby. (More below.) > >>>> > >>>> > >>>> I am actually not even sure, why we added > >>>> `StreamsMetadata#topicPartitions()` originally -- seems pretty > useless. > >>>> Can we deprecate it as side cleanup in this KIP? Or do I miss > something? > >>>> > >>>> > >>>> > >>>> (102) > >>>> > >>>>> There are basically 2 reasons. One is that instead of having two > >>>> functions, one to get StreamsMetadata for active and one for replicas. > >>> We > >>>> are fetching both in a single call and we have a way to get only > active > >>> or > >>>> only replicas from the KeyQueryMetadata object(just like isStandby() > and > >>>> isActive() discussion we had earlier) > >>>> > >>>> I understand, that we need two methods. 
However, I think we can > simplify > >>>> the API and not introduce `KeyQueryMetadata`, but just "duplicate" > all 4 > >>>> existing methods for standby tasks: > >>>> > >>>> // note that `standbyMetadataForKey` return a Collection in contrast > to > >>>> existing `metadataForKey` > >>>> > >>>>> Collection<StreamsMetadata> allStandbyMetadata() > >>>>> Collection<StreamsMetadata> allStandbyMetadataForStore(String > >>>> storeName) > >>>>> Collection<StreamsMetadata> metadataForKey(String storeName, K key, > >>>> Serializer<K> keySerializer) > >>>>> Collection<StreamsMetadata> metadataForKey(String storeName, K key, > >>>> StreamPartitioner<? super K, ?> partitioner) > >>>> > >>>> Because the existing methods return all active metadata, there is no > >>>> reason to return `KeyQueryMetadata` as it's more complicated to get > the > >>>> standby metadata. With `KeyQueryMetadata` the user needs to make more > >>>> calls to get the metadata: > >>>> > >>>> KafkaStreams#allMetadataForKey() > >>>> #getActive() > >>>> > >>>> KafkaStreams#allMetadataForKey() > >>>> #getStandby() > >>>> > >>>> vs: > >>>> > >>>> KafkaStreams#metadataForKey() > >>>> > >>>> KafkaStreams#standbyMetadataForKey() > >>>> > >>>> The wrapping of both within `KeyQueryMetadata` does not seem to > provide > >>>> any benefit but increase our public API surface. > >>>> > >>>> > >>>> > >>>> @Guozhang: > >>>> > >>>> (1.1. + 1.2.) From my understanding `allMetadata()` (and other > existing > >>>> methods) will only return the metadata of _active_ tasks for backward > >>>> compatibility reasons. If we would return standby metadata, existing > >>>> code would potentially "break" because the code might pick a standby > to > >>>> query a key without noticing. > >>>> > >>>> > >>>> > >>>> -Matthias > >>>> > >>>> > >>>> On 11/8/19 6:07 AM, Navinder Brar wrote: > >>>>> Thanks, Guozhang for going through it again. 
> >>>>> > >>>>> - 1.1 & 1.2: The main point of adding topicPartition in > >>>> KeyQueryMetadata is not topicName, but the partition number. I agree > >>>> changelog topicNames and store names will have 1-1 mapping but we also > >>> need > >>>> the partition number of the changelog for which are calculating the > lag. > >>>> Now we can add partition number in StreamsMetadata but it will be > >>>> orthogonal to the definition of StreamsMetadata i.e.- “Represents the > >>> state > >>>> of an instance (process) in a {@link KafkaStreams} application.” If > we > >>> add > >>>> partition number in this, it doesn’t stay metadata for an instance, > >>> because > >>>> now it is storing the partition information for a key being queried. > So, > >>>> having “KeyQueryMetadata” simplifies this as now it contains all the > >>>> metadata and also changelog and partition information for which we > need > >>> to > >>>> calculate the lag. > >>>>> > >>>>> Another way is having another function in parallel to metadataForKey, > >>>> which returns the partition number for the key being queried. But then > >>> we > >>>> would need 2 calls to StreamsMetadataState, once to fetch metadata and > >>>> another to fetch partition number. Let me know if any of these two > ways > >>>> seem more intuitive than KeyQueryMetadata then we can try to converge > on > >>>> one. > >>>>> - 1.3: Again, it is required for the partition number. We can > drop > >>>> store name though. > >>>>> - 2.1: I think this was done in accordance with the opinion from > >>> John > >>>> as time lag would be better implemented with a broker level change and > >>>> offset change is readily implementable. @vinoth? > >>>>> - 2.2.1: Good point. +1 > >>>>> - 2.2.2: I am not well aware of it, @vinoth any comments? > >>>>> - 3.1: I think we have already agreed on dropping this, we need to > >>>> KIP. 
Also, is there any opinion on lagInfoForStore(String storeName) > vs > >>>> lagInfoForStore(String storeName, int partition) > >>>>> - 3.2: But in functions such as onAssignment(), > >>>> onPartitionsAssigned(), for standbyTasks also the topicPartitions we > use > >>>> are input topic partitions and not changelog partitions. Would this be > >>>> breaking from that semantics? > >>>>> > >>>>> > >>>>> On Thursday, 7 November, 2019, 11:33:19 pm IST, Guozhang Wang < > >>>> wangg...@gmail.com> wrote: > >>>>> > >>>>> Hi Navinder, Vinoth, thanks for the updated KIP! > >>>>> > >>>>> Read through the discussions so far and made another pass on the wiki > >>>> page, > >>>>> and here are some more comments: > >>>>> > >>>>> 1. About the public APIs: > >>>>> > >>>>> 1.1. It is not clear to me how allStandbyMetadataForStore > >>>>> and allStandbyMetadata would be differentiated from the original APIs > >>>> given > >>>>> that we will augment StreamsMetadata to include both active and > >>> standby > >>>>> topic-partitions and store names, so I think we can still use > >>> allMetadata > >>>>> and allMetadataForStore to get the collection of instance metadata > >>> that > >>>>> host the store both as active and standbys. Are there any specific > use > >>>>> cases where we ONLY want to get the standby's metadata? And even if > >>> there > >>>>> are, we can easily filter it out from the allMetadata / > >>>> allMetadataForStore > >>>>> right? > >>>>> > >>>>> 1.2. Similarly I'm wondering for allMetadataForKey, can we return the > >>>> same > >>>>> type: "Collection<StreamsMetadata>" which includes 1 for active, and > >>> N-1 > >>>>> for standbys, and callers can easily identify them by looking inside > >>> the > >>>>> StreamsMetadata objects? 
In addition I feel the "topicPartition" > field > >>>>> inside "KeyQueryMetadata" is not very important since the changelog > >>>>> topic-name is always 1-1 mapping to the store name, so as long as the > >>>> store > >>>>> name matches, the changelog topic name should always match (i.e. in > >>>>> the pseudo code, just checking store names should be sufficient). If > >>> all > >>>> of > >>>>> the above assumption is true, I think we can save us from introducing > >>> one > >>>>> more public class here. > >>>>> > >>>>> 1.3. Similarly in StoreLagInfo, seems not necessary to include the > >>> topic > >>>>> partition name in addition to the store name. > >>>>> > >>>>> 2. About querying store lags: we've discussed about separating the > >>>> querying > >>>>> of the lag information and the querying of the host information so I > >>>> still > >>>>> support having separate APIs here. More thoughts: > >>>>> > >>>>> 2.1. Compared with offsets, I'm wondering would time-difference be > >>> more > >>>>> intuitive for users to define the acceptable "staleness"? More > >>> strictly, > >>>>> are there any scenarios where we would actually prefer offsets over > >>>>> timestamps except that the timestamps are not available? > >>>>> > >>>>> 2.2. 
I'm also a bit leaning towards not putting the burden of > >>>> periodically > >>>>> refreshing our lag and caching it (and introducing another config) on > >>> the > >>>>> streams side but document clearly its cost and let users to consider > >>> its > >>>>> call frequency; of course in terms of implementation there are some > >>>>> optimizations we can consider: > >>>>> > >>>>> 1) for restoring active tasks, the log-end-offset is read once since > >>> it > >>>> is > >>>>> not expected to change, and that offset / timestamp can be remembered > >>> for > >>>>> lag calculation and we do not need to refresh again; > >>>>> 2) for standby tasks, there's a "Map<TopicPartition, Long> > >>>>> endOffsets(Collection<TopicPartition> partitions)" in KafkaConsumer > to > >>>>> batch a list of topic-partitions in one round-trip, and we can use > >>> that > >>>> to > >>>>> let our APIs be sth. like "lagInfoForStores(Collection<String> > >>>> storeNames)" > >>>>> to enable the batching effects. > >>>>> > >>>>> 3. Misc.: > >>>>> > >>>>> 3.1 There's a typo on the pseudo code "globalLagInforation". Also it > >>>> seems > >>>>> not describing how that information is collected (personally I also > >>> feel > >>>>> one "lagInfoForStores" is sufficient). > >>>>> 3.2 Note there's a slight semantical difference between active and > >>>>> standby's "partitions" inside StreamsMetadata, for active tasks the > >>>>> partitions are actually input topic partitions for the task: some of > >>> them > >>>>> may also act as changelog topics but these are exceptional cases; for > >>>>> standby tasks the "standbyTopicPartitions" are actually the changelog > >>>>> topics of the task. So maybe renaming it to > >>> "standbyChangelogPartitions" > >>>> to > >>>>> differentiate it? > >>>>> > >>>>> > >>>>> Overall I think this would be a really good KIP to add to Streams, > >>> thank > >>>>> you so much! 
> >>>>> > >>>>> > >>>>> Guozhang > >>>>> > >>>>> On Wed, Nov 6, 2019 at 8:47 PM Navinder Brar > >>>>> <navinder_b...@yahoo.com.invalid> wrote: > >>>>> > >>>>>> +1 on implementing offset based lag for now and push time-based lag > >>> to a > >>>>>> later point in time when broker changes are done. Although > time-based > >>>> lag > >>>>>> enhances the readability, it would not be a make or break change for > >>>>>> implementing this KIP. > >>>>>> > >>>>>> Vinoth has explained the role of KeyQueryMetadata, let me in add in > >>> my 2 > >>>>>> cents as well. > >>>>>> - There are basically 2 reasons. One is that instead of having > two > >>>>>> functions, one to get StreamsMetadata for active and one for > >>> replicas. > >>>> We > >>>>>> are fetching both in a single call and we have a way to get only > >>> active > >>>> or > >>>>>> only replicas from the KeyQueryMetadata object(just like isStandby() > >>> and > >>>>>> isActive() discussion we had earlier) > >>>>>> - Since even after fetching the metadata now we have a > requirement > >>>> of > >>>>>> fetching the topicPartition for which the query came:- to fetch lag > >>> for > >>>>>> that specific topicPartition. Instead of having another call to > fetch > >>>> the > >>>>>> partition from StreamsMetadataState we thought using one single call > >>> and > >>>>>> fetching partition and all metadata would be better. > >>>>>> - Another option was to change StreamsMetadata object and add > >>>>>> topicPartition in that for which the query came but it doesn’t make > >>>> sense > >>>>>> in terms of semantics as it StreamsMetadata. Also, KeyQueryMetadata > >>>>>> represents all the metadata for the Key being queried, i.e. the > >>>> partition > >>>>>> it belongs to and the list of StreamsMetadata(hosts) active or > >>> replica > >>>>>> where the key could be found. 
> >>>>>> > >>>>>> > >>>>>> > >>>>>> > >>>>>> > >>>>>> On Thursday, 7 November, 2019, 01:53:36 am IST, Vinoth Chandar < > >>>>>> vchan...@confluent.io> wrote: > >>>>>> > >>>>>> +1 to John, suggestion on Duration/Instant and dropping the API to > >>>> fetch > >>>>>> all store's lags. However, I do think we need to return lags per > >>> topic > >>>>>> partition. So not sure if single return value would work? We need > >>> some > >>>> new > >>>>>> class that holds a TopicPartition and Duration/Instant variables > >>>> together? > >>>>>> > >>>>>> 10) Because we needed to return the topicPartition the key belongs > >>> to, > >>>> in > >>>>>> order to correlate with the lag information from the other set of > >>> APIs. > >>>>>> Otherwise, we don't know which topic partition's lag estimate to > >>> use. We > >>>>>> tried to illustrate this on the example code. StreamsMetadata is > >>> simply > >>>>>> capturing state of a streams host/instance, where as TopicPartition > >>>> depends > >>>>>> on the key passed in. This is a side effect of our decision to > >>> decouple > >>>> lag > >>>>>> based filtering on the metadata apis. > >>>>>> > >>>>>> 20) Goes back to the previous point. We needed to return information > >>>> that > >>>>>> is key specific, at which point it seemed natural for the > >>>> KeyQueryMetadata > >>>>>> to contain active, standby, topic partition for that key. If we > >>> merely > >>>>>> returned a standbyMetadataForKey() -> Collection<StreamsMetadata> > >>>> standby, > >>>>>> an active metadataForKey() -> StreamsMetadata, and new > >>>>>> getTopicPartition(key) -> topicPartition object back to the caller, > >>> then > >>>>>> arguably you could do the same kind of correlation. IMO having a the > >>>>>> KeyQueryMetadata class to encapsulate all this is a friendlier API. 
> >>>>>> allStandbyMetadata() and allStandbyMetadataForStore() are just > >>> counter > >>>>>> parts for metadataForStore() and allMetadata() that we introduce > >>> mostly > >>>> for > >>>>>> consistent API semantics. (their presence implicitly could help > >>> denote > >>>>>> metadataForStore() is for active instances. Happy to drop them if > >>> their > >>>>>> utility is not clear) > >>>>>> > >>>>>> 30) This would assume we refresh all the standby lag information > >>> every > >>>>>> time we query for that StreamsMetadata for a specific store? For > time > >>>> based > >>>>>> lag, this will involve fetching the tail kafka record at once from > >>>> multiple > >>>>>> kafka topic partitions? I would prefer not to couple them like this > >>> and > >>>>>> have the ability to make granular store (or even topic partition > >>> level) > >>>>>> fetches for lag information. > >>>>>> > >>>>>> 32) I actually prefer John's suggestion to let the application drive > >>> the > >>>>>> lag fetches/updation and not have flags as the KIP current points > to. > >>>> Are > >>>>>> you reexamining that position? > >>>>>> > >>>>>> On fetching lag information, +1 we could do this much more > >>> efficiently > >>>> with > >>>>>> a broker changes. Given I don't yet have a burning need for the time > >>>> based > >>>>>> lag, I think we can sequence the APIs such that the offset based > ones > >>>> are > >>>>>> implemented first, while we have a broker side change? > >>>>>> Given we decoupled the offset and time based lag API, I am willing > to > >>>> drop > >>>>>> the time based lag functionality (since its not needed right away > >>> for my > >>>>>> use-case). @navinder . thoughts? > >>>>>> > >>>>>> > >>>>>> On Tue, Nov 5, 2019 at 11:10 PM Matthias J. Sax < > >>> matth...@confluent.io> > >>>>>> wrote: > >>>>>> > >>>>>>> Navinder, > >>>>>>> > >>>>>>> thanks for updating the KIP. 
Couple of follow up questions: > >>>>>>> > >>>>>>> > >>>>>>> (10) Why do we need to introduce the class `KeyQueryMetadata`? > >>>>>>> > >>>>>>> (20) Why do we introduce the two methods `allMetadataForKey()`? > >>> Would > >>>> it > >>>>>>> not be simpler to add `Collection<StreamMetatdata> > >>>>>>> standbyMetadataForKey(...)`. This would align with new methods > >>>>>>> `#allStandbyMetadata()` and `#allStandbyMetadataForStore()`? > >>>>>>> > >>>>>>> (30) Why do we need the class `StoreLagInfo` -- it seems simpler to > >>>> just > >>>>>>> extend `StreamMetadata` with the corresponding attributes and > >>> methods > >>>>>>> (of active task, the lag would always be reported as zero) > >>>>>>> > >>>>>>> (32) Via (30) we can avoid the two new methods `#allLagInfo()` and > >>>>>>> `#lagInfoForStore()`, too, reducing public API and making it > >>> simpler to > >>>>>>> use the feature. > >>>>>>> > >>>>>>> Btw: If we make `StreamMetadata` thread safe, the lag information > >>> can > >>>> be > >>>>>>> updated in the background without the need that the application > >>>>>>> refreshes its metadata. Hence, the user can get active and/or > >>> standby > >>>>>>> metadata once, and only needs to refresh it, if a rebalance > >>> happened. > >>>>>>> > >>>>>>> > >>>>>>> About point (4) of the previous thread: I was also thinking about > >>>>>>> when/how to update the time-lag information, and I agree that we > >>> should > >>>>>>> not update it for each query. > >>>>>>> > >>>>>>> "How": That we need to fetch the last record is a little bit > >>>>>>> unfortunate, but I don't see any other way without a broker change. > >>> One > >>>>>>> issue I still see is with "exactly-once" -- if transaction markers > >>> are > >>>>>>> in the topic, the last message is not at offset "endOffset - 1" and > >>> as > >>>>>>> multiple transaction markers might be after each other, it's > unclear > >>>> how > >>>>>>> to identify the offset of the last record... Thoughts? 
> >>>>>>> Hence, it might be worth looking into a broker change as a potential future improvement. It might be possible for the broker to cache the latest timestamp per partition to serve this data efficiently, similar to `#endOffset()`.
> >>>>>>> "When": We refresh the end-offset information based on the `commit.interval.ms`. Doing it more often is not really useful, as state store caches will most likely buffer up all writes to changelogs anyway and are only flushed on commit (including a flush of the producer). Hence, I would suggest updating the time-lag information based on the same strategy in the background. This way there are no additional configs or methods, and the user does not need to worry about it at all.
> >>>>>>> To avoid refresh overhead if we don't need it (a user might not use IQ to begin with), it might be worth maintaining an internal flag `updateTimeLagEnabled` that is set to `false` initially and only set to `true` on the first call of a user to get standby metadata.
> >>>>>>>
> >>>>>>> -Matthias
> >>>>>>>
> >>>>>>> On 11/4/19 5:13 PM, Vinoth Chandar wrote:
> >>>>>>>>>> I'm having some trouble wrapping my head around what race conditions might occur, other than the fundamentally broken state in which different instances are running totally different topologies.
> >>>>>>>> 3. @both Without the topic partitions that the tasks can map back to, we have to rely on topology/cluster metadata in each Streams instance to map the task back. If the source topics are wildcarded, for example, then each instance could have different source topics in its topology until the next rebalance happens.
You can also read my comments here:
> >>>>>>>> https://issues.apache.org/jira/browse/KAFKA-7149?focusedCommentId=16904106&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16904106
> >>>>>>>>
> >>>>>>>>>> seems hard to imagine how encoding arbitrarily long topic names plus an integer for the partition number could be as efficient as task ids, which are just two integers.
> >>>>>>>> 3. If you still have concerns about the efficacy of dictionary encoding, happy to engage. The link above also has some benchmark code I used. Theoretically, we would send each topic name at least once, so yes, if you compare a 10-20 character topic name + an integer to two integers, it will be more bytes. But it's a constant overhead proportional to the size of the topic name, and with 4, 8, or 12 partitions the size difference between the baseline (version 4, where we just repeated topic names for each topic partition) and the two approaches becomes narrow.
> >>>>>>>>
> >>>>>>>>>> Plus, Navinder is going to implement a bunch of protocol code that we might just want to change when the discussion actually does take place, if ever.
> >>>>>>>>>> it'll just be a mental burden for everyone to remember that we want to have this follow-up discussion.
> >>>>>>>> 3. Isn't people changing the same parts of code and tracking follow-ups a common thing that we need to deal with anyway? For this KIP, isn't it enough to reason about whether the additional map on top of the topic dictionary would incur more overhead than sending task_ids?
I don't think that's the case; both of them send two integers. As I see it, we can do a separate follow-up to (re)pursue the task_id conversion and get it working for both maps within the next release?
> >>>>>>>>
> >>>>>>>>>> Can you elaborate on "breaking up the API"? It looks like there are already separate API calls in the proposal, one for time-lag, and another for offset-lag, so are they not already broken up?
> >>>>>>>> The current APIs for lags (e.g. lagInfoForStore) return StoreLagInfo objects, which have both time and offset lags. If we had separate APIs (say offsetLagForStore() and timeLagForStore()), we could implement the offset version using the offset lag that the Streams instance already tracks, i.e. with no need for external calls. The time-based lag API would incur the Kafka read for the timestamp. Makes sense?
> >>>>>>>> Based on the discussions so far, I only see these two pending issues to be aligned on. Is there any other open item people want to bring up?
> >>>>>>>>
> >>>>>>>> On Mon, Nov 4, 2019 at 11:24 AM Sophie Blee-Goldman <sop...@confluent.io> wrote:
> >>>>>>>>
> >>>>>>>>> Regarding 3) I'm wondering, does your concern still apply even now that the pluggable PartitionGrouper interface has been deprecated? Now that we can be sure that the DefaultPartitionGrouper is used to generate the taskId -> partitions mapping, we should be able to convert any taskId to any partitions.
> >>>>>>>>>
> >>>>>>>>> On Mon, Nov 4, 2019 at 11:17 AM John Roesler <j...@confluent.io> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hey Vinoth, thanks for the reply!
> >>>>>>>>>>
> >>>>>>>>>> 3.
> >>>>>>>>>> I get that it's not the main focus of this KIP, but if it's ok, it would be nice to hash out this point right now. It only came up because this KIP-535 is substantially extending the pattern in question. If we push it off until later, then the reviewers are going to have to suspend their concerns not just while voting for the KIP, but also while reviewing the code. Plus, Navinder is going to implement a bunch of protocol code that we might just want to change when the discussion actually does take place, if ever. Finally, it'll just be a mental burden for everyone to remember that we want to have this follow-up discussion.
> >>>>>>>>>> It makes sense what you say... the specific assignment is already encoded in the "main" portion of the assignment, not in the "userdata" part. It also makes sense that it's simpler to reason about races if you simply get all the information about the topics and partitions directly from the assignor, rather than get the partition number from the assignor and the topic name from your own a priori knowledge of the topology. On the other hand, I'm having some trouble wrapping my head around what race conditions might occur, other than the fundamentally broken state in which different instances are running totally different topologies. Sorry, but can you remind us of the specific condition?
> >>>>>>>>>> To the efficiency counterargument, it seems hard to imagine how encoding arbitrarily long topic names plus an integer for the partition number could be as efficient as task ids, which are just two integers.
It seems like this would only be true if topic names were 4 characters or less.
> >>>>>>>>>>
> >>>>>>>>>> 4.
> >>>>>>>>>> Yeah, clearly, it would not be a good idea to query the metadata before every single IQ query. I think there are plenty of established patterns for distributed database clients to follow. Can you elaborate on "breaking up the API"? It looks like there are already separate API calls in the proposal, one for time-lag, and another for offset-lag, so are they not already broken up? FWIW, yes, I agree, the offset lag is already locally known, so we don't need to build in an extra synchronous broker API call, just one for the time-lag.
> >>>>>>>>>> Thanks again for the discussion,
> >>>>>>>>>> -John
> >>>>>>>>>>
> >>>>>>>>>> On Mon, Nov 4, 2019 at 11:17 AM Vinoth Chandar <vchan...@confluent.io> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> 3. Right now, we still get the topic partitions assigned as a part of the top-level Assignment object (the one that wraps AssignmentInfo) and use that to convert taskIds back. This list only contains assignments for that particular instance. Attempting to also reverse map "all" the taskIds in the Streams cluster, i.e. all the topic partitions in these global assignment maps, was what was problematic. By explicitly sending the global assignment maps as actual topic partitions, the group coordinator (i.e. the leader that computes the assignments) is able to consistently enforce its view of the topic metadata.
Still, I don't think a change that forces you to reconsider semantics is needed just to save bits on the wire. Maybe we can discuss this separately from this KIP?
> >>>>>>>>>>> 4. There needs to be some caching/interval somewhere though, since we don't want to potentially make 1 Kafka read per IQ. But I think it's a valid suggestion to make this call just synchronous and leave the caching, or how often you want to call, to the application. Would it be good to then break up the APIs for time- and offset-based lag? We can obtain offset-based lag for free, and only incur the overhead of reading Kafka if we want time-based lags?
> >>>>>>>>>>>
> >>>>>>>>>>> On Fri, Nov 1, 2019 at 2:49 PM Sophie Blee-Goldman <sop...@confluent.io> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Adding on to John's response to 3), can you clarify when and why exactly we cannot convert between taskIds and partitions? If that's really the case I don't feel confident that the StreamsPartitionAssignor is not full of bugs...
> >>>>>>>>>>>> It seems like it currently just encodes a list of all partitions (the assignment) and also a list of the corresponding task ids, duplicated to ensure each partition has the corresponding taskId at the same offset into the list. Why is that problematic?
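The dictionary encoding of topic names that this part of the thread keeps returning to can be sketched roughly as follows. This is only an illustration under stated assumptions: the class `TopicDictionarySketch`, its `Encoded` holder, and the `encode`/`decode` helpers are hypothetical names, and the layout is not the actual `AssignmentInfo` wire format. The idea is that each distinct topic name is written once, and every topic partition is then represented by two integers (topic index, partition number), the same count of integers a task id would need.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of dictionary-encoding topic names; not Kafka's real wire format. */
final class TopicDictionarySketch {

    /** Each distinct topic name once, plus one (topicIndex, partition) pair per topic partition. */
    static final class Encoded {
        final List<String> topics = new ArrayList<>();
        final List<int[]> pairs = new ArrayList<>();
    }

    /** Replaces every topic name with its index into the dictionary of distinct names. */
    static Encoded encode(List<Map.Entry<String, Integer>> topicPartitions) {
        Encoded out = new Encoded();
        Map<String, Integer> indexByTopic = new HashMap<>();
        for (Map.Entry<String, Integer> tp : topicPartitions) {
            Integer idx = indexByTopic.get(tp.getKey());
            if (idx == null) {
                // First time this topic name is seen: append it to the dictionary.
                idx = out.topics.size();
                indexByTopic.put(tp.getKey(), idx);
                out.topics.add(tp.getKey());
            }
            out.pairs.add(new int[] {idx, tp.getValue()});
        }
        return out;
    }

    /** Rebuilds the (topic, partition) list by looking each index up in the dictionary. */
    static List<Map.Entry<String, Integer>> decode(Encoded encoded) {
        List<Map.Entry<String, Integer>> result = new ArrayList<>();
        for (int[] pair : encoded.pairs) {
            result.add(new SimpleImmutableEntry<>(encoded.topics.get(pair[0]), Integer.valueOf(pair[1])));
        }
        return result;
    }
}
```

With this layout the per-partition cost is two integers regardless of topic name length; the name itself is paid for once per topic, which matches the "constant overhead" argument made above.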
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Fri, Nov 1, 2019 at 12:39 PM John Roesler <j...@confluent.io> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks, all, for considering the points!
> >>>>>>>>>>>>> 3. Interesting. I have a vague recollection of that... Still, though, it seems a little fishy. After all, we return the assignments themselves as task ids, and the members have to map these to topic partitions in order to configure themselves properly. If it's too complicated to get this right, then how do we know that Streams is computing the correct partitions at all?
> >>>>>>>>>>>>> 4. How about just checking the log-end timestamp when you call the method? Then, when you get an answer, it's as fresh as it could possibly be. And as a user you have just one, obvious, "knob" to configure how much overhead you want to devote to checking... If you want to call the broker API less frequently, you just call the Streams API less frequently. And you don't have to worry about the relationship between your invocations of that method and the config setting (e.g., you'll never get a negative number, which you could if you check the log-end timestamp less frequently than you check the lag).
> >>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>> -John
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Thu, Oct 31, 2019 at 11:52 PM Navinder Brar <navinder_b...@yahoo.com.invalid> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks John for going through this.
> >>>>>>>>>>>>>> - +1, makes sense
> >>>>>>>>>>>>>> - +1, no issues there
> >>>>>>>>>>>>>> - Yeah, the initial patch I had submitted for K-7149 (https://github.com/apache/kafka/pull/6935) to reduce the assignmentInfo object had taskIds, but the merged PR had a similar size according to Vinoth and it was simpler, so if the end result is of the same size, it would not make sense to pivot from the dictionary and move to taskIds again.
> >>>>>>>>>>>>>> - Not sure what a good default would be if we don't have a configurable setting. This gives users the flexibility to serve their requirements, as at the end of the day it would take CPU cycles. I am ok with starting with a default and seeing how it goes based upon feedback.
> >>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>> Navinder
> >>>>>>>>>>>>>> On Friday, 1 November, 2019, 03:46:42 am IST, Vinoth Chandar <vchan...@confluent.io> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 1. Was trying to spell them out separately, but makes sense for readability. Done.
> >>>>>>>>>>>>>> 2. No, I immediately agree :) .. makes sense. @navinder?
> >>>>>>>>>>>>>> 3. I actually attempted only sending taskIds while working on KAFKA-7149. It's non-trivial to handle edge cases resulting from newly added topic partitions and wildcarded topic entries. I ended up simplifying it to just dictionary encoding the topic names to reduce size.
We can apply the same technique here for this map. Additionally, we could also dictionary encode HostInfo, given it's now repeated twice. I think this would save more space than having a flag per topic partition entry. Lmk if you are okay with this.
> >>>>>>>>>>>>>> 4. This opens up a good discussion. Given we support time lag estimates also, we need to read the tail record of the changelog periodically (unlike offset lag, which we can potentially piggyback on metadata in ConsumerRecord, IIUC). We thought we should have a config that controls how often this read happens? Let me know if there is a simple way to get the timestamp value of the tail record that we are missing.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Thu, Oct 31, 2019 at 12:58 PM John Roesler <j...@confluent.io> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hey Navinder,
> >>>>>>>>>>>>>>> Thanks for updating the KIP, it's a lot easier to see the current state of the proposal now.
> >>>>>>>>>>>>>>> A few remarks:
> >>>>>>>>>>>>>>> 1. I'm sure it was just an artifact of revisions, but you have two separate sections where you list additions to the KafkaStreams interface. Can you consolidate those so we can see all the additions at once?
> >>>>>>>>>>>>>>> 2. For messageLagEstimate, can I suggest "offsetLagEstimate" instead, to be clearer that we're specifically measuring a number of offsets?
> >>>>>>>>>>>>>>> If you don't immediately agree, then I'd at least point out that we usually refer to elements of Kafka topics as "records", not "messages", so "recordLagEstimate" might be more appropriate.
> >>>>>>>>>>>>>>> 3. The proposal mentions adding a map of the standby _partitions_ for each host to AssignmentInfo. I assume this is designed to mirror the existing "partitionsByHost" map. To keep the size of these metadata messages down, maybe we can consider making two changes:
> >>>>>>>>>>>>>>> (a) for both actives and standbys, encode the _task ids_ instead of _partitions_. Every member of the cluster has a copy of the topology, so they can convert task ids into specific partitions on their own, and task ids are only (usually) three characters.
> >>>>>>>>>>>>>>> (b) instead of encoding two maps (hostinfo -> actives AND hostinfo -> standbys), which requires serializing all the hostinfos twice, maybe we can pack them together in one map with a structured value (hostinfo -> [actives,standbys]).
> >>>>>>>>>>>>>>> Both of these ideas still require bumping the protocol version to 6, and they basically mean we drop the existing `PartitionsByHost` field and add a new `TasksByHost` field with the structured value I mentioned.
> >>>>>>>>>>>>>>> 4. Can we avoid adding the new "lag refresh" config?
The lags would necessarily be approximate anyway, so adding the config seems to increase the operational complexity of the system for little actual benefit.
> >>>>>>>>>>>>>>> Thanks for the pseudocode, by the way, it really helps visualize how these new interfaces would play together. And thanks again for the update!
> >>>>>>>>>>>>>>> -John
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Thu, Oct 31, 2019 at 2:41 PM John Roesler <j...@confluent.io> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Hey Vinoth,
> >>>>>>>>>>>>>>>> I started going over the KIP again yesterday. There are a lot of updates, and I didn't finish my feedback in one day. I'm working on it now.
> >>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>> John
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Thu, Oct 31, 2019 at 11:42 AM Vinoth Chandar <vchan...@confluent.io> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Wondering if anyone has thoughts on these changes? I liked that the new metadata fetch APIs provide all the information at once with consistent naming.
> >>>>>>>>>>>>>>>>> Any guidance on what you would like to be discussed or fleshed out more before we call a VOTE?
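John's suggestion (b) above, packing actives and standbys into one map keyed by host so that each host is serialized only once, could look roughly like the sketch below. Everything here is an assumption for illustration: hosts are plain "host:port" strings rather than `HostInfo` objects, task ids are strings such as "0_1", and `TasksByHostSketch`, `HostTasks`, and `merge` are hypothetical names, not part of the KIP or of Kafka Streams.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical sketch of a single host -> [actives, standbys] map. */
final class TasksByHostSketch {

    /** Structured value: the active and standby task ids assigned to one host. */
    static final class HostTasks {
        final Set<String> active = new TreeSet<>();
        final Set<String> standby = new TreeSet<>();
    }

    /** Merges the two per-host maps so each host key appears exactly once. */
    static Map<String, HostTasks> merge(Map<String, Set<String>> activeByHost,
                                        Map<String, Set<String>> standbyByHost) {
        Map<String, HostTasks> merged = new TreeMap<>();
        activeByHost.forEach((host, tasks) ->
            merged.computeIfAbsent(host, h -> new HostTasks()).active.addAll(tasks));
        standbyByHost.forEach((host, tasks) ->
            merged.computeIfAbsent(host, h -> new HostTasks()).standby.addAll(tasks));
        return merged;
    }
}
```

A host that only holds standbys still gets an entry, with an empty active set, so a reader of the merged map does not need the two original maps.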
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Wed, Oct 30, 2019 at 10:31 AM Navinder Brar <navinder_b...@yahoo.com.invalid> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>>>>>> We have made some edits in the KIP (https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance) after due deliberation on the agreed design to support the new query design. This includes the new public API to query offset/time lag information and other details related to querying standby tasks which have come up after thinking through the details.
> >>>>>>>>>>>>>>>>>> - Addition of a new config, “lag.fetch.interval.ms”, to configure the interval at which time/offset lag is fetched
> >>>>>>>>>>>>>>>>>> - Addition of a new class StoreLagInfo to store the periodically obtained time/offset lag
> >>>>>>>>>>>>>>>>>> - Addition of two new functions in KafkaStreams, List<StoreLagInfo> allLagInfo() and List<StoreLagInfo> lagInfoForStore(String storeName), to return the lag information for an instance and a store respectively
> >>>>>>>>>>>>>>>>>> - Addition of a new class KeyQueryMetadata. We need topicPartition for each key to be matched with the lag API for the topic partition.
> >>>>>>>>>>>>>>>>>> One way is to add new functions and fetch topicPartition from StreamsMetadataState, but we thought having one call and fetching StreamsMetadata and topicPartition is cleaner.
> >>>>>>>>>>>>>>>>>> - Renaming partitionsForHost to activePartitionsForHost in StreamsMetadataState and partitionsByHostState to activePartitionsByHostState in StreamsPartitionAssignor
> >>>>>>>>>>>>>>>>>> - We have also added the pseudo code of how all the changes will exist together and support the new querying APIs
> >>>>>>>>>>>>>>>>>> Please let me know if anything is pending now, before a vote can be started on this.
> >>>>>>>>>>>>>>>>>> On Saturday, 26 October, 2019, 05:41:44 pm IST, Navinder Brar <navinder_b...@yahoo.com.invalid> wrote:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> >> Since there are two soft votes for separate active/standby API methods, I also change my position on that. Fine with 2 separate methods. Once we remove the lag information from these APIs, returning a List is less attractive, since the ordering has no special meaning now.
> >>>>>>>>>>>>>>>>>> Agreed, now that we are not returning lag, I am also sold on having two separate functions.
We already have one which returns streamsMetadata for active tasks, and now we can add another one for standbys.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On Saturday, 26 October, 2019, 03:55:16 am IST, Vinoth Chandar <vchan...@confluent.io> wrote:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> +1 to Sophie's suggestion. Having both lag in terms of time and offsets is good and makes for a more complete API.
> >>>>>>>>>>>>>>>>>> Since there are two soft votes for separate active/standby API methods, I also change my position on that. Fine with 2 separate methods. Once we remove the lag information from these APIs, returning a List is less attractive, since the ordering has no special meaning now.
> >>>>>>>>>>>>>>>>>> >> lag in offsets vs time: Having both, as suggested by Sophie would of course be best. What is a little unclear to me is, how in details are we going to compute both?
> >>>>>>>>>>>>>>>>>> @navinder maybe the next step is to flesh out these details and surface any larger changes we need to make, if need be.
> >>>>>>>>>>>>>>>>>> Any other details we need to cover, before a VOTE can be called on this?
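On the open question of how the two lags might be computed, one possible reading is sketched below. It is not taken from the KIP: `LagSketch` and its parameter names are hypothetical, and the clamping to zero is an assumption motivated by the concern raised elsewhere in the thread that a stale end offset or tail timestamp could otherwise produce a negative lag. Offset lag needs only numbers the instance already tracks, while time lag needs the timestamp of the changelog's tail record, which is the part that costs a Kafka read.

```java
/** Hypothetical sketch of offset-based and time-based lag for a standby store. */
final class LagSketch {

    /** Number of changelog records the standby has not applied yet; never negative. */
    static long offsetLag(long changelogEndOffset, long standbyPosition) {
        return Math.max(0L, changelogEndOffset - standbyPosition);
    }

    /** Milliseconds between the changelog's tail record and the last applied record; never negative. */
    static long timeLagMs(long tailRecordTimestampMs, long lastAppliedTimestampMs) {
        return Math.max(0L, tailRecordTimestampMs - lastAppliedTimestampMs);
    }
}
```

A query router could compare either value against an application-chosen staleness bound before deciding whether to serve from a given standby.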
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On Fri, Oct 25, 2019 at 1:51 PM Bill Bejeck <bbej...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> I am jumping in a little late here.
> >>>>>>>>>>>>>>>>>>> Overall I agree with the proposal to push decision making on what/how to query in the query layer.
> >>>>>>>>>>>>>>>>>>> For point 5 from above, I'm slightly in favor of having a new method, "standbyMetadataForKey()" or something similar. Because even if we return all tasks in one list, the user will still have to perform some filtering to separate the different tasks, so I don't feel making two calls is a burden, and IMHO makes things more transparent for the user. If the final vote is for using an "isActive" field, I'm good with that as well.
> >>>>>>>>>>>>>>>>>>> Just my 2 cents.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On Fri, Oct 25, 2019 at 5:09 AM Navinder Brar <navinder_b...@yahoo.com.invalid> wrote:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> I think now we are aligned on almost all the design parts. Summarising below what has been discussed above and we have a general consensus on.
> >>>>>>>>>>>>>>>>>>>> - Rather than broadcasting lag across all nodes at rebalancing/with the heartbeat, we will just return a list of all available standbys in the system, and the user can send an IQ query to any of those nodes, which will return the response along with the offset and time lag. Based on that, the user can decide whether to return the response or call another standby.
> >>>>>>>>>>>>>>>>>>>> - The current metadata query frequency will not change. It will be the same as it is now, i.e. before each query.
> >>>