As I just mentioned joins: for Kafka Streams it might also be necessary to change the partition count of multiple topics in a coordinated way, so that the co-partitioning property that Kafka Streams relies on to compute joins is maintained.
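(For illustration, a minimal sketch of such a coordinated expansion using the Kafka AdminClient. The topic names and target count are hypothetical, and note that the broker applies the request per topic, not atomically across topics:)

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewPartitions;

public class CoPartitionedExpansion {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            int newCount = 8; // illustrative target partition count
            // Grow all co-partitioned topics to the same count in one request.
            // Since this is not atomic across topics, a partial failure would
            // have to be reconciled before any join runs against these topics.
            Map<String, NewPartitions> increase = new HashMap<>();
            increase.put("orders", NewPartitions.increaseTo(newCount));    // hypothetical topic
            increase.put("customers", NewPartitions.increaseTo(newCount)); // hypothetical topic
            admin.createPartitions(increase).all().get();
        }
    }
}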
Any thoughts on how this could be handled?

-Matthias

On 3/8/18 10:08 PM, Matthias J. Sax wrote:
> Jun,
>
> There is one more case: non-windowed aggregations. For windowed aggregation, the changelog topic will be compact+delete. However, for non-windowed aggregation the policy is compact only.
>
> Even if we assume that windowed aggregations are dominant and non-windowed aggregations are used rarely, it seems bad not to support the feature if a non-windowed aggregation is used. Also, non-windowed aggregation volume depends on the input-stream volume, which might be high.
>
> Furthermore, we support stream-table joins, and this requires that the stream and the table are co-partitioned. Thus, even if the table has lower volume, when the stream must be scaled out we also need to scale out the table to preserve co-partitioning.
>
> -Matthias
>
> On 3/8/18 6:44 PM, Jun Rao wrote:
>> Hi, Matthias,
>>
>> My understanding is that in KStream, the only case where a changelog topic needs to be compacted is when the corresponding input is a KTable. In all other cases, the changelog topics are compacted + deletion. So, if most KTables are not high volume, there may be no need to expand their partitions, and therefore no need to expand the partitions of the corresponding changelog topics.
>>
>> Thanks,
>>
>> Jun
>>
>> On Wed, Mar 7, 2018 at 2:34 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>
>>> Jun,
>>>
>>> thanks for your comment. This should actually work for Streams, because we don't rely on producer "hashing" but specify the partition number explicitly on send().
>>>
>>> About not allowing the partition count of changelog topics to change: for Streams, this seems to imply that we would need to create a second changelog topic for each store with the new partition count. However, it would be unclear when/if we could delete the old topic. Thus, it moves the "problem" into the application layer. It's hard for me to judge atm what the impact would be, but it's something we should pay attention to.
>>>
>>> -Matthias
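(For reference, the difference Matthias points at, partitioner-derived vs. explicitly pinned partitions, in a minimal sketch; the topic, key, and partition values are illustrative:)

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ExplicitPartitionSend {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Plain producer path: the partition is derived from hash(key) by the
            // configured partitioner, so it shifts if the hashing scheme changes.
            producer.send(new ProducerRecord<>("some-changelog", "key", "value"));
            // Streams-style path: the partition number is passed explicitly, so
            // the application, not the partitioner, controls where each key lands.
            int partition = 3; // illustrative: computed by the application itself
            producer.send(new ProducerRecord<>("some-changelog", partition, "key", "value"));
        }
    }
}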
>>> On 3/6/18 3:45 PM, Jun Rao wrote:
>>>> Hi, Matthias,
>>>>
>>>> Regarding your comment "If it would be time-delay based, it might be problematic for Kafka Streams: if we get the information that the new input partitions are available for producing, we need to enable the new changelog partitions for producing, too. If those would not be available yet, because the time-delay did not trigger yet, it would be problematic to avoid crashing.", could you just enable the changelog topic to write to its new partitions immediately? The input topic can be configured with a delay in writing to the new partitions. Initially, there won't be new data produced into the newly added partitions in the input topic. However, we could prebuild the state for the new input partitions and write the state changes to the corresponding new partitions in the changelog topic.
>>>>
>>>> Hi, Jan,
>>>>
>>>> For a compacted topic, garbage collecting the old keys in the existing partitions after partition expansion can be tricky, as you pointed out. A few options here. (a) Let brokers exchange keys across brokers during compaction. This will add complexity on the broker side. (b) Build an external tool that scans the compacted topic and drops the prefix of a partition if all records in the prefix are removable. The admin can then run this tool when the unneeded space needs to be reclaimed. (c) Don't support partition changes in a compacted topic. This might be ok since most compacted topics are not high volume.
>>>>
>>>> Thanks,
>>>>
>>>> Jun
>>>>
>>>> On Tue, Mar 6, 2018 at 10:38 AM, Dong Lin <lindon...@gmail.com> wrote:
>>>>
>>>>> Hi everyone,
>>>>>
>>>>> Thanks for all the comments! It appears that everyone prefers linear hashing because it reduces the amount of state that needs to be moved between consumers (for stream processing). The KIP has been updated to use linear hashing.
>>>>>
>>>>> Regarding the migration endeavor: it seems that migrating the producer library to use linear hashing should be pretty straightforward, without much operational effort. If we don't upgrade the client library to use this KIP, we cannot support in-order delivery after the partition count is changed anyway. Suppose we do upgrade the client library to use this KIP: if the partition number is not changed, the key -> partition mapping will be exactly the same as it is now, because it is still determined using murmur_hash(key) % original_partition_num. In other words, this change is backward compatible.
>>>>>
>>>>> Regarding the load distribution: if we use linear hashing, the load may be unevenly distributed because the partitions which are not split may receive twice as much traffic as the partitions that are split. This issue can be mitigated by creating topics with several times as many partitions as consumers. And there will be no imbalance if the partition number is always doubled. So this imbalance seems acceptable.
>>>>>
>>>>> Regarding storing the partitioning strategy as a per-topic config: it seems unnecessary, since we can still use murmur_hash as the default hash function and additionally apply the linear hashing algorithm if the partition number has increased. I am not sure there is any use case for a producer to use a different hash function. Jason, can you check if there is some use case that I missed for using the per-topic partitioning strategy?
>>>>>
>>>>> Regarding how to reduce latency (due to state store/load) in stream processing consumers when the partition number changes: I need to read the Kafka Streams code to understand how Kafka Streams currently migrates state between consumers when an instance is added to/removed from a given job. I will reply after I finish reading the documentation and code.
>>>>>
>>>>> Thanks,
>>>>> Dong
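(To make the scheme concrete, a minimal sketch of the linear-hashing lookup described above. murmur2 and toPositive are Kafka's existing utility methods; the rest is an illustrative reading of the KIP, not its final code:)

import org.apache.kafka.common.utils.Utils;

public class LinearHashingPartitioning {
    /**
     * Maps a key to a partition under linear hashing. When currentCount ==
     * originalCount this is exactly murmur2(key) % originalCount, which is
     * why the change is backward compatible; after an expansion, only the
     * keys of already-split partitions move.
     */
    public static int partitionFor(byte[] keyBytes, int originalCount, int currentCount) {
        int hash = Utils.toPositive(Utils.murmur2(keyBytes));
        // Largest originalCount * 2^level that still fits into currentCount.
        int base = originalCount;
        while (base * 2 <= currentCount) {
            base *= 2;
        }
        int partition = hash % base;
        // Partitions below the split pointer have already been split in two,
        // so their keys are re-hashed at the next doubling level.
        if (partition < currentCount - base) {
            partition = hash % (base * 2);
        }
        return partition;
    }
}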
>>>>> On Mon, Mar 5, 2018 at 10:43 AM, Jason Gustafson <ja...@confluent.io> wrote:
>>>>>
>>>>>> Great discussion. I am wondering whether we can continue to leave Kafka agnostic to the partitioning strategy. The challenge is communicating the partitioning logic from producers to consumers so that the dependencies between each epoch can be determined. For the sake of discussion, imagine you did something like the following:
>>>>>>
>>>>>> 1. The name (and perhaps version) of a partitioning strategy is stored in the topic configuration when a topic is created.
>>>>>> 2. The producer looks up the partitioning strategy before writing to a topic and includes it in the produce request (for fencing). If it doesn't have an implementation for the configured strategy, it fails.
>>>>>> 3. The consumer also looks up the partitioning strategy and uses it to determine dependencies when reading a new epoch. It could either fail or make the most conservative dependency assumptions if it doesn't know how to implement the partitioning strategy. For the consumer, the new interface might look something like this:
>>>>>>
>>>>>> // Return the partition dependencies following an epoch bump
>>>>>> Map<Integer, List<Integer>> dependencies(int numPartitionsBeforeEpochBump, int numPartitionsAfterEpochBump)
>>>>>>
>>>>>> The unordered case then is just a particular implementation which never has any epoch dependencies. To implement this, we would need some way for the consumer to find out how many partitions there were in each epoch, but maybe that's not too unreasonable.
>>>>>>
>>>>>> Thanks,
>>>>>> Jason
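(One hedged reading of how the proposed hook could be implemented for linear hashing when the partition count doubles. The interface is Jason's; the body and its dependency semantics are illustrative guesses:)

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DoublingDependencies {
    // Return the partition dependencies following an epoch bump. The reading
    // assumed here: before consuming new partition p past the bump, the
    // listed parent partitions must be consumed up to the epoch boundary.
    public Map<Integer, List<Integer>> dependencies(int numPartitionsBeforeEpochBump,
                                                    int numPartitionsAfterEpochBump) {
        Map<Integer, List<Integer>> deps = new HashMap<>();
        for (int p = numPartitionsBeforeEpochBump; p < numPartitionsAfterEpochBump; p++) {
            // Under doubling with linear hashing, new partition p receives the
            // keys split off old partition p - numPartitionsBeforeEpochBump.
            deps.put(p, Collections.singletonList(p - numPartitionsBeforeEpochBump));
        }
        return deps;
    }
}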
>>>>>> On Mon, Mar 5, 2018 at 4:51 AM, Jan Filipiak <jan.filip...@trivago.com> wrote:
>>>>>>
>>>>>>> Hi Dong,
>>>>>>>
>>>>>>> thank you very much for your questions.
>>>>>>>
>>>>>>> Regarding the time spent copying data across: it is correct that copying data from a topic with one partition mapping to a topic with a different partition mapping takes way longer than we can stop producers for. Tens of minutes is a very optimistic estimate here. Many people cannot afford to copy at full steam and therefore will have some rate limiting in place; this can push the timespan into days. The good part is that the vast majority of the data can be copied while the producers are still going. One can then piggyback the consumers on top of this timeframe, by the method mentioned (provide them a mapping from their old offsets to new offsets in their repartitioned topics). In that way we separate migration of consumers from migration of producers (decoupling these is what Kafka is strongest at). The time to actually swap over the producers should be kept minimal by ensuring that, when a swap attempt is started, the consumer copying over is very close to the log end and is expected to finish within the next fetch. The operation should have a timeout and should be re-attemptable.
>>>>>>>
>>>>>>> Importance of log compaction: if a producer produces key A to partition 0, it is forever going to be there unless it gets deleted. The record might sit there for years. A new producer started with the new partitions will fail to delete the record in the correct partition. The record will be there forever and one cannot reliably bootstrap new consumers. I cannot see how linear hashing can solve this.
>>>>>>>
>>>>>>> Regarding your skipping of userland copying: 100%, copying the data across in userland is, as far as I can see, only a use case for log-compacted topics. Even for log compaction + retention it should only be opt-in. Why did I bring it up? I think log compaction is a very important feature to really embrace Kafka as a "data platform". The point I also want to make is that copying data this way is completely in line with the Kafka architecture: it only consists of reading from and writing to topics.
>>>>>>>
>>>>>>> I hope this clarifies more why I think we should aim for more than the current KIP. I fear that once the KIP is done, not much more effort will be taken.
>>>>>>>
>>>>>>> On 04.03.2018 02:28, Dong Lin wrote:
>>>>>>>
>>>>>>>> Hey Jan,
>>>>>>>>
>>>>>>>> In the current proposal, the consumer will be blocked waiting for other consumers of the group to consume up to a given offset. In most cases, all consumers should be close to the LEO of the partitions when the partition expansion happens, so the time spent waiting should not be long, e.g. on the order of seconds. On the other hand, it may take a long time to wait for an entire partition to be copied -- the amount of time is proportional to the amount of existing data in the partition, which can take tens of minutes. So the amounts of time that we stop consumers for are not on the same order of magnitude.
>>>>>>>>
>>>>>>>> If we can implement this suggestion without copying data over, in pure userland, it will be much more valuable. Do you have ideas on how this can be done?
>>>>>>>>
>>>>>>>> Not sure why the current KIP would not help people who depend on log compaction. Could you elaborate more on this point?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Dong
>>>>>>>>
>>>>>>>> On Wed, Feb 28, 2018 at 10:55 PM, Jan Filipiak <Jan.Filipiak@trivago.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Dong,
>>>>>>>>>
>>>>>>>>> I tried to focus on the steps one can currently perform to expand or shrink a keyed topic while maintaining top-notch semantics. I can understand that there might be confusion about "stopping the consumer". It is exactly the same as proposed in the KIP: there needs to be a time at which the producers agree on the new partitioning. The extra semantics I want to put in there is that we have the possibility to wait until all the existing data is copied over into the new partitioning scheme. When I say stopping, I think more of having a memory barrier that ensures the ordering. I am still aiming for latencies on the scale of leader failovers.
>>>>>>>>>
>>>>>>>>> Consumers have to explicitly adopt the new partitioning scheme in the above scenario. The reason is that in the cases where you depend on a particular partitioning scheme, you frequently also have other topics with co-partitioning enforcement or the like. Therefore all your other input topics might need to grow accordingly.
>>>>>>>>>
>>>>>>>>> What I was suggesting was to streamline all these operations as well as possible, to have "real" partition growth and shrinkage going on. Migrating the producers to a new partitioning scheme can be made much more streamlined with proper broker support. Migrating consumers is a step that might be made completely unnecessary if, for example, Streams takes the gcd as the partitioning scheme instead of enforcing 1 to 1. Connect consumers and other consumers should be fine anyway.
>>>>>>>>>
>>>>>>>>> I hope this makes clearer what I was aiming at.
>>>>>>>>> The rest needs to be figured out. The only danger I see is that if we introduce this feature as proposed in the KIP, it won't help any people depending on log compaction.
>>>>>>>>>
>>>>>>>>> The other thing I wanted to mention is that I believe the current suggestion (without copying data over) can be implemented in pure userland with a custom partitioner and a small feedback loop from ProduceResponse => Partitioner, in cooperation with a change-management system.
>>>>>>>>>
>>>>>>>>> Best Jan
>>>>>>>>>
>>>>>>>>> On 28.02.2018 07:13, Dong Lin wrote:
>>>>>>>>>
>>>>>>>>>> Hey Jan,
>>>>>>>>>>
>>>>>>>>>> I am not sure it is acceptable for the producer to be stopped for a while, particularly for online applications which require low latency. I am also not sure how consumers can switch to a new topic. Does the user application need to explicitly specify a different topic for the producer/consumer to subscribe to? It would be helpful for the discussion if you could provide more detail on the interface changes for this solution.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Dong
>>>>>>>>>>
>>>>>>>>>> On Mon, Feb 26, 2018 at 12:48 AM, Jan Filipiak <Jan.Filipiak@trivago.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> just want to throw my thoughts in. In general the functionality is very useful; we should, though, not try too hard to pin down the architecture while implementing.
>>>>>>>>>>>
>>>>>>>>>>> The manual steps would be to:
>>>>>>>>>>>
>>>>>>>>>>> 1. create a new topic
>>>>>>>>>>> 2. then mirror-make from the old topic to the new topic
>>>>>>>>>>> 3. wait for mirror making to catch up
>>>>>>>>>>> 4. then put the consumers onto the new topic (having mirrormaker spit out a mapping from old offsets to new offsets: if the topic is increased by factor X, there is going to be a clean mapping from 1 offset in the old topic to X offsets in the new topic; if there is no such factor, then there is no chance to generate a mapping that can reasonably be used for continuing)
>>>>>>>>>>> 5. make consumers stop at appropriate points and continue consumption with offsets from the mapping
>>>>>>>>>>> 6. have the producers stop for a minimal time
>>>>>>>>>>> 7. wait for mirrormaker to finish
>>>>>>>>>>> 8. let the producers produce with the new metadata
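(A hedged sketch of the offset mapping in step 4, since that is the load-bearing piece: the copy job checkpoints its write positions in the X child partitions against old offsets, and a consumer resumes from the nearest checkpoint at or below its committed offset. All names are illustrative; no such tool exists in Kafka:)

import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

public class OffsetMapping {
    // old-topic offset -> next write offset in each of the X new partitions
    // at the moment that old offset had been copied
    private final NavigableMap<Long, long[]> checkpoints = new TreeMap<>();
    private final int factorX;

    public OffsetMapping(int factorX) {
        this.factorX = factorX;
    }

    // Called by the copy job after mirroring the record at oldOffset.
    public void record(long oldOffset, long[] nextNewOffsets) {
        checkpoints.put(oldOffset + 1, nextNewOffsets.clone());
    }

    // A consumer committed at oldCommittedOffset resumes each child partition
    // at the closest checkpoint at or below it: records between the checkpoint
    // and the true position are re-read, but none are skipped.
    public long[] restartOffsets(long oldCommittedOffset) {
        Map.Entry<Long, long[]> e = checkpoints.floorEntry(oldCommittedOffset);
        return e != null ? e.getValue().clone() : new long[factorX];
    }
}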
>>>>>>>>>>> Instead of implementing the approach suggested in the KIP, which will leave log-compacted topics completely crumbled and unusable, I would much rather try to build infrastructure to support the operations mentioned above more smoothly. Especially having producers stop and use another topic is difficult, and it would be nice if one could trigger "invalid metadata" exceptions for them, and if one could give topics aliases so that their produces to the old topic will arrive in the new topic.
>>>>>>>>>>>
>>>>>>>>>>> The downsides are obvious, I guess (having the same data twice for the transition period, but Kafka tends to scale well with data size). So it's a nicer fit into the architecture.
>>>>>>>>>>>
>>>>>>>>>>> I further want to argue that the functionality of the KIP can be implemented completely in "userland" with a custom partitioner that handles the transition as needed. I would appreciate it if someone could point out what a custom partitioner couldn't handle in this case.
>>>>>>>>>>>
>>>>>>>>>>> With the above approach, shrinking a topic follows the same steps, without losing keys in the discontinued partitions.
>>>>>>>>>>>
>>>>>>>>>>> Would love to hear what everyone thinks.
>>>>>>>>>>>
>>>>>>>>>>> Best Jan
>>>>>>>>>>>
>>>>>>>>>>> On 11.02.2018 00:35, Dong Lin wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>
>>>>>>>>>>>> I have created KIP-253: Support in-order message delivery with partition expansion. See https://cwiki.apache.org/confluence/display/KAFKA/KIP-253%3A+Support+in-order+message+delivery+with+partition+expansion .
>>>>>>>>>>>>
>>>>>>>>>>>> This KIP provides a way to allow messages of the same key from the same producer to be consumed in the same order they are produced, even if we expand the partitions of the topic.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Dong
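(Finally, a rough sketch of the "userland" custom partitioner Jan describes. The Partitioner interface is Kafka's real producer plug-in point; the epoch feedback mechanism and the modulo scheme are illustrative assumptions:)

import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

public class EpochSwitchingPartitioner implements Partitioner {
    // Partition count agreed on for the current epoch, flipped by an external
    // change-management system rather than read from live topic metadata.
    private final AtomicInteger epochPartitionCount = new AtomicInteger(4); // illustrative old count

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        // Hash against the epoch's agreed count, not cluster.partitionsForTopic(topic),
        // so all producers change scheme at the same agreed point in time.
        byte[] bytes = keyBytes != null ? keyBytes : new byte[0]; // sketch: keyed records expected
        return Utils.toPositive(Utils.murmur2(bytes)) % epochPartitionCount.get();
    }

    // Feedback hook: invoked when change management announces the new epoch.
    public void advanceEpoch(int newPartitionCount) {
        epochPartitionCount.set(newPartitionCount);
    }

    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public void close() {}
}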