Hi Josh, As we chatted offline, would you like to summarize your proposed Transform APIs in a separate JIRA so we can continue our discussions there?
Guozhang On Tue, Apr 5, 2016 at 4:13 PM, Guozhang Wang <wangg...@gmail.com> wrote: > HI Josh, > > Re 1): transform is only for usage in the higher-level DSL, while in the > lower-level APIs people are expected to work with Processor only, which for > now use context.forward() to send record to the downstream processors. > > Re 2): I have a few questions for your propose: with different typed > key-value pairs, are they supposed to be forwarding to different children > processors, i.e. you'd better call "forward(K, V, String /* child name*/ )" > rather than "forward(K, V)" which will forward to all the children > processors, one by one. In this case, the topology builder also needs to be > refactored since we need to make sure the children processors exist when > the processors are defined, which could be a bit tricky in implementation. > > Also, is this change also aimed at making the lower-level Processor to be > type-safe as well? The main motivation for not supporting strong typed > Processors is mainly for allowing users to flexibly connect processors in > the topology without worrying about data types of each processor. As I > mentioned above, making specific forwards to downstream processors would > likely to defeat this purpose. > > > Guozhang > > > On Tue, Apr 5, 2016 at 2:30 PM, josh gruenberg <jos...@gmail.com> wrote: > >> Hi Guozhang, >> >> I'll reply to your points in-line below: >> >> On Tue, Apr 5, 2016 at 10:23 AM Guozhang Wang <wangg...@gmail.com> wrote: >> >> > Hi Josh, >> > >> > I think there are a few issues that we want to resolve here, which >> could be >> > orthogonal to each other. >> > >> > 1) one-to-many mapping in transform() function that generates a single >> > stream (i.e. single typed key-value pairs). >> > >> > Since transform() already enforces to make type-safe return values, one >> > thing we can do is to change the punctuate() function return value from >> > "null" to "R" as well. And then for one-to-many mapping one can then >> define >> > R as Array<MyType> >> > >> > stream.transform().flatMap(/* from Array<MyType> to MyType*/) >> > >> > >> Interesting, it hadn't thought of returning an Iterable from a Transformer >> to achieve the one-to-many case. Regardless, my initial reaction is that >> this seems natural when you're working in the declarative DSL (where we >> can >> move the flatMap to the left of the one-to-many lambda), but seems >> cumbersome when writing a lower-level imperative Transformer/Processor >> (which then requires a subsequent flatMap to explode the arrays). I think >> I'd prefer a more imperative, side-effecting style of emission in this >> case; see my reply to #2 below. >> >> >> > 2) having a new function that takes one stream, and generate multiple >> > streams with different key-value types. >> > >> > This is a good-to-have operator in the Streams DSL, and I think this is >> > your proposed new API in the previous email? I am not sure I >> > understand the "telescoping" >> > arity completely though, so let me know if I'm wrong. >> > >> >> Yes, sorry, I wrote my previous mail in a hurry this morning. By >> "telescoping", I meant defining processor/transformer interfaces for each >> supported output arity, with the corresponding output-types expressed as >> generics: >> >> - Processor1<K, V, K1, V1> >> - Processor2<K, V, K1, V1, K2, V2> >> - Processor3<K, V, K1, V1, K2, V2, K3, V3> >> >> (Yes, the number of generics here is getting unwieldy, but I don't >> immediately see a good way to avoid that while preserving type-safety. >> Some >> sort of cosmetic improvement would be nice!) >> >> Given this, the framework could inject type-safe "emitters" into the >> Processors for each output-stream: >> >> interface Processor2<K, V, K1, V1, K2, V2> { >> void init(ProcessorContext context, Forwarder<K1, V1> output1, >> Forwarder<K2, V2> output2); >> >> // these can use the forwarders provided to init() to emit any number of >> values >> void process(I input); >> void punctuate(long timestamp); >> // ... >> } >> >> interface Forwarder<K, V> { >> void forward(K key, V value); >> } >> >> ... then, in KStream<K,V>: >> >> <K1, V1, K2, V2> KStreamTuple2<K1, V1, K2, V2> process2(Processor2<K, V, >> K1, V1, K2, V2> processor, String... stateStores); >> >> I haven't worked through all of the details, but I'm optimistic that this >> could work nicely to unify the Transformer and Processor APIs, and address >> all of the described use-cases (up to some arbitrarily-chosen number of >> supported output-streams). >> >> >> > 3) having data-driven emission policy (this will be the building block >> of >> > session windows) as well as time-drive emission policy. >> > >> > I am thinking about how to support this as well, one thing is that we >> can >> > use the underlying process() function for data-driven emission, for >> > example, if there is a session-start / end flag then create the >> > corresponding session record in state, and only emit upon session-end >> flag; >> > and the underlying punctuate() function for time-drive emission (we >> > probably need to first refactor it to be triggered by record timestamp >> > instead of wallclock time). >> > >> >> Yes, I agree: data-driven emission could work just fine with process(), >> and >> delayed emission works nicely with punctuate(). I've also been meaning to >> mention the clear need for event-time punctuation, so I'm glad to hear >> that's on your radar! Watermarking will be important for >> session-windowing. >> >> Thoughts? >> >> -josh >> >> >> > >> > On Tue, Apr 5, 2016 at 8:24 AM, josh gruenberg <jos...@gmail.com> >> wrote: >> > >> > > Hi all, >> > > >> > > Just chiming in with Yuto: I think the custom Processor becomes >> > attractive >> > > in scenarios where a node in the graph may emit to a variety of >> > downstream >> > > paths, possibly after some delay, depending on logic. This can >> probably >> > > often be achieved with the existing DSL using some combination of >> > > predicates and intermediate representations, but this involves >> > contortions >> > > that feel cumbersome, and probably leads to less intelligible code. >> I'm >> > > also not sure the current DSL can model scenarios where the >> > transformation >> > > may be one-to-many, as in the last part of Yuto's example, or where >> the >> > > emission-delay is data-driven, as in my earlier "sessionization" >> example. >> > > >> > > One idea I'd offer is to provide a mechanism for wiring in Processors >> > with >> > > "telescoping" arity (eg, support Processor1<I, O1>, Processor2<I, O1, >> > O2>, >> > > etc), and providing each arity with type-safe forwarding interfaces >> for >> > > each output stream (eg, Forwarder<T>). This assigns each >> output-stream a >> > > clear ordinal, and suggests a corresponding type-safe return-type for >> the >> > > DSL (eg, KStreamTuple2<O1, O2>). >> > > >> > > I think this pattern could provide a unification of the 'Transformer' >> and >> > > 'Processor' APIs. >> > > This was what I had in mind for a PR we discussed earlier (for >> modifying >> > > the Transformer API), but the scope expanded beyond what I felt >> > comfortable >> > > submitting without discussion, and I had to prioritize other efforts. >> > > Regardless, I could get a WIP branch pushed to github later today to >> > > illustrate if you'd like to see it. >> > > >> > > HTH, >> > > -josh >> > > >> > > On Mon, Apr 4, 2016, 9:14 PM Guozhang Wang <wangg...@gmail.com> >> wrote: >> > > >> > > > Thanks Yuto for your code snippet. Since you need to access a >> > customized >> > > > external storage for metadata, that indeed cannot be wrapped in any >> > > > built-in operators in the Streams DSL yet, and your code example in >> the >> > > > previous email would be close to the best you can do with the >> > high-level >> > > > DSL now. >> > > > >> > > > One minor improvement from your above code, though, is that instead >> of >> > > > calling map(... -> process()) you can actually call transform(), >> which >> > > > still allows you to provide a customized transformer function, but >> it >> > > still >> > > > gives you strong typing assuming all these three kinds of records >> are >> > of >> > > > the same key / value types. >> > > > >> > > > Guozhang >> > > > >> > > > >> > > > >> > > > >> > > > On Sun, Apr 3, 2016 at 10:48 PM, Yuto KAWAMURA < >> > > kawamuray.dad...@gmail.com >> > > > > >> > > > wrote: >> > > > >> > > > > 2016-04-04 7:20 GMT+09:00 Guozhang Wang <wangg...@gmail.com>: >> > > > > > Hi Yuto, >> > > > > > >> > > > > > Is the destination topic embedded as part of the value in the >> > > original >> > > > > > "foo" topic? If yes could you just access that field directly >> > instead >> > > > of >> > > > > > mapping to a (key, value, destination) triplet? >> > > > > > >> > > > > >> > > > > Nope. KeyValueWithDestination is just an example of output from >> the >> > > > > first Processor and is not included in actual messages that the >> topic >> > > > > foo received. >> > > > > Let me explain bit more realistic use-case. How can we write a >> > > > > Processor like below in High-level DSL cleanly? >> > > > > >> > > > > ```java >> > > > > public class EventProcessor implements Processor<String, Event> { >> > > > > ... >> > > > > @Override >> > > > > public void process(String key, Event value) { >> > > > > EventMetadata meta = >> > > > > getEventMetadataFromExternalStorage(value.getId()); >> > > > > >> > > > > if (isFieldACorrupted(meta, value.getFieldA())) { >> > > > > // This event is corrupted! let's evacuate it once to >> the >> > > > > grave topic for further investigation. >> > > > > context.forward(key, value, "CorruptedEventSink"); >> > > > > } >> > > > > if (isFieldBCorrupted(meta, value.getFieldB())) { >> > > > > // Antoher case of corruption, but maybe recoverable. >> > > > > context.forward(key, value, >> > > "CorruptedEventRecoveryProcessor"); >> > > > > } >> > > > > >> > > > > for (Foo foo : event.getFoos()) { >> > > > > context.forward(key, buildMessage(meta, foo), >> > > "FooProcessor"); >> > > > > } >> > > > > } >> > > > > ... >> > > > > } >> > > > > ``` >> > > > > >> > > > > >> > > > > > Guozhang >> > > > > > >> > > > > > On Sun, Apr 3, 2016 at 9:29 AM, Yuto KAWAMURA < >> > > > > kawamuray.dad...@gmail.com> >> > > > > > wrote: >> > > > > > >> > > > > >> Hi Guozhang, >> > > > > >> >> > > > > >> >> > > > > >> >> > > > > >> 2016-04-02 3:29 GMT+09:00 Guozhang Wang <wangg...@gmail.com>: >> > > > > >> > Hi Yuto, >> > > > > >> > >> > > > > >> > That is a good suggestion, the child index is not very >> intuitive >> > > > from >> > > > > >> > programmer's view and we can even consider replacing it with >> the >> > > > > >> processor >> > > > > >> > name instead of overloading it. Could you file a JIRA? >> > > > > >> > >> > > > > >> >> > > > > >> Yep :) https://issues.apache.org/jira/browse/KAFKA-3497 >> > > > > >> >> > > > > >> > Also I am wondering if you have looked at the higher-level >> > Streams >> > > > > DSL, >> > > > > >> and >> > > > > >> > if yes could let me know what are the limitations from using >> > that >> > > > > APIs in >> > > > > >> > your case? >> > > > > >> > >> > > > > >> >> > > > > >> Well, I read though high-level DSL interface but couldn't find >> an >> > > easy >> > > > > >> way to handle output from Processors which could issue multiple >> > > > > >> messages to arbitrary different destinations. >> > > > > >> Maybe it could be done by doing something like below but it >> > doesn't >> > > > > >> look good. Please let me know if you have any idea to do this >> in >> > > > > >> easier way. >> > > > > >> >> > > > > >> ```java >> > > > > >> class KeyValueWithDestination { >> > > > > >> K key; >> > > > > >> V value; >> > > > > >> String destination; >> > > > > >> } >> > > > > >> >> > > > > >> class DestinationPredicate implements Predicate<K, >> > > > > >> KeyValueWithDestination> { >> > > > > >> String destination; >> > > > > >> @Override >> > > > > >> public boolean test(K key, KeyValueWithDestination value) { >> > > > > >> return value.destination.equals(destination); >> > > > > >> } >> > > > > >> } >> > > > > >> >> > > > > >> String[] destTopics = {"topicA", "topicB", "topicC"}; >> > > > > >> >> > > > > >> Predicate<K, KeyValueWithDestination>[] predicates = >> > > > > >> >> Arrays.stream(destTopics).map(DestinationPredicate::new) >> > > > > >> .toArray(Predicate<K, >> > > > > >> KeyValueWithDestination>::new); >> > > > > >> >> > > > > >> branches = builder.stream("foo") >> > > > > >> .map((key, value) -> processor.process(key, >> > value) >> > > > > >> /* => KeyValueWithDestination */) >> > > > > >> .branch(predicates); >> > > > > >> >> > > > > >> for (int i = 0; i < branches.length; i++) { >> > > > > >> branches[i].to(destTopics[i]); >> > > > > >> } >> > > > > >> ``` >> > > > > >> >> > > > > >> >> > > > > >> > Guozhang >> > > > > >> > >> > > > > >> > On Fri, Apr 1, 2016 at 1:20 AM, Yuto KAWAMURA < >> > > > > >> kawamuray.dad...@gmail.com> >> > > > > >> > wrote: >> > > > > >> > >> > > > > >> >> When I tried to implement a task which does kinda >> dispatching >> > to >> > > > > >> >> downstream processors or sinks, looks like relying on >> > > > > >> >> context.forward(K, V, int childIndex) is the only way now. >> > > > > >> >> I have a question why this method implemented using >> > > > childIndex(which >> > > > > >> >> is just an index of children "List" that based on order of >> > > > > >> >> builder.addProcessor() call) instead of child name(first >> > argument >> > > > to >> > > > > >> >> add{Processor,Sink}). >> > > > > >> >> I wanna ask what is the concrete use case of forward(K, V, >> int >> > > > > >> >> childIndex) and is it makes sense to introduce another >> > overload: >> > > > > >> >> forward(K, V, String childName) for much handy use. >> > > > > >> >> Currently I have a use-case like this in my mind: >> > > > > >> >> ``` >> > > > > >> >> builder.addProcessor("DispatchProcess", new >> > > > > >> >> DispatchProcessorSupplier(), "Source"); >> > > > > >> >> builder.addProcessor("Process-A", new ProcessorASupplier(), >> > > > > >> >> "DispatchProcess"); >> > > > > >> >> builder.addProcessor("Process-B", new ProcessorBSupplier(), >> > > > > >> >> "DispatchProcess"); >> > > > > >> >> >> > > > > >> >> // in process(key, value) >> > > > > >> >> if ("key-for-A".equals(key)) { >> > > > > >> >> context.forward(key, value, "Process-A"); >> > > > > >> >> } else if ("key-for-B".equals(key)) { >> > > > > >> >> context.forward(key, value, "Process-B"); >> > > > > >> >> } >> > > > > >> >> ``` >> > > > > >> >> >> > > > > >> > >> > > > > >> > >> > > > > >> > >> > > > > >> > -- >> > > > > >> > -- Guozhang >> > > > > >> >> > > > > > >> > > > > > >> > > > > > >> > > > > > -- >> > > > > > -- Guozhang >> > > > > >> > > > >> > > > >> > > > >> > > > -- >> > > > -- Guozhang >> > > > >> > > >> > >> > >> > >> > -- >> > -- Guozhang >> > >> > > > > -- > -- Guozhang > -- -- Guozhang