Hi Yuto, Is the destination topic embedded as part of the value in the original "foo" topic? If yes could you just access that field directly instead of mapping to a (key, value, destination) triplet?
Guozhang On Sun, Apr 3, 2016 at 9:29 AM, Yuto KAWAMURA <kawamuray.dad...@gmail.com> wrote: > Hi Guozhang, > > > > 2016-04-02 3:29 GMT+09:00 Guozhang Wang <wangg...@gmail.com>: > > Hi Yuto, > > > > That is a good suggestion, the child index is not very intuitive from > > programmer's view and we can even consider replacing it with the > processor > > name instead of overloading it. Could you file a JIRA? > > > > Yep :) https://issues.apache.org/jira/browse/KAFKA-3497 > > > Also I am wondering if you have looked at the higher-level Streams DSL, > and > > if yes could let me know what are the limitations from using that APIs in > > your case? > > > > Well, I read though high-level DSL interface but couldn't find an easy > way to handle output from Processors which could issue multiple > messages to arbitrary different destinations. > Maybe it could be done by doing something like below but it doesn't > look good. Please let me know if you have any idea to do this in > easier way. > > ```java > class KeyValueWithDestination { > K key; > V value; > String destination; > } > > class DestinationPredicate implements Predicate<K, > KeyValueWithDestination> { > String destination; > @Override > public boolean test(K key, KeyValueWithDestination value) { > return value.destination.equals(destination); > } > } > > String[] destTopics = {"topicA", "topicB", "topicC"}; > > Predicate<K, KeyValueWithDestination>[] predicates = > Arrays.stream(destTopics).map(DestinationPredicate::new) > .toArray(Predicate<K, > KeyValueWithDestination>::new); > > branches = builder.stream("foo") > .map((key, value) -> processor.process(key, value) > /* => KeyValueWithDestination */) > .branch(predicates); > > for (int i = 0; i < branches.length; i++) { > branches[i].to(destTopics[i]); > } > ``` > > > > Guozhang > > > > On Fri, Apr 1, 2016 at 1:20 AM, Yuto KAWAMURA < > kawamuray.dad...@gmail.com> > > wrote: > > > >> When I tried to implement a task which does kinda dispatching to > >> downstream processors or sinks, looks like relying on > >> context.forward(K, V, int childIndex) is the only way now. > >> I have a question why this method implemented using childIndex(which > >> is just an index of children "List" that based on order of > >> builder.addProcessor() call) instead of child name(first argument to > >> add{Processor,Sink}). > >> I wanna ask what is the concrete use case of forward(K, V, int > >> childIndex) and is it makes sense to introduce another overload: > >> forward(K, V, String childName) for much handy use. > >> Currently I have a use-case like this in my mind: > >> ``` > >> builder.addProcessor("DispatchProcess", new > >> DispatchProcessorSupplier(), "Source"); > >> builder.addProcessor("Process-A", new ProcessorASupplier(), > >> "DispatchProcess"); > >> builder.addProcessor("Process-B", new ProcessorBSupplier(), > >> "DispatchProcess"); > >> > >> // in process(key, value) > >> if ("key-for-A".equals(key)) { > >> context.forward(key, value, "Process-A"); > >> } else if ("key-for-B".equals(key)) { > >> context.forward(key, value, "Process-B"); > >> } > >> ``` > >> > > > > > > > > -- > > -- Guozhang > -- -- Guozhang