[ https://issues.apache.org/jira/browse/KAFKA-3429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15245142#comment-15245142 ]
Guozhang Wang commented on KAFKA-3429:
--------------------------------------

The {{originalSerde}} is used to materialize the KTable that is going to be repartitioned through an internal topic. In cases where this KTable is generated from a source topic or created by an aggregation, a serde has already been specified in that previous operation.

To take a concrete example:

{{<K1, V1, T> KTable<K1, T> aggregate(Initializer<T> initializer, Aggregator<K1, V1, T> adder, Aggregator<K1, V1, T> substractor, KeyValueMapper<K, V, KeyValue<K1, V1>> selector, Serde<K1> keySerde, Serde<V1> valueSerde, Serde<T> aggValueSerde, String name);}}

Here {{keySerde}} and {{valueSerde}} are used to materialize the selected {{<K1, V1>}} pairs, and {{keySerde}} and {{aggValueSerde}} are used to materialize the aggregated key-value pairs {{<K1, T>}}. In this general case all three serdes are needed, since none of them has been provided earlier. In some special cases, however, such as:

{{<K1> KTable<K1, Long> count(KeyValueMapper<K, V, K1> selector, Serde<K1> keySerde, Serde<V> valueSerde, String name);}}

we can save the {{valueSerde}}: {{V}} is the value type of the original KTable, so its serde may already have been provided when that table was materialized.

> Remove Serdes needed for repartitioning in KTable stateful operations
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-3429
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3429
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Matthias J. Sax
>              Labels: api, newbie++
>             Fix For: 0.10.1.0
>
>
> Currently in KTable aggregate operations, where a repartition may be
> needed because the aggregation key may differ from the original primary
> key, we require users to provide serdes (defaulting to the configured ones)
> to read from and write to the internally created repartition topic.
> However, these are not necessary, since for all KTable instances, whether
> generated directly from topics:
> {code}table = builder.table(...){code}
> or from aggregation operations:
> {code}table = stream.aggregate(...){code}
> serdes have already been provided for materializing the data, and hence the
> same serdes can be reused when the resulting KTable is involved in later
> aggregation operations. For example:
> {code}
> table1 = stream.aggregateByKey(serde);
> table2 = table1.aggregate(aggregator, selector, originalSerde, aggregateSerde);
> {code}
> We would not need to require users to specify the "originalSerde" in
> table1.aggregate, since it could always reuse the "serde" from
> stream.aggregateByKey, which is used to materialize the table1 object.
> In order to get rid of it, implementation-wise we need to carry the serde
> information along with the KTableImpl instance, so that it can be reused in
> a future operation that requires repartitioning.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
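The idea of carrying the serde information along with the table can be illustrated with a small, self-contained model. This is a sketch under stated assumptions, not Kafka's implementation. `SketchSerde`, `SketchSerdes`, `SketchTable` and `SerdeReuseSketch` are hypothetical stand-ins defined here, and no Kafka classes are used. The repartition topic is simulated by an in-memory list of serialized records. The aggregation runs once over a static snapshot, so no subtractor is modeled. The table keeps the serdes it was materialized with. Its `aggregate` asks only for serdes of the new key type and the aggregate value, and it reuses the carried value serde for the repartitioned records. This corresponds to the special case in the comment where the selected value type is the table's own value type.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Demo entry point; declared first so single-file `java` launching finds main.
class SerdeReuseSketch {
    public static void main(String[] args) {
        Map<String, String> userCity = new LinkedHashMap<>();
        userCity.put("alice", "berlin");
        userCity.put("bob", "paris");
        userCity.put("carol", "berlin");
        // Serdes are specified once, when the source table is materialized.
        SketchTable<String, String> users =
                new SketchTable<>(userCity, SketchSerdes.STRING, SketchSerdes.STRING);
        // Re-key by city and count: no serde for the original values is passed.
        SketchTable<String, Long> perCity = users.aggregate(
                (user, city) -> city, 0L, (count, city) -> count + 1,
                SketchSerdes.STRING, SketchSerdes.LONG);
        System.out.println(perCity.rows);
    }
}

// Hypothetical stand-in for a serde; NOT Kafka's Serde interface.
interface SketchSerde<T> {
    byte[] serialize(T value);
    T deserialize(byte[] bytes);
}

final class SketchSerdes {
    static final SketchSerde<String> STRING = new SketchSerde<String>() {
        @Override public byte[] serialize(String v) { return v.getBytes(StandardCharsets.UTF_8); }
        @Override public String deserialize(byte[] b) { return new String(b, StandardCharsets.UTF_8); }
    };
    static final SketchSerde<Long> LONG = new SketchSerde<Long>() {
        @Override public byte[] serialize(Long v) { return ByteBuffer.allocate(Long.BYTES).putLong(v).array(); }
        @Override public Long deserialize(byte[] b) { return ByteBuffer.wrap(b).getLong(); }
    };

    private SketchSerdes() {}
}

// Hypothetical, heavily simplified stand-in for KTableImpl that carries the
// serdes it was materialized with, so a later repartition can reuse them.
final class SketchTable<K, V> {
    final Map<K, V> rows;
    final SketchSerde<K> keySerde;
    final SketchSerde<V> valueSerde;

    SketchTable(Map<K, V> rows, SketchSerde<K> keySerde, SketchSerde<V> valueSerde) {
        this.rows = rows;
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
    }

    // Re-keys each row with `selector` (values keep type V) and aggregates per new key.
    // Only serdes for the NEW types K1 and T are requested from the caller.
    <K1, T> SketchTable<K1, T> aggregate(BiFunction<K, V, K1> selector, T initial,
                                         BiFunction<T, V, T> adder,
                                         SketchSerde<K1> newKeySerde,
                                         SketchSerde<T> aggValueSerde) {
        // Simulated repartition topic: each record is {serialized key, serialized value}.
        List<byte[][]> repartitionTopic = new ArrayList<>();
        for (Map.Entry<K, V> row : rows.entrySet()) {
            K1 newKey = selector.apply(row.getKey(), row.getValue());
            // The value is written with the serde this table already carries.
            repartitionTopic.add(new byte[][] {
                    newKeySerde.serialize(newKey), valueSerde.serialize(row.getValue())});
        }
        // Consumer side of the topic: decode each record and fold it into the aggregate.
        Map<K1, T> result = new LinkedHashMap<>();
        for (byte[][] record : repartitionTopic) {
            K1 key = newKeySerde.deserialize(record[0]);
            V value = valueSerde.deserialize(record[1]);
            result.put(key, adder.apply(result.getOrDefault(key, initial), value));
        }
        // The result keeps its own serdes for any further repartitioning;
        // in this sketch aggValueSerde is only carried, the result stays in memory.
        return new SketchTable<>(result, newKeySerde, aggValueSerde);
    }
}
```

Running it should print `{berlin=2, paris=1}`. The result table also carries its own serdes (`newKeySerde` and `aggValueSerde`). A further re-keying of `perCity` would therefore not need its value serde to be passed again. That is the reuse the issue asks for.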