I agree that we might want to drop the time-based lag for the initial
implementation. There is no good way to get this information without a
broker side change.

(100) For the offset lag information, I don't see a reason why the app
should drive when this information is updated, because KS will update
this information anyway (once per `commit.interval.ms` -- and updating it
more frequently does not make sense, as it most likely won't change more
frequently anyway).
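A minimal sketch of that update pattern, assuming a hypothetical `PacedOffsetLag` helper (not Kafka Streams API) that wraps an end-offset source and an injectable clock, and re-reads the end offset at most once per commit interval:

```java
import java.util.function.LongSupplier;

// Hypothetical helper (not Kafka Streams API): caches the changelog end offset
// and re-reads it at most once per commit interval.
final class PacedOffsetLag {
    private final long commitIntervalMs;
    private final LongSupplier clockMs;         // injectable clock, e.g. System::currentTimeMillis
    private final LongSupplier endOffsetSource; // stand-in for reading the changelog end offset
    private long cachedEndOffset;
    private long lastRefreshMs = Long.MIN_VALUE; // MIN_VALUE forces a read on first use
    private int refreshes;                       // number of source reads, for illustration

    PacedOffsetLag(long commitIntervalMs, LongSupplier clockMs, LongSupplier endOffsetSource) {
        this.commitIntervalMs = commitIntervalMs;
        this.clockMs = clockMs;
        this.endOffsetSource = endOffsetSource;
    }

    // Offset lag of a local store whose restored position is `localOffset`.
    long offsetLag(long localOffset) {
        long now = clockMs.getAsLong();
        if (lastRefreshMs == Long.MIN_VALUE || now - lastRefreshMs >= commitIntervalMs) {
            cachedEndOffset = endOffsetSource.getAsLong();
            lastRefreshMs = now;
            refreshes++;
        }
        // Clamped at zero because the cached end offset may be slightly stale.
        return Math.max(0L, cachedEndOffset - localOffset);
    }

    int refreshes() { return refreshes; }
}
```

Calls within the same interval reuse the cached end offset, which is the point above: asking more often than once per commit interval mostly returns the same value.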

If you all insist that the app should drive it, I can live with it, but
I think it makes the API unnecessarily complex without a benefit.

(101) I still don't understand why we need to have `KeyQueryMetadata`,
though. Note that an instance can only report lag for its local stores,
but not for remote stores, as it does not know to what offset a remote
standby has caught up.

> Because we needed to return the topicPartition the key belongs to, in
> order to correlate with the lag information from the other set of APIs.

My suggestion is to get the lag information from `StreamsMetadata` --
which partition the store belongs to can be completely encapsulated
within KS as all information is local, and I don't think we need to
expose it to the user at all.

We can just add `long StreamsMetadata#offsetLag(store, key)`. If the
store is local, we return its lag; if it's remote, we return `-1` (i.e.,
UNKNOWN). As an alternative, we can change the return type to
`Optional<Long>`. This works for active and standby tasks alike.
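To make the two return-type options concrete, a minimal sketch with a hypothetical `InstanceLagView` class (not the real `StreamsMetadata`): it only knows the lags of its own local stores, and `partitionFor` is a toy stand-in for the real key-to-partition mapping.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical view of one instance; NOT the real StreamsMetadata API.
final class InstanceLagView {
    static final long UNKNOWN = -1L;

    private final boolean local;                                 // is this metadata for ourselves?
    private final int numPartitions;
    private final Map<String, Long> localLags = new HashMap<>(); // "store-partition" -> offset lag

    InstanceLagView(boolean local, int numPartitions) {
        this.local = local;
        this.numPartitions = numPartitions;
    }

    void recordLocalLag(String store, int partition, long lag) {
        localLags.put(store + "-" + partition, lag);
    }

    // Toy stand-in for the real mapping (KS uses the serialized key and a partitioner).
    int partitionFor(Object key) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Option 1: sentinel value, -1 meaning UNKNOWN.
    long offsetLag(String store, Object key) {
        return offsetLagOptional(store, key).orElse(UNKNOWN);
    }

    // Option 2: Optional, empty when the lag is not known locally.
    Optional<Long> offsetLagOptional(String store, Object key) {
        if (!local) {
            return Optional.empty(); // we cannot know how far a remote standby has caught up
        }
        return Optional.ofNullable(localLags.get(store + "-" + partitionFor(key)));
    }
}
```

The partition lookup stays internal, so the user never needs to see which partition the key maps to.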

Note that a user must verify whether `StreamsMetadata` is for itself
(local) or remote anyway. We only need to provide a way that allows
users to distinguish between active and standby. (More below.)

I am actually not even sure why we added
`StreamsMetadata#topicPartitions()` originally -- it seems pretty useless.
Can we deprecate it as a side cleanup in this KIP? Or am I missing something?


> There are basically 2 reasons. One is that, instead of having two functions,
> one to get StreamsMetadata for active and one for replicas, we are fetching
> both in a single call and we have a way to get only active or only replicas
> from the KeyQueryMetadata object (just like the isStandby() and isActive()
> discussion we had earlier).

I understand that we need two methods. However, I think we can simplify
the API and not introduce `KeyQueryMetadata`, but just "duplicate" all 4
existing methods for standby tasks:

// note that `standbyMetadataForKey` returns a Collection, in contrast to the
// existing `metadataForKey`

>   Collection<StreamsMetadata> allStandbyMetadata()
>   Collection<StreamsMetadata> allStandbyMetadataForStore(String storeName)
>   <K> Collection<StreamsMetadata> standbyMetadataForKey(String storeName, K key,
>       Serializer<K> keySerializer)
>   <K> Collection<StreamsMetadata> standbyMetadataForKey(String storeName, K key,
>       StreamPartitioner<? super K, ?> partitioner)

Because the existing methods return all active metadata, there is no
reason to return `KeyQueryMetadata`, as it makes getting the standby
metadata more complicated. With `KeyQueryMetadata` the user needs to make
more calls to get the metadata:






The wrapping of both within `KeyQueryMetadata` does not seem to provide
any benefit, but increases our public API surface.


(1.1. + 1.2.) From my understanding, `allMetadata()` (and other existing
methods) will only return the metadata of _active_ tasks for backward
compatibility reasons. If we returned standby metadata, existing code
could potentially "break", because the code might pick a standby to
query a key without noticing.
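A small sketch of that compatibility concern, using a hypothetical `HostInfoSketch` type with an `isStandby` flag: code that simply takes the first returned instance would silently hit a standby once standbys are mixed into the existing methods.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical, simplified host metadata with an explicit role flag.
final class HostInfoSketch {
    final String host;
    final boolean isStandby;

    HostInfoSketch(String host, boolean isStandby) {
        this.host = host;
        this.isStandby = isStandby;
    }
}

final class CompatDemo {
    // What existing application code might do: pick the first host that has the store.
    static String naivePick(List<HostInfoSketch> hosts) {
        return hosts.get(0).host;
    }

    // Keeping the existing methods active-only preserves the old behavior.
    static List<HostInfoSketch> activeOnly(List<HostInfoSketch> hosts) {
        return hosts.stream().filter(h -> !h.isStandby).collect(Collectors.toList());
    }
}
```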


On 11/8/19 6:07 AM, Navinder Brar wrote:
> Thanks, Guozhang, for going through it again.
>    - 1.1 & 1.2: The main point of adding topicPartition in KeyQueryMetadata
> is not the topicName, but the partition number. I agree changelog topicNames and
> store names will have a 1-1 mapping, but we also need the partition number of
> the changelog for which we are calculating the lag. We could add the partition
> number to StreamsMetadata, but that would be orthogonal to the definition of
> StreamsMetadata, i.e., “Represents the state of an instance (process) in a
> {@link KafkaStreams} application.” If we add the partition number to it, it
> no longer stays metadata for an instance, because now it is storing the
> partition information for a key being queried. So, having “KeyQueryMetadata”
> simplifies this, as now it contains all the metadata and also the changelog and
> partition information for which we need to calculate the lag.
> Another way is having another function in parallel to metadataForKey, which
> returns the partition number for the key being queried. But then we would
> need 2 calls to StreamsMetadataState, one to fetch the metadata and another to
> fetch the partition number. Let me know if either of these two ways seems more
> intuitive than KeyQueryMetadata; then we can try to converge on one.
>    - 1.3: Again, it is required for the partition number. We can drop the store
> name, though.
>    - 2.1: I think this was done in accordance with the opinion from John, as
> time lag would be better implemented with a broker level change, and offset
> based lag is readily implementable. @vinoth?
>    - 2.2.1: Good point. +1
>    - 2.2.2: I am not well aware of it. @vinoth, any comments?
>    - 3.1: I think we have already agreed on dropping this; we need to update the KIP.
> Also, is there any opinion on lagInfoForStore(String storeName) vs
> lagInfoForStore(String storeName, int partition)?
>    - 3.2: But in functions such as onAssignment() and onPartitionsAssigned(),
> the topicPartitions we use for standbyTasks are also input topic partitions
> and not changelog partitions. Would this break from those semantics?
>     On Thursday, 7 November, 2019, 11:33:19 pm IST, Guozhang Wang 
> <wangg...@gmail.com> wrote:  
>  Hi Navinder, Vinoth, thanks for the updated KIP!
> Read through the discussions so far and made another pass on the wiki page,
> and here are some more comments:
> 1. About the public APIs:
> 1.1. It is not clear to me how allStandbyMetadataForStore
> and allStandbyMetadata would be differentiated from the original APIs given
> that we will augment StreamsMetadata to include both active and standby
> topic-partitions and store names, so I think we can still use allMetadata
> and allMetadataForStore to get the collection of instance metadata that
> host the store both as active and standbys. Are there any specific use
> cases where we ONLY want to get the standby's metadata? And even if there
> are, we can easily filter it out from the allMetadata / allMetadataForStore
> right?
> 1.2. Similarly, I'm wondering for allMetadataForKey, can we return the same
> type: "Collection<StreamsMetadata>", which includes 1 for active and N-1
> for standbys, and callers can easily identify them by looking inside the
> StreamsMetadata objects? In addition, I feel the "topicPartition" field
> inside "KeyQueryMetadata" is not very important, since the changelog
> topic-name always has a 1-1 mapping to the store name, so as long as the store
> name matches, the changelog topic name should always match (i.e. in
> the pseudo code, just checking store names should be sufficient). If all of
> the above assumptions are true, I think we can save ourselves from introducing
> one more public class here.
> 1.3. Similarly in StoreLagInfo, seems not necessary to include the topic
> partition name in addition to the store name.
> 2. About querying store lags: we've discussed separating the querying
> of the lag information and the querying of the host information, so I still
> support having separate APIs here. More thoughts:
> 2.1. Compared with offsets, I'm wondering would time-difference be more
> intuitive for users to define the acceptable "staleness"? More strictly,
> are there any scenarios where we would actually prefer offsets over
> timestamps except that the timestamps are not available?
> 2.2. I'm also leaning a bit towards not putting the burden of periodically
> refreshing our lag and caching it (and introducing another config) on the
> streams side, but rather documenting its cost clearly and letting users consider
> its call frequency; of course, in terms of implementation, there are some
> optimizations we can consider:
> 1) for restoring active tasks, the log-end-offset is read once since it is
> not expected to change, and that offset / timestamp can be remembered for
> lag calculation and we do not need to refresh again;
> 2) for standby tasks, there's a "Map<TopicPartition, Long>
> endOffsets(Collection<TopicPartition> partitions)" in KafkaConsumer to
> batch a list of topic-partitions in one round-trip, and we can use that to
> let our APIs be something like "lagInfoForStores(Collection<String> storeNames)"
> to enable the batching effects.
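A minimal sketch of both optimizations, assuming end offsets are keyed by a plain changelog-partition string (standing in for the `Map<TopicPartition, Long>` returned by `KafkaConsumer#endOffsets(Collection<TopicPartition>)`); `BatchedLagCalculator` and its methods are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical lag calculator; not Kafka Streams code.
final class BatchedLagCalculator {
    // Stand-in for consumer.endOffsets(partitions): one round trip for many partitions.
    private final Function<Set<String>, Map<String, Long>> endOffsetsBatch;
    // Restoring active tasks: end offsets are read once, since they are not expected to change.
    private final Map<String, Long> restoringEndOffsets = new HashMap<>();

    BatchedLagCalculator(Function<Set<String>, Map<String, Long>> endOffsetsBatch) {
        this.endOffsetsBatch = endOffsetsBatch;
    }

    // Optimization 1: remember the end offset on first use and reuse it afterwards.
    long restoringActiveLag(String partition, long restoredOffset) {
        long end = restoringEndOffsets.computeIfAbsent(
            partition, p -> endOffsetsBatch.apply(Set.of(p)).get(p));
        return Math.max(0L, end - restoredOffset);
    }

    // Optimization 2: fetch the end offsets of all standby partitions in one batched call.
    Map<String, Long> standbyLags(Map<String, Long> standbyPositions) {
        Map<String, Long> ends = endOffsetsBatch.apply(standbyPositions.keySet());
        Map<String, Long> lags = new HashMap<>();
        standbyPositions.forEach((p, pos) -> lags.put(p, Math.max(0L, ends.get(p) - pos)));
        return lags;
    }
}
```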
> 3. Misc.:
> 3.1 There's a typo on the pseudo code "globalLagInforation". Also it seems
> not describing how that information is collected (personally I also feel
> one "lagInfoForStores" is sufficient).
> 3.2 Note there's a slight semantic difference between active and
> standby's "partitions" inside StreamsMetadata: for active tasks the
> partitions are actually input topic partitions for the task (some of them
> may also act as changelog topics, but these are exceptional cases); for
> standby tasks the "standbyTopicPartitions" are actually the changelog
> topics of the task. So maybe rename it to "standbyChangelogPartitions" to
> differentiate it?
> Overall I think this would be a really good KIP to add to Streams, thank
> you so much!
> Guozhang
> On Wed, Nov 6, 2019 at 8:47 PM Navinder Brar
> <navinder_b...@yahoo.com.invalid> wrote:
>> +1 on implementing offset based lag for now and pushing time-based lag to a
>> later point in time when broker changes are done. Although time-based lag
>> enhances the readability, it would not be a make or break change for
>> implementing this KIP.
>> Vinoth has explained the role of KeyQueryMetadata; let me add my 2
>> cents as well.
>>     - There are basically 2 reasons. One is that, instead of having two
>> functions (one to get StreamsMetadata for active and one for replicas), we
>> are fetching both in a single call, and we have a way to get only active or
>> only replicas from the KeyQueryMetadata object (just like the isStandby() and
>> isActive() discussion we had earlier).
>>     - The other is that, even after fetching the metadata, we now need to
>> fetch the topicPartition for which the query came, in order to fetch the lag
>> for that specific topicPartition. Instead of having another call to fetch the
>> partition from StreamsMetadataState, we thought using one single call and
>> fetching the partition and all metadata together would be better.
>>     - Another option was to change the StreamsMetadata object and add the
>> topicPartition for which the query came, but that doesn’t make sense in
>> terms of StreamsMetadata's semantics. Also, KeyQueryMetadata represents
>> all the metadata for the key being queried, i.e. the partition it belongs
>> to and the list of StreamsMetadata (hosts), active or replica, where the
>> key could be found.
>>     On Thursday, 7 November, 2019, 01:53:36 am IST, Vinoth Chandar <
>> vchan...@confluent.io> wrote:
>>   +1 to John's suggestion on Duration/Instant and on dropping the API to fetch
>> all stores' lags. However, I do think we need to return lags per topic
>> partition, so I'm not sure a single return value would work. We need some new
>> class that holds a TopicPartition and Duration/Instant variables together?
>> 10) Because we needed to return the topicPartition the key belongs to, in
>> order to correlate with the lag information from the other set of APIs.
>> Otherwise, we don't know which topic partition's lag estimate to use. We
>> tried to illustrate this in the example code. StreamsMetadata is simply
>> capturing the state of a streams host/instance, whereas TopicPartition depends
>> on the key passed in. This is a side effect of our decision to decouple lag
>> based filtering from the metadata APIs.
>> 20) Goes back to the previous point. We needed to return information that
>> is key specific, at which point it seemed natural for the KeyQueryMetadata
>> to contain the active, standby, and topic partition for that key. If we merely
>> returned a standbyMetadataForKey() -> Collection<StreamsMetadata> standby,
>> an active metadataForKey() -> StreamsMetadata, and a new
>> getTopicPartition(key) -> topicPartition object back to the caller, then
>> arguably you could do the same kind of correlation. IMO, having the
>> KeyQueryMetadata class to encapsulate all this is a friendlier API.
>>   allStandbyMetadata() and allStandbyMetadataForStore() are just counterparts
>> for allMetadataForStore() and allMetadata() that we introduce mostly for
>> consistent API semantics. (Their presence could implicitly help denote that
>> allMetadataForStore() is for active instances. Happy to drop them if their
>> utility is not clear.)
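To illustrate the correlation argument in 10) and 20), a minimal sketch with a hypothetical `KeyQueryMetadataSketch` that bundles the active host, the standby hosts, and the key's partition; the per-host lag map and the selection policy are assumptions for illustration, not the KIP's API:

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical bundle of everything needed to route one key.
final class KeyQueryMetadataSketch {
    final String activeHost;
    final List<String> standbyHosts;
    final int partition; // the partition the key maps to; needed to look up the right lag

    KeyQueryMetadataSketch(String activeHost, List<String> standbyHosts, int partition) {
        this.activeHost = activeHost;
        this.standbyHosts = standbyHosts;
        this.partition = partition;
    }

    // Prefer the active host if it is up; otherwise pick the first standby whose lag
    // for THIS key's partition is within maxLag.
    // lagsByHost: host -> (partition -> offset lag), gathered via the separate lag APIs.
    Optional<String> chooseHost(Map<String, Map<Integer, Long>> lagsByHost, long maxLag, boolean activeUp) {
        if (activeUp) {
            return Optional.of(activeHost);
        }
        for (String standby : standbyHosts) {
            Long lag = lagsByHost.getOrDefault(standby, Map.of()).get(partition);
            if (lag != null && lag <= maxLag) {
                return Optional.of(standby);
            }
        }
        return Optional.empty();
    }
}
```

For example, a standby that is caught up on partition 1 but far behind on the key's partition 3 is skipped, which is why the partition has to travel with the metadata in this design.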
>> 30) This would assume we refresh all the standby lag information every
>> time we query for that StreamsMetadata for a specific store? For time based
>> lag, this will involve fetching the tail kafka record at once from multiple
>> kafka topic partitions? I would prefer not to couple them like this and
>> have the ability to make granular store (or even topic partition level)
>> fetches for lag information.
>> 32) I actually prefer John's suggestion to let the application drive the
>> lag fetches/updates and not have flags as the KIP currently points to. Are
>> you reexamining that position?
>> On fetching lag information, +1, we could do this much more efficiently with
>> a broker change. Given I don't yet have a burning need for the time based
>> lag, I think we can sequence the APIs such that the offset based ones are
>> implemented first, while we work on a broker side change?
>> Given we decoupled the offset and time based lag API, I am willing to drop
>> the time based lag functionality (since it's not needed right away for my
>> use-case). @navinder, thoughts?
>> On Tue, Nov 5, 2019 at 11:10 PM Matthias J. Sax <matth...@confluent.io>
>> wrote:
>>> Navinder,
>>> thanks for updating the KIP. Couple of follow up questions:
>>> (10) Why do we need to introduce the class `KeyQueryMetadata`?
>>> (20) Why do we introduce the two methods `allMetadataForKey()`? Would it
>>> not be simpler to add `Collection<StreamsMetadata>
>>> standbyMetadataForKey(...)`? This would align with the new methods
>>> `#allStandbyMetadata()` and `#allStandbyMetadataForStore()`.
>>> (30) Why do we need the class `StoreLagInfo` -- it seems simpler to just
>>> extend `StreamsMetadata` with the corresponding attributes and methods
>>> (for active tasks, the lag would always be reported as zero).
>>> (32) Via (30) we can avoid the two new methods `#allLagInfo()` and
>>> `#lagInfoForStore()`, too, reducing public API and making it simpler to
>>> use the feature.
>>> Btw: If we make `StreamsMetadata` thread safe, the lag information can be
>>> updated in the background without the need for the application to
>>> refresh its metadata. Hence, the user can get active and/or standby
>>> metadata once, and only needs to refresh it if a rebalance happened.
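A minimal sketch of that idea, assuming a hypothetical `LiveStoreMetadata` class: the host assignment is fixed until the next rebalance, while a background thread overwrites the lag in a `ConcurrentHashMap`, so readers see the latest published value without re-fetching metadata.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical thread-safe metadata object (not the real StreamsMetadata):
// the host is immutable until the next rebalance, lags are updated in the background.
final class LiveStoreMetadata {
    private final String host;
    private final ConcurrentMap<String, Long> offsetLagByStore = new ConcurrentHashMap<>();

    LiveStoreMetadata(String host) {
        this.host = host;
    }

    String host() { return host; }

    // Called by a (hypothetical) background refresher, e.g. once per commit interval.
    void updateLag(String store, long lag) {
        offsetLagByStore.put(store, lag);
    }

    // Called by application threads; returns the latest published lag, or -1 if none yet.
    long offsetLag(String store) {
        return offsetLagByStore.getOrDefault(store, -1L);
    }
}
```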
>>> About point (4) of the previous thread: I was also thinking about
>>> when/how to update the time-lag information, and I agree that we should
>>> not update it for each query.
>>> "How": That we need to fetch the last record is a little bit
>>> unfortunate, but I don't see any other way without a broker change. One
>>> issue I still see is with "exactly-once" -- if transaction markers are
>>> in the topic, the last message is not at offset "endOffset - 1" and as
>>> multiple transaction markers might be after each other, it's unclear how
>>> to identify the offset of the last record... Thoughts?
>>> Hence, it might be worth looking into a broker change as a potential
>>> future improvement. It might be possible for the broker to cache the
>>> latest timestamp per partition to serve this data efficiently, similar
>>> to `#endOffset()`.
>>> "When": We refresh the end-offset information based on the
>>> `commit.interval.ms` -- doing it more often is not really useful, as
>>> state store caches will most likely buffer up all writes to changelogs
>>> anyway and are only flushed on commit (including a flush of the
>>> producer). Hence, I would suggest updating the time-lag information
>>> based on the same strategy in the background. This way there is no
>>> additional config or method, and the user does not need to worry about
>>> it at all.
>>> To avoid refresh overhead if we don't need it (a user might not use IQ
>>> to begin with), it might be worth maintaining an internal flag
>>> `updateTimeLagEnabled` that is set to `false` initially and only set to
>>> `true` on the first call of a user to get standby-metadata.
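A minimal sketch of that lazily enabled refresh, assuming a hypothetical `LazyLagRefresher` whose `maybeRefresh()` is called from a background loop; only the flag name comes from the suggestion above, everything else is illustrative:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: no refresh overhead until someone actually asks for standby metadata.
final class LazyLagRefresher {
    private final AtomicBoolean updateTimeLagEnabled = new AtomicBoolean(false);
    private final AtomicInteger brokerReads = new AtomicInteger(); // counts simulated tail-record reads

    // Invoked by the background loop (e.g. on each commit); a no-op until enabled.
    void maybeRefresh() {
        if (updateTimeLagEnabled.get()) {
            brokerReads.incrementAndGet(); // stand-in for fetching the changelog tail timestamp
        }
    }

    // Invoked on the user's first request for standby metadata; enables refreshing from then on.
    void onStandbyMetadataRequested() {
        updateTimeLagEnabled.set(true);
    }

    int brokerReads() { return brokerReads.get(); }
}
```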
>>> -Matthias
>>> On 11/4/19 5:13 PM, Vinoth Chandar wrote:
>>>>>>   I'm having some trouble wrapping my head around what race conditions
>>>>>> might occur, other than the fundamentally broken state in which different
>>>>>> instances are running totally different topologies.
>>>> 3. @both Without the topic partitions that the tasks can map back to, we
>>>> have to rely on topology/cluster metadata in each Streams instance to map
>>>> the task back. If the source topics are wildcarded, for example, then each
>>>> instance could have different source topics in its topology until the next
>>>> rebalance happens. You can also read my comments here:
>>>> https://issues.apache.org/jira/browse/KAFKA-7149?focusedCommentId=16904106&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16904106
>>>>>> seems hard to imagine how encoding arbitrarily long topic names plus an
>>>>>> integer for the partition number could be as efficient as task ids, which
>>>>>> are just two integers.
>>>> 3. If you still have concerns about the efficacy of dictionary encoding,
>>>> I'm happy to engage. The link above also has some benchmark code I used.
>>>> Theoretically, we would send each topic name at least once, so yes, if you
>>>> compare a 10-20 character topic name + an integer to two integers, it will
>>>> be more bytes. But it's a constant overhead proportional to the size of the
>>>> topic name, and with 4, 8, or 12 partitions the size difference between the
>>>> baseline (version 4, where we just repeated topic names for each topic
>>>> partition) and the two approaches becomes narrow.
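A rough way to check the size argument, assuming a simplified encoding rather than the real `AssignmentInfo` format: `DataOutputStream#writeUTF` for topic names (2-byte length prefix plus the bytes) and 4-byte ints for partitions, dictionary indexes, and the two `TaskId` fields, for one topic with P partitions.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Simplified size model for encoding P partitions of one topic; NOT the real protocol.
final class EncodingSizes {
    interface Writer { void write(DataOutputStream out) throws IOException; }

    static int sizeOf(Writer w) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            w.write(out);
        } catch (IOException e) {
            throw new UncheckedIOException(e); // cannot happen with an in-memory stream
        }
        return bytes.size();
    }

    // Version-4-style baseline: repeat the topic name for every partition.
    static int baseline(String topic, int partitions) {
        return sizeOf(out -> {
            for (int p = 0; p < partitions; p++) { out.writeUTF(topic); out.writeInt(p); }
        });
    }

    // Dictionary encoding: send (index, name) once, then (topic index, partition) pairs.
    static int dictionary(String topic, int partitions) {
        return sizeOf(out -> {
            out.writeInt(0);
            out.writeUTF(topic);
            for (int p = 0; p < partitions; p++) { out.writeInt(0); out.writeInt(p); }
        });
    }

    // Task ids: two ints (topic group id, partition) per task.
    static int taskIds(int tasks) {
        return sizeOf(out -> {
            for (int t = 0; t < tasks; t++) { out.writeInt(0); out.writeInt(t); }
        });
    }
}
```

With a 16-character topic name such as `page-view-events` and 12 partitions, this model gives 264, 118, and 96 bytes for the baseline, dictionary, and task-id layouts; the gap between the last two stays at 22 bytes for any partition count, which is the constant per-topic overhead described above.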
>>>>>> Plus, Navinder is going to implement a bunch of protocol code that we
>>>>>> might just want to change when the discussion actually does take place, if
>>>>>> ever.
>>>>>> it'll just be a mental burden for everyone to remember that we want to
>>>>>> have this follow-up discussion.
>>>> 3. Isn't people changing the same parts of the code and tracking follow-ups
>>>> a common thing we need to deal with anyway? For this KIP, isn't it enough
>>>> to reason about whether the additional map on top of the topic dictionary
>>>> would incur more overhead than sending task_ids? I don't think that's the
>>>> case; both of them send two integers. As I see it, we can do a separate
>>>> follow-up to (re)pursue the task_id conversion and get it working for both
>>>> maps within the next release?
>>>>>> Can you elaborate on "breaking up the API"? It looks like there are
>>>>>> already separate API calls in the proposal, one for time-lag, and another
>>>>>> for offset-lag, so are they not already broken up?
>>>> The current APIs (e.g., lagInfoForStore) for lags return StoreLagInfo
>>>> objects, which have both time and offset lags. If we had separate APIs
>>>> (e.g., offsetLagForStore(), timeLagForStore()), we could implement the offset
>>>> version using the offset lag that the streams instance already tracks, i.e.,
>>>> no need for external calls. The time based lag API would incur the Kafka
>>>> read for the timestamp. Makes sense?
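A minimal sketch of the split, assuming a hypothetical `SplitLagApi` class: the offset lag is answered from state the instance already tracks, while each time-lag call performs one (simulated) read of the changelog tail, represented by a `ToLongFunction` stand-in.

```java
import java.time.Duration;
import java.util.Map;
import java.util.function.ToLongFunction;

// Hypothetical split of the lag API into a cheap call and an expensive one; not Kafka Streams code.
final class SplitLagApi {
    private final Map<String, Long> trackedOffsetLags;        // offset lags the instance already tracks
    private final Map<String, Long> appliedTimestampsMs;      // timestamp of the last applied record per store
    private final ToLongFunction<String> tailTimestampReader; // stand-in for reading the changelog tail

    SplitLagApi(Map<String, Long> trackedOffsetLags,
                Map<String, Long> appliedTimestampsMs,
                ToLongFunction<String> tailTimestampReader) {
        this.trackedOffsetLags = trackedOffsetLags;
        this.appliedTimestampsMs = appliedTimestampsMs;
        this.tailTimestampReader = tailTimestampReader;
    }

    // Cheap: served from local state, no external call.
    long offsetLagForStore(String store) {
        return trackedOffsetLags.get(store);
    }

    // Expensive: every call performs one (simulated) remote read of the tail record.
    Duration timeLagForStore(String store) {
        long tailMs = tailTimestampReader.applyAsLong(store);
        return Duration.ofMillis(Math.max(0L, tailMs - appliedTimestampsMs.get(store)));
    }
}
```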
>>>> Based on the discussions so far, I only see these two pending issues to be
>>>> aligned on. Is there any other open item people want to bring up?
>>>> On Mon, Nov 4, 2019 at 11:24 AM Sophie Blee-Goldman <sop...@confluent.io>
>>>> wrote:
>>>>> Regarding 3), I'm wondering: does your concern still apply even now
>>>>> that the pluggable PartitionGrouper interface has been deprecated?
>>>>> Now that we can be sure that the DefaultPartitionGrouper is used to
>>>>> generate the taskId -> partitions mapping, we should be able to convert
>>>>> any taskId to its partitions.
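A sketch of why the conversion is deterministic once the default grouping is fixed, modeling (with plain strings instead of `TaskId`/`TopicPartition`) how the default grouper builds tasks as I understand it: task `g_p` gets partition `p` of every source topic in topic group `g` that has more than `p` partitions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified model of default partition grouping; not the actual DefaultPartitionGrouper class.
final class DefaultGroupingSketch {
    // Returns task id ("groupId_partition") -> list of "topic-partition" strings.
    static Map<String, List<String>> partitionGroups(Map<Integer, List<String>> topicGroups,
                                                     Map<String, Integer> partitionCounts) {
        Map<String, List<String>> tasks = new TreeMap<>();
        for (Map.Entry<Integer, List<String>> group : topicGroups.entrySet()) {
            // One task per partition id, up to the largest partition count in the group.
            int maxPartitions = 0;
            for (String topic : group.getValue()) {
                maxPartitions = Math.max(maxPartitions, partitionCounts.getOrDefault(topic, 0));
            }
            for (int p = 0; p < maxPartitions; p++) {
                List<String> assigned = new ArrayList<>();
                for (String topic : group.getValue()) {
                    // Topics with fewer partitions simply do not contribute to higher task ids.
                    if (p < partitionCounts.getOrDefault(topic, 0)) {
                        assigned.add(topic + "-" + p);
                    }
                }
                tasks.put(group.getKey() + "_" + p, assigned);
            }
        }
        return tasks;
    }
}
```

The result still depends on the partition counts each member sees, which is where the edge cases around newly added partitions and wildcarded topics come in.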
>>>>> On Mon, Nov 4, 2019 at 11:17 AM John Roesler <j...@confluent.io>
>>>>> wrote:
>>>>>> Hey Vinoth, thanks for the reply!
>>>>>> 3.
>>>>>> I get that it's not the main focus of this KIP, but if it's ok, it
>>>>>> would be nice to hash out this point right now. It only came up
>>>>>> because this KIP-535 is substantially extending the pattern in
>>>>>> question. If we push it off until later, then the reviewers are going
>>>>>> to have to suspend their concerns not just while voting for the KIP,
>>>>>> but also while reviewing the code. Plus, Navinder is going to
>>>>>> implement a bunch of protocol code that we might just want to change
>>>>>> when the discussion actually does take place, if ever. Finally, it'll
>>>>>> just be a mental burden for everyone to remember that we want to have
>>>>>> this follow-up discussion.
>>>>>> It makes sense what you say... the specific assignment is already
>>>>>> encoded in the "main" portion of the assignment, not in the "userdata"
>>>>>> part. It also makes sense that it's simpler to reason about races if
>>>>>> you simply get all the information about the topics and partitions
>>>>>> directly from the assignor, rather than get the partition number from
>>>>>> the assignor and the topic name from your own a priori knowledge of
>>>>>> the topology. On the other hand, I'm having some trouble wrapping my
>>>>>> head around what race conditions might occur, other than the
>>>>>> fundamentally broken state in which different instances are running
>>>>>> totally different topologies. Sorry, but can you remind us of the
>>>>>> specific condition?
>>>>>> To the efficiency counterargument, it seems hard to imagine how
>>>>>> encoding arbitrarily long topic names plus an integer for the
>>>>>> partition number could be as efficient as task ids, which are just two
>>>>>> integers. It seems like this would only be true if topic names were 4
>>>>>> characters or less.
>>>>>> 4.
>>>>>> Yeah, clearly, it would not be a good idea to query the metadata
>>>>>> before every single IQ query. I think there are plenty of established
>>>>>> patterns for distributed database clients to follow. Can you elaborate
>>>>>> on "breaking up the API"? It looks like there are already separate API
>>>>>> calls in the proposal, one for time-lag, and another for offset-lag,
>>>>>> so are they not already broken up? FWIW, yes, I agree, the offset lag
>>>>>> is already locally known, so we don't need to build in an extra
>>>>>> synchronous broker API call, just one for the time-lag.
>>>>>> Thanks again for the discussion,
>>>>>> -John
>>>>>> On Mon, Nov 4, 2019 at 11:17 AM Vinoth Chandar <vchan...@confluent.io>
>>>>>> wrote:
>>>>>>> 3. Right now, we still get the topic partitions assigned as a part of
>>>>>>> the top level Assignment object (the one that wraps AssignmentInfo) and
>>>>>>> use that to convert taskIds back. This list only contains assignments
>>>>>>> for that particular instance. Attempting to also reverse map "all" the
>>>>>>> taskIds in the streams cluster, i.e., all the topic partitions in these
>>>>>>> global assignment maps, was what was problematic. By explicitly sending
>>>>>>> the global assignment maps as actual topic partitions, the group
>>>>>>> coordinator (i.e., the leader that computes the assignments) is able to
>>>>>>> consistently enforce its view of the topic metadata. I still don't think
>>>>>>> making such a change, which forces you to reconsider semantics, is needed
>>>>>>> just to save bits on the wire. Maybe we can discuss this separately from
>>>>>>> this KIP?
>>>>>>> 4. There needs to be some caching/interval somewhere, though, since we
>>>>>>> don't want to potentially make 1 Kafka read per 1 IQ. But I think it's a
>>>>>>> valid suggestion to make this call just synchronous and leave the caching,
>>>>>>> or how often you want to call it, to the application. Would it then be good
>>>>>>> to break up the APIs for time and offset based lag? We can obtain offset
>>>>>>> based lag for free, and only incur the overhead of reading Kafka if we want
>>>>>>> time based lags?
>>>>>>> On Fri, Nov 1, 2019 at 2:49 PM Sophie Blee-Goldman <sop...@confluent.io>
>>>>>>> wrote:
>>>>>>>> Adding on to John's response to 3), can you clarify when and why exactly
>>>>>>>> we cannot convert between taskIds and partitions? If that's really the
>>>>>>>> case, I don't feel confident that the StreamsPartitionAssignor is not
>>>>>>>> full of bugs...
>>>>>>>> It seems like it currently just encodes a list of all partitions (the
>>>>>>>> assignment) and also a list of the corresponding task ids, duplicated
>>>>>>>> to ensure each partition has the corresponding taskId at the same
>>>>>>>> offset into the list. Why is that problematic?
>>>>>>>> On Fri, Nov 1, 2019 at 12:39 PM John Roesler <j...@confluent.io>
>>>>>>>> wrote:
>>>>>>>>> Thanks, all, for considering the points!
>>>>>>>>> 3. Interesting. I have a vague recollection of that... Still, though,
>>>>>>>>> it seems a little fishy. After all, we return the assignments
>>>>>>>>> themselves as task ids, and the members have to map these to topic
>>>>>>>>> partitions in order to configure themselves properly. If it's too
>>>>>>>>> complicated to get this right, then how do we know that Streams is
>>>>>>>>> computing the correct partitions at all?
>>>>>>>>> 4. How about just checking the log-end timestamp when you call the
>>>>>>>>> method? Then, when you get an answer, it's as fresh as it could
>>>>>>>>> possibly be. And as a user you have just one, obvious, "knob" to
>>>>>>>>> configure how much overhead you want to devote to checking... If you
>>>>>>>>> want to call the broker API less frequently, you just call the Streams
>>>>>>>>> API less frequently. And you don't have to worry about the
>>>>>>>>> relationship between your invocations of that method and the config
>>>>>>>>> setting (e.g., you'll never get a negative number, which you could if
>>>>>>>>> you check the log-end timestamp less frequently than you check the
>>>>>>>>> lag).
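A small model of the negative-lag case, assuming time lag is defined as the changelog's log-end timestamp minus the timestamp of the last record the local store has applied, and that record timestamps increase along the changelog; `TimeLagModel` is hypothetical.

```java
import java.util.function.LongSupplier;

// Hypothetical model comparing a periodically cached log-end timestamp with an on-demand read.
final class TimeLagModel {
    private final LongSupplier logEndTimestamp; // stand-in for reading the changelog tail
    private long cachedLogEnd;                   // value from the last periodic refresh

    TimeLagModel(LongSupplier logEndTimestamp) {
        this.logEndTimestamp = logEndTimestamp;
        this.cachedLogEnd = logEndTimestamp.getAsLong();
    }

    // Periodic refresh, as with a separate lag-refresh config.
    void refresh() {
        cachedLogEnd = logEndTimestamp.getAsLong();
    }

    // Cached variant: goes negative if the store advanced past the last refreshed tail.
    long cachedLagMs(long localTimestampMs) {
        return cachedLogEnd - localTimestampMs;
    }

    // On-demand variant: read the tail when the method is called.
    long onDemandLagMs(long localTimestampMs) {
        return logEndTimestamp.getAsLong() - localTimestampMs;
    }
}
```

If the tail was last read at t=1000, the producer has since written up to t=5000, and the store has applied records up to t=3000, the cached variant reports -2000 ms while the on-demand variant reports 2000 ms.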
>>>>>>>>> Thanks,
>>>>>>>>> -John
>>>>>>>>> On Thu, Oct 31, 2019 at 11:52 PM Navinder Brar
>>>>>>>>> <navinder_b...@yahoo.com.invalid> wrote:
>>>>>>>>>> Thanks John for going through this.
>>>>>>>>>>     - +1, makes sense
>>>>>>>>>>     - +1, no issues there
>>>>>>>>>>     - Yeah, the initial patch I had submitted for KAFKA-7149
>>>>>>>>>> (https://github.com/apache/kafka/pull/6935) to reduce the AssignmentInfo
>>>>>>>>>> object had taskIds, but according to Vinoth the merged PR had a similar
>>>>>>>>>> size and was simpler, so if the end result is the same size, it would not
>>>>>>>>>> make sense to pivot away from the dictionary and move to taskIds again.
>>>>>>>>>>     - Not sure what a good default would be if we don't have a
>>>>>>>>>> configurable setting. A configurable setting gives users the flexibility
>>>>>>>>>> to serve their requirements, as at the end of the day it would take CPU
>>>>>>>>>> cycles. I am ok with starting with a default and seeing how it goes based
>>>>>>>>>> upon feedback.
>>>>>>>>>> Thanks,
>>>>>>>>>> Navinder
>>>>>>>>>>     On Friday, 1 November, 2019, 03:46:42 am IST, Vinoth Chandar
>>>>>>>>>> <vchan...@confluent.io> wrote:
>>>>>>>>>>   1. I was trying to spell them out separately, but it makes sense for
>>>>>>>>>> readability. Done.
>>>>>>>>>> 2. No, I immediately agree :) .. makes sense. @navinder?
>>>>>>>>>> 3. I actually attempted sending only taskIds while working on
>>>>>>>>>> KAFKA-7149. It's non-trivial to handle edge cases resulting from newly
>>>>>>>>>> added topic partitions and wildcarded topic entries. I ended up
>>>>>>>>>> simplifying it to just dictionary encoding the topic names to reduce
>>>>>>>>>> size. We can apply the same technique here for this map. Additionally,
>>>>>>>>>> we could also dictionary encode HostInfo, given it's now repeated twice.
>>>>>>>>>> I think this would save more space than having a flag per topic
>>>>>>>>>> partition entry. Lmk if you are okay with this.
>>>>>>>>>> 4. This opens up a good discussion. Given we support time lag estimates
>>>>>>>>>> also, we need to read the tail record of the changelog periodically
>>>>>>>>>> (unlike offset lag, which we can potentially piggyback on metadata in
>>>>>>>>>> ConsumerRecord IIUC). We thought we should have a config that controls
>>>>>>>>>> how often this read happens. Let me know if there is a simple way to get
>>>>>>>>>> the timestamp value of the tail record that we are missing.
>>>>>>>>>> On Thu, Oct 31, 2019 at 12:58 PM John Roesler <j...@confluent.io>
>>>>>>>>>> wrote:
>>>>>>>>>>> Hey Navinder,
>>>>>>>>>>> Thanks for updating the KIP, it's a lot easier to see the current
>>>>>>>>>>> state of the proposal now.
>>>>>>>>>>> A few remarks:
>>>>>>>>>>> 1. I'm sure it was just an artifact of revisions, but you have two
>>>>>>>>>>> separate sections where you list additions to the KafkaStreams
>>>>>>>>>>> interface. Can you consolidate those so we can see all the additions
>>>>>>>>>>> at once?
>>>>>>>>>>> 2. For messageLagEstimate, can I suggest "offsetLagEstimate" instead,
>>>>>>>>>>> to be clearer that we're specifically measuring a number of offsets?
>>>>>>>>>>> If you don't immediately agree, then I'd at least point out that we
>>>>>>>>>>> usually refer to elements of Kafka topics as "records", not
>>>>>>>>>>> "messages", so "recordLagEstimate" might be more appropriate.
>>>>>>>>>>> 3. The proposal mentions adding a map of the standby _partitions_ for
>>>>>>>>>>> each host to AssignmentInfo. I assume this is designed to mirror the
>>>>>>>>>>> existing "partitionsByHost" map. To keep the size of these metadata
>>>>>>>>>>> messages down, maybe we can consider making two changes:
>>>>>>>>>>> (a) for both actives and standbys, encode the _task ids_ instead of
>>>>>>>>>>> _partitions_. Every member of the cluster has a copy of the topology,
>>>>>>>>>>> so they can convert task ids into specific partitions on their own,
>>>>>>>>>>> and task ids are only (usually) three characters.
>>>>>>>>>>> (b) instead of encoding two maps (hostinfo -> actives AND hostinfo ->
>>>>>>>>>>> standbys), which requires serializing all the hostinfos twice, maybe
>>>>>>>>>>> we can pack them together in one map with a structured value
>>>>>>>>>>> (hostinfo -> [actives,standbys]).
>>>>>>>>>>> Both of these ideas still require bumping the protocol version to 6,
>>>>>>>>>>> and they basically mean we drop the existing `PartitionsByHost` field
>>>>>>>>>>> and add a new `TasksByHost` field with the structured value I
>>>>>>>>>>> mentioned.
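A minimal sketch of suggestion (b), using hypothetical `HostTasks` and `TasksByHostSketch` types with task ids as strings such as `0_1`: the two per-role maps are merged so that each host key appears, and would be serialized, only once.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical structured value for a "TasksByHost"-style field: actives and standbys together.
final class HostTasks {
    final Set<String> activeTasks = new TreeSet<>();
    final Set<String> standbyTasks = new TreeSet<>();
}

final class TasksByHostSketch {
    // Merge two maps (host -> actives, host -> standbys) into one map keyed by host.
    static Map<String, HostTasks> merge(Map<String, Set<String>> activesByHost,
                                        Map<String, Set<String>> standbysByHost) {
        Map<String, HostTasks> merged = new LinkedHashMap<>();
        activesByHost.forEach((host, tasks) ->
            merged.computeIfAbsent(host, h -> new HostTasks()).activeTasks.addAll(tasks));
        standbysByHost.forEach((host, tasks) ->
            merged.computeIfAbsent(host, h -> new HostTasks()).standbyTasks.addAll(tasks));
        return merged;
    }
}
```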
>>>>>>>>>>> 4. Can we avoid adding the new "lag refresh" config? The lags would
>>>>>>>>>>> necessarily be approximate anyway, so adding the config seems to
>>>>>>>>>>> increase the operational complexity of the system for little actual
>>>>>>>>>>> benefit.
>>>>>>>>>>> Thanks for the pseudocode, by the way, it really helps visualize how
>>>>>>>>>>> these new interfaces would play together. And thanks again for the
>>>>>>>>>>> update!
>>>>>>>>>>> -John
>>>>>>>>>>> On Thu, Oct 31, 2019 at 2:41 PM John Roesler <j...@confluent.io>
>>>>>>>>>>> wrote:
>>>>>>>>>>>> Hey Vinoth,
>>>>>>>>>>>> I started going over the KIP again yesterday. There are a lot
>>>>>> of
>>>>>>>>>>>> updates, and I didn't finish my feedback in one day. I'm
>>>>>> working on
>>>>>>>>> it
>>>>>>>>>>>> now.
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> John
>>>>>>>>>>>> On Thu, Oct 31, 2019 at 11:42 AM Vinoth Chandar <vchan...@confluent.io> wrote:
>>>>>>>>>>>>> Wondering if anyone has thoughts on these changes? I liked that
>>>>>>>>>>>>> the new metadata fetch APIs provide all the information at once
>>>>>>>>>>>>> with consistent naming.
>>>>>>>>>>>>> Any guidance on what you would like to be discussed or fleshed
>>>>>>>>>>>>> out more before we call a VOTE?
>>>>>>>>>>>>> On Wed, Oct 30, 2019 at 10:31 AM Navinder Brar <navinder_b...@yahoo.com.invalid> wrote:
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>> We have made some edits in the KIP
>>>>>>>>>>>>>> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance)
>>>>>>>>>>>>>> after due deliberation on the agreed design to support the new
>>>>>>>>>>>>>> query design. This includes the new public API to query
>>>>>>>>>>>>>> offset/time lag information and other details related to
>>>>>>>>>>>>>> querying standby tasks, which came up after thinking through
>>>>>>>>>>>>>> the details.
>>>>>>>>>>>>>>     - Addition of new config, "lag.fetch.interval.ms", to
>>>>>>>>>>>>>>       configure the interval at which time/offset lag is fetched
>>>>>>>>>>>>>>     - Addition of new class StoreLagInfo to store the
>>>>>>>>>>>>>>       periodically obtained time/offset lag
>>>>>>>>>>>>>>     - Addition of two new functions in KafkaStreams,
>>>>>>>>>>>>>>       List<StoreLagInfo> allLagInfo() and
>>>>>>>>>>>>>>       List<StoreLagInfo> lagInfoForStore(String storeName), to
>>>>>>>>>>>>>>       return the lag information for an instance and a store
>>>>>>>>>>>>>>       respectively
>>>>>>>>>>>>>>     - Addition of new class KeyQueryMetadata. We need the
>>>>>>>>>>>>>>       topicPartition for each key to be matched with the lag API
>>>>>>>>>>>>>>       for the topic partition. One way is to add new functions
>>>>>>>>>>>>>>       and fetch the topicPartition from StreamsMetadataState,
>>>>>>>>>>>>>>       but we thought having one call that fetches both
>>>>>>>>>>>>>>       StreamsMetadata and topicPartition is cleaner.
>>>>>>>>>>>>>>     - Renaming partitionsForHost to activePartitionsForHost in
>>>>>>>>>>>>>>       StreamsMetadataState and partitionsByHostState to
>>>>>>>>>>>>>>       activePartitionsByHostState in StreamsPartitionAssignor
>>>>>>>>>>>>>>     - We have also added the pseudo code of how all the changes
>>>>>>>>>>>>>>       will exist together and support the new querying APIs
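A minimal sketch of how the two proposed classes might fit together: `KeyQueryMetadata` carries the partition a key maps to, so it can be joined against the `StoreLagInfo` entries returned by something like `allLagInfo()`. The update above names the classes but not their fields, so every field name and type below is an assumption for illustration, and `TopicPartitionRef` is a hypothetical stand-in for Kafka's `TopicPartition` so the example compiles on its own.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical shapes for the classes named in the KIP update; field names and
// types are assumptions, not the final Kafka Streams API.
public class LagInfoSketch {

    // Stand-in for org.apache.kafka.common.TopicPartition, to stay self-contained.
    record TopicPartitionRef(String topic, int partition) {}

    // One entry per (store, partition) hosted locally on this instance.
    record StoreLagInfo(String storeName, TopicPartitionRef partition,
                        long offsetLag, long timeLagMs) {}

    // What a per-key metadata lookup might return: hosts plus the key's partition,
    // so that callers can match it against StoreLagInfo entries.
    record KeyQueryMetadata(String activeHost, List<String> standbyHosts,
                            TopicPartitionRef partition) {}

    // Find the locally reported lag for the store partition that holds the key.
    static Optional<StoreLagInfo> lagForKey(KeyQueryMetadata md, String storeName,
                                            List<StoreLagInfo> allLagInfo) {
        return allLagInfo.stream()
                .filter(l -> l.storeName().equals(storeName))
                .filter(l -> l.partition().equals(md.partition()))
                .findFirst();
    }

    public static void main(String[] args) {
        TopicPartitionRef p3 = new TopicPartitionRef("orders-changelog", 3);
        List<StoreLagInfo> local = List.of(
                new StoreLagInfo("orders", p3, 120L, 4_000L),
                new StoreLagInfo("orders", new TopicPartitionRef("orders-changelog", 5), 0L, 0L));
        KeyQueryMetadata md = new KeyQueryMetadata("host-a:8080", List.of("host-b:8080"), p3);
        // Prints the offset lag of the partition holding the key, or -1 if not hosted here.
        System.out.println(lagForKey(md, "orders", local).map(StoreLagInfo::offsetLag).orElse(-1L));
    }
}
```

The join only works on the instance that actually hosts the store partition; a lookup for a partition hosted elsewhere comes back empty, which is why the lag has to travel back with the query response in the designs discussed further down this thread.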
>>>>>>>>>>>>>> Please let me know if anything is pending now, before a vote
>>>>>>>>>>>>>> can be started on this.
>>>>>>>>>>>>>> On Saturday, 26 October, 2019, 05:41:44 pm IST, Navinder Brar <navinder_b...@yahoo.com.invalid> wrote:
>>>>>>>>>>>>>>>> Since there are two soft votes for separate active/standby API
>>>>>>>>>>>>>>>> methods, I also change my position on that. Fine with 2
>>>>>>>>>>>>>>>> separate methods. Once we remove the lag information from
>>>>>>>>>>>>>>>> these APIs, returning a List is less attractive, since the
>>>>>>>>>>>>>>>> ordering has no special meaning now.
>>>>>>>>>>>>>> Agreed, now that we are not returning lag, I am also sold on
>>>>>>>>>>>>>> having two separate functions. We already have one which returns
>>>>>>>>>>>>>> streamsMetadata for active tasks, and now we can add another one
>>>>>>>>>>>>>> for standbys.
>>>>>>>>>>>>>> On Saturday, 26 October, 2019, 03:55:16 am IST, Vinoth Chandar <vchan...@confluent.io> wrote:
>>>>>>>>>>>>>> +1 to Sophie's suggestion. Having both lag in terms of time and
>>>>>>>>>>>>>> offsets is good and makes for a more complete API.
>>>>>>>>>>>>>> Since there are two soft votes for separate active/standby API
>>>>>>>>>>>>>> methods, I also change my position on that. Fine with 2 separate
>>>>>>>>>>>>>> methods. Once we remove the lag information from these APIs,
>>>>>>>>>>>>>> returning a List is less attractive, since the ordering has no
>>>>>>>>>>>>>> special meaning now.
>>>>>>>>>>>>>>>> lag in offsets vs time: Having both, as suggested by Sophie,
>>>>>>>>>>>>>>>> would of course be best. What is a little unclear to me is
>>>>>>>>>>>>>>>> how, in detail, we are going to compute both.
>>>>>>>>>>>>>> @navinder maybe the next step is to flesh out these details and
>>>>>>>>>>>>>> surface any larger changes we need to make, if need be.
>>>>>>>>>>>>>> Any other details we need to cover before a VOTE can be called
>>>>>>>>>>>>>> on this?
>>>>>>>>>>>>>> On Fri, Oct 25, 2019 at 1:51 PM Bill Bejeck <bbej...@gmail.com> wrote:
>>>>>>>>>>>>>>> I am jumping in a little late here.
>>>>>>>>>>>>>>> Overall I agree with the proposal to push decision making on
>>>>>>>>>>>>>>> what/how to query into the query layer.
>>>>>>>>>>>>>>> For point 5 from above, I'm slightly in favor of having a new
>>>>>>>>>>>>>>> method, "standbyMetadataForKey()" or something similar.
>>>>>>>>>>>>>>> Because even if we return all tasks in one list, the user will
>>>>>>>>>>>>>>> still have to perform some filtering to separate the different
>>>>>>>>>>>>>>> tasks, I don't feel making two calls is a burden, and IMHO it
>>>>>>>>>>>>>>> makes things more transparent for the user.
>>>>>>>>>>>>>>> If the final vote is for using an "isActive" field, I'm good
>>>>>>>>>>>>>>> with that as well.
>>>>>>>>>>>>>>> Just my 2 cents.
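Bill's point, that a single combined list still forces callers to filter, can be seen in a small sketch. `ReplicaInfo`, its `isActive` flag, and the `SeparateLookups` interface are hypothetical stand-ins for the two API shapes under discussion, not the final Kafka Streams API.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical stand-ins for the two API shapes under discussion.
public class ActiveStandbySplitSketch {

    record ReplicaInfo(String host, boolean isActive) {}

    // Shape 1: one call returns every replica; the caller must split on isActive().
    static Map<Boolean, List<ReplicaInfo>> splitCombined(List<ReplicaInfo> all) {
        // partitioningBy always yields both keys (true and false), possibly with empty lists.
        return all.stream().collect(Collectors.partitioningBy(ReplicaInfo::isActive));
    }

    // Shape 2: separate calls, so the split is already done by the library.
    interface SeparateLookups {
        ReplicaInfo activeMetadataForKey(String key);
        List<ReplicaInfo> standbyMetadataForKey(String key);
    }

    public static void main(String[] args) {
        List<ReplicaInfo> all = List.of(
                new ReplicaInfo("host-a:8080", true),
                new ReplicaInfo("host-b:8080", false),
                new ReplicaInfo("host-c:8080", false));
        Map<Boolean, List<ReplicaInfo>> split = splitCombined(all);
        System.out.println("active:   " + split.get(true));
        System.out.println("standbys: " + split.get(false));
    }
}
```

Either shape carries the same information; the difference is only whether the one-line split happens in every caller or once inside the library.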
>>>>>>>>>>>>>>> On Fri, Oct 25, 2019 at 5:09 AM Navinder Brar <navinder_b...@yahoo.com.invalid> wrote:
>>>>>>>>>>>>>>>> I think now we are aligned on almost all the design parts.
>>>>>>>>>>>>>>>> Summarising below what has been discussed above and what we
>>>>>>>>>>>>>>>> have a general consensus on.
>>>>>>>>>>>>>>>>     - Rather than broadcasting lag across all nodes at
>>>>>>>>>>>>>>>>       rebalancing/with the heartbeat, we will just return a
>>>>>>>>>>>>>>>>       list of all available standbys in the system, and the
>>>>>>>>>>>>>>>>       user can send an IQ query to any of those nodes, which
>>>>>>>>>>>>>>>>       will return the response along with the lag and offset
>>>>>>>>>>>>>>>>       time. Based on that, the user can decide whether to
>>>>>>>>>>>>>>>>       return the response or call another standby.
>>>>>>>>>>>>>>>>     - The current metadata query frequency will not change. It
>>>>>>>>>>>>>>>>       will be the same as it is now, i.e. before each query.
>>>>>>>>>>>>>>>>     - For fetching List<StreamsMetadata> in
>>>>>>>>>>>>>>>>       StreamsMetadataState.java and List<QueryableStoreProvider>
>>>>>>>>>>>>>>>>       in StreamThreadStateStoreProvider.java (which will return
>>>>>>>>>>>>>>>>       all active stores which are running/restoring and replica
>>>>>>>>>>>>>>>>       stores which are running), we will add new functions and
>>>>>>>>>>>>>>>>       not disturb the existing functions
>>>>>>>>>>>>>>>>     - There is no need to add a new StreamsConfig for
>>>>>>>>>>>>>>>>       implementing this KIP
>>>>>>>>>>>>>>>>     - We will add standbyPartitionsByHost in AssignmentInfo and
>>>>>>>>>>>>>>>>       StreamsMetadataState, which would change the existing
>>>>>>>>>>>>>>>>       rebuildMetadata() and setPartitionsByHostState()
>>>>>>>>>>>>>>>> If anyone has any more concerns, please feel free to add them.
>>>>>>>>>>>>>>>> After this I will initiate a vote.
>>>>>>>>>>>>>>>> ~Navinder
>>>>>>>>>>>>>>>> On Friday, 25 October, 2019, 12:05:29 pm IST, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>>>>>>>>>> Just to close the loop @Vinoth:
>>>>>>>>>>>>>>>>> 1. IIUC John intends to add (or we can do this in this KIP)
>>>>>>>>>>>>>>>>> lag information to AssignmentInfo, which gets sent to every
>>>>>>>>>>>>>>>>> participant.
>>>>>>>>>>>>>>>> As explained by John, currently KIP-441 plans to only report
>>>>>>>>>>>>>>>> the information to the leader. But I guess, with the new
>>>>>>>>>>>>>>>> proposal to not broadcast this information anyway, this concern
>>>>>>>>>>>>>>>> is invalidated anyway.
>>>>>>>>>>>>>>>>> 2. At least I was under the assumption that it can be called
>>>>>>>>>>>>>>>>> per query, since the API docs don't seem to suggest otherwise.
>>>>>>>>>>>>>>>>> Do you see any potential issues if we call this every query?
>>>>>>>>>>>>>>>>> (we should benchmark this nonetheless)
>>>>>>>>>>>>>>>> I did not see a real issue if people refresh the metadata
>>>>>>>>>>>>>>>> frequently, because it would be a local call. My main point
>>>>>>>>>>>>>>>> was that this would change the current usage pattern of the
>>>>>>>>>>>>>>>> API, and we would clearly need to communicate this change.
>>>>>>>>>>>>>>>> Similar to (1), this concern is invalidated anyway.
>>>>>>>>>>>>>>>> @John: I think it's a great idea to get rid of reporting lag,
>>>>>>>>>>>>>>>> and to push the decision making process about "what to query"
>>>>>>>>>>>>>>>> into the query serving layer itself. This simplifies the
>>>>>>>>>>>>>>>> overall design of this KIP significantly, and actually aligns
>>>>>>>>>>>>>>>> very well with the idea that Kafka Streams (as it is a library)
>>>>>>>>>>>>>>>> should only provide the basic building block. Many of my raised
>>>>>>>>>>>>>>>> questions are invalidated by this.
>>>>>>>>>>>>>>>> Some questions are still open though:
>>>>>>>>>>>>>>>>> 10) Do we need to distinguish between active(restoring) and
>>>>>>>>>>>>>>>>> standby tasks? Or could we treat both as the same?
>>>>>>>>>>>>>>>> @Vinoth: about (5). I see your point about multiple calls vs a
>>>>>>>>>>>>>>>> single call. I still slightly prefer multiple calls, but it's
>>>>>>>>>>>>>>>> highly subjective and I would also be fine to add an
>>>>>>>>>>>>>>>> #isActive() method. Would be good to get feedback from others.
>>>>>>>>>>>>>>>> For (14), ie, lag in offsets vs time: Having both, as suggested
>>>>>>>>>>>>>>>> by Sophie, would of course be best. What is a little unclear to
>>>>>>>>>>>>>>>> me is how, in detail, we are going to compute both.
>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>> On 10/24/19 11:07 PM, Sophie Blee-Goldman wrote:
>>>>>>>>>>>>>>>>> Just to chime in on the "report lag vs timestamp difference"
>>>>>>>>>>>>>>>>> issue, I would actually advocate for both. As mentioned
>>>>>>>>>>>>>>>>> already, time difference is probably a lot easier and/or more
>>>>>>>>>>>>>>>>> useful to reason about in terms of "freshness" of the state.
>>>>>>>>>>>>>>>>> But in the case when all queried stores are far behind, lag
>>>>>>>>>>>>>>>>> could be used to estimate the recovery velocity. You can then
>>>>>>>>>>>>>>>>> get a (pretty rough) idea of when a store might be ready, and
>>>>>>>>>>>>>>>>> wait until around then to query again.
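Sophie's "recovery velocity" idea can be made concrete with two lag samples: the rate at which the offset lag shrinks gives a rough estimate of when the store will be caught up. This is a back-of-the-envelope sketch, assuming the lag decreases roughly linearly between samples and that the active keeps appending at a similar rate; `CatchUpEstimate` and `millisUntilCaughtUp` are hypothetical names.

```java
import java.util.OptionalLong;

// Hypothetical helper; assumes the lag shrinks roughly linearly between samples.
public class CatchUpEstimate {

    /**
     * Estimates how long until a restoring/standby store's offset lag reaches zero.
     *
     * @param lag0 offset lag at the first sample
     * @param t0Ms time of the first sample, in ms
     * @param lag1 offset lag at the second sample
     * @param t1Ms time of the second sample, in ms
     * @return estimated ms until caught up, or empty if the lag is not shrinking
     */
    static OptionalLong millisUntilCaughtUp(long lag0, long t0Ms, long lag1, long t1Ms) {
        if (lag1 <= 0) {
            return OptionalLong.of(0L);              // already caught up
        }
        long elapsedMs = t1Ms - t0Ms;
        long recovered = lag0 - lag1;                // net records recovered (restored minus newly appended)
        if (elapsedMs <= 0 || recovered <= 0) {
            return OptionalLong.empty();             // not converging; no meaningful estimate
        }
        // remaining / (recovered / elapsed), rounded up with integer arithmetic
        // (ignores overflow for extremely large lags or intervals).
        return OptionalLong.of((lag1 * elapsedMs + recovered - 1) / recovered);
    }

    public static void main(String[] args) {
        // Lag dropped from 1000 to 600 records over 10 s: ~40 records/s net, so ~15 s to go.
        System.out.println(millisUntilCaughtUp(1_000, 0, 600, 10_000)); // prints OptionalLong[15000]
    }
}
```

Returning an empty result when the lag is flat or growing matters in practice: a store that is falling further behind has no meaningful "ready at" time, and a caller would rather route elsewhere than wait.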
>>>>>>>>>>>>>>>>> On Thu, Oct 24, 2019 at 9:53 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>> I think I agree with John's recent reasoning as well: instead
>>>>>>>>>>>>>>>>>> of letting the store metadata API return the staleness
>>>>>>>>>>>>>>>>>> information, letting the client query either active or
>>>>>>>>>>>>>>>>>> standby and letting the standby's query response include both
>>>>>>>>>>>>>>>>>> the values + timestamp (or lag, as in diffs of timestamps)
>>>>>>>>>>>>>>>>>> would actually be more intuitive -- not only is the streams
>>>>>>>>>>>>>>>>>> client simpler, from the user's perspective they also do not
>>>>>>>>>>>>>>>>>> need to periodically refresh their staleness information from
>>>>>>>>>>>>>>>>>> the client, but only need to make decisions on the fly
>>>>>>>>>>>>>>>>>> whenever they need to query.
>>>>>>>>>>>>>>>>>> Again, the standby replica then needs to know the current
>>>>>>>>>>>>>>>>>> active task's timestamp, which can be found from the log end
>>>>>>>>>>>>>>>>>> record's encoded timestamp; today standby tasks do not read
>>>>>>>>>>>>>>>>>> that specific record, but only refresh their knowledge of the
>>>>>>>>>>>>>>>>>> log end offset, but I think refreshing the latest record
>>>>>>>>>>>>>>>>>> timestamp is not a very bad request to add on the standby
>>>>>>>>>>>>>>>>>> replicas.
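Guozhang's suggestion boils down to a subtraction on the standby: the timestamp encoded in the record at the changelog's log end (written by the active) minus the timestamp of the last record the standby has applied. A minimal sketch, assuming both timestamps are available as epoch milliseconds; how the standby obtains the log-end record's timestamp is exactly the new work he describes, and `TimeLag`, `timeLagMs`, and `freshEnough` are hypothetical names. The second method shows a caller-side bound in time units, in the spirit of Matthias' point (14) later in this thread.

```java
// Hypothetical helper: time-based staleness of a standby, per the description above.
public class TimeLag {

    /**
     * @param logEndRecordTimestampMs timestamp encoded in the latest changelog record (from the active)
     * @param lastAppliedTimestampMs  timestamp of the last record this standby restored
     * @return how far behind the standby is, in ms of record time (never negative)
     */
    static long timeLagMs(long logEndRecordTimestampMs, long lastAppliedTimestampMs) {
        // Record timestamps are not guaranteed to be monotonic, so clamp at zero.
        return Math.max(0L, logEndRecordTimestampMs - lastAppliedTimestampMs);
    }

    // Caller-side freshness check expressed in time units rather than record counts.
    static boolean freshEnough(long timeLagMs, long maxStalenessMs) {
        return timeLagMs <= maxStalenessMs;
    }

    public static void main(String[] args) {
        long lag = timeLagMs(1_700_000_060_000L, 1_700_000_000_000L);
        // prints "60000 ms behind; within a 30 s bound? false"
        System.out.println(lag + " ms behind; within a 30 s bound? " + freshEnough(lag, 30_000L));
    }
}
```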
>>>>>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>>>>> On Thu, Oct 24, 2019 at 5:43 PM Vinoth Chandar <vchan...@confluent.io> wrote:
>>>>>>>>>>>>>>>>>>> +1 As someone implementing a query routing layer, there is
>>>>>>>>>>>>>>>>>>> already a need to have mechanisms in place to do
>>>>>>>>>>>>>>>>>>> healthchecks/failure detection to detect failures for
>>>>>>>>>>>>>>>>>>> queries, while Streams rebalancing eventually kicks in in the
>>>>>>>>>>>>>>>>>>> background.
>>>>>>>>>>>>>>>>>>> So, pushing this complexity to the IQ client app keeps
>>>>>>>>>>>>>>>>>>> Streams simpler as well. IQs will potentially be issued an
>>>>>>>>>>>>>>>>>>> order of magnitude more frequently, and so can achieve good
>>>>>>>>>>>>>>>>>>> freshness for the lag information.
>>>>>>>>>>>>>>>>>>> I would like to add, however, that we would also need to
>>>>>>>>>>>>>>>>>>> introduce APIs in the KafkaStreams class for obtaining lag
>>>>>>>>>>>>>>>>>>> information for all stores local to that host. This is for
>>>>>>>>>>>>>>>>>>> the IQs to relay back with the response/its own heartbeat
>>>>>>>>>>>>>>>>>>> mechanism.
>>>>>>>>>>>>>>>>>>> On Thu, Oct 24, 2019 at 3:12 PM John Roesler <j...@confluent.io> wrote:
>>>>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>>>>> I've been mulling over this KIP, and I think I was on the
>>>>>>>>>>>>>>>>>>>> wrong track earlier with regard to task lags. Tl;dr: I don't
>>>>>>>>>>>>>>>>>>>> think we should add lags at all to the metadata API (and
>>>>>>>>>>>>>>>>>>>> also not to the AssignmentInfo protocol message).
>>>>>>>>>>>>>>>>>>>> Like I mentioned early on, reporting lag via
>>>>>>>>>>>>>>>>>>>> SubscriptionInfo/AssignmentInfo would only work while
>>>>>>>>>>>>>>>>>>>> rebalances are happening. Once the group stabilizes, no
>>>>>>>>>>>>>>>>>>>> members would be notified of each other's lags anymore. I
>>>>>>>>>>>>>>>>>>>> had been thinking that the solution would be the heartbeat
>>>>>>>>>>>>>>>>>>>> proposal I mentioned earlier, but that proposal would have
>>>>>>>>>>>>>>>>>>>> reported the heartbeats of the members only to the leader
>>>>>>>>>>>>>>>>>>>> member (the one who makes assignments). To be useful in the
>>>>>>>>>>>>>>>>>>>> context of _this_ KIP, we would also have to report the lags
>>>>>>>>>>>>>>>>>>>> in the heartbeat responses to _all_ members. This is a
>>>>>>>>>>>>>>>>>>>> concern to me because now _all_ the lags get reported to
>>>>>>>>>>>>>>>>>>>> _all_ the members on _every_ heartbeat... a lot of chatter.
>>>>>>>>>>>>>>>>>>>> Plus, the proposal for KIP-441 is only to report the lags of
>>>>>>>>>>>>>>>>>>>> each _task_. This is the sum of the lags of all the stores
>>>>>>>>>>>>>>>>>>>> in the task. But this would be insufficient for KIP-535. For
>>>>>>>>>>>>>>>>>>>> this KIP, we would want the lag specifically of the store we
>>>>>>>>>>>>>>>>>>>> want to query. So this means we have to report the lags of
>>>>>>>>>>>>>>>>>>>> all the stores of all the members to every member... even
>>>>>>>>>>>>>>>>>>>> more chatter!
>>>>>>>>>>>>>>>>>>>> The final nail in the coffin to me is that IQ clients would
>>>>>>>>>>>>>>>>>>>> have to start refreshing their metadata quite frequently to
>>>>>>>>>>>>>>>>>>>> stay up to date on the lags, which adds even more overhead
>>>>>>>>>>>>>>>>>>>> to the system.
>>>>>>>>>>>>>>>>>>>> Consider a strawman alternative: we bring KIP-535 back to
>>>>>>>>>>>>>>>>>>>> extending the metadata API to tell the client the active and
>>>>>>>>>>>>>>>>>>>> standby replicas for the key in question (not including any
>>>>>>>>>>>>>>>>>>>> "staleness/lag" restriction, just returning all the
>>>>>>>>>>>>>>>>>>>> replicas). Then, the client picks a replica and sends the
>>>>>>>>>>>>>>>>>>>> query. The server returns the current lag along with the
>>>>>>>>>>>>>>>>>>>> response (maybe in an HTTP header or something). Then, the
>>>>>>>>>>>>>>>>>>>> client keeps a map of its last observed lags for each
>>>>>>>>>>>>>>>>>>>> replica, and uses this information to prefer fresher
>>>>>>>>>>>>>>>>>>>> replicas.
>>>>>>>>>>>>>>>>>>>> OR, if it wants only to query the active replica, it would
>>>>>>>>>>>>>>>>>>>> throw an error on any lag response greater than zero,
>>>>>>>>>>>>>>>>>>>> refresh its metadata by re-querying the metadata API, and
>>>>>>>>>>>>>>>>>>>> try again with the current active replica.
>>>>>>>>>>>>>>>>>>>> This way, the lag information will be super fresh for the
>>>>>>>>>>>>>>>>>>>> client, and we keep the Metadata API /
>>>>>>>>>>>>>>>>>>>> Assignment,Subscription / and Heartbeat as slim as possible.
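The client-side bookkeeping in this strawman could look roughly like the following sketch: remember the last lag each replica reported alongside a response, prefer the replica with the smallest known lag, and in active-only mode treat any non-zero lag as a sign of stale metadata, re-fetch, and retry. `QueryResponse`, the two function-typed dependencies (a metadata lookup returning hosts with the active first, and a transport), and the policy of treating never-queried hosts optimistically as lag 0 are all hypothetical choices for illustration; nothing here is a Kafka Streams API.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical query-routing client for the "return lag with every response" strawman.
public class LagAwareRouter {

    // What a server would send back: the value plus its current lag for the store.
    record QueryResponse(String value, long offsetLag) {}

    private final Map<String, Long> lastObservedLag = new HashMap<>();
    private final Function<String, List<String>> replicasForKey;        // key -> hosts, active first
    private final BiFunction<String, String, QueryResponse> queryHost; // (host, key) -> response

    LagAwareRouter(Function<String, List<String>> replicasForKey,
                   BiFunction<String, String, QueryResponse> queryHost) {
        this.replicasForKey = replicasForKey;
        this.queryHost = queryHost;
    }

    // Prefer the replica with the smallest last-observed lag. Never-queried hosts count
    // as lag 0 so each gets probed once; ties keep list order, so the active wins first.
    String query(String key) {
        String host = replicasForKey.apply(key).stream()
                .min(Comparator.comparingLong((String h) -> lastObservedLag.getOrDefault(h, 0L)))
                .orElseThrow(() -> new IllegalStateException("no replicas for " + key));
        QueryResponse r = queryHost.apply(host, key);
        lastObservedLag.put(host, r.offsetLag());   // remember what this replica reported
        return r.value();
    }

    // Active-only mode: non-zero lag means our metadata is stale, so re-fetch and retry.
    String queryActiveOnly(String key, int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            String active = replicasForKey.apply(key).get(0);  // fresh metadata lookup each attempt
            QueryResponse r = queryHost.apply(active, key);
            if (r.offsetLag() == 0) {
                return r.value();
            }
        }
        throw new IllegalStateException("active replica for " + key + " is still lagging");
    }
}
```

As John notes, this logic lives entirely in the query-serving layer, which is why he expects a dedicated IQ client library to become worthwhile once such policies accumulate.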
>>>>>>>>>>>>>>>>>>>> Side note: I do think that some time soon, we'll have to add
>>>>>>>>>>>>>>>>>>>> a library for IQ server/clients. I think that this logic
>>>>>>>>>>>>>>>>>>>> will start to get pretty complex.
>>>>>>>>>>>>>>>>>>>> I hope this thinking is reasonably clear!
>>>>>>>>>>>>>>>>>>>> Thanks again,
>>>>>>>>>>>>>>>>>>>> -John
>>>>>>>>>>>>>>>>>>>> Does that
>>>>>>>>>>>>>>>>>>>> On Wed, Oct 23, 2019 at 10:16 AM Vinoth Chandar <vchan...@confluent.io> wrote:
>>>>>>>>>>>>>>>>>>>>> Responding to the points raised by Matthias
>>>>>>>>>>>>>>>>>>>>> 1. IIUC John intends to add (or we can do this in this KIP)
>>>>>>>>>>>>>>>>>>>>> lag information to AssignmentInfo, which gets sent to every
>>>>>>>>>>>>>>>>>>>>> participant.
>>>>>>>>>>>>>>>>>>>>> 2. At least I was under the assumption that it can be called
>>>>>>>>>>>>>>>>>>>>> per query, since the API docs don't seem to suggest
>>>>>>>>>>>>>>>>>>>>> otherwise. Do you see any potential issues if we call this
>>>>>>>>>>>>>>>>>>>>> every query? (we should benchmark this nonetheless)
>>>>>>>>>>>>>>>>>>>>> 4. Agree. metadataForKey() would implicitly return the
>>>>>>>>>>>>>>>>>>>>> active host metadata (as it did before). We should also
>>>>>>>>>>>>>>>>>>>>> document this in that API's javadoc, given we now have other
>>>>>>>>>>>>>>>>>>>>> method(s) that return more host metadata.
>>>>>>>>>>>>>>>>>>>>> 5. While I see the point, the app/caller has to make two
>>>>>>>>>>>>>>>>>>>>> different API calls to obtain active/standby and potentially
>>>>>>>>>>>>>>>>>>>>> do the same set of operations to query the state. I
>>>>>>>>>>>>>>>>>>>>> personally still like a method like isActive() better, but
>>>>>>>>>>>>>>>>>>>>> don't have strong opinions.
>>>>>>>>>>>>>>>>>>>>> 9. If we do expose the lag information, could we just leave
>>>>>>>>>>>>>>>>>>>>> it up to the caller to decide whether it errors out or not,
>>>>>>>>>>>>>>>>>>>>> and not make the decision within Streams? i.e., we don't
>>>>>>>>>>>>>>>>>>>>> need a new config
>>>>>>>>>>>>>>>>>>>>> 14. +1, if it's easier to do right away. We started with
>>>>>>>>>>>>>>>>>>>>> number of records, following the lead from KIP-441
>>>>>>>>>>>>>>>>>>>>> On Wed, Oct 23, 2019 at 5:44 AM Navinder Brar <navinder_b...@yahoo.com.invalid> wrote:
>>>>>>>>>>>>>>>>>>>>>> Thanks everyone for taking a look. Some very cool ideas
>>>>>>>>>>>>>>>>>>>>>> have flown in.
>>>>>>>>>>>>>>>>>>>>>>>> There was a follow-on idea I POCed to continuously share
>>>>>>>>>>>>>>>>>>>>>>>> lag information in the heartbeat protocol
>>>>>>>>>>>>>>>>>>>>>> +1, that would be great. I will update the KIP assuming
>>>>>>>>>>>>>>>>>>>>>> this work will finish soon.
>>>>>>>>>>>>>>>>>>>>>>>> I think that adding a new method to StreamsMetadataState
>>>>>>>>>>>>>>>>>>>>>>>> and deprecating the existing method is the best way to
>>>>>>>>>>>>>>>>>>>>>>>> go; we just can't change the return types of any existing
>>>>>>>>>>>>>>>>>>>>>>>> methods.
>>>>>>>>>>>>>>>>>>>>>> +1 on this, we will add new methods for users who would be
>>>>>>>>>>>>>>>>>>>>>> interested in querying back a list of possible options to
>>>>>>>>>>>>>>>>>>>>>> query from, and leave the current function
>>>>>>>>>>>>>>>>>>>>>> getStreamsMetadataForKey() untouched for users who want
>>>>>>>>>>>>>>>>>>>>>> absolute consistency.
>>>>>>>>>>>>>>>>>>>>>>>> why not just always return all available metadata
>>>>>>>>>>>>>>>>>>>>>>>> (including active/standby or lag) and let the caller
>>>>>>>>>>>>>>>>>>>>>>>> decide to which node they want to route the query
>>>>>>>>>>>>>>>>>>>>>> +1. I think this makes sense, as from a user standpoint
>>>>>>>>>>>>>>>>>>>>>> there is no difference b/w an active and a standby if both
>>>>>>>>>>>>>>>>>>>>>> have the same lag. In fact, users would be able to use this
>>>>>>>>>>>>>>>>>>>>>> API to reduce query load on actives, so returning all
>>>>>>>>>>>>>>>>>>>>>> available options along with the current lag in each would
>>>>>>>>>>>>>>>>>>>>>> make sense and leave it to the user how they want to use
>>>>>>>>>>>>>>>>>>>>>> this data. This has another added advantage. If a user
>>>>>>>>>>>>>>>>>>>>>> queries any random machine for a key and that machine has a
>>>>>>>>>>>>>>>>>>>>>> replica for the partition (where the key belongs), the user
>>>>>>>>>>>>>>>>>>>>>> might choose to serve the data from there itself (if it
>>>>>>>>>>>>>>>>>>>>>> doesn't lag much) rather than finding the active and making
>>>>>>>>>>>>>>>>>>>>>> an IQ to that. This would save some critical time in
>>>>>>>>>>>>>>>>>>>>>> serving for some applications.
>>>>>>>>>>>>>>>>>>>>>>>> Adding the lag in terms of timestamp diff comparing the
>>>>>>>>>>>>>>>>>>>>>>>> committed offset.
>>>>>>>>>>>>>>>>>>>>>> +1 on this, I think it's more readable. But as John said,
>>>>>>>>>>>>>>>>>>>>>> the function allMetadataForKey() is just returning the
>>>>>>>>>>>>>>>>>>>>>> possible options from where users can query a key, so we
>>>>>>>>>>>>>>>>>>>>>> can even drop the parameter
>>>>>>>>>>>>>>>>>>>>>> enableReplicaServing/tolerableDataStaleness and just return
>>>>>>>>>>>>>>>>>>>>>> all the streamsMetadata containing that key along with the
>>>>>>>>>>>>>>>>>>>>>> offset limit.
>>>>>>>>>>>>>>>>>>>>>> Answering the questions posted by Matthias in sequence.
>>>>>>>>>>>>>>>>>>>>>> 1. @John can you please comment on this one.
>>>>>>>>>>>>>>>>>>>>>> 2. Yeah, the usage pattern would include querying this
>>>>>>>>>>>>>>>>>>>>>> prior to every request
>>>>>>>>>>>>>>>>>>>>>> 3. Will add the changes to StreamsMetadata in the KIP; this
>>>>>>>>>>>>>>>>>>>>>> would include changes in rebuildMetadata() etc.
>>>>>>>>>>>>>>>>>>>>>> 4. Makes sense, already addressed above
>>>>>>>>>>>>>>>>>>>>>> 5. Is it important from a user perspective whether they
>>>>>>>>>>>>>>>>>>>>>> are querying an active(processing), an active(restoring),
>>>>>>>>>>>>>>>>>>>>>> or a standby task, if we have a way of denoting lag in a
>>>>>>>>>>>>>>>>>>>>>> readable manner which signals to the user that this is the
>>>>>>>>>>>>>>>>>>>>>> best node to query for fresh data?
>>>>>>>>>>>>>>>>>>>>>> 6. Yes, I intend to return the actives and replicas in the
>>>>>>>>>>>>>>>>>>>>>> same return list in allMetadataForKey()
>>>>>>>>>>>>>>>>>>>>>> 7. tricky
>>>>>>>>>>>>>>>>>>>>>> 8. yes, we need new functions to return activeRestoring and
>>>>>>>>>>>>>>>>>>>>>> standbyRunning tasks.
>>>>>>>>>>>>>>>>>>>>>> 9. StreamsConfig doesn't look to be of much use to me since
>>>>>>>>>>>>>>>>>>>>>> we are giving all possible options via this function, or
>>>>>>>>>>>>>>>>>>>>>> they can use the existing function
>>>>>>>>>>>>>>>>>>>>>> getStreamsMetadataForKey() and get just the active
>>>>>>>>>>>>>>>>>>>>>> 10. I think treat them both the same and let the lag do the
>>>>>>>>>>>>>>>>>>>>>> talking
>>>>>>>>>>>>>>>>>>>>>> 11. We are just sending them the option to query from in
>>>>>>>>>>>>>>>>>>>>>> allMetadataForKey(), which doesn't include any handle. We
>>>>>>>>>>>>>>>>>>>>>> then query that machine for the key, where it calls
>>>>>>>>>>>>>>>>>>>>>> allStores() and tries to find the task in
>>>>>>>>>>>>>>>>>>>>>> activeRunning/activeRestoring/standbyRunning and adds the
>>>>>>>>>>>>>>>>>>>>>> store handle here.
>>>>>>>>>>>>>>>>>>>>>> 12. Need to verify, but at the exact point when the store
>>>>>>>>>>>>>>>>>>>>>> is closed to transition it from restoring to running, the
>>>>>>>>>>>>>>>>>>>>>> queries will fail. The caller in such a case can have their
>>>>>>>>>>>>>>>>>>>>>> own configurable retries to check again, or try the replica
>>>>>>>>>>>>>>>>>>>>>> if a call to the active fails
>>>>>>>>>>>>>>>>>>>>>> 13. I think KIP-216 is working along those lines; we might
>>>>>>>>>>>>>>>>>>>>>> not need a few of those exceptions, since now the basic idea
>>>>>>>>>>>>>>>>>>>>>> of this KIP is to support IQ during rebalancing.
>>>>>>>>>>>>>>>>>>>>>> 14. Addressed above, agreed it looks more readable.
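Answer 12 leaves retries to the caller. One way a caller might do that: retry the active a configurable number of times, then fall back to the replicas in order. This is a sketch under assumptions: the per-host `Function<String, String>` query, catching every `RuntimeException` as retriable, and the fixed back-off are all hypothetical simplifications; a real client would distinguish retriable from fatal IQ errors (the subject of KIP-216).

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical caller-side retry policy: active first, then replicas.
public class RetryThenFallback {

    static String query(String active, List<String> replicas, int retriesOnActive,
                        long backoffMs, Function<String, String> queryHost) {
        RuntimeException last = null;
        // One initial attempt plus retriesOnActive retries against the active.
        for (int attempt = 0; attempt <= retriesOnActive; attempt++) {
            try {
                return queryHost.apply(active);
            } catch (RuntimeException e) {   // e.g. store closed while switching restoring -> running
                last = e;
                sleep(backoffMs);
            }
        }
        // Then try each replica once, in the order given.
        for (String replica : replicas) {
            try {
                return queryHost.apply(replica);
            } catch (RuntimeException e) {
                last = e;
            }
        }
        throw new IllegalStateException("all hosts failed", last);
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
    }
}
```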
>>>>>>>>>>>>>>>>>>>>>>     On Tuesday, 22 October, 2019, 08:39:07 pm
>>>>>> IST,
>>>>>>>>>>> Matthias J.
>>>>>>>>>>>>>> Sax
>>>>>>>>>>>>>>>>>> <
>>>>>>>>>>>>>>>>>>>>>> matth...@confluent.io> wrote:
> One more thought:
> 14) Is specifying the allowed lag in number of records a useful way for
> users to declare how stale an instance is allowed to be? Would it be
> more intuitive for users to specify the allowed lag in time units (would
> event time or processing time be better)? It seems hard for users to
> reason about how "fresh" a store really is when the number of records
> is used.
> -Matthias
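To make the distinction in point 14 concrete, here is a small sketch computing both measures for one partition. It assumes a hypothetical `Position` holding an offset and the timestamp of the record at that offset; how an instance would learn the end position's timestamp is exactly what is still open in this thread, so the inputs are simply taken as given.

```java
// Hypothetical types for illustration; Kafka Streams does not expose these.
class LagExample {

    /** An offset plus the timestamp (ms) of the record at that offset. */
    record Position(long offset, long timestampMs) {}

    /** Number of records the local copy still has to apply to reach the end. */
    static long offsetLag(Position local, Position end) {
        return Math.max(0L, end.offset() - local.offset());
    }

    /** How far behind the local copy is, measured in record (event) time. */
    static long timeLagMs(Position local, Position end) {
        return Math.max(0L, end.timestampMs() - local.timestampMs());
    }

    public static void main(String[] args) {
        Position standby = new Position(9_000L, 1_000_000L);
        Position end = new Position(10_000L, 1_005_000L);
        // 1000 records behind, but only 5 seconds behind in record time
        System.out.println(offsetLag(standby, end) + " records, "
                + timeLagMs(standby, end) + " ms");
    }
}
```

The same 1000-record lag can mean seconds or hours depending on throughput, which is the intuition behind preferring a time bound.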
> On 10/21/19 9:02 PM, Matthias J. Sax wrote:
> Some more follow-up thoughts:
> 11) If we get a store handle of an active(restoring) task, and the task
> transits to running, does the store handle become invalid and a new one
> must be retrieved? Or can we "switch it out" underneath -- for this
> case, how does the user know when they start to query the up-to-date
> state?
> 12) Standby tasks will have the store open in regular mode, while
> active(restoring) tasks open stores in "upgrade mode" for more efficient
> bulk loading. When we switch the store into active mode, we close it
> and reopen it. What is the impact if we query the store during restore?
> What is the impact if we close the store to transit to running (eg,
> there might be open iterators)?
> 13) Do we need to introduce new exception types? Compare KIP-216
> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-216%3A+IQ+should+throw+different+exceptions+for+different+errors)
> that aims to improve the user experience with regard to IQ exceptions.
> -Matthias
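One way to read the "switch it out underneath" option in point 11 is a handle that delegates to whichever underlying store is current, so callers keep a single reference across the restoring-to-running transition and can ask which state they are querying. A minimal sketch, assuming a plain `Map` stands in for the store (the class and its methods are hypothetical, not Kafka Streams API); it does not address the open-iterator question from point 12.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical handle for illustration; a Map stands in for a real store.
class SwappableStoreHandle<K, V> {

    // Store and mode are replaced together, so readers never see a
    // reopened store paired with the old mode.
    private record Snapshot<K, V>(Map<K, V> store, String mode) {}

    private final AtomicReference<Snapshot<K, V>> current;

    SwappableStoreHandle(Map<K, V> initial, String initialMode) {
        current = new AtomicReference<>(new Snapshot<>(initial, initialMode));
    }

    /** Each lookup reads whichever store is current at call time. */
    V get(K key) {
        return current.get().store().get(key);
    }

    /** Tells callers which state they are querying, e.g. "RESTORING". */
    String mode() {
        return current.get().mode();
    }

    /** Called by the owning task when it transitions, e.g. to "RUNNING". */
    void swap(Map<K, V> reopened, String newMode) {
        current.set(new Snapshot<>(reopened, newMode));
    }

    public static void main(String[] args) {
        SwappableStoreHandle<String, Integer> handle =
                new SwappableStoreHandle<>(Map.of("k", 1), "RESTORING");
        System.out.println(handle.mode() + " " + handle.get("k"));
        handle.swap(Map.of("k", 2), "RUNNING");
        System.out.println(handle.mode() + " " + handle.get("k"));
    }
}
```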
> On 10/21/19 6:39 PM, Matthias J. Sax wrote:
> Thanks for the KIP.
> Couple of comments:
> 1) With regard to KIP-441, my current understanding is that the lag
> information is only reported to the leader (please correct me if I am
> wrong). This seems to be quite a limitation to actually use the lag
> information.
> 2) The idea of the metadata API is actually to get metadata once and
> only refresh the metadata if a store was migrated. The current proposal
> would require getting the metadata before each query. The KIP should
> describe the usage pattern and impact in more detail.
> 3) Currently, the KIP does not list the public API changes in detail.
> Please list all methods you intend to deprecate and list all methods you
> intend to add (best, using a code-block markup -- compare
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-470%3A+TopologyTestDriver+test+input+and+output+usability+improvements
> as an example)
> 4) Also note (as already pointed out by John) that we cannot have any
> breaking API changes. Thus, the API should be designed in a fully
> backward compatible manner.
> 5) Returning a list of metadata objects makes it hard for users to know
> if the first object refers to the active(processing),
> active(restoring), or a standby task. IMHO, we should be more explicit.
> For example, a metadata object could have a flag that one can test via
> `#isActive()`. Or maybe even better, we could keep the current API
> as-is and add something like `standbyMetadataForKey()` (and similar
> methods for the others). Having just a flag `isActive()` is a little
> subtle, and having new overloads would make the API much clearer
> (passing in a boolean flag does not seem to be a nice API).
> 6) Do you intend to return all standby metadata information at once,
> similar to `allMetadata()`? That seems to be useful.
> 7) Even if the lag information is propagated to all instances, it will
> happen in an async manner. Hence, I am wondering if we should address
> this race condition (I think we should). The idea would be to check if
> a standby/active(restoring) task is actually still within the lag
> bounds when a query is executed, and to throw an exception if not.
> 8) The current `KafkaStreams#store()` method only returns a handle to
> stores of active(processing) tasks. How can a user actually get a
> handle to a store of an active(restoring) or standby task for querying?
> Seems we should add a new method to get standby handles? Changing the
> semantics of the existing `store()` would be possible, but I think
> adding a new method is preferable?
> 9) How does the user actually specify the acceptable lag? A global
> config via StreamsConfig (this would be a public API change that needs
> to be covered in the KIP)? Or on a per-store or even per-query basis for
> more flexibility? We could also have a global setting that is used as
> the default and allow overwriting it on a per-query basis.
> 10) Do we need to distinguish between active(restoring) and standby
> tasks? Or could we treat both as the same?
> -Matthias
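Points 7 and 9 combine naturally: a global default acceptable lag that a single query may override, with the bound checked again when the query actually executes rather than only when the metadata was fetched. A minimal sketch, assuming hypothetical names throughout (`QueryOptions`, `LagBoundExceededException`, and a `LongSupplier` that reports the store's current lag); none of this is Kafka Streams API.

```java
import java.util.OptionalLong;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical types for illustration only; not Kafka Streams API.
class LagBoundedQuery {

    static class LagBoundExceededException extends RuntimeException {
        LagBoundExceededException(long lag, long bound) {
            super("lag " + lag + " exceeds acceptable lag " + bound);
        }
    }

    /** Per-query options; an empty override means "use the global default". */
    record QueryOptions(OptionalLong acceptableLagOverride) {
        static QueryOptions defaults() {
            return new QueryOptions(OptionalLong.empty());
        }

        static QueryOptions withAcceptableLag(long lag) {
            return new QueryOptions(OptionalLong.of(lag));
        }
    }

    private final long globalAcceptableLag; // e.g. read once from app config

    LagBoundedQuery(long globalAcceptableLag) {
        this.globalAcceptableLag = globalAcceptableLag;
    }

    /**
     * Re-reads the lag right before running the query, so a store that fell
     * behind after the metadata was fetched is rejected rather than served.
     * This narrows the race from point 7 but does not close it: the lag can
     * still change between the check and the read.
     */
    <V> V execute(LongSupplier currentLag, QueryOptions options, Supplier<V> query) {
        long bound = options.acceptableLagOverride().orElse(globalAcceptableLag);
        long lag = currentLag.getAsLong();
        if (lag > bound) {
            throw new LagBoundExceededException(lag, bound);
        }
        return query.get();
    }

    public static void main(String[] args) {
        LagBoundedQuery queries = new LagBoundedQuery(100L);
        // Global default (100) applies: lag 50 is acceptable.
        System.out.println(queries.execute(() -> 50L, QueryOptions.defaults(), () -> "ok"));
        // Per-query override (10) is stricter: the same lag is rejected.
        try {
            queries.execute(() -> 50L, QueryOptions.withAcceptableLag(10L), () -> "ok");
        } catch (LagBoundExceededException e) {
            System.out.println(e.getMessage());
        }
    }
}
```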
> On 10/21/19 5:40 PM, Vinoth Chandar wrote:
>> I'm wondering, rather than putting "acceptable lag" into the
>> configuration at all, or even making it a parameter on
>> `allMetadataForKey`, why not just _always_ return all available
>> metadata (including active/standby or lag) and let the caller decide
>> to which node they want to route the query?
> +1 on exposing lag information via the APIs. IMO, without continuously
> updated/fresh lag information, its true value as a signal for query
> routing decisions is quite limited. But we can design the API around
> this model and iterate? Longer term, we should have continuously shared
> lag information.
>> more general to refactor it to "allMetadataForKey(long
>> tolerableDataStaleness, ...)", and when it's set to 0 it means "active
>> task only".
> +1 IMO if we plan on having `enableReplicaServing`, it makes sense to
> generalize based on dataStaleness. This seems complementary to exposing
> the lag information itself.
>> This is actually not a public api change at all, and I'm planning to
>> implement it asap as a precursor to the rest of KIP-441
> +1 again. Do we have a concrete timeline for when this change will land
> on master? I would like to get the implementation wrapped up (as much as
> possible) by the end of the month. :) But I agree this sequencing makes
> sense.
> On Mon, Oct 21, 2019 at 2:56 PM Guozhang Wang <wangg...@gmail.com> wrote:
> Hi Navinder,
> Thanks for the KIP, I have a high level question about the proposed API
> regarding:
> "StreamsMetadataState::allMetadataForKey(boolean enableReplicaServing...)"
> I'm wondering if it's more general to refactor it to
> "allMetadataForKey(long tolerableDataStaleness, ...)", and when it's set
> to 0 it means "active task only". Behind the scenes, we can have the
> committed offsets encode the stream time as well, so that when
> processing standby tasks the stream process knows not only the lag in
> terms of offsets compared to the committed offset (internally we call it
> offset limit), but also the lag in terms of timestamp diff compared to
> the committed offset. Encoding the timestamp as part of the offset also
> has other benefits for improving Kafka Streams time semantics, but for
> KIP-535 itself I think it can help give users a more intuitive
> interface to reason about.
> Guozhang
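A minimal sketch of the filtering semantics Guozhang describes, assuming a hypothetical `Candidate` record that already carries a role flag and a lag value (in whatever unit, offsets or milliseconds, the KIP settles on). The method name mirrors the proposal in the thread, but this signature is illustrative only: with `tolerableDataStaleness == 0` only the active qualifies, and a larger value also admits standbys whose lag is within the bound.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical types for illustration; not the StreamsMetadata API.
class StalenessFilter {

    record Candidate(String host, boolean active, long lag) {}

    /**
     * 0 means "active task only"; a positive bound also admits standbys
     * whose lag does not exceed it.
     */
    static List<Candidate> allMetadataForKey(List<Candidate> candidates,
                                             long tolerableDataStaleness) {
        return candidates.stream()
                .filter(c -> c.active()
                        || (tolerableDataStaleness > 0
                            && c.lag() <= tolerableDataStaleness))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Candidate> all = List.of(
                new Candidate("host-a:8080", true, 0L),
                new Candidate("host-b:8080", false, 40L),
                new Candidate("host-c:8080", false, 5_000L));
        System.out.println(allMetadataForKey(all, 0L).size());   // 1
        System.out.println(allMetadataForKey(all, 100L).size()); // 2
    }
}
```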
> On Mon, Oct 21, 2019 at 12:30 PM John Roesler <j...@confluent.io> wrote:
> Hey Navinder,
> Thanks for the KIP! I've been reading over the discussion thus far, and
> I have a couple of thoughts to pile on as well:
> It seems confusing to propose the API in terms of the current system
> state, but also propose how the API would look if/when KIP-441 is
> implemented. It occurs to me that the only part of KIP-441 that would
> affect you is the availability of the lag information in the
> SubscriptionInfo message. This is actually not a public api change at
> all, and I'm planning to implement it asap as a precursor to the rest of
> KIP-441, so maybe you can just build on top of KIP-441 and assume the
> lag information will be available. Then you could have a more
> straightforward proposal (e.g., mention that you'd return the lag
> information in AssignmentInfo as well as in the StreamsMetadata in some
> form, or make use of it in the API somehow).
> I'm partially motivated in that former point because it seems like
> understanding how callers would bound the staleness for their use case
> is _the_ key point for this KIP. FWIW, I think that adding a new method
> to StreamsMetadataState and deprecating the existing method is the best
> way to go; we just can't change the return types of any existing
> methods.
> I'm wondering, rather than putting "acceptable lag" into the
> configuration at all, or even making it a parameter on
> `allMetadataForKey`, why not just _always_ return all available
> metadata (including active/standby or lag) and let the caller decide to
> which node they want to route the query? This method isn't making any
> queries itself; it's merely telling you where the local Streams
> instance _thinks_ the key in question is located. Just returning all
> available information lets the caller implement any semantics they
> desire around querying only active stores, or standbys, or recovering
> stores, or whatever.
> One fly in the ointment, which you may wish to consider if proposing to
> use lag information, is that the cluster would only become aware of new
> lag information during rebalances. Even in the full expression of
> KIP-441, this information would stop being propagated when the cluster
> achieves a balanced task distribution. There was a follow-on idea I
> POCed to continuously share lag information in the heartbeat protocol,
> which you might be interested in, if you want to make sure that nodes
> are basically _always_ aware of each other's lag on different
> partitions: https://github.com/apache/kafka/pull/7096
> Thanks again!
> -John
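John's suggestion moves the routing decision to the caller. Assuming the metadata call returns every known location with its role and last-known lag (the `Location` record and `Role` enum below are hypothetical, not Kafka Streams types), "any semantics they desire" reduces to a caller-supplied predicate plus an ordering. Since the lag is only as fresh as the last rebalance, a caller should probably treat it as a hint rather than a guarantee.

```java
import java.util.Comparator;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical types for illustration only; lag assumed to be in records.
class CallerSideRouting {

    enum Role { ACTIVE_RUNNING, ACTIVE_RESTORING, STANDBY }

    record Location(String host, Role role, long lastKnownLag) {}

    /** Keeps the locations the caller accepts, least-lagging first. */
    static List<Location> route(List<Location> all, Predicate<Location> policy) {
        return all.stream()
                .filter(policy)
                .sorted(Comparator.comparingLong(Location::lastKnownLag))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Location> all = List.of(
                new Location("host-a:8080", Role.ACTIVE_RUNNING, 0L),
                new Location("host-b:8080", Role.STANDBY, 120L),
                new Location("host-c:8080", Role.STANDBY, 30L));

        // Policy 1: standbys only, e.g. to keep read load off the active.
        System.out.println(route(all, l -> l.role() == Role.STANDBY));
        // Policy 2: any location within 100 records of the end offset.
        System.out.println(route(all, l -> l.lastKnownLag() <= 100));
    }
}
```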
> On Sat, Oct 19, 2019 at 6:06 AM Navinder Brar <navinder_b...@yahoo.com.invalid> wrote:
> Thanks, Vinoth. Looks like we are on the same page. I will add some of
> these explanations to the KIP as well. I have assigned KAFKA-6144 to
> myself, and KAFKA-8994 is closed (by you). As suggested, we will replace
> "replica" with "standby".
>> In the new API, "StreamsMetadataState::allMetadataForKey(boolean
>> enableReplicaServing, String storeName, K key, Serializer<K>
>> keySerializer)" Do we really need a per key configuration? or a new
>> StreamsConfig is good enough?
> Coming from experience, when teams are building a platform with Kafka
> Streams and these APIs serve data to multiple teams, we can't have a
> generalized config that says, as a platform, whether we will support
> stale reads or not. It should be up to whoever is calling the APIs to
> choose whether they are ok with stale reads or not.
> Makes sense?
> On Thursday, 17 October, 2019, 11:56:02 pm IST, Vinoth Chandar <vchan...@confluent.io> wrote:
> Looks like we are covering ground :)
>> Only if it is within a permissible range (say 10000) we will serve from
>> Restoring state of active.
> +1 on having a knob like this. My reasoning is as follows.
> Looking at the Streams state as a read-only distributed kv store: with
> num_standby = f, we should be able to tolerate f failures, and if there
> is an (f+1)th failure, the system should be unavailable.
> A) So with num_standby=0, the system should be unavailable even if
> there is 1 failure, and that's my argument for not allowing querying in
> restoration state, especially since in this case it will be a total
> rebuild of the state (which IMO cannot be considered a normal
> fault-free operational state).
> B) Even if there are standbys, say num_standby=2, if the user decides to
> shut down all 3 instances, then the only outcome should be
> unavailability until all of them come back or the state is rebuilt on
> other nodes in the cluster. In normal operations, f <= 2, and when a
> failure does happen we can then either choose to be C over A and fail
> IQs until replication is fully caught up, or choose A over C by serving
> in restoring state as long as the lag is minimal. If, even with f=1 say,
> all the standbys are lagging a lot due to some issue, then that should
> be considered a failure, since that is different from the
> normal/expected operational mode. Serving reads with unbounded
> replication lag and calling it "available" may not be very usable or
> even desirable :) IMHO, since it gives the user no way to reason about
> the app that is going to query this store.
> So there is definitely a need to distinguish between replication
> catch-up while in a fault-free state vs. restoration of state when we
> lose more than f standbys. This knob is a great starting point towards
> this.
> If you agree with some of the explanation above, please feel free to
> include it in the KIP as well, since this is sort of our design
> principle here.
> Small nits:
> - Let's standardize on "standby" instead of "replica", in the KIP or
>   code, to be consistent with the rest of the Streams code/docs?
> - Can we merge KAFKA-8994 into KAFKA-6144 now and close the former?
>   Eventually we need to consolidate KAFKA-6555 as well.
> - In the new API, "StreamsMetadataState::allMetadataForKey(boolean
>   enableReplicaServing, String storeName, K key, Serializer<K>
>   keySerializer)" Do we really need a per key configuration? Or is a
>   new StreamsConfig good enough?
> On Wed, Oct 16, 2019 at 8:31 PM Navinder Brar <navinder_b...@yahoo.com.invalid> wrote:
> @Vinoth, I have incorporated a few of the discussions we have had into
> the KIP.
> In the current code, t0 and t1 serve queries from the Active(Running)
> partition. For case t2, we are planning to return List<StreamsMetadata>
> such that it returns <StreamsMetadata(A), StreamsMetadata(B)>, so that
> if IQ fails on A, the replica on B can serve the data by enabling
> serving from replicas. This still does not solve cases t3 and t4, since
> B has been promoted to active but is in Restoring state to catch up to
> A's last committed position (as we don't serve from Restoring state in
> Active), and the new Replica on R is building itself from scratch. Both
> these cases can be solved if we start serving from the Restoring state
> of active as well, since it is almost equivalent to the previous Active.
> There could be a case where all replicas of a partition become
> unavailable, and the active and all replicas of that partition are
> building themselves from scratch; in this case, the state in Active is
> far behind even though it is in Restoring state. To avoid serving from
> this state in such cases, we can either add another state before
> Restoring or check the difference between the last committed offset and
> the current position. Only if it is within a permissible range (say
> 10000) will we serve from the Restoring state of Active.
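The second option in the last paragraph, comparing the last committed offset with the restoring store's current position, could look roughly like this. The threshold of 10000 is the example value from the message; the class, method, and parameter names are hypothetical, and both offsets are assumed to be available to the instance.

```java
// Hypothetical helper; names and inputs are assumptions for illustration.
class RestoringServeCheck {

    static final long PERMISSIBLE_RANGE = 10_000L; // example value from the thread

    /**
     * A restoring active may serve reads only if the records it still has to
     * restore (last committed offset minus current restore position) are
     * within the permissible range.
     */
    static boolean canServeWhileRestoring(long lastCommittedOffset,
                                          long currentPosition,
                                          long permissibleRange) {
        long remaining = lastCommittedOffset - currentPosition;
        return remaining <= permissibleRange;
    }

    public static void main(String[] args) {
        // Promoted standby that was nearly caught up: 1500 records behind.
        System.out.println(canServeWhileRestoring(500_000L, 498_500L, PERMISSIBLE_RANGE)); // true
        // Rebuilding from scratch: far behind, so reads are refused.
        System.out.println(canServeWhileRestoring(500_000L, 0L, PERMISSIBLE_RANGE));       // false
    }
}
```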
> On Wednesday, 16 October, 2019, 10:01:35 pm IST, Vinoth Chandar <vchan...@confluent.io> wrote:
> Thanks for the updates on the KIP, Navinder!
> A few comments:
> - AssignmentInfo is not a public API? But we will change it and thus
>   need to increment the version and test for version_probing etc. Good
>   to separate that from the StreamsMetadata changes (which are public
>   API).
> - From what I see, there is going to be a choice between the following:
>   A) Introducing a new *KafkaStreams::allMetadataForKey()* API that
>   potentially returns List<StreamsMetadata> ordered from most up-to-date
>   to least up-to-date replicas. Today we cannot fully implement this
>   ordering, since all we know is which hosts are active and which are
>   standbys. However, this aligns well with the future: KIP-441 adds the
>   lag information to the rebalancing protocol, so we could also
>   eventually sort replicas based on the reported lags. This is fully
>   backwards compatible with existing clients. The only drawback I see is
>   that the name of the existing method KafkaStreams::metadataForKey does
>   not convey the distinction that it simply returns the active replica,
>   i.e. allMetadataForKey.get(0).
>   B) Change KafkaStreams::metadataForKey() to return a List. It's a
>   breaking change.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I prefer A, since none of the
>>>>>>>>> semantics/behavior
>>>>>>>>>>> changes
>>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>>>> existing
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> users. Love to hear more thoughts. Can
>>>>> we
>>>>>>>> also
>>>>>>>>>>> work this
>>>>>>>>>>>>>>>>>> into
>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>> KIP?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I already implemented A to unblock
>>>>>> myself for
>>>>>>>>> now.
>>>>>>>>>>> Seems
>>>>>>>>>>>>>>>>>>>> feasible
>>>>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>>>>>>> do.
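Option A can be sketched as an ordering over replicas. The sketch uses a hypothetical `ReplicaInfo` type (host, active flag, optionally reported lag) in place of the real `StreamsMetadata`. The helper names mirror the methods discussed in the thread but are not the actual Kafka Streams API. The active comes first. Standbys with a reported lag (which KIP-441 might eventually provide) follow in ascending lag order, and standbys without lag information go last.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.OptionalLong;

/** Hypothetical view of one replica of a key's partition (stand-in for StreamsMetadata). */
record ReplicaInfo(String host, boolean active, OptionalLong reportedLag) {}

/** Hypothetical helpers mirroring option A; not the actual Kafka Streams API. */
final class ReplicaOrdering {
    private ReplicaOrdering() {}

    /** Active first, then replicas with a reported lag (smallest first), then replicas with no lag info. */
    static final Comparator<ReplicaInfo> MOST_TO_LEAST_UP_TO_DATE =
            Comparator.comparing((ReplicaInfo r) -> !r.active())
                    .thenComparing(r -> r.reportedLag().isEmpty())
                    .thenComparingLong(r -> r.reportedLag().orElse(Long.MAX_VALUE));

    /** Option A: every replica hosting the key, most to least up to date. */
    static List<ReplicaInfo> allMetadataForKey(List<ReplicaInfo> replicas) {
        List<ReplicaInfo> sorted = new ArrayList<>(replicas);
        sorted.sort(MOST_TO_LEAST_UP_TO_DATE); // List.sort is stable: ties keep input order
        return sorted;
    }

    /** Today's single-answer behavior, i.e. allMetadataForKey(...).get(0); assumes a non-empty list. */
    static ReplicaInfo metadataForKey(List<ReplicaInfo> replicas) {
        return allMetadataForKey(replicas).get(0);
    }
}
```

Because `List.sort` is stable, replicas that tie keep their input order. One example is two standbys with no reported lag, which is the situation today. In that case the ordering degrades gracefully to "active first, then standbys".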
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, Oct 15, 2019 at 12:21 PM Vinoth Chandar <vchan...@confluent.io> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I get your point. But suppose there is a replica which has just
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> become active; in that case the replica will still be building
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> itself from scratch, and this active will be in RESTORING state
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> until it catches up with the previous active. Wouldn't serving from
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> a restoring active make more sense than a replica in such a case?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> KIP-441 will change this behavior such that promotion to active
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> happens based on how caught up a replica is. So, once we have that
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> (work is already underway for 2.5 IIUC) and the user sets
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> num.standby.replicas > 0, the staleness window should not be as
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> long as you describe. IMO, if users want availability for state,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> they should configure num.standby.replicas > 0. If not, then on a
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> node loss, a few partitions would be unavailable for a while (there
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> are other ways to bring this window down, which I won't bring in
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> here). We could argue for querying a restoring active (say, a new
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> node added to replace a faulty old node) based on AP vs CP
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> principles. But I'm not sure reading really, really old values for
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the sake of availability is useful. No AP data system would be
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> inconsistent for such a long time in practice.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> So, I still feel that limiting this to standby reads provides the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> best semantics.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Just my 2c. Would love to see what others think as well.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, Oct 15, 2019 at 5:34 AM Navinder Brar
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> <navinder_b...@yahoo.com.invalid> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Vinoth,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the feedback.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Can we link the JIRA and discussion thread to the KIP?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Added.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Based on the discussion on KAFKA-6144, I was under the impression
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> that this KIP is also going to cover exposing the standby
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> information in StreamsMetadata and thus subsume KAFKA-8994. That
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> would require a public API change?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Sure, I can add changes for KAFKA-8994 in this KIP and link
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> KAFKA-6144 to KAFKA-8994 as well.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The KIP seems to be focusing on restoration when a new node is
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> added. KIP-441 is underway and has some major changes proposed for
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> this. It would be good to clarify dependencies, if any. Without
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> KIP-441, I am not very sure if we should allow reads from nodes in
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> RESTORING state, which could amount to many minutes/a few hours of
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> stale reads? This is different from allowing querying standby
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> replicas, which could be mostly caught up, and the staleness window
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> could be much smaller/tolerable. (Once again the focus on
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> KAFKA-8994.)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I get your point. But suppose there is a replica which has just
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> become active; in that case the replica will still be building
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> itself from scratch, and this active will be in RESTORING state
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> until it catches up with the previous active. Wouldn't serving from
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> a restoring active make more sense than a replica in such a case?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Finally, we may need to introduce a configuration to control this.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Some users may prefer errors to stale data. Can we also add it to
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the KIP?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Will add this.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Navinder
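One way to read the "errors versus stale data" request is as a switch consulted before a local lookup. A minimal sketch follows. `StaleReadPolicy`, `StaleReadException` and `GuardedStoreReader` are hypothetical names, not existing `StreamsConfig` settings or Kafka Streams classes. The `isStale` flag stands in for whatever lag signal the KIP ends up exposing.

```java
import java.util.function.Supplier;

/** Hypothetical switch; not an existing StreamsConfig setting. */
enum StaleReadPolicy { FAIL_ON_STALE, ALLOW_STALE }

/** Hypothetical error raised for callers who prefer errors over stale data. */
final class StaleReadException extends RuntimeException {
    StaleReadException(String message) {
        super(message);
    }
}

/** Hypothetical guard placed in front of a local state store lookup. */
final class GuardedStoreReader {
    private GuardedStoreReader() {}

    /**
     * Runs the lookup unless the local copy is stale and the policy forbids stale reads.
     * {@code isStale} is an assumed input standing in for a real lag check.
     */
    static <V> V read(StaleReadPolicy policy, boolean isStale, Supplier<V> lookup) {
        if (isStale && policy == StaleReadPolicy.FAIL_ON_STALE) {
            throw new StaleReadException("local store is behind and stale reads are disabled");
        }
        return lookup.get();
    }
}
```

Passing the lookup as a `Supplier` means the store is not touched at all when the read is rejected.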
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On 2019/10/14 16:56:49, Vinoth Chandar <v...@confluent.io> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Navinder,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for sharing the KIP! A few thoughts:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - Can we link the JIRA and discussion thread to the KIP?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - Based on the discussion on KAFKA-6144, I was under the impression
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   that this KIP is also going to cover exposing the standby
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   information in StreamsMetadata and thus subsume KAFKA-8994. That
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   would require a public API change?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - The KIP seems to be focusing on restoration when a new node is
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   added. KIP-441 is underway and has some major changes proposed for
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   this. It would be good to clarify dependencies, if any. Without
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   KIP-441, I am not very sure if we should allow reads from nodes in
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   RESTORING state, which could amount to many minutes/a few hours of
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   stale reads? This is different from allowing querying standby
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   replicas, which could be mostly caught up, and the staleness
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   window could be much smaller/tolerable. (Once again the focus on
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   KAFKA-8994.)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - Finally, we may need to introduce a configuration to control
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   this. Some users may prefer errors to stale data. Can we also add
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>   it to the KIP?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Vinoth
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Sun, Oct 13, 2019 at 3:31 PM Navinder Brar <na...@yahoo.com.invalid> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Starting a discussion on the KIP to allow state stores to serve
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> stale reads during rebalance
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance).
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks & Regards,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Navinder
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> LinkedIn
>>>>>>>>>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>>>>>>>>>> -- Guozhang
>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>> -- Guozhang
