[ 
https://issues.apache.org/jira/browse/KAFKA-5924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16504856#comment-16504856
 ] 

Guozhang Wang commented on KAFKA-5924:
--------------------------------------

Thanks for the explanation, I think I understand your request now.

Note that the reason for a `ProcessorSupplier` and `TransformerSupplier` 
instead of letting users to directly pass in a Processor / Transformer is that, 
there could have been multiple parallel tasks being executed, each with a 
"copy" of the topology to run, i.e. consider if the transformer / processor is 
stateful, we cannot let the same object to be shared among different tasks' 
topologies.

For stateless processing logic, like println, of course the same processor, or 
simply a lambda function can be reused among topologies, but to Kafka Streams 
library it is a black box so it cannot safely tell whether it is reusable or 
not.

If you have some ideas on how to work around this from the API perspective I'd 
love to hear them :)

> Add the compose method to the Kafka Stream API
> ----------------------------------------------
>
>                 Key: KAFKA-5924
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5924
>             Project: Kafka
>          Issue Type: Wish
>          Components: streams
>            Reporter: Laurent T
>            Priority: Minor
>              Labels: needs-kip
>
> Hi,
> I'm referencing RxJava for it's [compose 
> method|https://github.com/ReactiveX/RxJava/wiki/Implementing-Your-Own-Operators#transformational-operators]
>  which is very handy. It would be great if the Streams API would give us 
> something similar. It's pretty easy to implement and allows to have much more 
> clarity to the code (it avoids breaking the linearity of the code when you 
> want to reuse parts of the stream topology). e.g.
> Without compose:
> {code:java}
> TopologyUtils
>     .myUtil(topology
>         .map(...)
>         .flatMap(...)
>         .through(...))
>     .map(...)
>     .to(...);
> {code}
> With compose:
> {code:java}
> topology
>     .map(...)
>     .flatMap(...)
>     .through(...)
>     .compose(TopologyUtils::myUtil)
>     .map(...)
>     .to(...);
> {code}
> Here is what might look like TopologyUtils::myUtil if i wanted it to just 
> print the values.
> {code:java}
> public <K, V> KStream<K, V> myUtil(KStream<K, V> stream) {
>     return stream.peek((k, v) -> System.out.println(k + ": " + v));
> } 
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to