[ 
https://issues.apache.org/jira/browse/KAFKA-9260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16989216#comment-16989216
 ] 

Guozhang Wang commented on KAFKA-9260:
--------------------------------------

One reason for the current design is that we used to build the topology first 
and then pass in the config that contains the default serdes, hence it is done 
in two consecutive steps.

However, now with optimization we already have a 
StreamsBuilder.build(Properties) overload, and in practice it would not be too 
aggressive to "always" require the configs to be ready when building the 
topology anyway. So off the top of my head I think:

1) removing the other `StreamsBuilder.build()` without params to always enforce 
configs being passed in.
2) removing the `KafkaStreams` constructor that takes a topology and a config, 
so that it takes only the topology, since the topology should already contain 
the configs.

Then in the topology building phase, we can have a uniform framework: "if 
there's an inheritable serde, use it; otherwise use the default serde from 
the config directly, and apply any wrapping logic if necessary". WDYT?
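
A minimal sketch of that uniform rule, to make the proposal concrete. It is 
not Kafka code: `Serde` here is a hypothetical stand-in that only carries a 
name, and `resolve`, `wrapper`, and the serde names are made up for 
illustration. It also assumes one reading of the rule, namely that the 
operator's wrapping is applied whether the serde was inherited or came from 
the config default.

```java
import java.util.function.UnaryOperator;

public class SerdeResolutionSketch {

    // Hypothetical stand-in for a real serde; it only carries a name so the
    // example runs without Kafka on the classpath.
    public static final class Serde {
        public final String name;

        public Serde(String name) {
            this.name = name;
        }
    }

    // Uniform rule at topology-build time:
    //  1. use the inherited (pushed-down) serde if there is one,
    //  2. otherwise fall back to the default serde from the config,
    //  3. then apply the operator-specific wrapper (identity if none needed).
    public static Serde resolve(Serde inherited, Serde configDefault,
                                UnaryOperator<Serde> wrapper) {
        Serde base = (inherited != null) ? inherited : configDefault;
        return wrapper.apply(base);
    }

    public static void main(String[] args) {
        Serde configDefault = new Serde("DefaultSerde");
        // Hypothetical wrapper, e.g. what a windowed aggregation might do.
        UnaryOperator<Serde> windowed =
            s -> new Serde("Windowed(" + s.name + ")");

        // An upstream serde is available, so it is wrapped.
        System.out.println(
            resolve(new Serde("UserSerde"), configDefault, windowed).name);
        // Nothing to inherit, so the config default is wrapped instead.
        System.out.println(resolve(null, configDefault, windowed).name);
        // An operator that needs no wrapping passes the identity.
        System.out.println(
            resolve(null, configDefault, UnaryOperator.identity()).name);
    }
}
```

The point of the sketch is that, once the config is available at build time, 
the fallback and the wrapping happen in one place, so no runtime code has to 
patch up default serdes later.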

> Improve Serde "push down" and "wrapping"
> ----------------------------------------
>
>                 Key: KAFKA-9260
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9260
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Priority: Major
>
> The Kafka Streams DSL supports a "serde push down" feature that lets 
> downstream operators inherit upstream serdes if key and/or value are not 
> modified. Furthermore, some operators "wrap" user-specified serdes 
> internally (e.g., windowed aggregations wrap the user-specified key serde 
> with a time/session window serde, some stores use `ValueAndTimestampSerde`, 
> and foreign-key joins also use some internal wrappers).
> The current implementation faces a couple of issues, because the "serde push 
> down" feature is a DSL-level feature that is used when the Topology is 
> generated. Furthermore, "serde wrapping" is an operator-specific feature, not 
> a DSL concept per se. At runtime, neither "push down" nor "wrapping" is a 
> known concept.
> This design implies that if users specify serdes, wrapping and push down 
> work as expected. However, if we fall back to default serdes, special care 
> needs to be taken: for example, some operators do not apply the wrapping 
> logic at translation time, and there is additional code that does the 
> wrapping of default serdes at runtime. Another approach would be to wrap a 
> null serde and update the wrapper later (i.e., overwrite `null` with the 
> default serde from the config).
> Overall, the current design leads to bugs (e.g., KAFKA-9248 and KAFKA-9259) 
> and to user confusion about how it actually works and when/where to specify 
> serdes. Hence, we should consider reworking how we do serde push down and/or 
> wrapping.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
