Hi all,

After some further discussion, the best way to show my idea of how the API should evolve is a bigger mock/interface description. The goal is to reduce the store-maintaining processors to only the Aggregators and KTableSource, with KTableSource optionally materialized.

Introducing KTable::copy() will allow users to maintain state twice if they really want to. KStream::join*() wasn't touched: I have never used it personally, so I didn't feel comfortable enough changing it, and I'm still making up my mind. None of the suggestions made it queryable so far; Guozhang's 'Buffered' idea seems ideal here.

Please have a look. Looking forward to your opinions.

Best,
Jan



On 21.06.2017 17:24, Eno Thereska wrote:
(cc’ing user-list too)

Given that we already have StateStoreSuppliers that are configurable using the 
fluent-like API, probably it’s worth discussing the other examples with joins 
and serdes first since those have many overloads and are in need of some TLC.

So following your example, I guess you’d have something like:
.join()
    .withKeySerdes(…)
    .withValueSerdes(…)
    .withJoinType("outer")

etc?

I like the approach since it still remains declarative and it’d reduce the 
number of overloads by quite a bit.
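A minimal sketch of what such a chain could compile down to, assuming a hypothetical immutable JoinOptions value with withX methods. None of these names exist in Kafka Streams, and serdes are reduced to plain names so the sketch builds without Kafka on the classpath. The join type is an enum rather than the string "outer", so a typo fails at compile time instead of at runtime.

```java
import java.util.Objects;

// Hypothetical join types; an enum instead of the string "outer".
enum JoinType { INNER, LEFT, OUTER }

// Hypothetical immutable options object for a fluent join. Serdes are plain
// names here, standing in for real Serde instances.
final class JoinOptions {
    private final String keySerde;   // null: fall back to the configured default
    private final String valueSerde; // null: fall back to the configured default
    private final JoinType joinType;

    private JoinOptions(String keySerde, String valueSerde, JoinType joinType) {
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
        this.joinType = joinType;
    }

    // Assumed defaults: no explicit serdes, inner join.
    static JoinOptions defaults() {
        return new JoinOptions(null, null, JoinType.INNER);
    }

    // Every withX returns a new instance, so a shared defaults() value can
    // never be modified by one caller behind another caller's back.
    JoinOptions withKeySerde(String serde) {
        return new JoinOptions(Objects.requireNonNull(serde), valueSerde, joinType);
    }

    JoinOptions withValueSerde(String serde) {
        return new JoinOptions(keySerde, Objects.requireNonNull(serde), joinType);
    }

    JoinOptions withJoinType(JoinType type) {
        return new JoinOptions(keySerde, valueSerde, Objects.requireNonNull(type));
    }

    String keySerde() { return keySerde; }
    String valueSerde() { return valueSerde; }
    JoinType joinType() { return joinType; }
}
```

Immutability matters for this style: a single join(other, joiner, options) overload could accept such a value, and options objects can be safely reused across several joins.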

Eno

On Jun 21, 2017, at 3:37 PM, Damian Guy <damian....@gmail.com> wrote:

Hi,

I'd like to get a discussion going around some of the API choices we've
made in the DSL, in particular those that relate to stateful operations
(though this could expand).
As it stands, we lean heavily on overloaded methods in the API, i.e., there
are 9 overloads for KGroupedStream.count(..)! It is becoming noisy, and I
feel it is only going to get worse as we add more optional params. In
particular, we've had some requests to be able to turn caching off, or
change changelog configs, on a per-operator basis (note this can be done
now if you pass in a StateStoreSupplier, but this can be a bit cumbersome).

So this is a bit of an open question. How can we change the DSL overloads
so that it flows, is simple to use and understand, and is easily extended
in the future?

One option would be to use a fluent API approach for providing the optional
params, so something like this:

groupedStream.count()
   .withStoreName("name")
   .withCachingEnabled(false)
   .withLoggingEnabled(config)
   .table()
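To make the fluent option concrete, here is a sketch assuming count() returns a hypothetical mutable CountSpec that only becomes something buildable at the terminal table() call. CountSpec and CountNode are invented for illustration, the defaults are assumptions, and the log config is a plain map of topic-level settings.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical immutable result of the chain: what the DSL would build from.
final class CountNode {
    final String storeName;
    final boolean cachingEnabled;
    final Map<String, String> logConfig;

    CountNode(String storeName, boolean cachingEnabled, Map<String, String> logConfig) {
        this.storeName = storeName;
        this.cachingEnabled = cachingEnabled;
        this.logConfig = logConfig;
    }
}

// Hypothetical spec returned by count(); options accumulate until table().
final class CountSpec {
    private String storeName;                 // null: let the DSL generate a name
    private boolean cachingEnabled = true;    // assumed default
    private Map<String, String> logConfig = new HashMap<>();

    CountSpec withStoreName(String name) { storeName = name; return this; }

    CountSpec withCachingEnabled(boolean enabled) { cachingEnabled = enabled; return this; }

    CountSpec withLoggingEnabled(Map<String, String> config) {
        logConfig = new HashMap<>(config);    // defensive copy of the caller's map
        return this;
    }

    // Terminal call: the options are frozen only here, so the DSL knows the
    // chain is complete and can create the processor node.
    CountNode table() {
        return new CountNode(storeName, cachingEnabled,
                Collections.unmodifiableMap(new HashMap<>(logConfig)));
    }
}
```

The terminal table() is the price of this style: without it the DSL cannot tell when the caller has finished adding options.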



Another option would be to provide a Builder to the count method, so it
would look something like this:
groupedStream.count(new CountBuilder("storeName").withCachingEnabled(false).build())
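The Builder variant can be sketched the same way, assuming a hypothetical CountBuilder whose constructor takes the required store name and whose build() returns an immutable CountOptions for count(...) to consume. Caching being on by default is an assumption of the sketch.

```java
// Hypothetical immutable options value that count(...) would accept.
final class CountOptions {
    final String storeName;
    final boolean cachingEnabled;

    CountOptions(String storeName, boolean cachingEnabled) {
        this.storeName = storeName;
        this.cachingEnabled = cachingEnabled;
    }
}

// Hypothetical builder: required arguments go in the constructor, optional
// ones through withX methods, and build() produces the immutable value.
final class CountBuilder {
    private final String storeName;
    private boolean cachingEnabled = true; // assumed default

    CountBuilder(String storeName) {
        if (storeName == null || storeName.isEmpty()) {
            throw new IllegalArgumentException("storeName is required");
        }
        this.storeName = storeName;
    }

    CountBuilder withCachingEnabled(boolean enabled) {
        this.cachingEnabled = enabled;
        return this;
    }

    CountOptions build() {
        return new CountOptions(storeName, cachingEnabled);
    }
}
```

Compared with the fluent chain, required arguments are enforced when the builder is constructed, at the cost of an extra new/build() at every call site.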

Another option is to say: Hey we don't need this, what are you on about!

The above has focused on state store related overloads, but the same ideas
could be applied to joins etc., where we presently have many join methods
and many overloads.

Anyway, I look forward to hearing your opinions.

Thanks,
Damian

@InterfaceStability.Evolving
public interface KTable<K, V> {

    KTable<K, V> filter(final Predicate<? super K, ? super V> predicate);
    KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate);
    <VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper);

    KStream<K, V> toStream();

    KTable<K, V> copy(); // inserts a new KTableSource
    KTable<K, V> copy(Materialized m); // inserts a new KTableSource using toStream() as parent

    // I see why, but I'd rather have users use to() + table()
    KTable<K, V> through(final String topic);
    KTable<K, V> through(final Produced<K, V> produced,
                         final String topic);

    void to(final String topic);
    void to(final Produced<K, V> produced,
            final String topic);

    <KR, VR> KGroupedTable<KR, VR> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector);
    <KR, VR> KGroupedTable<KR, VR> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector,
                                           Serialized s);

    <VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
                                final ValueJoiner<? super V, ? super VO, ? extends VR> joiner);

    <VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
                                    final ValueJoiner<? super V, ? super VO, ? extends VR> joiner);

    <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
                                     final ValueJoiner<? super V, ? super VO, ? extends VR> joiner);

    UninitializedQueryHandle<K, V> queryHandle(); // enables sending old values and materializes the table



    // Currently marked deprecated; easily reproduced with map() or similar
    void writeAsText(final String filePath);
    void writeAsText(final String filePath,
                     final String streamName);
    void writeAsText(final String filePath,
                     final Serde<K> keySerde,
                     final Serde<V> valSerde);
    void writeAsText(final String filePath,
                     final String streamName,
                     final Serde<K> keySerde,
                     final Serde<V> valSerde);
    void foreach(final ForeachAction<? super K, ? super V> action);
}


public interface UninitializedQueryHandle<K,V>{

        QueryHandle<K,V> initialize(KafkaStreams ks);
}

public interface QueryHandle<K,V> {

        V get(K k);

}
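A rough sketch of the two-phase query handle, assuming the uninitialized handle only knows a store name at topology-build time and is bound to a running instance later. RunningApp is a hypothetical stand-in for KafkaStreams, NamedStoreHandle is invented for illustration, and a plain Map stands in for the state store.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a running KafkaStreams instance: it can only
// look up a live store by name.
final class RunningApp {
    private final Map<String, Map<?, ?>> stores = new HashMap<>();

    void register(String name, Map<?, ?> store) { stores.put(name, store); }

    Map<?, ?> store(String name) {
        Map<?, ?> s = stores.get(name);
        if (s == null) throw new IllegalStateException("no store named " + name);
        return s;
    }
}

interface QueryHandle<K, V> {
    V get(K k);
}

interface UninitializedQueryHandle<K, V> {
    QueryHandle<K, V> initialize(RunningApp app);
}

// Created while the topology is built, when only the store name is known.
final class NamedStoreHandle<K, V> implements UninitializedQueryHandle<K, V> {
    private final String storeName;

    NamedStoreHandle(String storeName) { this.storeName = storeName; }

    @Override
    public QueryHandle<K, V> initialize(RunningApp app) {
        @SuppressWarnings("unchecked")
        Map<K, V> store = (Map<K, V>) app.store(storeName);
        return store::get; // from here on, reads go to the live store
    }
}
```

The split keeps KafkaStreams out of the topology-building API: the handle can be obtained from the KTable before the application starts and only needs the running instance at query time.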

public interface Produced<K,V>{

        static <K, V> Produced<K, V> with();

        Produced<K,V> serializer(Serialized s);

        Produced<K,V> partitioner(StreamPartitioner sp);

        // Sneaky new feature; skippable.
        Produced<K,V> internalTopic();
        // Is the hint ignored when co-grouping enforces a different partition
        // count, or is an exception thrown? This sneaks in a new feature; I don't
        // care much either way, but it might help through() bring some value.
        Produced<K,V> internalTopic(int numPartitionsHint);
}

public interface Serialized<K,V>{

        static <K, V> Serialized<K, V> using();

        Serialized<K,V> keySerde(Serde<K> s);

        Serialized<K,V> valueSerde(Serde<V> s);
}
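A sketch of Serialized as an immutable options value, assuming a null serde means "not set here, use the configured default". Serde below is a one-method stand-in for Kafka's Serde, and keySerdeOr/valueSerdeOr are hypothetical helpers.

```java
// Minimal stand-in for Kafka's Serde<T>; only serialization is modelled.
interface Serde<T> {
    byte[] serialize(T value);
}

// Hypothetical immutable version of the Serialized<K, V> mock.
final class Serialized<K, V> {
    private final Serde<K> keySerde;   // null: not set, use the default
    private final Serde<V> valueSerde; // null: not set, use the default

    private Serialized(Serde<K> keySerde, Serde<V> valueSerde) {
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
    }

    // Method-level type parameters: K and V come from the call site.
    static <K, V> Serialized<K, V> using() {
        return new Serialized<>(null, null);
    }

    Serialized<K, V> keySerde(Serde<K> s) { return new Serialized<>(s, valueSerde); }

    Serialized<K, V> valueSerde(Serde<V> s) { return new Serialized<>(keySerde, s); }

    // What an operator would do: prefer the explicit serde, else the default.
    Serde<K> keySerdeOr(Serde<K> fallback) { return keySerde != null ? keySerde : fallback; }

    Serde<V> valueSerdeOr(Serde<V> fallback) { return valueSerde != null ? valueSerde : fallback; }
}
```

One wrinkle with a no-argument static factory: in a chained call such as Serialized.using().keySerde(s), Java infers K and V as Object, so callers need an explicit type witness (Serialized.<String, Long>using()). A factory that takes the serdes as arguments avoids that.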

public interface KGroupedTable<K,V>{
        
        KTable<K, V> reduce(Reducer<V> adder,
                            Reducer<V> subtractor,
                            String name);

        <T> KTable<K, T> aggregate(Initializer<T> initializer,
                                   Aggregator<K, V, T> adder,
                                   Aggregator<K, V, T> subtractor,
                                   Serde<T> aggValueSerde,
                                   String name);

        <T> KTable<K, T> aggregate(Initializer<T> initializer,
                                   Aggregator<K, V, T> adder,
                                   Aggregator<K, V, T> subtractor,
                                   String name);

        KTable<K, Long> count(String name);

        // would allow sneaking in another aggregate store;
        // users can choose their aggregate store freely.
        KGroupedTable<K,V> resultStored(Materialized m);
}
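Reduce and aggregate on KGroupedTable take both an adder and a subtractor because a table row can be updated or move to another group, so its old contribution has to be backed out before the new one is added. A simplified in-memory model of a count of users per region, assuming upserts with null meaning delete; RegionCounter is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of groupBy(region).count() over a user -> region table.
final class RegionCounter {
    private final Map<String, String> table = new HashMap<>(); // the KTable: user -> region
    private final Map<String, Long> counts = new HashMap<>();  // the aggregate: region -> count

    // Apply one table update; region == null deletes the row.
    void upsert(String user, String region) {
        String old = region == null ? table.remove(user) : table.put(user, region);
        if (old != null) {
            counts.merge(old, -1L, Long::sum);   // subtractor: old value leaves its group
        }
        if (region != null) {
            counts.merge(region, 1L, Long::sum); // adder: new value joins its group
        }
    }

    long count(String region) {
        return counts.getOrDefault(region, 0L);
    }
}
```

Without the subtractor, a user who moves from EU to US would be counted in both regions forever.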

public class KStreamBuilder{

        public <K, V> KTable<K, V> table(String topic);
        public <K, V> KTable<K, V> table(Serialized<K, V> s, String topic);
        // kinda ugly; materialization is still postponed until needed
        public <K, V> KTable<K, V> table(Materialized<K, V> m, String topic);

        // silently materializes the global table
        public <K, V> GlobalKTable<K, V> globalTable(String topic);
        public <K, V> GlobalKTable<K, V> globalTable(Materialized<K, V> m, String topic);

        // KStream is kept the same for now

}
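A toy model of "materialization is still postponed until needed", assuming table(topic) only records a source node and that downstream operators needing state (a join, a query handle) flag it; only flagged nodes get a store when the topology is built. TableNode, LazyBuilder and requireStore() are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical source node: starts without a store.
final class TableNode {
    final String topic;
    private boolean needsStore;

    TableNode(String topic) { this.topic = topic; }

    // Called by an operator that must read previous values.
    void requireStore() { needsStore = true; }

    boolean needsStore() { return needsStore; }
}

// Hypothetical builder that decides materialization at build time.
final class LazyBuilder {
    private final List<TableNode> nodes = new ArrayList<>();

    TableNode table(String topic) {
        TableNode n = new TableNode(topic);
        nodes.add(n);
        return n;
    }

    // Returns the topics whose source nodes end up with a state store.
    List<String> build() {
        List<String> materialized = new ArrayList<>();
        for (TableNode n : nodes) {
            if (n.needsStore()) {
                materialized.add(n.topic);
            }
        }
        return materialized;
    }
}
```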

public interface Materialized<K, V> {

    static <K, V> Materialized<K, V> with();

    Materialized<K, V> serializer(Serialized s);

    Materialized<K, V> supplier(StateStoreSupplier sss);

//    Leaving these out: not logging takes away determinism, which is
//    too drastic for me, plus all the optimisations being planned.
//    Disabling logging really means we have the data somewhere else.
//    Materialized<K, V> loggingEnabled(Map<String, String> config);
//    Materialized<K, V> loggingDisabled();

    Materialized<K, V> cachingEnabled();
    Materialized<K, V> cachingDisabled();

}
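What cachingEnabled()/cachingDisabled() trade off, as a simplified model: with caching on, repeated updates to a key are coalesced and only the latest value is forwarded on flush; with it off, every update goes downstream. Real Kafka Streams caches also evict by size and flush on commit, which this sketch does not model; CachedForwarder is hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of a caching layer in front of a store's downstream.
final class CachedForwarder {
    private final boolean caching;
    private final Map<String, Long> dirty = new LinkedHashMap<>(); // pending, keyed by record key
    final List<String> downstream = new ArrayList<>();            // what the next operator sees

    CachedForwarder(boolean caching) { this.caching = caching; }

    void put(String key, long value) {
        if (caching) {
            dirty.put(key, value); // a later write to the same key replaces the earlier one
        } else {
            downstream.add(key + "=" + value);
        }
    }

    // Forward the latest value per key, in first-write order, then clear.
    void flush() {
        for (Map.Entry<String, Long> e : dirty.entrySet()) {
            downstream.add(e.getKey() + "=" + e.getValue());
        }
        dirty.clear();
    }
}
```

Feeding the same four updates (a=1, a=2, b=1, a=3) through both modes, the cached one forwards two records and the uncached one forwards four.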

// Definitely feel less comfortable on the streams side; I don't really use them
// day to day.

@InterfaceStability.Evolving
public interface KGroupedStream<K, V> { 

    KTable<K, Long> count();
    KTable<K, V> reduce(final Reducer<V> reducer);
    <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                                 final Aggregator<? super K, ? super V, VR> aggregator,
                                 final Serde<VR> aggValueSerde);

    KGroupedStream<K, V> resultStored(Materialized m);

    KGroupedSessionedStream<K,V> intoSessions(SessionWindows sessionWindows);
    KGroupedSessionedStream<K,V> intoSessions(SessionWindows sessionWindows, Materialized m);

    <W extends Window> KGroupedWindowedStream<K,V> intoWindows(Windows<W> windows);
    <W extends Window> KGroupedWindowedStream<K,V> intoWindows(Windows<W> windows, Materialized m);
}
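For intoWindows with tumbling (non-overlapping) windows, each record falls into exactly one window, which starts at ts - ts % size. A simplified single-process model of a windowed count, assuming non-negative millisecond timestamps; the "key@windowStart" string is a stand-in for Windowed<K>, and TumblingCount is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of a tumbling-window count. Assumes timestamps >= 0.
final class TumblingCount {
    private final long sizeMs;
    // "key@windowStart" -> count, standing in for KTable<Windowed<K>, Long>
    private final Map<String, Long> counts = new HashMap<>();

    TumblingCount(long sizeMs) { this.sizeMs = sizeMs; }

    // Each timestamp belongs to exactly one tumbling window.
    static long windowStart(long ts, long sizeMs) {
        return ts - (ts % sizeMs);
    }

    void add(String key, long ts) {
        counts.merge(key + "@" + windowStart(ts, sizeMs), 1L, Long::sum);
    }

    long count(String key, long windowStart) {
        return counts.getOrDefault(key + "@" + windowStart, 0L);
    }
}
```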

public interface KGroupedSessionedStream<K,V>{

  KTable<Windowed<K>, V> reduce(final Reducer<V> reducer);
  KTable<Windowed<K>, Long> count();
  <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
                                       final Aggregator<? super K, ? super V, T> aggregator,
                                       final Merger<? super K, T> sessionMerger,
                                       final Serde<T> aggValueSerde);
}
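Session windows are why aggregate here takes a sessionMerger: a late record can bridge two existing sessions, and their aggregates must then be combined. A simplified single-key model, assuming a record joins every session whose bounds lie within the inactivity gap of its timestamp; for a count, the merge is plain addition. SessionCounter is hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Simplified single-key model of session windows with a count aggregate.
final class SessionCounter {
    static final class Session {
        long start;
        long end;
        long count;

        Session(long start, long end, long count) {
            this.start = start;
            this.end = end;
            this.count = count;
        }
    }

    private final long inactivityGap;
    final List<Session> sessions = new ArrayList<>();

    SessionCounter(long inactivityGap) { this.inactivityGap = inactivityGap; }

    void add(long ts) {
        Session merged = new Session(ts, ts, 1);
        Iterator<Session> it = sessions.iterator();
        while (it.hasNext()) {
            Session s = it.next();
            boolean withinGap = ts >= s.start - inactivityGap && ts <= s.end + inactivityGap;
            if (withinGap) {
                merged.start = Math.min(merged.start, s.start);
                merged.end = Math.max(merged.end, s.end);
                merged.count += s.count; // the "sessionMerger" for a count is addition
                it.remove();
            }
        }
        sessions.add(merged);
    }
}
```

With a gap of 5, records at 0, 3, 20 and 12 form three sessions; a late record at 8 is within the gap of both [0, 3] and [12, 12], so they merge into [0, 12] with a count of 4.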

public interface KGroupedWindowedStream<K,V>{
        <W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                                                                 final Aggregator<? super K, ? super V, VR> aggregator,
                                                                 final Serde<VR> aggValueSerde);
        <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer);
        <W extends Window> KTable<Windowed<K>, Long> count();
}
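With hopping windows (advance smaller than size), one record updates several windows, which is part of why windowed results are keyed by Windowed<K> rather than K. A sketch of the window-assignment arithmetic, assuming non-negative timestamps and windows aligned to multiples of the advance; HoppingWindows is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class HoppingWindows {
    // Returns the start of every window [start, start + size) containing ts,
    // newest first. Starts are multiples of advance; assumes ts >= 0.
    static List<Long> windowStartsFor(long ts, long size, long advance) {
        List<Long> starts = new ArrayList<>();
        for (long start = ts - (ts % advance); start > ts - size && start >= 0; start -= advance) {
            starts.add(start);
        }
        return starts;
    }
}
```

For size 10 and advance 5, a record at 12 lands in [10, 20) and [5, 15); with advance equal to size, the windows are tumbling and each record lands in exactly one.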







