[ 
https://issues.apache.org/jira/browse/KAFKA-18026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]
A. Sophie Blee-Goldman updated KAFKA-18026:
-------------------------------------------
    Description: 
See KIP-1112: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-1112%3A+allow+custom+processor+wrapping]

Implementation plan/wrapping procedure for DSL operators
 * make sure the processor is added via ProcessorParameters#addProcessorTo 
instead of calling the InternalTopologyBuilder's #addProcessor and 
#addStateStore methods directly
 * convert stateful operators to implement the ProcessorSupplier#stores method, 
rather than directly calling #addStateStore and/or 
#connectProcessorAndStateStore
 * update and/or add ProcessorWrapper tests to StreamsBuilderTest (some 
existing tests may use processors that haven't been converted yet and are 
expected to break/need fixing)  
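
A rough sketch of the procedure above, for illustration only. Every type here (SketchSupplier, SketchWrapper, SketchTopology, CountSupplier, RecordingWrapper) is a hypothetical, simplified stand-in and not the real Kafka Streams API. It only shows the shape of the idea: the processor goes through one entry point, gets wrapped there, and its stores are discovered from the supplier's own stores() method rather than being connected separately.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified stand-ins for illustration only.
// None of these types are the real Kafka Streams interfaces.
final class WrappingSketch {

    /** Stand-in for a processor supplier that can declare its own stores. */
    interface SketchSupplier {
        String process(String record);

        /** Stateless suppliers keep the empty default. */
        default Set<String> stores() {
            return Set.of();
        }
    }

    /** Stand-in for a user-provided wrapper. */
    interface SketchWrapper {
        SketchSupplier wrap(String processorName, SketchSupplier supplier);
    }

    /** Stand-in for the topology builder; it only records what was registered. */
    static final class SketchTopology {
        final List<String> processors = new ArrayList<>();
        final List<String> connectedStores = new ArrayList<>();
    }

    /** A stateful operator that reports its store through stores(). */
    static final class CountSupplier implements SketchSupplier {
        @Override
        public String process(String record) {
            return record + ":counted";
        }

        @Override
        public Set<String> stores() {
            return Set.of("count-store");
        }
    }

    /** A wrapper that records every record it sees, then delegates. */
    static final class RecordingWrapper implements SketchWrapper {
        final List<String> seen = new ArrayList<>();

        @Override
        public SketchSupplier wrap(String processorName, SketchSupplier supplier) {
            return new SketchSupplier() {
                @Override
                public String process(String record) {
                    seen.add(processorName + ":" + record);
                    return supplier.process(record);
                }

                @Override
                public Set<String> stores() {
                    return supplier.stores(); // keep the declared stores visible
                }
            };
        }
    }

    /** Single entry point: wrap first, then register the processor and its stores. */
    static SketchSupplier addProcessorTo(SketchTopology topology, SketchWrapper wrapper,
                                         String name, SketchSupplier supplier) {
        SketchSupplier wrapped = wrapper.wrap(name, supplier);
        topology.processors.add(name);
        for (String store : wrapped.stores()) {
            topology.connectedStores.add(name + "->" + store);
        }
        return wrapped;
    }

    public static void main(String[] args) {
        SketchTopology topology = new SketchTopology();
        RecordingWrapper wrapper = new RecordingWrapper();
        SketchSupplier wrapped = addProcessorTo(topology, wrapper, "count", new CountSupplier());
        System.out.println(wrapped.process("a"));
        System.out.println(topology.connectedStores);
        System.out.println(wrapper.seen);
    }
}
```

In this toy model, an operator that connected its store outside of addProcessorTo would never show up in the wrapped supplier's stores(), which is the gap the conversion of stateful operators is meant to close.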

TODO list:
 # -Non-source/sink PAPI processors:- 
[https://github.com/apache/kafka/pull/17892] -- [~ableegoldman]
 # -ProcessorGraphNode (stateless DSL KStream operators):- 
[https://github.com/apache/kafka/pull/17892] -- [~ableegoldman]
 # -KTableSource:- [https://github.com/apache/kafka/pull/17903] -- [~agavra] 
 # -KStream/TableAggregate (ie count, reduce, aggregate):- 
[https://github.com/apache/kafka/pull/17929] -- [~ableegoldman] 
 # TableProcessorNode (stateless table operators eg KTable#filter): 
 # StreamToTableNode:
 # -Stream-Table join:- [https://github.com/apache/kafka/pull/18047] -- 
[~ableegoldman] 
 # StreamStreamJoinNode: [https://github.com/apache/kafka/pull/18111] -- 
[~ableegoldman] 
 # KTableKTableJoinNode: [https://github.com/apache/kafka/pull/18048] -- 
[~agavra] 
 # Source/sink nodes (PAPI/DSL):
 # StatefulProcessorNode (and children!!) <-- do this one last (should cover 
children of KTableProcessorSupplier?)

Follow up:
 * clean up StoreFactory<->StoreBuilder wrapping and configuration
 * future-proof the wrapping mechanism:
 ** ensure new processor implementations get wrapped, eg by protecting the 
InternalTopologyBuilder#addProcessor (also #addSource/Sink?)
 ** protect #addStateStore from being called out-of-band to prevent new state 
stores (whether from new DSL operators or modifications to existing ones) from 
being added to processors without being returned by the 
ProcessorSupplier#stores method 
 * consider deprecating older alternative to ProcessorSupplier#stores
 ** cons: using lambdas for processor suppliers is very nice

docs: [https://github.com/apache/kafka/pull/17906]

  was:
See KIP-1112: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-1112%3A+allow+custom+processor+wrapping]

Implementation plan/wrapping procedure for DSL operators
 * make sure the processor is added via ProcessorParameters#addProcessorTo 
instead of calling the InternalTopologyBuilder's #addProcessor and 
#addStateStore methods directly
 * convert stateful operators to implement the ProcessorSupplier#stores method, 
rather than directly calling #addStateStore and/or 
#connectProcessorAndStateStore
 * update and/or add ProcessorWrapper tests to StreamsBuilderTest (some 
existing tests may use processors that haven't been converted yet and are 
expected to break/need fixing)  

TODO list:
 # -Non-source/sink PAPI processors:- 
[https://github.com/apache/kafka/pull/17892] -- [~ableegoldman]
 # -ProcessorGraphNode (stateless DSL KStream operators):- 
[https://github.com/apache/kafka/pull/17892] -- [~ableegoldman]
 # -KTableSource:- [https://github.com/apache/kafka/pull/17903] -- [~agavra] 
 # -KStream/TableAggregate (ie count, reduce, aggregate):- 
[https://github.com/apache/kafka/pull/17929] -- [~ableegoldman] 
 # TableProcessorNode (stateless table operators eg KTable#filter): 
 # StreamToTableNode:
 # -Stream-Table join:- [https://github.com/apache/kafka/pull/18047] -- 
[~ableegoldman] 
 # StreamStreamJoinNode: -- [~ableegoldman] 
 # KTableKTableJoinNode: -- [~agavra] 
 # Source/sink nodes (PAPI/DSL):
 # StatefulProcessorNode (and children!!) <-- do this one last (should cover 
children of KTableProcessorSupplier?)

Follow up:
 * clean up StoreFactory<->StoreBuilder wrapping and configuration
 * future-proof the wrapping mechanism:
 ** ensure new processor implementations get wrapped, eg by protecting the 
InternalTopologyBuilder#addProcessor (also #addSource/Sink?)
 ** protect #addStateStore from being called out-of-band to prevent new state 
stores (whether from new DSL operators or modifications to existing ones) from 
being added to processors without being returned by the 
ProcessorSupplier#stores method 
 * consider deprecating older alternative to ProcessorSupplier#stores
 ** cons: using lambdas for processor suppliers is very nice

docs: [https://github.com/apache/kafka/pull/17906]


> Allow custom processor wrapping
> -------------------------------
>
>                 Key: KAFKA-18026
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18026
>             Project: Kafka
>          Issue Type: New Feature
>          Components: kip, streams
>            Reporter: A. Sophie Blee-Goldman
>            Assignee: A. Sophie Blee-Goldman
>            Priority: Major
>              Labels: kip
>             Fix For: 4.0.0



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
