> during restoring state the active might have some lag

Great catch, yes. We cannot assume lag = 0 for the active. Let's report active lag as well then. If the active is too laggy, the app can then deem the store partition unavailable (based on what the application is willing to tolerate).
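For illustration, here is a rough sketch of what that routing check could look like on the app side, assuming the queryMetadataForKey()/KeyQueryMetadata shape summarized further down this thread. The accessor names (getActiveHost/getStandbyHosts/getPartition), the app-maintained globalLags map, and the isAlive/routeTo/maxTolerableLag helpers are placeholders for illustration only, not part of the KIP:

    KeyQueryMetadata md = streams.queryMetadataForKey(storeName, key, keySerializer);
    int partition = md.getPartition();

    // globalLags: host -> store -> partition -> offset lag, maintained by the app's own
    // gossip/heartbeat protocol outside of Streams, now including the active's lag.
    long activeLag = globalLags.get(md.getActiveHost()).get(storeName).get(partition);
    if (isAlive(md.getActiveHost()) && activeLag <= maxTolerableLag) {
      return routeTo(md.getActiveHost());
    }
    // Otherwise fall back to the most caught-up live standby within tolerance;
    // if none qualifies, the app can treat the store partition as unavailable.
    return md.getStandbyHosts().stream()
        .filter(this::isAlive)
        .filter(h -> globalLags.get(h).get(storeName).get(partition) <= maxTolerableLag)
        .min(Comparator.comparingLong(h -> globalLags.get(h).get(storeName).get(partition)))
        .map(this::routeTo)
        .orElseThrow(() -> new IllegalStateException("store partition unavailable"));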
@matthias do you agree? We can then begin the vote. On Thu, Nov 14, 2019 at 9:03 AM Navinder Brar <navinder_b...@yahoo.com.invalid> wrote: > I agree with Vinoth and John on having "allLocalStoreOffsetLags()", all > actives don't have 0 lag, as during restoring state the active might have > some lag and one of the features of this KIP is to provide an option to > query from active (which might be in restoring state). > I will update the KIP with rejected alternatives and post this will start > a vote if everyone agrees on this. > On Thursday, 14 November, 2019, 09:34:52 pm IST, John Roesler < > j...@confluent.io> wrote: > > Hi all, > > Thanks for the "reset", Vinoth. It brings some needed clarity to the > discussion. > > 10. My 2 cents: we might as well include the lags for the active > copies as well. This is a more context-free API. If we only include > standbys, this choice won't make sense to users unless they understand > that the active task cannot lag in the steady state, since it's the > source of updates. This isn't a bad thing to realize, but it's just > more mental overhead for the person who wants to list the lags for > "all local stores". > > Another reason is that we could consider also reporting the lag for > actives during recovery (when they would have non-zero lag). We don't > have to now, but if we choose to call the method "standby lags", then > we can't make this choice in the future. > > That said, it's just my opinion. I'm fine either way. > > 20. Vinoth's reply makes sense to me, fwiw. > > Beyond these two points, I'm happy with the current proposal. > > Thanks again, > -John > > On Thu, Nov 14, 2019 at 4:48 AM Vinoth Chandar <vchan...@confluent.io> > wrote: > > > > 10. I considered that. Had to pick one or the other. Can just return > > standby too and rename method to may be “allLocalStandbyOffsetLags()” to > > have it explicit. (Standby should implicitly convey that we are talking > > about stores) > > > > 20. What I meant was, we are returning HostInfo instead of > StreamsMetadata > > since thats sufficient to route query; same for “int partition “ vs topic > > partition before. Previously KeyQueryMetadata had similar structure but > > used StreamsMetadata and TopicPartition objects to convey same > information > > > > @navinder KIP is already upto date with the email I sent, except for the > > reasonings I was laying out. +1 on revisiting rejected alternatives. > > Please make the follow up changes > > > > On Wed, Nov 13, 2019 at 9:12 PM Matthias J. Sax <matth...@confluent.io> > > wrote: > > > > > Thanks for the summary Vinoth! > > > > > > I buy the overall argument. Couple of clarification questions: > > > > > > > > > 10. Why do we need to include the active stores in > > > `allLocalStoreOffsetLags()`? Would it not be simpler to just return lag > > > for standbys? > > > > > > > > > 20: What does > > > > > > > Thin the KeyQueryMetadata object to just contain the minimum > information > > > > needed. > > > > > > exaclty mean? What is the "minimum information needed" ? > > > > > > > > > @Navinder: if you agree, can you update the KIP accoringly? With all > the > > > proposals, it's hard to keep track and it would be great to have the > > > current proposal summarized in the wiki page. > > > > > > Please also update the "Rejected alternative" sections to avoid that we > > > cycle back to old proposal (including the reason _why_ they got > rejected). > > > > > > > > > Thanks a lot! 
> > > > > > > > > -Matthias > > > > > > > > > > > > On 11/13/19 7:10 PM, Vinoth Chandar wrote: > > > > Given we have had a healthy discussion on this topic for a month now > and > > > > still with many loose ends and open ended conversations, I thought It > > > would > > > > be worthwhile to take a step back and re-evaluate everything in the > > > context > > > > of the very real use-case and its specific scenarios. > > > > > > > > First, let's remind ourselves of the query routing flow of the > streams > > > > application ("app" here on) > > > > > > > > 1. queries get routed to any random streams instance in the > cluster > > > > ("router" here on) > > > > 2. router then uses Streams metadata to pick active/standby > instances > > > > for that key's store/partition > > > > 3. router instance also maintains global lag information for all > > > stores > > > > and all their partitions, by a gossip/broadcast/heartbeat protocol > > > (done > > > > outside of Streams framework), but using > KafkaStreams#allMetadata() > > > for > > > > streams instance discovery. > > > > 4. router then uses information in 2 & 3 to determine which > instance > > > to > > > > send the query to : always picks active instance if alive or the > most > > > > in-sync live standby otherwise. > > > > > > > > Few things to note : > > > > > > > > A) We choose to decouple how the lag information is obtained (control > > > > plane) from query path (data plane), since that provides more > flexibility > > > > in designing the control plane. i.e pick any or combination of > gossip, > > > > N-way broadcast, control the rate of propagation, piggybacking on > request > > > > responses > > > > B) Since the app needs to do its own control plane, talking to other > > > > instances directly for failure detection & exchanging other > metadata, we > > > > can leave the lag APIs added to KafkaStreams class itself local and > > > simply > > > > return lag for all store/partitions on that instance. > > > > C) Streams preserves its existing behavior of instances only talking > to > > > > each other through the Kafka brokers. > > > > D) Since the router treats active/standby differently, it would be > good > > > for > > > > the KafkaStreams APIs to hand them back explicitly, with no > additional > > > > logic needed for computing them. Specifically, the router only knows > two > > > > things - key and store and if we just return a > > > Collection<StreamsMetadata> > > > > back, it cannot easily tease apart active and standby. Say, a streams > > > > instance hosts the same store as both active and standby for > different > > > > partitions, matching by just storename the app will find it in both > > > active > > > > and standby lists. > > > > E) From above, we assume the global lag estimate (lag per store topic > > > > partition) are continuously reported amongst application instances > and > > > > already available on the router during step 2 above. Hence, > attaching lag > > > > APIs to StreamsMetadata is unnecessary and does not solve the needs > > > anyway. > > > > F) Currently returned StreamsMetadata object is really information > about > > > a > > > > streams instance, that is not very specific to the key being queried. > > > > Specifically, router has no knowledge of the topic partition a given > key > > > > belongs, this is needed for matching to the global lag information in > > > step > > > > 4 above (and as also the example code in the KIP showed before). 
The > > > > StreamsMetadata, since it's about the instance itself, would contain > all > > > > topic partitions and stores on that instance, not specific to the > given > > > key. > > > > G) A cleaner API would thin the amount of information returned to > > > > specifically the given key and store - which is partition the key > belongs > > > > to, active host info, list of standby host info. KeyQueryMetadata was > > > > proposed in this spirit, but still hung onto StreamsMetadata as the > > > member > > > > field(s) which leaks more than what's required to route. > > > > H) For this use case, its sufficient to just support offsetLag > initially, > > > > without needing time based lag information right away. So we > probably > > > > don't need a new top level StoreLagInfo class anymore. > > > > I) On whether the local lag information is cached within Streams or > App > > > > code, its again preferable and be necessary for the App to be able to > > > > invoke the lag api from an app thread and keep a cached state in > memory, > > > to > > > > be piggybacked with request responses. If this information is also > cached > > > > inside Streams, it does no harm anyway. > > > > > > > > Based on this reasoning, I have made the following updates to the KIP > > > > > > > > - Drop the proposed KafkaStreams#allStandbyMetadata() and > > > > KafkaStreams#allStandbyMetadataForStore() methods, with intent of > > > > introducing minimal APIs to support the usage described above > > > > - Drop the KafkaStreams#lagInfoForStore() method and just stick to > one > > > > method that returns all the local store partition's lags. > > > > - Rename allLagInfo() to allLocalStoreOffsetLags() to be explicit > that > > > the > > > > lags are local and only on offsets. > > > > - Drop StoreLagInfo class > > > > - allLocalStoreOffsetLags() returns Map<String, Map<Integer, Long>>, > > > which > > > > maps a store name to a map containing partition to offset lag info. > It > > > will > > > > include both active and standby store partitions. active always has > 0 lag > > > > since its caught up with changelog topic. > > > > - Thin the KeyQueryMetadata object to just contain the minimum > > > information > > > > needed. Rename allMetadataForKey() method variants to > > > queryMetadataForKey() > > > > variants that return KeyQueryMetadata > > > > - Propose deprecating two current methods that return metadata based > on > > > key > > > > : KafkaStreams#metadataForKey(..), to get more users to use the new > > > > queryMetadataForKey APIs > > > > - We will still have to enhance StreamsMetadata with fields for > > > > standbyTopicPartitions and standbyStateStoreNames, since that is a > core > > > > object that gets updated upon rebalance. > > > > > > > > > > > > Please let us know if this is agreeable. We will also work some of > this > > > > discussion into the background/proposed changes sections, upon > feedback. 
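To make the summary above concrete, the touched API surface would read roughly as below. Signatures are illustrative only and still subject to the vote; KeyQueryMetadata's accessor names in particular are not finalized:

    // KafkaStreams: local offset lags for every active and standby store partition on
    // this instance, keyed by store name and then partition.
    Map<String, Map<Integer, Long>> allLocalStoreOffsetLags();

    // Replace the metadataForKey(..) variants (to be deprecated) with variants that
    // return only what is needed to route a query for the given key.
    <K> KeyQueryMetadata queryMetadataForKey(String storeName, K key, Serializer<K> keySerializer);
    <K> KeyQueryMetadata queryMetadataForKey(String storeName, K key, StreamPartitioner<? super K, ?> partitioner);

    // KeyQueryMetadata, thinned to the minimum routing information:
    HostInfo getActiveHost();
    Set<HostInfo> getStandbyHosts();
    int getPartition();

    // StreamsMetadata gains standby counterparts of the existing fields:
    Set<TopicPartition> standbyTopicPartitions();
    Set<String> standbyStateStoreNames();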
> > > > > > > > > > > > On Tue, Nov 12, 2019 at 9:17 AM Vinoth Chandar < > vchan...@confluent.io> > > > > wrote: > > > > > > > >> In all, is everyone OK with > > > >> > > > >> - Dropping KeyQueryMetadata, and the allMetadataForKey() apis > > > >> - Dropping allLagInfo() from KafkaStreams class, Drop StoreLagInfo > > > class > > > >> - Add offsetLag(store, key, serializer) -> Optional<Long> & > > > >> offsetLag(store, key, partitioner) -> Optional<Long> to > StreamsMetadata > > > >> - Duplicate the current methods for standbyMetadata in > KafkaStreams : > > > >> allStandbyMetadata(), allStandbyMetadataForStore(), two variants of > > > >> standbyMetadataForKey(), > > > >> > > > >> > > > >> Responses to Guozhang: > > > >> > > > >> 1.1 Like I mentioned before, the allStandbyMetadata() and > > > >> allStandbyMetadataForStore() complement existing allMetadata() and > > > >> allMetadataForStore(), since we don't want to change behavior of > > > existing > > > >> APIs. Based on discussions so far, if we decide to drop > > > KeyQueryMetadata, > > > >> then we will need to introduce 4 equivalents for standby metadata as > > > >> Matthias mentioned. > > > >> 1.2 I am okay with pushing lag information to a method on > > > StreamsMetadata > > > >> (Personally, I won't design it like that, but happy to live with it) > > > like > > > >> what Matthias suggested. But assuming topic name <=> store name > > > equivalency > > > >> for mapping this seems like a broken API to me. If all of Streams > code > > > were > > > >> written like this, I can understand. But I don't think its the > case? I > > > >> would not be comfortable making such assumptions outside of public > APIs. > > > >>>> look into each one's standby partition / stores to tell which one > > > >> StreamsMetadata is corresponding to the instance who holds a > specific > > > key > > > >> as standby, yes, but I feel this one extra iteration is worth to > avoid > > > >> introducing a new class. > > > >> This sort of thing would lead to non-standardized/potentially buggy > > > client > > > >> implementations, for something I expect the system would hand me > > > directly. > > > >> I don't personally feel introducing a new class is so bad, to > warrant > > > the > > > >> user to do all this matching. Given the current APIs are not > explicitly > > > >> named to denote active metadata, it gives us a chance to build > something > > > >> more direct and clear IMO. If we do allMetadataForKey apis, then we > > > should > > > >> clearly separate active and standby ourselves. Alternate is separate > > > active > > > >> and standby APIs as Matthias suggests, which I can make peace with. > > > >> > > > >> 1.3 Similar as above. In Streams code, we treat topic partitions and > > > store > > > >> names separately?. > > > >> 2.1 I think most databases build replication using logical offsets, > not > > > >> time. Time lag can be a nice to have feature, but offset lag is > fully > > > >> sufficient for a lot of use-cases. > > > >> 2.2.1 We could support a lagInfoForStores() batch api. makes sense. > > > >> > > > >> > > > >> Responses to Matthias : > > > >> (100) Streams can still keep the upto date version in memory and > > > >> implementation could be for now just reading this already refreshed > > > value. > > > >> Designing the API, with intent of pushing this to the user keeps > doors > > > >> open for supporting time based lag in the future. > > > >> (101) I am not sure what the parameters of evaluating approaches > here > > > is. 
> > > >> Generally, when I am handed a Metadata object, I don't expect to > further > > > >> query it for more information semantically. I would not also force > user > > > to > > > >> make separate calls for active and standby metadata. > > > >> Well, that may be just me. So sure, we can push this into > > > StreamsMetadata > > > >> if everyone agrees! > > > >> +1 on duplicating all 4 methods for standbys in this case. > > > >> > > > >> > > > >> > > > >> > > > >> > > > >> On Tue, Nov 12, 2019 at 4:12 AM Navinder Brar > > > >> <navinder_b...@yahoo.com.invalid> wrote: > > > >> > > > >>> > > > >>> - Looking back, I agree that 2 calls to StreamsMetadata to fetch > > > >>> StreamsMetadata and then using something like ‘long > > > >>> StreamsMetadata#offsetLag(store, key)’ which Matthias suggested is > > > better > > > >>> than introducing a new public API i.e. ‘KeyQueryMetadata’. I will > > > change > > > >>> the KIP accordingly. > > > >>> - >> I am actually not even sure, why we added > > > >>> `StreamsMetadata#topicPartitions()` originally > > > >>> I think it is helpful in showing which host holds which source > topic > > > >>> partitions for /instances endpoint and when you query a key, you > need > > > to > > > >>> match the partition on which the key belongs to with the hosts > holding > > > >>> source topic partitions for that partition. Is there any other way > to > > > get > > > >>> this info? > > > >>> > > > >>> On Tuesday, 12 November, 2019, 03:55:16 am IST, Guozhang Wang < > > > >>> wangg...@gmail.com> wrote: > > > >>> > > > >>> Regarding 1.2: StreamsMetadata is 1-1 mapped to the streams > > > instances, so > > > >>> 1) allMetadata would still return the same number of > StreamsMetadata in > > > >>> collection, just that within the StreamsMetadata now you have new > APIs > > > to > > > >>> access standby partitions / stores. So I think it would not be a > > > breaking > > > >>> change to the public API to not include `allStandbyMetadata` and ` > > > >>> allStandbyMetadataForStore` but still rely on > `allMetadata(ForStore)`? > > > >>> > > > >>> Regarding 1.1: Good point about the partition number. But I'm still > > > >>> wondering if it is definitely necessary to introduce a new > > > >>> `KeyQueryMetadata` > > > >>> interface class. E.g. suppose our function signature is > > > >>> > > > >>> Collection<StreamsMetadata> allMetadataForKey > > > >>> > > > >>> When you get the collection of StreamsMetadata you need to iterate > over > > > >>> the > > > >>> collection and look into each one's standby partition / stores to > tell > > > >>> which one StreamsMetadata is corresponding to the instance who > holds a > > > >>> specific key as standby, yes, but I feel this one extra iteration > is > > > worth > > > >>> to avoid introducing a new class. > > > >>> > > > >>> > > > >>> Guozhang > > > >>> > > > >>> On Sat, Nov 9, 2019 at 10:04 PM Matthias J. Sax < > matth...@confluent.io > > > > > > > >>> wrote: > > > >>> > > > >>>> I agree, that we might want to drop the time-base lag for the > initial > > > >>>> implementation. There is no good way to get this information > without a > > > >>>> broker side change. 
> > > >>>> > > > >>>> > > > >>>> > > > >>>> (100) For the offset lag information, I don't see a reason why > the app > > > >>>> should drive when this information is updated though, because KS > will > > > >>>> update this information anyway (once per `commit.interval.ms` -- > and > > > >>>> updating it more frequently does not make sense, as it most likely > > > won't > > > >>>> change more frequently anyway). > > > >>>> > > > >>>> If you all insist that the app should drive it, I can live with > it, > > > but > > > >>>> I think it makes the API unnecessarily complex without a benefit. > > > >>>> > > > >>>> > > > >>>> > > > >>>> (101) I still don't understand why we need to have > `KeyQueryMetadata` > > > >>>> though. Note, that an instance can only report lag for it's local > > > >>>> stores, but not remote stores as it does not know to what offset a > > > >>>> remote standby has caught up to. > > > >>>> > > > >>>>> Because we needed to return the topicPartition the key belongs > to, in > > > >>>>> order to correlate with the lag information from the other set of > > > >>> APIs. > > > >>>> > > > >>>> My suggestion is to get the lag information from > `StreamsMetadata` -- > > > >>>> which partition the store belongs to can be completely > encapsulated > > > >>>> within KS as all information is local, and I don't think we need > to > > > >>>> expose it to the user at all. > > > >>>> > > > >>>> We can just add `long StreamsMetadata#offsetLag(store, key)`. If > the > > > >>>> store is local we return its lag, if it's remote we return `-1` > (ie, > > > >>>> UNKNOWN). As an alternative, we can change the return type to > > > >>>> `Optional<Long>`. This works for active and standby task alike. > > > >>>> > > > >>>> Note, that a user must verify if `StreamsMeatadata` is for itself > > > >>>> (local) or remote anyway. We only need to provide a way that > allows > > > >>>> users to distinguish between active an standby. (More below.) > > > >>>> > > > >>>> > > > >>>> I am actually not even sure, why we added > > > >>>> `StreamsMetadata#topicPartitions()` originally -- seems pretty > > > useless. > > > >>>> Can we deprecate it as side cleanup in this KIP? Or do I miss > > > something? > > > >>>> > > > >>>> > > > >>>> > > > >>>> (102) > > > >>>> > > > >>>>> There are basically 2 reasons. One is that instead of having two > > > >>>> functions, one to get StreamsMetadata for active and one for > replicas. > > > >>> We > > > >>>> are fetching both in a single call and we have a way to get only > > > active > > > >>> or > > > >>>> only replicas from the KeyQueryMetadata object(just like > isStandby() > > > and > > > >>>> isActive() discussion we had earlier) > > > >>>> > > > >>>> I understand, that we need two methods. However, I think we can > > > simplify > > > >>>> the API and not introduce `KeyQueryMetadata`, but just "duplicate" > > > all 4 > > > >>>> existing methods for standby tasks: > > > >>>> > > > >>>> // note that `standbyMetadataForKey` return a Collection in > contrast > > > to > > > >>>> existing `metadataForKey` > > > >>>> > > > >>>>> Collection<StreamsMetadata> allStandbyMetadata() > > > >>>>> Collection<StreamsMetadata> allStandbyMetadataForStore(String > > > >>>> storeName) > > > >>>>> Collection<StreamsMetadata> metadataForKey(String storeName, K > key, > > > >>>> Serializer<K> keySerializer) > > > >>>>> Collection<StreamsMetadata> metadataForKey(String storeName, K > key, > > > >>>> StreamPartitioner<? 
super K, ?> partitioner) > > > >>>> > > > >>>> Because the existing methods return all active metadata, there is > no > > > >>>> reason to return `KeyQueryMetadata` as it's more complicated to > get > > > the > > > >>>> standby metadata. With `KeyQueryMetadata` the user needs to make > more > > > >>>> calls to get the metadata: > > > >>>> > > > >>>> KafkaStreams#allMetadataForKey() > > > >>>> #getActive() > > > >>>> > > > >>>> KafkaStreams#allMetadataForKey() > > > >>>> #getStandby() > > > >>>> > > > >>>> vs: > > > >>>> > > > >>>> KafkaStreams#metadataForKey() > > > >>>> > > > >>>> KafkaStreams#standbyMetadataForKey() > > > >>>> > > > >>>> The wrapping of both within `KeyQueryMetadata` does not seem to > > > provide > > > >>>> any benefit but increase our public API surface. > > > >>>> > > > >>>> > > > >>>> > > > >>>> @Guozhang: > > > >>>> > > > >>>> (1.1. + 1.2.) From my understanding `allMetadata()` (and other > > > existing > > > >>>> methods) will only return the metadata of _active_ tasks for > backward > > > >>>> compatibility reasons. If we would return standby metadata, > existing > > > >>>> code would potentially "break" because the code might pick a > standby > > > to > > > >>>> query a key without noticing. > > > >>>> > > > >>>> > > > >>>> > > > >>>> -Matthias > > > >>>> > > > >>>> > > > >>>> On 11/8/19 6:07 AM, Navinder Brar wrote: > > > >>>>> Thanks, Guozhang for going through it again. > > > >>>>> > > > >>>>> - 1.1 & 1.2: The main point of adding topicPartition in > > > >>>> KeyQueryMetadata is not topicName, but the partition number. I > agree > > > >>>> changelog topicNames and store names will have 1-1 mapping but we > also > > > >>> need > > > >>>> the partition number of the changelog for which are calculating > the > > > lag. > > > >>>> Now we can add partition number in StreamsMetadata but it will be > > > >>>> orthogonal to the definition of StreamsMetadata i.e.- “Represents > the > > > >>> state > > > >>>> of an instance (process) in a {@link KafkaStreams} application.” > If > > > we > > > >>> add > > > >>>> partition number in this, it doesn’t stay metadata for an > instance, > > > >>> because > > > >>>> now it is storing the partition information for a key being > queried. > > > So, > > > >>>> having “KeyQueryMetadata” simplifies this as now it contains all > the > > > >>>> metadata and also changelog and partition information for which we > > > need > > > >>> to > > > >>>> calculate the lag. > > > >>>>> > > > >>>>> Another way is having another function in parallel to > metadataForKey, > > > >>>> which returns the partition number for the key being queried. But > then > > > >>> we > > > >>>> would need 2 calls to StreamsMetadataState, once to fetch > metadata and > > > >>>> another to fetch partition number. Let me know if any of these two > > > ways > > > >>>> seem more intuitive than KeyQueryMetadata then we can try to > converge > > > on > > > >>>> one. > > > >>>>> - 1.3: Again, it is required for the partition number. We can > > > drop > > > >>>> store name though. > > > >>>>> - 2.1: I think this was done in accordance with the opinion > from > > > >>> John > > > >>>> as time lag would be better implemented with a broker level > change and > > > >>>> offset change is readily implementable. @vinoth? > > > >>>>> - 2.2.1: Good point. +1 > > > >>>>> - 2.2.2: I am not well aware of it, @vinoth any comments? > > > >>>>> - 3.1: I think we have already agreed on dropping this, we > need to > > > >>>> KIP. 
Also, is there any opinion on lagInfoForStore(String > storeName) > > > vs > > > >>>> lagInfoForStore(String storeName, int partition) > > > >>>>> - 3.2: But in functions such as onAssignment(), > > > >>>> onPartitionsAssigned(), for standbyTasks also the topicPartitions > we > > > use > > > >>>> are input topic partitions and not changelog partitions. Would > this be > > > >>>> breaking from that semantics? > > > >>>>> > > > >>>>> > > > >>>>> On Thursday, 7 November, 2019, 11:33:19 pm IST, Guozhang Wang > < > > > >>>> wangg...@gmail.com> wrote: > > > >>>>> > > > >>>>> Hi Navinder, Vinoth, thanks for the updated KIP! > > > >>>>> > > > >>>>> Read through the discussions so far and made another pass on the > wiki > > > >>>> page, > > > >>>>> and here are some more comments: > > > >>>>> > > > >>>>> 1. About the public APIs: > > > >>>>> > > > >>>>> 1.1. It is not clear to me how allStandbyMetadataForStore > > > >>>>> and allStandbyMetadata would be differentiated from the original > APIs > > > >>>> given > > > >>>>> that we will augment StreamsMetadata to include both active and > > > >>> standby > > > >>>>> topic-partitions and store names, so I think we can still use > > > >>> allMetadata > > > >>>>> and allMetadataForStore to get the collection of instance > metadata > > > >>> that > > > >>>>> host the store both as active and standbys. Are there any > specific > > > use > > > >>>>> cases where we ONLY want to get the standby's metadata? And even > if > > > >>> there > > > >>>>> are, we can easily filter it out from the allMetadata / > > > >>>> allMetadataForStore > > > >>>>> right? > > > >>>>> > > > >>>>> 1.2. Similarly I'm wondering for allMetadataForKey, can we > return the > > > >>>> same > > > >>>>> type: "Collection<StreamsMetadata>" which includes 1 for active, > and > > > >>> N-1 > > > >>>>> for standbys, and callers can easily identify them by looking > inside > > > >>> the > > > >>>>> StreamsMetadata objects? In addition I feel the "topicPartition" > > > field > > > >>>>> inside "KeyQueryMetadata" is not very important since the > changelog > > > >>>>> topic-name is always 1-1 mapping to the store name, so as long > as the > > > >>>> store > > > >>>>> name matches, the changelog topic name should always match (i.e. > in > > > >>>>> the pseudo code, just checking store names should be > sufficient). If > > > >>> all > > > >>>> of > > > >>>>> the above assumption is true, I think we can save us from > introducing > > > >>> one > > > >>>>> more public class here. > > > >>>>> > > > >>>>> 1.3. Similarly in StoreLagInfo, seems not necessary to include > the > > > >>> topic > > > >>>>> partition name in addition to the store name. > > > >>>>> > > > >>>>> 2. About querying store lags: we've discussed about separating > the > > > >>>> querying > > > >>>>> of the lag information and the querying of the host information > so I > > > >>>> still > > > >>>>> support having separate APIs here. More thoughts: > > > >>>>> > > > >>>>> 2.1. Compared with offsets, I'm wondering would time-difference > be > > > >>> more > > > >>>>> intuitive for users to define the acceptable "staleness"? More > > > >>> strictly, > > > >>>>> are there any scenarios where we would actually prefer offsets > over > > > >>>>> timestamps except that the timestamps are not available? > > > >>>>> > > > >>>>> 2.2. 
I'm also a bit leaning towards not putting the burden of > > > >>>> periodically > > > >>>>> refreshing our lag and caching it (and introducing another > config) on > > > >>> the > > > >>>>> streams side but document clearly its cost and let users to > consider > > > >>> its > > > >>>>> call frequency; of course in terms of implementation there are > some > > > >>>>> optimizations we can consider: > > > >>>>> > > > >>>>> 1) for restoring active tasks, the log-end-offset is read once > since > > > >>> it > > > >>>> is > > > >>>>> not expected to change, and that offset / timestamp can be > remembered > > > >>> for > > > >>>>> lag calculation and we do not need to refresh again; > > > >>>>> 2) for standby tasks, there's a "Map<TopicPartition, Long> > > > >>>>> endOffsets(Collection<TopicPartition> partitions)" in > KafkaConsumer > > > to > > > >>>>> batch a list of topic-partitions in one round-trip, and we can > use > > > >>> that > > > >>>> to > > > >>>>> let our APIs be sth. like "lagInfoForStores(Collection<String> > > > >>>> storeNames)" > > > >>>>> to enable the batching effects. > > > >>>>> > > > >>>>> 3. Misc.: > > > >>>>> > > > >>>>> 3.1 There's a typo on the pseudo code "globalLagInforation". > Also it > > > >>>> seems > > > >>>>> not describing how that information is collected (personally I > also > > > >>> feel > > > >>>>> one "lagInfoForStores" is sufficient). > > > >>>>> 3.2 Note there's a slight semantical difference between active > and > > > >>>>> standby's "partitions" inside StreamsMetadata, for active tasks > the > > > >>>>> partitions are actually input topic partitions for the task: > some of > > > >>> them > > > >>>>> may also act as changelog topics but these are exceptional > cases; for > > > >>>>> standby tasks the "standbyTopicPartitions" are actually the > changelog > > > >>>>> topics of the task. So maybe renaming it to > > > >>> "standbyChangelogPartitions" > > > >>>> to > > > >>>>> differentiate it? > > > >>>>> > > > >>>>> > > > >>>>> Overall I think this would be a really good KIP to add to > Streams, > > > >>> thank > > > >>>>> you so much! > > > >>>>> > > > >>>>> > > > >>>>> Guozhang > > > >>>>> > > > >>>>> On Wed, Nov 6, 2019 at 8:47 PM Navinder Brar > > > >>>>> <navinder_b...@yahoo.com.invalid> wrote: > > > >>>>> > > > >>>>>> +1 on implementing offset based lag for now and push time-based > lag > > > >>> to a > > > >>>>>> later point in time when broker changes are done. Although > > > time-based > > > >>>> lag > > > >>>>>> enhances the readability, it would not be a make or break > change for > > > >>>>>> implementing this KIP. > > > >>>>>> > > > >>>>>> Vinoth has explained the role of KeyQueryMetadata, let me in > add in > > > >>> my 2 > > > >>>>>> cents as well. > > > >>>>>> - There are basically 2 reasons. One is that instead of > having > > > two > > > >>>>>> functions, one to get StreamsMetadata for active and one for > > > >>> replicas. > > > >>>> We > > > >>>>>> are fetching both in a single call and we have a way to get only > > > >>> active > > > >>>> or > > > >>>>>> only replicas from the KeyQueryMetadata object(just like > isStandby() > > > >>> and > > > >>>>>> isActive() discussion we had earlier) > > > >>>>>> - Since even after fetching the metadata now we have a > > > requirement > > > >>>> of > > > >>>>>> fetching the topicPartition for which the query came:- to fetch > lag > > > >>> for > > > >>>>>> that specific topicPartition. 
Instead of having another call to > > > fetch > > > >>>> the > > > >>>>>> partition from StreamsMetadataState we thought using one single > call > > > >>> and > > > >>>>>> fetching partition and all metadata would be better. > > > >>>>>> - Another option was to change StreamsMetadata object and add > > > >>>>>> topicPartition in that for which the query came but it doesn’t > make > > > >>>> sense > > > >>>>>> in terms of semantics as it StreamsMetadata. Also, > KeyQueryMetadata > > > >>>>>> represents all the metadata for the Key being queried, i.e. the > > > >>>> partition > > > >>>>>> it belongs to and the list of StreamsMetadata(hosts) active or > > > >>> replica > > > >>>>>> where the key could be found. > > > >>>>>> > > > >>>>>> > > > >>>>>> > > > >>>>>> > > > >>>>>> > > > >>>>>> On Thursday, 7 November, 2019, 01:53:36 am IST, Vinoth > Chandar < > > > >>>>>> vchan...@confluent.io> wrote: > > > >>>>>> > > > >>>>>> +1 to John, suggestion on Duration/Instant and dropping the > API to > > > >>>> fetch > > > >>>>>> all store's lags. However, I do think we need to return lags per > > > >>> topic > > > >>>>>> partition. So not sure if single return value would work? We > need > > > >>> some > > > >>>> new > > > >>>>>> class that holds a TopicPartition and Duration/Instant variables > > > >>>> together? > > > >>>>>> > > > >>>>>> 10) Because we needed to return the topicPartition the key > belongs > > > >>> to, > > > >>>> in > > > >>>>>> order to correlate with the lag information from the other set > of > > > >>> APIs. > > > >>>>>> Otherwise, we don't know which topic partition's lag estimate to > > > >>> use. We > > > >>>>>> tried to illustrate this on the example code. StreamsMetadata is > > > >>> simply > > > >>>>>> capturing state of a streams host/instance, where as > TopicPartition > > > >>>> depends > > > >>>>>> on the key passed in. This is a side effect of our decision to > > > >>> decouple > > > >>>> lag > > > >>>>>> based filtering on the metadata apis. > > > >>>>>> > > > >>>>>> 20) Goes back to the previous point. We needed to return > information > > > >>>> that > > > >>>>>> is key specific, at which point it seemed natural for the > > > >>>> KeyQueryMetadata > > > >>>>>> to contain active, standby, topic partition for that key. If we > > > >>> merely > > > >>>>>> returned a standbyMetadataForKey() -> > Collection<StreamsMetadata> > > > >>>> standby, > > > >>>>>> an active metadataForKey() -> StreamsMetadata, and new > > > >>>>>> getTopicPartition(key) -> topicPartition object back to the > caller, > > > >>> then > > > >>>>>> arguably you could do the same kind of correlation. IMO having > a the > > > >>>>>> KeyQueryMetadata class to encapsulate all this is a friendlier > API. > > > >>>>>> allStandbyMetadata() and allStandbyMetadataForStore() are just > > > >>> counter > > > >>>>>> parts for metadataForStore() and allMetadata() that we introduce > > > >>> mostly > > > >>>> for > > > >>>>>> consistent API semantics. (their presence implicitly could help > > > >>> denote > > > >>>>>> metadataForStore() is for active instances. Happy to drop them > if > > > >>> their > > > >>>>>> utility is not clear) > > > >>>>>> > > > >>>>>> 30) This would assume we refresh all the standby lag information > > > >>> every > > > >>>>>> time we query for that StreamsMetadata for a specific store? For > > > time > > > >>>> based > > > >>>>>> lag, this will involve fetching the tail kafka record at once > from > > > >>>> multiple > > > >>>>>> kafka topic partitions? 
I would prefer not to couple them like > this > > > >>> and > > > >>>>>> have the ability to make granular store (or even topic partition > > > >>> level) > > > >>>>>> fetches for lag information. > > > >>>>>> > > > >>>>>> 32) I actually prefer John's suggestion to let the application > drive > > > >>> the > > > >>>>>> lag fetches/updation and not have flags as the KIP current > points > > > to. > > > >>>> Are > > > >>>>>> you reexamining that position? > > > >>>>>> > > > >>>>>> On fetching lag information, +1 we could do this much more > > > >>> efficiently > > > >>>> with > > > >>>>>> a broker changes. Given I don't yet have a burning need for the > time > > > >>>> based > > > >>>>>> lag, I think we can sequence the APIs such that the offset based > > > ones > > > >>>> are > > > >>>>>> implemented first, while we have a broker side change? > > > >>>>>> Given we decoupled the offset and time based lag API, I am > willing > > > to > > > >>>> drop > > > >>>>>> the time based lag functionality (since its not needed right > away > > > >>> for my > > > >>>>>> use-case). @navinder . thoughts? > > > >>>>>> > > > >>>>>> > > > >>>>>> On Tue, Nov 5, 2019 at 11:10 PM Matthias J. Sax < > > > >>> matth...@confluent.io> > > > >>>>>> wrote: > > > >>>>>> > > > >>>>>>> Navinder, > > > >>>>>>> > > > >>>>>>> thanks for updating the KIP. Couple of follow up questions: > > > >>>>>>> > > > >>>>>>> > > > >>>>>>> (10) Why do we need to introduce the class `KeyQueryMetadata`? > > > >>>>>>> > > > >>>>>>> (20) Why do we introduce the two methods `allMetadataForKey()`? > > > >>> Would > > > >>>> it > > > >>>>>>> not be simpler to add `Collection<StreamMetatdata> > > > >>>>>>> standbyMetadataForKey(...)`. This would align with new methods > > > >>>>>>> `#allStandbyMetadata()` and `#allStandbyMetadataForStore()`? > > > >>>>>>> > > > >>>>>>> (30) Why do we need the class `StoreLagInfo` -- it seems > simpler to > > > >>>> just > > > >>>>>>> extend `StreamMetadata` with the corresponding attributes and > > > >>> methods > > > >>>>>>> (of active task, the lag would always be reported as zero) > > > >>>>>>> > > > >>>>>>> (32) Via (30) we can avoid the two new methods `#allLagInfo()` > and > > > >>>>>>> `#lagInfoForStore()`, too, reducing public API and making it > > > >>> simpler to > > > >>>>>>> use the feature. > > > >>>>>>> > > > >>>>>>> Btw: If we make `StreamMetadata` thread safe, the lag > information > > > >>> can > > > >>>> be > > > >>>>>>> updated in the background without the need that the application > > > >>>>>>> refreshes its metadata. Hence, the user can get active and/or > > > >>> standby > > > >>>>>>> metadata once, and only needs to refresh it, if a rebalance > > > >>> happened. > > > >>>>>>> > > > >>>>>>> > > > >>>>>>> About point (4) of the previous thread: I was also thinking > about > > > >>>>>>> when/how to update the time-lag information, and I agree that > we > > > >>> should > > > >>>>>>> not update it for each query. > > > >>>>>>> > > > >>>>>>> "How": That we need to fetch the last record is a little bit > > > >>>>>>> unfortunate, but I don't see any other way without a broker > change. > > > >>> One > > > >>>>>>> issue I still see is with "exactly-once" -- if transaction > markers > > > >>> are > > > >>>>>>> in the topic, the last message is not at offset "endOffset - > 1" and > > > >>> as > > > >>>>>>> multiple transaction markers might be after each other, it's > > > unclear > > > >>>> how > > > >>>>>>> to identify the offset of the last record... Thoughts? 
> > > >>>>>>> > > > >>>>>>> Hence, it might be worth to look into a broker change as a > > > potential > > > >>>>>>> future improvement. It might be possible that the broker > caches the > > > >>>>>>> latest timestamp per partition to serve this data efficiently, > > > >>> similar > > > >>>>>>> to `#endOffset()`. > > > >>>>>>> > > > >>>>>>> "When": We refresh the end-offset information based on the > > > >>>>>>> `commit.interval.ms` -- doing it more often is not really > useful, > > > >>> as > > > >>>>>>> state store caches will most likely buffer up all writes to > > > >>> changelogs > > > >>>>>>> anyway and are only flushed on commit (including a flush of the > > > >>>>>>> producer). Hence, I would suggest to update the time-lag > > > information > > > >>>>>>> based on the same strategy in the background. This way there > is no > > > >>>>>>> additional config or methods and the user does not need to > worry > > > >>> about > > > >>>>>>> it at all. > > > >>>>>>> > > > >>>>>>> To avoid refresh overhead if we don't need it (a user might > not use > > > >>> IQ > > > >>>>>>> to begin with), it might be worth to maintain an internal flag > > > >>>>>>> `updateTimeLagEnabled` that is set to `false` initially and > only > > > >>> set to > > > >>>>>>> `true` on the first call of a user to get standby-metadata. > > > >>>>>>> > > > >>>>>>> > > > >>>>>>> -Matthias > > > >>>>>>> > > > >>>>>>> > > > >>>>>>> > > > >>>>>>> On 11/4/19 5:13 PM, Vinoth Chandar wrote: > > > >>>>>>>>>> I'm having some trouble wrapping my head around what race > > > >>>> conditions > > > >>>>>>>> might occur, other than the fundamentally broken state in > which > > > >>>>>> different > > > >>>>>>>> instances are running totally different topologies. > > > >>>>>>>> 3. @both Without the topic partitions that the tasks can map > back > > > >>> to, > > > >>>>>> we > > > >>>>>>>> have to rely on topology/cluster metadata in each Streams > instance > > > >>> to > > > >>>>>> map > > > >>>>>>>> the task back. If the source topics are wild carded for e,g > then > > > >>> each > > > >>>>>>>> instance could have different source topics in topology, > until the > > > >>>> next > > > >>>>>>>> rebalance happens. You can also read my comments from here > > > >>>>>>>> > > > >>>>>>> > > > >>>>>> > > > >>>> > > > >>> > > > > https://issues.apache.org/jira/browse/KAFKA-7149?focusedCommentId=16904106&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16904106 > > > >>>>>>>> > > > >>>>>>>> > > > >>>>>>>>>> seems hard to imagine how encoding arbitrarily long topic > names > > > >>> plus > > > >>>>>> an > > > >>>>>>>> integer for the partition number could be as efficient as task > > > ids, > > > >>>>>> which > > > >>>>>>>> are just two integers. > > > >>>>>>>> 3. if you still have concerns about the efficacy of dictionary > > > >>>>>> encoding, > > > >>>>>>>> happy to engage. The link above also has some benchmark code I > > > >>> used. > > > >>>>>>>> Theoretically, we would send each topic name atleast once, so > yes > > > >>> if > > > >>>>>> you > > > >>>>>>>> compare a 10-20 character topic name + an integer to two > integers, > > > >>> it > > > >>>>>>> will > > > >>>>>>>> be more bytes. 
But its constant overhead proportional to size > of > > > >>> topic > > > >>>>>>> name > > > >>>>>>>> and with 4,8,12, partitions the size difference between > baseline > > > >>>>>>> (version 4 > > > >>>>>>>> where we just repeated topic names for each topic partition) > and > > > >>> the > > > >>>>>> two > > > >>>>>>>> approaches becomes narrow. > > > >>>>>>>> > > > >>>>>>>>>> Plus, Navinder is going to implement a bunch of protocol > code > > > >>> that > > > >>>> we > > > >>>>>>>> might just want to change when the discussion actually does > take > > > >>>> place, > > > >>>>>>> if > > > >>>>>>>> ever. > > > >>>>>>>>>> it'll just be a mental burden for everyone to remember that > we > > > >>> want > > > >>>>>> to > > > >>>>>>>> have this follow-up discussion. > > > >>>>>>>> 3. Is n't people changing same parts of code and tracking > follow > > > >>> ups a > > > >>>>>>>> common thing, we need to deal with anyway? For this KIP, is > n't > > > it > > > >>>>>>> enough > > > >>>>>>>> to reason about whether the additional map on top of the topic > > > >>>>>> dictionary > > > >>>>>>>> would incur more overhead than the sending task_ids? I don't > think > > > >>>> it's > > > >>>>>>>> case, both of them send two integers. As I see it, we can do a > > > >>>> separate > > > >>>>>>>> follow up to (re)pursue the task_id conversion and get it > working > > > >>> for > > > >>>>>>> both > > > >>>>>>>> maps within the next release? > > > >>>>>>>> > > > >>>>>>>>>> Can you elaborate on "breaking up the API"? It looks like > there > > > >>> are > > > >>>>>>>> already separate API calls in the proposal, one for time-lag, > and > > > >>>>>> another > > > >>>>>>>> for offset-lag, so are they not already broken up? > > > >>>>>>>> The current APIs (e.g lagInfoForStore) for lags return > > > StoreLagInfo > > > >>>>>>> objects > > > >>>>>>>> which has both time and offset lags. If we had separate APIs, > say > > > >>> (e.g > > > >>>>>>>> offsetLagForStore(), timeLagForStore()), we can implement > offset > > > >>>>>> version > > > >>>>>>>> using the offset lag that the streams instance already tracks > i.e > > > >>> no > > > >>>>>> need > > > >>>>>>>> for external calls. The time based lag API would incur the > kafka > > > >>> read > > > >>>>>> for > > > >>>>>>>> the timestamp. makes sense? > > > >>>>>>>> > > > >>>>>>>> Based on the discussions so far, I only see these two pending > > > >>> issues > > > >>>> to > > > >>>>>>> be > > > >>>>>>>> aligned on. Is there any other open item people want to bring > up? > > > >>>>>>>> > > > >>>>>>>> On Mon, Nov 4, 2019 at 11:24 AM Sophie Blee-Goldman < > > > >>>>>> sop...@confluent.io > > > >>>>>>>> > > > >>>>>>>> wrote: > > > >>>>>>>> > > > >>>>>>>>> Regarding 3) I'm wondering, does your concern still apply > even > > > now > > > >>>>>>>>> that the pluggable PartitionGrouper interface has been > > > deprecated? > > > >>>>>>>>> Now that we can be sure that the DefaultPartitionGrouper is > used > > > >>> to > > > >>>>>>>>> generate > > > >>>>>>>>> the taskId -> partitions mapping, we should be able to > convert > > > any > > > >>>>>>> taskId > > > >>>>>>>>> to any > > > >>>>>>>>> partitions. > > > >>>>>>>>> > > > >>>>>>>>> On Mon, Nov 4, 2019 at 11:17 AM John Roesler < > j...@confluent.io> > > > >>>>>> wrote: > > > >>>>>>>>> > > > >>>>>>>>>> Hey Vinoth, thanks for the reply! > > > >>>>>>>>>> > > > >>>>>>>>>> 3. > > > >>>>>>>>>> I get that it's not the main focus of this KIP, but if it's > ok, > > > >>> it > > > >>>>>>>>>> would be nice to hash out this point right now. 
It only > came up > > > >>>>>>>>>> because this KIP-535 is substantially extending the pattern > in > > > >>>>>>>>>> question. If we push it off until later, then the reviewers > are > > > >>>> going > > > >>>>>>>>>> to have to suspend their concerns not just while voting for > the > > > >>> KIP, > > > >>>>>>>>>> but also while reviewing the code. Plus, Navinder is going > to > > > >>>>>>>>>> implement a bunch of protocol code that we might just want > to > > > >>> change > > > >>>>>>>>>> when the discussion actually does take place, if ever. > Finally, > > > >>>> it'll > > > >>>>>>>>>> just be a mental burden for everyone to remember that we > want to > > > >>>> have > > > >>>>>>>>>> this follow-up discussion. > > > >>>>>>>>>> > > > >>>>>>>>>> It makes sense what you say... the specific assignment is > > > already > > > >>>>>>>>>> encoded in the "main" portion of the assignment, not in the > > > >>>>>> "userdata" > > > >>>>>>>>>> part. It also makes sense that it's simpler to reason about > > > >>> races if > > > >>>>>>>>>> you simply get all the information about the topics and > > > >>> partitions > > > >>>>>>>>>> directly from the assignor, rather than get the partition > number > > > >>>> from > > > >>>>>>>>>> the assignor and the topic name from your own a priori > knowledge > > > >>> of > > > >>>>>>>>>> the topology. On the other hand, I'm having some trouble > > > >>> wrapping my > > > >>>>>>>>>> head around what race conditions might occur, other than the > > > >>>>>>>>>> fundamentally broken state in which different instances are > > > >>> running > > > >>>>>>>>>> totally different topologies. Sorry, but can you remind us > of > > > the > > > >>>>>>>>>> specific condition? > > > >>>>>>>>>> > > > >>>>>>>>>> To the efficiency counterargument, it seems hard to imagine > how > > > >>>>>>>>>> encoding arbitrarily long topic names plus an integer for > the > > > >>>>>>>>>> partition number could be as efficient as task ids, which > are > > > >>> just > > > >>>>>> two > > > >>>>>>>>>> integers. It seems like this would only be true if topic > names > > > >>> were > > > >>>> 4 > > > >>>>>>>>>> characters or less. > > > >>>>>>>>>> > > > >>>>>>>>>> 4. > > > >>>>>>>>>> Yeah, clearly, it would not be a good idea to query the > metadata > > > >>>>>>>>>> before every single IQ query. I think there are plenty of > > > >>>> established > > > >>>>>>>>>> patterns for distributed database clients to follow. Can you > > > >>>>>> elaborate > > > >>>>>>>>>> on "breaking up the API"? It looks like there are already > > > >>> separate > > > >>>>>> API > > > >>>>>>>>>> calls in the proposal, one for time-lag, and another for > > > >>> offset-lag, > > > >>>>>>>>>> so are they not already broken up? FWIW, yes, I agree, the > > > offset > > > >>>> lag > > > >>>>>>>>>> is already locally known, so we don't need to build in an > extra > > > >>>>>>>>>> synchronous broker API call, just one for the time-lag. > > > >>>>>>>>>> > > > >>>>>>>>>> Thanks again for the discussion, > > > >>>>>>>>>> -John > > > >>>>>>>>>> > > > >>>>>>>>>> On Mon, Nov 4, 2019 at 11:17 AM Vinoth Chandar < > > > >>>>>> vchan...@confluent.io> > > > >>>>>>>>>> wrote: > > > >>>>>>>>>>> > > > >>>>>>>>>>> 3. Right now, we still get the topic partitions assigned > as a > > > >>> part > > > >>>>>> of > > > >>>>>>>>> the > > > >>>>>>>>>>> top level Assignment object (the one that wraps > AssignmentInfo) > > > >>> and > > > >>>>>>> use > > > >>>>>>>>>>> that to convert taskIds back. 
This list of only contains > > > >>>> assignments > > > >>>>>>>>> for > > > >>>>>>>>>>> that particular instance. Attempting to also reverse map > for > > > >>> "all" > > > >>>>>> the > > > >>>>>>>>>>> tasksIds in the streams cluster i.e all the topic > partitions in > > > >>>>>> these > > > >>>>>>>>>>> global assignment maps was what was problematic. By > explicitly > > > >>>>>> sending > > > >>>>>>>>>> the > > > >>>>>>>>>>> global assignment maps as actual topic partitions, group > > > >>>>>> coordinator > > > >>>>>>>>>> (i.e > > > >>>>>>>>>>> the leader that computes the assignment's ) is able to > > > >>> consistently > > > >>>>>>>>>> enforce > > > >>>>>>>>>>> its view of the topic metadata. Still don't think doing > such a > > > >>>>>> change > > > >>>>>>>>>> that > > > >>>>>>>>>>> forces you to reconsider semantics, is not needed to save > bits > > > >>> on > > > >>>>>>> wire. > > > >>>>>>>>>> May > > > >>>>>>>>>>> be we can discuss this separately from this KIP? > > > >>>>>>>>>>> > > > >>>>>>>>>>> 4. There needs to be some caching/interval somewhere though > > > >>> since > > > >>>> we > > > >>>>>>>>>> don't > > > >>>>>>>>>>> want to make 1 kafka read per 1 IQ potentially. But I > think its > > > >>> a > > > >>>>>>> valid > > > >>>>>>>>>>> suggestion, to make this call just synchronous and leave > the > > > >>>> caching > > > >>>>>>> or > > > >>>>>>>>>> how > > > >>>>>>>>>>> often you want to call to the application. Would it be > good to > > > >>> then > > > >>>>>>>>> break > > > >>>>>>>>>>> up the APIs for time and offset based lag? We can obtain > > > offset > > > >>>>>> based > > > >>>>>>>>>> lag > > > >>>>>>>>>>> for free? Only incur the overhead of reading kafka if we > want > > > >>> time > > > >>>>>>>>>>> based lags? > > > >>>>>>>>>>> > > > >>>>>>>>>>> On Fri, Nov 1, 2019 at 2:49 PM Sophie Blee-Goldman < > > > >>>>>>>>> sop...@confluent.io> > > > >>>>>>>>>>> wrote: > > > >>>>>>>>>>> > > > >>>>>>>>>>>> Adding on to John's response to 3), can you clarify when > and > > > >>> why > > > >>>>>>>>>> exactly we > > > >>>>>>>>>>>> cannot > > > >>>>>>>>>>>> convert between taskIds and partitions? If that's really > the > > > >>> case > > > >>>> I > > > >>>>>>>>>> don't > > > >>>>>>>>>>>> feel confident > > > >>>>>>>>>>>> that the StreamsPartitionAssignor is not full of bugs... > > > >>>>>>>>>>>> > > > >>>>>>>>>>>> It seems like it currently just encodes a list of all > > > >>> partitions > > > >>>>>> (the > > > >>>>>>>>>>>> assignment) and also > > > >>>>>>>>>>>> a list of the corresponding task ids, duplicated to ensure > > > each > > > >>>>>>>>>> partition > > > >>>>>>>>>>>> has the corresponding > > > >>>>>>>>>>>> taskId at the same offset into the list. Why is that > > > >>> problematic? > > > >>>>>>>>>>>> > > > >>>>>>>>>>>> > > > >>>>>>>>>>>> On Fri, Nov 1, 2019 at 12:39 PM John Roesler < > > > >>> j...@confluent.io> > > > >>>>>>>>>> wrote: > > > >>>>>>>>>>>> > > > >>>>>>>>>>>>> Thanks, all, for considering the points! > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> 3. Interesting. I have a vague recollection of that... > Still, > > > >>>>>>>>> though, > > > >>>>>>>>>>>>> it seems a little fishy. After all, we return the > assignments > > > >>>>>>>>>>>>> themselves as task ids, and the members have to map > these to > > > >>>> topic > > > >>>>>>>>>>>>> partitions in order to configure themselves properly. 
If > it's > > > >>> too > > > >>>>>>>>>>>>> complicated to get this right, then how do we know that > > > >>> Streams > > > >>>> is > > > >>>>>>>>>>>>> computing the correct partitions at all? > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> 4. How about just checking the log-end timestamp when you > > > call > > > >>>> the > > > >>>>>>>>>>>>> method? Then, when you get an answer, it's as fresh as it > > > >>> could > > > >>>>>>>>>>>>> possibly be. And as a user you have just one, obvious, > "knob" > > > >>> to > > > >>>>>>>>>>>>> configure how much overhead you want to devote to > checking... > > > >>> If > > > >>>>>>>>> you > > > >>>>>>>>>>>>> want to call the broker API less frequently, you just > call > > > the > > > >>>>>>>>>> Streams > > > >>>>>>>>>>>>> API less frequently. And you don't have to worry about > the > > > >>>>>>>>>>>>> relationship between your invocations of that method and > the > > > >>>>>> config > > > >>>>>>>>>>>>> setting (e.g., you'll never get a negative number, which > you > > > >>>> could > > > >>>>>>>>> if > > > >>>>>>>>>>>>> you check the log-end timestamp less frequently than you > > > check > > > >>>> the > > > >>>>>>>>>>>>> lag). > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> Thanks, > > > >>>>>>>>>>>>> -John > > > >>>>>>>>>>>>> > > > >>>>>>>>>>>>> On Thu, Oct 31, 2019 at 11:52 PM Navinder Brar > > > >>>>>>>>>>>>> <navinder_b...@yahoo.com.invalid> wrote: > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> Thanks John for going through this. > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> - +1, makes sense > > > >>>>>>>>>>>>>> - +1, no issues there > > > >>>>>>>>>>>>>> - Yeah the initial patch I had submitted for K-7149( > > > >>>>>>>>>>>>> https://github.com/apache/kafka/pull/6935) to reduce > > > >>>>>>>>> assignmentInfo > > > >>>>>>>>>>>>> object had taskIds but the merged PR had similar size > > > >>> according > > > >>>> to > > > >>>>>>>>>> Vinoth > > > >>>>>>>>>>>>> and it was simpler so if the end result is of same size, > it > > > >>> would > > > >>>>>>>>> not > > > >>>>>>>>>>>> make > > > >>>>>>>>>>>>> sense to pivot from dictionary and again move to taskIDs. > > > >>>>>>>>>>>>>> - Not sure about what a good default would be if we > don't > > > >>>>>>>>> have a > > > >>>>>>>>>>>>> configurable setting. This gives the users the > flexibility to > > > >>> the > > > >>>>>>>>>> users > > > >>>>>>>>>>>> to > > > >>>>>>>>>>>>> serve their requirements as at the end of the day it > would > > > >>> take > > > >>>>>> CPU > > > >>>>>>>>>>>> cycles. > > > >>>>>>>>>>>>> I am ok with starting it with a default and see how it > goes > > > >>> based > > > >>>>>>>>>> upon > > > >>>>>>>>>>>>> feedback. > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> Thanks, > > > >>>>>>>>>>>>>> Navinder > > > >>>>>>>>>>>>>> On Friday, 1 November, 2019, 03:46:42 am IST, Vinoth > > > >>> Chandar > > > >>>>>>>>> < > > > >>>>>>>>>>>>> vchan...@confluent.io> wrote: > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> 1. Was trying to spell them out separately. but makes > sense > > > >>>> for > > > >>>>>>>>>>>>>> readability. done > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> 2. No I immediately agree :) .. makes sense. @navinder? > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> 3. I actually attempted only sending taskIds while > working > > > on > > > >>>>>>>>>>>> KAFKA-7149. > > > >>>>>>>>>>>>>> Its non-trivial to handle edges cases resulting from > newly > > > >>> added > > > >>>>>>>>>> topic > > > >>>>>>>>>>>>>> partitions and wildcarded topic entries. 
I ended up > > > >>> simplifying > > > >>>>>>>>> it > > > >>>>>>>>>> to > > > >>>>>>>>>>>>> just > > > >>>>>>>>>>>>>> dictionary encoding the topic names to reduce size. We > can > > > >>> apply > > > >>>>>>>>>> the > > > >>>>>>>>>>>> same > > > >>>>>>>>>>>>>> technique here for this map. Additionally, we could also > > > >>>>>>>>> dictionary > > > >>>>>>>>>>>>> encode > > > >>>>>>>>>>>>>> HostInfo, given its now repeated twice. I think this > would > > > >>> save > > > >>>>>>>>>> more > > > >>>>>>>>>>>>> space > > > >>>>>>>>>>>>>> than having a flag per topic partition entry. Lmk if > you are > > > >>>> okay > > > >>>>>>>>>> with > > > >>>>>>>>>>>>>> this. > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> 4. This opens up a good discussion. Given we support > time > > > lag > > > >>>>>>>>>> estimates > > > >>>>>>>>>>>>>> also, we need to read the tail record of the changelog > > > >>>>>>>>> periodically > > > >>>>>>>>>>>>> (unlike > > > >>>>>>>>>>>>>> offset lag, which we can potentially piggyback on > metadata > > > in > > > >>>>>>>>>>>>>> ConsumerRecord IIUC). we thought we should have a config > > > that > > > >>>>>>>>>> control > > > >>>>>>>>>>>> how > > > >>>>>>>>>>>>>> often this read happens? Let me know if there is a > simple > > > >>> way to > > > >>>>>>>>>> get > > > >>>>>>>>>>>>>> timestamp value of the tail record that we are missing. > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>> On Thu, Oct 31, 2019 at 12:58 PM John Roesler < > > > >>>> j...@confluent.io > > > >>>>>>>>>> > > > >>>>>>>>>>>> wrote: > > > >>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>> Hey Navinder, > > > >>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>> Thanks for updating the KIP, it's a lot easier to see > the > > > >>>>>>>>> current > > > >>>>>>>>>>>>>>> state of the proposal now. > > > >>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>> A few remarks: > > > >>>>>>>>>>>>>>> 1. I'm sure it was just an artifact of revisions, but > you > > > >>> have > > > >>>>>>>>>> two > > > >>>>>>>>>>>>>>> separate sections where you list additions to the > > > >>> KafkaStreams > > > >>>>>>>>>>>>>>> interface. Can you consolidate those so we can see all > the > > > >>>>>>>>>> additions > > > >>>>>>>>>>>>>>> at once? > > > >>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>> 2. For messageLagEstimate, can I suggest > > > "offsetLagEstimate" > > > >>>>>>>>>> instead, > > > >>>>>>>>>>>>>>> to be clearer that we're specifically measuring a > number of > > > >>>>>>>>>> offsets? > > > >>>>>>>>>>>>>>> If you don't immediately agree, then I'd at least > point out > > > >>>>>>>>> that > > > >>>>>>>>>> we > > > >>>>>>>>>>>>>>> usually refer to elements of Kafka topics as > "records", not > > > >>>>>>>>>>>>>>> "messages", so "recordLagEstimate" might be more > > > >>> appropriate. > > > >>>>>>>>>>>>>>> > > > >>>>>>>>>>>>>>> 3. The proposal mentions adding a map of the standby > > > >>>>>>>>>> _partitions_ for > > > >>>>>>>>>>>>>>> each host to AssignmentInfo. I assume this is designed > to > > > >>>>>>>>> mirror > > > >>>>>>>>>> the > > > >>>>>>>>>>>>>>> existing "partitionsByHost" map. To keep the size of > these > > > >>>>>>>>>> metadata > > > >>>>>>>>>>>>>>> messages down, maybe we can consider making two > changes: > > > >>>>>>>>>>>>>>> (a) for both actives and standbys, encode the _task > ids_ > > > >>>>>>>>> instead > > > >>>>>>>>>> of > > > >>>>>>>>>>>>>>> _partitions_. 
On Thu, Oct 31, 2019 at 12:58 PM John Roesler <j...@confluent.io> wrote:

Hey Navinder,

Thanks for updating the KIP, it's a lot easier to see the current state of the proposal now.

A few remarks:

1. I'm sure it was just an artifact of revisions, but you have two separate sections where you list additions to the KafkaStreams interface. Can you consolidate those so we can see all the additions at once?

2. For messageLagEstimate, can I suggest "offsetLagEstimate" instead, to be clearer that we're specifically measuring a number of offsets? If you don't immediately agree, then I'd at least point out that we usually refer to elements of Kafka topics as "records", not "messages", so "recordLagEstimate" might be more appropriate.

3. The proposal mentions adding a map of the standby _partitions_ for each host to AssignmentInfo. I assume this is designed to mirror the existing "partitionsByHost" map. To keep the size of these metadata messages down, maybe we can consider making two changes:
(a) For both actives and standbys, encode the _task ids_ instead of _partitions_. Every member of the cluster has a copy of the topology, so they can convert task ids into specific partitions on their own, and task ids are only (usually) three characters.
(b) Instead of encoding two maps (hostinfo -> actives AND hostinfo -> standbys), which requires serializing all the hostinfos twice, maybe we can pack them together in one map with a structured value (hostinfo -> [actives, standbys]).
Both of these ideas still require bumping the protocol version to 6, and they basically mean we drop the existing `PartitionsByHost` field and add a new `TasksByHost` field with the structured value I mentioned (a sketch of one possible shape follows after this message block).

4. Can we avoid adding the new "lag refresh" config? The lags would necessarily be approximate anyway, so adding the config seems to increase the operational complexity of the system for little actual benefit.

Thanks for the pseudocode, by the way, it really helps visualize how these new interfaces would play together. And thanks again for the update!
-John

On Thu, Oct 31, 2019 at 2:41 PM John Roesler <j...@confluent.io> wrote:

Hey Vinoth,

I started going over the KIP again yesterday. There are a lot of updates, and I didn't finish my feedback in one day. I'm working on it now.

Thanks,
John

On Thu, Oct 31, 2019 at 11:42 AM Vinoth Chandar <vchan...@confluent.io> wrote:

Wondering if anyone has thoughts on these changes? I liked that the new metadata fetch APIs provide all the information at once with consistent naming.

Any guidance on what you would like to be discussed or fleshed out more before we call a VOTE?
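A hedged sketch of the TasksByHost shape John suggests in points 3(a) and 3(b) above: one map entry per host, carrying active and standby task ids together, so each HostInfo is serialized only once and small task ids stand in for full topic-partitions. The type and field names below are illustrative only and are not the actual AssignmentInfo wire format.

import java.util.Map;
import java.util.Set;

public final class TasksByHostSketch {

    // Task ids are tiny, e.g. "1_2" = subtopology 1, partition 2; per 3(a), every member
    // can expand a task id back into concrete TopicPartitions from its copy of the topology.
    record HostTasks(Set<String> activeTasks, Set<String> standbyTasks) { }

    record HostInfo(String host, int port) { }

    // hostinfo -> [actives, standbys], per 3(b): each HostInfo serialized once.
    static Map<HostInfo, HostTasks> exampleAssignment() {
        return Map.of(
            new HostInfo("streams-node-1", 8080),
            new HostTasks(Set.of("0_0", "0_1"), Set.of("1_0")),
            new HostInfo("streams-node-2", 8080),
            new HostTasks(Set.of("1_0", "1_1"), Set.of("0_0"))
        );
    }

    public static void main(String[] args) {
        exampleAssignment().forEach((host, tasks) ->
            System.out.println(host + " actives=" + tasks.activeTasks()
                + " standbys=" + tasks.standbyTasks()));
    }
}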
On Wed, Oct 30, 2019 at 10:31 AM Navinder Brar <navinder_b...@yahoo.com.invalid> wrote:

Hi,

We have made some edits in the KIP (https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance) after due deliberation on the agreed design to support the new query design. This includes the new public API to query offset/time lag information and other details related to querying standby tasks which have come up after thinking through the details:

- Addition of a new config, "lag.fetch.interval.ms", to configure the interval at which time/offset lag is fetched.
- Addition of a new class, StoreLagInfo, to hold the periodically obtained time/offset lag.
- Addition of two new functions in KafkaStreams, List<StoreLagInfo> allLagInfo() and List<StoreLagInfo> lagInfoForStore(String storeName), to return the lag information for an instance and for a store, respectively.
- Addition of a new class, KeyQueryMetadata. We need the topicPartition for each key to be matched against the lag API for that topic partition. One way is to add new functions and fetch the topicPartition from StreamsMetadataState, but we thought having one call that fetches both the StreamsMetadata and the topicPartition is cleaner.
- Renaming partitionsForHost to activePartitionsForHost in StreamsMetadataState, and partitionsByHostState to activePartitionsByHostState in StreamsPartitionAssignor.
- We have also added pseudocode showing how all the changes fit together and support the new querying APIs.

Please let me know if anything is pending now, before a vote can be started on this.
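A shape sketch of the additions listed in the message above, as proposed at this point in the discussion. Field names beyond the ones mentioned in the thread are assumptions for illustration, and the API that was eventually merged may differ.

import java.util.List;

interface ProposedLagApiSketch {

    // Lag snapshot for one local store partition, refreshed on the cadence governed by
    // the proposed "lag.fetch.interval.ms" config.
    final class StoreLagInfo {
        private final String storeName;
        private final int partition;            // changelog partition (assumed field)
        private final long offsetLagEstimate;   // log-end offset minus restored offset (assumed field)
        private final long timeLagEstimateMs;   // now minus tail-record timestamp (assumed field)

        StoreLagInfo(String storeName, int partition, long offsetLagEstimate, long timeLagEstimateMs) {
            this.storeName = storeName;
            this.partition = partition;
            this.offsetLagEstimate = offsetLagEstimate;
            this.timeLagEstimateMs = timeLagEstimateMs;
        }

        public String storeName() { return storeName; }
        public int partition() { return partition; }
        public long offsetLagEstimate() { return offsetLagEstimate; }
        public long timeLagEstimateMs() { return timeLagEstimateMs; }
    }

    // The two functions proposed for KafkaStreams: lag for every local store, or for one store.
    List<StoreLagInfo> allLagInfo();
    List<StoreLagInfo> lagInfoForStore(String storeName);
}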
On Saturday, 26 October, 2019, 05:41:44 pm IST, Navinder Brar <navinder_b...@yahoo.com.invalid> wrote:

>> Since there are two soft votes for separate active/standby API methods, I also change my position on that. Fine with 2 separate methods. Once we remove the lag information from these APIs, returning a List is less attractive, since the ordering has no special meaning now.

Agreed. Now that we are not returning lag, I am also sold on having two separate functions. We already have one which returns StreamsMetadata for active tasks, and now we can add another one for standbys.

On Saturday, 26 October, 2019, 03:55:16 am IST, Vinoth Chandar <vchan...@confluent.io> wrote:

+1 to Sophie's suggestion. Having lag both in terms of time and offsets is good and makes for a more complete API.

Since there are two soft votes for separate active/standby API methods, I also change my position on that. Fine with 2 separate methods. Once we remove the lag information from these APIs, returning a List is less attractive, since the ordering has no special meaning now.

>> lag in offsets vs time: Having both, as suggested by Sophie, would of course be best. What is a little unclear to me is, how in detail are we going to compute both?

@navinder maybe the next step is to flesh out these details and surface any larger changes we need to make, if need be.

Any other details we need to cover before a VOTE can be called on this?
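To illustrate the "two separate methods" option agreed above: allMetadataForStore already exists on KafkaStreams and covers actives, while the standby-side method below is only a placeholder, since the thread has not settled on its name or signature.

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.state.StreamsMetadata;

public class CandidateHosts {

    // Builds the full list of instances that could serve a store: actives first, then standbys.
    static List<StreamsMetadata> candidateInstances(KafkaStreams streams, String storeName) {
        List<StreamsMetadata> candidates = new ArrayList<>(streams.allMetadataForStore(storeName));
        candidates.addAll(standbyMetadataForStore(streams, storeName));
        return candidates;
    }

    // Placeholder for the proposed standby-side lookup; not an existing KafkaStreams method.
    static Collection<StreamsMetadata> standbyMetadataForStore(KafkaStreams streams, String storeName) {
        throw new UnsupportedOperationException("proposed API, not yet available");
    }
}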
On Fri, Oct 25, 2019 at 1:51 PM Bill Bejeck <bbej...@gmail.com> wrote:

I am jumping in a little late here.

Overall I agree with the proposal to push decision making on what/how to query into the query layer.

For point 5 from above, I'm slightly in favor of having a new method, "standbyMetadataForKey()" or something similar. Even if we return all tasks in one list, the user will still have to perform some filtering to separate the different tasks, so I don't feel making two calls is a burden, and IMHO it makes things more transparent for the user. If the final vote is for using an "isActive" field, I'm good with that as well.

Just my 2 cents.

On Fri, Oct 25, 2019 at 5:09 AM Navinder Brar <navinder_b...@yahoo.com.invalid> wrote:

I think now we are aligned on almost all the design parts. Summarising below what has been discussed above and what we have a general consensus on.

- Rather than broadcasting lag across all nodes at rebalancing/with the heartbeat, we will just return a list of all available standbys in the system; the user can make an IQ query to any of those nodes, which will return the response along with the offset and time lag. Based on that, the user can decide whether to return the response or to call another standby (a sketch of this query-layer loop follows below).
- The current metadata query frequency will not change. It will be the same as it is now, i.e. before each query.
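A sketch of the query-layer decision loop summarised in the first bullet above: try candidate instances in order and accept the first response whose reported lag is within the application's tolerance. The RemoteStoreClient, HostInfo, and StoreResponse types are hypothetical application-side helpers (e.g. over the application's own RPC layer), not Streams APIs.

import java.util.List;

public class ToleranceBasedQuery {

    interface RemoteStoreClient {
        // Queries one instance; the instance answers with the value plus its current lag,
        // or null if it is unavailable.
        StoreResponse query(HostInfo host, String storeName, String key);
    }

    record HostInfo(String host, int port) { }

    record StoreResponse(byte[] value, long offsetLag, long timeLagMs) { }

    static StoreResponse queryWithTolerance(RemoteStoreClient client,
                                            List<HostInfo> candidates, // active first, then standbys
                                            String storeName,
                                            String key,
                                            long maxAcceptableOffsetLag) {
        StoreResponse leastStale = null;
        for (HostInfo host : candidates) {
            StoreResponse response = client.query(host, storeName, key);
            if (response == null) {
                continue; // instance unavailable, try the next candidate
            }
            if (response.offsetLag() <= maxAcceptableOffsetLag) {
                return response; // fresh enough, serve it
            }
            if (leastStale == null || response.offsetLag() < leastStale.offsetLag()) {
                leastStale = response; // remember the least-stale answer seen so far
            }
        }
        // Nothing within tolerance: the application decides whether to serve the
        // least-stale answer or to treat the store partition as unavailable.
        return leastStale;
    }
}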