[
https://issues.apache.org/jira/browse/KAFKA-18026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
A. Sophie Blee-Goldman updated KAFKA-18026:
-------------------------------------------
Description:
See KIP-1112:
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-1112%3A+allow+custom+processor+wrapping]
Implementation plan/wrapping procedure for DSL operators:
* make sure each processor is added via ProcessorParameters#addProcessorTo
instead of calling the InternalTopologyBuilder's #addProcessor and
#addStateStore methods directly
* convert stateful operators to implement the ProcessorSupplier#stores method,
rather than directly calling #addStateStore and/or
#connectProcessorAndStateStore
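The two steps above can be sketched roughly as follows. This is a minimal, self-contained illustration and not the actual Kafka Streams code: every type here (Processor, ProcessorSupplier, ProcessorWrapper, TopologyBuilder, ProcessorParameters) is a simplified, hypothetical stand-in for the real class of a similar name, state stores are reduced to plain names, and the method signatures are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

// Every type below is a simplified, hypothetical stand-in,
// not the real Kafka Streams class of a similar name.
class ProcessorWrappingSketch {

    interface Processor {
        String process(String value);
    }

    // Stand-in for ProcessorSupplier: stores are declared via stores(), here as plain names.
    interface ProcessorSupplier extends Supplier<Processor> {
        default Set<String> stores() {
            return Collections.emptySet();
        }
    }

    // Stand-in for the custom wrapping hook.
    interface ProcessorWrapper {
        ProcessorSupplier wrap(String processorName, ProcessorSupplier supplier);
    }

    // Stand-in for InternalTopologyBuilder: only records what was registered.
    static final class TopologyBuilder {
        final Map<String, ProcessorSupplier> processors = new LinkedHashMap<>();
        final Map<String, Set<String>> storesByProcessor = new LinkedHashMap<>();

        void addProcessor(String name, ProcessorSupplier supplier) {
            processors.put(name, supplier);
        }

        void addStateStore(String storeName, String processorName) {
            storesByProcessor.computeIfAbsent(processorName, k -> new LinkedHashSet<>()).add(storeName);
        }
    }

    // Stand-in for ProcessorParameters: the single place where wrapping and registration happen.
    static final class ProcessorParameters {
        final String name;
        final ProcessorSupplier supplier;

        ProcessorParameters(String name, ProcessorSupplier supplier) {
            this.name = name;
            this.supplier = supplier;
        }

        void addProcessorTo(TopologyBuilder builder, ProcessorWrapper wrapper) {
            ProcessorSupplier wrapped = wrapper.wrap(name, supplier);
            builder.addProcessor(name, wrapped);
            // Stores come from stores(), not from a direct addStateStore call in the operator.
            for (String store : wrapped.stores()) {
                builder.addStateStore(store, name);
            }
        }
    }

    // A stateful operator that declares its store through stores().
    static final class CountSupplier implements ProcessorSupplier {
        @Override
        public Processor get() {
            Map<String, Integer> counts = new HashMap<>(); // in-memory stand-in for a state store
            return value -> value + "=" + counts.merge(value, 1, Integer::sum);
        }

        @Override
        public Set<String> stores() {
            return Collections.singleton("count-store");
        }
    }

    // Example wrapper: records every processed value and forwards stores() to the delegate.
    static final class RecordingWrapper implements ProcessorWrapper {
        final List<String> seen = new ArrayList<>();

        @Override
        public ProcessorSupplier wrap(String processorName, ProcessorSupplier supplier) {
            return new ProcessorSupplier() {
                @Override
                public Processor get() {
                    Processor inner = supplier.get();
                    return value -> {
                        seen.add(processorName + ":" + value);
                        return inner.process(value);
                    };
                }

                @Override
                public Set<String> stores() {
                    return supplier.stores();
                }
            };
        }
    }

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        RecordingWrapper wrapper = new RecordingWrapper();
        new ProcessorParameters("count", new CountSupplier()).addProcessorTo(builder, wrapper);

        Processor processor = builder.processors.get("count").get();
        System.out.println(processor.process("a"));
        System.out.println(builder.storesByProcessor);
        System.out.println(wrapper.seen);
    }
}
```

Note that the wrapper in this sketch has to forward stores() to the delegate; if it fell back to the default empty set, the store would never be connected, which is one reason to route all registration through a single method.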
TODO list:
# -Non-source/sink PAPI processors:
([https://github.com/apache/kafka/pull/17892])-
# -ProcessorGraphNode (stateless DSL KStream operators):
([https://github.com/apache/kafka/pull/17892])-
# -KTableSource ([https://github.com/apache/kafka/pull/17903])-
# KStream/TableAggregate (i.e. count, reduce, aggregate):
https://github.com/apache/kafka/pull/17929
# TableProcessorNode (stateless table operators, e.g. KTable#filter):
# StreamToTableNode:
# Stream-Table join:
# StreamStreamJoinNode:
# KTableKTableJoinNode:
# Source/sink nodes (PAPI/DSL):
# StatefulProcessorNode <-- do this one last
Follow up:
* clean up StoreFactory<->StoreBuilder wrapping and configuration
* future-proof the wrapping mechanism:
** ensure new processor implementations get wrapped, e.g. by protecting
InternalTopologyBuilder#addProcessor (also #addSource/Sink?)
** protect #addStateStore from being called out-of-band, so that new state
stores (whether from new DSL operators or modifications to existing ones) cannot
be added to processors without being returned by the
ProcessorSupplier#stores method
* open question: should we go ahead and deprecate the old method of connecting
state stores entirely?
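One possible shape for the #addStateStore protection is sketched below, purely as an illustration. GuardedBuilder, its methods, and the use of plain store names are hypothetical and chosen for this example; this is not the InternalTopologyBuilder API, and the real fix may look quite different.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; none of these types are the real Kafka Streams classes.
class StoreGuardSketch {

    interface ProcessorSupplier {
        Set<String> stores();
    }

    static final class GuardedBuilder {
        private final Map<String, Set<String>> declaredStores = new HashMap<>();
        private final Map<String, Set<String>> connectedStores = new HashMap<>();

        // Registering a processor connects exactly the stores its supplier declares.
        void addProcessor(String processorName, ProcessorSupplier supplier) {
            Set<String> declared = new HashSet<>(supplier.stores());
            declaredStores.put(processorName, declared);
            connectedStores.put(processorName, new HashSet<>(declared));
        }

        // Out-of-band path: rejected unless the store was declared via stores().
        void connectProcessorAndStateStore(String processorName, String storeName) {
            Set<String> declared = declaredStores.get(processorName);
            if (declared == null || !declared.contains(storeName)) {
                throw new IllegalStateException(
                    "Store " + storeName + " is not returned by the supplier of " + processorName);
            }
            connectedStores.get(processorName).add(storeName);
        }

        Set<String> storesOf(String processorName) {
            return connectedStores.getOrDefault(processorName, new HashSet<>());
        }
    }

    public static void main(String[] args) {
        GuardedBuilder builder = new GuardedBuilder();
        builder.addProcessor("agg", () -> Collections.singleton("agg-store"));
        try {
            builder.connectProcessorAndStateStore("agg", "rogue-store");
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        System.out.println(builder.storesOf("agg"));
    }
}
```

A guard like this fails fast at topology-build time, which would surface a missed conversion in a new DSL operator immediately rather than leaving a store silently unwrapped.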
docs: https://github.com/apache/kafka/pull/17906
was:
See KIP-1112:
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-1112%3A+allow+custom+processor+wrapping]
To wrap:
# PAPI processors (DONE: )
# Source & sink processors
# Stateless DSL operators
To wrap & convert to ProcessorSupplier#stores:
# TableProcessorNode (https://github.com/apache/kafka/pull/17903)
# StreamToTableNode
# StatefulProcessorNode
# KTableKTableJoinNode
> Allow custom processor wrapping
> -------------------------------
>
> Key: KAFKA-18026
> URL: https://issues.apache.org/jira/browse/KAFKA-18026
> Project: Kafka
> Issue Type: New Feature
> Components: kip, streams
> Reporter: A. Sophie Blee-Goldman
> Assignee: A. Sophie Blee-Goldman
> Priority: Major
> Labels: kip
> Fix For: 4.0.0