What is the status of this KIP?

-Matthias

On 7/19/18 5:17 PM, Guozhang Wang wrote:
> Hello Florian,
> 
> Sorry for being late... I keep finding myself apologizing for late replies
> these days. But I do want to push this KIP forward, as I see it as a very
> important and helpful feature for extensibility.
> 
> About the exceptions, I've gone through them and hopefully it is an
> exhaustive list:
> 
> 1. KTable#toStream()
> 2. KStream#merge(KStream)
> 3. KStream#process() / transform() / transformValues()
> 4. KGroupedTable / KGroupedStream#count()
> 
> 
> Here's my reasoning:
> 
> * It is okay not to let users override the names for 1 and 2, since they are
> too trivial to be useful for debugging, plus their processor names would
> not determine any related topic / store names.
> * For 3, I'd vote for adding overloaded functions with Named.
> * For 4, if users really want to name the processor they can call
> aggregate() instead, so I think it is okay to skip this case.
> 
> 
> Guozhang
> 
> 
> 
> On Fri, Jul 6, 2018 at 3:06 PM, Florian Hussonnois <fhussonn...@gmail.com>
> wrote:
> 
>> Hi,
>>
>> The option #3 seems to be a good alternative and I find the API more
>> elegant (thanks John).
>>
>> But we still need to overload some methods, either because they do not
>> accept an action instance or because they are translated to multiple
>> processors.
>>
>> For example, this is the case for the methods branch() and merge(). We could
>> introduce a new interface Named (or maybe a different name?) with a method
>> name(). All action interfaces could extend this one to implement option 3).
>> This would result in the following overloads:
>>
>> KStream<K, V> merge(final Named name, final KStream<K, V> stream);
>> KStream<K, V>[] branch(final Named name, final Predicate<? super K, ? super V>... predicates);
>>
>> N.B.: The list above is not exhaustive.
>>
>> ---------
>> user's code will become:
>>
>>         KStream<String, Integer> stream = builder.stream("test");
>>         KStream<String, Integer>[] branches = stream.branch(
>>                 Named.with("BRANCH-STREAM-ON-VALUE"),
>>                 Predicate.named("STREAM-PAIR-VALUE", (k, v) -> v % 2 == 0),
>>                 Predicate.named("STREAM-IMPAIR-VALUE", (k, v) -> v % 2 != 0));
>>
>>         branches[0].to("pair");
>>         branches[1].to("impair");
>> ---------
>>
>> This is a mix of the options 3) and 1)
>>
>> On Fri, Jul 6, 2018 at 22:58, Guozhang Wang <wangg...@gmail.com> wrote:
>>
>>> Hi folks, just to summarize the options we have so far:
>>>
>>> 1) Add a new "as" for KTable / KStream, plus adding new fields for
>>> operators-returns-void control objects (the current wiki's proposal).
>>>
>>> Pros: no more overloads.
>>> Cons: departs a bit from the current high-level API design of the DSL,
>>> plus the inconsistency between operators that return void and operators
>>> that do not.
>>>
>>> 2) Add overloaded functions for all operators that accept a new control
>>> object "Described".
>>>
>>> Pros: consistent with current APIs.
>>> Cons: lots of overloaded functions to add.
>>>
>>> 3) Add another default function in the interface (thank you J8!) as John
>>> proposed.
>>>
>>> Pros: no overloaded functions, no "Described".
>>> Cons: do we really lose lambda functions (it seems not, if we provide a
>>> "named" wrapper for each function)? Plus, "Described" may be more extensible
>>> than a single `String`.
>>>
>>>
>>> My principle for deciding which one is better depends primarily on "how
>>> to make advanced users easily use the additional API, while keeping it
>>> hidden from normal users who do not care at all". For that purpose I think
>>> 3) > 1) > 2).
>>>
>>> One caveat, though: changing the interface would be source-compatible but
>>> not binary-compatible, right? I.e., users would need to recompile their
>>> code even though no changes are needed.
>>>
>>>
>>>
>>> Another note: for 3), if we really want to keep the extensibility of
>>> Described, we could do something like:
>>>
>>> ---------
>>>
>>> public interface Predicate<K, V> {
>>>     // existing method
>>>     boolean test(final K key, final V value);
>>>
>>>     // new default method adds the ability to name the predicate
>>>     default Described described() {
>>>         return new Described(null);
>>>     }
>>> }
>>>
>>> ----------
>>>
>>> where user's code becomes:
>>>
>>> // note: `named` now just sets a Described("key") in "described()"
>>> stream.filter(named("key", (k, v) -> true));
>>>
>>> stream.filter(described(Described.as("key", /* any other fancy parameters in the future */), (k, v) -> true));
>>> ----------
>>>
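
[Inline note] The `named` and `described` helpers used in the snippet above are not defined anywhere in this thread, so here is a minimal sketch of how they could look. Everything in it is a hypothetical stand-in built from the snippets quoted here: the `Described` class, its `as` factory, and both static helpers are assumptions for illustration, not existing Kafka Streams API.

```java
// Hypothetical stand-ins for the types sketched in this thread; none of
// this is existing Kafka Streams API.
final class Described {
    private final String name;

    private Described(final String name) {
        this.name = name;
    }

    static Described as(final String name) {
        return new Described(name);
    }

    String name() {
        return name;
    }
}

interface Predicate<K, V> {
    // existing method
    boolean test(K key, V value);

    // Default keeps plain lambdas valid; they are simply left unnamed.
    default Described described() {
        return Described.as(null);
    }

    // Wraps an existing predicate (typically a lambda) with a description.
    static <K, V> Predicate<K, V> described(final Described described,
                                            final Predicate<K, V> predicate) {
        return new Predicate<K, V>() {
            @Override
            public boolean test(final K key, final V value) {
                return predicate.test(key, value);
            }

            @Override
            public Described described() {
                return described;
            }
        };
    }

    // Convenience shortcut for the common name-only case.
    static <K, V> Predicate<K, V> named(final String name,
                                        final Predicate<K, V> predicate) {
        return described(Described.as(name), predicate);
    }
}

final class DescribedDemo {
    public static void main(final String[] args) {
        final Predicate<String, Integer> plain = (k, v) -> v > 0;
        final Predicate<String, Integer> named = Predicate.named("positive-only", plain);
        System.out.println(plain.described().name());
        System.out.println(named.described().name());
    }
}
```

The interface keeps a single abstract method, so lambdas still compile unchanged; only callers who care about naming need the wrappers.
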
>>>
>>> I feel it is unlikely that we'd need to extend it further in the
>>> future, so just a `String` would be good enough. I'm just listing all the
>>> possibilities here.
>>>
>>>
>>>
>>> Guozhang
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Fri, Jul 6, 2018 at 8:19 AM, John Roesler <j...@confluent.io> wrote:
>>>
>>>> Hi Florian,
>>>>
>>>> Sorry I'm late to the party, but I missed the message originally.
>>>>
>>>> Regarding the names, it's probably a good idea to stick to the same
>>>> character set we're currently using: letters, numbers, and hyphens. The
>>>> names are used in Kafka topics, files and folders, and RocksDB databases,
>>>> and we also need them to work with the file systems of Windows, Linux, and
>>>> MacOS. My opinion is that with a situation like that, it's better to be
>>>> conservative. It might also be a good idea to impose an upper limit on name
>>>> length to avoid running afoul of any of those systems.
>>>>
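
[Inline note] The naming rule suggested above (letters, numbers, and hyphens, plus an upper length limit) could be checked with something like the sketch below. The `ProcessorNames` class is a hypothetical helper, not part of Kafka Streams, and the limit of 249 characters is an assumption borrowed from Kafka's topic-name limit; the thread itself does not name a number.

```java
import java.util.regex.Pattern;

// Hypothetical helper, not part of Kafka Streams: checks a user-supplied
// processor name against the conservative rules suggested in this thread.
final class ProcessorNames {
    // Assumption: 249 mirrors Kafka's topic-name length limit; the thread
    // only asks for "an upper limit" without giving a value.
    static final int MAX_LENGTH = 249;

    // Letters, digits, and hyphens only; at least one character.
    private static final Pattern LEGAL = Pattern.compile("[A-Za-z0-9-]+");

    private ProcessorNames() { }

    static boolean isValid(final String name) {
        return name != null
            && name.length() <= MAX_LENGTH
            && LEGAL.matcher(name).matches();
    }

    public static void main(final String[] args) {
        System.out.println(isValid("filter-even-values"));
        System.out.println(isValid("has whitespace"));
    }
}
```

Rejecting whitespace and punctuation up front keeps the same name usable as part of a topic name, a directory name, and a RocksDB store name.
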
>>>> ---
>>>>
>>>> It seems like there's a small debate between 1) adding a new method to
>>>> KStream (and maybe KTable) to modify its name after the fact, or 2)
>>>> piggy-backing on the config objects where they exist and adding one where
>>>> they don't. To me, #2 is the better alternative even though it produces
>>>> more overloads and may be a bit awkward in places.
>>>>
>>>> The reason is simply that #1 is a high-level departure from the
>>>> graph-building paradigm we're using in the DSL. Consider:
>>>>
>>>> Graph.node1(config).node2(config)
>>>>
>>>> vs
>>>>
>>>> Graph.node1().config().node2().config()
>>>>
>>>> We could have done either, but we picked the former. I think it's probably
>>>> a good goal to try and stick to it so that developers can develop and rely
>>>> on their instincts for how the DSL will behave.
>>>>
>>>> I do want to present one alternative to adding new config objects: we can
>>>> just add a "name()" method to all our "action" interfaces. For example,
>>>> I'll demonstrate how we can add a "name" to Predicate and then use it to
>>>> name a "KStream#filter" DSL operator:
>>>>
>>>> public interface Predicate<K, V> {
>>>>     // existing method
>>>>     boolean test(final K key, final V value);
>>>>
>>>>     // new default method adds the ability to name the predicate
>>>>     default String name() {
>>>>         return null;
>>>>     }
>>>>
>>>>     // new static factory method adds the ability to wrap lambda predicates
>>>>     // with a named predicate
>>>>     static <K, V> Predicate<K, V> named(final String name, final Predicate<K, V> predicate) {
>>>>         return new Predicate<K, V>() {
>>>>             @Override
>>>>             public boolean test(final K key, final V value) {
>>>>                 return predicate.test(key, value);
>>>>             }
>>>>
>>>>             @Override
>>>>             public String name() {
>>>>                 return name;
>>>>             }
>>>>         };
>>>>     }
>>>> }
>>>>
>>>> Then, here's how it would look to use it:
>>>>
>>>> // Anonymous predicates continue to work just fine
>>>> stream.filter((k, v) -> true);
>>>>
>>>> // Devs can swap in a Predicate that implements the name() method.
>>>> stream.filter(new Predicate<Object, Object>() {
>>>>     @Override
>>>>     public boolean test(final Object key, final Object value) {
>>>>         return true;
>>>>     }
>>>>
>>>>     @Override
>>>>     public String name() {
>>>>         return "hey";
>>>>     }
>>>> });
>>>>
>>>> // Or they can wrap their existing lambda using the static factory method
>>>> stream.filter(named("key", (k, v) -> true));
>>>>
>>>> Just a thought.
>>>>
>>>> Overall, I think it's really valuable to be able to name the processors,
>>>> for all the reasons you mentioned in the KIP. So thank you for introducing
>>>> this!
>>>>
>>>> Thanks,
>>>> -John
>>>>
>>>> On Thu, Jul 5, 2018 at 4:53 PM Florian Hussonnois <fhussonn...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi, thank you very much for all your suggestions. I've started to update
>>>>> the KIP (
>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
>>>>> ).
>>>>> Also, I propose renaming the Processed class to Described - this will be
>>>>> more meaningful (but this is just a detail).
>>>>>
>>>>> I'm OK with not enforcing uppercase for specific names, but should we allow
>>>>> arbitrary names, with whitespace for example? Currently, I can't tell
>>>>> whether this could lead to side effects.
>>>>>
>>>>> On Mon, Jun 11, 2018 at 01:31, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>
>>>>>> Just catching up on this thread.
>>>>>>
>>>>>> I like the general idea. Couple of comments:
>>>>>>
>>>>>>  - I think that adding `Processed` (or maybe a different name?) is a
>>>>>> valid proposal for stateless operators that only have a single overload
>>>>>> atm. It would align with the overall API design.
>>>>>>
>>>>>>  - for all methods with multiple existing overloads, we can consider
>>>>>> extending `Consumed`, `Produced`, `Materialized` etc. to take an additional
>>>>>> processor name (not sure atm how elegant this is; we would need to
>>>>>> "play" with the API a little bit; the advantage would be that we do not
>>>>>> add more overloads, which seems to be key for this KIP)
>>>>>>
>>>>>>  - operators that return void: while I agree that the "name first" chaining
>>>>>> idea is not very intuitive, it might still work if we name the method
>>>>>> correctly (again, we would need to "play" with the API a little bit to see)
>>>>>>
>>>>>>  - for DSL operators that are translated to multiple nodes: it might
>>>>>> make sense to use the specified operator name as a prefix and add
>>>>>> reasonable suffixes. For example, a join translates into 5 operators
>>>>>> that could be named "name-left-store-processor",
>>>>>> "name-left-join-processor", "name-right-store-processor",
>>>>>> "name-right-join-processor", and "name-join-merge-processor" (or
>>>>>> similar). Maybe just using numbers might also work.
>>>>>>
>>>>>>  - I think we should strip the number suffixes if a user provides names
>>>>>>
>>>>>>  - enforcing upper case seems to be tricky: for example, we do not
>>>>>> enforce upper case for store names and we cannot easily change it as it
>>>>>> would break compatibility -- thus, for consistency reasons we might not
>>>>>> want to do this
>>>>>>
>>>>>>  - to better understand the impact of the KIP, it would be quite
>>>>>> helpful if you would list all method names that are affected in the KIP
>>>>>> (i.e., list all newly added overloads)
>>>>>>
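
[Inline note] The prefix-plus-suffix idea for operators that expand into several processors could look roughly like the sketch below for a join. `JoinNames` is a hypothetical helper made up for this note; the five suffixes are taken from the list suggested above, which was itself only floated as "or similar".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: expands one user-supplied name into the five
// processor names of a join, using the suffixes floated in this thread.
final class JoinNames {
    private static final List<String> SUFFIXES = Arrays.asList(
        "left-store-processor",
        "left-join-processor",
        "right-store-processor",
        "right-join-processor",
        "join-merge-processor");

    private JoinNames() { }

    static List<String> expand(final String prefix) {
        final List<String> names = new ArrayList<>();
        for (final String suffix : SUFFIXES) {
            names.add(prefix + "-" + suffix);
        }
        return names;
    }

    public static void main(final String[] args) {
        // One user-facing name yields one stable name per internal processor.
        for (final String name : expand("orders-payments")) {
            System.out.println(name);
        }
    }
}
```

Because the suffixes are fixed strings rather than counters, the derived names do not change when unrelated operators are added elsewhere in the topology.
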
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>>
>>>>>> On 5/31/18 6:40 PM, Guozhang Wang wrote:
>>>>>>> Hi Florian,
>>>>>>>
>>>>>>> Re 1: I think changing the KStreamImpl / KTableImpl to allow modifying the
>>>>>>> processor name after the operator is fine as long as we do the check again
>>>>>>> when modifying that. In fact, we have some topology optimization work
>>>>>>> going on which may modify processor names in the final topology anyway (
>>>>>>> https://github.com/apache/kafka/pull/4983). Semantically I think it is
>>>>>>> easier for developers to understand than "deciding the processor name for
>>>>>>> the next operator".
>>>>>>>
>>>>>>> Re 2: Yeah, I'm thinking that for operators that translate to multiple
>>>>>>> processor names, we can still use the provided "hint" to name the processors,
>>>>>>> e.g. for joins we can name them `join-foo-this` and
>>>>>>> `join-foo-that` etc. if the user calls `as("foo")`.
>>>>>>>
>>>>>>> Re 3: The motivation I had for removing the suffix is that it imposes huge
>>>>>>> restrictions on topology compatibility: if user code adds a new
>>>>>>> operator, or the library does some optimization that removes some operators,
>>>>>>> the suffix indexing may change for a large number of the processor names.
>>>>>>> This will in turn change the internal state store names, as well as the
>>>>>>> internal topic names, making the new application topology
>>>>>>> incompatible with the old ones. One rationale I had for this KIP is that,
>>>>>>> aligned with this effort, moving forward we can allow users to customize
>>>>>>> internal names so that they can still be reused even with topology changes
>>>>>>> (e.g. KIP-230), so I think removing the suffix index would be more
>>>>>>> applicable in the long run.
>>>>>>>
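
[Inline note] The compatibility problem with index suffixes described above can be reproduced with a toy model. The `IndexedNames` class below is a simplified assumption for illustration (one shared counter, zero-padded to ten digits as in the "-0000000000" suffix mentioned in this thread); it is not the actual Kafka Streams builder code.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of index-based naming (an assumption for illustration,
// not the actual Kafka Streams builder): every operator takes the next
// value of one shared counter, zero-padded to ten digits.
final class IndexedNames {
    private int index = 0;

    String next(final String prefix) {
        return prefix + String.format("%010d", index++);
    }

    // Names a whole topology, one operator prefix at a time.
    static List<String> nameAll(final String... prefixes) {
        final IndexedNames names = new IndexedNames();
        final List<String> result = new ArrayList<>();
        for (final String prefix : prefixes) {
            result.add(names.next(prefix));
        }
        return result;
    }

    public static void main(final String[] args) {
        // Original topology: source -> aggregate.
        System.out.println(nameAll("KSTREAM-SOURCE-", "KSTREAM-AGGREGATE-"));
        // Same topology with a filter inserted before the aggregate:
        // the aggregate's index, and hence its name, shifts by one.
        System.out.println(nameAll("KSTREAM-SOURCE-", "KSTREAM-FILTER-", "KSTREAM-AGGREGATE-"));
    }
}
```

In this model the aggregate is named `KSTREAM-AGGREGATE-0000000001` before the change and `KSTREAM-AGGREGATE-0000000002` after it, so any store or topic name derived from it would no longer match.
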
>>>>>>>
>>>>>>>
>>>>>>> Guozhang
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Thu, May 31, 2018 at 3:08 PM, Florian Hussonnois <fhussonn...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi ,
>>>>>>>> Thank you very much for your feedback.
>>>>>>>>
>>>>>>>> 1/
>>>>>>>> I agree that overloading most of the methods with a Processed is not ideal.
>>>>>>>> I've started modifying the KStream API and I came to the same conclusion.
>>>>>>>> Also, adding a new method directly to the KStreamImpl and KTableImpl classes
>>>>>>>> seems to be a better option.
>>>>>>>>
>>>>>>>> However, a processor name cannot be redefined after calling an operator (or
>>>>>>>> maybe I'm missing something in the code).
>>>>>>>> From my understanding, this will only set the KStream name property, not the
>>>>>>>> processor name previously added to the topology builder, leading to an
>>>>>>>> InvalidTopology exception.
>>>>>>>>
>>>>>>>> So the new method should actually define the name of the next processor.
>>>>>>>> Below is an example:
>>>>>>>>
>>>>>>>> stream.as(Processed.name("MAPPE_TO_UPPERCASE"))
>>>>>>>>       .map((k, v) -> KeyValue.pair(k, v.toUpperCase()))
>>>>>>>>
>>>>>>>> I think this approach could solve the cases for methods returning void?
>>>>>>>>
>>>>>>>> Regarding this new method, we have two possible implementations:
>>>>>>>>
>>>>>>>>    1. Adding a method like: withName(String processorName)
>>>>>>>>    2. or adding a method accepting a Processed object: as(Processed).
>>>>>>>>
>>>>>>>> I think solution 2 is preferable, as the Processed class could be enriched
>>>>>>>> further (in the future).
>>>>>>>>
>>>>>>>> 2/
>>>>>>>> As Guozhang said, some operators add internal processors.
>>>>>>>> For example, the branch() method creates one KStreamBranch processor to route
>>>>>>>> records and one KStreamPassThrough processor for each branch.
>>>>>>>> In that situation only the parent processor can be named. For child
>>>>>>>> processors we could keep the current behaviour, which adds a suffix (i.e.
>>>>>>>> KSTREAM-BRANCHCHILD-).
>>>>>>>>
>>>>>>>> This is also the case for the join() method, which results in adding multiple
>>>>>>>> processors to the topology (windowing, left/right joins and a merge
>>>>>>>> processor).
>>>>>>>> I think that, as for the branch method, users could only define a processor
>>>>>>>> name prefix.
>>>>>>>>
>>>>>>>> 3/
>>>>>>>> I think we should still add a suffix like "-0000000000" to the processor
>>>>>>>> name and enforce uppercase, as this will keep some consistency with the
>>>>>>>> ones generated by the API.
>>>>>>>>
>>>>>>>> 4/
>>>>>>>> Yes, the KTable interface should be modified like KStream to allow custom
>>>>>>>> processor name definitions.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, May 31, 2018 at 19:18, Damian Guy <damian....@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Florian,
>>>>>>>>>
>>>>>>>>> Thanks for the KIP. What about KTable and other DSL interfaces? Will they
>>>>>>>>> not want to be able to do the same thing?
>>>>>>>>> It would be good to see a complete set of the public API changes.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Damian
>>>>>>>>>
>>>>>>>>> On Wed, 30 May 2018 at 19:45 Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hello Florian,
>>>>>>>>>>
>>>>>>>>>> Thanks for the KIP. I have some meta feedback on the proposal:
>>>>>>>>>>
>>>>>>>>>> 1. You mentioned that this `Processed` object will be added to a new
>>>>>>>>>> overloaded variant of all the stateless operators; what about the stateful
>>>>>>>>>> operators? I would like to hear your opinion if you have thought about that.
>>>>>>>>>> Note that stateful operators will usually be mapped to multiple
>>>>>>>>>> processor node names, so we probably need to come up with some way to
>>>>>>>>>> define all their names.
>>>>>>>>>>
>>>>>>>>>> 2. I share Bill's concern about adding lots of new overloaded
>>>>>>>>>> functions to the stateless operators, as we have just spent quite some
>>>>>>>>>> effort trimming them since the 1.0.0 release. If the goal is just to provide
>>>>>>>>>> some "hints" on the generated processor node names, not strictly enforcing
>>>>>>>>>> the exact names to be generated, then how about we just add a new
>>>>>>>>>> function to the `KStream` and `KTable` classes like "as(Processed)", with the
>>>>>>>>>> semantics "the latest operators that generate this KStream / KTable will
>>>>>>>>>> be named according to this hint".
>>>>>>>>>>
>>>>>>>>>> The only caveat is that for operators like `KStream#to` and
>>>>>>>>>> `KStream#print` that return void, this alternative would not work. But for
>>>>>>>>>> the current operators:
>>>>>>>>>>
>>>>>>>>>> a. KStream#print,
>>>>>>>>>> b. KStream#foreach,
>>>>>>>>>> c. KStream#to,
>>>>>>>>>> d. KStream#process
>>>>>>>>>>
>>>>>>>>>> I personally feel that, except for `KStream#process`, users would not usually
>>>>>>>>>> bother to override their names, and for `KStream#process` we could add an
>>>>>>>>>> overloaded variant with the additional Processed object.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> 3. In your example, the processor names are still given a suffix like
>>>>>>>>>> "-0000000000"; is this intentional? If yes, why (I thought that with
>>>>>>>>>> user-specified processor name hints we would no longer add a suffix to
>>>>>>>>>> distinguish different nodes of the same type)?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Guozhang
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Tue, May 29, 2018 at 6:47 AM, Bill Bejeck <bbej...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Florian,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for the KIP.  I think being able to add more context to the
>>>>>>>>>>> processor names would be useful.
>>>>>>>>>>>
>>>>>>>>>>> I like the idea of adding a "withProcessorName" to Produced, Consumed and
>>>>>>>>>>> Joined.
>>>>>>>>>>>
>>>>>>>>>>> But instead of adding the "Processed" parameter to a large percentage of
>>>>>>>>>>> the methods, which would result in overloaded methods (which we removed
>>>>>>>>>>> quite a bit with KIP-182), what do you think of adding a method
>>>>>>>>>>> to the AbstractStream class, "withName(String processorName)"? BTW I'm not
>>>>>>>>>>> married to the method name, it's the best I can do off the top of my
>>>>>>>>>>> head.
>>>>>>>>>>>
>>>>>>>>>>> For the methods that return void, we'd have to add a parameter, but that
>>>>>>>>>>> would at least cut down on the number of overloaded methods in the API.
>>>>>>>>>>>
>>>>>>>>>>> Just my 2 cents.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Bill
>>>>>>>>>>>
>>>>>>>>>>> On Sun, May 27, 2018 at 4:13 PM, Florian Hussonnois <fhussonn...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> I would like to start a new discussion on the following KIP:
>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
>>>>>>>>>>>>
>>>>>>>>>>>> This is still a draft.
>>>>>>>>>>>>
>>>>>>>>>>>> Looking forward to your feedback.
>>>>>>>>>>>> --
>>>>>>>>>>>> Florian HUSSONNOIS
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> -- Guozhang
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Florian HUSSONNOIS
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Florian HUSSONNOIS
>>>>>
>>>>
>>>
>>>
>>>
>>> --
>>> -- Guozhang
>>>
>>
>>
>> --
>> Florian HUSSONNOIS
>>
> 
> 
> 
