2016-04-04 7:20 GMT+09:00 Guozhang Wang <wangg...@gmail.com>:
> Hi Yuto,
>
> Is the destination topic embedded as part of the value in the original
> "foo" topic? If yes could you just access that field directly instead of
> mapping to a (key, value, destination) triplet?
>

No. KeyValueWithDestination is just an example of the output of the first
Processor; it is not part of the actual messages that the topic foo receives.
Let me explain with a slightly more realistic use case.
How can we write a Processor like the one below cleanly in the high-level DSL?

```java
public class EventProcessor implements Processor<String, Event> {
    ...
    @Override
    public void process(String key, Event value) {
        EventMetadata meta = getEventMetadataFromExternalStorage(value.getId());

        if (isFieldACorrupted(meta, value.getFieldA())) {
            // This event is corrupted! Let's evacuate it to the grave topic
            // for further investigation.
            context.forward(key, value, "CorruptedEventSink");
        }
        if (isFieldBCorrupted(meta, value.getFieldB())) {
            // Another case of corruption, but maybe recoverable.
            context.forward(key, value, "CorruptedEventRecoveryProcessor");
        }

        // Fan out one message per Foo contained in the event.
        for (Foo foo : value.getFoos()) {
            context.forward(key, buildMessage(meta, foo), "FooProcessor");
        }
    }
    ...
}
```

> Guozhang
>
> On Sun, Apr 3, 2016 at 9:29 AM, Yuto KAWAMURA <kawamuray.dad...@gmail.com>
> wrote:
>
>> Hi Guozhang,
>>
>>
>>
>> 2016-04-02 3:29 GMT+09:00 Guozhang Wang <wangg...@gmail.com>:
>> > Hi Yuto,
>> >
>> > That is a good suggestion, the child index is not very intuitive from
>> > programmer's view and we can even consider replacing it with the processor
>> > name instead of overloading it. Could you file a JIRA?
>> >
>>
>> Yep :) https://issues.apache.org/jira/browse/KAFKA-3497
>>
>> > Also I am wondering if you have looked at the higher-level Streams DSL, and
>> > if yes could let me know what are the limitations from using that APIs in
>> > your case?
>> >
>>
>> Well, I read through high-level DSL interface but couldn't find an easy
>> way to handle output from Processors which could issue multiple
>> messages to arbitrary different destinations.
>> Maybe it could be done by doing something like below but it doesn't
>> look good. Please let me know if you have any idea to do this in
>> easier way.
>>
>> ```java
>> class KeyValueWithDestination {
>>     K key;
>>     V value;
>>     String destination;
>> }
>>
>> class DestinationPredicate implements Predicate<K, KeyValueWithDestination> {
>>     String destination;
>>     @Override
>>     public boolean test(K key, KeyValueWithDestination value) {
>>         return value.destination.equals(destination);
>>     }
>> }
>>
>> String[] destTopics = {"topicA", "topicB", "topicC"};
>>
>> Predicate<K, KeyValueWithDestination>[] predicates =
>>     Arrays.stream(destTopics).map(DestinationPredicate::new)
>>           .toArray(Predicate<K, KeyValueWithDestination>::new);
>>
>> branches = builder.stream("foo")
>>     .map((key, value) -> processor.process(key, value)
>>          /* => KeyValueWithDestination */)
>>     .branch(predicates);
>>
>> for (int i = 0; i < branches.length; i++) {
>>     branches[i].to(destTopics[i]);
>> }
>> ```
>>
>>
>> > Guozhang
>> >
>> > On Fri, Apr 1, 2016 at 1:20 AM, Yuto KAWAMURA <kawamuray.dad...@gmail.com>
>> > wrote:
>> >
>> >> When I tried to implement a task which does kinda dispatching to
>> >> downstream processors or sinks, looks like relying on
>> >> context.forward(K, V, int childIndex) is the only way now.
>> >> I have a question why this method implemented using childIndex(which
>> >> is just an index of children "List" that based on order of
>> >> builder.addProcessor() call) instead of child name(first argument to
>> >> add{Processor,Sink}).
>> >> I wanna ask what is the concrete use case of forward(K, V, int
>> >> childIndex) and is it makes sense to introduce another overload:
>> >> forward(K, V, String childName) for much handy use.
>> >> Currently I have a use-case like this in my mind:
>> >> ```
>> >> builder.addProcessor("DispatchProcess", new DispatchProcessorSupplier(), "Source");
>> >> builder.addProcessor("Process-A", new ProcessorASupplier(), "DispatchProcess");
>> >> builder.addProcessor("Process-B", new ProcessorBSupplier(), "DispatchProcess");
>> >>
>> >> // in process(key, value)
>> >> if ("key-for-A".equals(key)) {
>> >>     context.forward(key, value, "Process-A");
>> >> } else if ("key-for-B".equals(key)) {
>> >>     context.forward(key, value, "Process-B");
>> >> }
>> >> ```
>> >>
>> >
>> >
>> >
>> > --
>> > -- Guozhang
>>
>
>
>
> --
> -- Guozhang
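
To make the index-versus-name question from this thread concrete, here is a minimal standalone sketch in plain Java. It is not the Kafka Streams API: `NamedForwardSketch`, `Node`, `addChild`, and `demo` are hypothetical names invented for illustration, and children are modelled as simple callbacks. Only the idea of `forward(K, V, int childIndex)` next to a proposed `forward(K, V, String childName)` comes from the discussion above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Toy model only: this is NOT the Kafka Streams API. It imitates a parent
// node whose children are registered in order, the way successive
// builder.addProcessor() calls attach children to a parent.
final class NamedForwardSketch {

    static final class Node<K, V> {
        // LinkedHashMap keeps registration order, so an index still maps
        // to the same child that a name does.
        private final Map<String, BiConsumer<K, V>> children = new LinkedHashMap<>();

        void addChild(String name, BiConsumer<K, V> child) {
            if (children.putIfAbsent(name, child) != null) {
                throw new IllegalArgumentException("Duplicate child name: " + name);
            }
        }

        // Index-based dispatch: the caller must know the registration order.
        void forward(K key, V value, int childIndex) {
            List<BiConsumer<K, V>> ordered = new ArrayList<>(children.values());
            ordered.get(childIndex).accept(key, value);
        }

        // Name-based dispatch: the caller refers to the child by the name
        // it was registered under.
        void forward(K key, V value, String childName) {
            BiConsumer<K, V> child = children.get(childName);
            if (child == null) {
                throw new IllegalArgumentException("Unknown child: " + childName);
            }
            child.accept(key, value);
        }
    }

    // Wires up two children and records which child received which record.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Node<String, String> dispatch = new Node<>();
        dispatch.addChild("Process-A", (k, v) -> log.add("A:" + k + "=" + v));
        dispatch.addChild("Process-B", (k, v) -> log.add("B:" + k + "=" + v));

        dispatch.forward("key-for-A", "v1", "Process-A");
        dispatch.forward("key-for-B", "v2", "Process-B");
        dispatch.forward("key-for-B", "v3", 1); // index 1 is "Process-B"
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In this toy model the name-based overload keeps working if the registration order of the children changes, whereas the index-based call would silently reach a different child. That fragility is the reason given in the thread for calling the child index unintuitive.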