I think that is a good point about trying to keep producer level
configurations and (repartition) topic level considerations separate.
Number of partitions is definitely purely a topic level configuration. But
on some level, serdes and partitioners are just as much a topic
configuration as a producer one. You could have two producers configured
with different serdes and/or partitioners, but if they are writing to the
same topic the result would be very difficult to parse. So in a sense, these
are configurations of topics in Streams, not just producers.
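
To make the mismatch concrete, here is a minimal sketch in plain Java with no Kafka dependency. The `KeyPartitioner` interface and both strategies are hypothetical stand-ins invented for illustration; they are not the Streams `StreamPartitioner` API and not Kafka's default hashing. The sketch only counts how many keys two writers would route to different partitions of the same topic.

```java
import java.util.List;

public class PartitionerMismatchSketch {

    /** Hypothetical stand-in for a partitioner: maps a key to a partition index. */
    interface KeyPartitioner {
        int partition(String key, int numPartitions);
    }

    // Illustrative strategy 1: route by the first character of a non-empty key.
    static final KeyPartitioner BY_FIRST_CHAR = (key, n) -> key.charAt(0) % n;

    // Illustrative strategy 2: route by the length of the key.
    static final KeyPartitioner BY_LENGTH = (key, n) -> key.length() % n;

    /** Counts keys that the two strategies would send to different partitions. */
    static int countMismatches(List<String> keys, KeyPartitioner a, KeyPartitioner b, int numPartitions) {
        int mismatches = 0;
        for (String key : keys) {
            if (a.partition(key, numPartitions) != b.partition(key, numPartitions)) {
                mismatches++;
            }
        }
        return mismatches;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("a", "ab", "abc", "e");
        // Two producers writing the same keys to one 4-partition topic.
        System.out.println("mismatched keys: " + countMismatches(keys, BY_FIRST_CHAR, BY_LENGTH, 4));
    }
}
```

Any key counted as a mismatch would land in a different partition depending on which producer wrote it, so a downstream consumer could no longer rely on per-key locality.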

Another way to think of it: while the Streams API is not always true to
this, ideally all the relevant configs for an operator are wrapped into a
single object (in this case, Repartitioned). We could instead split out the
fields in common with Produced into a separate parameter to keep topic and
producer level configurations separate, but this increases the API surface
area by a lot. It's much more straightforward to just say "this is
everything that this particular operator needs" without worrying about what
exactly you're specifying.

I suppose you could alternatively make Produced a field of Repartitioned,
but I don't think we do this kind of composition elsewhere in Streams at
the moment.
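
As a rough illustration of the "one object per operator" idea, here is a simplified sketch in plain Java. The class, field, and method names are all hypothetical and use strings as stand-ins for real serdes and partitioners; this is not the API proposed in the KIP. It only shows producer-level settings (serdes, partitioner) and a topic-level setting (number of partitions) living side by side in one immutable config object.

```java
public class RepartitionConfigSketch {

    /** Hypothetical, simplified model of a single per-operator config object. */
    static final class RepartitionConfig {
        final String keySerde;        // stand-in for a real key serde
        final String valueSerde;      // stand-in for a real value serde
        final String partitioner;     // null means "use the default partitioner"
        final Integer numPartitions;  // null means "let Streams decide"

        private RepartitionConfig(String keySerde, String valueSerde,
                                  String partitioner, Integer numPartitions) {
            this.keySerde = keySerde;
            this.valueSerde = valueSerde;
            this.partitioner = partitioner;
            this.numPartitions = numPartitions;
        }

        static RepartitionConfig with(String keySerde, String valueSerde) {
            return new RepartitionConfig(keySerde, valueSerde, null, null);
        }

        // Each "with" method returns a new object, leaving the original untouched.
        RepartitionConfig withPartitioner(String partitioner) {
            return new RepartitionConfig(keySerde, valueSerde, partitioner, numPartitions);
        }

        RepartitionConfig withNumberOfPartitions(int numPartitions) {
            if (numPartitions <= 0) {
                throw new IllegalArgumentException("numPartitions must be positive");
            }
            return new RepartitionConfig(keySerde, valueSerde, partitioner, numPartitions);
        }
    }

    public static void main(String[] args) {
        RepartitionConfig config = RepartitionConfig.with("string", "json")
            .withPartitioner("region-aware")
            .withNumberOfPartitions(12);
        System.out.println(config.partitioner + " / " + config.numPartitions);
    }
}
```

The composition alternative would instead give such a class a single field holding a Produced-like object plus the topic-level fields, at the cost of one more level of nesting for the caller.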

On Fri, Jul 19, 2019 at 1:45 PM Levani Kokhreidze <levani.co...@gmail.com>
wrote:

> Hi Bill,
>
> Thanks a lot for the feedback.
> Yes, that makes sense. I’ve updated the KIP with the `Repartitioned#partitioner`
> configuration.
> In the beginning, I wanted to introduce a class for topic level
> configuration and keep topic level and producer level configurations (such
> as Produced) separate (see my second email in this thread).
> But while looking at the semantics of the KStream interface, I couldn’t really
> figure out a good operation name for a topic level configuration class, and just
> introducing a `Topic` config class was kinda breaking the semantics.
> So I think having a Repartitioned class which encapsulates topic and
> producer level configurations for internal topics is a viable thing to do.
>
> Regards,
> Levani
>
> > On Jul 19, 2019, at 7:47 PM, Bill Bejeck <bbej...@gmail.com> wrote:
> >
> > Hi Levani,
> >
> > Thanks for resurrecting this KIP.
> >
> > I'm also a +1 for adding a partition option.  In addition to the reason
> > provided by John, my reasoning is:
> >
> >   1. Users may want to use something other than hash-based partitioning
> >   2. Users may wish to partition on something different than the key
> >   without having to change the key.  For example:
> >      1. A combination of fields in the value in conjunction with the key
> >      2. Something other than the key
> >   3. We allow users to specify a partitioner on Produced hence in
> >   KStream.to and KStream.through, so it makes sense for API consistency.
> >
> > Just my  2 cents.
> >
> > Thanks,
> > Bill
> >
> >
> >
> > On Fri, Jul 19, 2019 at 5:46 AM Levani Kokhreidze <
> levani.co...@gmail.com>
> > wrote:
> >
> >> Hi John,
> >>
> >> In my mind it makes sense.
> >> If we add a partitioner configuration to the Repartitioned class, then
> >> combined with specifying the number of partitions for internal topics, the
> >> user will have the opportunity to ensure co-partitioning before a join operation.
> >> I think this can be quite a powerful feature.
> >> Wondering what others think about this?
> >>
> >> Regards,
> >> Levani
> >>
> >>> On Jul 18, 2019, at 1:20 AM, John Roesler <j...@confluent.io> wrote:
> >>>
> >>> Yes, I believe that's what I had in mind. Again, not totally sure it
> >>> makes sense, but I believe something similar is the rationale for
> >>> having the partitioner option in Produced.
> >>>
> >>> Thanks,
> >>> -John
> >>>
> >>> On Wed, Jul 17, 2019 at 3:20 PM Levani Kokhreidze
> >>> <levani.co...@gmail.com> wrote:
> >>>>
> >>>> Hey John,
> >>>>
> >>>> Oh, that’s an interesting use case.
> >>>> Do I understand this correctly: in your example I would first issue
> >>>> repartition(Repartitioned) with a proper partitioner that would essentially
> >>>> be the same as the one used by the topic I want to join with, and then do
> >>>> the KStream#join with the DSL?
> >>>>
> >>>> Regards,
> >>>> Levani
> >>>>
> >>>>> On Jul 17, 2019, at 11:11 PM, John Roesler <j...@confluent.io>
> wrote:
> >>>>>
> >>>>> Hey, all, just to chime in,
> >>>>>
> >>>>> I think it might be useful to have an option to specify the
> >>>>> partitioner. The case I have in mind is that some data may get
> >>>>> repartitioned and then joined with an input topic. If the right-side
> >>>>> input topic uses a custom partitioning strategy, then the
> >>>>> repartitioned stream also needs to be partitioned with the same
> >>>>> strategy.
> >>>>>
> >>>>> Does that make sense, or did I maybe miss something important?
> >>>>>
> >>>>> Thanks,
> >>>>> -John
> >>>>>
> >>>>> On Wed, Jul 17, 2019 at 2:48 PM Levani Kokhreidze
> >>>>> <levani.co...@gmail.com> wrote:
> >>>>>>
> >>>>>> Yes, I was thinking about it as well. To be honest I’m not sure about
> >>>>>> it yet.
> >>>>>> As a Kafka Streams DSL user, I don’t really think I would need control
> >>>>>> over the partitioner for internal topics.
> >>>>>> As a user, I would assume that Kafka Streams knows best how to
> >>>>>> partition data for internal topics.
> >>>>>> In this KIP I wrote that Produced should be used only for topics that
> >>>>>> are created by the user in advance.
> >>>>>> In those cases maybe it makes sense to be able to specify the
> >>>>>> partitioner.
> >>>>>> I don’t have a clear answer on that yet, but I guess specifying the
> >>>>>> partitioner can be added as well if there’s agreement on this.
> >>>>>>
> >>>>>> Regards,
> >>>>>> Levani
> >>>>>>
> >>>>>>> On Jul 17, 2019, at 10:42 PM, Sophie Blee-Goldman <
> >> sop...@confluent.io> wrote:
> >>>>>>>
> >>>>>>> Thanks for clearing that up. I agree that Repartitioned would be a useful
> >>>>>>> addition. I'm wondering if it might also need to have
> >>>>>>> a withStreamPartitioner method/field, similar to Produced? I'm not sure how
> >>>>>>> widely this feature is really used, but seems it should be available for
> >>>>>>> repartition topics.
> >>>>>>>
> >>>>>>> On Wed, Jul 17, 2019 at 11:26 AM Levani Kokhreidze <
> >> levani.co...@gmail.com>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hey Sophie,
> >>>>>>>>
> >>>>>>>> In both cases, KStream#repartition and KStream#repartition(Repartitioned),
> >>>>>>>> the topic will be created and managed by Kafka Streams.
> >>>>>>>> The idea of Repartitioned is to give the user more control over the topic,
> >>>>>>>> such as the number of partitions.
> >>>>>>>> I feel like the Repartitioned parameter is something that is missing in
> >>>>>>>> the current DSL design.
> >>>>>>>> Essentially, it gives the user control over parallelism by configuring
> >>>>>>>> the number of partitions for internal topics.
> >>>>>>>>
> >>>>>>>> Hope this answers your question.
> >>>>>>>>
> >>>>>>>> Regards,
> >>>>>>>> Levani
> >>>>>>>>
> >>>>>>>>> On Jul 17, 2019, at 9:02 PM, Sophie Blee-Goldman <
> >> sop...@confluent.io>
> >>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>> Hey Levani,
> >>>>>>>>>
> >>>>>>>>> Thanks for the KIP! Can you clarify one thing for me -- for the
> >>>>>>>>> KStream#repartition signature taking a Repartitioned, will the topic be
> >>>>>>>>> auto-created by Streams (which seems to be the case for the signature
> >>>>>>>>> without a Repartitioned) or does it have to be pre-created? The wording in
> >>>>>>>>> the KIP makes it seem like one version of the method will auto-create
> >>>>>>>>> topics while the other will not.
> >>>>>>>>>
> >>>>>>>>> Cheers,
> >>>>>>>>> Sophie
> >>>>>>>>>
> >>>>>>>>> On Wed, Jul 17, 2019 at 10:15 AM Levani Kokhreidze <
> >>>>>>>> levani.co...@gmail.com>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hello,
> >>>>>>>>>>
> >>>>>>>>>> One more bump about KIP-221 (
> >>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
> >>>>>>>>>> ) so it doesn’t get lost in the mailing list :)
> >>>>>>>>>> Would love to hear the community's opinions/concerns about this KIP.
> >>>>>>>>>>
> >>>>>>>>>> Regards,
> >>>>>>>>>> Levani
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>> On Jul 12, 2019, at 5:27 PM, Levani Kokhreidze <
> >> levani.co...@gmail.com
> >>>>>>>>>
> >>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> Hello,
> >>>>>>>>>>>
> >>>>>>>>>>> Kind reminder about this KIP:
> >>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
> >>>>>>>>>>>
> >>>>>>>>>>> Regards,
> >>>>>>>>>>> Levani
> >>>>>>>>>>>
> >>>>>>>>>>>> On Jul 9, 2019, at 11:38 AM, Levani Kokhreidze <
> >>>>>>>> levani.co...@gmail.com
> >>>>>>>>>> <mailto:levani.co...@gmail.com>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>> Hello,
> >>>>>>>>>>>>
> >>>>>>>>>>>> In order to move this KIP forward, I’ve updated the Confluence page
> >>>>>>>>>>>> with the new proposal:
> >>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+KStream+with+Connecting+Topic+Creation+and+Repartition+Hint
> >>>>>>>>>>>> I’ve also filled in the “Rejected Alternatives” section.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Looking forward to discussing this KIP :)
> >>>>>>>>>>>>
> >>>>>>>>>>>> Kind regards,
> >>>>>>>>>>>> Levani
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>> On Jul 3, 2019, at 1:08 PM, Levani Kokhreidze <
> >>>>>>>> levani.co...@gmail.com
> >>>>>>>>>> <mailto:levani.co...@gmail.com>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Hello Matthias,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks for the feedback and ideas.
> >>>>>>>>>>>>> I like the idea of introducing a dedicated `Topic` class for topic
> >>>>>>>>>>>>> configuration for internal operators like `groupedBy`.
> >>>>>>>>>>>>> Would be great to hear others' opinions about this as well.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Kind regards,
> >>>>>>>>>>>>> Levani
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Jul 3, 2019, at 7:00 AM, Matthias J. Sax <
> >> matth...@confluent.io
> >>>>>>>>>> <mailto:matth...@confluent.io>> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Levani,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks for picking up this KIP! And thanks for summarizing
> >>>>>>>> everything.
> >>>>>>>>>>>>>> Even if some points may have been discussed already (can't
> >> really
> >>>>>>>>>>>>>> remember), it's helpful to get a good summary to refresh the
> >>>>>>>>>> discussion.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I think your reasoning makes sense. With regard to the
> >> distinction
> >>>>>>>>>>>>>> between operators that manage topics and operators that use
> >>>>>>>>>> user-created
> >>>>>>>>>>>>>> topics: Following this argument, it might indicate that
> >> leaving
> >>>>>>>>>>>>>> `through()` as-is (as an operator that uses user-defined
> >> topics) and
> >>>>>>>>>>>>>> introducing a new `repartition()` operator (an operator that
> >> manages
> >>>>>>>>>>>>>> topics itself) might be good. Otherwise, there is one
> operator
> >>>>>>>>>>>>>> `through()` that sometimes manages topics but sometimes
> not; a
> >>>>>>>>>> different
> >>>>>>>>>>>>>> name, ie, new operator would make the distinction clearer.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> About adding `numOfPartitions` to `Grouped`. I am wondering
> >> if the
> >>>>>>>>>> same
> >>>>>>>>>>>>>> argument as for `Produced` does apply and adding it is
> >> semantically
> >>>>>>>>>>>>>> questionable? Might be good to get opinions of others on
> >> this, too.
> >>>>>>>> I
> >>>>>>>>>> am
> >>>>>>>>>>>>>> not sure myself what solution I prefer atm.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> So far, KS uses configuration objects that allow to
> configure
> >> a
> >>>>>>>>>> certain
> >>>>>>>>>>>>>> "entity" like a consumer, producer, store. If we assume that
> >> a topic
> >>>>>>>>>> is
> >>>>>>>>>>>>>> a similar entity, I am wonder if we should have a
> >>>>>>>>>>>>>> `Topic#withNumberOfPartitions()` class and method instead of
> >> a plain
> >>>>>>>>>>>>>> integer? This would allow us to add other configs, like
> >> replication
> >>>>>>>>>>>>>> factor, retention-time etc, easily, without the need to
> >> change the
> >>>>>>>>>> "main
> >>>>>>>>>>>>>> API".
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Just want to give some ideas. Not sure if I like them
> myself.
> >> :)
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On 7/1/19 1:04 AM, Levani Kokhreidze wrote:
> >>>>>>>>>>>>>>> Actually, giving it more thought - maybe enhancing Produced
> >> with num
> >>>>>>>>>> of partitions configuration is not the best approach. Let me
> >> explain
> >>>>>>>> why:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 1) If we enhance Produced class with this configuration,
> >> this will
> >>>>>>>>>> also affect KStream#to operation. Since KStream#to is the final
> >> sink of
> >>>>>>>> the
> >>>>>>>>>> topology, for me, it seems to be reasonable assumption that user
> >> needs
> >>>>>>>> to
> >>>>>>>>>> manually create sink topic in advance. And in that case, having
> >> num of
> >>>>>>>>>> partitions configuration doesn’t make much sense.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 2) Looking at Produced class, based on API contract, seems
> >> like
> >>>>>>>>>> Produced is designed to be something that is explicitly for
> >> producer
> >>>>>>>> (key
> >>>>>>>>>> serializer, value serializer, partitioner those all are producer
> >>>>>>>> specific
> >>>>>>>>>> configurations) and num of partitions is topic level
> >> configuration. And
> >>>>>>>> I
> >>>>>>>>>> don’t think mixing topic and producer level configurations
> >> together in
> >>>>>>>> one
> >>>>>>>>>> class is the good approach.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 3) Looking at the KStream interface, it seems like the Produced parameter
> >>>>>>>>>>>>>>> is for operations that work with non-internal topics (i.e. topics not
> >>>>>>>>>>>>>>> created and managed internally by Kafka Streams), and I think we should
> >>>>>>>>>>>>>>> leave it as it is in that case.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Taking all these things into account, I think we should
> >> distinguish
> >>>>>>>>>> between DSL operations, where Kafka Streams should create and
> >> manage
> >>>>>>>>>> internal topics (KStream#groupBy) vs topics that should be
> >> created in
> >>>>>>>>>> advance (e.g KStream#to).
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> To sum it up, I think adding numPartitions configuration in
> >>>>>>>> Produced
> >>>>>>>>>> will result in mixing topic and producer level configuration in
> >> one
> >>>>>>>> class
> >>>>>>>>>> and it’s gonna break existing API semantics.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Regarding making topic name optional in KStream#through - I
> >> think
> >>>>>>>>>> underlying idea is very useful and giving users the possibility to
> >> specify
> >>>>>>>> num
> >>>>>>>>>> of partitions there is even more useful :) Considering arguments
> >> against
> >>>>>>>>>> adding num of partitions in Produced class, I see two options
> >> here:
> >>>>>>>>>>>>>>> 1) Add following method overloads
> >>>>>>>>>>>>>>> * through() - topic will be auto-generated and num of
> >> partitions
> >>>>>>>>>> will be taken from source topic
> >>>>>>>>>>>>>>> * through(final int numOfPartitions) - topic will be auto
> >>>>>>>>>> generated with specified num of partitions
> >>>>>>>>>>>>>>> * through(final int numOfPartitions, final Produced<K, V>
> >>>>>>>>>> produced) - topic will be generated with specified num of
> >>>>>>>> partitions
> >>>>>>>>>> and configuration taken from produced parameter.
> >>>>>>>>>>>>>>> 2) Leave KStream#through as it is and introduce new method
> -
> >>>>>>>>>> KStream#repartition (I think Matthias suggested this in one of
> the
> >>>>>>>> threads)
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Considering all mentioned above I propose the following
> plan:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Option A:
> >>>>>>>>>>>>>>> 1) Leave Produced as it is
> >>>>>>>>>>>>>>> 2) Add num of partitions configuration to Grouped class (as
> >>>>>>>>>> mentioned in the KIP)
> >>>>>>>>>>>>>>> 3) Add following method overloads to KStream#through
> >>>>>>>>>>>>>>> * through() - topic will be auto-generated and num of
> >> partitions
> >>>>>>>>>> will be taken from source topic
> >>>>>>>>>>>>>>> * through(final int numOfPartitions) - topic will be auto
> >>>>>>>>>> generated with specified num of partitions
> >>>>>>>>>>>>>>> * through(final int numOfPartitions, final Produced<K, V>
> >>>>>>>>>> produced) - topic will be generated with specified num of
> >>>>>>>> partitions
> >>>>>>>>>> and configuration taken from produced parameter.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Option B:
> >>>>>>>>>>>>>>> 1) Leave Produced as it is
> >>>>>>>>>>>>>>> 2) Add num of partitions configuration to Grouped class (as
> >>>>>>>>>> mentioned in the KIP)
> >>>>>>>>>>>>>>> 3) Add new operator KStream#repartition for creating and
> >> managing
> >>>>>>>>>> internal repartition topics
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> P.S. I’m sorry if all of this was already discussed in the
> >> mailing
> >>>>>>>>>> list, but I kinda got lost with all the threads that were about this
> >> KIP :(
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Kind regards,
> >>>>>>>>>>>>>>> Levani
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Jul 1, 2019, at 9:56 AM, Levani Kokhreidze <
> >>>>>>>>>> levani.co...@gmail.com <mailto:levani.co...@gmail.com>> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Hello,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I would like to resurrect discussion around KIP-221. Going
> >> through
> >>>>>>>>>> the discussion thread, there seems to be agreement around the
> >> usefulness of
> >>>>>>>> this
> >>>>>>>>>> feature.
> >>>>>>>>>>>>>>>> Regarding the implementation, as far as I understood, the
> >> most
> >>>>>>>>>> optimal solution for me seems the following:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 1) Add two method overloads to KStream#through method
> >> (essentially
> >>>>>>>>>> making topic name optional)
> >>>>>>>>>>>>>>>> 2) Enhance Produced class with numOfPartitions
> configuration
> >>>>>>>> field.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Those two changes will allow DSL users to control
> >> parallelism and
> >>>>>>>>>> trigger re-partition without doing stateful operations.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I will update KIP with interface changes around
> >> KStream#through if
> >>>>>>>>>> these changes sound sensible.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Kind regards,
> >>>>>>>>>>>>>>>> Levani
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>
> >>
> >>
>
>
