[
https://issues.apache.org/jira/browse/KAFKA-7397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Frederic Tardif updated KAFKA-7397:
-----------------------------------
Description:
When consuming a GlobalKTable (with the expectation of caching all the data of
a topic in a consumer-side store), we can't apply any stateless transformation
(filter, map) prior to materializing. To achieve this while still consuming
the records of all partitions, we must first run a streams app that
pre-processes the ingress topic into an egress topic holding exactly the K,V
pairs we want to store in our GlobalKTable. This looks unnecessarily complex and
doubles the storage of the topic, while the only goal is to statelessly adapt
the data before storing it (in RocksDB) at the receiving end.
See the discussion on Stack Overflow:
[https://stackoverflow.com/questions/50993292/kafka-streams-shared-changelog-topic]
As a workaround, I have used `new Builder().addGlobalStore(....)` with a custom
Processor that filters and maps records before storing them (see attached).
Although this seems to work, I believe this functionality should be part of the
basic DSL API when working with a global table
(`new StreamsBuilder().globalTable().filter(...).map()...`).
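A minimal, library-free sketch of the custom-processor workaround follows. It does not use the Kafka Streams API: `FilterMapProcessor`, its `process` method, and the `HashMap` standing in for the RocksDB-backed global store are hypothetical names chosen for illustration. It assumes the transformation only maps values (keys stay unchanged) and treats a record that fails the filter, or a null-value tombstone, as a delete, mirroring how `KTable#filter` forwards a tombstone for dropped records.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;

public class FilterMapGlobalStoreSketch {

    // Hypothetical stand-in for the custom Processor passed to addGlobalStore:
    // it applies a stateless filter and value mapper before writing to the store.
    static final class FilterMapProcessor<K, V, VOut> {
        private final BiPredicate<K, V> keep;          // stateless filter
        private final BiFunction<K, V, VOut> mapValue;  // stateless value mapper
        private final Map<K, VOut> store;              // stand-in for the local global store

        FilterMapProcessor(BiPredicate<K, V> keep, BiFunction<K, V, VOut> mapValue, Map<K, VOut> store) {
            this.keep = keep;
            this.mapValue = mapValue;
            this.store = store;
        }

        void process(K key, V value) {
            // Tombstones and filtered-out updates evict any previously stored entry,
            // so a key that no longer matches does not leave a stale value behind.
            if (value == null || !keep.test(key, value)) {
                store.remove(key);
                return;
            }
            store.put(key, mapValue.apply(key, value));
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> store = new HashMap<>();
        // Keep non-empty strings and store only their length.
        FilterMapProcessor<String, String, Integer> processor = new FilterMapProcessor<>(
                (k, v) -> !v.isEmpty(),
                (k, v) -> v.length(),
                store);
        processor.process("a", "hello"); // kept, stored as 5
        processor.process("b", "");      // filtered out, nothing stored
        processor.process("c", "hi");    // kept, stored as 2
        processor.process("c", "");      // update now filtered out, evicts "c"
        processor.process("d", "kafka"); // kept, stored as 5
        processor.process("d", null);    // tombstone deletes "d"
        System.out.println(store);       // prints {a=5}
    }
}
```

Only the transformed records reach the store, so no intermediate egress topic is needed in this model; whether the real Kafka Streams global-store restoration path runs the same processor is outside what this sketch shows.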
> Ability to apply DSL stateless transformation on a global table
> ---------------------------------------------------------------
>
> Key: KAFKA-7397
> URL: https://issues.apache.org/jira/browse/KAFKA-7397
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Frederic Tardif
> Priority: Major
> Attachments: kafka.zip
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)