Hi all!
I was about to write another KIP, but found out that KIP-401 addresses
exactly the problem I faced. So let me jump into your discussion and ask
you to assess another idea.
I fully agree with KIP-401's motivation section. E.g., in my project I
had to invent a wrapper class that hides the details of KeyValueStore
management from the business logic. Of course, this should be handled
better in the KStreams API.
But I was going to look at this problem from another side and propose a
simple alternative in the high-level DSL that will not fit all the cases,
but most of them. Hence my idea does not exclude Paul's proposal.
What if we restrict ourselves to *only one* KeyValueStore and propose a
method that resembles `aggregate` and `reduce` methods, like this:
stream
.map(...)
.filter(...)
.transform ((k, v, s)->{....}, Transformed.with(....))
where
* k, v -- input key & value
* s -- a KeyValueStore provided as an argument
* return value of the lambda should be KeyValue.pair(...)
* Transformed.with... is a builder, used in order to define the
Transformer and KeyValueStore building parameters. Some of these
parameters should be:
** store's KeySerde,
** store's ValueSerde,
** whether the store is persistent or in-memory,
** store's name -- an optional parameter; the system should be able to
devise the name of the store transparently when the user does not want
to name it themselves or share the store between processors,
** scheduled punctuation.
Imagine we have a KStream<String, Integer>, and we need to calculate a
`derivative` stream, that is, a stream of 'deltas' of the provided
integer values.
This could be achieved as simply as
stream.transform((key, value, stateStore) -> {
        int previousValue =
            Optional.ofNullable(stateStore.get(key)).orElse(0);
        stateStore.put(key, value);
        return KeyValue.pair(key, value - previousValue);
    },
    // we do not need to bother with store name, punctuation etc.
    // maybe even the Serde part can be omitted, since we can inherit
    // the serdes from the stream by default
    Transformed.with(Serdes.String(), Serdes.Integer())
);
The hard part is that the new `transform` method definition would have to
be parameterized by six type parameters:
* input/output/KeyValueStore key type,
* input/output/KeyValueStore value type.
However, it seems that all these types can be inferred from the provided
lambda and Transformed.with instances.
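A minimal, Kafka-free sketch of how such a six-parameter method could still be called without explicit type arguments. Everything here is a hypothetical stand-in rather than Kafka Streams API: `StatefulTransformer`, the `transform` helper, a `java.util.Map` in place of the KeyValueStore, and two `Class` tokens in place of the serdes passed to `Transformed.with`.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class StatefulTransformSketch {

    /** Hypothetical lambda shape: (key, value, store) -> output pair. */
    @FunctionalInterface
    interface StatefulTransformer<K, V, SK, SV, KR, VR> {
        Map.Entry<KR, VR> apply(K key, V value, Map<SK, SV> store);
    }

    /**
     * Stand-in for the proposed transform(lambda, Transformed.with(...)).
     * The two Class tokens play the role of the store serdes: they pin SK and SV.
     */
    static <K, V, SK, SV, KR, VR> List<Map.Entry<KR, VR>> transform(
            List<Map.Entry<K, V>> records,
            StatefulTransformer<K, V, SK, SV, KR, VR> transformer,
            Class<SK> storeKeyType,
            Class<SV> storeValueType) {
        Map<SK, SV> store = new HashMap<>(); // one private store per call
        List<Map.Entry<KR, VR>> out = new ArrayList<>();
        for (Map.Entry<K, V> record : records) {
            out.add(transformer.apply(record.getKey(), record.getValue(), store));
        }
        return out;
    }

    /** The 'derivative' example: emits value minus the previous value per key. */
    static List<Map.Entry<String, Integer>> deltas(List<Map.Entry<String, Integer>> records) {
        // No explicit type arguments: K, V come from the input, SK, SV from the
        // Class tokens, KR, VR from the declared return type.
        return transform(records, (key, value, store) -> {
            int previous = store.getOrDefault(key, 0);
            store.put(key, value);
            return new SimpleImmutableEntry<>(key, value - previous);
        }, String.class, Integer.class);
    }

    public static void main(String[] args) {
        System.out.println(deltas(List.of(
                Map.entry("a", 5), Map.entry("a", 8), Map.entry("b", 2), Map.entry("a", 7))));
    }
}
```

In this sketch the store types are not recoverable from the lambda alone; they come from the extra arguments, which is the role the serdes in `Transformed.with` would presumably play.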
What do you think about this?
Regards,
Ivan
On 27.03.2019 20:45, Guozhang Wang wrote:
Hello Paul,
Thanks for the uploaded PR and the detailed description! I've made a pass
on it and left some comments.
Overall I think I agree with you that passing in the StoreBuilder directly
rather than the store name is more convenient, as it does not require another
`addStore` call, but we need to spend some more documentation effort
on educating users about the two ways of connecting their stores. I'm
slightly concerned about this learning curve, but I can be convinced if
most people feel it is worth it.
Guozhang
On Sat, Mar 23, 2019 at 5:15 PM Paul Whalen <pgwha...@gmail.com> wrote:
I'd like to resurrect this discussion with a cursory, proof-of-concept
implementation of the KIP which combines many of our ideas:
https://github.com/apache/kafka/pull/6496. I tried to keep the diff as
small as possible for now, just using it to convey the main ideas. But
I'll separately address some of our earlier discussion:
- Will there be a new, separate interface for users to implement for the
new functionality? No, to hopefully keep things simple, all of the
Processor/TransformerSupplier interfaces will just extend
StateStoresSupplier, allowing users to opt in to this functionality by
overriding the default implementation that gives an empty list.
- Will the interface allow users to specify the store name, or the
entire StoreBuilder? The entire StoreBuilder, so the
Processor/TransformerSupplier can completely encapsulate name and
implementation of a state store if desired.
- Will the old way of specifying store names alongside the supplier when
calling stream.process/transform() be deprecated? No, this is still a
legitimate way to wire up Processors/Transformers and their stores. But I
would recommend not allowing stream.process/transform() calls that use both
store declaration mechanisms (this restriction is not in the proof of
concept).
- How will we handle adding the same state store to the topology
multiple times because different Processor/TransformerSuppliers declare it?
topology.addStateStore() will be slightly relaxed for convenience, and will
allow adding the same StoreBuilder multiple times as long as the exact same
StoreBuilder instance is being added for the same store name. In practice
this seems to prevent the issue of accidentally merging two state stores
into one by adding them under the same name. For additional safety, if we
wanted to (not in the proof of concept), we could allow for this relaxation
only for internal callers of topology.addStateStore().
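A minimal sketch of the relaxed check described in the last point, assuming identity comparison on the builder instance. `StoreRegistrySketch` and `NamedBuilder` are hypothetical stand-ins, not the Topology or StoreBuilder classes, and `IllegalStateException` stands in for whatever exception the real topology would throw.

```java
import java.util.HashMap;
import java.util.Map;

/** Stand-in for the topology's store registry; not Kafka code. */
final class StoreRegistrySketch {

    /** Minimal stand-in for a StoreBuilder: only the name matters here. */
    static final class NamedBuilder {
        final String name;

        NamedBuilder(String name) {
            this.name = name;
        }
    }

    private final Map<String, NamedBuilder> builders = new HashMap<>();

    /**
     * Re-adding the same instance is a no-op; a different instance under an
     * existing name is still rejected.
     *
     * @return true if newly registered, false if this instance was already present
     */
    boolean addStateStore(NamedBuilder builder) {
        NamedBuilder existing = builders.putIfAbsent(builder.name, builder);
        if (existing == null) {
            return true;
        }
        if (existing == builder) { // identity, not equals()
            return false;
        }
        throw new IllegalStateException(
                "A different StoreBuilder is already registered as '" + builder.name + "'");
    }

    int size() {
        return builders.size();
    }
}
```

Comparing by identity rather than by name keeps the existing failure for the accidental case, where two distinct builders happen to share a name.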
So, in summary, the use cases look like:
- 1 transformer/processor that owns its store: Using the new
StateStoresSupplier interface method to supply its StoreBuilders that will
be added to the topology automatically.
- Multiple transformer/processors that share the same store: Either
1. The old way: the StoreBuilder is defined "far away" from the
Transformer/Processor implementations, and is added to the topology
manually by the user
2. The new way: the StoreBuilder is defined closer to the
Transformer/Processor implementations, and the same instance is returned by
all Transformer/ProcessorSuppliers that need it
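A rough sketch of the opt-in shape these use cases rely on, using stand-in types rather than the real Kafka interfaces. The `Builder` class, the stripped-down `TransformerSupplier`, and the `storesToConnect` helper are hypothetical; the check that rejects mixing both declaration mechanisms follows the recommendation above and is, as noted, not in the proof of concept.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class StateStoresSupplierSketch {

    /** Stand-in for StoreBuilder. */
    static final class Builder {
        final String name;

        Builder(String name) {
            this.name = name;
        }
    }

    /** Opt-in hook: suppliers that own stores override this; others inherit the empty default. */
    interface StateStoresSupplier {
        default List<Builder> stateStores() {
            return Collections.emptyList();
        }
    }

    /** Stand-in for TransformerSupplier; get() is omitted to keep the sketch short. */
    interface TransformerSupplier extends StateStoresSupplier {
    }

    /** Collects the store names a transform() call would connect to. */
    static Set<String> storesToConnect(TransformerSupplier supplier, String... explicitNames) {
        List<Builder> declared = supplier.stateStores();
        if (!declared.isEmpty() && explicitNames.length > 0) {
            throw new IllegalArgumentException(
                    "Declare stores via stateStores() or via explicit names, not both");
        }
        Set<String> names = new LinkedHashSet<>(Arrays.asList(explicitNames));
        for (Builder builder : declared) {
            names.add(builder.name);
        }
        return names;
    }
}
```

A supplier that does not override `stateStores()` behaves exactly as before, so existing code that passes store names explicitly keeps working.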
This makes the KIP wiki a bit stale; I'll update if we want to bring this
design to a vote.
Thanks!
Paul
On Sun, Dec 16, 2018 at 6:04 PM Guozhang Wang <wangg...@gmail.com> wrote:
Matthias / Paul,
The concern I had about introducing `StoreBuilderSupplier` is simply
that it adds another XXSupplier to the public API, so I'd like to ask if
we really have to add it :)
The difference between encapsulating the store name and encapsulating the
full state store builder is that, in the former:
-----------
String storeName = "store1";
builder.addStore(new MyStoreBuilder(storeName));
// Following my proposal, the store name can be passed in and used for both
// `listStores` and `Transformer#init`, so the Transformer function does not
// need to get the constant string name again.
// One caveat to admit: the MyTransformerSupplier logic may be unique to
// `store1`, so it cannot be reused with a different store name anyway.
stream1.transform(new MyTransformerSupplier(storeName));
-----------
While in the latter:
-----------
// the name just indicates that we may have one such supplier for each store.
stream1.transform(new MyTransformerSupplierForStore1());
-----------
I understand the latter introduces more convenience in the API, but the
cost is that we still cannot completely remove `builder.addStore`, only
reduce its semantic scope to shared state stores; hence users need to
learn two ways of creating state stores for those two patterns.
My argument is that more public APIs require a longer learning curve for
users and introduce more usage patterns that may confuse them (the
proposal I had tries to replace one with another completely).
Guozhang
On Sun, Dec 16, 2018 at 2:58 PM Paul Whalen <pgwha...@gmail.com> wrote:
Thanks for the great thoughts Matthias and Guozhang!
If I'm not mistaken, Guozhang's suggestion is what my second alternative on
the KIP is ("Have the added method on the Supplier interfaces only return
store names, not builders"). I do think it would be a worthwhile usability
improvement on its own, but to Matthias's point, it doesn't achieve the
full goal of completely encapsulating a state store and its processor: it
encapsulates the name, but not the StateStoreBuilder.
I'm really intrigued by Matthias's idea that forgoes the default interface
method I proposed. Having smaller, separate interfaces is a powerful idea
and I think a cleaner API than what I proposed. The non-shared store use
case is handled well here, and the shared store use case is possible,
though maybe still not as graceful as we would like (having to add the
StoreBuilderSupplier before the StoreNameSupplier seems maybe too subtle to
me).
We're all agreed that one of the big problems with the shared store use
case is how to deal with adding the same store to the topology multiple
times. Catching the "store already added" exception is risky. Here's a
maybe radical idea: change `topology.addStateStore()` to be idempotent for
adding a given state store name and `StoreBuilder`. In other words,
`addStateStore` would not throw the "store already added" exception if the
`StoreBuilder` being added for a given name has the same identity as the
one that has already been added. Does this eliminate all the bugs we're
worried about? Thinking about it for a few minutes, it seems to eliminate
most at least (would a user really use the exact same StoreBuilder when
they intend there to be two stores?). It might make the API slightly
harder to use if a user isn't immediately aware of that subtlety, but a
good error message should ease the pain, and it would happen immediately
during development.
And with regard to Matthias's comment about whether we need to deprecate
existing varargs transform methods - I don't think we need to, but it might
be nice for there only to be one way to do things, assuming whatever we
come up with supports all existing use cases. I don't feel strongly about
this, but if we don't deprecate, I do think it's important to add checks
that prevent users from trying to do the same thing in two different ways,
as we've discussed.
Paul
On Sun, Dec 16, 2018 at 5:36 AM Matthias J. Sax <matth...@confluent.io> wrote:
Guozhang,
Regarding the last option to catch "store exist already" exception and
fallback to connect stores, I'm a bit concerned it may be hiding actual
user bugs.
I agree with this concern. From my original email:
The only disadvantage I see, might be
potential bugs about sharing state if two different stores are named the
same by mistake (this would not be detected).
For your new proposal: I am not sure if it addresses Paul's original
idea -- I hope Paul can clarify. From my understanding, the idea was to
encapsulate a store and its processor. As many stores are not shared,
this seems to be quite useful. Your proposal falls a little short of
supporting encapsulation for non-shared stores.
-Matthias
On 12/15/18 1:40 AM, Guozhang Wang wrote:
Matthias,
Thanks for your feedback.
Regarding the last option to catch "store exist already" exception and
fallback to connect stores, I'm a bit concerned it may be hiding actual
user bugs.
Thinking about Paul's proposal and your suggestion again, I'd like to
propose another alternative somewhere in the middle of your approaches,
i.e. we still let users create sharable state stores via
`addStateStore`, and we allow the TransformerSupplier to return a list of
the state store names that it needs, i.e.:
public interface TransformerSupplier<K, V, R> {
    Transformer<K, V, R> get();

    default List<String> stateStoreNames() {
        return Collections.emptyList();
    }
}
By doing this, users can still "consolidate" the references of store names
in a single place in the transform call, e.g.:
public class MyTransformerSupplier<K, V, R> implements TransformerSupplier<K, V, R> {
    private String storeName;

    public class MyTransformer implements Transformer<K, V, R> {
        ....
        public void init(ProcessorContext context) {
            // look the store up by the name the supplier already holds
            store = context.getStateStore(storeName);
        }
    }

    @Override
    public List<String> stateStoreNames() {
        return Collections.singletonList(storeName);
    }
}
Basically, we move the parameters from the caller of `transform` to inside
the TransformerSuppliers. DSL implementations would not change much, simply
calling `connectStateStore` by getting the list of names from the provided
function.
Guozhang
On Thu, Dec 13, 2018 at 7:27 AM Matthias J. Sax <matth...@confluent.io> wrote:
Just a meta comment: do we really need to deprecate existing
`transform()` etc methods?
The last argument is a vararg, and thus, just keeping the existing API
for this part seems to work too, allowing both patterns to be implemented?
Also, instead of adding a default method, we could also add a new
interface `StoreBuilderSupplier` with method `List<StoreBuilder>
stateStores()` -- users could implement `TransformerSupplier` and
`StoreBuilderSupplier` at once; and for this case, we require that users
don't provide a store name in `transform()`.
Similarly, we could add an interface `StoreNameSupplier` with method
`List<String> stateStores()`. This allows us to "auto-wire" a transformer
to existing stores (to avoid the issue of adding the same store multiple
times).
Hence, for shared stores, there would be one "main" transformer that
implements `StoreBuilderSupplier` and that must be added first to the
topology. The other transformers would implement `StoreNameSupplier` and
just connect to those stores.
Another possibility to avoid the issue of adding the same stores
multiple times would be, that the DSL always calls `addStateStore()` but
catches a potential "store exists already" exception and falls back to
`connectProcessorAndStateStore()` for this case. Thus, we would not need
the `StoreNameSupplier` interface and the order in which transformers
are added would not matter either. The only disadvantage I see, might be
potential bugs about sharing state if two different stores are named the
same by mistake (this would not be detected).
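To make that disadvantage concrete, here is a small Kafka-free sketch of the catch-and-fall-back behaviour. `FallbackHazardSketch`, `addOrConnect`, and the plain maps used as stores are hypothetical stand-ins, and `IllegalStateException` stands in for the "store exists already" exception.

```java
import java.util.HashMap;
import java.util.Map;

/** Stand-in topology mapping store names to store contents; not Kafka code. */
final class FallbackHazardSketch {

    private final Map<String, Map<String, Integer>> stores = new HashMap<>();

    /** Strict add, mimicking the existing "store exists already" failure. */
    Map<String, Integer> addStateStore(String name) {
        if (stores.containsKey(name)) {
            throw new IllegalStateException("store exists already: " + name);
        }
        Map<String, Integer> store = new HashMap<>();
        stores.put(name, store);
        return store;
    }

    /** The fallback under discussion: on a duplicate name, connect to the existing store. */
    Map<String, Integer> addOrConnect(String name) {
        try {
            return addStateStore(name);
        } catch (IllegalStateException alreadyAdded) {
            return stores.get(name); // silently shares whatever was registered first
        }
    }
}
```

If two transformers that were meant to have separate stores both call `addOrConnect("cache")`, they receive the same map, and writes from one become visible to the other without any error being raised.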
Just some ideas I wanted to share. What do you think?
-Matthias
On 12/11/18 3:46 AM, Paul Whalen wrote:
Ah yes of course, this was an oversight, I completely ignored the multiple
processors sharing the same state store when writing up the KIP. Which is
funny, because I've actually done this (different processors sharing state
stores) a fair amount myself, and I've settled on a pattern where I group
the Processors in an enclosing class, and that enclosing class handles as
much as possible. Here's a gist showing the rough structure, just for
context:
https://gist.github.com/pgwhalen/57a00dcc2269b7610e1aaeb1549b3b65
Note how it adds the stores to the topology, as well as providing a
public method with the store names.
I don't think my proposal completely conflicts with the multiple processors
sharing state stores use case, since you can create a supplier that
provides the store name you want, somewhat independently of your actual
Processor logic. The issue I do see though, is that
topology.addStateStore() can only be called once for a given store. So for
your example, if there was a single TransformerSupplier that was passed
into both transform() calls, "store1" would be added (under the hood) to
the topology twice, which is no good.
Perhaps this suggests that one of my alternatives on the KIP might be
desirable: either not having the suppliers return StoreBuilders (just store
names), or not deprecating the old methods that take "String...
stateStoreNames". I'll have to think about it a bit.
Paul
On Sun, Dec 9, 2018 at 11:57 PM Guozhang Wang <wangg...@gmail.com> wrote:
Hello Paul,
Thanks for the great writeup (very detailed and crystal-clear motivation
sections!).
This is quite an interesting idea and I do like the API cleanness you
proposed. The original motivation for letting StreamsTopology add state
stores, though, is to allow different processors to share the state store.
For example:
builder.addStore("store1");
// a path of stream transformations that leads to KStream stream1.
stream1.transform(..., "store1");
// another path that generates a KStream stream2.
stream2.transform(..., "store1");
Behind the scenes, Streams will make sure stream1 / stream2 transformations
will always be grouped together as a single group of tasks, each of which
will be executed by a single thread, and hence there are no concurrency
issues on accessing the store from different operators within the same
task. I'm not sure how common this use case is, but I'd like to hear if you
have any thoughts on maintaining this, since the current proposal seems to
exclude this possibility.
Guozhang
On Sun, Dec 9, 2018 at 4:18 PM Paul Whalen <pgwha...@gmail.com> wrote:
Here's KIP-401 for discussion, a minor Kafka Streams API change that I
think could greatly increase the usability of the low-level processor API.
I have some code written but will wait to see if there is buy-in before
going all out and creating a pull request. It seems like most of the work
would be in updating documentation and tests.
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97553756
Thanks!
Paul