Thanks, Yuto, for the code snippet. Since you need to access a custom
external store for metadata, that logic indeed cannot be wrapped in any
built-in operator of the Streams DSL yet, and the code example in your
previous email is close to the best you can do with the high-level
DSL for now.

One minor improvement to your code above, though: instead of calling
map(... -> process()) you can call transform(), which still lets you
provide a customized transformer function but also gives you strong
typing, assuming all three kinds of records have the same key / value
types.
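
To make the routing idea concrete without pulling in the Streams library,
here is a minimal plain-Java sketch of "tag each record with a destination
in a typed transformer, then split by destination". It is only a model of
the pattern, not the Streams API: TagAndBranchSketch, Routed,
TaggingTransformer, route() and demo() are all hypothetical names, and the
lambda stands in for your external metadata lookup.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TagAndBranchSketch {

    // Hypothetical stand-in for KeyValueWithDestination: a record tagged with its target topic.
    static final class Routed<K, V> {
        final K key;
        final V value;
        final String destination;

        Routed(K key, V value, String destination) {
            this.key = key;
            this.value = value;
            this.destination = destination;
        }
    }

    // Hypothetical typed transformer: input and output share the same key / value types.
    interface TaggingTransformer<K, V> {
        Routed<K, V> transform(K key, V value);
    }

    // Tags every record, then groups the values per destination (the branch + to step).
    static <K, V> Map<String, List<V>> route(Map<K, V> input,
                                             TaggingTransformer<K, V> transformer,
                                             String... destinations) {
        Map<String, List<V>> branches = new LinkedHashMap<>();
        for (String destination : destinations) {
            branches.put(destination, new ArrayList<>());
        }
        for (Map.Entry<K, V> entry : input.entrySet()) {
            Routed<K, V> routed = transformer.transform(entry.getKey(), entry.getValue());
            List<V> branch = branches.get(routed.destination);
            if (branch != null) { // records tagged with an unknown destination are dropped
                branch.add(routed.value);
            }
        }
        return branches;
    }

    // Small demo: "corrupted" values go to topicB, everything else to topicA.
    static Map<String, List<String>> demo() {
        Map<String, String> input = new LinkedHashMap<>();
        input.put("k1", "ok");
        input.put("k2", "corrupted");
        input.put("k3", "ok");
        return route(input,
                (key, value) -> new Routed<>(key, value,
                        value.equals("corrupted") ? "topicB" : "topicA"),
                "topicA", "topicB", "topicC");
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the transformer interface is generic in K and V, the compiler checks
the record types end to end, which is the typing benefit I meant above.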

Guozhang




On Sun, Apr 3, 2016 at 10:48 PM, Yuto KAWAMURA <kawamuray.dad...@gmail.com>
wrote:

> 2016-04-04 7:20 GMT+09:00 Guozhang Wang <wangg...@gmail.com>:
> > Hi Yuto,
> >
> > Is the destination topic embedded as part of the value in the original
> > "foo" topic? If yes could you just access that field directly instead of
> > mapping to a (key, value, destination) triplet?
> >
>
> Nope. KeyValueWithDestination is just an example of output from the
> first Processor and is not included in actual messages that the topic
> foo received.
> Let me explain a slightly more realistic use case. How can we write a
> Processor like the one below cleanly in the high-level DSL?
>
> ```java
> public class EventProcessor implements Processor<String, Event> {
> ...
>   @Override
>   public void process(String key, Event value) {
>       EventMetadata meta = getEventMetadataFromExternalStorage(value.getId());
>
>       if (isFieldACorrupted(meta, value.getFieldA())) {
>           // This event is corrupted! Let's evacuate it to the
>           // grave topic for further investigation.
>           context.forward(key, value, "CorruptedEventSink");
>       }
>       if (isFieldBCorrupted(meta, value.getFieldB())) {
>           // Another case of corruption, but maybe recoverable.
>           context.forward(key, value, "CorruptedEventRecoveryProcessor");
>       }
>
>       for (Foo foo : value.getFoos()) {
>           context.forward(key, buildMessage(meta, foo), "FooProcessor");
>       }
>   }
> ...
> }
> ```
>
>
> > Guozhang
> >
> > On Sun, Apr 3, 2016 at 9:29 AM, Yuto KAWAMURA <kawamuray.dad...@gmail.com>
> > wrote:
> >
> >> Hi Guozhang,
> >>
> >>
> >>
> >> 2016-04-02 3:29 GMT+09:00 Guozhang Wang <wangg...@gmail.com>:
> >> > Hi Yuto,
> >> >
> >> > That is a good suggestion. The child index is not very intuitive from
> >> > a programmer's point of view, and we can even consider replacing it
> >> > with the processor name instead of overloading it. Could you file a
> >> > JIRA?
> >> >
> >>
> >> Yep :) https://issues.apache.org/jira/browse/KAFKA-3497
> >>
> >> > Also, I am wondering if you have looked at the higher-level Streams
> >> > DSL, and if so, could you let me know what the limitations of using
> >> > those APIs are in your case?
> >> >
> >>
> >> Well, I read through the high-level DSL interface but couldn't find an
> >> easy way to handle output from Processors that may emit multiple
> >> messages to arbitrary, different destinations.
> >> Maybe it could be done with something like the code below, but it
> >> doesn't look good. Please let me know if you have any idea how to do
> >> this in an easier way.
> >>
> >> ```java
> >> class KeyValueWithDestination {
> >>     K key;
> >>     V value;
> >>     String destination;
> >> }
> >>
> >> class DestinationPredicate implements Predicate<K, KeyValueWithDestination> {
> >>     final String destination;
> >>     // Constructor needed by DestinationPredicate::new below.
> >>     DestinationPredicate(String destination) {
> >>         this.destination = destination;
> >>     }
> >>     @Override
> >>     public boolean test(K key, KeyValueWithDestination value) {
> >>         return value.destination.equals(destination);
> >>     }
> >> }
> >>
> >> String[] destTopics = {"topicA", "topicB", "topicC"};
> >>
> >> // Generic array creation is not allowed, so build a raw array instead.
> >> @SuppressWarnings("unchecked")
> >> Predicate<K, KeyValueWithDestination>[] predicates =
> >>         Arrays.stream(destTopics).map(DestinationPredicate::new)
> >>                                  .toArray(Predicate[]::new);
> >>
> >> branches = builder.stream("foo")
> >>                   .map((key, value) -> processor.process(key, value) /* => KeyValueWithDestination */)
> >>                   .branch(predicates);
> >>
> >> for (int i = 0; i < branches.length; i++) {
> >>     branches[i].to(destTopics[i]);
> >> }
> >> ```
> >>
> >>
> >> > Guozhang
> >> >
> >> > On Fri, Apr 1, 2016 at 1:20 AM, Yuto KAWAMURA <kawamuray.dad...@gmail.com>
> >> > wrote:
> >> >
> >> >> When I tried to implement a task that dispatches records to
> >> >> downstream processors or sinks, it looked like relying on
> >> >> context.forward(K, V, int childIndex) is the only way for now.
> >> >> Why is this method implemented using childIndex (which is just an
> >> >> index into the children "List", based on the order of
> >> >> builder.addProcessor() calls) instead of the child name (the first
> >> >> argument to add{Processor,Sink})?
> >> >> What is the concrete use case of forward(K, V, int childIndex), and
> >> >> does it make sense to introduce another overload,
> >> >> forward(K, V, String childName), for handier use?
> >> >> I currently have a use case like this in mind:
> >> >> ```
> >> >> builder.addProcessor("DispatchProcess", new DispatchProcessorSupplier(), "Source");
> >> >> builder.addProcessor("Process-A", new ProcessorASupplier(), "DispatchProcess");
> >> >> builder.addProcessor("Process-B", new ProcessorBSupplier(), "DispatchProcess");
> >> >>
> >> >> // in process(key, value)
> >> >> if ("key-for-A".equals(key)) {
> >> >>     context.forward(key, value, "Process-A");
> >> >> } else if ("key-for-B".equals(key)) {
> >> >>     context.forward(key, value, "Process-B");
> >> >> }
> >> >> ```
> >> >>
> >> >
> >> >
> >> >
> >> > --
> >> > -- Guozhang
> >>
> >
> >
> >
> > --
> > -- Guozhang
>



-- 
-- Guozhang
