On Tue, Mar 27, 2018 at 12:04 AM, Dong Lin <lindon...@gmail.com> wrote:

> Hey Jan,
>
> Thanks for the enthusiasm in improving Kafka's design. Now that I have
> read through your discussion with Jun, here are my thoughts:
>
> - The latest proposal should with log compacted topics by properly
> deleting old messages after a new message with the same key is produced. So
> it is probably not a concern anymore. Could you comment if there is still
> issue?
>
> - I wrote the SEP-5 and I am pretty familiar with the motivation and the
> design of SEP-5. SEP-5 is probably orthornal to the motivation of this KIP.
> The goal of SEP-5 is to allow user to increase task number of an existing
> Samza job. But if we increase the partition number of input topics,
> messages may still be consumed out-of-order by tasks in Samza which cause
> incorrect result. Similarly, the approach you proposed does not seem to
> ensure that the messages can be delivered in order, even if we can make
> sure that each consumer instance is assigned the set of new partitions
> covering the same set of keys.
>

Let me correct this comment. The approach of copying data to a new topic
can ensure in-order message delivery suppose we properly migrate offsets
from old topic to new topic.


> - I am trying to understand why it is better to copy the data instead of
> copying the change log topic for streaming use-case. For core Kafka
> use-case, and for the stream use-case that does not need to increase
> consumers, the current KIP already supports in-order delivery without the
> overhead of copying the data. For stream use-case that needs to increase
> consumer number, the existing consumer can backfill the existing data in
> the change log topic to the same change log topic with the new partition
> number, before the new set of consumers bootstrap state from the new
> partitions of the change log topic. If this solution works, then could you
> summarize the advantage of copying the data of input topic as compared to
> copying the change log topic? For example, does it enable more use-case,
> simplify the implementation of Kafka library, or reduce the operation
> overhead etc?
>
> Thanks,
> Dong
>
>
> On Wed, Mar 21, 2018 at 6:57 AM, Jan Filipiak <jan.filip...@trivago.com>
> wrote:
>
>> Hi Jun,
>>
>> I was really seeing progress in our conversation but your latest reply is
>> just devastating.
>> I though we were getting close being on the same page now it feels like
>> we are in different libraries.
>>
>> I just quickly slam my answers in here. If they are to brief I am sorry
>> give me a ping and try to go into details more.
>> Just want to show that your pro/cons listing is broken.
>>
>> Best Jan
>>
>> and want to get rid of this horrible compromise
>>
>>
>> On 19.03.2018 05:48, Jun Rao wrote:
>>
>>> Hi, Jan,
>>>
>>> Thanks for the discussion. Great points.
>>>
>>> Let me try to summarize the approach that you are proposing. On the
>>> broker
>>> side, we reshuffle the existing data in a topic from current partitions
>>> to
>>> the new partitions. Once the reshuffle fully catches up, switch the
>>> consumers to start consuming from the new partitions. If a consumer needs
>>> to rebuild its local state (due to partition changes), let the consumer
>>> rebuild its state by reading all existing data from the new partitions.
>>> Once all consumers have switches over, cut over the producer to the new
>>> partitions.
>>>
>>> The pros for this approach are that :
>>> 1. There is just one way to rebuild the local state, which is simpler.
>>>
>> true thanks
>>
>>>
>>> The cons for this approach are:
>>> 1. Need to copy existing data.
>>>
>> Very unfair and not correct. It does not require you to copy over
>> existing data. It _allows_ you to copy all existing data.
>>
>> 2. The cutover of the producer is a bit complicated since it needs to
>>> coordinate with all consumer groups.
>>>
>> Also not true. I explicitly tried to make clear that there is only one
>> special consumer (in the case of actually copying data) coordination is
>> required.
>>
>>> 3. The rebuilding of the state in the consumer is from the input topic,
>>> which can be more expensive than rebuilding from the existing state.
>>>
>> true, but rebuilding state is only required if you want to increase
>> processing power, so we assume this is at hand.
>>
>>> 4. The broker potentially has to know the partitioning function. If this
>>> needs to be customized at the topic level, it can be a bit messy.
>>>
>> I would argue against having the operation being performed by the broker.
>> This was not discussed yet but if you see my original email i suggested
>> otherwise from the beginning.
>>
>>>
>>> Here is an alternative approach by applying your idea not in the broker,
>>> but in the consumer. When new partitions are added, we don't move
>>> existing
>>> data. In KStreams, we first reshuffle the new input data to a new topic
>>> T1
>>> with the old number of partitions and feed T1's data to the rest of the
>>> pipeline. In the meantime, KStreams reshuffles all existing data of the
>>> change capture topic to another topic C1 with the new number of
>>> partitions.
>>> We can then build the state of the new tasks from C1. Once the new states
>>> have been fully built, we can cut over the consumption to the input topic
>>> and delete T1. This approach works with compacted topic too. If an
>>> application reads from the beginning of a compacted topic, the consumer
>>> will reshuffle the portion of the input when the number of partitions
>>> doesn't match the number of tasks.
>>>
>> We all wipe this idea from our heads instantly. Mixing Ideas from an
>> argument is not a resolution strategy
>> just leads to horrible horrible software.
>>
>>
>>> The pros of this approach are:
>>> 1. No need to copy existing data.
>>> 2. Each consumer group can cut over to the new partitions independently.
>>> 3. The state is rebuilt from the change capture topic, which is cheaper
>>> than rebuilding from the input topic.
>>> 4. Only the KStreams job needs to know the partitioning function.
>>>
>>> The cons of this approach are:
>>> 1. Potentially the same input topic needs to be reshuffled more than once
>>> in different consumer groups during the transition phase.
>>>
>>> What do you think?
>>>
>>> Thanks,
>>>
>>> Jun
>>>
>>>
>>>
>>> On Thu, Mar 15, 2018 at 1:04 AM, Jan Filipiak <jan.filip...@trivago.com>
>>> wrote:
>>>
>>> Hi Jun,
>>>>
>>>> thank you for following me on these thoughts. It was important to me to
>>>> feel that kind of understanding for my arguments.
>>>>
>>>> What I was hoping for (I mentioned this earlier) is that we can model
>>>> the
>>>> case where we do not want to copy the data the exact same way as the
>>>> case
>>>> when we do copy the data. Maybe you can peek into the mails before to
>>>> see
>>>> more details for this.
>>>>
>>>> This means we have the same mechanism to transfer consumer groups to
>>>> switch topic. The offset mapping that would be generated would even be
>>>> simpler End Offset of the Old topic => offset 0 off all the partitions
>>>> of
>>>> the new topic. Then we could model the transition of a non-copy
>>>> expansion
>>>> the exact same way as a copy-expansion.
>>>>
>>>> I know this only works when topic growth by a factor. But the benefits
>>>> of
>>>> only growing by a factor are to strong anyways. See Clemens's hint and
>>>> remember that state reshuffling is entirely not needed if one doesn't
>>>> want
>>>> to grow processing power.
>>>>
>>>> I think these benefits should be clear, and that there is basically no
>>>> downside to what is currently at hand but just makes everything easy.
>>>>
>>>> One thing you need to know is. that if you do not offer rebuilding a log
>>>> compacted topic like i suggest that even if you have consumer state
>>>> reshuffling. The topic is broken and can not be used to bootstrap new
>>>> consumers. They don't know if they need to apply a key from and old
>>>> partition or not. This is a horrible downside I haven't seen a solution
>>>> for
>>>> in the email conversation.
>>>>
>>>> I argue to:
>>>>
>>>> Only grow topic by a factor always.
>>>> Have the "no copy consumer" transition as the trivial case of the "copy
>>>> consumer transition".
>>>> If processors needs to be scaled, let them rebuild from the new topic
>>>> and
>>>> leave the old running in the mean time.
>>>> Do not implement key shuffling in streams.
>>>>
>>>> I hope I can convince you especially with the fact how I want to handle
>>>> consumer transition. I think
>>>> you didn't quite understood me there before. I think the term "new
>>>> topic"
>>>> intimidated you a little.
>>>> How we solve this on disc doesn't really matter, If the data goes into
>>>> the
>>>> same Dir or a different Dir or anything. I do think that it needs to
>>>> involve at least rolling a new segment for the existing partitions.
>>>> But most of the transitions should work without restarting consumers.
>>>> (newer consumers with support for this). But with new topic i just meant
>>>> the topic that now has a different partition count. Plenty of ways to
>>>> handle that (versions, aliases)
>>>>
>>>> Hope I can further get my idea across.
>>>>
>>>> Best Jan
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On 14.03.2018 02:45, Jun Rao wrote:
>>>>
>>>> Hi, Jan,
>>>>>
>>>>> Thanks for sharing your view.
>>>>>
>>>>> I agree with you that recopying the data potentially makes the state
>>>>> management easier since the consumer can just rebuild its state from
>>>>> scratch (i.e., no need for state reshuffling).
>>>>>
>>>>> On the flip slide, I saw a few disadvantages of the approach that you
>>>>> suggested. (1) Building the state from the input topic from scratch is
>>>>> in
>>>>> general less efficient than state reshuffling. Let's say one computes a
>>>>> count per key from an input topic. The former requires reading all
>>>>> existing
>>>>> records in the input topic whereas the latter only requires reading
>>>>> data
>>>>> proportional to the number of unique keys. (2) The switching of the
>>>>> topic
>>>>> needs modification to the application. If there are many applications
>>>>> on a
>>>>> topic, coordinating such an effort may not be easy. Also, it's not
>>>>> clear
>>>>> how to enforce exactly-once semantic during the switch. (3) If a topic
>>>>> doesn't need any state management, recopying the data seems wasteful.
>>>>> In
>>>>> that case, in place partition expansion seems more desirable.
>>>>>
>>>>> I understand your concern about adding complexity in KStreams. But,
>>>>> perhaps
>>>>> we could iterate on that a bit more to see if it can be simplified.
>>>>>
>>>>> Jun
>>>>>
>>>>>
>>>>> On Mon, Mar 12, 2018 at 11:21 PM, Jan Filipiak <
>>>>> jan.filip...@trivago.com>
>>>>> wrote:
>>>>>
>>>>> Hi Jun,
>>>>>
>>>>>> I will focus on point 61 as I think its _the_ fundamental part that I
>>>>>> cant
>>>>>> get across at the moment.
>>>>>>
>>>>>> Kafka is the platform to have state materialized multiple times from
>>>>>> one
>>>>>> input. I emphasize this: It is the building block in architectures
>>>>>> that
>>>>>> allow you to
>>>>>> have your state maintained multiple times. You put a message in once,
>>>>>> and
>>>>>> you have it pop out as often as you like. I believe you understand
>>>>>> this.
>>>>>>
>>>>>> Now! The path of thinking goes the following: I am using apache kafka
>>>>>> and
>>>>>> I _want_ my state multiple times. What am I going todo?
>>>>>>
>>>>>> A) Am I going to take my state that I build up, plunge some sort of
>>>>>> RPC
>>>>>> layer ontop of it, use that RPC layer to throw my records across
>>>>>> instances?
>>>>>> B) Am I just going to read the damn message twice?
>>>>>>
>>>>>> Approach A is fundamentally flawed and a violation of all that is good
>>>>>> and
>>>>>> holy in kafka deployments. I can not understand how this Idea can
>>>>>> come in
>>>>>> the first place.
>>>>>> (I do understand: IQ in streams, they polluted the kafka streams
>>>>>> codebase
>>>>>> really bad already. It is not funny! I think they are equally flawed
>>>>>> as
>>>>>> A)
>>>>>>
>>>>>> I say, we do what Kafka is good at. We repartition the topic once. We
>>>>>> switch the consumers.
>>>>>> (Those that need more partitions are going to rebuild their state in
>>>>>> multiple partitions by reading the new topic, those that don't just
>>>>>> assign
>>>>>> the new partitions properly)
>>>>>> We switch producers. Done!
>>>>>>
>>>>>> The best thing! It is trivial, hipster stream processor will have an
>>>>>> easy
>>>>>> time with that aswell. Its so super simple. And simple IS good!
>>>>>> It is what kafka was build todo. It is how we do it today. All I am
>>>>>> saying
>>>>>> is that a little broker help doing the producer swap is super useful.
>>>>>>
>>>>>> For everyone interested in why kafka is so powerful with approach B,
>>>>>> please watch https://youtu.be/bEbeZPVo98c?t=1633
>>>>>> I already looked up a good point in time, I think after 5 minutes the
>>>>>> "state" topic is handled and you should be able to understand me
>>>>>> and inch better.
>>>>>>
>>>>>> Please do not do A to the project, it deserves better!
>>>>>>
>>>>>> Best Jan
>>>>>>
>>>>>>
>>>>>>
>>>>>> On 13.03.2018 02:40, Jun Rao wrote:
>>>>>>
>>>>>> Hi, Jan,
>>>>>>
>>>>>>> Thanks for the reply. A few more comments below.
>>>>>>>
>>>>>>> 50. Ok, we can think a bit harder for supporting compacted topics.
>>>>>>>
>>>>>>> 51. This is a fundamental design question. In the more common case,
>>>>>>> the
>>>>>>> reason why someone wants to increase the number of partitions is that
>>>>>>> the
>>>>>>> consumer application is slow and one wants to run more consumer
>>>>>>> instances
>>>>>>> to increase the degree of parallelism. So, fixing the number of
>>>>>>> running
>>>>>>> consumer instances when expanding the partitions won't help this
>>>>>>> case.
>>>>>>> If
>>>>>>> we do need to increase the number of consumer instances, we need to
>>>>>>> somehow
>>>>>>> reshuffle the state of the consumer across instances. What we have
>>>>>>> been
>>>>>>> discussing in this KIP is whether we can do this more effectively
>>>>>>> through
>>>>>>> the KStream library (e.g. through a 2-phase partition expansion).
>>>>>>> This
>>>>>>> will
>>>>>>> add some complexity, but it's probably better than everyone doing
>>>>>>> this
>>>>>>> in
>>>>>>> the application space. The recopying approach that you mentioned
>>>>>>> doesn't
>>>>>>> seem to address the consumer state management issue when the consumer
>>>>>>> switches from an old to a new topic.
>>>>>>>
>>>>>>> 52. As for your example, it depends on whether the join key is the
>>>>>>> same
>>>>>>> between (A,B) and (B,C). If the join key is the same, we can do a
>>>>>>> 2-phase
>>>>>>> partition expansion of A, B, and C together. If the join keys are
>>>>>>> different, one would need to repartition the data on a different key
>>>>>>> for
>>>>>>> the second join, then the partition expansion can be done
>>>>>>> independently
>>>>>>> between (A,B) and (B,C).
>>>>>>>
>>>>>>> 53. If you always fix the number of consumer instances, we you
>>>>>>> described
>>>>>>> works. However, as I mentioned in #51, I am not sure how your
>>>>>>> proposal
>>>>>>> deals with consumer states when the number of consumer instances
>>>>>>> grows.
>>>>>>> Also, it just seems that it's better to avoid re-copying the existing
>>>>>>> data.
>>>>>>>
>>>>>>> 60. "just want to throw in my question from the longer email in the
>>>>>>> other
>>>>>>> Thread here. How will the bloom filter help a new consumer to decide
>>>>>>> to
>>>>>>> apply the key or not?" Not sure that I fully understood your
>>>>>>> question.
>>>>>>> The
>>>>>>> consumer just reads whatever key is in the log. The bloom filter just
>>>>>>> helps
>>>>>>> clean up the old keys.
>>>>>>>
>>>>>>> 61. "Why can we afford having a topic where its apparently not
>>>>>>> possible
>>>>>>> to
>>>>>>> start a new application on? I think this is an overall flaw of the
>>>>>>> discussed idea here. Not playing attention to the overall
>>>>>>> architecture."
>>>>>>> Could you explain a bit more when one can't start a new application?
>>>>>>>
>>>>>>> Jun
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Sat, Mar 10, 2018 at 1:40 AM, Jan Filipiak <
>>>>>>> jan.filip...@trivago.com
>>>>>>> wrote:
>>>>>>>
>>>>>>> Hi Jun, thanks for your mail.
>>>>>>>
>>>>>>> Thank you for your questions!
>>>>>>>> I think they are really good and tackle the core of the problem I
>>>>>>>> see.
>>>>>>>>
>>>>>>>> I will answer inline, mostly but still want to set the tone here.
>>>>>>>>
>>>>>>>> The core strength of kafka is what Martin once called the
>>>>>>>> kappa-Architecture. How does this work?
>>>>>>>> You have everything as a log as in kafka. When you need to change
>>>>>>>> something.
>>>>>>>> You create the new version of your application and leave it running
>>>>>>>> in
>>>>>>>> parallel.
>>>>>>>> Once the new version is good you switch your users to use the new
>>>>>>>> application.
>>>>>>>>
>>>>>>>> The online reshuffling effectively breaks this architecture and I
>>>>>>>> think
>>>>>>>> the switch in thinking here is more harmful
>>>>>>>> than any details about the partitioning function to allow such a
>>>>>>>> change.
>>>>>>>> I
>>>>>>>> feel with my suggestion we are the closest to
>>>>>>>> the original and battle proven architecture and I can only warn to
>>>>>>>> move
>>>>>>>> away from it.
>>>>>>>>
>>>>>>>> I might have forgotten something, sometimes its hard for me to
>>>>>>>> getting
>>>>>>>> all
>>>>>>>> the thoughts captured in a mail, but I hope the comments inline will
>>>>>>>> further make my concern clear, and put some emphasis on why I
>>>>>>>> prefer my
>>>>>>>> solution ;)
>>>>>>>>
>>>>>>>> One thing we should all be aware of when discussing this, and I
>>>>>>>> think
>>>>>>>> Dong
>>>>>>>> should have mentioned it (maybe he did).
>>>>>>>> We are not discussing all of this out of thin air but there is an
>>>>>>>> effort
>>>>>>>> in the Samza project.
>>>>>>>>
>>>>>>>> https://cwiki.apache.org/confluence/display/SAMZA/SEP-5%3A+
>>>>>>>> Enable+partition+expansion+of+input+streams
>>>>>>>> https://issues.apache.org/jira/browse/SAMZA-1293
>>>>>>>>
>>>>>>>> To be clear. I think SEP-5 (state of last week, dont know if it
>>>>>>>> adapted
>>>>>>>> to
>>>>>>>> this discussion) is on a way better path than KIP-253, and I can't
>>>>>>>> really
>>>>>>>> explain why.
>>>>>>>>
>>>>>>>> Best Jan,
>>>>>>>>
>>>>>>>> nice weekend everyone
>>>>>>>>
>>>>>>>>
>>>>>>>> On 09.03.2018 03:36, Jun Rao wrote:
>>>>>>>>
>>>>>>>> Hi, Jan,
>>>>>>>>
>>>>>>>> Thanks for the feedback. Just some comments on the earlier points
>>>>>>>>> that
>>>>>>>>> you
>>>>>>>>> mentioned.
>>>>>>>>>
>>>>>>>>> 50. You brought up the question of whether existing data needs to
>>>>>>>>> be
>>>>>>>>> copied
>>>>>>>>> during partition expansion. My understand of your view is that
>>>>>>>>> avoid
>>>>>>>>> copying existing data will be more efficient, but it doesn't work
>>>>>>>>> well
>>>>>>>>> with
>>>>>>>>> compacted topics since some keys in the original partitions will
>>>>>>>>> never
>>>>>>>>> be
>>>>>>>>> cleaned. It would be useful to understand your use case of
>>>>>>>>> compacted
>>>>>>>>> topics
>>>>>>>>> a bit more. In the common use case, the data volume in a compacted
>>>>>>>>> topic
>>>>>>>>> may not be large. So, I am not sure if there is a strong need to
>>>>>>>>> expand
>>>>>>>>> partitions in a compacted topic, at least initially.
>>>>>>>>>
>>>>>>>>> I do agree. State is usually smaller. Update rates might be also
>>>>>>>>>
>>>>>>>>> competitively high.
>>>>>>>> Doing Log-compaction (even beeing very efficient and configurable)
>>>>>>>> is
>>>>>>>> also
>>>>>>>> a more expensive operation than
>>>>>>>> just discarding old segments. Further if you want to use more
>>>>>>>> consumers
>>>>>>>> processing the events
>>>>>>>> you also have to grow the number of partitions. Especially for
>>>>>>>> use-cases
>>>>>>>> we do (KIP-213) a tiny state full
>>>>>>>> table might be very expensive to process if it joins against a huge
>>>>>>>> table.
>>>>>>>>
>>>>>>>> I can just say we have been in the spot of needing to grow log
>>>>>>>> compacted
>>>>>>>> topics. Mainly for processing power we can bring to the table.
>>>>>>>>
>>>>>>>> Further i am not at all concerned about the extra spaced used by
>>>>>>>> "garbage
>>>>>>>> keys". I am more concerned about the correctness of innocent
>>>>>>>> consumers.
>>>>>>>> The
>>>>>>>> logic becomes complicated. Say for streams one would need to load
>>>>>>>> the
>>>>>>>> record into state but not forward it the topology ( to have it
>>>>>>>> available
>>>>>>>> for shuffeling). I rather have it simple and a topic clean
>>>>>>>> regardless
>>>>>>>> if
>>>>>>>> it
>>>>>>>> still has its old partition count. Especially with multiple
>>>>>>>> partitions
>>>>>>>> growth's I think it becomes insanely hard to to this shuffle
>>>>>>>> correct.
>>>>>>>> Maybe
>>>>>>>> Streams and Samza can do it. Especially if you do "hipster stream
>>>>>>>> processing" <https://www.confluent.io/blog
>>>>>>>> /introducing-kafka-streams-
>>>>>>>> stream-processing-made-simple/>. This makes kafka way to
>>>>>>>> complicated.
>>>>>>>> With my approach I think its way simpler because the topic has no
>>>>>>>> "history"
>>>>>>>> in terms of partitioning but is always clean.
>>>>>>>>
>>>>>>>>
>>>>>>>> 51. "Growing the topic by an integer factor does not require any
>>>>>>>> state
>>>>>>>>
>>>>>>>> redistribution at all." Could you clarify this a bit more? Let's say
>>>>>>>>> you
>>>>>>>>> have a consumer app that computes the windowed count per key. If
>>>>>>>>> you
>>>>>>>>> double
>>>>>>>>> the number of partitions from 1 to 2 and grow the consumer
>>>>>>>>> instances
>>>>>>>>> from
>>>>>>>>> 1
>>>>>>>>> to 2, we would need to redistribute some of the counts to the new
>>>>>>>>> consumer
>>>>>>>>> instance. Regarding to linear hashing, it's true that it won't
>>>>>>>>> solve
>>>>>>>>> the
>>>>>>>>> problem with compacted topics. The main benefit is that it
>>>>>>>>> redistributes
>>>>>>>>> the keys in one partition to no more than two partitions, which
>>>>>>>>> could
>>>>>>>>> help
>>>>>>>>> redistribute the state.
>>>>>>>>>
>>>>>>>>> You don't need to spin up a new consumer in this case. every
>>>>>>>>> consumer
>>>>>>>>>
>>>>>>>>> would just read every partition with the (partition % num_task)
>>>>>>>> task.
>>>>>>>> it
>>>>>>>> will still have the previous data right there and can go on.
>>>>>>>>
>>>>>>>> This sounds contradictory to what I said before, but please bear
>>>>>>>> with
>>>>>>>> me.
>>>>>>>>
>>>>>>>> 52. Good point on coordinating the expansion of 2 topics that need
>>>>>>>> to
>>>>>>>> be
>>>>>>>>
>>>>>>>> joined together. This is where the 2-phase partition expansion could
>>>>>>>>> potentially help. In the first phase, we could add new partitions
>>>>>>>>> to
>>>>>>>>> the 2
>>>>>>>>> topics one at a time but without publishing to the new patitions.
>>>>>>>>> Then,
>>>>>>>>> we
>>>>>>>>> can add new consumer instances to pick up the new partitions. In
>>>>>>>>> this
>>>>>>>>> transition phase, no reshuffling is needed since no data is coming
>>>>>>>>> from
>>>>>>>>> the
>>>>>>>>> new partitions. Finally, we can enable the publishing to the new
>>>>>>>>> partitions.
>>>>>>>>>
>>>>>>>>> I think its even worse than you think. I would like to introduce
>>>>>>>>> the
>>>>>>>>>
>>>>>>>>> Term
>>>>>>>> transitive copartitioning. Imagine
>>>>>>>> 2 streams application. One joins (A,B) the other (B,C) then there
>>>>>>>> is a
>>>>>>>> transitive copartition requirement for
>>>>>>>> (A,C) to be copartitioned aswell. This can spread significantly and
>>>>>>>> require many consumers to adapt at the same time.
>>>>>>>>
>>>>>>>> It is also not entirely clear to me how you not need reshuffling in
>>>>>>>> this
>>>>>>>> case. If A has a record that never gets updated after the expansion
>>>>>>>> and
>>>>>>>> the
>>>>>>>> coresponding B record moves to a new partition. How shall they meet
>>>>>>>> w/o
>>>>>>>> shuffle?
>>>>>>>>
>>>>>>>> 53. "Migrating consumer is a step that might be made completly
>>>>>>>>
>>>>>>>> unnecessary
>>>>>>>>> if - for example streams - takes the gcd as partitioning scheme
>>>>>>>>> instead
>>>>>>>>> of
>>>>>>>>> enforcing 1 to 1." Not sure that I fully understand this. I think
>>>>>>>>> you
>>>>>>>>> mean
>>>>>>>>> that a consumer application can run more instances than the number
>>>>>>>>> of
>>>>>>>>> partitions. In that case, the consumer can just repartitioning the
>>>>>>>>> input
>>>>>>>>> data according to the number of instances. This is possible, but
>>>>>>>>> just
>>>>>>>>> has
>>>>>>>>> the overhead of reshuffling the data.
>>>>>>>>>
>>>>>>>>> No what I meant is ( that is also your question i think Mathias)
>>>>>>>>> that
>>>>>>>>> if
>>>>>>>>>
>>>>>>>>> you grow a topic by a factor.
>>>>>>>> Even if your processor is statefull you can can just assign all the
>>>>>>>> multiples of the previous partition to
>>>>>>>> this consumer and the state to keep processing correctly will be
>>>>>>>> present
>>>>>>>> w/o any shuffling.
>>>>>>>>
>>>>>>>> Say you have an assignment
>>>>>>>> Statefull consumer => partition
>>>>>>>> 0 => 0
>>>>>>>> 1 => 1
>>>>>>>> 2 => 2
>>>>>>>>
>>>>>>>> and you grow you topic by 4 you get,
>>>>>>>>
>>>>>>>> 0 => 0,3,6,9
>>>>>>>> 1 => 1,4,7,10
>>>>>>>> 2 => 2,5,8,11
>>>>>>>>
>>>>>>>> Say your hashcode is 8. 8%3 => 2  before so consumer for partition 2
>>>>>>>> has
>>>>>>>> it.
>>>>>>>> Now you you have 12 partitions so 8%12 => 8, so it goes into
>>>>>>>> partition
>>>>>>>> 8
>>>>>>>> which is assigned to the same consumer
>>>>>>>> who had 2 before and therefore knows the key.
>>>>>>>>
>>>>>>>> Userland reshuffeling is there as an options. And it does exactly
>>>>>>>> what
>>>>>>>> I
>>>>>>>> suggest. And I think its the perfect strategie. All I am suggestion
>>>>>>>> is
>>>>>>>> broker side support to switch the producers to the newly partitioned
>>>>>>>> topic.
>>>>>>>> Then the old (to few partition topic) can go away.  Remember the
>>>>>>>> list
>>>>>>>> of
>>>>>>>> steps in the beginning of this thread. If one has broker support for
>>>>>>>> all
>>>>>>>> where its required and streams support for those that aren’t
>>>>>>>> necessarily.
>>>>>>>> Then one has solved the problem.
>>>>>>>> I repeat it because I think its important. I am really happy that
>>>>>>>> you
>>>>>>>> brought that up! because its 100% what I want just with the
>>>>>>>> differences
>>>>>>>> to
>>>>>>>> have an option to discard the to small topic later (after all
>>>>>>>> consumers
>>>>>>>> adapted). And to have order correct there. I need broker support
>>>>>>>> managing
>>>>>>>> the copy process + the produces and fence them against each other. I
>>>>>>>> also
>>>>>>>> repeat. the copy process can run for weeks in the worst case.
>>>>>>>> Copying
>>>>>>>> the
>>>>>>>> data is not the longest task migrating consumers might very well be.
>>>>>>>> Once all consumers switched and copying is really up to date (think
>>>>>>>> ISR
>>>>>>>> like up to date) only then we stop the producer, wait for the copy
>>>>>>>> to
>>>>>>>> finish and use the new topic for producing.
>>>>>>>>
>>>>>>>> After this the topic is perfect in shape. and no one needs to worry
>>>>>>>> about
>>>>>>>> complicated stuff. (old keys hanging around might arrive in some
>>>>>>>> other
>>>>>>>> topic later.....). can only imagine how many tricky bugs gonna
>>>>>>>> arrive
>>>>>>>> after
>>>>>>>> someone had grown and shrunken is topic 10 times.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> 54. "The other thing I wanted to mention is that I believe the
>>>>>>>> current
>>>>>>>>
>>>>>>>> suggestion (without copying data over) can be implemented in pure
>>>>>>>>> userland
>>>>>>>>> with a custom partitioner and a small feedbackloop from
>>>>>>>>> ProduceResponse
>>>>>>>>> =>
>>>>>>>>> Partitionier in coorporation with a change management system." I am
>>>>>>>>> not
>>>>>>>>> sure a customized partitioner itself solves the problem. We
>>>>>>>>> probably
>>>>>>>>> need
>>>>>>>>> some broker side support to enforce when the new partitions can be
>>>>>>>>> used.
>>>>>>>>> We
>>>>>>>>> also need some support on the consumer/kstream side to preserve the
>>>>>>>>> per
>>>>>>>>> key
>>>>>>>>> ordering and potentially migrate the processing state. This is not
>>>>>>>>> trivial
>>>>>>>>> and I am not sure if it's ideal to fully push to the application
>>>>>>>>> space.
>>>>>>>>>
>>>>>>>>> Broker support is defenitly the preferred way here. I have nothing
>>>>>>>>>
>>>>>>>>> against
>>>>>>>> broker support.
>>>>>>>> I tried to say that for what I would preffer - copying the data
>>>>>>>> over,
>>>>>>>> at
>>>>>>>> least for log compacted topics -
>>>>>>>> I would require more broker support than the KIP currently offers.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Jun
>>>>>>>>
>>>>>>>> On Tue, Mar 6, 2018 at 10:33 PM, Jan Filipiak <
>>>>>>>>> jan.filip...@trivago.com
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> Hi Dong,
>>>>>>>>>
>>>>>>>>> are you actually reading my emails, or are you just using the
>>>>>>>>> thread I
>>>>>>>>>
>>>>>>>>>> started for general announcements regarding the KIP?
>>>>>>>>>>
>>>>>>>>>> I tried to argue really hard against linear hashing. Growing the
>>>>>>>>>> topic
>>>>>>>>>> by
>>>>>>>>>> an integer factor does not require any state redistribution at
>>>>>>>>>> all. I
>>>>>>>>>> fail
>>>>>>>>>> to see completely where linear hashing helps on log compacted
>>>>>>>>>> topics.
>>>>>>>>>>
>>>>>>>>>> If you are not willing to explain to me what I might be
>>>>>>>>>> overlooking:
>>>>>>>>>> that
>>>>>>>>>> is fine.
>>>>>>>>>> But I ask you to not reply to my emails then. Please understand my
>>>>>>>>>> frustration with this.
>>>>>>>>>>
>>>>>>>>>> Best Jan
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On 06.03.2018 19:38, Dong Lin wrote:
>>>>>>>>>>
>>>>>>>>>> Hi everyone,
>>>>>>>>>>
>>>>>>>>>> Thanks for all the comments! It appears that everyone prefers
>>>>>>>>>> linear
>>>>>>>>>>
>>>>>>>>>>> hashing because it reduces the amount of state that needs to be
>>>>>>>>>>> moved
>>>>>>>>>>> between consumers (for stream processing). The KIP has been
>>>>>>>>>>> updated
>>>>>>>>>>> to
>>>>>>>>>>> use
>>>>>>>>>>> linear hashing.
>>>>>>>>>>>
>>>>>>>>>>> Regarding the migration endeavor: it seems that migrating
>>>>>>>>>>> producer
>>>>>>>>>>> library
>>>>>>>>>>> to use linear hashing should be pretty straightforward without
>>>>>>>>>>> much operational endeavor. If we don't upgrade client library to
>>>>>>>>>>> use
>>>>>>>>>>> this
>>>>>>>>>>> KIP, we can not support in-order delivery after partition is
>>>>>>>>>>> changed
>>>>>>>>>>> anyway. Suppose we upgrade client library to use this KIP, if
>>>>>>>>>>> partition
>>>>>>>>>>> number is not changed, the key -> partition mapping will be
>>>>>>>>>>> exactly
>>>>>>>>>>> the
>>>>>>>>>>> same as it is now because it is still determined using
>>>>>>>>>>> murmur_hash(key)
>>>>>>>>>>> %
>>>>>>>>>>> original_partition_num. In other words, this change is backward
>>>>>>>>>>> compatible.
>>>>>>>>>>>
>>>>>>>>>>> Regarding the load distribution: if we use linear hashing, the
>>>>>>>>>>> load
>>>>>>>>>>> may
>>>>>>>>>>> be
>>>>>>>>>>> unevenly distributed because those partitions which are not split
>>>>>>>>>>> may
>>>>>>>>>>> receive twice as much traffic as other partitions that are split.
>>>>>>>>>>> This
>>>>>>>>>>> issue can be mitigated by creating topic with partitions that are
>>>>>>>>>>> several
>>>>>>>>>>> times the number of consumers. And there will be no imbalance if
>>>>>>>>>>> the
>>>>>>>>>>> partition number is always doubled. So this imbalance seems
>>>>>>>>>>> acceptable.
>>>>>>>>>>>
>>>>>>>>>>> Regarding storing the partition strategy as per-topic config: It
>>>>>>>>>>> seems
>>>>>>>>>>> not
>>>>>>>>>>> necessary since we can still use murmur_hash as the default hash
>>>>>>>>>>> function
>>>>>>>>>>> and additionally apply the linear hashing algorithm if the
>>>>>>>>>>> partition
>>>>>>>>>>> number
>>>>>>>>>>> has increased. Not sure if there is any use-case for producer to
>>>>>>>>>>> use a
>>>>>>>>>>> different hash function. Jason, can you check if there is some
>>>>>>>>>>> use-case
>>>>>>>>>>> that I missed for using the per-topic partition strategy?
>>>>>>>>>>>
>>>>>>>>>>> Regarding how to reduce latency (due to state store/load) in
>>>>>>>>>>> stream
>>>>>>>>>>> processing consumer when partition number changes: I need to read
>>>>>>>>>>> the
>>>>>>>>>>> Kafka
>>>>>>>>>>> Stream code to understand how Kafka Stream currently migrate
>>>>>>>>>>> state
>>>>>>>>>>> between
>>>>>>>>>>> consumers when the application is added/removed for a given job.
>>>>>>>>>>> I
>>>>>>>>>>> will
>>>>>>>>>>> reply after I finish reading the documentation and code.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Dong
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Mar 5, 2018 at 10:43 AM, Jason Gustafson <
>>>>>>>>>>> ja...@confluent.io>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Great discussion. I think I'm wondering whether we can continue
>>>>>>>>>>> to
>>>>>>>>>>> leave
>>>>>>>>>>>
>>>>>>>>>>> Kafka agnostic to the partitioning strategy. The challenge is
>>>>>>>>>>>
>>>>>>>>>>> communicating
>>>>>>>>>>>> the partitioning logic from producers to consumers so that the
>>>>>>>>>>>> dependencies
>>>>>>>>>>>> between each epoch can be determined. For the sake of
>>>>>>>>>>>> discussion,
>>>>>>>>>>>> imagine
>>>>>>>>>>>> you did something like the following:
>>>>>>>>>>>>
>>>>>>>>>>>> 1. The name (and perhaps version) of a partitioning strategy is
>>>>>>>>>>>> stored
>>>>>>>>>>>> in
>>>>>>>>>>>> topic configuration when a topic is created.
>>>>>>>>>>>> 2. The producer looks up the partitioning strategy before
>>>>>>>>>>>> writing
>>>>>>>>>>>> to
>>>>>>>>>>>> a
>>>>>>>>>>>> topic and includes it in the produce request (for fencing). If
>>>>>>>>>>>> it
>>>>>>>>>>>> doesn't
>>>>>>>>>>>> have an implementation for the configured strategy, it fails.
>>>>>>>>>>>> 3. The consumer also looks up the partitioning strategy and
>>>>>>>>>>>> uses it
>>>>>>>>>>>> to
>>>>>>>>>>>> determine dependencies when reading a new epoch. It could either
>>>>>>>>>>>> fail
>>>>>>>>>>>> or
>>>>>>>>>>>> make the most conservative dependency assumptions if it doesn't
>>>>>>>>>>>> know
>>>>>>>>>>>> how
>>>>>>>>>>>> to
>>>>>>>>>>>> implement the partitioning strategy. For the consumer, the new
>>>>>>>>>>>> interface
>>>>>>>>>>>> might look something like this:
>>>>>>>>>>>>
>>>>>>>>>>>> // Return the partition dependencies following an epoch bump
>>>>>>>>>>>> Map<Integer, List<Integer>> dependencies(int
>>>>>>>>>>>> numPartitionsBeforeEpochBump,
>>>>>>>>>>>> int numPartitionsAfterEpochBump)
>>>>>>>>>>>>
>>>>>>>>>>>> The unordered case then is just a particular implementation
>>>>>>>>>>>> which
>>>>>>>>>>>> never
>>>>>>>>>>>> has
>>>>>>>>>>>> any epoch dependencies. To implement this, we would need some
>>>>>>>>>>>> way
>>>>>>>>>>>> for
>>>>>>>>>>>> the
>>>>>>>>>>>> consumer to find out how many partitions there were in each
>>>>>>>>>>>> epoch,
>>>>>>>>>>>> but
>>>>>>>>>>>> maybe that's not too unreasonable.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Jason
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Mar 5, 2018 at 4:51 AM, Jan Filipiak <
>>>>>>>>>>>> jan.filip...@trivago.com
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Hi Dong
>>>>>>>>>>>>
>>>>>>>>>>>> thank you very much for your questions.
>>>>>>>>>>>>
>>>>>>>>>>>> regarding the time spend copying data across:
>>>>>>>>>>>>> It is correct that copying data from a topic with one partition
>>>>>>>>>>>>> mapping
>>>>>>>>>>>>>
>>>>>>>>>>>>> to
>>>>>>>>>>>>>
>>>>>>>>>>>>> a topic with a different partition mapping takes way longer
>>>>>>>>>>>>> than
>>>>>>>>>>>>>
>>>>>>>>>>>> we
>>>>>>>>>>>> can
>>>>>>>>>>>>
>>>>>>>>>>>> stop producers. Tens of minutes is a very optimistic estimate
>>>>>>>>>>>> here.
>>>>>>>>>>>>
>>>>>>>>>>>>> Many
>>>>>>>>>>>>> people can not afford copy full steam and therefore will have
>>>>>>>>>>>>> some
>>>>>>>>>>>>> rate
>>>>>>>>>>>>> limiting in place, this can bump the timespan into the day's.
>>>>>>>>>>>>> The
>>>>>>>>>>>>> good
>>>>>>>>>>>>>
>>>>>>>>>>>>> part
>>>>>>>>>>>>>
>>>>>>>>>>>>> is that the vast majority of the data can be copied while the
>>>>>>>>>>>>>
>>>>>>>>>>>> producers
>>>>>>>>>>>>
>>>>>>>>>>>> are
>>>>>>>>>>>>
>>>>>>>>>>>>> still going. One can then, piggyback the consumers ontop of
>>>>>>>>>>>>> this
>>>>>>>>>>>>>
>>>>>>>>>>>> timeframe,
>>>>>>>>>>>>
>>>>>>>>>>>>> by the method mentioned (provide them an mapping from their old
>>>>>>>>>>>>>
>>>>>>>>>>>> offsets
>>>>>>>>>>>>
>>>>>>>>>>>> to
>>>>>>>>>>>>
>>>>>>>>>>>>> new offsets in their repartitioned topics. In that way we
>>>>>>>>>>>>> separate
>>>>>>>>>>>>>
>>>>>>>>>>>> migration of consumers from migration of producers (decoupling
>>>>>>>>>>>>
>>>>>>>>>>>>> these
>>>>>>>>>>>>> is
>>>>>>>>>>>>> what kafka is strongest at). The time to actually swap over the
>>>>>>>>>>>>> producers
>>>>>>>>>>>>> should be kept minimal by ensuring that when a swap attempt is
>>>>>>>>>>>>> started
>>>>>>>>>>>>>
>>>>>>>>>>>>> the
>>>>>>>>>>>>>
>>>>>>>>>>>>> consumer copying over should be very close to the log end and
>>>>>>>>>>>>> is
>>>>>>>>>>>>>
>>>>>>>>>>>> expected
>>>>>>>>>>>>
>>>>>>>>>>>>> to finish within the next fetch. The operation should have a
>>>>>>>>>>>>> time-out
>>>>>>>>>>>>> and
>>>>>>>>>>>>> should be "reattemtable".
>>>>>>>>>>>>>
>>>>>>>>>>>>> Importance of logcompaction:
>>>>>>>>>>>>> If a producer produces key A, to partiton 0, its forever gonna
>>>>>>>>>>>>> be
>>>>>>>>>>>>> there,
>>>>>>>>>>>>> unless it gets deleted. The record might sit in there for
>>>>>>>>>>>>> years. A
>>>>>>>>>>>>> new
>>>>>>>>>>>>> producer started with the new partitions will fail to delete
>>>>>>>>>>>>> the
>>>>>>>>>>>>> record
>>>>>>>>>>>>>
>>>>>>>>>>>>> in
>>>>>>>>>>>>>
>>>>>>>>>>>>> the correct partition. Th record will be there forever and one
>>>>>>>>>>>>> can
>>>>>>>>>>>>>
>>>>>>>>>>>> not
>>>>>>>>>>>>
>>>>>>>>>>>> reliable bootstrap new consumers. I cannot see how linear
>>>>>>>>>>>> hashing
>>>>>>>>>>>>
>>>>>>>>>>>>> can
>>>>>>>>>>>>>
>>>>>>>>>>>>> solve
>>>>>>>>>>>>>
>>>>>>>>>>>>> this.
>>>>>>>>>>>>>
>>>>>>>>>>>> Regarding your skipping of userland copying:
>>>>>>>>>>>>
>>>>>>>>>>>>> 100%, copying the data across in userland is, as far as i can
>>>>>>>>>>>>> see,
>>>>>>>>>>>>> only
>>>>>>>>>>>>> a
>>>>>>>>>>>>> usecase for log compacted topics. Even for logcompaction +
>>>>>>>>>>>>> retentions
>>>>>>>>>>>>> it
>>>>>>>>>>>>> should only be opt-in. Why did I bring it up? I think log
>>>>>>>>>>>>> compaction
>>>>>>>>>>>>> is
>>>>>>>>>>>>> a
>>>>>>>>>>>>> very important feature to really embrace kafka as a "data
>>>>>>>>>>>>> plattform".
>>>>>>>>>>>>> The
>>>>>>>>>>>>> point I also want to make is that copying data this way is
>>>>>>>>>>>>> completely
>>>>>>>>>>>>> inline with the kafka architecture. it only consists of reading
>>>>>>>>>>>>> and
>>>>>>>>>>>>>
>>>>>>>>>>>>> writing
>>>>>>>>>>>>>
>>>>>>>>>>>>> to topics.
>>>>>>>>>>>>>
>>>>>>>>>>>> I hope it clarifies more why I think we should aim for more than
>>>>>>>>>>>>
>>>>>>>>>>>>> the
>>>>>>>>>>>>> current KIP. I fear that once the KIP is done not much more
>>>>>>>>>>>>> effort
>>>>>>>>>>>>> will
>>>>>>>>>>>>>
>>>>>>>>>>>>> be
>>>>>>>>>>>>>
>>>>>>>>>>>>> taken.
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On 04.03.2018 02:28, Dong Lin wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hey Jan,
>>>>>>>>>>>>>
>>>>>>>>>>>>> In the current proposal, the consumer will be blocked on
>>>>>>>>>>>>> waiting
>>>>>>>>>>>>> for
>>>>>>>>>>>>>
>>>>>>>>>>>>> other
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> consumers of the group to consume up to a given offset. In
>>>>>>>>>>>>>> most
>>>>>>>>>>>>>>
>>>>>>>>>>>>> cases,
>>>>>>>>>>>>> all
>>>>>>>>>>>>> consumers should be close to the LEO of the partitions when the
>>>>>>>>>>>>> partition
>>>>>>>>>>>>> expansion happens. Thus the time waiting should not be long
>>>>>>>>>>>>> e.g.
>>>>>>>>>>>>> on
>>>>>>>>>>>>> the
>>>>>>>>>>>>>
>>>>>>>>>>>>> order of seconds. On the other hand, it may take a long time to
>>>>>>>>>>>>> wait
>>>>>>>>>>>>>
>>>>>>>>>>>>> for
>>>>>>>>>>>>>> the entire partition to be copied -- the amount of time is
>>>>>>>>>>>>>> proportional
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> the amount of existing data in the partition, which can take
>>>>>>>>>>>>>>
>>>>>>>>>>>>> tens of
>>>>>>>>>>>>>
>>>>>>>>>>>>> minutes. So the amount of time that we stop consumers may not
>>>>>>>>>>>>> be
>>>>>>>>>>>>> on
>>>>>>>>>>>>>
>>>>>>>>>>>>> the
>>>>>>>>>>>>>> same order of magnitude.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> If we can implement this suggestion without copying data over
>>>>>>>>>>>>>> in
>>>>>>>>>>>>>> purse
>>>>>>>>>>>>>> userland, it will be much more valuable. Do you have ideas on
>>>>>>>>>>>>>> how
>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> can
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> be done?
>>>>>>>>>>>>>>
>>>>>>>>>>>>> Not sure why the current KIP not help people who depend on log
>>>>>>>>>>>>>
>>>>>>>>>>>>> compaction.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Could you elaborate more on this point?
>>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Dong
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Feb 28, 2018 at 10:55 PM, Jan
>>>>>>>>>>>>>> Filipiak<Jan.Filipiak@trivago.
>>>>>>>>>>>>>> com
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Dong,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I tried to focus on what the steps are one can currently
>>>>>>>>>>>>>> perform
>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> expand
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> or shrink a keyed topic while maintaining a top notch
>>>>>>>>>>>>>>> semantics.
>>>>>>>>>>>>>>> I can understand that there might be confusion about
>>>>>>>>>>>>>>> "stopping
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>> consumer". It is exactly the same as proposed in the KIP.
>>>>>>>>>>>>>>> there
>>>>>>>>>>>>>>> needs
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> a time the producers agree on the new partitioning. The extra
>>>>>>>>>>>>>
>>>>>>>>>>>>> semantics I
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> want to put in there is that we have a possibility to wait
>>>>>>>>>>>>>>> until
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> all
>>>>>>>>>>>>>
>>>>>>>>>>>>> the
>>>>>>>>>>>>>
>>>>>>>>>>>>> existing data
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> is copied over into the new partitioning scheme. When I say
>>>>>>>>>>>>> stopping
>>>>>>>>>>>>>
>>>>>>>>>>>>> I
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> think more of having a memory barrier that ensures the
>>>>>>>>>>>>>>> ordering. I
>>>>>>>>>>>>>>> am
>>>>>>>>>>>>>>> still
>>>>>>>>>>>>>>> aming for latencies  on the scale of leader failovers.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Consumers have to explicitly adapt the new partitioning
>>>>>>>>>>>>>>> scheme
>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>> above scenario. The reason is that in these cases where you
>>>>>>>>>>>>>>> are
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> dependent
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> on a particular partitioning scheme, you also have other
>>>>>>>>>>>>>>> topics
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> that
>>>>>>>>>>>>>
>>>>>>>>>>>>> have
>>>>>>>>>>>>>
>>>>>>>>>>>>> co-partition enforcements or the kind -frequently. Therefore
>>>>>>>>>>>>>> all
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> your
>>>>>>>>>>>>>
>>>>>>>>>>>>> other
>>>>>>>>>>>>>
>>>>>>>>>>>>> input topics might need to grow accordingly.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> What I was suggesting was to streamline all these operations
>>>>>>>>>>>>>>> as
>>>>>>>>>>>>>>> best
>>>>>>>>>>>>>>> as
>>>>>>>>>>>>>>> possible to have "real" partition grow and shrinkage going
>>>>>>>>>>>>>>> on.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Migrating
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> the producers to a new partitioning scheme can be much more
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> streamlined
>>>>>>>>>>>>>
>>>>>>>>>>>>> with proper broker support for this. Migrating consumer is a
>>>>>>>>>>>>> step
>>>>>>>>>>>>>
>>>>>>>>>>>>> that
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> might be made completly unnecessary if - for example streams
>>>>>>>>>>>>>>> -
>>>>>>>>>>>>>>> takes
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> gcd as partitioning scheme instead of enforcing 1 to 1.
>>>>>>>>>>>>>>> Connect
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> consumers
>>>>>>>>>>>>>
>>>>>>>>>>>>> and other consumers should be fine anyways.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I hope this makes more clear where I was aiming at. The rest
>>>>>>>>>>>>> needs
>>>>>>>>>>>>>
>>>>>>>>>>>>> to
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> figured out. The only danger i see is that when we are
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> introducing
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> this
>>>>>>>>>>>>>
>>>>>>>>>>>>> feature as supposed in the KIP, it wont help any people
>>>>>>>>>>>>> depending
>>>>>>>>>>>>> on
>>>>>>>>>>>>>
>>>>>>>>>>>>> log
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> compaction.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The other thing I wanted to mention is that I believe the
>>>>>>>>>>>>> current
>>>>>>>>>>>>>
>>>>>>>>>>>>> suggestion (without copying data over) can be implemented in
>>>>>>>>>>>>>> pure
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> userland
>>>>>>>>>>>>>>> with a custom partitioner and a small feedbackloop from
>>>>>>>>>>>>>>> ProduceResponse
>>>>>>>>>>>>>>> =>
>>>>>>>>>>>>>>> Partitionier in coorporation with a change management system.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Best Jan
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On 28.02.2018 07:13, Dong Lin wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hey Jan,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I am not sure if it is acceptable for producer to be stopped
>>>>>>>>>>>>>>> for a
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> while,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> particularly for online application which requires low
>>>>>>>>>>>>>>>> latency. I
>>>>>>>>>>>>>>>> am
>>>>>>>>>>>>>>>> also
>>>>>>>>>>>>>>>> not sure how consumers can switch to a new topic. Does user
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> application
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> needs to explicitly specify a different topic for
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> producer/consumer
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>
>>>>>>>>>>>>> subscribe to? It will be helpful for discussion if you can
>>>>>>>>>>>>> provide
>>>>>>>>>>>>>
>>>>>>>>>>>>> more
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> detail on the interface change for this solution.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Dong
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Mon, Feb 26, 2018 at 12:48 AM, Jan
>>>>>>>>>>>>>>>> Filipiak<Jan.Filipiak@trivago.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> com
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> just want to throw my though in. In general the functionality
>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> very
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> usefull, we should though not try to find the architecture
>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> hard
>>>>>>>>>>>>>>>>> while
>>>>>>>>>>>>>>>>> implementing.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The manual steps would be to
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> create a new topic
>>>>>>>>>>>>>>>>> the mirrormake from the new old topic to the new topic
>>>>>>>>>>>>>>>>> wait for mirror making to catch up.
>>>>>>>>>>>>>>>>> then put the consumers onto the new topic
>>>>>>>>>>>>>>>>>            (having mirrormaker spit out a mapping from old
>>>>>>>>>>>>>>>>> offsets to
>>>>>>>>>>>>>>>>> new
>>>>>>>>>>>>>>>>> offsets:
>>>>>>>>>>>>>>>>>                if topic is increased by factor X there is
>>>>>>>>>>>>>>>>> gonna
>>>>>>>>>>>>>>>>> be a
>>>>>>>>>>>>>>>>> clean
>>>>>>>>>>>>>>>>> mapping from 1 offset in the old topic to X offsets in the
>>>>>>>>>>>>>>>>> new
>>>>>>>>>>>>>>>>> topic,
>>>>>>>>>>>>>>>>>                if there is no factor then there is no
>>>>>>>>>>>>>>>>> chance to
>>>>>>>>>>>>>>>>> generate a
>>>>>>>>>>>>>>>>> mapping that can be reasonable used for continuing)
>>>>>>>>>>>>>>>>>            make consumers stop at appropriate points and
>>>>>>>>>>>>>>>>> continue
>>>>>>>>>>>>>>>>> consumption
>>>>>>>>>>>>>>>>> with offsets from the mapping.
>>>>>>>>>>>>>>>>> have the producers stop for a minimal time.
>>>>>>>>>>>>>>>>> wait for mirrormaker to finish
>>>>>>>>>>>>>>>>> let producer produce with the new metadata.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Instead of implementing the approach suggest in the KIP
>>>>>>>>>>>>>>>>> which
>>>>>>>>>>>>>>>>> will
>>>>>>>>>>>>>>>>> leave
>>>>>>>>>>>>>>>>> log compacted topic completely crumbled and unusable.
>>>>>>>>>>>>>>>>> I would much rather try to build infrastructure to support
>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> mentioned
>>>>>>>>>>>>>>>>> above operations more smoothly.
>>>>>>>>>>>>>>>>> Especially having producers stop and use another topic is
>>>>>>>>>>>>>>>>> difficult
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> it would be nice if one can trigger "invalid metadata"
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> exceptions
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> them
>>>>>>>>>>>>>
>>>>>>>>>>>>> and
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> if one could give topics aliases so that their produces with
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> old
>>>>>>>>>>>>>>>>> topic
>>>>>>>>>>>>>>>>> will arrive in the new topic.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The downsides are obvious I guess ( having the same data
>>>>>>>>>>>>>>>>> twice
>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> transition period, but kafka tends to scale well with
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> datasize).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> So
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> its a
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> nicer fit into the architecture.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I further want to argument that the functionality by the KIP
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> can
>>>>>>>>>>>>>>>>> completely be implementing in "userland" with a custom
>>>>>>>>>>>>>>>>> partitioner
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> handles the transition as needed. I would appreciate if
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> someone
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> could
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> point
>>>>>>>>>>>>>
>>>>>>>>>>>>> out what a custom partitioner couldn't handle in this case?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> With the above approach, shrinking a topic becomes the same
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> steps.
>>>>>>>>>>>>>>>>> Without
>>>>>>>>>>>>>>>>> loosing keys in the discontinued partitions.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Would love to hear what everyone thinks.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Best Jan
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On 11.02.2018 00:35, Dong Lin wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I have created KIP-253: Support in-order message delivery
>>>>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> partition
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> expansion. See
>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confl
>>>>>>>>>>>>>>>>>> uence/display/KAFKA/KIP-253%
>>>>>>>>>>>>>>>>>> 3A+Support+in-order+message+de
>>>>>>>>>>>>>>>>>> livery+with+partition+expansio
>>>>>>>>>>>>>>>>>> n
>>>>>>>>>>>>>>>>>> .
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> This KIP provides a way to allow messages of the same key
>>>>>>>>>>>>>>>>>> from
>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>> same
>>>>>>>>>>>>>>>>>> producer to be consumed in the same order they are
>>>>>>>>>>>>>>>>>> produced
>>>>>>>>>>>>>>>>>> even
>>>>>>>>>>>>>>>>>> if
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> expand partition of the topic.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Dong
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>
>>
>>
>

Reply via email to