Hey Colin,

On Fri, Jan 26, 2018 at 10:16 AM, Colin McCabe <cmcc...@apache.org> wrote:

> On Thu, Jan 25, 2018, at 16:47, Dong Lin wrote:
> > Hey Colin,
> >
> > Thanks for the comment.
> >
> > On Thu, Jan 25, 2018 at 4:15 PM, Colin McCabe <cmcc...@apache.org>
> wrote:
> >
> > > On Wed, Jan 24, 2018, at 21:07, Dong Lin wrote:
> > > > Hey Colin,
> > > >
> > > > Thanks for reviewing the KIP.
> > > >
> > > > If I understand you right, you may be suggesting that we can use a
> global
> > > > metadataEpoch that is incremented every time controller updates
> metadata.
> > > > The problem with this solution is that, if a topic is deleted and
> created
> > > > again, user will not know whether the offset which is stored
> before
> > > > the topic deletion is no longer valid. This motivates the idea to
> include
> > > > per-partition partitionEpoch. Does this sound reasonable?
> > >
> > > Hi Dong,
> > >
> > > Perhaps we can store the last valid offset of each deleted topic in
> > > ZooKeeper.  Then, when a topic with one of those names gets
> re-created, we
> > > can start the topic at the previous end offset rather than at 0.  This
> > > preserves immutability.  It is no more burdensome than having to
> preserve a
> > > "last epoch" for the deleted partition somewhere, right?
> > >
> >
> > My concern with this solution is that the number of ZooKeeper nodes keeps
> > growing over time if some users keep deleting and creating topics.
> Do
> > you think this can be a problem?
>
> Hi Dong,
>
> We could expire the "partition tombstones" after an hour or so.  In
> practice this would solve the issue for clients that like to destroy and
> re-create topics all the time.  In any case, doesn't the current proposal
> add per-partition znodes as well that we have to track even after the
> partition is deleted?  Or did I misunderstand that?
>

Actually the current KIP does not add per-partition znodes. Could you
double check? I can fix the KIP wiki if there is anything misleading.

If we expire the "partition tombstones" after an hour, and the topic is
re-created after more than an hour since the topic deletion, then we are
back to the situation where user can not tell whether the topic has been
re-created or not, right?
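
To make the concern concrete, here is a rough Python sketch of the tombstone idea with an expiry. Everything in it (TombstoneStore, the one-hour TTL constant, the method names) is a hypothetical stand-in for illustration, not anything in Kafka or the KIP. It only shows that once the tombstone has expired, the re-created topic starts at offset 0 again, so an offset remembered from the old incarnation looks valid.

```python
TOMBSTONE_TTL_SECONDS = 3600  # assumed value for "an hour or so"


class TombstoneStore:
    """Hypothetical stand-in for tombstones kept in ZooKeeper."""

    def __init__(self):
        self._tombstones = {}  # topic -> (last_end_offset, deleted_at_seconds)

    def delete_topic(self, topic, end_offset, now):
        # Remember where the deleted incarnation ended.
        self._tombstones[topic] = (end_offset, now)

    def start_offset_on_create(self, topic, now):
        entry = self._tombstones.pop(topic, None)
        if entry is not None and now - entry[1] <= TOMBSTONE_TTL_SECONDS:
            return entry[0]  # continue after the old incarnation's end offset
        return 0  # tombstone expired or never existed


store = TombstoneStore()
store.delete_topic("t1", end_offset=100, now=0)
within_hour = store.start_offset_on_create("t1", now=1800)

store.delete_topic("t2", end_offset=100, now=0)
after_expiry = store.start_offset_on_create("t2", now=7200)
```

In the second case a consumer that stored offset 50 for the old "t2" has no way to notice that offset 50 now belongs to a different incarnation.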


>
> It's not really clear to me what should happen when a topic is destroyed
> and re-created with new data.  Should consumers continue to be able to
> consume?  We don't know where they stopped consuming from the previous
> incarnation of the topic, so messages may have been lost.  Certainly
> consuming data from offset X of the new incarnation of the topic may give
> something totally different from what you would have gotten from offset X
> of the previous incarnation of the topic.
>

With the current KIP, if a consumer consumes a topic based on the last
remembered (offset, partitionEpoch, leaderEpoch), and if the topic is
re-created, the consumer will throw InvalidPartitionEpochException because the
previous partitionEpoch will be different from the current partitionEpoch.
This is described in the Proposed Changes -> Consumption after topic
deletion in the KIP. I can improve the KIP if there is anything not clear.
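
As a rough illustration of that check (a toy Python model, not the actual broker or consumer code), the sketch below bumps a partitionEpoch whenever the topic is re-created and rejects a fetch that carries the old epoch. ToyBroker, recreate_topic and fetch are made-up names, the exception class is only a stand-in for InvalidPartitionEpochException, and leaderEpoch is left out to keep it short.

```python
class InvalidPartitionEpochError(Exception):
    """Stand-in for the InvalidPartitionEpochException named in the KIP."""


class ToyBroker:
    """Hypothetical in-memory model of a single partition; not Kafka code."""

    def __init__(self):
        self.partition_epoch = 0
        self.log = []

    def recreate_topic(self):
        # Deleting and re-creating the topic gives a new partitionEpoch
        # and an empty log.
        self.partition_epoch += 1
        self.log = []

    def fetch(self, offset, partition_epoch):
        # Reject a position remembered from an earlier incarnation.
        if partition_epoch != self.partition_epoch:
            raise InvalidPartitionEpochError(
                f"expected epoch {self.partition_epoch}, got {partition_epoch}")
        return self.log[offset]


broker = ToyBroker()
broker.log = ["a", "b", "c"]
remembered = (2, broker.partition_epoch)  # (offset, partitionEpoch)

broker.recreate_topic()
broker.log = ["x", "y", "z"]

try:
    broker.fetch(*remembered)
    outcome = "stale read"
except InvalidPartitionEpochError:
    outcome = "rejected"
```

Without the epoch comparison, the same fetch would silently return "z" from the new incarnation.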


> By choosing to reuse the same (topic, partition, offset) 3-tuple, we have
> chosen to give up immutability.  That was a really bad decision.  And now
> we have to worry about time dependencies, stale cached data, and all the
> rest.  We can't completely fix this inside Kafka no matter what we do,
> because not all that cached data is inside Kafka itself.  Some of it may be
> in systems that Kafka has sent data to, such as other daemons, SQL
> databases, streams, and so forth.
>

The current KIP will uniquely identify a message using the (topic, partition,
offset, partitionEpoch) 4-tuple. This addresses the message immutability
issue that you mentioned. Is there any corner case where the message
immutability is still not preserved with the current KIP?
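
A small Python sketch of the difference, assuming a downstream system that caches payloads by message identity. The MessageId type, the record helper and the two dictionaries are hypothetical and only there for illustration.

```python
from collections import namedtuple

# Hypothetical identity type; field names are illustrative only.
MessageId = namedtuple("MessageId", "topic partition offset partition_epoch")

by_3_tuple = {}  # keyed by (topic, partition, offset)
by_4_tuple = {}  # keyed by (topic, partition, offset, partitionEpoch)


def record(topic, partition, offset, partition_epoch, payload):
    by_3_tuple[(topic, partition, offset)] = payload
    by_4_tuple[MessageId(topic, partition, offset, partition_epoch)] = payload


# Same topic name, partition and offset, before and after re-creation.
record("t", 0, 5, 1, "old incarnation")
record("t", 0, 5, 2, "new incarnation")
```

With the 3-tuple key the second message overwrites the first. With the 4-tuple key both are kept as distinct entries.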


>
> I guess the idea here is that mirror maker should work as expected when
> users destroy a topic and re-create it with the same name.  That's kind of
> tough, though, since in that scenario, mirror maker probably should destroy
> and re-create the topic on the other end, too, right?  Otherwise, what you
> end up with on the other end could be half of one incarnation of the topic,
> and half of another.
>
> What mirror maker really needs is to be able to follow a stream of events
> about the kafka cluster itself.  We could have some master topic which is
> always present and which contains data about all topic deletions,
> creations, etc.  Then MM can simply follow this topic and do what is needed.
>
> >
> >
> > >
> > > >
> > > > > Then the next question may be: should we use a global metadataEpoch +
> > > > > per-partition partitionEpoch, instead of per-partition leaderEpoch +
> > > > > per-partition partitionEpoch? The former solution using metadataEpoch
> > > > > would not work due to the following scenario (provided by Jun):
> > > >
> > > > "Consider the following scenario. In metadata v1, the leader for a
> > > > partition is at broker 1. In metadata v2, leader is at broker 2. In
> > > > metadata v3, leader is at broker 1 again. The last committed offset
> in
> > > v1,
> > > > v2 and v3 are 10, 20 and 30, respectively. A consumer is started and
> > > reads
> > > > metadata v1 and reads messages from offset 0 to 25 from broker 1. My
> > > > understanding is that in the current proposal, the metadata version
> > > > associated with offset 25 is v1. The consumer is then restarted and
> > > fetches
> > > > metadata v2. The consumer tries to read from broker 2, which is the
> old
> > > > leader with the last offset at 20. In this case, the consumer will
> still
> > > > get OffsetOutOfRangeException incorrectly."
> > > >
> > > > Regarding your comment "For the second purpose, this is "soft state"
> > > > anyway.  If the client thinks X is the leader but Y is really the
> leader,
> > > > the client will talk to X, and X will point out its mistake by
> sending
> > > back
> > > > > a NOT_LEADER_FOR_PARTITION.", it is probably not true. The problem
> here is
> > > > that the old leader X may still think it is the leader of the
> partition
> > > and
> > > > thus it will not send back NOT_LEADER_FOR_PARTITION. The reason is
> > > provided
> > > > in KAFKA-6262. Can you check if that makes sense?
> > >
> > > This is solvable with a timeout, right?  If the leader can't
> communicate
> > > with the controller for a certain period of time, it should stop
> acting as
> > > the leader.  We have to solve this problem, anyway, in order to fix
> all the
> > > corner cases.
> > >
> >
> > Not sure if I fully understand your proposal. The proposal seems to
> require
> > non-trivial changes to our existing leadership election mechanism. Could
> > you provide more detail regarding how it works? For example, how should
> > user choose this timeout, how leader determines whether it can still
> > communicate with controller, and how this triggers controller to elect
> new
> > leader?
>
> Before I come up with any proposal, let me make sure I understand the
> problem correctly.  My big question was, what prevents split-brain here?
>
> Let's say I have a partition which is on nodes A, B, and C, with min-ISR
> 2.  The controller is D.  At some point, there is a network partition
> between A and B and the rest of the cluster.  The Controller re-assigns the
> partition to nodes C, D, and E.  But A and B keep chugging away, even
> though they can no longer communicate with the controller.
>
> At some point, a client with stale metadata writes to the partition.  It
> still thinks the partition is on node A, B, and C, so that's where it sends
> the data.  It's unable to talk to C, but A and B reply back that all is
> well.
>
> Is this not a case where we could lose data due to split brain?  Or is
> there a mechanism for preventing this that I missed?  If it is, it seems
> like a pretty serious failure case that we should be handling with our
> metadata rework.  And I think epoch numbers and timeouts might be part of
> the solution.
>

Right, split brain can happen if RF=4 and minIsr=2. However, I am not sure
it is a serious enough issue that we need to address it today. It can be
prevented by configuring the Kafka topic so that minIsr > RF/2. Actually,
if the user sets minIsr=2, is there any reason the user would want to set
RF=4 instead of 3?
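
The minIsr > RF/2 rule can be checked by brute force. The Python sketch below is only an illustration under a simplifying assumption: the replica set stays fixed and a network partition splits it into two sides. The function name split_brain_possible is made up. It reports whether both sides could satisfy minIsr at the same time.

```python
from itertools import combinations


def split_brain_possible(rf, min_isr):
    """True if some two-way split of rf replicas leaves both sides >= min_isr."""
    replicas = set(range(rf))
    for size in range(rf + 1):
        for side_a in combinations(sorted(replicas), size):
            side_b = replicas - set(side_a)
            if len(side_a) >= min_isr and len(side_b) >= min_isr:
                return True
    return False


rf4_min2 = split_brain_possible(4, 2)  # two groups of two can both accept writes
rf3_min2 = split_brain_possible(3, 2)  # at most one side can hold two replicas
```

Enumerating small values confirms that, in this model, no such split exists exactly when minIsr > RF/2.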

Introducing a timeout in the leader election mechanism is non-trivial. I think
we probably want to do that only if there is a good use case that cannot
otherwise be addressed with the current mechanism.


> best,
> Colin
>
>
> >
> >
> > > best,
> > > Colin
> > >
> > > >
> > > > Regards,
> > > > Dong
> > > >
> > > >
> > > > On Wed, Jan 24, 2018 at 10:39 AM, Colin McCabe <cmcc...@apache.org>
> > > wrote:
> > > >
> > > > > Hi Dong,
> > > > >
> > > > > Thanks for proposing this KIP.  I think a metadata epoch is a
> really
> > > good
> > > > > idea.
> > > > >
> > > > > I read through the DISCUSS thread, but I still don't have a clear
> > > picture
> > > > > of why the proposal uses a metadata epoch per partition rather
> than a
> > > > > global metadata epoch.  A metadata epoch per partition is kind of
> > > > > unpleasant-- it's at least 4 extra bytes per partition that we
> have to
> > > send
> > > > > over the wire in every full metadata request, which could become
> extra
> > > > > kilobytes on the wire when the number of partitions becomes large.
> > > Plus,
> > > > > > we have to update all the auxiliary classes to include an epoch.
> > > > >
> > > > > We need to have a global metadata epoch anyway to handle partition
> > > > > addition and deletion.  For example, if I give you
> > > > > MetadataResponse{part1,epoch 1, part2, epoch 1} and {part1,
> epoch1},
> > > which
> > > > > MetadataResponse is newer?  You have no way of knowing.  It could
> be
> > > that
> > > > > part2 has just been created, and the response with 2 partitions is
> > > newer.
> > > > > > Or it could be that part2 has just been deleted, and therefore the
> > > response
> > > > > with 1 partition is newer.  You must have a global epoch to
> > > disambiguate
> > > > > these two cases.
> > > > >
> > > > > Previously, I worked on the Ceph distributed filesystem.  Ceph had
> the
> > > > > concept of a map of the whole cluster, maintained by a few servers
> > > doing
> > > > > paxos.  This map was versioned by a single 64-bit epoch number
> which
> > > > > increased on every change.  It was propagated to clients through
> > > gossip.  I
> > > > > wonder if something similar could work here?
> > > > >
> > > > > > It seems like the Kafka MetadataResponse serves two somewhat
> > > unrelated
> > > > > purposes.  Firstly, it lets clients know what partitions exist in
> the
> > > > > system and where they live.  Secondly, it lets clients know which
> nodes
> > > > > within the partition are in-sync (in the ISR) and which node is the
> > > leader.
> > > > >
> > > > > The first purpose is what you really need a metadata epoch for, I
> > > think.
> > > > > You want to know whether a partition exists or not, or you want to
> know
> > > > > which nodes you should talk to in order to write to a given
> > > partition.  A
> > > > > single metadata epoch for the whole response should be adequate
> here.
> > > We
> > > > > should not change the partition assignment without going through
> > > zookeeper
> > > > > (or a similar system), and this inherently serializes updates into
> a
> > > > > numbered stream.  Brokers should also stop responding to requests
> when
> > > they
> > > > > are unable to contact ZK for a certain time period.  This prevents
> the
> > > case
> > > > > where a given partition has been moved off some set of nodes, but a
> > > client
> > > > > still ends up talking to those nodes and writing data there.
> > > > >
> > > > > For the second purpose, this is "soft state" anyway.  If the client
> > > thinks
> > > > > X is the leader but Y is really the leader, the client will talk
> to X,
> > > and
> > > > > X will point out its mistake by sending back a
> > > NOT_LEADER_FOR_PARTITION.
> > > > > Then the client can update its metadata again and find the new
> leader,
> > > if
> > > > > there is one.  There is no need for an epoch to handle this.
> > > Similarly, I
> > > > > can't think of a reason why changing the in-sync replica set needs
> to
> > > bump
> > > > > the epoch.
> > > > >
> > > > > best,
> > > > > Colin
> > > > >
> > > > >
> > > > > On Wed, Jan 24, 2018, at 09:45, Dong Lin wrote:
> > > > > > Thanks much for reviewing the KIP!
> > > > > >
> > > > > > Dong
> > > > > >
> > > > > > On Wed, Jan 24, 2018 at 7:10 AM, Guozhang Wang <
> wangg...@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > > > Yeah that makes sense, again I'm just making sure we understand
> > > all the
> > > > > > > scenarios and what to expect.
> > > > > > >
> > > > > > > I agree that if, more generally speaking, say users have only
> > > consumed
> > > > > to
> > > > > > > offset 8, and then call seek(16) to "jump" to a further
> position,
> > > then
> > > > > she
> > > > > > > needs to be aware that OORE maybe thrown and she needs to
> handle
> > > it or
> > > > > rely
> > > > > > > on reset policy which should not surprise her.
> > > > > > >
> > > > > > >
> > > > > > > I'm +1 on the KIP.
> > > > > > >
> > > > > > > Guozhang
> > > > > > >
> > > > > > >
> > > > > > > On Wed, Jan 24, 2018 at 12:31 AM, Dong Lin <
> lindon...@gmail.com>
> > > > > wrote:
> > > > > > >
> > > > > > > > Yes, in general we can not prevent OffsetOutOfRangeException
> if
> > > user
> > > > > > > seeks
> > > > > > > > to a wrong offset. The main goal is to prevent
> > > > > OffsetOutOfRangeException
> > > > > > > if
> > > > > > > > user has done things in the right way, e.g. user should know
> that
> > > > > there
> > > > > > > is
> > > > > > > > message with this offset.
> > > > > > > >
> > > > > > > > For example, if user calls seek(..) right after
> construction, the
> > > > > only
> > > > > > > > reason I can think of is that user stores offset externally.
> In
> > > this
> > > > > > > case,
> > > > > > > > user currently needs to use the offset which is obtained
> using
> > > > > > > position(..)
> > > > > > > > from the last run. With this KIP, user needs to get the
> offset
> > > and
> > > > > the
> > > > > > > > offsetEpoch using positionAndOffsetEpoch(...) and stores
> these
> > > > > > > information
> > > > > > > > externally. The next time user starts consumer, he/she needs
> to
> > > call
> > > > > > > > seek(..., offset, offsetEpoch) right after construction.
> Then KIP
> > > > > should
> > > > > > > be
> > > > > > > > able to ensure that we don't throw OffsetOutOfRangeException
> if
> > > > > there is
> > > > > > > no
> > > > > > > > unclean leader election.
> > > > > > > >
> > > > > > > > Does this sound OK?
> > > > > > > >
> > > > > > > > Regards,
> > > > > > > > Dong
> > > > > > > >
> > > > > > > >
> > > > > > > > On Tue, Jan 23, 2018 at 11:44 PM, Guozhang Wang <
> > > wangg...@gmail.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > "If consumer wants to consume message with offset 16, then
> > > consumer
> > > > > > > must
> > > > > > > > > have
> > > > > > > > > already fetched message with offset 15"
> > > > > > > > >
> > > > > > > > > --> this may not be always true right? What if consumer
> just
> > > call
> > > > > > > > seek(16)
> > > > > > > > > after construction and then poll without committed offset
> ever
> > > > > stored
> > > > > > > > > before? Admittedly it is rare but we do not programmably
> > > disallow
> > > > > it.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Guozhang
> > > > > > > > >
> > > > > > > > > On Tue, Jan 23, 2018 at 10:42 PM, Dong Lin <
> > > lindon...@gmail.com>
> > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hey Guozhang,
> > > > > > > > > >
> > > > > > > > > > Thanks much for reviewing the KIP!
> > > > > > > > > >
> > > > > > > > > > In the scenario you described, let's assume that broker
> A has
> > > > > > > messages
> > > > > > > > > with
> > > > > > > > > > offset up to 10, and broker B has messages with offset
> up to
> > > 20.
> > > > > If
> > > > > > > > > > consumer wants to consume message with offset 9, it will
> not
> > > > > receive
> > > > > > > > > > OffsetOutOfRangeException
> > > > > > > > > > from broker A.
> > > > > > > > > >
> > > > > > > > > > If consumer wants to consume message with offset 16, then
> > > > > consumer
> > > > > > > must
> > > > > > > > > > have already fetched message with offset 15, which can
> only
> > > come
> > > > > from
> > > > > > > > > > broker B. Because consumer will fetch from broker B only
> if
> > > > > > > leaderEpoch
> > > > > > > > > >=
> > > > > > > > > > 2, then the current consumer leaderEpoch can not be 1
> since
> > > this
> > > > > KIP
> > > > > > > > > > prevents leaderEpoch rewind. Thus we will not have
> > > > > > > > > > OffsetOutOfRangeException
> > > > > > > > > > in this case.
> > > > > > > > > >
> > > > > > > > > > Does this address your question, or maybe there is more
> > > advanced
> > > > > > > > scenario
> > > > > > > > > > that the KIP does not handle?
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > Dong
> > > > > > > > > >
> > > > > > > > > > On Tue, Jan 23, 2018 at 9:43 PM, Guozhang Wang <
> > > > > wangg...@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Thanks Dong, I made a pass over the wiki and it lgtm.
> > > > > > > > > > >
> > > > > > > > > > > Just a quick question: can we completely eliminate the
> > > > > > > > > > > OffsetOutOfRangeException with this approach? Say if
> there
> > > is
> > > > > > > > > consecutive
> > > > > > > > > > > leader changes such that the cached metadata's
> partition
> > > epoch
> > > > > is
> > > > > > > 1,
> > > > > > > > > and
> > > > > > > > > > > the metadata fetch response returns  with partition
> epoch 2
> > > > > > > pointing
> > > > > > > > to
> > > > > > > > > > > leader broker A, while the actual up-to-date metadata
> has
> > > > > partition
> > > > > > > > > > epoch 3
> > > > > > > > > > > whose leader is now broker B, the metadata refresh will
> > > still
> > > > > > > succeed
> > > > > > > > > and
> > > > > > > > > > > the follow-up fetch request may still see OORE?
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > Guozhang
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > On Tue, Jan 23, 2018 at 3:47 PM, Dong Lin <
> > > lindon...@gmail.com
> > > > > >
> > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi all,
> > > > > > > > > > > >
> > > > > > > > > > > > I would like to start the voting process for KIP-232:
> > > > > > > > > > > >
> > > > > > > > > > > > https://cwiki.apache.org/
> confluence/display/KAFKA/KIP-
> > > > > > > > > > > > 232%3A+Detect+outdated+metadata+using+leaderEpoch+
> > > > > > > > and+partitionEpoch
> > > > > > > > > > > >
> > > > > > > > > > > > The KIP will help fix a concurrency issue in Kafka
> which
> > > > > > > currently
> > > > > > > > > can
> > > > > > > > > > > > cause message loss or message duplication in
> consumer.
> > > > > > > > > > > >
> > > > > > > > > > > > Regards,
> > > > > > > > > > > > Dong
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > --
> > > > > > > > > > > -- Guozhang
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > --
> > > > > > > > > -- Guozhang
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > -- Guozhang
> > > > > > >
> > > > >
> > >
>
