>>I'm wondering, rather than putting "acceptable lag" into the configuration at all, or even making it a parameter on `allMetadataForKey`, why not just _always_ return all available metadata (including active/standby or lag) and let the caller decide to which node they want to route the query?

+1 on exposing lag information via the APIs. IMO, without continuously updated/fresh lag information, its value as a signal for query routing decisions is much more limited. But we can design the API around this model and iterate? Longer term, we should have continuously shared lag information.
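To make that model concrete, here is a rough caller-side sketch of what routing could look like if the API simply returned per-replica metadata plus lag (all names below -- `KeyMetadata`, `offsetLag()`, `pickNode` -- are purely illustrative, not proposed API):

    import java.util.Comparator;
    import java.util.List;
    import java.util.Optional;

    public class RoutingSketch {

        // Hypothetical shape of what the new API could return per replica:
        // host info plus an active/standby flag and an offset lag.
        interface KeyMetadata {
            String host();
            int port();
            boolean isActive();
            long offsetLag();
        }

        // Caller-side policy: prefer the active replica; otherwise fall back to the
        // least-lagging standby, but only if it is within the caller's own staleness
        // budget. Streams just reports metadata; the routing decision stays in the app.
        static Optional<KeyMetadata> pickNode(final List<KeyMetadata> candidates,
                                              final long maxAcceptableLag) {
            final Optional<KeyMetadata> active =
                candidates.stream().filter(KeyMetadata::isActive).findFirst();
            if (active.isPresent()) {
                return active;
            }
            return candidates.stream()
                    .filter(m -> m.offsetLag() <= maxAcceptableLag)
                    .min(Comparator.comparingLong(KeyMetadata::offsetLag));
        }
    }

Different callers could then plug in different policies (active-only, bounded staleness, "any replica") without Streams having to hard-code one via config.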
>>more general to refactor it to "allMetadataForKey(long tolerableDataStaleness, ...)", and when it's set to 0 it means "active task only".

+1. IMO, if we plan on having `enableReplicaServing`, it makes sense to generalize based on data staleness. This seems complementary to exposing the lag information itself.

>>This is actually not a public api change at all, and I'm planning to implement it asap as a precursor to the rest of KIP-441

+1 again. Do we have a concrete timeline for when this change will land on master? I would like to get the implementation wrapped up (as much as possible) by the end of the month :). But I agree this sequencing makes sense.

On Mon, Oct 21, 2019 at 2:56 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Hi Navinder,
>
> Thanks for the KIP, I have a high-level question about the proposed API regarding:
>
> "StreamsMetadataState::allMetadataForKey(boolean enableReplicaServing...)"
>
> I'm wondering if it's more general to refactor it to "allMetadataForKey(long tolerableDataStaleness, ...)", and when it's set to 0 it means "active task only". Behind the scenes, we can have the committed offsets encode the stream time as well, so that when processing standby tasks the stream process knows not only the lag in terms of offsets compared to the committed offset (internally we call it the offset limit), but also the lag in terms of timestamp diff compared to the committed offset.
>
> Also, encoding the timestamp as part of the offset has other benefits for improving Kafka Streams time semantics as well, but for KIP-535 itself I think it can help give users a more intuitive interface to reason about.
>
> Guozhang
>
> On Mon, Oct 21, 2019 at 12:30 PM John Roesler <j...@confluent.io> wrote:
>
> > Hey Navinder,
> >
> > Thanks for the KIP! I've been reading over the discussion thus far, and I have a couple of thoughts to pile on as well:
> >
> > It seems confusing to propose the API in terms of the current system state, but also propose how the API would look if/when KIP-441 is implemented. It occurs to me that the only part of KIP-441 that would affect you is the availability of the lag information in the SubscriptionInfo message. This is actually not a public api change at all, and I'm planning to implement it asap as a precursor to the rest of KIP-441, so maybe you can just build on top of KIP-441 and assume the lag information will be available. Then you could have a more straightforward proposal (e.g., mention that you'd return the lag information in AssignmentInfo as well as in the StreamsMetadata in some form, or make use of it in the API somehow).
> >
> > I'm partially motivated in that former point because it seems like understanding how callers would bound the staleness for their use case is _the_ key point for this KIP. FWIW, I think that adding a new method to StreamsMetadataState and deprecating the existing method is the best way to go; we just can't change the return types of any existing methods.
> >
> > I'm wondering, rather than putting "acceptable lag" into the configuration at all, or even making it a parameter on `allMetadataForKey`, why not just _always_ return all available metadata (including active/standby or lag) and let the caller decide to which node they want to route the query? This method isn't making any queries itself; it's merely telling you where the local Streams instance _thinks_ the key in question is located.
> > Just returning all available information lets the caller implement any semantics they desire around querying only active stores, or standbys, or recovering stores, or whatever.
> >
> > One fly in the ointment, which you may wish to consider if proposing to use lag information, is that the cluster would only become aware of new lag information during rebalances. Even in the full expression of KIP-441, this information would stop being propagated when the cluster achieves a balanced task distribution. There was a follow-on idea I POCed to continuously share lag information in the heartbeat protocol, which you might be interested in if you want to make sure that nodes are basically _always_ aware of each others' lag on different partitions: https://github.com/apache/kafka/pull/7096
> >
> > Thanks again!
> > -John
> >
> > On Sat, Oct 19, 2019 at 6:06 AM Navinder Brar <navinder_b...@yahoo.com.invalid> wrote:
> > >
> > > Thanks, Vinoth. Looks like we are on the same page. I will add some of these explanations to the KIP as well. I have assigned KAFKA-6144 to myself, and KAFKA-8994 is closed (by you). As suggested, we will replace "replica" with "standby".
> > >
> > > In the new API, "StreamsMetadataState::allMetadataForKey(boolean enableReplicaServing, String storeName, K key, Serializer<K> keySerializer)" Do we really need a per key configuration? or a new StreamsConfig is good enough?
> > > >> Coming from experience, when teams are building a platform with Kafka Streams and these APIs serve data to multiple teams, we can't have a generalized config that says as a platform we will support stale reads or not. It should be the choice of whoever is calling the APIs to decide whether they are ok with stale reads or not. Makes sense?
> > >
> > > On Thursday, 17 October, 2019, 11:56:02 pm IST, Vinoth Chandar <vchan...@confluent.io> wrote:
> > >
> > > Looks like we are covering ground :)
> > >
> > > >> Only if it is within a permissible range (say 10000) we will serve from Restoring state of active.
> > > +1 on having a knob like this. My reasoning is as follows.
> > >
> > > Looking at the Streams state as a read-only distributed kv store: with num_standby = f, we should be able to tolerate f failures, and if there is an (f+1)th failure, the system should be unavailable.
> > >
> > > A) So with num_standby=0, the system should be unavailable even if there is 1 failure, and that's my argument for not allowing querying in restoration state, esp. since in this case it will be a total rebuild of the state (which IMO cannot be considered a normal, fault-free operational state).
> > >
> > > B) Even if there are standbys, say num_standby=2, if the user decides to shut down all 3 instances, then the only outcome should be unavailability until all of them come back or the state is rebuilt on other nodes in the cluster. In normal operations, f <= 2, and when a failure does happen we can then either choose C over A and fail IQs until replication is fully caught up, or choose A over C by serving in restoring state as long as lag is minimal. If even with f=1, say, all the standbys are lagging a lot due to some issue, then that should be considered a failure, since that is different from the normal/expected operational mode.
> > > Serving reads with unbounded replication lag and calling it "available" may not be very usable or even desirable :) IMHO, since it gives the user no way to reason about the app that is going to query this store.
> > >
> > > So there is definitely a need to distinguish between: replication catchup while being in a fault-free state vs restoration of state when we lose more than f standbys. This knob is a great starting point towards this.
> > >
> > > If you agree with some of the explanation above, please feel free to include it in the KIP as well, since this is sort of our design principle here.
> > >
> > > Small nits:
> > >
> > > - let's standardize on "standby" instead of "replica", KIP or code, to be consistent with the rest of the Streams code/docs?
> > > - Can we merge KAFKA-8994 into KAFKA-6144 now and close the former? Eventually we need to consolidate KAFKA-6555 as well
> > > - In the new API, "StreamsMetadataState::allMetadataForKey(boolean enableReplicaServing, String storeName, K key, Serializer<K> keySerializer)" Do we really need a per key configuration? or a new StreamsConfig is good enough?
> > >
> > > On Wed, Oct 16, 2019 at 8:31 PM Navinder Brar <navinder_b...@yahoo.com.invalid> wrote:
> > >
> > > > @Vinoth, I have incorporated a few of the discussions we have had in the KIP.
> > > >
> > > > In the current code, t0 and t1 serve queries from the Active (Running) partition. For case t2, we are planning to return List<StreamsMetadata> such that it returns <StreamsMetadata(A), StreamsMetadata(B)>, so that if IQ fails on A, the replica on B can serve the data by enabling serving from replicas. This still does not solve cases t3 and t4, since B has been promoted to active but is in Restoring state to catch up to A's last committed position (as we don't serve from Restoring state in Active), and the new Replica on R is building itself from scratch. Both these cases can be solved if we start serving from the Restoring state of the active as well, since it is almost equivalent to the previous Active.
> > > >
> > > > There could be a case where all replicas of a partition become unavailable, and the active and all replicas of that partition are building themselves from scratch; in this case, the state in Active is far behind even though it is in Restoring state. To cater to such cases, so that we don't serve from this state, we can either add another state before Restoring or check the difference between the last committed offset and the current position. Only if it is within a permissible range (say 10000) will we serve from the Restoring state of Active.
> > > >
> > > > On Wednesday, 16 October, 2019, 10:01:35 pm IST, Vinoth Chandar <vchan...@confluent.io> wrote:
> > > >
> > > > Thanks for the updates on the KIP, Navinder!
> > > >
> > > > Few comments
> > > >
> > > > - AssignmentInfo is not public API? But we will change it and thus need to increment the version and test for version_probing etc.
> > > > Good to separate that from the StreamsMetadata changes (which is public API)
> > > > - From what I see, there is going to be a choice between the following
> > > >
> > > > A) introducing a new *KafkaStreams::allMetadataForKey()* API that potentially returns List<StreamsMetadata> ordered from the most up-to-date to the least up-to-date replicas. Today we cannot fully implement this ordering, since all we know is which hosts are active and which are standbys. However, this aligns well with the future. KIP-441 adds the lag information to the rebalancing protocol. We could also sort replicas based on the reported lags eventually. This is fully backwards compatible with existing clients. The only drawback I see is the naming of the existing method KafkaStreams::metadataForKey, not conveying the distinction that it simply returns the active replica, i.e. allMetadataForKey.get(0).
> > > > B) Change KafkaStreams::metadataForKey() to return a List. It's a breaking change.
> > > >
> > > > I prefer A, since none of the semantics/behavior changes for existing users. Love to hear more thoughts. Can we also work this into the KIP? I already implemented A to unblock myself for now. Seems feasible to do.
> > > >
> > > > On Tue, Oct 15, 2019 at 12:21 PM Vinoth Chandar <vchan...@confluent.io> wrote:
> > > >
> > > > > >> I get your point. But suppose there is a replica which has just become active, so in that case the replica will still be building itself from scratch and this active will go to restoring state till it catches up with the previous active; wouldn't serving from a restoring active make more sense than a replica in such a case.
> > > > >
> > > > > KIP-441 will change this behavior such that promotion to active happens based on how caught up a replica is. So, once we have that (work underway already for 2.5 IIUC) and the user sets num.standby.replicas > 0, then the staleness window should not be as long as you describe. IMO if the user wants availability for state, then they should configure num.standby.replicas > 0. If not, then on a node loss, a few partitions would be unavailable for a while (there are other ways to bring this window down, which I won't bring in here). We could argue for querying a restoring active (say a new node added to replace a faulty old node) based on AP vs CP principles. But I am not sure reading really, really old values for the sake of availability is useful. No AP data system would be inconsistent for such a long time in practice.
> > > > >
> > > > > So, I still feel just limiting this to standby reads provides the best semantics.
> > > > >
> > > > > Just my 2c. Would love to see what others think as well.
> > > > >
> > > > > On Tue, Oct 15, 2019 at 5:34 AM Navinder Brar <navinder_b...@yahoo.com.invalid> wrote:
> > > > >
> > > > >> Hi Vinoth,
> > > > >> Thanks for the feedback.
> > > > >> Can we link the JIRA, discussion thread also to the KIP.>> Added.
> > > > >> Based on the discussion on KAFKA-6144, I was under the impression that this KIP is also going to cover exposing of the standby information in StreamsMetadata and thus subsume KAFKA-8994.
> > > > >> That would require a public API change?>> Sure, I can add changes for 8994 in this KIP and link KAFKA-6144 to KAFKA-8994 as well.
> > > > >> KIP seems to be focussing on restoration when a new node is added. KIP-441 is underway and has some major changes proposed for this. It would be good to clarify dependencies if any. Without KIP-441, I am not very sure if we should allow reads from nodes in RESTORING state, which could amount to many minutes/few hours of stale reads? This is different from allowing querying standby replicas, which could be mostly caught up and the staleness window could be much smaller/tolerable. (once again the focus on KAFKA-8994).>> I get your point. But suppose there is a replica which has just become active, so in that case the replica will still be building itself from scratch and this active will go to restoring state till it catches up with the previous active; wouldn't serving from a restoring active make more sense than a replica in such a case.
> > > > >>
> > > > >> Finally, we may need to introduce a configuration to control this. Some users may prefer errors to stale data. Can we also add it to the KIP?>> Will add this.
> > > > >>
> > > > >> Regards,
> > > > >> Navinder
> > > > >>
> > > > >> On 2019/10/14 16:56:49, Vinoth Chandar <v...@confluent.io> wrote:
> > > > >>
> > > > >> > Hi Navinder,
> > > > >> >
> > > > >> > Thanks for sharing the KIP! Few thoughts
> > > > >> >
> > > > >> > - Can we link the JIRA, discussion thread also to the KIP
> > > > >> > - Based on the discussion on KAFKA-6144, I was under the impression that this KIP is also going to cover exposing of the standby information in StreamsMetadata and thus subsume KAFKA-8994. That would require a public API change?
> > > > >> > - KIP seems to be focussing on restoration when a new node is added. KIP-441 is underway and has some major changes proposed for this. It would be good to clarify dependencies if any. Without KIP-441, I am not very sure if we should allow reads from nodes in RESTORING state, which could amount to many minutes/few hours of stale reads? This is different from allowing querying standby replicas, which could be mostly caught up and the staleness window could be much smaller/tolerable. (once again the focus on KAFKA-8994)
> > > > >> > - Finally, we may need to introduce a configuration to control this. Some users may prefer errors to stale data. Can we also add it to the KIP?
Can we also add it to the > > KIP?> > > > > >> > > > > >> > > > > > >> > > > > >> >Thanks> > > > > >> > > > > >> >Vinoth> > > > > >> > > > > >> > > > > > >> > > > > >> > > > > > >> > > > > >> > > > > > >> > > > > >> > > > > > >> > > > > >> >On Sun, Oct 13, 2019 at 3:31 PM Navinder Brar> > > > > >> > > > > >> ><na...@yahoo.com.invalid>wrote:> > > > > >> > > > > >> > > > > > >> > > > > >> >> Hi,> > > > > >> > > > > >> >> Starting a discussion on the KIP to Allow state stores to serve > > > > stale> > > > > >> > > > > >> >> reads during rebalance(> > > > > >> > > > > >> >> > > > > >> > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance > > > > >> > > > > > >> > > > > >> >> ).> > > > > >> > > > > >> >> Thanks & Regards,Navinder> > > > > >> > > > > >> >> LinkedIn> > > > > >> > > > > >> >>> > > > > >> > > > > > > > > > > > > > > > > > > > > -- > -- Guozhang >