Hey Sophie,

This looks like a very nice feature. Going through the comments, I agree with Bill that this could lead to key skew, since the earlier partitions would keep the data they already had and continue to receive more. Do you think that's a concern or side effect that this feature could bring in?

Thanks!
Sagar.

On Wed, Oct 26, 2022 at 2:15 AM Walker Carlson <wcarl...@confluent.io.invalid> wrote:

> Hey Sophie,
>
> Thanks for the KIP. I think this could be useful for a lot of cases. I also
> think that this could cause a lot of confusion.
>
> Just to make sure we are doing our best to prevent people from
> misusing this feature, I wanted to clarify a couple of things.
> 1) There will be only an interface and no "default" implementation that a
> user can plug in for the static partitioner. I am considering that when it
> comes to testing we want to make sure that we do not make our testing
> implementation available to a user.
> 2) If a user wanted to use auto scaling for a stateless application it
> should be as easy as implementing the StaticStreamsPartitioner. Their
> implementation could even just wrap the default partitioner if they wanted,
> right? I can't think of any way we could detect and then warn them about
> the output topic not being partitioned by keys if that were to happen, can
> you?
>
> Overall this looks good to me!
>
> Walker
>
> On Tue, Oct 25, 2022 at 12:27 PM Bill Bejeck <bbej...@gmail.com> wrote:
>
> > Hi Sophie,
> >
> > Thanks for the KIP! I think this is a worthwhile feature to add. I have
> > two main questions about how this new feature will work.
> >
> >    1. You mention that for stateless applications auto-scaling is a stickier
> >    situation. But I was thinking that the auto-scaling would actually benefit
> >    stateless applications the most; let me explain my thinking. Let's say you
> >    have a stateless Kafka Streams application with one input topic and 2
> >    partitions, meaning you're limited to at most 2 stream threads. In order
> >    to increase the throughput, you increase the number of partitions of the
> >    source topic to 4, so you can have 4 stream threads. In this case would the
> >    auto-scaling feature automatically increase the number of tasks from 2 to
> >    4?
> >    Since the application is stateless, say using a filter then a map for
> >    example, the partition for the record doesn't matter, so it seems that
> >    stateless applications would stand to gain a great deal.
> >    2. For stateful applications I can see the immediate benefit from
> >    autoscaling and static partitioning. But again going with a partition
> >    expansion for increased throughput example, what would be the mitigation
> >    strategy for a stateful application that eventually wants to take advantage
> >    of the increased number of partitions? Otherwise keeping all keys on their
> >    original partition means you could end up with "key skew" due to not
> >    allowing keys to distribute out to the new partitions.
> >
> > One last comment, the KIP states "only the key, rather than the key and
> > value, are passed in to the partitioner", but the interface has it taking a
> > key and a value as parameters. Based on your comments earlier in this
> > thread I was thinking that the text needs to be updated.
> >
> > Thanks,
> > Bill
> >
> > On Fri, Oct 21, 2022 at 12:21 PM Lucas Brutschy
> > <lbruts...@confluent.io.invalid> wrote:
> >
> > > Hi all,
> > >
> > > thanks, Sophie, this makes sense. I suppose then the way to help the user
> > > not apply this in the wrong setting is having good documentation and one
> > > or two examples of good use cases.
> > >
> > > I think Colt's time-based partitioning is a good example of how to use
> > > this. It actually doesn't have to be time, the same will work with any
> > > monotonically increasing identifier. I.e. the new partitions will only get
> > > records for users with a "large" user ID greater than some user ID
> > > threshold hardcoded in the static partitioner. At least in this restricted
> > > use-case, lookups by user ID would still be possible.
> > >
> > > Cheers,
> > > Lucas
> > >
> > > On Fri, Oct 21, 2022 at 5:37 PM Colt McNealy <c...@littlehorse.io> wrote:
> > >
> > > > Sophie,
> > > >
> > > > Regarding item "3" (my last paragraph from the previous email), perhaps I
> > > > should give a more general example now that I've had more time to clarify
> > > > my thoughts:
> > > >
> > > > In some stateful applications, certain keys have to be findable without any
> > > > information about when the relevant data was created. For example, if I'm
> > > > running a word-count app and I want to use Interactive Queries to find the
> > > > count for "foo", I would need to know whether "foo" first arrived before or
> > > > after time T before I could find the correct partition to look up the data.
> > > > In this case, I don't think static partitioning is possible. Is this
> > > > use-case a non-goal of the KIP, or am I missing something?
> > > >
> > > > Colt McNealy
> > > > *Founder, LittleHorse.io*
> > > >
> > > >
> > > > On Thu, Oct 20, 2022 at 6:37 PM Sophie Blee-Goldman
> > > > <sop...@confluent.io.invalid> wrote:
> > > >
> > > > > Thanks for the responses guys! I'll get the easy stuff out of the way
> > > > > first:
> > > > >
> > > > > 1) Fixed the KIP so that StaticStreamPartitioner extends StreamPartitioner
> > > > > 2) I totally agree with you Colt, the record value might have valuable (no
> > > > > pun) information in it that is needed to compute the partition without
> > > > > breaking the static constraint. As in my own example earlier, maybe the
> > > > > userId is a field in the value and not the key itself. Actually it was
> > > > > that exact thought that made me do a U-turn on this but I forgot to
> > > > > update the thread
> > > > > 3) Colt, I'm not sure I follow what you're trying to say in that last
> > > > > paragraph, can you expand?
> > > > > 4) Lucas, it's a good question as to what kind of guard-rails we could put
> > > > > up to enforce or even detect a violation of static partitioning. Most
> > > > > likely Streams would need to track every key to partition mapping in an
> > > > > internal state store, but we have no guarantee the key space is bounded
> > > > > and the store wouldn't grow out of control. Mostly however I imagine users
> > > > > would be frustrated to find out there's a secret, extra state store taking
> > > > > up space when you enable autoscaling, and it's not even to provide
> > > > > functionality but just to make sure users aren't doing something wrong.
> > > > >
> > > > > I wish I had a better idea, but sadly I think the only practical solution
> > > > > here is to try and make this condition as clear and obvious and easy to
> > > > > understand as possible, perhaps by providing an example of what does and
> > > > > does not satisfy the constraint in the javadocs. I'll work on that
> > > > > 5) I covered a bit above the impracticality of storing a potentially
> > > > > unbounded keyspace, which as you mention would need to be shared by all
> > > > > partitioners as well, so I would agree that this feels insurmountable. I'm
> > > > > leaning towards only enabling this feature for the static partitioning
> > > > > case at least in the first iteration, and we can see how things go from
> > > > > there -- for example, are people generally able to implement it correctly?
> > > > > If we find that the feature is working well and users are hungry for more,
> > > > > then it would be relatively straightforward to open things up to stateless
> > > > > applications, or even stateful applications which can withstand some
> > > > > "blips" in the logic/correctness.
> > > > >
> > > > > That said, *technically* the feature would be able to be turned on for any
> > > > > such case as it is, since as discussed above it's difficult to place true
> > > > > guardrails around the feature that can enforce static partitioning.
> > > > > Perhaps we could put a short note in the StaticStreamPartitioner docs that
> > > > > explains how and when it's safe to break the static requirement, but that
> > > > > we recommend against doing so.
> > > > >
> > > > > Thoughts?
> > > > >
> > > > > -Sophie
> > > > >
> > > > > On Thu, Oct 20, 2022 at 8:11 AM Colt McNealy <c...@littlehorse.io> wrote:
> > > > >
> > > > > > Sophie,
> > > > > >
> > > > > > Thank you for your detailed response. That makes sense (one partition per
> > > > > > user seems like a lot of extra metadata if you've got millions of users,
> > > > > > but I'm guessing that was just for illustrative purposes).
> > > > > >
> > > > > > In this case I'd like to question one small detail in your KIP. The
> > > > > > StaticPartitioner takes in just the key and not the value...in an
> > > > > > application I've been working on, the "value" is a long-lived entity
> > > > > > (spanning hundreds of records over several days) that has timestamp
> > > > > > information about the creation of the entity inside of it. The ID itself is
> > > > > > provided by the end-user of the system and as such isn't guaranteed to have
> > > > > > timestamp info.
> > > > > >
> > > > > > This is quite a corner case, but if the StaticStreamPartitioner interface
> > > > > > were allowed to peek at the record value, it would be trivial to implement
> > > > > > logic as follows:
> > > > > > ```
> > > > > > entity = deserialize(record.value())
> > > > > >
> > > > > > if entity.created_before(T):
> > > > > >     return hash(key) % old_partitions
> > > > > > else:
> > > > > >     return hash(key) % new_partitions
> > > > > > ```
> > > > > >
> > > > > > That said, you're a rockstar architect and have seen a lot more system
> > > > > > design than I have (I'm 23 and only 3 years out of school...you implemented
> > > > > > cooperative rebalancing 😀). So don't make that decision unless you can see
> > > > > > other use-cases where it is appropriate.
> > > > > >
> > > > > > Additionally, for my own use-case I'm not sure if static partitioning alone
> > > > > > (as opposed to re-partitioning and re-playing the changelogs into new
> > > > > > stores) would enable auto-scaleout because my system uses Kafka Streams as
> > > > > > the data store *and* a secondary index...for example, when a user wants to
> > > > > > look up all entities where the variable `user_email==f...@bar.com`, we have
> > > > > > an index store that has keys partitioned by and prefixed with
> > > > > > `user_email==f...@bar.com`. Entities with that email (for example) could
> > > > > > come before or after time T.
> > > > > >
> > > > > > Anyways, that's just my twopence, if I were a voting committer I'd vote for
> > > > > > this KIP as-is.
> > > > > >
> > > > > > Cheers,
> > > > > > Colt McNealy
> > > > > > *Founder, LittleHorse.io*
> > > > > >
> > > > > >
> > > > > > On Wed, Oct 19, 2022 at 4:07 PM Sophie Blee-Goldman
> > > > > > <sop...@confluent.io.invalid> wrote:
> > > > > >
> > > > > > > Thanks for your questions, I would say that your understanding sounds
> > > > > > > correct based on what you described but I'll try to add some clarity. The
> > > > > > > basic idea is that, as you said, any keys that are processed before time T
> > > > > > > will go to partition 1. All of those keys should then continue to be routed
> > > > > > > to partition 1 for the remainder of the app's lifetime, if you care about
> > > > > > > maintaining correct history/"state" for that key (I'll come back to this in
> > > > > > > the next paragraph). After the time T, new keys that weren't processed prior
> > > > > > > to T may be routed to either partition, provided they are similarly mapped
> > > > > > > to the same partition forever after. It's up to the user to enforce this,
> > > > > > > perhaps by trying to keep track of all keys but that is likely to be
> > > > > > > impractical. This feature is generally more targeted at cases where the
> > > > > > > partition mapping is "obvious" enough to compute without needing to
> > > > > > > maintain a history of all keys and their original partition: for example,
> > > > > > > imagine an application that processes user account information. You can
> > > > > > > scale out to a partition per user, and add a new partition each time
> > > > > > > someone opens a new account.
> > > > > > > When they open that account they get a userID number, starting with #0 and
> > > > > > > counting up from there. In that case, the partition for any records
> > > > > > > pertaining to a given account would just be its userID.
> > > > > > >
> > > > > > > I hope that clears up the kind of intended use case we're targeting with
> > > > > > > this feature. That said, another important and equally viable use case that
> > > > > > > I neglected to mention in the KIP is fully stateless applications.
> > > > > > > Technically this feature can produce correct results for applications that
> > > > > > > are at least one of (a) statically partitioned, or (b) completely
> > > > > > > stateless. However, the stateless case is a bit stickier since even if the
> > > > > > > Streams application itself doesn't care about maintaining the same mapping
> > > > > > > of key to partition, it could for example be feeding into a downstream
> > > > > > > application which *does* need to maintain state, and which would wind up
> > > > > > > "losing" the history for any keys that changed partition.
> > > > > > >
> > > > > > > I kind of felt like opening this feature up to stateless applications would
> > > > > > > be asking for trouble and make it too easy for people to shoot themselves
> > > > > > > in the foot. That said, I'm open to discussion on this point if you feel
> > > > > > > like the benefits here outweigh the risks.
> > > > > > > I'm also happy to consider modifying the API so that it could naturally be
> > > > > > > expanded to include stateless applications in the future, even if we decide
> > > > > > > against allowing that use case in the first iteration of the feature.
> > > > > > >
> > > > > > > Thoughts?
> > > > > > >
> > > > > > > Sophie
> > > > > > >
> > > > > > > On Wed, Oct 19, 2022 at 7:46 AM Colt McNealy <c...@littlehorse.io> wrote:
> > > > > > >
> > > > > > > > Sophie,
> > > > > > > >
> > > > > > > > Thank you for the KIP! Choosing the number of partitions in a Streams app
> > > > > > > > is a tricky task because of how difficult it is to re-partition; I'm glad
> > > > > > > > you're working on an improvement. I've got two questions:
> > > > > > > >
> > > > > > > > First, `StaticStreamsPartitioner` is an interface that we (Streams users)
> > > > > > > > must implement, I'm trying to understand how it would work. For example,
> > > > > > > > let's say there's some point in time 'T' before which we have 1 partition.
> > > > > > > > Then we decide to increase the partition count to 2 at time T. From my
> > > > > > > > understanding, all keys that had passed through the Streams app before time
> > > > > > > > T must end up on partition 1 if they appear again in the input topics; but
> > > > > > > > any new keys are allowed to be sent to partition 2. Is that correct? And
> > > > > > > > (pardon the naive question) how is this achieved without keeping track of
> > > > > > > > all keys that have been seen at any point?
> > > > > > > >
> > > > > > > > Secondly, will this feature work with applications that use interactive
> > > > > > > > queries?
> > > > > > > >
> > > > > > > > Thank you very much,
> > > > > > > > Colt McNealy
> > > > > > > > *Founder, LittleHorse.io*
> > > > > > > >
> > > > > > > >
> > > > > > > > On Tue, Oct 18, 2022 at 9:34 PM Sophie Blee-Goldman
> > > > > > > > <sop...@confluent.io.invalid> wrote:
> > > > > > > >
> > > > > > > > > Hey all,
> > > > > > > > >
> > > > > > > > > I'd like to propose a new autoscaling feature for Kafka Streams
> > > > > > > > > applications which can follow the constraint of static partitioning. For
> > > > > > > > > further details please refer to the KIP document:
> > > > > > > > >
> > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-878%3A+Autoscaling+for+Statically+Partitioned+Streams
> > > > > > > > >
> > > > > > > > > This feature will be targeted for 3.4 but may not be fully implemented
> > > > > > > > > until the following release, 3.5.
> > > > > > > > >
> > > > > > > > > Please give this a read and let me know what you think!
> > > > > > > > >
> > > > > > > > > Cheers,
> > > > > > > > > Sophie