Hey Colin,
On Fri, Jan 26, 2018 at 10:16 AM, Colin McCabe <cmcc...@apache.org> wrote:

> On Thu, Jan 25, 2018, at 16:47, Dong Lin wrote:
> > Hey Colin,
> >
> > Thanks for the comment.
> >
> > On Thu, Jan 25, 2018 at 4:15 PM, Colin McCabe <cmcc...@apache.org> wrote:
> >
> > > On Wed, Jan 24, 2018, at 21:07, Dong Lin wrote:
> > > > Hey Colin,
> > > >
> > > > Thanks for reviewing the KIP.
> > > >
> > > > If I understand you right, you may be suggesting that we can use a
> > > > global metadataEpoch that is incremented every time the controller
> > > > updates metadata. The problem with this solution is that, if a
> > > > topic is deleted and created again, the user will not know whether
> > > > the offset stored before the topic deletion is still valid. This
> > > > motivates the idea of including a per-partition partitionEpoch.
> > > > Does this sound reasonable?
> > >
> > > Hi Dong,
> > >
> > > Perhaps we can store the last valid offset of each deleted topic in
> > > ZooKeeper. Then, when a topic with one of those names gets
> > > re-created, we can start the topic at the previous end offset rather
> > > than at 0. This preserves immutability. It is no more burdensome
> > > than having to preserve a "last epoch" for the deleted partition
> > > somewhere, right?
> >
> > My concern with this solution is that the number of zookeeper nodes
> > keeps growing over time if some users keep deleting and creating
> > topics. Do you think this can be a problem?

> Hi Dong,
>
> We could expire the "partition tombstones" after an hour or so. In
> practice this would solve the issue for clients that like to destroy
> and re-create topics all the time. In any case, doesn't the current
> proposal add per-partition znodes as well that we have to track even
> after the partition is deleted? Or did I misunderstand that?

Actually the current KIP does not add per-partition znodes. Could you
double check? I can fix the KIP wiki if there is anything misleading.

If we expire the "partition tombstones" after an hour, and the topic is
re-created more than an hour after the topic deletion, then we are back
to the situation where the user cannot tell whether the topic has been
re-created or not, right?

> It's not really clear to me what should happen when a topic is
> destroyed and re-created with new data. Should consumers continue to
> be able to consume? We don't know where they stopped consuming from
> the previous incarnation of the topic, so messages may have been lost.
> Certainly consuming data from offset X of the new incarnation of the
> topic may give something totally different from what you would have
> gotten from offset X of the previous incarnation of the topic.

With the current KIP, if a consumer consumes a topic based on the last
remembered (offset, partitionEpoch, leaderEpoch), and the topic is
re-created, the consumer will throw InvalidPartitionEpochException
because the previous partitionEpoch will differ from the current
partitionEpoch. This is described in Proposed Changes -> Consumption
after topic deletion in the KIP. I can improve the KIP if anything is
unclear.
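To make that check concrete, here is a rough sketch in Java of the
client-side validation. The names are illustrative, not the actual
classes in the KIP patch:

    // Rough sketch only -- illustrative names, not the KIP's actual classes.
    public class PartitionEpochCheck {

        static class PartitionState {
            final int partitionEpoch; // assigned when the partition is created
            final int leaderEpoch;    // bumped on every leader change

            PartitionState(int partitionEpoch, int leaderEpoch) {
                this.partitionEpoch = partitionEpoch;
                this.leaderEpoch = leaderEpoch;
            }
        }

        static class InvalidPartitionEpochException extends RuntimeException {
            InvalidPartitionEpochException(String message) {
                super(message);
            }
        }

        // Validate a committed (offset, partitionEpoch) against freshly
        // fetched metadata before resuming consumption.
        static void validateCommittedEpoch(PartitionState current,
                                           int committedPartitionEpoch) {
            if (current.partitionEpoch != committedPartitionEpoch) {
                // The topic was deleted and re-created since the offset was
                // committed, so the offset refers to the old incarnation.
                throw new InvalidPartitionEpochException(
                    "committed partitionEpoch=" + committedPartitionEpoch
                        + " != current partitionEpoch=" + current.partitionEpoch);
            }
        }
    }

The point is that a committed offset is only honored together with the
partitionEpoch it was committed under, so an offset from a previous
incarnation of the topic is never silently reused.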
> By choosing to reuse the same (topic, partition, offset) 3-tuple, we
> have chosen to give up immutability. That was a really bad decision.
> And now we have to worry about time dependencies, stale cached data,
> and all the rest. We can't completely fix this inside Kafka no matter
> what we do, because not all that cached data is inside Kafka itself.
> Some of it may be in systems that Kafka has sent data to, such as
> other daemons, SQL databases, streams, and so forth.

The current KIP will uniquely identify a message using the (topic,
partition, offset, partitionEpoch) 4-tuple. This addresses the message
immutability issue that you mentioned. Is there any corner case where
message immutability is still not preserved with the current KIP?

> I guess the idea here is that mirror maker should work as expected
> when users destroy a topic and re-create it with the same name.
> That's kind of tough, though, since in that scenario, mirror maker
> probably should destroy and re-create the topic on the other end, too,
> right? Otherwise, what you end up with on the other end could be half
> of one incarnation of the topic, and half of another.
>
> What mirror maker really needs is to be able to follow a stream of
> events about the kafka cluster itself. We could have some master
> topic which is always present and which contains data about all topic
> deletions, creations, etc. Then MM can simply follow this topic and
> do what is needed.
>
> > > > Then the next question may be: should we use a global
> > > > metadataEpoch plus per-partition partitionEpoch, instead of
> > > > per-partition leaderEpoch plus per-partition partitionEpoch?
> > > > The former solution using metadataEpoch would not work due to
> > > > the following scenario (provided by Jun):
> > > >
> > > > "Consider the following scenario. In metadata v1, the leader
> > > > for a partition is at broker 1. In metadata v2, leader is at
> > > > broker 2. In metadata v3, leader is at broker 1 again. The
> > > > last committed offsets in v1, v2 and v3 are 10, 20 and 30,
> > > > respectively. A consumer is started and reads metadata v1 and
> > > > reads messages from offset 0 to 25 from broker 1. My
> > > > understanding is that in the current proposal, the metadata
> > > > version associated with offset 25 is v1. The consumer is then
> > > > restarted and fetches metadata v2. The consumer tries to read
> > > > from broker 2, which is the old leader with the last offset at
> > > > 20. In this case, the consumer will still get
> > > > OffsetOutOfRangeException incorrectly."
> > > >
> > > > Regarding your comment "For the second purpose, this is "soft
> > > > state" anyway. If the client thinks X is the leader but Y is
> > > > really the leader, the client will talk to X, and X will point
> > > > out its mistake by sending back a NOT_LEADER_FOR_PARTITION.",
> > > > it is probably not true. The problem here is that the old
> > > > leader X may still think it is the leader of the partition and
> > > > thus it will not send back NOT_LEADER_FOR_PARTITION. The reason
> > > > is provided in KAFKA-6262. Can you check if that makes sense?
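To make the mechanism concrete, the per-partition leaderEpoch fix for
Jun's scenario amounts to roughly the following client-side gate. This
is an illustrative sketch, not the actual patch. The consumer has
already consumed messages written under leaderEpoch 3, so metadata v2,
which carries leaderEpoch 2 and points at the stale leader broker 2, is
rejected instead of producing a spurious OffsetOutOfRangeException:

    // Illustrative sketch, not the actual patch: the client remembers the
    // largest leaderEpoch it has observed for each partition (from fetch
    // responses and metadata) and refuses to apply metadata carrying a
    // smaller one.
    import java.util.HashMap;
    import java.util.Map;

    public class LeaderEpochGate {
        private final Map<String, Integer> maxSeenLeaderEpoch = new HashMap<>();

        // Returns true if metadata for this partition is new enough to apply.
        synchronized boolean maybeApply(String topicPartition,
                                        int metadataLeaderEpoch) {
            int cached = maxSeenLeaderEpoch.getOrDefault(topicPartition, -1);
            if (metadataLeaderEpoch < cached) {
                return false; // stale metadata; keep the cached entry and refresh again
            }
            maxSeenLeaderEpoch.put(topicPartition, metadataLeaderEpoch);
            return true;
        }
    }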
> > > This is solvable with a timeout, right? If the leader can't
> > > communicate with the controller for a certain period of time, it
> > > should stop acting as the leader. We have to solve this problem,
> > > anyway, in order to fix all the corner cases.
> >
> > Not sure if I fully understand your proposal. The proposal seems to
> > require non-trivial changes to our existing leadership election
> > mechanism. Could you provide more detail regarding how it works?
> > For example, how should the user choose this timeout, how does the
> > leader determine whether it can still communicate with the
> > controller, and how does this trigger the controller to elect a new
> > leader?

> Before I come up with any proposal, let me make sure I understand the
> problem correctly. My big question was, what prevents split-brain
> here?
>
> Let's say I have a partition which is on nodes A, B, and C, with
> min-ISR 2. The controller is D. At some point, there is a network
> partition between A and B and the rest of the cluster. The controller
> re-assigns the partition to nodes C, D, and E. But A and B keep
> chugging away, even though they can no longer communicate with the
> controller.
>
> At some point, a client with stale metadata writes to the partition.
> It still thinks the partition is on nodes A, B, and C, so that's where
> it sends the data. It's unable to talk to C, but A and B reply back
> that all is well.
>
> Is this not a case where we could lose data due to split brain? Or is
> there a mechanism for preventing this that I missed? If it is, it
> seems like a pretty serious failure case that we should be handling
> with our metadata rework. And I think epoch numbers and timeouts
> might be part of the solution.

Right, split brain can happen if RF=4 and minIsr=2. However, I am not
sure it is a serious issue which we need to address today. It can be
prevented by configuring the Kafka topic so that minIsr > RF/2.
Actually, if the user sets minIsr=2, is there any reason to set RF=4
instead of RF=3?

Introducing a timeout into the leader election mechanism is
non-trivial. I think we probably want to do that only if there is a
good use-case that can not otherwise be addressed with the current
mechanism.

> best,
> Colin
>
> > > best,
> > > Colin
> > >
> > > > Regards,
> > > > Dong
> > > >
> > > > On Wed, Jan 24, 2018 at 10:39 AM, Colin McCabe
> > > > <cmcc...@apache.org> wrote:
> > > >
> > > > > Hi Dong,
> > > > >
> > > > > Thanks for proposing this KIP. I think a metadata epoch is a
> > > > > really good idea.
> > > > >
> > > > > I read through the DISCUSS thread, but I still don't have a
> > > > > clear picture of why the proposal uses a metadata epoch per
> > > > > partition rather than a global metadata epoch. A metadata
> > > > > epoch per partition is kind of unpleasant -- it's at least 4
> > > > > extra bytes per partition that we have to send over the wire
> > > > > in every full metadata request, which could become extra
> > > > > kilobytes on the wire when the number of partitions becomes
> > > > > large. Plus, we have to update all the auxiliary classes to
> > > > > include an epoch.
> > > > >
> > > > > We need to have a global metadata epoch anyway to handle
> > > > > partition addition and deletion. For example, if I give you
> > > > > MetadataResponse{part1, epoch 1, part2, epoch 1} and {part1,
> > > > > epoch 1}, which MetadataResponse is newer? You have no way of
> > > > > knowing. It could be that part2 has just been created, and
> > > > > the response with 2 partitions is newer. Or it could be that
> > > > > part2 has just been deleted, and therefore the response with 1
> > > > > partition is newer. You must have a global epoch to
> > > > > disambiguate these two cases.
> > > > >
> > > > > Previously, I worked on the Ceph distributed filesystem. Ceph
> > > > > had the concept of a map of the whole cluster, maintained by a
> > > > > few servers doing paxos. This map was versioned by a single
> > > > > 64-bit epoch number which increased on every change. It was
> > > > > propagated to clients through gossip. I wonder if something
> > > > > similar could work here?
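To restate the disambiguation example above in code: with a single
cluster-wide epoch bumped on every change, any two MetadataResponses
can be ordered even when their partition sets differ. This is an
illustrative sketch, not the actual protocol classes:

    // Sketch of the global-epoch idea: one cluster-wide epoch, bumped on
    // every metadata change, totally orders metadata snapshots.
    import java.util.Set;

    public class GlobalEpochExample {
        static class MetadataSnapshot {
            final long globalEpoch;       // bumped on every cluster change
            final Set<String> partitions; // partitions present in this snapshot

            MetadataSnapshot(long globalEpoch, Set<String> partitions) {
                this.globalEpoch = globalEpoch;
                this.partitions = partitions;
            }
        }

        // Pick the newer snapshot. Per-partition epochs alone cannot decide
        // whether {part1, part2} or {part1} is newer, but a global epoch can.
        static MetadataSnapshot newer(MetadataSnapshot a, MetadataSnapshot b) {
            return a.globalEpoch >= b.globalEpoch ? a : b;
        }

        public static void main(String[] args) {
            MetadataSnapshot twoParts = new MetadataSnapshot(5, Set.of("part1", "part2"));
            MetadataSnapshot onePart = new MetadataSnapshot(6, Set.of("part1"));
            // Epoch 6 > 5, so part2 must have been deleted rather than created.
            System.out.println(newer(twoParts, onePart).partitions); // [part1]
        }
    }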
> > > > > It seems like the Kafka MetadataResponse serves two somewhat
> > > > > unrelated purposes. Firstly, it lets clients know what
> > > > > partitions exist in the system and where they live. Secondly,
> > > > > it lets clients know which nodes within the partition are
> > > > > in-sync (in the ISR) and which node is the leader.
> > > > >
> > > > > The first purpose is what you really need a metadata epoch
> > > > > for, I think. You want to know whether a partition exists or
> > > > > not, or you want to know which nodes you should talk to in
> > > > > order to write to a given partition. A single metadata epoch
> > > > > for the whole response should be adequate here. We should not
> > > > > change the partition assignment without going through
> > > > > zookeeper (or a similar system), and this inherently
> > > > > serializes updates into a numbered stream. Brokers should
> > > > > also stop responding to requests when they are unable to
> > > > > contact ZK for a certain time period. This prevents the case
> > > > > where a given partition has been moved off some set of nodes,
> > > > > but a client still ends up talking to those nodes and writing
> > > > > data there.
> > > > >
> > > > > For the second purpose, this is "soft state" anyway. If the
> > > > > client thinks X is the leader but Y is really the leader, the
> > > > > client will talk to X, and X will point out its mistake by
> > > > > sending back a NOT_LEADER_FOR_PARTITION. Then the client can
> > > > > update its metadata again and find the new leader, if there is
> > > > > one. There is no need for an epoch to handle this.
> > > > > Similarly, I can't think of a reason why changing the in-sync
> > > > > replica set needs to bump the epoch.
> > > > >
> > > > > best,
> > > > > Colin
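For reference, the "soft state" recovery path described above is
roughly the loop below, with illustrative stand-in types rather than
the real client internals. As noted earlier in this thread, KAFKA-6262
is exactly the case this loop cannot detect: a deposed leader that
still believes it is the leader never returns NOT_LEADER_FOR_PARTITION.

    // Illustrative stand-in types, not the real client internals.
    import java.util.function.Supplier;

    public class SoftStateRetry {
        enum Error { NONE, NOT_LEADER_FOR_PARTITION }

        static class FetchResult {
            final Error error;
            FetchResult(Error error) { this.error = error; }
        }

        interface Broker {
            FetchResult fetch(String topic, int partition, long offset);
        }

        // Fetch from the cached leader; on NOT_LEADER_FOR_PARTITION,
        // refresh metadata and retry against the newly discovered leader.
        static FetchResult fetchWithRetry(Supplier<Broker> cachedLeader,
                                          Runnable refreshMetadata,
                                          String topic, int partition, long offset) {
            while (true) {
                FetchResult result = cachedLeader.get().fetch(topic, partition, offset);
                if (result.error == Error.NOT_LEADER_FOR_PARTITION) {
                    refreshMetadata.run(); // the broker corrected our stale view
                    continue;
                }
                // Caveat (KAFKA-6262): a deposed leader that still believes
                // it leads never returns this error, so the loop alone
                // cannot detect that case.
                return result;
            }
        }
    }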
> > > > > On Wed, Jan 24, 2018, at 09:45, Dong Lin wrote:
> > > > > > Thanks much for reviewing the KIP!
> > > > > >
> > > > > > Dong
> > > > > >
> > > > > > On Wed, Jan 24, 2018 at 7:10 AM, Guozhang Wang
> > > > > > <wangg...@gmail.com> wrote:
> > > > > >
> > > > > > > Yeah that makes sense, again I'm just making sure we
> > > > > > > understand all the scenarios and what to expect.
> > > > > > >
> > > > > > > I agree that if, more generally speaking, say users have
> > > > > > > only consumed to offset 8, and then call seek(16) to
> > > > > > > "jump" to a further position, then she needs to be aware
> > > > > > > that OORE may be thrown and she needs to handle it or rely
> > > > > > > on the reset policy, which should not surprise her.
> > > > > > >
> > > > > > > I'm +1 on the KIP.
> > > > > > >
> > > > > > > Guozhang
> > > > > > >
> > > > > > > On Wed, Jan 24, 2018 at 12:31 AM, Dong Lin
> > > > > > > <lindon...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Yes, in general we can not prevent
> > > > > > > > OffsetOutOfRangeException if the user seeks to a wrong
> > > > > > > > offset. The main goal is to prevent
> > > > > > > > OffsetOutOfRangeException if the user has done things in
> > > > > > > > the right way, e.g. the user should know that there is a
> > > > > > > > message with this offset.
> > > > > > > >
> > > > > > > > For example, if the user calls seek(..) right after
> > > > > > > > construction, the only reason I can think of is that the
> > > > > > > > user stores offsets externally. In this case, the user
> > > > > > > > currently needs to use the offset which was obtained
> > > > > > > > using position(..) from the last run. With this KIP,
> > > > > > > > the user needs to get the offset and the offsetEpoch
> > > > > > > > using positionAndOffsetEpoch(...) and store this
> > > > > > > > information externally. The next time the user starts
> > > > > > > > the consumer, he/she needs to call seek(..., offset,
> > > > > > > > offsetEpoch) right after construction. Then the KIP
> > > > > > > > should be able to ensure that we don't throw
> > > > > > > > OffsetOutOfRangeException if there is no unclean leader
> > > > > > > > election.
> > > > > > > >
> > > > > > > > Does this sound OK?
> > > > > > > >
> > > > > > > > Regards,
> > > > > > > > Dong
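Sketched as code, the workflow above would look roughly like the
following. positionAndOffsetEpoch() and the seek() overload taking an
offsetEpoch are the KIP's proposed additions; the consumer and store
types here are placeholders for illustration:

    // Placeholder shapes standing in for the KIP's proposed API and the
    // user's own external storage; not shipped code.
    import org.apache.kafka.common.TopicPartition;

    interface OffsetAndOffsetEpoch {
        long offset();
        long offsetEpoch();
    }

    interface EpochAwareConsumer {
        OffsetAndOffsetEpoch positionAndOffsetEpoch(TopicPartition tp);
        void seek(TopicPartition tp, long offset, long offsetEpoch);
    }

    interface ExternalOffsetStore {
        void save(TopicPartition tp, long offset, long offsetEpoch);
        OffsetAndOffsetEpoch load(TopicPartition tp);
    }

    public class ExternalOffsetWorkflow {
        // End of a run: persist the position together with its offsetEpoch.
        static void onShutdown(EpochAwareConsumer consumer,
                               ExternalOffsetStore store, TopicPartition tp) {
            OffsetAndOffsetEpoch pos = consumer.positionAndOffsetEpoch(tp);
            store.save(tp, pos.offset(), pos.offsetEpoch());
        }

        // Next run: seek with both values right after construction, so the
        // client can detect topic re-creation / leader rewind instead of
        // throwing a spurious OffsetOutOfRangeException.
        static void onStartup(EpochAwareConsumer consumer,
                              ExternalOffsetStore store, TopicPartition tp) {
            OffsetAndOffsetEpoch pos = store.load(tp);
            consumer.seek(tp, pos.offset(), pos.offsetEpoch());
        }
    }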
> > > > > > > > On Tue, Jan 23, 2018 at 11:44 PM, Guozhang Wang
> > > > > > > > <wangg...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > "If consumer wants to consume message with offset 16,
> > > > > > > > > then consumer must have already fetched message with
> > > > > > > > > offset 15"
> > > > > > > > >
> > > > > > > > > --> this may not always be true, right? What if the
> > > > > > > > > consumer just calls seek(16) after construction and
> > > > > > > > > then polls without a committed offset ever stored
> > > > > > > > > before? Admittedly it is rare, but we do not
> > > > > > > > > programmatically disallow it.
> > > > > > > > >
> > > > > > > > > Guozhang
> > > > > > > > >
> > > > > > > > > On Tue, Jan 23, 2018 at 10:42 PM, Dong Lin
> > > > > > > > > <lindon...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > Hey Guozhang,
> > > > > > > > > >
> > > > > > > > > > Thanks much for reviewing the KIP!
> > > > > > > > > >
> > > > > > > > > > In the scenario you described, let's assume that
> > > > > > > > > > broker A has messages with offsets up to 10, and
> > > > > > > > > > broker B has messages with offsets up to 20. If the
> > > > > > > > > > consumer wants to consume the message with offset 9,
> > > > > > > > > > it will not receive OffsetOutOfRangeException from
> > > > > > > > > > broker A.
> > > > > > > > > >
> > > > > > > > > > If the consumer wants to consume the message with
> > > > > > > > > > offset 16, then the consumer must have already
> > > > > > > > > > fetched the message with offset 15, which can only
> > > > > > > > > > come from broker B. Because the consumer will fetch
> > > > > > > > > > from broker B only if leaderEpoch >= 2, the current
> > > > > > > > > > consumer leaderEpoch can not be 1, since this KIP
> > > > > > > > > > prevents leaderEpoch rewind. Thus we will not have
> > > > > > > > > > OffsetOutOfRangeException in this case.
> > > > > > > > > >
> > > > > > > > > > Does this address your question, or maybe there is a
> > > > > > > > > > more advanced scenario that the KIP does not handle?
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > Dong
> > > > > > > > > >
> > > > > > > > > > On Tue, Jan 23, 2018 at 9:43 PM, Guozhang Wang
> > > > > > > > > > <wangg...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > Thanks Dong, I made a pass over the wiki and it
> > > > > > > > > > > lgtm.
> > > > > > > > > > >
> > > > > > > > > > > Just a quick question: can we completely eliminate
> > > > > > > > > > > the OffsetOutOfRangeException with this approach?
> > > > > > > > > > > Say there are consecutive leader changes such that
> > > > > > > > > > > the cached metadata's partition epoch is 1, and
> > > > > > > > > > > the metadata fetch response returns with partition
> > > > > > > > > > > epoch 2 pointing to leader broker A, while the
> > > > > > > > > > > actual up-to-date metadata has partition epoch 3
> > > > > > > > > > > whose leader is now broker B. The metadata refresh
> > > > > > > > > > > will still succeed, and the follow-up fetch
> > > > > > > > > > > request may still see OORE?
> > > > > > > > > > >
> > > > > > > > > > > Guozhang
> > > > > > > > > > >
> > > > > > > > > > > On Tue, Jan 23, 2018 at 3:47 PM, Dong Lin
> > > > > > > > > > > <lindon...@gmail.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi all,
> > > > > > > > > > > >
> > > > > > > > > > > > I would like to start the voting process for
> > > > > > > > > > > > KIP-232:
> > > > > > > > > > > >
> > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-232%3A+Detect+outdated+metadata+using+leaderEpoch+and+partitionEpoch
> > > > > > > > > > > >
> > > > > > > > > > > > The KIP will help fix a concurrency issue in
> > > > > > > > > > > > Kafka which currently can cause message loss or
> > > > > > > > > > > > message duplication in the consumer.
> > > > > > > > > > > >
> > > > > > > > > > > > Regards,
> > > > > > > > > > > > Dong
> > > > > > > > > > >
> > > > > > > > > > > --
> > > > > > > > > > > -- Guozhang
> > > > > > > > >
> > > > > > > > > --
> > > > > > > > > -- Guozhang
> > > > > > >
> > > > > > > --
> > > > > > > -- Guozhang