Hi Yuto,

Is the destination topic embedded in the value of the original "foo"
topic? If so, could you access that field directly instead of mapping
to a (key, value, destination) triplet?
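
To make the idea concrete, here is a minimal sketch, assuming the value already carries its destination. RoutedValue, DirectFieldRouting, forTopic and route are hypothetical names, and java.util.function.BiPredicate stands in for the Streams Predicate so the snippet runs without Kafka on the classpath. The route helper only imitates first-match branching; it is not the DSL itself.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

// Hypothetical value type: the destination topic travels inside the value.
final class RoutedValue {
    final String destination;
    final String payload;

    RoutedValue(String destination, String payload) {
        this.destination = destination;
        this.payload = payload;
    }
}

final class DirectFieldRouting {
    // Predicate that reads the embedded destination directly, so no
    // separate (key, value, destination) wrapper is needed.
    static BiPredicate<String, RoutedValue> forTopic(String topic) {
        return (key, value) -> topic.equals(value.destination);
    }

    // Imitates first-match branching: each record lands in the first topic
    // whose predicate accepts it; records matching no topic are dropped.
    static Map<String, List<String>> route(
            List<Map.Entry<String, RoutedValue>> records, String... topics) {
        Map<String, List<String>> out = new LinkedHashMap<>();
        for (String topic : topics) {
            out.put(topic, new ArrayList<>());
        }
        for (Map.Entry<String, RoutedValue> record : records) {
            for (String topic : topics) {
                if (forTopic(topic).test(record.getKey(), record.getValue())) {
                    out.get(topic).add(record.getValue().payload);
                    break;
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, RoutedValue>> records = new ArrayList<>();
        records.add(new SimpleEntry<>("k1", new RoutedValue("topicA", "first")));
        records.add(new SimpleEntry<>("k2", new RoutedValue("topicB", "second")));
        System.out.println(route(records, "topicA", "topicB"));
    }
}
```

With the real DSL, the same predicates would presumably be passed to branch() right after builder.stream("foo"), skipping the intermediate map() step.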

Guozhang

On Sun, Apr 3, 2016 at 9:29 AM, Yuto KAWAMURA <kawamuray.dad...@gmail.com>
wrote:

> Hi Guozhang,
>
>
>
> 2016-04-02 3:29 GMT+09:00 Guozhang Wang <wangg...@gmail.com>:
> > Hi Yuto,
> >
> > That is a good suggestion. The child index is not very intuitive from a
> > programmer's point of view, and we could even consider replacing it with
> > the processor name instead of overloading it. Could you file a JIRA?
> >
>
> Yep :) https://issues.apache.org/jira/browse/KAFKA-3497
>
> > Also, I am wondering if you have looked at the higher-level Streams DSL,
> > and if so, could you let me know what limitations you hit when using
> > that API in your case?
> >
>
> Well, I read through the high-level DSL interface but couldn't find an
> easy way to handle output from Processors that can emit multiple
> messages to arbitrary destinations.
> Maybe it could be done with something like the code below, but it doesn't
> look good. Please let me know if you have any ideas for doing this in an
> easier way.
>
> ```java
> class KeyValueWithDestination {
>     K key;
>     V value;
>     String destination;
> }
>
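> class DestinationPredicate implements Predicate<K, KeyValueWithDestination> {
>     final String destination;
>
>     // Needed so that DestinationPredicate::new can take a topic name.
>     DestinationPredicate(String destination) {
>         this.destination = destination;
>     }
>
>     @Override
>     public boolean test(K key, KeyValueWithDestination value) {
>         return value.destination.equals(destination);
>     }
> }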
>
> String[] destTopics = {"topicA", "topicB", "topicC"};
>
> // Java cannot create generic arrays, so build a raw array and convert it.
> @SuppressWarnings("unchecked")
> Predicate<K, KeyValueWithDestination>[] predicates =
>         Arrays.stream(destTopics).map(DestinationPredicate::new)
>                                  .toArray(Predicate[]::new);
>
> branches = builder.stream("foo")
>                   .map((key, value) -> processor.process(key, value) /* => KeyValueWithDestination */)
>                   .branch(predicates);
>
> for (int i = 0; i < branches.length; i++) {
>     branches[i].to(destTopics[i]);
> }
> ```
>
>
> > Guozhang
> >
> > On Fri, Apr 1, 2016 at 1:20 AM, Yuto KAWAMURA <kawamuray.dad...@gmail.com>
> > wrote:
> >
> >> When I tried to implement a task that dispatches records to
> >> downstream processors or sinks, it looked like relying on
> >> context.forward(K, V, int childIndex) is the only way for now.
> >> My question is why this method is implemented using childIndex (which
> >> is just an index into the children "List", based on the order of
> >> builder.addProcessor() calls) instead of the child name (the first
> >> argument to add{Processor,Sink}).
> >> I'd like to ask what the concrete use case of forward(K, V, int
> >> childIndex) is, and whether it makes sense to introduce another
> >> overload, forward(K, V, String childName), for handier use.
> >> Currently I have a use case like this in mind:
> >> ```
> >> builder.addProcessor("DispatchProcess", new
> >> DispatchProcessorSupplier(), "Source");
> >> builder.addProcessor("Process-A", new ProcessorASupplier(),
> >> "DispatchProcess");
> >> builder.addProcessor("Process-B", new ProcessorBSupplier(),
> >> "DispatchProcess");
> >>
> >> // in process(key, value)
> >> if ("key-for-A".equals(key)) {
> >>     context.forward(key, value, "Process-A");
> >> } else if ("key-for-B".equals(key)) {
> >>     context.forward(key, value, "Process-B");
> >> }
> >> ```
> >>
> >
> >
> >
> > --
> > -- Guozhang
>



-- 
-- Guozhang
