I am with Guozhang here.
I think all the suggestions are far too short-sighted. Especially this
weird materialize(String) call is totally broken, and people go nuts
about how this will look. On top of that, we should implement more and
better joins, not the weird one we have currently. Implementing a
one-to-many join, I couldn't get away without 3 highly complex value mappers:
final ValueMapper<VR, K> keyExtractor,
final ValueMapper<KL, K> joinPrefixFaker,
final ValueMapper<K, KL> leftKeyExtractor,
in addition to the one joiner of course
final ValueJoiner<VL, VR, V> joiner,
How to specify whether it's outer or inner is surely the smallest problem we
are going to face with proper join semantics. What the resulting key
will be is also highly debatable. What happens to the key is very
complex, and the API has to tell the user.
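To make the role of the three mappers concrete, here is a minimal in-memory sketch of a one-to-many inner join. It is not the proposed implementation: java.util.function.Function and BiFunction stand in for ValueMapper and ValueJoiner, the combined key type K is fixed to String, and the Order class, the "customerId|orderId" key layout and the onLeft/onRight method names are assumptions made only for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical right-side value: an order that points at its customer.
final class Order {
    final String orderId, customerId, item;
    Order(String orderId, String customerId, String item) {
        this.orderId = orderId; this.customerId = customerId; this.item = item;
    }
}

// In-memory illustration only (not Kafka Streams API); combined key K is a String.
final class OneToManyJoinSketch<KL, VL, VR, V> {
    private final Map<KL, VL> leftStore = new HashMap<>();
    private final TreeMap<String, VR> rightStore = new TreeMap<>(); // sorted by combined key
    private final Function<VR, String> keyExtractor;      // right value -> combined key
    private final Function<KL, String> joinPrefixFaker;   // left key -> prefix of combined keys
    private final Function<String, KL> leftKeyExtractor;  // combined key -> left key
    private final BiFunction<VL, VR, V> joiner;

    OneToManyJoinSketch(Function<VR, String> keyExtractor,
                        Function<KL, String> joinPrefixFaker,
                        Function<String, KL> leftKeyExtractor,
                        BiFunction<VL, VR, V> joiner) {
        this.keyExtractor = keyExtractor;
        this.joinPrefixFaker = joinPrefixFaker;
        this.leftKeyExtractor = leftKeyExtractor;
        this.joiner = joiner;
    }

    // Right-side update: store it under the combined key, then join with its one left row.
    Optional<V> onRight(VR value) {
        String combinedKey = keyExtractor.apply(value);
        rightStore.put(combinedKey, value);
        VL leftValue = leftStore.get(leftKeyExtractor.apply(combinedKey));
        return leftValue == null ? Optional.empty() : Optional.of(joiner.apply(leftValue, value));
    }

    // Left-side update: prefix-scan the right store for every row that refers to this key.
    List<V> onLeft(KL key, VL value) {
        leftStore.put(key, value);
        String prefix = joinPrefixFaker.apply(key);
        List<V> joined = new ArrayList<>();
        for (Map.Entry<String, VR> e : rightStore.tailMap(prefix, true).entrySet()) {
            if (!e.getKey().startsWith(prefix)) break; // left the prefix range
            joined.add(joiner.apply(value, e.getValue()));
        }
        return joined;
    }

    public static void main(String[] args) {
        OneToManyJoinSketch<String, String, Order, String> join =
            new OneToManyJoinSketch<String, String, Order, String>(
                o -> o.customerId + "|" + o.orderId,
                c -> c + "|",
                k -> k.substring(0, k.indexOf('|')),
                (customer, o) -> customer + " bought " + o.item);
        join.onRight(new Order("o1", "c1", "book")); // no customer yet, nothing emitted
        join.onRight(new Order("o2", "c1", "pen"));
        System.out.println(join.onLeft("c1", "Alice")); // [Alice bought book, Alice bought pen]
        System.out.println(join.onRight(new Order("o3", "c1", "lamp"))); // Optional[Alice bought lamp]
    }
}
```

Even in this toy form the caller has to know how the combined key is laid out, which is the point about the API needing to tell the user what happens to the key.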
To steer this discussion in a good direction, we would need sample
interfaces we could mock against (as Guozhang suggested), and we need to
know how the implementation (of joins especially) will look later, as I
strongly recommend that we stop using ChangeSerde and have "properly"
repartitioned topics. That is just sane IMO.
Best Jan
On 22.06.2017 11:54, Eno Thereska wrote:
Note that while I agree with the initial proposal (withKeySerdes, withJoinType,
etc), I don't agree with things like .materialize(), .enableCaching(),
.enableLogging().
The former maintain the declarative DSL, while the latter break the declarative
part by mixing system decisions into the DSL. I think there is a difference
between the two proposals.
Eno
On 22 Jun 2017, at 03:46, Guozhang Wang <wangg...@gmail.com> wrote:
I have been thinking about reducing all these overloaded functions for
stateful operations (there are some other places that introduce overloaded
functions, but let's focus on these only in this discussion). What I used to
have in mind is to use some "materialize" function on the KTables, like:
---------------------------------------
// specifying the topology
KStream stream1 = builder.stream();
// do not allow passing in a state store supplier here any more
KTable table1 = stream1.groupBy(...).aggregate(initializer, aggregator,
    sessionMerger, sessionWindows);
// additional specs along with the topology above
table1.materialize("queryableStoreName"); // or..
table1.materialize("queryableStoreName").enableCaching().enableLogging(); // or..
// add the metrics / logging / caching / windowing functionalities on top
// of the store, or..
table1.materialize(stateStoreSupplier);
table1.materialize(stateStoreSupplier).enableCaching().enableLogging(); // etc..
---------------------------------------
But thinking about it more, I feel Damian's first proposal is better, since
my proposal would likely break the chaining (e.g. we may not be
able to do something like "table1.filter().map().groupBy().aggregate()" if we
want to use different specs for the intermediate filtered KTable).
But since this is an incompatible change, and we are going to remove the
compatibility annotations soon, it means we only have one chance and we
really have to get it right. So I'd call on everyone to try rewriting
your examples / demo code with the proposed new API and see if it feels
natural. For example, if I want to use a different storage engine than the
default RocksDB engine, how could I easily specify that with the proposed
APIs?
Meanwhile, Damian, could you provide a formal set of APIs for people to
try out? Also, could you briefly describe how custom storage
engines could be swapped in with the above APIs?
Guozhang
On Wed, Jun 21, 2017 at 9:08 AM, Eno Thereska <eno.there...@gmail.com>
wrote:
To make it clear, it’s outlined by Damian, I just copy pasted what he told
me in person :)
Eno
On Jun 21, 2017, at 4:40 PM, Bill Bejeck <bbej...@gmail.com> wrote:
+1 for the approach outlined above by Eno.
On Wed, Jun 21, 2017 at 11:28 AM, Damian Guy <damian....@gmail.com>
wrote:
Thanks Eno.
Yes, I agree. We could apply this same approach to most of the operations
where we have multiple overloads, i.e., we have a single method for each
operation that takes the required parameters, and everything else is
specified as you have done above.
On Wed, 21 Jun 2017 at 16:24 Eno Thereska <eno.there...@gmail.com>
wrote:
(cc’ing user-list too)
Given that we already have StateStoreSuppliers that are configurable using
the fluent-like API, it's probably worth discussing the other examples with
joins and serdes first, since those have many overloads and are in need of
some TLC.
So following your example, I guess you’d have something like:
.join()
.withKeySerdes(…)
.withValueSerdes(…)
.withJoinType("outer")
etc?
I like the approach since it still remains declarative and it’d reduce the
number of overloads by quite a bit.
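As a rough illustration of how such a chain could replace the join overloads, the sketch below models the optional parameters as an immutable options object. JoinOptions is a hypothetical name and not a Kafka Streams class; serdes are represented by plain strings so the example runs without Kafka on the classpath, and the defaults (configured default serdes, inner join) and the accepted join type names are assumptions.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the fluent style; not part of the Kafka Streams API.
final class JoinOptions {
    // Assumed set of join types; the thread only mentions "outer" and "inner".
    private static final List<String> JOIN_TYPES = Arrays.asList("inner", "left", "outer");

    final String keySerde;
    final String valueSerde;
    final String joinType;

    private JoinOptions(String keySerde, String valueSerde, String joinType) {
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
        this.joinType = joinType;
    }

    // One entry point with assumed defaults instead of one overload per combination.
    static JoinOptions join() {
        return new JoinOptions("default", "default", "inner");
    }

    // Each with-method returns a new instance, so partially configured options can be reused.
    JoinOptions withKeySerdes(String keySerde) {
        return new JoinOptions(keySerde, valueSerde, joinType);
    }

    JoinOptions withValueSerdes(String valueSerde) {
        return new JoinOptions(keySerde, valueSerde, joinType);
    }

    JoinOptions withJoinType(String joinType) {
        if (!JOIN_TYPES.contains(joinType)) {
            throw new IllegalArgumentException("unknown join type: " + joinType);
        }
        return new JoinOptions(keySerde, valueSerde, joinType);
    }

    @Override
    public String toString() {
        return "join(keySerde=" + keySerde + ", valueSerde=" + valueSerde
            + ", type=" + joinType + ")";
    }

    public static void main(String[] args) {
        // Only the options that differ from the defaults need to be spelled out.
        System.out.println(JoinOptions.join().withJoinType("outer"));
        System.out.println(JoinOptions.join()
            .withKeySerdes("string")
            .withValueSerdes("long")
            .withJoinType("outer"));
    }
}
```

Every option here describes what the join should do rather than how it is stored or cached, which keeps the chain declarative.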
Eno
On Jun 21, 2017, at 3:37 PM, Damian Guy <damian....@gmail.com> wrote:
Hi,
I'd like to get a discussion going around some of the API choices we've
made in the DSL, in particular those that relate to stateful operations
(though this could expand).
As it stands we lean heavily on overloaded methods in the API, e.g., there
are 9 overloads for KGroupedStream.count(..)! It is becoming noisy, and I
feel it is only going to get worse as we add more optional params. In
particular we've had some requests to be able to turn caching off, or
change log configs, on a per-operator basis (note this can be done now if
you pass in a StateStoreSupplier, but this can be a bit cumbersome).
So this is a bit of an open question. How can we change the DSL overloads
so that it flows, is simple to use and understand, and is easily extended
in the future?
One option would be to use a fluent API approach for providing the optional
params, so something like this:
groupedStream.count()
.withStoreName("name")
.withCachingEnabled(false)
.withLoggingEnabled(config)
.table()
Another option would be to provide a Builder to the count method, so it
would look something like this:
groupedStream.count(new CountBuilder("storeName").withCachingEnabled(false).build())
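A minimal sketch of the Builder variant, under stated assumptions: CountSpec and GroupedStreamSketch are hypothetical stand-ins, CountBuilder is only the name used in the example above, and the boolean logging flag and the defaults (caching and logging enabled) are assumed for illustration. The count itself is done over an in-memory list so the example runs without Kafka.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Immutable result of the builder: all optional settings for one count operation.
final class CountSpec {
    final String storeName;
    final boolean cachingEnabled;
    final boolean loggingEnabled;

    CountSpec(String storeName, boolean cachingEnabled, boolean loggingEnabled) {
        this.storeName = storeName;
        this.cachingEnabled = cachingEnabled;
        this.loggingEnabled = loggingEnabled;
    }
}

// Hypothetical builder; the store name is required, everything else is optional.
final class CountBuilder {
    private final String storeName;
    private boolean cachingEnabled = true; // assumed default
    private boolean loggingEnabled = true; // assumed default

    CountBuilder(String storeName) {
        this.storeName = storeName;
    }

    CountBuilder withCachingEnabled(boolean enabled) {
        this.cachingEnabled = enabled;
        return this;
    }

    CountBuilder withLoggingEnabled(boolean enabled) {
        this.loggingEnabled = enabled;
        return this;
    }

    CountSpec build() {
        return new CountSpec(storeName, cachingEnabled, loggingEnabled);
    }
}

// Stand-in for a grouped stream: a list of record keys held in memory.
final class GroupedStreamSketch {
    private final List<String> keys;

    GroupedStreamSketch(List<String> keys) {
        this.keys = keys;
    }

    // The single count method that would replace the overloads.
    Map<String, Long> count(CountSpec spec) {
        System.out.println("store=" + spec.storeName
            + " caching=" + spec.cachingEnabled + " logging=" + spec.loggingEnabled);
        Map<String, Long> counts = new TreeMap<>();
        for (String key : keys) {
            counts.merge(key, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        GroupedStreamSketch groupedStream = new GroupedStreamSketch(Arrays.asList("a", "b", "a"));
        System.out.println(groupedStream.count(
            new CountBuilder("storeName").withCachingEnabled(false).build())); // {a=2, b=1}
    }
}
```

Compared with the fluent chain, the whole configuration is passed as one argument, so count(..) can return the table directly and no terminal call such as .table() is needed.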
Another option is to say: Hey we don't need this, what are you on about!
The above has focussed on state store related overloads, but the same ideas
could be applied to joins etc., where we presently have many join methods
and many overloads.
Anyway, I look forward to hearing your opinions.
Thanks,
Damian
--
-- Guozhang