[ 
https://issues.apache.org/jira/browse/KAFKA-20230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Brandon Wittwer updated KAFKA-20230:
------------------------------------
    Description: 
Since KAFKA-7663 was implemented, the DSL method globalTable() lags behind the 
capabilities of addGlobalStore().

*Proposal:*
Add an overload of globalTable(...) that accepts an optional KeyValueMapper or 
Processor, which would override the ProcessorSupplier currently provided as a 
TableSource 
[here|https://github.com/apache/kafka/blob/c4631a222fb054e238af445884fb7bbc9be68970/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java#L191C1-L193C123].
I don't know Kafka internals well enough to suggest how to implement this.
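As a rough illustration of the semantics such an overload might have, the JDK-only Java sketch below applies a mapper to every topic record before materializing it, so the resulting table is keyed by a value field instead of the topic key. This is a model, not Kafka Streams code: the class ProposedGlobalTableSketch, the method materialize, and the use of java.util.function.BiFunction in place of Kafka's KeyValueMapper are all hypothetical, and the sketch ignores tombstones (null values).

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

public class ProposedGlobalTableSketch {
    // Hypothetical stand-in for a globalTable(...) overload taking a mapper.
    // BiFunction mirrors the shape of KeyValueMapper<K, V, KeyValue<KR, VR>> using JDK types only.
    public static <K, V, KR, VR> Map<KR, VR> materialize(
            List<Map.Entry<K, V>> topic,
            BiFunction<K, V, Map.Entry<KR, VR>> mapper) {
        Map<KR, VR> store = new HashMap<>();
        for (Map.Entry<K, V> rec : topic) {
            // The same mapping would apply on first ingestion and on every restore.
            Map.Entry<KR, VR> mapped = mapper.apply(rec.getKey(), rec.getValue());
            store.put(mapped.getKey(), mapped.getValue());
        }
        return store;
    }

    public static void main(String[] args) {
        // Source topic keyed by order id; the value is the customer id.
        List<Map.Entry<String, String>> orders = List.of(
                new SimpleEntry<>("order-1", "cust-7"),
                new SimpleEntry<>("order-2", "cust-9"));
        // Re-key so the materialized table is keyed by customer id instead.
        Map<String, String> byCustomer = materialize(orders, (k, v) -> new SimpleEntry<>(v, k));
        System.out.println(byCustomer.get("cust-7")); // order-1
    }
}
```

In this sketch, if two source records map to the same new key, the later record overwrites the earlier one.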

 

*Justification:*

Prior to KAFKA-7663, the Processor provided in addGlobalStore was not used 
during the restore phase; instead, the raw topic was projected into memory, 
leading to failures. Developers were required to maintain a separate stream 
processor to populate a source topic for addGlobalStore. Our teams tried to be 
clever and created a two-phase deploy: the first phase prevented the primary 
topology from being created, leaving just a subtopology running to populate 
this intermediate topic so that the global state store was hydrated up to "now"; 
the second phase redeployed with the primary topology enabled. This is very 
ugly, requires lag monitoring between deploys, and is hard to explain to new 
developers.

KAFKA-7663 made the supplied stateless Processor reliably apply not only on 
initial ingestion but also during restore. This makes it possible to re-key 
records into a global state store for use in process() steps.
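The difference this makes during restore can be modeled with plain Java collections. In the hypothetical model below, a "topic" is a list of records and a global store is a HashMap; rawRestore copies records verbatim (the pre-KAFKA-7663 behavior described above), while processorRestore replays each record through the same update function used at ingestion. The class GlobalStoreRestoreModel, the Rec type, and the e-mail re-keying example are illustrative assumptions, not Kafka internals.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

public class GlobalStoreRestoreModel {
    // Hypothetical stand-in for a record read from the global topic.
    public record Rec(String key, String value) {}

    // Raw restore: copies topic records verbatim, bypassing the processor.
    public static Map<String, String> rawRestore(List<Rec> topic) {
        Map<String, String> store = new HashMap<>();
        for (Rec r : topic) {
            store.put(r.key(), r.value());
        }
        return store;
    }

    // Processor restore: replays each record through the ingestion-time update logic.
    public static Map<String, String> processorRestore(
            List<Rec> topic, BiConsumer<Rec, Map<String, String>> processor) {
        Map<String, String> store = new HashMap<>();
        for (Rec r : topic) {
            processor.accept(r, store);
        }
        return store;
    }

    // Example processor: re-key "userId -> email" records into "email -> userId".
    public static void reKeyByEmail(Rec r, Map<String, String> store) {
        store.put(r.value(), r.key());
    }

    public static void main(String[] args) {
        List<Rec> topic = List.of(new Rec("u1", "a@x.com"), new Rec("u2", "b@x.com"));
        // Raw copy leaves the store keyed by userId, so a lookup by email misses.
        System.out.println(rawRestore(topic).get("a@x.com")); // null
        // Replaying through the processor rebuilds the re-keyed view.
        System.out.println(processorRestore(topic, GlobalStoreRestoreModel::reKeyByEmail).get("a@x.com")); // u1
    }
}
```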

 

  was:
Prior to KAFKA-7663, developers were required to maintain a separate stream 
processor or latent sub-topology to populate a source topic to be used by 
addGlobalStore, because the Processor provided in addGlobalStore was not used 
during the restore phase; the raw topic was projected into memory instead. 
KAFKA-7663 made the supplied Processor reliably apply not only on initial 
ingestion but also during restore. This makes it possible to re-key records 
into a global state store for use in process() steps.

The DSL method globalTable() now lags behind addGlobalStore in this respect.

 

Proposal:
Add an overload of globalTable(...) that accepts an optional KeyValueMapper, 
which would override the ProcessorSupplier currently provided as a TableSource 
[here|https://github.com/apache/kafka/blob/c4631a222fb054e238af445884fb7bbc9be68970/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java#L191C1-L193C123].
I don't know Kafka internals well enough to suggest how to implement this.

Further Justification:

Our teams have created unfortunate workarounds for this scenario, which 
require either multiple independent stream processors to be created so that the 
mutations required on <K,V> are 


> Allow globalTable to provide an optional KeyValueMapper to materialize an 
> alternative K,V
> -----------------------------------------------------------------------------------------
>
>                 Key: KAFKA-20230
>                 URL: https://issues.apache.org/jira/browse/KAFKA-20230
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Brandon Wittwer
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
