Hey Matthias, Thanks for checking back on the status. The KIP has been marked as replaced by KIP-320 in the KIP list wiki page and the status has been updated in the discussion and voting email thread.
Thanks, Dong On Sun, 30 Sep 2018 at 11:51 AM Matthias J. Sax <matth...@confluent.io> wrote: > It seems that KIP-320 was accepted. Thus, I am wondering what the status > of this KIP is? > > -Matthias > > On 7/11/18 10:59 AM, Dong Lin wrote: > > Hey Jun, > > > > Certainly. We can discuss later after KIP-320 settles. > > > > Thanks! > > Dong > > > > > > On Wed, Jul 11, 2018 at 8:54 AM, Jun Rao <j...@confluent.io> wrote: > > > >> Hi, Dong, > >> > >> Sorry for the late response. Since KIP-320 is covering some of the > similar > >> problems described in this KIP, perhaps we can wait until KIP-320 > settles > >> and see what's still left uncovered in this KIP. > >> > >> Thanks, > >> > >> Jun > >> > >> On Mon, Jun 4, 2018 at 7:03 PM, Dong Lin <lindon...@gmail.com> wrote: > >> > >>> Hey Jun, > >>> > >>> It seems that we have made considerable progress on the discussion of > >>> KIP-253 since February. Do you think we should continue the discussion > >>> there, or can we continue the voting for this KIP? I am happy to submit > >> the > >>> PR and move forward the progress for this KIP. > >>> > >>> Thanks! > >>> Dong > >>> > >>> > >>> On Wed, Feb 7, 2018 at 11:42 PM, Dong Lin <lindon...@gmail.com> wrote: > >>> > >>>> Hey Jun, > >>>> > >>>> Sure, I will come up with a KIP this week. I think there is a way to > >>> allow > >>>> partition expansion to arbitrary number without introducing new > >> concepts > >>>> such as read-only partition or repartition epoch. > >>>> > >>>> Thanks, > >>>> Dong > >>>> > >>>> On Wed, Feb 7, 2018 at 5:28 PM, Jun Rao <j...@confluent.io> wrote: > >>>> > >>>>> Hi, Dong, > >>>>> > >>>>> Thanks for the reply. The general idea that you had for adding > >>> partitions > >>>>> is similar to what we had in mind. It would be useful to make this > >> more > >>>>> general, allowing adding an arbitrary number of partitions (instead > of > >>>>> just > >>>>> doubling) and potentially removing partitions as well. The following > >> is > >>>>> the > >>>>> high level idea from the discussion with Colin, Jason and Ismael. > >>>>> > >>>>> * To change the number of partitions from X to Y in a topic, the > >>>>> controller > >>>>> marks all existing X partitions as read-only and creates Y new > >>> partitions. > >>>>> The new partitions are writable and are tagged with a higher > >> repartition > >>>>> epoch (RE). > >>>>> > >>>>> * The controller propagates the new metadata to every broker. Once > the > >>>>> leader of a partition is marked as read-only, it rejects the produce > >>>>> requests on this partition. The producer will then refresh the > >> metadata > >>>>> and > >>>>> start publishing to the new writable partitions. > >>>>> > >>>>> * The consumers will then be consuming messages in RE order. The > >>> consumer > >>>>> coordinator will only assign partitions in the same RE to consumers. > >>> Only > >>>>> after all messages in an RE are consumed, will partitions in a higher > >> RE > >>>>> be > >>>>> assigned to consumers. > >>>>> > >>>>> As Colin mentioned, if we do the above, we could potentially (1) use > a > >>>>> globally unique partition id, or (2) use a globally unique topic id > to > >>>>> distinguish recreated partitions due to topic deletion. > >>>>> > >>>>> So, perhaps we can sketch out the re-partitioning KIP a bit more and > >> see > >>>>> if > >>>>> there is any overlap with KIP-232. Would you be interested in doing > >>> that? > >>>>> If not, we can do that next week. 
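To make the repartition-epoch (RE) ordering that Jun sketches above a bit more concrete, here is a minimal, hypothetical Java sketch of the coordinator-side rule "only assign partitions in the same RE, and only move to a higher RE once every lower RE is fully consumed". None of these classes exist in Kafka; the names and fields are invented for illustration only.

```java
import java.util.*;
import java.util.stream.*;

/**
 * Hypothetical sketch of the repartition-epoch (RE) assignment rule discussed
 * above. Not Kafka code; all types and fields are invented for illustration.
 */
class RepartitionEpochAssignment {

    static class PartitionState {
        final int partition;
        final int repartitionEpoch;   // RE tag assigned by the controller
        final long committedOffset;   // consumer group's committed offset
        final long endOffset;         // log end offset (read-only partitions stop growing)

        PartitionState(int partition, int repartitionEpoch, long committedOffset, long endOffset) {
            this.partition = partition;
            this.repartitionEpoch = repartitionEpoch;
            this.committedOffset = committedOffset;
            this.endOffset = endOffset;
        }

        boolean fullyConsumed() {
            return committedOffset >= endOffset;
        }
    }

    /** Partitions the coordinator may hand out right now: only the lowest unfinished RE. */
    static List<PartitionState> assignablePartitions(List<PartitionState> all) {
        OptionalInt lowestUnfinishedRe = all.stream()
                .filter(p -> !p.fullyConsumed())
                .mapToInt(p -> p.repartitionEpoch)
                .min();
        // If every RE is drained, fall back to the highest RE (the writable partitions).
        int targetRe = lowestUnfinishedRe.orElse(
                all.stream().mapToInt(p -> p.repartitionEpoch).max().orElse(0));
        return all.stream()
                .filter(p -> p.repartitionEpoch == targetRe)
                .collect(Collectors.toList());
    }
}
```

The point of the sketch is only the ordering invariant: consumers never see a higher-RE partition while any lower-RE partition still has unconsumed messages.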
> >>>>> > >>>>> Jun > >>>>> > >>>>> > >>>>> On Tue, Feb 6, 2018 at 11:30 AM, Dong Lin <lindon...@gmail.com> > >> wrote: > >>>>> > >>>>>> Hey Jun, > >>>>>> > >>>>>> Interestingly I am also planning to sketch a KIP to allow partition > >>>>>> expansion for keyed topics after this KIP. Since you are already > >> doing > >>>>>> that, I guess I will just share my high level idea here in case it > >> is > >>>>>> helpful. > >>>>>> > >>>>>> The motivation for the KIP is that we currently lose the order guarantee > >>> for > >>>>>> messages with the same key if we expand partitions of a keyed topic. > >>>>>> > >>>>>> The solution can probably be built upon the following ideas: > >>>>>> > >>>>>> - Partition number of the keyed topic should always be doubled (or > >>>>>> multiplied by a power of 2). Given that we select a partition based on > >>>>>> hash(key) % partitionNum, this should help us ensure that a message > >>>>>> assigned to an existing partition will not be mapped to another > >>> existing > >>>>>> partition after partition expansion. > >>>>>> > >>>>>> - Producer includes in the ProduceRequest some information that > >> helps > >>>>>> ensure that messages produced to a partition will monotonically > >>>>> increase in > >>>>>> the partitionNum of the topic. In other words, if the broker receives a > >>>>>> ProduceRequest and notices that the producer does not know the > >>> partition > >>>>>> number has increased, the broker should reject this request. That > >>>>> "information" > >>>>>> may be leaderEpoch, max partitionEpoch of the partitions of the > >> topic, > >>> or > >>>>>> simply partitionNum of the topic. The benefit of this property is > >> that > >>>>> we > >>>>>> can keep the new logic for in-order message consumption entirely in > >>> how > >>>>>> the consumer leader determines the partition -> consumer mapping. > >>>>>> > >>>>>> - When the consumer leader determines the partition -> consumer mapping, the > >>> leader > >>>>>> first reads the start position for each partition using > >>>>> OffsetFetchRequest. > >>>>>> If the start positions are all non-zero, then assignment can be done in > >> its > >>>>>> current manner. The assumption is that a message in the new > >> partition > >>>>>> should only be consumed after all messages with the same key > >> produced > >>>>>> before it have been consumed. Since some messages in the new > >> partition > >>>>> have > >>>>>> been consumed, we should not worry about consuming messages > >>>>> out-of-order. > >>>>>> The benefit of this approach is that we can avoid unnecessary > >>> overhead > >>>>> in > >>>>>> the common case. > >>>>>> > >>>>>> - If the consumer leader finds that the start position for some > >>>>> partition > >>>>>> is 0, say the current partition number is 18 and the partition index > >>> is > >>>>> 12, > >>>>>> then the consumer leader should ensure that messages produced to > >> partition > >>>>> 12 - > >>>>>> 18/2 = 3 before the first message of partition 12 have been consumed, > >> before > >>> it > >>>>>> assigns partition 12 to any consumer in the consumer group. 
Since > >> we > >>>>> have > >>>>>> a piece of "information" that is monotonically increasing per partition, the > >>> consumer > >>>>>> can read the value of this information from the first message in > >>>>> partition > >>>>>> 12, get the offset corresponding to this value in partition 3, > >> assign > >>>>>> partitions except for partition 12 (and probably other new > >> partitions) > >>> to > >>>>>> the existing consumers, wait for the committed offset to go > >> beyond > >>>>> this > >>>>>> offset for partition 3, and trigger rebalance again so that > >> partition > >>> 3 > >>>>> can > >>>>>> be reassigned to some consumer. > >>>>>> > >>>>>> > >>>>>> Thanks, > >>>>>> Dong > >>>>>> > >>>>>> > >>>>>> On Tue, Feb 6, 2018 at 10:10 AM, Jun Rao <j...@confluent.io> wrote: > >>>>>> > >>>>>>> Hi, Dong, > >>>>>>> > >>>>>>> Thanks for the KIP. It looks good overall. We are working on a > >>>>> separate > >>>>>> KIP > >>>>>>> for adding partitions while preserving the ordering guarantees. > >> That > >>>>> may > >>>>>>> require another flavor of partition epoch. It's not very clear > >>> whether > >>>>>> that > >>>>>>> partition epoch can be merged with the partition epoch in this > >> KIP. > >>>>> So, > >>>>>>> perhaps you can wait on this a bit until we post the other KIP in > >>> the > >>>>>> next > >>>>>>> few days. > >>>>>>> > >>>>>>> Jun > >>>>>>> > >>>>>>> > >>>>>>> > >>>>>>> On Mon, Feb 5, 2018 at 2:43 PM, Becket Qin <becket....@gmail.com> > >>>>> wrote: > >>>>>>> > >>>>>>>> +1 on the KIP. > >>>>>>>> > >>>>>>>> I think the KIP is mainly about adding the capability of > >> tracking > >>>>> the > >>>>>>>> system state change lineage. It does not seem necessary to > >> bundle > >>>>> this > >>>>>>> KIP > >>>>>>>> with replacing the topic partition with partition epoch in > >>>>>> produce/fetch. > >>>>>>>> Replacing the topic-partition string with partition epoch is > >>>>> essentially a > >>>>>>>> performance improvement on top of this KIP. That can probably be > >>>>> done > >>>>>>>> separately. > >>>>>>>> > >>>>>>>> Thanks, > >>>>>>>> > >>>>>>>> Jiangjie (Becket) Qin > >>>>>>>> > >>>>>>>> On Mon, Jan 29, 2018 at 11:52 AM, Dong Lin <lindon...@gmail.com > >>> > >>>>>> wrote: > >>>>>>>> > >>>>>>>>> Hey Colin, > >>>>>>>>> > >>>>>>>>> On Mon, Jan 29, 2018 at 11:23 AM, Colin McCabe < > >>>>> cmcc...@apache.org> > >>>>>>>> wrote: > >>>>>>>>> > >>>>>>>>>>> On Mon, Jan 29, 2018 at 10:35 AM, Dong Lin < > >>>>> lindon...@gmail.com> > >>>>>>>>> wrote: > >>>>>>>>>>> > >>>>>>>>>>>> Hey Colin, > >>>>>>>>>>>> > >>>>>>>>>>>> I understand that the KIP will add overhead by > >>> introducing > >>>>>>>>>> per-partition > >>>>>>>>>>>> partitionEpoch. I am open to alternative solutions that > >>> do > >>>>>> not > >>>>>>>>> incur > >>>>>>>>>>>> additional overhead. But I don't see a better way now. > >>>>>>>>>>>> > >>>>>>>>>>>> IMO the overhead in the FetchResponse may not be that > >>> much. > >>>>> We > >>>>>>>>> probably > >>>>>>>>>>>> should discuss the percentage increase rather than the > >>>>> absolute > >>>>>>>>> number > >>>>>>>>>>>> increase. Currently after KIP-227, the per-partition header > >>> has > >>>>> 23 > >>>>>>>> bytes. > >>>>>>>>>> This > >>>>>>>>>>>> KIP adds another 4 bytes. Assume the record size is > >> 10KB, > >>>>> the > >>>>>>>>>> percentage > >>>>>>>>>>>> increase is 4 / (23 + 10000) = 0.03%. It seems > >> negligible, > >>>>>> right? > >>>>>>>>>> > >>>>>>>>>> Hi Dong, > >>>>>>>>>> > >>>>>>>>>> Thanks for the response. 
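Before continuing the quoted exchange, here is a small, self-contained illustration of the partition-doubling property Dong sketches above (partition = hash(key) % partitionNum, partition count doubled, new partition 12 mapping back to partition 12 - 18/2 = 3). This is not Kafka code; Math.floorMod stands in for whatever hashing Kafka's partitioner actually uses, and the class is purely illustrative.

```java
/**
 * Simplified illustration (not Kafka code) of the doubling property discussed
 * above: with partition = hash(key) % partitionCount and a doubled partition
 * count, a key either stays on its old partition i or moves to the single new
 * partition i + oldCount. It never moves between two pre-existing partitions.
 */
final class PartitionDoublingDemo {

    static int partitionFor(int keyHash, int partitionCount) {
        // floorMod keeps the result non-negative even for negative hashes
        return Math.floorMod(keyHash, partitionCount);
    }

    /** Old partition whose keys are now also routed to newPartition, e.g. 12 -> 12 - 18/2 = 3. */
    static int parentPartition(int newPartition, int newPartitionCount) {
        return newPartition - newPartitionCount / 2;
    }

    public static void main(String[] args) {
        int oldCount = 9, newCount = 18;       // matches the 18-partition example above
        for (int keyHash = -50_000; keyHash < 50_000; keyHash++) {
            int before = partitionFor(keyHash, oldCount);
            int after = partitionFor(keyHash, newCount);
            // Either unchanged, or moved to the one new partition "before + oldCount".
            if (after != before && after != before + oldCount) {
                throw new IllegalStateException("doubling property violated for hash " + keyHash);
            }
        }
        System.out.println(parentPartition(12, newCount)); // prints 3
    }
}
```

This is why the consumer leader only ever has to wait on a single "parent" partition (partition 3 in the example) before handing out a new partition (partition 12).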
I agree that the FetchRequest / > >>>>>>> FetchResponse > >>>>>>>>>> overhead should be OK, now that we have incremental fetch > >>>>> requests > >>>>>>> and > >>>>>>>>>> responses. However, there are a lot of cases where the > >>>>> percentage > >>>>>>>>> increase > >>>>>>>>>> is much greater. For example, if a client is doing full > >>>>>>>>> MetadataRequests / > >>>>>>>>>> Responses, we have some math kind of like this per > >> partition: > >>>>>>>>>> > >>>>>>>>>>> UpdateMetadataRequestPartitionState => topic partition > >>>>>>>>> controller_epoch > >>>>>>>>>> leader leader_epoch partition_epoch isr zk_version replicas > >>>>>>>>>> offline_replicas > >>>>>>>>>>> 14 bytes: topic => string (assuming about 10 byte topic > >>>>> names) > >>>>>>>>>>> 4 bytes: partition => int32 > >>>>>>>>>>> 4 bytes: conroller_epoch => int32 > >>>>>>>>>>> 4 bytes: leader => int32 > >>>>>>>>>>> 4 bytes: leader_epoch => int32 > >>>>>>>>>>> +4 EXTRA bytes: partition_epoch => int32 <-- NEW > >>>>>>>>>>> 2+4+4+4 bytes: isr => [int32] (assuming 3 in the ISR) > >>>>>>>>>>> 4 bytes: zk_version => int32 > >>>>>>>>>>> 2+4+4+4 bytes: replicas => [int32] (assuming 3 replicas) > >>>>>>>>>>> 2 offline_replicas => [int32] (assuming no offline > >>> replicas) > >>>>>>>>>> > >>>>>>>>>> Assuming I added that up correctly, the per-partition > >> overhead > >>>>> goes > >>>>>>>> from > >>>>>>>>>> 64 bytes per partition to 68, a 6.2% increase. > >>>>>>>>>> > >>>>>>>>>> We could do similar math for a lot of the other RPCs. And > >> you > >>>>> will > >>>>>>>> have > >>>>>>>>> a > >>>>>>>>>> similar memory and garbage collection impact on the brokers > >>>>> since > >>>>>> you > >>>>>>>>> have > >>>>>>>>>> to store all this extra state as well. > >>>>>>>>>> > >>>>>>>>> > >>>>>>>>> That is correct. IMO the Metadata is only updated periodically > >>>>> and is > >>>>>>>>> probably not a big deal if we increase it by 6%. The > >>> FetchResponse > >>>>>> and > >>>>>>>>> ProduceRequest are probably the only requests that are bounded > >>> by > >>>>> the > >>>>>>>>> bandwidth throughput. > >>>>>>>>> > >>>>>>>>> > >>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> I agree that we can probably save more space by using > >>>>> partition > >>>>>>> ID > >>>>>>>> so > >>>>>>>>>> that > >>>>>>>>>>>> we no longer needs the string topic name. The similar > >> idea > >>>>> has > >>>>>>> also > >>>>>>>>>> been > >>>>>>>>>>>> put in the Rejected Alternative section in KIP-227. > >> While > >>>>> this > >>>>>>> idea > >>>>>>>>> is > >>>>>>>>>>>> promising, it seems orthogonal to the goal of this KIP. > >>>>> Given > >>>>>>> that > >>>>>>>>>> there is > >>>>>>>>>>>> already many work to do in this KIP, maybe we can do the > >>>>>>> partition > >>>>>>>> ID > >>>>>>>>>> in a > >>>>>>>>>>>> separate KIP? > >>>>>>>>>> > >>>>>>>>>> I guess my thinking is that the goal here is to replace an > >>>>>> identifier > >>>>>>>>>> which can be re-used (the tuple of topic name, partition ID) > >>>>> with > >>>>>> an > >>>>>>>>>> identifier that cannot be re-used (the tuple of topic name, > >>>>>> partition > >>>>>>>> ID, > >>>>>>>>>> partition epoch) in order to gain better semantics. As long > >>> as > >>>>> we > >>>>>>> are > >>>>>>>>>> replacing the identifier, why not replace it with an > >>> identifier > >>>>>> that > >>>>>>>> has > >>>>>>>>>> important performance advantages? The KIP freeze for the > >> next > >>>>>>> release > >>>>>>>>> has > >>>>>>>>>> already passed, so there is time to do this. 
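Purely as a check on the arithmetic quoted above (and assuming the same field sizes Colin assumes: ~10 byte topic names, 3 replicas, 3 ISR members, no offline replicas), the per-partition UpdateMetadata overhead can be tallied as follows. This only re-does the math from the message; it is not Kafka's actual serialization code.

```java
/** Re-tallying the per-partition byte figures quoted above. */
public class PerPartitionOverheadMath {
    public static void main(String[] args) {
        int topic = 14, partition = 4, controllerEpoch = 4, leader = 4, leaderEpoch = 4;
        int isr = 2 + 4 + 4 + 4, zkVersion = 4, replicas = 2 + 4 + 4 + 4, offlineReplicas = 2;
        int before = topic + partition + controllerEpoch + leader + leaderEpoch
                + isr + zkVersion + replicas + offlineReplicas;   // 64 bytes today
        int after = before + 4;                                   // + 4-byte partition_epoch
        System.out.printf("%d -> %d bytes, %.2f%% increase%n",
                before, after, 100.0 * 4 / before);               // 64 -> 68 bytes, 6.25% (~6.2% as quoted)
    }
}
```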
> >>>>>>>>>> > >>>>>>>>> > >>>>>>>>> In general it can be easier for discussion and implementation > >> if > >>>>> we > >>>>>> can > >>>>>>>>> split a larger task into smaller and independent tasks. For > >>>>> example, > >>>>>>>>> KIP-112 and KIP-113 both deal with JBOD support. KIP-31, > >>>>> KIP-32 > >>>>>>> and > >>>>>>>>> KIP-33 are about timestamp support. The opinion on this can be > >>>>> subjective > >>>>>>>>> though. > >>>>>>>>> > >>>>>>>>> IMO the change to switch from (topic, partition ID) to > >>>>> partitionEpoch > >>>>>> in > >>>>>>>> all > >>>>>>>>> requests/responses requires us to go through all requests one > >> by > >>>>> one. > >>>>>>> It > >>>>>>>>> may not be hard but it can be time-consuming and tedious. At > >>> a high > >>>>>> level > >>>>>>>> the > >>>>>>>>> goal and the change for that will be orthogonal to the changes > >>>>>> required > >>>>>>>> in > >>>>>>>>> this KIP. That is the main reason I think we can split them > >> into > >>>>> two > >>>>>>>> KIPs. > >>>>>>>>> > >>>>>>>>> > >>>>>>>>>> On Mon, Jan 29, 2018, at 10:54, Dong Lin wrote: > >>>>>>>>>>> I think it is possible to move to entirely use > >>> partitionEpoch > >>>>>>> instead > >>>>>>>>> of > >>>>>>>>>>> (topic, partition) to identify a partition. Client can > >>> obtain > >>>>> the > >>>>>>>>>>> partitionEpoch -> (topic, partition) mapping from > >>>>>> MetadataResponse. > >>>>>>>> We > >>>>>>>>>>> probably need to figure out a way to assign partitionEpoch > >>> to > >>>>>>>> existing > >>>>>>>>>>> partitions in the cluster. But this should be doable. > >>>>>>>>>>> > >>>>>>>>>>> This is a good idea. I think it will save us some space in > >>> the > >>>>>>>>>>> request/response. The actual space saving in percentage > >>>>> probably > >>>>>>>>> depends > >>>>>>>>>> on > >>>>>>>>>>> the amount of data and the number of partitions of the > >> same > >>>>>> topic. > >>>>>>> I > >>>>>>>>> just > >>>>>>>>>>> think we can do it in a separate KIP. > >>>>>>>>>> > >>>>>>>>>> Hmm. How much extra work would be required? It seems like > >> we > >>>>> are > >>>>>>>>> already > >>>>>>>>>> changing almost every RPC that involves topics and > >> partitions, > >>>>>>> already > >>>>>>>>>> adding new per-partition state to ZooKeeper, already > >> changing > >>>>> how > >>>>>>>> clients > >>>>>>>>>> interact with partitions. Is there some other big piece of > >>> work > >>>>>> we'd > >>>>>>>>> have > >>>>>>>>>> to do to move to partition IDs that we wouldn't need for > >>>>> partition > >>>>>>>>> epochs? > >>>>>>>>>> I guess we'd have to find a way to support regular > >>>>> expression-based > >>>>>>>> topic > >>>>>>>>>> subscriptions. If we split this into multiple KIPs, > >> wouldn't > >>> we > >>>>>> end > >>>>>>> up > >>>>>>>>>> changing all those RPCs and ZK state a second time? Also, > >> I'm > >>>>>> curious > >>>>>>>> if > >>>>>>>>>> anyone has done any proof of concept GC, memory, and network > >>>>> usage > >>>>>>>>>> measurements on switching topic names for topic IDs. > >>>>>>>>>> > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> We will need to go over all requests/responses to check how to > >>>>>> replace > >>>>>>>>> (topic, partition ID) with partition epoch. It requires > >>>>> non-trivial > >>>>>>> work > >>>>>>>>> and could take time. As you mentioned, we may want to see how > >>> much > >>>>>>> saving > >>>>>>>>> we can get by switching from topic names to partition epoch. > >>> That > >>>>>>> itself > >>>>>>>>> requires time and experimentation. 
It seems that the new idea does > >>> not > >>>>>>>> rollback > >>>>>>>>> any change proposed in this KIP. So I am not sure we can get > >>> much > >>>>> by > >>>>>>>>> putting them into the same KIP. > >>>>>>>>> > >>>>>>>>> Anyway, if more people are interested in seeing the new idea > >> in > >>>>> the > >>>>>>> same > >>>>>>>>> KIP, I can try that. > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> best, > >>>>>>>>>> Colin > >>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> On Mon, Jan 29, 2018 at 10:18 AM, Colin McCabe < > >>>>>>> cmcc...@apache.org > >>>>>>>>> > >>>>>>>>>> wrote: > >>>>>>>>>>>> > >>>>>>>>>>>>> On Fri, Jan 26, 2018, at 12:17, Dong Lin wrote: > >>>>>>>>>>>>>> Hey Colin, > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> On Fri, Jan 26, 2018 at 10:16 AM, Colin McCabe < > >>>>>>>>> cmcc...@apache.org> > >>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>> > >>>>>>>>>>>>>>> On Thu, Jan 25, 2018, at 16:47, Dong Lin wrote: > >>>>>>>>>>>>>>>> Hey Colin, > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Thanks for the comment. > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> On Thu, Jan 25, 2018 at 4:15 PM, Colin McCabe < > >>>>>>>>>> cmcc...@apache.org> > >>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> On Wed, Jan 24, 2018, at 21:07, Dong Lin wrote: > >>>>>>>>>>>>>>>>>> Hey Colin, > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Thanks for reviewing the KIP. > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> If I understand you right, you maybe > >> suggesting > >>>>> that > >>>>>>> we > >>>>>>>>> can > >>>>>>>>>> use > >>>>>>>>>>>>> a > >>>>>>>>>>>>>>> global > >>>>>>>>>>>>>>>>>> metadataEpoch that is incremented every time > >>>>>>> controller > >>>>>>>>>> updates > >>>>>>>>>>>>>>> metadata. > >>>>>>>>>>>>>>>>>> The problem with this solution is that, if a > >>>>> topic > >>>>>> is > >>>>>>>>>> deleted > >>>>>>>>>>>>> and > >>>>>>>>>>>>>>> created > >>>>>>>>>>>>>>>>>> again, user will not know whether that the > >>> offset > >>>>>>> which > >>>>>>>> is > >>>>>>>>>>>>> stored > >>>>>>>>>>>>>>> before > >>>>>>>>>>>>>>>>>> the topic deletion is no longer valid. This > >>>>>> motivates > >>>>>>>> the > >>>>>>>>>> idea > >>>>>>>>>>>>> to > >>>>>>>>>>>>>>> include > >>>>>>>>>>>>>>>>>> per-partition partitionEpoch. Does this sound > >>>>>>>> reasonable? > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Hi Dong, > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> Perhaps we can store the last valid offset of > >>> each > >>>>>>> deleted > >>>>>>>>>> topic > >>>>>>>>>>>>> in > >>>>>>>>>>>>>>>>> ZooKeeper. Then, when a topic with one of > >> those > >>>>> names > >>>>>>>> gets > >>>>>>>>>>>>>>> re-created, we > >>>>>>>>>>>>>>>>> can start the topic at the previous end offset > >>>>> rather > >>>>>>> than > >>>>>>>>> at > >>>>>>>>>> 0. > >>>>>>>>>>>>> This > >>>>>>>>>>>>>>>>> preserves immutability. It is no more > >> burdensome > >>>>> than > >>>>>>>>> having > >>>>>>>>>> to > >>>>>>>>>>>>>>> preserve a > >>>>>>>>>>>>>>>>> "last epoch" for the deleted partition > >> somewhere, > >>>>>> right? > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> My concern with this solution is that the number > >> of > >>>>>>>> zookeeper > >>>>>>>>>> nodes > >>>>>>>>>>>>> get > >>>>>>>>>>>>>>>> more and more over time if some users keep > >> deleting > >>>>> and > >>>>>>>>> creating > >>>>>>>>>>>>> topics. > >>>>>>>>>>>>>>> Do > >>>>>>>>>>>>>>>> you think this can be a problem? 
> >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Hi Dong, > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> We could expire the "partition tombstones" after an > >>>>> hour > >>>>>> or > >>>>>>>> so. > >>>>>>>>>> In > >>>>>>>>>>>>>>> practice this would solve the issue for clients > >> that > >>>>> like > >>>>>> to > >>>>>>>>>> destroy > >>>>>>>>>>>>> and > >>>>>>>>>>>>>>> re-create topics all the time. In any case, > >> doesn't > >>>>> the > >>>>>>>> current > >>>>>>>>>>>>> proposal > >>>>>>>>>>>>>>> add per-partition znodes as well that we have to > >>> track > >>>>>> even > >>>>>>>>> after > >>>>>>>>>> the > >>>>>>>>>>>>>>> partition is deleted? Or did I misunderstand that? > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Actually the current KIP does not add per-partition > >>>>> znodes. > >>>>>>>> Could > >>>>>>>>>> you > >>>>>>>>>>>>>> double check? I can fix the KIP wiki if there is > >>> anything > >>>>>>>>>> misleading. > >>>>>>>>>>>>> > >>>>>>>>>>>>> Hi Dong, > >>>>>>>>>>>>> > >>>>>>>>>>>>> I double-checked the KIP, and I can see that you are in > >>>>> fact > >>>>>>>> using a > >>>>>>>>>>>>> global counter for initializing partition epochs. So, > >>> you > >>>>> are > >>>>>>>>>> correct, it > >>>>>>>>>>>>> doesn't add per-partition znodes for partitions that no > >>>>> longer > >>>>>>>>> exist. > >>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> If we expire the "partition tomstones" after an hour, > >>> and > >>>>>> the > >>>>>>>>> topic > >>>>>>>>>> is > >>>>>>>>>>>>>> re-created after more than an hour since the topic > >>>>> deletion, > >>>>>>>> then > >>>>>>>>>> we are > >>>>>>>>>>>>>> back to the situation where user can not tell whether > >>> the > >>>>>>> topic > >>>>>>>>> has > >>>>>>>>>> been > >>>>>>>>>>>>>> re-created or not, right? > >>>>>>>>>>>>> > >>>>>>>>>>>>> Yes, with an expiration period, it would not ensure > >>>>>>> immutability-- > >>>>>>>>> you > >>>>>>>>>>>>> could effectively reuse partition names and they would > >>> look > >>>>>> the > >>>>>>>>> same. > >>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> It's not really clear to me what should happen > >> when a > >>>>>> topic > >>>>>>> is > >>>>>>>>>>>>> destroyed > >>>>>>>>>>>>>>> and re-created with new data. Should consumers > >>>>> continue > >>>>>> to > >>>>>>> be > >>>>>>>>>> able to > >>>>>>>>>>>>>>> consume? We don't know where they stopped > >> consuming > >>>>> from > >>>>>>> the > >>>>>>>>>> previous > >>>>>>>>>>>>>>> incarnation of the topic, so messages may have been > >>>>> lost. > >>>>>>>>>> Certainly > >>>>>>>>>>>>>>> consuming data from offset X of the new incarnation > >>> of > >>>>> the > >>>>>>>> topic > >>>>>>>>>> may > >>>>>>>>>>>>> give > >>>>>>>>>>>>>>> something totally different from what you would > >> have > >>>>>> gotten > >>>>>>>> from > >>>>>>>>>>>>> offset X > >>>>>>>>>>>>>>> of the previous incarnation of the topic. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> With the current KIP, if a consumer consumes a topic > >>>>> based > >>>>>> on > >>>>>>>> the > >>>>>>>>>> last > >>>>>>>>>>>>>> remembered (offset, partitionEpoch, leaderEpoch), and > >>> if > >>>>> the > >>>>>>>> topic > >>>>>>>>>> is > >>>>>>>>>>>>>> re-created, consume will throw > >>>>>> InvalidPartitionEpochException > >>>>>>>>>> because > >>>>>>>>>>>>> the > >>>>>>>>>>>>>> previous partitionEpoch will be different from the > >>>>> current > >>>>>>>>>>>>> partitionEpoch. 
> >>>>>>>>>>>>>> This is described in the Proposed Changes -> > >>> Consumption > >>>>>> after > >>>>>>>>> topic > >>>>>>>>>>>>>> deletion in the KIP. I can improve the KIP if there > >> is > >>>>>>> anything > >>>>>>>>> not > >>>>>>>>>>>>> clear. > >>>>>>>>>>>>> > >>>>>>>>>>>>> Thanks for the clarification. It sounds like what you > >>>>> really > >>>>>>> want > >>>>>>>>> is > >>>>>>>>>>>>> immutability-- i.e., to never "really" reuse partition > >>>>>>>> identifiers. > >>>>>>>>>> And > >>>>>>>>>>>>> you do this by making the partition name no longer the > >>>>> "real" > >>>>>>>>>> identifier. > >>>>>>>>>>>>> > >>>>>>>>>>>>> My big concern about this KIP is that it seems like an > >>>>>>>>>> anti-scalability > >>>>>>>>>>>>> feature. Now we are adding 4 extra bytes for every > >>>>> partition > >>>>>> in > >>>>>>>> the > >>>>>>>>>>>>> FetchResponse and Request, for example. That could be > >> 40 > >>>>> kb > >>>>>> per > >>>>>>>>>> request, > >>>>>>>>>>>>> if the user has 10,000 partitions. And of course, the > >>> KIP > >>>>>> also > >>>>>>>>> makes > >>>>>>>>>>>>> massive changes to UpdateMetadataRequest, > >>> MetadataResponse, > >>>>>>>>>>>>> OffsetCommitRequest, OffsetFetchResponse, > >>>>> LeaderAndIsrRequest, > >>>>>>>>>>>>> ListOffsetResponse, etc. which will also increase their > >>>>> size > >>>>>> on > >>>>>>>> the > >>>>>>>>>> wire > >>>>>>>>>>>>> and in memory. > >>>>>>>>>>>>> > >>>>>>>>>>>>> One thing that we talked a lot about in the past is > >>>>> replacing > >>>>>>>>>> partition > >>>>>>>>>>>>> names with IDs. IDs have a lot of really nice > >> features. > >>>>> They > >>>>>>>> take > >>>>>>>>>> up much > >>>>>>>>>>>>> less space in memory than strings (especially 2-byte > >> Java > >>>>>>>> strings). > >>>>>>>>>> They > >>>>>>>>>>>>> can often be allocated on the stack rather than the > >> heap > >>>>>>>> (important > >>>>>>>>>> when > >>>>>>>>>>>>> you are dealing with hundreds of thousands of them). > >>> They > >>>>> can > >>>>>>> be > >>>>>>>>>>>>> efficiently deserialized and serialized. If we use > >>> 64-bit > >>>>>> ones, > >>>>>>>> we > >>>>>>>>>> will > >>>>>>>>>>>>> never run out of IDs, which means that they can always > >> be > >>>>>> unique > >>>>>>>> per > >>>>>>>>>>>>> partition. > >>>>>>>>>>>>> > >>>>>>>>>>>>> Given that the partition name is no longer the "real" > >>>>>> identifier > >>>>>>>> for > >>>>>>>>>>>>> partitions in the current KIP-232 proposal, why not > >> just > >>>>> move > >>>>>> to > >>>>>>>>> using > >>>>>>>>>>>>> partition IDs entirely instead of strings? You have to > >>>>> change > >>>>>>> all > >>>>>>>>> the > >>>>>>>>>>>>> messages anyway. There isn't much point any more to > >>>>> carrying > >>>>>>>> around > >>>>>>>>>> the > >>>>>>>>>>>>> partition name in every RPC, since you really need > >> (name, > >>>>>> epoch) > >>>>>>>> to > >>>>>>>>>>>>> identify the partition. > >>>>>>>>>>>>> Probably the metadata response and a few other messages > >>>>> would > >>>>>>> have > >>>>>>>>> to > >>>>>>>>>>>>> still carry the partition name, to allow clients to go > >>> from > >>>>>> name > >>>>>>>> to > >>>>>>>>>> id. > >>>>>>>>>>>>> But we could mostly forget about the strings. And then > >>>>> this > >>>>>>> would > >>>>>>>>> be > >>>>>>>>>> a > >>>>>>>>>>>>> scalability improvement rather than a scalability > >>> problem. 
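To make the "epoch-qualified identifier" idea being debated here concrete, the following is a minimal, hypothetical sketch of what a client gains when a committed position carries the partitionEpoch along with (topic, partition, offset): a mismatch against fresh metadata signals that the partition was re-created, which is the situation where KIP-232 proposes raising something like InvalidPartitionEpochException. The class and exception used below are invented for illustration and are not the KIP's actual implementation.

```java
import java.util.Objects;

/**
 * Hypothetical sketch: a stored consumer position qualified by the
 * partitionEpoch observed when the offset was committed. Not Kafka code.
 */
final class EpochQualifiedPosition {
    final String topic;
    final int partition;
    final int partitionEpoch;   // epoch observed when the offset was committed
    final long offset;

    EpochQualifiedPosition(String topic, int partition, int partitionEpoch, long offset) {
        this.topic = Objects.requireNonNull(topic);
        this.partition = partition;
        this.partitionEpoch = partitionEpoch;
        this.offset = offset;
    }

    /** Validate the stored position against the partitionEpoch from the latest metadata. */
    long validatedOffset(int currentPartitionEpoch) {
        if (currentPartitionEpoch != partitionEpoch) {
            // The topic-partition was deleted and re-created since this offset was
            // stored; offsets from the old incarnation must not be reused.
            throw new IllegalStateException("Partition " + topic + "-" + partition
                    + " was re-created (epoch " + partitionEpoch + " -> " + currentPartitionEpoch
                    + "); stored offset " + offset + " is no longer valid");
        }
        return offset;
    }
}
```

Whether the identifier is (topic name, partition, partitionEpoch) as in the KIP or a compact numeric partition ID as Colin suggests, the validation step itself looks the same; the debate above is about the wire and memory cost of the strings, not about this check.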
> >>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>>> By choosing to reuse the same (topic, partition, > >>>>> offset) > >>>>>>>>> 3-tuple, > >>>>>>>>>> we > >>>>>>>>>>>>> have > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> chosen to give up immutability. That was a really > >> bad > >>>>>>> decision. > >>>>>>>>>> And > >>>>>>>>>>>>> now > >>>>>>>>>>>>>>> we have to worry about time dependencies, stale > >>> cached > >>>>>> data, > >>>>>>>> and > >>>>>>>>>> all > >>>>>>>>>>>>> the > >>>>>>>>>>>>>>> rest. We can't completely fix this inside Kafka no > >>>>> matter > >>>>>>>> what > >>>>>>>>>> we do, > >>>>>>>>>>>>>>> because not all that cached data is inside Kafka > >>>>> itself. > >>>>>>> Some > >>>>>>>>> of > >>>>>>>>>> it > >>>>>>>>>>>>> may be > >>>>>>>>>>>>>>> in systems that Kafka has sent data to, such as > >> other > >>>>>>> daemons, > >>>>>>>>> SQL > >>>>>>>>>>>>>>> databases, streams, and so forth. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> The current KIP will uniquely identify a message > >> using > >>>>>> (topic, > >>>>>>>>>>>>> partition, > >>>>>>>>>>>>>> offset, partitionEpoch) 4-tuple. This addresses the > >>>>> message > >>>>>>>>>> immutability > >>>>>>>>>>>>>> issue that you mentioned. Is there any corner case > >>> where > >>>>> the > >>>>>>>>> message > >>>>>>>>>>>>>> immutability is still not preserved with the current > >>> KIP? > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> I guess the idea here is that mirror maker should > >>> work > >>>>> as > >>>>>>>>> expected > >>>>>>>>>>>>> when > >>>>>>>>>>>>>>> users destroy a topic and re-create it with the > >> same > >>>>> name. > >>>>>>>>> That's > >>>>>>>>>>>>> kind of > >>>>>>>>>>>>>>> tough, though, since in that scenario, mirror maker > >>>>>> probably > >>>>>>>>>> should > >>>>>>>>>>>>> destroy > >>>>>>>>>>>>>>> and re-create the topic on the other end, too, > >> right? > >>>>>>>>> Otherwise, > >>>>>>>>>>>>> what you > >>>>>>>>>>>>>>> end up with on the other end could be half of one > >>>>>>> incarnation > >>>>>>>> of > >>>>>>>>>> the > >>>>>>>>>>>>> topic, > >>>>>>>>>>>>>>> and half of another. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> What mirror maker really needs is to be able to > >>> follow > >>>>> a > >>>>>>>> stream > >>>>>>>>> of > >>>>>>>>>>>>> events > >>>>>>>>>>>>>>> about the kafka cluster itself. We could have some > >>>>> master > >>>>>>>> topic > >>>>>>>>>>>>> which is > >>>>>>>>>>>>>>> always present and which contains data about all > >>> topic > >>>>>>>>> deletions, > >>>>>>>>>>>>>>> creations, etc. Then MM can simply follow this > >> topic > >>>>> and > >>>>>> do > >>>>>>>>> what > >>>>>>>>>> is > >>>>>>>>>>>>> needed. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Then the next question maybe, should we use a > >>>>> global > >>>>>>>>>>>>> metadataEpoch + > >>>>>>>>>>>>>>>>>> per-partition partitionEpoch, instead of > >> using > >>>>>>>>> per-partition > >>>>>>>>>>>>>>> leaderEpoch > >>>>>>>>>>>>>>>>> + > >>>>>>>>>>>>>>>>>> per-partition leaderEpoch. The former > >> solution > >>>>> using > >>>>>>>>>>>>> metadataEpoch > >>>>>>>>>>>>>>> would > >>>>>>>>>>>>>>>>>> not work due to the following scenario > >>> (provided > >>>>> by > >>>>>>>> Jun): > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> "Consider the following scenario. In metadata > >>> v1, > >>>>>> the > >>>>>>>>> leader > >>>>>>>>>>>>> for a > >>>>>>>>>>>>>>>>>> partition is at broker 1. 
In metadata v2, > >>> leader > >>>>> is > >>>>>> at > >>>>>>>>>> broker > >>>>>>>>>>>>> 2. In > >>>>>>>>>>>>>>>>>> metadata v3, leader is at broker 1 again. The > >>>>> last > >>>>>>>>> committed > >>>>>>>>>>>>> offset > >>>>>>>>>>>>>>> in > >>>>>>>>>>>>>>>>> v1, > >>>>>>>>>>>>>>>>>> v2 and v3 are 10, 20 and 30, respectively. A > >>>>>> consumer > >>>>>>> is > >>>>>>>>>>>>> started and > >>>>>>>>>>>>>>>>> reads > >>>>>>>>>>>>>>>>>> metadata v1 and reads messages from offset 0 > >> to > >>>>> 25 > >>>>>>> from > >>>>>>>>>> broker > >>>>>>>>>>>>> 1. My > >>>>>>>>>>>>>>>>>> understanding is that in the current > >> proposal, > >>>>> the > >>>>>>>>> metadata > >>>>>>>>>>>>> version > >>>>>>>>>>>>>>>>>> associated with offset 25 is v1. The consumer > >>> is > >>>>>> then > >>>>>>>>>> restarted > >>>>>>>>>>>>> and > >>>>>>>>>>>>>>>>> fetches > >>>>>>>>>>>>>>>>>> metadata v2. The consumer tries to read from > >>>>> broker > >>>>>> 2, > >>>>>>>>>> which is > >>>>>>>>>>>>> the > >>>>>>>>>>>>>>> old > >>>>>>>>>>>>>>>>>> leader with the last offset at 20. In this > >>> case, > >>>>> the > >>>>>>>>>> consumer > >>>>>>>>>>>>> will > >>>>>>>>>>>>>>> still > >>>>>>>>>>>>>>>>>> get OffsetOutOfRangeException incorrectly." > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Regarding your comment "For the second > >> purpose, > >>>>> this > >>>>>>> is > >>>>>>>>>> "soft > >>>>>>>>>>>>> state" > >>>>>>>>>>>>>>>>>> anyway. If the client thinks X is the leader > >>>>> but Y > >>>>>> is > >>>>>>>>>> really > >>>>>>>>>>>>> the > >>>>>>>>>>>>>>> leader, > >>>>>>>>>>>>>>>>>> the client will talk to X, and X will point > >> out > >>>>> its > >>>>>>>>> mistake > >>>>>>>>>> by > >>>>>>>>>>>>>>> sending > >>>>>>>>>>>>>>>>> back > >>>>>>>>>>>>>>>>>> a NOT_LEADER_FOR_PARTITION.", it is probably > >> no > >>>>>> true. > >>>>>>>> The > >>>>>>>>>>>>> problem > >>>>>>>>>>>>>>> here is > >>>>>>>>>>>>>>>>>> that the old leader X may still think it is > >> the > >>>>>> leader > >>>>>>>> of > >>>>>>>>>> the > >>>>>>>>>>>>>>> partition > >>>>>>>>>>>>>>>>> and > >>>>>>>>>>>>>>>>>> thus it will not send back > >>>>> NOT_LEADER_FOR_PARTITION. > >>>>>>> The > >>>>>>>>>> reason > >>>>>>>>>>>>> is > >>>>>>>>>>>>>>>>> provided > >>>>>>>>>>>>>>>>>> in KAFKA-6262. Can you check if that makes > >>> sense? > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> This is solvable with a timeout, right? If the > >>>>> leader > >>>>>>>> can't > >>>>>>>>>>>>>>> communicate > >>>>>>>>>>>>>>>>> with the controller for a certain period of > >> time, > >>>>> it > >>>>>>>> should > >>>>>>>>>> stop > >>>>>>>>>>>>>>> acting as > >>>>>>>>>>>>>>>>> the leader. We have to solve this problem, > >>>>> anyway, in > >>>>>>>> order > >>>>>>>>>> to > >>>>>>>>>>>>> fix > >>>>>>>>>>>>>>> all the > >>>>>>>>>>>>>>>>> corner cases. > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> Not sure if I fully understand your proposal. The > >>>>>> proposal > >>>>>>>>>> seems to > >>>>>>>>>>>>>>> require > >>>>>>>>>>>>>>>> non-trivial changes to our existing leadership > >>>>> election > >>>>>>>>>> mechanism. > >>>>>>>>>>>>> Could > >>>>>>>>>>>>>>>> you provide more detail regarding how it works? > >> For > >>>>>>> example, > >>>>>>>>> how > >>>>>>>>>>>>> should > >>>>>>>>>>>>>>>> user choose this timeout, how leader determines > >>>>> whether > >>>>>> it > >>>>>>>> can > >>>>>>>>>> still > >>>>>>>>>>>>>>>> communicate with controller, and how this > >> triggers > >>>>>>>> controller > >>>>>>>>> to > >>>>>>>>>>>>> elect > >>>>>>>>>>>>>>> new > >>>>>>>>>>>>>>>> leader? 
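For readers following the timeout discussion, the generic idea Colin is alluding to is a lease: a leader that has not heard from the controller within some bound stops accepting writes, so the controller can safely re-elect once that bound has passed. The sketch below is only meant to make that idea concrete; it is not how Kafka brokers work today, and every name in it is invented.

```java
import java.util.concurrent.TimeUnit;

/**
 * Generic lease-style fencing sketch of the "timeout" idea discussed above.
 * NOT Kafka's actual leader-election mechanism; illustration only.
 */
final class LeaderLease {
    private final long leaseMillis;
    private volatile long lastControllerContactMillis;

    LeaderLease(long lease, TimeUnit unit) {
        this.leaseMillis = unit.toMillis(lease);
        this.lastControllerContactMillis = System.currentTimeMillis();
    }

    /** Called whenever a controller request (e.g. LeaderAndIsr-style) is received. */
    void onControllerContact() {
        lastControllerContactMillis = System.currentTimeMillis();
    }

    /** A leader would check this before accepting a produce request. */
    boolean mayActAsLeader() {
        return System.currentTimeMillis() - lastControllerContactMillis <= leaseMillis;
    }
}
```

Dong's questions below (how to pick the timeout, how the leader knows it has lost the controller, and how the controller knows the old lease has expired before electing a new leader) are exactly the parts this toy sketch leaves open, which is why the thread treats it as non-trivial.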
> >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Before I come up with any proposal, let me make > >> sure > >>> I > >>>>>>>>> understand > >>>>>>>>>> the > >>>>>>>>>>>>>>> problem correctly. My big question was, what > >>> prevents > >>>>>>>>> split-brain > >>>>>>>>>>>>> here? > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Let's say I have a partition which is on nodes A, > >> B, > >>>>> and > >>>>>> C, > >>>>>>>> with > >>>>>>>>>>>>> min-ISR > >>>>>>>>>>>>>>> 2. The controller is D. At some point, there is a > >>>>>> network > >>>>>>>>>> partition > >>>>>>>>>>>>>>> between A and B and the rest of the cluster. The > >>>>>> Controller > >>>>>>>>>>>>> re-assigns the > >>>>>>>>>>>>>>> partition to nodes C, D, and E. But A and B keep > >>>>> chugging > >>>>>>>> away, > >>>>>>>>>> even > >>>>>>>>>>>>>>> though they can no longer communicate with the > >>>>> controller. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> At some point, a client with stale metadata writes > >> to > >>>>> the > >>>>>>>>>> partition. > >>>>>>>>>>>>> It > >>>>>>>>>>>>>>> still thinks the partition is on node A, B, and C, > >> so > >>>>>> that's > >>>>>>>>>> where it > >>>>>>>>>>>>> sends > >>>>>>>>>>>>>>> the data. It's unable to talk to C, but A and B > >>> reply > >>>>>> back > >>>>>>>> that > >>>>>>>>>> all > >>>>>>>>>>>>> is > >>>>>>>>>>>>>>> well. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Is this not a case where we could lose data due to > >>>>> split > >>>>>>>> brain? > >>>>>>>>>> Or is > >>>>>>>>>>>>>>> there a mechanism for preventing this that I > >> missed? > >>>>> If > >>>>>> it > >>>>>>>> is, > >>>>>>>>> it > >>>>>>>>>>>>> seems > >>>>>>>>>>>>>>> like a pretty serious failure case that we should > >> be > >>>>>>> handling > >>>>>>>>>> with our > >>>>>>>>>>>>>>> metadata rework. And I think epoch numbers and > >>>>> timeouts > >>>>>>> might > >>>>>>>>> be > >>>>>>>>>>>>> part of > >>>>>>>>>>>>>>> the solution. > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Right, split brain can happen if RF=4 and minIsr=2. > >>>>>> However, I > >>>>>>>> am > >>>>>>>>>> not > >>>>>>>>>>>>> sure > >>>>>>>>>>>>>> it is a pretty serious issue which we need to address > >>>>> today. > >>>>>>>> This > >>>>>>>>>> can be > >>>>>>>>>>>>>> prevented by configuring the Kafka topic so that minIsr > RF/2. > >>>>>>>>>>>>> Actually, > >>>>>>>>>>>>>> if user sets minIsr=2, is there any reason that > >>> user > >>>>>>> wants > >>>>>>>> to > >>>>>>>>>> set > >>>>>>>>>>>>> RF=4 > >>>>>>>>>>>>>> instead of 3? > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Introducing a timeout in the leader election mechanism is > >>>>>>>> non-trivial. I > >>>>>>>>>>>>> think we > >>>>>>>>>>>>>> probably want to do that only if there is a good > >> use case > >>>>> that > >>>>>>> cannot > >>>>>>>>>>>>>> otherwise be addressed with the current mechanism. > >>>>>>>>>>>>> > >>>>>>>>>>>>> I still would like to think about these corner cases > >>> more. > >>>>>> But > >>>>>>>>>> perhaps > >>>>>>>>>>>>> it's not directly related to this KIP. 
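The "minIsr > RF/2" argument above can be checked with a one-line inequality: two disjoint groups of replicas can each independently satisfy min.insync.replicas only if 2 * minIsr <= RF. The snippet below is an illustration of that arithmetic, not Kafka code.

```java
/**
 * Quick check of the minIsr > RF/2 argument above: split-brain writes require
 * two disjoint replica sets that each still reach minIsr. Illustration only.
 */
public class SplitBrainCheck {

    static boolean splitBrainPossible(int replicationFactor, int minIsr) {
        // Both sides of a network partition can accept writes only if each side
        // can still assemble minIsr replicas on its own.
        return replicationFactor - minIsr >= minIsr;
    }

    public static void main(String[] args) {
        System.out.println(splitBrainPossible(4, 2)); // true  -- Colin's RF=4, minIsr=2 scenario
        System.out.println(splitBrainPossible(3, 2)); // false -- minIsr > RF/2 rules it out
    }
}
```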
> >>>>>>>>>>>>> > >>>>>>>>>>>>> regards, > >>>>>>>>>>>>> Colin > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>>> best, > >>>>>>>>>>>>>>> Colin > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>> best, > >>>>>>>>>>>>>>>>> Colin > >>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> Regards, > >>>>>>>>>>>>>>>>>> Dong > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>> On Wed, Jan 24, 2018 at 10:39 AM, Colin > >> McCabe > >>> < > >>>>>>>>>>>>> cmcc...@apache.org> > >>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Hi Dong, > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Thanks for proposing this KIP. I think a > >>>>> metadata > >>>>>>>> epoch > >>>>>>>>>> is a > >>>>>>>>>>>>>>> really > >>>>>>>>>>>>>>>>> good > >>>>>>>>>>>>>>>>>>> idea. > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> I read through the DISCUSS thread, but I > >>> still > >>>>>> don't > >>>>>>>>> have > >>>>>>>>>> a > >>>>>>>>>>>>> clear > >>>>>>>>>>>>>>>>> picture > >>>>>>>>>>>>>>>>>>> of why the proposal uses a metadata epoch > >> per > >>>>>>>> partition > >>>>>>>>>> rather > >>>>>>>>>>>>>>> than a > >>>>>>>>>>>>>>>>>>> global metadata epoch. A metadata epoch > >> per > >>>>>>> partition > >>>>>>>>> is > >>>>>>>>>>>>> kind of > >>>>>>>>>>>>>>>>>>> unpleasant-- it's at least 4 extra bytes > >> per > >>>>>>> partition > >>>>>>>>>> that we > >>>>>>>>>>>>>>> have to > >>>>>>>>>>>>>>>>> send > >>>>>>>>>>>>>>>>>>> over the wire in every full metadata > >> request, > >>>>>> which > >>>>>>>>> could > >>>>>>>>>>>>> become > >>>>>>>>>>>>>>> extra > >>>>>>>>>>>>>>>>>>> kilobytes on the wire when the number of > >>>>>> partitions > >>>>>>>>>> becomes > >>>>>>>>>>>>> large. > >>>>>>>>>>>>>>>>> Plus, > >>>>>>>>>>>>>>>>>>> we have to update all the auxillary classes > >>> to > >>>>>>> include > >>>>>>>>> an > >>>>>>>>>>>>> epoch. > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> We need to have a global metadata epoch > >>> anyway > >>>>> to > >>>>>>>> handle > >>>>>>>>>>>>> partition > >>>>>>>>>>>>>>>>>>> addition and deletion. For example, if I > >>> give > >>>>> you > >>>>>>>>>>>>>>>>>>> MetadataResponse{part1,epoch 1, part2, > >> epoch > >>> 1} > >>>>>> and > >>>>>>>>>> {part1, > >>>>>>>>>>>>>>> epoch1}, > >>>>>>>>>>>>>>>>> which > >>>>>>>>>>>>>>>>>>> MetadataResponse is newer? You have no way > >>> of > >>>>>>>> knowing. > >>>>>>>>>> It > >>>>>>>>>>>>> could > >>>>>>>>>>>>>>> be > >>>>>>>>>>>>>>>>> that > >>>>>>>>>>>>>>>>>>> part2 has just been created, and the > >> response > >>>>>> with 2 > >>>>>>>>>>>>> partitions is > >>>>>>>>>>>>>>>>> newer. > >>>>>>>>>>>>>>>>>>> Or it coudl be that part2 has just been > >>>>> deleted, > >>>>>> and > >>>>>>>>>>>>> therefore the > >>>>>>>>>>>>>>>>> response > >>>>>>>>>>>>>>>>>>> with 1 partition is newer. You must have a > >>>>> global > >>>>>>>> epoch > >>>>>>>>>> to > >>>>>>>>>>>>>>>>> disambiguate > >>>>>>>>>>>>>>>>>>> these two cases. > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> Previously, I worked on the Ceph > >> distributed > >>>>>>>> filesystem. > >>>>>>>>>>>>> Ceph had > >>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>> concept of a map of the whole cluster, > >>>>> maintained > >>>>>>> by a > >>>>>>>>> few > >>>>>>>>>>>>> servers > >>>>>>>>>>>>>>>>> doing > >>>>>>>>>>>>>>>>>>> paxos. This map was versioned by a single > >>>>> 64-bit > >>>>>>>> epoch > >>>>>>>>>> number > >>>>>>>>>>>>>>> which > >>>>>>>>>>>>>>>>>>> increased on every change. 
It was > >> propagated > >>>>> to > >>>>>>>> clients > >>>>>>>>>>>>> through > >>>>>>>>>>>>>>>>> gossip. I > >>>>>>>>>>>>>>>>>>> wonder if something similar could work > >> here? > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> It seems like the the Kafka > >> MetadataResponse > >>>>>> serves > >>>>>>>> two > >>>>>>>>>>>>> somewhat > >>>>>>>>>>>>>>>>> unrelated > >>>>>>>>>>>>>>>>>>> purposes. Firstly, it lets clients know > >> what > >>>>>>>> partitions > >>>>>>>>>>>>> exist in > >>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>> system and where they live. Secondly, it > >>> lets > >>>>>>> clients > >>>>>>>>>> know > >>>>>>>>>>>>> which > >>>>>>>>>>>>>>> nodes > >>>>>>>>>>>>>>>>>>> within the partition are in-sync (in the > >> ISR) > >>>>> and > >>>>>>>> which > >>>>>>>>>> node > >>>>>>>>>>>>> is the > >>>>>>>>>>>>>>>>> leader. > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> The first purpose is what you really need a > >>>>>> metadata > >>>>>>>>> epoch > >>>>>>>>>>>>> for, I > >>>>>>>>>>>>>>>>> think. > >>>>>>>>>>>>>>>>>>> You want to know whether a partition exists > >>> or > >>>>>> not, > >>>>>>> or > >>>>>>>>> you > >>>>>>>>>>>>> want to > >>>>>>>>>>>>>>> know > >>>>>>>>>>>>>>>>>>> which nodes you should talk to in order to > >>>>> write > >>>>>> to > >>>>>>> a > >>>>>>>>>> given > >>>>>>>>>>>>>>>>> partition. A > >>>>>>>>>>>>>>>>>>> single metadata epoch for the whole > >> response > >>>>>> should > >>>>>>> be > >>>>>>>>>>>>> adequate > >>>>>>>>>>>>>>> here. > >>>>>>>>>>>>>>>>> We > >>>>>>>>>>>>>>>>>>> should not change the partition assignment > >>>>> without > >>>>>>>> going > >>>>>>>>>>>>> through > >>>>>>>>>>>>>>>>> zookeeper > >>>>>>>>>>>>>>>>>>> (or a similar system), and this inherently > >>>>>>> serializes > >>>>>>>>>> updates > >>>>>>>>>>>>> into > >>>>>>>>>>>>>>> a > >>>>>>>>>>>>>>>>>>> numbered stream. Brokers should also stop > >>>>>>> responding > >>>>>>>> to > >>>>>>>>>>>>> requests > >>>>>>>>>>>>>>> when > >>>>>>>>>>>>>>>>> they > >>>>>>>>>>>>>>>>>>> are unable to contact ZK for a certain time > >>>>>> period. > >>>>>>>>> This > >>>>>>>>>>>>> prevents > >>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>> case > >>>>>>>>>>>>>>>>>>> where a given partition has been moved off > >>> some > >>>>>> set > >>>>>>> of > >>>>>>>>>> nodes, > >>>>>>>>>>>>> but a > >>>>>>>>>>>>>>>>> client > >>>>>>>>>>>>>>>>>>> still ends up talking to those nodes and > >>>>> writing > >>>>>>> data > >>>>>>>>>> there. > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> For the second purpose, this is "soft > >> state" > >>>>>> anyway. > >>>>>>>> If > >>>>>>>>>> the > >>>>>>>>>>>>> client > >>>>>>>>>>>>>>>>> thinks > >>>>>>>>>>>>>>>>>>> X is the leader but Y is really the leader, > >>> the > >>>>>>> client > >>>>>>>>>> will > >>>>>>>>>>>>> talk > >>>>>>>>>>>>>>> to X, > >>>>>>>>>>>>>>>>> and > >>>>>>>>>>>>>>>>>>> X will point out its mistake by sending > >> back > >>> a > >>>>>>>>>>>>>>>>> NOT_LEADER_FOR_PARTITION. > >>>>>>>>>>>>>>>>>>> Then the client can update its metadata > >> again > >>>>> and > >>>>>>> find > >>>>>>>>>> the new > >>>>>>>>>>>>>>> leader, > >>>>>>>>>>>>>>>>> if > >>>>>>>>>>>>>>>>>>> there is one. There is no need for an > >> epoch > >>> to > >>>>>>> handle > >>>>>>>>>> this. > >>>>>>>>>>>>>>>>> Similarly, I > >>>>>>>>>>>>>>>>>>> can't think of a reason why changing the > >>>>> in-sync > >>>>>>>> replica > >>>>>>>>>> set > >>>>>>>>>>>>> needs > >>>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>> bump > >>>>>>>>>>>>>>>>>>> the epoch. 
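The single global epoch Colin describes above (and the Ceph analogy) amounts to versioning the whole cluster view with one monotonically increasing number, so that two metadata snapshots can always be ordered, including across partition additions and deletions. A minimal hypothetical sketch, with invented types that are not Kafka's MetadataResponse:

```java
import java.util.Map;

/**
 * Sketch of a single global metadata epoch, similar to the 64-bit Ceph map
 * epoch mentioned above. Hypothetical types; not Kafka code.
 */
final class VersionedClusterView {
    final long metadataEpoch;                    // bumped on every controller-side change
    final Map<String, Integer> partitionCounts;  // topic name -> partition count

    VersionedClusterView(long metadataEpoch, Map<String, Integer> partitionCounts) {
        this.metadataEpoch = metadataEpoch;
        this.partitionCounts = Map.copyOf(partitionCounts);
    }

    /** Keep whichever cached view is newer; a strictly higher epoch always wins. */
    static VersionedClusterView newer(VersionedClusterView a, VersionedClusterView b) {
        return b.metadataEpoch > a.metadataEpoch ? b : a;
    }
}
```

This is exactly the disambiguation Colin raises earlier in the thread: given {part1, part2} and {part1} responses, only a global epoch says whether part2 was just created or just deleted.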
> >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> best, > >>>>>>>>>>>>>>>>>>> Colin > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>> On Wed, Jan 24, 2018, at 09:45, Dong Lin > >>> wrote: > >>>>>>>>>>>>>>>>>>>> Thanks much for reviewing the KIP! > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> Dong > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>> On Wed, Jan 24, 2018 at 7:10 AM, Guozhang > >>>>> Wang < > >>>>>>>>>>>>>>> wangg...@gmail.com> > >>>>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> Yeah that makes sense, again I'm just > >>>>> making > >>>>>>> sure > >>>>>>>> we > >>>>>>>>>>>>> understand > >>>>>>>>>>>>>>>>> all the > >>>>>>>>>>>>>>>>>>>>> scenarios and what to expect. > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> I agree that if, more generally > >> speaking, > >>>>> say > >>>>>>>> users > >>>>>>>>>> have > >>>>>>>>>>>>> only > >>>>>>>>>>>>>>>>> consumed > >>>>>>>>>>>>>>>>>>> to > >>>>>>>>>>>>>>>>>>>>> offset 8, and then call seek(16) to > >>> "jump" > >>>>> to > >>>>>> a > >>>>>>>>>> further > >>>>>>>>>>>>>>> position, > >>>>>>>>>>>>>>>>> then > >>>>>>>>>>>>>>>>>>> she > >>>>>>>>>>>>>>>>>>>>> needs to be aware that OORE maybe > >> thrown > >>>>> and > >>>>>> she > >>>>>>>>>> needs to > >>>>>>>>>>>>>>> handle > >>>>>>>>>>>>>>>>> it or > >>>>>>>>>>>>>>>>>>> rely > >>>>>>>>>>>>>>>>>>>>> on reset policy which should not > >> surprise > >>>>> her. > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> I'm +1 on the KIP. > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> Guozhang > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>> On Wed, Jan 24, 2018 at 12:31 AM, Dong > >>> Lin > >>>>> < > >>>>>>>>>>>>>>> lindon...@gmail.com> > >>>>>>>>>>>>>>>>>>> wrote: > >>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> Yes, in general we can not prevent > >>>>>>>>>>>>> OffsetOutOfRangeException > >>>>>>>>>>>>>>> if > >>>>>>>>>>>>>>>>> user > >>>>>>>>>>>>>>>>>>>>> seeks > >>>>>>>>>>>>>>>>>>>>>> to a wrong offset. The main goal is > >> to > >>>>>> prevent > >>>>>>>>>>>>>>>>>>> OffsetOutOfRangeException > >>>>>>>>>>>>>>>>>>>>> if > >>>>>>>>>>>>>>>>>>>>>> user has done things in the right > >> way, > >>>>> e.g. > >>>>>>> user > >>>>>>>>>> should > >>>>>>>>>>>>> know > >>>>>>>>>>>>>>> that > >>>>>>>>>>>>>>>>>>> there > >>>>>>>>>>>>>>>>>>>>> is > >>>>>>>>>>>>>>>>>>>>>> message with this offset. > >>>>>>>>>>>>>>>>>>>>>> > >>>>>>>>>>>>>>>>>>>>>> For example, if user calls seek(..) > >>> right > >>>>>>> after > >>>>>>>>>>>>>>> construction, the > >>>>>>>>>>>>>>>>>>> only > >>>>>>>>>>>>>>>>>>>>>> reason I can think of is that user > >>> stores > >>>>>>> offset > >>>>>>>>>>>>> externally. > >>>>>>>>>>>>>>> In > >>>>>>>>>>>>>>>>> this > >>>>>>>>>>>>>>>>>>>>> case, > >>>>>>>>>>>>>>>>>>>>>> user currently needs to use the > >> offset > >>>>> which > >>>>>>> is > >>>>>>>>>> obtained > >>>>>>>>>>>>>>> using > >>>>>>>>>>>>>>>>>>>>> position(..) > >>>>>>>>>>>>>>>>>>>>>> from the last run. With this KIP, > >> user > >>>>> needs > >>>>>>> to > >>>>>>>>> get > >>>>>>>>>> the > >>>>>>>>>>>>>>> offset > >>>>>>>>>>>>>>>>> and > >>>>>>>>>>>>>>>>>>> the > >>>>>>>>>>>>>>>>>>>>>> offsetEpoch using > >>>>>> positionAndOffsetEpoch(...) > >>>>>>>> and > >>>>>>>>>> stores > >>>>>>>>>>>>>>> these > >>>>>>>>>>>>>>>>>>>>> information > >>>>>>>>>>>>>>>>>>>>>> externally. The next time user starts > >>>>>>> consumer, > >>>>>>>>>> he/she > >>>>>>>>>>>>> needs > >>>>>>>>>>>>>>> to > >>>>>