I think that is a good point about trying to keep producer level configurations and (repartition) topic level considerations separate. Number of partitions is definitely purely a topic level configuration. But on some level, serdes and partitioners are just as much a topic configuration as a producer one. You could have two producers configured with different serdes and/or partitioners, but if they are writing to the same topic the result would be very difficult to parse. So in a sense, these are configurations of topics in Streams, not just producers.
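To make the "two producers, one topic" problem concrete, here is a rough sketch in plain Java with no Kafka dependency. Everything in it is hypothetical: `MixedPartitionerSketch`, `hashPartition` and `lengthPartition` are made-up names, and `String.hashCode()` only stands in for whatever hash a real producer would use. It just shows that the same key can land in different partitions once two writers disagree on the partitioner.

```java
// Illustration only: these are stand-ins, not Kafka Streams classes.
class MixedPartitionerSketch {

    // Hash-based placement; String.hashCode() is an assumed stand-in hash.
    static int hashPartition(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // A made-up custom strategy: place records by the length of the key.
    static int lengthPartition(String key, int numPartitions) {
        return key.length() % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        String key = "ab";

        // Producer A and producer B write the same key to the same topic,
        // but each one picks the partition with its own strategy.
        int fromProducerA = hashPartition(key, numPartitions);
        int fromProducerB = lengthPartition(key, numPartitions);

        System.out.println("producer A -> partition " + fromProducerA);
        System.out.println("producer B -> partition " + fromProducerB);

        // If the two differ, no single partition holds all records for the
        // key, so a downstream per-partition consumer sees only part of it.
        System.out.println("same partition: " + (fromProducerA == fromProducerB));
    }
}
```

With one partitioner per topic the two calls would always agree, which is why it seems reasonable to treat the partitioner as part of the topic's configuration.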
Another way to think of it: while the Streams API is not always true to this, ideally all the relevant configs for an operator are wrapped into a single object (in this case, Repartitioned). We could instead split out the fields in common with Produced into a separate parameter to keep topic and producer level configurations separate, but this increases the API surface area by a lot. It's much more straightforward to just say "this is everything that this particular operator needs" without worrying about what exactly you're specifying. I suppose you could alternatively make Produced a field of Repartitioned, but I don't think we do this kind of composition elsewhere in Streams at the moment.

On Fri, Jul 19, 2019 at 1:45 PM Levani Kokhreidze <levani.co...@gmail.com> wrote:

> Hi Bill,
>
> Thanks a lot for the feedback.
> Yes, that makes sense. I’ve updated KIP with `Repartitioned#partitioner` configuration.
> In the beginning, I wanted to introduce a class for topic level configuration and keep topic level and producer level configurations (such as Produced) separately (see my second email in this thread).
> But while looking at the semantics of KStream interface, I couldn’t really figure out good operation name for Topic level configuration class and just introducing `Topic` config class was kinda breaking the semantics.
> So I think having Repartitioned class which encapsulates topic and producer level configurations for internal topics is viable thing to do.
>
> Regards,
> Levani
>
> > On Jul 19, 2019, at 7:47 PM, Bill Bejeck <bbej...@gmail.com> wrote:
> >
> > Hi Levani,
> >
> > Thanks for resurrecting this KIP.
> >
> > I'm also a +1 for adding a partition option. In addition to the reason provided by John, my reasoning is:
> >
> > 1. Users may want to use something other than hash-based partitioning
> > 2. Users may wish to partition on something different than the key without having to change the key. For example:
> >    1. A combination of fields in the value in conjunction with the key
> >    2. Something other than the key
> > 3. We allow users to specify a partitioner on Produced hence in KStream.to and KStream.through, so it makes sense for API consistency.
> >
> > Just my 2 cents.
> >
> > Thanks,
> > Bill
> >
> > On Fri, Jul 19, 2019 at 5:46 AM Levani Kokhreidze <levani.co...@gmail.com> wrote:
> >
> >> Hi John,
> >>
> >> In my mind it makes sense.
> >> If we add partitioner configuration to Repartitioned class, with the combination of specifying number of partitions for internal topics, user will have opportunity to ensure co-partitioning before join operation.
> >> I think this can be quite powerful feature.
> >> Wondering what others think about this?
> >>
> >> Regards,
> >> Levani
> >>
> >>> On Jul 18, 2019, at 1:20 AM, John Roesler <j...@confluent.io> wrote:
> >>>
> >>> Yes, I believe that's what I had in mind. Again, not totally sure it makes sense, but I believe something similar is the rationale for having the partitioner option in Produced.
> >>>
> >>> Thanks,
> >>> -John
> >>>
> >>> On Wed, Jul 17, 2019 at 3:20 PM Levani Kokhreidze <levani.co...@gmail.com> wrote:
> >>>>
> >>>> Hey John,
> >>>>
> >>>> Oh that’s interesting use-case.
> >>>> Do I understand this correctly, in your example I would first issue repartition(Repartitioned) with proper partitioner that essentially would be the same as the topic I want to join with and then do the KStream#join with DSL?
> >>>>
> >>>> Regards,
> >>>> Levani
> >>>>
> >>>>> On Jul 17, 2019, at 11:11 PM, John Roesler <j...@confluent.io> wrote:
> >>>>>
> >>>>> Hey, all, just to chime in,
> >>>>>
> >>>>> I think it might be useful to have an option to specify the partitioner. The case I have in mind is that some data may get repartitioned and then joined with an input topic. If the right-side input topic uses a custom partitioning strategy, then the repartitioned stream also needs to be partitioned with the same strategy.
> >>>>>
> >>>>> Does that make sense, or did I maybe miss something important?
> >>>>>
> >>>>> Thanks,
> >>>>> -John
> >>>>>
> >>>>> On Wed, Jul 17, 2019 at 2:48 PM Levani Kokhreidze <levani.co...@gmail.com> wrote:
> >>>>>>
> >>>>>> Yes, I was thinking about it as well. To be honest I’m not sure about it yet.
> >>>>>> As Kafka Streams DSL user, I don’t really think I would need control over partitioner for internal topics.
> >>>>>> As a user, I would assume that Kafka Streams knows best how to partition data for internal topics.
> >>>>>> In this KIP I wrote that Produced should be used only for topics that are created by user in advance.
> >>>>>> In those cases maybe it makes sense to have possibility to specify the partitioner.
> >>>>>> I don’t have clear answer on that yet, but I guess specifying the partitioner can be added as well if there’s agreement on this.
> >>>>>>
> >>>>>> Regards,
> >>>>>> Levani
> >>>>>>
> >>>>>>> On Jul 17, 2019, at 10:42 PM, Sophie Blee-Goldman <sop...@confluent.io> wrote:
> >>>>>>>
> >>>>>>> Thanks for clearing that up. I agree that Repartitioned would be a useful addition. I'm wondering if it might also need to have a withStreamPartitioner method/field, similar to Produced? I'm not sure how widely this feature is really used, but seems it should be available for repartition topics.
> >>>>>>>
> >>>>>>> On Wed, Jul 17, 2019 at 11:26 AM Levani Kokhreidze <levani.co...@gmail.com> wrote:
> >>>>>>>
> >>>>>>>> Hey Sophie,
> >>>>>>>>
> >>>>>>>> In both cases KStream#repartition and KStream#repartition(Repartitioned) topic will be created and managed by Kafka Streams.
> >>>>>>>> Idea of Repartitioned is to give user more control over the topic such as num of partitions.
> >>>>>>>> I feel like Repartitioned parameter is something that is missing in current DSL design.
> >>>>>>>> Essentially giving user control over parallelism by configuring num of partitions for internal topics.
> >>>>>>>>
> >>>>>>>> Hope this answers your question.
> >>>>>>>>
> >>>>>>>> Regards,
> >>>>>>>> Levani
> >>>>>>>>
> >>>>>>>>> On Jul 17, 2019, at 9:02 PM, Sophie Blee-Goldman <sop...@confluent.io> wrote:
> >>>>>>>>>
> >>>>>>>>> Hey Levani,
> >>>>>>>>>
> >>>>>>>>> Thanks for the KIP! Can you clarify one thing for me -- for the KStream#repartition signature taking a Repartitioned, will the topic be auto-created by Streams (which seems to be the case for the signature without a Repartitioned) or does it have to be pre-created? The wording in the KIP makes it seem like one version of the method will auto-create topics while the other will not.
> >>>>>>>>>
> >>>>>>>>> Cheers,
> >>>>>>>>> Sophie
> >>>>>>>>>
> >>>>>>>>> On Wed, Jul 17, 2019 at 10:15 AM Levani Kokhreidze <levani.co...@gmail.com> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hello,
> >>>>>>>>>>
> >>>>>>>>>> One more bump about KIP-221 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint) so it doesn’t get lost in mailing list :)
> >>>>>>>>>> Would love to hear community's opinions/concerns about this KIP.
> >>>>>>>>>>
> >>>>>>>>>> Regards,
> >>>>>>>>>> Levani
> >>>>>>>>>>
> >>>>>>>>>>> On Jul 12, 2019, at 5:27 PM, Levani Kokhreidze <levani.co...@gmail.com> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> Hello,
> >>>>>>>>>>>
> >>>>>>>>>>> Kind reminder about this KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
> >>>>>>>>>>>
> >>>>>>>>>>> Regards,
> >>>>>>>>>>> Levani
> >>>>>>>>>>>
> >>>>>>>>>>>> On Jul 9, 2019, at 11:38 AM, Levani Kokhreidze <levani.co...@gmail.com> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>> Hello,
> >>>>>>>>>>>>
> >>>>>>>>>>>> In order to move this KIP forward, I’ve updated confluence page with the new proposal https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
> >>>>>>>>>>>> I’ve also filled “Rejected Alternatives” section.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Looking forward to discussing this KIP :)
> >>>>>>>>>>>>
> >>>>>>>>>>>> Kind regards,
> >>>>>>>>>>>> Levani
> >>>>>>>>>>>>
> >>>>>>>>>>>>> On Jul 3, 2019, at 1:08 PM, Levani Kokhreidze <levani.co...@gmail.com> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Hello Matthias,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks for the feedback and ideas.
> >>>>>>>>>>>>> I like the idea of introducing dedicated `Topic` class for topic configuration for internal operators like `groupedBy`.
> >>>>>>>>>>>>> Would be great to hear others' opinions about this as well.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Kind regards,
> >>>>>>>>>>>>> Levani
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Jul 3, 2019, at 7:00 AM, Matthias J. Sax <matth...@confluent.io> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Levani,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks for picking up this KIP! And thanks for summarizing everything. Even if some points may have been discussed already (can't really remember), it's helpful to get a good summary to refresh the discussion.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I think your reasoning makes sense. With regard to the distinction between operators that manage topics and operators that use user-created topics: Following this argument, it might indicate that leaving `through()` as-is (as an operator that uses user-defined topics) and introducing a new `repartition()` operator (an operator that manages topics itself) might be good. Otherwise, there is one operator `through()` that sometimes manages topics but sometimes not; a different name, ie, new operator would make the distinction clearer.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> About adding `numOfPartitions` to `Grouped`. I am wondering if the same argument as for `Produced` does apply and adding it is semantically questionable? Might be good to get opinions of others on this, too. I am not sure myself what solution I prefer atm.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> So far, KS uses configuration objects that allow to configure a certain "entity" like a consumer, producer, store. If we assume that a topic is a similar entity, I am wondering if we should have a `Topic#withNumberOfPartitions()` class and method instead of a plain integer? This would allow us to add other configs, like replication factor, retention-time etc, easily, without the need to change the "main API".
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Just want to give some ideas. Not sure if I like them myself. :)
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On 7/1/19 1:04 AM, Levani Kokhreidze wrote:
> >>>>>>>>>>>>>>> Actually, giving it more thought - maybe enhancing Produced with num of partitions configuration is not the best approach. Let me explain why:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 1) If we enhance Produced class with this configuration, this will also affect KStream#to operation. Since KStream#to is the final sink of the topology, for me, it seems to be reasonable assumption that user needs to manually create sink topic in advance. And in that case, having num of partitions configuration doesn’t make much sense.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 2) Looking at Produced class, based on API contract, seems like Produced is designed to be something that is explicitly for producer (key serializer, value serializer, partitioner those all are producer specific configurations) and num of partitions is topic level configuration. And I don’t think mixing topic and producer level configurations together in one class is the good approach.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 3) Looking at KStream interface, seems like Produced parameter is for operations that work with non-internal topics (i.e. topics that are not created and managed internally by Kafka Streams) and I think we should leave it as it is in that case.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Taking all these things into account, I think we should distinguish between DSL operations, where Kafka Streams should create and manage internal topics (KStream#groupBy) vs topics that should be created in advance (e.g KStream#to).
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> To sum it up, I think adding numPartitions configuration in Produced will result in mixing topic and producer level configuration in one class and it’s gonna break existing API semantics.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Regarding making topic name optional in KStream#through - I think underlying idea is very useful and giving users possibility to specify num of partitions there is even more useful :) Considering arguments against adding num of partitions in Produced class, I see two options here:
> >>>>>>>>>>>>>>> 1) Add following method overloads
> >>>>>>>>>>>>>>> * through() - topic will be auto-generated and num of partitions will be taken from source topic
> >>>>>>>>>>>>>>> * through(final int numOfPartitions) - topic will be auto-generated with specified num of partitions
> >>>>>>>>>>>>>>> * through(final int numOfPartitions, final Produced<K, V> produced) - topic will be generated with specified num of partitions and configuration taken from produced parameter.
> >>>>>>>>>>>>>>> 2) Leave KStream#through as it is and introduce new method - KStream#repartition (I think Matthias suggested this in one of the threads)
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Considering all mentioned above I propose the following plan:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Option A:
> >>>>>>>>>>>>>>> 1) Leave Produced as it is
> >>>>>>>>>>>>>>> 2) Add num of partitions configuration to Grouped class (as mentioned in the KIP)
> >>>>>>>>>>>>>>> 3) Add following method overloads to KStream#through
> >>>>>>>>>>>>>>> * through() - topic will be auto-generated and num of partitions will be taken from source topic
> >>>>>>>>>>>>>>> * through(final int numOfPartitions) - topic will be auto-generated with specified num of partitions
> >>>>>>>>>>>>>>> * through(final int numOfPartitions, final Produced<K, V> produced) - topic will be generated with specified num of partitions and configuration taken from produced parameter.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Option B:
> >>>>>>>>>>>>>>> 1) Leave Produced as it is
> >>>>>>>>>>>>>>> 2) Add num of partitions configuration to Grouped class (as mentioned in the KIP)
> >>>>>>>>>>>>>>> 3) Add new operator KStream#repartition for creating and managing internal repartition topics
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> P.S. I’m sorry if all of this was already discussed in the mailing list, but I kinda got lost with all the threads that were about this KIP :(
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Kind regards,
> >>>>>>>>>>>>>>> Levani
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Jul 1, 2019, at 9:56 AM, Levani Kokhreidze <levani.co...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Hello,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I would like to resurrect discussion around KIP-221. Going through the discussion thread, there seems to be agreement around usefulness of this feature.
> >>>>>>>>>>>>>>>> Regarding the implementation, as far as I understood, the most optimal solution for me seems the following:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 1) Add two method overloads to KStream#through method (essentially making topic name optional)
> >>>>>>>>>>>>>>>> 2) Enhance Produced class with numOfPartitions configuration field.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Those two changes will allow DSL users to control parallelism and trigger re-partition without doing stateful operations.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I will update KIP with interface changes around KStream#through if these changes sound sensible.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Kind regards,
> >>>>>>>>>>>>>>>> Levani