Jun,

There is one more case: non-windowed aggregations. For windowed aggregations, the changelog topic will be compact+delete. However, for non-windowed aggregations the policy is compact only.
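To make the two cases concrete, here is a minimal Kafka Streams sketch (topic name and exact API details are illustrative only): the windowed aggregation's changelog can use compact+delete because expired windows can eventually be dropped, while the non-windowed aggregation's changelog must remain compact-only since the latest value of every key has to be retained.

import java.time.Duration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;

public class ChangelogPolicies {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input = builder.stream("input-topic"); // hypothetical input topic

        // Windowed aggregation: the backing changelog can be compact+delete,
        // because records for expired windows can eventually be deleted.
        input.groupByKey()
             .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
             .count();

        // Non-windowed aggregation: the backing changelog is compact only,
        // because the latest value for every key must be kept indefinitely.
        input.groupByKey()
             .count();

        System.out.println(builder.build().describe());
    }
}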
Even if we assume that windowed aggregations are dominant and non-windowed aggregations are used rarely, it seems bad not to support the feature if a non-windowed aggregation is used. Also, non-windowed aggregation volume depends on input-stream volume, which might be high. Furthermore, we support stream-table joins, and this requires that the stream and the table are co-partitioned. Thus, even if the table has lower volume, if the stream must be scaled out we also need to scale out the table to preserve co-partitioning.

-Matthias

On 3/8/18 6:44 PM, Jun Rao wrote:
> Hi, Matthias,
>
> My understanding is that in KStream, the only case when a changelog topic needs to be compacted is when the corresponding input is a KTable. In all other cases, the changelog topics are compacted + deletion. So, if most KTables are not high volume, there may not be a need to expand their partitions and therefore the partitions of the corresponding changelog topics.
>
> Thanks,
>
> Jun
>
> On Wed, Mar 7, 2018 at 2:34 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>
>> Jun,
>>
>> thanks for your comment. This should actually work for Streams, because we don't rely on producer "hashing" but specify the partition number explicitly on send().
>>
>> About not allowing to change the number of partitions for changelog topics: for Streams, this seems to imply that we need to create a second changelog topic for each store with the new partition count. However, it would be unclear when/if we can delete the old topic. Thus, it moves the "problem" into the application layer. It's hard for me to judge atm what the impact would be, but it's something we should pay attention to.
>>
>> -Matthias
>>
>> On 3/6/18 3:45 PM, Jun Rao wrote:
>>> Hi, Matthias,
>>>
>>> Regarding your comment "If it would be time-delay based, it might be problematic for Kafka Streams: if we get the information that the new input partitions are available for producing, we need to enable the new changelog partitions for producing, too. If those would not be available yet, because the time-delay did not trigger yet, it would be problematic to avoid crashing.", could you just enable the changelog topic to write to its new partitions immediately? The input topic can be configured with a delay in writing to the new partitions. Initially, there won't be new data produced into the newly added partitions in the input topic. However, we could prebuild the state for the new input partition and write the state changes to the corresponding new partitions in the changelog topic.
>>>
>>> Hi, Jan,
>>>
>>> For a compacted topic, garbage collecting the old keys in the existing partitions after partition expansion can be tricky, as you pointed out. A few options here. (a) Let brokers exchange keys across brokers during compaction. This will add complexity on the broker side. (b) Build an external tool that scans the compacted topic and drops the prefix of a partition if all records in the prefix are removable. The admin can then run this tool when the unneeded space needs to be reclaimed. (c) Don't support partition change in a compacted topic. This might be ok since most compacted topics are not high volume.
>>>
>>> Thanks,
>>>
>>> Jun
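To make option (b) a bit more concrete, here is a rough sketch of such an external tool (this is not an existing utility; the topic name, the murmur2-based mapping, and the assumption that every re-mapped key has already been re-produced to its new partition are all illustrative). It scans one partition from the beginning, finds the longest prefix whose keys all map elsewhere under the new partition count, and reclaims that prefix via deleteRecords:

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.Utils;

public class CompactedPrefixReclaimer {
    public static void main(String[] args) throws Exception {
        String topic = "compacted-topic";   // hypothetical topic
        int partition = 0;
        int newPartitionCount = 8;          // partition count after expansion

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "prefix-reclaimer");
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());

        TopicPartition tp = new TopicPartition(topic, partition);
        long deletableUpTo = -1L;

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(tp));
            consumer.seekToBeginning(Collections.singletonList(tp));
            long endOffset = consumer.endOffsets(Collections.singletonList(tp)).get(tp);
            boolean stop = false;
            while (!stop && consumer.position(tp) < endOffset) {
                for (ConsumerRecord<byte[], byte[]> rec : consumer.poll(Duration.ofSeconds(1))) {
                    int target = Utils.toPositive(Utils.murmur2(rec.key())) % newPartitionCount;
                    if (target == partition) {
                        // This key still lives here, so the removable prefix ends.
                        stop = true;
                        break;
                    }
                    // Key now maps to another partition; assuming its latest value has
                    // already been (re)produced there, this record is removable.
                    deletableUpTo = rec.offset() + 1;
                }
            }
        }

        if (deletableUpTo > 0) {
            try (Admin admin = Admin.create(Map.of("bootstrap.servers", "localhost:9092"))) {
                admin.deleteRecords(Map.of(tp, RecordsToDelete.beforeOffset(deletableUpTo)))
                     .all().get();
            }
        }
    }
}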
>>> On Tue, Mar 6, 2018 at 10:38 AM, Dong Lin <lindon...@gmail.com> wrote:
>>>
>>>> Hi everyone,
>>>>
>>>> Thanks for all the comments! It appears that everyone prefers linear hashing because it reduces the amount of state that needs to be moved between consumers (for stream processing). The KIP has been updated to use linear hashing.
>>>>
>>>> Regarding the migration endeavor: it seems that migrating the producer library to use linear hashing should be pretty straightforward without much operational endeavor. If we don't upgrade the client library to use this KIP, we cannot support in-order delivery after the partition count is changed anyway. Suppose we upgrade the client library to use this KIP: if the partition number is not changed, the key -> partition mapping will be exactly the same as it is now because it is still determined using murmur_hash(key) % original_partition_num. In other words, this change is backward compatible.
>>>>
>>>> Regarding the load distribution: if we use linear hashing, the load may be unevenly distributed because those partitions which are not split may receive twice as much traffic as the partitions that are split. This issue can be mitigated by creating the topic with a partition count that is several times the number of consumers. And there will be no imbalance if the partition number is always doubled. So this imbalance seems acceptable.
>>>>
>>>> Regarding storing the partition strategy as a per-topic config: it seems unnecessary since we can still use murmur_hash as the default hash function and additionally apply the linear hashing algorithm if the partition number has increased. Not sure if there is any use-case for a producer to use a different hash function. Jason, can you check if there is some use-case that I missed for using the per-topic partition strategy?
>>>>
>>>> Regarding how to reduce latency (due to state store/load) in stream processing consumers when the partition number changes: I need to read the Kafka Streams code to understand how Kafka Streams currently migrates state between consumers when an application instance is added/removed for a given job. I will reply after I finish reading the documentation and code.
>>>>
>>>> Thanks,
>>>> Dong
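For illustration, the key -> partition mapping described above could look roughly like the following sketch (the helper is made up for this discussion and reuses Kafka's murmur2; the exact code adopted by the KIP may differ). A key keeps its old partition unless that partition has already been split, in which case it rehashes against the enlarged range:

import org.apache.kafka.common.utils.Utils;

public class LinearHashingPartitioning {

    // Maps a key to a partition using linear hashing. With currentCount == originalCount
    // this degenerates to murmur2(key) % originalCount, so existing topics keep their
    // current key -> partition mapping.
    static int partitionFor(byte[] key, int originalCount, int currentCount) {
        int hash = Utils.toPositive(Utils.murmur2(key));
        // Smallest power-of-two multiple of the original count that covers the current count.
        int levelSize = originalCount;
        while (levelSize < currentCount) {
            levelSize *= 2;
        }
        int p = hash % levelSize;
        if (p >= currentCount) {
            // That partition has not been created yet, so the key still lives
            // in its partition from the previous level.
            p = hash % (levelSize / 2);
        }
        return p;
    }

    public static void main(String[] args) {
        byte[] key = "some-key".getBytes();
        System.out.println(partitionFor(key, 4, 4)); // before expansion: 4 partitions
        System.out.println(partitionFor(key, 4, 6)); // after expansion to 6 partitions
    }
}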
>>>> On Mon, Mar 5, 2018 at 10:43 AM, Jason Gustafson <ja...@confluent.io> wrote:
>>>>
>>>>> Great discussion. I think I'm wondering whether we can continue to leave Kafka agnostic to the partitioning strategy. The challenge is communicating the partitioning logic from producers to consumers so that the dependencies between each epoch can be determined. For the sake of discussion, imagine you did something like the following:
>>>>>
>>>>> 1. The name (and perhaps version) of a partitioning strategy is stored in topic configuration when a topic is created.
>>>>> 2. The producer looks up the partitioning strategy before writing to a topic and includes it in the produce request (for fencing). If it doesn't have an implementation for the configured strategy, it fails.
>>>>> 3. The consumer also looks up the partitioning strategy and uses it to determine dependencies when reading a new epoch. It could either fail or make the most conservative dependency assumptions if it doesn't know how to implement the partitioning strategy.
>>>>>
>>>>> For the consumer, the new interface might look something like this:
>>>>>
>>>>> // Return the partition dependencies following an epoch bump
>>>>> Map<Integer, List<Integer>> dependencies(int numPartitionsBeforeEpochBump, int numPartitionsAfterEpochBump)
>>>>>
>>>>> The unordered case then is just a particular implementation which never has any epoch dependencies. To implement this, we would need some way for the consumer to find out how many partitions there were in each epoch, but maybe that's not too unreasonable.
>>>>>
>>>>> Thanks,
>>>>> Jason
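As a concrete (purely illustrative, not part of the proposal) example of that interface, a linear-hashing strategy restricted to the simple doubling case could report dependencies like this: each pre-existing partition depends only on itself, and each newly added partition depends on the single old partition it was split from.

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DoublingLinearHashingStrategy {

    // Return the partition dependencies following an epoch bump.
    // Assumes numPartitionsAfterEpochBump == 2 * numPartitionsBeforeEpochBump.
    Map<Integer, List<Integer>> dependencies(int numPartitionsBeforeEpochBump,
                                             int numPartitionsAfterEpochBump) {
        Map<Integer, List<Integer>> deps = new HashMap<>();
        for (int p = 0; p < numPartitionsAfterEpochBump; p++) {
            if (p < numPartitionsBeforeEpochBump) {
                // An old partition keeps a subset of its own keys; no cross-partition dependency.
                deps.put(p, Collections.singletonList(p));
            } else {
                // A new partition only receives keys split off from partition (p - oldCount).
                deps.put(p, Collections.singletonList(p - numPartitionsBeforeEpochBump));
            }
        }
        return deps;
    }
}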
>>>>> On Mon, Mar 5, 2018 at 4:51 AM, Jan Filipiak <jan.filip...@trivago.com> wrote:
>>>>>
>>>>>> Hi Dong
>>>>>>
>>>>>> thank you very much for your questions.
>>>>>>
>>>>>> Regarding the time spent copying data across: It is correct that copying data from a topic with one partition mapping to a topic with a different partition mapping takes way longer than we can stop producers. Tens of minutes is a very optimistic estimate here. Many people cannot afford to copy at full steam and therefore will have some rate limiting in place; this can bump the timespan into days. The good part is that the vast majority of the data can be copied while the producers are still going. One can then piggyback the consumers on top of this timeframe, by the method mentioned (provide them a mapping from their old offsets to new offsets in their repartitioned topics). In that way we separate migration of consumers from migration of producers (decoupling these is what Kafka is strongest at). The time to actually swap over the producers should be kept minimal by ensuring that when a swap attempt is started the consumer copying over should be very close to the log end and is expected to finish within the next fetch. The operation should have a time-out and should be re-attemptable.
>>>>>>
>>>>>> Importance of log compaction: If a producer produces key A to partition 0, it is forever going to be there, unless it gets deleted. The record might sit in there for years. A new producer started with the new partitions will fail to delete the record in the correct partition. The record will be there forever and one cannot reliably bootstrap new consumers. I cannot see how linear hashing can solve this.
>>>>>>
>>>>>> Regarding your skipping of userland copying: 100%, copying the data across in userland is, as far as I can see, only a use case for log-compacted topics. Even for log compaction + retention it should only be opt-in. Why did I bring it up? I think log compaction is a very important feature to really embrace Kafka as a "data platform". The point I also want to make is that copying data this way is completely in line with the Kafka architecture: it only consists of reading and writing to topics.
>>>>>>
>>>>>> I hope it clarifies more why I think we should aim for more than the current KIP. I fear that once the KIP is done not much more effort will be taken.
>>>>>>
>>>>>> On 04.03.2018 02:28, Dong Lin wrote:
>>>>>>
>>>>>>> Hey Jan,
>>>>>>>
>>>>>>> In the current proposal, the consumer will be blocked on waiting for other consumers of the group to consume up to a given offset. In most cases, all consumers should be close to the LEO of the partitions when the partition expansion happens. Thus the time waiting should not be long, e.g. on the order of seconds. On the other hand, it may take a long time to wait for the entire partition to be copied -- the amount of time is proportional to the amount of existing data in the partition, which can take tens of minutes. So the amount of time that we stop consumers may not be on the same order of magnitude.
>>>>>>>
>>>>>>> If we can implement this suggestion without copying data over in pure userland, it will be much more valuable. Do you have ideas on how this can be done?
>>>>>>>
>>>>>>> Not sure why the current KIP does not help people who depend on log compaction. Could you elaborate more on this point?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Dong
>>>>>>>
>>>>>>> On Wed, Feb 28, 2018 at 10:55 PM, Jan Filipiak <Jan.Filipiak@trivago.com> wrote:
>>>>>>>
>>>>>>>> Hi Dong,
>>>>>>>>
>>>>>>>> I tried to focus on what the steps are that one can currently perform to expand or shrink a keyed topic while maintaining top-notch semantics. I can understand that there might be confusion about "stopping the consumer". It is exactly the same as proposed in the KIP: there needs to be a time the producers agree on the new partitioning. The extra semantics I want to put in there is that we have the possibility to wait until all the existing data is copied over into the new partitioning scheme. When I say stopping, I think more of having a memory barrier that ensures the ordering. I am still aiming for latencies on the scale of leader failovers.
>>>>>>>>
>>>>>>>> Consumers have to explicitly adapt to the new partitioning scheme in the above scenario. The reason is that in these cases where you are dependent on a particular partitioning scheme, you also have other topics that have co-partition enforcements or the like -- frequently. Therefore all your other input topics might need to grow accordingly.
>>>>>>>>
>>>>>>>> What I was suggesting was to streamline all these operations as best as possible to have "real" partition growth and shrinkage going on. Migrating the producers to a new partitioning scheme can be much more streamlined with proper broker support for this. Migrating consumers is a step that might be made completely unnecessary if - for example Streams - takes the gcd as partitioning scheme instead of enforcing 1 to 1. Connect consumers and other consumers should be fine anyway.
>>>>>>>>
>>>>>>>> I hope this makes it more clear where I was aiming at. The rest needs to be figured out. The only danger I see is that when we are introducing this feature as proposed in the KIP, it won't help any people depending on log compaction.
>>>>>>>> The other thing I wanted to mention is that I believe the current suggestion (without copying data over) can be implemented in pure userland with a custom partitioner and a small feedback loop from ProduceResponse => Partitioner in cooperation with a change management system.
>>>>>>>>
>>>>>>>> Best Jan
>>>>>>>>
>>>>>>>> On 28.02.2018 07:13, Dong Lin wrote:
>>>>>>>>
>>>>>>>>> Hey Jan,
>>>>>>>>>
>>>>>>>>> I am not sure if it is acceptable for producers to be stopped for a while, particularly for online applications which require low latency. I am also not sure how consumers can switch to a new topic. Does the user application need to explicitly specify a different topic for the producer/consumer to subscribe to? It will be helpful for the discussion if you can provide more detail on the interface change for this solution.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Dong
>>>>>>>>>
>>>>>>>>> On Mon, Feb 26, 2018 at 12:48 AM, Jan Filipiak <Jan.Filipiak@trivago.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> just want to throw my thoughts in. In general the functionality is very useful; we should though not try to find the architecture too hard while implementing.
>>>>>>>>>>
>>>>>>>>>> The manual steps would be to:
>>>>>>>>>>     create a new topic
>>>>>>>>>>     then mirror-make from the old topic to the new topic
>>>>>>>>>>     wait for mirror making to catch up
>>>>>>>>>>     then put the consumers onto the new topic (having mirrormaker spit out a mapping from old offsets to new offsets: if the topic is increased by factor X there is gonna be a clean mapping from 1 offset in the old topic to X offsets in the new topic; if there is no factor then there is no chance to generate a mapping that can be reasonably used for continuing)
>>>>>>>>>>     make consumers stop at appropriate points and continue consumption with offsets from the mapping
>>>>>>>>>>     have the producers stop for a minimal time
>>>>>>>>>>     wait for mirrormaker to finish
>>>>>>>>>>     let producers produce with the new metadata.
>>>>>>>>>>
>>>>>>>>>> Instead of implementing the approach suggested in the KIP, which will leave log-compacted topics completely crumbled and unusable, I would much rather try to build infrastructure to support the operations mentioned above more smoothly. Especially having producers stop and use another topic is difficult, and it would be nice if one could trigger "invalid metadata" exceptions for them and if one could give topics aliases so that their produces with the old topic will arrive in the new topic.
>>>>>>>>>>
>>>>>>>>>> The downsides are obvious I guess (having the same data twice for the transition period, but Kafka tends to scale well with data size). So it is a nicer fit into the architecture.
>>>>>>>>>>
>>>>>>>>>> I further want to argue that the functionality of the KIP can be completely implemented in "userland" with a custom partitioner that handles the transition as needed. I would appreciate if someone could point out what a custom partitioner couldn't handle in this case?
>>>>>>>>>>
>>>>>>>>>> With the above approach, shrinking a topic becomes the same steps, without losing keys in the discontinued partitions.
>>>>>>>>>>
>>>>>>>>>> Would love to hear what everyone thinks.
>>>>>>>>>>
>>>>>>>>>> Best Jan
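For reference, such a userland transition could start from a custom Partitioner along these lines (purely a sketch: the config keys are made up, the feedback loop from produce responses is not shown, and only keyed records are considered). Before an agreed cutover time it partitions against the old partition count, afterwards against the full current count:

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

// Sketch of a transition-aware partitioner: keyed records are partitioned against
// the old partition count until an agreed cutover time, and against the expanded
// count afterwards.
public class TransitionPartitioner implements Partitioner {

    private int oldPartitionCount;
    private long cutoverMs;

    @Override
    public void configure(Map<String, ?> configs) {
        // Hypothetical config keys supplied by a change-management system.
        oldPartitionCount = Integer.parseInt(String.valueOf(configs.get("transition.old.partitions")));
        cutoverMs = Long.parseLong(String.valueOf(configs.get("transition.cutover.ms")));
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        // Assumes keyed records; unkeyed records would need separate handling.
        int currentCount = cluster.partitionsForTopic(topic).size();
        int effectiveCount = System.currentTimeMillis() < cutoverMs
                ? Math.min(oldPartitionCount, currentCount)
                : currentCount;
        return Utils.toPositive(Utils.murmur2(keyBytes)) % effectiveCount;
    }

    @Override
    public void close() {}
}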
>>>>>>>>>> On 11.02.2018 00:35, Dong Lin wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi all,
>>>>>>>>>>>
>>>>>>>>>>> I have created KIP-253: Support in-order message delivery with partition expansion. See https://cwiki.apache.org/confluence/display/KAFKA/KIP-253%3A+Support+in-order+message+delivery+with+partition+expansion .
>>>>>>>>>>>
>>>>>>>>>>> This KIP provides a way to allow messages of the same key from the same producer to be consumed in the same order they are produced even if we expand partition of the topic.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Dong