Hey John,

Thanks much for your comments!!

I have yet to go through the emails of John/Jun/Guozhang in detail. But let
me present my idea for how to minimize the delay for state loading for the
stream use-case.

For ease of understanding, let's assume that the initial partition number
of the input topics and the change log topic are both 10, and that the
initial number of stream processors is also 10. If we only increase the
partition number of the input topics to 15 without changing the number of
stream processors, the current KIP already guarantees in-order delivery and
no state needs to be moved between consumers for the stream use-case. Next,
let's say we want to increase the number of processors to expand the
processing capacity for the stream use-case. This requires us to move state
between processors, which will take time. Our goal is to minimize the impact
(i.e. delay) on processing while we increase the number of processors.

Note that a stream processor generally includes both a consumer and a
producer. In addition to consuming from the input topic, the consumer may
also need to consume from the change log topic on startup for recovery. And
the producer may produce state to the change log topic.

The solution will include the following steps:

1) Increase the partition number of the input topic from 10 to 15. Since
messages with the same key will still go to the same consumer before and
after the partition expansion, this step can be done without having to move
state between processors.

2) Increase the partition number of the change log topic from 10 to 15. Note
that this step can also be done without impacting the existing workflow.
After we increase the partition number of the change log topic, the key space
may split and some keys will be produced to the newly-added partitions. But
the same key will still go to the same processor (i.e. consumer) before and
after the partition expansion. Thus this step can also be done without having
to move state between processors.

3) Now, let's add 5 new consumers whose groupId is different from the
existing processor's groupId. Thus these new consumers will not impact
existing workflow. Each of these new consumers should consume two
partitions from the earliest offset, where these two partitions are the
same partitions that will be consumed if the consumers have the same
groupId as the existing processor's groupId. For example, the first of the
five consumers will consume partition 0 and partition 10. The purpose of
these consumers is to rebuild the state (e.g. RocksDB) for the processors
in advance. Also note that, by design of the current KIP, each consumer will
consume the existing partition of the change log topic up to the offset
before the partition expansion. After that, it will only need to consume the
state from the new partition of the change log topic.

4) After consumers have caught up in step 3), we should stop these
consumers and add 5 new processors to the stream processing job. These 5
new processors should run in the same location as the previous 5 consumers
to re-use the state (e.g. RocksDB). And these processors' consumers should
consume partitions of the change log topic from the offset committed by the
previous 5 consumers so that no state is missed.
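
To make step 3) concrete, here is a rough sketch of which change log
partitions each warm-up consumer would read. It only covers a single
expansion round (from the initial count up to at most twice that count), and
it assumes the textbook linear hashing rule that new partition p splits off
from partition p - initial. The function name and the dict layout are
hypothetical, not something defined by the KIP.

```python
def warmup_assignment(initial, expanded):
    """Return {future processor id: [old partition, new partition]}.

    Assumes one expansion round under linear hashing, where new partition p
    receives keys that previously lived in partition p - initial.
    """
    if not initial < expanded <= 2 * initial:
        raise ValueError("sketch only covers a single expansion round")
    return {p: [p - initial, p] for p in range(initial, expanded)}


if __name__ == "__main__":
    # Example from this email: 10 partitions expanded to 15.
    for processor, partitions in warmup_assignment(10, 15).items():
        print(processor, partitions)
```

With the numbers used in this email, the first warm-up consumer (the future
processor 10) reads partitions 0 and 10, matching the example in step 3).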

One important trick to note here is that the mapping from partition to
consumer should also use linear hashing. And we need to remember the
initial number of processors in the job, 10 in this example, and use this
number in the linear hashing algorithm. This is pretty much the same as how
we use linear hashing to map key to partition. In this case, we get an
identity map from partition -> processor, for both input topic and the
change log topic. For example, processor 12 will consume partition 12 of
the input topic and produce state to the partition 12 of the change log
topic.
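
As an illustration of the two mappings, here is a minimal sketch using the
textbook form of linear hashing. The exact formula in the KIP may differ in
details, so please treat this as an assumption. The names linear_hash and
processor_for_key are made up for this example.

```python
def linear_hash(value, initial, current):
    """Map a non-negative integer to a bucket in [0, current).

    `initial` is the bucket count the scheme started with and `current` is
    the bucket count now (illustrative names, not taken from the KIP).
    """
    if not 0 < initial <= current:
        raise ValueError("need 0 < initial <= current")
    # Find the largest initial * 2^k that does not exceed current.
    level = initial
    while level * 2 <= current:
        level *= 2
    bucket = value % (2 * level)
    if bucket >= current:
        # This bucket has not been split off yet; fall back one level.
        bucket = value % level
    return bucket


def processor_for_key(key_hash, initial, partitions, processors):
    """Key -> partition -> processor, applying linear hashing twice."""
    partition = linear_hash(key_hash, initial, partitions)
    return linear_hash(partition, initial, processors)
```

With initial = 10, going from 10 to 15 partitions while keeping 10 processors
leaves processor_for_key unchanged for every key hash, which is why steps 1)
and 2) need no state movement. Once there are 15 processors, linear_hash(p,
10, 15) returns p for every partition p below 15, i.e. the identity map.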

There are a few important properties of this solution to note:

- We can increase the number of partitions for input topic and the change
log topic in any order asynchronously.
- The expansion of the processors in a given job in step 4) only requires
the step 3) for the same job. It does not require coordination across
different jobs for step 3) and 4). Thus different jobs can independently
expand their capacity without waiting for each other.
- The logic for 1) and 2) is already supported in the current KIP. The
logic for 3) and 4) appears to be independent of the core Kafka logic and
can be implemented separately outside core Kafka. Thus the current KIP is
probably sufficient after we agree on the efficiency and the correctness of
the solution. We can have a separate KIP for Kafka Streams to support 3) and
4).


Cheers,
Dong


On Mon, Apr 2, 2018 at 3:25 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hey guys, just sharing my two cents here (I promise it will be shorter than
> John's article :).
>
> 0. Just to quickly recap, the main discussion point now is how to support
> "key partitioning preservation" (John's #4 in topic characteristics above)
> beyond the "single-key ordering preservation" that KIP-253 was originally
> proposed to maintain (John's #6 above).
>
> 1. From the streams project, we are actively working on improving the
> elastic scalability of the library. One of the key features is to decouple
> the input topics from the parallelism model of Streams: i.e. not enforcing
> the topic to be partitioned by the key, not enforcing joining topics to be
> co-partitioned, not tying the number of parallel tasks to the input topic
> partitions. This can be achieved by re-shuffling on the input topics to
> make sure key-partitioning / co-partitioning on the internal topics. Note
> the re-shuffling task is purely stateless and hence does not require "key
> partitioning preservation". Operational-wise it is similar to the "creating
> a new topic with new number of partitions, pipe the data to the new topic
> and cut over consumers from old topics" idea, just that users can
> optionally let Streams handle this rather than doing it manually
> themselves. There are a few more details in that regard but I will skip
> them since they are not directly related to this discussion.
>
> 2. Assuming that 1) above is done, then the only topics involved in the
> scaling events are all input topics. For these topics the only producers /
> consumers of these topics are controlled by Streams clients themselves, and
> hence achieving "key partitioning preservation" is simpler than non-Streams
> scenarios: consumers know the partitioning scheme that producers are using,
> so that for their stateful operations it is doable to split the local state
> stores accordingly or execute backfilling on its own. Of course, if we
> decide to do server-side backfilling, it can still help Streams to directly
> rely on that functionality.
>
> 3. As John mentioned, another way inside Streams is to do over-partitioning
> on all internal topics; then with 1) Streams would not rely on KIP-253 at
> all. But personally I'd like to avoid it if possible to reduce Kafka side
> footprint: say we overpartition each input topic up to 1k, with a
> reasonable sized stateful topology it can still contribute to tens of
> thousands of topics to the topic partition capacity of a single cluster.
>
> 4. Summing up 1/2/3, I think we should focus more on non-Streams users
> writing their stateful computations with local states, and think whether /
> how we could enable "key partitioning preservation" for them easily, than
> to think heavily for Streams library. People may have different opinion on
> how common of a usage pattern it is (I think Jun might be suggesting that
> for DIY apps people may more likely use remote states so that it is not a
> problem for them). My opinion is that for non-Streams users such usage
> pattern could still be large (think: if you are piping data from Kafka to
> an external data storage which has single-writer requirements for each
> single shard, even though it is not a stateful computational application it
> may still require "key partitioning preservation"), so I prefer to have
> backfilling in our KIP than only exposing the API for expansion and
> requires consumers to have pre-knowledge of the producer's partitioning
> scheme.
>
>
>
> Guozhang
>
>
>
> On Thu, Mar 29, 2018 at 2:33 PM, John Roesler <j...@confluent.io> wrote:
>
> > Hey Dong,
> >
> > Congrats on becoming a committer!!!
> >
> > Since I just sent a novel-length email, I'll try and keep this one brief
> ;)
> >
> > Regarding producer coordination, I'll grant that in that case, producers
> > may coordinate among themselves to produce into the same topic or to
> > produce co-partitioned topics. Nothing in KStreams or the Kafka ecosystem
> > in general requires such coordination for correctness or in fact for any
> > optional features, though, so I would not say that we require producer
> > coordination of partition logic. If producers currently coordinate, it's
> > completely optional and their own choice.
> >
> > Regarding the portability of partition algorithms, my observation is that
> > systems requiring independent implementations of the same algorithm with
> > 100% correctness are a large source of risk and also a burden on those
> who
> > have to maintain them. If people could flawlessly implement algorithms in
> > actual software, the world would be a wonderful place indeed! For a
> system
> > as important and widespread as Kafka, I would recommend
> > limiting such requirements as aggressively as possible.
> >
> > I'd agree that we can always revisit decisions like allowing arbitrary
> > partition functions, but of course, we shouldn't do that in a vacuum.
> That
> > feels like the kind of thing we'd need to proactively seek guidance from
> > the users list about. I do think that the general approach of saying that
> > "if you use a custom partitioner, you cannot do partition expansion" is
> > very reasonable (but I don't think we need to go that far with the
> current
> > proposal). It's similar to my statement in my email to Jun that in
> > principle KStreams doesn't *need* backfill, we only need it if we want to
> > employ partition expansion.
> >
> > I reckon that the main motivation for backfill is to support KStreams use
> > cases and also any other use cases involving stateful consumers.
> >
> > Thanks for your response, and congrats again!
> > -John
> >
> >
> > On Wed, Mar 28, 2018 at 1:34 AM, Dong Lin <lindon...@gmail.com> wrote:
> >
> > > Hey John,
> > >
> > > Great! Thanks for all the comments. It seems that we agree that the
> > current
> > > KIP is in good shape for core Kafka. IMO, what we have been discussing
> in
> > > the recent email exchanges is mostly about the second step, i.e. how to
> > > address problem for the stream use-case (or stateful processing in
> > > general).
> > >
> > > I will comment inline.
> > >
> > >
> > >
> > >
> > > On Tue, Mar 27, 2018 at 4:38 PM, John Roesler <j...@confluent.io>
> wrote:
> > >
> > > > Thanks for the response, Dong.
> > > >
> > > > Here are my answers to your questions:
> > > >
> > > > - "Asking producers and consumers, or even two different producers,
> to
> > > > > share code like the partition function is a pretty huge ask. What
> if
> > > they
> > > > > are using different languages?". It seems that today we already
> > require
> > > > > different producers to use the same hash function -- otherwise
> > > messages
> > > > > with the same key will go to different partitions of the same topic
> > > which
> > > > > may cause problem for downstream consumption. So not sure if it
> adds
> > > any
> > > > > more constraint by assuming consumers know the hash function of
> > > producer.
> > > > > Could you explain more why user would want to use a custom
> partition
> > > > > function? Maybe we can check if this is something that can be
> > supported
> > > > in
> > > > > the default Kafka hash function. Also, can you explain more why it
> is
> > > > > difficult to implement the same hash function in different
> > languages?
> > > >
> > > >
> > > > Sorry, I meant two different producers as in producers to two
> different
> > > > topics. This was in response to the suggestion that we already
> require
> > > > coordination among producers to different topics in order to achieve
> > > > co-partitioning. I was saying that we do not (and should not).
> > >
> > >
> > > It is probably common for producers of different teams to produce
> message
> > to
> > > the same topic. In order to ensure that messages with the same key go
> to
> > > same partition, we need producers of different teams to share the same
> > > partition algorithm, which by definition requires coordination among
> > > producers of different teams in an organization. Even for producers of
> > > different topics, it may be common to require producers to use the same
> > > partition algorithm in order to join two topics for stream processing.
> > Does
> > > this make it reasonable to say we already require coordination across
> > > producers?
> > >
> > >
> > > > By design, consumers are currently ignorant of the partitioning
> scheme.
> > > It
> > > > suffices to trust that the producer has partitioned the topic by key,
> > if
> > > > they claim to have done so. If you don't trust that, or even if you
> > just
> > > > need some other partitioning scheme, then you must re-partition it
> > > > yourself. Nothing we're discussing can or should change that. The
> value
> > > of
> > > > backfill is that it preserves the ability for consumers to avoid
> > > > re-partitioning before consuming, in the case where they don't need
> to
> > > > today.
> > >
> > >
> > > > Regarding shared "hash functions", note that it's a bit inaccurate to
> > > talk
> > > > about the "hash function" of the producer. Properly speaking, the
> > > producer
> > > > has only a "partition function". We do not know that it is a hash.
> The
> > > > producer can use any method at their disposal to assign a partition
> to
> > a
> > > > record. The partition function obviously may be written in any
> > > programming
> > > > language, so in general it's not something that can be shared around
> > > > without a formal spec or the ability to execute arbitrary executables
> > in
> > > > arbitrary runtime environments.
> > > >
> > >
> > > Yeah it is probably better to say partition algorithm. I guess it
> should
> > > not be difficult to implement same partition algorithms in different
> > > languages, right? Yes we would need a formal specification of the
> default
> > > partition algorithm in the producer. I think that can be documented as
> > part
> > > of the producer interface.
> > >
> > >
> > > >
> > > > Why would a producer want a custom partition function? I don't
> know...
> > > why
> > > > did we design the interface so that our users can provide one? In
> > > general,
> > > > such systems provide custom partitioners because some data sets may
> be
> > > > unbalanced under the default or because they can provide some
> > interesting
> > > > functionality built on top of the partitioning scheme, etc. Having
> > > provided
> > > > this ability, I don't know why we would remove it.
> > > >
> > >
> > > Yeah it is reasonable to assume that there was reason to support custom
> > > partition function in producer. On the other hand it may also be
> > reasonable
> > > to revisit this interface and discuss whether we actually need to
> support
> > > custom partition function. If we don't have a good reason, we can
> choose
> > > not to support custom partition function in this KIP in a backward
> > > compatible manner, i.e. user can still use custom partition function
> but
> > > they would not get the benefit of in-order delivery when there is
> > partition
> > > expansion. What do you think?
> > >
> > >
> > > >
> > > > - Besides the assumption that consumer needs to share the hash
> function
> > > of
> > > > > producer, is there other organization overhead of the proposal in
> the
> > > > > current KIP?
> > > > >
> > > >
> > > > It wasn't clear to me that KIP-253 currently required the producer
> and
> > > > consumer to share the partition function, or in fact that it had a
> hard
> > > > requirement to abandon the general partition function and use a
> linear
> > > hash
> > > > function instead.
> > >
> > >
> > > > In my reading, there is a requirement to track the metadata about
> what
> > > > partitions split into what other partitions during an expansion
> > > operation.
> > > > If the partition function is linear, this is easy. If not, you can
> > always
> > > > just record that all old partitions split into all new partitions.
> This
> > > has
> > > > the effect of forcing all consumers to wait until the old epoch is
> > > > completely consumed before starting on the new epoch. But this may
> be a
> > > > reasonable tradeoff, and it doesn't otherwise alter your design.
> > > >
> > > > You only mention the consumer needing to know that the partition
> > function
> > > > is linear, not what the actual function is, so I don't think your
> > design
> > > > actually calls for sharing the function. Plus, really all the
> consumer
> > > > needs is the metadata about what old-epoch partitions to wait for
> > before
> > > > consuming a new-epoch partition. This information is directly
> captured
> > in
> > > > metadata, so I don't think it actually even cares whether the
> partition
> > > > function is linear or not.
> > > >
> > >
> > > You are right that the current KIP does not mention it. My comment
> > related
> > > to the partition function coordination was related to support the
> > > stream-use case which we have been discussing so far.
> > >
> > >
> > > > So, no, I really think KIP-253 is in good shape. I was really more
> > > talking
> > > > about the part of this thread that's outside of KIP-253's scope,
> > namely,
> > > > creating the possibility of backfilling partitions after expansion.
> > > >
> > >
> > > Great! Can you also confirm that the main motivation for backfilling
> > > partitions after expansion is to support the stream use-case?
> > >
> > >
> > > > - Currently producer can forget about the message that has been
> > > > > acknowledged by the broker. Thus the producer probably does not
> know
> > > most
> > > > > of the existing messages in topic, including those messages produced
> > by
> > > > > other producers. We can have the owner of the producer to
> > > split+backfill.
> > > > > In my opinion it will be a new program that wraps around the existing
> > > > > producer and consumer classes.
> > > > >
> > > >
> > > > This sounds fine by me!
> > > >
> > > > Really, I was just emphasizing that the part of the organization that
> > > > produces a topic shouldn't have to export their partition function to
> > the
> > > > part(s) of the organization (or other organizations) that consume the
> > > > topic. Whether the backfill operation goes into the Producer
> interface
> > is
> > > > secondary, I think.
> > > >
> > > > - Regarding point 5. The argument is in favor of the split+backfill
> but
> > > for
> > > > > changelog topic. And it intends to address the problem for stream
> > > > use-case
> > > > > in general. In this KIP we will provide interface (i.e.
> > > > > PartitionKeyRebalanceListener in the KIP) to be used by stream
> > use-case
> > > > and
> > > > > the goal is that user can flush/re-consume the state as part of the
> > > > > interface implementation regardless of whether there is change log
> > > topic.
> > > > >
> > > > > Maybe you are suggesting that the main reason to do split+backfill
> of
> > > > input
> > > > > topic is to support log compacted topics? You mentioned in Point 1
> > that
> > > > log
> > > > > compacted topics is out of the scope of this KIP. Maybe I could
> > > > understand
> > > > > your position better. Regarding Jan's proposal to split partitions
> > with
> > > > > backfill, do you think this should replace the proposal in the
> > existing
> > > > > KIP, or do you think this is something that we should do in
> addition
> > to
> > > > the
> > > > > existing KIP?
> > > > >
> > > >
> > > > I think that interface is a good/necessary component of KIP-253.
> > > >
> > > > I personally (FWIW) feel that KIP-253 is appropriately scoped, but I
> do
> > > > think its utility will be limited unless there is a later KIP
> offering
> > > > backfill. But, maybe unlike Jan, I think it makes sense to try and
> > tackle
> > > > the ordering problem independently of backfill, so I'm in support of
> > the
> > > > current KIP.
> > > >
> > > > - Regarding point 6. I guess we can agree that it is better not to
> have
> > > the
> > > > > performance overhead of copying the input data. Before we discuss
> > more
> > > > on
> > > > > whether the performance overhead is acceptable or not, I am trying
> to
> > > > > figure out what is the benefit of introducing this overhead. You
> > > > mentioned
> > > > > that the benefit is the loose organizational coupling. By
> > > "organizational
> > > > > coupling", are you referring to the requirement that consumer needs
> > to
> > > > know
> > > > > the hash function of producer? If so, maybe we can discuss the
> > use-case
> > > > of
> > > > > custom partition function and see whether we can find a way to
> support
> > > > such
> > > > > use-case without having to copy the input data.
> > > > >
> > > >
> > > > I'm not too sure about what an "input" is in this sense, since we are
> > > just
> > > > talking about topics. Actually the point I was making there is that
> > > AFAICT
> > > > the performance overhead of a backfill is less than any other option,
> > > > assuming you split partitions rarely.
> > > >
> > >
> > > By "input" I was referring to source Kafka topic of a stream processing
> > > job.
> > >
> > >
> > > > Separately, yes, "organizational coupling" increases if producers and
> > > > consumers have to share code, such as the partition function. This
> > would
> > > > not be the case if producers could only pick from a menu of a few
> > > > well-known partition functions, but I think this is a poor tradeoff.
> > > >
> > >
> > > Maybe we can revisit the custom partition function and see whether we
> > > actually need it? Otherwise, I am concerned that every user will pay
> the
> > > overhead of data movement to support something that was not really
> needed
> > > for most users.
> > >
> > >
> > > >
> > > > To me, these are two strong arguments in favor of backfill being less
> > > > expensive than no backfill, but again, I think that particular debate
> > > comes
> > > > after KIP-253, so I don't want to create the impression of opposition
> > to
> > > > your proposal.
> > > >
> > > >
> > > > Finally, to respond to a new email I just noticed:
> > > >
> > > > > BTW, here is my understanding of the scope of this KIP. We want to
> > > allow
> > > > > consumers to always consume messages with the same key from the
> same
> > > > > producer in the order they are produced. And we need to provide a
> way
> > > for
> > > > > stream use-case to be able to flush/load state when messages with
> the
> > > > same
> > > > > key are migrated between consumers. In addition to ensuring that
> this
> > > > goal
> > > > > is correctly supported, we should do our best to keep the
> performance
> > > and
> > > > > organization overhead of this KIP as low as possible.
> > > > >
> > > >
> > > > I think we're on the same page there! In fact, I would generalize a
> > > little
> > > > more and say that the mechanism you've designed provides *all
> > consumers*
> > > > the ability "to flush/load state when messages with the same key are
> > > > migrated between consumers", not just Streams.
> > > >
> > >
> > > Thanks for all the comments!
> > >
> > >
> > > >
> > > > Thanks for the discussion,
> > > > -John
> > > >
> > > >
> > > >
> > > > On Tue, Mar 27, 2018 at 3:14 PM, Dong Lin <lindon...@gmail.com>
> wrote:
> > > >
> > > > > Hey John,
> > > > >
> > > > > Thanks much for the detailed comments. Here are my thoughts:
> > > > >
> > > > > - The need to delete messages from log compacted topics is mainly
> for
> > > > > performance (e.g. storage space) optimization than for correctness
> > for
> > > > this
> > > > > KIP. I agree that we probably don't need to focus on this in our
> > > > discussion
> > > > > since it is mostly for performance optimization.
> > > > >
> > > > > - "Asking producers and consumers, or even two different producers,
> > to
> > > > > share code like the partition function is a pretty huge ask. What
> if
> > > they
> > > > > are using different languages?". It seems that today we already
> > require
> > > > > > different producers to use the same hash function -- otherwise
> > > messages
> > > > > with the same key will go to different partitions of the same topic
> > > which
> > > > > may cause problem for downstream consumption. So not sure if it
> adds
> > > any
> > > > > more constraint by assuming consumers know the hash function of
> > > producer.
> > > > > > Could you explain more why user would want to use a custom
> partition
> > > > > function? Maybe we can check if this is something that can be
> > supported
> > > > in
> > > > > the default Kafka hash function. Also, can you explain more why it
> is
> > > > > > difficult to implement the same hash function in different
> > languages?
> > > > >
> > > > > - Besides the assumption that consumer needs to share the hash
> > function
> > > > of
> > > > > producer, is there other organization overhead of the proposal in
> the
> > > > > current KIP?
> > > > >
> > > > > - Currently producer can forget about the message that has been
> > > > > acknowledged by the broker. Thus the producer probably does not
> know
> > > most
> > > > > > of the existing messages in topic, including those messages produced
> > by
> > > > > other producers. We can have the owner of the producer to
> > > split+backfill.
> > > > > > In my opinion it will be a new program that wraps around the existing
> > > > > producer and consumer classes.
> > > > >
> > > > > - Regarding point 5. The argument is in favor of the split+backfill
> > but
> > > > for
> > > > > changelog topic. And it intends to address the problem for stream
> > > > use-case
> > > > > in general. In this KIP we will provide interface (i.e.
> > > > > > PartitionKeyRebalanceListener in the KIP) to be used by stream
> > use-case
> > > > and
> > > > > the goal is that user can flush/re-consume the state as part of the
> > > > > interface implementation regardless of whether there is change log
> > > topic.
> > > > >
> > > > > Maybe you are suggesting that the main reason to do split+backfill
> of
> > > > input
> > > > > topic is to support log compacted topics? You mentioned in Point 1
> > that
> > > > log
> > > > > compacted topics is out of the scope of this KIP. Maybe I could
> > > > understand
> > > > > your position better. Regarding Jan's proposal to split partitions
> > with
> > > > > backfill, do you think this should replace the proposal in the
> > existing
> > > > > KIP, or do you think this is something that we should do in
> addition
> > to
> > > > the
> > > > > existing KIP?
> > > > >
> > > > > - Regarding point 6. I guess we can agree that it is better not to
> > have
> > > > the
> > > > > > performance overhead of copying the input data. Before we discuss
> > more
> > > > on
> > > > > whether the performance overhead is acceptable or not, I am trying
> to
> > > > > > figure out what is the benefit of introducing this overhead. You
> > > > mentioned
> > > > > that the benefit is the loose organizational coupling. By
> > > "organizational
> > > > > coupling", are you referring to the requirement that consumer needs
> > to
> > > > know
> > > > > the hash function of producer? If so, maybe we can discuss the
> > use-case
> > > > of
> > > > > > custom partition function and see whether we can find a way to
> support
> > > > such
> > > > > use-case without having to copy the input data.
> > > > >
> > > > > Thanks,
> > > > > Dong
> > > > >
> > > > >
> > > > > On Tue, Mar 27, 2018 at 11:34 AM, John Roesler <j...@confluent.io>
> > > > wrote:
> > > > >
> > > > > > Hey Dong and Jun,
> > > > > >
> > > > > > Thanks for the thoughtful responses. If you don't mind, I'll mix
> my
> > > > > replies
> > > > > > together to try for a coherent response. I'm not too familiar
> with
> > > > > > mailing-list etiquette, though.
> > > > > >
> > > > > > I'm going to keep numbering my points because it makes it easy
> for
> > > you
> > > > > all
> > > > > > to respond.
> > > > > >
> > > > > > Point 1:
> > > > > > As I read it, KIP-253 is *just* about properly fencing the
> > producers
> > > > and
> > > > > > consumers so that you preserve the correct ordering of records
> > during
> > > > > > partition expansion. This is clearly necessary regardless of
> > anything
> > > > > else
> > > > > > we discuss. I think this whole discussion about backfill,
> > consumers,
> > > > > > streams, etc., is beyond the scope of KIP-253. But it would be
> > > > cumbersome
> > > > > > to start a new thread at this point.
> > > > > >
> > > > > > I had missed KIP-253's Proposed Change #9 among all the
> details...
> > I
> > > > > think
> > > > > > this is a nice addition to the proposal. One thought is that it's
> > > > > actually
> > > > > > irrelevant whether the hash function is linear. This is simply an
> > > > > algorithm
> > > > > > for moving a key from one partition to another, so the type of
> hash
> > > > > > function need not be a precondition. In fact, it also doesn't
> > matter
> > > > > > whether the topic is compacted or not, the algorithm works
> > > regardless.
> > > > > >
> > > > > > I think this is a good algorithm to keep in mind, as it might
> > solve a
> > > > > > variety of problems, but it does have a downside: that the
> producer
> > > > won't
> > > > > > know whether or not K1 was actually in P1, it just knows that K1
> > was
> > > in
> > > > > > P1's keyspace before the new epoch. Therefore, it will have to
> > > > > > pessimistically send (K1,null) to P1 just in case. But the next
> > time
> > > K1
> > > > > > comes along, the producer *also* won't remember that it already
> > > > retracted
> > > > > > K1 from P1, so it will have to send (K1,null) *again*. By
> > extension,
> > > > > every
> > > > > > time the producer sends to P2, it will also have to send a
> > tombstone
> > > to
> > > > > P1,
> > > > > > which is a pretty big burden. To make the situation worse, if
> there
> > > is
> > > > a
> > > > > > second split, say P2 becomes P2 and P3, then any key Kx belonging
> > to
> > > P3
> > > > > > will also have to be retracted from P2 *and* P1, since the
> producer
> > > > can't
> > > > > > know whether Kx had been last written to P2 or P1. Over a long
> > period
> > > > of
> > > > > > > time, this clearly becomes an issue, as the producer must send an
> > > > > arbitrary
> > > > > > number of retractions along with every update.
> > > > > >
> > > > > > In contrast, the proposed backfill operation has an end, and
> after
> > it
> > > > > ends,
> > > > > > everyone can afford to forget that there ever was a different
> > > partition
> > > > > > layout.
> > > > > >
> > > > > > Really, though, figuring out how to split compacted topics is
> > beyond
> > > > the
> > > > > > scope of KIP-253, so I'm not sure #9 really even needs to be in
> > this
> > > > > KIP...
> > > > > > We do need in-order delivery during partition expansion. It would
> > be
> > > > fine
> > > > > > by me to say that you *cannot* expand partitions of a
> log-compacted
> > > > topic
> > > > > > and call it a day. I think it would be better to tackle that in
> > > another
> > > > > > KIP.
> > > > > >
> > > > > >
> > > > > > Point 2:
> > > > > > Regarding whether the consumer re-shuffles its inputs, this is
> > always
> > > > on
> > > > > > the table; any consumer who wants to re-shuffle its input is free
> > to
> > > do
> > > > > so.
> > > > > > But this is currently not required. It's just that the current
> > > > high-level
> > > > > > story with Kafka encourages the use of partitions as a unit of
> > > > > concurrency.
> > > > > > As long as consumers are single-threaded, they can happily
> consume
> > a
> > > > > single
> > > > > > partition without concurrency control of any kind. This is a key
> > > aspect
> > > > > to
> > > > > > this system that lets folks design high-throughput systems on top
> > of
> > > it
> > > > > > surprisingly easily. If all consumers were instead
> > > encouraged/required
> > > > to
> > > > > > implement a repartition of their own, then the consumer becomes
> > > > > > significantly more complex, requiring either the consumer to
> first
> > > > > produce
> > > > > > to its own intermediate repartition topic or to ensure that
> > consumer
> > > > > > > threads have a reliable, high-bandwidth channel of communication
> > with
> > > > > every
> > > > > > other consumer thread.
> > > > > >
> > > > > > Either of those tradeoffs may be reasonable for a particular user
> > of
> > > > > Kafka,
> > > > > > but I don't know if we're in a position to say that they are
> > > reasonable
> > > > > for
> > > > > > *every* user of Kafka.
> > > > > >
> > > > > >
> > > > > > Point 3:
> > > > > > Regarding Jun's point about this use case, "(3) stateful and
> > > > maintaining
> > > > > > the
> > > > > > states in a local store", I agree that they may use a framework
> > > *like*
> > > > > > Kafka Streams, but that is not the same as using Kafka Streams.
> > This
> > > is
> > > > > why
> > > > > > I think it's better to solve it in Core: because it is then
> solved
> > > for
> > > > > > KStreams and also for everything else that facilitates local
> state
> > > > > > maintenance. To me, Streams is a member of the category of
> "stream
> > > > > > processing frameworks", which is itself a subcategory of "things
> > > > > requiring
> > > > > > > local state maintenance". I'm not sure if it makes sense to
> assert
> > > that
> > > > > > Streams is a sufficient and practical replacement for everything
> in
> > > > > "things
> > > > > > > requiring local state maintenance".
> > > > > >
> > > > > > But, yes, I do agree that per-key ordering is an absolute
> > > requirement,
> > > > > > therefore I think that KIP-253 itself is a necessary step.
> > Regarding
> > > > the
> > > > > > coupling of the state store partitioning to the topic
> partitioning,
> > > > yes,
> > > > > > this is an issue we are discussing solutions to right now. We may
> > go
> > > > > ahead
> > > > > > and introduce an overpartition layer on our inputs to solve it,
> but
> > > > then
> > > > > > again, if we get the ability to split partitions with backfill,
> we
> > > may
> > > > > not
> > > > > > need to!
> > > > > >
> > > > > >
> > > > > > Point 4:
> > > > > > On this:
> > > > > >
> > > > > > > Regarding thought 2: If we don't care about the stream
> use-case,
> > > then
> > > > > the
> > > > > > > > current KIP probably has already addressed the problem without
> > > requiring
> > > > > > > consumer to know the partition function. If we care about the
> > > stream
> > > > > > > use-case, we already need coordination across producers of
> > > different
> > > > > > > topics, i.e. the same partition function needs to be used by
> > > > producers
> > > > > of
> > > > > > > topics A and B in order to join topics A and B. Thus, it might
> be
> > > > > > > reasonable to extend coordination a bit and say we need
> > > coordination
> > > > > > across
> > > > > > > clients (i.e. producer and consumer), such that consumer knows
> > the
> > > > > > > partition function used by producer. If we do so, then we can
> let
> > > > > > consumer
> > > > > > > re-copy data for the change log topic using the same partition
> > > > function
> > > > > > as
> > > > > > > producer. This approach has lower overhead as compared to
> having
> > > > > producer
> > > > > > > re-copy data of the input topic.
> > > > > > > Also, producer currently does not need to know the data already
> > > > > produced
> > > > > > to
> > > > > > > the topic. If we let producer split/merge partition, it would
> > > require
> > > > > > > producer to consume the existing data, which intuitively is the
> > > task
> > > > of
> > > > > > > consumer.
> > > > > >
> > > > > >
> > > > > > I think we do care about use cases *like* Streams, I just don't
> > think
> > > > we
> > > > > > should rely on Streams to implement a feature of Core like
> > partition
> > > > > > expansion.
> > > > > >
> > > > > > Note, though, that we (Streams) do not require coordination
> across
> > > > > > producers. If two topics are certified to be co-partitioned, then
> > > > Streams
> > > > > > apps can make use of that knowledge to optimize their topology
> > > > (skipping
> > > > > a
> > > > > > repartition). But if they don't know whether they are
> > co-partitioned,
> > > > > then
> > > > > > they'd better go ahead and repartition within the topology. This
> is
> > > the
> > > > > > current state.
> > > > > >
> > > > > > A huge selling point of Kafka is enabling different parts of
> > loosely
> > > > > > coupled organizations to produce and consume data independently.
> > Some
> > > > > > coordination between producers and consumers is necessary, like
> > > > > > coordinating on the names of topics and their schemas. But
> Kafka's
> > > > value
> > > > > > proposition w.r.t. ESBs, etc. is inversely proportional to the
> > amount
> > > > of
> > > > > > coordination required. I think it behooves us to be extremely
> > > skeptical
> > > > > > about introducing any coordination beyond correctness protocols.
> > > > > >
> > > > > > Asking producers and consumers, or even two different producers,
> to
> > > > share
> > > > > > code like the partition function is a pretty huge ask. What if
> they
> > > are
> > > > > > using different languages?
> > > > > >
> > > > > > Comparing organizational overhead vs computational overhead,
> there
> > > are
> > > > > > maybe two orders of magnitude difference between them. In other
> > > words,
> > > > I
> > > > > > would happily take on the (linear) overhead of having the
> producer
> > > > > re-copy
> > > > > > the data once during a re-partition in order to save the
> > > organizational
> > > > > > overhead of tying all the producers and consumers together across
> > > > > multiple
> > > > > > boundaries.
> > > > > >
> > > > > > On that last paragraph: note that the producer *did* know the
> data
> > it
> > > > > > already produced. It handled it the first time around. Asking it
> to
> > > > > > re-produce it into a new partition layout is squarely within its
> > > scope
> > > > of
> > > > > > capabilities. Contrast this with the alternative, asking the
> > consumer
> > > > to
> > > > > > re-partition the data. I think this is even less intuitive, when
> > the
> > > > > > partition function belongs to the producer.
> > > > > >
> > > > > >
> > > > > > Point 5:
> > > > > > Dong asked this:
> > > > > >
> > > > > > > For stream use-case that needs to increase consumer number, the
> > > > > > > existing consumer can backfill the existing data in the change
> > log
> > > > > topic
> > > > > > to
> > > > > > > the same change log topic with the new partition number, before
> > the
> > > > new
> > > > > > set
> > > > > > > of consumers bootstrap state from the new partitions of the
> > change
> > > > log
> > > > > > > topic, right?
> > > > > >
> > > > > >
> > > > > > In this sense, the "consumer" is actually the producer of the
> > > changelog
> > > > > > topic, so if we support partition expansion + backfill as a
> > > > > producer/broker
> > > > > > operation, then it would be very straightforward for Streams to
> > > split a
> > > > > > state store. As you say, they would simply instruct the broker to
> > > split
> > > > > the
> > > > > > changelog topic's partitions, then backfill. Once the backfill is
> > > > ready,
> > > > > > they can create a new crop of StandbyTasks to bootstrap the more
> > > > granular
> > > > > > state stores and finally switch over to them when they are ready.
> > > > > >
> > > > > > But this actually seems to be an argument in favor of
> > split+backfill,
> > > > so
> > > > > > maybe I missed the point.
> > > > > >
> > > > > > You also asked me to explain why copying the "input" topic is
> > better
> > > > than
> > > > > > copying the "changelog" topic. I think they are totally
> > independent,
> > > > > > actually. For one thing, you can't depend on the existence of a
> > > > > "changelog"
> > > > > > topic in general, only within Streams, but Kafka's user base
> > clearly
> > > > > > exceeds Streams's user base. Plus, you actually also can't depend
> > on
> > > > the
> > > > > > existence of a changelog topic within Streams, since that is an
> > > > optional
> > > > > > feature of *some* state store implementations. Even in the
> > situation
> > > > > where
> > > > > > you do have a changelog topic in Streams, there may be use cases
> > > where
> > > > it
> > > > > > makes sense to expand the partitions of just the input, or just
> the
> > > > > > changelog.
> > > > > >
> > > > > > The ask for a Core feature of split+backfill is really about
> > > supporting
> > > > > the
> > > > > > use case of splitting partitions in log-compacted topics,
> > regardless
> > > of
> > > > > > whether that topic is an "input" or a "changelog" or anything
> else
> > > for
> > > > > that
> > > > > > matter.
> > > > > >
> > > > > >
> > > > > > Point 6:
> > > > > > On the concern about the performance overhead of copying data
> > between
> > > > the
> > > > > > brokers, I think it's actually a bit overestimated. Splitting a
> > > topic's
> > > > > > partition is probably rare, certainly rarer in general than
> > > > bootstrapping
> > > > > > new consumers on that topic. If "bootstrapping new consumers"
> means
> > > > that
> > > > > > they have to re-shuffle the data before they consume it, then you
> > > wind
> > > > up
> > > > > > copying the same record multiple times:
> > > > > >
> > > > > > (broker: input topic) -> (initial consumer) -> (broker:
> repartition
> > > > > topic)
> > > > > > -> (real consumer)
> > > > > >
> > > > > > That's 3x, and it's also 3x for every new record after the split
> as
> > > > well,
> > > > > > since you don't get to stop repartitioning/reshuffling once you
> > > start.
> > > > > >
> > > > > > Whereas if you do a backfill in something like the procedure I
> > > > outlined,
> > > > > > you only copy the prefix of the partition before the split, and
> you
> > > > send
> > > > > it
> > > > > > once to the producer and then once to the new generation
> partition.
> > > > Plus,
> > > > > > assuming we're splitting the partition for the benefit of
> > consumers,
> > > > > > there's no reason we can't co-locate the post-split partitions on
> > the
> > > > > same
> > > > > > host as the pre-split partition, making the second copy a local
> > > > > filesystem
> > > > > > operation.
> > > > > >
> > > > > > Even if you follow these two copies up with bootstrapping a new
> > > > consumer,
> > > > > > it's still rare for this to occur, so you get to amortize these
> > > copies
> > > > > over
> > > > > > the lifetime of the topic, whereas a reshuffle just keeps making
> > > copies
> > > > > for
> > > > > > every new event.
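
A rough back-of-the-envelope version of this comparison, as a Python sketch. The function names and the record counts are made up for illustration, and the accounting is an assumption: it counts one "hop" each time a record crosses between a broker and a client, it assumes the whole pre-split prefix is backfilled (a worst case), and it treats the local-filesystem optimization mentioned above as a full hop.

```python
def reshuffle_hops(prefix_records, new_records):
    # Consumer-side reshuffle: every record, old or new, travels
    # input topic -> initial consumer -> repartition topic -> real consumer.
    return 3 * (prefix_records + new_records)


def backfill_hops(prefix_records, new_records):
    # One-time backfill: the pre-split prefix goes broker -> producer and
    # producer -> new-generation partition (2 hops per prefix record).
    backfill = 2 * prefix_records
    # Afterwards every record is read once by the real consumer.
    consume = prefix_records + new_records
    return backfill + consume


prefix = 1_000_000   # records written before the split (illustrative)
new = 10_000_000     # records written after the split (illustrative)

print("reshuffle:", reshuffle_hops(prefix, new))
print("backfill: ", backfill_hops(prefix, new))
```

The backfill cost is fixed by the size of the prefix, while the reshuffle cost keeps growing with every new record, which is the amortization argument in the paragraph above.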
> > > > > >
> > > > > > And finally, I really do think that regardless of any performance
> > > > > concerns
> > > > > > about this operation, if it preserves loose organizational
> > coupling,
> > > it
> > > > > is
> > > > > > certainly worth it.
> > > > > >
> > > > > >
> > > > > > In conclusion:
> > > > > > It might actually be a good idea for us to clarify the scope of
> > > > KIP-253.
> > > > > If
> > > > > > we're all agreed that it's a good algorithm for allowing in-order
> > > > message
> > > > > > delivery during partition expansion, then we can continue this
> > > > discussion
> > > > > > as a new KIP, something like "backfill with partition expansion".
> > > This
> > > > > > would let Dong proceed with KIP-253. On the other hand, if it
> seems
> > > > like
> > > > > > this conversation may alter the design of KIP-253, then maybe we
> > > > *should*
> > > > > > just finish working it out.
> > > > > >
> > > > > > For my part, my only concern about KIP-253 is the one I raised
> > > earlier.
> > > > > >
> > > > > > Thanks again, all, for considering these points,
> > > > > > -John
> > > > > >
> > > > > >
> > > > > > On Tue, Mar 27, 2018 at 2:10 AM, Dong Lin <lindon...@gmail.com>
> > > wrote:
> > > > > >
> > > > > > > On Tue, Mar 27, 2018 at 12:04 AM, Dong Lin <
> lindon...@gmail.com>
> > > > > wrote:
> > > > > > >
> > > > > > > > Hey Jan,
> > > > > > > >
> > > > > > > > Thanks for the enthusiasm in improving Kafka's design. Now
> > that I
> > > > > have
> > > > > > > > read through your discussion with Jun, here are my thoughts:
> > > > > > > >
> > > > > > > > > - The latest proposal should work with log-compacted topics by
> > > properly
> > > > > > > > deleting old messages after a new message with the same key
> is
> > > > > > produced.
> > > > > > > So
> > > > > > > > it is probably not a concern anymore. Could you comment if
> > there
> > > is
> > > > > > still
> > > > > > > > > an issue?
> > > > > > > >
> > > > > > > > - I wrote the SEP-5 and I am pretty familiar with the
> > motivation
> > > > and
> > > > > > the
> > > > > > > > > design of SEP-5. SEP-5 is probably orthogonal to the
> motivation
> > of
> > > > > this
> > > > > > > KIP.
> > > > > > > > The goal of SEP-5 is to allow user to increase task number of
> > an
> > > > > > existing
> > > > > > > > Samza job. But if we increase the partition number of input
> > > topics,
> > > > > > > > messages may still be consumed out-of-order by tasks in Samza
> > > which
> > > > > > > causes
> > > > > > > > > incorrect results. Similarly, the approach you proposed does
> not
> > > > seem
> > > > > to
> > > > > > > > ensure that the messages can be delivered in order, even if
> we
> > > can
> > > > > make
> > > > > > > > sure that each consumer instance is assigned the set of new
> > > > > partitions
> > > > > > > > covering the same set of keys.
> > > > > > > >
> > > > > > >
> > > > > > > Let me correct this comment. The approach of copying data to a
> > new
> > > > > topic
> > > > > > > > can ensure in-order message delivery provided we properly
> migrate
> > > > > offsets
> > > > > > > from old topic to new topic.
> > > > > > >
> > > > > > >
> > > > > > > > - I am trying to understand why it is better to copy the data
> > > > instead
> > > > > > of
> > > > > > > > copying the change log topic for streaming use-case. For core
> > > Kafka
> > > > > > > > use-case, and for the stream use-case that does not need to
> > > > increase
> > > > > > > > consumers, the current KIP already supports in-order delivery
> > > > without
> > > > > > the
> > > > > > > > overhead of copying the data. For stream use-case that needs
> to
> > > > > > increase
> > > > > > > > consumer number, the existing consumer can backfill the
> > existing
> > > > data
> > > > > > in
> > > > > > > > the change log topic to the same change log topic with the
> new
> > > > > > partition
> > > > > > > > number, before the new set of consumers bootstrap state from
> > the
> > > > new
> > > > > > > > partitions of the change log topic. If this solution works,
> > then
> > > > > could
> > > > > > > you
> > > > > > > > summarize the advantage of copying the data of input topic as
> > > > > compared
> > > > > > to
> > > > > > > > copying the change log topic? For example, does it enable
> more
> > > > > > use-case,
> > > > > > > > simplify the implementation of Kafka library, or reduce the
> > > > operation
> > > > > > > > overhead etc?
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Dong
> > > > > > > >
> > > > > > > >
> > > > > > > > On Wed, Mar 21, 2018 at 6:57 AM, Jan Filipiak <
> > > > > > jan.filip...@trivago.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > >> Hi Jun,
> > > > > > > >>
> > > > > > > >> I was really seeing progress in our conversation but your
> > latest
> > > > > reply
> > > > > > > is
> > > > > > > >> just devastating.
> > > > > > > > >> I thought we were getting close to being on the same page; now it
> > > feels
> > > > > > like
> > > > > > > >> we are in different libraries.
> > > > > > > >>
> > > > > > > > >> I just quickly slam my answers in here. If they are too brief, I am
> > > > > > > > >> sorry; give me a ping and I will try to go into more detail.
> > > > > > > >> Just want to show that your pro/cons listing is broken.
> > > > > > > >>
> > > > > > > >> Best Jan
> > > > > > > >>
> > > > > > > >> and want to get rid of this horrible compromise
> > > > > > > >>
> > > > > > > >>
> > > > > > > >> On 19.03.2018 05:48, Jun Rao wrote:
> > > > > > > >>
> > > > > > > >>> Hi, Jan,
> > > > > > > >>>
> > > > > > > >>> Thanks for the discussion. Great points.
> > > > > > > >>>
> > > > > > > >>> Let me try to summarize the approach that you are
> proposing.
> > On
> > > > the
> > > > > > > >>> broker
> > > > > > > >>> side, we reshuffle the existing data in a topic from
> current
> > > > > > partitions
> > > > > > > >>> to
> > > > > > > >>> the new partitions. Once the reshuffle fully catches up,
> > switch
> > > > the
> > > > > > > >>> consumers to start consuming from the new partitions. If a
> > > > consumer
> > > > > > > needs
> > > > > > > >>> to rebuild its local state (due to partition changes), let
> > the
> > > > > > consumer
> > > > > > > >>> rebuild its state by reading all existing data from the new
> > > > > > partitions.
> > > > > > > > >>> Once all consumers have switched over, cut over the
> producer
> > to
> > > > the
> > > > > > new
> > > > > > > >>> partitions.
> > > > > > > >>>
> > > > > > > > >>> The pros of this approach are:
> > > > > > > >>> 1. There is just one way to rebuild the local state, which
> is
> > > > > > simpler.
> > > > > > > >>>
> > > > > > > >> true thanks
> > > > > > > >>
> > > > > > > >>>
> > > > > > > >>> The cons for this approach are:
> > > > > > > >>> 1. Need to copy existing data.
> > > > > > > >>>
> > > > > > > >> Very unfair and not correct. It does not require you to copy
> > > over
> > > > > > > >> existing data. It _allows_ you to copy all existing data.
> > > > > > > >>
> > > > > > > >> 2. The cutover of the producer is a bit complicated since it
> > > needs
> > > > > to
> > > > > > > >>> coordinate with all consumer groups.
> > > > > > > >>>
> > > > > > > >> Also not true. I explicitly tried to make clear that there
> is
> > > only
> > > > > one
> > > > > > > > >> special consumer (in the case of actually copying data) for which
> > > > > > > > >> coordination is required.
> > > > > > > >>
> > > > > > > >>> 3. The rebuilding of the state in the consumer is from the
> > > input
> > > > > > topic,
> > > > > > > >>> which can be more expensive than rebuilding from the
> existing
> > > > > state.
> > > > > > > >>>
> > > > > > > >> true, but rebuilding state is only required if you want to
> > > > increase
> > > > > > > >> processing power, so we assume this is at hand.
> > > > > > > >>
> > > > > > > >>> 4. The broker potentially has to know the partitioning
> > > function.
> > > > If
> > > > > > > this
> > > > > > > >>> needs to be customized at the topic level, it can be a bit
> > > messy.
> > > > > > > >>>
> > > > > > > >> I would argue against having the operation being performed
> by
> > > the
> > > > > > > broker.
> > > > > > > >> This was not discussed yet but if you see my original email
> i
> > > > > > suggested
> > > > > > > >> otherwise from the beginning.
> > > > > > > >>
> > > > > > > >>>
> > > > > > > >>> Here is an alternative approach by applying your idea not
> in
> > > the
> > > > > > > broker,
> > > > > > > >>> but in the consumer. When new partitions are added, we
> don't
> > > move
> > > > > > > >>> existing
> > > > > > > >>> data. In KStreams, we first reshuffle the new input data
> to a
> > > new
> > > > > > topic
> > > > > > > >>> T1
> > > > > > > >>> with the old number of partitions and feed T1's data to the
> > > rest
> > > > of
> > > > > > the
> > > > > > > >>> pipeline. In the meantime, KStreams reshuffles all existing
> > > data
> > > > of
> > > > > > the
> > > > > > > >>> change capture topic to another topic C1 with the new
> number
> > of
> > > > > > > >>> partitions.
> > > > > > > >>> We can then build the state of the new tasks from C1. Once
> > the
> > > > new
> > > > > > > states
> > > > > > > >>> have been fully built, we can cut over the consumption to
> the
> > > > input
> > > > > > > topic
> > > > > > > >>> and delete T1. This approach works with compacted topic
> too.
> > If
> > > > an
> > > > > > > >>> application reads from the beginning of a compacted topic,
> > the
> > > > > > consumer
> > > > > > > >>> will reshuffle the portion of the input when the number of
> > > > > partitions
> > > > > > > >>> doesn't match the number of tasks.
> > > > > > > >>>
> > > > > > > >> We all wipe this idea from our heads instantly. Mixing Ideas
> > > from
> > > > an
> > > > > > > > >> argument is not a resolution strategy;
> > > > > > > > >> it just leads to horrible, horrible software.
> > > > > > > >>
> > > > > > > >>
> > > > > > > >>> The pros of this approach are:
> > > > > > > >>> 1. No need to copy existing data.
> > > > > > > >>> 2. Each consumer group can cut over to the new partitions
> > > > > > > independently.
> > > > > > > >>> 3. The state is rebuilt from the change capture topic,
> which
> > is
> > > > > > cheaper
> > > > > > > >>> than rebuilding from the input topic.
> > > > > > > >>> 4. Only the KStreams job needs to know the partitioning
> > > function.
> > > > > > > >>>
> > > > > > > >>> The cons of this approach are:
> > > > > > > >>> 1. Potentially the same input topic needs to be reshuffled
> > more
> > > > > than
> > > > > > > once
> > > > > > > >>> in different consumer groups during the transition phase.
> > > > > > > >>>
> > > > > > > >>> What do you think?
> > > > > > > >>>
> > > > > > > >>> Thanks,
> > > > > > > >>>
> > > > > > > >>> Jun
> > > > > > > >>>
> > > > > > > >>>
> > > > > > > >>>
> > > > > > > >>> On Thu, Mar 15, 2018 at 1:04 AM, Jan Filipiak <
> > > > > > > jan.filip...@trivago.com>
> > > > > > > >>> wrote:
> > > > > > > >>>
> > > > > > > >>> Hi Jun,
> > > > > > > >>>>
> > > > > > > >>>> thank you for following me on these thoughts. It was
> > important
> > > > to
> > > > > me
> > > > > > > to
> > > > > > > >>>> feel that kind of understanding for my arguments.
> > > > > > > >>>>
> > > > > > > >>>> What I was hoping for (I mentioned this earlier) is that
> we
> > > can
> > > > > > model
> > > > > > > >>>> the
> > > > > > > >>>> case where we do not want to copy the data the exact same
> > way
> > > as
> > > > > the
> > > > > > > >>>> case
> > > > > > > >>>> when we do copy the data. Maybe you can peek into the
> mails
> > > > before
> > > > > > to
> > > > > > > >>>> see
> > > > > > > >>>> more details for this.
> > > > > > > >>>>
> > > > > > > >>>> This means we have the same mechanism to transfer consumer
> > > > groups
> > > > > to
> > > > > > > >>>> switch topic. The offset mapping that would be generated
> > would
> > > > > even
> > > > > > be
> > > > > > > > >>>> simpler: end offset of the old topic => offset 0 of all
> the
> > > > > > partitions
> > > > > > > >>>> of
> > > > > > > >>>> the new topic. Then we could model the transition of a
> > > non-copy
> > > > > > > >>>> expansion
> > > > > > > >>>> the exact same way as a copy-expansion.
> > > > > > > >>>>
> > > > > > > > >>>> I know this only works when the topic grows by a factor. But
> > the
> > > > > > benefits
> > > > > > > >>>> of
> > > > > > > > >>>> only growing by a factor are too strong anyway. See
> > Clemens's
> > > > hint
> > > > > > and
> > > > > > > >>>> remember that state reshuffling is entirely not needed if
> > one
> > > > > > doesn't
> > > > > > > >>>> want
> > > > > > > >>>> to grow processing power.
> > > > > > > >>>>
> > > > > > > >>>> I think these benefits should be clear, and that there is
> > > > > basically
> > > > > > no
> > > > > > > >>>> downside to what is currently at hand but just makes
> > > everything
> > > > > > easy.
> > > > > > > >>>>
> > > > > > > > >>>> One thing you need to know is that if you do not offer rebuilding
> > > > > > > > >>>> a log-compacted topic like I suggest, then even if you have
> > > > > > > > >>>> consumer state reshuffling, the topic is broken and cannot be used
> > > > > > > > >>>> to bootstrap new consumers. They don't know if they need to apply
> > > > > > > > >>>> a key from an old partition or not. This is a horrible downside I
> > > > > > > > >>>> haven't seen a solution for in the email conversation.
> > > > > > > >>>>
> > > > > > > >>>> I argue to:
> > > > > > > >>>>
> > > > > > > >>>> Only grow topic by a factor always.
> > > > > > > >>>> Have the "no copy consumer" transition as the trivial case
> > of
> > > > the
> > > > > > > "copy
> > > > > > > >>>> consumer transition".
> > > > > > > >>>> If processors needs to be scaled, let them rebuild from
> the
> > > new
> > > > > > topic
> > > > > > > >>>> and
> > > > > > > >>>> leave the old running in the mean time.
> > > > > > > >>>> Do not implement key shuffling in streams.
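
One way to see why growing only by a factor is attractive, sketched in Python. This assumes a plain modulo hash partitioner, i.e. partition = hash(key) % partition_count; that partitioner and the `parents` helper are assumptions for illustration, not something specified in this thread. The sketch records, for each new partition, which old partitions its keys used to live in.

```python
def parents(old_count, new_count, sample_hashes=range(10_000)):
    """Map each new partition to the set of old partitions its keys came from,
    assuming partition = hash % partition_count."""
    result = {q: set() for q in range(new_count)}
    for h in sample_hashes:
        result[h % new_count].add(h % old_count)
    return result


by_factor = parents(10, 20)      # grow by a factor of 2
not_by_factor = parents(10, 15)  # grow by 1.5x

print(max(len(s) for s in by_factor.values()))      # 1
print(max(len(s) for s in not_by_factor.values()))  # 2
```

Under that assumption, growing 10 -> 20 means every new partition draws its keys from exactly one old partition, whereas 10 -> 15 mixes keys from two old partitions into each new one.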
> > > > > > > >>>>
> > > > > > > >>>> I hope I can convince you especially with the fact how I
> > want
> > > to
> > > > > > > handle
> > > > > > > >>>> consumer transition. I think
> > > > > > > > >>>> you didn't quite understand me there before. I think the
> > term
> > > > "new
> > > > > > > >>>> topic"
> > > > > > > >>>> intimidated you a little.
> > > > > > > > >>>> How we solve this on disk doesn't really matter, if the
> data
> > > > goes
> > > > > > into
> > > > > > > >>>> the
> > > > > > > >>>> same Dir or a different Dir or anything. I do think that
> it
> > > > needs
> > > > > to
> > > > > > > >>>> involve at least rolling a new segment for the existing
> > > > > partitions.
> > > > > > > >>>> But most of the transitions should work without restarting
> > > > > > consumers.
> > > > > > > >>>> (newer consumers with support for this). But with new
> topic
> > i
> > > > just
> > > > > > > meant
> > > > > > > >>>> the topic that now has a different partition count. Plenty
> > of
> > > > ways
> > > > > > to
> > > > > > > >>>> handle that (versions, aliases)
> > > > > > > >>>>
> > > > > > > >>>> Hope I can further get my idea across.
> > > > > > > >>>>
> > > > > > > >>>> Best Jan
> > > > > > > >>>>
> > > > > > > >>>>
> > > > > > > >>>>
> > > > > > > >>>>
> > > > > > > >>>>
> > > > > > > >>>>
> > > > > > > >>>> On 14.03.2018 02:45, Jun Rao wrote:
> > > > > > > >>>>
> > > > > > > >>>> Hi, Jan,
> > > > > > > >>>>>
> > > > > > > >>>>> Thanks for sharing your view.
> > > > > > > >>>>>
> > > > > > > >>>>> I agree with you that recopying the data potentially
> makes
> > > the
> > > > > > state
> > > > > > > >>>>> management easier since the consumer can just rebuild its
> > > state
> > > > > > from
> > > > > > > >>>>> scratch (i.e., no need for state reshuffling).
> > > > > > > >>>>>
> > > > > > > > >>>>> On the flip side, I saw a few disadvantages of the
> > approach
> > > > that
> > > > > > you
> > > > > > > >>>>> suggested. (1) Building the state from the input topic
> from
> > > > > scratch
> > > > > > > is
> > > > > > > >>>>> in
> > > > > > > >>>>> general less efficient than state reshuffling. Let's say
> > one
> > > > > > > computes a
> > > > > > > >>>>> count per key from an input topic. The former requires
> > > reading
> > > > > all
> > > > > > > >>>>> existing
> > > > > > > >>>>> records in the input topic whereas the latter only
> requires
> > > > > reading
> > > > > > > >>>>> data
> > > > > > > >>>>> proportional to the number of unique keys. (2) The
> > switching
> > > of
> > > > > the
> > > > > > > >>>>> topic
> > > > > > > >>>>> needs modification to the application. If there are many
> > > > > > applications
> > > > > > > >>>>> on a
> > > > > > > >>>>> topic, coordinating such an effort may not be easy. Also,
> > > it's
> > > > > not
> > > > > > > >>>>> clear
> > > > > > > > >>>>> how to enforce exactly-once semantics during the switch.
> (3)
> > > If
> > > > a
> > > > > > > topic
> > > > > > > >>>>> doesn't need any state management, recopying the data
> seems
> > > > > > wasteful.
> > > > > > > >>>>> In
> > > > > > > >>>>> that case, in place partition expansion seems more
> > desirable.
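
Point (1) above can be illustrated with a small Python sketch of the count-per-key example. The record keys and helper names are made up for illustration, and the compacted changelog is modelled simply as the latest count per key, which is an assumption about what a fully compacted changelog would retain.

```python
from collections import Counter

input_topic = ["a", "b", "a", "c", "a", "b"]  # record keys, in log order


def rebuild_from_input(records):
    # Recompute the count per key by reading every input record.
    counts, reads = Counter(), 0
    for key in records:
        counts[key] += 1
        reads += 1
    return dict(counts), reads


def compacted_changelog(records):
    # After compaction only the latest (key, count) update per key survives.
    running, latest = Counter(), {}
    for key in records:
        running[key] += 1
        latest[key] = running[key]
    return latest


def rebuild_from_changelog(changelog):
    # Restore state by reading one record per unique key.
    state, reads = {}, 0
    for key, count in changelog.items():
        state[key] = count
        reads += 1
    return state, reads


state_a, reads_a = rebuild_from_input(input_topic)
state_b, reads_b = rebuild_from_changelog(compacted_changelog(input_topic))
print(state_a == state_b, reads_a, reads_b)
```

Both paths end with the same state, but the first reads all six input records while the second reads only three, one per unique key.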
> > > > > > > >>>>>
> > > > > > > >>>>> I understand your concern about adding complexity in
> > > KStreams.
> > > > > But,
> > > > > > > >>>>> perhaps
> > > > > > > >>>>> we could iterate on that a bit more to see if it can be
> > > > > simplified.
> > > > > > > >>>>>
> > > > > > > >>>>> Jun
> > > > > > > >>>>>
> > > > > > > >>>>>
> > > > > > > >>>>> On Mon, Mar 12, 2018 at 11:21 PM, Jan Filipiak <
> > > > > > > >>>>> jan.filip...@trivago.com>
> > > > > > > >>>>> wrote:
> > > > > > > >>>>>
> > > > > > > >>>>> Hi Jun,
> > > > > > > >>>>>
> > > > > > > > >>>>>> I will focus on point 61 as I think it's _the_ fundamental part
> > > > > > > > >>>>>> that I can't get across at the moment.
> > > > > > > >>>>>>
> > > > > > > >>>>>> Kafka is the platform to have state materialized
> multiple
> > > > times
> > > > > > from
> > > > > > > >>>>>> one
> > > > > > > >>>>>> input. I emphasize this: It is the building block in
> > > > > architectures
> > > > > > > >>>>>> that
> > > > > > > >>>>>> allow you to
> > > > > > > >>>>>> have your state maintained multiple times. You put a
> > message
> > > > in
> > > > > > > once,
> > > > > > > >>>>>> and
> > > > > > > >>>>>> you have it pop out as often as you like. I believe you
> > > > > understand
> > > > > > > >>>>>> this.
> > > > > > > >>>>>>
> > > > > > > >>>>>> Now! The path of thinking goes the following: I am using
> > > > apache
> > > > > > > kafka
> > > > > > > >>>>>> and
> > > > > > > > >>>>>> I _want_ my state multiple times. What am I going to do?
> > > > > > > >>>>>>
> > > > > > > >>>>>> A) Am I going to take my state that I build up, plunge
> > some
> > > > sort
> > > > > > of
> > > > > > > >>>>>> RPC
> > > > > > > > >>>>>> layer on top of it, use that RPC layer to throw my
> records
> > > > across
> > > > > > > >>>>>> instances?
> > > > > > > >>>>>> B) Am I just going to read the damn message twice?
> > > > > > > >>>>>>
> > > > > > > >>>>>> Approach A is fundamentally flawed and a violation of
> all
> > > that
> > > > > is
> > > > > > > good
> > > > > > > >>>>>> and
> > > > > > > >>>>>> holy in kafka deployments. I can not understand how this
> > > Idea
> > > > > can
> > > > > > > >>>>>> come in
> > > > > > > >>>>>> the first place.
> > > > > > > >>>>>> (I do understand: IQ in streams, they polluted the kafka
> > > > streams
> > > > > > > >>>>>> codebase
> > > > > > > >>>>>> really bad already. It is not funny! I think they are
> > > equally
> > > > > > flawed
> > > > > > > >>>>>> as
> > > > > > > >>>>>> A)
> > > > > > > >>>>>>
> > > > > > > >>>>>> I say, we do what Kafka is good at. We repartition the
> > topic
> > > > > once.
> > > > > > > We
> > > > > > > >>>>>> switch the consumers.
> > > > > > > >>>>>> (Those that need more partitions are going to rebuild
> > their
> > > > > state
> > > > > > in
> > > > > > > >>>>>> multiple partitions by reading the new topic, those that
> > > don't
> > > > > > just
> > > > > > > >>>>>> assign
> > > > > > > >>>>>> the new partitions properly)
> > > > > > > >>>>>> We switch producers. Done!
> > > > > > > >>>>>>
> > > > > > > >>>>>> The best thing! It is trivial, hipster stream processor
> > will
> > > > > have
> > > > > > an
> > > > > > > >>>>>> easy
> > > > > > > > >>>>>> time with that as well. It's so super simple. And simple
> IS
> > > > good!
> > > > > > > > >>>>>> It is what kafka was built to do. It is how we do it
> today.
> > > > All I
> > > > > > am
> > > > > > > >>>>>> saying
> > > > > > > >>>>>> is that a little broker help doing the producer swap is
> > > super
> > > > > > > useful.
> > > > > > > >>>>>>
> > > > > > > >>>>>> For everyone interested in why kafka is so powerful with
> > > > > approach
> > > > > > B,
> > > > > > > >>>>>> please watch https://youtu.be/bEbeZPVo98c?t=1633
> > > > > > > >>>>>> I already looked up a good point in time, I think after
> 5
> > > > > minutes
> > > > > > > the
> > > > > > > >>>>>> "state" topic is handled and you should be able to
> > > understand
> > > > me
> > > > > > > > >>>>>> an inch better.
> > > > > > > >>>>>>
> > > > > > > >>>>>> Please do not do A to the project, it deserves better!
> > > > > > > >>>>>>
> > > > > > > >>>>>> Best Jan
> > > > > > > >>>>>>
> > > > > > > >>>>>>
> > > > > > > >>>>>>
> > > > > > > >>>>>> On 13.03.2018 02:40, Jun Rao wrote:
> > > > > > > >>>>>>
> > > > > > > >>>>>> Hi, Jan,
> > > > > > > >>>>>>
> > > > > > > >>>>>>> Thanks for the reply. A few more comments below.
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> 50. Ok, we can think a bit harder for supporting
> > compacted
> > > > > > topics.
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> 51. This is a fundamental design question. In the more
> > > common
> > > > > > case,
> > > > > > > >>>>>>> the
> > > > > > > >>>>>>> reason why someone wants to increase the number of
> > > partitions
> > > > > is
> > > > > > > that
> > > > > > > >>>>>>> the
> > > > > > > >>>>>>> consumer application is slow and one wants to run more
> > > > consumer
> > > > > > > >>>>>>> instances
> > > > > > > >>>>>>> to increase the degree of parallelism. So, fixing the
> > > number
> > > > of
> > > > > > > >>>>>>> running
> > > > > > > >>>>>>> consumer instances when expanding the partitions won't
> > help
> > > > > this
> > > > > > > >>>>>>> case.
> > > > > > > >>>>>>> If
> > > > > > > >>>>>>> we do need to increase the number of consumer
> instances,
> > we
> > > > > need
> > > > > > to
> > > > > > > >>>>>>> somehow
> > > > > > > >>>>>>> reshuffle the state of the consumer across instances.
> > What
> > > we
> > > > > > have
> > > > > > > >>>>>>> been
> > > > > > > >>>>>>> discussing in this KIP is whether we can do this more
> > > > > effectively
> > > > > > > >>>>>>> through
> > > > > > > >>>>>>> the KStream library (e.g. through a 2-phase partition
> > > > > expansion).
> > > > > > > >>>>>>> This
> > > > > > > >>>>>>> will
> > > > > > > >>>>>>> add some complexity, but it's probably better than
> > everyone
> > > > > doing
> > > > > > > >>>>>>> this
> > > > > > > >>>>>>> in
> > > > > > > >>>>>>> the application space. The recopying approach that you
> > > > > mentioned
> > > > > > > >>>>>>> doesn't
> > > > > > > >>>>>>> seem to address the consumer state management issue
> when
> > > the
> > > > > > > consumer
> > > > > > > >>>>>>> switches from an old to a new topic.
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> 52. As for your example, it depends on whether the join
> > key
> > > > is
> > > > > > the
> > > > > > > >>>>>>> same
> > > > > > > >>>>>>> between (A,B) and (B,C). If the join key is the same,
> we
> > > can
> > > > > do a
> > > > > > > >>>>>>> 2-phase
> > > > > > > >>>>>>> partition expansion of A, B, and C together. If the
> join
> > > keys
> > > > > are
> > > > > > > >>>>>>> different, one would need to repartition the data on a
> > > > > different
> > > > > > > key
> > > > > > > >>>>>>> for
> > > > > > > >>>>>>> the second join, then the partition expansion can be
> done
> > > > > > > >>>>>>> independently
> > > > > > > >>>>>>> between (A,B) and (B,C).
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> 53. If you always fix the number of consumer instances,
> > what
> > > > you
> > > > > > > >>>>>>> described
> > > > > > > >>>>>>> works. However, as I mentioned in #51, I am not sure
> how
> > > your
> > > > > > > >>>>>>> proposal
> > > > > > > >>>>>>> deals with consumer states when the number of consumer
> > > > > instances
> > > > > > > >>>>>>> grows.
> > > > > > > >>>>>>> Also, it just seems that it's better to avoid
> re-copying
> > > the
> > > > > > > existing
> > > > > > > >>>>>>> data.
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> 60. "just want to throw in my question from the longer
> > > email
> > > > in
> > > > > > the
> > > > > > > >>>>>>> other
> > > > > > > >>>>>>> Thread here. How will the bloom filter help a new
> > consumer
> > > to
> > > > > > > decide
> > > > > > > >>>>>>> to
> > > > > > > >>>>>>> apply the key or not?" Not sure that I fully understood
> > > your
> > > > > > > >>>>>>> question.
> > > > > > > >>>>>>> The
> > > > > > > >>>>>>> consumer just reads whatever key is in the log. The
> bloom
> > > > > filter
> > > > > > > just
> > > > > > > >>>>>>> helps
> > > > > > > >>>>>>> clean up the old keys.
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> 61. "Why can we afford having a topic where its
> > apparently
> > > > not
> > > > > > > >>>>>>> possible
> > > > > > > >>>>>>> to
> > > > > > > >>>>>>> start a new application on? I think this is an overall
> > flaw
> > > > of
> > > > > > the
> > > > > > > > >>>>>>> discussed idea here. Not paying attention to the
> overall
> > > > > > > >>>>>>> architecture."
> > > > > > > >>>>>>> Could you explain a bit more when one can't start a new
> > > > > > > application?
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> Jun
> > > > > > > >>>>>>>
> > > > > > > >>>>>>>
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> On Sat, Mar 10, 2018 at 1:40 AM, Jan Filipiak <
> > > > > > > >>>>>>> jan.filip...@trivago.com
> > > > > > > >>>>>>> wrote:
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> Hi Jun, thanks for your mail.
> > > > > > > >>>>>>>
> > > > > > > >>>>>>> Thank you for your questions!
> > > > > > > >>>>>>>> I think they are really good and tackle the core of
> the
> > > > > problem
> > > > > > I
> > > > > > > >>>>>>>> see.
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> I will answer inline, mostly but still want to set the
> > > tone
> > > > > > here.
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> The core strength of kafka is what Martin once called
> > the
> > > > > > > >>>>>>>> kappa-Architecture. How does this work?
> > > > > > > >>>>>>>> You have everything as a log as in kafka. When you
> need
> > to
> > > > > > change
> > > > > > > >>>>>>>> something.
> > > > > > > >>>>>>>> You create the new version of your application and
> leave
> > > it
> > > > > > > running
> > > > > > > >>>>>>>> in
> > > > > > > >>>>>>>> parallel.
> > > > > > > >>>>>>>> Once the new version is good you switch your users to
> > use
> > > > the
> > > > > > new
> > > > > > > >>>>>>>> application.
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> The online reshuffling effectively breaks this
> > > architecture
> > > > > and
> > > > > > I
> > > > > > > >>>>>>>> think
> > > > > > > >>>>>>>> the switch in thinking here is more harmful
> > > > > > > >>>>>>>> than any details about the partitioning function to
> > allow
> > > > > such a
> > > > > > > >>>>>>>> change.
> > > > > > > >>>>>>>> I
> > > > > > > >>>>>>>> feel with my suggestion we are the closest to
> > > > > > > >>>>>>>> the original and battle proven architecture and I can
> > only
> > > > > warn
> > > > > > to
> > > > > > > >>>>>>>> move
> > > > > > > >>>>>>>> away from it.
> > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>> I might have forgotten something, sometimes it's hard
> for
> > > me
> > > > to
> > > > > > > > >>>>>>>> get
> > > > > > > >>>>>>>> all
> > > > > > > >>>>>>>> the thoughts captured in a mail, but I hope the
> comments
> > > > > inline
> > > > > > > will
> > > > > > > >>>>>>>> further make my concern clear, and put some emphasis
> on
> > > why
> > > > I
> > > > > > > >>>>>>>> prefer my
> > > > > > > >>>>>>>> solution ;)
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> One thing we should all be aware of when discussing
> > this,
> > > > and
> > > > > I
> > > > > > > >>>>>>>> think
> > > > > > > >>>>>>>> Dong
> > > > > > > >>>>>>>> should have mentioned it (maybe he did).
> > > > > > > >>>>>>>> We are not discussing all of this out of thin air but
> > > there
> > > > is
> > > > > > an
> > > > > > > >>>>>>>> effort
> > > > > > > >>>>>>>> in the Samza project.
> > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>> https://cwiki.apache.org/confluence/display/SAMZA/SEP-5%3A+Enable+partition+expansion+of+input+streams
> > > > > > > >>>>>>>> https://issues.apache.org/jira/browse/SAMZA-1293
> > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>> To be clear. I think SEP-5 (state of last week, don't
> > know
> > > if
> > > > > it
> > > > > > > >>>>>>>> adapted
> > > > > > > >>>>>>>> to
> > > > > > > >>>>>>>> this discussion) is on a way better path than KIP-253,
> > > and I
> > > > > > can't
> > > > > > > >>>>>>>> really
> > > > > > > >>>>>>>> explain why.
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> Best Jan,
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> nice weekend everyone
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> On 09.03.2018 03:36, Jun Rao wrote:
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> Hi, Jan,
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> Thanks for the feedback. Just some comments on the
> > earlier
> > > > > > points
> > > > > > > >>>>>>>>> that
> > > > > > > >>>>>>>>> you
> > > > > > > >>>>>>>>> mentioned.
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> 50. You brought up the question of whether existing
> > data
> > > > > needs
> > > > > > to
> > > > > > > >>>>>>>>> be
> > > > > > > >>>>>>>>> copied
> > > > > > > >>>>>>>>> during partition expansion. My understand of your
> view
> > is
> > > > > that
> > > > > > > > >>>>>>>>> avoiding
> > > > > > > >>>>>>>>> copying existing data will be more efficient, but it
> > > > doesn't
> > > > > > work
> > > > > > > >>>>>>>>> well
> > > > > > > >>>>>>>>> with
> > > > > > > >>>>>>>>> compacted topics since some keys in the original
> > > partitions
> > > > > > will
> > > > > > > >>>>>>>>> never
> > > > > > > >>>>>>>>> be
> > > > > > > >>>>>>>>> cleaned. It would be useful to understand your use
> case
> > > of
> > > > > > > >>>>>>>>> compacted
> > > > > > > >>>>>>>>> topics
> > > > > > > >>>>>>>>> a bit more. In the common use case, the data volume
> in
> > a
> > > > > > > compacted
> > > > > > > >>>>>>>>> topic
> > > > > > > >>>>>>>>> may not be large. So, I am not sure if there is a
> > strong
> > > > need
> > > > > > to
> > > > > > > >>>>>>>>> expand
> > > > > > > >>>>>>>>> partitions in a compacted topic, at least initially.
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> I do agree. State is usually smaller. Update rates
> > might
> > > be
> > > > > > also
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> competitively high.
> > > > > > > > >>>>>>>> Doing Log-compaction (even being very efficient and
> > > > > > configurable)
> > > > > > > >>>>>>>> is
> > > > > > > >>>>>>>> also
> > > > > > > >>>>>>>> a more expensive operation than
> > > > > > > >>>>>>>> just discarding old segments. Further if you want to
> use
> > > > more
> > > > > > > >>>>>>>> consumers
> > > > > > > >>>>>>>> processing the events
> > > > > > > >>>>>>>> you also have to grow the number of partitions.
> > Especially
> > > > for
> > > > > > > >>>>>>>> use-cases
> > > > > > > > >>>>>>>> we do (KIP-213) a tiny stateful
> > > > > > > >>>>>>>> table might be very expensive to process if it joins
> > > > against a
> > > > > > > huge
> > > > > > > >>>>>>>> table.
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> I can just say we have been in the spot of needing to
> > grow
> > > > log
> > > > > > > >>>>>>>> compacted
> > > > > > > >>>>>>>> topics. Mainly for processing power we can bring to
> the
> > > > table.
> > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>> Further I am not at all concerned about the extra
> space
> > > > used
> > > > > by
> > > > > > > >>>>>>>> "garbage
> > > > > > > >>>>>>>> keys". I am more concerned about the correctness of
> > > innocent
> > > > > > > >>>>>>>> consumers.
> > > > > > > >>>>>>>> The
> > > > > > > >>>>>>>> logic becomes complicated. Say for streams one would
> > need
> > > to
> > > > > > load
> > > > > > > >>>>>>>> the
> > > > > > > > >>>>>>>> record into state but not forward it to the topology (to
> > > have
> > > > it
> > > > > > > >>>>>>>> available
> > > > > > > > >>>>>>>> for shuffling). I'd rather have it simple and a topic
> > clean
> > > > > > > >>>>>>>> regardless
> > > > > > > >>>>>>>> if
> > > > > > > >>>>>>>> it
> > > > > > > >>>>>>>> still has its old partition count. Especially with
> > > multiple
> > > > > > > >>>>>>>> partitions
> > > > > > > > >>>>>>>> growths I think it becomes insanely hard to do this
> > > shuffle
> > > > > > > > >>>>>>>> correctly.
> > > > > > > >>>>>>>> Maybe
> > > > > > > >>>>>>>> Streams and Samza can do it. Especially if you do
> > "hipster
> > > > > > stream
> > > > > > > > >>>>>>>> processing" <https://www.confluent.io/blog/introducing-kafka-streams-stream-processing-made-simple/>.
> > > > > > > > >>>>>>>> This makes kafka way
> > too
> > > > > > > >>>>>>>> complicated.
> > > > > > > > >>>>>>>> With my approach I think it's way simpler because the
> > topic
> > > > has
> > > > > > no
> > > > > > > >>>>>>>> "history"
> > > > > > > >>>>>>>> in terms of partitioning but is always clean.
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> 51. "Growing the topic by an integer factor does not
> > > require
> > > > > any
> > > > > > > >>>>>>>> state
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> redistribution at all." Could you clarify this a bit
> > more?
> > > > > Let's
> > > > > > > say
> > > > > > > >>>>>>>>> you
> > > > > > > >>>>>>>>> have a consumer app that computes the windowed count
> > per
> > > > key.
> > > > > > If
> > > > > > > >>>>>>>>> you
> > > > > > > >>>>>>>>> double
> > > > > > > >>>>>>>>> the number of partitions from 1 to 2 and grow the
> > > consumer
> > > > > > > >>>>>>>>> instances
> > > > > > > >>>>>>>>> from
> > > > > > > >>>>>>>>> 1
> > > > > > > >>>>>>>>> to 2, we would need to redistribute some of the
> counts
> > to
> > > > the
> > > > > > new
> > > > > > > >>>>>>>>> consumer
> > > > > > > >>>>>>>>> instance. Regarding to linear hashing, it's true that
> > it
> > > > > won't
> > > > > > > >>>>>>>>> solve
> > > > > > > >>>>>>>>> the
> > > > > > > >>>>>>>>> problem with compacted topics. The main benefit is
> that
> > > it
> > > > > > > >>>>>>>>> redistributes
> > > > > > > >>>>>>>>> the keys in one partition to no more than two
> > partitions,
> > > > > which
> > > > > > > >>>>>>>>> could
> > > > > > > >>>>>>>>> help
> > > > > > > >>>>>>>>> redistribute the state.
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> You don't need to spin up a new consumer in this
> case.
> > > > every
> > > > > > > >>>>>>>>> consumer
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> would just read every partition with the (partition %
> > > > > num_task)
> > > > > > > >>>>>>>> task.
> > > > > > > >>>>>>>> it
> > > > > > > >>>>>>>> will still have the previous data right there and can
> go
> > > on.
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> This sounds contradictory to what I said before, but
> > > please
> > > > > bear
> > > > > > > >>>>>>>> with
> > > > > > > >>>>>>>> me.
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> 52. Good point on coordinating the expansion of 2
> topics
> > > > that
> > > > > > need
> > > > > > > >>>>>>>> to
> > > > > > > >>>>>>>> be
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> joined together. This is where the 2-phase partition
> > > > expansion
> > > > > > > could
> > > > > > > >>>>>>>>> potentially help. In the first phase, we could add
> new
> > > > > > partitions
> > > > > > > >>>>>>>>> to
> > > > > > > >>>>>>>>> the 2
> > > > > > > >>>>>>>>> topics one at a time but without publishing to the
> new
> > > > > > partitions.
> > > > > > > >>>>>>>>> Then,
> > > > > > > >>>>>>>>> we
> > > > > > > >>>>>>>>> can add new consumer instances to pick up the new
> > > > partitions.
> > > > > > In
> > > > > > > >>>>>>>>> this
> > > > > > > >>>>>>>>> transition phase, no reshuffling is needed since no
> > data
> > > is
> > > > > > > coming
> > > > > > > >>>>>>>>> from
> > > > > > > >>>>>>>>> the
> > > > > > > >>>>>>>>> new partitions. Finally, we can enable the publishing
> > to
> > > > the
> > > > > > new
> > > > > > > >>>>>>>>> partitions.
> > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> I think it's even worse than you think. I would like
> to
> > > > > > introduce
> > > > > > > >>>>>>>>> the
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> Term
> > > > > > > >>>>>>>> transitive copartitioning. Imagine
> > > > > > > > >>>>>>>> 2 streams applications. One joins (A,B), the other (B,C);
> > > then
> > > > > > there
> > > > > > > >>>>>>>> is a
> > > > > > > >>>>>>>> transitive copartition requirement for
> > > > > > > > >>>>>>>> (A,C) to be copartitioned as well. This can spread
> > > > > significantly
> > > > > > > and
> > > > > > > >>>>>>>> require many consumers to adapt at the same time.
> > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>> It is also not entirely clear to me how you do not need
> > > > > reshuffling
> > > > > > > in
> > > > > > > >>>>>>>> this
> > > > > > > >>>>>>>> case. If A has a record that never gets updated after
> > the
> > > > > > > expansion
> > > > > > > >>>>>>>> and
> > > > > > > >>>>>>>> the
> > > > > > > > >>>>>>>> corresponding B record moves to a new partition. How
> > shall
> > > > they
> > > > > > > meet
> > > > > > > >>>>>>>> w/o
> > > > > > > >>>>>>>> shuffle?
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> 53. "Migrating consumer is a step that might be made
> > > > completely
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> unnecessary
> > > > > > > >>>>>>>>> if - for example streams - takes the gcd as
> > partitioning
> > > > > scheme
> > > > > > > >>>>>>>>> instead
> > > > > > > >>>>>>>>> of
> > > > > > > >>>>>>>>> enforcing 1 to 1." Not sure that I fully understand
> > > this. I
> > > > > > think
> > > > > > > >>>>>>>>> you
> > > > > > > >>>>>>>>> mean
> > > > > > > >>>>>>>>> that a consumer application can run more instances
> than
> > > the
> > > > > > > number
> > > > > > > >>>>>>>>> of
> > > > > > > >>>>>>>>> partitions. In that case, the consumer can just
> > > > > repartition
> > > > > > > the
> > > > > > > >>>>>>>>> input
> > > > > > > >>>>>>>>> data according to the number of instances. This is
> > > > possible,
> > > > > > but
> > > > > > > >>>>>>>>> just
> > > > > > > >>>>>>>>> has
> > > > > > > >>>>>>>>> the overhead of reshuffling the data.
> > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> No, what I meant is (that is also your question, I
> think
> > > > > > Mathias)
> > > > > > > >>>>>>>>> that
> > > > > > > >>>>>>>>> if
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> you grow a topic by a factor.
> > > > > > > > >>>>>>>> Even if your processor is stateful you can just
> > > assign
> > > > > all
> > > > > > > the
> > > > > > > >>>>>>>> multiples of the previous partition to
> > > > > > > >>>>>>>> this consumer and the state to keep processing
> correctly
> > > > will
> > > > > be
> > > > > > > >>>>>>>> present
> > > > > > > >>>>>>>> w/o any shuffling.
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> Say you have an assignment
> > > > > > > > >>>>>>>> Stateful consumer => partition
> > > > > > > >>>>>>>> 0 => 0
> > > > > > > >>>>>>>> 1 => 1
> > > > > > > >>>>>>>> 2 => 2
> > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>> and you grow your topic by a factor of 4, you get,
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> 0 => 0,3,6,9
> > > > > > > >>>>>>>> 1 => 1,4,7,10
> > > > > > > >>>>>>>> 2 => 2,5,8,11
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> Say your hashcode is 8. 8%3 => 2  before so consumer
> for
> > > > > > > partition 2
> > > > > > > >>>>>>>> has
> > > > > > > >>>>>>>> it.
> > > > > > > > >>>>>>>> Now you have 12 partitions so 8%12 => 8, so it
> goes
> > > into
> > > > > > > >>>>>>>> partition
> > > > > > > >>>>>>>> 8
> > > > > > > >>>>>>>> which is assigned to the same consumer
> > > > > > > >>>>>>>> who had 2 before and therefore knows the key.
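
The arithmetic in the example above can be checked with a short sketch. It assumes the producer picks a partition as hash % numPartitions and that partition p is owned by consumer p % numConsumers, as described in the mail. The class and method names are illustrative only and are not part of Kafka.

```java
// Sketch only: names below are hypothetical, not Kafka APIs.
class IntegerFactorGrowth {
    // Assumed producer rule: partition = hash mod partition count.
    static int partitionFor(int hash, int numPartitions) {
        return Math.floorMod(hash, numPartitions);
    }

    // Assumed assignment rule: partition p is owned by consumer p mod consumer count.
    static int ownerOf(int partition, int numConsumers) {
        return partition % numConsumers;
    }

    public static void main(String[] args) {
        int numConsumers = 3;
        int oldPartitions = 3;
        int newPartitions = oldPartitions * 4; // grow by an integer factor

        // Every hash must land on the same consumer before and after the growth.
        for (int hash = 0; hash < 100_000; hash++) {
            int before = ownerOf(partitionFor(hash, oldPartitions), numConsumers);
            int after = ownerOf(partitionFor(hash, newPartitions), numConsumers);
            if (before != after) {
                throw new AssertionError("owner changed for hash " + hash);
            }
        }
        System.out.println("hash 8: partition " + partitionFor(8, oldPartitions)
                + " -> partition " + partitionFor(8, newPartitions)
                + ", owner stays " + ownerOf(partitionFor(8, newPartitions), numConsumers));
    }
}
```

The check passes because the old partition count divides the new one, so (hash % 12) % 3 equals hash % 3 for every hash. If the topic grew from 3 to, say, 5 partitions, the loop would fail, which is the case where state would have to move.
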
> > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>> Userland reshuffling is there as an option. And it
> > does
> > > > > > exactly
> > > > > > > >>>>>>>> what
> > > > > > > >>>>>>>> I
> > > > > > > > >>>>>>>> suggest. And I think it's the perfect strategy. All I
> am
> > > > > > > suggesting
> > > > > > > >>>>>>>> is
> > > > > > > >>>>>>>> broker side support to switch the producers to the
> newly
> > > > > > > partitioned
> > > > > > > >>>>>>>> topic.
> > > > > > > > >>>>>>>> Then the old topic (with too few partitions) can go away.
> > > Remember
> > > > > the
> > > > > > > >>>>>>>> list
> > > > > > > >>>>>>>> of
> > > > > > > >>>>>>>> steps in the beginning of this thread. If one has
> broker
> > > > > support
> > > > > > > for
> > > > > > > >>>>>>>> all
> > > > > > > > >>>>>>>> where it's required and streams support for those that
> > > aren’t
> > > > > > > >>>>>>>> necessarily.
> > > > > > > >>>>>>>> Then one has solved the problem.
> > > > > > > > >>>>>>>> I repeat it because I think it's important. I am really
> > > happy
> > > > > > that
> > > > > > > >>>>>>>> you
> > > > > > > > >>>>>>>> brought that up, because it's 100% what I want, just
> with
> > > the
> > > > > > > >>>>>>>> differences
> > > > > > > >>>>>>>> to
> > > > > > > > >>>>>>>> have an option to discard the too small topic later
> > (after
> > > > all
> > > > > > > >>>>>>>> consumers
> > > > > > > >>>>>>>> adapted). And to have order correct there. I need
> broker
> > > > > support
> > > > > > > >>>>>>>> managing
> > > > > > > > >>>>>>>> the copy process + the producers and fence them against
> > > each
> > > > > > > other. I
> > > > > > > >>>>>>>> also
> > > > > > > > >>>>>>>> repeat: the copy process can run for weeks in the
> worst
> > > > case.
> > > > > > > >>>>>>>> Copying
> > > > > > > >>>>>>>> the
> > > > > > > >>>>>>>> data is not the longest task migrating consumers might
> > > very
> > > > > well
> > > > > > > be.
> > > > > > > >>>>>>>> Once all consumers switched and copying is really up
> to
> > > date
> > > > > > > (think
> > > > > > > >>>>>>>> ISR
> > > > > > > >>>>>>>> like up to date) only then we stop the producer, wait
> > for
> > > > the
> > > > > > copy
> > > > > > > >>>>>>>> to
> > > > > > > >>>>>>>> finish and use the new topic for producing.
> > > > > > > >>>>>>>>
> > > > > > > > >>>>>>>> After this the topic is in perfect shape, and no one
> > needs
> > > > to
> > > > > > > worry
> > > > > > > >>>>>>>> about
> > > > > > > >>>>>>>> complicated stuff. (old keys hanging around might
> arrive
> > > in
> > > > > some
> > > > > > > >>>>>>>> other
> > > > > > > > >>>>>>>> topic later.....). I can only imagine how many tricky
> bugs
> > > > gonna
> > > > > > > >>>>>>>> arrive
> > > > > > > >>>>>>>> after
> > > > > > > > >>>>>>>> someone has grown and shrunk their topic 10 times.
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> 54. "The other thing I wanted to mention is that I
> > believe
> > > > the
> > > > > > > >>>>>>>> current
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> suggestion (without copying data over) can be
> > implemented
> > > in
> > > > > > pure
> > > > > > > >>>>>>>>> userland
> > > > > > > >>>>>>>>> with a custom partitioner and a small feedbackloop
> from
> > > > > > > >>>>>>>>> ProduceResponse
> > > > > > > >>>>>>>>> =>
> > > > > > > > >>>>>>>>> Partitioner in cooperation with a change management
> > > > > system."
> > > > > > I
> > > > > > > am
> > > > > > > >>>>>>>>> not
> > > > > > > >>>>>>>>> sure a customized partitioner itself solves the
> > problem.
> > > We
> > > > > > > >>>>>>>>> probably
> > > > > > > >>>>>>>>> need
> > > > > > > >>>>>>>>> some broker side support to enforce when the new
> > > partitions
> > > > > can
> > > > > > > be
> > > > > > > >>>>>>>>> used.
> > > > > > > >>>>>>>>> We
> > > > > > > >>>>>>>>> also need some support on the consumer/kstream side
> to
> > > > > preserve
> > > > > > > the
> > > > > > > >>>>>>>>> per
> > > > > > > >>>>>>>>> key
> > > > > > > >>>>>>>>> ordering and potentially migrate the processing
> state.
> > > This
> > > > > is
> > > > > > > not
> > > > > > > >>>>>>>>> trivial
> > > > > > > >>>>>>>>> and I am not sure if it's ideal to fully push to the
> > > > > > application
> > > > > > > >>>>>>>>> space.
> > > > > > > >>>>>>>>>
> > > > > > > > >>>>>>>>> Broker support is definitely the preferred way here. I
> > > have
> > > > > > > nothing
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> against
> > > > > > > >>>>>>>> broker support.
> > > > > > > > >>>>>>>> I tried to say that for what I would prefer - copying
> > the
> > > > > data
> > > > > > > >>>>>>>> over,
> > > > > > > >>>>>>>> at
> > > > > > > >>>>>>>> least for log compacted topics -
> > > > > > > >>>>>>>> I would require more broker support than the KIP
> > currently
> > > > > > offers.
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> Jun
> > > > > > > >>>>>>>>
> > > > > > > >>>>>>>> On Tue, Mar 6, 2018 at 10:33 PM, Jan Filipiak <
> > > > > > > >>>>>>>>> jan.filip...@trivago.com
> > > > > > > >>>>>>>>> wrote:
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> Hi Dong,
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>> are you actually reading my emails, or are you just
> > using
> > > > the
> > > > > > > >>>>>>>>> thread I
> > > > > > > >>>>>>>>>
> > > > > > > >>>>>>>>>> started for general announcements regarding the KIP?
> > > > > > > >>>>>>>>>>
> > > > > > > >>>>>>>>>> I tried to argue really hard against linear hashing.
> > > > Growing
> > > > > > the
> > > > > > > >>>>>>>>>> topic
> > > > > > > >>>>>>>>>> by
> > > > > > > >>>>>>>>>> an integer factor does not require any state
> > > > redistribution
> > > > > at
> > > > > > > >>>>>>>>>> all. I
> > > > > > > >>>>>>>>>> fail
> > > > > > > >>>>>>>>>> to see completely where linear hashing helps on log
> > > > > compacted
> > > > > > > >>>>>>>>>> topics.
> > > > > > > >>>>>>>>>>
> > > > > > > >>>>>>>>>> If you are not willing to explain to me what I might
> > be
> > > > > > > >>>>>>>>>> overlooking:
> > > > > > > >>>>>>>>>> that
> > > > > > > >>>>>>>>>> is fine.
> > > > > > > >>>>>>>>>> But I ask you to not reply to my emails then. Please
> > > > > > understand
> > > > > > > my
> > > > > > > >>>>>>>>>> frustration with this.
> > > > > > > >>>>>>>>>>
> > > > > > > >>>>>>>>>> Best Jan
> > > > > > > >>>>>>>>>>
> > > > > > > >>>>>>>>>>
> > > > > > > >>>>>>>>>>
> > > > > > > >>>>>>>>>> On 06.03.2018 19:38, Dong Lin wrote:
> > > > > > > >>>>>>>>>>
> > > > > > > >>>>>>>>>> Hi everyone,
> > > > > > > >>>>>>>>>>
> > > > > > > >>>>>>>>>> Thanks for all the comments! It appears that
> everyone
> > > > > prefers
> > > > > > > >>>>>>>>>> linear
> > > > > > > >>>>>>>>>>
> > > > > > > >>>>>>>>>>> hashing because it reduces the amount of state that
> > > needs
> > > > > to
> > > > > > be
> > > > > > > >>>>>>>>>>> moved
> > > > > > > >>>>>>>>>>> between consumers (for stream processing). The KIP
> > has
> > > > been
> > > > > > > >>>>>>>>>>> updated
> > > > > > > >>>>>>>>>>> to
> > > > > > > >>>>>>>>>>> use
> > > > > > > >>>>>>>>>>> linear hashing.
> > > > > > > >>>>>>>>>>>
> > > > > > > >>>>>>>>>>> Regarding the migration endeavor: it seems that
> > > migrating
> > > > > > > >>>>>>>>>>> producer
> > > > > > > >>>>>>>>>>> library
> > > > > > > >>>>>>>>>>> to use linear hashing should be pretty
> > straightforward
> > > > > > without
> > > > > > > >>>>>>>>>>> much operational endeavor. If we don't upgrade
> client
> > > > > library
> > > > > > > to
> > > > > > > >>>>>>>>>>> use
> > > > > > > >>>>>>>>>>> this
> > > > > > > >>>>>>>>>>> KIP, we can not support in-order delivery after
> > > partition
> > > > > is
> > > > > > > >>>>>>>>>>> changed
> > > > > > > >>>>>>>>>>> anyway. Suppose we upgrade client library to use
> this
> > > > KIP,
> > > > > if
> > > > > > > >>>>>>>>>>> partition
> > > > > > > >>>>>>>>>>> number is not changed, the key -> partition mapping
> > > will
> > > > be
> > > > > > > >>>>>>>>>>> exactly
> > > > > > > >>>>>>>>>>> the
> > > > > > > >>>>>>>>>>> same as it is now because it is still determined
> > using
> > > > > > > >>>>>>>>>>> murmur_hash(key)
> > > > > > > >>>>>>>>>>> %
> > > > > > > >>>>>>>>>>> original_partition_num. In other words, this change
> > is
> > > > > > backward
> > > > > > > >>>>>>>>>>> compatible.
> > > > > > > >>>>>>>>>>>
> > > > > > > >>>>>>>>>>> Regarding the load distribution: if we use linear
> > > > hashing,
> > > > > > the
> > > > > > > >>>>>>>>>>> load
> > > > > > > >>>>>>>>>>> may
> > > > > > > >>>>>>>>>>> be
> > > > > > > >>>>>>>>>>> unevenly distributed because those partitions which
> > are
> > > > not
> > > > > > > split
> > > > > > > >>>>>>>>>>> may
> > > > > > > >>>>>>>>>>> receive twice as much traffic as other partitions
> > that
> > > > are
> > > > > > > split.
> > > > > > > >>>>>>>>>>> This
> > > > > > > >>>>>>>>>>> issue can be mitigated by creating topic with
> > > partitions
> > > > > that
> > > > > > > are
> > > > > > > >>>>>>>>>>> several
> > > > > > > >>>>>>>>>>> times the number of consumers. And there will be no
> > > > > imbalance
> > > > > > > if
> > > > > > > >>>>>>>>>>> the
> > > > > > > >>>>>>>>>>> partition number is always doubled. So this
> imbalance
> > > > seems
> > > > > > > >>>>>>>>>>> acceptable.
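
The backward-compatibility and load-distribution points above can be illustrated with a small sketch of textbook linear hashing. This is an assumption about the general technique, not the exact algorithm specified in the KIP. A plain non-negative int stands in for murmur_hash(key), and the class and method names are hypothetical.

```java
// Sketch only: textbook linear hashing, not the KIP's exact algorithm.
class LinearHashingSketch {
    static int partitionFor(int hash, int originalCount, int currentCount) {
        int h = hash & 0x7fffffff; // force a non-negative value
        // Largest round size originalCount * 2^k that does not exceed currentCount.
        int round = originalCount;
        while (round * 2 <= currentCount) {
            round *= 2;
        }
        int p = h % round;
        // Partitions below the split pointer were already split in this round,
        // so their keys are spread over twice as many buckets.
        int splitPointer = currentCount - round;
        if (p < splitPointer) {
            p = h % (round * 2);
        }
        return p;
    }

    public static void main(String[] args) {
        int original = 10;
        int current = 15;
        int[] load = new int[current];
        for (int hash = 0; hash < 20_000; hash++) {
            // With an unchanged partition count the mapping is plain hash % original.
            if (partitionFor(hash, original, original) != hash % original) {
                throw new AssertionError("not backward compatible for hash " + hash);
            }
            load[partitionFor(hash, original, current)]++;
        }
        for (int p = 0; p < current; p++) {
            System.out.println("partition " + p + ": " + load[p] + " keys");
        }
    }
}
```

With 10 original and 15 current partitions, partitions 0 to 4 have each been split into themselves and partitions 10 to 14, while partitions 5 to 9 are still unsplit and therefore carry twice the share of keys. Each old partition's keys end up in at most two partitions.
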
> > > > > > > >>>>>>>>>>>
> > > > > > > >>>>>>>>>>> Regarding storing the partition strategy as
> per-topic
> > > > > config:
> > > > > > > It
> > > > > > > >>>>>>>>>>> seems
> > > > > > > >>>>>>>>>>> not
> > > > > > > >>>>>>>>>>> necessary since we can still use murmur_hash as the
> > > > default
> > > > > > > hash
> > > > > > > >>>>>>>>>>> function
> > > > > > > >>>>>>>>>>> and additionally apply the linear hashing algorithm
> > if
> > > > the
> > > > > > > >>>>>>>>>>> partition
> > > > > > > >>>>>>>>>>> number
> > > > > > > >>>>>>>>>>> has increased. Not sure if there is any use-case
> for
> > > > > producer
> > > > > > > to
> > > > > > > >>>>>>>>>>> use a
> > > > > > > >>>>>>>>>>> different hash function. Jason, can you check if
> > there
> > > is
> > > > > > some
> > > > > > > >>>>>>>>>>> use-case
> > > > > > > >>>>>>>>>>> that I missed for using the per-topic partition
> > > strategy?
> > > > > > > >>>>>>>>>>>
> > > > > > > >>>>>>>>>>> Regarding how to reduce latency (due to state
> > > store/load)
> > > > > in
> > > > > > > >>>>>>>>>>> stream
> > > > > > > >>>>>>>>>>> processing consumer when partition number changes:
> I
> > > need
> > > > > to
> > > > > > > read
> > > > > > > >>>>>>>>>>> the
> > > > > > > >>>>>>>>>>> Kafka
> > > > > > > >>>>>>>>>>> Stream code to understand how Kafka Stream
> currently
> > > > > > migrates
> > > > > > > >>>>>>>>>>> state
> > > > > > > >>>>>>>>>>> between
> > > > > > > >>>>>>>>>>> consumers when the application is added/removed
> for a
> > > > given
> > > > > > > job.
> > > > > > > >>>>>>>>>>> I
> > > > > > > >>>>>>>>>>> will
> > > > > > > >>>>>>>>>>> reply after I finish reading the documentation and
> > > code.
> > > > > > > >>>>>>>>>>>
> > > > > > > >>>>>>>>>>>
> > > > > > > >>>>>>>>>>> Thanks,
> > > > > > > >>>>>>>>>>> Dong
> > > > > > > >>>>>>>>>>>
> > > > > > > >>>>>>>>>>>
> > > > > > > >>>>>>>>>>> On Mon, Mar 5, 2018 at 10:43 AM, Jason Gustafson <
> > > > > > > >>>>>>>>>>> ja...@confluent.io>
> > > > > > > >>>>>>>>>>> wrote:
> > > > > > > >>>>>>>>>>>
> > > > > > > >>>>>>>>>>> Great discussion. I think I'm wondering whether we
> > can
> > > > > > continue
> > > > > > > >>>>>>>>>>> to
> > > > > > > >>>>>>>>>>> leave
> > > > > > > >>>>>>>>>>>
> > > > > > > >>>>>>>>>>> Kafka agnostic to the partitioning strategy. The
> > > > challenge
> > > > > is
> > > > > > > >>>>>>>>>>>
> > > > > > > >>>>>>>>>>> communicating
> > > > > > > >>>>>>>>>>>> the partitioning logic from producers to consumers
> > so
> > > > that
> > > > > > the
> > > > > > > >>>>>>>>>>>> dependencies
> > > > > > > >>>>>>>>>>>> between each epoch can be determined. For the sake
> > of
> > > > > > > >>>>>>>>>>>> discussion,
> > > > > > > >>>>>>>>>>>> imagine
> > > > > > > >>>>>>>>>>>> you did something like the following:
> > > > > > > >>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>> 1. The name (and perhaps version) of a partitioning
> > > > > > > >>>>>>>>>>>>    strategy is stored in topic configuration when a
> > > > > > > >>>>>>>>>>>>    topic is created.
> > > > > > > >>>>>>>>>>>> 2. The producer looks up the partitioning strategy
> > > > > > > >>>>>>>>>>>>    before writing to a topic and includes it in the
> > > > > > > >>>>>>>>>>>>    produce request (for fencing). If it doesn't have
> > > > > > > >>>>>>>>>>>>    an implementation for the configured strategy, it
> > > > > > > >>>>>>>>>>>>    fails.
> > > > > > > >>>>>>>>>>>> 3. The consumer also looks up the partitioning
> > > > > > > >>>>>>>>>>>>    strategy and uses it to determine dependencies
> > > > > > > >>>>>>>>>>>>    when reading a new epoch. It could either fail or
> > > > > > > >>>>>>>>>>>>    make the most conservative dependency assumptions
> > > > > > > >>>>>>>>>>>>    if it doesn't know how to implement the
> > > > > > > >>>>>>>>>>>>    partitioning strategy. For the consumer, the new
> > > > > > > >>>>>>>>>>>>    interface might look something like this:
> > > > > > > >>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>> // Return the partition dependencies following an epoch bump
> > > > > > > >>>>>>>>>>>> Map<Integer, List<Integer>> dependencies(
> > > > > > > >>>>>>>>>>>>     int numPartitionsBeforeEpochBump,
> > > > > > > >>>>>>>>>>>>     int numPartitionsAfterEpochBump)
> > > > > > > >>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>> The unordered case then is just a particular
> > > > > > > >>>>>>>>>>>> implementation which never has any epoch
> > > > > > > >>>>>>>>>>>> dependencies. To implement this, we would need some
> > > > > > > >>>>>>>>>>>> way for the consumer to find out how many partitions
> > > > > > > >>>>>>>>>>>> there were in each epoch, but maybe that's not too
> > > > > > > >>>>>>>>>>>> unreasonable.
> > > > > > > >>>>>>>>>>>>
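To make the `dependencies` idea above concrete, here is a minimal Java sketch. It rests on one loud assumption that the thread does not confirm: keys are placed by `hash % numPartitions` with a non-negative hash. The names `PartitionDependencies`, `moduloDependencies` and `unorderedDependencies` are hypothetical and are not part of Kafka or the KIP. Under that assumption, new partition p can receive keys that used to live in old partition q exactly when p and q are congruent modulo gcd(before, after).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only; not a Kafka API.
final class PartitionDependencies {

    private static int gcd(int a, int b) {
        return b == 0 ? a : gcd(b, a % b);
    }

    // Assumes partition = hash % numPartitions with a non-negative hash,
    // and that both partition counts are positive.
    // For each partition p of the new epoch, lists the old partitions whose
    // keys may now land in p (p itself is included when it already existed).
    static Map<Integer, List<Integer>> moduloDependencies(int before, int after) {
        int g = gcd(before, after);
        Map<Integer, List<Integer>> deps = new HashMap<>();
        for (int p = 0; p < after; p++) {
            List<Integer> oldPartitions = new ArrayList<>();
            // h % after == p and h % before == q is solvable iff p == q (mod g).
            for (int q = p % g; q < before; q += g) {
                oldPartitions.add(q);
            }
            deps.put(p, oldPartitions);
        }
        return deps;
    }

    // The unordered case: no new partition depends on any old partition.
    static Map<Integer, List<Integer>> unorderedDependencies(int before, int after) {
        Map<Integer, List<Integer>> deps = new HashMap<>();
        for (int p = 0; p < after; p++) {
            deps.put(p, new ArrayList<>());
        }
        return deps;
    }
}
```

With this sketch, doubling from 2 to 4 partitions gives every new partition exactly one old partition to wait for, whereas going from 10 to 15 gives every new partition two.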
> > > > > > > >>>>>>>>>>>> Thanks,
> > > > > > > >>>>>>>>>>>> Jason
> > > > > > > >>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>> On Mon, Mar 5, 2018 at 4:51 AM, Jan Filipiak <
> > > > > > > >>>>>>>>>>>> jan.filip...@trivago.com
> > > > > > > >>>>>>>>>>>> wrote:
> > > > > > > >>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>> Hi Dong
> > > > > > > >>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>> thank you very much for your questions.
> > > > > > > >>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>> Regarding the time spent copying data across:
> > > > > > > >>>>>>>>>>>>> It is correct that copying data from a topic with
> > > > > > > >>>>>>>>>>>>> one partition mapping to a topic with a different
> > > > > > > >>>>>>>>>>>>> partition mapping takes way longer than we can
> > > > > > > >>>>>>>>>>>>> stop producers. Tens of minutes is a very
> > > > > > > >>>>>>>>>>>>> optimistic estimate here. Many people cannot
> > > > > > > >>>>>>>>>>>>> afford to copy at full steam and will therefore
> > > > > > > >>>>>>>>>>>>> have some rate limiting in place, which can bump
> > > > > > > >>>>>>>>>>>>> the timespan into days. The good part is that the
> > > > > > > >>>>>>>>>>>>> vast majority of the data can be copied while the
> > > > > > > >>>>>>>>>>>>> producers are still going. One can then piggyback
> > > > > > > >>>>>>>>>>>>> the consumers on top of this timeframe by the
> > > > > > > >>>>>>>>>>>>> method mentioned (provide them a mapping from
> > > > > > > >>>>>>>>>>>>> their old offsets to new offsets in their
> > > > > > > >>>>>>>>>>>>> repartitioned topics). In that way we separate
> > > > > > > >>>>>>>>>>>>> migration of consumers from migration of
> > > > > > > >>>>>>>>>>>>> producers (decoupling these is what Kafka is
> > > > > > > >>>>>>>>>>>>> strongest at). The time to actually swap over the
> > > > > > > >>>>>>>>>>>>> producers should be kept minimal by ensuring
> > > > > > > >>>>>>>>>>>>> that, when a swap attempt is started, the
> > > > > > > >>>>>>>>>>>>> consumer copying the data over is very close to
> > > > > > > >>>>>>>>>>>>> the log end and is expected to finish within the
> > > > > > > >>>>>>>>>>>>> next fetch. The operation should have a timeout
> > > > > > > >>>>>>>>>>>>> and should be re-attemptable.
> > > > > > > >>>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>> Importance of log compaction:
> > > > > > > >>>>>>>>>>>>> If a producer produces key A to partition 0, it
> > > > > > > >>>>>>>>>>>>> is going to be there forever unless it gets
> > > > > > > >>>>>>>>>>>>> deleted. The record might sit in there for years.
> > > > > > > >>>>>>>>>>>>> A new producer started with the new partitions
> > > > > > > >>>>>>>>>>>>> will fail to delete the record in the correct
> > > > > > > >>>>>>>>>>>>> partition. The record will be there forever and
> > > > > > > >>>>>>>>>>>>> one cannot reliably bootstrap new consumers. I
> > > > > > > >>>>>>>>>>>>> cannot see how linear hashing can solve this.
> > > > > > > >>>>>>>>>>>>>
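The compaction concern above can be reproduced with a toy model. The following Java sketch is only an illustration under stated assumptions: `CompactedTopicSim` is a hypothetical class, each partition is modelled as a map holding the latest value per key (a fully compacted log), and `key.hashCode()` modulo the partition count stands in for the real partitioner. It shows a tombstone sent after expansion landing in a different partition than the record it was meant to delete.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model for illustration only; not how Kafka stores or compacts data.
final class CompactedTopicSim {
    // One map per partition: key -> latest value (a fully compacted log).
    private final List<Map<String, String>> partitions = new ArrayList<>();

    CompactedTopicSim(int numPartitions) {
        expandTo(numPartitions);
    }

    // Adds empty partitions; existing records stay where they are.
    void expandTo(int numPartitions) {
        while (partitions.size() < numPartitions) {
            partitions.add(new HashMap<>());
        }
    }

    // Stand-in partitioner (assumption): hash modulo current partition count.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // A null value models a tombstone: it removes the key, but only from
    // the partition the tombstone itself is routed to.
    void send(String key, String value) {
        Map<String, String> target =
                partitions.get(partitionFor(key, partitions.size()));
        if (value == null) {
            target.remove(key);
        } else {
            target.put(key, value);
        }
    }

    // Partitions that still hold a live record for the key.
    List<Integer> partitionsHolding(String key) {
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < partitions.size(); i++) {
            if (partitions.get(i).containsKey(key)) {
                result.add(i);
            }
        }
        return result;
    }
}
```

Writing key "A" with 2 partitions, expanding to 3 and then sending a tombstone for "A" leaves the original record in its old partition, so a consumer bootstrapping from the topic would still see the deleted key.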
> > > > > > > >>>>>>>>>>>>> Regarding your skipping of userland copying:
> > > > > > > >>>>>>>>>>>>> 100%, copying the data across in userland is, as
> > > > > > > >>>>>>>>>>>>> far as I can see, only a use case for
> > > > > > > >>>>>>>>>>>>> log-compacted topics. Even for log compaction +
> > > > > > > >>>>>>>>>>>>> retention it should only be opt-in. Why did I
> > > > > > > >>>>>>>>>>>>> bring it up? I think log compaction is a very
> > > > > > > >>>>>>>>>>>>> important feature to really embrace Kafka as a
> > > > > > > >>>>>>>>>>>>> "data platform". The point I also want to make is
> > > > > > > >>>>>>>>>>>>> that copying data this way is completely in line
> > > > > > > >>>>>>>>>>>>> with the Kafka architecture. It only consists of
> > > > > > > >>>>>>>>>>>>> reading and writing to topics.
> > > > > > > >>>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>> I hope it clarifies more why I think we should
> > > > > > > >>>>>>>>>>>>> aim for more than the current KIP. I fear that
> > > > > > > >>>>>>>>>>>>> once the KIP is done not much more effort will be
> > > > > > > >>>>>>>>>>>>> taken.
> > > > > > > >>>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>> On 04.03.2018 02:28, Dong Lin wrote:
> > > > > > > >>>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>> Hey Jan,
> > > > > > > >>>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>>> In the current proposal, the consumer will be
> > > > > > > >>>>>>>>>>>>>> blocked waiting for other consumers of the
> > > > > > > >>>>>>>>>>>>>> group to consume up to a given offset. In most
> > > > > > > >>>>>>>>>>>>>> cases, all consumers should be close to the LEO
> > > > > > > >>>>>>>>>>>>>> of the partitions when the partition expansion
> > > > > > > >>>>>>>>>>>>>> happens. Thus the waiting time should not be
> > > > > > > >>>>>>>>>>>>>> long, e.g. on the order of seconds. On the
> > > > > > > >>>>>>>>>>>>>> other hand, it may take a long time to wait for
> > > > > > > >>>>>>>>>>>>>> the entire partition to be copied; the amount
> > > > > > > >>>>>>>>>>>>>> of time is proportional to the amount of
> > > > > > > >>>>>>>>>>>>>> existing data in the partition, which can take
> > > > > > > >>>>>>>>>>>>>> tens of minutes. So the amount of time that we
> > > > > > > >>>>>>>>>>>>>> stop consumers may not be on the same order of
> > > > > > > >>>>>>>>>>>>>> magnitude.
> > > > > > > >>>>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>>> If we can implement this suggestion without
> > > > > > > >>>>>>>>>>>>>> copying data over in pure userland, it will be
> > > > > > > >>>>>>>>>>>>>> much more valuable. Do you have ideas on how
> > > > > > > >>>>>>>>>>>>>> this can be done?
> > > > > > > >>>>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>>> I am not sure why the current KIP would not
> > > > > > > >>>>>>>>>>>>>> help people who depend on log compaction. Could
> > > > > > > >>>>>>>>>>>>>> you elaborate more on this point?
> > > > > > > >>>>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>> Thanks,
> > > > > > > >>>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>> Dong
> > > > > > > >>>>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>>> On Wed, Feb 28, 2018 at 10:55 PM, Jan Filipiak
> > > > > > > >>>>>>>>>>>>>> <Jan.Filipiak@trivago.com> wrote:
> > > > > > > >>>>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>>> Hi Dong,
> > > > > > > >>>>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>>>> I tried to focus on the steps one can
> > > > > > > >>>>>>>>>>>>>>> currently perform to expand or shrink a keyed
> > > > > > > >>>>>>>>>>>>>>> topic while maintaining top-notch semantics.
> > > > > > > >>>>>>>>>>>>>>> I can understand that there might be
> > > > > > > >>>>>>>>>>>>>>> confusion about "stopping the consumer". It
> > > > > > > >>>>>>>>>>>>>>> is exactly the same as proposed in the KIP:
> > > > > > > >>>>>>>>>>>>>>> there needs to be a time at which the
> > > > > > > >>>>>>>>>>>>>>> producers agree on the new partitioning. The
> > > > > > > >>>>>>>>>>>>>>> extra semantics I want to put in there is
> > > > > > > >>>>>>>>>>>>>>> that we have the possibility to wait until
> > > > > > > >>>>>>>>>>>>>>> all the existing data is copied over into the
> > > > > > > >>>>>>>>>>>>>>> new partitioning scheme. When I say stopping,
> > > > > > > >>>>>>>>>>>>>>> I think more of having a memory barrier that
> > > > > > > >>>>>>>>>>>>>>> ensures the ordering. I am still aiming for
> > > > > > > >>>>>>>>>>>>>>> latencies on the scale of leader failovers.
> > > > > > > >>>>>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>>>> Consumers have to explicitly adapt to the new
> > > > > > > >>>>>>>>>>>>>>> partitioning scheme in the above scenario.
> > > > > > > >>>>>>>>>>>>>>> The reason is that in the cases where you
> > > > > > > >>>>>>>>>>>>>>> depend on a particular partitioning scheme,
> > > > > > > >>>>>>>>>>>>>>> you frequently also have other topics with
> > > > > > > >>>>>>>>>>>>>>> co-partitioning enforcement or the like.
> > > > > > > >>>>>>>>>>>>>>> Therefore all your other input topics might
> > > > > > > >>>>>>>>>>>>>>> need to grow accordingly.
> > > > > > > >>>>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>>>> What I was suggesting was to streamline all
> > > > > > > >>>>>>>>>>>>>>> these operations as well as possible so that
> > > > > > > >>>>>>>>>>>>>>> we get "real" partition growth and shrinkage.
> > > > > > > >>>>>>>>>>>>>>> Migrating the producers to a new partitioning
> > > > > > > >>>>>>>>>>>>>>> scheme can be much more streamlined with
> > > > > > > >>>>>>>>>>>>>>> proper broker support for this. Migrating
> > > > > > > >>>>>>>>>>>>>>> consumers is a step that might be made
> > > > > > > >>>>>>>>>>>>>>> completely unnecessary if, for example,
> > > > > > > >>>>>>>>>>>>>>> Streams takes the gcd as partitioning scheme
> > > > > > > >>>>>>>>>>>>>>> instead of enforcing 1 to 1. Connect
> > > > > > > >>>>>>>>>>>>>>> consumers and other consumers should be fine
> > > > > > > >>>>>>>>>>>>>>> anyway.
> > > > > > > >>>>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>>>> I hope this makes clearer what I was aiming
> > > > > > > >>>>>>>>>>>>>>> at. The rest needs to be figured out. The
> > > > > > > >>>>>>>>>>>>>>> only danger I see is that if we introduce
> > > > > > > >>>>>>>>>>>>>>> this feature as proposed in the KIP, it won't
> > > > > > > >>>>>>>>>>>>>>> help anyone who depends on log compaction.
> > > > > > > >>>>>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>>>> The other thing I wanted to mention is that I
> > > > > > > >>>>>>>>>>>>>>> believe the current suggestion (without
> > > > > > > >>>>>>>>>>>>>>> copying data over) can be implemented in pure
> > > > > > > >>>>>>>>>>>>>>> userland with a custom partitioner and a
> > > > > > > >>>>>>>>>>>>>>> small feedback loop from ProduceResponse =>
> > > > > > > >>>>>>>>>>>>>>> Partitioner, in cooperation with a change
> > > > > > > >>>>>>>>>>>>>>> management system.
> > > > > > > >>>>>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>>>> Best Jan
> > > > > > > >>>>>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>>>> On 28.02.2018 07:13, Dong Lin wrote:
> > > > > > > >>>>>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>>>> Hey Jan,
> > > > > > > >>>>>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>>>>> I am not sure if it is acceptable for the
> > > > > > > >>>>>>>>>>>>>>>> producer to be stopped for a while,
> > > > > > > >>>>>>>>>>>>>>>> particularly for online applications which
> > > > > > > >>>>>>>>>>>>>>>> require low latency. I am also not sure how
> > > > > > > >>>>>>>>>>>>>>>> consumers can switch to a new topic. Does
> > > > > > > >>>>>>>>>>>>>>>> the user application need to explicitly
> > > > > > > >>>>>>>>>>>>>>>> specify a different topic for the
> > > > > > > >>>>>>>>>>>>>>>> producer/consumer to subscribe to? It will
> > > > > > > >>>>>>>>>>>>>>>> be helpful for discussion if you can
> > > > > > > >>>>>>>>>>>>>>>> provide more detail on the interface change
> > > > > > > >>>>>>>>>>>>>>>> for this solution.
> > > > > > > >>>>>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>>>> Thanks,
> > > > > > > >>>>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>>> Dong
> > > > > > > >>>>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>>>>> On Mon, Feb 26, 2018 at 12:48 AM, Jan Filipiak
> > > > > > > >>>>>>>>>>>>>>>> <Jan.Filipiak@trivago.com> wrote:
> > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>>>> Hi,
> > > > > > > >>>>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>>>>>> I just want to throw my thoughts in. In
> > > > > > > >>>>>>>>>>>>>>>>> general the functionality is very useful;
> > > > > > > >>>>>>>>>>>>>>>>> we should not, though, try too hard to
> > > > > > > >>>>>>>>>>>>>>>>> find the architecture while implementing.
> > > > > > > >>>>>>>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>>>>>> The manual steps would be to:
> > > > > > > >>>>>>>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>>>>>> 1. Create a new topic.
> > > > > > > >>>>>>>>>>>>>>>>> 2. Then run MirrorMaker from the old topic
> > > > > > > >>>>>>>>>>>>>>>>>    to the new topic.
> > > > > > > >>>>>>>>>>>>>>>>> 3. Wait for the mirroring to catch up.
> > > > > > > >>>>>>>>>>>>>>>>> 4. Then put the consumers onto the new
> > > > > > > >>>>>>>>>>>>>>>>>    topic:
> > > > > > > >>>>>>>>>>>>>>>>>    - Have MirrorMaker emit a mapping from
> > > > > > > >>>>>>>>>>>>>>>>>      old offsets to new offsets. If the
> > > > > > > >>>>>>>>>>>>>>>>>      topic is increased by a factor X,
> > > > > > > >>>>>>>>>>>>>>>>>      there is going to be a clean mapping
> > > > > > > >>>>>>>>>>>>>>>>>      from 1 offset in the old topic to X
> > > > > > > >>>>>>>>>>>>>>>>>      offsets in the new topic; if there is
> > > > > > > >>>>>>>>>>>>>>>>>      no such factor, there is no chance to
> > > > > > > >>>>>>>>>>>>>>>>>      generate a mapping that can
> > > > > > > >>>>>>>>>>>>>>>>>      reasonably be used for continuing.
> > > > > > > >>>>>>>>>>>>>>>>>    - Make consumers stop at appropriate
> > > > > > > >>>>>>>>>>>>>>>>>      points and continue consumption with
> > > > > > > >>>>>>>>>>>>>>>>>      offsets from the mapping.
> > > > > > > >>>>>>>>>>>>>>>>> 5. Have the producers stop for a minimal
> > > > > > > >>>>>>>>>>>>>>>>>    time.
> > > > > > > >>>>>>>>>>>>>>>>> 6. Wait for MirrorMaker to finish.
> > > > > > > >>>>>>>>>>>>>>>>> 7. Let the producers produce with the new
> > > > > > > >>>>>>>>>>>>>>>>>    metadata.
> > > > > > > >>>>>>>>>>>>>>>>>
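The offset mapping mentioned in the steps above could look roughly like the following Java sketch. Everything in it is an assumption made for illustration: `OffsetMapping`, `recordCopy` and `resumeOffset` are hypothetical names, the copier is assumed to preserve per-partition order and to write dense offsets starting at 0, and the new partition count is assumed to be a multiple X of the old one with modulo hashing, so that each new partition is fed by exactly one old partition.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical bookkeeping kept by the copying process; not a Kafka API.
final class OffsetMapping {
    private final int oldPartitions;
    // Per new partition: old offset of each copied record -> its new offset.
    private final Map<Integer, TreeMap<Long, Long>> copied = new HashMap<>();
    // Per new partition: the offset the next copied record will receive.
    private final Map<Integer, Long> nextNewOffset = new HashMap<>();

    OffsetMapping(int oldPartitions) {
        this.oldPartitions = oldPartitions;
    }

    // With new = X * old partitions and modulo hashing, new partition q
    // only ever receives records from old partition q % old.
    int sourcePartition(int newPartition) {
        return newPartition % oldPartitions;
    }

    // Called once per record, in old-offset order, as it is copied.
    void recordCopy(int newPartition, long oldOffset) {
        long newOffset = nextNewOffset.merge(newPartition, 1L, Long::sum) - 1;
        copied.computeIfAbsent(newPartition, p -> new TreeMap<>())
              .put(oldOffset, newOffset);
    }

    // committedOldOffset is the next old offset the consumer would have
    // read from sourcePartition(newPartition). Returns where to resume in
    // the new partition without skipping or re-reading copied records.
    long resumeOffset(int newPartition, long committedOldOffset) {
        TreeMap<Long, Long> offsets = copied.get(newPartition);
        if (offsets != null) {
            Map.Entry<Long, Long> firstUnread =
                    offsets.ceilingEntry(committedOldOffset);
            if (firstUnread != null) {
                return firstUnread.getValue();
            }
        }
        // Everything copied so far was already consumed: resume at the end.
        return nextNewOffset.getOrDefault(newPartition, 0L);
    }
}
```

For example, if one old partition is split into two and a consumer had committed old offset 3, it would resume each new partition at the first copied record whose old offset is 3 or higher.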
> > > > > > > >>>>>>>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>>>>>> Rather than implementing the approach
> > > > > > > >>>>>>>>>>>>>>>>> suggested in the KIP, which will leave
> > > > > > > >>>>>>>>>>>>>>>>> log-compacted topics completely crumbled
> > > > > > > >>>>>>>>>>>>>>>>> and unusable, I would much rather try to
> > > > > > > >>>>>>>>>>>>>>>>> build infrastructure to support the
> > > > > > > >>>>>>>>>>>>>>>>> above-mentioned operations more smoothly.
> > > > > > > >>>>>>>>>>>>>>>>> Having producers stop and use another topic
> > > > > > > >>>>>>>>>>>>>>>>> is especially difficult. It would be nice
> > > > > > > >>>>>>>>>>>>>>>>> if one could trigger "invalid metadata"
> > > > > > > >>>>>>>>>>>>>>>>> exceptions for them, and if one could give
> > > > > > > >>>>>>>>>>>>>>>>> topics aliases so that what they produce to
> > > > > > > >>>>>>>>>>>>>>>>> the old topic will arrive in the new topic.
> > > > > > > >>>>>>>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>>>>>> The downsides are obvious, I guess (having
> > > > > > > >>>>>>>>>>>>>>>>> the same data twice for the transition
> > > > > > > >>>>>>>>>>>>>>>>> period, but Kafka tends to scale well with
> > > > > > > >>>>>>>>>>>>>>>>> data size). So it's a nicer fit into the
> > > > > > > >>>>>>>>>>>>>>>>> architecture.
> > > > > > > >>>>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>>>>>> I further want to argue that the
> > > > > > > >>>>>>>>>>>>>>>>> functionality of the KIP can be implemented
> > > > > > > >>>>>>>>>>>>>>>>> completely in "userland" with a custom
> > > > > > > >>>>>>>>>>>>>>>>> partitioner that handles the transition as
> > > > > > > >>>>>>>>>>>>>>>>> needed. I would appreciate it if someone
> > > > > > > >>>>>>>>>>>>>>>>> could point out what a custom partitioner
> > > > > > > >>>>>>>>>>>>>>>>> couldn't handle in this case.
> > > > > > > >>>>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>>>>>> With the above approach, shrinking a topic
> > > > > > > >>>>>>>>>>>>>>>>> follows the same steps, without losing keys
> > > > > > > >>>>>>>>>>>>>>>>> in the discontinued partitions.
> > > > > > > >>>>>>>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>>>>>> Would love to hear what everyone thinks.
> > > > > > > >>>>>>>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>>>>>> Best Jan
> > > > > > > >>>>>>>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>>>>>> On 11.02.2018 00:35, Dong Lin wrote:
> > > > > > > >>>>>>>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>>>>>> Hi all,
> > > > > > > >>>>>>>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>>>>>>> I have created KIP-253: Support in-order
> > > > > > > >>>>>>>>>>>>>>>>>> message delivery with partition
> > > > > > > >>>>>>>>>>>>>>>>>> expansion. See
> > > > > > > >>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-253%3A+Support+in-order+message+delivery+with+partition+expansion
> > > > > > > >>>>>>>>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>>>>>>> This KIP provides a way to allow messages
> > > > > > > >>>>>>>>>>>>>>>>>> of the same key from the same producer to
> > > > > > > >>>>>>>>>>>>>>>>>> be consumed in the same order they are
> > > > > > > >>>>>>>>>>>>>>>>>> produced, even if we expand the partitions
> > > > > > > >>>>>>>>>>>>>>>>>> of the topic.
> > > > > > > >>>>>>>>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>>>>>> Thanks,
> > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > >>>>>>>>>>>>>>> Dong
>
> --
> -- Guozhang
>
