Hi all,
after some further discussion, I think the best way to show my idea of how the
API should evolve is a bigger mock/interface description (below).
The goal is to reduce the store-maintaining processors to only the
aggregators and KTableSource, with KTableSource being optionally
materialized.
Introducing KTable::copy() will allow users to maintain state twice if
they really want to. KStream::join*() wasn't touched: I have never used it
personally, so I didn't feel comfortable enough changing it. I'm still making
up my mind there. None of the suggestions so far has made it queryable;
Guozhang's 'Buffered' idea seems ideal here.
Please have a look. Looking forward to your opinions.
Best, Jan
On 21.06.2017 17:24, Eno Thereska wrote:
(cc’ing user-list too)
Given that we already have StateStoreSuppliers that are configurable using the
fluent-like API, probably it’s worth discussing the other examples with joins
and serdes first since those have many overloads and are in need of some TLC.
So following your example, I guess you’d have something like:
.join()
.withKeySerdes(…)
.withValueSerdes(…)
.withJoinType("outer")
etc?
I like the approach since it still remains declarative and it’d reduce the
number of overloads by quite a bit.
Eno
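To make the "one join plus fluent options" idea concrete, here is a minimal, self-contained sketch over plain in-memory maps. `JoinSpec`, `ToyTableJoin`, and their methods are hypothetical names for illustration, not Kafka Streams API; serde options are left out so the toy stays runnable, and a missing side is passed to the joiner as null, as in left/outer table joins.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.TreeSet;
import java.util.function.BiFunction;

// Hypothetical: one join entry point whose variant is chosen by an immutable,
// fluent JoinSpec instead of separate join/leftJoin/outerJoin overloads.
final class JoinSpec {
    enum JoinType { INNER, LEFT, OUTER }

    private final JoinType joinType;

    private JoinSpec(JoinType joinType) { this.joinType = joinType; }

    // Default spec; further options (serdes, store name, ...) would be added
    // as extra with* methods rather than as new join overloads.
    static JoinSpec inner() { return new JoinSpec(JoinType.INNER); }

    // Returns a new instance, so a spec can be shared and reused safely.
    JoinSpec withJoinType(JoinType type) { return new JoinSpec(Objects.requireNonNull(type)); }

    JoinType joinType() { return joinType; }
}

final class ToyTableJoin {
    // Joins two in-memory "tables" (maps without null values) by key. A missing
    // side is passed to the joiner as null, as in left and outer table joins.
    static <K extends Comparable<K>, V, VO, VR> Map<K, VR> join(
            Map<K, V> left, Map<K, VO> right,
            BiFunction<? super V, ? super VO, ? extends VR> joiner, JoinSpec spec) {
        TreeSet<K> keys = new TreeSet<>(left.keySet());
        if (spec.joinType() == JoinSpec.JoinType.OUTER) {
            keys.addAll(right.keySet());
        }
        Map<K, VR> result = new LinkedHashMap<>();
        for (K key : keys) {
            V v = left.get(key);
            VO vo = right.get(key);
            boolean emit = spec.joinType() != JoinSpec.JoinType.INNER || (v != null && vo != null);
            if (emit) {
                result.put(key, joiner.apply(v, vo));
            }
        }
        return result;
    }
}
```

Adding a new join option in this shape means one more `with*` method on the spec, while the number of `join` methods stays at one.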
On Jun 21, 2017, at 3:37 PM, Damian Guy <damian....@gmail.com> wrote:
Hi,
I'd like to get a discussion going around some of the API choices we've
made in the DSL, in particular those that relate to stateful operations
(though the discussion could expand beyond those).
As it stands, we lean heavily on overloaded methods in the API, e.g., there
are 9 overloads for KGroupedStream.count(..)! It is becoming noisy and I
feel it is only going to get worse as we add more optional params. In
particular, we've had some requests to be able to turn caching off, or
change log configs, on a per-operator basis (note this can be done now if
you pass in a StateStoreSupplier, but this can be a bit cumbersome).
So this is a bit of an open question. How can we change the DSL overloads
so that it flows, is simple to use and understand, and is easily extended
in the future?
One option would be to use a fluent API approach for providing the optional
params, so something like this:
groupedStream.count()
.withStoreName("name")
.withCachingEnabled(false)
.withLoggingEnabled(config)
.table()
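A minimal sketch of how this first option could behave, assuming `count()` returns an intermediate request object and the terminal `table()` applies the collected settings. `ToyGroupedStream`, `CountRequest`, `CountedTable`, and the default store name are hypothetical; the "stream" is just a list of already-grouped keys.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the "fluent on the result" option: count() returns an
// intermediate request object, and nothing is computed until table() is called.
final class ToyGroupedStream {
    private final List<String> keys; // grouped records, reduced to their keys

    ToyGroupedStream(List<String> keys) { this.keys = keys; }

    CountRequest count() { return new CountRequest(keys); }

    static final class CountRequest {
        private final List<String> keys;
        private String storeName = "count-store"; // hypothetical default name
        private boolean cachingEnabled = true;
        private Map<String, String> logConfig = Collections.emptyMap();

        private CountRequest(List<String> keys) { this.keys = keys; }

        CountRequest withStoreName(String name) { storeName = name; return this; }
        CountRequest withCachingEnabled(boolean enabled) { cachingEnabled = enabled; return this; }
        CountRequest withLoggingEnabled(Map<String, String> config) { logConfig = config; return this; }

        // Terminal call: applies the accumulated settings and computes the counts.
        CountedTable table() {
            Map<String, Long> counts = new TreeMap<>();
            for (String key : keys) {
                counts.merge(key, 1L, Long::sum);
            }
            return new CountedTable(storeName, cachingEnabled, logConfig, counts);
        }
    }

    static final class CountedTable {
        final String storeName;
        final boolean cachingEnabled;
        final Map<String, String> logConfig;
        final Map<String, Long> counts;

        CountedTable(String storeName, boolean cachingEnabled,
                     Map<String, String> logConfig, Map<String, Long> counts) {
            this.storeName = storeName;
            this.cachingEnabled = cachingEnabled;
            this.logConfig = logConfig;
            this.counts = counts;
        }
    }
}
```

One property of this shape: if a caller forgets the terminal `table()`, no table is produced at all.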
Another option would be to provide a Builder to the count method, so it
would look something like this:
groupedStream.count(new CountBuilder("storeName").withCachingEnabled(false).build())
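For comparison, a minimal sketch of the Builder option, assuming the store name is mandatory (so it is a constructor argument) and every other setting has a default. `CountOptions`, `CountBuilder`, and `ToyCounter.describeCount` are hypothetical; the point is that `count(...)` needs exactly one overload that accepts the built options.

```java
import java.util.Objects;

// Hypothetical sketch of the builder option: a builder produces an immutable
// options object, and count(...) needs a single overload that accepts it.
final class CountOptions {
    final String storeName;
    final boolean cachingEnabled;
    final boolean loggingEnabled;

    private CountOptions(CountBuilder b) {
        this.storeName = b.storeName;
        this.cachingEnabled = b.cachingEnabled;
        this.loggingEnabled = b.loggingEnabled;
    }

    static final class CountBuilder {
        private final String storeName;        // mandatory, so it is a constructor argument
        private boolean cachingEnabled = true; // optional settings carry defaults
        private boolean loggingEnabled = true;

        CountBuilder(String storeName) { this.storeName = Objects.requireNonNull(storeName); }
        CountBuilder withCachingEnabled(boolean enabled) { this.cachingEnabled = enabled; return this; }
        CountBuilder withLoggingEnabled(boolean enabled) { this.loggingEnabled = enabled; return this; }
        CountOptions build() { return new CountOptions(this); }
    }
}

final class ToyCounter {
    // Stand-in for count(CountOptions): a new option means a new builder
    // method, not a new count(...) overload.
    static String describeCount(CountOptions o) {
        return "count into '" + o.storeName + "' caching=" + o.cachingEnabled
                + " logging=" + o.loggingEnabled;
    }
}
```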
Another option is to say: hey, we don't need this, what are you on about!
The above has focussed on state store related overloads, but the same ideas
could be applied to joins etc, where we presently have many join methods
and many overloads.
Anyway, I look forward to hearing your opinions.
Thanks,
Damian
@InterfaceStability.Evolving
public interface KTable<K, V> {
KTable<K, V> filter(final Predicate<? super K, ? super V> predicate);
KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate);
<VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper);
KStream<K, V> toStream();
// Inserts a new KTableSource.
KTable<K, V> copy();
// Inserts a new KTableSource, using toStream() as its parent.
KTable<K, V> copy(Materialized<K, V> m);
// I see why these exist, but I'd rather have users use to() + table().
KTable<K, V> through(final String topic);
KTable<K, V> through(final Produced<K, V> produced,
        final String topic);
void to(final String topic);
void to(final Produced<K, V> produced,
        final String topic);
<KR, VR> KGroupedTable<KR, VR> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector);
<KR, VR> KGroupedTable<KR, VR> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector,
        final Serialized<KR, VR> serialized);
<VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
        final ValueJoiner<? super V, ? super VO, ? extends VR> joiner);
<VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
        final ValueJoiner<? super V, ? super VO, ? extends VR> joiner);
<VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
        final ValueJoiner<? super V, ? super VO, ? extends VR> joiner);
// Calling this enables sending old values and materializes the table.
UninitializedQueryHandle<K, V> queryHandle();
// Currently marked deprecated; easily reproduced with map() or similar.
void writeAsText(final String filePath);
void writeAsText(final String filePath,
final String streamName);
void writeAsText(final String filePath,
final Serde<K> keySerde,
final Serde<V> valSerde);
void writeAsText(final String filePath,
final String streamName,
final Serde<K> keySerde,
final Serde<V> valSerde);
void foreach(final ForeachAction<? super K, ? super V> action);
}
public interface UninitializedQueryHandle<K,V>{
QueryHandle<K,V> initialize(KafkaStreams ks);
}
public interface QueryHandle<K,V> {
V get(K k);
}
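A minimal sketch of the two-phase handle, assuming the handle is requested while the topology is built and bound to the running instance only in `initialize(...)`. `ToyRuntime` is a hypothetical stand-in for `KafkaStreams` that simply holds named in-memory stores, and `ToyTable` is likewise hypothetical; the two interfaces mirror the mock above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the two-phase handle from the mock. ToyRuntime stands
// in for KafkaStreams and simply holds named in-memory stores.
interface QueryHandle<K, V> {
    V get(K key);
}

interface UninitializedQueryHandle<K, V> {
    QueryHandle<K, V> initialize(ToyRuntime runtime);
}

final class ToyRuntime {
    private final Map<String, Map<Object, Object>> stores = new HashMap<>();

    Map<Object, Object> store(String name) {
        return stores.computeIfAbsent(name, n -> new HashMap<>());
    }
}

final class ToyTable<K, V> {
    private final String storeName;

    ToyTable(String storeName) { this.storeName = storeName; }

    UninitializedQueryHandle<K, V> queryHandle() {
        // The store is looked up only when initialize(...) receives the runtime.
        return runtime -> {
            Map<Object, Object> store = runtime.store(storeName);
            return key -> {
                @SuppressWarnings("unchecked")
                V value = (V) store.get(key);
                return value;
            };
        };
    }
}
```

Splitting the type this way means a handle that was never initialized has no `get` to call, so "query before start" cannot be expressed.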
public interface Produced<K, V> {
static <K, V> Produced<K, V> with();
Produced<K, V> serializer(Serialized<K, V> s);
Produced<K, V> partitioner(StreamPartitioner<K, V> sp);
// Sneaky new feature; skippable.
Produced<K, V> internalTopic();
// Is the hint ignored when co-grouping enforces a different count, or is an
// exception thrown? This sneaks in a new feature, but it might really help
// through() bring some value.
Produced<K, V> internalTopic(int numPartitionsHint);
}
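The open question in the `internalTopic(int numPartitionsHint)` comment can be sketched as a small resolution rule. This assumes one of the two answers the comment raises (an enforced partition count wins and the hint is ignored; throwing would be the alternative). `ToyProduced`, `resolvePartitions`, and the fallback to the source topic's partition count are all hypothetical.

```java
import java.util.OptionalInt;

// Hypothetical, immutable sketch of the Produced config from the mock, limited
// to the internal-topic partition hint. Every call returns a new instance.
final class ToyProduced {
    private final boolean internalTopic;
    private final OptionalInt numPartitionsHint;

    private ToyProduced(boolean internalTopic, OptionalInt hint) {
        this.internalTopic = internalTopic;
        this.numPartitionsHint = hint;
    }

    static ToyProduced with() { return new ToyProduced(false, OptionalInt.empty()); }

    ToyProduced internalTopic() { return new ToyProduced(true, numPartitionsHint); }

    ToyProduced internalTopic(int numPartitionsHint) {
        if (numPartitionsHint <= 0) {
            throw new IllegalArgumentException("hint must be positive");
        }
        return new ToyProduced(true, OptionalInt.of(numPartitionsHint));
    }

    boolean isInternalTopic() { return internalTopic; }

    // `required` is present when another operator (e.g. co-grouping) enforces a
    // partition count. This sketch lets that requirement win and ignores the
    // hint; without either, it falls back to the source partition count.
    int resolvePartitions(OptionalInt required, int sourcePartitions) {
        if (required.isPresent()) {
            return required.getAsInt();
        }
        return numPartitionsHint.orElse(sourcePartitions);
    }
}
```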
public interface Serialized<K, V> {
static <K, V> Serialized<K, V> using();
Serialized<K, V> keySerde(Serde<K> s);
Serialized<K, V> valueSerde(Serde<V> s);
}
public interface KGroupedTable<K,V>{
KTable<K, V> reduce(Reducer<V> adder,
Reducer<V> subtractor,
String name);
<T> KTable<K, T> aggregate(Initializer<T> initializer,
        Aggregator<K, V, T> adder,
        Aggregator<K, V, T> subtractor,
        Serde<T> aggValueSerde,
        String name);
<T> KTable<K, T> aggregate(Initializer<T> initializer,
        Aggregator<K, V, T> adder,
        Aggregator<K, V, T> subtractor,
        String name);
KTable<K, Long> count(String name);
// Would allow sneaking in a different aggregate store:
// users can choose their aggregate store freely.
KGroupedTable<K, V> resultStored(Materialized m);
}
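The reason `KGroupedTable` aggregations take both an adder and a subtractor is that a table update replaces a key's previous value. The following is a minimal in-memory sketch of that contract. `ToyGroupedTable` and its constructor are hypothetical, and as a simplification the group is derived from the value alone, whereas the real `groupBy` maps both key and value.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;

// Sketch of the adder/subtractor contract of a grouped-table aggregation: an
// update replaces the key's old value, so the old value is subtracted from its
// group and the new value is added to its (possibly different) group.
// Simplification: the group is derived from the value alone.
final class ToyGroupedTable<K, V, G, T> {
    private final Map<K, V> table = new HashMap<>();  // latest value per upstream key
    private final Map<G, T> result = new HashMap<>(); // aggregate per group
    private final Function<V, G> groupOf;
    private final Supplier<T> initializer;
    private final BiFunction<V, T, T> adder;
    private final BiFunction<V, T, T> subtractor;

    ToyGroupedTable(Function<V, G> groupOf, Supplier<T> initializer,
                    BiFunction<V, T, T> adder, BiFunction<V, T, T> subtractor) {
        this.groupOf = groupOf;
        this.initializer = initializer;
        this.adder = adder;
        this.subtractor = subtractor;
    }

    void upsert(K key, V newValue) {
        V oldValue = table.put(key, newValue);
        if (oldValue != null) {
            // The old value was added earlier, so its group is present in result.
            G oldGroup = groupOf.apply(oldValue);
            result.put(oldGroup, subtractor.apply(oldValue, result.get(oldGroup)));
        }
        G newGroup = groupOf.apply(newValue);
        T current = result.containsKey(newGroup) ? result.get(newGroup) : initializer.get();
        result.put(newGroup, adder.apply(newValue, current));
    }

    Map<G, T> result() { return result; }
}
```

For example, counting users per region: when "alice" moves from "eu" to "us", the subtractor decrements "eu" and the adder increments "us".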
public class KStreamBuilder {
public <K, V> KTable<K, V> table(String topic);
public <K, V> KTable<K, V> table(Serialized<K, V> s, String topic);
// Kinda ugly; materialization is still postponed until needed.
public <K, V> KTable<K, V> table(Materialized<K, V> m, String topic);
// Silently materializes the global table.
public <K, V> GlobalKTable<K, V> globalTable();
public <K, V> GlobalKTable<K, V> globalTable(Materialized<K, V> m);
// KStream methods are kept the same for now.
}
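"Materialization is still postponed until needed" could work roughly as follows: the source records whether a store is wanted and only creates one at build time. This is a minimal sketch under the assumption that a store is needed either because the user passed a Materialized-like config or because a downstream operator (e.g. a join or a query handle) asked for one. `ToyTableSource`, `requireStore()`, and the generated `"-store"` name are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of deferred materialization: a table source creates a
// store at build time only if the user asked for one or a downstream operator
// declared that it needs one.
final class ToyTableSource {
    private final String topic;
    private final String materializedStoreName; // non-null when the user passed a Materialized-like config
    private boolean requiredDownstream;

    private ToyTableSource(String topic, String materializedStoreName) {
        this.topic = topic;
        this.materializedStoreName = materializedStoreName;
    }

    static ToyTableSource table(String topic) {
        return new ToyTableSource(topic, null);
    }

    static ToyTableSource table(String materializedStoreName, String topic) {
        return new ToyTableSource(topic, materializedStoreName);
    }

    // Called by operators that must look up the current value for a key.
    void requireStore() { requiredDownstream = true; }

    // Names of the stores this source would create; empty means it only forwards records.
    List<String> build() {
        List<String> stores = new ArrayList<>();
        if (materializedStoreName != null) {
            stores.add(materializedStoreName);
        } else if (requiredDownstream) {
            stores.add(topic + "-store"); // hypothetical generated name
        }
        return stores;
    }
}
```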
public interface Materialized<K, V> {
static <K, V> Materialized<K, V> with();
Materialized<K, V> serializer(Serialized<K, V> s);
Materialized<K, V> supplier(StateStoreSupplier sss);
// Leaving these out: not logging takes away determinism, which is too
// drastic for me, especially with all the optimisations being planned.
// Disabling logging really means "we have the data somewhere else".
// Materialized<K, V> loggingEnabled(Map<String, String> config);
// Materialized<K, V> loggingDisabled();
Materialized<K, V> cachingEnabled();
Materialized<K, V> cachingDisabled();
}
// I definitely feel less comfortable on the streams side; I don't really use
// them day to day.
@InterfaceStability.Evolving
public interface KGroupedStream<K, V> {
KTable<K, Long> count();
KTable<K, V> reduce(final Reducer<V> reducer);
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
        final Aggregator<? super K, ? super V, VR> aggregator,
        final Serde<VR> aggValueSerde);
KGroupedStream<K, V> resultStored(Materialized m);
KGroupedSessionedStream<K, V> intoSessions(SessionWindows sessionWindows);
KGroupedSessionedStream<K, V> intoSessions(SessionWindows sessionWindows,
        Materialized m);
<W extends Window> KGroupedWindowedStream<K, V> intoWindows(Windows<W> windows);
<W extends Window> KGroupedWindowedStream<K, V> intoWindows(Windows<W> windows, Materialized m);
}
public interface KGroupedSessionedStream<K,V>{
KTable<Windowed<K>, V> reduce(final Reducer<V> reducer);
KTable<Windowed<K>, Long> count();
<T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
        final Aggregator<? super K, ? super V, T> aggregator,
        final Merger<? super K, T> sessionMerger,
        final Serde<T> aggValueSerde);
}
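The `sessionMerger` argument is needed because a late record can bridge two existing sessions. A minimal single-key sketch of session counting, assuming boundaries within the inactivity gap are treated as inclusive (not a claim about Kafka's exact boundary semantics). `ToySessionCounter` is hypothetical; for counts, merging two sessions is just addition.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

// Sketch of session-window counting for a single key. A record joins every
// existing session within the inactivity gap of its timestamp; when it bridges
// several sessions they are merged, which is the job of the Merger argument
// for general aggregates.
final class ToySessionCounter {
    static final class Session {
        final long start;
        final long end;
        final long count;

        Session(long start, long end, long count) {
            this.start = start;
            this.end = end;
            this.count = count;
        }

        @Override
        public String toString() { return "[" + start + "," + end + "]=" + count; }
    }

    private final long gapMs;
    private final List<Session> sessions = new ArrayList<>();

    ToySessionCounter(long gapMs) { this.gapMs = gapMs; }

    void add(long timestampMs) {
        long start = timestampMs;
        long end = timestampMs;
        long count = 1; // the new record on its own
        Iterator<Session> it = sessions.iterator();
        while (it.hasNext()) {
            Session s = it.next();
            if (s.end >= timestampMs - gapMs && s.start <= timestampMs + gapMs) {
                start = Math.min(start, s.start);
                end = Math.max(end, s.end);
                count += s.count; // for counts, merging two sessions means adding them
                it.remove();
            }
        }
        sessions.add(new Session(start, end, count));
        sessions.sort(Comparator.comparingLong(s -> s.start));
    }

    List<Session> sessions() { return sessions; }
}
```

With a gap of 10, records at 0 and 20 form two sessions; an out-of-order record at 10 then merges them into one session [0,20] with count 3.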
public interface KGroupedWindowedStream<K,V>{
<W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
        final Aggregator<? super K, ? super V, VR> aggregator,
        final Serde<VR> aggValueSerde);
<W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer);
<W extends Window> KTable<Windowed<K>, Long> count();
}
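For reference, what `intoWindows(...).count()` computes for tumbling windows can be sketched in a few lines: each record falls into the window [start, start + size) containing its timestamp, and counts are kept per (key, window start), which is what the `Windowed<K>` result key represents. `ToyTumblingCount` is hypothetical, and hopping windows (where one record lands in several windows) are not covered.

```java
import java.util.Map;
import java.util.TreeMap;

// Sketch of tumbling-window counting: each record is assigned to the window
// [start, start + size) containing its timestamp, and counts are kept per
// (key, window start), mirroring a Windowed<K> result key.
final class ToyTumblingCount {
    private final long sizeMs;
    private final Map<String, TreeMap<Long, Long>> counts = new TreeMap<>();

    ToyTumblingCount(long sizeMs) {
        if (sizeMs <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        this.sizeMs = sizeMs;
    }

    void add(String key, long timestampMs) {
        // floorMod keeps the window start correct for negative timestamps too.
        long windowStart = timestampMs - Math.floorMod(timestampMs, sizeMs);
        counts.computeIfAbsent(key, k -> new TreeMap<>()).merge(windowStart, 1L, Long::sum);
    }

    long count(String key, long windowStart) {
        TreeMap<Long, Long> perKey = counts.get(key);
        return perKey == null ? 0L : perKey.getOrDefault(windowStart, 0L);
    }
}
```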