Hi Guozhang, I'll reply to your points in-line below:
On Tue, Apr 5, 2016 at 10:23 AM Guozhang Wang <wangg...@gmail.com> wrote: > Hi Josh, > > I think there are a few issues that we want to resolve here, which could be > orthogonal to each other. > > 1) one-to-many mapping in transform() function that generates a single > stream (i.e. single typed key-value pairs). > > Since transform() already enforces to make type-safe return values, one > thing we can do is to change the punctuate() function return value from > "null" to "R" as well. And then for one-to-many mapping one can then define > R as Array<MyType> > > stream.transform().flatMap(/* from Array<MyType> to MyType*/) > > Interesting, it hadn't thought of returning an Iterable from a Transformer to achieve the one-to-many case. Regardless, my initial reaction is that this seems natural when you're working in the declarative DSL (where we can move the flatMap to the left of the one-to-many lambda), but seems cumbersome when writing a lower-level imperative Transformer/Processor (which then requires a subsequent flatMap to explode the arrays). I think I'd prefer a more imperative, side-effecting style of emission in this case; see my reply to #2 below. > 2) having a new function that takes one stream, and generate multiple > streams with different key-value types. > > This is a good-to-have operator in the Streams DSL, and I think this is > your proposed new API in the previous email? I am not sure I > understand the "telescoping" > arity completely though, so let me know if I'm wrong. > Yes, sorry, I wrote my previous mail in a hurry this morning. By "telescoping", I meant defining processor/transformer interfaces for each supported output arity, with the corresponding output-types expressed as generics: - Processor1<K, V, K1, V1> - Processor2<K, V, K1, V1, K2, V2> - Processor3<K, V, K1, V1, K2, V2, K3, V3> (Yes, the number of generics here is getting unwieldy, but I don't immediately see a good way to avoid that while preserving type-safety. Some sort of cosmetic improvement would be nice!) Given this, the framework could inject type-safe "emitters" into the Processors for each output-stream: interface Processor2<K, V, K1, V1, K2, V2> { void init(ProcessorContext context, Forwarder<K1, V1> output1, Forwarder<K2, V2> output2); // these can use the forwarders provided to init() to emit any number of values void process(I input); void punctuate(long timestamp); // ... } interface Forwarder<K, V> { void forward(K key, V value); } ... then, in KStream<K,V>: <K1, V1, K2, V2> KStreamTuple2<K1, V1, K2, V2> process2(Processor2<K, V, K1, V1, K2, V2> processor, String... stateStores); I haven't worked through all of the details, but I'm optimistic that this could work nicely to unify the Transformer and Processor APIs, and address all of the described use-cases (up to some arbitrarily-chosen number of supported output-streams). > 3) having data-driven emission policy (this will be the building block of > session windows) as well as time-drive emission policy. > > I am thinking about how to support this as well, one thing is that we can > use the underlying process() function for data-driven emission, for > example, if there is a session-start / end flag then create the > corresponding session record in state, and only emit upon session-end flag; > and the underlying punctuate() function for time-drive emission (we > probably need to first refactor it to be triggered by record timestamp > instead of wallclock time). > Yes, I agree: data-driven emission could work just fine with process(), and delayed emission works nicely with punctuate(). I've also been meaning to mention the clear need for event-time punctuation, so I'm glad to hear that's on your radar! Watermarking will be important for session-windowing. Thoughts? -josh > > On Tue, Apr 5, 2016 at 8:24 AM, josh gruenberg <jos...@gmail.com> wrote: > > > Hi all, > > > > Just chiming in with Yuto: I think the custom Processor becomes > attractive > > in scenarios where a node in the graph may emit to a variety of > downstream > > paths, possibly after some delay, depending on logic. This can probably > > often be achieved with the existing DSL using some combination of > > predicates and intermediate representations, but this involves > contortions > > that feel cumbersome, and probably leads to less intelligible code. I'm > > also not sure the current DSL can model scenarios where the > transformation > > may be one-to-many, as in the last part of Yuto's example, or where the > > emission-delay is data-driven, as in my earlier "sessionization" example. > > > > One idea I'd offer is to provide a mechanism for wiring in Processors > with > > "telescoping" arity (eg, support Processor1<I, O1>, Processor2<I, O1, > O2>, > > etc), and providing each arity with type-safe forwarding interfaces for > > each output stream (eg, Forwarder<T>). This assigns each output-stream a > > clear ordinal, and suggests a corresponding type-safe return-type for the > > DSL (eg, KStreamTuple2<O1, O2>). > > > > I think this pattern could provide a unification of the 'Transformer' and > > 'Processor' APIs. > > This was what I had in mind for a PR we discussed earlier (for modifying > > the Transformer API), but the scope expanded beyond what I felt > comfortable > > submitting without discussion, and I had to prioritize other efforts. > > Regardless, I could get a WIP branch pushed to github later today to > > illustrate if you'd like to see it. > > > > HTH, > > -josh > > > > On Mon, Apr 4, 2016, 9:14 PM Guozhang Wang <wangg...@gmail.com> wrote: > > > > > Thanks Yuto for your code snippet. Since you need to access a > customized > > > external storage for metadata, that indeed cannot be wrapped in any > > > built-in operators in the Streams DSL yet, and your code example in the > > > previous email would be close to the best you can do with the > high-level > > > DSL now. > > > > > > One minor improvement from your above code, though, is that instead of > > > calling map(... -> process()) you can actually call transform(), which > > > still allows you to provide a customized transformer function, but it > > still > > > gives you strong typing assuming all these three kinds of records are > of > > > the same key / value types. > > > > > > Guozhang > > > > > > > > > > > > > > > On Sun, Apr 3, 2016 at 10:48 PM, Yuto KAWAMURA < > > kawamuray.dad...@gmail.com > > > > > > > wrote: > > > > > > > 2016-04-04 7:20 GMT+09:00 Guozhang Wang <wangg...@gmail.com>: > > > > > Hi Yuto, > > > > > > > > > > Is the destination topic embedded as part of the value in the > > original > > > > > "foo" topic? If yes could you just access that field directly > instead > > > of > > > > > mapping to a (key, value, destination) triplet? > > > > > > > > > > > > > Nope. KeyValueWithDestination is just an example of output from the > > > > first Processor and is not included in actual messages that the topic > > > > foo received. > > > > Let me explain bit more realistic use-case. How can we write a > > > > Processor like below in High-level DSL cleanly? > > > > > > > > ```java > > > > public class EventProcessor implements Processor<String, Event> { > > > > ... > > > > @Override > > > > public void process(String key, Event value) { > > > > EventMetadata meta = > > > > getEventMetadataFromExternalStorage(value.getId()); > > > > > > > > if (isFieldACorrupted(meta, value.getFieldA())) { > > > > // This event is corrupted! let's evacuate it once to the > > > > grave topic for further investigation. > > > > context.forward(key, value, "CorruptedEventSink"); > > > > } > > > > if (isFieldBCorrupted(meta, value.getFieldB())) { > > > > // Antoher case of corruption, but maybe recoverable. > > > > context.forward(key, value, > > "CorruptedEventRecoveryProcessor"); > > > > } > > > > > > > > for (Foo foo : event.getFoos()) { > > > > context.forward(key, buildMessage(meta, foo), > > "FooProcessor"); > > > > } > > > > } > > > > ... > > > > } > > > > ``` > > > > > > > > > > > > > Guozhang > > > > > > > > > > On Sun, Apr 3, 2016 at 9:29 AM, Yuto KAWAMURA < > > > > kawamuray.dad...@gmail.com> > > > > > wrote: > > > > > > > > > >> Hi Guozhang, > > > > >> > > > > >> > > > > >> > > > > >> 2016-04-02 3:29 GMT+09:00 Guozhang Wang <wangg...@gmail.com>: > > > > >> > Hi Yuto, > > > > >> > > > > > >> > That is a good suggestion, the child index is not very intuitive > > > from > > > > >> > programmer's view and we can even consider replacing it with the > > > > >> processor > > > > >> > name instead of overloading it. Could you file a JIRA? > > > > >> > > > > > >> > > > > >> Yep :) https://issues.apache.org/jira/browse/KAFKA-3497 > > > > >> > > > > >> > Also I am wondering if you have looked at the higher-level > Streams > > > > DSL, > > > > >> and > > > > >> > if yes could let me know what are the limitations from using > that > > > > APIs in > > > > >> > your case? > > > > >> > > > > > >> > > > > >> Well, I read though high-level DSL interface but couldn't find an > > easy > > > > >> way to handle output from Processors which could issue multiple > > > > >> messages to arbitrary different destinations. > > > > >> Maybe it could be done by doing something like below but it > doesn't > > > > >> look good. Please let me know if you have any idea to do this in > > > > >> easier way. > > > > >> > > > > >> ```java > > > > >> class KeyValueWithDestination { > > > > >> K key; > > > > >> V value; > > > > >> String destination; > > > > >> } > > > > >> > > > > >> class DestinationPredicate implements Predicate<K, > > > > >> KeyValueWithDestination> { > > > > >> String destination; > > > > >> @Override > > > > >> public boolean test(K key, KeyValueWithDestination value) { > > > > >> return value.destination.equals(destination); > > > > >> } > > > > >> } > > > > >> > > > > >> String[] destTopics = {"topicA", "topicB", "topicC"}; > > > > >> > > > > >> Predicate<K, KeyValueWithDestination>[] predicates = > > > > >> Arrays.stream(destTopics).map(DestinationPredicate::new) > > > > >> .toArray(Predicate<K, > > > > >> KeyValueWithDestination>::new); > > > > >> > > > > >> branches = builder.stream("foo") > > > > >> .map((key, value) -> processor.process(key, > value) > > > > >> /* => KeyValueWithDestination */) > > > > >> .branch(predicates); > > > > >> > > > > >> for (int i = 0; i < branches.length; i++) { > > > > >> branches[i].to(destTopics[i]); > > > > >> } > > > > >> ``` > > > > >> > > > > >> > > > > >> > Guozhang > > > > >> > > > > > >> > On Fri, Apr 1, 2016 at 1:20 AM, Yuto KAWAMURA < > > > > >> kawamuray.dad...@gmail.com> > > > > >> > wrote: > > > > >> > > > > > >> >> When I tried to implement a task which does kinda dispatching > to > > > > >> >> downstream processors or sinks, looks like relying on > > > > >> >> context.forward(K, V, int childIndex) is the only way now. > > > > >> >> I have a question why this method implemented using > > > childIndex(which > > > > >> >> is just an index of children "List" that based on order of > > > > >> >> builder.addProcessor() call) instead of child name(first > argument > > > to > > > > >> >> add{Processor,Sink}). > > > > >> >> I wanna ask what is the concrete use case of forward(K, V, int > > > > >> >> childIndex) and is it makes sense to introduce another > overload: > > > > >> >> forward(K, V, String childName) for much handy use. > > > > >> >> Currently I have a use-case like this in my mind: > > > > >> >> ``` > > > > >> >> builder.addProcessor("DispatchProcess", new > > > > >> >> DispatchProcessorSupplier(), "Source"); > > > > >> >> builder.addProcessor("Process-A", new ProcessorASupplier(), > > > > >> >> "DispatchProcess"); > > > > >> >> builder.addProcessor("Process-B", new ProcessorBSupplier(), > > > > >> >> "DispatchProcess"); > > > > >> >> > > > > >> >> // in process(key, value) > > > > >> >> if ("key-for-A".equals(key)) { > > > > >> >> context.forward(key, value, "Process-A"); > > > > >> >> } else if ("key-for-B".equals(key)) { > > > > >> >> context.forward(key, value, "Process-B"); > > > > >> >> } > > > > >> >> ``` > > > > >> >> > > > > >> > > > > > >> > > > > > >> > > > > > >> > -- > > > > >> > -- Guozhang > > > > >> > > > > > > > > > > > > > > > > > > > > -- > > > > > -- Guozhang > > > > > > > > > > > > > > > > -- > > > -- Guozhang > > > > > > > > > -- > -- Guozhang >