[ 
https://issues.apache.org/jira/browse/KAFKA-5924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16503954#comment-16503954
 ] 

Guozhang Wang commented on KAFKA-5924:
--------------------------------------

The Streams DSL has a {{transform()}} method on {{KStream}} that lets you plug 
in a custom processor and forward the transformed records downstream. Is that 
sufficient for your use case?

> Add the compose method to the Kafka Stream API
> ----------------------------------------------
>
>                 Key: KAFKA-5924
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5924
>             Project: Kafka
>          Issue Type: Wish
>          Components: streams
>            Reporter: Laurent T
>            Priority: Minor
>              Labels: needs-kip
>
> Hi,
> I'm referencing RxJava for its [compose 
> method|https://github.com/ReactiveX/RxJava/wiki/Implementing-Your-Own-Operators#transformational-operators],
> which is very handy. It would be great if the Streams API offered 
> something similar. It's easy to implement and makes the code much clearer: 
> it avoids breaking the linearity of the code when you want to reuse parts 
> of the stream topology. For example:
> Without compose:
> {code:java}
> TopologyUtils
>     .myUtil(topology
>         .map(...)
>         .flatMap(...)
>         .through(...))
>     .map(...)
>     .to(...);
> {code}
> With compose:
> {code:java}
> topology
>     .map(...)
>     .flatMap(...)
>     .through(...)
>     .compose(TopologyUtils::myUtil)
>     .map(...)
>     .to(...);
> {code}
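
For illustration only, here is a minimal sketch of what the requested compose step does, written against java.util.stream.Stream so that it runs without Kafka on the classpath. Nothing here is part of the Kafka Streams API: the {{Composable}} wrapper, {{myUtil}} and {{run}} are hypothetical names. The assumption is that a compose method simply applies a function to the whole pipeline object and returns the result, so a reusable fragment can sit in the middle of a fluent chain.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ComposeSketch {

    /** Hypothetical wrapper that adds a compose step to any fluent pipeline type T. */
    static final class Composable<T> {
        private final T value;

        private Composable(T value) {
            this.value = value;
        }

        static <T> Composable<T> of(T value) {
            return new Composable<>(value);
        }

        /** Applies the transformer to the whole wrapped pipeline and wraps the result. */
        <R> Composable<R> compose(Function<? super T, ? extends R> transformer) {
            return new Composable<>(transformer.apply(value));
        }

        T get() {
            return value;
        }
    }

    /** Reusable pipeline fragment (stand-in for TopologyUtils.myUtil in the issue). */
    static Stream<String> myUtil(Stream<String> in) {
        return in.filter(s -> !s.isEmpty()).map(String::toUpperCase);
    }

    /** Builds the chain top to bottom, with the reusable fragment in the middle. */
    static List<String> run(List<String> input) {
        return Composable.of(input.stream().map(String::trim))
                .compose(ComposeSketch::myUtil)
                .get()
                .map(s -> s + "!")
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(" a ", "", "b"))); // prints [A!, B!]
    }
}
```

The wrapper still needs an explicit {{get()}} to return to the underlying type, which is why a compose method defined directly on the stream type, as the issue requests, would read more cleanly.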



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
