With our current state store implementation, two-way join operators such as
Stream-Stream or Table-Table joins, where a record from either side may
trigger the join, cannot easily retrieve the record context of the matched
record from the other side's materialized state, since we do not keep record
contexts in the materialized state. So supporting a dual RecordContext in
the join operators would be a much bigger change in our implementation.

So for this KIP we should only consider either not having any RecordContext
at all, or keeping one RecordContext and documenting that it is from the
"triggering record". I'm inclined to the second option, admitting that
"triggering record" is an impl detail that was never introduced as a concept
to users and that it may leave them a bit confused. My motivations are 1) we
are inheriting the record context of the join result from the triggering
record anyway, and 2) exposing this single context is still useful for some
use cases.
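
To make the second option concrete, here is a toy model in Python. It is not
Kafka Streams code: `ToyJoin`, this `RecordContext` class, and the topic
names are made up for illustration. It assumes each side's store keeps only
key -> value, so the join result can carry the triggering record's context
but not the matched record's.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass(frozen=True)
class RecordContext:
    """Hypothetical stand-in for record metadata (topic, partition, offset)."""
    topic: str
    partition: int
    offset: int


class ToyJoin:
    """Toy symmetric join: each side's store keeps only key -> value."""

    def __init__(self) -> None:
        # Materialized state per side. No RecordContext is stored, so the
        # matched record's context cannot be recovered later.
        self.stores: Dict[str, Dict[str, str]] = {"left": {}, "right": {}}

    def process(self, side: str, key: str, value: str,
                ctx: RecordContext) -> List[Tuple[str, str, RecordContext]]:
        other = "right" if side == "left" else "left"
        self.stores[side][key] = value  # the context is dropped here
        matched = self.stores[other].get(key)
        if matched is None:
            return []
        left, right = (value, matched) if side == "left" else (matched, value)
        # The join result inherits the context of the triggering record only.
        return [(key, left + "+" + right, ctx)]


join = ToyJoin()
first = join.process("left", "k", "a", RecordContext("topic-A", 0, 10))
second = join.process("right", "k", "b", RecordContext("topic-B", 0, 7))
print(first)   # no match yet, so nothing is emitted
print(second)  # emitted with the context of the right-side (triggering) record
```

In this sketch the left record's context (topic-A, offset 10) is gone by the
time the right record triggers the join, which is why a dual context would
require storing contexts in the state stores.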


Guozhang


On Tue, Nov 7, 2017 at 4:50 AM, Jeyhun Karimov <je.kari...@gmail.com> wrote:

> Hi Jan,
>
> Thanks for your comments. I agree that the implementation should not
> introduce new "bugs" or "known issues" in future.
> I think  we can either i) just drop RecordContext argument for join methods
> or ii) introduce binary aggregation logic for RecordContexts for
> two-input-stream-operators.
> Any other comments/suggestions of course are welcome.
>
>
> Cheers,
> Jeyhun
>
> On Tue, Nov 7, 2017 at 1:04 PM Jan Filipiak <jan.filip...@trivago.com>
> wrote:
>
> >
> > On 07.11.2017 12:59, Jan Filipiak wrote:
> > >
> > > On 07.11.2017 11:20, Matthias J. Sax wrote:
> > >> About implementation if we do the KIP as proposed: I agree with Guozhang
> > >> that we would need to use the currently processed record's metadata in
> > >> the context. This does leak some implementation details, but I
> > >> personally don't see a big issue here (at the same time, I am also fine
> > >> to remove the RecordContext for joins if people think it's an issue).
> > >>
> > >> About the API: while I agree with Jan, that having two APIs for input
> > >> streams/tables and "derived" streams/table (ie, result of
> > >> KStream-KStream join or an aggregation) would be a way to avoid some
> > >> semantic issue, I am not sure if it is worth the effort. IMHO, it would
> > >> make the API more convoluted and if users access the RecordContext on a
> > >> derived stream/table it's a "user error"
> > > Why make it easy for the users to make mistakes in order to save some
> > > effort
> > > (That I don't quite think is that big actually)
> > >> -- it's not really wrong as
> > >> users still get the current record's context, but of course, we would
> > >> leak implementation details (as above, I don't see a big issue here
> > >> though).
> > >>
> > >> At the same time, I disagree with Jan that "it's not too hard to have a
> > >> user keeping track" -- if we apply this argument, we could even argue
> > >> that it's not too hard to use a Transformer instead of a map/filter etc.
> > >> We want to add "syntactic sugar" with this change and thus should really
> > >> provide value and not introduce a half-baked solution for which users
> > >> still need to do manual customizing.
> > >>
> > >>
> > >> -Matthias
> > >>
> > >>
> > >> On 11/7/17 5:54 AM, Jan Filipiak wrote:
> > >>> I agree completely.
> > >>>
> > >>> Exposing this information in a place where it has no _natural_
> > >>> belonging
> > >>> might really be a bad blocker in the long run.
> > >>>
> > >>> Concerning your first point. I would argue it's not too hard to have a
> > >>> user keep track of these. If we still don't want the user
> > >>> to keep track of these I would argue that all > projection only <
> > >>> transformations on a Source-backed KTable/KStream
> > >>> could also return a KTable/KStream instance of the type we return from
> > >>> the topology builder.
> > >>> Only after any operation that exceeds projection or filter one would
> > >>> return a KTable not granting access to this any longer.
> > >>>
> > >>> Even then it's difficult already: I never ran a topology with caching
> > >>> but I am not even 100% sure what the RecordContext means behind
> > >>> a materialized KTable with caching? Topic and partition are probably
> > >>> with some reasoning but offset is probably only the offset causing the
> > >>> flush?
> > >>> So one might as well think to drop offsets from this RecordContext.
> > >>>
> > >>> Best Jan
> > >>>
> > >>>
> > >>>
> > >>>
> > >>>
> > >>>
> > >>>
> > >>> On 07.11.2017 03:18, Guozhang Wang wrote:
> > >>>> Regarding the API design (the proposed set of overloads vs. one
> > >>>> overload on #map to enrich the record), I think what we have
> > >>>> represents a good trade-off between API succinctness and user
> > >>>> convenience: on one hand we definitely want to keep as few
> > >>>> overloaded functions as possible. But on the other hand, if we only
> > >>>> do that in, say, the #map() function then this enrichment could be
> > >>>> overkill: think of a topology that has 7 operators in a chain,
> > >>>> where users want to access the record context on operators #2 and
> > >>>> #6 only; with the "enrichment" manner they need to do the
> > >>>> enrichment on operator #2 and keep it that way until #6. In
> > >>>> addition, the RecordContext fields (topic, offset, etc) are really
> > >>>> orthogonal to the key-value payloads themselves, so I think
> > >>>> separating them into this object is a cleaner way.
> > >>>>
> > >>>> Regarding the RecordContext inheritance, this is actually a good
> > >>>> point that has not been discussed thoroughly before. Here are my
> > >>>> two cents: one natural way would be to inherit the record context
> > >>>> from the "triggering" record, for example in a join operator, if
> > >>>> the record from stream A triggers the join then the record context
> > >>>> is inherited from that record. This is also aligned with the
> > >>>> lower-level PAPI interface. A counter argument, though, would be
> > >>>> that this is sort of leaking the internal implementations of the
> > >>>> DSL, so that moving forward if we did some refactoring to our join
> > >>>> implementations so that the triggering record can change, the
> > >>>> RecordContext would also be different. I do not know how much it
> > >>>> would really affect end users, but would like to hear your
> > >>>> opinions.
> > >>> Agreed to 100% exposing this information
> > >>>>
> > >>>> Guozhang
> > >>>>
> > >>>>
> > >>>> On Mon, Nov 6, 2017 at 1:00 PM, Jeyhun Karimov <je.kari...@gmail.com>
> > >>>> wrote:
> > >>>>
> > >>>>> Hi Jan,
> > >>>>>
> > >>>>> Sorry for late reply.
> > >>>>>
> > >>>>>
> > >>>>> The API Design doesn't look appealing
> > >>>>>
> > >>>>>
> > >>>>> In terms of API design we tried to preserve the java functional
> > >>>>> interfaces.
> > >>>>> We applied the same set of rich methods for KTable to make it
> > >>>>> compatible
> > >>>>> with the rest of overloaded APIs.
> > >>>>>
> > >>>>> It should be 100% sufficient to offer a KTable + KStream that is
> > >>>>> directly
> > >>>>>> feed from a topic with 1 additional overload for the #map()
> > >>>>>> methods to
> > >>>>>> cover every usecase while keeping the API in a way better state.
> > >>>>> - IMO this seems a workaround, rather than a direct solution.
> > >>>>>
> > >>>>> Perhaps we should continue this discussion in DISCUSS thread.
> > >>>>>
> > >>>>>
> > >>>>> Cheers,
> > >>>>> Jeyhun
> > >>>>>
> > >>>>>
> > >>>>> On Mon, Nov 6, 2017 at 9:14 PM Jan Filipiak
> > >>>>> <jan.filip...@trivago.com>
> > >>>>> wrote:
> > >>>>>
> > >>>>>> Hi.
> > >>>>>>
> > >>>>>> I do understand that it might come in handy.
> > >>>>>> From my POV in any relational algebra this is only a projection.
> > >>>>>> Currently we hide these "fields" that come with the input record.
> > >>>>>> It should be 100% sufficient to offer a KTable + KStream that is
> > >>>>>> directly fed from a topic with 1 additional overload for the
> > >>>>>> #map() methods to cover every use case while keeping the API in a
> > >>>>>> way better state.
> > >>>>>>
> > >>>>>> best Jan
> > >>>>>>
> > >>>>>> On 06.11.2017 17:52, Matthias J. Sax wrote:
> > >>>>>>> Jan,
> > >>>>>>>
> > >>>>>>> I understand what you are saying. However, having a
> > >>>>>>> RecordContext is super useful for operations that are applied to
> > >>>>>>> an input topic. Many users requested this feature -- it's much
> > >>>>>>> more convenient than falling back to transform() to implement a
> > >>>>>>> filter() for example that wants to access some meta data.
> > >>>>>>>
> > >>>>>>> Because we cannot distinguish different "origins" of a
> > >>>>>>> KStream/KTable, I am not sure if there would be a better way to do
> > >>>>>>> this. The only "workaround" I see, is to have two KStream/KTable
> > >>>>>>> interfaces each and we would use the first one for KStream/KTable
> > >>>>>>> with "proper" RecordContext. But this does not seem to be a good
> > >>>>>>> solution either.
> > >>>>>>>
> > >>>>>>> Note, a KTable can also be read directly from a topic, I agree that
> > >>>>>>> using RecordContext on a KTable that is the result of an
> > >>>>>>> aggregation is questionable. But I don't see a reason to down vote
> > >>>>>>> the KIP for this reason.
> > >>>>>>> WDYT about this?
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> -Matthias
> > >>>>>>>
> > >>>>>>> On 11/1/17 10:19 PM, Jan Filipiak wrote:
> > >>>>>>>> -1 non binding
> > >>>>>>>>
> > >>>>>>>> I don't get the motivation.
> > >>>>>>>> In 80% of my DSL processors there is no such thing as a
> reasonable
> > >>>>>>>> RecordContext.
> > >>>>>>>> After a join  the record I am processing belongs to at least 2
> > >>>>>>>> topics.
> > >>>>>>>> After a Group by the record I am processing was created from
> > >>>>>>>> multiple
> > >>>>>>>> offsets.
> > >>>>>>>>
> > >>>>>>>> The API Design doesn't look appealing
> > >>>>>>>>
> > >>>>>>>> Best Jan
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> On 01.11.2017 22:02, Jeyhun Karimov wrote:
> > >>>>>>>>> Dear community,
> > >>>>>>>>>
> > >>>>>>>>> It seems the discussion for KIP-159 [1] converged finally. I
> > >>>>>>>>> would like to initiate voting for the particular KIP.
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> [1]
> > >>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams
> > >>>>>>>>> Cheers,
> > >>>>>>>>> Jeyhun
> > >>>>>>>>>
> > >>>>
> > >
> >
> >
>



-- 
-- Guozhang
