Ah thanks. I did not check the wiki but just browsed through all open
KIP discussions following the email threads.


-Matthias

On 9/30/18 12:06 PM, Dong Lin wrote:
> Hey Matthias,
> 
> Thanks for checking back  on the status. The KIP has been marked as
> replaced by KIP-320 in the KIP list wiki page and the status has been
> updated in the discussion and voting email thread.
> 
> Thanks,
> Dong
> 
> On Sun, 30 Sep 2018 at 11:51 AM Matthias J. Sax <matth...@confluent.io>
> wrote:
> 
>> It seems that KIP-320 was accepted. Thus, I am wondering what the status
>> of this KIP is?
>>
>> -Matthias
>>
>> On 7/11/18 10:59 AM, Dong Lin wrote:
>>> Hey Jun,
>>>
>>> Certainly. We can discuss later after KIP-320 settles.
>>>
>>> Thanks!
>>> Dong
>>>
>>>
>>> On Wed, Jul 11, 2018 at 8:54 AM, Jun Rao <j...@confluent.io> wrote:
>>>
>>>> Hi, Dong,
>>>>
>>>> Sorry for the late response. Since KIP-320 is covering some of the
>> similar
>>>> problems described in this KIP, perhaps we can wait until KIP-320
>> settles
>>>> and see what's still left uncovered in this KIP.
>>>>
>>>> Thanks,
>>>>
>>>> Jun
>>>>
>>>> On Mon, Jun 4, 2018 at 7:03 PM, Dong Lin <lindon...@gmail.com> wrote:
>>>>
>>>>> Hey Jun,
>>>>>
>>>>> It seems that we have made considerable progress on the discussion of
>>>>> KIP-253 since February. Do you think we should continue the discussion
>>>>> there, or can we continue the voting for this KIP? I am happy to submit
>>>> the
>>>>> PR and move forward the progress for this KIP.
>>>>>
>>>>> Thanks!
>>>>> Dong
>>>>>
>>>>>
>>>>> On Wed, Feb 7, 2018 at 11:42 PM, Dong Lin <lindon...@gmail.com> wrote:
>>>>>
>>>>>> Hey Jun,
>>>>>>
>>>>>> Sure, I will come up with a KIP this week. I think there is a way to
>>>>> allow
>>>>>> partition expansion to arbitrary number without introducing new
>>>> concepts
>>>>>> such as read-only partition or repartition epoch.
>>>>>>
>>>>>> Thanks,
>>>>>> Dong
>>>>>>
>>>>>> On Wed, Feb 7, 2018 at 5:28 PM, Jun Rao <j...@confluent.io> wrote:
>>>>>>
>>>>>>> Hi, Dong,
>>>>>>>
>>>>>>> Thanks for the reply. The general idea that you had for adding
>>>>> partitions
>>>>>>> is similar to what we had in mind. It would be useful to make this
>>>> more
>>>>>>> general, allowing adding an arbitrary number of partitions (instead
>> of
>>>>>>> just
>>>>>>> doubling) and potentially removing partitions as well. The following
>>>> is
>>>>>>> the
>>>>>>> high level idea from the discussion with Colin, Jason and Ismael.
>>>>>>>
>>>>>>> * To change the number of partitions from X to Y in a topic, the
>>>>>>> controller
>>>>>>> marks all existing X partitions as read-only and creates Y new
>>>>> partitions.
>>>>>>> The new partitions are writable and are tagged with a higher
>>>> repartition
>>>>>>> epoch (RE).
>>>>>>>
>>>>>>> * The controller propagates the new metadata to every broker. Once
>> the
>>>>>>> leader of a partition is marked as read-only, it rejects the produce
>>>>>>> requests on this partition. The producer will then refresh the
>>>> metadata
>>>>>>> and
>>>>>>> start publishing to the new writable partitions.
>>>>>>>
>>>>>>> * The consumers will then be consuming messages in RE order. The
>>>>> consumer
>>>>>>> coordinator will only assign partitions in the same RE to consumers.
>>>>> Only
>>>>>>> after all messages in an RE are consumed, will partitions in a higher
>>>> RE
>>>>>>> be
>>>>>>> assigned to consumers.
>>>>>>>
>>>>>>> As Colin mentioned, if we do the above, we could potentially (1) use
>> a
>>>>>>> globally unique partition id, or (2) use a globally unique topic id
>> to
>>>>>>> distinguish recreated partitions due to topic deletion.
>>>>>>>
>>>>>>> So, perhaps we can sketch out the re-partitioning KIP a bit more and
>>>> see
>>>>>>> if
>>>>>>> there is any overlap with KIP-232. Would you be interested in doing
>>>>> that?
>>>>>>> If not, we can do that next week.
>>>>>>>
>>>>>>> Jun
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Feb 6, 2018 at 11:30 AM, Dong Lin <lindon...@gmail.com>
>>>> wrote:
>>>>>>>
>>>>>>>> Hey Jun,
>>>>>>>>
>>>>>>>> Interestingly I am also planning to sketch a KIP to allow partition
>>>>>>>> expansion for keyed topics after this KIP. Since you are already
>>>> doing
>>>>>>>> that, I guess I will just share my high level idea here in case it
>>>> is
>>>>>>>> helpful.
>>>>>>>>
>>>>>>>> The motivation for the KIP is that we currently lose order guarantee
>>>>> for
>>>>>>>> messages with the same key if we expand partitions of keyed topic.
>>>>>>>>
>>>>>>>> The solution can probably be built upon the following ideas:
>>>>>>>>
>>>>>>>> - Partition number of the keyed topic should always be doubled (or
>>>>>>>> multiplied by power of 2). Given that we select a partition based on
>>>>>>>> hash(key) % partitionNum, this should help us ensure that, a message
>>>>>>>> assigned to an existing partition will not be mapped to another
>>>>> existing
>>>>>>>> partition after partition expansion.
>>>>>>>>
>>>>>>>> - Producer includes in the ProduceRequest some information that
>>>> helps
>>>>>>>> ensure that messages produced ti a partition will monotonically
>>>>>>> increase in
>>>>>>>> the partitionNum of the topic. In other words, if broker receives a
>>>>>>>> ProduceRequest and notices that the producer does not know the
>>>>> partition
>>>>>>>> number has increased, broker should reject this request. That
>>>>>>> "information"
>>>>>>>> maybe leaderEpoch, max partitionEpoch of the partitions of the
>>>> topic,
>>>>> or
>>>>>>>> simply partitionNum of the topic. The benefit of this property is
>>>> that
>>>>>>> we
>>>>>>>> can keep the new logic for in-order message consumption entirely in
>>>>> how
>>>>>>>> consumer leader determines the partition -> consumer mapping.
>>>>>>>>
>>>>>>>> - When consumer leader determines partition -> consumer mapping,
>>>>> leader
>>>>>>>> first reads the start position for each partition using
>>>>>>> OffsetFetchRequest.
>>>>>>>> If start position are all non-zero, then assignment can be done in
>>>> its
>>>>>>>> current manner. The assumption is that, a message in the new
>>>> partition
>>>>>>>> should only be consumed after all messages with the same key
>>>> produced
>>>>>>>> before it has been consumed. Since some messages in the new
>>>> partition
>>>>>>> has
>>>>>>>> been consumed, we should not worry about consuming messages
>>>>>>> out-of-order.
>>>>>>>> This benefit of this approach is that we can avoid unnecessary
>>>>> overhead
>>>>>>> in
>>>>>>>> the common case.
>>>>>>>>
>>>>>>>> - If the consumer leader finds that the start position for some
>>>>>>> partition
>>>>>>>> is 0. Say the current partition number is 18 and the partition index
>>>>> is
>>>>>>> 12,
>>>>>>>> then consumer leader should ensure that messages produced to
>>>> partition
>>>>>>> 12 -
>>>>>>>> 18/2 = 3 before the first message of partition 12 is consumed,
>>>> before
>>>>> it
>>>>>>>> assigned partition 12 to any consumer in the consumer group. Since
>>>> we
>>>>>>> have
>>>>>>>> a "information" that is monotonically increasing per partition,
>>>>> consumer
>>>>>>>> can read the value of this information from the first message in
>>>>>>> partition
>>>>>>>> 12, get the offset corresponding to this value in partition 3,
>>>> assign
>>>>>>>> partition except for partition 12 (and probably other new
>>>> partitions)
>>>>> to
>>>>>>>> the existing consumers, waiting for the committed offset to go
>>>> beyond
>>>>>>> this
>>>>>>>> offset for partition 3, and trigger rebalance again so that
>>>> partition
>>>>> 3
>>>>>>> can
>>>>>>>> be reassigned to some consumer.
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Dong
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Feb 6, 2018 at 10:10 AM, Jun Rao <j...@confluent.io> wrote:
>>>>>>>>
>>>>>>>>> Hi, Dong,
>>>>>>>>>
>>>>>>>>> Thanks for the KIP. It looks good overall. We are working on a
>>>>>>> separate
>>>>>>>> KIP
>>>>>>>>> for adding partitions while preserving the ordering guarantees.
>>>> That
>>>>>>> may
>>>>>>>>> require another flavor of partition epoch. It's not very clear
>>>>> whether
>>>>>>>> that
>>>>>>>>> partition epoch can be merged with the partition epoch in this
>>>> KIP.
>>>>>>> So,
>>>>>>>>> perhaps you can wait on this a bit until we post the other KIP in
>>>>> the
>>>>>>>> next
>>>>>>>>> few days.
>>>>>>>>>
>>>>>>>>> Jun
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, Feb 5, 2018 at 2:43 PM, Becket Qin <becket....@gmail.com>
>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> +1 on the KIP.
>>>>>>>>>>
>>>>>>>>>> I think the KIP is mainly about adding the capability of
>>>> tracking
>>>>>>> the
>>>>>>>>>> system state change lineage. It does not seem necessary to
>>>> bundle
>>>>>>> this
>>>>>>>>> KIP
>>>>>>>>>> with replacing the topic partition with partition epoch in
>>>>>>>> produce/fetch.
>>>>>>>>>> Replacing topic-partition string with partition epoch is
>>>>>>> essentially a
>>>>>>>>>> performance improvement on top of this KIP. That can probably be
>>>>>>> done
>>>>>>>>>> separately.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>>
>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>
>>>>>>>>>> On Mon, Jan 29, 2018 at 11:52 AM, Dong Lin <lindon...@gmail.com
>>>>>
>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hey Colin,
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Jan 29, 2018 at 11:23 AM, Colin McCabe <
>>>>>>> cmcc...@apache.org>
>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Jan 29, 2018 at 10:35 AM, Dong Lin <
>>>>>>> lindon...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hey Colin,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I understand that the KIP will adds overhead by
>>>>> introducing
>>>>>>>>>>>> per-partition
>>>>>>>>>>>>>> partitionEpoch. I am open to alternative solutions that
>>>>> does
>>>>>>>> not
>>>>>>>>>>> incur
>>>>>>>>>>>>>> additional overhead. But I don't see a better way now.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> IMO the overhead in the FetchResponse may not be that
>>>>> much.
>>>>>>> We
>>>>>>>>>>> probably
>>>>>>>>>>>>>> should discuss the percentage increase rather than the
>>>>>>> absolute
>>>>>>>>>>> number
>>>>>>>>>>>>>> increase. Currently after KIP-227, per-partition header
>>>>> has
>>>>>>> 23
>>>>>>>>>> bytes.
>>>>>>>>>>>> This
>>>>>>>>>>>>>> KIP adds another 4 bytes. Assume the records size is
>>>> 10KB,
>>>>>>> the
>>>>>>>>>>>> percentage
>>>>>>>>>>>>>> increase is 4 / (23 + 10000) = 0.03%. It seems
>>>> negligible,
>>>>>>>> right?
>>>>>>>>>>>>
>>>>>>>>>>>> Hi Dong,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for the response.  I agree that the FetchRequest /
>>>>>>>>> FetchResponse
>>>>>>>>>>>> overhead should be OK, now that we have incremental fetch
>>>>>>> requests
>>>>>>>>> and
>>>>>>>>>>>> responses.  However, there are a lot of cases where the
>>>>>>> percentage
>>>>>>>>>>> increase
>>>>>>>>>>>> is much greater.  For example, if a client is doing full
>>>>>>>>>>> MetadataRequests /
>>>>>>>>>>>> Responses, we have some math kind of like this per
>>>> partition:
>>>>>>>>>>>>
>>>>>>>>>>>>> UpdateMetadataRequestPartitionState => topic partition
>>>>>>>>>>> controller_epoch
>>>>>>>>>>>> leader  leader_epoch partition_epoch isr zk_version replicas
>>>>>>>>>>>> offline_replicas
>>>>>>>>>>>>> 14 bytes:  topic => string (assuming about 10 byte topic
>>>>>>> names)
>>>>>>>>>>>>> 4 bytes:  partition => int32
>>>>>>>>>>>>> 4  bytes: conroller_epoch => int32
>>>>>>>>>>>>> 4  bytes: leader => int32
>>>>>>>>>>>>> 4  bytes: leader_epoch => int32
>>>>>>>>>>>>> +4 EXTRA bytes: partition_epoch => int32        <-- NEW
>>>>>>>>>>>>> 2+4+4+4 bytes: isr => [int32] (assuming 3 in the ISR)
>>>>>>>>>>>>> 4 bytes: zk_version => int32
>>>>>>>>>>>>> 2+4+4+4 bytes: replicas => [int32] (assuming 3 replicas)
>>>>>>>>>>>>> 2  offline_replicas => [int32] (assuming no offline
>>>>> replicas)
>>>>>>>>>>>>
>>>>>>>>>>>> Assuming I added that up correctly, the per-partition
>>>> overhead
>>>>>>> goes
>>>>>>>>>> from
>>>>>>>>>>>> 64 bytes per partition to 68, a 6.2% increase.
>>>>>>>>>>>>
>>>>>>>>>>>> We could do similar math for a lot of the other RPCs.  And
>>>> you
>>>>>>> will
>>>>>>>>>> have
>>>>>>>>>>> a
>>>>>>>>>>>> similar memory and garbage collection impact on the brokers
>>>>>>> since
>>>>>>>> you
>>>>>>>>>>> have
>>>>>>>>>>>> to store all this extra state as well.
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> That is correct. IMO the Metadata is only updated periodically
>>>>>>> and is
>>>>>>>>>>> probably not a big deal if we increase it by 6%. The
>>>>> FetchResponse
>>>>>>>> and
>>>>>>>>>>> ProduceRequest are probably the only requests that are bounded
>>>>> by
>>>>>>> the
>>>>>>>>>>> bandwidth throughput.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I agree that we can probably save more space by using
>>>>>>> partition
>>>>>>>>> ID
>>>>>>>>>> so
>>>>>>>>>>>> that
>>>>>>>>>>>>>> we no longer needs the string topic name. The similar
>>>> idea
>>>>>>> has
>>>>>>>>> also
>>>>>>>>>>>> been
>>>>>>>>>>>>>> put in the Rejected Alternative section in KIP-227.
>>>> While
>>>>>>> this
>>>>>>>>> idea
>>>>>>>>>>> is
>>>>>>>>>>>>>> promising, it seems orthogonal to the goal of this KIP.
>>>>>>> Given
>>>>>>>>> that
>>>>>>>>>>>> there is
>>>>>>>>>>>>>> already many work to do in this KIP, maybe we can do the
>>>>>>>>> partition
>>>>>>>>>> ID
>>>>>>>>>>>> in a
>>>>>>>>>>>>>> separate KIP?
>>>>>>>>>>>>
>>>>>>>>>>>> I guess my thinking is that the goal here is to replace an
>>>>>>>> identifier
>>>>>>>>>>>> which can be re-used (the tuple of topic name, partition ID)
>>>>>>> with
>>>>>>>> an
>>>>>>>>>>>> identifier that cannot be re-used (the tuple of topic name,
>>>>>>>> partition
>>>>>>>>>> ID,
>>>>>>>>>>>> partition epoch) in order to gain better semantics.  As long
>>>>> as
>>>>>>> we
>>>>>>>>> are
>>>>>>>>>>>> replacing the identifier, why not replace it with an
>>>>> identifier
>>>>>>>> that
>>>>>>>>>> has
>>>>>>>>>>>> important performance advantages?  The KIP freeze for the
>>>> next
>>>>>>>>> release
>>>>>>>>>>> has
>>>>>>>>>>>> already passed, so there is time to do this.
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> In general it can be easier for discussion and implementation
>>>> if
>>>>>>> we
>>>>>>>> can
>>>>>>>>>>> split a larger task into smaller and independent tasks. For
>>>>>>> example,
>>>>>>>>>>> KIP-112 and KIP-113 both deals with the JBOD support. KIP-31,
>>>>>>> KIP-32
>>>>>>>>> and
>>>>>>>>>>> KIP-33 are about timestamp support. The option on this can be
>>>>>>> subject
>>>>>>>>>>> though.
>>>>>>>>>>>
>>>>>>>>>>> IMO the change to switch from (topic, partition ID) to
>>>>>>> partitionEpch
>>>>>>>> in
>>>>>>>>>> all
>>>>>>>>>>> request/response requires us to going through all request one
>>>> by
>>>>>>> one.
>>>>>>>>> It
>>>>>>>>>>> may not be hard but it can be time consuming and tedious. At
>>>>> high
>>>>>>>> level
>>>>>>>>>> the
>>>>>>>>>>> goal and the change for that will be orthogonal to the changes
>>>>>>>> required
>>>>>>>>>> in
>>>>>>>>>>> this KIP. That is the main reason I think we can split them
>>>> into
>>>>>>> two
>>>>>>>>>> KIPs.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Jan 29, 2018, at 10:54, Dong Lin wrote:
>>>>>>>>>>>>> I think it is possible to move to entirely use
>>>>> partitionEpoch
>>>>>>>>> instead
>>>>>>>>>>> of
>>>>>>>>>>>>> (topic, partition) to identify a partition. Client can
>>>>> obtain
>>>>>>> the
>>>>>>>>>>>>> partitionEpoch -> (topic, partition) mapping from
>>>>>>>> MetadataResponse.
>>>>>>>>>> We
>>>>>>>>>>>>> probably need to figure out a way to assign partitionEpoch
>>>>> to
>>>>>>>>>> existing
>>>>>>>>>>>>> partitions in the cluster. But this should be doable.
>>>>>>>>>>>>>
>>>>>>>>>>>>> This is a good idea. I think it will save us some space in
>>>>> the
>>>>>>>>>>>>> request/response. The actual space saving in percentage
>>>>>>> probably
>>>>>>>>>>> depends
>>>>>>>>>>>> on
>>>>>>>>>>>>> the amount of data and the number of partitions of the
>>>> same
>>>>>>>> topic.
>>>>>>>>> I
>>>>>>>>>>> just
>>>>>>>>>>>>> think we can do it in a separate KIP.
>>>>>>>>>>>>
>>>>>>>>>>>> Hmm.  How much extra work would be required?  It seems like
>>>> we
>>>>>>> are
>>>>>>>>>>> already
>>>>>>>>>>>> changing almost every RPC that involves topics and
>>>> partitions,
>>>>>>>>> already
>>>>>>>>>>>> adding new per-partition state to ZooKeeper, already
>>>> changing
>>>>>>> how
>>>>>>>>>> clients
>>>>>>>>>>>> interact with partitions.  Is there some other big piece of
>>>>> work
>>>>>>>> we'd
>>>>>>>>>>> have
>>>>>>>>>>>> to do to move to partition IDs that we wouldn't need for
>>>>>>> partition
>>>>>>>>>>> epochs?
>>>>>>>>>>>> I guess we'd have to find a way to support regular
>>>>>>> expression-based
>>>>>>>>>> topic
>>>>>>>>>>>> subscriptions.  If we split this into multiple KIPs,
>>>> wouldn't
>>>>> we
>>>>>>>> end
>>>>>>>>> up
>>>>>>>>>>>> changing all that RPCs and ZK state a second time?  Also,
>>>> I'm
>>>>>>>> curious
>>>>>>>>>> if
>>>>>>>>>>>> anyone has done any proof of concept GC, memory, and network
>>>>>>> usage
>>>>>>>>>>>> measurements on switching topic names for topic IDs.
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> We will need to go over all requests/responses to check how to
>>>>>>>> replace
>>>>>>>>>>> (topic, partition ID) with partition epoch. It requires
>>>>>>> non-trivial
>>>>>>>>> work
>>>>>>>>>>> and could take time. As you mentioned, we may want to see how
>>>>> much
>>>>>>>>> saving
>>>>>>>>>>> we can get by switching from topic names to partition epoch.
>>>>> That
>>>>>>>>> itself
>>>>>>>>>>> requires time and experiment. It seems that the new idea does
>>>>> not
>>>>>>>>>> rollback
>>>>>>>>>>> any change proposed in this KIP. So I am not sure we can get
>>>>> much
>>>>>>> by
>>>>>>>>>>> putting them into the same KIP.
>>>>>>>>>>>
>>>>>>>>>>> Anyway, if more people are interested in seeing the new idea
>>>> in
>>>>>>> the
>>>>>>>>> same
>>>>>>>>>>> KIP, I can try that.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> best,
>>>>>>>>>>>> Colin
>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, Jan 29, 2018 at 10:18 AM, Colin McCabe <
>>>>>>>>> cmcc...@apache.org
>>>>>>>>>>>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Fri, Jan 26, 2018, at 12:17, Dong Lin wrote:
>>>>>>>>>>>>>>>> Hey Colin,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Fri, Jan 26, 2018 at 10:16 AM, Colin McCabe <
>>>>>>>>>>> cmcc...@apache.org>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Thu, Jan 25, 2018, at 16:47, Dong Lin wrote:
>>>>>>>>>>>>>>>>>> Hey Colin,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks for the comment.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Thu, Jan 25, 2018 at 4:15 PM, Colin McCabe <
>>>>>>>>>>>> cmcc...@apache.org>
>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Wed, Jan 24, 2018, at 21:07, Dong Lin wrote:
>>>>>>>>>>>>>>>>>>>> Hey Colin,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks for reviewing the KIP.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> If I understand you right, you maybe
>>>> suggesting
>>>>>>> that
>>>>>>>>> we
>>>>>>>>>>> can
>>>>>>>>>>>> use
>>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>> global
>>>>>>>>>>>>>>>>>>>> metadataEpoch that is incremented every time
>>>>>>>>> controller
>>>>>>>>>>>> updates
>>>>>>>>>>>>>>>>> metadata.
>>>>>>>>>>>>>>>>>>>> The problem with this solution is that, if a
>>>>>>> topic
>>>>>>>> is
>>>>>>>>>>>> deleted
>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>> created
>>>>>>>>>>>>>>>>>>>> again, user will not know whether that the
>>>>> offset
>>>>>>>>> which
>>>>>>>>>> is
>>>>>>>>>>>>>>> stored
>>>>>>>>>>>>>>>>> before
>>>>>>>>>>>>>>>>>>>> the topic deletion is no longer valid. This
>>>>>>>> motivates
>>>>>>>>>> the
>>>>>>>>>>>> idea
>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>> include
>>>>>>>>>>>>>>>>>>>> per-partition partitionEpoch. Does this sound
>>>>>>>>>> reasonable?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hi Dong,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Perhaps we can store the last valid offset of
>>>>> each
>>>>>>>>> deleted
>>>>>>>>>>>> topic
>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>> ZooKeeper.  Then, when a topic with one of
>>>> those
>>>>>>> names
>>>>>>>>>> gets
>>>>>>>>>>>>>>>>> re-created, we
>>>>>>>>>>>>>>>>>>> can start the topic at the previous end offset
>>>>>>> rather
>>>>>>>>> than
>>>>>>>>>>> at
>>>>>>>>>>>> 0.
>>>>>>>>>>>>>>> This
>>>>>>>>>>>>>>>>>>> preserves immutability.  It is no more
>>>> burdensome
>>>>>>> than
>>>>>>>>>>> having
>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>> preserve a
>>>>>>>>>>>>>>>>>>> "last epoch" for the deleted partition
>>>> somewhere,
>>>>>>>> right?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> My concern with this solution is that the number
>>>> of
>>>>>>>>>> zookeeper
>>>>>>>>>>>> nodes
>>>>>>>>>>>>>>> get
>>>>>>>>>>>>>>>>>> more and more over time if some users keep
>>>> deleting
>>>>>>> and
>>>>>>>>>>> creating
>>>>>>>>>>>>>>> topics.
>>>>>>>>>>>>>>>>> Do
>>>>>>>>>>>>>>>>>> you think this can be a problem?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi Dong,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> We could expire the "partition tombstones" after an
>>>>>>> hour
>>>>>>>> or
>>>>>>>>>> so.
>>>>>>>>>>>> In
>>>>>>>>>>>>>>>>> practice this would solve the issue for clients
>>>> that
>>>>>>> like
>>>>>>>> to
>>>>>>>>>>>> destroy
>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>> re-create topics all the time.  In any case,
>>>> doesn't
>>>>>>> the
>>>>>>>>>> current
>>>>>>>>>>>>>>> proposal
>>>>>>>>>>>>>>>>> add per-partition znodes as well that we have to
>>>>> track
>>>>>>>> even
>>>>>>>>>>> after
>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> partition is deleted?  Or did I misunderstand that?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Actually the current KIP does not add per-partition
>>>>>>> znodes.
>>>>>>>>>> Could
>>>>>>>>>>>> you
>>>>>>>>>>>>>>>> double check? I can fix the KIP wiki if there is
>>>>> anything
>>>>>>>>>>>> misleading.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Dong,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I double-checked the KIP, and I can see that you are in
>>>>>>> fact
>>>>>>>>>> using a
>>>>>>>>>>>>>>> global counter for initializing partition epochs.  So,
>>>>> you
>>>>>>> are
>>>>>>>>>>>> correct, it
>>>>>>>>>>>>>>> doesn't add per-partition znodes for partitions that no
>>>>>>> longer
>>>>>>>>>>> exist.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> If we expire the "partition tomstones" after an hour,
>>>>> and
>>>>>>>> the
>>>>>>>>>>> topic
>>>>>>>>>>>> is
>>>>>>>>>>>>>>>> re-created after more than an hour since the topic
>>>>>>> deletion,
>>>>>>>>>> then
>>>>>>>>>>>> we are
>>>>>>>>>>>>>>>> back to the situation where user can not tell whether
>>>>> the
>>>>>>>>> topic
>>>>>>>>>>> has
>>>>>>>>>>>> been
>>>>>>>>>>>>>>>> re-created or not, right?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Yes, with an expiration period, it would not ensure
>>>>>>>>> immutability--
>>>>>>>>>>> you
>>>>>>>>>>>>>>> could effectively reuse partition names and they would
>>>>> look
>>>>>>>> the
>>>>>>>>>>> same.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> It's not really clear to me what should happen
>>>> when a
>>>>>>>> topic
>>>>>>>>> is
>>>>>>>>>>>>>>> destroyed
>>>>>>>>>>>>>>>>> and re-created with new data.  Should consumers
>>>>>>> continue
>>>>>>>> to
>>>>>>>>> be
>>>>>>>>>>>> able to
>>>>>>>>>>>>>>>>> consume?  We don't know where they stopped
>>>> consuming
>>>>>>> from
>>>>>>>>> the
>>>>>>>>>>>> previous
>>>>>>>>>>>>>>>>> incarnation of the topic, so messages may have been
>>>>>>> lost.
>>>>>>>>>>>> Certainly
>>>>>>>>>>>>>>>>> consuming data from offset X of the new incarnation
>>>>> of
>>>>>>> the
>>>>>>>>>> topic
>>>>>>>>>>>> may
>>>>>>>>>>>>>>> give
>>>>>>>>>>>>>>>>> something totally different from what you would
>>>> have
>>>>>>>> gotten
>>>>>>>>>> from
>>>>>>>>>>>>>>> offset X
>>>>>>>>>>>>>>>>> of the previous incarnation of the topic.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> With the current KIP, if a consumer consumes a topic
>>>>>>> based
>>>>>>>> on
>>>>>>>>>> the
>>>>>>>>>>>> last
>>>>>>>>>>>>>>>> remembered (offset, partitionEpoch, leaderEpoch), and
>>>>> if
>>>>>>> the
>>>>>>>>>> topic
>>>>>>>>>>>> is
>>>>>>>>>>>>>>>> re-created, consume will throw
>>>>>>>> InvalidPartitionEpochException
>>>>>>>>>>>> because
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>> previous partitionEpoch will be different from the
>>>>>>> current
>>>>>>>>>>>>>>> partitionEpoch.
>>>>>>>>>>>>>>>> This is described in the Proposed Changes ->
>>>>> Consumption
>>>>>>>> after
>>>>>>>>>>> topic
>>>>>>>>>>>>>>>> deletion in the KIP. I can improve the KIP if there
>>>> is
>>>>>>>>> anything
>>>>>>>>>>> not
>>>>>>>>>>>>>>> clear.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks for the clarification.  It sounds like what you
>>>>>>> really
>>>>>>>>> want
>>>>>>>>>>> is
>>>>>>>>>>>>>>> immutability-- i.e., to never "really" reuse partition
>>>>>>>>>> identifiers.
>>>>>>>>>>>> And
>>>>>>>>>>>>>>> you do this by making the partition name no longer the
>>>>>>> "real"
>>>>>>>>>>>> identifier.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> My big concern about this KIP is that it seems like an
>>>>>>>>>>>> anti-scalability
>>>>>>>>>>>>>>> feature.  Now we are adding 4 extra bytes for every
>>>>>>> partition
>>>>>>>> in
>>>>>>>>>> the
>>>>>>>>>>>>>>> FetchResponse and Request, for example.  That could be
>>>> 40
>>>>>>> kb
>>>>>>>> per
>>>>>>>>>>>> request,
>>>>>>>>>>>>>>> if the user has 10,000 partitions.  And of course, the
>>>>> KIP
>>>>>>>> also
>>>>>>>>>>> makes
>>>>>>>>>>>>>>> massive changes to UpdateMetadataRequest,
>>>>> MetadataResponse,
>>>>>>>>>>>>>>> OffsetCommitRequest, OffsetFetchResponse,
>>>>>>> LeaderAndIsrRequest,
>>>>>>>>>>>>>>> ListOffsetResponse, etc. which will also increase their
>>>>>>> size
>>>>>>>> on
>>>>>>>>>> the
>>>>>>>>>>>> wire
>>>>>>>>>>>>>>> and in memory.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> One thing that we talked a lot about in the past is
>>>>>>> replacing
>>>>>>>>>>>> partition
>>>>>>>>>>>>>>> names with IDs.  IDs have a lot of really nice
>>>> features.
>>>>>>> They
>>>>>>>>>> take
>>>>>>>>>>>> up much
>>>>>>>>>>>>>>> less space in memory than strings (especially 2-byte
>>>> Java
>>>>>>>>>> strings).
>>>>>>>>>>>> They
>>>>>>>>>>>>>>> can often be allocated on the stack rather than the
>>>> heap
>>>>>>>>>> (important
>>>>>>>>>>>> when
>>>>>>>>>>>>>>> you are dealing with hundreds of thousands of them).
>>>>> They
>>>>>>> can
>>>>>>>>> be
>>>>>>>>>>>>>>> efficiently deserialized and serialized.  If we use
>>>>> 64-bit
>>>>>>>> ones,
>>>>>>>>>> we
>>>>>>>>>>>> will
>>>>>>>>>>>>>>> never run out of IDs, which means that they can always
>>>> be
>>>>>>>> unique
>>>>>>>>>> per
>>>>>>>>>>>>>>> partition.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Given that the partition name is no longer the "real"
>>>>>>>> identifier
>>>>>>>>>> for
>>>>>>>>>>>>>>> partitions in the current KIP-232 proposal, why not
>>>> just
>>>>>>> move
>>>>>>>> to
>>>>>>>>>>> using
>>>>>>>>>>>>>>> partition IDs entirely instead of strings?  You have to
>>>>>>> change
>>>>>>>>> all
>>>>>>>>>>> the
>>>>>>>>>>>>>>> messages anyway.  There isn't much point any more to
>>>>>>> carrying
>>>>>>>>>> around
>>>>>>>>>>>> the
>>>>>>>>>>>>>>> partition name in every RPC, since you really need
>>>> (name,
>>>>>>>> epoch)
>>>>>>>>>> to
>>>>>>>>>>>>>>> identify the partition.
>>>>>>>>>>>>>>> Probably the metadata response and a few other messages
>>>>>>> would
>>>>>>>>> have
>>>>>>>>>>> to
>>>>>>>>>>>>>>> still carry the partition name, to allow clients to go
>>>>> from
>>>>>>>> name
>>>>>>>>>> to
>>>>>>>>>>>> id.
>>>>>>>>>>>>>>> But we could mostly forget about the strings.  And then
>>>>>>> this
>>>>>>>>> would
>>>>>>>>>>> be
>>>>>>>>>>>> a
>>>>>>>>>>>>>>> scalability improvement rather than a scalability
>>>>> problem.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> By choosing to reuse the same (topic, partition,
>>>>>>> offset)
>>>>>>>>>>> 3-tuple,
>>>>>>>>>>>> we
>>>>>>>>>>>>>>> have
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> chosen to give up immutability.  That was a really
>>>> bad
>>>>>>>>> decision.
>>>>>>>>>>>> And
>>>>>>>>>>>>>>> now
>>>>>>>>>>>>>>>>> we have to worry about time dependencies, stale
>>>>> cached
>>>>>>>> data,
>>>>>>>>>> and
>>>>>>>>>>>> all
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> rest.  We can't completely fix this inside Kafka no
>>>>>>> matter
>>>>>>>>>> what
>>>>>>>>>>>> we do,
>>>>>>>>>>>>>>>>> because not all that cached data is inside Kafka
>>>>>>> itself.
>>>>>>>>> Some
>>>>>>>>>>> of
>>>>>>>>>>>> it
>>>>>>>>>>>>>>> may be
>>>>>>>>>>>>>>>>> in systems that Kafka has sent data to, such as
>>>> other
>>>>>>>>> daemons,
>>>>>>>>>>> SQL
>>>>>>>>>>>>>>>>> databases, streams, and so forth.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The current KIP will uniquely identify a message
>>>> using
>>>>>>>> (topic,
>>>>>>>>>>>>>>> partition,
>>>>>>>>>>>>>>>> offset, partitionEpoch) 4-tuple. This addresses the
>>>>>>> message
>>>>>>>>>>>> immutability
>>>>>>>>>>>>>>>> issue that you mentioned. Is there any corner case
>>>>> where
>>>>>>> the
>>>>>>>>>>> message
>>>>>>>>>>>>>>>> immutability is still not preserved with the current
>>>>> KIP?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I guess the idea here is that mirror maker should
>>>>> work
>>>>>>> as
>>>>>>>>>>> expected
>>>>>>>>>>>>>>> when
>>>>>>>>>>>>>>>>> users destroy a topic and re-create it with the
>>>> same
>>>>>>> name.
>>>>>>>>>>> That's
>>>>>>>>>>>>>>> kind of
>>>>>>>>>>>>>>>>> tough, though, since in that scenario, mirror maker
>>>>>>>> probably
>>>>>>>>>>>> should
>>>>>>>>>>>>>>> destroy
>>>>>>>>>>>>>>>>> and re-create the topic on the other end, too,
>>>> right?
>>>>>>>>>>> Otherwise,
>>>>>>>>>>>>>>> what you
>>>>>>>>>>>>>>>>> end up with on the other end could be half of one
>>>>>>>>> incarnation
>>>>>>>>>> of
>>>>>>>>>>>> the
>>>>>>>>>>>>>>> topic,
>>>>>>>>>>>>>>>>> and half of another.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> What mirror maker really needs is to be able to
>>>>> follow
>>>>>>> a
>>>>>>>>>> stream
>>>>>>>>>>> of
>>>>>>>>>>>>>>> events
>>>>>>>>>>>>>>>>> about the kafka cluster itself.  We could have some
>>>>>>> master
>>>>>>>>>> topic
>>>>>>>>>>>>>>> which is
>>>>>>>>>>>>>>>>> always present and which contains data about all
>>>>> topic
>>>>>>>>>>> deletions,
>>>>>>>>>>>>>>>>> creations, etc.  Then MM can simply follow this
>>>> topic
>>>>>>> and
>>>>>>>> do
>>>>>>>>>>> what
>>>>>>>>>>>> is
>>>>>>>>>>>>>>> needed.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Then the next question maybe, should we use a
>>>>>>> global
>>>>>>>>>>>>>>> metadataEpoch +
>>>>>>>>>>>>>>>>>>>> per-partition partitionEpoch, instead of
>>>> using
>>>>>>>>>>> per-partition
>>>>>>>>>>>>>>>>> leaderEpoch
>>>>>>>>>>>>>>>>>>> +
>>>>>>>>>>>>>>>>>>>> per-partition leaderEpoch. The former
>>>> solution
>>>>>>> using
>>>>>>>>>>>>>>> metadataEpoch
>>>>>>>>>>>>>>>>> would
>>>>>>>>>>>>>>>>>>>> not work due to the following scenario
>>>>> (provided
>>>>>>> by
>>>>>>>>>> Jun):
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> "Consider the following scenario. In metadata
>>>>> v1,
>>>>>>>> the
>>>>>>>>>>> leader
>>>>>>>>>>>>>>> for a
>>>>>>>>>>>>>>>>>>>> partition is at broker 1. In metadata v2,
>>>>> leader
>>>>>>> is
>>>>>>>> at
>>>>>>>>>>>> broker
>>>>>>>>>>>>>>> 2. In
>>>>>>>>>>>>>>>>>>>> metadata v3, leader is at broker 1 again. The
>>>>>>> last
>>>>>>>>>>> committed
>>>>>>>>>>>>>>> offset
>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>> v1,
>>>>>>>>>>>>>>>>>>>> v2 and v3 are 10, 20 and 30, respectively. A
>>>>>>>> consumer
>>>>>>>>> is
>>>>>>>>>>>>>>> started and
>>>>>>>>>>>>>>>>>>> reads
>>>>>>>>>>>>>>>>>>>> metadata v1 and reads messages from offset 0
>>>> to
>>>>>>> 25
>>>>>>>>> from
>>>>>>>>>>>> broker
>>>>>>>>>>>>>>> 1. My
>>>>>>>>>>>>>>>>>>>> understanding is that in the current
>>>> proposal,
>>>>>>> the
>>>>>>>>>>> metadata
>>>>>>>>>>>>>>> version
>>>>>>>>>>>>>>>>>>>> associated with offset 25 is v1. The consumer
>>>>> is
>>>>>>>> then
>>>>>>>>>>>> restarted
>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>> fetches
>>>>>>>>>>>>>>>>>>>> metadata v2. The consumer tries to read from
>>>>>>> broker
>>>>>>>> 2,
>>>>>>>>>>>> which is
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> old
>>>>>>>>>>>>>>>>>>>> leader with the last offset at 20. In this
>>>>> case,
>>>>>>> the
>>>>>>>>>>>> consumer
>>>>>>>>>>>>>>> will
>>>>>>>>>>>>>>>>> still
>>>>>>>>>>>>>>>>>>>> get OffsetOutOfRangeException incorrectly."
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Regarding your comment "For the second
>>>> purpose,
>>>>>>> this
>>>>>>>>> is
>>>>>>>>>>>> "soft
>>>>>>>>>>>>>>> state"
>>>>>>>>>>>>>>>>>>>> anyway.  If the client thinks X is the leader
>>>>>>> but Y
>>>>>>>> is
>>>>>>>>>>>> really
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> leader,
>>>>>>>>>>>>>>>>>>>> the client will talk to X, and X will point
>>>> out
>>>>>>> its
>>>>>>>>>>> mistake
>>>>>>>>>>>> by
>>>>>>>>>>>>>>>>> sending
>>>>>>>>>>>>>>>>>>> back
>>>>>>>>>>>>>>>>>>>> a NOT_LEADER_FOR_PARTITION.", it is probably
>>>> no
>>>>>>>> true.
>>>>>>>>>> The
>>>>>>>>>>>>>>> problem
>>>>>>>>>>>>>>>>> here is
>>>>>>>>>>>>>>>>>>>> that the old leader X may still think it is
>>>> the
>>>>>>>> leader
>>>>>>>>>> of
>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> partition
>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>> thus it will not send back
>>>>>>> NOT_LEADER_FOR_PARTITION.
>>>>>>>>> The
>>>>>>>>>>>> reason
>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>> provided
>>>>>>>>>>>>>>>>>>>> in KAFKA-6262. Can you check if that makes
>>>>> sense?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> This is solvable with a timeout, right?  If the
>>>>>>> leader
>>>>>>>>>> can't
>>>>>>>>>>>>>>>>> communicate
>>>>>>>>>>>>>>>>>>> with the controller for a certain period of
>>>> time,
>>>>>>> it
>>>>>>>>>> should
>>>>>>>>>>>> stop
>>>>>>>>>>>>>>>>> acting as
>>>>>>>>>>>>>>>>>>> the leader.  We have to solve this problem,
>>>>>>> anyway, in
>>>>>>>>>> order
>>>>>>>>>>>> to
>>>>>>>>>>>>>>> fix
>>>>>>>>>>>>>>>>> all the
>>>>>>>>>>>>>>>>>>> corner cases.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Not sure if I fully understand your proposal. The
>>>>>>>> proposal
>>>>>>>>>>>> seems to
>>>>>>>>>>>>>>>>> require
>>>>>>>>>>>>>>>>>> non-trivial changes to our existing leadership
>>>>>>> election
>>>>>>>>>>>> mechanism.
>>>>>>>>>>>>>>> Could
>>>>>>>>>>>>>>>>>> you provide more detail regarding how it works?
>>>> For
>>>>>>>>> example,
>>>>>>>>>>> how
>>>>>>>>>>>>>>> should
>>>>>>>>>>>>>>>>>> user choose this timeout, how leader determines
>>>>>>> whether
>>>>>>>> it
>>>>>>>>>> can
>>>>>>>>>>>> still
>>>>>>>>>>>>>>>>>> communicate with controller, and how this
>>>> triggers
>>>>>>>>>> controller
>>>>>>>>>>> to
>>>>>>>>>>>>>>> elect
>>>>>>>>>>>>>>>>> new
>>>>>>>>>>>>>>>>>> leader?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Before I come up with any proposal, let me make
>>>> sure
>>>>> I
>>>>>>>>>>> understand
>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> problem correctly.  My big question was, what
>>>>> prevents
>>>>>>>>>>> split-brain
>>>>>>>>>>>>>>> here?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Let's say I have a partition which is on nodes A,
>>>> B,
>>>>>>> and
>>>>>>>> C,
>>>>>>>>>> with
>>>>>>>>>>>>>>> min-ISR
>>>>>>>>>>>>>>>>> 2.  The controller is D.  At some point, there is a
>>>>>>>> network
>>>>>>>>>>>> partition
>>>>>>>>>>>>>>>>> between A and B and the rest of the cluster.  The
>>>>>>>> Controller
>>>>>>>>>>>>>>> re-assigns the
>>>>>>>>>>>>>>>>> partition to nodes C, D, and E.  But A and B keep
>>>>>>> chugging
>>>>>>>>>> away,
>>>>>>>>>>>> even
>>>>>>>>>>>>>>>>> though they can no longer communicate with the
>>>>>>> controller.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> At some point, a client with stale metadata writes
>>>> to
>>>>>>> the
>>>>>>>>>>>> partition.
>>>>>>>>>>>>>>> It
>>>>>>>>>>>>>>>>> still thinks the partition is on node A, B, and C,
>>>> so
>>>>>>>> that's
>>>>>>>>>>>> where it
>>>>>>>>>>>>>>> sends
>>>>>>>>>>>>>>>>> the data.  It's unable to talk to C, but A and B
>>>>> reply
>>>>>>>> back
>>>>>>>>>> that
>>>>>>>>>>>> all
>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>> well.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Is this not a case where we could lose data due to
>>>>>>> split
>>>>>>>>>> brain?
>>>>>>>>>>>> Or is
>>>>>>>>>>>>>>>>> there a mechanism for preventing this that I
>>>> missed?
>>>>>>> If
>>>>>>>> it
>>>>>>>>>> is,
>>>>>>>>>>> it
>>>>>>>>>>>>>>> seems
>>>>>>>>>>>>>>>>> like a pretty serious failure case that we should
>>>> be
>>>>>>>>> handling
>>>>>>>>>>>> with our
>>>>>>>>>>>>>>>>> metadata rework.  And I think epoch numbers and
>>>>>>> timeouts
>>>>>>>>> might
>>>>>>>>>>> be
>>>>>>>>>>>>>>> part of
>>>>>>>>>>>>>>>>> the solution.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Right, split brain can happen if RF=4 and minIsr=2.
>>>>>>>> However, I
>>>>>>>>>> am
>>>>>>>>>>>> not
>>>>>>>>>>>>>>> sure
>>>>>>>>>>>>>>>> it is a pretty serious issue which we need to address
>>>>>>> today.
>>>>>>>>>> This
>>>>>>>>>>>> can be
>>>>>>>>>>>>>>>> prevented by configuring the Kafka topic so that
>>>>> minIsr >
>>>>>>>>> RF/2.
>>>>>>>>>>>>>>> Actually,
>>>>>>>>>>>>>>>> if user sets minIsr=2, is there anything reason that
>>>>> user
>>>>>>>>> wants
>>>>>>>>>> to
>>>>>>>>>>>> set
>>>>>>>>>>>>>>> RF=4
>>>>>>>>>>>>>>>> instead of 4?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Introducing timeout in leader election mechanism is
>>>>>>>>>> non-trivial. I
>>>>>>>>>>>>>>> think we
>>>>>>>>>>>>>>>> probably want to do that only if there is good
>>>> use-case
>>>>>>> that
>>>>>>>>> can
>>>>>>>>>>> not
>>>>>>>>>>>>>>>> otherwise be addressed with the current mechanism.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I still would like to think about these corner cases
>>>>> more.
>>>>>>>> But
>>>>>>>>>>>> perhaps
>>>>>>>>>>>>>>> it's not directly related to this KIP.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> regards,
>>>>>>>>>>>>>>> Colin
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> best,
>>>>>>>>>>>>>>>>> Colin
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> best,
>>>>>>>>>>>>>>>>>>> Colin
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>> Dong
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Wed, Jan 24, 2018 at 10:39 AM, Colin
>>>> McCabe
>>>>> <
>>>>>>>>>>>>>>> cmcc...@apache.org>
>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Hi Dong,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thanks for proposing this KIP.  I think a
>>>>>>> metadata
>>>>>>>>>> epoch
>>>>>>>>>>>> is a
>>>>>>>>>>>>>>>>> really
>>>>>>>>>>>>>>>>>>> good
>>>>>>>>>>>>>>>>>>>>> idea.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I read through the DISCUSS thread, but I
>>>>> still
>>>>>>>> don't
>>>>>>>>>>> have
>>>>>>>>>>>> a
>>>>>>>>>>>>>>> clear
>>>>>>>>>>>>>>>>>>> picture
>>>>>>>>>>>>>>>>>>>>> of why the proposal uses a metadata epoch
>>>> per
>>>>>>>>>> partition
>>>>>>>>>>>> rather
>>>>>>>>>>>>>>>>> than a
>>>>>>>>>>>>>>>>>>>>> global metadata epoch.  A metadata epoch
>>>> per
>>>>>>>>> partition
>>>>>>>>>>> is
>>>>>>>>>>>>>>> kind of
>>>>>>>>>>>>>>>>>>>>> unpleasant-- it's at least 4 extra bytes
>>>> per
>>>>>>>>> partition
>>>>>>>>>>>> that we
>>>>>>>>>>>>>>>>> have to
>>>>>>>>>>>>>>>>>>> send
>>>>>>>>>>>>>>>>>>>>> over the wire in every full metadata
>>>> request,
>>>>>>>> which
>>>>>>>>>>> could
>>>>>>>>>>>>>>> become
>>>>>>>>>>>>>>>>> extra
>>>>>>>>>>>>>>>>>>>>> kilobytes on the wire when the number of
>>>>>>>> partitions
>>>>>>>>>>>> becomes
>>>>>>>>>>>>>>> large.
>>>>>>>>>>>>>>>>>>> Plus,
>>>>>>>>>>>>>>>>>>>>> we have to update all the auxillary classes
>>>>> to
>>>>>>>>> include
>>>>>>>>>>> an
>>>>>>>>>>>>>>> epoch.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> We need to have a global metadata epoch
>>>>> anyway
>>>>>>> to
>>>>>>>>>> handle
>>>>>>>>>>>>>>> partition
>>>>>>>>>>>>>>>>>>>>> addition and deletion.  For example, if I
>>>>> give
>>>>>>> you
>>>>>>>>>>>>>>>>>>>>> MetadataResponse{part1,epoch 1, part2,
>>>> epoch
>>>>> 1}
>>>>>>>> and
>>>>>>>>>>>> {part1,
>>>>>>>>>>>>>>>>> epoch1},
>>>>>>>>>>>>>>>>>>> which
>>>>>>>>>>>>>>>>>>>>> MetadataResponse is newer?  You have no way
>>>>> of
>>>>>>>>>> knowing.
>>>>>>>>>>>> It
>>>>>>>>>>>>>>> could
>>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>>> part2 has just been created, and the
>>>> response
>>>>>>>> with 2
>>>>>>>>>>>>>>> partitions is
>>>>>>>>>>>>>>>>>>> newer.
>>>>>>>>>>>>>>>>>>>>> Or it coudl be that part2 has just been
>>>>>>> deleted,
>>>>>>>> and
>>>>>>>>>>>>>>> therefore the
>>>>>>>>>>>>>>>>>>> response
>>>>>>>>>>>>>>>>>>>>> with 1 partition is newer.  You must have a
>>>>>>> global
>>>>>>>>>> epoch
>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>> disambiguate
>>>>>>>>>>>>>>>>>>>>> these two cases.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Previously, I worked on the Ceph
>>>> distributed
>>>>>>>>>> filesystem.
>>>>>>>>>>>>>>> Ceph had
>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>> concept of a map of the whole cluster,
>>>>>>> maintained
>>>>>>>>> by a
>>>>>>>>>>> few
>>>>>>>>>>>>>>> servers
>>>>>>>>>>>>>>>>>>> doing
>>>>>>>>>>>>>>>>>>>>> paxos.  This map was versioned by a single
>>>>>>> 64-bit
>>>>>>>>>> epoch
>>>>>>>>>>>> number
>>>>>>>>>>>>>>>>> which
>>>>>>>>>>>>>>>>>>>>> increased on every change.  It was
>>>> propagated
>>>>>>> to
>>>>>>>>>> clients
>>>>>>>>>>>>>>> through
>>>>>>>>>>>>>>>>>>> gossip.  I
>>>>>>>>>>>>>>>>>>>>> wonder if something similar could work
>>>> here?
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> It seems like the the Kafka
>>>> MetadataResponse
>>>>>>>> serves
>>>>>>>>>> two
>>>>>>>>>>>>>>> somewhat
>>>>>>>>>>>>>>>>>>> unrelated
>>>>>>>>>>>>>>>>>>>>> purposes.  Firstly, it lets clients know
>>>> what
>>>>>>>>>> partitions
>>>>>>>>>>>>>>> exist in
>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>> system and where they live.  Secondly, it
>>>>> lets
>>>>>>>>> clients
>>>>>>>>>>>> know
>>>>>>>>>>>>>>> which
>>>>>>>>>>>>>>>>> nodes
>>>>>>>>>>>>>>>>>>>>> within the partition are in-sync (in the
>>>> ISR)
>>>>>>> and
>>>>>>>>>> which
>>>>>>>>>>>> node
>>>>>>>>>>>>>>> is the
>>>>>>>>>>>>>>>>>>> leader.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> The first purpose is what you really need a
>>>>>>>> metadata
>>>>>>>>>>> epoch
>>>>>>>>>>>>>>> for, I
>>>>>>>>>>>>>>>>>>> think.
>>>>>>>>>>>>>>>>>>>>> You want to know whether a partition exists
>>>>> or
>>>>>>>> not,
>>>>>>>>> or
>>>>>>>>>>> you
>>>>>>>>>>>>>>> want to
>>>>>>>>>>>>>>>>> know
>>>>>>>>>>>>>>>>>>>>> which nodes you should talk to in order to
>>>>>>> write
>>>>>>>> to
>>>>>>>>> a
>>>>>>>>>>>> given
>>>>>>>>>>>>>>>>>>> partition.  A
>>>>>>>>>>>>>>>>>>>>> single metadata epoch for the whole
>>>> response
>>>>>>>> should
>>>>>>>>> be
>>>>>>>>>>>>>>> adequate
>>>>>>>>>>>>>>>>> here.
>>>>>>>>>>>>>>>>>>> We
>>>>>>>>>>>>>>>>>>>>> should not change the partition assignment
>>>>>>> without
>>>>>>>>>> going
>>>>>>>>>>>>>>> through
>>>>>>>>>>>>>>>>>>> zookeeper
>>>>>>>>>>>>>>>>>>>>> (or a similar system), and this inherently
>>>>>>>>> serializes
>>>>>>>>>>>> updates
>>>>>>>>>>>>>>> into
>>>>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>>>>> numbered stream.  Brokers should also stop
>>>>>>>>> responding
>>>>>>>>>> to
>>>>>>>>>>>>>>> requests
>>>>>>>>>>>>>>>>> when
>>>>>>>>>>>>>>>>>>> they
>>>>>>>>>>>>>>>>>>>>> are unable to contact ZK for a certain time
>>>>>>>> period.
>>>>>>>>>>> This
>>>>>>>>>>>>>>> prevents
>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>> case
>>>>>>>>>>>>>>>>>>>>> where a given partition has been moved off
>>>>> some
>>>>>>>> set
>>>>>>>>> of
>>>>>>>>>>>> nodes,
>>>>>>>>>>>>>>> but a
>>>>>>>>>>>>>>>>>>> client
>>>>>>>>>>>>>>>>>>>>> still ends up talking to those nodes and
>>>>>>> writing
>>>>>>>>> data
>>>>>>>>>>>> there.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> For the second purpose, this is "soft
>>>> state"
>>>>>>>> anyway.
>>>>>>>>>> If
>>>>>>>>>>>> the
>>>>>>>>>>>>>>> client
>>>>>>>>>>>>>>>>>>> thinks
>>>>>>>>>>>>>>>>>>>>> X is the leader but Y is really the leader,
>>>>> the
>>>>>>>>> client
>>>>>>>>>>>> will
>>>>>>>>>>>>>>> talk
>>>>>>>>>>>>>>>>> to X,
>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>> X will point out its mistake by sending
>>>> back
>>>>> a
>>>>>>>>>>>>>>>>>>> NOT_LEADER_FOR_PARTITION.
>>>>>>>>>>>>>>>>>>>>> Then the client can update its metadata
>>>> again
>>>>>>> and
>>>>>>>>> find
>>>>>>>>>>>> the new
>>>>>>>>>>>>>>>>> leader,
>>>>>>>>>>>>>>>>>>> if
>>>>>>>>>>>>>>>>>>>>> there is one.  There is no need for an
>>>> epoch
>>>>> to
>>>>>>>>> handle
>>>>>>>>>>>> this.
>>>>>>>>>>>>>>>>>>> Similarly, I
>>>>>>>>>>>>>>>>>>>>> can't think of a reason why changing the
>>>>>>> in-sync
>>>>>>>>>> replica
>>>>>>>>>>>> set
>>>>>>>>>>>>>>> needs
>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>> bump
>>>>>>>>>>>>>>>>>>>>> the epoch.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> best,
>>>>>>>>>>>>>>>>>>>>> Colin
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Wed, Jan 24, 2018, at 09:45, Dong Lin
>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>> Thanks much for reviewing the KIP!
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Dong
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On Wed, Jan 24, 2018 at 7:10 AM, Guozhang
>>>>>>> Wang <
>>>>>>>>>>>>>>>>> wangg...@gmail.com>
>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Yeah that makes sense, again I'm just
>>>>>>> making
>>>>>>>>> sure
>>>>>>>>>> we
>>>>>>>>>>>>>>> understand
>>>>>>>>>>>>>>>>>>> all the
>>>>>>>>>>>>>>>>>>>>>>> scenarios and what to expect.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> I agree that if, more generally
>>>> speaking,
>>>>>>> say
>>>>>>>>>> users
>>>>>>>>>>>> have
>>>>>>>>>>>>>>> only
>>>>>>>>>>>>>>>>>>> consumed
>>>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>>> offset 8, and then call seek(16) to
>>>>> "jump"
>>>>>>> to
>>>>>>>> a
>>>>>>>>>>>> further
>>>>>>>>>>>>>>>>> position,
>>>>>>>>>>>>>>>>>>> then
>>>>>>>>>>>>>>>>>>>>> she
>>>>>>>>>>>>>>>>>>>>>>> needs to be aware that OORE maybe
>>>> thrown
>>>>>>> and
>>>>>>>> she
>>>>>>>>>>>> needs to
>>>>>>>>>>>>>>>>> handle
>>>>>>>>>>>>>>>>>>> it or
>>>>>>>>>>>>>>>>>>>>> rely
>>>>>>>>>>>>>>>>>>>>>>> on reset policy which should not
>>>> surprise
>>>>>>> her.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> I'm +1 on the KIP.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> On Wed, Jan 24, 2018 at 12:31 AM, Dong
>>>>> Lin
>>>>>>> <
>>>>>>>>>>>>>>>>> lindon...@gmail.com>
>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Yes, in general we can not prevent
>>>>>>>>>>>>>>> OffsetOutOfRangeException
>>>>>>>>>>>>>>>>> if
>>>>>>>>>>>>>>>>>>> user
>>>>>>>>>>>>>>>>>>>>>>> seeks
>>>>>>>>>>>>>>>>>>>>>>>> to a wrong offset. The main goal is
>>>> to
>>>>>>>> prevent
>>>>>>>>>>>>>>>>>>>>> OffsetOutOfRangeException
>>>>>>>>>>>>>>>>>>>>>>> if
>>>>>>>>>>>>>>>>>>>>>>>> user has done things in the right
>>>> way,
>>>>>>> e.g.
>>>>>>>>> user
>>>>>>>>>>>> should
>>>>>>>>>>>>>>> know
>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>>> there
>>>>>>>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>>>>> message with this offset.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> For example, if user calls seek(..)
>>>>> right
>>>>>>>>> after
>>>>>>>>>>>>>>>>> construction, the
>>>>>>>>>>>>>>>>>>>>> only
>>>>>>>>>>>>>>>>>>>>>>>> reason I can think of is that user
>>>>> stores
>>>>>>>>> offset
>>>>>>>>>>>>>>> externally.
>>>>>>>>>>>>>>>>> In
>>>>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>>>>>> case,
>>>>>>>>>>>>>>>>>>>>>>>> user currently needs to use the
>>>> offset
>>>>>>> which
>>>>>>>>> is
>>>>>>>>>>>> obtained
>>>>>>>>>>>>>>>>> using
>>>>>>>>>>>>>>>>>>>>>>> position(..)
>>>>>>>>>>>>>>>>>>>>>>>> from the last run. With this KIP,
>>>> user
>>>>>>> needs
>>>>>>>>> to
>>>>>>>>>>> get
>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> offset
>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>> offsetEpoch using
>>>>>>>> positionAndOffsetEpoch(...)
>>>>>>>>>> and
>>>>>>>>>>>> stores
>>>>>>>>>>>>>>>>> these
>>>>>>>>>>>>>>>>>>>>>>> information
>>>>>>>>>>>>>>>>>>>>>>>> externally. The next time user starts
>>>>>>>>> consumer,
>>>>>>>>>>>> he/she
>>>>>>>>>>>>>>> needs
>>>>>>>>>>>>>>>>> to
>>>>>>>
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to