Hello Florian,

Sorry for the late reply... I find myself apologizing for late replies a lot
these days. But I do want to push this KIP forward, as I see it as a
very important and helpful feature for extensibility.

About the exceptions, I've gone through them and hopefully it is an
exhaustive list:

1. KTable#toStream()
2. KStream#merge(KStream)
3. KStream#process() / transform() / transformValues()
4. KGroupedTable / KGroupedStream#count()


Here's my reasoning:

* It is okay not to let users override the name for 1/2, since they are
too trivial to be useful for debugging, and their processor names do
not determine any related topic / store names.
* For 3, I'd vote for adding overloaded functions with Named.
* For 4, if users really want to name the processor they can call
aggregate() instead, so I think it is okay to skip this case.
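
To make the suggestion for 3 concrete, here is a minimal, self-contained
sketch of what an overload taking a name could look like. Everything in it
is a stand-in invented for illustration: ToyStream only records node names
and is not the real KStream, Named.as(...) is an assumed factory name
(the earlier mail in this thread used Named.with(...)), and the generated
name format is only modeled on the "-0000000000" suffix discussed earlier.

```java
import java.util.ArrayList;
import java.util.List;

public class NamedOverloadSketch {

    /** Hypothetical control object that carries only a processor name. */
    static final class Named {
        private final String name;

        private Named(final String name) {
            this.name = name;
        }

        /** Assumed factory method; the final API may use a different name. */
        static Named as(final String name) {
            if (name == null || name.isEmpty()) {
                throw new IllegalArgumentException("name must not be empty");
            }
            return new Named(name);
        }

        String name() {
            return name;
        }
    }

    /** Toy stand-in for a stream; it only records processor node names. */
    static final class ToyStream {
        private final List<String> nodeNames = new ArrayList<>();
        private int index = 0;

        /** Existing-style method: the processor name is generated. */
        ToyStream transformValues() {
            nodeNames.add(String.format("KSTREAM-TRANSFORMVALUES-%010d", index++));
            return this;
        }

        /** Proposed-style overload: the caller supplies the processor name. */
        ToyStream transformValues(final Named named) {
            nodeNames.add(named.name());
            return this;
        }

        List<String> nodeNames() {
            return nodeNames;
        }
    }

    public static void main(final String[] args) {
        final ToyStream stream = new ToyStream()
            .transformValues()                            // generated name
            .transformValues(Named.as("enrich-values"));  // user-provided name
        // prints [KSTREAM-TRANSFORMVALUES-0000000000, enrich-values]
        System.out.println(stream.nodeNames());
    }
}
```

The point of the sketch is only that the existing method stays untouched, so
users who do not care about names never see the extra parameter.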


Guozhang



On Fri, Jul 6, 2018 at 3:06 PM, Florian Hussonnois <fhussonn...@gmail.com>
wrote:

> Hi,
>
> The option #3 seems to be a good alternative and I find the API more
> elegant (thanks John).
>
> But, we still have the need to overload some methods either because they do
> not accept an action instance or because they are translated to multiple
> processors.
>
> For example, this is the case for the methods branch() and merge(). We could
> introduce a new interface Named (or maybe a different name?) with a method
> name(). All action interfaces could extend this one to implement option 3).
> This would result in the following overloads:
>
> KStream<K, V> merge(final Named name, final KStream<K, V> stream);
> KStream<K, V>[] branch(final Named name,
>                        final Predicate<? super K, ? super V>... predicates);
>
> N.B.: The list above is not exhaustive.
>
> ---------
> user's code will become :
>
>         KStream<String, Integer> stream = builder.stream("test");
>         KStream<String, Integer>[] branches = stream.branch(
>                 Named.with("BRANCH-STREAM-ON-VALUE"),
>                 Predicate.named("STREAM-PAIR-VALUE", (k, v) -> v % 2 == 0),
>                 Predicate.named("STREAM-IMPAIR-VALUE", (k, v) -> v % 2 != 0));
>
>         branches[0].to("pair");
>         branches[1].to("impair");
> ---------
>
> This is a mix of the options 3) and 1)
>
> On Fri, Jul 6, 2018 at 10:58 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hi folks, just to summarize the options we have so far:
> >
> > 1) Add a new "as" for KTable / KStream, plus adding new fields for
> > operators-returns-void control objects (the current wiki's proposal).
> >
> > Pros: no more overloads.
> > Cons: departs a bit from the current high-level API design of the DSL,
> > plus the inconsistency between operators that return void and
> > operators that do not.
> >
> > 2) Add overloaded functions for all operators that accept a new control
> > object "Described".
> >
> > Pros: consistent with current APIs.
> > Cons: lots of overloaded functions to add.
> >
> > 3) Add another default function in the interface (thank you J8!) as John
> > proposed.
> >
> > Pros: no overloaded functions, no "Described".
> > Cons: do we really lose lambda functions (seems not if we provide a "named"
> > for each func)? Plus "Described" may be more extensible than a single
> > `String`.
> >
> >
> > My main criterion for deciding which one is better is "how
> > to make advanced users easily use the additional API, while keeping it
> > hidden from normal users who do not care at all". For that purpose I think
> > 3) > 1) > 2).
> >
> > One caveat, though, is that changing the interface would be
> > source-compatible but not binary-compatible, right? I.e. users need to
> > recompile their code even though no changes are needed.
> >
> >
> >
> > Another note: for 3), if we really want to keep the extensibility of
> > Described we could do sth. like:
> >
> > ---------
> >
> > public interface Predicate<K, V> {
> >     // existing method
> >     boolean test(final K key, final V value);
> >
> >     // new default method adds the ability to name the predicate
> >     default Described described() {
> >         return new Described(null);
> >     }
> > }
> >
> > ----------
> >
> > where user's code becomes:
> >
> > // note `named` now just sets a Described("key") in "described()".
> > stream.filter(named("key", (k, v) -> true));
> >
> > stream.filter(described(
> >     Described.as("key" /* , any other fancy parameters in the future */),
> >     (k, v) -> true));
> > ----------
> >
> >
> > I feel it is not very likely that we'd need to extend it further in the
> > future, so just a `String` would be good enough. But I am just listing all
> > possibilities here.
> >
> >
> >
> > Guozhang
> >
> >
> >
> >
> >
> >
> > On Fri, Jul 6, 2018 at 8:19 AM, John Roesler <j...@confluent.io> wrote:
> >
> > > Hi Florian,
> > >
> > > Sorry I'm late to the party, but I missed the message originally.
> > >
> > > Regarding the names, it's probably a good idea to stick to the same
> > > character set we're currently using: letters, numbers, and hyphens. The
> > > names are used in Kafka topics, files and folders, and RocksDB databases,
> > > and we also need them to work with the file systems of Windows, Linux, and
> > > MacOS. My opinion is that with a situation like that, it's better to be
> > > conservative. It might also be a good idea to impose an upper limit on name
> > > length to avoid running afoul of any of those systems.
> > >
> > > ---
> > >
> > > It seems like there's a small debate between 1) adding a new method to
> > > KStream (and maybe KTable) to modify its name after the fact, or 2)
> > > piggy-backing on the config objects where they exist and adding one where
> > > they don't. To me, #2 is the better alternative even though it produces
> > > more overloads and may be a bit awkward in places.
> > >
> > > The reason is simply that #1 is a high-level departure from the
> > > graph-building paradigm we're using in the DSL. Consider:
> > >
> > > Graph.node1(config).node2(config)
> > >
> > > vs
> > >
> > > Graph.node1().config().node2().config()
> > >
> > > We could have done either, but we picked the former. I think it's probably
> > > a good goal to try and stick to it so that developers can develop and rely
> > > on their instincts for how the DSL will behave.
> > >
> > > I do want to present one alternative to adding new config objects: we can
> > > just add a "name()" method to all our "action" interfaces. For example,
> > > I'll demonstrate how we can add a "name" to Predicate and then use it to
> > > name a "KStream#filter" DSL operator:
> > >
> > > public interface Predicate<K, V> {
> > >     // existing method
> > >     boolean test(final K key, final V value);
> > >
> > >     // new default method adds the ability to name the predicate
> > >     default String name() {
> > >         return null;
> > >     }
> > >
> > >     // new static factory method adds the ability to wrap lambda predicates
> > >     // with a named predicate
> > >     static <K, V> Predicate<K, V> named(final String name,
> > >                                         final Predicate<K, V> predicate) {
> > >         return new Predicate<K, V>() {
> > >             @Override
> > >             public boolean test(final K key, final V value) {
> > >                 return predicate.test(key, value);
> > >             }
> > >
> > >             @Override
> > >             public String name() {
> > >                 return name;
> > >             }
> > >         };
> > >     }
> > > }
> > >
> > > Then, here's how it would look to use it:
> > >
> > > // Anonymous predicates continue to work just fine
> > > stream.filter((k, v) -> true);
> > >
> > > // Devs can swap in a Predicate that implements the name() method.
> > > stream.filter(new Predicate<Object, Object>() {
> > >     @Override
> > >     public boolean test(final Object key, final Object value) {
> > >         return true;
> > >     }
> > >
> > >     @Override
> > >     public String name() {
> > >         return "hey";
> > >     }
> > > });
> > >
> > > // Or they can wrap their existing lambda using the static factory method
> > > stream.filter(named("key", (k, v) -> true));
> > >
> > > Just a thought.
> > >
> > > Overall, I think it's really valuable to be able to name the processors,
> > > for all the reasons you mentioned in the KIP. So thank you for introducing
> > > this!
> > >
> > > Thanks,
> > > -John
> > >
> > > On Thu, Jul 5, 2018 at 4:53 PM Florian Hussonnois <fhussonn...@gmail.com>
> > > wrote:
> > >
> > > > Hi, thank you very much for all your suggestions. I've started to update
> > > > the KIP (
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
> > > > ).
> > > > Also, I propose to rename the Processed class to Described - this will be
> > > > more meaningful (but this is just a detail).
> > > >
> > > > I'm OK with not enforcing uppercase for specific names, but should we allow
> > > > arbitrary names, with whitespace for example? Currently, I can't tell if
> > > > this can lead to some side effects.
> > > >
> > > > On Mon, Jun 11, 2018 at 1:31 AM, Matthias J. Sax <matth...@confluent.io>
> > > > wrote:
> > > >
> > > > > Just catching up on this thread.
> > > > >
> > > > > I like the general idea. Couple of comments:
> > > > >
> > > > >  - I think that adding `Processed` (or maybe a different name?) is a
> > > > > valid proposal for stateless operators that only have a single overload
> > > > > atm. It would align with the overall API design.
> > > > >
> > > > >  - for all methods with multiple existing overloads, we can consider
> > > > > extending `Consumed`, `Produced`, `Materialized` etc. to take an additional
> > > > > processor name (not sure atm how elegant this is; we would need to
> > > > > "play" with the API a little bit; the advantage would be that we do not
> > > > > add more overloads, which seems to be key for this KIP)
> > > > >
> > > > >  - operators that return void: while I agree that the "name first" chaining
> > > > > idea is not very intuitive, it might still work if we name the method
> > > > > correctly (again, we would need to "play" with the API a little bit to see)
> > > > >
> > > > >  - for DSL operators that are translated to multiple nodes: it might
> > > > > make sense to use the specified operator name as a prefix and add
> > > > > reasonable suffixes. For example, a join translates into 5 operators
> > > > > that could be named "name-left-store-processor",
> > > > > "name-left-join-processor", "name-right-store-processor",
> > > > > "name-right-join-processor", and "name-join-merge-processor" (or
> > > > > similar). Maybe just using numbers might also work.
> > > > >
> > > > >  - I think we should strip the number suffixes if a user provides names
> > > > >
> > > > >  - enforcing upper case seems to be tricky: for example, we do not
> > > > > enforce upper case for store names and we cannot easily change it as it
> > > > > would break compatibility -- thus, for consistency reasons we might not
> > > > > want to do this
> > > > >
> > > > >  - for a better understanding of the impact of the KIP, it would be quite
> > > > > helpful if you would list all method names that are affected in the KIP
> > > > > (i.e., list all newly added overloads)
> > > > >
> > > > >
> > > > > -Matthias
> > > > >
> > > > >
> > > > > On 5/31/18 6:40 PM, Guozhang Wang wrote:
> > > > > > Hi Florian,
> > > > > >
> > > > > > Re 1: I think changing the KStreamImpl / KTableImpl to allow modifying the
> > > > > > processor name after the operator is fine as long as we do the check again
> > > > > > when modifying that. In fact, we are having some topology optimization
> > > > > > going on which may modify processor names in the final topology anyways
> > > > > > (https://github.com/apache/kafka/pull/4983). Semantically I think it is
> > > > > > easier for developers to understand than "deciding the processor name for
> > > > > > the next operator".
> > > > > >
> > > > > > Re 2: Yeah, I'm thinking that for operators that translate to multiple
> > > > > > processor names, we can still use the provided "hint" to name the
> > > > > > processors, e.g. for Joins we can name them `join-foo-this` and
> > > > > > `join-foo-that` etc. if the user calls `as("foo")`.
> > > > > >
> > > > > > Re 3: The motivation I had for removing the suffix is that it puts huge
> > > > > > restrictions on topology compatibility: consider if user code adds a new
> > > > > > operator, or the library does some optimization to remove some operators;
> > > > > > the suffix indexing may change for a large number of the processor names.
> > > > > > This will in turn change the internal state store names, as well as the
> > > > > > internal topic names, making the new application topology
> > > > > > incompatible with the old ones. One rationale I had about this KIP is that,
> > > > > > aligned with this effort, moving forward we can allow users to customize
> > > > > > internal names so that they can still be reused even with topology changes
> > > > > > (e.g. KIP-230), so I think removing the suffix index would be more
> > > > > > applicable in the long run.
> > > > > >
> > > > > >
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Thu, May 31, 2018 at 3:08 PM, Florian Hussonnois <fhussonn...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > >> Hi ,
> > > > > >> Thank you very much for your feedback.
> > > > > >>
> > > > > >> 1/
> > > > > >> I agree that overloading most of the methods with a Processed is not ideal.
> > > > > >> I've started modifying the KStream API and I came to the same conclusion.
> > > > > >> Also, adding a new method directly to the KStreamImpl and KTableImpl classes
> > > > > >> seems to be a better option.
> > > > > >>
> > > > > >> However, a processor name cannot be redefined after calling an operator (or
> > > > > >> maybe I missed something in the code).
> > > > > >> From my understanding, this will only set the KStream name property, not the
> > > > > >> processor name previously added to the topology builder - leading to an
> > > > > >> InvalidTopology exception.
> > > > > >>
> > > > > >> So the new method should actually define the name of the next processor.
> > > > > >> Below is an example:
> > > > > >>
> > > > > >> stream.as(Processed.name("MAPPE_TO_UPPERCASE"))
> > > > > >>           .map( (k, v) -> KeyValue.pair(k, v.toUpperCase()))
> > > > > >>
> > > > > >> I think this approach could solve the cases for methods returning void?
> > > > > >>
> > > > > >> Regarding this new method we have two possible implementations:
> > > > > >>
> > > > > >>    1. Adding a method like: withName(String processorName)
> > > > > >>    2. or adding a method accepting a Processed object: as(Processed).
> > > > > >>
> > > > > >> I think solution 2. is preferable as the Processed class could be enriched
> > > > > >> further (in the future).
> > > > > >>
> > > > > >> 2/
> > > > > >> As Guozhang said, some operators add internal processors.
> > > > > >> For example, the branch() method creates one KStreamBranch processor to route
> > > > > >> records and one KStreamPassThrough processor for each branch.
> > > > > >> In that situation only the parent processor can be named. For children
> > > > > >> processors we could keep the current behaviour that adds a suffix (i.e.
> > > > > >> KSTREAM-BRANCHCHILD-)
> > > > > >>
> > > > > >> This is also the case for the join() method, which results in adding multiple
> > > > > >> processors to the topology (windowing, left/right joins and a merge
> > > > > >> processor).
> > > > > >> I think, like for the branch method, users could only define a processor
> > > > > >> name prefix.
> > > > > >>
> > > > > >> 3/
> > > > > >> I think we should still add a suffix like "-0000000000" to the processor
> > > > > >> name and enforce uppercase, as this will keep some consistency with the
> > > > > >> ones generated by the API.
> > > > > >>
> > > > > >> 4/
> > > > > >> Yes, the KTable interface should be modified like KStream to allow custom
> > > > > >> processor name definitions.
> > > > > >>
> > > > > >> Thanks,
> > > > > >>
> > > > > >>
> > > > > >> On Thu, May 31, 2018 at 7:18 PM, Damian Guy <damian....@gmail.com> wrote:
> > > > > >>
> > > > > >>> Hi Florian,
> > > > > >>>
> > > > > >>> Thanks for the KIP. What about KTable and other DSL interfaces? Will they
> > > > > >>> not want to be able to do the same thing?
> > > > > >>> It would be good to see a complete set of the public API changes.
> > > > > >>>
> > > > > >>> Cheers,
> > > > > >>> Damian
> > > > > >>>
> > > > > >>> On Wed, 30 May 2018 at 19:45 Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > >>>
> > > > > >>>> Hello Florian,
> > > > > >>>>
> > > > > >>>> Thanks for the KIP. I have some meta feedback on the proposal:
> > > > > >>>>
> > > > > >>>> 1. You mentioned that this `Processed` object will be added to a new
> > > > > >>>> overloaded variant of all the stateless operators; what about the stateful
> > > > > >>>> operators? I would like to hear your opinion if you have thought about that:
> > > > > >>>> note that stateful operators will usually be mapped to multiple
> > > > > >>>> processor node names, so we probably need to come up with some way to
> > > > > >>>> define all their names.
> > > > > >>>>
> > > > > >>>> 2. I share the same concern as Bill about adding lots of new overloaded
> > > > > >>>> functions to the stateless operators, as we have just spent quite some
> > > > > >>>> effort trimming them since the 1.0.0 release. If the goal is to just provide
> > > > > >>>> some "hints" on the generated processor node names, not strictly enforcing
> > > > > >>>> the exact names to be generated, then how about we just add a new
> > > > > >>>> function to the `KStream` and `KTable` classes like "as(Processed)", with the
> > > > > >>>> semantics "the latest operators that generate this KStream / KTable will
> > > > > >>>> be named according to this hint".
> > > > > >>>>
> > > > > >>>> The only caveat is that for all operators like `KStream#to` and
> > > > > >>>> `KStream#print` that return void, this alternative would not work. But for
> > > > > >>>> the current operators:
> > > > > >>>>
> > > > > >>>> a. KStream#print,
> > > > > >>>> b. KStream#foreach,
> > > > > >>>> c. KStream#to,
> > > > > >>>> d. KStream#process
> > > > > >>>>
> > > > > >>>> I personally feel that, except for `KStream#process`, users would not usually
> > > > > >>>> bother to override their names, and for `KStream#process` we could add an
> > > > > >>>> overloaded variant with the additional Processed object.
> > > > > >>>>
> > > > > >>>>
> > > > > >>>> 3. In your example, the processor names still get a suffix like
> > > > > >>>> "-0000000000"; is this intentional? If yes, why (I thought with
> > > > > >>>> user-specified processor name hints we would not add a suffix to distinguish
> > > > > >>>> different nodes of the same type any more)?
> > > > > >>>>
> > > > > >>>>
> > > > > >>>> Guozhang
> > > > > >>>>
> > > > > >>>>
> > > > > >>>> On Tue, May 29, 2018 at 6:47 AM, Bill Bejeck <bbej...@gmail.com> wrote:
> > > > > >>>>
> > > > > >>>>> Hi Florian,
> > > > > >>>>>
> > > > > >>>>> Thanks for the KIP.  I think being able to add more context to the
> > > > > >>>>> processor names would be useful.
> > > > > >>>>>
> > > > > >>>>> I like the idea of adding a "withProcessorName" to Produced, Consumed and
> > > > > >>>>> Joined.
> > > > > >>>>>
> > > > > >>>>> But instead of adding the "Processed" parameter to a large percentage of
> > > > > >>>>> the methods, which would result in overloaded methods (which we removed
> > > > > >>>>> quite a bit of with KIP-182), what do you think of adding a method
> > > > > >>>>> to the AbstractStream class "withName(String processorName)"? BTW I'm not
> > > > > >>>>> married to the method name, it's the best I can do off the top of my
> > > > > >>>>> head.
> > > > > >>>>>
> > > > > >>>>> For the methods that return void, we'd have to add a parameter, but that
> > > > > >>>>> would at least cut down on the number of overloaded methods in the
> > > > > >>>>> API.
> > > > > >>>>>
> > > > > >>>>> Just my 2 cents.
> > > > > >>>>>
> > > > > >>>>> Thanks,
> > > > > >>>>> Bill
> > > > > >>>>>
> > > > > >>>>> On Sun, May 27, 2018 at 4:13 PM, Florian Hussonnois <fhussonn...@gmail.com>
> > > > > >>>>> wrote:
> > > > > >>>>>
> > > > > >>>>>> Hi,
> > > > > >>>>>>
> > > > > >>>>>> I would like to start a new discussion on the following KIP:
> > > > > >>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
> > > > > >>>>>>
> > > > > >>>>>> This is still a draft.
> > > > > >>>>>>
> > > > > >>>>>> Looking forward to your feedback.
> > > > > >>>>>> --
> > > > > >>>>>> Florian HUSSONNOIS
> > > > > >>>>>>
> > > > > >>>>>
> > > > > >>>>
> > > > > >>>>
> > > > > >>>>
> > > > > >>>> --
> > > > > >>>> -- Guozhang
> > > > > >>>>
> > > > > >>>
> > > > > >>
> > > > > >>
> > > > > >> --
> > > > > >> Florian HUSSONNOIS
> > > > > >>
> > > > > >
> > > > > >
> > > > > >
> > > > >
> > > > >
> > > >
> > > > --
> > > > Florian HUSSONNOIS
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>
>
> --
> Florian HUSSONNOIS
>



-- 
-- Guozhang
