proposal). It's similar to my statement in my email to Jun that in principle KStreams doesn't *need* backfill, we only need it if we want to employ partition expansion.
I reckon that the main motivation for backfill is to support KStreams use cases and also any other use cases involving stateful consumers.
Thanks for your response, and congrats again!
-John
On Wed, Mar 28, 2018 at 1:34 AM, Dong Lin <lindon...@gmail.com> wrote:
Hey John,
Great! Thanks for all the comments. It seems that we agree that the current KIP is in good shape for core Kafka. IMO, what we have been discussing in the recent email exchanges is mostly about the second step, i.e. how to address the problem for the stream use-case (or stateful processing in general).
I will comment inline.
On Tue, Mar 27, 2018 at 4:38 PM, John Roesler <j...@confluent.io> wrote:
Thanks for the response, Dong.
Here are my answers to your questions:
- "Asking producers and consumers, or even two different producers,
to
share code like the partition function is a pretty huge ask. What
if
they
are using different languages?". It seems that today we already
require
different producer's to use the same hash function -- otherwise
messages
with the same key will go to different partitions of the same topic
which
may cause problem for downstream consumption. So not sure if it
adds
any
more constraint by assuming consumers know the hash function of
producer.
Could you explain more why user would want to use a cusmtom
partition
function? Maybe we can check if this is something that can be
supported
in
the default Kafka hash function. Also, can you explain more why it
is
difficuilt to implement the same hash function in different
languages?
Sorry, I meant two different producers as in producers to two different topics. This was in response to the suggestion that we already require coordination among producers to different topics in order to achieve co-partitioning. I was saying that we do not (and should not).
It is probably common for producers of different teams to produce messages to the same topic. In order to ensure that messages with the same key go to the same partition, we need producers of different teams to share the same partition algorithm, which by definition requires coordination among producers of different teams in an organization. Even for producers of different topics, it may be common to require producers to use the same partition algorithm in order to join two topics for stream processing. Does this make it reasonable to say we already require coordination across producers?
By design, consumers are currently ignorant of the partitioning scheme. It suffices to trust that the producer has partitioned the topic by key, if they claim to have done so. If you don't trust that, or even if you just need some other partitioning scheme, then you must re-partition it yourself. Nothing we're discussing can or should change that. The value of backfill is that it preserves the ability for consumers to avoid re-partitioning before consuming, in the case where they don't need to today.
Regarding shared "hash functions", note that it's a bit inaccurate to talk about the "hash function" of the producer. Properly speaking, the producer has only a "partition function". We do not know that it is a hash. The producer can use any method at its disposal to assign a partition to a record. The partition function may obviously be written in any programming language, so in general it's not something that can be shared around without a formal spec or the ability to execute arbitrary executables in arbitrary runtime environments.
Yeah, it is probably better to say partition algorithm. I guess it should not be difficult to implement the same partition algorithm in different languages, right? Yes, we would need a formal specification of the default partition algorithm in the producer. I think that can be documented as part of the producer interface.
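For reference, a minimal sketch of what such a spec could look like; this mirrors the keyed-record path of the producer's default partitioner (murmur2 of the serialized key, modulo the partition count), and is meant as an illustration rather than the official definition:

    import org.apache.kafka.common.utils.Utils;

    // Sketch of a documented default partition algorithm: murmur2-hash the
    // serialized key and take it modulo the topic's partition count.
    public final class DefaultPartitionSpec {
        public static int partition(byte[] serializedKey, int numPartitions) {
            return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
        }
    }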
Why would a producer want a custom partition function? I don't know... why did we design the interface so that our users can provide one? In general, such systems provide custom partitioners because some data sets may be unbalanced under the default or because they can provide some interesting functionality built on top of the partitioning scheme, etc. Having provided this ability, I don't know why we would remove it.
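As a purely illustrative example of the kind of custom partitioner being discussed (the class name and hot key below are made up), one might pin a known hot key to a dedicated partition to work around skew:

    import java.util.Map;
    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.utils.Utils;

    // Illustrative only: route one known hot key to the last partition and
    // hash all other (non-null) keys over the remaining partitions.
    public class HotKeyPartitioner implements Partitioner {
        private static final String HOT_KEY = "hot-key";  // hypothetical skewed key

        @Override
        public int partition(String topic, Object key, byte[] keyBytes,
                             Object value, byte[] valueBytes, Cluster cluster) {
            int numPartitions = cluster.partitionsForTopic(topic).size();
            if (HOT_KEY.equals(key)) {
                return numPartitions - 1;  // dedicated partition for the hot key
            }
            return Utils.toPositive(Utils.murmur2(keyBytes)) % (numPartitions - 1);
        }

        @Override
        public void close() {}

        @Override
        public void configure(Map<String, ?> configs) {}
    }

A producer opts in via the partitioner.class config, which is exactly the kind of arbitrary, language-specific code a consumer cannot be expected to reproduce.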
Yeah, it is reasonable to assume that there was a reason to support a custom partition function in the producer. On the other hand, it may also be reasonable to revisit this interface and discuss whether we actually need to support a custom partition function. If we don't have a good reason, we can choose not to support custom partition functions in this KIP in a backward compatible manner, i.e. users can still use a custom partition function, but they would not get the benefit of in-order delivery when there is partition expansion. What do you think?
- Besides the assumption that the consumer needs to share the hash function of the producer, is there any other organizational overhead of the proposal in the current KIP?
It wasn't clear to me that KIP-253 currently required the producer and consumer to share the partition function, or in fact that it had a hard requirement to abandon the general partition function and use a linear hash function instead.
In my reading, there is a requirement to track the metadata about what partitions split into what other partitions during an expansion operation. If the partition function is linear, this is easy. If not, you can always just record that all old partitions split into all new partitions. This has the effect of forcing all consumers to wait until the old epoch is completely consumed before starting on the new epoch. But this may be a reasonable tradeoff, and it doesn't otherwise alter your design.
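To make the "linear is easy" case concrete, here is a sketch (my own illustration, not the KIP's wording) of the split metadata in the two cases, assuming the partition count is doubled and "linear" means hash(key) mod partitionCount:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Which new partitions can each old partition's keys end up in?
    public class SplitMetadata {
        public static Map<Integer, List<Integer>> compute(int oldCount, int newCount, boolean linearHash) {
            Map<Integer, List<Integer>> splits = new HashMap<>();
            for (int p = 0; p < oldCount; p++) {
                if (linearHash && newCount == 2 * oldCount) {
                    // hash % 2N sends a key from old partition p to either p or p + N,
                    // so consumers of those two new partitions only wait for old p.
                    splits.put(p, Arrays.asList(p, p + oldCount));
                } else {
                    // Conservative fallback: p may split into any new partition, so
                    // consumers must drain the entire old epoch before the new one.
                    List<Integer> all = new ArrayList<>();
                    for (int q = 0; q < newCount; q++) {
                        all.add(q);
                    }
                    splits.put(p, all);
                }
            }
            return splits;
        }
    }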
You only mention the consumer needing to know that the partition function is linear, not what the actual function is, so I don't think your design actually calls for sharing the function. Plus, really, all the consumer needs is the metadata about what old-epoch partitions to wait for before consuming a new-epoch partition. This information is directly captured in metadata, so I don't think it actually even cares whether the partition function is linear or not.
You are right that the current KIP does not mention it. My comment about partition function coordination was related to supporting the stream use-case, which we have been discussing so far.
So, no, I really think KIP-253 is in good shape. I was really more talking about the part of this thread that's outside of KIP-253's scope, namely, creating the possibility of backfilling partitions after expansion.
Great! Can you also confirm that the main motivation for backfilling partitions after expansion is to support the stream use-case?
- Currently the producer can forget about a message that has been acknowledged by the broker. Thus the producer probably does not know most of the existing messages in the topic, including those messages produced by other producers. We can have the owner of the producer do the split+backfill. In my opinion it will be a new program that wraps around the existing producer and consumer classes.
This sounds fine by me!
Really, I was just emphasizing that the part of the organization that produces a topic shouldn't have to export their partition function to the part(s) of the organization (or other organizations) that consume the topic. Whether the backfill operation goes into the Producer interface is secondary, I think.
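Purely as a sketch of what such a wrapper program might look like (fencing against live traffic, offset bookkeeping, and termination are deliberately omitted; a consumer configured with a fresh group.id and auto.offset.reset=earliest is assumed):

    import java.time.Duration;
    import java.util.Collections;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // Sketch of a standalone backfill pass: re-read the topic from the beginning
    // and re-produce every record, so that the producer's partitioner (default
    // or custom) re-places each key under the expanded partition count.
    public class BackfillTool {
        public static void backfill(KafkaConsumer<byte[], byte[]> consumer,
                                    KafkaProducer<byte[], byte[]> producer,
                                    String topic) {
            consumer.subscribe(Collections.singletonList(topic));
            while (true) {  // a real tool would stop at the pre-expansion end offsets
                for (ConsumerRecord<byte[], byte[]> rec : consumer.poll(Duration.ofSeconds(1))) {
                    producer.send(new ProducerRecord<>(topic, rec.key(), rec.value()));
                }
            }
        }
    }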
- Regarding point 5. The argument is in favor of split+backfill, but for the changelog topic. And it intends to address the problem for the stream use-case in general. In this KIP we will provide an interface (i.e. PartitionKeyRebalanceListener in the KIP) to be used by the stream use-case, and the goal is that the user can flush/re-consume the state as part of the interface implementation regardless of whether there is a changelog topic. Maybe you are suggesting that the main reason to do split+backfill of the input topic is to support log compacted topics? You mentioned in Point 1 that log compacted topics are out of the scope of this KIP. Maybe I could understand your position better. Regarding Jan's proposal to split partitions with backfill, do you think this should replace the proposal in the existing KIP, or do you think this is something that we should do in addition to the existing KIP?
I think that interface is a good/necessary component of KIP-253.
I personally (FWIW) feel that KIP-253 is appropriately scoped, but I do think its utility will be limited unless there is a later KIP offering backfill. But, maybe unlike Jan, I think it makes sense to try and tackle the ordering problem independently of backfill, so I'm in support of the current KIP.
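For illustration of the flush/re-consume idea from the application's perspective: the interface below is an invented stand-in, not KIP-253's actual PartitionKeyRebalanceListener signature, but it shows the two moments a stateful consumer cares about:

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical stand-in for the kind of hook KIP-253's
    // PartitionKeyRebalanceListener provides; names and shape are invented here.
    interface KeyMigrationHook {
        void beforeKeysMigrateAway();  // keys are about to move to another consumer
        void afterKeysMigrateIn();     // this consumer just became responsible for new keys
    }

    class FlushingStateHook implements KeyMigrationHook {
        private final Map<String, Long> localStore = new HashMap<>();  // toy local state

        @Override
        public void beforeKeysMigrateAway() {
            // e.g. flush the local store to a changelog topic or persistent storage
            System.out.println("flushing " + localStore.size() + " entries");
        }

        @Override
        public void afterKeysMigrateIn() {
            // e.g. re-consume the changelog/source to rebuild state for the new keys
            localStore.clear();
        }
    }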
- Regarding point 6. I guess we can agree that it is better not to have the performance overhead of copying the input data. Before we discuss more on whether the performance overhead is acceptable or not, I am trying to figure out what is the benefit of introducing this overhead. You mentioned that the benefit is the loose organizational coupling. By "organizational coupling", are you referring to the requirement that the consumer needs to know the hash function of the producer? If so, maybe we can discuss the use-case of custom partition functions and see whether we can find a way to support such use-cases without having to copy the input data.
I'm not too sure about what an "input" is in this sense, since we are just talking about topics. Actually, the point I was making there is that AFAICT the performance overhead of a backfill is less than any other option, assuming you split partitions rarely.
By "input" I was referring to source Kafka topic of a stream
processing
job.
Separately, yes, "organizational coupling" increases if producers and consumers have to share code, such as the partition function. This would not be the case if producers could only pick from a menu of a few well-known partition functions, but I think this is a poor tradeoff.
Maybe we can revisit the custom partition function and see whether we actually need it? Otherwise, I am concerned that every user will pay the overhead of data movement to support something that is not really needed by most users.
To me, these are two strong arguments in favor of backfill being less expensive than no backfill, but again, I think that particular debate comes after KIP-253, so I don't want to create the impression of opposition to your proposal.
Finally, to respond to a new email I just noticed:
BTW, here is my understanding of the scope of this KIP. We want to allow consumers to always consume messages with the same key from the same producer in the order they are produced. And we need to provide a way for the stream use-case to be able to flush/load state when messages with the same key are migrated between consumers. In addition to ensuring that this goal is correctly supported, we should do our best to keep the performance and organizational overhead of this KIP as low as possible.
I think we're on the same page there! In fact, I would generalize a little more and say that the mechanism you've designed provides *all consumers* the ability "to flush/load state when messages with the same key are migrated between consumers", not just Streams.
Thanks for all the comments!
Thanks for the discussion,
-John
On Tue, Mar 27, 2018 at 3:14 PM, Dong Lin <lindon...@gmail.com> wrote:
Hey John,
Thanks much for the detailed comments. Here are my thoughts:
- The need to delete messages from log compacted topics is mainly a performance (e.g. storage space) optimization rather than a correctness requirement for this KIP. I agree that we probably don't need to focus on this in our discussion since it is mostly a performance optimization.
- "Asking producers and consumers, or even two different producers,
to
share code like the partition function is a pretty huge ask. What
if
they
are using different languages?". It seems that today we already
require
different producer's to use the same hash function -- otherwise
messages
with the same key will go to different partitions of the same topic
which
may cause problem for downstream consumption. So not sure if it
adds
any
more constraint by assuming consumers know the hash function of
producer.
Could you explain more why user would want to use a cusmtom
partition
function? Maybe we can check if this is something that can be
supported
in
the default Kafka hash function. Also, can you explain more why it
is
difficuilt to implement the same hash function in different
languages?
- Besides the assumption that consumer needs to share the hash
function
of
producer, is there other organization overhead of the proposal in
the
current KIP?
- Currently producer can forget about the message that has been
acknowledged by the broker. Thus the producer probably does not
know
most
of the exiting messages in topic, including those messages produced
by
other producers. We can have the owner of the producer to
split+backfill.
In my opion it will be a new program that wraps around the existing
producer and consumer classes.
- Regarding point 5. The argument is in favor of the split+backfill
but
for
changelog topic. And it intends to address the problem for stream
use-case
in general. In this KIP we will provide interface (i.e.
PartitionKeyRebalanceListener in the KIP) to be used by sream
use-case
and
the goal is that user can flush/re-consume the state as part of the
interface implementation regardless of whether there is change log
topic.
Maybe you are suggesting that the main reason to do split+backfill
of
input
topic is to support log compacted topics? You mentioned in Point 1
that
log
compacted topics is out of the scope of this KIP. Maybe I could
understand
your position better. Regarding Jan's proposal to split partitions
with
backfill, do you think this should replace the proposal in the
existing
KIP, or do you think this is something that we should do in
addition
to
the
existing KIP?
- Regarding point 6. I guess we can agree that it is better not to
have
the
performance overhread of copying the input data. Before we discuss
more
on
whether the performance overhead is acceptable or not, I am trying
to
figure out what is the benefit of introducing this overhread. You
mentioned
that the benefit is the loose organizational coupling. By
"organizational
coupling", are you referring to the requirement that consumer needs
to
know
the hash function of producer? If so, maybe we can discuss the
use-case
of
custom partiton function and see whether we can find a way to
support
such
use-case without having to copy the input data.
Thanks,
Dong
On Tue, Mar 27, 2018 at 11:34 AM, John Roesler <j...@confluent.io> wrote:
Hey Dong and Jun,
Thanks for the thoughtful responses. If you don't mind, I'll mix my replies together to try for a coherent response. I'm not too familiar with mailing-list etiquette, though.
I'm going to keep numbering my points because it makes it easy for you all to respond.
Point 1:
As I read it, KIP-253 is *just* about properly fencing the producers and consumers so that you preserve the correct ordering of records during partition expansion. This is clearly necessary regardless of anything else we discuss. I think this whole discussion about backfill, consumers, streams, etc., is beyond the scope of KIP-253. But it would be cumbersome to start a new thread at this point.
I had missed KIP-253's Proposed Change #9 among all the details... I think this is a nice addition to the proposal. One thought is that it's actually irrelevant whether the hash function is linear. This is simply an algorithm for moving a key from one partition to another, so the type of hash function need not be a precondition. In fact, it also doesn't matter whether the topic is compacted or not; the algorithm works regardless.
I think this is a good algorithm to keep in mind, as it might solve a variety of problems, but it does have a downside: the producer won't know whether or not K1 was actually in P1, it just knows that K1 was in P1's keyspace before the new epoch. Therefore, it will have to pessimistically send (K1, null) to P1 just in case. But the next time K1 comes along, the producer *also* won't remember that it already retracted K1 from P1, so it will have to send (K1, null) *again*. By extension, every time the producer sends to P2, it will also have to send a tombstone to P1, which is a pretty big burden. To make the situation worse, if there is a second split, say P2 becomes P2 and P3, then any key Kx belonging to P3 will also have to be retracted from P2 *and* P1, since the producer can't know whether Kx had been last written to P2 or P1. Over a long period of time, this clearly becomes an issue, as the producer must send an arbitrary number of retractions along with every update.
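A minimal sketch of that pessimistic retraction pattern (partitionFor() is a stand-in for whatever partition function is actually in use, and only a single prior split is shown; repeated splits would add a tombstone for every earlier home of the key):

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // Every update to a key whose partition changed must be preceded by a
    // tombstone to the old partition, because the producer cannot know whether
    // the key was ever actually written there.
    public class RetractingSender {
        public static void send(KafkaProducer<String, String> producer, String topic,
                                String key, String value, int oldCount, int newCount) {
            int before = partitionFor(key, oldCount);
            int after = partitionFor(key, newCount);
            if (before != after) {
                producer.send(new ProducerRecord<>(topic, before, key, null));  // pessimistic tombstone
            }
            producer.send(new ProducerRecord<>(topic, after, key, value));
        }

        // Stand-in for the real partition function (e.g. murmur2 mod count).
        private static int partitionFor(String key, int partitionCount) {
            return Math.floorMod(key.hashCode(), partitionCount);
        }
    }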
In contrast, the proposed backfill operation has an end, and after it ends, everyone can afford to forget that there ever was a different partition layout.
Really, though, figuring out how to split compacted topics is beyond the scope of KIP-253, so I'm not sure #9 really even needs to be in this KIP... We do need in-order delivery during partition expansion. It would be fine by me to say that you *cannot* expand partitions of a log-compacted topic and call it a day. I think it would be better to tackle that in another KIP.
Point 2:
Regarding whether the consumer re-shuffles its inputs, this is always on the table; any consumer who wants to re-shuffle its input is free to do so. But this is currently not required. It's just that the current high-level story with Kafka encourages the use of partitions as a unit of concurrency. As long as consumers are single-threaded, they can happily consume a single partition without concurrency control of any kind. This is a key aspect of the system that lets folks design high-throughput systems on top of it surprisingly easily. If all consumers were instead encouraged/required to implement a repartition of their own, then the consumer becomes significantly more complex, requiring either the consumer to first produce to its own intermediate repartition topic or to ensure that consumer threads have a reliable, high-bandwidth channel of communication with every other consumer thread.
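To spell out the first of those two options, here is a rough sketch of a consumer-side repartition step (topic names and the re-keying logic are hypothetical); the stateful part of the application would then consume the repartition topic instead of the source:

    import java.time.Duration;
    import java.util.Collections;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // Sketch: read the source topic, re-key each record, and write it to an
    // intermediate repartition topic so downstream consumers see data
    // partitioned by the key they need, regardless of how upstream producers
    // partitioned the source.
    public class RepartitionStep {
        public static void run(KafkaConsumer<String, String> source,
                               KafkaProducer<String, String> producer) {
            source.subscribe(Collections.singletonList("source-topic"));  // hypothetical
            while (true) {
                for (ConsumerRecord<String, String> rec : source.poll(Duration.ofSeconds(1))) {
                    String newKey = extractKey(rec.value());
                    producer.send(new ProducerRecord<>("app-repartition", newKey, rec.value()));  // hypothetical
                }
            }
        }

        private static String extractKey(String value) {
            return value.split(",")[0];  // stand-in for application-specific re-keying
        }
    }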
Either of those tradeoffs may be reasonable for a particular user of Kafka, but I don't know if we're in a position to say that they are reasonable for *every* user of Kafka.
Point 3:
Regarding Jun's point about this use case, "(3) stateful and maintaining the states in a local store", I agree that they may use a framework *like* Kafka Streams, but that is not the same as using Kafka Streams. This is why I think it's better to solve it in Core: because it is then solved for KStreams and also for everything else that facilitates local state maintenance. To me, Streams is a member of the category of "stream processing frameworks", which is itself a subcategory of "things requiring local state maintenance". I'm not sure if it makes sense to