Thanks Jun! The time works for me.

On Thu, 5 Apr 2018 at 4:34 AM Jun Rao <j...@confluent.io> wrote:

> Hi, Jan, Dong, John, Guozhang,
>
> Perhaps it will be useful to have a KIP meeting to discuss this together as
> a group. Would Apr. 9 (Monday) at 9:00am PDT work? If so, I will send out
> an invite to the mailing list.
>
> Thanks,
>
> Jun
>
>
> On Wed, Apr 4, 2018 at 1:25 AM, Jan Filipiak <jan.filip...@trivago.com>
> wrote:
>
> > Want to quickly step in here again because it is going places again.
> >
> > The last part of the discussion is just a pain to read, and it has completely
> > diverged from what I suggested without the reasons being made clear to me.
> >
> > I don't know why this happens... Here are my comments anyway.
> >
> > @Guozhang: That Streams is working on automatically creating
> > co-partition-ready topics is great for Streams, but it has literally nothing
> > to do with the KIP, because we want to grow the input topic. Everyone can
> > reshuffle relatively easily, but that is not what we need to do; we need to
> > grow the topic in question. After Streams automatically reshuffles, the input
> > topic still has the same size and it didn't help a bit. I fail to see why
> > this is relevant. What am I missing here?
> >
> > @Dong
> > I still hold the position that the current proposal takes us in the wrong
> > direction, especially by introducing PartitionKeyRebalanceListener.
> > Once we add it, we can never move to proper stateful handling
> > without completely deprecating this creature from hell again.
> > Linear hashing is not the optimization we need to make here. What matters is
> > an interface under which a topic is always the same topic, even after it has
> > grown or shrunk. So from my POV I have major concerns that this KIP is
> > beneficial in its current state.
> >
> > What is it that makes everyone so addicted to the idea of linear hashing?
> > It is not attractive at all to me.
> > And with stateful consumers it is still a complete mess. Why not stick with
> > the Kappa architecture?
> >
> >
> >
> >
> >
> > On 03.04.2018 17:38, Dong Lin wrote:
> >
> >> Hey John,
> >>
> >> Thanks much for your comments!!
> >>
> >> I have yet to go through the emails from John/Jun/Guozhang in detail, but
> >> let me present my idea for how to minimize the delay of state loading for
> >> the stream use-case.
> >>
> >> For ease of understanding, let's assume that the initial partition number
> >> of the input topics and the change log topic is 10, and the initial number
> >> of stream processors is also 10. If we only increase the partition number
> >> of the input topics to 15 without changing the number of stream processors,
> >> the current KIP already guarantees in-order delivery, and no state needs to
> >> be moved between consumers for the stream use-case. Next, let's say we want
> >> to increase the number of processors to expand the processing capacity.
> >> This requires us to move state between processors, which takes time. Our
> >> goal is to minimize the impact (i.e. delay) on processing while we increase
> >> the number of processors.
> >>
> >> Note that a stream processor generally includes both a consumer and a
> >> producer. In addition to consuming from the input topic, the consumer may
> >> also need to consume from the change log topic on startup for recovery, and
> >> the producer may produce state to the change log topic.
> >>
> >>
> >> The solution includes the following steps:
> >>
> >> 1) Increase the partition number of the input topic from 10 to 15. Since
> >> messages with the same key will still go to the same consumer before and
> >> after the partition expansion, this step can be done without having to move
> >> state between processors.
> >>
> >> 2) Increase the partition number of the change log topic from 10 to 15.
> >> Note that this step can also be done without impacting the existing
> >> workflow. After we increase the partition number of the change log topic,
> >> the key space may split and some keys will be produced to the newly-added
> >> partitions. But the same key will still go to the same processor (i.e.
> >> consumer) before and after the partition expansion. Thus this step can also
> >> be done without having to move state between processors.
> >>
> >> 3) Now, let's add 5 new consumers whose groupId is different from the
> >> existing processors' groupId, so that these new consumers do not impact the
> >> existing workflow. Each of these new consumers should consume two
> >> partitions from the earliest offset, namely the two partitions it would be
> >> assigned if it shared the existing processors' groupId. For example, the
> >> first of the five consumers will consume partition 0 and partition 10. The
> >> purpose of these consumers is to rebuild the state (e.g. RocksDB) for the
> >> processors in advance (see the sketch after step 4). Also note that, by
> >> design of the current KIP, each consumer will consume the existing
> >> partitions of the change log topic only up to the offset before the
> >> partition expansion. Then they will only need to consume the state of the
> >> new partitions of the change log topic.
> >>
> >> 4) After the consumers have caught up in step 3), we stop these consumers
> >> and add 5 new processors to the stream processing job. These 5 new
> >> processors should run in the same location as the previous 5 consumers to
> >> re-use the state (e.g. RocksDB), and their consumers should consume
> >> partitions of the change log topic from the offsets committed by the
> >> previous 5 consumers so that no state is missed.
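> >>
> >> To make step 3) concrete, here is a minimal sketch of such a warm-up
> >> consumer (the group name, topic name, caughtUp() helper and rocksDb handle
> >> are illustrative assumptions on my side, not part of the KIP):
> >>
> >> import java.util.*;
> >> import org.apache.kafka.clients.consumer.*;
> >> import org.apache.kafka.common.TopicPartition;
> >> import org.apache.kafka.common.serialization.ByteArrayDeserializer;
> >>
> >> Properties props = new Properties();
> >> props.put("bootstrap.servers", "broker:9092");
> >> props.put("group.id", "state-warmup");               // different from the job's groupId
> >> props.put("key.deserializer", ByteArrayDeserializer.class.getName());
> >> props.put("value.deserializer", ByteArrayDeserializer.class.getName());
> >> KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
> >>
> >> // Manually assign the old partition plus the newly-added partition it split into.
> >> List<TopicPartition> partitions = Arrays.asList(
> >>     new TopicPartition("changelog", 0), new TopicPartition("changelog", 10));
> >> consumer.assign(partitions);
> >> consumer.seekToBeginning(partitions);
> >>
> >> while (!caughtUp(consumer, partitions)) {             // caughtUp() is a hypothetical helper
> >>     for (ConsumerRecord<byte[], byte[]> record : consumer.poll(100)) {
> >>         rocksDb.put(record.key(), record.value());    // rebuild local state in advance
> >>     }
> >>     consumer.commitSync();                            // step 4) resumes from these offsets
> >> }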
> >>
> >> One important trick to note here is that the mapping from partition to
> >> consumer should also use linear hashing. We need to remember the initial
> >> number of processors in the job (10 in this example) and use this number in
> >> the linear hashing algorithm. This is pretty much the same as how we use
> >> linear hashing to map keys to partitions. In this case, we get an identity
> >> map from partition to processor, for both the input topic and the change
> >> log topic. For example, processor 12 will consume partition 12 of the input
> >> topic and produce state to partition 12 of the change log topic.
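> >>
> >> To spell out the linear hashing I have in mind (my own sketch, assuming a
> >> non-negative hash and that the count never more than doubles before the
> >> "initial" count is reset):
> >>
> >> // Linear hashing: with initial count n0 and current count n (n0 <= n <= 2*n0),
> >> // only buckets that have already been split are rehashed over the doubled space.
> >> static int linearHashBucket(int hash, int initialCount, int currentCount) {
> >>     int bucket = hash % initialCount;
> >>     if (bucket < currentCount - initialCount) {
> >>         bucket = hash % (2 * initialCount);
> >>     }
> >>     return bucket;
> >> }
> >>
> >> // Key -> partition uses the producer's key hash; partition -> processor applies
> >> // the same rule to the raw partition id, which gives the identity mapping above:
> >> // linearHashBucket(12, 10, 15) == 12, so processor 12 owns partition 12.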
> >>
> >> There are a few important properties of this solution to note:
> >>
> >> - We can increase the number of partitions of the input topic and the
> >> change log topic in any order, asynchronously.
> >> - The expansion of the processors in a given job in step 4) only requires
> >> step 3) for the same job. It does not require coordination across different
> >> jobs for steps 3) and 4). Thus different jobs can independently expand
> >> their capacity without waiting for each other.
> >> - The logic for 1) and 2) is already supported in the current KIP. The
> >> logic for 3) and 4) appears to be independent of the core Kafka logic and
> >> can be implemented separately outside core Kafka. Thus the current KIP is
> >> probably sufficient once we agree on the efficiency and correctness of the
> >> solution. We can have a separate KIP for Kafka Streams to support 3) and
> >> 4).
> >>
> >>
> >> Cheers,
> >> Dong
> >>
> >>
> >> On Mon, Apr 2, 2018 at 3:25 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >>
> >>> Hey guys, just sharing my two cents here (I promise it will be shorter
> >>> than John's article :).
> >>>
> >>> 0. Just to quickly recap, the main discussion point now is how to support
> >>> "key partitioning preservation" (John's #4 in topic characteristics above)
> >>> beyond the "single-key ordering preservation" that KIP-253 was originally
> >>> proposed to maintain (John's #6 above).
> >>>
> >>> 1. On the Streams project, we are actively working on improving the
> >>> elastic scalability of the library. One of the key features is to decouple
> >>> the input topics from the parallelism model of Streams: i.e. not enforcing
> >>> the topic to be partitioned by the key, not enforcing joined topics to be
> >>> co-partitioned, and not tying the number of parallel tasks to the number
> >>> of input topic partitions. This can be achieved by re-shuffling the input
> >>> topics to ensure key-partitioning / co-partitioning on the internal
> >>> topics. Note that the re-shuffling task is purely stateless and hence does
> >>> not require "key partitioning preservation". Operationally it is similar
> >>> to the "create a new topic with the new number of partitions, pipe the
> >>> data to the new topic and cut consumers over from the old topic" idea,
> >>> just that users can optionally let Streams handle this rather than doing
> >>> it manually themselves. There are a few more details on that front, but I
> >>> will skip them since they are not directly related to this discussion.
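> >>>
> >>> As a rough illustration of what such a stateless re-shuffle looks like
> >>> with the current API (just a sketch; the topic names and the
> >>> extractJoinKey() helper are made up, and through() needs the internal
> >>> topic to be pre-created with the desired partition count):
> >>>
> >>> // imports: org.apache.kafka.streams.StreamsBuilder, org.apache.kafka.streams.kstream.KStream
> >>> StreamsBuilder builder = new StreamsBuilder();
> >>> KStream<String, String> input = builder.stream("input-topic");   // arbitrary partitioning
> >>> input.selectKey((k, v) -> extractJoinKey(v))   // pick the key we need to partition by
> >>>      .through("app-repartition-topic")         // stateless shuffle to a key-partitioned internal topic
> >>>      .groupByKey()
> >>>      .count();                                 // stateful work now runs on the internal topic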
> >>>
> >>> 2. Assuming that 1) above is done, the only topics involved in the
> >>> scaling events are the input topics. For these topics, the only producers
> >>> / consumers are controlled by Streams clients themselves, and hence
> >>> achieving "key partitioning preservation" is simpler than in non-Streams
> >>> scenarios: consumers know the partitioning scheme that producers are
> >>> using, so for their stateful operations it is doable to split the local
> >>> state stores accordingly or execute the backfilling on their own. Of
> >>> course, if we decide to do server-side backfilling, it can still help
> >>> Streams to rely on that functionality directly.
> >>>
> >>> 3. As John mentioned, another option inside Streams is to over-partition
> >>> all internal topics; then, together with 1), Streams would not rely on
> >>> KIP-253 at all. But personally I'd like to avoid that if possible to
> >>> reduce the Kafka-side footprint: say we over-partition each input topic up
> >>> to 1k; with a reasonably sized stateful topology this can still contribute
> >>> tens of thousands of partitions toward the topic-partition capacity of a
> >>> single cluster.
> >>>
> >>> 4. Summing up 1/2/3, I think we should focus more on non-Streams users
> >>> writing their stateful computations with local state, and think about
> >>> whether / how we could enable "key partitioning preservation" for them
> >>> easily, rather than thinking heavily about the Streams library. People may
> >>> have different opinions on how common a usage pattern this is (I think Jun
> >>> might be suggesting that for DIY apps people are more likely to use remote
> >>> state, so it is not a problem for them). My opinion is that among
> >>> non-Streams users such a usage pattern could still be common (think: if
> >>> you are piping data from Kafka to an external data store which has
> >>> single-writer requirements for each shard, then even though it is not a
> >>> stateful computational application it may still require "key partitioning
> >>> preservation"). So I prefer to have backfilling in our KIP rather than
> >>> only exposing the API for expansion and requiring consumers to have
> >>> pre-knowledge of the producer's partitioning scheme.
> >>>
> >>>
> >>>
> >>> Guozhang
> >>>
> >>>
> >>>
> >>> On Thu, Mar 29, 2018 at 2:33 PM, John Roesler <j...@confluent.io> wrote:
> >>>
> >>> Hey Dong,
> >>>>
> >>>> Congrats on becoming a committer!!!
> >>>>
> >>>> Since I just sent a novel-length email, I'll try and keep this one
> >>>> brief ;)
> >>>>
> >>>> Regarding producer coordination, I'll grant that in that case, producers
> >>>> may coordinate among themselves to produce into the same topic or to
> >>>> produce co-partitioned topics. Nothing in KStreams or the Kafka ecosystem
> >>>> in general requires such coordination for correctness or in fact for any
> >>>> optional features, though, so I would not say that we require producer
> >>>> coordination of partition logic. If producers currently coordinate, it's
> >>>> completely optional and their own choice.
> >>>>
> >>>> Regarding the portability of partition algorithms, my observation is that
> >>>> systems requiring independent implementations of the same algorithm with
> >>>> 100% correctness are a large source of risk and also a burden on those who
> >>>> have to maintain them. If people could flawlessly implement algorithms in
> >>>> actual software, the world would be a wonderful place indeed! For a system
> >>>> as important and widespread as Kafka, I would recommend limiting such
> >>>> requirements as aggressively as possible.
> >>>>
> >>>> I'd agree that we can always revisit decisions like allowing arbitrary
> >>>> partition functions, but of course, we shouldn't do that in a vacuum. That
> >>>> feels like the kind of thing we'd need to proactively seek guidance from
> >>>> the users list about. I do think that the general approach of saying that
> >>>> "if you use a custom partitioner, you cannot do partition expansion" is
> >>>> very reasonable (but I don't think we need to go that far with the current
> >>>> proposal). It's similar to my statement in my email to Jun that in
> >>>> principle KStreams doesn't *need* backfill; we only need it if we want to
> >>>> employ partition expansion.
> >>>>
> >>>> I reckon that the main motivation for backfill is to support KStreams use
> >>>> cases and also any other use cases involving stateful consumers.
> >>>>
> >>>> Thanks for your response, and congrats again!
> >>>> -John
> >>>>
> >>>>
> >>>> On Wed, Mar 28, 2018 at 1:34 AM, Dong Lin <lindon...@gmail.com> wrote:
> >>>>
> >>>>> Hey John,
> >>>>>
> >>>>> Great! Thanks for all the comments. It seems that we agree that the
> >>>>> current KIP is in good shape for core Kafka. IMO, what we have been
> >>>>> discussing in the recent email exchanges is mostly about the second
> >>>>> step, i.e. how to address the problem for the stream use-case (or
> >>>>> stateful processing in general).
> >>>>>
> >>>>> I will comment inline.
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>> On Tue, Mar 27, 2018 at 4:38 PM, John Roesler <j...@confluent.io> wrote:
> >>>>>
> >>>>>> Thanks for the response, Dong.
> >>>>>>
> >>>>>> Here are my answers to your questions:
> >>>>>>
> >>>>>>> - "Asking producers and consumers, or even two different producers, to
> >>>>>>> share code like the partition function is a pretty huge ask. What if
> >>>>>>> they are using different languages?". It seems that today we already
> >>>>>>> require different producers to use the same hash function -- otherwise
> >>>>>>> messages with the same key will go to different partitions of the same
> >>>>>>> topic, which may cause problems for downstream consumption. So I am not
> >>>>>>> sure it adds any more constraint to assume consumers know the hash
> >>>>>>> function of the producer. Could you explain more why a user would want
> >>>>>>> to use a custom partition function? Maybe we can check if this is
> >>>>>>> something that can be supported in the default Kafka hash function.
> >>>>>>> Also, can you explain more why it is difficult to implement the same
> >>>>>>> hash function in different languages?
> >>>>>
> >>>>>> Sorry, I meant two different producers as in producers to two different
> >>>>>> topics. This was in response to the suggestion that we already require
> >>>>>> coordination among producers to different topics in order to achieve
> >>>>>> co-partitioning. I was saying that we do not (and should not).
> >>>>>>
> >>>>> It is probably common for producers from different teams to produce
> >>>>> messages to the same topic. In order to ensure that messages with the
> >>>>> same key go to the same partition, we need producers from different teams
> >>>>> to share the same partition algorithm, which by definition requires
> >>>>> coordination among producers of different teams in an organization. Even
> >>>>> for producers of different topics, it may be common to require producers
> >>>>> to use the same partition algorithm in order to join two topics for
> >>>>> stream processing. Does this make it reasonable to say we already require
> >>>>> coordination across producers?
> >>>>>
> >>>>>
> >>>>>> By design, consumers are currently ignorant of the partitioning scheme.
> >>>>>> It suffices to trust that the producer has partitioned the topic by key,
> >>>>>> if they claim to have done so. If you don't trust that, or even if you
> >>>>>> just need some other partitioning scheme, then you must re-partition it
> >>>>>> yourself. Nothing we're discussing can or should change that. The value
> >>>>>> of backfill is that it preserves the ability for consumers to avoid
> >>>>>> re-partitioning before consuming, in the case where they don't need to
> >>>>>> today.
> >>>>>>
> >>>>>
> >>>>>> Regarding shared "hash functions", note that it's a bit inaccurate to
> >>>>>> talk about the "hash function" of the producer. Properly speaking, the
> >>>>>> producer has only a "partition function". We do not know that it is a
> >>>>>> hash. The producer can use any method at their disposal to assign a
> >>>>>> partition to a record. The partition function may obviously be written
> >>>>>> in any programming language, so in general it's not something that can
> >>>>>> be shared around without a formal spec or the ability to execute
> >>>>>> arbitrary executables in arbitrary runtime environments.
> >>>>>>
> >>>>> Yeah, it is probably better to say partition algorithm. I guess it
> >>>>> should not be difficult to implement the same partition algorithm in
> >>>>> different languages, right? Yes, we would need a formal specification of
> >>>>> the default partition algorithm in the producer. I think that can be
> >>>>> documented as part of the producer interface.
> >>>>>
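> >>>>> For reference, my understanding is that the default algorithm for keyed
> >>>>> messages boils down to the following (a sketch based on my reading of the
> >>>>> Java client, not an official spec):
> >>>>>
> >>>>> import org.apache.kafka.common.utils.Utils;
> >>>>>
> >>>>> // Default partition for a record with a non-null key: murmur2 over the
> >>>>> // serialized key bytes, masked to be non-negative, modulo the partition count.
> >>>>> static int defaultPartition(byte[] serializedKey, int numPartitions) {
> >>>>>     return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
> >>>>> }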
> >>>>>
> >>>>>> Why would a producer want a custom partition function? I don't know...
> >>>>>> why did we design the interface so that our users can provide one? In
> >>>>>> general, such systems provide custom partitioners because some data sets
> >>>>>> may be unbalanced under the default, or because they can provide some
> >>>>>> interesting functionality built on top of the partitioning scheme, etc.
> >>>>>> Having provided this ability, I don't know why we would remove it.
> >>>>>>
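> >>>>>> Just to make that concrete, here's a toy example of the kind of thing
> >>>>>> people plug in (my own illustration against the producer's Partitioner
> >>>>>> interface; the hot key is made up, and null keys are ignored for
> >>>>>> brevity):
> >>>>>>
> >>>>>> import java.util.Map;
> >>>>>> import org.apache.kafka.clients.producer.Partitioner;
> >>>>>> import org.apache.kafka.common.Cluster;
> >>>>>> import org.apache.kafka.common.utils.Utils;
> >>>>>>
> >>>>>> // Pins one known-hot key to a dedicated partition and hashes everything else.
> >>>>>> public class HotKeyAwarePartitioner implements Partitioner {
> >>>>>>     @Override public void configure(Map<String, ?> configs) {}
> >>>>>>
> >>>>>>     @Override
> >>>>>>     public int partition(String topic, Object key, byte[] keyBytes,
> >>>>>>                          Object value, byte[] valueBytes, Cluster cluster) {
> >>>>>>         int numPartitions = cluster.partitionsForTopic(topic).size();
> >>>>>>         if ("hot-customer-42".equals(key)) {
> >>>>>>             return numPartitions - 1;                       // dedicated partition for the hot key
> >>>>>>         }
> >>>>>>         return Utils.toPositive(Utils.murmur2(keyBytes)) % (numPartitions - 1);
> >>>>>>     }
> >>>>>>
> >>>>>>     @Override public void close() {}
> >>>>>> }
> >>>>>>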
> >>>>> Yeah, it is reasonable to assume that there was a reason to support a
> >>>>> custom partition function in the producer. On the other hand, it may also
> >>>>> be reasonable to revisit this interface and discuss whether we actually
> >>>>> need to support custom partition functions. If we don't have a good
> >>>>> reason, we can choose not to support custom partition functions in this
> >>>>> KIP in a backward compatible manner, i.e. users can still use a custom
> >>>>> partition function, but they would not get the benefit of in-order
> >>>>> delivery when there is a partition expansion. What do you think?
> >>>>>
> >>>>>
> >>>>>>> - Besides the assumption that the consumer needs to share the hash
> >>>>>>> function of the producer, is there other organizational overhead in the
> >>>>>>> proposal of the current KIP?
> >>>>>>>
> >>>>>> It wasn't clear to me that KIP-253 currently required the producer and
> >>>>>> consumer to share the partition function, or in fact that it had a hard
> >>>>>> requirement to abandon the general partition function and use a linear
> >>>>>> hash function instead.
> >>>>>>
> >>>>>
> >>>>>> In my reading, there is a requirement to track the metadata about what
> >>>>>> partitions split into what other partitions during an expansion
> >>>>>> operation. If the partition function is linear, this is easy. If not,
> >>>>>> you can always just record that all old partitions split into all new
> >>>>>> partitions. This has the effect of forcing all consumers to wait until
> >>>>>> the old epoch is completely consumed before starting on the new epoch.
> >>>>>> But this may be a reasonable tradeoff, and it doesn't otherwise alter
> >>>>>> your design.
> >>>>>>
> >>>>>> You only mention the consumer needing to know that the partition
> >>>>>> function is linear, not what the actual function is, so I don't think
> >>>>>> your design actually calls for sharing the function. Plus, really all
> >>>>>> the consumer needs is the metadata about what old-epoch partitions to
> >>>>>> wait for before consuming a new-epoch partition. This information is
> >>>>>> directly captured in metadata, so I don't think it actually even cares
> >>>>>> whether the partition function is linear or not.
> >>>>>>
> >>>>> You are right that the current KIP does not mention it. My comment on
> >>>>> partition function coordination was about supporting the stream use-case
> >>>>> which we have been discussing so far.
> >>>>>
> >>>>>
> >>>>>> So, no, I really think KIP-253 is in good shape. I was really more
> >>>>>> talking about the part of this thread that's outside of KIP-253's scope,
> >>>>>> namely, creating the possibility of backfilling partitions after
> >>>>>> expansion.
> >>>>>>
> >>>>> Great! Can you also confirm that the main motivation for backfilling
> >>>>> partitions after expansion is to support the stream use-case?
> >>>>>
> >>>>>
> >>>>>>> - Currently the producer can forget about a message that has been
> >>>>>>> acknowledged by the broker. Thus the producer probably does not know
> >>>>>>> most of the existing messages in the topic, including those messages
> >>>>>>> produced by other producers. We can have the owner of the producer do
> >>>>>>> the split+backfill. In my opinion it will be a new program that wraps
> >>>>>>> around the existing producer and consumer classes.
> >>>>>>>
> >>>>>> This sounds fine by me!
> >>>>>>
> >>>>>> Really, I was just emphasizing that the part of the organization that
> >>>>>> produces a topic shouldn't have to export their partition function to
> >>>>>> the part(s) of the organization (or other organizations) that consume
> >>>>>> the topic. Whether the backfill operation goes into the Producer
> >>>>>> interface is secondary, I think.
> >>>>>>
> >>>>>>> - Regarding point 5. The argument is in favor of split+backfill, but
> >>>>>>> for the changelog topic, and it intends to address the problem for the
> >>>>>>> stream use-case in general. In this KIP we will provide an interface
> >>>>>>> (i.e. PartitionKeyRebalanceListener in the KIP) to be used by the
> >>>>>>> stream use-case, and the goal is that the user can flush/re-consume the
> >>>>>>> state as part of the interface implementation regardless of whether
> >>>>>>> there is a change log topic. Maybe you are suggesting that the main
> >>>>>>> reason to do split+backfill of the input topic is to support
> >>>>>>> log-compacted topics? You mentioned in Point 1 that log-compacted
> >>>>>>> topics are out of the scope of this KIP. Maybe I could understand your
> >>>>>>> position better. Regarding Jan's proposal to split partitions with
> >>>>>>> backfill, do you think this should replace the proposal in the existing
> >>>>>>> KIP, or do you think this is something that we should do in addition to
> >>>>>>> the existing KIP?
> >>>>>>>
> >>>>>> I think that interface is a good/necessary component of KIP-253.
> >>>>>>
> >>>>>> I personally (FWIW) feel that KIP-253 is appropriately scoped, but I do
> >>>>>> think its utility will be limited unless there is a later KIP offering
> >>>>>> backfill. But, maybe unlike Jan, I think it makes sense to try and
> >>>>>> tackle the ordering problem independently of backfill, so I'm in support
> >>>>>> of the current KIP.
> >>>>>>
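> >>>>>> (I haven't double-checked the exact shape of that interface in the KIP
> >>>>>> document; purely as a strawman for the discussion, I picture a callback
> >>>>>> pair along these lines -- my guess, not the KIP's actual API:)
> >>>>>>
> >>>>>> import java.util.Collection;
> >>>>>> import org.apache.kafka.common.TopicPartition;
> >>>>>>
> >>>>>> public interface PartitionKeyRebalanceListener {
> >>>>>>     // Invoked before keys of these partitions migrate away, so local state
> >>>>>>     // for those keys can be flushed (e.g. to a changelog topic).
> >>>>>>     void onPartitionKeysRevoked(Collection<TopicPartition> partitions);
> >>>>>>
> >>>>>>     // Invoked after keys have been (re)assigned, so local state for those
> >>>>>>     // keys can be reloaded before processing resumes.
> >>>>>>     void onPartitionKeysAssigned(Collection<TopicPartition> partitions);
> >>>>>> }
> >>>>>>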
> >>>>>>> - Regarding point 6. I guess we can agree that it is better not to
> >>>>>>> have the performance overhead of copying the input data. Before we
> >>>>>>> discuss more on whether the performance overhead is acceptable or not,
> >>>>>>> I am trying to figure out what the benefit of introducing this overhead
> >>>>>>> is. You mentioned that the benefit is the loose organizational
> >>>>>>> coupling. By "organizational coupling", are you referring to the
> >>>>>>> requirement that the consumer needs to know the hash function of the
> >>>>>>> producer? If so, maybe we can discuss the use-case of a custom
> >>>>>>> partition function and see whether we can find a way to support such a
> >>>>>>> use-case without having to copy the input data.
> >>>>>>>
> >>>>>> I'm not too sure about what an "input" is in this sense, since we are
> >>>>>> just talking about topics. Actually the point I was making there is that
> >>>>>> AFAICT the performance overhead of a backfill is less than any other
> >>>>>> option, assuming you split partitions rarely.
> >>>>>>
> >>>>> By "input" I was referring to the source Kafka topic of a stream
> >>>>> processing job.
> >>>>>
> >>>>>
> >>>>>> Separately, yes, "organizational coupling" increases if producers and
> >>>>>> consumers have to share code, such as the partition function. This would
> >>>>>> not be the case if producers could only pick from a menu of a few
> >>>>>> well-known partition functions, but I think this is a poor tradeoff.
> >>>>>>
> >>>>> Maybe we can revisit the custom partition function and see whether we
> >>>>> actually need it? Otherwise, I am concerned that every user will pay the
> >>>>> overhead of data movement to support something that is not really needed
> >>>>> by most users.
> >>>>>
> >>>>>
> >>>>>> To me, these are two strong arguments in favor of backfill being less
> >>>>>> expensive than no backfill, but again, I think that particular debate
> >>>>>> comes after KIP-253, so I don't want to create the impression of
> >>>>>> opposition to your proposal.
> >>>>>>
> >>>>>>
> >>>>>> Finally, to respond to a new email I just noticed:
> >>>>>>
> >>>>>>> BTW, here is my understanding of the scope of this KIP. We want to
> >>>>>>> allow consumers to always consume messages with the same key from the
> >>>>>>> same producer in the order they are produced. And we need to provide a
> >>>>>>> way for the stream use-case to be able to flush/load state when
> >>>>>>> messages with the same key are migrated between consumers. In addition
> >>>>>>> to ensuring that this goal is correctly supported, we should do our
> >>>>>>> best to keep the performance and organizational overhead of this KIP as
> >>>>>>> low as possible.
> >>>>>>>
> >>>>>> I think we're on the same page there! In fact, I would generalize a
> >>>>>> little more and say that the mechanism you've designed provides *all
> >>>>>> consumers* the ability "to flush/load state when messages with the same
> >>>>>> key are migrated between consumers", not just Streams.
> >>>>>>
> >>>>> Thanks for all the comments!
> >>>>>
> >>>>>
> >>>>>> Thanks for the discussion,
> >>>>>> -John
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On Tue, Mar 27, 2018 at 3:14 PM, Dong Lin <lindon...@gmail.com> wrote:
> >>>>>>
> >>>>>>> Hey John,
> >>>>>>>
> >>>>>>> Thanks much for the detailed comments. Here are my thoughts:
> >>>>>>>
> >>>>>>> - The need to delete messages from log-compacted topics is more a
> >>>>>>> performance (e.g. storage space) optimization than a correctness issue
> >>>>>>> for this KIP. I agree that we probably don't need to focus on this in
> >>>>>>> our discussion since it is mostly a performance optimization.
> >>>>>>>
> >>>>>>> - "Asking producers and consumers, or even two different producers, to
> >>>>>>> share code like the partition function is a pretty huge ask. What if
> >>>>>>> they are using different languages?". It seems that today we already
> >>>>>>> require different producers to use the same hash function -- otherwise
> >>>>>>> messages with the same key will go to different partitions of the same
> >>>>>>> topic, which may cause problems for downstream consumption. So I am not
> >>>>>>> sure it adds any more constraint to assume consumers know the hash
> >>>>>>> function of the producer. Could you explain more why a user would want
> >>>>>>> to use a custom partition function? Maybe we can check if this is
> >>>>>>> something that can be supported in the default Kafka hash function.
> >>>>>>> Also, can you explain more why it is difficult to implement the same
> >>>>>>> hash function in different languages?
> >>>>>>>
> >>>>>>> - Besides the assumption that the consumer needs to share the hash
> >>>>>>> function of the producer, is there other organizational overhead in the
> >>>>>>> proposal of the current KIP?
> >>>>>>>
> >>>>>>> - Currently the producer can forget about a message that has been
> >>>>>>> acknowledged by the broker. Thus the producer probably does not know
> >>>>>>> most of the existing messages in the topic, including those messages
> >>>>>>> produced by other producers. We can have the owner of the producer do
> >>>>>>> the split+backfill. In my opinion it will be a new program that wraps
> >>>>>>> around the existing producer and consumer classes.
> >>>>>>>
> >>>>>>> - Regarding point 5. The argument is in favor of split+backfill, but
> >>>>>>> for the changelog topic, and it intends to address the problem for the
> >>>>>>> stream use-case in general. In this KIP we will provide an interface
> >>>>>>> (i.e. PartitionKeyRebalanceListener in the KIP) to be used by the
> >>>>>>> stream use-case, and the goal is that the user can flush/re-consume the
> >>>>>>> state as part of the interface implementation regardless of whether
> >>>>>>> there is a change log topic. Maybe you are suggesting that the main
> >>>>>>> reason to do split+backfill of the input topic is to support
> >>>>>>> log-compacted topics? You mentioned in Point 1 that log-compacted
> >>>>>>> topics are out of the scope of this KIP. Maybe I could understand your
> >>>>>>> position better. Regarding Jan's proposal to split partitions with
> >>>>>>> backfill, do you think this should replace the proposal in the existing
> >>>>>>> KIP, or do you think this is something that we should do in addition to
> >>>>>>> the existing KIP?
> >>>>>>>
> >>>>>>> - Regarding point 6. I guess we can agree that it is better not to
> >>>>>>> have the performance overhead of copying the input data. Before we
> >>>>>>> discuss more on whether the performance overhead is acceptable or not,
> >>>>>>> I am trying to figure out what the benefit of introducing this overhead
> >>>>>>> is. You mentioned that the benefit is the loose organizational
> >>>>>>> coupling. By "organizational coupling", are you referring to the
> >>>>>>> requirement that the consumer needs to know the hash function of the
> >>>>>>> producer? If so, maybe we can discuss the use-case of a custom
> >>>>>>> partition function and see whether we can find a way to support such a
> >>>>>>> use-case without having to copy the input data.
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> Dong
> >>>>>>>
> >>>>>>>
> >>>>>>> On Tue, Mar 27, 2018 at 11:34 AM, John Roesler <j...@confluent.io> wrote:
> >>>>>>>
> >>>>>>>> Hey Dong and Jun,
> >>>>>>>>
> >>>>>>>> Thanks for the thoughtful responses. If you don't mind, I'll mix my
> >>>>>>>> replies together to try for a coherent response. I'm not too familiar
> >>>>>>>> with mailing-list etiquette, though.
> >>>>>>>>
> >>>>>>>> I'm going to keep numbering my points because it makes it easy for you
> >>>>>>>> all to respond.
> >>>>>>>>
> >>>>>>>> Point 1:
> >>>>>>>> As I read it, KIP-253 is *just* about properly fencing the producers
> >>>>>>>> and consumers so that you preserve the correct ordering of records
> >>>>>>>> during partition expansion. This is clearly necessary regardless of
> >>>>>>>> anything else we discuss. I think this whole discussion about backfill,
> >>>>>>>> consumers, streams, etc., is beyond the scope of KIP-253. But it would
> >>>>>>>> be cumbersome to start a new thread at this point.
> >>>>>>>>
> >>>>>>>> I had missed KIP-253's Proposed Change #9 among all the details... I
> >>>>>>>> think this is a nice addition to the proposal. One thought is that it's
> >>>>>>>> actually irrelevant whether the hash function is linear. This is simply
> >>>>>>>> an algorithm for moving a key from one partition to another, so the
> >>>>>>>> type of hash function need not be a precondition. In fact, it also
> >>>>>>>> doesn't matter whether the topic is compacted or not; the algorithm
> >>>>>>>> works regardless.
> >>>>>>>>
> >>>>>>>> I think this is a good algorithm to keep in mind, as it might solve a
> >>>>>>>> variety of problems, but it does have a downside: the producer won't
> >>>>>>>> know whether or not K1 was actually in P1; it just knows that K1 was in
> >>>>>>>> P1's keyspace before the new epoch. Therefore, it will have to
> >>>>>>>> pessimistically send (K1, null) to P1 just in case. But the next time
> >>>>>>>> K1 comes along, the producer *also* won't remember that it already
> >>>>>>>> retracted K1 from P1, so it will have to send (K1, null) *again*. By
> >>>>>>>> extension, every time the producer sends to P2, it will also have to
> >>>>>>>> send a tombstone to P1, which is a pretty big burden. To make the
> >>>>>>>> situation worse, if there is a second split, say P2 becomes P2 and P3,
> >>>>>>>> then any key Kx belonging to P3 will also have to be retracted from P2
> >>>>>>>> *and* P1, since the producer can't know whether Kx had last been
> >>>>>>>> written to P2 or P1. Over a long period of time, this clearly becomes
> >>>>>>>> an issue, as the producer must send an arbitrary number of retractions
> >>>>>>>> along with every update.
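> >>>>>>>>
> >>>>>>>> (To spell that burden out with a toy sketch -- my own illustration,
> >>>>>>>> assuming an already-configured KafkaProducer<String, String> and
> >>>>>>>> made-up names; partitions 1 and 2 stand in for P1 and P2:)
> >>>>>>>>
> >>>>>>>> // Every update routed to the new partition must pessimistically retract the
> >>>>>>>> // key from the old partition, because the producer can't know whether K1
> >>>>>>>> // ever actually landed there.
> >>>>>>>> producer.send(new ProducerRecord<>("topic", 1, "K1", null));    // tombstone to P1, every time
> >>>>>>>> producer.send(new ProducerRecord<>("topic", 2, "K1", value));   // the actual update to P2
> >>>>>>>> // After a second split (P2 -> P2 and P3), a key owned by P3 needs tombstones
> >>>>>>>> // to both P2 and P1, so the retraction set keeps growing with every split.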
> >>>>>>>>
> >>>>>>>> In contrast, the proposed backfill operation has an end, and after it
> >>>>>>>> ends, everyone can afford to forget that there ever was a different
> >>>>>>>> partition layout.
> >>>>>>>>
> >>>>>>>> Really, though, figuring out how to split compacted topics is beyond
> >>>>>>>> the scope of KIP-253, so I'm not sure #9 really even needs to be in
> >>>>>>>> this KIP... We do need in-order delivery during partition expansion. It
> >>>>>>>> would be fine by me to say that you *cannot* expand partitions of a
> >>>>>>>> log-compacted topic and call it a day. I think it would be better to
> >>>>>>>> tackle that in another KIP.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Point 2:
> >>>>>>>> Regarding whether the consumer re-shuffles its inputs, this is always
> >>>>>>>> on the table; any consumer who wants to re-shuffle its input is free to
> >>>>>>>> do so. But this is currently not required. It's just that the current
> >>>>>>>> high-level story with Kafka encourages the use of partitions as a unit
> >>>>>>>> of concurrency. As long as consumers are single-threaded, they can
> >>>>>>>> happily consume a single partition without concurrency control of any
> >>>>>>>> kind. This is a key aspect of this system that lets folks design
> >>>>>>>> high-throughput systems on top of it surprisingly easily. If all
> >>>>>>>> consumers were instead encouraged/required to implement a repartition
> >>>>>>>> of their own, then the consumer becomes significantly more complex,
> >>>>>>>> requiring either the consumer to first produce to its own intermediate
> >>>>>>>> repartition topic or to ensure that consumer threads have a reliable,
> >>>>>>>> high-bandwidth channel of communication with every other consumer
> >>>>>>>> thread.
> >>>>>>>>
> >>>>>>>> Either of those tradeoffs may be reasonable for a particular user of
> >>>>>>>> Kafka, but I don't know if we're in a position to say that they are
> >>>>>>>> reasonable for *every* user of Kafka.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Point 3:
> >>>>>>>> Regarding Jun's point about this use case, "(3) stateful and
> >>>>>>>> maintaining the states in a local store", I agree that they may use a
> >>>>>>>> framework *like* Kafka Streams, but that is not the same as using Kafka
> >>>>>>>> Streams. This is why I think it's better to solve it in Core: because
> >>>>>>>> it is then solved for KStreams and also for everything else that
> >>>>>>>> facilitates local state maintenance. To me, Streams is a member of the
> >>>>>>>> category of "stream processing frameworks", which is itself a
> >>>>>>>> subcategory of "things requiring local state maintenance". I'm not sure
> >>>>>>>> if it makes sense to
> >>>>>>>
> >>>>>>>
>
