Just catching up on this discussion.

My overall personal take is, that I am not a big fan of the interface
`Named` that is used as a factory. I would rather prefer to add a
control object parameter to all methods that don't have one yet. This
KIP was started a while ago, and we added new naming capabilities in the
meantime. Guozhang's example in the PR comment about naming in
stream-stream join shows, that we might end up in a confusion situation
for users if we use `Named`. Also, in 2.1, user can already name as
repartition-/changelog-topics and stores. Thus, KIP-307 boils down to
provide non-functional naming?

Hence, for all methods that allow to specify names already, I don't see
any reason to change them, but use the existing API to also name the
processor(s) instead of allowing uses to specify a new name.

About the inconsistency in method naming. I agree, that `as` is very
generic and maybe not the best choice.

I think it might be helpful, to have a table overview in the KIP, that
list all existing static/non-static methods that allow to specify a
name, plus a columns with the new suggested naming for those methods?

Thoughts?


-Matthias


On 12/12/18 12:45 AM, Florian Hussonnois wrote:
> Thank you very much for your feedbacks.
> 
> Currently, there is still lot of discussions regarding the Named interface.
> On the one hand we should provided consistency over the stream API and on
> the other hand we should not break the semantic as John point it up.
> 
> Guozhang, I'm sorry, but I'm little bit confused, maybe I missed something.
> In your comment you have suggested that :
> * Produced/Consumed/Suppressed should extends Named
> * Named should have a private-package method to get the specified processor
> name internally (processorName())
> * Finally we should end up with something like :  Named -> XXX ->
> XXXInternal or Named -> Produced -> ProducedInternal
> 
> The objective behind that is to :
> * consolidate the internal method processorName()
> * consolidate the method withName that exists now existing into Produced,
> Consumed and Suppressed.
> 
> But, Named is an interface so we can't define a private-package method on
> it. Also, for example Produced and ProducedInternal are not in the same
> package so having a private-package method doesn't really help.
> In addition, if we add the withName method into Named interface this can
> become confusing for developers because action interfaces (ValueMapper,
> Reducer, etc) extend it.
> The interface would look like :
> 
> public interface Named<T extends Named<T>> {
>     default String name() {
>         return null;
>     }
>     default Named<T> withName(final String name) {
>         return null;
>     }
> ...
> }
> 
> So maybe instead of adding another method to Named we could create a new
> package-private class that could be extended by
> Produced/Consumed/Joined/Suppressed. For exemple,
> class SettableName<T extends SettableName<T>> implements Named {
> 
>     protected String processorName;
> 
>     SettableName(final SettableName settable) {
>         this(Objects.requireNonNull(settable, "settable can't be
> null").name());
>     }
> 
>     SettableName(final String processorName) {
>         this.processorName = processorName;
>     }
> 
>     @Override
>     public String name() {
>         return processorName;
>     }
>     public T withName(final String processorName) {
>         this.processorName = processorName;
>         return (T)this;
>     }
> }
> 
> In that way, we will get : public class Produced implements
> SettableName<Produced> { ...
> 
> WDYT?
> 
> 
> Le mar. 11 déc. 2018 à 02:46, Guozhang Wang <wangg...@gmail.com> a écrit :
> 
>> I had one meta comment on the PR:
>> https://github.com/apache/kafka/pull/5909#discussion_r240447153
>>
>> On Mon, Dec 10, 2018 at 5:22 PM John Roesler <j...@confluent.io> wrote:
>>
>>> Hi Florian,
>>>
>>> I hope it's ok if I ask a few questions at this late stage...
>>>
>>> Comment 1 ======
>>>
>>> It seems like the proposal is to add a new "Named" interface that is
>>> intended to be mixed in with the existing API objects at various points.
>>>
>>> Just to preface some of my comments, it looks like your KIP was created
>>> quite a while ago, so the API may have changed somewhat since you
>> started.
>>>
>>> As I see the API, there are a few different kinds of DSL method
>> arguments:
>>> * functions: things like Initializer, Aggregator, ValueJoiner,
>>> ForEachAction... All of these are essentially Streams-flavored Function
>>> interfaces with different arities, type bounds, and semantics.
>>> * config objects: things like Produced, Consumed, Joined, Grouped...
>> These
>>> are containers for configurations, where the target of the configuration
>> is
>>> the operation itself
>>> * raw configurations: things like a raw topic-name string and
>> Materialized:
>>> These are configurations for operations that have no config object, and
>> for
>>> various reasons, we didn't make one. The distinguishing feature is that
>> the
>>> target of the configuration is not the operation itself, but some aspect
>> of
>>> it. For example, in Materialized, we are not setting the caching behavior
>>> of, for example, an aggregation; we're setting the caching behavior of a
>>> materialized state store attached to the aggregation.
>>>
>>> It seems like choosing to mix the Named interface in with the functions
>> has
>>> a couple of unfortunate side-effects:
>>> * Aggregator is not the only function passed to any of the relevant
>>> aggregate methods, so it seems a little arbitrary to pick that function
>>> over Initializer or Merger.
>>> * As you noted, branch() takes an array of Predicate, so we just ignore
>> the
>>> provided name(s), even though Predicate names are used elsewhere.
>>> * Not all things that we want to name have function arguments, notably
>>> source and sink, so we'd switch paradigms and use the config object
>>> instead.
>>> * Adding an extra method to the function interfaces means that those are
>> no
>>> longer SAM interfaces. You proposed to add a default implementation, so
>> we
>>> could still pass a lambda if we don't want to set the name, but if we
>> *do*
>>> want to set the name, we can no longer use lambdas.
>>>
>>> I think the obvious other choice would be to mix Named in with the config
>>> objects instead, but this has one main downside of its own...
>>> * not every operator we wish to name has a config object. I don't know if
>>> everyone involved is comfortable with adding a config object to every
>>> operator that's missing one.
>>>
>>> Personally, I favor moving toward a more consistent state that's forward
>>> compatible with any further changes we wish to make. I *think* that
>> giving
>>> every operator two forms (one with no config and one with a config
>> object)
>>> would be such an API.
>>>
>>> Comment 2 =========
>>>
>>> Finally, just a minor comment: the static method in Named wouldn't work
>>> properly as defined. Assuming that we mix Named in with Produced, for
>>> example, we'd need to be able to use it like:
>>>>  kStream.to("out", Produced.with("myOut"))
>>> This doesn't work because with() returns a Named, but we need a Produced.
>>>
>>> We can pull off a builder method in the interface, but not a static
>> method.
>>> To define a builder method in the interface that returns an instance of
>> the
>>> concrete subtype, you have to use the "curiously recurring generic"
>>> pattern.
>>>
>>> It would look like:
>>>
>>> public interface Named<N extends Named<N>> {
>>>   String name();
>>>   N withName(String name);
>>> }
>>>
>>> You can see where the name of the pattern comes from ;)
>>> An implementation would then look like:
>>>
>>> public class Produced implements Named<Produced> {
>>>   String name() { return name; }
>>>   Produced withName(final String name) { this.name = name; return this;
>> }
>>> }
>>>
>>> Note that the generic parameter gets filled in properly in the
>> implementing
>>> class, so that you get the right return type out.
>>>
>>> It doesn't work at all with a static factory method at the interface
>> level,
>>> so it would be up to Produced to define a static factory if it wants to
>>> present one.
>>>
>>> ======
>>>
>>> Those are my two feedbacks!
>>>
>>> I hope you find this helpful, rather than frustrating. I'm sorry I didn't
>>> get a chance to comment sooner.
>>>
>>> Thanks for the KIP, I think it will be much nicer to be able to name the
>>> processor nodes.
>>>
>>> -John
>>>
>>> On Tue, Nov 27, 2018 at 6:34 PM Guozhang Wang <wangg...@gmail.com>
>> wrote:
>>>
>>>> Hi Florian,
>>>>
>>>> I've made a pass over the PR. There are some comments that are related
>> to
>>>> the function names which may be affecting the KIP wiki page, but
>> overall
>>> I
>>>> think it looks good already.
>>>>
>>>>
>>>> Guozhang
>>>>
>>>>
>>>> On Fri, Nov 16, 2018 at 4:21 PM Guozhang Wang <wangg...@gmail.com>
>>> wrote:
>>>>
>>>>> Thanks Florian! I will take a look at the PR.
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Nov 12, 2018 at 2:44 PM Florian Hussonnois <
>>>> fhussonn...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Matthias,
>>>>>>
>>>>>> Sorry I was absent for a while. I have started a new PR for this
>> KIP.
>>> It
>>>>>> is
>>>>>> still in progress for now. I'm working on it.
>>>>>> https://github.com/apache/kafka/pull/5909
>>>>>>
>>>>>> Le ven. 19 oct. 2018 à 20:13, Matthias J. Sax <
>> matth...@confluent.io>
>>> a
>>>>>> écrit :
>>>>>>
>>>>>>> What is the status of this KIP?
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>> On 7/19/18 5:17 PM, Guozhang Wang wrote:
>>>>>>>> Hello Florian,
>>>>>>>>
>>>>>>>> Sorry for being late... Found myself keep apologizing for late
>>>> replies
>>>>>>>> these days. But I do want to push this KIP's progress forward
>> as I
>>>>>> see it
>>>>>>>> very important and helpful feature for extensibility.
>>>>>>>>
>>>>>>>> About the exceptions, I've gone through them and hopefully it is
>>> an
>>>>>>>> exhaustive list:
>>>>>>>>
>>>>>>>> 1. KTable#toStream()
>>>>>>>> 2. KStream#merge(KStream)
>>>>>>>> 3. KStream#process() / transform() / transformValues()
>>>>>>>> 4. KGroupedTable / KGroupedStream#count()
>>>>>>>>
>>>>>>>>
>>>>>>>> Here's my reasoning:
>>>>>>>>
>>>>>>>> * It is okay not letting users to override the name for 1/2,
>> since
>>>>>> they
>>>>>>> are
>>>>>>>> too trivial to be useful for debugging, plus their processor
>> names
>>>>>> would
>>>>>>>> not determine any related topic / store names.
>>>>>>>> * For 3, I'd vote for adding overloaded functions with Named.
>>>>>>>> * For 4, if users really want to name the processor she can call
>>>>>>>> aggregate() instead, so I think it is okay to skip this case.
>>>>>>>>
>>>>>>>>
>>>>>>>> Guozhang
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Jul 6, 2018 at 3:06 PM, Florian Hussonnois <
>>>>>>> fhussonn...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> The option #3 seems to be a good alternative and I find the API
>>>> more
>>>>>>>>> elegant (thanks John).
>>>>>>>>>
>>>>>>>>> But, we still have the need to overload some methods either
>>> because
>>>>>>> they do
>>>>>>>>> not accept an action instance or because they are translated to
>>>>>> multiple
>>>>>>>>> processors.
>>>>>>>>>
>>>>>>>>> For example, this is the case for methods branch() and merge().
>>> We
>>>>>> could
>>>>>>>>> introduce a new interface Named (or maybe a different name ?)
>>> with
>>>> a
>>>>>>> method
>>>>>>>>> name(). All action interfaces could extend this one to
>> implement
>>>> the
>>>>>>> option
>>>>>>>>> 3).
>>>>>>>>> This would result by having the following overloads  :
>>>>>>>>>
>>>>>>>>> Stream<K, V> merge(final Named name, final KStream<K, V>
>> stream);
>>>>>>>>> KStream<K, V>[] branch(final Named name, final Predicate<?
>> super
>>>> K, ?
>>>>>>> super
>>>>>>>>> V>... predicates)
>>>>>>>>>
>>>>>>>>> N.B : The list above is  not exhaustive
>>>>>>>>>
>>>>>>>>> ---------
>>>>>>>>> user's code will become :
>>>>>>>>>
>>>>>>>>>         KStream<String, Integer> stream =
>> builder.stream("test");
>>>>>>>>>         KStream<String, Integer>[] branches =
>>>>>>>>> stream.branch(Named.with("BRANCH-STREAM-ON-VALUE"),
>>>>>>>>>                 Predicate.named("STREAM-PAIR-VALUE", (k, v) ->
>> v
>>> %
>>>> 2
>>>>>> ==
>>>>>>>>> 0),
>>>>>>>>>                 Predicate.named("STREAM-IMPAIR-VALUE", (k, v)
>> ->
>>> v
>>>> %
>>>>>> 2
>>>>>>> !=
>>>>>>>>> 0));
>>>>>>>>>
>>>>>>>>>         branches[0].to("pair");
>>>>>>>>>         branches[1].to("impair");
>>>>>>>>> ---------
>>>>>>>>>
>>>>>>>>> This is a mix of the options 3) and 1)
>>>>>>>>>
>>>>>>>>> Le ven. 6 juil. 2018 à 22:58, Guozhang Wang <
>> wangg...@gmail.com>
>>> a
>>>>>>> écrit :
>>>>>>>>>
>>>>>>>>>> Hi folks, just to summarize the options we have so far:
>>>>>>>>>>
>>>>>>>>>> 1) Add a new "as" for KTable / KStream, plus adding new fields
>>> for
>>>>>>>>>> operators-returns-void control objects (the current wiki's
>>>>>> proposal).
>>>>>>>>>>
>>>>>>>>>> Pros: no more overloads.
>>>>>>>>>> Cons: a bit departing with the current high-level API design
>> of
>>>> the
>>>>>>> DSL,
>>>>>>>>>> plus, the inconsistency between operators-returns-void and
>>>>>>>>>> operators-not-return-voids.
>>>>>>>>>>
>>>>>>>>>> 2) Add overloaded functions for all operators, that accepts a
>>> new
>>>>>>> control
>>>>>>>>>> object "Described".
>>>>>>>>>>
>>>>>>>>>> Pros: consistent with current APIs.
>>>>>>>>>> Cons: lots of overloaded functions to add.
>>>>>>>>>>
>>>>>>>>>> 3) Add another default function in the interface (thank you
>> J8!)
>>>> as
>>>>>>> John
>>>>>>>>>> proposed.
>>>>>>>>>>
>>>>>>>>>> Pros: no overloaded functions, no "Described".
>>>>>>>>>> Cons: do we lose lambda functions really (seems not if we
>>> provide
>>>> a
>>>>>>>>> "named"
>>>>>>>>>> for each func)? Plus "Described" may be more extensible than a
>>>>>> single
>>>>>>>>>> `String`.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> My principle of considering which one is better depends
>>> primarily
>>>> on
>>>>>>> "how
>>>>>>>>>> to make advanced users easily use the additional API, while
>>>> keeping
>>>>>> it
>>>>>>>>>> hidden from normal users who do not care at all". For that
>>>> purpose I
>>>>>>>>> think
>>>>>>>>>> 3) > 1) > 2).
>>>>>>>>>>
>>>>>>>>>> One caveat though, is that changing the interface would not be
>>>>>>>>>> binary-compatible though source-compatible, right? I.e. users
>>> need
>>>>>> to
>>>>>>>>>> recompile their code though no changes needed.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Another note: for 3), if we really want to keep extensibility
>> of
>>>>>>>>> Described
>>>>>>>>>> we could do sth. like:
>>>>>>>>>>
>>>>>>>>>> ---------
>>>>>>>>>>
>>>>>>>>>> public interface Predicate<K, V> {
>>>>>>>>>>     // existing method
>>>>>>>>>>     boolean test(final K key, final V value);
>>>>>>>>>>
>>>>>>>>>>     // new default method adds the ability to name the
>> predicate
>>>>>>>>>>     default Described described() {
>>>>>>>>>>         return new Described(null);
>>>>>>>>>>     }
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> ----------
>>>>>>>>>>
>>>>>>>>>> where user's code becomes:
>>>>>>>>>>
>>>>>>>>>> stream.filter(named("key", (k, v) -> true));   // note `named`
>>> now
>>>>>> just
>>>>>>>>>> sets a Described("key") in "described()".
>>>>>>>>>>
>>>>>>>>>> stream.filter(described(Described.as("key", /* any other fancy
>>>>>>>>> parameters
>>>>>>>>>> in the future*/), (k, v) -> true));
>>>>>>>>>> ----------
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> I feel it is not much likely that we'd need to extend it
>> further
>>>> in
>>>>>> the
>>>>>>>>>> future, so just a `String` would be good enough. But just
>>> listing
>>>>>> all
>>>>>>>>>> possibilities here.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Guozhang
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, Jul 6, 2018 at 8:19 AM, John Roesler <
>> j...@confluent.io
>>>>
>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Florian,
>>>>>>>>>>>
>>>>>>>>>>> Sorry I'm late to the party, but I missed the message
>>> originally.
>>>>>>>>>>>
>>>>>>>>>>> Regarding the names, it's probably a good idea to stick to
>> the
>>>> same
>>>>>>>>>>> character set we're currently using: letters, numbers, and
>>>> hyphens.
>>>>>>> The
>>>>>>>>>>> names are used in Kafka topics, files and folders, and
>> RocksDB
>>>>>>>>> databases,
>>>>>>>>>>> and we also need them to work with the file systems of
>> Windows,
>>>>>> Linux,
>>>>>>>>>> and
>>>>>>>>>>> MacOS. My opinion is that with a situation like that, it's
>>> better
>>>>>> to
>>>>>>> be
>>>>>>>>>>> conservative. It might also be a good idea to impose an upper
>>>>>> limit on
>>>>>>>>>> name
>>>>>>>>>>> length to avoid running afoul of any of those systems.
>>>>>>>>>>>
>>>>>>>>>>> ---
>>>>>>>>>>>
>>>>>>>>>>> It seems like there's a small debate between 1) adding a new
>>>>>> method to
>>>>>>>>>>> KStream (and maybe KTable) to modify its name after the fact,
>>> or
>>>> 2)
>>>>>>>>>>> piggy-backing on the config objects where they exist and
>> adding
>>>> one
>>>>>>>>> where
>>>>>>>>>>> they don't. To me, #2 is the better alternative even though
>> it
>>>>>>> produces
>>>>>>>>>>> more overloads and may be a bit awkward in places.
>>>>>>>>>>>
>>>>>>>>>>> The reason is simply that #1 is a high-level departure from
>> the
>>>>>>>>>>> graph-building paradigm we're using in the DSL. Consider:
>>>>>>>>>>>
>>>>>>>>>>> Graph.node1(config).node2(config)
>>>>>>>>>>>
>>>>>>>>>>> vs
>>>>>>>>>>>
>>>>>>>>>>> Graph.node1().config().node2().config()
>>>>>>>>>>>
>>>>>>>>>>> We could have done either, but we picked the former. I think
>>> it's
>>>>>>>>>> probably
>>>>>>>>>>> a good goal to try and stick to it so that developers can
>>> develop
>>>>>> and
>>>>>>>>>> rely
>>>>>>>>>>> on their instincts for how the DSL will behave.
>>>>>>>>>>>
>>>>>>>>>>> I do want to present one alternative to adding new config
>>>> objects:
>>>>>> we
>>>>>>>>> can
>>>>>>>>>>> just add a "name()" method to all our "action" interfaces.
>> For
>>>>>>> example,
>>>>>>>>>>> I'll demonstrate how we can add a "name" to Predicate and
>> then
>>>> use
>>>>>> it
>>>>>>>>> to
>>>>>>>>>>> name a "KStream#filter" DSL operator:
>>>>>>>>>>>
>>>>>>>>>>> public interface Predicate<K, V> {
>>>>>>>>>>>     // existing method
>>>>>>>>>>>     boolean test(final K key, final V value);
>>>>>>>>>>>
>>>>>>>>>>>     // new default method adds the ability to name the
>>> predicate
>>>>>>>>>>>     default String name() {
>>>>>>>>>>>         return null;
>>>>>>>>>>>     }
>>>>>>>>>>>
>>>>>>>>>>>     // new static factory method adds the ability to wrap
>>> lambda
>>>>>>>>>> predicates
>>>>>>>>>>> with a named predicate
>>>>>>>>>>>     static <K, V> Predicate<K, V> named(final String name,
>>> final
>>>>>>>>>>> Predicate<K, V> predicate) {
>>>>>>>>>>>         return new Predicate<K, V>() {
>>>>>>>>>>>             @Override
>>>>>>>>>>>             public boolean test(final K key, final V value) {
>>>>>>>>>>>                 return predicate.test(key, value);
>>>>>>>>>>>             }
>>>>>>>>>>>
>>>>>>>>>>>             @Override
>>>>>>>>>>>             public String name() {
>>>>>>>>>>>                 return name;
>>>>>>>>>>>             }
>>>>>>>>>>>         };
>>>>>>>>>>>     }
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> Then, here's how it would look to use it:
>>>>>>>>>>>
>>>>>>>>>>> // Anonymous predicates continue to work just fine
>>>>>>>>>>> stream.filter((k, v) -> true);
>>>>>>>>>>>
>>>>>>>>>>> // Devs can swap in a Predicate that implements the name()
>>>> method.
>>>>>>>>>>> stream.filter(new Predicate<Object, Object>() {
>>>>>>>>>>>     @Override
>>>>>>>>>>>     public boolean test(final Object key, final Object
>> value) {
>>>>>>>>>>>         return true;
>>>>>>>>>>>     }
>>>>>>>>>>>
>>>>>>>>>>>     @Override
>>>>>>>>>>>     public String name() {
>>>>>>>>>>>         return "hey";
>>>>>>>>>>>     }
>>>>>>>>>>> });
>>>>>>>>>>>
>>>>>>>>>>> // Or they can wrap their existing lambda using the static
>>>> factory
>>>>>>>>> method
>>>>>>>>>>> stream.filter(named("key", (k, v) -> true));
>>>>>>>>>>>
>>>>>>>>>>> Just a thought.
>>>>>>>>>>>
>>>>>>>>>>> Overall, I think it's really valuable to be able to name the
>>>>>>>>> processors,
>>>>>>>>>>> for all the reasons you mentioned in the KIP. So thank you
>> for
>>>>>>>>>> introducing
>>>>>>>>>>> this!
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> -John
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Jul 5, 2018 at 4:53 PM Florian Hussonnois <
>>>>>>>>> fhussonn...@gmail.com
>>>>>>>>>>>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi, thank you very much for all you suggestions. I've
>> started
>>> to
>>>>>>>>> update
>>>>>>>>>>> the
>>>>>>>>>>>> KIP (
>>>>>>>>>>>>
>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>>>>>>>>>>
>> 307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
>>>>>>>>>>>> ).
>>>>>>>>>>>> Also, I propose to rename the Processed class into
>> Described -
>>>>>> this
>>>>>>>>>> will
>>>>>>>>>>> be
>>>>>>>>>>>> more meaningful (but this is just a detail).
>>>>>>>>>>>>
>>>>>>>>>>>> I'm OK to not enforcing uppercase for specific names but
>>> should
>>>> we
>>>>>>>>>> allow
>>>>>>>>>>>> arbitrary names with whitespaces for example ? Currently, I
>>>> can't
>>>>>>>>> tell
>>>>>>>>>> if
>>>>>>>>>>>> this can lead to some side effects ?
>>>>>>>>>>>>
>>>>>>>>>>>> Le lun. 11 juin 2018 à 01:31, Matthias J. Sax <
>>>>>> matth...@confluent.io
>>>>>>>>>>
>>>>>>>>>> a
>>>>>>>>>>>> écrit :
>>>>>>>>>>>>
>>>>>>>>>>>>> Just catching up on this thread.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I like the general idea. Couple of comments:
>>>>>>>>>>>>>
>>>>>>>>>>>>>  - I think that adding `Processed` (or maybe a different
>>> name?)
>>>>>> is
>>>>>>>>> a
>>>>>>>>>>>>> valid proposal for stateless operators that only have a
>>> single
>>>>>>>>>> overload
>>>>>>>>>>>>> atm. It would align with the overall API design.
>>>>>>>>>>>>>
>>>>>>>>>>>>>  - for all methods with multiple existing overloads, we can
>>>>>>>>> consider
>>>>>>>>>> to
>>>>>>>>>>>>> extend `Consumed`, `Produced`, `Materialized` etc to take
>> an
>>>>>>>>>> additional
>>>>>>>>>>>>> processor name (not sure atm how elegant this is; we would
>>> need
>>>>>> to
>>>>>>>>>>>>> "play" with the API a little bit; the advantage would be,
>>> that
>>>> we
>>>>>>>>> do
>>>>>>>>>>> not
>>>>>>>>>>>>> add more overloads what seems to be key for this KIP)
>>>>>>>>>>>>>
>>>>>>>>>>>>>  - operators return void: while I agree that the "name
>> first"
>>>>>>>>>> chaining
>>>>>>>>>>>>> idea is not very intuitive, it might still work, if we name
>>> the
>>>>>>>>>> method
>>>>>>>>>>>>> correctly (again, we would need to "play" with the API a
>>> little
>>>>>> bit
>>>>>>>>>> to
>>>>>>>>>>>> see)
>>>>>>>>>>>>>
>>>>>>>>>>>>>  - for DSL operators that are translated to multiple nodes:
>>> it
>>>>>>>>> might
>>>>>>>>>>>>> make sense to use the specified operator name as prefix and
>>> add
>>>>>>>>>>>>> reasonable suffixes. For example, a join translates into 5
>>>>>>>>> operators
>>>>>>>>>>>>> that could be name "name-left-store-processor",
>>>>>>>>>>>>> "name-left-join-processor", "name-right-store-processor",
>>>>>>>>>>>>> "name-right-join-processor", and
>> "name-join-merge-processor"
>>>> (or
>>>>>>>>>>>>> similar). Maybe just using numbers might also work.
>>>>>>>>>>>>>
>>>>>>>>>>>>>  - I think, we should strip the number suffixes if a user
>>>>>> provides
>>>>>>>>>>> names
>>>>>>>>>>>>>
>>>>>>>>>>>>>  - enforcing upper case seems to be tricky: for example, we
>>> do
>>>>>> not
>>>>>>>>>>>>> enforce upper case for store names and we cannot easily
>>> change
>>>> it
>>>>>>>>> as
>>>>>>>>>> it
>>>>>>>>>>>>> would break compatibility -- thus, for consistency reasons
>> we
>>>>>> might
>>>>>>>>>> not
>>>>>>>>>>>>> want to do this
>>>>>>>>>>>>>
>>>>>>>>>>>>>  - for better understand of the impact of the KIP, it would
>>> be
>>>>>>>>> quite
>>>>>>>>>>>>> helpful if you would list all method names that are
>> affected
>>> in
>>>>>> the
>>>>>>>>>> KIP
>>>>>>>>>>>>> (ie, list all newly added overloads)
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 5/31/18 6:40 PM, Guozhang Wang wrote:
>>>>>>>>>>>>>> Hi Florian,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Re 1: I think changing the KStreamImpl / KTableImpl to
>> allow
>>>>>>>>>>> modifying
>>>>>>>>>>>>> the
>>>>>>>>>>>>>> processor name after the operator is fine as long as we do
>>> the
>>>>>>>>>> check
>>>>>>>>>>>>> again
>>>>>>>>>>>>>> when modifying that. In fact, we are having some topology
>>>>>>>>>>> optimization
>>>>>>>>>>>>>> going on which may modify processor names in the final
>>>> topology
>>>>>>>>>>>> anyways (
>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/4983). Semantically
>> I
>>>>>> think
>>>>>>>>>> it
>>>>>>>>>>> is
>>>>>>>>>>>>>> easier to understand to developers than "deciding the
>>>> processor
>>>>>>>>>> name
>>>>>>>>>>>> for
>>>>>>>>>>>>>> the next operator".
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Re 2: Yeah I'm thinking that for operators that translates
>>> to
>>>>>>>>>>> multiple
>>>>>>>>>>>>>> processor names, we can still use the provided "hint" to
>>> name
>>>>>> the
>>>>>>>>>>>>> processor
>>>>>>>>>>>>>> names, e.g. for Joins we can name them as `join-foo-this`
>>> and
>>>>>>>>>>>>>> `join-foo-that` etc if user calls `as("foo")`.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Re 3: The motivation I had about removing the suffix is
>> that
>>>> it
>>>>>>>>> has
>>>>>>>>>>>> huge
>>>>>>>>>>>>>> restrictions on topology compatibilities: consider if user
>>>> code
>>>>>>>>>>> added a
>>>>>>>>>>>>> new
>>>>>>>>>>>>>> operator, or library does some optimization to remove some
>>>>>>>>>> operators,
>>>>>>>>>>>> the
>>>>>>>>>>>>>> suffix indexing may be changed for a large amount of the
>>>>>>>>> processor
>>>>>>>>>>>> names:
>>>>>>>>>>>>>> this will in turn change the internal state store names,
>> as
>>>> well
>>>>>>>>> as
>>>>>>>>>>>>>> internal topic names as well, making the new application
>>>>>> topology
>>>>>>>>>> to
>>>>>>>>>>> be
>>>>>>>>>>>>>> incompatible with the ones. One rationale I had about this
>>> KIP
>>>>>> is
>>>>>>>>>>> that
>>>>>>>>>>>>>> aligned this effort, moving forward we can allow users to
>>>>>>>>> customize
>>>>>>>>>>>>>> internal names so that they can still be reused even with
>>>>>>>>> topology
>>>>>>>>>>>>> changes
>>>>>>>>>>>>>> (e.g. KIP-230), so I think removing the suffix index would
>>> be
>>>>>>>>> more
>>>>>>>>>>>>>> applicable in the long run.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, May 31, 2018 at 3:08 PM, Florian Hussonnois <
>>>>>>>>>>>>> fhussonn...@gmail.com>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi ,
>>>>>>>>>>>>>>> Thank you very much for your feedback.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 1/
>>>>>>>>>>>>>>> I agree that overloading most of the methods with a
>>> Processed
>>>>>> is
>>>>>>>>>> not
>>>>>>>>>>>>> ideal.
>>>>>>>>>>>>>>> I've started modifying the KStream API and I got to the
>>> same
>>>>>>>>>>>> conclusion.
>>>>>>>>>>>>>>> Also ading a new method directly to KStreamImpl and
>>>> KTableImpl
>>>>>>>>>>> classes
>>>>>>>>>>>>>>> seems to be a better option.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> However a processor name cannot be redefined after
>> calling
>>> an
>>>>>>>>>>> operator
>>>>>>>>>>>>> (or
>>>>>>>>>>>>>>> maybe I miss something in the code).
>>>>>>>>>>>>>>> From my understanding, this will only set the KStream
>> name
>>>>>>>>>> property
>>>>>>>>>>>> not
>>>>>>>>>>>>> the
>>>>>>>>>>>>>>> processor name previsouly added to the topology builder -
>>>>>>>>> leading
>>>>>>>>>> to
>>>>>>>>>>>>>>> InvalidTopology exception.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> So the new method should actually defines the name of the
>>>> next
>>>>>>>>>>>>> processor :
>>>>>>>>>>>>>>> Below is an example :
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> *stream.as <http://stream.as
>>>>>>>>>>> (Processed.name("MAPPE_TO_UPPERCASE")*
>>>>>>>>>>>>>>> *          .map( (k, v) -> KeyValue.pair(k,
>>>> v.toUpperCase()))*
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I think this approach could solve the cases for methods
>>>>>>>>> returning
>>>>>>>>>>>> void ?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Regarding this new method we have two possible
>>>> implementations
>>>>>> :
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>    1. Adding a method like : withName(String
>> processorName)
>>>>>>>>>>>>>>>    2. or adding a method accepting an Processed object :
>>>>>>>>>>>> as(Processed).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I think solution 2. is preferable as the Processed class
>>>> could
>>>>>>>>> be
>>>>>>>>>>>>> enriched
>>>>>>>>>>>>>>> further (in futur).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 2/
>>>>>>>>>>>>>>> As Guozhang said some operators add internal processors.
>>>>>>>>>>>>>>> For example the branch() method create one KStreamBranch
>>>>>>>>> processor
>>>>>>>>>>> to
>>>>>>>>>>>>> route
>>>>>>>>>>>>>>> records and one KStreamPassThrough processor for each
>>> branch.
>>>>>>>>>>>>>>> In that situation only the parent processor can be named.
>>> For
>>>>>>>>>>> children
>>>>>>>>>>>>>>> processors we could keep the current behaviour that add a
>>>>>> suffix
>>>>>>>>>>> (i.e
>>>>>>>>>>>>>>> KSTREAM-BRANCHCHILD-)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> This also the case for the join() method that result to
>>>> adding
>>>>>>>>>>>> multiple
>>>>>>>>>>>>>>> processors to the topology (windowing, left/right joins
>>> and a
>>>>>>>>>> merge
>>>>>>>>>>>>>>> processor).
>>>>>>>>>>>>>>> I think, like for the branch method users could only
>>> define a
>>>>>>>>>>>> processor
>>>>>>>>>>>>>>> name prefix.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 3/
>>>>>>>>>>>>>>> I think we should  still added a suffix like
>> "-0000000000"
>>> to
>>>>>>>>>>>> processor
>>>>>>>>>>>>>>> name and enforce uppercases as this will keep some
>>>> consistency
>>>>>>>>>> with
>>>>>>>>>>>> the
>>>>>>>>>>>>>>> ones generated by the API.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 4/
>>>>>>>>>>>>>>> Yes, the KTable interface should be modified like KStream
>>> to
>>>>>>>>> allow
>>>>>>>>>>>>> custom
>>>>>>>>>>>>>>> processor names definition.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Le jeu. 31 mai 2018 à 19:18, Damian Guy <
>>>> damian....@gmail.com>
>>>>>>>>> a
>>>>>>>>>>>> écrit
>>>>>>>>>>>>> :
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Florian,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks for the KIP. What about KTable and other DSL
>>>>>> interfaces?
>>>>>>>>>>> Will
>>>>>>>>>>>>> they
>>>>>>>>>>>>>>>> not want to be able to do the same thing?
>>>>>>>>>>>>>>>> It would be good to see a complete set of the public API
>>>>>>>>> changes.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>> Damian
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Wed, 30 May 2018 at 19:45 Guozhang Wang <
>>>>>> wangg...@gmail.com
>>>>>>>>>>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hello Florian,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks for the KIP. I have some meta feedbacks on the
>>>>>>>>> proposal:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 1. You mentioned that this `Processed` object will be
>>> added
>>>>>>>>> to a
>>>>>>>>>>> new
>>>>>>>>>>>>>>>>> overloaded variant of all the stateless operators, what
>>>> about
>>>>>>>>>> the
>>>>>>>>>>>>>>>> stateful
>>>>>>>>>>>>>>>>> operators? Would like to hear your opinions if you have
>>>>>>>>> thought
>>>>>>>>>>>> about
>>>>>>>>>>>>>>>> that:
>>>>>>>>>>>>>>>>> note for stateful operators they will usually be mapped
>>> to
>>>>>>>>>>> multiple
>>>>>>>>>>>>>>>>> processor node names, so we probably need to come up
>> with
>>>>>> some
>>>>>>>>>>> ways
>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>> define all their names.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 2. I share the same concern with Bill as for adding
>> lots
>>> of
>>>>>>>>> new
>>>>>>>>>>>>>>> overload
>>>>>>>>>>>>>>>>> functions into the stateless operators, as we have just
>>>> spent
>>>>>>>>>>> quite
>>>>>>>>>>>>>>> some
>>>>>>>>>>>>>>>>> effort in trimming them since 1.0.0 release. If the
>> goal
>>> is
>>>>>> to
>>>>>>>>>>> just
>>>>>>>>>>>>>>>> provide
>>>>>>>>>>>>>>>>> some "hints" on the generated processor node names, not
>>>>>>>>> strictly
>>>>>>>>>>>>>>>> enforcing
>>>>>>>>>>>>>>>>> the exact names that to be generated, then how about we
>>>> just
>>>>>>>>>> add a
>>>>>>>>>>>> new
>>>>>>>>>>>>>>>>> function to `KStream` and `KTable` classes like:
>>>>>>>>>> "as(Processed)",
>>>>>>>>>>>> with
>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> semantics as "the latest operators that generate this
>>>> KStream
>>>>>>>>> /
>>>>>>>>>>>> KTable
>>>>>>>>>>>>>>>> will
>>>>>>>>>>>>>>>>> be named accordingly to this hint".
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The only caveat, is that for all operators like
>>>> `KStream#to`
>>>>>>>>> and
>>>>>>>>>>>>>>>>> `KStream#print` that returns void, this alternative
>> would
>>>> not
>>>>>>>>>>> work.
>>>>>>>>>>>>> But
>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>> the current operators:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> a. KStream#print,
>>>>>>>>>>>>>>>>> b. KStream#foreach,
>>>>>>>>>>>>>>>>> c. KStream#to,
>>>>>>>>>>>>>>>>> d. KStream#process
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I personally felt that except `KStream#process` users
>>> would
>>>>>>>>> not
>>>>>>>>>>>>> usually
>>>>>>>>>>>>>>>>> bother to override their names, and for
>> `KStream#process`
>>>> we
>>>>>>>>>> could
>>>>>>>>>>>> add
>>>>>>>>>>>>>>> an
>>>>>>>>>>>>>>>>> overload variant with the additional Processed object.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 3. In your example, the processor names are still added
>>>> with
>>>>>> a
>>>>>>>>>>>> suffix
>>>>>>>>>>>>>>>> like
>>>>>>>>>>>>>>>>> "
>>>>>>>>>>>>>>>>> -0000000000", is this intentional? If yes, why (I
>> thought
>>>>>> with
>>>>>>>>>>> user
>>>>>>>>>>>>>>>>> specified processor name hints we will not add suffix
>> to
>>>>>>>>>>> distinguish
>>>>>>>>>>>>>>>>> different nodes of the same type any more)?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Tue, May 29, 2018 at 6:47 AM, Bill Bejeck <
>>>>>>>>> bbej...@gmail.com
>>>>>>>>>>>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi Florian,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks for the KIP.  I think being able to add more
>>>> context
>>>>>>>>> to
>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>> processor names would be useful.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I like the idea of adding a "withProcessorName" to
>>>> Produced,
>>>>>>>>>>>> Consumed
>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>> Joined.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> But instead of adding the "Processed" parameter to a
>>> large
>>>>>>>>>>>> percentage
>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>> the methods, which would result in overloaded methods
>>>> (which
>>>>>>>>> we
>>>>>>>>>>>>>>> removed
>>>>>>>>>>>>>>>>>> quite a bit with KIP-182) what do you think of adding
>> a
>>>>>>>>> method
>>>>>>>>>>>>>>>>>> to the AbstractStream class "withName(String
>>>>>> processorName)"?
>>>>>>>>>> BTW
>>>>>>>>>>>> I"m
>>>>>>>>>>>>>>>> not
>>>>>>>>>>>>>>>>>> married to the method name, it's the best I can do off
>>> the
>>>>>>>>> top
>>>>>>>>>> of
>>>>>>>>>>>> my
>>>>>>>>>>>>>>>>> head.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> For the methods that return void, we'd have to add a
>>>>>>>>> parameter,
>>>>>>>>>>> but
>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>> would at least cut down on the number of overloaded
>>>> methods
>>>>>>>>> in
>>>>>>>>>>> the
>>>>>>>>>>>>>>> API.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Just my 2 cents.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>> Bill
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Sun, May 27, 2018 at 4:13 PM, Florian Hussonnois <
>>>>>>>>>>>>>>>>> fhussonn...@gmail.com
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I would like to start a new discussion on following
>>> KIP :
>>>>>>>>>>>>>>>>>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>
>> 307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> This is still a draft.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Looking forward for your feedback.
>>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>>> Florian HUSSONNOIS
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>> -- Guozhang
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>> Florian HUSSONNOIS
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Florian HUSSONNOIS
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> -- Guozhang
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Florian HUSSONNOIS
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Florian HUSSONNOIS
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> -- Guozhang
>>>>>
>>>>
>>>>
>>>> --
>>>> -- Guozhang
>>>>
>>>
>>
>>
>> --
>> -- Guozhang
>>
> 
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to