10/20: I think I'm aligned with John's replies as well.

Guozhang

On Fri, Nov 15, 2019 at 1:45 AM Vinoth Chandar <vchan...@confluent.io>
wrote:

> >during restoring state the active might have some lag
>
> Great catch, yes.. we cannot assume lag = 0 for active. Lets report active
> lag as well then. If active is too laggy, the app can then deem the store
> partition unavailable (based on what the application is willing to
> tolerate).
>
> @matthias do you agree? We can then begin the vote.
>
> On Thu, Nov 14, 2019 at 9:03 AM Navinder Brar
> <navinder_b...@yahoo.com.invalid> wrote:
>
> > I agree with Vinoth and John on having "allLocalStoreOffsetLags()", all
> > actives don't have 0 lag, as during restoring state the active might have
> > some lag and one of the features of this KIP is to provide an option to
> > query from active (which might be in restoring state).
> > I will update the KIP with rejected alternatives and post this will start
> > a vote if everyone agrees on this.
> >     On Thursday, 14 November, 2019, 09:34:52 pm IST, John Roesler <
> > j...@confluent.io> wrote:
> >
> >  Hi all,
> >
> > Thanks for the "reset", Vinoth. It brings some needed clarity to the
> > discussion.
> >
> > 10. My 2 cents: we might as well include the lags for the active
> > copies as well. This is a more context-free API. If we only include
> > standbys, this choice won't make sense to users unless they understand
> > that the active task cannot lag in the steady state, since it's the
> > source of updates. This isn't a bad thing to realize, but it's just
> > more mental overhead for the person who wants to list the lags for
> > "all local stores".
> >
> > Another reason is that we could consider also reporting the lag for
> > actives during recovery (when they would have non-zero lag). We don't
> > have to now, but if we choose to call the method "standby lags", then
> > we can't make this choice in the future.
> >
> > That said, it's just my opinion. I'm fine either way.
> >
> > 20. Vinoth's reply makes sense to me, fwiw.
> >
> > Beyond these two points, I'm happy with the current proposal.
> >
> > Thanks again,
> > -John
> >
> > On Thu, Nov 14, 2019 at 4:48 AM Vinoth Chandar <vchan...@confluent.io>
> > wrote:
> > >
> > > 10. I considered that. Had to pick one or the other. Can just return
> > > standby too and rename method to may be “allLocalStandbyOffsetLags()”
> to
> > > have it explicit. (Standby should implicitly convey that we are talking
> > > about stores)
> > >
> > > 20. What I meant was, we are returning HostInfo instead of
> > StreamsMetadata
> > > since thats sufficient to route query; same for “int partition “ vs
> topic
> > > partition before. Previously KeyQueryMetadata had similar structure but
> > > used StreamsMetadata and TopicPartition objects to convey same
> > information
> > >
> > > @navinder KIP is already upto date with the email I sent, except for
> the
> > > reasonings I was laying out. +1 on revisiting rejected alternatives.
> > > Please make the follow up changes
> > >
> > > On Wed, Nov 13, 2019 at 9:12 PM Matthias J. Sax <matth...@confluent.io
> >
> > > wrote:
> > >
> > > > Thanks for the summary Vinoth!
> > > >
> > > > I buy the overall argument. Couple of clarification questions:
> > > >
> > > >
> > > > 10. Why do we need to include the active stores in
> > > > `allLocalStoreOffsetLags()`? Would it not be simpler to just return
> lag
> > > > for standbys?
> > > >
> > > >
> > > > 20: What does
> > > >
> > > > > Thin the KeyQueryMetadata object to just contain the minimum
> > information
> > > > > needed.
> > > >
> > > > exaclty mean? What is the "minimum information needed" ?
> > > >
> > > >
> > > > @Navinder: if you agree, can you update the KIP accoringly? With all
> > the
> > > > proposals, it's hard to keep track and it would be great to have the
> > > > current proposal summarized in the wiki page.
> > > >
> > > > Please also update the "Rejected alternative" sections to avoid that
> we
> > > > cycle back to old proposal (including the reason _why_ they got
> > rejected).
> > > >
> > > >
> > > > Thanks a lot!
> > > >
> > > >
> > > > -Matthias
> > > >
> > > >
> > > >
> > > > On 11/13/19 7:10 PM, Vinoth Chandar wrote:
> > > > > Given we have had a healthy discussion on this topic for a month
> now
> > and
> > > > > still with many loose ends and open ended conversations, I thought
> It
> > > > would
> > > > > be worthwhile to take a step back and re-evaluate everything in the
> > > > context
> > > > > of the very real use-case and its specific scenarios.
> > > > >
> > > > > First, let's remind ourselves of the query routing flow of the
> > streams
> > > > > application ("app" here on)
> > > > >
> > > > >    1. queries get routed to any random streams instance in the
> > cluster
> > > > >    ("router" here on)
> > > > >    2. router then uses Streams metadata to pick active/standby
> > instances
> > > > >    for that key's store/partition
> > > > >    3. router instance also maintains global lag information for all
> > > > stores
> > > > >    and all their partitions, by a gossip/broadcast/heartbeat
> protocol
> > > > (done
> > > > >    outside of Streams framework), but using
> > KafkaStreams#allMetadata()
> > > > for
> > > > >    streams instance discovery.
> > > > >    4. router then uses information in 2 & 3 to determine which
> > instance
> > > > to
> > > > >    send the query to  : always picks active instance if alive or
> the
> > most
> > > > >    in-sync live standby otherwise.
> > > > >
> > > > > Few things to note :
> > > > >
> > > > > A) We choose to decouple how the lag information is obtained
> (control
> > > > > plane) from query path (data plane), since that provides more
> > flexibility
> > > > > in designing the control plane. i.e pick any or combination of
> > gossip,
> > > > > N-way broadcast, control the rate of propagation, piggybacking on
> > request
> > > > > responses
> > > > > B) Since the app needs to do its own control plane, talking to
> other
> > > > > instances directly for failure detection & exchanging other
> > metadata, we
> > > > > can leave the lag APIs added to KafkaStreams class itself local and
> > > > simply
> > > > > return lag for all store/partitions on that instance.
> > > > > C) Streams preserves its existing behavior of instances only
> talking
> > to
> > > > > each other through the Kafka brokers.
> > > > > D) Since the router treats active/standby differently, it would be
> > good
> > > > for
> > > > > the KafkaStreams APIs to hand them back explicitly, with no
> > additional
> > > > > logic needed for computing them. Specifically, the router only
> knows
> > two
> > > > > things - key and store and if we just return a
> > > > Collection<StreamsMetadata>
> > > > > back, it cannot easily tease apart active and standby. Say, a
> streams
> > > > > instance hosts the same store as both active and standby for
> > different
> > > > > partitions, matching by just storename the app will find it in both
> > > > active
> > > > > and standby lists.
> > > > > E) From above, we assume the global lag estimate (lag per store
> topic
> > > > > partition) are continuously reported amongst application instances
> > and
> > > > > already available on the router during step 2 above. Hence,
> > attaching lag
> > > > > APIs to StreamsMetadata is unnecessary and does not solve the needs
> > > > anyway.
> > > > > F) Currently returned StreamsMetadata object is really information
> > about
> > > > a
> > > > > streams instance, that is not very specific to the key being
> queried.
> > > > > Specifically, router has no knowledge of the topic partition a
> given
> > key
> > > > > belongs, this is needed for matching to the global lag information
> in
> > > > step
> > > > > 4 above (and as also the example code in the KIP showed before).
> The
> > > > > StreamsMetadata, since it's about the instance itself, would
> contain
> > all
> > > > > topic partitions and stores on that instance, not specific to the
> > given
> > > > key.
> > > > > G) A cleaner API would thin the amount of information returned to
> > > > > specifically the given key and store - which is partition the key
> > belongs
> > > > > to, active host info, list of standby host info. KeyQueryMetadata
> was
> > > > > proposed in this spirit, but still hung onto StreamsMetadata as the
> > > > member
> > > > > field(s) which leaks more than what's required to route.
> > > > > H) For this use case, its sufficient to just support offsetLag
> > initially,
> > > > > without needing time based lag information right away.  So we
> > probably
> > > > > don't need a new top level StoreLagInfo class anymore.
> > > > > I) On whether the local lag information is cached within Streams or
> > App
> > > > > code, its again preferable and be necessary for the App to be able
> to
> > > > > invoke the lag api from an app thread and keep a cached state in
> > memory,
> > > > to
> > > > > be piggybacked with request responses. If this information is also
> > cached
> > > > > inside Streams, it does no harm anyway.
> > > > >
> > > > > Based on this reasoning, I have made the following updates to the
> KIP
> > > > >
> > > > > - Drop the proposed KafkaStreams#allStandbyMetadata() and
> > > > > KafkaStreams#allStandbyMetadataForStore() methods, with intent of
> > > > > introducing minimal APIs to support the usage described above
> > > > > - Drop the KafkaStreams#lagInfoForStore() method and just stick to
> > one
> > > > > method that returns all the local store partition's lags.
> > > > > - Rename allLagInfo() to allLocalStoreOffsetLags() to be explicit
> > that
> > > > the
> > > > > lags are local and only on offsets.
> > > > > - Drop StoreLagInfo class
> > > > > - allLocalStoreOffsetLags() returns Map<String, Map<Integer,
> Long>>,
> > > > which
> > > > > maps a store name to a map containing partition to offset lag info.
> > It
> > > > will
> > > > > include both active and standby store partitions. active always has
> > 0 lag
> > > > > since its caught up with changelog topic.
> > > > > - Thin the KeyQueryMetadata object to just contain the minimum
> > > > information
> > > > > needed. Rename allMetadataForKey() method variants to
> > > > queryMetadataForKey()
> > > > > variants that return KeyQueryMetadata
> > > > > - Propose deprecating two current methods that return metadata
> based
> > on
> > > > key
> > > > > :  KafkaStreams#metadataForKey(..), to get more users to use the
> new
> > > > > queryMetadataForKey APIs
> > > > > - We will still have to enhance StreamsMetadata with fields for
> > > > > standbyTopicPartitions and standbyStateStoreNames, since that is a
> > core
> > > > > object that gets updated upon rebalance.
> > > > >
> > > > >
> > > > > Please let us know if this is agreeable.  We will also work some of
> > this
> > > > > discussion into the background/proposed changes sections, upon
> > feedback.
> > > > >
> > > > >
> > > > > On Tue, Nov 12, 2019 at 9:17 AM Vinoth Chandar <
> > vchan...@confluent.io>
> > > > > wrote:
> > > > >
> > > > >> In all, is everyone OK with
> > > > >>
> > > > >>  - Dropping KeyQueryMetadata, and the allMetadataForKey() apis
> > > > >>  - Dropping allLagInfo() from KafkaStreams class, Drop
> StoreLagInfo
> > > > class
> > > > >>  - Add offsetLag(store, key, serializer) -> Optional<Long> &
> > > > >> offsetLag(store, key, partitioner) -> Optional<Long> to
> > StreamsMetadata
> > > > >>  - Duplicate the current methods for standbyMetadata in
> > KafkaStreams :
> > > > >> allStandbyMetadata(), allStandbyMetadataForStore(), two variants
> of
> > > > >> standbyMetadataForKey(),
> > > > >>
> > > > >>
> > > > >> Responses to Guozhang:
> > > > >>
> > > > >> 1.1 Like I mentioned before, the allStandbyMetadata() and
> > > > >> allStandbyMetadataForStore() complement existing allMetadata() and
> > > > >> allMetadataForStore(), since we don't want to change behavior of
> > > > existing
> > > > >> APIs. Based on discussions so far, if we decide to drop
> > > > KeyQueryMetadata,
> > > > >> then we will need to introduce 4 equivalents for standby metadata
> as
> > > > >> Matthias mentioned.
> > > > >> 1.2 I am okay with pushing lag information to a method on
> > > > StreamsMetadata
> > > > >> (Personally, I won't design it like that, but happy to live with
> it)
> > > > like
> > > > >> what Matthias suggested. But assuming topic name <=> store name
> > > > equivalency
> > > > >> for mapping this seems like a broken API to me. If all of Streams
> > code
> > > > were
> > > > >> written like this, I can understand. But I don't think its the
> > case? I
> > > > >> would not be comfortable making such assumptions outside of public
> > APIs.
> > > > >>>> look into each one's standby partition / stores to tell which
> one
> > > > >> StreamsMetadata is corresponding to the instance who holds a
> > specific
> > > > key
> > > > >> as standby, yes, but I feel this one extra iteration is worth to
> > avoid
> > > > >> introducing a new class.
> > > > >> This sort of thing would lead to non-standardized/potentially
> buggy
> > > > client
> > > > >> implementations, for something I expect the system would hand me
> > > > directly.
> > > > >> I don't personally feel introducing a new class is so bad, to
> > warrant
> > > > the
> > > > >> user to do all this matching. Given the current APIs are not
> > explicitly
> > > > >> named to denote active metadata, it gives us a chance to build
> > something
> > > > >> more direct and clear IMO. If we do allMetadataForKey apis, then
> we
> > > > should
> > > > >> clearly separate active and standby ourselves. Alternate is
> separate
> > > > active
> > > > >> and standby APIs as Matthias suggests, which I can make peace
> with.
> > > > >>
> > > > >> 1.3 Similar as above. In Streams code, we treat topic partitions
> and
> > > > store
> > > > >> names separately?.
> > > > >> 2.1 I think most databases build replication using logical
> offsets,
> > not
> > > > >> time. Time lag can be a nice to have feature, but offset lag is
> > fully
> > > > >> sufficient for a lot of use-cases.
> > > > >> 2.2.1 We could support a lagInfoForStores() batch api. makes
> sense.
> > > > >>
> > > > >>
> > > > >> Responses to Matthias :
> > > > >> (100) Streams can still keep the upto date version in memory and
> > > > >> implementation could be for now just reading this already
> refreshed
> > > > value.
> > > > >> Designing the API, with intent  of pushing this to the user keeps
> > doors
> > > > >> open for supporting time based lag in the future.
> > > > >> (101) I am not sure what the parameters of evaluating approaches
> > here
> > > > is.
> > > > >> Generally, when I am handed a Metadata object, I don't expect to
> > further
> > > > >> query it for more information semantically. I would not also force
> > user
> > > > to
> > > > >> make separate calls for active and standby metadata.
> > > > >> Well, that may be just me. So sure, we can push this into
> > > > StreamsMetadata
> > > > >> if everyone agrees!
> > > > >> +1 on duplicating all 4 methods for standbys in this case.
> > > > >>
> > > > >>
> > > > >>
> > > > >>
> > > > >>
> > > > >> On Tue, Nov 12, 2019 at 4:12 AM Navinder Brar
> > > > >> <navinder_b...@yahoo.com.invalid> wrote:
> > > > >>
> > > > >>>
> > > > >>>    - Looking back, I agree that 2 calls to StreamsMetadata to
> fetch
> > > > >>> StreamsMetadata and then using something like ‘long
> > > > >>> StreamsMetadata#offsetLag(store, key)’ which Matthias suggested
> is
> > > > better
> > > > >>> than introducing a new public API i.e. ‘KeyQueryMetadata’. I will
> > > > change
> > > > >>> the KIP accordingly.
> > > > >>>    - >> I am actually not even sure, why we added
> > > > >>> `StreamsMetadata#topicPartitions()` originally
> > > > >>> I think it is helpful in showing which host holds which source
> > topic
> > > > >>> partitions for /instances endpoint and when you query a key, you
> > need
> > > > to
> > > > >>> match the partition on which the key belongs to with the hosts
> > holding
> > > > >>> source topic partitions for that partition. Is there any other
> way
> > to
> > > > get
> > > > >>> this info?
> > > > >>>
> > > > >>>    On Tuesday, 12 November, 2019, 03:55:16 am IST, Guozhang Wang
> <
> > > > >>> wangg...@gmail.com> wrote:
> > > > >>>
> > > > >>>  Regarding 1.2: StreamsMetadata is 1-1 mapped to the streams
> > > > instances, so
> > > > >>> 1) allMetadata would still return the same number of
> > StreamsMetadata in
> > > > >>> collection, just that within the StreamsMetadata now you have new
> > APIs
> > > > to
> > > > >>> access standby partitions / stores. So I think it would not be a
> > > > breaking
> > > > >>> change to the public API to not include `allStandbyMetadata` and
> `
> > > > >>> allStandbyMetadataForStore` but still rely on
> > `allMetadata(ForStore)`?
> > > > >>>
> > > > >>> Regarding 1.1: Good point about the partition number. But I'm
> still
> > > > >>> wondering if it is definitely necessary to introduce a new
> > > > >>> `KeyQueryMetadata`
> > > > >>> interface class. E.g. suppose our function signature is
> > > > >>>
> > > > >>> Collection<StreamsMetadata> allMetadataForKey
> > > > >>>
> > > > >>> When you get the collection of StreamsMetadata you need to
> iterate
> > over
> > > > >>> the
> > > > >>> collection and look into each one's standby partition / stores to
> > tell
> > > > >>> which one StreamsMetadata is corresponding to the instance who
> > holds a
> > > > >>> specific key as standby, yes, but I feel this one extra iteration
> > is
> > > > worth
> > > > >>> to avoid introducing a new class.
> > > > >>>
> > > > >>>
> > > > >>> Guozhang
> > > > >>>
> > > > >>> On Sat, Nov 9, 2019 at 10:04 PM Matthias J. Sax <
> > matth...@confluent.io
> > > > >
> > > > >>> wrote:
> > > > >>>
> > > > >>>> I agree, that we might want to drop the time-base lag for the
> > initial
> > > > >>>> implementation. There is no good way to get this information
> > without a
> > > > >>>> broker side change.
> > > > >>>>
> > > > >>>>
> > > > >>>>
> > > > >>>> (100) For the offset lag information, I don't see a reason why
> > the app
> > > > >>>> should drive when this information is updated though, because KS
> > will
> > > > >>>> update this information anyway (once per `commit.interval.ms`
> --
> > and
> > > > >>>> updating it more frequently does not make sense, as it most
> likely
> > > > won't
> > > > >>>> change more frequently anyway).
> > > > >>>>
> > > > >>>> If you all insist that the app should drive it, I can live with
> > it,
> > > > but
> > > > >>>> I think it makes the API unnecessarily complex without a
> benefit.
> > > > >>>>
> > > > >>>>
> > > > >>>>
> > > > >>>> (101) I still don't understand why we need to have
> > `KeyQueryMetadata`
> > > > >>>> though. Note, that an instance can only report lag for it's
> local
> > > > >>>> stores, but not remote stores as it does not know to what
> offset a
> > > > >>>> remote standby has caught up to.
> > > > >>>>
> > > > >>>>> Because we needed to return the topicPartition the key belongs
> > to, in
> > > > >>>>> order to correlate with the lag information from the other set
> of
> > > > >>> APIs.
> > > > >>>>
> > > > >>>> My suggestion is to get the lag information from
> > `StreamsMetadata` --
> > > > >>>> which partition the store belongs to can be completely
> > encapsulated
> > > > >>>> within KS as all information is local, and I don't think we need
> > to
> > > > >>>> expose it to the user at all.
> > > > >>>>
> > > > >>>> We can just add `long StreamsMetadata#offsetLag(store, key)`. If
> > the
> > > > >>>> store is local we return its lag, if it's remote we return `-1`
> > (ie,
> > > > >>>> UNKNOWN). As an alternative, we can change the return type to
> > > > >>>> `Optional<Long>`. This works for active and standby task alike.
> > > > >>>>
> > > > >>>> Note, that a user must verify if `StreamsMeatadata` is for
> itself
> > > > >>>> (local) or remote anyway. We only need to provide a way that
> > allows
> > > > >>>> users to distinguish between active an standby. (More below.)
> > > > >>>>
> > > > >>>>
> > > > >>>> I am actually not even sure, why we added
> > > > >>>> `StreamsMetadata#topicPartitions()` originally -- seems pretty
> > > > useless.
> > > > >>>> Can we deprecate it as side cleanup in this KIP? Or do I miss
> > > > something?
> > > > >>>>
> > > > >>>>
> > > > >>>>
> > > > >>>> (102)
> > > > >>>>
> > > > >>>>> There are basically 2 reasons. One is that instead of having
> two
> > > > >>>> functions, one to get StreamsMetadata for active and one for
> > replicas.
> > > > >>> We
> > > > >>>> are fetching both in a single call and we have a way to get only
> > > > active
> > > > >>> or
> > > > >>>> only replicas from the KeyQueryMetadata object(just like
> > isStandby()
> > > > and
> > > > >>>> isActive() discussion we had earlier)
> > > > >>>>
> > > > >>>> I understand, that we need two methods. However, I think we can
> > > > simplify
> > > > >>>> the API and not introduce `KeyQueryMetadata`, but just
> "duplicate"
> > > > all 4
> > > > >>>> existing methods for standby tasks:
> > > > >>>>
> > > > >>>> // note that `standbyMetadataForKey` return a Collection in
> > contrast
> > > > to
> > > > >>>> existing `metadataForKey`
> > > > >>>>
> > > > >>>>>  Collection<StreamsMetadata> allStandbyMetadata()
> > > > >>>>>  Collection<StreamsMetadata> allStandbyMetadataForStore(String
> > > > >>>> storeName)
> > > > >>>>>  Collection<StreamsMetadata> metadataForKey(String storeName, K
> > key,
> > > > >>>> Serializer<K> keySerializer)
> > > > >>>>>  Collection<StreamsMetadata> metadataForKey(String storeName, K
> > key,
> > > > >>>> StreamPartitioner<? super K, ?> partitioner)
> > > > >>>>
> > > > >>>> Because the existing methods return all active metadata, there
> is
> > no
> > > > >>>> reason to return `KeyQueryMetadata` as it's more complicated to
> > get
> > > > the
> > > > >>>> standby metadata. With `KeyQueryMetadata` the user needs to make
> > more
> > > > >>>> calls to get the metadata:
> > > > >>>>
> > > > >>>>  KafkaStreams#allMetadataForKey()
> > > > >>>>              #getActive()
> > > > >>>>
> > > > >>>>  KafkaStreams#allMetadataForKey()
> > > > >>>>              #getStandby()
> > > > >>>>
> > > > >>>> vs:
> > > > >>>>
> > > > >>>>  KafkaStreams#metadataForKey()
> > > > >>>>
> > > > >>>>  KafkaStreams#standbyMetadataForKey()
> > > > >>>>
> > > > >>>> The wrapping of both within `KeyQueryMetadata` does not seem to
> > > > provide
> > > > >>>> any benefit but increase our public API surface.
> > > > >>>>
> > > > >>>>
> > > > >>>>
> > > > >>>> @Guozhang:
> > > > >>>>
> > > > >>>> (1.1. + 1.2.) From my understanding `allMetadata()` (and other
> > > > existing
> > > > >>>> methods) will only return the metadata of _active_ tasks for
> > backward
> > > > >>>> compatibility reasons. If we would return standby metadata,
> > existing
> > > > >>>> code would potentially "break" because the code might pick a
> > standby
> > > > to
> > > > >>>> query a key without noticing.
> > > > >>>>
> > > > >>>>
> > > > >>>>
> > > > >>>> -Matthias
> > > > >>>>
> > > > >>>>
> > > > >>>> On 11/8/19 6:07 AM, Navinder Brar wrote:
> > > > >>>>> Thanks, Guozhang for going through it again.
> > > > >>>>>
> > > > >>>>>    - 1.1 & 1.2: The main point of adding topicPartition in
> > > > >>>> KeyQueryMetadata is not topicName, but the partition number. I
> > agree
> > > > >>>> changelog topicNames and store names will have 1-1 mapping but
> we
> > also
> > > > >>> need
> > > > >>>> the partition number of the changelog for which are calculating
> > the
> > > > lag.
> > > > >>>> Now we can add partition number in StreamsMetadata but it will
> be
> > > > >>>> orthogonal to the definition of StreamsMetadata i.e.-
> “Represents
> > the
> > > > >>> state
> > > > >>>> of an instance (process) in a {@link KafkaStreams} application.”
> > If
> > > > we
> > > > >>> add
> > > > >>>> partition number in this, it doesn’t stay metadata for an
> > instance,
> > > > >>> because
> > > > >>>> now it is storing the partition information for a key being
> > queried.
> > > > So,
> > > > >>>> having “KeyQueryMetadata” simplifies this as now it contains all
> > the
> > > > >>>> metadata and also changelog and partition information for which
> we
> > > > need
> > > > >>> to
> > > > >>>> calculate the lag.
> > > > >>>>>
> > > > >>>>> Another way is having another function in parallel to
> > metadataForKey,
> > > > >>>> which returns the partition number for the key being queried.
> But
> > then
> > > > >>> we
> > > > >>>> would need 2 calls to StreamsMetadataState, once to fetch
> > metadata and
> > > > >>>> another to fetch partition number. Let me know if any of these
> two
> > > > ways
> > > > >>>> seem more intuitive than KeyQueryMetadata then we can try to
> > converge
> > > > on
> > > > >>>> one.
> > > > >>>>>    - 1.3:  Again, it is required for the partition number. We
> can
> > > > drop
> > > > >>>> store name though.
> > > > >>>>>    - 2.1: I think this was done in accordance with the opinion
> > from
> > > > >>> John
> > > > >>>> as time lag would be better implemented with a broker level
> > change and
> > > > >>>> offset change is readily implementable. @vinoth?
> > > > >>>>>    - 2.2.1: Good point.  +1
> > > > >>>>>    - 2.2.2: I am not well aware of it, @vinoth any comments?
> > > > >>>>>    - 3.1: I think we have already agreed on dropping this, we
> > need to
> > > > >>>> KIP. Also, is there any opinion on lagInfoForStore(String
> > storeName)
> > > > vs
> > > > >>>> lagInfoForStore(String storeName, int partition)
> > > > >>>>>    - 3.2: But in functions such as onAssignment(),
> > > > >>>> onPartitionsAssigned(), for standbyTasks also the
> topicPartitions
> > we
> > > > use
> > > > >>>> are input topic partitions and not changelog partitions. Would
> > this be
> > > > >>>> breaking from that semantics?
> > > > >>>>>
> > > > >>>>>
> > > > >>>>>    On Thursday, 7 November, 2019, 11:33:19 pm IST, Guozhang
> Wang
> > <
> > > > >>>> wangg...@gmail.com> wrote:
> > > > >>>>>
> > > > >>>>>  Hi Navinder, Vinoth, thanks for the updated KIP!
> > > > >>>>>
> > > > >>>>> Read through the discussions so far and made another pass on
> the
> > wiki
> > > > >>>> page,
> > > > >>>>> and here are some more comments:
> > > > >>>>>
> > > > >>>>> 1. About the public APIs:
> > > > >>>>>
> > > > >>>>> 1.1. It is not clear to me how allStandbyMetadataForStore
> > > > >>>>> and allStandbyMetadata would be differentiated from the
> original
> > APIs
> > > > >>>> given
> > > > >>>>> that we will augment StreamsMetadata to include both active and
> > > > >>> standby
> > > > >>>>> topic-partitions and store names, so I think we can still use
> > > > >>> allMetadata
> > > > >>>>> and allMetadataForStore to get the collection of instance
> > metadata
> > > > >>> that
> > > > >>>>> host the store both as active and standbys. Are there any
> > specific
> > > > use
> > > > >>>>> cases where we ONLY want to get the standby's metadata? And
> even
> > if
> > > > >>> there
> > > > >>>>> are, we can easily filter it out from the allMetadata /
> > > > >>>> allMetadataForStore
> > > > >>>>> right?
> > > > >>>>>
> > > > >>>>> 1.2. Similarly I'm wondering for allMetadataForKey, can we
> > return the
> > > > >>>> same
> > > > >>>>> type: "Collection<StreamsMetadata>" which includes 1 for
> active,
> > and
> > > > >>> N-1
> > > > >>>>> for standbys, and callers can easily identify them by looking
> > inside
> > > > >>> the
> > > > >>>>> StreamsMetadata objects? In addition I feel the
> "topicPartition"
> > > > field
> > > > >>>>> inside "KeyQueryMetadata" is not very important since the
> > changelog
> > > > >>>>> topic-name is always 1-1 mapping to the store name, so as long
> > as the
> > > > >>>> store
> > > > >>>>> name matches, the changelog topic name should always match
> (i.e.
> > in
> > > > >>>>> the pseudo code, just checking store names should be
> > sufficient). If
> > > > >>> all
> > > > >>>> of
> > > > >>>>> the above assumption is true, I think we can save us from
> > introducing
> > > > >>> one
> > > > >>>>> more public class here.
> > > > >>>>>
> > > > >>>>> 1.3. Similarly in StoreLagInfo, seems not necessary to include
> > the
> > > > >>> topic
> > > > >>>>> partition name in addition to the store name.
> > > > >>>>>
> > > > >>>>> 2. About querying store lags: we've discussed about separating
> > the
> > > > >>>> querying
> > > > >>>>> of the lag information and the querying of the host information
> > so I
> > > > >>>> still
> > > > >>>>> support having separate APIs here. More thoughts:
> > > > >>>>>
> > > > >>>>> 2.1. Compared with offsets, I'm wondering would time-difference
> > be
> > > > >>> more
> > > > >>>>> intuitive for users to define the acceptable "staleness"? More
> > > > >>> strictly,
> > > > >>>>> are there any scenarios where we would actually prefer offsets
> > over
> > > > >>>>> timestamps except that the timestamps are not available?
> > > > >>>>>
> > > > >>>>> 2.2. I'm also a bit leaning towards not putting the burden of
> > > > >>>> periodically
> > > > >>>>> refreshing our lag and caching it (and introducing another
> > config) on
> > > > >>> the
> > > > >>>>> streams side but document clearly its cost and let users to
> > consider
> > > > >>> its
> > > > >>>>> call frequency; of course in terms of implementation there are
> > some
> > > > >>>>> optimizations we can consider:
> > > > >>>>>
> > > > >>>>> 1) for restoring active tasks, the log-end-offset is read once
> > since
> > > > >>> it
> > > > >>>> is
> > > > >>>>> not expected to change, and that offset / timestamp can be
> > remembered
> > > > >>> for
> > > > >>>>> lag calculation and we do not need to refresh again;
> > > > >>>>> 2) for standby tasks,  there's a "Map<TopicPartition, Long>
> > > > >>>>> endOffsets(Collection<TopicPartition> partitions)" in
> > KafkaConsumer
> > > > to
> > > > >>>>> batch a list of topic-partitions in one round-trip, and we can
> > use
> > > > >>> that
> > > > >>>> to
> > > > >>>>> let our APIs be sth. like "lagInfoForStores(Collection<String>
> > > > >>>> storeNames)"
> > > > >>>>> to enable the batching effects.
> > > > >>>>>
> > > > >>>>> 3. Misc.:
> > > > >>>>>
> > > > >>>>> 3.1 There's a typo on the pseudo code "globalLagInforation".
> > Also it
> > > > >>>> seems
> > > > >>>>> not describing how that information is collected (personally I
> > also
> > > > >>> feel
> > > > >>>>> one "lagInfoForStores" is sufficient).
> > > > >>>>> 3.2 Note there's a slight semantical difference between active
> > and
> > > > >>>>> standby's "partitions" inside StreamsMetadata, for active tasks
> > the
> > > > >>>>> partitions are actually input topic partitions for the task:
> > some of
> > > > >>> them
> > > > >>>>> may also act as changelog topics but these are exceptional
> > cases; for
> > > > >>>>> standby tasks the "standbyTopicPartitions" are actually the
> > changelog
> > > > >>>>> topics of the task. So maybe renaming it to
> > > > >>> "standbyChangelogPartitions"
> > > > >>>> to
> > > > >>>>> differentiate it?
> > > > >>>>>
> > > > >>>>>
> > > > >>>>> Overall I think this would be a really good KIP to add to
> > Streams,
> > > > >>> thank
> > > > >>>>> you so much!
> > > > >>>>>
> > > > >>>>>
> > > > >>>>> Guozhang
> > > > >>>>>
> > > > >>>>> On Wed, Nov 6, 2019 at 8:47 PM Navinder Brar
> > > > >>>>> <navinder_b...@yahoo.com.invalid> wrote:
> > > > >>>>>
> > > > >>>>>> +1 on implementing offset based lag for now and push
> time-based
> > lag
> > > > >>> to a
> > > > >>>>>> later point in time when broker changes are done. Although
> > > > time-based
> > > > >>>> lag
> > > > >>>>>> enhances the readability, it would not be a make or break
> > change for
> > > > >>>>>> implementing this KIP.
> > > > >>>>>>
> > > > >>>>>> Vinoth has explained the role of KeyQueryMetadata, let me in
> > add in
> > > > >>> my 2
> > > > >>>>>> cents as well.
> > > > >>>>>>    - There are basically 2 reasons. One is that instead of
> > having
> > > > two
> > > > >>>>>> functions, one to get StreamsMetadata for active and one for
> > > > >>> replicas.
> > > > >>>> We
> > > > >>>>>> are fetching both in a single call and we have a way to get
> only
> > > > >>> active
> > > > >>>> or
> > > > >>>>>> only replicas from the KeyQueryMetadata object(just like
> > isStandby()
> > > > >>> and
> > > > >>>>>> isActive() discussion we had earlier)
> > > > >>>>>>    - Since even after fetching the metadata now we have a
> > > > requirement
> > > > >>>> of
> > > > >>>>>> fetching the topicPartition for which the query came:- to
> fetch
> > lag
> > > > >>> for
> > > > >>>>>> that specific topicPartition. Instead of having another call
> to
> > > > fetch
> > > > >>>> the
> > > > >>>>>> partition from StreamsMetadataState we thought using one
> single
> > call
> > > > >>> and
> > > > >>>>>> fetching partition and all metadata would be better.
> > > > >>>>>>    - Another option was to change StreamsMetadata object and
> add
> > > > >>>>>> topicPartition in that for which the query came but it doesn’t
> > make
> > > > >>>> sense
> > > > >>>>>> in terms of semantics as it StreamsMetadata. Also,
> > KeyQueryMetadata
> > > > >>>>>> represents all the metadata for the Key being queried, i.e.
> the
> > > > >>>> partition
> > > > >>>>>> it belongs to and the list of StreamsMetadata(hosts) active or
> > > > >>> replica
> > > > >>>>>> where the key could be found.
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>>    On Thursday, 7 November, 2019, 01:53:36 am IST, Vinoth
> > Chandar <
> > > > >>>>>> vchan...@confluent.io> wrote:
> > > > >>>>>>
> > > > >>>>>>  +1 to John, suggestion on Duration/Instant and dropping the
> > API to
> > > > >>>> fetch
> > > > >>>>>> all store's lags. However, I do think we need to return lags
> per
> > > > >>> topic
> > > > >>>>>> partition. So not sure if single return value would work? We
> > need
> > > > >>> some
> > > > >>>> new
> > > > >>>>>> class that holds a TopicPartition and Duration/Instant
> variables
> > > > >>>> together?
> > > > >>>>>>
> > > > >>>>>> 10) Because we needed to return the topicPartition the key
> > belongs
> > > > >>> to,
> > > > >>>> in
> > > > >>>>>> order to correlate with the lag information from the other set
> > of
> > > > >>> APIs.
> > > > >>>>>> Otherwise, we don't know which topic partition's lag estimate
> to
> > > > >>> use. We
> > > > >>>>>> tried to illustrate this on the example code. StreamsMetadata
> is
> > > > >>> simply
> > > > >>>>>> capturing state of a streams host/instance, where as
> > TopicPartition
> > > > >>>> depends
> > > > >>>>>> on the key passed in. This is a side effect of our decision to
> > > > >>> decouple
> > > > >>>> lag
> > > > >>>>>> based filtering on the metadata apis.
> > > > >>>>>>
> > > > >>>>>> 20) Goes back to the previous point. We needed to return
> > information
> > > > >>>> that
> > > > >>>>>> is key specific, at which point it seemed natural for the
> > > > >>>> KeyQueryMetadata
> > > > >>>>>> to contain active, standby, topic partition for that key. If
> we
> > > > >>> merely
> > > > >>>>>> returned a standbyMetadataForKey() ->
> > Collection<StreamsMetadata>
> > > > >>>> standby,
> > > > >>>>>> an active metadataForKey() -> StreamsMetadata, and new
> > > > >>>>>> getTopicPartition(key) -> topicPartition object back to the
> > caller,
> > > > >>> then
> > > > >>>>>> arguably you could do the same kind of correlation. IMO having
> > a the
> > > > >>>>>> KeyQueryMetadata class to encapsulate all this is a friendlier
> > API.
> > > > >>>>>>  allStandbyMetadata() and allStandbyMetadataForStore() are
> just
> > > > >>> counter
> > > > >>>>>> parts for metadataForStore() and allMetadata() that we
> introduce
> > > > >>> mostly
> > > > >>>> for
> > > > >>>>>> consistent API semantics. (their presence implicitly could
> help
> > > > >>> denote
> > > > >>>>>> metadataForStore() is for active instances. Happy to drop them
> > if
> > > > >>> their
> > > > >>>>>> utility is not clear)
> > > > >>>>>>
> > > > >>>>>> 30) This would assume we refresh all the standby lag
> information
> > > > >>> every
> > > > >>>>>> time we query for that StreamsMetadata for a specific store?
> For
> > > > time
> > > > >>>> based
> > > > >>>>>> lag, this will involve fetching the tail kafka record at once
> > from
> > > > >>>> multiple
> > > > >>>>>> kafka topic partitions? I would prefer not to couple them like
> > this
> > > > >>> and
> > > > >>>>>> have the ability to make granular store (or even topic
> partition
> > > > >>> level)
> > > > >>>>>> fetches for lag information.
> > > > >>>>>>
> > > > >>>>>> 32) I actually prefer John's suggestion to let the application
> > drive
> > > > >>> the
> > > > >>>>>> lag fetches/updation and not have flags as the KIP current
> > points
> > > > to.
> > > > >>>> Are
> > > > >>>>>> you reexamining that position?
> > > > >>>>>>
> > > > >>>>>> On fetching lag information, +1 we could do this much more
> > > > >>> efficiently
> > > > >>>> with
> > > > >>>>>> a broker changes. Given I don't yet have a burning need for
> the
> > time
> > > > >>>> based
> > > > >>>>>> lag, I think we can sequence the APIs such that the offset
> based
> > > > ones
> > > > >>>> are
> > > > >>>>>> implemented first, while we have a broker side change?
> > > > >>>>>> Given we decoupled the offset and time based lag API, I am
> > willing
> > > > to
> > > > >>>> drop
> > > > >>>>>> the time based lag functionality (since its not needed right
> > away
> > > > >>> for my
> > > > >>>>>> use-case). @navinder . thoughts?
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>> On Tue, Nov 5, 2019 at 11:10 PM Matthias J. Sax <
> > > > >>> matth...@confluent.io>
> > > > >>>>>> wrote:
> > > > >>>>>>
> > > > >>>>>>> Navinder,
> > > > >>>>>>>
> > > > >>>>>>> thanks for updating the KIP. Couple of follow up questions:
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>> (10) Why do we need to introduce the class
> `KeyQueryMetadata`?
> > > > >>>>>>>
> > > > >>>>>>> (20) Why do we introduce the two methods
> `allMetadataForKey()`?
> > > > >>> Would
> > > > >>>> it
> > > > >>>>>>> not be simpler to add `Collection<StreamMetatdata>
> > > > >>>>>>> standbyMetadataForKey(...)`. This would align with new
> methods
> > > > >>>>>>> `#allStandbyMetadata()` and `#allStandbyMetadataForStore()`?
> > > > >>>>>>>
> > > > >>>>>>> (30) Why do we need the class `StoreLagInfo` -- it seems
> > simpler to
> > > > >>>> just
> > > > >>>>>>> extend `StreamMetadata` with the corresponding attributes and
> > > > >>> methods
> > > > >>>>>>> (of active task, the lag would always be reported as zero)
> > > > >>>>>>>
> > > > >>>>>>> (32) Via (30) we can avoid the two new methods
> `#allLagInfo()`
> > and
> > > > >>>>>>> `#lagInfoForStore()`, too, reducing public API and making it
> > > > >>> simpler to
> > > > >>>>>>> use the feature.
> > > > >>>>>>>
> > > > >>>>>>> Btw: If we make `StreamMetadata` thread safe, the lag
> > information
> > > > >>> can
> > > > >>>> be
> > > > >>>>>>> updated in the background without the need that the
> application
> > > > >>>>>>> refreshes its metadata. Hence, the user can get active and/or
> > > > >>> standby
> > > > >>>>>>> metadata once, and only needs to refresh it, if a rebalance
> > > > >>> happened.
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>> About point (4) of the previous thread: I was also thinking
> > about
> > > > >>>>>>> when/how to update the time-lag information, and I agree that
> > we
> > > > >>> should
> > > > >>>>>>> not update it for each query.
> > > > >>>>>>>
> > > > >>>>>>> "How": That we need to fetch the last record is a little bit
> > > > >>>>>>> unfortunate, but I don't see any other way without a broker
> > change.
> > > > >>> One
> > > > >>>>>>> issue I still see is with "exactly-once" -- if transaction
> > markers
> > > > >>> are
> > > > >>>>>>> in the topic, the last message is not at offset "endOffset -
> > 1" and
> > > > >>> as
> > > > >>>>>>> multiple transaction markers might be after each other, it's
> > > > unclear
> > > > >>>> how
> > > > >>>>>>> to identify the offset of the last record... Thoughts?
> > > > >>>>>>>
> > > > >>>>>>> Hence, it might be worth to look into a broker change as a
> > > > potential
> > > > >>>>>>> future improvement. It might be possible that the broker
> > caches the
> > > > >>>>>>> latest timestamp per partition to serve this data
> efficiently,
> > > > >>> similar
> > > > >>>>>>> to `#endOffset()`.
> > > > >>>>>>>
> > > > >>>>>>> "When": We refresh the end-offset information based on the
> > > > >>>>>>> `commit.interval.ms` -- doing it more often is not really
> > useful,
> > > > >>> as
> > > > >>>>>>> state store caches will most likely buffer up all writes to
> > > > >>> changelogs
> > > > >>>>>>> anyway and are only flushed on commit (including a flush of
> the
> > > > >>>>>>> producer). Hence, I would suggest to update the time-lag
> > > > information
> > > > >>>>>>> based on the same strategy in the background. This way there
> > is no
> > > > >>>>>>> additional config or methods and the user does not need to
> > worry
> > > > >>> about
> > > > >>>>>>> it at all.
> > > > >>>>>>>
> > > > >>>>>>> To avoid refresh overhead if we don't need it (a user might
> > not use
> > > > >>> IQ
> > > > >>>>>>> to begin with), it might be worth to maintain an internal
> flag
> > > > >>>>>>> `updateTimeLagEnabled` that is set to `false` initially and
> > only
> > > > >>> set to
> > > > >>>>>>> `true` on the first call of a user to get standby-metadata.
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>> -Matthias
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>> On 11/4/19 5:13 PM, Vinoth Chandar wrote:
> > > > >>>>>>>>>>  I'm having some trouble wrapping my head around what race
> > > > >>>> conditions
> > > > >>>>>>>> might occur, other than the fundamentally broken state in
> > which
> > > > >>>>>> different
> > > > >>>>>>>> instances are running totally different topologies.
> > > > >>>>>>>> 3. @both Without the topic partitions that the tasks can map
> > back
> > > > >>> to,
> > > > >>>>>> we
> > > > >>>>>>>> have to rely on topology/cluster metadata in each Streams
> > instance
> > > > >>> to
> > > > >>>>>> map
> > > > >>>>>>>> the task back. If the source topics are wild carded for e,g
> > then
> > > > >>> each
> > > > >>>>>>>> instance could have different source topics in topology,
> > until the
> > > > >>>> next
> > > > >>>>>>>> rebalance happens. You can also read my comments from here
> > > > >>>>>>>>
> > > > >>>>>>>
> > > > >>>>>>
> > > > >>>>
> > > > >>>
> > > >
> >
> https://issues.apache.org/jira/browse/KAFKA-7149?focusedCommentId=16904106&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16904106
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>>>> seems hard to imagine how encoding arbitrarily long topic
> > names
> > > > >>> plus
> > > > >>>>>> an
> > > > >>>>>>>> integer for the partition number could be as efficient as
> task
> > > > ids,
> > > > >>>>>> which
> > > > >>>>>>>> are just two integers.
> > > > >>>>>>>> 3. if you still have concerns about the efficacy of
> dictionary
> > > > >>>>>> encoding,
> > > > >>>>>>>> happy to engage. The link above also has some benchmark
> code I
> > > > >>> used.
> > > > >>>>>>>> Theoretically, we would send each topic name atleast once,
> so
> > yes
> > > > >>> if
> > > > >>>>>> you
> > > > >>>>>>>> compare a 10-20 character topic name + an integer to two
> > integers,
> > > > >>> it
> > > > >>>>>>> will
> > > > >>>>>>>> be more bytes. But its constant overhead proportional to
> size
> > of
> > > > >>> topic
> > > > >>>>>>> name
> > > > >>>>>>>> and with 4,8,12, partitions the size difference between
> > baseline
> > > > >>>>>>> (version 4
> > > > >>>>>>>> where we just repeated topic names for each topic partition)
> > and
> > > > >>> the
> > > > >>>>>> two
> > > > >>>>>>>> approaches becomes narrow.
> > > > >>>>>>>>
> > > > >>>>>>>>>> Plus, Navinder is going to implement a bunch of protocol
> > code
> > > > >>> that
> > > > >>>> we
> > > > >>>>>>>> might just want to change when the discussion actually does
> > take
> > > > >>>> place,
> > > > >>>>>>> if
> > > > >>>>>>>> ever.
> > > > >>>>>>>>>> it'll just be a mental burden for everyone to remember
> that
> > we
> > > > >>> want
> > > > >>>>>> to
> > > > >>>>>>>> have this follow-up discussion.
> > > > >>>>>>>> 3. Is n't people changing same parts of code and tracking
> > follow
> > > > >>> ups a
> > > > >>>>>>>> common thing, we need to deal with anyway?  For this KIP, is
> > n't
> > > > it
> > > > >>>>>>> enough
> > > > >>>>>>>> to reason about whether the additional map on top of the
> topic
> > > > >>>>>> dictionary
> > > > >>>>>>>> would incur more overhead than the sending task_ids? I don't
> > think
> > > > >>>> it's
> > > > >>>>>>>> case, both of them send two integers. As I see it, we can
> do a
> > > > >>>> separate
> > > > >>>>>>>> follow up to (re)pursue the task_id conversion and get it
> > working
> > > > >>> for
> > > > >>>>>>> both
> > > > >>>>>>>> maps within the next release?
> > > > >>>>>>>>
> > > > >>>>>>>>>> Can you elaborate on "breaking up the API"? It looks like
> > there
> > > > >>> are
> > > > >>>>>>>> already separate API calls in the proposal, one for
> time-lag,
> > and
> > > > >>>>>> another
> > > > >>>>>>>> for offset-lag, so are they not already broken up?
> > > > >>>>>>>> The current APIs (e.g lagInfoForStore) for lags return
> > > > StoreLagInfo
> > > > >>>>>>> objects
> > > > >>>>>>>> which has both time and offset lags. If we had separate
> APIs,
> > say
> > > > >>> (e.g
> > > > >>>>>>>> offsetLagForStore(), timeLagForStore()), we can implement
> > offset
> > > > >>>>>> version
> > > > >>>>>>>> using the offset lag that the streams instance already
> tracks
> > i.e
> > > > >>> no
> > > > >>>>>> need
> > > > >>>>>>>> for external calls. The time based lag API would incur the
> > kafka
> > > > >>> read
> > > > >>>>>> for
> > > > >>>>>>>> the timestamp. makes sense?
> > > > >>>>>>>>
> > > > >>>>>>>> Based on the discussions so far, I only see these two
> pending
> > > > >>> issues
> > > > >>>> to
> > > > >>>>>>> be
> > > > >>>>>>>> aligned on. Is there any other open item people want to
> bring
> > up?
> > > > >>>>>>>>
> > > > >>>>>>>> On Mon, Nov 4, 2019 at 11:24 AM Sophie Blee-Goldman <
> > > > >>>>>> sop...@confluent.io
> > > > >>>>>>>>
> > > > >>>>>>>> wrote:
> > > > >>>>>>>>
> > > > >>>>>>>>> Regarding 3) I'm wondering, does your concern still apply
> > even
> > > > now
> > > > >>>>>>>>> that the pluggable PartitionGrouper interface has been
> > > > deprecated?
> > > > >>>>>>>>> Now that we can be sure that the DefaultPartitionGrouper is
> > used
> > > > >>> to
> > > > >>>>>>>>> generate
> > > > >>>>>>>>> the taskId -> partitions mapping, we should be able to
> > convert
> > > > any
> > > > >>>>>>> taskId
> > > > >>>>>>>>> to any
> > > > >>>>>>>>> partitions.
> > > > >>>>>>>>>
> > > > >>>>>>>>> On Mon, Nov 4, 2019 at 11:17 AM John Roesler <
> > j...@confluent.io>
> > > > >>>>>> wrote:
> > > > >>>>>>>>>
> > > > >>>>>>>>>> Hey Vinoth, thanks for the reply!
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> 3.
> > > > >>>>>>>>>> I get that it's not the main focus of this KIP, but if
> it's
> > ok,
> > > > >>> it
> > > > >>>>>>>>>> would be nice to hash out this point right now. It only
> > came up
> > > > >>>>>>>>>> because this KIP-535 is substantially extending the
> pattern
> > in
> > > > >>>>>>>>>> question. If we push it off until later, then the
> reviewers
> > are
> > > > >>>> going
> > > > >>>>>>>>>> to have to suspend their concerns not just while voting
> for
> > the
> > > > >>> KIP,
> > > > >>>>>>>>>> but also while reviewing the code. Plus, Navinder is going
> > to
> > > > >>>>>>>>>> implement a bunch of protocol code that we might just want
> > to
> > > > >>> change
> > > > >>>>>>>>>> when the discussion actually does take place, if ever.
> > Finally,
> > > > >>>> it'll
> > > > >>>>>>>>>> just be a mental burden for everyone to remember that we
> > want to
> > > > >>>> have
> > > > >>>>>>>>>> this follow-up discussion.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> It makes sense what you say... the specific assignment is
> > > > already
> > > > >>>>>>>>>> encoded in the "main" portion of the assignment, not in
> the
> > > > >>>>>> "userdata"
> > > > >>>>>>>>>> part. It also makes sense that it's simpler to reason
> about
> > > > >>> races if
> > > > >>>>>>>>>> you simply get all the information about the topics and
> > > > >>> partitions
> > > > >>>>>>>>>> directly from the assignor, rather than get the partition
> > number
> > > > >>>> from
> > > > >>>>>>>>>> the assignor and the topic name from your own a priori
> > knowledge
> > > > >>> of
> > > > >>>>>>>>>> the topology. On the other hand, I'm having some trouble
> > > > >>> wrapping my
> > > > >>>>>>>>>> head around what race conditions might occur, other than
> the
> > > > >>>>>>>>>> fundamentally broken state in which different instances
> are
> > > > >>> running
> > > > >>>>>>>>>> totally different topologies. Sorry, but can you remind us
> > of
> > > > the
> > > > >>>>>>>>>> specific condition?
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> To the efficiency counterargument, it seems hard to
> imagine
> > how
> > > > >>>>>>>>>> encoding arbitrarily long topic names plus an integer for
> > the
> > > > >>>>>>>>>> partition number could be as efficient as task ids, which
> > are
> > > > >>> just
> > > > >>>>>> two
> > > > >>>>>>>>>> integers. It seems like this would only be true if topic
> > names
> > > > >>> were
> > > > >>>> 4
> > > > >>>>>>>>>> characters or less.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> 4.
> > > > >>>>>>>>>> Yeah, clearly, it would not be a good idea to query the
> > metadata
> > > > >>>>>>>>>> before every single IQ query. I think there are plenty of
> > > > >>>> established
> > > > >>>>>>>>>> patterns for distributed database clients to follow. Can
> you
> > > > >>>>>> elaborate
> > > > >>>>>>>>>> on "breaking up the API"? It looks like there are already
> > > > >>> separate
> > > > >>>>>> API
> > > > >>>>>>>>>> calls in the proposal, one for time-lag, and another for
> > > > >>> offset-lag,
> > > > >>>>>>>>>> so are they not already broken up? FWIW, yes, I agree, the
> > > > offset
> > > > >>>> lag
> > > > >>>>>>>>>> is already locally known, so we don't need to build in an
> > extra
> > > > >>>>>>>>>> synchronous broker API call, just one for the time-lag.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> Thanks again for the discussion,
> > > > >>>>>>>>>> -John
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> On Mon, Nov 4, 2019 at 11:17 AM Vinoth Chandar <
> > > > >>>>>> vchan...@confluent.io>
> > > > >>>>>>>>>> wrote:
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> 3. Right now, we still get the topic partitions assigned
> > as a
> > > > >>> part
> > > > >>>>>> of
> > > > >>>>>>>>> the
> > > > >>>>>>>>>>> top level Assignment object (the one that wraps
> > AssignmentInfo)
> > > > >>> and
> > > > >>>>>>> use
> > > > >>>>>>>>>>> that to convert taskIds back. This list of only contains
> > > > >>>> assignments
> > > > >>>>>>>>> for
> > > > >>>>>>>>>>> that particular instance. Attempting to also reverse map
> > for
> > > > >>> "all"
> > > > >>>>>> the
> > > > >>>>>>>>>>> tasksIds in the streams cluster i.e all the topic
> > partitions in
> > > > >>>>>> these
> > > > >>>>>>>>>>> global assignment maps was what was problematic. By
> > explicitly
> > > > >>>>>> sending
> > > > >>>>>>>>>> the
> > > > >>>>>>>>>>> global assignment maps as actual topic partitions,  group
> > > > >>>>>> coordinator
> > > > >>>>>>>>>> (i.e
> > > > >>>>>>>>>>> the leader that computes the assignment's ) is able to
> > > > >>> consistently
> > > > >>>>>>>>>> enforce
> > > > >>>>>>>>>>> its view of the topic metadata. Still don't think doing
> > such a
> > > > >>>>>> change
> > > > >>>>>>>>>> that
> > > > >>>>>>>>>>> forces you to reconsider semantics, is not needed to save
> > bits
> > > > >>> on
> > > > >>>>>>> wire.
> > > > >>>>>>>>>> May
> > > > >>>>>>>>>>> be we can discuss this separately from this KIP?
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> 4. There needs to be some caching/interval somewhere
> though
> > > > >>> since
> > > > >>>> we
> > > > >>>>>>>>>> don't
> > > > >>>>>>>>>>> want to make 1 kafka read per 1 IQ potentially. But I
> > think its
> > > > >>> a
> > > > >>>>>>> valid
> > > > >>>>>>>>>>> suggestion, to make this call just synchronous and leave
> > the
> > > > >>>> caching
> > > > >>>>>>> or
> > > > >>>>>>>>>> how
> > > > >>>>>>>>>>> often you want to call to the application. Would it be
> > good to
> > > > >>> then
> > > > >>>>>>>>> break
> > > > >>>>>>>>>>> up the APIs for time and offset based lag?  We can obtain
> > > > offset
> > > > >>>>>> based
> > > > >>>>>>>>>> lag
> > > > >>>>>>>>>>> for free? Only incur the overhead of reading kafka if we
> > want
> > > > >>> time
> > > > >>>>>>>>>>> based lags?
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> On Fri, Nov 1, 2019 at 2:49 PM Sophie Blee-Goldman <
> > > > >>>>>>>>> sop...@confluent.io>
> > > > >>>>>>>>>>> wrote:
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>>> Adding on to John's response to 3), can you clarify when
> > and
> > > > >>> why
> > > > >>>>>>>>>> exactly we
> > > > >>>>>>>>>>>> cannot
> > > > >>>>>>>>>>>> convert between taskIds and partitions? If that's really
> > the
> > > > >>> case
> > > > >>>> I
> > > > >>>>>>>>>> don't
> > > > >>>>>>>>>>>> feel confident
> > > > >>>>>>>>>>>> that the StreamsPartitionAssignor is not full of bugs...
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> It seems like it currently just encodes a list of all
> > > > >>> partitions
> > > > >>>>>> (the
> > > > >>>>>>>>>>>> assignment) and also
> > > > >>>>>>>>>>>> a list of the corresponding task ids, duplicated to
> ensure
> > > > each
> > > > >>>>>>>>>> partition
> > > > >>>>>>>>>>>> has the corresponding
> > > > >>>>>>>>>>>> taskId at the same offset into the list. Why is that
> > > > >>> problematic?
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> On Fri, Nov 1, 2019 at 12:39 PM John Roesler <
> > > > >>> j...@confluent.io>
> > > > >>>>>>>>>> wrote:
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Thanks, all, for considering the points!
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> 3. Interesting. I have a vague recollection of that...
> > Still,
> > > > >>>>>>>>> though,
> > > > >>>>>>>>>>>>> it seems a little fishy. After all, we return the
> > assignments
> > > > >>>>>>>>>>>>> themselves as task ids, and the members have to map
> > these to
> > > > >>>> topic
> > > > >>>>>>>>>>>>> partitions in order to configure themselves properly.
> If
> > it's
> > > > >>> too
> > > > >>>>>>>>>>>>> complicated to get this right, then how do we know that
> > > > >>> Streams
> > > > >>>> is
> > > > >>>>>>>>>>>>> computing the correct partitions at all?
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> 4. How about just checking the log-end timestamp when
> you
> > > > call
> > > > >>>> the
> > > > >>>>>>>>>>>>> method? Then, when you get an answer, it's as fresh as
> it
> > > > >>> could
> > > > >>>>>>>>>>>>> possibly be. And as a user you have just one, obvious,
> > "knob"
> > > > >>> to
> > > > >>>>>>>>>>>>> configure how much overhead you want to devote to
> > checking...
> > > > >>> If
> > > > >>>>>>>>> you
> > > > >>>>>>>>>>>>> want to call the broker API less frequently, you just
> > call
> > > > the
> > > > >>>>>>>>>> Streams
> > > > >>>>>>>>>>>>> API less frequently. And you don't have to worry about
> > the
> > > > >>>>>>>>>>>>> relationship between your invocations of that method
> and
> > the
> > > > >>>>>> config
> > > > >>>>>>>>>>>>> setting (e.g., you'll never get a negative number,
> which
> > you
> > > > >>>> could
> > > > >>>>>>>>> if
> > > > >>>>>>>>>>>>> you check the log-end timestamp less frequently than
> you
> > > > check
> > > > >>>> the
> > > > >>>>>>>>>>>>> lag).
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Thanks,
> > > > >>>>>>>>>>>>> -John
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> On Thu, Oct 31, 2019 at 11:52 PM Navinder Brar
> > > > >>>>>>>>>>>>> <navinder_b...@yahoo.com.invalid> wrote:
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> Thanks John for going through this.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>    - +1, makes sense
> > > > >>>>>>>>>>>>>>    - +1, no issues there
> > > > >>>>>>>>>>>>>>    - Yeah the initial patch I had submitted for
> K-7149(
> > > > >>>>>>>>>>>>> https://github.com/apache/kafka/pull/6935) to reduce
> > > > >>>>>>>>> assignmentInfo
> > > > >>>>>>>>>>>>> object had taskIds but the merged PR had similar size
> > > > >>> according
> > > > >>>> to
> > > > >>>>>>>>>> Vinoth
> > > > >>>>>>>>>>>>> and it was simpler so if the end result is of same
> size,
> > it
> > > > >>> would
> > > > >>>>>>>>> not
> > > > >>>>>>>>>>>> make
> > > > >>>>>>>>>>>>> sense to pivot from dictionary and again move to
> taskIDs.
> > > > >>>>>>>>>>>>>>    - Not sure about what a good default would be if we
> > don't
> > > > >>>>>>>>> have a
> > > > >>>>>>>>>>>>> configurable setting. This gives the users the
> > flexibility to
> > > > >>> the
> > > > >>>>>>>>>> users
> > > > >>>>>>>>>>>> to
> > > > >>>>>>>>>>>>> serve their requirements as at the end of the day it
> > would
> > > > >>> take
> > > > >>>>>> CPU
> > > > >>>>>>>>>>>> cycles.
> > > > >>>>>>>>>>>>> I am ok with starting it with a default and see how it
> > goes
> > > > >>> based
> > > > >>>>>>>>>> upon
> > > > >>>>>>>>>>>>> feedback.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> Thanks,
> > > > >>>>>>>>>>>>>> Navinder
> > > > >>>>>>>>>>>>>>    On Friday, 1 November, 2019, 03:46:42 am IST,
> Vinoth
> > > > >>> Chandar
> > > > >>>>>>>>> <
> > > > >>>>>>>>>>>>> vchan...@confluent.io> wrote:
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>  1. Was trying to spell them out separately. but makes
> > sense
> > > > >>>> for
> > > > >>>>>>>>>>>>>> readability. done
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> 2. No I immediately agree :) .. makes sense.
> @navinder?
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> 3. I actually attempted only sending taskIds while
> > working
> > > > on
> > > > >>>>>>>>>>>> KAFKA-7149.
> > > > >>>>>>>>>>>>>> Its non-trivial to handle edges cases resulting from
> > newly
> > > > >>> added
> > > > >>>>>>>>>> topic
> > > > >>>>>>>>>>>>>> partitions and wildcarded topic entries. I ended up
> > > > >>> simplifying
> > > > >>>>>>>>> it
> > > > >>>>>>>>>> to
> > > > >>>>>>>>>>>>> just
> > > > >>>>>>>>>>>>>> dictionary encoding the topic names to reduce size. We
> > can
> > > > >>> apply
> > > > >>>>>>>>>> the
> > > > >>>>>>>>>>>> same
> > > > >>>>>>>>>>>>>> technique here for this map. Additionally, we could
> also
> > > > >>>>>>>>> dictionary
> > > > >>>>>>>>>>>>> encode
> > > > >>>>>>>>>>>>>> HostInfo, given its now repeated twice. I think this
> > would
> > > > >>> save
> > > > >>>>>>>>>> more
> > > > >>>>>>>>>>>>> space
> > > > >>>>>>>>>>>>>> than having a flag per topic partition entry. Lmk if
> > you are
> > > > >>>> okay
> > > > >>>>>>>>>> with
> > > > >>>>>>>>>>>>>> this.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> 4. This opens up a good discussion. Given we support
> > time
> > > > lag
> > > > >>>>>>>>>> estimates
> > > > >>>>>>>>>>>>>> also, we need to read the tail record of the changelog
> > > > >>>>>>>>> periodically
> > > > >>>>>>>>>>>>> (unlike
> > > > >>>>>>>>>>>>>> offset lag, which we can potentially piggyback on
> > metadata
> > > > in
> > > > >>>>>>>>>>>>>> ConsumerRecord IIUC). we thought we should have a
> config
> > > > that
> > > > >>>>>>>>>> control
> > > > >>>>>>>>>>>> how
> > > > >>>>>>>>>>>>>> often this read happens? Let me know if there is a
> > simple
> > > > >>> way to
> > > > >>>>>>>>>> get
> > > > >>>>>>>>>>>>>> timestamp value of the tail record that we are
> missing.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> On Thu, Oct 31, 2019 at 12:58 PM John Roesler <
> > > > >>>> j...@confluent.io
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>>> wrote:
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> Hey Navinder,
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> Thanks for updating the KIP, it's a lot easier to see
> > the
> > > > >>>>>>>>> current
> > > > >>>>>>>>>>>>>>> state of the proposal now.
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> A few remarks:
> > > > >>>>>>>>>>>>>>> 1. I'm sure it was just an artifact of revisions, but
> > you
> > > > >>> have
> > > > >>>>>>>>>> two
> > > > >>>>>>>>>>>>>>> separate sections where you list additions to the
> > > > >>> KafkaStreams
> > > > >>>>>>>>>>>>>>> interface. Can you consolidate those so we can see
> all
> > the
> > > > >>>>>>>>>> additions
> > > > >>>>>>>>>>>>>>> at once?
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> 2. For messageLagEstimate, can I suggest
> > > > "offsetLagEstimate"
> > > > >>>>>>>>>> instead,
> > > > >>>>>>>>>>>>>>> to be clearer that we're specifically measuring a
> > number of
> > > > >>>>>>>>>> offsets?
> > > > >>>>>>>>>>>>>>> If you don't immediately agree, then I'd at least
> > point out
> > > > >>>>>>>>> that
> > > > >>>>>>>>>> we
> > > > >>>>>>>>>>>>>>> usually refer to elements of Kafka topics as
> > "records", not
> > > > >>>>>>>>>>>>>>> "messages", so "recordLagEstimate" might be more
> > > > >>> appropriate.
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> 3. The proposal mentions adding a map of the standby
> > > > >>>>>>>>>> _partitions_ for
> > > > >>>>>>>>>>>>>>> each host to AssignmentInfo. I assume this is
> designed
> > to
> > > > >>>>>>>>> mirror
> > > > >>>>>>>>>> the
> > > > >>>>>>>>>>>>>>> existing "partitionsByHost" map. To keep the size of
> > these
> > > > >>>>>>>>>> metadata
> > > > >>>>>>>>>>>>>>> messages down, maybe we can consider making two
> > changes:
> > > > >>>>>>>>>>>>>>> (a) for both actives and standbys, encode the _task
> > ids_
> > > > >>>>>>>>> instead
> > > > >>>>>>>>>> of
> > > > >>>>>>>>>>>>>>> _partitions_. Every member of the cluster has a copy
> > of the
> > > > >>>>>>>>>> topology,
> > > > >>>>>>>>>>>>>>> so they can convert task ids into specific partitions
> > on
> > > > >>> their
> > > > >>>>>>>>>> own,
> > > > >>>>>>>>>>>>>>> and task ids are only (usually) three characters.
> > > > >>>>>>>>>>>>>>> (b) instead of encoding two maps (hostinfo -> actives
> > AND
> > > > >>>>>>>>>> hostinfo ->
> > > > >>>>>>>>>>>>>>> standbys), which requires serializing all the
> hostinfos
> > > > >>> twice,
> > > > >>>>>>>>>> maybe
> > > > >>>>>>>>>>>>>>> we can pack them together in one map with a
> structured
> > > > value
> > > > >>>>>>>>>>>> (hostinfo
> > > > >>>>>>>>>>>>>>> -> [actives,standbys]).
> > > > >>>>>>>>>>>>>>> Both of these ideas still require bumping the
> protocol
> > > > >>> version
> > > > >>>>>>>>>> to 6,
> > > > >>>>>>>>>>>>>>> and they basically mean we drop the existing
> > > > >>> `PartitionsByHost`
> > > > >>>>>>>>>> field
> > > > >>>>>>>>>>>>>>> and add a new `TasksByHost` field with the structured
> > value
> > > > >>> I
> > > > >>>>>>>>>>>>>>> mentioned.
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> 4. Can we avoid adding the new "lag refresh" config?
> > The
> > > > >>> lags
> > > > >>>>>>>>>> would
> > > > >>>>>>>>>>>>>>> necessarily be approximate anyway, so adding the
> config
> > > > >>> seems
> > > > >>>>>>>>> to
> > > > >>>>>>>>>>>>>>> increase the operational complexity of the system for
> > > > little
> > > > >>>>>>>>>> actual
> > > > >>>>>>>>>>>>>>> benefit.
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> Thanks for the pseudocode, by the way, it really
> helps
> > > > >>>>>>>>> visualize
> > > > >>>>>>>>>> how
> > > > >>>>>>>>>>>>>>> these new interfaces would play together. And thanks
> > again
> > > > >>> for
> > > > >>>>>>>>>> the
> > > > >>>>>>>>>>>>>>> update!
> > > > >>>>>>>>>>>>>>> -John
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> On Thu, Oct 31, 2019 at 2:41 PM John Roesler <
> > > > >>>>>>>>> j...@confluent.io>
> > > > >>>>>>>>>>>>> wrote:
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> Hey Vinoth,
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> I started going over the KIP again yesterday. There
> > are a
> > > > >>> lot
> > > > >>>>>>>>>> of
> > > > >>>>>>>>>>>>>>>> updates, and I didn't finish my feedback in one day.
> > I'm
> > > > >>>>>>>>>> working on
> > > > >>>>>>>>>>>>> it
> > > > >>>>>>>>>>>>>>>> now.
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> Thanks,
> > > > >>>>>>>>>>>>>>>> John
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> On Thu, Oct 31, 2019 at 11:42 AM Vinoth Chandar <
> > > > >>>>>>>>>>>>> vchan...@confluent.io>
> > > > >>>>>>>>>>>>>>> wrote:
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> Wondering if anyone has thoughts on these changes?
> I
> > > > liked
> > > > >>>>>>>>>> that
> > > > >>>>>>>>>>>>> the new
> > > > >>>>>>>>>>>>>>>>> metadata fetch APIs provide all the information at
> > once
> > > > >>>>>>>>> with
> > > > >>>>>>>>>>>>> consistent
> > > > >>>>>>>>>>>>>>>>> naming..
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> Any guidance on what you would like to be discussed
> > or
> > > > >>>>>>>>>> fleshed
> > > > >>>>>>>>>>>> out
> > > > >>>>>>>>>>>>> more
> > > > >>>>>>>>>>>>>>>>> before we call a VOTE?
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> On Wed, Oct 30, 2019 at 10:31 AM Navinder Brar
> > > > >>>>>>>>>>>>>>>>> <navinder_b...@yahoo.com.invalid> wrote:
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> Hi,
> > > > >>>>>>>>>>>>>>>>>> We have made some edits in the KIP(
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>>
> > > > >>>>>>>
> > > > >>>>>>
> > > > >>>>
> > > > >>>
> > > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance
> > > > >>>>>>>>>>>>>>> )
> > > > >>>>>>>>>>>>>>>>>> after due deliberation on the agreed design to
> > support
> > > > >>>>>>>>> the
> > > > >>>>>>>>>> new
> > > > >>>>>>>>>>>>> query
> > > > >>>>>>>>>>>>>>>>>> design. This includes the new public API to query
> > > > >>>>>>>>>> offset/time
> > > > >>>>>>>>>>>> lag
> > > > >>>>>>>>>>>>>>>>>> information and other details related to querying
> > > > standby
> > > > >>>>>>>>>> tasks
> > > > >>>>>>>>>>>>>>> which have
> > > > >>>>>>>>>>>>>>>>>> come up after thinking of thorough details.
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>    - Addition of new config, “
> lag.fetch.interval.ms”
> > to
> > > > >>>>>>>>>>>>> configure
> > > > >>>>>>>>>>>>>>> the
> > > > >>>>>>>>>>>>>>>>>> interval of time/offset lag
> > > > >>>>>>>>>>>>>>>>>>    - Addition of new class StoreLagInfo to store
> the
> > > > >>>>>>>>>>>> periodically
> > > > >>>>>>>>>>>>>>> obtained
> > > > >>>>>>>>>>>>>>>>>> time/offset lag
> > > > >>>>>>>>>>>>>>>>>>    - Addition of two new functions in
> KafkaStreams,
> > > > >>>>>>>>>>>>>>> List<StoreLagInfo>
> > > > >>>>>>>>>>>>>>>>>> allLagInfo() and List<StoreLagInfo>
> > > > >>>>>>>>> lagInfoForStore(String
> > > > >>>>>>>>>>>>>>> storeName) to
> > > > >>>>>>>>>>>>>>>>>> return the lag information for an instance and a
> > store
> > > > >>>>>>>>>>>>> respectively
> > > > >>>>>>>>>>>>>>>>>>    - Addition of new class KeyQueryMetadata. We
> need
> > > > >>>>>>>>>>>>> topicPartition
> > > > >>>>>>>>>>>>>>> for
> > > > >>>>>>>>>>>>>>>>>> each key to be matched with the lag API for the
> > topic
> > > > >>>>>>>>>>>> partition.
> > > > >>>>>>>>>>>>> One
> > > > >>>>>>>>>>>>>>> way is
> > > > >>>>>>>>>>>>>>>>>> to add new functions and fetch topicPartition from
> > > > >>>>>>>>>>>>>>> StreamsMetadataState but
> > > > >>>>>>>>>>>>>>>>>> we thought having one call and fetching
> > StreamsMetadata
> > > > >>>>>>>>> and
> > > > >>>>>>>>>>>>>>> topicPartition
> > > > >>>>>>>>>>>>>>>>>> is more cleaner.
> > > > >>>>>>>>>>>>>>>>>>    -
> > > > >>>>>>>>>>>>>>>>>> Renaming partitionsForHost to
> > activePartitionsForHost in
> > > > >>>>>>>>>>>>>>> StreamsMetadataState
> > > > >>>>>>>>>>>>>>>>>> and partitionsByHostState to
> > activePartitionsByHostState
> > > > >>>>>>>>>>>>>>>>>> in StreamsPartitionAssignor
> > > > >>>>>>>>>>>>>>>>>>    - We have also added the pseudo code of how all
> > the
> > > > >>>>>>>>>> changes
> > > > >>>>>>>>>>>>> will
> > > > >>>>>>>>>>>>>>> exist
> > > > >>>>>>>>>>>>>>>>>> together and support the new querying APIs
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> Please let me know if anything is pending now,
> > before a
> > > > >>>>>>>>>> vote
> > > > >>>>>>>>>>>> can
> > > > >>>>>>>>>>>>> be
> > > > >>>>>>>>>>>>>>>>>> started on this.  On Saturday, 26 October, 2019,
> > > > 05:41:44
> > > > >>>>>>>>>> pm
> > > > >>>>>>>>>>>> IST,
> > > > >>>>>>>>>>>>>>> Navinder
> > > > >>>>>>>>>>>>>>>>>> Brar <navinder_b...@yahoo.com.invalid> wrote:
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>  >> Since there are two soft votes for separate
> > > > >>>>>>>>>> active/standby
> > > > >>>>>>>>>>>>> API
> > > > >>>>>>>>>>>>>>>>>> methods, I also change my position on that. Fine
> > with 2
> > > > >>>>>>>>>>>> separate
> > > > >>>>>>>>>>>>>>>>>> methods. Once we remove the lag information from
> > these
> > > > >>>>>>>>>> APIs,
> > > > >>>>>>>>>>>>>>> returning a
> > > > >>>>>>>>>>>>>>>>>> List is less attractive, since the ordering has no
> > > > >>>>>>>>> special
> > > > >>>>>>>>>>>>> meaning
> > > > >>>>>>>>>>>>>>> now.
> > > > >>>>>>>>>>>>>>>>>> Agreed, now that we are not returning lag, I am
> also
> > > > sold
> > > > >>>>>>>>>> on
> > > > >>>>>>>>>>>>> having
> > > > >>>>>>>>>>>>>>> two
> > > > >>>>>>>>>>>>>>>>>> separate functions. We already have one which
> > returns
> > > > >>>>>>>>>>>>>>> streamsMetadata for
> > > > >>>>>>>>>>>>>>>>>> active tasks, and now we can add another one for
> > > > >>>>>>>>> standbys.
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>    On Saturday, 26 October, 2019, 03:55:16 am IST,
> > > > Vinoth
> > > > >>>>>>>>>>>>> Chandar <
> > > > >>>>>>>>>>>>>>>>>> vchan...@confluent.io> wrote:
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>  +1 to Sophie's suggestion. Having both lag in
> > terms of
> > > > >>>>>>>>>> time
> > > > >>>>>>>>>>>> and
> > > > >>>>>>>>>>>>>>> offsets is
> > > > >>>>>>>>>>>>>>>>>> good and makes for a more complete API.
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> Since there are two soft votes for separate
> > > > >>>>>>>>> active/standby
> > > > >>>>>>>>>> API
> > > > >>>>>>>>>>>>>>> methods, I
> > > > >>>>>>>>>>>>>>>>>> also change my position on that. Fine with 2
> > separate
> > > > >>>>>>>>>> methods.
> > > > >>>>>>>>>>>>>>>>>> Once we remove the lag information from these
> APIs,
> > > > >>>>>>>>>> returning a
> > > > >>>>>>>>>>>>> List
> > > > >>>>>>>>>>>>>>> is
> > > > >>>>>>>>>>>>>>>>>> less attractive, since the ordering has no special
> > > > >>>>>>>>> meaning
> > > > >>>>>>>>>> now.
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>> lag in offsets vs time: Having both, as
> suggested
> > by
> > > > >>>>>>>>>> Sophie
> > > > >>>>>>>>>>>>> would
> > > > >>>>>>>>>>>>>>> of
> > > > >>>>>>>>>>>>>>>>>> course be best. What is a little unclear to me is,
> > how
> > > > in
> > > > >>>>>>>>>>>> details
> > > > >>>>>>>>>>>>>>> are we
> > > > >>>>>>>>>>>>>>>>>> going to compute both?
> > > > >>>>>>>>>>>>>>>>>> @navinder may be next step is to flesh out these
> > details
> > > > >>>>>>>>>> and
> > > > >>>>>>>>>>>>> surface
> > > > >>>>>>>>>>>>>>> any
> > > > >>>>>>>>>>>>>>>>>> larger changes we need to make if need be.
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> Any other details we need to cover, before a VOTE
> > can be
> > > > >>>>>>>>>> called
> > > > >>>>>>>>>>>>> on
> > > > >>>>>>>>>>>>>>> this?
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>> On Fri, Oct 25, 2019 at 1:51 PM Bill Bejeck <
> > > > >>>>>>>>>> bbej...@gmail.com
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> wrote:
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>> I am jumping in a little late here.
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>> Overall I agree with the proposal to push
> decision
> > > > >>>>>>>>>> making on
> > > > >>>>>>>>>>>>>>> what/how to
> > > > >>>>>>>>>>>>>>>>>>> query in the query layer.
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>> For point 5 from above, I'm slightly in favor of
> > having
> > > > >>>>>>>>>> a new
> > > > >>>>>>>>>>>>>>> method,
> > > > >>>>>>>>>>>>>>>>>>> "standbyMetadataForKey()" or something similar.
> > > > >>>>>>>>>>>>>>>>>>> Because even if we return all tasks in one list,
> > the
> > > > >>>>>>>>> user
> > > > >>>>>>>>>>>> will
> > > > >>>>>>>>>>>>>>> still have
> > > > >>>>>>>>>>>>>>>>>>> to perform some filtering to separate the
> different
> > > > >>>>>>>>>> tasks,
> > > > >>>>>>>>>>>> so I
> > > > >>>>>>>>>>>>>>> don't
> > > > >>>>>>>>>>>>>>>>>> feel
> > > > >>>>>>>>>>>>>>>>>>> making two calls is a burden, and IMHO makes
> things
> > > > >>>>>>>>> more
> > > > >>>>>>>>>>>>>>> transparent for
> > > > >>>>>>>>>>>>>>>>>>> the user.
> > > > >>>>>>>>>>>>>>>>>>> If the final vote is for using an "isActive"
> > field, I'm
> > > > >>>>>>>>>> good
> > > > >>>>>>>>>>>>> with
> > > > >>>>>>>>>>>>>>> that as
> > > > >>>>>>>>>>>>>>>>>>> well.
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>> Just my 2 cents.
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>> On Fri, Oct 25, 2019 at 5:09 AM Navinder Brar
> > > > >>>>>>>>>>>>>>>>>>> <navinder_b...@yahoo.com.invalid> wrote:
> > > > >>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>> I think now we are aligned on almost all the
> > design
> > > > >>>>>>>>>> parts.
> > > > >>>>>>>>>>>>>>> Summarising
> > > > >>>>>>>>>>>>>>>>>>>> below what has been discussed above and we have
> a
> > > > >>>>>>>>>> general
> > > > >>>>>>>>>>>>>>> consensus on.
> > > > >>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>>>    - Rather than broadcasting lag across all
> > nodes at
> > > > >>>>>>>>>>>>>>> rebalancing/with
> > > > >>>>>>>>>>>>>>>>>>> the
> > > > >>>>>>>>>>>>>>>>>>>> heartbeat, we will just return a list of all
> > > > >>>>>>>>> available
> > > > >>>>>>>>>>>>> standby’s
> > > > >>>>>>>>>>>>>>> in the
> > > > >>>>>>>>>>>>>>>>>>>> system and the user can make IQ query any of
> those
> > > > >>>>>>>>>> nodes
> > > > >>>>>>>>>>>>> which
> > > > >>>>>>>>>>>>>>> will
> > > > >>>>>>>>>>>>>>>>>>> return
> > > > >>>>>>>>>>>>>>>>>>>> the response, and the lag and offset time. Based
> > on
> > > > >>>>>>>>>> which
> > > > >>>>>>>>>>>>> user
> > > > >>>>>>>>>>>>>>> can
> > > > >>>>>>>>>>>>>>>>>> decide
> > > > >>>>>>>>>>>>>>>>>>>> if he wants to return the response back or call
> > > > >>>>>>>>> another
> > > > >>>>>>>>>>>>> standby.
> > > > >>>>>>>>>>>>>>>>>>>>    -  The current metadata query frequency will
> > not
> > > > >>>>>>>>>> change.
> > > > >>>>>>>>>>>>> It
> > > > >>>>>>>>>>>>>>> will be
> > > > >>>>>>>>>>>>>>>>>>> the
> > > > >>>>>>>>>>>>>>>>>>>> same as it does now, i.e. before each query.
> > > > >>>
>


-- 
-- Guozhang

Reply via email to