Ah thanks. I did not check the wiki but just browsed through all open KIP discussions following the email threads.
-Matthias

On 9/30/18 12:06 PM, Dong Lin wrote:

Hey Matthias,

Thanks for checking back on the status. The KIP has been marked as replaced by KIP-320 in the KIP list wiki page, and the status has been updated in the discussion and voting email threads.

Thanks,
Dong

On Sun, 30 Sep 2018 at 11:51 AM Matthias J. Sax <matth...@confluent.io> wrote:

It seems that KIP-320 was accepted. Thus, I am wondering what the status of this KIP is?

-Matthias

On 7/11/18 10:59 AM, Dong Lin wrote:

Hey Jun,

Certainly. We can discuss later after KIP-320 settles.

Thanks!
Dong

On Wed, Jul 11, 2018 at 8:54 AM, Jun Rao <j...@confluent.io> wrote:

Hi, Dong,

Sorry for the late response. Since KIP-320 covers some of the problems described in this KIP, perhaps we can wait until KIP-320 settles and see what's still left uncovered in this KIP.

Thanks,

Jun

On Mon, Jun 4, 2018 at 7:03 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun,

It seems that we have made considerable progress on the discussion of KIP-253 since February. Do you think we should continue the discussion there, or can we continue the voting for this KIP? I am happy to submit the PR and move this KIP forward.

Thanks!
Dong

On Wed, Feb 7, 2018 at 11:42 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun,

Sure, I will come up with a KIP this week. I think there is a way to allow partition expansion to an arbitrary number without introducing new concepts such as read-only partitions or repartition epochs.

Thanks,
Dong

On Wed, Feb 7, 2018 at 5:28 PM, Jun Rao <j...@confluent.io> wrote:

Hi, Dong,

Thanks for the reply. The general idea that you had for adding partitions is similar to what we had in mind. It would be useful to make this more general, allowing an arbitrary number of partitions to be added (instead of just doubling) and potentially removing partitions as well. The following is the high-level idea from the discussion with Colin, Jason and Ismael.

* To change the number of partitions from X to Y in a topic, the controller marks all existing X partitions as read-only and creates Y new partitions. The new partitions are writable and are tagged with a higher repartition epoch (RE).

* The controller propagates the new metadata to every broker. Once the leader of a partition is marked as read-only, it rejects produce requests on this partition. The producer will then refresh its metadata and start publishing to the new writable partitions.

* The consumers will then consume messages in RE order. The consumer coordinator will only assign partitions in the same RE to consumers. Only after all messages in an RE are consumed will partitions in a higher RE be assigned to consumers.

As Colin mentioned, if we do the above, we could potentially (1) use a globally unique partition id, or (2) use a globally unique topic id to distinguish recreated partitions due to topic deletion.
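A minimal sketch of the assignment rule in the third bullet above, with illustrative stand-in types (none of these are real Kafka classes): the group leader only hands out partitions from the lowest repartition epoch that still has unconsumed data, so higher-RE partitions wait their turn.

import java.util.List;
import java.util.OptionalInt;

public class EpochOrderedAssignor {
    // Illustrative stand-in for partition metadata; not a real Kafka class.
    record PartitionState(int partition, int repartitionEpoch, boolean fullyConsumed) {}

    // Only partitions in the lowest repartition epoch (RE) that still has
    // unconsumed data are assignable; higher-RE partitions must wait.
    static List<Integer> assignable(List<PartitionState> partitions) {
        OptionalInt lowestActiveRe = partitions.stream()
                .filter(p -> !p.fullyConsumed())
                .mapToInt(PartitionState::repartitionEpoch)
                .min();
        if (lowestActiveRe.isEmpty())
            return List.of(); // everything is drained
        return partitions.stream()
                .filter(p -> !p.fullyConsumed()
                        && p.repartitionEpoch() == lowestActiveRe.getAsInt())
                .map(PartitionState::partition)
                .toList();
    }

    public static void main(String[] args) {
        List<PartitionState> parts = List.of(
                new PartitionState(0, 1, false), // old, read-only, not yet drained
                new PartitionState(1, 1, true),  // old, fully consumed
                new PartitionState(2, 2, false), // new, writable, must wait
                new PartitionState(3, 2, false));
        System.out.println(assignable(parts)); // [0]
    }
}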
So, perhaps we can sketch out the re-partitioning KIP a bit more and see if there is any overlap with KIP-232. Would you be interested in doing that? If not, we can do that next week.

Jun

On Tue, Feb 6, 2018 at 11:30 AM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun,

Interestingly, I am also planning to sketch a KIP to allow partition expansion for keyed topics after this KIP. Since you are already doing that, I guess I will just share my high-level idea here in case it is helpful.

The motivation for the KIP is that we currently lose the order guarantee for messages with the same key if we expand partitions of a keyed topic.

The solution can probably be built upon the following ideas:

- The partition number of the keyed topic should always be doubled (or multiplied by a power of 2). Given that we select a partition based on hash(key) % partitionNum, this should help us ensure that a message assigned to an existing partition will not be mapped to another existing partition after partition expansion.

- The producer includes in the ProduceRequest some information that helps ensure that messages produced to a partition will monotonically increase in the partitionNum of the topic. In other words, if a broker receives a ProduceRequest and notices that the producer does not know that the partition number has increased, the broker should reject this request. That "information" may be the leaderEpoch, the max partitionEpoch of the partitions of the topic, or simply the partitionNum of the topic. The benefit of this property is that we can keep the new logic for in-order message consumption entirely in how the consumer leader determines the partition -> consumer mapping.

- When the consumer leader determines the partition -> consumer mapping, the leader first reads the start position for each partition using OffsetFetchRequest. If the start positions are all non-zero, then assignment can be done in its current manner. The assumption is that a message in a new partition should only be consumed after all messages with the same key produced before it have been consumed. Since some messages in the new partition have already been consumed, we should not worry about consuming messages out of order. The benefit of this approach is that we can avoid unnecessary overhead in the common case.

- If the consumer leader finds that the start position for some partition is 0: say the current partition number is 18 and the partition index is 12; then the consumer leader should ensure that all messages produced to partition 12 - 18/2 = 3 before the first message of partition 12 was produced have been consumed, before it assigns partition 12 to any consumer in the consumer group. Since we have a piece of "information" that is monotonically increasing per partition, the consumer leader can read the value of this information from the first message in partition 12, get the offset corresponding to this value in partition 3, assign all partitions except partition 12 (and probably other new partitions) to the existing consumers, wait for the committed offset of partition 3 to go beyond this offset, and then trigger a rebalance so that partition 12 can be assigned to some consumer.

Thanks,
Dong
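The doubling rule in the first bullet above can be checked mechanically: with partition = hash(key) % N, growing from N to 2N partitions either keeps a key in its old partition p or moves it to the brand-new partition p + N, never to a different pre-existing partition; equivalently, a new partition q's "parent" is q - N (the 12 - 18/2 = 3 example). A toy verification, using a stand-in hash rather than Kafka's murmur2:

import java.util.concurrent.ThreadLocalRandom;

public class DoublingInvariant {
    static int partitionFor(long keyHash, int numPartitions) {
        return (int) Math.floorMod(keyHash, (long) numPartitions);
    }

    public static void main(String[] args) {
        int oldN = 9, newN = 18; // the example above: 9 doubled to 18
        for (int i = 0; i < 1_000_000; i++) {
            long h = ThreadLocalRandom.current().nextLong();
            int oldP = partitionFor(h, oldN);
            int newP = partitionFor(h, newN);
            // A key either stays in its old partition or moves to exactly
            // oldP + oldN -- never to a different pre-existing partition.
            if (newP != oldP && newP != oldP + oldN)
                throw new AssertionError("invariant violated for hash " + h);
        }
        System.out.println("ok: new partition q's parent is q - " + oldN);
    }
}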
On Tue, Feb 6, 2018 at 10:10 AM, Jun Rao <j...@confluent.io> wrote:

Hi, Dong,

Thanks for the KIP. It looks good overall. We are working on a separate KIP for adding partitions while preserving the ordering guarantees. That may require another flavor of partition epoch, and it's not very clear whether that partition epoch can be merged with the partition epoch in this KIP. So, perhaps you can wait on this a bit until we post the other KIP in the next few days.

Jun

On Mon, Feb 5, 2018 at 2:43 PM, Becket Qin <becket....@gmail.com> wrote:

+1 on the KIP.

I think the KIP is mainly about adding the capability of tracking the system state change lineage. It does not seem necessary to bundle this KIP with replacing the topic partition with the partition epoch in produce/fetch. Replacing the topic-partition string with the partition epoch is essentially a performance improvement on top of this KIP. That can probably be done separately.

Thanks,

Jiangjie (Becket) Qin

On Mon, Jan 29, 2018 at 11:52 AM, Dong Lin <lindon...@gmail.com> wrote:

Hey Colin,

On Mon, Jan 29, 2018 at 11:23 AM, Colin McCabe <cmcc...@apache.org> wrote:

On Mon, Jan 29, 2018 at 10:35 AM, Dong Lin <lindon...@gmail.com> wrote:

Hey Colin,

I understand that the KIP adds overhead by introducing a per-partition partitionEpoch. I am open to alternative solutions that do not incur additional overhead, but I don't see a better way now.

IMO the overhead in the FetchResponse may not be that much. We should probably discuss the percentage increase rather than the absolute increase. Currently, after KIP-227, the per-partition header has 23 bytes. This KIP adds another 4 bytes. Assuming the records size is 10KB, the percentage increase is 4 / (23 + 10000) ≈ 0.04%. It seems negligible, right?

Colin McCabe wrote:

Hi Dong,

Thanks for the response.
I agree that the FetchRequest / FetchResponse overhead should be OK, now that we have incremental fetch requests and responses. However, there are a lot of cases where the percentage increase is much greater. For example, if a client is doing full MetadataRequests / Responses, we have some math kind of like this per partition:

UpdateMetadataRequestPartitionState => topic partition controller_epoch leader leader_epoch partition_epoch isr zk_version replicas offline_replicas
  14 bytes: topic => string (assuming about 10-byte topic names)
  4 bytes: partition => int32
  4 bytes: controller_epoch => int32
  4 bytes: leader => int32
  4 bytes: leader_epoch => int32
  +4 EXTRA bytes: partition_epoch => int32 <-- NEW
  2+4+4+4 bytes: isr => [int32] (assuming 3 in the ISR)
  4 bytes: zk_version => int32
  2+4+4+4 bytes: replicas => [int32] (assuming 3 replicas)
  2 bytes: offline_replicas => [int32] (assuming no offline replicas)

Assuming I added that up correctly, the per-partition overhead goes from 64 bytes per partition to 68, a 6.2% increase.

We could do similar math for a lot of the other RPCs. And you will have a similar memory and garbage-collection impact on the brokers, since you have to store all this extra state as well.

Dong Lin wrote:

That is correct. IMO the metadata is only updated periodically, and it is probably not a big deal if we increase it by 6%. The FetchResponse and ProduceRequest are probably the only requests that are bounded by the bandwidth throughput.
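For reference, both percentages above can be reproduced directly (the quoted 6.2% is 4/64 = 6.25% before rounding):

public class OverheadMath {
    public static void main(String[] args) {
        // FetchResponse: 4 new bytes on a 23-byte per-partition header plus
        // ~10 KB of record data per partition.
        System.out.printf("fetch overhead: %.2f%%%n", 100.0 * 4 / (23 + 10_000));

        // UpdateMetadataRequest per-partition state, per the breakdown above.
        int topic = 14, partition = 4, controllerEpoch = 4, leader = 4,
            leaderEpoch = 4, isr = 2 + 3 * 4, zkVersion = 4,
            replicas = 2 + 3 * 4, offlineReplicas = 2;
        int base = topic + partition + controllerEpoch + leader + leaderEpoch
                + isr + zkVersion + replicas + offlineReplicas;     // 64 bytes
        System.out.printf("metadata: %d -> %d bytes, +%.2f%%%n",
                base, base + 4, 100.0 * 4 / base);                  // +6.25%
    }
}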
I agree that we can probably save more space by using a partition ID so that we no longer need the string topic name. A similar idea was also put in the Rejected Alternatives section of KIP-227. While this idea is promising, it seems orthogonal to the goal of this KIP. Given that there is already a lot of work to do in this KIP, maybe we can do the partition ID in a separate KIP?

Colin McCabe wrote:

I guess my thinking is that the goal here is to replace an identifier which can be re-used (the tuple of topic name, partition ID) with an identifier that cannot be re-used (the tuple of topic name, partition ID, partition epoch) in order to gain better semantics. As long as we are replacing the identifier, why not replace it with an identifier that has important performance advantages? The KIP freeze for the next release has already passed, so there is time to do this.

Dong Lin wrote:

In general it can be easier for discussion and implementation if we can split a larger task into smaller and independent tasks. For example, KIP-112 and KIP-113 both deal with JBOD support, and KIP-31, KIP-32 and KIP-33 are about timestamp support. The opinion on this can be subjective, though.

IMO the change to switch from (topic, partition ID) to partitionEpoch in all requests/responses requires us to go through all requests one by one. It may not be hard, but it can be time consuming and tedious. At a high level, the goal and the changes for that are orthogonal to the changes required in this KIP. That is the main reason I think we can split them into two KIPs.

On Mon, Jan 29, 2018, at 10:54, Dong Lin wrote:

I think it is possible to move to entirely use the partitionEpoch instead of (topic, partition) to identify a partition. Clients can obtain the partitionEpoch -> (topic, partition) mapping from the MetadataResponse. We probably need to figure out a way to assign a partitionEpoch to the existing partitions in the cluster, but this should be doable.

This is a good idea. I think it will save us some space in the request/response. The actual space saving in percentage probably depends on the amount of data and the number of partitions of the same topic. I just think we can do it in a separate KIP.

Colin McCabe wrote:

Hmm. How much extra work would be required? It seems like we are already changing almost every RPC that involves topics and partitions, already adding new per-partition state to ZooKeeper, and already changing how clients interact with partitions. Is there some other big piece of work we'd have to do to move to partition IDs that we wouldn't need for partition epochs? I guess we'd have to find a way to support regular-expression-based topic subscriptions. If we split this into multiple KIPs, wouldn't we end up changing all those RPCs and that ZK state a second time? Also, I'm curious whether anyone has done any proof-of-concept GC, memory, and network usage measurements on switching topic names for topic IDs.

Dong Lin wrote:

We will need to go over all requests/responses to check how to replace (topic, partition ID) with the partition epoch. It requires non-trivial work and could take time. As you mentioned, we may want to see how much saving we can get by switching from topic names to the partition epoch. That itself requires time and experimentation.
It seems that the new idea does not roll back any change proposed in this KIP, so I am not sure we gain much by putting them into the same KIP. Anyway, if more people are interested in seeing the new idea in the same KIP, I can try that.

On Mon, Jan 29, 2018 at 10:18 AM, Colin McCabe <cmcc...@apache.org> wrote:

On Fri, Jan 26, 2018, at 12:17, Dong Lin wrote:

Hey Colin,

On Fri, Jan 26, 2018 at 10:16 AM, Colin McCabe <cmcc...@apache.org> wrote:

On Thu, Jan 25, 2018, at 16:47, Dong Lin wrote:

Hey Colin,

Thanks for the comment.

On Thu, Jan 25, 2018 at 4:15 PM, Colin McCabe <cmcc...@apache.org> wrote:

On Wed, Jan 24, 2018, at 21:07, Dong Lin wrote:

Hey Colin,

Thanks for reviewing the KIP.

If I understand you right, you may be suggesting that we can use a global metadataEpoch that is incremented every time the controller updates metadata. The problem with this solution is that, if a topic is deleted and created again, the user will not know that the offset which was stored before the topic deletion is no longer valid. This motivates the idea of including a per-partition partitionEpoch. Does this sound reasonable?

Colin McCabe wrote:

Hi Dong,

Perhaps we can store the last valid offset of each deleted topic in ZooKeeper. Then, when a topic with one of those names gets re-created, we can start the topic at the previous end offset rather than at 0. This preserves immutability. It is no more burdensome than having to preserve a "last epoch" for the deleted partition somewhere, right?

Dong Lin wrote:

My concern with this solution is that the number of ZooKeeper nodes gets larger and larger over time if some users keep deleting and creating topics. Do you think this can be a problem?
Colin McCabe wrote:

Hi Dong,

We could expire the "partition tombstones" after an hour or so. In practice this would solve the issue for clients that like to destroy and re-create topics all the time. In any case, doesn't the current proposal add per-partition znodes as well that we have to track even after the partition is deleted? Or did I misunderstand that?

Dong Lin wrote:

Actually, the current KIP does not add per-partition znodes. Could you double-check? I can fix the KIP wiki if there is anything misleading.

Colin McCabe wrote:

Hi Dong,

I double-checked the KIP, and I can see that you are in fact using a global counter for initializing partition epochs. So you are correct: it doesn't add per-partition znodes for partitions that no longer exist.

Dong Lin wrote:

If we expire the "partition tombstones" after an hour, and the topic is re-created after more than an hour since the topic deletion, then we are back to the situation where the user cannot tell whether the topic has been re-created or not, right?

Colin McCabe wrote:

Yes, with an expiration period it would not ensure immutability -- you could effectively reuse partition names and they would look the same.

It's not really clear to me what should happen when a topic is destroyed and re-created with new data. Should consumers continue to be able to consume? We don't know where they stopped consuming from the previous incarnation of the topic, so messages may have been lost. Certainly, consuming data from offset X of the new incarnation of the topic may give something totally different from what you would have gotten from offset X of the previous incarnation of the topic.

Dong Lin wrote:

With the current KIP, if a consumer consumes a topic based on the last remembered (offset, partitionEpoch, leaderEpoch), and the topic has been re-created, the consumer will throw an InvalidPartitionEpochException, because the previous partitionEpoch will differ from the current partitionEpoch. This is described in Proposed Changes -> Consumption after topic deletion in the KIP. I can improve the KIP if there is anything unclear.
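A minimal sketch of that check, with hypothetical names modeled on the KIP's InvalidPartitionEpochException (this is not the actual client code):

public class PartitionEpochCheck {
    // Hypothetical stand-in for the exception named in the KIP.
    static class InvalidPartitionEpochException extends RuntimeException {
        InvalidPartitionEpochException(String message) { super(message); }
    }

    // Before resuming from a remembered position, compare the stored
    // partitionEpoch with the one in fresh metadata; a mismatch means the
    // topic was deleted and re-created, so the stored offset is not valid.
    static void validate(int storedEpoch, int currentEpoch) {
        if (storedEpoch != currentEpoch)
            throw new InvalidPartitionEpochException("stored epoch " + storedEpoch
                    + " != current epoch " + currentEpoch
                    + ": topic was likely deleted and re-created");
    }

    public static void main(String[] args) {
        validate(7, 7); // same incarnation of the topic: OK
        try {
            validate(7, 12); // re-created topic
        } catch (InvalidPartitionEpochException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}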
Colin McCabe wrote:

Thanks for the clarification. It sounds like what you really want is immutability -- i.e., to never "really" reuse partition identifiers. And you do this by making the partition name no longer the "real" identifier.

My big concern about this KIP is that it seems like an anti-scalability feature. Now we are adding 4 extra bytes for every partition in the FetchResponse and FetchRequest, for example. That could be 40 KB per request if the user has 10,000 partitions. And of course, the KIP also makes massive changes to the UpdateMetadataRequest, MetadataResponse, OffsetCommitRequest, OffsetFetchResponse, LeaderAndIsrRequest, ListOffsetResponse, etc., which will also increase their size on the wire and in memory.

One thing that we talked a lot about in the past is replacing partition names with IDs. IDs have a lot of really nice features. They take up much less space in memory than strings (especially Java strings, with 2 bytes per character). They can often be allocated on the stack rather than the heap (important when you are dealing with hundreds of thousands of them). They can be efficiently serialized and deserialized. If we use 64-bit ones, we will never run out of IDs, which means that they can always be unique per partition.

Given that the partition name is no longer the "real" identifier for partitions in the current KIP-232 proposal, why not just move to using partition IDs entirely instead of strings? You have to change all the messages anyway. There isn't much point any more in carrying around the partition name in every RPC, since you really need (name, epoch) to identify the partition. Probably the metadata response and a few other messages would still have to carry the partition name, to allow clients to go from name to ID. But we could mostly forget about the strings. And then this would be a scalability improvement rather than a scalability problem.
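To make the ID-versus-string point concrete, here is an illustrative (non-Kafka) comparison of the two kinds of identifier:

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class PartitionIdDemo {
    public static void main(String[] args) {
        // Name-based identity: a heap-allocated String per entry, hashed
        // character by character.
        Map<String, Long> offsetsByName = new HashMap<>();
        offsetsByName.put("my-topic-0", 42L);

        // ID-based identity: one 64-bit value, cheap to hash and compare
        // (and only boxed at the map boundary; primitive-keyed collections
        // avoid even that).
        Map<Long, Long> offsetsById = new HashMap<>();
        long partitionId = 123_456_789L; // globally unique, never reused
        offsetsById.put(partitionId, 42L);

        // Wire size: a fixed 8 bytes versus a 2-byte length prefix + the
        // UTF-8 topic name + a 4-byte partition index.
        int nameBytes = 2 + "my-topic".getBytes(StandardCharsets.UTF_8).length + 4;
        System.out.println("id: " + Long.BYTES + " bytes, name: " + nameBytes + " bytes");
    }
}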
By choosing to reuse the same (topic, partition, offset) 3-tuple, we have chosen to give up immutability. That was a really bad decision. And now we have to worry about time dependencies, stale cached data, and all the rest. We can't completely fix this inside Kafka no matter what we do, because not all of that cached data is inside Kafka itself. Some of it may be in systems that Kafka has sent data to, such as other daemons, SQL databases, streams, and so forth.

Dong Lin wrote:

The current KIP will uniquely identify a message using the (topic, partition, offset, partitionEpoch) 4-tuple. This addresses the message-immutability issue that you mentioned. Is there any corner case where message immutability is still not preserved with the current KIP?

Colin McCabe wrote:

I guess the idea here is that mirror maker should work as expected when users destroy a topic and re-create it with the same name. That's kind of tough, though, since in that scenario mirror maker should probably destroy and re-create the topic on the other end, too, right? Otherwise, what you end up with on the other end could be half of one incarnation of the topic and half of another.

What mirror maker really needs is to be able to follow a stream of events about the Kafka cluster itself. We could have some master topic which is always present and which contains data about all topic deletions, creations, etc. Then MM can simply follow this topic and do what is needed.

Dong Lin wrote:

Then the next question may be: should we use a global metadataEpoch + per-partition partitionEpoch, instead of using a per-partition leaderEpoch + per-partition partitionEpoch? The former solution using a metadataEpoch would not work due to the following scenario (provided by Jun):

"Consider the following scenario. In metadata v1, the leader for a partition is at broker 1.
In metadata v2, the leader is at broker 2. In metadata v3, the leader is at broker 1 again. The last committed offsets in v1, v2 and v3 are 10, 20 and 30, respectively. A consumer is started, reads metadata v1, and reads messages from offset 0 to 25 from broker 1. My understanding is that in the current proposal, the metadata version associated with offset 25 is v1. The consumer is then restarted and fetches metadata v2. The consumer tries to read from broker 2, which is the old leader with the last offset at 20. In this case, the consumer will still get an OffsetOutOfRangeException incorrectly."

Regarding your comment "For the second purpose, this is 'soft state' anyway. If the client thinks X is the leader but Y is really the leader, the client will talk to X, and X will point out its mistake by sending back a NOT_LEADER_FOR_PARTITION": this is probably not true. The problem here is that the old leader X may still think it is the leader of the partition, and thus it will not send back NOT_LEADER_FOR_PARTITION. The reasoning is provided in KAFKA-6262. Can you check whether that makes sense?

Colin McCabe wrote:

This is solvable with a timeout, right? If the leader can't communicate with the controller for a certain period of time, it should stop acting as the leader. We have to solve this problem anyway, in order to fix all the corner cases.

Dong Lin wrote:

I am not sure I fully understand your proposal. The proposal seems to require non-trivial changes to our existing leadership election mechanism. Could you provide more detail on how it works? For example, how should the user choose this timeout, how does the leader determine whether it can still communicate with the controller, and how does this trigger the controller to elect a new leader?
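For context, Jun's scenario is essentially what KIP-320 (mentioned at the top of this thread) later addressed by having clients attach the leader epoch of their metadata to fetches. A much-simplified sketch of that fencing idea (hypothetical names; not Kafka's actual implementation):

public class LeaderEpochFencing {
    enum FetchError { NONE, FENCED_LEADER_EPOCH, UNKNOWN_LEADER_EPOCH, OFFSET_OUT_OF_RANGE }

    static FetchError checkFetch(int clientLeaderEpoch, int brokerLeaderEpoch,
                                 long fetchOffset, long logEndOffset) {
        if (clientLeaderEpoch < brokerLeaderEpoch)
            return FetchError.FENCED_LEADER_EPOCH;   // client metadata is stale
        if (clientLeaderEpoch > brokerLeaderEpoch)
            return FetchError.UNKNOWN_LEADER_EPOCH;  // broker (old leader) is behind
        if (fetchOffset > logEndOffset)
            return FetchError.OFFSET_OUT_OF_RANGE;   // genuinely out of range
        return FetchError.NONE;
    }

    public static void main(String[] args) {
        // A consumer on stale metadata (epoch 1) fetching offset 25 from a
        // deposed leader that has moved on to epoch 3 and whose log ends at
        // offset 20: it is told to refresh its metadata instead of getting a
        // spurious OffsetOutOfRangeException.
        System.out.println(checkFetch(1, 3, 25, 20)); // FENCED_LEADER_EPOCH
        System.out.println(checkFetch(3, 3, 25, 20)); // OFFSET_OUT_OF_RANGE (real)
    }
}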
Colin McCabe wrote:

Before I come up with any proposal, let me make sure I understand the problem correctly. My big question was: what prevents split-brain here?

Let's say I have a partition which is on nodes A, B, and C, with min-ISR 2. The controller is D. At some point, there is a network partition between A and B and the rest of the cluster. The controller re-assigns the partition to nodes C, D, and E. But A and B keep chugging away, even though they can no longer communicate with the controller.

At some point, a client with stale metadata writes to the partition. It still thinks the partition is on nodes A, B, and C, so that's where it sends the data. It's unable to talk to C, but A and B reply back that all is well.

Is this not a case where we could lose data due to split brain? Or is there a mechanism for preventing this that I missed? If there is not, it seems like a pretty serious failure case that we should be handling with our metadata rework. And I think epoch numbers and timeouts might be part of the solution.

Dong Lin wrote:

Right, split brain can happen if RF=4 and minIsr=2. However, I am not sure it is a pretty serious issue which we need to address today. It can be prevented by configuring the Kafka topic so that minIsr > RF/2. Actually, if a user sets minIsr=2, is there any reason for the user to set RF=4 instead of 3?

Introducing a timeout into the leader election mechanism is non-trivial. I think we probably want to do that only if there is a good use case that cannot otherwise be addressed with the current mechanism.

Colin McCabe wrote:

I still would like to think about these corner cases more. But perhaps it's not directly related to this KIP.

regards,
Colin
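Dong's minIsr > RF/2 guard amounts to one line of arithmetic: two disjoint replica groups can each reach minIsr only if 2 * minIsr <= RF, so requiring minIsr > RF/2 rules out a second independent ISR. A toy check:

public class SplitBrainCheck {
    // Two disjoint replica groups can each form a "valid" ISR only if two
    // groups of size minIsr both fit within the replication factor.
    static boolean splitBrainPossible(int replicationFactor, int minIsr) {
        return 2 * minIsr <= replicationFactor;
    }

    public static void main(String[] args) {
        System.out.println(splitBrainPossible(4, 2)); // true  (the RF=4 case above)
        System.out.println(splitBrainPossible(3, 2)); // false (minIsr > RF/2)
    }
}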
On Wed, Jan 24, 2018 at 10:39 AM, Colin McCabe <cmcc...@apache.org> wrote:

Hi Dong,

Thanks for proposing this KIP. I think a metadata epoch is a really good idea.

I read through the DISCUSS thread, but I still don't have a clear picture of why the proposal uses a metadata epoch per partition rather than a global metadata epoch. A metadata epoch per partition is kind of unpleasant -- it's at least 4 extra bytes per partition that we have to send over the wire in every full metadata request, which could become extra kilobytes on the wire when the number of partitions becomes large. Plus, we have to update all the auxiliary classes to include an epoch.

We need to have a global metadata epoch anyway to handle partition addition and deletion. For example, if I give you MetadataResponse{part1, epoch 1, part2, epoch 1} and {part1, epoch 1}, which MetadataResponse is newer? You have no way of knowing. It could be that part2 has just been created, and the response with 2 partitions is newer. Or it could be that part2 has just been deleted, and therefore the response with 1 partition is newer. You must have a global epoch to disambiguate these two cases.
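Colin's ambiguity example in code form (illustrative types only): the two snapshots are incomparable using per-partition epochs alone, and trivially ordered once each carries one global epoch.

import java.util.Map;

public class GlobalEpochDemo {
    // Illustrative snapshot type: one global epoch plus per-partition epochs.
    record Snapshot(long globalEpoch, Map<String, Integer> partitionEpochs) {}

    static Snapshot newer(Snapshot a, Snapshot b) {
        return a.globalEpoch() >= b.globalEpoch() ? a : b;
    }

    public static void main(String[] args) {
        // {part1: 1, part2: 1} vs {part1: 1}: was part2 just created, or just
        // deleted? Per-partition epochs alone cannot say.
        Snapshot withPart2 = new Snapshot(5, Map.of("part1", 1, "part2", 1));
        Snapshot withoutPart2 = new Snapshot(6, Map.of("part1", 1));
        // With a global epoch the answer is immediate: 6 > 5, so part2 was deleted.
        System.out.println(newer(withPart2, withoutPart2).partitionEpochs());
    }
}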
Previously, I worked on the Ceph distributed filesystem. Ceph had the concept of a map of the whole cluster, maintained by a few servers doing Paxos. This map was versioned by a single 64-bit epoch number which increased on every change. It was propagated to clients through gossip. I wonder if something similar could work here?

It seems like the Kafka MetadataResponse serves two somewhat unrelated purposes. Firstly, it lets clients know what partitions exist in the system and where they live. Secondly, it lets clients know which nodes within the partition are in-sync (in the ISR) and which node is the leader.

The first purpose is what you really need a metadata epoch for, I think. You want to know whether a partition exists or not, or you want to know which nodes you should talk to in order to write to a given partition. A single metadata epoch for the whole response should be adequate here. We should not change the partition assignment without going through ZooKeeper (or a similar system), and this inherently serializes updates into a numbered stream. Brokers should also stop responding to requests when they are unable to contact ZK for a certain time period. This prevents the case where a given partition has been moved off some set of nodes, but a client still ends up talking to those nodes and writing data there.

For the second purpose, this is "soft state" anyway. If the client thinks X is the leader but Y is really the leader, the client will talk to X, and X will point out its mistake by sending back a NOT_LEADER_FOR_PARTITION. Then the client can update its metadata again and find the new leader, if there is one. There is no need for an epoch to handle this. Similarly, I can't think of a reason why changing the in-sync replica set needs to bump the epoch.

best,
Colin
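The "soft state" recovery Colin describes is just the usual client retry loop, sketched here with hypothetical interfaces: on NOT_LEADER_FOR_PARTITION, refresh metadata to re-resolve the leader and retry; no epoch is required.

public class LeaderRetryLoop {
    interface Metadata { int leaderFor(String topicPartition); } // refreshed lookup
    interface Broker {                                           // hypothetical client
        boolean produce(int brokerId, String topicPartition, byte[] record);
    }

    static void send(Metadata metadata, Broker broker, String tp, byte[] record,
                     int maxRetries) {
        for (int attempt = 0; attempt < maxRetries; attempt++) {
            int leader = metadata.leaderFor(tp); // re-resolve the leader each try
            if (broker.produce(leader, tp, record))
                return; // success
            // false models NOT_LEADER_FOR_PARTITION: loop and refresh metadata.
        }
        throw new IllegalStateException("no leader found after " + maxRetries + " tries");
    }

    public static void main(String[] args) {
        Metadata metadata = tp -> 2;              // metadata now points at broker 2
        Broker broker = (id, tp, rec) -> id == 2; // only the real leader accepts
        send(metadata, broker, "my-topic-0", new byte[0], 3);
        System.out.println("sent after leader lookup");
    }
}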
On Wed, Jan 24, 2018, at 09:45, Dong Lin wrote:

Thanks much for reviewing the KIP!

Dong

On Wed, Jan 24, 2018 at 7:10 AM, Guozhang Wang <wangg...@gmail.com> wrote:

Yeah, that makes sense. Again, I'm just making sure we understand all the scenarios and what to expect.

I agree that if, more generally speaking, a user has only consumed to offset 8 and then calls seek(16) to "jump" to a further position, then she needs to be aware that an OORE may be thrown, and she needs to handle it or rely on the reset policy, which should not surprise her.

I'm +1 on the KIP.

Guozhang

On Wed, Jan 24, 2018 at 12:31 AM, Dong Lin <lindon...@gmail.com> wrote:

Yes, in general we cannot prevent an OffsetOutOfRangeException if the user seeks to a wrong offset. The main goal is to prevent an OffsetOutOfRangeException if the user has done things in the right way, e.g. the user should know that there is a message with this offset.

For example, if the user calls seek(..) right after construction, the only reason I can think of is that the user stores offsets externally. In this case, the user currently needs to use the offset which was obtained using position(..) from the last run. With this KIP, the user needs to get the offset and the offsetEpoch using positionAndOffsetEpoch(...) and store this information externally. The next time the user starts the consumer, he/she needs to