[
https://issues.apache.org/jira/browse/KAFKA-20230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Brandon Wittwer updated KAFKA-20230:
------------------------------------
Description:
The DSL method globalTable() now lags behind addGlobalStore's capabilities
after KAFKA-7663 was implemented.
*Proposal:*
Allow for an overload of globalTable(...) that accepts an optional
KeyValueMapper or Processor, which would override the ProcessorSupplier that
is currently provided as a TableSource
[here.|https://github.com/apache/kafka/blob/c4631a222fb054e238af445884fb7bbc9be68970/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java#L191C1-L193C123]
I don't know Kafka internals well enough to suggest how to do this.
*Justification:*
Prior to KAFKA-7663, the Processor provided in addGlobalStore was not used
during the restore phase; the raw topic was instead projected into memory,
leading to failures. Developers were required to maintain either a separate
stream processor or a latent sub-topology to populate a source topic to be
used by addGlobalStore. Our teams tried to be clever and created a two-phase
deploy that prevented the primary topology from being created, leaving just a
subtopology running to initialize this intermediate topic in order to hydrate
the global state store up to "now" prior to redeploying with the primary
topology enabled. This is very ugly, requires lag monitoring between deploys,
and is hard to explain to new developers.
KAFKA-7663 made it such that the supplied stateless Processor is reliably used
not only on initial ingestion but also on restore. This gives the benefit of
being able to re-key into a global state store for use in process() steps.
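To illustrate the restore behaviour described above, here is a minimal sketch in plain Java. It does not use the Kafka Streams API at all: the class name, the BY_EMAIL mapper, the "email,name" value layout, and the sample records are all assumptions made up for this example. It only models why replaying the same mapper on restore keeps a re-keyed store consistent, while copying the raw topic does not.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

/** Kafka-free model of a global store fed from a topic. All names are hypothetical. */
final class GlobalStoreRekeyModel {

    /** Hypothetical mapper: topic is keyed by user id, the store is keyed by email. */
    static final BiFunction<String, String, Map.Entry<String, String>> BY_EMAIL =
        (userId, value) -> {
            // Assumed value layout for this sketch: "email,name".
            String[] parts = value.split(",", 2);
            return Map.entry(parts[0], parts[1]);
        };

    /** Made-up records standing in for the source topic. */
    static List<Map.Entry<String, String>> sampleTopic() {
        return List.of(
            Map.entry("u1", "alice@example.com,Alice"),
            Map.entry("u2", "bob@example.com,Bob"));
    }

    /** Initial ingestion: every record passes through the mapper before being stored. */
    static Map<String, String> ingest(
            List<Map.Entry<String, String>> topic,
            BiFunction<String, String, Map.Entry<String, String>> mapper) {
        Map<String, String> store = new LinkedHashMap<>();
        for (Map.Entry<String, String> rec : topic) {
            Map.Entry<String, String> kv = mapper.apply(rec.getKey(), rec.getValue());
            store.put(kv.getKey(), kv.getValue());
        }
        return store;
    }

    /** Models the older restore described above: raw topic copied, mapper bypassed. */
    static Map<String, String> restoreRaw(List<Map.Entry<String, String>> topic) {
        return ingest(topic, Map::entry);
    }

    /** Models restore after KAFKA-7663: the same mapper is replayed over the topic. */
    static Map<String, String> restoreWithMapper(
            List<Map.Entry<String, String>> topic,
            BiFunction<String, String, Map.Entry<String, String>> mapper) {
        return ingest(topic, mapper);
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> topic = sampleTopic();
        System.out.println("live:            " + ingest(topic, BY_EMAIL));
        System.out.println("raw restore:     " + restoreRaw(topic));
        System.out.println("mapper restore:  " + restoreWithMapper(topic, BY_EMAIL));
    }
}
```

In this model a lookup by email succeeds against the live store and the mapper-based restore, but misses after a raw restore because that store is still keyed by user id. An optional mapper on globalTable(...) would presumably need the same property, being applied on both paths.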
was:
Prior to KAFKA-7663, developers were required to maintain a separate stream
processor or latent sub-topology to populate a source topic to be used by
addGlobalStore because the Processor provided in addGlobalStore was not used
during the restore phase, instead projecting the raw topic into memory.
KAFKA-7663 made it such that the supplied Processor is reliably used not only
on initial ingestion but also on restore. This gives the benefit of being
able to re-key into a global state store for use in process() steps.
The DSL method globalTable() now lags behind addGlobalStore in this respect.
Proposal:
Allow for an overload of globalTable(...) that accepts an optional
KeyValueMapper, which would override the ProcessorSupplier that is currently
provided as a TableSource
[here.|https://github.com/apache/kafka/blob/c4631a222fb054e238af445884fb7bbc9be68970/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java#L191C1-L193C123]
I don't know Kafka internals well enough to suggest how to do this.
Further Justification:
Our teams have created unfortunate workarounds for this scenario, which require
either multiple independent stream processors to be created so that the
mutations required on <K,V> are
> Allow globalTable to provide an optional KeyValueMapper to materialize an
> alternative K,V
> -----------------------------------------------------------------------------------------
>
> Key: KAFKA-20230
> URL: https://issues.apache.org/jira/browse/KAFKA-20230
> Project: Kafka
> Issue Type: Improvement
> Reporter: Brandon Wittwer
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)