Hi Dong, I took a look at KIP-287. Having producers write with the new scheme
before the processors catch up and cut over makes sense. Thanks.

On Sat, Apr 14, 2018 at 7:09 PM, Dong Lin <lindon...@gmail.com> wrote:

> Hey Jeff,
>
> Thanks for the review. The scheme for expanding processors of the stateful
> processing job is described in "Support processor expansion" section in
> KIP-287 (link
> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-287%3A+Support+partition+and+processor+expansion+for+stateful+processing+jobs#KIP-287:Supportpartitionandprocessorexpansionforstatefulprocessingjobs-Supportprocessorexpansion>).
> In particular, we will expand the partitions of the input topics before
> expanding processors of the processing job. While the new processor is
> catching up, the producer would already be producing using the new
> partitioning scheme. Does this answer your question?
>
> Thanks,
> Dong
>
>
> On Sat, Apr 14, 2018 at 1:44 PM, Jeff Chao <jc...@salesforce.com> wrote:
>
> > Hi, I had a question from Monday's meeting. The current mechanism, as Ray's
> > notes point out, is to create a copy consumer, with a switch-over happening
> > once it catches up. Meanwhile, it looks like a producer would still be
> > writing using the old partitioning scheme. Wouldn't there be a case where
> > the copy consumer never catches up given a high enough produce rate?
> > I'm imagining this to work similarly to Kinesis's resharding mechanism,
> > though I may be missing something from the current proposal. Anyway, with
> > Kinesis as an example, this would mean a producer would stop writing using
> > the old scheme and start writing using the new scheme. That way, the
> > consumer will be able to catch up, at which point it will start consuming
> > using the new scheme.
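
For anyone less familiar with the Kinesis comparison, here is a minimal sketch of a hash-range split as I understand it: partition keys are MD5-hashed into a 128-bit space, each shard owns a contiguous range, and a split divides that range between two children that take all new writes. The function names below are made up for illustration and are not part of any Kinesis or Kafka API, and the midpoint split is only one possible choice of split point.

```python
import hashlib

MAX_HASH_KEY = 2**128 - 1  # MD5 yields a 128-bit value


def hash_key(partition_key: str) -> int:
    """Map a partition key to an integer in the 128-bit MD5 space."""
    return int(hashlib.md5(partition_key.encode("utf-8")).hexdigest(), 16)


def split_range(lo: int, hi: int) -> tuple[tuple[int, int], tuple[int, int]]:
    """Split an inclusive hash range at its midpoint into two child ranges."""
    mid = (lo + hi) // 2
    return (lo, mid), (mid + 1, hi)


def shard_for(h: int, ranges: list[tuple[int, int]]) -> int:
    """Return the index of the range that owns hash value h."""
    for index, (lo, hi) in enumerate(ranges):
        if lo <= h <= hi:
            return index
    raise ValueError("hash value not covered by any range")


# Hypothetical single parent shard covering the whole hash space.
parent = (0, MAX_HASH_KEY)
children = list(split_range(*parent))

# After the split, new writes for a key go to exactly one child range,
# while a consumer drains the (now closed) parent before reading children.
print(shard_for(hash_key("user-42"), children))
```

The point relevant to this thread is that the writer switches to the child ranges right away, so the amount of data left in the parent is bounded and a reader can always finish it.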
> >
> > Thanks,
> > Jeff Chao
> >
> > On Sat, Apr 14, 2018 at 6:44 AM, Dong Lin <lindon...@gmail.com> wrote:
> >
> > > Thanks for the notes by Jun and Ray. I have read through the notes. It
> > > seems that there are a few questions for the alternative solution by Jan,
> > > and maybe Jan will answer these questions later?
> > >
> > > I have summarized the solution, which I previously provided in this thread,
> > > in KIP-287, to hopefully show that we may have a promising direction to
> > > solve the partition and processor expansion problem for stateful processing
> > > jobs. Maybe we can continue the discussion related to stateful processing
> > > in that discussion thread. If that looks good, maybe we can conclude the
> > > discussion for KIP-253.
> > >
> > > Not sure if I missed some ideas or questions in the notes. Is there a
> > > specific concern about the latest KIP-253?
> > >
> > >
> > > On Mon, Apr 9, 2018 at 11:18 AM, Ray Chiang <rchi...@apache.org> wrote:
> > >
> > > > My notes from today's meeting.  Sorry if I got anyone's name wrong.  Plus I
> > > > missed a few moments with noise at home and/or dropped video.
> > > >
> > > > -Ray
> > > >
> > > > =====
> > > >
> > > > KIP-253 Discussion
> > > >
> > > > - Currently, adding partitions can cause keys to be read out-of-order.
> > > >   This KIP is trying to preserve the key ordering when adding partitions.
> > > >
> > > > - State management in applications (i.e. Kafka Streams) can maintain
> > > >   local state via caching.  If the number of partitions changes, how
> > > >   would those applications update their local state?  This is the current
> > > >   point of discussion/disagreement.
> > > >
> > > > - Jan Filipiak is mostly worried about log compacted topics.  Not as
> > > >   concerned about producer swapping.  Worried that the consumer design is
> > > >   a bit contradictory compared to the architecture.
> > > >
> > > >   Current design is to start up a new consumer in parallel with old
> > > >   topic/consumer.  Run until consumer finishes "copying" to the new topic.
> > > >   Once the consumer is caught up, point the producer at the new topic.
> > > >
> > > >   Would like to have this technique as a "core primitive" to Kafka.
> > > >   - Is this a useful primitive?
> > > >   - What's the best way to support it?
> > > >
> > > > >   - Topic expansion as it currently exists just "adds partitions".  But
> > > > >     how does this affect bootstrapping applications?  How to deal with
> > > > >     "moved" (from "old partition" to "new expanded partition") keys?
> > > > >
> > > > >   - Dong's proposal example.  10 partitions growing to 15.  5 of the
> > > > >     first 10 partitions are split into 2 each.  Say Kafka remembers
> > > > >     parent->child relationship.  Then for each parent partition, there
> > > > >     are two child partitions.  Initially, there were 10 states to
> > > > >     manage.  Then bootstrapping new application would have 15 states.
> > > > >     Need to know which "generation" of partition you are consuming
> > > > >     from.  Until you get to "newer" generation of data, the keys
> > > > >     will be fine (i.e. reading from old partition).
> > > > >
> > > > >   - Scheme works well for transient events.  Any stateful processor will
> > > > >     likely break.
> > > > >
> > > > >   - Tracking can become extremely complicated, since each split requires
> > > > >     potentially more and more offset/partition combos.
> > > > >
> > > > >   - Need to support continuation for consumers to read the new partitions.
> > > > >
> > > > >   - With linear hashing, integral multiple increase (2x, 3x, 4x, etc).
> > > > >     Easier mapping from old partition sets to new partition sets.
> > > > >     Keys end up with a clean hierarchy instead of a major reshuffling.
> > > > >
> > > > >   - Dong's approach piggybacks on existing leader epoch.  Log segment
> > > > >     could be tagged with version in linear hashing case.
> > > >
> > > >   - In Jan's case, existing consumers bootstrap from the beginning.
> > > >
> > > > >   - James' use case.  Using Kafka as a long term persistent data store.
> > > > >     Putting "source of truth" information into Kafka.  Bootstrap case
> > > > >     is very important.  New applications could be bootstrapping as they
> > > > >     come up.
> > > > >
> > > > >     - Increasing partitions will help with load from producer and
> > > > >       increasing consumer parallelism.
> > > > >     - How does Kinesis handle partition splits?  They don't have
> > > > >       compacted logs, so no issue with bootstrapping.  Kinesis uses
> > > > >       MD5 and splits results based on md5sum into bucket ranges.
> > > >     - Is it useful for the server to know the partitioning function?
> > > >       Consumer has some implicit assumptions about keyed partitions,
> > > >       but not strongly enforced on server side.
> > > >
> > > >     - KIP-213 (one to many joins in Kafka Streams)
> > > >
> > > >       - MySQL case.  Primary key forced to be used as Kafka key.
> > > >
> > > >         (Sorry had some audio and video drop at this point)
> > > >
> > > > >       - Mirror Maker.  Source cluster has custom partitioning function.
> > > > >         Producer won't duplicate to same partitioning setup as source.
> > > > >         Need to provide same partitioning function to producer.
> > > > >         Would need to determine partitioning function based on topic.
> > > > >
> > > > >         - Should server validate partitioning?
> > > > >         - Who does actual determination of which key goes to which
> > > > >           partition?
> > > >
> > > >       - How to implement backfill?
> > > >
> > > > >         - Who will do it?  In producer?  Hard to do.  Every client would
> > > > >           need to add this functionality.  Better to do on server side.
> > > > >         - Add a type of "copy consumer"?  Add backoff to producer?
> > > > >           Benefit of doing in consumer vs. producer?
> > > >
> > > >   - Still TBD
> > > >     - How to dedupe control messages?
> > > >     - How to deal with subtle cases during transition?
> > > > >     - Is it useful for the server to have the option to validate the key
> > > > >       distribution?
> > > > >     - Jan concerned about how a consumer application would look with the
> > > > >       new "split partition" design.
> > > > >     - KIP introduced callback.  Jan doesn't think it is useful.  Callback
> > > > >       for switching "between Partition 1 and can start on Partition 11".
> > > > >       Rely on marker in Partition 1 instead.  Intent for callback is
> > > > >       for possibility that delivery of messages for given key is moved
> > > > >       to a different consumer instance.
> > > >
> > > >
> > > >
> > > > On 4/6/18 9:44 AM, Dong Lin wrote:
> > > >
> > > >> Hey John,
> > > >>
> > > > >> Thanks much for your super-detailed explanation. This is very helpful.
> > > > >>
> > > > >> Now that I have finished reading through your email, I think the proposed
> > > > >> solution in my previous email probably meets requirement #6 without
> > > > >> requiring additional coordination (w.r.t. partition function) among
> > > > >> clients. My understanding of requirement #6 is that messages with a
> > > > >> given key will go to the same consumer before and after the partition
> > > > >> expansion, so that stream processing jobs won't be affected. Thus this
> > > > >> approach seems to be better than backfilling since it does not require
> > > > >> data copy for input topics.
> > > >>
> > > > >> In order for the proposed solution to meet requirement #6, we need two
> > > > >> extra requirements in addition to what has been described in the previous
> > > > >> email: 1) stream processing job starts with the same number of processors
> > > > >> as the initial number of partitions of the input topics; and 2) at any
> > > > >> given time the number of partitions of the input topic >= the number of
> > > > >> processors of the given stream processing job.
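
To make the two requirements concrete, here is a small sketch under assumptions that go beyond what this email spells out: keys are assigned by `hash mod partitions`, the partition count is always an integral multiple of the processor count, and partition p is handled by processor `p mod processors`. All names are hypothetical and CRC32 is only a stand-in hash. Under those assumptions `(h mod P) mod C == h mod C`, so a key stays on the same processor no matter how far the input topic has been expanded.

```python
import zlib


def partition_for(key: bytes, num_partitions: int) -> int:
    """Hypothetical partitioner: CRC32 of the key modulo the partition count."""
    return zlib.crc32(key) % num_partitions


def processor_for(partition: int, num_processors: int) -> int:
    """Assumed assignment: partition p is handled by processor p mod C."""
    return partition % num_processors


def same_processor_after_expansion(keys, processors, initial, expanded) -> bool:
    """Check that every key maps to the same processor before and after."""
    assert initial == processors        # requirement 1
    assert expanded >= processors       # requirement 2
    assert expanded % processors == 0   # extra assumption of this sketch
    return all(
        processor_for(partition_for(k, initial), processors)
        == processor_for(partition_for(k, expanded), processors)
        for k in keys
    )


keys = [f"key-{i}".encode() for i in range(1000)]
assert same_processor_after_expansion(keys, processors=4, initial=4, expanded=12)
```

No input data has to be copied here: only the producer-side partition count changes, and the processor's view of which keys it owns stays fixed.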
> > > >>
> > > > >> Could you take a look at the proposed solution and see if any of the
> > > > >> claims above is false?
> > > > >>
> > > > >>
> > > > >> Hey Jan,
> > > > >>
> > > > >> Maybe it is more efficient for us to discuss your concern in the KIP
> > > > >> Meeting.
> > > >>
> > > >>
> > > >> Thanks,
> > > >> Dong
> > > >>
> > > >>
> > > > >> On Thu, Mar 29, 2018 at 2:05 PM, John Roesler <j...@confluent.io> wrote:
> > > > >>
> > > > >>> Hi Jun,
> > > >>>
> > > >>> Thanks for the response. I'm very new to this project, but I will
> > share
> > > >>> my
> > > >>> perspective. I'm going to say a bunch of stuff that I know you know
> > > >>> already, but just so we're on the same page...
> > > >>>
> > > >>> This may also be a good time to get feedback from the other
> KStreams
> > > >>> folks.
> > > >>>
> > > > >>> Using KStreams as a reference implementation for how stream processing
> > > > >>> frameworks may interact with Kafka, I think it's important to eschew
> > > > >>> knowledge about how KStreams currently handles internal communication,
> > > > >>> making state durable, etc., both because these details may change and
> > > > >>> because they won't be shared with other stream processors.
> > > >>>
> > > >>> =================================
> > > >>> Background
> > > >>>
> > > >>> We are looking at a picture like this:
> > > >>>
> > > > >>>      input input input
> > > > >>>          \   |   /
> > > > >>>       +-------------+
> > > > >>> +-----+ Consumer(s) +-------+
> > > > >>> |     +-------------+       |
> > > > >>> |                           |
> > > > >>> |    KStreams Application   |
> > > > >>> |                           |
> > > > >>> |     +-------------+       |
> > > > >>> +-----+ Producer(s) +-------+
> > > > >>>       +-------------+
> > > > >>>            /    \
> > > > >>>         output output
> > > >>>
> > > >>> The inputs and outputs are Kafka topics (and therefore have 1 or
> more
> > > >>> partitions). We'd have at least 1 input and 0 or more outputs. The
> > > >>> Consumers and Producers are both the official KafkaConsumer and
> > > >>> KafkaProducer.
> > > >>>
> > > >>> In general, we'll assume that the input topics are provided by
> actors
> > > >>> over
> > > >>> which we have no control, although we may as well assume they are
> > > >>> friendly
> > > >>> and amenable to requests, and also that their promises are
> > trustworthy.
> > > >>> This is important because we must depend on them to uphold some
> > > promises:
> > > >>> * That they tell us the schema of the data they publish, and abide
> by
> > > >>> that
> > > >>> schema. Without this, the inputs are essentially garbage.
> > > >>> * That they tell us some defining characteristics of the topics
> (more
> > > on
> > > >>> this in a sec.) and again strictly abide by that promise.
> > > >>>
> > > >>> What are the topic characteristics we care about?
> > > >>> 1. The name (or name pattern)
> > > >>> 2. How the messages are keyed (if at all)
> > > >>> 3. Whether the message timestamps are meaningful, and if so, what
> > their
> > > >>> meaning is
> > > >>> 4. Assuming the records have identity, whether the partitions
> > partition
> > > >>> the
> > > >>> records' identity space
> > > >>> 5. Whether the topic completely contains the data set
> > > >>> 6. Whether the messages in the topic are ordered
> > > >>>
> > > >>> #1 is obvious: without this information, we cannot access the data
> at
> > > >>> all.
> > > >>>
> > > >>> For #2, #3, #4, and #6, we may or may not need this information,
> > > >>> depending
> > > >>> on the logic of the application. For example, a trivial application
> > > that
> > > >>> simply counts all events it sees doesn't care about #2, #3, #4, or
> > #6.
> > > >>> But
> > > >>> an application that groups by some attribute can take advantage of
> #2
> > > and
> > > >>> #4 if the topic data is already keyed and partitioned over that
> > > >>> attribute.
> > > >>> Likewise, if the application includes some temporal semantics on a
> > > >>> temporal
> > > >>> dimension that is already captured in #3, it may take advantage of
> > that
> > > >>> fact.
> > > >>>
> > > >>> Note that #2, #3, #4, and #6 are all optional. If they are not
> > > promised,
> > > >>> we
> > > >>> can do extra work inside the application to accomplish what we
> need.
> > > >>> However, if they are promised (and if we depend on that promise),
> it
> > is
> > > >>> essential that the topic providers uphold those promises, as we may
> > not
> > > >>> be
> > > >>> in a position to verify them.
> > > >>>
> > > >>> Note also that if they make a promise, but it doesn't happen to
> line
> > up
> > > >>> with our needs (data is keyed by attr1, but we need it by attr2, or
> > > >>> timestamp is produce-time, but we need it by event-time, etc.),
> then
> > we
> > > >>> will have to go ahead and do that extra work internally anyway.
> This
> > > also
> > > >>> captures the situation in which two inputs are produced by
> different
> > > >>> providers, one of which meets our needs, and the other does not.
> The
> > > fact
> > > >>> that we can cope with this situation is the basis for my statement
> > that
> > > >>> we
> > > >>> do not require coordination among producers.
> > > >>>
> > > >>> (Key Point A): In terms of optimization, #4 and #6 are the most
> > > valuable.
> > > >>> If these characteristics happen to line up with our needs, then
> > > KStreams
> > > >>> can be incredibly efficient in both time and computational
> resources.
> > > >>>
> > > > >>> #5 is similar to knowing the schema in that it tells us whether the
> > > > >>> computation we want to do is possible or not. For example, suppose we have
> > > > >>> a topic of "users", and we want to construct a table for querying. If the
> > > > >>> user topic doesn't completely contain the dataset, we cannot construct the
> > > > >>> table. Note that it doesn't matter whether the topic is compacted or not.
> > > > >>> If the topic is complete, I can consume it starting at "earliest" and build
> > > > >>> my table. If it is not complete, I can do other computations on it. In both
> > > > >>> cases, it may or may not be compacted; it just doesn't matter.
> > > >>>
> > > >>> On the output side, the roles are reversed. We provide (or not)
> > exactly
> > > >>> the
> > > >>> same set of guarantees to consumers of our outputs, and we likewise
> > > must
> > > >>> abide by the promises we make.
> > > >>>
> > > >>>
> > > >>> =================================
> > > >>> Partition Expansion
> > > >>>
> > > > >>> With this foundation in place, let's talk about partition expansion.
> > > >>> Why do we have partitions in the first place? (let me know if I
> miss
> > > >>> something here)
> > > >>> * For logical data streams that are themselves partitionable, it
> > allows
> > > >>> producers to operate concurrently without coordination. For
> example,
> > > >>> streaming data from a sensor in a particle accelerator, the sensor
> > can
> > > be
> > > >>> subdivided into a grid and each grid square can produce
> independently
> > > to
> > > >>> a
> > > >>> different topic. This may be valuable because the total rate of
> data
> > > >>> exceeds the throughput to a single broker node or just because it
> > > allows
> > > >>> for failure of a single producer to cause the loss of only part of
> > the
> > > >>> data.
> > > >>> * The brokers can offer linearly scaling throughput on the number
> of
> > > >>> partitions by hosting each partition on a separate broker node
> > > >>> * The brokers can host topics that are too large to fit on a single
> > > >>> broker's storage by hosting some partitions on separate broker
> nodes
> > > >>> * In cases where the use case permits handling partitions
> > > independently,
> > > >>> consumers can have algorithmic simplicity by processing the data
> for
> > > >>> separate partitions in separate threads, avoiding costly and
> > > error-prone
> > > >>> concurrent coordination code
> > > >>> * In cases where the use case permits handling partitions
> > > independently,
> > > >>> consumers can exceed the total throughput of a single
> broker-consumer
> > > >>> pair
> > > >>> * Just to throw this in as well, in cases where some network links
> > are
> > > >>> less
> > > >>> costly than others (or lower latency or more reliable), such as
> when
> > > >>> brokers, producers, and consumers are running in racks: producer
> and
> > > >>> consumers can both benefit (independently) by locating work on each
> > > >>> partition in the same rack as the broker hosting that partition.
> > > >>>
> > > >>> In other words, we have three actors in this system: producers,
> > > brokers,
> > > >>> and consumers, and they all benefit from partitioning for different
> > > (but
> > > >>> sometimes related) reasons.
> > > >>>
> > > >>> This leads naturally to the conclusion that any of these actors may
> > > find
> > > >>> themselves in a sub-optimal or even dangerous situation in which
> > > >>> partition
> > > >>> expansion would be the solution. For example, the producer may find
> > > that
> > > >>> the existing throughput to the brokers is insufficient to match the
> > > data
> > > >>> rate, forcing them to drop data. Or a broker hosting a single
> > partition
> > > >>> may
> > > >>> be running out of disk space. Or a consumer node handling a single
> > > >>> partition cannot match the rate of production for that partition,
> > > causing
> > > >>> it to fall behind.
> > > >>>
> > > > >>> I think it's reasonable to assume that all the actors in the system can't
> > > > >>> just arbitrarily expand a topic's partitions. I think it's reasonable to
> > > > >>> align this responsibility with the provider of the data, namely the
> > > > >>> producer (the logical producer, not the KafkaProducer class). Therefore,
> > > > >>> the producer who finds themselves in trouble can unilaterally expand
> > > > >>> partitions to solve their problem.
> > > >>>
> > > >>> For the broker or a consumer in trouble, they have only one resort:
> > to
> > > >>> request the producer to expand partitions. This is where it's
> helpful
> > > to
> > > >>> assume the producer is friendly.
> > > >>>
> > > >>>
> > > >>> Now, let's look at how a KStreams application fits into this
> > scenario.
> > > >>>
> > > > >>> (Key Point B1): As a consumer, we may find that the producer expands the
> > > > >>> partitions of a topic, either for their own benefit or for the brokers'.
> > > > >>> In this situation, the expand operation MUST NOT violate any promises that
> > > > >>> have previously been made to us. This is the essence of KIP-253, to ensure
> > > > >>> the maintenance of promises #6 and #4. It would be great if the mechanics
> > > > >>> of the expansion required no major disruption to processing or human
> > > > >>> intervention.
> > > >>>
> > > > >>> Specifically, let's say that input partition X splits into X1 and X2. #6
> > > > >>> requires that the same old ordering guarantees of Kafka continue to hold.
> > > > >>> Obviously, this is what KIP-253's title is about. #4 requires that we
> > > > >>> either ensure that X1 and X2 are assigned to the same thread that was
> > > > >>> previously assigned X OR that we immediately pause processing and split any
> > > > >>> state such that it appears X1 and X2 were *always* separate partitions.
> > > >>>
> > > >>> In other words, Option 1 is we treat X1 and X2 as still logically
> one
> > > >>> partition, equal to X. This is ideal, since in this scenario,
> > > partitions
> > > >>> are expanding for external reasons. We don't need to expand our
> > > >>> processing
> > > >>> to match. Option 2 requires a major disruption, since we'd have to
> > > pause
> > > >>> processing while we split our state. Clearly, KStreams or any other
> > > >>> stateful consumer would advocate for Option 1.
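
A minimal sketch of what Option 1 could look like on the consumer side, assuming the consumer can learn a parent->child relationship for split partitions. The `parent` map and both helpers are hypothetical and not an existing Kafka or KStreams API. Children are grouped under their original partition so one task (and one state store) keeps handling the whole logical partition.

```python
from collections import defaultdict


def root_of(partition: str, parent: dict[str, str]) -> str:
    """Follow parent links until reaching a partition that was never split off."""
    while partition in parent:
        partition = parent[partition]
    return partition


def logical_groups(partitions: list[str], parent: dict[str, str]) -> dict[str, list[str]]:
    """Group physical partitions by the logical (original) partition they belong to."""
    groups: dict[str, list[str]] = defaultdict(list)
    for p in partitions:
        groups[root_of(p, parent)].append(p)
    return dict(groups)


# Hypothetical metadata: X was split into X1 and X2, Y was left alone.
parent = {"X1": "X", "X2": "X"}
groups = logical_groups(["X1", "X2", "Y"], parent)

# Each group would be assigned to a single task, so the state built for X
# keeps serving both X1 and X2 with no pause and no state split.
assert groups == {"X": ["X1", "X2"], "Y": ["Y"]}
```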
> > > >>>
> > > >>>
> > > >>> (Corollary to Key Point A): Still on the consumer side, we may find
> > > that
> > > >>> we
> > > >>> ourselves can benefit from partition expansion of an input. Since
> we
> > > can
> > > >>> cope with the absence of promise #4, partition expansion is not a
> > hard
> > > >>> requirement for us, but assuming we were already benefiting from
> the
> > > >>> major
> > > >>> performance optimizations afforded by #4, it would be nice to be
> able
> > > to
> > > >>> request the producer satisfy our request for partition expansion
> > **and
> > > to
> > > >>> be able to benefit from it**.
> > > >>>
> > > >>> What does it mean to be able to benefit from partition expansion?
> > > >>> Assuming
> > > >>> input topic partition X splits into X1 and X2, in this scenario, we
> > > >>> *would*
> > > >>> wish to be able to split our state such that it appears X1 and X2
> > were
> > > >>> *always* separate partitions. Of course, the conclusion of Key
> Point
> > B1
> > > >>> still applies: we should be able to continue operating on (X1+X2 =
> X)
> > > as
> > > >>> one partition while asynchronously building the state of X1 and X2
> > > >>> separately.
> > > >>>
> > > >>> When it comes to the mechanics of building the state of X1 and X2
> > > >>> separately, we have really just two high-level options. Either this
> > > >>> problem
> > > >>> is solved by Kafka itself, giving me a view in which X1 and X2 were
> > > >>> always
> > > >>> separate partitions, or I have to do it myself. The latter means
> > that I
> > > >>> have to take on substantially more complexity than I do today:
> > > > >>> Bummer 1: My state has to be splittable to begin with, implying at the
> > > > >>> least that I need to be able to scan every record in my state, a
> > > > >>> requirement that otherwise does not exist.
> > > > >>> Bummer 2: After splitting the state of X1 and X2, I need to be able to
> > > > >>> send at least one of those tasks, state included, to another application
> > > > >>> node (in order to realize the benefit of the split). This is also a
> > > > >>> requirement that does not currently exist.
> > > >>> Bummer 3: In order to actually perform the split, I must know and
> be
> > > able
> > > >>> to execute the exact same partition function the producer of my
> topic
> > > >>> uses.
> > > >>> This introduces a brand-new a priori commitment from my input
> > > producers:
> > > >>> (would-be #7: convey the partition function and abide by it). This
> > is a
> > > >>> big
> > > >>> restriction over #4, which only requires them to guarantee *that
> > there
> > > >>> is a
> > > >>> partition function*. Now they actually have to share the function
> > with
> > > >>> me.
> > > >>> And I have to be able to implement and execute it myself. And if
> the
> > > >>> producer wishes to refine the partition function for an existing
> > topic,
> > > >>> we
> > > >>> have another round of coordination, as they have to be sure that I,
> > and
> > > >>> all
> > > >>> other consumers, begin using the new function *before* they do.
> This
> > is
> > > >>> similar to the schema problem, with a similar set of solutions. We
> > > would
> > > >>> likely need a partition function registry and another magic byte on
> > > every
> > > >>> record to be sure we do this right. Not to mention some way to
> > express
> > > >>> arbitrary partitioning logic over arbitrary data in a way that is
> > > >>> portable
> > > >>> across programming languages.
> > > >>>
> > > >>> Alternatively, if Kafka gives me a view in which X1 and X2 were
> > always
> > > >>> separate, then I can create tasks for X1 and X2 and allow them to
> > > >>> bootstrap
> > > >>> while I continue to process X. Once they are ready, I can
> coordinate
> > a
> > > >>> transition to stop X's task and switch to X1 and X2. None of those
> > > >>> bummers
> > > >>> are present, so this is a significantly better option for me.
> > > >>>
> > > >>> (Key Point B2): As a (friendly) producer, we may once again want on
> > our
> > > >>> own
> > > >>> to expand partitions, or we may want to satisfy a request from the
> > > broker
> > > >>> or our consumers to do so. Again, we MUST NOT violate any promises
> we
> > > >>> have
> > > >>> previously given, and it would be great if the expansion required
> no
> > > >>> major
> > > >>> disruption to processing or human intervention. Additionally, since
> > we
> > > >>> are
> > > >>> actually driving the expansion, it would also be great if we could
> > > avoid
> > > >>> Bummer 3's coordination problems from the producer's perspective.
> > > >>>
> > > >>>
> > > >>> ======================================
> > > >>> Briefly: KStreams internals
> > > >>>
> > > >>> I'm pretty sure you were asking me to comment on the implementation
> > > >>> details
> > > >>> of KStreams, so I'll say a few words about it. The most important
> > thing
> > > >>> is
> > > >>> that KStreams is still very early in its development. Maybe
> > > "early-middle
> > > >>> maturity" is a good way to put it. We are actively discussing
> > > >>> more-or-less
> > > >>> major implementation changes to improve performance, footprint,
> > > >>> scalability, and ergonomics. So it may actually be misleading to
> > > discuss
> > > >>> deeply how KStreams internally uses Kafka topics.
> > > >>>
> > > >>> Nevertheless, it is currently true that KStreams uses Kafka topics
> > for
> > > >>> communication between some internal computation nodes. We partition
> > > these
> > > >>> topics as the base unit of concurrency granularity, so it would
> > > >>> potentially
> > > >>> be beneficial to be able to expand partitions for some of these
> > > internal
> > > >>> topics at some point. However, we can alternatively just
> > overpartition
> > > >>> these internal topics, creating in the low 100s of partitions
> instead
> > > of
> > > >>> the low 10s, for example. (Side note: if Kafka were someday to
> > support
> > > >>> higher numbers of partitions, we could expand this scheme to
> > > >>> overpartition
> > > >>> in the 1000s of partitions.) With the option to overpartition, we
> > don't
> > > >>> have a strong need for partition expansion internally.
> > > >>>
> > > >>> It is also currently true that KStreams uses Kafka to store a
> durable
> > > >>> changelog for some of our internal state stores. But we *only* read
> > > from
> > > >>> this topic *if* we need to restore a state store after node loss
> (or
> > to
> > > >>> maintain a hot mirror of the state store), so I think it's unlikely
> > > that
> > > >>> we
> > > >>> would ever make use of partition expansion on the changelog topics.
> > > >>>
> > > >>> But once again, I'd like to emphasize that we may choose an
> > alternative
> > > >>> implementation for either interprocess communication or state
> > > durability.
> > > >>>
> > > >>>
> > > >>> ======================================
> > > >>> Concluding thoughts
> > > >>>
> > > >>> I know this is a very long email, and I really appreciate you
> > sticking
> > > >>> with
> > > >>> me this long. I hope it was useful for syncing our mental picture
> of
> > > this
> > > >>> system. Also, you're far more knowledgeable than I am about this
> > system
> > > >>> and
> > > >>> this domain, so please correct me if I've said anything wrong.
> > > >>>
> > > >>> To me the key takeaways are that:
> > > >>> - KIP-253 satisfies all we need for correctness, since it contains
> > > >>> solutions to guarantee producers can abide by their promises w.r.t.
> > #4
> > > >>> and
> > > >>> #6.
> > > >>> - From Key Point A: #4 is actually optional for KIP-253, but
> without
> > > it,
> > > >>> we
> > > >>> lose a potentially valuable optimization in KStreams (and all other
> > > >>> consumer applications)
> > > >>> - From Corollary to Point A: Without low-level support for
> partition
> > > >>> expansion with backfill, we cannot employ requesting partition
> > > expansion
> > > >>> as
> > > >>> a consumer to improve application performance. In that case, to
> > ensure
> > > >>> performance scalability, we would have to discard for all KStreams
> > > >>> applications the performance optimization afforded by #4.
> > > >>> - From Key Point B1: After a partition split, we really need to be
> > able
> > > >>> to
> > > >>> seamlessly continue operating as if it had not split.
> > > >>> - From Key Point B2: Since KIP-253 allows us to maintain all our
> > > >>> promises,
> > > >>> we have the option of expanding partitions in the topics we
> produce.
> > > >>> Without a backfill operation, though, our consumers may not be able
> > to
> > > >>> realize the benefits of that split, if they were hoping to.
> > > >>>
> > > >>> In general, faced with the possibility of having to coordinate the
> > > >>> partition function with our inputs' producers or with our outputs'
> > > >>> consumers, I would personally lean toward overprovisioning and
> > > completely
> > > >>> avoiding resize for our use case. This doesn't mean that it's not
> > > useful
> > > >>> in
> > > >>> the ecosystem at large without backfill, just that it loses its
> > luster
> > > >>> for
> > > >>> me. It also means that we can no longer take advantage of some of
> our
> > > >>> current optimizations, and in fact that we must introduce an extra
> > hop
> > > of
> > > >>> repartitioning on every single input.
> > > >>>
> > > >>> I think this is actually a pretty good picture of the opportunities
> > and
> > > >>> challenges that other consumers and producers in the Kafka
> ecosystem
> > > will
> > > >>> face.
> > > >>>
> > > >>> I hope this helps!
> > > >>>
> > > >>> Thanks,
> > > >>> -John
> > > >>>
> > > > >>> On Wed, Mar 28, 2018 at 11:51 AM, Jun Rao <j...@confluent.io> wrote:
> > > > >>>
> > > > >>>> Hi, John,
> > > >>>>
> > > > >>>> I actually think it's important to think through how KStreams handles
> > > > >>>> partition expansion in this KIP. If we do decide that we truly need
> > > > >>>> backfilling, it's much better to think through how to add it now,
> > > > >>>> instead of retrofitting it later. It would be useful to outline how
> > > > >>>> both existing KStreams jobs and new KStreams jobs work to see if
> > > > >>>> backfilling is really needed.
> > > >>>>
> > > > >>>> If we can figure out how KStreams works, at least we have one reference
> > > > >>>> implementation for other stream processing frameworks that face the
> > > > >>>> same issue.
> > > >>>>
> > > >>>> Thanks,
> > > >>>>
> > > >>>> Jun
> > > >>>>
> > > >>>>
> > > > >>>> On Tue, Mar 27, 2018 at 4:56 PM, John Roesler <j...@confluent.io> wrote:
> > > >>>>
> > > >>>> Hi Jun,
> > > >>>>>
> > > >>>>> That's a good point.
> > > >>>>>
> > > > >>>>> Yeah, I don't think it would work too well for existing consumers in
> > > > >>>>> the middle of gen 0 to try and switch to a newly backfilled prefix of
> > > > >>>>> gen 1. They probably just need to finish up until they get to the end
> > > > >>>>> of gen 0 and transition just as if there were no backfill available
> > > > >>>>> yet.
> > > >>>>>
> > > > >>>>> This isn't terrible, since consumer applications that care about
> > > > >>>>> scaling up to match a freshly split partition would wait until after
> > > > >>>>> the backfill is available to scale up. The consumer that starts out in
> > > > >>>>> gen=0, part=0 is going to be stuck with part=0 and part=3 in gen=1 in
> > > > >>>>> my example regardless of whether they finish scanning gen=0 before or
> > > > >>>>> after the backfill is available.
> > > >>>>>
> > > > >>>>> The broker knowing when it's ok to delete gen 0, including its offset
> > > > >>>>> mappings, is a big issue, though. I don't have any immediate ideas for
> > > > >>>>> solving it, but it doesn't feel impossible. Hopefully, you agree this
> > > > >>>>> is outside of KIP-253's scope, so maybe we don't need to worry about
> > > > >>>>> it right now.
> > > >>>>>
> > > > >>>>> I do agree that reshuffling in KStreams effectively solves the
> > > > >>>>> scalability problem as well, as it decouples the partition count (and
> > > > >>>>> the partition scheme) upstream from the parallelism of the streams
> > > > >>>>> application. Likely, we will do this in any case. I'm predominantly
> > > > >>>>> advocating for follow-on work to enable backfill for the *other* Kafka
> > > > >>>>> users that are not KStreams.
> > > >>>>
> > > >>>>> Thanks for your consideration,
> > > >>>>> -John
> > > >>>>>
> > > > >>>>> On Tue, Mar 27, 2018 at 6:19 PM, Jun Rao <j...@confluent.io> wrote:
> > > >>>>>
> > > >>>>> Hi, John,
> > > >>>>>>
> > > > >>>>>> Thanks for the reply. I agree that the backfill approach works more
> > > > >>>>>> cleanly for newly started consumers. I am just not sure if it's a good
> > > > >>>>>> primitive to support for existing consumers. One of the challenges
> > > > >>>>>> that I see is the remapping of the offsets. In your approach, we need
> > > > >>>>>> to copy the existing records from the partitions in generation 0 to
> > > > >>>>>> generation 1. Those records will get different offsets in the new
> > > > >>>>>> generation. The broker will have to store those offset mappings
> > > > >>>>>> somewhere. When the backfill completes, you can delete generation 0's
> > > > >>>>>> data. However, the broker can't throw away the offset mappings
> > > > >>>>>> immediately since it doesn't know if there is any existing consumer
> > > > >>>>>> still consuming generation 0's records. In a compacted topic, the
> > > > >>>>>> broker probably can only safely remove the offset mappings when all
> > > > >>>>>> records in generation 0 are removed by the cleaner. This may never
> > > > >>>>>> happen though.
> > > >>>>
> > > > >>>>>> If we reshuffle the input inside a KStreams job, it obviates the need
> > > > >>>>>> for offset remapping on the broker.
> > > >>>>>>
> > > >>>>>> Jun
> > > >>>>>>
> > > > >>>>>> On Tue, Mar 27, 2018 at 11:34 AM, John Roesler <j...@confluent.io> wrote:
> > > >>>>>
> > > >>>>>> Hey Dong and Jun,
> > > >>>>>>>
> > > > >>>>>>> Thanks for the thoughtful responses. If you don't mind, I'll mix my
> > > > >>>>>>> replies together to try for a coherent response. I'm not too familiar
> > > > >>>>>>> with mailing-list etiquette, though.
> > > >>>>>>>
> > > > >>>>>>> I'm going to keep numbering my points because it makes it easy for you
> > > > >>>>>>> all to respond.
> > > >>>>>>>
> > > >>>>>>> Point 1:
> > > > >>>>>>> As I read it, KIP-253 is *just* about properly fencing the producers
> > > > >>>>>>> and consumers so that you preserve the correct ordering of records
> > > > >>>>>>> during partition expansion. This is clearly necessary regardless of
> > > > >>>>>>> anything else we discuss. I think this whole discussion about
> > > > >>>>>>> backfill, consumers, streams, etc., is beyond the scope of KIP-253.
> > > > >>>>>>> But it would be cumbersome to start a new thread at this point.
> > > >>>>>>>
> > > > >>>>>>> I had missed KIP-253's Proposed Change #9 among all the details... I
> > > > >>>>>>> think this is a nice addition to the proposal. One thought is that
> > > > >>>>>>> it's actually irrelevant whether the hash function is linear. This is
> > > > >>>>>>> simply an algorithm for moving a key from one partition to another, so
> > > > >>>>>>> the type of hash function need not be a precondition. In fact, it also
> > > > >>>>>>> doesn't matter whether the topic is compacted or not, the algorithm
> > > > >>>>>>> works regardless.
> > > >>>>
> > > > >>>>>>> I think this is a good algorithm to keep in mind, as it might solve a
> > > > >>>>>>> variety of problems, but it does have a downside: that the producer
> > > > >>>>>>> won't know whether or not K1 was actually in P1, it just knows that K1
> > > > >>>>>>> was in P1's keyspace before the new epoch. Therefore, it will have to
> > > > >>>>>>> pessimistically send (K1,null) to P1 just in case. But the next time
> > > > >>>>>>> K1 comes along, the producer *also* won't remember that it already
> > > > >>>>>>> retracted K1 from P1, so it will have to send (K1,null) *again*. By
> > > > >>>>>>> extension, every time the producer sends to P2, it will also have to
> > > > >>>>>>> send a tombstone to P1, which is a pretty big burden. To make the
> > > > >>>>>>> situation worse, if there is a second split, say P2 becomes P2 and P3,
> > > > >>>>>>> then any key Kx belonging to P3 will also have to be retracted from P2
> > > > >>>>>>> *and* P1, since the producer can't know whether Kx had been last
> > > > >>>>>>> written to P2 or P1. Over a long period of time, this clearly becomes
> > > > >>>>>>> an issue, as the producer must send an arbitrary number of retractions
> > > > >>>>>>> along with every update.
> > > >>>>>>>
> > > > >>>>>>> In contrast, the proposed backfill operation has an end, and after it
> > > > >>>>>>> ends, everyone can afford to forget that there ever was a different
> > > > >>>>>>> partition layout.
> > > >>>>>>>
> > > > >>>>>>> Really, though, figuring out how to split compacted topics is beyond
> > > > >>>>>>> the scope of KIP-253, so I'm not sure #9 really even needs to be in
> > > > >>>>>>> this KIP...
> > > >>>>>>
> > > > >>>>>>> We do need in-order delivery during partition expansion. It would be
> > > > >>>>>>> fine by me to say that you *cannot* expand partitions of a
> > > > >>>>>>> log-compacted topic and call it a day. I think it would be better to
> > > > >>>>>>> tackle that in another KIP.
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> Point 2:
> > > > >>>>>>> Regarding whether the consumer re-shuffles its inputs, this is always
> > > > >>>>>>> on the table; any consumer who wants to re-shuffle its input is free
> > > > >>>>>>> to do so. But this is currently not required. It's just that the
> > > > >>>>>>> current high-level story with Kafka encourages the use of partitions
> > > > >>>>>>> as a unit of concurrency.
> > > >>>>>>
> > > > >>>>>>> As long as consumers are single-threaded, they can happily consume a
> > > > >>>>>>> single partition without concurrency control of any kind. This is a
> > > > >>>>>>> key aspect to this system that lets folks design high-throughput
> > > > >>>>>>> systems on top of it surprisingly easily. If all consumers were
> > > > >>>>>>> instead encouraged/required to implement a repartition of their own,
> > > > >>>>>>> then the consumer becomes significantly more complex, requiring either
> > > > >>>>>>> the consumer to first produce to its own intermediate repartition
> > > > >>>>>>> topic or to ensure that consumer threads have a reliable,
> > > > >>>>>>> high-bandwidth channel of communication with every other consumer
> > > > >>>>>>> thread.
> > > >>>>>>>
> > > > >>>>>>> Either of those tradeoffs may be reasonable for a particular user of
> > > > >>>>>>> Kafka, but I don't know if we're in a position to say that they are
> > > > >>>>>>> reasonable for *every* user of Kafka.
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> Point 3:
> > > > >>>>>>> Regarding Jun's point about this use case, "(3) stateful and
> > > > >>>>>>> maintaining the states in a local store", I agree that they may use a
> > > > >>>>>>> framework *like* Kafka Streams, but that is not the same as using
> > > > >>>>>>> Kafka Streams. This is why I think it's better to solve it in Core:
> > > > >>>>>>> because it is then solved for KStreams and also for everything else
> > > > >>>>>>> that facilitates local state maintenance. To me, Streams is a member
> > > > >>>>>>> of the category of "stream processing frameworks", which is itself a
> > > > >>>>>>> subcategory of "things requiring local state maintenance". I'm not
> > > > >>>>>>> sure if it makes sense to assert that Streams is a sufficient and
> > > > >>>>>>> practical replacement for everything in "things requiring local state
> > > > >>>>>>> maintenance".
> > > >>>>>>>
> > > > >>>>>>> But, yes, I do agree that per-key ordering is an absolute requirement,
> > > > >>>>>>> therefore I think that KIP-253 itself is a necessary step. Regarding
> > > > >>>>>>> the coupling of the state store partitioning to the topic
> > > > >>>>>>> partitioning, yes, this is an issue we are discussing solutions to
> > > > >>>>>>> right now. We may go ahead and introduce an overpartition layer on our
> > > > >>>>>>> inputs to solve it, but then again, if we get the ability to split
> > > > >>>>>>> partitions with backfill, we may not need to!
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> Point 4:
> > > >>>>>>> On this:
> > > >>>>>>>
> > > > >>>>>>>> Regarding thought 2: If we don't care about the stream use-case,
> > > > >>>>>>>> then the current KIP probably has already addressed problem without
> > > > >>>>>>>> requiring consumer to know the partition function. If we care about
> > > > >>>>>>>> the stream use-case, we already need coordination across producers
> > > > >>>>>>>> of different topics, i.e. the same partition function needs to be
> > > > >>>>>>>> used by producers of topics A and B in order to join topics A and B.
> > > > >>>>>>>> Thus, it might be reasonable to extend coordination a bit and say we
> > > > >>>>>>>> need coordination across clients (i.e. producer and consumer), such
> > > > >>>>>>>> that consumer knows the partition function used by producer. If we
> > > > >>>>>>>> do so, then we can let consumer re-copy data for the change log
> > > > >>>>>>>> topic using the same partition function as producer. This approach
> > > > >>>>>>>> has lower overhead as compared to having producer re-copy data of
> > > > >>>>>>>> the input topic.
> > > > >>>>>>>> Also, producer currently does not need to know the data already
> > > > >>>>>>>> produced to the topic. If we let producer split/merge partition, it
> > > > >>>>>>>> would require producer to consume the existing data, which
> > > > >>>>>>>> intuitively is the task of consumer.
> > > >>>>>>>>
> > > >>>>>>>
> > > > >>>>>>> I think we do care about use cases *like* Streams, I just don't think
> > > > >>>>>>> we should rely on Streams to implement a feature of Core like
> > > > >>>>>>> partition expansion.
> > > >>>>>>>
> > > > >>>>>>> Note, though, that we (Streams) do not require coordination across
> > > > >>>>>>> producers. If two topics are certified to be co-partitioned, then
> > > > >>>>>>> Streams apps can make use of that knowledge to optimize their topology
> > > > >>>>>>> (skipping a repartition). But if they don't know whether they are
> > > > >>>>>>> co-partitioned, then they'd better go ahead and repartition within the
> > > > >>>>>>> topology. This is the current state.
> > > >>>>>>>
> > > > >>>>>>> A huge selling point of Kafka is enabling different parts of loosely
> > > > >>>>>>> coupled organizations to produce and consume data independently. Some
> > > > >>>>>>> coordination between producers and consumers is necessary, like
> > > > >>>>>>> coordinating on the names of topics and their schemas. But Kafka's
> > > > >>>>>>> value proposition w.r.t. ESBs, etc. is inversely proportional to the
> > > > >>>>>>> amount of coordination required. I think it behooves us to be
> > > > >>>>>>> extremely skeptical about introducing any coordination beyond
> > > > >>>>>>> correctness protocols.
> > > >>>>>>>
> > > > >>>>>>> Asking producers and consumers, or even two different producers, to
> > > > >>>>>>> share code like the partition function is a pretty huge ask. What if
> > > > >>>>>>> they are using different languages?
> > > >>>>>>>
> > > > >>>>>>> Comparing organizational overhead vs computational overhead, there are
> > > > >>>>>>> maybe two orders of magnitude difference between them. In other words,
> > > > >>>>>>> I would happily take on the (linear) overhead of having the producer
> > > > >>>>>>> re-copy the data once during a re-partition in order to save the
> > > > >>>>>>> organizational overhead of tying all the producers and consumers
> > > > >>>>>>> together across multiple boundaries.
> > > >>>>>>>
> > > > >>>>>>> On that last paragraph: note that the producer *did* know the data it
> > > > >>>>>>> already produced. It handled it the first time around. Asking it to
> > > > >>>>>>> re-produce it into a new partition layout is squarely within its scope
> > > > >>>>>>> of capabilities. Contrast this with the alternative, asking the
> > > > >>>>>>> consumer to re-partition the data. I think this is even less
> > > > >>>>>>> intuitive, when the partition function belongs to the producer.
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> Point 5:
> > > >>>>>>> Dong asked this:
> > > >>>>>>>
> > > > >>>>>>>> For stream use-case that needs to increase consumer number, the
> > > > >>>>>>>> existing consumer can backfill the existing data in the change log
> > > > >>>>>>>> topic to the same change log topic with the new partition number,
> > > > >>>>>>>> before the new set of consumers bootstrap state from the new
> > > > >>>>>>>> partitions of the change log topic, right?
> > > >>>>>>>>
> > > >>>>>>>
> > > > >>>>>>> In this sense, the "consumer" is actually the producer of the
> > > > >>>>>>> changelog topic, so if we support partition expansion + backfill as a
> > > > >>>>>>> producer/broker operation, then it would be very straightforward for
> > > > >>>>>>> Streams to split a state store. As you say, they would simply instruct
> > > > >>>>>>> the broker to split the changelog topic's partitions, then backfill.
> > > > >>>>>>> Once the backfill is ready, they can create a new crop of StandbyTasks
> > > > >>>>>>> to bootstrap the more granular state stores and finally switch over to
> > > > >>>>>>> them when they are ready.
> > > >>>>>>>
> > > > >>>>>>> But this actually seems to be an argument in favor of split+backfill,
> > > > >>>>>>> so maybe I missed the point.
> > > >>>>>>>
> > > > >>>>>>> You also asked me to explain why copying the "input" topic is better
> > > > >>>>>>> than copying the "changelog" topic. I think they are totally
> > > > >>>>>>> independent, actually. For one thing, you can't depend on the
> > > > >>>>>>> existence of a "changelog" topic in general, only within Streams, but
> > > > >>>>>>> Kafka's user base clearly exceeds Streams's user base. Plus, you
> > > > >>>>>>> actually also can't depend on the existence of a changelog topic
> > > > >>>>>>> within Streams, since that is an optional feature of *some* state
> > > > >>>>>>> store implementations. Even in the situation where you do have a
> > > > >>>>>>> changelog topic in Streams, there may be use cases where it makes
> > > > >>>>>>> sense to expand the partitions of just the input, or just the
> > > > >>>>>>> changelog.
> > > >>>>>>>
> > > > >>>>>>> The ask for a Core feature of split+backfill is really about
> > > > >>>>>>> supporting the use case of splitting partitions in log-compacted
> > > > >>>>>>> topics, regardless of whether that topic is an "input" or a
> > > > >>>>>>> "changelog" or anything else for that matter.
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> Point 6:
> > > > >>>>>>> On the concern about the performance overhead of copying data between
> > > > >>>>>>> the brokers, I think it's actually a bit overestimated. Splitting a
> > > > >>>>>>> topic's partition is probably rare, certainly rarer in general than
> > > > >>>>>>> bootstrapping new consumers on that topic. If "bootstrapping new
> > > > >>>>>>> consumers" means that they have to re-shuffle the data before they
> > > > >>>>>>> consume it, then you wind up copying the same record multiple times:
> > > >>>>>>>
> > > > >>>>>>> (broker: input topic) -> (initial consumer) -> (broker: repartition topic)
> > > > >>>>>>> -> (real consumer)
> > > >>>>>>>
> > > > >>>>>>> That's 3x, and it's also 3x for every new record after the split as
> > > > >>>>>>> well, since you don't get to stop repartitioning/reshuffling once you
> > > > >>>>>>> start. Whereas if you do a backfill in something like the procedure I
> > > > >>>>>>> outlined, you only copy the prefix of the partition before the split,
> > > > >>>>>>> and you send it once to the producer and then once to the new
> > > > >>>>>>> generation partition. Plus, assuming we're splitting the partition for
> > > > >>>>>>> the benefit of consumers, there's no reason we can't co-locate the
> > > > >>>>>>> post-split partitions on the same host as the pre-split partition,
> > > > >>>>>>> making the second copy a local filesystem operation.
> > > >>>>>>>
> > > > >>>>>>> Even if you follow these two copies up with bootstrapping a new
> > > > >>>>>>> consumer, it's still rare for this to occur, so you get to amortize
> > > > >>>>>>> these copies over the lifetime of the topic, whereas a reshuffle just
> > > > >>>>>>> keeps making copies for every new event.
> > > >>>>>>>
> > > > >>>>>>> And finally, I really do think that regardless of any performance
> > > > >>>>>>> concerns about this operation, if it preserves loose organizational
> > > > >>>>>>> coupling, it is certainly worth it.
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> In conclusion:
> > > > >>>>>>> It might actually be a good idea for us to clarify the scope of
> > > > >>>>>>> KIP-253. If we're all agreed that it's a good algorithm for allowing
> > > > >>>>>>> in-order message delivery during partition expansion, then we can
> > > > >>>>>>> continue this discussion as a new KIP, something like "backfill with
> > > > >>>>>>> partition expansion". This would let Dong proceed with KIP-253. On the
> > > > >>>>>>> other hand, if it seems like this conversation may alter the design of
> > > > >>>>>>> KIP-253, then maybe we *should* just finish working it out.
> > > >>>>>>>
> > > > >>>>>>> For my part, my only concern about KIP-253 is the one I raised earlier.
> > > > >>>>>>>
> > > > >>>>>>> Thanks again, all, for considering these points,
> > > >>>>>>> -John
> > > >>>>>>>
> > > >>>>>>>
> > > > >>>>>>> On Tue, Mar 27, 2018 at 2:10 AM, Dong Lin <lindon...@gmail.com> wrote:
> > > > >>>>>>>
> > > > >>>>>>>> On Tue, Mar 27, 2018 at 12:04 AM, Dong Lin <lindon...@gmail.com> wrote:
> > > >>>>>>
> > > >>>>>>> Hey Jan,
> > > >>>>>>>>>
> > > > >>>>>>>>> Thanks for the enthusiasm in improving Kafka's design. Now that I
> > > > >>>>>>>>> have read through your discussion with Jun, here are my thoughts:
> > > >>>>>>>>>
> > > > >>>>>>>>> - The latest proposal should work with log compacted topics by
> > > > >>>>>>>>> properly deleting old messages after a new message with the same
> > > > >>>>>>>>> key is produced. So it is probably not a concern anymore. Could you
> > > > >>>>>>>>> comment if there is still an issue?
> > > >>>>>>>>>
> > > > >>>>>>>>> - I wrote the SEP-5 and I am pretty familiar with the motivation
> > > >>
> > > >>
> > >
> >
>
