The key point is that we have to keep all replicas consistent with each other, so that no matter which replica a consumer reads from, it always reads the same data at a given offset. The following is an example.
Suppose we have 3 brokers A, B and C. Let's assume A is the leader and at some point, we have the following offsets and messages in each replica.

offset   A    B    C
  1      m1   m1   m1
  2      m2

Let's assume that message m1 is committed and message m2 is not. At exactly this moment, replica A dies. After a new leader is elected, say B, new messages can be committed with just replicas B and C. Some time later, if we commit two more messages m3 and m4, we will have the following.

offset   A    B    C
  1      m1   m1   m1
  2      m2   m3   m3
  3           m4   m4

Now A comes back. For consistency, it's important for A's log to be identical to B's and C's. So, we have to remove m2 from A's log and add m3 and m4. As you can see, whether you want to republish m2 or not, m2 cannot stay at its current offset, since in the other replicas that offset is already taken by other messages. Therefore, a truncation of replica A's log is needed to keep the replicas consistent. Currently, we don't republish messages like m2 since (1) it's not necessary, as m2 was never considered committed; and (2) it would make our protocol more complicated.
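To make the recovery step concrete, here is a minimal toy sketch in Java of the truncate-then-catch-up logic described above. This is illustrative only -- it is not Kafka's actual implementation, and the class and method names are made up for the example.

import java.util.ArrayList;
import java.util.List;

public class TruncateExample {

    // Rebuild a restarted replica's log: truncate to its last known
    // committed offset (the high watermark), then copy the rest from
    // the current leader. Hypothetical helper, not a Kafka API.
    static List<String> recover(List<String> local, List<String> leader,
                                int highWatermark) {
        // 1. Truncate to the safe point: everything past the high
        //    watermark may be uncommitted and is discarded (m2 above).
        List<String> recovered = new ArrayList<>(local.subList(0, highWatermark));
        // 2. Catch up by copying the committed suffix from the leader.
        recovered.addAll(leader.subList(highWatermark, leader.size()));
        return recovered;
    }

    public static void main(String[] args) {
        List<String> a = List.of("m1", "m2");       // old leader; m2 uncommitted
        List<String> b = List.of("m1", "m3", "m4"); // new leader after A died
        // A knows only m1 was committed, so its high watermark is 1.
        System.out.println(recover(a, b, 1));       // prints [m1, m3, m4]
    }
}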
Thanks,

Jun

On Tue, Jul 22, 2014 at 3:40 PM, scott@heroku <sc...@heroku.com> wrote:

> Thanks Jun
>
> Can you explain a little more about what an uncommitted message means?
> The messages are in the log, so presumably they have been acked at least
> by the local broker.
>
> I guess I am hoping for some intuition around why 'replaying' the
> messages in question would cause bad things.
>
> Thanks!
>
> Sent from my iPhone
>
> On Jul 22, 2014, at 3:06 PM, Jun Rao <jun...@gmail.com> wrote:
> >
> > Scott,
> >
> > The reason for truncation is that the broker that comes back may have
> > some un-committed messages. Those messages shouldn't be exposed to the
> > consumer and therefore need to be removed from the log. So, on broker
> > startup, we first truncate the log to a safe point before which we know
> > all messages are committed. This broker will then sync up with the
> > current leader to get the remaining messages.
> >
> > Thanks,
> >
> > Jun
> >
> >> On Tue, Jul 22, 2014 at 9:42 AM, Scott Clasen <sc...@heroku.com> wrote:
> >>
> >> Ahh, yes, that message loss case. I've wondered about that myself.
> >>
> >> I guess I don't really understand why truncating messages is ever the
> >> right thing to do. As Kafka is an 'at least once' system (send a
> >> message, get no ack, it still might be on the topic), consumers that
> >> care will have to de-dupe anyhow.
> >>
> >> To the Kafka designers: is there anything preventing implementation of
> >> alternatives to truncation? When a broker comes back online and needs
> >> to truncate, can't it fire up a producer and take the extra messages
> >> and send them back to the original topic, or alternatively to an error
> >> topic?
> >>
> >> Would love to understand the rationale for the current design, as my
> >> perspective is doubtfully as clear as the designers'.
> >>
> >> On Tue, Jul 22, 2014 at 6:21 AM, Jiang Wu (Pricehistory) (BLOOMBERG/
> >> 731 LEX -) <jwu...@bloomberg.net> wrote:
> >>
> >>> kafka-1028 addressed another unclean leader election problem. It
> >>> prevents a broker not in ISR from becoming a leader. The problem we
> >>> are facing is that a broker in ISR but without complete messages may
> >>> become a leader. It's also a kind of unclean leader election, but not
> >>> the one that kafka-1028 addressed.
> >>>
> >>> Here I'm trying to give a proof that current Kafka doesn't achieve the
> >>> requirement (no message loss, no blocking when 1 broker is down) due
> >>> to two of its behaviors:
> >>> 1. when choosing a new leader from 2 followers in ISR, the one with
> >>> fewer messages may be chosen as the leader;
> >>> 2. even when replica.lag.max.messages=0, a follower can stay in ISR
> >>> when it has fewer messages than the leader.
> >>>
> >>> We consider a cluster with 3 brokers and a topic with 3 replicas. We
> >>> analyze different cases according to the value of
> >>> request.required.acks (acks for short). For each case and its
> >>> subcases, we find situations where either message loss or service
> >>> blocking happens. We assume that at the beginning, all 3 replicas,
> >>> leader A and followers B and C, are in sync, i.e., they have the same
> >>> messages and are all in ISR.
> >>>
> >>> 1. acks=0, 1, 3. Obviously these settings do not satisfy the
> >>> requirement.
> >>> 2. acks=2. The producer sends a message m. It's acknowledged by A and
> >>> B. At this time, although C hasn't received m, it's still in ISR. If
> >>> A is killed, C can be elected as the new leader, and consumers will
> >>> miss m.
> >>> 3. acks=-1. Suppose replica.lag.max.messages=M. There are two
> >>> sub-cases:
> >>> 3.1 M>0. Suppose C is killed. C will be out of ISR after
> >>> replica.lag.time.max.ms. Then the producer publishes M messages to A
> >>> and B. C restarts. C will rejoin the ISR since it is only M messages
> >>> behind A and B. Before C replicates all messages, A is killed, and C
> >>> becomes leader; message loss happens.
> >>> 3.2 M=0. In this case, when the producer publishes at a high speed,
> >>> B and C will fall out of ISR, and only A keeps receiving messages.
> >>> Then A is killed. Either message loss or service blocking will
> >>> happen, depending on whether unclean leader election is enabled.
> >>>
> >>> From: users@kafka.apache.org At: Jul 21 2014 22:28:18
> >>> To: JIANG WU (PRICEHISTORY) (BLOOMBERG/ 731 LEX -),
> >>> users@kafka.apache.org
> >>> Subject: Re: how to ensure strong consistency with reasonable
> >>> availability
> >>>
> >>> You will probably need 0.8.2, which gives
> >>> https://issues.apache.org/jira/browse/KAFKA-1028
> >>>
> >>> On Mon, Jul 21, 2014 at 6:37 PM, Jiang Wu (Pricehistory) (BLOOMBERG/
> >>> 731 LEX -) <jwu...@bloomberg.net> wrote:
> >>>
> >>>> Hi everyone,
> >>>>
> >>>> With a cluster of 3 brokers and a topic of 3 replicas, we want to
> >>>> achieve the following two properties:
> >>>> 1. when only one broker is down, there's no message loss, and
> >>>> producers/consumers are not blocked;
> >>>> 2. in other, more serious situations, for example, one broker being
> >>>> restarted twice in a short period or two brokers being down at the
> >>>> same time, producers/consumers can be blocked, but no message loss
> >>>> is allowed.
> >>>>
> >>>> We haven't found any producer/broker parameter combinations that
> >>>> achieve this. If you know or think some configurations will work,
> >>>> please post details. We have a test bed to verify any given
> >>>> configurations.
> >>>>
> >>>> In addition, I'm wondering if it's necessary to open a JIRA to
> >>>> request the above feature?
> >>>>
> >>>> Thanks,
> >>>> Jiang
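For readers asking the same question as the original post: one commonly suggested direction is acks=-1 (spelled "all" in the new Java producer) combined with unclean leader election disabled and a minimum in-sync-replica count on the broker. The sketch below is not from this thread; the min.insync.replicas setting only exists in later releases (added under KAFKA-1555), so treat the exact property names and values as assumptions to verify against your Kafka version.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NoLossProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092");
        // Require acknowledgement from all in-sync replicas before a send
        // succeeds (the new producer's spelling of request.required.acks=-1).
        props.put("acks", "all");
        props.put("key.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");

        // Broker/topic side (server.properties / topic config), shown only
        // as comments here:
        //   unclean.leader.election.enable=false  -- never elect an
        //       out-of-sync replica as leader (see KAFKA-1028);
        //   min.insync.replicas=2                 -- with acks=all, a write
        //       fails rather than succeeding on a lone leader (later
        //       releases only; see KAFKA-1555);
        //   topic created with replication factor 3.

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("my-topic", "key", "value"));
        }
    }
}

With replication factor 3 and min.insync.replicas=2, writes keep succeeding when a single broker is down (property 1), and when two replicas are unavailable producers see errors instead of silently losing data (property 2).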