Thanks Jun! The time works for me.
On Thu, 5 Apr 2018 at 4:34 AM Jun Rao <j...@confluent.io> wrote:

> Hi, Jan, Dong, John, Guozhang,

> Perhaps it will be useful to have a KIP meeting to discuss this together as a group. Would Apr. 9 (Monday) at 9:00am PDT work? If so, I will send out an invite to the mailing list.

> Thanks,

> Jun

> On Wed, Apr 4, 2018 at 1:25 AM, Jan Filipiak <jan.filip...@trivago.com> wrote:

>> I want to quickly step in here again because the discussion is going places again.

>> The last part of the discussion is just a pain to read and has completely diverged from what I suggested, without the reasons being made clear to me. I don't know why this happens.... here are my comments anyway.

>> @Guozhang: That Streams is working on automatically creating co-partition-usable topics is great for Streams, but it has literally nothing to do with the KIP, because we want to grow the input topic. Everyone can reshuffle relatively easily, but that is not what we need to do; we need to grow the topic in question. After Streams has automatically reshuffled, the input topic still has the same size and it didn't help a bit. I fail to see why this is relevant. What am I missing here?

>> @Dong: I am still of the position that the current proposal takes us in the wrong direction, especially by introducing PartitionKeyRebalanceListener. From this point we can never move to proper stateful handling without completely deprecating this creature from hell again. Linear hashing is not the optimising step we have to do here. An interface under which a topic is always the same topic, even after it has grown or shrunk, is what matters. So from my POV I have major concerns that this KIP is beneficial in its current state.

>> What is it that makes everyone so addicted to the idea of linear hashing? It is not attractive at all to me. And with stateful consumers it is still a complete mess. Why not stick with the Kappa architecture???

>> On 03.04.2018 17:38, Dong Lin wrote:

>>> Hey John,

>>> Thanks much for your comments!!

>>> I have yet to go through the emails of John/Jun/Guozhang in detail. But let me present my idea for how to minimize the delay of state loading for the stream use-case.

>>> For ease of understanding, let's assume that the initial partition number of the input topics and the change log topic are both 10, and the initial number of stream processors is also 10. If we only increase the partition number of the input topics to 15 without changing the number of stream processors, the current KIP already guarantees in-order delivery, and no state needs to be moved between consumers for the stream use-case. Next, let's say we want to increase the number of processors to expand the processing capacity for the stream use-case. This requires us to move state between processors, which will take time. Our goal is to minimize the impact (i.e. delay) on processing while we increase the number of processors.

>>> Note that a stream processor generally includes both a consumer and a producer. In addition to consuming from the input topic, the consumer may also need to consume from the change log topic on startup for recovery, and the producer may produce state to the change log topic.

>>> The solution will include the following steps:

>>> 1) Increase the partition number of the input topic from 10 to 15. Since messages with the same key will still go to the same consumer before and after the partition expansion, this step can be done without having to move state between processors.
>>> 2) Increase the partition number of the change log topic from 10 to 15. Note that this step can also be done without impacting the existing workflow. After we increase the partition number of the change log topic, the key space may split and some keys will be produced to the newly-added partitions. But the same key will still go to the same processor (i.e. consumer) before and after the expansion. Thus this step can also be done without having to move state between processors.

>>> 3) Now, let's add 5 new consumers whose groupId is different from the existing processors' groupId, so these new consumers will not impact the existing workflow. Each of these new consumers should consume two partitions from the earliest offset, where these two partitions are the same partitions that would be consumed if the consumers had the same groupId as the existing processors. For example, the first of the five consumers will consume partition 0 and partition 10. The purpose of these consumers is to rebuild the state (e.g. RocksDB) for the processors in advance. Also note that, by design of the current KIP, each consumer will consume the existing partitions of the change log topic up to the offset before the partition expansion. Then they will only need to consume the state in the new partitions of the change log topic.

>>> 4) After the consumers have caught up in step 3), we should stop these consumers and add 5 new processors to the stream processing job. These 5 new processors should run in the same location as the previous 5 consumers to re-use the state (e.g. RocksDB). And these processors' consumers should consume partitions of the change log topic from the committed offsets of the previous 5 consumers so that no state is missed.

>>> One important trick to note here is that the mapping from partition to consumer should also use linear hashing, and we need to remember the initial number of processors in the job, 10 in this example, and use this number in the linear hashing algorithm. This is pretty much the same as how we use linear hashing to map keys to partitions. In this case, we get an identity map from partition -> processor, for both the input topic and the change log topic. For example, processor 12 will consume partition 12 of the input topic and produce state to partition 12 of the change log topic.

>>> There are a few important properties of this solution to note:

>>> - We can increase the number of partitions of the input topic and the change log topic in any order, asynchronously.
>>> - The expansion of the processors of a given job in step 4) only requires step 3) for the same job. It does not require coordination across different jobs for steps 3) and 4). Thus different jobs can independently expand their capacity without waiting for each other.
>>> - The logic for 1) and 2) is already supported in the current KIP. The logic for 3) and 4) appears to be independent of the core Kafka logic and can be implemented separately outside core Kafka. Thus the current KIP is probably sufficient after we agree on the efficiency and the correctness of the solution. We can have a separate KIP for Kafka Streams to support 3) and 4).

>>> Cheers,
>>> Dong
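To make the linear hashing that Dong's steps lean on concrete, here is a minimal sketch of the lookup. The helper name, the use of Kafka's Utils.murmur2, and the initialPartitions parameter are illustrative assumptions, not part of KIP-253 itself; applying the same function to processor assignment (with the initial processor count as the "initial" value) yields the identity partition-to-processor map Dong describes.

```java
import org.apache.kafka.common.utils.Utils;

public final class LinearHashing {

    // Illustrative only: map a key to a partition under linear hashing, given the
    // partition count the topic started with and its current (expanded) count.
    public static int linearHashPartition(byte[] keyBytes,
                                          int initialPartitions,
                                          int currentPartitions) {
        int hash = Utils.toPositive(Utils.murmur2(keyBytes));

        // Find the current doubling round: low <= currentPartitions < 2 * low.
        int low = initialPartitions;
        while (low * 2 <= currentPartitions) {
            low *= 2;
        }

        // Hash into the expanded space first; if that partition does not exist yet
        // in this round, fall back to the not-yet-split partition.
        int candidate = hash % (low * 2);
        return candidate < currentPartitions ? candidate : hash % low;
    }
}
```

With initialPartitions = 10 and currentPartitions = 15, the key space of partition 0 is split across partitions 0 and 10, which is why the first warm-up consumer in step 3) reads exactly those two partitions, while keys in partitions 5-9 do not move at all.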
>>> On Mon, Apr 2, 2018 at 3:25 PM, Guozhang Wang <wangg...@gmail.com> wrote:

>>>> Hey guys, just sharing my two cents here (I promise it will be shorter than John's article :).

>>>> 0. Just to quickly recap, the main discussion point now is how to support "key partitioning preservation" (John's #4 in topic characteristics above) beyond the "single-key ordering preservation" that KIP-253 was originally proposed to maintain (John's #6 above).

>>>> 1. In the Streams project, we are actively working on improving the elastic scalability of the library. One of the key features is to decouple the input topics from the parallelism model of Streams: i.e. not enforcing the topic to be partitioned by the key, not enforcing topics that are joined to be co-partitioned, and not tying the number of parallel tasks to the number of input topic partitions. This can be achieved by re-shuffling the input topics to ensure key-partitioning / co-partitioning on the internal topics. Note that the re-shuffling task is purely stateless and hence does not require "key partitioning preservation" (see the sketch right after this message). Operationally it is similar to the "create a new topic with the new number of partitions, pipe the data to the new topic and cut consumers over from the old topic" idea, except that users can optionally let Streams handle it rather than doing it manually themselves. There are a few more details in that regard, but I will skip them since they are not directly related to this discussion.

>>>> 2. Assuming that 1) above is done, the only topics involved in scaling events are input topics. For these topics, the only producers / consumers are controlled by Streams clients themselves, and hence achieving "key partitioning preservation" is simpler than in non-Streams scenarios: consumers know the partitioning scheme that producers are using, so for their stateful operations it is doable to split the local state stores accordingly or execute backfilling on their own. Of course, if we decide to do server-side backfilling, it can still help Streams to rely directly on that functionality.

>>>> 3. As John mentioned, another option inside Streams is to over-partition all internal topics; then, with 1), Streams would not rely on KIP-253 at all. But personally I'd like to avoid that if possible, to reduce the Kafka-side footprint: say we over-partition each input topic up to 1k partitions; with a reasonably sized stateful topology this can still contribute tens of thousands of partitions to the topic-partition capacity of a single cluster.

>>>> 4. Summing up 1/2/3, I think we should focus more on non-Streams users writing their stateful computations with local state, and think about whether / how we could enable "key partitioning preservation" for them easily, rather than think heavily about the Streams library. People may have different opinions on how common such a usage pattern is (I think Jun might be suggesting that for DIY apps people are more likely to use remote state, so it is not a problem for them). My opinion is that for non-Streams users such a usage pattern could still be common (think: if you are piping data from Kafka to an external data store which has single-writer requirements for each shard, then even though it is not a stateful computational application, it may still require "key partitioning preservation"), so I prefer to have backfilling in our KIP rather than only exposing the API for expansion and requiring consumers to have pre-knowledge of the producer's partitioning scheme.

>>>> Guozhang
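For readers who want a concrete picture of the stateless re-shuffle in Guozhang's point 1, the following is a rough sketch and not the Streams-internal mechanism; the topic names, group id, and serializers are made-up placeholders. The copier keeps no local state, so it does not care how the source topic is partitioned; the producer's default partitioner re-partitions the internal topic by key.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class StatelessReshuffle {

    public static void main(String[] args) {
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("group.id", "reshuffle-copier");   // hypothetical group
        consumerProps.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerProps.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("key.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        producerProps.put("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
             KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
            consumer.subscribe(Collections.singletonList("source-topic"));   // hypothetical name
            while (true) {
                for (ConsumerRecord<byte[], byte[]> rec : consumer.poll(Duration.ofMillis(500))) {
                    // No local state is read or written here, so "key partitioning preservation"
                    // is not needed for this step; the internal topic ends up keyed.
                    producer.send(new ProducerRecord<>("internal-keyed-topic", rec.key(), rec.value()));
                }
            }
        }
    }
}
```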
>>>> On Thu, Mar 29, 2018 at 2:33 PM, John Roesler <j...@confluent.io> wrote:

>>>>> Hey Dong,

>>>>> Congrats on becoming a committer!!!

>>>>> Since I just sent a novel-length email, I'll try and keep this one brief ;)

>>>>> Regarding producer coordination, I'll grant that in that case, producers may coordinate among themselves to produce into the same topic or to produce co-partitioned topics. Nothing in KStreams or the Kafka ecosystem in general requires such coordination for correctness, or in fact for any optional features, though, so I would not say that we require producer coordination of partition logic. If producers currently coordinate, it's completely optional and their own choice.

>>>>> Regarding the portability of partition algorithms, my observation is that systems requiring independent implementations of the same algorithm with 100% correctness are a large source of risk and also a burden on those who have to maintain them. If people could flawlessly implement algorithms in actual software, the world would be a wonderful place indeed! For a system as important and widespread as Kafka, I would recommend limiting such requirements as aggressively as possible.

>>>>> I'd agree that we can always revisit decisions like allowing arbitrary partition functions, but of course, we shouldn't do that in a vacuum. That feels like the kind of thing we'd need to proactively seek guidance from the users list about. I do think that the general approach of saying "if you use a custom partitioner, you cannot do partition expansion" is very reasonable (but I don't think we need to go that far with the current proposal). It's similar to my statement in my email to Jun that in principle KStreams doesn't *need* backfill; we only need it if we want to employ partition expansion.

>>>>> I reckon that the main motivation for backfill is to support KStreams use cases and also any other use cases involving stateful consumers.

>>>>> Thanks for your response, and congrats again!
>>>>> -John
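For reference in this portability discussion, the "formal spec" for the default case is small. For keyed records, the Java client's default partitioner boils down to roughly the sketch below, and a producer in any other language that wants to stay co-partitioned with it has to reproduce murmur2 bit for bit, which is exactly John's cross-language concern.

```java
import org.apache.kafka.common.utils.Utils;

// Sketch of the default keyed-record partition assignment used by the Java producer.
static int defaultPartition(byte[] keyBytes, int numPartitions) {
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
```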
>>>>> On Wed, Mar 28, 2018 at 1:34 AM, Dong Lin <lindon...@gmail.com> wrote:

>>>>>> Hey John,

>>>>>> Great! Thanks for all the comments. It seems that we agree that the current KIP is in good shape for core Kafka. IMO, what we have been discussing in the recent email exchanges is mostly about the second step, i.e. how to address the problem for the stream use-case (or stateful processing in general).

>>>>>> I will comment inline.

>>>>>> On Tue, Mar 27, 2018 at 4:38 PM, John Roesler <j...@confluent.io> wrote:

>>>>>>> Thanks for the response, Dong.

>>>>>>> Here are my answers to your questions:

>>>>>>>> - "Asking producers and consumers, or even two different producers, to share code like the partition function is a pretty huge ask. What if they are using different languages?". It seems that today we already require different producers to use the same hash function -- otherwise messages with the same key will go to different partitions of the same topic, which may cause problems for downstream consumption. So I am not sure it adds any more constraint to assume consumers know the hash function of the producer. Could you explain more why a user would want to use a custom partition function? Maybe we can check whether this is something that can be supported in the default Kafka hash function. Also, can you explain more why it is difficult to implement the same hash function in different languages?

>>>>>>> Sorry, I meant two different producers as in producers to two different topics. This was in response to the suggestion that we already require coordination among producers to different topics in order to achieve co-partitioning. I was saying that we do not (and should not).

>>>>>> It is probably common for producers owned by different teams to produce messages to the same topic. In order to ensure that messages with the same key go to the same partition, we need producers of different teams to share the same partition algorithm, which by definition requires coordination among producers of different teams in an organization. Even for producers of different topics, it may be common to require producers to use the same partition algorithm in order to join two topics for stream processing. Does this make it reasonable to say we already require coordination across producers?
>>>>>>> By design, consumers are currently ignorant of the partitioning scheme. It suffices to trust that the producer has partitioned the topic by key, if they claim to have done so. If you don't trust that, or even if you just need some other partitioning scheme, then you must re-partition it yourself. Nothing we're discussing can or should change that. The value of backfill is that it preserves the ability for consumers to avoid re-partitioning before consuming, in the cases where they don't need to today.

>>>>>>> Regarding shared "hash functions", note that it's a bit inaccurate to talk about the "hash function" of the producer. Properly speaking, the producer has only a "partition function". We do not know that it is a hash. The producer can use any method at its disposal to assign a partition to a record. The partition function obviously may be written in any programming language, so in general it's not something that can be shared around without a formal spec or the ability to execute arbitrary executables in arbitrary runtime environments.

>>>>>> Yeah, it is probably better to say partition algorithm. I guess it should not be difficult to implement the same partition algorithm in different languages, right? Yes, we would need a formal specification of the default partition algorithm in the producer. I think that can be documented as part of the producer interface.

>>>>>>> Why would a producer want a custom partition function? I don't know... why did we design the interface so that our users can provide one? In general, such systems provide custom partitioners because some data sets may be unbalanced under the default, or because they can provide some interesting functionality built on top of the partitioning scheme, etc. Having provided this ability, I don't know why we would remove it.

>>>>>> Yeah, it is reasonable to assume that there was a reason to support a custom partition function in the producer. On the other hand, it may also be reasonable to revisit this interface and discuss whether we actually need to support custom partition functions. If we don't have a good reason, we could choose not to support custom partition functions in this KIP in a backward compatible manner, i.e. users can still use a custom partition function, but they would not get the benefit of in-order delivery when there is partition expansion. What do you think?
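To make the object of this back-and-forth concrete, the sketch below shows the kind of thing the producer's pluggable partitioner allows today, registered through the partitioner.class producer config. The tenant-routing logic is an invented example, not anything from the KIP.

```java
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

// Invented example of a custom partition function: isolate one known-hot key on a
// dedicated partition and hash everything else across the remaining partitions.
public class TenantAwarePartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if ("hot-tenant".equals(key)) {          // hypothetical hot key
            return numPartitions - 1;
        }
        return Utils.toPositive(Utils.murmur2(keyBytes)) % (numPartitions - 1);
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}
```

Nothing about such a partitioner is discoverable from the consumer side, which is why the thread keeps circling between "restrict producers to well-known partition functions" and "backfill so that consumers never need to know".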
>>>>>>>> - Besides the assumption that the consumer needs to share the hash function of the producer, is there any other organizational overhead in the proposal in the current KIP?

>>>>>>> It wasn't clear to me that KIP-253 currently required the producer and consumer to share the partition function, or in fact that it had a hard requirement to abandon the general partition function and use a linear hash function instead.

>>>>>>> In my reading, there is a requirement to track the metadata about which partitions split into which other partitions during an expansion operation. If the partition function is linear, this is easy. If not, you can always just record that all old partitions split into all new partitions. This has the effect of forcing all consumers to wait until the old epoch is completely consumed before starting on the new epoch. But this may be a reasonable tradeoff, and it doesn't otherwise alter your design.

>>>>>>> You only mention the consumer needing to know that the partition function is linear, not what the actual function is, so I don't think your design actually calls for sharing the function. Plus, really all the consumer needs is the metadata about which old-epoch partitions to wait for before consuming a new-epoch partition. This information is directly captured in metadata, so I don't think it actually even cares whether the partition function is linear or not.

>>>>>> You are right that the current KIP does not mention it. My comment about partition function coordination was about supporting the stream use-case, which we have been discussing so far.

>>>>>>> So, no, I really think KIP-253 is in good shape. I was really more talking about the part of this thread that's outside of KIP-253's scope, namely, creating the possibility of backfilling partitions after expansion.

>>>>>> Great! Can you also confirm that the main motivation for backfilling partitions after expansion is to support the stream use-case?

>>>>>>>> - Currently the producer can forget about a message once it has been acknowledged by the broker. Thus the producer probably does not know most of the existing messages in the topic, including those messages produced by other producers. We can have the owner of the producer do the split+backfill. In my opinion it will be a new program that wraps around the existing producer and consumer classes.

>>>>>>> This sounds fine by me!

>>>>>>> Really, I was just emphasizing that the part of the organization that produces a topic shouldn't have to export their partition function to the part(s) of the organization (or other organizations) that consume the topic. Whether the backfill operation goes into the Producer interface is secondary, I think.
>>>>>>>> - Regarding point 5. The argument is in favor of split+backfill, but for the changelog topic, and it intends to address the problem for the stream use-case in general. In this KIP we will provide an interface (i.e. PartitionKeyRebalanceListener in the KIP) to be used by the stream use-case, and the goal is that the user can flush/re-consume the state as part of the interface implementation, regardless of whether there is a change log topic. Maybe you are suggesting that the main reason to do split+backfill of the input topic is to support log compacted topics? You mentioned in Point 1 that log compacted topics are out of the scope of this KIP. Maybe I could understand your position better. Regarding Jan's proposal to split partitions with backfill, do you think it should replace the proposal in the existing KIP, or do you think it is something that we should do in addition to the existing KIP?

>>>>>>> I think that interface is a good/necessary component of KIP-253. I personally (FWIW) feel that KIP-253 is appropriately scoped, but I do think its utility will be limited unless there is a later KIP offering backfill. But, maybe unlike Jan, I think it makes sense to try and tackle the ordering problem independently of backfill, so I'm in support of the current KIP.
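The thread does not show PartitionKeyRebalanceListener's actual signature, so as a stand-in, here is the flush/re-consume pattern Dong describes expressed against the existing ConsumerRebalanceListener; the proposed KIP-253 callback would presumably look similar but fire when key ranges migrate during partition expansion rather than on an ordinary partition reassignment. LocalStateStore here is a hypothetical wrapper around e.g. RocksDB plus a changelog topic.

```java
import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

public class FlushingStateListener implements ConsumerRebalanceListener {

    // Hypothetical local store interface; not part of Kafka or the KIP.
    interface LocalStateStore {
        void flushPartition(TopicPartition tp);
        void restorePartition(TopicPartition tp);
    }

    private final LocalStateStore store;

    public FlushingStateListener(LocalStateStore store) {
        this.store = store;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Flush state for keys we are about to stop owning before anyone else consumes them.
        partitions.forEach(store::flushPartition);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Reload or re-consume state for the keys we now own.
        partitions.forEach(store::restorePartition);
    }
}
```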
>>>>>>>> - Regarding point 6. I guess we can agree that it is better not to have the performance overhead of copying the input data. Before we discuss more whether the performance overhead is acceptable or not, I am trying to figure out what the benefit of introducing this overhead is. You mentioned that the benefit is the loose organizational coupling. By "organizational coupling", are you referring to the requirement that the consumer needs to know the hash function of the producer? If so, maybe we can discuss the use-cases for custom partition functions and see whether we can find a way to support such use-cases without having to copy the input data.

>>>>>>> I'm not too sure about what an "input" is in this sense, since we are just talking about topics. Actually the point I was making there is that AFAICT the performance overhead of a backfill is less than any other option, assuming you split partitions rarely.

>>>>>> By "input" I was referring to the source Kafka topic of a stream processing job.

>>>>>>> Separately, yes, "organizational coupling" increases if producers and consumers have to share code, such as the partition function. This would not be the case if producers could only pick from a menu of a few well-known partition functions, but I think this is a poor tradeoff.

>>>>>> Maybe we can revisit the custom partition function and see whether we actually need it? Otherwise, I am concerned that every user will pay the overhead of data movement to support something that was not really needed by most users.

>>>>>>> To me, this is two strong arguments in favor of backfill being less expensive than no backfill, but again, I think that particular debate comes after KIP-253, so I don't want to create the impression of opposition to your proposal.

>>>>>>> Finally, to respond to a new email I just noticed:

>>>>>>>> BTW, here is my understanding of the scope of this KIP. We want to allow consumers to always consume messages with the same key from the same producer in the order they are produced. And we need to provide a way for the stream use-case to be able to flush/load state when messages with the same key are migrated between consumers. In addition to ensuring that this goal is correctly supported, we should do our best to keep the performance and organizational overhead of this KIP as low as possible.

>>>>>>> I think we're on the same page there! In fact, I would generalize a little more and say that the mechanism you've designed provides *all consumers* the ability "to flush/load state when messages with the same key are migrated between consumers", not just Streams.

>>>>>> Thanks for all the comments!

>>>>>>> Thanks for the discussion,
>>>>>>> -John

>>>>>>> On Tue, Mar 27, 2018 at 3:14 PM, Dong Lin <lindon...@gmail.com> wrote:

>>>>>>>> Hey John,

>>>>>>>> Thanks much for the detailed comments. Here are my thoughts:

>>>>>>>> - The need to delete messages from log compacted topics is mainly for performance (e.g. storage space) optimization rather than for the correctness of this KIP. I agree that we probably don't need to focus on this in our discussion since it is mostly a performance optimization.

>>>>>>>> [...]

>>>>>>>> Thanks,
>>>>>>>> Dong
>>>>>>>> On Tue, Mar 27, 2018 at 11:34 AM, John Roesler <j...@confluent.io> wrote:

>>>>>>>>> Hey Dong and Jun,

>>>>>>>>> Thanks for the thoughtful responses. If you don't mind, I'll mix my replies together to try for a coherent response. I'm not too familiar with mailing-list etiquette, though.

>>>>>>>>> I'm going to keep numbering my points because it makes it easy for you all to respond.

>>>>>>>>> Point 1:
>>>>>>>>> As I read it, KIP-253 is *just* about properly fencing the producers and consumers so that you preserve the correct ordering of records during partition expansion. This is clearly necessary regardless of anything else we discuss. I think this whole discussion about backfill, consumers, streams, etc., is beyond the scope of KIP-253. But it would be cumbersome to start a new thread at this point.

>>>>>>>>> I had missed KIP-253's Proposed Change #9 among all the details... I think this is a nice addition to the proposal. One thought is that it's actually irrelevant whether the hash function is linear. This is simply an algorithm for moving a key from one partition to another, so the type of hash function need not be a precondition. In fact, it also doesn't matter whether the topic is compacted or not; the algorithm works regardless.
>>>>>>>>> I think this is a good algorithm to keep in mind, as it might solve a variety of problems, but it does have a downside: the producer won't know whether or not K1 was actually in P1, it just knows that K1 was in P1's keyspace before the new epoch. Therefore, it will have to pessimistically send (K1,null) to P1 just in case. But the next time K1 comes along, the producer *also* won't remember that it already retracted K1 from P1, so it will have to send (K1,null) *again*. By extension, every time the producer sends to P2, it will also have to send a tombstone to P1, which is a pretty big burden. To make the situation worse, if there is a second split, say P2 becomes P2 and P3, then any key Kx belonging to P3 will also have to be retracted from P2 *and* P1, since the producer can't know whether Kx was last written to P2 or P1. Over a long period of time, this clearly becomes an issue, as the producer must send an arbitrary number of retractions along with every update.

>>>>>>>>> In contrast, the proposed backfill operation has an end, and after it ends, everyone can afford to forget that there ever was a different partition layout.

>>>>>>>>> Really, though, figuring out how to split compacted topics is beyond the scope of KIP-253, so I'm not sure #9 really even needs to be in this KIP... We do need in-order delivery during partition expansion. It would be fine by me to say that you *cannot* expand partitions of a log-compacted topic and call it a day. I think it would be better to tackle that in another KIP.
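To see why these retractions pile up, consider what a producer following Proposed Change #9 would have to compute on every send if all it knows is the history of partition counts. Plain modulo hashing is assumed purely to keep the sketch short; the point is that the set can grow with every split and never shrinks, whereas a backfill is a one-time pass after which the history can be forgotten, which is the contrast John draws above.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.kafka.common.utils.Utils;

public final class RetractionSketch {

    // Illustrative only: the partitions that must receive a (key, null) tombstone
    // alongside every write of `key`, given the topic's history of partition counts.
    public static Set<Integer> retractionTargets(byte[] keyBytes, List<Integer> partitionCountHistory) {
        int hash = Utils.toPositive(Utils.murmur2(keyBytes));
        int current = hash % partitionCountHistory.get(partitionCountHistory.size() - 1);

        Set<Integer> targets = new HashSet<>();
        for (int i = 0; i < partitionCountHistory.size() - 1; i++) {
            // The producer cannot know in which epoch the key was last written, so it
            // must pessimistically retract from the key's home under every earlier epoch.
            int old = hash % partitionCountHistory.get(i);
            if (old != current) {
                targets.add(old);
            }
        }
        return targets;
    }
}
```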
>>>>>>>>> Point 2:
>>>>>>>>> Regarding whether the consumer re-shuffles its inputs, this is always on the table; any consumer who wants to re-shuffle its input is free to do so. But this is currently not required. It's just that the current high-level story with Kafka encourages the use of partitions as a unit of concurrency. As long as consumers are single-threaded, they can happily consume a single partition without concurrency control of any kind. This is a key aspect of this system that lets folks design high-throughput systems on top of it surprisingly easily. If all consumers were instead encouraged/required to implement a repartition of their own, then the consumer becomes significantly more complex, requiring either the consumer to first produce to its own intermediate repartition topic or to ensure that consumer threads have a reliable, high-bandwidth channel of communication with every other consumer thread.

>>>>>>>>> Either of those tradeoffs may be reasonable for a particular user of Kafka, but I don't know if we're in a position to say that they are reasonable for *every* user of Kafka.

>>>>>>>>> Point 3:
>>>>>>>>> Regarding Jun's point about this use case, "(3) stateful and maintaining the states in a local store", I agree that they may use a framework *like* Kafka Streams, but that is not the same as using Kafka Streams. This is why I think it's better to solve it in Core: because it is then solved for KStreams and also for everything else that facilitates local state maintenance. To me, Streams is a member of the category of "stream processing frameworks", which is itself a subcategory of "things requiring local state maintenance". I'm not sure if it makes sense to