It seems that KIP-320 was accepted, so I am wondering: what is the status of this KIP?
-Matthias

On 7/11/18 10:59 AM, Dong Lin wrote:

Hey Jun,

Certainly. We can discuss later after KIP-320 settles.

Thanks!
Dong

On Wed, Jul 11, 2018 at 8:54 AM, Jun Rao <j...@confluent.io> wrote:

Hi, Dong,

Sorry for the late response. Since KIP-320 covers some of the problems described in this KIP, perhaps we can wait until KIP-320 settles and then see what is still left uncovered in this KIP.

Thanks,

Jun

On Mon, Jun 4, 2018 at 7:03 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun,

It seems that we have made considerable progress on the discussion of KIP-253 since February. Do you think we should continue the discussion there, or can we continue the voting for this KIP? I am happy to submit the PR and move this KIP forward.

Thanks!
Dong

On Wed, Feb 7, 2018 at 11:42 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun,

Sure, I will come up with a KIP this week. I think there is a way to allow partition expansion to an arbitrary number without introducing new concepts such as read-only partitions or a repartition epoch.

Thanks,
Dong

On Wed, Feb 7, 2018 at 5:28 PM, Jun Rao <j...@confluent.io> wrote:

Hi, Dong,

Thanks for the reply. The general idea that you had for adding partitions is similar to what we had in mind. It would be useful to make this more general, allowing an arbitrary number of partitions to be added (instead of just doubling) and potentially removing partitions as well. The following is the high-level idea from the discussion with Colin, Jason and Ismael.

* To change the number of partitions from X to Y in a topic, the controller marks all existing X partitions as read-only and creates Y new partitions. The new partitions are writable and are tagged with a higher repartition epoch (RE).

* The controller propagates the new metadata to every broker. Once the leader of a partition is marked as read-only, it rejects produce requests on that partition. The producer will then refresh the metadata and start publishing to the new writable partitions.

* The consumers will then consume messages in RE order. The consumer coordinator will only assign partitions in the same RE to consumers. Only after all messages in an RE are consumed will partitions in a higher RE be assigned to consumers.

As Colin mentioned, if we do the above, we could potentially (1) use a globally unique partition id, or (2) use a globally unique topic id to distinguish recreated partitions due to topic deletion.

So, perhaps we can sketch out the re-partitioning KIP a bit more and see if there is any overlap with KIP-232. Would you be interested in doing that? If not, we can do that next week.

Jun
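[Illustration: a minimal Java sketch of the assignment rule Jun describes above, under a simplified model. RepartitionAwareAssignor and its types are names made up for this sketch, not Kafka APIs: the coordinator hands out only the partitions of the lowest repartition epoch that still has unconsumed messages.]

import java.util.List;
import java.util.OptionalInt;
import java.util.stream.Collectors;

class RepartitionAwareAssignor {
    // One entry per partition: its RE tag and whether the group has
    // already consumed all of its messages.
    record PartitionState(int partition, int repartitionEpoch, boolean fullyConsumed) {}

    // Only partitions in the lowest not-yet-fully-consumed RE are eligible
    // for assignment; partitions in higher REs wait until that RE drains.
    static List<Integer> eligiblePartitions(List<PartitionState> states) {
        OptionalInt lowestActiveRe = states.stream()
                .filter(s -> !s.fullyConsumed())
                .mapToInt(PartitionState::repartitionEpoch)
                .min();
        if (lowestActiveRe.isEmpty()) {
            return List.of();
        }
        return states.stream()
                .filter(s -> s.repartitionEpoch() == lowestActiveRe.getAsInt())
                .map(PartitionState::partition)
                .collect(Collectors.toList());
    }
}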
On Tue, Feb 6, 2018 at 11:30 AM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun,

Interestingly, I am also planning to sketch a KIP that allows partition expansion for keyed topics after this KIP. Since you are already doing that, I will just share my high-level idea here in case it is helpful.

The motivation for that KIP is that we currently lose the order guarantee for messages with the same key if we expand the partitions of a keyed topic.

The solution can probably be built on the following ideas:

- The partition number of a keyed topic should always be doubled (or multiplied by a power of 2). Given that we select a partition based on hash(key) % partitionNum, this ensures that a message assigned to an existing partition will not be mapped to a different existing partition after partition expansion.

- The producer includes in the ProduceRequest some information that helps ensure that the partitionNum of the topic observed for messages produced to a partition increases monotonically. In other words, if a broker receives a ProduceRequest and notices that the producer does not know that the partition number has increased, the broker should reject the request. That "information" may be the leaderEpoch, the max partitionEpoch of the partitions of the topic, or simply the partitionNum of the topic. The benefit of this property is that we can keep the new logic for in-order message consumption entirely in how the consumer leader determines the partition -> consumer mapping.

- When the consumer leader determines the partition -> consumer mapping, it first reads the start position of each partition using OffsetFetchRequest. If the start positions are all non-zero, assignment can be done in the current manner. The assumption is that a message in a new partition should only be consumed after all messages with the same key produced before it have been consumed. Since some messages in the new partition have already been consumed, we do not need to worry about out-of-order consumption. The benefit of this approach is that we avoid unnecessary overhead in the common case.

- If the consumer leader finds that the start position of some partition is 0, more care is needed. Say the current partition number is 18 and the partition index is 12. Then the consumer leader should ensure that the messages produced to partition 12 - 18/2 = 3 before the first message of partition 12 are consumed before it assigns partition 12 to any consumer in the group. Since we have "information" that is monotonically increasing per partition, the consumer can read the value of this information from the first message in partition 12, find the offset corresponding to this value in partition 3, assign every partition except partition 12 (and probably other new partitions) to the existing consumers, wait for the committed offset of partition 3 to go beyond this offset, and then trigger a rebalance so that partition 12 can be assigned to some consumer.

Thanks,
Dong
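[Illustration: a small self-check of the doubling property in Dong's first bullet; this code is illustrative, not from the KIP. With partition = hash(key) % partitionNum, doubling from N to 2N sends each key either to its old partition p or to the new partition p + N, never to a different pre-existing partition; conversely, the "parent" of a new partition p >= N is p - N, which is where the 12 -> 3 arithmetic above comes from.]

class DoublingCheck {
    static int partitionFor(int keyHash, int partitionNum) {
        // floorMod keeps the result non-negative even for negative hashes
        return Math.floorMod(keyHash, partitionNum);
    }

    public static void main(String[] args) {
        int oldN = 9, newN = 18; // matches the 18-partition example above
        for (int hash = -10_000; hash < 10_000; hash++) {
            int before = partitionFor(hash, oldN);
            int after = partitionFor(hash, newN);
            // A key lands either on its old partition or on old + N,
            // so no key ever moves between two pre-existing partitions.
            if (after != before && after != before + oldN) {
                throw new AssertionError("key remapped between old partitions");
            }
        }
    }
}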
On Tue, Feb 6, 2018 at 10:10 AM, Jun Rao <j...@confluent.io> wrote:

Hi, Dong,

Thanks for the KIP. It looks good overall. We are working on a separate KIP for adding partitions while preserving the ordering guarantees. That may require another flavor of partition epoch, and it's not very clear whether that partition epoch can be merged with the partition epoch in this KIP. So perhaps you can wait on this a bit until we post the other KIP in the next few days.

Jun

On Mon, Feb 5, 2018 at 2:43 PM, Becket Qin <becket....@gmail.com> wrote:

+1 on the KIP.

I think the KIP is mainly about adding the capability of tracking the lineage of system state changes. It does not seem necessary to bundle this KIP with replacing the topic-partition with the partition epoch in produce/fetch. Replacing the topic-partition string with the partition epoch is essentially a performance improvement on top of this KIP, and can probably be done separately.

Thanks,

Jiangjie (Becket) Qin

On Mon, Jan 29, 2018 at 11:52 AM, Dong Lin <lindon...@gmail.com> wrote:

Hey Colin,

On Mon, Jan 29, 2018 at 11:23 AM, Colin McCabe <cmcc...@apache.org> wrote:

On Mon, Jan 29, 2018 at 10:35 AM, Dong Lin <lindon...@gmail.com> wrote:

Hey Colin,

I understand that the KIP adds overhead by introducing a per-partition partitionEpoch. I am open to alternative solutions that do not incur additional overhead, but I don't see a better way now.

IMO the overhead in the FetchResponse may not be that much. We should probably discuss the percentage increase rather than the absolute increase. Currently, after KIP-227, the per-partition header has 23 bytes, and this KIP adds another 4 bytes. Assuming a records size of 10KB, the percentage increase is 4 / (23 + 10000) = 0.03%. That seems negligible, right?

[Colin McCabe:]

Hi Dong,

Thanks for the response. I agree that the FetchRequest / FetchResponse overhead should be OK, now that we have incremental fetch requests and responses. However, there are a lot of cases where the percentage increase is much greater.
For example, if a client is doing full MetadataRequests / Responses, we have some math kind of like this per partition:

UpdateMetadataRequestPartitionState => topic partition controller_epoch leader leader_epoch partition_epoch isr zk_version replicas offline_replicas
  14 bytes: topic => string (assuming about 10-byte topic names)
  4 bytes: partition => int32
  4 bytes: controller_epoch => int32
  4 bytes: leader => int32
  4 bytes: leader_epoch => int32
  +4 EXTRA bytes: partition_epoch => int32 <-- NEW
  2+4+4+4 bytes: isr => [int32] (assuming 3 in the ISR)
  4 bytes: zk_version => int32
  2+4+4+4 bytes: replicas => [int32] (assuming 3 replicas)
  2 bytes: offline_replicas => [int32] (assuming no offline replicas)

Assuming I added that up correctly, the per-partition overhead goes from 64 bytes to 68, a 6.2% increase.

We could do similar math for a lot of the other RPCs. And you will have a similar memory and garbage-collection impact on the brokers, since you have to store all this extra state as well.
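[Illustration: Colin's arithmetic spelled out, using the same per-field figures and assumptions as his list above; this is illustrative code, not the actual serialization logic.]

class PerPartitionOverhead {
    public static void main(String[] args) {
        int topic = 14;              // string, per Colin's figure above
        int partition = 4;           // int32
        int controllerEpoch = 4;     // int32
        int leader = 4;              // int32
        int leaderEpoch = 4;         // int32
        int isr = 2 + 3 * 4;         // 2-byte array length + 3 int32s
        int zkVersion = 4;           // int32
        int replicas = 2 + 3 * 4;    // 2-byte array length + 3 int32s
        int offlineReplicas = 2;     // empty array, length prefix only
        int before = topic + partition + controllerEpoch + leader + leaderEpoch
                + isr + zkVersion + replicas + offlineReplicas;      // = 64
        int after = before + 4;      // + partition_epoch (int32)       = 68
        System.out.printf("%d -> %d bytes (+%.2f%%)%n",
                before, after, 100.0 * 4 / before);                  // +6.25%
    }
}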
[Dong Lin:]

That is correct. IMO the metadata is only updated periodically, so increasing it by 6% is probably not a big deal. The FetchResponse and ProduceRequest are probably the only requests that are bounded by bandwidth throughput.

[Dong Lin, from the quoted exchange:]

I agree that we can probably save more space by using a partition ID so that we no longer need the string topic name. A similar idea was also put in the Rejected Alternatives section of KIP-227. While the idea is promising, it seems orthogonal to the goal of this KIP. Given that there is already much work to do in this KIP, maybe we can do the partition ID in a separate KIP?

[Colin McCabe:]

I guess my thinking is that the goal here is to replace an identifier which can be re-used (the tuple of topic name and partition ID) with an identifier that cannot be re-used (the tuple of topic name, partition ID and partition epoch) in order to gain better semantics. As long as we are replacing the identifier, why not replace it with an identifier that has important performance advantages? The KIP freeze for the next release has already passed, so there is time to do this.

[Dong Lin:]

In general it can be easier for discussion and implementation if we split a larger task into smaller, independent tasks. For example, KIP-112 and KIP-113 both deal with JBOD support, and KIP-31, KIP-32 and KIP-33 are about timestamp support. Opinions on this can differ, though.

IMO the change to switch from (topic, partition ID) to partitionEpoch in all requests/responses requires us to go through all the requests one by one. It may not be hard, but it can be time-consuming and tedious. At a high level, the goal and the changes needed for that are orthogonal to the changes required in this KIP. That is the main reason I think we can split them into two KIPs.

On Mon, Jan 29, 2018, at 10:54, Dong Lin wrote:

I think it is possible to move to using partitionEpoch entirely instead of (topic, partition) to identify a partition. Clients can obtain the partitionEpoch -> (topic, partition) mapping from the MetadataResponse. We would need to figure out a way to assign a partitionEpoch to existing partitions in the cluster, but this should be doable.

This is a good idea. I think it will save us some space in the requests/responses. The actual percentage saving probably depends on the amount of data and the number of partitions of the same topic. I just think we can do it in a separate KIP.

[Colin McCabe:]

Hmm. How much extra work would be required? It seems like we are already changing almost every RPC that involves topics and partitions, already adding new per-partition state to ZooKeeper, and already changing how clients interact with partitions. Is there some other big piece of work we'd have to do to move to partition IDs that we wouldn't need for partition epochs? I guess we'd have to find a way to support regular-expression-based topic subscriptions. If we split this into multiple KIPs, wouldn't we end up changing all those RPCs and that ZK state a second time? Also, I'm curious whether anyone has done any proof-of-concept GC, memory, and network usage measurements on switching topic names for topic IDs.

[Dong Lin:]

We will need to go over all requests/responses to check how to replace (topic, partition ID) with the partition epoch. That requires non-trivial work and could take time. As you mentioned, we may want to see how much saving we can get by switching from topic names to partition epochs, which itself requires time and experimentation. It seems that the new idea does not roll back any change proposed in this KIP, so I am not sure we gain much by putting them into the same KIP.

Anyway, if more people are interested in seeing the new idea in the same KIP, I can try that.
[Colin McCabe:]

best,
Colin

On Mon, Jan 29, 2018 at 10:18 AM, Colin McCabe <cmcc...@apache.org> wrote:

On Fri, Jan 26, 2018, at 12:17, Dong Lin wrote:

Hey Colin,

On Fri, Jan 26, 2018 at 10:16 AM, Colin McCabe <cmcc...@apache.org> wrote:

On Thu, Jan 25, 2018, at 16:47, Dong Lin wrote:

Hey Colin,

Thanks for the comment.

On Thu, Jan 25, 2018 at 4:15 PM, Colin McCabe <cmcc...@apache.org> wrote:

On Wed, Jan 24, 2018, at 21:07, Dong Lin wrote:

Hey Colin,

Thanks for reviewing the KIP.

If I understand you right, you may be suggesting that we can use a global metadataEpoch that is incremented every time the controller updates the metadata. The problem with this solution is that, if a topic is deleted and created again, the user will not know that the offset stored before the topic deletion is no longer valid. This motivates the idea of including a per-partition partitionEpoch. Does this sound reasonable?

[Colin McCabe:]

Hi Dong,

Perhaps we can store the last valid offset of each deleted topic in ZooKeeper. Then, when a topic with one of those names gets re-created, we can start the topic at the previous end offset rather than at 0. This preserves immutability. It is no more burdensome than having to preserve a "last epoch" for the deleted partition somewhere, right?

[Dong Lin:]

My concern with this solution is that the number of ZooKeeper nodes grows over time if some users keep deleting and creating topics. Do you think this can be a problem?

[Colin McCabe:]

Hi Dong,

We could expire the "partition tombstones" after an hour or so. In practice this would solve the issue for clients that like to destroy and re-create topics all the time. In any case, doesn't the current proposal add per-partition znodes as well that we have to track even after the partition is deleted?
Or did I misunderstand that?

[Dong Lin:]

Actually, the current KIP does not add per-partition znodes. Could you double-check? I can fix the KIP wiki if there is anything misleading.

[Colin McCabe:]

Hi Dong,

I double-checked the KIP, and I can see that you are in fact using a global counter for initializing partition epochs. So you are correct: it doesn't add per-partition znodes for partitions that no longer exist.

[Dong Lin:]

If we expire the "partition tombstones" after an hour, and the topic is re-created more than an hour after the topic deletion, then we are back in the situation where the user cannot tell whether the topic has been re-created, right?

[Colin McCabe:]

Yes, with an expiration period it would not ensure immutability -- you could effectively reuse partition names and they would look the same.

It's not really clear to me what should happen when a topic is destroyed and re-created with new data. Should consumers continue to be able to consume? We don't know where they stopped consuming from the previous incarnation of the topic, so messages may have been lost. Certainly, consuming data from offset X of the new incarnation of the topic may give something totally different from what you would have gotten from offset X of the previous incarnation.

[Dong Lin:]

With the current KIP, if a consumer consumes a topic based on the last remembered (offset, partitionEpoch, leaderEpoch), and the topic has been re-created, the consumer will throw InvalidPartitionEpochException, because the previous partitionEpoch differs from the current partitionEpoch. This is described in Proposed Changes -> Consumption after topic deletion in the KIP. I can improve the KIP if anything is unclear.
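[Illustration: a sketch of the check Dong describes. InvalidPartitionEpochException is the exception proposed by the KIP, not an existing client API, so a stand-in is used here; the point is only the comparison. The consumer carries the partitionEpoch alongside its position, so an offset remembered from a deleted incarnation of the topic is rejected rather than silently reinterpreted against the new log.]

class PartitionEpochCheck {
    // Stand-in for the KIP's proposed InvalidPartitionEpochException.
    static class InvalidPartitionEpochException extends RuntimeException {
        InvalidPartitionEpochException(String msg) { super(msg); }
    }

    static void validate(int rememberedPartitionEpoch, int currentPartitionEpoch) {
        if (rememberedPartitionEpoch != currentPartitionEpoch) {
            // The topic was deleted and re-created: the stored offset refers
            // to the old incarnation and must not be used against the new one.
            throw new InvalidPartitionEpochException(
                    "stored epoch " + rememberedPartitionEpoch
                    + " != current epoch " + currentPartitionEpoch);
        }
    }
}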
[Colin McCabe:]

Thanks for the clarification. It sounds like what you really want is immutability -- i.e., to never "really" reuse partition identifiers. And you do this by making the partition name no longer the "real" identifier.

My big concern about this KIP is that it seems like an anti-scalability feature. Now we are adding 4 extra bytes for every partition in the FetchResponse and FetchRequest, for example. That could be 40 KB per request if the user has 10,000 partitions. And of course, the KIP also makes massive changes to UpdateMetadataRequest, MetadataResponse, OffsetCommitRequest, OffsetFetchResponse, LeaderAndIsrRequest, ListOffsetResponse, etc., which will also increase their size on the wire and in memory.

One thing that we talked a lot about in the past is replacing partition names with IDs. IDs have a lot of really nice features. They take up much less space in memory than strings (especially 2-byte-per-character Java strings). They can often be allocated on the stack rather than the heap (important when you are dealing with hundreds of thousands of them). They can be efficiently serialized and deserialized. If we use 64-bit ones, we will never run out of IDs, which means that they can always be unique per partition.

Given that the partition name is no longer the "real" identifier for partitions in the current KIP-232 proposal, why not just move to using partition IDs entirely instead of strings? You have to change all the messages anyway. There isn't much point any more in carrying the partition name around in every RPC, since you really need (name, epoch) to identify a partition. Probably the metadata response and a few other messages would still have to carry the partition name, to allow clients to go from name to ID. But we could mostly forget about the strings. And then this would be a scalability improvement rather than a scalability problem.

[Colin McCabe, earlier in the exchange:]

By choosing to reuse the same (topic, partition, offset) 3-tuple, we have chosen to give up immutability. That was a really bad decision. And now we have to worry about time dependencies, stale cached data, and all the rest. We can't completely fix this inside Kafka no matter what we do, because not all of that cached data is inside Kafka itself. Some of it may be in systems that Kafka has sent data to, such as other daemons, SQL databases, streams, and so forth.

[Dong Lin:]

The current KIP uniquely identifies a message using the (topic, partition, offset, partitionEpoch) 4-tuple. This addresses the message immutability issue that you mentioned. Is there any corner case where message immutability is still not preserved with the current KIP?
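[Illustration: Dong's 4-tuple written out as a value type; the name MessageId is made up for this sketch. Two identifiers are equal only if all four components match, so an offset remembered from a re-created partition can never alias a message in the new incarnation, because the partitionEpoch component differs.]

// Equality and hashCode over all four fields come for free with a record.
record MessageId(String topic, int partition, long offset, int partitionEpoch) {}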
[Colin McCabe, earlier in the exchange:]

I guess the idea here is that mirror maker should work as expected when users destroy a topic and re-create it with the same name. That's kind of tough, though, since in that scenario mirror maker should probably destroy and re-create the topic on the other end too, right? Otherwise, what you end up with on the other end could be half of one incarnation of the topic and half of another.

What mirror maker really needs is to be able to follow a stream of events about the Kafka cluster itself. We could have some master topic which is always present and which contains data about all topic deletions, creations, etc. Then MM can simply follow this topic and do what is needed.

[Dong Lin, earlier in the exchange:]

Then the next question may be: should we use a global metadataEpoch plus a per-partition partitionEpoch, instead of a per-partition leaderEpoch plus a per-partition partitionEpoch? The former solution, using a metadataEpoch, would not work due to the following scenario (provided by Jun):

"Consider the following scenario. In metadata v1, the leader for a partition is at broker 1. In metadata v2, the leader is at broker 2. In metadata v3, the leader is at broker 1 again. The last committed offsets in v1, v2 and v3 are 10, 20 and 30, respectively. A consumer is started, reads metadata v1, and reads messages from offset 0 to 25 from broker 1. My understanding is that in the current proposal, the metadata version associated with offset 25 is v1. The consumer is then restarted and fetches metadata v2. The consumer tries to read from broker 2, which is the old leader with the last offset at 20. In this case, the consumer will still get OffsetOutOfRangeException incorrectly."

Regarding your comment "For the second purpose, this is 'soft state' anyway.
If the client thinks X is the leader but Y is really the leader, the client will talk to X, and X will point out its mistake by sending back a NOT_LEADER_FOR_PARTITION": it is probably not true. The problem is that the old leader X may still think it is the leader of the partition and thus will not send back NOT_LEADER_FOR_PARTITION. The reason is given in KAFKA-6262. Can you check whether that makes sense?

[Colin McCabe:]

This is solvable with a timeout, right? If the leader can't communicate with the controller for a certain period of time, it should stop acting as the leader. We have to solve this problem anyway, in order to fix all the corner cases.

[Dong Lin:]

I am not sure I fully understand your proposal. It seems to require non-trivial changes to our existing leadership election mechanism. Could you provide more detail on how it works? For example, how should the user choose this timeout, how does the leader determine whether it can still communicate with the controller, and how does this trigger the controller to elect a new leader?

[Colin McCabe:]

Before I come up with any proposal, let me make sure I understand the problem correctly. My big question was: what prevents split brain here?

Let's say I have a partition which is on nodes A, B, and C, with min-ISR 2. The controller is D. At some point, there is a network partition between A and B and the rest of the cluster. The controller re-assigns the partition to nodes C, D, and E. But A and B keep chugging away, even though they can no longer communicate with the controller.

At some point, a client with stale metadata writes to the partition. It still thinks the partition is on nodes A, B, and C, so that's where it sends the data. It's unable to talk to C, but A and B reply back that all is well.

Is this not a case where we could lose data due to split brain? Or is there a mechanism for preventing this that I missed? If it is, it seems like a pretty serious failure case that we should be handling with our metadata rework. And I think epoch numbers and timeouts might be part of the solution.
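[Illustration: Dong's reply below points at the ISR-size arithmetic. A toy version of that check, which ignores reassignment (the ingredient Colin's scenario exploits): split brain of this kind requires two disjoint broker sets that can each satisfy minIsr.]

class SplitBrainCheck {
    // Two disjoint groups of brokers can each form a "valid" ISR only if
    // two disjoint sets of size minIsr fit within the replication factor.
    static boolean splitBrainPossible(int replicationFactor, int minIsr) {
        return 2 * minIsr <= replicationFactor;
    }

    public static void main(String[] args) {
        System.out.println(splitBrainPossible(4, 2)); // true:  {A,B} vs {C,D}
        System.out.println(splitBrainPossible(3, 2)); // false: minIsr > RF/2
    }
}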
[Dong Lin:]

Right, split brain can happen with RF=4 and minIsr=2. However, I am not sure it is a serious issue that we need to address today. It can be prevented by configuring the Kafka topic so that minIsr > RF/2. Actually, if a user sets minIsr=2, is there any reason they would want RF=4 rather than RF=3?

Introducing a timeout into the leader election mechanism is non-trivial. I think we probably want to do that only if there is a good use case that cannot otherwise be addressed with the current mechanism.

[Colin McCabe:]

I would still like to think about these corner cases more, but perhaps that is not directly related to this KIP.

regards,
Colin

On Wed, Jan 24, 2018 at 10:39 AM, Colin McCabe <cmcc...@apache.org> wrote:

Hi Dong,

Thanks for proposing this KIP. I think a metadata epoch is a really good idea.

I read through the DISCUSS thread, but I still don't have a clear picture of why the proposal uses a metadata epoch per partition rather than a global metadata epoch. A metadata epoch per partition is kind of unpleasant -- it's at least 4 extra bytes per partition that we have to send over the wire in every full metadata request, which could become extra kilobytes on the wire when the number of partitions becomes large. Plus, we have to update all the auxiliary classes to include an epoch.

We need a global metadata epoch anyway to handle partition addition and deletion. For example, if I give you MetadataResponse{part1, epoch 1; part2, epoch 1} and MetadataResponse{part1, epoch 1}, which response is newer? You have no way of knowing. It could be that part2 has just been created, and the response with two partitions is newer. Or it could be that part2 has just been deleted, and therefore the response with one partition is newer. You must have a global epoch to disambiguate these two cases.
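[Illustration: the disambiguation Colin argues for, as a toy comparison. The types are made up for this sketch. With one global epoch per metadata snapshot, "which response is newer" is a single integer comparison, whether partitions were added or deleted in between.]

import java.util.Set;

class GlobalEpochExample {
    record MetadataSnapshot(long globalEpoch, Set<String> partitions) {}

    static MetadataSnapshot newer(MetadataSnapshot a, MetadataSnapshot b) {
        // No need to inspect the partition sets: the epoch totally orders
        // snapshots, covering both creation and deletion of partitions.
        return a.globalEpoch() >= b.globalEpoch() ? a : b;
    }
}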
Previously, I worked on the Ceph distributed filesystem. Ceph had the concept of a map of the whole cluster, maintained by a few servers doing Paxos. This map was versioned by a single 64-bit epoch number which increased on every change, and it was propagated to clients through gossip. I wonder if something similar could work here?

It seems like the Kafka MetadataResponse serves two somewhat unrelated purposes. First, it lets clients know which partitions exist in the system and where they live. Second, it lets clients know which nodes within a partition are in sync (in the ISR) and which node is the leader.

The first purpose is what you really need a metadata epoch for, I think. You want to know whether a partition exists or not, or which nodes you should talk to in order to write to a given partition. A single metadata epoch for the whole response should be adequate here. We should not change the partition assignment without going through ZooKeeper (or a similar system), and this inherently serializes updates into a numbered stream. Brokers should also stop responding to requests when they are unable to contact ZK for a certain time period. This prevents the case where a given partition has been moved off some set of nodes, but a client still ends up talking to those nodes and writing data there.
For the second purpose, this is "soft state" anyway. If the client thinks X is the leader but Y is really the leader, the client will talk to X, and X will point out its mistake by sending back a NOT_LEADER_FOR_PARTITION. Then the client can update its metadata again and find the new leader, if there is one. There is no need for an epoch to handle this. Similarly, I can't think of a reason why changing the in-sync replica set needs to bump the epoch.

best,
Colin

On Wed, Jan 24, 2018, at 09:45, Dong Lin wrote:

Thanks much for reviewing the KIP!

Dong

On Wed, Jan 24, 2018 at 7:10 AM, Guozhang Wang <wangg...@gmail.com> wrote:

Yeah, that makes sense. Again, I'm just making sure we understand all the scenarios and what to expect.

I agree that if, more generally speaking, a user has only consumed to offset 8 and then calls seek(16) to "jump" to a further position, then she needs to be aware that an OORE may be thrown, and she needs to handle it or rely on the reset policy, which should not surprise her.

I'm +1 on the KIP.

Guozhang

On Wed, Jan 24, 2018 at 12:31 AM, Dong Lin <lindon...@gmail.com> wrote:

Yes, in general we cannot prevent OffsetOutOfRangeException if the user seeks to a wrong offset. The main goal is to prevent OffsetOutOfRangeException when the user has done things the right way, e.g. the user knows that there is a message with this offset.

For example, if the user calls seek(..) right after construction, the only reason I can think of is that the user stores offsets externally.
In this case, the user currently needs to use the offset obtained via position(..) from the last run. With this KIP, the user needs to get the offset and the offsetEpoch using positionAndOffsetEpoch(...) and store this information externally. The next time the user starts the consumer, he/she needs to call seek(..., offset, offsetEpoch) right after construction. The KIP should then be able to ensure that we don't throw OffsetOutOfRangeException if there is no unclean leader election.

Does this sound OK?

Regards,
Dong
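[Illustration: a sketch of the flow Dong describes. positionAndOffsetEpoch(...) and seek(partition, offset, offsetEpoch) are the KIP's proposed additions, not the shipped KafkaConsumer API, so they are stubbed as an interface here; the store is a plain map standing in for whatever external system the application uses.]

import java.util.HashMap;
import java.util.Map;

class ExternalOffsetStorage {
    // Stub for the KIP's proposed consumer additions.
    interface EpochAwareConsumer {
        record PositionAndEpoch(long offset, int offsetEpoch) {}
        PositionAndEpoch positionAndOffsetEpoch(String topicPartition);
        void seek(String topicPartition, long offset, int offsetEpoch);
    }

    private final Map<String, EpochAwareConsumer.PositionAndEpoch> store = new HashMap<>();

    // On shutdown: persist the epoch alongside the offset, not the offset alone.
    void checkpoint(EpochAwareConsumer consumer, String tp) {
        store.put(tp, consumer.positionAndOffsetEpoch(tp));
    }

    // On restart: restore both, so a re-created topic or stale leader is
    // detected instead of surfacing as a spurious OffsetOutOfRangeException.
    void restore(EpochAwareConsumer consumer, String tp) {
        EpochAwareConsumer.PositionAndEpoch saved = store.get(tp);
        if (saved != null) {
            consumer.seek(tp, saved.offset(), saved.offsetEpoch());
        }
    }
}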
On Tue, Jan 23, 2018 at 11:44 PM, Guozhang Wang <wangg...@gmail.com> wrote:

"If consumer wants to consume message with offset 16, then consumer must have already fetched message with offset 15"

--> this may not always be true, right? What if the consumer just calls seek(16) after construction and then polls, without any committed offset ever stored before? Admittedly it is rare, but we do not programmatically disallow it.

Guozhang

On Tue, Jan 23, 2018 at 10:42 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Guozhang,

Thanks much for reviewing the KIP!

In the scenario you described, let's assume that broker A has messages with offsets up to 10, and broker B has messages with offsets up to 20. If the consumer wants to consume the message with offset 9, it will not receive OffsetOutOfRangeException from broker A.

If the consumer wants to consume the message with offset 16, then it must have already fetched the message with offset 15, which can only have come from broker B. Because the consumer fetches from broker B only if leaderEpoch = 2, the current consumer leaderEpoch cannot be 1, since this KIP prevents leaderEpoch rewind. Thus we will not get OffsetOutOfRangeException in this case.

Does this address your question, or is there perhaps a more advanced scenario that the KIP does not handle?

Thanks,
Dong
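[Illustration: the "prevents leaderEpoch rewind" step Dong relies on above, as a one-line guard; illustrative, not the KIP's actual client code. The client ignores metadata carrying an older leaderEpoch than it has already observed for the partition, so it cannot be steered back to a previous leader.]

class LeaderEpochGuard {
    static boolean shouldAcceptMetadata(int cachedLeaderEpoch, int receivedLeaderEpoch) {
        // Reject stale metadata: accepting a smaller leaderEpoch could point
        // the consumer back at an old leader whose log ends earlier.
        return receivedLeaderEpoch >= cachedLeaderEpoch;
    }
}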
On Tue, Jan 23, 2018 at 9:43 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Thanks Dong, I made a pass over the wiki and it LGTM.

Just a quick question: can we completely eliminate OffsetOutOfRangeException with this approach? Say there are consecutive leader changes such that the cached metadata's partition epoch is 1, and the metadata fetch response returns partition epoch 2 pointing to leader broker A, while the actual up-to-date metadata has partition epoch 3 whose leader is now broker B. The metadata refresh will still succeed, and the follow-up fetch request may still see an OORE?

Guozhang

On Tue, Jan 23, 2018 at 3:47 PM, Dong Lin <lindon...@gmail.com> wrote:

Hi all,

I would like to start the voting process for KIP-232:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-232%3A+Detect+outdated+metadata+using+leaderEpoch+and+partitionEpoch

The KIP will help fix a concurrency issue in Kafka which can currently cause message loss or message duplication in the consumer.

Regards,
Dong