Yes, sounds good, Guozhang, thanks. I'll create a jira today. -josh
On Thu, Apr 14, 2016, 1:37 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Hi Josh,
>
> As we chatted offline, would you like to summarize your proposed Transform
> APIs in a separate JIRA so we can continue our discussions there?
>
> Guozhang
>
> On Tue, Apr 5, 2016 at 4:13 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hi Josh,
> >
> > Re 1): transform is only for usage in the higher-level DSL, while in the
> > lower-level APIs people are expected to work with Processor only, which for
> > now uses context.forward() to send records to the downstream processors.
> >
> > Re 2): I have a few questions about your proposal: with differently typed
> > key-value pairs, are they supposed to be forwarded to different children
> > processors, i.e. you'd better call "forward(K, V, String /* child name */)"
> > rather than "forward(K, V)", which will forward to all the children
> > processors, one by one. In this case, the topology builder also needs to be
> > refactored since we need to make sure the children processors exist when
> > the processors are defined, which could be a bit tricky in implementation.
> >
> > Also, is this change also aimed at making the lower-level Processor
> > type-safe as well? The main motivation for not supporting strongly typed
> > Processors is to allow users to flexibly connect processors in the
> > topology without worrying about the data types of each processor. As I
> > mentioned above, making specific forwards to downstream processors would
> > likely defeat this purpose.
> >
> >
> > Guozhang
> >
> >
> > On Tue, Apr 5, 2016 at 2:30 PM, josh gruenberg <jos...@gmail.com> wrote:
> >
> >> Hi Guozhang,
> >>
> >> I'll reply to your points in-line below:
> >>
> >> On Tue, Apr 5, 2016 at 10:23 AM Guozhang Wang <wangg...@gmail.com> wrote:
> >>
> >> > Hi Josh,
> >> >
> >> > I think there are a few issues that we want to resolve here, which could be
> >> > orthogonal to each other.
> >> >
> >> > 1) one-to-many mapping in transform() function that generates a single
> >> > stream (i.e. single typed key-value pairs).
> >> >
> >> > Since transform() already enforces type-safe return values, one thing we
> >> > can do is to change the punctuate() function return value from "null" to
> >> > "R" as well. Then, for one-to-many mapping, one can define R as
> >> > Array<MyType>:
> >> >
> >> > stream.transform().flatMap(/* from Array<MyType> to MyType */)
> >> >
> >>
> >> Interesting, I hadn't thought of returning an Iterable from a Transformer
> >> to achieve the one-to-many case. Regardless, my initial reaction is that
> >> this seems natural when you're working in the declarative DSL (where we can
> >> move the flatMap to the left of the one-to-many lambda), but seems
> >> cumbersome when writing a lower-level imperative Transformer/Processor
> >> (which then requires a subsequent flatMap to explode the arrays). I think
> >> I'd prefer a more imperative, side-effecting style of emission in this
> >> case; see my reply to #2 below.
> >>
> >>
> >> > 2) having a new function that takes one stream, and generates multiple
> >> > streams with different key-value types.
> >> >
> >> > This is a good-to-have operator in the Streams DSL, and I think this is
> >> > your proposed new API in the previous email? I am not sure I understand
> >> > the "telescoping" arity completely though, so let me know if I'm wrong.
> >> >
> >>
> >> Yes, sorry, I wrote my previous mail in a hurry this morning.
> >> By "telescoping", I meant defining processor/transformer interfaces for each
> >> supported output arity, with the corresponding output-types expressed as
> >> generics:
> >>
> >> - Processor1<K, V, K1, V1>
> >> - Processor2<K, V, K1, V1, K2, V2>
> >> - Processor3<K, V, K1, V1, K2, V2, K3, V3>
> >>
> >> (Yes, the number of generics here is getting unwieldy, but I don't
> >> immediately see a good way to avoid that while preserving type-safety. Some
> >> sort of cosmetic improvement would be nice!)
> >>
> >> Given this, the framework could inject type-safe "emitters" into the
> >> Processors for each output-stream:
> >>
> >> interface Processor2<K, V, K1, V1, K2, V2> {
> >>   void init(ProcessorContext context, Forwarder<K1, V1> output1,
> >>       Forwarder<K2, V2> output2);
> >>
> >>   // these can use the forwarders provided to init() to emit any number of
> >>   // values
> >>   void process(K key, V value);
> >>   void punctuate(long timestamp);
> >>   // ...
> >> }
> >>
> >> interface Forwarder<K, V> {
> >>   void forward(K key, V value);
> >> }
> >>
> >> ... then, in KStream<K,V>:
> >>
> >> <K1, V1, K2, V2> KStreamTuple2<K1, V1, K2, V2> process2(Processor2<K, V,
> >>     K1, V1, K2, V2> processor, String... stateStores);
> >>
> >> I haven't worked through all of the details, but I'm optimistic that this
> >> could work nicely to unify the Transformer and Processor APIs, and address
> >> all of the described use-cases (up to some arbitrarily-chosen number of
> >> supported output-streams).
> >>
> >>
> >> > 3) having data-driven emission policy (this will be the building block of
> >> > session windows) as well as time-driven emission policy.
> >> >
> >> > I am thinking about how to support this as well. One thing is that we can
> >> > use the underlying process() function for data-driven emission: for
> >> > example, if there is a session-start / end flag, then create the
> >> > corresponding session record in state, and only emit upon the session-end
> >> > flag; and use the underlying punctuate() function for time-driven emission
> >> > (we probably need to first refactor it to be triggered by record timestamp
> >> > instead of wallclock time).
> >> >
> >>
> >> Yes, I agree: data-driven emission could work just fine with process(), and
> >> delayed emission works nicely with punctuate(). I've also been meaning to
> >> mention the clear need for event-time punctuation, so I'm glad to hear
> >> that's on your radar! Watermarking will be important for
> >> session-windowing.
> >>
> >> Thoughts?
> >>
> >> -josh
> >>
> >>
> >> > On Tue, Apr 5, 2016 at 8:24 AM, josh gruenberg <jos...@gmail.com> wrote:
> >> >
> >> > > Hi all,
> >> > >
> >> > > Just chiming in with Yuto: I think the custom Processor becomes attractive
> >> > > in scenarios where a node in the graph may emit to a variety of downstream
> >> > > paths, possibly after some delay, depending on logic. This can probably
> >> > > often be achieved with the existing DSL using some combination of
> >> > > predicates and intermediate representations, but this involves contortions
> >> > > that feel cumbersome, and probably leads to less intelligible code. I'm
> >> > > also not sure the current DSL can model scenarios where the transformation
> >> > > may be one-to-many, as in the last part of Yuto's example, or where the
> >> > > emission-delay is data-driven, as in my earlier "sessionization" example.
> >> > >
> >> > > One idea I'd offer is to provide a mechanism for wiring in Processors with
> >> > > "telescoping" arity (eg, support Processor1<I, O1>, Processor2<I, O1, O2>,
> >> > > etc), and providing each arity with type-safe forwarding interfaces for
> >> > > each output stream (eg, Forwarder<T>). This assigns each output-stream a
> >> > > clear ordinal, and suggests a corresponding type-safe return-type for the
> >> > > DSL (eg, KStreamTuple2<O1, O2>).
> >> > >
> >> > > I think this pattern could provide a unification of the 'Transformer' and
> >> > > 'Processor' APIs.
> >> > > This was what I had in mind for a PR we discussed earlier (for modifying
> >> > > the Transformer API), but the scope expanded beyond what I felt comfortable
> >> > > submitting without discussion, and I had to prioritize other efforts.
> >> > > Regardless, I could get a WIP branch pushed to github later today to
> >> > > illustrate if you'd like to see it.
> >> > >
> >> > > HTH,
> >> > > -josh
> >> > >
> >> > > On Mon, Apr 4, 2016, 9:14 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >> > >
> >> > > > Thanks Yuto for your code snippet. Since you need to access a customized
> >> > > > external storage for metadata, that indeed cannot be wrapped in any
> >> > > > built-in operators in the Streams DSL yet, and your code example in the
> >> > > > previous email would be close to the best you can do with the high-level
> >> > > > DSL now.
> >> > > >
> >> > > > One minor improvement from your above code, though, is that instead of
> >> > > > calling map(... -> process()) you can actually call transform(), which
> >> > > > still allows you to provide a customized transformer function, but it still
> >> > > > gives you strong typing assuming all these three kinds of records are of
> >> > > > the same key / value types.
> >> > > >
> >> > > > Guozhang
> >> > > >
> >> > > >
> >> > > > On Sun, Apr 3, 2016 at 10:48 PM, Yuto KAWAMURA <kawamuray.dad...@gmail.com> wrote:
> >> > > >
> >> > > > > 2016-04-04 7:20 GMT+09:00 Guozhang Wang <wangg...@gmail.com>:
> >> > > > > > Hi Yuto,
> >> > > > > >
> >> > > > > > Is the destination topic embedded as part of the value in the original
> >> > > > > > "foo" topic? If yes could you just access that field directly instead of
> >> > > > > > mapping to a (key, value, destination) triplet?
> >> > > > > >
> >> > > > >
> >> > > > > Nope. KeyValueWithDestination is just an example of output from the
> >> > > > > first Processor and is not included in actual messages that the topic
> >> > > > > foo received.
> >> > > > > Let me explain a bit more realistic use-case. How can we write a
> >> > > > > Processor like below in High-level DSL cleanly?
> >> > > > >
> >> > > > > ```java
> >> > > > > public class EventProcessor implements Processor<String, Event> {
> >> > > > >     ...
> >> > > > >     @Override
> >> > > > >     public void process(String key, Event value) {
> >> > > > >         EventMetadata meta =
> >> > > > >             getEventMetadataFromExternalStorage(value.getId());
> >> > > > >
> >> > > > >         if (isFieldACorrupted(meta, value.getFieldA())) {
> >> > > > >             // This event is corrupted! Let's evacuate it once to the
> >> > > > >             // grave topic for further investigation.
> >> > > > >             context.forward(key, value, "CorruptedEventSink");
> >> > > > >         }
> >> > > > >         if (isFieldBCorrupted(meta, value.getFieldB())) {
> >> > > > >             // Another case of corruption, but maybe recoverable.
> >> > > > >             context.forward(key, value, "CorruptedEventRecoveryProcessor");
> >> > > > >         }
> >> > > > >
> >> > > > >         for (Foo foo : value.getFoos()) {
> >> > > > >             context.forward(key, buildMessage(meta, foo), "FooProcessor");
> >> > > > >         }
> >> > > > >     }
> >> > > > >     ...
> >> > > > > }
> >> > > > > ```
> >> > > > >
> >> > > > >
> >> > > > > > Guozhang
> >> > > > > >
> >> > > > > > On Sun, Apr 3, 2016 at 9:29 AM, Yuto KAWAMURA <kawamuray.dad...@gmail.com> wrote:
> >> > > > > >
> >> > > > > >> Hi Guozhang,
> >> > > > > >>
> >> > > > > >>
> >> > > > > >> 2016-04-02 3:29 GMT+09:00 Guozhang Wang <wangg...@gmail.com>:
> >> > > > > >> > Hi Yuto,
> >> > > > > >> >
> >> > > > > >> > That is a good suggestion, the child index is not very intuitive from
> >> > > > > >> > programmer's view and we can even consider replacing it with the processor
> >> > > > > >> > name instead of overloading it. Could you file a JIRA?
> >> > > > > >> >
> >> > > > > >>
> >> > > > > >> Yep :) https://issues.apache.org/jira/browse/KAFKA-3497
> >> > > > > >>
> >> > > > > >> > Also I am wondering if you have looked at the higher-level Streams DSL, and
> >> > > > > >> > if yes could you let me know what the limitations of using those APIs are in
> >> > > > > >> > your case?
> >> > > > > >> >
> >> > > > > >>
> >> > > > > >> Well, I read through the high-level DSL interface but couldn't find an easy
> >> > > > > >> way to handle output from Processors which could issue multiple
> >> > > > > >> messages to arbitrary different destinations.
> >> > > > > >> Maybe it could be done by doing something like below but it doesn't
> >> > > > > >> look good. Please let me know if you have any idea to do this in
> >> > > > > >> an easier way.
> >> > > > > >>
> >> > > > > >> ```java
> >> > > > > >> class KeyValueWithDestination {
> >> > > > > >>     K key;
> >> > > > > >>     V value;
> >> > > > > >>     String destination;
> >> > > > > >> }
> >> > > > > >>
> >> > > > > >> class DestinationPredicate implements Predicate<K, KeyValueWithDestination> {
> >> > > > > >>     String destination;
> >> > > > > >>     // constructor needed by DestinationPredicate::new below
> >> > > > > >>     DestinationPredicate(String destination) { this.destination = destination; }
> >> > > > > >>     @Override
> >> > > > > >>     public boolean test(K key, KeyValueWithDestination value) {
> >> > > > > >>         return value.destination.equals(destination);
> >> > > > > >>     }
> >> > > > > >> }
> >> > > > > >>
> >> > > > > >> String[] destTopics = {"topicA", "topicB", "topicC"};
> >> > > > > >>
> >> > > > > >> Predicate<K, KeyValueWithDestination>[] predicates =
> >> > > > > >>     Arrays.stream(destTopics).map(DestinationPredicate::new)
> >> > > > > >>           .toArray(Predicate[]::new);
> >> > > > > >>
> >> > > > > >> branches = builder.stream("foo")
> >> > > > > >>                   .map((key, value) -> processor.process(key, value)
> >> > > > > >>                        /* => KeyValueWithDestination */)
> >> > > > > >>                   .branch(predicates);
> >> > > > > >>
> >> > > > > >> for (int i = 0; i < branches.length; i++) {
> >> > > > > >>     branches[i].to(destTopics[i]);
> >> > > > > >> }
> >> > > > > >> ```
> >> > > > > >>
> >> > > > > >>
> >> > > > > >> > Guozhang
> >> > > > > >> >
> >> > > > > >> > On Fri, Apr 1, 2016 at 1:20 AM, Yuto KAWAMURA <kawamuray.dad...@gmail.com> wrote:
> >> > > > > >> >
> >> > > > > >> >> When I tried to implement a task which does kinda dispatching to
> >> > > > > >> >> downstream processors or sinks, it looks like relying on
> >> > > > > >> >> context.forward(K, V, int childIndex) is the only way now.
> >> > > > > >> >> I have a question: why is this method implemented using childIndex (which
> >> > > > > >> >> is just an index into the children "List", based on the order of
> >> > > > > >> >> builder.addProcessor() calls) instead of the child name (the first argument to
> >> > > > > >> >> add{Processor,Sink})?
> >> > > > > >> >> I wanna ask what the concrete use case of forward(K, V, int
> >> > > > > >> >> childIndex) is, and whether it makes sense to introduce another overload,
> >> > > > > >> >> forward(K, V, String childName), for handier use.
> >> > > > > >> >> Currently I have a use-case like this in my mind:
> >> > > > > >> >> ```
> >> > > > > >> >> builder.addProcessor("DispatchProcess", new DispatchProcessorSupplier(), "Source");
> >> > > > > >> >> builder.addProcessor("Process-A", new ProcessorASupplier(), "DispatchProcess");
> >> > > > > >> >> builder.addProcessor("Process-B", new ProcessorBSupplier(), "DispatchProcess");
> >> > > > > >> >>
> >> > > > > >> >> // in process(key, value)
> >> > > > > >> >> if ("key-for-A".equals(key)) {
> >> > > > > >> >>     context.forward(key, value, "Process-A");
> >> > > > > >> >> } else if ("key-for-B".equals(key)) {
> >> > > > > >> >>     context.forward(key, value, "Process-B");
> >> > > > > >> >> }
> >> > > > > >> >> ```
> >> > > > > >> >>
> >> > > > > >> >
> >> > > > > >> > --
> >> > > > > >> > -- Guozhang
> >> > > > > >>
> >> > > > > >
> >> > > > > > --
> >> > > > > > -- Guozhang
> >> > > > >
> >> > > >
> >> > > > --
> >> > > > -- Guozhang
> >> > >
> >> >
> >> > --
> >> > -- Guozhang
> >>
> >
> > --
> > -- Guozhang
>
> --
> -- Guozhang
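
For reference, the "telescoping arity" idea quoted above (a Processor2 that receives one type-safe Forwarder per output stream) could be sketched roughly as below. This is only a standalone illustration under stated assumptions, not Kafka Streams API: Forwarder and Processor2 follow the names proposed in the thread, ProcessorContext, punctuate() and state stores are left out, and SplitLine and CollectingForwarder are hypothetical names made up for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical type-safe emitter for a single output stream (name from the thread).
interface Forwarder<K, V> {
    void forward(K key, V value);
}

// Hypothetical two-output processor. The real proposal also passes a
// ProcessorContext to init() and declares punctuate(); both are omitted here.
interface Processor2<K, V, K1, V1, K2, V2> {
    void init(Forwarder<K1, V1> output1, Forwarder<K2, V2> output2);
    void process(K key, V value);
}

// Illustrative processor: emits every word of the input to output 1
// (one-to-many) and the length of the whole input to output 2.
class SplitLine implements Processor2<String, String, String, String, String, Integer> {
    private Forwarder<String, String> words;
    private Forwarder<String, Integer> lengths;

    @Override
    public void init(Forwarder<String, String> output1, Forwarder<String, Integer> output2) {
        this.words = output1;
        this.lengths = output2;
    }

    @Override
    public void process(String key, String value) {
        for (String word : value.split(" ")) {
            words.forward(key, word);          // differently typed outputs never mix
        }
        lengths.forward(key, value.length());
    }
}

// Stand-in for whatever the framework would inject: it just records emissions.
class CollectingForwarder<K, V> implements Forwarder<K, V> {
    final List<String> records = new ArrayList<>();

    @Override
    public void forward(K key, V value) {
        records.add(key + "=" + value);
    }
}
```

Wiring a SplitLine to two CollectingForwarder instances and calling process("k", "a bc") would send two records to the first output and one to the second. Because each output has its own typed Forwarder, the compiler rejects forwarding an Integer to the word stream, which is the type-safety argument made in the thread; the cost is the growing list of generic parameters already noted there.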