Hey,
just want to throw in my question from the longer email in the other
Thread here.
How will the bloom filter help a new consumer to decide to apply the key
or not?
Why can we afford having a topic where its apparently not possible to
start a new application on?
I think this is an overall flaw of the discussed idea here. Not playing
attention to the overall architecture.
Best Jan
On 12.03.2018 00:09, Dong Lin wrote:
Hey Jason,
This is a good solution on the server side for log compacted topic.
Thinking about this more, there maybe another probably simpler solution on
the client side for log compacted topics. This solution is now specified in
the section "Changes in how producer produced keyed messages to log
compacted topics" of the KIP. The client-side solution seems simpler with
less performance overhead than the server-side solution. What do you think?
Thanks,
Dong
On Sat, Mar 10, 2018 at 10:55 AM, Jason Gustafson <ja...@confluent.io>
wrote:
Hey Dong,
I was thinking a bit about log compaction after a partition split. I think
the best you could hope for in terms of efficiency is that the network
overhead would be proportional to the number of remapped keys that need
cleaning. One thought I had which gets close to this is to propagate a
bloom filter covering the keys in the log prior to the split to all
partitions that might contain some of the remapped keys. As a simple
example, suppose we have a single partition which is split into two at
offset N. Say that broker 0 owns partition 0 and broker 1 owns partition 1.
Some subset of the keys prior to N will move to partition 1 and the rest
will remain on partition 0. The idea is something like this:
1. Every time we clean partition 0 on broker 0, we compute a bloom filter
for the keys in the log prior to offset N.
2. The bloom filter is propagated to broker 1 and cached.
3. The next time broker 1 cleans the log, it uses the bloom filter to
collect a set of possible matches.
4. When the cleaning completes, the matching keys are propagated to broker
0, where they are cached until the next cleaning.
5. The next time broker 0 cleans the log, it can remove all keys that have
been cached from the region prior to the split.
This incremental approach allows us to tradeoff cleaning latency to reduce
network traffic and memory overhead. A few points:
- The accuracy of bloom filters decreases as you add more elements to them.
We would probably choose to propagate the bloom filter for a subset of the
keys once it had reached a certain capacity to avoid having too many false
positives.
- We can limit the number of bloom filter matches that we will collect and
return in a single round of cleaning. These keys have to be cached in the
broker for a little while (until the next cleaning), so this lets us keep
the memory usage bounded.
There is probably some room for cleverness as well to avoid repeating work.
For example, the broker matching the bloom filter can also send the offset
of the last key that was matched against the filter. The next time we send
a bloom filter for a certain range of keys, we can send the starting offset
for matching. It's kind of like our "dirty offset" notion.
Needs a bit of investigation to work out the details (e.g. handling
multiple splits), but seems like it could work. What do you think?
-Jason
On Fri, Mar 9, 2018 at 1:23 PM, Matthias J. Sax <matth...@confluent.io>
wrote:
Thanks for your comment Clemens. It make sense what you are saying.
However, your described pattern is to split partitions and use linear
hashing to avoid random key distribution. But this is what Jan thinks we
should not do...
Also, I just picked an example with 2 -> 3 partitions, but if you don't
use linear hashing I think the same issue occurs if you double the
number of partitions.
I am in favor of using linear hashing. Still think, it is also useful to
split single partitions, too, in case load is not balanced and some
partitions are hot spots while others are "idle".
-Matthias
On 3/9/18 5:41 AM, Clemens Valiente wrote:
I think it's fair to assume that topics will always be increased by an
integer factor - e.g. from 2 partitions to 4 partitions. Then the mapping
is much easier.
Why anyone would increase partitions by lass than x2 is a mystery to
me.
If your two partitions cannot handle the load, then with three partitions
each one will still get 67% of that load which is still way too
dangerous.
So in your case we go from
part1: A B C D
part2: E F G H
to
part1: A C
part2: B D
part3: E F
part4: G H
________________________________
From: Matthias J. Sax <matth...@confluent.io>
Sent: 09 March 2018 07:53
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-253: Support in-order message delivery with
partition expansion
@Jan: You suggest to copy the data from one topic to a new topic, and
provide an "offset mapping" from the old to the new topic for the
consumers. I don't quite understand how this would work.
Let's say there are 2 partitions in the original topic and 3 partitions
in the new topic. If we assume that we don't use linear hashing as you
suggest, there is no guarantee how data will be distributed in the new
topic and also no guarantee about ordering of records in the new topic.
Example (I hope I got it right -- please correct me if it's wrong)
A B C D
E F G H
could be copied to:
A C H
B E F
D G
If the consumer was at offset 1 and 2 in the first topic how would the
mapping be computed? We need to enures that B C D as well as G H are
read after the switch. Thus, offset would need to be 1 0 0. I am not
sure how this would be computed?
Furthermore, I want to point out that the new offsets would imply that
E
is consumed a second time by the consumer. E and F were consumed
originally, but E is copied after B that was not yet consumed.
Or is there a way that we can ensure that this "flip" does never happen
while we copy the data?
-Matthias
On 3/8/18 10:32 PM, Matthias J. Sax wrote:
As I just mentioned joins:
For Kafka Streams it might also be required to change the partition
count for multiple topics in a coordinated way that allows to maintain
the co-partitioning property that Kafka Streams uses to computed
joins.
Any thoughts how this could be handled?
-Matthias
On 3/8/18 10:08 PM, Matthias J. Sax wrote:
Jun,
There is one more case: non-windowed aggregations. For windowed
aggregation, the changelog topic will be compact+delete. However, for
non-windowed aggregation the policy is compact only.
Even if we assume that windowed aggregations are dominant and
non-windowed aggregation are used rarely, it seems to be bad to not
support the feature is a non-windowed aggregation is used. Also,
non-windowed aggregation volume depends on input-stream volume that
might be high.
Furthermore, we support stream-table join and this requires that the
stream and the table are co-partitioned. Thus, even if the table
would
have lower volume but the stream must be scaled out, we also need to
scale out the table to preserve co-partitioning.
-Matthias
On 3/8/18 6:44 PM, Jun Rao wrote:
Hi, Matthis,
My understanding is that in KStream, the only case when a changelog
topic
needs to be compacted is when the corresponding input is a KTable.
In
all
other cases, the changelog topics are of compacted + deletion. So,
if
most
KTables are not high volume, there may not be a need to expand its
partitions and therefore the partitions of the corresponding
changelog
topic.
Thanks,
Jun
On Wed, Mar 7, 2018 at 2:34 PM, Matthias J. Sax <
matth...@confluent.io>
wrote:
Jun,
thanks for your comment. This should actually work for Streams,
because
we don't rely on producer "hashing" but specify the partition
number
explicitly on send().
About not allowing to change the number of partition for changelog
topics: for Streams, this seems to imply that we need to create a
second
changelog topic for each store with the new partition count.
However, it
would be unclear when/if we can delete the old topic. Thus, it
moves
the
"problem" into the application layer. It's hard to judge for me atm
what
the impact would be, but it's something we should pay attention to.
-Matthias
On 3/6/18 3:45 PM, Jun Rao wrote:
Hi, Mattias,
Regarding your comment "If it would be time-delay based, it might
be
problematic
for Kafka Streams: if we get the information that the new input
partitions
are available for producing, we need to enable the new changelog
partitions
for producing, too. If those would not be available yet, because
the
time-delay did not trigger yet, it would be problematic to avoid
crashing.", could you just enable the changelog topic to write to
its new
partitions immediately? The input topic can be configured with a
delay
in
writing to the new partitions. Initially, there won't be new data
produced
into the newly added partitions in the input topic. However, we
could
prebuild the state for the new input partition and write the state
changes
to the corresponding new partitions in the changelog topic.
Hi, Jan,
For a compacted topic, garbage collecting the old keys in the
existing
partitions after partition expansion can be tricky as your pointed
out. A
few options here. (a) Let brokers exchange keys across brokers
during
compaction. This will add complexity on the broker side. (b) Build
an
external tool that scans the compacted topic and drop the prefix
of
a
partition if all records in the prefix are removable. The admin
can
then
run this tool when the unneeded space needs to be reclaimed. (c)
Don't
support partition change in a compacted topic. This might be ok
since
most
compacted topics are not high volume.
Thanks,
Jun
On Tue, Mar 6, 2018 at 10:38 AM, Dong Lin <lindon...@gmail.com>
wrote:
Hi everyone,
Thanks for all the comments! It appears that everyone prefers
linear
hashing because it reduces the amount of state that needs to be
moved
between consumers (for stream processing). The KIP has been
updated to
use
linear hashing.
Regarding the migration endeavor: it seems that migrating
producer
library
to use linear hashing should be pretty straightforward without
much operational endeavor. If we don't upgrade client library to
use
this
KIP, we can not support in-order delivery after partition is
changed
anyway. Suppose we upgrade client library to use this KIP, if
partition
number is not changed, the key -> partition mapping will be
exactly the
same as it is now because it is still determined using
murmur_hash(key)
%
original_partition_num. In other words, this change is backward
compatible.
Regarding the load distribution: if we use linear hashing, the
load may
be
unevenly distributed because those partitions which are not split
may
receive twice as much traffic as other partitions that are split.
This
issue can be mitigated by creating topic with partitions that are
several
times the number of consumers. And there will be no imbalance if
the
partition number is always doubled. So this imbalance seems
acceptable.
Regarding storing the partition strategy as per-topic config: It
seems
not
necessary since we can still use murmur_hash as the default hash
function
and additionally apply the linear hashing algorithm if the
partition
number
has increased. Not sure if there is any use-case for producer to
use a
different hash function. Jason, can you check if there is some
use-case
that I missed for using the per-topic partition strategy?
Regarding how to reduce latency (due to state store/load) in
stream
processing consumer when partition number changes: I need to read
the
Kafka
Stream code to understand how Kafka Stream currently migrate
state
between
consumers when the application is added/removed for a given job.
I
will
reply after I finish reading the documentation and code.
Thanks,
Dong
On Mon, Mar 5, 2018 at 10:43 AM, Jason Gustafson <
ja...@confluent.io>
wrote:
Great discussion. I think I'm wondering whether we can continue
to
leave
Kafka agnostic to the partitioning strategy. The challenge is
communicating
the partitioning logic from producers to consumers so that the
dependencies
between each epoch can be determined. For the sake of
discussion,
imagine
you did something like the following:
1. The name (and perhaps version) of a partitioning strategy is
stored
in
topic configuration when a topic is created.
2. The producer looks up the partitioning strategy before
writing
to a
topic and includes it in the produce request (for fencing). If
it
doesn't
have an implementation for the configured strategy, it fails.
3. The consumer also looks up the partitioning strategy and uses
it to
determine dependencies when reading a new epoch. It could either
fail
or
make the most conservative dependency assumptions if it doesn't
know
how
to
implement the partitioning strategy. For the consumer, the new
interface
might look something like this:
// Return the partition dependencies following an epoch bump
Map<Integer, List<Integer>> dependencies(int
numPartitionsBeforeEpochBump,
int numPartitionsAfterEpochBump)
The unordered case then is just a particular implementation
which
never
has
any epoch dependencies. To implement this, we would need some
way
for
the
consumer to find out how many partitions there were in each
epoch, but
maybe that's not too unreasonable.
Thanks,
Jason
On Mon, Mar 5, 2018 at 4:51 AM, Jan Filipiak <
jan.filip...@trivago.com
wrote:
Hi Dong
thank you very much for your questions.
regarding the time spend copying data across:
It is correct that copying data from a topic with one partition
mapping
to
a topic with a different partition mapping takes way longer
than
we
can
stop producers. Tens of minutes is a very optimistic estimate
here.
Many
people can not afford copy full steam and therefore will have
some
rate
limiting in place, this can bump the timespan into the day's.
The good
part
is that the vast majority of the data can be copied while the
producers
are
still going. One can then, piggyback the consumers ontop of
this
timeframe,
by the method mentioned (provide them an mapping from their old
offsets
to
new offsets in their repartitioned topics. In that way we
separate
migration of consumers from migration of producers (decoupling
these
is
what kafka is strongest at). The time to actually swap over the
producers
should be kept minimal by ensuring that when a swap attempt is
started
the
consumer copying over should be very close to the log end and
is
expected
to finish within the next fetch. The operation should have a
time-out
and
should be "reattemtable".
Importance of logcompaction:
If a producer produces key A, to partiton 0, its forever gonna
be
there,
unless it gets deleted. The record might sit in there for
years.
A new
producer started with the new partitions will fail to delete
the
record
in
the correct partition. Th record will be there forever and one
can not
reliable bootstrap new consumers. I cannot see how linear
hashing can
solve
this.
Regarding your skipping of userland copying:
100%, copying the data across in userland is, as far as i can
see,
only a
usecase for log compacted topics. Even for logcompaction +
retentions
it
should only be opt-in. Why did I bring it up? I think log
compaction
is a
very important feature to really embrace kafka as a "data
plattform".
The
point I also want to make is that copying data this way is
completely
inline with the kafka architecture. it only consists of reading
and
writing
to topics.
I hope it clarifies more why I think we should aim for more
than
the
current KIP. I fear that once the KIP is done not much more
effort
will
be
taken.
On 04.03.2018 02:28, Dong Lin wrote:
Hey Jan,
In the current proposal, the consumer will be blocked on
waiting for
other
consumers of the group to consume up to a given offset. In
most
cases,
all
consumers should be close to the LEO of the partitions when
the
partition
expansion happens. Thus the time waiting should not be long
e.g. on
the
order of seconds. On the other hand, it may take a long time
to
wait
for
the entire partition to be copied -- the amount of time is
proportional
to
the amount of existing data in the partition, which can take
tens of
minutes. So the amount of time that we stop consumers may not
be on
the
same order of magnitude.
If we can implement this suggestion without copying data over
in
purse
userland, it will be much more valuable. Do you have ideas on
how
this
can
be done?
Not sure why the current KIP not help people who depend on log
compaction.
Could you elaborate more on this point?
Thanks,
Dong
On Wed, Feb 28, 2018 at 10:55 PM, Jan
Filipiak<Jan.Filipiak@trivago.
com
wrote:
Hi Dong,
I tried to focus on what the steps are one can currently
perform to
expand
or shrink a keyed topic while maintaining a top notch
semantics.
I can understand that there might be confusion about
"stopping
the
consumer". It is exactly the same as proposed in the KIP.
there
needs
to
be
a time the producers agree on the new partitioning. The extra
semantics I
want to put in there is that we have a possibility to wait
until all
the
existing data
is copied over into the new partitioning scheme. When I say
stopping
I
think more of having a memory barrier that ensures the
ordering. I
am
still
aming for latencies on the scale of leader failovers.
Consumers have to explicitly adapt the new partitioning
scheme
in
the
above scenario. The reason is that in these cases where you
are
dependent
on a particular partitioning scheme, you also have other
topics that
have
co-partition enforcements or the kind -frequently. Therefore
all
your
other
input topics might need to grow accordingly.
What I was suggesting was to streamline all these operations
as best
as
possible to have "real" partition grow and shrinkage going
on.
Migrating
the producers to a new partitioning scheme can be much more
streamlined
with proper broker support for this. Migrating consumer is a
step
that
might be made completly unnecessary if - for example streams
-
takes
the
gcd as partitioning scheme instead of enforcing 1 to 1.
Connect
consumers
and other consumers should be fine anyways.
I hope this makes more clear where I was aiming at. The rest
needs
to
be
figured out. The only danger i see is that when we are
introducing
this
feature as supposed in the KIP, it wont help any people
depending on
log
compaction.
The other thing I wanted to mention is that I believe the
current
suggestion (without copying data over) can be implemented in
pure
userland
with a custom partitioner and a small feedbackloop from
ProduceResponse
=>
Partitionier in coorporation with a change management system.
Best Jan
On 28.02.2018 07:13, Dong Lin wrote:
Hey Jan,
I am not sure if it is acceptable for producer to be stopped
for a
while,
particularly for online application which requires low
latency. I
am
also
not sure how consumers can switch to a new topic. Does user
application
needs to explicitly specify a different topic for
producer/consumer
to
subscribe to? It will be helpful for discussion if you can
provide
more
detail on the interface change for this solution.
Thanks,
Dong
On Mon, Feb 26, 2018 at 12:48 AM, Jan
Filipiak<Jan.Filipiak@trivago.
com
wrote:
Hi,
just want to throw my though in. In general the
functionality is
very
usefull, we should though not try to find the architecture
to hard
while
implementing.
The manual steps would be to
create a new topic
the mirrormake from the new old topic to the new topic
wait for mirror making to catch up.
then put the consumers onto the new topic
(having mirrormaker spit out a mapping from old
offsets to
new
offsets:
if topic is increased by factor X there is gonna
be a
clean
mapping from 1 offset in the old topic to X offsets in the
new
topic,
if there is no factor then there is no chance to
generate a
mapping that can be reasonable used for continuing)
make consumers stop at appropriate points and
continue
consumption
with offsets from the mapping.
have the producers stop for a minimal time.
wait for mirrormaker to finish
let producer produce with the new metadata.
Instead of implementing the approach suggest in the KIP
which will
leave
log compacted topic completely crumbled and unusable.
I would much rather try to build infrastructure to support
the
mentioned
above operations more smoothly.
Especially having producers stop and use another topic is
difficult
and
it would be nice if one can trigger "invalid metadata"
exceptions
for
them
and
if one could give topics aliases so that their produces
with
the
old
topic
will arrive in the new topic.
The downsides are obvious I guess ( having the same data
twice for
the
transition period, but kafka tends to scale well with
datasize).
So
its a
nicer fit into the architecture.
I further want to argument that the functionality by the
KIP
can
completely be implementing in "userland" with a custom
partitioner
that
handles the transition as needed. I would appreciate if
someone
could
point
out what a custom partitioner couldn't handle in this case?
With the above approach, shrinking a topic becomes the same
steps.
Without
loosing keys in the discontinued partitions.
Would love to hear what everyone thinks.
Best Jan
On 11.02.2018 00:35, Dong Lin wrote:
Hi all,
I have created KIP-253: Support in-order message delivery
with
partition
expansion. See
https://cwiki.apache.org/confluence/display/KAFKA/KIP-
253%
3A+Support+in-order+message+delivery+with+partition+
expansion
.
This KIP provides a way to allow messages of the same key
from
the
same
producer to be consumed in the same order they are
produced
even
if
we
expand partition of the topic.
Thanks,
Dong