[ 
https://issues.apache.org/jira/browse/KAFKA-7663?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7663:
-----------------------------------
    Description: 
I have set up a global store via {{StreamsBuilder#addGlobalStore}}, supplying a 
custom processor responsible for transforming each K,V record of the input 
topic into a V,K record. It works fine, and {{store.all()}} does print the 
correctly persisted V,K records. However, if I clean the local state and 
restart the streams app, the global table is reloaded, but without going 
through the supplied processor; instead, {{GlobalStateManagerImpl#restoreState}} 
is called, which simply writes the input topic's K,V records into RocksDB 
(hence bypassing the mapping function of my custom processor). This does not 
seem to be the expected behavior.
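
For illustration, here is a minimal sketch of the kind of topology described 
above (topic name, store name, serdes, and processor body are assumptions for 
the example, not taken from this ticket). The processor swaps key and value 
before writing to the global store; it is exactly this mapping that is skipped 
when the store is restored straight from the input topic:

{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class SwappedGlobalStoreExample {

    public static StreamsBuilder buildTopology() {
        final StreamsBuilder builder = new StreamsBuilder();

        builder.addGlobalStore(
            // global stores must not have a changelog; the input topic is the source of truth
            Stores.keyValueStoreBuilder(
                    Stores.persistentKeyValueStore("swapped-store"),
                    Serdes.String(),
                    Serdes.String())
                .withLoggingDisabled(),
            "input-topic",
            Consumed.with(Serdes.String(), Serdes.String()),
            () -> new AbstractProcessor<String, String>() {
                private KeyValueStore<String, String> store;

                @Override
                @SuppressWarnings("unchecked")
                public void init(final ProcessorContext context) {
                    super.init(context);
                    store = (KeyValueStore<String, String>) context.getStateStore("swapped-store");
                }

                @Override
                public void process(final String key, final String value) {
                    // stateless transformation: persist the record as V,K instead of K,V;
                    // per this ticket, this step is bypassed during restoration
                    store.put(value, key);
                }
            });

        return builder;
    }
}
{code}

On a clean start, restoration loads the raw K,V records from the input topic 
directly into the store, so the {{put(value, key)}} above is never executed.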

This is a follow-up to a Stack Overflow discussion about storing a K,V topic as 
a global table with a stateless transformation applied by a "custom" processor 
added on the global store:

[https://stackoverflow.com/questions/50993292/kafka-streams-shared-changelog-topic#comment93591818_50993729]

If we address this issue, we should also apply 
{{default.deserialization.exception.handler}} during restore (cf. KAFKA-8037).
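
For context, this is the config the note above refers to; a sketch of how an 
application would set it (application id, bootstrap servers, and the handler 
choice are placeholders for the example):

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

public class RestoreHandlerConfigExample {
    public static Properties config() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "global-store-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // this handler is applied during regular processing, but currently not
        // while a global store is restored from its input topic
        props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                  LogAndContinueExceptionHandler.class);
        return props;
    }
}
{code}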

 

  was:
I have set up a global store via {{StreamsBuilder#addGlobalStore}}, supplying a 
custom processor responsible for transforming each K,V record of the input 
topic into a V,K record. It works fine, and {{store.all()}} does print the 
correctly persisted V,K records. However, if I clean the local state and 
restart the streams app, the global table is reloaded, but without going 
through the supplied processor; instead, {{GlobalStateManagerImpl#restoreState}} 
is called, which simply writes the input topic's K,V records into RocksDB 
(hence bypassing the mapping function of my custom processor). This does not 
seem to be the expected behavior.

This is a follow-up to a Stack Overflow discussion about storing a K,V topic as 
a global table with a stateless transformation applied by a "custom" processor 
added on the global store:

[https://stackoverflow.com/questions/50993292/kafka-streams-shared-changelog-topic#comment93591818_50993729]

If we address this issue, we should also apply 
{{default.deserialization.exception.handler}} during restore.

 


> Custom Processor supplied on addGlobalStore is not used when restoring state 
> from topic
> ---------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7663
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7663
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Frederic Tardif
>            Priority: Major
>         Attachments: image-2018-11-20-11-42-14-697.png
>
>
> I have set up a global store via {{StreamsBuilder#addGlobalStore}}, supplying a 
> custom processor responsible for transforming each K,V record of the input 
> topic into a V,K record. It works fine, and {{store.all()}} does print the 
> correctly persisted V,K records. However, if I clean the local state and 
> restart the streams app, the global table is reloaded, but without going 
> through the supplied processor; instead, {{GlobalStateManagerImpl#restoreState}} 
> is called, which simply writes the input topic's K,V records into RocksDB 
> (hence bypassing the mapping function of my custom processor). This does not 
> seem to be the expected behavior.
> This is a follow-up to a Stack Overflow discussion about storing a K,V topic 
> as a global table with a stateless transformation applied by a "custom" 
> processor added on the global store:
> [https://stackoverflow.com/questions/50993292/kafka-streams-shared-changelog-topic#comment93591818_50993729]
> If we address this issue, we should also apply 
> {{default.deserialization.exception.handler}} during restore (cf. KAFKA-8037).
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
