Hi Sophie,

Thanks for the KIP! I think this is a worthwhile feature to add. I have two
main questions about how this new feature will work.

1. You mention that for stateless applications auto-scaling is a stickier
situation. But I was thinking that auto-scaling would actually benefit
stateless applications the most; let me explain my thinking. Let's say you
have a stateless Kafka Streams application with one input topic and 2
partitions, meaning you're limited to at most 2 stream threads. To increase
throughput, you increase the number of partitions of the source topic to 4,
so you can run 4 stream threads. In this case, would the auto-scaling
feature automatically increase the number of tasks from 2 to 4? Since the
application is stateless (say, a filter followed by a map), the partition a
record lands on doesn't matter, so it seems that stateless applications
would stand to gain a great deal.

2. For stateful applications I can see the immediate benefit from
autoscaling and static partitioning. But, again taking a partition expansion
for increased throughput as the example, what would be the mitigation
strategy for a stateful application that eventually wants to take advantage
of the increased number of partitions? Otherwise, keeping all keys on their
original partitions means you could end up with "key skew", because keys are
never allowed to spread out to the new partitions.

One last comment: the KIP states "only the key, rather than the key and
value, are passed in to the partitioner", but the interface has it taking a
key and a value as parameters. Based on your comments earlier in this
thread, I think the text needs to be updated.

Thanks,
Bill

On Fri, Oct 21, 2022 at 12:21 PM Lucas Brutschy <lbruts...@confluent.io.invalid> wrote:
> Hi all,
>
> Thanks, Sophie, this makes sense. I suppose, then, the way to keep users
> from applying this in the wrong setting is good documentation and one or
> two examples of good use cases.
>
> I think Colt's time-based partitioning is a good example of how to use this.
> It actually doesn't have to be time; the same will work with any
> monotonically increasing identifier. That is, the new partitions will only
> get records for users with a "large" user ID, greater than some user-ID
> threshold hardcoded in the static partitioner. At least in this restricted
> use case, lookups by user ID would still be possible.
>
> Cheers,
> Lucas
>
> On Fri, Oct 21, 2022 at 5:37 PM Colt McNealy <c...@littlehorse.io> wrote:
>
> > Sophie,
> >
> > Regarding item "3" (my last paragraph from the previous email), perhaps I
> > should give a more general example now that I've had more time to clarify
> > my thoughts:
> >
> > In some stateful applications, certain keys have to be findable without
> > any information about when the relevant data was created. For example, if
> > I'm running a word-count app and I want to use Interactive Queries to find
> > the count for "foo", I would need to know whether "foo" first arrived
> > before or after time T before I could find the correct partition to look
> > up the data. In this case, I don't think static partitioning is possible.
> > Is this use case a non-goal of the KIP, or am I missing something?
> >
> > Colt McNealy
> > *Founder, LittleHorse.io*
> >
> >
> > On Thu, Oct 20, 2022 at 6:37 PM Sophie Blee-Goldman
> > <sop...@confluent.io.invalid> wrote:
> >
> > > Thanks for the responses, guys! I'll get the easy stuff out of the way
> > > first:
> > >
> > > 1) Fixed the KIP so that StaticStreamPartitioner extends StreamPartitioner
> > > 2) I totally agree with you, Colt: the record value might have valuable
> > > (no pun) information in it that is needed to compute the partition
> > > without breaking the static constraint. As in my own example earlier,
> > > maybe the userId is a field in the value and not the key itself.
> > > Actually, it was that exact thought that made me do a U-turn on this, but
> > > I forgot to update the thread.
> > > 3) Colt, I'm not sure I follow what you're trying to say in that last
> > > paragraph; can you expand?
> > > 4) Lucas, it's a good question as to what kind of guard-rails we could put
> > > up to enforce, or even detect, a violation of static partitioning. Most
> > > likely Streams would need to track every key-to-partition mapping in an
> > > internal state store, but we have no guarantee that the key space is
> > > bounded and that the store wouldn't grow out of control. Mostly, however,
> > > I imagine users would be frustrated to find out there's a secret, extra
> > > state store taking up space when they enable autoscaling, and that it's
> > > not even there to provide functionality, just to make sure users aren't
> > > doing something wrong.
> > >
> > > I wish I had a better idea, but sadly I think the only practical solution
> > > here is to try and make this condition as clear, obvious, and easy to
> > > understand as possible, perhaps by providing an example of what does and
> > > does not satisfy the constraint in the javadocs. I'll work on that.
> > > 5) I covered a bit above the impracticality of storing a potentially
> > > unbounded keyspace, which, as you mention, would also need to be shared
> > > by all partitioners, so I would agree that this feels insurmountable.
> > > I'm leaning towards only enabling this feature for the static
> > > partitioning case, at least in the first iteration, and we can see how
> > > things go from there. For example, are people generally able to
> > > implement it correctly?
> > > If we find that the feature is working well and users are hungry for
> > > more, then it would be relatively straightforward to open things up to
> > > stateless applications, or even stateful applications that can withstand
> > > some "blips" in the logic/correctness.
> > >
> > > That said, *technically* the feature could be turned on for any such case
> > > as it is since, as discussed above, it's difficult to place true
> > > guardrails around the feature that can enforce static partitioning.
> > > Perhaps we could put a short note in the StaticStreamPartitioner docs
> > > that explains how and when it's safe to break the static requirement,
> > > but that we recommend against doing so.
> > >
> > > Thoughts?
> > >
> > > -Sophie
> > >
> > > On Thu, Oct 20, 2022 at 8:11 AM Colt McNealy <c...@littlehorse.io> wrote:
> > >
> > > > Sophie,
> > > >
> > > > Thank you for your detailed response. That makes sense (one partition
> > > > per user seems like a lot of extra metadata if you've got millions of
> > > > users, but I'm guessing that was just for illustrative purposes).
> > > >
> > > > In this case I'd like to question one small detail in your KIP. The
> > > > StaticStreamPartitioner takes in just the key and not the value... In
> > > > an application I've been working on, the "value" is a long-lived
> > > > entity (spanning hundreds of records over several days) that has
> > > > timestamp information about the creation of the entity inside of it.
> > > > The ID itself is provided by the end-user of the system and as such
> > > > isn't guaranteed to have timestamp info.
> > > >
> > > > This is quite a corner case, but if the StaticStreamPartitioner
> > > > interface were allowed to peek at the record value, it would be trivial
> > > > to implement logic as follows:
> > > > ```
> > > > import zlib
> > > >
> > > > def partition(key, value, deserialize, T, old_partitions, new_partitions):
> > > >     entity = deserialize(value)  # peek at the record value
> > > >     # key is bytes; Python's built-in hash() of str/bytes is salted per process
> > > >     h = zlib.crc32(key)
> > > >     if entity.created_before(T):
> > > >         return h % old_partitions  # entity existed before the expansion
> > > >     else:
> > > >         return h % new_partitions
> > > > ```
> > > >
> > > > That said, you're a rockstar architect and have seen a lot more system
> > > > design than I have (I'm 23 and only 3 years out of school... you
> > > > implemented cooperative rebalancing 😀). So don't make that decision
> > > > unless you can see other use cases where it is appropriate.
> > > >
> > > > Additionally, for my own use case I'm not sure if static partitioning
> > > > alone (as opposed to re-partitioning and re-playing the changelogs into
> > > > new stores) would enable auto-scaleout, because my system uses Kafka
> > > > Streams as the data store *and* a secondary index. For example, when a
> > > > user wants to look up all entities where the variable
> > > > `user_email==f...@bar.com`, we have an index store that has keys
> > > > partitioned by and prefixed with `user_email==f...@bar.com`. Entities
> > > > with that email (for example) could come before or after time T.
> > > >
> > > > Anyways, that's just my twopence; if I were a voting committer I'd vote
> > > > for this KIP as-is.
> > > >
> > > > Cheers,
> > > > Colt McNealy
> > > > *Founder, LittleHorse.io*
> > > >
> > > >
> > > > On Wed, Oct 19, 2022 at 4:07 PM Sophie Blee-Goldman
> > > > <sop...@confluent.io.invalid> wrote:
> > > >
> > > > > Thanks for your questions. I would say that your understanding sounds
> > > > > correct based on what you described, but I'll try to add some
> > > > > clarity.
> > > > > The basic idea is that, as you said, any keys that are processed
> > > > > before time T will go to partition 1. All of those keys should then
> > > > > continue to be routed to partition 1 for the remainder of the app's
> > > > > lifetime, if you care about maintaining correct history/"state" for
> > > > > that key (I'll come back to this in the next paragraph). After time
> > > > > T, new keys that weren't processed prior to T may be routed to either
> > > > > partition, provided they are similarly mapped to the same partition
> > > > > forever after. It's up to the user to enforce this, perhaps by trying
> > > > > to keep track of all keys, but that is likely to be impractical. This
> > > > > feature is generally more targeted at cases where the partition
> > > > > mapping is "obvious" enough to compute without needing to maintain a
> > > > > history of all keys and their original partitions. For example,
> > > > > imagine an application that processes user account information. You
> > > > > can scale out to a partition per user, and add a new partition each
> > > > > time someone opens a new account. When they open that account they
> > > > > get a userID number, starting with #0 and counting up from there. In
> > > > > that case, the partition for any records pertaining to a given
> > > > > account would just be its userID.
> > > > >
> > > > > I hope that clears up the kind of use case we're targeting with this
> > > > > feature. That said, another important and equally viable use case
> > > > > that I neglected to mention in the KIP is fully stateless
> > > > > applications.
> > > > > Technically this feature can produce correct results for
> > > > > applications that are at least one of (a) statically partitioned or
> > > > > (b) completely stateless. However, the stateless case is a bit
> > > > > stickier: even if the Streams application itself doesn't care about
> > > > > maintaining the same key-to-partition mapping, it could, for example,
> > > > > be feeding into a downstream application that *does* need to maintain
> > > > > state, and which would wind up "losing" the history for any keys that
> > > > > changed partition.
> > > > >
> > > > > I kind of felt like opening this feature up to stateless applications
> > > > > would be asking for trouble and make it too easy for people to shoot
> > > > > themselves in the foot. That said, I'm open to discussion on this
> > > > > point if you feel like the benefits here outweigh the risks. I'm also
> > > > > happy to consider modifying the API so that it could naturally be
> > > > > expanded to include stateless applications in the future, even if we
> > > > > decide against allowing that use case in the first iteration of the
> > > > > feature.
> > > > >
> > > > > Thoughts?
> > > > >
> > > > > Sophie
> > > > >
> > > > > On Wed, Oct 19, 2022 at 7:46 AM Colt McNealy <c...@littlehorse.io>
> > > > > wrote:
> > > > >
> > > > > > Sophie,
> > > > > >
> > > > > > Thank you for the KIP! Choosing the number of partitions in a
> > > > > > Streams app is a tricky task because of how difficult it is to
> > > > > > re-partition; I'm glad you're working on an improvement. I've got
> > > > > > two questions:
> > > > > >
> > > > > > First, `StaticStreamPartitioner` is an interface that we (Streams
> > > > > > users) must implement, and I'm trying to understand how it would
> > > > > > work.
> > > > > > For example, let's say there's some point in time 'T' before which
> > > > > > we have 1 partition. Then we decide to increase the partition count
> > > > > > to 2 at time T. From my understanding, all keys that had passed
> > > > > > through the Streams app before time T must end up on partition 1 if
> > > > > > they appear again in the input topics, but any new keys are allowed
> > > > > > to be sent to partition 2. Is that correct? And (pardon the naive
> > > > > > question) how is this achieved without keeping track of all keys
> > > > > > that have been seen at any point?
> > > > > >
> > > > > > Secondly, will this feature work with applications that use
> > > > > > Interactive Queries?
> > > > > >
> > > > > > Thank you very much,
> > > > > > Colt McNealy
> > > > > > *Founder, LittleHorse.io*
> > > > > >
> > > > > >
> > > > > > On Tue, Oct 18, 2022 at 9:34 PM Sophie Blee-Goldman
> > > > > > <sop...@confluent.io.invalid> wrote:
> > > > > >
> > > > > > > Hey all,
> > > > > > >
> > > > > > > I'd like to propose a new autoscaling feature for Kafka Streams
> > > > > > > applications which can follow the constraint of static
> > > > > > > partitioning. For further details please refer to the KIP
> > > > > > > document:
> > > > > > >
> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-878%3A+Autoscaling+for+Statically+Partitioned+Streams
> > > > > > >
> > > > > > > This feature will be targeted for 3.4 but may not be fully
> > > > > > > implemented until the following release, 3.5.
> > > > > > >
> > > > > > > Please give this a read and let me know what you think!
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Sophie
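
The static constraint discussed throughout this thread can be made concrete with a minimal sketch. It assumes Lucas's variant of Sophie's per-userID example: keys are monotonically increasing integer user IDs, the app previously partitioned with `user_id % OLD_PARTITIONS`, and a hypothetical `USER_ID_THRESHOLD` marks the first ID issued after the topic grew from 2 to 4 partitions. This is plain Python, not the Kafka Streams `StaticStreamPartitioner` API, and every name in it is hypothetical. It illustrates the answer to Colt's first question: the partition is a pure function of the key, so no key history has to be stored, and an Interactive Queries lookup by user ID can simply recompute it. Plain modulo over the new partition count, by contrast, moves existing keys.

```python
# Hypothetical configuration, for illustration only.
OLD_PARTITIONS = 2        # partition count before the expansion
NEW_PARTITIONS = 4        # partition count after the expansion
USER_ID_THRESHOLD = 1000  # first user ID issued after the expansion


def legacy_partition(user_id: int) -> int:
    """Assumed mapping the app used while it had OLD_PARTITIONS partitions."""
    return user_id % OLD_PARTITIONS


def static_partition(user_id: int) -> int:
    """Static mapping: a pure function of the key, so existing users never move."""
    if user_id < USER_ID_THRESHOLD:
        # Users created before the expansion keep their original partition.
        return legacy_partition(user_id)
    # Users created afterwards are spread over all partitions, old and new.
    return user_id % NEW_PARTITIONS


def naive_partition(user_id: int) -> int:
    """Plain modulo over the current count; violates the static constraint."""
    return user_id % NEW_PARTITIONS


def moved_keys(partitioner, existing_ids):
    """Pre-expansion IDs whose partition would change under `partitioner`."""
    return [uid for uid in existing_ids if partitioner(uid) != legacy_partition(uid)]


if __name__ == "__main__":
    existing = range(USER_ID_THRESHOLD)
    print("static moved:", len(moved_keys(static_partition, existing)))  # 0
    print("naive moved:", len(moved_keys(naive_partition, existing)))    # 500
    # Interactive Queries style lookup: recompute the owning partition from the ID alone.
    for uid in (42, 1001):
        print(f"user {uid} -> partition {static_partition(uid)}")
```

The cost of this design is the one Bill raises in point 2: pre-threshold users stay on partitions 0 and 1, so partitions 2 and 3 only receive load from users created after the expansion, and the skew evens out only as new users accumulate.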