10/20: I think I'm aligned with John's replies as well.

Guozhang
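To make the end state concrete, here is a rough sketch (not from the KIP itself) of how a router instance could combine the proposed queryMetadataForKey() with the per-partition offset lags that instances gossip among themselves from allLocalStoreOffsetLags(). The KeyQueryMetadata accessors (partition(), activeHost(), standbyHosts()), the serializer choice, and the globalLags/liveHosts structures are illustrative assumptions, not final API:

import java.util.Comparator;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.state.HostInfo;

// globalLags: store -> partition -> host -> offset lag, maintained by the app's own
// gossip/heartbeat protocol from each instance's allLocalStoreOffsetLags() output.
HostInfo pickInstance(final KafkaStreams streams,
                      final String store,
                      final String key,
                      final Map<String, Map<Integer, Map<HostInfo, Long>>> globalLags,
                      final Set<HostInfo> liveHosts,
                      final long acceptableOffsetLag) {
    final KeyQueryMetadata md = streams.queryMetadataForKey(store, key, new StringSerializer()); // proposed API
    final Map<HostInfo, Long> lags = globalLags.get(store).get(md.partition());                  // assumed accessor

    // prefer the active copy, but only if it is alive and within the app's lag tolerance
    // (active lag can be non-zero while the task is still restoring)
    final HostInfo active = md.activeHost();                                                     // assumed accessor
    if (liveHosts.contains(active) && lags.getOrDefault(active, Long.MAX_VALUE) <= acceptableOffsetLag) {
        return active;
    }
    // otherwise route to the most caught-up live standby, or deem the store partition unavailable
    return md.standbyHosts().stream()                                                            // assumed accessor
        .filter(liveHosts::contains)
        .min(Comparator.comparingLong((HostInfo h) -> lags.getOrDefault(h, Long.MAX_VALUE)))
        .filter(h -> lags.getOrDefault(h, Long.MAX_VALUE) <= acceptableOffsetLag)
        .orElse(null); // caller treats null as "store partition unavailable"
}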
On Fri, Nov 15, 2019 at 1:45 AM Vinoth Chandar <vchan...@confluent.io> wrote: > >during restoring state the active might have some lag > > Great catch, yes.. we cannot assume lag = 0 for active. Lets report active > lag as well then. If active is too laggy, the app can then deem the store > partition unavailable (based on what the application is willing to > tolerate). > > @matthias do you agree? We can then begin the vote. > > On Thu, Nov 14, 2019 at 9:03 AM Navinder Brar > <navinder_b...@yahoo.com.invalid> wrote: > > > I agree with Vinoth and John on having "allLocalStoreOffsetLags()", all > > actives don't have 0 lag, as during restoring state the active might have > > some lag and one of the features of this KIP is to provide an option to > > query from active (which might be in restoring state). > > I will update the KIP with rejected alternatives and post this will start > > a vote if everyone agrees on this. > > On Thursday, 14 November, 2019, 09:34:52 pm IST, John Roesler < > > j...@confluent.io> wrote: > > > > Hi all, > > > > Thanks for the "reset", Vinoth. It brings some needed clarity to the > > discussion. > > > > 10. My 2 cents: we might as well include the lags for the active > > copies as well. This is a more context-free API. If we only include > > standbys, this choice won't make sense to users unless they understand > > that the active task cannot lag in the steady state, since it's the > > source of updates. This isn't a bad thing to realize, but it's just > > more mental overhead for the person who wants to list the lags for > > "all local stores". > > > > Another reason is that we could consider also reporting the lag for > > actives during recovery (when they would have non-zero lag). We don't > > have to now, but if we choose to call the method "standby lags", then > > we can't make this choice in the future. > > > > That said, it's just my opinion. I'm fine either way. > > > > 20. Vinoth's reply makes sense to me, fwiw. > > > > Beyond these two points, I'm happy with the current proposal. > > > > Thanks again, > > -John > > > > On Thu, Nov 14, 2019 at 4:48 AM Vinoth Chandar <vchan...@confluent.io> > > wrote: > > > > > > 10. I considered that. Had to pick one or the other. Can just return > > > standby too and rename method to may be “allLocalStandbyOffsetLags()” > to > > > have it explicit. (Standby should implicitly convey that we are talking > > > about stores) > > > > > > 20. What I meant was, we are returning HostInfo instead of > > StreamsMetadata > > > since thats sufficient to route query; same for “int partition “ vs > topic > > > partition before. Previously KeyQueryMetadata had similar structure but > > > used StreamsMetadata and TopicPartition objects to convey same > > information > > > > > > @navinder KIP is already upto date with the email I sent, except for > the > > > reasonings I was laying out. +1 on revisiting rejected alternatives. > > > Please make the follow up changes > > > > > > On Wed, Nov 13, 2019 at 9:12 PM Matthias J. Sax <matth...@confluent.io > > > > > wrote: > > > > > > > Thanks for the summary Vinoth! > > > > > > > > I buy the overall argument. Couple of clarification questions: > > > > > > > > > > > > 10. Why do we need to include the active stores in > > > > `allLocalStoreOffsetLags()`? Would it not be simpler to just return > lag > > > > for standbys? > > > > > > > > > > > > 20: What does > > > > > > > > > Thin the KeyQueryMetadata object to just contain the minimum > > information > > > > > needed. > > > > > > > > exaclty mean? 
What is the "minimum information needed" ? > > > > > > > > > > > > @Navinder: if you agree, can you update the KIP accoringly? With all > > the > > > > proposals, it's hard to keep track and it would be great to have the > > > > current proposal summarized in the wiki page. > > > > > > > > Please also update the "Rejected alternative" sections to avoid that > we > > > > cycle back to old proposal (including the reason _why_ they got > > rejected). > > > > > > > > > > > > Thanks a lot! > > > > > > > > > > > > -Matthias > > > > > > > > > > > > > > > > On 11/13/19 7:10 PM, Vinoth Chandar wrote: > > > > > Given we have had a healthy discussion on this topic for a month > now > > and > > > > > still with many loose ends and open ended conversations, I thought > It > > > > would > > > > > be worthwhile to take a step back and re-evaluate everything in the > > > > context > > > > > of the very real use-case and its specific scenarios. > > > > > > > > > > First, let's remind ourselves of the query routing flow of the > > streams > > > > > application ("app" here on) > > > > > > > > > > 1. queries get routed to any random streams instance in the > > cluster > > > > > ("router" here on) > > > > > 2. router then uses Streams metadata to pick active/standby > > instances > > > > > for that key's store/partition > > > > > 3. router instance also maintains global lag information for all > > > > stores > > > > > and all their partitions, by a gossip/broadcast/heartbeat > protocol > > > > (done > > > > > outside of Streams framework), but using > > KafkaStreams#allMetadata() > > > > for > > > > > streams instance discovery. > > > > > 4. router then uses information in 2 & 3 to determine which > > instance > > > > to > > > > > send the query to : always picks active instance if alive or > the > > most > > > > > in-sync live standby otherwise. > > > > > > > > > > Few things to note : > > > > > > > > > > A) We choose to decouple how the lag information is obtained > (control > > > > > plane) from query path (data plane), since that provides more > > flexibility > > > > > in designing the control plane. i.e pick any or combination of > > gossip, > > > > > N-way broadcast, control the rate of propagation, piggybacking on > > request > > > > > responses > > > > > B) Since the app needs to do its own control plane, talking to > other > > > > > instances directly for failure detection & exchanging other > > metadata, we > > > > > can leave the lag APIs added to KafkaStreams class itself local and > > > > simply > > > > > return lag for all store/partitions on that instance. > > > > > C) Streams preserves its existing behavior of instances only > talking > > to > > > > > each other through the Kafka brokers. > > > > > D) Since the router treats active/standby differently, it would be > > good > > > > for > > > > > the KafkaStreams APIs to hand them back explicitly, with no > > additional > > > > > logic needed for computing them. Specifically, the router only > knows > > two > > > > > things - key and store and if we just return a > > > > Collection<StreamsMetadata> > > > > > back, it cannot easily tease apart active and standby. Say, a > streams > > > > > instance hosts the same store as both active and standby for > > different > > > > > partitions, matching by just storename the app will find it in both > > > > active > > > > > and standby lists. 
> > > > > E) From above, we assume the global lag estimate (lag per store > topic > > > > > partition) are continuously reported amongst application instances > > and > > > > > already available on the router during step 2 above. Hence, > > attaching lag > > > > > APIs to StreamsMetadata is unnecessary and does not solve the needs > > > > anyway. > > > > > F) Currently returned StreamsMetadata object is really information > > about > > > > a > > > > > streams instance, that is not very specific to the key being > queried. > > > > > Specifically, router has no knowledge of the topic partition a > given > > key > > > > > belongs, this is needed for matching to the global lag information > in > > > > step > > > > > 4 above (and as also the example code in the KIP showed before). > The > > > > > StreamsMetadata, since it's about the instance itself, would > contain > > all > > > > > topic partitions and stores on that instance, not specific to the > > given > > > > key. > > > > > G) A cleaner API would thin the amount of information returned to > > > > > specifically the given key and store - which is partition the key > > belongs > > > > > to, active host info, list of standby host info. KeyQueryMetadata > was > > > > > proposed in this spirit, but still hung onto StreamsMetadata as the > > > > member > > > > > field(s) which leaks more than what's required to route. > > > > > H) For this use case, its sufficient to just support offsetLag > > initially, > > > > > without needing time based lag information right away. So we > > probably > > > > > don't need a new top level StoreLagInfo class anymore. > > > > > I) On whether the local lag information is cached within Streams or > > App > > > > > code, its again preferable and be necessary for the App to be able > to > > > > > invoke the lag api from an app thread and keep a cached state in > > memory, > > > > to > > > > > be piggybacked with request responses. If this information is also > > cached > > > > > inside Streams, it does no harm anyway. > > > > > > > > > > Based on this reasoning, I have made the following updates to the > KIP > > > > > > > > > > - Drop the proposed KafkaStreams#allStandbyMetadata() and > > > > > KafkaStreams#allStandbyMetadataForStore() methods, with intent of > > > > > introducing minimal APIs to support the usage described above > > > > > - Drop the KafkaStreams#lagInfoForStore() method and just stick to > > one > > > > > method that returns all the local store partition's lags. > > > > > - Rename allLagInfo() to allLocalStoreOffsetLags() to be explicit > > that > > > > the > > > > > lags are local and only on offsets. > > > > > - Drop StoreLagInfo class > > > > > - allLocalStoreOffsetLags() returns Map<String, Map<Integer, > Long>>, > > > > which > > > > > maps a store name to a map containing partition to offset lag info. > > It > > > > will > > > > > include both active and standby store partitions. active always has > > 0 lag > > > > > since its caught up with changelog topic. > > > > > - Thin the KeyQueryMetadata object to just contain the minimum > > > > information > > > > > needed. 
Rename allMetadataForKey() method variants to > > > > queryMetadataForKey() > > > > > variants that return KeyQueryMetadata > > > > > - Propose deprecating two current methods that return metadata > based > > on > > > > key > > > > > : KafkaStreams#metadataForKey(..), to get more users to use the > new > > > > > queryMetadataForKey APIs > > > > > - We will still have to enhance StreamsMetadata with fields for > > > > > standbyTopicPartitions and standbyStateStoreNames, since that is a > > core > > > > > object that gets updated upon rebalance. > > > > > > > > > > > > > > > Please let us know if this is agreeable. We will also work some of > > this > > > > > discussion into the background/proposed changes sections, upon > > feedback. > > > > > > > > > > > > > > > On Tue, Nov 12, 2019 at 9:17 AM Vinoth Chandar < > > vchan...@confluent.io> > > > > > wrote: > > > > > > > > > >> In all, is everyone OK with > > > > >> > > > > >> - Dropping KeyQueryMetadata, and the allMetadataForKey() apis > > > > >> - Dropping allLagInfo() from KafkaStreams class, Drop > StoreLagInfo > > > > class > > > > >> - Add offsetLag(store, key, serializer) -> Optional<Long> & > > > > >> offsetLag(store, key, partitioner) -> Optional<Long> to > > StreamsMetadata > > > > >> - Duplicate the current methods for standbyMetadata in > > KafkaStreams : > > > > >> allStandbyMetadata(), allStandbyMetadataForStore(), two variants > of > > > > >> standbyMetadataForKey(), > > > > >> > > > > >> > > > > >> Responses to Guozhang: > > > > >> > > > > >> 1.1 Like I mentioned before, the allStandbyMetadata() and > > > > >> allStandbyMetadataForStore() complement existing allMetadata() and > > > > >> allMetadataForStore(), since we don't want to change behavior of > > > > existing > > > > >> APIs. Based on discussions so far, if we decide to drop > > > > KeyQueryMetadata, > > > > >> then we will need to introduce 4 equivalents for standby metadata > as > > > > >> Matthias mentioned. > > > > >> 1.2 I am okay with pushing lag information to a method on > > > > StreamsMetadata > > > > >> (Personally, I won't design it like that, but happy to live with > it) > > > > like > > > > >> what Matthias suggested. But assuming topic name <=> store name > > > > equivalency > > > > >> for mapping this seems like a broken API to me. If all of Streams > > code > > > > were > > > > >> written like this, I can understand. But I don't think its the > > case? I > > > > >> would not be comfortable making such assumptions outside of public > > APIs. > > > > >>>> look into each one's standby partition / stores to tell which > one > > > > >> StreamsMetadata is corresponding to the instance who holds a > > specific > > > > key > > > > >> as standby, yes, but I feel this one extra iteration is worth to > > avoid > > > > >> introducing a new class. > > > > >> This sort of thing would lead to non-standardized/potentially > buggy > > > > client > > > > >> implementations, for something I expect the system would hand me > > > > directly. > > > > >> I don't personally feel introducing a new class is so bad, to > > warrant > > > > the > > > > >> user to do all this matching. Given the current APIs are not > > explicitly > > > > >> named to denote active metadata, it gives us a chance to build > > something > > > > >> more direct and clear IMO. If we do allMetadataForKey apis, then > we > > > > should > > > > >> clearly separate active and standby ourselves. 
Alternate is > separate > > > > active > > > > >> and standby APIs as Matthias suggests, which I can make peace > with. > > > > >> > > > > >> 1.3 Similar as above. In Streams code, we treat topic partitions > and > > > > store > > > > >> names separately?. > > > > >> 2.1 I think most databases build replication using logical > offsets, > > not > > > > >> time. Time lag can be a nice to have feature, but offset lag is > > fully > > > > >> sufficient for a lot of use-cases. > > > > >> 2.2.1 We could support a lagInfoForStores() batch api. makes > sense. > > > > >> > > > > >> > > > > >> Responses to Matthias : > > > > >> (100) Streams can still keep the upto date version in memory and > > > > >> implementation could be for now just reading this already > refreshed > > > > value. > > > > >> Designing the API, with intent of pushing this to the user keeps > > doors > > > > >> open for supporting time based lag in the future. > > > > >> (101) I am not sure what the parameters of evaluating approaches > > here > > > > is. > > > > >> Generally, when I am handed a Metadata object, I don't expect to > > further > > > > >> query it for more information semantically. I would not also force > > user > > > > to > > > > >> make separate calls for active and standby metadata. > > > > >> Well, that may be just me. So sure, we can push this into > > > > StreamsMetadata > > > > >> if everyone agrees! > > > > >> +1 on duplicating all 4 methods for standbys in this case. > > > > >> > > > > >> > > > > >> > > > > >> > > > > >> > > > > >> On Tue, Nov 12, 2019 at 4:12 AM Navinder Brar > > > > >> <navinder_b...@yahoo.com.invalid> wrote: > > > > >> > > > > >>> > > > > >>> - Looking back, I agree that 2 calls to StreamsMetadata to > fetch > > > > >>> StreamsMetadata and then using something like ‘long > > > > >>> StreamsMetadata#offsetLag(store, key)’ which Matthias suggested > is > > > > better > > > > >>> than introducing a new public API i.e. ‘KeyQueryMetadata’. I will > > > > change > > > > >>> the KIP accordingly. > > > > >>> - >> I am actually not even sure, why we added > > > > >>> `StreamsMetadata#topicPartitions()` originally > > > > >>> I think it is helpful in showing which host holds which source > > topic > > > > >>> partitions for /instances endpoint and when you query a key, you > > need > > > > to > > > > >>> match the partition on which the key belongs to with the hosts > > holding > > > > >>> source topic partitions for that partition. Is there any other > way > > to > > > > get > > > > >>> this info? > > > > >>> > > > > >>> On Tuesday, 12 November, 2019, 03:55:16 am IST, Guozhang Wang > < > > > > >>> wangg...@gmail.com> wrote: > > > > >>> > > > > >>> Regarding 1.2: StreamsMetadata is 1-1 mapped to the streams > > > > instances, so > > > > >>> 1) allMetadata would still return the same number of > > StreamsMetadata in > > > > >>> collection, just that within the StreamsMetadata now you have new > > APIs > > > > to > > > > >>> access standby partitions / stores. So I think it would not be a > > > > breaking > > > > >>> change to the public API to not include `allStandbyMetadata` and > ` > > > > >>> allStandbyMetadataForStore` but still rely on > > `allMetadata(ForStore)`? > > > > >>> > > > > >>> Regarding 1.1: Good point about the partition number. But I'm > still > > > > >>> wondering if it is definitely necessary to introduce a new > > > > >>> `KeyQueryMetadata` > > > > >>> interface class. E.g. 
suppose our function signature is > > > > >>> > > > > >>> Collection<StreamsMetadata> allMetadataForKey > > > > >>> > > > > >>> When you get the collection of StreamsMetadata you need to > iterate > > over > > > > >>> the > > > > >>> collection and look into each one's standby partition / stores to > > tell > > > > >>> which one StreamsMetadata is corresponding to the instance who > > holds a > > > > >>> specific key as standby, yes, but I feel this one extra iteration > > is > > > > worth > > > > >>> to avoid introducing a new class. > > > > >>> > > > > >>> > > > > >>> Guozhang > > > > >>> > > > > >>> On Sat, Nov 9, 2019 at 10:04 PM Matthias J. Sax < > > matth...@confluent.io > > > > > > > > > >>> wrote: > > > > >>> > > > > >>>> I agree, that we might want to drop the time-base lag for the > > initial > > > > >>>> implementation. There is no good way to get this information > > without a > > > > >>>> broker side change. > > > > >>>> > > > > >>>> > > > > >>>> > > > > >>>> (100) For the offset lag information, I don't see a reason why > > the app > > > > >>>> should drive when this information is updated though, because KS > > will > > > > >>>> update this information anyway (once per `commit.interval.ms` > -- > > and > > > > >>>> updating it more frequently does not make sense, as it most > likely > > > > won't > > > > >>>> change more frequently anyway). > > > > >>>> > > > > >>>> If you all insist that the app should drive it, I can live with > > it, > > > > but > > > > >>>> I think it makes the API unnecessarily complex without a > benefit. > > > > >>>> > > > > >>>> > > > > >>>> > > > > >>>> (101) I still don't understand why we need to have > > `KeyQueryMetadata` > > > > >>>> though. Note, that an instance can only report lag for it's > local > > > > >>>> stores, but not remote stores as it does not know to what > offset a > > > > >>>> remote standby has caught up to. > > > > >>>> > > > > >>>>> Because we needed to return the topicPartition the key belongs > > to, in > > > > >>>>> order to correlate with the lag information from the other set > of > > > > >>> APIs. > > > > >>>> > > > > >>>> My suggestion is to get the lag information from > > `StreamsMetadata` -- > > > > >>>> which partition the store belongs to can be completely > > encapsulated > > > > >>>> within KS as all information is local, and I don't think we need > > to > > > > >>>> expose it to the user at all. > > > > >>>> > > > > >>>> We can just add `long StreamsMetadata#offsetLag(store, key)`. If > > the > > > > >>>> store is local we return its lag, if it's remote we return `-1` > > (ie, > > > > >>>> UNKNOWN). As an alternative, we can change the return type to > > > > >>>> `Optional<Long>`. This works for active and standby task alike. > > > > >>>> > > > > >>>> Note, that a user must verify if `StreamsMeatadata` is for > itself > > > > >>>> (local) or remote anyway. We only need to provide a way that > > allows > > > > >>>> users to distinguish between active an standby. (More below.) > > > > >>>> > > > > >>>> > > > > >>>> I am actually not even sure, why we added > > > > >>>> `StreamsMetadata#topicPartitions()` originally -- seems pretty > > > > useless. > > > > >>>> Can we deprecate it as side cleanup in this KIP? Or do I miss > > > > something? > > > > >>>> > > > > >>>> > > > > >>>> > > > > >>>> (102) > > > > >>>> > > > > >>>>> There are basically 2 reasons. One is that instead of having > two > > > > >>>> functions, one to get StreamsMetadata for active and one for > > replicas. 
> > > > >>> We > > > > >>>> are fetching both in a single call and we have a way to get only > > > > active > > > > >>> or > > > > >>>> only replicas from the KeyQueryMetadata object(just like > > isStandby() > > > > and > > > > >>>> isActive() discussion we had earlier) > > > > >>>> > > > > >>>> I understand, that we need two methods. However, I think we can > > > > simplify > > > > >>>> the API and not introduce `KeyQueryMetadata`, but just > "duplicate" > > > > all 4 > > > > >>>> existing methods for standby tasks: > > > > >>>> > > > > >>>> // note that `standbyMetadataForKey` return a Collection in > > contrast > > > > to > > > > >>>> existing `metadataForKey` > > > > >>>> > > > > >>>>> Collection<StreamsMetadata> allStandbyMetadata() > > > > >>>>> Collection<StreamsMetadata> allStandbyMetadataForStore(String > > > > >>>> storeName) > > > > >>>>> Collection<StreamsMetadata> metadataForKey(String storeName, K > > key, > > > > >>>> Serializer<K> keySerializer) > > > > >>>>> Collection<StreamsMetadata> metadataForKey(String storeName, K > > key, > > > > >>>> StreamPartitioner<? super K, ?> partitioner) > > > > >>>> > > > > >>>> Because the existing methods return all active metadata, there > is > > no > > > > >>>> reason to return `KeyQueryMetadata` as it's more complicated to > > get > > > > the > > > > >>>> standby metadata. With `KeyQueryMetadata` the user needs to make > > more > > > > >>>> calls to get the metadata: > > > > >>>> > > > > >>>> KafkaStreams#allMetadataForKey() > > > > >>>> #getActive() > > > > >>>> > > > > >>>> KafkaStreams#allMetadataForKey() > > > > >>>> #getStandby() > > > > >>>> > > > > >>>> vs: > > > > >>>> > > > > >>>> KafkaStreams#metadataForKey() > > > > >>>> > > > > >>>> KafkaStreams#standbyMetadataForKey() > > > > >>>> > > > > >>>> The wrapping of both within `KeyQueryMetadata` does not seem to > > > > provide > > > > >>>> any benefit but increase our public API surface. > > > > >>>> > > > > >>>> > > > > >>>> > > > > >>>> @Guozhang: > > > > >>>> > > > > >>>> (1.1. + 1.2.) From my understanding `allMetadata()` (and other > > > > existing > > > > >>>> methods) will only return the metadata of _active_ tasks for > > backward > > > > >>>> compatibility reasons. If we would return standby metadata, > > existing > > > > >>>> code would potentially "break" because the code might pick a > > standby > > > > to > > > > >>>> query a key without noticing. > > > > >>>> > > > > >>>> > > > > >>>> > > > > >>>> -Matthias > > > > >>>> > > > > >>>> > > > > >>>> On 11/8/19 6:07 AM, Navinder Brar wrote: > > > > >>>>> Thanks, Guozhang for going through it again. > > > > >>>>> > > > > >>>>> - 1.1 & 1.2: The main point of adding topicPartition in > > > > >>>> KeyQueryMetadata is not topicName, but the partition number. I > > agree > > > > >>>> changelog topicNames and store names will have 1-1 mapping but > we > > also > > > > >>> need > > > > >>>> the partition number of the changelog for which are calculating > > the > > > > lag. > > > > >>>> Now we can add partition number in StreamsMetadata but it will > be > > > > >>>> orthogonal to the definition of StreamsMetadata i.e.- > “Represents > > the > > > > >>> state > > > > >>>> of an instance (process) in a {@link KafkaStreams} application.” > > If > > > > we > > > > >>> add > > > > >>>> partition number in this, it doesn’t stay metadata for an > > instance, > > > > >>> because > > > > >>>> now it is storing the partition information for a key being > > queried. 
> > > > So, > > > > >>>> having “KeyQueryMetadata” simplifies this as now it contains all > > the > > > > >>>> metadata and also changelog and partition information for which > we > > > > need > > > > >>> to > > > > >>>> calculate the lag. > > > > >>>>> > > > > >>>>> Another way is having another function in parallel to > > metadataForKey, > > > > >>>> which returns the partition number for the key being queried. > But > > then > > > > >>> we > > > > >>>> would need 2 calls to StreamsMetadataState, once to fetch > > metadata and > > > > >>>> another to fetch partition number. Let me know if any of these > two > > > > ways > > > > >>>> seem more intuitive than KeyQueryMetadata then we can try to > > converge > > > > on > > > > >>>> one. > > > > >>>>> - 1.3: Again, it is required for the partition number. We > can > > > > drop > > > > >>>> store name though. > > > > >>>>> - 2.1: I think this was done in accordance with the opinion > > from > > > > >>> John > > > > >>>> as time lag would be better implemented with a broker level > > change and > > > > >>>> offset change is readily implementable. @vinoth? > > > > >>>>> - 2.2.1: Good point. +1 > > > > >>>>> - 2.2.2: I am not well aware of it, @vinoth any comments? > > > > >>>>> - 3.1: I think we have already agreed on dropping this, we > > need to > > > > >>>> KIP. Also, is there any opinion on lagInfoForStore(String > > storeName) > > > > vs > > > > >>>> lagInfoForStore(String storeName, int partition) > > > > >>>>> - 3.2: But in functions such as onAssignment(), > > > > >>>> onPartitionsAssigned(), for standbyTasks also the > topicPartitions > > we > > > > use > > > > >>>> are input topic partitions and not changelog partitions. Would > > this be > > > > >>>> breaking from that semantics? > > > > >>>>> > > > > >>>>> > > > > >>>>> On Thursday, 7 November, 2019, 11:33:19 pm IST, Guozhang > Wang > > < > > > > >>>> wangg...@gmail.com> wrote: > > > > >>>>> > > > > >>>>> Hi Navinder, Vinoth, thanks for the updated KIP! > > > > >>>>> > > > > >>>>> Read through the discussions so far and made another pass on > the > > wiki > > > > >>>> page, > > > > >>>>> and here are some more comments: > > > > >>>>> > > > > >>>>> 1. About the public APIs: > > > > >>>>> > > > > >>>>> 1.1. It is not clear to me how allStandbyMetadataForStore > > > > >>>>> and allStandbyMetadata would be differentiated from the > original > > APIs > > > > >>>> given > > > > >>>>> that we will augment StreamsMetadata to include both active and > > > > >>> standby > > > > >>>>> topic-partitions and store names, so I think we can still use > > > > >>> allMetadata > > > > >>>>> and allMetadataForStore to get the collection of instance > > metadata > > > > >>> that > > > > >>>>> host the store both as active and standbys. Are there any > > specific > > > > use > > > > >>>>> cases where we ONLY want to get the standby's metadata? And > even > > if > > > > >>> there > > > > >>>>> are, we can easily filter it out from the allMetadata / > > > > >>>> allMetadataForStore > > > > >>>>> right? > > > > >>>>> > > > > >>>>> 1.2. Similarly I'm wondering for allMetadataForKey, can we > > return the > > > > >>>> same > > > > >>>>> type: "Collection<StreamsMetadata>" which includes 1 for > active, > > and > > > > >>> N-1 > > > > >>>>> for standbys, and callers can easily identify them by looking > > inside > > > > >>> the > > > > >>>>> StreamsMetadata objects? 
In addition I feel the > "topicPartition" > > > > field > > > > >>>>> inside "KeyQueryMetadata" is not very important since the > > changelog > > > > >>>>> topic-name is always 1-1 mapping to the store name, so as long > > as the > > > > >>>> store > > > > >>>>> name matches, the changelog topic name should always match > (i.e. > > in > > > > >>>>> the pseudo code, just checking store names should be > > sufficient). If > > > > >>> all > > > > >>>> of > > > > >>>>> the above assumption is true, I think we can save us from > > introducing > > > > >>> one > > > > >>>>> more public class here. > > > > >>>>> > > > > >>>>> 1.3. Similarly in StoreLagInfo, seems not necessary to include > > the > > > > >>> topic > > > > >>>>> partition name in addition to the store name. > > > > >>>>> > > > > >>>>> 2. About querying store lags: we've discussed about separating > > the > > > > >>>> querying > > > > >>>>> of the lag information and the querying of the host information > > so I > > > > >>>> still > > > > >>>>> support having separate APIs here. More thoughts: > > > > >>>>> > > > > >>>>> 2.1. Compared with offsets, I'm wondering would time-difference > > be > > > > >>> more > > > > >>>>> intuitive for users to define the acceptable "staleness"? More > > > > >>> strictly, > > > > >>>>> are there any scenarios where we would actually prefer offsets > > over > > > > >>>>> timestamps except that the timestamps are not available? > > > > >>>>> > > > > >>>>> 2.2. I'm also a bit leaning towards not putting the burden of > > > > >>>> periodically > > > > >>>>> refreshing our lag and caching it (and introducing another > > config) on > > > > >>> the > > > > >>>>> streams side but document clearly its cost and let users to > > consider > > > > >>> its > > > > >>>>> call frequency; of course in terms of implementation there are > > some > > > > >>>>> optimizations we can consider: > > > > >>>>> > > > > >>>>> 1) for restoring active tasks, the log-end-offset is read once > > since > > > > >>> it > > > > >>>> is > > > > >>>>> not expected to change, and that offset / timestamp can be > > remembered > > > > >>> for > > > > >>>>> lag calculation and we do not need to refresh again; > > > > >>>>> 2) for standby tasks, there's a "Map<TopicPartition, Long> > > > > >>>>> endOffsets(Collection<TopicPartition> partitions)" in > > KafkaConsumer > > > > to > > > > >>>>> batch a list of topic-partitions in one round-trip, and we can > > use > > > > >>> that > > > > >>>> to > > > > >>>>> let our APIs be sth. like "lagInfoForStores(Collection<String> > > > > >>>> storeNames)" > > > > >>>>> to enable the batching effects. > > > > >>>>> > > > > >>>>> 3. Misc.: > > > > >>>>> > > > > >>>>> 3.1 There's a typo on the pseudo code "globalLagInforation". > > Also it > > > > >>>> seems > > > > >>>>> not describing how that information is collected (personally I > > also > > > > >>> feel > > > > >>>>> one "lagInfoForStores" is sufficient). > > > > >>>>> 3.2 Note there's a slight semantical difference between active > > and > > > > >>>>> standby's "partitions" inside StreamsMetadata, for active tasks > > the > > > > >>>>> partitions are actually input topic partitions for the task: > > some of > > > > >>> them > > > > >>>>> may also act as changelog topics but these are exceptional > > cases; for > > > > >>>>> standby tasks the "standbyTopicPartitions" are actually the > > changelog > > > > >>>>> topics of the task. So maybe renaming it to > > > > >>> "standbyChangelogPartitions" > > > > >>>> to > > > > >>>>> differentiate it? 
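As a rough illustration of the batching suggested in 2.2 (2) above, the per-partition offset lag could be computed with one endOffsets() round trip. The "applied" map (changelog partition -> last offset the local store has caught up to) is an assumption about state the instance already tracks internally, not an existing public API:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

static Map<TopicPartition, Long> offsetLags(final Consumer<byte[], byte[]> consumer,
                                            final Map<TopicPartition, Long> applied) {
    // one broker round trip for all changelog partitions of interest
    final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(applied.keySet());
    final Map<TopicPartition, Long> lags = new HashMap<>();
    for (final Map.Entry<TopicPartition, Long> entry : endOffsets.entrySet()) {
        final long caughtUpTo = applied.getOrDefault(entry.getKey(), 0L);
        lags.put(entry.getKey(), Math.max(0L, entry.getValue() - caughtUpTo));
    }
    return lags;
}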
> > > > >>>>> > > > > >>>>> > > > > >>>>> Overall I think this would be a really good KIP to add to > > Streams, > > > > >>> thank > > > > >>>>> you so much! > > > > >>>>> > > > > >>>>> > > > > >>>>> Guozhang > > > > >>>>> > > > > >>>>> On Wed, Nov 6, 2019 at 8:47 PM Navinder Brar > > > > >>>>> <navinder_b...@yahoo.com.invalid> wrote: > > > > >>>>> > > > > >>>>>> +1 on implementing offset based lag for now and push > time-based > > lag > > > > >>> to a > > > > >>>>>> later point in time when broker changes are done. Although > > > > time-based > > > > >>>> lag > > > > >>>>>> enhances the readability, it would not be a make or break > > change for > > > > >>>>>> implementing this KIP. > > > > >>>>>> > > > > >>>>>> Vinoth has explained the role of KeyQueryMetadata, let me in > > add in > > > > >>> my 2 > > > > >>>>>> cents as well. > > > > >>>>>> - There are basically 2 reasons. One is that instead of > > having > > > > two > > > > >>>>>> functions, one to get StreamsMetadata for active and one for > > > > >>> replicas. > > > > >>>> We > > > > >>>>>> are fetching both in a single call and we have a way to get > only > > > > >>> active > > > > >>>> or > > > > >>>>>> only replicas from the KeyQueryMetadata object(just like > > isStandby() > > > > >>> and > > > > >>>>>> isActive() discussion we had earlier) > > > > >>>>>> - Since even after fetching the metadata now we have a > > > > requirement > > > > >>>> of > > > > >>>>>> fetching the topicPartition for which the query came:- to > fetch > > lag > > > > >>> for > > > > >>>>>> that specific topicPartition. Instead of having another call > to > > > > fetch > > > > >>>> the > > > > >>>>>> partition from StreamsMetadataState we thought using one > single > > call > > > > >>> and > > > > >>>>>> fetching partition and all metadata would be better. > > > > >>>>>> - Another option was to change StreamsMetadata object and > add > > > > >>>>>> topicPartition in that for which the query came but it doesn’t > > make > > > > >>>> sense > > > > >>>>>> in terms of semantics as it StreamsMetadata. Also, > > KeyQueryMetadata > > > > >>>>>> represents all the metadata for the Key being queried, i.e. > the > > > > >>>> partition > > > > >>>>>> it belongs to and the list of StreamsMetadata(hosts) active or > > > > >>> replica > > > > >>>>>> where the key could be found. > > > > >>>>>> > > > > >>>>>> > > > > >>>>>> > > > > >>>>>> > > > > >>>>>> > > > > >>>>>> On Thursday, 7 November, 2019, 01:53:36 am IST, Vinoth > > Chandar < > > > > >>>>>> vchan...@confluent.io> wrote: > > > > >>>>>> > > > > >>>>>> +1 to John, suggestion on Duration/Instant and dropping the > > API to > > > > >>>> fetch > > > > >>>>>> all store's lags. However, I do think we need to return lags > per > > > > >>> topic > > > > >>>>>> partition. So not sure if single return value would work? We > > need > > > > >>> some > > > > >>>> new > > > > >>>>>> class that holds a TopicPartition and Duration/Instant > variables > > > > >>>> together? > > > > >>>>>> > > > > >>>>>> 10) Because we needed to return the topicPartition the key > > belongs > > > > >>> to, > > > > >>>> in > > > > >>>>>> order to correlate with the lag information from the other set > > of > > > > >>> APIs. > > > > >>>>>> Otherwise, we don't know which topic partition's lag estimate > to > > > > >>> use. We > > > > >>>>>> tried to illustrate this on the example code. 
StreamsMetadata > is > > > > >>> simply > > > > >>>>>> capturing state of a streams host/instance, where as > > TopicPartition > > > > >>>> depends > > > > >>>>>> on the key passed in. This is a side effect of our decision to > > > > >>> decouple > > > > >>>> lag > > > > >>>>>> based filtering on the metadata apis. > > > > >>>>>> > > > > >>>>>> 20) Goes back to the previous point. We needed to return > > information > > > > >>>> that > > > > >>>>>> is key specific, at which point it seemed natural for the > > > > >>>> KeyQueryMetadata > > > > >>>>>> to contain active, standby, topic partition for that key. If > we > > > > >>> merely > > > > >>>>>> returned a standbyMetadataForKey() -> > > Collection<StreamsMetadata> > > > > >>>> standby, > > > > >>>>>> an active metadataForKey() -> StreamsMetadata, and new > > > > >>>>>> getTopicPartition(key) -> topicPartition object back to the > > caller, > > > > >>> then > > > > >>>>>> arguably you could do the same kind of correlation. IMO having > > a the > > > > >>>>>> KeyQueryMetadata class to encapsulate all this is a friendlier > > API. > > > > >>>>>> allStandbyMetadata() and allStandbyMetadataForStore() are > just > > > > >>> counter > > > > >>>>>> parts for metadataForStore() and allMetadata() that we > introduce > > > > >>> mostly > > > > >>>> for > > > > >>>>>> consistent API semantics. (their presence implicitly could > help > > > > >>> denote > > > > >>>>>> metadataForStore() is for active instances. Happy to drop them > > if > > > > >>> their > > > > >>>>>> utility is not clear) > > > > >>>>>> > > > > >>>>>> 30) This would assume we refresh all the standby lag > information > > > > >>> every > > > > >>>>>> time we query for that StreamsMetadata for a specific store? > For > > > > time > > > > >>>> based > > > > >>>>>> lag, this will involve fetching the tail kafka record at once > > from > > > > >>>> multiple > > > > >>>>>> kafka topic partitions? I would prefer not to couple them like > > this > > > > >>> and > > > > >>>>>> have the ability to make granular store (or even topic > partition > > > > >>> level) > > > > >>>>>> fetches for lag information. > > > > >>>>>> > > > > >>>>>> 32) I actually prefer John's suggestion to let the application > > drive > > > > >>> the > > > > >>>>>> lag fetches/updation and not have flags as the KIP current > > points > > > > to. > > > > >>>> Are > > > > >>>>>> you reexamining that position? > > > > >>>>>> > > > > >>>>>> On fetching lag information, +1 we could do this much more > > > > >>> efficiently > > > > >>>> with > > > > >>>>>> a broker changes. Given I don't yet have a burning need for > the > > time > > > > >>>> based > > > > >>>>>> lag, I think we can sequence the APIs such that the offset > based > > > > ones > > > > >>>> are > > > > >>>>>> implemented first, while we have a broker side change? > > > > >>>>>> Given we decoupled the offset and time based lag API, I am > > willing > > > > to > > > > >>>> drop > > > > >>>>>> the time based lag functionality (since its not needed right > > away > > > > >>> for my > > > > >>>>>> use-case). @navinder . thoughts? > > > > >>>>>> > > > > >>>>>> > > > > >>>>>> On Tue, Nov 5, 2019 at 11:10 PM Matthias J. Sax < > > > > >>> matth...@confluent.io> > > > > >>>>>> wrote: > > > > >>>>>> > > > > >>>>>>> Navinder, > > > > >>>>>>> > > > > >>>>>>> thanks for updating the KIP. Couple of follow up questions: > > > > >>>>>>> > > > > >>>>>>> > > > > >>>>>>> (10) Why do we need to introduce the class > `KeyQueryMetadata`? 
> > > > >>>>>>> > > > > >>>>>>> (20) Why do we introduce the two methods > `allMetadataForKey()`? > > > > >>> Would > > > > >>>> it > > > > >>>>>>> not be simpler to add `Collection<StreamMetatdata> > > > > >>>>>>> standbyMetadataForKey(...)`. This would align with new > methods > > > > >>>>>>> `#allStandbyMetadata()` and `#allStandbyMetadataForStore()`? > > > > >>>>>>> > > > > >>>>>>> (30) Why do we need the class `StoreLagInfo` -- it seems > > simpler to > > > > >>>> just > > > > >>>>>>> extend `StreamMetadata` with the corresponding attributes and > > > > >>> methods > > > > >>>>>>> (of active task, the lag would always be reported as zero) > > > > >>>>>>> > > > > >>>>>>> (32) Via (30) we can avoid the two new methods > `#allLagInfo()` > > and > > > > >>>>>>> `#lagInfoForStore()`, too, reducing public API and making it > > > > >>> simpler to > > > > >>>>>>> use the feature. > > > > >>>>>>> > > > > >>>>>>> Btw: If we make `StreamMetadata` thread safe, the lag > > information > > > > >>> can > > > > >>>> be > > > > >>>>>>> updated in the background without the need that the > application > > > > >>>>>>> refreshes its metadata. Hence, the user can get active and/or > > > > >>> standby > > > > >>>>>>> metadata once, and only needs to refresh it, if a rebalance > > > > >>> happened. > > > > >>>>>>> > > > > >>>>>>> > > > > >>>>>>> About point (4) of the previous thread: I was also thinking > > about > > > > >>>>>>> when/how to update the time-lag information, and I agree that > > we > > > > >>> should > > > > >>>>>>> not update it for each query. > > > > >>>>>>> > > > > >>>>>>> "How": That we need to fetch the last record is a little bit > > > > >>>>>>> unfortunate, but I don't see any other way without a broker > > change. > > > > >>> One > > > > >>>>>>> issue I still see is with "exactly-once" -- if transaction > > markers > > > > >>> are > > > > >>>>>>> in the topic, the last message is not at offset "endOffset - > > 1" and > > > > >>> as > > > > >>>>>>> multiple transaction markers might be after each other, it's > > > > unclear > > > > >>>> how > > > > >>>>>>> to identify the offset of the last record... Thoughts? > > > > >>>>>>> > > > > >>>>>>> Hence, it might be worth to look into a broker change as a > > > > potential > > > > >>>>>>> future improvement. It might be possible that the broker > > caches the > > > > >>>>>>> latest timestamp per partition to serve this data > efficiently, > > > > >>> similar > > > > >>>>>>> to `#endOffset()`. > > > > >>>>>>> > > > > >>>>>>> "When": We refresh the end-offset information based on the > > > > >>>>>>> `commit.interval.ms` -- doing it more often is not really > > useful, > > > > >>> as > > > > >>>>>>> state store caches will most likely buffer up all writes to > > > > >>> changelogs > > > > >>>>>>> anyway and are only flushed on commit (including a flush of > the > > > > >>>>>>> producer). Hence, I would suggest to update the time-lag > > > > information > > > > >>>>>>> based on the same strategy in the background. This way there > > is no > > > > >>>>>>> additional config or methods and the user does not need to > > worry > > > > >>> about > > > > >>>>>>> it at all. > > > > >>>>>>> > > > > >>>>>>> To avoid refresh overhead if we don't need it (a user might > > not use > > > > >>> IQ > > > > >>>>>>> to begin with), it might be worth to maintain an internal > flag > > > > >>>>>>> `updateTimeLagEnabled` that is set to `false` initially and > > only > > > > >>> set to > > > > >>>>>>> `true` on the first call of a user to get standby-metadata. 
> > > > >>>>>>> > > > > >>>>>>> > > > > >>>>>>> -Matthias > > > > >>>>>>> > > > > >>>>>>> > > > > >>>>>>> > > > > >>>>>>> On 11/4/19 5:13 PM, Vinoth Chandar wrote: > > > > >>>>>>>>>> I'm having some trouble wrapping my head around what race > > > > >>>> conditions > > > > >>>>>>>> might occur, other than the fundamentally broken state in > > which > > > > >>>>>> different > > > > >>>>>>>> instances are running totally different topologies. > > > > >>>>>>>> 3. @both Without the topic partitions that the tasks can map > > back > > > > >>> to, > > > > >>>>>> we > > > > >>>>>>>> have to rely on topology/cluster metadata in each Streams > > instance > > > > >>> to > > > > >>>>>> map > > > > >>>>>>>> the task back. If the source topics are wild carded for e,g > > then > > > > >>> each > > > > >>>>>>>> instance could have different source topics in topology, > > until the > > > > >>>> next > > > > >>>>>>>> rebalance happens. You can also read my comments from here > > > > >>>>>>>> > > > > >>>>>>> > > > > >>>>>> > > > > >>>> > > > > >>> > > > > > > > https://issues.apache.org/jira/browse/KAFKA-7149?focusedCommentId=16904106&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16904106 > > > > >>>>>>>> > > > > >>>>>>>> > > > > >>>>>>>>>> seems hard to imagine how encoding arbitrarily long topic > > names > > > > >>> plus > > > > >>>>>> an > > > > >>>>>>>> integer for the partition number could be as efficient as > task > > > > ids, > > > > >>>>>> which > > > > >>>>>>>> are just two integers. > > > > >>>>>>>> 3. if you still have concerns about the efficacy of > dictionary > > > > >>>>>> encoding, > > > > >>>>>>>> happy to engage. The link above also has some benchmark > code I > > > > >>> used. > > > > >>>>>>>> Theoretically, we would send each topic name atleast once, > so > > yes > > > > >>> if > > > > >>>>>> you > > > > >>>>>>>> compare a 10-20 character topic name + an integer to two > > integers, > > > > >>> it > > > > >>>>>>> will > > > > >>>>>>>> be more bytes. But its constant overhead proportional to > size > > of > > > > >>> topic > > > > >>>>>>> name > > > > >>>>>>>> and with 4,8,12, partitions the size difference between > > baseline > > > > >>>>>>> (version 4 > > > > >>>>>>>> where we just repeated topic names for each topic partition) > > and > > > > >>> the > > > > >>>>>> two > > > > >>>>>>>> approaches becomes narrow. > > > > >>>>>>>> > > > > >>>>>>>>>> Plus, Navinder is going to implement a bunch of protocol > > code > > > > >>> that > > > > >>>> we > > > > >>>>>>>> might just want to change when the discussion actually does > > take > > > > >>>> place, > > > > >>>>>>> if > > > > >>>>>>>> ever. > > > > >>>>>>>>>> it'll just be a mental burden for everyone to remember > that > > we > > > > >>> want > > > > >>>>>> to > > > > >>>>>>>> have this follow-up discussion. > > > > >>>>>>>> 3. Is n't people changing same parts of code and tracking > > follow > > > > >>> ups a > > > > >>>>>>>> common thing, we need to deal with anyway? For this KIP, is > > n't > > > > it > > > > >>>>>>> enough > > > > >>>>>>>> to reason about whether the additional map on top of the > topic > > > > >>>>>> dictionary > > > > >>>>>>>> would incur more overhead than the sending task_ids? I don't > > think > > > > >>>> it's > > > > >>>>>>>> case, both of them send two integers. 
As I see it, we can > do a > > > > >>>> separate > > > > >>>>>>>> follow up to (re)pursue the task_id conversion and get it > > working > > > > >>> for > > > > >>>>>>> both > > > > >>>>>>>> maps within the next release? > > > > >>>>>>>> > > > > >>>>>>>>>> Can you elaborate on "breaking up the API"? It looks like > > there > > > > >>> are > > > > >>>>>>>> already separate API calls in the proposal, one for > time-lag, > > and > > > > >>>>>> another > > > > >>>>>>>> for offset-lag, so are they not already broken up? > > > > >>>>>>>> The current APIs (e.g lagInfoForStore) for lags return > > > > StoreLagInfo > > > > >>>>>>> objects > > > > >>>>>>>> which has both time and offset lags. If we had separate > APIs, > > say > > > > >>> (e.g > > > > >>>>>>>> offsetLagForStore(), timeLagForStore()), we can implement > > offset > > > > >>>>>> version > > > > >>>>>>>> using the offset lag that the streams instance already > tracks > > i.e > > > > >>> no > > > > >>>>>> need > > > > >>>>>>>> for external calls. The time based lag API would incur the > > kafka > > > > >>> read > > > > >>>>>> for > > > > >>>>>>>> the timestamp. makes sense? > > > > >>>>>>>> > > > > >>>>>>>> Based on the discussions so far, I only see these two > pending > > > > >>> issues > > > > >>>> to > > > > >>>>>>> be > > > > >>>>>>>> aligned on. Is there any other open item people want to > bring > > up? > > > > >>>>>>>> > > > > >>>>>>>> On Mon, Nov 4, 2019 at 11:24 AM Sophie Blee-Goldman < > > > > >>>>>> sop...@confluent.io > > > > >>>>>>>> > > > > >>>>>>>> wrote: > > > > >>>>>>>> > > > > >>>>>>>>> Regarding 3) I'm wondering, does your concern still apply > > even > > > > now > > > > >>>>>>>>> that the pluggable PartitionGrouper interface has been > > > > deprecated? > > > > >>>>>>>>> Now that we can be sure that the DefaultPartitionGrouper is > > used > > > > >>> to > > > > >>>>>>>>> generate > > > > >>>>>>>>> the taskId -> partitions mapping, we should be able to > > convert > > > > any > > > > >>>>>>> taskId > > > > >>>>>>>>> to any > > > > >>>>>>>>> partitions. > > > > >>>>>>>>> > > > > >>>>>>>>> On Mon, Nov 4, 2019 at 11:17 AM John Roesler < > > j...@confluent.io> > > > > >>>>>> wrote: > > > > >>>>>>>>> > > > > >>>>>>>>>> Hey Vinoth, thanks for the reply! > > > > >>>>>>>>>> > > > > >>>>>>>>>> 3. > > > > >>>>>>>>>> I get that it's not the main focus of this KIP, but if > it's > > ok, > > > > >>> it > > > > >>>>>>>>>> would be nice to hash out this point right now. It only > > came up > > > > >>>>>>>>>> because this KIP-535 is substantially extending the > pattern > > in > > > > >>>>>>>>>> question. If we push it off until later, then the > reviewers > > are > > > > >>>> going > > > > >>>>>>>>>> to have to suspend their concerns not just while voting > for > > the > > > > >>> KIP, > > > > >>>>>>>>>> but also while reviewing the code. Plus, Navinder is going > > to > > > > >>>>>>>>>> implement a bunch of protocol code that we might just want > > to > > > > >>> change > > > > >>>>>>>>>> when the discussion actually does take place, if ever. > > Finally, > > > > >>>> it'll > > > > >>>>>>>>>> just be a mental burden for everyone to remember that we > > want to > > > > >>>> have > > > > >>>>>>>>>> this follow-up discussion. > > > > >>>>>>>>>> > > > > >>>>>>>>>> It makes sense what you say... the specific assignment is > > > > already > > > > >>>>>>>>>> encoded in the "main" portion of the assignment, not in > the > > > > >>>>>> "userdata" > > > > >>>>>>>>>> part. 
It also makes sense that it's simpler to reason > about > > > > >>> races if > > > > >>>>>>>>>> you simply get all the information about the topics and > > > > >>> partitions > > > > >>>>>>>>>> directly from the assignor, rather than get the partition > > number > > > > >>>> from > > > > >>>>>>>>>> the assignor and the topic name from your own a priori > > knowledge > > > > >>> of > > > > >>>>>>>>>> the topology. On the other hand, I'm having some trouble > > > > >>> wrapping my > > > > >>>>>>>>>> head around what race conditions might occur, other than > the > > > > >>>>>>>>>> fundamentally broken state in which different instances > are > > > > >>> running > > > > >>>>>>>>>> totally different topologies. Sorry, but can you remind us > > of > > > > the > > > > >>>>>>>>>> specific condition? > > > > >>>>>>>>>> > > > > >>>>>>>>>> To the efficiency counterargument, it seems hard to > imagine > > how > > > > >>>>>>>>>> encoding arbitrarily long topic names plus an integer for > > the > > > > >>>>>>>>>> partition number could be as efficient as task ids, which > > are > > > > >>> just > > > > >>>>>> two > > > > >>>>>>>>>> integers. It seems like this would only be true if topic > > names > > > > >>> were > > > > >>>> 4 > > > > >>>>>>>>>> characters or less. > > > > >>>>>>>>>> > > > > >>>>>>>>>> 4. > > > > >>>>>>>>>> Yeah, clearly, it would not be a good idea to query the > > metadata > > > > >>>>>>>>>> before every single IQ query. I think there are plenty of > > > > >>>> established > > > > >>>>>>>>>> patterns for distributed database clients to follow. Can > you > > > > >>>>>> elaborate > > > > >>>>>>>>>> on "breaking up the API"? It looks like there are already > > > > >>> separate > > > > >>>>>> API > > > > >>>>>>>>>> calls in the proposal, one for time-lag, and another for > > > > >>> offset-lag, > > > > >>>>>>>>>> so are they not already broken up? FWIW, yes, I agree, the > > > > offset > > > > >>>> lag > > > > >>>>>>>>>> is already locally known, so we don't need to build in an > > extra > > > > >>>>>>>>>> synchronous broker API call, just one for the time-lag. > > > > >>>>>>>>>> > > > > >>>>>>>>>> Thanks again for the discussion, > > > > >>>>>>>>>> -John > > > > >>>>>>>>>> > > > > >>>>>>>>>> On Mon, Nov 4, 2019 at 11:17 AM Vinoth Chandar < > > > > >>>>>> vchan...@confluent.io> > > > > >>>>>>>>>> wrote: > > > > >>>>>>>>>>> > > > > >>>>>>>>>>> 3. Right now, we still get the topic partitions assigned > > as a > > > > >>> part > > > > >>>>>> of > > > > >>>>>>>>> the > > > > >>>>>>>>>>> top level Assignment object (the one that wraps > > AssignmentInfo) > > > > >>> and > > > > >>>>>>> use > > > > >>>>>>>>>>> that to convert taskIds back. This list of only contains > > > > >>>> assignments > > > > >>>>>>>>> for > > > > >>>>>>>>>>> that particular instance. Attempting to also reverse map > > for > > > > >>> "all" > > > > >>>>>> the > > > > >>>>>>>>>>> tasksIds in the streams cluster i.e all the topic > > partitions in > > > > >>>>>> these > > > > >>>>>>>>>>> global assignment maps was what was problematic. By > > explicitly > > > > >>>>>> sending > > > > >>>>>>>>>> the > > > > >>>>>>>>>>> global assignment maps as actual topic partitions, group > > > > >>>>>> coordinator > > > > >>>>>>>>>> (i.e > > > > >>>>>>>>>>> the leader that computes the assignment's ) is able to > > > > >>> consistently > > > > >>>>>>>>>> enforce > > > > >>>>>>>>>>> its view of the topic metadata. 
Still don't think doing > > such a > > > > >>>>>> change > > > > >>>>>>>>>> that > > > > >>>>>>>>>>> forces you to reconsider semantics, is not needed to save > > bits > > > > >>> on > > > > >>>>>>> wire. > > > > >>>>>>>>>> May > > > > >>>>>>>>>>> be we can discuss this separately from this KIP? > > > > >>>>>>>>>>> > > > > >>>>>>>>>>> 4. There needs to be some caching/interval somewhere > though > > > > >>> since > > > > >>>> we > > > > >>>>>>>>>> don't > > > > >>>>>>>>>>> want to make 1 kafka read per 1 IQ potentially. But I > > think its > > > > >>> a > > > > >>>>>>> valid > > > > >>>>>>>>>>> suggestion, to make this call just synchronous and leave > > the > > > > >>>> caching > > > > >>>>>>> or > > > > >>>>>>>>>> how > > > > >>>>>>>>>>> often you want to call to the application. Would it be > > good to > > > > >>> then > > > > >>>>>>>>> break > > > > >>>>>>>>>>> up the APIs for time and offset based lag? We can obtain > > > > offset > > > > >>>>>> based > > > > >>>>>>>>>> lag > > > > >>>>>>>>>>> for free? Only incur the overhead of reading kafka if we > > want > > > > >>> time > > > > >>>>>>>>>>> based lags? > > > > >>>>>>>>>>> > > > > >>>>>>>>>>> On Fri, Nov 1, 2019 at 2:49 PM Sophie Blee-Goldman < > > > > >>>>>>>>> sop...@confluent.io> > > > > >>>>>>>>>>> wrote: > > > > >>>>>>>>>>> > > > > >>>>>>>>>>>> Adding on to John's response to 3), can you clarify when > > and > > > > >>> why > > > > >>>>>>>>>> exactly we > > > > >>>>>>>>>>>> cannot > > > > >>>>>>>>>>>> convert between taskIds and partitions? If that's really > > the > > > > >>> case > > > > >>>> I > > > > >>>>>>>>>> don't > > > > >>>>>>>>>>>> feel confident > > > > >>>>>>>>>>>> that the StreamsPartitionAssignor is not full of bugs... > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> It seems like it currently just encodes a list of all > > > > >>> partitions > > > > >>>>>> (the > > > > >>>>>>>>>>>> assignment) and also > > > > >>>>>>>>>>>> a list of the corresponding task ids, duplicated to > ensure > > > > each > > > > >>>>>>>>>> partition > > > > >>>>>>>>>>>> has the corresponding > > > > >>>>>>>>>>>> taskId at the same offset into the list. Why is that > > > > >>> problematic? > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>> On Fri, Nov 1, 2019 at 12:39 PM John Roesler < > > > > >>> j...@confluent.io> > > > > >>>>>>>>>> wrote: > > > > >>>>>>>>>>>> > > > > >>>>>>>>>>>>> Thanks, all, for considering the points! > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>> 3. Interesting. I have a vague recollection of that... > > Still, > > > > >>>>>>>>> though, > > > > >>>>>>>>>>>>> it seems a little fishy. After all, we return the > > assignments > > > > >>>>>>>>>>>>> themselves as task ids, and the members have to map > > these to > > > > >>>> topic > > > > >>>>>>>>>>>>> partitions in order to configure themselves properly. > If > > it's > > > > >>> too > > > > >>>>>>>>>>>>> complicated to get this right, then how do we know that > > > > >>> Streams > > > > >>>> is > > > > >>>>>>>>>>>>> computing the correct partitions at all? > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>> 4. How about just checking the log-end timestamp when > you > > > > call > > > > >>>> the > > > > >>>>>>>>>>>>> method? Then, when you get an answer, it's as fresh as > it > > > > >>> could > > > > >>>>>>>>>>>>> possibly be. And as a user you have just one, obvious, > > "knob" > > > > >>> to > > > > >>>>>>>>>>>>> configure how much overhead you want to devote to > > checking... 
> > > > >>> If > > > > >>>>>>>>> you > > > > >>>>>>>>>>>>> want to call the broker API less frequently, you just > > call > > > > the > > > > >>>>>>>>>> Streams > > > > >>>>>>>>>>>>> API less frequently. And you don't have to worry about > > the > > > > >>>>>>>>>>>>> relationship between your invocations of that method > and > > the > > > > >>>>>> config > > > > >>>>>>>>>>>>> setting (e.g., you'll never get a negative number, > which > > you > > > > >>>> could > > > > >>>>>>>>> if > > > > >>>>>>>>>>>>> you check the log-end timestamp less frequently than > you > > > > check > > > > >>>> the > > > > >>>>>>>>>>>>> lag). > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>> Thanks, > > > > >>>>>>>>>>>>> -John > > > > >>>>>>>>>>>>> > > > > >>>>>>>>>>>>> On Thu, Oct 31, 2019 at 11:52 PM Navinder Brar > > > > >>>>>>>>>>>>> <navinder_b...@yahoo.com.invalid> wrote: > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> Thanks John for going through this. > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> - +1, makes sense > > > > >>>>>>>>>>>>>> - +1, no issues there > > > > >>>>>>>>>>>>>> - Yeah the initial patch I had submitted for > K-7149( > > > > >>>>>>>>>>>>> https://github.com/apache/kafka/pull/6935) to reduce > > > > >>>>>>>>> assignmentInfo > > > > >>>>>>>>>>>>> object had taskIds but the merged PR had similar size > > > > >>> according > > > > >>>> to > > > > >>>>>>>>>> Vinoth > > > > >>>>>>>>>>>>> and it was simpler so if the end result is of same > size, > > it > > > > >>> would > > > > >>>>>>>>> not > > > > >>>>>>>>>>>> make > > > > >>>>>>>>>>>>> sense to pivot from dictionary and again move to > taskIDs. > > > > >>>>>>>>>>>>>> - Not sure about what a good default would be if we > > don't > > > > >>>>>>>>> have a > > > > >>>>>>>>>>>>> configurable setting. This gives the users the > > flexibility to > > > > >>> the > > > > >>>>>>>>>> users > > > > >>>>>>>>>>>> to > > > > >>>>>>>>>>>>> serve their requirements as at the end of the day it > > would > > > > >>> take > > > > >>>>>> CPU > > > > >>>>>>>>>>>> cycles. > > > > >>>>>>>>>>>>> I am ok with starting it with a default and see how it > > goes > > > > >>> based > > > > >>>>>>>>>> upon > > > > >>>>>>>>>>>>> feedback. > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> Thanks, > > > > >>>>>>>>>>>>>> Navinder > > > > >>>>>>>>>>>>>> On Friday, 1 November, 2019, 03:46:42 am IST, > Vinoth > > > > >>> Chandar > > > > >>>>>>>>> < > > > > >>>>>>>>>>>>> vchan...@confluent.io> wrote: > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> 1. Was trying to spell them out separately. but makes > > sense > > > > >>>> for > > > > >>>>>>>>>>>>>> readability. done > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> 2. No I immediately agree :) .. makes sense. > @navinder? > > > > >>>>>>>>>>>>>> > > > > >>>>>>>>>>>>>> 3. I actually attempted only sending taskIds while > > working > > > > on > > > > >>>>>>>>>>>> KAFKA-7149. > > > > >>>>>>>>>>>>>> Its non-trivial to handle edges cases resulting from > > newly > > > > >>> added > > > > >>>>>>>>>> topic > > > > >>>>>>>>>>>>>> partitions and wildcarded topic entries. I ended up > > > > >>> simplifying > > > > >>>>>>>>> it > > > > >>>>>>>>>> to > > > > >>>>>>>>>>>>> just > > > > >>>>>>>>>>>>>> dictionary encoding the topic names to reduce size. We > > can > > > > >>> apply > > > > >>>>>>>>>> the > > > > >>>>>>>>>>>> same > > > > >>>>>>>>>>>>>> technique here for this map. Additionally, we could > also > > > > >>>>>>>>> dictionary > > > > >>>>>>>>>>>>> encode > > > > >>>>>>>>>>>>>> HostInfo, given its now repeated twice. 
I think this would save more space than having a flag per topic partition entry. Lmk if you are okay with this.

4. This opens up a good discussion. Given that we support time-lag estimates as well, we need to read the tail record of the changelog periodically (unlike offset lag, which we can potentially piggyback on metadata in ConsumerRecord, IIUC). That's why we thought we should have a config that controls how often this read happens. Let me know if there is a simple way to get the timestamp of the tail record that we are missing.

On Thu, Oct 31, 2019 at 12:58 PM John Roesler <j...@confluent.io> wrote:

Hey Navinder,

Thanks for updating the KIP, it's a lot easier to see the current state of the proposal now.

A few remarks:

1. I'm sure it was just an artifact of revisions, but you have two separate sections where you list additions to the KafkaStreams interface. Can you consolidate those so we can see all the additions at once?

2. For messageLagEstimate, can I suggest "offsetLagEstimate" instead, to be clearer that we're specifically measuring a number of offsets? If you don't immediately agree, then I'd at least point out that we usually refer to elements of Kafka topics as "records", not "messages", so "recordLagEstimate" might be more appropriate.

3. The proposal mentions adding a map of the standby _partitions_ for each host to AssignmentInfo. I assume this is designed to mirror the existing "partitionsByHost" map. To keep the size of these metadata messages down, maybe we can consider making two changes:
(a) for both actives and standbys, encode the _task ids_ instead of _partitions_. Every member of the cluster has a copy of the topology, so they can convert task ids into specific partitions on their own, and task ids are only (usually) three characters.
(b) instead of encoding two maps (hostinfo -> actives AND hostinfo -> standbys), which requires serializing all the hostinfos twice, maybe we can pack them together in one map with a structured value (hostinfo -> [actives, standbys]).
Both of these ideas still require bumping the protocol version to 6, and they basically mean we drop the existing `PartitionsByHost` field and add a new `TasksByHost` field with the structured value I mentioned.

4. Can we avoid adding the new "lag refresh" config? The lags would necessarily be approximate anyway, so adding the config seems to increase the operational complexity of the system for little actual benefit.

Thanks for the pseudocode, by the way, it really helps visualize how these new interfaces would play together. And thanks again for the update!
-John

On Thu, Oct 31, 2019 at 2:41 PM John Roesler <j...@confluent.io> wrote:

Hey Vinoth,

I started going over the KIP again yesterday. There are a lot of updates, and I didn't finish my feedback in one day. I'm working on it now.

Thanks,
John

On Thu, Oct 31, 2019 at 11:42 AM Vinoth Chandar <vchan...@confluent.io> wrote:

Wondering if anyone has thoughts on these changes? I liked that the new metadata fetch APIs provide all the information at once with consistent naming.

Any guidance on what you would like to be discussed or fleshed out more before we call a VOTE?
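To illustrate suggestion (b) above, one possible shape for the structured value serializes each HostInfo once and lets receivers expand task ids into partitions from their own copy of the topology. HostInfo and TaskId are existing Streams classes; TasksByHost/HostTasks are hypothetical names used only for illustration, not the KIP's final wire format:

import java.util.Map;
import java.util.Set;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.state.HostInfo;

final class TasksByHost {

    static final class HostTasks {
        final Set<TaskId> activeTasks;   // tasks this instance runs as active
        final Set<TaskId> standbyTasks;  // tasks this instance keeps as standbys
        HostTasks(Set<TaskId> activeTasks, Set<TaskId> standbyTasks) {
            this.activeTasks = activeTasks;
            this.standbyTasks = standbyTasks;
        }
    }

    // One entry per instance, replacing the two separate host -> partitions maps.
    final Map<HostInfo, HostTasks> assignment;

    TasksByHost(Map<HostInfo, HostTasks> assignment) {
        this.assignment = assignment;
    }
}

Since every member already knows the topology, it can expand each TaskId back into concrete TopicPartitions locally, which is what makes the compact task-id encoding viable.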
On Wed, Oct 30, 2019 at 10:31 AM Navinder Brar <navinder_b...@yahoo.com.invalid> wrote:

Hi,

We have made some edits in the KIP (https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance) after due deliberation on the agreed design to support the new query design. This includes the new public API to query offset/time lag information and other details related to querying standby tasks, which have come up after thinking through the details.

- Addition of a new config, "lag.fetch.interval.ms", to configure the interval of the time/offset lag refresh.
- Addition of a new class, StoreLagInfo, to store the periodically obtained time/offset lag.
- Addition of two new functions in KafkaStreams, List<StoreLagInfo> allLagInfo() and List<StoreLagInfo> lagInfoForStore(String storeName), to return the lag information for an instance and for a store, respectively.
- Addition of a new class, KeyQueryMetadata. We need the topicPartition for each key to be matched with the lag API for that topic partition. One way is to add new functions and fetch the topicPartition from StreamsMetadataState, but we thought having one call that fetches both the StreamsMetadata and the topicPartition is cleaner.
- Renaming partitionsForHost to activePartitionsForHost in StreamsMetadataState, and partitionsByHostState to activePartitionsByHostState in StreamsPartitionAssignor.
- We have also added pseudocode showing how all the changes will fit together and support the new querying APIs.

Please let me know if anything is pending now, before a vote can be started on this.
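As a rough sketch of how these additions were meant to compose, a serving layer could consult the (proposed) lagInfoForStore() before answering an IQ request from a standby or restoring copy. The names follow this message and were revised later in the thread, so the types below are stand-ins rather than real API:

import java.util.List;
import java.util.Optional;

// Placeholder shapes for the classes proposed above.
interface StoreLagInfo {
    String storeName();
    int partition();
    long offsetLagEstimate();
    long timeLagEstimateMs();
}

interface StreamsLagApi {
    List<StoreLagInfo> lagInfoForStore(String storeName);  // proposed KafkaStreams addition
}

class LocalServingDecision {
    private final StreamsLagApi streams;
    private final long maxOffsetLag;

    LocalServingDecision(StreamsLagApi streams, long maxOffsetLag) {
        this.streams = streams;
        this.maxOffsetLag = maxOffsetLag;
    }

    // Before answering an IQ request against a local (possibly standby or
    // restoring) copy, check how far behind the changelog this store partition
    // is, so the query layer can decide whether a stale answer is acceptable.
    boolean freshEnough(String storeName, int partition) {
        Optional<StoreLagInfo> lag = streams.lagInfoForStore(storeName).stream()
                .filter(l -> l.partition() == partition)
                .findFirst();
        return lag.map(l -> l.offsetLagEstimate() <= maxOffsetLag).orElse(false);
    }
}

Whether a given lag is "fresh enough" stays an application decision, which is the thrust of the overall proposal.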
On Saturday, 26 October, 2019, 05:41:44 pm IST, Navinder Brar <navinder_b...@yahoo.com.invalid> wrote:

>> Since there are two soft votes for separate active/standby API methods, I also change my position on that. Fine with 2 separate methods. Once we remove the lag information from these APIs, returning a List is less attractive, since the ordering has no special meaning now.

Agreed. Now that we are not returning lag, I am also sold on having two separate functions. We already have one which returns streamsMetadata for active tasks, and now we can add another one for standbys.

On Saturday, 26 October, 2019, 03:55:16 am IST, Vinoth Chandar <vchan...@confluent.io> wrote:

+1 to Sophie's suggestion. Having lag both in terms of time and offsets is good and makes for a more complete API.

Since there are two soft votes for separate active/standby API methods, I also change my position on that. Fine with 2 separate methods. Once we remove the lag information from these APIs, returning a List is less attractive, since the ordering has no special meaning now.

>> lag in offsets vs time: Having both, as suggested by Sophie, would of course be best. What is a little unclear to me is, how in detail are we going to compute both?

@navinder maybe the next step is to flesh out these details and surface any larger changes we need to make, if need be.

Any other details we need to cover, before a VOTE can be called on this?
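On the "how do we compute both" question, one way to do it with the plain consumer API, assuming the instance knows the changelog offset it has restored up to (restoredOffset below): offset lag needs only the end offset, while time lag additionally reads the tail record for its timestamp, which is the extra broker read discussed earlier in the thread. This is just a sketch of the trade-off, not the KIP's implementation:

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

final class ChangelogLag {

    // Offset lag: how many changelog records the local copy is behind.
    static long offsetLag(Consumer<byte[], byte[]> consumer,
                          TopicPartition changelog,
                          long restoredOffset) {
        Map<TopicPartition, Long> end = consumer.endOffsets(Collections.singletonList(changelog));
        return Math.max(0L, end.get(changelog) - restoredOffset);
    }

    // Time lag: age of the newest changelog record not yet applied locally.
    static long timeLagMs(Consumer<byte[], byte[]> consumer,
                          TopicPartition changelog,
                          long restoredOffset) {
        long endOffset = consumer.endOffsets(Collections.singletonList(changelog)).get(changelog);
        if (endOffset <= restoredOffset) {
            return 0L;  // fully caught up
        }
        // Read the tail record to learn the timestamp of the latest update.
        consumer.assign(Collections.singletonList(changelog));
        consumer.seek(changelog, endOffset - 1);
        for (ConsumerRecord<byte[], byte[]> rec : consumer.poll(Duration.ofSeconds(1)).records(changelog)) {
            return Math.max(0L, System.currentTimeMillis() - rec.timestamp());
        }
        return Long.MAX_VALUE;  // tail record could not be read in time
    }
}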
On Fri, Oct 25, 2019 at 1:51 PM Bill Bejeck <bbej...@gmail.com> wrote:

I am jumping in a little late here.

Overall I agree with the proposal to push the decision making on what/how to query into the query layer.

For point 5 from above, I'm slightly in favor of having a new method, "standbyMetadataForKey()" or something similar. Even if we return all tasks in one list, the user will still have to perform some filtering to separate the different tasks, so I don't feel making two calls is a burden, and IMHO it makes things more transparent for the user.
If the final vote is for using an "isActive" field, I'm good with that as well.

Just my 2 cents.

On Fri, Oct 25, 2019 at 5:09 AM Navinder Brar <navinder_b...@yahoo.com.invalid> wrote:

I think now we are aligned on almost all the design parts. Summarising below what has been discussed above and what we have a general consensus on.

- Rather than broadcasting lag across all nodes at rebalancing/with the heartbeat, we will just return a list of all available standbys in the system; the user can make an IQ query to any of those nodes, which will return the response along with the offset/time lag. Based on that, the user can decide whether to return the response or call another standby.
- The current metadata query frequency will not change. It will be the same as it is now, i.e. before each query.

--
Guozhang