[
https://issues.apache.org/jira/browse/KAFKA-18026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
A. Sophie Blee-Goldman updated KAFKA-18026:
-------------------------------------------
Description:
See KIP-1112:
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-1112%3A+allow+custom+processor+wrapping]
Implementation plan/wrapping procedure for DSL operators
* make sure processor is added via ProcessorParameters#addProcessorTo instead
of calling the InternalTopologyBuilder's #addProcessor and #addStateStore
methods directly
* convert stateful operators to implement the ProcessorSupplier#stores method,
rather than directly calling #addStateStore and/or
#connectProcessorAndStateStore
* update and/or add ProcessorWrapper tests to StreamsBuilderTest (some
existing tests may use processors that haven't been converted yet and are
expected to break/need fixing)
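A minimal sketch of the first two steps, for illustration only. The Mini* types below are simplified, hypothetical stand-ins and are NOT the real Kafka Streams interfaces; only the idea (one entry point that wraps the supplier, then registers whatever stores the supplier itself reports) is taken from the plan above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class WrappingSketch {
    // Simplified stand-ins for illustration; NOT the real Kafka Streams types.
    interface MiniProcessor { String process(String value); }

    interface MiniSupplier {
        MiniProcessor get();
        // The supplier declares the stores its processor needs.
        default Set<String> stores() { return Set.of(); }
    }

    // Hypothetical wrapper hook: receives a supplier, returns a decorated one.
    interface MiniWrapper { MiniSupplier wrap(String processorName, MiniSupplier supplier); }

    static final class MiniTopologyBuilder {
        final Map<String, MiniSupplier> processors = new LinkedHashMap<>();
        final Map<String, List<String>> storeConnections = new LinkedHashMap<>();
        private final MiniWrapper wrapper;

        MiniTopologyBuilder(MiniWrapper wrapper) { this.wrapper = wrapper; }

        // Single entry point: wrap first, then connect the stores the
        // wrapped supplier reports, so nothing is added out-of-band.
        void addProcessorTo(String name, MiniSupplier supplier) {
            MiniSupplier wrapped = wrapper.wrap(name, supplier);
            processors.put(name, wrapped);
            for (String store : wrapped.stores()) {
                storeConnections.computeIfAbsent(store, s -> new ArrayList<>()).add(name);
            }
        }
    }

    // A "stateful operator" that exposes its store through stores().
    static MiniSupplier countingSupplier() {
        return new MiniSupplier() {
            public MiniProcessor get() { return value -> value + ":counted"; }
            public Set<String> stores() { return Set.of("counts-store"); }
        };
    }

    // A wrapper that logs each record and passes the declared stores through.
    static MiniWrapper loggingWrapper(List<String> log) {
        return (name, inner) -> new MiniSupplier() {
            public MiniProcessor get() {
                MiniProcessor delegate = inner.get();
                return value -> {
                    log.add(name + " saw " + value);
                    return delegate.process(value);
                };
            }
            public Set<String> stores() { return inner.stores(); }
        };
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        MiniTopologyBuilder builder = new MiniTopologyBuilder(loggingWrapper(log));
        builder.addProcessorTo("count", countingSupplier());

        System.out.println(builder.processors.get("count").get().process("a"));
        System.out.println(log);
        System.out.println(builder.storeConnections);
    }
}
```

Because the stores are read from the wrapped supplier, a wrapper in this sketch can see (and could decorate) every store the processor uses, which is not possible when stores are attached through a separate call.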
TODO list:
# -Non-source/sink PAPI processors:-
[https://github.com/apache/kafka/pull/17892] -- [~ableegoldman]
# -ProcessorGraphNode (stateless DSL KStream operators):-
[https://github.com/apache/kafka/pull/17892] -- [~ableegoldman]
# -KTableSource:- [https://github.com/apache/kafka/pull/17903] -- [~agavra]
# KStream/TableAggregate (i.e. count, reduce, aggregate):
[https://github.com/apache/kafka/pull/17929] -- [~ableegoldman]
# TableProcessorNode (stateless table operators, e.g. KTable#filter):
# StreamToTableNode:
# Stream-Table join: -- [~ableegoldman]
# StreamStreamJoinNode: -- [~ableegoldman]
# KTableKTableJoinNode:
# Source/sink nodes (PAPI/DSL):
# StatefulProcessorNode <-- do this one last
Follow up:
* clean up StoreFactory<->StoreBuilder wrapping and configuration
* future-proof the wrapping mechanism:
** ensure new processor implementations get wrapped, e.g. by protecting
InternalTopologyBuilder#addProcessor (also #addSource/Sink?)
** protect #addStateStore from being called out-of-band to prevent new state
stores (whether from new DSL operators or modifications to existing ones) from
being added to processors without being returned by the
ProcessorSupplier#stores method
* open question: should we go ahead and deprecate the old method of connecting
state stores entirely?
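One possible shape for the #addStateStore protection, sketched under assumptions: GuardedRegistry and its methods are hypothetical names invented for this example and do not exist in Kafka Streams. The idea is simply to reject any store connection that the supplier did not declare up front.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class StoreGuardSketch {
    // Hypothetical registry for illustration; not part of Kafka Streams.
    static final class GuardedRegistry {
        private final Map<String, Set<String>> declaredStores = new HashMap<>();
        private final Map<String, List<String>> connections = new HashMap<>();

        // Called once per processor with the stores its supplier reported.
        void registerProcessor(String processorName, Set<String> storesFromSupplier) {
            declaredStores.put(processorName, Set.copyOf(storesFromSupplier));
        }

        // Rejects connections to stores the supplier never declared.
        void connect(String processorName, String storeName) {
            Set<String> declared = declaredStores.get(processorName);
            if (declared == null || !declared.contains(storeName)) {
                throw new IllegalStateException(
                    "Store " + storeName + " was not declared by the supplier of " + processorName);
            }
            connections.computeIfAbsent(processorName, p -> new ArrayList<>()).add(storeName);
        }

        List<String> connectionsOf(String processorName) {
            return connections.getOrDefault(processorName, List.of());
        }
    }

    public static void main(String[] args) {
        GuardedRegistry registry = new GuardedRegistry();
        registry.registerProcessor("aggregate", Set.of("agg-store"));

        registry.connect("aggregate", "agg-store"); // declared, so accepted
        try {
            registry.connect("aggregate", "side-store"); // undeclared, so rejected
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

A runtime check like this is only one option; making the method inaccessible to DSL operator code would catch the same mistake at compile time.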
docs: [https://github.com/apache/kafka/pull/17906]
was:
See KIP-1112:
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-1112%3A+allow+custom+processor+wrapping]
Implementation plan/wrapping procedure for DSL operators
* make sure processor is added via ProcessorParameters#addProcessorTo instead
of calling the InternalTopologyBuilder's #addProcessor and #addStateStore
methods directly
* convert stateful operators to implement the ProcessorSupplier#stores method,
rather than directly calling #addStateStore and/or
#connectProcessorAndStateStore
TODO list:
# -Non-source/sink PAPI processors:
[https://github.com/apache/kafka/pull/17892] [~ableegoldman]-
# -ProcessorGraphNode (stateless DSL KStream operators):
[https://github.com/apache/kafka/pull/17892] [~ableegoldman]-
# -KTableSource [https://github.com/apache/kafka/pull/17903] [~agavra]-
# KStream/TableAggregate (i.e. count, reduce, aggregate):
[https://github.com/apache/kafka/pull/17929] [~ableegoldman]
# TableProcessorNode (stateless table operators, e.g. KTable#filter):
# StreamToTableNode:
# Stream-Table join:
# StreamStreamJoinNode:
# KTableKTableJoinNode:
# Source/sink nodes (PAPI/DSL):
# StatefulProcessorNode <-- do this one last
Follow up:
* clean up StoreFactory<->StoreBuilder wrapping and configuration
* future-proof the wrapping mechanism:
** ensure new processor implementations get wrapped, e.g. by protecting
InternalTopologyBuilder#addProcessor (also #addSource/Sink?)
** protect #addStateStore from being called out-of-band to prevent new state
stores (whether from new DSL operators or modifications to existing ones) from
being added to processors without being returned by the
ProcessorSupplier#stores method
* open question: should we go ahead and deprecate the old method of connecting
state stores entirely?
docs: [https://github.com/apache/kafka/pull/17906]
> Allow custom processor wrapping
> -------------------------------
>
> Key: KAFKA-18026
> URL: https://issues.apache.org/jira/browse/KAFKA-18026
> Project: Kafka
> Issue Type: New Feature
> Components: kip, streams
> Reporter: A. Sophie Blee-Goldman
> Assignee: A. Sophie Blee-Goldman
> Priority: Major
> Labels: kip
> Fix For: 4.0.0