I've updated the experimental code with a couple of ways of doing joins.
One following the fluent approach and one following the builder approach.
The 2 examples can be found here:
https://github.com/dguy/kafka/blob/dsl-experiment/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java#L714

The code looks like:

@Test
public void shouldBeFluentIsh() throws Exception {
    final KStream<String, String> one = null;
    final KStream<String, String> two = null;
    final Serde<String> serde = null;
    final ValueJoiner<String, String, String> vj = null;

    // inner join
    one.join2(two, vj, JoinWindows.of(10))
            .withKeySerde(serde)
            .withThisValueSerde(serde)
            .withOtherValueSerde(serde)
            .stream();

    // left join
    one.join2(two, vj, JoinWindows.of(10))
            .withJoinType(JoinType.LEFT)
            .stream();
}

@Test
public void shouldUseBuilder() throws Exception {
    final KStream<String, String> one = null;
    final KStream<String, String> two = null;
    final Serde<String> serde = null;
    final ValueJoiner<String, String, String> vj = null;

    //inner
    one.join(Joins.streamStreamJoin(two, vj, JoinWindows.of(10)).build());

    //left
    one.join(Joins.streamStreamJoin(two, vj,
JoinWindows.of(10)).withJoinType(JoinType.LEFT).build());
}


I'm not going to say which way i'm leaning, yet!

Thanks,
Damian

On Thu, 29 Jun 2017 at 11:47 Damian Guy <damian....@gmail.com> wrote:

>
>> However, I don't understand your argument about putting aggregate()
>> after the withXX() -- all the calls to withXX() set optional parameters
>> for aggregate() and not for groupBy() -- but a groupBy().withXX()
>> indicates that the withXX() belongs to the groupBy(). IMHO, this might
>> be quite confusion for developers.
>>
>>
> I see what you are saying, but the grouped stream is effectively a no-op
> until you call one of the aggregate/count/reduce etc functions. So the
> optional params are ones that are applicable to any of the operations you
> can perform on this grouped stream. Then the final
> count()/reduce()/aggregate() call has any of the params that are
> required/specific to that function.
>
>
>>
>> -Matthias
>>
>> On 6/28/17 2:55 AM, Damian Guy wrote:
>> >> I also think that mixing optional parameters with configs is a bad
>> idea.
>> >> Have not proposal for this atm but just wanted to mention it. Hope to
>> >> find some time to come up with something.
>> >>
>> >>
>> > Yes, i don't like the mix of config either. But the only real config
>> here
>> > is the logging config - which we don't really need as it can already be
>> > done via a custom StateStoreSupplier.
>> >
>> >
>> >> What I don't like in the current proposal is the
>> >> .grouped().withKeyMapper() -- the current solution with .groupBy(...)
>> >> and .groupByKey() seems better. For clarity, we could rename to
>> >> .groupByNewKey(...) and .groupByCurrentKey() (even if we should find
>> >> some better names).
>> >>
>> >>
>> > it could be groupByKey(), groupBy() or something different bt
>> >
>> >
>> >
>> >> The proposed pattern "chains" grouping and aggregation too close
>> >> together. I would rather separate both more than less, ie, do into the
>> >> opposite direction.
>> >>
>> >> I am also wondering, if we could so something more "fluent". The
>> initial
>> >> proposal was like:
>> >>
>> >>>> groupedStream.count()
>> >>>>    .withStoreName("name")
>> >>>>    .withCachingEnabled(false)
>> >>>>    .withLoggingEnabled(config)
>> >>>>    .table()
>> >>
>> >> The .table() statement in the end was kinda alien.
>> >>
>> >
>> > I agree, but then all of the withXXX methods need to be on KTable which
>> is
>> > worse in my opinion. You also need something that is going to "build"
>> the
>> > internal processors and add them to the topology.
>> >
>> >
>> >> The current proposal put the count() into the end -- ie, the optional
>> >> parameter for count() have to specified on the .grouped() call -- this
>> >> does not seems to be the best way either.
>> >>
>> >>
>> > I actually prefer this method as you are building a grouped stream that
>> you
>> > will aggregate. So table.grouped(...).withOptionalStuff().aggregate(..)
>> etc
>> > seems natural to me.
>> >
>> >
>> >> I did not think this through in detail, but can't we just do the
>> initial
>> >> proposal with the .table() ?
>> >>
>> >> groupedStream.count().withStoreName("name").mapValues(...)
>> >>
>> >> Each .withXXX(...) return the current KTable and all the .withXXX() are
>> >> just added to the KTable interface. Or do I miss anything why this
>> wont'
>> >> work or any obvious disadvantage?
>> >>
>> >>
>> >>
>> > See above.
>> >
>> >
>> >>
>> >> -Matthias
>> >>
>> >> On 6/22/17 4:06 AM, Damian Guy wrote:
>> >>> Thanks everyone. My latest attempt is below. It builds on the fluent
>> >>> approach, but i think it is slightly nicer.
>> >>> I agree with some of what Eno said about mixing configy stuff in the
>> DSL,
>> >>> but i think that enabling caching and enabling logging are things that
>> >>> aren't actually config. I'd probably not add withLogConfig(...) (even
>> >>> though it is below) as this is actually config and we already have a
>> way
>> >> of
>> >>> doing that, via the StateStoreSupplier. Arguably we could use the
>> >>> StateStoreSupplier for disabling caching etc, but as it stands that
>> is a
>> >>> bit of a tedious process for someone that just wants to use the
>> default
>> >>> storage engine, but not have caching enabled.
>> >>>
>> >>> There is also an orthogonal concern that Guozhang alluded to.... If
>> you
>> >>> want to plug in a custom storage engine and you want it to be logged
>> etc,
>> >>> you would currently need to implement that yourself. Ideally we can
>> >> provide
>> >>> a way where we will wrap the custom store with logging, metrics, etc.
>> I
>> >>> need to think about where this fits, it is probably more appropriate
>> on
>> >> the
>> >>> Stores API.
>> >>>
>> >>> final KeyValueMapper<String, String, Long> keyMapper = null;
>> >>> // count with mapped key
>> >>> final KTable<Long, Long> count = stream.grouped()
>> >>>         .withKeyMapper(keyMapper)
>> >>>         .withKeySerde(Serdes.Long())
>> >>>         .withValueSerde(Serdes.String())
>> >>>         .withQueryableName("my-store")
>> >>>         .count();
>> >>>
>> >>> // windowed count
>> >>> final KTable<Windowed<String>, Long> windowedCount = stream.grouped()
>> >>>         .withQueryableName("my-window-store")
>> >>>         .windowed(TimeWindows.of(10L).until(10))
>> >>>         .count();
>> >>>
>> >>> // windowed reduce
>> >>> final Reducer<String> windowedReducer = null;
>> >>> final KTable<Windowed<String>, String> windowedReduce =
>> stream.grouped()
>> >>>         .withQueryableName("my-window-store")
>> >>>         .windowed(TimeWindows.of(10L).until(10))
>> >>>         .reduce(windowedReducer);
>> >>>
>> >>> final Aggregator<String, String, Long> aggregator = null;
>> >>> final Initializer<Long> init = null;
>> >>>
>> >>> // aggregate
>> >>> final KTable<String, Long> aggregate = stream.grouped()
>> >>>         .withQueryableName("my-aggregate-store")
>> >>>         .aggregate(aggregator, init, Serdes.Long());
>> >>>
>> >>> final StateStoreSupplier<KeyValueStore<String, Long>>
>> stateStoreSupplier
>> >> = null;
>> >>>
>> >>> // aggregate with custom store
>> >>> final KTable<String, Long> aggWithCustomStore = stream.grouped()
>> >>>         .withStateStoreSupplier(stateStoreSupplier)
>> >>>         .aggregate(aggregator, init);
>> >>>
>> >>> // disable caching
>> >>> stream.grouped()
>> >>>         .withQueryableName("name")
>> >>>         .withCachingEnabled(false)
>> >>>         .count();
>> >>>
>> >>> // disable logging
>> >>> stream.grouped()
>> >>>         .withQueryableName("q")
>> >>>         .withLoggingEnabled(false)
>> >>>         .count();
>> >>>
>> >>> // override log config
>> >>> final Reducer<String> reducer = null;
>> >>> stream.grouped()
>> >>>         .withLogConfig(Collections.singletonMap("segment.size", "10"))
>> >>>         .reduce(reducer);
>> >>>
>> >>>
>> >>> If anyone wants to play around with this you can find the code here:
>> >>> https://github.com/dguy/kafka/tree/dsl-experiment
>> >>>
>> >>> Note: It won't actually work as most of the methods just return null.
>> >>>
>> >>> Thanks,
>> >>> Damian
>> >>>
>> >>>
>> >>> On Thu, 22 Jun 2017 at 11:18 Ismael Juma <ism...@juma.me.uk> wrote:
>> >>>
>> >>>> Thanks Damian. I think both options have pros and cons. And both are
>> >> better
>> >>>> than overload abuse.
>> >>>>
>> >>>> The fluent API approach reads better, no mention of builder or build
>> >>>> anywhere. The main downside is that the method signatures are a
>> little
>> >> less
>> >>>> clear. By reading the method signature, one doesn't necessarily knows
>> >> what
>> >>>> it returns. Also, one needs to figure out the special method
>> (`table()`
>> >> in
>> >>>> this case) that gives you what you actually care about (`KTable` in
>> this
>> >>>> case). Not major issues, but worth mentioning while doing the
>> >> comparison.
>> >>>>
>> >>>> The builder approach avoids the issues mentioned above, but it
>> doesn't
>> >> read
>> >>>> as well.
>> >>>>
>> >>>> Ismael
>> >>>>
>> >>>> On Wed, Jun 21, 2017 at 3:37 PM, Damian Guy <damian....@gmail.com>
>> >> wrote:
>> >>>>
>> >>>>> Hi,
>> >>>>>
>> >>>>> I'd like to get a discussion going around some of the API choices
>> we've
>> >>>>> made in the DLS. In particular those that relate to stateful
>> operations
>> >>>>> (though this could expand).
>> >>>>> As it stands we lean heavily on overloaded methods in the API, i.e,
>> >> there
>> >>>>> are 9 overloads for KGroupedStream.count(..)! It is becoming noisy
>> and
>> >> i
>> >>>>> feel it is only going to get worse as we add more optional params.
>> In
>> >>>>> particular we've had some requests to be able to turn caching off,
>> or
>> >>>>> change log configs,  on a per operator basis (note this can be done
>> now
>> >>>> if
>> >>>>> you pass in a StateStoreSupplier, but this can be a bit cumbersome).
>> >>>>>
>> >>>>> So this is a bit of an open question. How can we change the DSL
>> >> overloads
>> >>>>> so that it flows, is simple to use and understand, and is easily
>> >> extended
>> >>>>> in the future?
>> >>>>>
>> >>>>> One option would be to use a fluent API approach for providing the
>> >>>> optional
>> >>>>> params, so something like this:
>> >>>>>
>> >>>>> groupedStream.count()
>> >>>>>    .withStoreName("name")
>> >>>>>    .withCachingEnabled(false)
>> >>>>>    .withLoggingEnabled(config)
>> >>>>>    .table()
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>> Another option would be to provide a Builder to the count method,
>> so it
>> >>>>> would look something like this:
>> >>>>> groupedStream.count(new
>> >>>>> CountBuilder("storeName").withCachingEnabled(false).build())
>> >>>>>
>> >>>>> Another option is to say: Hey we don't need this, what are you on
>> >> about!
>> >>>>>
>> >>>>> The above has focussed on state store related overloads, but the
>> same
>> >>>> ideas
>> >>>>> could  be applied to joins etc, where we presently have many join
>> >> methods
>> >>>>> and many overloads.
>> >>>>>
>> >>>>> Anyway, i look forward to hearing your opinions.
>> >>>>>
>> >>>>> Thanks,
>> >>>>> Damian
>> >>>>>
>> >>>>
>> >>>
>> >>
>> >>
>> >
>>
>>

Reply via email to