[ https://issues.apache.org/jira/browse/KAFKA-18026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
A. Sophie Blee-Goldman updated KAFKA-18026:
-------------------------------------------
Description:
See KIP-1112: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-1112%3A+allow+custom+processor+wrapping]

Implementation plan/wrapping procedure for DSL operators:
 * make sure each processor is added via ProcessorParameters#addProcessorTo instead of calling the InternalTopologyBuilder's #addProcessor and #addStateStore methods directly
 * convert stateful operators to implement the ProcessorSupplier#stores method, rather than directly calling #addStateStore and/or #connectProcessorAndStateStore
 * update and/or add ProcessorWrapper tests in StreamsBuilderTest (some existing tests may use processors that haven't been converted yet and are expected to break/need fixing)

TODO list:
 # -Non-source/sink PAPI processors:- [https://github.com/apache/kafka/pull/17892] -- [~ableegoldman]
 # -ProcessorGraphNode (stateless DSL KStream operators):- [https://github.com/apache/kafka/pull/17892] -- [~ableegoldman]
 # -KTableSource:- [https://github.com/apache/kafka/pull/17903] -- [~agavra]
 # {-}KStream/TableAggregate (i.e. count, reduce, aggregate){-}: [https://github.com/apache/kafka/pull/17929] -- [~ableegoldman]
 # TableProcessorNode (stateless table operators, e.g. KTable#filter):
 # StreamToTableNode:
 # -Stream-Table join:- [https://github.com/apache/kafka/pull/18047] -- [~ableegoldman]
 # StreamStreamJoinNode: [https://github.com/apache/kafka/pull/18111] -- [~ableegoldman]
 # KTableKTableJoinNode: [https://github.com/apache/kafka/pull/18048] -- [~agavra]
 # Source/sink nodes (PAPI/DSL):
 # StatefulProcessorNode (and children!!) <-- do this one last (should cover children of KTableProcessorSupplier?)
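
A minimal, self-contained Java sketch of the wrapping procedure in the implementation plan may help. Every processor goes through a single builder entry point that applies a wrapper, and a stateful supplier declares its store through a stores() method, so the store is connected only via the (wrapped) supplier. Every type here (SketchProcessor, SketchSupplier, SketchWrapper, CountingSupplier, TaggingWrapper, SketchTopologyBuilder) is a hypothetical, simplified stand-in for illustration. None of them is the real Kafka Streams ProcessorSupplier or the KIP-1112 ProcessorWrapper API, and stores are modeled as plain string names rather than StoreBuilders.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

public class ProcessorWrappingSketch {
    public static void main(String[] args) {
        TaggingWrapper wrapper = new TaggingWrapper();
        SketchTopologyBuilder builder = new SketchTopologyBuilder(wrapper);

        // Stateless operator: a lambda is enough, stores() keeps its empty default.
        builder.addProcessor("mapper", () -> value -> value.toUpperCase());
        // Stateful operator: declares its store through stores().
        builder.addProcessor("counter", new CountingSupplier());

        System.out.println(wrapper.wrapped);          // [mapper, counter]
        System.out.println(builder.connectedStores);  // {mapper=[], counter=[counts-store]}
        System.out.println(builder.processors.get("counter").get().process("a")); // [counter] a#counted
    }
}

// Hypothetical, simplified stand-ins for the types named in the ticket;
// these are NOT the real org.apache.kafka.streams interfaces.
interface SketchProcessor {
    String process(String value);
}

// Mirrors the idea of ProcessorSupplier#stores: a supplier declares its
// (here: string-named) stores instead of the builder adding them directly.
interface SketchSupplier extends Supplier<SketchProcessor> {
    default Set<String> stores() {
        return Set.of();
    }
}

// Hypothetical hook in the spirit of KIP-1112's ProcessorWrapper.
interface SketchWrapper {
    SketchSupplier wrap(String processorName, SketchSupplier supplier);
}

// Stateful supplier written as a named class: a lambda cannot override stores().
final class CountingSupplier implements SketchSupplier {
    @Override
    public SketchProcessor get() {
        return value -> value + "#counted";
    }

    @Override
    public Set<String> stores() {
        return Set.of("counts-store");
    }
}

// Tags each output with the processor name and forwards the inner
// supplier's stores, so wrapping does not drop any declared store.
final class TaggingWrapper implements SketchWrapper {
    final List<String> wrapped = new ArrayList<>();

    @Override
    public SketchSupplier wrap(String name, SketchSupplier inner) {
        wrapped.add(name);
        return new SketchSupplier() {
            @Override
            public SketchProcessor get() {
                SketchProcessor delegate = inner.get();
                return value -> "[" + name + "] " + delegate.process(value);
            }

            @Override
            public Set<String> stores() {
                return inner.stores();
            }
        };
    }
}

// Single entry point, analogous to routing DSL operators through
// ProcessorParameters#addProcessorTo: every processor is wrapped, and stores
// are connected only if the (wrapped) supplier returns them from stores().
final class SketchTopologyBuilder {
    private final SketchWrapper wrapper;
    final Map<String, SketchSupplier> processors = new LinkedHashMap<>();
    final Map<String, Set<String>> connectedStores = new LinkedHashMap<>();

    SketchTopologyBuilder(SketchWrapper wrapper) {
        this.wrapper = wrapper;
    }

    void addProcessor(String name, SketchSupplier supplier) {
        SketchSupplier wrapped = wrapper.wrap(name, supplier);
        processors.put(name, wrapped);
        connectedStores.put(name, wrapped.stores());
    }
}
```

In this sketch the stateless "mapper" can stay a lambda because it keeps the empty default stores(). The stateful "counter" has to be a named class so that it can override stores(). That is the same lambda trade-off raised in the follow-up item about deprecating the older alternative to ProcessorSupplier#stores.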
Follow up:
 * clean up StoreFactory<->StoreBuilder wrapping and configuration
 * future-proof the wrapping mechanism:
 ** ensure new processor implementations get wrapped, e.g. by protecting InternalTopologyBuilder#addProcessor (also #addSource/#addSink?)
 ** protect #addStateStore from being called out-of-band, so that new state stores (whether from new DSL operators or from modifications to existing ones) cannot be added to processors without being returned by the ProcessorSupplier#stores method
 * consider deprecating the older alternative to ProcessorSupplier#stores
 ** con: using lambdas for processor suppliers is very nice, and a lambda cannot override the default #stores method

docs: [https://github.com/apache/kafka/pull/17906]

was:
See KIP-1112: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-1112%3A+allow+custom+processor+wrapping]

Implementation plan/wrapping procedure for DSL operators:
 * make sure each processor is added via ProcessorParameters#addProcessorTo instead of calling the InternalTopologyBuilder's #addProcessor and #addStateStore methods directly
 * convert stateful operators to implement the ProcessorSupplier#stores method, rather than directly calling #addStateStore and/or #connectProcessorAndStateStore
 * update and/or add ProcessorWrapper tests in StreamsBuilderTest (some existing tests may use processors that haven't been converted yet and are expected to break/need fixing)

TODO list:
 # -Non-source/sink PAPI processors:- [https://github.com/apache/kafka/pull/17892] -- [~ableegoldman]
 # -ProcessorGraphNode (stateless DSL KStream operators):- [https://github.com/apache/kafka/pull/17892] -- [~ableegoldman]
 # -KTableSource:- [https://github.com/apache/kafka/pull/17903] -- [~agavra]
 # {-}KStream/TableAggregate (i.e. count, reduce, aggregate){-}: [https://github.com/apache/kafka/pull/17929] -- [~ableegoldman]
 # TableProcessorNode (stateless table operators, e.g. KTable#filter):
 # StreamToTableNode:
 # -Stream-Table join:- [https://github.com/apache/kafka/pull/18047] -- [~ableegoldman]
 # StreamStreamJoinNode: -- [~ableegoldman]
 # KTableKTableJoinNode: -- [~agavra]
 # Source/sink nodes (PAPI/DSL):
 # StatefulProcessorNode (and children!!) <-- do this one last (should cover children of KTableProcessorSupplier?)

Follow up:
 * clean up StoreFactory<->StoreBuilder wrapping and configuration
 * future-proof the wrapping mechanism:
 ** ensure new processor implementations get wrapped, e.g. by protecting InternalTopologyBuilder#addProcessor (also #addSource/#addSink?)
 ** protect #addStateStore from being called out-of-band, so that new state stores (whether from new DSL operators or from modifications to existing ones) cannot be added to processors without being returned by the ProcessorSupplier#stores method
 * consider deprecating the older alternative to ProcessorSupplier#stores
 ** con: using lambdas for processor suppliers is very nice, and a lambda cannot override the default #stores method

docs: [https://github.com/apache/kafka/pull/17906]

> Allow custom processor wrapping
> -------------------------------
>
> Key: KAFKA-18026
> URL: https://issues.apache.org/jira/browse/KAFKA-18026
> Project: Kafka
> Issue Type: New Feature
> Components: kip, streams
> Reporter: A. Sophie Blee-Goldman
> Assignee: A. Sophie Blee-Goldman
> Priority: Major
> Labels: kip
> Fix For: 4.0.0

--
This message was sent by Atlassian Jira
(v8.20.10#820010)