[ https://issues.apache.org/jira/browse/KAFKA-5924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16503954#comment-16503954 ]
Guozhang Wang commented on KAFKA-5924:
--------------------------------------

We have a {{transform()}} in the StreamsBuilder that allows one to add a customized processor and send the transformed records downstream. Is it sufficient for your use case?

> Add the compose method to the Kafka Stream API
> ----------------------------------------------
>
>                 Key: KAFKA-5924
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5924
>             Project: Kafka
>          Issue Type: Wish
>          Components: streams
>            Reporter: Laurent T
>            Priority: Minor
>              Labels: needs-kip
>
> Hi,
> I'm referencing RxJava for its [compose
> method|https://github.com/ReactiveX/RxJava/wiki/Implementing-Your-Own-Operators#transformational-operators]
> which is very handy. It would be great if the Streams API gave us
> something similar. It's pretty easy to implement and makes the code much
> clearer (it avoids breaking the linearity of the code when you
> want to reuse parts of the stream topology), e.g.:
> Without compose:
> {code:java}
> TopologyUtils
>     .myUtil(topology
>         .map(...)
>         .flatMap(...)
>         .through(...))
>     .map(...)
>     .to(...);
> {code}
> With compose:
> {code:java}
> topology
>     .map(...)
>     .flatMap(...)
>     .through(...)
>     .compose(TopologyUtils::myUtil)
>     .map(...)
>     .to(...);
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
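
For illustration, the requested {{compose}} could be as small as a method that hands the current stream to a function and returns whatever that function builds. The sketch below does not use Kafka Streams at all: {{Flow}} is a hypothetical stand-in for a fluent stream type, built on {{java.util.stream.Stream}}, and {{TopologyUtils.myUtil}} is an invented reusable step named after the one in the issue. It only shows the shape of the idea, not how it would be added to the Streams API.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical fluent wrapper standing in for a stream type; not a Kafka class. */
final class Flow<T> {
    private final Stream<T> stream;

    Flow(Stream<T> stream) {
        this.stream = stream;
    }

    /** Ordinary chained operator, shown only so there is something to chain. */
    <R> Flow<R> map(Function<? super T, ? extends R> mapper) {
        return new Flow<R>(stream.map(mapper));
    }

    /**
     * The compose idea: pass this whole flow to a reusable function and
     * keep chaining on the flow that the function returns.
     */
    <R> Flow<R> compose(Function<? super Flow<T>, ? extends Flow<R>> transformer) {
        return transformer.apply(this);
    }

    /** Terminal step used here to inspect the result. */
    List<T> toList() {
        return stream.collect(Collectors.toList());
    }
}

/** Hypothetical reusable fragment, named after the helper in the issue. */
final class TopologyUtils {
    private TopologyUtils() {
    }

    static Flow<String> myUtil(Flow<Integer> in) {
        return in.map(i -> i * 2).map(i -> "v" + i);
    }
}
```

With this in place, a chain such as {{flow.map(...).compose(TopologyUtils::myUtil).map(...)}} reads top to bottom, whereas calling {{TopologyUtils.myUtil(flow.map(...))}} directly forces the helper to wrap the upstream part of the chain.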