Yes, sounds good, Guozhang, thanks. I'll create a jira today.

-josh

On Thu, Apr 14, 2016, 1:37 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Hi Josh,
>
> As we chatted offline, would you like to summarize your proposed Transform
> APIs in a separate JIRA so we can continue our discussions there?
>
> Guozhang
>
> On Tue, Apr 5, 2016 at 4:13 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > HI Josh,
> >
> > Re 1): transform is only for usage in the higher-level DSL, while in the
> > lower-level APIs people are expected to work with Processor only, which
> for
> > now use context.forward() to send record to the downstream processors.
> >
> > Re 2): I have a few questions for your propose: with different typed
> > key-value pairs, are they supposed to be forwarding to different children
> > processors, i.e. you'd better call "forward(K, V, String /* child name*/
> )"
> > rather than "forward(K, V)" which will forward to all the children
> > processors, one by one. In this case, the topology builder also needs to
> be
> > refactored since we need to make sure the children processors exist when
> > the processors are defined, which could be a bit tricky in
> implementation.
> >
> > Also, is this change also aimed at making the lower-level Processor to be
> > type-safe as well? The main motivation for not supporting strong typed
> > Processors is mainly for allowing users to flexibly connect processors in
> > the topology without worrying about data types of each processor. As I
> > mentioned above, making specific forwards to downstream processors would
> > likely to defeat this purpose.
> >
> >
> > Guozhang
> >
> >
> > On Tue, Apr 5, 2016 at 2:30 PM, josh gruenberg <jos...@gmail.com> wrote:
> >
> >> Hi Guozhang,
> >>
> >> I'll reply to your points in-line below:
> >>
> >> On Tue, Apr 5, 2016 at 10:23 AM Guozhang Wang <wangg...@gmail.com>
> wrote:
> >>
> >> > Hi Josh,
> >> >
> >> > I think there are a few issues that we want to resolve here, which
> >> could be
> >> > orthogonal to each other.
> >> >
> >> > 1) one-to-many mapping in transform() function that generates a single
> >> > stream (i.e. single typed key-value pairs).
> >> >
> >> > Since transform() already enforces to make type-safe return values,
> one
> >> > thing we can do is to change the punctuate() function return value
> from
> >> > "null" to "R" as well. And then for one-to-many mapping one can then
> >> define
> >> > R as Array<MyType>
> >> >
> >> > stream.transform().flatMap(/* from Array<MyType> to MyType*/)
> >> >
> >> >
> >> Interesting, it hadn't thought of returning an Iterable from a
> Transformer
> >> to achieve the one-to-many case. Regardless, my initial reaction is that
> >> this seems natural when you're working in the declarative DSL (where we
> >> can
> >> move the flatMap to the left of the one-to-many lambda), but seems
> >> cumbersome when writing a lower-level imperative Transformer/Processor
> >> (which then requires a subsequent flatMap to explode the arrays). I
> think
> >> I'd prefer a more imperative, side-effecting style of emission in this
> >> case; see my reply to #2 below.
> >>
> >>
> >> > 2) having a new function that takes one stream, and generate multiple
> >> > streams with different key-value types.
> >> >
> >> > This is a good-to-have operator in the Streams DSL, and I think this
> is
> >> > your proposed new API in the previous email? I am not sure I
> >> > understand the "telescoping"
> >> > arity completely though, so let me know if I'm wrong.
> >> >
> >>
> >> Yes, sorry, I wrote my previous mail in a hurry this morning. By
> >> "telescoping", I meant defining processor/transformer interfaces for
> each
> >> supported output arity, with the corresponding output-types expressed as
> >> generics:
> >>
> >>    - Processor1<K, V, K1, V1>
> >>    - Processor2<K, V, K1, V1, K2, V2>
> >>    - Processor3<K, V, K1, V1, K2, V2, K3, V3>
> >>
> >> (Yes, the number of generics here is getting unwieldy, but I don't
> >> immediately see a good way to avoid that while preserving type-safety.
> >> Some
> >> sort of cosmetic improvement would be nice!)
> >>
> >> Given this, the framework could inject type-safe "emitters" into the
> >> Processors for each output-stream:
> >>
> >> interface Processor2<K, V, K1, V1, K2, V2> {
> >>   void init(ProcessorContext context, Forwarder<K1, V1> output1,
> >> Forwarder<K2, V2> output2);
> >>
> >>   // these can use the forwarders provided to init() to emit any number
> of
> >> values
> >>   void process(I input);
> >>   void punctuate(long timestamp);
> >>   // ...
> >> }
> >>
> >> interface Forwarder<K, V> {
> >>   void forward(K key, V value);
> >> }
> >>
> >> ... then, in KStream<K,V>:
> >>
> >> <K1, V1, K2, V2> KStreamTuple2<K1, V1, K2, V2> process2(Processor2<K, V,
> >> K1, V1, K2, V2> processor, String... stateStores);
> >>
> >> I haven't worked through all of the details, but I'm optimistic that
> this
> >> could work nicely to unify the Transformer and Processor APIs, and
> address
> >> all of the described use-cases (up to some arbitrarily-chosen number of
> >> supported output-streams).
> >>
> >>
> >> > 3) having data-driven emission policy (this will be the building block
> >> of
> >> > session windows) as well as time-drive emission policy.
> >> >
> >> > I am thinking about how to support this as well, one thing is that we
> >> can
> >> > use the underlying process() function for data-driven emission, for
> >> > example, if there is a session-start / end flag then create the
> >> > corresponding session record in state, and only emit upon session-end
> >> flag;
> >> > and the underlying punctuate() function for time-drive emission (we
> >> > probably need to first refactor it to be triggered by record timestamp
> >> > instead of wallclock time).
> >> >
> >>
> >> Yes, I agree: data-driven emission could work just fine with process(),
> >> and
> >> delayed emission works nicely with punctuate(). I've also been meaning
> to
> >> mention the clear need for event-time punctuation, so I'm glad to hear
> >> that's on your radar! Watermarking will be important for
> >> session-windowing.
> >>
> >> Thoughts?
> >>
> >> -josh
> >>
> >>
> >> >
> >> > On Tue, Apr 5, 2016 at 8:24 AM, josh gruenberg <jos...@gmail.com>
> >> wrote:
> >> >
> >> > > Hi all,
> >> > >
> >> > > Just chiming in with Yuto: I think the custom Processor becomes
> >> > attractive
> >> > > in scenarios where a node in the graph may emit to a variety of
> >> > downstream
> >> > > paths, possibly after some delay, depending on logic. This can
> >> probably
> >> > > often be achieved with the existing DSL using some combination of
> >> > > predicates and intermediate representations, but this involves
> >> > contortions
> >> > > that feel cumbersome, and probably leads to less intelligible code.
> >> I'm
> >> > > also not sure the current DSL can model scenarios where the
> >> > transformation
> >> > > may be one-to-many, as in the last part of Yuto's example, or where
> >> the
> >> > > emission-delay is data-driven, as in my earlier "sessionization"
> >> example.
> >> > >
> >> > > One idea I'd offer is to provide a mechanism for wiring in
> Processors
> >> > with
> >> > > "telescoping" arity (eg, support Processor1<I, O1>, Processor2<I,
> O1,
> >> > O2>,
> >> > > etc), and providing each arity with type-safe forwarding interfaces
> >> for
> >> > > each output stream (eg, Forwarder<T>). This assigns each
> >> output-stream a
> >> > > clear ordinal, and suggests a corresponding type-safe return-type
> for
> >> the
> >> > > DSL (eg, KStreamTuple2<O1, O2>).
> >> > >
> >> > > I think this pattern could provide a unification of the
> 'Transformer'
> >> and
> >> > > 'Processor' APIs.
> >> > > This was what I had in mind for a PR we discussed earlier (for
> >> modifying
> >> > > the Transformer API), but the scope expanded beyond what I felt
> >> > comfortable
> >> > > submitting without discussion, and I had to prioritize other
> efforts.
> >> > > Regardless, I could get a WIP branch pushed to github later today to
> >> > > illustrate if you'd like to see it.
> >> > >
> >> > > HTH,
> >> > > -josh
> >> > >
> >> > > On Mon, Apr 4, 2016, 9:14 PM Guozhang Wang <wangg...@gmail.com>
> >> wrote:
> >> > >
> >> > > > Thanks Yuto for your code snippet. Since you need to access a
> >> > customized
> >> > > > external storage for metadata, that indeed cannot be wrapped in
> any
> >> > > > built-in operators in the Streams DSL yet, and your code example
> in
> >> the
> >> > > > previous email would be close to the best you can do with the
> >> > high-level
> >> > > > DSL now.
> >> > > >
> >> > > > One minor improvement from your above code, though, is that
> instead
> >> of
> >> > > > calling map(... -> process()) you can actually call transform(),
> >> which
> >> > > > still allows you to provide a customized transformer function, but
> >> it
> >> > > still
> >> > > > gives you strong typing assuming all these three kinds of records
> >> are
> >> > of
> >> > > > the same key / value types.
> >> > > >
> >> > > > Guozhang
> >> > > >
> >> > > >
> >> > > >
> >> > > >
> >> > > > On Sun, Apr 3, 2016 at 10:48 PM, Yuto KAWAMURA <
> >> > > kawamuray.dad...@gmail.com
> >> > > > >
> >> > > > wrote:
> >> > > >
> >> > > > > 2016-04-04 7:20 GMT+09:00 Guozhang Wang <wangg...@gmail.com>:
> >> > > > > > Hi Yuto,
> >> > > > > >
> >> > > > > > Is the destination topic embedded as part of the value in the
> >> > > original
> >> > > > > > "foo" topic? If yes could you just access that field directly
> >> > instead
> >> > > > of
> >> > > > > > mapping to a (key, value, destination) triplet?
> >> > > > > >
> >> > > > >
> >> > > > > Nope. KeyValueWithDestination is just an example of output from
> >> the
> >> > > > > first Processor and is not included in actual messages that the
> >> topic
> >> > > > > foo received.
> >> > > > > Let me explain bit more realistic use-case. How can we write a
> >> > > > > Processor like below in High-level DSL cleanly?
> >> > > > >
> >> > > > > ```java
> >> > > > > public class EventProcessor implements Processor<String, Event>
> {
> >> > > > > ...
> >> > > > >   @Override
> >> > > > >   public void process(String key, Event value) {
> >> > > > >       EventMetadata meta =
> >> > > > > getEventMetadataFromExternalStorage(value.getId());
> >> > > > >
> >> > > > >       if (isFieldACorrupted(meta, value.getFieldA())) {
> >> > > > >           // This event is corrupted! let's evacuate it once to
> >> the
> >> > > > > grave topic for further investigation.
> >> > > > >           context.forward(key, value, "CorruptedEventSink");
> >> > > > >       }
> >> > > > >       if (isFieldBCorrupted(meta, value.getFieldB())) {
> >> > > > >           // Antoher case of corruption, but maybe recoverable.
> >> > > > >           context.forward(key, value,
> >> > > "CorruptedEventRecoveryProcessor");
> >> > > > >       }
> >> > > > >
> >> > > > >       for (Foo foo : event.getFoos()) {
> >> > > > >           context.forward(key, buildMessage(meta, foo),
> >> > > "FooProcessor");
> >> > > > >       }
> >> > > > >   }
> >> > > > > ...
> >> > > > > }
> >> > > > > ```
> >> > > > >
> >> > > > >
> >> > > > > > Guozhang
> >> > > > > >
> >> > > > > > On Sun, Apr 3, 2016 at 9:29 AM, Yuto KAWAMURA <
> >> > > > > kawamuray.dad...@gmail.com>
> >> > > > > > wrote:
> >> > > > > >
> >> > > > > >> Hi Guozhang,
> >> > > > > >>
> >> > > > > >>
> >> > > > > >>
> >> > > > > >> 2016-04-02 3:29 GMT+09:00 Guozhang Wang <wangg...@gmail.com
> >:
> >> > > > > >> > Hi Yuto,
> >> > > > > >> >
> >> > > > > >> > That is a good suggestion, the child index is not very
> >> intuitive
> >> > > > from
> >> > > > > >> > programmer's view and we can even consider replacing it
> with
> >> the
> >> > > > > >> processor
> >> > > > > >> > name instead of overloading it. Could you file a JIRA?
> >> > > > > >> >
> >> > > > > >>
> >> > > > > >> Yep :) https://issues.apache.org/jira/browse/KAFKA-3497
> >> > > > > >>
> >> > > > > >> > Also I am wondering if you have looked at the higher-level
> >> > Streams
> >> > > > > DSL,
> >> > > > > >> and
> >> > > > > >> > if yes could let me know what are the limitations from
> using
> >> > that
> >> > > > > APIs in
> >> > > > > >> > your case?
> >> > > > > >> >
> >> > > > > >>
> >> > > > > >> Well, I read though high-level DSL interface but couldn't
> find
> >> an
> >> > > easy
> >> > > > > >> way to handle output from Processors which could issue
> multiple
> >> > > > > >> messages to arbitrary different destinations.
> >> > > > > >> Maybe it could be done by doing something like below but it
> >> > doesn't
> >> > > > > >> look good. Please let me know if you have any idea to do this
> >> in
> >> > > > > >> easier way.
> >> > > > > >>
> >> > > > > >> ```java
> >> > > > > >> class KeyValueWithDestination {
> >> > > > > >>     K key;
> >> > > > > >>     V value;
> >> > > > > >>     String destination;
> >> > > > > >> }
> >> > > > > >>
> >> > > > > >> class DestinationPredicate implements Predicate<K,
> >> > > > > >> KeyValueWithDestination> {
> >> > > > > >>     String destination;
> >> > > > > >>     @Override
> >> > > > > >>     public boolean test(K key, KeyValueWithDestination
> value) {
> >> > > > > >>         return value.destination.equals(destination);
> >> > > > > >>     }
> >> > > > > >> }
> >> > > > > >>
> >> > > > > >> String[] destTopics = {"topicA", "topicB", "topicC"};
> >> > > > > >>
> >> > > > > >> Predicate<K, KeyValueWithDestination>[] predicates =
> >> > > > > >>
> >>  Arrays.stream(destTopics).map(DestinationPredicate::new)
> >> > > > > >>                                  .toArray(Predicate<K,
> >> > > > > >> KeyValueWithDestination>::new);
> >> > > > > >>
> >> > > > > >> branches = builder.stream("foo")
> >> > > > > >>                   .map((key, value) -> processor.process(key,
> >> > value)
> >> > > > > >> /* => KeyValueWithDestination */)
> >> > > > > >>                   .branch(predicates);
> >> > > > > >>
> >> > > > > >> for (int i = 0; i < branches.length; i++) {
> >> > > > > >>     branches[i].to(destTopics[i]);
> >> > > > > >> }
> >> > > > > >> ```
> >> > > > > >>
> >> > > > > >>
> >> > > > > >> > Guozhang
> >> > > > > >> >
> >> > > > > >> > On Fri, Apr 1, 2016 at 1:20 AM, Yuto KAWAMURA <
> >> > > > > >> kawamuray.dad...@gmail.com>
> >> > > > > >> > wrote:
> >> > > > > >> >
> >> > > > > >> >> When I tried to implement a task which does kinda
> >> dispatching
> >> > to
> >> > > > > >> >> downstream processors or sinks, looks like relying on
> >> > > > > >> >> context.forward(K, V, int childIndex) is the only way now.
> >> > > > > >> >> I have a question why this method implemented using
> >> > > > childIndex(which
> >> > > > > >> >> is just an index of children "List" that based on order of
> >> > > > > >> >> builder.addProcessor() call) instead of child name(first
> >> > argument
> >> > > > to
> >> > > > > >> >> add{Processor,Sink}).
> >> > > > > >> >> I wanna ask what is the concrete use case of forward(K, V,
> >> int
> >> > > > > >> >> childIndex) and is it makes sense to introduce another
> >> > overload:
> >> > > > > >> >> forward(K, V, String childName) for much handy use.
> >> > > > > >> >> Currently I have a use-case like this in my mind:
> >> > > > > >> >> ```
> >> > > > > >> >> builder.addProcessor("DispatchProcess", new
> >> > > > > >> >> DispatchProcessorSupplier(), "Source");
> >> > > > > >> >> builder.addProcessor("Process-A", new
> ProcessorASupplier(),
> >> > > > > >> >> "DispatchProcess");
> >> > > > > >> >> builder.addProcessor("Process-B", new
> ProcessorBSupplier(),
> >> > > > > >> >> "DispatchProcess");
> >> > > > > >> >>
> >> > > > > >> >> // in process(key, value)
> >> > > > > >> >> if ("key-for-A".equals(key)) {
> >> > > > > >> >>     context.forward(key, value, "Process-A");
> >> > > > > >> >> } else if ("key-for-B".equals(key)) {
> >> > > > > >> >>     context.forward(key, value, "Process-B");
> >> > > > > >> >> }
> >> > > > > >> >> ```
> >> > > > > >> >>
> >> > > > > >> >
> >> > > > > >> >
> >> > > > > >> >
> >> > > > > >> > --
> >> > > > > >> > -- Guozhang
> >> > > > > >>
> >> > > > > >
> >> > > > > >
> >> > > > > >
> >> > > > > > --
> >> > > > > > -- Guozhang
> >> > > > >
> >> > > >
> >> > > >
> >> > > >
> >> > > > --
> >> > > > -- Guozhang
> >> > > >
> >> > >
> >> >
> >> >
> >> >
> >> > --
> >> > -- Guozhang
> >> >
> >>
> >
> >
> >
> > --
> > -- Guozhang
> >
>
>
>
> --
> -- Guozhang
>

Reply via email to