Hi Guozhang,
2016-04-02 3:29 GMT+09:00 Guozhang Wang <wangg...@gmail.com>: > Hi Yuto, > > That is a good suggestion, the child index is not very intuitive from > programmer's view and we can even consider replacing it with the processor > name instead of overloading it. Could you file a JIRA? > Yep :) https://issues.apache.org/jira/browse/KAFKA-3497 > Also I am wondering if you have looked at the higher-level Streams DSL, and > if yes could let me know what are the limitations from using that APIs in > your case? > Well, I read though high-level DSL interface but couldn't find an easy way to handle output from Processors which could issue multiple messages to arbitrary different destinations. Maybe it could be done by doing something like below but it doesn't look good. Please let me know if you have any idea to do this in easier way. ```java class KeyValueWithDestination { K key; V value; String destination; } class DestinationPredicate implements Predicate<K, KeyValueWithDestination> { String destination; @Override public boolean test(K key, KeyValueWithDestination value) { return value.destination.equals(destination); } } String[] destTopics = {"topicA", "topicB", "topicC"}; Predicate<K, KeyValueWithDestination>[] predicates = Arrays.stream(destTopics).map(DestinationPredicate::new) .toArray(Predicate<K, KeyValueWithDestination>::new); branches = builder.stream("foo") .map((key, value) -> processor.process(key, value) /* => KeyValueWithDestination */) .branch(predicates); for (int i = 0; i < branches.length; i++) { branches[i].to(destTopics[i]); } ``` > Guozhang > > On Fri, Apr 1, 2016 at 1:20 AM, Yuto KAWAMURA <kawamuray.dad...@gmail.com> > wrote: > >> When I tried to implement a task which does kinda dispatching to >> downstream processors or sinks, looks like relying on >> context.forward(K, V, int childIndex) is the only way now. >> I have a question why this method implemented using childIndex(which >> is just an index of children "List" that based on order of >> builder.addProcessor() call) instead of child name(first argument to >> add{Processor,Sink}). >> I wanna ask what is the concrete use case of forward(K, V, int >> childIndex) and is it makes sense to introduce another overload: >> forward(K, V, String childName) for much handy use. >> Currently I have a use-case like this in my mind: >> ``` >> builder.addProcessor("DispatchProcess", new >> DispatchProcessorSupplier(), "Source"); >> builder.addProcessor("Process-A", new ProcessorASupplier(), >> "DispatchProcess"); >> builder.addProcessor("Process-B", new ProcessorBSupplier(), >> "DispatchProcess"); >> >> // in process(key, value) >> if ("key-for-A".equals(key)) { >> context.forward(key, value, "Process-A"); >> } else if ("key-for-B".equals(key)) { >> context.forward(key, value, "Process-B"); >> } >> ``` >> > > > > -- > -- Guozhang