[ 
https://issues.apache.org/jira/browse/KAFKA-3429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15245142#comment-15245142
 ] 

Guozhang Wang commented on KAFKA-3429:
--------------------------------------

The {{originalSerde}} is used to materialize the KTable that is going to be 
repartitioned on an internal topic, and in cases where this KTable is generated 
from a source topic or is created from an aggregation, where a `serde` is 
specified in that previous operation. To take an concrete example:

{{<K1, V1, T> KTable<K1, T> aggregate(Initializer<T> initializer,
                                        Aggregator<K1, V1, T> adder,
                                        Aggregator<K1, V1, T> substractor,
                                        KeyValueMapper<K, V, KeyValue<K1, V1>> 
selector,
                                        Serde<K1> keySerde,
                                        Serde<V1> valueSerde,
                                        Serde<T> aggValueSerde,
                                        String name);}}

Here the {{keySerde}} and {{valueSerde}} are used for materializing the 
selected {{<K1, V1>}} pairs, and {{keySerde}} and {{aggValueSerde}} are used 
for materializing the aggregated key-values {{<K1, T>}}. In this general case 
all three serdes are needed since they are not provided before. While in some 
special cases like:

{{<K1> KTable<K1, Long> count(KeyValueMapper<K, V, K1> selector,
                                Serde<K1> keySerde,
                                Serde<V> valueSerde,
                                String name);}}

We can save the {{valueSerde}} as it may be already provided.

> Remove Serdes needed for repartitioning in KTable stateful operations
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-3429
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3429
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Matthias J. Sax
>              Labels: api, newbie++
>             Fix For: 0.10.1.0
>
>
> Currently in KTable aggregate operations where a repartition is possibly 
> needed since the aggregation key may not be the same as the original primary 
> key, we require the users to provide serdes (default to configured ones) for 
> read / write to the internally created re-partition topic. However, these are 
> not necessary since for all KTable instances either generated from the topics 
> directly:
> {code}table = builder.table(...){code}
> or from aggregation operations:
> {code}table = stream.aggregate(...){code}
> There are already serde provided for materializing the data, and hence the 
> same serde can be re-used when the resulted KTable is involved in future 
> aggregation operations. For example:
> {code}
> table1 = stream.aggregateByKey(serde);
> table2 = table1.aggregate(aggregator, selector, originalSerde, 
> aggregateSerde);
> {code}
> We would not need to require users to specify the "originalSerde" in 
> table1.aggregate since it could always reuse the "serde" from 
> stream.aggregateByKey, which is used to materialize the table1 object.
> In order to get ride of it, implementation-wise we need to carry the serde 
> information along with the KTableImpl instance in order to re-use it in a 
> future operation that requires repartitioning.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to