[ https://issues.apache.org/jira/browse/KAFKA-18026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
A. Sophie Blee-Goldman updated KAFKA-18026:
-------------------------------------------
    Description:
See KIP-1112: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-1112%3A+allow+custom+processor+wrapping]

Implementation plan/wrapping procedure for DSL operators
 * make sure processor is added via ProcessorParameters#addProcessorTo instead of calling the InternalTopologyBuilder's #addProcessor and #addStateStore methods directly
 * convert stateful operators to implement the ProcessorSupplier#stores method, rather than directly calling #addStateStore and/or #connectProcessorAndStateStore
 * update and/or add ProcessorWrapper tests to StreamsBuilderTest (some existing tests may use processors that haven't been converted yet and are expected to break/need fixing)

TODO list:
 # -Non-source/sink PAPI processors:- [https://github.com/apache/kafka/pull/17892] -- [~ableegoldman]
 # -ProcessorGraphNode (stateless DSL KStream operators):- [https://github.com/apache/kafka/pull/17892] -- [~ableegoldman]
 # -KTableSource:- [https://github.com/apache/kafka/pull/17903] -- [~agavra]
 # KStream/TableAggregate (ie count, reduce, aggregate): [https://github.com/apache/kafka/pull/17929] -- [~ableegoldman]
 # TableProcessorNode (stateless table operators eg KTable#filter):
 # StreamToTableNode:
 # Stream-Table join: -- [~ableegoldman]
 # StreamStreamJoinNode: -- [~ableegoldman]
 # KTableKTableJoinNode:
 # Source/sink nodes (PAPI/DSL):
 # StatefulProcessorNode <-- do this one last

Follow up:
 * clean up StoreFactory<->StoreBuilder wrapping and configuration
 * future-proof the wrapping mechanism:
 ** ensure new processor implementations get wrapped, eg by protecting the InternalTopologyBuilder#addProcessor (also #addSource/Sink?)
 ** protect #addStateStore from being called out-of-band to prevent new state stores (whether from new DSL operators or modifications to existing ones) from being added to processors without being returned by the ProcessorSupplier#stores method
 * open question: should we go ahead and deprecate the old method of connecting state stores entirely?

docs: [https://github.com/apache/kafka/pull/17906]

  was:
See KIP-1112: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-1112%3A+allow+custom+processor+wrapping]

Implementation plan/wrapping procedure for DSL operators
 * make sure processor is added via ProcessorParameters#addProcessorTo instead of calling the InternalTopologyBuilder's #addProcessor and #addStateStore methods directly
 * convert stateful operators to implement the ProcessorSupplier#stores method, rather than directly calling #addStateStore and/or #connectProcessorAndStateStore

TODO list:
 # -Non-source/sink PAPI processors: [https://github.com/apache/kafka/pull/17892] [~ableegoldman]-
 # -ProcessorGraphNode (stateless DSL KStream operators): [https://github.com/apache/kafka/pull/17892] [~ableegoldman]-
 # -KTableSource [https://github.com/apache/kafka/pull/17903] [~agavra]-
 # KStream/TableAggregate (ie count, reduce, aggregate): [https://github.com/apache/kafka/pull/17929] [~ableegoldman]
 # TableProcessorNode (stateless table operators eg KTable#filter):
 # StreamToTableNode:
 # Stream-Table join:
 # StreamStreamJoinNode:
 # KTableKTableJoinNode:
 # Source/sink nodes (PAPI/DSL):
 # StatefulProcessorNode <-- do this one last

Follow up:
 * clean up StoreFactory<->StoreBuilder wrapping and configuration
 * future-proof the wrapping mechanism:
 ** ensure new processor implementations get wrapped, eg by protecting the InternalTopologyBuilder#addProcessor (also #addSource/Sink?)
 ** protect #addStateStore from being called out-of-band to prevent new state stores (whether from new DSL operators or modifications to existing ones) from being added to processors without being returned by the ProcessorSupplier#stores method
 * open question: should we go ahead and deprecate the old method of connecting state stores entirely?

docs: [https://github.com/apache/kafka/pull/17906]


> Allow custom processor wrapping
> -------------------------------
>
>                 Key: KAFKA-18026
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18026
>             Project: Kafka
>          Issue Type: New Feature
>          Components: kip, streams
>            Reporter: A. Sophie Blee-Goldman
>            Assignee: A. Sophie Blee-Goldman
>            Priority: Major
>              Labels: kip
>             Fix For: 4.0.0

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
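
The conversion described in the implementation plan (operators declare their stores through the supplier, and every processor goes through one wrapping entry point) could be sketched roughly as follows. This is only an illustration under stated assumptions: `SketchProcessor`, `SketchSupplier`, `SketchBuilder`, and `WrappingSketch` are hypothetical stand-ins invented for this example, not the real Kafka Streams `ProcessorSupplier`, `InternalTopologyBuilder`, or `ProcessorWrapper` types, and store names are plain strings here rather than store builders.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for a processor; NOT the Kafka Streams Processor API.
interface SketchProcessor {
    String process(String value);
}

// Hypothetical stand-in for a supplier that also declares its state stores.
interface SketchSupplier {
    SketchProcessor get();

    // Stores are declared by the supplier instead of being attached out-of-band.
    default Set<String> stores() {
        return Set.of();
    }
}

// Hypothetical stand-in for a topology builder with a single entry point.
final class SketchBuilder {
    final List<String> processors = new ArrayList<>();
    final Set<String> connectedStores = new LinkedHashSet<>();
    private final UnaryOperator<SketchSupplier> wrapper;

    SketchBuilder(UnaryOperator<SketchSupplier> wrapper) {
        this.wrapper = wrapper;
    }

    // Wrap the supplier first, then register the processor name and connect
    // only the stores that the (wrapped) supplier reports.
    SketchProcessor addProcessor(String name, SketchSupplier supplier) {
        SketchSupplier wrapped = wrapper.apply(supplier);
        processors.add(name);
        connectedStores.addAll(wrapped.stores());
        return wrapped.get();
    }
}

final class WrappingSketch {
    // A stateful operator that reports its store through stores().
    static SketchSupplier countSupplier(String storeName) {
        return new SketchSupplier() {
            @Override
            public SketchProcessor get() {
                return value -> "count(" + value + ")";
            }

            @Override
            public Set<String> stores() {
                return Set.of(storeName);
            }
        };
    }

    // A wrapper that decorates the processor and passes the stores through.
    static SketchSupplier logging(SketchSupplier inner) {
        return new SketchSupplier() {
            @Override
            public SketchProcessor get() {
                SketchProcessor delegate = inner.get();
                return value -> "wrapped:" + delegate.process(value);
            }

            @Override
            public Set<String> stores() {
                return inner.stores();
            }
        };
    }

    public static void main(String[] args) {
        SketchBuilder builder = new SketchBuilder(WrappingSketch::logging);
        SketchProcessor processor =
            builder.addProcessor("count-node", countSupplier("count-store"));
        System.out.println(processor.process("a"));
        System.out.println(builder.connectedStores);
    }
}
```

In this sketch a store can only reach the builder through `stores()`, which is the property the follow-up items about protecting #addProcessor and #addStateStore aim for; how the real classes enforce it is left to the linked PRs.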