With reference to Guozhang's comment:

Then are we going to still only allow the valueTransformer.init() /
process() to be able to access N stores, with N stores specified with the
stateStoreNames, but not the one specified in materialized.name()?
Personally I think it should be the case as the materialized store should
be managed by the Streams library itself, but we should probably help users
to understand if they have some stores used for the same purpose (storing
the value that are going to be sent to the downstream changelog stream of
KTable), they should save that store and not creating by themselves as it
will be auto created by the Streams library.

In the PR I've pulled out the inline `ProcessorContext` implementation that
was in KStreamTransformValues, the one that throws an exception if any
forward() method is called. Sounds like you're suggesting that
KTableTransformValues should also not allow calls to getStateStore() where
the name is the materialized.name(). If that's the case, then sure, that
shouldn't be an issue.
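
To make the restriction concrete, here is a rough sketch of the kind of wrapper I have in mind. It is not the code in the PR: `MiniContext` is a hypothetical stand-in for the two `ProcessorContext` methods under discussion, and the class names and exception types are my own assumptions, chosen only so the example compiles without Kafka on the classpath.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the two ProcessorContext methods discussed here.
// It is NOT Kafka's interface, just enough surface to show the idea.
interface MiniContext {
    Object getStateStore(String name);
    void forward(Object key, Object value);
}

// Trivial backing context so the wrapper has something to delegate to.
final class MapBackedContext implements MiniContext {
    private final Map<String, Object> stores = new HashMap<>();

    void register(final String name, final Object store) {
        stores.put(name, store);
    }

    @Override
    public Object getStateStore(final String name) {
        return stores.get(name);
    }

    @Override
    public void forward(final Object key, final Object value) {
        // A real context would send the record downstream.
    }
}

// Wrapper handed to the user's transformer: forward() is always rejected,
// and only the stores named in stateStoreNames can be looked up.
final class RestrictedContext implements MiniContext {
    private final MiniContext delegate;
    private final String materializedName; // may be null if not materialized
    private final Set<String> allowedStores;

    RestrictedContext(final MiniContext delegate,
                      final String materializedName,
                      final String... stateStoreNames) {
        this.delegate = delegate;
        this.materializedName = materializedName;
        this.allowedStores = new HashSet<>(Arrays.asList(stateStoreNames));
    }

    @Override
    public Object getStateStore(final String name) {
        // The materialized store is owned by the library, so reject it
        // even if the user also listed it in stateStoreNames.
        if (name.equals(materializedName)) {
            throw new IllegalStateException(
                "Store " + name + " is managed by the library");
        }
        if (!allowedStores.contains(name)) {
            throw new IllegalStateException(
                "Store " + name + " was not listed in stateStoreNames");
        }
        return delegate.getStateStore(name);
    }

    @Override
    public void forward(final Object key, final Object value) {
        throw new UnsupportedOperationException(
            "forward() is not allowed from a value transformer");
    }
}
```

With this shape, `new RestrictedContext(base, "table-store", "user-store")` would hand back the store registered as "user-store" but throw for "table-store" and for any call to forward().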

On 11 May 2018 at 09:42, Andy Coates <a...@confluent.io> wrote:

> Sorry for my lack of response - I've been out of action with a bad back
> for a few days!
>
> I originally had the `Materialized` overloads added to the API. I'll
> update the KIP / PR with these again. In terms of semantics, as Matthias
> suggests, these should be consistent with filter() and mapValues(), etc.
>
> On 8 May 2018 at 17:59, Guozhang Wang <wangg...@gmail.com> wrote:
>
>> To follow on Matthias and Damian's comments here:
>>
>> If we are going to add the overload functions as
>>
>> ```
>> <VR> KTable<K, VR> transformValues(
>>     final ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier,
>>     final String... stateStoreNames,
>>     final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
>>
>> <VR> KTable<K, VR> transformValues(
>>     final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier,
>>     final String... stateStoreNames,
>>     final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
>> ```
>>
>> Then are we going to still only allow the valueTransformer.init() /
>> process() to be able to access N stores, with N stores specified with the
>> stateStoreNames, but not the one specified in materialized.name()?
>> Personally I think it should be the case as the materialized store should
>> be managed by the Streams library itself, but we should probably help users
>> to understand if they have some stores used for the same purpose (storing
>> the value that are going to be sent to the downstream changelog stream of
>> KTable), they should save that store and not creating by themselves as it
>> will be auto created by the Streams library.
>>
>>
>> Guozhang
>>
>>
>>
>>
>> On Tue, May 8, 2018 at 7:45 AM, Damian Guy <damian....@gmail.com> wrote:
>>
>> > Initially I thought materializing a store would be overkill, but from a
>> > consistency point of view it makes sense to add an overload that takes a
>> > `Materialized` and only creates the store if that is supplied.
>> >
>> > On Sun, 6 May 2018 at 17:52 Matthias J. Sax <matth...@confluent.io> wrote:
>> >
>> > > Andy,
>> > >
>> > > thanks for the KIP. I don't have any further comments.
>> > >
>> > > My 2cents about Guozhang's questions: as I like consistent behavior, I
>> > > think transformValues() should behave the same way as filter() and
>> > > mapValues().
>> > >
>> > >
>> > > -Matthias
>> > >
>> > > On 5/2/18 2:24 PM, Guozhang Wang wrote:
>> > > > Hello Andy,
>> > > >
>> > > > Thanks for the KIP. The motivation and the general proposal look good to
>> > > > me. I think in KTable it is indeed valuable to add the functions that do
>> > > > not change the key, such as mapValues, transformValues, and filter.
>> > > >
>> > > > There are a few meta comments I have about the semantics of the newly
>> > > > added functions:
>> > > >
>> > > > 1) For the resulting KTable, what should its "queryableStoreName()"
>> > > > return?
>> > > >
>> > > > 2) More specifically, how do we decide if the resulting KTable is to be
>> > > > materialized or not? E.g. if there are no store names provided then it is
>> > > > likely that the resulting KTable is not materialized, or at least not
>> > > > logically materialized and not queryable. What if there is at least one
>> > > > state store provided? Will any of them be provided as the materialized
>> > > > store, or should we still add a Materialized parameter for this purpose?
>> > > >
>> > > > 3) For its internal implementation, how should the key/value serde,
>> > > > sendOldValues flag, etc. be inherited from its parent processor node?
>> > > >
>> > > >
>> > > > Guozhang
>> > > >
>> > > >
>> > > > On Wed, May 2, 2018 at 12:43 PM, Andy Coates <a...@confluent.io> wrote:
>> > > >
>> > > >> Hi everyone,
>> > > >>
>> > > >> I would like to start a discussion for KIP 292. I would appreciate it if
>> > > >> you could review and provide feedback.
>> > > >>
>> > > >> KIP: KIP-292: Add transformValues() method to KTable
>> > > >> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-292%3A+Add+transformValues%28%29+method+to+KTable>
>> > > >> Jira: KAFKA-6849 <https://issues.apache.org/jira/browse/KAFKA-6849>
>> > > >>
>> > > >> PR: #4959 <https://github.com/apache/kafka/pull/4959>
>> > > >>
>> > > >>
>> > > >>
>> > > >> Thanks,
>> > > >>
>> > > >> Andy
>> > > >>
>> > > >
>> > > >
>> > > >
>> > >
>> > >
>> >
>>
>>
>>
>> --
>> -- Guozhang
>>
>
>
