Hi Josh,

I think there are a few issues that we want to resolve here, which could be
orthogonal to each other.

1) One-to-many mapping in the transform() function that generates a single
stream (i.e. key-value pairs of a single type). Since transform() already
enforces type-safe return values, one thing we can do is change the return
value of punctuate() from "null" to "R" as well. For one-to-many mapping one
can then define R as Array<MyType>:

    stream.transform().flatMap(/* from Array<MyType> to MyType */)

2) A new function that takes one stream and generates multiple streams with
different key-value types. This is a good-to-have operator in the Streams
DSL, and I think this is your proposed new API in the previous email? I am
not sure I understand the "telescoping" arity completely though, so let me
know if I'm wrong.

3) A data-driven emission policy (this will be the building block of session
windows) as well as a time-driven emission policy. I am thinking about how to
support this as well. One option is to use the underlying process() function
for data-driven emission: for example, if there is a session-start /
session-end flag, create the corresponding session record in state and only
emit upon the session-end flag. The underlying punctuate() function would
then cover time-driven emission (we probably need to first refactor it to be
triggered by record timestamp instead of wall-clock time).

Guozhang

On Tue, Apr 5, 2016 at 8:24 AM, josh gruenberg <jos...@gmail.com> wrote:

> Hi all,
>
> Just chiming in with Yuto: I think the custom Processor becomes attractive
> in scenarios where a node in the graph may emit to a variety of downstream
> paths, possibly after some delay, depending on logic. This can probably
> often be achieved with the existing DSL using some combination of
> predicates and intermediate representations, but this involves contortions
> that feel cumbersome, and probably leads to less intelligible code.
> I'm also not sure the current DSL can model scenarios where the
> transformation may be one-to-many, as in the last part of Yuto's example,
> or where the emission-delay is data-driven, as in my earlier
> "sessionization" example.
>
> One idea I'd offer is to provide a mechanism for wiring in Processors with
> "telescoping" arity (eg, support Processor1<I, O1>, Processor2<I, O1, O2>,
> etc), and providing each arity with type-safe forwarding interfaces for
> each output stream (eg, Forwarder<T>). This assigns each output-stream a
> clear ordinal, and suggests a corresponding type-safe return-type for the
> DSL (eg, KStreamTuple2<O1, O2>).
>
> I think this pattern could provide a unification of the 'Transformer' and
> 'Processor' APIs.
> This was what I had in mind for a PR we discussed earlier (for modifying
> the Transformer API), but the scope expanded beyond what I felt comfortable
> submitting without discussion, and I had to prioritize other efforts.
> Regardless, I could get a WIP branch pushed to github later today to
> illustrate if you'd like to see it.
>
> HTH,
> -josh
>
> On Mon, Apr 4, 2016, 9:14 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Thanks Yuto for your code snippet. Since you need to access a customized
> > external storage for metadata, that indeed cannot be wrapped in any
> > built-in operators in the Streams DSL yet, and your code example in the
> > previous email would be close to the best you can do with the high-level
> > DSL now.
> >
> > One minor improvement from your above code, though, is that instead of
> > calling map(... -> process()) you can actually call transform(), which
> > still allows you to provide a customized transformer function, but it
> > still gives you strong typing assuming all these three kinds of records
> > are of the same key / value types.
> >
> > Guozhang
> >
> > On Sun, Apr 3, 2016 at 10:48 PM, Yuto KAWAMURA
> > <kawamuray.dad...@gmail.com> wrote:
> >
> > > 2016-04-04 7:20 GMT+09:00 Guozhang Wang <wangg...@gmail.com>:
> > > > Hi Yuto,
> > > >
> > > > Is the destination topic embedded as part of the value in the
> > > > original "foo" topic? If yes could you just access that field
> > > > directly instead of mapping to a (key, value, destination) triplet?
> > > >
> > >
> > > Nope. KeyValueWithDestination is just an example of output from the
> > > first Processor and is not included in actual messages that the topic
> > > foo received.
> > > Let me explain a more realistic use-case. How can we write a
> > > Processor like below in the high-level DSL cleanly?
> > >
> > > ```java
> > > public class EventProcessor implements Processor<String, Event> {
> > >     ...
> > >     @Override
> > >     public void process(String key, Event value) {
> > >         EventMetadata meta =
> > >             getEventMetadataFromExternalStorage(value.getId());
> > >
> > >         if (isFieldACorrupted(meta, value.getFieldA())) {
> > >             // This event is corrupted! Let's evacuate it once to the
> > >             // grave topic for further investigation.
> > >             context.forward(key, value, "CorruptedEventSink");
> > >         }
> > >         if (isFieldBCorrupted(meta, value.getFieldB())) {
> > >             // Another case of corruption, but maybe recoverable.
> > >             context.forward(key, value,
> > >                 "CorruptedEventRecoveryProcessor");
> > >         }
> > >
> > >         // One input event may fan out into several messages.
> > >         for (Foo foo : value.getFoos()) {
> > >             context.forward(key, buildMessage(meta, foo),
> > >                 "FooProcessor");
> > >         }
> > >     }
> > >     ...
> > > }
> > > ```
> > >
> > > > Guozhang
> > > >
> > > > On Sun, Apr 3, 2016 at 9:29 AM, Yuto KAWAMURA
> > > > <kawamuray.dad...@gmail.com> wrote:
> > > >
> > > >> Hi Guozhang,
> > > >>
> > > >> 2016-04-02 3:29 GMT+09:00 Guozhang Wang <wangg...@gmail.com>:
> > > >> > Hi Yuto,
> > > >> >
> > > >> > That is a good suggestion, the child index is not very intuitive
> > > >> > from programmer's view and we can even consider replacing it with
> > > >> > the processor name instead of overloading it. Could you file a
> > > >> > JIRA?
> > > >> >
> > > >>
> > > >> Yep :) https://issues.apache.org/jira/browse/KAFKA-3497
> > > >>
> > > >> > Also I am wondering if you have looked at the higher-level Streams
> > > >> > DSL, and if yes could let me know what are the limitations from
> > > >> > using those APIs in your case?
> > > >> >
> > > >>
> > > >> Well, I read through the high-level DSL interface but couldn't find
> > > >> an easy way to handle output from Processors which could issue
> > > >> multiple messages to arbitrary different destinations.
> > > >> Maybe it could be done by doing something like below but it doesn't
> > > >> look good. Please let me know if you have any idea to do this in an
> > > >> easier way.
> > > >>
> > > >> ```java
> > > >> class KeyValueWithDestination {
> > > >>     K key;
> > > >>     V value;
> > > >>     String destination;
> > > >> }
> > > >>
> > > >> class DestinationPredicate implements Predicate<K,
> > > >>         KeyValueWithDestination> {
> > > >>     String destination;
> > > >>
> > > >>     // Needed for DestinationPredicate::new below.
> > > >>     DestinationPredicate(String destination) {
> > > >>         this.destination = destination;
> > > >>     }
> > > >>
> > > >>     @Override
> > > >>     public boolean test(K key, KeyValueWithDestination value) {
> > > >>         return value.destination.equals(destination);
> > > >>     }
> > > >> }
> > > >>
> > > >> String[] destTopics = {"topicA", "topicB", "topicC"};
> > > >>
> > > >> // Generic arrays cannot be created directly, hence the raw
> > > >> // Predicate[]::new (unchecked assignment).
> > > >> Predicate<K, KeyValueWithDestination>[] predicates =
> > > >>     Arrays.stream(destTopics).map(DestinationPredicate::new)
> > > >>                              .toArray(Predicate[]::new);
> > > >>
> > > >> branches = builder.stream("foo")
> > > >>                   .map((key, value) -> processor.process(key, value)
> > > >>                        /* => KeyValueWithDestination */)
> > > >>                   .branch(predicates);
> > > >>
> > > >> for (int i = 0; i < branches.length; i++) {
> > > >>     branches[i].to(destTopics[i]);
> > > >> }
> > > >> ```
> > > >>
> > > >> > Guozhang
> > > >> >
> > > >> > On Fri, Apr 1, 2016 at 1:20 AM, Yuto KAWAMURA
> > > >> > <kawamuray.dad...@gmail.com> wrote:
> > > >> >
> > > >> >> When I tried to implement a task which does kinda dispatching to
> > > >> >> downstream processors or sinks, looks like relying on
> > > >> >> context.forward(K, V, int childIndex) is the only way now.
> > > >> >> I have a question: why is this method implemented using
> > > >> >> childIndex (which is just an index into the children "List",
> > > >> >> based on the order of builder.addProcessor() calls) instead of
> > > >> >> the child name (the first argument to add{Processor,Sink})?
> > > >> >> I wanna ask what is the concrete use case of forward(K, V, int
> > > >> >> childIndex) and does it make sense to introduce another overload:
> > > >> >> forward(K, V, String childName) for handier use.
> > > >> >> Currently I have a use-case like this in my mind:
> > > >> >> ```
> > > >> >> builder.addProcessor("DispatchProcess", new
> > > >> >>     DispatchProcessorSupplier(), "Source");
> > > >> >> builder.addProcessor("Process-A", new ProcessorASupplier(),
> > > >> >>     "DispatchProcess");
> > > >> >> builder.addProcessor("Process-B", new ProcessorBSupplier(),
> > > >> >>     "DispatchProcess");
> > > >> >>
> > > >> >> // in process(key, value)
> > > >> >> if ("key-for-A".equals(key)) {
> > > >> >>     context.forward(key, value, "Process-A");
> > > >> >> } else if ("key-for-B".equals(key)) {
> > > >> >>     context.forward(key, value, "Process-B");
> > > >> >> }
> > > >> >> ```
> > > >> >>
> > > >> >
> > > >> > --
> > > >> > -- Guozhang
> > > >> >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> >
> > --
> > -- Guozhang
> >
>
--
-- Guozhang
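
P.S. To make point 1 at the top of this message a bit more concrete, here is
a minimal plain-Java sketch of the "return a list, then flatten" idea. It
deliberately uses java.util.stream rather than the Kafka Streams API, so
OneToManySketch, splitWords() and transformThenFlatten() are made-up names
for illustration only, and the word-splitting logic is just an assumed
stand-in for whatever a real transformer would emit.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Plain-Java analogy only: none of these names are Kafka Streams APIs.
final class OneToManySketch {

    // Stand-in for a transform() step whose return type R is a list:
    // one input record yields zero or more output records of one type.
    static List<Map.Entry<String, String>> splitWords(String key, String value) {
        return Arrays.stream(value.split(" "))
                .filter(word -> !word.isEmpty())
                .<Map.Entry<String, String>>map(word -> new SimpleEntry<>(key, word))
                .collect(Collectors.toList());
    }

    // Stand-in for stream.transform(...).flatMap(...): apply the
    // list-returning step to every record, then flatten the lists back
    // into a single stream of individually typed records.
    static List<Map.Entry<String, String>> transformThenFlatten(
            List<Map.Entry<String, String>> input) {
        return input.stream()
                .map(entry -> splitWords(entry.getKey(), entry.getValue()))
                .flatMap(List::stream)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> input = Arrays.asList(
                new SimpleEntry<>("k1", "a b"),
                new SimpleEntry<>("k2", "c"));
        // Two input records become three output records.
        System.out.println(transformThenFlatten(input));
    }
}
```

In the Streams DSL the same shape would be a transform() whose R is a
collection type, followed by a flatMap() step back to single records; the
exact signatures there are an open question, not something this sketch
settles.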