Jay, thank you for the excellent summary. Regarding your point 6, I would just like to mention that with a partial-function-style interface facet for event listener registrations on top of FarRefs and RemotePromiseRefs, in the style of E-rights distributed promises, it becomes easy to reason about multiple registrations for different future scenarios, whether scheduled retries or failures. Even if an undetected partition occurs, replays can be revoked after a previous attempt succeeds, or handled by other client-defined strategies. I do not know whether this is being considered for 0.9.
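Roughly what I have in mind, as a small Scala sketch (the ref and outcome types below are made-up names for illustration, not an existing E or Kafka API):

    import scala.concurrent.{Future, Promise}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.util.{Failure, Success}

    // Possible outcomes of one produce attempt (illustrative only).
    sealed trait SendOutcome
    case class Acked(offset: Long)                 extends SendOutcome
    case class NotEnoughReplicas(isrSize: Int)     extends SendOutcome
    case class PossiblePartition(cause: Throwable) extends SendOutcome

    // Stand-in for a RemotePromiseRef: a far reference to the eventual result
    // of a produce request, exposing a partial-function facet for listeners.
    final class RemoteSendRef(attempt: () => Future[SendOutcome]) {
      private val result = Promise[SendOutcome]()

      // Register a listener only for the scenarios the caller cares about;
      // outcomes the partial function does not cover are simply ignored.
      def onOutcome(listener: PartialFunction[SendOutcome, Unit]): this.type = {
        result.future.foreach(o => listener.lift(o))
        this
      }

      // Fire the initial send or a replay. The promise settles exactly once,
      // so after a previous attempt succeeds, later replays are effectively
      // revoked (their results are dropped), even when an undetected
      // partition caused the client to retry.
      def fire(): Unit = attempt().onComplete {
        case Success(outcome) => result.trySuccess(outcome)
        case Failure(error)   => result.trySuccess(PossiblePartition(error))
      }
    }

Usage would be separate registrations for the durable-write path and the failure path, e.g. ref.onOutcome { case Acked(o) => ... } and ref.onOutcome { case NotEnoughReplicas(n) => ... }, with a client-defined replay policy deciding when to call fire() again.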
Thank you,
- Rob

> On Sep 25, 2014, at 3:53 PM, "Jay Kreps (JIRA)" <j...@apache.org> wrote:
>
> [ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14148371#comment-14148371 ]
>
> Jay Kreps commented on KAFKA-1555:
> ----------------------------------
>
> This is a good discussion, I am glad we are taking the time to think this through carefully. Let's aim for the right end state rather than optimizing for what is easiest to implement now (since these features never get removed and we end up spending a lot of time explaining them).
>
> [~sriramsub] it sounds like you are not totally sold on min.isr. Let me try to summarize a few things people have said that I think are true and see if people can agree on them:
>
> 1. Unclean leader election is an orthogonal issue. Regardless of settings, choosing a leader that is not caught up means losing data. This option covers catastrophic recovery (i.e. no server with complete data exists). We can give finer control over whether unclean election is manual or automatic, but I think you need to have this escape hatch for the case where the authoritative copy of the data is destroyed.
>
> 2. Specifying a min.isr does actually make sense. I think people have one of two cases in mind. In one case non-availability means data loss. This is likely the most common case. In this case, even if you are down to your last replica you still want to perform the write, because there is still some hope the data will not be lost, and if you refuse the write the chance of loss is 100%. In another case non-availability can be tolerated because something upstream (perhaps the client or another system) can hold onto the data and retry later. In this case you want to be sure that when you accept a write it is safe: refusing a write is okay, but accepting a write and then losing it is much worse. It's true that it is very hard to reason about the right min.isr, as that depends on the probability of failure over time. But this criticism is also true of replication factor (e.g. to know an appropriate replication factor to yield a particular probability of data loss you need to know the joint probability distribution over machine failures).
>
> 3. With regard to min.isr there are three issues: (1) what are the settings that actually make sense, (2) what is the best way to express these in the protocol, and (3) what is the best way to represent this in the client configuration. I think we need to start by agreeing on (1).
>
> 4. I believe people are actually in agreement that the following settings make sense:
> a. acks = 0, min.isr = 0
> b. acks = 1, min.isr = 1
> c. acks = -1, min.isr in {1, ..., N}
> Conversely, no other settings make sense. Does everyone agree on this? If so, the question is really how to expose this to the user.
>
> 4. There were several proposals for how to express these options.
> a. The current proposal is to have acks remain in the protocol with its original meaning and values but add a topic configuration controlling min.isr. I personally think this is a bit weird, since both are about the definition of success for the request, so it makes sense to send them with the request.
> b. Alternately we could add a new field in the produce request specifying the min.isr.
> c. Alternately we could add a new field in the response returning the actual isr size. An advantage of this is that it allows the client to distinguish between "write failed" and "write succeeded but not with enough replicas".
> d. Finally we could overload acks = -2, -3, etc. to mean acks = -1 and min.isr = 2, 3, etc. This sounds insane, but maybe it actually isn't. Since not all combinations of acks and min.isr make sense, this does actually enumerate the sensible cases.
>
> 5. Regardless of what we do in (4), the configuration can be somewhat simpler, probably just by having the user specify min.isr and erroring out if they combine min.isr and acks in a nonsensical way.
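To make the sensible combinations in (4) and the erroring-out in (5) concrete, a minimal Scala sketch of what a client-side check could look like (the names are illustrative, not a proposal for actual config keys):

    object AckSettings {
      // Accept only the combinations listed in (4); reject everything else,
      // as suggested in (5).
      def validate(acks: Int, minIsr: Int, replicationFactor: Int): Unit =
        (acks, minIsr) match {
          case (0, 0) => ()  // fire and forget, no durability expectation
          case (1, 1) => ()  // leader acknowledgement only
          case (-1, m) if m >= 1 && m <= replicationFactor =>
            ()               // wait until at least m in-sync replicas have the write
          case _ =>
            throw new IllegalArgumentException(
              s"acks=$acks with min.isr=$minIsr is not a sensible combination; " +
              s"use acks=0/min.isr=0, acks=1/min.isr=1, or acks=-1 with min.isr in 1..$replicationFactor")
        }
    }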
> 6. It isn't clear to me whether the right behavior is to fail fast when the min.isr likely won't be met (because the current isr < min) and not attempt the write at all, or else to always do the write and then return an error if the min.isr isn't met. The latter case means that retries, which are the sane thing to do when you get the error, will lead to potentially many duplicates. In fact, in the common case where you want to more or less block and keep retrying the write until durability can be guaranteed, even if that takes a few minutes, this might mean thousands of duplicates.
>
>
>> provide strong consistency with reasonable availability
>> --------------------------------------------------------
>>
>> Key: KAFKA-1555
>> URL: https://issues.apache.org/jira/browse/KAFKA-1555
>> Project: Kafka
>> Issue Type: Improvement
>> Components: controller
>> Affects Versions: 0.8.1.1
>> Reporter: Jiang Wu
>> Assignee: Gwen Shapira
>> Fix For: 0.8.2
>>
>> Attachments: KAFKA-1555.0.patch, KAFKA-1555.1.patch, KAFKA-1555.2.patch, KAFKA-1555.3.patch, KAFKA-1555.4.patch, KAFKA-1555.5.patch
>>
>>
>> In a mission-critical application, we expect that a Kafka cluster with 3 brokers can satisfy two requirements:
>> 1. When 1 broker is down, no message loss or service blocking happens.
>> 2. In worse cases, such as when two brokers are down, service can be blocked, but no message loss happens.
>> We found that the current Kafka version (0.8.1.1) cannot achieve these requirements due to three behaviors:
>> 1. When choosing a new leader from 2 followers in ISR, the one with fewer messages may be chosen as the leader.
>> 2. Even when replica.lag.max.messages=0, a follower can stay in ISR when it has fewer messages than the leader.
>> 3. ISR can contain only 1 broker, therefore acknowledged messages may be stored in only 1 broker.
>> The following is an analytical proof.
>> We consider a cluster with 3 brokers and a topic with 3 replicas, and assume that at the beginning all 3 replicas, leader A and followers B and C, are in sync, i.e., they have the same messages and are all in ISR.
>> According to the value of request.required.acks (acks for short), there are the following cases:
>> 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirements.
>> 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this time, although C hasn't received m, C is still in ISR. If A is killed, C can be elected as the new leader, and consumers will miss m.
>> 3. acks=-1. B and C restart and are removed from ISR. Producer sends a message m to A, and receives an acknowledgement. Disk failure happens in A before B and C replicate m. Message m is lost.
>> In summary, no existing configuration can satisfy the requirements.
>
>
>
> --
> This message was sent by Atlassian JIRA
> (v6.3.4#6332)
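For what it's worth, assuming the min.isr proposal lands roughly as discussed above (say, as a topic-level setting such as min.insync.replicas, with unclean leader election disabled for the topic; those names are assumptions on my part, not released configuration), the three-broker scenario in the issue description could be exercised from the new producer client along these lines (Scala, using the Java client API):

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

    object StrongConsistencyExample {
      def main(args: Array[String]): Unit = {
        // Assumed topic-side settings (per the discussion above, not yet released):
        //   replication factor 3, a minimum ISR size of 2, unclean leader election disabled.
        val props = new Properties()
        props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092")
        props.put("acks", "-1") // wait for the in-sync replicas, as in case (4c)
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

        val producer = new KafkaProducer[String, String](props)
        // With one broker down the ISR can still hold 2 replicas, so the write
        // proceeds (requirement 1); with two brokers down the ISR falls below
        // the minimum, so the broker should reject the write rather than
        // acknowledge a message held by a single replica (requirement 2).
        producer.send(new ProducerRecord("critical-topic", "key", "value")).get()
        producer.close()
      }
    }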