Thanks, Yuto, for your code snippet. Since you need to access a custom external storage for metadata, that logic indeed cannot be wrapped in any of the built-in operators of the Streams DSL yet, and the code example in your previous email is close to the best you can do with the high-level DSL right now.
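To make the shape of that approach concrete, here is a minimal plain-Java model of it with no Kafka dependency, so it runs on its own. Everything in it (`Routed`, `PROCESS`, `branch()`, `demo()`, the sample routing rules) is hypothetical and only mirrors the Streams pattern: the processor tags each output record with a destination, and `branch()` imitates `KStream#branch()`, which assigns each record to the stream of the first predicate that matches it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Predicate;

public class DestinationRouting {

    // Hypothetical stand-in for KeyValueWithDestination: a record tagged with its target.
    static final class Routed<K, V> {
        final K key;
        final V value;
        final String destination;

        Routed(K key, V value, String destination) {
            this.key = key;
            this.value = value;
            this.destination = destination;
        }
    }

    static final String[] DEST_TOPICS = {"topicA", "topicB", "topicC"};

    // Hypothetical processor: one input may fan out to several destinations,
    // so it returns a list (flatMap-style) instead of a single record.
    static final BiFunction<String, String, List<Routed<String, String>>> PROCESS = (k, v) -> {
        List<Routed<String, String>> out = new ArrayList<>();
        if (v.contains("a")) {
            out.add(new Routed<>(k, v, "topicA"));
        }
        if (v.contains("b")) {
            out.add(new Routed<>(k, v, "topicB"));
        }
        out.add(new Routed<>(k, v.toUpperCase(), "topicC"));
        return out;
    };

    // Models the first-match semantics of branch(): each record goes to the branch of
    // the first predicate it satisfies; records matching no predicate are dropped.
    static <K, V> List<List<Routed<K, V>>> branch(List<Routed<K, V>> records,
                                                  List<Predicate<Routed<K, V>>> predicates) {
        List<List<Routed<K, V>>> branches = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) {
            branches.add(new ArrayList<>());
        }
        for (Routed<K, V> r : records) {
            for (int i = 0; i < predicates.size(); i++) {
                if (predicates.get(i).test(r)) {
                    branches.get(i).add(r);
                    break;
                }
            }
        }
        return branches;
    }

    // Tags two sample inputs, branches them, and returns "key=value" strings per topic.
    static List<List<String>> demo() {
        // One predicate per destination, like DestinationPredicate in the thread.
        List<Predicate<Routed<String, String>>> predicates = new ArrayList<>();
        for (String dest : DEST_TOPICS) {
            predicates.add(r -> r.destination.equals(dest));
        }

        List<Routed<String, String>> tagged = new ArrayList<>();
        tagged.addAll(PROCESS.apply("k1", "ab"));
        tagged.addAll(PROCESS.apply("k2", "b"));

        List<List<String>> result = new ArrayList<>();
        for (List<Routed<String, String>> b : branch(tagged, predicates)) {
            List<String> lines = new ArrayList<>();
            for (Routed<String, String> r : b) {
                lines.add(r.key + "=" + r.value);
            }
            result.add(lines);
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<String>> out = demo();
        for (int i = 0; i < DEST_TOPICS.length; i++) {
            // Stands in for branches[i].to(destTopics[i]).
            System.out.println(DEST_TOPICS[i] + " <- " + out.get(i));
        }
    }
}
```

Running main() prints `topicA <- [k1=ab]`, `topicB <- [k1=ab, k2=b]` and `topicC <- [k1=AB, k2=B]`. Because branching is first-match, a single tagged record reaches exactly one destination; sending the same event to two sinks means emitting two tagged copies, as the sample processor does for `k1`.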
One minor improvement to your code above, though: instead of calling map(... -> process()), you can call transform(), which still lets you provide a custom transformer function while also giving you strong typing, assuming all three kinds of records share the same key / value types.

Guozhang

On Sun, Apr 3, 2016 at 10:48 PM, Yuto KAWAMURA <kawamuray.dad...@gmail.com> wrote:

> 2016-04-04 7:20 GMT+09:00 Guozhang Wang <wangg...@gmail.com>:
> > Hi Yuto,
> >
> > Is the destination topic embedded as part of the value in the original
> > "foo" topic? If yes could you just access that field directly instead of
> > mapping to a (key, value, destination) triplet?
> >
>
> Nope. KeyValueWithDestination is just an example of output from the
> first Processor and is not included in actual messages that the topic
> foo received.
> Let me explain bit more realistic use-case. How can we write a
> Processor like below in High-level DSL cleanly?
>
> ```java
> public class EventProcessor implements Processor<String, Event> {
>     ...
>     @Override
>     public void process(String key, Event value) {
>         EventMetadata meta = getEventMetadataFromExternalStorage(value.getId());
>
>         if (isFieldACorrupted(meta, value.getFieldA())) {
>             // This event is corrupted! let's evacuate it once to the grave topic for further investigation.
>             context.forward(key, value, "CorruptedEventSink");
>         }
>         if (isFieldBCorrupted(meta, value.getFieldB())) {
>             // Antoher case of corruption, but maybe recoverable.
>             context.forward(key, value, "CorruptedEventRecoveryProcessor");
>         }
>
>         for (Foo foo : event.getFoos()) {
>             context.forward(key, buildMessage(meta, foo), "FooProcessor");
>         }
>     }
>     ...
> }
> ```
>
> > Guozhang
> >
> > On Sun, Apr 3, 2016 at 9:29 AM, Yuto KAWAMURA <kawamuray.dad...@gmail.com>
> > wrote:
> >
> >> Hi Guozhang,
> >>
> >>
> >>
> >> 2016-04-02 3:29 GMT+09:00 Guozhang Wang <wangg...@gmail.com>:
> >> > Hi Yuto,
> >> >
> >> > That is a good suggestion, the child index is not very intuitive from
> >> > programmer's view and we can even consider replacing it with the processor
> >> > name instead of overloading it. Could you file a JIRA?
> >> >
> >>
> >> Yep :) https://issues.apache.org/jira/browse/KAFKA-3497
> >>
> >> > Also I am wondering if you have looked at the higher-level Streams DSL, and
> >> > if yes could let me know what are the limitations from using that APIs in
> >> > your case?
> >> >
> >>
> >> Well, I read though high-level DSL interface but couldn't find an easy
> >> way to handle output from Processors which could issue multiple
> >> messages to arbitrary different destinations.
> >> Maybe it could be done by doing something like below but it doesn't
> >> look good. Please let me know if you have any idea to do this in
> >> easier way.
> >>
> >> ```java
> >> class KeyValueWithDestination {
> >>     K key;
> >>     V value;
> >>     String destination;
> >> }
> >>
> >> class DestinationPredicate implements Predicate<K, KeyValueWithDestination> {
> >>     String destination;
> >>     @Override
> >>     public boolean test(K key, KeyValueWithDestination value) {
> >>         return value.destination.equals(destination);
> >>     }
> >> }
> >>
> >> String[] destTopics = {"topicA", "topicB", "topicC"};
> >>
> >> Predicate<K, KeyValueWithDestination>[] predicates =
> >>     Arrays.stream(destTopics).map(DestinationPredicate::new)
> >>         .toArray(Predicate<K, KeyValueWithDestination>::new);
> >>
> >> branches = builder.stream("foo")
> >>     .map((key, value) -> processor.process(key, value) /* => KeyValueWithDestination */)
> >>     .branch(predicates);
> >>
> >> for (int i = 0; i < branches.length; i++) {
> >>     branches[i].to(destTopics[i]);
> >> }
> >> ```
> >>
> >>
> >> > Guozhang
> >> >
> >> > On Fri, Apr 1, 2016 at 1:20 AM, Yuto KAWAMURA <kawamuray.dad...@gmail.com>
> >> > wrote:
> >> >
> >> >> When I tried to implement a task which does kinda dispatching to
> >> >> downstream processors or sinks, looks like relying on
> >> >> context.forward(K, V, int childIndex) is the only way now.
> >> >> I have a question why this method implemented using childIndex(which
> >> >> is just an index of children "List" that based on order of
> >> >> builder.addProcessor() call) instead of child name(first argument to
> >> >> add{Processor,Sink}).
> >> >> I wanna ask what is the concrete use case of forward(K, V, int
> >> >> childIndex) and is it makes sense to introduce another overload:
> >> >> forward(K, V, String childName) for much handy use.
> >> >> Currently I have a use-case like this in my mind:
> >> >> ```
> >> >> builder.addProcessor("DispatchProcess", new DispatchProcessorSupplier(), "Source");
> >> >> builder.addProcessor("Process-A", new ProcessorASupplier(), "DispatchProcess");
> >> >> builder.addProcessor("Process-B", new ProcessorBSupplier(), "DispatchProcess");
> >> >>
> >> >> // in process(key, value)
> >> >> if ("key-for-A".equals(key)) {
> >> >>     context.forward(key, value, "Process-A");
> >> >> } else if ("key-for-B".equals(key)) {
> >> >>     context.forward(key, value, "Process-B");
> >> >> }
> >> >> ```
> >> >>
> >> >
> >> >
> >> >
> >> > --
> >> > -- Guozhang
> >>
> >
> >
> > --
> > -- Guozhang
>

--
-- Guozhang