On Tue, Mar 27, 2018 at 12:04 AM, Dong Lin <lindon...@gmail.com> wrote:
> Hey Jan,
>
> Thanks for the enthusiasm in improving Kafka's design. Now that I have read through your discussion with Jun, here are my thoughts:
>
> - The latest proposal should work with log compacted topics by properly deleting old messages after a new message with the same key is produced. So it is probably not a concern anymore. Could you comment if there is still an issue?
>
> - I wrote SEP-5 and I am pretty familiar with the motivation and the design of SEP-5. SEP-5 is probably orthogonal to the motivation of this KIP. The goal of SEP-5 is to allow users to increase the task number of an existing Samza job. But if we increase the partition number of the input topics, messages may still be consumed out of order by tasks in Samza, which causes incorrect results. Similarly, the approach you proposed does not seem to ensure that messages can be delivered in order, even if we can make sure that each consumer instance is assigned the set of new partitions covering the same set of keys.
>
> Let me correct this comment. The approach of copying data to a new topic can ensure in-order message delivery, provided we properly migrate offsets from the old topic to the new topic.
>
> - I am trying to understand why it is better to copy the data of the input topic instead of copying the change log topic for the streaming use-case. For the core Kafka use-case, and for the stream use-case that does not need to increase the number of consumers, the current KIP already supports in-order delivery without the overhead of copying the data. For the stream use-case that does need to increase the number of consumers, the existing consumers can backfill the existing data of the change log topic into the same change log topic with the new partition number, before the new set of consumers bootstraps state from the new partitions of the change log topic. If this solution works, could you summarize the advantage of copying the data of the input topic as compared to copying the change log topic? For example, does it enable more use-cases, simplify the implementation of the Kafka library, or reduce the operational overhead, etc.?
>
> Thanks,
> Dong
>
> On Wed, Mar 21, 2018 at 6:57 AM, Jan Filipiak <jan.filip...@trivago.com> wrote:
>
>> Hi Jun,
>>
>> I was really seeing progress in our conversation, but your latest reply is just devastating. I thought we were getting close to being on the same page; now it feels like we are in different libraries.
>>
>> I will just quickly slam my answers in here. If they are too brief I am sorry; give me a ping and I will try to go into more detail. I just want to show that your pros/cons listing is broken.
>>
>> Best Jan
>>
>> and I want to get rid of this horrible compromise
>>
>> On 19.03.2018 05:48, Jun Rao wrote:
>>
>>> Hi, Jan,
>>>
>>> Thanks for the discussion. Great points.
>>>
>>> Let me try to summarize the approach that you are proposing. On the broker side, we reshuffle the existing data in a topic from the current partitions to the new partitions. Once the reshuffle fully catches up, switch the consumers to start consuming from the new partitions. If a consumer needs to rebuild its local state (due to partition changes), let the consumer rebuild its state by reading all existing data from the new partitions. Once all consumers have switched over, cut over the producer to the new partitions.
>>>
>>> The pros for this approach are that:
>>> 1. There is just one way to rebuild the local state, which is simpler.
>>
>> true, thanks
>>
>>> The cons for this approach are:
>>> 1.
Need to copy existing data. >>> >> Very unfair and not correct. It does not require you to copy over >> existing data. It _allows_ you to copy all existing data. >> >> 2. The cutover of the producer is a bit complicated since it needs to >>> coordinate with all consumer groups. >>> >> Also not true. I explicitly tried to make clear that there is only one >> special consumer (in the case of actually copying data) coordination is >> required. >> >>> 3. The rebuilding of the state in the consumer is from the input topic, >>> which can be more expensive than rebuilding from the existing state. >>> >> true, but rebuilding state is only required if you want to increase >> processing power, so we assume this is at hand. >> >>> 4. The broker potentially has to know the partitioning function. If this >>> needs to be customized at the topic level, it can be a bit messy. >>> >> I would argue against having the operation being performed by the broker. >> This was not discussed yet but if you see my original email i suggested >> otherwise from the beginning. >> >>> >>> Here is an alternative approach by applying your idea not in the broker, >>> but in the consumer. When new partitions are added, we don't move >>> existing >>> data. In KStreams, we first reshuffle the new input data to a new topic >>> T1 >>> with the old number of partitions and feed T1's data to the rest of the >>> pipeline. In the meantime, KStreams reshuffles all existing data of the >>> change capture topic to another topic C1 with the new number of >>> partitions. >>> We can then build the state of the new tasks from C1. Once the new states >>> have been fully built, we can cut over the consumption to the input topic >>> and delete T1. This approach works with compacted topic too. If an >>> application reads from the beginning of a compacted topic, the consumer >>> will reshuffle the portion of the input when the number of partitions >>> doesn't match the number of tasks. >>> >> We all wipe this idea from our heads instantly. Mixing Ideas from an >> argument is not a resolution strategy >> just leads to horrible horrible software. >> >> >>> The pros of this approach are: >>> 1. No need to copy existing data. >>> 2. Each consumer group can cut over to the new partitions independently. >>> 3. The state is rebuilt from the change capture topic, which is cheaper >>> than rebuilding from the input topic. >>> 4. Only the KStreams job needs to know the partitioning function. >>> >>> The cons of this approach are: >>> 1. Potentially the same input topic needs to be reshuffled more than once >>> in different consumer groups during the transition phase. >>> >>> What do you think? >>> >>> Thanks, >>> >>> Jun >>> >>> >>> >>> On Thu, Mar 15, 2018 at 1:04 AM, Jan Filipiak <jan.filip...@trivago.com> >>> wrote: >>> >>> Hi Jun, >>>> >>>> thank you for following me on these thoughts. It was important to me to >>>> feel that kind of understanding for my arguments. >>>> >>>> What I was hoping for (I mentioned this earlier) is that we can model >>>> the >>>> case where we do not want to copy the data the exact same way as the >>>> case >>>> when we do copy the data. Maybe you can peek into the mails before to >>>> see >>>> more details for this. >>>> >>>> This means we have the same mechanism to transfer consumer groups to >>>> switch topic. The offset mapping that would be generated would even be >>>> simpler End Offset of the Old topic => offset 0 off all the partitions >>>> of >>>> the new topic. 
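To make the offset mapping Jan describes concrete, here is a rough sketch of what the generated mapping could look like for a factor-f expansion under the default hash % partitionCount placement. The class and method names are made up for illustration and are not part of the KIP.

import java.util.HashMap;
import java.util.Map;

// Illustrative only: the "trivial" offset mapping for the non-copy case. A consumer
// group that has consumed up to the end offset of old partition p simply starts at
// offset 0 on every new partition that can contain keys of p. Under hash % count and
// an integer growth factor, those are exactly p, p + oldCount, p + 2*oldCount, ...
public class TrivialOffsetMapping {

    public static Map<Integer, Map<Integer, Long>> build(int oldCount, int factor) {
        Map<Integer, Map<Integer, Long>> mapping = new HashMap<>();
        for (int p = 0; p < oldCount; p++) {
            Map<Integer, Long> startOffsets = new HashMap<>();
            for (int i = 0; i < factor; i++) {
                startOffsets.put(p + i * oldCount, 0L); // begin at the start of each new partition
            }
            mapping.put(p, startOffsets);
        }
        return mapping;
    }
}

Note this only stays this simple because the growth is by an integer factor; with an arbitrary new partition count there is no clean old-to-new relationship.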
Then we could model the transition of a non-copy expansion the exact same way as a copy-expansion.
>>>>
>>>> I know this only works when the topic grows by an integer factor. But the benefits of only growing by a factor are too strong anyway. See Clemens's hint, and remember that state reshuffling is entirely unneeded if one doesn't want to grow processing power.
>>>>
>>>> I think these benefits should be clear, and that there is basically no downside to what is currently at hand; it just makes everything easy.
>>>>
>>>> One thing you need to know is that if you do not offer rebuilding a log compacted topic like I suggest, then even if you have consumer state reshuffling, the topic is broken and can not be used to bootstrap new consumers. They don't know if they need to apply a key from an old partition or not. This is a horrible downside I haven't seen a solution for in the email conversation.
>>>>
>>>> I argue to:
>>>>
>>>> Only grow the topic by a factor, always.
>>>> Have the "no copy consumer" transition as the trivial case of the "copy consumer" transition.
>>>> If processors need to be scaled, let them rebuild from the new topic and leave the old one running in the meantime.
>>>> Do not implement key shuffling in streams.
>>>>
>>>> I hope I can convince you, especially with how I want to handle the consumer transition. I think you didn't quite understand me there before. I think the term "new topic" intimidated you a little. How we solve this on disk doesn't really matter, whether the data goes into the same dir or a different dir or anything. I do think that it needs to involve at least rolling a new segment for the existing partitions. But most of the transitions should work without restarting consumers (newer consumers with support for this). With "new topic" I just meant the topic that now has a different partition count. There are plenty of ways to handle that (versions, aliases).
>>>>
>>>> Hope I can further get my idea across.
>>>>
>>>> Best Jan
>>>>
>>>> On 14.03.2018 02:45, Jun Rao wrote:
>>>>
>>>>> Hi, Jan,
>>>>>
>>>>> Thanks for sharing your view.
>>>>>
>>>>> I agree with you that recopying the data potentially makes the state management easier since the consumer can just rebuild its state from scratch (i.e., no need for state reshuffling).
>>>>>
>>>>> On the flip side, I see a few disadvantages of the approach that you suggested. (1) Building the state from the input topic from scratch is in general less efficient than state reshuffling. Let's say one computes a count per key from an input topic. The former requires reading all existing records in the input topic, whereas the latter only requires reading data proportional to the number of unique keys. (2) The switching of the topic needs modification to the application. If there are many applications on a topic, coordinating such an effort may not be easy. Also, it's not clear how to enforce exactly-once semantics during the switch. (3) If a topic doesn't need any state management, recopying the data seems wasteful. In that case, in-place partition expansion seems more desirable.
>>>>>
>>>>> I understand your concern about adding complexity in KStreams. But, perhaps we could iterate on that a bit more to see if it can be simplified.
>>>>> >>>>> Jun >>>>> >>>>> >>>>> On Mon, Mar 12, 2018 at 11:21 PM, Jan Filipiak < >>>>> jan.filip...@trivago.com> >>>>> wrote: >>>>> >>>>> Hi Jun, >>>>> >>>>>> I will focus on point 61 as I think its _the_ fundamental part that I >>>>>> cant >>>>>> get across at the moment. >>>>>> >>>>>> Kafka is the platform to have state materialized multiple times from >>>>>> one >>>>>> input. I emphasize this: It is the building block in architectures >>>>>> that >>>>>> allow you to >>>>>> have your state maintained multiple times. You put a message in once, >>>>>> and >>>>>> you have it pop out as often as you like. I believe you understand >>>>>> this. >>>>>> >>>>>> Now! The path of thinking goes the following: I am using apache kafka >>>>>> and >>>>>> I _want_ my state multiple times. What am I going todo? >>>>>> >>>>>> A) Am I going to take my state that I build up, plunge some sort of >>>>>> RPC >>>>>> layer ontop of it, use that RPC layer to throw my records across >>>>>> instances? >>>>>> B) Am I just going to read the damn message twice? >>>>>> >>>>>> Approach A is fundamentally flawed and a violation of all that is good >>>>>> and >>>>>> holy in kafka deployments. I can not understand how this Idea can >>>>>> come in >>>>>> the first place. >>>>>> (I do understand: IQ in streams, they polluted the kafka streams >>>>>> codebase >>>>>> really bad already. It is not funny! I think they are equally flawed >>>>>> as >>>>>> A) >>>>>> >>>>>> I say, we do what Kafka is good at. We repartition the topic once. We >>>>>> switch the consumers. >>>>>> (Those that need more partitions are going to rebuild their state in >>>>>> multiple partitions by reading the new topic, those that don't just >>>>>> assign >>>>>> the new partitions properly) >>>>>> We switch producers. Done! >>>>>> >>>>>> The best thing! It is trivial, hipster stream processor will have an >>>>>> easy >>>>>> time with that aswell. Its so super simple. And simple IS good! >>>>>> It is what kafka was build todo. It is how we do it today. All I am >>>>>> saying >>>>>> is that a little broker help doing the producer swap is super useful. >>>>>> >>>>>> For everyone interested in why kafka is so powerful with approach B, >>>>>> please watch https://youtu.be/bEbeZPVo98c?t=1633 >>>>>> I already looked up a good point in time, I think after 5 minutes the >>>>>> "state" topic is handled and you should be able to understand me >>>>>> and inch better. >>>>>> >>>>>> Please do not do A to the project, it deserves better! >>>>>> >>>>>> Best Jan >>>>>> >>>>>> >>>>>> >>>>>> On 13.03.2018 02:40, Jun Rao wrote: >>>>>> >>>>>> Hi, Jan, >>>>>> >>>>>>> Thanks for the reply. A few more comments below. >>>>>>> >>>>>>> 50. Ok, we can think a bit harder for supporting compacted topics. >>>>>>> >>>>>>> 51. This is a fundamental design question. In the more common case, >>>>>>> the >>>>>>> reason why someone wants to increase the number of partitions is that >>>>>>> the >>>>>>> consumer application is slow and one wants to run more consumer >>>>>>> instances >>>>>>> to increase the degree of parallelism. So, fixing the number of >>>>>>> running >>>>>>> consumer instances when expanding the partitions won't help this >>>>>>> case. >>>>>>> If >>>>>>> we do need to increase the number of consumer instances, we need to >>>>>>> somehow >>>>>>> reshuffle the state of the consumer across instances. What we have >>>>>>> been >>>>>>> discussing in this KIP is whether we can do this more effectively >>>>>>> through >>>>>>> the KStream library (e.g. 
through a 2-phase partition expansion). >>>>>>> This >>>>>>> will >>>>>>> add some complexity, but it's probably better than everyone doing >>>>>>> this >>>>>>> in >>>>>>> the application space. The recopying approach that you mentioned >>>>>>> doesn't >>>>>>> seem to address the consumer state management issue when the consumer >>>>>>> switches from an old to a new topic. >>>>>>> >>>>>>> 52. As for your example, it depends on whether the join key is the >>>>>>> same >>>>>>> between (A,B) and (B,C). If the join key is the same, we can do a >>>>>>> 2-phase >>>>>>> partition expansion of A, B, and C together. If the join keys are >>>>>>> different, one would need to repartition the data on a different key >>>>>>> for >>>>>>> the second join, then the partition expansion can be done >>>>>>> independently >>>>>>> between (A,B) and (B,C). >>>>>>> >>>>>>> 53. If you always fix the number of consumer instances, we you >>>>>>> described >>>>>>> works. However, as I mentioned in #51, I am not sure how your >>>>>>> proposal >>>>>>> deals with consumer states when the number of consumer instances >>>>>>> grows. >>>>>>> Also, it just seems that it's better to avoid re-copying the existing >>>>>>> data. >>>>>>> >>>>>>> 60. "just want to throw in my question from the longer email in the >>>>>>> other >>>>>>> Thread here. How will the bloom filter help a new consumer to decide >>>>>>> to >>>>>>> apply the key or not?" Not sure that I fully understood your >>>>>>> question. >>>>>>> The >>>>>>> consumer just reads whatever key is in the log. The bloom filter just >>>>>>> helps >>>>>>> clean up the old keys. >>>>>>> >>>>>>> 61. "Why can we afford having a topic where its apparently not >>>>>>> possible >>>>>>> to >>>>>>> start a new application on? I think this is an overall flaw of the >>>>>>> discussed idea here. Not playing attention to the overall >>>>>>> architecture." >>>>>>> Could you explain a bit more when one can't start a new application? >>>>>>> >>>>>>> Jun >>>>>>> >>>>>>> >>>>>>> >>>>>>> On Sat, Mar 10, 2018 at 1:40 AM, Jan Filipiak < >>>>>>> jan.filip...@trivago.com >>>>>>> wrote: >>>>>>> >>>>>>> Hi Jun, thanks for your mail. >>>>>>> >>>>>>> Thank you for your questions! >>>>>>>> I think they are really good and tackle the core of the problem I >>>>>>>> see. >>>>>>>> >>>>>>>> I will answer inline, mostly but still want to set the tone here. >>>>>>>> >>>>>>>> The core strength of kafka is what Martin once called the >>>>>>>> kappa-Architecture. How does this work? >>>>>>>> You have everything as a log as in kafka. When you need to change >>>>>>>> something. >>>>>>>> You create the new version of your application and leave it running >>>>>>>> in >>>>>>>> parallel. >>>>>>>> Once the new version is good you switch your users to use the new >>>>>>>> application. >>>>>>>> >>>>>>>> The online reshuffling effectively breaks this architecture and I >>>>>>>> think >>>>>>>> the switch in thinking here is more harmful >>>>>>>> than any details about the partitioning function to allow such a >>>>>>>> change. >>>>>>>> I >>>>>>>> feel with my suggestion we are the closest to >>>>>>>> the original and battle proven architecture and I can only warn to >>>>>>>> move >>>>>>>> away from it. 
>>>>>>>> >>>>>>>> I might have forgotten something, sometimes its hard for me to >>>>>>>> getting >>>>>>>> all >>>>>>>> the thoughts captured in a mail, but I hope the comments inline will >>>>>>>> further make my concern clear, and put some emphasis on why I >>>>>>>> prefer my >>>>>>>> solution ;) >>>>>>>> >>>>>>>> One thing we should all be aware of when discussing this, and I >>>>>>>> think >>>>>>>> Dong >>>>>>>> should have mentioned it (maybe he did). >>>>>>>> We are not discussing all of this out of thin air but there is an >>>>>>>> effort >>>>>>>> in the Samza project. >>>>>>>> >>>>>>>> https://cwiki.apache.org/confluence/display/SAMZA/SEP-5%3A+ >>>>>>>> Enable+partition+expansion+of+input+streams >>>>>>>> https://issues.apache.org/jira/browse/SAMZA-1293 >>>>>>>> >>>>>>>> To be clear. I think SEP-5 (state of last week, dont know if it >>>>>>>> adapted >>>>>>>> to >>>>>>>> this discussion) is on a way better path than KIP-253, and I can't >>>>>>>> really >>>>>>>> explain why. >>>>>>>> >>>>>>>> Best Jan, >>>>>>>> >>>>>>>> nice weekend everyone >>>>>>>> >>>>>>>> >>>>>>>> On 09.03.2018 03:36, Jun Rao wrote: >>>>>>>> >>>>>>>> Hi, Jan, >>>>>>>> >>>>>>>> Thanks for the feedback. Just some comments on the earlier points >>>>>>>>> that >>>>>>>>> you >>>>>>>>> mentioned. >>>>>>>>> >>>>>>>>> 50. You brought up the question of whether existing data needs to >>>>>>>>> be >>>>>>>>> copied >>>>>>>>> during partition expansion. My understand of your view is that >>>>>>>>> avoid >>>>>>>>> copying existing data will be more efficient, but it doesn't work >>>>>>>>> well >>>>>>>>> with >>>>>>>>> compacted topics since some keys in the original partitions will >>>>>>>>> never >>>>>>>>> be >>>>>>>>> cleaned. It would be useful to understand your use case of >>>>>>>>> compacted >>>>>>>>> topics >>>>>>>>> a bit more. In the common use case, the data volume in a compacted >>>>>>>>> topic >>>>>>>>> may not be large. So, I am not sure if there is a strong need to >>>>>>>>> expand >>>>>>>>> partitions in a compacted topic, at least initially. >>>>>>>>> >>>>>>>>> I do agree. State is usually smaller. Update rates might be also >>>>>>>>> >>>>>>>>> competitively high. >>>>>>>> Doing Log-compaction (even beeing very efficient and configurable) >>>>>>>> is >>>>>>>> also >>>>>>>> a more expensive operation than >>>>>>>> just discarding old segments. Further if you want to use more >>>>>>>> consumers >>>>>>>> processing the events >>>>>>>> you also have to grow the number of partitions. Especially for >>>>>>>> use-cases >>>>>>>> we do (KIP-213) a tiny state full >>>>>>>> table might be very expensive to process if it joins against a huge >>>>>>>> table. >>>>>>>> >>>>>>>> I can just say we have been in the spot of needing to grow log >>>>>>>> compacted >>>>>>>> topics. Mainly for processing power we can bring to the table. >>>>>>>> >>>>>>>> Further i am not at all concerned about the extra spaced used by >>>>>>>> "garbage >>>>>>>> keys". I am more concerned about the correctness of innocent >>>>>>>> consumers. >>>>>>>> The >>>>>>>> logic becomes complicated. Say for streams one would need to load >>>>>>>> the >>>>>>>> record into state but not forward it the topology ( to have it >>>>>>>> available >>>>>>>> for shuffeling). I rather have it simple and a topic clean >>>>>>>> regardless >>>>>>>> if >>>>>>>> it >>>>>>>> still has its old partition count. Especially with multiple >>>>>>>> partitions >>>>>>>> growth's I think it becomes insanely hard to to this shuffle >>>>>>>> correct. 
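The "load into state but do not forward it to the topology" bookkeeping Jan describes boils down to a per-record check. A minimal sketch, with String.hashCode() standing in for the producer's real hash (murmur2 in Kafka's default partitioner):

// Sketch of the per-record check an "innocent" consumer needs when bootstrapping from
// a compacted topic that was expanded in place (no recopy).
public class StaleKeyCheck {

    static int partitionFor(String key, int partitionCount) {
        return (key.hashCode() & 0x7fffffff) % partitionCount;
    }

    // A record read from 'sourcePartition' is only guaranteed to be the latest value for
    // its key if the key still hashes to that partition under the current count. Otherwise
    // a newer value or a delete may live in another partition, so the record can be loaded
    // into the state store but must not be forwarded through the topology unchecked.
    static boolean mayBeSuperseded(String key, int sourcePartition, int currentPartitionCount) {
        return partitionFor(key, currentPartitionCount) != sourcePartition;
    }
}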
>>>>>>>> Maybe >>>>>>>> Streams and Samza can do it. Especially if you do "hipster stream >>>>>>>> processing" <https://www.confluent.io/blog >>>>>>>> /introducing-kafka-streams- >>>>>>>> stream-processing-made-simple/>. This makes kafka way to >>>>>>>> complicated. >>>>>>>> With my approach I think its way simpler because the topic has no >>>>>>>> "history" >>>>>>>> in terms of partitioning but is always clean. >>>>>>>> >>>>>>>> >>>>>>>> 51. "Growing the topic by an integer factor does not require any >>>>>>>> state >>>>>>>> >>>>>>>> redistribution at all." Could you clarify this a bit more? Let's say >>>>>>>>> you >>>>>>>>> have a consumer app that computes the windowed count per key. If >>>>>>>>> you >>>>>>>>> double >>>>>>>>> the number of partitions from 1 to 2 and grow the consumer >>>>>>>>> instances >>>>>>>>> from >>>>>>>>> 1 >>>>>>>>> to 2, we would need to redistribute some of the counts to the new >>>>>>>>> consumer >>>>>>>>> instance. Regarding to linear hashing, it's true that it won't >>>>>>>>> solve >>>>>>>>> the >>>>>>>>> problem with compacted topics. The main benefit is that it >>>>>>>>> redistributes >>>>>>>>> the keys in one partition to no more than two partitions, which >>>>>>>>> could >>>>>>>>> help >>>>>>>>> redistribute the state. >>>>>>>>> >>>>>>>>> You don't need to spin up a new consumer in this case. every >>>>>>>>> consumer >>>>>>>>> >>>>>>>>> would just read every partition with the (partition % num_task) >>>>>>>> task. >>>>>>>> it >>>>>>>> will still have the previous data right there and can go on. >>>>>>>> >>>>>>>> This sounds contradictory to what I said before, but please bear >>>>>>>> with >>>>>>>> me. >>>>>>>> >>>>>>>> 52. Good point on coordinating the expansion of 2 topics that need >>>>>>>> to >>>>>>>> be >>>>>>>> >>>>>>>> joined together. This is where the 2-phase partition expansion could >>>>>>>>> potentially help. In the first phase, we could add new partitions >>>>>>>>> to >>>>>>>>> the 2 >>>>>>>>> topics one at a time but without publishing to the new patitions. >>>>>>>>> Then, >>>>>>>>> we >>>>>>>>> can add new consumer instances to pick up the new partitions. In >>>>>>>>> this >>>>>>>>> transition phase, no reshuffling is needed since no data is coming >>>>>>>>> from >>>>>>>>> the >>>>>>>>> new partitions. Finally, we can enable the publishing to the new >>>>>>>>> partitions. >>>>>>>>> >>>>>>>>> I think its even worse than you think. I would like to introduce >>>>>>>>> the >>>>>>>>> >>>>>>>>> Term >>>>>>>> transitive copartitioning. Imagine >>>>>>>> 2 streams application. One joins (A,B) the other (B,C) then there >>>>>>>> is a >>>>>>>> transitive copartition requirement for >>>>>>>> (A,C) to be copartitioned aswell. This can spread significantly and >>>>>>>> require many consumers to adapt at the same time. >>>>>>>> >>>>>>>> It is also not entirely clear to me how you not need reshuffling in >>>>>>>> this >>>>>>>> case. If A has a record that never gets updated after the expansion >>>>>>>> and >>>>>>>> the >>>>>>>> coresponding B record moves to a new partition. How shall they meet >>>>>>>> w/o >>>>>>>> shuffle? >>>>>>>> >>>>>>>> 53. "Migrating consumer is a step that might be made completly >>>>>>>> >>>>>>>> unnecessary >>>>>>>>> if - for example streams - takes the gcd as partitioning scheme >>>>>>>>> instead >>>>>>>>> of >>>>>>>>> enforcing 1 to 1." Not sure that I fully understand this. I think >>>>>>>>> you >>>>>>>>> mean >>>>>>>>> that a consumer application can run more instances than the number >>>>>>>>> of >>>>>>>>> partitions. 
In that case, the consumer can just repartition the input data according to the number of instances. This is possible, but just has the overhead of reshuffling the data.
>>>>>>>>
>>>>>>>> No, what I meant is (that is also your question, I think, Matthias) that if you grow a topic by a factor, then even if your processor is stateful you can just assign all the multiples of the previous partition to this consumer, and the state needed to keep processing correctly will be present w/o any shuffling.
>>>>>>>>
>>>>>>>> Say you have an assignment
>>>>>>>> Stateful consumer => partition
>>>>>>>> 0 => 0
>>>>>>>> 1 => 1
>>>>>>>> 2 => 2
>>>>>>>>
>>>>>>>> and you grow your topic by a factor of 4, you get
>>>>>>>>
>>>>>>>> 0 => 0,3,6,9
>>>>>>>> 1 => 1,4,7,10
>>>>>>>> 2 => 2,5,8,11
>>>>>>>>
>>>>>>>> Say your hashcode is 8. 8 % 3 => 2 before, so the consumer for partition 2 has it. Now you have 12 partitions, so 8 % 12 => 8, and it goes into partition 8, which is assigned to the same consumer who had partition 2 before and therefore knows the key.
>>>>>>>>
>>>>>>>> Userland reshuffling is there as an option. And it does exactly what I suggest. And I think it's the perfect strategy. All I am suggesting is broker-side support to switch the producers to the newly partitioned topic. Then the old topic (with too few partitions) can go away. Remember the list of steps at the beginning of this thread. If one has broker support for all the steps where it's required, and streams support for those where it isn't necessarily required, then one has solved the problem.
>>>>>>>>
>>>>>>>> I repeat it because I think it's important. I am really happy that you brought that up, because it's 100% what I want, just with the difference that I want an option to discard the too-small topic later (after all consumers have adapted), and to have ordering correct there. I need broker support for managing the copy process plus the producers, and fencing them against each other. I also repeat: the copy process can run for weeks in the worst case. Copying the data is not the longest task; migrating consumers might very well be. Once all consumers have switched and the copy is really up to date (think ISR-like up to date), only then do we stop the producer, wait for the copy to finish, and use the new topic for producing.
>>>>>>>>
>>>>>>>> After this the topic is in perfect shape, and no one needs to worry about complicated stuff (old keys hanging around might arrive in some other topic later...). I can only imagine how many tricky bugs are going to arrive after someone has grown and shrunk his topic 10 times.
>>>>>>>>
>>>>>>>>> 54. "The other thing I wanted to mention is that I believe the current suggestion (without copying data over) can be implemented in pure userland with a custom partitioner and a small feedbackloop from ProduceResponse => Partitionier in coorporation with a change management system."
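Jan's 3-to-12 example above can be checked mechanically. A small sketch, assuming tasks own partitions via partition % numTasks, the old partition count is a multiple of the task count, and the new count is an integer multiple of the old one:

// Checks the claim: under factor growth, every key's records keep landing on the task
// that already holds its state. Uses Jan's numbers: 3 tasks, 3 -> 12 partitions, hash 8.
public class FactorGrowthCheck {
    public static void main(String[] args) {
        int numTasks = 3;
        int oldPartitions = 3;
        int newPartitions = 12; // grown by an integer factor of 4

        for (int hash = 0; hash < 100; hash++) {
            int oldTask = (hash % oldPartitions) % numTasks;
            int newTask = (hash % newPartitions) % numTasks;
            if (oldTask != newTask) {
                throw new AssertionError("task changed for hash " + hash);
            }
        }
        // e.g. hash 8: old partition 8 % 3 = 2 -> task 2; new partition 8 % 12 = 8 -> task 8 % 3 = 2
        System.out.println("all keys stay on their original task");
    }
}

The property only holds because 3 divides 12; grow by a non-integer factor and the assertion fires.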
I am >>>>>>>>> not >>>>>>>>> sure a customized partitioner itself solves the problem. We >>>>>>>>> probably >>>>>>>>> need >>>>>>>>> some broker side support to enforce when the new partitions can be >>>>>>>>> used. >>>>>>>>> We >>>>>>>>> also need some support on the consumer/kstream side to preserve the >>>>>>>>> per >>>>>>>>> key >>>>>>>>> ordering and potentially migrate the processing state. This is not >>>>>>>>> trivial >>>>>>>>> and I am not sure if it's ideal to fully push to the application >>>>>>>>> space. >>>>>>>>> >>>>>>>>> Broker support is defenitly the preferred way here. I have nothing >>>>>>>>> >>>>>>>>> against >>>>>>>> broker support. >>>>>>>> I tried to say that for what I would preffer - copying the data >>>>>>>> over, >>>>>>>> at >>>>>>>> least for log compacted topics - >>>>>>>> I would require more broker support than the KIP currently offers. >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> Jun >>>>>>>> >>>>>>>> On Tue, Mar 6, 2018 at 10:33 PM, Jan Filipiak < >>>>>>>>> jan.filip...@trivago.com >>>>>>>>> wrote: >>>>>>>>> >>>>>>>>> Hi Dong, >>>>>>>>> >>>>>>>>> are you actually reading my emails, or are you just using the >>>>>>>>> thread I >>>>>>>>> >>>>>>>>>> started for general announcements regarding the KIP? >>>>>>>>>> >>>>>>>>>> I tried to argue really hard against linear hashing. Growing the >>>>>>>>>> topic >>>>>>>>>> by >>>>>>>>>> an integer factor does not require any state redistribution at >>>>>>>>>> all. I >>>>>>>>>> fail >>>>>>>>>> to see completely where linear hashing helps on log compacted >>>>>>>>>> topics. >>>>>>>>>> >>>>>>>>>> If you are not willing to explain to me what I might be >>>>>>>>>> overlooking: >>>>>>>>>> that >>>>>>>>>> is fine. >>>>>>>>>> But I ask you to not reply to my emails then. Please understand my >>>>>>>>>> frustration with this. >>>>>>>>>> >>>>>>>>>> Best Jan >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> On 06.03.2018 19:38, Dong Lin wrote: >>>>>>>>>> >>>>>>>>>> Hi everyone, >>>>>>>>>> >>>>>>>>>> Thanks for all the comments! It appears that everyone prefers >>>>>>>>>> linear >>>>>>>>>> >>>>>>>>>>> hashing because it reduces the amount of state that needs to be >>>>>>>>>>> moved >>>>>>>>>>> between consumers (for stream processing). The KIP has been >>>>>>>>>>> updated >>>>>>>>>>> to >>>>>>>>>>> use >>>>>>>>>>> linear hashing. >>>>>>>>>>> >>>>>>>>>>> Regarding the migration endeavor: it seems that migrating >>>>>>>>>>> producer >>>>>>>>>>> library >>>>>>>>>>> to use linear hashing should be pretty straightforward without >>>>>>>>>>> much operational endeavor. If we don't upgrade client library to >>>>>>>>>>> use >>>>>>>>>>> this >>>>>>>>>>> KIP, we can not support in-order delivery after partition is >>>>>>>>>>> changed >>>>>>>>>>> anyway. Suppose we upgrade client library to use this KIP, if >>>>>>>>>>> partition >>>>>>>>>>> number is not changed, the key -> partition mapping will be >>>>>>>>>>> exactly >>>>>>>>>>> the >>>>>>>>>>> same as it is now because it is still determined using >>>>>>>>>>> murmur_hash(key) >>>>>>>>>>> % >>>>>>>>>>> original_partition_num. In other words, this change is backward >>>>>>>>>>> compatible. >>>>>>>>>>> >>>>>>>>>>> Regarding the load distribution: if we use linear hashing, the >>>>>>>>>>> load >>>>>>>>>>> may >>>>>>>>>>> be >>>>>>>>>>> unevenly distributed because those partitions which are not split >>>>>>>>>>> may >>>>>>>>>>> receive twice as much traffic as other partitions that are split. 
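For reference, a textbook linear-hashing partition function looks roughly like the sketch below; the exact formula in KIP-253 may differ. It also shows the imbalance just mentioned: growing from 4 to 6 partitions, the two unsplit partitions receive about twice the traffic of each split half.

// Textbook linear hashing, shown only to illustrate the load imbalance; not the KIP's code.
public class LinearHashing {

    // n = current number of partitions, with 2^level <= n < 2^(level+1).
    static int partitionFor(int hash, int n) {
        int level = 31 - Integer.numberOfLeadingZeros(n); // floor(log2(n))
        int h = hash & 0x7fffffff;
        int p = h % (1 << (level + 1));
        // Buckets >= n have not been created yet, so their keys stay in the lower,
        // not-yet-split bucket.
        return p < n ? p : h % (1 << level);
    }

    public static void main(String[] args) {
        // Growing from 4 to 6 partitions: 0 and 1 are split into (0,4) and (1,5);
        // 2 and 3 are not split and end up with roughly twice the traffic.
        int[] counts = new int[6];
        for (int h = 0; h < 60000; h++) counts[partitionFor(h, 6)]++;
        System.out.println(java.util.Arrays.toString(counts));
    }
}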
>>>>>>>>>>> This >>>>>>>>>>> issue can be mitigated by creating topic with partitions that are >>>>>>>>>>> several >>>>>>>>>>> times the number of consumers. And there will be no imbalance if >>>>>>>>>>> the >>>>>>>>>>> partition number is always doubled. So this imbalance seems >>>>>>>>>>> acceptable. >>>>>>>>>>> >>>>>>>>>>> Regarding storing the partition strategy as per-topic config: It >>>>>>>>>>> seems >>>>>>>>>>> not >>>>>>>>>>> necessary since we can still use murmur_hash as the default hash >>>>>>>>>>> function >>>>>>>>>>> and additionally apply the linear hashing algorithm if the >>>>>>>>>>> partition >>>>>>>>>>> number >>>>>>>>>>> has increased. Not sure if there is any use-case for producer to >>>>>>>>>>> use a >>>>>>>>>>> different hash function. Jason, can you check if there is some >>>>>>>>>>> use-case >>>>>>>>>>> that I missed for using the per-topic partition strategy? >>>>>>>>>>> >>>>>>>>>>> Regarding how to reduce latency (due to state store/load) in >>>>>>>>>>> stream >>>>>>>>>>> processing consumer when partition number changes: I need to read >>>>>>>>>>> the >>>>>>>>>>> Kafka >>>>>>>>>>> Stream code to understand how Kafka Stream currently migrate >>>>>>>>>>> state >>>>>>>>>>> between >>>>>>>>>>> consumers when the application is added/removed for a given job. >>>>>>>>>>> I >>>>>>>>>>> will >>>>>>>>>>> reply after I finish reading the documentation and code. >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> Thanks, >>>>>>>>>>> Dong >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> On Mon, Mar 5, 2018 at 10:43 AM, Jason Gustafson < >>>>>>>>>>> ja...@confluent.io> >>>>>>>>>>> wrote: >>>>>>>>>>> >>>>>>>>>>> Great discussion. I think I'm wondering whether we can continue >>>>>>>>>>> to >>>>>>>>>>> leave >>>>>>>>>>> >>>>>>>>>>> Kafka agnostic to the partitioning strategy. The challenge is >>>>>>>>>>> >>>>>>>>>>> communicating >>>>>>>>>>>> the partitioning logic from producers to consumers so that the >>>>>>>>>>>> dependencies >>>>>>>>>>>> between each epoch can be determined. For the sake of >>>>>>>>>>>> discussion, >>>>>>>>>>>> imagine >>>>>>>>>>>> you did something like the following: >>>>>>>>>>>> >>>>>>>>>>>> 1. The name (and perhaps version) of a partitioning strategy is >>>>>>>>>>>> stored >>>>>>>>>>>> in >>>>>>>>>>>> topic configuration when a topic is created. >>>>>>>>>>>> 2. The producer looks up the partitioning strategy before >>>>>>>>>>>> writing >>>>>>>>>>>> to >>>>>>>>>>>> a >>>>>>>>>>>> topic and includes it in the produce request (for fencing). If >>>>>>>>>>>> it >>>>>>>>>>>> doesn't >>>>>>>>>>>> have an implementation for the configured strategy, it fails. >>>>>>>>>>>> 3. The consumer also looks up the partitioning strategy and >>>>>>>>>>>> uses it >>>>>>>>>>>> to >>>>>>>>>>>> determine dependencies when reading a new epoch. It could either >>>>>>>>>>>> fail >>>>>>>>>>>> or >>>>>>>>>>>> make the most conservative dependency assumptions if it doesn't >>>>>>>>>>>> know >>>>>>>>>>>> how >>>>>>>>>>>> to >>>>>>>>>>>> implement the partitioning strategy. For the consumer, the new >>>>>>>>>>>> interface >>>>>>>>>>>> might look something like this: >>>>>>>>>>>> >>>>>>>>>>>> // Return the partition dependencies following an epoch bump >>>>>>>>>>>> Map<Integer, List<Integer>> dependencies(int >>>>>>>>>>>> numPartitionsBeforeEpochBump, >>>>>>>>>>>> int numPartitionsAfterEpochBump) >>>>>>>>>>>> >>>>>>>>>>>> The unordered case then is just a particular implementation >>>>>>>>>>>> which >>>>>>>>>>>> never >>>>>>>>>>>> has >>>>>>>>>>>> any epoch dependencies. 
To implement this, we would need some >>>>>>>>>>>> way >>>>>>>>>>>> for >>>>>>>>>>>> the >>>>>>>>>>>> consumer to find out how many partitions there were in each >>>>>>>>>>>> epoch, >>>>>>>>>>>> but >>>>>>>>>>>> maybe that's not too unreasonable. >>>>>>>>>>>> >>>>>>>>>>>> Thanks, >>>>>>>>>>>> Jason >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> On Mon, Mar 5, 2018 at 4:51 AM, Jan Filipiak < >>>>>>>>>>>> jan.filip...@trivago.com >>>>>>>>>>>> wrote: >>>>>>>>>>>> >>>>>>>>>>>> Hi Dong >>>>>>>>>>>> >>>>>>>>>>>> thank you very much for your questions. >>>>>>>>>>>> >>>>>>>>>>>> regarding the time spend copying data across: >>>>>>>>>>>>> It is correct that copying data from a topic with one partition >>>>>>>>>>>>> mapping >>>>>>>>>>>>> >>>>>>>>>>>>> to >>>>>>>>>>>>> >>>>>>>>>>>>> a topic with a different partition mapping takes way longer >>>>>>>>>>>>> than >>>>>>>>>>>>> >>>>>>>>>>>> we >>>>>>>>>>>> can >>>>>>>>>>>> >>>>>>>>>>>> stop producers. Tens of minutes is a very optimistic estimate >>>>>>>>>>>> here. >>>>>>>>>>>> >>>>>>>>>>>>> Many >>>>>>>>>>>>> people can not afford copy full steam and therefore will have >>>>>>>>>>>>> some >>>>>>>>>>>>> rate >>>>>>>>>>>>> limiting in place, this can bump the timespan into the day's. >>>>>>>>>>>>> The >>>>>>>>>>>>> good >>>>>>>>>>>>> >>>>>>>>>>>>> part >>>>>>>>>>>>> >>>>>>>>>>>>> is that the vast majority of the data can be copied while the >>>>>>>>>>>>> >>>>>>>>>>>> producers >>>>>>>>>>>> >>>>>>>>>>>> are >>>>>>>>>>>> >>>>>>>>>>>>> still going. One can then, piggyback the consumers ontop of >>>>>>>>>>>>> this >>>>>>>>>>>>> >>>>>>>>>>>> timeframe, >>>>>>>>>>>> >>>>>>>>>>>>> by the method mentioned (provide them an mapping from their old >>>>>>>>>>>>> >>>>>>>>>>>> offsets >>>>>>>>>>>> >>>>>>>>>>>> to >>>>>>>>>>>> >>>>>>>>>>>>> new offsets in their repartitioned topics. In that way we >>>>>>>>>>>>> separate >>>>>>>>>>>>> >>>>>>>>>>>> migration of consumers from migration of producers (decoupling >>>>>>>>>>>> >>>>>>>>>>>>> these >>>>>>>>>>>>> is >>>>>>>>>>>>> what kafka is strongest at). The time to actually swap over the >>>>>>>>>>>>> producers >>>>>>>>>>>>> should be kept minimal by ensuring that when a swap attempt is >>>>>>>>>>>>> started >>>>>>>>>>>>> >>>>>>>>>>>>> the >>>>>>>>>>>>> >>>>>>>>>>>>> consumer copying over should be very close to the log end and >>>>>>>>>>>>> is >>>>>>>>>>>>> >>>>>>>>>>>> expected >>>>>>>>>>>> >>>>>>>>>>>>> to finish within the next fetch. The operation should have a >>>>>>>>>>>>> time-out >>>>>>>>>>>>> and >>>>>>>>>>>>> should be "reattemtable". >>>>>>>>>>>>> >>>>>>>>>>>>> Importance of logcompaction: >>>>>>>>>>>>> If a producer produces key A, to partiton 0, its forever gonna >>>>>>>>>>>>> be >>>>>>>>>>>>> there, >>>>>>>>>>>>> unless it gets deleted. The record might sit in there for >>>>>>>>>>>>> years. A >>>>>>>>>>>>> new >>>>>>>>>>>>> producer started with the new partitions will fail to delete >>>>>>>>>>>>> the >>>>>>>>>>>>> record >>>>>>>>>>>>> >>>>>>>>>>>>> in >>>>>>>>>>>>> >>>>>>>>>>>>> the correct partition. Th record will be there forever and one >>>>>>>>>>>>> can >>>>>>>>>>>>> >>>>>>>>>>>> not >>>>>>>>>>>> >>>>>>>>>>>> reliable bootstrap new consumers. I cannot see how linear >>>>>>>>>>>> hashing >>>>>>>>>>>> >>>>>>>>>>>>> can >>>>>>>>>>>>> >>>>>>>>>>>>> solve >>>>>>>>>>>>> >>>>>>>>>>>>> this. 
>>>>>>>>>>>>> >>>>>>>>>>>> Regarding your skipping of userland copying: >>>>>>>>>>>> >>>>>>>>>>>>> 100%, copying the data across in userland is, as far as i can >>>>>>>>>>>>> see, >>>>>>>>>>>>> only >>>>>>>>>>>>> a >>>>>>>>>>>>> usecase for log compacted topics. Even for logcompaction + >>>>>>>>>>>>> retentions >>>>>>>>>>>>> it >>>>>>>>>>>>> should only be opt-in. Why did I bring it up? I think log >>>>>>>>>>>>> compaction >>>>>>>>>>>>> is >>>>>>>>>>>>> a >>>>>>>>>>>>> very important feature to really embrace kafka as a "data >>>>>>>>>>>>> plattform". >>>>>>>>>>>>> The >>>>>>>>>>>>> point I also want to make is that copying data this way is >>>>>>>>>>>>> completely >>>>>>>>>>>>> inline with the kafka architecture. it only consists of reading >>>>>>>>>>>>> and >>>>>>>>>>>>> >>>>>>>>>>>>> writing >>>>>>>>>>>>> >>>>>>>>>>>>> to topics. >>>>>>>>>>>>> >>>>>>>>>>>> I hope it clarifies more why I think we should aim for more than >>>>>>>>>>>> >>>>>>>>>>>>> the >>>>>>>>>>>>> current KIP. I fear that once the KIP is done not much more >>>>>>>>>>>>> effort >>>>>>>>>>>>> will >>>>>>>>>>>>> >>>>>>>>>>>>> be >>>>>>>>>>>>> >>>>>>>>>>>>> taken. >>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> On 04.03.2018 02:28, Dong Lin wrote: >>>>>>>>>>>>> >>>>>>>>>>>>> Hey Jan, >>>>>>>>>>>>> >>>>>>>>>>>>> In the current proposal, the consumer will be blocked on >>>>>>>>>>>>> waiting >>>>>>>>>>>>> for >>>>>>>>>>>>> >>>>>>>>>>>>> other >>>>>>>>>>>>>> >>>>>>>>>>>>>> consumers of the group to consume up to a given offset. In >>>>>>>>>>>>>> most >>>>>>>>>>>>>> >>>>>>>>>>>>> cases, >>>>>>>>>>>>> all >>>>>>>>>>>>> consumers should be close to the LEO of the partitions when the >>>>>>>>>>>>> partition >>>>>>>>>>>>> expansion happens. Thus the time waiting should not be long >>>>>>>>>>>>> e.g. >>>>>>>>>>>>> on >>>>>>>>>>>>> the >>>>>>>>>>>>> >>>>>>>>>>>>> order of seconds. On the other hand, it may take a long time to >>>>>>>>>>>>> wait >>>>>>>>>>>>> >>>>>>>>>>>>> for >>>>>>>>>>>>>> the entire partition to be copied -- the amount of time is >>>>>>>>>>>>>> proportional >>>>>>>>>>>>>> >>>>>>>>>>>>>> to >>>>>>>>>>>>>> >>>>>>>>>>>>>> the amount of existing data in the partition, which can take >>>>>>>>>>>>>> >>>>>>>>>>>>> tens of >>>>>>>>>>>>> >>>>>>>>>>>>> minutes. So the amount of time that we stop consumers may not >>>>>>>>>>>>> be >>>>>>>>>>>>> on >>>>>>>>>>>>> >>>>>>>>>>>>> the >>>>>>>>>>>>>> same order of magnitude. >>>>>>>>>>>>>> >>>>>>>>>>>>>> If we can implement this suggestion without copying data over >>>>>>>>>>>>>> in >>>>>>>>>>>>>> purse >>>>>>>>>>>>>> userland, it will be much more valuable. Do you have ideas on >>>>>>>>>>>>>> how >>>>>>>>>>>>>> this >>>>>>>>>>>>>> >>>>>>>>>>>>>> can >>>>>>>>>>>>>> >>>>>>>>>>>>>> be done? >>>>>>>>>>>>>> >>>>>>>>>>>>> Not sure why the current KIP not help people who depend on log >>>>>>>>>>>>> >>>>>>>>>>>>> compaction. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Could you elaborate more on this point? >>>>>>>>>>>>>> >>>>>>>>>>>>> Thanks, >>>>>>>>>>>>> >>>>>>>>>>>>> Dong >>>>>>>>>>>>>> >>>>>>>>>>>>>> On Wed, Feb 28, 2018 at 10:55 PM, Jan >>>>>>>>>>>>>> Filipiak<Jan.Filipiak@trivago. >>>>>>>>>>>>>> com >>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>> Hi Dong, >>>>>>>>>>>>>> >>>>>>>>>>>>>> I tried to focus on what the steps are one can currently >>>>>>>>>>>>>> perform >>>>>>>>>>>>>> to >>>>>>>>>>>>>> >>>>>>>>>>>>>> expand >>>>>>>>>>>>>> >>>>>>>>>>>>>>> or shrink a keyed topic while maintaining a top notch >>>>>>>>>>>>>>> semantics. 
>>>>>>>>>>>>>>> I can understand that there might be confusion about >>>>>>>>>>>>>>> "stopping >>>>>>>>>>>>>>> the >>>>>>>>>>>>>>> consumer". It is exactly the same as proposed in the KIP. >>>>>>>>>>>>>>> there >>>>>>>>>>>>>>> needs >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> to >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> be >>>>>>>>>>>>>>> >>>>>>>>>>>>>> a time the producers agree on the new partitioning. The extra >>>>>>>>>>>>> >>>>>>>>>>>>> semantics I >>>>>>>>>>>>>> >>>>>>>>>>>>>>> want to put in there is that we have a possibility to wait >>>>>>>>>>>>>>> until >>>>>>>>>>>>>>> >>>>>>>>>>>>>> all >>>>>>>>>>>>> >>>>>>>>>>>>> the >>>>>>>>>>>>> >>>>>>>>>>>>> existing data >>>>>>>>>>>>>> >>>>>>>>>>>>>> is copied over into the new partitioning scheme. When I say >>>>>>>>>>>>> stopping >>>>>>>>>>>>> >>>>>>>>>>>>> I >>>>>>>>>>>>>> >>>>>>>>>>>>>>> think more of having a memory barrier that ensures the >>>>>>>>>>>>>>> ordering. I >>>>>>>>>>>>>>> am >>>>>>>>>>>>>>> still >>>>>>>>>>>>>>> aming for latencies on the scale of leader failovers. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Consumers have to explicitly adapt the new partitioning >>>>>>>>>>>>>>> scheme >>>>>>>>>>>>>>> in >>>>>>>>>>>>>>> the >>>>>>>>>>>>>>> above scenario. The reason is that in these cases where you >>>>>>>>>>>>>>> are >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> dependent >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> on a particular partitioning scheme, you also have other >>>>>>>>>>>>>>> topics >>>>>>>>>>>>>>> >>>>>>>>>>>>>> that >>>>>>>>>>>>> >>>>>>>>>>>>> have >>>>>>>>>>>>> >>>>>>>>>>>>> co-partition enforcements or the kind -frequently. Therefore >>>>>>>>>>>>>> all >>>>>>>>>>>>>> >>>>>>>>>>>>>> your >>>>>>>>>>>>> >>>>>>>>>>>>> other >>>>>>>>>>>>> >>>>>>>>>>>>> input topics might need to grow accordingly. >>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> What I was suggesting was to streamline all these operations >>>>>>>>>>>>>>> as >>>>>>>>>>>>>>> best >>>>>>>>>>>>>>> as >>>>>>>>>>>>>>> possible to have "real" partition grow and shrinkage going >>>>>>>>>>>>>>> on. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Migrating >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> the producers to a new partitioning scheme can be much more >>>>>>>>>>>>>>> >>>>>>>>>>>>>> streamlined >>>>>>>>>>>>> >>>>>>>>>>>>> with proper broker support for this. Migrating consumer is a >>>>>>>>>>>>> step >>>>>>>>>>>>> >>>>>>>>>>>>> that >>>>>>>>>>>>>> >>>>>>>>>>>>>>> might be made completly unnecessary if - for example streams >>>>>>>>>>>>>>> - >>>>>>>>>>>>>>> takes >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> the >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> gcd as partitioning scheme instead of enforcing 1 to 1. >>>>>>>>>>>>>>> Connect >>>>>>>>>>>>>>> >>>>>>>>>>>>>> consumers >>>>>>>>>>>>> >>>>>>>>>>>>> and other consumers should be fine anyways. >>>>>>>>>>>>>> >>>>>>>>>>>>>> I hope this makes more clear where I was aiming at. The rest >>>>>>>>>>>>> needs >>>>>>>>>>>>> >>>>>>>>>>>>> to >>>>>>>>>>>>>> >>>>>>>>>>>>>> be >>>>>>>>>>>>>> >>>>>>>>>>>>>>> figured out. The only danger i see is that when we are >>>>>>>>>>>>>>> >>>>>>>>>>>>>> introducing >>>>>>>>>>>>>> >>>>>>>>>>>>>> this >>>>>>>>>>>>> >>>>>>>>>>>>> feature as supposed in the KIP, it wont help any people >>>>>>>>>>>>> depending >>>>>>>>>>>>> on >>>>>>>>>>>>> >>>>>>>>>>>>> log >>>>>>>>>>>>>> >>>>>>>>>>>>>>> compaction. 
>>>>>>>>>>>>>>> >>>>>>>>>>>>>> The other thing I wanted to mention is that I believe the >>>>>>>>>>>>> current >>>>>>>>>>>>> >>>>>>>>>>>>> suggestion (without copying data over) can be implemented in >>>>>>>>>>>>>> pure >>>>>>>>>>>>>> >>>>>>>>>>>>>>> userland >>>>>>>>>>>>>>> with a custom partitioner and a small feedbackloop from >>>>>>>>>>>>>>> ProduceResponse >>>>>>>>>>>>>>> => >>>>>>>>>>>>>>> Partitionier in coorporation with a change management system. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Best Jan >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On 28.02.2018 07:13, Dong Lin wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Hey Jan, >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> I am not sure if it is acceptable for producer to be stopped >>>>>>>>>>>>>>> for a >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> while, >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> particularly for online application which requires low >>>>>>>>>>>>>>>> latency. I >>>>>>>>>>>>>>>> am >>>>>>>>>>>>>>>> also >>>>>>>>>>>>>>>> not sure how consumers can switch to a new topic. Does user >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> application >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> needs to explicitly specify a different topic for >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> producer/consumer >>>>>>>>>>>>>> >>>>>>>>>>>>>> to >>>>>>>>>>>>>> >>>>>>>>>>>>> subscribe to? It will be helpful for discussion if you can >>>>>>>>>>>>> provide >>>>>>>>>>>>> >>>>>>>>>>>>> more >>>>>>>>>>>>>> >>>>>>>>>>>>>>> detail on the interface change for this solution. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>> >>>>>>>>>>>>>> Dong >>>>>>>>>>>>>> >>>>>>>>>>>>>>> On Mon, Feb 26, 2018 at 12:48 AM, Jan >>>>>>>>>>>>>>>> Filipiak<Jan.Filipiak@trivago. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> com >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Hi, >>>>>>>>>>>>>> >>>>>>>>>>>>>> just want to throw my though in. In general the functionality >>>>>>>>>>>>>> is >>>>>>>>>>>>>> >>>>>>>>>>>>>>> very >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> usefull, we should though not try to find the architecture >>>>>>>>>>>>>>>> to >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> hard >>>>>>>>>>>>>>>>> while >>>>>>>>>>>>>>>>> implementing. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> The manual steps would be to >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> create a new topic >>>>>>>>>>>>>>>>> the mirrormake from the new old topic to the new topic >>>>>>>>>>>>>>>>> wait for mirror making to catch up. >>>>>>>>>>>>>>>>> then put the consumers onto the new topic >>>>>>>>>>>>>>>>> (having mirrormaker spit out a mapping from old >>>>>>>>>>>>>>>>> offsets to >>>>>>>>>>>>>>>>> new >>>>>>>>>>>>>>>>> offsets: >>>>>>>>>>>>>>>>> if topic is increased by factor X there is >>>>>>>>>>>>>>>>> gonna >>>>>>>>>>>>>>>>> be a >>>>>>>>>>>>>>>>> clean >>>>>>>>>>>>>>>>> mapping from 1 offset in the old topic to X offsets in the >>>>>>>>>>>>>>>>> new >>>>>>>>>>>>>>>>> topic, >>>>>>>>>>>>>>>>> if there is no factor then there is no >>>>>>>>>>>>>>>>> chance to >>>>>>>>>>>>>>>>> generate a >>>>>>>>>>>>>>>>> mapping that can be reasonable used for continuing) >>>>>>>>>>>>>>>>> make consumers stop at appropriate points and >>>>>>>>>>>>>>>>> continue >>>>>>>>>>>>>>>>> consumption >>>>>>>>>>>>>>>>> with offsets from the mapping. >>>>>>>>>>>>>>>>> have the producers stop for a minimal time. >>>>>>>>>>>>>>>>> wait for mirrormaker to finish >>>>>>>>>>>>>>>>> let producer produce with the new metadata. 
>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Instead of implementing the approach suggest in the KIP >>>>>>>>>>>>>>>>> which >>>>>>>>>>>>>>>>> will >>>>>>>>>>>>>>>>> leave >>>>>>>>>>>>>>>>> log compacted topic completely crumbled and unusable. >>>>>>>>>>>>>>>>> I would much rather try to build infrastructure to support >>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>> mentioned >>>>>>>>>>>>>>>>> above operations more smoothly. >>>>>>>>>>>>>>>>> Especially having producers stop and use another topic is >>>>>>>>>>>>>>>>> difficult >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> and >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> it would be nice if one can trigger "invalid metadata" >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> exceptions >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> for >>>>>>>>>>>>>>> >>>>>>>>>>>>>> them >>>>>>>>>>>>> >>>>>>>>>>>>> and >>>>>>>>>>>>>> >>>>>>>>>>>>>>> if one could give topics aliases so that their produces with >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>> old >>>>>>>>>>>>>>>>> topic >>>>>>>>>>>>>>>>> will arrive in the new topic. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> The downsides are obvious I guess ( having the same data >>>>>>>>>>>>>>>>> twice >>>>>>>>>>>>>>>>> for >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> the >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> transition period, but kafka tends to scale well with >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> datasize). >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> So >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> its a >>>>>>>>>>>>>>> >>>>>>>>>>>>>> nicer fit into the architecture. >>>>>>>>>>>>>> >>>>>>>>>>>>>>> I further want to argument that the functionality by the KIP >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> can >>>>>>>>>>>>>>>>> completely be implementing in "userland" with a custom >>>>>>>>>>>>>>>>> partitioner >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> that >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> handles the transition as needed. I would appreciate if >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> someone >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> could >>>>>>>>>>>>>>> >>>>>>>>>>>>>> point >>>>>>>>>>>>> >>>>>>>>>>>>> out what a custom partitioner couldn't handle in this case? >>>>>>>>>>>>>> >>>>>>>>>>>>>>> With the above approach, shrinking a topic becomes the same >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> steps. >>>>>>>>>>>>>>>>> Without >>>>>>>>>>>>>>>>> loosing keys in the discontinued partitions. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Would love to hear what everyone thinks. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Best Jan >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> On 11.02.2018 00:35, Dong Lin wrote: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Hi all, >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> I have created KIP-253: Support in-order message delivery >>>>>>>>>>>>>>>>> with >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> partition >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> expansion. See >>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confl >>>>>>>>>>>>>>>>>> uence/display/KAFKA/KIP-253% >>>>>>>>>>>>>>>>>> 3A+Support+in-order+message+de >>>>>>>>>>>>>>>>>> livery+with+partition+expansio >>>>>>>>>>>>>>>>>> n >>>>>>>>>>>>>>>>>> . 
>>>>>>>>>>>>>>>>>> This KIP provides a way to allow messages of the same key from the same producer to be consumed in the same order they are produced even if we expand the partitions of the topic.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>> Dong
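Jan asks earlier in the thread what a custom partitioner couldn't handle. A minimal sketch of the userland producer-side piece, modeled on Kafka's DefaultPartitioner, might look like the following; the effective.partition.count config and the cutover procedure around it are assumptions for illustration, not part of the KIP:

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

// Hashes against a pinned partition count even after partitions have been added, and only
// switches to the full count once the operator flips the (made-up) config, i.e. once the
// change management system has decided all producers may move. Assumes keyed records.
public class PinnedCountPartitioner implements Partitioner {

    private volatile int effectiveCount;

    @Override
    public void configure(Map<String, ?> configs) {
        effectiveCount = Integer.parseInt(String.valueOf(configs.get("effective.partition.count")));
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int actualCount = cluster.partitionCountForTopic(topic);
        // Keep in-flight expansions invisible until every producer has been switched,
        // but never exceed what the broker really has.
        int count = Math.min(effectiveCount, actualCount);
        return Utils.toPositive(Utils.murmur2(keyBytes)) % count;
    }

    @Override
    public void close() {}
}

What such a partitioner cannot cover on its own are the parts Jun lists above: fencing producers during the cutover, preserving per-key ordering across the switch, and migrating consumer state.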