Hi all,

Not sure if there is further concern with this KIP. And the vote for
KIP-232 is also blocked on the discussion of this KIP. I am happy to
continue the discussion and move forward the progress of this KIP.

Thanks,
Dong

On Tue, Apr 17, 2018 at 8:58 PM, Jeff Chao <jc...@salesforce.com> wrote:

> Hi Dong, I took a look at KIP-287. Producers writing using the new scheme
> prior to processor catch up and cut-over makes sense. Thanks.
>
> On Sat, Apr 14, 2018 at 7:09 PM, Dong Lin <lindon...@gmail.com> wrote:
>
> > Hey Jeff,
> >
> > Thanks for the review. The scheme for expanding processors of the
> stateful
> > processing job is described in "Support processor expansion" section in
> > KIP-287 (link
> > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 287%3A+Support+partition+and+processor+expansion+for+
> > stateful+processing+jobs#KIP-287:Supportpartitionandprocessorex
> > pansionforstatefulprocessingjobs-Supportprocessorexpansion>).
> > In particular, we will expand the partitions of the input topics before
> > expanding processors of the processing job. While the new processor is
> > catching up, the producer would already be producing using the new
> > partitioning scheme. Does this answer your question?
> >
> > Thanks,
> > Dong
> >
> >
> > On Sat, Apr 14, 2018 at 1:44 PM, Jeff Chao <jc...@salesforce.com> wrote:
> >
> > > Hi, I had a question from Monday's meeting. The current mechanism, as
> > Ray's
> > > notes points out, is to create a copy consumer in which a switch over
> > > happens when it catches up. Meanwhile, it looks like a producer would
> > still
> > > be writing using the old partitioning scheme. Wouldn't there be a case
> > > where the copy consumer never catches up given a high enough produce
> > rate?
> > > I'm imagining this to work similarly to Kinesis's resharding mechanism,
> > > though I may be missing something from the current proposal. Anyway,
> with
> > > Kinesis as an example, this would mean a producer would stop writing
> > using
> > > the old scheme and start writing using the new scheme. That way, the
> > > consumer will be able to catch up in which it will start consuming
> using
> > > the new scheme afterwards.
> > >
> > > Thanks,
> > > Jeff Chao
> > >
> > > On Sat, Apr 14, 2018 at 6:44 AM, Dong Lin <lindon...@gmail.com> wrote:
> > >
> > > > Thanks for the notes by Jun and Ray. I have read through the notes.
> It
> > > > seems that there are a few questions for the alternative solution by
> > Jan
> > > > and maybe Jan will answer these questions later?
> > > >
> > > > I have summarized the solution, which I previously provided in this
> > > thread,
> > > > in KIP-287, to hopefully show that we may have a promising direction
> to
> > > > solve the partition and processor expansion problem for stateful
> > > processing
> > > > jobs. Maybe we can continue the discussion related to stateful
> > processing
> > > > in that discussion thread. If that looks good, maybe we can conclude
> > the
> > > > discussion for KIP-253.
> > > >
> > > > Not sure if I missed some ideas or questions in the notes. Is there
> > > > specific concern for the latest KIP-253?
> > > >
> > > >
> > > > On Mon, Apr 9, 2018 at 11:18 AM, Ray Chiang <rchi...@apache.org>
> > wrote:
> > > >
> > > > > My notes from today's meeting.  Sorry if I got anyone's name wrong.
> > > Plus
> > > > I
> > > > > missed a few moments with noise at home and/or dropped video.
> > > > >
> > > > > -Ray
> > > > >
> > > > > =====
> > > > >
> > > > > KIP-253 Discussion
> > > > >
> > > > > - Currently, adding partitions can cause keys to be read
> > out-of-order.
> > > > >   This KIP is trying to preserve the key ordering when adding
> > > partitions.
> > > > >
> > > > > - State management in applications (i.e. Kafka Streams) can
> maintain
> > > > >   local state via caching.  If the number of partitions changes,
> how
> > > > >   would those applications update their local state.  This is the
> > > current
> > > > >   point of discussion/disagreement.
> > > > >
> > > > > - Jan Filipiak is mostly worried about log compacted topics.  Not
> as
> > > > >   concerned about producer swapping.  Worried about the consumer
> > design
> > > > is
> > > > >   a bit contradictory compared to the architecture.
> > > > >
> > > > >   Current design is to start up a new consumer in parallel with old
> > > > >   topic/consumer.  Run until consumer finishes "copying" to the new
> > > > topic.
> > > > >   Once the consumer is caught up, point the producer at the new
> > topic.
> > > > >
> > > > >   Would like to have this technique as a "core primitive" to Kafka.
> > > > >   - Is this a useful primitive?
> > > > >   - What's the best way to support it?
> > > > >
> > > > >   - Topic expansion as it currently exists just "adds partitions".
> > But
> > > > >     how does this affect bootstrapping applications?  How to deal
> > with
> > > > >     "moved" (from "old partition" to "new expanded partition")
> keys?
> > > > >
> > > > >   - Dong's proposal example.  10 partitions growing to 15.  5 of
> the
> > > > >     first 10 partitions are split into 2 each.  Say Kafka remembers
> > > > >     parent->child relationship.  Then for each parent partition,
> > there
> > > > >     are two child partitions.  Initially, there were 10 states to
> > > > >     manage.  Then bootstrapping new application would have 15
> states.
> > > > >     Need to know which "generation" of partition you are consuming
> > > > >     from.  Until you get to "newer" generation of data, then the
> keys
> > > > >     will be find (i.e. reading from old partition).
> > > > >
> > > > >   - Scheme works well for transient events.  Any stateful processor
> > > will
> > > > >     likely break.
> > > > >
> > > > >   - Tracking can become extremely complicated, since each split
> > > requires
> > > > >     potentially more and more offset/partition combos.
> > > > >
> > > > >   - Need to support continuation for consumers to read the new
> > > > partitions.
> > > > >
> > > > >   - With linear hashing, integral multiple increase (2x, 3x, 4x,
> > etc).
> > > > >     Easier mapping from old partition sets to new partition sets.
> > > > >     Keys end up with a clean hierarchy instead of a major
> > reshuffling.
> > > > >
> > > > >   - Dong's approach piggyback on existing leader epoch.  Log
> segment
> > > > >     could be tagged with version in linear hashing case.
> > > > >
> > > > >   - In Jan's case, existing consumers bootstrap from the beginning.
> > > > >
> > > > >   - James' use case.  Using Kafka as a long term persistent data
> > store.
> > > > >     Putting "source of truth" information into Kafka.  Bootstrap
> case
> > > > >     is very important.  New applications could be bootstrapping as
> > they
> > > > >     come up.
> > > > >
> > > > >     - Increasing partitions will help with load from prodcuer and
> > > > >       increasing consumer parallelism.
> > > > >     - How does Kinesis handling partition splits?  They don't have
> > > > >       compacted logs, so no issue with bootstrapping.  Kinesis uses
> > > > >       MD5 and splits results based on md5sum into bucket ranges.
> > > > >     - Is it useful for the server to know the partitioning
> function?
> > > > >       Consumer has some implicit assumptions about keyed
> partitions,
> > > > >       but not strongly enforced on server side.
> > > > >
> > > > >     - KIP-213 (one to many joins in Kafka Streams)
> > > > >
> > > > >       - MySQL case.  Primary key forced to be used as Kafka key.
> > > > >
> > > > >         (Sorry had some audio and video drop at this point)
> > > > >
> > > > >       - Mirror Maker.  Source cluster has custom partitioning
> > function.
> > > > >         Producer won't duplicate to same partitioning setup as
> > source.
> > > > >         Need to provide same partitioning function to producer.
> > > > >         Would need to determine partitioning function based on
> topic.
> > > > >
> > > > >         - Should server validate partitioning?
> > > > >         - Who does actual determination of which key goes to which
> > > > > partition.
> > > > >
> > > > >       - How to implement backfill?
> > > > >
> > > > >         - Who will do it?  In producer?  Hard to do.  Every client
> > > would
> > > > >           need to add this functionality.  Better to do on server
> > side.
> > > > >         - Add a type of "copy consumer"?  Add backoff to producer?
> > > > >           Benefit of doing in consumer vs. producer?
> > > > >
> > > > >   - Still TBD
> > > > >     - How to dedupe control messages?
> > > > >     - How to deal with subtle cases during transition?
> > > > >     - Is it useful for the server to have the option to validate
> the
> > > key
> > > > >       distribution?
> > > > >     - Jan concerned about how a consumer application would look
> with
> > > the
> > > > >       new "split partition" design.
> > > > >     - KIP introduced callback.  Jan doesn't think is useful.
> Callback
> > > > >       for switching "between Partition 1 and can start on Partition
> > > 11".
> > > > >       Rely on marker in Partition 1 instead.  Intent for callback
> is
> > > > >       for possibility that delivery of messages for given key is
> > moved
> > > > >       to a different consumer instance.
> > > > >
> > > > >
> > > > >
> > > > > On 4/6/18 9:44 AM, Dong Lin wrote:
> > > > >
> > > > >> Hey John,
> > > > >>
> > > > >> Thanks much for your super-detailed explanation. This is very
> > helpful.
> > > > >>
> > > > >> Now that I have finished reading through your email, I think the
> > > > proposed
> > > > >> solution in my previous email probably meets the requirement #6
> > > without
> > > > >> requiring additional coordination (w.r.t. partition function)
> among
> > > > >> clients. My understanding of requirement #6 is that, after
> partition
> > > > >> expansion, messages with the given key will go to the same
> consumer
> > > > before
> > > > >> and after the partition expansion such that stream processing jobs
> > > won't
> > > > >> be
> > > > >> affected. Thus this approach seems to be better than backfilling
> > since
> > > > it
> > > > >> does not require data copy for input topics.
> > > > >>
> > > > >> In order for the proposed solution to meet requirements #6, we
> need
> > > two
> > > > >> extra requirements in addition to what has been described in the
> > > > previous
> > > > >> email: 1) stream processing job starts with the same number of
> > > > processors
> > > > >> as the initial number of partitions of the input topics; and 2) at
> > any
> > > > >> given time the number of partitions of the input topic >= the
> number
> > > of
> > > > >> processors of the given stream processing job.
> > > > >>
> > > > >> Could you take a look at the proposed solution and see if any of
> the
> > > > >> claims
> > > > >> above is false?
> > > > >>
> > > > >>
> > > > >> Hey Jan,
> > > > >>
> > > > >> Maybe it is more efficient for us to discuss your concern in the
> KIP
> > > > >> Meeting.
> > > > >>
> > > > >>
> > > > >> Thanks,
> > > > >> Dong
> > > > >>
> > > > >>
> > > > >> On Thu, Mar 29, 2018 at 2:05 PM, John Roesler <j...@confluent.io>
> > > > wrote:
> > > > >>
> > > > >> Hi Jun,
> > > > >>>
> > > > >>> Thanks for the response. I'm very new to this project, but I will
> > > share
> > > > >>> my
> > > > >>> perspective. I'm going to say a bunch of stuff that I know you
> know
> > > > >>> already, but just so we're on the same page...
> > > > >>>
> > > > >>> This may also be a good time to get feedback from the other
> > KStreams
> > > > >>> folks.
> > > > >>>
> > > > >>> Using KStreams as a reference implementation for how stream
> > > processing
> > > > >>> frameworks may interact with Kafka, I think it's important to
> > eschew
> > > > >>> knowledge about how KStreams currently handles internal
> > > communication,
> > > > >>> making state durable, etc. Both because these details may change,
> > and
> > > > >>> because they won't be shared with other stream processors.
> > > > >>>
> > > > >>> =================================
> > > > >>> Background
> > > > >>>
> > > > >>> We are looking at a picture like this:
> > > > >>>
> > > > >>>       input input input
> > > > >>>           \   |   /
> > > > >>>        +-------------+
> > > > >>> +-----+ Consumer(s) +-------+
> > > > >>> |     +-------------+       |
> > > > >>> |                           |
> > > > >>> |    KStreams Application   |
> > > > >>> |                           |
> > > > >>> |     +-------------+       |
> > > > >>> +-----+ Producer(s) +-------+
> > > > >>>        +-------------+
> > > > >>>             /    \
> > > > >>>          output output
> > > > >>>
> > > > >>> The inputs and outputs are Kafka topics (and therefore have 1 or
> > more
> > > > >>> partitions). We'd have at least 1 input and 0 or more outputs.
> The
> > > > >>> Consumers and Producers are both the official KafkaConsumer and
> > > > >>> KafkaProducer.
> > > > >>>
> > > > >>> In general, we'll assume that the input topics are provided by
> > actors
> > > > >>> over
> > > > >>> which we have no control, although we may as well assume they are
> > > > >>> friendly
> > > > >>> and amenable to requests, and also that their promises are
> > > trustworthy.
> > > > >>> This is important because we must depend on them to uphold some
> > > > promises:
> > > > >>> * That they tell us the schema of the data they publish, and
> abide
> > by
> > > > >>> that
> > > > >>> schema. Without this, the inputs are essentially garbage.
> > > > >>> * That they tell us some defining characteristics of the topics
> > (more
> > > > on
> > > > >>> this in a sec.) and again strictly abide by that promise.
> > > > >>>
> > > > >>> What are the topic characteristics we care about?
> > > > >>> 1. The name (or name pattern)
> > > > >>> 2. How the messages are keyed (if at all)
> > > > >>> 3. Whether the message timestamps are meaningful, and if so, what
> > > their
> > > > >>> meaning is
> > > > >>> 4. Assuming the records have identity, whether the partitions
> > > partition
> > > > >>> the
> > > > >>> records' identity space
> > > > >>> 5. Whether the topic completely contains the data set
> > > > >>> 6. Whether the messages in the topic are ordered
> > > > >>>
> > > > >>> #1 is obvious: without this information, we cannot access the
> data
> > at
> > > > >>> all.
> > > > >>>
> > > > >>> For #2, #3, #4, and #6, we may or may not need this information,
> > > > >>> depending
> > > > >>> on the logic of the application. For example, a trivial
> application
> > > > that
> > > > >>> simply counts all events it sees doesn't care about #2, #3, #4,
> or
> > > #6.
> > > > >>> But
> > > > >>> an application that groups by some attribute can take advantage
> of
> > #2
> > > > and
> > > > >>> #4 if the topic data is already keyed and partitioned over that
> > > > >>> attribute.
> > > > >>> Likewise, if the application includes some temporal semantics on
> a
> > > > >>> temporal
> > > > >>> dimension that is already captured in #3, it may take advantage
> of
> > > that
> > > > >>> fact.
> > > > >>>
> > > > >>> Note that #2, #3, #4, and #6 are all optional. If they are not
> > > > promised,
> > > > >>> we
> > > > >>> can do extra work inside the application to accomplish what we
> > need.
> > > > >>> However, if they are promised (and if we depend on that promise),
> > it
> > > is
> > > > >>> essential that the topic providers uphold those promises, as we
> may
> > > not
> > > > >>> be
> > > > >>> in a position to verify them.
> > > > >>>
> > > > >>> Note also that if they make a promise, but it doesn't happen to
> > line
> > > up
> > > > >>> with our needs (data is keyed by attr1, but we need it by attr2,
> or
> > > > >>> timestamp is produce-time, but we need it by event-time, etc.),
> > then
> > > we
> > > > >>> will have to go ahead and do that extra work internally anyway.
> > This
> > > > also
> > > > >>> captures the situation in which two inputs are produced by
> > different
> > > > >>> providers, one of which meets our needs, and the other does not.
> > The
> > > > fact
> > > > >>> that we can cope with this situation is the basis for my
> statement
> > > that
> > > > >>> we
> > > > >>> do not require coordination among producers.
> > > > >>>
> > > > >>> (Key Point A): In terms of optimization, #4 and #6 are the most
> > > > valuable.
> > > > >>> If these characteristics happen to line up with our needs, then
> > > > KStreams
> > > > >>> can be incredibly efficient in both time and computational
> > resources.
> > > > >>>
> > > > >>>   #5 is similar to knowing the schema in that it tells us whether
> > the
> > > > >>> computation we want to do is possible or not. For example,
> suppose
> > we
> > > > >>> have
> > > > >>> a topic of "users", and we want to construct a table for
> querying.
> > If
> > > > the
> > > > >>> user topic doesn't completely contain the dataset, we cannot
> > > construct
> > > > >>> the
> > > > >>> table. Note that it doesn't matter whether the topic is compacted
> > or
> > > > not.
> > > > >>> If the topic is complete, I can consume it starting at "earliest"
> > and
> > > > >>> build
> > > > >>> my table. If it is not complete, I can do other computations on
> it.
> > > In
> > > > >>> both
> > > > >>> cases, it may or may not be compacted; it just doesn't matter.
> > > > >>>
> > > > >>> On the output side, the roles are reversed. We provide (or not)
> > > exactly
> > > > >>> the
> > > > >>> same set of guarantees to consumers of our outputs, and we
> likewise
> > > > must
> > > > >>> abide by the promises we make.
> > > > >>>
> > > > >>>
> > > > >>> =================================
> > > > >>> Partition Expansion
> > > > >>>
> > > > >>> With this formation in place, let's talk about partition
> expansion.
> > > > >>>
> > > > >>> Why do we have partitions in the first place? (let me know if I
> > miss
> > > > >>> something here)
> > > > >>> * For logical data streams that are themselves partitionable, it
> > > allows
> > > > >>> producers to operate concurrently without coordination. For
> > example,
> > > > >>> streaming data from a sensor in a particle accelerator, the
> sensor
> > > can
> > > > be
> > > > >>> subdivided into a grid and each grid square can produce
> > independently
> > > > to
> > > > >>> a
> > > > >>> different topic. This may be valuable because the total rate of
> > data
> > > > >>> exceeds the throughput to a single broker node or just because it
> > > > allows
> > > > >>> for failure of a single producer to cause the loss of only part
> of
> > > the
> > > > >>> data.
> > > > >>> * The brokers can offer linearly scaling throughput on the number
> > of
> > > > >>> partitions by hosting each partition on a separate broker node
> > > > >>> * The brokers can host topics that are too large to fit on a
> single
> > > > >>> broker's storage by hosting some partitions on separate broker
> > nodes
> > > > >>> * In cases where the use case permits handling partitions
> > > > independently,
> > > > >>> consumers can have algorithmic simplicity by processing the data
> > for
> > > > >>> separate partitions in separate threads, avoiding costly and
> > > > error-prone
> > > > >>> concurrent coordination code
> > > > >>> * In cases where the use case permits handling partitions
> > > > independently,
> > > > >>> consumers can exceed the total throughput of a single
> > broker-consumer
> > > > >>> pair
> > > > >>> * Just to throw this in as well, in cases where some network
> links
> > > are
> > > > >>> less
> > > > >>> costly than others (or lower latency or more reliable), such as
> > when
> > > > >>> brokers, producers, and consumers are running in racks: producer
> > and
> > > > >>> consumers can both benefit (independently) by locating work on
> each
> > > > >>> partition in the same rack as the broker hosting that partition.
> > > > >>>
> > > > >>> In other words, we have three actors in this system: producers,
> > > > brokers,
> > > > >>> and consumers, and they all benefit from partitioning for
> different
> > > > (but
> > > > >>> sometimes related) reasons.
> > > > >>>
> > > > >>> This leads naturally to the conclusion that any of these actors
> may
> > > > find
> > > > >>> themselves in a sub-optimal or even dangerous situation in which
> > > > >>> partition
> > > > >>> expansion would be the solution. For example, the producer may
> find
> > > > that
> > > > >>> the existing throughput to the brokers is insufficient to match
> the
> > > > data
> > > > >>> rate, forcing them to drop data. Or a broker hosting a single
> > > partition
> > > > >>> may
> > > > >>> be running out of disk space. Or a consumer node handling a
> single
> > > > >>> partition cannot match the rate of production for that partition,
> > > > causing
> > > > >>> it to fall behind.
> > > > >>>
> > > > >>> I think it's reasonable to assume that all the actors in the
> system
> > > > can't
> > > > >>> just arbitrarily expand a topic's partition. I think it's
> > reasonable
> > > to
> > > > >>> align this responsibility with the provider of the data, namely
> the
> > > > >>> producer (the logical producer, not the KafkaProducer class).
> > > > Therefore,
> > > > >>> the producer who finds themselves in trouble can unilaterally
> > expand
> > > > >>> partitions to solve their problem.
> > > > >>>
> > > > >>> For the broker or a consumer in trouble, they have only one
> resort:
> > > to
> > > > >>> request the producer to expand partitions. This is where it's
> > helpful
> > > > to
> > > > >>> assume the producer is friendly.
> > > > >>>
> > > > >>>
> > > > >>> Now, let's look at how a KStreams application fits into this
> > > scenario.
> > > > >>>
> > > > >>> (Key Point B1): As a consumer, we may find that the producer
> > expands
> > > > the
> > > > >>> partitions of a topic, either for their own benefit or for the
> > > brokers.
> > > > >>> In
> > > > >>> this situation, the expand operation MUST NOT violate any
> promises
> > > that
> > > > >>> have previously been made to us. This is the essence of KIP-253,
> to
> > > > >>> ensure
> > > > >>> the maintenance of promises #6 and #4. It would be great if the
> > > > mechanics
> > > > >>> of the expansion required no major disruption to processing or
> > human
> > > > >>> intervention.
> > > > >>>
> > > > >>> Specifically, let's say that input partition X splits into X1 and
> > X2.
> > > > #6
> > > > >>> requires that the same old ordering guarantees of Kafka continue
> to
> > > > hold.
> > > > >>> Obviously, this is what KIP-253's title is about. #4 requires
> that
> > we
> > > > >>> either ensure that X1 and X2 are assigned to the same thread that
> > was
> > > > >>> previously assigned X OR that we immediately pause processing and
> > > split
> > > > >>> any
> > > > >>> state such that it appears X1 and X2 were *always* separate
> > > partitions.
> > > > >>>
> > > > >>> In other words, Option 1 is we treat X1 and X2 as still logically
> > one
> > > > >>> partition, equal to X. This is ideal, since in this scenario,
> > > > partitions
> > > > >>> are expanding for external reasons. We don't need to expand our
> > > > >>> processing
> > > > >>> to match. Option 2 requires a major disruption, since we'd have
> to
> > > > pause
> > > > >>> processing while we split our state. Clearly, KStreams or any
> other
> > > > >>> stateful consumer would advocate for Option 1.
> > > > >>>
> > > > >>>
> > > > >>> (Corollary to Key Point A): Still on the consumer side, we may
> find
> > > > that
> > > > >>> we
> > > > >>> ourselves can benefit from partition expansion of an input. Since
> > we
> > > > can
> > > > >>> cope with the absence of promise #4, partition expansion is not a
> > > hard
> > > > >>> requirement for us, but assuming we were already benefiting from
> > the
> > > > >>> major
> > > > >>> performance optimizations afforded by #4, it would be nice to be
> > able
> > > > to
> > > > >>> request the producer satisfy our request for partition expansion
> > > **and
> > > > to
> > > > >>> be able to benefit from it**.
> > > > >>>
> > > > >>> What does it mean to be able to benefit from partition expansion?
> > > > >>> Assuming
> > > > >>> input topic partition X splits into X1 and X2, in this scenario,
> we
> > > > >>> *would*
> > > > >>> wish to be able to split our state such that it appears X1 and X2
> > > were
> > > > >>> *always* separate partitions. Of course, the conclusion of Key
> > Point
> > > B1
> > > > >>> still applies: we should be able to continue operating on (X1+X2
> =
> > X)
> > > > as
> > > > >>> one partition while asynchronously building the state of X1 and
> X2
> > > > >>> separately.
> > > > >>>
> > > > >>> When it comes to the mechanics of building the state of X1 and X2
> > > > >>> separately, we have really just two high-level options. Either
> this
> > > > >>> problem
> > > > >>> is solved by Kafka itself, giving me a view in which X1 and X2
> were
> > > > >>> always
> > > > >>> separate partitions, or I have to do it myself. The latter means
> > > that I
> > > > >>> have to take on substantially more complexity than I do today:
> > > > >>> Bummer 1: My state has to be splittable to begin with, implying
> at
> > > the
> > > > >>> least that I need to be able to scan every record in my state, a
> > > > >>> requirement that otherwise does not exist.
> > > > >>> Bummer 2: After splitting the state  of X1 and X2, I need to be
> > able
> > > to
> > > > >>> send at least one of those tasks, state included, to another
> > > > application
> > > > >>> node (in order to realize the benefit of the split). This is
> also a
> > > > >>> requirement that does not currently exist.
> > > > >>> Bummer 3: In order to actually perform the split, I must know and
> > be
> > > > able
> > > > >>> to execute the exact same partition function the producer of my
> > topic
> > > > >>> uses.
> > > > >>> This introduces a brand-new a priori commitment from my input
> > > > producers:
> > > > >>> (would-be #7: convey the partition function and abide by it).
> This
> > > is a
> > > > >>> big
> > > > >>> restriction over #4, which only requires them to guarantee *that
> > > there
> > > > >>> is a
> > > > >>> partition function*. Now they actually have to share the function
> > > with
> > > > >>> me.
> > > > >>> And I have to be able to implement and execute it myself. And if
> > the
> > > > >>> producer wishes to refine the partition function for an existing
> > > topic,
> > > > >>> we
> > > > >>> have another round of coordination, as they have to be sure that
> I,
> > > and
> > > > >>> all
> > > > >>> other consumers, begin using the new function *before* they do.
> > This
> > > is
> > > > >>> similar to the schema problem, with a similar set of solutions.
> We
> > > > would
> > > > >>> likely need a partition function registry and another magic byte
> on
> > > > every
> > > > >>> record to be sure we do this right. Not to mention some way to
> > > express
> > > > >>> arbitrary partitioning logic over arbitrary data in a way that is
> > > > >>> portable
> > > > >>> across programming languages.
> > > > >>>
> > > > >>> Alternatively, if Kafka gives me a view in which X1 and X2 were
> > > always
> > > > >>> separate, then I can create tasks for X1 and X2 and allow them to
> > > > >>> bootstrap
> > > > >>> while I continue to process X. Once they are ready, I can
> > coordinate
> > > a
> > > > >>> transition to stop X's task and switch to X1 and X2. None of
> those
> > > > >>> bummers
> > > > >>> are present, so this is a significantly better option for me.
> > > > >>>
> > > > >>> (Key Point B2): As a (friendly) producer, we may once again want
> on
> > > our
> > > > >>> own
> > > > >>> to expand partitions, or we may want to satisfy a request from
> the
> > > > broker
> > > > >>> or our consumers to do so. Again, we MUST NOT violate any
> promises
> > we
> > > > >>> have
> > > > >>> previously given, and it would be great if the expansion required
> > no
> > > > >>> major
> > > > >>> disruption to processing or human intervention. Additionally,
> since
> > > we
> > > > >>> are
> > > > >>> actually driving the expansion, it would also be great if we
> could
> > > > avoid
> > > > >>> Bummer 3's coordination problems from the producer's perspective.
> > > > >>>
> > > > >>>
> > > > >>> ======================================
> > > > >>> Briefly: KStreams internals
> > > > >>>
> > > > >>> I'm pretty sure you were asking me to comment on the
> implementation
> > > > >>> details
> > > > >>> of KStreams, so I'll say a few words about it. The most important
> > > thing
> > > > >>> is
> > > > >>> that KStreams is still very early in its development. Maybe
> > > > "early-middle
> > > > >>> maturity" is a good way to put it. We are actively discussing
> > > > >>> more-or-less
> > > > >>> major implementation changes to improve performance, footprint,
> > > > >>> scalability, and ergonomics. So it may actually be misleading to
> > > > discuss
> > > > >>> deeply how KStreams internally uses Kafka topics.
> > > > >>>
> > > > >>> Nevertheless, it is currently true that KStreams uses Kafka
> topics
> > > for
> > > > >>> communication between some internal computation nodes. We
> partition
> > > > these
> > > > >>> topics as the base unit of concurrency granularity, so it would
> > > > >>> potentially
> > > > >>> be beneficial to be able to expand partitions for some of these
> > > > internal
> > > > >>> topics at some point. However, we can alternatively just
> > > overpartition
> > > > >>> these internal topics, creating in the low 100s of partitions
> > instead
> > > > of
> > > > >>> the low 10s, for example. (Side note: if Kafka were someday to
> > > support
> > > > >>> higher numbers of partitions, we could expand this scheme to
> > > > >>> overpartition
> > > > >>> in the 1000s of partitions.) With the option to overpartition, we
> > > don't
> > > > >>> have a strong need for partition expansion internally.
> > > > >>>
> > > > >>> It is also currently true that KStreams uses Kafka to store a
> > durable
> > > > >>> changelog for some of our internal state stores. But we *only*
> read
> > > > from
> > > > >>> this topic *if* we need to restore a state store after node loss
> > (or
> > > to
> > > > >>> maintain a hot mirror of the state store), so I think it's
> unlikely
> > > > that
> > > > >>> we
> > > > >>> would ever make use of partition expansion on the changelog
> topics.
> > > > >>>
> > > > >>> But once again, I'd like to emphasize that we may choose an
> > > alternative
> > > > >>> implementation for either interprocess communication or state
> > > > durability.
> > > > >>>
> > > > >>>
> > > > >>> ======================================
> > > > >>> Concluding thoughts
> > > > >>>
> > > > >>> I know this is a very long email, and I really appreciate you
> > > sticking
> > > > >>> with
> > > > >>> me this long. I hope it was useful for syncing our mental picture
> > of
> > > > this
> > > > >>> system. Also, you're far more knowledgeable than I am about this
> > > system
> > > > >>> and
> > > > >>> this domain, so please correct me if I've said anything wrong.
> > > > >>>
> > > > >>> To me the key takeaways are that:
> > > > >>> - KIP-253 satisfies all we need for correctness, since it
> contains
> > > > >>> solutions to guarantee producers can abide by their promises
> w.r.t.
> > > #4
> > > > >>> and
> > > > >>> #6.
> > > > >>> - From Key Point A: #4 is actually optional for KIP-253, but
> > without
> > > > it,
> > > > >>> we
> > > > >>> lose a potentially valuable optimization in KStreams (and all
> other
> > > > >>> consumer applications)
> > > > >>> - From Corollary to Point A: Without low-level support for
> > partition
> > > > >>> expansion with backfill, we cannot employ requesting partition
> > > > expansion
> > > > >>> as
> > > > >>> a consumer to improve application performance. In that case, to
> > > ensure
> > > > >>> performance scalability, we would have to discard for all
> KStreams
> > > > >>> applications the performance optimization afforded by #4.
> > > > >>> - From Key Point B1: After a partition split, we really need to
> be
> > > able
> > > > >>> to
> > > > >>> seamlessly continue operating as if it had not split.
> > > > >>> - From Key Point B2: Since KIP-253 allows us to maintain all our
> > > > >>> promises,
> > > > >>> we have the option of expanding partitions in the topics we
> > produce.
> > > > >>> Without a backfill operation, though, our consumers may not be
> able
> > > to
> > > > >>> realize the benefits of that split, if they were hoping to.
> > > > >>>
> > > > >>> In general, faced with the possibility of having to coordinate
> the
> > > > >>> partition function with our inputs' producers or with our
> outputs'
> > > > >>> consumers, I would personally lean toward overprovisioning and
> > > > completely
> > > > >>> avoiding resize for our use case. This doesn't mean that it's not
> > > > useful
> > > > >>> in
> > > > >>> the ecosystem at large without backfill, just that it loses its
> > > luster
> > > > >>> for
> > > > >>> me. It also means that we can no longer take advantage of some of
> > our
> > > > >>> current optimizations, and in fact that we must introduce an
> extra
> > > hop
> > > > of
> > > > >>> repartitioning on every single input.
> > > > >>>
> > > > >>> I think this is actually a pretty good picture of the
> opportunities
> > > and
> > > > >>> challenges that other consumers and producers in the Kafka
> > ecosystem
> > > > will
> > > > >>> face.
> > > > >>>
> > > > >>> I hope this helps!
> > > > >>>
> > > > >>> Thanks,
> > > > >>> -John
> > > > >>>
> > > > >>> On Wed, Mar 28, 2018 at 11:51 AM, Jun Rao <j...@confluent.io>
> > wrote:
> > > > >>>
> > > > >>> Hi, John,
> > > > >>>>
> > > > >>>> I actually think it's important to think through how KStreams
> > > handles
> > > > >>>> partition expansion in this KIP. If we do decide that we truly
> > need
> > > > >>>> backfilling, it's much better to think through how to add it
> now,
> > > > >>>> instead
> > > > >>>> of retrofitting it later. It would be useful to outline how both
> > > > >>>> existing
> > > > >>>> KStreams jobs and new KStreams jobs work to see if backfilling
> is
> > > > really
> > > > >>>> needed.
> > > > >>>>
> > > > >>>> If we can figure out how KStreams works, at least we have one
> > > > reference
> > > > >>>> implementation for other stream processing frameworks that face
> > the
> > > > same
> > > > >>>> issue.
> > > > >>>>
> > > > >>>> Thanks,
> > > > >>>>
> > > > >>>> Jun
> > > > >>>>
> > > > >>>>
> > > > >>>> On Tue, Mar 27, 2018 at 4:56 PM, John Roesler <
> j...@confluent.io>
> > > > >>>> wrote:
> > > > >>>>
> > > > >>>> Hi Jun,
> > > > >>>>>
> > > > >>>>> That's a good point.
> > > > >>>>>
> > > > >>>>> Yeah, I don't think it would work too well for existing
> consumers
> > > in
> > > > >>>>>
> > > > >>>> the
> > > > >>>
> > > > >>>> middle of gen 0 to try and switch to a newly backfilled prefix
> of
> > > gen
> > > > >>>>>
> > > > >>>> 1.
> > > > >>>
> > > > >>>> They probably just need to finish up until they get to the end
> of
> > > gen
> > > > 0
> > > > >>>>>
> > > > >>>> and
> > > > >>>>
> > > > >>>>> transition just as if there were no backfill available yet.
> > > > >>>>>
> > > > >>>>> This isn't terrible, since consumer applications that care
> about
> > > > >>>>>
> > > > >>>> scaling
> > > > >>>
> > > > >>>> up
> > > > >>>>
> > > > >>>>> to match a freshly split partition would wait until after the
> > > > backfill
> > > > >>>>>
> > > > >>>> is
> > > > >>>
> > > > >>>> available to scale up. The consumer that starts out in gen=0,
> > part=0
> > > > is
> > > > >>>>> going to be stuck with part=0 and part=3 in gen=1 in my example
> > > > >>>>>
> > > > >>>> regardless
> > > > >>>>
> > > > >>>>> of whether they finish scanning gen=0 before or after the
> > backfill
> > > is
> > > > >>>>> available.
> > > > >>>>>
> > > > >>>>> The broker knowing when it's ok to delete gen 0, including its
> > > offset
> > > > >>>>> mappings, is a big issue, though. I don't have any immediate
> > ideas
> > > > for
> > > > >>>>> solving it, but it doesn't feel impossible. Hopefully, you
> agree
> > > this
> > > > >>>>>
> > > > >>>> is
> > > > >>>
> > > > >>>> outside of KIP-253's scope, so maybe we don't need to worry
> about
> > it
> > > > >>>>>
> > > > >>>> right
> > > > >>>>
> > > > >>>>> now.
> > > > >>>>>
> > > > >>>>> I do agree that reshuffling in KStreams effectively solves the
> > > > >>>>>
> > > > >>>> scalability
> > > > >>>>
> > > > >>>>> problem as well, as it decouples the partition count (and the
> > > > partition
> > > > >>>>> scheme) upstream from the parallelism of the streams
> application.
> > > > >>>>>
> > > > >>>> Likely,
> > > > >>>
> > > > >>>> we will do this in any case. I'm predominantly advocating for
> > > > follow-on
> > > > >>>>> work to enable backfill for the *other* Kafka users that are
> not
> > > > >>>>>
> > > > >>>> KStreams.
> > > > >>>>
> > > > >>>>> Thanks for your consideration,
> > > > >>>>> -John
> > > > >>>>>
> > > > >>>>> On Tue, Mar 27, 2018 at 6:19 PM, Jun Rao <j...@confluent.io>
> > wrote:
> > > > >>>>>
> > > > >>>>> Hi, John,
> > > > >>>>>>
> > > > >>>>>> Thanks for the reply. I agree that the backfill approach works
> > > > >>>>>>
> > > > >>>>> cleaner
> > > > >>>
> > > > >>>> for
> > > > >>>>>
> > > > >>>>>> newly started consumers. I am just not sure if it's a good
> > > primitive
> > > > >>>>>>
> > > > >>>>> to
> > > > >>>
> > > > >>>> support for existing consumers. One of the challenges that I see
> > is
> > > > >>>>>>
> > > > >>>>> the
> > > > >>>
> > > > >>>> remapping of the offsets. In your approach, we need to copy the
> > > > >>>>>>
> > > > >>>>> existing
> > > > >>>>
> > > > >>>>> records from the partitions in generation 0 to generation 1.
> > Those
> > > > >>>>>>
> > > > >>>>> records
> > > > >>>>>
> > > > >>>>>> will get different offsets in the new generation. The broker
> > will
> > > > >>>>>>
> > > > >>>>> have
> > > > >>>
> > > > >>>> to
> > > > >>>>
> > > > >>>>> store those offset mappings somewhere. When the backfill
> > completes,
> > > > >>>>>>
> > > > >>>>> you
> > > > >>>
> > > > >>>> can
> > > > >>>>>
> > > > >>>>>> delete generation 0's data. However, the broker can't throw
> away
> > > the
> > > > >>>>>>
> > > > >>>>> offset
> > > > >>>>>
> > > > >>>>>> mappings immediately since it doesn't know if there is any
> > > existing
> > > > >>>>>> consumer still consuming generation 0's records. In a
> compacted
> > > > >>>>>>
> > > > >>>>> topic,
> > > > >>>
> > > > >>>> the
> > > > >>>>>
> > > > >>>>>> broker probably can only safely remove the offset mappings
> when
> > > all
> > > > >>>>>>
> > > > >>>>> records
> > > > >>>>>
> > > > >>>>>> in generation 0 are removed by the cleaner. This may never
> > happen
> > > > >>>>>>
> > > > >>>>> though.
> > > > >>>>
> > > > >>>>> If we reshuffle the input inside a KStreams job, it obviates
> the
> > > need
> > > > >>>>>>
> > > > >>>>> for
> > > > >>>>
> > > > >>>>> offset remapping on the broker.
> > > > >>>>>>
> > > > >>>>>> Jun
> > > > >>>>>>
> > > > >>>>>> On Tue, Mar 27, 2018 at 11:34 AM, John Roesler <
> > j...@confluent.io
> > > >
> > > > >>>>>>
> > > > >>>>> wrote:
> > > > >>>>>
> > > > >>>>>> Hey Dong and Jun,
> > > > >>>>>>>
> > > > >>>>>>> Thanks for the thoughtful responses. If you don't mind, I'll
> > mix
> > > my
> > > > >>>>>>>
> > > > >>>>>> replies
> > > > >>>>>>
> > > > >>>>>>> together to try for a coherent response. I'm not too familiar
> > > with
> > > > >>>>>>> mailing-list etiquette, though.
> > > > >>>>>>>
> > > > >>>>>>> I'm going to keep numbering my points because it makes it
> easy
> > > for
> > > > >>>>>>>
> > > > >>>>>> you
> > > > >>>>
> > > > >>>>> all
> > > > >>>>>>
> > > > >>>>>>> to respond.
> > > > >>>>>>>
> > > > >>>>>>> Point 1:
> > > > >>>>>>> As I read it, KIP-253 is *just* about properly fencing the
> > > > >>>>>>>
> > > > >>>>>> producers
> > > > >>>
> > > > >>>> and
> > > > >>>>>
> > > > >>>>>> consumers so that you preserve the correct ordering of records
> > > > >>>>>>>
> > > > >>>>>> during
> > > > >>>
> > > > >>>> partition expansion. This is clearly necessary regardless of
> > > > >>>>>>>
> > > > >>>>>> anything
> > > > >>>
> > > > >>>> else
> > > > >>>>>>
> > > > >>>>>>> we discuss. I think this whole discussion about backfill,
> > > > >>>>>>>
> > > > >>>>>> consumers,
> > > > >>>
> > > > >>>> streams, etc., is beyond the scope of KIP-253. But it would be
> > > > >>>>>>>
> > > > >>>>>> cumbersome
> > > > >>>>>
> > > > >>>>>> to start a new thread at this point.
> > > > >>>>>>>
> > > > >>>>>>> I had missed KIP-253's Proposed Change #9 among all the
> > > details...
> > > > >>>>>>>
> > > > >>>>>> I
> > > > >>>
> > > > >>>> think
> > > > >>>>>>
> > > > >>>>>>> this is a nice addition to the proposal. One thought is that
> > it's
> > > > >>>>>>>
> > > > >>>>>> actually
> > > > >>>>>>
> > > > >>>>>>> irrelevant whether the hash function is linear. This is
> simply
> > an
> > > > >>>>>>>
> > > > >>>>>> algorithm
> > > > >>>>>>
> > > > >>>>>>> for moving a key from one partition to another, so the type
> of
> > > hash
> > > > >>>>>>> function need not be a precondition. In fact, it also doesn't
> > > > >>>>>>>
> > > > >>>>>> matter
> > > > >>>
> > > > >>>> whether the topic is compacted or not, the algorithm works
> > > > >>>>>>>
> > > > >>>>>> regardless.
> > > > >>>>
> > > > >>>>> I think this is a good algorithm to keep in mind, as it might
> > > > >>>>>>>
> > > > >>>>>> solve a
> > > > >>>
> > > > >>>> variety of problems, but it does have a downside: that the
> > producer
> > > > >>>>>>>
> > > > >>>>>> won't
> > > > >>>>>
> > > > >>>>>> know whether or not K1 was actually in P1, it just knows that
> K1
> > > > >>>>>>>
> > > > >>>>>> was
> > > > >>>
> > > > >>>> in
> > > > >>>>
> > > > >>>>> P1's keyspace before the new epoch. Therefore, it will have to
> > > > >>>>>>> pessimistically send (K1,null) to P1 just in case. But the
> next
> > > > >>>>>>>
> > > > >>>>>> time
> > > > >>>
> > > > >>>> K1
> > > > >>>>
> > > > >>>>> comes along, the producer *also* won't remember that it already
> > > > >>>>>>>
> > > > >>>>>> retracted
> > > > >>>>>
> > > > >>>>>> K1 from P1, so it will have to send (K1,null) *again*. By
> > > > >>>>>>>
> > > > >>>>>> extension,
> > > > >>>
> > > > >>>> every
> > > > >>>>>>
> > > > >>>>>>> time the producer sends to P2, it will also have to send a
> > > > >>>>>>>
> > > > >>>>>> tombstone
> > > > >>>
> > > > >>>> to
> > > > >>>>
> > > > >>>>> P1,
> > > > >>>>>>
> > > > >>>>>>> which is a pretty big burden. To make the situation worse, if
> > > there
> > > > >>>>>>>
> > > > >>>>>> is
> > > > >>>>
> > > > >>>>> a
> > > > >>>>>
> > > > >>>>>> second split, say P2 becomes P2 and P3, then any key Kx
> > belonging
> > > > >>>>>>>
> > > > >>>>>> to
> > > > >>>
> > > > >>>> P3
> > > > >>>>
> > > > >>>>> will also have to be retracted from P2 *and* P1, since the
> > producer
> > > > >>>>>>>
> > > > >>>>>> can't
> > > > >>>>>
> > > > >>>>>> know whether Kx had been last written to P2 or P1. Over a long
> > > > >>>>>>>
> > > > >>>>>> period
> > > > >>>
> > > > >>>> of
> > > > >>>>>
> > > > >>>>>> time, this clearly becomes a issue, as the producer must send
> an
> > > > >>>>>>>
> > > > >>>>>> arbitrary
> > > > >>>>>>
> > > > >>>>>>> number of retractions along with every update.
> > > > >>>>>>>
> > > > >>>>>>> In contrast, the proposed backfill operation has an end, and
> > > after
> > > > >>>>>>>
> > > > >>>>>> it
> > > > >>>
> > > > >>>> ends,
> > > > >>>>>>
> > > > >>>>>>> everyone can afford to forget that there ever was a different
> > > > >>>>>>>
> > > > >>>>>> partition
> > > > >>>>
> > > > >>>>> layout.
> > > > >>>>>>>
> > > > >>>>>>> Really, though, figuring out how to split compacted topics is
> > > > >>>>>>>
> > > > >>>>>> beyond
> > > > >>>
> > > > >>>> the
> > > > >>>>>
> > > > >>>>>> scope of KIP-253, so I'm not sure #9 really even needs to be
> in
> > > > >>>>>>>
> > > > >>>>>> this
> > > > >>>
> > > > >>>> KIP...
> > > > >>>>>>
> > > > >>>>>>> We do need in-order delivery during partition expansion. It
> > would
> > > > >>>>>>>
> > > > >>>>>> be
> > > > >>>
> > > > >>>> fine
> > > > >>>>>
> > > > >>>>>> by me to say that you *cannot* expand partitions of a
> > > log-compacted
> > > > >>>>>>>
> > > > >>>>>> topic
> > > > >>>>>
> > > > >>>>>> and call it a day. I think it would be better to tackle that
> in
> > > > >>>>>>>
> > > > >>>>>> another
> > > > >>>>
> > > > >>>>> KIP.
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>> Point 2:
> > > > >>>>>>> Regarding whether the consumer re-shuffles its inputs, this
> is
> > > > >>>>>>>
> > > > >>>>>> always
> > > > >>>
> > > > >>>> on
> > > > >>>>>
> > > > >>>>>> the table; any consumer who wants to re-shuffle its input is
> > free
> > > > >>>>>>>
> > > > >>>>>> to
> > > > >>>
> > > > >>>> do
> > > > >>>>
> > > > >>>>> so.
> > > > >>>>>>
> > > > >>>>>>> But this is currently not required. It's just that the
> current
> > > > >>>>>>>
> > > > >>>>>> high-level
> > > > >>>>>
> > > > >>>>>> story with Kafka encourages the use of partitions as a unit of
> > > > >>>>>>>
> > > > >>>>>> concurrency.
> > > > >>>>>>
> > > > >>>>>>> As long as consumers are single-threaded, they can happily
> > > consume
> > > > >>>>>>>
> > > > >>>>>> a
> > > > >>>
> > > > >>>> single
> > > > >>>>>>
> > > > >>>>>>> partition without concurrency control of any kind. This is a
> > key
> > > > >>>>>>>
> > > > >>>>>> aspect
> > > > >>>>
> > > > >>>>> to
> > > > >>>>>>
> > > > >>>>>>> this system that lets folks design high-throughput systems on
> > top
> > > > >>>>>>>
> > > > >>>>>> of
> > > > >>>
> > > > >>>> it
> > > > >>>>
> > > > >>>>> surprisingly easily. If all consumers were instead
> > > > >>>>>>>
> > > > >>>>>> encouraged/required
> > > > >>>>
> > > > >>>>> to
> > > > >>>>>
> > > > >>>>>> implement a repartition of their own, then the consumer
> becomes
> > > > >>>>>>> significantly more complex, requiring either the consumer to
> > > first
> > > > >>>>>>>
> > > > >>>>>> produce
> > > > >>>>>>
> > > > >>>>>>> to its own intermediate repartition topic or to ensure that
> > > > >>>>>>>
> > > > >>>>>> consumer
> > > > >>>
> > > > >>>> threads have a reliable, high-bandwith channel of communication
> > > > >>>>>>>
> > > > >>>>>> with
> > > > >>>
> > > > >>>> every
> > > > >>>>>>
> > > > >>>>>>> other consumer thread.
> > > > >>>>>>>
> > > > >>>>>>> Either of those tradeoffs may be reasonable for a particular
> > user
> > > > >>>>>>>
> > > > >>>>>> of
> > > > >>>
> > > > >>>> Kafka,
> > > > >>>>>>
> > > > >>>>>>> but I don't know if we're in a position to say that they are
> > > > >>>>>>>
> > > > >>>>>> reasonable
> > > > >>>>
> > > > >>>>> for
> > > > >>>>>>
> > > > >>>>>>> *every* user of Kafka.
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>> Point 3:
> > > > >>>>>>> Regarding Jun's point about this use case, "(3) stateful and
> > > > >>>>>>>
> > > > >>>>>> maintaining
> > > > >>>>>
> > > > >>>>>> the
> > > > >>>>>>> states in a local store", I agree that they may use a
> framework
> > > > >>>>>>>
> > > > >>>>>> *like*
> > > > >>>>
> > > > >>>>> Kafka Streams, but that is not the same as using Kafka Streams.
> > > > >>>>>>>
> > > > >>>>>> This
> > > > >>>
> > > > >>>> is
> > > > >>>>
> > > > >>>>> why
> > > > >>>>>>
> > > > >>>>>>> I think it's better to solve it in Core: because it is then
> > > solved
> > > > >>>>>>>
> > > > >>>>>> for
> > > > >>>>
> > > > >>>>> KStreams and also for everything else that facilitates local
> > state
> > > > >>>>>>> maintenance. To me, Streams is a member of the category of
> > > "stream
> > > > >>>>>>> processing frameworks", which is itself a subcategory of
> > "things
> > > > >>>>>>>
> > > > >>>>>> requiring
> > > > >>>>>>
> > > > >>>>>>> local state maintenence". I'm not sure if it makes sense to
> > > assert
> > > > >>>>>>>
> > > > >>>>>> that
> > > > >>>>
> > > > >>>>> Streams is a sufficient and practical replacement for
> everything
> > in
> > > > >>>>>>>
> > > > >>>>>> "things
> > > > >>>>>>
> > > > >>>>>>> requiring local state maintenence".
> > > > >>>>>>>
> > > > >>>>>>> But, yes, I do agree that per-key ordering is an absolute
> > > > >>>>>>>
> > > > >>>>>> requirement,
> > > > >>>>
> > > > >>>>> therefore I think that KIP-253 itself is a necessary step.
> > > > >>>>>>>
> > > > >>>>>> Regarding
> > > > >>>
> > > > >>>> the
> > > > >>>>>
> > > > >>>>>> coupling of the state store partitioning to the topic
> > > partitioning,
> > > > >>>>>>>
> > > > >>>>>> yes,
> > > > >>>>>
> > > > >>>>>> this is an issue we are discussing solutions to right now. We
> > may
> > > > >>>>>>>
> > > > >>>>>> go
> > > > >>>
> > > > >>>> ahead
> > > > >>>>>>
> > > > >>>>>>> and introduce an overpartition layer on our inputs to solve
> it,
> > > but
> > > > >>>>>>>
> > > > >>>>>> then
> > > > >>>>>
> > > > >>>>>> again, if we get the ability to split partitions with
> backfill,
> > we
> > > > >>>>>>>
> > > > >>>>>> may
> > > > >>>>
> > > > >>>>> not
> > > > >>>>>>
> > > > >>>>>>> need to!
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>> Point 4:
> > > > >>>>>>> On this:
> > > > >>>>>>>
> > > > >>>>>>> Regarding thought 2: If we don't care about the stream
> > use-case,
> > > > >>>>>>>>
> > > > >>>>>>> then
> > > > >>>>
> > > > >>>>> the
> > > > >>>>>>
> > > > >>>>>>> current KIP probably has already addressed problem without
> > > > >>>>>>>>
> > > > >>>>>>> requiring
> > > > >>>>
> > > > >>>>> consumer to know the partition function. If we care about the
> > > > >>>>>>>>
> > > > >>>>>>> stream
> > > > >>>>
> > > > >>>>> use-case, we already need coordination across producers of
> > > > >>>>>>>>
> > > > >>>>>>> different
> > > > >>>>
> > > > >>>>> topics, i.e. the same partition function needs to be used by
> > > > >>>>>>>>
> > > > >>>>>>> producers
> > > > >>>>>
> > > > >>>>>> of
> > > > >>>>>>
> > > > >>>>>>> topics A and B in order to join topics A and B. Thus, it
> might
> > be
> > > > >>>>>>>> reasonable to extend coordination a bit and say we need
> > > > >>>>>>>>
> > > > >>>>>>> coordination
> > > > >>>>
> > > > >>>>> across
> > > > >>>>>>>
> > > > >>>>>>>> clients (i.e. producer and consumer), such that consumer
> knows
> > > > >>>>>>>>
> > > > >>>>>>> the
> > > > >>>
> > > > >>>> partition function used by producer. If we do so, then we can
> let
> > > > >>>>>>>>
> > > > >>>>>>> consumer
> > > > >>>>>>>
> > > > >>>>>>>> re-copy data for the change log topic using the same
> partition
> > > > >>>>>>>>
> > > > >>>>>>> function
> > > > >>>>>
> > > > >>>>>> as
> > > > >>>>>>>
> > > > >>>>>>>> producer. This approach has lower overhead as compared to
> > having
> > > > >>>>>>>>
> > > > >>>>>>> producer
> > > > >>>>>>
> > > > >>>>>>> re-copy data of the input topic.
> > > > >>>>>>>> Also, producer currently does not need to know the data
> > already
> > > > >>>>>>>>
> > > > >>>>>>> produced
> > > > >>>>>>
> > > > >>>>>>> to
> > > > >>>>>>>
> > > > >>>>>>>> the topic. If we let producer split/merge partition, it
> would
> > > > >>>>>>>>
> > > > >>>>>>> require
> > > > >>>>
> > > > >>>>> producer to consume the existing data, which intuitively is the
> > > > >>>>>>>>
> > > > >>>>>>> task
> > > > >>>>
> > > > >>>>> of
> > > > >>>>>
> > > > >>>>>> consumer.
> > > > >>>>>>>>
> > > > >>>>>>>
> > > > >>>>>>> I think we do care about use cases *like* Streams, I just
> don't
> > > > >>>>>>>
> > > > >>>>>> think
> > > > >>>
> > > > >>>> we
> > > > >>>>>
> > > > >>>>>> should rely on Streams to implement a feature of Core like
> > > > >>>>>>>
> > > > >>>>>> partition
> > > > >>>
> > > > >>>> expansion.
> > > > >>>>>>>
> > > > >>>>>>> Note, though, that we (Streams) do not require coordination
> > > across
> > > > >>>>>>> producers. If two topics are certified to be co-partitioned,
> > then
> > > > >>>>>>>
> > > > >>>>>> Streams
> > > > >>>>>
> > > > >>>>>> apps can make use of that knowledge to optimize their topology
> > > > >>>>>>>
> > > > >>>>>> (skipping
> > > > >>>>>
> > > > >>>>>> a
> > > > >>>>>>
> > > > >>>>>>> repartition). But if they don't know whether they are
> > > > >>>>>>>
> > > > >>>>>> co-partitioned,
> > > > >>>
> > > > >>>> then
> > > > >>>>>>
> > > > >>>>>>> they'd better go ahead and repartition within the topology.
> > This
> > > is
> > > > >>>>>>>
> > > > >>>>>> the
> > > > >>>>
> > > > >>>>> current state.
> > > > >>>>>>>
> > > > >>>>>>> A huge selling point of Kafka is enabling different parts of
> > > > >>>>>>>
> > > > >>>>>> loosely
> > > > >>>
> > > > >>>> coupled organizations to produce and consume data independently.
> > > > >>>>>>>
> > > > >>>>>> Some
> > > > >>>
> > > > >>>> coordination between producers and consumers is necessary, like
> > > > >>>>>>> coordinating on the names of topics and their schemas. But
> > > Kafka's
> > > > >>>>>>>
> > > > >>>>>> value
> > > > >>>>>
> > > > >>>>>> proposition w.r.t. ESBs, etc. is inversely proportional to the
> > > > >>>>>>>
> > > > >>>>>> amount
> > > > >>>
> > > > >>>> of
> > > > >>>>>
> > > > >>>>>> coordination required. I think it behooves us to be extremely
> > > > >>>>>>>
> > > > >>>>>> skeptical
> > > > >>>>
> > > > >>>>> about introducing any coordination beyond correctness
> protocols.
> > > > >>>>>>>
> > > > >>>>>>> Asking producers and consumers, or even two different
> > producers,
> > > to
> > > > >>>>>>>
> > > > >>>>>> share
> > > > >>>>>
> > > > >>>>>> code like the partition function is a pretty huge ask. What if
> > > they
> > > > >>>>>>>
> > > > >>>>>> are
> > > > >>>>
> > > > >>>>> using different languages?
> > > > >>>>>>>
> > > > >>>>>>> Comparing organizational overhead vs computational overhead,
> > > there
> > > > >>>>>>>
> > > > >>>>>> are
> > > > >>>>
> > > > >>>>> maybe two orders of magnitude difference between them. In other
> > > > >>>>>>>
> > > > >>>>>> words,
> > > > >>>>
> > > > >>>>> I
> > > > >>>>>
> > > > >>>>>> would happily take on the (linear) overhead of having the
> > producer
> > > > >>>>>>>
> > > > >>>>>> re-copy
> > > > >>>>>>
> > > > >>>>>>> the data once during a re-partition in order to save the
> > > > >>>>>>>
> > > > >>>>>> organizational
> > > > >>>>
> > > > >>>>> overhead of tying all the producers and consumers together
> across
> > > > >>>>>>>
> > > > >>>>>> multiple
> > > > >>>>>>
> > > > >>>>>>> boundaries.
> > > > >>>>>>>
> > > > >>>>>>> On that last paragraph: note that the producer *did* know the
> > > data
> > > > >>>>>>>
> > > > >>>>>> it
> > > > >>>
> > > > >>>> already produced. It handled it the first time around. Asking it
> > to
> > > > >>>>>>> re-produce it into a new partition layout is squarely within
> > its
> > > > >>>>>>>
> > > > >>>>>> scope
> > > > >>>>
> > > > >>>>> of
> > > > >>>>>
> > > > >>>>>> capabilities. Contrast this with the alternative, asking the
> > > > >>>>>>>
> > > > >>>>>> consumer
> > > > >>>
> > > > >>>> to
> > > > >>>>>
> > > > >>>>>> re-partition the data. I think this is even less intuitive,
> when
> > > > >>>>>>>
> > > > >>>>>> the
> > > > >>>
> > > > >>>> partition function belongs to the producer.
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>> Point 5:
> > > > >>>>>>> Dong asked this:
> > > > >>>>>>>
> > > > >>>>>>> For stream use-case that needs to increase consumer number,
> the
> > > > >>>>>>>> existing consumer can backfill the existing data in the
> change
> > > > >>>>>>>>
> > > > >>>>>>> log
> > > > >>>
> > > > >>>> topic
> > > > >>>>>>
> > > > >>>>>>> to
> > > > >>>>>>>
> > > > >>>>>>>> the same change log topic with the new partition number,
> > before
> > > > >>>>>>>>
> > > > >>>>>>> the
> > > > >>>
> > > > >>>> new
> > > > >>>>>
> > > > >>>>>> set
> > > > >>>>>>>
> > > > >>>>>>>> of consumers bootstrap state from the new partitions of the
> > > > >>>>>>>>
> > > > >>>>>>> change
> > > > >>>
> > > > >>>> log
> > > > >>>>>
> > > > >>>>>> topic, right?
> > > > >>>>>>>>
> > > > >>>>>>>
> > > > >>>>>>> In this sense, the "consumer" is actually the producer of the
> > > > >>>>>>>
> > > > >>>>>> changelog
> > > > >>>>
> > > > >>>>> topic, so if we support partition expansion + backfill as a
> > > > >>>>>>>
> > > > >>>>>> producer/broker
> > > > >>>>>>
> > > > >>>>>>> operation, then it would be very straightforward for Streams
> to
> > > > >>>>>>>
> > > > >>>>>> split a
> > > > >>>>
> > > > >>>>> state store. As you say, they would simply instruct the broker
> to
> > > > >>>>>>>
> > > > >>>>>> split
> > > > >>>>
> > > > >>>>> the
> > > > >>>>>>
> > > > >>>>>>> changelog topic's partitions, then backfill. Once the
> backfill
> > is
> > > > >>>>>>>
> > > > >>>>>> ready,
> > > > >>>>>
> > > > >>>>>> they can create a new crop of StandbyTasks to bootstrap the
> more
> > > > >>>>>>>
> > > > >>>>>> granular
> > > > >>>>>
> > > > >>>>>> state stores and finally switch over to them when they are
> > ready.
> > > > >>>>>>>
> > > > >>>>>>> But this actually seems to be an argument in favor of
> > > > >>>>>>>
> > > > >>>>>> split+backfill,
> > > > >>>
> > > > >>>> so
> > > > >>>>>
> > > > >>>>>> maybe I missed the point.
> > > > >>>>>>>
> > > > >>>>>>> You also asked me to explain why copying the "input" topic is
> > > > >>>>>>>
> > > > >>>>>> better
> > > > >>>
> > > > >>>> than
> > > > >>>>>
> > > > >>>>>> copying the "changelog" topic. I think they are totally
> > > > >>>>>>>
> > > > >>>>>> independent,
> > > > >>>
> > > > >>>> actually. For one thing, you can't depend on the existence of a
> > > > >>>>>>>
> > > > >>>>>> "changelog"
> > > > >>>>>>
> > > > >>>>>>> topic in general, only within Streams, but Kafka's user base
> > > > >>>>>>>
> > > > >>>>>> clearly
> > > > >>>
> > > > >>>> exceeds Streams's user base. Plus, you actually also can't
> depend
> > > > >>>>>>>
> > > > >>>>>> on
> > > > >>>
> > > > >>>> the
> > > > >>>>>
> > > > >>>>>> existence of a changelog topic within Streams, since that is
> an
> > > > >>>>>>>
> > > > >>>>>> optional
> > > > >>>>>
> > > > >>>>>> feature of *some* state store implementations. Even in the
> > > > >>>>>>>
> > > > >>>>>> situation
> > > > >>>
> > > > >>>> where
> > > > >>>>>>
> > > > >>>>>>> you do have a changelog topic in Streams, there may be use
> > cases
> > > > >>>>>>>
> > > > >>>>>> where
> > > > >>>>
> > > > >>>>> it
> > > > >>>>>
> > > > >>>>>> makes sense to expand the partitions of just the input, or
> just
> > > the
> > > > >>>>>>> changelog.
> > > > >>>>>>>
> > > > >>>>>>> The ask for a Core feature of split+backfill is really about
> > > > >>>>>>>
> > > > >>>>>> supporting
> > > > >>>>
> > > > >>>>> the
> > > > >>>>>>
> > > > >>>>>>> use case of splitting partitions in log-compacted topics,
> > > > >>>>>>>
> > > > >>>>>> regardless
> > > > >>>
> > > > >>>> of
> > > > >>>>
> > > > >>>>> whether that topic is an "input" or a "changelog" or anything
> > else
> > > > >>>>>>>
> > > > >>>>>> for
> > > > >>>>
> > > > >>>>> that
> > > > >>>>>>
> > > > >>>>>>> matter.
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>> Point 6:
> > > > >>>>>>> On the concern about the performance overhead of copying data
> > > > >>>>>>>
> > > > >>>>>> between
> > > > >>>
> > > > >>>> the
> > > > >>>>>
> > > > >>>>>> brokers, I think it's actually a bit overestimated. Splitting
> a
> > > > >>>>>>>
> > > > >>>>>> topic's
> > > > >>>>
> > > > >>>>> partition is probably rare, certainly rarer in general than
> > > > >>>>>>>
> > > > >>>>>> bootstrapping
> > > > >>>>>
> > > > >>>>>> new consumers on that topic. If "bootstrapping new consumers"
> > > means
> > > > >>>>>>>
> > > > >>>>>> that
> > > > >>>>>
> > > > >>>>>> they have to re-shuffle the data before they consume it, then
> > you
> > > > >>>>>>>
> > > > >>>>>> wind
> > > > >>>>
> > > > >>>>> up
> > > > >>>>>
> > > > >>>>>> copying the same record multiple times:
> > > > >>>>>>>
> > > > >>>>>>> (broker: input topic) -> (initial consumer) -> (broker:
> > > repartition
> > > > >>>>>>>
> > > > >>>>>> topic)
> > > > >>>>>>
> > > > >>>>>>> -> (real consumer)
> > > > >>>>>>>
> > > > >>>>>>> That's 3x, and it's also 3x for every new record after the
> > split
> > > as
> > > > >>>>>>>
> > > > >>>>>> well,
> > > > >>>>>
> > > > >>>>>> since you don't get to stop repartitioning/reshuffling once
> you
> > > > >>>>>>>
> > > > >>>>>> start.
> > > > >>>>
> > > > >>>>> Whereas if you do a backfill in something like the procedure I
> > > > >>>>>>>
> > > > >>>>>> outlined,
> > > > >>>>>
> > > > >>>>>> you only copy the prefix of the partition before the split,
> and
> > > you
> > > > >>>>>>>
> > > > >>>>>> send
> > > > >>>>>
> > > > >>>>>> it
> > > > >>>>>>
> > > > >>>>>>> once to the producer and then once to the new generation
> > > partition.
> > > > >>>>>>>
> > > > >>>>>> Plus,
> > > > >>>>>
> > > > >>>>>> assuming we're splitting the partition for the benefit of
> > > > >>>>>>>
> > > > >>>>>> consumers,
> > > > >>>
> > > > >>>> there's no reason we can't co-locate the post-split partitions
> on
> > > > >>>>>>>
> > > > >>>>>> the
> > > > >>>
> > > > >>>> same
> > > > >>>>>>
> > > > >>>>>>> host as the pre-split partition, making the second copy a
> local
> > > > >>>>>>>
> > > > >>>>>> filesystem
> > > > >>>>>>
> > > > >>>>>>> operation.
> > > > >>>>>>>
> > > > >>>>>>> Even if you follow these two copies up with bootstrapping a
> new
> > > > >>>>>>>
> > > > >>>>>> consumer,
> > > > >>>>>
> > > > >>>>>> it's still rare for this to occur, so you get to amortize
> these
> > > > >>>>>>>
> > > > >>>>>> copies
> > > > >>>>
> > > > >>>>> over
> > > > >>>>>>
> > > > >>>>>>> the lifetime of the topic, whereas a reshuffle just keeps
> > making
> > > > >>>>>>>
> > > > >>>>>> copies
> > > > >>>>
> > > > >>>>> for
> > > > >>>>>>
> > > > >>>>>>> every new event.
> > > > >>>>>>>
> > > > >>>>>>> And finally, I really do think that regardless of any
> > performance
> > > > >>>>>>>
> > > > >>>>>> concerns
> > > > >>>>>>
> > > > >>>>>>> about this operation, if it preserves loose organizational
> > > > >>>>>>>
> > > > >>>>>> coupling,
> > > > >>>
> > > > >>>> it
> > > > >>>>
> > > > >>>>> is
> > > > >>>>>>
> > > > >>>>>>> certainly worth it.
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>> In conclusion:
> > > > >>>>>>> It might actually be a good idea for us to clarify the scope
> of
> > > > >>>>>>>
> > > > >>>>>> KIP-253.
> > > > >>>>>
> > > > >>>>>> If
> > > > >>>>>>
> > > > >>>>>>> we're all agreed that it's a good algorithm for allowing
> > in-order
> > > > >>>>>>>
> > > > >>>>>> message
> > > > >>>>>
> > > > >>>>>> delivery during partition expansion, then we can continue this
> > > > >>>>>>>
> > > > >>>>>> discussion
> > > > >>>>>
> > > > >>>>>> as a new KIP, something like "backfill with partition
> > expansion".
> > > > >>>>>>>
> > > > >>>>>> This
> > > > >>>>
> > > > >>>>> would let Dong proceed with KIP-253. On the other hand, if it
> > seems
> > > > >>>>>>>
> > > > >>>>>> like
> > > > >>>>>
> > > > >>>>>> this conversation may alter the design of KIP-253, then maybe
> we
> > > > >>>>>>>
> > > > >>>>>> *should*
> > > > >>>>>
> > > > >>>>>> just finish working it out.
> > > > >>>>>>>
> > > > >>>>>>> For my part, my only concern about KIP-253 is the one I
> raised
> > > > >>>>>>>
> > > > >>>>>> earlier.
> > > > >>>>
> > > > >>>>> Thanks again, all, for considering these points,
> > > > >>>>>>> -John
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>> On Tue, Mar 27, 2018 at 2:10 AM, Dong Lin <
> lindon...@gmail.com
> > >
> > > > >>>>>>>
> > > > >>>>>> wrote:
> > > > >>>>
> > > > >>>>> On Tue, Mar 27, 2018 at 12:04 AM, Dong Lin <
> lindon...@gmail.com>
> > > > >>>>>>>>
> > > > >>>>>>> wrote:
> > > > >>>>>>
> > > > >>>>>>> Hey Jan,
> > > > >>>>>>>>>
> > > > >>>>>>>>> Thanks for the enthusiasm in improving Kafka's design. Now
> > > > >>>>>>>>>
> > > > >>>>>>>> that I
> > > > >>>
> > > > >>>> have
> > > > >>>>>>
> > > > >>>>>>> read through your discussion with Jun, here are my thoughts:
> > > > >>>>>>>>>
> > > > >>>>>>>>> - The latest proposal should with log compacted topics by
> > > > >>>>>>>>>
> > > > >>>>>>>> properly
> > > > >>>>
> > > > >>>>> deleting old messages after a new message with the same key is
> > > > >>>>>>>>>
> > > > >>>>>>>> produced.
> > > > >>>>>>>
> > > > >>>>>>>> So
> > > > >>>>>>>>
> > > > >>>>>>>>> it is probably not a concern anymore. Could you comment if
> > > > >>>>>>>>>
> > > > >>>>>>>> there
> > > > >>>
> > > > >>>> is
> > > > >>>>
> > > > >>>>> still
> > > > >>>>>>>
> > > > >>>>>>>> issue?
> > > > >>>>>>>>>
> > > > >>>>>>>>> - I wrote the SEP-5 and I am pretty familiar with the
> > > > >>>>>>>>>
> > > > >>>>>>>> motivation
> > > > >>
> > > > >>
> > > >
> > >
> >
>

Reply via email to