[ https://issues.apache.org/jira/browse/KAFKA-18026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
A. Sophie Blee-Goldman updated KAFKA-18026:
-------------------------------------------
    Description:
See KIP-1112: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-1112%3A+allow+custom+processor+wrapping]

Implementation plan/wrapping procedure for DSL operators
 * make sure processor is added via ProcessorParameters#addProcessorTo instead of calling the InternalTopologyBuilder's #addProcessor and #addStateStore methods directly
 * convert stateful operators to implement the ProcessorSupplier#stores method, rather than directly calling #addStateStore and/or #connectProcessorAndStateStore
 * update and/or add ProcessorWrapper tests to StreamsBuilderTest (some existing tests may use processors that haven't been converted yet and are expected to break/need fixing)

TODO list:
 # -Non-source/sink PAPI processors:- [https://github.com/apache/kafka/pull/17892] -- [~ableegoldman]
 # -ProcessorGraphNode (stateless DSL KStream operators):- [https://github.com/apache/kafka/pull/17892] -- [~ableegoldman]
 # -KTableSource:- [https://github.com/apache/kafka/pull/17903] -- [~agavra]
 # {-}KStream/TableAggregate (ie count, reduce, aggregate){-}: [https://github.com/apache/kafka/pull/17929] -- [~ableegoldman]
 # TableProcessorNode (stateless table operators eg KTable#filter):
 ## -filter:- [https://github.com/apache/kafka/pull/18205] – rohan
 ## -mapValues:- – rohan
 ## transformValues: won't do, will be replaced with processValues anyway
 # {-}StreamToTableNode{-}: [https://github.com/apache/kafka/pull/18149] – [~agavra]
 # -Stream-Table join:- [https://github.com/apache/kafka/pull/18047] – [~ableegoldman]
 # {-}StreamStreamJoinNode{-}: [https://github.com/apache/kafka/pull/18111] – [~ableegoldman]
 # -KTableKTableJoinNode:- [https://github.com/apache/kafka/pull/18048] -- [~agavra]
 # StatefulProcessorNode:
 ## {-}TableSuppressNode{-}: https://github.com/apache/kafka/pull/18150 – [~agavra]
 ## {-}ForeignTableJoinNode{-}: [https://github.com/apache/kafka/pull/18194] – [~ableegoldman]
 ## clean up StatefulProcessorNode and migrate all converted operators to use ProcessorGraphNode instead: [https://github.com/apache/kafka/pull/18195] sophie

Follow up:
 * convert source & sink nodes to using ProcessorSupplier and wrap them too
 * clean up StoreFactory<->StoreBuilder wrapping and configuration
 * future-proof the wrapping mechanism:
 ** ensure new processor implementations get wrapped, eg by protecting the InternalTopologyBuilder#addProcessor (also #addSource/Sink?)
 ** protect #addStateStore from being called out-of-band to prevent new state stores (whether from new DSL operators or modifications to existing ones) from being added to processors without being returned by the ProcessorSupplier#stores method
 * consider deprecating older alternative to ProcessorSupplier#stores
 ** cons: using lambdas for processor suppliers is very nice

docs: [https://github.com/apache/kafka/pull/17906]

  was:
See KIP-1112: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-1112%3A+allow+custom+processor+wrapping]

Implementation plan/wrapping procedure for DSL operators
 * make sure processor is added via ProcessorParameters#addProcessorTo instead of calling the InternalTopologyBuilder's #addProcessor and #addStateStore methods directly
 * convert stateful operators to implement the ProcessorSupplier#stores method, rather than directly calling #addStateStore and/or #connectProcessorAndStateStore
 * update and/or add ProcessorWrapper tests to StreamsBuilderTest (some existing tests may use processors that haven't been converted yet and are expected to break/need fixing)

TODO list:
 # -Non-source/sink PAPI processors:- [https://github.com/apache/kafka/pull/17892] -- [~ableegoldman]
 # -ProcessorGraphNode (stateless DSL KStream operators):- [https://github.com/apache/kafka/pull/17892] -- [~ableegoldman]
 # -KTableSource:- [https://github.com/apache/kafka/pull/17903] -- [~agavra]
 # {-}KStream/TableAggregate (ie count, reduce, aggregate){-}: [https://github.com/apache/kafka/pull/17929] -- [~ableegoldman]
 # TableProcessorNode (stateless table operators eg KTable#filter): – rohan
 # {-}StreamToTableNode{-}: [https://github.com/apache/kafka/pull/18149] – [~agavra]
 # -Stream-Table join:- [https://github.com/apache/kafka/pull/18047] – [~ableegoldman]
 # StreamStreamJoinNode: [https://github.com/apache/kafka/pull/18111] – [~ableegoldman]
 # -KTableKTableJoinNode:- [https://github.com/apache/kafka/pull/18048] -- [~agavra]
 # StatefulProcessorNode:
 ## TableSuppressNode: – [~agavra]
 ## {-}ForeignTableJoinNode{-}: https://github.com/apache/kafka/pull/18194 – [~ableegoldman]
 ## clean up StatefulProcessorNode and migrate all converted operators to use ProcessorGraphNode instead: [https://github.com/apache/kafka/pull/18195] sophie

Follow up:
 * convert source & sink nodes to using ProcessorSupplier and wrap them too
 * clean up StoreFactory<->StoreBuilder wrapping and configuration
 * future-proof the wrapping mechanism:
 ** ensure new processor implementations get wrapped, eg by protecting the InternalTopologyBuilder#addProcessor (also #addSource/Sink?)
 ** protect #addStateStore from being called out-of-band to prevent new state stores (whether from new DSL operators or modifications to existing ones) from being added to processors without being returned by the ProcessorSupplier#stores method
 * consider deprecating older alternative to ProcessorSupplier#stores
 ** cons: using lambdas for processor suppliers is very nice

docs: [https://github.com/apache/kafka/pull/17906]


> Allow custom processor wrapping
> -------------------------------
>
>                 Key: KAFKA-18026
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18026
>             Project: Kafka
>          Issue Type: New Feature
>          Components: kip, streams
>            Reporter: A. Sophie Blee-Goldman
>            Assignee: A. Sophie Blee-Goldman
>            Priority: Major
>              Labels: kip
>             Fix For: 4.0.0
>
>
> See KIP-1112:
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-1112%3A+allow+custom+processor+wrapping]
> Implementation plan/wrapping procedure for DSL operators
>  * make sure processor is added via ProcessorParameters#addProcessorTo
> instead of calling the InternalTopologyBuilder's #addProcessor and
> #addStateStore methods directly
>  * convert stateful operators to implement the ProcessorSupplier#stores
> method, rather than directly calling #addStateStore and/or
> #connectProcessorAndStateStore
>  * update and/or add ProcessorWrapper tests to StreamsBuilderTest (some
> existing tests may use processors that haven't been converted yet and are
> expected to break/need fixing)
> TODO list:
>  # -Non-source/sink PAPI processors:-
> [https://github.com/apache/kafka/pull/17892]
> -- [~ableegoldman]
>  # -ProcessorGraphNode (stateless DSL KStream operators):-
> [https://github.com/apache/kafka/pull/17892]
> -- [~ableegoldman]
>  # -KTableSource:- [https://github.com/apache/kafka/pull/17903] -- [~agavra]
>  # {-}KStream/TableAggregate (ie count, reduce, aggregate){-}:
> [https://github.com/apache/kafka/pull/17929] -- [~ableegoldman]
>  # TableProcessorNode (stateless table operators eg KTable#filter):
>  ## -filter:- [https://github.com/apache/kafka/pull/18205] – rohan
>  ## -mapValues:- – rohan
>  ## transformValues: won't do, will be replaced with processValues anyway
>  # {-}StreamToTableNode{-}: [https://github.com/apache/kafka/pull/18149] –
> [~agavra]
>  # -Stream-Table join:- [https://github.com/apache/kafka/pull/18047] –
> [~ableegoldman]
>  # {-}StreamStreamJoinNode{-}: [https://github.com/apache/kafka/pull/18111] –
> [~ableegoldman]
>  # -KTableKTableJoinNode:- [https://github.com/apache/kafka/pull/18048] --
> [~agavra]
>  # StatefulProcessorNode:
>  ## {-}TableSuppressNode{-}: https://github.com/apache/kafka/pull/18150 –
> [~agavra]
>  ## {-}ForeignTableJoinNode{-}: [https://github.com/apache/kafka/pull/18194]
> – [~ableegoldman]
>  ## clean up StatefulProcessorNode and migrate all converted operators to
> use ProcessorGraphNode instead: [https://github.com/apache/kafka/pull/18195]
> sophie
> Follow up:
>  * convert source & sink nodes to using ProcessorSupplier and wrap them too
>  * clean up StoreFactory<->StoreBuilder wrapping and configuration
>  * future-proof the wrapping mechanism:
>  ** ensure new processor implementations get wrapped, eg by protecting the
> InternalTopologyBuilder#addProcessor (also #addSource/Sink?)
>  ** protect #addStateStore from being called out-of-band to prevent new state
> stores (whether from new DSL operators or modifications to existing ones)
> from being added to processors without being returned by the
> ProcessorSupplier#stores method
>  * consider deprecating older alternative to ProcessorSupplier#stores
>  ** cons: using lambdas for processor suppliers is very nice
> docs: [https://github.com/apache/kafka/pull/17906]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)