[
https://issues.apache.org/jira/browse/KAFKA-3429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15245142#comment-15245142
]
Guozhang Wang commented on KAFKA-3429:
--------------------------------------
The {{originalSerde}} is used to materialize the KTable that is going to be
repartitioned on an internal topic. It can be dropped in cases where this KTable
is generated from a source topic or is created from an aggregation, since a
{{serde}} is already specified in that previous operation. To take a concrete example:
{{<K1, V1, T> KTable<K1, T> aggregate(Initializer<T> initializer,
Aggregator<K1, V1, T> adder,
Aggregator<K1, V1, T> substractor,
KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
Serde<K1> keySerde,
Serde<V1> valueSerde,
Serde<T> aggValueSerde,
String name);}}
Here the {{keySerde}} and {{valueSerde}} are used for materializing the
selected {{<K1, V1>}} pairs, and {{keySerde}} and {{aggValueSerde}} are used
for materializing the aggregated key-values {{<K1, T>}}. In this general case
all three serdes are needed, since none of them has been provided before. In some
special cases, however, such as:
{{<K1> KTable<K1, Long> count(KeyValueMapper<K, V, K1> selector,
Serde<K1> keySerde,
Serde<V> valueSerde,
String name);}}
we can omit the {{valueSerde}}, as it may already have been provided.
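A minimal sketch of this special case, assuming the table carries the serdes it was materialized with. The names {{SimpleSerde}} and {{SketchTable}} are hypothetical and are not part of the Kafka Streams API; the sketch only illustrates that {{count}} could ask for the new key serde alone and reuse the carried value serde for the repartition topic:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Objects;

// Hypothetical names throughout; this is NOT the Kafka Streams API.
public class SerdeReuseSketch {

    /** Minimal stand-in for a serializer/deserializer pair. */
    public interface SimpleSerde<T> {
        byte[] serialize(T value);
        T deserialize(byte[] bytes);
    }

    public static final SimpleSerde<String> STRING_SERDE = new SimpleSerde<String>() {
        public byte[] serialize(String v) { return v.getBytes(StandardCharsets.UTF_8); }
        public String deserialize(byte[] b) { return new String(b, StandardCharsets.UTF_8); }
    };

    public static final SimpleSerde<Long> LONG_SERDE = new SimpleSerde<Long>() {
        public byte[] serialize(Long v) { return ByteBuffer.allocate(Long.BYTES).putLong(v).array(); }
        public Long deserialize(byte[] b) { return ByteBuffer.wrap(b).getLong(); }
    };

    /** A table handle that remembers the serdes it was materialized with. */
    public static final class SketchTable<K, V> {
        public final SimpleSerde<K> keySerde;
        public final SimpleSerde<V> valueSerde;
        // Value serde used on the repartition topic feeding this table;
        // null for a table read directly from a source topic.
        public final SimpleSerde<?> repartitionValueSerde;

        public SketchTable(SimpleSerde<K> keySerde, SimpleSerde<V> valueSerde,
                           SimpleSerde<?> repartitionValueSerde) {
            this.keySerde = Objects.requireNonNull(keySerde);
            this.valueSerde = Objects.requireNonNull(valueSerde);
            this.repartitionValueSerde = repartitionValueSerde;
        }

        /** Only the new key serde is requested; the value serde is the carried one. */
        public <K1> SketchTable<K1, Long> count(SimpleSerde<K1> newKeySerde) {
            // Records on the repartition topic would be <K1, V>, so the value
            // serde of this (upstream) table can be reused as-is.
            return new SketchTable<>(newKeySerde, LONG_SERDE, this.valueSerde);
        }
    }

    public static void main(String[] args) {
        SketchTable<String, String> source =
                new SketchTable<>(STRING_SERDE, STRING_SERDE, null);
        SketchTable<String, Long> counted = source.count(STRING_SERDE);
        // The repartition step reuses the serde the source table already had.
        System.out.println(counted.repartitionValueSerde == source.valueSerde);
    }
}
```

In the general {{aggregate}} case the selector produces a new value type {{V1}}, so a carried serde for {{V}} would not help there; the sketch applies only when the value type is unchanged.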
> Remove Serdes needed for repartitioning in KTable stateful operations
> ---------------------------------------------------------------------
>
> Key: KAFKA-3429
> URL: https://issues.apache.org/jira/browse/KAFKA-3429
> Project: Kafka
> Issue Type: Sub-task
> Components: streams
> Reporter: Guozhang Wang
> Assignee: Matthias J. Sax
> Labels: api, newbie++
> Fix For: 0.10.1.0
>
>
> Currently, KTable aggregate operations may need a repartition, since the
> aggregation key may not be the same as the original primary key, and we
> require users to provide serdes (defaulting to the configured ones) for
> reading from / writing to the internally created re-partition topic. However,
> these are not necessary, since for all KTable instances, whether generated from
> topics directly:
> {code}table = builder.table(...){code}
> or from aggregation operations:
> {code}table = stream.aggregate(...){code}
> there are already serdes provided for materializing the data, and hence the
> same serdes can be re-used when the resulting KTable is involved in future
> aggregation operations. For example:
> {code}
> table1 = stream.aggregateByKey(serde);
> table2 = table1.aggregate(aggregator, selector, originalSerde,
> aggregateSerde);
> {code}
> We would not need to require users to specify the "originalSerde" in
> table1.aggregate, since it could always reuse the "serde" from
> stream.aggregateByKey, which is used to materialize the table1 object.
> In order to get rid of it, implementation-wise we need to carry the serde
> information along with the KTableImpl instance so that it can be re-used in a
> future operation that requires repartitioning.