[ https://issues.apache.org/jira/browse/KAFKA-6840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthias J. Sax updated KAFKA-6840: ----------------------------------- Priority: Major (was: Blocker) > support windowing in ktable API > ------------------------------- > > Key: KAFKA-6840 > URL: https://issues.apache.org/jira/browse/KAFKA-6840 > Project: Kafka > Issue Type: Improvement > Components: streams > Affects Versions: 1.1.0 > Reporter: Boyang Chen > Assignee: Boyang Chen > Priority: Major > Labels: api, needs-kip > > The StreamsBuilder provides table() API to materialize a changelog topic into > a local key-value store (KTable), which is very convenient. However, current > underlying implementation does not support materializing one topic to a > windowed key-value store, which in certain cases would be very useful. > To make up the gap, we proposed a new API in StreamsBuilder that could get a > windowed Ktable. > The table() API in StreamsBuilder looks like this: > public synchronized <K, V> KTable<K, V> table(final String topic, > final Consumed<K, V> > consumed, > final Materialized<K, V, > KeyValueStore<Bytes, byte[]>> materialized) { > Objects.requireNonNull(topic, "topic can't be null"); > Objects.requireNonNull(consumed, "consumed can't be null"); > Objects.requireNonNull(materialized, "materialized can't be null"); > > materialized.withKeySerde(consumed.keySerde).withValueSerde(consumed.valueSerde); > return internalStreamsBuilder.table(topic, > new ConsumedInternal<>(consumed), > new > MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-")); > } > > Where we could see that the store type is given as KeyValueStore. There is no > flexibility to change it to WindowStore. > > To maintain compatibility of the existing API, we have two options to define > a new API: > 1.Overload existing KTable struct > public synchronized <K, V> KTable<Windowed<K>, V> windowedTable(final String > topic, > final Consumed<K, V> > consumed, > final Materialized<K, V, > WindowStore<Bytes, byte[]>> materialized); > > This could give developer an alternative to use windowed table instead. > However, this implies that we need to make sure all the KTable logic still > works as expected, such as join, aggregation, etc, so the challenge would be > making sure all current KTable logics work. > > 2.Define a new type called WindowedKTable > public synchronized <K, V> WindowedKTable<K, V> windowedTable(final String > topic, > final Consumed<K, V> > consumed, > final Materialized<K, V, > WindowStore<Bytes, byte[]>> materialized); > The benefit of doing this is that we don’t need to worry about the existing > functionality of KTable. However, the cost is to introduce redundancy of > common operation logic. When upgrading common functionality, we need to take > care of both types. > We could fill in more details in the KIP. Right now I would like to hear some > feedbacks on the two approaches, thank you! -- This message was sent by Atlassian Jira (v8.20.10#820010)