Hi Sophie,

This looks like a good improvement (given my limited knowledge, at
least). As I understand it, for the subset of applications that can
use it, this will make scaling up the number of partitions basically
frictionless.

Three questions, and forgive me if something doesn't make sense at all:

1) From my understanding, the most common use case would be a stateless
implementation of the partitioner, i.e. the partitioner computes the
partition number as a pure function of the input record's key and
value. It then needs to be guaranteed from outside the Streams
application that records for partition #N are only produced after the
number of partitions has grown to N. But you also mentioned that one
could keep track of all existing keys before the partition expansion -
would that require some kind of shared state among the partitioners?
It's not clear to me how one would keep this state. A stateful
partitioner would probably open up the feature to a lot more use cases
- even if keeping track of all keys is impractical, one could maybe
identify another distinguishing feature with smaller cardinality.

2) I think StaticStreamPartitioner should extend StreamPartitioner in the
KIP?

3) Is there anything we can / want to do to help users detect incorrect
("non-static") implementations of the StaticStreamPartitioner interface?
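To make question 3 concrete, here's the kind of implementation I would
consider "non-static" (a sketch of my own, not from the KIP): the
classic hash-mod-numPartitions mapping, whose output for the same key
can change when the partition count grows.

```java
// A "non-static" partitioner sketch: the result depends on numPartitions,
// so the same key may land on a different partition after the topic is
// expanded. This is the kind of incorrect implementation question 3 asks
// about detecting.
public class NonStaticExample {

    static int hashModPartition(String key, int numPartitions) {
        // Math.floorMod avoids negative results for negative hash codes.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        String key = "user-42";
        int before = hashModPartition(key, 4); // mapping with 4 partitions
        int after  = hashModPartition(key, 5); // mapping after growing to 5
        // For most keys these differ, which would silently split the key's
        // history across two partitions:
        System.out.println(before + " vs " + after);
    }
}
```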

Cheers,
Lucas


On Thu, Oct 20, 2022 at 1:07 AM Sophie Blee-Goldman
<sop...@confluent.io.invalid> wrote:

> Thanks for your questions, I would say that your understanding sounds
> correct based
> on what you described but I'll try to add some clarity. The basic idea is
> that, as you said,
> any keys that are processed before time T will go to partition 1. All of
> those keys should
> then continue to be routed to partition 1 for the remainder of the app's
> lifetime, if you care
> about maintaining correct history/"state" for that key (I'll come back to
> this in the next
> paragraph). After the time T, new keys that weren't processed prior to T
> may be routed to
> either partition, provided they are similarly mapped to the same partition
> forever after. It's
> up to the user to enforce this, perhaps by trying to keep track of all keys
> but that is likely to
> be impractical. This feature is generally more targeted at cases where the
> partition mapping
> is "obvious" enough to compute without needing to maintain a history of all
> keys and their
> original partition: for example, imagine an application that processes user
> account information.
> You can scale out to a partition per user, and add a new partition each
> time someone opens
> a new account. When they open that account they get a userID number,
> starting with #0 and
> counting up from there. In that case, the partition for any records
> pertaining to a given account
> would just be its userID.
>
> I hope that clears up the kind of intended use case we're targeting with
> this feature. That said,
> another important and equally viable use case that I neglected to mention
> in the KIP is fully
> stateless applications. Technically this feature can produce correct
> results for applications that
> are at least one of (a) statically partitioned, or (b) completely
> stateless. However, the stateless
> case is a bit stickier since even if the Streams application itself doesn't
> care about maintaining
> the same mapping of key to partition, it could for example be feeding into
> a downstream
> application which *does* need to maintain state, and which would wind up
> "losing" the history for
> any keys that changed partition.
>
> I kind of felt like opening this feature up to stateless applications would
> be asking for trouble and
> make it too easy for people to shoot themselves in the foot. That said, I'm
> open to discussion on
> this point if you feel like the benefits here outweigh the risks. I'm also
> happy to consider modifying
> the API so that it could naturally be expanded to include stateless
> applications  in the future, even
> if we decide against allowing that use case in the first iteration of the
> feature.
>
> Thoughts?
>
> Sophie
>
> On Wed, Oct 19, 2022 at 7:46 AM Colt McNealy <c...@littlehorse.io> wrote:
>
> > Sophie,
> >
> > Thank you for the KIP! Choosing the number of partitions in a Streams app
> > is a tricky task because of how difficult it is to re-partition; I'm glad
> > you're working on an improvement. I've got two questions:
> >
> > First, `StaticStreamsPartitioner` is an interface that we (Streams users)
> > must implement, I'm trying to understand how it would work. For example,
> > let's say there's some point in time 'T' before which we have 1
> partition.
> > Then we decide to increase the partition count to 2 at time T. From my
> > understanding, all keys that had passed through the Streams app before
> time
> > T must end up on partition 1 if they appear again in the input topics;
> but
> > any new keys are allowed to be sent to partition 2. Is that correct? And
> > (pardon the naive question) how is this achieved without keeping track of
> > all keys that have been seen at any point?
> >
> > Secondly, will this feature work with applications that use interactive
> > queries?
> >
> > Thank you very much,
> > Colt McNealy
> > *Founder, LittleHorse.io*
> >
> >
> > On Tue, Oct 18, 2022 at 9:34 PM Sophie Blee-Goldman
> > <sop...@confluent.io.invalid> wrote:
> >
> > > Hey all,
> > >
> > > I'd like to propose a new autoscaling feature for Kafka Streams
> > > applications which can follow the constraint of static partitioning. For
> > > further details please refer to the KIP document:
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-878%3A+Autoscaling+for+Statically+Partitioned+Streams
> > >
> > > This feature will be targeted for 3.4 but may not be fully implemented
> > > until the following release, 3.5.
> > >
> > > Please give this a read and let me know what you think!
> > >
> > > Cheers,
> > > Sophie
> > >
> >
>
