2016-04-04 7:20 GMT+09:00 Guozhang Wang <wangg...@gmail.com>:
> Hi Yuto,
>
> Is the destination topic embedded as part of the value in the original
> "foo" topic? If yes could you just access that field directly instead of
> mapping to a (key, value, destination) triplet?
>

Nope. KeyValueWithDestination is just an example of output from the
first Processor and is not included in actual messages that the topic
foo received.
Let me explain bit more realistic use-case. How can we write a
Processor like below in High-level DSL cleanly?

```java
public class EventProcessor implements Processor<String, Event> {
...
  @Override
  public void process(String key, Event value) {
      EventMetadata meta = getEventMetadataFromExternalStorage(value.getId());

      if (isFieldACorrupted(meta, value.getFieldA())) {
          // This event is corrupted! let's evacuate it once to the
grave topic for further investigation.
          context.forward(key, value, "CorruptedEventSink");
      }
      if (isFieldBCorrupted(meta, value.getFieldB())) {
          // Antoher case of corruption, but maybe recoverable.
          context.forward(key, value, "CorruptedEventRecoveryProcessor");
      }

      for (Foo foo : event.getFoos()) {
          context.forward(key, buildMessage(meta, foo), "FooProcessor");
      }
  }
...
}
```


> Guozhang
>
> On Sun, Apr 3, 2016 at 9:29 AM, Yuto KAWAMURA <kawamuray.dad...@gmail.com>
> wrote:
>
>> Hi Guozhang,
>>
>>
>>
>> 2016-04-02 3:29 GMT+09:00 Guozhang Wang <wangg...@gmail.com>:
>> > Hi Yuto,
>> >
>> > That is a good suggestion, the child index is not very intuitive from
>> > programmer's view and we can even consider replacing it with the
>> processor
>> > name instead of overloading it. Could you file a JIRA?
>> >
>>
>> Yep :) https://issues.apache.org/jira/browse/KAFKA-3497
>>
>> > Also I am wondering if you have looked at the higher-level Streams DSL,
>> and
>> > if yes could let me know what are the limitations from using that APIs in
>> > your case?
>> >
>>
>> Well, I read though high-level DSL interface but couldn't find an easy
>> way to handle output from Processors which could issue multiple
>> messages to arbitrary different destinations.
>> Maybe it could be done by doing something like below but it doesn't
>> look good. Please let me know if you have any idea to do this in
>> easier way.
>>
>> ```java
>> class KeyValueWithDestination {
>>     K key;
>>     V value;
>>     String destination;
>> }
>>
>> class DestinationPredicate implements Predicate<K,
>> KeyValueWithDestination> {
>>     String destination;
>>     @Override
>>     public boolean test(K key, KeyValueWithDestination value) {
>>         return value.destination.equals(destination);
>>     }
>> }
>>
>> String[] destTopics = {"topicA", "topicB", "topicC"};
>>
>> Predicate<K, KeyValueWithDestination>[] predicates =
>>         Arrays.stream(destTopics).map(DestinationPredicate::new)
>>                                  .toArray(Predicate<K,
>> KeyValueWithDestination>::new);
>>
>> branches = builder.stream("foo")
>>                   .map((key, value) -> processor.process(key, value)
>> /* => KeyValueWithDestination */)
>>                   .branch(predicates);
>>
>> for (int i = 0; i < branches.length; i++) {
>>     branches[i].to(destTopics[i]);
>> }
>> ```
>>
>>
>> > Guozhang
>> >
>> > On Fri, Apr 1, 2016 at 1:20 AM, Yuto KAWAMURA <
>> kawamuray.dad...@gmail.com>
>> > wrote:
>> >
>> >> When I tried to implement a task which does kinda dispatching to
>> >> downstream processors or sinks, looks like relying on
>> >> context.forward(K, V, int childIndex) is the only way now.
>> >> I have a question why this method implemented using childIndex(which
>> >> is just an index of children "List" that based on order of
>> >> builder.addProcessor() call) instead of child name(first argument to
>> >> add{Processor,Sink}).
>> >> I wanna ask what is the concrete use case of forward(K, V, int
>> >> childIndex) and is it makes sense to introduce another overload:
>> >> forward(K, V, String childName) for much handy use.
>> >> Currently I have a use-case like this in my mind:
>> >> ```
>> >> builder.addProcessor("DispatchProcess", new
>> >> DispatchProcessorSupplier(), "Source");
>> >> builder.addProcessor("Process-A", new ProcessorASupplier(),
>> >> "DispatchProcess");
>> >> builder.addProcessor("Process-B", new ProcessorBSupplier(),
>> >> "DispatchProcess");
>> >>
>> >> // in process(key, value)
>> >> if ("key-for-A".equals(key)) {
>> >>     context.forward(key, value, "Process-A");
>> >> } else if ("key-for-B".equals(key)) {
>> >>     context.forward(key, value, "Process-B");
>> >> }
>> >> ```
>> >>
>> >
>> >
>> >
>> > --
>> > -- Guozhang
>>
>
>
>
> --
> -- Guozhang

Reply via email to