@Jun, yeah that works for me too. @Jan, just to clarify my previous email: assuming we do the reshuffling out of the user input topics, Streams has the advantage that the repartition topic is purely owned by itself: the only producer writing to this repartition topic is Streams itself, so we can change the number of partitions of the repartition topic without requiring the input topics to be expanded. And since the reshuffling task itself is stateless, it does not require key partitioning preservation. So, as I mentioned, it is indeed not dependent on KIP-253 itself, since KIP-253 is primarily about expanding an input topic for consumer applications.
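To make the reshuffling point concrete, below is a minimal sketch of a Streams topology that pipes an input topic through an internal repartition topic that only Streams writes to. It assumes the KStream#repartition / Repartitioned API from newer Streams releases (KIP-221); the topic name, re-keying logic, and partition count of 30 are illustrative, not prescriptive.

    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Repartitioned;

    public class ReshuffleSketch {
        public static void main(String[] args) {
            StreamsBuilder builder = new StreamsBuilder();
            KStream<String, String> input = builder.stream("user-input-topic");

            // Streams is the only producer of the internal "...-by-user-repartition"
            // topic, so its partition count (30 here) can be chosen without ever
            // expanding the user-owned input topic.
            KStream<String, String> reshuffled = input
                .selectKey((ignoredKey, value) -> value)   // stateless re-keying
                .repartition(Repartitioned.<String, String>as("by-user")
                        .withNumberOfPartitions(30));

            // Stateful work (aggregations, joins) then hangs off the reshuffled stream.
            reshuffled.groupByKey().count();

            System.out.println(builder.build().describe());
        }
    }

Because the re-keying step holds no state of its own, it does not need the key-partitioning preservation discussed further down when that internal topic's partition count differs from the input's.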
Guozhang On Wed, Apr 4, 2018 at 4:01 PM, Dong Lin <lindon...@gmail.com> wrote: > Thanks Jun! The time works for me. > > > On Thu, 5 Apr 2018 at 4:34 AM Jun Rao <j...@confluent.io> wrote: > > > Hi, Jan, Dong, John, Guozhang, > > > > Perhaps it will be useful to have a KIP meeting to discuss this together > as > > a group. Would Apr. 9 (Monday) at 9:00am PDT work? If so, I will send out > > an invite to the mailing list. > > > > Thanks, > > > > Jun > > > > > > On Wed, Apr 4, 2018 at 1:25 AM, Jan Filipiak <jan.filip...@trivago.com> > > wrote: > > > > > Want to quickly step in here again because it is going places again. > > > > > > The last part of the discussion is just a pain to read and completely > > > diverged from what I suggested without making the reasons clear to me. > > > > > > I don't know why this happens.... here are my comments anyway. > > > > > > @Guozhang: That Streams is working on automatic creating > > > copartition-usuable topics: great for streams, has literally nothing > todo > > > with the KIP as we want to grow the > > > input topic. Everyone can reshuffle rel. easily but that is not what we > > > need todo, we need to grow the topic in question. After streams > > > automatically reshuffled the input topic > > > still has the same size and it didn't help a bit. I fail to see why > this > > > is relevant. What am i missing here? > > > > > > @Dong > > > I am still on the position that the current proposal brings us into the > > > wrong direction. Especially introducing PartitionKeyRebalanceListener > > > From this point we can never move away to proper state full handling > > > without completely deprecating this creature from hell again. > > > Linear hashing is not the optimising step we have todo here. An > interface > > > that when a topic is a topic its always the same even after it had > > > grown or shrunk is important. So from my POV I have major concerns that > > > this KIP is benefitial in its current state. > > > > > > What is it that makes everyone so addicted to the idea of linear > hashing? > > > not attractive at all for me. > > > And with statefull consumers still a complete mess. Why not stick with > > the > > > Kappa architecture??? > > > > > > > > > > > > > > > > > > On 03.04.2018 17:38, Dong Lin wrote: > > > > > >> Hey John, > > >> > > >> Thanks much for your comments!! > > >> > > >> I have yet to go through the emails of John/Jun/Guozhang in detail. > But > > >> let > > >> me present my idea for how to minimize the delay for state loading for > > >> stream use-case. > > >> > > >> For ease of understanding, let's assume that the initial partition > > number > > >> of input topics and change log topic are both 10. And initial number > of > > >> stream processor is also 10. If we only increase initial partition > > number > > >> of input topics to 15 without changing number of stream processor, the > > >> current KIP already guarantees in-order delivery and no state needs to > > be > > >> moved between consumers for stream use-case. Next, let's say we want > to > > >> increase the number of processor to expand the processing capacity for > > >> stream use-case. This requires us to move state between processors > which > > >> will take time. Our goal is to minimize the impact (i.e. delay) for > > >> processing while we increase the number of processors. > > >> > > >> Note that stream processor generally includes both consumer and > > producer. 
> > >> In addition to consume from the input topic, consumer may also need to > > >> consume from change log topic on startup for recovery. And producer > may > > >> produce state to the change log topic. > > >> > > >> > > > The solution will include the following steps: > > >> > > >> 1) Increase partition number of the input topic from 10 to 15. Since > the > > >> messages with the same key will still go to the same consume before > and > > >> after the partition expansion, this step can be done without having to > > >> move > > >> state between processors. > > >> > > >> 2) Increase partition number of the change log topic from 10 to 15. > Note > > >> that this step can also be done without impacting existing workflow. > > After > > >> we increase partition number of the change log topic, key space may > > split > > >> and some key will be produced to the newly-added partition. But the > same > > >> key will still go to the same processor (i.e. consumer) before and > after > > >> the partition. Thus this step can also be done without having to move > > >> state > > >> between processors. > > >> > > >> 3) Now, let's add 5 new consumers whose groupId is different from the > > >> existing processor's groupId. Thus these new consumers will not impact > > >> existing workflow. Each of these new consumers should consume two > > >> partitions from the earliest offset, where these two partitions are > the > > >> same partitions that will be consumed if the consumers have the same > > >> groupId as the existing processor's groupId. For example, the first of > > the > > >> five consumers will consume partition 0 and partition 10. The purpose > of > > >> these consumers is to rebuild the state (e.g. RocksDB) for the > > processors > > >> in advance. Also note that, by design of the current KIP, each consume > > >> will > > >> consume the existing partition of the change log topic up to the > offset > > >> before the partition expansion. Then they will only need to consume > the > > >> state of the new partition of the change log topic. > > >> > > >> 4) After consumers have caught up in step 3), we should stop these > > >> consumers and add 5 new processors to the stream processing job. > These 5 > > >> new processors should run in the same location as the previous 5 > > consumers > > >> to re-use the state (e.g. RocksDB). And these processors' consumers > > should > > >> consume partitions of the change log topic from the committed offset > the > > >> previous 5 consumers so that no state is missed. > > >> > > >> One important trick to note here is that, the mapping from partition > to > > >> consumer should also use linear hashing. And we need to remember the > > >> initial number of processors in the job, 10 in this example, and use > > this > > >> number in the linear hashing algorithm. This is pretty much the same > as > > >> how > > >> we use linear hashing to map key to partition. In this case, we get an > > >> identity map from partition -> processor, for both input topic and the > > >> change log topic. For example, processor 12 will consume partition 12 > of > > >> the input topic and produce state to the partition 12 of the change > log > > >> topic. > > >> > > >> There are a few important properties of this solution to note: > > >> > > >> - We can increase the number of partitions for input topic and the > > change > > >> log topic in any order asynchronously. > > >> - The expansion of the processors in a given job in step 4) only > > requires > > >> the step 3) for the same job. 
It does not require coordination across > > >> different jobs for step 3) and 4). Thus different jobs can > independently > > >> expand there capacity without waiting for each other. > > >> - The logic for 1) and 2) is already supported in the current KIP. The > > >> logic for 3) and 4) appears to be independent of the core Kafka logic > > and > > >> can be implemented separately outside core Kafka. Thus the current KIP > > is > > >> probably sufficient after we agree on the efficiency and the > correctness > > >> of > > >> the solution. We can have a separate KIP for Kafka Stream to support > 3) > > >> and > > >> 4). > > >> > > >> > > >> Cheers, > > >> Dong > > >> > > >> > > >> On Mon, Apr 2, 2018 at 3:25 PM, Guozhang Wang <wangg...@gmail.com> > > wrote: > > >> > > >> Hey guys, just sharing my two cents here (I promise it will be shorter > > >>> than > > >>> John's article :). > > >>> > > >>> 0. Just to quickly recap, the main discussion point now is how to > > support > > >>> "key partitioning preservation" (John's #4 in topic characteristics > > >>> above) > > >>> beyond the "single-key ordering preservation" that KIP-253 was > > originally > > >>> proposed to maintain (John's #6 above). > > >>> > > >>> 1. From the streams project, we are actively working on improving the > > >>> elastic scalability of the library. One of the key features is to > > >>> decouple > > >>> the input topics from the parallelism model of Streams: i.e. not > > >>> enforcing > > >>> the topic to be partitioned by the key, not enforcing joining topics > to > > >>> be > > >>> co-partitioned, not relying the number of parallel tasks on the input > > >>> topic > > >>> partitions. This can be achieved by re-shuffling on the input topics > to > > >>> make sure key-partitioning / co-partitioning on the internal topics. > > Note > > >>> the re-shuffling task is purely stateless and hence does not require > > "key > > >>> partitioning preservation". Operational-wise it is similar to the > > >>> "creating > > >>> a new topic with new number of partitions, pipe the data to the new > > topic > > >>> and cut over consumers from old topics" idea, just that users can > > >>> optionally let Streams to handle such rather than doing it manually > > >>> themselves. There are a few more details on that regard but I will > skip > > >>> since they are not directly related to this discussion. > > >>> > > >>> 2. Assuming that 1) above is done, then the only topics involved in > the > > >>> scaling events are all input topics. For these topics the only > > producers > > >>> / > > >>> consumers of these topics are controlled by Streams clients > themselves, > > >>> and > > >>> hence achieving "key partitioning preservation" is simpler than > > >>> non-Streams > > >>> scenarios: consumers know the partitioning scheme that producers are > > >>> using, > > >>> so that for their stateful operations it is doable to split the local > > >>> state > > >>> stores accordingly or execute backfilling on its own. Of course, if > we > > >>> decide to do server-side backfilling, it can still help Streams to > > >>> directly > > >>> rely on that functionality. > > >>> > > >>> 3. As John mentioned, another way inside Streams is to do > > >>> over-partitioning > > >>> on all internal topics; then with 1) Streams would not rely on > KIP-253 > > at > > >>> all. 
But personally I'd like to avoid it if possible to reduce Kafka > > side > > >>> footprint: say we overpartition each input topic up to 1k, with a > > >>> reasonable sized stateful topology it can still contribute to tens of > > >>> thousands of topics to the topic partition capacity of a single > > cluster. > > >>> > > >>> 4. Summing up 1/2/3, I think we should focus more on non-Streams > users > > >>> writing their stateful computations with local states, and think > > whether > > >>> / > > >>> how we could enable "key partitioning preservation" for them easily, > > than > > >>> to think heavily for Streams library. People may have different > opinion > > >>> on > > >>> how common of a usage pattern it is (I think Jun might be suggesting > > that > > >>> for DIY apps people may more likely use remote states so that it is > > not a > > >>> problem for them). My opinion is that for non-Streams users such > usage > > >>> pattern could still be large (think: if you are piping data from > Kafka > > to > > >>> an external data storage which has single-writer requirements for > each > > >>> single shard, even though it is not a stateful computational > > application > > >>> it > > >>> may still require "key partitioning preservation"), so I prefer to > have > > >>> backfilling in our KIP than only exposing the API for expansion and > > >>> requires consumers to have pre-knowledge of the producer's > partitioning > > >>> scheme. > > >>> > > >>> > > >>> > > >>> Guozhang > > >>> > > >>> > > >>> > > >>> On Thu, Mar 29, 2018 at 2:33 PM, John Roesler <j...@confluent.io> > > wrote: > > >>> > > >>> Hey Dong, > > >>>> > > >>>> Congrats on becoming a committer!!! > > >>>> > > >>>> Since I just sent a novel-length email, I'll try and keep this one > > brief > > >>>> > > >>> ;) > > >>> > > >>>> Regarding producer coordination, I'll grant that in that case, > > producers > > >>>> may coordinate among themselves to produce into the same topic or to > > >>>> produce co-partitioned topics. Nothing in KStreams or the Kafka > > >>>> ecosystem > > >>>> in general requires such coordination for correctness or in fact for > > any > > >>>> optional features, though, so I would not say that we require > producer > > >>>> coordination of partition logic. If producers currently coordinate, > > it's > > >>>> completely optional and their own choice. > > >>>> > > >>>> Regarding the portability of partition algorithms, my observation is > > >>>> that > > >>>> systems requiring independent implementations of the same algorithm > > with > > >>>> 100% correctness are a large source of risk and also a burden on > those > > >>>> > > >>> who > > >>> > > >>>> have to maintain them. If people could flawlessly implement > algorithms > > >>>> in > > >>>> actual software, the world would be a wonderful place indeed! For a > > >>>> > > >>> system > > >>> > > >>>> as important and widespread as Kafka, I would recommend restricting > > >>>> limiting such requirements as aggressively as possible. > > >>>> > > >>>> I'd agree that we can always revisit decisions like allowing > arbitrary > > >>>> partition functions, but of course, we shouldn't do that in a > vacuum. > > >>>> > > >>> That > > >>> > > >>>> feels like the kind of thing we'd need to proactively seek guidance > > from > > >>>> the users list about. 
I do think that the general approach of saying > > >>>> that > > >>>> "if you use a custom partitioner, you cannot do partition expansion" > > is > > >>>> very reasonable (but I don't think we need to go that far with the > > >>>> > > >>> current > > >>> > > >>>> proposal). It's similar to my statement in my email to Jun that in > > >>>> principle KStreams doesn't *need* backfill, we only need it if we > want > > >>>> to > > >>>> employ partition expansion. > > >>>> > > >>>> I reckon that the main motivation for backfill is to support > KStreams > > >>>> use > > >>>> cases and also any other use cases involving stateful consumers. > > >>>> > > >>>> Thanks for your response, and congrats again! > > >>>> -John > > >>>> > > >>>> > > >>>> On Wed, Mar 28, 2018 at 1:34 AM, Dong Lin <lindon...@gmail.com> > > wrote: > > >>>> > > >>>> Hey John, > > >>>>> > > >>>>> Great! Thanks for all the comment. It seems that we agree that the > > >>>>> > > >>>> current > > >>>> > > >>>>> KIP is in good shape for core Kafka. IMO, what we have been > > discussing > > >>>>> > > >>>> in > > >>> > > >>>> the recent email exchanges is mostly about the second step, i.e. how > > to > > >>>>> address problem for the stream use-case (or stateful processing in > > >>>>> general). > > >>>>> > > >>>>> I will comment inline. > > >>>>> > > >>>>> > > >>>>> > > >>>>> > > >>>>> On Tue, Mar 27, 2018 at 4:38 PM, John Roesler <j...@confluent.io> > > >>>>> > > >>>> wrote: > > >>> > > >>>> Thanks for the response, Dong. > > >>>>>> > > >>>>>> Here are my answers to your questions: > > >>>>>> > > >>>>>> - "Asking producers and consumers, or even two different > producers, > > >>>>>> > > >>>>> to > > >>> > > >>>> share code like the partition function is a pretty huge ask. What > > >>>>>>> > > >>>>>> if > > >>> > > >>>> they > > >>>>> > > >>>>>> are using different languages?". It seems that today we already > > >>>>>>> > > >>>>>> require > > >>>> > > >>>>> different producer's to use the same hash function -- otherwise > > >>>>>>> > > >>>>>> messages > > >>>>> > > >>>>>> with the same key will go to different partitions of the same > topic > > >>>>>>> > > >>>>>> which > > >>>>> > > >>>>>> may cause problem for downstream consumption. So not sure if it > > >>>>>>> > > >>>>>> adds > > >>> > > >>>> any > > >>>>> > > >>>>>> more constraint by assuming consumers know the hash function of > > >>>>>>> > > >>>>>> producer. > > >>>>> > > >>>>>> Could you explain more why user would want to use a cusmtom > > >>>>>>> > > >>>>>> partition > > >>> > > >>>> function? Maybe we can check if this is something that can be > > >>>>>>> > > >>>>>> supported > > >>>> > > >>>>> in > > >>>>>> > > >>>>>>> the default Kafka hash function. Also, can you explain more why > it > > >>>>>>> > > >>>>>> is > > >>> > > >>>> difficuilt to implement the same hash function in different > > >>>>>>> > > >>>>>> languages? > > >>>> > > >>>>> > > >>>>>> Sorry, I meant two different producers as in producers to two > > >>>>>> > > >>>>> different > > >>> > > >>>> topics. This was in response to the suggestion that we already > > >>>>>> > > >>>>> require > > >>> > > >>>> coordination among producers to different topics in order to achieve > > >>>>>> co-partitioning. I was saying that we do not (and should not). > > >>>>>> > > >>>>> > > >>>>> It is probably common for producers of different team to produce > > >>>>> > > >>>> message > > >>> > > >>>> to > > >>>> > > >>>>> the same topic. 
In order to ensure that messages with the same key > go > > >>>>> > > >>>> to > > >>> > > >>>> same partition, we need producers of different team to share the > same > > >>>>> partition algorithm, which by definition requires coordination > among > > >>>>> producers of different teams in an organization. Even for producers > > of > > >>>>> different topics, it may be common to require producers to use the > > same > > >>>>> partition algorithm in order to join two topics for stream > > processing. > > >>>>> > > >>>> Does > > >>>> > > >>>>> this make it reasonable to say we already require coordination > across > > >>>>> producers? > > >>>>> > > >>>>> > > >>>>> By design, consumers are currently ignorant of the partitioning > > >>>>>> > > >>>>> scheme. > > >>> > > >>>> It > > >>>>> > > >>>>>> suffices to trust that the producer has partitioned the topic by > > key, > > >>>>>> > > >>>>> if > > >>>> > > >>>>> they claim to have done so. If you don't trust that, or even if you > > >>>>>> > > >>>>> just > > >>>> > > >>>>> need some other partitioning scheme, then you must re-partition it > > >>>>>> yourself. Nothing we're discussing can or should change that. The > > >>>>>> > > >>>>> value > > >>> > > >>>> of > > >>>>> > > >>>>>> backfill is that it preserves the ability for consumers to avoid > > >>>>>> re-partitioning before consuming, in the case where they don't > need > > >>>>>> > > >>>>> to > > >>> > > >>>> today. > > >>>>>> > > >>>>> > > >>>>> Regarding shared "hash functions", note that it's a bit inaccurate > to > > >>>>>> > > >>>>> talk > > >>>>> > > >>>>>> about the "hash function" of the producer. Properly speaking, the > > >>>>>> > > >>>>> producer > > >>>>> > > >>>>>> has only a "partition function". We do not know that it is a hash. > > >>>>>> > > >>>>> The > > >>> > > >>>> producer can use any method at their disposal to assign a partition > > >>>>>> > > >>>>> to > > >>> > > >>>> a > > >>>> > > >>>>> record. The partition function obviously may we written in any > > >>>>>> > > >>>>> programming > > >>>>> > > >>>>>> language, so in general it's not something that can be shared > around > > >>>>>> without a formal spec or the ability to execute arbitrary > > executables > > >>>>>> > > >>>>> in > > >>>> > > >>>>> arbitrary runtime environments. > > >>>>>> > > >>>>>> Yeah it is probably better to say partition algorithm. I guess it > > >>>>> > > >>>> should > > >>> > > >>>> not be difficult to implement same partition algorithms in different > > >>>>> languages, right? Yes we would need a formal specification of the > > >>>>> > > >>>> default > > >>> > > >>>> partition algorithm in the producer. I think that can be documented > as > > >>>>> > > >>>> part > > >>>> > > >>>>> of the producer interface. > > >>>>> > > >>>>> > > >>>>> Why would a producer want a custom partition function? I don't > > >>>>>> > > >>>>> know... > > >>> > > >>>> why > > >>>>> > > >>>>>> did we design the interface so that our users can provide one? In > > >>>>>> > > >>>>> general, > > >>>>> > > >>>>>> such systems provide custom partitioners because some data sets > may > > >>>>>> > > >>>>> be > > >>> > > >>>> unbalanced under the default or because they can provide some > > >>>>>> > > >>>>> interesting > > >>>> > > >>>>> functionality built on top of the partitioning scheme, etc. Having > > >>>>>> > > >>>>> provided > > >>>>> > > >>>>>> this ability, I don't know why we would remove it. 
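For readers following the "partition function" debate: the producer-side hook in the Java client is the public Partitioner interface, and the default rule for keyed records is essentially murmur2 over the serialized key modulo the partition count. The sketch below is only an illustration of why someone might plug in a custom function; the tenant-pinning rule is invented for the example.

    import java.util.Map;
    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.utils.Utils;

    // Illustrative custom partitioner; the default keyed-record rule in the Java
    // producer is essentially: toPositive(murmur2(keyBytes)) % numPartitions.
    public class TenantAwarePartitioner implements Partitioner {

        @Override
        public int partition(String topic, Object key, byte[] keyBytes,
                             Object value, byte[] valueBytes, Cluster cluster) {
            int numPartitions = cluster.partitionsForTopic(topic).size();
            if (keyBytes == null) {
                return 0; // illustrative only; the real default spreads keyless records
            }
            // Invented rule: pin one hot tenant to partition 0, hash everything else.
            if (key instanceof String && ((String) key).startsWith("tenant-42:")) {
                return 0;
            }
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }

        @Override
        public void close() {}

        @Override
        public void configure(Map<String, ?> configs) {}
    }

Any consumer that wants to reason about where a key lives has to reimplement whatever logic ends up in this method, which is the coordination cost being weighed in this thread.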
> > >>>>>> > > >>>>>> Yeah it is reasonable to assume that there was reason to support > > >>>>> custom > > >>>>> partition function in producer. On the other hand it may also be > > >>>>> > > >>>> reasonable > > >>>> > > >>>>> to revisit this interface and discuss whether we actually need to > > >>>>> > > >>>> support > > >>> > > >>>> custom partition function. If we don't have a good reason, we can > > >>>>> > > >>>> choose > > >>> > > >>>> not to support custom partition function in this KIP in a backward > > >>>>> compatible manner, i.e. user can still use custom partition > function > > >>>>> > > >>>> but > > >>> > > >>>> they would not get the benefit of in-order delivery when there is > > >>>>> > > >>>> partition > > >>>> > > >>>>> expansion. What do you think? > > >>>>> > > >>>>> > > >>>>> - Besides the assumption that consumer needs to share the hash > > >>>>>> > > >>>>> function > > >>> > > >>>> of > > >>>>> > > >>>>>> producer, is there other organization overhead of the proposal in > > >>>>>>> > > >>>>>> the > > >>> > > >>>> current KIP? > > >>>>>>> > > >>>>>>> It wasn't clear to me that KIP-253 currently required the > producer > > >>>>>> > > >>>>> and > > >>> > > >>>> consumer to share the partition function, or in fact that it had a > > >>>>>> > > >>>>> hard > > >>> > > >>>> requirement to abandon the general partition function and use a > > >>>>>> > > >>>>> linear > > >>> > > >>>> hash > > >>>>> > > >>>>>> function instead. > > >>>>>> > > >>>>> > > >>>>> In my reading, there is a requirement to track the metadata about > > >>>>>> > > >>>>> what > > >>> > > >>>> partitions split into what other partitions during an expansion > > >>>>>> > > >>>>> operation. > > >>>>> > > >>>>>> If the partition function is linear, this is easy. If not, you can > > >>>>>> > > >>>>> always > > >>>> > > >>>>> just record that all old partitions split into all new partitions. > > >>>>>> > > >>>>> This > > >>> > > >>>> has > > >>>>> > > >>>>>> the effect of forcing all consumers to wait until the old epoch is > > >>>>>> completely consumed before starting on the new epoch. But this may > > >>>>>> > > >>>>> be a > > >>> > > >>>> reasonable tradeoff, and it doesn't otherwise alter your design. > > >>>>>> > > >>>>>> You only mention the consumer needing to know that the partition > > >>>>>> > > >>>>> function > > >>>> > > >>>>> is linear, not what the actual function is, so I don't think your > > >>>>>> > > >>>>> design > > >>>> > > >>>>> actually calls for sharing the function. Plus, really all the > > >>>>>> > > >>>>> consumer > > >>> > > >>>> needs is the metadata about what old-epoch partitions to wait for > > >>>>>> > > >>>>> before > > >>>> > > >>>>> consuming a new-epoch partition. This information is directly > > >>>>>> > > >>>>> captured > > >>> > > >>>> in > > >>>> > > >>>>> metadata, so I don't think it actually even cares whether the > > >>>>>> > > >>>>> partition > > >>> > > >>>> function is linear or not. > > >>>>>> > > >>>>>> You are right that the current KIP does not mention it. My comment > > >>>>> > > >>>> related > > >>>> > > >>>>> to the partition function coordination was related to support the > > >>>>> stream-use case which we have been discussing so far. > > >>>>> > > >>>>> > > >>>>> So, no, I really think KIP-253 is in good shape. I was really more > > >>>>>> > > >>>>> talking > > >>>>> > > >>>>>> about the part of this thread that's outside of KIP-253's scope, > > >>>>>> > > >>>>> namely, > > >>>> > > >>>>> creating the possibility of backfilling partitions after expansion. 
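Since linear hashing keeps coming up, here is a minimal sketch of what such a partition function could look like. It is an assumption for illustration, not the KIP's exact algorithm; it presumes the initial partition count is known and reuses the producer's murmur2 hash.

    import org.apache.kafka.common.utils.Utils;

    public final class LinearHashPartitioning {

        /**
         * @param keyBytes          serialized record key
         * @param initialPartitions partition count the topic was created with
         * @param currentPartitions partition count after zero or more expansions
         */
        public static int partition(byte[] keyBytes, int initialPartitions, int currentPartitions) {
            int hash = Utils.toPositive(Utils.murmur2(keyBytes));
            // Find the current "level": the largest power-of-two multiple of the
            // initial count that does not exceed the current count.
            int buckets = initialPartitions;
            while (buckets * 2 <= currentPartitions) {
                buckets *= 2;
            }
            int p = hash % (buckets * 2);
            // Partitions >= currentPartitions do not exist yet, so fall back to the
            // previous level; those keys stay in the partition they always had.
            return p < currentPartitions ? p : hash % buckets;
        }
    }

The useful property is that growing from S to S+1 partitions moves keys out of exactly one existing partition. With an initial count of 10, expanding to 15 splits only partitions 0 through 4 (into 0/10, 1/11, ..., 4/14), which matches the identity partition-to-processor mapping in Dong's example above.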
> > >>>>>> > > >>>>>> Great! Can you also confirm that the main motivation for > backfilling > > >>>>> partitions after expansion is to support the stream use-case? > > >>>>> > > >>>>> > > >>>>> - Currently producer can forget about the message that has been > > >>>>>> > > >>>>>>> acknowledged by the broker. Thus the producer probably does not > > >>>>>>> > > >>>>>> know > > >>> > > >>>> most > > >>>>> > > >>>>>> of the exiting messages in topic, including those messages > produced > > >>>>>>> > > >>>>>> by > > >>>> > > >>>>> other producers. We can have the owner of the producer to > > >>>>>>> > > >>>>>> split+backfill. > > >>>>> > > >>>>>> In my opion it will be a new program that wraps around the > existing > > >>>>>>> producer and consumer classes. > > >>>>>>> > > >>>>>>> This sounds fine by me! > > >>>>>> > > >>>>>> Really, I was just emphasizing that the part of the organization > > that > > >>>>>> produces a topic shouldn't have to export their partition function > > to > > >>>>>> > > >>>>> the > > >>>> > > >>>>> part(s) of the organization (or other organizations) that consume > the > > >>>>>> topic. Whether the backfill operation goes into the Producer > > >>>>>> > > >>>>> interface > > >>> > > >>>> is > > >>>> > > >>>>> secondary, I think. > > >>>>>> > > >>>>>> - Regarding point 5. The argument is in favor of the > split+backfill > > >>>>>> > > >>>>> but > > >>> > > >>>> for > > >>>>> > > >>>>>> changelog topic. And it intends to address the problem for stream > > >>>>>>> > > >>>>>> use-case > > >>>>>> > > >>>>>>> in general. In this KIP we will provide interface (i.e. > > >>>>>>> PartitionKeyRebalanceListener in the KIP) to be used by sream > > >>>>>>> > > >>>>>> use-case > > >>>> > > >>>>> and > > >>>>>> > > >>>>>>> the goal is that user can flush/re-consume the state as part of > the > > >>>>>>> interface implementation regardless of whether there is change > log > > >>>>>>> > > >>>>>> topic. > > >>>>> > > >>>>>> Maybe you are suggesting that the main reason to do split+backfill > > >>>>>>> > > >>>>>> of > > >>> > > >>>> input > > >>>>>> > > >>>>>>> topic is to support log compacted topics? You mentioned in Point > 1 > > >>>>>>> > > >>>>>> that > > >>>> > > >>>>> log > > >>>>>> > > >>>>>>> compacted topics is out of the scope of this KIP. Maybe I could > > >>>>>>> > > >>>>>> understand > > >>>>>> > > >>>>>>> your position better. Regarding Jan's proposal to split > partitions > > >>>>>>> > > >>>>>> with > > >>>> > > >>>>> backfill, do you think this should replace the proposal in the > > >>>>>>> > > >>>>>> existing > > >>>> > > >>>>> KIP, or do you think this is something that we should do in > > >>>>>>> > > >>>>>> addition > > >>> > > >>>> to > > >>>> > > >>>>> the > > >>>>>> > > >>>>>>> existing KIP? > > >>>>>>> > > >>>>>>> I think that interface is a good/necessary component of KIP-253. > > >>>>>> > > >>>>>> I personally (FWIW) feel that KIP-253 is appropriately scoped, > but I > > >>>>>> > > >>>>> do > > >>> > > >>>> think its utility will be limited unless there is a later KIP > > >>>>>> > > >>>>> offering > > >>> > > >>>> backfill. But, maybe unlike Jan, I think it makes sense to try and > > >>>>>> > > >>>>> tackle > > >>>> > > >>>>> the ordering problem independently of backfill, so I'm in support > of > > >>>>>> > > >>>>> the > > >>>> > > >>>>> current KIP. > > >>>>>> > > >>>>>> - Regarding point 6. I guess we can agree that it is better not to > > >>>>>> > > >>>>> have > > >>> > > >>>> the > > >>>>> > > >>>>>> performance overhread of copying the input data. 
Before we discuss > > >>>>>>> > > >>>>>> more > > >>>> > > >>>>> on > > >>>>>> > > >>>>>>> whether the performance overhead is acceptable or not, I am > trying > > >>>>>>> > > >>>>>> to > > >>> > > >>>> figure out what is the benefit of introducing this overhread. You > > >>>>>>> > > >>>>>> mentioned > > >>>>>> > > >>>>>>> that the benefit is the loose organizational coupling. By > > >>>>>>> > > >>>>>> "organizational > > >>>>> > > >>>>>> coupling", are you referring to the requirement that consumer > needs > > >>>>>>> > > >>>>>> to > > >>>> > > >>>>> know > > >>>>>> > > >>>>>>> the hash function of producer? If so, maybe we can discuss the > > >>>>>>> > > >>>>>> use-case > > >>>> > > >>>>> of > > >>>>>> > > >>>>>>> custom partiton function and see whether we can find a way to > > >>>>>>> > > >>>>>> support > > >>> > > >>>> such > > >>>>>> > > >>>>>>> use-case without having to copy the input data. > > >>>>>>> > > >>>>>>> I'm not too sure about what an "input" is in this sense, since we > > are > > >>>>>> > > >>>>> just > > >>>>> > > >>>>>> talking about topics. Actually the point I was making there is > that > > >>>>>> > > >>>>> AKAICT > > >>>>> > > >>>>>> the performance overhead of a backfill is less than any other > > option, > > >>>>>> assuming you split partitions rarely. > > >>>>>> > > >>>>>> By "input" I was referring to source Kafka topic of a stream > > >>>>> processing > > >>>>> job. > > >>>>> > > >>>>> > > >>>>> Separately, yes, "organizational coupling" increases if producers > and > > >>>>>> consumers have to share code, such as the partition function. This > > >>>>>> > > >>>>> would > > >>>> > > >>>>> not be the case if producers could only pick from a menu of a few > > >>>>>> well-known partition functions, but I think this is a poor > tradeoff. > > >>>>>> > > >>>>>> Maybe we can revisit the custom partition function and see whether > > we > > >>>>> actually need it? Otherwise, I am concerned that every user will > pay > > >>>>> > > >>>> the > > >>> > > >>>> overhead of data movement to support something that was not really > > >>>>> > > >>>> needed > > >>> > > >>>> for most users. > > >>>>> > > >>>>> > > >>>>> To me, this is two strong arguments in favor of backfill being less > > >>>>>> expensive than no backfill, but again, I think that particular > > debate > > >>>>>> > > >>>>> comes > > >>>>> > > >>>>>> after KIP-253, so I don't want to create the impression of > > opposition > > >>>>>> > > >>>>> to > > >>>> > > >>>>> your proposal. > > >>>>>> > > >>>>>> > > >>>>>> Finally, to respond to a new email I just noticed: > > >>>>>> > > >>>>>> BTW, here is my understanding of the scope of this KIP. We want to > > >>>>>>> > > >>>>>> allow > > >>>>> > > >>>>>> consumers to always consume messages with the same key from the > > >>>>>>> > > >>>>>> same > > >>> > > >>>> producer in the order they are produced. And we need to provide a > > >>>>>>> > > >>>>>> way > > >>> > > >>>> for > > >>>>> > > >>>>>> stream use-case to be able to flush/load state when messages with > > >>>>>>> > > >>>>>> the > > >>> > > >>>> same > > >>>>>> > > >>>>>>> key are migrated between consumers. In addition to ensuring that > > >>>>>>> > > >>>>>> this > > >>> > > >>>> goal > > >>>>>> > > >>>>>>> is correctly supported, we should do our best to keep the > > >>>>>>> > > >>>>>> performance > > >>> > > >>>> and > > >>>>> > > >>>>>> organization overhead of this KIP as low as possible. > > >>>>>>> > > >>>>>>> I think we're on the same page there! 
In fact, I would > generalize a > > >>>>>> > > >>>>> little > > >>>>> > > >>>>>> more and say that the mechanism you've designed provides *all > > >>>>>> > > >>>>> consumers* > > >>>> > > >>>>> the ability "to flush/load state when messages with the same key > are > > >>>>>> migrated between consumers", not just Streams. > > >>>>>> > > >>>>>> Thanks for all the comment! > > >>>>> > > >>>>> > > >>>>> Thanks for the discussion, > > >>>>>> -John > > >>>>>> > > >>>>>> > > >>>>>> > > >>>>>> On Tue, Mar 27, 2018 at 3:14 PM, Dong Lin <lindon...@gmail.com> > > >>>>>> > > >>>>> wrote: > > >>> > > >>>> Hey John, > > >>>>>>> > > >>>>>>> Thanks much for the detailed comments. Here are my thoughts: > > >>>>>>> > > >>>>>>> - The need to delete messages from log compacted topics is mainly > > >>>>>>> > > >>>>>> for > > >>> > > >>>> performance (e.g. storage space) optimization than for correctness > > >>>>>>> > > >>>>>> for > > >>>> > > >>>>> this > > >>>>>> > > >>>>>>> KIP. I agree that we probably don't need to focus on this in our > > >>>>>>> > > >>>>>> discussion > > >>>>>> > > >>>>>>> since it is mostly for performance optimization. > > >>>>>>> > > >>>>>>> - "Asking producers and consumers, or even two different > producers, > > >>>>>>> > > >>>>>> to > > >>>> > > >>>>> share code like the partition function is a pretty huge ask. What > > >>>>>>> > > >>>>>> if > > >>> > > >>>> they > > >>>>> > > >>>>>> are using different languages?". It seems that today we already > > >>>>>>> > > >>>>>> require > > >>>> > > >>>>> different producer's to use the same hash function -- otherwise > > >>>>>>> > > >>>>>> messages > > >>>>> > > >>>>>> with the same key will go to different partitions of the same > topic > > >>>>>>> > > >>>>>> which > > >>>>> > > >>>>>> may cause problem for downstream consumption. So not sure if it > > >>>>>>> > > >>>>>> adds > > >>> > > >>>> any > > >>>>> > > >>>>>> more constraint by assuming consumers know the hash function of > > >>>>>>> > > >>>>>> producer. > > >>>>> > > >>>>>> Could you explain more why user would want to use a cusmtom > > >>>>>>> > > >>>>>> partition > > >>> > > >>>> function? Maybe we can check if this is something that can be > > >>>>>>> > > >>>>>> supported > > >>>> > > >>>>> in > > >>>>>> > > >>>>>>> the default Kafka hash function. Also, can you explain more why > it > > >>>>>>> > > >>>>>> is > > >>> > > >>>> difficuilt to implement the same hash function in different > > >>>>>>> > > >>>>>> languages? > > >>>> > > >>>>> - Besides the assumption that consumer needs to share the hash > > >>>>>>> > > >>>>>> function > > >>>> > > >>>>> of > > >>>>>> > > >>>>>>> producer, is there other organization overhead of the proposal in > > >>>>>>> > > >>>>>> the > > >>> > > >>>> current KIP? > > >>>>>>> > > >>>>>>> - Currently producer can forget about the message that has been > > >>>>>>> acknowledged by the broker. Thus the producer probably does not > > >>>>>>> > > >>>>>> know > > >>> > > >>>> most > > >>>>> > > >>>>>> of the exiting messages in topic, including those messages > produced > > >>>>>>> > > >>>>>> by > > >>>> > > >>>>> other producers. We can have the owner of the producer to > > >>>>>>> > > >>>>>> split+backfill. > > >>>>> > > >>>>>> In my opion it will be a new program that wraps around the > existing > > >>>>>>> producer and consumer classes. > > >>>>>>> > > >>>>>>> - Regarding point 5. The argument is in favor of the > split+backfill > > >>>>>>> > > >>>>>> but > > >>>> > > >>>>> for > > >>>>>> > > >>>>>>> changelog topic. 
And it intends to address the problem for stream > > >>>>>>> > > >>>>>> use-case > > >>>>>> > > >>>>>>> in general. In this KIP we will provide interface (i.e. > > >>>>>>> PartitionKeyRebalanceListener in the KIP) to be used by sream > > >>>>>>> > > >>>>>> use-case > > >>>> > > >>>>> and > > >>>>>> > > >>>>>>> the goal is that user can flush/re-consume the state as part of > the > > >>>>>>> interface implementation regardless of whether there is change > log > > >>>>>>> > > >>>>>> topic. > > >>>>> > > >>>>>> Maybe you are suggesting that the main reason to do split+backfill > > >>>>>>> > > >>>>>> of > > >>> > > >>>> input > > >>>>>> > > >>>>>>> topic is to support log compacted topics? You mentioned in Point > 1 > > >>>>>>> > > >>>>>> that > > >>>> > > >>>>> log > > >>>>>> > > >>>>>>> compacted topics is out of the scope of this KIP. Maybe I could > > >>>>>>> > > >>>>>> understand > > >>>>>> > > >>>>>>> your position better. Regarding Jan's proposal to split > partitions > > >>>>>>> > > >>>>>> with > > >>>> > > >>>>> backfill, do you think this should replace the proposal in the > > >>>>>>> > > >>>>>> existing > > >>>> > > >>>>> KIP, or do you think this is something that we should do in > > >>>>>>> > > >>>>>> addition > > >>> > > >>>> to > > >>>> > > >>>>> the > > >>>>>> > > >>>>>>> existing KIP? > > >>>>>>> > > >>>>>>> - Regarding point 6. I guess we can agree that it is better not > to > > >>>>>>> > > >>>>>> have > > >>>> > > >>>>> the > > >>>>>> > > >>>>>>> performance overhread of copying the input data. Before we > discuss > > >>>>>>> > > >>>>>> more > > >>>> > > >>>>> on > > >>>>>> > > >>>>>>> whether the performance overhead is acceptable or not, I am > trying > > >>>>>>> > > >>>>>> to > > >>> > > >>>> figure out what is the benefit of introducing this overhread. You > > >>>>>>> > > >>>>>> mentioned > > >>>>>> > > >>>>>>> that the benefit is the loose organizational coupling. By > > >>>>>>> > > >>>>>> "organizational > > >>>>> > > >>>>>> coupling", are you referring to the requirement that consumer > needs > > >>>>>>> > > >>>>>> to > > >>>> > > >>>>> know > > >>>>>> > > >>>>>>> the hash function of producer? If so, maybe we can discuss the > > >>>>>>> > > >>>>>> use-case > > >>>> > > >>>>> of > > >>>>>> > > >>>>>>> custom partiton function and see whether we can find a way to > > >>>>>>> > > >>>>>> support > > >>> > > >>>> such > > >>>>>> > > >>>>>>> use-case without having to copy the input data. > > >>>>>>> > > >>>>>>> Thanks, > > >>>>>>> Dong > > >>>>>>> > > >>>>>>> > > >>>>>>> On Tue, Mar 27, 2018 at 11:34 AM, John Roesler < > j...@confluent.io> > > >>>>>>> > > >>>>>> wrote: > > >>>>>> > > >>>>>>> Hey Dong and Jun, > > >>>>>>>> > > >>>>>>>> Thanks for the thoughtful responses. If you don't mind, I'll mix > > >>>>>>>> > > >>>>>>> my > > >>> > > >>>> replies > > >>>>>>> > > >>>>>>>> together to try for a coherent response. I'm not too familiar > > >>>>>>>> > > >>>>>>> with > > >>> > > >>>> mailing-list etiquette, though. > > >>>>>>>> > > >>>>>>>> I'm going to keep numbering my points because it makes it easy > > >>>>>>>> > > >>>>>>> for > > >>> > > >>>> you > > >>>>> > > >>>>>> all > > >>>>>>> > > >>>>>>>> to respond. > > >>>>>>>> > > >>>>>>>> Point 1: > > >>>>>>>> As I read it, KIP-253 is *just* about properly fencing the > > >>>>>>>> > > >>>>>>> producers > > >>>> > > >>>>> and > > >>>>>> > > >>>>>>> consumers so that you preserve the correct ordering of records > > >>>>>>>> > > >>>>>>> during > > >>>> > > >>>>> partition expansion. 
This is clearly necessary regardless of > > >>>>>>>> > > >>>>>>> anything > > >>>> > > >>>>> else > > >>>>>>> > > >>>>>>>> we discuss. I think this whole discussion about backfill, > > >>>>>>>> > > >>>>>>> consumers, > > >>>> > > >>>>> streams, etc., is beyond the scope of KIP-253. But it would be > > >>>>>>>> > > >>>>>>> cumbersome > > >>>>>> > > >>>>>>> to start a new thread at this point. > > >>>>>>>> > > >>>>>>>> I had missed KIP-253's Proposed Change #9 among all the > > >>>>>>>> > > >>>>>>> details... > > >>> > > >>>> I > > >>>> > > >>>>> think > > >>>>>>> > > >>>>>>>> this is a nice addition to the proposal. One thought is that > it's > > >>>>>>>> > > >>>>>>> actually > > >>>>>>> > > >>>>>>>> irrelevant whether the hash function is linear. This is simply > an > > >>>>>>>> > > >>>>>>> algorithm > > >>>>>>> > > >>>>>>>> for moving a key from one partition to another, so the type of > > >>>>>>>> > > >>>>>>> hash > > >>> > > >>>> function need not be a precondition. In fact, it also doesn't > > >>>>>>>> > > >>>>>>> matter > > >>>> > > >>>>> whether the topic is compacted or not, the algorithm works > > >>>>>>>> > > >>>>>>> regardless. > > >>>>> > > >>>>>> I think this is a good algorithm to keep in mind, as it might > > >>>>>>>> > > >>>>>>> solve a > > >>>> > > >>>>> variety of problems, but it does have a downside: that the > > >>>>>>>> > > >>>>>>> producer > > >>> > > >>>> won't > > >>>>>> > > >>>>>>> know whether or not K1 was actually in P1, it just knows that K1 > > >>>>>>>> > > >>>>>>> was > > >>>> > > >>>>> in > > >>>>> > > >>>>>> P1's keyspace before the new epoch. Therefore, it will have to > > >>>>>>>> pessimistically send (K1,null) to P1 just in case. But the next > > >>>>>>>> > > >>>>>>> time > > >>>> > > >>>>> K1 > > >>>>> > > >>>>>> comes along, the producer *also* won't remember that it already > > >>>>>>>> > > >>>>>>> retracted > > >>>>>> > > >>>>>>> K1 from P1, so it will have to send (K1,null) *again*. By > > >>>>>>>> > > >>>>>>> extension, > > >>>> > > >>>>> every > > >>>>>>> > > >>>>>>>> time the producer sends to P2, it will also have to send a > > >>>>>>>> > > >>>>>>> tombstone > > >>>> > > >>>>> to > > >>>>> > > >>>>>> P1, > > >>>>>>> > > >>>>>>>> which is a pretty big burden. To make the situation worse, if > > >>>>>>>> > > >>>>>>> there > > >>> > > >>>> is > > >>>>> > > >>>>>> a > > >>>>>> > > >>>>>>> second split, say P2 becomes P2 and P3, then any key Kx belonging > > >>>>>>>> > > >>>>>>> to > > >>>> > > >>>>> P3 > > >>>>> > > >>>>>> will also have to be retracted from P2 *and* P1, since the > > >>>>>>>> > > >>>>>>> producer > > >>> > > >>>> can't > > >>>>>> > > >>>>>>> know whether Kx had been last written to P2 or P1. Over a long > > >>>>>>>> > > >>>>>>> period > > >>>> > > >>>>> of > > >>>>>> > > >>>>>>> time, this clearly becomes a issue, as the producer must send an > > >>>>>>>> > > >>>>>>> arbitrary > > >>>>>>> > > >>>>>>>> number of retractions along with every update. > > >>>>>>>> > > >>>>>>>> In contrast, the proposed backfill operation has an end, and > > >>>>>>>> > > >>>>>>> after > > >>> > > >>>> it > > >>>> > > >>>>> ends, > > >>>>>>> > > >>>>>>>> everyone can afford to forget that there ever was a different > > >>>>>>>> > > >>>>>>> partition > > >>>>> > > >>>>>> layout. 
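To spell out the retraction pattern criticized above, here is a rough sketch. It is illustrative only: partitionHistory (the list of partition counts per epoch) is an assumed input, and a plain hash-mod stands in for the real partition function.

    import java.util.List;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.utils.Utils;

    public final class RetractionSketch {

        // On every send after one or more splits, also emit a tombstone to every
        // partition the key *might* have occupied in an earlier epoch, because the
        // producer cannot know whether the key was ever actually written there.
        static void sendWithRetractions(KafkaProducer<byte[], byte[]> producer, String topic,
                                        byte[] key, byte[] value, List<Integer> partitionHistory) {
            int hash = Utils.toPositive(Utils.murmur2(key));
            int currentCount = partitionHistory.get(partitionHistory.size() - 1);
            int target = hash % currentCount;
            for (int epoch = 0; epoch < partitionHistory.size() - 1; epoch++) {
                int oldPartition = hash % partitionHistory.get(epoch);
                if (oldPartition != target) {
                    producer.send(new ProducerRecord<>(topic, oldPartition, key, null));
                }
            }
            producer.send(new ProducerRecord<>(topic, target, key, value));
        }
    }

The tombstone fan-out grows with every past epoch and never stops, which is the contrast with a backfill that runs once and then lets everyone forget the old layout.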
> > >>>>>>>> > > >>>>>>>> Really, though, figuring out how to split compacted topics is > > >>>>>>>> > > >>>>>>> beyond > > >>>> > > >>>>> the > > >>>>>> > > >>>>>>> scope of KIP-253, so I'm not sure #9 really even needs to be in > > >>>>>>>> > > >>>>>>> this > > >>>> > > >>>>> KIP... > > >>>>>>> > > >>>>>>>> We do need in-order delivery during partition expansion. It > would > > >>>>>>>> > > >>>>>>> be > > >>>> > > >>>>> fine > > >>>>>> > > >>>>>>> by me to say that you *cannot* expand partitions of a > > >>>>>>>> > > >>>>>>> log-compacted > > >>> > > >>>> topic > > >>>>>> > > >>>>>>> and call it a day. I think it would be better to tackle that in > > >>>>>>>> > > >>>>>>> another > > >>>>> > > >>>>>> KIP. > > >>>>>>>> > > >>>>>>>> > > >>>>>>>> Point 2: > > >>>>>>>> Regarding whether the consumer re-shuffles its inputs, this is > > >>>>>>>> > > >>>>>>> always > > >>>> > > >>>>> on > > >>>>>> > > >>>>>>> the table; any consumer who wants to re-shuffle its input is free > > >>>>>>>> > > >>>>>>> to > > >>>> > > >>>>> do > > >>>>> > > >>>>>> so. > > >>>>>>> > > >>>>>>>> But this is currently not required. It's just that the current > > >>>>>>>> > > >>>>>>> high-level > > >>>>>> > > >>>>>>> story with Kafka encourages the use of partitions as a unit of > > >>>>>>>> > > >>>>>>> concurrency. > > >>>>>>> > > >>>>>>>> As long as consumers are single-threaded, they can happily > > >>>>>>>> > > >>>>>>> consume > > >>> > > >>>> a > > >>>> > > >>>>> single > > >>>>>>> > > >>>>>>>> partition without concurrency control of any kind. This is a key > > >>>>>>>> > > >>>>>>> aspect > > >>>>> > > >>>>>> to > > >>>>>>> > > >>>>>>>> this system that lets folks design high-throughput systems on > top > > >>>>>>>> > > >>>>>>> of > > >>>> > > >>>>> it > > >>>>> > > >>>>>> surprisingly easily. If all consumers were instead > > >>>>>>>> > > >>>>>>> encouraged/required > > >>>>> > > >>>>>> to > > >>>>>> > > >>>>>>> implement a repartition of their own, then the consumer becomes > > >>>>>>>> significantly more complex, requiring either the consumer to > > >>>>>>>> > > >>>>>>> first > > >>> > > >>>> produce > > >>>>>>> > > >>>>>>>> to its own intermediate repartition topic or to ensure that > > >>>>>>>> > > >>>>>>> consumer > > >>>> > > >>>>> threads have a reliable, high-bandwith channel of communication > > >>>>>>>> > > >>>>>>> with > > >>>> > > >>>>> every > > >>>>>>> > > >>>>>>>> other consumer thread. > > >>>>>>>> > > >>>>>>>> Either of those tradeoffs may be reasonable for a particular > user > > >>>>>>>> > > >>>>>>> of > > >>>> > > >>>>> Kafka, > > >>>>>>> > > >>>>>>>> but I don't know if we're in a position to say that they are > > >>>>>>>> > > >>>>>>> reasonable > > >>>>> > > >>>>>> for > > >>>>>>> > > >>>>>>>> *every* user of Kafka. > > >>>>>>>> > > >>>>>>>> > > >>>>>>>> Point 3: > > >>>>>>>> Regarding Jun's point about this use case, "(3) stateful and > > >>>>>>>> > > >>>>>>> maintaining > > >>>>>> > > >>>>>>> the > > >>>>>>>> states in a local store", I agree that they may use a framework > > >>>>>>>> > > >>>>>>> *like* > > >>>>> > > >>>>>> Kafka Streams, but that is not the same as using Kafka Streams. > > >>>>>>>> > > >>>>>>> This > > >>>> > > >>>>> is > > >>>>> > > >>>>>> why > > >>>>>>> > > >>>>>>>> I think it's better to solve it in Core: because it is then > > >>>>>>>> > > >>>>>>> solved > > >>> > > >>>> for > > >>>>> > > >>>>>> KStreams and also for everything else that facilitates local > > >>>>>>>> > > >>>>>>> state > > >>> > > >>>> maintenance. 
To me, Streams is a member of the category of > > >>>>>>>> > > >>>>>>> "stream > > >>> > > >>>> processing frameworks", which is itself a subcategory of "things > > >>>>>>>> > > >>>>>>> requiring > > >>>>>>> > > >>>>>>>> local state maintenence". I'm not sure if it makes sense to > > >>>>>>> > > >>>>>>> > > > -- -- Guozhang