@Vinoth, I have incorporated a few of the discussions we have had into the KIP.

In the current code, t0 and t1 serve queries from the Active (Running)
partition. For case t2, we are planning to return List<StreamsMetadata> such
that it returns <StreamsMetadata(A), StreamsMetadata(B)>, so that if IQ fails
on A, the replica on B can serve the data once serving from replicas is
enabled. This still does not solve cases t3 and t4: B has been promoted to
active but is in Restoring state while it catches up to A's last committed
position, and we don't serve from an active in Restoring state; meanwhile the
new replica on R is building itself from scratch. Both cases can be solved if
we also start serving from the Restoring state of the active, since it is
almost equivalent to the previous active.
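
To make the t2 idea concrete, here is a rough sketch of how a caller might walk
the ordered host list and fall back from A to B. Everything in it is an
assumption for illustration: hosts are plain strings rather than
StreamsMetadata, and remoteGet stands in for whatever RPC layer the application
uses to query a store on another instance.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical helper, not part of Kafka Streams.
final class ReplicaFallbackQuery {

    /**
     * Tries each host in order (active first, then replicas) and returns the
     * first answer that comes back without an exception.
     * An empty Optional means either the key was absent on the host that
     * answered, or every host failed.
     */
    static <V> Optional<V> queryWithFallback(List<String> hosts,
                                             Function<String, V> remoteGet) {
        for (String host : hosts) {
            try {
                return Optional.ofNullable(remoteGet.apply(host));
            } catch (RuntimeException e) {
                // Host is down or rebalancing: fall through to the next one.
            }
        }
        return Optional.empty();
    }
}
```

With hosts [A, B] and A throwing, the lookup is answered by B; if both throw,
the result is empty and the caller decides whether to retry or fail.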

There could be a case where all replicas of a partition become unavailable, and
the active and all replicas of that partition are building themselves from
scratch. In this case, the state in the active is far behind even though it is
in Restoring state. To make sure we don't serve from such a state, we can
either add another state before Restoring or check the difference between the
last committed offset and the current position. Only if it is within a
permissible range (say 10000) will we serve from the Restoring state of the
active.
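
The second option (the lag check) could look roughly like the sketch below. The
class and method names are made up for illustration, and 10000 is only the
example figure from above, not a proposed default.

```java
// Hypothetical helper, not part of Kafka Streams.
final class RestoringLagCheck {

    /** Example threshold only ("say 10000"); a real value would be configurable. */
    static final long MAX_ACCEPTABLE_LAG = 10_000L;

    /**
     * Returns true if a restoring active is close enough to the last
     * committed offset to be allowed to serve queries.
     */
    static boolean canServeFromRestoring(long lastCommittedOffset,
                                         long currentPosition,
                                         long maxLag) {
        // Clamp at zero in case the position is already past the committed offset.
        long lag = Math.max(0L, lastCommittedOffset - currentPosition);
        return lag <= maxLag;
    }
}
```

An active that is 5000 records behind would pass this check, while one that is
rebuilding from offset 0 against a committed offset of 50000 would not.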


    On Wednesday, 16 October, 2019, 10:01:35 pm IST, Vinoth Chandar 
<vchan...@confluent.io> wrote:  
 
 Thanks for the updates on the KIP, Navinder!

A few comments:

- AssignmentInfo is not public API? But we will change it and thus need to
increment the version and test for version_probing etc. Good to separate
that from StreamsMetadata changes (which is public API).
- From what I see, there is going to be a choice between the following:

  A) Introducing a new KafkaStreams::allMetadataForKey() API that
potentially returns List<StreamsMetadata> ordered from most up-to-date to
least up-to-date replicas. Today we cannot fully implement this ordering,
since all we know is which hosts are active and which are standbys.
However, this aligns well with the future. KIP-441 adds the lag information
to the rebalancing protocol. We could also sort replicas based on the
reported lags eventually. This is fully backwards compatible with existing
clients. The only drawback I see is the naming of the existing method
KafkaStreams::metadataForKey, which does not convey that it simply
returns the active replica, i.e. allMetadataForKey().get(0).
  B) Changing KafkaStreams::metadataForKey() to return a List. It's a
breaking change.

I prefer A, since none of the semantics/behavior changes for existing
users. Would love to hear more thoughts. Can we also work this into the KIP?
I already implemented A to unblock myself for now. Seems feasible to do.
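
For discussion, the shape of option A can be sketched as below. This is not the
implementation mentioned above: HostInfo is a simplified, hypothetical stand-in
for StreamsMetadata, and the methods take the replica list directly instead of
a store name and key. It only shows the ordering that is possible today (active
first, then standbys) and how the existing single-result lookup falls out of it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not the KafkaStreams API.
final class MetadataLookupSketch {

    /** Simplified stand-in for StreamsMetadata: a host name plus its role. */
    static final class HostInfo {
        final String host;
        final boolean active;

        HostInfo(String host, boolean active) {
            this.host = host;
            this.active = active;
        }
    }

    /** Option A: the active host first, followed by standbys in their given order. */
    static List<HostInfo> allMetadataForKey(List<HostInfo> replicas) {
        List<HostInfo> ordered = new ArrayList<>();
        for (HostInfo r : replicas) {
            if (r.active) {
                ordered.add(r);
            }
        }
        for (HostInfo r : replicas) {
            if (!r.active) {
                ordered.add(r);
            }
        }
        return ordered;
    }

    /** Existing behaviour: only the active host, or null if there is none. */
    static HostInfo metadataForKey(List<HostInfo> replicas) {
        List<HostInfo> all = allMetadataForKey(replicas);
        return (!all.isEmpty() && all.get(0).active) ? all.get(0) : null;
    }
}
```

Once lag information is available, the second loop could be replaced by a sort
on reported lag without changing the method's contract.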


On Tue, Oct 15, 2019 at 12:21 PM Vinoth Chandar <vchan...@confluent.io>
wrote:

> >>I get your point. But suppose there is a replica which has just become
> active, so in that case replica will still be building itself from scratch
> and this active will go to restoring state till it catches up with previous
> active, wouldn't serving from a restoring active make more sense than a
> replica in such case.
>
> KIP-441 will change this behavior such that promotion to active happens
> based on how caught up a replica is. So, once we have that (work underway
> already for 2.5 IIUC) and user sets num.standby.replicas > 0, then the
> staleness window should not be as long as you describe. IMO if a user wants
> availability for state, they should configure num.standby.replicas > 0. If
> not, then on a node loss, a few partitions would be unavailable for a while
> (there are other ways to bring this window down, which I won't bring in
> here). We could argue for querying a restoring active (say a new node added
> to replace a faulty old node) based on AP vs CP principles. But not sure
> reading really really old values for the sake of availability is useful. No
> AP data system would be inconsistent for such a long time in practice.
>
> So, I still feel just limiting this to standby reads provides best
> semantics.
>
> Just my 2c. Would love to see what others think as well.
>
> On Tue, Oct 15, 2019 at 5:34 AM Navinder Brar
> <navinder_b...@yahoo.com.invalid> wrote:
>
>> Hi Vinoth,
>> Thanks for the feedback.
>>
>> > Can we link the JIRA, discussion thread also to the KIP.
>>
>> Added.
>>
>> > Based on the discussion on KAFKA-6144, I was under the impression that
>> > this KIP is also going to cover exposing of the standby information in
>> > StreamsMetadata and thus subsume KAFKA-8994 . That would require a public
>> > API change?
>>
>> Sure, I can add changes for 8994 in this KIP and link KAFKA-6144 to
>> KAFKA-8994 as well.
>>
>> > KIP seems to be focussing on restoration when a new node is added.
>> > KIP-441 is underway and has some major changes proposed for this. It would
>> > be good to clarify dependencies if any. Without KIP-441, I am not very sure
>> > if we should allow reads from nodes in RESTORING state, which could amount
>> > to many minutes/few hours of stale reads?  This is different from allowing
>> > querying standby replicas, which could be mostly caught up and the
>> > staleness window could be much smaller/tolerable. (once again the focus on
>> > KAFKA-8994).
>>
>> I get your point. But suppose there is a replica which has just become
>> active, so in that case replica will still be building itself from scratch
>> and this active will go to restoring state till it catches up with previous
>> active, wouldn't serving from a restoring active make more sense than a
>> replica in such case.
>>
>> > Finally, we may need to introduce a configuration to control this. Some
>> > users may prefer errors to stale data. Can we also add it to the KIP?
>>
>> Will add this.
>>
>> Regards,
>> Navinder
>>
>>
>> On 2019/10/14 16:56:49, Vinoth Chandar <v...@confluent.io> wrote:
>>
>> > Hi Navinder,
>> >
>> > Thanks for sharing the KIP! Few thoughts
>> >
>> > - Can we link the JIRA, discussion thread also to the KIP
>> > - Based on the discussion on KAFKA-6144, I was under the impression that
>> > this KIP is also going to cover exposing of the standby information in
>> > StreamsMetadata and thus subsume KAFKA-8994 . That would require a public
>> > API change?
>> > - KIP seems to be focussing on restoration when a new node is added.
>> > KIP-441 is underway and has some major changes proposed for this. It would
>> > be good to clarify dependencies if any. Without KIP-441, I am not very sure
>> > if we should allow reads from nodes in RESTORING state, which could amount
>> > to many minutes/few hours of stale reads?  This is different from allowing
>> > querying standby replicas, which could be mostly caught up and the
>> > staleness window could be much smaller/tolerable. (once again the focus on
>> > KAFKA-8994)
>> > - Finally, we may need to introduce a configuration to control this. Some
>> > users may prefer errors to stale data. Can we also add it to the KIP?
>> >
>> > Thanks
>> > Vinoth
>> >
>> > On Sun, Oct 13, 2019 at 3:31 PM Navinder Brar
>> > <na...@yahoo.com.invalid> wrote:
>> >
>> >> Hi,
>> >> Starting a discussion on the KIP to Allow state stores to serve stale
>> >> reads during rebalance (
>> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance
>> >> ).
>> >> Thanks & Regards, Navinder
>> >> LinkedIn
>> >
>
>
  
